from PIL import Image import imagehash, configparser, requests, os, logging, re config = configparser.ConfigParser() config.read('config.cfg') logging.basicConfig(level=int(config['logging']['level']), format='%(asctime)s - %(levelname)s - %(message)s') # logging.debug("这是调试信息") # logging.info("这是信息级别的日志") # logging.warning("这是警告级别的日志") # logging.error("这是错误级别的日志") # logging.critical("这是严重错误级别的日志") class Hash: def __init__(self, conn): self.conn = conn def gethash(self, _qqnumber): """ 根据传入的qq号,获取数据库里的hash值 :param: _qqnumber: QQ号 :return: 字节类型Hash值 """ _cursor = self.conn.cursor() _cursor.execute('SELECT hash FROM image_hashes WHERE id = (%s)', (_qqnumber,)) byte_data_from_db = _cursor.fetchone() if byte_data_from_db is None: logging.info(f'{_qqnumber}不在数据库内') else: logging.debug(f'从数据库获取的Hash号: {byte_data_from_db[0]}') _cursor.close() return byte_data_from_db[0] def getqq(self, _hash): """ 根据传入的{字节类型Hash值},获取数据库里的QQ号 :param: _hash: 字节类型Hash值 :return: QQ号 """ _cursor = self.conn.cursor() _cursor.execute('SELECT id FROM image_hashes WHERE hash = (%s)', (_hash,)) byte_data_from_db = _cursor.fetchone() if byte_data_from_db is None: logging.info('Hash值不存在') return 'error' else: logging.debug(f'从数据库获取的QQ号: {byte_data_from_db[0]}') _cursor.close() return byte_data_from_db[0] def tomysql(self, _qqnumber): """ 根据传入的qq号,下载并解析qq头像中的哈希值,最后上传到Mysql :param: _qqnumber: QQ号 """ _img = Image.open(str(f'./img/{_qqnumber}.jpg')) if _img.mode == 'P' and 'transparency' in _img.info: _img = _img.convert('RGBA') _byte_data = bytes.fromhex(str(imagehash.average_hash(_img))) _cursor = self.conn.cursor() _cursor.execute(""" INSERT INTO `{}` (`{}`, `{}`) VALUES (%s, %s) ON DUPLICATE KEY UPDATE `{}` = VALUES(`{}`) """.format( config['mysql']['table_name'], config['mysql']['id_column_name'], config['mysql']['hash_column_name'], config['mysql']['hash_column_name'], config['mysql']['hash_column_name'] ), (_qqnumber, _byte_data)) logging.debug(f'[{_qqnumber}]:{_byte_data}') self.conn.commit() _cursor.close() def compare(hash1, hash2): """ 根据{字节类型Hash值},比较两个图片的相似度 :param hash1: {字节类型Hash值} :param hash2: {字节类型Hash值} :return: 相似度百分比 """ hash1 = hash1.hex() hash2 = hash2.hex() # 将哈希值转换为二进制字符串 bin1 = bin(int(hash1, 16))[2:].zfill(len(hash1) * 4) # 16进制转二进制 bin2 = bin(int(hash2, 16))[2:].zfill(len(hash2) * 4) # 计算汉明距离 hamming_distance = sum(c1 != c2 for c1, c2 in zip(bin1, bin2)) # 计算相似度百分比 return (1 - hamming_distance / 64) * 100 def download_image(_qqnumber): """ 根据传入的 _qqnumber 下载 QQ 图片并保存到 img 文件夹中。 :param _qqnumber: QQ号 :return: 成功:1 失败:0 """ if not os.path.exists('img'): os.makedirs('img') # 构建图片 URL 和文件名 image_url = f'https://q1.qlogo.cn/g?b=qq&nk={_qqnumber}&s=0' image_name = os.path.join('img', f'{_qqnumber}.jpg') # 发送请求获取图片 response = requests.get(image_url) if response.status_code == 200: # 保存图片 with open(image_name, 'wb') as f: f.write(response.content) logging.debug(f'图片已保存为 {image_name}') return 1 else: logging.info(f'下载图片失败,错误码:{response.status_code}') return 0 def remove_image(_qqnumber): """ 根据传入的 _qqnumber 删除对于图片 :param _qqnumber: QQ号 :return: 成功:1 失败:0 """ if os.path.exists(f'./img/{_qqnumber}.jpg'): os.remove(f'./img/{_qqnumber}.jpg') return 1 else: logging.info(f'没有找到这个QQ号的图片:{_qqnumber}') def clean_image(): # 定义正则表达式匹配10位数字的文件名 pattern = r'^\d{10}\.jpg$' # 获取./img目录下的所有文件 img_dir = './img' # 遍历目录中的所有文件 for filename in os.listdir(img_dir): # 检查文件是否符合10位数字.jpg的模式 if re.match(pattern, filename): file_path = os.path.join(img_dir, filename) try: os.remove(file_path) logging.debug(f"已删除文件: {file_path}") except Exception as e: logging.debug(f"删除文件 {file_path} 时发生错误: {e}")