活动介绍
file-type

维也纳技术大学EVRPTW课程:电动车辆路线优化

ZIP文件

下载需积分: 50 | 136KB | 更新于2025-08-12 | 143 浏览量 | 7 下载量 举报 1 收藏
download 立即下载
标题中提到的“electric_vehicle_routing_problem_with_time_windows”指的是一种特定类型的优化问题,即带有时间窗口的电动车辆路径问题(EVRPTW,Electric Vehicle Routing Problem with Time Windows)。这个问题是运筹学中车辆路径问题(Vehicle Routing Problem, VRP)的一个分支,专门针对电动车辆的物流和运输中特有的限制条件进行研究。与传统车辆路径问题相比,EVRPTW不仅需要考虑路线最优化,还要考虑电动车辆特有的能量消耗和充电需求,以及配送过程中可能遇到的时间窗口限制。 描述部分详细阐述了解决EVRPTW问题的一种伪代码启发式方法,这是设计智能算法以快速找到问题可行解的一种技术。启发式方法往往不保证找到最优解,但在实际应用中能够以合理的时间找到足够好的解。从描述中我们可以提炼出以下知识点: 1. 客户地图构建:使用距离保持器(如getInterCustomerDistances函数)获取客户间的距离,构建客户地图。这一步骤是为了后续的路线规划提供基础数据。 2. 路线构建:算法遍历每个客户,并基于某些策略将客户分配到不同的路线中。这里可能用到了贪心算法或最近邻居算法等启发式策略。 3. 考虑时间窗口:在规划路线时,需要考虑每个客户的时间窗口,即客户接受服务的时间范围。这是物流配送中的一个常见约束,以确保配送准时。 4. 电动车辆特性:由于是电动车辆,因此需要考虑每个客户点服务后的剩余电量。这涉及到电动车辆的能耗模型,需要在路线规划中实时计算并确保车辆能够到达下一个客户点或充电站。 5. 充电策略:在电量不足时,算法需要判断是否需要中途充电,并计算充电引起的时间偏移。这包括了寻找最近充电站的位置以及充电所需时间的估算。 6. 时间管理:算法需要实时跟踪当前时间,以及由于充电等因素造成的时间变化。这个步骤确保了时间窗口约束在路线规划中得到满足。 7. 优化目标:整个启发式过程的目标是优化运输和物流业的路线规划,尽量减少总行驶距离或总成本,同时满足所有的时间窗口和能量限制。 最后,提到的标签“Java”意味着这份文档或代码是使用Java编程语言来实现上述启发式算法的。而文件名“electric_vehicle_routing_problem_with_time_windows-master”表明这可能是一个包含有主代码库的项目或仓库,用于管理和发布与EVRPTW相关的软件代码和资源。 综上所述,这段文件信息为我们提供了EVRPTW问题的基本概念,解决EVRPTW问题的启发式方法,以及如何使用Java语言实现相关算法。这对于想要深入研究或开发相关物流配送优化软件的开发者来说,是一份宝贵的知识材料。

相关推荐

filetype

from ortools.constraint_solver import routing_enums_pb2 from ortools.constraint_solver import pywrapcp import numpy as np import datetime import math # ================== 数据结构定义 ================== class EnhancedVehicle: def __init__(self, vid, capacity, is_own, start_node, partitions): self.vid = vid # 车辆ID self.capacity = capacity # 车辆容量 self.is_own = is_own # 是否自有车 self.start_node = start_node # 出发节点 self.partitions = partitions # 允许访问分区 class EnhancedCustomer: def __init__(self, demand, time_window, partition, is_overdue): self.demand = demand # 需求重量 self.time_window = time_window # 时间窗(分钟) self.partition = partition # 所属分区 self.is_overdue = is_overdue # 是否超时订单 # ================== 数据生成器 ================== def create_enhanced_data_model(orders, vehicles, distance_matrix, time_matrix, fixed_cross_time, current_time, convertDistanceTime): """生成增强的测试数据集""" data = {} np.random.seed(42) # 基本参数 data['depot'] = 0 # 分拣中心节点 # current_time = datetime.datetime.now() data['base_time'] = current_time data['cross_time'] = fixed_cross_time # 允许跨区时间(分钟) # 生成200个客户订单(包含20%超时订单) data['customers'] = orders data['lenCustomers'] = len(data['customers']) for i in range(len(data['customers'])): if data['customers'][i].isOutTime: data['customers'][i].startTW = 0 data['customers'][i].endTW = 3600 * 48 # start = np.random.randint(0, 480) # end = start + np.random.randint(60, 240) # is_overdue = np.random.rand() < 0.2 # 20%超时 # data['customers'].append( # EnhancedCustomer( # demand=np.random.randint(1, 5), # time_window=(start, end) if not is_overdue else (0, 1440), # partition=np.random.randint(0, 10), # is_overdue=is_overdue # ) # ) # 生成15辆车(自有车10辆,外请车5辆) data['vehicles'] = vehicles data['lenVehicles'] = len(data['vehicles']) for vid in range(len(data['vehicles'])): data['vehicles'][vid].start_node = vid + 1 # 每辆车有独立出发位置 # data['vehicles'].append( # EnhancedVehicle( # vid=vid, # capacity=np.random.randint(20, 50) if vid < 10 else np.random.randint(15, 40), # is_own=(vid < 10), # start_node=start_node, # partitions=[vid % 10] + ([] if vid < 10 else [(vid % 10 + 1) % 10]) # ) # ) # 构建距离矩阵(包含分拣中心+车辆出发位置+客户位置) # node_count = 1 + len(data['vehicles']) + len(data['customers']) # 分拣中心(0) + 车辆起点(1-15) + 客户(16-215) data['distance_matrix'] = distance_matrix #np.random.randint(5, 100, (node_count, node_count)) data['time_matrix'] = time_matrix # np.fill_diagonal(data['distance_matrix'], 0) data['convertDistanceTime'] = convertDistanceTime data['customer_start_idx'] = 1 + len(data['vehicles']) #更改字段名称 for i in range(len(data['vehicles'])): data['vehicles'][i].vid = i for item in data['vehicles']: item.capacity = item.residualVolumeMax all_allowed_zones = [] for item in data['vehicles']: all_allowed_zones += item.areaIds all_allowed_zones = list(set(all_allowed_zones)) for item in data['vehicles']: # item.allowed_zones = item.areaIds if item.areaIds == []: item.allowed_zones = all_allowed_zones else: item.allowed_zones = item.areaIds item.allowed_zones = [int(item1) for item1 in item.allowed_zones] for item in data['customers']: item.zone = int(item.areaId) return data # ================== 核心约束实现 ================== def add_partition_constraints(routing, manager, data, time_dimension, fixed_cross_time): """实现动态分区访问控制""" solver = routing.solver() # 1. 创建分区状态维度 def zone_callback(from_index): from_node = manager.IndexToNode(from_index) # if from_node == data['depot']: # return -2 # 分拣中心特殊标记 if from_node < data['customer_start_idx']: return -1 # 车辆起点特殊标记 customer_idx = from_node - data['customer_start_idx'] return data['customers'][customer_idx].zone zone_callback_idx = routing.RegisterUnaryTransitCallback(zone_callback) routing.AddDimension( zone_callback_idx, 0, # slack_max 1000, # 分区容量上限 False, # start_cumul_to_zero 'Zone' ) zone_dim = routing.GetDimensionOrDie('Zone') # 2. 为每辆车添加动态分区约束 for vehicle in data['vehicles']: vehicle_id = vehicle.vid allowed_zones = set(vehicle.allowed_zones) cross_time = int(fixed_cross_time) #vehicle.cross_time # 动态分区检查回调 def zone_check_callback(from_index, to_index): nonlocal allowed_zones, cross_time to_node = manager.IndexToNode(to_index) # 允许返回分拣中心 if to_node == data['depot']: return True # 非客户节点直接允许 if to_node < data['customer_start_idx']: return True # 获取目标分区 zone = zone_callback(to_index) # 获取到达时间 arrival_time_var = time_dimension.CumulVar(to_index) min_arrival = int(solver.Min(arrival_time_var)) # 判断是否达到切换阈值 if min_arrival < cross_time: # 检查当前是否已有访问分区 start_index = routing.Start(vehicle_id) first_node_index = routing.NextVar(start_index) # 获取首个分区变量 first_zone_var = zone_dim.CumulVar(first_node_index) # 添加约束:时间阈值前必须与首个分区一致 solver.Add( solver.IfThen( zone_dim.CumulVar(to_index) != -1, zone_dim.CumulVar(to_index) == first_zone_var ) ) # 添加约束:首个分区必须在允许分区内 solver.Add( solver.IfThen( first_zone_var != -1, solver.Member(first_zone_var, list(allowed_zones)) ) ) else: # 添加约束:时间阈值后必须在允许分区内 solver.Add( solver.Member(zone_dim.CumulVar(to_index), list(allowed_zones)) ) # # # 添加约束:时间阈值前必须与首个分区一致 # # if solver.Value(zone_dim.CumulVar(to_index)) != -1: # # solver.Add(zone_dim.CumulVar(to_index) == first_zone_var) # # # # # 添加约束:首个分区必须在允许分区内 # # if solver.Value(first_zone_var) != -1: # # solver.Add(solver.Member(first_zone_var, list(allowed_zones))) # # else: # # # 添加约束:时间阈值后必须在允许分区内 # # if solver.Value(zone_dim.CumulVar(to_index)) != -1: # # solver.Add(solver.Member(zone_dim.CumulVar(to_index), list(allowed_zones))) # # return True # 注册转移回调 zone_check_idx = routing.RegisterTransitCallback(zone_check_callback) # # 3. 添加软约束确保路径可行性 # penalty = 1000000 # 大惩罚值 # routing.AddDisjunction( # [manager.NodeToIndex(i) for i in range(manager.GetNumberOfNodes())], # penalty # ) return routing # for vehicle in data['vehicles']: # vehicle_id = vehicle.vid # allowed_zones = vehicle.allowed_zones # cross_time = fixed_cross_time #vehicle.cross_time # # def zone_check(from_index, to_index): # from_node = manager.IndexToNode(from_index) # to_node = manager.IndexToNode(to_index) # # # 允许返回分拣中心 # if to_node == data['depot']: # return True # # # 客户节点处理 # if to_node >= data['customer_start_idx']: # customer = data['customers'][to_node - data['customer_start_idx']] # to_zone = customer.zone # else: # return True # 车辆起始点 # # # 获取当前时间 # time_var = time_dimension.CumulVar(to_index) # current_time = routing.solver().Min(time_var) # # # 判断时间阶段 # if current_time < cross_time: # # 检查路径中已访问的分区 # path_zones = set() # index = routing.Start(vehicle_id) # while not routing.IsEnd(index): # node = manager.IndexToNode(index) # if node >= data['customer_start_idx']: # zone = data['customers'][node - data['customer_start_idx']].zone # path_zones.add(zone) # index = routing.NextVar(index) #solution.Value(routing.NextVar(index)) # # # 允许访问的情况 # return (len(path_zones) == 0 and to_zone in allowed_zones) or \ # (len(path_zones) > 0 and to_zone in path_zones) # else: # return to_zone in allowed_zones # # next_var = routing.NextVar(routing.Start(vehicle_id)) # # 添加约束到车辆 # routing.solver().AddConstraint( # routing.solver().CheckAssignment(next_var, zone_check)) # ================== 主求解逻辑 ================== def optimized_vrp_solver(orders, vehicles, distance_matrix, time_matrix, fixed_cross_time, current_time, convertDistanceTime): data = create_enhanced_data_model(orders, vehicles, distance_matrix, time_matrix, fixed_cross_time, current_time, convertDistanceTime) # a = len(data['distance_matrix']) # data = {} # data['distance_matrix'] = [[0,1,2,3,4,5,6,7,8,9,10], # [1,0,2,3,4,5,6,7,8,9,10], # [2,1,0,3,4,5,6,7,8,9,10], # [3,1,2,0,4,5,6,7,8,9,10], # [4,1,2,3,0,5,6,7,8,9,10], # [5,1,2,3,4,0,6,7,8,9,10], # [6,1,2,3,4,5,0,7,8,9,10], # [7,1,2,3,4,5,6,0,8,9,10], # [8,1,2,3,4,5,6,7,0,9,10], # [9,1,2,3,4,5,6,7,8,0,10], # [10,1,2,3,4,5,6,7,8,9,0], # ] # data['demands'] = [0] * 6 + [item.demand for item in data['customers']] # data["vehicle_capacities"] = [item.capacity for item in data['vehicles']] # 3辆车的容量 # data["demands"] = [0, 0, 0, 0, 0, 0, 2.7, 2, 2, 1, 3] # 分拣中心和起点需求为0 # data["vehicle_capacities"] = [5, 7, 10, 6, 1] # 3辆车的容量 # print(data["demands"]) # print(data["vehicle_capacities"]) # 数据准备阶段 SCALE_FACTOR = 1000 # 转换为克/毫升 # data['demands'] = [int(d * SCALE_FACTOR) for d in data['demands']] # data['demands'] = [int(d * SCALE_FACTOR) for d in data['demands']] # data['customers'] = [int(item.demand * SCALE_FACTOR) for item in data['customers']] for item in data['customers']: item.demand = int(item.demand * SCALE_FACTOR) for item in data['vehicles']: item.capacity = int(item.capacity * SCALE_FACTOR) # data['vehicles'][0].capacity = 5000 # data['vehicles'][1].capacity = 2500 # data['vehicles'][2].capacity = 3500 # data['vehicles'][3].capacity = 15000 # data['vehicles'][4].capacity = 10000 # data["vehicle_capacities"] = [int(c * SCALE_FACTOR) for c in data["vehicle_capacities"]] # # 添加安全边界 # SAFETY_FACTOR = 0.999 # 99.9%容量利用率 # data["vehicle_capacities"] = [cap * SAFETY_FACTOR for cap in data["vehicle_capacities"]] print([item.demand for item in data['customers']]) print([item.capacity for item in data['vehicles']]) # print(data["demands"]) # print(data["vehicle_capacities"]) data["num_vehicles"] = len(data['vehicles']) # 每辆车的起点(索引1,2,3),终点都是分拣中心(索引0) # data["starts"] = [1, 2, 3, 4, 5] # data["ends"] = [0, 0, 0,0,0] # # 距离矩阵(包含分拣中心和所有起点) # data["distance_matrix"] = [ # [0, 5, 8, 6, 7, 1, 2, 10], # 分拣中心(索引0) # [5, 0, 6, 3, 2, 1, 2, 9], # 起点1 # [8, 6, 0, 8, 4, 1, 2, 8], # 起点2 # [6, 3, 8, 0, 5, 1, 2, 7], # 起点3 # [7, 2, 4, 5, 0, 1, 2, 3], # 客户点 # [7, 2, 4, 5, 2, 0, 2, 6], # 客户点 # [7, 2, 4, 5, 6, 1, 0, 5], # 客户点 # [7, 2, 4, 5, 6, 1, 2, 0], # 客户点 # ] # data["demands"] = [0, 0, 0, 0, 3,4,5,6] # 分拣中心和起点需求为0 # data["vehicle_capacities"] = [25, 20, 10] # 3辆车的容量 # data["num_vehicles"] = 3 # # 每辆车的起点(索引1,2,3),终点都是分拣中心(索引0) # data["starts"] = [1, 2, 3] # data["ends"] = [0, 0, 0] manager = pywrapcp.RoutingIndexManager( len(data['distance_matrix']), data["num_vehicles"], [v.start_node for v in data['vehicles']], # 各车起始位置 data["starts"], # [data['depot']] * len(data['vehicles']) #data["ends"] # ) # 统一返回分拣中心 routing = pywrapcp.RoutingModel(manager) # ========== 注册核心回调函数 ========== def distance_callback(from_index, to_index): from_node = manager.IndexToNode(from_index) to_node = manager.IndexToNode(to_index) return data["distance_matrix"][from_node][to_node] transit_callback_idx = routing.RegisterTransitCallback(distance_callback) routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_idx) # ========== 时间维度约束 ========== service_time = 10 # 每个客户服务时间10分钟 def time_callback(from_index, to_index): from_node = manager.IndexToNode(from_index) to_node = manager.IndexToNode(to_index) travel_time = data['time_matrix'][from_node][to_node] # travel_time = distance_callback(from_index, to_index) * data['convertDistanceTime'] * 60 # 假设速度1单位/分钟 travel_time = travel_time + (data['customers'][from_node - data['customer_start_idx']].servTime if from_node >= data[ 'customer_start_idx'] else 0) return int(travel_time) time_callback_idx = routing.RegisterTransitCallback(time_callback) routing.AddDimension( time_callback_idx, slack_max=0, # 不允许等待 capacity=24 * 3600, # 最大时间限制(24小时)24 * 3600, # 车辆最大工作时间 fix_start_cumul_to_zero=False,# 起始时间为当前时间 name='Time' ) time_dimension = routing.GetDimensionOrDie('Time') # 设置车辆起始时间(当前系统时间) for vehicle in data['vehicles']: vehicle_id = vehicle.vid start_index = routing.Start(vehicle_id) time_dimension.CumulVar(start_index).SetMin(current_time) time_dimension.CumulVar(start_index).SetMax(current_time) # 设置客户时间窗 for customer_idx in range(len(data['customers'])): index = manager.NodeToIndex(customer_idx + data['customer_start_idx']) # 客户节点从16开始 print(index) if not data['customers'][customer_idx].isOutTime: time_dimension.CumulVar(index).SetRange( data['customers'][customer_idx].startTW, data['customers'][customer_idx].endTW ) # ========== 容量约束 ========== def demand_callback(from_index): node = manager.IndexToNode(from_index) if node < data['customer_start_idx']: return 0 # 非客户节点无需求 return data['customers'][node - data['customer_start_idx']].demand # def demand_callback(from_index): # from_node = manager.IndexToNode(from_index) # # print('from_index', from_index) # # print('node',node) # return data['demands'][from_node] demand_callback_idx = routing.RegisterUnaryTransitCallback(demand_callback) routing.AddDimensionWithVehicleCapacity( demand_callback_idx, 0, # null capacity slack [v.capacity for v in data['vehicles']], #data["vehicle_capacities"], # True, 'Capacity' ) capacity_dimension = routing.GetDimensionOrDie('Capacity') # capacity_dimension.SetCumulVarSoftUpperBoundToZero() # 严格约束 # ========== 高级约束 ========== #1. 分区动态约束 routing = add_partition_constraints(routing, manager, data, time_dimension, fixed_cross_time) # 2. 自有车优先(设置不同固定成本) for vid in range(len(data['vehicles'])): routing.SetFixedCostOfVehicle(100 if data['vehicles'][vid].isHaveTask == 0 else 10000, vid) # # 3. 超时订单优先(设置不同惩罚值) # penalty = 1000000 # 未服务的惩罚 # for customer_idx in range(len(data['customers'])): # penalty = 1000000 if data['customers'][customer_idx].isOutTime else 1000 # routing.AddDisjunction( # [manager.NodeToIndex(customer_idx + data['customer_start_idx'])], # penalty # ) # ========== 求解参数配置 ========== search_params = pywrapcp.DefaultRoutingSearchParameters() search_params.first_solution_strategy = ( routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC ) search_params.local_search_metaheuristic = ( routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH ) search_params.time_limit.seconds = 60 # 60秒求解时间 # search_params.log_search = True # ========== 执行求解 ========== solution = routing.SolveWithParameters(search_params) # ========== 结果解析 ========== if solution: """打印解决方案""" total_distance = 0 for vehicle_id in range(data["num_vehicles"]): print('vehicle_id', vehicle_id) index = routing.Start(vehicle_id) node_index = manager.IndexToNode(index) print('node_index', node_index) plan_output = f"车辆 {vehicle_id} 路线:\n" route_distance = 0 route_load = 0 while not routing.IsEnd(index): node_index = manager.IndexToNode(index) if node_index == 0: node_type = "中心" elif node_index >= data['customer_start_idx']: node_type = f"客户{node_index - data['customer_start_idx'] + 1}" else: node_type = f"车辆{node_index}起始点" # 获取时间信息 time_var = time_dimension.CumulVar(index) arrival_time = solution.Min(time_var) if node_index < data['customer_start_idx']: departure_time = arrival_time else: departure_time = arrival_time + data['customers'][node_index-data['customer_start_idx']].servTime # 格式化为可读时间 arr_time_str = arrival_time #min_to_time(arrival_time).strftime("%H:%M") dep_time_str = departure_time #min_to_time(departure_time).strftime("%H:%M") # 添加到输出 plan_output += ( f"节点 {node_index}({node_type}) | " f"到达: {arrival_time}s ({arr_time_str}) | " f"离开: {departure_time}s ({dep_time_str})\n" ) next_node_index = manager.IndexToNode(solution.Value(routing.NextVar(index))) if node_index >= data['customer_start_idx']: route_load += data['customers'][node_index-data['customer_start_idx']].demand #data["demands"][node_index] plan_output += f" {node_index} ->" previous_index = index index = solution.Value(routing.NextVar(index)) route_distance += routing.GetArcCostForVehicle( previous_index, index, vehicle_id ) # 输出终点(仓库) node_index = manager.IndexToNode(index) time_var = time_dimension.CumulVar(index) arrival_time = solution.Min(time_var) arr_time_str = arrival_time #min_to_time(arrival_time).strftime("%H:%M") plan_output += ( f"节点 {node_index}(中心) | " f"到达: {arrival_time}s ({arr_time_str})\n" ) plan_output += f" {manager.IndexToNode(index)}\n" plan_output += f"行驶距离: {route_distance}\t载重量: {route_load}\n" print(plan_output) total_distance += route_distance print(f"总行驶距离: {total_distance}") else: print("No solution found!") # if __name__ == '__main__': # optimized_vrp_solver(orders, vehicles, distance_matrix, time_matrix, fixed_cross_time, current_time) # optimized_vrp_solver()

filetype

def add_partition_constraints(routing, manager, data, time_dimension, fixed_cross_time): """实现动态分区访问控制""" solver = routing.solver() # 1. 创建分区状态维度 def zone_callback(from_index): from_node = manager.IndexToNode(from_index) # if from_node == data['depot']: # return -2 # 分拣中心特殊标记 if from_node < data['customer_start_idx']: return -1 # 车辆起点特殊标记 customer_idx = from_node - data['customer_start_idx'] return data['customers'][customer_idx].zone zone_callback_idx = routing.RegisterUnaryTransitCallback(zone_callback) routing.AddDimension( zone_callback_idx, 0, # slack_max 1000, # 分区容量上限 False, # start_cumul_to_zero 'Zone' ) zone_dim = routing.GetDimensionOrDie('Zone') # 2. 为每辆车添加动态分区约束 for vehicle in data['vehicles']: vehicle_id = vehicle.vid allowed_zones = set(vehicle.allowed_zones) cross_time = fixed_cross_time #vehicle.cross_time # 动态分区检查回调 def zone_check_callback(from_index, to_index): nonlocal allowed_zones, cross_time to_node = manager.IndexToNode(to_index) # 允许返回分拣中心 if to_node == data['depot']: return True # 非客户节点直接允许 if to_node < data['customer_start_idx']: return True # 获取目标分区 zone = zone_callback(to_index) # 获取到达时间 arrival_time_var = time_dimension.CumulVar(to_index) min_arrival = solver.Min(arrival_time_var) # 判断是否达到切换阈值 if min_arrival < cross_time: # 检查当前是否已有访问分区 start_index = routing.Start(vehicle_id) first_node_index = routing.NextVar(start_index) # 获取首个分区变量 first_zone_var = zone_dim.CumulVar(first_node_index) # 添加约束:时间阈值前必须与首个分区一致 solver.Add( solver.IfThen( zone_dim.CumulVar(to_index) != -1, zone_dim.CumulVar(to_index) == first_zone_var ) ) # 添加约束:首个分区必须在允许分区内 solver.Add( solver.IfThen( first_zone_var != -1, solver.Member(first_zone_var, list(allowed_zones)) ) ) else: # 添加约束:时间阈值后必须在允许分区内 solver.Add( solver.Member(zone_dim.CumulVar(to_index), list(allowed_zones)) ) return True # 注册转移回调 zone_check_idx = routing.RegisterTransitCallback(zone_check_callback) # 3. 添加软约束确保路径可行性 penalty = 1000000 # 大惩罚值 routing.AddDisjunction( [manager.NodeToIndex(i) for i in range(manager.GetNumberOfNodes())], penalty ) 报错WARNING: All log messages before absl::InitializeLog() is called are written to STDERR F0000 00:00:1749109138.770297 395312 routing.cc:1859] Check failed: kUnassigned != indices[i] (-1 vs. -1)

filetype
filetype
filetype

我正在编辑【python】代码,遇到了 【ortools 求解CVRP问题,但是求解结果并没有满足容量限制的约束】 ,请帮我检查并改正错误点。我的原始代码如下: 【from ortools.constraint_solver import routing_enums_pb2 from ortools.constraint_solver import pywrapcp import numpy as np import datetime import math # ================== 数据结构定义 ================== class EnhancedVehicle: def __init__(self, vid, capacity, is_own, start_node, partitions): self.vid = vid # 车辆ID self.capacity = capacity # 车辆容量 self.is_own = is_own # 是否自有车 self.start_node = start_node # 出发节点 self.partitions = partitions # 允许访问分区 class EnhancedCustomer: def __init__(self, demand, time_window, partition, is_overdue): self.demand = demand # 需求重量 self.time_window = time_window # 时间窗(分钟) self.partition = partition # 所属分区 self.is_overdue = is_overdue # 是否超时订单 # ================== 数据生成器 ================== def create_enhanced_data_model(orders, vehicles, distance_matrix, time_matrix, fixed_cross_time, current_time, convertDistanceTime): """生成增强的测试数据集""" data = {} np.random.seed(42) # 基本参数 data['depot'] = 0 # 分拣中心节点 # current_time = datetime.datetime.now() data['base_time'] = current_time data['cross_time'] = fixed_cross_time # 允许跨区时间(分钟) # 生成200个客户订单(包含20%超时订单) data['customers'] = orders data['lenCustomers'] = len(data['customers']) for i in range(len(data['customers'])): if data['customers'][i].isOutTime: data['customers'][i].startTW = 0 data['customers'][i].endTW = 3600 * 48 # start = np.random.randint(0, 480) # end = start + np.random.randint(60, 240) # is_overdue = np.random.rand() < 0.2 # 20%超时 # data['customers'].append( # EnhancedCustomer( # demand=np.random.randint(1, 5), # time_window=(start, end) if not is_overdue else (0, 1440), # partition=np.random.randint(0, 10), # is_overdue=is_overdue # ) # ) # 生成15辆车(自有车10辆,外请车5辆) data['vehicles'] = vehicles data['lenVehicles'] = len(data['vehicles']) for vid in range(len(data['vehicles'])): data['vehicles'][vid].start_node = vid + 1 # 每辆车有独立出发位置 # data['vehicles'].append( # EnhancedVehicle( # vid=vid, # capacity=np.random.randint(20, 50) if vid < 10 else np.random.randint(15, 40), # is_own=(vid < 10), # start_node=start_node, # partitions=[vid % 10] + ([] if vid < 10 else [(vid % 10 + 1) % 10]) # ) # ) # 构建距离矩阵(包含分拣中心+车辆出发位置+客户位置) # node_count = 1 + len(data['vehicles']) + len(data['customers']) # 分拣中心(0) + 车辆起点(1-15) + 客户(16-215) data['distance_matrix'] = distance_matrix #np.random.randint(5, 100, (node_count, node_count)) data['time_matrix'] = time_matrix # np.fill_diagonal(data['distance_matrix'], 0) data['convertDistanceTime'] = convertDistanceTime data['customer_start_idx'] = 1 + len(data['vehicles']) #更改字段名称 for i in range(len(data['vehicles'])): data['vehicles'][i].vid = i for item in data['vehicles']: item.capacity = item.residualVolumeMax # all_allowed_zones = [] # for item in data['vehicles']: # all_allowed_zones += item.areaIds # all_allowed_zones = list(set(all_allowed_zones)) for item in data['vehicles']: item.allowed_zones = item.areaIds # if item.areaIds == []: # item.allowed_zones = all_allowed_zones # else: # item.allowed_zones = item.areaIds for item in data['customers']: item.zone = item.areaId return data # ================== 核心约束实现 ================== def add_partition_constraints(routing, manager, data, time_dimension, fixed_cross_time): """实现动态分区访问控制""" for vehicle in data['vehicles']: vehicle_id = vehicle.vid allowed_zones = vehicle.allowed_zones cross_time = fixed_cross_time #vehicle.cross_time def zone_check(from_index, to_index): from_node = manager.IndexToNode(from_index) to_node = manager.IndexToNode(to_index) # 允许返回分拣中心 if to_node == data['depot']: return True # 客户节点处理 if to_node >= data['customer_start_idx']: customer = data['customers'][to_node - data['customer_start_idx']] to_zone = customer.zone else: return True # 车辆起始点 # 获取当前时间 time_var = time_dimension.CumulVar(to_index) current_time = routing.solver().Min(time_var) # 判断时间阶段 if current_time < cross_time: # 检查路径中已访问的分区 path_zones = set() index = routing.Start(vehicle_id) while not routing.IsEnd(index): node = manager.IndexToNode(index) if node >= data['customer_start_idx']: zone = data['customers'][node - data['customer_start_idx']].zone path_zones.add(zone) index = routing.NextVar(index) #solution.Value(routing.NextVar(index)) # 允许访问的情况 return (len(path_zones) == 0 and to_zone in allowed_zones) or \ (len(path_zones) > 0 and to_zone in path_zones) else: return to_zone in allowed_zones # 添加约束到车辆 routing.solver().AddConstraint( routing.solver().CheckAssignment( routing.NextVar(routing.Start(vehicle_id)), zone_check ) ) # ================== 主求解逻辑 ================== def optimized_vrp_solver(orders, vehicles, distance_matrix, time_matrix, fixed_cross_time, current_time, convertDistanceTime): data = create_enhanced_data_model(orders, vehicles, distance_matrix, time_matrix, fixed_cross_time, current_time, convertDistanceTime) a = len(data['distance_matrix']) data['distance_matrix'] = [[0,1,2,3,4,5,6,7,8,9,10], [1,0,2,3,4,5,6,7,8,9,10], [2,1,0,3,4,5,6,7,8,9,10], [3,1,2,0,4,5,6,7,8,9,10], [4,1,2,3,0,5,6,7,8,9,10], [5,1,2,3,4,0,6,7,8,9,10], [6,1,2,3,4,5,0,7,8,9,10], [7,1,2,3,4,5,6,0,8,9,10], [8,1,2,3,4,5,6,7,0,9,10], [9,1,2,3,4,5,6,7,8,0,10], [10,1,2,3,4,5,6,7,8,9,0],] manager = pywrapcp.RoutingIndexManager( len(data['distance_matrix']), len(data['vehicles']), [v.start_node for v in data['vehicles']], # 各车起始位置 [data['depot']] * len(data['vehicles']) ) # 统一返回分拣中心 routing = pywrapcp.RoutingModel(manager) # ========== 注册核心回调函数 ========== def distance_callback(from_index, to_index): return data['distance_matrix'][manager.IndexToNode(from_index)][manager.IndexToNode(to_index)] transit_callback_idx = routing.RegisterTransitCallback(distance_callback) routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_idx) # # ========== 时间维度约束 ========== # service_time = 10 # 每个客户服务时间10分钟 # # def time_callback(from_index, to_index): # travel_time = distance_callback(from_index, to_index) * data['convertDistanceTime'] * 60 # 假设速度1单位/分钟 # return travel_time + (data['customers'].service_time if manager.IndexToNode(to_index) >= data['customer_start_idx'] else 0) # # time_callback_idx = routing.RegisterTransitCallback(time_callback) # routing.AddDimension( # time_callback_idx, # slack_max=0, # 不允许等待 # capacity=24 * 3600, # 最大时间限制(24小时)24 * 3600, # 车辆最大工作时间 # fix_start_cumul_to_zero=False,# 起始时间为当前时间 # name='Time' # ) # time_dimension = routing.GetDimensionOrDie('Time') # # # 设置车辆起始时间(当前系统时间) # for vehicle in data['vehicles']: # vehicle_id = vehicle.vid # start_index = routing.Start(vehicle_id) # time_dimension.CumulVar(start_index).SetMin(current_time) # time_dimension.CumulVar(start_index).SetMax(current_time) # # # 设置客户时间窗 # for customer_idx in range(len(data['customers'])): # index = manager.NodeToIndex(customer_idx + data['customer_start_idx']) # 客户节点从16开始 # print(index) # if not data['customers'][customer_idx].isOutTime: # time_dimension.CumulVar(index).SetRange( # data['customers'][customer_idx].startTW, # data['customers'][customer_idx].endTW # ) # ========== 容量约束 ========== def demand_callback(from_index): node = manager.IndexToNode(from_index) if node < data['customer_start_idx']: return 0 # 非客户节点无需求 return data['customers'][node - data['customer_start_idx']].demand demand_callback_idx = routing.RegisterUnaryTransitCallback(demand_callback) routing.AddDimensionWithVehicleCapacity( demand_callback_idx, 0, # null capacity slack [v.capacity for v in data['vehicles']], True, 'Capacity' ) # ========== 高级约束 ========== # 1. 分区动态约束 # add_partition_constraints(routing, manager, data, time_dimension, fixed_cross_time) # # 2. 自有车优先(设置不同固定成本) # for vid in range(len(data['vehicles'])): # routing.SetFixedCostOfVehicle(100 if data['vehicles'][vid].isHaveTask == 0 else 10000, vid) # # # 3. 超时订单优先(设置不同惩罚值) # penalty = 1000000 # 未服务的惩罚 # for customer_idx in range(len(data['customers'])): # penalty = 1000000 if data['customers'][customer_idx].isOutTime else 1000 # routing.AddDisjunction( # [manager.NodeToIndex(customer_idx + data['customer_start_idx'])], # penalty # ) # ========== 求解参数配置 ========== search_params = pywrapcp.DefaultRoutingSearchParameters() search_params.first_solution_strategy = ( routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC ) search_params.local_search_metaheuristic = ( routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH ) search_params.time_limit.seconds = 60 # 60秒求解时间 search_params.log_search = True # ========== 执行求解 ========== solution = routing.SolveWithParameters(search_params) # ========== 结果解析 ========== if solution: print(f'Objective: {solution.ObjectiveValue()}') total_distance = 0 total_demand = 0 paths = [] for vid in range(len(data['vehicles'])): index = routing.Start(vid) plan_output = '车辆 {} 的路径:\n'.format(data['vehicles'][vid].truckCode) path: list = [] route_distance = 0 route_load = 0 while not routing.IsEnd(index): node = manager.IndexToNode(index) path.append(node) if node >= data['customer_start_idx']: # 客户节点 customer = data['customers'][node - data['customer_start_idx']] # total_demand += customer.demand route_load += customer.demand plan_output += ' {0} 装载({1}) -> '.format(node, route_load) next_index = solution.Value(routing.NextVar(index)) next_node = manager.IndexToNode(next_index) route_distance += data['distance_matrix'][node][next_node] # route_distance += routing.GetArcCostForVehicle(index, next_index, vid) index = next_index plan_output += ' {0} 装载({1})\n'.format(manager.IndexToNode(index), route_load) path.append(manager.IndexToNode(index)) plan_output += '路径总长度: {}m\n'.format(route_distance) plan_output += '路径总负载: {}\n'.format(route_load) print(plan_output) paths.append(path) total_distance += route_distance total_demand += route_load # print(f'Vehicle {vid} ({data["vehicles"][vid].isHaveTask}) ' # f'Delivered: {route_load}/{data["vehicles"][vid].capacity}') print('所有路径总长度: {}m'.format(total_distance)) print('所有路径总负载: {}'.format(total_demand)) # print(f'\nTotal Delivered: {total_demand} (max possible: {sum(c.demand for c in data["customers"])})') # print(f'Total Distance: {total_distance} units') else: print("No solution found!") # if __name__ == '__main__': # optimized_vrp_solver(orders, vehicles, distance_matrix, time_matrix, fixed_cross_time, current_time) 】

许吴倩
  • 粉丝: 35
上传资源 快速赚钱