Files
GetQQInfo/Thread.py
2024-12-23 20:26:04 +08:00

113 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pymysql, threading, time
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
from Tools import config, Hash, Image, imagehash, os, logging, download_image, remove_image, clean_image, compare
class MySQLConnectionPool:
    """Fixed-size pool of pymysql connections handed out through a queue.

    The pool size and the connection parameters (host, user, password,
    database) are read from the ``mysql`` section of ``config``. All
    connections are opened eagerly when the pool is constructed.
    """

    def __init__(self):
        """Open ``pool_size`` connections and store them in the queue.

        If one of the connection attempts fails, the connections that were
        already opened are closed before the error is re-raised.
        """
        self.pool_size = int(config['mysql']['pool_size'])
        self.pool = Queue(maxsize=self.pool_size)
        self.host = str(config['mysql']['host'])
        self.user = str(config['mysql']['user'])
        self.password = str(config['mysql']['password'])
        self.database = str(config['mysql']['database'])
        # Initialise the connection pool.
        try:
            for _ in range(self.pool_size):
                conn = pymysql.connect(
                    host=self.host,
                    user=self.user,
                    password=self.password,
                    database=self.database,
                    cursorclass=pymysql.cursors.DictCursor
                )
                self.pool.put(conn)
        except Exception:
            # Do not leave half a pool of open connections behind.
            self.close_all()
            raise

    def get_connection(self, max_retries=65535, wait_time=3):
        """Borrow a connection, retrying while the pool is empty.

        Args:
            max_retries: Maximum number of failed attempts before giving up.
            wait_time: Seconds to sleep after each failed attempt.

        Returns:
            A connection taken from the pool.

        Raises:
            Exception: If no connection became available within
                ``max_retries`` attempts.
        """
        from queue import Empty

        retries = 0
        while retries < max_retries:
            try:
                # get_nowait() never blocks. The previous "check empty(), then
                # get()" sequence could block forever when another thread took
                # the last connection between the two calls.
                return self.pool.get_nowait()
            except Empty:
                retries += 1
                print(f"连接池中没有可用连接,正在尝试第 {retries} 次连接...")
                time.sleep(wait_time)  # Wait before the next attempt.
        raise Exception("连接池中没有可用连接,已尝试多次连接")

    def release_connection(self, conn):
        """Return a borrowed connection to the pool; falsy values are ignored."""
        if conn:
            self.pool.put(conn)

    def close_all(self):
        """Close every connection that is currently sitting in the pool.

        Connections that are borrowed at the time of the call are not in the
        queue and are therefore not closed here.
        """
        from queue import Empty

        while True:
            try:
                conn = self.pool.get_nowait()
            except Empty:
                break
            conn.close()
class UploadThread(threading.Thread):
    """Thread that downloads the image for one QQ number and stores its hash.

    The work is delegated to ``download_image``, ``Hash(conn).tomysql`` and
    ``remove_image``; this class only sequences those calls and manages the
    borrowed database connection.
    """

    def __init__(self, uploadqqnumber, pool):
        """Remember the QQ number and borrow a connection from ``pool``.

        Args:
            uploadqqnumber: QQ number whose image should be processed.
            pool: Connection pool providing ``get_connection`` and
                ``release_connection``.
        """
        super().__init__()
        self.uploadqqnumber = uploadqqnumber
        self.pool = pool
        # Borrowed in the constructing thread, so creating the worker waits
        # until the pool can supply a connection.
        self.conn = pool.get_connection()

    def run(self):
        """Download the image, upload its hash, then delete the local file.

        The connection is returned to the pool even when one of the steps
        raises; otherwise every failed upload would permanently shrink the
        pool.
        """
        logging.debug(f'我是上传线程{self.uploadqqnumber}')
        try:
            # Called directly: a one-task executor that is awaited immediately
            # adds nothing over a plain call.
            if download_image(f'{self.uploadqqnumber}'):
                Hash(self.conn).tomysql(self.uploadqqnumber)
                logging.info(f'上传成功: {self.uploadqqnumber}')
                remove_image(self.uploadqqnumber)
        finally:
            self.pool.release_connection(self.conn)
class FindThread(threading.Thread):
    """Thread that looks up a target image hash through ``Hash(conn).getqq``."""

    def __init__(self, TargetImageHash, pool):
        """Remember the target hash and borrow a connection from ``pool``.

        Args:
            TargetImageHash: Hash value passed on to ``Hash.getqq``.
            pool: Connection pool providing ``get_connection`` and
                ``release_connection``.
        """
        super().__init__()
        self.TargetImageHash = TargetImageHash
        self.pool = pool
        # Borrowed in the constructing thread, so creating the worker waits
        # until the pool can supply a connection.
        self.conn = pool.get_connection()

    def run(self):
        """Run the lookup and log the result unless it is ``'error'``.

        The connection is returned to the pool even when the lookup raises,
        so a failing query cannot leak it.
        """
        logging.debug(f'我是查询线程{self.TargetImageHash}')
        try:
            res = Hash(self.conn).getqq(self.TargetImageHash)
            if res != 'error':
                logging.info(f'查询成功QQ号是: {res}')
        finally:
            self.pool.release_connection(self.conn)
class ByNetFindThread(threading.Thread):
    """Thread that downloads one QQ number's image and compares its hash.

    The downloaded image is hashed with ``imagehash.average_hash`` and
    compared against the target hash with ``compare``. On a match the image
    is kept under a ``congratulations_`` name and the whole process exits.
    """

    def __init__(self, TargetImageHash, findqqnumber):
        """Store the target hash and the candidate QQ number.

        Args:
            TargetImageHash: Hash to compare the downloaded image against.
            findqqnumber: QQ number whose image should be checked.
        """
        super().__init__()
        self.TargetImageHash = TargetImageHash
        self.findqqnumber = findqqnumber

    def run(self):
        """Download, hash and compare the image for ``findqqnumber``."""
        # Called directly: a one-task executor that is awaited immediately
        # adds nothing over a plain call.
        downloaded = download_image(f'{self.findqqnumber}')
        logging.debug(f'查找的QQ号是: {self.findqqnumber}, res: {downloaded}')
        if not downloaded:
            return

        image_path = f'./img/{self.findqqnumber}.jpg'
        # The context manager closes the underlying file handle before the
        # file is renamed or removed below.
        with Image.open(image_path) as opened:
            image = opened
            if image.mode == 'P' and 'transparency' in image.info:
                image = image.convert('RGBA')
            found_hash = bytes.fromhex(str(imagehash.average_hash(image)))

        similarity = compare(self.TargetImageHash, found_hash)
        if similarity >= 99.9:
            logging.info(f'找到QQ号了:{self.findqqnumber}')
            # os.replace overwrites an existing destination in one step,
            # replacing the racy exists()/remove()/rename() sequence.
            os.replace(image_path, f'./img/congratulations_{self.findqqnumber}.jpg')
            clean_image()
            # NOTE(review): os._exit terminates the process immediately,
            # skipping finally blocks and atexit handlers. It looks
            # intentional (stop all other search threads) - confirm.
            os._exit(0)
        logging.info(f'[{self.findqqnumber}]的相似度是:{similarity}%')
        remove_image(self.findqqnumber)