diff --git a/uav_api_client.py b/uav_api_client.py index c437d15..324f7f9 100644 --- a/uav_api_client.py +++ b/uav_api_client.py @@ -319,6 +319,10 @@ class UAVAPIClient: circle = self._request('GET', '/targets/type/circle') polygen = self._request('GET', '/targets/type/polygon') return fixed + moving + waypoint + circle + polygen + + def get_target_status(self, target_id: str) -> Dict[str, Any]: + """Get information about a specific target""" + return self._request('GET', f'/targets/{target_id}') def get_waypoints(self) -> List[Dict[str, Any]]: """Get all charging station waypoints""" diff --git a/uav_langchain_tools.py b/uav_langchain_tools.py index 5499042..a2a3592 100644 --- a/uav_langchain_tools.py +++ b/uav_langchain_tools.py @@ -7,6 +7,11 @@ from langchain.tools import tool from uav_api_client import UAVAPIClient import json +class ToolStates: + def __init__(self): + self.explored_count = 0 + +tool_states = ToolStates() def create_uav_tools(client: UAVAPIClient) -> list: """ @@ -952,7 +957,7 @@ def create_uav_tools(client: UAVAPIClient) -> list: return f"Error executing path finding: {str(e)}\nPlease try finding a path dynamically." @tool - def auto_navigate_to(input_json: str) -> str: + def auto_navigate_to3(input_json: str) -> str: """ Plan an obstacle-avoiding path to the target position (x, y, z), automatically determining whether to detour or overfly. @@ -1255,6 +1260,499 @@ def create_uav_tools(client: UAVAPIClient) -> list: except Exception as e: return f"Error executing path finding: {str(e)}" + + @tool + def auto_navigate_to(input_json: str) -> str: + """ + Plan an obstacle-avoiding path using analytic geometry for precise collision detection. + + Input should be a JSON string with: + - drone_id: The ID of the drone (required) + - x: Target X coordinate in meters (required) + - y: Target Y coordinate in meters (required) + - z: Target Z coordinate (altitude) in meters (required) + + Example: {{"drone_id": "drone-001", "x": 100.0, "y": 50.0, "z": 20.0}} + """ + import math + import heapq + import json + + # --- 内部几何算法类 --- + class GeometryUtils: + @staticmethod + def point_to_segment_dist_sq(p, a, b): + """ + 计算点 p 到线段 ab 的最短距离的平方 (避免开方,提高性能)。 + 返回: 距离的平方 + """ + px, py = p[0], p[1] + ax, ay = a[0], a[1] + bx, by = b[0], b[1] + + l2 = (ax - bx)**2 + (ay - by)**2 + if l2 == 0: return (px - ax)**2 + (py - ay)**2 + + t = ((px - ax) * (bx - ax) + (py - ay) * (by - ay)) / l2 + t = max(0, min(1, t)) + + proj_x = ax + t * (bx - ax) + proj_y = ay + t * (by - ay) + return (px - proj_x)**2 + (py - proj_y)**2 + + @staticmethod + def segments_intersect(a1, a2, b1, b2): + """判断线段 a1-a2 与 b1-b2 是否相交""" + def ccw(A, B, C): + return (C[1]-A[1]) * (B[0]-A[0]) > (B[1]-A[1]) * (C[0]-A[0]) + return ccw(a1, b1, b2) != ccw(a2, b1, b2) and ccw(a1, a2, b1) != ccw(a1, a2, b2) + + @staticmethod + def is_point_in_polygon(p, vertices): + x, y = p[0], p[1] + inside = False + j = len(vertices) - 1 + for i in range(len(vertices)): + xi, yi = vertices[i]['x'], vertices[i]['y'] + xj, yj = vertices[j]['x'], vertices[j]['y'] + intersect = ((yi > y) != (yj > y)) and \ + (x < (xj - xi) * (y - yi) / (yj - yi + 1e-9) + xi) + if intersect: + inside = not inside + j = i + return inside + + @staticmethod + def check_collision(p1, p2, obs, safety_buffer=2.0): + """ + 检测线段 p1-p2 是否与障碍物 obs 碰撞。 + 返回: (Boolean 是否碰撞, Float 障碍物高度) + """ + otype = obs['type'] + opos = obs['position'] + ox, oy = opos['x'], opos['y'] + obs_height = obs.get('height', 0) + + # 1. 圆形/点 + if otype in ['circle', 'point']: + r = obs.get('radius', 0) + limit = r + safety_buffer + # 使用平方距离比较,避免 sqrt + dist_sq = GeometryUtils.point_to_segment_dist_sq((ox, oy), p1, p2) + if dist_sq < limit**2: + return True, obs_height + + # 2. 椭圆 (使用坐标变换法 - Analytic Solution) + elif otype == 'ellipse': + w = obs.get('width', 0) + l = obs.get('length', 0) + + # 为了处理 Buffer,我们将 Buffer 加到轴长上 + # 这是一种工程近似:虽然真正的 Offset Curve 不是椭圆, + # 但 (w+buffer, l+buffer) 的椭圆是其包围盒,且计算非常快。 + semi_axis_x = w + safety_buffer + semi_axis_y = l + safety_buffer + + # 坐标变换:将世界坐标系的点变换到“单位圆空间” + # 1. 平移:以椭圆中心为原点 + # 2. 缩放:X 除以长轴,Y 除以短轴 + def to_unit_space(point): + dx = point[0] - ox + dy = point[1] - oy + return (dx / semi_axis_x, dy / semi_axis_y) + + u_p1 = to_unit_space(p1) + u_p2 = to_unit_space(p2) + + # 现在问题变成了:线段 u_p1 -> u_p2 是否与 单位圆 (Radius=1) 相交 + # 即:原点 (0,0) 到线段的距离是否 < 1.0 + dist_sq_in_unit_space = GeometryUtils.point_to_segment_dist_sq((0,0), u_p1, u_p2) + + if dist_sq_in_unit_space < 1.0: # 1.0^2 = 1.0 + return True, obs_height + + # 3. 多边形 + elif otype == 'polygon': + verts = obs['vertices'] + if not verts: return False, 0 + for i in range(len(verts)): + v1 = (verts[i]['x'], verts[i]['y']) + v2 = (verts[(i + 1) % len(verts)]['x'], verts[(i + 1) % len(verts)]['y']) + if GeometryUtils.segments_intersect(p1, p2, v1, v2): + return True, obs_height + if GeometryUtils.is_point_in_polygon(p1, verts) or GeometryUtils.is_point_in_polygon(p2, verts): + return True, obs_height + + return False, 0 + + # --------------------------------------------------------- + + try: + # 1. 解析参数 + params = json.loads(input_json) if isinstance(input_json, str) else input_json + drone_id = params.get('drone_id') + tx, ty, tz = params.get('x'), params.get('y'), params.get('z') + + if not drone_id or tx is None or ty is None or tz is None: + return "Error: drone_id, x, y, z are required" + + # 2. 获取状态 + status = client.get_drone_status(drone_id) + start_pos = status['position'] + sx, sy, sz = start_pos['x'], start_pos['y'], start_pos['z'] + drone_max_alt = status.get('max_altitude', 100.0) + all_obstacles = client.get_obstacles() + + # 3. 分类 + mandatory_avoid = [] + fly_over_candidates = [] + for obs in all_obstacles: + h = obs.get('height', 0) + if h == 0 or h >= drone_max_alt: + mandatory_avoid.append(obs) + else: + fly_over_candidates.append(obs) + + target_point = (tx, ty) + start_point = (sx, sy) + + # 4. 2D 路径规划 + + # 4.1 生成节点 + nodes = [start_point, target_point] + safety_margin = 5.0 + + for obs in mandatory_avoid: + opos = obs['position'] + ox, oy = opos['x'], opos['y'] + + if obs['type'] == 'polygon': + # 多边形节点策略 (保持原样) + center_x = sum(v['x'] for v in obs['vertices']) / len(obs['vertices']) + center_y = sum(v['y'] for v in obs['vertices']) / len(obs['vertices']) + for v in obs['vertices']: + vx, vy = v['x'], v['y'] + vec_len = math.hypot(vx - center_x, vy - center_y) + if vec_len > 0: + scale = (vec_len + safety_margin) / vec_len + nx = center_x + (vx - center_x) * scale + ny = center_y + (vy - center_y) * scale + nodes.append((nx, ny)) + + elif obs['type'] == 'ellipse': + # 椭圆节点生成:依然使用参数方程,因为要生成具体的绕行点坐标 + w = obs.get('width', 0) + l = obs.get('length', 0) + gen_w = w + safety_margin + gen_l = l + safety_margin + num_steps = max(8, int(max(w, l) / 3.0)) + for i in range(num_steps): + angle = i * (2 * math.pi / num_steps) + nx = ox + gen_w * math.cos(angle) + ny = oy + gen_l * math.sin(angle) + nodes.append((nx, ny)) + + elif obs['type'] in ['circle', 'point']: + r = obs.get('radius', 0) + gen_r = r + safety_margin + num_steps = max(8, int(r / 3.0)) + for i in range(num_steps): + angle = i * (2 * math.pi / num_steps) + nodes.append((ox + gen_r * math.cos(angle), oy + gen_r * math.sin(angle))) + + # 4.2 过滤非法节点 + valid_nodes = [] + for node in nodes: + is_bad = False + for obs in mandatory_avoid: + # 使用更新后的 check_collision (数学解) 进行点包含测试 + # 线段长度为0即为点 + collided, _ = GeometryUtils.check_collision(node, node, obs, safety_buffer=0.5) + if collided: + is_bad = True + break + if not is_bad: + valid_nodes.append(node) + + if start_point not in valid_nodes: valid_nodes.insert(0, start_point) + if target_point not in valid_nodes: valid_nodes.append(target_point) + start_idx = valid_nodes.index(start_point) + target_idx = valid_nodes.index(target_point) + + # 4.3 构建图 + adj = {i: [] for i in range(len(valid_nodes))} + for i in range(len(valid_nodes)): + for j in range(i + 1, len(valid_nodes)): + u, v = valid_nodes[i], valid_nodes[j] + + path_blocked = False + for obs in mandatory_avoid: + # 使用更新后的 check_collision (数学解) 进行线段检测 + hit, _ = GeometryUtils.check_collision(u, v, obs, safety_buffer=2.0) + if hit: + path_blocked = True + break + + if not path_blocked: + dist = math.hypot(u[0]-v[0], u[1]-v[1]) + adj[i].append((j, dist)) + adj[j].append((i, dist)) + + # 4.4 Dijkstra + pq = [(0.0, start_idx, [valid_nodes[start_idx]])] + visited = set() + path_2d = [] + + while pq: + cost, u, path = heapq.heappop(pq) + if u == target_idx: + path_2d = path + break + if u in visited: continue + visited.add(u) + + for v_idx, w in adj[u]: + if v_idx not in visited: + heapq.heappush(pq, (cost + w, v_idx, path + [valid_nodes[v_idx]])) + + if not path_2d: + return json.dumps({"status": "Failed: No 2D path found."}) + + # 5. 高度计算 + safe_base_alt = max(sz, tz) + path_max_obs_height = 0.0 + for i in range(len(path_2d) - 1): + p1 = path_2d[i] + p2 = path_2d[i+1] + for obs in fly_over_candidates: + hit, obs_h = GeometryUtils.check_collision(p1, p2, obs, safety_buffer=2.0) + if hit: + path_max_obs_height = max(path_max_obs_height, obs_h) + + if path_max_obs_height > 0: + cruise_alt = path_max_obs_height + 2.0 + else: + cruise_alt = safe_base_alt + + cruise_alt = max(cruise_alt, sz, tz) + if cruise_alt > drone_max_alt: + return json.dumps({"status": "Failed: Required altitude exceeds drone capability."}) + + # 6. 生成航点 + waypoints = [] + waypoints.append((sx, sy, sz)) + if cruise_alt > sz + 0.5: + waypoints.append((sx, sy, cruise_alt)) + for i in range(len(path_2d)): + node = path_2d[i] + if i == 0 and math.hypot(node[0]-sx, node[1]-sy) < 0.1: + continue + waypoints.append((node[0], node[1], cruise_alt)) + last_wp = waypoints[-1] + if abs(last_wp[2] - tz) > 0.5 or math.hypot(last_wp[0]-tx, last_wp[1]-ty) > 0.1: + waypoints.append((tx, ty, tz)) + + # 7. 执行 + final_msg = "Success" + for wp in waypoints: + if wp == waypoints[0] and len(waypoints) > 1: + continue + waypoint_move_result = client.move_to(drone_id, wp[0], wp[1], wp[2]) + if waypoint_move_result["status"] == "error": + return f"Error moving to waypoint {wp}: {waypoint_move_result['message']}" + final_msg = waypoint_move_result.get("message", "Success") + + return json.dumps({"status": "success", "path": waypoints, "message": final_msg}) + + except Exception as e: + return f"Error executing path finding: {str(e)}" + + @tool + def auto_explore(input_json: str) -> str: + """ + Automatically move the drone to achieve a specified coverage ratio with respect to a given target. + + Input should be a JSON string with: + - drone_id: The ID of the drone (required) + - target_id: The ID of the target (required) + - coverage: Target coverage ratio (required) + + Example: {{"drone_id": "drone-001", "x": 100.0, "y": 50.0, "z": 20.0}} + """ + import json + import math + + # ================= 1. 内部几何判定函数 ================= + def check_drone_in_target(drone_id: str, target_id: str): + # 获取数据 + drone = client.get_drone_status(drone_id) + target = client.get_target_status(target_id) + + # 提取无人机坐标 + d_pos = drone['position'] + d_x, d_y, d_z = d_pos['x'], d_pos['y'], d_pos['z'] + + # 提取目标坐标信息 + t_pos = target['position'] + t_z = t_pos['z'] + + # --- 判定条件 1: Z 坐标相同 ---(无需判断) + # if not math.isclose(d_z, t_z, abs_tol=0.1): + # return False, drone, target # 返回数据以便复用 + + # --- 判定条件 2: X、Y 坐标在平面内 --- + t_type = target['type'] + inside = False + + # 情况 A: 多边形 (Polygon) + if t_type == 'polygon': + vertices = target.get('vertices', []) + if not vertices: + return False, drone, target + + # 射线法 + j = len(vertices) - 1 + for i in range(len(vertices)): + xi, yi = vertices[i]['x'], vertices[i]['y'] + xj, yj = vertices[j]['x'], vertices[j]['y'] + + intersect = ((yi > d_y) != (yj > d_y)) and \ + (d_x < (xj - xi) * (d_y - yi) / (yj - yi + 1e-9) + xi) + if intersect: + inside = not inside + j = i + + # 情况 B: 圆形区域 + elif t_type in ['circle', 'waypoint', 'fixed']: + t_x, t_y = t_pos['x'], t_pos['y'] + radius = target.get('radius', 0.0) + distance = math.sqrt((d_x - t_x)**2 + (d_y - t_y)**2) + inside = distance <= radius + + return inside, drone, target + + # ================= 2. 辅助函数:点是否在目标内 ================= + def is_point_in_target(x, y, target_data): + t_type = target_data['type'] + if t_type == 'polygon': + vertices = target_data.get('vertices', []) + inside = False + j = len(vertices) - 1 + for i in range(len(vertices)): + xi, yi = vertices[i]['x'], vertices[i]['y'] + xj, yj = vertices[j]['x'], vertices[j]['y'] + intersect = ((yi > y) != (yj > y)) and \ + (x < (xj - xi) * (y - yi) / (yj - yi + 1e-9) + xi) + if intersect: + inside = not inside + j = i + return inside + else: # Circle based + t_x, t_y = target_data['position']['x'], target_data['position']['y'] + radius = target_data.get('radius', 0.0) + return math.sqrt((x - t_x)**2 + (y - t_y)**2) <= radius + + # ================= 3. 主逻辑 ================= + try: + data = json.loads(input_json) + drone_id = data['drone_id'] + # 注意:这里我们假设输入包含 target_id,因为单纯的 x,y,z 无法描述多边形形状 + # 如果必须使用 x,y,z 寻找 target,则需要额外的逻辑去匹配 target_id + target_id = data.get('target_id', 'unknown_target') + required_coverage = data.get('coverage', 0.95) + except Exception as e: + return f"Error parsing input: {str(e)}" + + # 3.1 初始位置检查 + is_inside, drone_data, target_data = check_drone_in_target(drone_id, target_id) + if not is_inside: + return f"Error: Drone {drone_id} is not inside target {target_id}. Please move inside first." + + # 3.2 路径规划准备 + task_radius = drone_data.get('task_radius', 10.0) + current_z = drone_data['position']['z'] + + # 计算 Bounding Box (边界框) + if target_data['type'] == 'polygon': + vx = [v['x'] for v in target_data['vertices']] + vy = [v['y'] for v in target_data['vertices']] + min_x, max_x = min(vx), max(vx) + min_y, max_y = min(vy), max(vy) + else: + tx, ty = target_data['position']['x'], target_data['position']['y'] + r = target_data['radius'] + min_x, max_x = tx - r, tx + r + min_y, max_y = ty - r, ty + r + + # 3.3 网格化路径生成 (Grid Decomposition) + # 步长设定:为了保证覆盖率,步长通常设为 task_radius 的 √2 倍或更小 + # 这里设为 task_radius 确保每个网格点代表的圆都有重叠,保证无缝隙 + step_size = task_radius * 1.414 + + valid_waypoints = [] # 存储所有位于目标内部的网格点 + + # 扫描 Bounding Box + x_cursor = min_x + while x_cursor <= max_x: + y_cursor = min_y + col_points = [] + while y_cursor <= max_y: + # 只有当网格点在目标几何体内时,才加入路径 + if is_point_in_target(x_cursor, y_cursor, target_data): + col_points.append({'x': x_cursor, 'y': y_cursor}) + y_cursor += step_size + + # 蛇形排序 (Boustrophedon): 偶数列正向,奇数列反向 + # 这样可以最小化无人机换列时的飞行距离 + if col_points: + # 根据当前的列数决定方向,这里用 valid_waypoints 已有长度估算列是不准的 + # 简单做法:利用 x_cursor 的归一化索引,或者直接交替 append + pass # 后续统一处理 + + valid_waypoints.append(col_points) + x_cursor += step_size + + # 展平并执行蛇形排序 + final_path = [] + for i, col in enumerate(valid_waypoints): + if i % 2 == 1: + final_path.extend(reversed(col)) + else: + final_path.extend(col) + + total_points = len(final_path) + if total_points == 0: + return "Error: Target area is too small or invalid geometry found." + + # 3.4 执行探索 + + for idx, wp in enumerate(final_path): + if idx < tool_states.explored_count: + continue + # 移动无人机 + client.move_to(drone_id, wp['x'], wp['y'], current_z) + + tool_states.explored_count += 1 + current_coverage = tool_states.explored_count / total_points + + # 检查是否达标 + if current_coverage >= required_coverage: + tool_states.explored_count = 0 + return f"Success: Target explored with coverage {current_coverage:.2%} (Visited {tool_states.explored_count}/{total_points} grid points)" + + return f"Finished path. Final coverage: {current_coverage:.2%}" + + + # drone = client.get_drone_status(drone_id) + # {"id":"c1a5bf4b","name":"Drone 1","model":"Model-A","status":"hovering","position":{"x":613.0,"y":437.0,"z":24.0},"heading":90.0,"speed":0.0,"battery_level":33.48548886660462,"max_speed":16.0,"max_altitude":362.0,"battery_capacity":93.4,"perceived_radius":100.0,"task_radius":10.0,"home_position":{"x":377.0,"y":268.0,"z":0.0},"created_at":1766328129.6861398,"last_updated":1769226225.5813532} + # target = client.get_target_status(target_id) + # {"id":"1122ecf0","name":"Circle Target 1","type":"circle","position":{"x":463.0,"y":312.0,"z":0.0},"description":"","radius":200.0,"created_at":1766328129.676041,"last_updated":1769224819.8737159,"velocity":null,"moving_path":null,"moving_duration":null,"current_path_index":null,"path_direction":null,"time_in_direction":null,"calculated_speed":null,"charge_amount":null,"is_reached":true,"reached_by":["c1a5bf4b"]} + # {"id":"d6816dd4","name":"Polygon Target 1","type":"polygon","position":{"x":911.0,"y":552.0,"z":0.0},"description":"","radius":1.0,"created_at":1766328129.677602,"last_updated":1769225798.403095,"velocity":null,"moving_path":null,"moving_duration":null,"current_path_index":null,"path_direction":null,"time_in_direction":null,"calculated_speed":null,"charge_amount":null,"is_reached":true,"reached_by":["c1a5bf4b"],"vertices":[{"x":831.08,"y":471.74,"z":0.0},{"x":991.08,"y":471.74,"z":0.0},{"x":991.08,"y":631.74,"z":0.0},{"x":831.08,"y":631.74,"z":0.0}]} + # {"id":"81f45d4c","name":"BS_01 PLA West","type":"waypoint","position":{"x":1065.0,"y":1200.0,"z":0.0},"description":"","radius":10.0,"created_at":1767770673.5973494,"last_updated":1768203787.0783787,"velocity":null,"moving_path":null,"moving_duration":null,"current_path_index":null,"path_direction":null,"time_in_direction":null,"calculated_speed":null,"charge_amount":30.0,"is_reached":true,"reached_by":["21c6b4d7","3f987e09","b61fcff5"]} + # {"id":"05c89883","name":"Hong Kong International Airport Restricted Zone","type":"fixed","position":{"x":1900.0,"y":200.0,"z":0.0},"description":"No-fly zone due to airport operations. Drones must stay clear.","radius":150.0,"created_at":1767767958.9605737,"last_updated":1768204251.032104,"velocity":null,"moving_path":null,"moving_duration":null,"current_path_index":null,"path_direction":null,"time_in_direction":null,"calculated_speed":null,"charge_amount":null,"is_reached":true,"reached_by":["21c6b4d7","3f987e09","b61fcff5"]} + # return drone['position'] == target['position'] + + # Return all tools return [ list_drones, @@ -1263,6 +1761,7 @@ def create_uav_tools(client: UAVAPIClient) -> list: # get_session_data, get_task_progress, get_weather, + auto_explore, # get_targets, get_obstacles, get_nearby_entities,