活动介绍
file-type

GitHub Actions新功能:等待外部构建系统状态

ZIP文件

下载需积分: 50 | 85KB | 更新于2025-08-15 | 28 浏览量 | 0 下载量 举报 收藏
download 立即下载
### 知识点 #### GitHub Action介绍 GitHub Action是GitHub平台提供的自动化工具,允许开发者自动化软件开发的各个阶段,从代码提交到CI/CD(持续集成与持续部署)。GitHub Actions可以针对特定的事件,如代码推送、Pull Request生成、定时任务等触发工作流(workflow),并在虚拟机中执行一系列命令。 #### 使用TypeScript创建GitHub Action GitHub Actions可以使用多种语言编写,包括JavaScript和TypeScript。TypeScript是JavaScript的一个超集,它在JavaScript的基础上添加了类型系统和对ES6+新特性的支持。通过使用TypeScript,开发者可以享受到更严格的类型检查和代码编辑器的智能提示,从而提升开发效率和代码质量。 #### 创建JavaScript操作的步骤 1. **使用模板创建新操作**:GitHub提供了一个模板仓库,可以帮助用户快速开始构建自己的GitHub Action。用户可以复制(使用“Use this template”按钮)这个模板到自己的仓库。 2. **安装依赖项**:使用npm(Node.js的包管理器)安装所需的依赖项。在本例中,执行命令`npm install`来安装。 3. **构建TypeScript代码**:TypeScript代码首先需要被编译成JavaScript代码才能在Node.js环境中运行。执行`npm run build`命令进行构建。 4. **打包操作**:构建后,需要将操作打包成一个可分发的格式,通常是压缩成一个ZIP文件。在模板中,可以通过`npm run pack`命令完成此操作。 5. **运行测试**:开发GitHub Action时,编写并运行测试非常重要,以确保操作的正确性。通过执行`npm test`来运行测试,并检查输出中是否显示所有测试已成功通过。 #### action.yml文件 `action.yml`文件是定义GitHub Action的关键,它包含了该Action的元数据和运行时行为。在`action.yml`中,需要指定操作的输入和输出,这样其他工作流可以依赖此操作并获取其输出数据。 - **输入(Inputs)**:可以定义其他GitHub工作流传递给该操作的输入参数。 - **输出(Outputs)**:可以定义此操作产生的输出参数,以便后续工作流使用。 #### 版本控制和发布 在创建GitHub Action后,可以对其进行版本控制和发布,以便其他用户和项目能够使用。版本控制通常使用语义化版本号(semver),并在发布时更新`action.yml`中的版本号。 ### 实际操作示例 假设我们正在开发一个名为`action-wait-for-status`的GitHub Action,其目的是等待外部构建系统的状态变更,然后根据状态做出相应的操作。以下是创建该Action的可能步骤: 1. **克隆模板仓库**:访问`action-wait-for-status`模板仓库页面,使用“Use this template”按钮复制到自己的GitHub账户。 2. **本地初始化**:将复制到本地,并初始化一个新的npm包。 ```bash git clone <template-repo-url> cd <action-name> npm init -y ``` 3. **安装TypeScript和其他依赖**:安装TypeScript及其编译器。 ```bash npm install typescript @types/node --save-dev ``` 4. **编写TypeScript代码**:在项目中编写TypeScript代码,实现等待外部构建系统状态的功能。 5. **配置TypeScript编译**:在项目根目录创建`tsconfig.json`文件,配置TypeScript编译选项。 6. **构建和测试**:编写npm脚本来编译TypeScript代码,并在`./dist`目录生成JavaScript文件。同时编写测试文件,并使用npm脚本运行它们。 ```bash npm run build npm run test ``` 7. **更新`action.yml`文件**:配置Action的输入输出参数。 8. **打包Action**:确保测试通过后,使用npm脚本打包Action到ZIP文件。 ```bash npm run pack ``` 9. **版本控制和发布**:一旦验证了Action的功能并且准备就绪,就可以推送到GitHub,并在GitHub仓库页面上发布一个新的版本。 10. **使用Action**:在其他GitHub仓库的工作流中,按照GitHub Actions的规范引用和使用发布的Action。 通过以上步骤,我们可以创建一个GitHub Action,即`action-wait-for-status`,它可以被整合到GitHub工作流中,帮助开发者在CI/CD过程中实现对构建系统状态的监控。

相关推荐

filetype

sftp -vvv sftp_dy@localhost debug3: spawning "C:\\Program Files\\OpenSSH\\ssh.exe" "-oForwardX11 no" "-oPermitLocalCommand no" "-oClearAllForwardings yes" -v -v -v "-oForwardAgent no" -l sftp_dy -s -- localhost sftp as subprocess OpenSSH_for_Windows_9.8p2 Win32-OpenSSH-GitHub, LibreSSL 4.0.0 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/config error:2 debug3: Failed to open file:C:/ProgramData/ssh/ssh_config error:2 debug3: expanded UserKnownHostsFile '~/.ssh/known_hosts' -> 'C:\\Users\\zwx1262540/.ssh/known_hosts' debug3: expanded UserKnownHostsFile '~/.ssh/known_hosts2' -> 'C:\\Users\\zwx1262540/.ssh/known_hosts2' debug2: resolving "localhost" port 22 debug3: resolve_host: lookup localhost:22 debug3: channel_clear_timeouts: clearing debug3: ssh_connect_direct: entering debug1: Connecting to localhost [127.0.0.1] port 22. debug1: Connection established. debug1: identity file C:\\Users\\zwx1262540/.ssh/id_rsa type 0 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_rsa-cert error:2 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_rsa-cert.pub error:2 debug3: failed to open file:C:/Users/zwx1262540/.ssh/id_rsa-cert error:2 debug1: identity file C:\\Users\\zwx1262540/.ssh/id_rsa-cert type -1 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_ecdsa error:2 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_ecdsa.pub error:2 debug3: failed to open file:C:/Users/zwx1262540/.ssh/id_ecdsa error:2 debug1: identity file C:\\Users\\zwx1262540/.ssh/id_ecdsa type -1 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_ecdsa-cert error:2 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_ecdsa-cert.pub error:2 debug3: failed to open file:C:/Users/zwx1262540/.ssh/id_ecdsa-cert error:2 debug1: identity file C:\\Users\\zwx1262540/.ssh/id_ecdsa-cert type -1 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_ecdsa_sk error:2 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_ecdsa_sk.pub error:2 debug3: failed to open file:C:/Users/zwx1262540/.ssh/id_ecdsa_sk error:2 debug1: identity file C:\\Users\\zwx1262540/.ssh/id_ecdsa_sk type -1 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_ecdsa_sk-cert error:2 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_ecdsa_sk-cert.pub error:2 debug3: failed to open file:C:/Users/zwx1262540/.ssh/id_ecdsa_sk-cert error:2 debug1: identity file C:\\Users\\zwx1262540/.ssh/id_ecdsa_sk-cert type -1 debug1: identity file C:\\Users\\zwx1262540/.ssh/id_ed25519 type 3 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_ed25519-cert error:2 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_ed25519-cert.pub error:2 debug3: failed to open file:C:/Users/zwx1262540/.ssh/id_ed25519-cert error:2 debug1: identity file C:\\Users\\zwx1262540/.ssh/id_ed25519-cert type -1 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_ed25519_sk error:2 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_ed25519_sk.pub error:2 debug3: failed to open file:C:/Users/zwx1262540/.ssh/id_ed25519_sk error:2 debug1: identity file C:\\Users\\zwx1262540/.ssh/id_ed25519_sk type -1 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_ed25519_sk-cert error:2 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_ed25519_sk-cert.pub error:2 debug3: failed to open file:C:/Users/zwx1262540/.ssh/id_ed25519_sk-cert error:2 debug1: identity file C:\\Users\\zwx1262540/.ssh/id_ed25519_sk-cert type -1 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_xmss error:2 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_xmss.pub error:2 debug3: failed to open file:C:/Users/zwx1262540/.ssh/id_xmss error:2 debug1: identity file C:\\Users\\zwx1262540/.ssh/id_xmss type -1 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_xmss-cert error:2 debug3: Failed to open file:C:/Users/zwx1262540/.ssh/id_xmss-cert.pub error:2 debug3: failed to open file:C:/Users/zwx1262540/.ssh/id_xmss-cert error:2 debug1: identity file C:\\Users\\zwx1262540/.ssh/id_xmss-cert type -1 debug1: Local version string SSH-2.0-OpenSSH_for_Windows_9.8 Win32-OpenSSH-GitHub debug1: Remote protocol version 2.0, remote software version OpenSSH_for_Windows_9.8 Win32-OpenSSH-GitHub debug1: compat_banner: match: OpenSSH_for_Windows_9.8 Win32-OpenSSH-GitHub pat OpenSSH* compat 0x04000000 debug2: fd 3 setting O_NONBLOCK debug1: Authenticating to localhost:22 as 'sftp_dy' debug3: record_hostkey: found key type ECDSA in file C:\\Users\\zwx1262540/.ssh/known_hosts:2 debug3: load_hostkeys_file: loaded 1 keys from localhost debug3: Failed to open file:C:/Users/zwx1262540/.ssh/known_hosts2 error:2 debug1: load_hostkeys: fopen C:\\Users\\zwx1262540/.ssh/known_hosts2: No such file or directory debug3: Failed to open file:C:/ProgramData/ssh/ssh_known_hosts error:2 debug1: load_hostkeys: fopen __PROGRAMDATA__\\ssh/ssh_known_hosts: No such file or directory debug3: Failed to open file:C:/ProgramData/ssh/ssh_known_hosts2 error:2 debug1: load_hostkeys: fopen __PROGRAMDATA__\\ssh/ssh_known_hosts2: No such file or directory debug3: order_hostkeyalgs: prefer hostkeyalgs: [email protected],ecdsa-sha2-nistp384 debug3: send packet: type 20 debug1: SSH2_MSG_KEXINIT sent debug3: receive packet: type 20 debug1: SSH2_MSG_KEXINIT received debug2: local client KEXINIT proposal debug2: KEX algorithms: curve25519-sha256,[email protected],ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group-exchange-sha256,diffie-hellman-group16-sha512,diffie-hellman-group18-sha512,diffie-hellman-group14-sha256,ext-info-c,[email protected] debug2: host key algorithms: [email protected],ecdsa-sha2-nistp384,[email protected],[email protected],[email protected],[email protected],[email protected],[email protected],[email protected],ssh-ed25519,ecdsa-sha2-nistp256,ecdsa-sha2-nistp521,[email protected],[email protected],rsa-sha2-512,rsa-sha2-256 debug2: ciphers ctos: [email protected],aes128-ctr,aes192-ctr,aes256-ctr,[email protected],[email protected] debug2: ciphers stoc: [email protected],aes128-ctr,aes192-ctr,aes256-ctr,[email protected],[email protected] debug2: MACs ctos: [email protected],[email protected],[email protected],[email protected],[email protected],[email protected],hmac-sha2-256,hmac-sha2-512 debug2: MACs stoc: [email protected],[email protected],[email protected],[email protected],[email protected],[email protected],hmac-sha2-256,hmac-sha2-512 debug2: compression ctos: none,[email protected],zlib debug2: compression stoc: none,[email protected],zlib debug2: languages ctos: debug2: languages stoc: debug2: first_kex_follows 0 debug2: reserved 0 debug2: peer server KEXINIT proposal debug2: KEX algorithms: curve25519-sha256,[email protected],ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group-exchange-sha256,diffie-hellman-group16-sha512,diffie-hellman-group18-sha512,diffie-hellman-group14-sha256,ext-info-s,[email protected] debug2: host key algorithms: rsa-sha2-512,rsa-sha2-256,ecdsa-sha2-nistp384,ssh-ed25519 debug2: ciphers ctos: [email protected],aes128-ctr,aes192-ctr,aes256-ctr,[email protected],[email protected] debug2: ciphers stoc: [email protected],aes128-ctr,aes192-ctr,aes256-ctr,[email protected],[email protected] debug2: MACs ctos: [email protected],[email protected],[email protected],[email protected],[email protected],[email protected],hmac-sha2-256,hmac-sha2-512 debug2: MACs stoc: [email protected],[email protected],[email protected],[email protected],[email protected],[email protected],hmac-sha2-256,hmac-sha2-512 debug2: compression ctos: none,[email protected] debug2: compression stoc: none,[email protected] debug2: languages ctos: debug2: languages stoc: debug2: first_kex_follows 0 debug2: reserved 0 debug3: kex_choose_conf: will use strict KEX ordering debug1: kex: algorithm: curve25519-sha256 debug1: kex: host key algorithm: ecdsa-sha2-nistp384 debug1: kex: server->client cipher: [email protected] MAC: <implicit> compression: none debug1: kex: client->server cipher: [email protected] MAC: <implicit> compression: none debug3: send packet: type 30 debug1: expecting SSH2_MSG_KEX_ECDH_REPLY debug3: receive packet: type 31 debug1: SSH2_MSG_KEX_ECDH_REPLY received debug1: Server host key: ecdsa-sha2-nistp384 SHA256:UyANwfU8ig+Uf3gKnpza2NfNH19MfjRQxURpwC7ou0U debug3: record_hostkey: found key type ECDSA in file C:\\Users\\zwx1262540/.ssh/known_hosts:2 debug3: load_hostkeys_file: loaded 1 keys from localhost debug3: Failed to open file:C:/Users/zwx1262540/.ssh/known_hosts2 error:2 debug1: load_hostkeys: fopen C:\\Users\\zwx1262540/.ssh/known_hosts2: No such file or directory debug3: Failed to open file:C:/ProgramData/ssh/ssh_known_hosts error:2 debug1: load_hostkeys: fopen __PROGRAMDATA__\\ssh/ssh_known_hosts: No such file or directory debug3: Failed to open file:C:/ProgramData/ssh/ssh_known_hosts2 error:2 debug1: load_hostkeys: fopen __PROGRAMDATA__\\ssh/ssh_known_hosts2: No such file or directory debug1: Host 'localhost' is known and matches the ECDSA host key. debug1: Found key in C:\\Users\\zwx1262540/.ssh/known_hosts:2 debug3: send packet: type 21 debug1: ssh_packet_send2_wrapped: resetting send seqnr 3 debug2: ssh_set_newkeys: mode 1 debug1: rekey out after 134217728 blocks debug1: SSH2_MSG_NEWKEYS sent debug1: Sending SSH2_MSG_EXT_INFO debug3: send packet: type 7 debug1: expecting SSH2_MSG_NEWKEYS debug3: receive packet: type 21 debug1: ssh_packet_read_poll2: resetting read seqnr 3 debug1: SSH2_MSG_NEWKEYS received debug2: ssh_set_newkeys: mode 0 debug1: rekey in after 134217728 blocks debug2: KEX algorithms: curve25519-sha256,[email protected],ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group-exchange-sha256,diffie-hellman-group16-sha512,diffie-hellman-group18-sha512,diffie-hellman-group14-sha256,ext-info-c,[email protected] debug2: host key algorithms: [email protected],ecdsa-sha2-nistp384,[email protected],[email protected],[email protected],[email protected],[email protected],[email protected],[email protected],ssh-ed25519,ecdsa-sha2-nistp256,ecdsa-sha2-nistp521,[email protected],[email protected],rsa-sha2-512,rsa-sha2-256 debug2: ciphers ctos: [email protected],aes128-ctr,aes192-ctr,aes256-ctr,[email protected],[email protected] debug2: ciphers stoc: [email protected],aes128-ctr,aes192-ctr,aes256-ctr,[email protected],[email protected] debug2: MACs ctos: [email protected],[email protected],[email protected],[email protected],[email protected],[email protected],hmac-sha2-256,hmac-sha2-512 debug2: MACs stoc: [email protected],[email protected],[email protected],[email protected],[email protected],[email protected],hmac-sha2-256,hmac-sha2-512 debug2: compression ctos: none,[email protected],zlib debug2: compression stoc: none,[email protected],zlib debug2: languages ctos: debug2: languages stoc: debug2: first_kex_follows 0 debug2: reserved 0 debug3: send packet: type 5 debug3: receive packet: type 7 debug1: SSH2_MSG_EXT_INFO received debug3: kex_input_ext_info: extension server-sig-algs debug1: kex_ext_info_client_parse: server-sig-algs=<ssh-ed25519,ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521,[email protected],[email protected],rsa-sha2-512,rsa-sha2-256> debug3: kex_input_ext_info: extension [email protected] debug1: kex_ext_info_check_ver: [email protected]=<0> debug3: kex_input_ext_info: extension [email protected] debug1: kex_ext_info_check_ver: [email protected]=<0> debug3: receive packet: type 6 debug2: service_accept: ssh-userauth debug1: SSH2_MSG_SERVICE_ACCEPT received debug3: send packet: type 50 debug3: receive packet: type 7 debug1: SSH2_MSG_EXT_INFO received debug3: kex_input_ext_info: extension server-sig-algs debug1: kex_ext_info_client_parse: server-sig-algs=<ssh-ed25519,ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521,[email protected],[email protected],rsa-sha2-512,rsa-sha2-256> debug3: receive packet: type 51 debug1: Authentications that can continue: password debug3: start over, passed a different list password debug3: preferred publickey,keyboard-interactive,password debug3: authmethod_lookup password debug3: remaining preferred: ,keyboard-interactive,password debug3: authmethod_is_enabled password debug1: Next authentication method: password sftp_dy@localhost's password: debug3: send packet: type 50 debug2: we sent a password packet, wait for reply debug3: receive packet: type 51 debug1: Authentications that can continue: password Permission denied, please try again. sftp_dy@localhost's password: debug3: send packet: type 50 debug2: we sent a password packet, wait for reply debug3: receive packet: type 51 debug1: Authentications that can continue: password Permission denied, please try again.

filetype

参考我给你的图像上传代码和下面的图像识别并在oled上显示的代码,将他们整合在一起/* Edge Impulse Arduino examples * Copyright (c) 2022 EdgeImpulse Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ // These sketches are tested with 2.0.4 ESP32 Arduino Core // https://github.com/espressif/arduino-esp32/releases/tag/2.0.4 /* Includes ---------------------------------------------------------------- */ #include <shibie_inferencing.h> #include "edge-impulse-sdk/dsp/image/image.hpp" #include "esp_camera.h" // Select camera model - find more camera models in camera_pins.h file here // https://github.com/espressif/arduino-esp32/blob/master/libraries/ESP32/examples/Camera/CameraWebServer/camera_pins.h //#define CAMERA_MODEL_ESP_EYE // Has PSRAM #define CAMERA_MODEL_AI_THINKER // Has PSRAM #if defined(CAMERA_MODEL_ESP_EYE) #define PWDN_GPIO_NUM -1 #define RESET_GPIO_NUM -1 #define XCLK_GPIO_NUM 4 #define SIOD_GPIO_NUM 18 #define SIOC_GPIO_NUM 23 #define Y9_GPIO_NUM 36 #define Y8_GPIO_NUM 37 #define Y7_GPIO_NUM 38 #define Y6_GPIO_NUM 39 #define Y5_GPIO_NUM 35 #define Y4_GPIO_NUM 14 #define Y3_GPIO_NUM 13 #define Y2_GPIO_NUM 34 #define VSYNC_GPIO_NUM 5 #define HREF_GPIO_NUM 27 #define PCLK_GPIO_NUM 25 #elif defined(CAMERA_MODEL_AI_THINKER) #define PWDN_GPIO_NUM 32 #define RESET_GPIO_NUM -1 #define XCLK_GPIO_NUM 0 #define SIOD_GPIO_NUM 26 #define SIOC_GPIO_NUM 27 #define Y9_GPIO_NUM 35 #define Y8_GPIO_NUM 34 #define Y7_GPIO_NUM 39 #define Y6_GPIO_NUM 36 #define Y5_GPIO_NUM 21 #define Y4_GPIO_NUM 19 #define Y3_GPIO_NUM 18 #define Y2_GPIO_NUM 5 #define VSYNC_GPIO_NUM 25 #define HREF_GPIO_NUM 23 #define PCLK_GPIO_NUM 22 #else #error "Camera model not selected" #endif /* Constant defines -------------------------------------------------------- */ #define EI_CAMERA_RAW_FRAME_BUFFER_COLS 320 #define EI_CAMERA_RAW_FRAME_BUFFER_ROWS 240 #define EI_CAMERA_FRAME_BYTE_SIZE 3 /* Private variables ------------------------------------------------------- */ static bool debug_nn = false; // Set this to true to see e.g. features generated from the raw signal static bool is_initialised = false; uint8_t *snapshot_buf; //points to the output of the capture static camera_config_t camera_config = { .pin_pwdn = PWDN_GPIO_NUM, .pin_reset = RESET_GPIO_NUM, .pin_xclk = XCLK_GPIO_NUM, .pin_sscb_sda = SIOD_GPIO_NUM, .pin_sscb_scl = SIOC_GPIO_NUM, .pin_d7 = Y9_GPIO_NUM, .pin_d6 = Y8_GPIO_NUM, .pin_d5 = Y7_GPIO_NUM, .pin_d4 = Y6_GPIO_NUM, .pin_d3 = Y5_GPIO_NUM, .pin_d2 = Y4_GPIO_NUM, .pin_d1 = Y3_GPIO_NUM, .pin_d0 = Y2_GPIO_NUM, .pin_vsync = VSYNC_GPIO_NUM, .pin_href = HREF_GPIO_NUM, .pin_pclk = PCLK_GPIO_NUM, //XCLK 20MHz or 10MHz for OV2640 double FPS (Experimental) .xclk_freq_hz = 20000000, .ledc_timer = LEDC_TIMER_0, .ledc_channel = LEDC_CHANNEL_0, .pixel_format = PIXFORMAT_JPEG, //YUV422,GRAYSCALE,RGB565,JPEG .frame_size = FRAMESIZE_QVGA, //QQVGA-UXGA Do not use sizes above QVGA when not JPEG .jpeg_quality = 12, //0-63 lower number means higher quality .fb_count = 1, //if more than one, i2s runs in continuous mode. Use only with JPEG .fb_location = CAMERA_FB_IN_PSRAM, .grab_mode = CAMERA_GRAB_WHEN_EMPTY, }; /* Function definitions ------------------------------------------------------- */ bool ei_camera_init(void); void ei_camera_deinit(void); bool ei_camera_capture(uint32_t img_width, uint32_t img_height, uint8_t *out_buf) ; /** * @brief Arduino setup function */ void setup() { // put your setup code here, to run once: Serial.begin(115200); //comment out the below line to start inference immediately after upload while (!Serial); Serial.println("Edge Impulse Inferencing Demo"); if (ei_camera_init() == false) { ei_printf("Failed to initialize Camera!\r\n"); } else { ei_printf("Camera initialized\r\n"); } ei_printf("\nStarting continious inference in 2 seconds...\n"); ei_sleep(2000); } /** * @brief Get data and run inferencing * * @param[in] debug Get debug info if true */ void loop() { // instead of wait_ms, we'll wait on the signal, this allows threads to cancel us... if (ei_sleep(5) != EI_IMPULSE_OK) { return; } snapshot_buf = (uint8_t*)malloc(EI_CAMERA_RAW_FRAME_BUFFER_COLS * EI_CAMERA_RAW_FRAME_BUFFER_ROWS * EI_CAMERA_FRAME_BYTE_SIZE); // check if allocation was successful if(snapshot_buf == nullptr) { ei_printf("ERR: Failed to allocate snapshot buffer!\n"); return; } ei::signal_t signal; signal.total_length = EI_CLASSIFIER_INPUT_WIDTH * EI_CLASSIFIER_INPUT_HEIGHT; signal.get_data = &ei_camera_get_data; if (ei_camera_capture((size_t)EI_CLASSIFIER_INPUT_WIDTH, (size_t)EI_CLASSIFIER_INPUT_HEIGHT, snapshot_buf) == false) { ei_printf("Failed to capture image\r\n"); free(snapshot_buf); return; } // Run the classifier ei_impulse_result_t result = { 0 }; EI_IMPULSE_ERROR err = run_classifier(&signal, &result, debug_nn); if (err != EI_IMPULSE_OK) { ei_printf("ERR: Failed to run classifier (%d)\n", err); return; } // print the predictions ei_printf("Predictions (DSP: %d ms., Classification: %d ms., Anomaly: %d ms.): \n", result.timing.dsp, result.timing.classification, result.timing.anomaly); #if EI_CLASSIFIER_OBJECT_DETECTION == 1 ei_printf("Object detection bounding boxes:\r\n"); for (uint32_t i = 0; i < result.bounding_boxes_count; i++) { ei_impulse_result_bounding_box_t bb = result.bounding_boxes[i]; if (bb.value == 0) { continue; } ei_printf(" %s (%f) [ x: %u, y: %u, width: %u, height: %u ]\r\n", bb.label, bb.value, bb.x, bb.y, bb.width, bb.height); } // Print the prediction results (classification) #else ei_printf("Predictions:\r\n"); for (uint16_t i = 0; i < EI_CLASSIFIER_LABEL_COUNT; i++) { ei_printf(" %s: ", ei_classifier_inferencing_categories[i]); ei_printf("%.5f\r\n", result.classification[i].value); } #endif // Print anomaly result (if it exists) #if EI_CLASSIFIER_HAS_ANOMALY ei_printf("Anomaly prediction: %.3f\r\n", result.anomaly); #endif #if EI_CLASSIFIER_HAS_VISUAL_ANOMALY ei_printf("Visual anomalies:\r\n"); for (uint32_t i = 0; i < result.visual_ad_count; i++) { ei_impulse_result_bounding_box_t bb = result.visual_ad_grid_cells[i]; if (bb.value == 0) { continue; } ei_printf(" %s (%f) [ x: %u, y: %u, width: %u, height: %u ]\r\n", bb.label, bb.value, bb.x, bb.y, bb.width, bb.height); } #endif free(snapshot_buf); } /** * @brief Setup image sensor & start streaming * * @retval false if initialisation failed */ bool ei_camera_init(void) { if (is_initialised) return true; #if defined(CAMERA_MODEL_ESP_EYE) pinMode(13, INPUT_PULLUP); pinMode(14, INPUT_PULLUP); #endif //initialize the camera esp_err_t err = esp_camera_init(&camera_config); if (err != ESP_OK) { Serial.printf("Camera init failed with error 0x%x\n", err); return false; } sensor_t * s = esp_camera_sensor_get(); // initial sensors are flipped vertically and colors are a bit saturated if (s->id.PID == OV3660_PID) { s->set_vflip(s, 1); // flip it back s->set_brightness(s, 1); // up the brightness just a bit s->set_saturation(s, 0); // lower the saturation } #if defined(CAMERA_MODEL_M5STACK_WIDE) s->set_vflip(s, 1); s->set_hmirror(s, 1); #elif defined(CAMERA_MODEL_ESP_EYE) s->set_vflip(s, 1); s->set_hmirror(s, 1); s->set_awb_gain(s, 1); #endif is_initialised = true; return true; } /** * @brief Stop streaming of sensor data */ void ei_camera_deinit(void) { //deinitialize the camera esp_err_t err = esp_camera_deinit(); if (err != ESP_OK) { ei_printf("Camera deinit failed\n"); return; } is_initialised = false; return; } /** * @brief Capture, rescale and crop image * * @param[in] img_width width of output image * @param[in] img_height height of output image * @param[in] out_buf pointer to store output image, NULL may be used * if ei_camera_frame_buffer is to be used for capture and resize/cropping. * * @retval false if not initialised, image captured, rescaled or cropped failed * */ bool ei_camera_capture(uint32_t img_width, uint32_t img_height, uint8_t *out_buf) { bool do_resize = false; if (!is_initialised) { ei_printf("ERR: Camera is not initialized\r\n"); return false; } camera_fb_t *fb = esp_camera_fb_get(); if (!fb) { ei_printf("Camera capture failed\n"); return false; } bool converted = fmt2rgb888(fb->buf, fb->len, PIXFORMAT_JPEG, snapshot_buf); esp_camera_fb_return(fb); if(!converted){ ei_printf("Conversion failed\n"); return false; } if ((img_width != EI_CAMERA_RAW_FRAME_BUFFER_COLS) || (img_height != EI_CAMERA_RAW_FRAME_BUFFER_ROWS)) { do_resize = true; } if (do_resize) { ei::image::processing::crop_and_interpolate_rgb888( out_buf, EI_CAMERA_RAW_FRAME_BUFFER_COLS, EI_CAMERA_RAW_FRAME_BUFFER_ROWS, out_buf, img_width, img_height); } return true; } static int ei_camera_get_data(size_t offset, size_t length, float *out_ptr) { // we already have a RGB888 buffer, so recalculate offset into pixel index size_t pixel_ix = offset * 3; size_t pixels_left = length; size_t out_ptr_ix = 0; while (pixels_left != 0) { // Swap BGR to RGB here // due to https://github.com/espressif/esp32-camera/issues/379 out_ptr[out_ptr_ix] = (snapshot_buf[pixel_ix + 2] << 16) + (snapshot_buf[pixel_ix + 1] << 8) + snapshot_buf[pixel_ix]; // go to the next pixel out_ptr_ix++; pixel_ix+=3; pixels_left--; } // and done! return 0; } #if !defined(EI_CLASSIFIER_SENSOR) || EI_CLASSIFIER_SENSOR != EI_CLASSIFIER_SENSOR_CAMERA使用下面的头文件#include <WiFi.h> #include "esp_camera.h" #include <shibie_inferencing.h> #include "edge-impulse-sdk/dsp/image/image.hpp" #include "freertos/semphr.h" // 互斥锁头文件 #include "esp_task_wdt.h" #include "freertos/task.h" #include "esp_http_server.h" #error "Invalid model for current sensor" #endif

filetype

现在有个问题,我选择账号1进行抖音数据采集视频列表,然后切换平台的小红书的时候,发现账号1在转圈采集,好像显示抖音账号1和小红书账号1显示有点重叠,账号1-账号6都检查一下,切换平台的时候,不是要暂停采集数据,是放到后台采集数据,account_widget.py的from PyQt6.QtWidgets import ( QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QTableWidget, QTableWidgetItem, QFrame, QGridLayout, QMessageBox, QHeaderView, QAbstractItemView, QApplication, QMenu, QFileDialog, QStyle, QProgressBar,QComboBox ) from PyQt6.QtCore import Qt, QThread, pyqtSignal, QTimer from PyQt6.QtGui import QFont, QColor from tanchen_v2.core_v1.collector_threads import DouyinDataCollectorThread from tanchen_v2.ui.comment_window import CommentWindow from tanchen_v2.core_v1.loading_indicator import LoadingIndicator from tanchen_v2.core_v1.config_manager import ConfigManager import pandas as pd import json import os from tanchen_v2.core_v1.log_tc import logger class AccountWidget(QWidget): def __init__(self, index, config_manager,platform:str="douyin"): super().__init__() self.index = index self.account_index = index self.data = [] self.current_platform = platform #默认平台 self.platform = platform self.platform_configs=config_manager self.platform_data={}# 按平台存储数据:{"douyin":data,"xiaohongshu":data} self.config_manager = config_manager self.is_editing = True self.collector_thread = None self.comment_window = None self.init_ui() self.load_config() def init_ui(self): main_layout = QVBoxLayout() # 平台选择器 - 移到账号标题上方 # platform_layout = QHBoxLayout() # platform_layout.addWidget(QLabel("选择平台:")) # # self.platform_combo = QComboBox() # self.platform_combo.addItem("抖音", "douyin") # self.platform_combo.addItem("小红书", "xiaohongshu") # self.platform_combo.currentIndexChanged.connect(self.switch_platform) # platform_layout.addWidget(self.platform_combo) # platform_layout.addStretch() # main_layout.addLayout(platform_layout) # 平台选择器放在最上方 # 账号标题 title_layout = self.create_title_layout() main_layout.addLayout(title_layout) # 配置区域 config_layout = self.create_config_layout() main_layout.addLayout(config_layout) # 控制按钮 btn_layout = self.create_button_layout() main_layout.addLayout(btn_layout) # 表格容器 self.table_container = self.create_table_container() main_layout.addWidget(self.table_container, 1) # 状态栏 self.status_label = QLabel("就绪") self.status_label.setStyleSheet(""" QLabel { color: #666; font-size: 12px; padding: 5px; border-top: 1px solid #e0e0e0; } """) main_layout.addWidget(self.status_label) self.setLayout(main_layout) self.set_edit_mode(True) def update_platform(self,platform): """更新账号平台并保留配置""" # 保存当前平台配置 self.platform_configs[self.platform] = self.get_current_config() # 更新平台 self.platform = platform self.platform_config_group.setTitle(f"{platform}配置") # 加载新平台配置 self.update_platform_config_ui() def create_title_layout(self): layout = QHBoxLayout() self.title_label = QLabel(f"账号 {self.index + 1}") self.title_label.setFont(QFont("Arial", 12, QFont.Weight.Bold)) layout.addWidget(self.title_label) layout.addWidget(QLabel("备注:")) self.remark_input = QLineEdit() self.remark_input.setPlaceholderText("账号备注...") self.remark_input.setMaximumWidth(200) layout.addWidget(self.remark_input) self.edit_btn = self.create_icon_button("编辑配置", QStyle.StandardPixmap.SP_FileDialogDetailedView, "#FFC107", "#FFA000", "#FF8F00") self.edit_btn.clicked.connect(self.toggle_edit_mode) layout.addWidget(self.edit_btn) self.save_btn = self.create_icon_button("保存配置", QStyle.StandardPixmap.SP_DialogSaveButton, "#4CAF50", "#388E3C", "#2E7D32") self.save_btn.clicked.connect(self.save_config) layout.addWidget(self.save_btn) layout.addStretch() return layout def create_config_layout(self): layout = QGridLayout() layout.setColumnStretch(1, 1) # 关键字输入 layout.addWidget(QLabel("关键字:"), 0, 0) self.keyword_input = QLineEdit() self.keyword_input.setPlaceholderText("输入搜索关键字...") layout.addWidget(self.keyword_input, 0, 1) # Cookie输入 layout.addWidget(QLabel("Cookie:"), 1, 0) self.cookie_input = QLineEdit() self.cookie_input.setPlaceholderText("输入账号Cookie...") self.cookie_input.setEchoMode(QLineEdit.EchoMode.Password) layout.addWidget(self.cookie_input, 1, 1) # 显示/隐藏Cookie按钮 self.toggle_cookie_btn = QPushButton("显示") self.toggle_cookie_btn.setMaximumWidth(60) self.toggle_cookie_btn.setStyleSheet("QPushButton { padding: 3px; border-radius: 3px; }") self.toggle_cookie_btn.clicked.connect(self.toggle_cookie_visibility) layout.addWidget(self.toggle_cookie_btn, 1, 2) # 评论关键字输入 layout.addWidget(QLabel("评论关键字:"), 2, 0) self.comment_keyword_input = QLineEdit() self.comment_keyword_input.setPlaceholderText("输入评论关键字过滤...支持中文逗号、英文逗号、空格例如:测试1,测试2") layout.addWidget(self.comment_keyword_input, 2, 1) return layout def create_button_layout(self): layout = QHBoxLayout() # 开始采集按钮 self.start_btn = self.create_action_button("开始采集", QStyle.StandardPixmap.SP_MediaPlay, "#4CAF50", "#388E3C", "#2E7D32") self.start_btn.clicked.connect(self.start_collecting) layout.addWidget(self.start_btn) # 停止采集按钮 self.stop_btn = self.create_action_button("停止采集", QStyle.StandardPixmap.SP_MediaStop, "#f44336", "#D32F2F", "#B71C1C") self.stop_btn.clicked.connect(self.stop_collecting) self.stop_btn.setEnabled(False) layout.addWidget(self.stop_btn) # 获取评论按钮 self.get_comments_btn = self.create_action_button("获取评论", QStyle.StandardPixmap.SP_MessageBoxInformation, "#9C27B0", "#7B1FA2", "#4A148C") self.get_comments_btn.clicked.connect(self.get_comments) layout.addWidget(self.get_comments_btn) # 导出数据按钮 self.export_btn = self.create_action_button("导出数据", QStyle.StandardPixmap.SP_DialogSaveButton, "#2196F3", "#1976D2", "#0D47A1") self.export_btn.clicked.connect(self.export_data) layout.addWidget(self.export_btn) # 清空表格按钮 self.clear_table_btn = self.create_action_button("清空表格", QStyle.StandardPixmap.SP_TrashIcon, "#FF9800", "#F57C00", "#EF6C00") self.clear_table_btn.clicked.connect(self.clear_table_data) layout.addWidget(self.clear_table_btn) layout.addStretch() return layout def setup_table_columns(self,platform): """根据平台设置表格列""" if platform == "douyin": self.table.setColumnCount(7) self.table.setHorizontalHeaderLabels( ["序号", "作者昵称", "类型", "视频地址", "评论数量", "发布时间", "视频ID"]) # 设置列宽 self.table.setColumnWidth(0, 60) self.table.setColumnWidth(1, 150) self.table.setColumnWidth(2, 100) self.table.setColumnWidth(3, 300) self.table.setColumnWidth(4, 100) self.table.setColumnWidth(5, 150) self.table.setColumnWidth(6, 200) elif platform == "xiaohongshu": self.table.setColumnCount(8) self.table.setHorizontalHeaderLabels( ["序号", "作者昵称", "类型", "笔记链接", "点赞数", "收藏数", "发布时间", "笔记ID"]) # 设置列宽 self.table.setColumnWidth(0, 60) self.table.setColumnWidth(1, 150) self.table.setColumnWidth(2, 100) self.table.setColumnWidth(3, 300) self.table.setColumnWidth(4, 80) self.table.setColumnWidth(5, 80) self.table.setColumnWidth(6, 150) self.table.setColumnWidth(7, 200) # 添加新平台时只需在这里添加新的列配置 def create_table_container(self): container = QFrame() container.setFrameShape(QFrame.Shape.StyledPanel) container.setStyleSheet("background-color: white; border: 1px solid #E0E0E0; border-radius: 4px;") layout = QVBoxLayout(container) layout.setContentsMargins(0, 0, 0, 0) layout.setSpacing(0) # v2数据表格 self.table = QTableWidget() self.setup_table_columns(self.current_platform) # 根据平台初始化表格列 self.table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Interactive) self.table.horizontalHeader().setSectionsMovable(True) self.table.verticalHeader().setVisible(False) self.table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers) self.table.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows) self.table.setSelectionMode(QAbstractItemView.SelectionMode.ExtendedSelection) self.table.setSortingEnabled(True) self.table.setStyleSheet(""" QTableWidget { background-color: white; alternate-background-color: #f9f9f9; gridline-color: #e0e0e0; } QHeaderView::section { background-color: #f0f0f0; padding: 4px; border: 1px solid #e0e0e0; font-weight: bold; } """) self.table.setContextMenuPolicy(Qt.ContextMenuPolicy.CustomContextMenu) self.table.customContextMenuRequested.connect(self.show_context_menu) layout.addWidget(self.table) # 覆盖层用于显示加载指示器 self.overlay = QWidget(container) self.overlay.setGeometry(0, 0, container.width(), container.height()) self.overlay.setStyleSheet("background-color: rgba(255, 255, 255, 0.7);") overlay_layout = QVBoxLayout(self.overlay) overlay_layout.setAlignment(Qt.AlignmentFlag.AlignCenter) self.loading_indicator = LoadingIndicator() overlay_layout.addWidget(self.loading_indicator) self.overlay.hide() return container def create_icon_button(self, text, icon, base_color, hover_color, pressed_color): button = QPushButton(text) button.setIcon(self.style().standardIcon(icon)) text_color = "black" if base_color == "#FFC107" else "white" button.setStyleSheet(f""" QPushButton {{ background-color: {base_color}; color: {text_color}; padding: 5px; border-radius: 4px; }} QPushButton:hover {{ background-color: {hover_color}; }} QPushButton:pressed {{ background-color: {pressed_color}; }} """) return button def create_action_button(self, text, icon, base_color, hover_color, pressed_color): button = QPushButton(text) button.setIcon(self.style().standardIcon(icon)) button.setStyleSheet(f""" QPushButton {{ background-color: {base_color}; color: white; padding: 8px 16px; border-radius: 5px; font-weight: bold; }} QPushButton:hover {{ background-color: {hover_color}; }} QPushButton:pressed {{ background-color: {pressed_color}; }} QPushButton:disabled {{ background-color: {base_color}80; }} """) return button def toggle_cookie_visibility(self): if self.cookie_input.echoMode() == QLineEdit.EchoMode.Password: self.cookie_input.setEchoMode(QLineEdit.EchoMode.Normal) self.toggle_cookie_btn.setText("隐藏") else: self.cookie_input.setEchoMode(QLineEdit.EchoMode.Password) self.toggle_cookie_btn.setText("显示") def toggle_edit_mode(self): self.set_edit_mode(not self.is_editing) def set_edit_mode(self, edit_mode): self.is_editing = edit_mode self.remark_input.setReadOnly(not edit_mode) self.keyword_input.setReadOnly(not edit_mode) self.cookie_input.setReadOnly(not edit_mode) self.edit_btn.setVisible(not edit_mode) self.save_btn.setVisible(edit_mode) bg_color = "#f5f5f5" if not edit_mode else "white" self.remark_input.setStyleSheet(f"background-color: {bg_color}; border: 1px solid #e0e0e0; border-radius: 3px;") self.keyword_input.setStyleSheet( f"background-color: {bg_color}; border: 1px solid #e0e0e0; border-radius: 3px;") self.cookie_input.setStyleSheet(f"background-color: {bg_color}; border: 1px solid #e0e0e0; border-radius: 3px;") if edit_mode: self.status_label.setText("编辑模式:可以修改配置") else: self.status_label.setText("配置已锁定,点击'编辑配置'修改") def switch_platform(self,new_platform): """切换平台时的处理""" # 保存当前平台的数据 self.save_current_data() # 更新当前平台 self.current_platform = new_platform # 更新表格列 self.setup_table_columns(new_platform) # 加载新平台的配置 self.load_config() # 加载新平台的数据 self.load_platform_data() # 更新状态栏 self.status_label.setText(f"已切换到{new_platform}平台") def save_current_data(self): """保存当前平台的数据到内存""" # 从表格获取当前数据 data = [] for row in range(self.table.rowCount()): row_data = {} for col in range(self.table.columnCount()): item = self.table.item(row, col) if item: header = self.table.horizontalHeaderItem(col).text() row_data[header] = item.text() data.append(row_data) # 保存到平台数据字典 self.platform_data[self.current_platform] = data def load_platform_data(self): """加载当前平台的数据到表格""" data = self.platform_data.get(self.current_platform, []) # 清空表格 self.table.setRowCount(0) if not data: self.status_label.setText(f"{self.current_platform}平台无数据") return # 填充表格 self.table.setRowCount(len(data)) for row, item in enumerate(data): for col in range(self.table.columnCount()): header = self.table.horizontalHeaderItem(col).text() value = item.get(header, "") table_item = QTableWidgetItem(value) table_item.setFlags(Qt.ItemFlag.ItemIsSelectable | Qt.ItemFlag.ItemIsEnabled) self.table.setItem(row, col, table_item) self.status_label.setText(f"已加载{len(data)}条{self.current_platform}数据") def save_config(self): #v2 self.set_edit_mode(False) # 保存配置时按平台存储 config = { "remark": self.remark_input.text(), "keyword": self.keyword_input.text(), "cookie": self.cookie_input.text(), "comment_keyword": self.comment_keyword_input.text(), "platform": self.current_platform # 保存当前平台 } # 使用平台特定的键保存配置 self.config_manager.save_account_config(self.index, config, self.current_platform) self.status_label.setText("配置已保存") self.save_btn.setStyleSheet(""" QPushButton { background-color: #2E7D32; color: white; padding: 5px; border-radius: 4px; } """) QTimer.singleShot(500, self.restore_save_button_style) def restore_save_button_style(self): self.save_btn.setStyleSheet(""" QPushButton { background-color: #4CAF50; color: white; padding: 5px; border-radius: 4px; } QPushButton:hover { background-color: #388E3C; } QPushButton:pressed { background-color: #2E7D32; } """) def load_config(self): # 加载当前平台的配置 config = self.config_manager.load_account_config(self.index, self.current_platform) if config: self.remark_input.setText(config.get("remark", "")) self.keyword_input.setText(config.get("keyword", "")) self.cookie_input.setText(config.get("cookie", "")) self.comment_keyword_input.setText(config.get("comment_keyword", "")) logger.debug(f"平台:{self.current_platform}--账号{self.index + 1}加载配置成功") logger.debug(f"平台:{self.current_platform}--账号{self.index + 1}加载配置:{str(config)}") else: # 如果没有配置,初始化空值 self.remark_input.setText("") self.keyword_input.setText("") self.cookie_input.setText("") self.comment_keyword_input.setText("") logger.debug(f"平台:{ self.current_platform}--账号{self.index + 1}无配置,使用默认值") logger.debug(f"平台:{self.current_platform}--账号{self.index + 1}无配置, 加载配置:{str(config)}") def start_collecting(self): keyword = self.keyword_input.text().strip() cookie = self.cookie_input.text().strip() if not keyword: QMessageBox.warning(self, "输入错误", "请输入搜索关键字") return if not cookie: QMessageBox.warning(self, "输入错误", "请输入账号Cookie") return if self.collector_thread and self.collector_thread.isRunning(): self.collector_thread.stop() self.collector_thread.quit() self.collector_thread.wait(1000) self.overlay.show() self.loading_indicator.start() self.start_btn.setStyleSheet(""" QPushButton { background-color: #2E7D32; color: white; padding: 8px 16px; border-radius: 5px; font-weight: bold; } """) # 使用爬虫工厂创建对应平台的爬虫线程 try: from core.Crawler.crawler_factory import CrawlerFactory self.collector_thread = CrawlerFactory.create_crawler( platform=self.current_platform, account_index=self.index, config={ "keyword": keyword, "cookie": cookie } ) # 设置关键字 self.collector_thread.keyword = keyword except Exception as e: logger.warning(f"创建爬虫失败: {str(e)},{str(self.current_platform)}") self.handle_error(f"创建爬虫失败: {str(e)}") self.overlay.hide() self.loading_indicator.stop() return try: self.collector_thread.data_collected.connect(self.update_data) self.collector_thread.status_updated.connect(self.update_status) self.collector_thread.error_occurred.connect(self.handle_error) self.collector_thread.finished.connect(self.on_collection_finished) self.collector_thread.start(keyword) self.start_btn.setEnabled(False) self.stop_btn.setEnabled(True) self.keyword_input.setEnabled(False) self.cookie_input.setEnabled(False) self.status_label.setText(f"开始采集{self.current_platform}: {keyword}") self.status_label.setStyleSheet("color: #4CAF50; font-weight: bold;") except Exception as e: logger.warning(f"启动爬虫失败: {str(e)}") self.handle_error(f"启动爬虫失败: {str(e)}") def handle_error(self, error_message): self.status_label.setText(error_message) self.status_label.setStyleSheet("color: #f44336; font-weight: bold;") #self.stop_collecting() def update_status(self, message): self.status_label.setText(message) self.status_label.setStyleSheet("color: #4CAF50; font-weight: bold;") def on_collection_finished(self): self.start_btn.setEnabled(True) self.stop_btn.setEnabled(False) self.keyword_input.setEnabled(True) self.cookie_input.setEnabled(True) self.overlay.hide() self.loading_indicator.stop() self.restore_start_button_style() def restore_start_button_style(self): self.start_btn.setStyleSheet(""" QPushButton { background-color: #4CAF50; color: white; padding: 8px 16px; border-radius: 5px; font-weight: bold; } QPushButton:hover { background-color: #388E3C; } QPushButton:pressed { background-color: #2E7D32; } QPushButton:disabled { background-color: #81C784; } """) def stop_collecting(self): if self.collector_thread and self.collector_thread.isRunning(): self.collector_thread.stop() self.collector_thread.quit() self.collector_thread.wait(1000) self.start_btn.setEnabled(True) self.stop_btn.setEnabled(False) self.keyword_input.setEnabled(True) self.cookie_input.setEnabled(True) self.overlay.hide() self.loading_indicator.stop() self.restore_start_button_style() self.status_label.setText("采集已停止") self.status_label.setStyleSheet("color: #f44336; font-weight: bold;") def remove_duplicates(self, data): seen_urls = set() unique_data = [] url_field = "视频地址" if self.current_platform == "douyin" else "笔记链接" for item in data: item_url = item.get(url_field, "") if item_url and item_url not in seen_urls: seen_urls.add(item_url) unique_data.append(item) return unique_data def update_data(self, new_data): logger.debug("new_data",new_data) if not new_data or not isinstance(new_data, list): QMessageBox.warning(self, "数据错误", "接收到无效的数据格式") return if new_data and "状态" in new_data[0] and new_data[0]["状态"] == "错误": error_msg = new_data[0].get("消息", "未知错误") QMessageBox.warning(self, "采集错误", error_msg) self.stop_collecting() return unique_data = self.remove_duplicates(new_data) # 保存到当前平台的数据 self.platform_data[self.current_platform] = unique_data # 更新表格 self.table.setSortingEnabled(False) self.table.setRowCount(len(unique_data)) flags = Qt.ItemFlag.ItemIsSelectable | Qt.ItemFlag.ItemIsEnabled for row, item in enumerate(unique_data): # 根据平台动态填充表格 if self.current_platform == "douyin": # 抖音数据处理 self.table.setItem(row, 0, QTableWidgetItem(str(item.get("序号", "")))) self.table.setItem(row, 1, QTableWidgetItem(item.get("作者昵称", ""))) self.table.setItem(row, 2, QTableWidgetItem(item.get("类型", ""))) video_url = item.get("视频地址", "") url_item = QTableWidgetItem(video_url) url_item.setForeground(QColor("#0000FF")) url_item.setToolTip(video_url) self.table.setItem(row, 3, url_item) self.table.setItem(row, 4, QTableWidgetItem(str(item.get("评论数量", 0)))) self.table.setItem(row, 5, QTableWidgetItem(item.get("发布时间", ""))) self.table.setItem(row, 6, QTableWidgetItem(item.get("视频ID", ""))) elif self.current_platform == "xiaohongshu": # 小红书数据处理 self.table.setItem(row, 0, QTableWidgetItem(str(item.get("序号", "")))) self.table.setItem(row, 1, QTableWidgetItem(item.get("作者昵称", ""))) self.table.setItem(row, 2, QTableWidgetItem(item.get("类型", ""))) note_url = item.get("笔记链接", "") url_item = QTableWidgetItem(note_url) url_item.setForeground(QColor("#0000FF")) url_item.setToolTip(note_url) self.table.setItem(row, 3, url_item) self.table.setItem(row, 4, QTableWidgetItem(str(item.get("点赞数", 0)))) self.table.setItem(row, 5, QTableWidgetItem(str(item.get("收藏数", 0)))) self.table.setItem(row, 6, QTableWidgetItem(item.get("发布时间", ""))) self.table.setItem(row, 7, QTableWidgetItem(item.get("笔记ID", ""))) # 添加新平台时在这里添加处理逻辑 self.table.setSortingEnabled(True) self.table.scrollToBottom() self.status_label.setText(f"已采集 {len(unique_data)} 条{self.current_platform}数据") self.status_label.setStyleSheet("color: #4CAF50; font-weight: bold;") def export_data(self): # 获取当前平台的数据 current_data = self.platform_data.get(self.current_platform, []) if not current_data: QMessageBox.warning(self, "导出失败", f"没有可导出的{self.current_platform}数据") return try: remark = self.remark_input.text().strip() platform_name = self.current_platform default_name = f"{platform_name}数据_账号{self.index + 1}_{remark}.xlsx" if remark else f"{platform_name}数据_账号{self.index + 1}.xlsx" filename, _ = QFileDialog.getSaveFileName( self, "导出数据", default_name, "Excel文件 (*.xlsx)" ) if not filename: return if not filename.endswith('.xlsx'): filename += '.xlsx' df = pd.DataFrame(current_data) df.to_excel(filename, index=False) self.export_btn.setStyleSheet(""" QPushButton { background-color: #0D47A1; color: white; padding: 8px 16px; border-radius: 5px; font-weight: bold; } """) QTimer.singleShot(500, self.restore_export_button_style) QMessageBox.information(self, "导出成功", f"数据已导出到: {filename}") except Exception as e: QMessageBox.critical(self, "导出错误", f"导出失败: {str(e)}") def restore_export_button_style(self): self.export_btn.setStyleSheet(""" QPushButton { background-color: #2196F3; color: white; padding: 8px 16px; border-radius: 5px; font-weight: bold; } QPushButton:hover { background-color: #1976D2; } QPushButton:pressed { background-color: #0D47A1; } """) def clear_table_data(self): # 获取当前平台的数据 current_data = self.platform_data.get(self.current_platform, []) if not current_data: QMessageBox.information(self, "提示", f"{self.current_platform}平台无数据") return reply = QMessageBox.question( self, "确认清空", f"确定要清空{self.current_platform}的表格数据吗? (共{len(current_data)}条)", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No ) if reply == QMessageBox.StandardButton.Yes: self.platform_data[self.current_platform] = [] self.table.setRowCount(0) self.status_label.setText(f"{self.current_platform}数据已清空") def get_comments(self): """修改:处理多个关键字""" # 获取当前平台的数据 current_data = self.platform_data.get(self.current_platform, []) if not current_data: QMessageBox.warning(self, "操作失败", f"没有可用的{self.current_platform}数据") return # 获取关键字并分割 keyword_input = self.comment_keyword_input.text().strip() if keyword_input: # 使用 | 作为分隔符,支持中文逗号、英文逗号、空格 keywords = keyword_input.replace(',', ',').replace(' ', ',') keyword_filter = '|'.join([kw.strip() for kw in keywords.split(',') if kw.strip()]) else: keyword_filter = "" # 构建有效的视频数据 video_data = [] for idx, item in enumerate(current_data): try: # 根据平台获取不同的字段 if self.current_platform == "douyin": video_id = self.extract_douyin_video_id(item.get("视频地址", "")) comment_count = int(item.get("评论数量", 0)) video_url = item.get("视频地址", "") elif self.current_platform == "xiaohongshu": video_id = self.extract_xiaohongshu_note_id(item.get("笔记链接", "")) comment_count = int(item.get("评论数量", 0)) # 假设小红书也有评论数量字段 video_url = item.get("笔记链接", "") else: continue if video_id and comment_count > 0: video_data.append({ "序号": idx + 1, "视频地址": video_url, "视频ID": video_id, "评论数量": comment_count }) except (ValueError, TypeError, KeyError): continue if not video_data: QMessageBox.warning(self, "操作失败", "没有包含评论的视频数据") return try: # 获取当前账号的cookie cookie = self.cookie_input.text().strip() # 如果评论窗口已存在,先关闭它 if hasattr(self, 'comment_window') and self.comment_window: self.comment_window.close() self.comment_window = None self.comment_window = CommentWindow( parent=self, account_index=self.index, video_data=video_data, keyword_filter=keyword_filter, # 传递处理后的关键字 cookie=cookie, # 传递cookie use_proxy=False, # 默认不开启代理 account_keywords=keyword_filter, # 传递账号关键字 platform=self.current_platform # 传递当前平台 ) # 使用独立窗口模式 self.comment_window.setWindowFlag(Qt.WindowType.Window) self.comment_window.finished.connect(self.on_comment_window_closed) self.comment_window.show() except Exception as e: QMessageBox.critical(self, "错误", f"无法打开评论窗口: {str(e)}") def extract_douyin_video_id(self, video_url): """从抖音视频地址中提取视频ID""" # 示例URL: https://www.douyin.com/video/1234567890123456789 parts = video_url.split('/') if len(parts) >= 5: return parts[-1] # 返回最后一部分作为视频ID return "" def extract_xiaohongshu_note_id(self, note_url): """从小红书笔记地址中提取笔记ID""" # 示例URL: https://www.xiaohongshu.com/explore/1234567890abcdef12345678 parts = note_url.split('/') if len(parts) >= 5: return parts[-1] # 返回最后一部分作为笔记ID return "" def extract_video_id(self, video_url): """从视频地址中提取视频ID""" # 示例URL: https://www.douyin.com/video/1234567890123456789 parts = video_url.split('/') if len(parts) >= 5: return parts[-1] # 返回最后一部分作为视频ID return "" def on_comment_window_closed(self): # 安全地关闭评论窗口 if hasattr(self, 'comment_window') and self.comment_window: # 确保停止线程 if hasattr(self.comment_window, 'comment_thread') and self.comment_window.comment_thread.isRunning(): self.comment_window.comment_thread.stop() self.comment_window.comment_thread.quit() self.comment_window.comment_thread.wait(2000) self.comment_window = None def show_context_menu(self, position): menu = QMenu(self) selected_rows = set(item.row() for item in self.table.selectedItems()) if not selected_rows: return copy_row_action = menu.addAction("复制整行") copy_row_action.triggered.connect(lambda: self.copy_selected_row(selected_rows)) copy_cell_action = menu.addAction("复制单元格") copy_cell_action.triggered.connect(self.copy_selected_cell) menu.addSeparator() # 根据平台显示不同的复制选项 if self.current_platform == "douyin": copy_url_action = menu.addAction("复制视频地址") copy_url_action.triggered.connect(lambda: self.copy_selected_column(3)) copy_video_id_action = menu.addAction("复制视频ID") copy_video_id_action.triggered.connect(lambda: self.copy_selected_column(6)) elif self.current_platform == "xiaohongshu": copy_url_action = menu.addAction("复制笔记链接") copy_url_action.triggered.connect(lambda: self.copy_selected_column(3)) copy_note_id_action = menu.addAction("复制笔记ID") copy_note_id_action.triggered.connect(lambda: self.copy_selected_column(7)) copy_author_action = menu.addAction("复制作者昵称") copy_author_action.triggered.connect(lambda: self.copy_selected_column(1)) menu.exec(self.table.viewport().mapToGlobal(position)) def copy_selected_row(self, rows): text = "" for row in sorted(rows): row_data = [] for col in range(self.table.columnCount()): item = self.table.item(row, col) if item: row_data.append(item.text()) text += "\t".join(row_data) + "\n" clipboard = QApplication.clipboard() clipboard.setText(text.strip()) def copy_selected_cell(self): selected_items = self.table.selectedItems() if not selected_items: return text = "\n".join(item.text() for item in selected_items) clipboard = QApplication.clipboard() clipboard.setText(text) def copy_selected_column(self, column): selected_items = [item for item in self.table.selectedItems() if item.column() == column] if not selected_items: return text = "\n".join(item.text() for item in selected_items) clipboard = QApplication.clipboard() clipboard.setText(text) def resizeEvent(self, event): super().resizeEvent(event) if self.table_container: self.overlay.setGeometry(0, 0, self.table_container.width(), self.table_container.height())和main_window.py的import pandas as pd from PyQt6.QtWidgets import ( QMainWindow, QWidget, QVBoxLayout, QLabel, QTabWidget, QHBoxLayout, QPushButton, QStatusBar, QStyle, QFileDialog, QMessageBox, QDialog, QFormLayout, QLineEdit, QComboBox, QCheckBox ) from PyQt6.QtGui import QFont from PyQt6.QtCore import Qt, QTimer from tanchen_v2.ui.account_widget import AccountWidget from tanchen_v2.core_v1.config_manager import ConfigManager class ProxyConfigDialog(QDialog): def __init__(self, config_manager, parent=None): super().__init__(parent) self.setWindowTitle("天启代理配置") self.setGeometry(200, 200, 500, 300) self.config_manager = config_manager layout = QVBoxLayout() layout.setContentsMargins(15, 15, 15, 15) layout.setSpacing(10) # 加载配置 self.config = self.config_manager.load_proxy_config() or {} form_layout = QFormLayout() form_layout.setSpacing(10) # 创建控件 self.secret_edit = QLineEdit() self.secret_edit.setText(self.config.get("secret", "")) self.type_combo = QComboBox() self.type_combo.addItems(["json", "txt"]) if "type" in self.config: self.type_combo.setCurrentText(self.config["type"]) else: self.type_combo.setCurrentText("json") self.time_combo = QComboBox() self.time_combo.addItems(["3", "5", "10", "15"]) if "time" in self.config: self.time_combo.setCurrentText(self.config["time"]) else: self.time_combo.setCurrentText("5") self.mr_check = QCheckBox("IP去重") self.mr_check.setChecked(self.config.get("mr", "1") == "1") self.sign_edit = QLineEdit() self.sign_edit.setText(self.config.get("sign", "")) # 添加表单行 form_layout.addRow("提取秘钥 (secret):", self.secret_edit) form_layout.addRow("返回类型 (type):", self.type_combo) form_layout.addRow("IP使用时长 (time):", self.time_combo) form_layout.addRow("", self.mr_check) # 单独一行显示复选框 form_layout.addRow("用户签名 (sign):", self.sign_edit) layout.addLayout(form_layout) # 按钮 btn_layout = QHBoxLayout() save_btn = QPushButton("保存") save_btn.setStyleSheet(""" QPushButton { background-color: #4CAF50; color: white; padding: 8px 16px; border-radius: 4px; } QPushButton:hover { background-color: #45a049; } """) save_btn.clicked.connect(self.save_config) cancel_btn = QPushButton("取消") cancel_btn.setStyleSheet(""" QPushButton { background-color: #f44336; color: white; padding: 8px 16px; border-radius: 4px; } QPushButton:hover { background-color: #d32f2f; } """) cancel_btn.clicked.connect(self.reject) btn_layout.addStretch() btn_layout.addWidget(save_btn) btn_layout.addWidget(cancel_btn) btn_layout.addStretch() layout.addLayout(btn_layout) self.setLayout(layout) def save_config(self): self.config = { "secret": self.secret_edit.text(), "type": self.type_combo.currentText(), "time": self.time_combo.currentText(), "mr": "1" if self.mr_check.isChecked() else "0", "sign": self.sign_edit.text() } self.config_manager.save_proxy_config(self.config) self.accept() class DouyinDataCollector(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("多平台数据采集工具") self.setGeometry(100, 100, 1400, 800) self.setStyleSheet(""" QMainWindow { background-color: #f5f5f5; } QTabWidget::pane { border: 1px solid #e0e0e0; background: white; } QTabBar::tab { background: #e0e0e0; border: 1px solid #e0e0e0; padding: 8px 16px; border-top-left-radius: 4px; border-top-right-radius: 4px; margin-right: 2px; } QTabBar::tab:selected { background: white; border-bottom: none; } QTabBar::tab:!selected { margin-top: 4px; } """) self.accounts = [] self.config_manager = ConfigManager() self.current_platform = "douyin" self.init_ui() def init_ui(self): main_widget = QWidget() main_layout = QVBoxLayout() main_layout.setContentsMargins(15, 15, 15, 15) main_layout.setSpacing(15) # 标题 title_label = QLabel("多平台数据采集系统") title_label.setFont(QFont("Arial", 18, QFont.Weight.Bold)) title_label.setAlignment(Qt.AlignmentFlag.AlignCenter) title_label.setStyleSheet(""" QLabel { color: #2196F3; padding: 10px; background-color: white; border-radius: 8px; border: 1px solid #e0e0e0; } """) main_layout.addWidget(title_label) ####################### # 平台选择和代理配置按钮 top_control_layout = QHBoxLayout() # 平台选择器 platform_layout = QHBoxLayout() platform_layout.addWidget(QLabel("选择平台:")) self.platform_combo = QComboBox() self.platform_combo.addItem("抖音", "douyin") self.platform_combo.addItem("小红书", "xiaohongshu") self.platform_combo.currentIndexChanged.connect(self.switch_platform) platform_layout.addWidget(self.platform_combo) top_control_layout.addLayout(platform_layout) # 代理配置按钮 top_control_layout.addStretch() self.proxy_config_btn = self.create_button( "天启代理配置", "#FF9800", "#F57C00", "#E65100", QStyle.StandardPixmap.SP_ComputerIcon) self.proxy_config_btn.clicked.connect(self.show_proxy_config) top_control_layout.addWidget(self.proxy_config_btn) main_layout.addLayout(top_control_layout) # 账号标签页 self.tabs = QTabWidget() self.tabs.setTabPosition(QTabWidget.TabPosition.North) self.tabs.setDocumentMode(True) for i in range(6): account_widget = AccountWidget(i, self.config_manager, self.current_platform) self.accounts.append(account_widget) tab_name = self.get_tab_name(i) # 添加平台参数 self.tabs.addTab(account_widget, tab_name) self.tabs.currentChanged.connect(self.update_tab_names) main_layout.addWidget(self.tabs, 1) # 状态栏 self.status_bar = self.statusBar() self.status_bar.setStyleSheet("background-color: #e0e0e0; color: #666; padding: 5px;") self.status_bar.showMessage("就绪 | 数据采集工具 v1.1 | 配置已自动保存") main_widget.setLayout(main_layout) self.setCentralWidget(main_widget) def switch_platform(self): """切换平台时的处理""" new_platform = self.platform_combo.currentData() self.current_platform = new_platform # 更新所有账号的平台 for account in self.accounts: account.switch_platform(new_platform) self.status_bar.showMessage(f"已切换到{self.platform_combo.currentText()}平台") def create_button(self, text, bg_color, hover_color, pressed_color, icon): button = QPushButton(text) button.setIcon(self.style().standardIcon(icon)) button.setStyleSheet(f""" QPushButton {{ background-color: {bg_color}; color: white; padding: 10px 20px; border-radius: 6px; font-weight: bold; font-size: 14px; }} QPushButton:hover {{ background-color: {hover_color}; }} QPushButton:pressed {{ background-color: {pressed_color}; }} """) # 存储基础颜色用于动画 button.setProperty("base_color", bg_color) return button def get_tab_name(self, index): config = self.config_manager.load_account_config(index,self.current_platform) tab_name = f"账号 {index + 1}" if config and config.get("remark"): tab_name = f"{tab_name}: {config['remark']}" return tab_name def update_tab_names(self): for i in range(self.tabs.count()): self.tabs.setTabText(i, self.get_tab_name(i)) def show_proxy_config(self): try: dialog = ProxyConfigDialog(self.config_manager, self) dialog.setWindowModality(Qt.WindowModality.ApplicationModal) if dialog.exec() == QDialog.DialogCode.Accepted: self.status_bar.showMessage("代理配置已保存") self.animate_button(self.proxy_config_btn, "#E65100") except Exception as e: QMessageBox.critical(self, "错误", f"打开代理配置时出错: {str(e)}") def animate_button(self, button, color): original_style = button.styleSheet() button.setStyleSheet(original_style.replace( f"background-color: {button.property('base_color')}", f"background-color: {color}") ) QTimer.singleShot(500, lambda: button.setStyleSheet(original_style)),因为我用的平台是抖音,然后用账号1进行采集数据,视频列表,切换到小红书平台,发现账号1的框里面在转圈圈,用平台是小红书,用账号2进行采集数据视频列表,切换到抖音,发现账号2的框里面在转圈圈,不要停止采集,切换到后台就行

filetype

我已经发了你三遍serber.py了 为什么你还是看不见??????# E:\AI_System\web_ui\server.py (完整修复版) import sys import os import time import logging import json import traceback import threading import platform import psutil import datetime import subprocess from pathlib import Path from functools import wraps from concurrent.futures import ThreadPoolExecutor import logging.handlers # ========== 关键修复1: 最先执行eventlet猴子补丁 ========== try: import eventlet eventlet.monkey_patch() # 必须在所有导入之前执行 print("✅ Eventlet monkey patch applied at startup") except ImportError: print("⚠️ Eventlet not installed, using threading mode") pass # 修复1:更新依赖包列表 REQUIRED_PACKAGES = [ 'flask', 'flask_socketio', 'flask_limiter', 'psutil', 'eventlet', 'waitress' ] def check_dependencies(): """增强依赖检查功能""" missing = [] for package in REQUIRED_PACKAGES: try: __import__(package) except ImportError: missing.append(package) if missing: print(f"❌ 缺少必要的依赖包: {', '.join(missing)}") print("请运行以下命令安装依赖:") print(f"pip install {' '.join(missing)}") sys.exit(1) if __name__ == '__main__': check_dependencies() # 在启动前检查依赖 # 现在导入其他模块 from flask import Flask, jsonify, request, render_template from flask_socketio import SocketIO, emit from flask_limiter import Limiter from flask_limiter.util import get_remote_address # ========== 配置系统 ========== class SystemConfig: def __init__(self): self.BASE_DIR = Path(__file__).resolve().parent.parent self.HOST = '0.0.0.0' self.PORT = 5000 self.LOG_LEVEL = 'DEBUG' self.SECRET_KEY = os.getenv('SECRET_KEY', 'your_secret_key_here') self.DEBUG = True self.USE_GPU = False self.DEFAULT_MODEL = 'gpt-3.5-turbo' self.MAX_WORKERS = 4 # 目录配置 self.LOG_DIR = self.BASE_DIR / 'logs' self.LOG_DIR.mkdir(parents=True, exist_ok=True) self.CONFIG_DIR = self.BASE_DIR / 'config' self.CONFIG_DIR.mkdir(parents=True, exist_ok=True) self.AGENT_PATH = self.BASE_DIR / 'agent' self.MODEL_CACHE_DIR = self.BASE_DIR / 'model_cache' self.MODEL_CACHE_DIR.mkdir(parents=True, exist_ok=True) self.TEMPLATE_DIR = self.BASE_DIR / 'web_ui' / 'templates' def __str__(self): return f"SystemConfig(HOST={self.HOST}, PORT={self.PORT})" config = SystemConfig() # ========== 全局协调器 ========== coordinator = None executor = ThreadPoolExecutor(max_workers=config.MAX_WORKERS) def register_coordinator(coord): global coordinator coordinator = coord if coordinator and hasattr(coordinator, 'connect_to_ui'): coordinator.connect_to_ui(update_ui) def update_ui(event): if 'socketio' in globals(): socketio.emit('system_event', event) # ========== 线程安全装饰器 ========== def synchronized(lock): def decorator(func): @wraps(func) def wrapper(*args, **kwargs): with lock: return func(*args, **kwargs) return wrapper return decorator # ========== 日志系统 ========== def setup_logger(): """优化日志配置""" logger = logging.getLogger('WebServer') logger.setLevel(getattr(logging, config.LOG_LEVEL.upper(), logging.DEBUG)) # 清除所有现有处理器 for handler in logger.handlers[:]: logger.removeHandler(handler) # 日志格式 log_formatter = logging.Formatter( '%(asctime)s [%(levelname)s] %(name)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # 文件日志处理器 (每天轮换,保留30天) file_handler = logging.handlers.TimedRotatingFileHandler( config.LOG_DIR / 'web_server.log', when='midnight', backupCount=30, encoding='utf-8' ) file_handler.setFormatter(log_formatter) logger.addHandler(file_handler) # 控制台日志处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(log_formatter) logger.addHandler(console_handler) # 设置Flask和SocketIO日志 flask_logger = logging.getLogger('werkzeug') flask_logger.setLevel(logging.WARNING) socketio_logger = logging.getLogger('engineio') socketio_logger.setLevel(logging.WARNING) return logger logger = setup_logger() # ========== 环境管理器 ========== class EnvironmentManager: """独立的环境管理器类""" def __init__(self, config): self.config = config self.state = { 'temperature': 22.5, 'humidity': 45.0, 'light_level': 75, 'objects': [], 'last_updated': datetime.datetime.now().isoformat() } self.healthy = True self.lock = threading.Lock() @synchronized(threading.Lock()) def start(self): logger.info("环境管理器已启动") @synchronized(threading.Lock()) def get_state(self): # 更新模拟数据 self.state['temperature'] = round(20 + 5 * (time.time() % 10) / 10, 1) self.state['humidity'] = round(40 + 10 * (time.time() % 10) / 10, 1) self.state['light_level'] = round(70 + 10 * (time.time() % 10) / 10, 1) self.state['last_updated'] = datetime.datetime.now().isoformat() return self.state @synchronized(threading.Lock()) def execute_action(self, action, params): logger.info(f"执行环境动作: {action} 参数: {params}") if action == "adjust_temperature": self.state['temperature'] = params.get('value', 22.0) return True elif action == "adjust_light": self.state['light_level'] = params.get('level', 70) return True return False def is_healthy(self): return self.healthy # ========== 系统初始化 ========== class SystemInitializer: def __init__(self): self.base_dir = Path(__file__).resolve().parent.parent self.ai_core = None self.hardware_manager = None self.life_scheduler = None self.ai_agent = None self.start_time = time.time() self.environment_manager = None self.life_lock = threading.Lock() def initialize_system_paths(self): sys.path.insert(0, str(self.base_dir)) logger.info(f"项目根目录: {self.base_dir}") sub_dirs = ['agent', 'core', 'utils', 'config', 'cognitive_arch', 'environment'] for sub_dir in sub_dirs: full_path = self.base_dir / sub_dir if full_path.exists(): sys.path.insert(0, str(full_path)) logger.info(f"添加路径: {full_path}") else: logger.warning(f"目录不存在: {full_path} - 已跳过") def initialize_environment_manager(self): try: env_config = {'update_interval': 1.0, 'spatial': {'grid_size': 1.0}} self.environment_manager = EnvironmentManager(env_config) self.environment_manager.start() logger.info("✅ 环境管理器初始化成功") return self.environment_manager except Exception as e: logger.error(f"❌ 环境管理器初始化失败: {str(e)}") logger.warning("⚠️ 环境交互功能将不可用") return None def initialize_ai_core(self): logger.info("✅ 模拟AI核心初始化") self.ai_core = type('AICore', (), { 'status': 'running', 'get_state': lambda: {"status": "running", "model": "gpt-3.5-turbo"} })() def initialize_hardware_manager(self): logger.info("✅ 模拟硬件管理器初始化") self.hardware_manager = type('HardwareManager', (), { 'get_status': lambda: { "cpu_usage": psutil.cpu_percent(), "memory_usage": psutil.virtual_memory().percent, "gpu_usage": 0 } })() @synchronized(lock=threading.Lock()) def initialize_life_scheduler(self): logger.info("✅ 模拟生活调度器初始化") self.life_scheduler = type('LifeScheduler', (), { 'get_status': lambda: { "current_activity": "thinking", "next_activity": "learning", "energy": 85 } })() @synchronized(lock=threading.Lock()) def initialize_ai_agent(self): logger.info("✅ 模拟AI智能体初始化") self.ai_agent = type('AIAgent', (), { 'process_input': lambda self, input, user_id: f"你好{user_id},我收到了你的消息: '{input}'" })() def start_evolution_monitor(self): logger.info("✅ 模拟进化监视器启动") def initialize_all(self): logger.info("=" * 50) logger.info("🚀 开始初始化AI系统") logger.info("=" * 50) self.initialize_system_paths() self.initialize_ai_core() self.initialize_hardware_manager() self.initialize_life_scheduler() self.initialize_ai_agent() self.initialize_environment_manager() self.start_evolution_monitor() logger.info("✅ 所有系统组件初始化完成") return { "ai_core": self.ai_core, "hardware_manager": self.hardware_manager, "life_scheduler": self.life_scheduler, "ai_agent": self.ai_agent, "environment_manager": self.environment_manager } # ========== 环境交互路由 ========== def register_environment_routes(app): @app.route('/environment') def environment_view(): return render_template('environment_view.html') @app.route('/api/environment/state', methods=['GET']) @app.config['LIMITER'].limit("10 per minute") def get_environment_state(): env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager') if not env_manager: return jsonify({"success": False, "error": "环境管理器未初始化"}), 503 try: state = env_manager.get_state() return jsonify(state) except Exception as e: app.logger.error(f"获取环境状态失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/environment/action', methods=['POST']) @app.config['LIMITER'].limit("5 per minute") def execute_environment_action(): env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager') if not env_manager: return jsonify({"success": False, "error": "环境管理器未初始化"}), 503 try: data = request.json action = data.get('action') params = data.get('params', {}) if not action: return jsonify({"success": False, "error": "缺少动作参数"}), 400 success = env_manager.execute_action(action, params) return jsonify({"success": success, "action": action}) except Exception as e: app.logger.error(f"执行环境动作失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 500 # ========== 路由注册 ========== def register_routes(app): register_environment_routes(app) # 健康检查路由 @app.route('/health') def health_check(): return jsonify({"status": "healthy", "timestamp": datetime.datetime.now().isoformat()}) # 系统状态路由 @app.route('/status') @app.config['LIMITER'].exempt def status(): components = app.config['SYSTEM_COMPONENTS'] system_info = { "uptime": time.time() - app.config['START_TIME'], "ai_core_status": components['ai_core'].status if components['ai_core'] else "uninitialized", "hardware_status": components['hardware_manager'].get_status() if components[ 'hardware_manager'] else "uninitialized", "life_scheduler_status": components['life_scheduler'].get_status() if components[ 'life_scheduler'] else "uninitialized", "environment_status": components['environment_manager'].is_healthy() if components[ 'environment_manager'] else "uninitialized", "platform": platform.platform(), "python_version": sys.version, "memory_usage": psutil.virtual_memory().percent, "cpu_usage": psutil.cpu_percent(), "thread_count": threading.active_count(), "process_id": os.getpid() } return jsonify(system_info) # 核心系统路由 @app.route('/api/core/state') @app.config['LIMITER'].limit("10 per minute") def get_core_state(): ai_core = app.config['SYSTEM_COMPONENTS'].get('ai_core') if not ai_core: return jsonify({"error": "AI核心未初始化"}), 503 return jsonify(ai_core.get_state()) # 生活系统路由 @app.route('/life/dashboard') def life_dashboard(): return render_template('life_dashboard.html') @app.route('/api/life/status') @app.config['LIMITER'].limit("10 per minute") def get_life_status(): life_scheduler = app.config['SYSTEM_COMPONENTS'].get('life_scheduler') if not life_scheduler: return jsonify({"error": "生活调度器未初始化"}), 503 status = life_scheduler.get_status() return jsonify(status) # 聊天路由 @app.route('/chat', methods=['POST']) @app.config['LIMITER'].limit("30 per minute") def chat(): components = app.config['SYSTEM_COMPONENTS'] if not components['ai_agent']: return jsonify({"error": "Agent未初始化"}), 503 try: data = request.get_json() user_input = data.get('message', '') user_id = data.get('user_id', 'default') if not user_input: return jsonify({"error": "消息内容不能为空"}), 400 app.logger.info(f"聊天请求: 用户={user_id}, 内容长度={len(user_input)}") # 使用线程池异步处理 future = executor.submit(components['ai_agent'].process_input, user_input, user_id) response = future.result(timeout=10) # 10秒超时 return jsonify({"response": response}) except TimeoutError: return jsonify({"error": "处理超时"}), 504 except Exception as e: app.logger.error(f"聊天处理失败: {traceback.format_exc()}") return jsonify({"error": "聊天处理失败", "details": str(e)}), 500 # 404处理 @app.route('/<path:path>') def catch_all(path): return jsonify({"error": "路由不存在", "path": path}), 404 def register_error_handlers(app): @app.errorhandler(404) def not_found_error(error): return jsonify({"error": "资源未找到", "message": str(error)}), 404 @app.errorhandler(500) def internal_error(error): app.logger.error(f"服务器内部错误: {str(error)}") return jsonify({"error": "服务器内部错误", "message": "请查看日志获取详细信息"}), 500 # ========== WebSocket处理 ========== def setup_websocket_handlers(socketio): @socketio.on('connect') def handle_connect(): logger.info('客户端已连接') socketio.emit('system_status', {'status': 'ready'}) @socketio.on('disconnect') def handle_disconnect(): logger.info('客户端已断开连接') @socketio.on('user_message') def handle_user_message(data): user_id = data.get('user_id', 'guest') message = data.get('message', '') logger.info(f"收到来自 {user_id} 的消息: {message}") # 使用线程池处理消息 def process_message(): try: global coordinator if coordinator: return coordinator.process_message(message) else: return f"已收到您的消息: {message}" except Exception as e: logger.error(f"消息处理失败: {str(e)}") return "处理消息时出错" future = executor.submit(process_message) try: response = future.result(timeout=10) socketio.emit('agent_response', { 'user_id': user_id, 'response': response }) except TimeoutError: socketio.emit('agent_response', { 'user_id': user_id, 'response': "处理超时,请重试" }) # ========== 生产环境启动器 ========== def run_production_server(app): try: from waitress import serve logger.info(f"🚀 生产服务器启动: http://{config.HOST}:{config.PORT}") logger.warning("⚠️ 当前运行在生产模式 (Waitress WSGI服务器)") serve(app, host=config.HOST, port=config.PORT, threads=8) except ImportError: logger.error("❌ 缺少生产环境依赖: waitress") logger.info("请运行: pip install waitress") sys.exit(1) # ========== Flask应用工厂 (关键修复) ========== def create_app(): app = Flask( __name__, template_folder=str(config.TEMPLATE_DIR), static_folder='static', static_url_path='/static' ) app.secret_key = config.SECRET_KEY # 初始化限流器 (修复点: 先创建LIMITER并添加到配置) limiter = Limiter( get_remote_address, app=app, default_limits=["200 per day", "50 per hour"], storage_uri="memory://" ) app.config['LIMITER'] = limiter # 关键修复: 先配置LIMITER system_initializer = SystemInitializer() components = system_initializer.initialize_all() app.config['SYSTEM_COMPONENTS'] = components app.config['START_TIME'] = system_initializer.start_time app.config['BASE_DIR'] = system_initializer.base_dir # 配置SocketIO async_mode = 'threading' try: import eventlet async_mode = 'eventlet' logger.info("✅ 使用eventlet异步模式") except ImportError: logger.warning("⚠️ eventlet未安装,使用threading模式,性能可能受影响") socketio = SocketIO( app, cors_allowed_origins="*", async_mode=async_mode, logger=True, engineio_logger=True ) app.config['SOCKETIO'] = socketio # 注册路由和错误处理 register_routes(app) register_error_handlers(app) return app, socketio # ========== 主程序入口 ========== if __name__ == '__main__': try: app, socketio = create_app() setup_websocket_handlers(socketio) if config.DEBUG: logger.info(f"开发服务器运行在 http://{config.HOST}:{config.PORT}") socketio.run( app, host=config.HOST, port=config.PORT, debug=True, use_reloader=False, log_output=True ) else: run_production_server(app) except KeyboardInterrupt: logger.info("服务器关闭") executor.shutdown(wait=False) except Exception as e: logger.critical(f"服务器启动失败: {str(e)}") logger.error(traceback.format_exc()) executor.shutdown(wait=False) sys.exit(1)

filetype

import os import cv2 import numpy as np import psutil import time import argparse import json from datetime import datetime import logging import signal import sys import traceback import threading import GPUtil import subprocess import gc import shutil import queue import concurrent.futures import tracemalloc import platform import requests import zipfile class VideoProcessor: def __init__(self, config): self.config = config self.canceled = False self.start_time = time.time() self.frame_counter = 0 self.progress = 0 self.status = "就绪" self.fps = 0.0 self.mem_usage = 0.0 self.cpu_percent = 0.0 self.system_mem_percent = 0.0 self.system_mem_used = 0.0 self.system_mem_available = 0.0 self.gpu_load = 0.0 self.gpu_memory_used = 0.0 self.gpu_memory_total = 0.0 self.logger = logging.getLogger("VideoProcessor") self.resources = [] # 跟踪需要释放的资源 self.monitor_active = False self.monitor_thread = None # 多线程队列 self.frame_queue = queue.Queue(maxsize=self.config.get('queue_size', 30)) self.processed_queue = queue.Queue(maxsize=self.config.get('queue_size', 30)) # CUDA流管理 self.cuda_streams = [] self.cuda_ctx = None # 检测移动环境 self.is_mobile = self.detect_mobile_environment() if self.is_mobile: self.logger.info("检测到移动环境,启用移动端优化配置") # 内存跟踪 if self.config.get('enable_memory_monitor', False): tracemalloc.start() self.logger.info("内存跟踪已启用") # 注册信号处理 signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGTERM, self.signal_handler) def detect_mobile_environment(self): """检测是否在移动环境中运行""" try: system = platform.system().lower() uname = os.uname() # Android检测 if 'linux' in system and 'android' in uname.version.lower(): self.logger.info("检测到Android环境") return True # iOS检测 if system == 'darwin' and 'ios' in uname.machine.lower(): self.logger.info("检测到iOS环境") return True return False except Exception as e: self.logger.warning(f"移动环境检测失败: {str(e)}") return False def signal_handler(self, signum, frame): """处理中断信号""" self.logger.warning(f"接收到中断信号: {signum}, 正在优雅地停止...") self.cancel() sys.exit(1) def start_resource_monitor(self, interval=1): """启动资源监控线程""" self.monitor_active = True self.monitor_thread = threading.Thread( target=self.monitor_resources, args=(interval,), daemon=True ) self.monitor_thread.start() self.logger.info("资源监控线程已启动") def stop_resource_monitor(self): """停止资源监控线程""" if self.monitor_thread and self.monitor_thread.is_alive(): self.monitor_active = False self.monitor_thread.join(timeout=2.0) self.logger.info("资源监控线程已停止") def monitor_resources(self, interval=1): """资源监控线程函数""" self.logger.info("资源监控开始") print("\n资源监控 | 时间戳 | CPU使用率 | 内存使用 | GPU使用率 | GPU显存") print("-" * 70) while self.monitor_active: try: # CPU监控 cpu_percent = psutil.cpu_percent(interval=None) # 内存监控 mem = psutil.virtual_memory() mem_usage = f"{mem.used / (1024**3):.1f}GB/{mem.total / (1024**3):.1f}GB" # GPU监控 gpu_info = "" try: gpus = GPUtil.getGPUs() if gpus: gpu = gpus[0] gpu_info = f"{gpu.load*100:.1f}% | {gpu.memoryUsed:.1f}MB/{gpu.memoryTotal:.0f}MB" # 更新GPU状态 self.gpu_load = gpu.load * 100 self.gpu_memory_used = gpu.memoryUsed self.gpu_memory_total = gpu.memoryTotal else: gpu_info = "No GPU" except Exception as e: gpu_info = f"Error: {str(e)}" timestamp = time.strftime('%H:%M:%S') print(f"{timestamp} | {cpu_percent:6.1f}% | {mem_usage:^15} | {gpu_info}") self.logger.info(f"资源监控 | {timestamp} | CPU: {cpu_percent}% | 内存: {mem_usage} | GPU: {gpu_info}") # 内存泄漏检测 if self.config.get('enable_memory_monitor', False): snapshot = tracemalloc.take_snapshot() top_stats = snapshot.statistics('lineno') self.logger.info("内存分配Top 10:") for stat in top_stats[:10]: self.logger.info(str(stat)) time.sleep(interval) except Exception as e: self.logger.error(f"资源监控出错: {str(e)}") time.sleep(5) # 出错后等待5秒再重试 def init_cuda(self): """初始化CUDA环境""" if not self.config.get('use_gpu_processing', False) or self.is_mobile: return try: device_id = self.config.get('gpu_device_index', 0) if cv2.cuda.getCudaEnabledDeviceCount() > device_id: # 设置CUDA设备 cv2.cuda.setDevice(device_id) device = cv2.cuda.DeviceInfo(device_id) self.logger.info(f"使用GPU设备: {device.name()}") # 创建CUDA流 num_streams = self.config.get('cuda_streams', 4) self.cuda_streams = [cv2.cuda_Stream() for _ in range(num_streams)] self.logger.info(f"已创建 {num_streams} 个CUDA流") # 创建CUDA上下文 self.cuda_ctx = cv2.cuda.Device(device_id).createContext() self.logger.info("CUDA上下文已创建") else: self.logger.warning("请求的GPU设备不可用,将使用CPU处理") self.config['use_gpu_processing'] = False except Exception as e: self.logger.error(f"CUDA初始化失败: {str(e)}") self.config['use_gpu_processing'] = False def open_video_with_acceleration(self, file_path): """使用硬件加速打开视频""" # 移动端使用专用API if self.is_mobile: self.logger.info("移动端: 使用Android专用API") try: # Android专用API cap = cv2.VideoCapture(file_path, cv2.CAP_ANDROID) if cap.isOpened(): self.logger.info("Android专用API打开成功") self.resources.append(cap) return cap else: self.logger.warning("Android专用API打开失败,尝试默认方式") except: self.logger.warning("Android专用API不可用,使用默认方式") # 桌面端或移动端备选方案 if self.config.get('hardware_acceleration', 'disable') == 'disable': cap = cv2.VideoCapture(file_path) self.resources.append(cap) return cap cap = cv2.VideoCapture() self.resources.append(cap) acceleration = { 'auto': cv2.VIDEO_ACCELERATION_ANY, 'any': cv2.VIDEO_ACCELERATION_ANY, 'nvidia': cv2.VIDEO_ACCELERATION_NVIDIA, 'intel': cv2.VIDEO_ACCELERATION_INTEL, 'vaapi': cv2.VIDEO_ACCELERATION_VAAPI }.get(self.config.get('hardware_acceleration', 'auto'), cv2.VIDEO_ACCELERATION_ANY) params = [ cv2.CAP_PROP_HW_ACCELERATION, acceleration, cv2.CAP_PROP_HW_DEVICE, self.config.get('gpu_device_index', 0) ] # 降低延迟的优化参数 if self.config.get('reduce_latency', True): params.extend([ cv2.CAP_PROP_BUFFERSIZE, self.config.get('buffer_size', 3), cv2.CAP_PROP_FPS, self.config.get('target_fps', 30) ]) # MJPEG压缩 if self.config.get('use_mjpeg', True): params.extend([ cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M','J','P','G') ]) # 设置解码线程数 decoding_threads = self.config.get('decoding_threads', 0) if decoding_threads > 0: params.extend([cv2.CAP_PROP_FFMPEG_THREADS, decoding_threads]) try: cap.open(file_path, apiPreference=cv2.CAP_FFMPEG, params=params) # Intel专用加速 if self.config.get('hardware_acceleration', '') == 'intel' and cap.isOpened(): cap.set(cv2.CAP_PROP_INTEL_VIDEO_SRC_HW_ACCEL, 1) except Exception as e: self.logger.error(f"硬件加速打开失败: {str(e)}, 使用默认方式") cap = cv2.VideoCapture(file_path) return cap def update_system_stats(self): """更新系统资源统计""" self.cpu_percent = psutil.cpu_percent(interval=0.1) mem = psutil.virtual_memory() self.system_mem_percent = mem.percent self.system_mem_used = mem.used / (1024 ** 3) # GB self.system_mem_available = mem.available / (1024 ** 3) # GB def print_progress(self): """美观的进度显示""" elapsed = time.time() - self.start_time eta = (100 - self.progress) * elapsed / max(1, self.progress) if self.progress > 0 else 0 # 进度条 bar_length = 30 filled_length = int(bar_length * self.progress / 100) bar = '█' * filled_length + '-' * (bar_length - filled_length) # 队列状态 queue_status = f"Q: {self.frame_queue.qsize()}/{self.processed_queue.qsize()}" progress_str = ( f"进度: |{bar}| {self.progress}% " f"| 速度: {self.fps:.1f}fps " f"| 用时: {elapsed:.1f}s " f"| 剩余: {eta:.1f}s " f"| CPU: {self.cpu_percent:.0f}% " f"| 内存: {self.mem_usage:.1f}MB " f"| GPU: {self.gpu_load:.1f}% " f"| {queue_status}" ) print(f"\r{progress_str}", end="") self.logger.info(progress_str) def capture_thread(self, cap, total_frames): """视频捕获线程 (生产者)""" frame_idx = 0 while cap.isOpened() and not self.canceled and frame_idx < total_frames: ret, frame = cap.read() if not ret: break # 放入队列,非阻塞方式防止死锁 try: self.frame_queue.put((frame_idx, frame), timeout=1.0) frame_idx += 1 except queue.Full: if self.canceled: break time.sleep(0.01) # 发送结束信号 self.frame_queue.put((None, None)) self.logger.info(f"捕获线程完成,共捕获 {frame_idx} 帧") def processing_thread(self, output_resolution): """视频处理线程 (消费者)""" output_width, output_height = output_resolution while not self.canceled: try: # 获取帧,带超时防止死锁 frame_idx, frame = self.frame_queue.get(timeout=2.0) # 结束信号 if frame_idx is None: self.processed_queue.put((None, None)) self.frame_queue.task_done() break # 处理帧 try: # 移动端使用轻量级算法 if self.is_mobile: # 移动端优化:使用Canny边缘检测替代复杂特征检测 gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) edges = cv2.Canny(gray, 100, 200) # 将边缘检测结果与原帧合并 frame[:, :, 0] = np.minimum(frame[:, :, 0] + edges, 255) frame[:, :, 1] = np.minimum(frame[:, :, 1] + edges, 255) frame[:, :, 2] = np.minimum(frame[:, :, 2] + edges, 255) # 移动端使用快速插值方法 processed_frame = cv2.resize(frame, output_resolution, interpolation=cv2.INTER_LINEAR) else: # 桌面端使用完整算法 if self.config.get('use_gpu_processing', False) and self.cuda_streams: # 选择CUDA流 (轮询) stream_idx = frame_idx % len(self.cuda_streams) stream = self.cuda_streams[stream_idx] # 上传到GPU gpu_frame = cv2.cuda_GpuMat() gpu_frame.upload(frame, stream=stream) # GPU处理 if output_resolution: gpu_frame = cv2.cuda.resize(gpu_frame, output_resolution, stream=stream) # 下载回CPU processed_frame = gpu_frame.download(stream=stream) else: # CPU处理 if output_resolution: processed_frame = cv2.resize(frame, output_resolution) else: processed_frame = frame # 放入已处理队列 self.processed_queue.put((frame_idx, processed_frame), timeout=1.0) except cv2.error as e: if 'CUDA' in str(e): self.logger.error(f"GPU处理失败: {str(e)},切换到CPU模式") self.config['use_gpu_processing'] = False # 使用CPU重试 processed_frame = cv2.resize(frame, output_resolution) if output_resolution else frame self.processed_queue.put((frame_idx, processed_frame), timeout=1.0) else: self.logger.error(f"处理帧 {frame_idx} 失败: {str(e)}") except Exception as e: self.logger.error(f"处理帧 {frame_idx} 时出错: {str(e)}") self.frame_queue.task_done() except queue.Empty: if self.canceled: break except Exception as e: self.logger.error(f"处理线程出错: {str(e)}") self.logger.info("处理线程退出") def writer_thread(self, out, total_frames): """写入线程""" frame_idx = 0 last_log_time = time.time() while not self.canceled and frame_idx < total_frames: try: # 获取已处理帧 idx, processed_frame = self.processed_queue.get(timeout=2.0) # 结束信号 if idx is None: break # 写入输出 if processed_frame is not None: out.write(processed_frame) # 更新计数 self.frame_counter += 1 frame_idx += 1 # 计算帧率 elapsed = time.time() - self.start_time self.fps = self.frame_counter / elapsed if elapsed > 0 else 0 # 更新内存使用 process = psutil.Process(os.getpid()) self.mem_usage = process.memory_info().rss / (1024 ** 2) # MB # 更新系统状态 self.update_system_stats() # 更新进度 self.progress = int(frame_idx * 100 / total_frames) # 定期打印进度 current_time = time.time() if current_time - last_log_time > 1.0 or frame_idx % 50 == 0: self.print_progress() last_log_time = current_time # 内存管理 if frame_idx % 100 == 0: gc.collect() # 检查内存使用情况 if self.system_mem_percent > 90: self.logger.warning(f"系统内存使用超过90%! (当前: {self.system_mem_percent}%)") print(f"\n警告: 系统内存使用过高 ({self.system_mem_percent}%)") self.processed_queue.task_done() except queue.Empty: if self.canceled: break except Exception as e: self.logger.error(f"写入线程出错: {str(e)}") self.logger.info(f"写入线程完成,共写入 {frame_idx} 帧") def run(self): try: self.status = "处理中..." self.logger.info("视频处理开始") self.logger.info(f"主视频: {self.config['main_video']}") self.logger.info(f"副视频: {self.config['sub_video']}") self.logger.info(f"输出文件: {self.config['output_path']}") self.start_time = time.time() # 初始化CUDA self.init_cuda() # 启动资源监控 self.start_resource_monitor() # 打开主视频 self.logger.info("正在打开主视频...") main_cap = self.open_video_with_acceleration(self.config['main_video']) if not main_cap.isOpened(): self.status = "无法打开主视频文件!" self.logger.error(f"无法打开主视频文件: {self.config['main_video']}") return False # 获取主视频信息 main_fps = main_cap.get(cv2.CAP_PROP_FPS) main_width = int(main_cap.get(cv2.CAP_PROP_FRAME_WIDTH)) main_height = int(main_cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) main_total_frames = int(main_cap.get(cv2.CAP_PROP_FRAME_COUNT)) self.logger.info(f"主视频信息: {main_width}x{main_height}@{main_fps:.1f}fps, 总帧数: {main_total_frames}") # 打开副视频 self.logger.info("正在打开副视频...") sub_cap = self.open_video_with_acceleration(self.config['sub_video']) if not sub_cap.isOpened(): self.status = "无法打开副视频文件!" self.logger.error(f"无法打开副视频文件: {self.config['sub_video']}") return False # 获取副视频信息 sub_fps = sub_cap.get(cv2.CAP_PROP_FPS) sub_width = int(sub_cap.get(cv2.CAP_PROP_FRAME_WIDTH)) sub_height = int(sub_cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) sub_total_frames = int(sub_cap.get(cv2.CAP_PROP_FRAME_COUNT)) self.logger.info(f"副视频信息: {sub_width}x{sub_height}@{sub_fps:.1f}fps, 总帧数: {sub_total_frames}") # 创建输出目录 output_dir = os.path.dirname(self.config['output_path']) if output_dir and not os.path.exists(output_dir): try: os.makedirs(output_dir) self.logger.info(f"已创建输出目录: {output_dir}") except Exception as e: self.status = f"无法创建输出目录: {output_dir}" self.logger.error(f"创建输出目录失败: {str(e)}") return False # 创建输出视频 output_width, output_height = self.config['output_resolution'] fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(self.config['output_path'], fourcc, main_fps, (output_width, output_height)) self.resources.append(out) if not out.isOpened(): self.status = "无法创建输出视频文件!请检查分辨率设置。" self.logger.error(f"无法创建输出视频: {self.config['output_path']}, 分辨率: {output_width}x{output_height}") return False # 计算主视频分段参数 if self.config['main_segment_type'] == '秒': segment_length_main = int(float(self.config['segment_a']) * main_fps) else: segment_length_main = int(self.config['segment_a']) b1 = int(self.config['b1']) b2 = int(self.config['b2']) replace_frame_count = b2 - b1 + 1 # 计算副视频分段参数 if self.config['sub_segment_type'] == '秒': segment_length_sub = int(float(self.config['segment_c']) * sub_fps) else: segment_length_sub = int(self.config['segment_c']) d = int(self.config['d']) # 计算主视频段数 segments_main = (main_total_frames + segment_length_main - 1) // segment_length_main # 计算副视频段数 segments_sub = (sub_total_frames + segment_length_sub - 1) // segment_length_sub # 检查段数是否匹配 if segments_main > segments_sub: if self.config['sub_option'] == '循环使用': self.logger.warning(f"副视频段数不足({segments_sub}),将循环使用以满足主视频段数({segments_main})") else: self.status = "副视频段数不足,无法完成替换!" self.logger.error(f"副视频段数不足: {segments_sub} < {segments_main}") return False # 初始化性能监控 process = psutil.Process(os.getpid()) self.logger.info("="*50) self.logger.info("开始视频处理") self.logger.info(f"主视频: {self.config['main_video']} ({main_total_frames}帧, {main_fps:.1f}fps)") self.logger.info(f"副视频: {self.config['sub_video']} ({sub_total_frames}帧, {sub_fps:.1f}fps)") self.logger.info(f"输出文件: {self.config['output_path']}") self.logger.info(f"分辨率: {output_width}x{output_height}") self.logger.info(f"主视频分段数: {segments_main}, 每段{segment_length_main}帧") self.logger.info(f"替换帧范围: {b1}-{b2} (每段替换{replace_frame_count}帧)") self.logger.info(f"副视频分段数: {segments_sub}, 每段{segment_length_sub}帧") self.logger.info(f"副视频起始帧: {d}") self.logger.info(f"使用GPU处理: {self.config.get('use_gpu_processing', False)}") self.logger.info(f"CUDA流数量: {len(self.cuda_streams)}") self.logger.info(f"移动环境: {self.is_mobile}") self.logger.info("="*50) print("\n" + "="*50) print("开始视频处理") print(f"主视频: {self.config['main_video']} ({main_total_frames}帧, {main_fps:.1f}fps)") print(f"副视频: {self.config['sub_video']} ({sub_total_frames}帧, {sub_fps:.1f}fps)") print(f"输出文件: {self.config['output_path']}") print(f"分辨率: {output_width}x{output_height}") print(f"主视频分段数: {segments_main}, 每段{segment_length_main}帧") print(f"替换帧范围: {b1}-{b2} (每段替换{replace_frame_count}帧)") print(f"副视频分段数: {segments_sub}, 每段{segment_length_sub}帧") print(f"副视频起始帧: {d}") print(f"使用GPU处理: {self.config.get('use_gpu_processing', False)}") print(f"CUDA流数量: {len(self.cuda_streams)}") print(f"移动环境: {self.is_mobile}") print("="*50 + "\n") # 启动多线程处理 with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # 启动捕获线程 capture_future = executor.submit( self.capture_thread, main_cap, main_total_frames ) # 启动处理线程 processing_future = executor.submit( self.processing_thread, (output_width, output_height) ) # 启动写入线程 writer_future = executor.submit( self.writer_thread, out, main_total_frames ) # 等待所有线程完成 concurrent.futures.wait( [capture_future, processing_future, writer_future], return_when=concurrent.futures.ALL_COMPLETED ) if not self.canceled: self.status = "处理完成" self.progress = 100 self.print_progress() print(f"\n\n处理完成!输出文件: {self.config['output_path']}") self.logger.info(f"处理完成! 总帧数: {self.frame_counter}, 耗时: {time.time() - self.start_time:.1f}秒") self.logger.info(f"输出文件: {self.config['output_path']}") return True return False except Exception as e: self.status = f"处理过程中发生错误: {str(e)}" error_trace = traceback.format_exc() self.logger.error(f"处理过程中发生错误: {str(e)}") self.logger.error(f"错误详情:\n{error_trace}") print(f"\n\n错误: {str(e)}") return False finally: self.stop_resource_monitor() self.release_resources() if self.config.get('enable_memory_monitor', False): tracemalloc.stop() def release_resources(self): """释放所有资源""" self.logger.info("正在释放资源...") for resource in self.resources: try: if hasattr(resource, 'release'): resource.release() elif hasattr(resource, 'close'): resource.close() except Exception as e: self.logger.warning(f"释放资源时出错: {str(e)}") # 释放CUDA资源 if self.cuda_ctx: try: self.cuda_ctx.destroy() self.logger.info("CUDA上下文已释放") except Exception as e: self.logger.warning(f"释放CUDA上下文时出错: {str(e)}") self.resources = [] self.logger.info("资源已释放") def cancel(self): """取消处理""" self.canceled = True self.status = "正在取消..." self.logger.warning("用户请求取消处理") print("\n正在取消处理...") # 清空队列 while not self.frame_queue.empty(): try: self.frame_queue.get_nowait() self.frame_queue.task_done() except queue.Empty: break while not self.processed_queue.empty(): try: self.processed_queue.get_nowait() self.processed_queue.task_done() except queue.Empty: break self.stop_resource_monitor() self.release_resources() def get_video_info(file_path): """获取视频文件信息""" cap = None try: cap = cv2.VideoCapture(file_path) if cap.isOpened(): width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = cap.get(cv2.CAP_PROP_FPS) frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = frame_count / fps if fps > 0 else 0 return { "width": width, "height": height, "fps": fps, "frame_count": frame_count, "duration": duration } return None except Exception as e: print(f"获取视频信息时出错: {str(e)}") return None finally: if cap and cap.isOpened(): cap.release() def validate_config(config): """验证配置参数""" # 检查文件存在 if not os.path.exists(config['main_video']): print(f"错误: 主视频文件不存在 - {config['main_video']}") return False if not os.path.exists(config['sub_video']): print(f"错误: 副视频文件不存在 - {config['sub_video']}") return False # 检查输出目录 output_dir = os.path.dirname(config['output_path']) if output_dir and not os.path.exists(output_dir): try: os.makedirs(output_dir) print(f"已创建输出目录: {output_dir}") except: print(f"错误: 无法创建输出目录 - {output_dir}") return False # 检查参数有效性 try: # 主视频参数 segment_a = float(config['segment_a']) if segment_a <= 0: print("错误: 分段长度必须大于0!") return False b1 = int(config['b1']) b2 = int(config['b2']) if b1 < 0 or b2 < 0: print("错误: 帧索引不能为负数!") return False if b1 > b2: print("错误: 替换开始帧(b1)必须小于或等于替换结束帧(b2)!") return False # 副视频参数 segment_c = float(config['segment_c']) if segment_c <= 0: print("错误: 分段长度必须大于0!") return False d = int(config['d']) if d < 0: print("错误: 帧索引不能为负数!") return False # 分辨率 width = int(config['output_resolution'][0]) height = int(config['output_resolution'][1]) if width <= 0 or height <= 0: print("错误: 分辨率必须大于0!") return False return True except ValueError: print("错误: 请输入有效的数字参数!") return False def save_config(config, file_path): """保存配置到文件""" try: with open(file_path, 'w') as f: json.dump(config, f, indent=2) print(f"配置已保存到: {file_path}") except Exception as e: print(f"保存配置时出错: {str(e)}") def load_config(file_path): """从文件加载配置""" try: with open(file_path, 'r') as f: config = json.load(f) # 确保配置中包含所有必要字段 required_keys = [ 'main_video', 'sub_video', 'output_path', 'main_segment_type', 'segment_a', 'b1', 'b2', 'sub_segment_type', 'segment_c', 'd', 'sub_option', 'output_resolution' ] for key in required_keys: if key not in config: print(f"警告: 配置文件中缺少 '{key}' 参数") return config except FileNotFoundError: print(f"错误: 配置文件不存在 - {file_path}") except Exception as e: print(f"加载配置时出错: {str(e)}") return None def create_default_config(): """创建默认配置""" return { "main_video": "main_video.mp4", "sub_video": "sub_video.mp4", "output_path": "output/output_video.mp4", "main_segment_type": "秒", # 默认按秒分段 "segment_a": "1", # 默认1秒 "b1": "1", # 默认替换开始帧 "b2": "1", # 默认替换结束帧 "sub_segment_type": "帧", # 默认按帧分段 "segment_c": "1", # 默认1帧 "d": "1", # 默认起始帧 "sub_option": "循环使用", "output_resolution": [1280, 720], "hardware_acceleration": "auto", "gpu_device_index": 0, "reduce_latency": True, "decoding_threads": 4, "use_gpu_processing": True, "cuda_streams": 4, "queue_size": 30, "buffer_size": 3, "target_fps": 30, "use_mjpeg": True, "enable_memory_monitor": False, "mobile_optimized": True # 新增移动端优化标志 } def detect_hardware_acceleration(): """更全面的硬件加速支持检测""" print("\n=== 硬件加速支持检测 ===") print(f"OpenCV版本: {cv2.__version__}") # 检测CUDA支持 if cv2.cuda.getCudaEnabledDeviceCount() > 0: print("CUDA支持: 可用") for i in range(cv2.cuda.getCudaEnabledDeviceCount()): try: device = cv2.cuda.getDevice(i) print(f" 设备 {i}: {device.name()}, 计算能力: {device.majorVersion()}.{device.minorVersion()}") except: print(f" 设备 {i}: 信息获取失败") else: print("CUDA支持: 不可用") # 检测OpenCL支持 print(f"OpenCL支持: {'可用' if cv2.ocl.haveOpenCL() else '不可用'}") # 获取FFMPEG信息 try: result = subprocess.run(['ffmpeg', '-version'], capture_output=True, text=True) ffmpeg_version = result.stdout.split('\n')[0] print(f"FFMPEG版本: {ffmpeg_version}") except: print("FFMPEG版本: 未找到") # 检测可用加速类型 acceleration_types = { 'NVIDIA': cv2.VIDEO_ACCELERATION_NVIDIA, 'Intel': cv2.VIDEO_ACCELERATION_INTEL, 'VAAPI': cv2.VIDEO_ACCELERATION_VAAPI, 'ANY': cv2.VIDEO_ACCELERATION_ANY } print("\n支持的硬件加速类型:") available_accelerations = [] for name, accel_type in acceleration_types.items(): cap = cv2.VideoCapture() try: params = [cv2.CAP_PROP_HW_ACCELERATION, accel_type] test_result = cap.open("", apiPreference=cv2.CAP_FFMPEG, params=params) status = "可用" if test_result else "不可用" print(f"- {name}: {status}") if test_result: available_accelerations.append(name.lower()) except: print(f"- {name}: 检测失败") finally: if cap.isOpened(): cap.release() # 如果没有可用的硬件加速,提供备选方案 if not available_accelerations: print("\n警告: 未检测到任何硬件加速支持!") print("建议:") print("1. 使用软件解码 (设置 hardware_acceleration: 'disable')") print("2. 安装以下备选库:") print(" - NVIDIA GPU 用户: 安装 CUDA Toolkit 和 cuDNN") print(" - Intel GPU 用户: 安装 Intel Media SDK") print(" - AMD/其他 GPU 用户: 安装 VAAPI") print("3. 重新编译OpenCV以支持硬件加速") print("4. 使用支持硬件加速的FFmpeg版本") else: print("\n检测到以下可用的硬件加速类型:") print(", ".join(available_accelerations)) print("在配置文件中设置 'hardware_acceleration' 参数使用") def preview_frame(config, frame_index, is_main=True): """预览指定视频的指定帧""" video_path = config['main_video'] if is_main else config['sub_video'] cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print(f"无法打开视频文件: {video_path}") return total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if frame_index >= total_frames: print(f"帧索引超出范围 (最大: {total_frames-1})") cap.release() return cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index) ret, frame = cap.read() if ret: # 创建预览窗口 window_name = f"预览: {'主视频' if is_main else '副视频'} - 帧 {frame_index}" cv2.namedWindow(window_name, cv2.WINDOW_NORMAL) # 调整窗口大小 height, width = frame.shape[:2] max_height = 800 if height > max_height: scale = max_height / height frame = cv2.resize(frame, (int(width * scale), max_height)) cv2.imshow(window_name, frame) cv2.waitKey(0) cv2.destroyAllWindows() else: print(f"无法读取帧 {frame_index}") cap.release() def batch_process(config_file, output_dir): """批量处理多个配置""" try: with open(config_file) as f: batch_configs = json.load(f) except Exception as e: print(f"加载批量配置文件失败: {str(e)}") return total_tasks = len(batch_configs) print(f"\n开始批量处理 {total_tasks} 个任务") for i, config in enumerate(batch_configs): print(f"\n处理任务 {i+1}/{total_tasks}") print(f"主视频: {config['main_video']}") print(f"副视频: {config['sub_video']}") # 添加时间戳到输出文件名 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") base_name = os.path.basename(config['output_path']) config['output_path'] = os.path.join( output_dir, f"{timestamp}_{base_name}" ) # 验证配置 if not validate_config(config): print(f"任务 {i+1} 配置验证失败,跳过") continue # 创建处理器 processor = VideoProcessor(config) success = processor.run() if success: print(f"任务 {i+1} 完成: {config['output_path']}") else: print(f"任务 {i+1} 失败") # 任务间延迟,让系统冷却 if i < total_tasks - 1: print("\n等待5秒,准备下一个任务...") time.sleep(5) def setup_logging(): """配置日志系统""" log_dir = "logs" if not os.path.exists(log_dir): os.makedirs(log_dir) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") log_file = os.path.join(log_dir, f"video_processor_{timestamp}.log") logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file), logging.StreamHandler() ] ) logger = logging.getLogger() logger.info(f"日志系统初始化完成, 日志文件: {log_file}") return logger, log_file def install_termux_dependencies(): """安装Termux所需的依赖""" print("正在安装Termux依赖...") commands = [ "pkg update && pkg upgrade -y", "pkg install python libjpeg-turbo libvulkan vulkan-loader-android ffmpeg -y", "pkg install vulkan-tools vulkan-validation-layers -y", "pkg install ocl-icd opencl-headers -y" ] for cmd in commands: print(f"执行: {cmd}") result = subprocess.run(cmd, shell=True) if result.returncode != 0: print(f"命令执行失败: {cmd}") return False print("Termux依赖安装完成") return True def verify_gpu_support(): """验证GPU支持情况""" print("\n验证GPU支持:") # 验证MediaCodec支持 print("\n1. MediaCodec支持:") result = subprocess.run(["ffmpeg", "-hwaccels"], capture_output=True, text=True) if "mediacodec" in result.stdout: print(" ✓ 支持MediaCodec硬件加速") else: print(" ✗ 不支持MediaCodec硬件加速") # 验证Vulkan支持 print("\n2. Vulkan支持:") try: result = subprocess.run(["vulkaninfo"], capture_output=True, text=True) if "deviceName" in result.stdout: print(" ✓ 支持Vulkan API") else: print(" ✗ 不支持Vulkan API") except FileNotFoundError: print(" ✗ vulkaninfo未安装,无法验证Vulkan支持") # 验证OpenCL支持 print("\n3. OpenCL支持:") try: result = subprocess.run(["clinfo"], capture_output=True, text=True) if "Platform Name" in result.stdout: print(" ✓ 支持OpenCL") else: print(" ✗ 不支持OpenCL") except FileNotFoundError: print(" ✗ clinfo未安装,无法验证OpenCL支持") print("\n验证完成") def setup_termux_gpu_acceleration(): """设置Termux GPU加速环境""" print("="*50) print("Termux GPU加速视频处理设置") print("="*50) # 安装基础依赖 if not install_termux_dependencies(): print("依赖安装失败,无法继续设置") return # 验证GPU支持 verify_gpu_support() # 下载并编译CLBlast print("\n编译安装CLBlast...") commands = [ "pkg install git cmake make -y", "git clone https://github.com/CNugteren/CLBlast", "cd CLBlast && mkdir build && cd build", "cmake .. -DCMAKE_INSTALL_PREFIX=$PREFIX", "make install" ] for cmd in commands: print(f"执行: {cmd}") result = subprocess.run(cmd, shell=True) if result.returncode != 0: print(f"命令执行失败: {cmd}") return print("\nGPU加速环境设置完成!") print("现在可以使用以下命令进行硬件加速视频处理:") print("ffmpeg -hwaccel mediacodec -i input.mp4 -c:v h264_mediacodec output.mp4") # 创建示例批处理脚本 with open("gpu_batch_process.sh", "w") as f: f.write("""#!/bin/bash # GPU加速批处理脚本 for f in *.mp4; do echo "处理: $f" ffmpeg -hwaccel mediacodec -i "$f" -c:v h264_mediacodec "gpu_$f" done echo "所有视频处理完成!" """) print("\n已创建批处理脚本: gpu_batch_process.sh") print("使用命令运行: bash gpu_batch_process.sh") def main(): # 设置日志 logger, log_file = setup_logging() # 创建参数解析器 parser = argparse.ArgumentParser(description="专业视频帧替换工具", formatter_class=argparse.RawTextHelpFormatter) parser.add_argument("--config", help="配置文件路径", default="") parser.add_argument("--save-config", help="保存默认配置到文件", action="store_true") parser.add_argument("--background", help="后台运行模式", action="store_true") parser.add_argument("--batch", help="批量处理模式,指定批量配置文件", default="") parser.add_argument("--preview-main", type=int, help="预览主视频指定帧", default=-1) parser.add_argument("--preview-sub", type=int, help="预览副视频指定帧", default=-1) parser.add_argument("--output-dir", help="批量处理输出目录", default="batch_output") parser.add_argument("--enable-gpu", help="启用GPU加速处理", action="store_true") parser.add_argument("--enable-mem-monitor", help="启用内存监控", action="store_true") parser.add_argument("--setup-termux", help="设置Termux GPU加速环境", action="store_true") args = parser.parse_args() # Termux GPU加速设置 if args.setup_termux: setup_termux_gpu_acceleration() return # 保存默认配置 if args.save_config: config_file = args.config if args.config else "video_config.json" default_config = create_default_config() save_config(default_config, config_file) print(f"默认配置已保存到: {config_file}") return # 批量处理模式 if args.batch: if not os.path.exists(args.output_dir): os.makedirs(args.output_dir) batch_process(args.batch, args.output_dir) return # 加载配置 config = None if args.config: config = load_config(args.config) # 如果没有提供配置或加载失败,使用默认配置 if not config: print("使用默认配置") config = create_default_config() # 命令行参数覆盖配置 if args.enable_gpu: config['use_gpu_processing'] = True if args.enable_mem_monitor: config['enable_memory_monitor'] = True # 预览功能 if args.preview_main >= 0: preview_frame(config, args.preview_main, is_main=True) return if args.preview_sub >= 0: preview_frame(config, args.preview_sub, is_main=False) return # 后台模式处理 if args.background: print("后台模式运行中...") logger.info("后台模式启动") # 重定向标准输出到日志 sys.stdout = open(log_file, 'a') sys.stderr = sys.stdout # 显示硬件加速信息 detect_hardware_acceleration() # 显示配置 logger.info("\n当前配置:") logger.info(f"主视频: {config['main_video']}") logger.info(f"副视频: {config['sub_video']}") logger.info(f"输出文件: {config['output_path']}") logger.info(f"主视频分段方式: {config['main_segment_type']}, 长度: {config['segment_a']}") logger.info(f"替换帧范围: b1={config['b1']}, b2={config['b2']}") logger.info(f"副视频分段方式: {config['sub_segment_type']}, 长度: {config['segment_c']}") logger.info(f"副视频起始帧: d={config['d']}") logger.info(f"副视频不足时: {config['sub_option']}") logger.info(f"输出分辨率: {config['output_resolution'][0]}x{config['output_resolution'][1]}") logger.info(f"硬件加速: {config.get('hardware_acceleration', 'auto')}") logger.info(f"解码线程数: {config.get('decoding_threads', 0)}") logger.info(f"使用GPU处理: {config.get('use_gpu_processing', False)}") logger.info(f"CUDA流数量: {config.get('cuda_streams', 0)}") logger.info(f"队列大小: {config.get('queue_size', 30)}") logger.info(f"启用内存监控: {config.get('enable_memory_monitor', False)}") logger.info(f"移动端优化: {config.get('mobile_optimized', True)}") print("\n当前配置:") print(f"主视频: {config['main_video']}") print(f"副视频: {config['sub_video']}") print(f"输出文件: {config['output_path']}") print(f"主视频分段方式: {config['main_segment_type']}, 长度: {config['segment_a']}") print(f"替换帧范围: b1={config['b1']}, b2={config['b2']}") print(f"副视频分段方式: {config['sub_segment_type']}, 长度: {config['segment_c']}") print(f"副视频起始帧: d={config['d']}") print(f"副视频不足时: {config['sub_option']}") print(f"输出分辨率: {config['output_resolution'][0]}x{config['output_resolution'][1]}") print(f"硬件加速: {config.get('hardware_acceleration', 'auto')}") print(f"解码线程数: {config.get('decoding_threads', 0)}") print(f"使用GPU处理: {config.get('use_gpu_processing', False)}") print(f"CUDA流数量: {config.get('cuda_streams', 0)}") print(f"队列大小: {config.get('queue_size', 30)}") print(f"启用内存监控: {config.get('enable_memory_monitor', False)}") print(f"移动端优化: {config.get('mobile_optimized', True)}\n") # 验证配置 if not validate_config(config): logger.error("配置验证失败") return # 显示视频信息 main_info = get_video_info(config['main_video']) if main_info: logger.info("主视频信息:") logger.info(f" 尺寸: {main_info['width']}x{main_info['height']}") logger.info(f" 帧率: {main_info['fps']:.1f} fps") logger.info(f" 总帧数: {main_info['frame_count']}") logger.info(f" 时长: {main_info['duration']:.1f}秒") print("主视频信息:") print(f" 尺寸: {main_info['width']}x{main_info['height']}") print(f" 帧率: {main_info['fps']:.1f} fps") print(f" 总帧数: {main_info['frame_count']}") print(f" 时长: {main_info['duration']:.1f}秒") sub_info = get_video_info(config['sub_video']) if sub_info: logger.info("\n副视频信息:") logger.info(f" 尺寸: {sub_info['width']}x{sub_info['height']}") logger.info(f" 帧率: {sub_info['fps']:.1f} fps") logger.info(f" 总帧数: {sub_info['frame_count']}") logger.info(f" 时长: {sub_info['duration']:.1f}秒") print("\n副视频信息:") print(f" 尺寸: {sub_info['width']}x{sub_info['height']}") print(f" 帧率: {sub_info['fps']:.1f} fps") print(f" 总帧数: {sub_info['frame_count']}") print(f" 时长: {sub_info['duration']:.1f}秒") # 确认开始处理 if not args.background: print("\n按 Enter 开始处理,或输入 'c' 取消...") user_input = input().strip().lower() if user_input == 'c': logger.info("用户取消处理") print("处理已取消") return # 创建并运行处理器 logger.info("开始视频处理") processor = VideoProcessor(config) processor.run() # 保存配置 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") config_file = f"video_config_{timestamp}.json" save_config(config, config_file) logger.info(f"配置已保存: {config_file}") if __name__ == "__main__": main() 分析此代码所需依赖

filetype

# E:\AI_System\web_ui\server.py import sys import os import time import logging import json import traceback import threading import platform import psutil import datetime from pathlib import Path from flask import Flask, jsonify, request, render_template from flask_socketio import SocketIO, emit from flask_limiter import Limiter from flask_limiter.util import get_remote_address from functools import wraps from concurrent.futures import ThreadPoolExecutor # ========== 配置系统 ========== class SystemConfig: def __init__(self): self.BASE_DIR = Path(__file__).resolve().parent.parent self.HOST = '0.0.0.0' # 改为0.0.0.0允许外部访问 self.PORT = 5000 self.LOG_LEVEL = 'DEBUG' # 调试时使用DEBUG级别 self.SECRET_KEY = os.getenv('SECRET_KEY', 'your_secret_key_here') self.DEBUG = True self.USE_GPU = False self.DEFAULT_MODEL = 'gpt-3.5-turbo' self.MAX_WORKERS = 4 # 线程池大小 # 目录配置 self.LOG_DIR = self.BASE_DIR / 'logs' self.LOG_DIR.mkdir(parents=True, exist_ok=True) self.CONFIG_DIR = self.BASE_DIR / 'config' self.CONFIG_DIR.mkdir(parents=True, exist_ok=True) self.AGENT_PATH = self.BASE_DIR / 'agent' self.MODEL_CACHE_DIR = self.BASE_DIR / 'model_cache' self.MODEL_CACHE_DIR.mkdir(parents=True, exist_ok=True) self.TEMPLATE_DIR = self.BASE_DIR / 'web_ui' / 'templates' # 显式指定模板目录 def __str__(self): return f"SystemConfig(HOST={self.HOST}, PORT={self.PORT})" config = SystemConfig() # ========== 全局协调器 ========== coordinator = None executor = ThreadPoolExecutor(max_workers=config.MAX_WORKERS) # 全局线程池 def register_coordinator(coord): """注册意识系统协调器""" global coordinator coordinator = coord if coordinator and hasattr(coordinator, 'connect_to_ui'): coordinator.connect_to_ui(update_ui) def update_ui(event): """更新UI事件处理""" if 'socketio' in globals(): socketio.emit('system_event', event) # ========== 初始化日志系统 ========== def setup_logger(): """配置全局日志系统""" logger = logging.getLogger('WebServer') logger.setLevel(getattr(logging, config.LOG_LEVEL.upper(), logging.DEBUG)) # 日志格式 log_formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # 文件日志处理器 (每天轮换,保留30天) file_handler = logging.handlers.TimedRotatingFileHandler( config.LOG_DIR / 'web_server.log', when='midnight', backupCount=30, encoding='utf-8' ) file_handler.setFormatter(log_formatter) logger.addHandler(file_handler) # 控制台日志处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(log_formatter) logger.addHandler(console_handler) return logger # 初始化日志 logger = setup_logger() # ========== 线程安全装饰器 ========== def synchronized(lock): """线程安全装饰器""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): with lock: return func(*args, **kwargs) return wrapper return decorator # ========== 系统初始化 ========== class SystemInitializer: """负责初始化系统核心组件""" def __init__(self): self.base_dir = Path(__file__).resolve().parent.parent self.ai_core = None self.hardware_manager = None self.life_scheduler = None self.ai_agent = None self.start_time = time.time() self.environment_manager = None self.life_lock = threading.Lock() # 生活系统线程锁 def initialize_system_paths(self): """初始化系统路径""" sys.path.insert(0, str(self.base_dir)) logger.info(f"项目根目录: {self.base_dir}") sub_dirs = ['agent', 'core', 'utils', 'config', 'cognitive_arch', 'environment'] for sub_dir in sub_dirs: full_path = self.base_dir / sub_dir if full_path.exists(): sys.path.insert(0, str(full_path)) logger.info(f"添加路径: {full_path}") else: logger.warning(f"目录不存在: {full_path} - 已跳过") def initialize_environment_manager(self): """初始化环境管理器""" try: # 环境管理器实现 class EnvironmentManager: def __init__(self, config): self.config = config self.state = { 'temperature': 22.5, 'humidity': 45.0, 'light_level': 75, 'objects': [], 'last_updated': datetime.datetime.now().isoformat() } self.healthy = True self.lock = threading.Lock() # 环境状态线程锁 @synchronized(threading.Lock()) def start(self): logger.info("环境管理器已启动") @synchronized(threading.Lock()) def get_state(self): # 更新模拟数据 self.state['temperature'] = round(20 + 5 * (time.time() % 10) / 10, 1) self.state['humidity'] = round(40 + 10 * (time.time() % 10) / 10, 1) self.state['light_level'] = round(70 + 10 * (time.time() % 10) / 10, 1) self.state['last_updated'] = datetime.datetime.now().isoformat() return self.state @synchronized(threading.Lock()) def execute_action(self, action, params): logger.info(f"执行环境动作: {action} 参数: {params}") if action == "adjust_temperature": self.state['temperature'] = params.get('value', 22.0) return True elif action == "adjust_light": self.state['light_level'] = params.get('level', 70) return True return False def is_healthy(self): return self.healthy env_config = {'update_interval': 1.0, 'spatial': {'grid_size': 1.0}} self.environment_manager = EnvironmentManager(env_config) self.environment_manager.start() logger.info("✅ 环境管理器初始化成功") return self.environment_manager except Exception as e: logger.error(f"❌ 环境管理器初始化失败: {str(e)}") logger.warning("⚠️ 环境交互功能将不可用") return None # 其他初始化方法保持不变,但添加线程安全装饰器... # initialize_ai_core, initialize_hardware_manager 等方法保持不变 @synchronized(lock=threading.Lock()) def initialize_life_scheduler(self): """初始化生活调度器(线程安全版本)""" # 实现保持不变... @synchronized(lock=threading.Lock()) def _update_life_status(self): """更新生活状态(线程安全版本)""" # 实现保持不变... @synchronized(lock=threading.Lock()) def initialize_ai_agent(self): """初始化AI智能体(线程安全版本)""" # 实现保持不变... def initialize_all(self): logger.info("=" * 50) logger.info("🚀 开始初始化AI系统") logger.info("=" * 50) self.initialize_system_paths() self.initialize_ai_core() self.initialize_hardware_manager() self.initialize_life_scheduler() self.initialize_ai_agent() self.initialize_environment_manager() self.start_evolution_monitor() logger.info("✅ 所有系统组件初始化完成") return { "ai_core": self.ai_core, "hardware_manager": self.hardware_manager, "life_scheduler": self.life_scheduler, "ai_agent": self.ai_agent, "environment_manager": self.environment_manager } # ========== Flask应用工厂 ========== def create_app(): app = Flask( __name__, template_folder=str(config.TEMPLATE_DIR), # 使用配置的模板目录 static_folder='static', static_url_path='/static' ) app.secret_key = config.SECRET_KEY # 初始化限流器 limiter = Limiter( get_remote_address, app=app, default_limits=["200 per day", "50 per hour"], storage_uri="memory://" ) system_initializer = SystemInitializer() components = system_initializer.initialize_all() app.config['SYSTEM_COMPONENTS'] = components app.config['START_TIME'] = system_initializer.start_time app.config['BASE_DIR'] = system_initializer.base_dir # 初始化SocketIO - 使用更高效的eventlet try: import eventlet eventlet.monkey_patch() async_mode = 'eventlet' except ImportError: async_mode = 'threading' logger.warning("eventlet未安装,使用threading模式,性能可能受影响") socketio = SocketIO( app, cors_allowed_origins="*", async_mode=async_mode, logger=logger, engineio_logger=logger ) app.config['SOCKETIO'] = socketio # 注册路由 register_routes(app) register_error_handlers(app) # 注册限流器到应用 app.config['LIMITER'] = limiter return app, socketio # ========== 环境交互路由 ========== def register_environment_routes(app): @app.route('/environment') def environment_view(): return render_template('environment_view.html') @app.route('/api/environment/state', methods=['GET']) @app.config['LIMITER'].limit("10 per minute") def get_environment_state(): env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager') if not env_manager: return jsonify({"success": False, "error": "环境管理器未初始化"}), 503 try: state = env_manager.get_state() return jsonify(state) except Exception as e: app.logger.error(f"获取环境状态失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/environment/action', methods=['POST']) @app.config['LIMITER'].limit("5 per minute") def execute_environment_action(): env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager') if not env_manager: return jsonify({"success": False, "error": "环境管理器未初始化"}), 503 try: data = request.json action = data.get('action') params = data.get('params', {}) if not action: return jsonify({"success": False, "error": "缺少动作参数"}), 400 success = env_manager.execute_action(action, params) return jsonify({"success": success, "action": action}) except Exception as e: app.logger.error(f"执行环境动作失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 500 # ========== 路由注册 ========== def register_routes(app): register_environment_routes(app) setup_environment_broadcast(app) # 健康检查路由 @app.route('/health') def health_check(): return jsonify({"status": "healthy", "timestamp": datetime.datetime.now().isoformat()}) # 系统状态路由 @app.route('/status') @app.config['LIMITER'].exempt # 状态检查不限流 def status(): # 实现保持不变... # 核心系统路由 @app.route('/api/core/state') @app.config['LIMITER'].limit("10 per minute") def get_core_state(): return jsonify(app.config['SYSTEM_COMPONENTS']['ai_core'].get_state()) # 生活系统路由 - 修复路由冲突 @app.route('/life/dashboard') # 修改路由避免冲突 def life_dashboard(): return render_template('life_dashboard.html') @app.route('/api/life/status') @app.config['LIMITER'].limit("10 per minute") def get_life_status(): # 实现保持不变... # 聊天路由 - 添加异步处理 @app.route('/chat', methods=['POST']) @app.config['LIMITER'].limit("30 per minute") def chat(): components = app.config['SYSTEM_COMPONENTS'] if not components['ai_agent']: return jsonify({"error": "Agent未初始化"}), 503 try: data = request.get_json() user_input = data.get('message', '') user_id = data.get('user_id', 'default') if not user_input: return jsonify({"error": "消息内容不能为空"}), 400 app.logger.info(f"聊天请求: 用户={user_id}, 内容长度={len(user_input)}") # 使用线程池异步处理 future = executor.submit(components['ai_agent'].process_input, user_input, user_id) response = future.result(timeout=10) # 10秒超时 return jsonify({"response": response}) except TimeoutError: return jsonify({"error": "处理超时"}), 504 except Exception as e: app.logger.error(f"聊天处理失败: {traceback.format_exc()}") return jsonify({"error": "聊天处理失败", "details": str(e)}), 500 # 添加404处理 @app.route('/<path:path>') def catch_all(path): return jsonify({"error": "路由不存在", "path": path}), 404 # ========== WebSocket处理 ========== def setup_websocket_handlers(socketio): @socketio.on('connect') def handle_connect(): logger.info('客户端已连接') socketio.emit('system_status', {'status': 'ready'}) @socketio.on('disconnect') def handle_disconnect(): logger.info('客户端已断开连接') @socketio.on('user_message') def handle_user_message(data): user_id = data.get('user_id', 'guest') message = data.get('message', '') logger.info(f"收到来自 {user_id} 的消息: {message}") # 使用线程池处理消息 def process_message(): try: # 如果有协调器,使用协调器处理 global coordinator if coordinator: return coordinator.process_message(message) else: return f"已收到您的消息: {message}" except Exception as e: logger.error(f"消息处理失败: {str(e)}") return "处理消息时出错" # 提交任务到线程池 future = executor.submit(process_message) try: response = future.result(timeout=10) # 10秒超时 socketio.emit('agent_response', { 'user_id': user_id, 'response': response }) except TimeoutError: socketio.emit('agent_response', { 'user_id': user_id, 'response': "处理超时,请重试" }) # ========== 生产环境启动器 ========== def run_production_server(app): """生产环境部署配置""" from waitress import serve # 添加生产环境日志轮转 handler = logging.handlers.RotatingFileHandler( config.LOG_DIR / 'production.log', maxBytes=10 * 1024 * 1024, # 10MB backupCount=5, encoding='utf-8' ) handler.setLevel(logging.INFO) handler.setFormatter(logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' )) app.logger.addHandler(handler) logger.info(f"🚀 生产服务器启动: http://{config.HOST}:{config.PORT}") logger.warning("⚠️ 当前运行在生产模式 (Waitress WSGI服务器)") serve(app, host=config.HOST, port=config.PORT, threads=8) # ========== 主程序入口 ========== if __name__ == '__main__': try: app, socketio = create_app() # 设置WebSocket处理器 setup_websocket_handlers(socketio) # 根据配置选择运行模式 if config.DEBUG: # 开发模式 logger.info(f"开发服务器运行在 http://{config.HOST}:{config.PORT}") socketio.run( app, host=config.HOST, port=config.PORT, debug=True, use_reloader=False ) else: # 生产模式 run_production_server(app) except KeyboardInterrupt: logger.info("服务器关闭") executor.shutdown(wait=False) except Exception as e: logger.critical(f"服务器启动失败: {str(e)}") logger.error(traceback.format_exc()) executor.shutdown(wait=False) sys.exit(1) 大哥 这是你刚刚写的啊 好坑啊 能修复好不?

filetype

/* Edge Impulse Arduino examples * Copyright (c) 2022 EdgeImpulse Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ // These sketches are tested with 2.0.4 ESP32 Arduino Core // https://github.com/espressif/arduino-esp32/releases/tag/2.0.4 // If your target is limited in memory remove this macro to save 10K RAM #define EIDSP_QUANTIZE_FILTERBANK 0 /* ** NOTE: If you run into TFLite arena allocation issue. ** ** This may be due to may dynamic memory fragmentation. ** Try defining "-DEI_CLASSIFIER_ALLOCATION_STATIC" in boards.local.txt (create ** if it doesn't exist) and copy this file to ** `<ARDUINO_CORE_INSTALL_PATH>/arduino/hardware/<mbed_core>/<core_version>/`. ** ** See ** (https://support.arduino.cc/hc/en-us/articles/360012076960-Where-are-the-installed-cores-located-) ** to find where Arduino installs cores on your machine. ** ** If the problem persists then there's not enough memory for this model and application. */ /* Includes ---------------------------------------------------------------- */ #include <Project-name_inferencing.h> #include <string.h> #include "freertos/FreeRTOS.h" #include "freertos/task.h" /** Audio buffers, pointers and selectors */ typedef struct { int16_t *buffer; uint8_t buf_ready; uint32_t buf_count; uint32_t n_samples; } inference_t; static inference_t inference; static signed short sampleBuffer[EI_CLASSIFIER_RAW_SAMPLE_COUNT]; static bool debug_nn = false; // Set this to true to see e.g. features generated from the raw signal static bool record_status = true; static TaskHandle_t xCaptureTaskHandle = NULL; const int led = 21; const int led_record = 8; const int AUDIO_IN = 0; // 选择 GPIO 0 作为模拟输入 /** * @brief Arduino setup function */ void setup() { // put your setup code here, to run once: Serial.begin(115200); // comment out the below line to cancel the wait for USB connection (needed for native USB) while (!Serial) ; Serial.println("Edge Impulse Inferencing"); // summary of inferencing settings (from model_metadata.h) ei_printf("Inferencing settings:\n"); ei_printf("\tInterval: "); ei_printf_float((float)EI_CLASSIFIER_INTERVAL_MS); ei_printf(" ms.\n"); ei_printf("\tFrame size: %d\n", EI_CLASSIFIER_DSP_INPUT_FRAME_SIZE); ei_printf("\tSample length: %d ms.\n", EI_CLASSIFIER_RAW_SAMPLE_COUNT / 16); ei_printf("\tNo. of classes: %d\n", sizeof(ei_classifier_inferencing_categories) / sizeof(ei_classifier_inferencing_categories[0])); ei_printf("\nStarting continious inference in 2 seconds...\n"); pinMode(led, OUTPUT); pinMode(led_record, OUTPUT); digitalWrite(led, 1); ei_sleep(2000); digitalWrite(led, 0); if (microphone_inference_start(EI_CLASSIFIER_RAW_SAMPLE_COUNT) == false) { ei_printf("ERR: Could not allocate audio buffer (size %d), this could be due to the window length of your model\r\n", EI_CLASSIFIER_RAW_SAMPLE_COUNT); return; } ei_printf("Recording...\n"); xTaskNotifyGive(xCaptureTaskHandle); } /** * @brief Arduino main function. Runs the inferencing loop. */ float max_probability = 0; int max_probability_ix = 0; void loop() { bool m = microphone_inference_record(); if (!m) { ei_printf("ERR: Failed to record audio...\n"); return; } signal_t signal; signal.total_length = EI_CLASSIFIER_RAW_SAMPLE_COUNT; signal.get_data = µphone_audio_signal_get_data; ei_impulse_result_t result = {0}; EI_IMPULSE_ERROR r = run_classifier(&signal, &result, debug_nn); if (r != EI_IMPULSE_OK) { ei_printf("ERR: Failed to run classifier (%d)\n", r); return; } // print the predictions ei_printf("Predictions "); ei_printf("(DSP: %d ms., Classification: %d ms., Anomaly: %d ms.)", result.timing.dsp, result.timing.classification, result.timing.anomaly); ei_printf(": \n"); max_probability = 0; max_probability_ix = 0; for (size_t ix = 0; ix < EI_CLASSIFIER_LABEL_COUNT; ix++) { ei_printf(" %s: ", result.classification[ix].label); ei_printf_float(result.classification[ix].value); ei_printf("\n"); if (max_probability <= result.classification[ix].value) { max_probability = result.classification[ix].value; max_probability_ix = ix; } } if (strcmp(result.classification[max_probability_ix].label, "beihang") == 0) { ei_printf("北京航空航天大学\n"); digitalWrite(led, LOW); } else if (strcmp(result.classification[max_probability_ix].label, "shie") == 0) { ei_printf("士谔书院\n"); digitalWrite(led, HIGH); } else if (strcmp(result.classification[max_probability_ix].label, "hyn") == 0) { ei_printf("何玥凝\n"); digitalWrite(led, 0); } else if (strcmp(result.classification[max_probability_ix].label, "hsy") == 0) { ei_printf("黄诗莹\n"); digitalWrite(led, 0); } else { ei_printf("未检测到有效指令\n"); digitalWrite(led, LOW); } #if EI_CLASSIFIER_HAS_ANOMALY == 1 ei_printf(" anomaly score: "); ei_printf_float(result.anomaly); ei_printf("\n"); #endif xTaskNotifyGive(xCaptureTaskHandle); } static void audio_inference_callback(uint32_t n_bytes) { ei_printf("DATA "); for (int i = 0; i < n_bytes; i++) { inference.buffer[inference.buf_count++] = sampleBuffer[i]; if (inference.buf_count >= inference.n_samples) { inference.buf_count = 0; inference.buf_ready = 1; } // if(i > 8000 ) { ei_printf("%d ", sampleBuffer[i]); } } ei_printf("\n"); } unsigned long pre_time = 0; const byte sample_interval = 125; //采样时间间隔125us,即采样率8kHz const byte scale = 4; //幅度缩放因子 const int thresh = 400; int adcValue = 0; bool ADCfast(void) { static int zero = 580; int curSample = 0; static unsigned long prevTime; long val = 0; bool Collecting = false; unsigned long triggerTimeout = millis(); // 超时计时 while(!Collecting && (millis() - triggerTimeout < 2000)){ prevTime = micros(); adcValue = analogRead(AUDIO_IN); // 读取模拟信号 val = adcValue / scale; // 将ADC采样值进行尺度缩放 // 去除直流偏置值 if (val < zero) zero--; else zero++; val = val - zero; if (abs(val) > thresh){ Collecting = true; break; } while (micros() - prevTime < sample_interval); } for (curSample = 0; curSample < EI_CLASSIFIER_RAW_SAMPLE_COUNT; curSample++) { prevTime = micros(); adcValue = analogRead(AUDIO_IN); // 读取模拟信号 val = adcValue / scale; // 将ADC采样值进行尺度缩放 // 去除直流偏置值 if (val < zero) zero--; else zero++; val = val - zero; sampleBuffer[curSample] = val; while (micros() - prevTime < sample_interval); } return Collecting; } static void capture_samples(void *arg) { const int32_t bytes_to_read = (uint32_t)arg; size_t bytes_read = bytes_to_read; const int32_t threshold = 625; int threshold_cnt; int start_point; while (record_status) { ulTaskNotifyTake(pdTRUE, pdMS_TO_TICKS(5000)); ei_printf("----------------请说语音指令----------------\n"); digitalWrite(led_record, LOW); ADCfast(); digitalWrite(led_record, HIGH); if (record_status) { audio_inference_callback(EI_CLASSIFIER_RAW_SAMPLE_COUNT); } else { break; } vTaskDelay(pdMS_TO_TICKS(2000)); } vTaskDelete(NULL); } /** * @brief Init inferencing struct and setup/start PDM * @param [in] n_samples The n samples * @return { description_of_the_return_value } */ static bool microphone_inference_start(uint32_t n_samples) { inference.buffer = (int16_t *)malloc(n_samples * sizeof(int16_t)); if (inference.buffer == NULL) { return false; } inference.buf_count = 0; inference.n_samples = n_samples; inference.buf_ready = 0; ei_sleep(100); record_status = true; xTaskCreate(capture_samples, "CaptureSamples", 1024 * 32, (void *)(EI_CLASSIFIER_RAW_SAMPLE_COUNT * sizeof(sampleBuffer[0])), 10, &xCaptureTaskHandle); return true; } static bool microphone_inference_record(void) { bool ret = true; while (inference.buf_ready == 0) { delay(10); } inference.buf_ready = 0; return ret; } static int microphone_audio_signal_get_data(size_t offset, size_t length, float *out_ptr) { numpy::int16_to_float(&inference.buffer[offset], out_ptr, length); return 0; } static void microphone_inference_end(void) { ei_free(inference.buffer); } #if !defined(EI_CLASSIFIER_SENSOR) || EI_CLASSIFIER_SENSOR != EI_CLASSIFIER_SENSOR_MICROPHONE #error "Invalid model for current sensor." #endif 这个代码,我需要通过麦克风读入外界音频进行识别,但是实际上对于大多是人声,这个代码在串口监视器里面反馈的都是label何玥凝,其他标签反馈不出来,现在已知是模型精确度很高,达到97%

pangchenghe
  • 粉丝: 40
上传资源 快速赚钱