152 lines
4.9 KiB
Python
152 lines
4.9 KiB
Python
from PIL import Image
|
||
import imagehash, configparser, requests, os, logging, re
|
||
|
||
config = configparser.ConfigParser()
|
||
config.read('config.cfg')
|
||
logging.basicConfig(level=int(config['logging']['level']), format='%(asctime)s - %(levelname)s - %(message)s')
|
||
# logging.debug("这是调试信息")
|
||
# logging.info("这是信息级别的日志")
|
||
# logging.warning("这是警告级别的日志")
|
||
# logging.error("这是错误级别的日志")
|
||
# logging.critical("这是严重错误级别的日志")
|
||
|
||
class Hash:
|
||
def __init__(self, conn):
|
||
self.conn = conn
|
||
|
||
def gethash(self, _qqnumber):
|
||
"""
|
||
根据传入的qq号,获取数据库里的hash值
|
||
|
||
:param: _qqnumber: QQ号
|
||
:return: 字节类型Hash值
|
||
"""
|
||
_cursor = self.conn.cursor()
|
||
_cursor.execute('SELECT hash FROM image_hashes WHERE id = (%s)', (_qqnumber,))
|
||
byte_data_from_db = _cursor.fetchone()
|
||
if byte_data_from_db is None:
|
||
logging.info(f'{_qqnumber}不在数据库内')
|
||
else:
|
||
logging.debug(f'从数据库获取的Hash号: {byte_data_from_db[0]}')
|
||
_cursor.close()
|
||
return byte_data_from_db[0]
|
||
|
||
def getqq(self, _hash):
|
||
"""
|
||
根据传入的{字节类型Hash值},获取数据库里的QQ号
|
||
|
||
:param: _hash: 字节类型Hash值
|
||
:return: QQ号
|
||
"""
|
||
_cursor = self.conn.cursor()
|
||
_cursor.execute('SELECT id FROM image_hashes WHERE hash = (%s)', (_hash,))
|
||
byte_data_from_db = _cursor.fetchone()
|
||
if byte_data_from_db is None:
|
||
logging.info('Hash值不存在')
|
||
return 'error'
|
||
else:
|
||
logging.debug(f'从数据库获取的QQ号: {byte_data_from_db[0]}')
|
||
_cursor.close()
|
||
return byte_data_from_db[0]
|
||
|
||
def tomysql(self, _qqnumber):
|
||
"""
|
||
根据传入的qq号,下载并解析qq头像中的哈希值,最后上传到Mysql
|
||
|
||
:param: _qqnumber: QQ号
|
||
"""
|
||
_img = Image.open(str(f'./img/{_qqnumber}.jpg'))
|
||
if _img.mode == 'P' and 'transparency' in _img.info:
|
||
_img = _img.convert('RGBA')
|
||
_byte_data = bytes.fromhex(str(imagehash.average_hash(_img)))
|
||
_cursor = self.conn.cursor()
|
||
_cursor.execute("""
|
||
INSERT INTO image_hashes (id, hash)
|
||
VALUES (%s, %s)
|
||
ON DUPLICATE KEY UPDATE
|
||
hash = VALUES(hash)
|
||
""", (_qqnumber, _byte_data))
|
||
logging.debug(f'[{_qqnumber}]:{_byte_data}')
|
||
self.conn.commit()
|
||
_cursor.close()
|
||
|
||
|
||
def compare(hash1, hash2):
|
||
"""
|
||
根据{字节类型Hash值},比较两个图片的相似度
|
||
|
||
:param hash1: {字节类型Hash值}
|
||
:param hash2: {字节类型Hash值}
|
||
:return: 相似度百分比
|
||
"""
|
||
hash1 = hash1.hex()
|
||
hash2 = hash2.hex()
|
||
|
||
# 将哈希值转换为二进制字符串
|
||
bin1 = bin(int(hash1, 16))[2:].zfill(len(hash1) * 4) # 16进制转二进制
|
||
bin2 = bin(int(hash2, 16))[2:].zfill(len(hash2) * 4)
|
||
|
||
# 计算汉明距离
|
||
hamming_distance = sum(c1 != c2 for c1, c2 in zip(bin1, bin2))
|
||
|
||
# 计算相似度百分比
|
||
return (1 - hamming_distance / 64) * 100
|
||
|
||
|
||
def download_image(_qqnumber):
|
||
"""
|
||
根据传入的 _qqnumber 下载 QQ 图片并保存到 img 文件夹中。
|
||
|
||
:param _qqnumber: QQ号
|
||
:return: 成功:1 失败:0
|
||
"""
|
||
if not os.path.exists('img'):
|
||
os.makedirs('img')
|
||
|
||
# 构建图片 URL 和文件名
|
||
image_url = f'https://q1.qlogo.cn/g?b=qq&nk={_qqnumber}&s=0'
|
||
image_name = os.path.join('img', f'{_qqnumber}.jpg')
|
||
|
||
# 发送请求获取图片
|
||
response = requests.get(image_url)
|
||
|
||
if response.status_code == 200:
|
||
# 保存图片
|
||
with open(image_name, 'wb') as f:
|
||
f.write(response.content)
|
||
logging.debug(f'图片已保存为 {image_name}')
|
||
return 1
|
||
else:
|
||
logging.info(f'下载图片失败,错误码:{response.status_code}')
|
||
return 0
|
||
|
||
|
||
def remove_image(_qqnumber):
|
||
"""
|
||
根据传入的 _qqnumber 删除对于图片
|
||
|
||
:param _qqnumber: QQ号
|
||
:return: 成功:1 失败:0
|
||
"""
|
||
if os.path.exists(f'./img/{_qqnumber}.jpg'):
|
||
os.remove(f'./img/{_qqnumber}.jpg')
|
||
return 1
|
||
else:
|
||
logging.info(f'没有找到这个QQ号的图片:{_qqnumber}')
|
||
|
||
|
||
def clean_image():
|
||
# 定义正则表达式匹配10位数字的文件名
|
||
pattern = r'^\d{10}\.jpg$'
|
||
# 获取./img目录下的所有文件
|
||
img_dir = './img'
|
||
# 遍历目录中的所有文件
|
||
for filename in os.listdir(img_dir):
|
||
# 检查文件是否符合10位数字.jpg的模式
|
||
if re.match(pattern, filename):
|
||
file_path = os.path.join(img_dir, filename)
|
||
try:
|
||
os.remove(file_path)
|
||
logging.debug(f"已删除文件: {file_path}")
|
||
except Exception as e:
|
||
logging.debug(f"删除文件 {file_path} 时发生错误: {e}") |