活动介绍

os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config_data', 'config.json')在打包后找不到文件

时间: 2025-08-19 20:03:28 浏览: 2
<think>我们正在解决Python中使用`os.path.join`、`os.path.dirname`、`os.path.abspath`在打包后(例如使用PyInstaller)找不到文件的问题。 根本原因:在打包后,程序运行在一个临时目录中,而不再是源代码目录。因此,使用相对路径或基于`__file__`的路径可能会出现问题。 解决方案:使用`sys._MEIPASS`(PyInstaller专用)来获取资源路径,或者确保使用正确的路径访问方式。 具体步骤: 1. **理解路径获取方式**: - `__file__`:当前执行脚本的路径(在打包后,这个路径指向临时目录中的脚本,但非资源文件可能不在同一目录)。 - `os.path.abspath(__file__)`:获取当前脚本的绝对路径。 - `os.path.dirname(__file__)`:获取当前脚本所在目录。 - `os.path.join()`:拼接路径。 2. **在打包时,资源文件(如图片、配置文件)需要被包含到打包程序中**: - 在PyInstaller中,可以使用`--add-data`选项将资源文件添加到打包程序中。 3. **运行时获取资源路径**: - 判断是否打包环境:通过检查`sys`模块中是否有`frozen`属性(例如`getattr(sys, 'frozen', False)`)。 - 如果打包了,则使用`sys._MEIPASS`作为基础路径(这是PyInstaller在运行时创建的临时目录,资源文件被解压到这里)。 - 如果没有打包,则使用基于`__file__`的路径。 示例代码: ```python import sys import os def resource_path(relative_path): """ 获取资源路径,兼容开发环境和PyInstaller打包后环境 """ if getattr(sys, 'frozen', False): # 是否打包 base_path = sys._MEIPASS else: base_path = os.path.abspath(".") return os.path.join(base_path, relative_path) # 使用示例 # 假设有一个资源文件位于项目根目录下的resources文件夹中,名为config.ini config_path = resource_path('resources/config.ini') ``` 4. **打包命令**: - 使用PyInstaller打包时,需要将资源文件通过`--add-data`指定: ```bash pyinstaller --add-data "resources/config.ini:resources" your_script.py ``` - 注意:在Windows上,路径分隔符使用分号(;),在Linux和macOS上使用冒号(:)。上面的例子中,`"源路径:目标路径"`,目标路径是相对于临时目录根目录的。 5. **注意事项**: - 在代码中访问资源文件时,必须使用`resource_path`函数来获取完整路径。 - 确保资源文件被正确添加到打包程序中(通过`--add-data`指定)。 6. **另一种方法(使用os.path.dirname(sys.argv[0]))**: - 但这种方法在打包后可能得到的是临时目录中的可执行文件所在目录,而资源文件可能被放在其他位置(如使用`--add-data`指定了子目录),所以结合`sys._MEIPASS`更可靠。 7. **调试建议**: - 在打包后,可以打印出`sys._MEIPASS`和拼接后的路径,确保路径正确。 引用说明: - 在开发环境中,我们通常使用基于`__file__`的路径来定位资源文件。但是打包后,这些路径会发生变化。因此,需要根据运行环境动态调整基础路径[^1][^2][^3][^4]。 通过以上方法,可以解决打包后找不到文件的问题。 §§相关问题§§ 1. 如何判断Python程序是否被打包(例如使用PyInstaller)? 2. 在PyInstaller打包时,如何包含资源文件夹? 3. 除了`sys._MEIPASS`,还有其他方法获取打包后的资源路径吗? 4. 打包后的程序在运行时,临时目录的结构是怎样的?</think>### Python打包后文件路径问题解决方案 在Python打包(如PyInstaller)后出现文件找不到问题,主要是因为打包后程序运行路径发生变化。以下是完整解决方案: #### 1. **问题根源** 打包后程序运行在临时目录(如`/tmp/_MEIxxxxx`),而非源代码目录。此时使用`__file__`或`os.getcwd()`获取的路径会失效[^1][^2]。 #### 2. **可靠路径获取方法** 使用组合方法获取基础路径: ```python import sys import os # 获取当前脚本的真实路径(兼容打包环境) if getattr(sys, 'frozen', False): # 是否打包环境 base_path = sys._MEIPASS # PyInstaller专用临时目录 else: base_path = os.path.dirname(os.path.abspath(__file__)) print(f"基础路径: {base_path}") ``` #### 3. **路径拼接最佳实践** ```python # 拼接资源文件路径(如配置文件) config_path = os.path.join(base_path, "resources", "config.ini") print(f"配置文件路径: {config_path}") # 拼接上级目录文件 parent_file = os.path.join(base_path, "..", "data.json") abs_parent_path = os.path.abspath(parent_file) # 规范路径 ``` #### 4. **PyInstaller打包配置** 在`.spec`文件或命令行中添加资源: ```python # 修改.spec文件 a = Analysis( ..., datas=[('src/resources/*', 'resources')], # (源路径, 目标路径) ) ``` 或命令行: ```bash pyinstaller --add-data "src/resources:resources" your_script.py ``` #### 5. **调试技巧** 添加路径检查代码: ```python print(f"当前工作目录: {os.getcwd()}") print(f"__file__ 路径: {__file__}") print(f"绝对路径: {os.path.abspath(__file__)}") print(f"目录名: {os.path.dirname(os.path.abspath(__file__))}") ``` #### 6. **注意事项** - **路径分隔符**:始终使用`os.path.join()`代替手动拼接`/`或`\` - **相对路径**:避免使用`os.chdir()`改变工作目录 - **只读文件**:打包后资源文件位于临时目录,需按只读方式处理 - **路径规范**:关键路径使用`os.path.abspath()`和`os.path.normpath()` > **示例场景**:读取打包后的配置文件 > ```python > def load_config(): > base_dir = sys._MEIPASS if getattr(sys, 'frozen', False) else os.path.dirname(__file__) > config_path = os.path.join(base_dir, "config", "settings.yaml") > with open(config_path, 'r') as f: > return yaml.safe_load(f) > ```
阅读全文

相关推荐

import os import datetime import shutil import subprocess import re import platform import hashlib def get_commit_id(): cmd = ['git', 'rev-parse', 'HEAD'] try: result = subprocess.check_output(cmd) commit_id = result.decode().strip() if not commit_id: raise Exception('commit id not found') return commit_id except Exception as e: print(e) raise e def remove_pycache(path): for root, dirs, files in os.walk(path): for dir in dirs: if dir == '__pycache__': pycache_dir = os.path.join(root, dir) shutil.rmtree(pycache_dir) def use_shell(): if platform.system() == "Linux": return False return True class SdkPacker: def __init__(self): self.starttime = datetime.datetime.now() self.record('*' * 10 + 'SDK pack init' + '*' * 10) # pip self.pipUrl = os.getenv('SDK_PIP_URL') self.pipTrustHost = os.getenv('SDK_PIP_TRUST_HOST') self.pipArgs = os.getenv('SDK_PIP_ARGS') self.record(f'pipUrl: {self.pipUrl}') self.record(f'pipTrustHost: {self.pipTrustHost}') self.record(f'pipArgs: {self.pipArgs}') # sdk path self.sdkPath = os.path.dirname(os.path.abspath(__file__)) self.outPath = os.path.join(self.sdkPath, 'out') self.enginePath = os.path.join(self.outPath, 'engine') self.remove_path(os.path.join(self.sdkPath, 'build')) self.remove_path(os.path.join(self.sdkPath, 'rpa', 'build')) self.remove_path(self.enginePath) # commit id self.commitId = get_commit_id() self.record(f"commit id:{self.commitId}") # cache path self.cachePath = os.path.join(self.sdkPath, 'out', 'cache') self.cacheZipPath = os.path.join(self.cachePath, f'{self.commitId}.7z') # env self.RPA_PACK_PLATFORM = self.get_env('RPA_PACK_PLATFORM') self.RPA_PACK_ARCH = self.get_env('RPA_PACK_ARCH') self.RPA_VERSION = self.get_env('RPA_PACK_VERSION') if not self.RPA_VERSION or not re.search(r"\d", self.RPA_VERSION): self.RPA_VERSION = "15.0.0" self.RPA_FORCE_REBUILD = self.get_env('RPA_PACK_FORCE_REBUILD') self.platform = platform.system() self.record(f"System: {self.platform}") # tools path self.sdkToolsPath = os.path.join(self.sdkPath, 'out', 'sdk_tools') # output path self.python_out = os.path.join(self.enginePath) if self.RPA_PACK_PLATFORM == 'windows': self.reqsPath = os.path.join(self.sdkPath, 'rpa', 'requirements.txt') self.site_packages = os.path.join(self.enginePath, 'Lib', 'site-packages') elif self.RPA_PACK_PLATFORM in ['linux', 'UOS', 'kylinOS']: self.reqsPath = os.path.join(self.sdkPath, 'rpa', 'requirements_uos.txt') self.site_packages = os.path.join(self.enginePath, 'lib', 'python3.7', 'site-packages') else: raise Exception(f'not support platform: {self.RPA_PACK_PLATFORM} and arch: {self.RPA_PACK_ARCH}') self.seven_zip_out = os.path.join(self.site_packages, 'rpa', 'file_folder') self.ffmpeg_out = os.path.join(self.site_packages, 'rpa', 'win32') self.db2_out = os.path.join(self.site_packages) self.db2_cli_out = os.path.join(self.site_packages, 'ibm_db-3.1.4') self.pip_args = [] if self.pipUrl: self.pip_args.extend(['-i', self.pipUrl]) if self.pipTrustHost: self.pip_args.extend(['--trusted-host', self.pipTrustHost]) if self.pipArgs: self.pip_args.extend(self.pipArgs.split(',')) # self.pip_args.extend(['--no-cache-dir', '--no-warn-script-location']) self.record("sdk pack init end") def run_command(self, command, cwd=None): process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=cwd, shell=use_shell()) while True: output = process.stdout.readline() if output == b'' and process.poll() is not None: break if output: print(output.strip().decode('utf-8', errors='replace')) process.wait() if process.returncode is not 0: raise Exception(f'run command {command} error, return code: {process.returncode}') self.record(f"run command: {command}") def get_env(self, env_key): env_value = os.getenv(env_key) if env_value: self.record(f'{env_key}: {env_value}') else: raise Exception(f'{env_key} not found') return env_value def remove_path(self, path): if os.path.exists(path): try: if os.path.isfile(path): os.remove(path) elif os.path.isdir(path): shutil.rmtree(path) self.record(f"remove {path}: successfully") except Exception as e: self.record(f'remove {path}: {e} error') raise e else: self.record(f"remove {path}: not exists") def record(self, title): end_time = datetime.datetime.now() diff = (end_time - self.starttime) print(f"[{end_time.time()} - {diff.seconds}] {title}") def unzip(self, src, dst): # self.run_command(['7z', 'x', src, '-o' + dst, '-bb0'], cwd=os.path.join(self.sdkToolsPath, '7z')) os.system(f"7z x {src} -o{dst}") self.record(f"unzip {src} to {dict}") def calculate_md5(self, file_path): with open(file_path, "rb") as f: md5_hash = hashlib.md5() for chunk in iter(lambda: f.read(4096), b""): md5_hash.update(chunk) return md5_hash.hexdigest() def copy(self, package_name, *rpa_dir): package = os.path.join(self.sdkToolsPath, package_name) package_out = os.path.join(self.site_packages, 'rpa', *rpa_dir) for file in os.listdir(package): package_file = os.path.join(package, file) shutil.copy(package_file, package_out) self.record(f"{package_file} >> {package_out}") self.record(f"copy {package_name}") def pack(self): # encrypt sdk self.record('*' * 10 + 'SDK pack' + '*' * 10) if self.RPA_FORCE_REBUILD == 'false' and os.path.exists(self.cacheZipPath): self.record('SDK use cache') self.unzip(self.cacheZipPath, self.outPath) else: self.encrypt_sdk() # add version version_path = os.path.join(self.site_packages, 'rpa', 'version', 'version') content = f'{self.RPA_VERSION}\n{datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")}\n{self.commitId}' with open(version_path, 'w') as f: f.write(content) shutil.copy(version_path, self.enginePath) with open(os.path.join(self.enginePath, 'version'), 'r') as f: self.record(f.read()) # remove cache remove_pycache(self.enginePath) self.record(f"remove engine pycache: {self.enginePath}") self.record("SDK pack end") def link_python(self): pass def is_linux(self): return self.RPA_PACK_PLATFORM in ['linux', 'UOS', 'kylinOS'] def is_windows(self): return self.RPA_PACK_PLATFORM == 'windows' def encrypt_sdk(self): def fix_tk(): # remove mouseinfo tk sys.exit mouseinfo_init_path = os.path.join(self.site_packages, 'mouseinfo', '__init__.py') command = """sed -i "s/sys.exit.*/pass/g" """ + mouseinfo_init_path ret = os.system(command) msg = f"remove mouseinfo tk sys.exit code: {ret}" if ret: raise SystemError(msg) self.record(msg) def install_pywpsrpc(): wheel_path = 'pywpsrpc-2.3.3-cp37-cp37m-manylinux_2_5_x86_64.whl' if self.RPA_PACK_ARCH == "arm64": wheel_path = 'pywpsrpc-2.3.3-cp37-cp37m-manylinux_2_28_aarch64.whl' pywpsrpc_path = os.path.join(self.sdkToolsPath, 'pywpsrpc', wheel_path) self.run_command([python_path, '-m', 'pip', 'install', pywpsrpc_path]) def copy_depends(): shutil.copytree(os.path.join(self.sdkToolsPath, 'deps', 'at-spi2-core'), os.path.join(self.enginePath, 'deps', 'at-spi2-core')) shutil.copytree(os.path.join(self.sdkToolsPath, 'deps', 'wps'), os.path.join(self.enginePath, 'deps', 'wps')) shutil.copytree(os.path.join(self.sdkToolsPath, 'deps', 'xclip'), os.path.join(self.site_packages, 'xclip')) # move python self.record('SDK encrypt') use_cache = False requirements_md5 = self.calculate_md5(self.reqsPath) requirements_cache_path = os.path.join(self.sdkPath, 'out', 'cache', f'{requirements_md5}.7z') if self.RPA_FORCE_REBUILD == 'false': if os.path.exists(requirements_cache_path): use_cache = True python_source = "" python_path = "" if self.is_windows(): python_source = os.path.join(self.sdkToolsPath, 'python') python_path = os.path.join(self.enginePath, "python.exe") elif self.is_linux(): python_source = "/opt/python3.7" python_path = os.path.join(self.enginePath, "bin", "python") if not use_cache: shutil.copytree(python_source, self.enginePath) self.record(f"{python_source} >> {self.enginePath}") if self.is_linux(): os.system(f'apt-get install -y libcairo2-dev libgirepository1.0-dev unixodbc-dev') current_cwd = os.getcwd() self.record(f"current cwd:{current_cwd}") bin_path = os.path.join(self.enginePath, "bin") os.chdir(bin_path) os.system(f'ln -s python3.7 python') os.system(f'ln -s python3.7 python3') self.record("link python3.7 to python python3") os.chdir(current_cwd) # install requirements # comtypes<1.1.11 need 2to3, setuptools<58 support 2to3 self.run_command([python_path, '-m', 'pip', 'install', '--upgrade', 'pip'] + self.pip_args) self.run_command([python_path, '-m', 'pip', 'install', '--upgrade', 'setuptools < 58'] + self.pip_args) self.run_command([python_path, '-m', 'pip', 'install', '-r', self.reqsPath] + self.pip_args) if self.is_windows(): # install db2 shutil.copytree(os.path.join(self.sdkToolsPath, 'db2', 'ibm_db-3.1.4'), os.path.join(self.enginePath, 'Lib', 'site-packages', 'ibm_db-3.1.4')) shutil.copytree(os.path.join(self.sdkToolsPath, 'db2_cli', 'clidriver'), os.path.join(self.enginePath, 'Lib', 'site-packages', 'ibm_db-3.1.4', 'clidriver')) self.run_command([os.path.join(self.enginePath, 'python'), 'setup.py', 'install'], cwd=os.path.join(self.enginePath, 'Lib', 'site-packages', 'ibm_db-3.1.4')) elif self.is_linux(): # install db2 # shutil.copytree(os.path.join(self.sdkToolsPath, 'db2', 'ibm_db-3.1.4'), # os.path.join(self.site_packages, 'ibm_db-3.1.4')) # shutil.copytree(os.path.join(self.sdkToolsPath, 'db2_cli', 'clidriver'), # os.path.join(self.site_packages, 'ibm_db-3.1.4', 'clidriver')) # self.run_command([python_path, 'setup.py', 'install'], # cwd=os.path.join(self.site_packages, 'ibm_db-3.1.4')) fix_tk() install_pywpsrpc() copy_depends() # install cython self.run_command([python_path, '-m', 'pip', 'install', 'cython==0.29.24'] + self.pip_args) self.remove_path(requirements_cache_path) self.run_command(['7z', 'a', '-mx1', requirements_cache_path, self.enginePath], cwd=os.path.join(self.sdkToolsPath, '7z')) else: self.record("requirements use cache") self.unzip(requirements_cache_path, self.outPath) build_path = os.path.join(self.sdkPath, 'build', 'rpa') # encrypt sdk self.run_command([python_path, 'setup.py'], cwd=os.path.join(self.sdkPath, 'rpa')) # uninstall cython self.run_command([python_path, '-m', 'pip', 'uninstall', 'cython', '-y']) # remove pycache remove_pycache(build_path) self.record(f"remove rpa pycache: {build_path}") # copy sdk rpa_path = os.path.join(self.site_packages, 'rpa') shutil.move(build_path, rpa_path) self.record(f"move {build_path} >> {rpa_path}") if self.RPA_PACK_PLATFORM == 'windows': self.copy('activexinput', 'uia', 'activexinput') self.copy("7z", "file_folder") self.copy("ffmpeg", "win32") # save cache self.remove_path(self.cacheZipPath) self.run_command(['7z', 'a', '-mx1', self.cacheZipPath, self.enginePath], cwd=os.path.join(self.sdkToolsPath, '7z')) # self.run_command(['7z', 'a', '-tzip', self.cacheZipPath, '-r', self.enginePath, '-y', '-bb0'], # cwd=os.path.join(self.sdkToolsPath, '7z')) # remove paths self.remove_path(os.path.join(self.sdkPath, 'build')) self.remove_path(build_path) self.record("SDK encrypt end") if __name__ == '__main__': import sys if sys.platform == "win32": # for tests os.environ['RPA_PACK_PLATFORM'] = 'windows' os.environ['RPA_PACK_ARCH'] = 'x64' os.environ['RPA_TARGET_FORMAT'] = 'zip' os.environ['RPA_PACK_GITOKEN'] = 'pack_gitoken' os.environ['RPA_VERSION'] = '1.0.0' os.environ['RPA_GIT_TOKEN'] = 'git_token' os.environ['RPA_FORCE_REBUILD'] = 'false' os.environ['RPA_TOOLS_HOME'] = 'C:\\Repos\\tools' elif sys.platform == "darwin": sys.exit(0) else: os.environ['RPA_PACK_PLATFORM'] = 'linux' os.environ['RPA_PACK_ARCH'] = 'x64' os.environ['RPA_TARGET_FORMAT'] = 'deb' os.environ['RPA_PACK_GITOKEN'] = 'pack_gitoken' os.environ['RPA_VERSION'] = '1.0.0' os.environ['RPA_GIT_TOKEN'] = 'git_token' os.environ['RPA_FORCE_REBUILD'] = 'false' os.environ['RPA_TOOLS_HOME'] = '/home/uos/tools' os.environ['SDK_PIP_URL'] = 'https://repo.datagrand.com/repository/py/simple' packer = SdkPacker() packer.pack() 分解一下项目打包的流程,介绍如何实现打包,如何配置环境,依赖等等,以及是否实现可执行文件,将整个流程用图表表示

我的handlers代码里没有filetreehandler方法,我给你提供我的代码,你在我的代码中改动吧:handlers: import tornado.web import tornado.websocket import json import os import mimetypes import urllib.parse from config import RESOURCE_POOLS, save_config, ADMIN_IP, ADMIN_PASSWORD from file_utils import get_file_list, scan_pools, get_recursive_file_list import aiofiles # 改进管理员检查 def is_admin(handler): """检查当前用户是否有管理员权限""" client_ip = handler.request.remote_ip # 支持IPv4和IPv6的本地地址 return client_ip in ["127.0.0.1", "::1", "localhost"] or \ client_ip == ADMIN_IP or \ handler.get_cookie("admin_auth") == ADMIN_PASSWORD # 手动资源扫描接口 class ManualScanHandler(tornado.web.RequestHandler): async def get(self): if not is_admin(self): self.set_status(403) self.write("权限不足") return await tornado.ioloop.IOLoop.current().run_in_executor(None, scan_pools) WSHandler.broadcast_update() self.write("资源池已手动扫描完成") # 主页面处理器 class MainHandler(tornado.web.RequestHandler): def get(self): try: if self.request.path == "/admin": if not is_admin(self): self.redirect("/") return self.render("admin.html", pools=RESOURCE_POOLS) else: if is_admin(self): self.render("admin.html", pools=RESOURCE_POOLS) else: self.render("user.html", pools=RESOURCE_POOLS) except FileNotFoundError: self.set_status(404) self.render("404.html") except Exception as e: self.set_status(500) self.render("500.html", error=str(e)) # WebSocket实时更新处理器 class WSHandler(tornado.websocket.WebSocketHandler): clients = set() def open(self): self.clients.add(self) self.send_update() def on_message(self, message): pass def on_close(self): self.clients.remove(self) def send_update(self): for client in self.clients: client.write_message(json.dumps(RESOURCE_POOLS)) @classmethod def broadcast_update(cls): for client in cls.clients: client.write_message(json.dumps(RESOURCE_POOLS)) # 资源池更新处理器(仅管理员) class UpdatePoolHandler(tornado.web.RequestHandler): def post(self): if not is_admin(self): self.write({"status": "error", "message": "无权限执行此操作"}) return data = json.loads(self.request.body) pool_name = data.get("pool") new_path = data.get("path") if pool_name and new_path and pool_name in RESOURCE_POOLS: if not os.path.exists(new_path) or not os.path.isdir(new_path): self.write({"status": "error", "message": "路径不存在或不是目录"}) return RESOURCE_POOLS[pool_name]["path"] = new_path try: RESOURCE_POOLS[pool_name]["files"] = [ f for f in os.listdir(new_path) if os.path.isfile(os.path.join(new_path, f)) ] except Exception as e: RESOURCE_POOLS[pool_name]["files"] = [f"错误: {str(e)}"] save_config() WSHandler.broadcast_update() self.write({"status": "success"}) else: self.write({"status": "error", "message": "无效资源池或路径"}) # 文件列表处理器 class FileListHandler(tornado.web.RequestHandler): async def get(self, pool_name): if pool_name not in RESOURCE_POOLS: self.set_status(404) self.render("404.html") return current_path = self.get_argument("path", default="") pool_path = RESOURCE_POOLS[pool_name]["path"] full_path = os.path.join(pool_path, current_path) full_path = os.path.normpath(full_path) if not full_path.startswith(pool_path): self.set_status(403) self.write("非法路径访问") return if not os.path.exists(full_path) or not os.path.isdir(full_path): self.set_status(404) self.write("目录不存在") return file_tree = get_recursive_file_list(full_path) self.render("file_tree.html", pool_name=pool_name, current_path=current_path, file_tree=file_tree, pool_info=RESOURCE_POOLS[pool_name], is_admin=is_admin(self)) # 音乐播放处理器 class MusicPlayerHandler(tornado.web.RequestHandler): def get(self): pool_name = self.get_argument("pool") file_name = self.get_argument("file") if pool_name not in RESOURCE_POOLS: self.set_status(404) self.write("资源池不存在") return pool_path = RESOURCE_POOLS[pool_name]["path"] file_path = os.path.join(pool_path, file_name) real_path = os.path.realpath(file_path) if not real_path.startswith(os.path.realpath(pool_path)): self.set_status(403) self.write("禁止访问此文件路径") return if not os.path.exists(file_path): self.set_status(404) self.write("文件不存在") return self.render("music_player.html", file_name=file_name, pool_name=pool_name) # 文件下载处理器 class FileDownloadHandler(tornado.web.RequestHandler): async def get(self): pool_name = self.get_argument("pool") file_name = self.get_argument("file") if pool_name not in RESOURCE_POOLS: self.set_status(404) self.render("404.html") return pool_path = RESOURCE_POOLS[pool_name]["path"] file_path = os.path.join(pool_path, file_name) real_path = os.path.realpath(file_path) if not real_path.startswith(os.path.realpath(pool_path)): self.set_status(403) self.write("禁止访问此文件路径") return if not os.path.exists(file_path): self.set_status(404) self.render("404.html") return file_size = os.path.getsize(file_path) range_header = self.request.headers.get('Range') start = 0 end = file_size - 1 if range_header: self.set_status(206) start_bytes, end_bytes = range_header.replace('bytes=', '').split('-') start = int(start_bytes) if start_bytes else 0 end = int(end_bytes) if end_bytes else file_size - 1 self.set_header('Content-Range', f'bytes {start}-{end}/{file_size}') self.set_header("Content-Length", str(end - start + 1)) encoded_filename = urllib.parse.quote(file_name.encode('utf-8')) self.set_header("Content-Type", "application/octet-stream") self.set_header("Content-Disposition", f'attachment; filename*=UTF-8\'\'{encoded_filename}') self.set_header("Accept-Ranges", "bytes") chunk_size = 1024 * 1024 * 4 try: async with aiofiles.open(file_path, 'rb') as f: await f.seek(start) remaining = end - start + 1 while remaining > 0: read_size = min(chunk_size, remaining) chunk = await f.read(read_size) if not chunk: break self.write(chunk) await self.flush() remaining -= len(chunk) self.finish() except tornado.iostream.StreamClosedError: return except Exception as e: self.set_status(500) self.write(f"Internal Server Error: {str(e)}") return # 文件预览处理器 class FilePreviewHandler(tornado.web.RequestHandler): async def get(self): pool_name = self.get_argument("pool") file_name = self.get_argument("file") if pool_name not in RESOURCE_POOLS: self.set_status(404) self.render("404.html") return pool_path = RESOURCE_POOLS[pool_name]["path"] file_path = os.path.join(pool_path, file_name) real_path = os.path.realpath(file_path) if not real_path.startswith(os.path.realpath(pool_path)): self.set_status(403) self.write("禁止访问此文件路径") return if not os.path.exists(file_path): self.set_status(404) self.render("404.html") return ext = os.path.splitext(file_name)[1].lower() # 判断是否为音乐文件,跳转到播放器 if ext in ['.mp3', '.wav', '.flac', '.ogg', '.aac']: self.redirect(f"/music_player?pool={pool_name}&file={file_name}") return mime_type = { '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation', '.pdf': 'application/pdf', '.txt': 'text/plain', '.md': 'text/markdown', '.html': 'text/html', '.css': 'text/css', '.js': 'application/javascript', '.png': 'image/png', '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.gif': 'image/gif', '.mp4': 'video/mp4', '.webm': 'video/webm', '.ogg': 'video/ogg', }.get(ext, mimetypes.guess_type(file_name)[0] or 'application/octet-stream') encoded_filename = urllib.parse.quote(file_name.encode('utf-8')) self.set_header("Content-Disposition", f'inline; filename*=UTF-8\'\'{encoded_filename}') file_size = os.path.getsize(file_path) range_header = self.request.headers.get('Range') is_download = not range_header and ext in ['.mp4', '.webm', '.ogg', '.pdf'] if is_download: self.set_header('Content-Type', mime_type) self.set_header('Content-Length', str(file_size)) self.set_status(200) if os.name == 'posix': fd = os.open(file_path, os.O_RDONLY) try: await tornado.ioloop.IOLoop.current().run_in_executor( None, os.sendfile, self.request.connection.fileno(), fd, None, file_size ) finally: os.close(fd) else: with open(file_path, 'rb') as f: while True: chunk = f.read(1024 * 1024 * 4) if not chunk: break self.write(chunk) await self.flush() self.finish() elif ext in ['.mp4', '.webm', '.ogg', '.pdf']: self.set_header('Accept-Ranges', 'bytes') start = 0 end = file_size - 1 if range_header: self.set_status(206) start_bytes, end_bytes = range_header.replace('bytes=', '').split('-') start = int(start_bytes) if start_bytes else 0 end = int(end_bytes) if end_bytes else file_size - 1 self.set_header('Content-Range', f'bytes {start}-{end}/{file_size}') self.set_header("Content-Length", str(end - start + 1)) self.set_header('Content-Type', mime_type) chunk_size = 4096 * 16 async with aiofiles.open(file_path, 'rb') as f: await f.seek(start) remaining = end - start + 1 while remaining > 0: chunk = await f.read(min(chunk_size, remaining)) if not chunk: break self.write(chunk) await self.flush() remaining -= len(chunk) self.finish() elif ext == '.txt': self.set_header('Content-Type', 'text/plain; charset=UTF-8') async with aiofiles.open(file_path, 'r', encoding='utf-8') as f: content = await f.read() self.write(content) self.finish() elif ext in ['.png', '.jpg', '.jpeg', '.gif', '.md', '.pdf', '.docx', '.xlsx', '.pptx']: self.set_header('Content-Type', mime_type) async with aiofiles.open(file_path, 'rb') as f: data = await f.read() self.write(data) self.finish() else: self.set_status(400) self.write("不支持预览此文件类型") server: import tornado.ioloop import tornado.web import threading import time import os import logging import asyncio from concurrent.futures import ThreadPoolExecutor from handlers import MainHandler, WSHandler, UpdatePoolHandler, FileListHandler, FileDownloadHandler, FilePreviewHandler, ManualScanHandler, MusicPlayerHandler from file_utils import scan_pools # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger("ResourceManager") # 自定义线程池(用于异步执行资源池扫描等阻塞操作) executor = ThreadPoolExecutor(4) # 根据 CPU 核心数调整线程池大小 # 设置 asyncio 默认事件循环的执行器为线程池 asyncio.get_event_loop().set_default_executor(executor) # 修复模板路径和路由配置 def make_app(): base_dir = os.path.dirname(os.path.abspath(__file__)) template_path = os.path.join(base_dir, "templates") static_path = os.path.join(base_dir, "static") return tornado.web.Application( [ (r"/", MainHandler), (r"/ws", WSHandler), (r"/update_pool", UpdatePoolHandler), (r"/files/(.+)", FileListHandler), (r"/download", FileDownloadHandler), (r"/music_player", MusicPlayerHandler), (r"/preview", FilePreviewHandler), (r"/scan", ManualScanHandler), (r"/static/(.*)", tornado.web.StaticFileHandler, {"path": static_path}), (r"/admin", MainHandler), ], template_path=template_path, cookie_secret="YOUR_SECRET_KEY", login_url="/login", debug=False, # 生产环境建议关闭 debug max_buffer_size=1024 * 1024 * 1024, # 最大缓冲区大小(1GB) chunked_write_timeout=60000, # 分块写入超时时间(毫秒) idle_connection_timeout=60000, # 空闲连接超时时间 executor=executor # 使用自定义线程池 ) if __name__ == "__main__": try: # 初始化资源池扫描 scan_pools() logger.info("资源池扫描初始化完成") except Exception as e: logger.error(f"资源池扫描初始化失败: {str(e)}") # 创建并启动应用 app = make_app() app.listen(8888) logger.info("服务器运行中: http://localhost:8888") try: # 启动事件循环 tornado.ioloop.IOLoop.current().start() except KeyboardInterrupt: logger.info("\n服务器正在关闭...") except Exception as e: logger.error(f"服务器异常停止: {str(e)}") finally: # 清理资源(可选) executor.shutdown(wait=True) #预览视频下载和下载按钮下载速度大幅度提升,txt文本实现在线预览,word等文档预览会下载而不是显示不支持预览,本地速度快,跨设备比较慢 file_utils: import os import time import mimetypes import logging # 配置日志 logger = logging.getLogger("FileUtils") logging.basicConfig(level=logging.INFO) # 从 config 导入 RESOURCE_POOLS from config import RESOURCE_POOLS def get_file_list(pool_name): if pool_name not in RESOURCE_POOLS: return [] pool = RESOURCE_POOLS[pool_name] files = [] path = pool["path"] if not os.path.exists(path): logger.error(f"路径不存在: {path}") return [{"name": "错误: 路径不存在", "formatted_size": "0 B"}] if not os.path.isdir(path): logger.error(f"路径不是目录: {path}") return [{"name": "错误: 路径不是目录", "formatted_size": "0 B"}] for file_name in pool.get("files", []): file_path = os.path.join(path, file_name) if not os.path.exists(file_path): logger.warning(f"文件不存在: {file_path}") continue try: stat = os.stat(file_path) file_type = get_file_type(file_path) files.append({ "name": file_name, "size": stat.st_size, "formatted_size": _format_size(stat.st_size), "modified": time.strftime("%Y-%m-%d %H:%M", time.localtime(stat.st_mtime)), "path": file_path, "type": file_type }) except Exception as e: logger.error(f"访问文件错误 {file_path}: {e}") files.append({ "name": f"错误: {file_name}", "formatted_size": "0 B", "type": "error" }) return files def _format_size(size): for unit in ['B', 'KB', 'MB', 'GB']: if size < 1024.0: return f"{size:.1f} {unit}" size /= 1024.0 return f"{size:.1f} TB" def get_file_type(file_path): mime_type, _ = mimetypes.guess_type(file_path) if mime_type: return mime_type.split('/')[0] return 'other' def scan_pools(): for pool_name, pool in RESOURCE_POOLS.items(): path = pool["path"] if not os.path.exists(path) or not os.path.isdir(path): pool["files"] = [f"错误: 路径无效 {path}"] continue try: pool["files"] = [ f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f)) ] logger.info(f"扫描完成: {pool_name} ({len(pool['files'])} 文件)") except Exception as e: pool["files"] = [f"错误: {str(e)}"] logger.error(f"扫描资源池错误 {pool_name}: {e}") def get_recursive_file_list(path, base_path=None): if base_path is None: base_path = path files = [] try: for entry in os.scandir(path): relative_path = os.path.relpath(entry.path, base_path) if entry.is_dir(): files.append({ "name": entry.name, "type": "dir", "children": get_recursive_file_list(entry.path, base_path) }) elif entry.is_file(): files.append({ "name": entry.name, "type": "file", "size": os.path.getsize(entry.path), "modified": time.strftime("%Y-%m-%d %H:%M", time.localtime(os.path.getmtime(entry.path))) }) except PermissionError: logger.warning(f"无权访问目录: {path}") return [{"name": "无权访问", "type": "error"}] except Exception as e: logger.error(f"扫描目录失败 {path}: {e}") return [{"name": f"错误: {str(e)}", "type": "error"}] return files

帮我把os.path.abspath(path) 返回绝对路径 os.path.basename(path) 返回文件名 os.path.commonprefix(list) 返回list(多个路径)中,所有path共有的最长的路径 os.path.dirname(path) 返回文件路径 os.path.exists(path) 如果路径 path 存在,返回 True;如果路径 path 不存在或损坏,返回 False。 os.path.lexists(path) 路径存在则返回 True,路径损坏也返回 True os.path.expanduser(path) 把 path 中包含的 ~ 和 ~user 转换成用户目录 os.path.expandvars(path) 根据环境变量的值替换 path 中包含的 $name 和 ${name} os.path.getatime(path) 返回最近访问时间(浮点型秒数) os.path.getmtime(path) 返回最近文件修改时间 os.path.getctime(path) 返回文件 path 创建时间 os.path.getsize(path) 返回文件大小,如果文件不存在就返回错误 os.path.isabs(path) 判断是否为绝对路径 os.path.isfile(path) 判断路径是否为文件 os.path.isdir(path) 判断路径是否为目录 os.path.islink(path) 判断路径是否为链接 os.path.ismount(path) 判断路径是否为挂载点 os.path.join(path1[, path2[, ...]]) 把目录和文件名合成一个路径 os.path.normcase(path) 转换path的大小写和斜杠 os.path.normpath(path) 规范path字符串形式 os.path.realpath(path) 返回path的真实路径 os.path.relpath(path[, start]) 从start开始计算相对路径 os.path.samefile(path1, path2) 判断目录或文件是否相同 os.path.sameopenfile(fp1, fp2) 判断fp1和fp2是否指向同一文件 os.path.samestat(stat1, stat2) 判断stat tuple stat1和stat2是否指向同一个文件 os.path.split(path) 把路径分割成 dirname 和 basename,返回一个元组 os.path.splitdrive(path) 一般用在 windows 下,返回驱动器名和路径组成的元组 os.path.splitext(path) 分割路径,返回路径名和文件扩展名的元组 os.path.splitunc(path) 把路径分割为加载点与文件 os.path.walk(path, visit, arg) 遍历path,进入每个目录都调用visit函数,visit函数必须有3个参数(arg, dirname, names),dirname表示当前目录的目录名,names代表当前目录下的所有文件名,args则为walk的第三个参数 os.path.supports_unicode_filenames 设置是否支持unicode路径名修改成### Python os模块函数功能分类整理 #### 1. **文件与目录操作** - **路径处理** - os.path.join():拼接路径 - os.path.abspath():获取绝对路径 - os.path.dirname(path):返回目录路径(去掉文件名) python os.path.dirname("E:/Read_File/read_yaml.py") # 输出: E:/Read_File - os.path.basename():提取文件名 - os.path.splitext(path):分割文件名与扩展名(返回二元组) python os.path.splitext('a/b/text.txt') # 输出: ('a/b/text', '.txt') - **目录遍历与操作** - os.walk(top):递归遍历目录树(返回路径、子目录、文件列表) - os.mkdir() / os.makedirs():创建目录(单级/多级) - os.rmdir() / os.removedirs():删除目录(单级/多级) - **文件属性** - os.path.getsize(file):获取文件大小(字节) - os.path.exists():检查路径是否存在 - os.path.isfile() / os.path.isdir():判断是否为文件/目录 --- #### 2. **环境变量管理** - os.environ:字典形式存储所有环境变量 - os.environ.get(key):安全获取环境变量值(无键时返回 None) python db_password = os.environ.get("DB_PASSWORD") # 安全获取敏感信息 --- #### 3. **进程与系统控制** - **进程管理** - os.system(command):执行系统命令 - os.kill(pid, signal):终止进程 - **系统信息** - os.name:操作系统名称(如 'nt' 表示 Windows) - os.sep:系统路径分隔符(如 \ 或 /) --- #### 4. **路径规范化与扩展** - os.path.normpath(path):规范化路径(移除冗余分隔符) python os.path.normpath("C://Users//test") # 输出: C:\Users\test --- #### 5. **文件操作** - os.rename(src, dst):重命名/移动文件 - os.remove(path):删除文件 - os.stat(path):获取文件状态(大小、修改时间等) ---这样然后用.md文件格式返回给我

import os import sys import logging import time from pathlib import Path from concurrent.futures import ThreadPoolExecutor from tqdm import tqdm import tkinter as tk from tkinter import filedialog, messagebox, ttk from tkinterdnd2 import DND_FILES, TkinterDnD import json from pdf2docx import Converter import pytesseract from pdf2image import convert_from_path from docx import Document from docx.shared import Pt, Cm, Inches from docx.enum.text import WD_ALIGN_PARAGRAPH import numexpr as ne import tempfile import traceback import io import threading import queue # 获取程序运行路径 def get_application_path(): """获取应用程序路径""" if getattr(sys, 'frozen', False): # 如果是打包后的exe return os.path.dirname(sys.executable) else: # 如果是直接运行的python脚本 return os.path.dirname(os.path.abspath(__file__)) # 设置NumExpr线程数 ne.set_num_threads(8) # 配置文件路径 CONFIG_FILE = os.path.join(get_application_path(), "config.json") # 日志文件路径 LOG_FILE = os.path.join(get_application_path(), "conversion.log") # 自定义StreamHandler来捕获所有输出 class StreamToLogger(io.StringIO): def __init__(self, logger, level): super().__init__() self.logger = logger self.level = level self.buf = '' def write(self, buf): self.buf = buf.strip('\r\n\t ') if self.buf: self.logger.log(self.level, self.buf) def flush(self): pass def setup_logging(): """配置日志系统""" try: app_path = get_application_path() log_dir = os.path.join(app_path, 'logs') os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, 'conversion.log') # 创建logger logger = logging.getLogger() logger.setLevel(logging.DEBUG) # 创建文件处理器 file_handler = logging.FileHandler(log_file, mode='w', encoding='utf-8') file_handler.setLevel(logging.DEBUG) # 创建控制台处理器 console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) # 创建格式化器 formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s') file_handler.setFormatter(formatter) console_handler.setFormatter(formatter) # 添加处理器 logger.addHandler(file_handler) logger.addHandler(console_handler) logging.debug(f"日志系统初始化成功,日志文件路径:{log_file}") return logger except Exception as e: print(f"设置日志系统失败: {str(e)}") return None # 初始化日志记录器 logger = setup_logging() if not logger: print("警告:日志系统初始化失败,程序将继续运行但不会记录日志") # 创建一个基本的日志记录器 logger = logging.getLogger() logger.setLevel(logging.DEBUG) handler = logging.StreamHandler() handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(handler) def load_config(): try: if os.path.exists(CONFIG_FILE): with open(CONFIG_FILE, 'r') as f: config = json.load(f) # 自动修正Poppler路径 poppler_path = config.get('POPPLER_PATH', '') if poppler_path: # 检查是否存在bin子目录 bin_path = os.path.join(poppler_path, 'bin') if os.path.exists(bin_path): config['POPPLER_PATH'] = bin_path return config return {} except: return {} def save_config(config): """保存配置文件""" with open(CONFIG_FILE, 'w', encoding='utf-8') as f: json.dump(config, f, ensure_ascii=False, indent=4) def setup_config(): """首次运行配置界面""" root = TkinterDnD.Tk() root.title("PDF转Word工具 - 首次配置") root.geometry("600x400") # 设置窗口样式 root.configure(bg='#f0f0f0') style = {'bg': '#f0f0f0', 'font': ('微软雅黑', 10)} button_style = {'bg': '#4CAF50', 'fg': 'white', 'font': ('微软雅黑', 10), 'padx': 10, 'pady': 5} # 创建说明标签 tk.Label(root, text="首次使用需要配置以下路径:", **style).pack(pady=10) # Poppler路径配置 poppler_frame = tk.Frame(root, bg='#f0f0f0') poppler_frame.pack(fill='x', padx=20, pady=5) tk.Label(poppler_frame, text="Poppler路径:", **style).pack(side='left') poppler_path = tk.StringVar() poppler_entry = tk.Entry(poppler_frame, textvariable=poppler_path, width=50) poppler_entry.pack(side='left', padx=5) def select_poppler(): path = filedialog.askdirectory(title="选择Poppler安装目录") if path: poppler_path.set(path) tk.Button(poppler_frame, text="浏览", command=select_poppler, **button_style).pack(side='left') # Tesseract路径配置 tesseract_frame = tk.Frame(root, bg='#f0f0f0') tesseract_frame.pack(fill='x', padx=20, pady=5) tk.Label(tesseract_frame, text="Tesseract路径:", **style).pack(side='left') tesseract_path = tk.StringVar() tesseract_entry = tk.Entry(tesseract_frame, textvariable=tesseract_path, width=50) tesseract_entry.pack(side='left', padx=5) def select_tesseract(): path = filedialog.askdirectory(title="选择Tesseract安装目录") if path: tesseract_path.set(path) tk.Button(tesseract_frame, text="浏览", command=select_tesseract, **button_style).pack(side='left') # 说明文本 help_text = """ 使用说明: 1. Poppler路径:选择poppler的安装目录(包含bin文件夹的目录) 2. Tesseract路径:选择Tesseract-OCR的安装目录 3. 配置完成后点击"保存配置"即可开始使用 4. Poppler和Tesseract需放在在系统C盘下(C:\Program Files\) """ tk.Label(root, text=help_text, justify='left', **style).pack(pady=10) def save_and_exit(): if not poppler_path.get() or not tesseract_path.get(): messagebox.showerror("错误", "请填写所有配置项!") return tesseract_dir = tesseract_path.get() tesseract_exe = os.path.join(tesseract_dir, 'tesseract.exe') # 新增:验证Tesseract路径有效性 if not os.path.isfile(tesseract_exe): messagebox.showerror("错误", "选择的Tesseract路径无效,未找到tesseract.exe!") return config = { 'POPPLER_PATH': poppler_path.get(), 'TESSERACT_CMD': tesseract_exe # 直接使用正确路径 } save_config(config) root.destroy() tk.Button(root, text="保存配置", command=save_and_exit, **button_style).pack(pady=20) root.mainloop() class PDFConverterGUI: def __init__(self): self.root = TkinterDnD.Tk() self.root.title("PDF转Word工具") self.root.geometry("800x600") # 设置样式 self.style = {'bg': '#f0f0f0', 'font': ('微软雅黑', 10)} self.button_style = {'bg': '#4CAF50', 'fg': 'white', 'font': ('微软雅黑', 10), 'padx': 10, 'pady': 5} # 绑定窗口关闭事件 self.root.protocol("WM_DELETE_WINDOW", self.on_closing) self.setup_ui() def on_closing(self): """处理窗口关闭事件""" try: if messagebox.askokcancel("退出", "确定要退出程序吗?"): self.root.destroy() sys.exit(0) except Exception as e: logger.error(f"关闭窗口时出错: {str(e)}") self.root.destroy() sys.exit(1) def setup_ui(self): # 创建主框架 main_frame = tk.Frame(self.root, bg='#f0f0f0') main_frame.pack(fill='both', expand=True, padx=20, pady=20) # 路径选择区域 path_frame = tk.Frame(main_frame, bg='#f0f0f0') path_frame.pack(fill='x', pady=10) tk.Label(path_frame, text="选择PDF文件或目录:", **self.style).pack(side='left') self.path_var = tk.StringVar() path_entry = tk.Entry(path_frame, textvariable=self.path_var, width=50) path_entry.pack(side='left', padx=5) tk.Button(path_frame, text="选择文件", command=self.select_file, **self.button_style).pack(side='left', padx=5) tk.Button(path_frame, text="选择目录", command=self.select_directory, **self.button_style).pack(side='left') # 文件列表区域 list_frame = tk.Frame(main_frame, bg='#f0f0f0') list_frame.pack(fill='both', expand=True, pady=10) tk.Label(list_frame, text="待处理文件列表(支持拖放文件):", **self.style).pack(anchor='w') # 创建带滚动条的列表框 self.listbox_frame = tk.Frame(list_frame, bg='#f0f0f0') self.listbox_frame.pack(fill='both', expand=True) self.file_listbox = tk.Listbox(self.listbox_frame, width=80, height=15) self.file_listbox.pack(side='left', fill='both', expand=True) scrollbar = tk.Scrollbar(self.listbox_frame) scrollbar.pack(side='right', fill='y') self.file_listbox.config(yscrollcommand=scrollbar.set) scrollbar.config(command=self.file_listbox.yview) # 绑定拖放事件 self.file_listbox.drop_target_register(DND_FILES) self.file_listbox.dnd_bind('<<Drop>>', self.handle_drop) # 进度条 self.progress_var = tk.DoubleVar() self.progress = ttk.Progressbar(main_frame, variable=self.progress_var, maximum=100) self.progress.pack(fill='x', pady=10) # 状态标签 self.status_var = tk.StringVar(value="就绪") tk.Label(main_frame, textvariable=self.status_var, **self.style).pack(pady=5) # 按钮区域 button_frame = tk.Frame(main_frame, bg='#f0f0f0') button_frame.pack(pady=10) tk.Button(button_frame, text="开始转换", command=self.start_conversion, **self.button_style).pack(side='left', padx=5) tk.Button(button_frame, text="清空列表", command=self.clear_list, **self.button_style).pack(side='left', padx=5) tk.Button(button_frame, text="退出", command=self.root.quit, **self.button_style).pack(side='left', padx=5) # 添加日志显示区域 log_frame = tk.Frame(main_frame, bg='#f0f0f0') log_frame.pack(fill='both', expand=True, pady=10) tk.Label(log_frame, text="转换日志:", **self.style).pack(anchor='w') # 创建带滚动条的日志文本框 self.log_text = tk.Text(log_frame, wrap=tk.WORD, width=80, height=10) scrollbar = tk.Scrollbar(log_frame) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) self.log_text.config(yscrollcommand=scrollbar.set) scrollbar.config(command=self.log_text.yview) # 配置日志队列和更新机制 self.log_queue = queue.Queue() self.setup_log_handler() self.root.after(100, self.update_log_display) def update_log_display(self): """定期更新日志显示""" while not self.log_queue.empty(): msg = self.log_queue.get() self.log_text.configure(state='normal') self.log_text.insert(tk.END, msg + '\n') self.log_text.configure(state='disabled') # 自动滚动到底部 self.log_text.see(tk.END) self.root.after(100, self.update_log_display) def setup_log_handler(self): """配置GUI日志处理器""" class QueueHandler(logging.Handler): def __init__(self, queue): super().__init__() self.queue = queue def emit(self, record): msg = self.format(record) self.queue.put(msg) # 创建并添加自定义处理器 handler = QueueHandler(self.log_queue) handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(handler) def handle_drop(self, event): """处理文件拖放事件""" files = event.data.split() for file in files: # 移除文件路径中的花括号(如果有) file = file.strip('{}') if file.lower().endswith('.pdf'): if file not in self.file_listbox.get(0, tk.END): self.file_listbox.insert(tk.END, file) self.path_var.set(os.path.dirname(file)) elif os.path.isdir(file): # 如果是目录,则添加目录下的所有PDF文件 pdf_files = find_pdf_files(file) for pdf_file in pdf_files: if pdf_file not in self.file_listbox.get(0, tk.END): self.file_listbox.insert(tk.END, pdf_file) self.path_var.set(file) def select_file(self): files = filedialog.askopenfilenames( title="选择PDF文件", filetypes=[("PDF文件", "*.pdf")] ) if files: for file in files: if file not in self.file_listbox.get(0, tk.END): self.file_listbox.insert(tk.END, file) self.path_var.set(os.path.dirname(files[0])) def select_directory(self): directory = filedialog.askdirectory(title="选择包含PDF文件的目录") if directory: self.path_var.set(directory) self.clear_list() pdf_files = find_pdf_files(directory) for file in pdf_files: self.file_listbox.insert(tk.END, file) def clear_list(self): self.file_listbox.delete(0, tk.END) self.progress_var.set(0) self.status_var.set("就绪") def start_conversion(self): files = list(self.file_listbox.get(0, tk.END)) if not files: messagebox.showwarning("警告", "请先选择要转换的PDF文件!") return total_files = len(files) self.progress_var.set(0) def update_progress(current, total): try: progress = (current / total) * 100 self.progress_var.set(progress) self.status_var.set(f"正在处理: {current}/{total}") self.root.update() except Exception as e: logger.error(f"更新进度时出错: {str(e)}") for i, file in enumerate(files, 1): try: self.status_var.set(f"正在处理: {os.path.basename(file)}") self.root.update() if process_single_pdf(file): self.file_listbox.itemconfig(i - 1, {'bg': '#90EE90'}) # 浅绿色表示成功 else: self.file_listbox.itemconfig(i - 1, {'bg': '#FFB6C1'}) # 浅红色表示失败 update_progress(i, total_files) except Exception as e: logger.error(f"处理文件失败: {str(e)}", exc_info=True) self.file_listbox.itemconfig(i - 1, {'bg': '#FFB6C1'}) self.status_var.set("转换完成!") messagebox.showinfo("完成", f"处理完成!共处理 {total_files} 个文件。") def validate_environment(): missing_deps = [] # 检查Poppler路径 if not os.path.isdir(POPPLER_PATH): missing_deps.append(f"Poppler路径不存在: {POPPLER_PATH}") else: # 检查是否存在关键可执行文件 required_files = ['pdfinfo.exe', 'pdftoppm.exe'] for file in required_files: if not os.path.isfile(os.path.join(POPPLER_PATH, file)): missing_deps.append(f"Poppler路径中缺失文件: {file}") # 检查Tesseract是否存在且是可执行文件 if not os.path.isfile(TESSERACT_CMD) or not os.access(TESSERACT_CMD, os.X_OK): missing_deps.append(f"Tesseract路径不正确或不可执行: {TESSERACT_CMD}") if missing_deps: for dep in missing_deps: logger.error(dep) messagebox.showerror("配置错误", "环境验证失败:\n" + "\n".join(missing_deps) + "\n请检查Poppler是否安装正确") return False return True def extract_text_from_image(image): """从图片中提取文字""" try: # 配置OCR参数 custom_config = r'--oem 3 --psm 6 -l chi_sim' text = pytesseract.image_to_string(image, config=custom_config) return text except Exception as e: logger.error(f"OCR识别出错: {str(e)}", exc_info=True) return "" def process_single_pdf(pdf_path): """处理单个PDF文件""" temp_file = None temp_path = None try: start_time = time.time() pdf_name = os.path.basename(pdf_path) word_path = os.path.splitext(pdf_path)[0] + '.docx' logger.info(f"▶ 开始处理: {pdf_name}") # 基础转换 cv = Converter(pdf_path) cv.convert(word_path) cv.close() logger.info(" 基础转换完成") # OCR处理 images = convert_from_path(pdf_path, poppler_path=POPPLER_PATH) total_pages = len(images) logger.info(f" 共检测到 {total_pages} 页需要OCR识别") # 创建临时文件来存储OCR识别的文本 temp_path = os.path.join(tempfile.gettempdir(), f'ocr_text_{int(time.time())}.txt') temp_file = open(temp_path, 'w', encoding='utf-8') # 对每一页进行OCR识别 for idx, image in enumerate(images): logger.info(f" 正在处理第 {idx + 1}/{total_pages} 页") text = extract_text_from_image(image) if text.strip(): # 如果识别到文字 temp_file.write(f"第{idx + 1}页识别到的文字:\n{text}\n\n") # 确保所有内容都写入文件 temp_file.flush() temp_file.close() temp_file = None # 将OCR识别的文字添加到Word文档中 if os.path.exists(temp_path): with open(temp_path, 'r', encoding='utf-8') as f: ocr_text = f.read() if ocr_text.strip(): logger.info(" 正在将OCR识别的文字添加到Word文档...") # 打开已转换的Word文档 doc = Document(word_path) # 添加OCR识别的文字 doc.add_paragraph("\nOCR识别结果:") doc.add_paragraph(ocr_text) # 保存文档 doc.save(word_path) logger.info(f" 已添加OCR识别结果") cost_time = time.time() - start_time logger.info(f"✓ 处理完成,耗时 {cost_time:.2f} 秒\n") return True except Exception as e: logger.error(f"转换失败: {str(e)}", exc_info=True) return False finally: # 确保临时文件被关闭和删除 if temp_file and not temp_file.closed: try: temp_file.close() except: pass if temp_path and os.path.exists(temp_path): try: os.unlink(temp_path) except: pass def find_pdf_files(directory): """递归查找所有PDF文件""" pdf_files = [] try: for root, _, files in os.walk(directory): for file in files: if file.lower().endswith('.pdf'): full_path = os.path.join(root, file) pdf_files.append(full_path) return pdf_files except Exception as e: logger.error(f"文件搜索失败: {str(e)}") return [] def main(): try: print("程序启动...") logger.info("程序启动") # 检查配置文件 config = load_config() if not config: print("首次运行,请进行配置...") logger.info("首次运行,启动配置界面") setup_config() config = load_config() if not config: print("配置失败,程序退出") logger.error("配置失败,程序退出") return # 设置全局变量 global POPPLER_PATH, TESSERACT_CMD, TESSDATA_PREFIX POPPLER_PATH = config.get('POPPLER_PATH') TESSERACT_CMD = config.get('TESSERACT_CMD') TESSERACT_DIR = os.path.dirname(TESSERACT_CMD) TESSDATA_PREFIX = os.path.join(TESSERACT_DIR, 'tessdata') os.environ['TESSDATA_PREFIX'] = TESSDATA_PREFIX # 新增:设置pytesseract的tesseract路径 pytesseract.pytesseract.tesseract_cmd = TESSERACT_CMD # 添加这一行 print(f"Poppler路径: {POPPLER_PATH}") print(f"Tesseract路径: {TESSERACT_CMD}") # 验证环境 if not validate_environment(): print("环境验证失败") logger.error("环境验证失败") messagebox.showerror("错误", "环境配置不正确,请检查配置!") return # 启动GUI print("启动GUI界面...") logger.info("启动GUI界面") app = PDFConverterGUI() app.root.mainloop() except Exception as e: error_msg = f"程序运行出错: {str(e)}\n{traceback.format_exc()}" print(error_msg) logger.error(error_msg) messagebox.showerror("错误", f"程序运行出错:{str(e)}") finally: # 确保所有日志都被写入 for handler in logger.handlers: handler.flush() handler.close() if __name__ == "__main__": main()

那么根据我的提供的代码帮我添加一下,在HTTP模式下登录成功,对该用户名添加cookie和httponly from utils import USER_FILE, UPLOAD_DIR, DOWNLOAD_DIR, load_users, save_user, check_user, check_user_md5 from bottle import Bottle, static_file, redirect, request, run, template, BaseRequest, response from bottle_auth import AuthPlugin import hashlib import os import ctypes import tarfile import tempfile from routes_en import setup_routes_en, cleanup_tar_files import threading import time USER_FILE = '/mnt/usrfs/webTest/user_auth.txt' UPLOAD_DIR = '/mnt/usrfs/upload' DOWNLOAD_DIR = '/mnt/usrfs/' BaseRequest.MEMFILE_MAX = 1024 * 1024 * 15 # 15MB # 5. 自定义 AuthPlugin,支持md5密码 class MD5AuthPlugin(AuthPlugin): def __init__(self, users): super().__init__(users) def check_auth(self, username, password): # password是明文,转md5后比对 pwd_md5 = hashlib.md5(password.encode()).hexdigest() return check_user_md5(username, pwd_md5) def require(self, f): def check_auth(*args, **kwargs): username = request.auth[0] if request.auth else None password = request.auth[1] if request.auth else None if not username or not password: response.headers['WWW-Authenticate'] = 'Basic realm=\"Login Required\"' response.status = 401 return "Authentication required" if not self.check_auth(username, password): response.headers['WWW-Authenticate'] = 'Basic realm=\"Login Required\"' response.status = 401 return "Authentication failed" return f(*args, **kwargs) return check_auth # 6. 创建 Bottle 应用 app = Bottle() users = load_users() auth = MD5AuthPlugin(users) app.install(auth) setup_routes_en(app,auth) # 启动清理线程 if __name__ == '__main__': # 如果没有用户文件,自动创建一个默认用户 if not os.path.exists(USER_FILE): save_user('root', 'root') # 使用最简单的配置 t = threading.Thread(target=cleanup_tar_files, daemon=True) t.start() app.run(host='0.0.0.0', port=80, debug=False, reloader=False)以上是main;from utils import USER_FILE, UPLOAD_DIR, DOWNLOAD_DIR, check_user from bottle import static_file, redirect, request, template, response from bottle import template from page_generators_en import get_basic_info_html_en, get_dev_status_html_en import tarfile import os import time from vtysh_cmd_send import VtyshCommandExecutor USER_FILE = '/mnt/usrfs/webTest/user_auth.txt' UPLOAD_DIR = '/mnt/usrfs/upload' DOWNLOAD_DIR = '/mnt/usrfs/' def setup_routes_en(app, auth): @app.route('/www/baseMsg_en.asp') def basic_info_www_en(): html = get_basic_info_html_en() return template('basic_info_en', basic_info_html_en=html) @app.route('/www/devStatus_en.asp') def dev_status_www_en(): html = get_dev_status_html_en() return template('dev_status_en', dev_status_html_en=html) @app.route('/www/ipaddr_en.asp') def ip_config_page(): return template('ip_address_en') @app.route('/api/execute_ip_config', method='POST') def execute_ip_config(): try: # 获取前端发送的配置 config = request.json # 验证配置 if not config or 'network' not in config: return {'success': False, 'error': 'Invalid configuration'} # 使用VtyshCommandExecutor执行配置 from vtysh_cmd_send import VtyshCommandExecutor executor = VtyshCommandExecutor("../bin/vtysh") result = executor.execute_config(config) return result except Exception as e: return {'success': False, 'error': str(e)} # def file_root_en(app): @app.route('/') def index(): return redirect('/www/index1/index_en.html') @app.route('/download_file/') def download_file(path): print(f"DEBUG: 下载请求路径: {path}") # 调试信息 abs_file = os.path.abspath(os.path.join(DOWNLOAD_DIR, path)) print(f"DEBUG: 绝对文件路径: {abs_file}") # 调试信息 # 检查DOWNLOAD_DIR是否存在 if not os.path.exists(DOWNLOAD_DIR): print(f"DEBUG: DOWNLOAD_DIR不存在") # 调试信息 return "DOWNLOAD_DIR不存在" if not abs_file.startswith(os.path.abspath(DOWNLOAD_DIR)): print(f"DEBUG: 路径安全检查失败") # 调试信息 return "非法路径" # 检查文件或目录是否存在 if not os.path.exists(abs_file): print(f"DEBUG: 文件/目录不存在: {abs_file}") # 调试信息 return f"文件/目录不存在: {abs_file}" # 如果是目录,创建tar文件 if os.path.isdir(abs_file): print(f"DEBUG: 开始打包目录: {abs_file}")# 调试信息 try: # 在DOWNLOAD_DIR中创建tar文件,而不是临时目录 tar_filename = os.path.basename(abs_file) + '.tar' tar_path = os.path.join(DOWNLOAD_DIR, tar_filename) print(f"DEBUG: tar文件路径: {tar_path}") # 调试信息 # 创建tar文件 with tarfile.open(tar_path, 'w') as tar: tar.add(abs_file, arcname=os.path.basename(abs_file)) # 检查tar文件是否创建成功 if os.path.exists(tar_path): file_size = os.path.getsize(tar_path) print(f"DEBUG: tar文件创建成功,大小: {file_size} 字节") # 调试信息 else: print(f"DEBUG: tar文件创建失败") # 调试信息 return "tar文件创建失败" print(f"DEBUG: 目录打包完成: {tar_path}") # 调试信息 print(f"DEBUG: 下载文件名: {tar_filename}") # 调试信息 # 使用static_file返回tar文件 # 设置响应头 response.set_header('Content-Type', 'application/x-tar') response.set_header('Content-Disposition', f'attachment; filename=\"{tar_filename}\"') response.set_header('Content-Length', str(file_size)) response.set_header('Accept-Ranges', 'bytes') # 直接 return 生成器 return generate_with_timeout(tar_path, timeout=60) except Exception as e: print(f"DEBUG: 打包目录失败: {e}") # 调试信息 return f"打包目录失败: {e}" # 如果是文件,直接下载 if not os.path.isfile(abs_file): print(f"DEBUG: 不是普通文件: {abs_file}") # 调试信息 return f"不是普通文件: {abs_file}" # 检查文件权限 if not os.access(abs_file, os.R_OK): print(f"DEBUG: 文件没有读取权限: {abs_file}") # 调试信息 return f"文件没有读取权限: {abs_file}" print(f"DEBUG: 文件存在,开始下载: {abs_file}") # 调试信息 filename = os.path.basename(abs_file) print(f"DEBUG: 下载文件名: {filename}") # 调试信息 # 使用正确的MIME类型和文件名 return static_file(filename, root=os.path.dirname(abs_file), download=filename) @app.route('/<filepath:path>') def serve_static(filepath): return static_file(filepath, root='/mnt/usrfs/') # 表单登录 @app.route('/action/login', method='POST') def do_login(): username = request.forms.get('username') password = request.forms.get('password') if check_user(username, password): return redirect('/www/index_en.htm') else: return "Login failed!" # 受保护路由 @app.route('/protected') @auth.require def protected(): return "你已通过 HTTP Basic Auth 认证!" @app.route('/www/web_upload_file_en.asp', method=['GET', 'POST']) def web_upload_file_en(): if request.method == 'GET': return template('upload_en') # 注意这里是 upload._en else: upload_file = request.files.get('uploadfile') if not upload_file or not upload_file.filename: return "没有选择文件!" save_path = os.path.join(UPLOAD_DIR, upload_file.filename) upload_file.save(save_path, overwrite=True) return f"文件已上传到: {save_path}返回" @app.route('/filebrowser') @app.route('/filebrowser/') def filebrowser(path=''): print(f"DEBUG: 访问路径: {path}") # 调试信息 abs_dir = os.path.abspath(os.path.join(DOWNLOAD_DIR, path)) print(f"DEBUG: 绝对路径: {abs_dir}") # 调试信息 if not abs_dir.startswith(os.path.abspath(DOWNLOAD_DIR)): print(f"DEBUG: 路径安全检查失败") # 调试信息 return "非法路径" if not os.path.isdir(abs_dir): print(f"DEBUG: 目录不存在: {abs_dir}") # 调试信息 return "目录不存在" items = [] for fname in sorted(os.listdir(abs_dir)): fpath = os.path.join(abs_dir, fname) is_dir = os.path.isdir(fpath) items.append({'name': fname, 'is_dir': is_dir}) parent = os.path.dirname(path) if path else None print(f"DEBUG: 父目录: {parent}") # 调试信息 return template('download_en', items=items, cur_path=path, parent=parent) def generate_with_timeout(file_path, timeout=60): start_time = time.time() with open(file_path, 'rb') as f: while True: if time.time() - start_time > timeout: break chunk = f.read(1024 * 1024) if not chunk: break yield chunk def cleanup_tar_files(): while True: now = time.time() for fname in os.listdir(DOWNLOAD_DIR): if fname.endswith('.tar'): fpath = os.path.join(DOWNLOAD_DIR, fname) try: if os.path.isfile(fpath): # 删除2分钟以前的tar文件 if now - os.path.getmtime(fpath) > 120: os.remove(fpath) print(f"DEBUG: 自动清理tar文件: {fpath}") except Exception as e: print(f"DEBUG: 清理tar文件失败: {e}") time.sleep(60) # 每分钟检查一次 以上是route部分

解释以下代码: SETTINGS['CAPTURE_MODE'] = "fd" # set to "no" if you want to see stdout/stderr in console logger = get_logger() ex = Experiment("pymarl") ex.logger = logger ex.captured_out_filter = apply_backspaces_and_linefeeds results_path = os.path.join(dirname(dirname(abspath(__file__))), "results") @ex.main def my_main(_run, _config, _log): # Setting the random seed throughout the modules config = config_copy(_config) np.random.seed(config["seed"]) th.manual_seed(config["seed"]) config['env_args']['seed'] = config["seed"] # run the framework run(_run, config, _log) def _get_config(params, arg_name, subfolder): config_name = None for _i, _v in enumerate(params): if _v.split("=")[0] == arg_name: config_name = _v.split("=")[1] del params[_i] break if config_name is not None: with open(os.path.join(os.path.dirname(__file__), "config", subfolder, "{}.yaml".format(config_name)), "r") as f: try: config_dict = yaml.load(f) except yaml.YAMLError as exc: assert False, "{}.yaml error: {}".format(config_name, exc) return config_dict def recursive_dict_update(d, u): for k, v in u.items(): if isinstance(v, collections.Mapping): d[k] = recursive_dict_update(d.get(k, {}), v) else: d[k] = v return d def config_copy(config): if isinstance(config, dict): return {k: config_copy(v) for k, v in config.items()} elif isinstance(config, list): return [config_copy(v) for v in config] else: return deepcopy(config) if __name__ == '__main__': params = deepcopy(sys.argv) # Get the defaults from default.yaml with open(os.path.join(os.path.dirname(__file__), "config", "default.yaml"), "r") as f: try: config_dict = yaml.load(f) except yaml.YAMLError as exc: assert False, "default.yaml error: {}".form

altgraph 0.17.4 bidict 0.23.1 blinker 1.9.0 cffi 1.17.1 click 8.2.1 colorama 0.4.6 dnspython 2.7.0 Flask 3.1.1 flask-cors 6.0.0 Flask-SocketIO 5.5.1 gevent 25.5.1 gevent-websocket 0.10.1 greenlet 3.2.3 h11 0.16.0 itsdangerous 2.2.0 Jinja2 3.1.6 MarkupSafe 3.0.2 Nuitka 2.7.6 ordered-set 4.1.0 packaging 25.0 pefile 2023.2.7 pip 22.0.4 portalocker 3.1.1 pyarmor 9.1.7 pyarmor.cli.core 7.6.7 pycparser 2.22 pyinstaller 6.14.0 pyinstaller-hooks-contrib 2025.4 python-engineio 4.12.2 python-socketio 5.13.0 pywin32 310 pywin32-ctypes 0.2.3 setuptools 58.1.0 simple-websocket 1.1.0 waitress 3.0.2 Werkzeug 3.1.3 wsproto 1.2.0 zope.event 5.0 zope.interface 7.2 zstandard 0.23.0以及打包软件的代码import tkinter as tk from tkinter import ttk, filedialog, messagebox import subprocess import threading import os import sys import shutil import json class PackerApp: def __init__(self, root): self.root = root root.title("EXE打包专家 v4.0") root.geometry("1800x900") self.running = False # 初始化变量 self.app_path = tk.StringVar() self.static_dir = tk.StringVar() self.data_path = tk.StringVar() self.icon_path = tk.StringVar() self.safe_pack = tk.BooleanVar(value=True) # 创建界面 self.create_widgets() # DPI适配 self.dpi_scaling() # 初始化日志系统 self.log_buffer = [] def dpi_scaling(self): """处理高DPI显示""" if sys.platform == "win32": try: from ctypes import windll windll.shcore.SetProcessDpiAwareness(1) except: pass def create_widgets(self): # 主框架 main_frame = ttk.Frame(self.root, padding=15) main_frame.pack(fill=tk.BOTH, expand=True) # 输入区域 input_frame = ttk.LabelFrame(main_frame, text="文件选择", padding=10) input_frame.pack(fill=tk.X, pady=5) self.create_file_row(input_frame, "主程序(.py):", self.app_path, "选择文件", self.select_app) self.create_file_row(input_frame, "静态目录:", self.static_dir, "选择目录", self.select_static_dir) self.create_file_row(input_frame, "数据文件(.json):", self.data_path, "选择文件", self.select_data) self.create_file_row(input_frame, "程序图标(.ico):", self.icon_path, "选择图标", self.select_icon) # 设置区域 options_frame = ttk.LabelFrame(main_frame, text="打包选项", padding=10) options_frame.pack(fill=tk.X, pady=5) ttk.Checkbutton(options_frame, text="单文件模式", variable=tk.BooleanVar(value=True)).grid(row=0, column=0, padx=5) ttk.Checkbutton(options_frame, text="无控制台窗口", variable=tk.BooleanVar(value=True)).grid(row=0, column=1, padx=5) ttk.Checkbutton(options_frame, text="安全混淆打包(防逆向)", variable=self.safe_pack).grid(row=0, column=2, padx=5) # 操作按钮 btn_frame = ttk.Frame(main_frame) btn_frame.pack(pady=10) ttk.Button(btn_frame, text="开始打包", command=self.start_pack).grid(row=0, column=0, padx=5) ttk.Button(btn_frame, text="清除日志", command=self.clear_log).grid(row=0, column=1, padx=5) # 日志区域 log_frame = ttk.LabelFrame(main_frame, text="打包日志", padding=10) log_frame.pack(fill=tk.BOTH, expand=True) self.log_text = tk.Text(log_frame, wrap=tk.WORD, state=tk.DISABLED, font=('Consolas', 20)) vsb = ttk.Scrollbar(log_frame, orient=tk.VERTICAL, command=self.log_text.yview) self.log_text.configure(yscrollcommand=vsb.set) self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) vsb.pack(side=tk.RIGHT, fill=tk.Y) def create_file_row(self, parent, label, var, btn_text, command): """创建统一文件选择行""" frame = ttk.Frame(parent) frame.pack(fill=tk.X, pady=10) ttk.Label(frame, text=label, width=20).pack(side=tk.LEFT) entry = ttk.Entry(frame, textvariable=var, width=30) entry.pack(side=tk.LEFT, padx=5, fill=tk.X, expand=True) ttk.Button(frame, text=btn_text, command=command).pack(side=tk.LEFT) def select_app(self): path = filedialog.askopenfilename(filetypes=[("Python文件", "*.py")]) if path: self.app_path.set(path) self.log(f"选择主程序: {path}") def select_static_dir(self): path = filedialog.askdirectory() if path: self.static_dir.set(path) self.log(f"选择静态目录: {path}") def select_data(self): path = filedialog.askopenfilename(filetypes=[("JSON文件", "*.json")]) if path: self.data_path.set(path) self.log(f"选择数据文件: {path}") def select_icon(self): path = filedialog.askopenfilename(filetypes=[("图标文件", "*.ico")]) if path: if self.validate_icon(path): self.icon_path.set(path) self.log(f"选择图标文件: {path}") else: messagebox.showerror("错误", "无效的ICO文件格式") def validate_icon(self, path): """验证ICO文件有效性""" try: with open(path, "rb") as f: header = f.read(4) return header == b'\x00\x00\x01\x00' except: return False def validate_inputs(self): """输入验证""" errors = [] required = [ (self.app_path, "主程序文件", os.path.isfile), (self.static_dir, "静态目录", os.path.isdir), (self.data_path, "数据文件", os.path.isfile) ] for var, name, check in required: path = var.get() if not path: errors.append(f"请选择{name}") elif not check(path): errors.append(f"{name}路径无效: {path}") if self.icon_path.get() and not os.path.isfile(self.icon_path.get()): errors.append("图标文件路径无效") return errors def start_pack(self): if self.running: return if errors := self.validate_inputs(): messagebox.showerror("输入错误", "\n".join(errors)) return self.running = True self.clear_log() self.log("开始打包进程...") threading.Thread(target=self.pack_process, daemon=True).start() def get_pyarmor_cmd(self, obf_dir, main_py): try: import pyarmor version = getattr(pyarmor, '__version__', '7.0.0') except ImportError: version = '7.0.0' if version.startswith('8') or version >= '8.0.0': # PyArmor 8.x 用 python -m pyarmor.cli obfuscate pyarmor_cmd = [ sys.executable, "-m", "pyarmor.cli", "obfuscate", "--output", obf_dir, main_py ] else: # PyArmor 7.x 直接用 pyarmor 命令行 pyarmor_cmd = [ "pyarmor", "pack", "-e", f"--dist {obf_dir}", main_py ] return pyarmor_cmd def pack_process(self): try: if self.safe_pack.get(): obf_dir = os.path.abspath(os.path.join(os.path.dirname(self.app_path.get()))) main_py = os.path.abspath(self.app_path.get()) pyarmor_cmd = self.get_pyarmor_cmd(obf_dir, main_py) self.log(f"执行PyArmor混淆: {' '.join(pyarmor_cmd)}") result = subprocess.run(pyarmor_cmd, capture_output=True, text=True) if result.stdout: self.log(result.stdout) if result.stderr: self.log(result.stderr) if result.returncode != 0: self.log("PyArmor混淆失败,请检查上方错误信息!") messagebox.showerror("PyArmor混淆失败", result.stderr or "未知错误") return # 这里要用原始文件名 py_file_to_pack = os.path.join(obf_dir, os.path.basename(main_py)) else: py_file_to_pack = self.app_path.get() cmd = [ 'pyinstaller', '--onefile', '--noconsole', '--clean', f'--add-data={self.static_dir.get()}{os.pathsep}static', f'--add-data={self.data_path.get()}{os.pathsep}.', f'--distpath={os.path.abspath("dist")}', f'--workpath={os.path.abspath("build")}' ] # 添加 eventlet 及依赖 hidden-import gevent_hidden_imports = [ '--hidden-import=gevent', '--hidden-import=gevent.monkey', '--hidden-import=geventwebsocket', '--hidden-import=geventwebsocket.handler', '--hidden-import=dns', '--hidden-import=dns.rdtypes', '--hidden-import=dns.rdtypes.ANY', '--hidden-import=dns.rdtypes.IN', '--hidden-import=dns.rdtypes.ANY.SOA', '--hidden-import=dns.rdtypes.ANY.SRV', '--hidden-import=dns.rdtypes.ANY.TXT', '--hidden-import=dns.rdtypes.ANY.NS', '--hidden-import=dns.rdtypes.ANY.CNAME', '--hidden-import=dns.rdtypes.ANY.MX', '--hidden-import=dns.rdtypes.ANY.PTR', '--hidden-import=dns.rdtypes.ANY.AAAA', '--hidden-import=dns.rdtypes.ANY.A', '--hidden-import=dns.rdtypes.IN.A', '--hidden-import=dns.rdtypes.IN.AAAA', '--hidden-import=dns.rdtypes.IN.MX', '--hidden-import=dns.rdtypes.IN.NS', '--hidden-import=dns.rdtypes.IN.PTR', '--hidden-import=dns.rdtypes.IN.SOA', '--hidden-import=dns.rdtypes.IN.SRV', '--hidden-import=dns.rdtypes.IN.TXT', '--hidden-import=dns.asyncbackend', '--hidden-import=dns.asyncquery', '--hidden-import=dns.asyncresolver', '--hidden-import=dns.e164', '--hidden-import=dns.namedict', '--hidden-import=dns.tsigkeyring', '--hidden-import=dns.zone', '--hidden-import=dns.dnssec', '--hidden-import=dns.versioned', '--hidden-import=greenlet', ] cmd.extend(gevent_hidden_imports) if self.icon_path.get(): cmd.append(f'--icon={self.icon_path.get()}') cmd.append(py_file_to_pack) self.log(f"执行命令: {' '.join(cmd)}") with subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True ) as proc: for line in iter(proc.stdout.readline, ''): self.log(line.strip()) exit_code = proc.wait() if exit_code == 0: self.log("打包成功完成!") messagebox.showinfo("成功", "EXE文件已生成在dist目录") else: self.log(f"打包失败,错误代码: {exit_code}") messagebox.showerror("错误", f"打包失败,请检查日志\n错误代码: {exit_code}") except Exception as e: self.log(f"发生未预期的错误: {str(e)}") messagebox.showerror("系统错误", str(e)) finally: self.running = False if os.path.exists("build"): shutil.rmtree("build", ignore_errors=True) if hasattr(self, "safe_pack") and self.safe_pack.get(): obf_dir = os.path.abspath(os.path.join(os.path.dirname(self.app_path.get()), "obf_temp")) if os.path.exists(obf_dir): shutil.rmtree(obf_dir, ignore_errors=True) self.log("打包进程结束") def log(self, message): self.log_buffer.append(message) self.update_log_display() def clear_log(self): self.log_text.config(state=tk.NORMAL) self.log_text.delete(1.0, tk.END) self.log_text.config(state=tk.DISABLED) def update_log_display(self): self.log_text.config(state=tk.NORMAL) while self.log_buffer: line = self.log_buffer.pop(0) self.log_text.insert(tk.END, line + "\n") self.log_text.see(tk.END) self.log_text.config(state=tk.DISABLED) if __name__ == "__main__": root = tk.Tk() app = PackerApp(root) root.protocol("WM_DELETE_WINDOW", lambda: root.destroy()) root.mainloop()

大家在看

recommend-type

高频双调谐谐振放大电路设计3MHz+电压200倍放大.zip

高频双调谐谐振放大电路设计3MHz+电压200倍放大.zip
recommend-type

只输入固定-vc实现windows多显示器编程的方法

P0.0 只输入固定 P0.1 P0CON.1 P0.2 P0CON.2 PORT_SET.PORT_REFEN P0.3 P0CON.3 自动“偷”从C2的交易应用程序在. PORT_SET.PORT_CLKEN PORT_SET.PORT_CLKOUT[0] P0.4 P0CON.4 C2调试的LED驱动器的时钟输入,如果作为 未启用. P0.5 PORT_CTRL.PORT_LED[1:0] 输出港口被迫为.阅读 实际LED驱动器的状态(开/关) 用户应阅读 RBIT_DATA.GPIO_LED_DRIVE 14只脚 不能用于在开发系统中,由于C2交易扰 乱输出. 参考区间的时钟频率 对抗 控制控制 评论评论 NVM的编程电压 VPP = 6.5 V 矩阵,和ROFF工业* PORT_CTRL 2 GPIO 1 矩阵,和ROFF工业* PORT_CTRL 3 参考 clk_ref GPIO 矩阵 4 C2DAT 产量 CLK_OUT GPIO 5 C2CLK LED驱动器 1 2 工业* PORT_CTRL 1 2 3 1 2 6 产量 CLK_OUT GPIO 1 2 1 1 1 PORT_SET.PORT_CLKEN PORT_SET.PORT_CLKOUT[1] P0.6 P0CON.6 P0.7 P0CON.7 P1.0 P1CON.0 P1.1 P1CON.1 7 8 9 GPIO GPIO GPIO 14只脚 14只脚 14只脚 *注:工业注:工业 代表“独立报”设置. “ 矩阵矩阵 and Roff 模式控制模拟垫电路. 116 修订版修订版1.0
recommend-type

半导体Semi ALD Tungsten W and TiN for Advanced Contact Application

ALD Tungsten, W and TiN for Advanced Contact Application
recommend-type

声纹识别数据集 IDMT-ISA-ELECTRIC-ENGINE

包含发动机正常、高负荷、损坏三种状态.wav声音片段,每种状态包含几百个片段,每个片段时长3S,可用于声纹类型识别,包含数据集介绍文档。
recommend-type

StepInt3-Plugin-x64:StepInt3插件(x64)-x64dbg的插件

StepInt3插件(x64)-x64dbg的插件 有关此插件的x86版本,请访问 概述 一个插件来解决int3断点异常 特征 自动跳过int3断点异常 从插件菜单启用/禁用的选项 如何安装 如果当前正在运行x64dbg(x64dbg 64位),请停止并退出。 将StepInt3.dp64复制到x64dbg\x64\plugins文件夹中。 启动x64dbg 信息 由撰写 使用 RadASM项目(.rap)用于管理和编译插件。 RadASM IDE可以在下载 该插件的x64版本使用 要构建此x64版本,还需要。 x64dbg x64dbg github x64dbg开关

最新推荐

recommend-type

kernel-4.19.90-52.29.v2207.ky10.x86-64.rpm

kernel-4.19.90-52.29.v2207.ky10.x86-64.rpm
recommend-type

多数据源管理与分表实践:MybatisPlus与ShardingJdbc整合

根据给定的文件信息,我们可以详细地解读其中涉及到的关键知识点,这些知识点包括Mybatis Plus的使用、ShardingJdbc的数据分片策略、Swagger的API文档生成能力,以及如何通过注解方式切换数据源。以下是详细的知识点分析: ### Mybatis Plus Mybatis Plus是一个Mybatis的增强工具,在Mybatis的基础上只做增强不做改变,为简化开发、提高效率而生。Mybatis Plus提供了如CRUD、分页、多数据源等一些列增强功能,并且可以与Spring、Spring Boot无缝集成。 #### 使用Mybatis Plus的优势: 1. **简化CRUD操作**:Mybatis Plus自带通用的Mapper和Service,减少代码量,提高开发效率。 2. **支持多种数据库**:支持主流的数据库如MySQL、Oracle、SQL Server等。 3. **逻辑删除**:可以在数据库层面实现记录的软删除功能,无需手动在业务中进行判断。 4. **分页插件**:提供默认的分页功能,支持自定义SQL、Lambda表达式等。 5. **性能分析插件**:方便分析SQL性能问题。 6. **代码生成器**:可以一键生成实体类、Mapper、Service和Controller代码,进一步提高开发效率。 #### 关键点: - **代码生成器**:位于`com.example.demo.common.codegenerator`包下的`GeneratorConfig`类中,用户需要根据实际的数据库配置更改数据库账号密码。 ### ShardingJdbc ShardingJDBC是当当网开源的轻量级Java框架,它在JDBC的层次提供了数据分片的能力。通过ShardingJDBC,可以在应用层面进行分库分表、读写分离、分布式主键等操作。 #### 分库分表: - 通过ShardingJDBC可以配置分库分表的策略,例如按照某个字段的值来决定记录应该保存在哪个分库或分表中。 - **Sharding策略**:可以定义多种分片策略,如模运算、查找表、时间范围等。 #### 关键点: - **注解切换数据源**:文件中提到通过注解的方式切换数据源,这允许开发者在编写代码时通过简单注解即可控制数据访问的路由规则。 ### Swagger Swagger是一个规范且完整的框架,用于生成、描述、调用和可视化RESTful风格的Web服务。总体目标是使客户端和文件系统作为服务器以同样的速度来更新。Swagger文件可让机器读取以了解远程服务的功能,并且可以作为浏览器插件,以便用户与远程服务互动。 #### 使用Swagger的优势: 1. **API文档自动生成**:Swagger可以根据代码中的注释直接生成文档。 2. **动态接口测试**:可以动态地对API接口进行测试。 3. **交互式文档**:提供交互式的API文档,可以实时地在线测试API。 #### 关键点: - **动态文档**:项目中集成Swagger后,可以在开发过程中动态更新API文档,便于团队协作和文档维护。 ### 如何使用 1. **准备工作**:在解压之前,需要更改数据源的IP、账号和密码,并执行resources下的SQL脚本。 2. **数据源配置**:在实际使用中,需要根据实际情况更改GeneratorConfig类中的dataSourceConfig()的数据库账号密码。 3. **代码生成**:启动代码生成器,根据设定的模板和策略生成相应的代码,包含实体类、Mapper、Service和Controller等。 4. **分库分表配置**:配置ShardingJDBC的分库分表策略,包括分片键和分片算法等。 5. **API文档**:集成Swagger,自动生成API文档并提供动态测试功能。 ### 结语 通过本文件所描述的内容,我们可以了解到如何利用Mybatis Plus、ShardingJdbc、Swagger这三大工具配合实现复杂的数据库操作和维护,以及如何通过注解在不同数据源之间灵活切换。用户在使用时应严格按照文件中的指示进行操作,确保数据库连接信息的正确配置,并结合代码生成器快速搭建项目基础结构。同时,应充分理解ShardingJdbc的分库分表策略,以及Swagger的API文档管理功能,以优化开发流程和提高工作效率。如果以上内容使用得当,这将是一个能够有效提升项目开发和维护质量的强大工具集。
recommend-type

BLE广播机制深度解析:XN297_TO_BLE.zip中的创新实践与应用指南

# 摘要 本文全面分析了蓝牙低功耗(BLE)广播机制的理论与实践应用,特别关注了XN297_TO_BLE.zip的开发与优化。通过详细探讨BLE广播的工作原理、数据包结构、以及XN297_TO_BLE.zip的设计理念与架构,本文为开发者提供了深入了解和实践BLE技术的框架。文中不仅介绍了如何搭建开发环境和编程实践,还深入讨论了
recommend-type

我用的就是idea的内置maven呀

<think>好的,用户现在提到他们使用的是IDEA的内置Maven,但之前遇到了“mvn不是内部或外部命令”的问题。我需要仔细分析可能的场景和用户的需求。 首先,用户可能是在IDEA内部使用Maven时没有问题,但在外部命令行中使用mvn命令时遇到了问题。这说明用户可能混淆了IDEA内置Maven和系统环境变量配置的关系。内置的Maven只在IDEA内部有效,并不会自动配置系统的环境变量,因此当用户在命令行中直接运行mvn时,系统找不到该命令,导致报错。 接下来,我需要确定用户的具体使用场景。用户可能是在尝试通过命令行执行Maven命令,或者是在其他需要系统环境变量支持的地方使用mvn。
recommend-type

环保主题植树节PPT模板设计赏析

标题“清新淡雅绿色环保植树节ppt模板”和描述“茂密的一棵卡通树,散落的绿叶,藤蔓线条,清新淡雅,绿色环保,312植树节ppt模板”共同体现了该PPT模板的设计风格和主题。该模板旨在宣传和庆祝植树节,同时强调了环保的理念。以下是对标题和描述中所蕴含知识点的详细说明: 1. 植树节的概念 植树节,是为了提高人们对森林资源的认识、倡导植树造林而设定的节日。不同国家的植树节日期可能不同,而在中国,“312”植树节(每年的3月12日)被广泛认知和庆祝。这个节日起源于20世纪初,是纪念孙中山先生的逝世纪念日,并逐渐演变为全民植树造林的活动日。 2. 绿色环保理念 绿色环保是指在人类活动中,采取相应的措施减少对环境的破坏,保护地球的自然资源和生态系统。这包括节能减排、资源循环利用、减少废弃物产生、提高能源效率等方面。该PPT模板采用“清新淡雅”的视觉元素,通过卡通形象和自然元素来传递环保的理念,使人们对环保有更深的认同感。 3. 卡通风格设计 模板使用了卡通风格来呈现内容,卡通风格设计通常更加生动、活泼,易于吸引观众的注意力,尤其适合儿童及青少年教育和宣传场合。卡通化的树木和藤蔓线条,可以更好地将植树节这一主题与观众尤其是年轻一代进行连接。 4. 清新淡雅的设计风格 “清新淡雅”是一种设计理念,强调色彩的温和、简洁的布局和舒适的视觉体验。在设计中,它通常表现为使用柔和的色调、简单的图形和没有过多装饰的版面,以创造出一种宁静、舒适的感觉。这种风格的模板适合用于教育、公益宣传等场合,易于传达温暖、积极的信息。 5. PPT模板的应用 PPT(PowerPoint演示文稿)是微软公司开发的一款演示软件,广泛用于商业汇报、教育授课、会议演讲和各类展示活动。一个精心设计的PPT模板可以提高演示的专业性和观赏性,同时通过统一的风格和格式,帮助使用者节省准备演示的时间和精力。模板中预设的版式、字体和配色可以被用户根据自己的需求进行调整和补充内容。 结合以上知识点,可以得出这个植树节PPT模板的设计意图和使用价值。它不仅具有美化演示文稿的作用,而且通过其环保主题和设计风格,传达了植树造林、保护环境的重要性。模板的视觉元素如卡通树木和藤蔓线条等,使得环保理念的表达更为直观和亲民,适合在植树节等环保主题活动上使用。
recommend-type

BLE调试必备:XN297_TO_BLE.zip故障排除与性能监控手册

# 摘要 本文详细介绍了BLE技术的基础知识,并针对XN297_TO_BLE.zip这一软件包进行了深入分析。通过对安装、配置、故障排查、性能优化、高级功能实现及案例研究等方面的探讨,提供了全面的实施指导和最佳实践。文章首先概括了BLE技术的核心要点,随后重点阐述了XN297_TO_BLE.zip的安装、初始配置以及功能验证,特别是在连接故障诊断、数据同步问题解决、性能
recommend-type

开源 文件管理

<think>好的,用户想找关于开源文件管理系统的信息,包括工具、项目或解决方案。首先,我需要回忆一下相关的开源项目。根据之前的引用,用户提到过Redmine、Taiga、ProjectLibre等开源项目管理软件,但可能这些更多是项目管理而非专门的文件管理。不过,用户还提到了Gollum,这是一个基于Git的文档管理系统,可能更贴近文件管理。 接下来,我需要区分文件管理系统和项目管理系统的不同。文件管理更侧重于文档的存储、版本控制、权限管理等,而项目管理则涉及任务跟踪、协作等。因此,可能需要专门针对文件管理的工具。比如,Nextcloud和OwnCloud都是开源的云存储解决方案,适合文件
recommend-type

CISP标准课件V4.2:信息安全与系统保障全面指南

在详细阐述CISP课件V4.2标准版的知识点之前,我们首先需要明确几个概念。首先,“CISP”指的是“Certified Information Security Professional”(注册信息安全专业人员),这是一个在信息安全领域内认可的专业认证资格。而“CISE”和“CISO”则是指信息安全工程师(Certified Information Security Engineer)和信息安全官(Certified Information Security Officer)的认证,它们都属于CISP的范畴。此外,“CISM”指的是“Certified Information Security Manager”(注册信息安全经理),这是另一个与CISP相关的信息安全专业认证。 根据给出的标题和描述,这份CISP课件V4.2标准版是针对上述信息安全相关认证的教材和学习资源,涵盖了信息安全领域中各类专业人士需要掌握的核心知识。课件的内容体系是以模块化的方式组织的,包括知识域、知识子域和知识点三个层次。具体地,以下是对这份课件中提及的知识点的详细解释: 1. 知识体系模块化结构 - 知识体系:指的是课件内容的整体框架,它将复杂的信息安全知识划分成不同的模块,便于学习者理解和记忆。 - 知识域:指的是整个信息安全领域内的一大类知识主题,例如“信息安全保障”、“网络安全监管”等。 - 知识子域:是在知识域基础上细分出来的子主题,它们构成了实现知识域目标的具体内容。 - 知识点:是在知识子域中进一步细分的小知识点,是学习者需要掌握的基础内容。 2. 知识点掌握程度分类 - 了解:这是基础层级,学习者需要对知识点的基本概念和原理有所认识,但不涉及深层次的应用和分析。 - 理解:这个层次要求学习者不仅了解知识点的基础概念和原理,还能够深入理解其内容,并在此基础上进行判断和推理。 - 掌握:这是最高层级,学习者不仅要了解和理解知识点,还必须能够在实践中灵活运用所学知识,解决实际问题。 3. 知识体系结构涵盖的知识域 - 信息安全保障:涉及组织和机构在信息安全方面的整体策略和措施。 - 网络安全监管:关注如何监管和管理网络安全,以保障网络空间的安全性。 - 信息安全管理:包括信息资产保护、安全政策和程序的制定与实施等内容。 - 业务连续性:讨论如何确保组织在发生信息安全事件后的业务连续性和恢复。 - 安全工程与运营:涉及安全系统的设计、实施和运维管理。 - 安全评估:包括信息安全风险评估和审计等评估活动。 - 信息安全支撑技术:介绍支持信息安全的关键技术和工具。 - 物理与网络通信安全:讲述如何保护物理资产和网络通信不受威胁。 - 计算环境安全:涉及服务器、工作站和其他计算环境的安全保护。 - 软件安全开发:着重于在软件开发过程中如何实现安全性。 综上所述,CISP课件V4.2标准版是一份综合性的学习资源,旨在通过模块化和层次化的方式,帮助信息安全相关专业人士深入掌握信息安全领域的专业知识和技能。这份课件不仅适合作为教材和教学资源,还能够作为学习者自我提升和考核的知识依据。此外,这份课件的文件名称列表中只列出了一个文件名称“CISP课件V4.2标准版 20190214”,表明当前获取的资源是一个特定版本的压缩包文件。在实际使用中,需要解压这个文件以获取其中的教学内容和材料。
recommend-type

BLE通信距离扩展技术:XN297_TO_BLE.zip解决方案与应用分享

# 摘要 随着无线通信技术的快速发展,蓝牙低能耗(BLE)技术因其低功耗和低成本特性在智能家居和健康监控设备中得到广泛应用。本文首先概述了BLE通信技术的工作原理及优势,并介绍了一个名为XN297_TO_BLE.zip的解决方案,该方案旨在扩展BLE的通信距离。接着,本文解析了该技术实现的关键技术点,包括硬件提升和软件优化措施,并通过实践应用案例展示
recommend-type

古文文字识别 开源

<think>好的,我现在需要帮助用户寻找用于古文文字识别的开源项目或工具。用户提到了古文OCR或者古文字识别的开源库。首先,我应该回顾一下用户提供的引用内容,看看是否有相关的信息。 查看引用[1],里面提到了PaddleOCR,指出其中英文双语教程文档完善,适合快速入手,并且有微信用户群答疑。虽然这里提到的是中英文,但可能PaddleOCR是否支持古文呢?需要进一步确认。接着引用[3]提到华为与高校、博物馆合作解决古文识别问题,特别是小篆等字体,但用户问的是开源项目,所以华为的可能不是开源的?不过这里可能可以推荐PaddleOCR,因为它被引用[1]提到,且用户可能可以调整模型来处理古文。