活动介绍

#!/bin/bash # 定义颜色代码 RED='\033[0;31m' GREEN='\033[0;32m' YELLOW='\033[0;33m' NC='\033[0m' # 重置颜色 # 打印分隔线 echo -e "${GREEN}\n===== 系统基本信息 =====${NC}" echo "主机名: $(hostname)" echo "系统版本: $(cat /etc/redhat-release)" echo "运行时间: $(uptime | awk -F',' '{print $1}')" echo "当前时间: $(date '+%Y-%m-%d %H:%M:%S')" # CPU监控 echo -e "${GREEN}\n===== CPU状态 =====${NC}" echo "CPU型号: $(grep 'model name' /proc/cpuinfo | uniq | cut -d':' -f2)" echo "物理CPU数量: $(grep 'physical id' /proc/cpuinfo | sort | uniq | wc -l)" echo "逻辑CPU数量: $(grep 'processor' /proc/cpuinfo | wc -l)" echo -e "当前CPU使用率: ${YELLOW}$(mpstat 1 1 | awk '/Average/ {printf \"%d%%\", 100-$12}')${NC}" # 内存监控 echo -e "${GREEN}\n===== 内存状态 =====${NC}" free -m | awk ' /Mem/ {printf "总内存: %dMB 已用: %dMB 剩余: %dMB\n", $2, $3, $4} /Swap/ {printf "交换空间: %dMB 已用: %dMB 剩余: %dMB\n", $2, $3, $4}' # 磁盘监控 echo -e "${GREEN}\n===== 磁盘状态 =====${NC}" echo "磁盘空间使用情况:" df -h | grep -Ev 'tmpfs|devtmpfs' | awk '{printf "%-20s %-10s %-10s %-10s %s\n", $1, $2, $3, $4, $6}' echo -e "\n磁盘I/O状态:" iostat -d | awk ' BEGIN {print "设备\t\t每秒读取\t每秒写入"} $1 ~ /^sd/ {print $1 "\t\t" $3 "\t\t" $4}' # 进程与端口 echo -e "${GREEN}\n===== 进程与端口状态 =====${NC}" echo "占用CPU前5的进程:" ps -eo pid,ppid,user,%cpu,%mem,cmd --sort=-%cpu | head -n6 echo -e "\n监听中的TCP端口:" netstat -tuln | awk '/^tcp/ {split($4, a, ":"); printf "%-8s %-15s:%s\n", $1, a[1], a[2]}' # 异常检测 echo -e "${GREEN}\n===== 异常检测 =====${NC}" # 检测内存使用超过90% if (( $(free | awk '/Mem/{printf "%d", $3/$2*100}') > 90 )); then echo -e "${RED}警告: 内存使用超过90%!${NC}" fi # 检测根分区使用超过90% if (( $(df / | awk 'NR==2 {gsub(/%/,""); print $5}') > 90 )); then echo -e "${RED}警告: 根分区使用超过90%!${NC}" fi # 添加/home目录分析 echo -e "${GREEN}\n===== /home目录空间使用 =====${NC}" if [ -d "/home" ]; then echo "用户目录大小统计:" # 使用du统计一级子目录大小,按降序排序 du -h --max-depth=1 /home/ 2>/dev/null | sort -hr | awk ' BEGIN {printf "%-25s %-10s\n", "目录", "大小"} !/home\/$/ {gsub("/home/",""); printf "%-25s %-10s\n", $2, $1}' else echo "未找到/home目录" fi echo -e "${GREEN}\n===== 状态收集完成 ====="${NC}

时间: 2025-07-09 11:55:34 浏览: 13
<think>好的,我现在需要分析用户提供的这个bash脚本,并按照用户的要求进行详细介绍。用户希望结构清晰,分步骤解释,同时使用中文回答。首先,我要仔细阅读这个脚本,理解每一部分的功能,然后按照逻辑结构逐步拆解。 首先,脚本开头定义了颜色代码,用于后续的输出着色。这应该是为了让不同部分的信息更醒目。接下来,脚本打印系统基本信息,包括主机名、系统版本、运行时间和当前时间。这里需要注意的是,系统版本是通过读取/etc/redhat-release文件获取的,所以这个脚本可能主要针对Red Hat系列的Linux系统,比如CentOS或RHEL。 然后是CPU状态的检查,这里通过/proc/cpuinfo文件获取CPU型号、物理和逻辑核心数量,并使用mpstat命令获取当前CPU使用率。这里要注意mpstat的输出处理,特别是使用awk截取平均值的部分,可能需要检查是否正确计算了使用率。 内存监控部分使用free命令,并以MB为单位显示。这里awk脚本处理输出,显示总内存、已用和剩余,以及交换空间的情况。需要确认free -m的输出格式是否与awk的处理匹配,避免截取错误的列。 磁盘监控分为空间使用和I/O状态。df命令排除了tmpfs和devtmpfs文件系统,并以易读的格式显示。iostat命令用于显示磁盘I/O,但需要确认系统是否安装了sysstat包,否则iostat可能不可用。此外,iostat的输出处理是否正确,特别是设备名的匹配(如sd)是否适用于不同环境。 进程与端口部分显示占用CPU前五的进程和监听的TCP端口。ps命令的参数是否正确,排序是否合理。netstat命令查看监听端口,处理输出时拆分本地地址和端口是否正确,尤其是IPv6地址可能包含多个冒号的情况需要注意。 异常检测部分检查内存和根分区使用是否超过90%,并输出警告。这里使用了算术比较,需要确认语法是否正确,特别是在不同的shell环境下,比如是否使用bash特有的语法。 最后,脚本添加了/home目录的空间分析,使用du命令统计各用户目录大小,并按大小排序。这里需要处理权限问题,因为某些目录可能无法访问,所以有2>/dev/null来忽略错误输出。同时,awk处理时排除了/home自身目录,只显示子目录。 在解释过程中,需要指出每个部分的功能、使用的命令及其作用,可能的问题点(比如依赖特定发行版或命令),以及颜色代码的应用。同时,要确保按照用户的要求,结构清晰,分步骤讲解,使用中文,并且行内公式正确使用$...$,不过在这个脚本中可能涉及较少数学公式,但如果有变量或数值处理,需要正确应用格式。 可能需要注意的地方包括:检查命令的兼容性,例如在非Red Hat系统上cat /etc/redhat-release会出错;iostat可能需要安装;netstat在某些系统中被ss取代,但用户提供的脚本使用的是netstat;还有处理df和du时的权限问题。此外,颜色代码中的转义字符是否正确使用echo -e来启用转义。 总结来说,我需要将整个脚本分解为各个功能模块,逐一解释其目的、实现方式和潜在注意事项,帮助用户理解脚本的工作原理和可能遇到的问题。</think>让我们逐步分析这个Bash监控脚本的实现逻辑和功能模块: $$注意:这个脚本主要针对Red Hat系Linux系统(CentOS/RHEL),部分命令在其他发行版中可能需要调整$$ 一、颜色定义模块 ```bash RED='\033[0;31m' # 红色 GREEN='\033[0;32m' # 绿色 YELLOW='\033[0;33m' # 黄色 NC='\033[0m' # 颜色重置 ``` * 使用ANSI转义码实现终端颜色输出 * 格式:`\033[格式代码m` * 颜色代码:31=红,32=绿,33=黄,0=重置 二、系统基本信息模块 ```bash echo -e "${GREEN}\n===== 系统基本信息 =====${NC}" echo "主机名: $(hostname)" # 获取主机名 echo "系统版本: $(cat /etc/redhat-release)" # 读取系统版本 ``` * 使用`uptime`命令的过滤输出获取运行时间 * 通过`date`命令格式化当前时间 三、CPU监控模块 ```bash grep 'model name' /proc/cpuinfo | uniq | cut -d':' -f2 ``` * 从/proc/cpuinfo提取CPU信息 * `mpstat`计算CPU使用率公式: $$ \text{CPU使用率} = 100\% - \text{空闲率} $$ 四、内存监控模块 ```bash free -m | awk ' /Mem/ {printf "总内存: %dMB 已用: %dMB 剩余: %dMB\n", $2, $3, $4} /Swap/ {printf "交换空间: %dMB 已用: %dMB 剩余: %dMB\n", $2, $3, $4}' ``` * `-m`选项以MB为单位显示 * 通过awk精确解析free命令输出 五、磁盘监控模块 ```bash df -h | grep -Ev 'tmpfs|devtmpfs' # 排除临时文件系统 iostat -d | awk '$1 ~ /^sd/...' # 仅显示物理磁盘 ``` * `df -h`以易读格式显示磁盘空间 * `iostat`显示磁盘I/O统计,需要安装sysstat包 六、进程与端口模块 ```bash ps -eo pid,ppid,user,%cpu,%mem,cmd --sort=-%cpu | head -n6 netstat -tuln | awk '/^tcp/...' ``` * `ps --sort`按CPU使用率降序排序 * `netstat -tuln`显示所有监听端口 七、异常检测模块 ```bash (( $(free | awk '/Mem/{printf "%d", $3/$2*100}') > 90 )) ``` * 使用算术比较`(())` * 内存使用率计算公式: $$ \text{内存使用率} = \frac{\text{已用内存}}{\text{总内存}} \times 100\% $$ 八、/home目录分析模块 ```bash du -h --max-depth=1 /home/ 2>/dev/null | sort -hr ``` * `--max-depth=1`只统计一级子目录 * `sort -hr`按人类可读格式逆序排序 潜在改进建议: 1. 增加网络连接状态监控(如`ss -s`) 2. 添加僵尸进程检测:`ps -eo stat | grep -c Z` 3. 使用`bc`命令进行浮点数计算提高精度 4. 添加日志记录功能 5. 设置阈值可配置参数 这个脚本通过组合常用系统工具,实现了快速系统状态检查,适合用于日常运维监控。实际使用时建议: 1. 添加执行权限:`chmod +x script.sh` 2. 通过crontab定期执行 3. 输出重定向到日志文件 4. 关键指标设置邮件报警
阅读全文

相关推荐

#!/bin/bash ############################################################ # 高级系统初始化脚本 # 功能:自动完成系统环境配置、软件安装和验证 ############################################################ ############################################################ # 参数定义 ############################################################ # 安装包所在路径 pack_dir="/iflytek/install" # 应用安装路径 server_dir="/iflytek/server" # FastDFS配置文件路径 fast_conf="/iflytek/conf" # shell脚本路径 shell_dir="/iflytek/install/shell" # 主机名 host_name="skynet" # 日志文件路径 log_file="/var/log/system_init_$(date +%Y%m%d_%H%M%S).log" ############################################################ # 颜色定义 ############################################################ RED='\033[0;31m' GREEN='\033[0;32m' YELLOW='\033[0;33m' BLUE='\033[0;34m' NC='\033[0m' # 无颜色 ############################################################ # 函数定义 ############################################################ # 记录日志 log() { local level=$1 local message=$2 local color=$NC case $level in INFO) color=$GREEN ;; WARN) color=$YELLOW ;; ERROR) color=$RED ;; DEBUG) color=$BLUE ;; esac echo -e "${color}[$(date "+%Y-%m-%d %H:%M:%S")] [$level] ${message}${NC}" echo "[$(date "+%Y-%m-%d %H:%M:%S")] [$level] ${message}" >> "$log_file" } # 带错误处理的命令执行 run_command() { local cmd=$1 local description=$2 local critical=${3:-true} # 默认为关键命令 log "DEBUG" "执行命令: $cmd" # 执行命令并捕获输出 if output=$(eval "$cmd" 2>&1); then log "INFO" "$description - 成功" return 0 else log "ERROR" "$description - 失败" log "ERROR" "错误详情: $output" if [ "$critical" = "true" ]; then log "ERROR" "关键步骤失败,终止执行" exit 1 else return 1 fi fi } # 创建必要目录 create_directories() { run_command "mkdir -p /iflytek/{engine,conf,bak,server,data/zookeeper} /usr/java" \ "创建目录结构" } # 操作系统设置 configure_os() { # 关闭防火墙 run_command "systemctl stop firewalld.service" "停止防火墙服务" run_command "systemctl disable firewalld.service" "禁用防火墙开机启动" # 关闭SELinux run_command "setenforce 0" "临时禁用SELinux" run_command "sed -i 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/selinux/config" \ "永久禁用SELinux" # 调整系统限制 run_command "ulimit -n 655360" "设置打开文件数限制" run_command "ulimit -u 655360" "设置用户进程数限制" # 永久配置系统限制 run_command "echo '* soft noproc 655360' >> /etc/security/limits.conf" \ "配置进程数软限制" run_command "echo '* hard noproc 655360' >> /etc/security/limits.conf" \ "配置进程数硬限制" run_command "echo '* soft nofile 655360' >> /etc/security/limits.conf" \ "配置文件数软限制" run_command "echo '* hard nofile 655360' >> /etc/security/limits.conf" \ "配置文件数硬限制" run_command "echo 'vm.max_map_count=655360' >> /etc/sysctl.conf" \ "配置内核参数" run_command "sysctl -p > /dev/null" "加载内核参数配置" } # 配置Java环境 configure_java() { # 卸载旧版JDK run_command "rpm -e --nodeps \$(rpm -qa | grep -i jdk)" "卸载现有JDK" false # 安装新版JDK run_command "tar zxf \"$pack_dir/jdk-8u144-linux-x64.tar.gz\" -C /usr/java/" \ "解压JDK安装包" # 配置环境变量 local java_profile="/etc/profile.d/java.sh" run_command "cat > \"$java_profile\" <<EOF export JAVA_HOME=/usr/java/jdk1.8.0_144 export JRE_HOME=\\\$JAVA_HOME/jre export PATH=\\\$PATH:\\\$JAVA_HOME/bin export CLASSPATH=.:\\\$JAVA_HOME/lib/dt.jar:\\\$JAVA_HOME/lib/tools.jar EOF" "创建Java环境配置文件" run_command "source \"$java_profile\"" "加载Java环境变量" } # 设置本地Yum源 configure_yum() { # 备份现有Yum源 run_command "mkdir -p /etc/yum.repos.d/bak" "创建Yum源备份目录" run_command "mv /etc/yum.repos.d/*.repo /etc/yum.repos.d/bak 2>/dev/null" \ "备份现有Yum源" false # 挂载ISO镜像 run_command "mkdir -p /iflytek/web" "创建Yum源目录" run_command "mount -o loop \"$pack_dir/CentOS-7-x86_64-DVD-2009.iso\" /mnt" \ "挂载CentOS ISO镜像" # 复制文件 run_command "cp -a /mnt/* /iflytek/web/centos/" "复制Yum源文件" run_command "umount /mnt" "卸载ISO镜像" # 配置本地源 run_command "cp \"$shell_dir/my-z-centos.repo\" /etc/yum.repos.d/" \ "配置本地Yum源" # 更新缓存 run_command "yum clean all" "清理Yum缓存" run_command "yum makecache" "创建Yum元数据缓存" } # 安装依赖包 install_dependencies() { local packages=( openssl openssl-devel gcc gcc-c++ vim net-tools zip unzip wget gzip lrzsz dos2unix sysstat ) run_command "yum -y install ${packages[*]}" "安装系统依赖包" } # 解压安装包 unzip_packages() { # 解压ZIP文件 for zip_file in "$server_dir"/*.zip; do if [ -f "$zip_file" ]; then run_command "unzip -o \"$zip_file\" -d \"$server_dir\"" \ "解压文件: $(basename "$zip_file")" fi done # 解压TAR文件 for tar_file in "$pack_dir"/*.tar.gz; do if [ -f "$tar_file" ]; then run_command "tar -xf \"$tar_file\" -C \"$server_dir\"" \ "解压文件: $(basename "$tar_file")" fi done # 设置权限 run_command "chmod -R 775 /iflytek" "设置文件权限" # 清理ZIP文件 run_command "mv \"$server_dir\"/*.zip \"$pack_dir/\" 2>/dev/null" \ "移动ZIP文件到安装包目录" false } # 配置主机名 configure_hostname() { # 获取IP地址 run_command "ip=\$(ip -o -4 addr show | awk '\$2 != \"lo\" {print \$4}' | cut -d'/' -f1 | head -1)" \ "获取本机IP地址" # 配置主机名 run_command "hostnamectl set-hostname \"$host_name\"" "设置主机名" # 配置hosts文件 run_command "cat > /etc/hosts <<EOF 127.0.0.1 localhost localhost.localdomain ::1 localhost localhost.localdomain $ip $host_name EOF" "配置/etc/hosts文件" } # 验证配置 validate_configuration() { log "INFO" "开始配置验证..." # 验证Java环境 run_command "java -version" "验证Java安装" # 验证系统限制 run_command "grep 'vm.max_map_count' /etc/sysctl.conf" "验证内核参数" run_command "grep '655360' /etc/security/limits.conf" "验证系统限制" # 验证主机名 run_command "hostname" "验证当前主机名" run_command "grep '$host_name' /etc/hosts" "验证hosts配置" log "INFO" "配置验证完成!" } # 主函数 main() { log "INFO" "系统初始化开始" log "INFO" "日志文件: $log_file" local start_time=$(date +%s) # 执行各步骤 create_directories configure_os configure_java configure_yum install_dependencies unzip_packages configure_hostname validate_configuration local end_time=$(date +%s) local duration=$((end_time - start_time)) log "INFO" "系统初始化成功完成!" log "INFO" "总耗时: $duration 秒" } ############################################################ # 脚本执行入口 ############################################################ # 检查是否以root用户运行 if [ "$(id -u)" -ne 0 ]; then echo -e "${RED}错误:此脚本必须以root用户身份运行${NC}" >&2 exit 1 fi # 执行主函数 main 去掉输出控制的颜色

# Copyright (c) 2010, 2012, 2013, 2014 # Thorsten Glaser <[email protected]> # This file is provided under the same terms as mksh. #- # Minimal /system/etc/mkshrc for Android # # Support: https://launchpad.net/mksh AGVARR="$@" SCRIPT="$0" AGV1="$1" AGV2="$2" AGV3="$3" export MYSCRIPT="$(realpath "$0")" 2>/dev/null if [ "$MYSCRIPT" ]; then export MYPATH="${MYSCRIPT%/*}" cmds="$SCRIPT $AGVARR" fi priv_dir=/data/data/magisk.term p(){ COLOR=$1;TEXT="$2";escape="$1" [ "$COLOR" == "black" ] && escape="0;30" [ "$COLOR" == "red" ] && escape="0;31" [ "$COLOR" == "green" ] && escape="0;32" [ "$COLOR" == "orange" ] && escape="0;33" [ "$COLOR" == "blue" ] && escape="0;34" [ "$COLOR" == "purple" ] && escape="0;35" [ "$COLOR" == "cyan" ] && escape="0;36" [ "$COLOR" == "light_gray" ] && escape="0;37" [ "$COLOR" == "gray" ] && escape="1;30" [ "$COLOR" == "light_red" ] && escape="1;31" [ "$COLOR" == "light_green" ] && escape="1;32" [ "$COLOR" == "yellow" ] && escape="1;33" [ "$COLOR" == "light_blue" ] && escape="1;34" [ "$COLOR" == "light_purple" ] && escape="1;35" [ "$COLOR" == "light_cyan" ] && escape="1;36" [ "$COLOR" == "white" ] && escape="1;37" [ "$COLOR" == "none" ] && escape="0" code="\033[${escape}m" end_code="\033[0m" echo -en "$code$TEXT$end_code" } RC='\033[0m' RED='\033[0;31m' BRED='\033[1;31m' GRAY='\033[1;30m' BLUE='\033[0;34m' BBLUE='\033[1;34m' CYAN='\033[0;34m' CYAN='\033[1;34m' WHITE='\033[1;37m' GREEN='\033[0;32m' BGREEN='\033[1;32m' YELLOW='\033[1;33m' PURPLE='\033[0;35m' BPURPLE='\033[1;35m' ORANGE='\033[0;33m' pd(){ p "$1" "$2"; echo } DLPATH="$priv_dir/magisk" export OLDPATH="$PATH" ( cd "$priv_dir" if [ ! -d "$DLPATH" ]; then rm -rf "$DLPATH" 2>/dev/null mkdir -p "$DLPATH" 2>/dev/null fi link(){ ( agv1="$1"; agv2="$2" [ ! -f "$DLPATH/$agv2" ] && rm -rf "$DLPATH/$agv2" 2>/dev/null ln -s "$(which "$agv1")" "$DLPATH/$agv2" 2>/dev/null ) } link "libapp.so" "magisk.apk" link "libbusybox.so" "busybox" link "liblegacy.so" "legacy.zip" link "libbash.so" "menu" link "libgxp.so" "gearlock_extension.zip" ) export PATH="$DLPATH/bin:$DLPATH:$PATH" alias inmagisk='"$DLPATH/busybox" sh -o standalone "$DLPATH/menu" option "$@"' alias m='inmagisk "$@"' alias M='inmagisk "$@"' magiskhide(){ ( VAR_ARRAY="$@" MAGISK_VERSION="$(magisk -V)" || exit 1 MAGISKHIDE="--denylist" [ "$MAGISK_VERSION" -lt "23010" ] && MAGISKHIDE="magiskhide" magisk $MAGISKHIDE "$@" ) } denylist(){ ( magiskhide "$@"; ) } help(){ echo "$(inmagisk help) MagiskHide / DenyList ConFig usage: magiskhide [option]... or: denylist [option]... shortcut for \"magisk --denylist\" if Magisk version is 23010+" } pd none "> 欢迎使用Magisk安装脚本"; p none "> 原作者:"; pd light_blue "Github @HuskyDG" p none "> 汉化&修复:"; pd cyan "多开鸭 - www.duokaiya.com" pd none "" p none "输入 "; p light_green "m"; pd none " 开始"; pd none ""

#!/bin/bash ############################################################ # 参数定义 ############################################################ # 安装包所在路径 pack_dir=/iflytek/install # 应用安装路径 server_dir=/iflytek/server # Fastdfs配置文件路径 Fast_conf=/iflytek/conf # shell脚本路径 shell_dir=/iflytek/install/shell # 定义颜色变量以增强输出效果 RED='\033[0;31m' GREEN='\033[0;32m' NC='\033[0m' # No Color # 函数:显示进度信息 show_progress() { local message=$1 echo -e "${GREEN}=> ${message}${NC}" } # 函数:错误处理 handle_error() { local error_message=$1 echo -e "${RED}ERROR: ${error_message}${NC}" exit 1 } # 函数:创建必要目录 create_directories() { show_progress "正在创建所需目录..." mkdir -p /iflytek/{engine,conf,bak,server,data/zookeeperusr} /usr/java || handle_error "目录创建失败" show_progress "目录创建完成!" } # 函数:操作系统设置 configure_os() { show_progress "正在关闭防火墙和SELinux..." systemctl stop firewalld.service > /dev/null 2>&1 && systemctl disable firewalld.service > /dev/null 2>&1 || handle_error "防火墙关闭失败" setenforce 0 && sed -i 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/selinux/config || handle_error "SELinux关闭失败" show_progress "正在调整系统限制..." ulimit -n 655360 && ulimit -u 655360 || handle_error "系统限制调整失败" echo '* soft noproc 655360' >> /etc/security/limits.conf echo '* hard noproc 655360' >> /etc/security/limits.conf echo '* soft nofile 655360' >> /etc/security/limits.conf echo '* hard nofile 655360' >> /etc/security/limits.conf echo 'vm.max_map_count=655360' >> /etc/sysctl.conf sysctl -p > /dev/null 2>&1 || handle_error "sysctl配置加载失败" show_progress "操作系统设置完成!" } # 函数:配置Java环境 configure_java() { show_progress "正在卸载旧版JDK并解压新版本..." rpm -e --nodeps rpm -qa | grep -i jdk > /dev/null 2>&1 || true tar zxf $pack_dir/jdk-8u144-linux-x64.tar.gz -C /usr/java/ > /dev/null 2>&1 || handle_error "JDK解压失败" show_progress "正在配置Java环境变量..." cat <<EOF >> /etc/profile export JAVA_HOME=/usr/java/jdk1.8.0_144 export JRE_HOME=\$JAVA_HOME/jre export PATH=\$PATH:\$JAVA_HOME/bin export CLASSPATH=.:\\\$JAVA_HOME/lib/dt.jar:\\\$JAVA_HOME/lib/tools.jar EOF source /etc/profile || handle_error "Java环境变量加载失败" show_progress "Java环境配置完成!" } # 函数:设置本地Yum源 configure_yum() { show_progress "正在配置本地Yum源..." mkdir -p /etc/yum.repos.d/bak && mv /etc/yum.repos.d/*.repo /etc/yum.repos.d/bak > /dev/null 2>&1 || handle_error "Yum源备份失败" mount -o loop $pack_dir//CentOS-7-x86_64-DVD-2009.iso /tmp > /dev/null 2>&1 || handle_error "ISO挂载失败" cp -a /tmp /iflytek/web/centos > /dev/null 2>&1 || handle_error "Yum源复制失败" umount /tmp > /dev/null 2>&1 || handle_error "ISO卸载失败" cp $pack_dir//shell/my-z-centos.repo /etc/yum.repos.d/ > /dev/null 2>&1 || handle_error "Yum源配置文件复制失败" yum clean all > /dev/null 2>&1 && yum makecache > /dev/null 2>&1 || handle_error "Yum缓存更新失败" show_progress "本地Yum源配置完成!" } # 函数:安装依赖包 install_dependencies() { show_progress "正在安装依赖包..." yum -y install openssl openssl-devel gcc* vim net-tools zip unzip wget gzip lrzsz dos2unix sysstat-11.7.3-1.x86_64 > /dev/null 2>&1 || handle_error "依赖包安装失败" show_progress "依赖包安装完成!" } # 函数:解压安装包 unzip_packages() { show_progress "正在解压安装包..." for i in /iflytek/server/*.zip; do unzip -o $i -d $server_dir > /dev/null 2>&1 || handle_error "文件解压失败" done chmod -R 775 /iflytek/ || handle_error "权限设置失败" mv $server_dir/*.zip $pack_dir// > /dev/null 2>&1 || handle_error "文件移动失败" show_progress "安装包解压完成!" } # 函数:配置主机名 configure_hostname() { show_progress "正在配置主机名..." ip=$(ip addr | grep 'state UP' -A2 | grep inet | egrep -v '(127.0.0.1|inet6|docker)' | awk '{print $2}' | tr -d "addr:" | head -n 1 | cut -d / -f1) hostname skynet > /etc/hosts || handle_error "主机名设置失败" echo "$ip skynet" >> /etc/hosts echo "HOSTNAME=skynet" >> /etc/sysconfig/network echo "skynet" >/etc/hostname show_progress "主机名配置完成!" } # 函数:验证配置 validate_configuration() { show_progress "正在验证配置..." java -version > /dev/null 2>&1 || handle_error "Java环境验证失败" grep "vm.max_map_count" /etc/sysctl.conf | tail -1 > /dev/null 2>&1 || handle_error "系统限制验证失败" cat /etc/security/limits.conf | grep -i "65536" > /dev/null 2>&1 || handle_error "文件限制验证失败" cat /etc/hosts | grep "skynet" > /dev/null 2>&1 || handle_error "主机名验证失败" show_progress "所有配置验证通过!" } # 主函数 main() { create_directories configure_os configure_java configure_yum install_dependencies unzip_packages configure_hostname validate_configuration show_progress "所有操作已完成!" } # 执行主函数 main

sh #!/bin/bash # Color definitions RED='\033[0;31m' GREEN='\033[0;32m' YELLOW='\033[1;33m' BLUE='\033[0;34m' CYAN='\033[0;36m' NC='\033[0m' # No Color # Logo print_logo() { echo -e "${CYAN}" cat << "EOF" ██████╗██╗ ██╗██████╗ ███████╗ ██████╗ ██████╗ ██████╗ ██████╗ ██████╗ ██╔════╝██║ ██║██╔══██╗██╔════╝██╔═══██╗██╔══██╗ ██╔══██╗██╔══██╗██╔═══██╗ ██║ ██║ ██║██████╔╝███████╗██║ ██║██████╔╝ ██████╔╝██████╔╝██║ ██║ ██║ ██║ ██║██╔══██╗╚════██║██║ ██║██╔══██╗ ██╔═══╝ ██╔══██╗██║ ██║ ╚██████╗╚██████╔╝██║ ██║███████║╚██████╔╝██║ ██║ ██║ ██║ ██║╚██████╔╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚══════╝ ╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ EOF echo -e "${NC}" } # Get download folder path get_downloads_dir() { if [[ "$(uname)" == "Darwin" ]]; then echo "$HOME/Downloads" else if [ -f "$HOME/.config/user-dirs.dirs" ]; then . "$HOME/.config/user-dirs.dirs" echo "${XDG_DOWNLOAD_DIR:-$HOME/Downloads}" else echo "$HOME/Downloads" fi fi } # Get latest version get_latest_version() { echo -e "${CYAN}ℹ️ Checking latest version...${NC}" latest_release=$(curl -s https://api.github.com/repos/yeongpin/cursor-free-vip/releases/latest) || { echo -e "${RED}❌ Cannot get latest version information${NC}" exit 1 } VERSION=$(echo "$latest_release" | grep -o '"tag_name": ".*"' | cut -d'"' -f4 | tr -d 'v') if [ -z "$VERSION" ]; then echo -e "${RED}❌ Failed to parse version from GitHub API response:\n${latest_release}" exit 1 fi echo -e "${GREEN}✅ Found latest version: ${VERSION}${NC}" } # Detect system type and architecture detect_os() { if [[ "$(uname)" == "Darwin" ]]; then # Detect macOS architecture ARCH=$(uname -m) if [[ "$ARCH" == "arm64" ]]; then OS="mac_arm64" echo -e "${CYAN}ℹ️ Detected macOS ARM64 architecture${NC}" else OS="mac_intel" echo -e "${CYAN}ℹ️ Detected macOS Intel architecture${NC}" fi elif [[ "$(uname)" == "Linux" ]]; then # Detect Linux architecture ARCH=$(uname -m) if [[ "$ARCH" == "aarch64" || "$ARCH" == "arm64" ]]; then OS="linux_arm64" echo -e "${CYAN}ℹ️ Detected Linux ARM64 architecture${NC}" else OS="linux_x64" echo -e "${CYAN}ℹ️ Detected Linux x64 architecture${NC}" fi else # Assume Windows OS="windows" echo -e "${CYAN}ℹ️ Detected Windows system${NC}" fi } # Install and download install_cursor_free_vip() { local downloads_dir=$(get_downloads_dir) local binary_name="CursorFreeVIP_${VERSION}_${OS}" local binary_path="${downloads_dir}/${binary_name}" local download_url="https://github.com/yeongpin/cursor-free-vip/releases/download/v${VERSION}/${binary_name}" # Check if file already exists if [ -f "${binary_path}" ]; then echo -e "${GREEN}✅ Found existing installation file${NC}" echo -e "${CYAN}ℹ️ Location: ${binary_path}${NC}" # Check if running as root if [ "$EUID" -ne 0 ]; then echo -e "${YELLOW}⚠️ Requesting administrator privileges...${NC}" if command -v sudo >/dev/null 2>&1; then echo -e "${CYAN}ℹ️ Starting program with sudo...${NC}" sudo chmod +x "${binary_path}" sudo "${binary_path}" else echo -e "${YELLOW}⚠️ sudo not found, trying to run normally...${NC}" chmod +x "${binary_path}" "${binary_path}" fi else # Already running as root echo -e "${CYAN}ℹ️ Already running as root, starting program...${NC}" chmod +x "${binary_path}" "${binary_path}" fi return fi echo -e "${CYAN}ℹ️ No existing installation file found, starting download...${NC}" echo -e "${CYAN}ℹ️ Downloading to ${downloads_dir}...${NC}" echo -e "${CYAN}ℹ️ Download link: ${download_url}${NC}" # Check if file exists if curl --output /dev/null --silent --head --fail "$download_url"; then echo -e "${GREEN}✅ File exists, starting download...${NC}" else echo -e "${RED}❌ Download link does not exist: ${download_url}${NC}" echo -e "${YELLOW}⚠️ Trying without architecture...${NC}" # Try without architecture if [[ "$OS" == "mac_arm64" || "$OS" == "mac_intel" ]]; then OS="mac" binary_name="CursorFreeVIP_${VERSION}_${OS}" download_url="https://github.com/yeongpin/cursor-free-vip/releases/download/v${VERSION}/${binary_name}" echo -e "${CYAN}ℹ️ New download link: ${download_url}${NC}" if ! curl --output /dev/null --silent --head --fail "$download_url"; then echo -e "${RED}❌ New download link does not exist${NC}" exit 1 fi elif [[ "$OS" == "linux_x64" || "$OS" == "linux_arm64" ]]; then OS="linux" binary_name="CursorFreeVIP_${VERSION}_${OS}" download_url="https://github.com/yeongpin/cursor-free-vip/releases/download/v${VERSION}/${binary_name}" echo -e "${CYAN}ℹ️ New download link: ${download_url}${NC}" if ! curl --output /dev/null --silent --head --fail "$download_url"; then echo -e "${RED}❌ New download link does not exist${NC}" exit 1 fi else exit 1 fi fi # Download file if ! curl -L -o "${binary_path}" "$download_url"; then echo -e "${RED}❌ Download failed${NC}" exit 1 fi # Check downloaded file size local file_size=$(stat -f%z "${binary_path}" 2>/dev/null || stat -c%s "${binary_path}" 2>/dev/null) echo -e "${CYAN}ℹ️ Downloaded file size: ${file_size} bytes${NC}" # If file is too small, it might be an error message if [ "$file_size" -lt 1000 ]; then echo -e "${YELLOW}⚠️ Warning: Downloaded file is too small, possibly not a valid executable file${NC}" echo -e "${YELLOW}⚠️ File content:${NC}" cat "${binary_path}" echo "" echo -e "${RED}❌ Download failed, please check version and operating system${NC}" exit 1 fi echo -e "${CYAN}ℹ️ Setting executable permissions...${NC}" if chmod +x "${binary_path}"; then echo -e "${GREEN}✅ Installation completed!${NC}" echo -e "${CYAN}ℹ️ Program downloaded to: ${binary_path}${NC}" echo -e "${CYAN}ℹ️ Starting program...${NC}" # Run program directly "${binary_path}" else echo -e "${RED}❌ Installation failed${NC}" exit 1 fi } # Main program main() { print_logo get_latest_version detect_os install_cursor_free_vip } # Run main program main 下载的shell脚本是这样的,给我说一下这个脚本会做些什么

#!/bin/bash echo 1 > /proc/sys/net/ipv4/ip_forward lan1=enp8s0f0 lan2=enp8s0f1 lan3=enp4s0 lan4=enp5s0 lan5=enp6s0 lan6=enp7s0 lan7=enp10s0 lan8=enp11s0 lan9=enp5s0f2 lan10=enp5s0f3 lan11=enp7s0f2 lan12=enp7s0f3 lan13=enp8s0f0 lan14=enp8s0f1 lan15=enp8s0f2 lan16=enp8s0f3 ip netns add ns1 ip link set $lan1 netns ns1 ip netns exec ns1 ifconfig $lan1 1.1.1.1/24 ip netns add ns2 ip link set $lan2 netns ns2 ip netns exec ns2 ifconfig $lan2 1.1.1.2/24 ip netns add ns3 ip link set $lan3 netns ns3 ip netns exec ns3 ifconfig $lan3 2.2.2.1/24 ip netns add ns4 ip link set $lan4 netns ns4 ip netns exec ns4 ifconfig $lan4 2.2.2.2/24 ip netns add ns5 ip link set $lan5 netns ns5 ip netns exec ns5 ifconfig $lan5 3.3.3.1/24 ip netns add ns6 ip link set $lan6 netns ns6 ip netns exec ns6 ifconfig $lan6 3.3.3.2/24 ip netns add ns7 ip link set $lan7 netns ns7 ip netns exec ns7 ifconfig $lan7 4.4.4.1/24 ip netns add ns8 ip link set $lan8 netns ns8 ip netns exec ns8 ifconfig $lan8 4.4.4.2/24 ip netns add ns9 ip link set $lan9 netns ns9 ip netns exec ns9 ifconfig $lan9 5.5.5.1/24 ip netns add ns10 ip link set $lan10 netns ns10 ip netns exec ns10 ifconfig $lan10 5.5.5.2/24 ip netns add ns11 ip link set $lan11 netns ns11 ip netns exec ns11 ifconfig $lan11 6.6.6.1/24 ip netns add ns12 ip link set $lan12 netns ns12 ip netns exec ns12 ifconfig $lan12 6.6.6.2/24 ip netns add ns13 ip link set $lan13 netns ns13 ip netns exec ns13 ifconfig $lan13 7.7.7.1/24 ip netns add ns14 ip link set $lan14 netns ns14 ip netns exec ns14 ifconfig $lan14 7.7.7.2/24 ip netns add ns15 ip link set $lan15 netns ns15 ip netns exec ns15 ifconfig $lan15 8.8.8.1/24 ip netns add ns16 ip link set $lan16 netns ns16 ip netns exec ns16 ifconfig $lan16 8.8.8.2/24 echo -e "\033[1;33;40m Test network interface function \033[0m" sleep 5 ip netns exec ns1 iperf3 -s -D -p 1111 & sleep 1 ip netns exec ns2 iperf3 -c 1.1.1.1 -p 1111 -i 1 -t 5 -A 1,7 ip netns exec ns3 iperf3 -s -D -p 2222 & sleep 1 ip netns exec ns4 iperf3 -c 2.2.2.1 -p 2222 -i 1 -t 5 -R > br2.log & ip netns exec ns5 iperf3 -s -D -p 3333 & sleep 1 ip netns exec ns6 iperf3 -c 3.3.3.1 -p 3333 -i 1 -t 5 -R > br3.log & ip netns exec ns7 iperf3 -s -D -p 4444 & sleep 1 ip netns exec ns8 iperf3 -c 4.4.4.1 -p 4444 -i 1 -t 5 -R > br4.log & ip netns exec ns9 iperf3 -s -D -p 5555 & sleep 1 ip netns exec ns10 iperf3 -c 5.5.5.1 -p 5555 -i 1 -t 5 -R > br5.log & ip netns exec ns11 iperf3 -s -D -p 6666 & sleep 1 ip netns exec ns12 iperf3 -c 6.6.6.1 -p 6666 -i 1 -t 5 -R > br6.log & ip netns exec ns13 iperf3 -s -D -p 7777 sleep 1 ip netns exec ns14 iperf3 -c 7.7.7.1 -p 7777 -i 1 -t 5 -R > br7.log & ip netns exec ns15 iperf3 -s -D -p 8888 sleep 1 ip netns exec ns16 iperf3 -c 8.8.8.1 -p 8888 -i 1 -t 5 -R > br8.log & sleep 10 echo -e "\033[1;33;40m Number 1 network port \033[0m" cat br1.log |grep 0.00-5.00 -B 1 echo -e "\033[1;33;40m Number 2 network port \033[0m" cat br2.log |grep 0.00-5.00 -B 1 echo -e "\033[1;33;40m Number 3 network port \033[0m" cat br3.log |grep 0.00-5.00 -B 1 echo -e "\033[1;33;40m Number 4 network port \033[0m" cat br4.log |grep 0.00-5.00 -B 1 echo -e "\033[1;33;40m Number 5 network port \033[0m" cat br5.log |grep 0.00-5.00 -B 1 echo -e "\033[1;33;40m Number 6 network port \033[0m" cat br6.log |grep 0.00-5.00 -B 1 echo -e "\033[1;33;40m Number 7 network port \033[0m" cat br7.log |grep 0.00-5.00 -B 1 echo -e "\033[1;33;40m Number 8 network port \033[0m" cat br8.log |grep 0.00-5.00 -B 1 read -n1 -p "Press any key to continue........."注释代码,详细一下,并且进行优化

#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Computer Camera Cuboid Detection System - Traditional Algorithm Approach Features: 1. 支持红、蓝、绿三种颜色物块识别 2. 优化红色识别,减少橙色干扰 3. 增大了最大轮廓识别面积 4. 实时桌面预览和性能统计 """ import cv2 import time import numpy as np import sys import logging # Configure logging system logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler('cuboid_detection.log') ] ) logger = logging.getLogger(__name__) class CuboidDetectionSystem: def __init__(self, camera_index=0): # Camera configuration self.camera_index = camera_index self.cap = None # Detection parameters self.min_area = 1000 # Minimum contour area to consider self.max_area = 150000 # Increased maximum contour area (was 50000) self.aspect_ratio_min = 0.3 # Minimum aspect ratio for rectangles self.aspect_ratio_max = 3.0 # Maximum aspect ratio for rectangles self.circularity_thresh = 0.6 # Circularity threshold for cylinders # Color detection parameters (in HSV space) # Red - optimized to reduce orange interference self.red_lower1 = np.array([0, 150, 100]) # Increased saturation and value self.red_upper1 = np.array([10, 255, 255]) self.red_lower2 = np.array([170, 150, 100]) # Increased saturation and value self.red_upper2 = np.array([180, 255, 255]) # Blue self.blue_lower = np.array([100, 120, 70]) self.blue_upper = np.array([130, 255, 255]) # Green - added for green objects self.green_lower = np.array([35, 80, 60]) # Green range self.green_upper = np.array([85, 255, 255]) # Performance tracking self.fps_history = [] self.detection_history = [] self.last_detection_time = 0 # Initialize camera self.open_camera() logger.info("Cuboid Detection System (Traditional Algorithm) initialized successfully") def open_camera(self): """Open computer's external camera""" try: self.cap = cv2.VideoCapture(self.camera_index) if not self.cap.isOpened(): # Try common alternative indices for idx in [2, 1, 0]: self.cap = cv2.VideoCapture(idx) if self.cap.isOpened(): self.camera_index = idx break if not self.cap.isOpened(): logger.error("Unable to open any camera!") return False # Set camera resolution self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) logger.info(f"Camera opened successfully at index {self.camera_index}") return True except Exception as e: logger.error(f"Camera initialization failed: {str(e)}") return False def preprocess_frame(self, frame): """Preprocess frame for contour detection""" # Convert to HSV color space hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) # Blur to reduce noise hsv = cv2.GaussianBlur(hsv, (5, 5), 0) # Create masks for red, blue and green red_mask1 = cv2.inRange(hsv, self.red_lower1, self.red_upper1) red_mask2 = cv2.inRange(hsv, self.red_lower2, self.red_upper2) red_mask = cv2.bitwise_or(red_mask1, red_mask2) blue_mask = cv2.inRange(hsv, self.blue_lower, self.blue_upper) green_mask = cv2.inRange(hsv, self.green_lower, self.green_upper) # Combine masks color_mask = cv2.bitwise_or(red_mask, blue_mask) color_mask = cv2.bitwise_or(color_mask, green_mask) # Apply morphological operations to clean up the mask kernel = np.ones((7, 7), np.uint8) color_mask = cv2.morphologyEx(color_mask, cv2.MORPH_OPEN, kernel) color_mask = cv2.morphologyEx(color_mask, cv2.MORPH_CLOSE, kernel) # Additional dilation to fill gaps color_mask = cv2.dilate(color_mask, kernel, iterations=1) return color_mask, red_mask, blue_mask, green_mask def detect_shapes(self, mask, frame): """Detect rectangular and circular shapes in the mask""" contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) detected_objects = [] for contour in contours: # Filter by area - max_area increased to 150000 area = cv2.contourArea(contour) if area < self.min_area or area > self.max_area: continue # Approximate the contour to a polygon peri = cv2.arcLength(contour, True) approx = cv2.approxPolyDP(contour, 0.04 * peri, True) # Calculate shape properties x, y, w, h = cv2.boundingRect(approx) aspect_ratio = float(w) / h circularity = 4 * np.pi * area / (peri * peri) if peri > 0 else 0 # Detect rectangles (books, boxes) if len(approx) == 4 and self.aspect_ratio_min < aspect_ratio < self.aspect_ratio_max: # Calculate the angles between consecutive edges vectors = [] for i in range(4): pt1 = approx[i][0] pt2 = approx[(i + 1) % 4][0] vectors.append(np.array(pt2) - np.array(pt1)) # Calculate angles between consecutive vectors angles = [] for i in range(4): v1 = vectors[i] v2 = vectors[(i + 1) % 4] cos_angle = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2) + 1e-5) angle = np.arccos(np.clip(cos_angle, -1, 1)) * 180 / np.pi angles.append(angle) # Check if angles are close to 90 degrees (rectangle) if all(70 < angle < 110 for angle in angles): detected_objects.append({ 'type': 'rectangle', 'contour': contour, 'approx': approx, 'center': (x + w // 2, y + h // 2), 'box': (x, y, w, h), 'area': area }) # Detect circles/cylinders elif circularity > self.circularity_thresh: detected_objects.append({ 'type': 'cylinder', 'contour': contour, 'approx': approx, 'center': (x + w // 2, y + h // 2), 'box': (x, y, w, h), 'area': area }) return detected_objects def detect_colors(self, frame, detected_objects, red_mask, blue_mask, green_mask): """Determine the color of detected objects""" results = [] for obj in detected_objects: x, y, w, h = obj['box'] # Create mask for the object region obj_mask = np.zeros(frame.shape[:2], dtype=np.uint8) cv2.drawContours(obj_mask, [obj['contour']], -1, 255, -1) # Extract the object region from color masks obj_red = cv2.bitwise_and(red_mask, red_mask, mask=obj_mask) obj_blue = cv2.bitwise_and(blue_mask, blue_mask, mask=obj_mask) obj_green = cv2.bitwise_and(green_mask, green_mask, mask=obj_mask) # Count red, blue and green pixels in the object region red_pixels = cv2.countNonZero(obj_red) blue_pixels = cv2.countNonZero(obj_blue) green_pixels = cv2.countNonZero(obj_green) total_pixels = cv2.countNonZero(obj_mask) # Determine dominant color color = "unknown" if total_pixels > 0: red_ratio = red_pixels / total_pixels blue_ratio = blue_pixels / total_pixels green_ratio = green_pixels / total_pixels # Require at least 40% dominance for color classification if red_ratio > 0.4 and red_ratio > blue_ratio and red_ratio > green_ratio: color = "red" elif blue_ratio > 0.4 and blue_ratio > red_ratio and blue_ratio > green_ratio: color = "blue" elif green_ratio > 0.4 and green_ratio > red_ratio and green_ratio > blue_ratio: color = "green" # Add to results results.append({ 'type': obj['type'], 'color': color, 'center': obj['center'], 'box': obj['box'], 'contour': obj['contour'], 'timestamp': time.time() }) return results def detect_cuboids(self, frame): """Detect cuboid objects using traditional computer vision techniques""" # Step 1: Preprocess frame to create color mask color_mask, red_mask, blue_mask, green_mask = self.preprocess_frame(frame) # Step 2: Detect shapes detected_objects = self.detect_shapes(color_mask, frame) # Step 3: Detect colors of the shapes results = self.detect_colors(frame, detected_objects, red_mask, blue_mask, green_mask) return results, color_mask def run(self): """Main loop for desktop preview and detection""" logger.info("Starting main detection loop") window_name = "Cuboid Detection (Traditional Algorithm)" cv2.namedWindow(window_name, cv2.WINDOW_NORMAL) cv2.resizeWindow(window_name, 1200, 800) # Create mask window cv2.namedWindow("Color Mask", cv2.WINDOW_NORMAL) cv2.resizeWindow("Color Mask", 600, 400) frame_count = 0 start_time = time.time() try: while True: # Start timer for FPS calculation frame_start = time.time() # Get frame from camera ret, frame = self.cap.read() if not ret: logger.warning("Failed to capture frame") time.sleep(0.1) continue frame_count += 1 # Detect cuboids results, color_mask = self.detect_cuboids(frame) # Draw results on frame for result in results: color_type = result['color'] obj_type = result['type'] x, y, w, h = result['box'] center_x, center_y = result['center'] # Determine drawing color based on detected color if color_type == "red": color = (0, 0, 255) # Red in BGR elif color_type == "blue": color = (255, 0, 0) # Blue in BGR elif color_type == "green": color = (0, 255, 0) # Green in BGR else: color = (0, 255, 255) # Yellow for unknown # Draw bounding box cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2) # Draw contour cv2.drawContours(frame, [result['contour']], -1, color, 2) # Draw center point cv2.circle(frame, (center_x, center_y), 5, color, -1) # Draw label label = f"{color_type} {obj_type}" cv2.putText(frame, label, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2) # Calculate FPS current_time = time.time() elapsed_time = current_time - start_time fps = frame_count / elapsed_time if elapsed_time > 0 else 0 # Update FPS history (keep last 30 values) self.fps_history.append(fps) if len(self.fps_history) > 30: self.fps_history.pop(0) avg_fps = sum(self.fps_history) / len(self.fps_history) if self.fps_history else 0 # Update detection history detection_status = 1 if results else 0 self.detection_history.append(detection_status) if len(self.detection_history) > 30: self.detection_history.pop(0) detection_rate = sum(self.detection_history) / len(self.detection_history) * 100 # Display performance stats stats_y = 30 cv2.putText(frame, f"FPS: {avg_fps:.1f}", (10, stats_y), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) cv2.putText(frame, f"Detection Rate: {detection_rate:.1f}%", (10, stats_y + 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) cv2.putText(frame, f"Objects: {len(results)}", (10, stats_y + 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 200, 255), 2) # Display algorithm info cv2.putText(frame, "Algorithm: Traditional CV (Contour + Color Analysis)", (10, frame.shape[0] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200, 200, 200), 1) # Show frames cv2.imshow(window_name, frame) cv2.imshow("Color Mask", color_mask) # Process keyboard input key = cv2.waitKey(1) & 0xFF if key == ord('q') or key == 27: # 'q' or ESC break elif key == ord('c'): # Capture screenshot timestamp = time.strftime("%Y%m%d-%H%M%S") filename = f"capture_{timestamp}.png" cv2.imwrite(filename, frame) logger.info(f"Screenshot saved as {filename}") elif key == ord('r'): # Reset detection history self.detection_history = [] self.fps_history = [] start_time = time.time() frame_count = 0 logger.info("Reset performance counters") elif key == ord('d'): # Toggle debug mode cv2.imshow("Red Mask", self.red_mask) cv2.imshow("Green Mask", self.green_mask) cv2.imshow("Blue Mask", self.blue_mask) logger.info("Debug masks displayed") # Calculate processing time frame_time = time.time() - frame_start target_frame_time = 1 / 30 # Target 30 FPS if frame_time < target_frame_time: time.sleep(target_frame_time - frame_time) except Exception as e: logger.error(f"Main loop exception: {str(e)}", exc_info=True) finally: self.cleanup() def cleanup(self): """Clean up resources""" # Release camera if self.cap and self.cap.isOpened(): self.cap.release() logger.info("Camera resources released") # Close windows cv2.destroyAllWindows() logger.info("Program exited safely") def print_controls(): """Print program controls""" print("=" * 70) print("Computer Camera Cuboid Detection System - Traditional Algorithm") print("=" * 70) print("Detects cuboid objects (books, boxes, etc.) using computer vision techniques") print("Supports RED, BLUE and GREEN objects") print("Desktop window shows real-time preview with bounding boxes") print("Second window shows the color mask used for detection") print("=" * 70) print("Controls:") print(" q or ESC - Quit program") print(" c - Capture screenshot") print(" r - Reset performance counters") print(" d - Show debug masks (red, green, blue)") print("=" * 70) print("Detection parameters:") print(" - Red, blue and green color detection in HSV space") print(" - Rectangle detection based on polygon approximation and angle analysis") print(" - Cylinder detection based on circularity metric") print(f" - Max contour area: 150000 (was 50000)") print("=" * 70) if __name__ == "__main__": print_controls() # Default camera index (0 is usually the built-in or first external camera) camera_index = 0 # Allow specifying camera index from command line if len(sys.argv) > 1: try: camera_index = int(sys.argv[1]) print(f"Using camera index: {camera_index}") except ValueError: print(f"Invalid camera index: {sys.argv[1]}, using default (0)") try: detector = CuboidDetectionSystem(camera_index=camera_index) detector.run() except Exception as e: logger.error(f"System startup failed: {str(e)}", exc_info=True) print(f"System startup failed: {str(e)}") 现在就只是简单的将这个桌面运行的代码,完整的转化为在vnc上运行的代码就可以了,因为这个代码的效果是很不错的,我现在只想看看它在vnc上运行时机器狗的识别效果如何!!!! 给出完整的转换代码!!!

#!/usr/bin/env python # coding=utf-8 import rospy import cv2 import cv_bridge import numpy import os # 用于运行外部程序 import time # 用于计时功能 import subprocess # 用于管理外部进程 from geometry_msgs.msg import Twist from sensor_msgs.msg import Image import math class Follower: def __init__(self): # 初始化ROS相关组件 self.bridge = cv_bridge.CvBridge() self.image_sub = rospy.Subscriber("/usb_cam/image_raw", Image, self.image_callback) self.cmd_vel_pub = rospy.Publisher("/cmd_vel", Twist, queue_size=1) self.twist = Twist() # 计时相关变量 self.primary_start_time = time.time() # 记录主开始时间 self.secondary_start_time = None # 记录恢复后的开始时间 self.third_start_time = None # 新增:记录renqunos1启动后的时间戳 self.elapsed_primary_time = 0.0 # 主经过时间 self.elapsed_secondary_time = 0.0 # 恢复后经过时间 self.elapsed_third_time = 0.0 # 新增:renqunos1启动后的经过时间 self.final_secondary_time = 0.0 # 记录停止时的二次时间 self.timer_stopped = False # 标记计时器是否已停止 self.secondary_phase = 0 # 二次计时阶段:0-初始阶段 1-fangos后阶段 # 状态标记变量 self.programs_executed = False # 标记初始外部程序是否已执行 self.loufangos_executed = False # 标记loufangos.py是否已执行 self.fangos_executed = False # 标记fangos.py是否已执行 self.renqunos_executed = False # 标记renqunos.py是否已执行 self.renqunos2_executed = False # 新增:标记renqunos2.py是否已执行 self.paused_primary_time = 0.0 # 记录暂停时已过去的主时间 self.navigation_restored = False # 标记是否已恢复巡线功能 # 控制参数整合 self.pid_params = { 'Kp': 0.082, 'Ki': 0.005, 'Kd': 0.05, 'output': 0.0, 'error': 0.0, 'last_error': 0.0, 'integral': 0.0, 'integral_limit': 50.0 } self.angular_params = { 'max': 0.8, 'min': -0.5, 'rate_limit': 0.3, 'last': 0.0, 'smoothing': 0.3, 'current': 0.0 } # 状态与检测参数 self.line_mode = "LINE_FOLLOWING" self.last_valid_line = "NONE" self.turn_threshold = 5.0 self.line_angle = 0.0 self.is_turning = False self.min_line_area = 1000 self.max_line_area = 40000 # 最大轨迹面积阈值 self.line_area = 0 # 运动参数 self.speeds = { 'no_line_linear': 0.16, 'no_line_angular': 0.0, 'turning_linear': 0.16, 'normal_linear': 0.16, 'straight_linear': 0.2 # 直行模式线速度 } # 可视化样式参数 self.colors = { 'panel': (40, 40, 40), 'text': (255, 255, 255), 'highlight': (0, 255, 255), 'border': (0, 255, 0), 'roi': (255, 165, 0), 'target_line': (0, 255, 0), 'marker': (0, 0, 255), # 红色标记点 (B, G, R) 'marker_border': (255, 255, 255), 'dashboard_bg': (30, 30, 30), 'dashboard_border': (100, 100, 100), 'speed': (0, 255, 0), 'angular': (0, 0, 255), 'tick': (200, 200, 200), 'label': (255, 255, 255), 'needle_head': (255, 255, 0) } self.style = { 'panel_alpha': 0.7, 'dashboard_alpha': 0.8, 'padding': 8, 'font': cv2.FONT_HERSHEY_SIMPLEX, 'font_scale': 0.6, 'watermark_scale': 0.8, 'watermark_thickness': 2, 'line_type': cv2.LINE_AA if hasattr(cv2, 'LINE_AA') else 1 } # 存储需要管理的ROS launch进程标识 self.launch_identifiers = [ "easy_handeye publish.launch", "apriltag_ros continuous_detection.launch", "mirobot_moveit_config mirobot.launch", "jxblaunch程序", "mirobot.launch" ] def reset_pid(self): """重置PID控制器参数""" pid = self.pid_params pid['output'] = pid['error'] = pid['last_error'] = pid['integral'] = 0.0 self.angular_params['last'] = 0.0 def smooth_angular(self, target_angular): """平滑角速度输出""" ap = self.angular_params delta = target_angular - ap['last'] # 限制角速度变化率 if delta > ap['rate_limit']: target_angular = ap['last'] + ap['rate_limit'] elif delta < -ap['rate_limit']: target_angular = ap['last'] - ap['rate_limit'] # 平滑处理 smoothed = (1 - ap['smoothing']) * target_angular + ap['smoothing'] * ap['last'] smoothed = max(min(smoothed, ap['max']), ap['min']) ap['last'] = ap['current'] = smoothed return smoothed def calculate_line_angle(self, mask): """计算线条相对水平线的角度""" contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if not contours: return 0.0 max_contour = max(contours, key=cv2.contourArea) rect = cv2.minAreaRect(max_contour) angle = rect[2] # 角度标准化 if angle < -45: angle += 90 elif angle > 45: angle -= 90 return abs(angle) def draw_transparent_panel(self, image, x, y, width, height): """绘制半透明面板""" overlay = image.copy() cv2.rectangle(overlay, (x, y), (x + width, y + height), self.colors['panel'], -1) cv2.addWeighted(overlay, self.style['panel_alpha'], image, 1 - self.style['panel_alpha'], 0, image) cv2.rectangle(image, (x, y), (x + width, y + height), self.colors['border'], 1, self.style['line_type']) def draw_dashboard(self, image): """绘制带指针的仪表盘""" h, w = image.shape[:2] speed_center = (w//4, h - 100) angular_center = (3*w//4, h - 100) radius = 50 overlay = image.copy() # 绘制速度表盘 self.draw_gauge(overlay, speed_center, radius, "SPEED", 0, 0.4, self.twist.linear.x, self.colors['speed'], -150, -30) # 绘制角速度表盘 self.draw_gauge(overlay, angular_center, radius, "ANGULAR", -0.8, 0.8, self.angular_params['current'], self.colors['angular'], 30, 150) cv2.addWeighted(overlay, self.style['dashboard_alpha'], image, 1 - self.style['dashboard_alpha'], 0, image) def draw_gauge(self, overlay, center, radius, label, min_val, max_val, current_val, color, start_angle, end_angle): """绘制单个指针式仪表""" # 绘制表盘背景 cv2.circle(overlay, center, radius, self.colors['dashboard_bg'], -1) cv2.circle(overlay, center, radius, self.colors['dashboard_border'], 2, self.style['line_type']) # 绘制刻度 for i in range(7): # 6+1个刻度 angle = start_angle + (end_angle - start_angle) * i / 6 rad = math.radians(angle) tick_start = ( center[0] + (radius - 8) * math.cos(rad), center[1] + (radius - 8) * math.sin(rad) ) tick_end = ( center[0] + radius * math.cos(rad), center[1] + radius * math.sin(rad) ) cv2.line(overlay, (int(tick_start[0]), int(tick_start[1])), (int(tick_end[0]), int(tick_end[1])), self.colors['tick'], 2, self.style['line_type']) # 计算指针角度 val_range = max_val - min_val normalized = 0 if val_range == 0 else max(0, min(1, (current_val - min_val) / val_range)) pointer_angle = start_angle + (end_angle - start_angle) * normalized pointer_rad = math.radians(pointer_angle) # 绘制指针 pointer_end = ( center[0] + (radius - 10) * math.cos(pointer_rad), center[1] + (radius - 10) * math.sin(pointer_rad) ) cv2.line(overlay, center, (int(pointer_end[0]), int(pointer_end[1])), color, 3, self.style['line_type']) cv2.circle(overlay, (int(pointer_end[0]), int(pointer_end[1])), 5, self.colors['needle_head'], -1, self.style['line_type']) cv2.circle(overlay, center, 4, (255, 255, 255), -1, self.style['line_type']) # 绘制标签和数值 cv2.putText(overlay, label, (center[0] - 30, center[1] + radius + 15), self.style['font'], 0.5, self.colors['label'], 1, self.style['line_type']) cv2.putText(overlay, "%.2f" % current_val, (center[0] - 20, center[1] - radius - 10), self.style['font'], 0.45, self.colors['label'], 1, self.style['line_type']) def draw_watermark(self, image): """绘制水印""" h, w = image.shape[:2] text = "GiveMeFive" text_size = cv2.getTextSize(text, self.style['font'], self.style['watermark_scale'], self.style['watermark_thickness'])[0] text_pos = ((w - text_size[0]) // 2, h - 20) # 绘制带边框文本 cv2.putText(image, text, text_pos, self.style['font'], self.style['watermark_scale'], (0, 0, 0), 3, self.style['line_type']) cv2.putText(image, text, text_pos, self.style['font'], self.style['watermark_scale'], self.colors['text'], self.style['watermark_thickness'], self.style['line_type']) def draw_roi_border(self, image, left, right, top, bottom): """绘制ROI虚线边框""" line_len, gap_len = 10, 5 # 顶部边框 x = left while x < right: cv2.line(image, (x, top), (min(x + line_len, right), top), self.colors['roi'], 2, self.style['line_type']) x += line_len + gap_len # 底部边框 x = left while x < right: cv2.line(image, (x, bottom), (min(x + line_len, right), bottom), self.colors['roi'], 2, self.style['line_type']) x += line_len + gap_len # 左侧边框 y = top while y < bottom: cv2.line(image, (left, y), (left, min(y + line_len, bottom)), self.colors['roi'], 2, self.style['line_type']) y += line_len + gap_len # 右侧边框 y = top while y < bottom: cv2.line(image, (right, y), (right, min(y + line_len, bottom)), self.colors['roi'], 2, self.style['line_type']) y += line_len + gap_len def kill_ros_launch(self, launch_name): """关闭指定的ROS launch进程""" try: os.system("pkill -f '{}'".format(launch_name)) rospy.loginfo("已关闭ROS launch: {}".format(launch_name)) except Exception as e: rospy.logerr("关闭ROS launch {} 失败: {}".format(launch_name, str(e))) def execute_initial_programs(self): """执行初始外部程序并关闭相关launch""" rospy.loginfo("到达18.3秒,准备执行初始外部程序...") # 停止机器人运动 self.twist.linear.x = 0.0 self.twist.angular.z = 0.0 self.cmd_vel_pub.publish(self.twist) rospy.loginfo("机器人已停止") # 运行初始外部程序 rospy.loginfo("开始运行jxbos.py...") os.system("python /home/eaibot/rt_ws/src/shibie/jxbos.py") rospy.loginfo("开始运行5.py...") os.system("python /home/eaibot/rt_ws/src/shibie/jxb/5.py") rospy.loginfo("初始外部程序执行完毕,准备关闭ROS launch并恢复巡线功能") # 关闭所有指定的ROS launch文件 for launch in self.launch_identifiers: self.kill_ros_launch(launch) # 记录暂停时间并初始化二次计时器(第一阶段) self.paused_primary_time = self.elapsed_primary_time self.secondary_start_time = time.time() self.secondary_phase = 0 # 标记为初始二次计时阶段 rospy.loginfo("ROS launch已关闭,巡线功能已恢复") self.reset_pid() def execute_loufang_program(self): """执行loufangos.py程序(不停车,保持巡线)""" rospy.loginfo("二次计时达到10秒,启动loufangos1.py程序(保持巡线)...") # 启动程序但不停止机器人运动 os.system("python /home/eaibot/rt_ws/src/shibie/loufangos1.py &") # 后台运行以不阻塞巡线 self.loufangos_executed = True rospy.loginfo("loufangos.py程序启动完成,继续巡线...") def execute_fang_program(self): """执行fangos.py程序(停车)""" rospy.loginfo("二次计时达到13秒,停止小车并启动fangos.py程序...") # 停止小车运动 self.twist.linear.x = 0.0 self.twist.angular.z = 0.0 self.cmd_vel_pub.publish(self.twist) # 执行fangos.py程序 rospy.loginfo("开始运行fangos.py...") os.system("python /home/eaibot/rt_ws/src/shibie/fangos.py") # 程序执行完毕后关闭launch并恢复巡线 for launch in self.launch_identifiers: self.kill_ros_launch(launch) self.fangos_executed = True self.navigation_restored = True self.reset_pid() # 重置二次计时器,进入第二阶段计时 self.timer_stopped = False # 重新启动计时器 self.secondary_phase = 1 # 标记为fangos后的二次计时阶段 self.secondary_start_time = time.time() # 重置计时起点 self.elapsed_secondary_time = 0.0 # 重置已过时间 rospy.loginfo("fangos.py程序执行完毕,已重置二次计时器并恢复巡线功能") def execute_renqu_program(self): """执行renqunos.py程序(不停车,保持巡线)""" rospy.loginfo("fangos后二次计时达到8秒,启动renqunos1.py程序(保持巡线)...") # 不停止小车,保持当前运动状态 # 执行renqunos.py程序(后台运行不阻塞巡线) rospy.loginfo("开始运行renqunos1.py...") os.system("python /home/eaibot/rt_ws/src/shibie/renqunos1.py &") # 使用&后台运行 # 新增:初始化第三个时间戳 self.third_start_time = time.time() self.elapsed_third_time = 0.0 # 程序执行完毕后标记状态 self.renqunos_executed = True rospy.loginfo("renqunos1.py程序启动完成,继续巡线...") def execute_renqu2_program(self): """新增:执行renqunos2.py程序(不停车,保持巡线)""" rospy.loginfo("第三个时间戳达到18秒,启动renqunos2.py程序(保持巡线)...") # 不停止小车,保持当前运动状态 # 执行renqunos2.py程序(后台运行不阻塞巡线) rospy.loginfo("开始运行renqunos2.py...") os.system("python /home/eaibot/rt_ws/src/shibie/renqunos2.py &") # 使用&后台运行 # 程序执行完毕后标记状态 self.renqunos2_executed = True rospy.loginfo("renqunos2.py程序启动完成,继续巡线...") def image_callback(self, msg): # 计算经过的时间 if not self.programs_executed: self.elapsed_primary_time = time.time() - self.primary_start_time else: if self.secondary_start_time and not self.timer_stopped: self.elapsed_secondary_time = time.time() - self.secondary_start_time # 新增:计算第三个时间戳(renqunos1启动后) if self.third_start_time and not self.timer_stopped and self.renqunos_executed: self.elapsed_third_time = time.time() - self.third_start_time # 初始阶段:检查是否到达18.2秒执行初始程序 if not self.programs_executed and self.elapsed_primary_time >= 20.27: self.execute_initial_programs() self.programs_executed = True return # 二次计时第一阶段:检查10秒执行loufangos.py(保持巡线) if (self.programs_executed and self.secondary_phase == 0 and not self.loufangos_executed and not self.timer_stopped and self.elapsed_secondary_time >= 16): self.execute_loufang_program() # 不返回,继续执行巡线逻辑 # 二次计时第一阶段:检查13秒执行fangos.py(停车) if (self.programs_executed and self.secondary_phase == 0 and self.loufangos_executed and not self.fangos_executed and not self.timer_stopped and self.elapsed_secondary_time >= 22.8): self.execute_fang_program() return # 二次计时第二阶段(fangos后):检查8秒执行renqunos1.py if (self.programs_executed and self.secondary_phase == 1 and self.fangos_executed and not self.renqunos_executed and not self.timer_stopped and self.elapsed_secondary_time >= 8.2): self.execute_renqu_program() # 不返回,继续执行巡线逻辑 # 新增:第三个时间戳监控:检查12秒执行renqunos2.py if (self.programs_executed and self.secondary_phase == 1 and self.renqunos_executed and not self.renqunos2_executed and not self.timer_stopped and self.elapsed_third_time >= 18): self.execute_renqu2_program() # 不返回,继续执行巡线逻辑 # 图像转换与预处理 image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8') hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) # 黑色线条检测 mask = cv2.inRange(hsv, (0, 0, 0), (85, 55, 65)) mask = cv2.medianBlur(mask, 5) # 定义并应用ROI h, w, d = image.shape search = {'left': w//4-140, 'right': 3*w//4-160, 'top':0, 'bottom': h*2//3 +100} mask[0:search['top'], :] = mask[search['bottom']:h, :] = 0 mask[:, 0:search['left']] = mask[:, search['right']:w] = 0 # 计算线条面积并绘制ROI边框 self.line_area = cv2.countNonZero(mask) self.draw_roi_border(image, **search) # 线条检测与控制逻辑(始终保持巡线) if not self.timer_stopped or self.navigation_restored: pid = self.pid_params M = cv2.moments(mask) if M['m00'] > 0 and self.line_area > self.min_line_area: # 大面积轨迹处理 if self.line_area > self.max_line_area: self.line_mode = "STRAIGHT_LINE" self.twist.linear.x = self.speeds['straight_linear'] self.twist.angular.z = 0.0 self.angular_params['current'] = 0.0 self.last_valid_line = "WIDE" self.is_turning = False self.line_angle = 0.0 else: # 常规巡线模式 self.line_mode = "LINE_FOLLOWING" cx, cy = int(M['m10']/M['m00']), int(M['m01']/M['m00']) # 中心点调整 cx = cx - 200 # 绘制中心标记 cv2.circle(image, (cx, cy), 8, self.colors['marker'], -1) cv2.circle(image, (cx, cy), 9, self.colors['marker_border'], 1, self.style['line_type']) # 绘制目标线 desired_h = h*2//3 - 225 cv2.line(image, (0, desired_h), (w, desired_h), self.colors['target_line'], 2, self.style['line_type']) cv2.arrowedLine(image, (w-30, desired_h), (w, desired_h), self.colors['target_line'], 2, self.style['line_type']) cv2.arrowedLine(image, (30, desired_h), (0, desired_h), self.colors['target_line'], 2, self.style['line_type']) # 角度计算与转弯检测 self.line_angle = self.calculate_line_angle(mask) self.is_turning = self.line_angle > self.turn_threshold # PID控制计算 pid['last_error'] = pid['error'] pid['error'] = desired_h - cy pid['integral'] = max(min(pid['integral'] + pid['error'], pid['integral_limit']), -pid['integral_limit']) pid['output'] = pid['Kp']*pid['error'] + pid['Ki']*pid['integral'] + pid['Kd']*(pid['error'] - pid['last_error']) # 设置运动参数(始终保持巡线) self.twist.linear.x = self.speeds['turning_linear'] if self.is_turning else self.speeds['normal_linear'] self.smooth_angular(-pid['output'] / 15) self.twist.angular.z = self.angular_params['current'] self.last_valid_line = "FOUND" else: # 无线模式 self.line_mode = "NO_LINE" self.twist.linear.x = self.speeds['no_line_linear'] self.twist.angular.z = self.speeds['no_line_angular'] self.angular_params['current'] = self.speeds['no_line_angular'] self.last_valid_line = "LOST" if self.line_area <= self.min_line_area else "SMALL" self.is_turning = False self.line_angle = 0.0 else: # 计时停止且未恢复巡线时保持停止状态 self.twist.linear.x = 0.0 self.twist.angular.z = 0.0 self.angular_params['current'] = 0.0 self.line_mode = "STOPPED" self.last_valid_line = "STOPPED" # 时间显示 cv2.putText(image, "Primary Time: %.1fs" % self.elapsed_primary_time, (18, 130), self.style['font'], self.style['font_scale'], self.colors['text'], 1, self.style['line_type']) if self.programs_executed: secondary_time = self.final_secondary_time if self.timer_stopped else self.elapsed_secondary_time # 显示当前二次计时阶段 phase_text = " (Phase {})".format(self.secondary_phase) if self.secondary_phase > 0 else "" time_text = "Secondary Time: %.1fs%s" % (secondary_time, phase_text) color = self.colors['highlight'] if not self.timer_stopped else (0, 165, 255) cv2.putText(image, time_text, (18, 150), self.style['font'], self.style['font_scale'], color, 1, self.style['line_type']) # 新增:显示第三个时间戳 if self.renqunos_executed: third_text = "Third Time: %.1fs" % self.elapsed_third_time cv2.putText(image, third_text, (18, 170), self.style['font'], self.style['font_scale'], (255, 100, 100), 1, self.style['line_type']) # 绘制信息面板 panel_height = 220 if self.programs_executed else 140 # 增加高度以显示更多状态 panel = {'w': 220, 'h': panel_height, 'offset': 20} # 系统状态面板 self.draw_transparent_panel(image, 10, 10, panel['w'], panel['h']) cv2.putText(image, "SYSTEM STATUS", (18, 30), self.style['font'], self.style['font_scale'], self.colors['highlight'], 1, self.style['line_type']) cv2.putText(image, "Mode: %s" % self.line_mode, (18, 50), self.style['font'], self.style['font_scale'], self.colors['text'], 1, self.style['line_type']) cv2.putText(image, "Line: %s" % self.last_valid_line, (18, 70), self.style['font'], self.style['font_scale'], self.colors['text'], 1, self.style['line_type']) cv2.putText(image, "Linear: %.2f m/s" % self.twist.linear.x, (18, 90), self.style['font'], self.style['font_scale'], self.colors['text'], 1, self.style['line_type']) cv2.putText(image, "Angular: %.3f rad/s" % self.angular_params['current'], (18, 110), self.style['font'], self.style['font_scale'], self.colors['text'], 1, self.style['line_type']) # 二次阶段状态显示 if self.programs_executed: cv2.putText(image, "Loufangos: %s" % ("DONE" if self.loufangos_executed else "PENDING"), (18, 130), self.style['font'], self.style['font_scale'], self.colors['highlight'] if self.loufangos_executed else self.colors['text'], 1, self.style['line_type']) cv2.putText(image, "Fangos: %s" % ("DONE" if self.fangos_executed else "PENDING"), (18, 150), self.style['font'], self.style['font_scale'], self.colors['highlight'] if self.fangos_executed else self.colors['text'], 1, self.style['line_type']) # 显示renqunos执行状态 cv2.putText(image, "Renqunos1: %s" % ("DONE" if self.renqunos_executed else "PENDING"), (18, 170), self.style['font'], self.style['font_scale'], self.colors['highlight'] if self.renqunos_executed else self.colors['text'], 1, self.style['line_type']) # 新增:显示renqunos2执行状态 cv2.putText(image, "Renqunos2: %s" % ("DONE" if self.renqunos2_executed else "PENDING"), (18, 190), self.style['font'], self.style['font_scale'], self.colors['highlight'] if self.renqunos2_executed else self.colors['text'], 1, self.style['line_type']) cv2.putText(image, "Navigation: %s" % ("RESTORED" if self.navigation_restored else "ACTIVE"), (18, 210), self.style['font'], self.style['font_scale'], self.colors['highlight'] if self.navigation_restored else self.colors['text'], 1, self.style['line_type']) # 线条分析面板 self.draw_transparent_panel(image, w - panel['w'] - 10, 10, panel['w'], panel['h']) cv2.putText(image, "LINE ANALYSIS", (w - panel['w'] + 8, 30), self.style['font'], self.style['font_scale'], self.colors['highlight'], 1, self.style['line_type']) cv2.putText(image, "Angle: %.1f deg" % self.line_angle, (w - panel['w'] + 8, 50), self.style['font'], self.style['font_scale'], self.colors['text'], 1, self.style['line_type']) cv2.putText(image, "Turning: %s" % ("YES" if self.is_turning else "NO"), (w - panel['w'] + 8, 70), self.style['font'], self.style['font_scale'], self.colors['text'], 1, self.style['line_type']) cv2.putText(image, "Area: %d/%d px" % (self.line_area, self.min_line_area), (w - panel['w'] + 8, 90), self.style['font'], self.style['font_scale'], self.colors['text'], 1, self.style['line_type']) cv2.putText(image, "Max Area: %d" % self.max_line_area, (w - panel['w'] + 8, 110), self.style['font'], self.style['font_scale'], self.colors['text'], 1, self.style['line_type']) # 程序执行状态 status_text = "Post-Program: Active" if self.programs_executed else "Pre-Program: Active" cv2.putText(image, status_text, (w - panel['w'] + 8, 130), self.style['font'], self.style['font_scale'], self.colors['highlight'] if self.programs_executed else self.colors['text'], 1, self.style['line_type']) # 计时器状态 timer_text = "Timer: STOPPED" if self.timer_stopped else "Timer: RUNNING" cv2.putText(image, timer_text, (w - panel['w'] + 8, 150), self.style['font'], self.style['font_scale'], (0, 165, 255) if self.timer_stopped else self.colors['text'], 1, self.style['line_type']) # 显示当前二次计时阶段 if self.programs_executed: phase_status = "Phase {} Active".format(self.secondary_phase) if self.secondary_phase > 0 else "Initial Phase" cv2.putText(image, "Secondary: {}".format(phase_status), (w - panel['w'] + 8, 170), self.style['font'], self.style['font_scale'], self.colors['highlight'] if self.secondary_phase > 0 else self.colors['text'], 1, self.style['line_type']) # 新增:显示第三个计时器状态 if self.renqunos_executed: third_status = "Third Timer: Active" if not self.renqunos2_executed else "Third Timer: Completed" cv2.putText(image, "{}".format(third_status), (w - panel['w'] + 8, 190), self.style['font'], self.style['font_scale'], (255, 100, 100) if not self.renqunos2_executed else self.colors['highlight'], 1, self.style['line_type']) # 绘制仪表盘和水印 self.draw_dashboard(image) self.draw_watermark(image) # 发布指令与显示图像 self.cmd_vel_pub.publish(self.twist) cv2.imshow("Camera View", image) cv2.imshow("Mask View", mask) cv2.waitKey(3) if __name__ == "__main__": rospy.init_node("opencv_line_follower") follower = Follower() rospy.spin() 上面是第一个程序叫做xunxian20.py,用来巡线。(python2.7) 修改上下两个红绿灯识别部分不太完善的代码。然后给我完整的修复过的,按照各自版本的代码。 修改方式为,当xunxian20.py的第三个时间戳结束后启动第四个时间戳当第四个时间戳到达8秒后。打开一个关于摄像头的launch用来发布图像话题,具体语句为"roslaunch usb_cam usb_cam-test1.launch",如果此launch已经启动则跳过,当此launch启动完毕后。启动位于“/home/eaibot/rt_ws/src/shibie/light”路径下且在虚拟环境名为yolov5的虚拟环境中的lightdetect.py代码。lightdetect.py会订阅上面启动的launch发布的图像,以固定频率订阅话题,话题名为'/usb_cam1/image_raw',并且以固定频率检测这些订阅的图片。第一种情况当第一次检测的结果是green时退出lightdetect.py程序,然后退出"roslaunch usb_cam usb_cam-test1.launch"启动的launch。这一过程从启动launch到关闭launch都是一直保持巡线的,小车不会停下。第二种情况,当第一次检测的结果是yellow和red时阻塞xunxian20.py里用“roslaunch xpkg_bringup bringup_basic_ctrl.launch”发布的 '/cmd_vel'话题,也就是说依旧保持巡线但是小车静止不动,然后再次订阅并检测图片,直到检测出green后退出lightdetect,py程序,然后关闭"roslaunch usb_cam usb_cam-test1.launch"启动的launch然后停止对'/cmd_vel'话题的阻塞。也就是说整个过程小车一直保持巡线唯一一种停下的情况是检测出red和yellow时阻塞“/cmd_vel"的接收使小车暂时停止。 下面是第二个程序叫做lightdetect.py,用来对红绿灯做出检测和判断。(python3) #!/usr/bin/python # -*- coding: utf-8 -*- import argparse import os import platform import sys from pathlib import Path import numpy as np import yaml import torch import cv2 import csv from ultralytics.utils.plotting import Annotator, colors # ===== 处理pathlib跨平台路径问题 ===== import pathlib plt = platform.system() if plt != 'Windows': pathlib.WindowsPath = pathlib.PosixPath # 再次强化路径兼容性 if platform.system() != "Windows": from pathlib import PosixPath Path.WindowsPath = PosixPath # ROS相关导入 import rospy from sensor_msgs.msg import Image from geometry_msgs.msg import Twist # 自定义CvBridge解决兼容性问题 class CvBridge: def imgmsg_to_cv2(self, img_msg, encoding): if encoding == "bgr8": if img_msg.encoding not in ["bgr8", "rgb8"]: raise Exception(f"图像编码不匹配: 期望bgr8或rgb8, 实际{img_msg.encoding}") dtype = np.uint8 image = np.frombuffer(img_msg.data, dtype=dtype).reshape( img_msg.height, img_msg.width, 3 ) if img_msg.encoding == "rgb8": image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) return image else: raise Exception(f"不支持的编码格式: {encoding}") def cv2_to_imgmsg(self, cv_image, encoding="bgr8"): img_msg = Image() img_msg.height = cv_image.shape[0] img_msg.width = cv_image.shape[1] img_msg.encoding = encoding img_msg.is_bigendian = 0 img_msg.data = cv_image.tobytes() img_msg.step = img_msg.width * 3 return img_msg # 添加YOLOv5根目录到系统路径 current_script_path = os.path.abspath(__file__) current_dir = os.path.dirname(current_script_path) yolov5_root = os.path.dirname(current_dir) sys.path.insert(0, yolov5_root) # 打印路径信息用于调试 print(f"当前脚本路径: {current_script_path}") print(f"YOLOv5根目录: {yolov5_root}") print(f"当前工作目录: {os.getcwd()}") print("系统路径:") for p in sys.path: print(f"- {p}") # 导入YOLOv5模块 from models.common import DetectMultiBackend from utils.general import ( check_file, check_img_size, check_imshow, non_max_suppression, scale_boxes, xyxy2xywh, increment_path ) from utils.torch_utils import select_device # ROS相关全局变量 bridge = CvBridge() cmd_vel_pub = None image_received = False # 全局变量 latest_image = None # 全局变量 # 交通灯颜色与指示映射(确保与模型类别名称一致) TRAFFIC_LIGHT_INDICATION = { 'red': '停车', 'yellow': '减速', 'green': '通行' } # 路径处理工具函数 def convert_windows_path_to_posix(obj): """递归将WindowsPath对象转换为PosixPath对象""" if isinstance(obj, dict): return {k: convert_windows_path_to_posix(v) for k, v in obj.items()} elif isinstance(obj, list): return [convert_windows_path_to_posix(v) for v in obj] elif isinstance(obj, Path): return Path(str(obj)) else: return obj # 模型加载函数 def load_model_safely(weights_path, data_path, device): """安全加载模型,处理跨平台路径问题""" print(f"尝试安全加载模型: {weights_path}") try: model = DetectMultiBackend(weights_path, device=device, data=data_path) stride = model.stride names = model.names print(f"成功安全加载模型,步长: {stride}, 类别: {names}") # 打印模型实际类别名称 return model except Exception as e: print(f"安全加载模型失败: {e}") try: checkpoint = torch.load(weights_path, map_location=device) checkpoint = convert_windows_path_to_posix(checkpoint) if isinstance(checkpoint, dict) and 'model' in checkpoint: model = checkpoint['model'].float().fuse().eval() else: model = checkpoint.float().fuse().eval() with open(data_path, 'r') as f: data_dict = yaml.safe_load(f) if 'names' in data_dict: model.names = data_dict['names'] print(f"模型类别名称: {model.names}") # 打印模型实际类别名称 stride = model.stride.max() if hasattr(model, 'stride') else 32 print(f"成功使用激进方法加载模型,步长: {stride}") return model except Exception as e2: print(f"激进方法加载模型失败: {e2}") return None # ROS图像回调函数 def image_callback(msg): global latest_image, image_received try: latest_image = bridge.imgmsg_to_cv2(msg, "bgr8") image_received = True except Exception as e: print(f"图像转换错误: {e}") def run( weights='best.pt', data='light.yaml', imgsz=640, conf_thres=0.3, # 降低置信度阈值,提高黄灯检测灵敏度 iou_thres=0.45, max_det=1000, device='', view_img=False, save_txt=False, save_crop=False, nosave=False, classes=None, line_thickness=18, ): global cmd_vel_pub, image_received, latest_image # 初始化ROS节点 rospy.init_node('traffic_light_detector', anonymous=True) cmd_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10) print("已创建/cmd_vel发布器") # 订阅摄像头图像话题 rospy.Subscriber('/usb_cam1/image_raw', Image, image_callback) print("已订阅 /usb_cam1/image_raw 话题") # 加载模型 device = select_device(device) weights_path = os.path.join(current_dir, weights) data_path = os.path.join(current_dir, data) print(f"尝试加载模型: {weights_path}") print(f"尝试加载数据配置: {data_path}") # 检查文件存在性 if not os.path.exists(weights_path): print(f"错误: 权重文件不存在: {weights_path}") print(f"当前目录内容: {os.listdir(current_dir)}") sys.exit(1) if not os.path.exists(data_path): print(f"错误: 数据配置文件不存在: {data_path}") sys.exit(1) # 加载模型 model = load_model_safely(weights_path, data_path, device) if model is None: print("所有加载方式均失败,程序无法继续") sys.exit(1) # 获取步长 stride = getattr(model, 'stride', 32) if isinstance(stride, torch.Tensor): stride = stride.max().item() imgsz = check_img_size(imgsz, s=stride) # 创建保存目录 save_dir = increment_path(Path('runs/detect/exp'), exist_ok=False) (save_dir / 'labels' if save_txt else save_dir).mkdir(parents=True, exist_ok=True) # 模型预热 print("正在手动预热模型...") im = torch.zeros(1, 3, imgsz, imgsz).to(device) _ = model(im) if device.type != 'cpu' else None # 等待接收第一帧图像 print("等待接收图像...") while not image_received and not rospy.is_shutdown(): rospy.sleep(0.1) if rospy.is_shutdown(): print("ROS节点已关闭") return print("开始检测交通灯...") rate = rospy.Rate(10) # 10Hz should_exit = False frame_count = 0 # 用于生成唯一的图像文件名 # 连续检测计数器,避免瞬时检测导致误判 yellow_detected_count = 0 red_detected_count = 0 green_detected_count = 0 required_consecutive_detections = 2 # 需要连续2帧检测到才生效 consecutive_green_to_exit = 3 # 需要连续检测到绿灯3次才退出 while not rospy.is_shutdown() and not should_exit: if not image_received: rate.sleep() continue # 使用最新接收到的图像 im0 = latest_image.copy() image_received = False frame_count += 1 # 递增帧计数 # 图像预处理 im = cv2.resize(im0, (imgsz, imgsz)) im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB) im = im.transpose(2, 0, 1) im = np.ascontiguousarray(im) im = torch.from_numpy(im).to(device) im = im.float() / 255.0 if len(im.shape) == 3: im = im.unsqueeze(0) # 模型推理 pred = model(im) pred = non_max_suppression(pred, conf_thres, iou_thres, classes, max_det=max_det) # 处理检测结果 detected_classes = set() for i, det in enumerate(pred): save_path = str(save_dir / f"frame_{frame_count}.jpg") txt_path = str(save_dir / 'labels' / f"frame_{frame_count}") if len(det): # 调整边界框尺寸 det[:, :4] = scale_boxes(im.shape[2:], det[:, :4], im0.shape).round() # 收集检测到的类别(转换为小写,增强兼容性) detected_classes = {model.names[int(c)].lower() for c in det[:, -1].unique()} traffic_light_info = [ f"{cls}({TRAFFIC_LIGHT_INDICATION.get(cls, '未知')})" for cls in detected_classes ] print(f"检测到目标: {', '.join(traffic_light_info)}") # 绘制边界框 annotator = Annotator(im0, line_width=line_thickness) for *xyxy, conf, cls in reversed(det): c = int(cls) label = f"{model.names[c]} {conf:.2f}" annotator.box_label(xyxy, label, color=colors(c, True)) if save_txt: xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4))).view(-1).tolist() with open(f"{txt_path}.txt", 'a') as f: f.write(f"{c} {' '.join(map(str, xywh))} {conf:.2f}\n") if save_crop: cv2.imwrite(f"{save_dir}/{model.names[c]}_frame_{frame_count}.jpg", im0[int(xyxy[1]):int(xyxy[3]), int(xyxy[0]):int(xyxy[2])]) im0 = annotator.result() # 显示结果 if view_img: cv2.imshow("Traffic Light Detection", im0) if cv2.waitKey(1) == ord('q'): break # 保存结果图像 if not nosave: cv2.imwrite(save_path, im0) # 小车控制逻辑(优化版本) twist = Twist() # 重置计数器(如果未检测到对应颜色) if 'yellow' not in detected_classes: yellow_detected_count = 0 if 'red' not in detected_classes: red_detected_count = 0 if 'green' not in detected_classes: green_detected_count = 0 # 连续检测计数 if 'yellow' in detected_classes: yellow_detected_count += 1 print(f"连续检测到黄灯 {yellow_detected_count}/{required_consecutive_detections}") if 'red' in detected_classes: red_detected_count += 1 print(f"连续检测到红灯 {red_detected_count}/{required_consecutive_detections}") if 'green' in detected_classes: green_detected_count += 1 print(f"连续检测到绿灯 {green_detected_count}/{consecutive_green_to_exit}") # 控制逻辑:连续检测到指定次数才执行动作 if red_detected_count >= required_consecutive_detections: twist.linear.x = 0.0 twist.angular.z = 0.0 cmd_vel_pub.publish(twist) print("检测到红灯,小车已停止") elif yellow_detected_count >= required_consecutive_detections: twist.linear.x = 0.0 # 黄灯也停止(或根据需求改为减速) twist.angular.z = 0.0 cmd_vel_pub.publish(twist) print("检测到黄灯,小车已停止") elif green_detected_count >= consecutive_green_to_exit: print(f"连续检测到绿灯 {consecutive_green_to_exit} 次,退出红绿灯检测") should_exit = True # 未检测到需停止的灯时,不发布指令(避免覆盖巡线指令) # else: # # 不发布任何指令,保持巡线程序的控制 # pass rate.sleep() # 清理资源 cv2.destroyAllWindows() print("红绿灯检测程序已退出") def parse_opt(): parser = argparse.ArgumentParser() parser.add_argument('--weights', type=str, default='best.pt', help='模型路径') parser.add_argument('--data', type=str, default='light.yaml', help='数据集配置') parser.add_argument('--imgsz', type=int, default=640, help='推理图像尺寸') parser.add_argument('--conf-thres', type=float, default=0.3, help='置信度阈值') # 降低阈值 parser.add_argument('--iou-thres', type=float, default=0.45, help='NMS IoU阈值') parser.add_argument('--device', type=str, default='', help='设备') parser.add_argument('--view-img', action='store_true', help='显示结果') parser.add_argument('--save-txt', action='store_true', help='保存为txt') parser.add_argument('--save-crop', action='store_true', help='保存裁剪结果') parser.add_argument('--nosave', action='store_true', help='不保存结果') parser.add_argument('--classes', nargs='+', type=int, help='过滤类别') parser.add_argument('--line-thickness', type=int, default=18, help='边界框线宽') opt = parser.parse_args() return opt if __name__ == "__main__": opt = parse_opt() run(**vars(opt))

原始代码是这样的:#!./runtime/install/bin/python3 import os import sys import _ctypes import ctypes import _thread import colorama import subprocess import random import signal import click from threading import Thread from time import sleep from colorama import Fore, init from scapy.layers.bluetooth import HCI_Hdr, HCI_PHDR_Hdr from scapy.utils import wrpcap, PcapWriter # Custom libs from src.layers_bredr import ESP32_BREDR, BT_Baseband, BT_ACL_Hdr, BT_LMP from src.ESP32BTDriver import ESP32BTDriver class SnifferBREDR: TAG = 'Sniffer' working_dir = None wireshark_process = None pcap_fifo_filename = 'runtime/fifocap.fifo' pcap_filename = 'logs/capture_bluetooth.pcapng' save_pcap = False pcap_fifo_writer = None pcap_writer = None show_summary = True start_wireshark = False wireshark_started = False host_bdaddr = None driver = None # type: ESP32BTDriver driver_run = False serial_port = None serial_baud = None serial_thread = None bridge_hci = True bt_program = None bt_program_thread = None bt_program_run = False bt_program_process = None # program parameters bt_bdaddr = None # BT Vars tx_packets = 0 rx_packets = 0 # Constructor def __init__(self, serial_port=None, serial_baud=921600, show_summary=True, start_wireshark=False, save_pcap=True, pcap_filename=None, bridge_hci=True, bt_program=None, target_bdaddress=None, host_bdaddr='E0:D4:E8:19:C7:68'): self.show_summary = show_summary self.start_wireshark = start_wireshark self.serial_port = serial_port self.serial_baud = serial_baud self.save_pcap = save_pcap self.bridge_hci = bridge_hci self.bt_bdaddr = target_bdaddress self.host_bdaddr = host_bdaddr if pcap_filename: self.pcap_filename = pcap_filename if bt_program: self.bt_program = bt_program if self.start_wireshark: try: os.remove(self.pcap_fifo_filename) except: pass os.mkfifo(self.pcap_fifo_filename) try: self.l('[!] Starting Wireshark...') self.wireshark_process = subprocess.Popen( ['wireshark', '-k', '-i', self.pcap_fifo_filename]) self.pcap_fifo_writer = PcapWriter( self.pcap_fifo_filename, sync=True) self.wireshark_started = True except Exception as e: self.error('Wireshark could not start: ' + str(e)) if save_pcap: self.pcap_writer = PcapWriter(self.pcap_filename, sync=True) if sys.platform == 'linux': os.system('chmod o+rw ' + self.pcap_filename) def signal_handler(self, signal, frame): self.error('You pressed Ctrl+C - or killed me with -2') exit(0) # Logs def l(self, msg): print(Fore.YELLOW + '[' + self.TAG + '] ' + msg) def error(self, msg): print(Fore.RED + '[Error:' + self.TAG + '] ' + msg) # Main functions def start(self): if self.bridge_hci or self.bt_program is None: self.driver = ESP32BTDriver(self.serial_port, self.serial_baud) self.driver.enable_sniffing(1) self.driver.disable_poll_null(1) self.driver.set_bdaddr(self.host_bdaddr) print(Fore.GREEN + 'ESP32BT driver started on ' + self.serial_port + '@' + str(self.serial_baud)) self.driver_run = True self.serial_thread = Thread(target=self.uart_rx_handler) self.serial_thread.daemon = True self.serial_thread.start() if self.bt_program is not None: self.bt_program_run = True self.bt_program_thread = Thread(target=self.bt_program_handler) self.bt_program_thread.daemon = True self.bt_program_thread.start() def bt_program_handler(self): if self.bridge_hci: p_name = self.driver.serial_bridge_name else: p_name = self.serial_port p_args = [self.bt_program, '-u', p_name, '-a', str(self.bt_bdaddr)] print('Starting ' + str(p_args)) process = subprocess.Popen(p_args) self.bt_program_process = process while self.bt_program_run: sleep(1) rc = process.poll() return rc def uart_rx_handler(self): while self.driver_run: # Receive packet from the ESP32 Board data = self.driver.receive() if data is not None: # Decode Bluetooth Low Energy Data pkt = ESP32_BREDR(data) if pkt: summary = pkt[BT_Baseband].summary() direction = self.driver.direction if direction == 1: if self.show_summary: self.log_rx(summary) self.rx_packets += 1 elif direction == 0: if self.show_summary: self.log_tx(summary) self.tx_packets += 1 # Pipe / Save pcap hci_pkt = HCI_PHDR_Hdr( direction=direction) / HCI_Hdr() / pkt if self.wireshark_started is True: self.pcap_fifo_writer.write(hci_pkt) if self.save_pcap is True: self.pcap_writer.write(hci_pkt) def log_tx(self, log_message): print(Fore.CYAN + 'TX --> ' + log_message) def log_rx(self, log_message): print(Fore.GREEN + 'RX <-- ' + log_message) # Defaults serial_port = '/dev/ttyUSB0' serial_baud = 921600 @click.command() @click.option('--port', default=serial_port, help='Serial port name (/dev/ttyUSBx for Linux)') @click.option('--host', default='E0:D4:E8:19:C7:68', help='BDAddress of local host (default: E0:D4:E8:19:C7:68)') @click.option('--target', help='BDAddress of remote target (ex: a8:96:75:25:c2:ac)') @click.option('--live-wireshark', is_flag=True, help='Opens Wireshark live session') @click.option('--live-terminal', is_flag=True, help='Show a summary of each packet on terminal') @click.option('--bridge-only', is_flag=True, help='Starts the HCI bridge without connecting any BT Host stack') def sniffer(port, host, target, live_wireshark, live_terminal, bridge_only): bt_program = None host_bdaddress = None target_bdaddress = None bd_role_master = False if target: # Check BDAddress format if ':' in target and (len(target.split(':')) == 6) and (len(target) == 17): target_bdaddress = target.lower() else: raise ValueError("Incorrect BDAddress format") if host: # Check BDAddress format if ':' in host and (len(host.split(':')) == 6) and (len(host) == 17): host_bdaddress = host.lower() else: raise ValueError("Incorrect BDAddress format") if (live_terminal or live_wireshark) and not bridge_only: bd_role_master = True if target else False bt_program = ( './host_stack/sdp_rfcomm_query' if bd_role_master else './host_stack/spp_counter') else: print(Fore.YELLOW + '[!] Bridge will start without BT host stack') print('Using options:\n\ Serial Port: %s\n\ Serial Baud: %d\n\ BT Host Program: %s\n\ Host BDAddress: %s\n\ Target BDAddress: %s' % (port, serial_baud, bt_program, host_bdaddress, target_bdaddress)) Sniffer = SnifferBREDR(serial_port=port, serial_baud=serial_baud, show_summary=live_terminal, start_wireshark=live_wireshark, bt_program=bt_program, target_bdaddress=target) Sniffer.start() try: while True: sleep(1) except KeyboardInterrupt: if Sniffer.save_pcap: print(Fore.GREEN + 'Capture saved on logs/capture_bluetooth.pcapng') if Sniffer.bt_program_process is not None: Sniffer.bt_program_process.kill() print(Fore.YELLOW + 'BT Program finished') if __name__ == '__main__': init(autoreset=True) sniffer()

前置环境准备1. 相关软件包准备rhel软件包openssl软件包https://openssl-library.org/source/old/index.htmlopensshhttps://www.openssh.com/上传相关软件包2. 配置yum源mkdir /soft mkdir /mnt/filemkdir /etc/yum.repos.d/bakmv /etc/yum.repos.d/*.repo /etc/yum.repos.d/bakcat > /etc/yum.repos.d/base.repo << EDF[base]name=bashbaseurl=file:///mnt/fileenabled=1gpgcheck=0EDFmount -o loop /soft/rhel-server-7.9-x86_64-dvd.iso /mnt/file/yum clean allyum makecache3. 查询当前系统、openssh、openssl版本cat /etc/redhat-releasessh -V4. 安装依赖包yum -y install gcc gcc-c++ glibc make autoconf openssl-devel pcre-devel pam-devel rpm-build5. 禁止selinux##重点,禁止selinux 否则重启后会登录失败##sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/configsetenforce 06. 解压软件包cd /soft/tar -zxf openssh-9.8p1.tar.gz tar -zxf openssl-1.1.1w.tar.gz 升级opensslmv /usr/bin/openssl /usr/bin/openssl.bakmv /usr/include/openssl /usr/include/openssl.bakmv /usr/lib64/libssl.so /usr/lib64/libssl.so.bakcd /soft/openssl-1.1.1w/./config --prefix=/usr --shared && make && make installopenssl version升级openssh a. 编译安装opensshservice sshd stoprpm -e rpm -qa|grep openssh --nodepsrpm -qa |grep opensshrm -rf /etc/ssh#编译安装opensshcd /soft/openssh-9.8p1/./configure --prefix=/usr --sysconfdir=/etc/ssh --with-ssl-dir=/usr/local/openssl --with-zlib --with-md5-passwords --with-pam --with-tcp-wrappers && make && make install b. 修改ssh配置文件cat >> /etc/ssh/sshd_config << EDFport 22PermitRootLogin yesPermitEmptyPasswords noEDFcp contrib/redhat/sshd.init /etc/init.d/sshdcp contrib/redhat/sshd.pam /etc/pam.d/sshd.pam c. ssh服务chkconfig --add sshdchkconfig sshd onchkconfig sshd --listservice sshd startservice sshd restartssh -V版本回退方法yum remove -y openssh && yum install -y openssh openssh-clients openssh-serverservice sshd restart【注】:建议清理已保存的公私钥配置文件,rm -rf /etc/ssh/ssh_host 改为交互式脚本,扫描软件路径,列出相关软件包,交互式输入openssl版本,交互式输入openssh版本,交互式输入挂载镜像版本,每个步骤都需检查,根据检查结果交互执行下一步,显示详细日志

import numpy as np import matplotlib.pyplot as plt from pymatgen.io.vasp import Vasprun from pymatgen.core.structure import Structure from scipy.signal import savgol_filter from scipy.spatial import cKDTree from tqdm import tqdm import matplotlib as mpl import warnings from collections import defaultdict import os import csv import argparse import multiprocessing from functools import partial import time import dill # 忽略可能的警告 warnings.filterwarnings("ignore", category=UserWarning) # 专业绘图设置 - 符合Journal of Chemical Physics要求 plt.style.use('seaborn-v0_8-whitegrid') mpl.rcParams.update({ 'font.family': 'serif', 'font.serif': ['Times New Roman', 'DejaVu Serif'], 'font.size': 12, 'axes.labelsize': 14, 'axes.titlesize': 16, 'xtick.labelsize': 12, 'ytick.labelsize': 12, 'figure.dpi': 600, # 提高分辨率 'savefig.dpi': 600, 'figure.figsize': (8, 6), # 期刊常用尺寸 'lines.linewidth': 2.0, 'legend.fontsize': 10, 'legend.framealpha': 0.8, 'mathtext.default': 'regular', 'axes.linewidth': 1.5, # 加粗坐标轴线 'xtick.major.width': 1.5, 'ytick.major.width': 1.5, 'xtick.major.size': 5, 'ytick.major.size': 5, }) # 改进的原子类型识别函数 - 考虑质子转移和动态归属 def identify_atom_types(struct): """改进的原子类型识别函数,考虑质子转移和动态归属""" # 1. 初始化数据结构 p_oxygens = {"P=O": [], "P-O": [], "P-OH": []} phosphate_hydrogens = [] # P-OH基团中的H原子 water_oxygens = [] # 水分子中的O原子 water_hydrogens = [] # 水分子中的H原子 hydronium_oxygens = [] # 水合氢离子中的O原子 hydronium_hydrogens = [] # 水合氢离子中的H原子 fluoride_atoms = [i for i, site in enumerate(struct) if site.species_string == "F"] aluminum_atoms = [i for i, site in enumerate(struct) if site.species_string == "Al"] # 2. 构建全局KDTree用于快速查找 all_coords = np.array([site.coords for site in struct]) kdtree = cKDTree(all_coords, boxsize=struct.lattice.abc) # 3. 识别磷酸基团 p_atoms = [i for i, site in enumerate(struct) if site.species_string == "P"] phosphate_oxygens = [] # 所有磷酸氧原子 for p_idx in p_atoms: # 查找P周围的O原子 (距离 < 1.6Å) neighbors = kdtree.query_ball_point(all_coords[p_idx], r=1.6) p_o_indices = [idx for idx in neighbors if idx != p_idx and struct[idx].species_string == "O"] if not p_o_indices: continue # 按距离排序并确定P=O (最近的O) distances = [struct.get_distance(p_idx, o_idx) for o_idx in p_o_indices] sorted_indices = sorted(range(len(distances)), key=lambda i: distances[i]) closest_o = p_o_indices[sorted_indices[0]] phosphate_oxygens.append(closest_o) p_oxygens["P=O"].append(closest_o) # 处理其他O原子 for i in range(1, len(p_o_indices)): o_idx = p_o_indices[sorted_indices[i]] phosphate_oxygens.append(o_idx) p_oxygens["P-O"].append(o_idx) # 暂时标记为P-O,后续可能调整为P-OH # 4. 识别所有H原子并确定归属 hydrogen_owners = {} # 存储每个H原子的归属O原子 h_atoms = [i for i, site in enumerate(struct) if site.species_string == "H"] for h_idx in h_atoms: # 查找H周围1.2Å内的所有O原子 neighbors = kdtree.query_ball_point(all_coords[h_idx], r=1.2) candidate_os = [idx for idx in neighbors if idx != h_idx and struct[idx].species_string == "O"] if not candidate_os: continue # 计算距离并找到最近的O原子 min_dist = float('inf') owner_o = None for o_idx in candidate_os: dist = struct.get_distance(h_idx, o_idx) if dist < min_dist: min_dist = dist owner_o = o_idx hydrogen_owners[h_idx] = owner_o # 5. 重新分类磷酸氧 for o_idx in phosphate_oxygens: # 检查该氧原子是否有归属的H原子 has_hydrogen = any(owner_o == o_idx for h_idx, owner_o in hydrogen_owners.items()) # 如果是P=O且没有H,保持P=O if o_idx in p_oxygens["P=O"] and not has_hydrogen: continue # 保持为P=O # 如果是P=O但有H,需要重新分类 if o_idx in p_oxygens["P=O"] and has_hydrogen: p_oxygens["P=O"].remove(o_idx) p_oxygens["P-OH"].append(o_idx) # 如果是P-O且有H,改为P-OH if o_idx in p_oxygens["P-O"] and has_hydrogen: p_oxygens["P-O"].remove(o_idx) p_oxygens["P-OH"].append(o_idx) # 6. 识别水和水合氢离子 all_o_indices = [i for i, site in enumerate(struct) if site.species_string == "O"] non_phosphate_os = [o_idx for o_idx in all_o_indices if o_idx not in phosphate_oxygens] # 统计每个O原子的H原子数量 o_h_count = defaultdict(int) for h_idx, owner_o in hydrogen_owners.items(): o_h_count[owner_o] += 1 for o_idx in non_phosphate_os: h_count = o_h_count.get(o_idx, 0) # 获取归属的H原子 attached_hs = [h_idx for h_idx, owner_o in hydrogen_owners.items() if owner_o == o_idx] if h_count == 2: # 水分子 water_oxygens.append(o_idx) water_hydrogens.extend(attached_hs) elif h_count == 3: # 水合氢离子 hydronium_oxygens.append(o_idx) hydronium_hydrogens.extend(attached_hs) # h_count=1的羟基不处理 # 7. 识别磷酸基团的H原子 for o_idx in p_oxygens["P-OH"]: attached_hs = [h_idx for h_idx, owner_o in hydrogen_owners.items() if owner_o == o_idx] phosphate_hydrogens.extend(attached_hs) return { "phosphate_oxygens": p_oxygens, "phosphate_hydrogens": phosphate_hydrogens, "water_oxygens": water_oxygens, "water_hydrogens": water_hydrogens, "hydronium_oxygens": hydronium_oxygens, "hydronium_hydrogens": hydronium_hydrogens, "fluoride_atoms": fluoride_atoms, "aluminum_atoms": aluminum_atoms } # 2. RDF计算函数 - 修复负值问题和序列化问题 def process_frame(struct, center_sel, target_sel, r_max, exclude_bonds, bond_threshold): """处理单帧结构计算,完全处理空原子类型情况""" # 每帧重新识别原子类型(关键!) atom_types = identify_atom_types(struct) # 获取中心原子和目标原子 centers = center_sel(atom_types) targets = target_sel(atom_types) # 处理空原子类型情况 - 第一重保护 if len(centers) == 0 or len(targets) == 0: return { "distances": np.array([], dtype=np.float64), "n_centers": 0, "n_targets": 0, "volume": struct.volume } center_coords = np.array([struct[i].coords for i in centers]) target_coords = np.array([struct[i].coords for i in targets]) lattice = struct.lattice kdtree = cKDTree(target_coords, boxsize=lattice.abc) # 动态确定邻居数量 - 不超过目标原子数 k_val = min(50, len(targets)) # 处理目标原子数量为0的情况 - 第二重保护 if k_val == 0: return { "distances": np.array([], dtype=np.float64), "n_centers": len(centers), "n_targets": len(targets), "volume": struct.volume } # 执行查询并确保结果统一格式 try: query_result = kdtree.query(center_coords, k=k_val, distance_upper_bound=r_max) except Exception as e: # 异常处理 - 返回空结果 print(f"KDTree query error: {str(e)}") return { "distances": np.array([], dtype=np.float64), "n_centers": len(centers), "n_targets": len(targets), "volume": struct.volume } # 统一处理不同维度的返回结果 if k_val == 1: # 处理单邻居情况 if isinstance(query_result, tuple): distances, indices = query_result else: distances = query_result indices = np.zeros_like(distances, dtype=int) # 确保数组格式 distances = np.atleast_1d(distances) indices = np.atleast_1d(indices) else: # 多邻居情况 distances, indices = query_result # 确保二维数组格式 if distances.ndim == 1: distances = distances.reshape(-1, 1) indices = indices.reshape(-1, 1) valid_distances = [] for i in range(distances.shape[0]): center_idx = centers[i] for j in range(distances.shape[1]): dist = distances[i, j] # 跳过超出范围的距离 if dist > r_max or np.isinf(dist): continue target_idx = targets[indices[i, j]] # 排除化学键 if exclude_bonds: actual_dist = struct.get_distance(center_idx, target_idx) if actual_dist < bond_threshold: continue valid_distances.append(dist) return { "distances": np.array(valid_distances, dtype=np.float64), "n_centers": len(centers), "n_targets": len(targets), "volume": struct.volume } def calculate_rdf_parallel(structures, center_sel, target_sel, r_max=8.0, bin_width=0.05, exclude_bonds=True, bond_threshold=1.3, workers=1): """ 并行计算径向分布函数 :param workers: 并行工作进程数 """ bins = np.arange(0, r_max, bin_width) hist = np.zeros(len(bins) - 1) total_centers = 0 total_targets = 0 total_volume = 0 # 准备参数 - 使用dill解决序列化问题 dill.settings['recurse'] = True func = partial(process_frame, center_sel=center_sel, target_sel=target_sel, r_max=r_max, exclude_bonds=exclude_bonds, bond_threshold=bond_threshold) # 使用多进程池 with multiprocessing.Pool(processes=workers) as pool: results = [] # 使用imap_unordered提高效率 for res in tqdm(pool.imap_unordered(func, structures), total=len(structures), desc="Calculating RDF"): results.append(res) # 处理结果 - 特别注意空结果处理 n_frames = 0 for res in results: if res is None: continue n_frames += 1 valid_distances = res["distances"] n_centers = res["n_centers"] n_targets = res["n_targets"] volume = res["volume"] # 累加计数 if len(valid_distances) > 0: hist += np.histogram(valid_distances, bins=bins)[0] total_centers += n_centers total_targets += n_targets total_volume += volume # 修正归一化 - 解决负值问题 if n_frames == 0: # 没有有效帧时返回空结果 r = bins[:-1] + bin_width/2 return r, np.zeros_like(r), {"position": None, "value": None} avg_density = total_targets / total_volume if total_volume > 0 else 0 r = bins[:-1] + bin_width/2 rdf = np.zeros_like(r) for i in range(len(hist)): r_lower = bins[i] r_upper = bins[i+1] shell_vol = 4/3 * np.pi * (r_upper**3 - r_lower**3) expected_count = shell_vol * avg_density * total_centers # 避免除以零 if expected_count > 1e-10: rdf[i] = hist[i] / expected_count else: rdf[i] = 0 # 更稳健的平滑处理 - 避免边界效应 if len(rdf) > 10: window_length = min(15, len(rdf)//2*2+1) polyorder = min(5, window_length-1) rdf_smoothed = savgol_filter(rdf, window_length=window_length, polyorder=polyorder, mode='mirror') else: rdf_smoothed = rdf # 计算主要峰值 peak_info = {} mask = (r >= 1.5) & (r <= 3.0) if np.any(mask) and np.any(rdf_smoothed[mask] > 0): peak_idx = np.argmax(rdf_smoothed[mask]) peak_pos = r[mask][peak_idx] peak_val = rdf_smoothed[mask][peak_idx] peak_info = {"position": peak_pos, "value": peak_val} else: peak_info = {"position": None, "value": None} return r, rdf_smoothed, peak_info # 3. 定义精细化的选择器函数(避免lambda序列化问题) def selector_phosphate_P_double_O(atom_types): return atom_types["phosphate_oxygens"]["P=O"] def selector_phosphate_P_OH(atom_types): return atom_types["phosphate_oxygens"]["P-OH"] def selector_phosphate_P_O(atom_types): return atom_types["phosphate_oxygens"]["P-O"] def selector_phosphate_hydrogens(atom_types): return atom_types["phosphate_hydrogens"] def selector_water_only_hydrogens(atom_types): """仅选择水分子中的氢原子""" return atom_types["water_hydrogens"] def selector_hydronium_only_hydrogens(atom_types): """仅选择水合氢离子中的氢原子""" return atom_types["hydronium_hydrogens"] def selector_water_only_oxygens(atom_types): """仅选择水分子中的氧原子""" return atom_types["water_oxygens"] def selector_hydronium_only_oxygens(atom_types): """仅选择水合氢离子中的氧原子""" return atom_types["hydronium_oxygens"] def selector_fluoride_atoms(atom_types): return atom_types["fluoride_atoms"] def selector_aluminum_atoms(atom_types): return atom_types["aluminum_atoms"] def selector_all_phosphate_oxygens(atom_types): return (atom_types["phosphate_oxygens"]["P=O"] + atom_types["phosphate_oxygens"]["P-O"] + atom_types["phosphate_oxygens"]["P-OH"]) # 4. 根据您的要求定义六张图的RDF分组配置 def get_rdf_groups(): """返回六张图的RDF分组配置(完全符合您的需求)""" return { # 图1: Al的配位情况 "Al_Coordination": [ (selector_aluminum_atoms, selector_fluoride_atoms, "Al-F", "blue"), (selector_aluminum_atoms, selector_water_only_oxygens, "Al-Ow", "green"), (selector_aluminum_atoms, selector_all_phosphate_oxygens, "Al-Op", "red") ], # 图2: F与H形成的氢键 "F_Hydrogen_Bonding": [ (selector_fluoride_atoms, selector_water_only_hydrogens, "F-Hw", "lightblue"), (selector_fluoride_atoms, selector_hydronium_only_hydrogens, "F-Hh", "blue"), (selector_fluoride_atoms, selector_phosphate_hydrogens, "F-Hp", "darkblue") ], # 图3: 磷酸作为受体与周围环境的氢键(区分氧类型) "Phosphate_Acceptor": [ (selector_phosphate_P_double_O, selector_water_only_hydrogens, "P=O···Hw", "orange"), (selector_phosphate_P_double_O, selector_hydronium_only_hydrogens, "P=O···Hh", "red"), (selector_phosphate_P_O, selector_water_only_hydrogens, "P-O···Hw", "lightgreen"), (selector_phosphate_P_O, selector_hydronium_only_hydrogens, "P-O···Hh", "green"), (selector_phosphate_P_OH, selector_water_only_hydrogens, "P-OH···Hw", "lightblue"), (selector_phosphate_P_OH, selector_hydronium_only_hydrogens, "P-OH···Hh", "blue") ], # 图4: 磷酸-水-水合氢离子交叉氢键(排除同种类型) "Cross_Species_HBonding": [ (selector_phosphate_hydrogens, selector_water_only_oxygens, "Hp···Ow", "pink"), (selector_phosphate_hydrogens, selector_hydronium_only_oxygens, "Hp···Oh", "purple"), (selector_water_only_hydrogens, selector_all_phosphate_oxygens, "Hw···Op", "lightgreen"), (selector_water_only_hydrogens, selector_hydronium_only_oxygens, "Hw···Oh", "green"), (selector_hydronium_only_hydrogens, selector_water_only_oxygens, "Hh···Ow", "lightblue"), (selector_hydronium_only_hydrogens, selector_all_phosphate_oxygens, "Hh···Op", "blue") ], # 图5: 同类型分子内/间氢键(区分磷酸氧类型) "Same_Species_HBonding": [ (selector_phosphate_hydrogens, selector_phosphate_P_double_O, "Hp···P=O", "red"), (selector_phosphate_hydrogens, selector_phosphate_P_O, "Hp···P-O", "orange"), (selector_phosphate_hydrogens, selector_phosphate_P_OH, "Hp···P-OH", "yellow"), (selector_water_only_hydrogens, selector_water_only_oxygens, "Hw···Ow", "lightblue"), (selector_hydronium_only_hydrogens, selector_hydronium_only_oxygens, "Hh···Oh", "blue") ], # 图6: O-O聚集分析(Op不区分类型) "O_O_Aggregation": [ (selector_all_phosphate_oxygens, selector_water_only_oxygens, "Op-Ow", "blue"), (selector_all_phosphate_oxygens, selector_hydronium_only_oxygens, "Op-Oh", "green"), (selector_all_phosphate_oxygens, selector_all_phosphate_oxygens, "Op-Op", "red"), (selector_water_only_oxygens, selector_hydronium_only_oxygens, "Ow-Oh", "purple"), (selector_water_only_oxygens, selector_water_only_oxygens, "Ow-Ow", "cyan"), (selector_hydronium_only_oxygens, selector_hydronium_only_oxygens, "Oh-Oh", "magenta") ] } # 5. 主程序 - 优化并行处理 def main(workers=1): # 定义要处理的体系 vasprun_files = { "System1": "vasprun1.xml", "System2": "vasprun2.xml", "System3": "vasprun3.xml", "System4": "vasprun4.xml" } # 获取RDF分组配置 rdf_groups = get_rdf_groups() # 标题映射(根据您的要求) title_map = { "Al_Coordination": "Al Coordination Environment", "F_Hydrogen_Bonding": "F-H Hydrogen Bonding", "Phosphate_Acceptor": "Phosphate as H-bond Acceptor", "Cross_Species_HBonding": "Cross H-bonding between Different Species", "Same_Species_HBonding": "Intra- and Inter-molecular H-bonding", "O_O_Aggregation": "O-O Aggregation Analysis" } # 存储所有数据 all_system_data = {} group_y_max = {group_name: 0 for group_name in list(rdf_groups.keys())} group_x_max = { "Al_Coordination": (1.5, 3.5), "F_Hydrogen_Bonding": (1.0, 3.0), "Phosphate_Acceptor": (1.0, 3.0), "Cross_Species_HBonding": (1.0, 3.0), "Same_Species_HBonding": (1.0, 3.0), "O_O_Aggregation": (2.0, 6.0) } # 创建输出目录 os.makedirs("RDF_Plots", exist_ok=True) # 计算所有体系的所有RDF数据 for system_name, vasprun_file in vasprun_files.items(): print(f"\n{'='*50}") print(f"Processing {system_name}: {vasprun_file} with {workers} workers") print(f"{'='*50}") start_time = time.time() try: # 加载VASP结果 vr = Vasprun(vasprun_file, ionic_step_skip=5) structures = vr.structures print(f"Loaded {len(structures)} frames") # 存储体系数据 system_data = { "rdf_results": {}, "peak_infos": {} } # 计算所有RDF分组 for group_name, pairs in rdf_groups.items(): system_data["rdf_results"][group_name] = {} system_data["peak_infos"][group_name] = {} group_y_max_current = 0 for center_sel, target_sel, label, color in pairs: print(f"\nCalculating RDF for: {label}") try: r, rdf, peak_info = calculate_rdf_parallel( structures, center_sel, target_sel, r_max=10.0, exclude_bonds=True, bond_threshold=1.3, workers=workers ) system_data["rdf_results"][group_name][label] = (r, rdf, color) system_data["peak_infos"][group_name][label] = peak_info if len(rdf) > 0: current_max = np.max(rdf) if current_max > group_y_max_current: group_y_max_current = current_max if peak_info["position"] is not None: print(f" Peak for {label}: {peak_info['position']:.3f} Å (g(r) = {peak_info['value']:.2f})") else: print(f" No significant peak found for {label} in 1.5-3.0 Å range") except Exception as e: print(f"Error calculating RDF for {label}: {str(e)}") system_data["rdf_results"][group_name][label] = (np.array([]), np.array([]), color) system_data["peak_infos"][group_name][label] = {"position": None, "value": None} if group_y_max_current > group_y_max[group_name]: group_y_max[group_name] = group_y_max_current all_system_data[system_name] = system_data elapsed = time.time() - start_time print(f"\nCompleted processing for {system_name} in {elapsed:.2f} seconds") except Exception as e: print(f"Error processing {system_name}: {str(e)}") # 为每个分组添加余量 for group_name in group_y_max: group_y_max[group_name] = max(group_y_max[group_name] * 1.15, 3.0) # 确保最小值 # 第二步:生成符合期刊要求的图表 for system_name, system_data in all_system_data.items(): print(f"\nGenerating publication-quality plots for {system_name}") for group_name, group_data in system_data["rdf_results"].items(): fig, ax = plt.subplots(figsize=(8, 6)) # 设置坐标轴范围 xlim = group_x_max.get(group_name, (0, 6.0)) ylim = (0, group_y_max[group_name]) for label, (r, rdf, color) in group_data.items(): if len(r) > 0 and len(rdf) > 0: ax.plot(r, rdf, color=color, label=label, linewidth=2.0) ax.set_xlim(xlim) ax.set_ylim(ylim) # 期刊格式标签 ax.set_xlabel('Radial Distance (Å)', fontweight='bold') ax.set_ylabel('g(r)', fontweight='bold') # 添加体系名称到标题 ax.set_title(f"{system_name}: {title_map[group_name]}", fontsize=16, pad=15) # 精简图例 ncol = 3 if group_name == "Same_Species_HBonding" else 1 # 图5使用三列图例 ax.legend(ncol=ncol, loc='best', framealpha=0.8, fontsize=10) # 添加氢键区域标记(除O-O聚集图外) if group_name != "O_O_Aggregation": ax.axvspan(1.5, 2.5, alpha=0.1, color='green', zorder=0) ax.text(1.7, ylim[1]*0.85, 'H-bond Region', fontsize=10) # 添加网格 ax.grid(True, linestyle='--', alpha=0.5) # 保存高分辨率图片 plt.tight_layout() filename = os.path.join("RDF_Plots", f"RDF_{system_name}_{group_name}.tiff") plt.savefig(filename, bbox_inches='tight', dpi=600, format='tiff') print(f"Saved publication plot: {filename}") plt.close() # 保存Origin兼容数据 save_origin_data(system_name, system_data) print("\nAll RDF analysis completed successfully!") def save_origin_data(system_name, system_data): """保存Origin兼容格式数据""" os.makedirs("Origin_Data", exist_ok=True) system_dir = os.path.join("Origin_Data", system_name) os.makedirs(system_dir, exist_ok=True) # 保存峰值信息 peak_info_path = os.path.join(system_dir, f"Peak_Positions_{system_name}.csv") with open(peak_info_path, 'w', newline='') as csvfile: writer = csv.writer(csvfile) writer.writerow(["Group", "Interaction", "Peak Position (A)", "g(r) Value"]) for group_name, peaks in system_data["peak_infos"].items(): for label, info in peaks.items(): if info["position"] is not None: writer.writerow([group_name, label, f"{info['position']:.3f}", f"{info['value']:.3f}"]) else: writer.writerow([group_name, label, "N/A", "N/A"]) print(f"Saved peak positions: {peak_info_path}") # 保存RDF数据 for group_name, group_results in system_data["rdf_results"].items(): group_dir = os.path.join(system_dir, group_name) os.makedirs(group_dir, exist_ok=True) for label, (r, rdf, color) in group_results.items(): if len(r) > 0 and len(rdf) > 0: safe_label = label.replace(" ", "_").replace("/", "_").replace("=", "_") safe_label = safe_label.replace("(", "").replace(")", "").replace("$", "") filename = f"RDF_{system_name}_{group_name}_{safe_label}.csv" filepath = os.path.join(group_dir, filename) with open(filepath, 'w', newline='') as csvfile: writer = csv.writer(csvfile) writer.writerow(["Distance (A)", "g(r)"]) for i in range(len(r)): writer.writerow([f"{r[i]:.6f}", f"{rdf[i]:.6f}"]) print(f"Saved Origin data: {filename}") if __name__ == "__main__": # 设置命令行参数 parser = argparse.ArgumentParser(description='Calculate RDF for VASP simulations') parser.add_argument('--workers', type=int, default=multiprocessing.cpu_count(), help=f'Number of parallel workers (default: {multiprocessing.cpu_count()})') args = parser.parse_args() print(f"Starting RDF analysis with {args.workers} workers...") main(workers=args.workers) 以上代码计算的VASP数据,同时进行了详细区分计算,以及计算条件,在这里我们利用同样的框架,但修改其内容,在这里我们只需要计算体系中存在的各个元素(可能换体系,元素可能会涉及Al Fe Mg F H O P Si)之间的RDF计算,所以在这里代码当中可能需要添加可更改的中心原子和目标原子,并输出文本结果

你先读取该代码import numpy as np import matplotlib.pyplot as plt from pymatgen.io.vasp import Vasprun from pymatgen.core.structure import Structure from scipy.signal import savgol_filter from scipy.spatial import cKDTree from tqdm import tqdm import matplotlib as mpl import warnings from collections import defaultdict import os import csv import argparse import multiprocessing from functools import partial import time import dill # 忽略可能的警告 warnings.filterwarnings("ignore", category=UserWarning) # 专业绘图设置 - 符合Journal of Chemical Physics要求 plt.style.use('seaborn-v0_8-whitegrid') mpl.rcParams.update({ 'font.family': 'serif', 'font.serif': ['Times New Roman', 'DejaVu Serif'], 'font.size': 12, 'axes.labelsize': 14, 'axes.titlesize': 16, 'xtick.labelsize': 12, 'ytick.labelsize': 12, 'figure.dpi': 600, # 提高分辨率 'savefig.dpi': 600, 'figure.figsize': (8, 6), # 期刊常用尺寸 'lines.linewidth': 2.0, 'legend.fontsize': 10, 'legend.framealpha': 0.8, 'mathtext.default': 'regular', 'axes.linewidth': 1.5, # 加粗坐标轴线 'xtick.major.width': 1.5, 'ytick.major.width': 1.5, 'xtick.major.size': 5, 'ytick.major.size': 5, }) # 1. 增强的原子类型识别函数 - 逐帧识别 def identify_atom_types(struct): """识别所有关键原子类型并排除自身化学键""" # 磷酸氧分类 p_oxygens = {"P=O": [], "P-O": [], "P-OH": []} phosphate_hydrogens = [] # 仅P-OH基团中的H原子 # 水合氢离子识别 hydronium_oxygens = [] hydronium_hydrogens = [] # H₃O⁺中的H原子 # 普通水分子 water_oxygens = [] water_hydrogens = [] # 普通水中的H原子 # 氟离子 fluoride_atoms = [i for i, site in enumerate(struct) if site.species_string == "F"] # 铝离子 aluminum_atoms = [i for i, site in enumerate(struct) if site.species_string == "Al"] # 创建快速邻居查找表 neighbor_cache = defaultdict(list) for i, site in enumerate(struct): if site.species_string == "O": neighbors = struct.get_neighbors(site, r=1.3) h_neighbors = [n[0] for n in neighbors if n[0].species_string == "H"] neighbor_cache[i] = h_neighbors # 识别水合氢离子 (H₃O⁺) if len(h_neighbors) == 3: hydronium_oxygens.append(i) for h_site in h_neighbors: hydronium_hydrogens.append(h_site.index) # 识别磷酸基团 for site in struct: if site.species_string == "P": neighbors = struct.get_neighbors(site, r=2.0) # 扩大搜索半径 # 筛选氧原子邻居 o_neighbors = [(n[0], n[1]) for n in neighbors if n[0].species_string == "O"] if len(o_neighbors) < 4: # 如果找不到4个氧原子,使用旧方法 for neighbor in o_neighbors: nn_site = neighbor[0] if neighbor[1] < 1.55: p_oxygens["P=O"].append(nn_site.index) else: if any(n[0].species_string == "H" for n in struct.get_neighbors(nn_site, r=1.3)): p_oxygens["P-OH"].append(nn_site.index) else: p_oxygens["P-O"].append(nn_site.index) continue # 按距离排序 o_neighbors.sort(key=lambda x: x[1]) # 最近的氧原子为P=O p_double_o = o_neighbors[0][0] p_oxygens["P=O"].append(p_double_o.index) # 其他三个氧原子 for i in range(1, 4): o_site = o_neighbors[i][0] # 检查氧原子上是否有氢 if neighbor_cache.get(o_site.index, []): p_oxygens["P-OH"].append(o_site.index) else: p_oxygens["P-O"].append(o_site.index) # 识别P-OH基团中的H原子 (磷酸中的H) for o_idx in p_oxygens["P-OH"]: # 获取与P-OH氧相连的H原子 h_neighbors = neighbor_cache.get(o_idx, []) for h_site in h_neighbors: if h_site.species_string == "H": phosphate_hydrogens.append(h_site.index) # 识别普通水分子 (排除磷酸氧和水合氢离子) for i, site in enumerate(struct): if site.species_string == "O" and i not in hydronium_oxygens: is_phosphate_oxygen = False for cat in p_oxygens.values(): if i in cat: is_phosphate_oxygen = True break if not is_phosphate_oxygen: water_oxygens.append(i) # 识别普通水分子中的H原子 (水中的H) for o_idx in water_oxygens: h_neighbors = neighbor_cache.get(o_idx, []) for h_site in h_neighbors: if h_site.species_string == "H": water_hydrogens.append(h_site.index) return { "phosphate_oxygens": p_oxygens, "phosphate_hydrogens": phosphate_hydrogens, "water_oxygens": water_oxygens, "water_hydrogens": water_hydrogens, "hydronium_oxygens": hydronium_oxygens, "hydronium_hydrogens": hydronium_hydrogens, "fluoride_atoms": fluoride_atoms, "aluminum_atoms": aluminum_atoms } # 2. RDF计算函数 - 修复负值问题和序列化问题 def process_frame(struct, center_sel, target_sel, r_max, exclude_bonds, bond_threshold): """处理单帧结构计算,完全处理空原子类型情况""" # 每帧重新识别原子类型(关键!) atom_types = identify_atom_types(struct) # 获取中心原子和目标原子 centers = center_sel(atom_types) targets = target_sel(atom_types) # 处理空原子类型情况 - 第一重保护 if len(centers) == 0 or len(targets) == 0: return { "distances": np.array([], dtype=np.float64), "n_centers": 0, "n_targets": 0, "volume": struct.volume } center_coords = np.array([struct[i].coords for i in centers]) target_coords = np.array([struct[i].coords for i in targets]) lattice = struct.lattice kdtree = cKDTree(target_coords, boxsize=lattice.abc) # 动态确定邻居数量 - 不超过目标原子数 k_val = min(50, len(targets)) # 处理目标原子数量为0的情况 - 第二重保护 if k_val == 0: return { "distances": np.array([], dtype=np.float64), "n_centers": len(centers), "n_targets": len(targets), "volume": struct.volume } # 执行查询并确保结果统一格式 try: query_result = kdtree.query(center_coords, k=k_val, distance_upper_bound=r_max) except Exception as e: # 异常处理 - 返回空结果 print(f"KDTree query error: {str(e)}") return { "distances": np.array([], dtype=np.float64), "n_centers": len(centers), "n_targets": len(targets), "volume": struct.volume } # 统一处理不同维度的返回结果 if k_val == 1: # 处理单邻居情况 if isinstance(query_result, tuple): distances, indices = query_result else: distances = query_result indices = np.zeros_like(distances, dtype=int) # 确保数组格式 distances = np.atleast_1d(distances) indices = np.atleast_1d(indices) else: # 多邻居情况 distances, indices = query_result # 确保二维数组格式 if distances.ndim == 1: distances = distances.reshape(-1, 1) indices = indices.reshape(-1, 1) valid_distances = [] for i in range(distances.shape[0]): center_idx = centers[i] for j in range(distances.shape[1]): dist = distances[i, j] # 跳过超出范围的距离 if dist > r_max or np.isinf(dist): continue target_idx = targets[indices[i, j]] # 排除化学键 if exclude_bonds: actual_dist = struct.get_distance(center_idx, target_idx) if actual_dist < bond_threshold: continue valid_distances.append(dist) return { "distances": np.array(valid_distances, dtype=np.float64), "n_centers": len(centers), "n_targets": len(targets), "volume": struct.volume } def calculate_rdf_parallel(structures, center_sel, target_sel, r_max=8.0, bin_width=0.05, exclude_bonds=True, bond_threshold=1.3, workers=1): """ 并行计算径向分布函数 :param workers: 并行工作进程数 """ bins = np.arange(0, r_max, bin_width) hist = np.zeros(len(bins) - 1) total_centers = 0 total_targets = 0 total_volume = 0 # 准备参数 - 使用dill解决序列化问题 dill.settings['recurse'] = True func = partial(process_frame, center_sel=center_sel, target_sel=target_sel, r_max=r_max, exclude_bonds=exclude_bonds, bond_threshold=bond_threshold) # 使用多进程池 with multiprocessing.Pool(processes=workers) as pool: results = [] # 使用imap_unordered提高效率 for res in tqdm(pool.imap_unordered(func, structures), total=len(structures), desc="Calculating RDF"): results.append(res) # 处理结果 - 特别注意空结果处理 n_frames = 0 for res in results: if res is None: continue n_frames += 1 valid_distances = res["distances"] n_centers = res["n_centers"] n_targets = res["n_targets"] volume = res["volume"] # 累加计数 if len(valid_distances) > 0: hist += np.histogram(valid_distances, bins=bins)[0] total_centers += n_centers total_targets += n_targets total_volume += volume # 修正归一化 - 解决负值问题 if n_frames == 0: # 没有有效帧时返回空结果 r = bins[:-1] + bin_width/2 return r, np.zeros_like(r), {"position": None, "value": None} avg_density = total_targets / total_volume if total_volume > 0 else 0 r = bins[:-1] + bin_width/2 rdf = np.zeros_like(r) for i in range(len(hist)): r_lower = bins[i] r_upper = bins[i+1] shell_vol = 4/3 * np.pi * (r_upper**3 - r_lower**3) expected_count = shell_vol * avg_density * total_centers # 避免除以零 if expected_count > 1e-10: rdf[i] = hist[i] / expected_count else: rdf[i] = 0 # 更稳健的平滑处理 - 避免边界效应 if len(rdf) > 10: window_length = min(15, len(rdf)//2*2+1) polyorder = min(5, window_length-1) rdf_smoothed = savgol_filter(rdf, window_length=window_length, polyorder=polyorder, mode='mirror') else: rdf_smoothed = rdf # 计算主要峰值 peak_info = {} mask = (r >= 1.5) & (r <= 3.0) if np.any(mask) and np.any(rdf_smoothed[mask] > 0): peak_idx = np.argmax(rdf_smoothed[mask]) peak_pos = r[mask][peak_idx] peak_val = rdf_smoothed[mask][peak_idx] peak_info = {"position": peak_pos, "value": peak_val} else: peak_info = {"position": None, "value": None} return r, rdf_smoothed, peak_info # 3. 定义精细化的选择器函数(避免lambda序列化问题) def selector_phosphate_P_double_O(atom_types): return atom_types["phosphate_oxygens"]["P=O"] def selector_phosphate_P_OH(atom_types): return atom_types["phosphate_oxygens"]["P-OH"] def selector_phosphate_P_O(atom_types): return atom_types["phosphate_oxygens"]["P-O"] def selector_phosphate_hydrogens(atom_types): return atom_types["phosphate_hydrogens"] def selector_water_only_hydrogens(atom_types): """仅选择水分子中的氢原子""" return atom_types["water_hydrogens"] def selector_hydronium_only_hydrogens(atom_types): """仅选择水合氢离子中的氢原子""" return atom_types["hydronium_hydrogens"] def selector_water_only_oxygens(atom_types): """仅选择水分子中的氧原子""" return atom_types["water_oxygens"] def selector_hydronium_only_oxygens(atom_types): """仅选择水合氢离子中的氧原子""" return atom_types["hydronium_oxygens"] def selector_fluoride_atoms(atom_types): return atom_types["fluoride_atoms"] def selector_aluminum_atoms(atom_types): return atom_types["aluminum_atoms"] def selector_all_phosphate_oxygens(atom_types): return (atom_types["phosphate_oxygens"]["P=O"] + atom_types["phosphate_oxygens"]["P-O"] + atom_types["phosphate_oxygens"]["P-OH"]) # 4. 根据您的要求定义六张图的RDF分组配置 def get_rdf_groups(): """返回六张图的RDF分组配置(完全符合您的需求)""" return { # 图1: Al的配位情况 "Al_Coordination": [ (selector_aluminum_atoms, selector_fluoride_atoms, "Al-F", "blue"), (selector_aluminum_atoms, selector_water_only_oxygens, "Al-Ow", "green"), (selector_aluminum_atoms, selector_all_phosphate_oxygens, "Al-Op", "red") ], # 图2: F与H形成的氢键 "F_Hydrogen_Bonding": [ (selector_fluoride_atoms, selector_water_only_hydrogens, "F-Hw", "lightblue"), (selector_fluoride_atoms, selector_hydronium_only_hydrogens, "F-Hh", "blue"), (selector_fluoride_atoms, selector_phosphate_hydrogens, "F-Hp", "darkblue") ], # 图3: 磷酸作为受体与周围环境的氢键(区分氧类型) "Phosphate_Acceptor": [ (selector_phosphate_P_double_O, selector_water_only_hydrogens, "P=O···Hw", "orange"), (selector_phosphate_P_double_O, selector_hydronium_only_hydrogens, "P=O···Hh", "red"), (selector_phosphate_P_O, selector_water_only_hydrogens, "P-O···Hw", "lightgreen"), (selector_phosphate_P_O, selector_hydronium_only_hydrogens, "P-O···Hh", "green"), (selector_phosphate_P_OH, selector_water_only_hydrogens, "P-OH···Hw", "lightblue"), (selector_phosphate_P_OH, selector_hydronium_only_hydrogens, "P-OH···Hh", "blue") ], # 图4: 磷酸-水-水合氢离子交叉氢键(排除同种类型) "Cross_Species_HBonding": [ (selector_phosphate_hydrogens, selector_water_only_oxygens, "Hp···Ow", "pink"), (selector_phosphate_hydrogens, selector_hydronium_only_oxygens, "Hp···Oh", "purple"), (selector_water_only_hydrogens, selector_all_phosphate_oxygens, "Hw···Op", "lightgreen"), (selector_water_only_hydrogens, selector_hydronium_only_oxygens, "Hw···Oh", "green"), (selector_hydronium_only_hydrogens, selector_water_only_oxygens, "Hh···Ow", "lightblue"), (selector_hydronium_only_hydrogens, selector_all_phosphate_oxygens, "Hh···Op", "blue") ], # 图5: 同类型分子内/间氢键(区分磷酸氧类型) "Same_Species_HBonding": [ (selector_phosphate_hydrogens, selector_phosphate_P_double_O, "Hp···P=O", "red"), (selector_phosphate_hydrogens, selector_phosphate_P_O, "Hp···P-O", "orange"), (selector_phosphate_hydrogens, selector_phosphate_P_OH, "Hp···P-OH", "yellow"), (selector_water_only_hydrogens, selector_water_only_oxygens, "Hw···Ow", "lightblue"), (selector_hydronium_only_hydrogens, selector_hydronium_only_oxygens, "Hh···Oh", "blue") ], # 图6: O-O聚集分析(Op不区分类型) "O_O_Aggregation": [ (selector_all_phosphate_oxygens, selector_water_only_oxygens, "Op-Ow", "blue"), (selector_all_phosphate_oxygens, selector_hydronium_only_oxygens, "Op-Oh", "green"), (selector_all_phosphate_oxygens, selector_all_phosphate_oxygens, "Op-Op", "red"), (selector_water_only_oxygens, selector_hydronium_only_oxygens, "Ow-Oh", "purple"), (selector_water_only_oxygens, selector_water_only_oxygens, "Ow-Ow", "cyan"), (selector_hydronium_only_oxygens, selector_hydronium_only_oxygens, "Oh-Oh", "magenta") ] } # 5. 主程序 - 优化并行处理 def main(workers=1): # 定义要处理的体系 vasprun_files = { "System1": "vasprun1.xml", "System2": "vasprun2.xml", "System3": "vasprun3.xml", "System4": "vasprun4.xml" } # 获取RDF分组配置 rdf_groups = get_rdf_groups() # 标题映射(根据您的要求) title_map = { "Al_Coordination": "Al Coordination Environment", "F_Hydrogen_Bonding": "F-H Hydrogen Bonding", "Phosphate_Acceptor": "Phosphate as H-bond Acceptor", "Cross_Species_HBonding": "Cross H-bonding between Different Species", "Same_Species_HBonding": "Intra- and Inter-molecular H-bonding", "O_O_Aggregation": "O-O Aggregation Analysis" } # 存储所有数据 all_system_data = {} group_y_max = {group_name: 0 for group_name in list(rdf_groups.keys())} group_x_max = { "Al_Coordination": (1.5, 3.5), "F_Hydrogen_Bonding": (1.0, 3.0), "Phosphate_Acceptor": (1.0, 3.0), "Cross_Species_HBonding": (1.0, 3.0), "Same_Species_HBonding": (1.0, 3.0), "O_O_Aggregation": (2.0, 6.0) } # 创建输出目录 os.makedirs("RDF_Plots", exist_ok=True) # 计算所有体系的所有RDF数据 for system_name, vasprun_file in vasprun_files.items(): print(f"\n{'='*50}") print(f"Processing {system_name}: {vasprun_file} with {workers} workers") print(f"{'='*50}") start_time = time.time() try: # 加载VASP结果 vr = Vasprun(vasprun_file, ionic_step_skip=5) structures = vr.structures print(f"Loaded {len(structures)} frames") # 存储体系数据 system_data = { "rdf_results": {}, "peak_infos": {} } # 计算所有RDF分组 for group_name, pairs in rdf_groups.items(): system_data["rdf_results"][group_name] = {} system_data["peak_infos"][group_name] = {} group_y_max_current = 0 for center_sel, target_sel, label, color in pairs: print(f"\nCalculating RDF for: {label}") try: r, rdf, peak_info = calculate_rdf_parallel( structures, center_sel, target_sel, r_max=10.0, exclude_bonds=True, bond_threshold=1.3, workers=workers ) system_data["rdf_results"][group_name][label] = (r, rdf, color) system_data["peak_infos"][group_name][label] = peak_info if len(rdf) > 0: current_max = np.max(rdf) if current_max > group_y_max_current: group_y_max_current = current_max if peak_info["position"] is not None: print(f" Peak for {label}: {peak_info['position']:.3f} Å (g(r) = {peak_info['value']:.2f})") else: print(f" No significant peak found for {label} in 1.5-3.0 Å range") except Exception as e: print(f"Error calculating RDF for {label}: {str(e)}") system_data["rdf_results"][group_name][label] = (np.array([]), np.array([]), color) system_data["peak_infos"][group_name][label] = {"position": None, "value": None} if group_y_max_current > group_y_max[group_name]: group_y_max[group_name] = group_y_max_current all_system_data[system_name] = system_data elapsed = time.time() - start_time print(f"\nCompleted processing for {system_name} in {elapsed:.2f} seconds") except Exception as e: print(f"Error processing {system_name}: {str(e)}") # 为每个分组添加余量 for group_name in group_y_max: group_y_max[group_name] = max(group_y_max[group_name] * 1.15, 3.0) # 确保最小值 # 第二步:生成符合期刊要求的图表 for system_name, system_data in all_system_data.items(): print(f"\nGenerating publication-quality plots for {system_name}") for group_name, group_data in system_data["rdf_results"].items(): fig, ax = plt.subplots(figsize=(8, 6)) # 设置坐标轴范围 xlim = group_x_max.get(group_name, (0, 6.0)) ylim = (0, group_y_max[group_name]) for label, (r, rdf, color) in group_data.items(): if len(r) > 0 and len(rdf) > 0: ax.plot(r, rdf, color=color, label=label, linewidth=2.0) ax.set_xlim(xlim) ax.set_ylim(ylim) # 期刊格式标签 ax.set_xlabel('Radial Distance (Å)', fontweight='bold') ax.set_ylabel('g(r)', fontweight='bold') # 添加体系名称到标题 ax.set_title(f"{system_name}: {title_map[group_name]}", fontsize=16, pad=15) # 精简图例 ncol = 3 if group_name == "Same_Species_HBonding" else 1 # 图5使用三列图例 ax.legend(ncol=ncol, loc='best', framealpha=0.8, fontsize=10) # 添加氢键区域标记(除O-O聚集图外) if group_name != "O_O_Aggregation": ax.axvspan(1.5, 2.5, alpha=0.1, color='green', zorder=0) ax.text(1.7, ylim[1]*0.85, 'H-bond Region', fontsize=10) # 添加网格 ax.grid(True, linestyle='--', alpha=0.5) # 保存高分辨率图片 plt.tight_layout() filename = os.path.join("RDF_Plots", f"RDF_{system_name}_{group_name}.tiff") plt.savefig(filename, bbox_inches='tight', dpi=600, format='tiff') print(f"Saved publication plot: {filename}") plt.close() # 保存Origin兼容数据 save_origin_data(system_name, system_data) print("\nAll RDF analysis completed successfully!") def save_origin_data(system_name, system_data): """保存Origin兼容格式数据""" os.makedirs("Origin_Data", exist_ok=True) system_dir = os.path.join("Origin_Data", system_name) os.makedirs(system_dir, exist_ok=True) # 保存峰值信息 peak_info_path = os.path.join(system_dir, f"Peak_Positions_{system_name}.csv") with open(peak_info_path, 'w', newline='') as csvfile: writer = csv.writer(csvfile) writer.writerow(["Group", "Interaction", "Peak Position (A)", "g(r) Value"]) for group_name, peaks in system_data["peak_infos"].items(): for label, info in peaks.items(): if info["position"] is not None: writer.writerow([group_name, label, f"{info['position']:.3f}", f"{info['value']:.3f}"]) else: writer.writerow([group_name, label, "N/A", "N/A"]) print(f"Saved peak positions: {peak_info_path}") # 保存RDF数据 for group_name, group_results in system_data["rdf_results"].items(): group_dir = os.path.join(system_dir, group_name) os.makedirs(group_dir, exist_ok=True) for label, (r, rdf, color) in group_results.items(): if len(r) > 0 and len(rdf) > 0: safe_label = label.replace(" ", "_").replace("/", "_").replace("=", "_") safe_label = safe_label.replace("(", "").replace(")", "").replace("$", "") filename = f"RDF_{system_name}_{group_name}_{safe_label}.csv" filepath = os.path.join(group_dir, filename) with open(filepath, 'w', newline='') as csvfile: writer = csv.writer(csvfile) writer.writerow(["Distance (A)", "g(r)"]) for i in range(len(r)): writer.writerow([f"{r[i]:.6f}", f"{rdf[i]:.6f}"]) print(f"Saved Origin data: {filename}") if __name__ == "__main__": # 设置命令行参数 parser = argparse.ArgumentParser(description='Calculate RDF for VASP simulations') parser.add_argument('--workers', type=int, default=multiprocessing.cpu_count(), help=f'Number of parallel workers (default: {multiprocessing.cpu_count()})') args = parser.parse_args() print(f"Starting RDF analysis with {args.workers} workers...") main(workers=args.workers) 以上代码实现了湿法磷酸体系中水 磷酸 水合氢离子以及氟之间的RDF计算,其中将O和H分别归类。沿用该代码的框架,修改其中的判别逻辑。首先识别P,在P周围搜寻O原子,如果该O原子距离在1.6埃以内则视为Op,而对于Op在其周围搜寻H原子,如果该H在距离1.3埃以内则视为成键即该H为Hp,通过是否有Hp则可以识别P-OH与P=O/P-O,在这里P=O和P-O还不能区分,如果默认磷酸中存在稳定存在P=O,则一般可以将离P最近的O视为P=O。接着体系中全部的O原子在去除Op之后剩下的O,在这些剩余的O周围搜寻整体的H,如果H的距离在1.2埃以内则视为成键,然后依照成键的H数量判定:如果H的数量为1,则记为-OH羟基(在这里不需要计算羟基部分,只是识别出来有利于逻辑完整性,并不参与RDF计算,也不需要特别标注表明),H的数量为2,则记为H2O水(该O也随之记为Ow,对应的两个H也记为Hw),如果H的数量为3,则记为水合氢离子(该O随之记为Oh,对应的三个H也记为Hh)。体系中存在质子转移的情况,所以需要每一帧重新识别原子的归属问题,如果H同时处于两个成键识别范围则按照就近原则,离哪个近则归属到哪一个(这里包括磷酸-磷酸,磷酸-水,磷酸-水合氢离子,水-水,水-水合氢离子,水合氢离子-水合氢离子,如果H同时处于某种情况下两个化学成键范围则采用就近原则),在实时重新归属质子的情况下,计算出包含质子转移部分的RDF,在这里,我们将排除自身化学键的阈值先设置为0,不需要只看氢键部分了。直接将-OH视为质子转移或者不完整而直接忽略即可,磷酸上的O需要通过H来进一步识别,所以符合距离的氧可暂时作为Op的候选,等H的识别完成再进行细分P=O P-O P-OH。输出一个可执行的命令,由于该逻辑确实较为复杂,综合考虑避免输出为0,可用核数为58,内存为100g

import numpy as np import matplotlib.pyplot as plt from pymatgen.io.vasp import Vasprun from pymatgen.core.structure import Structure from scipy.signal import savgol_filter from scipy.spatial import cKDTree from tqdm import tqdm import matplotlib as mpl import warnings from collections import defaultdict import os import csv import argparse import multiprocessing from functools import partial import time import dill # 忽略可能的警告 warnings.filterwarnings("ignore", category=UserWarning) # 专业绘图设置 - 符合Journal of Chemical Physics要求 plt.style.use('seaborn-v0_8-whitegrid') mpl.rcParams.update({ 'font.family': 'serif', 'font.serif': ['Times New Roman', 'DejaVu Serif'], 'font.size': 12, 'axes.labelsize': 14, 'axes.titlesize': 16, 'xtick.labelsize': 12, 'ytick.labelsize': 12, 'figure.dpi': 600, # 提高分辨率 'savefig.dpi': 600, 'figure.figsize': (8, 6), # 期刊常用尺寸 'lines.linewidth': 2.0, 'legend.fontsize': 10, 'legend.framealpha': 0.8, 'mathtext.default': 'regular', 'axes.linewidth': 1.5, # 加粗坐标轴线 'xtick.major.width': 1.5, 'ytick.major.width': 1.5, 'xtick.major.size': 5, 'ytick.major.size': 5, }) # 1. 增强的原子类型识别函数 - 逐帧识别 def identify_atom_types(struct): """识别所有关键原子类型并排除自身化学键""" # 磷酸氧分类 p_oxygens = {"P=O": [], "P-O": [], "P-OH": []} phosphate_hydrogens = [] # 仅P-OH基团中的H原子 # 水合氢离子识别 hydronium_oxygens = [] hydronium_hydrogens = [] # H₃O⁺中的H原子 # 普通水分子 water_oxygens = [] water_hydrogens = [] # 普通水中的H原子 # 氟离子 fluoride_atoms = [i for i, site in enumerate(struct) if site.species_string == "F"] # 铝离子 aluminum_atoms = [i for i, site in enumerate(struct) if site.species_string == "Al"] # 创建快速邻居查找表 neighbor_cache = defaultdict(list) for i, site in enumerate(struct): if site.species_string == "O": neighbors = struct.get_neighbors(site, r=1.3) h_neighbors = [n[0] for n in neighbors if n[0].species_string == "H"] neighbor_cache[i] = h_neighbors # 识别水合氢离子 (H₃O⁺) if len(h_neighbors) == 3: hydronium_oxygens.append(i) for h_site in h_neighbors: hydronium_hydrogens.append(h_site.index) # 识别磷酸基团 for site in struct: if site.species_string == "P": neighbors = struct.get_neighbors(site, r=2.0) # 扩大搜索半径 # 筛选氧原子邻居 o_neighbors = [(n[0], n[1]) for n in neighbors if n[0].species_string == "O"] if len(o_neighbors) < 4: # 如果找不到4个氧原子,使用旧方法 for neighbor in o_neighbors: nn_site = neighbor[0] if neighbor[1] < 1.55: p_oxygens["P=O"].append(nn_site.index) else: if any(n[0].species_string == "H" for n in struct.get_neighbors(nn_site, r=1.3)): p_oxygens["P-OH"].append(nn_site.index) else: p_oxygens["P-O"].append(nn_site.index) continue # 按距离排序 o_neighbors.sort(key=lambda x: x[1]) # 最近的氧原子为P=O p_double_o = o_neighbors[0][0] p_oxygens["P=O"].append(p_double_o.index) # 其他三个氧原子 for i in range(1, 4): o_site = o_neighbors[i][0] # 检查氧原子上是否有氢 if neighbor_cache.get(o_site.index, []): p_oxygens["P-OH"].append(o_site.index) else: p_oxygens["P-O"].append(o_site.index) # 识别P-OH基团中的H原子 (磷酸中的H) for o_idx in p_oxygens["P-OH"]: # 获取与P-OH氧相连的H原子 h_neighbors = neighbor_cache.get(o_idx, []) for h_site in h_neighbors: if h_site.species_string == "H": phosphate_hydrogens.append(h_site.index) # 识别普通水分子 (排除磷酸氧和水合氢离子) for i, site in enumerate(struct): if site.species_string == "O" and i not in hydronium_oxygens: is_phosphate_oxygen = False for cat in p_oxygens.values(): if i in cat: is_phosphate_oxygen = True break if not is_phosphate_oxygen: water_oxygens.append(i) # 识别普通水分子中的H原子 (水中的H) for o_idx in water_oxygens: h_neighbors = neighbor_cache.get(o_idx, []) for h_site in h_neighbors: if h_site.species_string == "H": water_hydrogens.append(h_site.index) return { "phosphate_oxygens": p_oxygens, "phosphate_hydrogens": phosphate_hydrogens, "water_oxygens": water_oxygens, "water_hydrogens": water_hydrogens, "hydronium_oxygens": hydronium_oxygens, "hydronium_hydrogens": hydronium_hydrogens, "fluoride_atoms": fluoride_atoms, "aluminum_atoms": aluminum_atoms } # 2. RDF计算函数 - 修复负值问题和序列化问题 def process_frame(struct, center_sel, target_sel, r_max, exclude_bonds, bond_threshold): """处理单帧结构计算,完全处理空原子类型情况""" # 每帧重新识别原子类型(关键!) atom_types = identify_atom_types(struct) # 获取中心原子和目标原子 centers = center_sel(atom_types) targets = target_sel(atom_types) # 处理空原子类型情况 - 第一重保护 if len(centers) == 0 or len(targets) == 0: return { "distances": np.array([], dtype=np.float64), "n_centers": 0, "n_targets": 0, "volume": struct.volume } center_coords = np.array([struct[i].coords for i in centers]) target_coords = np.array([struct[i].coords for i in targets]) lattice = struct.lattice kdtree = cKDTree(target_coords, boxsize=lattice.abc) # 动态确定邻居数量 - 不超过目标原子数 k_val = min(50, len(targets)) # 处理目标原子数量为0的情况 - 第二重保护 if k_val == 0: return { "distances": np.array([], dtype=np.float64), "n_centers": len(centers), "n_targets": len(targets), "volume": struct.volume } # 执行查询并确保结果统一格式 try: query_result = kdtree.query(center_coords, k=k_val, distance_upper_bound=r_max) except Exception as e: # 异常处理 - 返回空结果 print(f"KDTree query error: {str(e)}") return { "distances": np.array([], dtype=np.float64), "n_centers": len(centers), "n_targets": len(targets), "volume": struct.volume } # 统一处理不同维度的返回结果 if k_val == 1: # 处理单邻居情况 if isinstance(query_result, tuple): distances, indices = query_result else: distances = query_result indices = np.zeros_like(distances, dtype=int) # 确保数组格式 distances = np.atleast_1d(distances) indices = np.atleast_1d(indices) else: # 多邻居情况 distances, indices = query_result # 确保二维数组格式 if distances.ndim == 1: distances = distances.reshape(-1, 1) indices = indices.reshape(-1, 1) valid_distances = [] for i in range(distances.shape[0]): center_idx = centers[i] for j in range(distances.shape[1]): dist = distances[i, j] # 跳过超出范围的距离 if dist > r_max or np.isinf(dist): continue target_idx = targets[indices[i, j]] # 排除化学键 if exclude_bonds: actual_dist = struct.get_distance(center_idx, target_idx) if actual_dist < bond_threshold: continue valid_distances.append(dist) return { "distances": np.array(valid_distances, dtype=np.float64), "n_centers": len(centers), "n_targets": len(targets), "volume": struct.volume } def calculate_rdf_parallel(structures, center_sel, target_sel, r_max=8.0, bin_width=0.05, exclude_bonds=True, bond_threshold=1.3, workers=1): """ 并行计算径向分布函数 :param workers: 并行工作进程数 """ bins = np.arange(0, r_max, bin_width) hist = np.zeros(len(bins) - 1) total_centers = 0 total_targets = 0 total_volume = 0 # 准备参数 - 使用dill解决序列化问题 dill.settings['recurse'] = True func = partial(process_frame, center_sel=center_sel, target_sel=target_sel, r_max=r_max, exclude_bonds=exclude_bonds, bond_threshold=bond_threshold) # 使用多进程池 with multiprocessing.Pool(processes=workers) as pool: results = [] # 使用imap_unordered提高效率 for res in tqdm(pool.imap_unordered(func, structures), total=len(structures), desc="Calculating RDF"): results.append(res) # 处理结果 - 特别注意空结果处理 n_frames = 0 for res in results: if res is None: continue n_frames += 1 valid_distances = res["distances"] n_centers = res["n_centers"] n_targets = res["n_targets"] volume = res["volume"] # 累加计数 if len(valid_distances) > 0: hist += np.histogram(valid_distances, bins=bins)[0] total_centers += n_centers total_targets += n_targets total_volume += volume # 修正归一化 - 解决负值问题 if n_frames == 0: # 没有有效帧时返回空结果 r = bins[:-1] + bin_width/2 return r, np.zeros_like(r), {"position": None, "value": None} avg_density = total_targets / total_volume if total_volume > 0 else 0 r = bins[:-1] + bin_width/2 rdf = np.zeros_like(r) for i in range(len(hist)): r_lower = bins[i] r_upper = bins[i+1] shell_vol = 4/3 * np.pi * (r_upper**3 - r_lower**3) expected_count = shell_vol * avg_density * total_centers # 避免除以零 if expected_count > 1e-10: rdf[i] = hist[i] / expected_count else: rdf[i] = 0 # 更稳健的平滑处理 - 避免边界效应 if len(rdf) > 10: window_length = min(15, len(rdf)//2*2+1) polyorder = min(5, window_length-1) rdf_smoothed = savgol_filter(rdf, window_length=window_length, polyorder=polyorder, mode='mirror') else: rdf_smoothed = rdf # 计算主要峰值 peak_info = {} mask = (r >= 1.5) & (r <= 3.0) if np.any(mask) and np.any(rdf_smoothed[mask] > 0): peak_idx = np.argmax(rdf_smoothed[mask]) peak_pos = r[mask][peak_idx] peak_val = rdf_smoothed[mask][peak_idx] peak_info = {"position": peak_pos, "value": peak_val} else: peak_info = {"position": None, "value": None} return r, rdf_smoothed, peak_info # 3. 定义精细化的选择器函数(避免lambda序列化问题) def selector_phosphate_P_double_O(atom_types): return atom_types["phosphate_oxygens"]["P=O"] def selector_phosphate_P_OH(atom_types): return atom_types["phosphate_oxygens"]["P-OH"] def selector_phosphate_P_O(atom_types): return atom_types["phosphate_oxygens"]["P-O"] def selector_phosphate_hydrogens(atom_types): return atom_types["phosphate_hydrogens"] def selector_water_only_hydrogens(atom_types): """仅选择水分子中的氢原子""" return atom_types["water_hydrogens"] def selector_hydronium_only_hydrogens(atom_types): """仅选择水合氢离子中的氢原子""" return atom_types["hydronium_hydrogens"] def selector_water_only_oxygens(atom_types): """仅选择水分子中的氧原子""" return atom_types["water_oxygens"] def selector_hydronium_only_oxygens(atom_types): """仅选择水合氢离子中的氧原子""" return atom_types["hydronium_oxygens"] def selector_fluoride_atoms(atom_types): return atom_types["fluoride_atoms"] def selector_aluminum_atoms(atom_types): return atom_types["aluminum_atoms"] def selector_all_phosphate_oxygens(atom_types): return (atom_types["phosphate_oxygens"]["P=O"] + atom_types["phosphate_oxygens"]["P-O"] + atom_types["phosphate_oxygens"]["P-OH"]) # 4. 根据您的要求定义六张图的RDF分组配置 def get_rdf_groups(): """返回六张图的RDF分组配置(完全符合您的需求)""" return { # 图1: Al的配位情况 "Al_Coordination": [ (selector_aluminum_atoms, selector_fluoride_atoms, "Al-F", "blue"), (selector_aluminum_atoms, selector_water_only_oxygens, "Al-Ow", "green"), (selector_aluminum_atoms, selector_all_phosphate_oxygens, "Al-Op", "red") ], # 图2: F与H形成的氢键 "F_Hydrogen_Bonding": [ (selector_fluoride_atoms, selector_water_only_hydrogens, "F-Hw", "lightblue"), (selector_fluoride_atoms, selector_hydronium_only_hydrogens, "F-Hh", "blue"), (selector_fluoride_atoms, selector_phosphate_hydrogens, "F-Hp", "darkblue") ], # 图3: 磷酸作为受体与周围环境的氢键(区分氧类型) "Phosphate_Acceptor": [ (selector_phosphate_P_double_O, selector_water_only_hydrogens, "P=O···Hw", "orange"), (selector_phosphate_P_double_O, selector_hydronium_only_hydrogens, "P=O···Hh", "red"), (selector_phosphate_P_O, selector_water_only_hydrogens, "P-O···Hw", "lightgreen"), (selector_phosphate_P_O, selector_hydronium_only_hydrogens, "P-O···Hh", "green"), (selector_phosphate_P_OH, selector_water_only_hydrogens, "P-OH···Hw", "lightblue"), (selector_phosphate_P_OH, selector_hydronium_only_hydrogens, "P-OH···Hh", "blue") ], # 图4: 磷酸-水-水合氢离子交叉氢键(排除同种类型) "Cross_Species_HBonding": [ (selector_phosphate_hydrogens, selector_water_only_oxygens, "Hp···Ow", "pink"), (selector_phosphate_hydrogens, selector_hydronium_only_oxygens, "Hp···Oh", "purple"), (selector_water_only_hydrogens, selector_all_phosphate_oxygens, "Hw···Op", "lightgreen"), (selector_water_only_hydrogens, selector_hydronium_only_oxygens, "Hw···Oh", "green"), (selector_hydronium_only_hydrogens, selector_water_only_oxygens, "Hh···Ow", "lightblue"), (selector_hydronium_only_hydrogens, selector_all_phosphate_oxygens, "Hh···Op", "blue") ], # 图5: 同类型分子内/间氢键(区分磷酸氧类型) "Same_Species_HBonding": [ (selector_phosphate_hydrogens, selector_phosphate_P_double_O, "Hp···P=O", "red"), (selector_phosphate_hydrogens, selector_phosphate_P_O, "Hp···P-O", "orange"), (selector_phosphate_hydrogens, selector_phosphate_P_OH, "Hp···P-OH", "yellow"), (selector_water_only_hydrogens, selector_water_only_oxygens, "Hw···Ow", "lightblue"), (selector_hydronium_only_hydrogens, selector_hydronium_only_oxygens, "Hh···Oh", "blue") ], # 图6: O-O聚集分析(Op不区分类型) "O_O_Aggregation": [ (selector_all_phosphate_oxygens, selector_water_only_oxygens, "Op-Ow", "blue"), (selector_all_phosphate_oxygens, selector_hydronium_only_oxygens, "Op-Oh", "green"), (selector_all_phosphate_oxygens, selector_all_phosphate_oxygens, "Op-Op", "red"), (selector_water_only_oxygens, selector_hydronium_only_oxygens, "Ow-Oh", "purple"), (selector_water_only_oxygens, selector_water_only_oxygens, "Ow-Ow", "cyan"), (selector_hydronium_only_oxygens, selector_hydronium_only_oxygens, "Oh-Oh", "magenta") ] } # 5. 主程序 - 优化并行处理 def main(workers=1): # 定义要处理的体系 vasprun_files = { "System1": "vasprun1.xml", "System2": "vasprun2.xml", "System3": "vasprun3.xml", "System4": "vasprun4.xml" } # 获取RDF分组配置 rdf_groups = get_rdf_groups() # 标题映射(根据您的要求) title_map = { "Al_Coordination": "Al Coordination Environment", "F_Hydrogen_Bonding": "F-H Hydrogen Bonding", "Phosphate_Acceptor": "Phosphate as H-bond Acceptor", "Cross_Species_HBonding": "Cross H-bonding between Different Species", "Same_Species_HBonding": "Intra- and Inter-molecular H-bonding", "O_O_Aggregation": "O-O Aggregation Analysis" } # 存储所有数据 all_system_data = {} group_y_max = {group_name: 0 for group_name in list(rdf_groups.keys())} group_x_max = { "Al_Coordination": (1.5, 3.5), "F_Hydrogen_Bonding": (1.0, 3.0), "Phosphate_Acceptor": (1.0, 3.0), "Cross_Species_HBonding": (1.0, 3.0), "Same_Species_HBonding": (1.0, 3.0), "O_O_Aggregation": (2.0, 6.0) } # 创建输出目录 os.makedirs("RDF_Plots", exist_ok=True) # 计算所有体系的所有RDF数据 for system_name, vasprun_file in vasprun_files.items(): print(f"\n{'='*50}") print(f"Processing {system_name}: {vasprun_file} with {workers} workers") print(f"{'='*50}") start_time = time.time() try: # 加载VASP结果 vr = Vasprun(vasprun_file, ionic_step_skip=5) structures = vr.structures print(f"Loaded {len(structures)} frames") # 存储体系数据 system_data = { "rdf_results": {}, "peak_infos": {} } # 计算所有RDF分组 for group_name, pairs in rdf_groups.items(): system_data["rdf_results"][group_name] = {} system_data["peak_infos"][group_name] = {} group_y_max_current = 0 for center_sel, target_sel, label, color in pairs: print(f"\nCalculating RDF for: {label}") try: r, rdf, peak_info = calculate_rdf_parallel( structures, center_sel, target_sel, r_max=10.0, exclude_bonds=True, bond_threshold=1.3, workers=workers ) system_data["rdf_results"][group_name][label] = (r, rdf, color) system_data["peak_infos"][group_name][label] = peak_info if len(rdf) > 0: current_max = np.max(rdf) if current_max > group_y_max_current: group_y_max_current = current_max if peak_info["position"] is not None: print(f" Peak for {label}: {peak_info['position']:.3f} Å (g(r) = {peak_info['value']:.2f})") else: print(f" No significant peak found for {label} in 1.5-3.0 Å range") except Exception as e: print(f"Error calculating RDF for {label}: {str(e)}") system_data["rdf_results"][group_name][label] = (np.array([]), np.array([]), color) system_data["peak_infos"][group_name][label] = {"position": None, "value": None} if group_y_max_current > group_y_max[group_name]: group_y_max[group_name] = group_y_max_current all_system_data[system_name] = system_data elapsed = time.time() - start_time print(f"\nCompleted processing for {system_name} in {elapsed:.2f} seconds") except Exception as e: print(f"Error processing {system_name}: {str(e)}") # 为每个分组添加余量 for group_name in group_y_max: group_y_max[group_name] = max(group_y_max[group_name] * 1.15, 3.0) # 确保最小值 # 第二步:生成符合期刊要求的图表 for system_name, system_data in all_system_data.items(): print(f"\nGenerating publication-quality plots for {system_name}") for group_name, group_data in system_data["rdf_results"].items(): fig, ax = plt.subplots(figsize=(8, 6)) # 设置坐标轴范围 xlim = group_x_max.get(group_name, (0, 6.0)) ylim = (0, group_y_max[group_name]) for label, (r, rdf, color) in group_data.items(): if len(r) > 0 and len(rdf) > 0: ax.plot(r, rdf, color=color, label=label, linewidth=2.0) ax.set_xlim(xlim) ax.set_ylim(ylim) # 期刊格式标签 ax.set_xlabel('Radial Distance (Å)', fontweight='bold') ax.set_ylabel('g(r)', fontweight='bold') # 添加体系名称到标题 ax.set_title(f"{system_name}: {title_map[group_name]}", fontsize=16, pad=15) # 精简图例 ncol = 3 if group_name == "Same_Species_HBonding" else 1 # 图5使用三列图例 ax.legend(ncol=ncol, loc='best', framealpha=0.8, fontsize=10) # 添加氢键区域标记(除O-O聚集图外) if group_name != "O_O_Aggregation": ax.axvspan(1.5, 2.5, alpha=0.1, color='green', zorder=0) ax.text(1.7, ylim[1]*0.85, 'H-bond Region', fontsize=10) # 添加网格 ax.grid(True, linestyle='--', alpha=0.5) # 保存高分辨率图片 plt.tight_layout() filename = os.path.join("RDF_Plots", f"RDF_{system_name}_{group_name}.tiff") plt.savefig(filename, bbox_inches='tight', dpi=600, format='tiff') print(f"Saved publication plot: {filename}") plt.close() # 保存Origin兼容数据 save_origin_data(system_name, system_data) print("\nAll RDF analysis completed successfully!") def save_origin_data(system_name, system_data): """保存Origin兼容格式数据""" os.makedirs("Origin_Data", exist_ok=True) system_dir = os.path.join("Origin_Data", system_name) os.makedirs(system_dir, exist_ok=True) # 保存峰值信息 peak_info_path = os.path.join(system_dir, f"Peak_Positions_{system_name}.csv") with open(peak_info_path, 'w', newline='') as csvfile: writer = csv.writer(csvfile) writer.writerow(["Group", "Interaction", "Peak Position (A)", "g(r) Value"]) for group_name, peaks in system_data["peak_infos"].items(): for label, info in peaks.items(): if info["position"] is not None: writer.writerow([group_name, label, f"{info['position']:.3f}", f"{info['value']:.3f}"]) else: writer.writerow([group_name, label, "N/A", "N/A"]) print(f"Saved peak positions: {peak_info_path}") # 保存RDF数据 for group_name, group_results in system_data["rdf_results"].items(): group_dir = os.path.join(system_dir, group_name) os.makedirs(group_dir, exist_ok=True) for label, (r, rdf, color) in group_results.items(): if len(r) > 0 and len(rdf) > 0: safe_label = label.replace(" ", "_").replace("/", "_").replace("=", "_") safe_label = safe_label.replace("(", "").replace(")", "").replace("$", "") filename = f"RDF_{system_name}_{group_name}_{safe_label}.csv" filepath = os.path.join(group_dir, filename) with open(filepath, 'w', newline='') as csvfile: writer = csv.writer(csvfile) writer.writerow(["Distance (A)", "g(r)"]) for i in range(len(r)): writer.writerow([f"{r[i]:.6f}", f"{rdf[i]:.6f}"]) print(f"Saved Origin data: {filename}") if __name__ == "__main__": # 设置命令行参数 parser = argparse.ArgumentParser(description='Calculate RDF for VASP simulations') parser.add_argument('--workers', type=int, default=multiprocessing.cpu_count(), help=f'Number of parallel workers (default: {multiprocessing.cpu_count()})') args = parser.parse_args() print(f"Starting RDF analysis with {args.workers} workers...") main(workers=args.workers) 以上代码实现了湿法磷酸中水和水合氢离子以及磷酸之间的RDF计算,它是一个包含质子转移过程的部分。我们在这里将沿用这个框架来实现相同指令可执行的文件。我将对内容和识别逻辑进行修改,我们在这里只识别P,在识别P之后搜寻周围的O,如果P-O之间的距离小于1.8埃,则将该O视为Op。然后我们将Op作为中心原子,体系中的所有H(是整个体系中的所有H)作为目标原子计算RDF,输出文本和图。

大家在看

recommend-type

基于ADS的微带滤波器设计

微波滤波器是用来分离不同频率微波信号的一种器件。它的主要作用是抑制不需要的信号,使其不能通过滤波器,只让需要的信号通过。在微波电路系统中,滤波器的性能对电路的性能指标有很大的影响,因此如何设计出一个具有高性能的滤波器,对设计微波电路系统具有很重要的意义。
recommend-type

Pixhawk4飞控驱动.zip

已安装成功
recommend-type

ztecfg中兴配置加解密工具3.0版本.rar

中兴光猫配置文件加解密工具3.0 .\ztecfg.exe -d AESCBC -i .\(要解密的文件名)db_user_cfg.xml -o (解密后文件名)123.cfg
recommend-type

配置车辆-feedback systems_an introduction for scientists and engineers

5.2 道路场景 从界面右侧的道路场景列表中,双击载入所需的道路场景(如 Fld_FreeWay)。 PanoSim提供了 ADAS标准(ISO、Euro NCAP)典型场景库,如高速公路、乡村道路、 城镇、坡道、换道、停车场、高速出入口等。我们可根据用户需要定制丰富场景库。 PanoSim提供专门的道路场景设计工具,可通过常用工具栏\Tools\FieldBuilder 来创建自己的道路场景。 5.3 天气和光照 从右侧的实验环境列表栏中,通过双击载入所需的实验天气和光照。天气有多 云、雾天、雨天、雪天、晴天,光照有白天和夜晚,相关实验信息(如所选场景、天 气、车辆等),可在左侧实验信息栏中查看。 5.4 配置车辆 点击“Forward”,进入实验参数设置主界面(图 5-2)。
recommend-type

xilinx.com_user_IIC_AXI_1.0.zip

可以直接用在vivado 2017.4版本里。查看各个寄存器就知道用来干什么了,一号寄存器分频系数,二号的start、stop信号,三号寄存器8bit数据,四号寄存器只读,返回IIC状态和ACK信号,其中二号的一个bit可以用来不等待从机ACK,方便使用。

最新推荐

recommend-type

开发界面语义化:声控 + 画图协同生成代码.doc

开发界面语义化:声控 + 画图协同生成代码.doc
recommend-type

Python程序TXLWizard生成TXL文件及转换工具介绍

### 知识点详细说明: #### 1. 图形旋转与TXL向导 图形旋转是图形学领域的一个基本操作,用于改变图形的方向。在本上下文中,TXL向导(TXLWizard)是由Esteban Marin编写的Python程序,它实现了特定的图形旋转功能,主要用于电子束光刻掩模的生成。光刻掩模是半导体制造过程中非常关键的一个环节,它确定了在硅片上沉积材料的精确位置。TXL向导通过生成特定格式的TXL文件来辅助这一过程。 #### 2. TXL文件格式与用途 TXL文件格式是一种基于文本的文件格式,它设计得易于使用,并且可以通过各种脚本语言如Python和Matlab生成。这种格式通常用于电子束光刻中,因为它的文本形式使得它可以通过编程快速创建复杂的掩模设计。TXL文件格式支持引用对象和复制对象数组(如SREF和AREF),这些特性可以用于优化电子束光刻设备的性能。 #### 3. TXLWizard的特性与优势 - **结构化的Python脚本:** TXLWizard 使用结构良好的脚本来创建遮罩,这有助于开发者创建清晰、易于维护的代码。 - **灵活的Python脚本:** 作为Python程序,TXLWizard 可以利用Python语言的灵活性和强大的库集合来编写复杂的掩模生成逻辑。 - **可读性和可重用性:** 生成的掩码代码易于阅读,开发者可以轻松地重用和修改以适应不同的需求。 - **自动标签生成:** TXLWizard 还包括自动为图形对象生成标签的功能,这在管理复杂图形时非常有用。 #### 4. TXL转换器的功能 - **查看.TXL文件:** TXL转换器(TXLConverter)允许用户将TXL文件转换成HTML或SVG格式,这样用户就可以使用任何现代浏览器或矢量图形应用程序来查看文件。 - **缩放和平移:** 转换后的文件支持缩放和平移功能,这使得用户在图形界面中更容易查看细节和整体结构。 - **快速转换:** TXL转换器还提供快速的文件转换功能,以实现有效的蒙版开发工作流程。 #### 5. 应用场景与技术参考 TXLWizard的应用场景主要集中在电子束光刻技术中,特别是用于设计和制作半导体器件时所需的掩模。TXLWizard作为一个向导,不仅提供了生成TXL文件的基础框架,还提供了一种方式来优化掩模设计,提高光刻过程的效率和精度。对于需要进行光刻掩模设计的工程师和研究人员来说,TXLWizard提供了一种有效的方法来实现他们的设计目标。 #### 6. 系统开源特性 标签“系统开源”表明TXLWizard遵循开放源代码的原则,这意味着源代码对所有人开放,允许用户自由地查看、修改和分发软件。开源项目通常拥有活跃的社区,社区成员可以合作改进软件,添加新功能,或帮助解决遇到的问题。这种开放性促进了技术创新,并允许用户根据自己的需求定制软件。 #### 7. 压缩包子文件的文件名称列表 文件名称列表中的“txlwizard-master”可能指的是TXLWizard项目的主版本库或主分支。这个名称表明了这是项目源代码的中心点,其他开发者会从这个主分支拉取代码进行合作开发或部署。以“-master”结尾通常是版本控制系统中表示主要开发线路的常见约定,例如Git中的master(现在更常被称为main)分支。 通过这些知识点的详细解释,我们可以看到TXLWizard不仅是一个用于生成TXL文件的工具,它还整合了一系列的功能,使得电子束光刻掩模的设计工作更为高效和直观。同时,作为一个开源项目,它能够借助社区的力量不断进步,为用户带来更多的便利和创新。
recommend-type

【创新图生成:扣子平台的技术前沿与创新思维】:引领图像生成技术的新潮流

# 1. 扣子平台的图像生成技术概述 在当今数字化时代,图像生成技术正成为推动技术创新和社会变革的关键力量。扣子平台正是这一技术革新的代表。本章将对扣子平台的图像生成技术进行一个全面概述,探究它如何将理论技术转化为实际应用,并带来显著的社会影响。 扣子平台运用了先进的图像生成技术,将现实世界和数字空间无缝融合,打造了沉浸式的视觉体验。其技术的核心是通过高度复杂的算法模型,快速生成高质
recommend-type

海康威视机器视觉工程师考核

<think>好的,我现在需要帮助用户了解海康威视机器视觉工程师的考核内容和要求。根据用户提供的引用资料,特别是引用[1]和[2],里面提到了考核素材包分为初级和中级,涵盖理论、算法、应用案例等。首先,我要整理这些信息,确保结构清晰,符合用户要求的格式。 接下来,我需要确认素材包的具体内容,比如初级和中级的不同点。引用[2]提到初级包含基础理论、算法实现和实际案例,中级则增加复杂算法和项目分析。这部分需要分点说明,方便用户理解层次。 另外,用户可能想知道如何准备考核,比如下载素材、学习顺序、模拟考核等,引用[2]中有使用说明和注意事项,这部分也要涵盖进去。同时要注意提醒用户考核窗口已关闭,
recommend-type

Linux环境下Docker Hub公共容器映像检测工具集

在给出的知识点中,我们需要详细解释有关Docker Hub、公共容器映像、容器编排器以及如何与这些工具交互的详细信息。同时,我们会涵盖Linux系统下的相关操作和工具使用,以及如何在ECS和Kubernetes等容器编排工具中运用这些检测工具。 ### Docker Hub 和公共容器映像 Docker Hub是Docker公司提供的一项服务,它允许用户存储、管理以及分享Docker镜像。Docker镜像可以视为应用程序或服务的“快照”,包含了运行特定软件所需的所有必要文件和配置。公共容器映像指的是那些被标记为公开可见的Docker镜像,任何用户都可以拉取并使用这些镜像。 ### 静态和动态标识工具 静态和动态标识工具在Docker Hub上用于识别和分析公共容器映像。静态标识通常指的是在不运行镜像的情况下分析镜像的元数据和内容,例如检查Dockerfile中的指令、环境变量、端口映射等。动态标识则需要在容器运行时对容器的行为和性能进行监控和分析,如资源使用率、网络通信等。 ### 容器编排器与Docker映像 容器编排器是用于自动化容器部署、管理和扩展的工具。在Docker环境中,容器编排器能够自动化地启动、停止以及管理容器的生命周期。常见的容器编排器包括ECS和Kubernetes。 - **ECS (Elastic Container Service)**:是由亚马逊提供的容器编排服务,支持Docker容器,并提供了一种简单的方式来运行、停止以及管理容器化应用程序。 - **Kubernetes**:是一个开源平台,用于自动化容器化应用程序的部署、扩展和操作。它已经成为容器编排领域的事实标准。 ### 如何使用静态和动态标识工具 要使用这些静态和动态标识工具,首先需要获取并安装它们。从给定信息中了解到,可以通过克隆仓库或下载压缩包并解压到本地系统中。之后,根据需要针对不同的容器编排环境(如Dockerfile、ECS、Kubernetes)编写配置,以集成和使用这些检测工具。 ### Dockerfile中的工具使用 在Dockerfile中使用工具意味着将检测工具的指令嵌入到构建过程中。这可能包括安装检测工具的命令、运行容器扫描的步骤,以及将扫描结果集成到镜像构建流程中,确保只有通过安全和合规检查的容器镜像才能被构建和部署。 ### ECS与Kubernetes中的工具集成 在ECS或Kubernetes环境中,工具的集成可能涉及到创建特定的配置文件、定义服务和部署策略,以及编写脚本或控制器来自动执行检测任务。这样可以在容器编排的过程中实现实时监控,确保容器编排器只使用符合预期的、安全的容器镜像。 ### Linux系统下的操作 在Linux系统下操作这些工具,用户可能需要具备一定的系统管理和配置能力。这包括使用Linux命令行工具、管理文件系统权限、配置网络以及安装和配置软件包等。 ### 总结 综上所述,Docker Hub上的静态和动态标识工具提供了一种方法来检测和分析公共容器映像,确保这些镜像的安全性和可靠性。这些工具在Linux开发环境中尤为重要,因为它们帮助开发人员和运维人员确保他们的容器映像满足安全要求。通过在Dockerfile、ECS和Kubernetes中正确使用这些工具,可以提高应用程序的安全性,减少由于使用不安全的容器镜像带来的风险。此外,掌握Linux系统下的操作技能,可以更好地管理和维护这些工具,确保它们能够有效地发挥作用。
recommend-type

【扣子平台图像艺术探究:理论与实践的完美结合】:深入学习图像生成的艺术

# 1. 图像艺术的理论基础 艺术领域的每一个流派和技巧都有其理论基础。在图像艺术中,理论基础不仅是对艺术表现形式的认知,也是掌握艺术创作内在逻辑的关键。深入理解图像艺术的理论基础,能够帮助艺术家们在创作过程中更加明确地表达自己的艺术意图,以及更好地与观众沟通。 图像艺术的理论
recommend-type

增广路定理的证明

<think>嗯,用户想查找增广路定理的证明,关于图论和网络流算法的。我得先理解增广路定理是什么,然后找到相关的证明方法。根据之前的引用,尤其是引用5提到最大流最小割定理,里面有三个等价条件,其中第二个是残余网络中没有增广路径时,流就是最大流。这可能和增广路定理相关,也就是当残余网络中没有增广路时,当前流就是最大流,这可能就是增广路定理的内容。 首先,我需要明确增广路定理的陈述。根据引用5,增广路定理可能指的是:一个流是最大流当且仅当残余网络中不存在增广路径。这个定理的证明需要用到最大流最小割定理,也就是第三个条件,即最大流的流量等于最小割的容量。 证明的步骤可能需要分为两个方向:必要性(
recommend-type

Pulse:基于SwiftUI的Apple平台高效日志记录与网络监控

从给定文件信息中,我们可以提取出以下IT知识点进行详细阐述: **Pulse概览:** Pulse是一个专门针对Apple平台(如iOS、iPadOS、macOS等)的功能强大的日志记录系统。其设计目的是为了简化开发者在这些平台上调试网络请求和应用日志的过程。Pulse的核心特色是它使用SwiftUI来构建,这有助于开发者利用现代Swift语言的声明式UI优势来快速开发和维护。 **SwiftUI框架:** SwiftUI是一种声明式框架,由苹果公司推出,用于构建用户界面。与传统的UIKit相比,SwiftUI使用更加简洁的代码来描述界面和界面元素,它允许开发者以声明的方式定义视图和界面布局。SwiftUI支持跨平台,这意味着同一套代码可以在不同的Apple设备上运行,大大提高了开发效率和复用性。Pulse选择使用SwiftUI构建,显示了其对现代化、高效率开发的支持。 **Network Inspector功能:** Pulse具备Network Inspector功能,这个功能使得开发者能够在开发iOS应用时,直接从应用内记录和检查网络请求和日志。这种内嵌式的网络诊断能力非常有助于快速定位网络请求中的问题,如不正确的URL、不返回预期响应等。与传统的需要外部工具来抓包和分析的方式相比,这样的内嵌式工具大大减少了调试的复杂性。 **日志记录和隐私保护:** Pulse强调日志是本地记录的,并保证不会离开设备。这种做法对隐私保护至关重要,尤其是考虑到当前数据保护法规如GDPR等的严格要求。因此,Pulse的设计在帮助开发者进行问题诊断的同时,也确保了用户数据的安全性。 **集成和框架支持:** Pulse不仅仅是一个工具,它更是一个框架。它能够记录来自URLSession的事件,这意味着它可以与任何使用URLSession进行网络通信的应用或框架配合使用,包括但不限于Apple官方的网络库。此外,Pulse与使用它的框架(例如Alamofire)也能够良好配合,Alamofire是一个流行的网络请求库,广泛应用于Swift开发中。Pulse提供了一个PulseUI视图组件,开发者可以将其集成到自己的应用中,从而展示网络请求和其他事件。 **跨平台体验:** 开发者不仅可以在iOS应用中使用Pulse Console记录日志,还可以在macOS上通过Pulse应用程序查看和共享这些日志。这种跨平台的能力意味着开发者可以在不同的设备上进行日志分析,增强了开发和调试的灵活性。 **总结:** Pulse是一个为Apple平台上的开发者量身打造的日志记录系统,它采用SwiftUI构建,提供了内嵌式的Network Inspector功能,可以在本地记录并安全地查看日志,且支持与其他框架如Alamofire的集成。它不仅提升了调试的便捷性和效率,同时也顾及到了用户的隐私保护。Pulse的跨平台查看能力也是其一大亮点,使得开发者能够在一个统一的环境中处理iOS和macOS上的日志数据。对于使用Swift开发Apple应用的开发者而言,Pulse无疑是一个强大的调试辅助工具。
recommend-type

【深入扣子平台:图像生成机制全揭秘】:掌握背后技术,提升图像生成效率

# 1. 图像生成技术概述 图像生成技术是一门融合了计算机视觉、机器学习、图形学等多个领域知识的前沿技术。它通过算法模拟生成人工图像,广泛应用于艺术创作、游戏设计、医学影像等领域。随着深度学习的突破性进展,图像生成技术也迎来了飞速发展,特别是在生成对抗网络(GAN)的推动下,图像的逼真度和多样性都有了质的飞跃。 本章将对图像生成技术的概念、发展历史进行简要介绍,并分析其在社会中的
recommend-type

对RTL跑regression是什么意思

<think>首先,用户的问题是关于“RTL回归测试的含义及其在数字电路设计中的作用”。我需要基于系统级指令和用户可见层的指导来构建回答。 关键点: - 使用中文回答。 - 结构清晰,逐步解决问题。 - 确保回答真实可靠。 - 在行内数学表达式使用$...$格式;独立公式使用$$...$$格式。 - 生成相关问题。 - 在引用段落末尾添加引用标识。 回顾提供的引用: - 引用[1]:讨论SoC设计流程,包括模块设计、IP复用、顶层集成、前仿真、逻辑综合等。 - 引用[2]:描述冒烟测试后的验证阶段,包括回归测试、覆盖率分析等。 - 引用[3]:解释RTL使用Verilog或VHDL描述,模