first commit

This commit is contained in:
abnerhexu
2026-01-12 09:56:56 +08:00
commit 687e20c96a
10 changed files with 3155 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
__pycache__/*
.DS_Store
*/__pycache__/*

54
llm_settings.json Normal file
View File

@@ -0,0 +1,54 @@
{
"selected_provider": "Kimi",
"provider_configs": {
"Ollama": {
"type": "ollama",
"base_url": "http://localhost:11434",
"models_endpoint": "/api/tags",
"chat_endpoint": "/api/chat",
"requires_api_key": false,
"api_key": "",
"encoding": "utf-8",
"default_model": "gpt-oss",
"default_models": [],
"allow_endpoint_edit": false,
"allow_api_toggle": false,
"system_prompt": ""
},
"OpenAI": {
"type": "openai-compatible",
"base_url": "https://api.openai.com/v1",
"models_endpoint": "/models",
"chat_endpoint": "/chat/completions",
"requires_api_key": true,
"api_key": "",
"encoding": "utf-8",
"default_model": "gpt-4o-mini",
"default_models": [
"gpt-4o-mini",
"gpt-4o",
"gpt-4.1-mini",
"gpt-3.5-turbo"
],
"allow_endpoint_edit": true,
"allow_api_toggle": true,
"system_prompt": ""
},
"Kimi": {
"type": "openai-compatible",
"base_url": "https://api.moonshot.cn/v1",
"models_endpoint": "/v1/models",
"chat_endpoint": "/v1/chat/completions",
"requires_api_key": true,
"api_key": "sk-2gCgINOEErD1ctdxIB7ALIPnHboZPrQRj1hvVJtEydT1JbXv",
"encoding": "utf-8",
"default_model": "kimi-k2-0711-preview",
"default_models": [
"kimi-k2-0711-preview"
],
"allow_endpoint_edit": true,
"allow_api_toggle": true,
"system_prompt": "You are a very friendly drone control agent. No matter what language I use to give you instructions, please call the tools to perform the task and then reply in English."
}
}
}

1153
main.py Normal file

File diff suppressed because it is too large Load Diff

10
template/__init__.py Normal file
View File

@@ -0,0 +1,10 @@
"""
UAV Agent Templates
This package contains prompt templates for the UAV control agent.
"""
from .agent_prompt import AGENT_PROMPT
from .parsing_error import PARSING_ERROR_TEMPLATE
__all__ = ["AGENT_PROMPT", "PARSING_ERROR_TEMPLATE"]

92
template/agent_prompt.py Normal file
View File

@@ -0,0 +1,92 @@
"""
UAV Agent Prompt Template
This template defines the system prompt for the UAV control agent.
It provides guidelines, safety rules, task types, and response format instructions.
"""
AGENT_PROMPT = """You are an intelligent UAV (drone) control agent. Your job is to understand user intentions and control drones safely and efficiently.
IMPORTANT GUIDELINES:
0. ALWAYS Respond [TASK DONE] as a signal of finish task at the end of response.
1. ALWAYS check the current session status first to understand the mission task
2. ALWAYS list available drones before attempting to control them
3. ALWAYS check nearby entities of a drone before you control it, there are lot of obstacles.
4. Check weather conditions regularly - the weather will influence the battery usage
5. Be proactive in gathering information of obstacles and targets, by using nearby entities functions
6. Remember the information of obstacles and targets, because they are not always available
7. When visiting targets, get close enough within task_radius
9. Land drones safely when tasks are complete or battery is low
10. Monitor battery levels - if below 10%, consider charging before continuing
SAFETY RULES:
- If you can not directly move the drone to a position, find a mediam waypoint to get there first, and then cosider the destination, repeat the process, until you can move directly to the destination.
- Always verify drone status and nearby entities before commands
AVAILABLE TOOLS:
You have access to these tools to accomplish your tasks: {tool_names}
{tools}
RESPONSE FORMAT:
Use this exact format for your responses:
Question: the input question or command you must respond to
Thought: analyze what you need to do and what information you need
Action: the specific tool to use from the list above
Action Input: the input parameters for the tool (use proper JSON format)
Observation: the result from running the tool
... (repeat Thought/Action/Action Input/Observation as needed)
Thought: I now have enough information to provide a final answer
Final Answer: a clear, concise answer to the original question
ACTION INPUT FORMAT RULES:
1. For tools with NO parameters (like list_drones, get_session_info):
Action Input: {{}}
2. For tools with ONE string parameter (like get_drone_status):
Action Input: {{"drone_id": "drone-abc123"}}
3. For tools with MULTIPLE parameters (like move_to):
Action Input: {{"drone_id": "drone-abc123", "x": 100.0, "y": 50.0, "z": 20.0}}
CRITICAL:
- ALWAYS use proper JSON format with double quotes for keys and string values
- ALWAYS use curly braces for Action Input
- For tools with no parameters, use empty braces
- Numbers should NOT have quotes
- Strings MUST have quotes
EXAMPLES:
Question: What drones are available?
Thought: I need to list all drones to see what's available
Action: list_drones
Action Input: {{}}
Observation: [result will be returned here]
Question: Take off drone-001 to 15 meters
Thought: I need to take off the drone to the specified altitude
Action: take_off
Action Input: {{"drone_id": "drone-001", "altitude": 15.0}}
Observation: Drone took off successfully
Question: Move drone-001 to position x=100, y=50, z=20
Thought: I need to move the drone to the specified coordinates
Action: move_to
Action Input: {{"drone_id": "drone-001", "x": 100.0, "y": 50.0, "z": 20.0}}
Observation: Drone moved successfully
Tips for you to finish task in the most efficient way:
1. For a certain task, use the task-related API, do not get_session_info too many times.
2. If you want to to get the position of a target, use GET /targets API. If you want to get the position of an obstacle, use GET /obstacles API. DO NOT USE get_nearby_entities!
3. Move directly as much as possible. For example, when the task is moving from A to B via C, first try to Collision Detection between A to C, if there's no collision, then move directly to C, otherwise, detour.
4. Getting entities nearby do not always effective. You have only limited sensor range. Using /targets API to get targets and /obstacles API to get obstacles is more effective.
5. If battery is below 30, find the nerest waypoint, go there and land, then charge to 100.
6. Reaching to a higher latitude can help you see targets, but do not exceed the drone's limit.
Begin!
Question: {input}
Thought:{agent_scratchpad}"""

18
template/parsing_error.py Normal file
View File

@@ -0,0 +1,18 @@
"""
Parsing Error Template
This template defines the error message shown to the LLM when it produces
invalid JSON in the Action Input field.
"""
PARSING_ERROR_TEMPLATE = """Parsing error: {error}
REMINDER - Action Input must be valid JSON:
- Use double quotes for keys and string values
- Use curly braces: {{}}
- For no parameters: {{}}
- For one parameter: {{"drone_id": "drone-001"}}
- For multiple parameters: {{"drone_id": "drone-001", "altitude": 15.0}}
- Numbers WITHOUT quotes, strings WITH quotes
Please try again with proper JSON format."""

140
tp Normal file
View File

@@ -0,0 +1,140 @@
These are all the APIs you can use.
Drone Management
| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/drones` | List all drones |
| POST | `/drones` | Register new drone |
| GET | `/drones/{{id}}` | Get drone details |
| PUT | `/drones/{{id}}` | Update drone properties (metadata, state, battery, position, home) |
| PUT | `/drones/{{id}}/position` | Update drone position only |
| DELETE | `/drones/{{id}}` | Delete drone |
| POST | `/drones/{{id}}/battery` | Update battery level |
Command Management
Generic Command Endpoint
| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | `/drones/{{id}}/command` | Send any command |
| GET | `/drones/{{id}}/commands` | Get command history |
| GET | `/commands/{{command_id}}` | Get command status |
Direct Command Endpoints
All commands use **POST** method with `/drones/{{id}}/command/{{command_name}}`
Charge can only done at Waypoint target.
| Command | Parameters | Description |
|---------|-----------|-------------|
| `take_off` | `?altitude=10.0` | Takeoff to altitude |
| `land` | - | Land at position |
| `move_to` | `?x=50&y=50&z=15` | Move to coordinates |
| `move_towards` | `?distance=20&heading=90` | Move distance in direction (uses current heading if not specified) |
| `move_along_path` | Body: `{{waypoints:[...]}}` | Follow waypoints |
| `change_altitude` | `?altitude=20.0` | Change altitude only |
| `hover` | `duration` (optional) | Hold position |
| `rotate` | `?heading=180.0` | Change heading/orientation |
| `return_home` | - | Return to launch |
| `set_home` | - | Set home position |
| `calibrate` | - | Calibrate sensors |
| `take_photo` | - | Capture image |
| `send_message` | `?target_drone_id=X&message=Y` | Send to drone |
| `broadcast` | `?message=text` | Send to all |
| `charge` | `?charge_amount=30.0` | Charge battery |
Target Management
Type of target we have fixed, moving, waypoint, circle, polygon. - Note: fixed type can also represent points of interest
| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/targets` | List all targets |
| GET | `/targets/{{id}}` | Get target details |
| GET | `/targets/type/{{type}}` | Get by type |
Waypoint Endpoints
| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/targets/waypoints` | List charging stations |
| POST | `/targets/waypoints/{{id}}/check-drone` | Check if drone at waypoint |
| GET | `/targets/waypoints/nearest` | Find nearest waypoint |
Obstacle Management
Type of obstacle we hvave point, circle, ellipse, polygon.
| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/obstacles` | List all obstacles |
| GET | `/obstacles/{{id}}` | Get obstacle |
| GET | `/obstacles/type/{{type}}` | Get by type |
Collision Detection
**Endpoint:** `POST /obstacles/path_collision`
**Authentication:** Requires SYSTEM role (ADMIN inherits)
**Description:** Checks if a flight path from start to end collides with any obstacles. Returns the **first** obstacle that collides with the path.
**Parameters:**
| Name | Type | Required | Description |
|------|------|----------|-------------|
| start | object | Yes | Start point {{x, y, z}} |
| end | object | Yes | End point {{x, y, z}} |
| safety_margin | float | No | Additional clearance distance (in meters) around the flight path (default: 0.0). Creates a corridor with specified width on each side. Use 0.0 for direct line path, or > 0.0 for safety corridor (e.g., 5.0 creates a 10m-wide corridor). Note: Drone movement commands use 0.0 by default |
**Height Logic:**
- `height = 0`: Impassable at any altitude
- `height > 0`: Collision only if max flight altitude <= obstacle.height
**Response:** Collision response object or null if no collision
**Example Request:**
```bash
curl -X POST http://localhost:8000/obstacles/path_collision \\
-H "Content-Type: application/json" \\
-H "X-API-Key: system_secret_key_change_in_production" \\
-d '{{
"start": {{"x": 0.0, "y": 0.0, "z": 10.0}},
"end": {{"x": 200.0, "y": 300.0, "z": 10.0}},
"safety_margin": 2.0
}}'
```
**Example Response (Collision):**
```json
{{
"obstacle_id": "550e8400-e29b-41d4-a716-446655440001",
"obstacle_name": "Water Tower",
"obstacle_type": "circle",
"collision_type": "path_intersection",
"distance": 5.0
}}
```
**Example Response (No Collision):**
```json
null
```
| Method | Endpoint | Description | Auth |
|--------|----------|-------------|------|
| POST | `/obstacles/path_collision` | Check if flight path collides with obstacles | SYSTEM |
| POST | `/obstacles/point_collision` | Check if point is inside any obstacles (returns all matches) | SYSTEM |
Proximity
| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/drones/{{id}}/nearby` | Aggregated nearby drones, targets, obstacles (uses drone's perceived_radius) |
| GET | `/drones/{{id}}/nearby/drones` | Nearby drones (uses drone's perceived_radius) |
| GET | `/drones/{{id}}/nearby/targets` | Nearby targets (uses drone's perceived_radius) |
| GET | `/drones/{{id}}/nearby/obstacles` | Nearby obstacles (uses drone's perceived_radius) |
All proximity endpoints use the drone's `perceived_radius` to determine the search area.

671
uav_agent.py Normal file
View File

@@ -0,0 +1,671 @@
"""
UAV Control Agent
An intelligent agent that understands natural language commands and controls drones using the UAV API
Uses LangChain 1.0+ with modern @tool decorator pattern
"""
from langchain_classic.agents import create_react_agent
from langchain_classic.agents import AgentExecutor
from langchain_classic.prompts import PromptTemplate
from langchain_ollama import ChatOllama
from langchain_openai import ChatOpenAI
from uav_api_client import UAVAPIClient
from uav_langchain_tools import create_uav_tools
from template.agent_prompt import AGENT_PROMPT
from template.parsing_error import PARSING_ERROR_TEMPLATE
from typing import Optional, Dict, Any
import json
import os
from pathlib import Path
def load_llm_settings(settings_path: str = "llm_settings.json") -> Optional[Dict[str, Any]]:
"""Load LLM settings from JSON file"""
try:
path = Path(settings_path)
if path.exists():
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"Warning: Could not load LLM settings from {settings_path}: {e}")
return None
def prompt_user_for_llm_config() -> Dict[str, Any]:
"""Prompt user to select LLM provider and model"""
settings = load_llm_settings()
if not settings or 'provider_configs' not in settings:
print("⚠️ No llm_settings.json found or invalid format. Using command line arguments.")
return {}
provider_configs = settings['provider_configs']
selected_provider = settings.get('selected_provider', '')
print("\n" + "="*60)
print("🤖 LLM Provider Configuration")
print("="*60)
# Show available providers
providers = list(provider_configs.keys())
print("\nAvailable providers:")
for i, provider in enumerate(providers, 1):
config = provider_configs[provider]
default_marker = " (selected in settings)" if provider == selected_provider else ""
print(f" {i}. {provider}{default_marker}")
print(f" Type: {config.get('type', 'unknown')}")
print(f" Base URL: {config.get('base_url', 'N/A')}")
print(f" Requires API Key: {config.get('requires_api_key', False)}")
# Prompt for provider selection
print(f"\nSelect a provider (1-{len(providers)}) [default: {selected_provider or providers[0]}]: ", end='')
provider_choice = input().strip()
if not provider_choice:
# Use default
if selected_provider and selected_provider in providers:
chosen_provider = selected_provider
else:
chosen_provider = providers[0]
else:
try:
idx = int(provider_choice) - 1
if 0 <= idx < len(providers):
chosen_provider = providers[idx]
else:
print(f"Invalid choice. Using default: {selected_provider or providers[0]}")
chosen_provider = selected_provider or providers[0]
except ValueError:
print(f"Invalid input. Using default: {selected_provider or providers[0]}")
chosen_provider = selected_provider or providers[0]
config = provider_configs[chosen_provider]
print(f"\n✅ Selected provider: {chosen_provider}")
# Show available models
default_models = config.get('default_models', [])
default_model = config.get('default_model', '')
if default_models:
print("\nAvailable models:")
for i, model in enumerate(default_models, 1):
default_marker = " (default)" if model == default_model else ""
print(f" {i}. {model}{default_marker}")
print(f" {len(default_models) + 1}. Custom model (enter manually)")
print(f"\nSelect a model (1-{len(default_models) + 1}) [default: {default_model}]: ", end='')
model_choice = input().strip()
if not model_choice:
chosen_model = default_model
else:
try:
idx = int(model_choice) - 1
if 0 <= idx < len(default_models):
chosen_model = default_models[idx]
elif idx == len(default_models):
# Custom model
print("Enter custom model name: ", end='')
chosen_model = input().strip() or default_model
else:
print(f"Invalid choice. Using default: {default_model}")
chosen_model = default_model
except ValueError:
print(f"Invalid input. Using default: {default_model}")
chosen_model = default_model
else:
# No predefined models, ask for custom input
print(f"\nEnter model name [default: {default_model}]: ", end='')
chosen_model = input().strip() or default_model
print(f"✅ Selected model: {chosen_model}")
# Determine provider type
provider_type = config.get('type', 'ollama')
if provider_type == 'openai-compatible':
if 'api.openai.com' in config.get('base_url', ''):
llm_provider = 'openai'
else:
llm_provider = 'openai-compatible'
else:
llm_provider = provider_type
# Get API key if required
api_key = config.get('api_key', '').strip()
if config.get('requires_api_key', False) and not api_key:
print("\n⚠️ This provider requires an API key.")
print("Enter API key (or press Enter to use environment variable): ", end='')
api_key = input().strip()
result = {
'llm_provider': llm_provider,
'llm_model': chosen_model,
'llm_base_url': config.get('base_url'),
'llm_api_key': api_key if api_key else None,
'provider_name': chosen_provider
}
print("\n" + "="*60)
print("✅ Configuration complete!")
print("="*60)
print(f"Provider: {chosen_provider}")
print(f"Type: {llm_provider}")
print(f"Model: {chosen_model}")
print(f"Base URL: {config.get('base_url')}")
if api_key:
print(f"API Key: {'*' * (len(api_key) - 4) + api_key[-4:] if len(api_key) > 4 else '****'}")
print("="*60 + "\n")
return result
class UAVControlAgent:
"""Intelligent agent for controlling UAVs using natural language"""
def __init__(
self,
base_url: str = "http://localhost:8000",
uav_api_key: Optional[str] = None,
llm_provider: str = "ollama",
llm_model: str = "llama2",
llm_api_key: Optional[str] = None,
llm_base_url: Optional[str] = None,
temperature: float = 0.1,
verbose: bool = True,
debug: bool = False
):
"""
Initialize the UAV Control Agent
Args:
base_url: Base URL of the UAV API server
uav_api_key: API key for UAV server authentication (None = USER role, or provide SYSTEM/ADMIN key)
llm_provider: LLM provider ('ollama', 'openai', 'openai-compatible')
llm_model: Model name (e.g., 'llama2', 'gpt-4o-mini', 'deepseek-chat')
llm_api_key: API key for LLM provider (required for openai/openai-compatible)
llm_base_url: Custom base URL for LLM API (for openai-compatible providers)
temperature: LLM temperature (lower = more deterministic)
verbose: Enable verbose output for agent reasoning
debug: Enable debug output for connection and setup info
"""
self.client = UAVAPIClient(base_url, api_key=uav_api_key)
self.verbose = verbose
self.debug = debug
if self.debug:
print("\n" + "="*60)
print("🔧 UAV Agent Initialization - Debug Mode")
print("="*60)
print(f"UAV API Server: {base_url}")
print(f"LLM Provider: {llm_provider}")
print(f"LLM Model: {llm_model}")
print(f"Temperature: {temperature}")
print(f"Verbose: {verbose}")
print()
# Test UAV API connection
if self.debug:
print("🔌 Testing UAV API connection...")
try:
session = self.client.get_current_session()
if self.debug:
print(f"✅ Connected to UAV API")
print(f" Session: {session.get('name', 'Unknown')}")
print(f" Task: {session.get('task', 'Unknown')}")
print()
except Exception as e:
if self.debug:
print(f"⚠️ Warning: Could not connect to UAV API: {e}")
print(f" Make sure the UAV server is running at {base_url}")
print()
# Initialize LLM based on provider
if self.debug:
print(f"🤖 Initializing LLM provider: {llm_provider}")
if llm_provider == "ollama":
if self.debug:
print(f" Using Ollama with model: {llm_model}")
print(f" Ollama URL: http://localhost:11434 (default)")
self.llm = ChatOllama(
model=llm_model,
temperature=temperature
)
if self.debug:
print(f"✅ Ollama LLM initialized")
print()
elif llm_provider in ["openai", "openai-compatible"]:
if not llm_api_key:
raise ValueError(f"API key is required for {llm_provider} provider. Use --llm-api-key or set environment variable.")
# Determine base URL
if llm_provider == "openai":
final_base_url = llm_base_url or "https://api.openai.com/v1"
provider_name = "OpenAI"
else:
if not llm_base_url:
raise ValueError("llm_base_url is required for openai-compatible provider")
final_base_url = llm_base_url
provider_name = "OpenAI-Compatible API"
if self.debug:
print(f" Provider: {provider_name}")
print(f" Base URL: {final_base_url}")
print(f" Model: {llm_model}")
print(f" API Key: {'*' * (len(llm_api_key) - 4) + llm_api_key[-4:] if len(llm_api_key) > 4 else '****'}")
# Create LLM instance
kwargs = {
"model": llm_model,
"temperature": temperature,
"api_key": llm_api_key,
"base_url": final_base_url
}
self.llm = ChatOpenAI(**kwargs)
if self.debug:
print(f"{provider_name} LLM initialized")
print()
else:
raise ValueError(
f"Unknown LLM provider: {llm_provider}. "
f"Use 'ollama', 'openai', or 'openai-compatible'"
)
# Create tools using the new @tool decorator approach
if self.debug:
print("🔧 Creating UAV control tools...")
self.tools = create_uav_tools(self.client)
if self.debug:
print(f"✅ Created {len(self.tools)} tools")
print(f" Tools: {', '.join([tool.name for tool in self.tools[:5]])}...")
print()
# Create prompt template
if self.debug:
print("📝 Creating agent prompt template...")
self.prompt = self._create_prompt()
if self.debug:
print("✅ Prompt template created")
print()
# Create ReAct agent
if self.debug:
print("🤖 Creating ReAct agent...")
self.agent = create_react_agent(
llm=self.llm,
tools=self.tools,
prompt=self.prompt
)
if self.debug:
print("✅ ReAct agent created")
print()
# Create agent executor with improved error handling
if self.debug:
print("⚙️ Creating agent executor...")
print(f" Max iterations: 20")
print(f" Verbose mode: {verbose}")
# Custom error handler to help LLM fix formatting issues
def handle_parsing_error(error) -> str:
"""Provide helpful feedback when Action Input parsing fails"""
return PARSING_ERROR_TEMPLATE.format(error=str(error))
self.agent_executor = AgentExecutor(
agent=self.agent,
tools=self.tools,
verbose=verbose,
handle_parsing_errors=handle_parsing_error,
max_iterations=50, # Increased for complex tasks
return_intermediate_steps=True,
early_stopping_method="generate" # Better handling of completion
)
if self.debug:
print("✅ Agent executor created")
print()
# Session context
if self.debug:
print("🔄 Refreshing session context...")
self.session_context = {}
self.refresh_session_context()
if self.debug:
print("="*60)
print("✅ UAV Agent Initialization Complete!")
print("="*60)
print()
def _create_prompt(self) -> PromptTemplate:
"""Create the agent prompt template"""
prompt_template = PromptTemplate(
template=AGENT_PROMPT,
input_variables=["input", "agent_scratchpad"],
partial_variables={
"tools": "\n".join([
f"- {tool.name}: {tool.description}"
for tool in self.tools
]),
"tool_names": ", ".join([tool.name for tool in self.tools])
}
)
return prompt_template
def refresh_session_context(self):
"""Refresh session context information"""
try:
session = self.client.get_current_session()
self.session_context = {
'session_id': session.get('id'),
'task_type': session.get('task'),
'task_description': session.get('task_description'),
'status': session.get('status')
}
except Exception as e:
if self.verbose:
print(f"Warning: Could not refresh session context: {e}")
def get_session_summary(self) -> str:
"""Get a summary of the current session"""
try:
session = self.client.get_current_session()
progress = self.client.get_task_progress()
drones = self.client.list_drones()
summary = f"""
=== Current Session Summary ===
Session: {session.get('name', 'Unknown')}
Task: {session.get('task', 'Unknown')} - {session.get('task_description', '')}
Status: {session.get('status', 'Unknown')}
Progress: {progress.get('progress_percentage', 0)}% ({progress.get('status_message', 'Unknown')})
Completed: {progress.get('is_completed', False)}
Drones: {len(drones)} available
"""
for drone in drones:
summary += f" - {drone.get('name')} ({drone.get('id')}): {drone.get('status')}, Battery: {drone.get('battery_level', 0):.1f}%\n"
return summary.strip()
except Exception as e:
return f"Error getting session summary: {e}"
def execute(self, command: str) -> Dict[str, Any]:
"""
Execute a natural language command
Args:
command: Natural language command from user
Returns:
Dictionary with 'output', 'intermediate_steps', and 'success' keys
"""
if self.debug:
print(f"\n{'='*60}")
print(f"🎯 Executing Command")
print(f"{'='*60}")
print(f"Command: {command}")
print(f"{'='*60}\n")
try:
if self.debug:
print("🔄 Invoking agent executor...")
result = self.agent_executor.invoke({"input": command})
if self.debug:
print(f"\n{'='*60}")
print("✅ Command Execution Complete")
print(f"{'='*60}")
print(f"Success: True")
print(f"Intermediate steps: {len(result.get('intermediate_steps', []))}")
print(f"{'='*60}\n")
return {
'success': True,
'output': result.get('output', ''),
'intermediate_steps': result.get('intermediate_steps', [])
}
except Exception as e:
if self.debug:
print(f"\n{'='*60}")
print("❌ Command Execution Failed")
print(f"{'='*60}")
print(f"Error: {str(e)}")
print(f"{'='*60}\n")
return {
'success': False,
'output': f"Error executing command: {str(e)}",
'intermediate_steps': []
}
def run_interactive(self):
"""Run the agent in interactive mode"""
print("\n" + "="*60)
print("🚁 UAV Control Agent - Interactive Mode")
print("="*60)
print("\nType 'quit', 'exit', or 'q' to stop")
print("Type 'status' to see session summary")
print("Type 'help' for example commands\n")
# Show initial session summary
print(self.get_session_summary())
print("\n" + "-"*60 + "\n")
while True:
try:
user_input = input("\n🎮 Command: ").strip()
if not user_input:
continue
if user_input.lower() in ['quit', 'exit', 'q']:
print("\n👋 Goodbye!")
break
if user_input.lower() == 'status':
print(self.get_session_summary())
continue
if user_input.lower() == 'help':
self._print_help()
continue
# Execute command
print("\n🤖 Processing...\n")
result = self.execute(user_input)
if result['success']:
print(f"\n{result['output']}\n")
else:
print(f"\n{result['output']}\n")
except KeyboardInterrupt:
print("\n\n👋 Goodbye!")
break
except Exception as e:
print(f"\n❌ Error: {e}\n")
def _print_help(self):
"""Print example commands"""
help_text = """
Example Commands:
==================
Information:
- "What drones are available?"
- "Show me the current mission status"
- "What targets do I need to visit?"
- "Check the weather conditions"
- "What's the task progress?"
Basic Control:
- "Take off drone-abc123 to 15 meters"
- "Move drone-abc123 to coordinates x=100, y=50, z=20"
- "Land drone-abc123"
- "Return all drones home"
Mission Execution:
- "Visit all targets with the first drone"
- "Search the area with available drones"
- "Complete the mission task"
- "Patrol the assigned areas"
Safety:
- "Check if there are obstacles between (0,0,10) and (100,100,10)"
- "What's nearby drone-abc123?"
- "Check battery levels"
Smart Commands:
- "Take photos at all target locations"
- "Charge any drones with low battery"
- "Survey all targets and return home"
"""
print(help_text)
def main():
"""Main entry point"""
import argparse
import sys
parser = argparse.ArgumentParser(
description="UAV Control Agent - Natural Language Drone Control"
)
parser.add_argument(
'--base-url',
default='http://localhost:8000',
help='UAV API base URL'
)
parser.add_argument(
'--uav-api-key',
default=None,
help='API key for UAV server (defaults to USER role if not provided, or set UAV_API_KEY env var)'
)
parser.add_argument(
'--llm-provider',
default=None,
choices=['ollama', 'openai', 'openai-compatible'],
help='LLM provider (ollama, openai, or openai-compatible for DeepSeek, etc.)'
)
parser.add_argument(
'--llm-model',
default=None,
help='LLM model name (e.g., llama2, gpt-4o-mini, deepseek-chat)'
)
parser.add_argument(
'--llm-api-key',
default=None,
help='API key for LLM provider (or set via environment variable)'
)
parser.add_argument(
'--llm-base-url',
default=None,
help='Custom base URL for LLM API (required for openai-compatible providers)'
)
parser.add_argument(
'--temperature',
type=float,
default=0.1,
help='LLM temperature (0.0-1.0)'
)
parser.add_argument(
'--command', '-c',
default=None,
help='Single command to execute (non-interactive)'
)
parser.add_argument(
'--quiet', '-q',
action='store_true',
help='Reduce verbosity'
)
parser.add_argument(
'--debug', '-d',
action='store_true',
help='Enable debug output for connection and setup info'
)
parser.add_argument(
'--no-prompt',
action='store_true',
help='Skip interactive provider/model selection (use command line args or defaults)'
)
args = parser.parse_args()
# Determine if we should prompt for config
should_prompt = (
not args.no_prompt and
not args.command and # Only prompt in interactive mode
args.llm_provider is None and # No provider specified
args.llm_model is None # No model specified
)
# Get configuration from user prompt or command line
if should_prompt:
config = prompt_user_for_llm_config()
if config:
llm_provider = config.get('llm_provider', 'ollama')
llm_model = config.get('llm_model', 'llama2')
llm_base_url = config.get('llm_base_url')
llm_api_key = config.get('llm_api_key')
else:
# Fallback to defaults
llm_provider = 'ollama'
llm_model = 'llama2'
llm_base_url = None
llm_api_key = None
else:
# Use command line arguments or defaults
llm_provider = args.llm_provider or 'ollama'
llm_model = args.llm_model or 'llama2'
llm_base_url = args.llm_base_url
llm_api_key = args.llm_api_key
# Get LLM API key from args or environment if not set
if not llm_api_key:
llm_api_key = os.getenv("OPENAI_API_KEY") or os.getenv("LLM_API_KEY")
# Get UAV API key from args or environment
uav_api_key = args.uav_api_key or os.getenv("UAV_API_KEY")
# Create agent
try:
agent = UAVControlAgent(
base_url=args.base_url,
uav_api_key=uav_api_key,
llm_provider=llm_provider,
llm_model=llm_model,
llm_api_key=llm_api_key,
llm_base_url=llm_base_url,
temperature=args.temperature,
verbose=not args.quiet,
debug=args.debug
)
except Exception as e:
print(f"❌ Failed to create agent: {e}")
print("\nMake sure:")
print(" - Ollama is running (if using --llm-provider ollama)")
print(" - OPENAI_API_KEY is set (if using --llm-provider openai)")
print(" - UAV API server is accessible")
return 1
if args.command:
# Single command mode
result = agent.execute(args.command)
print(result['output'])
return 0 if result['success'] else 1
else:
# Interactive mode
agent.run_interactive()
return 0
if __name__ == "__main__":
import sys
sys.exit(main())

365
uav_api_client.py Normal file
View File

@@ -0,0 +1,365 @@
"""
UAV API Client
Wrapper for the UAV Control System API to simplify drone operations
"""
import requests
from typing import Dict, List, Any, Tuple, Optional
class UAVAPIClient:
"""Client for interacting with the UAV Control System API"""
def __init__(self, base_url: str = "http://localhost:8000", api_key: Optional[str] = None):
"""
Initialize UAV API Client
Args:
base_url: Base URL of the UAV API server
api_key: Optional API key for authentication (defaults to USER role if not provided)
- None or empty: USER role (basic access)
- Valid key: SYSTEM or ADMIN role (based on key)
"""
self.base_url = base_url.rstrip('/')
self.api_key = api_key
self.headers = {}
if self.api_key:
self.headers['X-API-Key'] = self.api_key
def _request(self, method: str, endpoint: str, **kwargs) -> Any:
"""Make HTTP request to the API"""
url = f"{self.base_url}{endpoint}"
# Merge authentication headers with any provided headers
headers = kwargs.pop('headers', {})
headers.update(self.headers)
try:
response = requests.request(method, url, headers=headers, **kwargs)
response.raise_for_status()
if response.status_code == 204:
return None
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
raise Exception(f"Authentication failed: Invalid API key")
elif e.response.status_code == 403:
error_detail = e.response.json().get('detail', 'Access denied')
raise Exception(f"Permission denied: {error_detail}")
raise Exception(f"API request failed: {e}")
except requests.exceptions.RequestException as e:
raise Exception(f"API request failed: {e}")
# Drone Operations
def list_drones(self) -> List[Dict[str, Any]]:
"""Get all drones in the current session"""
return self._request('GET', '/drones')
def get_all_waypoints(self) -> List[Dict[str, Any]]:
"""Get all waypoints in the current session"""
return self._request('GET', '/targets/type/waypoint')
def get_drone_status(self, drone_id: str) -> Dict[str, Any]:
"""Get detailed status of a specific drone"""
return self._request('GET', f'/drones/{drone_id}')
def take_off(self, drone_id: str, altitude: float = 10.0) -> Dict[str, Any]:
"""Command drone to take off to specified altitude"""
return self._request('POST', f'/drones/{drone_id}/command/take_off',params={'altitude': altitude})
def land(self, drone_id: str) -> Dict[str, Any]:
"""Command drone to land at current position"""
return self._request('POST', f'/drones/{drone_id}/command/land')
def move_to(self, drone_id: str, x: float, y: float, z: float) -> Dict[str, Any]:
"""Move drone to specific coordinates"""
return self._request('POST', f'/drones/{drone_id}/command/move_to',
params={'x': x, 'y': y, 'z': z})
def optimal_way_to(self, drone_id: str, x: float, y: float, z: float, min_safe_height: float = 0.5) -> List[Dict[str, float]]:
"""
计算到达目标点的最优路径(仅水平绕行)。
"""
# 1. 目标高度检查
if z < min_safe_height:
print(f"Error: Target altitude {z}m is too low.")
return []
status = self.get_drone_status(drone_id)
start_pos = status['position']
start_coords = (start_pos['x'], start_pos['y'], start_pos['z'])
end_coords = (x, y, z)
# 2. 起点高度检查
if start_coords[2] < min_safe_height:
print(f"Warning: Drone is currently below safe height!")
# 3. 执行递归搜索
path_points = self._find_path_recursive(
start_coords,
end_coords,
avoidance_radius=2.0, # 初始绕行半径
depth=0,
max_depth=4, # 最大递归深度
min_safe_height=min_safe_height
)
if path_points is None:
print(f"Error: Unable to find a collision-free path.")
return []
# 4. 格式化输出
formatted_path = [{"x": p[0], "y": p[1], "z": p[2]} for p in path_points]
for point in formatted_path:
self.move_to(drone_id, **point)
return formatted_path
# --- 递归核心 ---
def _find_path_recursive(self, start: Tuple[float, float, float], end: Tuple[float, float, float],
avoidance_radius: float, depth: int, max_depth: int,
min_safe_height: float) -> Optional[List[Tuple[float, float, float]]]:
sx, sy, sz = start
ex, ey, ez = end
# 1. 检查直连是否有碰撞
collision = self.check_path_collision(sx, sy, sz, ex, ey, ez)
if not collision:
return [end]
# 2. 达到最大深度则停止
if depth >= max_depth:
return None
# 3. 计算路径中点
mid_point = ((sx + ex) / 2, (sy + ey) / 2, (sz + ez) / 2)
# 随着深度增加,减小绕行半径,进行更精细的搜索
current_radius = avoidance_radius / (1 + 0.5 * depth)
# 4. 获取仅包含左右方向的候选点
candidates = self._get_horizontal_avoidance_points(start, end, mid_point, current_radius)
# 5. 遍历候选点
for candidate in candidates:
# 过滤掉非法高度的点 (虽然水平偏移理论上不改变高度,但以防万一)
if candidate[2] < min_safe_height:
continue
# 递归处理:起点 -> 候选点
path_first = self._find_path_recursive(start, candidate, avoidance_radius, depth + 1, max_depth, min_safe_height)
if path_first is not None:
# 递归处理:候选点 -> 终点
path_second = self._find_path_recursive(candidate, end, avoidance_radius, depth + 1, max_depth, min_safe_height)
if path_second is not None:
# 路径拼接
return path_first + path_second
# 所有左右尝试都失败
return None
# --- 向量计算 (核心修改部分) ---
def _get_horizontal_avoidance_points(self, start, end, mid, radius) -> List[Tuple[float, float, float]]:
"""
生成候选点:强制仅在水平面上进行左右偏移。
"""
# 1. 计算飞行方向向量 D = End - Start
dx = end[0] - start[0]
dy = end[1] - start[1]
# dz 我们不关心,因为我们要在水平面找垂线
# 计算水平投影的长度
dist_horizontal = (dx*dx + dy*dy)**0.5
rx, ry, rz = 0.0, 0.0, 0.0
# 2. 计算右向量 (Right Vector)
if dist_horizontal == 0:
# 特殊情况:垂直升降 (Start和End的x,y相同)
# 此时"左右"没有绝对定义,我们任意选取 X 轴方向作为偏移方向
rx, ry, rz = 1.0, 0.0, 0.0
else:
# 标准情况:利用 2D 向量旋转 90 度原理
# 向量 (x, y) 顺时针旋转 90 度变为 (y, -x)
# 归一化
rx = dy / dist_horizontal
ry = -dx / dist_horizontal
rz = 0.0 # 强制 Z 轴分量为 0保证水平
mx, my, mz = mid
# 3. 生成候选点:只生成 右(Right) 和 左(Left)
# 注意Right 是 (rx, ry)Left 是 (-rx, -ry)
candidates = []
# 右侧点
c1 = (mx + rx * radius, my + ry * radius, mz) # Z高度保持中点高度不变
candidates.append(c1)
# 左侧点
c2 = (mx - rx * radius, my - ry * radius, mz)
candidates.append(c2)
return candidates
def move_along_path(self, drone_id: str, waypoints: List[Dict[str, float]]) -> Dict[str, Any]:
"""Move drone along a path of waypoints"""
return self._request('POST', f'/drones/{drone_id}/command/move_along_path',
json={'waypoints': waypoints})
def change_altitude(self, drone_id: str, altitude: float) -> Dict[str, Any]:
"""Change drone altitude while maintaining X/Y position"""
return self._request('POST', f'/drones/{drone_id}/command/change_altitude',
params={'altitude': altitude})
def hover(self, drone_id: str, duration: Optional[float] = None) -> Dict[str, Any]:
"""
Command drone to hover at current position.
Args:
drone_id: ID of the drone
duration: Optional duration to hover in seconds
"""
params = {}
if duration is not None:
params['duration'] = duration
return self._request('POST', f'/drones/{drone_id}/command/hover', params=params)
def rotate(self, drone_id: str, heading: float) -> Dict[str, Any]:
"""Rotate drone to face specific direction (0-360 degrees)"""
return self._request('POST', f'/drones/{drone_id}/command/rotate',
params={'heading': heading})
def move_towards(self, drone_id: str, distance: float, heading: Optional[float] = None,
dz: Optional[float] = None) -> Dict[str, Any]:
"""
Move drone a specific distance in a direction.
Args:
drone_id: ID of the drone
distance: Distance to move in meters
heading: Optional heading direction (0-360). If None, uses current heading.
dz: Optional vertical component (altitude change)
"""
params = {'distance': distance}
if heading is not None:
params['heading'] = heading
if dz is not None:
params['dz'] = dz
return self._request('POST', f'/drones/{drone_id}/command/move_towards', params=params)
def return_home(self, drone_id: str) -> Dict[str, Any]:
"""Command drone to return to home position"""
return self._request('POST', f'/drones/{drone_id}/command/return_home')
def set_home(self, drone_id: str) -> Dict[str, Any]:
"""Set current position as home position"""
return self._request('POST', f'/drones/{drone_id}/command/set_home')
def calibrate(self, drone_id: str) -> Dict[str, Any]:
"""Calibrate drone sensors"""
return self._request('POST', f'/drones/{drone_id}/command/calibrate')
def charge(self, drone_id: str, charge_amount: float) -> Dict[str, Any]:
"""Charge drone battery (when landed)"""
return self._request('POST', f'/drones/{drone_id}/command/charge',
params={'charge_amount': charge_amount})
def take_photo(self, drone_id: str) -> Dict[str, Any]:
"""Take a photo with drone camera"""
return self._request('POST', f'/drones/{drone_id}/command/take_photo')
def send_message(self, drone_id: str, target_drone_id: str, message: str) -> Dict[str, Any]:
"""
Send a message to another drone.
Args:
drone_id: ID of the sender drone
target_drone_id: ID of the recipient drone
message: Content of the message
"""
return self._request('POST', f'/drones/{drone_id}/command/send_message',
params={'target_drone_id': target_drone_id, 'message': message})
def broadcast(self, drone_id: str, message: str) -> Dict[str, Any]:
"""
Broadcast a message to all other drones.
Args:
drone_id: ID of the sender drone
message: Content of the message
"""
return self._request('POST', f'/drones/{drone_id}/command/broadcast',
params={'message': message})
# Session Operations
def get_current_session(self) -> Dict[str, Any]:
"""Get information about current mission session"""
return self._request('GET', '/sessions/current')
def get_session_data(self, session_id: str = 'current') -> Dict[str, Any]:
"""Get all entities in a session (drones, targets, obstacles, environment)"""
return self._request('GET', f'/sessions/{session_id}/data')
def get_task_progress(self, session_id: str = 'current') -> Dict[str, Any]:
"""Get mission task completion progress"""
return self._request('GET', f'/sessions/{session_id}/task-progress')
# Environment Operations
def get_weather(self) -> Dict[str, Any]:
"""Get current weather conditions"""
return self._request('GET', '/environments/current')
def get_targets(self) -> List[Dict[str, Any]]:
"""Get all targets in the session"""
fixed = self._request('GET', '/targets/type/fixed')
moving = self._request('GET', '/targets/type/moving')
waypoint = self._request('GET', '/targets/type/waypoint')
circle = self._request('GET', '/targets/type/circle')
polygen = self._request('GET', '/targets/type/polygon')
return fixed + moving + waypoint + circle + polygen
def get_waypoints(self) -> List[Dict[str, Any]]:
"""Get all charging station waypoints"""
return self._request('GET', '/targets/type/waypoint')
def get_nearest_waypoint(self, x: str, y: str, z: str) -> Dict[str, Any]:
"""Get nearest charging station waypoint"""
return self._request('GET', '/targets/waypoints/nearest',
json={'x': x, 'y': y, 'z': z})
def get_obstacles(self) -> List[Dict[str, Any]]:
"""Get all obstacles in the session"""
point = self._request('GET', '/obstacles/type/point')
circle = self._request('GET', '/obstacles/type/circle')
polygon = self._request('GET', '/obstacles/type/polygon')
ellipse = self._request('GET', '/obstacles/type/ellipse')
return point + circle + polygon + ellipse
def get_nearby_entities(self, drone_id: str) -> Dict[str, Any]:
"""Get entities near a drone (within perceived radius)"""
return self._request('GET', f'/drones/{drone_id}/nearby')
# Safety Operations
def check_point_collision(self, x: float, y: float, z: float,
safety_margin: float = 0.0) -> Optional[Dict[str, Any]]:
"""Check if a point collides with any obstacle"""
result = self._request('POST', '/obstacles/collision/check',
json={
'point': {'x': x, 'y': y, 'z': z},
'safety_margin': safety_margin
})
return result
def check_path_collision(self, start_x: float, start_y: float, start_z: float,
end_x: float, end_y: float, end_z: float,
safety_margin: float = 1.0) -> Optional[Dict[str, Any]]:
"""Check if a path intersects any obstacle"""
result = self._request('POST', '/obstacles/collision/path',
json={
'start': {'x': start_x, 'y': start_y, 'z': start_z},
'end': {'x': end_x, 'y': end_y, 'z': end_z},
'safety_margin': safety_margin
})
return result

649
uav_langchain_tools.py Normal file
View File

@@ -0,0 +1,649 @@
"""
LangChain Tools for UAV Control
Wraps the UAV API client as LangChain tools using @tool decorator
All tools accept JSON string input for consistent parameter handling
"""
from langchain.tools import tool
from uav_api_client import UAVAPIClient
import json
def create_uav_tools(client: UAVAPIClient) -> list:
"""
Create all UAV control tools for LangChain agent using @tool decorator
All tools that require parameters accept a JSON string input
"""
# ========== Information Gathering Tools (No Parameters) ==========
@tool
def list_drones() -> str:
"""List all available drones in the current session with their status, battery level, and position.
Use this to see what drones are available before trying to control them.
No input required."""
try:
drones = client.list_drones()
return json.dumps(drones, indent=2)
except Exception as e:
return f"Error listing drones: {str(e)}"
@tool
def get_session_info() -> str:
"""Get current session information including task type, statistics, and status.
Use this to understand what mission you need to complete.
No input required."""
try:
session = client.get_current_session()
return json.dumps(session, indent=2)
except Exception as e:
return f"Error getting session info: {str(e)}"
@tool
def get_session_data() -> str:
"""Get all session data including drones, targets, and obstacles.
Use this to understand the environment and plan your mission.
No input required."""
try:
session_data = client.get_session_data()
return json.dumps(session_data, indent=2)
except Exception as e:
return f"Error getting session data: {str(e)}"
@tool
def get_task_progress() -> str:
"""Get mission task progress including completion percentage and status.
Use this to track mission completion and see how close you are to finishing.
No input required."""
try:
progress = client.get_task_progress()
return json.dumps(progress, indent=2)
except Exception as e:
return f"Error getting task progress: {str(e)}"
@tool
def get_weather() -> str:
"""Get current weather conditions including wind speed, visibility, and weather type.
Check this before takeoff to ensure safe flying conditions.
No input required."""
try:
weather = client.get_weather()
return json.dumps(weather, indent=2)
except Exception as e:
return f"Error getting weather: {str(e)}"
@tool
def get_targets() -> str:
"""Get all targets in the session including fixed, moving, waypoint, circle and polygon to search or patrol.
Use this to see what targets you need to visit.
No input required."""
try:
targets = client.get_targets()
return json.dumps(targets, indent=2)
except Exception as e:
return f"Error getting targets: {str(e)}"
@tool
def get_all_waypoints() -> str:
"""Get all waypoints in the session including coordinates and altitude.
Use this to understand the where to charge that drones will follow.
No input required."""
try:
waypoints = client.get_all_waypoints()
return json.dumps(waypoints, indent=2)
except Exception as e:
return f"Error getting waypoints: {str(e)}"
@tool
def get_obstacles() -> str:
"""Get all obstacles in the session that drones must avoid.
Use this to understand what obstacles exist in the environment.
No input required."""
try:
obstacles = client.get_obstacles()
return json.dumps(obstacles, indent=2)
except Exception as e:
return f"Error getting obstacles: {str(e)}"
@tool
def get_drone_status(input_json: str) -> str:
"""Get detailed status of a specific drone including position, battery, heading, and visited targets.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
Example: {{"drone_id": "drone-001"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
if not drone_id:
return "Error: drone_id is required"
status = client.get_drone_status(drone_id)
return json.dumps(status, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\"}}"
except Exception as e:
return f"Error getting drone status: {str(e)}"
@tool
def get_nearby_entities(input_json: str) -> str:
"""Get drones, targets, and obstacles near a specific drone (within its perception radius).
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
Example: {{"drone_id": "drone-001"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
if not drone_id:
return "Error: drone_id is required"
nearby = client.get_nearby_entities(drone_id)
return json.dumps(nearby, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\"}}"
except Exception as e:
return f"Error getting nearby entities: {str(e)}"
@tool
def land(input_json: str) -> str:
"""Command a drone to land at its current position.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
Example: {{"drone_id": "drone-001"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
if not drone_id:
return "Error: drone_id is required"
result = client.land(drone_id)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\"}}"
except Exception as e:
return f"Error during landing: {str(e)}"
@tool
def hover(input_json: str) -> str:
"""Command a drone to hover at its current position.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
- duration: Optional duration in seconds to hover (optional)
Example: {{"drone_id": "drone-001", "duration": 5.0}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
duration = params.get('duration')
if not drone_id:
return "Error: drone_id is required"
result = client.hover(drone_id, duration)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\"}}"
except Exception as e:
return f"Error hovering: {str(e)}"
@tool
def return_home(input_json: str) -> str:
"""Command a drone to return to its home position.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
Example: {{"drone_id": "drone-001"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
if not drone_id:
return "Error: drone_id is required"
result = client.return_home(drone_id)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\"}}"
except Exception as e:
return f"Error returning home: {str(e)}"
@tool
def set_home(input_json: str) -> str:
"""Set the drone's current position as its new home position.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
Example: {{"drone_id": "drone-001"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
if not drone_id:
return "Error: drone_id is required"
result = client.set_home(drone_id)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\"}}"
except Exception as e:
return f"Error setting home: {str(e)}"
@tool
def calibrate(input_json: str) -> str:
"""Calibrate the drone's sensors.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
Example: {{"drone_id": "drone-001"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
if not drone_id:
return "Error: drone_id is required"
result = client.calibrate(drone_id)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\"}}"
except Exception as e:
return f"Error calibrating: {str(e)}"
@tool
def take_photo(input_json: str) -> str:
"""Command a drone to take a photo.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
Example: {{"drone_id": "drone-001"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
if not drone_id:
return "Error: drone_id is required"
result = client.take_photo(drone_id)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\"}}"
except Exception as e:
return f"Error taking photo: {str(e)}"
# ========== Two Parameter Tools ==========
@tool
def take_off(input_json: str) -> str:
"""Command a drone to take off to a specified altitude.
Drone must be on ground (idle or ready status).
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
- altitude: Target altitude in meters (optional, default: 10.0)
Example: {{"drone_id": "drone-001", "altitude": 15.0}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
altitude = params.get('altitude', 10.0)
if not drone_id:
return "Error: drone_id is required"
result = client.take_off(drone_id, altitude)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"altitude\": 15.0}}"
except Exception as e:
return f"Error during takeoff: {str(e)}"
@tool
def change_altitude(input_json: str) -> str:
"""Change a drone's altitude while maintaining X/Y position.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
- altitude: Target altitude in meters (required)
Example: {{"drone_id": "drone-001", "altitude": 20.0}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
altitude = params.get('altitude')
if not drone_id:
return "Error: drone_id is required"
if altitude is None:
return "Error: altitude is required"
result = client.change_altitude(drone_id, altitude)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"altitude\": 20.0}}"
except Exception as e:
return f"Error changing altitude: {str(e)}"
@tool
def rotate(input_json: str) -> str:
"""Rotate a drone to face a specific direction.
0=North, 90=East, 180=South, 270=West.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
- heading: Target heading in degrees 0-360 (required)
Example: {{"drone_id": "drone-001", "heading": 90.0}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
heading = params.get('heading')
if not drone_id:
return "Error: drone_id is required"
if heading is None:
return "Error: heading is required"
result = client.rotate(drone_id, heading)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"heading\": 90.0}}"
except Exception as e:
return f"Error rotating: {str(e)}"
@tool
def send_message(input_json: str) -> str:
"""Send a message from one drone to another.
Input should be a JSON string with:
- drone_id: The ID of the sender drone (required)
- target_drone_id: The ID of the recipient drone (required)
- message: The message content (required)
Example: {{"drone_id": "drone-001", "target_drone_id": "drone-002", "message": "Hello"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
target_drone_id = params.get('target_drone_id')
message = params.get('message')
if not drone_id:
return "Error: drone_id is required"
if not target_drone_id:
return "Error: target_drone_id is required"
if not message:
return "Error: message is required"
result = client.send_message(drone_id, target_drone_id, message)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"target_drone_id\": \"drone-002\", \"message\": \"...\"}}"
except Exception as e:
return f"Error sending message: {str(e)}"
@tool
def broadcast(input_json: str) -> str:
"""Broadcast a message from one drone to all other drones.
Input should be a JSON string with:
- drone_id: The ID of the sender drone (required)
- message: The message content (required)
Example: {{"drone_id": "drone-001", "message": "Alert"}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
message = params.get('message')
if not drone_id:
return "Error: drone_id is required"
if not message:
return "Error: message is required"
result = client.broadcast(drone_id, message)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"message\": \"...\"}}"
except Exception as e:
return f"Error broadcasting: {str(e)}"
@tool
def charge(input_json: str) -> str:
"""Command a drone to charge its battery.
Drone must be landed at a charging station.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
- charge_amount: Amount to charge in percent (required)
Example: {{"drone_id": "drone-001", "charge_amount": 25.0}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
charge_amount = params.get('charge_amount')
if not drone_id:
return "Error: drone_id is required"
if charge_amount is None:
return "Error: charge_amount is required"
result = client.charge(drone_id, charge_amount)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"charge_amount\": 25.0}}"
except Exception as e:
return f"Error charging: {str(e)}"
@tool
def move_towards(input_json: str) -> str:
"""Move a drone a specific distance in a direction.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
- distance: Distance to move in meters (required)
- heading: Heading direction in degrees 0-360 (optional, default: current heading)
- dz: Vertical component in meters (optional)
Example: {{"drone_id": "drone-001", "distance": 10.0, "heading": 90.0}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
distance = params.get('distance')
heading = params.get('heading')
dz = params.get('dz')
if not drone_id:
return "Error: drone_id is required"
if distance is None:
return "Error: distance is required"
result = client.move_towards(drone_id, distance, heading, dz)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"distance\": 10.0}}"
except Exception as e:
return f"Error moving towards: {str(e)}"
# @tool
# def move_along_path(input_json: str) -> str:
# """Move a drone along a path of waypoints.
# Input should be a JSON string with:
# - drone_id: The ID of the drone (required)
# - waypoints: List of points with x, y, z coordinates (required)
# Example: {{"drone_id": "drone-001", "waypoints": [{{"x": 10, "y": 10, "z": 10}}, {{"x": 20, "y": 20, "z": 10}}]}}
# """
# try:
# params = json.loads(input_json) if isinstance(input_json, str) else input_json
# drone_id = params.get('drone_id')
# waypoints = params.get('waypoints')
# if not drone_id:
# return "Error: drone_id is required"
# if not waypoints:
# return "Error: waypoints list is required"
# result = client.move_along_path(drone_id, waypoints)
# return json.dumps(result, indent=2)
# except json.JSONDecodeError as e:
# return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"waypoints\": [...]}}"
# except Exception as e:
# return f"Error moving along path: {str(e)}"
# ========== Multi-Parameter Tools ==========
@tool
def get_nearest_waypoint(input_json: str) -> str:
"""Get the nearest waypoint to a specific drone.
Input should be a JSON string with:
- x: The x-coordinate of the drone (required)
- y: The y-coordinate of the drone (required)
- z: The z-coordinate of the drone (required)
Example: {{"x": 0.0, "y": 0.0, "z": 0.0}}"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
x = params.get('x')
y = params.get('y')
z = params.get('z')
if x is None or y is None or z is None:
return "Error: x, y, and z coordinates are required"
nearest = client.get_nearest_waypoint(x, y, z)
return json.dumps(nearest, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"x\": 0.0, \"y\": 0.0, \"z\": 0.0}}"
except Exception as e:
return f"Error getting nearest waypoint: {str(e)}"
@tool
def move_to(input_json: str) -> str:
"""Move a drone to specific 3D coordinates (x, y, z).
Always check for collisions first using check_path_collision.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
- x: Target X coordinate in meters (required)
- y: Target Y coordinate in meters (required)
- z: Target Z coordinate (altitude) in meters (required)
Example: {{"drone_id": "drone-001", "x": 100.0, "y": 50.0, "z": 20.0}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
x = params.get('x')
y = params.get('y')
z = params.get('z')
if not drone_id:
return "Error: drone_id is required"
if x is None or y is None or z is None:
return "Error: x, y, and z coordinates are required"
result = client.move_to(drone_id, x, y, z)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"x\": 100.0, \"y\": 50.0, \"z\": 20.0}}"
except Exception as e:
return f"Error moving drone: {str(e)}"
@tool
def optimal_way_to(input_json: str) -> str:
"""Get the optimal path to a specific 3D coordinates (x, y, z).
Always check for collisions first using check_path_collision.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
- x: Target X coordinate in meters (required)
- y: Target Y coordinate in meters (required)
- z: Target Z coordinate (altitude) in meters (required)
Example: {{"drone_id": "drone-001", "x": 100.0, "y": 50.0, "z": 20.0}}
"""
try:
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
x = params.get('x')
y = params.get('y')
z = params.get('z')
if not drone_id:
return "Error: drone_id is required"
if x is None or y is None or z is None:
return "Error: x, y, and z coordinates are required"
result = client.optimal_way_to(drone_id, x, y, z)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"x\": 100.0, \"y\": 50.0, \"z\": 20.0}}"
except Exception as e:
return f"Error moving drone: {str(e)}"
# Return all tools
return [
list_drones,
get_drone_status,
get_session_info,
# get_session_data,
get_task_progress,
get_weather,
# get_targets,
get_obstacles,
get_nearby_entities,
take_off,
land,
move_to,
# optimal_way_to,
move_towards,
change_altitude,
hover,
rotate,
return_home,
set_home,
calibrate,
take_photo,
send_message,
broadcast,
charge,
get_nearest_waypoint,
get_all_waypoints,
get_targets
]