llm version upgraded to 0905

uav api bug fixed in updating environments
This commit is contained in:
abnerhexu
2026-01-26 22:11:27 +08:00
parent 6c29be2fcb
commit e32c782909
3 changed files with 87 additions and 32 deletions

View File

@@ -42,9 +42,9 @@
"requires_api_key": true,
"api_key": "sk-2gCgINOEErD1ctdxIB7ALIPnHboZPrQRj1hvVJtEydT1JbXv",
"encoding": "utf-8",
"default_model": "kimi-k2-0711-preview",
"default_model": "kimi-k2-0905-preview",
"default_models": [
"kimi-k2-0711-preview"
"kimi-k2-0905-preview"
],
"allow_endpoint_edit": true,
"allow_api_toggle": true,

View File

@@ -311,14 +311,14 @@ class UAVAPIClient:
"""Get current weather conditions"""
return self._request('GET', '/environments/current')
def get_targets(self) -> List[Dict[str, Any]]:
"""Get all targets in the session"""
fixed = self._request('GET', '/targets/type/fixed')
moving = self._request('GET', '/targets/type/moving')
waypoint = self._request('GET', '/targets/type/waypoint')
circle = self._request('GET', '/targets/type/circle')
polygen = self._request('GET', '/targets/type/polygon')
return fixed + moving + waypoint + circle + polygen
# def get_targets(self) -> List[Dict[str, Any]]:
# """Get all targets in the session"""
# fixed = self._request('GET', '/targets/type/fixed')
# moving = self._request('GET', '/targets/type/moving')
# waypoint = self._request('GET', '/targets/type/waypoint')
# circle = self._request('GET', '/targets/type/circle')
# polygen = self._request('GET', '/targets/type/polygon')
# return fixed + moving + waypoint + circle + polygen
def get_target_status(self, target_id: str) -> Dict[str, Any]:
"""Get information about a specific target"""
@@ -333,13 +333,13 @@ class UAVAPIClient:
return self._request('GET', '/targets/waypoints/nearest',
json={'x': x, 'y': y, 'z': z})
def get_obstacles(self) -> List[Dict[str, Any]]:
"""Get all obstacles in the session"""
point = self._request('GET', '/obstacles/type/point')
circle = self._request('GET', '/obstacles/type/circle')
polygon = self._request('GET', '/obstacles/type/polygon')
ellipse = self._request('GET', '/obstacles/type/ellipse')
return point + circle + polygon + ellipse
# def get_obstacles(self) -> List[Dict[str, Any]]:
# """Get all obstacles in the session"""
# point = self._request('GET', '/obstacles/type/point')
# circle = self._request('GET', '/obstacles/type/circle')
# polygon = self._request('GET', '/obstacles/type/polygon')
# ellipse = self._request('GET', '/obstacles/type/ellipse')
# return point + circle + polygon + ellipse
def get_nearby_entities(self, drone_id: str) -> Dict[str, Any]:
"""Get entities near a drone (within perceived radius)"""

View File

@@ -260,6 +260,30 @@ def create_uav_tools(client: UAVAPIClient) -> list:
return json.dumps(drones, indent=2)
except Exception as e:
return f"Error listing drones: {str(e)}"
def env_perception() -> bool:
global obstacles_detected
drones_data = client.list_drones()
drone_ids = [drone["id"] for drone in drones_data]
flag = False
for drone_id in drone_ids:
nearby = client.get_nearby_entities(drone_id)
# Update explored targets
_copied_targets_expolred = [x.id for x in targets_expolred]
if 'targets' in nearby:
for target_data in nearby['targets']:
targets_expolred.add(TargetInfo(target_data))
if target_data['id'] not in _copied_targets_expolred:
flag = True
# Update detected obstacles
_copied_obstacles_detected = [x.id for x in obstacles_detected]
if 'obstacles' in nearby:
for obstacle_data in nearby['obstacles']:
obstacles_detected.add(ObstacleInfo(obstacle_data))
if obstacle_data['id'] not in _copied_obstacles_detected:
flag = True
return flag
@tool
def get_session_info() -> str:
@@ -269,6 +293,9 @@ def create_uav_tools(client: UAVAPIClient) -> list:
No input required."""
try:
session = client.get_current_session()
env_changed = env_perception()
if env_changed:
session["tips"] = "Target/Obstacle status changed. Use `get_obstacles` or `get_targets` to get new information."
return json.dumps(session, indent=2)
except Exception as e:
return f"Error getting session info: {str(e)}"
@@ -327,10 +354,11 @@ def create_uav_tools(client: UAVAPIClient) -> list:
This returns targets from the agent's memory.
No input required."""
global targets_expolred
try:
global targets_expolred
env_changed = env_perception()
targets_list = [target.to_dict() for target in targets_expolred]
print(len(targets_list))
print([tgt.name for tgt in targets_expolred])
return json.dumps(targets_list, indent=2)
except Exception as e:
return f"Error getting explored targets: {str(e)}"
@@ -365,10 +393,11 @@ def create_uav_tools(client: UAVAPIClient) -> list:
This returns obstacles from the agent's memory.
No input required."""
global obstacles_detected
try:
global obstacles_detected
env_changed = env_perception()
obstacles_list = [obstacle.to_dict() for obstacle in obstacles_detected]
print(len(obstacles_list))
print([obs.name for obs in obstacles_detected])
return json.dumps(obstacles_list, indent=2)
except Exception as e:
return f"Error getting detected obstacles: {str(e)}"
@@ -400,8 +429,8 @@ def create_uav_tools(client: UAVAPIClient) -> list:
def get_nearby_entities(input_json: str) -> str:
"""Get drones, targets, and obstacles near a specific drone (within its perception radius).
This also updates the internal sets of explored targets and detected obstacles."""
global targets_expolred, obstacles_detected
try:
global targets_expolred, obstacles_detected
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
@@ -740,6 +769,7 @@ def create_uav_tools(client: UAVAPIClient) -> list:
@tool
def move_towards(input_json: str) -> str:
"""Move a drone a specific distance in a direction.
Tips: Prefer using `auto_navigate_towards`. If it fails, use this tool and dynamically plan the destination.
Input should be a JSON string with:
- drone_id: The ID of the drone (required, get from Action list_drones)
@@ -763,7 +793,11 @@ def create_uav_tools(client: UAVAPIClient) -> list:
result = client.move_towards(drone_id, distance, heading, dz)
if result["status"] == "error":
result["message"] += "(1) If the task is to move a certain distance in a specific direction but the path is blocked by an obstacle, first move to a position where there is no obstacle in that direction, and then move the specified distance along that direction. (2) If the obstacles height is lower than the maximum altitude the drone can reach, the drone may ascend to an altitude higher than the obstacle and fly over it. If the obstacle's height is 0, then it indicates no drone can fly over it (In this case you need to detour). (3) Try other tools like `move_to` or `auto_navigate_to`"
result["message"] += "(1) If the task is to move a certain distance in a specific direction but the path is blocked by an obstacle, first move to a position where there is no obstacle in that direction, and then move the specified distance along that direction. (2) If the obstacles height is lower than the maximum altitude the drone can reach, the drone may ascend to an altitude higher than the obstacle and fly over it. If the obstacle's height is 0, then it indicates no drone can fly over it (In this case you need to detour). (3) Try other tools like `move_to` or `auto_navigate_to` (4) Do not move so far since the perception range is 150m."
else:
env_changed = env_perception()
if env_changed:
result["tips"] = "Target/Obstacle status changed. Use `get_obstacles` or `get_targets` to get new information."
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"distance\": 10.0}}"
@@ -790,7 +824,7 @@ def create_uav_tools(client: UAVAPIClient) -> list:
# 获取状态
drone_state = client.get_drone_status(drone_id)
obstacles = client.get_obstacles()
obstacles = [x.to_dict() for x in obstacles_detected]
start_x = drone_state['position']['x']
start_y = drone_state['position']['y']
start_z = drone_state['position']['z']
@@ -810,7 +844,7 @@ def create_uav_tools(client: UAVAPIClient) -> list:
# --- 核心修复 1: 更好的步长控制 ---
current_distance = min_distance
step_size = 5.0 # 减小步长以获得更精细的落点
max_iterations = 500
max_iterations = 750
safety_margin = 2.5 # 稍微加大缓冲,确保落点绝对安全
height_buffer = 5.0
@@ -871,7 +905,7 @@ def create_uav_tools(client: UAVAPIClient) -> list:
})
# return f"Finding a destination successfully (dist={current_distance}m)... Now use `auto_navigate_to({nav_payload})` to move the drone. This tool safely brings drone to the destination and detour obstacles. First try it with exactly input {nav_payload}, if it fails, then adjust the positions."
return auto_navigate_to_non_tool(nav_payload)
return auto_navigate_to_non_tool(nav_payload, move_towards=True, move_towards_task=(direction, min_distance))
except Exception as e:
return f"Error executing auto_navigate_towards: {str(e)}"
@@ -935,6 +969,7 @@ def create_uav_tools(client: UAVAPIClient) -> list:
def move_to(input_json: str) -> str:
"""Move a drone to specific 3D coordinates (x, y, z).
Always check for collisions first using check_path_collision.
Tips: Prefer using `auto_navigate_to`. If it fails, use this tool and dynamically plan the destination.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
@@ -957,8 +992,12 @@ def create_uav_tools(client: UAVAPIClient) -> list:
return "Error: x, y, and z coordinates are required"
result = client.move_to(drone_id, x, y, z)
env_changed = env_perception()
if env_changed:
result["tips"] = "Target/Obstacle status changed. Use `get_obstacles` or `get_targets` to get new information."
if result["status"] == "error":
result["message"] += "(1) If the task is to move a certain distance in a specific direction but the path is blocked by an obstacle, first move to a position where there is no obstacle in that direction, and then move the specified distance along that direction. (2) If the obstacles height is lower than the maximum altitude the drone can reach, the drone may ascend to an altitude higher than the obstacle and fly over it. If the obstacle's height is 0, then it indicates no drone can fly over it (In this case you need to detour). (3) You can use `get_obstacles` tool to see the obstacle, and pay attention to the obstacle\'s shape and size. (4) You may try `auto_navigate_to` tool to detour if it works."
result["message"] += "(1) If the task is to move a certain distance in a specific direction but the path is blocked by an obstacle, first move to a position where there is no obstacle in that direction, and then move the specified distance along that direction. (2) If the obstacles height is lower than the maximum altitude the drone can reach, the drone may ascend to an altitude higher than the obstacle and fly over it. If the obstacle's height is 0, then it indicates no drone can fly over it (In this case you need to detour). (3) You can use `get_obstacles` tool to see the obstacle, and pay attention to the obstacle\'s shape and size. (4) You may try `auto_navigate_to` tool to detour if it works. (5) Do not move so far since the perception range is 150m."
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"x\": 100.0, \"y\": 50.0, \"z\": 20.0}}"
@@ -982,7 +1021,8 @@ def create_uav_tools(client: UAVAPIClient) -> list:
result = auto_navigate_to_non_tool(input_json)
return result
def auto_navigate_to_non_tool(input_json: str) -> str:
def auto_navigate_to_non_tool(input_json: str, move_towards_flag: bool = False, move_towards_distance: float = 0.0, move_towards_direction: int = 0) -> str:
global obstacles_detected
try:
# 1. 解析参数
params = json.loads(input_json) if isinstance(input_json, str) else input_json
@@ -997,7 +1037,7 @@ def create_uav_tools(client: UAVAPIClient) -> list:
start_pos = status['position']
sx, sy, sz = start_pos['x'], start_pos['y'], start_pos['z']
drone_max_alt = status.get('max_altitude', 100.0)
all_obstacles = client.get_obstacles()
all_obstacles = [obs.to_dict() for obs in obstacles_detected]
# 3. 分类
mandatory_avoid = []
@@ -1170,10 +1210,25 @@ def create_uav_tools(client: UAVAPIClient) -> list:
if wp == waypoints[0] and len(waypoints) > 1:
continue
waypoint_move_result = client.move_to(drone_id, wp[0], wp[1], wp[2])
env_changed = env_perception()
if env_changed:
waypoint_move_result["tips"] = "Target/Obstacle status changed. Use `get_obstacles` or `get_targets` to get new information."
if waypoint_move_result["status"] == "error":
return f"Error moving to waypoint {wp}: {waypoint_move_result['message']}"
error_str = f"Error automatically navigate to waypoint {wp}: {waypoint_move_result['message']}"
if env_changed:
error_str += f" (Target/Obstacle status changed. Use `get_obstacles` or `get_targets` to get new information.) If fails too many times of automatic navigation, consider dynamically plan a way."
else:
error_str += f" Try not move so far."
return error_str
final_msg = waypoint_move_result.get("message", "Success")
if len(waypoints) > 1 and move_towards_flag:
client.move_towards(drone_id, move_towards_distance, move_towards_direction)
env_changed = env_perception()
if env_changed:
additional_info = "Target/Obstacle status changed. Use `get_obstacles` or `get_targets` to get new information."
return json.dumps({"status": "success", "path": waypoints, "message": final_msg, "tips": additional_info})
return json.dumps({"status": "success", "path": waypoints, "message": final_msg})
except Exception as e:
@@ -1390,7 +1445,7 @@ def create_uav_tools(client: UAVAPIClient) -> list:
get_drone_status,
get_session_info,
# get_session_data,
get_task_progress,
# get_task_progress,
get_weather,
auto_explore,
# get_targets,