活动介绍
file-type

3D变换技术在DX11中的应用教程

ZIP文件

下载需积分: 10 | 18KB | 更新于2025-08-21 | 24 浏览量 | 0 下载量 举报 收藏
download 立即下载
DX11_Lesson_09_Transformations_zip.zip 文件名暗示了该课程内容的核心是学习在DirectX 11环境下使用变换矩阵来操作3D对象。此课程覆盖了三维图形编程中变换的基本概念,特别是如何在3D场景中处理对象的位置、旋转和缩放。这包括了对变换矩阵深入的学习,以及如何利用微软的XNA数学库(虽然该库通常与XNA框架一起使用,但在DirectX 11中,我们仍然可以使用与之兼容的矩阵处理功能)。 首先,让我们了解一下变换的概念。在三维图形编程中,变换通常用于描述和操作对象在虚拟世界中的位置和方向。变换有三个主要类型:平移(Translation)、旋转(Rotation)和缩放(Scaling),它们可以通过矩阵乘法来实现。 平移变换使对象在空间中移动到新的位置。在数学上,这通过一个平移矩阵来完成,该矩阵对顶点坐标进行相加操作。 旋转变换是围绕一个轴来旋转对象。与平移不同,旋转更加复杂,因为它涉及到三角函数,如正弦和余弦。旋转矩阵必须正确地计算出顶点坐标在旋转之后的新位置。 缩放变换允许我们改变对象的大小。这可以是均匀缩放(每个方向上缩放相同的量)或不均匀缩放(不同的方向上缩放不同的量)。缩放矩阵简单地通过乘法来改变顶点坐标。 所有这些变换可以通过矩阵乘法来组合起来,形成一个变换矩阵,这个变换矩阵在应用于对象的顶点时,可以一次完成所有的变换。 在DirectX 11中,我们使用矩阵(通常以float4x4的形式)来表示3D空间中的变换。我们可以创建变换矩阵来表示任意的变换序列,并将它们应用于顶点数据。这种变换在顶点着色器(Vertex Shader)中进行,然后传递到像素着色器(Pixel Shader)以进一步处理。 XNA数学库提供了一套丰富的工具来处理3D图形中的变换。它包括创建不同变换矩阵的函数、矩阵乘法以及对矩阵进行其它操作的功能。 使用变换矩阵可以让我们以一种非常灵活且有效的方式来操纵3D几何体。例如,当我们在一个3D场景中创建一个物体的时候,我们通常会首先定义它的模型矩阵,其中包含了物体的旋转、位置和缩放信息。然后,在渲染循环中,我们将模型矩阵应用到每个顶点上,这样物体的几何形状就会根据模型矩阵的变换在屏幕上被正确地绘制出来。 在DirectX 11的课程中,我们还将学习如何在世界空间、视图空间和投影空间之间正确地变换顶点。世界空间是物体的原始位置,视图空间是考虑到了摄像机(观察点)的位置和朝向之后的空间,而投影空间则是在应用了视锥体(View Frustum)裁剪等操作之后的空间。 通过学习DX11_Lesson_09_Transformations_zip.zip中提供的课程材料,我们能获得在现代3D图形渲染流程中对几何体进行变换的基本知识和技能,这是任何希望开发3D游戏或视觉效果的开发者必须掌握的基础技能。

相关推荐

filetype

#!/usr/bin/env python # -*- coding: utf-8 -*- import rospy import actionlib from actionlib_msgs.msg import * from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal from nav_msgs.msg import Path from geometry_msgs.msg import PoseWithCovarianceStamped from tf_conversions import transformations from math import pi class navigation_demo: def __init__(self): self.set_pose_pub = rospy.Publisher('/initialpose', PoseWithCovarianceStamped, queue_size=5) self.move_base = actionlib.SimpleActionClient("move_base", MoveBaseAction) self.move_base.wait_for_server(rospy.Duration(60)) def set_pose(self, p): if self.move_base is None: return False x, y, th = p pose = PoseWithCovarianceStamped() pose.header.stamp = rospy.Time.now() pose.header.frame_id = 'map' pose.pose.pose.position.x = x pose.pose.pose.position.y = y q = transformations.quaternion_from_euler(0.0, 0.0, th/180.0*pi) pose.pose.pose.orientation.x = q[0] pose.pose.pose.orientation.y = q[1] pose.pose.pose.orientation.z = q[2] pose.pose.pose.orientation.w = q[3] self.set_pose_pub.publish(pose) return True def _done_cb(self, status, result): rospy.loginfo("navigation done! status:%d result:%s"%(status, result)) def _active_cb(self): rospy.loginfo("[Navi] navigation has be actived") def _feedback_cb(self, feedback): rospy.loginfo("[Navi] navigation feedback\r\n%s"%feedback) def goto(self, p): goal = MoveBaseGoal() goal.target_pose.header.frame_id = 'map' goal.target_pose.header.stamp = rospy.Time.now() goal.target_pose.pose.position.x = p[0] goal.target_pose.pose.position.y = p[1] q = transformations.quaternion_from_euler(0.0, 0.0, p[2]/180.0*pi) goal.target_pose.pose.orientation.x = q[ 0] goal.target_pose.pose.orientation.y = q[1] goal.target_pose.pose.orientation.z = q[2] goal.target_pose.pose.orientation.w = q[3] self.move_base.send_goal(goal, self._done_cb, self._active_cb, self._feedback_cb) return True def cancel(self): self.move_base.cancel_all_goals() return True if __name__ == "__main__": rospy.init_node('navigation_move',anonymous=True) r = rospy.Rate(0.2) rospy.loginfo("set pose...") navi = navigation_demo() # navi.set_pose([0.0,0.0,0.0]) rospy.loginfo("goto goal...") navi.goto([0,0.0, 0])

filetype

下面的代码是干什么用的,请生成说明注释,: 【import rospy, math, tf from costmap_converter.msg import ObstacleArrayMsg, ObstacleMsg from geometry_msgs.msg import PolygonStamped, Point32, QuaternionStamped, Quaternion, TwistWithCovariance from tf.transformations import quaternion_from_euler def publish_obstacle_msg(): pub = rospy.Publisher('/test_optim_node/obstacles', ObstacleArrayMsg, queue_size=1) #pub = rospy.Publisher('/p3dx/move_base/TebLocalPlannerROS/obstacles', ObstacleArrayMsg, queue_size=1) rospy.init_node("test_obstacle_msg") y_0 = -3.0 vel_x = 0.0 vel_y = 0.3 range_y = 6.0 obstacle_msg = ObstacleArrayMsg() obstacle_msg.header.stamp = rospy.Time.now() obstacle_msg.header.frame_id = "map" # CHANGE HERE: odom/map # Add point obstacle obstacle_msg.obstacles.append(ObstacleMsg()) obstacle_msg.obstacles[0].id = 99 obstacle_msg.obstacles[0].polygon.points = [Point32()] obstacle_msg.obstacles[0].polygon.points[0].x = -1.5 obstacle_msg.obstacles[0].polygon.points[0].y = 0 obstacle_msg.obstacles[0].polygon.points[0].z = 0 yaw = math.atan2(vel_y, vel_x) q = tf.transformations.quaternion_from_euler(0,0,yaw) obstacle_msg.obstacles[0].orientation = Quaternion(*q) obstacle_msg.obstacles[0].velocities.twist.linear.x = vel_x obstacle_msg.obstacles[0].velocities.twist.linear.y = vel_y obstacle_msg.obstacles[0].velocities.twist.linear.z = 0 obstacle_msg.obstacles[0].velocities.twist.angular.x = 0 obstacle_msg.obstacles[0].velocities.twist.angular.y = 0 obstacle_msg.obstacles[0].velocities.twist.angular.z = 0 r = rospy.Rate(10) # 10hz t = 0.0 while not rospy.is_shutdown(): # Vary y component of the point obstacle if (vel_y >= 0): obstacle_msg.obstacles[0].polygon.points[0].y = y_0 + (vel_y*t)%range_y else: obstacle_msg.obstacles[0].polygon.points[0].y = y_0 + (vel_y*t)%range_y - range_y t = t + 0.1 pub.publish(obstacle_msg) r.sleep() if __name__ == '__main__': try: publish_obstacle_msg() except rospy.ROSInterruptException: pass】

filetype

#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import rospy import numpy as np from geometry_msgs.msg import WrenchStamped from nav_msgs.msg import Odometry class TutorialNMPCController: def __init__(self): super(TutorialNMPCController, self).__init__() # NMPC控制器参数初始化 self._prediction_horizon = 0 self._control_horizon = 0 self._dt = 0.0 # 权重矩阵 self._Q = np.zeros(shape=(6, 6)) # 状态误差权重 self._R = np.zeros(shape=(6, 6)) # 控制输入权重 self._P = np.zeros(shape=(6, 6)) # 终端状态权重 # 约束参数 self._u_min = np.zeros(shape=(6,)) # 控制输入下限 self._u_max = np.zeros(shape=(6,)) # 控制输入上限 self._x_min = np.zeros(shape=(6,)) # 状态下限 self._x_max = np.zeros(shape=(6,)) # 状态上限 # 其他参数 self._solver_options = {} # 求解器选项 self._warm_start = False # 是否使用热启动 # 控制器状态 self._prev_control = np.zeros(shape=(6,)) self._predicted_states = None self._predicted_controls = None self._solver_status = None self._solve_time = 0.0 self._warm_start_solution = None # 系统状态 self._is_init = False self.odom_is_init = False self.current_odom = None # ROS相关初始化 self._control_pub = rospy.Publisher('thruster_output', WrenchStamped, queue_size=10) self._odom_sub = rospy.Subscriber('odom', Odometry, self._odom_callback) # 从ROS参数服务器加载参数 self._load_parameters() # 验证关键参数 if self._control_horizon <= 0: rospy.logwarn(f"Invalid control_horizon: {self._control_horizon}, setting to default (5)") self._control_horizon = 5 self._is_init = True rospy.loginfo(f"NMPC Controller initialized with control horizon: {self._control_horizon}") rospy.loginfo("NMPC Controller initialized successfully") def _odom_callback(self, msg): """里程计消息回调函数""" self.current_odom = msg self.odom_is_init = True # 提取位置和姿态信息 self.current_position = np.array([ msg.pose.pose.position.x, msg.pose.pose.position.y, msg.pose.pose.position.z ]) # 四元数转欧拉角 (简化处理,实际应用需使用完整转换) self.current_orientation = np.array([0, 0, 0]) # 示例,需完善 rospy.logdebug("Received odometry message") def _load_parameters(self): """从ROS参数服务器加载NMPC控制器参数""" if rospy.has_param('~prediction_horizon'): self._prediction_horizon = rospy.get_param('~prediction_horizon') rospy.loginfo(f'Prediction Horizon = {self._prediction_horizon}') else: rospy.logwarn('Parameter prediction_horizon not found, using default') self._prediction_horizon = 10 if rospy.has_param('~control_horizon'): self._control_horizon = rospy.get_param('~control_horizon') rospy.loginfo(f'Control Horizon = {self._control_horizon}') else: rospy.logwarn('Parameter control_horizon not found, using default') self._control_horizon = 5 if rospy.has_param('~dt'): self._dt = rospy.get_param('~dt') rospy.loginfo(f'Sampling Time = {self._dt}') else: rospy.logwarn('Parameter dt not found, using default') self._dt = 0.1 # 加载权重矩阵 # ... 其他参数加载代码保持不变 ... def _reset_controller(self): """重置NMPC控制器状态""" # 重置逻辑保持不变... def update_controller(self): """更新NMPC控制器并发布控制指令""" if not self._is_init: rospy.logwarn("Controller not initialized, skipping update") return False if not self.odom_is_init: rospy.logwarn("Odometry not received yet, skipping update") return False # 获取当前状态和参考轨迹 current_state = self._get_current_state() reference_trajectory = self._get_reference_trajectory() # 构建NMPC优化问题 problem = self._construct_optimization_problem(current_state, reference_trajectory) # 求解优化问题 solution, status, solve_time = self._solve_optimization_problem(problem) # 检查求解状态 if status != 'optimal': rospy.logwarn(f"NMPC solver did not converge: {status}") if self._prev_control is not None: # 使用上一次控制输入作为备份 control_input = self._prev_control else: # 如果没有上一次控制输入,则使用零控制 control_input = np.zeros(shape=(6,)) else: # 提取最优控制输入 control_input = self._extract_control_input(solution) self._prev_control = control_input # 保存当前控制输入供下次使用 # 打印控制输入用于调试 rospy.loginfo(f"Control input: {control_input}") # 发布控制指令 self.publish_control_wrench(control_input) # 保存求解信息用于调试 self._solver_status = status self._solve_time = solve_time return True def _get_current_state(self): """获取当前系统状态""" if self.current_odom is None: rospy.logerr("No odometry data available") return np.zeros(6) # 从里程计消息中提取状态 [x, y, z, vx, vy, vz] state = np.array([ self.current_odom.pose.pose.position.x, self.current_odom.pose.pose.position.y, self.current_odom.pose.pose.position.z, self.current_odom.twist.twist.linear.x, self.current_odom.twist.twist.linear.y, self.current_odom.twist.twist.linear.z ]) return state def _get_reference_trajectory(self): """获取参考轨迹""" # 示例:生成一个简单的参考轨迹 (保持当前位置) current_state = self._get_current_state() ref_traj = np.tile(current_state, (self._prediction_horizon + 1, 1)) # 添加一个小的目标偏移 (测试用) ref_traj[:, 0] += 1.0 # x方向偏移1米 return ref_traj def _construct_optimization_problem(self, current_state, reference_trajectory): """构建NMPC优化问题 (需根据具体求解器实现)""" # 实际应用中需替换为真实优化问题构建逻辑 # 这里使用简化示例,返回一个模拟问题 return {"current_state": current_state, "reference": reference_trajectory} def _solve_optimization_problem(self, problem): """求解NMPC优化问题""" # 添加调试日志 rospy.loginfo(f"Solving optimization problem with control horizon: {self._control_horizon}") # 确保 control_horizon 至少为1 control_horizon = max(1, self._control_horizon) # 返回固定控制输入 (x方向100推力) solution = np.tile(np.array([100, 0, 0, 0, 0, 0]), (control_horizon, 1)) rospy.loginfo(f"Solution shape: {solution.shape}") status = 'optimal' solve_time = 0.01 return solution, status, solve_time def _extract_control_input(self, solution): """从优化解中提取控制输入""" if solution is None or solution.size == 0: rospy.logerr("Received empty solution, using default control") return np.array([100, 0, 0, 0, 0, 0]) # 返回默认控制 # 提取第一个控制输入 return solution[0] def publish_control_wrench(self, control_input): """发布控制指令""" msg = WrenchStamped() msg.header.stamp = rospy.Time.now() msg.header.frame_id = 'base_link' msg.wrench.force.x = control_input[0] msg.wrench.force.y = control_input[1] msg.wrench.force.z = control_input[2] msg.wrench.torque.x = control_input[3] msg.wrench.torque.y = control_input[4] msg.wrench.torque.z = control_input[5] self._control_pub.publish(msg) rospy.logdebug(f"Published control wrench: {control_input}") if __name__ == '__main__': print('Tutorial - NMPC Controller') rospy.init_node('tutorial_nmpc_controller') try: node = TutorialNMPCController() # 设置更新频率 rate = rospy.Rate(10000) # 10Hz while not rospy.is_shutdown(): node.update_controller() rate.sleep() except rospy.ROSInterruptException: print('caught exception') print('exiting') 学习

filetype

#!/usr/bin/env pythonimport rospyimport tfimport actionlibimport numpy as npfrom geometry_msgs.msg import PoseStamped, Pose, Point, Quaternionfrom nav_msgs.msg import Path, Odometryfrom move_base_msgs.msg import MoveBaseAction, MoveBaseGoalfrom std_msgs.msg import Boolfrom tf.transformations import quaternion_from_euler, euler_from_quaternionclass NavigationController: def __init__(self): rospy.init_node('navigation_controller') # 起点和终点坐标 self.start_pose = None self.end_pose = None # 获取初始位置 rospy.sleep(2) # 等待TF树建立 self.get_initial_pose() # 设置动作客户端 self.move_base_client = actionlib.SimpleActionClient('move_base', MoveBaseAction) rospy.loginfo("等待move_base服务器...") self.move_base_client.wait_for_server() rospy.loginfo("move_base服务器已连接") # 路径可视化发布器 self.path_pub = rospy.Publisher('/return_path', Path, queue_size=10) # 寻迹完成订阅 rospy.Subscriber('/tracking_finished', Bool, self.tracking_finished_cb) rospy.loginfo("导航控制器已就绪,等待寻迹完成信号...") def get_initial_pose(self): """获取小车初始位置作为起点""" try: tf_listener = tf.TransformListener() tf_listener.waitForTransform("map", "base_link", rospy.Time(), rospy.Duration(10.0)) (trans, rot) = tf_listener.lookupTransform("map", "base_link", rospy.Time(0)) self.start_pose = PoseStamped() self.start_pose.header.frame_id = "map" self.start_pose.pose.position = Point(*trans) self.start_pose.pose.orientation = Quaternion(*rot) rospy.loginfo(f"起点坐标已记录: ({trans[0]:.2f}, {trans[1]:.2f})") except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException) as e: rospy.logerr(f"获取初始位姿失败: {str(e)}") def tracking_finished_cb(self, msg): """寻迹完成回调函数""" if msg.data: rospy.loginfo("接收到寻迹完成信号,开始获取终点位置") self.get_end_position() rospy.loginfo("开始规划返航路径") self.navigate_to_start() def get_end_position(self): """获取寻迹终点位置""" try: tf_listener = tf.TransformListener() tf_listener.waitForTransform("map", "base_link", rospy.Time(), rospy.Duration(5.0)) (trans, rot) = tf_listener.lookupTransform("map", "base_link", rospy.Time(0)) self.end_pose = PoseStamped() self.end_pose.header.frame_id = "map" self.end_pose.pose.position = Point(*trans) self.end_pose.pose.orientation = Quaternion(*rot) rospy.loginfo(f"终点坐标已记录: ({trans[0]:.2f}, {trans[1]:.2f})") rospy.loginfo(f"返航距离: {np.linalg.norm(np.array(trans)-np.array([self.start_pose.pose.position.x, self.start_pose.pose.position.y])):.2f}m") except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException) as e: rospy.logerr(f"获取终点位姿失败: {str(e)}") def navigate_to_start(self): """执行返航导航""" if self.start_pose is None: rospy.logerr("错误: 未设置起点坐标") return # 设置目标点 goal = MoveBaseGoal() goal.target_pose = self.start_pose goal.target_pose.header.stamp = rospy.Time.now() # 订阅全局路径进行可视化 rospy.Subscriber('/move_base/NavfnROS/plan', Path, self.path_callback) # 发送目标 self.move_base_client.send_goal(goal, done_cb=self.navigation_done_cb) rospy.loginfo("已发送返航目标点") # 监控导航进度 rospy.Timer(rospy.Duration(1.0), self.check_navigation_progress) def path_callback(self, path_msg): """处理路径数据并在RViz显示""" rviz_path = Path() rviz_path.header = path_msg.header rviz_path.poses = path_msg.poses self.path_pub.publish(rviz_path) def check_navigation_progress(self, event): """监控导航进度""" if not self.move_base_client.get_result(): distance = np.linalg.norm(np.array([ rospy.wait_for_message('/odom', Odometry).pose.pose.position.x - self.start_pose.pose.position.x, rospy.wait_for_message('/odom', Odometry).pose.pose.position.y - self.start_pose.pose.position.y ])) rospy.loginfo(f"距离起点还有 {distance:.2f}m") def navigation_done_cb(self, status, result): """导航完成回调""" if status == actionlib.GoalStatus.SUCCEEDED: rospy.loginfo("已成功返航至起点!") self.stop_and_shutdown() else: rospy.logerr(f"返航失败,状态码: {status}") self.handle_navigation_failure() def handle_navigation_failure(self): """处理导航失败情况""" rospy.logwarn("尝试重新规划路径...") # 重新发送目标 goal = MoveBaseGoal() goal.target_pose = self.start_pose goal.target_pose.header.stamp = rospy.Time.now() self.move_base_client.send_goal(goal) def stop_and_shutdown(self): """停止小车并关闭节点""" rospy.loginfo("到达起点,停止小车") # 发布停止命令 cmd_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10) cmd = Twist() for _ in range(10): cmd_pub.publish(cmd) rospy.sleep(0.1) rospy.loginfo("导航完成,关闭节点") rospy.signal_shutdown("任务完成")if __name__ == '__main__': try: NavigationController() rospy.spin() except rospy.ROSInterruptException: rospy.loginfo("导航控制器已关闭")为什么每次小车运行到了寻迹终点就自动关闭了终端

filetype

#!/usr/bin/env python import rospy import tf import actionlib import numpy as np from geometry_msgs.msg import PoseStamped, Pose, Point, Quaternion from nav_msgs.msg import Path, Odometry from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal from std_msgs.msg import Bool from tf.transformations import quaternion_from_euler, euler_from_quaternion class NavigationController: def __init__(self): rospy.init_node('navigation_controller') # 起点和终点坐标 self.start_pose = None self.end_pose = None # 获取初始位置 rospy.sleep(2) # 等待TF树建立 self.get_initial_pose() # 设置动作客户端 self.move_base_client = actionlib.SimpleActionClient('move_base', MoveBaseAction) rospy.loginfo("等待move_base服务器...") self.move_base_client.wait_for_server() rospy.loginfo("move_base服务器已连接") # 路径可视化发布器 self.path_pub = rospy.Publisher('/return_path', Path, queue_size=10) # 寻迹完成订阅 rospy.Subscriber('/tracking_finished', Bool, self.tracking_finished_cb) rospy.loginfo("导航控制器已就绪,等待寻迹完成信号...") def get_initial_pose(self): """获取小车初始位置作为起点""" try: tf_listener = tf.TransformListener() tf_listener.waitForTransform("map", "base_link", rospy.Time(), rospy.Duration(10.0)) (trans, rot) = tf_listener.lookupTransform("map", "base_link", rospy.Time(0)) self.start_pose = PoseStamped() self.start_pose.header.frame_id = "map" self.start_pose.pose.position = Point(*trans) self.start_pose.pose.orientation = Quaternion(*rot) rospy.loginfo(f"起点坐标已记录: ({trans[0]:.2f}, {trans[1]:.2f})") except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException) as e: rospy.logerr(f"获取初始位姿失败: {str(e)}") def tracking_finished_cb(self, msg): """寻迹完成回调函数""" if msg.data: rospy.loginfo("接收到寻迹完成信号,开始获取终点位置") self.get_end_position() rospy.loginfo("开始规划返航路径") self.navigate_to_start() def get_end_position(self): """获取寻迹终点位置""" try: tf_listener = tf.TransformListener() tf_listener.waitForTransform("map", "base_link", rospy.Time(), rospy.Duration(5.0)) (trans, rot) = tf_listener.lookupTransform("map", "base_link", rospy.Time(0)) self.end_pose = PoseStamped() self.end_pose.header.frame_id = "map" self.end_pose.pose.position = Point(*trans) self.end_pose.pose.orientation = Quaternion(*rot) rospy.loginfo(f"终点坐标已记录: ({trans[0]:.2f}, {trans[1]:.2f})") rospy.loginfo(f"返航距离: {np.linalg.norm(np.array(trans)-np.array([self.start_pose.pose.position.x, self.start_pose.pose.position.y])):.2f}m") except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException) as e: rospy.logerr(f"获取终点位姿失败: {str(e)}") def navigate_to_start(self): """执行返航导航""" if self.start_pose is None: rospy.logerr("错误: 未设置起点坐标") return # 设置目标点 goal = MoveBaseGoal() goal.target_pose = self.start_pose goal.target_pose.header.stamp = rospy.Time.now() # 订阅全局路径进行可视化 rospy.Subscriber('/move_base/NavfnROS/plan', Path, self.path_callback) # 发送目标 self.move_base_client.send_goal(goal, done_cb=self.navigation_done_cb) rospy.loginfo("已发送返航目标点") # 监控导航进度 rospy.Timer(rospy.Duration(1.0), self.check_navigation_progress) def path_callback(self, path_msg): """处理路径数据并在RViz显示""" rviz_path = Path() rviz_path.header = path_msg.header rviz_path.poses = path_msg.poses self.path_pub.publish(rviz_path) def check_navigation_progress(self, event): """监控导航进度""" if not self.move_base_client.get_result(): distance = np.linalg.norm(np.array([ rospy.wait_for_message('/odom', Odometry).pose.pose.position.x - self.start_pose.pose.position.x, rospy.wait_for_message('/odom', Odometry).pose.pose.position.y - self.start_pose.pose.position.y ])) rospy.loginfo(f"距离起点还有 {distance:.2f}m") def navigation_done_cb(self, status, result): """导航完成回调""" if status == actionlib.GoalStatus.SUCCEEDED: rospy.loginfo("已成功返航至起点!") self.stop_and_shutdown() else: rospy.logerr(f"返航失败,状态码: {status}") self.handle_navigation_failure() def handle_navigation_failure(self): """处理导航失败情况""" rospy.logwarn("尝试重新规划路径...") # 重新发送目标 goal = MoveBaseGoal() goal.target_pose = self.start_pose goal.target_pose.header.stamp = rospy.Time.now() self.move_base_client.send_goal(goal) def stop_and_shutdown(self): """停止小车并关闭节点""" rospy.loginfo("到达起点,停止小车") # 发布停止命令 cmd_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10) cmd = Twist() for _ in range(10): cmd_pub.publish(cmd) rospy.sleep(0.1) rospy.loginfo("导航完成,关闭节点") rospy.signal_shutdown("任务完成") if __name__ == '__main__': try: NavigationController() rospy.spin() except rospy.ROSInterruptException: rospy.loginfo("导航控制器已关闭")这个代码实现什么功能

filetype

import os import random import tkinter as tk from tkinter import filedialog, messagebox, ttk import shutil import hashlib import time import pefile import os import sys class ExeProtectorApp: def __init__(self, root): self.root = root self.root.title("EXE文件保护工具 v4.1") self.root.geometry("750x680") self.root.resizable(True, True) # 设置中文字体 self.style = ttk.Style() self.style.configure("TLabel", font=("SimHei", 10)) self.style.configure("TButton", font=("SimHei", 10)) self.style.configure("TProgressbar", thickness=20) # 创建主框架 self.main_frame = ttk.Frame(root, padding="20") self.main_frame.pack(fill=tk.BOTH, expand=True) # 文件选择部分 ttk.Label(self.main_frame, text="选择EXE文件:").grid(row=0, column=0, sticky=tk.W, pady=5) self.file_path_var = tk.StringVar() ttk.Entry(self.main_frame, textvariable=self.file_path_var, width=50).grid(row=0, column=1, padx=5, pady=5) ttk.Button(self.main_frame, text="浏览...", command=self.browse_file).grid(row=0, column=2, padx=5, pady=5) # 输出目录选择 ttk.Label(self.main_frame, text="输出目录:").grid(row=1, column=0, sticky=tk.W, pady=5) self.output_dir_var = tk.StringVar() ttk.Entry(self.main_frame, textvariable=self.output_dir_var, width=50).grid(row=1, column=1, padx=5, pady=5) ttk.Button(self.main_frame, text="浏览...", command=self.browse_output_dir).grid(row=1, column=2, padx=5, pady=5) # 选项设置 options_frame = ttk.LabelFrame(self.main_frame, text="选项", padding="10") options_frame.grid(row=2, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=10) ttk.Label(options_frame, text="随机字节增加范围 (KB):").grid(row=0, column=0, sticky=tk.W, pady=5) self.min_size_var = tk.IntVar(value=100) ttk.Entry(options_frame, textvariable=self.min_size_var, width=10).grid(row=0, column=1, padx=5, pady=5) ttk.Label(options_frame, text="至").grid(row=0, column=2, padx=5, pady=5) self.max_size_var = tk.IntVar(value=500) ttk.Entry(options_frame, textvariable=self.max_size_var, width=10).grid(row=0, column=3, padx=5, pady=5) ttk.Label(options_frame, text="随机性强度:").grid(row=0, column=4, sticky=tk.W, pady=5) self.random_strength = tk.StringVar(value="中") strength_options = ttk.Combobox(options_frame, textvariable=self.random_strength, state="readonly", width=12) strength_options['values'] = ("低", "中", "高") strength_options.grid(row=0, column=5, padx=5, pady=5) ttk.Label(options_frame, text="模拟程序类型:").grid(row=1, column=0, sticky=tk.W, pady=5) self.app_type = tk.StringVar(value="通用程序") app_types = ttk.Combobox(options_frame, textvariable=self.app_type, state="readonly", width=15) app_types['values'] = ("通用程序", "游戏程序", "办公软件", "系统工具", "开发工具") app_types.grid(row=1, column=1, padx=5, pady=5) self.process_method = tk.StringVar(value="standard") ttk.Radiobutton(options_frame, text="标准保护", variable=self.process_method, value="standard").grid(row=1, column=2, sticky=tk.W, pady=5) ttk.Radiobutton(options_frame, text="高级保护", variable=self.process_method, value="advanced").grid(row=1, column=3, sticky=tk.W, pady=5) # 高级选项 advanced_frame = ttk.LabelFrame(self.main_frame, text="保护选项", padding="10") advanced_frame.grid(row=3, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=10) self.obfuscate_resources = tk.BooleanVar(value=True) ttk.Checkbutton(advanced_frame, text="混淆资源文件", variable=self.obfuscate_resources).grid(row=0, column=0, sticky=tk.W, pady=5) self.encrypt_sections = tk.BooleanVar(value=True) ttk.Checkbutton(advanced_frame, text="轻度代码变换", variable=self.encrypt_sections).grid(row=0, column=1, sticky=tk.W, pady=5) self.add_dummy_sections = tk.BooleanVar(value=True) ttk.Checkbutton(advanced_frame, text="添加随机数据块", variable=self.add_dummy_sections).grid(row=1, column=0, sticky=tk.W, pady=5) self.randomize_imports = tk.BooleanVar(value=True) ttk.Checkbutton(advanced_frame, text="随机化导入表顺序", variable=self.randomize_imports).grid(row=1, column=1, sticky=tk.W, pady=5) # 处理按钮 ttk.Button(self.main_frame, text="保护文件", command=self.process_file).grid(row=4, column=0, columnspan=3, pady=20) # 状态和进度条 self.status_var = tk.StringVar(value="就绪") ttk.Label(self.main_frame, textvariable=self.status_var).grid(row=5, column=0, columnspan=2, sticky=tk.W, pady=5) self.progress_var = tk.DoubleVar(value=0) self.progress_bar = ttk.Progressbar(self.main_frame, variable=self.progress_var, length=100) self.progress_bar.grid(row=5, column=2, sticky=(tk.W, tk.E), pady=5) # 默认输出目录 self.output_dir_var.set(os.path.join(os.getcwd(), "protected_exes")) # 绑定窗口关闭事件 self.root.protocol("WM_DELETE_WINDOW", self.on_closing) # 初始化随机种子 self.initialize_random_seed() def initialize_random_seed(self): seed_material = ( time.time_ns().to_bytes(8, 'big') + os.getpid().to_bytes(4, 'big') + os.urandom(16) ) seed = int.from_bytes(hashlib.sha256(seed_material).digest(), 'big') random.seed(seed) def browse_file(self): file_path = filedialog.askopenfilename(filetypes=[("可执行文件", "*.exe"), ("所有文件", "*.*")]) if file_path: self.file_path_var.set(file_path) def browse_output_dir(self): dir_path = filedialog.askdirectory() if dir_path: self.output_dir_var.set(dir_path) def process_file(self): exe_path = self.file_path_var.get() output_dir = self.output_dir_var.get() if not exe_path: messagebox.showerror("错误", "请选择一个EXE文件") return if not os.path.exists(exe_path): messagebox.showerror("错误", "选择的文件不存在") return if not output_dir: messagebox.showerror("错误", "请选择输出目录") return if not os.path.exists(output_dir): try: os.makedirs(output_dir) except: messagebox.showerror("错误", "无法创建输出目录") return file_name, file_ext = os.path.splitext(os.path.basename(exe_path)) random_suffix = hashlib.md5(str(time.time_ns()).encode()).hexdigest()[:8] output_path = os.path.join(output_dir, f"{file_name}_protected_{random_suffix}{file_ext}") try: self.status_var.set("正在处理文件...") self.progress_var.set(0) self.root.update() min_size = self.min_size_var.get() max_size = self.max_size_var.get() if min_size < 0 or max_size < 0 or min_size > max_size: messagebox.showerror("错误", "请设置有效的字节增加范围") return strength_factor = 1.0 if self.random_strength.get() == "高": strength_factor = 1.5 elif self.random_strength.get() == "低": strength_factor = 0.5 adjusted_min = int(min_size * strength_factor) adjusted_max = int(max_size * strength_factor) random_size_kb = random.randint(adjusted_min, adjusted_max) random_size_bytes = random_size_kb * 1024 shutil.copy2(exe_path, output_path) original_hash = self.calculate_file_hash(exe_path) self.progress_var.set(5) self.root.update() if self.process_method.get() == "standard": self.standard_protection(output_path, random_size_bytes) else: self.advanced_protection(output_path, random_size_bytes) modified_hash = self.calculate_file_hash(output_path) self.progress_var.set(95) self.root.update() if self.verify_exe_file(output_path): self.status_var.set("文件处理完成") self.progress_var.set(100) messagebox.showinfo( "成功", f"文件保护成功!\n" f"原始文件大小: {os.path.getsize(exe_path) // 1024} KB\n" f"处理后文件大小: {os.path.getsize(output_path) // 1024} KB\n" f"增加了: {random_size_kb} KB\n\n" f"原始文件哈希 (MD5): {original_hash}\n" f"处理后文件哈希 (MD5): {modified_hash}\n\n" f"文件已保存至: {output_path}" ) else: self.status_var.set("文件验证失败") self.progress_var.set(100) messagebox.showwarning("警告", "处理后的文件可能需要在特定环境运行") except Exception as e: self.status_var.set("处理过程中出错") messagebox.showerror("错误", f"处理文件时出错: {str(e)}") finally: self.progress_var.set(0) self.initialize_random_seed() def calculate_file_hash(self, file_path): hash_md5 = hashlib.md5() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) return hash_md5.hexdigest() def standard_protection(self, file_path, additional_bytes): try: pe = pefile.PE(file_path) section_count = random.randint(1, 3) for _ in range(section_count): new_section = pefile.SectionStructure(pe.__IMAGE_SECTION_HEADER_format__) new_section.Name = self.generate_sane_section_name() section_size = random.randint(0x1000, 0x8000) new_section.Misc_VirtualSize = section_size base_virtual_address = (pe.sections[-1].VirtualAddress + pe.sections[-1].Misc_VirtualSize + 0x1000 - 1) & ~0xFFF new_section.VirtualAddress = base_virtual_address + random.randint(0, 0x1000) base_raw_data = (pe.sections[-1].PointerToRawData + pe.sections[-1].SizeOfRawData + 0x1000 - 1) & ~0xFFF new_section.PointerToRawData = base_raw_data + random.randint(0, 0x1000) new_section.SizeOfRawData = section_size section_flags = [0xC0000040, 0x40000040, 0x20000040, 0x80000040, 0x00000040, 0xE0000040] new_section.Characteristics = random.choice(section_flags) app_type = self.app_type.get() new_data = self.generate_application_specific_data(section_size, app_type) pe.set_bytes_at_offset(new_section.PointerToRawData, new_data) pe.sections.append(new_section) pe.FILE_HEADER.NumberOfSections += 1 pe.OPTIONAL_HEADER.SizeOfImage = (new_section.VirtualAddress + new_section.Misc_VirtualSize + 0x1000 - 1) & ~0xFFF if self.encrypt_sections.get(): self.apply_mild_code_transformations(pe) if self.randomize_imports.get() and hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'): random.shuffle(pe.DIRECTORY_ENTRY_IMPORT) self.safe_modify_exe_file(file_path, additional_bytes) pe.FILE_HEADER.TimeDateStamp = int(time.time()) + random.randint(-3600, 3600) pe.write(file_path) pe.close() except Exception as e: print(f"标准保护执行: {e}") self.safe_modify_exe_file(file_path, additional_bytes) def advanced_protection(self, file_path, additional_bytes): try: pe = pefile.PE(file_path) section_count = random.randint(2, 4) for _ in range(section_count): new_section = pefile.SectionStructure(pe.__IMAGE_SECTION_HEADER_format__) new_section.Name = self.generate_sane_section_name() section_size = random.randint(0x1000, 0x10000) new_section.Misc_VirtualSize = section_size base_virtual_address = (pe.sections[-1].VirtualAddress + pe.sections[-1].Misc_VirtualSize + 0x1000 - 1) & ~0xFFF new_section.VirtualAddress = base_virtual_address + random.randint(0, 0x2000) base_raw_data = (pe.sections[-1].PointerToRawData + pe.sections[-1].SizeOfRawData + 0x1000 - 1) & ~0xFFF new_section.PointerToRawData = base_raw_data + random.randint(0, 0x2000) new_section.SizeOfRawData = section_size section_flags = [0xC0000040, 0x40000040, 0x20000040, 0x80000040, 0x00000040, 0xE0000040, 0x00000080, 0x40000080] new_section.Characteristics = random.choice(section_flags) app_type = self.app_type.get() new_data = self.generate_application_specific_data(section_size, app_type) pe.set_bytes_at_offset(new_section.PointerToRawData, new_data) pe.sections.append(new_section) pe.FILE_HEADER.NumberOfSections += 1 pe.OPTIONAL_HEADER.SizeOfImage = (new_section.VirtualAddress + new_section.Misc_VirtualSize + 0x1000 - 1) & ~0xFFF if self.encrypt_sections.get(): self.apply_mild_code_transformations(pe) if self.obfuscate_resources.get() and hasattr(pe, 'DIRECTORY_ENTRY_RESOURCE'): self.obfuscate_pe_resources(pe) if self.randomize_imports.get() and hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'): for _ in range(random.randint(1, 3)): random.shuffle(pe.DIRECTORY_ENTRY_IMPORT) if self.add_dummy_sections.get(): dummy_size = random.randint(additional_bytes // 2, additional_bytes) self.safe_modify_exe_file(file_path, dummy_size) additional_bytes -= dummy_size self.safe_modify_exe_file(file_path, additional_bytes) pe.FILE_HEADER.TimeDateStamp = int(time.time()) + random.randint(-86400, 86400) pe.write(file_path) pe.close() except Exception as e: print(f"高级保护执行: {e}") self.standard_protection(file_path, additional_bytes) def safe_modify_exe_file(self, file_path, additional_bytes): with open(file_path, 'ab') as f: app_type = self.app_type.get() data = self.generate_application_specific_data(additional_bytes, app_type) f.write(data) def generate_application_specific_data(self, size, app_type): data = bytearray() type_templates = { "通用程序": [ b"C:\\Program Files\\Common Files\\\x00", b"HKLM\\Software\\Microsoft\\Windows\\\x00", b"ERROR_ACCESS_DENIED\x00", b"SUCCESS\x00", b"CONFIG_FILE\x00", b"LOG_FILE\x00", (0x00000001).to_bytes(4, 'little'), (0x00000100).to_bytes(4, 'little'), (0x00010000).to_bytes(4, 'little'), ], "游戏程序": [ b"C:\\Program Files\\Game\\Data\\\x00", b"C:\\Users\\Public\\Documents\\GameSaves\\\x00", b"TEXTURE_", b"MODEL_", b"SOUND_", b"LEVEL_", b"SCORE_", b"PLAYER_", b"ENEMY_", b"WEAPON_", (0x000F4240).to_bytes(4, 'little'), (0x000003E8).to_bytes(4, 'little'), ], "办公软件": [ b"C:\\Users\\%USERNAME%\\Documents\\\x00", b"File Format: DOCX\x00", b"File Format: XLSX\x00", b"Page ", b"Sheet ", b"Table ", b"Font ", b"Style ", b"Paragraph ", b"Header", b"Footer", (0x0000000A).to_bytes(4, 'little'), (0x00000014).to_bytes(4, 'little'), ], "系统工具": [ b"C:\\Windows\\System32\\\x00", b"C:\\Windows\\SysWOW64\\\x00", b"HKLM\\SYSTEM\\CurrentControlSet\\\x00", b"Driver ", b"Service ", b"Device ", b"Registry ", b"Process ", b"Thread ", (0x00000001).to_bytes(4, 'little'), (0x00000000).to_bytes(4, 'little'), (0xFFFFFFFF).to_bytes(4, 'little'), ], "开发工具": [ b"C:\\Program Files\\Developer\\SDK\\\x00", b"C:\\Users\\%USERNAME%\\Source\\\x00", b"Compiler ", b"Linker ", b"Debugger ", b"Library ", b"Include ", b"Namespace ", b"Class ", b"Function ", b"Variable ", (0x00000000).to_bytes(4, 'little'), (0x00000001).to_bytes(4, 'little'), (0x00000002).to_bytes(4, 'little'), ] } templates = type_templates.get(app_type, type_templates["通用程序"]) template_usage = 0.7 if self.random_strength.get() == "高": template_usage = 0.5 elif self.random_strength.get() == "低": template_usage = 0.9 while len(data) < size: if random.random() < template_usage: data.extend(random.choice(templates)) else: random_len = random.randint(1, 64) data.extend(os.urandom(random_len)) if random.random() < 0.3: data.extend(b'\x00' * random.randint(1, 8)) elif random.random() < 0.2: data.extend(b' ' * random.randint(1, 16)) return data[:size] def generate_sane_section_name(self): base_names = [ b'.data', b'.rdata', b'.text', b'.rsrc', b'.reloc', b'.bss', b'.edata', b'.idata', b'.pdata', b'.tls', b'.data1', b'.rdata2', b'.text1', b'.rsrc1', b'.data_', b'.rdata_', b'.text_', b'.rsrc_', b'.init', b'.fini', b'.ctors', b'.dtors', b'.gnu', b'.note', b'.eh_frame', b'.debug' ] name = random.choice(base_names) if random.random() < 0.7: if random.random() < 0.5: suffix = str(random.randint(10, 99)).encode() else: suffix = bytes(random.choice('abcdefghijklmnopqrstuvwxyz') for _ in range(2)) name = name[:8-len(suffix)] + suffix return name.ljust(8, b'\x00')[:8] def apply_mild_code_transformations(self, pe): text_section = None for section in pe.sections: if b'.text' in section.Name: text_section = section break if text_section: data = pe.get_data(text_section.VirtualAddress, text_section.SizeOfRawData) if not isinstance(data, bytes): data = bytes(data) data_list = list(data) transform_count = len(data_list) // 200 if self.random_strength.get() == "高": transform_count = len(data_list) // 100 elif self.random_strength.get() == "低": transform_count = len(data_list) // 400 transform_count = min(100, transform_count) for _ in range(transform_count): i = random.randint(0, len(data_list) - 1) transform_type = random.choice([0, 1, 2, 3, 4]) if transform_type == 0: data_list[i] = (data_list[i] + 1) % 256 elif transform_type == 1: data_list[i] = (data_list[i] - 1) % 256 elif transform_type == 2: data_list[i] ^= 0xFF elif transform_type == 3: data_list[i] = (data_list[i] << 1) % 256 else: data_list[i] = (data_list[i] >> 1) % 256 pe.set_bytes_at_offset(text_section.PointerToRawData, bytes(data_list)) def obfuscate_pe_resources(self, pe): try: for resource_type in pe.DIRECTORY_ENTRY_RESOURCE.entries: if hasattr(resource_type, 'directory'): for resource_id in resource_type.directory.entries: if hasattr(resource_id, 'directory'): for resource_lang in resource_id.directory.entries: data_rva = resource_lang.data.struct.OffsetToData size = resource_lang.data.struct.Size resource_data = list(pe.get_data(data_rva, size)) step_size = 200 if self.random_strength.get() == "高": step_size = 100 elif self.random_strength.get() == "低": step_size = 400 for i in range(0, len(resource_data), random.randint(step_size-50, step_size+50)): if i < len(resource_data): if random.random() < 0.3: resource_data[i] = (resource_data[i] + random.randint(1, 5)) % 256 elif random.random() < 0.6: resource_data[i] = (resource_data[i] - random.randint(1, 5)) % 256 else: resource_data[i] ^= random.randint(1, 255) pe.set_bytes_at_offset(data_rva, bytes(resource_data)) except Exception as e: print(f"资源混淆错误: {e}") def verify_exe_file(self, file_path): try: pe = pefile.PE(file_path) pe.close() return True except: return False def on_closing(self): if messagebox.askokcancel("退出", "确定要退出程序吗?"): self.root.destroy() if __name__ == "__main__": root = tk.Tk() app = ExeProtectorApp(root) root.mainloop() 用户希望输出的程序每次都会有静态特征变化 绕过QVM 将代码段熵值降到最低 在此基础上修改

szmtjs10
  • 粉丝: 575
上传资源 快速赚钱