import asyncio import websockets import websockets.client # 显式导入 import platform import subprocess import json import socket import time import base64 import ssl import logging import sys # Import sys for diagnostics import functools from typing import Optional from datetime import datetime import aiohttp # 日志配置 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('pyquery.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # --- 诊断信息 --- logger.info(f"🐍 Python Version: {sys.version}") try: logger.info(f"📦 Websockets Library Version: {websockets.__version__}") except Exception as e: logger.error(f"无法获取 websockets 版本: {e}") # --- 结束诊断 --- # 配置参数 SERVER = "wss://r.maifeipin.com/ws" # ws:// 或 wss:// LOGIN_URL = "https://r.maifeipin.com/api/admin/login" # 登录接口 USERNAME = "user" PASSWORD = "password" TOKEN = None HEARTBEAT_INTERVAL = 10 RECONNECT_INTERVAL = 5 MAX_RECONNECT_ATTEMPTS = 10 CONNECTION_TIMEOUT = 30 COMMAND_TIMEOUT = 60 PING_INTERVAL = 30 PING_TIMEOUT = 10 DANGEROUS_COMMANDS = [ 'rm', 'del', 'format', 'shutdown', 'reboot', 'halt', 'poweroff', 'init', 'kill', 'killall', 'pkill', 'dd', 'fdisk', 'mkfs', 'mount', 'umount' ] async def get_token(): global TOKEN try: async with aiohttp.ClientSession() as session: payload = { "UserName": USERNAME, "Password": PASSWORD, "Email": "email@example.com" # 根据接口需要填 } headers = { "Content-Type": "application/json" } async with session.post(LOGIN_URL, headers=headers, json=payload) as resp: if resp.status == 200: data = await resp.json() TOKEN = data.get("token") if TOKEN: logger.info("✅ 成功获取到 token") else: logger.error("❌ 登录成功但未获取到 token") else: text = await resp.text() logger.error(f"❌ 登录失败,状态码: {resp.status},响应: {text}") except Exception as e: logger.error(f"❌ 获取 token 时出错: {e}") class WebSocketClient: def __init__(self): self.client_id = self.get_client_id() self.reconnect_count = 0 self.is_running = True self.is_registered = False self.websocket: Optional[websockets.WebSocketClientProtocol] = None self.last_heartbeat_response = time.time() def get_client_id(self) -> str: hostname = socket.gethostname() timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') return f"client_{hostname}_{timestamp}" def is_safe_command(self, cmd: str) -> bool: cmd_lower = cmd.lower().strip() return not any(dangerous in cmd_lower for dangerous in DANGEROUS_COMMANDS) async def execute_command(self, cmd: str) -> str: try: if not self.is_safe_command(cmd): return "拒绝执行危险命令" process = await asyncio.wait_for( asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT ), timeout=COMMAND_TIMEOUT ) stdout, _ = await asyncio.wait_for( process.communicate(), timeout=COMMAND_TIMEOUT ) return stdout.decode('utf-8', errors='ignore') except asyncio.TimeoutError: return "命令执行超时" except Exception as e: return f"命令执行出错:{str(e)}" def is_websocket_open(self) -> bool: if self.websocket is None: return False try: return self.websocket.state == websockets.protocol.State.OPEN except AttributeError: try: return not self.websocket.closed except AttributeError: return True async def send_heartbeat(self): if self.websocket is None: return False try: heartbeat_message = json.dumps({ "type": "heartbeat", "client_id": self.client_id, "timestamp": datetime.now().isoformat() }) await self.websocket.send(heartbeat_message) logger.debug("已发送心跳包") return True except websockets.exceptions.ConnectionClosed: logger.warning("连接已关闭") return False except Exception as e: logger.error(f"发送心跳包失败: {e}") return False async def send_response(self, response: str): if self.is_websocket_open(): try: encoded_response = base64.b64encode(response.encode('utf-8')).decode('utf-8') message = json.dumps({ "type": "response", "client_id": self.client_id, "result": encoded_response, "encoding": "base64", "timestamp": datetime.now().isoformat() }) await self.websocket.send(message) logger.info(f"已发送响应,长度: {len(response)}") except Exception as e: logger.error(f"发送响应失败: {e}") async def handle_heartbeat_response(self, data: dict): status = data.get("status") self.last_heartbeat_response = time.time() if status == "active": logger.debug("服务器确认客户端状态活跃") elif status in ["inactive", "need_register"]: logger.warning("服务器要求重新注册") self.is_registered = False await self.register_client() else: logger.warning(f"未知的心跳响应状态: {status}") async def handle_message(self, message: str): try: data = json.loads(message) message_type = data.get("type") if message_type == "command": cmd = data.get("data", "") logger.info(f"收到命令: {cmd}") if cmd.strip() == "你能和我通话吗?": response = f"好的,我是 {platform.system()} 系统,客户端ID: {self.client_id}" else: response = await self.execute_command(cmd) await self.send_response(response) elif message_type == "heartbeat_response": await self.handle_heartbeat_response(data) elif message_type == "ping": pong_message = json.dumps({ "type": "pong", "client_id": self.client_id, "timestamp": datetime.now().isoformat() }) await self.websocket.send(pong_message) logger.debug("已响应ping") except json.JSONDecodeError: logger.error(f"无法解析消息: {message}") except Exception as e: logger.error(f"处理消息时出错: {e}") async def create_ssl_context(self) -> Optional[ssl.SSLContext]: if SERVER.startswith("wss://"): try: ssl_context = ssl.create_default_context() return ssl_context except Exception as e: logger.error(f"创建SSL上下文失败: {e}") return None return None async def connect_websocket(self): """建立 WebSocket 连接""" if self.is_websocket_open(): logger.info("♻️ WebSocket 已连接,无需重连") return reconnect_delay = RECONNECT_INTERVAL while self.reconnect_count < MAX_RECONNECT_ATTEMPTS and self.is_running: try: if not TOKEN: await get_token() if not TOKEN: logger.error("❌ Token 获取失败,无法连接") return logger.info(f"正在连接到 {SERVER}... (第{self.reconnect_count + 1}次)") if self.reconnect_count>3: await get_token() headers = [("Authorization", f"Bearer {TOKEN}")] ssl_context = await self.create_ssl_context() # 使用 websockets.client.connect 并传入 headers async with websockets.client.connect( SERVER, ssl=ssl_context, ping_interval=PING_INTERVAL, ping_timeout=PING_TIMEOUT, open_timeout=CONNECTION_TIMEOUT, extra_headers=headers ) as websocket: self.websocket = websocket self.is_registered = False await self.register_client() self.reconnect_count = 0 heartbeat_task = asyncio.create_task(self.heartbeat_loop()) try: await self.message_handler() finally: heartbeat_task.cancel() try: await heartbeat_task except asyncio.CancelledError: pass except asyncio.TimeoutError: logger.error("连接超时") except websockets.exceptions.InvalidURI: logger.error(f"无效的WebSocket URI: {SERVER}") break except websockets.exceptions.InvalidHandshake: logger.error("WebSocket握手失败") except ConnectionRefusedError: logger.error("连接被拒绝") except Exception as e: logger.error(f"连接失败: {e}") finally: if self.websocket: try: await self.websocket.close() except: pass self.websocket = None self.is_registered = False self.reconnect_count += 1 if self.reconnect_count < MAX_RECONNECT_ATTEMPTS: wait_time = min(reconnect_delay * (2 ** (self.reconnect_count - 1)), 60) logger.info(f"等待 {wait_time} 秒后重试...") await asyncio.sleep(wait_time) else: logger.error(f"已达到最大重连次数 ({MAX_RECONNECT_ATTEMPTS}),停止重连") break logger.info("客户端已停止") async def register_client(self): if self.is_websocket_open(): try: register_message = json.dumps({ "type": "register", "client_id": self.client_id, "platform": platform.system(), "hostname": socket.gethostname(), "timestamp": datetime.now().isoformat() }) await self.websocket.send(register_message) self.is_registered = True logger.info(f"已注册客户端,ID: {self.client_id}") return True except Exception as e: logger.error(f"注册客户端失败: {e}") self.is_registered = False return False return False async def heartbeat_loop(self): while self.is_running: try: if self.is_websocket_open(): if self.is_registered: success = await self.send_heartbeat() if not success: logger.warning("心跳发送失败,连接可能已断开") break else: logger.info("客户端未注册,尝试重新注册") await self.register_client() else: logger.warning("WebSocket连接未打开") break await asyncio.sleep(HEARTBEAT_INTERVAL) except Exception as e: logger.error(f"心跳循环出错: {e}") await asyncio.sleep(5) break async def message_handler(self): try: while self.is_running and self.is_websocket_open(): try: try: message = await asyncio.wait_for( self.websocket.recv(), timeout=1.0 ) await self.handle_message(message) except asyncio.TimeoutError: continue except websockets.exceptions.ConnectionClosed: logger.warning("WebSocket连接已关闭") break except websockets.exceptions.ConnectionClosedError: logger.warning("WebSocket连接异常关闭") break except Exception as e: logger.error(f"消息处理循环中出错: {e}") break except Exception as e: logger.error(f"消息处理器异常: {e}") async def run(self): global TOKEN await get_token() if not TOKEN: logger.error("无法获取有效的 token,终止运行") return while self.is_running and self.reconnect_count < MAX_RECONNECT_ATTEMPTS: try: logger.info(f"正在连接到 {SERVER}... (第{self.reconnect_count + 1}次)") self.websocket = await self.connect_websocket() logger.info("WebSocket连接成功") await self.register_client() self.reconnect_count = 0 heartbeat_task = asyncio.create_task(self.heartbeat_loop()) try: await self.message_handler() finally: heartbeat_task.cancel() try: await heartbeat_task except asyncio.CancelledError: pass except asyncio.TimeoutError: logger.error("连接超时") except websockets.exceptions.InvalidURI: logger.error(f"无效的WebSocket URI: {SERVER}") break except websockets.exceptions.InvalidHandshake: logger.error("WebSocket握手失败") except ConnectionRefusedError: logger.error("连接被拒绝") except Exception as e: logger.error(f"连接失败: {e}") finally: if self.websocket: try: await self.websocket.close() except: pass self.websocket = None self.is_registered = False self.reconnect_count += 1 if self.reconnect_count < MAX_RECONNECT_ATTEMPTS: wait_time = min(RECONNECT_INTERVAL * (2 ** (self.reconnect_count - 1)), 60) logger.info(f"等待 {wait_time} 秒后重试...") await asyncio.sleep(wait_time) else: logger.error(f"已达到最大重连次数 ({MAX_RECONNECT_ATTEMPTS}),停止重连") break logger.info("客户端已停止") def stop(self): self.is_running = False logger.info("正在停止客户端...") async def main(): client = WebSocketClient() try: await client.run() except KeyboardInterrupt: logger.info("收到中断信号") finally: client.stop() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("程序被用户中断") except Exception as e: logger.error(f"程序异常退出: {e}")