Files
masa-agent/uav_langchain_tools.py
2026-01-24 11:05:52 +08:00

1288 lines
55 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
LangChain Tools for UAV Control
Wraps the UAV API client as LangChain tools using @tool decorator
All tools accept JSON string input for consistent parameter handling
"""
from langchain.tools import tool
from uav_api_client import UAVAPIClient
import json
def create_uav_tools(client: UAVAPIClient) -> list:
"""
Create all UAV control tools for LangChain agent using @tool decorator
All tools that require parameters accept a JSON string input
"""
# ========== Information Gathering Tools (No Parameters) ==========
@tool
def list_drones() -> str:
"""List all available drones in the current session with their status, battery level, and position.
Use this to see what drones are available before trying to control them.
No input required."""
try:
drones = client.list_drones()
return json.dumps(drones, indent=2)
except Exception as e:
return f"Error listing drones: {str(e)}"
@tool
def get_session_info() -> str:
"""Get current session information including task type, statistics, and status.
Use this to understand what mission you need to complete.
No input required."""
try:
session = client.get_current_session()
return json.dumps(session, indent=2)
except Exception as e:
return f"Error getting session info: {str(e)}"
@tool
def get_session_data() -> str:
"""Get all session data including drones, targets, and obstacles.
Use this to understand the environment and plan your mission.
No input required."""
try:
session_data = client.get_session_data()
return json.dumps(session_data, indent=2)
except Exception as e:
return f"Error getting session data: {str(e)}"
@tool
def get_task_progress() -> str:
"""Get mission task progress including completion percentage and status.
Use this to track mission completion and see how close you are to finishing.
No input required."""
try:
progress = client.get_task_progress()
return json.dumps(progress, indent=2)
except Exception as e:
return f"Error getting task progress: {str(e)}"
@tool
def get_weather() -> str:
"""Get current weather conditions including wind speed, visibility, and weather type.
Check this before takeoff to ensure safe flying conditions.
No input required."""
try:
weather = client.get_weather()
return json.dumps(weather, indent=2)
except Exception as e:
return f"Error getting weather: {str(e)}"
@tool
def get_targets() -> str:
"""Get all targets in the session including fixed, moving, waypoint, circle and polygon to search or patrol.
Use this to see what targets you need to visit.
No input required."""
try:
targets = client.get_targets()
return json.dumps(targets, indent=2)
except Exception as e:
return f"Error getting targets: {str(e)}"
@tool
def get_all_waypoints() -> str:
"""Get all waypoints in the session including coordinates and altitude.
Use this to understand the where to charge that drones will follow.
No input required."""
try:
waypoints = client.get_all_waypoints()
return json.dumps(waypoints, indent=2)
except Exception as e:
return f"Error getting waypoints: {str(e)}"
@tool
def get_obstacles() -> str:
"""Get all obstacles in the session that drones must avoid.
Use this to understand what obstacles exist in the environment.
No input required."""
try:
obstacles = client.get_obstacles()
return json.dumps(obstacles, indent=2)
except Exception as e:
return f"Error getting obstacles: {str(e)}"
@tool
def get_drone_status(input_json: str) -> str:
"""Get detailed status of a specific drone including position, battery, heading, and visited targets.
Input should be a JSON string with:
- drone_id: The ID of the drone (required, get from Action list_drones)
Example: {{"drone_id": "04d6cfe7"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
if not drone_id:
return "Error: drone_id is required"
status = client.get_drone_status(drone_id)
return json.dumps(status, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\"}}"
except Exception as e:
return f"Error getting drone status: {str(e)}"
@tool
def get_nearby_entities(input_json: str) -> str:
"""Get drones, targets, and obstacles near a specific drone (within its perception radius).
Input should be a JSON string with:
- drone_id: The ID of the drone (required, get from Action list_drones)
Example: {{"drone_id": "drone-001"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
if not drone_id:
return "Error: drone_id is required"
nearby = client.get_nearby_entities(drone_id)
return json.dumps(nearby, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\"}}"
except Exception as e:
return f"Error getting nearby entities: {str(e)}"
@tool
def land(input_json: str) -> str:
"""Command a drone to land at its current position.
Input should be a JSON string with:
- drone_id: The ID of the drone (required, get from Action list_drones)
Example: {{"drone_id": "drone-001"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
if not drone_id:
return "Error: drone_id is required"
result = client.land(drone_id)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\"}}"
except Exception as e:
return f"Error during landing: {str(e)}"
@tool
def hover(input_json: str) -> str:
"""Command a drone to hover at its current position.
Input should be a JSON string with:
- drone_id: The ID of the drone (required, get from Action list_drones)
- duration: Optional duration in seconds to hover (optional)
Example: {{"drone_id": "drone-001", "duration": 5.0}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
duration = params.get('duration')
if not drone_id:
return "Error: drone_id is required"
result = client.hover(drone_id, duration)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\"}}"
except Exception as e:
return f"Error hovering: {str(e)}"
@tool
def return_home(input_json: str) -> str:
"""Command a drone to return to its home position.
Input should be a JSON string with:
- drone_id: The ID of the drone (required, get from Action list_drones)
Example: {{"drone_id": "drone-001"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
if not drone_id:
return "Error: drone_id is required"
result = client.return_home(drone_id)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\"}}"
except Exception as e:
return f"Error returning home: {str(e)}"
@tool
def set_home(input_json: str) -> str:
"""Set the drone's current position as its new home position.
Input should be a JSON string with:
- drone_id: The ID of the drone (required, get from Action list_drones)
Example: {{"drone_id": "drone-001"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
if not drone_id:
return "Error: drone_id is required"
result = client.set_home(drone_id)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\"}}"
except Exception as e:
return f"Error setting home: {str(e)}"
@tool
def calibrate(input_json: str) -> str:
"""Calibrate the drone's sensors.
Input should be a JSON string with:
- drone_id: The ID of the drone (required, get from Action list_drones)
Example: {{"drone_id": "drone-001"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
if not drone_id:
return "Error: drone_id is required"
result = client.calibrate(drone_id)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\"}}"
except Exception as e:
return f"Error calibrating: {str(e)}"
@tool
def take_photo(input_json: str) -> str:
"""Command a drone to take a photo.
Input should be a JSON string with:
- drone_id: The ID of the drone (required, get from Action list_drones)
Example: {{"drone_id": "drone-001"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
if not drone_id:
return "Error: drone_id is required"
result = client.take_photo(drone_id)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\"}}"
except Exception as e:
return f"Error taking photo: {str(e)}"
# ========== Two Parameter Tools ==========
@tool
def take_off(input_json: str) -> str:
"""Command a drone to take off to a specified altitude.
Drone must be on ground (idle or ready status).
Input should be a JSON string with:
- drone_id: The ID of the drone (required, get from Action list_drones)
- altitude: Target altitude in meters (optional, default: 10.0)
Example: {{"drone_id": "drone-001", "altitude": 15.0}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
altitude = params.get('altitude', 10.0)
if not drone_id:
return "Error: drone_id is required"
result = client.take_off(drone_id, altitude)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"altitude\": 15.0}}"
except Exception as e:
return f"Error during takeoff: {str(e)}"
@tool
def change_altitude(input_json: str) -> str:
"""Change a drone's altitude while maintaining X/Y position.
Input should be a JSON string with:
- drone_id: The ID of the drone (required, get from Action list_drones)
- altitude: Target altitude in meters (required)
Example: {{"drone_id": "drone-001", "altitude": 20.0}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
altitude = params.get('altitude')
if not drone_id:
return "Error: drone_id is required"
if altitude is None:
return "Error: altitude is required"
result = client.change_altitude(drone_id, altitude)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"altitude\": 20.0}}"
except Exception as e:
return f"Error changing altitude: {str(e)}"
@tool
def rotate(input_json: str) -> str:
"""Rotate a drone to face a specific direction.
0=North, 90=East, 180=South, 270=West.
Input should be a JSON string with:
- drone_id: The ID of the drone (required, get from Action list_drones)
- heading: Target heading in degrees 0-360 (required)
Example: {{"drone_id": "drone-001", "heading": 90.0}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
heading = params.get('heading')
if not drone_id:
return "Error: drone_id is required"
if heading is None:
return "Error: heading is required"
result = client.rotate(drone_id, heading)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"heading\": 90.0}}"
except Exception as e:
return f"Error rotating: {str(e)}"
@tool
def send_message(input_json: str) -> str:
"""Send a message from one drone to another.
Input should be a JSON string with:
- drone_id: The ID of the sender drone (required, get from Action list_drones)
- target_drone_id: The ID of the recipient drone (required, get from Action list_drones)
- message: The message content (required)
Example: {{"drone_id": "drone-001", "target_drone_id": "drone-002", "message": "Hello"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
target_drone_id = params.get('target_drone_id')
message = params.get('message')
if not drone_id:
return "Error: drone_id is required"
if not target_drone_id:
return "Error: target_drone_id is required"
if not message:
return "Error: message is required"
result = client.send_message(drone_id, target_drone_id, message)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"target_drone_id\": \"drone-002\", \"message\": \"...\"}}"
except Exception as e:
return f"Error sending message: {str(e)}"
@tool
def broadcast(input_json: str) -> str:
"""Broadcast a message from one drone to all other drones.
Input should be a JSON string with:
- drone_id: The ID of the sender drone (required, get from Action list_drones)
- message: The message content (required)
Example: {{"drone_id": "drone-001", "message": "Alert"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
message = params.get('message')
if not drone_id:
return "Error: drone_id is required"
if not message:
return "Error: message is required"
result = client.broadcast(drone_id, message)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"message\": \"...\"}}"
except Exception as e:
return f"Error broadcasting: {str(e)}"
@tool
def charge(input_json: str) -> str:
"""Command a drone to charge its battery.
Drone must be landed at a charging station.
Input should be a JSON string with:
- drone_id: The ID of the drone (required, get from Action list_drones)
- charge_amount: Amount to charge in percent (required)
Example: {{"drone_id": "drone-001", "charge_amount": 25.0}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
charge_amount = params.get('charge_amount')
if not drone_id:
return "Error: drone_id is required"
if charge_amount is None:
return "Error: charge_amount is required"
result = client.charge(drone_id, charge_amount)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"charge_amount\": 25.0}}"
except Exception as e:
return f"Error charging: {str(e)}"
@tool
def move_towards(input_json: str) -> str:
"""Move a drone a specific distance in a direction.
Input should be a JSON string with:
- drone_id: The ID of the drone (required, get from Action list_drones)
- distance: Distance to move in meters (required)
- heading: Heading direction in degrees 0-360 (optional, default: current heading)
- dz: Vertical component in meters (optional)
Example: {{"drone_id": "drone-001", "distance": 10.0, "heading": 90.0}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
distance = params.get('distance')
heading = params.get('heading')
dz = params.get('dz')
if not drone_id:
return "Error: drone_id is required"
if distance is None:
return "Error: distance is required"
result = client.move_towards(drone_id, distance, heading, dz)
if result["status"] == "error":
result["message"] += "(1) If the task is to move a certain distance in a specific direction but the path is blocked by an obstacle, first move to a position where there is no obstacle in that direction, and then move the specified distance along that direction. (2) If the obstacles height is lower than the maximum altitude the drone can reach, the drone may ascend to an altitude higher than the obstacle and fly over it. If the obstacle's height is 0, then it indicates no drone can fly over it (In this case you need to detour). (3) Try other tools like `move_to` or `auto_navigate_to`"
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"distance\": 10.0}}"
except Exception as e:
return f"Error moving towards: {str(e)}"
@tool
def auto_navigate_move_towards(input_json: str) -> str:
# TODO 能不能把auto navigate和move towards结合起来
"""Navigate the drone to move a drone a specific distance in a direction.
Input should be a JSON string with:
- drone_id: The ID of the drone (required, get from Action list_drones)
- distance: Distance to move in meters (required)
- heading: Heading direction in degrees 0-360 (optional, default: current heading)
- dz: Vertical component in meters (optional)
Example: {{"drone_id": "drone-001", "distance": 10.0, "heading": 90.0}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
distance = params.get('distance')
heading = params.get('heading')
dz = params.get('dz')
if not drone_id:
return "Error: drone_id is required"
if distance is None:
return "Error: distance is required"
result = client.move_towards(drone_id, distance, heading, dz)
if result["status"] == "error":
result["message"] += "(1) If the task is to move a certain distance in a specific direction but the path is blocked by an obstacle, first move to a position where there is no obstacle in that direction, and then move the specified distance along that direction. (2) If the obstacles height is lower than the maximum altitude the drone can reach, the drone may ascend to an altitude higher than the obstacle and fly over it. If the obstacle's height is 0, then it indicates no drone can fly over it (In this case you need to detour). (3) Try other tools like `move_to` or `auto_navigate_to`"
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"distance\": 10.0}}"
except Exception as e:
return f"Error moving towards: {str(e)}"
# @tool
# def move_along_path(input_json: str) -> str:
# """Move a drone along a path of waypoints.
# Input should be a JSON string with:
# - drone_id: The ID of the drone (required)
# - waypoints: List of points with x, y, z coordinates (required)
# Example: {{"drone_id": "drone-001", "waypoints": [{{"x": 10, "y": 10, "z": 10}}, {{"x": 20, "y": 20, "z": 10}}]}}
# """
# try:
# params = json.loads(input_json) if isinstance(input_json, str) else input_json
# drone_id = params.get('drone_id')
# waypoints = params.get('waypoints')
# if not drone_id:
# return "Error: drone_id is required"
# if not waypoints:
# return "Error: waypoints list is required"
# result = client.move_along_path(drone_id, waypoints)
# return json.dumps(result, indent=2)
# except json.JSONDecodeError as e:
# return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"waypoints\": [...]}}"
# except Exception as e:
# return f"Error moving along path: {str(e)}"
# ========== Multi-Parameter Tools ==========
@tool
def get_nearest_waypoint(input_json: str) -> str:
"""Get the nearest waypoint to a specific drone.
Input should be a JSON string with:
- x: The x-coordinate of the drone (required)
- y: The y-coordinate of the drone (required)
- z: The z-coordinate of the drone (required)
Example: {{"x": 0.0, "y": 0.0, "z": 0.0}}"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
x = params.get('x')
y = params.get('y')
z = params.get('z')
if x is None or y is None or z is None:
return "Error: x, y, and z coordinates are required"
nearest = client.get_nearest_waypoint(x, y, z)
return json.dumps(nearest, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"x\": 0.0, \"y\": 0.0, \"z\": 0.0}}"
except Exception as e:
return f"Error getting nearest waypoint: {str(e)}"
@tool
def move_to(input_json: str) -> str:
"""Move a drone to specific 3D coordinates (x, y, z).
Always check for collisions first using check_path_collision.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
- x: Target X coordinate in meters (required)
- y: Target Y coordinate in meters (required)
- z: Target Z coordinate (altitude) in meters (required)
Example: {{"drone_id": "drone-001", "x": 100.0, "y": 50.0, "z": 20.0}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
x = params.get('x')
y = params.get('y')
z = params.get('z')
if not drone_id:
return "Error: drone_id is required"
if x is None or y is None or z is None:
return "Error: x, y, and z coordinates are required"
result = client.move_to(drone_id, x, y, z)
if result["status"] == "error":
result["message"] += "(1) If the task is to move a certain distance in a specific direction but the path is blocked by an obstacle, first move to a position where there is no obstacle in that direction, and then move the specified distance along that direction. (2) If the obstacles height is lower than the maximum altitude the drone can reach, the drone may ascend to an altitude higher than the obstacle and fly over it. If the obstacle's height is 0, then it indicates no drone can fly over it (In this case you need to detour)."
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"x\": 100.0, \"y\": 50.0, \"z\": 20.0}}"
except Exception as e:
return f"Error moving drone: {str(e)}"
@tool
def auto_navigate_to2(input_json: str) -> str:
"""
Plan an obstacle-avoiding path to the target position (x, y, z), automatically determining whether to detour or overfly:
1. Detour around obstacles that cannot be overflown—such as those exceeding the drones maximum operational altitude or located within no-fly zones.
2. For obstacles that can be safely overflown, automatically compute a safe flight altitude that exceeds the obstacles height while remaining within the drones operational ceiling, and incorporate this into the vertical profile of the trajectory.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
- x: Target X coordinate in meters (required)
- y: Target Y coordinate in meters (required)
- z: Target Z coordinate (altitude) in meters (required)
Example: {{"drone_id": "drone-001", "x": 100.0, "y": 50.0, "z": 20.0}}
"""
import math
import heapq
# --- 内部几何算法类 (封装以保持主逻辑清晰) ---
class GeometryUtils:
@staticmethod
def dist_sq(p1, p2):
return (p1[0]-p2[0])**2 + (p1[1]-p2[1])**2
@staticmethod
def point_to_segment_dist(p, a, b):
"""计算点 p 到线段 ab 的最短距离"""
px, py = p[0], p[1]
ax, ay = a[0], a[1]
bx, by = b[0], b[1]
l2 = (ax - bx)**2 + (ay - by)**2
if l2 == 0: return math.hypot(px - ax, py - ay)
t = ((px - ax) * (bx - ax) + (py - ay) * (by - ay)) / l2
t = max(0, min(1, t))
proj_x = ax + t * (bx - ax)
proj_y = ay + t * (by - ay)
return math.hypot(px - proj_x, py - proj_y)
@staticmethod
def segments_intersect(a1, a2, b1, b2):
"""判断线段 a1-a2 与 b1-b2 是否相交"""
def ccw(A, B, C):
return (C[1]-A[1]) * (B[0]-A[0]) > (B[1]-A[1]) * (C[0]-A[0])
return ccw(a1, b1, b2) != ccw(a2, b1, b2) and ccw(a1, a2, b1) != ccw(a1, a2, b2)
@staticmethod
def is_point_in_polygon(p, vertices):
"""射线法判断点是否在多边形内"""
x, y = p[0], p[1]
inside = False
j = len(vertices) - 1
for i in range(len(vertices)):
xi, yi = vertices[i]['x'], vertices[i]['y']
xj, yj = vertices[j]['x'], vertices[j]['y']
intersect = ((yi > y) != (yj > y)) and \
(x < (xj - xi) * (y - yi) / (yj - yi + 1e-9) + xi)
if intersect:
inside = not inside
j = i
return inside
@staticmethod
def check_collision(p1, p2, obs, safety_buffer=2.0):
"""
检测线段 p1-p2 是否与障碍物 obs 碰撞。
返回: (Boolean 是否碰撞, Float 障碍物高度)
"""
otype = obs['type']
opos = obs['position']
ox, oy = opos['x'], opos['y']
obs_height = obs.get('height', 0)
# 1. 圆形/椭圆/点 (简化为圆)
if otype in ['circle', 'point', 'ellipse']:
if otype == 'ellipse':
r = max(obs.get('width', 0), obs.get('length', 0)) / 2.0
else:
r = obs.get('radius', 0)
limit = r + safety_buffer
dist = GeometryUtils.point_to_segment_dist((ox, oy), p1, p2)
if dist < limit:
return True, obs_height
# 2. 多边形
elif otype == 'polygon':
verts = obs['vertices']
if not verts: return False, 0
# A. 边对边相交检测
for i in range(len(verts)):
v1 = (verts[i]['x'], verts[i]['y'])
v2 = (verts[(i + 1) % len(verts)]['x'], verts[(i + 1) % len(verts)]['y'])
if GeometryUtils.segments_intersect(p1, p2, v1, v2):
return True, obs_height
# B. 包含检测 (起点或终点在多边形内)
# 注意:这里不加 safety_buffer 进行包含检测,因为如果点在内部必撞
# 若要处理 buffer需要做多边形 offset这里略过复杂操作依赖边相交检测
if GeometryUtils.is_point_in_polygon(p1, verts) or GeometryUtils.is_point_in_polygon(p2, verts):
return True, obs_height
return False, 0
# ---------------------------------------------------------
try:
# 1. 解析参数
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
tx, ty, tz = params.get('x'), params.get('y'), params.get('z')
if not drone_id or tx is None or ty is None or tz is None:
return "Error: drone_id, x, y, z are required"
direct_move_result = client.move_to(drone_id, tx, ty, tz)
if direct_move_result["status"] != "error":
return json.dumps(direct_move_result, indent=2)
# 2. 获取环境信息
status = client.get_drone_status(drone_id)
start_pos = status['position']
sx, sy, sz = start_pos['x'], start_pos['y'], start_pos['z']
drone_max_alt = status.get('max_altitude', 100.0)
all_obstacles = client.get_obstacles()
# 3. 障碍物分类
# mandatory_avoid: 必须绕路 (高度 >= 无人机极限 或 高度为0的禁飞区)
# fly_over_candidates: 可能可以飞跃 (高度 < 无人机极限)
mandatory_avoid = []
fly_over_candidates = []
for obs in all_obstacles:
h = obs.get('height', 0)
if h == 0 or h >= drone_max_alt:
mandatory_avoid.append(obs)
else:
fly_over_candidates.append(obs)
target_point = (tx, ty)
start_point = (sx, sy)
# 4. 2D 路径规划 (仅避开 mandatory_avoid)
# 4.1 生成节点
# 包括起点、终点
nodes = [start_point, target_point]
# 为必须绕行的障碍物生成关键点 (外扩 5m)
safety_margin = 5.0
for obs in mandatory_avoid:
opos = obs['position']
ox, oy = opos['x'], opos['y']
if obs['type'] == 'polygon':
# 简单策略:取多边形顶点并向外延伸
# 更稳健的方法:不做复杂几何外扩,直接取顶点,但在碰撞检测时留余量
# 这里为了路径稀疏性,取顶点 + 向量外扩
center_x = sum(v['x'] for v in obs['vertices']) / len(obs['vertices'])
center_y = sum(v['y'] for v in obs['vertices']) / len(obs['vertices'])
for v in obs['vertices']:
vx, vy = v['x'], v['y']
vec_len = math.hypot(vx - center_x, vy - center_y)
if vec_len > 0:
scale = (vec_len + safety_margin) / vec_len
nx = center_x + (vx - center_x) * scale
ny = center_y + (vy - center_y) * scale
nodes.append((nx, ny))
else:
# 圆/椭圆/点
r = obs.get('radius', 0)
if obs['type'] == 'ellipse':
r = max(obs.get('width', 0), obs.get('length', 0)) / 2.0
r += safety_margin
nodes.append((ox + r, oy))
nodes.append((ox - r, oy))
nodes.append((ox, oy + r))
nodes.append((ox, oy - r))
# 4.2 过滤非法节点 (落入其他障碍物内的节点)
valid_nodes = []
for node in nodes:
is_bad = False
for obs in mandatory_avoid:
# 这里借用 check_collision 检查点是否在障碍物内 (把线段设为点到点)
# 使用稍小的 buffer 确保节点不紧贴障碍物
collided, _ = GeometryUtils.check_collision(node, node, obs, safety_buffer=2.0)
if collided:
is_bad = True
break
if not is_bad:
valid_nodes.append(node)
# 确保起点和终点在 valid_nodes 中 (如果起点就在禁飞区,这里会抛出无解,符合逻辑)
# 为防止浮点误差导致起点被过滤,强制加回起点终点(如果它们真的在障碍物内,后面边的检测会挡住)
if start_point not in valid_nodes: valid_nodes.insert(0, start_point)
if target_point not in valid_nodes: valid_nodes.append(target_point)
# 重新映射索引
start_idx = valid_nodes.index(start_point)
target_idx = valid_nodes.index(target_point)
# 4.3 构建图 (Dijkstra)
# 只检测 mandatory_avoid
adj = {i: [] for i in range(len(valid_nodes))}
for i in range(len(valid_nodes)):
for j in range(i + 1, len(valid_nodes)):
u, v = valid_nodes[i], valid_nodes[j]
# 检测边 u-v 是否碰撞 mandatory_avoid
path_blocked = False
for obs in mandatory_avoid:
hit, _ = GeometryUtils.check_collision(u, v, obs, safety_buffer=2.0)
if hit:
path_blocked = True
break
if not path_blocked:
dist = math.hypot(u[0]-v[0], u[1]-v[1])
adj[i].append((j, dist))
adj[j].append((i, dist))
# 4.4 搜索最短 2D 路径
pq = [(0.0, start_idx, [valid_nodes[start_idx]])]
visited = set()
path_2d = []
while pq:
cost, u, path = heapq.heappop(pq)
if u == target_idx:
path_2d = path
break
if u in visited: continue
visited.add(u)
for v_idx, w in adj[u]:
if v_idx not in visited:
heapq.heappush(pq, (cost + w, v_idx, path + [valid_nodes[v_idx]]))
if not path_2d:
return json.dumps({"status": "Failed: No 2D path found. Target might be in restricted area."})
# 5. 路径高度计算 (Path Refinement)
# 我们现在有了一条避开了“不可飞越障碍物”的 2D 路径。
# 现在需要沿着这条路径,检查它是否穿过了“可飞越障碍物”,并计算所需高度。
# 基础安全高度:起点高度、目标高度、以及基本的最低飞行高度
safe_base_alt = max(sz, tz)
# 我们需要分段处理,因为路径不同路段可能经过不同高度的障碍物
# 简单策略:计算整条路径所需的全局最大安全高度 (Global Safe Altitude for this path)
# 优化策略:虽然可以做分段高度,但在实际无人机操作中,保持平稳的一个巡航高度通常更安全且节能
path_max_obs_height = 0.0
for i in range(len(path_2d) - 1):
p1 = path_2d[i]
p2 = path_2d[i+1]
# 检查此路段与所有 fly_over_candidates 的碰撞情况
for obs in fly_over_candidates:
hit, obs_h = GeometryUtils.check_collision(p1, p2, obs, safety_buffer=2.0)
if hit:
# 如果撞到了这个可飞越障碍物,我们需要比它高
path_max_obs_height = max(path_max_obs_height, obs_h)
# 最终决定的飞行高度 (加 2米 安全余量)
if path_max_obs_height > 0:
cruise_alt = path_max_obs_height + 2.0
else:
cruise_alt = safe_base_alt
# 确保不低于起点和终点,且不超过最大限高
cruise_alt = max(cruise_alt, sz, tz)
if cruise_alt > drone_max_alt:
return json.dumps({"status": "Failed: Required altitude exceeds drone capability. Please try finding a path step by step dynamically. Do not repeatedly call optimal_way_to."})
# 6. 构建最终 3D 航点
# 逻辑:
# 1. 如果需要爬升,先在当前位置 (sx, sy) 爬升到 cruise_alt
# 2. 平飞经过所有 2D 路径点 (高度 = cruise_alt)
# 3. 最后如果在目标点上方,下降/调整到 tz
waypoints = []
# 起点
current_waypoint = (sx, sy, sz)
waypoints.append(current_waypoint)
# 爬升阶段 (如果巡航高度高于当前高度)
if cruise_alt > sz + 0.5: # 0.5 作为浮点容差
waypoints.append((sx, sy, cruise_alt))
# 巡航阶段 (中间节点)
# 跳过 path_2d[0] 因为它是起点
for i in range(len(path_2d)):
node = path_2d[i]
waypoints.append((node[0], node[1], cruise_alt))
# 接近终点阶段
# 先平飞到终点上方
waypoints.append((tx, ty, cruise_alt))
# 如果巡航高度与目标高度不同,最后调整高度
if abs(cruise_alt - tz) > 0.5:
waypoints.append((tx, ty, tz))
# 7. 执行飞行
for wp in waypoints:
# 跳过与当前位置极近的点 (避免重复指令)
if wp == waypoints[0] and len(waypoints) > 1:
pass # 仅作为记录,不发送指令,或者如果这是唯一的点则发送
# 发送指令
# 注意:实际 API 调用中如果点就是当前位置可能需要跳过不管了懒得跳了多调用一次client.move_to()又不花钱)
waypoint_move_result = client.move_to(drone_id, wp[0], wp[1], wp[2])
if waypoint_move_result["status"] == "error":
print(f"Error moving to waypoint {wp}: {waypoint_move_result['message']}")
return f"Error moving to waypoint {wp}: {waypoint_move_result['message']}\nPlease try finding a path step by stepdynamically. Do not repeatedly call optimal_way_to."
return json.dumps({"status": "success", "path": waypoints, "message": waypoint_move_result["message"] + "You can call get_drone_status to check the drone's current status."})
except Exception as e:
return f"Error executing path finding: {str(e)}\nPlease try finding a path dynamically."
@tool
def auto_navigate_to(input_json: str) -> str:
"""
Plan an obstacle-avoiding path to the target position (x, y, z), automatically determining whether to detour or overfly.
Fixed: Corrected Ellipse dimension interpretation (width/length are radii, not diameters).
"""
import math
import heapq
import json
# --- 内部几何算法类 ---
class GeometryUtils:
@staticmethod
def dist_sq(p1, p2):
return (p1[0]-p2[0])**2 + (p1[1]-p2[1])**2
@staticmethod
def point_to_segment_dist(p, a, b):
"""计算点 p 到线段 ab 的最短距离"""
px, py = p[0], p[1]
ax, ay = a[0], a[1]
bx, by = b[0], b[1]
l2 = (ax - bx)**2 + (ay - by)**2
if l2 == 0: return math.hypot(px - ax, py - ay)
t = ((px - ax) * (bx - ax) + (py - ay) * (by - ay)) / l2
t = max(0, min(1, t))
proj_x = ax + t * (bx - ax)
proj_y = ay + t * (by - ay)
return math.hypot(px - proj_x, py - proj_y)
@staticmethod
def segments_intersect(a1, a2, b1, b2):
"""判断线段 a1-a2 与 b1-b2 是否相交"""
def ccw(A, B, C):
return (C[1]-A[1]) * (B[0]-A[0]) > (B[1]-A[1]) * (C[0]-A[0])
return ccw(a1, b1, b2) != ccw(a2, b1, b2) and ccw(a1, a2, b1) != ccw(a1, a2, b2)
@staticmethod
def is_point_in_polygon(p, vertices):
"""射线法判断点是否在多边形内"""
x, y = p[0], p[1]
inside = False
j = len(vertices) - 1
for i in range(len(vertices)):
xi, yi = vertices[i]['x'], vertices[i]['y']
xj, yj = vertices[j]['x'], vertices[j]['y']
intersect = ((yi > y) != (yj > y)) and \
(x < (xj - xi) * (y - yi) / (yj - yi + 1e-9) + xi)
if intersect:
inside = not inside
j = i
return inside
@staticmethod
def is_point_in_ellipse(p, center, width, length):
"""
判断点是否在椭圆内。
公式: (x-cx)^2/w^2 + (y-cy)^2/l^2 <= 1
注意:此处 width/length 为半轴长(Radius)
"""
dx = p[0] - center['x']
dy = p[1] - center['y']
return (dx**2 / width**2) + (dy**2 / length**2) <= 1.0
@staticmethod
def check_collision(p1, p2, obs, safety_buffer=2.0):
"""
检测线段 p1-p2 是否与障碍物 obs 碰撞。
返回: (Boolean 是否碰撞, Float 障碍物高度)
"""
otype = obs['type']
opos = obs['position']
ox, oy = opos['x'], opos['y']
obs_height = obs.get('height', 0)
# 1. 圆形/椭圆/点 (统一使用外接圆做保守检测,保证安全)
if otype in ['circle', 'point', 'ellipse']:
if otype == 'ellipse':
# [修复]: width 和 length 是半轴长不是直径不需要除以2
# 使用最大半轴长作为外接圆半径
r = max(obs.get('width', 0), obs.get('length', 0))
else:
r = obs.get('radius', 0)
limit = r + safety_buffer
dist = GeometryUtils.point_to_segment_dist((ox, oy), p1, p2)
if dist < limit:
return True, obs_height
# 2. 多边形
elif otype == 'polygon':
verts = obs['vertices']
if not verts: return False, 0
# A. 边对边相交检测
for i in range(len(verts)):
v1 = (verts[i]['x'], verts[i]['y'])
v2 = (verts[(i + 1) % len(verts)]['x'], verts[(i + 1) % len(verts)]['y'])
if GeometryUtils.segments_intersect(p1, p2, v1, v2):
return True, obs_height
# B. 包含检测
if GeometryUtils.is_point_in_polygon(p1, verts) or GeometryUtils.is_point_in_polygon(p2, verts):
return True, obs_height
return False, 0
# ---------------------------------------------------------
try:
# 1. 解析参数
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
tx, ty, tz = params.get('x'), params.get('y'), params.get('z')
if not drone_id or tx is None or ty is None or tz is None:
return "Error: drone_id, x, y, z are required"
# 2. 获取环境信息
status = client.get_drone_status(drone_id)
start_pos = status['position']
sx, sy, sz = start_pos['x'], start_pos['y'], start_pos['z']
drone_max_alt = status.get('max_altitude', 100.0)
all_obstacles = client.get_obstacles()
# 3. 障碍物分类
mandatory_avoid = []
fly_over_candidates = []
for obs in all_obstacles:
h = obs.get('height', 0)
if h == 0 or h >= drone_max_alt:
mandatory_avoid.append(obs)
else:
fly_over_candidates.append(obs)
target_point = (tx, ty)
start_point = (sx, sy)
# 4. 2D 路径规划 (仅避开 mandatory_avoid)
# 4.1 生成节点
nodes = [start_point, target_point]
safety_margin = 5.0
for obs in mandatory_avoid:
opos = obs['position']
ox, oy = opos['x'], opos['y']
if obs['type'] == 'polygon':
center_x = sum(v['x'] for v in obs['vertices']) / len(obs['vertices'])
center_y = sum(v['y'] for v in obs['vertices']) / len(obs['vertices'])
for v in obs['vertices']:
vx, vy = v['x'], v['y']
vec_len = math.hypot(vx - center_x, vy - center_y)
if vec_len > 0:
scale = (vec_len + safety_margin) / vec_len
nx = center_x + (vx - center_x) * scale
ny = center_y + (vy - center_y) * scale
nodes.append((nx, ny))
else:
# 圆/椭圆/点
r = obs.get('radius', 0)
if obs['type'] == 'ellipse':
# [修复]: 尺寸不需要除以2
r = max(obs.get('width', 0), obs.get('length', 0))
gen_r = r + safety_margin
# 动态步长:半径越大,采样点越多,防止割圆效应
num_steps = max(8, int(r / 3.0))
for i in range(num_steps):
angle = i * (2 * math.pi / num_steps)
nodes.append((ox + gen_r * math.cos(angle), oy + gen_r * math.sin(angle)))
# 4.2 过滤非法节点
valid_nodes = []
for node in nodes:
is_bad = False
for obs in mandatory_avoid:
# 1. 基础碰撞检测 (外接圆/多边形)
collided, _ = GeometryUtils.check_collision(node, node, obs, safety_buffer=0.5)
if collided:
is_bad = True
break
# 2. 针对椭圆增加精确检测 (防止点落在外接圆内但在椭圆外,或者是很扁的椭圆导致漏判)
# 主要是为了双重保险
if obs['type'] == 'ellipse':
w = obs.get('width', 0)
l = obs.get('length', 0)
# 增加一点 buffer 进行点检测
if GeometryUtils.is_point_in_ellipse(node, obs['position'], w + 0.5, l + 0.5):
is_bad = True
break
if not is_bad:
valid_nodes.append(node)
if start_point not in valid_nodes: valid_nodes.insert(0, start_point)
if target_point not in valid_nodes: valid_nodes.append(target_point)
start_idx = valid_nodes.index(start_point)
target_idx = valid_nodes.index(target_point)
# 4.3 构建图
adj = {i: [] for i in range(len(valid_nodes))}
for i in range(len(valid_nodes)):
for j in range(i + 1, len(valid_nodes)):
u, v = valid_nodes[i], valid_nodes[j]
path_blocked = False
for obs in mandatory_avoid:
hit, _ = GeometryUtils.check_collision(u, v, obs, safety_buffer=2.0)
if hit:
path_blocked = True
break
if not path_blocked:
dist = math.hypot(u[0]-v[0], u[1]-v[1])
adj[i].append((j, dist))
adj[j].append((i, dist))
# 4.4 Dijkstra 搜索
pq = [(0.0, start_idx, [valid_nodes[start_idx]])]
visited = set()
path_2d = []
while pq:
cost, u, path = heapq.heappop(pq)
if u == target_idx:
path_2d = path
break
if u in visited: continue
visited.add(u)
for v_idx, w in adj[u]:
if v_idx not in visited:
heapq.heappush(pq, (cost + w, v_idx, path + [valid_nodes[v_idx]]))
if not path_2d:
return json.dumps({"status": "Failed: No 2D path found."})
# 5. 高度计算
safe_base_alt = max(sz, tz)
path_max_obs_height = 0.0
for i in range(len(path_2d) - 1):
p1 = path_2d[i]
p2 = path_2d[i+1]
for obs in fly_over_candidates:
hit, obs_h = GeometryUtils.check_collision(p1, p2, obs, safety_buffer=2.0)
if hit:
path_max_obs_height = max(path_max_obs_height, obs_h)
if path_max_obs_height > 0:
cruise_alt = path_max_obs_height + 2.0
else:
cruise_alt = safe_base_alt
cruise_alt = max(cruise_alt, sz, tz)
if cruise_alt > drone_max_alt:
return json.dumps({"status": "Failed: Required altitude exceeds drone capability."})
# 6. 生成航点
waypoints = []
waypoints.append((sx, sy, sz))
if cruise_alt > sz + 0.5:
waypoints.append((sx, sy, cruise_alt))
for i in range(len(path_2d)):
node = path_2d[i]
if i == 0 and math.hypot(node[0]-sx, node[1]-sy) < 0.1:
continue
waypoints.append((node[0], node[1], cruise_alt))
last_wp = waypoints[-1]
if abs(last_wp[2] - tz) > 0.5 or math.hypot(last_wp[0]-tx, last_wp[1]-ty) > 0.1:
waypoints.append((tx, ty, tz))
# 7. 执行
final_msg = "Success"
for wp in waypoints:
# if wp == waypoints[0] and len(waypoints) > 1:
# continue
waypoint_move_result = client.move_to(drone_id, wp[0], wp[1], wp[2])
if waypoint_move_result["status"] == "error":
# 返回详细错误以便调试
return f"Error moving to waypoint {wp}: {waypoint_move_result['message']}"
final_msg = waypoint_move_result.get("message", "Success")
return json.dumps({"status": "success", "path": waypoints, "message": final_msg})
except Exception as e:
return f"Error executing path finding: {str(e)}"
# Return all tools
return [
list_drones,
get_drone_status,
get_session_info,
# get_session_data,
get_task_progress,
get_weather,
# get_targets,
get_obstacles,
get_nearby_entities,
take_off,
land,
move_to,
auto_navigate_to,
move_towards,
change_altitude,
hover,
rotate,
return_home,
set_home,
calibrate,
take_photo,
send_message,
broadcast,
charge,
get_nearest_waypoint,
get_all_waypoints,
get_targets
]