import imagehash, configparser, os, logging, re from PIL import Image import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry import psutil, time, threading config = configparser.ConfigParser() config.read('config.cfg') myretry = int(config['download']['retry']) logging.basicConfig(level=int(config['logging']['level']), format='%(asctime)s - %(levelname)s - %(message)s') # logging.debug("这是调试信息") # logging.info("这是信息级别的日志") # logging.warning("这是警告级别的日志") # logging.error("这是错误级别的日志") # logging.critical("这是严重错误级别的日志") class Hash: def __init__(self, conn): self.conn = conn def gethash(self, _qqnumber): """ 根据传入的qq号,获取数据库里的hash值 :param: _qqnumber: QQ号 :return: 字节类型Hash值 """ _cursor = self.conn.cursor() _cursor.execute('SELECT hash FROM image_hashes WHERE id = (%s)', (_qqnumber,)) byte_data_from_db = _cursor.fetchone() if byte_data_from_db is None: logging.info(f'{_qqnumber}不在数据库内') else: logging.debug(f'从数据库获取的Hash号: {byte_data_from_db[0]}') _cursor.close() return byte_data_from_db[0] def getqq(self, _hash): """ 根据传入的{字节类型Hash值},获取数据库里的QQ号 :param: _hash: 字节类型Hash值 :return: QQ号 """ _cursor = self.conn.cursor() _cursor.execute('SELECT id FROM image_hashes WHERE hash = (%s)', (_hash,)) byte_data_from_db = _cursor.fetchone() if byte_data_from_db is None: logging.info('Hash值不存在') return 'error' else: logging.debug(f'从数据库获取的QQ号: {byte_data_from_db[0]}') _cursor.close() return byte_data_from_db[0] def tomysql(self, _qqnumber): """ 根据传入的qq号,下载并解析qq头像中的哈希值,最后上传到Mysql :param: _qqnumber: QQ号 """ _img = Image.open(str(f'./img/{_qqnumber}.jpg')) if _img.mode == 'P' and 'transparency' in _img.info: _img = _img.convert('RGBA') _byte_data = bytes.fromhex(str(imagehash.average_hash(_img))) _cursor = self.conn.cursor() _cursor.execute(""" INSERT INTO `{}` (`{}`, `{}`) VALUES (%s, %s) ON DUPLICATE KEY UPDATE `{}` = VALUES(`{}`) """.format( config['mysql']['table_name'], config['mysql']['id_column_name'], config['mysql']['hash_column_name'], config['mysql']['hash_column_name'], config['mysql']['hash_column_name'] ), (_qqnumber, _byte_data)) logging.debug(f'[{_qqnumber}]:{_byte_data}') self.conn.commit() _cursor.close() def compare(hash1, hash2): """ 根据{字节类型Hash值},比较两个图片的相似度 :param hash1: {字节类型Hash值} :param hash2: {字节类型Hash值} :return: 相似度百分比 """ hash1 = hash1.hex() hash2 = hash2.hex() # 将哈希值转换为二进制字符串 bin1 = bin(int(hash1, 16))[2:].zfill(len(hash1) * 4) # 16进制转二进制 bin2 = bin(int(hash2, 16))[2:].zfill(len(hash2) * 4) # 计算汉明距离 hamming_distance = sum(c1 != c2 for c1, c2 in zip(bin1, bin2)) # 计算相似度百分比 return (1 - hamming_distance / 64) * 100 def download_image(_qqnumber): """ 根据传入的 _qqnumber 下载 QQ 图片并保存到 img 文件夹中。 :param _qqnumber: QQ号 :return: 成功:1 失败:0 """ logging.debug(f'下载的QQ号是: {_qqnumber}') if not os.path.exists('img'): os.makedirs('img') # 构建图片 URL 和文件名 image_url = f'https://q1.qlogo.cn/g?b=qq&nk={_qqnumber}&s=0' image_name = os.path.join('img', f'{_qqnumber}.jpg') # 创建一个会进行重试的适配器 session = requests.Session() retry = Retry( total=myretry, # 最多重试255次 backoff_factor=1, # 每次重试的等待时间增加 status_forcelist=[500, 502, 503, 504], # 针对这些HTTP状态码才重试 ) adapter = HTTPAdapter(max_retries=retry) session.mount('https://', adapter) session.mount('http://', adapter) # 使用session发送请求 response = session.get(image_url, timeout=10) # 设置超时时间为10秒 if response.status_code == 200: # 保存图片 with open(image_name, 'wb') as f: f.write(response.content) logging.debug(f'图片已保存为 {image_name}') return 1 else: logging.info(f'下载图片失败,错误码:{response.status_code}') return 0 def remove_image(_qqnumber): """ 根据传入的 _qqnumber 删除对于图片 :param _qqnumber: QQ号 :return: 成功:1 失败:0 """ if os.path.exists(f'./img/{_qqnumber}.jpg'): os.remove(f'./img/{_qqnumber}.jpg') return 1 else: logging.debug(f'没有找到这个QQ号的图片:{_qqnumber}') def clean_image(): # 定义正则表达式匹配10位数字的文件名 pattern = r'^\d{10}\.jpg$' # 获取./img目录下的所有文件 img_dir = './img' # 遍历目录中的所有文件 for filename in os.listdir(img_dir): # 检查文件是否符合10位数字.jpg的模式 if re.match(pattern, filename): file_path = os.path.join(img_dir, filename) try: os.remove(file_path) logging.debug(f"已删除文件: {file_path}") except Exception as e: logging.debug(f"删除文件 {file_path} 时发生错误: {e}") def enforce_memory_limit(limit_mb): """ 限制当前进程的内存使用。 :param limit_mb: 内存限制,单位是 MB。 """ process = psutil.Process(os.getpid()) # 获取当前进程 limit_bytes = limit_mb * 1024 * 1024 # 转换为字节 while True: memory_usage = process.memory_info().rss # 当前内存使用量(常驻内存) if memory_usage > limit_bytes: print(f"内存使用超过限制!当前使用:{memory_usage / (1024 * 1024):.2f} MB,限制:{limit_mb} MB") os._exit(1) # 强制退出进程 time.sleep(1) # 每秒检查一次 def start_memory_monitor(limit_mb): """ 启动内存限制监控线程。 :param limit_mb: 内存限制,单位是 MB。 """ monitor_thread = threading.Thread(target=enforce_memory_limit, args=(limit_mb,), daemon=True) monitor_thread.start()