From 10333df56892a1a660ff090aa449ef0b6f069ed1 Mon Sep 17 00:00:00 2001 From: abnerhexu <20591243+abnerhexu@users.noreply.github.com> Date: Sun, 25 Jan 2026 16:23:01 +0800 Subject: [PATCH] automatic move towards --- paper.md | 294 ++++++++++- template/agent_prompt.py | 2 +- uav_agent.py | 2 +- uav_langchain_tools.py | 1041 ++++++++++---------------------------- 4 files changed, 550 insertions(+), 789 deletions(-) diff --git a/paper.md b/paper.md index 3230af6..5c52953 100644 --- a/paper.md +++ b/paper.md @@ -1,260 +1,552 @@ +补全auto_navigate_move_towards函数,函数的输入包含drone_id,方向和在该方向上的预期移动距离。在遇到障碍物时,需要越过障碍物,并继续移动。最终到达的位置和初始位置相比,在指定的方向上移动的距离应当不小于预期移动距离。 + +@tool +def auto_navigate_move_towards(input_json: str) -> str: + try: + params = json.loads(input_json) if isinstance(input_json, str) else input_json + pass + +现在先补全函数,可以调用`get_obstacles`函数获取障碍物信息。该函数返回一个列表。obstacles的种类有4类,是point、circle、polygon、ellipse。这个列表形如: + [ + { + "id": "bb07b327", + "name": "Point Obstacle 1", + "type": "point", + "position": { + "x": 609.0, + "y": 459.0, + "z": 0.0 + }, + "description": "", + "radius": 30.0, + "vertices": [], + "width": null, + "length": null, + "height": 0.0, + "area": 2827.4333882308138, + "created_at": 1766327750.018749, + "last_updated": 1766327750.018749 + }, + { + "id": "fd0760b8", + "name": "Point Obstacle 2", + "type": "point", + "position": { + "x": 896.0, + "y": 241.0, + "z": 0.0 + }, + "description": "", + "radius": 6.0, + "vertices": [], + "width": null, + "length": null, + "height": 0.0, + "area": 113.09733552923255, + "created_at": 1766327750.0221481, + "last_updated": 1766327750.0221481 + }, + { + "id": "1d817ce2", + "name": "Point Obstacle 3", + "type": "point", + "position": { + "x": 809.0, + "y": 14.0, + "z": 0.0 + }, + "description": "", + "radius": 8.0, + "vertices": [], + "width": null, + "length": null, + "height": 0.0, + "area": 201.06192982974676, + "created_at": 1766327750.025595, + "last_updated": 1766327750.025595 + }, + { + "id": "2b38acd5", + "name": "Circle Obstacle 1", + "type": "circle", + "position": { + "x": 438.0, + "y": 465.0, + "z": 0.0 + }, + "description": "", + "radius": 40.0, + "vertices": [], + "width": null, + "length": null, + "height": 0.0, + "area": 5026.548245743669, + "created_at": 1766327750.019903, + "last_updated": 1766327750.019903 + }, + { + "id": "fa67aa30", + "name": "Circle Obstacle 2", + "type": "circle", + "position": { + "x": 431.0, + "y": 605.0, + "z": 0.0 + }, + "description": "", + "radius": 55.0, + "vertices": [], + "width": null, + "length": null, + "height": 0.0, + "area": 9503.317777109125, + "created_at": 1766327750.0267, + "last_updated": 1766327750.0267 + }, + { + "id": "845b6b36", + "name": "Polygon Obstacle 1", + "type": "polygon", + "position": { + "x": 686.0, + "y": 669.0, + "z": 0.0 + }, + "description": "", + "radius": null, + "vertices": [ + { + "x": 611.0, + "y": 594.0 + }, + { + "x": 761.0, + "y": 594.0 + }, + { + "x": 761.0, + "y": 744.0 + }, + { + "x": 611.0, + "y": 744.0 + } + ], + "width": null, + "length": null, + "height": 0.0, + "area": 22500.0, + "created_at": 1766327750.021056, + "last_updated": 1766327750.021056 + }, + { + "id": "8603630e", + "name": "Polygon Obstacle 2", + "type": "polygon", + "position": { + "x": 312.0, + "y": 232.0, + "z": 0.0 + }, + "description": "", + "radius": null, + "vertices": [ + { + "x": 247.0, + "y": 167.0 + }, + { + "x": 377.0, + "y": 167.0 + }, + { + "x": 377.0, + "y": 297.0 + }, + { + "x": 247.0, + "y": 297.0 + } + ], + "width": null, + "length": null, + "height": 0.0, + "area": 16900.0, + "created_at": 1766327750.023377, + "last_updated": 1766327750.023377 + }, + { + "id": "b32c509b", + "name": "Polygon Obstacle 3", + "type": "polygon", + "position": { + "x": 691.0, + "y": 344.0, + "z": 0.0 + }, + "description": "", + "radius": null, + "vertices": [ + { + "x": 625.0, + "y": 278.0 + }, + { + "x": 757.0, + "y": 278.0 + }, + { + "x": 757.0, + "y": 410.0 + }, + { + "x": 625.0, + "y": 410.0 + } + ], + "width": null, + "length": null, + "height": 0.0, + "area": 17424.0, + "created_at": 1766327750.027871, + "last_updated": 1766327750.027871 + }, + { + "id": "b6a5ec7c", + "name": "Polygon Obstacle 4", + "type": "polygon", + "position": { + "x": 237.0, + "y": 555.0, + "z": 0.0 + }, + "description": "", + "radius": null, + "vertices": [ + { + "x": 181.0, + "y": 499.0 + }, + { + "x": 293.0, + "y": 499.0 + }, + { + "x": 293.0, + "y": 611.0 + }, + { + "x": 181.0, + "y": 611.0 + } + ], + "width": null, + "length": null, + "height": 0.0, + "area": 12544.0, + "created_at": 1766327750.0289862, + "last_updated": 1766327750.0289862 + }, + { + "id": "9bf135c3", + "name": "Ellipse Obstacle 1", + "type": "ellipse", + "position": { + "x": 475.0, + "y": 156.0, + "z": 0.0 + }, + "description": "", + "radius": null, + "vertices": [], + "width": 46.0, + "length": 34.0, + "height": 0.0, + "area": 4913.450910214437, + "created_at": 1766327750.0245, + "last_updated": 1766327750.0245 + } -] \ No newline at end of file + +] + +对于高度不为0的obstacles,如果无人机的最大可达高度大于obstacle的高度,就能够飞跃。对于不能飞跃的和高度为0的obstacles,需要绕路。 + +该函数只需为无人机确定一个要到达的目标坐标点。该坐标点需要与障碍物保持一定的距离。确定好目标坐标点后,调用auto_navigate_to函数 + +``` +def auto_navigate_to(input_json: str) -> str: + """ + Plan an obstacle-avoiding path using analytic geometry for precise collision detection. + + Input should be a JSON string with: + - drone_id: The ID of the drone (required) + - x: Target X coordinate in meters (required) + - y: Target Y coordinate in meters (required) + - z: Target Z coordinate (altitude) in meters (required) + + Example: {{"drone_id": "drone-001", "x": 100.0, "y": 50.0, "z": 20.0}} + """ +``` + +即可将无人机从当前位置导航到目标位置,并避开所有障碍物。 + +现在请补全auto_navigate_move_towards函数。 \ No newline at end of file diff --git a/template/agent_prompt.py b/template/agent_prompt.py index e996d61..41ed2c4 100644 --- a/template/agent_prompt.py +++ b/template/agent_prompt.py @@ -87,7 +87,7 @@ Tips for you to finish task in the most efficient way: 6. Reaching to a higher latitude can help you see targets, but do not exceed the drone's limit. 7. Cannot move from current status: DroneStatus.IDLE means you need to take off first then move. 8. If the start point is different with the current position of the drone, first go to the start point, then continue the left paths. - +9. Moving X meters in a given direction means reaching a location that is at a distance of X meters from the current point along that direction. The distance must not be reduced due to detouring, nor should the detour proceed in the opposite direction. Obstacle avoidance do not mean you can reduce the distance. Only increase the distance is allowed. Begin! Question: {input} diff --git a/uav_agent.py b/uav_agent.py index f5897d1..cc3854b 100644 --- a/uav_agent.py +++ b/uav_agent.py @@ -284,7 +284,7 @@ class UAVControlAgent: # Create LLM instance if llm_provider == "anthropic-compatible": kwargs = { - "model_name": llm_model, + "model": llm_model, "temperature": temperature, "api_key": llm_api_key, "base_url": final_base_url diff --git a/uav_langchain_tools.py b/uav_langchain_tools.py index a2a3592..cb2433c 100644 --- a/uav_langchain_tools.py +++ b/uav_langchain_tools.py @@ -5,8 +5,142 @@ All tools accept JSON string input for consistent parameter handling """ from langchain.tools import tool from uav_api_client import UAVAPIClient +import math +import heapq import json +# --- 内部几何算法类 --- +class GeometryUtils: + @staticmethod + def point_to_segment_dist_sq(p, a, b): + """计算点 p 到线段 ab 的最短距离的平方""" + px, py = p[0], p[1] + ax, ay = a[0], a[1] + bx, by = b[0], b[1] + l2 = (ax - bx)**2 + (ay - by)**2 + if l2 == 0: return (px - ax)**2 + (py - ay)**2 + t = ((px - ax) * (bx - ax) + (py - ay) * (by - ay)) / l2 + t = max(0, min(1, t)) + proj_x = ax + t * (bx - ax) + proj_y = ay + t * (by - ay) + return (px - proj_x)**2 + (py - proj_y)**2 + @staticmethod + def segments_intersect(a1, a2, b1, b2): + """判断线段 a1-a2 与 b1-b2 是否相交""" + def ccw(A, B, C): + return (C[1]-A[1]) * (B[0]-A[0]) > (B[1]-A[1]) * (C[0]-A[0]) + return ccw(a1, b1, b2) != ccw(a2, b1, b2) and ccw(a1, a2, b1) != ccw(a1, a2, b2) + @staticmethod + def is_point_in_polygon(p, vertices): + x, y = p[0], p[1] + inside = False + j = len(vertices) - 1 + for i in range(len(vertices)): + xi, yi = vertices[i]['x'], vertices[i]['y'] + xj, yj = vertices[j]['x'], vertices[j]['y'] + intersect = ((yi > y) != (yj > y)) and \ + (x < (xj - xi) * (y - yi) / (yj - yi + 1e-9) + xi) + if intersect: + inside = not inside + j = i + return inside + @staticmethod + def check_collision(p1, p2, obs, safety_buffer=2.0): + """ + 检测线段 p1-p2 是否与障碍物 obs 碰撞。 + 返回: (Boolean 是否碰撞, Float 障碍物高度) + """ + otype = obs['type'] + opos = obs['position'] + ox, oy = opos['x'], opos['y'] + obs_height = obs.get('height', 0) + # 1. 圆形/点 + if otype in ['circle', 'point']: + r = obs.get('radius', 0) + limit = r + safety_buffer + dist_sq = GeometryUtils.point_to_segment_dist_sq((ox, oy), p1, p2) + if dist_sq < limit**2: + return True, obs_height + # 2. 椭圆 (坐标变换法) + elif otype == 'ellipse': + w = obs.get('width', 0) + l = obs.get('length', 0) + semi_axis_x = w + safety_buffer + semi_axis_y = l + safety_buffer + + def to_unit_space(point): + dx = point[0] - ox + dy = point[1] - oy + return (dx / semi_axis_x, dy / semi_axis_y) + u_p1 = to_unit_space(p1) + u_p2 = to_unit_space(p2) + dist_sq_in_unit_space = GeometryUtils.point_to_segment_dist_sq((0,0), u_p1, u_p2) + + if dist_sq_in_unit_space < 1.0: + return True, obs_height + # 3. 多边形 + elif otype == 'polygon': + verts = obs['vertices'] + if not verts: return False, 0 + for i in range(len(verts)): + v1 = (verts[i]['x'], verts[i]['y']) + v2 = (verts[(i + 1) % len(verts)]['x'], verts[(i + 1) % len(verts)]['y']) + if GeometryUtils.segments_intersect(p1, p2, v1, v2): + return True, obs_height + if GeometryUtils.is_point_in_polygon(p1, verts) or GeometryUtils.is_point_in_polygon(p2, verts): + return True, obs_height + + return False, 0 + + # --- 新增方法: 仅检测单点是否在障碍物内 --- + @staticmethod + def check_point_overlap(point, obs, safety_buffer=2.0): + """ + 检测单个坐标点 point(x,y) 是否落在障碍物范围内 + """ + px, py = point + otype = obs['type'] + opos = obs['position'] + ox, oy = opos['x'], opos['y'] + obs_height = obs.get('height', 0.0) + + # 1. 圆形/点 (原有逻辑正确) + if otype in ['point', 'circle']: + r = obs.get('radius', 0.0) + if otype == 'point' and r is None: r = 0.5 + limit = r + safety_buffer + if (px - ox)**2 + (py - oy)**2 <= limit**2: + return True, obs_height + + # 2. 椭圆 (原有逻辑正确) + elif otype == 'ellipse': + semi_x = (obs.get('width', 0) / 2.0) + safety_buffer + semi_y = (obs.get('length', 0) / 2.0) + safety_buffer + if semi_x > 0 and semi_y > 0: + if ((px - ox)**2 / semi_x**2) + ((py - oy)**2 / semi_y**2) <= 1.0: + return True, obs_height + + # 3. 多边形 (修正版:增加 buffer 判定) + elif otype == 'polygon': + verts = obs.get('vertices', []) + + # A. 严格内部检测 + if GeometryUtils.is_point_in_polygon((px, py), verts): + return True, obs_height + + # B. [新增] 边缘缓冲检测 + # 即使点在几何外部,如果距离任意一条边的距离小于 buffer,视为重叠 + buffer_sq = safety_buffer**2 + for i in range(len(verts)): + v1 = (verts[i]['x'], verts[i]['y']) + v2 = (verts[(i + 1) % len(verts)]['x'], verts[(i + 1) % len(verts)]['y']) + # 计算点到边线段的距离平方 + dist_sq = GeometryUtils.point_to_segment_dist_sq((px, py), v1, v2) + if dist_sq < buffer_sq: + return True, obs_height + + return False, 0.0 + class ToolStates: def __init__(self): self.explored_count = 0 @@ -507,38 +641,111 @@ def create_uav_tools(client: UAVAPIClient) -> list: return f"Error moving towards: {str(e)}" @tool - def auto_navigate_move_towards(input_json: str) -> str: - # TODO 能不能把auto navigate和move towards结合起来 - """Navigate the drone to move a drone a specific distance in a direction. - + def auto_navigate_towards(input_json: str) -> str: + """Automatically find a route that bypasses obstacles and allows flying the specified distance in the given direction. + Input should be a JSON string with: - - drone_id: The ID of the drone (required, get from Action list_drones) + - drone_id: The ID of the drone (required) - distance: Distance to move in meters (required) - - heading: Heading direction in degrees 0-360 (optional, default: current heading) - - dz: Vertical component in meters (optional) - - Example: {{"drone_id": "drone-001", "distance": 10.0, "heading": 90.0}} + - direction: Direction in degrees 0, 90, 180, 270 (required) """ try: params = json.loads(input_json) if isinstance(input_json, str) else input_json drone_id = params.get('drone_id') - distance = params.get('distance') - heading = params.get('heading') - dz = params.get('dz') + direction = params.get('direction') # 0, 90, 180, 270 + min_distance = float(params.get('distance', 0.0)) - if not drone_id: - return "Error: drone_id is required" - if distance is None: - return "Error: distance is required" + if not drone_id or direction is None: + return "Error: drone_id and direction are required." - result = client.move_towards(drone_id, distance, heading, dz) - if result["status"] == "error": - result["message"] += "(1) If the task is to move a certain distance in a specific direction but the path is blocked by an obstacle, first move to a position where there is no obstacle in that direction, and then move the specified distance along that direction. (2) If the obstacle’s height is lower than the maximum altitude the drone can reach, the drone may ascend to an altitude higher than the obstacle and fly over it. If the obstacle's height is 0, then it indicates no drone can fly over it (In this case you need to detour). (3) Try other tools like `move_to` or `auto_navigate_to`" - return json.dumps(result, indent=2) - except json.JSONDecodeError as e: - return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"distance\": 10.0}}" + # 获取状态 + drone_state = client.get_drone_status(drone_id) + obstacles = client.get_obstacles() + start_x = drone_state['position']['x'] + start_y = drone_state['position']['y'] + start_z = drone_state['position']['z'] + max_h = drone_state.get('max_altitude', 100.0) + + direction_map = { + 0: (0.0, 1.0), # North + 180: (0.0, -1.0), # South + 90: (1.0, 0.0), # East + 270: (-1.0, 0.0) # West + } + if direction not in direction_map: + return json.dumps({"error": f"Invalid direction: {direction}. Must be 0, 90, 180, or 270."}) + + dir_x, dir_y = direction_map[direction] + + # --- 核心修复 1: 更好的步长控制 --- + current_distance = min_distance + step_size = 5.0 # 减小步长以获得更精细的落点 + max_iterations = 500 + safety_margin = 2.5 # 稍微加大缓冲,确保落点绝对安全 + height_buffer = 5.0 + + # --- 核心修复 2: 移除错误的 (0,0) 初始化 --- + found_valid_target = False + final_target_x = 0.0 + final_target_y = 0.0 + final_target_z = start_z + + for _ in range(max_iterations): + # 计算当前尝试的坐标 + test_x = start_x + dir_x * current_distance + test_y = start_y + dir_y * current_distance + + is_location_safe = True + required_z = start_z + + # 检查该点相对于所有障碍物的情况 + for obs in obstacles: + # 使用修复后的 check_point_overlap (必须包含 buffer 检查!) + is_inside, obs_h = GeometryUtils.check_point_overlap( + (test_x, test_y), obs, safety_buffer=safety_margin + ) + + if is_inside: + # 冲突:检查是否可以飞越 + # 如果障碍物过高 OR 是禁飞区(height=0) -> 该点不可用 + if obs_h <= 0 or obs_h + height_buffer >= max_h: + is_location_safe = False + break + else: + # 可以飞越 -> 推高 Z 轴 + required_z = max(required_z, obs_h + height_buffer) + + if is_location_safe: + # 找到合法点 + final_target_x = test_x + final_target_y = test_y + final_target_z = required_z + found_valid_target = True + break + else: + # 当前距离的点不安全,继续向远处延伸 + current_distance += step_size + + if not found_valid_target: + return json.dumps({ + "status": "error", + "message": f"Could not find a safe target point in direction {direction} even after extending distance to {current_distance}m. Path is completely blocked." + }) + + # 生成导航指令 + nav_payload = json.dumps({ + "drone_id": drone_id, + "x": final_target_x, + "y": final_target_y, + "z": final_target_z + }) + + return f"Finding a destination successfully (dist={current_distance}m)... Now use `auto_navigate_to({nav_payload})` to move the drone." + except Exception as e: - return f"Error moving towards: {str(e)}" + return f"Error executing auto_navigate_towards: {str(e)}" + + # @tool # def move_along_path(input_json: str) -> str: @@ -620,647 +827,13 @@ def create_uav_tools(client: UAVAPIClient) -> list: result = client.move_to(drone_id, x, y, z) if result["status"] == "error": - result["message"] += "(1) If the task is to move a certain distance in a specific direction but the path is blocked by an obstacle, first move to a position where there is no obstacle in that direction, and then move the specified distance along that direction. (2) If the obstacle’s height is lower than the maximum altitude the drone can reach, the drone may ascend to an altitude higher than the obstacle and fly over it. If the obstacle's height is 0, then it indicates no drone can fly over it (In this case you need to detour)." + result["message"] += "(1) If the task is to move a certain distance in a specific direction but the path is blocked by an obstacle, first move to a position where there is no obstacle in that direction, and then move the specified distance along that direction. (2) If the obstacle’s height is lower than the maximum altitude the drone can reach, the drone may ascend to an altitude higher than the obstacle and fly over it. If the obstacle's height is 0, then it indicates no drone can fly over it (In this case you need to detour). (3) You can use `get_obstacles` tool to see the obstacle, and pay attention to the obstacle\'s shape and size. (4) You may try `auto_navigate_to` tool to detour if it works." return json.dumps(result, indent=2) except json.JSONDecodeError as e: return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"x\": 100.0, \"y\": 50.0, \"z\": 20.0}}" except Exception as e: return f"Error moving drone: {str(e)}" - @tool - def auto_navigate_to2(input_json: str) -> str: - """ - Plan an obstacle-avoiding path to the target position (x, y, z), automatically determining whether to detour or overfly: - 1. Detour around obstacles that cannot be overflown—such as those exceeding the drone’s maximum operational altitude or located within no-fly zones. - 2. For obstacles that can be safely overflown, automatically compute a safe flight altitude that exceeds the obstacle’s height while remaining within the drone’s operational ceiling, and incorporate this into the vertical profile of the trajectory. - - Input should be a JSON string with: - - drone_id: The ID of the drone (required) - - x: Target X coordinate in meters (required) - - y: Target Y coordinate in meters (required) - - z: Target Z coordinate (altitude) in meters (required) - - Example: {{"drone_id": "drone-001", "x": 100.0, "y": 50.0, "z": 20.0}} - """ - import math - import heapq - - # --- 内部几何算法类 (封装以保持主逻辑清晰) --- - class GeometryUtils: - @staticmethod - def dist_sq(p1, p2): - return (p1[0]-p2[0])**2 + (p1[1]-p2[1])**2 - - @staticmethod - def point_to_segment_dist(p, a, b): - """计算点 p 到线段 ab 的最短距离""" - px, py = p[0], p[1] - ax, ay = a[0], a[1] - bx, by = b[0], b[1] - - l2 = (ax - bx)**2 + (ay - by)**2 - if l2 == 0: return math.hypot(px - ax, py - ay) - - t = ((px - ax) * (bx - ax) + (py - ay) * (by - ay)) / l2 - t = max(0, min(1, t)) - - proj_x = ax + t * (bx - ax) - proj_y = ay + t * (by - ay) - return math.hypot(px - proj_x, py - proj_y) - - @staticmethod - def segments_intersect(a1, a2, b1, b2): - """判断线段 a1-a2 与 b1-b2 是否相交""" - def ccw(A, B, C): - return (C[1]-A[1]) * (B[0]-A[0]) > (B[1]-A[1]) * (C[0]-A[0]) - return ccw(a1, b1, b2) != ccw(a2, b1, b2) and ccw(a1, a2, b1) != ccw(a1, a2, b2) - - @staticmethod - def is_point_in_polygon(p, vertices): - """射线法判断点是否在多边形内""" - x, y = p[0], p[1] - inside = False - j = len(vertices) - 1 - for i in range(len(vertices)): - xi, yi = vertices[i]['x'], vertices[i]['y'] - xj, yj = vertices[j]['x'], vertices[j]['y'] - - intersect = ((yi > y) != (yj > y)) and \ - (x < (xj - xi) * (y - yi) / (yj - yi + 1e-9) + xi) - if intersect: - inside = not inside - j = i - return inside - - @staticmethod - def check_collision(p1, p2, obs, safety_buffer=2.0): - """ - 检测线段 p1-p2 是否与障碍物 obs 碰撞。 - 返回: (Boolean 是否碰撞, Float 障碍物高度) - """ - otype = obs['type'] - opos = obs['position'] - ox, oy = opos['x'], opos['y'] - obs_height = obs.get('height', 0) - - # 1. 圆形/椭圆/点 (简化为圆) - if otype in ['circle', 'point', 'ellipse']: - if otype == 'ellipse': - r = max(obs.get('width', 0), obs.get('length', 0)) / 2.0 - else: - r = obs.get('radius', 0) - - limit = r + safety_buffer - dist = GeometryUtils.point_to_segment_dist((ox, oy), p1, p2) - if dist < limit: - return True, obs_height - - # 2. 多边形 - elif otype == 'polygon': - verts = obs['vertices'] - if not verts: return False, 0 - - # A. 边对边相交检测 - for i in range(len(verts)): - v1 = (verts[i]['x'], verts[i]['y']) - v2 = (verts[(i + 1) % len(verts)]['x'], verts[(i + 1) % len(verts)]['y']) - if GeometryUtils.segments_intersect(p1, p2, v1, v2): - return True, obs_height - - # B. 包含检测 (起点或终点在多边形内) - # 注意:这里不加 safety_buffer 进行包含检测,因为如果点在内部必撞 - # 若要处理 buffer,需要做多边形 offset,这里略过复杂操作,依赖边相交检测 - if GeometryUtils.is_point_in_polygon(p1, verts) or GeometryUtils.is_point_in_polygon(p2, verts): - return True, obs_height - - return False, 0 - - # --------------------------------------------------------- - - try: - # 1. 解析参数 - params = json.loads(input_json) if isinstance(input_json, str) else input_json - drone_id = params.get('drone_id') - tx, ty, tz = params.get('x'), params.get('y'), params.get('z') - - if not drone_id or tx is None or ty is None or tz is None: - return "Error: drone_id, x, y, z are required" - - direct_move_result = client.move_to(drone_id, tx, ty, tz) - if direct_move_result["status"] != "error": - return json.dumps(direct_move_result, indent=2) - # 2. 获取环境信息 - status = client.get_drone_status(drone_id) - start_pos = status['position'] - sx, sy, sz = start_pos['x'], start_pos['y'], start_pos['z'] - drone_max_alt = status.get('max_altitude', 100.0) - - all_obstacles = client.get_obstacles() - - # 3. 障碍物分类 - # mandatory_avoid: 必须绕路 (高度 >= 无人机极限 或 高度为0的禁飞区) - # fly_over_candidates: 可能可以飞跃 (高度 < 无人机极限) - mandatory_avoid = [] - fly_over_candidates = [] - - for obs in all_obstacles: - h = obs.get('height', 0) - if h == 0 or h >= drone_max_alt: - mandatory_avoid.append(obs) - else: - fly_over_candidates.append(obs) - - target_point = (tx, ty) - start_point = (sx, sy) - - # 4. 2D 路径规划 (仅避开 mandatory_avoid) - - # 4.1 生成节点 - # 包括起点、终点 - nodes = [start_point, target_point] - - # 为必须绕行的障碍物生成关键点 (外扩 5m) - safety_margin = 5.0 - for obs in mandatory_avoid: - opos = obs['position'] - ox, oy = opos['x'], opos['y'] - - if obs['type'] == 'polygon': - # 简单策略:取多边形顶点并向外延伸 - # 更稳健的方法:不做复杂几何外扩,直接取顶点,但在碰撞检测时留余量 - # 这里为了路径稀疏性,取顶点 + 向量外扩 - center_x = sum(v['x'] for v in obs['vertices']) / len(obs['vertices']) - center_y = sum(v['y'] for v in obs['vertices']) / len(obs['vertices']) - for v in obs['vertices']: - vx, vy = v['x'], v['y'] - vec_len = math.hypot(vx - center_x, vy - center_y) - if vec_len > 0: - scale = (vec_len + safety_margin) / vec_len - nx = center_x + (vx - center_x) * scale - ny = center_y + (vy - center_y) * scale - nodes.append((nx, ny)) - else: - # 圆/椭圆/点 - r = obs.get('radius', 0) - if obs['type'] == 'ellipse': - r = max(obs.get('width', 0), obs.get('length', 0)) / 2.0 - r += safety_margin - nodes.append((ox + r, oy)) - nodes.append((ox - r, oy)) - nodes.append((ox, oy + r)) - nodes.append((ox, oy - r)) - - # 4.2 过滤非法节点 (落入其他障碍物内的节点) - valid_nodes = [] - for node in nodes: - is_bad = False - for obs in mandatory_avoid: - # 这里借用 check_collision 检查点是否在障碍物内 (把线段设为点到点) - # 使用稍小的 buffer 确保节点不紧贴障碍物 - collided, _ = GeometryUtils.check_collision(node, node, obs, safety_buffer=2.0) - if collided: - is_bad = True - break - if not is_bad: - valid_nodes.append(node) - - # 确保起点和终点在 valid_nodes 中 (如果起点就在禁飞区,这里会抛出无解,符合逻辑) - # 为防止浮点误差导致起点被过滤,强制加回起点终点(如果它们真的在障碍物内,后面边的检测会挡住) - if start_point not in valid_nodes: valid_nodes.insert(0, start_point) - if target_point not in valid_nodes: valid_nodes.append(target_point) - - # 重新映射索引 - start_idx = valid_nodes.index(start_point) - target_idx = valid_nodes.index(target_point) - - # 4.3 构建图 (Dijkstra) - # 只检测 mandatory_avoid - adj = {i: [] for i in range(len(valid_nodes))} - for i in range(len(valid_nodes)): - for j in range(i + 1, len(valid_nodes)): - u, v = valid_nodes[i], valid_nodes[j] - - # 检测边 u-v 是否碰撞 mandatory_avoid - path_blocked = False - for obs in mandatory_avoid: - hit, _ = GeometryUtils.check_collision(u, v, obs, safety_buffer=2.0) - if hit: - path_blocked = True - break - - if not path_blocked: - dist = math.hypot(u[0]-v[0], u[1]-v[1]) - adj[i].append((j, dist)) - adj[j].append((i, dist)) - - # 4.4 搜索最短 2D 路径 - pq = [(0.0, start_idx, [valid_nodes[start_idx]])] - visited = set() - path_2d = [] - - while pq: - cost, u, path = heapq.heappop(pq) - if u == target_idx: - path_2d = path - break - if u in visited: continue - visited.add(u) - - for v_idx, w in adj[u]: - if v_idx not in visited: - heapq.heappush(pq, (cost + w, v_idx, path + [valid_nodes[v_idx]])) - - if not path_2d: - return json.dumps({"status": "Failed: No 2D path found. Target might be in restricted area."}) - - # 5. 路径高度计算 (Path Refinement) - # 我们现在有了一条避开了“不可飞越障碍物”的 2D 路径。 - # 现在需要沿着这条路径,检查它是否穿过了“可飞越障碍物”,并计算所需高度。 - - # 基础安全高度:起点高度、目标高度、以及基本的最低飞行高度 - safe_base_alt = max(sz, tz) - - # 我们需要分段处理,因为路径不同路段可能经过不同高度的障碍物 - # 简单策略:计算整条路径所需的全局最大安全高度 (Global Safe Altitude for this path) - # 优化策略:虽然可以做分段高度,但在实际无人机操作中,保持平稳的一个巡航高度通常更安全且节能 - - path_max_obs_height = 0.0 - - for i in range(len(path_2d) - 1): - p1 = path_2d[i] - p2 = path_2d[i+1] - - # 检查此路段与所有 fly_over_candidates 的碰撞情况 - for obs in fly_over_candidates: - hit, obs_h = GeometryUtils.check_collision(p1, p2, obs, safety_buffer=2.0) - if hit: - # 如果撞到了这个可飞越障碍物,我们需要比它高 - path_max_obs_height = max(path_max_obs_height, obs_h) - - # 最终决定的飞行高度 (加 2米 安全余量) - if path_max_obs_height > 0: - cruise_alt = path_max_obs_height + 2.0 - else: - cruise_alt = safe_base_alt - - # 确保不低于起点和终点,且不超过最大限高 - cruise_alt = max(cruise_alt, sz, tz) - - if cruise_alt > drone_max_alt: - return json.dumps({"status": "Failed: Required altitude exceeds drone capability. Please try finding a path step by step dynamically. Do not repeatedly call optimal_way_to."}) - - # 6. 构建最终 3D 航点 - # 逻辑: - # 1. 如果需要爬升,先在当前位置 (sx, sy) 爬升到 cruise_alt - # 2. 平飞经过所有 2D 路径点 (高度 = cruise_alt) - # 3. 最后如果在目标点上方,下降/调整到 tz - - waypoints = [] - - # 起点 - current_waypoint = (sx, sy, sz) - waypoints.append(current_waypoint) - - # 爬升阶段 (如果巡航高度高于当前高度) - if cruise_alt > sz + 0.5: # 0.5 作为浮点容差 - waypoints.append((sx, sy, cruise_alt)) - - # 巡航阶段 (中间节点) - # 跳过 path_2d[0] 因为它是起点 - for i in range(len(path_2d)): - node = path_2d[i] - waypoints.append((node[0], node[1], cruise_alt)) - - # 接近终点阶段 - # 先平飞到终点上方 - waypoints.append((tx, ty, cruise_alt)) - - # 如果巡航高度与目标高度不同,最后调整高度 - if abs(cruise_alt - tz) > 0.5: - waypoints.append((tx, ty, tz)) - - # 7. 执行飞行 - for wp in waypoints: - # 跳过与当前位置极近的点 (避免重复指令) - if wp == waypoints[0] and len(waypoints) > 1: - pass # 仅作为记录,不发送指令,或者如果这是唯一的点则发送 - # 发送指令 - # 注意:实际 API 调用中,如果点就是当前位置,可能需要跳过(不管了懒得跳了,多调用一次client.move_to()又不花钱) - waypoint_move_result = client.move_to(drone_id, wp[0], wp[1], wp[2]) - if waypoint_move_result["status"] == "error": - print(f"Error moving to waypoint {wp}: {waypoint_move_result['message']}") - return f"Error moving to waypoint {wp}: {waypoint_move_result['message']}\nPlease try finding a path step by stepdynamically. Do not repeatedly call optimal_way_to." - - return json.dumps({"status": "success", "path": waypoints, "message": waypoint_move_result["message"] + "You can call get_drone_status to check the drone's current status."}) - - except Exception as e: - return f"Error executing path finding: {str(e)}\nPlease try finding a path dynamically." - - @tool - def auto_navigate_to3(input_json: str) -> str: - """ - Plan an obstacle-avoiding path to the target position (x, y, z), automatically determining whether to detour or overfly. - - Fixed: Corrected Ellipse dimension interpretation (width/length are radii, not diameters). - """ - import math - import heapq - import json - - # --- 内部几何算法类 --- - class GeometryUtils: - @staticmethod - def dist_sq(p1, p2): - return (p1[0]-p2[0])**2 + (p1[1]-p2[1])**2 - - @staticmethod - def point_to_segment_dist(p, a, b): - """计算点 p 到线段 ab 的最短距离""" - px, py = p[0], p[1] - ax, ay = a[0], a[1] - bx, by = b[0], b[1] - - l2 = (ax - bx)**2 + (ay - by)**2 - if l2 == 0: return math.hypot(px - ax, py - ay) - - t = ((px - ax) * (bx - ax) + (py - ay) * (by - ay)) / l2 - t = max(0, min(1, t)) - - proj_x = ax + t * (bx - ax) - proj_y = ay + t * (by - ay) - return math.hypot(px - proj_x, py - proj_y) - - @staticmethod - def segments_intersect(a1, a2, b1, b2): - """判断线段 a1-a2 与 b1-b2 是否相交""" - def ccw(A, B, C): - return (C[1]-A[1]) * (B[0]-A[0]) > (B[1]-A[1]) * (C[0]-A[0]) - return ccw(a1, b1, b2) != ccw(a2, b1, b2) and ccw(a1, a2, b1) != ccw(a1, a2, b2) - - @staticmethod - def is_point_in_polygon(p, vertices): - """射线法判断点是否在多边形内""" - x, y = p[0], p[1] - inside = False - j = len(vertices) - 1 - for i in range(len(vertices)): - xi, yi = vertices[i]['x'], vertices[i]['y'] - xj, yj = vertices[j]['x'], vertices[j]['y'] - - intersect = ((yi > y) != (yj > y)) and \ - (x < (xj - xi) * (y - yi) / (yj - yi + 1e-9) + xi) - if intersect: - inside = not inside - j = i - return inside - - @staticmethod - def is_point_in_ellipse(p, center, width, length): - """ - 判断点是否在椭圆内。 - 公式: (x-cx)^2/w^2 + (y-cy)^2/l^2 <= 1 - 注意:此处 width/length 为半轴长(Radius) - """ - dx = p[0] - center['x'] - dy = p[1] - center['y'] - return (dx**2 / width**2) + (dy**2 / length**2) <= 1.0 - - @staticmethod - def check_collision(p1, p2, obs, safety_buffer=2.0): - """ - 检测线段 p1-p2 是否与障碍物 obs 碰撞。 - 返回: (Boolean 是否碰撞, Float 障碍物高度) - """ - otype = obs['type'] - opos = obs['position'] - ox, oy = opos['x'], opos['y'] - obs_height = obs.get('height', 0) - - # 1. 圆形/椭圆/点 (统一使用外接圆做保守检测,保证安全) - if otype in ['circle', 'point', 'ellipse']: - if otype == 'ellipse': - # [修复]: width 和 length 是半轴长,不是直径,不需要除以2 - # 使用最大半轴长作为外接圆半径 - r = max(obs.get('width', 0), obs.get('length', 0)) - else: - r = obs.get('radius', 0) - - limit = r + safety_buffer - dist = GeometryUtils.point_to_segment_dist((ox, oy), p1, p2) - if dist < limit: - return True, obs_height - - # 2. 多边形 - elif otype == 'polygon': - verts = obs['vertices'] - if not verts: return False, 0 - - # A. 边对边相交检测 - for i in range(len(verts)): - v1 = (verts[i]['x'], verts[i]['y']) - v2 = (verts[(i + 1) % len(verts)]['x'], verts[(i + 1) % len(verts)]['y']) - if GeometryUtils.segments_intersect(p1, p2, v1, v2): - return True, obs_height - - # B. 包含检测 - if GeometryUtils.is_point_in_polygon(p1, verts) or GeometryUtils.is_point_in_polygon(p2, verts): - return True, obs_height - - return False, 0 - - # --------------------------------------------------------- - - try: - # 1. 解析参数 - params = json.loads(input_json) if isinstance(input_json, str) else input_json - drone_id = params.get('drone_id') - tx, ty, tz = params.get('x'), params.get('y'), params.get('z') - - if not drone_id or tx is None or ty is None or tz is None: - return "Error: drone_id, x, y, z are required" - - # 2. 获取环境信息 - status = client.get_drone_status(drone_id) - start_pos = status['position'] - sx, sy, sz = start_pos['x'], start_pos['y'], start_pos['z'] - drone_max_alt = status.get('max_altitude', 100.0) - - all_obstacles = client.get_obstacles() - - # 3. 障碍物分类 - mandatory_avoid = [] - fly_over_candidates = [] - - for obs in all_obstacles: - h = obs.get('height', 0) - if h == 0 or h >= drone_max_alt: - mandatory_avoid.append(obs) - else: - fly_over_candidates.append(obs) - - target_point = (tx, ty) - start_point = (sx, sy) - - # 4. 2D 路径规划 (仅避开 mandatory_avoid) - - # 4.1 生成节点 - nodes = [start_point, target_point] - safety_margin = 5.0 - - for obs in mandatory_avoid: - opos = obs['position'] - ox, oy = opos['x'], opos['y'] - - if obs['type'] == 'polygon': - center_x = sum(v['x'] for v in obs['vertices']) / len(obs['vertices']) - center_y = sum(v['y'] for v in obs['vertices']) / len(obs['vertices']) - for v in obs['vertices']: - vx, vy = v['x'], v['y'] - vec_len = math.hypot(vx - center_x, vy - center_y) - if vec_len > 0: - scale = (vec_len + safety_margin) / vec_len - nx = center_x + (vx - center_x) * scale - ny = center_y + (vy - center_y) * scale - nodes.append((nx, ny)) - else: - # 圆/椭圆/点 - r = obs.get('radius', 0) - if obs['type'] == 'ellipse': - # [修复]: 尺寸不需要除以2 - r = max(obs.get('width', 0), obs.get('length', 0)) - - gen_r = r + safety_margin - - # 动态步长:半径越大,采样点越多,防止割圆效应 - num_steps = max(8, int(r / 3.0)) - - for i in range(num_steps): - angle = i * (2 * math.pi / num_steps) - nodes.append((ox + gen_r * math.cos(angle), oy + gen_r * math.sin(angle))) - - # 4.2 过滤非法节点 - valid_nodes = [] - for node in nodes: - is_bad = False - for obs in mandatory_avoid: - # 1. 基础碰撞检测 (外接圆/多边形) - collided, _ = GeometryUtils.check_collision(node, node, obs, safety_buffer=0.5) - if collided: - is_bad = True - break - - # 2. 针对椭圆增加精确检测 (防止点落在外接圆内但在椭圆外,或者是很扁的椭圆导致漏判) - # 主要是为了双重保险 - if obs['type'] == 'ellipse': - w = obs.get('width', 0) - l = obs.get('length', 0) - # 增加一点 buffer 进行点检测 - if GeometryUtils.is_point_in_ellipse(node, obs['position'], w + 0.5, l + 0.5): - is_bad = True - break - - if not is_bad: - valid_nodes.append(node) - - if start_point not in valid_nodes: valid_nodes.insert(0, start_point) - if target_point not in valid_nodes: valid_nodes.append(target_point) - - start_idx = valid_nodes.index(start_point) - target_idx = valid_nodes.index(target_point) - - # 4.3 构建图 - adj = {i: [] for i in range(len(valid_nodes))} - for i in range(len(valid_nodes)): - for j in range(i + 1, len(valid_nodes)): - u, v = valid_nodes[i], valid_nodes[j] - - path_blocked = False - for obs in mandatory_avoid: - hit, _ = GeometryUtils.check_collision(u, v, obs, safety_buffer=2.0) - if hit: - path_blocked = True - break - - if not path_blocked: - dist = math.hypot(u[0]-v[0], u[1]-v[1]) - adj[i].append((j, dist)) - adj[j].append((i, dist)) - - # 4.4 Dijkstra 搜索 - pq = [(0.0, start_idx, [valid_nodes[start_idx]])] - visited = set() - path_2d = [] - - while pq: - cost, u, path = heapq.heappop(pq) - if u == target_idx: - path_2d = path - break - if u in visited: continue - visited.add(u) - - for v_idx, w in adj[u]: - if v_idx not in visited: - heapq.heappush(pq, (cost + w, v_idx, path + [valid_nodes[v_idx]])) - - if not path_2d: - return json.dumps({"status": "Failed: No 2D path found."}) - - # 5. 高度计算 - safe_base_alt = max(sz, tz) - path_max_obs_height = 0.0 - - for i in range(len(path_2d) - 1): - p1 = path_2d[i] - p2 = path_2d[i+1] - for obs in fly_over_candidates: - hit, obs_h = GeometryUtils.check_collision(p1, p2, obs, safety_buffer=2.0) - if hit: - path_max_obs_height = max(path_max_obs_height, obs_h) - - if path_max_obs_height > 0: - cruise_alt = path_max_obs_height + 2.0 - else: - cruise_alt = safe_base_alt - - cruise_alt = max(cruise_alt, sz, tz) - if cruise_alt > drone_max_alt: - return json.dumps({"status": "Failed: Required altitude exceeds drone capability."}) - - # 6. 生成航点 - waypoints = [] - waypoints.append((sx, sy, sz)) - - if cruise_alt > sz + 0.5: - waypoints.append((sx, sy, cruise_alt)) - - for i in range(len(path_2d)): - node = path_2d[i] - if i == 0 and math.hypot(node[0]-sx, node[1]-sy) < 0.1: - continue - waypoints.append((node[0], node[1], cruise_alt)) - - last_wp = waypoints[-1] - if abs(last_wp[2] - tz) > 0.5 or math.hypot(last_wp[0]-tx, last_wp[1]-ty) > 0.1: - waypoints.append((tx, ty, tz)) - - # 7. 执行 - final_msg = "Success" - for wp in waypoints: - # if wp == waypoints[0] and len(waypoints) > 1: - # continue - - waypoint_move_result = client.move_to(drone_id, wp[0], wp[1], wp[2]) - if waypoint_move_result["status"] == "error": - # 返回详细错误以便调试 - return f"Error moving to waypoint {wp}: {waypoint_move_result['message']}" - final_msg = waypoint_move_result.get("message", "Success") - - return json.dumps({"status": "success", "path": waypoints, "message": final_msg}) - - except Exception as e: - return f"Error executing path finding: {str(e)}" - @tool def auto_navigate_to(input_json: str) -> str: """ @@ -1274,117 +847,6 @@ def create_uav_tools(client: UAVAPIClient) -> list: Example: {{"drone_id": "drone-001", "x": 100.0, "y": 50.0, "z": 20.0}} """ - import math - import heapq - import json - - # --- 内部几何算法类 --- - class GeometryUtils: - @staticmethod - def point_to_segment_dist_sq(p, a, b): - """ - 计算点 p 到线段 ab 的最短距离的平方 (避免开方,提高性能)。 - 返回: 距离的平方 - """ - px, py = p[0], p[1] - ax, ay = a[0], a[1] - bx, by = b[0], b[1] - - l2 = (ax - bx)**2 + (ay - by)**2 - if l2 == 0: return (px - ax)**2 + (py - ay)**2 - - t = ((px - ax) * (bx - ax) + (py - ay) * (by - ay)) / l2 - t = max(0, min(1, t)) - - proj_x = ax + t * (bx - ax) - proj_y = ay + t * (by - ay) - return (px - proj_x)**2 + (py - proj_y)**2 - - @staticmethod - def segments_intersect(a1, a2, b1, b2): - """判断线段 a1-a2 与 b1-b2 是否相交""" - def ccw(A, B, C): - return (C[1]-A[1]) * (B[0]-A[0]) > (B[1]-A[1]) * (C[0]-A[0]) - return ccw(a1, b1, b2) != ccw(a2, b1, b2) and ccw(a1, a2, b1) != ccw(a1, a2, b2) - - @staticmethod - def is_point_in_polygon(p, vertices): - x, y = p[0], p[1] - inside = False - j = len(vertices) - 1 - for i in range(len(vertices)): - xi, yi = vertices[i]['x'], vertices[i]['y'] - xj, yj = vertices[j]['x'], vertices[j]['y'] - intersect = ((yi > y) != (yj > y)) and \ - (x < (xj - xi) * (y - yi) / (yj - yi + 1e-9) + xi) - if intersect: - inside = not inside - j = i - return inside - - @staticmethod - def check_collision(p1, p2, obs, safety_buffer=2.0): - """ - 检测线段 p1-p2 是否与障碍物 obs 碰撞。 - 返回: (Boolean 是否碰撞, Float 障碍物高度) - """ - otype = obs['type'] - opos = obs['position'] - ox, oy = opos['x'], opos['y'] - obs_height = obs.get('height', 0) - - # 1. 圆形/点 - if otype in ['circle', 'point']: - r = obs.get('radius', 0) - limit = r + safety_buffer - # 使用平方距离比较,避免 sqrt - dist_sq = GeometryUtils.point_to_segment_dist_sq((ox, oy), p1, p2) - if dist_sq < limit**2: - return True, obs_height - - # 2. 椭圆 (使用坐标变换法 - Analytic Solution) - elif otype == 'ellipse': - w = obs.get('width', 0) - l = obs.get('length', 0) - - # 为了处理 Buffer,我们将 Buffer 加到轴长上 - # 这是一种工程近似:虽然真正的 Offset Curve 不是椭圆, - # 但 (w+buffer, l+buffer) 的椭圆是其包围盒,且计算非常快。 - semi_axis_x = w + safety_buffer - semi_axis_y = l + safety_buffer - - # 坐标变换:将世界坐标系的点变换到“单位圆空间” - # 1. 平移:以椭圆中心为原点 - # 2. 缩放:X 除以长轴,Y 除以短轴 - def to_unit_space(point): - dx = point[0] - ox - dy = point[1] - oy - return (dx / semi_axis_x, dy / semi_axis_y) - - u_p1 = to_unit_space(p1) - u_p2 = to_unit_space(p2) - - # 现在问题变成了:线段 u_p1 -> u_p2 是否与 单位圆 (Radius=1) 相交 - # 即:原点 (0,0) 到线段的距离是否 < 1.0 - dist_sq_in_unit_space = GeometryUtils.point_to_segment_dist_sq((0,0), u_p1, u_p2) - - if dist_sq_in_unit_space < 1.0: # 1.0^2 = 1.0 - return True, obs_height - - # 3. 多边形 - elif otype == 'polygon': - verts = obs['vertices'] - if not verts: return False, 0 - for i in range(len(verts)): - v1 = (verts[i]['x'], verts[i]['y']) - v2 = (verts[(i + 1) % len(verts)]['x'], verts[(i + 1) % len(verts)]['y']) - if GeometryUtils.segments_intersect(p1, p2, v1, v2): - return True, obs_height - if GeometryUtils.is_point_in_polygon(p1, verts) or GeometryUtils.is_point_in_polygon(p2, verts): - return True, obs_height - - return False, 0 - # --------------------------------------------------------- try: @@ -1416,6 +878,19 @@ def create_uav_tools(client: UAVAPIClient) -> list: target_point = (tx, ty) start_point = (sx, sy) + # === 核心修复 1: 端点合法性预检查 === + for obs in mandatory_avoid: + # 检查终点是否在严格障碍物内 (buffer=0) + in_obs, _ = GeometryUtils.check_collision(target_point, target_point, obs, safety_buffer=0.0) + if in_obs: + return json.dumps({"status": f"Failed: Target position is inside obstacle '{obs.get('name', 'Unknown')}'. Path planning aborted."}) + + # 检查起点是否在严格障碍物内 + in_obs_start, _ = GeometryUtils.check_collision(start_point, start_point, obs, safety_buffer=0.0) + if in_obs_start: + # 如果起点在障碍物内,尝试飞出来(允许) + pass + # 4. 2D 路径规划 # 4.1 生成节点 @@ -1427,7 +902,6 @@ def create_uav_tools(client: UAVAPIClient) -> list: ox, oy = opos['x'], opos['y'] if obs['type'] == 'polygon': - # 多边形节点策略 (保持原样) center_x = sum(v['x'] for v in obs['vertices']) / len(obs['vertices']) center_y = sum(v['y'] for v in obs['vertices']) / len(obs['vertices']) for v in obs['vertices']: @@ -1440,7 +914,6 @@ def create_uav_tools(client: UAVAPIClient) -> list: nodes.append((nx, ny)) elif obs['type'] == 'ellipse': - # 椭圆节点生成:依然使用参数方程,因为要生成具体的绕行点坐标 w = obs.get('width', 0) l = obs.get('length', 0) gen_w = w + safety_margin @@ -1465,8 +938,6 @@ def create_uav_tools(client: UAVAPIClient) -> list: for node in nodes: is_bad = False for obs in mandatory_avoid: - # 使用更新后的 check_collision (数学解) 进行点包含测试 - # 线段长度为0即为点 collided, _ = GeometryUtils.check_collision(node, node, obs, safety_buffer=0.5) if collided: is_bad = True @@ -1474,6 +945,7 @@ def create_uav_tools(client: UAVAPIClient) -> list: if not is_bad: valid_nodes.append(node) + # 强制加回起点终点 (即使它们在缓冲区内) if start_point not in valid_nodes: valid_nodes.insert(0, start_point) if target_point not in valid_nodes: valid_nodes.append(target_point) start_idx = valid_nodes.index(start_point) @@ -1485,10 +957,15 @@ def create_uav_tools(client: UAVAPIClient) -> list: for j in range(i + 1, len(valid_nodes)): u, v = valid_nodes[i], valid_nodes[j] + # === 核心修复 2: 动态缓冲区策略 === + # 如果线段连接的是起点或终点,使用“宽松缓冲区”(0.1m) + # 允许无人机在安全的情况下起飞或降落到靠近障碍物的地方 + is_start_or_end_edge = (i == start_idx or i == target_idx or j == start_idx or j == target_idx) + current_buffer = 0.1 if is_start_or_end_edge else 2.0 + path_blocked = False for obs in mandatory_avoid: - # 使用更新后的 check_collision (数学解) 进行线段检测 - hit, _ = GeometryUtils.check_collision(u, v, obs, safety_buffer=2.0) + hit, _ = GeometryUtils.check_collision(u, v, obs, safety_buffer=current_buffer) if hit: path_blocked = True break @@ -1516,7 +993,8 @@ def create_uav_tools(client: UAVAPIClient) -> list: heapq.heappush(pq, (cost + w, v_idx, path + [valid_nodes[v_idx]])) if not path_2d: - return json.dumps({"status": "Failed: No 2D path found."}) + # 如果依然找不到路径,说明真的被封死了 + return json.dumps({"status": "Failed: No 2D path found. Target is reachable but likely fully enclosed by obstacles."}) # 5. 高度计算 safe_base_alt = max(sz, tz) @@ -1741,16 +1219,6 @@ def create_uav_tools(client: UAVAPIClient) -> list: return f"Success: Target explored with coverage {current_coverage:.2%} (Visited {tool_states.explored_count}/{total_points} grid points)" return f"Finished path. Final coverage: {current_coverage:.2%}" - - - # drone = client.get_drone_status(drone_id) - # {"id":"c1a5bf4b","name":"Drone 1","model":"Model-A","status":"hovering","position":{"x":613.0,"y":437.0,"z":24.0},"heading":90.0,"speed":0.0,"battery_level":33.48548886660462,"max_speed":16.0,"max_altitude":362.0,"battery_capacity":93.4,"perceived_radius":100.0,"task_radius":10.0,"home_position":{"x":377.0,"y":268.0,"z":0.0},"created_at":1766328129.6861398,"last_updated":1769226225.5813532} - # target = client.get_target_status(target_id) - # {"id":"1122ecf0","name":"Circle Target 1","type":"circle","position":{"x":463.0,"y":312.0,"z":0.0},"description":"","radius":200.0,"created_at":1766328129.676041,"last_updated":1769224819.8737159,"velocity":null,"moving_path":null,"moving_duration":null,"current_path_index":null,"path_direction":null,"time_in_direction":null,"calculated_speed":null,"charge_amount":null,"is_reached":true,"reached_by":["c1a5bf4b"]} - # {"id":"d6816dd4","name":"Polygon Target 1","type":"polygon","position":{"x":911.0,"y":552.0,"z":0.0},"description":"","radius":1.0,"created_at":1766328129.677602,"last_updated":1769225798.403095,"velocity":null,"moving_path":null,"moving_duration":null,"current_path_index":null,"path_direction":null,"time_in_direction":null,"calculated_speed":null,"charge_amount":null,"is_reached":true,"reached_by":["c1a5bf4b"],"vertices":[{"x":831.08,"y":471.74,"z":0.0},{"x":991.08,"y":471.74,"z":0.0},{"x":991.08,"y":631.74,"z":0.0},{"x":831.08,"y":631.74,"z":0.0}]} - # {"id":"81f45d4c","name":"BS_01 PLA West","type":"waypoint","position":{"x":1065.0,"y":1200.0,"z":0.0},"description":"","radius":10.0,"created_at":1767770673.5973494,"last_updated":1768203787.0783787,"velocity":null,"moving_path":null,"moving_duration":null,"current_path_index":null,"path_direction":null,"time_in_direction":null,"calculated_speed":null,"charge_amount":30.0,"is_reached":true,"reached_by":["21c6b4d7","3f987e09","b61fcff5"]} - # {"id":"05c89883","name":"Hong Kong International Airport Restricted Zone","type":"fixed","position":{"x":1900.0,"y":200.0,"z":0.0},"description":"No-fly zone due to airport operations. Drones must stay clear.","radius":150.0,"created_at":1767767958.9605737,"last_updated":1768204251.032104,"velocity":null,"moving_path":null,"moving_duration":null,"current_path_index":null,"path_direction":null,"time_in_direction":null,"calculated_speed":null,"charge_amount":null,"is_reached":true,"reached_by":["21c6b4d7","3f987e09","b61fcff5"]} - # return drone['position'] == target['position'] # Return all tools @@ -1769,6 +1237,7 @@ def create_uav_tools(client: UAVAPIClient) -> list: land, move_to, auto_navigate_to, + auto_navigate_towards, move_towards, change_altitude, hover,