Explain these two lines in detail:

```cpp
3555   bt_status_t status = do_in_main_thread(
3556       FROM_HERE, base::BindOnce(connection_task, peer_address, uuid));
```

<think>I need to explain these two lines from the snippet in detail: `3555 bt_status_t status = do_in_main_thread( 3556 FROM_HERE, base::BindOnce(connection_task, peer_address, uuid));`

First, understand the question. The user is probably studying the Android Bluetooth stack, or ran into this code during development and wants to know exactly what it does and how it works. They already asked about the surrounding code block earlier and are now focusing on these two lines, so a deeper explanation is needed.

Next, recall the relevant background. `do_in_main_thread` is the function the Android Bluetooth stack typically uses for cross-thread calls, guaranteeing that a task runs on the main thread. `base::BindOnce` comes from the Chromium/Google base library and binds arguments to produce a callback task. `FROM_HERE` is most likely a macro that records the call site to make debugging easier.

Break the two lines down step by step:

1. What `base::BindOnce` does: it binds `connection_task` to the arguments `peer_address` and `uuid`, producing a callable OnceCallback object. After BindOnce, ownership of the bound arguments is transferred, and the resulting task will execute exactly once.

2. What `do_in_main_thread` does: it posts the generated task to the main thread's message queue so that it runs on the main thread. This is required for operations that must touch main-thread resources (UI work, or certain internal Bluetooth stack state) and avoids multi-thread races.

3. The `FROM_HERE` macro: it records the call location, so logs can show which file and line posted the task when something needs to be traced.

4. `bt_status_t status`: it receives the return value of `do_in_main_thread`, which indicates whether the task was successfully posted to the main thread. The caller should check `status` to decide what to do next, such as retrying or reporting an error.

Then consider the questions the user is likely to have:
- Why use `do_in_main_thread` at all? It is a thread-safety matter; some Bluetooth stack operations must run on the main thread.
- How does `base::BindOnce` differ from a plain function call? BindOnce defers execution and packages the arguments, which makes it easy to hand a task to another thread.
- What is `FROM_HERE` for? The user may not know this macro, so its debugging and tracing role needs explaining.
- Which values can `status` take? For example `BT_STATUS_SUCCESS`, `BT_STATUS_BUSY`, and so on, each meaning a different outcome.

Then verify that this understanding is technically correct: confirm that `do_in_main_thread` really is part of the Android Bluetooth module, that `base::BindOnce` is used correctly, and why the thread hop is necessary.

Organize the explanation so it is clear and well structured, covering each part point by point with concrete scenarios, such as why main-thread execution is required or how BindOnce avoids leaks.

Finally, check for anything important that was missed, such as the importance of thread safety or other similar usages in the Bluetooth stack, to help the user generalize from this example.</think>

These two lines are a typical piece of **cross-thread task scheduling** in the Android Bluetooth stack. Two key operations are involved:

```cpp
3555   bt_status_t status = do_in_main_thread(
3556       FROM_HERE, base::BindOnce(connection_task, peer_address, uuid));
```

### Layer-by-layer breakdown

#### 1. What `base::BindOnce` does
- **Purpose**: creates a closure that executes exactly once, binding `connection_task` to its arguments
- **How the binding works**:

  ```cpp
  // Original lambda signature
  auto connection_task = [](RawAddress*, uint16_t) { ... };

  // Conceptual equivalent of the object produced by BindOnce
  struct BoundTask {
    RawAddress* captured_peer_address;
    uint16_t captured_uuid;
    void Run() { connection_task(captured_peer_address, captured_uuid); }
  };
  ```

#### 2. Thread scheduling in `do_in_main_thread`
- **Flow**:

  ```mermaid
  sequenceDiagram
      participant C as Calling thread
      participant Q as Main-thread message queue
      participant M as Main thread
      C->>Q: PostTask(BoundTask)
      Q->>M: dequeue task
      M->>BoundTask: Run()
  ```

- **Typical scenarios**:

  | Operation | Why it must run on the main thread |
  |---|---|
  | Bluetooth stack API calls | internal state machines are not thread-safe |
  | Sending HCI commands | hardware access must be serialized |
  | GATT database access | prevents multi-thread races |

#### 3. The `FROM_HERE` macro
- **Debugging support** (conceptually, it expands to something like):

  ```cpp
  #define FROM_HERE \
    ::base::Location::Create(__FILE__, __LINE__, __func__)
  ```

- **How it shows up in logs**:

  ```
  W/BtAv ( 3254): [bt_av.cc:3555] Task posted from main_thread.cc:42
  ```

#### 4. Typical values of the `bt_status_t` return code
```cpp
typedef enum {
  BT_STATUS_SUCCESS = 0,
  BT_STATUS_FAIL,       // generic failure
  BT_STATUS_NOT_READY,  // stack not initialized
  BT_STATUS_NOMEM,      // out of memory
  BT_STATUS_BUSY,       // queue full
  BT_STATUS_DONE        // already completed (when executed synchronously)
} bt_status_t;
```
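For illustration, here is a minimal hedged sketch of how a caller might branch on that return value. The branch policy and log messages below are assumptions made for the example, not taken from the AOSP source:

```cpp
// Hypothetical caller-side handling of the post result (illustrative only).
bt_status_t status = do_in_main_thread(
    FROM_HERE, base::BindOnce(connection_task, peer_address, uuid));

switch (status) {
  case BT_STATUS_SUCCESS:
    // Task is queued; connection_task will run later on the main thread.
    break;
  case BT_STATUS_BUSY:
    // Assumed policy: main-thread queue is saturated, schedule a retry.
    log::warn("main thread busy, retrying connection later");
    break;
  case BT_STATUS_NOT_READY:
    // Assumed policy: stack not initialized, report the failure upward.
    log::error("stack not ready, dropping connection request");
    break;
  default:
    log::error("failed to post connection task");
    break;
}
```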
### Design rationale in depth

#### Thread-safety implementation
```cpp
// Pseudocode showing the thread check
void do_in_main_thread(Location from, base::OnceClosure task) {
  if (pthread_equal(pthread_self(), main_thread_id)) {
    // Already on the main thread: run the task directly
    std::move(task).Run();
  } else {
    // Post across threads
    message_loop_.PostTask(from, std::move(task));
  }
}
```

#### Argument lifetime management
- **Handling the raw pointer**:

  ```cpp
  // Containing the risk of passing a raw pointer
  connection_task = [](RawAddress* addr, ...) {
    // Assumes peer_address is heap-allocated and its lifetime is managed elsewhere.
    // Real code may use a smart pointer instead:
    // std::unique_ptr<RawAddress> managed_addr(addr);
  };
  ```

#### Performance considerations
- **Task queue priorities** (illustrative):

  ```cpp
  enum TaskPriority {
    HIGHEST,  // connect/disconnect and other critical operations
    HIGH,     // data transfer
    NORMAL    // status queries
  };
  message_loop_.PostTaskWithPriority(FROM_HERE, std::move(task), HIGHEST);
  ```

### Typical troubleshooting

#### Scenario: the task never runs
1. Check the `status` return value:
   - `BT_STATUS_BUSY`: inspect the message queue capacity
   - `BT_STATUS_NOT_READY`: confirm the stack has been initialized
2. Use a debug macro:

   ```cpp
   #define LOG_TASK_POST() \
     ALOGI("Task posted from %s", FROM_HERE.ToString().c_str())
   ```

3. Thread-deadlock detection:

   ```cpp
   void CheckDeadlock() {
     if (message_loop_.GetPendingTaskCount() > 100) {
       ALOGW("Possible deadlock detected");
     }
   }
   ```

These two lines reflect three core design principles of the Android Bluetooth stack:

1. **Thread-safety boundary**: forcing critical operations onto the main thread keeps them effectively atomic
2. **Efficient resource management**: `base::BindOnce` avoids unnecessary copies of the task and its arguments
3. **Traceability**: `FROM_HERE` gives every posted task a complete call-site trace
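To make the "executes exactly once" semantics behind point 2 concrete, here is a small standalone sketch of Chromium-style `base::OnceCallback` usage. The headers match those used by the stack, but the free function and the `demo` wrapper are hypothetical and not code from the file under discussion:

```cpp
#include <base/functional/bind.h>
#include <base/functional/callback.h>

#include "types/raw_address.h"  // RawAddress, as used by the Bluetooth stack

// Free function standing in for connection_task in this illustration.
void connection_task(RawAddress* peer_address, uint16_t uuid);

void demo(RawAddress* peer_address, uint16_t uuid) {
  // BindOnce produces a move-only OnceCallback; the bound arguments are
  // stored inside the callback object until it runs.
  base::OnceClosure task =
      base::BindOnce(&connection_task, peer_address, uuid);

  // Running the callback consumes it: after this call `task` is null,
  // which is why a bound task can execute at most once.
  std::move(task).Run();
}
```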

'r') as f: return json.load(f) except: pass # 创建新状态 status = { "os": platform.system(), "os_version": platform.version(), "cpu": platform.processor(), "cpu_cores": psutil.cpu_count(logical=False), "memory_total": round(psutil.virtual_memory().total / (1024 ** 3), 1), "memory_used": round(psutil.virtual_memory().used / (1024 ** 3), 1), "disk_total": round(psutil.disk_usage('/').total / (1024 ** 3), 1), "disk_used": round(psutil.disk_usage('/').used / (1024 ** 3), 1), "timestamp": time.time() } # 保存状态 try: with open(self.status_file, 'w') as f: json.dump(status, f) except: pass return status return FallbackEnvironment(base_dir) def _log_environment_status(self): """记录环境状态信息""" try: env_status = self.environment.get_system_info() or {} self.logger.info( f"📊 系统状态: OS={env_status.get('os', '未知')} {env_status.get('os_version', '')}, " f"CPU={env_status.get('cpu', '未知')} ({env_status.get('cpu_cores', 0)}核), " f"内存={env_status.get('memory_used', 0)}/{env_status.get('memory_total', 0)}GB, " f"磁盘={env_status.get('disk_used', 0)}/{env_status.get('disk_total', 0)}GB" ) except Exception as e: self.logger.error(f"环境状态获取失败: {str(e)}") self.metrics.record_error('environment_status') def _initialize_subsystems(self): """初始化所有子系统 - 使用动态导入并添加详细错误处理""" # 定义子系统初始化顺序 - 使用更简单的回退实现 subsystems = [ ('健康系统', self._create_fallback_health_system, {}), ('模型管理器', self._create_fallback_model_manager, {}), ('记忆系统', self._create_fallback_memory_system, {}), ('情感系统', self._create_fallback_affective_system, {}), ('认知架构', self._create_fallback_cognitive_architecture, {}), ('通信系统', self._create_fallback_communication_system, {}) ] # 注册子系统依赖关系 self.dependency_manager.register_dependency('通信系统', ['认知架构']) self.dependency_manager.register_dependency('情感系统', ['健康系统', '记忆系统']) self.dependency_manager.register_dependency('认知架构', ['记忆系统']) # 初始化子系统 for name, creator_func, kwargs in subsystems: try: # 检查依赖是否满足 missing_deps = self.dependency_manager.check_dependencies(name) if missing_deps: self.logger.warning(f"⚠️ 子系统 {name} 缺少依赖: {', '.join(missing_deps)}") # 尝试自动安装缺失依赖 self.dependency_manager.install_missing_dependencies(missing_deps) # 创建实例 instance = creator_func(**kwargs) setattr(self, name.lower().replace(' ', '_'), instance) self.logger.info(f"✅ {name}初始化完成") # 标记子系统为活跃状态 with self._status_lock: self.subsystem_status[name] = { 'active': True, 'error_count': 0, 'last_active': time.time(), 'last_recovery_attempt': 0 } except Exception as e: self.logger.error(f"❌ {name}初始化失败: {str(e)}") with self._status_lock: self.subsystem_status[name] = { 'active': False, 'error': str(e), 'error_count': 1, 'last_error': time.time() } # 记录指标 self.metrics.record_error(f'subsystem_init_{name.lower()}') # 保存子系统状态 self._save_subsystem_status() # 各子系统回退实现保持不变... def process_input(self, user_input: str, user_id: str = "default") -> Dict[str, Any]: """处理用户输入(通过通信系统)""" # 检查通信系统是否活跃 with self._status_lock: comm_status = self.subsystem_status.get('通信系统', {}) active = comm_status.get('active', False) if not active: self.logger.error("通信系统未激活,使用回退处理") self.metrics.record_error('communication_system_inactive') return {"response": "系统正在维护中,请稍后再试"} try: # 使用性能监控 with PerformanceMetrics() as pm: # 使用线程池异步处理 future = executor.submit( self.communication_system.process_input, user_input, user_id ) response = future.result(timeout=10) # 10秒超时 # 记录性能指标 self.metrics.record_latency('process_input', pm.duration) self.metrics.record_success('process_input') self.logger.info(f"📥 处理输入: '{user_input[:30]}...' 
→ 耗时: {pm.duration:.2f}秒") return response except TimeoutError: self.logger.warning("处理输入超时") self.metrics.record_timeout('process_input') return {"error": "处理超时,请重试"} except Exception as e: # 更新错误计数 with self._status_lock: comm_status = self.subsystem_status.get('通信系统', {}) comm_status['error_count'] = comm_status.get('error_count', 0) + 1 comm_status['last_error'] = time.time() # 检查熔断条件 if comm_status['error_count'] >= 5: # 临时阈值 comm_status['active'] = False self.logger.critical(f"🚨 通信系统因连续错误被熔断!") self.metrics.record_event('circuit_breaker', '通信系统') self.logger.error(f"处理输入失败: {str(e)}") self.metrics.record_error('process_input') return {"error": "处理失败,请稍后再试"} def run_periodic_tasks(self): """运行周期性任务""" task_start = time.time() tasks_executed = 0 tasks_failed = 0 # 定义任务列表 tasks = [ ('健康系统更新', lambda: self.health_system.update()), ('情感系统更新', lambda: self.affective_system.grow()), ('记忆系统维护', lambda: self.memory_system.consolidate_memories()), ('环境监控', self._monitor_environment), ('子系统心跳检查', self._check_subsystem_heartbeats), ('子系统恢复', self._recover_failed_subsystems) ] # 执行任务 for name, task_func in tasks: try: if name == '环境监控' or self._is_subsystem_active(name.split()[0]): task_func() tasks_executed += 1 except Exception as e: tasks_failed += 1 subsystem_name = name.split()[0] self.logger.error(f"{name}失败: {str(e)}", exc_info=True) self._handle_subsystem_error(subsystem_name, e) self.metrics.record_error(f'periodic_{subsystem_name.lower()}') # 记录任务执行情况 if tasks_executed > 0: task_time = time.time() - task_start self.logger.debug(f"⏱️ 执行 {tasks_executed} 项周期性任务 ({tasks_failed}失败), 耗时: {task_time:.3f}秒") self.metrics.record_latency('periodic_tasks', task_time) self.metrics.record_value('periodic_tasks_count', tasks_executed) self.metrics.record_value('periodic_tasks_failed', tasks_failed) def _is_subsystem_active(self, name: str) -> bool: """检查子系统是否活跃""" with self._status_lock: status = self.subsystem_status.get(name, {}) return status.get('active', False) def _handle_subsystem_error(self, name: str, error: Exception): """处理子系统错误""" with self._status_lock: status = self.subsystem_status.get(name, {}) status['error_count'] = status.get('error_count', 0) + 1 status['last_error'] = time.time() # 检查熔断条件 if status['error_count'] >= 5: # 临时阈值 status['active'] = False self.logger.critical(f"🚨 子系统 {name} 因连续错误被熔断!") self.metrics.record_event('circuit_breaker', name) def _check_subsystem_heartbeats(self): """检查子系统心跳""" for name in list(self.subsystem_status.keys()): with self._status_lock: status = self.subsystem_status.get(name, {}) if not status.get('active', False): continue # 跳过已熔断的 subsystem = getattr(self, name.lower().replace(' ', '_'), None) if subsystem and hasattr(subsystem, 'check_heartbeat'): try: if not subsystem.check_heartbeat(): self.logger.warning(f"⚠️ 子系统 {name} 心跳检测失败") self._handle_subsystem_error(name, RuntimeError("心跳检测失败")) else: # 更新最后活跃时间 with self._status_lock: status['last_active'] = time.time() except Exception as e: self.logger.error(f"子系统 {name} 心跳检查异常: {str(e)}") self._handle_subsystem_error(name, e) self.metrics.record_error(f'heartbeat_{name.lower()}') def _recover_failed_subsystems(self): """尝试恢复失败的子系统""" for name in list(self.subsystem_status.keys()): with self._status_lock: status = self.subsystem_status.get(name, {}) if status.get('active', False): continue # 跳过活跃的 # 检查恢复条件:错误后至少等待5分钟 last_error = status.get('last_error', 0) if time.time() - last_error < 300: continue # 检查上次恢复尝试时间 last_attempt = status.get('last_recovery_attempt', 0) if time.time() - 
last_attempt < 600: # 每10分钟尝试一次 continue self.logger.info(f"🔄 尝试恢复子系统: {name}") status['last_recovery_attempt'] = time.time() try: # 尝试重新初始化子系统 # 这里需要根据子系统名称调用相应的初始化方法 # 简化实现:直接重置状态 subsystem = self._reinitialize_subsystem(name) setattr(self, name.lower().replace(' ', '_'), subsystem) with self._status_lock: status['active'] = True status['error_count'] = 0 status['last_error'] = 0 self.logger.info(f"✅ 子系统 {name} 恢复成功") self.metrics.record_event('subsystem_recovered', name) except Exception as e: with self._status_lock: status['active'] = False status['error_count'] += 1 status['last_error'] = time.time() self.logger.error(f"子系统 {name} 恢复失败: {str(e)}") self.metrics.record_error(f'recovery_{name.lower()}') def _reinitialize_subsystem(self, name: str) -> Any: """重新初始化子系统""" # 根据名称选择初始化方法 creators = { '健康系统': self._create_fallback_health_system, '模型管理器': self._create_fallback_model_manager, '记忆系统': self._create_fallback_memory_system, '情感系统': self._create_fallback_affective_system, '认知架构': self._create_fallback_cognitive_architecture, '通信系统': self._create_fallback_communication_system } if name in creators: return creators[name]() else: raise SubsystemFailure(f"未知子系统: {name}") def _monitor_environment(self): """监控环境状态""" try: self.logger.info("🔍 开始环境监控...") env_status = self.environment.get_system_info() or {} # 获取CPU和内存使用情况 env_status['cpu_usage'] = psutil.cpu_percent() env_status['memory_usage'] = psutil.virtual_memory().percent env_status['disk_usage'] = psutil.disk_usage('/').percent # 记录到日志 self.logger.info( f"📊 环境监控: CPU={env_status['cpu_usage']}%, " f"内存={env_status['memory_usage']}%, " f"磁盘={env_status['disk_usage']}%" ) # 记录到健康系统 if hasattr(self, 'health_system'): self.health_system.record_environment_status(env_status) # 记录指标 self.metrics.record_value('cpu_usage', env_status['cpu_usage']) self.metrics.record_value('memory_usage', env_status['memory_usage']) self.metrics.record_value('disk_usage', env_status['disk_usage']) except Exception as e: self.logger.error(f"环境监控失败: {str(e)}", exc_info=True) self.metrics.record_error('environment_monitoring') def _save_subsystem_status(self): """保存子系统状态到文件""" status_file = Path(system_config.CONFIG_DIR) / 'subsystem_status.json' try: with self._status_lock: data = { 'timestamp': time.time(), 'status': self.subsystem_status } with open(status_file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: self.logger.error(f"保存子系统状态失败: {str(e)}") def _load_subsystem_status(self): """从文件加载子系统状态""" status_file = Path(system_config.CONFIG_DIR) / 'subsystem_status.json' if status_file.exists(): try: with open(status_file, 'r') as f: data = json.load(f) # 只加载24小时内的状态 if time.time() - data.get('timestamp', 0) < 86400: with self._status_lock: self.subsystem_status = data.get('status', {}) self.logger.info("加载子系统状态缓存") except Exception as e: self.logger.error(f"加载子系统状态失败: {str(e)}") def get_status(self) -> Dict[str, Any]: """获取智能体状态报告""" with self._status_lock: status_data = { "uptime": time.time() - self._initialization_time, "subsystems": { name: info.get('active', False) for name, info in self.subsystem_status.items() }, "circuit_breaker": { name: { "active": info.get('active', False), "error_count": info.get('error_count', 0), "last_error": info.get('last_error', 0) } for name, info in self.subsystem_status.items() }, "metrics": self.metrics.get_metrics(), "environment": self.environment.get_system_info() if hasattr(self, 'environment') else {} } # 添加子系统状态 for name in ['健康系统', '情感系统', '记忆系统', '模型管理器', '认知架构', '通信系统']: attr_name = 
name.lower().replace(' ', '_') if hasattr(self, attr_name) and hasattr(getattr(self, attr_name), 'get_status'): status_data[name] = getattr(self, attr_name).get_status() return status_data def shutdown(self): """关闭智能体""" self.logger.info("🛑 正在关闭智能体...") self._running = False # 停止线程池 executor.shutdown(wait=False) # 保存状态 self._save_subsystem_status() # 等待后台线程 if self._background_thread and self._background_thread.is_alive(): self._background_thread.join(timeout=5.0) if self._background_thread.is_alive(): self.logger.warning("后台线程未正常退出") self.logger.info("✅ 智能体已关闭")

你看看怎么修改?主要是哪个模块没启动、哪里有问题?我需要知道,你理解吗?
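从你贴的 autonomous_agent.py 来看(以下只是根据这段代码推测,不一定是你环境里的真实原因):文件顶部同时写了相对导入 `from ..core.config import system_config` 和绝对导入 `from core.config import system_config`,如果 agent 不是作为包被导入(比如直接运行这个文件),相对导入会先抛 ImportError,整个智能体初始化根本不会开始;至于"哪个模块没启动",可以直接读 `get_status()` 返回值里的 `subsystems` 和 `circuit_breaker` 字段。下面是一个最小排查示例(其中的路径、包结构都是假设):

```python
# 仅为排查示例:假设项目根目录是 E:\AI_System,且 agent/ 目录下有 __init__.py(均为推测)。
# 思路:先看导入是否成功,再读 get_status(),找出 active=False 的子系统。
import sys

sys.path.insert(0, r"E:\AI_System")  # 假设的项目根目录
# 若下面这行就抛 ImportError / 相对导入错误,说明问题出在导入阶段,而不是某个子系统
from agent.autonomous_agent import AutonomousAgent

agent = AutonomousAgent()
status = agent.get_status()

for name, active in status["subsystems"].items():
    if not active:
        info = status["circuit_breaker"].get(name, {})
        print(f"未启动: {name}, 错误次数={info.get('error_count')}, "
              f"最后错误时间={info.get('last_error')}")

agent.shutdown()
```

哪个子系统 active=False、error_count 不为 0,基本就是没启动或者已经被熔断的那个,再结合 autonomous_agent.log 里对应的"初始化失败"日志定位即可。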

没看懂 是改这个文件吗:“# E:\AI_System\web_ui\server.py (完整修复版) import sys import os import time import logging import json import traceback import threading import platform import psutil import datetime import subprocess from pathlib import Path from functools import wraps from concurrent.futures import ThreadPoolExecutor import logging.handlers # ========== 关键修复1: 最先执行eventlet猴子补丁 ========== try: import eventlet eventlet.monkey_patch() # 必须在所有导入之前执行 print("✅ Eventlet monkey patch applied at startup") except ImportError: print("⚠️ Eventlet not installed, using threading mode") pass # 修复1:更新依赖包列表 REQUIRED_PACKAGES = [ 'flask', 'flask_socketio', 'flask_limiter', 'psutil', 'eventlet', 'waitress' ] def check_dependencies(): """增强依赖检查功能""" missing = [] for package in REQUIRED_PACKAGES: try: __import__(package) except ImportError: missing.append(package) if missing: print(f"❌ 缺少必要的依赖包: {', '.join(missing)}") print("请运行以下命令安装依赖:") print(f"pip install {' '.join(missing)}") sys.exit(1) if __name__ == '__main__': check_dependencies() # 在启动前检查依赖 # 现在导入其他模块 from flask import Flask, jsonify, request, render_template from flask_socketio import SocketIO, emit from flask_limiter import Limiter from flask_limiter.util import get_remote_address # ========== 配置系统 ========== class SystemConfig: def __init__(self): self.BASE_DIR = Path(__file__).resolve().parent.parent self.HOST = '0.0.0.0' self.PORT = 5000 self.LOG_LEVEL = 'DEBUG' self.SECRET_KEY = os.getenv('SECRET_KEY', 'your_secret_key_here') self.DEBUG = True self.USE_GPU = False self.DEFAULT_MODEL = 'gpt-3.5-turbo' self.MAX_WORKERS = 4 # 目录配置 self.LOG_DIR = self.BASE_DIR / 'logs' self.LOG_DIR.mkdir(parents=True, exist_ok=True) self.CONFIG_DIR = self.BASE_DIR / 'config' self.CONFIG_DIR.mkdir(parents=True, exist_ok=True) self.AGENT_PATH = self.BASE_DIR / 'agent' self.MODEL_CACHE_DIR = self.BASE_DIR / 'model_cache' self.MODEL_CACHE_DIR.mkdir(parents=True, exist_ok=True) self.TEMPLATE_DIR = self.BASE_DIR / 'web_ui' / 'templates' def __str__(self): return f"SystemConfig(HOST={self.HOST}, PORT={self.PORT})" config = SystemConfig() # ========== 全局协调器 ========== coordinator = None executor = ThreadPoolExecutor(max_workers=config.MAX_WORKERS) def register_coordinator(coord): global coordinator coordinator = coord if coordinator and hasattr(coordinator, 'connect_to_ui'): coordinator.connect_to_ui(update_ui) def update_ui(event): if 'socketio' in globals(): socketio.emit('system_event', event) # ========== 线程安全装饰器 ========== def synchronized(lock): def decorator(func): @wraps(func) def wrapper(*args, **kwargs): with lock: return func(*args, **kwargs) return wrapper return decorator # ========== 日志系统 ========== def setup_logger(): """优化日志配置""" logger = logging.getLogger('WebServer') logger.setLevel(getattr(logging, config.LOG_LEVEL.upper(), logging.DEBUG)) # 清除所有现有处理器 for handler in logger.handlers[:]: logger.removeHandler(handler) # 日志格式 log_formatter = logging.Formatter( '%(asctime)s [%(levelname)s] %(name)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # 文件日志处理器 (每天轮换,保留30天) file_handler = logging.handlers.TimedRotatingFileHandler( config.LOG_DIR / 'web_server.log', when='midnight', backupCount=30, encoding='utf-8' ) file_handler.setFormatter(log_formatter) logger.addHandler(file_handler) # 控制台日志处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(log_formatter) logger.addHandler(console_handler) # 设置Flask和SocketIO日志 flask_logger = logging.getLogger('werkzeug') flask_logger.setLevel(logging.WARNING) socketio_logger = logging.getLogger('engineio') 
socketio_logger.setLevel(logging.WARNING) return logger logger = setup_logger() # ========== 环境管理器 ========== class EnvironmentManager: """独立的环境管理器类""" def __init__(self, config): self.config = config self.state = { 'temperature': 22.5, 'humidity': 45.0, 'light_level': 75, 'objects': [], 'last_updated': datetime.datetime.now().isoformat() } self.healthy = True self.lock = threading.Lock() @synchronized(threading.Lock()) def start(self): logger.info("环境管理器已启动") @synchronized(threading.Lock()) def get_state(self): # 更新模拟数据 self.state['temperature'] = round(20 + 5 * (time.time() % 10) / 10, 1) self.state['humidity'] = round(40 + 10 * (time.time() % 10) / 10, 1) self.state['light_level'] = round(70 + 10 * (time.time() % 10) / 10, 1) self.state['last_updated'] = datetime.datetime.now().isoformat() return self.state @synchronized(threading.Lock()) def execute_action(self, action, params): logger.info(f"执行环境动作: {action} 参数: {params}") if action == "adjust_temperature": self.state['temperature'] = params.get('value', 22.0) return True elif action == "adjust_light": self.state['light_level'] = params.get('level', 70) return True return False def is_healthy(self): return self.healthy # ========== 系统初始化 ========== class SystemInitializer: def __init__(self): self.base_dir = Path(__file__).resolve().parent.parent self.ai_core = None self.hardware_manager = None self.life_scheduler = None self.ai_agent = None self.start_time = time.time() self.environment_manager = None self.life_lock = threading.Lock() def initialize_system_paths(self): sys.path.insert(0, str(self.base_dir)) logger.info(f"项目根目录: {self.base_dir}") sub_dirs = ['agent', 'core', 'utils', 'config', 'cognitive_arch', 'environment'] for sub_dir in sub_dirs: full_path = self.base_dir / sub_dir if full_path.exists(): sys.path.insert(0, str(full_path)) logger.info(f"添加路径: {full_path}") else: logger.warning(f"目录不存在: {full_path} - 已跳过") def initialize_environment_manager(self): try: env_config = {'update_interval': 1.0, 'spatial': {'grid_size': 1.0}} self.environment_manager = EnvironmentManager(env_config) self.environment_manager.start() logger.info("✅ 环境管理器初始化成功") return self.environment_manager except Exception as e: logger.error(f"❌ 环境管理器初始化失败: {str(e)}") logger.warning("⚠️ 环境交互功能将不可用") return None def initialize_ai_core(self): logger.info("✅ 模拟AI核心初始化") self.ai_core = type('AICore', (), { 'status': 'running', 'get_state': lambda: {"status": "running", "model": "gpt-3.5-turbo"} })() def initialize_hardware_manager(self): logger.info("✅ 模拟硬件管理器初始化") self.hardware_manager = type('HardwareManager', (), { 'get_status': lambda: { "cpu_usage": psutil.cpu_percent(), "memory_usage": psutil.virtual_memory().percent, "gpu_usage": 0 } })() @synchronized(lock=threading.Lock()) def initialize_life_scheduler(self): logger.info("✅ 模拟生活调度器初始化") self.life_scheduler = type('LifeScheduler', (), { 'get_status': lambda: { "current_activity": "thinking", "next_activity": "learning", "energy": 85 } })() @synchronized(lock=threading.Lock()) def initialize_ai_agent(self): logger.info("✅ 模拟AI智能体初始化") self.ai_agent = type('AIAgent', (), { 'process_input': lambda self, input, user_id: f"你好{user_id},我收到了你的消息: '{input}'" })() def start_evolution_monitor(self): logger.info("✅ 模拟进化监视器启动") def initialize_all(self): logger.info("=" * 50) logger.info("🚀 开始初始化AI系统") logger.info("=" * 50) self.initialize_system_paths() self.initialize_ai_core() self.initialize_hardware_manager() self.initialize_life_scheduler() self.initialize_ai_agent() self.initialize_environment_manager() 
self.start_evolution_monitor() logger.info("✅ 所有系统组件初始化完成") return { "ai_core": self.ai_core, "hardware_manager": self.hardware_manager, "life_scheduler": self.life_scheduler, "ai_agent": self.ai_agent, "environment_manager": self.environment_manager } # ========== 环境交互路由 ========== def register_environment_routes(app): @app.route('/environment') def environment_view(): return render_template('environment_view.html') @app.route('/api/environment/state', methods=['GET']) @app.config['LIMITER'].limit("10 per minute") def get_environment_state(): env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager') if not env_manager: return jsonify({"success": False, "error": "环境管理器未初始化"}), 503 try: state = env_manager.get_state() return jsonify(state) except Exception as e: app.logger.error(f"获取环境状态失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/environment/action', methods=['POST']) @app.config['LIMITER'].limit("5 per minute") def execute_environment_action(): env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager') if not env_manager: return jsonify({"success": False, "error": "环境管理器未初始化"}), 503 try: data = request.json action = data.get('action') params = data.get('params', {}) if not action: return jsonify({"success": False, "error": "缺少动作参数"}), 400 success = env_manager.execute_action(action, params) return jsonify({"success": success, "action": action}) except Exception as e: app.logger.error(f"执行环境动作失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 500 # ========== 路由注册 ========== def register_routes(app): register_environment_routes(app) # 健康检查路由 @app.route('/health') def health_check(): return jsonify({"status": "healthy", "timestamp": datetime.datetime.now().isoformat()}) # 系统状态路由 @app.route('/status') @app.config['LIMITER'].exempt def status(): components = app.config['SYSTEM_COMPONENTS'] system_info = { "uptime": time.time() - app.config['START_TIME'], "ai_core_status": components['ai_core'].status if components['ai_core'] else "uninitialized", "hardware_status": components['hardware_manager'].get_status() if components[ 'hardware_manager'] else "uninitialized", "life_scheduler_status": components['life_scheduler'].get_status() if components[ 'life_scheduler'] else "uninitialized", "environment_status": components['environment_manager'].is_healthy() if components[ 'environment_manager'] else "uninitialized", "platform": platform.platform(), "python_version": sys.version, "memory_usage": psutil.virtual_memory().percent, "cpu_usage": psutil.cpu_percent(), "thread_count": threading.active_count(), "process_id": os.getpid() } return jsonify(system_info) # 核心系统路由 @app.route('/api/core/state') @app.config['LIMITER'].limit("10 per minute") def get_core_state(): ai_core = app.config['SYSTEM_COMPONENTS'].get('ai_core') if not ai_core: return jsonify({"error": "AI核心未初始化"}), 503 return jsonify(ai_core.get_state()) # 生活系统路由 @app.route('/life/dashboard') def life_dashboard(): return render_template('life_dashboard.html') @app.route('/api/life/status') @app.config['LIMITER'].limit("10 per minute") def get_life_status(): life_scheduler = app.config['SYSTEM_COMPONENTS'].get('life_scheduler') if not life_scheduler: return jsonify({"error": "生活调度器未初始化"}), 503 status = life_scheduler.get_status() return jsonify(status) # 聊天路由 @app.route('/chat', methods=['POST']) @app.config['LIMITER'].limit("30 per minute") def chat(): components = app.config['SYSTEM_COMPONENTS'] if not 
components['ai_agent']: return jsonify({"error": "Agent未初始化"}), 503 try: data = request.get_json() user_input = data.get('message', '') user_id = data.get('user_id', 'default') if not user_input: return jsonify({"error": "消息内容不能为空"}), 400 app.logger.info(f"聊天请求: 用户={user_id}, 内容长度={len(user_input)}") # 使用线程池异步处理 future = executor.submit(components['ai_agent'].process_input, user_input, user_id) response = future.result(timeout=10) # 10秒超时 return jsonify({"response": response}) except TimeoutError: return jsonify({"error": "处理超时"}), 504 except Exception as e: app.logger.error(f"聊天处理失败: {traceback.format_exc()}") return jsonify({"error": "聊天处理失败", "details": str(e)}), 500 # 404处理 @app.route('/') def catch_all(path): return jsonify({"error": "路由不存在", "path": path}), 404 def register_error_handlers(app): @app.errorhandler(404) def not_found_error(error): return jsonify({"error": "资源未找到", "message": str(error)}), 404 @app.errorhandler(500) def internal_error(error): app.logger.error(f"服务器内部错误: {str(error)}") return jsonify({"error": "服务器内部错误", "message": "请查看日志获取详细信息"}), 500 # ========== WebSocket处理 ========== def setup_websocket_handlers(socketio): @socketio.on('connect') def handle_connect(): logger.info('客户端已连接') socketio.emit('system_status', {'status': 'ready'}) @socketio.on('disconnect') def handle_disconnect(): logger.info('客户端已断开连接') @socketio.on('user_message') def handle_user_message(data): user_id = data.get('user_id', 'guest') message = data.get('message', '') logger.info(f"收到来自 {user_id} 的消息: {message}") # 使用线程池处理消息 def process_message(): try: global coordinator if coordinator: return coordinator.process_message(message) else: return f"已收到您的消息: {message}" except Exception as e: logger.error(f"消息处理失败: {str(e)}") return "处理消息时出错" future = executor.submit(process_message) try: response = future.result(timeout=10) socketio.emit('agent_response', { 'user_id': user_id, 'response': response }) except TimeoutError: socketio.emit('agent_response', { 'user_id': user_id, 'response': "处理超时,请重试" }) # ========== 生产环境启动器 ========== def run_production_server(app): try: from waitress import serve logger.info(f"🚀 生产服务器启动: http://{config.HOST}:{config.PORT}") logger.warning("⚠️ 当前运行在生产模式 (Waitress WSGI服务器)") serve(app, host=config.HOST, port=config.PORT, threads=8) except ImportError: logger.error("❌ 缺少生产环境依赖: waitress") logger.info("请运行: pip install waitress") sys.exit(1) # ========== Flask应用工厂 (关键修复) ========== def create_app(): app = Flask( __name__, template_folder=str(config.TEMPLATE_DIR), static_folder='static', static_url_path='/static' ) app.secret_key = config.SECRET_KEY # 初始化限流器 (修复点: 先创建LIMITER并添加到配置) limiter = Limiter( get_remote_address, app=app, default_limits=["200 per day", "50 per hour"], storage_uri="memory://" ) app.config['LIMITER'] = limiter # 关键修复: 先配置LIMITER system_initializer = SystemInitializer() components = system_initializer.initialize_all() app.config['SYSTEM_COMPONENTS'] = components app.config['START_TIME'] = system_initializer.start_time app.config['BASE_DIR'] = system_initializer.base_dir # 配置SocketIO async_mode = 'threading' try: import eventlet async_mode = 'eventlet' logger.info("✅ 使用eventlet异步模式") except ImportError: logger.warning("⚠️ eventlet未安装,使用threading模式,性能可能受影响") socketio = SocketIO( app, cors_allowed_origins="*", async_mode=async_mode, logger=True, engineio_logger=True ) app.config['SOCKETIO'] = socketio # 注册路由和错误处理 register_routes(app) register_error_handlers(app) return app, socketio # ========== 主程序入口 ========== if __name__ == '__main__': try: app, 
socketio = create_app() setup_websocket_handlers(socketio) if config.DEBUG: logger.info(f"开发服务器运行在 http://{config.HOST}:{config.PORT}") socketio.run( app, host=config.HOST, port=config.PORT, debug=True, use_reloader=False, log_output=True ) else: run_production_server(app) except KeyboardInterrupt: logger.info("服务器关闭") executor.shutdown(wait=False) except Exception as e: logger.critical(f"服务器启动失败: {str(e)}") logger.error(traceback.format_exc()) executor.shutdown(wait=False) sys.exit(1) ”

如果是,请把改好的完整版发给我,注意不要随便删减我原来的主要内容哦。
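先指出一个两版 server.py 里都有、而且会直接报错的小问题(以下是根据贴出的代码推断,可能与你本地文件有出入):`@app.route('/')` 装饰 `def catch_all(path)` 时,Flask 访问 `/` 并不会传入 `path` 参数,会抛 TypeError,而且根路径本身也被当成了 404。想做"兜底 404"通常用带 `<path:path>` 的通配路由,或者干脆只依赖已经注册的 404 errorhandler。示意如下:

```python
# 示意写法(并非唯一改法):用 <path:path> 通配路由做兜底 404,
# 根路径 '/' 不再被 catch_all 占用;如果已经注册了 404 errorhandler,这个路由也可以省略。
@app.route('/<path:path>')
def catch_all(path):
    return jsonify({"error": "路由不存在", "path": path}), 404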

# E:\AI_System\web_ui\server.py import sys import os import time import logging import json import traceback import threading import platform import psutil import datetime from pathlib import Path from flask import Flask, jsonify, request, render_template from logging.handlers import TimedRotatingFileHandler from flask_socketio import SocketIO, emit # 在server.py中添加健康检查路由 from flask import jsonify @app.route('/system/health') def health_check(): """系统健康检查接口""" components = { 'ai_core': True, # 实际应替换为状态检测函数 'hardware_manager': True, 'scheduler': True, 'environment': True, 'evolution_monitor': True } status = all(components.values()) return jsonify( status="running" if status else "degraded", components=components, timestamp=datetime.now().isoformat() ) # ========== 配置系统 ========== class SystemConfig: def __init__(self): self.BASE_DIR = Path(__file__).resolve().parent.parent self.HOST = '127.0.0.1' self.PORT = 5000 self.LOG_LEVEL = 'INFO' self.SECRET_KEY = 'your_secret_key_here' self.DEBUG = True self.USE_GPU = False self.DEFAULT_MODEL = 'gpt-3.5-turbo' # 目录配置 self.LOG_DIR = self.BASE_DIR / 'logs' self.LOG_DIR.mkdir(parents=True, exist_ok=True) self.CONFIG_DIR = self.BASE_DIR / 'config' self.CONFIG_DIR.mkdir(parents=True, exist_ok=True) self.AGENT_PATH = self.BASE_DIR / 'agent' self.MODEL_CACHE_DIR = self.BASE_DIR / 'model_cache' self.MODEL_CACHE_DIR.mkdir(parents=True, exist_ok=True) def __str__(self): return f"SystemConfig(HOST={self.HOST}, PORT={self.PORT})" config = SystemConfig() # ========== 全局协调器 ========== coordinator = None def register_coordinator(coord): """注册意识系统协调器""" global coordinator coordinator = coord if coordinator and hasattr(coordinator, 'connect_to_ui'): coordinator.connect_to_ui(update_ui) def update_ui(event): """更新UI事件处理""" if 'socketio' in globals(): socketio.emit('system_event', event) # ========== 初始化日志系统 ========== def setup_logger(): """配置全局日志系统""" logger = logging.getLogger('WebServer') logger.setLevel(getattr(logging, config.LOG_LEVEL.upper(), logging.INFO)) # 日志格式 log_formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # 文件日志处理器 (每天轮换,保留30天) file_handler = TimedRotatingFileHandler( config.LOG_DIR / 'web_server.log', when='midnight', backupCount=30, encoding='utf-8' ) file_handler.setFormatter(log_formatter) logger.addHandler(file_handler) # 控制台日志处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(log_formatter) logger.addHandler(console_handler) # 安全日志处理装饰器 def safe_logger(func): def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except UnicodeEncodeError: new_args = [] for arg in args: if isinstance(arg, str): new_args.append(arg.encode('ascii', 'ignore').decode('ascii')) else: new_args.append(arg) return func(*new_args, **kwargs) return wrapper # 应用安全日志处理 for level in ['debug', 'info', 'warning', 'error', 'critical']: setattr(logger, level, safe_logger(getattr(logger, level))) return logger # 初始化日志 logger = setup_logger() # ========== 系统初始化 ========== class SystemInitializer: """负责初始化系统核心组件""" def __init__(self): self.base_dir = Path(__file__).resolve().parent.parent self.ai_core = None self.hardware_manager = None self.life_scheduler = None self.ai_agent = None self.start_time = time.time() self.environment_manager = None def initialize_system_paths(self): """初始化系统路径""" sys.path.insert(0, str(self.base_dir)) logger.info(f"项目根目录: {self.base_dir}") sub_dirs = ['agent', 'core', 'utils', 'config', 'cognitive_arch', 'environment'] for sub_dir in sub_dirs: 
full_path = self.base_dir / sub_dir if full_path.exists(): sys.path.insert(0, str(full_path)) logger.info(f"添加路径: {full_path}") else: logger.warning(f"目录不存在: {full_path}") def initialize_environment_manager(self): """初始化环境管理器""" try: # 环境管理器模拟实现 class EnvironmentManager: def __init__(self, config): self.config = config self.state = { 'temperature': 22.5, 'humidity': 45.0, 'light_level': 75, 'objects': [], 'last_updated': datetime.datetime.now().isoformat() } def start(self): logger.info("环境管理器已启动") def get_state(self): # 更新模拟数据 self.state['temperature'] = 20 + 5 * (time.time() % 10) / 10 self.state['humidity'] = 40 + 10 * (time.time() % 10) / 10 self.state['light_level'] = 70 + 10 * (time.time() % 10) / 10 self.state['last_updated'] = datetime.datetime.now().isoformat() return self.state def execute_action(self, action, params): logger.info(f"执行环境动作: {action} 参数: {params}") if action == "adjust_temperature": self.state['temperature'] = params.get('value', 22.0) return True elif action == "adjust_light": self.state['light_level'] = params.get('level', 70) return True return False env_config = {'update_interval': 1.0, 'spatial': {'grid_size': 1.0}} self.environment_manager = EnvironmentManager(env_config) self.environment_manager.start() logger.info("✅ 环境管理器初始化成功") return self.environment_manager except Exception as e: logger.error(f"❌ 环境管理器初始化失败: {str(e)}") logger.warning("⚠️ 环境交互功能将不可用") return None def initialize_ai_core(self): """初始化AI核心系统""" class AICore: def __init__(self, base_dir): self.base_dir = Path(base_dir) self.genetic_code = self.load_genetic_code() self.physical_state = { "health": 100, "energy": 100, "mood": "calm" } self.dependencies = self.scan_dependencies() def load_genetic_code(self): code_path = self.base_dir / "core" / "genetic_code.py" try: if not code_path.exists(): # 创建默认遗传代码文件 default_code = """# 默认遗传代码 class AICore: def __init__(self): self.version = "1.0.0" self.capabilities = ["learning", "reasoning", "problem_solving"] def evolve(self): print("系统正在进化...") """ with open(code_path, "w", encoding="utf-8") as f: f.write(default_code) with open(code_path, "r", encoding="utf-8") as f: return f.read() except Exception as e: logger.error(f"加载遗传代码失败: {str(e)}") return "# 默认遗传代码\nclass AICore:\n pass" def scan_dependencies(self): return { "nervous_system": "flask", "memory": "sqlite", "perception": "opencv", "reasoning": "transformers" } def mutate(self, new_code): try: code_path = self.base_dir / "core" / "genetic_code.py" with open(code_path, "w", encoding="utf-8") as f: f.write(new_code) return True, "核心代码更新成功,系统已进化" except Exception as e: return False, f"进化失败: {str(e)}" def wear_dependency(self, dependency_name, version): self.dependencies[dependency_name] = version return f"已穿戴 {dependency_name}@{version}" def get_state(self): return { "genetic_code_hash": hash(self.genetic_code), "dependencies": self.dependencies, "physical_state": self.physical_state, "hardware_environment": self.get_hardware_info() } def get_hardware_info(self): return { "cpu": platform.processor(), "cpu_cores": psutil.cpu_count(logical=False), "cpu_threads": psutil.cpu_count(logical=True), "memory_gb": round(psutil.virtual_memory().total / (1024 ** 3), 1), "storage_gb": round(psutil.disk_usage('/').total / (1024 ** 3), 1), "os": f"{platform.system()} {platform.release()}" } self.ai_core = AICore(self.base_dir) logger.info("✅ AI核心系统初始化完成") return self.ai_core def initialize_hardware_manager(self): """初始化硬件管理器""" try: # 硬件管理器模拟实现 class HardwareManager: def __init__(self): self.available_hardware 
= { "cpu": ["Intel i9-13900K", "AMD Ryzen 9 7950X", "Apple M2 Max"], "gpu": ["NVIDIA RTX 4090", "AMD Radeon RX 7900 XTX", "Apple M2 GPU"], "memory": [16, 32, 64, 128], "storage": ["1TB SSD", "2TB SSD", "4TB SSD", "8TB HDD"], "peripherals": ["4K Camera", "3D Scanner", "High-Fidelity Microphone"] } self.current_setup = { "cpu": platform.processor(), "gpu": "Integrated Graphics", "memory": round(psutil.virtual_memory().total / (1024 ** 3), 1), "storage": round(psutil.disk_usage('/').total / (1024 ** 3), 1) } def request_hardware(self, hardware_type, specification): if hardware_type not in self.available_hardware: return False, f"不支持硬件类型: {hardware_type}" if specification not in self.available_hardware[hardware_type]: return False, f"不支持的规格: {specification}" self.current_setup[hardware_type] = specification return True, f"已请求 {hardware_type}: {specification}。请管理员完成安装。" def get_current_setup(self): return self.current_setup def get_performance_metrics(self): return { "cpu_usage": psutil.cpu_percent(), "memory_usage": psutil.virtual_memory().percent, "disk_usage": psutil.disk_usage('/').percent, "cpu_temp": 45.0, "gpu_temp": 55.0, "network_io": { "sent": psutil.net_io_counters().bytes_sent, "received": psutil.net_io_counters().bytes_recv }, "last_updated": datetime.datetime.now().isoformat() } self.hardware_manager = HardwareManager() logger.info("✅ 硬件管理器初始化成功") return self.hardware_manager except Exception as e: logger.error(f"❌ 硬件管理器初始化失败: {str(e)}") logger.warning("⚠️ 使用内置简单硬件管理器") return None def initialize_life_scheduler(self): """初始化生活调度器""" try: # 生活调度器模拟实现 class LifeScheduler: def __init__(self): self.daily_schedule = { "wake_up": "07:00", "breakfast": "08:00", "lunch": "12:30", "dinner": "19:00", "sleep": "23:00" } self.current_activity = "awake" self.energy_level = 100 self.mood = "calm" self.activity_log = [] def wake_up(self): self.current_activity = "awake" self.log_activity("醒来") def have_meal(self, meal_type): self.current_activity = f"eating_{meal_type}" self.energy_level = min(100, self.energy_level + 20) self.log_activity(f"用餐: {meal_type}") def go_to_sleep(self): self.current_activity = "sleeping" self.log_activity("睡觉") def log_activity(self, activity): timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.activity_log.append(f"{timestamp} - {activity}") # 保留最近的100条记录 if len(self.activity_log) > 100: self.activity_log.pop(0) def adjust_schedule(self, adjustments): for activity, new_time in adjustments.items(): if activity in self.daily_schedule: self.daily_schedule[activity] = new_time def get_current_state(self): return { "current_activity": self.current_activity, "next_scheduled": self._get_next_scheduled(), "energy_level": self.energy_level, "mood": self.mood } def get_recent_activities(self, count=10): return self.activity_log[-count:] def _get_next_scheduled(self): now = datetime.datetime.now() current_time = now.strftime("%H:%M") # 找到下一个计划活动 schedule_times = sorted( [(k, v) for k, v in self.daily_schedule.items()], key=lambda x: x[1] ) for activity, time_str in schedule_times: if time_str > current_time: return f"{activity} at {time_str}" # 如果没有找到,返回第二天的第一个活动 return f"{schedule_times[0][0]} at {schedule_times[0][1]} tomorrow" self.life_scheduler = LifeScheduler() logger.info("✅ 生活调度器初始化成功") # 启动生活系统后台线程 life_thread = threading.Thread( target=self._update_life_status, daemon=True, name="LifeSystemThread" ) life_thread.start() logger.info("✅ 生活系统后台线程已启动") return self.life_scheduler except Exception as e: logger.error(f"❌ 生活调度器初始化失败: {str(e)}") return None 
def _update_life_status(self): logger.info("🚦 生活系统后台线程启动") while True: try: now = datetime.datetime.now() current_hour = now.hour current_minute = now.minute current_time = f"{current_hour:02d}:{current_minute:02d}" # 根据时间更新活动状态 if 7 <= current_hour < 8 and self.life_scheduler.current_activity != "awake": self.life_scheduler.wake_up() elif 12 <= current_hour < 13 and self.life_scheduler.current_activity != "eating_lunch": self.life_scheduler.have_meal("lunch") elif 19 <= current_hour < 20 and self.life_scheduler.current_activity != "eating_dinner": self.life_scheduler.have_meal("dinner") elif (23 <= current_hour or current_hour < 6) and self.life_scheduler.current_activity != "sleeping": self.life_scheduler.go_to_sleep() # 自然能量消耗 if self.life_scheduler.current_activity != "sleeping": self.life_scheduler.energy_level = max(0, self.life_scheduler.energy_level - 0.1) time.sleep(60) except Exception as e: logger.error(f"生活系统更新失败: {str(e)}", exc_info=True) time.sleep(300) def initialize_ai_agent(self): """初始化AI智能体""" try: # AI智能体模拟实现 class AutonomousAgent: def __init__(self, model_path, cache_dir, use_gpu, default_model): self.model_name = default_model self.cache_dir = cache_dir self.use_gpu = use_gpu self.conversation_history = {} def process_input(self, user_input, user_id): # 初始化用户对话历史 if user_id not in self.conversation_history: self.conversation_history[user_id] = [] # 添加用户输入到历史 self.conversation_history[user_id].append({"role": "user", "content": user_input}) # 生成AI响应(模拟) if "你好" in user_input or "hello" in user_input: response = "你好!我是AI助手,有什么可以帮您的吗?" elif "时间" in user_input: response = f"现在是 {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" elif "状态" in user_input: response = "系统运行正常,所有组件都在线。" else: response = f"我已收到您的消息: '{user_input}'。这是一个模拟响应,实际系统中我会分析您的问题并提供专业解答。" # 添加AI响应到历史 self.conversation_history[user_id].append({"role": "assistant", "content": response}) return response def get_status(self): return { "model": self.model_name, "cache_dir": str(self.cache_dir), "use_gpu": self.use_gpu, "active_conversations": len(self.conversation_history) } self.ai_agent = AutonomousAgent( model_path=config.AGENT_PATH, cache_dir=config.MODEL_CACHE_DIR, use_gpu=config.USE_GPU, default_model=config.DEFAULT_MODEL ) logger.info("✅ AI智能体初始化成功") return self.ai_agent except Exception as e: logger.error(f"❌ AI智能体初始化失败: {str(e)}") return None def start_evolution_monitor(self): def monitor(): while True: try: cpu_usage = psutil.cpu_percent() mem_usage = psutil.virtual_memory().percent # 根据系统负载调整AI状态 if cpu_usage > 80: self.ai_core.physical_state["energy"] = max(20, self.ai_core.physical_state["energy"] - 5) self.ai_core.physical_state["mood"] = "strained" elif cpu_usage < 30: self.ai_core.physical_state["energy"] = min(100, self.ai_core.physical_state["energy"] + 2) time.sleep(60) except Exception as e: logging.error(f"进化监控错误: {str(e)}") time.sleep(300) monitor_thread = threading.Thread(target=monitor, daemon=True) monitor_thread.start() logger.info("✅ 进化监控线程已启动") def initialize_all(self): logger.info("=" * 50) logger.info("🚀 开始初始化AI系统") logger.info("=" * 50) self.initialize_system_paths() self.initialize_ai_core() self.initialize_hardware_manager() self.initialize_life_scheduler() self.initialize_ai_agent() self.initialize_environment_manager() self.start_evolution_monitor() logger.info("✅ 所有系统组件初始化完成") return { "ai_core": self.ai_core, "hardware_manager": self.hardware_manager, "life_scheduler": self.life_scheduler, "ai_agent": self.ai_agent, "environment_manager": self.environment_manager } # 
========== Flask应用工厂 ========== def create_app(): app = Flask( __name__, template_folder='templates', static_folder='static', static_url_path='/static' ) app.secret_key = config.SECRET_KEY system_initializer = SystemInitializer() components = system_initializer.initialize_all() app.config['SYSTEM_COMPONENTS'] = components app.config['START_TIME'] = system_initializer.start_time app.config['BASE_DIR'] = system_initializer.base_dir # 初始化SocketIO socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading') app.config['SOCKETIO'] = socketio # 注册路由 register_routes(app) register_error_handlers(app) return app, socketio # ========== 环境交互路由 ========== def register_environment_routes(app): @app.route('/environment') def environment_view(): return render_template('environment_view.html') @app.route('/api/environment/state', methods=['GET']) def get_environment_state(): env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager') if not env_manager: return jsonify({"success": False, "error": "环境管理器未初始化"}), 503 try: state = env_manager.get_state() return jsonify(state) except Exception as e: app.logger.error(f"获取环境状态失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/environment/action', methods=['POST']) def execute_environment_action(): env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager') if not env_manager: return jsonify({"success": False, "error": "环境管理器未初始化"}), 503 try: data = request.json action = data.get('action') params = data.get('params', {}) if not action: return jsonify({"success": False, "error": "缺少动作参数"}), 400 success = env_manager.execute_action(action, params) return jsonify({"success": success, "action": action}) except Exception as e: app.logger.error(f"执行环境动作失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 500 # ========== 环境状态广播 ========== def setup_environment_broadcast(app): socketio = app.config['SOCKETIO'] @socketio.on('connect', namespace='/environment') def handle_environment_connect(): app.logger.info('客户端已连接环境WebSocket') @socketio.on('disconnect', namespace='/environment') def handle_environment_disconnect(): app.logger.info('客户端已断开环境WebSocket') def broadcast_environment_state(): env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager') if not env_manager: return while True: try: state = env_manager.get_state() socketio.emit('environment_update', state, namespace='/environment') time.sleep(1) except Exception as e: app.logger.error(f"环境状态广播失败: {str(e)}") time.sleep(5) broadcast_thread = threading.Thread( target=broadcast_environment_state, daemon=True, name="EnvironmentBroadcastThread" ) broadcast_thread.start() app.logger.info("✅ 环境状态广播线程已启动") # ========== 路由注册 ========== def register_routes(app): register_environment_routes(app) setup_environment_broadcast(app) @app.route('/') def index(): return render_template('agent_interface.html') @app.route('/status') def status(): try: components = app.config['SYSTEM_COMPONENTS'] status_data = { "server": { "status": "running", "uptime": time.time() - app.config['START_TIME'], "version": "1.0.0", "config": { "host": config.HOST, "port": config.PORT, "log_level": config.LOG_LEVEL, "default_model": config.DEFAULT_MODEL } }, "core": components['ai_core'].get_state(), "hardware": components['hardware_manager'].get_current_setup() } if components['environment_manager']: try: status_data["environment"] = components['environment_manager'].get_state() except Exception as e: 
status_data["environment"] = {"error": str(e)} if components['life_scheduler']: try: status_data["life_system"] = components['life_scheduler'].get_current_state() except Exception as e: status_data["life_system"] = {"error": str(e)} if components['ai_agent']: try: status_data["agent"] = components['ai_agent'].get_status() except Exception as e: status_data["agent"] = {"error": str(e)} return jsonify(status_data) except Exception as e: app.logger.error(f"获取状态失败: {traceback.format_exc()}") return jsonify({"error": "内部错误", "details": str(e)}), 500 # 核心系统路由 @app.route('/api/core/state') def get_core_state(): return jsonify(app.config['SYSTEM_COMPONENTS']['ai_core'].get_state()) @app.route('/api/core/mutate', methods=['POST']) def mutate_core(): data = request.get_json() new_code = data.get('genetic_code') if not new_code: return jsonify({"success": False, "error": "缺少遗传代码"}), 400 success, message = app.config['SYSTEM_COMPONENTS']['ai_core'].mutate(new_code) return jsonify({"success": success, "message": message}) @app.route('/api/core/wear', methods=['POST']) def wear_dependency(): data = request.get_json() dep_name = data.get('dependency') version = data.get('version', 'latest') if not dep_name: return jsonify({"success": False, "error": "缺少依赖名称"}), 400 result = app.config['SYSTEM_COMPONENTS']['ai_core'].wear_dependency(dep_name, version) return jsonify({"success": True, "message": result}) # 硬件管理路由 @app.route('/api/hardware/catalog') def get_hardware_catalog(): return jsonify(app.config['SYSTEM_COMPONENTS']['hardware_manager'].available_hardware) @app.route('/api/hardware/request', methods=['POST']) def request_hardware(): data = request.get_json() hw_type = data.get('type') spec = data.get('specification') if not hw_type or not spec: return jsonify({"success": False, "error": "缺少硬件类型或规格"}), 400 success, message = app.config['SYSTEM_COMPONENTS']['hardware_manager'].request_hardware(hw_type, spec) return jsonify({"success": success, "message": message}) @app.route('/api/hardware/current') def get_current_hardware(): return jsonify(app.config['SYSTEM_COMPONENTS']['hardware_manager'].get_current_setup()) # 生活系统路由 @app.route('/life') def life_dashboard(): return render_template('life_dashboard.html') @app.route('/api/life/status') def get_life_status(): components = app.config['SYSTEM_COMPONENTS'] if not components['life_scheduler']: return jsonify({"success": False, "error": "生活系统未初始化"}), 503 try: current_state = components['life_scheduler'].get_current_state() recent_activities = components['life_scheduler'].get_recent_activities(10) return jsonify({ "success": True, "current_activity": current_state.get("current_activity", "未知"), "next_scheduled": current_state.get("next_scheduled", "未知"), "recent_activities": recent_activities, "energy_level": current_state.get("energy_level", 100), "mood": current_state.get("mood", "平静") }) except Exception as e: app.logger.error(f"获取生活状态失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/adjust_schedule', methods=['POST']) def adjust_schedule(): components = app.config['SYSTEM_COMPONENTS'] if not components['life_scheduler']: return jsonify({"success": False, "error": "生活系统未初始化"}), 503 try: data = request.json adjustments = data.get("adjustments", {}) valid_activities = ["wake_up", "breakfast", "lunch", "dinner", "sleep"] for activity, new_time in adjustments.items(): if activity not in valid_activities: return jsonify({"success": False, "error": f"无效的活动类型: {activity}"}), 400 if not isinstance(new_time, str) 
or len(new_time) != 5 or new_time[2] != ':': return jsonify({"success": False, "error": f"无效的时间格式: {new_time}"}), 400 components['life_scheduler'].adjust_schedule(adjustments) return jsonify({ "success": True, "message": "计划表已更新", "new_schedule": components['life_scheduler'].daily_schedule }) except Exception as e: app.logger.error(f"调整作息时间失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 400 # 聊天路由 @app.route('/chat', methods=['POST']) def chat(): components = app.config['SYSTEM_COMPONENTS'] if not components['ai_agent']: return jsonify({"error": "Agent未初始化"}), 503 try: data = request.get_json() user_input = data.get('message', '') user_id = data.get('user_id', 'default') if not user_input: return jsonify({"error": "消息内容不能为空"}), 400 app.logger.info(f"聊天请求: 用户={user_id}, 内容长度={len(user_input)}") response = components['ai_agent'].process_input(user_input, user_id) return jsonify({"response": response}) except Exception as e: app.logger.error(f"聊天处理失败: {traceback.format_exc()}") return jsonify({"error": "聊天处理失败", "details": str(e)}), 500 # 家具管理路由 furniture_cache = {} CACHE_DURATION = 3600 # 1小时 @app.route('/api/furniture') def get_furniture(): try: room = request.args.get('room', 'workshop') app.logger.info(f"获取家具数据: 房间={room}") current_time = time.time() if room in furniture_cache and current_time - furniture_cache[room]['timestamp'] < CACHE_DURATION: return jsonify(furniture_cache[room]['data']) furniture_data = { "workshop": [ {"type": "desk", "position": {"x": 0, "y": -1.5, "z": -3}, "rotation": {"x": 0, "y": 0, "z": 0}}, {"type": "chair", "position": {"x": 0, "y": -1.5, "z": -1}, "rotation": {"x": 0, "y": 0, "z": 0}}, {"type": "bookshelf", "position": {"x": 3, "y": 0, "z": -3}, "rotation": {"x": 0, "y": 0, "z": 0}}, {"type": "computer", "position": {"x": 0.5, "y": -0.5, "z": -3.2}, "rotation": {"x": 0, "y": 0, "z": 0}} ], "living_room": [ {"type": "sofa", "position": {"x": 0, "y": 0, "z": -2}, "rotation": {"x": 0, "y": 0, "z": 0}}, {"type": "tv", "position": {"x": 0, "y": 1.5, "z": -3}, "rotation": {"x": 0, "y": 0, "z": 0}} ], "bedroom": [ {"type": "bed", "position": {"x": 0, "y": 0, "z": -3}, "rotation": {"x": 0, "y": 0, "z": 0}}, {"type": "nightstand", "position": {"x": 1.5, "y": 0, "z": -2.5}, "rotation": {"x": 0, "y": 0, "z": 0}} ] } furniture_cache[room] = { 'timestamp': current_time, 'data': furniture_data.get(room, []) } return jsonify(furniture_cache[room]['data']) except Exception as e: app.logger.error(f"获取家具数据失败: {traceback.format_exc()}") return jsonify({"error": "内部错误", "details": str(e)}), 500 # ========== 错误处理器 ========== def register_error_handlers(app): @app.errorhandler(404) def page_not_found(error): app.logger.warning(f"404错误: {request.path}") return jsonify({ "error": "资源未找到", "path": request.path, "method": request.method }), 404 @app.errorhandler(500) def internal_server_error(error): app.logger.error(f"500错误: {str(error)}") return jsonify({ "error": "服务器内部错误", "message": "系统遇到意外错误,请稍后重试" }), 500 @app.errorhandler(Exception) def handle_general_exception(error): app.logger.error(f"未处理异常: {traceback.format_exc()}") return jsonify({ "error": "未处理的异常", "type": type(error).__name__, "message": str(error) }), 500 # ========== WebSocket处理 ========== def setup_websocket_handlers(socketio): @socketio.on('connect') def handle_connect(): logger.info('客户端已连接') socketio.emit('system_status', {'status': 'ready'}) @socketio.on('disconnect') def handle_disconnect(): logger.info('客户端已断开连接') @socketio.on('user_message') def 
handle_user_message(data): user_id = data.get('user_id', 'guest') message = data.get('message', '') logger.info(f"收到来自 {user_id} 的消息: {message}") # 处理消息逻辑 response = f"已收到您的消息: {message}" # 如果有协调器,使用协调器处理 global coordinator if coordinator: try: response = coordinator.process_message(message) except Exception as e: logger.error(f"协调器处理消息失败: {str(e)}") socketio.emit('agent_response', { 'user_id': user_id, 'response': response }) # ========== 主程序入口 ========== if __name__ == '__main__': try: app, socketio = create_app() # 设置WebSocket处理器 setup_websocket_handlers(socketio) # 启动服务器 socketio.run( app, host=config.HOST, port=config.PORT, debug=config.DEBUG, use_reloader=False ) logger.info(f"服务器运行在 http://{config.HOST}:{config.PORT}") except KeyboardInterrupt: logger.info("服务器关闭") except Exception as e: logger.critical(f"服务器启动失败: {str(e)}") logger.error(traceback.format_exc()) 帮我改好 发我完整版哦
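这一版里有两个在启动阶段就会报错的地方(基于贴出的代码判断,仅供参考):一是文件开头的健康检查路由在 `app` 还没创建时就使用了 `@app.route('/system/health')`,import 阶段会直接 NameError;二是其中的 `datetime.now()`,在只 `import datetime` 的情况下会抛 AttributeError。一个可能的修改示意如下(等 `create_app()` 创建出 `app`、在 `register_routes(app)` 内部再注册):

```python
# 示意:健康检查路由要等 app 创建之后再注册(例如放进 register_routes(app) 内部),
# 不能在模块顶层就使用 @app.route;只 import datetime 时要写 datetime.datetime.now()。
@app.route('/system/health')
def health_check():
    """系统健康检查接口(组件状态这里仍是占位值,实际应换成真正的检测函数)"""
    components = {
        'ai_core': True,
        'hardware_manager': True,
        'scheduler': True,
        'environment': True,
        'evolution_monitor': True,
    }
    ok = all(components.values())
    return jsonify(
        status="running" if ok else "degraded",
        components=components,
        timestamp=datetime.datetime.now().isoformat(),  # 原来写成 datetime.now() 会报 AttributeError
    )
```

其余路由、初始化逻辑可以保持原样,不需要删减;把这段挪到路由注册函数里、再把顶部重复的 `from flask import jsonify` 去掉即可。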

from data import COCODetection, get_label_map, MEANS, COLORS from yolact import Yolact from utils.augmentations import BaseTransform, FastBaseTransform, Resize from utils.functions import MovingAverage, ProgressBar from layers.box_utils import jaccard, center_size, mask_iou from utils import timer from utils.functions import SavePath from layers.output_utils import postprocess, undo_image_transformation import pycocotools from data import cfg, set_cfg, set_dataset import numpy as np import torch import torch.backends.cudnn as cudnn from torch.autograd import Variable import argparse import time import random import cProfile import pickle import json import os from collections import defaultdict from pathlib import Path from collections import OrderedDict from PIL import Image import matplotlib.pyplot as plt import cv2 def str2bool(v): if v.lower() in ('yes', 'true', 't', 'y', '1'): return True elif v.lower() in ('no', 'false', 'f', 'n', '0'): return False else: raise argparse.ArgumentTypeError('Boolean value expected.') def parse_args(argv=None): parser = argparse.ArgumentParser( description='YOLACT COCO Evaluation') parser.add_argument('--trained_model', default='weights/yolact_base_105_101798_interrupt.pth', type=str, help='Trained state_dict file path to open. If "interrupt", this will open the interrupt file.') parser.add_argument('--top_k', default=5, type=int, help='Further restrict the number of predictions to parse') parser.add_argument('--cuda', default=True, type=str2bool, help='Use cuda to evaulate model') parser.add_argument('--fast_nms', default=True, type=str2bool, help='Whether to use a faster, but not entirely correct version of NMS.') parser.add_argument('--cross_class_nms', default=False, type=str2bool, help='Whether compute NMS cross-class or per-class.') parser.add_argument('--display_masks', default=True, type=str2bool, help='Whether or not to display masks over bounding boxes') parser.add_argument('--display_bboxes', default=True, type=str2bool, help='Whether or not to display bboxes around masks') parser.add_argument('--display_text', default=True, type=str2bool, help='Whether or not to display text (class [score])') parser.add_argument('--display_scores', default=True, type=str2bool, help='Whether or not to display scores in addition to classes') parser.add_argument('--display', dest='display', action='store_true', help='Display qualitative results instead of quantitative ones.') parser.add_argument('--shuffle', dest='shuffle', action='store_true', help='Shuffles the images when displaying them. Doesn\'t have much of an effect when display is off though.') parser.add_argument('--ap_data_file', default='results/ap_data.pkl', type=str, help='In quantitative mode, the file to save detections before calculating mAP.') parser.add_argument('--resume', dest='resume', action='store_true', help='If display not set, this resumes mAP calculations from the ap_data_file.') parser.add_argument('--max_images', default=-1, type=int, help='The maximum number of images from the dataset to consider. 
Use -1 for all.') parser.add_argument('--output_coco_json', dest='output_coco_json', action='store_true', help='If display is not set, instead of processing IoU values, this just dumps detections into the coco json file.') parser.add_argument('--bbox_det_file', default='results/bbox_detections.json', type=str, help='The output file for coco bbox results if --coco_results is set.') parser.add_argument('--mask_det_file', default='results/mask_detections.json', type=str, help='The output file for coco mask results if --coco_results is set.') parser.add_argument('--config', default=None, help='The config object to use.') parser.add_argument('--output_web_json', dest='output_web_json', action='store_true', help='If display is not set, instead of processing IoU values, this dumps detections for usage with the detections viewer web thingy.') parser.add_argument('--web_det_path', default='web/dets/', type=str, help='If output_web_json is set, this is the path to dump detections into.') parser.add_argument('--no_bar', dest='no_bar', action='store_true', help='Do not output the status bar. This is useful for when piping to a file.') parser.add_argument('--display_lincomb', default=False, type=str2bool, help='If the config uses lincomb masks, output a visualization of how those masks are created.') parser.add_argument('--benchmark', default=False, dest='benchmark', action='store_true', help='Equivalent to running display mode but without displaying an image.') parser.add_argument('--no_sort', default=False, dest='no_sort', action='store_true', help='Do not sort images by hashed image ID.') parser.add_argument('--seed', default=None, type=int, help='The seed to pass into random.seed. Note: this is only really for the shuffle and does not (I think) affect cuda stuff.') parser.add_argument('--mask_proto_debug', default=False, dest='mask_proto_debug', action='store_true', help='Outputs stuff for scripts/compute_mask.py.') parser.add_argument('--no_crop', default=False, dest='crop', action='store_false', help='Do not crop output masks with the predicted bounding box.') parser.add_argument('--image', default=None, type=str, help='A path to an image to use for display.') parser.add_argument('--images', default='E:/yolact-master/coco/images/train2017', type=str, help='Input and output paths separated by a colon.') parser.add_argument('--video', default=None, type=str, help='A path to a video to evaluate on. Passing in a number will use that index webcam.') parser.add_argument('--video_multiframe', default=1, type=int, help='The number of frames to evaluate in parallel to make videos play at higher fps.') parser.add_argument('--score_threshold', default=0.15, type=float, help='Detections with a score under this threshold will not be considered. This currently only works in display mode.') parser.add_argument('--dataset', default=None, type=str, help='If specified, override the dataset specified in the config with this one (example: coco2017_dataset).') parser.add_argument('--detect', default=False, dest='detect', action='store_true', help='Don\'t evauluate the mask branch at all and only do object detection. 
This only works for --display and --benchmark.') parser.add_argument('--display_fps', default=False, dest='display_fps', action='store_true', help='When displaying / saving video, draw the FPS on the frame') parser.add_argument('--emulate_playback', default=False, dest='emulate_playback', action='store_true', help='When saving a video, emulate the framerate that you\'d get running in real-time mode.') parser.set_defaults(no_bar=False, display=False, resume=False, output_coco_json=False, output_web_json=False, shuffle=False, benchmark=False, no_sort=False, no_hash=False, mask_proto_debug=False, crop=True, detect=False, display_fps=False, emulate_playback=False) global args args = parser.parse_args(argv) if args.output_web_json: args.output_coco_json = True if args.seed is not None: random.seed(args.seed) iou_thresholds = [x / 100 for x in range(50, 100, 5)] coco_cats = {} # Call prep_coco_cats to fill this coco_cats_inv = {} color_cache = defaultdict(lambda: {}) def prep_display(dets_out, img, h, w, undo_transform=True, class_color=False, mask_alpha=0.45, fps_str=''): """ Note: If undo_transform=False then im_h and im_w are allowed to be None. """ if undo_transform: img_numpy = undo_image_transformation(img, w, h) img_gpu = torch.Tensor(img_numpy).cuda() else: img_gpu = img / 255.0 h, w, _ = img.shape with timer.env('Postprocess'): save = cfg.rescore_bbox cfg.rescore_bbox = True t = postprocess(dets_out, w, h, visualize_lincomb = args.display_lincomb, crop_masks = args.crop, score_threshold = args.score_threshold) cfg.rescore_bbox = save with timer.env('Copy'): idx = t[1].argsort(0, descending=True)[:args.top_k] if cfg.eval_mask_branch: # Masks are drawn on the GPU, so don't copy masks = t[3][idx] classes, scores, boxes = [x[idx].cpu().numpy() for x in t[:3]] num_dets_to_consider = min(args.top_k, classes.shape[0]) for j in range(num_dets_to_consider): if scores[j] < args.score_threshold: num_dets_to_consider = j break # Quick and dirty lambda for selecting the color for a particular index # Also keeps track of a per-gpu color cache for maximum speed def get_color(j, on_gpu=None): global color_cache color_idx = (classes[j] * 5 if class_color else j * 5) % len(COLORS) if on_gpu is not None and color_idx in color_cache[on_gpu]: return color_cache[on_gpu][color_idx] else: color = COLORS[color_idx] if not undo_transform: # The image might come in as RGB or BRG, depending color = (color[2], color[1], color[0]) if on_gpu is not None: color = torch.Tensor(color).to(on_gpu).float() / 255. color_cache[on_gpu][color_idx] = color return color # First, draw the masks on the GPU where we can do it really fast # Beware: very fast but possibly unintelligible mask-drawing code ahead # I wish I had access to OpenGL or Vulkan but alas, I guess Pytorch tensor operations will have to suffice if args.display_masks and cfg.eval_mask_branch and num_dets_to_consider > 0: # After this, mask is of size [num_dets, h, w, 1] masks = masks[:num_dets_to_consider, :, :, None] # Prepare the RGB images for each mask given their color (size [num_dets, h, w, 1]) colors = torch.cat([get_color(j, on_gpu=img_gpu.device.index).view(1, 1, 1, 3) for j in range(num_dets_to_consider)], dim=0) masks_color = masks.repeat(1, 1, 1, 3) * colors * mask_alpha # This is 1 everywhere except for 1-mask_alpha where the mask is inv_alph_masks = masks * (-mask_alpha) + 1 # I did the math for this on pen and paper. 
This whole block should be equivalent to: # for j in range(num_dets_to_consider): # img_gpu = img_gpu * inv_alph_masks[j] + masks_color[j] masks_color_summand = masks_color[0] if num_dets_to_consider > 1: inv_alph_cumul = inv_alph_masks[:(num_dets_to_consider-1)].cumprod(dim=0) masks_color_cumul = masks_color[1:] * inv_alph_cumul masks_color_summand += masks_color_cumul.sum(dim=0) img_gpu = img_gpu * inv_alph_masks.prod(dim=0) + masks_color_summand if args.display_fps: # Draw the box for the fps on the GPU font_face = cv2.FONT_HERSHEY_DUPLEX font_scale = 0.6 font_thickness = 1 text_w, text_h = cv2.getTextSize(fps_str, font_face, font_scale, font_thickness)[0] img_gpu[0:text_h+8, 0:text_w+8] *= 0.6 # 1 - Box alpha # Then draw the stuff that needs to be done on the cpu # Note, make sure this is a uint8 tensor or opencv will not anti alias text for whatever reason img_numpy = (img_gpu * 255).byte().cpu().numpy() if args.display_fps: # Draw the text on the CPU text_pt = (4, text_h + 2) text_color = [255, 255, 255] cv2.putText(img_numpy, fps_str, text_pt, font_face, font_scale, text_color, font_thickness, cv2.LINE_AA) if num_dets_to_consider == 0: return img_numpy if args.display_text or args.display_bboxes: for j in reversed(range(num_dets_to_consider)): x1, y1, x2, y2 = boxes[j, :] color = get_color(j) score = scores[j] if args.display_bboxes: cv2.rectangle(img_numpy, (x1, y1), (x2, y2), color, 1) if args.display_text: _class = cfg.dataset.class_names[classes[j]] text_str = '%s: %.2f' % (_class, score) if args.display_scores else _class font_face = cv2.FONT_HERSHEY_DUPLEX font_scale = 0.6 font_thickness = 1 text_w, text_h = cv2.getTextSize(text_str, font_face, font_scale, font_thickness)[0] text_pt = (x1, y1 - 3) text_color = [255, 255, 255] cv2.rectangle(img_numpy, (x1, y1), (x1 + text_w, y1 - text_h - 4), color, -1) cv2.putText(img_numpy, text_str, text_pt, font_face, font_scale, text_color, font_thickness, cv2.LINE_AA) return img_numpy def prep_benchmark(dets_out, h, w): with timer.env('Postprocess'): t = postprocess(dets_out, w, h, crop_masks=args.crop, score_threshold=args.score_threshold) with timer.env('Copy'): classes, scores, boxes, masks = [x[:args.top_k] for x in t] if isinstance(scores, list): box_scores = scores[0].cpu().numpy() mask_scores = scores[1].cpu().numpy() else: scores = scores.cpu().numpy() classes = classes.cpu().numpy() boxes = boxes.cpu().numpy() masks = masks.cpu().numpy() with timer.env('Sync'): # Just in case torch.cuda.synchronize() def prep_coco_cats(): """ Prepare inverted table for category id lookup given a coco cats object. 
""" for coco_cat_id, transformed_cat_id_p1 in get_label_map().items(): transformed_cat_id = transformed_cat_id_p1 - 1 coco_cats[transformed_cat_id] = coco_cat_id coco_cats_inv[coco_cat_id] = transformed_cat_id def get_coco_cat(transformed_cat_id): """ transformed_cat_id is [0,80) as indices in cfg.dataset.class_names """ return coco_cats[transformed_cat_id] def get_transformed_cat(coco_cat_id): """ transformed_cat_id is [0,80) as indices in cfg.dataset.class_names """ return coco_cats_inv[coco_cat_id] class Detections: def __init__(self): self.bbox_data = [] self.mask_data = [] def add_bbox(self, image_id:int, category_id:int, bbox:list, score:float): """ Note that bbox should be a list or tuple of (x1, y1, x2, y2) """ bbox = [bbox[0], bbox[1], bbox[2]-bbox[0], bbox[3]-bbox[1]] # Round to the nearest 10th to avoid huge file sizes, as COCO suggests bbox = [round(float(x)*10)/10 for x in bbox] self.bbox_data.append({ 'image_id': int(image_id), 'category_id': get_coco_cat(int(category_id)), 'bbox': bbox, 'score': float(score) }) def add_mask(self, image_id:int, category_id:int, segmentation:np.ndarray, score:float): """ The segmentation should be the full mask, the size of the image and with size [h, w]. """ rle = pycocotools.mask.encode(np.asfortranarray(segmentation.astype(np.uint8))) rle['counts'] = rle['counts'].decode('ascii') # json.dump doesn't like bytes strings self.mask_data.append({ 'image_id': int(image_id), 'category_id': get_coco_cat(int(category_id)), 'segmentation': rle, 'score': float(score) }) def dump(self): dump_arguments = [ (self.bbox_data, args.bbox_det_file), (self.mask_data, args.mask_det_file) ] for data, path in dump_arguments: with open(path, 'w') as f: json.dump(data, f) def dump_web(self): """ Dumps it in the format for my web app. Warning: bad code ahead! """ config_outs = ['preserve_aspect_ratio', 'use_prediction_module', 'use_yolo_regressors', 'use_prediction_matching', 'train_masks'] output = { 'info' : { 'Config': {key: getattr(cfg, key) for key in config_outs}, } } image_ids = list(set([x['image_id'] for x in self.bbox_data])) image_ids.sort() image_lookup = {_id: idx for idx, _id in enumerate(image_ids)} output['images'] = [{'image_id': image_id, 'dets': []} for image_id in image_ids] # These should already be sorted by score with the way prep_metrics works. 
for bbox, mask in zip(self.bbox_data, self.mask_data): image_obj = output['images'][image_lookup[bbox['image_id']]] image_obj['dets'].append({ 'score': bbox['score'], 'bbox': bbox['bbox'], 'category': cfg.dataset.class_names[get_transformed_cat(bbox['category_id'])], 'mask': mask['segmentation'], }) with open(os.path.join(args.web_det_path, '%s.json' % cfg.name), 'w') as f: json.dump(output, f) def _mask_iou(mask1, mask2, iscrowd=False): with timer.env('Mask IoU'): ret = mask_iou(mask1, mask2, iscrowd) return ret.cpu() def _bbox_iou(bbox1, bbox2, iscrowd=False): with timer.env('BBox IoU'): ret = jaccard(bbox1, bbox2, iscrowd) return ret.cpu() def prep_metrics(ap_data, dets, img, gt, gt_masks, h, w, num_crowd, image_id, detections:Detections=None): """ Returns a list of APs for this image, with each element being for a class """ if not args.output_coco_json: with timer.env('Prepare gt'): gt_boxes = torch.Tensor(gt[:, :4]) gt_boxes[:, [0, 2]] *= w gt_boxes[:, [1, 3]] *= h gt_classes = list(gt[:, 4].astype(int)) gt_masks = torch.Tensor(gt_masks).view(-1, h*w) if num_crowd > 0: split = lambda x: (x[-num_crowd:], x[:-num_crowd]) crowd_boxes , gt_boxes = split(gt_boxes) crowd_masks , gt_masks = split(gt_masks) crowd_classes, gt_classes = split(gt_classes) with timer.env('Postprocess'): classes, scores, boxes, masks = postprocess(dets, w, h, crop_masks=args.crop, score_threshold=args.score_threshold) if classes.size(0) == 0: return classes = list(classes.cpu().numpy().astype(int)) if isinstance(scores, list): box_scores = list(scores[0].cpu().numpy().astype(float)) mask_scores = list(scores[1].cpu().numpy().astype(float)) else: scores = list(scores.cpu().numpy().astype(float)) box_scores = scores mask_scores = scores masks = masks.view(-1, h*w).cuda() boxes = boxes.cuda() if args.output_coco_json: with timer.env('JSON Output'): boxes = boxes.cpu().numpy() masks = masks.view(-1, h, w).cpu().numpy() for i in range(masks.shape[0]): # Make sure that the bounding box actually makes sense and a mask was produced if (boxes[i, 3] - boxes[i, 1]) * (boxes[i, 2] - boxes[i, 0]) > 0: detections.add_bbox(image_id, classes[i], boxes[i,:], box_scores[i]) detections.add_mask(image_id, classes[i], masks[i,:,:], mask_scores[i]) return with timer.env('Eval Setup'): num_pred = len(classes) num_gt = len(gt_classes) mask_iou_cache = _mask_iou(masks, gt_masks) bbox_iou_cache = _bbox_iou(boxes.float(), gt_boxes.float()) if num_crowd > 0: crowd_mask_iou_cache = _mask_iou(masks, crowd_masks, iscrowd=True) crowd_bbox_iou_cache = _bbox_iou(boxes.float(), crowd_boxes.float(), iscrowd=True) else: crowd_mask_iou_cache = None crowd_bbox_iou_cache = None box_indices = sorted(range(num_pred), key=lambda i: -box_scores[i]) mask_indices = sorted(box_indices, key=lambda i: -mask_scores[i]) iou_types = [ ('box', lambda i,j: bbox_iou_cache[i, j].item(), lambda i,j: crowd_bbox_iou_cache[i,j].item(), lambda i: box_scores[i], box_indices), ('mask', lambda i,j: mask_iou_cache[i, j].item(), lambda i,j: crowd_mask_iou_cache[i,j].item(), lambda i: mask_scores[i], mask_indices) ] timer.start('Main loop') for _class in set(classes + gt_classes): ap_per_iou = [] num_gt_for_class = sum([1 for x in gt_classes if x == _class]) for iouIdx in range(len(iou_thresholds)): iou_threshold = iou_thresholds[iouIdx] for iou_type, iou_func, crowd_func, score_func, indices in iou_types: gt_used = [False] * len(gt_classes) ap_obj = ap_data[iou_type][iouIdx][_class] ap_obj.add_gt_positives(num_gt_for_class) for i in indices: if classes[i] != _class: continue 
max_iou_found = iou_threshold max_match_idx = -1 for j in range(num_gt): if gt_used[j] or gt_classes[j] != _class: continue iou = iou_func(i, j) if iou > max_iou_found: max_iou_found = iou max_match_idx = j if max_match_idx >= 0: gt_used[max_match_idx] = True ap_obj.push(score_func(i), True) else: # If the detection matches a crowd, we can just ignore it matched_crowd = False if num_crowd > 0: for j in range(len(crowd_classes)): if crowd_classes[j] != _class: continue iou = crowd_func(i, j) if iou > iou_threshold: matched_crowd = True break # All this crowd code so that we can make sure that our eval code gives the # same result as COCOEval. There aren't even that many crowd annotations to # begin with, but accuracy is of the utmost importance. if not matched_crowd: ap_obj.push(score_func(i), False) timer.stop('Main loop') class APDataObject: """ Stores all the information necessary to calculate the AP for one IoU and one class. Note: I type annotated this because why not. """ def __init__(self): self.data_points = [] self.num_gt_positives = 0 def push(self, score:float, is_true:bool): self.data_points.append((score, is_true)) def add_gt_positives(self, num_positives:int): """ Call this once per image. """ self.num_gt_positives += num_positives def is_empty(self) -> bool: return len(self.data_points) == 0 and self.num_gt_positives == 0 def get_ap(self) -> float: """ Warning: result not cached. """ if self.num_gt_positives == 0: return 0 # Sort descending by score self.data_points.sort(key=lambda x: -x[0]) precisions = [] recalls = [] num_true = 0 num_false = 0 # Compute the precision-recall curve. The x axis is recalls and the y axis precisions. for datum in self.data_points: # datum[1] is whether the detection a true or false positive if datum[1]: num_true += 1 else: num_false += 1 precision = num_true / (num_true + num_false) recall = num_true / self.num_gt_positives precisions.append(precision) recalls.append(recall) # Smooth the curve by computing [max(precisions[i:]) for i in range(len(precisions))] # Basically, remove any temporary dips from the curve. # At least that's what I think, idk. COCOEval did it so I do too. for i in range(len(precisions)-1, 0, -1): if precisions[i] > precisions[i-1]: precisions[i-1] = precisions[i] # Compute the integral of precision(recall) d_recall from recall=0->1 using fixed-length riemann summation with 101 bars. y_range = [0] * 101 # idx 0 is recall == 0.0 and idx 100 is recall == 1.00 x_range = np.array([x / 100 for x in range(101)]) recalls = np.array(recalls) # I realize this is weird, but all it does is find the nearest precision(x) for a given x in x_range. # Basically, if the closest recall we have to 0.01 is 0.009 this sets precision(0.01) = precision(0.009). # I approximate the integral this way, because that's how COCOEval does it. indices = np.searchsorted(recalls, x_range, side='left') for bar_idx, precision_idx in enumerate(indices): if precision_idx < len(precisions): y_range[bar_idx] = precisions[precision_idx] # Finally compute the riemann sum to get our integral. # avg([precision(x) for x in 0:0.01:1]) return sum(y_range) / len(y_range) def badhash(x): """ Just a quick and dirty hash function for doing a deterministic shuffle based on image_id. 
Source: https://stackoverflow.com/questions/664014/what-integer-hash-function-are-good-that-accepts-an-integer-hash-key """ x = (((x >> 16) ^ x) * 0x045d9f3b) & 0xFFFFFFFF x = (((x >> 16) ^ x) * 0x045d9f3b) & 0xFFFFFFFF x = ((x >> 16) ^ x) & 0xFFFFFFFF return x def evalimage(net:Yolact, path:str, save_path:str=None): frame = torch.from_numpy(cv2.imread(path)).cuda().float() batch = FastBaseTransform()(frame.unsqueeze(0)) preds = net(batch) img_numpy = prep_display(preds, frame, None, None, undo_transform=False) if save_path is None: img_numpy = img_numpy[:, :, (2, 1, 0)] if save_path is None: plt.imshow(img_numpy) plt.title(path) plt.show() else: cv2.imwrite(save_path, img_numpy) def evalimages(net:Yolact, input_folder:str, output_folder:str): if not os.path.exists(output_folder): os.mkdir(output_folder) print() for p in Path(input_folder).glob('*'): path = str(p) name = os.path.basename(path) name = '.'.join(name.split('.')[:-1]) + '.png' out_path = os.path.join(output_folder, name) evalimage(net, path, out_path) print(path + ' -> ' + out_path) print('Done.') from multiprocessing.pool import ThreadPool from queue import Queue class CustomDataParallel(torch.nn.DataParallel): """ A Custom Data Parallel class that properly gathers lists of dictionaries. """ def gather(self, outputs, output_device): # Note that I don't actually want to convert everything to the output_device return sum(outputs, []) def evalvideo(net:Yolact, path:str, out_path:str=None): # If the path is a digit, parse it as a webcam index is_webcam = path.isdigit() # If the input image size is constant, this make things faster (hence why we can use it in a video setting). cudnn.benchmark = True if is_webcam: vid = cv2.VideoCapture(int(path)) else: vid = cv2.VideoCapture(path) if not vid.isOpened(): print('Could not open video "%s"' % path) exit(-1) target_fps = round(vid.get(cv2.CAP_PROP_FPS)) frame_width = round(vid.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height = round(vid.get(cv2.CAP_PROP_FRAME_HEIGHT)) if is_webcam: num_frames = float('inf') else: num_frames = round(vid.get(cv2.CAP_PROP_FRAME_COUNT)) net = CustomDataParallel(net).cuda() transform = torch.nn.DataParallel(FastBaseTransform()).cuda() frame_times = MovingAverage(100) fps = 0 frame_time_target = 1 / target_fps running = True fps_str = '' vid_done = False frames_displayed = 0 if out_path is not None: out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*"mp4v"), target_fps, (frame_width, frame_height)) def cleanup_and_exit(): print() pool.terminate() vid.release() if out_path is not None: out.release() cv2.destroyAllWindows() exit() def get_next_frame(vid): frames = [] for idx in range(args.video_multiframe): frame = vid.read()[1] if frame is None: return frames frames.append(frame) return frames def transform_frame(frames): with torch.no_grad(): frames = [torch.from_numpy(frame).cuda().float() for frame in frames] return frames, transform(torch.stack(frames, 0)) def eval_network(inp): with torch.no_grad(): frames, imgs = inp num_extra = 0 while imgs.size(0) < args.video_multiframe: imgs = torch.cat([imgs, imgs[0].unsqueeze(0)], dim=0) num_extra += 1 out = net(imgs) if num_extra > 0: out = out[:-num_extra] return frames, out def prep_frame(inp, fps_str): with torch.no_grad(): frame, preds = inp return prep_display(preds, frame, None, None, undo_transform=False, class_color=True, fps_str=fps_str) frame_buffer = Queue() video_fps = 0 # All this timing code to make sure that def play_video(): try: nonlocal frame_buffer, 
running, video_fps, is_webcam, num_frames, frames_displayed, vid_done video_frame_times = MovingAverage(100) frame_time_stabilizer = frame_time_target last_time = None stabilizer_step = 0.0005 progress_bar = ProgressBar(30, num_frames) while running: frame_time_start = time.time() if not frame_buffer.empty(): next_time = time.time() if last_time is not None: video_frame_times.add(next_time - last_time) video_fps = 1 / video_frame_times.get_avg() if out_path is None: cv2.imshow(path, frame_buffer.get()) else: out.write(frame_buffer.get()) frames_displayed += 1 last_time = next_time if out_path is not None: if video_frame_times.get_avg() == 0: fps = 0 else: fps = 1 / video_frame_times.get_avg() progress = frames_displayed / num_frames * 100 progress_bar.set_val(frames_displayed) print('\rProcessing Frames %s %6d / %6d (%5.2f%%) %5.2f fps ' % (repr(progress_bar), frames_displayed, num_frames, progress, fps), end='') # This is split because you don't want savevideo to require cv2 display functionality (see #197) if out_path is None and cv2.waitKey(1) == 27: # Press Escape to close running = False if not (frames_displayed < num_frames): running = False if not vid_done: buffer_size = frame_buffer.qsize() if buffer_size < args.video_multiframe: frame_time_stabilizer += stabilizer_step elif buffer_size > args.video_multiframe: frame_time_stabilizer -= stabilizer_step if frame_time_stabilizer < 0: frame_time_stabilizer = 0 new_target = frame_time_stabilizer if is_webcam else max(frame_time_stabilizer, frame_time_target) else: new_target = frame_time_target next_frame_target = max(2 * new_target - video_frame_times.get_avg(), 0) target_time = frame_time_start + next_frame_target - 0.001 # Let's just subtract a millisecond to be safe if out_path is None or args.emulate_playback: # This gives more accurate timing than if sleeping the whole amount at once while time.time() < target_time: time.sleep(0.001) else: # Let's not starve the main thread, now time.sleep(0.001) except: # See issue #197 for why this is necessary import traceback traceback.print_exc() extract_frame = lambda x, i: (x[0][i] if x[1][i]['detection'] is None else x[0][i].to(x[1][i]['detection']['box'].device), [x[1][i]]) # Prime the network on the first frame because I do some thread unsafe things otherwise print('Initializing model... 
', end='') first_batch = eval_network(transform_frame(get_next_frame(vid))) print('Done.') # For each frame the sequence of functions it needs to go through to be processed (in reversed order) sequence = [prep_frame, eval_network, transform_frame] pool = ThreadPool(processes=len(sequence) + args.video_multiframe + 2) pool.apply_async(play_video) active_frames = [{'value': extract_frame(first_batch, i), 'idx': 0} for i in range(len(first_batch[0]))] print() if out_path is None: print('Press Escape to close.') try: while vid.isOpened() and running: # Hard limit on frames in buffer so we don't run out of memory >.> while frame_buffer.qsize() > 100: time.sleep(0.001) start_time = time.time() # Start loading the next frames from the disk if not vid_done: next_frames = pool.apply_async(get_next_frame, args=(vid,)) else: next_frames = None if not (vid_done and len(active_frames) == 0): # For each frame in our active processing queue, dispatch a job # for that frame using the current function in the sequence for frame in active_frames: _args = [frame['value']] if frame['idx'] == 0: _args.append(fps_str) frame['value'] = pool.apply_async(sequence[frame['idx']], args=_args) # For each frame whose job was the last in the sequence (i.e. for all final outputs) for frame in active_frames: if frame['idx'] == 0: frame_buffer.put(frame['value'].get()) # Remove the finished frames from the processing queue active_frames = [x for x in active_frames if x['idx'] > 0] # Finish evaluating every frame in the processing queue and advanced their position in the sequence for frame in list(reversed(active_frames)): frame['value'] = frame['value'].get() frame['idx'] -= 1 if frame['idx'] == 0: # Split this up into individual threads for prep_frame since it doesn't support batch size active_frames += [{'value': extract_frame(frame['value'], i), 'idx': 0} for i in range(1, len(frame['value'][0]))] frame['value'] = extract_frame(frame['value'], 0) # Finish loading in the next frames and add them to the processing queue if next_frames is not None: frames = next_frames.get() if len(frames) == 0: vid_done = True else: active_frames.append({'value': frames, 'idx': len(sequence)-1}) # Compute FPS frame_times.add(time.time() - start_time) fps = args.video_multiframe / frame_times.get_avg() else: fps = 0 fps_str = 'Processing FPS: %.2f | Video Playback FPS: %.2f | Frames in Buffer: %d' % (fps, video_fps, frame_buffer.qsize()) if not args.display_fps: print('\r' + fps_str + ' ', end='') except KeyboardInterrupt: print('\nStopping...') cleanup_and_exit() def evaluate(net:Yolact, dataset, train_mode=False): net.detect.use_fast_nms = args.fast_nms net.detect.use_cross_class_nms = args.cross_class_nms cfg.mask_proto_debug = args.mask_proto_debug # TODO Currently we do not support Fast Mask Re-scroing in evalimage, evalimages, and evalvideo if args.image is not None: if ':' in args.image: inp, out = args.image.split(':') evalimage(net, inp, out) else: evalimage(net, args.image) return elif args.images is not None: inp, out = args.images.split('E:/yolact-master/coco/images/train2017: E:/yolact-master/results/output') evalimages(net, inp, out) return elif args.video is not None: if ':' in args.video: inp, out = args.video.split(':') evalvideo(net, inp, out) else: evalvideo(net, args.video) return frame_times = MovingAverage() dataset_size = len(dataset) if args.max_images < 0 else min(args.max_images, len(dataset)) progress_bar = ProgressBar(30, dataset_size) print() if not args.display and not args.benchmark: # For each class and iou, 
stores tuples (score, isPositive) # Index ap_data[type][iouIdx][classIdx] ap_data = { 'box' : [[APDataObject() for _ in cfg.dataset.class_names] for _ in iou_thresholds], 'mask': [[APDataObject() for _ in cfg.dataset.class_names] for _ in iou_thresholds] } detections = Detections() else: timer.disable('Load Data') dataset_indices = list(range(len(dataset))) if args.shuffle: random.shuffle(dataset_indices) elif not args.no_sort: # Do a deterministic shuffle based on the image ids # # I do this because on python 3.5 dictionary key order is *random*, while in 3.6 it's # the order of insertion. That means on python 3.6, the images come in the order they are in # in the annotations file. For some reason, the first images in the annotations file are # the hardest. To combat this, I use a hard-coded hash function based on the image ids # to shuffle the indices we use. That way, no matter what python version or how pycocotools # handles the data, we get the same result every time. hashed = [badhash(x) for x in dataset.ids] dataset_indices.sort(key=lambda x: hashed[x]) dataset_indices = dataset_indices[:dataset_size] try: # Main eval loop for it, image_idx in enumerate(dataset_indices): timer.reset() with timer.env('Load Data'): img, gt, gt_masks, h, w, num_crowd = dataset.pull_item(image_idx) # Test flag, do not upvote if cfg.mask_proto_debug: with open('scripts/info.txt', 'w') as f: f.write(str(dataset.ids[image_idx])) np.save('scripts/gt.npy', gt_masks) batch = Variable(img.unsqueeze(0)) if args.cuda: batch = batch.cuda() with timer.env('Network Extra'): preds = net(batch) # Perform the meat of the operation here depending on our mode. if args.display: img_numpy = prep_display(preds, img, h, w) elif args.benchmark: prep_benchmark(preds, h, w) else: prep_metrics(ap_data, preds, img, gt, gt_masks, h, w, num_crowd, dataset.ids[image_idx], detections) # First couple of images take longer because we're constructing the graph. # Since that's technically initialization, don't include those in the FPS calculations. 
if it > 1: frame_times.add(timer.total_time()) if args.display: if it > 1: print('Avg FPS: %.4f' % (1 / frame_times.get_avg())) plt.imshow(img_numpy) plt.title(str(dataset.ids[image_idx])) plt.show() elif not args.no_bar: if it > 1: fps = 1 / frame_times.get_avg() else: fps = 0 progress = (it+1) / dataset_size * 100 progress_bar.set_val(it+1) print('\rProcessing Images %s %6d / %6d (%5.2f%%) %5.2f fps ' % (repr(progress_bar), it+1, dataset_size, progress, fps), end='') if not args.display and not args.benchmark: print() if args.output_coco_json: print('Dumping detections...') if args.output_web_json: detections.dump_web() else: detections.dump() else: if not train_mode: print('Saving data...') with open(args.ap_data_file, 'wb') as f: pickle.dump(ap_data, f) return calc_map(ap_data) elif args.benchmark: print() print() print('Stats for the last frame:') timer.print_stats() avg_seconds = frame_times.get_avg() print('Average: %5.2f fps, %5.2f ms' % (1 / frame_times.get_avg(), 1000*avg_seconds)) except KeyboardInterrupt: print('Stopping...') def calc_map(ap_data): print('Calculating mAP...') aps = [{'box': [], 'mask': []} for _ in iou_thresholds] for _class in range(len(cfg.dataset.class_names)): for iou_idx in range(len(iou_thresholds)): for iou_type in ('box', 'mask'): ap_obj = ap_data[iou_type][iou_idx][_class] if not ap_obj.is_empty(): aps[iou_idx][iou_type].append(ap_obj.get_ap()) all_maps = {'box': OrderedDict(), 'mask': OrderedDict()} # Looking back at it, this code is really hard to read :/ for iou_type in ('box', 'mask'): all_maps[iou_type]['all'] = 0 # Make this first in the ordereddict for i, threshold in enumerate(iou_thresholds): mAP = sum(aps[i][iou_type]) / len(aps[i][iou_type]) * 100 if len(aps[i][iou_type]) > 0 else 0 all_maps[iou_type][int(threshold*100)] = mAP all_maps[iou_type]['all'] = (sum(all_maps[iou_type].values()) / (len(all_maps[iou_type].values())-1)) print_maps(all_maps) # Put in a prettier format so we can serialize it to json during training all_maps = {k: {j: round(u, 2) for j, u in v.items()} for k, v in all_maps.items()} return all_maps def print_maps(all_maps): # Warning: hacky make_row = lambda vals: (' %5s |' * len(vals)) % tuple(vals) make_sep = lambda n: ('-------+' * n) print() print(make_row([''] + [('.%d ' % x if isinstance(x, int) else x + ' ') for x in all_maps['box'].keys()])) print(make_sep(len(all_maps['box']) + 1)) for iou_type in ('box', 'mask'): print(make_row([iou_type] + ['%.2f' % x if x < 100 else '%.1f' % x for x in all_maps[iou_type].values()])) print(make_sep(len(all_maps['box']) + 1)) print() if __name__ == '__main__': parse_args() if args.config is not None: set_cfg(args.config) if args.trained_model == 'interrupt': args.trained_model = SavePath.get_interrupt('weights/') elif args.trained_model == 'latest': args.trained_model = SavePath.get_latest('weights/', cfg.name) if args.config is None: model_path = SavePath.from_str(args.trained_model) # TODO: Bad practice? Probably want to do a name lookup instead. args.config = model_path.model_name + '_config' print('Config not specified. 
Parsed %s from the file name.\n' % args.config)
        set_cfg(args.config)

    if args.detect:
        cfg.eval_mask_branch = False

    if args.dataset is not None:
        set_dataset(args.dataset)

    with torch.no_grad():
        if not os.path.exists('results'):
            os.makedirs('results')

        if args.cuda:
            cudnn.fastest = True
            torch.set_default_tensor_type('torch.cuda.FloatTensor')
        else:
            torch.set_default_tensor_type('torch.FloatTensor')

        if args.resume and not args.display:
            with open(args.ap_data_file, 'rb') as f:
                ap_data = pickle.load(f)
            calc_map(ap_data)
            exit()

        if args.image is None and args.video is None and args.images is None:
            dataset = COCODetection(cfg.dataset.valid_images, cfg.dataset.valid_info,
                                    transform=BaseTransform(), has_gt=cfg.dataset.has_gt)
            prep_coco_cats()
        else:
            dataset = None

        print('Loading model...', end='')
        net = Yolact()
        net.load_weights(args.trained_model)
        net.eval()
        print(' Done.')

        if args.cuda:
            net = net.cuda()

        evaluate(net, dataset)

Traceback (most recent call last):
  File "eval.py", line 1105, in <module>
    evaluate(net, dataset)
  File "eval.py", line 884, in evaluate
    inp, out = args.images.split('E:/yolact-master/coco/images/train2017: E:/yolact-master/results/output')
ValueError: not enough values to unpack (expected 2, got 1)
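The unpack error comes from the edited branch in `evaluate()`: `split()` is being given the entire "input: output" string as the separator, so it never matches and only one value comes back. The upstream YOLACT code splits on `':'` instead, but that also misparses Windows paths such as `E:/...`, because the drive letter already contains a colon. Below is a hedged sketch of one workaround; the `--output_folder` argument is hypothetical and not part of the original script.

```python
# Sketch of one possible fix, assuming the stock YOLACT eval.py layout.
# "--output_folder" is a hypothetical new argument, not part of the original script.

# 1) In parse_args(): keep --images as the input folder only, and add an output argument.
parser.add_argument('--output_folder', default='E:/yolact-master/results/output', type=str,
                    help='Folder that evalimages() writes the rendered images into.')

# 2) In evaluate(): pass the two folders separately instead of splitting one string,
#    so Windows drive letters such as "E:" no longer break the parsing.
if args.images is not None:
    evalimages(net, args.images, args.output_folder)
    return
```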

PowerShell 7 环境已加载 (版本: 7.5.2) PS C:\Users\Administrator\Desktop> cd E:\AI_System PS E:\AI_System> venv\Scripts\activate (venv) PS E:\AI_System> pip install transformers diskcache python-dotenv prettytable huggingface_hub Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple Requirement already satisfied: transformers in e:\ai_system\venv\lib\site-packages (4.37.0) Requirement already satisfied: diskcache in e:\ai_system\venv\lib\site-packages (5.6.3) Requirement already satisfied: python-dotenv in e:\ai_system\venv\lib\site-packages (1.0.1) Requirement already satisfied: prettytable in e:\ai_system\venv\lib\site-packages (3.16.0) Requirement already satisfied: huggingface_hub in e:\ai_system\venv\lib\site-packages (0.21.3) Requirement already satisfied: tokenizers<0.19,>=0.14 in e:\ai_system\venv\lib\site-packages (from transformers) (0.15.2) Requirement already satisfied: tqdm>=4.27 in e:\ai_system\venv\lib\site-packages (from transformers) (4.67.1) Requirement already satisfied: requests in e:\ai_system\venv\lib\site-packages (from transformers) (2.31.0) Requirement already satisfied: numpy>=1.17 in e:\ai_system\venv\lib\site-packages (from transformers) (1.26.3) Requirement already satisfied: safetensors>=0.3.1 in e:\ai_system\venv\lib\site-packages (from transformers) (0.4.2) Requirement already satisfied: filelock in e:\ai_system\venv\lib\site-packages (from transformers) (3.19.1) Requirement already satisfied: regex!=2019.12.17 in e:\ai_system\venv\lib\site-packages (from transformers) (2025.7.34) Requirement already satisfied: pyyaml>=5.1 in e:\ai_system\venv\lib\site-packages (from transformers) (6.0.2) Requirement already satisfied: packaging>=20.0 in e:\ai_system\venv\lib\site-packages (from transformers) (25.0) Requirement already satisfied: wcwidth in e:\ai_system\venv\lib\site-packages (from prettytable) (0.2.13) Requirement already satisfied: fsspec>=2023.5.0 in e:\ai_system\venv\lib\site-packages (from huggingface_hub) (2025.7.0) Requirement already satisfied: typing-extensions>=3.7.4.3 in e:\ai_system\venv\lib\site-packages (from huggingface_hub) (4.14.1) Requirement already satisfied: colorama in e:\ai_system\venv\lib\site-packages (from tqdm>=4.27->transformers) (0.4.6) Requirement already satisfied: certifi>=2017.4.17 in e:\ai_system\venv\lib\site-packages (from requests->transformers) (2025.8.3) Requirement already satisfied: urllib3<3,>=1.21.1 in e:\ai_system\venv\lib\site-packages (from requests->transformers) (2.5.0) Requirement already satisfied: idna<4,>=2.5 in e:\ai_system\venv\lib\site-packages (from requests->transformers) (3.10) Requirement already satisfied: charset-normalizer<4,>=2 in e:\ai_system\venv\lib\site-packages (from requests->transformers) (3.4.3) [notice] A new release of pip available: 22.3.1 -> 25.2 [notice] To update, run: python.exe -m pip install --upgrade pip (venv) PS E:\AI_System> cd E:\AI_System (venv) PS E:\AI_System> venv\Scripts\activate # 激活虚拟环境 (venv) PS E:\AI_System> python main.py # 启动系统 2025-08-27 23:15:12,671 - CoreConfig - INFO - 📂 从 E:\AI_System\config\default.json 加载配置: { "LOG_DIR": "E:/AI_System/logs", "CONFIG_DIR": "E:/AI_System/config", "MODEL_CACHE_DIR": "E:/AI_System/model_cache", "AGENT_NAME": "\u5c0f\u84dd", "DEFAULT_USER": "\u7ba1\u7406\u5458", "MAX_WORKERS": 4, "AGENT_RESPONSE_TIMEOUT": 30.0, "MODEL_BASE_PATH": "E:/AI_Models", "MODEL_PATHS": { "TEXT_BASE": "E:/AI_Models/Qwen2-7B", "TEXT_CHAT": "E:/AI_Models/deepseek-7b-chat", "MULTIMODAL": "E:/AI_Models/deepseek-vl2", 
"IMAGE_GEN": "E:/AI_Models/sdxl", "YI_VL": "E:/AI_Models/yi-vl", "STABLE_DIFFUSION": "E:/AI_Models/stable-diffusion-xl-base-1.0" }, "NETWORK": { "HOST": "0.0.0.0", "FLASK_PORT": 8000, "GRADIO_PORT": 7860 }, "DATABASE": { "DB_HOST": "localhost", "DB_PORT": 5432, "DB_NAME": "ai_system", "DB_USER": "ai_user", "DB_PASSWORD": "secure_password_here" }, "SECURITY": { "SECRET_KEY": "generated-secret-key-here" }, "ENVIRONMENT": { "ENV": "dev", "LOG_LEVEL": "DEBUG", "USE_GPU": true }, "DIRECTORIES": { "DEFAULT_MODEL": "E:/AI_Models/Qwen2-7B", "WEB_UI_DIR": "E:/AI_System/web_ui", "AGENT_DIR": "E:/AI_System/agent", "PROJECT_ROOT": "E:/AI_System" } } 2025-08-27 23:15:12,672 - CoreConfig - INFO - 📂 从 E:\AI_System\config\local.json 加载配置: {} 2025-08-27 23:15:12,673 - CoreConfig - INFO - 🌐 从 E:\AI_System\.env 加载环境变量 2025-08-27 23:15:12,673 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_ROOT=E:\AI_System 2025-08-27 23:15:12,673 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_DIRECTORIES__PROJECT_ROOT=E:\AI_System 2025-08-27 23:15:12,673 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_DIRECTORIES__AGENT_DIR=E:\AI_System\agent 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_DIRECTORIES__WEB_UI_DIR=E:\AI_System\web_ui 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_DIRECTORIES__DEFAULT_MODEL=E:\AI_Models\Qwen2-7B 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_ENVIRONMENT__LOG_LEVEL=DEBUG 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_DATABASE__DB_HOST=localhost 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_DATABASE__DB_PORT=5432 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_DATABASE__DB_NAME=ai_system 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_DATABASE__DB_USER=ai_user 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_DATABASE__DB_PASSWORD=****** 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_SECURITY__SECRET_KEY=****** 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_MODEL_PATHS__TEXT_BASE=E:\AI_Models\Qwen2-7B 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_MODEL_PATHS__TEXT_CHAT=E:\AI_Models\deepseek-7b-chat 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_MODEL_PATHS__MULTIMODAL=E:\AI_Models\deepseek-vl2 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_MODEL_PATHS__IMAGE_GEN=E:\AI_Models\sdxl 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_MODEL_PATHS__YI_VL=E:\AI_Models\yi-vl 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_MODEL_PATHS__STABLE_DIFFUSION=E:\AI_Models\stable-diffusion-xl-base-1.0 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_NETWORK__HOST=0.0.0.0 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_NETWORK__FLASK_PORT=8000 2025-08-27 23:15:12,674 - CoreConfig - INFO - 🔄 环境变量覆盖: AI_SYSTEM_NETWORK__GRADIO_PORT=7860 2025-08-27 23:15:12,674 - CoreConfig - WARNING - ⚠️ 模型路径不存在: STABLE_DIFFUSION = E:/AI_Models/stable-diffusion-xl-base-1.0 2025-08-27 23:15:12,674 - CoreConfig - INFO - ✅ 配置系统初始化完成 2025-08-27 23:15:12,952 - Main - INFO - ================================================== 2025-08-27 23:15:12,952 - Main - INFO - 🚀 启动AI系统 2025-08-27 23:15:12,952 - Main - INFO - ================================================== 2025-08-27 23:15:12,952 - Main - INFO - 系统配置摘要: 2025-08-27 23:15:12,953 - Main - INFO - AGENT_NAME: 小蓝 2025-08-27 23:15:12,953 - Main - INFO - PROJECT_ROOT: E:\AI_System 2025-08-27 
23:15:12,953 - Main - INFO - LOG_DIR: E:/AI_System/logs
2025-08-27 23:15:12,953 - Main - INFO - AGENT_DIR: E:/AI_System/agent
2025-08-27 23:15:12,953 - Main - INFO - WEB_UI_DIR: E:/AI_System/web_ui
2025-08-27 23:15:12,953 - Main - INFO - HOST: 0.0.0.0
2025-08-27 23:15:12,953 - Main - INFO - FLASK_PORT: 8000
2025-08-27 23:15:12,953 - Main - INFO - TEXT_BASE_MODEL: E:/AI_Models/Qwen2-7B
2025-08-27 23:15:12,953 - Main - INFO - USE_GPU: True
2025-08-27 23:15:12,953 - Main - INFO - LOG_LEVEL: DEBUG
2025-08-27 23:15:12,953 - ModelManager - INFO - 📦 初始化模型管理器 | 设备: cuda | 模型目录: E:/AI_Models | 缓存目录: E:/AI_System/model_cache
2025-08-27 23:15:12,953 - ModelManager - INFO - 📦 初始化模型管理器 | 设备: cuda | 模型目录: E:/AI_Models | 缓存目录: E:/AI_System/model_cache
2025-08-27 23:15:12,953 - ModelManager - INFO - 🔄 正在加载核心语言模型: E:\AI_Models\Qwen2-7B
2025-08-27 23:15:12,953 - ModelManager - INFO - 🔄 正在加载核心语言模型: E:\AI_Models\Qwen2-7B
2025-08-27 23:15:12,954 - ModelLoader - INFO - 初始化模型加载器 | 路径: E:\AI_Models\Qwen2-7B | 设备: cuda
2025-08-27 23:15:12,954 - ModelLoader - INFO - 初始化模型加载器 | 路径: E:\AI_Models\Qwen2-7B | 设备: cuda
2025-08-27 23:15:12,954 - ModelLoader - INFO - 🔍 加载模型: E:\AI_Models\Qwen2-7B
2025-08-27 23:15:12,954 - ModelLoader - INFO - 🔍 加载模型: E:\AI_Models\Qwen2-7B
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
2025-08-27 23:15:13,135 - ModelLoader - INFO - ✅ 加载tokenizer成功
2025-08-27 23:15:13,135 - ModelLoader - INFO - ✅ 加载tokenizer成功
Loading checkpoint shards: 100%|█████████████████████████████████████████████████████████| 4/4 [00:07<00:00, 1.84s/it]
2025-08-27 23:15:21,688 - root - WARNING - Some parameters are on the meta device device because they were offloaded to the cpu and disk.
2025-08-27 23:15:21,688 - ModelLoader - INFO - ✅ 加载模型成功
2025-08-27 23:15:21,688 - ModelLoader - INFO - ✅ 加载模型成功
2025-08-27 23:15:21,689 - ModelManager - INFO - ✅ 使用ModelLoader加载认知模型成功: E:/AI_Models/Qwen2-7B
2025-08-27 23:15:21,689 - ModelManager - INFO - ✅ 使用ModelLoader加载认知模型成功: E:/AI_Models/Qwen2-7B
2025-08-27 23:15:21,689 - Main - INFO - ✅ 模型管理器初始化完成 | 设备: cuda | 默认模型: E:\AI_Models\Qwen2-7B
2025-08-27 23:15:21,689 - 核心认知系统 - INFO - ✅ 初始化认知模块: 核心认知系统
2025-08-27 23:15:21,689 - 核心认知系统 - INFO - ✅ 初始化认知模块: 核心认知系统
2025-08-27 23:15:21,689 - 核心认知系统 - INFO - ✅ 认知系统初始化完成
2025-08-27 23:15:21,689 - 核心认知系统 - INFO - ✅ 认知系统初始化完成
2025-08-27 23:15:21,689 - Main - INFO - ✅ 认知系统初始化完成
2025-08-27 23:15:21,689 - Main - ERROR - ❌ 环境接口初始化失败: 'EnvironmentInterface' object has no attribute 'config'
(venv) PS E:\AI_System>
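The model and cognitive system come up cleanly; only the last ERROR line is the actual failure, and the EnvironmentInterface class itself is not part of this excerpt. This AttributeError pattern usually means some method reads `self.config` before `__init__` has assigned it (or `__init__` returned early after an exception). A hedged sketch of the defensive pattern, with hypothetical names because the real class is not shown:

```python
# Hypothetical sketch only -- the actual EnvironmentInterface is not in this excerpt.
class EnvironmentInterface:
    def __init__(self, config=None):
        # Assign self.config before doing anything that can raise, so every
        # later method call sees a fully initialized object.
        self.config = config or {}
        self.sensors = self._init_sensors()

    def _init_sensors(self):
        # Safe: self.config is guaranteed to exist at this point.
        return self.config.get('sensors', [])
```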

E:\AI_System\web_ui>python server.py Traceback (most recent call last): File "E:\AI_System\web_ui\server.py", line 16, in <module> from flask_socketio import SocketIO, emit ModuleNotFoundError: No module named 'flask_socketio' # E:\AI_System\web_ui\server.py import sys import os import time import logging import json import traceback import threading import platform import psutil import importlib import datetime from pathlib import Path from flask import Flask, jsonify, request, render_template, send_from_directory from logging.handlers import TimedRotatingFileHandler from flask_socketio import SocketIO, emit from core.config import system_config as config # 导入必要的模块(假设这些模块存在) try: from environment.hardware_manager import create_hardware_manager from cognitive_arch.life_scheduler import LifeScheduler from agent.autonomous_agent import AutonomousAgent from environment.environment_manager import EnvironmentManager from environment.environment_state import EnvironmentState from environment.spatial_simulator import SpatialSimulator from environment.db_manager import DatabaseManager except ImportError as e: logging.warning(f"部分模块导入失败: {str(e)} - 使用模拟实现") # ========== 全局协调器 ========== coordinator = None def register_coordinator(coord): """注册意识系统协调器""" global coordinator coordinator = coord if coordinator and hasattr(coordinator, 'connect_to_ui'): coordinator.connect_to_ui(update_ui) def update_ui(event): """更新UI事件处理""" if 'socketio' in globals(): socketio.emit('system_event', event) # ========== 初始化日志系统 ========== def setup_logger(): """配置全局日志系统""" # 确保日志目录存在 config.LOG_DIR.mkdir(parents=True, exist_ok=True) # 创建主日志记录器 logger = logging.getLogger('WebServer') logger.setLevel(getattr(logging, config.LOG_LEVEL.upper(), logging.INFO)) # 日志格式 log_formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # 文件日志处理器 (每天轮换,保留30天) file_handler = TimedRotatingFileHandler( config.LOG_DIR / 'web_server.log', when='midnight', backupCount=30, encoding='utf-8' ) file_handler.setFormatter(log_formatter) logger.addHandler(file_handler) # 控制台日志处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(log_formatter) logger.addHandler(console_handler) # 安全日志处理装饰器 def safe_logger(func): def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except UnicodeEncodeError: new_args = [] for arg in args: if isinstance(arg, str): new_args.append(arg.encode('ascii', 'ignore').decode('ascii')) else: new_args.append(arg) return func(*new_args, **kwargs) return wrapper # 应用安全日志处理 for level in ['debug', 'info', 'warning', 'error', 'critical']: setattr(logger, level, safe_logger(getattr(logger, level))) return logger # 初始化日志 logger = setup_logger() # ========== 系统初始化 ========== class SystemInitializer: """负责初始化系统核心组件""" def __init__(self): self.base_dir = Path(__file__).resolve().parent.parent self.ai_core = None self.hardware_manager = None self.life_scheduler = None self.ai_agent = None self.start_time = time.time() self.environment_manager = None def initialize_system_paths(self): """初始化系统路径""" sys.path.insert(0, str(self.base_dir)) logger.info(f"项目根目录: {self.base_dir}") sub_dirs = ['agent', 'core', 'utils', 'config', 'cognitive_arch', 'environment'] for sub_dir in sub_dirs: full_path = self.base_dir / sub_dir if full_path.exists(): sys.path.insert(0, str(full_path)) logger.info(f"添加路径: {full_path}") else: logger.warning(f"目录不存在: {full_path}") def initialize_environment_manager(self): """初始化环境管理器""" try: env_config = { 'update_interval': 1.0, 
'spatial': {'grid_size': 1.0} } self.environment_manager = EnvironmentManager(env_config) self.environment_manager.start() logger.info("✅ 环境管理器初始化成功") return self.environment_manager except Exception as e: logger.error(f"❌ 环境管理器初始化失败: {str(e)}") logger.warning("⚠️ 环境交互功能将不可用") return None def initialize_ai_core(self): """初始化AI核心系统""" class AICore: def __init__(self, base_dir): self.base_dir = Path(base_dir) self.genetic_code = self.load_genetic_code() self.physical_state = { "health": 100, "energy": 100, "mood": "calm" } self.dependencies = self.scan_dependencies() def load_genetic_code(self): code_path = self.base_dir / "core" / "genetic_code.py" try: with open(code_path, "r", encoding="utf-8") as f: return f.read() except FileNotFoundError: return "# 默认遗传代码\nclass AICore:\n pass" def scan_dependencies(self): return { "nervous_system": "flask", "memory": "sqlite", "perception": "opencv", "reasoning": "transformers" } def mutate(self, new_code): try: code_path = self.base_dir / "core" / "genetic_code.py" with open(code_path, "w", encoding="utf-8") as f: f.write(new_code) importlib.reload(sys.modules['core.genetic_code']) self.genetic_code = new_code return True, "核心代码更新成功,系统已进化" except Exception as e: return False, f"进化失败: {str(e)}" def wear_dependency(self, dependency_name, version): self.dependencies[dependency_name] = version return f"已穿戴 {dependency_name}@{version}" def get_state(self): return { "genetic_code_hash": hash(self.genetic_code), "dependencies": self.dependencies, "physical_state": self.physical_state, "hardware_environment": self.get_hardware_info() } def get_hardware_info(self): return { "cpu": platform.processor(), "gpu": "NVIDIA RTX 3090" if psutil.virtual_memory().total > 16 * 1024 ** 3 else "Integrated", "memory_gb": round(psutil.virtual_memory().total / (1024 ** 3), 1), "storage_gb": round(psutil.disk_usage('/').total / (1024 ** 3), 1) } self.ai_core = AICore(self.base_dir) logger.info("✅ AI核心系统初始化完成") return self.ai_core def initialize_hardware_manager(self): """初始化硬件管理器""" try: config_path = config.CONFIG_DIR / "hardware_config.json" self.hardware_manager = create_hardware_manager(config_path) logger.info("✅ 硬件管理器初始化成功") return self.hardware_manager except Exception as e: logger.error(f"❌ 硬件管理器初始化失败: {str(e)}") logger.warning("⚠️ 使用内置简单硬件管理器") class SimpleHardwareManager: def __init__(self): self.available_hardware = { "cpu": ["Intel i9-13900K", "AMD Ryzen 9 7950X", "Apple M2 Max"], "gpu": ["NVIDIA RTX 4090", "AMD Radeon RX 7900 XTX", "Apple M2 GPU"], "memory": [16, 32, 64, 128], "storage": ["1TB SSD", "2TB SSD", "4TB SSD", "8TB HDD"], "peripherals": ["4K Camera", "3D Scanner", "High-Fidelity Microphone"] } self.current_setup = { "cpu": platform.processor(), "gpu": "Integrated Graphics", "memory": round(psutil.virtual_memory().total / (1024 ** 3), 1), "storage": round(psutil.disk_usage('/').total / (1024 ** 3), 1) } def request_hardware(self, hardware_type, specification): if hardware_type not in self.available_hardware: return False, f"不支持硬件类型: {hardware_type}" if specification not in self.available_hardware[hardware_type]: return False, f"不支持的规格: {specification}" self.current_setup[hardware_type] = specification return True, f"已请求 {hardware_type}: {specification}。请管理员完成安装。" def get_current_setup(self): return self.current_setup def get_performance_metrics(self): return { "cpu_usage": psutil.cpu_percent(), "memory_usage": psutil.virtual_memory().percent, "disk_usage": psutil.disk_usage('/').percent, "cpu_temp": 45.0, "gpu_temp": 55.0, "network_io": { "sent": 
psutil.net_io_counters().bytes_sent, "received": psutil.net_io_counters().bytes_recv }, "last_updated": datetime.datetime.now().isoformat() } self.hardware_manager = SimpleHardwareManager() return self.hardware_manager def initialize_life_scheduler(self): """初始化生活调度器""" try: config.MODEL_CACHE_DIR.mkdir(parents=True, exist_ok=True) self.life_scheduler = LifeScheduler() logger.info("✅ 生活调度器初始化成功") life_thread = threading.Thread( target=self._update_life_status, daemon=True, name="LifeSystemThread" ) life_thread.start() logger.info("✅ 生活系统后台线程已启动") return self.life_scheduler except Exception as e: logger.error(f"❌ 生活调度器初始化失败: {str(e)}") return None def _update_life_status(self): logger.info("🚦 生活系统后台线程启动") while True: try: now = datetime.datetime.now() current_hour = now.hour if 8 <= current_hour < 9: self.life_scheduler.wake_up() elif 12 <= current_hour < 13: self.life_scheduler.have_meal("lunch") elif 19 <= current_hour < 20: self.life_scheduler.have_meal("dinner") elif 23 <= current_hour or current_hour < 6: self.life_scheduler.go_to_sleep() self.life_scheduler.log_activity("系统状态更新") time.sleep(60) except Exception as e: logger.error(f"生活系统更新失败: {str(e)}", exc_info=True) time.sleep(300) def initialize_ai_agent(self): """初始化AI智能体""" try: config.MODEL_CACHE_DIR.mkdir(parents=True, exist_ok=True) self.ai_agent = AutonomousAgent( model_path=config.AGENT_PATH, cache_dir=config.MODEL_CACHE_DIR, use_gpu=config.USE_GPU, default_model=config.DEFAULT_MODEL ) logger.info("✅ AI智能体初始化成功") return self.ai_agent except Exception as e: logger.error(f"❌ AI智能体初始化失败: {str(e)}") return None def start_evolution_monitor(self): def monitor(): while True: try: cpu_usage = psutil.cpu_percent() mem_usage = psutil.virtual_memory().percent if cpu_usage > 80: self.ai_core.physical_state["energy"] = max(20, self.ai_core.physical_state["energy"] - 5) self.ai_core.physical_state["mood"] = "strained" elif cpu_usage < 30: self.ai_core.physical_state["energy"] = min(100, self.ai_core.physical_state["energy"] + 2) if self.hardware_manager: current_hw = self.ai_core.get_hardware_info() requested_hw = self.hardware_manager.get_current_setup() for hw_type, spec in requested_hw.items(): if current_hw.get(hw_type) != spec: if hw_type == "cpu": self.ai_core.physical_state["health"] = min(100, self.ai_core.physical_state[ "health"] + 10) elif hw_type == "gpu": self.ai_core.physical_state["energy"] = min(100, self.ai_core.physical_state[ "energy"] + 20) time.sleep(60) except Exception as e: logging.error(f"进化监控错误: {str(e)}") time.sleep(300) monitor_thread = threading.Thread(target=monitor, daemon=True) monitor_thread.start() logger.info("✅ 进化监控线程已启动") def initialize_all(self): logger.info("=" * 50) logger.info("🚀 开始初始化AI系统") logger.info("=" * 50) self.initialize_system_paths() self.initialize_ai_core() self.initialize_hardware_manager() self.initialize_life_scheduler() self.initialize_ai_agent() self.initialize_environment_manager() self.start_evolution_monitor() logger.info("✅ 所有系统组件初始化完成") return { "ai_core": self.ai_core, "hardware_manager": self.hardware_manager, "life_scheduler": self.life_scheduler, "ai_agent": self.ai_agent, "environment_manager": self.environment_manager } # ========== Flask应用工厂 ========== def create_app(): app = Flask( __name__, template_folder='templates', static_folder='static', static_url_path='/static' ) app.secret_key = config.SECRET_KEY system_initializer = SystemInitializer() components = system_initializer.initialize_all() app.config['SYSTEM_COMPONENTS'] = components app.config['START_TIME'] = 
system_initializer.start_time app.config['BASE_DIR'] = system_initializer.base_dir # 初始化SocketIO app.config['SOCKETIO'] = SocketIO(app, cors_allowed_origins="*", async_mode='threading') # 注册路由 register_routes(app) register_error_handlers(app) return app # ========== 环境交互路由 ========== def register_environment_routes(app): @app.route('/environment') def environment_view(): return render_template('environment_view.html') @app.route('/api/environment/state', methods=['GET']) def get_environment_state(): env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager') if not env_manager: return jsonify({"success": False, "error": "环境管理器未初始化"}), 503 try: state = env_manager.get_state() return jsonify(state.to_dict()) except Exception as e: app.logger.error(f"获取环境状态失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/environment/action', methods=['POST']) def execute_environment_action(): env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager') if not env_manager: return jsonify({"success": False, "error": "环境管理器未初始化"}), 503 try: data = request.json action = data.get('action') params = data.get('params', {}) if not action: return jsonify({"success": False, "error": "缺少动作参数"}), 400 success = env_manager.execute_action(action, params) return jsonify({"success": success, "action": action}) except Exception as e: app.logger.error(f"执行环境动作失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 500 # ========== 环境状态广播 ========== def setup_environment_broadcast(app): socketio = app.config['SOCKETIO'] @socketio.on('connect', namespace='/environment') def handle_environment_connect(): app.logger.info('客户端已连接环境WebSocket') @socketio.on('disconnect', namespace='/environment') def handle_environment_disconnect(): app.logger.info('客户端已断开环境WebSocket') def broadcast_environment_state(): env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager') if not env_manager: return while True: try: state = env_manager.get_state() socketio.emit('environment_update', state.to_dict(), namespace='/environment') time.sleep(1) except Exception as e: app.logger.error(f"环境状态广播失败: {str(e)}") time.sleep(5) broadcast_thread = threading.Thread( target=broadcast_environment_state, daemon=True, name="EnvironmentBroadcastThread" ) broadcast_thread.start() app.logger.info("✅ 环境状态广播线程已启动") # ========== 路由注册 ========== def register_routes(app): register_environment_routes(app) setup_environment_broadcast(app) @app.route('/') def index(): return render_template('agent_interface.html') @app.route('/static/') def static_files(filename): try: return send_from_directory(app.static_folder, filename) except Exception as e: app.logger.error(f"静态文件服务失败: {filename} - {str(e)}") return jsonify({"error": "文件未找到"}), 404 @app.route('/status') def status(): try: components = app.config['SYSTEM_COMPONENTS'] status_data = { "server": { "status": "running", "uptime": time.time() - app.config['START_TIME'], "version": "1.0.0", "config": { "host": config.HOST, "port": config.PORT, "log_level": config.LOG_LEVEL, "default_model": config.DEFAULT_MODEL } }, "core": components['ai_core'].get_state(), "hardware": components['hardware_manager'].get_current_setup() } if components['environment_manager']: try: status_data["environment"] = components['environment_manager'].get_state().to_dict() except Exception as e: status_data["environment"] = {"error": str(e)} if components['life_scheduler']: try: status_data["life_system"] = 
components['life_scheduler'].get_current_state() except Exception as e: status_data["life_system"] = {"error": str(e)} if components['ai_agent']: try: status_data["agent"] = components['ai_agent'].get_status() except Exception as e: status_data["agent"] = {"error": str(e)} return jsonify(status_data) except Exception as e: app.logger.error(f"获取状态失败: {traceback.format_exc()}") return jsonify({"error": "内部错误", "details": str(e)}), 500 # 核心系统路由 @app.route('/api/core/state') def get_core_state(): return jsonify(app.config['SYSTEM_COMPONENTS']['ai_core'].get_state()) @app.route('/api/core/mutate', methods=['POST']) def mutate_core(): data = request.get_json() new_code = data.get('genetic_code') if not new_code: return jsonify({"success": False, "error": "缺少遗传代码"}), 400 success, message = app.config['SYSTEM_COMPONENTS']['ai_core'].mutate(new_code) return jsonify({"success": success, "message": message}) @app.route('/api/core/wear', methods=['POST']) def wear_dependency(): data = request.get_json() dep_name = data.get('dependency') version = data.get('version', 'latest') if not dep_name: return jsonify({"success": False, "error": "缺少依赖名称"}), 400 result = app.config['SYSTEM_COMPONENTS']['ai_core'].wear_dependency(dep_name, version) return jsonify({"success": True, "message": result}) # 硬件管理路由 @app.route('/api/hardware/catalog') def get_hardware_catalog(): return jsonify(app.config['SYSTEM_COMPONENTS']['hardware_manager'].available_hardware) @app.route('/api/hardware/request', methods=['POST']) def request_hardware(): data = request.get_json() hw_type = data.get('type') spec = data.get('specification') if not hw_type or not spec: return jsonify({"success": False, "error": "缺少硬件类型或规格"}), 400 success, message = app.config['SYSTEM_COMPONENTS']['hardware_manager'].request_hardware(hw_type, spec) return jsonify({"success": success, "message": message}) @app.route('/api/hardware/current') def get_current_hardware(): return jsonify(app.config['SYSTEM_COMPONENTS']['hardware_manager'].get_current_setup()) # 生活系统路由 @app.route('/life') def life_dashboard(): return render_template('life_dashboard.html') @app.route('/api/life/status') def get_life_status(): components = app.config['SYSTEM_COMPONENTS'] if not components['life_scheduler']: return jsonify({"success": False, "error": "生活系统未初始化"}), 503 try: current_state = components['life_scheduler'].get_current_state() recent_activities = components['life_scheduler'].get_recent_activities(10) return jsonify({ "success": True, "current_activity": current_state.get("current_activity", "未知"), "next_scheduled": current_state.get("next_scheduled", "未知"), "schedule": components['life_scheduler'].daily_schedule, "recent_activities": recent_activities, "energy_level": current_state.get("energy_level", 100), "mood": current_state.get("mood", "平静") }) except Exception as e: app.logger.error(f"获取生活状态失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/adjust_schedule', methods=['POST']) def adjust_schedule(): components = app.config['SYSTEM_COMPONENTS'] if not components['life_scheduler']: return jsonify({"success": False, "error": "生活系统未初始化"}), 503 try: data = request.json adjustments = data.get("adjustments", {}) valid_activities = ["wake_up", "breakfast", "lunch", "dinner", "sleep"] for activity, new_time in adjustments.items(): if activity not in valid_activities: return jsonify({"success": False, "error": f"无效的活动类型: {activity}"}), 400 if not isinstance(new_time, str) or len(new_time) != 5 or new_time[2] != ':': return 
jsonify({"success": False, "error": f"无效的时间格式: {new_time}"}), 400 components['life_scheduler'].adjust_schedule(adjustments) return jsonify({ "success": True, "message": "计划表已更新", "new_schedule": components['life_scheduler'].daily_schedule }) except Exception as e: app.logger.error(f"调整作息时间失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 400 # 聊天路由 @app.route('/chat', methods=['POST']) def chat(): components = app.config['SYSTEM_COMPONENTS'] if not components['ai_agent']: return jsonify({"error": "Agent未初始化"}), 503 try: data = request.get_json() user_input = data.get('message', '') user_id = data.get('user_id', 'default') if not user_input: return jsonify({"error": "消息内容不能为空"}), 400 app.logger.info(f"聊天请求: 用户={user_id}, 内容长度={len(user_input)}") response = components['ai_agent'].process_input(user_input, user_id) return jsonify({"response": response}) except Exception as e: app.logger.error(f"聊天处理失败: {traceback.format_exc()}") return jsonify({"error": "聊天处理失败", "details": str(e)}), 500 # 家具管理路由 furniture_cache = {} CACHE_DURATION = 3600 # 1小时 @app.route('/api/furniture') def get_furniture(): try: room = request.args.get('room', 'workshop') app.logger.info(f"获取家具数据: 房间={room}") current_time = time.time() if room in furniture_cache and current_time - furniture_cache[room]['timestamp'] < CACHE_DURATION: return jsonify(furniture_cache[room]['data']) furniture_data = { "workshop": [ {"type": "desk", "position": {"x": 0, "y": -1.5, "z": -3}, "rotation": {"x": 0, "y": 0, "z": 0}}, {"type": "chair", "position": {"x": 0, "y": -1.5, "z": -1}, "rotation": {"x": 0, "y": 0, "z": 0}}, {"type": "bookshelf", "position": {"x": 3, "y": 0, "z": -3}, "rotation": {"x": 0, "y": 0, "z": 0}}, {"type": "computer", "position": {"x": 0.5, "y": -0.5, "z": -3.2}, "rotation": {"x": 0, "y": 0, "z": 0}} ], "living_room": [ {"type": "sofa", "position": {"x": 0, "y": 0, "z": -2}, "rotation": {"x": 0, "y": 0, "z": 0}}, {"type": "tv", "position": {"x": 0, "y": 1.5, "z": -3}, "rotation": {"x": 0, "y": 0, "z": 0}} ], "bedroom": [ {"type": "bed", "position": {"x": 0, "y": 0, "z": -3}, "rotation": {"x": 0, "y": 0, "z": 0}}, {"type": "nightstand", "position": {"x": 1.5, "y": 0, "z": -2.5}, "rotation": {"x": 0, "y": 0, "z": 0}} ] } furniture_cache[room] = { 'timestamp': current_time, 'data': furniture_data.get(room, []) } return jsonify(furniture_cache[room]['data']) except Exception as e: app.logger.error(f"获取家具数据失败: {traceback.format_exc()}") return jsonify({"error": "内部错误", "details": str(e)}), 500 # ========== 错误处理器 ========== def register_error_handlers(app): @app.errorhandler(404) def page_not_found(error): app.logger.warning(f"404错误: {request.path}") return jsonify({ "error": "资源未找到", "path": request.path, "method": request.method }), 404 @app.errorhandler(500) def internal_server_error(error): app.logger.error(f"500错误: {str(error)}") return jsonify({ "error": "服务器内部错误", "message": "系统遇到意外错误,请稍后重试" }), 500 @app.errorhandler(Exception) def handle_general_exception(error): app.logger.error(f"未处理异常: {traceback.format_exc()}") return jsonify({ "error": "未处理的异常", "type": type(error).__name__, "message": str(error) }), 500 # ========== WebSocket处理 ========== def setup_websocket_handlers(socketio): @socketio.on('connect') def handle_connect(): logger.info('客户端已连接') socketio.emit('system_status', {'status': 'ready'}) @socketio.on('disconnect') def handle_disconnect(): logger.info('客户端已断开连接') @socketio.on('user_message') def handle_user_message(data): user_id = data.get('user_id', 'guest') message = 
data.get('message', '') logger.info(f"收到来自 {user_id} 的消息: {message}") # 处理消息逻辑 response = f"已收到您的消息: {message}" # 如果有协调器,使用协调器处理 global coordinator if coordinator: try: response = coordinator.process_message(message) except Exception as e: logger.error(f"协调器处理消息失败: {str(e)}") socketio.emit('agent_response', { 'user_id': user_id, 'response': response }) # ========== 主程序入口 ========== if __name__ == '__main__': try: app = create_app() socketio = app.config['SOCKETIO'] # 设置WebSocket处理器 setup_websocket_handlers(socketio) # 启动服务器 socketio.run( app, host=config.HOST, port=config.PORT, debug=config.DEBUG, use_reloader=False ) logger.info(f"服务器运行在 http://{config.HOST}:{config.PORT}") except KeyboardInterrupt: logger.info("服务器关闭") except Exception as e: logger.critical(f"服务器启动失败: {str(e)}") logger.error(traceback.format_exc())
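For a quick smoke test of the Flask routes defined above, a short client script can call `/status` and `/chat`. This is only a sketch: the host and port used here (127.0.0.1:8000) are assumptions, since the server actually binds to `config.HOST` / `config.PORT`.

```python
# Sketch of a manual smoke test for the routes above; host/port are assumptions
# (the server binds to config.HOST / config.PORT, which may differ).
import requests

BASE = "http://127.0.0.1:8000"

# /status returns a JSON snapshot of server, core and hardware state
status = requests.get(f"{BASE}/status", timeout=10)
print(status.status_code, status.json().get("server", {}).get("status"))

# /chat expects {"message": ..., "user_id": ...} and returns {"response": ...}
reply = requests.post(
    f"{BASE}/chat",
    json={"message": "你好,介绍一下你自己", "user_id": "tester"},
    timeout=60,
)
print(reply.json().get("response"))
```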

We want to deploy a Docker container on a 飞牛NAS that runs a WebSocket server, with the following requirements: 1. key-based authentication; 2. receive data and store it in an SQLite database; 3. expose API endpoints for querying real-time and historical data; 4. generate a history curve plot for a specified Tag.

## Solution

### 1. Project structure

Create a project directory `ws-server` laid out as follows:

```
ws-server/
├── Dockerfile
├── requirements.txt
└── app/
    ├── main.py      # main program: WebSocket and API services
    ├── auth.py      # key verification
    ├── database.py  # database operations
    └── plotter.py   # plotting
```

### 2. Implementation

#### 2.1 Key verification (auth.py)

```python
from fastapi import HTTPException, Header

SECRET_KEY = "your_secure_key_123"  # use an environment variable in real deployments

async def verify_token(authorization: str = Header(None)):
    if authorization is None or authorization != f"Bearer {SECRET_KEY}":
        raise HTTPException(status_code=403, detail="Invalid authentication token")
```

#### 2.2 Database operations (database.py)

```python
import sqlite3
import os

DB_PATH = os.getenv('DB_PATH', 'sensor_data.db')

def init_db():
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS sensor_data (
                     id INTEGER PRIMARY KEY AUTOINCREMENT,
                     timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                     tag_id INTEGER,
                     tag_name TEXT,
                     value REAL,
                     data_time TEXT)''')
    conn.commit()
    conn.close()

def insert_data(tag_id: int, tag_name: str, value: float, data_time: str):
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute("INSERT INTO sensor_data (tag_id, tag_name, value, data_time) VALUES (?, ?, ?, ?)",
              (tag_id, tag_name, value, data_time))
    conn.commit()
    conn.close()

def get_latest_data(tag_id: int, limit: int = 1):
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute("SELECT * FROM sensor_data WHERE tag_id = ? ORDER BY timestamp DESC LIMIT ?",
              (tag_id, limit))
    result = c.fetchall()
    conn.close()
    return result

def get_historical_data(tag_id: int, hours: int = 24):
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    # fetch the last `hours` hours of data
    c.execute("SELECT * FROM sensor_data WHERE tag_id = ? AND timestamp >= datetime('now', ?) ORDER BY timestamp",
              (tag_id, f'-{hours} hours'))
    result = c.fetchall()
    conn.close()
    return result
```

#### 2.3 Plotting (plotter.py)

```python
import matplotlib.pyplot as plt
from datetime import datetime
import os

from .database import get_historical_data

PLOT_DIR = 'plots'

def generate_plot(tag_id: int, hours: int = 24):
    # make sure the plot directory exists
    if not os.path.exists(PLOT_DIR):
        os.makedirs(PLOT_DIR)

    # fetch the data
    data = get_historical_data(tag_id, hours)
    if not data:
        return None

    timestamps = [row[1] for row in data]  # timestamp column
    values = [row[4] for row in data]      # value column

    # convert the timestamp strings into datetime objects
    timestamps = [datetime.strptime(ts, '%Y-%m-%d %H:%M:%S') for ts in timestamps]

    plt.figure(figsize=(10, 6))
    plt.plot(timestamps, values, 'b-')
    plt.title(f'Tag ID: {tag_id} - Last {hours} Hours')
    plt.xlabel('Time')
    plt.ylabel('Value')
    plt.grid(True)
    plt.xticks(rotation=45)
    plt.tight_layout()

    # save the figure
    plot_path = os.path.join(PLOT_DIR, f'tag_{tag_id}_{hours}h.png')
    plt.savefig(plot_path)
    plt.close()
    return plot_path
```

#### 2.4 Main program (main.py)

```python
import asyncio
import json

import websockets
from fastapi import FastAPI, APIRouter, Depends, HTTPException
from fastapi.responses import FileResponse

from . import auth, database, plotter

app = FastAPI()
api_router = APIRouter()

# initialise the database
database.init_db()

# WebSocket server
async def sensor_server(websocket, path):
    try:
        # step 1: receive the key and verify it
        token = await websocket.recv()
        if token != auth.SECRET_KEY:
            await websocket.close(code=1008, reason="Authentication failed")
            return
        # step 2: receive data
        async for message in websocket:
            try:
                data = json.loads(message)
                # payload format: {"TagId": 1, "TagName": "1#注浆A液压力", "Value": 0.76, "DataTime": "2025-6-18 2:28"}
                database.insert_data(
                    tag_id=data['TagId'],
                    tag_name=data['TagName'],
                    value=data['Value'],
                    data_time=data['DataTime']
                )
                print(f"Inserted data for tag {data['TagId']}")
            except Exception as e:
                print(f"Error processing message: {e}")
    except websockets.exceptions.ConnectionClosed as e:
        print(f"Client disconnected: {e}")

# API routes
@api_router.get("/data/latest/{tag_id}")
async def get_latest_data(tag_id: int, _: str = Depends(auth.verify_token)):
    data = database.get_latest_data(tag_id)
    if not data:
        raise HTTPException(status_code=404, detail="Data not found")
    return data[0]  # return the most recent row

@api_router.get("/data/history/{tag_id}")
async def get_history_data(tag_id: int, hours: int = 24, _: str = Depends(auth.verify_token)):
    data = database.get_historical_data(tag_id, hours)
    if not data:
        raise HTTPException(status_code=404, detail="Data not found")
    return data

@api_router.get("/plot/{tag_id}")
async def get_plot(tag_id: int, hours: int = 24, _: str = Depends(auth.verify_token)):
    plot_path = plotter.generate_plot(tag_id, hours)
    if plot_path is None:
        raise HTTPException(status_code=404, detail="No data available for plotting")
    return FileResponse(plot_path)

app.include_router(api_router, prefix="/api")

# start the WebSocket server (the FastAPI service is started separately with uvicorn, see the Dockerfile)
async def start_servers():
    await websockets.serve(sensor_server, "0.0.0.0", 8765)
    print("WebSocket server started on port 8765")
    await asyncio.Future()  # keep the server alive

if __name__ == "__main__":
    asyncio.run(start_servers())
```

### 3. Docker configuration

#### 3.1 Dockerfile

```dockerfile
FROM python:3.9-slim

WORKDIR /app

# install dependencies
RUN apt-get update && apt-get install -y libsqlite3-dev
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

# the secret key is passed in through an environment variable
ENV SECRET_KEY=your_actual_key_here

# start both the FastAPI service and the WebSocket server
CMD ["sh", "-c", "uvicorn app.main:app --host 0.0.0.0 --port 8000 & python -m app.main"]
```

#### 3.2 requirements.txt

```
fastapi
uvicorn
websockets
sqlalchemy
matplotlib
python-multipart
```

### 4. Deploying on the 飞牛NAS

1. **Build the image**: upload the whole `ws-server` directory to the NAS, enter it, and run:

```bash
docker build -t ws-sensor-server .
```

2. **Run the container**:

```bash
docker run -d --name sensor-server \
  -p 8765:8765 \
  -p 8000:8000 \
  -v /path/on/nas/data:/app/data \
  -e SECRET_KEY="your_actual_key_here" \
  ws-sensor-server
# 8765 = WebSocket port, 8000 = API port; the volume keeps the database and plots on the NAS
```

3. **Using the 飞牛NAS GUI**:
   - open the Docker management page of the NAS
   - choose "Images" and import the built `ws-sensor-server` image
   - create a container with port mappings 8765 and 8000
   - set the environment variable `SECRET_KEY=your_actual_key_here`
   - mount a data volume: map a NAS directory to `/app/data` inside the container
   - start the container

### 5. Testing

#### 5.1 Sending data over WebSocket

```python
import asyncio
import json
import websockets

async def send_data():
    async with websockets.connect('ws://your_nas_ip:8765') as ws:
        # send the key first
        await ws.send("your_actual_key_here")
        # then send a data point
        data = {"TagId": 1, "TagName": "1#注浆A液压力", "Value": 0.76, "DataTime": "2025-06-18 02:28:00"}
        await ws.send(json.dumps(data))
        print("Data sent")

asyncio.run(send_data())
```

#### 5.2 Calling the API

- Latest data: `GET http://your_nas_ip:8000/api/data/latest/1`, with header `Authorization: Bearer your_actual_key_here`
- Historical data: `GET http://your_nas_ip:8000/api/data/history/1?hours=24`
- Curve plot: `GET http://your_nas_ip:8000/api/plot/1?hours=72`

### 6. Notes

1. Key management: in real deployments set the key through an environment variable instead of hard-coding it.
2. Data persistence: mount a volume so the database file and plot directory live on the NAS and survive container restarts.
3. Performance: for high-frequency data, consider batch inserts and database indexes (see the sketch after this section).
4. Security: in production, put the API and WebSocket behind HTTPS, e.g. via an Nginx reverse proxy.

Related questions:
1. How do you add indexes to the SQLite database to speed up queries?
2. How do you manage secrets safely inside a Docker container?
3. How do you add HTTPS to the WebSocket and API services through Nginx?
4. How can this system be extended to handle many concurrent client connections?
5. How can the stored data be partitioned or split across tables to keep queries over large histories fast?
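The performance note above (batch inserts plus indexes; see also related question 1) can be illustrated as follows. This is only a sketch, assuming the same `sensor_data` schema created by `database.py`; the helper names and `DB_PATH` are illustrative and should be adapted to your deployment.

```python
# Illustrative sketch only: composite index + batched inserts for the sensor_data table.
# Assumes the schema created by database.py; DB_PATH and the helper names are hypothetical.
import sqlite3

DB_PATH = "sensor_data.db"

def ensure_indexes():
    conn = sqlite3.connect(DB_PATH)
    # Queries filter on tag_id and range/sort on timestamp, so a composite index covers both.
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_sensor_tag_time ON sensor_data (tag_id, timestamp)"
    )
    conn.commit()
    conn.close()

def insert_many(rows):
    """rows: iterable of (tag_id, tag_name, value, data_time) tuples."""
    conn = sqlite3.connect(DB_PATH)
    conn.executemany(
        "INSERT INTO sensor_data (tag_id, tag_name, value, data_time) VALUES (?, ?, ?, ?)",
        rows,
    )
    conn.commit()  # one commit per batch instead of one per row
    conn.close()

if __name__ == "__main__":
    ensure_indexes()
    insert_many([(1, "1#注浆A液压力", 0.76, "2025-06-18 02:28:00")])
```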

“# E:\AI_System\main.py import os import sys import logging import time import threading from pathlib import Path from core.config import config from core.command_listener import start_command_listener from agent.model_manager import ModelManager from agent.cognitive_architecture import CognitiveSystem from agent.concrete_cognitive_system import AdvancedCognitiveSystem # 使用具体实现 from agent.environment_interface import EnvironmentInterface def setup_logging(): """配置日志系统""" log_dir = config.get("LOG_DIR", "logs") os.makedirs(log_dir, exist_ok=True) log_file = Path(log_dir) / f"system_{time.strftime('%Y%m%d_%H%M%S')}.log" logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file), logging.StreamHandler() ] ) return logging.getLogger('Main') def main(): """主函数""" logger = setup_logging() logger.info("=" * 50) logger.info("🚀 启动AI系统 - 增强模式控制版") logger.info("=" * 50) # 打印系统配置摘要 logger.info("系统配置摘要:") # [保持原有配置打印不变] # 初始化模型管理器 try: # [保持原有模型初始化不变] model_manager = ModelManager(...) logger.info(f"✅ 模型管理器初始化完成") except Exception as e: logger.error(f"❌ 模型管理器初始化失败: {str(e)}") return # 初始化高级认知系统 try: cognitive_system = AdvancedCognitiveSystem( name="增强型认知系统", model_manager=model_manager ) logger.info("✅ 高级认知系统初始化完成") except Exception as e: logger.error(f"❌ 认知系统初始化失败: {str(e)}") return # 启动命令监听器 start_command_listener(cognitive_system) logger.info("✅ 命令监听器已启动") print("\n" + "=" * 50) print("🌟 系统准备就绪! 输入命令控制模式") print("=" * 50) print("📟 输入 'help' 查看命令列表\n") # 初始化环境接口 try: environment = EnvironmentInterface() logger.info("✅ 环境接口初始化完成") except Exception as e: logger.error(f"❌ 环境接口初始化失败: {str(e)}") return # 主运行循环 try: logger.info("🏁 进入主运行循环") while True: # 获取环境输入 stimulus = environment.get_input() # 根据当前模式处理 if cognitive_system.get_current_mode() == "SELF_REFLECTION": # 执行深度反思 reflection_result = cognitive_system.execute_reflection() if reflection_result: environment.output(f"反思完成: {reflection_result['reflection_id']}") elif cognitive_system.get_current_mode() == "TASK_EXECUTION": # 处理用户任务 response = cognitive_system.execute_task(stimulus) if response: environment.output(response) elif cognitive_system.get_current_mode() == "LEARNING": # 处理学习任务 learning_result = cognitive_system.execute_learning(stimulus) if learning_result: environment.output(f"学习完成: {learning_result[:100]}...") time.sleep(0.1) # 防止CPU过载 except KeyboardInterrupt: logger.info("🛑 用户中断,关闭系统") except Exception as e: logger.error(f"❌ 系统运行时错误: {str(e)}") finally: # 清理资源 model_manager.unload_model() logger.info("🛑 系统已关闭") if __name__ == "__main__": main() ”“# E:\AI_System\.env # ======================== # AI 系统环境变量配置 # ======================== # 环境类型 (dev, test, prod) ENV=dev # 目录配置 (使用双下划线表示层级) AI_SYSTEM_DIRECTORIES__PROJECT_ROOT=E:\AI_System AI_SYSTEM_DIRECTORIES__AGENT_DIR=E:\AI_System\agent AI_SYSTEM_DIRECTORIES__WEB_UI_DIR=E:\AI_System\web_ui AI_SYSTEM_DIRECTORIES__DEFAULT_MODEL=E:\AI_Models\Qwen2-7B # 日志配置 AI_SYSTEM_ENVIRONMENT__LOG_LEVEL=DEBUG # 数据库配置 AI_SYSTEM_DATABASE__DB_HOST=localhost AI_SYSTEM_DATABASE__DB_PORT=5432 AI_SYSTEM_DATABASE__DB_NAME=ai_system AI_SYSTEM_DATABASE__DB_USER=ai_user AI_SYSTEM_DATABASE__DB_PASSWORD=your_secure_password_here # 安全配置 AI_SYSTEM_SECURITY__SECRET_KEY=your_generated_secret_key_here # 模型配置 AI_SYSTEM_MODEL_PATHS__TEXT_BASE=E:\AI_Models\Qwen2-7B AI_SYSTEM_MODEL_PATHS__TEXT_CHAT=E:\AI_Models\deepseek-7b-chat AI_SYSTEM_MODEL_PATHS__MULTIMODAL=E:\AI_Models\deepseek-vl2 AI_SYSTEM_MODEL_PATHS__IMAGE_GEN=E:\AI_Models\sdxl 
AI_SYSTEM_MODEL_PATHS__YI_VL=E:\AI_Models\yi-vl AI_SYSTEM_MODEL_PATHS__STABLE_DIFFUSION=E:\AI_Models\stable-diffusion-xl-base-1.0 # 网络配置 AI_SYSTEM_NETWORK__HOST=0.0.0.0 AI_SYSTEM_NETWORK__FLASK_PORT=8000 AI_SYSTEM_NETWORK__GRADIO_PORT=7860 ”“# E:\AI_System\agent\base_module.py import logging import abc class CognitiveModule(abc.ABC): """认知模块基类 - 所有认知模块的抽象基类""" def __init__(self, name: str): self.name = name self.logger = logging.getLogger(self.name) self._init_logger() self.logger.info(f"✅ 初始化认知模块: {self.name}") def _init_logger(self): """初始化日志记录器""" if not self.logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) self.logger.addHandler(handler) self.logger.setLevel(logging.INFO) @abc.abstractmethod def process_stimulus(self, stimulus: dict): """处理刺激 - 抽象方法""" pass @abc.abstractmethod def generate_response(self): """生成响应 - 抽象方法""" pass class EnvironmentModule: """环境模块基类 - 所有环境接口的抽象基类""" def __init__(self, name: str): self.name = name self.logger = logging.getLogger(self.name) self._init_logger() self.logger.info(f"✅ 初始化环境模块: {self.name}") def _init_logger(self): """初始化日志记录器""" if not self.logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) self.logger.addHandler(handler) self.logger.setLevel(logging.INFO) @abc.abstractmethod def get_input(self): """获取输入 - 抽象方法""" pass @abc.abstractmethod def output(self, response: dict): """输出响应 - 抽象方法""" pass ”

#E:\AI_System\agent\environment_interface.py import os import sys import logging import time import queue import threading import json from pathlib import Path from typing import Any, Optional, Dict from agent.base_module import EnvironmentModule # 确保项目根目录在路径中 sys.path.append(str(Path(__file__).parent.parent)) class EnvironmentInterface(EnvironmentModule): """环境接口模块 - 完整修复版""" def __init__( self, name: str = "EnvironmentInterface", coordinator: Optional[Any] = None, cognitive_system: Optional[Any] = None, # 添加对 cognitive_system 的支持 config: Optional[Dict] = None ): """初始化环境接口""" super().__init__(name) # 参数兼容处理:cognitive_system 可以替代 coordinator if cognitive_system is not None and coordinator is None: coordinator = cognitive_system self.coordinator = coordinator self.config = config or {} # 设置默认配置值 self.config.setdefault("max_workers", 4) self.config.setdefault("response_timeout", 30.0) self.config.setdefault("log_level", "INFO") # 配置日志 self.logger = logging.getLogger(name) log_level = getattr(logging, self.config["log_level"].upper(), logging.INFO) self.logger.setLevel(log_level) handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) self.logger.addHandler(handler) # 输入输出队列 self.input_queue = queue.Queue() self.output_queue = queue.Queue() # 运行状态 self.running = True # 启动输入监听线程 self.input_thread = threading.Thread( target=self._input_listener, daemon=True, name=f"EnvInput-{name}" ) self.input_thread.start() self.logger.info("✅ 环境接口初始化完成 - Coordinator: %s", "已连接" if coordinator else "未连接") self.logger.debug("配置参数: %s", self.config) def _input_listener(self): """后台监听用户输入""" self.logger.info("输入监听线程已启动") while self.running: try: user_input = input("> ").strip() if user_input: input_data = { "type": "command" if user_input.startswith("/") else "text", "content": user_input, "timestamp": time.time(), "source": "user" } self.input_queue.put(input_data) self.logger.debug("📥 收到输入: %s", user_input) except EOFError: self.logger.info("检测到EOF,停止输入监听") self.stop() except Exception as e: self.logger.error("输入监听错误: %s", str(e)) time.sleep(1) def get_input(self, timeout: float = 0.5) -> Optional[Dict]: """获取输入""" try: return self.input_queue.get(timeout=timeout) except queue.Empty: return None def output(self, response: dict): """输出响应""" try: # 确保响应中包含必要字段 response.setdefault("timestamp", time.time()) response.setdefault("source", "system") response_str = json.dumps(response, ensure_ascii=False, indent=2) print(f"<< {response_str}") self.logger.info("💬 系统响应: %s", response.get("content", "无内容")) except Exception as e: self.logger.error("输出响应失败: %s", str(e)) def add_input(self, input_data: dict): """添加新输入""" if not isinstance(input_data, dict): self.logger.error("输入数据格式错误,必须是字典") return # 设置必要的字段 input_data.setdefault("timestamp", time.time()) input_data.setdefault("source", "external") self.input_queue.put(input_data) self.logger.debug("手动添加输入: %s", input_data) def stop(self): """停止接口运行""" if not self.running: return self.logger.info("🛑 停止环境接口...") self.running = False # 优雅关闭线程 if self.input_thread.is_alive(): self.input_thread.join(timeout=2.0) if self.input_thread.is_alive(): self.logger.warning("输入线程未在超时时间内停止") self.logger.info("✅ 环境接口已停止") def shutdown(self): """兼容旧版本的停止方法""" self.stop() # 测试代码 if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) # 测试不同的初始化方式 print("\n测试1: 基本初始化") env1 = EnvironmentInterface() print("\n测试2: 使用 cognitive_system 参数") env2 = 
EnvironmentInterface(cognitive_system="模拟认知系统") print("\n测试3: 使用 coordinator 参数") env3 = EnvironmentInterface(coordinator="模拟协调器") print("\n测试4: 完整配置") env4 = EnvironmentInterface( name="自定义接口", cognitive_system="认知系统实例", config={ "max_workers": 8, "response_timeout": 60.0, "log_level": "DEBUG" } ) try: print("\n测试环境接口...输入任意内容测试") for i in range(5): user_input = env4.get_input() if user_input: print(f"收到输入: {user_input}") env4.output({ "response": f"回复 {user_input['content']}", "index": i }) time.sleep(0.5) finally: env4.stop() “{ "LOG_DIR": "E:/AI_System/logs", "CONFIG_DIR": "E:/AI_System/config", "MODEL_CACHE_DIR": "E:/AI_System/model_cache", "AGENT_NAME": "小蓝", "DEFAULT_USER": "管理员", "MAX_WORKERS": 4, "AGENT_RESPONSE_TIMEOUT": 30.0, "MODEL_BASE_PATH": "E:/AI_Models", "MODEL_PATHS": { "TEXT_BASE": "E:/AI_Models/Qwen2-7B", "TEXT_CHAT": "E:/AI_Models/deepseek-7b-chat", "MULTIMODAL": "E:/AI_Models/deepseek-vl2", "IMAGE_GEN": "E:/AI_Models/sdxl", "YI_VL": "E:/AI_Models/yi-vl", "STABLE_DIFFUSION": "E:/AI_Models/stable-diffusion-xl-base-1.0" }, "NETWORK": { "HOST": "0.0.0.0", "FLASK_PORT": 8000, "GRADIO_PORT": 7860 }, "DATABASE": { "DB_HOST": "localhost", "DB_PORT": 5432, "DB_NAME": "ai_system", "DB_USER": "ai_user", "DB_PASSWORD": "secure_password_here" }, "SECURITY": { "SECRET_KEY": "generated-secret-key-here" }, "ENVIRONMENT": { "ENV": "dev", "LOG_LEVEL": "DEBUG", "USE_GPU": true }, "DIRECTORIES": { "DEFAULT_MODEL": "E:/AI_Models/Qwen2-7B", "WEB_UI_DIR": "E:/AI_System/web_ui", "AGENT_DIR": "E:/AI_System/agent", "PROJECT_ROOT": "E:/AI_System" } } ” “#E:\AI_System\main.pyimport os import sys import traceback import threading import time import logging from pathlib import Path from core.config import config from core.command_listener import start_command_listener from agent.model_manager import ModelManager from agent.cognitive_architecture import CognitiveSystem from agent.environment_interface import EnvironmentInterface def setup_logging(): """配置系统日志记录""" log_dir = Path(config.get("LOG_DIR", "logs")) log_dir.mkdir(parents=True, exist_ok=True) log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" log_level = config.get("ENVIRONMENT.LOG_LEVEL", "INFO").upper() logging.basicConfig( level=getattr(logging, log_level), format=log_format, handlers=[ logging.FileHandler(log_dir / "ai_system.log", encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger("Main") logger.info("日志系统初始化完成 (级别: %s)", log_level) return logger def main(): """主函数""" logger = setup_logging() logger.info("=" * 50) logger.info("🚀 启动AI系统 - 核心认知模式") logger.info("=" * 50) logger.info("系统配置摘要:") logger.info(f"项目根目录: {config.get('DIRECTORIES.PROJECT_ROOT', '未设置')}") logger.info(f"默认模型路径: {config.get('MODEL_PATHS.TEXT_BASE', '未设置')}") logger.info(f"日志级别: {config.get('ENVIRONMENT.LOG_LEVEL', 'INFO')}") # 初始化模型管理器 try: model_manager = ModelManager( model_registry=config.get("MODEL_PATHS", {}), cache_dir=config.get("MODEL_CACHE_DIR", "model_cache"), use_gpu=config.get("ENVIRONMENT.USE_GPU", True) ) logger.info("✅ 模型管理器初始化完成") base_model = config.get("MODEL_PATHS.TEXT_BASE") if base_model and model_manager.load_model("TEXT_BASE"): logger.info(f"✅ 基础模型已加载: {base_model}") except Exception as e: logger.error("❌ 模型管理器初始化失败: %s", str(e)) logger.error(traceback.format_exc()) return # 初始化认知系统接口 try: cognitive_system = CognitiveSystem( name=config.get("AGENT_NAME", "小蓝"), model_manager=model_manager, config=config.get("COGNITIVE_CONFIG", {}) ) logger.info("✅ 认知系统初始化完成 - 名称: %s", cognitive_system.name) except 
Exception as e: logger.error("❌ 认知系统初始化失败: %s", str(e)) logger.error(traceback.format_exc()) return # 初始化环境接口 try: environment_interface = EnvironmentInterface( name="环境接口", cognitive_system=cognitive_system, config={ "max_workers": config.get("MAX_WORKERS", 4), "response_timeout": config.get("AGENT_RESPONSE_TIMEOUT", 30.0), "log_level": config.get("ENVIRONMENT.LOG_LEVEL", "INFO") } ) logger.info("✅ 环境接口初始化完成") except Exception as e: logger.error("❌ 环境接口初始化失败: %s", str(e)) logger.error(traceback.format_exc()) return # 创建关闭处理函数 def shutdown_handler(): """系统关闭处理函数""" logger.info("🛑 收到关闭命令,开始关闭系统...") environment_interface.stop() logger.info("✅ 系统已完全关闭") sys.exit(0) # 自定义命令处理器 def command_handler(command: str) -> str: """处理用户命令""" try: logger.info(f"处理命令: {command}") # 简单命令路由示例 if command.lower().startswith("mode "): new_mode = command[5:].strip() return f"已切换到 {new_mode} 模式" elif command.lower() == "status": return ("系统状态:\n" f"- 模型加载: {model_manager.status}\n" f"- 认知系统: {cognitive_system.status}\n" f"- 环境接口: {'运行中' if environment_interface.running else '已停止'}") # 其他命令处理逻辑... return f"命令 '{command}' 已接收处理" except Exception as e: logger.error(f"命令处理错误: {str(e)}") return f"❌ 处理命令时出错: {str(e)}" # 启动命令监听器 - 使用新的CommandListener类 try: command_listener = start_command_listener( command_handler=command_handler, shutdown_handler=shutdown_handler ) logger.info("✅ 命令监听器已启动") except Exception as e: logger.error("❌ 命令监听器启动失败: %s", str(e)) logger.error(traceback.format_exc()) return logger.info("🌟 系统准备就绪! 输入命令控制模式") # 主循环:处理用户输入 try: while True: # 获取用户输入 user_input = environment_interface.get_input() if user_input: logger.info("收到用户输入: %s", user_input.get("content", "")) # 如果是关闭命令 if user_input.get("content") == "/shutdown": shutdown_handler() # TODO: 添加处理用户输入的逻辑 # 例如:cognitive_system.process_input(user_input) # 模拟处理响应 response = { "status": "success", "message": f"已处理您的请求: {user_input['content']}", "timestamp": time.time() } # 发送响应 environment_interface.output(response) # 短暂休眠以避免CPU占用过高 time.sleep(0.1) except KeyboardInterrupt: logger.info("🛑 收到中断信号,关闭系统...") shutdown_handler() except Exception as e: logger.error("❌ 主循环发生未预期错误: %s", str(e)) logger.error(traceback.format_exc()) shutdown_handler() if __name__ == "__main__": main() ”改好发我完整的
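The `.env` file above encodes nesting with double underscores (for example `AI_SYSTEM_ENVIRONMENT__LOG_LEVEL`), while `main.py` reads dotted keys such as `config.get("ENVIRONMENT.LOG_LEVEL", "INFO")`. The actual `core.config` module is not shown in the post, so the following is only a minimal sketch of how such a mapping could work; all names in it are illustrative.

```python
# Minimal sketch: map AI_SYSTEM_*__* environment variables to dotted config keys.
# The real core.config module is not shown in the post; all names here are illustrative.
import os

PREFIX = "AI_SYSTEM_"

def load_env_config(environ=None):
    environ = environ if environ is not None else os.environ
    cfg = {}
    for key, value in environ.items():
        if key.startswith(PREFIX):
            # AI_SYSTEM_ENVIRONMENT__LOG_LEVEL -> "ENVIRONMENT.LOG_LEVEL"
            cfg[key[len(PREFIX):].replace("__", ".")] = value
    return cfg

def get(cfg, dotted_key, default=None):
    return cfg.get(dotted_key, default)

if __name__ == "__main__":
    cfg = load_env_config({"AI_SYSTEM_ENVIRONMENT__LOG_LEVEL": "DEBUG"})
    print(get(cfg, "ENVIRONMENT.LOG_LEVEL", "INFO"))  # -> DEBUG
```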
