Files
GetQQInfo/Tools.py
2025-01-28 19:03:58 +08:00

199 lines
6.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import imagehash, configparser, os, logging, re
from PIL import Image
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import psutil, time, threading
config = configparser.ConfigParser()
config.read('config.cfg')
myretry = int(config['download']['retry'])
logging.basicConfig(level=int(config['logging']['level']), format='%(asctime)s - %(levelname)s - %(message)s')
# logging.debug("这是调试信息")
# logging.info("这是信息级别的日志")
# logging.warning("这是警告级别的日志")
# logging.error("这是错误级别的日志")
# logging.critical("这是严重错误级别的日志")
class Hash:
def __init__(self, conn):
self.conn = conn
def gethash(self, _qqnumber):
"""
根据传入的qq号获取数据库里的hash值
:param: _qqnumber: QQ号
:return: 字节类型Hash值
"""
_cursor = self.conn.cursor()
_cursor.execute('SELECT hash FROM image_hashes WHERE id = (%s)', (_qqnumber,))
byte_data_from_db = _cursor.fetchone()
if byte_data_from_db is None:
logging.info(f'{_qqnumber}不在数据库内')
else:
logging.debug(f'从数据库获取的Hash号: {byte_data_from_db[0]}')
_cursor.close()
return byte_data_from_db[0]
def getqq(self, _hash):
"""
根据传入的{字节类型Hash值}获取数据库里的QQ号
:param: _hash: 字节类型Hash值
:return: QQ号
"""
_cursor = self.conn.cursor()
_cursor.execute('SELECT id FROM image_hashes WHERE hash = (%s)', (_hash,))
byte_data_from_db = _cursor.fetchone()
if byte_data_from_db is None:
logging.info('Hash值不存在')
return 'error'
else:
logging.debug(f'从数据库获取的QQ号: {byte_data_from_db[0]}')
_cursor.close()
return byte_data_from_db[0]
def tomysql(self, _qqnumber):
"""
根据传入的qq号下载并解析qq头像中的哈希值最后上传到Mysql
:param: _qqnumber: QQ号
"""
_img = Image.open(str(f'./img/{_qqnumber}.jpg'))
if _img.mode == 'P' and 'transparency' in _img.info:
_img = _img.convert('RGBA')
_byte_data = bytes.fromhex(str(imagehash.average_hash(_img)))
_cursor = self.conn.cursor()
_cursor.execute("""
INSERT INTO `{}` (`{}`, `{}`)
VALUES (%s, %s)
ON DUPLICATE KEY UPDATE `{}` = VALUES(`{}`)
""".format(
config['mysql']['table_name'],
config['mysql']['id_column_name'],
config['mysql']['hash_column_name'],
config['mysql']['hash_column_name'],
config['mysql']['hash_column_name']
), (_qqnumber, _byte_data))
logging.debug(f'[{_qqnumber}]:{_byte_data}')
self.conn.commit()
_cursor.close()
def compare(hash1, hash2):
"""
根据{字节类型Hash值},比较两个图片的相似度
:param hash1: {字节类型Hash值}
:param hash2: {字节类型Hash值}
:return: 相似度百分比
"""
hash1 = hash1.hex()
hash2 = hash2.hex()
# 将哈希值转换为二进制字符串
bin1 = bin(int(hash1, 16))[2:].zfill(len(hash1) * 4) # 16进制转二进制
bin2 = bin(int(hash2, 16))[2:].zfill(len(hash2) * 4)
# 计算汉明距离
hamming_distance = sum(c1 != c2 for c1, c2 in zip(bin1, bin2))
# 计算相似度百分比
return (1 - hamming_distance / 64) * 100
def download_image(_qqnumber):
"""
根据传入的 _qqnumber 下载 QQ 图片并保存到 img 文件夹中。
:param _qqnumber: QQ号
:return: 成功1 失败0
"""
logging.debug(f'下载的QQ号是: {_qqnumber}')
if not os.path.exists('img'):
os.makedirs('img')
# 构建图片 URL 和文件名
image_url = f'https://q1.qlogo.cn/g?b=qq&nk={_qqnumber}&s=0'
image_name = os.path.join('img', f'{_qqnumber}.jpg')
# 创建一个会进行重试的适配器
session = requests.Session()
retry = Retry(
total=myretry, # 最多重试255次
backoff_factor=1, # 每次重试的等待时间增加
status_forcelist=[500, 502, 503, 504], # 针对这些HTTP状态码才重试
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('https://', adapter)
session.mount('http://', adapter)
# 使用session发送请求
response = session.get(image_url, timeout=10) # 设置超时时间为10秒
if response.status_code == 200:
# 保存图片
with open(image_name, 'wb') as f:
f.write(response.content)
logging.debug(f'图片已保存为 {image_name}')
return 1
else:
logging.info(f'下载图片失败,错误码:{response.status_code}')
return 0
def remove_image(_qqnumber):
"""
根据传入的 _qqnumber 删除对于图片
:param _qqnumber: QQ号
:return: 成功1 失败0
"""
if os.path.exists(f'./img/{_qqnumber}.jpg'):
os.remove(f'./img/{_qqnumber}.jpg')
return 1
else:
logging.debug(f'没有找到这个QQ号的图片:{_qqnumber}')
def clean_image():
# 定义正则表达式匹配10位数字的文件名
pattern = r'^\d{10}\.jpg$'
# 获取./img目录下的所有文件
img_dir = './img'
# 遍历目录中的所有文件
for filename in os.listdir(img_dir):
# 检查文件是否符合10位数字.jpg的模式
if re.match(pattern, filename):
file_path = os.path.join(img_dir, filename)
try:
os.remove(file_path)
logging.debug(f"已删除文件: {file_path}")
except Exception as e:
logging.debug(f"删除文件 {file_path} 时发生错误: {e}")
def enforce_memory_limit(limit_mb):
"""
限制当前进程的内存使用。
:param limit_mb: 内存限制,单位是 MB。
"""
process = psutil.Process(os.getpid()) # 获取当前进程
limit_bytes = limit_mb * 1024 * 1024 # 转换为字节
while True:
memory_usage = process.memory_info().rss # 当前内存使用量(常驻内存)
if memory_usage > limit_bytes:
print(f"内存使用超过限制!当前使用:{memory_usage / (1024 * 1024):.2f} MB限制{limit_mb} MB")
os._exit(1) # 强制退出进程
time.sleep(1) # 每秒检查一次
def start_memory_monitor(limit_mb):
"""
启动内存限制监控线程。
:param limit_mb: 内存限制,单位是 MB。
"""
monitor_thread = threading.Thread(target=enforce_memory_limit, args=(limit_mb,), daemon=True)
monitor_thread.start()