use std::{any::Any, collections::HashMap, ffi::c_void, sync::Arc};
use ffi_sdk::{ffi_utils::char_p, BoxedDitto, DittoSdkTransportsError};
use safer_ffi::prelude::*;
use tracing::info;
use super::{
sync_state::SyncState,
transport_config::{LanConfig, TransportConfig},
};
use crate::{
identity::SharedIdentity,
utils::prelude::{DittoError, ErrorKind},
};
pub struct TransportSync {
ditto: Arc<BoxedDitto>,
effective_state: SyncState,
requested_state: SyncState,
pub(crate) tcp_clients: HashMap<String, Box<dyn Any + Send + Sync>>,
pub(crate) ws_clients: HashMap<String, Box<dyn Any + Send + Sync>>,
mdns_transport_handle: Option<MdnsTransportHandleHolder>,
ble_transport_handle: Option<BleTransportHandleHolder>,
}
struct MdnsTransportHandleHolder(*mut c_void);
unsafe impl Send for MdnsTransportHandleHolder {}
unsafe impl Sync for MdnsTransportHandleHolder {}
struct BleTransportHandleHolder(*mut c_void);
unsafe impl Send for BleTransportHandleHolder {}
unsafe impl Sync for BleTransportHandleHolder {}
impl TransportSync {
pub(crate) fn from_config(
config: TransportConfig,
ditto: Arc<BoxedDitto>,
identity: SharedIdentity,
) -> TransportSync {
let web_valid = ffi_sdk::ditto_auth_client_is_web_valid(&ditto) != 0;
let x509_valid = ffi_sdk::ditto_auth_client_is_x509_valid(&ditto) != 0;
let requested_state = SyncState::new(config, identity.clone(), web_valid, x509_valid);
let effective_state =
SyncState::new(TransportConfig::new(), identity, web_valid, x509_valid);
let mut error = DittoSdkTransportsError::None;
unsafe { ffi_sdk::ditto_sdk_transports_init((&mut error).into()) };
if error != DittoSdkTransportsError::None {
tracing::error!(?error, "failed to perform transports init");
}
TransportSync {
ditto,
requested_state,
effective_state,
tcp_clients: HashMap::with_capacity(0), ws_clients: HashMap::with_capacity(0),
mdns_transport_handle: None,
ble_transport_handle: None,
}
}
pub(crate) fn start_sync(&mut self) {
self.requested_state.set_sync(true);
self.update()
}
pub(crate) fn stop_sync(&mut self) {
self.requested_state.set_sync(false);
self.update()
}
pub(crate) fn validity_updated(&mut self, web_valid: bool, x509_valid: bool) {
self.requested_state
.set_web_valid(web_valid)
.set_x509_valid(x509_valid);
self.update()
}
pub(crate) fn set_transport_config(&mut self, state: TransportConfig) {
let config_cbor =
serde_cbor::to_vec(&state).expect("TransportConfig can be serialized to CBOR");
self.requested_state.set_config(state);
self.update();
ffi_sdk::ditto_small_peer_info_set_transport_config_data(
&self.ditto,
config_cbor.as_slice().into(),
);
}
fn update(&mut self) {
let future_state = self.requested_state.compute_effective_state();
let old_state = self.effective_state.clone();
self.apply_transport_state(&future_state, &old_state);
self.apply_transport_global_state(&future_state, &old_state);
self.effective_state = future_state;
}
fn apply_transport_state(&mut self, state: &SyncState, old_state: &SyncState) {
self.update_peer_to_peer_bluetooth_le(state, old_state);
self.update_peer_to_peer_lan(state, old_state);
self.update_listen_tcp(state, old_state);
self.update_listen_http(state, old_state);
self.update_connect_tcp_servers(state, old_state);
self.update_connect_websocket_url(state, old_state);
self.update_connect_retry_interval(state, old_state);
}
pub(crate) fn current_config(&self) -> &TransportConfig {
self.requested_state.config()
}
#[cfg(test)]
pub(crate) fn effective_config(&self) -> &TransportConfig {
self.effective_state.config()
}
fn apply_transport_global_state(&self, state: &SyncState, old_state: &SyncState) {
let new_sync_group = state.config().global.sync_group;
let old_sync_group = old_state.config().global.sync_group;
if new_sync_group != old_sync_group {
ffi_sdk::ditto_set_sync_group(&self.ditto, new_sync_group);
}
}
fn start_tcp_listen(&mut self, config: &crate::transport::TcpListenConfig) {
let bind_ip = format!("{}:{}", config.interface_ip, config.port);
let c_addr = char_p::new(bind_ip);
let _result = ffi_sdk::ditto_start_tcp_server(&self.ditto, Some(c_addr.as_ref()));
}
fn stop_tcp_listen(&mut self) {
ffi_sdk::ditto_stop_tcp_server(&self.ditto);
}
fn start_http_listen(
&mut self,
config: &crate::transport::HttpListenConfig,
) -> Result<(), DittoError> {
let enable_ws = if config.websocket_sync {
ffi_sdk::WebSocketMode::Enabled
} else {
ffi_sdk::WebSocketMode::Disabled
};
let bind_ip = format!("{}:{}", config.interface_ip, config.port);
let c_addr = char_p::new(bind_ip);
let c_static_path = config
.static_content_path
.as_ref()
.map(|x| char_p::new(x.to_string_lossy().to_string()));
let c_tls_cert_path = config
.tls_certificate_path
.as_ref()
.map(|x| char_p::new(x.to_string_lossy().to_string()));
let c_tls_key_path = config
.tls_key_path
.as_ref()
.map(|x| char_p::new(x.to_string_lossy().to_string()));
let status = {
ffi_sdk::ditto_start_http_server(
&self.ditto,
Some(c_addr.as_ref()),
c_static_path.as_ref().map(|x| x.as_ref()),
enable_ws,
c_tls_cert_path.as_ref().map(|x| x.as_ref()), c_tls_key_path.as_ref().map(|x| x.as_ref()), )
};
if status != 0 {
Err(DittoError::from_ffi(ErrorKind::InvalidInput))
} else {
Ok(())
}
}
fn stop_http_listen(&mut self) {
ffi_sdk::ditto_stop_http_server(&self.ditto);
}
fn start_tcp_connect(&mut self, address: String) {
let addr = char_p::new(address.clone());
let tcp_client_handle = ffi_sdk::ditto_add_static_tcp_client(&self.ditto, addr.as_ref());
info!(?address, "static TCP client transport started");
self.tcp_clients
.insert(address, Box::new(tcp_client_handle));
}
fn stop_tcp_connect(&mut self, address: &str) {
let _to_drop = self.tcp_clients.remove(address);
}
fn start_ws_connect(&mut self, url: String, routing_hint: u32) {
let c_url = char_p::new(url.clone());
let ws_client_handle =
ffi_sdk::ditto_add_websocket_client(&self.ditto, c_url.as_ref(), routing_hint);
info!(?url, "websocket client transport started");
self.ws_clients.insert(url, Box::new(ws_client_handle));
}
fn stop_ws_connect(&mut self, url: &str) {
let _to_drop = self.ws_clients.remove(url);
}
fn start_bluetooth(&mut self) {
let mut error = DittoSdkTransportsError::None;
let handle =
unsafe { ffi_sdk::ditto_sdk_transports_ble_create(&self.ditto, (&mut error).into()) };
if error == DittoSdkTransportsError::None {
self.ble_transport_handle = Some(BleTransportHandleHolder(handle));
tracing::info!("BLE transport started");
} else {
tracing::error!(?error, "failed to start BLE transport");
}
}
fn stop_bluetooth(&mut self) {
let Some(handle) = self.ble_transport_handle.take() else {
return;
};
let mut error = DittoSdkTransportsError::None;
unsafe { ffi_sdk::ditto_sdk_transports_ble_destroy(handle.0, (&mut error).into()) };
if error != DittoSdkTransportsError::None {
tracing::error!(?error, "failed to stop LAN transport");
}
}
fn start_lan(&mut self, config: &LanConfig) {
if config.mdns_enabled {
tracing::info!("starting LAN transport");
let mut error = DittoSdkTransportsError::None;
let handle = unsafe {
ffi_sdk::ditto_sdk_transports_lan_create(&self.ditto, (&mut error).into())
};
if error == DittoSdkTransportsError::None {
self.mdns_transport_handle = Some(MdnsTransportHandleHolder(handle));
} else {
tracing::error!(?error, "failed to start LAN transport");
}
}
#[allow(deprecated)] if config.multicast_enabled {
ffi_sdk::ditto_add_multicast_transport(&self.ditto);
}
}
fn stop_lan(&mut self) {
ffi_sdk::ditto_remove_multicast_transport(&self.ditto);
let Some(handle) = self.mdns_transport_handle.take() else {
return;
};
let mut error = DittoSdkTransportsError::None;
unsafe { ffi_sdk::ditto_sdk_transports_lan_destroy(handle.0, (&mut error).into()) };
if error != DittoSdkTransportsError::None {
tracing::error!(?error, "failed to stop LAN transport");
}
}
}
impl TransportSync {
fn update_peer_to_peer_lan(&mut self, state: &SyncState, old_state: &SyncState) {
if state.config().listen.tcp != old_state.config().listen.tcp
|| state.config().peer_to_peer.lan != old_state.config().peer_to_peer.lan
{
self.stop_lan();
if state.config().peer_to_peer.lan.enabled {
self.start_lan(&state.config().peer_to_peer.lan);
}
}
}
fn update_listen_tcp(&mut self, state: &SyncState, old_state: &SyncState) {
if state.config().listen.tcp != old_state.config().listen.tcp {
self.stop_tcp_listen();
if state.config().listen.tcp.enabled {
self.start_tcp_listen(&state.config().listen.tcp);
}
}
}
fn update_listen_http(&mut self, state: &SyncState, old_state: &SyncState) {
if state.config().listen.http != old_state.config().listen.http {
self.stop_http_listen();
if state.config().listen.http.enabled {
let _ = self.start_http_listen(&state.config().listen.http);
}
}
}
fn update_connect_tcp_servers(&mut self, state: &SyncState, old_state: &SyncState) {
let tcp_connects_to_stop = old_state
.config()
.connect
.tcp_servers
.difference(&state.config().connect.tcp_servers);
for addr in tcp_connects_to_stop {
self.stop_tcp_connect(addr);
}
let tcp_connects_to_start = state
.config()
.connect
.tcp_servers
.difference(&old_state.config().connect.tcp_servers);
for addr in tcp_connects_to_start {
self.start_tcp_connect(addr.clone());
}
}
fn update_peer_to_peer_bluetooth_le(&mut self, state: &SyncState, old_state: &SyncState) {
let new_ble_enabled = state.config().peer_to_peer.bluetooth_le.enabled;
let old_ble_enabled = old_state.config().peer_to_peer.bluetooth_le.enabled;
if old_ble_enabled && !new_ble_enabled {
self.stop_bluetooth();
}
if new_ble_enabled && !old_ble_enabled {
self.start_bluetooth();
}
}
fn update_connect_websocket_url(&mut self, state: &SyncState, old_state: &SyncState) {
let ws_connects_to_stop = old_state
.config()
.connect
.websocket_urls
.difference(&state.config().connect.websocket_urls);
for url in ws_connects_to_stop {
self.stop_ws_connect(url);
}
let ws_connects_to_start = state
.config()
.connect
.websocket_urls
.difference(&old_state.config().connect.websocket_urls);
let routing_hint = state.config().global.routing_hint;
for url in ws_connects_to_start {
self.start_ws_connect(url.clone(), routing_hint);
}
}
fn update_connect_retry_interval(&mut self, state: &SyncState, _old_state: &SyncState) {
let retry_interval =
u32::try_from(state.config().connect.retry_interval.as_millis()).unwrap_or(u32::MAX);
ffi_sdk::ditto_set_connect_retry_interval(&self.ditto, retry_interval);
}
}