first commit
Some checks failed
Rust / Clippy (push) Has been cancelled
Rust / Test (nightly) (push) Has been cancelled
Rust / Test (stable) (push) Has been cancelled

This commit is contained in:
2026-01-04 16:50:19 +08:00
commit 6675986579
60 changed files with 11043 additions and 0 deletions

30
libmaestro/Cargo.toml Normal file
View File

@@ -0,0 +1,30 @@
[package]
name = "maestro"
authors = ["Maximilian Luz <m@mxnluz.io>"]
version = "0.1.5"
edition = "2024"
license = "MIT/Apache-2.0"
description = "Maestro protocol client implementation for controlling Google Pixel Buds Pro"
repository = "https://github.com/qzed/pbpctrl"
[dependencies]
arrayvec = "0.7.6"
bytes = "1.10.1"
futures = "0.3.31"
num_enum = "0.7.3"
prost = "0.13.5"
tokio = { version = "1.44.2", features = ["macros"] }
tokio-util = { version = "0.7.14", features = ["codec"] }
tracing = "0.1.41"
uuid = "1.16.0"
[build-dependencies]
prost-build = "0.13.5"
[dev-dependencies]
anyhow = "1.0.98"
bluer = { version = "0.17.4", features = ["bluetoothd", "rfcomm"] }
futures = "0.3.31"
pretty-hex = "0.4.1"
tokio = { version = "1.44.2", features = ["rt", "macros", "signal"] }
tracing-subscriber = "0.3.19"

7
libmaestro/build.rs Normal file
View File

@@ -0,0 +1,7 @@
use std::io::Result;
fn main() -> Result<()> {
prost_build::compile_protos(&["proto/pw.rpc.packet.proto"], &["proto/"])?;
prost_build::compile_protos(&["proto/maestro_pw.proto"], &["proto/"])?;
Ok(())
}

View File

@@ -0,0 +1,89 @@
use std::time::Duration;
use anyhow::Result;
use bluer::{Address, Device, Session};
use bluer::rfcomm::{ProfileHandle, Role, ReqError, Stream, Profile};
use futures::StreamExt;
use maestro::pwrpc::Error;
use maestro::pwrpc::client::Client;
use maestro::pwrpc::types::RpcPacket;
pub async fn run_client<S, E>(mut client: Client<S>) -> Result<()>
where
S: futures::Sink<RpcPacket>,
S: futures::Stream<Item = Result<RpcPacket, E>> + Unpin,
Error: From<E>,
Error: From<S::Error>,
{
tokio::select! {
res = client.run() => {
res?;
},
sig = tokio::signal::ctrl_c() => {
sig?;
tracing::trace!("client termination requested");
},
}
client.terminate().await?;
Ok(())
}
pub async fn connect_maestro_rfcomm(session: &Session, dev: &Device) -> Result<Stream> {
let maestro_profile = Profile {
uuid: maestro::UUID,
role: Some(Role::Client),
require_authentication: Some(false),
require_authorization: Some(false),
auto_connect: Some(false),
..Default::default()
};
tracing::debug!("registering maestro profile");
let mut handle = session.register_profile(maestro_profile).await?;
tracing::debug!("connecting to maestro profile");
let stream = tokio::try_join!(
try_connect_profile(dev),
handle_requests_for_profile(&mut handle, dev.address()),
)?.1;
Ok(stream)
}
async fn try_connect_profile(dev: &Device) -> Result<()> {
const RETRY_TIMEOUT: Duration = Duration::from_secs(1);
const MAX_TRIES: u32 = 3;
let mut i = 0;
while let Err(err) = dev.connect_profile(&maestro::UUID).await {
if i >= MAX_TRIES { return Err(err.into()) }
i += 1;
tracing::warn!(error=?err, "connecting to profile failed, trying again ({}/{})", i, MAX_TRIES);
tokio::time::sleep(RETRY_TIMEOUT).await;
}
tracing::debug!(address=%dev.address(), "maestro profile connected");
Ok(())
}
async fn handle_requests_for_profile(handle: &mut ProfileHandle, address: Address) -> Result<Stream> {
while let Some(req) = handle.next().await {
tracing::debug!(address=%req.device(), "received new profile connection request");
if req.device() == address {
tracing::debug!(address=%req.device(), "accepting profile connection request");
return Ok(req.accept()?);
} else {
req.reject(ReqError::Rejected);
}
}
anyhow::bail!("profile terminated without requests")
}

View File

@@ -0,0 +1,139 @@
//! Simple example for reading battery info via the Maestro service.
//!
//! Usage:
//! cargo run --example maestro_get_battery -- <bluetooth-device-address>
mod common;
use std::str::FromStr;
use anyhow::bail;
use bluer::{Address, Session};
use futures::StreamExt;
use maestro::protocol::codec::Codec;
use maestro::protocol::types::RuntimeInfo;
use maestro::protocol::utils;
use maestro::pwrpc::client::{Client, ClientHandle};
use maestro::service::MaestroService;
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), anyhow::Error> {
tracing_subscriber::fmt::init();
// handle command line arguments
let addr = std::env::args().nth(1).expect("need device address as argument");
let addr = Address::from_str(&addr)?;
// set up session
let session = Session::new().await?;
let adapter = session.default_adapter().await?;
println!("Using adapter '{}'", adapter.name());
// get device
let dev = adapter.device(addr)?;
let uuids = {
let mut uuids = Vec::from_iter(dev.uuids().await?
.unwrap_or_default()
.into_iter());
uuids.sort_unstable();
uuids
};
println!("Found device:");
println!(" alias: {}", dev.alias().await?);
println!(" address: {}", dev.address());
println!(" paired: {}", dev.is_paired().await?);
println!(" connected: {}", dev.is_connected().await?);
println!(" UUIDs:");
for uuid in uuids {
println!(" {}", uuid);
}
println!();
println!("Connecting to Maestro profile");
let stream = common::connect_maestro_rfcomm(&session, &dev).await?;
println!("Profile connected");
// set up stream for RPC communication
let codec = Codec::new();
let stream = codec.wrap(stream);
// set up RPC client
let mut client = Client::new(stream);
let handle = client.handle();
// retreive the channel numer
let channel = utils::resolve_channel(&mut client).await?;
let exec_task = common::run_client(client);
let battery_task = get_battery(handle, channel);
let info = tokio::select! {
res = exec_task => {
match res {
Ok(_) => bail!("client terminated unexpectedly without error"),
Err(e) => Err(e),
}
},
res = battery_task => res,
}?;
let info = info.battery_info
.expect("did not receive battery status in runtime-info-changed event");
println!("Battery status:");
if let Some(info) = info.case {
match info.state {
1 => println!(" case: {}% (not charging)", info.level),
2 => println!(" case: {}% (charging)", info.level),
x => println!(" case: {}% (unknown state: {})", info.level, x),
}
} else {
println!(" case: unknown");
}
if let Some(info) = info.left {
match info.state {
1 => println!(" left: {}% (not charging)", info.level),
2 => println!(" left: {}% (charging)", info.level),
x => println!(" left: {}% (unknown state: {})", info.level, x),
}
} else {
println!(" left: unknown");
}
if let Some(info) = info.right {
match info.state {
1 => println!(" right: {}% (not charging)", info.level),
2 => println!(" right: {}% (charging)", info.level),
x => println!(" right: {}% (unknown state: {})", info.level, x),
}
} else {
println!(" right: unknown");
}
Ok(())
}
async fn get_battery(handle: ClientHandle, channel: u32) -> anyhow::Result<RuntimeInfo> {
println!("Reading battery info...");
println!();
let mut service = MaestroService::new(handle, channel);
let mut call = service.subscribe_to_runtime_info()?;
let rt_info = if let Some(msg) = call.stream().next().await {
msg?
} else {
bail!("did not receive any runtime-info event");
};
call.cancel_and_wait().await?;
Ok(rt_info)
}

View File

@@ -0,0 +1,172 @@
//! Simple example for listening to Maestro messages sent via the RFCOMM channel.
//!
//! Usage:
//! cargo run --example maestro_listen -- <bluetooth-device-address>
mod common;
use std::str::FromStr;
use bluer::{Address, Session};
use futures::StreamExt;
use maestro::protocol::codec::Codec;
use maestro::protocol::utils;
use maestro::pwrpc::client::{Client, ClientHandle};
use maestro::service::{MaestroService, DosimeterService};
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), anyhow::Error> {
tracing_subscriber::fmt::init();
// handle command line arguments
let addr = std::env::args().nth(1).expect("need device address as argument");
let addr = Address::from_str(&addr)?;
// set up session
let session = Session::new().await?;
let adapter = session.default_adapter().await?;
println!("Using adapter '{}'", adapter.name());
// get device
let dev = adapter.device(addr)?;
let uuids = {
let mut uuids = Vec::from_iter(dev.uuids().await?
.unwrap_or_default()
.into_iter());
uuids.sort_unstable();
uuids
};
println!("Found device:");
println!(" alias: {}", dev.alias().await?);
println!(" address: {}", dev.address());
println!(" paired: {}", dev.is_paired().await?);
println!(" connected: {}", dev.is_connected().await?);
println!(" UUIDs:");
for uuid in uuids {
println!(" {}", uuid);
}
println!();
// try to reconnect if connection is reset
loop {
println!("Connecting to Maestro profile");
let stream = common::connect_maestro_rfcomm(&session, &dev).await?;
println!("Profile connected");
// set up stream for RPC communication
let codec = Codec::new();
let stream = codec.wrap(stream);
// set up RPC client
let mut client = Client::new(stream);
let handle = client.handle();
// retreive the channel numer
let channel = utils::resolve_channel(&mut client).await?;
let exec_task = common::run_client(client);
let listen_task = run_listener(handle, channel);
tokio::select! {
res = exec_task => {
match res {
Ok(_) => {
tracing::trace!("client terminated successfully");
return Ok(());
},
Err(e) => {
tracing::error!("client task terminated with error");
let cause = e.root_cause();
if let Some(cause) = cause.downcast_ref::<std::io::Error>() {
if cause.raw_os_error() == Some(104) {
// The Pixel Buds Pro can hand off processing between each
// other. On a switch, the connection is reset. Wait a bit
// and then try to reconnect.
println!();
println!("Connection reset. Attempting to reconnect...");
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
continue;
}
}
return Err(e);
},
}
},
res = listen_task => {
match res {
Ok(_) => {
tracing::error!("server terminated stream");
return Ok(());
}
Err(e) => {
tracing::error!("main task terminated with error");
return Err(e);
}
}
},
}
}
}
async fn run_listener(handle: ClientHandle, channel: u32) -> anyhow::Result<()> {
println!("Sending GetSoftwareInfo request");
println!();
let mut service = MaestroService::new(handle.clone(), channel);
let mut dosimeter = DosimeterService::new(handle, channel);
let info = service.get_software_info().await?;
println!("{:#?}", info);
let info = dosimeter.fetch_daily_summaries().await?;
println!("{:#?}", info);
println!();
println!("Listening to settings changes...");
println!();
let task_rtinfo = run_listener_rtinfo(service.clone());
let task_settings = run_listener_settings(service.clone());
let task_dosimeter = run_listener_dosimeter(dosimeter.clone());
tokio::select! {
res = task_rtinfo => res,
res = task_settings => res,
res = task_dosimeter => res,
}
}
async fn run_listener_rtinfo(mut service: MaestroService) -> anyhow::Result<()> {
let mut call = service.subscribe_to_runtime_info()?;
while let Some(msg) = call.stream().next().await {
println!("{:#?}", msg?);
}
Ok(())
}
async fn run_listener_settings(mut service: MaestroService) -> anyhow::Result<()> {
let mut call = service.subscribe_to_settings_changes()?;
while let Some(msg) = call.stream().next().await {
println!("{:#?}", msg?);
}
Ok(())
}
async fn run_listener_dosimeter(mut service: DosimeterService) -> anyhow::Result<()> {
let mut call = service.subscribe_to_live_db()?;
while let Some(msg) = call.stream().next().await {
println!("volume: {:#?} dB", (msg.unwrap().intensity.log10() * 10.0).round());
}
Ok(())
}

View File

@@ -0,0 +1,152 @@
//! Simple example for reading settings on the Pixel Buds Pro via the Maestro service.
//!
//! Usage:
//! cargo run --example maestro_read_settings -- <bluetooth-device-address>
mod common;
use std::str::FromStr;
use anyhow::bail;
use bluer::{Address, Session};
use maestro::protocol::codec::Codec;
use maestro::protocol::utils;
use maestro::pwrpc::client::{Client, ClientHandle};
use maestro::service::MaestroService;
use maestro::service::settings::{self, SettingId};
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), anyhow::Error> {
tracing_subscriber::fmt::init();
// handle command line arguments
let addr = std::env::args().nth(1).expect("need device address as argument");
let addr = Address::from_str(&addr)?;
// set up session
let session = Session::new().await?;
let adapter = session.default_adapter().await?;
println!("Using adapter '{}'", adapter.name());
// get device
let dev = adapter.device(addr)?;
let uuids = {
let mut uuids = Vec::from_iter(dev.uuids().await?
.unwrap_or_default()
.into_iter());
uuids.sort_unstable();
uuids
};
println!("Found device:");
println!(" alias: {}", dev.alias().await?);
println!(" address: {}", dev.address());
println!(" paired: {}", dev.is_paired().await?);
println!(" connected: {}", dev.is_connected().await?);
println!(" UUIDs:");
for uuid in uuids {
println!(" {}", uuid);
}
println!();
println!("Connecting to Maestro profile");
let stream = common::connect_maestro_rfcomm(&session, &dev).await?;
println!("Profile connected");
// set up stream for RPC communication
let codec = Codec::new();
let stream = codec.wrap(stream);
// set up RPC client
let mut client = Client::new(stream);
let handle = client.handle();
// retreive the channel numer
let channel = utils::resolve_channel(&mut client).await?;
let exec_task = common::run_client(client);
let settings_task = read_settings(handle, channel);
tokio::select! {
res = exec_task => {
match res {
Ok(_) => bail!("client terminated unexpectedly without error"),
Err(e) => Err(e),
}
},
res = settings_task => res,
}
}
async fn read_settings(handle: ClientHandle, channel: u32) -> anyhow::Result<()> {
let mut service = MaestroService::new(handle.clone(), channel);
println!();
println!("Read via types:");
// read some typed settings via proxy structs
let value = service.read_setting(settings::id::AutoOtaEnable).await?;
println!(" Auto-OTA enabled: {}", value);
let value = service.read_setting(settings::id::OhdEnable).await?;
println!(" OHD enabled: {}", value);
let value = service.read_setting(settings::id::OobeIsFinished).await?;
println!(" OOBE finished: {}", value);
let value = service.read_setting(settings::id::GestureEnable).await?;
println!(" Gestures enabled: {}", value);
let value = service.read_setting(settings::id::DiagnosticsEnable).await?;
println!(" Diagnostics enabled: {}", value);
let value = service.read_setting(settings::id::OobeMode).await?;
println!(" OOBE mode: {}", value);
let value = service.read_setting(settings::id::GestureControl).await?;
println!(" Gesture control: {}", value);
let value = service.read_setting(settings::id::MultipointEnable).await?;
println!(" Multi-point enabled: {}", value);
let value = service.read_setting(settings::id::AncrGestureLoop).await?;
println!(" ANCR gesture loop: {}", value);
let value = service.read_setting(settings::id::CurrentAncrState).await?;
println!(" ANC status: {}", value);
let value = service.read_setting(settings::id::OttsMode).await?;
println!(" OTTS mode: {}", value);
let value = service.read_setting(settings::id::VolumeEqEnable).await?;
println!(" Volume-EQ enabled: {}", value);
let value = service.read_setting(settings::id::CurrentUserEq).await?;
println!(" Current user EQ: {}", value);
let value = service.read_setting(settings::id::VolumeAsymmetry).await?;
println!(" Volume balance/asymmetry: {}", value);
let value = service.read_setting(settings::id::SumToMono).await?;
println!(" Mono output: {}", value);
let value = service.read_setting(settings::id::VolumeExposureNotifications).await?;
println!(" Volume level exposure notifications: {}", value);
let value = service.read_setting(settings::id::SpeechDetection).await?;
println!(" Speech detection: {}", value);
// read settings via variant
println!();
println!("Read via variants:");
let value = service.read_setting(SettingId::GestureEnable).await?;
println!(" Gesture enable: {:?}", value);
Ok(())
}

View File

@@ -0,0 +1,106 @@
//! Simple example for changing settings on the Pixel Buds Pro via the Maestro service.
//!
//! Sets active nois ecancelling (ANC) state. 1: off, 2: active, 3: aware, 4.adaptive
//!
//! Usage:
//! cargo run --example maestro_write_settings -- <bluetooth-device-address> <anc-state>
mod common;
use std::str::FromStr;
use anyhow::bail;
use bluer::{Address, Session};
use maestro::protocol::utils;
use num_enum::FromPrimitive;
use maestro::protocol::codec::Codec;
use maestro::pwrpc::client::{Client, ClientHandle};
use maestro::service::MaestroService;
use maestro::service::settings::{AncState, SettingValue};
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), anyhow::Error> {
tracing_subscriber::fmt::init();
// handle command line arguments
let addr = std::env::args().nth(1).expect("need device address as argument");
let addr = Address::from_str(&addr)?;
let anc_state = std::env::args().nth(2).expect("need ANC state as argument");
let anc_state = i32::from_str(&anc_state)?;
let anc_state = AncState::from_primitive(anc_state);
if let AncState::Unknown(x) = anc_state {
bail!("invalid ANC state {x}");
}
// set up session
let session = Session::new().await?;
let adapter = session.default_adapter().await?;
println!("Using adapter '{}'", adapter.name());
// get device
let dev = adapter.device(addr)?;
let uuids = {
let mut uuids = Vec::from_iter(dev.uuids().await?
.unwrap_or_default()
.into_iter());
uuids.sort_unstable();
uuids
};
println!("Found device:");
println!(" alias: {}", dev.alias().await?);
println!(" address: {}", dev.address());
println!(" paired: {}", dev.is_paired().await?);
println!(" connected: {}", dev.is_connected().await?);
println!(" UUIDs:");
for uuid in uuids {
println!(" {}", uuid);
}
println!();
println!("Connecting to Maestro profile");
let stream = common::connect_maestro_rfcomm(&session, &dev).await?;
println!("Profile connected");
// set up stream for RPC communication
let codec = Codec::new();
let stream = codec.wrap(stream);
// set up RPC client
let mut client = Client::new(stream);
let handle = client.handle();
// retreive the channel numer
let channel = utils::resolve_channel(&mut client).await?;
let exec_task = common::run_client(client);
let settings_task = read_settings(handle, channel, anc_state);
tokio::select! {
res = exec_task => {
match res {
Ok(_) => bail!("client terminated unexpectedly without error"),
Err(e) => Err(e),
}
},
res = settings_task => res,
}
}
async fn read_settings(handle: ClientHandle, channel: u32, anc_state: AncState) -> anyhow::Result<()> {
let mut service = MaestroService::new(handle.clone(), channel);
println!();
println!("Setting ANC status to '{}'", anc_state);
service.write_setting(SettingValue::CurrentAncrState(anc_state)).await?;
Ok(())
}

View File

@@ -0,0 +1,301 @@
syntax = "proto3";
package maestro_pw;
import "google/protobuf/empty.proto";
/* -- Maestro Service --------------------------------------------------------------------------- */
message SoftwareInfo {
int32 unknown2 = 2;
FirmwareInfo firmware = 4;
fixed64 unknown5 = 5;
bool unknown6 = 6;
}
message FirmwareInfo {
// Note: order might not be correct
FirmwareVersion case = 1;
FirmwareVersion right = 2;
FirmwareVersion left = 3;
}
message FirmwareVersion {
string unknown = 1;
string version_string = 2;
}
message HardwareInfo {
int32 unknown1 = 1;
int32 unknown2 = 2;
int32 unknown5 = 5;
int32 unknown6 = 6;
SerialNumbers serial_number = 7;
}
message SerialNumbers {
string case = 1;
string right = 2;
string left = 3;
}
message RuntimeInfo {
int64 timestamp_ms = 2; // maybe unix time in ms (consistent ~60s difference to actual time)
int32 unknown3 = 3;
BatteryInfo battery_info = 6;
PlacementInfo placement = 7;
}
message BatteryInfo {
DeviceBatteryInfo case = 1;
DeviceBatteryInfo left = 2;
DeviceBatteryInfo right = 3;
}
message DeviceBatteryInfo {
int32 level = 1; // battery level in percent
BatteryState state = 2;
}
enum BatteryState {
BATTERY_STATE_UNKNOWN = 0;
BATTERY_NOT_CHARGING = 1;
BATTERY_CHARGING = 2;
}
message PlacementInfo {
bool right_bud_in_case = 1;
bool left_bud_in_case = 2;
}
message WallClockMsg {
// TODO
}
message ReadSettingMsg {
oneof value_oneof {
AllegroSettingType settings_id = 4;
}
}
enum AllegroSettingType {
ALLEGRO_SETTING_TYPE_UNKNOWN = 0;
ALLEGRO_AUTO_OTA_ENABLE = 1;
ALLEGRO_OHD_ENABLE = 2;
ALLEGRO_OOBE_IS_FINISHED = 3;
ALLEGRO_GESTURE_ENABLE = 4;
ALLEGRO_DIAGNOSTICS_ENABLE = 5;
ALLEGRO_OOBE_MODE = 6;
ALLEGRO_GESTURE_CONTROL = 7;
ALLEGRO_ANC_ACCESSIBILITY_MODE = 8;
ALLEGRO_ANCR_STATE_ONE_BUD = 9;
ALLEGRO_ANCR_STATE_TWO_BUDS = 10;
ALLEGRO_MULTIPOINT_ENABLE = 11;
ALLEGRO_ANCR_GESTURE_LOOP = 12;
ALLEGRO_CURRENT_ANCR_STATE = 13;
ALLEGRO_OTTS_MODE = 14;
ALLEGRO_VOLUME_EQ_ENABLE = 15;
ALLEGRO_CURRENT_USER_EQ = 16;
ALLEGRO_VOLUME_ASYMMETRY = 17;
ALLEGRO_LAST_SAVED_USER_EQ = 18;
}
message WriteSettingMsg {
oneof value_oneof {
SettingValue setting = 4;
}
}
message SettingsRsp {
oneof value_oneof {
SettingValue value = 4;
}
}
message SettingValue {
oneof value_oneof {
bool auto_ota_enable = 1;
bool ohd_enable = 2; // on-head detection
bool oobe_is_finished = 3; // out-of-box experience?
bool gesture_enable = 4;
bool diagnostics_enable = 5;
bool oobe_mode = 6;
GestureControl gesture_control = 7;
// reading anc_accessibility_mode returns non-zero status (code: 2)
// reading ancr_state_one_bud returns non-zero status (code: 2)
// reading ancr_state_two_buds returns non-zero status (code: 2)
bool multipoint_enable = 11;
AncrGestureLoop ancr_gesture_loop = 12;
AncState current_ancr_state = 13;
int32 otts_mode = 14; // might be bool
bool volume_eq_enable = 15;
EqBands current_user_eq = 16;
int32 volume_asymmetry = 17; // value goes from 0 t0 200 (incl.), even/odd indicates left/right
// reading last_saved_user_eq returns non-zero status (code: 2)
bool sum_to_mono = 19;
// id 20 does not seem to exist (yet?)
bool volume_exposure_notifications = 21;
bool speech_detection = 22;
}
}
message GestureControl {
DeviceGestureControl left = 1;
DeviceGestureControl right = 2;
}
message DeviceGestureControl {
oneof value_oneof {
GestureControlType type = 4;
}
}
message GestureControlType {
RegularActionTarget value = 1;
}
enum RegularActionTarget {
ACTION_TARGET_UNKNOWN = 0;
ACTION_TARGET_CHECK_NOTIFICATIONS = 1;
ACTION_TARGET_PREVIOUS_TRACK_REPEAT = 2;
ACTION_TARGET_NEXT_TRACK = 3;
ACTION_TARGET_PLAY_PAUSE_TRACK = 4;
ACTION_TARGET_ANC_CONTROL = 5;
ACTION_TARGET_ASSISTANT_QUERY = 6;
}
message AncrGestureLoop {
bool active = 1;
bool off = 2;
bool aware = 3;
bool adaptive = 4;
}
enum AncState {
ANC_STATE_UNKNOWN = 0;
ANC_STATE_OFF = 1;
ANC_STATE_ACTIVE = 2;
ANC_STATE_AWARE = 3;
ANC_STATE_ADAPTIVE = 4;
}
message EqBands {
// bands go from -6.0 to 6.0
float low_bass = 1;
float bass = 2;
float mid = 3;
float treble = 4;
float upper_treble = 5;
}
message OobeActionRsp {
OobeAction action = 1;
}
enum OobeAction {
OOBE_ACTION_UNKNOWN = 0;
OOBE_ACTION_SINGLE_TAP = 1;
OOBE_ACTION_DOUBLE_TAP = 2;
OOBE_ACTION_TRIPLE_TAP = 3;
OOBE_ACTION_HOLD = 4;
OOBE_ACTION_SWIPE_FORWARD = 5;
OOBE_ACTION_SWIPE_BACKWARD = 6;
OOBE_ACTION_SWIPE_UP = 7;
OOBE_ACTION_SWIPE_DOWN = 8;
OOBE_ACTION_HOTWORD = 9;
OOBE_ACTION_LEFT_ON_HEAD = 10;
OOBE_ACTION_LEFT_OFF_HEAD = 11;
OOBE_ACTION_RIGHT_ON_HEAD = 12;
OOBE_ACTION_RIGHT_OFF_HEAD = 13;
OOBE_ACTION_SPECULATIVE_TAP = 14;
OOBE_ACTION_HOLD_END = 15;
OOBE_ACTION_HOLD_CANCEL = 16;
}
service Maestro {
rpc GetSoftwareInfo(google.protobuf.Empty) returns (SoftwareInfo) {}
rpc GetHardwareInfo(google.protobuf.Empty) returns (HardwareInfo) {}
rpc SubscribeRuntimeInfo(google.protobuf.Empty) returns (stream RuntimeInfo) {}
rpc SetWallClock(WallClockMsg) returns (google.protobuf.Empty) {}
rpc WriteSetting(WriteSettingMsg) returns (google.protobuf.Empty) {}
rpc ReadSetting(ReadSettingMsg) returns (SettingsRsp) {}
rpc SubscribeToSettingsChanges(google.protobuf.Empty) returns (stream SettingsRsp) {}
rpc SubscribeToOobeActions(google.protobuf.Empty) returns (stream OobeActionRsp) {}
}
/* -- Multipoint Service ------------------------------------------------------------------------ */
message QuietModeStatusEvent {
int32 source = 1;
}
message ForceMultipointSwitchMsg {
// TODO
}
service Multipoint {
rpc SubscribeToQuietModeStatus(google.protobuf.Empty) returns (stream QuietModeStatusEvent) {}
rpc ForceMultipointSwitch(ForceMultipointSwitchMsg) returns (google.protobuf.Empty) {}
}
/* -- EartipFitTest Service --------------------------------------------------------------------- */
message StartEartipFitTestMsg {
// TODO
}
message EndEartipFitTestMsg {
// TODO
}
message SubscribeToEartipFitTestResultsMsg {
// TODO
}
message EartipFitTestResult {
// TODO
}
service EartipFitTest {
rpc StartTest(StartEartipFitTestMsg) returns (google.protobuf.Empty) {}
rpc EndTest(StartEartipFitTestMsg) returns (google.protobuf.Empty) {}
rpc SubscribeToResults(SubscribeToEartipFitTestResultsMsg) returns (stream EartipFitTestResult) {}
}
/* -- JitterBuffer Service ---------------------------------------------------------------------- */
message SetJitterBufferSizePreferenceMsg {
// TODO
}
service JitterBuffer {
rpc SetJitterBufferSizePreference(SetJitterBufferSizePreferenceMsg) returns (google.protobuf.Empty) {}
}
/* -- Dosimeter Service ------------------------------------------------------------------------- */
message DosimeterSummaryEntry {
int32 unknown1 = 1;
float unknown6 = 6;
}
message DosimeterSummary {
int32 unknown1 = 1;
repeated DosimeterSummaryEntry unknown2 = 2;
int32 unknown4 = 4;
float unknown5 = 5;
}
message DosimeterLiveDbMsg {
float intensity = 2; // convert to dB via log10(x) * 10
}
service Dosimeter {
rpc FetchDailySummaries(google.protobuf.Empty) returns (DosimeterSummary) {}
rpc SubscribeToLiveDb(google.protobuf.Empty) returns (DosimeterLiveDbMsg) {}
}

View File

@@ -0,0 +1,87 @@
// Copied from pigweed RPC library with modifications.
// - Changed package specification.
// - Removed java-package option.
//
// Original copyright:
// Copyright 2020 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain a
// copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
syntax = "proto3";
package pw.rpc.packet;
enum PacketType {
// To simplify identifying the origin of a packet, client-to-server packets
// use even numbers and server-to-client packets use odd numbers.
// Client-to-server packets
// The client invokes an RPC. Always the first packet.
REQUEST = 0;
// A message in a client stream. Always sent after a REQUEST and before a
// CLIENT_STREAM_END.
CLIENT_STREAM = 2;
// The client received a packet for an RPC it did not request.
CLIENT_ERROR = 4;
// Deprecated, do not use. Send a CLIENT_ERROR with status CANCELLED instead.
// TODO(b/234879973): Remove this packet type.
DEPRECATED_CANCEL = 6;
// A client stream has completed.
CLIENT_STREAM_END = 8;
// Server-to-client packets
// The RPC has finished.
RESPONSE = 1;
// Deprecated, do not use. Formerly was used as the last packet in a server
// stream.
// TODO(b/234879973): Remove this packet type.
DEPRECATED_SERVER_STREAM_END = 3;
// The server was unable to process a request.
SERVER_ERROR = 5;
// A message in a server stream.
SERVER_STREAM = 7;
}
message RpcPacket {
// The type of packet. Determines which other fields are used.
PacketType type = 1;
// Channel through which the packet is sent.
uint32 channel_id = 2;
// Hash of the fully-qualified name of the service with which this packet is
// associated. For RPC packets, this is the service that processes the packet.
fixed32 service_id = 3;
// Hash of the name of the method which should process this packet.
fixed32 method_id = 4;
// The packet's payload, which is an encoded protobuf.
bytes payload = 5;
// Status code for the RPC response or error.
uint32 status = 6;
// Unique identifier for the call that initiated this RPC. Optionally set by
// the client in the initial request and sent in all subsequent client
// packets; echoed by the server.
uint32 call_id = 7;
}

View File

@@ -0,0 +1,72 @@
use super::{decoder, encoder, Frame};
use bytes::BytesMut;
use tokio::io::{AsyncWrite, AsyncRead};
use tokio_util::codec::Framed;
#[derive(Debug)]
pub enum DecoderError {
Io(std::io::Error),
Decoder(decoder::Error),
}
impl From<std::io::Error> for DecoderError {
fn from(value: std::io::Error) -> Self {
Self::Io(value)
}
}
impl From<decoder::Error> for DecoderError {
fn from(value: decoder::Error) -> Self {
Self::Decoder(value)
}
}
#[derive(Debug, Default)]
pub struct Codec {
dec: decoder::Decoder,
}
impl Codec {
pub fn new() -> Self {
Self { dec: decoder::Decoder::new() }
}
pub fn with_capacity(cap: usize) -> Self {
Self { dec: decoder::Decoder::with_capacity(cap) }
}
pub fn wrap<T>(self, io: T) -> Framed<T, Codec>
where
T: AsyncRead + AsyncWrite,
{
Framed::with_capacity(io, self, 4096 as _)
}
}
impl tokio_util::codec::Encoder<&Frame> for Codec {
type Error = std::io::Error;
fn encode(&mut self, frame: &Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
encoder::encode(dst, frame);
Ok(())
}
}
impl tokio_util::codec::Decoder for Codec {
type Item = Frame;
type Error = std::io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.dec.process(src) {
Ok(x) => Ok(x),
Err(e) => {
tracing::warn!("error decoding data: {e:?}");
Ok(None)
},
}
}
}

View File

@@ -0,0 +1,10 @@
//! Flag bytes and bit masks used in the HDLC encoding.
pub mod flags {
pub const FRAME: u8 = 0x7E;
pub const ESCAPE: u8 = 0x7D;
}
pub mod escape {
pub const MASK: u8 = 0x20;
}

109
libmaestro/src/hdlc/crc.rs Normal file
View File

@@ -0,0 +1,109 @@
//! 32-bit CRC implementation.
#[derive(Debug)]
pub struct Crc32 {
state: u32,
}
impl Crc32 {
pub fn new() -> Self {
Self::with_state(0xFFFFFFFF)
}
pub fn with_state(state: u32) -> Self {
Self { state }
}
pub fn reset(&mut self) {
self.state = 0xFFFFFFFF;
}
pub fn value(&self) -> u32 {
!self.state
}
pub fn put_u8(&mut self, byte: u8) -> &mut Self {
self.state = tables::CRC32[((self.state as u8) ^ byte) as usize] ^ (self.state >> 8);
self
}
pub fn put_bytes<'a, B: IntoIterator<Item=&'a u8>>(&mut self, bytes: B) -> &mut Self {
for b in bytes.into_iter().copied() {
self.put_u8(b);
}
self
}
}
impl Default for Crc32 {
fn default() -> Self {
Self::new()
}
}
pub fn crc32<'a, B: IntoIterator<Item=&'a u8>>(bytes: B) -> u32 {
Crc32::new().put_bytes(bytes).value()
}
mod tables {
pub const CRC32: [u32; 256] = [
0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f,
0xe963a535, 0x9e6495a3, 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, 0x1db71064, 0x6ab020f2,
0xf3b97148, 0x84be41de, 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, 0x14015c4f, 0x63066cd9,
0xfa0f3d63, 0x8d080df5, 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, 0x35b5a8fa, 0x42b2986c,
0xdbbbc9d6, 0xacbcf940, 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116, 0x21b4f4b5, 0x56b3c423,
0xcfba9599, 0xb8bda50f, 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, 0x76dc4190, 0x01db7106,
0x98d220bc, 0xefd5102a, 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818, 0x7f6a0dbb, 0x086d3d2d,
0x91646c97, 0xe6635c01, 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457, 0x65b0d9c6, 0x12b7e950,
0x8bbeb8ea, 0xfcb9887c, 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, 0x4adfa541, 0x3dd895d7,
0xa4d1c46d, 0xd3d6f4fb, 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9, 0x5005713c, 0x270241aa,
0xbe0b1010, 0xc90c2086, 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, 0x59b33d17, 0x2eb40d81,
0xb7bd5c3b, 0xc0ba6cad, 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683, 0xe3630b12, 0x94643b84,
0x0d6d6a3e, 0x7a6a5aa8, 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe, 0xf762575d, 0x806567cb,
0x196c3671, 0x6e6b06e7, 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, 0xd6d6a3e8, 0xa1d1937e,
0x38d8c2c4, 0x4fdff252, 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60, 0xdf60efc3, 0xa867df55,
0x316e8eef, 0x4669be79, 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, 0xc5ba3bbe, 0xb2bd0b28,
0x2bb45a92, 0x5cb36a04, 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a, 0x9c0906a9, 0xeb0e363f,
0x72076785, 0x05005713, 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, 0x86d3d2d4, 0xf1d4e242,
0x68ddb3f8, 0x1fda836e, 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, 0x8f659eff, 0xf862ae69,
0x616bffd3, 0x166ccf45, 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db, 0xaed16a4a, 0xd9d65adc,
0x40df0b66, 0x37d83bf0, 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, 0xbad03605, 0xcdd70693,
0x54de5729, 0x23d967bf, 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d,
];
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_crc32() {
assert_eq!(crc32(b"test test test"), 0x235b6a02);
assert_eq!(crc32(b"1234321"), 0xd981751c);
}
}

View File

@@ -0,0 +1,328 @@
use bytes::{Buf, BytesMut};
use super::consts;
use super::crc;
use super::varint;
use super::Frame;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Error {
UnexpectedData,
UnexpectedEndOfFrame,
InvalidChecksum,
InvalidEncoding,
InvalidFrame,
InvalidAddress,
BufferOverflow,
}
impl From<varint::DecodeError> for Error {
fn from(value: varint::DecodeError) -> Self {
match value {
varint::DecodeError::Incomplete => Self::InvalidFrame,
varint::DecodeError::Overflow => Self::InvalidAddress,
}
}
}
#[derive(Debug)]
pub struct Decoder {
buf: Vec<u8>,
state: (State, EscState),
current_frame_size: usize,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
Discard,
Frame,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EscState {
Normal,
Escape,
}
impl Decoder {
pub fn new() -> Self {
Self::with_capacity(4096)
}
pub fn with_capacity(cap: usize) -> Self {
Self {
buf: Vec::with_capacity(cap),
state: (State::Discard, EscState::Normal),
current_frame_size: 0,
}
}
pub fn process(&mut self, buf: &mut BytesMut) -> Result<Option<Frame>, Error> {
if buf.is_empty() {
return Ok(None);
}
loop {
match self.state.0 {
State::Discard => {
// try to find the start of this frame
match find_frame_start(buf) {
// expected: immediate start of frame
Some(0) => {
self.state.0 = State::Frame;
buf.advance(1);
},
// unexpected: n bytes before start of frame
Some(n) => {
self.state.0 = State::Frame;
buf.advance(n + 1);
return Err(Error::UnexpectedData);
},
// unexpected: unknown amount of bytes before start of frame
None => {
// check whether the last byte might indicate a start
let n = if buf.last() == Some(&consts::flags::FRAME) {
buf.len() - 1
} else {
buf.len()
};
buf.advance(n);
return Err(Error::UnexpectedData);
},
}
},
State::Frame => {
// copy and decode to internal buffer
for (i, b) in buf.iter().copied().enumerate() {
match (b, self.state.1) {
(consts::flags::ESCAPE, EscState::Normal) => {
self.state.1 = EscState::Escape;
},
(consts::flags::ESCAPE, EscState::Escape) => {
buf.advance(i + 1);
self.reset();
return Err(Error::InvalidEncoding);
},
(consts::flags::FRAME, EscState::Normal) => {
buf.advance(i + 1);
return self.decode_buffered();
},
(consts::flags::FRAME, EscState::Escape) => {
buf.advance(i);
self.reset();
return Err(Error::UnexpectedEndOfFrame);
},
(b, EscState::Normal) => {
self.push_byte(b);
},
(b, EscState::Escape) => {
self.push_byte(b ^ consts::escape::MASK);
self.state.1 = EscState::Normal;
},
}
}
buf.advance(buf.remaining());
return Ok(None);
},
}
}
}
fn decode_buffered(&mut self) -> Result<Option<Frame>, Error> {
// validate minimum frame size
if self.buf.len() < 6 {
self.reset();
self.state.0 = State::Frame; // the next frame may already start
return Err(Error::InvalidFrame);
}
// validate checksum
let crc_actual = crc::crc32(&self.buf[..self.buf.len()-4]);
let crc_expect = self.buf[self.buf.len()-4..].try_into().unwrap();
let crc_expect = u32::from_le_bytes(crc_expect);
if crc_expect != crc_actual {
self.reset();
self.state.0 = State::Frame; // the next frame may already start
return Err(Error::InvalidChecksum);
}
// check for overflow
if self.current_frame_size > self.buf.len() {
self.reset();
return Err(Error::BufferOverflow);
}
// decode address
let (address, n) = varint::decode(&self.buf)?;
// validate minimum remaining frame size
if self.buf.len() < n + 5 {
self.reset();
return Err(Error::InvalidFrame);
}
// get control byte and data
let control = self.buf[n];
let data = self.buf[n+1..self.buf.len()-4].into();
let frame = Frame {
address,
control,
data,
};
self.reset();
Ok(Some(frame))
}
fn push_byte(&mut self, byte: u8) {
self.current_frame_size += 1;
if self.buf.len() < self.buf.capacity() {
self.buf.push(byte);
}
}
fn reset(&mut self) {
self.buf.clear();
self.state = (State::Discard, EscState::Normal);
self.current_frame_size = 0;
}
}
impl Default for Decoder {
fn default() -> Self {
Self::new()
}
}
fn find_frame_start(buf: &[u8]) -> Option<usize> {
buf.windows(2)
.enumerate()
.find(|(_, b)| b[0] == consts::flags::FRAME && b[1] != consts::flags::FRAME)
.map(|(i, _)| i)
}
#[cfg(test)]
mod test {
use bytes::BufMut;
use super::*;
#[test]
fn test_find_frame_start() {
let buf = [0x7E, 0x01, 0x02, 0x03];
assert_eq!(find_frame_start(&buf), Some(0));
let buf = [0x03, 0x02, 0x01, 0x00, 0x7E, 0x00, 0x01, 0x02, 0x03];
assert_eq!(find_frame_start(&buf), Some(4));
let buf = [0x03, 0x02, 0x01, 0x00, 0x7E, 0x7E, 0x00, 0x01, 0x02, 0x03];
assert_eq!(find_frame_start(&buf), Some(5));
let buf = [0x03, 0x02, 0x01, 0x00, 0x7E];
assert_eq!(find_frame_start(&buf), None);
let buf = [0x03, 0x02, 0x01, 0x00, 0x7E, 0x00];
assert_eq!(find_frame_start(&buf), Some(4));
let buf = [0x7E];
assert_eq!(find_frame_start(&buf), None);
let buf = [];
assert_eq!(find_frame_start(&buf), None);
}
#[test]
fn test_frame_decode() {
let data = [
// message
0x7e, 0x06, 0x08, 0x09, 0x03, 0x05, 0x06, 0x07, 0x7d, 0x5d,
0x7d, 0x5e, 0x7f, 0xff, 0xe6, 0x2d, 0x17, 0xc6, 0x7e,
// and trailing bytes
0x02, 0x01
];
let expect = Frame {
address: 0x010203,
control: 0x03,
data: vec![0x05, 0x06, 0x07, 0x7D, 0x7E, 0x7F, 0xFF].into(),
};
let mut dec = Decoder::new();
// test standard decoding
let mut buf = BytesMut::from(&data[..data.len()-2]);
assert_eq!(dec.process(&mut buf), Ok(Some(expect.clone())));
assert_eq!(buf.remaining(), 0);
// test decoding with trailing bytes
let mut buf = BytesMut::from(&data[..data.len()]);
assert_eq!(dec.process(&mut buf), Ok(Some(expect.clone())));
assert_eq!(buf.remaining(), 2);
assert_eq!(dec.process(&mut buf), Err(Error::UnexpectedData));
assert_eq!(buf.remaining(), 0);
// test partial decoding / re-entrancy
let mut buf = BytesMut::from(&data[..9]);
assert_eq!(dec.process(&mut buf), Ok(None));
assert_eq!(buf.remaining(), 0);
assert_eq!(dec.state, (State::Frame, EscState::Escape));
let mut buf = BytesMut::from(&data[9..data.len()-2]);
assert_eq!(dec.process(&mut buf), Ok(Some(expect.clone())));
assert_eq!(buf.remaining(), 0);
// test decoding of subsequent frames
let mut buf = BytesMut::new();
buf.put_slice(&data[..data.len()-2]);
buf.put_slice(&data[..]);
assert_eq!(dec.process(&mut buf), Ok(Some(expect.clone())));
assert_eq!(buf.remaining(), data.len());
assert_eq!(dec.process(&mut buf), Ok(Some(expect.clone())));
assert_eq!(buf.remaining(), 2);
// test decoding of cut-off frame / data loss (with frame being too small)
let mut buf = BytesMut::new();
buf.put_slice(&data[..5]);
buf.put_slice(&data[..]);
assert_eq!(dec.process(&mut buf), Err(Error::InvalidFrame));
assert_eq!(dec.process(&mut buf), Ok(Some(expect.clone())));
assert_eq!(buf.remaining(), 2);
// test decoding of cut-off frame / data loss (with data being cut off)
let mut buf = BytesMut::new();
buf.put_slice(&data[..10]);
buf.put_slice(&data[..]);
assert_eq!(dec.process(&mut buf), Err(Error::InvalidChecksum));
assert_eq!(dec.process(&mut buf), Ok(Some(expect.clone())));
assert_eq!(buf.remaining(), 2);
// test frame flag as escaped byte
let mut buf = BytesMut::from(&data[..10]);
buf.put_slice(&data[..]);
buf[9] = consts::flags::FRAME;
assert_eq!(dec.process(&mut buf), Err(Error::UnexpectedEndOfFrame));
assert_eq!(dec.process(&mut buf), Err(Error::UnexpectedData));
assert_eq!(dec.process(&mut buf), Ok(Some(expect.clone())));
assert_eq!(buf.remaining(), 2);
}
}

View File

@@ -0,0 +1,146 @@
use bytes::{BufMut, BytesMut};
use super::{consts, crc::Crc32, varint, Frame};
struct ByteEscape<B: BufMut> {
buf: B,
}
impl<B: BufMut> ByteEscape<B> {
fn new(buf: B) -> Self {
Self { buf }
}
fn put_u8(&mut self, byte: u8) {
match byte {
consts::flags::ESCAPE | consts::flags::FRAME => self.buf.put_slice(&[
consts::flags::ESCAPE,
consts::escape::MASK ^ byte
]),
_ => self.buf.put_u8(byte),
}
}
fn put_frame_flag(&mut self) {
self.buf.put_u8(super::consts::flags::FRAME)
}
}
impl ByteEscape<&mut BytesMut> {
fn reserve(&mut self, additional: usize) -> &mut Self {
self.buf.reserve(additional);
self
}
}
struct Encoder<B: BufMut> {
buf: ByteEscape<B>,
crc: Crc32,
}
impl<B: BufMut> Encoder<B> {
fn new(buf: B) -> Self {
Self {
buf: ByteEscape::new(buf),
crc: Crc32::new(),
}
}
fn flag(&mut self) -> &mut Self {
self.buf.put_frame_flag();
self
}
fn put_u8(&mut self, byte: u8) -> &mut Self {
self.crc.put_u8(byte);
self.buf.put_u8(byte);
self
}
fn put_bytes<T: IntoIterator<Item = u8>>(&mut self, bytes: T) -> &mut Self {
for b in bytes.into_iter() {
self.put_u8(b);
}
self
}
fn finalize(&mut self) {
self.put_bytes(self.crc.value().to_le_bytes());
self.flag();
}
}
impl Encoder<&mut BytesMut> {
fn reserve(&mut self, additional: usize) -> &mut Self {
self.buf.reserve(additional);
self
}
}
pub fn encode(buf: &mut BytesMut, frame: &Frame) {
Encoder::new(buf)
.reserve(frame.data.len() + 8) // reserve at least data-size + min-frame-size
.flag() // flag
.put_bytes(varint::encode(frame.address)) // address
.put_u8(frame.control) // control
.put_bytes(frame.data.iter().copied()) // data
.reserve(5) // reserve CRC32 + flag
.finalize() // checksum and flag
}
pub fn encode_bytes(frame: &Frame) -> BytesMut {
let mut buf = BytesMut::new();
encode(&mut buf, frame);
buf
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_escape_bytes() {
fn e(src: &[u8]) -> Vec<u8> {
let mut dst = Vec::new();
let mut buf = ByteEscape::new(&mut dst);
for byte in src {
buf.put_u8(*byte);
}
dst
}
assert_eq!(e(&[0x00, 0x00]), [0x00, 0x00]);
assert_eq!(e(&[0x7D]), [0x7D, 0x5D]);
assert_eq!(e(&[0x7E]), [0x7D, 0x5E]);
assert_eq!(e(&[0x01, 0x7D, 0x02]), [0x01, 0x7D, 0x5D, 0x02]);
assert_eq!(e(&[0x01, 0x7E, 0x02]), [0x01, 0x7D, 0x5E, 0x02]);
assert_eq!(e(&[0x7D, 0x7E]), [0x7D, 0x5D, 0x7D, 0x5E]);
assert_eq!(e(&[0x7F, 0x5D, 0x7E]), [0x7F, 0x5D, 0x7D, 0x5E]);
}
#[test]
fn test_encode() {
assert_eq!([
0x7e, 0x06, 0x08, 0x09, 0x03, 0x8b, 0x3b, 0xf7, 0x42, 0x7e,
], &encode_bytes(&Frame {
address: 0x010203,
control: 0x03,
data: vec![].into(),
})[..]);
assert_eq!([
0x7e, 0x06, 0x08, 0x09, 0x03, 0x05, 0x06, 0x07, 0x7d, 0x5d,
0x7d, 0x5e, 0x7f, 0xff, 0xe6, 0x2d, 0x17, 0xc6, 0x7e,
], &encode_bytes(&Frame {
address: 0x010203,
control: 0x03,
data: vec![0x05, 0x06, 0x07, 0x7d, 0x7e, 0x7f, 0xff].into(),
})[..]);
}
}

View File

@@ -0,0 +1,34 @@
//! High-level Data Link Control (HDLC) support library.
pub mod codec;
pub mod consts;
pub mod crc;
pub mod decoder;
pub mod encoder;
pub mod varint;
pub use codec::Codec;
use bytes::BytesMut;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Frame {
pub address: u32,
pub control: u8,
pub data: Box<[u8]>,
}
impl Frame {
pub fn decode(buf: &mut BytesMut) -> Result<Option<Self>, decoder::Error> {
decoder::Decoder::new().process(buf)
}
pub fn encode(&self, buf: &mut BytesMut) {
encoder::encode(buf, self)
}
pub fn encode_bytes(&self) -> BytesMut {
encoder::encode_bytes(self)
}
}

View File

@@ -0,0 +1,154 @@
//! Support for variable length integer encoding as used in the HDLC frame encoding.
use arrayvec::ArrayVec;
pub fn decode<'a, S: IntoIterator<Item = &'a u8>>(src: S) -> Result<(u32, usize), DecodeError> {
let mut address = 0;
for (i, b) in src.into_iter().copied().enumerate() {
address |= ((b >> 1) as u64) << (i * 7);
if address > u32::MAX as u64 {
Err(DecodeError::Overflow)?;
}
if b & 0x01 == 0x01 {
return Ok((address as u32, i + 1));
}
}
Err(DecodeError::Incomplete)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DecodeError {
Incomplete,
Overflow,
}
pub fn encode(num: u32) -> Encode {
Encode { num, done: false }
}
pub fn encode_vec(num: u32) -> ArrayVec<u8, { num_bytes(u32::MAX) }> {
encode(num).collect()
}
pub struct Encode {
num: u32,
done: bool,
}
impl Iterator for Encode {
type Item = u8;
fn next(&mut self) -> Option<u8> {
if (self.num >> 7) != 0 {
let b = ((self.num & 0x7F) as u8) << 1;
self.num >>= 7;
Some(b)
} else if !self.done {
let b = (((self.num & 0x7F) as u8) << 1) | 1;
self.done = true;
Some(b)
} else {
None
}
}
}
pub const fn num_bytes(value: u32) -> usize {
if value == 0 {
1
} else {
(u32::BITS - value.leading_zeros()).div_ceil(7) as _
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_decode() {
assert_eq!(decode(&[0x01]).unwrap(), (0x00, 1));
assert_eq!(decode(&[0x00, 0x00, 0x00, 0x01]).unwrap(), (0x00, 4));
assert_eq!(decode(&[0x11, 0x00]).unwrap(), (0x0008, 1));
assert_eq!(decode(&[0x10, 0x21]).unwrap(), (0x0808, 2));
assert_eq!(decode(&[0x01]).unwrap(), (0x00, 1));
assert_eq!(decode(&[0x03]).unwrap(), (0x01, 1));
assert_eq!(decode(&[0xff]).unwrap(), (0x7f, 1));
assert_eq!(decode(&[0x00, 0x03]).unwrap(), (0x80, 2));
assert_eq!(decode(&[0xfe, 0xff]).unwrap(), (0x3fff, 2));
assert_eq!(decode(&[0x00, 0x00, 0x03]).unwrap(), (0x4000, 3));
assert_eq!(decode(&[0xfe, 0xfe, 0xff]).unwrap(), (0x1f_ffff, 3));
assert_eq!(decode(&[0x00, 0x00, 0x00, 0x03]).unwrap(), (0x20_0000, 4));
assert_eq!(decode(&[0xfe, 0xfe, 0xfe, 0xff]).unwrap(), (0x0fff_ffff, 4));
assert_eq!(decode(&[0x00, 0x00, 0x00, 0x00, 0x03]).unwrap(), (0x1000_0000, 5));
assert_eq!(decode(&[0xfe, 0x03]).unwrap(), (u8::MAX as _, 2));
assert_eq!(decode(&[0xfe, 0xfe, 0x07]).unwrap(), (u16::MAX as _, 3));
assert_eq!(decode(&[0xfe, 0xfe, 0xfe, 0xfe, 0x1f]).unwrap(), (u32::MAX, 5));
assert_eq!(decode(&[0xFE]), Err(DecodeError::Incomplete));
assert_eq!(decode(&[0xFE, 0xFE, 0xFE, 0xFE, 0xFF]), Err(DecodeError::Overflow));
}
#[test]
fn test_encode() {
assert_eq!(encode_vec(0x01234)[..], [0x68, 0x49]);
assert_eq!(encode_vec(0x87654)[..], [0xa8, 0xd8, 0x43]);
assert_eq!(encode_vec(0x00)[..], [0x01]);
assert_eq!(encode_vec(0x01)[..], [0x03]);
assert_eq!(encode_vec(0x7f)[..], [0xff]);
assert_eq!(encode_vec(0x80)[..], [0x00, 0x03]);
assert_eq!(encode_vec(0x3fff)[..], [0xfe, 0xff]);
assert_eq!(encode_vec(0x4000)[..], [0x00, 0x00, 0x03]);
assert_eq!(encode_vec(0x1f_ffff)[..], [0xfe, 0xfe, 0xff]);
assert_eq!(encode_vec(0x20_0000)[..], [0x00, 0x00, 0x00, 0x03]);
assert_eq!(encode_vec(0x0fff_ffff)[..], [0xfe, 0xfe, 0xfe, 0xff]);
assert_eq!(encode_vec(0x1000_0000)[..], [0x00, 0x00, 0x00, 0x00, 0x03]);
assert_eq!(encode_vec(u8::MAX as _)[..], [0xfe, 0x03]);
assert_eq!(encode_vec(u16::MAX as _)[..], [0xfe, 0xfe, 0x07]);
assert_eq!(encode_vec(u32::MAX)[..], [0xfe, 0xfe, 0xfe, 0xfe, 0x1f]);
}
#[test]
fn test_num_bytes() {
assert_eq!(num_bytes(0x00), 1);
assert_eq!(num_bytes(0x01), 1);
assert_eq!(num_bytes(0x7f), 1);
assert_eq!(num_bytes(0x80), 2);
assert_eq!(num_bytes(0x3fff), 2);
assert_eq!(num_bytes(0x4000), 3);
assert_eq!(num_bytes(0x1f_ffff), 3);
assert_eq!(num_bytes(0x20_0000), 4);
assert_eq!(num_bytes(0x0fff_ffff), 4);
assert_eq!(num_bytes(0x1000_0000), 5);
assert_eq!(num_bytes(u8::MAX as _), 2);
assert_eq!(num_bytes(u16::MAX as _), 3);
assert_eq!(num_bytes(u32::MAX), 5);
}
}

15
libmaestro/src/lib.rs Normal file
View File

@@ -0,0 +1,15 @@
//! Library for the Maestro protocol used to change settings (ANC, equalizer,
//! etc.) on the Google Pixel Buds Pro. Might support other Pixel Buds, might
//! not.
use uuid::{uuid, Uuid};
/// UUID under which the Maestro protocol is advertised.
///
/// Defined as `25e97ff7-24ce-4c4c-8951-f764a708f7b5`.
pub const UUID: Uuid = uuid!("25e97ff7-24ce-4c4c-8951-f764a708f7b5");
pub mod hdlc;
pub mod protocol;
pub mod pwrpc;
pub mod service;

View File

@@ -0,0 +1,115 @@
use num_enum::{FromPrimitive, IntoPrimitive};
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, FromPrimitive, IntoPrimitive)]
pub enum Peer {
Unknown = 0,
Host = 1,
Case = 2,
LeftBtCore = 3,
RightBtCore = 4,
LeftSensorHub = 5,
RightSensorHub = 6,
LeftSpiBridge = 7,
RightSpiBridge = 8,
DebugApp = 9,
MaestroA = 10,
LeftTahiti = 11,
RightTahiti = 12,
MaestroB = 13,
#[num_enum(catch_all)]
Unrecognized(u8) = 0xff,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Address {
value: u32,
}
impl Address {
pub fn from_value(value: u32) -> Self {
Address { value }
}
pub fn from_peers(source: Peer, target: Peer) -> Self {
let source: u8 = source.into();
let target: u8 = target.into();
Self::from_value(((source as u32 & 0xf) << 6) | ((target as u32 & 0xf) << 10))
}
pub fn value(&self) -> u32 {
self.value
}
pub fn source(&self) -> Peer {
Peer::from_primitive(((self.value >> 6) & 0x0f) as u8)
}
pub fn target(&self) -> Peer {
Peer::from_primitive(((self.value >> 10) & 0x0f) as u8)
}
pub fn swap(&self) -> Self {
Self::from_peers(self.target(), self.source())
}
pub fn channel_id(&self) -> Option<u32> {
let source = self.source();
let target = self.target();
if source == Peer::MaestroA || source == Peer::MaestroB {
channel_id(source, target)
} else {
channel_id(target, source)
}
}
}
impl From<u32> for Address {
fn from(value: u32) -> Self {
Self::from_value(value)
}
}
impl From<(Peer, Peer)> for Address {
fn from(peers: (Peer, Peer)) -> Self {
Self::from_peers(peers.0, peers.1)
}
}
pub fn channel_id(local: Peer, remote: Peer) -> Option<u32> {
match (local, remote) {
(Peer::MaestroA, Peer::Case) => Some(18),
(Peer::MaestroA, Peer::LeftBtCore) => Some(19),
(Peer::MaestroA, Peer::LeftSensorHub) => Some(20),
(Peer::MaestroA, Peer::RightBtCore) => Some(21),
(Peer::MaestroA, Peer::RightSensorHub) => Some(22),
(Peer::MaestroB, Peer::Case) => Some(23),
(Peer::MaestroB, Peer::LeftBtCore) => Some(24),
(Peer::MaestroB, Peer::LeftSensorHub) => Some(25),
(Peer::MaestroB, Peer::RightBtCore) => Some(26),
(Peer::MaestroB, Peer::RightSensorHub) => Some(27),
(_, _) => None,
}
}
pub fn address_for_channel(channel: u32) -> Option<Address> {
match channel {
18 => Some(Address::from_peers(Peer::MaestroA, Peer::Case)),
19 => Some(Address::from_peers(Peer::MaestroA, Peer::LeftBtCore)),
20 => Some(Address::from_peers(Peer::MaestroA, Peer::LeftSensorHub)),
21 => Some(Address::from_peers(Peer::MaestroA, Peer::RightBtCore)),
22 => Some(Address::from_peers(Peer::MaestroA, Peer::RightSensorHub)),
23 => Some(Address::from_peers(Peer::MaestroB, Peer::Case)),
24 => Some(Address::from_peers(Peer::MaestroB, Peer::LeftBtCore)),
25 => Some(Address::from_peers(Peer::MaestroB, Peer::LeftSensorHub)),
26 => Some(Address::from_peers(Peer::MaestroB, Peer::RightBtCore)),
27 => Some(Address::from_peers(Peer::MaestroB, Peer::RightSensorHub)),
_ => None,
}
}

View File

@@ -0,0 +1,81 @@
use bytes::BytesMut;
use prost::Message;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::{Decoder, Framed, Encoder};
use crate::pwrpc::types::RpcPacket;
use crate::hdlc;
use super::addr;
pub struct Codec {
hdlc: hdlc::Codec,
}
impl Codec {
pub fn new() -> Self {
Self {
hdlc: hdlc::Codec::new(),
}
}
pub fn wrap<T>(self, io: T) -> Framed<T, Codec>
where
T: AsyncRead + AsyncWrite,
{
Framed::with_capacity(io, self, 4096 as _)
}
}
impl Default for Codec {
fn default() -> Self {
Self::new()
}
}
impl Decoder for Codec {
type Item = RpcPacket;
type Error = std::io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.hdlc.decode(src)? {
Some(frame) => {
if frame.control != 0x03 {
tracing::warn!("unexpected control type: {}", frame.control);
return Ok(None);
}
let packet = RpcPacket::decode(&frame.data[..])?;
Ok(Some(packet))
}
None => Ok(None),
}
}
}
impl Encoder<&RpcPacket> for Codec {
type Error = std::io::Error;
fn encode(&mut self, packet: &RpcPacket, dst: &mut BytesMut) -> Result<(), Self::Error> {
let address = addr::address_for_channel(packet.channel_id).unwrap();
let frame = hdlc::Frame {
address: address.value(),
control: 0x03,
data: packet.encode_to_vec().into(), // TODO: can we avoid these allocations?
};
self.hdlc.encode(&frame, dst)
}
}
impl Encoder<RpcPacket> for Codec {
type Error = std::io::Error;
fn encode(&mut self, packet: RpcPacket, dst: &mut BytesMut) -> Result<(), Self::Error> {
self.encode(&packet, dst)
}
}

View File

@@ -0,0 +1,7 @@
pub mod addr;
pub mod codec;
pub mod utils;
pub mod types {
include!(concat!(env!("OUT_DIR"), "/maestro_pw.rs"));
}

View File

@@ -0,0 +1,73 @@
use crate::pwrpc::Error;
use crate::pwrpc::client::{Client, Request, UnaryResponse, ClientHandle};
use crate::pwrpc::id::PathRef;
use crate::pwrpc::types::RpcPacket;
use super::addr;
use super::addr::Peer;
use super::types::SoftwareInfo;
pub async fn resolve_channel<S, E>(client: &mut Client<S>) -> Result<u32, Error>
where
S: futures::Sink<RpcPacket>,
S: futures::Stream<Item = Result<RpcPacket, E>> + Unpin,
Error: From<E>,
Error: From<S::Error>,
{
tracing::trace!("resolving channel");
let channels = (
addr::channel_id(Peer::MaestroA, Peer::Case).unwrap(),
addr::channel_id(Peer::MaestroA, Peer::LeftBtCore).unwrap(),
addr::channel_id(Peer::MaestroA, Peer::RightBtCore).unwrap(),
addr::channel_id(Peer::MaestroB, Peer::Case).unwrap(),
addr::channel_id(Peer::MaestroB, Peer::LeftBtCore).unwrap(),
addr::channel_id(Peer::MaestroB, Peer::RightBtCore).unwrap(),
);
let tasks = (
try_open_channel(client.handle(), channels.0),
try_open_channel(client.handle(), channels.1),
try_open_channel(client.handle(), channels.2),
try_open_channel(client.handle(), channels.3),
try_open_channel(client.handle(), channels.4),
try_open_channel(client.handle(), channels.5),
);
let channel = tokio::select! {
// Ensure that the open() calls are registered before we start running
// the client.
biased;
res = tasks.0 => { res? },
res = tasks.1 => { res? },
res = tasks.2 => { res? },
res = tasks.3 => { res? },
res = tasks.4 => { res? },
res = tasks.5 => { res? },
res = client.run() => { res?; return Err(Error::aborted("client terminated")) }
};
tracing::trace!(channel=channel, "channel resolved");
Ok(channel)
}
async fn try_open_channel(mut handle: ClientHandle, channel_id: u32) -> Result<u32, Error> {
let path = PathRef::new("maestro_pw.Maestro/GetSoftwareInfo");
let service_id = path.service().hash();
let method_id = path.method().hash();
let req = Request {
channel_id,
service_id,
method_id,
call_id: 0xffffffff,
message: (),
};
let mut rsp: UnaryResponse<SoftwareInfo> = handle.open_unary(req)?;
rsp.result().await?;
Ok(channel_id)
}

View File

@@ -0,0 +1,960 @@
use std::pin::Pin;
use std::task::Poll;
use futures::{Sink, SinkExt, Stream, StreamExt};
use futures::channel::mpsc;
use futures::stream::{SplitSink, SplitStream, FusedStream};
use prost::Message;
use super::id::Path;
use super::status::{Status, Error};
use super::types::{RpcType, RpcPacket, PacketType};
#[derive(Debug)]
pub struct Client<S> {
/// Stream for lower-level transport.
io_rx: SplitStream<S>,
/// Sink for lower-level transport.
io_tx: SplitSink<S, RpcPacket>,
/// Queue receiver for requests to be processed and sent by us.
queue_rx: mpsc::UnboundedReceiver<CallRequest>,
/// Queue sender for requests to be processed by us. Counter-part for
/// `queue_rx`, used by callers via `ClientHandle` to initiate new calls.
queue_tx: mpsc::UnboundedSender<CallRequest>,
/// Pending RPC calls, waiting for a response.
pending: Vec<Call>,
}
impl<S, E> Client<S>
where
S: Sink<RpcPacket>,
S: Stream<Item = Result<RpcPacket, E>> + Unpin,
Error: From<S::Error>,
Error: From<E>,
{
pub fn new(stream: S) -> Client<S> {
let (io_tx, io_rx) = stream.split();
let (queue_tx, queue_rx) = mpsc::unbounded();
Client {
io_rx,
io_tx,
queue_rx,
queue_tx,
pending: Vec::new(),
}
}
pub fn handle(&self) -> ClientHandle {
ClientHandle {
queue_tx: self.queue_tx.clone(),
}
}
pub async fn run(&mut self) -> Result<(), Error> {
// Process the request queue first in case we are trying to catch some
// early RPC responses via open() calls.
while let Ok(Some(request)) = self.queue_rx.try_next() {
self.process_request(request).await?;
}
loop {
tokio::select! {
packet = self.io_rx.next() => {
let packet = packet
.ok_or_else(|| Error::aborted("underlying IO stream closed"))??;
self.process_packet(packet).await?;
},
request = self.queue_rx.next() => {
// SAFETY: We hold both sender and receiver parts and are
// the only ones allowed to close this queue. Therefore, it
// will always be open here.
let request = request.expect("request queue closed unexpectedly");
self.process_request(request).await?;
},
}
}
}
pub async fn terminate(&mut self) -> Result<(), Error> {
tracing::trace!("terminating client");
// Collect messages to be sent instead of directly sending them. We
// process infallible (local) operations first, before we try to
// communicate with the RPC peer, which is fallible.
let mut send = Vec::new();
// Close our request queue.
self.queue_rx.close();
// Process all pending requests. Abort requests for new calls and
// send/forward any errors.
//
// SAFETY: try_next() can only return an error when the channel has not
// been closed yet.
while let Some(msg) = self.queue_rx.try_next().unwrap() {
match msg {
CallRequest::New { sender, .. } => {
// Drop new requests. Instead, notify caller with status 'aborted'.
let update = CallUpdate::Error { status: Status::Aborted };
let _ = sender.unbounded_send(update);
sender.close_channel();
},
CallRequest::Error { uid, code, tx } => {
// Process error requests as normal: Send error message to
// peer, remove and complete call.
if let Some(mut call) = self.find_and_remove_call(uid) {
call.complete_with_error(code).await;
if tx {
send.push((uid, code));
}
}
},
}
}
// Cancel all pending RPCs and remove them from the list.
for call in &mut self.pending {
call.complete_with_error(Status::Aborted).await;
send.push((call.uid, Status::Cancelled));
}
self.pending.clear();
// Define functions because async try-catch blocks aren't a thing yet...
async fn do_send<S, E>(client: &mut Client<S>, send: Vec<(CallUid, Status)>) -> Result<(), Error>
where
S: Sink<RpcPacket>,
S: Stream<Item = Result<RpcPacket, E>> + Unpin,
Error: From<S::Error>,
Error: From<E>,
{
for (uid, code) in send {
client.send_client_error(uid, code).await?;
}
Ok(())
}
async fn do_close<S, E>(client: &mut Client<S>) -> Result<(), Error>
where
S: Sink<RpcPacket>,
S: Stream<Item = Result<RpcPacket, E>> + Unpin,
Error: From<S::Error>,
Error: From<E>,
{
client.io_tx.close().await?;
Ok(())
}
// Try to send cancel/error messages.
let res_send = do_send(self, send).await;
// Try to close the transport.
let res_close = do_close(self).await;
// Return the first error.
res_send?;
res_close
}
async fn process_packet(&mut self, packet: RpcPacket) -> Result<(), Error> {
tracing::trace!(
"received packet: type=0x{:02x}, channel_id=0x{:02x}, service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}",
packet.r#type, packet.channel_id, packet.service_id, packet.method_id, packet.call_id
);
let ty = packet.r#type;
let ty = PacketType::try_from(ty);
match ty {
Ok(PacketType::Response) => {
self.rpc_complete(packet).await
},
Ok(PacketType::ServerError) => {
self.rpc_complete_with_error(packet).await
},
Ok(PacketType::ServerStream) => {
self.rpc_stream_push(packet).await?
},
Ok(_) => {
tracing::error!(
"unsupported packet type: type=0x{:02x}, channel_id=0x{:02x}, service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}",
packet.r#type, packet.channel_id, packet.service_id, packet.method_id, packet.call_id
);
},
Err(_) => {
tracing::error!(
"unknown packet type: type=0x{:02x}, channel_id=0x{:02x}, service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}",
packet.r#type, packet.channel_id, packet.service_id, packet.method_id, packet.call_id
);
},
}
Ok(())
}
async fn rpc_complete(&mut self, packet: RpcPacket) {
let uid = CallUid::from_packet(&packet);
let call = self.find_and_remove_call(uid);
match call {
Some(mut call) => { // pending call found, complete rpc
tracing::trace!(
"completing rpc: channel_id=0x{:02x}, service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}",
packet.channel_id, packet.service_id, packet.method_id, packet.call_id
);
if packet.status != 0 {
tracing::warn!(
"completing rpc with non-zero status: channel_id=0x{:02x}, service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}, status={}",
packet.channel_id, packet.service_id, packet.method_id, packet.call_id, packet.status
);
}
let status = Status::from(packet.status);
call.complete(packet.payload, status).await;
},
None => { // no pending call found, silently drop packet
tracing::debug!(
"received response for non-pending rpc: channel_id=0x{:02x}, service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}",
packet.channel_id, packet.service_id, packet.method_id, packet.call_id
);
},
}
}
async fn rpc_complete_with_error(&mut self, packet: RpcPacket) {
let uid = CallUid::from_packet(&packet);
let call = self.find_and_remove_call(uid);
match call {
Some(mut call) => { // pending call found, complete rpc with error
tracing::trace!(
"completing rpc with error: channel_id=0x{:02x}, service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}, status={}",
packet.channel_id, packet.service_id, packet.method_id, packet.call_id, packet.status
);
let status = Status::from(packet.status);
call.complete_with_error(status).await;
},
None => { // no pending call found, silently drop packet
tracing::debug!(
"received error for non-pending rpc: channel_id=0x{:02x}, service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}, status={}",
packet.channel_id, packet.service_id, packet.method_id, packet.call_id, packet.status
);
},
}
}
async fn rpc_stream_push(&mut self, packet: RpcPacket) -> Result<(), Error> {
let uid = CallUid::from_packet(&packet);
let call = self.find_call_mut(uid);
match call {
Some(call) => { // pending call found, forward packet to caller
tracing::trace!(
"pushing server stream packet to caller: channel_id=0x{:02x}, service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}",
packet.channel_id, packet.service_id, packet.method_id, packet.call_id
);
if call.ty.has_server_stream() { // packet was expected, forward it
call.push_item(packet.payload).await;
} else { // this type of rpc doesn't expect streaming packets from the server
// SAFETY: We are the only ones that can add, remove, or
// otherwise modify items in-between the above find
// operation and this one as we have the lock.
let mut call = self.find_and_remove_call(uid).unwrap();
tracing::warn!(
"received stream packet for non-stream rpc: channel_id=0x{:02x}, service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}",
packet.channel_id, packet.service_id, packet.method_id, packet.call_id
);
call.complete_with_error(Status::InvalidArgument).await;
self.send_client_error(uid, Status::InvalidArgument).await?;
}
},
None => { // no pending call found, try to notify server
tracing::debug!(
"received stream packet for non-pending rpc: service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}",
packet.service_id, packet.method_id, packet.call_id
);
self.send_client_error(uid, Status::FailedPrecondition).await?;
},
}
Ok(())
}
async fn process_request(&mut self, request: CallRequest) -> Result<(), Error> {
match request {
CallRequest::New { ty, uid, payload, sender, tx } => {
let call = Call { ty, uid, sender };
let packet = RpcPacket {
r#type: PacketType::Request.into(),
channel_id: uid.channel,
service_id: uid.service,
method_id: uid.method,
payload,
status: Status::Ok as _,
call_id: uid.call,
};
let action = if tx { "starting" } else { "opening" };
tracing::trace!(
"{} rpc: channel_id=0x{:02x}, service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}",
action, packet.channel_id, packet.service_id, packet.method_id, packet.call_id,
);
self.pending.push(call);
if tx {
self.send(packet).await?;
}
Ok(())
},
CallRequest::Error { uid, code, tx } => {
match self.find_and_remove_call(uid) {
Some(mut call) => {
tracing::trace!(
"cancelling active rpc with code: channel_id=0x{:02x}, service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}, code={}",
uid.channel, uid.service, uid.method, uid.call, code as u32,
);
call.complete_with_error(code).await;
if tx {
self.send_client_error(uid, code).await?;
}
Ok(())
},
None => {
tracing::trace!(
"received error request for non-pending rpc: channel_id=0x{:02x}, service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}, code={}",
uid.channel, uid.service, uid.method, uid.call, code as u32,
);
Ok(())
},
}
},
}
}
fn find_and_remove_call(&mut self, uid: CallUid) -> Option<Call> {
let index = self.pending.iter().position(|call| call.uid == uid);
match index {
Some(index) => Some(self.pending.remove(index)),
None => None,
}
}
fn find_call_mut(&mut self, uid: CallUid) -> Option<&mut Call> {
self.pending.iter_mut().find(|call| call.uid == uid)
}
async fn send_client_error(&mut self, uid: CallUid, status: Status) -> Result<(), Error> {
let status: u32 = status.into();
tracing::trace!(
"sending client error packet: status={}, channel_id=0x{:02x}, service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}",
status, uid.channel, uid.service, uid.method, uid.call,
);
let error_packet = RpcPacket {
r#type: PacketType::ClientError as _,
channel_id: uid.channel,
service_id: uid.service,
method_id: uid.method,
call_id: uid.call,
payload: Vec::new(),
status,
};
self.send(error_packet).await
}
async fn send(&mut self, packet: RpcPacket) -> Result<(), Error> {
self.io_tx.send(packet).await?;
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct ClientHandle {
queue_tx: mpsc::UnboundedSender<CallRequest>,
}
impl ClientHandle {
pub fn call_unary<M1, M2>(&mut self, request: Request<M1>) -> Result<UnaryResponse<M2>, Error>
where
M1: Message,
M2: Message + Default,
{
let handle = self.call(RpcType::Unary, request)?;
let response = UnaryResponse {
maker: std::marker::PhantomData,
handle,
};
Ok(response)
}
pub fn call_server_stream<M1, M2>(&mut self, request: Request<M1>) -> Result<StreamResponse<M2>, Error>
where
M1: Message,
M2: Message + Default,
{
let handle = self.call(RpcType::ServerStream, request)?;
let stream = StreamResponse {
marker: std::marker::PhantomData,
handle,
};
Ok(stream)
}
fn call<M>(&mut self, ty: RpcType, request: Request<M>) -> Result<CallHandle, Error>
where
M: Message,
{
let (sender, receiver) = mpsc::unbounded();
let uid = CallUid {
channel: request.channel_id,
service: request.service_id,
method: request.method_id,
call: request.call_id,
};
let payload = request.message.encode_to_vec();
let queue_tx = self.queue_tx.clone();
let request = CallRequest::New { ty, uid, payload, sender, tx: true };
let handle = CallHandle { uid, queue_tx, receiver, cancel_on_drop: true };
self.queue_tx.unbounded_send(request)
.map_err(|_| Error::aborted("the channel has been closed, no new calls are allowed"))?;
Ok(handle)
}
pub fn open_unary<M>(&mut self, request: Request<()>) -> Result<UnaryResponse<M>, Error>
where
M: Message + Default,
{
let handle = self.open(RpcType::Unary, request)?;
let response = UnaryResponse {
maker: std::marker::PhantomData,
handle,
};
Ok(response)
}
pub fn open_server_stream<M>(&mut self, request: Request<()>) -> Result<StreamResponse<M>, Error>
where
M: Message + Default,
{
let handle = self.open(RpcType::ServerStream, request)?;
let stream = StreamResponse {
marker: std::marker::PhantomData,
handle,
};
Ok(stream)
}
fn open<M>(&mut self, ty: RpcType, request: Request<M>) -> Result<CallHandle, Error>
where
M: Message,
{
let (sender, receiver) = mpsc::unbounded();
let uid = CallUid {
channel: request.channel_id,
service: request.service_id,
method: request.method_id,
call: request.call_id,
};
let payload = Vec::new();
let queue_tx = self.queue_tx.clone();
let request = CallRequest::New { ty, uid, payload, sender, tx: false };
let handle = CallHandle { uid, queue_tx, receiver, cancel_on_drop: false };
self.queue_tx.unbounded_send(request)
.map_err(|_| Error::aborted("the channel has been closed, no new calls are allowed"))?;
Ok(handle)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CallUid {
channel: u32,
service: u32,
method: u32,
call: u32,
}
impl CallUid {
fn from_packet(packet: &RpcPacket) -> Self {
Self {
channel: packet.channel_id,
service: packet.service_id,
method: packet.method_id,
call: packet.call_id
}
}
}
#[derive(Debug)]
enum CallRequest {
New {
ty: RpcType,
uid: CallUid,
payload: Vec<u8>,
sender: mpsc::UnboundedSender<CallUpdate>,
tx: bool,
},
Error {
uid: CallUid,
code: Status,
tx: bool,
},
}
#[derive(Debug)]
enum CallUpdate {
Complete {
data: Vec<u8>,
status: Status,
},
StreamItem {
data: Vec<u8>,
},
Error {
status: Status,
}
}
#[derive(Debug)]
struct Call {
ty: RpcType,
uid: CallUid,
sender: mpsc::UnboundedSender<CallUpdate>,
}
impl Call {
pub async fn complete(&mut self, payload: Vec<u8>, status: Status) {
let update = CallUpdate::Complete { data: payload, status };
self.push_update(update).await;
self.sender.close_channel();
}
pub async fn complete_with_error(&mut self, status: Status) {
let update = CallUpdate::Error { status };
self.push_update(update).await;
self.sender.close_channel();
}
pub async fn push_item(&mut self, payload: Vec<u8>) {
let update = CallUpdate::StreamItem { data: payload };
self.push_update(update).await;
}
async fn push_update(&mut self, update: CallUpdate) {
if let Err(e) = self.sender.unbounded_send(update) {
let update = e.into_inner();
match update {
CallUpdate::Complete { .. } => {
tracing::warn!(
"cannot send call update, caller is gone: channel_id=0x{:02x}, service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}, update=complete",
self.uid.channel, self.uid.service, self.uid.method, self.uid.call,
)
},
CallUpdate::StreamItem { .. } => {
tracing::warn!(
"cannot send call update, caller is gone: channel_id=0x{:02x}, service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}, update=stream",
self.uid.channel, self.uid.service, self.uid.method, self.uid.call,
)
},
CallUpdate::Error { status } => {
let code: u32 = status.into();
tracing::trace!(
"cannot send call update, caller is gone: channel_id=0x{:02x}, service_id=0x{:08x}, method_id=0x{:08x}, call_id=0x{:02x}, update=error, error={}",
self.uid.channel, self.uid.service, self.uid.method, self.uid.call, code,
)
},
}
}
}
}
impl Drop for Call {
fn drop(&mut self) {
// Notify caller that call has been aborted if the call has not been
// completed yet. Ignore errors.
if !self.sender.is_closed() {
let update = CallUpdate::Error { status: Status::Aborted };
let _ = self.sender.unbounded_send(update);
self.sender.close_channel();
}
}
}
struct CallHandle {
uid: CallUid,
queue_tx: mpsc::UnboundedSender<CallRequest>,
receiver: mpsc::UnboundedReceiver<CallUpdate>,
cancel_on_drop: bool,
}
impl CallHandle {
fn is_complete(&self) -> bool {
self.queue_tx.is_closed()
}
fn error(&mut self, code: Status, tx: bool) -> bool {
let request = CallRequest::Error { uid: self.uid, code, tx };
let ok = self.queue_tx.unbounded_send(request).is_ok();
// Sending an error will complete the RPC. Disconnect our queue end to
// prevent more errors/cancel-requests to be sent.
self.queue_tx.disconnect();
ok
}
fn abandon(&mut self) -> bool {
self.error(Status::Cancelled, false)
}
fn cancel_on_drop(&mut self, cancel: bool) {
self.cancel_on_drop = cancel
}
fn cancel(&mut self) -> bool {
self.error(Status::Cancelled, true)
}
async fn cancel_and_wait(&mut self) -> Result<(), Error> {
if !self.cancel() {
return Ok(())
}
loop {
match self.receiver.next().await {
Some(CallUpdate::StreamItem { .. }) => {
continue
},
Some(CallUpdate::Complete { .. }) => {
return Ok(())
},
Some(CallUpdate::Error { status: Status::Cancelled }) => {
return Ok(())
},
Some(CallUpdate::Error { status }) => {
return Err(Error::from(status))
},
None => {
return Ok(())
},
}
}
}
}
impl Drop for CallHandle {
fn drop(&mut self) {
if self.cancel_on_drop {
self.cancel();
} else {
self.abandon();
}
}
}
pub struct Request<M> {
pub channel_id: u32,
pub service_id: u32,
pub method_id: u32,
pub call_id: u32,
pub message: M,
}
pub struct UnaryResponse<M> {
maker: std::marker::PhantomData<M>,
handle: CallHandle,
}
impl<M> UnaryResponse<M>
where
M: Message + Default,
{
pub async fn result(&mut self) -> Result<M, Error> {
let update = match self.handle.receiver.next().await {
Some(update) => update,
None => return Err(Error::resource_exhausted("cannot fetch result() multiple times")),
};
let data = match update {
CallUpdate::Complete { data, status: Status::Ok } => data,
CallUpdate::Complete { status, .. } => return Err(Error::from(status)),
CallUpdate::Error { status } => return Err(Error::from(status)),
CallUpdate::StreamItem { .. } => unreachable!("received stream update on unary rpc"),
};
self.handle.queue_tx.disconnect();
let message = M::decode(&data[..])?;
Ok(message)
}
pub fn abandon(&mut self) -> bool {
self.handle.abandon()
}
pub fn cancel_on_drop(&mut self, cacnel: bool) {
self.handle.cancel_on_drop(cacnel)
}
pub fn cancel(&mut self) -> bool {
self.handle.cancel()
}
pub async fn cancel_and_wait(&mut self) -> Result<(), Error> {
self.handle.cancel_and_wait().await
}
pub fn is_complete(&self) -> bool {
self.handle.is_complete()
}
}
pub struct StreamResponse<M> {
marker: std::marker::PhantomData<M>,
handle: CallHandle,
}
impl<M> StreamResponse<M>
where
M: Message + Default,
{
pub fn stream(&mut self) -> ServerStream<'_, M> {
ServerStream {
marker: std::marker::PhantomData,
handle: &mut self.handle,
}
}
pub fn abandon(&mut self) -> bool {
self.handle.abandon()
}
pub fn cancel_on_drop(&mut self, cacnel: bool) {
self.handle.cancel_on_drop(cacnel)
}
pub fn cancel(&mut self) -> bool {
self.handle.cancel()
}
pub async fn cancel_and_wait(&mut self) -> Result<(), Error> {
self.handle.cancel_and_wait().await
}
pub fn is_complete(&self) -> bool {
self.handle.is_complete()
}
}
pub struct ServerStream<'a, M> {
marker: std::marker::PhantomData<&'a mut M>,
handle: &'a mut CallHandle,
}
impl<M> Stream for ServerStream<'_, M>
where
M: Message + Default,
{
type Item = Result<M, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
let update = match Pin::new(&mut self.handle.receiver).poll_next(cx) {
Poll::Ready(Some(update)) => update,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
};
let data = match update {
CallUpdate::StreamItem { data } => {
data
},
CallUpdate::Complete { .. } => {
// This indicates the end of the stream. The payload
// should be empty.
self.handle.receiver.close();
self.handle.queue_tx.disconnect();
return Poll::Ready(None);
},
CallUpdate::Error { status } => {
self.handle.receiver.close();
self.handle.queue_tx.disconnect();
return Poll::Ready(Some(Err(Error::from(status))));
},
};
let result = match M::decode(&data[..]) {
Ok(message) => {
Ok(message)
},
Err(e) => {
self.handle.error(Status::InvalidArgument, true);
Err(e.into())
},
};
Poll::Ready(Some(result))
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.handle.receiver.size_hint()
}
}
impl<M> FusedStream for ServerStream<'_, M>
where
M: Message + Default,
{
fn is_terminated(&self) -> bool {
self.handle.receiver.is_terminated()
}
}
#[derive(Debug, Clone)]
pub struct UnaryRpc<M1, M2> {
marker1: std::marker::PhantomData<M1>,
marker2: std::marker::PhantomData<M2>,
path: Path,
}
impl<M1, M2> UnaryRpc<M1, M2>
where
M1: Message,
M2: Message + Default,
{
pub fn new(path: impl Into<Path>) -> Self {
Self {
marker1: std::marker::PhantomData,
marker2: std::marker::PhantomData,
path: path.into(),
}
}
pub fn call(&self, handle: &mut ClientHandle, channel_id: u32, call_id: u32, message: M1)
-> Result<UnaryResponse<M2>, Error>
{
let req = Request {
channel_id,
service_id: self.path.service().hash(),
method_id: self.path.method().hash(),
call_id,
message,
};
handle.call_unary(req)
}
pub fn open(&self, handle: &mut ClientHandle, channel_id: u32, call_id: u32)
-> Result<UnaryResponse<M2>, Error>
{
let req = Request {
channel_id,
service_id: self.path.service().hash(),
method_id: self.path.method().hash(),
call_id,
message: (),
};
handle.open_unary(req)
}
}
#[derive(Debug, Clone)]
pub struct ServerStreamRpc<M1, M2> {
marker1: std::marker::PhantomData<M1>,
marker2: std::marker::PhantomData<M2>,
path: Path,
}
impl<M1, M2> ServerStreamRpc<M1, M2>
where
M1: Message,
M2: Message + Default,
{
pub fn new(path: impl Into<Path>) -> Self {
Self {
marker1: std::marker::PhantomData,
marker2: std::marker::PhantomData,
path: path.into(),
}
}
pub fn call(&self, handle: &mut ClientHandle, channel_id: u32, call_id: u32, message: M1)
-> Result<StreamResponse<M2>, Error>
{
let req = Request {
channel_id,
service_id: self.path.service().hash(),
method_id: self.path.method().hash(),
call_id,
message,
};
handle.call_server_stream(req)
}
pub fn open(&self, handle: &mut ClientHandle, channel_id: u32, call_id: u32)
-> Result<StreamResponse<M2>, Error>
{
let req = Request {
channel_id,
service_id: self.path.service().hash(),
method_id: self.path.method().hash(),
call_id,
message: (),
};
handle.open_server_stream(req)
}
}

194
libmaestro/src/pwrpc/id.rs Normal file
View File

@@ -0,0 +1,194 @@
pub type Hash = u32;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Id {
name: String,
}
impl Id {
pub fn new(id: impl Into<String>) -> Self {
Self { name: id.into() }
}
pub fn name(&self) -> &str {
&self.name
}
pub fn hash(&self) -> Hash {
hash::hash_65599(&self.name)
}
pub fn as_ref(&self) -> IdRef<'_> {
IdRef { name: &self.name }
}
}
impl<S> From<S> for Id
where
S: Into<String>
{
fn from(name: S) -> Self {
Id::new(name)
}
}
impl<'a> From<IdRef<'a>> for Id {
fn from(id: IdRef<'a>) -> Self {
Id::new(id.name())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IdRef<'a> {
name: &'a str,
}
impl<'a> IdRef<'a> {
pub fn new(name: &'a str) -> Self {
Self { name }
}
pub fn name(&self) -> &'a str {
self.name
}
pub fn hash(&self) -> Hash {
hash::hash_65599(self.name)
}
}
impl<'a> From<&'a str> for IdRef<'a> {
fn from(name: &'a str) -> Self {
IdRef::new(name)
}
}
impl<'a> From<&'a String> for IdRef<'a> {
fn from(name: &'a String) -> Self {
IdRef::new(name)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Path {
path: String,
split: usize,
}
impl Path {
pub fn new(path: impl Into<String>) -> Self {
let path = path.into();
let split = path.rfind('/').unwrap_or(0);
Path { path, split }
}
pub fn service(&self) -> IdRef<'_> {
IdRef::new(&self.path[..self.split])
}
pub fn method(&self) -> IdRef<'_> {
if self.split < self.path.len() {
IdRef::new(&self.path[self.split+1..])
} else {
IdRef::new(&self.path[0..0])
}
}
pub fn as_ref(&self) -> PathRef<'_> {
PathRef { path: &self.path, split: self.split }
}
}
impl From<&str> for Path {
fn from(name: &str) -> Self {
Path::new(name)
}
}
impl From<String> for Path {
fn from(name: String) -> Self {
Path::new(name)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PathRef<'a> {
path: &'a str,
split: usize,
}
impl<'a> PathRef<'a> {
pub fn new(path: &'a str) -> Self {
let split = path.rfind('/').unwrap_or(0);
PathRef { path, split }
}
pub fn service(&self) -> IdRef<'a> {
IdRef::new(&self.path[..self.split])
}
pub fn method(&self) -> IdRef<'a> {
if self.split < self.path.len() {
IdRef::new(&self.path[self.split+1..])
} else {
IdRef::new(&self.path[0..0])
}
}
}
impl<'a> From<&'a str> for PathRef<'a> {
fn from(name: &'a str) -> Self {
PathRef::new(name)
}
}
mod hash {
const HASH_CONST: u32 = 65599;
pub fn hash_65599(id: &str) -> u32 {
let mut hash = id.len() as u32;
let mut coef = HASH_CONST;
for chr in id.chars() {
hash = hash.wrapping_add(coef.wrapping_mul(chr as u32));
coef = coef.wrapping_mul(HASH_CONST);
}
hash
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_known_id_hashes() {
assert_eq!(IdRef::new("maestro_pw.Maestro").hash(), 0x7ede71ea);
assert_eq!(IdRef::new("GetSoftwareInfo").hash(), 0x7199fa44);
assert_eq!(IdRef::new("SubscribeToSettingsChanges").hash(), 0x2821adf5);
}
#[test]
fn test_path() {
let pref = PathRef::new("maestro_pw.Maestro/GetSoftwareInfo");
assert_eq!(pref.service().name(), "maestro_pw.Maestro");
assert_eq!(pref.service().hash(), 0x7ede71ea);
assert_eq!(pref.method().name(), "GetSoftwareInfo");
assert_eq!(pref.method().hash(), 0x7199fa44);
let pref = PathRef::new("maestro_pw.Maestro/SubscribeToSettingsChanges");
assert_eq!(pref.service().name(), "maestro_pw.Maestro");
assert_eq!(pref.service().hash(), 0x7ede71ea);
assert_eq!(pref.method().name(), "SubscribeToSettingsChanges");
assert_eq!(pref.method().hash(), 0x2821adf5);
}
}

View File

@@ -0,0 +1,8 @@
pub mod client;
pub mod id;
pub mod types;
pub mod utils;
mod status;
pub use status::Error;
pub use status::Status;

View File

@@ -0,0 +1,236 @@
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Status {
Ok = 0,
Cancelled = 1,
Unknown = 2,
InvalidArgument = 3,
DeadlineExceeded = 4,
NotFound = 5,
AlreadyExists = 6,
PermissionDenied = 7,
ResourceExhausted = 8,
FailedPrecondition = 9,
Aborted = 10,
OutOfRange = 11,
Unimplemented = 12,
Internal = 13,
Unavailable = 14,
DataLoss = 15,
Unauthenticated = 16,
}
impl Status {
pub fn description(&self) -> &'static str {
match self {
Status::Ok => "The operation completed successfully",
Status::Cancelled => "The operation was cancelled",
Status::Unknown => "Unknown error",
Status::InvalidArgument => "Client specified an invalid argument",
Status::DeadlineExceeded => "Deadline expired before operation could complete",
Status::NotFound => "Some requested entity was not found",
Status::AlreadyExists => "Some entity that we attempted to create already exists",
Status::PermissionDenied => "The caller does not have permission to execute the specified operation",
Status::ResourceExhausted => "Some resource has been exhausted",
Status::FailedPrecondition => "The system is not in a state required for the operation's execution",
Status::Aborted => "The operation was aborted",
Status::OutOfRange => "Operation was attempted past the valid range",
Status::Unimplemented => "Operation is not implemented or not supported",
Status::Internal => "Internal error",
Status::Unavailable => "The service is currently unavailable",
Status::DataLoss => "Unrecoverable data loss or corruption",
Status::Unauthenticated => "The request does not have valid authentication credentials",
}
}
}
impl std::fmt::Display for Status {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.description())
}
}
impl From<u32> for Status {
fn from(value: u32) -> Self {
match value {
0 => Status::Ok,
1 => Status::Cancelled,
2 => Status::Unknown,
3 => Status::InvalidArgument,
4 => Status::DeadlineExceeded,
5 => Status::NotFound,
6 => Status::AlreadyExists,
7 => Status::PermissionDenied,
8 => Status::ResourceExhausted,
9 => Status::FailedPrecondition,
10 => Status::Aborted,
11 => Status::OutOfRange,
12 => Status::Unimplemented,
13 => Status::Internal,
14 => Status::Unavailable,
15 => Status::DataLoss,
16 => Status::Unauthenticated,
_ => Status::Unknown,
}
}
}
impl From<Status> for u32 {
fn from(value: Status) -> Self {
value as _
}
}
#[derive(Debug)]
pub struct Error {
code: Status,
message: String,
source: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}
impl Error {
pub fn new(code: Status, message: impl Into<String>) -> Self {
Self {
code,
message: message.into(),
source: None,
}
}
pub fn cancelled(message: impl Into<String>) -> Self {
Self::new(Status::Cancelled, message)
}
pub fn unknown(message: impl Into<String>) -> Self {
Self::new(Status::Unknown, message)
}
pub fn invalid_argument(message: impl Into<String>) -> Self {
Self::new(Status::InvalidArgument, message)
}
pub fn deadline_exceeded(message: impl Into<String>) -> Self {
Self::new(Status::DeadlineExceeded, message)
}
pub fn not_found(message: impl Into<String>) -> Self {
Self::new(Status::NotFound, message)
}
pub fn already_exists(message: impl Into<String>) -> Self {
Self::new(Status::AlreadyExists, message)
}
pub fn permission_denied(message: impl Into<String>) -> Self {
Self::new(Status::PermissionDenied, message)
}
pub fn resource_exhausted(message: impl Into<String>) -> Self {
Self::new(Status::ResourceExhausted, message)
}
pub fn failed_precondition(message: impl Into<String>) -> Self {
Self::new(Status::FailedPrecondition, message)
}
pub fn aborted(message: impl Into<String>) -> Self {
Self::new(Status::Aborted, message)
}
pub fn out_of_range(message: impl Into<String>) -> Self {
Self::new(Status::OutOfRange, message)
}
pub fn unimplemented(message: impl Into<String>) -> Self {
Self::new(Status::Unimplemented, message)
}
pub fn internal(message: impl Into<String>) -> Self {
Self::new(Status::Internal, message)
}
pub fn unavailable(message: impl Into<String>) -> Self {
Self::new(Status::Unavailable, message)
}
pub fn data_loss(message: impl Into<String>) -> Self {
Self::new(Status::DataLoss, message)
}
pub fn unauthenticated(message: impl Into<String>) -> Self {
Self::new(Status::Unauthenticated, message)
}
pub fn extend(
code: Status,
message: impl Into<String>,
error: impl Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
) -> Self {
Self {
code,
message: message.into(),
source: Some(error.into()),
}
}
pub fn code(&self) -> Status {
self.code
}
pub fn message(&self) -> &str {
&self.message
}
}
impl From<Status> for Error {
fn from(code: Status) -> Self {
Self::new(code, code.description())
}
}
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
use std::io::ErrorKind;
let code = match err.kind() {
ErrorKind::BrokenPipe
| ErrorKind::WouldBlock
| ErrorKind::WriteZero
| ErrorKind::Interrupted => Status::Internal,
ErrorKind::ConnectionRefused
| ErrorKind::ConnectionReset
| ErrorKind::NotConnected
| ErrorKind::AddrInUse
| ErrorKind::AddrNotAvailable => Status::Unavailable,
ErrorKind::AlreadyExists => Status::AlreadyExists,
ErrorKind::ConnectionAborted => Status::Aborted,
ErrorKind::InvalidData => Status::DataLoss,
ErrorKind::InvalidInput => Status::InvalidArgument,
ErrorKind::NotFound => Status::NotFound,
ErrorKind::PermissionDenied => Status::PermissionDenied,
ErrorKind::TimedOut => Status::DeadlineExceeded,
ErrorKind::UnexpectedEof => Status::OutOfRange,
_ => Status::Unknown,
};
Error::extend(code, err.to_string(), err)
}
}
impl From<prost::DecodeError> for Error {
fn from(error: prost::DecodeError) -> Self {
Self::extend(Status::InvalidArgument, "failed to decode message", error)
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "status: {:?}, message: {:?}", self.code, self.message)
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.source.as_ref().map(|err| (&**err) as _)
}
}

View File

@@ -0,0 +1,31 @@
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcType {
Unary,
ServerStream,
ClientStream,
BidirectionalStream,
}
impl RpcType {
pub fn has_server_stream(&self) -> bool {
match *self {
RpcType::ServerStream | RpcType::BidirectionalStream => true,
RpcType::Unary | RpcType::ClientStream => false,
}
}
pub fn has_client_stream(&self) -> bool {
match *self {
RpcType::ClientStream | RpcType::BidirectionalStream => true,
RpcType::Unary | RpcType::ServerStream => false,
}
}
}
mod generated {
include!(concat!(env!("OUT_DIR"), "/pw.rpc.packet.rs"));
}
pub use generated::PacketType;
pub use generated::RpcPacket;

View File

@@ -0,0 +1,56 @@
//! Miscellaneous utilities and helpers.
use bytes::{Buf, BufMut};
/// An encoded protobuf message.
///
/// This type represents an encoded protobuf message. Decoding and encoding are
/// essentially no-ops, reading and writing to/from the internal buffer. It is
/// a drop-in replacement for any valid (and invalid) protobuf type.
///
/// This type is intended for reverse-engineering and testing, e.g., in
/// combination with tools like `protoscope`.
#[derive(Clone, Default)]
pub struct EncodedMessage {
pub data: Vec<u8>,
}
impl std::fmt::Debug for EncodedMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:02x?}", self.data)
}
}
impl prost::Message for EncodedMessage {
fn encode_raw(&self, buf: &mut impl BufMut) {
buf.put_slice(&self.data[..])
}
fn merge_field(
&mut self,
_tag: u32,
_wire_type: prost::encoding::WireType,
_buf: &mut impl Buf,
_ctx: prost::encoding::DecodeContext,
) -> Result<(), prost::DecodeError> {
unimplemented!("use merge() instead")
}
fn merge(&mut self, mut buf: impl Buf) -> Result<(), prost::DecodeError> {
let a = self.data.len();
let b = a + buf.remaining();
self.data.resize(b, 0);
buf.copy_to_slice(&mut self.data[a..b]);
Ok(())
}
fn encoded_len(&self) -> usize {
self.data.len()
}
fn clear(&mut self) {
self.data.clear()
}
}

View File

@@ -0,0 +1,36 @@
use crate::protocol::types::{
DosimeterSummary, DosimeterLiveDbMsg,
};
use crate::pwrpc::client::{ClientHandle, ServerStreamRpc, StreamResponse, UnaryRpc};
use crate::pwrpc::Error;
#[derive(Debug, Clone)]
pub struct DosimeterService {
client: ClientHandle,
channel_id: u32,
rpc_fetch_daily_summaries: UnaryRpc<(), DosimeterSummary>,
rpc_sub_live_db: ServerStreamRpc<(), DosimeterLiveDbMsg>,
}
impl DosimeterService {
pub fn new(client: ClientHandle, channel_id: u32) -> Self {
Self {
client,
channel_id,
rpc_fetch_daily_summaries: UnaryRpc::new("maestro_pw.Dosimeter/FetchDailySummaries"),
rpc_sub_live_db: ServerStreamRpc::new("maestro_pw.Dosimeter/SubscribeToLiveDb"),
}
}
pub async fn fetch_daily_summaries(&mut self) -> Result<DosimeterSummary, Error> {
self.rpc_fetch_daily_summaries.call(&mut self.client, self.channel_id, 0, ())?
.result().await
}
pub fn subscribe_to_live_db(&mut self) -> Result<StreamResponse<DosimeterLiveDbMsg>, Error> {
self.rpc_sub_live_db.call(&mut self.client, self.channel_id, 0, ())
}
}

View File

@@ -0,0 +1,117 @@
use crate::protocol::types::{
self, read_setting_msg, settings_rsp, write_setting_msg, HardwareInfo, OobeActionRsp,
ReadSettingMsg, RuntimeInfo, SettingsRsp, SoftwareInfo, WriteSettingMsg,
};
use crate::pwrpc::client::{ClientHandle, ServerStreamRpc, StreamResponse, UnaryRpc};
use crate::pwrpc::Error;
use crate::service::settings::{Setting, SettingId, SettingValue};
#[derive(Debug, Clone)]
pub struct MaestroService {
client: ClientHandle,
channel_id: u32,
rpc_get_software_info: UnaryRpc<(), SoftwareInfo>,
rpc_get_hardware_info: UnaryRpc<(), HardwareInfo>,
rpc_sub_runtime_info: ServerStreamRpc<(), RuntimeInfo>,
rpc_write_setting: UnaryRpc<WriteSettingMsg, ()>,
rpc_read_setting: UnaryRpc<ReadSettingMsg, SettingsRsp>,
rpc_sub_settings_changes: ServerStreamRpc<(), SettingsRsp>,
rpc_sub_oobe_actions: ServerStreamRpc<(), OobeActionRsp>,
}
impl MaestroService {
pub fn new(client: ClientHandle, channel_id: u32) -> Self {
Self {
client,
channel_id,
rpc_get_software_info: UnaryRpc::new("maestro_pw.Maestro/GetSoftwareInfo"),
rpc_get_hardware_info: UnaryRpc::new("maestro_pw.Maestro/GetHardwareInfo"),
rpc_sub_runtime_info: ServerStreamRpc::new("maestro_pw.Maestro/SubscribeRuntimeInfo"),
rpc_write_setting: UnaryRpc::new("maestro_pw.Maestro/WriteSetting"),
rpc_read_setting: UnaryRpc::new("maestro_pw.Maestro/ReadSetting"),
rpc_sub_settings_changes: ServerStreamRpc::new("maestro_pw.Maestro/SubscribeToSettingsChanges"),
rpc_sub_oobe_actions: ServerStreamRpc::new("maestro_pw.Maestro/SubscribeToOobeActions"),
}
}
pub async fn get_software_info(&mut self) -> Result<SoftwareInfo, Error> {
self.rpc_get_software_info.call(&mut self.client, self.channel_id, 0, ())?
.result().await
}
pub async fn get_hardware_info(&mut self) -> Result<HardwareInfo, Error> {
self.rpc_get_hardware_info.call(&mut self.client, self.channel_id, 0, ())?
.result().await
}
pub fn subscribe_to_runtime_info(&mut self) -> Result<StreamResponse<RuntimeInfo>, Error> {
self.rpc_sub_runtime_info.call(&mut self.client, self.channel_id, 0, ())
}
pub async fn write_setting_raw(&mut self, setting: WriteSettingMsg) -> Result<(), Error> {
self.rpc_write_setting.call(&mut self.client, self.channel_id, 0, setting)?
.result().await
}
pub async fn write_setting(&mut self, setting: SettingValue) -> Result<(), Error> {
let setting = types::SettingValue {
value_oneof: Some(setting.into()),
};
let setting = WriteSettingMsg {
value_oneof: Some(write_setting_msg::ValueOneof::Setting(setting)),
};
self.write_setting_raw(setting).await
}
pub async fn read_setting_raw(&mut self, setting: ReadSettingMsg) -> Result<SettingsRsp, Error> {
self.rpc_read_setting.call(&mut self.client, self.channel_id, 0, setting)?
.result().await
}
pub async fn read_setting_var(&mut self, setting: SettingId) -> Result<SettingValue, Error> {
let setting = read_setting_msg::ValueOneof::SettingsId(setting.into());
let setting = ReadSettingMsg { value_oneof: Some(setting) };
let value = self.read_setting_raw(setting).await?;
let value = value.value_oneof
.ok_or_else(|| Error::invalid_argument("did not receive any settings value"))?;
let settings_rsp::ValueOneof::Value(value) = value;
let value = value.value_oneof
.ok_or_else(|| Error::invalid_argument("did not receive any settings value"))?;
Ok(value.into())
}
pub async fn read_setting<T>(&mut self, setting: T) -> Result<T::Type, Error>
where
T: Setting,
{
let value = self.read_setting_var(setting.id()).await?;
T::from_var(value)
.ok_or_else(|| Error::invalid_argument("failed to decode settings value"))
}
pub fn subscribe_to_settings_changes(&mut self) -> Result<StreamResponse<SettingsRsp>, Error> {
self.rpc_sub_settings_changes.call(&mut self.client, self.channel_id, 0, ())
}
pub fn subscribe_to_oobe_actions(&mut self) -> Result<StreamResponse<OobeActionRsp>, Error> {
self.rpc_sub_oobe_actions.call(&mut self.client, self.channel_id, 0, ())
}
// TODO:
// - SetWallClock
}

View File

@@ -0,0 +1,8 @@
mod dosimeter;
pub use self::dosimeter::DosimeterService;
mod maestro;
pub use self::maestro::MaestroService;
mod multipoint;
pub use self::multipoint::MultipointService;

View File

@@ -0,0 +1,30 @@
use crate::protocol::types::QuietModeStatusEvent;
use crate::pwrpc::client::{ClientHandle, ServerStreamRpc, StreamResponse};
use crate::pwrpc::Error;
#[derive(Debug, Clone)]
pub struct MultipointService {
client: ClientHandle,
channel_id: u32,
rpc_sub_quiet_mode_status: ServerStreamRpc<(), QuietModeStatusEvent>,
}
impl MultipointService {
pub fn new(client: ClientHandle, channel_id: u32) -> Self {
Self {
client,
channel_id,
rpc_sub_quiet_mode_status: ServerStreamRpc::new("maestro_pw.Multipoint/SubscribeToQuietModeStatus"),
}
}
pub fn subscribe_to_quiet_mode_status(&mut self) -> Result<StreamResponse<QuietModeStatusEvent>, Error> {
self.rpc_sub_quiet_mode_status.call(&mut self.client, self.channel_id, 0, ())
}
// TODO:
// - ForceMultipointSwitch
}

View File

@@ -0,0 +1,4 @@
pub mod settings;
mod impls;
pub use impls::{MaestroService, MultipointService, DosimeterService};

View File

@@ -0,0 +1,829 @@
use num_enum::{IntoPrimitive, FromPrimitive};
use crate::protocol::types;
#[repr(i32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, FromPrimitive)]
pub enum SettingId {
AutoOtaEnable = 1,
OhdEnable = 2,
OobeIsFinished = 3,
GestureEnable = 4,
DiagnosticsEnable = 5,
OobeMode = 6,
GestureControl = 7,
AncAccessibilityMode = 8,
AncrStateOneBud = 9,
AncrStateTwoBuds = 10,
MultipointEnable = 11,
AncrGestureLoop = 12,
CurrentAncrState = 13,
OttsMode = 14,
VolumeEqEnable = 15,
CurrentUserEq = 16,
VolumeAsymmetry = 17,
LastSavedUserEq = 18,
SumToMono = 19,
VolumeExposureNotifications = 21,
SpeechDetection = 22,
#[num_enum(catch_all)]
Unknown(i32),
}
#[derive(Debug, Clone, PartialEq)]
pub enum SettingValue {
AutoOtaEnable(bool),
OhdEnable(bool),
OobeIsFinished(bool),
GestureEnable(bool),
DiagnosticsEnable(bool),
OobeMode(bool),
GestureControl(GestureControl),
MultipointEnable(bool),
AncrGestureLoop(AncrGestureLoop),
CurrentAncrState(AncState),
OttsMode(i32),
VolumeEqEnable(bool),
CurrentUserEq(EqBands),
VolumeAsymmetry(VolumeAsymmetry),
SumToMono(bool),
VolumeExposureNotifications(bool),
SpeechDetection(bool),
}
impl SettingValue {
pub fn id(&self) -> SettingId {
match self {
SettingValue::AutoOtaEnable(_) => SettingId::AutoOtaEnable,
SettingValue::OhdEnable(_) => SettingId::OhdEnable,
SettingValue::OobeIsFinished(_) => SettingId::OobeIsFinished,
SettingValue::GestureEnable(_) => SettingId::GestureEnable,
SettingValue::DiagnosticsEnable(_) => SettingId::DiagnosticsEnable,
SettingValue::OobeMode(_) => SettingId::OobeMode,
SettingValue::GestureControl(_) => SettingId::GestureControl,
SettingValue::MultipointEnable(_) => SettingId::MultipointEnable,
SettingValue::AncrGestureLoop(_) => SettingId::AncrGestureLoop,
SettingValue::CurrentAncrState(_) => SettingId::CurrentAncrState,
SettingValue::OttsMode(_) => SettingId::OttsMode,
SettingValue::VolumeEqEnable(_) => SettingId::VolumeEqEnable,
SettingValue::CurrentUserEq(_) => SettingId::CurrentUserEq,
SettingValue::VolumeAsymmetry(_) => SettingId::VolumeAsymmetry,
SettingValue::SumToMono(_) => SettingId::SumToMono,
SettingValue::VolumeExposureNotifications(_) => SettingId::VolumeExposureNotifications,
SettingValue::SpeechDetection(_) => SettingId::SpeechDetection,
}
}
}
impl From<types::setting_value::ValueOneof> for SettingValue {
fn from(value: crate::protocol::types::setting_value::ValueOneof) -> Self {
use types::setting_value::ValueOneof;
match value {
ValueOneof::AutoOtaEnable(x) => SettingValue::AutoOtaEnable(x),
ValueOneof::OhdEnable(x) => SettingValue::OhdEnable(x),
ValueOneof::OobeIsFinished(x) => SettingValue::OobeIsFinished(x),
ValueOneof::GestureEnable(x) => SettingValue::GestureEnable(x),
ValueOneof::DiagnosticsEnable(x) => SettingValue::DiagnosticsEnable(x),
ValueOneof::OobeMode(x) => SettingValue::OobeMode(x),
ValueOneof::GestureControl(x) => SettingValue::GestureControl(GestureControl::from(x)),
ValueOneof::MultipointEnable(x) => SettingValue::MultipointEnable(x),
ValueOneof::AncrGestureLoop(x) => SettingValue::AncrGestureLoop(AncrGestureLoop::from(x)),
ValueOneof::CurrentAncrState(x) => SettingValue::CurrentAncrState(AncState::from_primitive(x)),
ValueOneof::OttsMode(x) => SettingValue::OttsMode(x),
ValueOneof::VolumeEqEnable(x) => SettingValue::VolumeEqEnable(x),
ValueOneof::CurrentUserEq(x) => SettingValue::CurrentUserEq(EqBands::from(x)),
ValueOneof::VolumeAsymmetry(x) => SettingValue::VolumeAsymmetry(VolumeAsymmetry::from_raw(x)),
ValueOneof::SumToMono(x) => SettingValue::SumToMono(x),
ValueOneof::VolumeExposureNotifications(x) => SettingValue::VolumeExposureNotifications(x),
ValueOneof::SpeechDetection(x) => SettingValue::SpeechDetection(x),
}
}
}
impl From<SettingValue> for types::setting_value::ValueOneof {
fn from(value: SettingValue) -> Self {
use types::setting_value::ValueOneof;
match value {
SettingValue::AutoOtaEnable(x) => ValueOneof::AutoOtaEnable(x),
SettingValue::OhdEnable(x) => ValueOneof::OhdEnable(x),
SettingValue::OobeIsFinished(x) => ValueOneof::OobeIsFinished(x),
SettingValue::GestureEnable(x) => ValueOneof::GestureEnable(x),
SettingValue::DiagnosticsEnable(x) => ValueOneof::DiagnosticsEnable(x),
SettingValue::OobeMode(x) => ValueOneof::OobeMode(x),
SettingValue::GestureControl(x) => ValueOneof::GestureControl(x.into()),
SettingValue::MultipointEnable(x) => ValueOneof::MultipointEnable(x),
SettingValue::AncrGestureLoop(x) => ValueOneof::AncrGestureLoop(x.into()),
SettingValue::CurrentAncrState(x) => ValueOneof::CurrentAncrState(x.into()),
SettingValue::OttsMode(x) => ValueOneof::OttsMode(x),
SettingValue::VolumeEqEnable(x) => ValueOneof::VolumeEqEnable(x),
SettingValue::CurrentUserEq(x) => ValueOneof::CurrentUserEq(x.into()),
SettingValue::VolumeAsymmetry(x) => ValueOneof::VolumeAsymmetry(x.raw()),
SettingValue::SumToMono(x) => ValueOneof::SumToMono(x),
SettingValue::VolumeExposureNotifications(x) => ValueOneof::VolumeExposureNotifications(x),
SettingValue::SpeechDetection(x) => ValueOneof::SpeechDetection(x),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GestureControl {
pub left: RegularActionTarget,
pub right: RegularActionTarget,
}
impl From<types::GestureControl> for GestureControl {
fn from(value: types::GestureControl) -> Self {
let left = value.left
.and_then(|v| v.value_oneof)
.map(|types::device_gesture_control::ValueOneof::Type(x)| x)
.map(|v| RegularActionTarget::from_primitive(v.value))
.unwrap_or(RegularActionTarget::Unknown(-1));
let right = value.right
.and_then(|v| v.value_oneof)
.map(|types::device_gesture_control::ValueOneof::Type(x)| x)
.map(|v| RegularActionTarget::from_primitive(v.value))
.unwrap_or(RegularActionTarget::Unknown(-1));
GestureControl { left, right }
}
}
impl From<GestureControl> for types::GestureControl {
fn from(value: GestureControl) -> Self {
use types::device_gesture_control::ValueOneof;
let left = types::DeviceGestureControl {
value_oneof: Some(ValueOneof::Type(types::GestureControlType {
value: value.left.into(),
})),
};
let right = types::DeviceGestureControl {
value_oneof: Some(ValueOneof::Type(types::GestureControlType {
value: value.right.into(),
})),
};
Self {
left: Some(left),
right: Some(right),
}
}
}
impl Default for GestureControl {
fn default() -> Self {
Self {
left: RegularActionTarget::AncControl,
right: RegularActionTarget::AncControl,
}
}
}
impl std::fmt::Display for GestureControl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "left: {}, right: {}", self.left, self.right)
}
}
#[repr(i32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, FromPrimitive)]
pub enum RegularActionTarget {
CheckNotifications = 1,
PreviousTrackRepeat = 2,
NextTrack = 3,
PlayPauseTrack = 4,
AncControl = 5,
AssistantQuery = 6,
#[num_enum(catch_all)]
Unknown(i32),
}
impl RegularActionTarget {
pub fn as_str(&self) -> &'static str {
match self {
RegularActionTarget::CheckNotifications => "check-notifications",
RegularActionTarget::PreviousTrackRepeat => "previous",
RegularActionTarget::NextTrack => "next",
RegularActionTarget::PlayPauseTrack => "play-pause",
RegularActionTarget::AncControl => "anc",
RegularActionTarget::AssistantQuery => "assistant",
RegularActionTarget::Unknown(_) => "unknown",
}
}
}
impl std::fmt::Display for RegularActionTarget {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RegularActionTarget::CheckNotifications => write!(f, "check-notifications"),
RegularActionTarget::PreviousTrackRepeat => write!(f, "previous"),
RegularActionTarget::NextTrack => write!(f, "next"),
RegularActionTarget::PlayPauseTrack => write!(f, "play-pause"),
RegularActionTarget::AncControl => write!(f, "anc"),
RegularActionTarget::AssistantQuery => write!(f, "assistant"),
RegularActionTarget::Unknown(x) => write!(f, "unknown ({x})"),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AncrGestureLoop {
pub active: bool,
pub off: bool,
pub aware: bool,
pub adaptive: bool,
}
impl AncrGestureLoop {
pub fn is_valid(&self) -> bool {
// at least two need to be set
(self.active as u32 + self.off as u32 + self.aware as u32 + self.adaptive as u32) >= 2
}
}
impl From<types::AncrGestureLoop> for AncrGestureLoop {
fn from(other: types::AncrGestureLoop) -> Self {
AncrGestureLoop { active: other.active, off: other.off, aware: other.aware, adaptive: other.adaptive }
}
}
impl From<AncrGestureLoop> for types::AncrGestureLoop {
fn from(other: AncrGestureLoop) -> Self {
Self {
active: other.active,
off: other.off,
aware: other.aware,
adaptive: other.adaptive,
}
}
}
impl std::fmt::Display for AncrGestureLoop {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut n = 0;
write!(f, "[")?;
if self.active {
write!(f, "active")?;
n += 1;
}
if self.off {
if n > 0 {
write!(f, ", ")?;
}
write!(f, "off")?;
n += 1;
}
if self.aware {
if n > 0 {
write!(f, ", ")?;
}
write!(f, "aware")?;
}
if self.adaptive {
if n > 0 {
write!(f, ", ")?;
}
write!(f, "adaptive")?;
}
write!(f, "]")
}
}
#[repr(i32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, FromPrimitive)]
pub enum AncState {
Off = 1,
Active = 2,
Aware = 3,
Adaptive = 4,
#[num_enum(catch_all)]
Unknown(i32),
}
impl AncState {
pub fn as_str(&self) -> &'static str {
match self {
AncState::Off => "off",
AncState::Active => "active",
AncState::Aware => "aware",
AncState::Adaptive => "adaptive",
AncState::Unknown(_) => "unknown",
}
}
}
// #[derive(Default)] clashes with #[derive(FromPrimitive)]
#[allow(clippy::derivable_impls)]
impl Default for AncState {
fn default() -> Self {
AncState::Off
}
}
impl std::fmt::Display for AncState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AncState::Off => write!(f, "off"),
AncState::Active => write!(f, "active"),
AncState::Aware => write!(f, "aware"),
AncState::Adaptive => write!(f, "adaptive"),
AncState::Unknown(x) => write!(f, "unknown ({x})"),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct EqBands {
low_bass: f32,
bass: f32,
mid: f32,
treble: f32,
upper_treble: f32,
}
impl EqBands {
pub const MIN_VALUE: f32 = -6.0;
pub const MAX_VALUE: f32 = 6.0;
pub fn new(low_bass: f32, bass: f32, mid: f32, treble: f32, upper_treble: f32) -> Self {
Self {
low_bass: low_bass.clamp(Self::MIN_VALUE, Self::MAX_VALUE),
bass: bass.clamp(Self::MIN_VALUE, Self::MAX_VALUE),
mid: mid.clamp(Self::MIN_VALUE, Self::MAX_VALUE),
treble: treble.clamp(Self::MIN_VALUE, Self::MAX_VALUE),
upper_treble: upper_treble.clamp(Self::MIN_VALUE, Self::MAX_VALUE),
}
}
pub fn low_bass(&self) -> f32 {
self.low_bass
}
pub fn bass(&self) -> f32 {
self.bass
}
pub fn mid(&self) -> f32 {
self.mid
}
pub fn treble(&self) -> f32 {
self.treble
}
pub fn upper_treble(&self) -> f32 {
self.upper_treble
}
pub fn set_low_bass(&mut self, value: f32) {
self.low_bass = value.clamp(Self::MIN_VALUE, Self::MAX_VALUE)
}
pub fn set_bass(&mut self, value: f32) {
self.bass = value.clamp(Self::MIN_VALUE, Self::MAX_VALUE)
}
pub fn set_mid(&mut self, value: f32) {
self.mid = value.clamp(Self::MIN_VALUE, Self::MAX_VALUE)
}
pub fn set_treble(&mut self, value: f32) {
self.treble = value.clamp(Self::MIN_VALUE, Self::MAX_VALUE)
}
pub fn set_upper_treble(&mut self, value: f32) {
self.upper_treble = value.clamp(Self::MIN_VALUE, Self::MAX_VALUE)
}
}
impl Default for EqBands {
fn default() -> Self {
Self {
low_bass: 0.0,
bass: 0.0,
mid: 0.0,
treble: 0.0,
upper_treble: 0.0
}
}
}
impl From<types::EqBands> for EqBands {
fn from(other: types::EqBands) -> Self {
Self {
low_bass: other.low_bass,
bass: other.bass,
mid: other.mid,
treble: other.treble,
upper_treble: other.upper_treble,
}
}
}
impl From<EqBands> for types::EqBands {
fn from(other: EqBands) -> Self {
Self {
low_bass: other.low_bass.clamp(EqBands::MIN_VALUE, EqBands::MAX_VALUE),
bass: other.bass.clamp(EqBands::MIN_VALUE, EqBands::MAX_VALUE),
mid: other.mid.clamp(EqBands::MIN_VALUE, EqBands::MAX_VALUE),
treble: other.treble.clamp(EqBands::MIN_VALUE, EqBands::MAX_VALUE),
upper_treble: other.upper_treble.clamp(EqBands::MIN_VALUE, EqBands::MAX_VALUE),
}
}
}
impl std::fmt::Display for EqBands {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f, "[{:.2}, {:.2}, {:.2}, {:.2}, {:.2}]",
self.low_bass, self.bass, self.mid, self.treble, self.upper_treble,
)
}
}
#[derive(Default, Clone, Copy, PartialEq, Eq)]
pub struct VolumeAsymmetry {
value: i32,
}
impl VolumeAsymmetry {
pub fn from_normalized(value: i32) -> Self {
Self { value: value.clamp(-100, 100) }
}
pub fn from_raw(value: i32) -> Self {
let direction = value & 0x01;
let value = value >> 1;
let normalized = if direction != 0 {
value + 1
} else {
- value
};
Self { value: normalized }
}
pub fn raw(&self) -> i32 {
if self.value > 0 {
((self.value - 1) << 1) | 0x01
} else {
(-self.value) << 1
}
}
pub fn value(&self) -> i32 {
self.value
}
}
impl std::fmt::Debug for VolumeAsymmetry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.value)
}
}
impl std::fmt::Display for VolumeAsymmetry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let left = (100 - self.value).min(100);
let right = (100 + self.value).min(100);
write!(f, "left: {left}%, right: {right}%")
}
}
pub trait Setting {
type Type;
fn id(&self) -> SettingId;
fn from_var(var: SettingValue) -> Option<Self::Type>;
}
impl Setting for SettingId {
type Type = SettingValue;
fn id(&self) -> SettingId {
*self
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
Some(var)
}
}
pub mod id {
use super::*;
pub struct AutoOtaEnable;
pub struct OhdEnable;
pub struct OobeIsFinished;
pub struct GestureEnable;
pub struct DiagnosticsEnable;
pub struct OobeMode;
pub struct GestureControl;
pub struct MultipointEnable;
pub struct AncrGestureLoop;
pub struct CurrentAncrState;
pub struct OttsMode;
pub struct VolumeEqEnable;
pub struct CurrentUserEq;
pub struct VolumeAsymmetry;
pub struct SumToMono;
pub struct VolumeExposureNotifications;
pub struct SpeechDetection;
impl Setting for AutoOtaEnable {
type Type = bool;
fn id(&self) -> SettingId {
SettingId::AutoOtaEnable
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
match var {
SettingValue::AutoOtaEnable(x) => Some(x),
_ => None,
}
}
}
impl Setting for OhdEnable {
type Type = bool;
fn id(&self) -> SettingId {
SettingId::OhdEnable
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
match var {
SettingValue::OhdEnable(x) => Some(x),
_ => None,
}
}
}
impl Setting for OobeIsFinished {
type Type = bool;
fn id(&self) -> SettingId {
SettingId::OobeIsFinished
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
match var {
SettingValue::OobeIsFinished(x) => Some(x),
_ => None,
}
}
}
impl Setting for GestureEnable {
type Type = bool;
fn id(&self) -> SettingId {
SettingId::GestureEnable
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
match var {
SettingValue::GestureEnable(x) => Some(x),
_ => None,
}
}
}
impl Setting for DiagnosticsEnable {
type Type = bool;
fn id(&self) -> SettingId {
SettingId::DiagnosticsEnable
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
match var {
SettingValue::DiagnosticsEnable(x) => Some(x),
_ => None,
}
}
}
impl Setting for OobeMode {
type Type = bool;
fn id(&self) -> SettingId {
SettingId::OobeMode
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
match var {
SettingValue::OobeMode(x) => Some(x),
_ => None,
}
}
}
impl Setting for GestureControl {
type Type = super::GestureControl;
fn id(&self) -> SettingId {
SettingId::GestureControl
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
match var {
SettingValue::GestureControl(x) => Some(x),
_ => None,
}
}
}
impl Setting for MultipointEnable {
type Type = bool;
fn id(&self) -> SettingId {
SettingId::MultipointEnable
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
match var {
SettingValue::MultipointEnable(x) => Some(x),
_ => None,
}
}
}
impl Setting for AncrGestureLoop {
type Type = super::AncrGestureLoop;
fn id(&self) -> SettingId {
SettingId::AncrGestureLoop
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
match var {
SettingValue::AncrGestureLoop(x) => Some(x),
_ => None,
}
}
}
impl Setting for CurrentAncrState {
type Type = AncState;
fn id(&self) -> SettingId {
SettingId::CurrentAncrState
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
match var {
SettingValue::CurrentAncrState(x) => Some(x),
_ => None,
}
}
}
impl Setting for OttsMode {
type Type = i32;
fn id(&self) -> SettingId {
SettingId::OttsMode
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
match var {
SettingValue::OttsMode(x) => Some(x),
_ => None,
}
}
}
impl Setting for VolumeEqEnable {
type Type = bool;
fn id(&self) -> SettingId {
SettingId::VolumeEqEnable
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
match var {
SettingValue::VolumeEqEnable(x) => Some(x),
_ => None,
}
}
}
impl Setting for CurrentUserEq {
type Type = EqBands;
fn id(&self) -> SettingId {
SettingId::CurrentUserEq
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
match var {
SettingValue::CurrentUserEq(x) => Some(x),
_ => None,
}
}
}
impl Setting for VolumeAsymmetry {
type Type = super::VolumeAsymmetry;
fn id(&self) -> SettingId {
SettingId::VolumeAsymmetry
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
match var {
SettingValue::VolumeAsymmetry(x) => Some(x),
_ => None,
}
}
}
impl Setting for SumToMono {
type Type = bool;
fn id(&self) -> SettingId {
SettingId::SumToMono
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
match var {
SettingValue::SumToMono(x) => Some(x),
_ => None,
}
}
}
impl Setting for VolumeExposureNotifications {
type Type = bool;
fn id(&self) -> SettingId {
SettingId::VolumeExposureNotifications
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
match var {
SettingValue::VolumeExposureNotifications(x) => Some(x),
_ => None,
}
}
}
impl Setting for SpeechDetection {
type Type = bool;
fn id(&self) -> SettingId {
SettingId::SpeechDetection
}
fn from_var(var: SettingValue) -> Option<Self::Type> {
match var {
SettingValue::SpeechDetection(x) => Some(x),
_ => None,
}
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_volume_assymetry_conversion() {
for i in 0..=200 {
assert_eq!(VolumeAsymmetry::from_raw(i).raw(), i)
}
}
}