370 lines
16 KiB
Python
370 lines
16 KiB
Python
"""
|
||
UAV API Client
|
||
Wrapper for the UAV Control System API to simplify drone operations
|
||
"""
|
||
import math
from typing import Dict, List, Any, Tuple, Optional

import requests
|
||
|
||
|
||
class UAVAPIClient:
    """Client for interacting with the UAV Control System API.

    Most public methods are thin wrappers that issue exactly one HTTP call
    through ``_request``. The exception is ``optimal_way_to``, which plans a
    horizontal detour using repeated path-collision checks and then sends one
    ``move_to`` command per planned point.
    """

    def __init__(self, base_url: str = "http://localhost:8000", api_key: Optional[str] = None,
                 timeout: Optional[float] = None):
        """
        Initialize UAV API Client

        Args:
            base_url: Base URL of the UAV API server (a trailing '/' is stripped)
            api_key: Optional API key for authentication (defaults to USER role if not provided)
                - None or empty: USER role (basic access)
                - Valid key: SYSTEM or ADMIN role (based on key)
            timeout: Optional default timeout (seconds) applied to every request
                that does not pass its own ``timeout``. ``None`` keeps the
                previous behaviour of not setting a timeout at all.
        """
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.timeout = timeout
        # Authentication headers sent with every request; empty when no key is set.
        self.headers = {}
        if self.api_key:
            self.headers['X-API-Key'] = self.api_key

    @staticmethod
    def _extract_error_detail(response: Any, default: str = 'Access denied') -> Any:
        """Return the 'detail' field of a JSON error body, or *default* if unavailable."""
        try:
            payload = response.json()
        except ValueError:
            # Body is not valid JSON (JSON decode errors subclass ValueError).
            return default
        if isinstance(payload, dict):
            return payload.get('detail', default)
        return default

    def _request(self, method: str, endpoint: str, **kwargs) -> Any:
        """Make HTTP request to the API.

        Args:
            method: HTTP method name (e.g. 'GET', 'POST').
            endpoint: Path appended verbatim to ``base_url``.
            **kwargs: Forwarded to ``requests.request`` (``params``, ``json``, ...).
                A ``headers`` entry is merged with the authentication headers.

        Returns:
            The decoded JSON body, or ``None`` for a 204 response.

        Raises:
            Exception: On 401 ("Authentication failed"), 403 ("Permission denied")
                or any other request failure ("API request failed"). The
                underlying ``requests`` exception is attached as ``__cause__``.
        """
        url = f"{self.base_url}{endpoint}"

        # Build a fresh dict so a caller-supplied headers mapping is never
        # mutated; authentication headers still win on key conflicts.
        headers = {**(kwargs.pop('headers', None) or {}), **self.headers}

        if self.timeout is not None:
            kwargs.setdefault('timeout', self.timeout)

        try:
            response = requests.request(method, url, headers=headers, **kwargs)
            response.raise_for_status()
            if response.status_code == 204:
                return None
            return response.json()
        except requests.exceptions.HTTPError as e:
            status_code = e.response.status_code if e.response is not None else None
            if status_code == 401:
                raise Exception("Authentication failed: Invalid API key") from e
            if status_code == 403:
                error_detail = self._extract_error_detail(e.response)
                raise Exception(f"Permission denied: {error_detail}") from e
            raise Exception(f"API request failed: {e}") from e
        except requests.exceptions.RequestException as e:
            raise Exception(f"API request failed: {e}") from e

    # Drone Operations
    def list_drones(self) -> List[Dict[str, Any]]:
        """Get all drones in the current session"""
        return self._request('GET', '/drones')

    def get_all_waypoints(self) -> List[Dict[str, Any]]:
        """Get all waypoints in the current session (same endpoint as ``get_waypoints``)"""
        return self.get_waypoints()

    def get_drone_status(self, drone_id: str) -> Dict[str, Any]:
        """Get detailed status of a specific drone"""
        return self._request('GET', f'/drones/{drone_id}')

    def take_off(self, drone_id: str, altitude: float = 10.0) -> Dict[str, Any]:
        """Command drone to take off to specified altitude"""
        return self._request('POST', f'/drones/{drone_id}/command/take_off',
                             params={'altitude': altitude})

    def land(self, drone_id: str) -> Dict[str, Any]:
        """Command drone to land at current position"""
        return self._request('POST', f'/drones/{drone_id}/command/land')

    def move_to(self, drone_id: str, x: float, y: float, z: float) -> Dict[str, Any]:
        """Move drone to specific coordinates"""
        return self._request('POST', f'/drones/{drone_id}/command/move_to',
                             params={'x': x, 'y': y, 'z': z})

    def optimal_way_to(self, drone_id: str, x: float, y: float, z: float,
                       min_safe_height: float = 0.5, avoidance_radius: float = 2.0,
                       max_depth: int = 4) -> List[Dict[str, float]]:
        """
        Plan a path to the target that detours horizontally around obstacles,
        then fly it.

        The drone's current position is read with ``get_drone_status``; the
        path is searched with ``_find_path_recursive``; finally one ``move_to``
        command is issued per planned point, in order.

        Args:
            drone_id: ID of the drone
            x, y, z: Target coordinates
            min_safe_height: Targets (and candidate detour points) below this
                altitude are rejected
            avoidance_radius: Initial sideways offset used for detour points
            max_depth: Maximum recursion depth of the detour search

        Returns:
            The list of points that were sent to ``move_to`` (start point not
            included), or ``[]`` when the target is too low or no collision-free
            path was found. In the ``[]`` cases no movement command is sent.
        """
        # 1. Target altitude check
        if z < min_safe_height:
            print(f"Error: Target altitude {z}m is too low.")
            return []

        status = self.get_drone_status(drone_id)
        start_pos = status['position']
        start_coords = (start_pos['x'], start_pos['y'], start_pos['z'])
        end_coords = (x, y, z)

        # 2. Start altitude check (warning only; planning still proceeds)
        if start_coords[2] < min_safe_height:
            print("Warning: Drone is currently below safe height!")

        # 3. Run the recursive search
        path_points = self._find_path_recursive(
            start_coords,
            end_coords,
            avoidance_radius=avoidance_radius,
            depth=0,
            max_depth=max_depth,
            min_safe_height=min_safe_height
        )

        if path_points is None:
            print("Error: Unable to find a collision-free path.")
            return []

        # 4. Format the result and fly each leg
        formatted_path = [{"x": px, "y": py, "z": pz} for px, py, pz in path_points]
        for point in formatted_path:
            self.move_to(drone_id, **point)
        return formatted_path

    # --- Recursive core ---
    def _find_path_recursive(self, start: Tuple[float, float, float], end: Tuple[float, float, float],
                             avoidance_radius: float, depth: int, max_depth: int,
                             min_safe_height: float) -> Optional[List[Tuple[float, float, float]]]:
        """
        Search for a collision-free polyline from *start* to *end*.

        If the straight segment is reported as colliding, the segment is split
        at a point offset sideways from its midpoint and both halves are solved
        recursively. Every level issues a ``check_path_collision`` call.

        Returns:
            The points to visit after *start* (ending with *end*), or ``None``
            when no path was found within *max_depth*.
        """
        sx, sy, sz = start
        ex, ey, ez = end

        # 1. A straight segment without collision needs no detour.
        #    Any falsy result from the collision check counts as "clear".
        if not self.check_path_collision(sx, sy, sz, ex, ey, ez):
            return [end]

        # 2. Stop once the depth limit is reached.
        if depth >= max_depth:
            return None

        # 3. Midpoint of the blocked segment.
        mid_point = ((sx + ex) / 2, (sy + ey) / 2, (sz + ez) / 2)

        # Shrink the detour offset as depth grows, for a finer search.
        current_radius = avoidance_radius / (1 + 0.5 * depth)

        # 4./5. Try the right-hand candidate first, then the left-hand one.
        for candidate in self._get_horizontal_avoidance_points(start, end, mid_point, current_radius):
            # Horizontal offsets keep the midpoint altitude, but the midpoint
            # itself may be below the safe height.
            if candidate[2] < min_safe_height:
                continue

            # start -> candidate
            path_first = self._find_path_recursive(
                start, candidate, avoidance_radius, depth + 1, max_depth, min_safe_height)
            if path_first is None:
                continue

            # candidate -> end
            path_second = self._find_path_recursive(
                candidate, end, avoidance_radius, depth + 1, max_depth, min_safe_height)
            if path_second is not None:
                return path_first + path_second

        # Both sideways attempts failed.
        return None

    # --- Vector computation ---
    def _get_horizontal_avoidance_points(self, start, end, mid, radius) -> List[Tuple[float, float, float]]:
        """
        Generate the two detour candidates: *mid* shifted right and left, in the
        horizontal plane only.

        The right vector is the horizontal flight direction ``(dx, dy)`` rotated
        90 degrees clockwise, i.e. ``(dy, -dx)``, normalised. For a purely
        vertical segment (no horizontal extent) the X axis is used instead.

        Returns:
            ``[right_point, left_point]``, both at the altitude of *mid*.
        """
        # Horizontal flight direction; dz is irrelevant for a horizontal normal.
        dx = end[0] - start[0]
        dy = end[1] - start[1]
        dist_horizontal = math.hypot(dx, dy)

        if dist_horizontal == 0:
            # Vertical climb/descent: "left/right" is undefined, pick the X axis.
            rx, ry = 1.0, 0.0
        else:
            rx = dy / dist_horizontal
            ry = -dx / dist_horizontal

        mx, my, mz = mid
        offset_x = rx * radius
        offset_y = ry * radius
        return [
            (mx + offset_x, my + offset_y, mz),  # right-hand side
            (mx - offset_x, my - offset_y, mz),  # left-hand side
        ]

    def move_along_path(self, drone_id: str, waypoints: List[Dict[str, float]]) -> Dict[str, Any]:
        """Move drone along a path of waypoints"""
        return self._request('POST', f'/drones/{drone_id}/command/move_along_path',
                             json={'waypoints': waypoints})

    def change_altitude(self, drone_id: str, altitude: float) -> Dict[str, Any]:
        """Change drone altitude while maintaining X/Y position"""
        return self._request('POST', f'/drones/{drone_id}/command/change_altitude',
                             params={'altitude': altitude})

    def hover(self, drone_id: str, duration: Optional[float] = None) -> Dict[str, Any]:
        """
        Command drone to hover at current position.

        Args:
            drone_id: ID of the drone
            duration: Optional duration to hover in seconds (omitted from the
                request when None)
        """
        params = {}
        if duration is not None:
            params['duration'] = duration
        return self._request('POST', f'/drones/{drone_id}/command/hover', params=params)

    def rotate(self, drone_id: str, heading: float) -> Dict[str, Any]:
        """Rotate drone to face specific direction (0-360 degrees)"""
        return self._request('POST', f'/drones/{drone_id}/command/rotate',
                             params={'heading': heading})

    def move_towards(self, drone_id: str, distance: float, heading: Optional[float] = None,
                     dz: Optional[float] = None) -> Dict[str, Any]:
        """
        Move drone a specific distance in a direction.

        Args:
            drone_id: ID of the drone
            distance: Distance to move in meters
            heading: Optional heading direction (0-360). If None, uses current heading.
            dz: Optional vertical component (altitude change)
        """
        params = {'distance': distance}
        if heading is not None:
            params['heading'] = heading
        if dz is not None:
            params['dz'] = dz
        return self._request('POST', f'/drones/{drone_id}/command/move_towards', params=params)

    def return_home(self, drone_id: str) -> Dict[str, Any]:
        """Command drone to return to home position"""
        return self._request('POST', f'/drones/{drone_id}/command/return_home')

    def set_home(self, drone_id: str) -> Dict[str, Any]:
        """Set current position as home position"""
        return self._request('POST', f'/drones/{drone_id}/command/set_home')

    def calibrate(self, drone_id: str) -> Dict[str, Any]:
        """Calibrate drone sensors"""
        return self._request('POST', f'/drones/{drone_id}/command/calibrate')

    def charge(self, drone_id: str, charge_amount: float) -> Dict[str, Any]:
        """Charge drone battery (when landed)"""
        return self._request('POST', f'/drones/{drone_id}/command/charge',
                             params={'charge_amount': charge_amount})

    def take_photo(self, drone_id: str) -> Dict[str, Any]:
        """Take a photo with drone camera"""
        return self._request('POST', f'/drones/{drone_id}/command/take_photo')

    def send_message(self, drone_id: str, target_drone_id: str, message: str) -> Dict[str, Any]:
        """
        Send a message to another drone.

        Args:
            drone_id: ID of the sender drone
            target_drone_id: ID of the recipient drone
            message: Content of the message
        """
        return self._request('POST', f'/drones/{drone_id}/command/send_message',
                             params={'target_drone_id': target_drone_id, 'message': message})

    def broadcast(self, drone_id: str, message: str) -> Dict[str, Any]:
        """
        Broadcast a message to all other drones.

        Args:
            drone_id: ID of the sender drone
            message: Content of the message
        """
        return self._request('POST', f'/drones/{drone_id}/command/broadcast',
                             params={'message': message})

    # Session Operations
    def get_current_session(self) -> Dict[str, Any]:
        """Get information about current mission session"""
        return self._request('GET', '/sessions/current')

    def get_session_data(self, session_id: str = 'current') -> Dict[str, Any]:
        """Get all entities in a session (drones, targets, obstacles, environment)"""
        return self._request('GET', f'/sessions/{session_id}/data')

    def get_task_progress(self, session_id: str = 'current') -> Dict[str, Any]:
        """Get mission task completion progress"""
        return self._request('GET', f'/sessions/{session_id}/task-progress')

    # Environment Operations
    def get_weather(self) -> Dict[str, Any]:
        """Get current weather conditions"""
        return self._request('GET', '/environments/current')

    # def get_targets(self) -> List[Dict[str, Any]]:
    #     """Get all targets in the session"""
    #     fixed = self._request('GET', '/targets/type/fixed')
    #     moving = self._request('GET', '/targets/type/moving')
    #     waypoint = self._request('GET', '/targets/type/waypoint')
    #     circle = self._request('GET', '/targets/type/circle')
    #     polygen = self._request('GET', '/targets/type/polygon')
    #     return fixed + moving + waypoint + circle + polygen

    def get_target_status(self, target_id: str) -> Dict[str, Any]:
        """Get information about a specific target"""
        return self._request('GET', f'/targets/{target_id}')

    def get_waypoints(self) -> List[Dict[str, Any]]:
        """Get all charging station waypoints"""
        return self._request('GET', '/targets/type/waypoint')

    def get_nearest_waypoint(self, x: float, y: float, z: float) -> Dict[str, Any]:
        """Get nearest charging station waypoint to the given coordinates"""
        # NOTE(review): the coordinates are sent as a JSON body on a GET request,
        # whereas the other coordinate-taking calls in this client use query
        # params - confirm against the server before changing.
        return self._request('GET', '/targets/waypoints/nearest',
                             json={'x': x, 'y': y, 'z': z})

    # def get_obstacles(self) -> List[Dict[str, Any]]:
    #     """Get all obstacles in the session"""
    #     point = self._request('GET', '/obstacles/type/point')
    #     circle = self._request('GET', '/obstacles/type/circle')
    #     polygon = self._request('GET', '/obstacles/type/polygon')
    #     ellipse = self._request('GET', '/obstacles/type/ellipse')
    #     return point + circle + polygon + ellipse

    def get_nearby_entities(self, drone_id: str) -> Dict[str, Any]:
        """Get entities near a drone (within perceived radius)"""
        return self._request('GET', f'/drones/{drone_id}/nearby')

    # Safety Operations
    def check_point_collision(self, x: float, y: float, z: float,
                              safety_margin: float = 0.0) -> Optional[Dict[str, Any]]:
        """Check if a point collides with any obstacle"""
        payload = {
            'point': {'x': x, 'y': y, 'z': z},
            'safety_margin': safety_margin
        }
        return self._request('POST', '/obstacles/collision/check', json=payload)

    def check_path_collision(self, start_x: float, start_y: float, start_z: float,
                             end_x: float, end_y: float, end_z: float,
                             safety_margin: float = 1.0) -> Optional[Dict[str, Any]]:
        """Check if a path intersects any obstacle"""
        payload = {
            'start': {'x': start_x, 'y': start_y, 'z': start_z},
            'end': {'x': end_x, 'y': end_y, 'z': end_z},
            'safety_margin': safety_margin
        }
        return self._request('POST', '/obstacles/collision/path', json=payload)
|