Deploy the following program to Docker:

```python
import asyncio
import websockets
import json
import struct
import datetime
import logging
import time  # needed by the retry back-off in reconnect()/initialize_db()
import traceback
import os
import sys
from contextlib import contextmanager
from typing import Any, Dict, List, Optional

import mysql.connector
from mysql.connector import pooling
from mysql.connector import errorcode

# ======================
# Enhanced configuration
# ======================
CONFIG = {
    "TIME_FORMATS": [
        "%Y-%m-%d %H:%M:%S",
        "%Y/%m/%d %H:%M:%S",
        "%Y%m%d%H%M%S",
        "%d-%b-%Y %H:%M:%S"
    ],
    "WEBSOCKET_HOST": "0.0.0.0",  # listen on all interfaces
    "WEBSOCKET_PORT": 8765,
    "MAX_CONNECTIONS": 100,
    "DB_WRITE_BATCH_SIZE": 50,
    "DB_WRITE_TIMEOUT": 5.0,
    "LOG_FILE": "sensor_server.log",
    "LOG_LEVEL": "DEBUG",  # configurable log level
    # MySQL database configuration
    "MYSQL": {
        "HOST": "localhost",
        "PORT": 3306,
        "USER": "root",
        "PASSWORD": "Adu@123.",
        "DATABASE": "CF_HIDB",
        "POOL_SIZE": 5,
        "POOL_NAME": "sensor_pool",
        "POOL_RESET_SESSION": True,
        "CONNECT_TIMEOUT": 5  # connection timeout in seconds
    }
}

# ======================
# Enhanced logging
# ======================
class SensitiveDataFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        if not hasattr(record, 'msg') or not record.msg:
            return True
        if not isinstance(record.msg, str):
            try:
                record.msg = str(record.msg)
            except Exception:
                return True
        # Redact the MySQL password
        password = CONFIG["MYSQL"]["PASSWORD"]
        if password:
            record.msg = record.msg.replace(password, "[REDACTED]")
        return True


def setup_logging() -> logging.Logger:
    sensitive_filter = SensitiveDataFilter()
    # Resolve the configured log level
    log_level = getattr(logging, CONFIG.get("LOG_LEVEL", "INFO").upper(), logging.INFO)
    logger = logging.getLogger("SensorServer")
    logger.setLevel(log_level)
    # Remove any existing handlers
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)
    # File handler
    file_handler = logging.FileHandler(CONFIG["LOG_FILE"])
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    ))
    file_handler.addFilter(sensitive_filter)
    logger.addHandler(file_handler)
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    ))
    console_handler.addFilter(sensitive_filter)
    logger.addHandler(console_handler)
    return logger


# Global logger
logger = setup_logging()

# ======================
# Enhanced MySQL database management (with timeout handling)
# ======================
class MySQLDatabaseManager:
    def __init__(self):
        self.data_buffer = []
        self.last_write_time = datetime.datetime.now()
        self.connection_pool = None
        self.initialize_pool()
        self.initialize_db()

    def initialize_pool(self):
        """Safely initialize the connection pool."""
        try:
            logger.debug("Creating MySQL connection pool...")
            self.connection_pool = pooling.MySQLConnectionPool(
                pool_name=CONFIG["MYSQL"]["POOL_NAME"],
                pool_size=CONFIG["MYSQL"]["POOL_SIZE"],
                pool_reset_session=CONFIG["MYSQL"]["POOL_RESET_SESSION"],
                host=CONFIG["MYSQL"]["HOST"],
                port=CONFIG["MYSQL"]["PORT"],
                user=CONFIG["MYSQL"]["USER"],
                password=CONFIG["MYSQL"]["PASSWORD"],
                database=CONFIG["MYSQL"]["DATABASE"],
                connect_timeout=CONFIG["MYSQL"]["CONNECT_TIMEOUT"]
            )
            logger.info("MySQL connection pool created")
        except mysql.connector.Error as err:
            logger.error(f"Failed to create MySQL connection pool: {err}")
            # Try to reconnect
            self.reconnect()
        except Exception as e:
            logger.critical(f"Unexpected error while initializing the pool: {e}")
            sys.exit(1)

    def reconnect(self):
        """Try to reconnect to the database."""
        logger.warning("Attempting to reconnect to the database...")
        retries = 3
        for attempt in range(1, retries + 1):
            try:
                self.initialize_pool()
                logger.info("Database reconnected")
                return
            except Exception as e:
                logger.error(f"Reconnect attempt {attempt}/{retries} failed: {e}")
                if attempt < retries:
                    logger.info(f"Retrying in {5 * attempt} seconds...")
                    time.sleep(5 * attempt)
        logger.critical("Unable to connect to the database, exiting")
        sys.exit(1)

    @contextmanager
    def _get_connection(self):
        """Safely obtain a database connection."""
        if not self.connection_pool:
            self.reconnect()
        try:
            conn = self.connection_pool.get_connection()
            if not conn.is_connected():
                conn.reconnect()
            yield conn
        except mysql.connector.Error as err:
            logger.error(f"Failed to get a database connection: {err}")
            if err.errno == errorcode.CR_SERVER_GONE_ERROR:
                self.reconnect()
            raise
        finally:
            if 'conn' in locals() and conn.is_connected():
                conn.close()

    def initialize_db(self):
        """Initialize the database schema (with retries)."""
        max_attempts = 3
        for attempt in range(1, max_attempts + 1):
            try:
                with self._get_connection() as conn:
                    cursor = conn.cursor()
                    # Devices table
                    cursor.execute('''
                        CREATE TABLE IF NOT EXISTS devices (
                            id INT AUTO_INCREMENT PRIMARY KEY,
                            device_id VARCHAR(50) NOT NULL UNIQUE,
                            first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                            last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
                        )
                    ''')
                    # Sensor data table
                    cursor.execute('''
                        CREATE TABLE IF NOT EXISTS sensor_data (
                            id BIGINT AUTO_INCREMENT PRIMARY KEY,
                            device_id INT NOT NULL,
                            timestamp TIMESTAMP NOT NULL,
                            sensor_values JSON NOT NULL,
                            received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                            INDEX idx_device_id (device_id),
                            INDEX idx_timestamp (timestamp),
                            FOREIGN KEY (device_id) REFERENCES devices(id) ON DELETE CASCADE
                        )
                    ''')
                    conn.commit()
                    logger.info("Database schema initialized")
                    return
            except Exception as e:
                logger.error(f"Database initialization failed (attempt {attempt}/{max_attempts}): {e}")
                if attempt < max_attempts:
                    logger.info(f"Retrying in {2 * attempt} seconds...")
                    time.sleep(2 * attempt)
        logger.critical("Database initialization failed, exiting")
        sys.exit(1)

    def insert_sensor_data(self, device_id: str, timestamp: str, sensor_values: List[float]) -> None:
        """Append data to the buffer and batch-write it to the database."""
        self.data_buffer.append({
            'device_id': device_id,
            'timestamp': timestamp,
            'sensor_values': sensor_values
        })
        # Check whether a batch write is due
        now = datetime.datetime.now()
        buffer_full = len(self.data_buffer) >= CONFIG["DB_WRITE_BATCH_SIZE"]
        timeout_reached = (now - self.last_write_time).total_seconds() >= CONFIG["DB_WRITE_TIMEOUT"]
        if buffer_full or timeout_reached:
            self.flush_buffer()

    def flush_buffer(self) -> None:
        """Safely write the buffered data to the database."""
        if not self.data_buffer:
            return
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                # Resolve device rows
                device_ids = []
                for data in self.data_buffer:
                    # Get or create the device record
                    device_db_id = self._get_or_create_device(conn, data['device_id'])
                    device_ids.append(device_db_id)
                # Prepare the batch insert of sensor data
                sensor_data = []
                for i, data in enumerate(self.data_buffer):
                    sensor_data.append((
                        device_ids[i],
                        data['timestamp'],
                        json.dumps(data['sensor_values'])
                    ))
                # Batch-insert the sensor rows
                cursor.executemany(
                    "INSERT INTO sensor_data (device_id, timestamp, sensor_values) "
                    "VALUES (%s, %s, %s)",
                    sensor_data
                )
                conn.commit()
                logger.info(f"Wrote {len(self.data_buffer)} record(s) to the MySQL database")
                # Clear the buffer
                self.data_buffer.clear()
                self.last_write_time = datetime.datetime.now()
        except Exception as e:
            logger.error(f"Database write failed: {e}")
            logger.error(f"Traceback:\n{traceback.format_exc()}")
            # Keep the buffered data so it can be retried later

    def _get_or_create_device(self, conn, device_id: str) -> int:
        """Get or create a device record (with error handling)."""
        cursor = conn.cursor()
        try:
            cursor.execute(
                "SELECT id FROM devices WHERE device_id = %s",
                (device_id,)
            )
            device = cursor.fetchone()
            if device:
                device_id = device[0]
                cursor.execute(
                    "UPDATE devices SET last_seen = CURRENT_TIMESTAMP WHERE id = %s",
                    (device_id,)
                )
                return device_id
            else:
                cursor.execute(
                    "INSERT INTO devices (device_id) VALUES (%s)",
                    (device_id,)
                )
                return cursor.lastrowid
        except mysql.connector.Error as err:
            logger.error(f"Device lookup/insert failed: {err}")
            # Simplified handling: return a sentinel ID; adapt this in production
            return -1


# ======================
# Packet parsing
# ======================
def parse_timestamp(timestamp_bytes: bytes) -> str:
    """Parse timestamp bytes into a string."""
    try:
        timestamp_str = timestamp_bytes.decode('ascii').strip()
    except UnicodeDecodeError:
        try:
            timestamp_str = timestamp_bytes.decode('latin-1').strip()
        except UnicodeDecodeError:
            timestamp_str = timestamp_bytes.hex()
            logger.warning(f"Timestamp contains invalid characters, using hex: {timestamp_str}")
            return timestamp_str
    cleaned = ''.join(c for c in timestamp_str if c.isdigit() or c in '- :/')
    for fmt in CONFIG["TIME_FORMATS"]:
        try:
            dt = datetime.datetime.strptime(cleaned, fmt)
            return dt.strftime("%Y-%m-%d %H:%M:%S")
        except ValueError:
            continue
    logger.warning(f"Unable to parse timestamp: {timestamp_str} (cleaned: {cleaned})")
    return cleaned


def parse_sensor_values(data: bytes, start_index: int) -> List[float]:
    """Parse the list of sensor values."""
    values = []
    index = start_index
    max_count = (len(data) - index) // 4
    for _ in range(max_count):
        if index + 4 > len(data):
            break
        value_bytes = data[index:index + 4]
        index += 4
        try:
            value = struct.unpack('<f', value_bytes)[0]
            values.append(value)
        except struct.error:
            try:
                value = struct.unpack('>f', value_bytes)[0]
                values.append(value)
            except struct.error:
                continue
    return values


def parse_binary_packet(raw_data: bytes) -> Optional[Dict]:
    """Parse a binary packet."""
    try:
        HEADER = b'$\x00\x08\x00\x00\x00'
        if not raw_data.startswith(HEADER):
            return None
        DEVICE_ID_START = len(HEADER)
        DEVICE_ID_END = DEVICE_ID_START + 8
        if len(raw_data) < DEVICE_ID_END:
            return None
        device_id_bytes = raw_data[DEVICE_ID_START:DEVICE_ID_END]
        TIMESTAMP_START = DEVICE_ID_END
        TIMESTAMP_END = TIMESTAMP_START + 19
        if len(raw_data) < TIMESTAMP_END:
            return None
        timestamp_bytes = raw_data[TIMESTAMP_START:TIMESTAMP_END]
        SENSOR_DATA_START = TIMESTAMP_END
        sensor_values = parse_sensor_values(raw_data, SENSOR_DATA_START)
        return {
            'device_id': decode_with_fallback(device_id_bytes),
            'timestamp': parse_timestamp(timestamp_bytes),
            'sensor_values': sensor_values
        }
    except Exception as e:
        logger.error(f"Parse error: {str(e)}")
        return None


def decode_with_fallback(data: bytes) -> str:
    """Decode bytes with fallback encodings."""
    try:
        return data.decode('ascii').strip()
    except UnicodeDecodeError:
        try:
            return data.decode('latin-1').strip()
        except UnicodeDecodeError:
            return data.hex()


# ======================
# Enhanced WebSocket server (with connection monitoring)
# ======================
class SensorWebSocketServer:
    def __init__(self, host: str, port: int, db_manager: MySQLDatabaseManager):
        self.host = host
        self.port = port
        self.db_manager = db_manager
        self.connections = set()
        self.server = None
        self.stats = {
            'total_connections': 0,
            'total_messages': 0,
            'valid_messages': 0,
            'invalid_messages': 0
        }
        self.start_time = None
        self.connection_monitor_task = None

    async def handler(self, websocket, path: str) -> None:
        """Handle a WebSocket connection."""
        client_ip = websocket.remote_address[0] if websocket.remote_address else "unknown"
        self.connections.add(websocket)
        self.stats['total_connections'] += 1
        logger.info(f"Client connected: {client_ip}, active connections: {len(self.connections)}")
        try:
            async for message in websocket:
                self.stats['total_messages'] += 1
                if not isinstance(message, bytes):
                    self.stats['invalid_messages'] += 1
                    continue
                parsed_data = parse_binary_packet(message)
                if parsed_data:
                    self.stats['valid_messages'] += 1
                    self.db_manager.insert_sensor_data(
                        parsed_data['device_id'],
                        parsed_data['timestamp'],
                        parsed_data['sensor_values']
                    )
                else:
                    self.stats['invalid_messages'] += 1
        except websockets.exceptions.ConnectionClosed:
            logger.info(f"Client disconnected: {client_ip}")
        except Exception as e:
            logger.error(f"Error while handling client {client_ip}: {str(e)}")
        finally:
            if websocket in self.connections:
                self.connections.remove(websocket)
            logger.info(f"Client gone, remaining connections: {len(self.connections)}")

    async def start(self) -> None:
        """Start the WebSocket server."""
        self.server = await websockets.serve(
            self.handler,
            self.host,
            self.port,
            max_size=2 ** 20,   # 1 MB
            ping_interval=30,   # more frequent pings
            ping_timeout=15,
            close_timeout=10,
            max_queue=CONFIG["MAX_CONNECTIONS"]
        )
        self.start_time = datetime.datetime.now()
        logger.info(f"WebSocket server listening on ws://{self.host}:{self.port}")
        # Start the connection monitor task
        self.connection_monitor_task = asyncio.create_task(self.monitor_connections())

    async def monitor_connections(self):
        """Monitor connection status."""
        while True:
            await asyncio.sleep(60)
            logger.debug(f"Connection status: {len(self.connections)} active connection(s)")

    async def stop(self) -> None:
        """Stop the WebSocket server."""
        if self.connection_monitor_task:
            self.connection_monitor_task.cancel()
        if self.server:
            self.server.close()
            await self.server.wait_closed()
        logger.info("WebSocket server stopped")

    def get_stats(self) -> Dict:
        """Return server statistics."""
        return {
            **self.stats,
            'current_connections': len(self.connections),
            'uptime': str(datetime.datetime.now() - self.start_time) if self.start_time else "0:00:00"
        }


# ======================
# Enhanced main program (with health checks)
# ======================
async def main():
    # Initialize the MySQL database manager
    logger.debug("Initializing database manager...")
    db_manager = MySQLDatabaseManager()
    # Create the WebSocket server
    logger.debug("Creating WebSocket server...")
    server = SensorWebSocketServer(
        CONFIG["WEBSOCKET_HOST"],
        CONFIG["WEBSOCKET_PORT"],
        db_manager
    )
    # Start the server
    logger.debug("Starting WebSocket server...")
    await server.start()
    try:
        # Initial health check
        logger.debug("Running initial health check...")
        await health_check(server, db_manager)
        # Main loop
        logger.info("Server entering main loop")
        while True:
            await asyncio.sleep(5)
            db_manager.flush_buffer()
            # Health check once a minute
            now = datetime.datetime.now()
            if now.second < 5:  # run during the first 5 seconds of each minute
                await health_check(server, db_manager)
    except asyncio.CancelledError:
        logger.info("Server stop requested")
    except KeyboardInterrupt:
        logger.info("Stop signal received, shutting down...")
    finally:
        # Make sure all buffered data is written to the database
        db_manager.flush_buffer()
        await server.stop()


async def health_check(server, db_manager):
    """Server health check."""
    stats = server.get_stats()
    # Database connectivity check
    db_status = "OK"
    try:
        with db_manager._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT 1")
            if cursor.fetchone()[0] != 1:
                db_status = "ERROR"
    except Exception:
        db_status = "ERROR"
    logger.info(
        f"Health check | "
        f"connections: {stats['current_connections']} | "
        f"messages: {stats['total_messages']} | "
        f"database: {db_status} | "
        f"uptime: {stats['uptime']}"
    )


# Entry point
if __name__ == "__main__":
    # Log the startup configuration
    logger.info("=" * 50)
    logger.info("Starting sensor data collection server (enhanced)")
    logger.info(f"Time: {datetime.datetime.now()}")
    logger.info(f"Listen address: {CONFIG['WEBSOCKET_HOST']}:{CONFIG['WEBSOCKET_PORT']}")
    logger.info(f"MySQL host: {CONFIG['MYSQL']['HOST']}:{CONFIG['MYSQL']['PORT']}")
    logger.info(f"Database: {CONFIG['MYSQL']['DATABASE']}")
    logger.info("=" * 50)
    try:
        # Set the event loop policy (works around issues on Windows)
        if sys.platform == 'win32':
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        asyncio.run(main())
    except Exception as e:
        logger.critical(f"Server stopped unexpectedly: {e}")
        logger.critical(f"Traceback:\n{traceback.format_exc()}")
    finally:
        logger.info("=" * 50)
        logger.info("Server has stopped")
        logger.info("=" * 50)
```
Posted: 2025-07-30 10:35:22
<think>
We have a Python program that runs a WebSocket server, receives sensor data, and stores it in MySQL. It now needs to be deployed in a Docker container.

Steps:
1. Create a Dockerfile.
2. Build the Docker image.
3. Run the container with networking and port mapping configured.
4. Make sure MySQL is reachable (either another container or an external database).

When running in Docker we need to:
- copy the code into the container;
- install the dependencies (requirements.txt);
- expose the WebSocket port;
- configure the MySQL connection (via environment variables or a config file).

Because the program uses MySQL, we can either (a) use Docker Compose to run a MySQL container alongside the application container, or (b) connect to an existing external MySQL service. Docker Compose is easier to manage, so that is what we will use: first a Dockerfile, then a docker-compose.yml.

Step 1: the Dockerfile. The code uses mysql-connector-python and websockets, so both must be installed. The database connection settings should also come from environment variables so docker-compose can set them. Since CONFIG is currently hard-coded, the code has to be changed so environment variables override the defaults, e.g.:

import os
CONFIG = {
    ...,
    "MYSQL": {
        "HOST": os.getenv("MYSQL_HOST", "localhost"),
        "PORT": int(os.getenv("MYSQL_PORT", "3306")),
        "USER": os.getenv("MYSQL_USER", "root"),
        "PASSWORD": os.getenv("MYSQL_PASSWORD", "Adu@123."),
        "DATABASE": os.getenv("MYSQL_DATABASE", "CF_HIDB"),
        "POOL_SIZE": int(os.getenv("MYSQL_POOL_SIZE", "5")),
        ...  # remaining settings handled the same way
    }
}

The environment variables themselves can be set in the Dockerfile or, more conveniently, in docker-compose. As for the log file path: inside Docker, logs normally go to the console so they can be read with docker logs; the code already configures a console handler, so logs will show up there.

Step 2: a requirements.txt listing the dependencies:
websockets
mysql-connector-python

Step 3: write the Dockerfile, assuming the code file is named sensor_server.py.
</think>

To deploy this sensor data collection server to Docker, the steps are as follows:
### Solution
#### 1. Create the Dockerfile
```dockerfile
# Official Python base image
FROM python:3.10-slim

# Working directory
WORKDIR /app

# System dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    default-libmysqlclient-dev \
    && rm -rf /var/lib/apt/lists/*

# Copy and install the Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY sensor_server.py .

# Expose the WebSocket port
EXPOSE 8765

# Default environment variables (can be overridden by docker-compose)
ENV MYSQL_HOST=mysql
ENV MYSQL_PORT=3306
ENV MYSQL_USER=root
ENV MYSQL_PASSWORD=Adu@123.
ENV MYSQL_DATABASE=CF_HIDB
ENV LOG_LEVEL=INFO

# Start the application
CMD ["python", "sensor_server.py"]
```
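If you are not using Compose, the image can also be built and run on its own, e.g. `docker build -t sensor-server .` followed by `docker run -p 8765:8765 -e MYSQL_HOST=<your-db-host> sensor-server`.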
#### 2. Create requirements.txt
```
websockets
mysql-connector-python
```
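Pinning versions here is worth considering: recent major releases of the `websockets` package dropped the second `path` argument from connection handlers, so an unpinned install may break the `handler(self, websocket, path)` signature used in the script above. Pinning to the major versions the code was developed against (for both `websockets` and `mysql-connector-python`) keeps the image build reproducible.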
#### 3. Create docker-compose.yml
```yaml
version: '3.8'

services:
  sensor-server:
    build: .
    ports:
      - "8765:8765"
    environment:
      - MYSQL_HOST=mysql
      - MYSQL_PORT=3306
      - MYSQL_USER=root
      - MYSQL_PASSWORD=${MYSQL_ROOT_PASSWORD:-Adu@123.}
      - MYSQL_DATABASE=CF_HIDB
      - LOG_LEVEL=INFO
    depends_on:
      - mysql
    restart: unless-stopped

  mysql:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD:-Adu@123.}
      MYSQL_DATABASE: CF_HIDB
    volumes:
      - mysql_data:/var/lib/mysql
    command: --default-authentication-plugin=mysql_native_password
    restart: unless-stopped

volumes:
  mysql_data:
```
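One caveat with Compose: `depends_on` only orders container startup; it does not wait until MySQL is actually accepting connections. The server's own reconnect logic usually bridges that gap, but if you want a deterministic startup you can run a small readiness probe before launching the server. A minimal sketch, reusing the environment variable names above (the file name `wait_for_mysql.py` is an assumption):

```python
# wait_for_mysql.py -- block until MySQL accepts connections, then exit 0.
import os
import sys
import time

import mysql.connector


def wait_for_mysql(retries: int = 30, delay: float = 2.0) -> bool:
    cfg = dict(
        host=os.getenv("MYSQL_HOST", "mysql"),
        port=int(os.getenv("MYSQL_PORT", "3306")),
        user=os.getenv("MYSQL_USER", "root"),
        password=os.getenv("MYSQL_PASSWORD", ""),
        connection_timeout=3,
    )
    for attempt in range(1, retries + 1):
        try:
            conn = mysql.connector.connect(**cfg)   # succeeds once MySQL is ready
            conn.close()
            print(f"MySQL reachable after {attempt} attempt(s)")
            return True
        except mysql.connector.Error as err:
            print(f"Waiting for MySQL ({attempt}/{retries}): {err}")
            time.sleep(delay)
    return False


if __name__ == "__main__":
    sys.exit(0 if wait_for_mysql() else 1)
```

It could be invoked from the container command, e.g. `python wait_for_mysql.py && python sensor_server.py`, or from an entrypoint script.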
#### 4. Update the Python code to read environment variables
Near the top of the script, let environment variables override the configuration defaults:
```python
import os

# Configuration, with environment-variable overrides
CONFIG = {
    "TIME_FORMATS": [
        "%Y-%m-%d %H:%M:%S",
        "%Y/%m/%d %H:%M:%S",
        "%Y%m%d%H%M%S",
        "%d-%b-%Y %H:%M:%S"
    ],
    "WEBSOCKET_HOST": "0.0.0.0",
    "WEBSOCKET_PORT": 8765,
    "MAX_CONNECTIONS": 100,
    "DB_WRITE_BATCH_SIZE": 50,
    "DB_WRITE_TIMEOUT": 5.0,
    "LOG_FILE": "/dev/stdout",  # log to the console so Docker captures it
    "LOG_LEVEL": os.getenv("LOG_LEVEL", "INFO"),
    "MYSQL": {
        "HOST": os.getenv("MYSQL_HOST", "localhost"),
        "PORT": int(os.getenv("MYSQL_PORT", "3306")),
        "USER": os.getenv("MYSQL_USER", "root"),
        "PASSWORD": os.getenv("MYSQL_PASSWORD", "Adu@123."),
        "DATABASE": os.getenv("MYSQL_DATABASE", "CF_HIDB"),
        "POOL_SIZE": 5,
        "POOL_NAME": "sensor_pool",
        "POOL_RESET_SESSION": True,
        "CONNECT_TIMEOUT": 5
    }
}
```
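A quick way to confirm the container actually sees these variables is a tiny check script. This is just a sketch (the file name `config_check.py` is an assumption); run it locally with the same environment exported, or copy it into the image:

```python
# config_check.py -- print the resolved connection settings without leaking the password.
import os

for name, default in [
    ("MYSQL_HOST", "localhost"),
    ("MYSQL_PORT", "3306"),
    ("MYSQL_USER", "root"),
    ("MYSQL_DATABASE", "CF_HIDB"),
    ("LOG_LEVEL", "INFO"),
]:
    print(f"{name} = {os.getenv(name, default)}")

# Only report whether the password is set, never its value.
print("MYSQL_PASSWORD set:", bool(os.getenv("MYSQL_PASSWORD")))
```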
#### 5. Build and run
```bash
# Optionally set the MySQL root password
export MYSQL_ROOT_PASSWORD=your_secure_password

# Build and start the services
docker-compose up -d --build

# Follow the logs
docker-compose logs -f
```
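On newer Docker installations Compose ships as a CLI plugin and is invoked as `docker compose` (without the hyphen); the commands above work the same way with either form.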
### Explanation
1. **Dockerfile**:
   - builds on the official Python 3.10 image
   - installs the MySQL client build dependencies
   - copies the application code and installs the Python dependencies
   - sets default values for the environment variables
   - exposes WebSocket port 8765
2. **docker-compose.yml**:
   - defines two services: the sensor server and the MySQL database
   - configures the MySQL connection through environment variables
   - persists MySQL data in a named volume
   - declares the dependency between the services
3. **Environment-variable support**:
   - the code reads its configuration from environment variables
   - logs go to `/dev/stdout` so Docker can capture them (see the sketch after this list for a variant that drops the file handler entirely)
   - the MySQL settings are controlled entirely through environment variables
4. **Security**:
   - sensitive values can be kept in a `.env` file read by docker-compose
   - MySQL gets its own password
   - both services restart automatically (`restart: unless-stopped`)
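On the logging point: opening `/dev/stdout` with a `FileHandler` generally works in Linux containers, but since `setup_logging()` already attaches a `StreamHandler`, an alternative is to skip the file handler whenever no real file path is configured. A minimal sketch of that idea, assuming an empty `LOG_FILE` environment variable means "console only":

```python
import logging
import os

# Sketch: treat an empty LOG_FILE as "console only" so container logs flow
# solely through the StreamHandler (LOG_FILE as an env var is an assumption).
log_file = os.getenv("LOG_FILE", "")

logger = logging.getLogger("SensorServer")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())           # captured by `docker logs`
if log_file:                                         # only write a file when asked to
    logger.addHandler(logging.FileHandler(log_file))
```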
### Deployment workflow
```mermaid
graph TD
    A[Create Dockerfile] --> B[Create docker-compose.yml]
    C[Update code for environment variables] --> D[Build image]
    D --> E[Start containers]
    E --> F[Verify the service]
```
### Verify the deployment
```bash
# Check the running containers
docker-compose ps

# Test the WebSocket connection
# (with websocat or any other WebSocket client)
websocat ws://localhost:8765
```
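To exercise the full binary path rather than just opening a connection, a small test client can send one synthetic packet in the layout `parse_binary_packet` expects: the 6-byte header, an 8-byte device ID, a 19-byte timestamp, then little-endian float32 values. This is only a sketch with made-up values:

```python
# test_client.py -- send one synthetic sensor packet to the running server.
import asyncio
import struct

import websockets


async def send_test_packet(uri: str = "ws://localhost:8765") -> None:
    header = b"$\x00\x08\x00\x00\x00"             # fixed 6-byte header
    device_id = b"DEV00001"                        # exactly 8 bytes
    timestamp = b"2025-07-30 10:35:22"             # exactly 19 ASCII bytes
    values = struct.pack("<3f", 1.5, 2.25, 3.75)   # three little-endian float32 readings
    packet = header + device_id + timestamp + values

    async with websockets.connect(uri) as ws:
        await ws.send(packet)                      # sent as a binary frame
        print(f"sent {len(packet)} bytes to {uri}")


if __name__ == "__main__":
    asyncio.run(send_test_packet())
```

Since the main loop flushes the write buffer every few seconds, rows should appear in the `sensor_data` table shortly after sending, which you can confirm with a query through `docker-compose exec mysql mysql -u root -p CF_HIDB`.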