dittolive_ditto/store/
observer.rsuse_prelude!();
use std::hash::{self, Hash};
use ffi_sdk::{
ffi_utils::repr_c, FfiDynChangeHandlerWithSignalNext, FfiDynSignalNext, FfiQueryResult,
FfiStoreObserver,
};
use uuid::Uuid;
pub use super::*;
use crate::{error::DittoError, utils::zstr::zstr};
pub struct StoreObserver {
pub(crate) handle: repr_c::Box<FfiStoreObserver>,
}
trait_alias! {
pub trait ChangeHandler =
FnMut(QueryResult)
+ Send + Sync
+ 'static }
trait_alias! {
pub trait SignalNext =
FnOnce()
+ 'static + Send }
trait_alias! {
pub trait ChangeHandlerWithSignalNext =
FnMut(QueryResult, Box<dyn SignalNext>)
+ 'static + Send + Sync
}
fn dittoffi_store_observer_register_safe<F>(
ditto: &'_ Ditto,
query: &zstr,
query_args: Option<&[u8]>,
mut observer: F,
) -> Result<StoreObserver>
where
F: ChangeHandlerWithSignalNext,
{
let ffi_callback: repr_c::Box<FfiDynChangeHandlerWithSignalNext> = {
fn make_callback<F>(f: F) -> repr_c::Box<FfiDynChangeHandlerWithSignalNext>
where
F: FnMut(repr_c::Box<FfiQueryResult>, repr_c::Arc<FfiDynSignalNext>) + 'static + Send,
{
Box::new(f).into()
}
make_callback(
move |ffi_query_result: repr_c::Box<FfiQueryResult>,
signal_next: repr_c::Arc<FfiDynSignalNext>| {
let signal_next = Box::new(move || {
signal_next.call();
});
observer(QueryResult::from(ffi_query_result), signal_next);
},
)
};
let query = query.into();
let query_args_cbor = query_args.map(|qa| qa.into());
let handle = ffi_sdk::dittoffi_store_register_observer_throws(
&ditto.ditto,
query,
query_args_cbor,
ffi_callback,
)
.into_rust_result()?;
Ok(StoreObserver { handle })
}
impl StoreObserver {
pub(crate) fn new<F>(
ditto: &Ditto,
query: &zstr,
query_args: Option<&[u8]>,
mut on_change: F,
) -> Result<Self, DittoError>
where
F: ChangeHandler,
{
Self::with_signal_next(ditto, query, query_args, move |args, signal_next| {
on_change(args);
signal_next();
})
}
pub(crate) fn with_signal_next<F>(
ditto: &Ditto,
query: &zstr,
query_args: Option<&[u8]>,
on_change: F,
) -> Result<Self, DittoError>
where
F: ChangeHandlerWithSignalNext,
{
let on_change = {
let on_change = ::std::sync::Mutex::new(on_change);
move |arg: QueryResult, signal_next: Box<dyn SignalNext>| {
let mut on_change = on_change
.lock()
.expect("`on_change` observer not to be poisoned");
on_change(arg, signal_next)
}
};
dittoffi_store_observer_register_safe(ditto, query, query_args, on_change)
}
pub fn query_string(&self) -> String {
let char_p = ffi_sdk::dittoffi_store_observer_query_string(&self.handle);
char_p.into_string()
}
pub fn query_arguments(&self) -> Option<serde_cbor::Value> {
let buffer: c_slice::Box<u8> =
ffi_sdk::dittoffi_store_observer_query_arguments(&self.handle)?;
let cbor = serde_cbor::from_slice(buffer.as_slice())
.unwrap_or_else(|error| panic!("bug: failed to deserialize CBOR from FFI: {error}"));
Some(cbor)
}
pub fn cancel(&self) {
ffi_sdk::dittoffi_store_observer_cancel(&self.handle);
}
pub fn is_cancelled(&self) -> bool {
ffi_sdk::dittoffi_store_observer_is_cancelled(&self.handle)
}
fn id(&self) -> Uuid {
let buffer = ffi_sdk::dittoffi_store_observer_id(&self.handle);
Uuid::from_slice(buffer.as_slice()).expect("bug: expected valid UUID")
}
}
impl StoreObserver {
fn comparable_parts(&self) -> impl '_ + Eq + Hash {
self.id()
}
}
impl Eq for StoreObserver {}
impl PartialEq for StoreObserver {
fn eq(&self, other: &Self) -> bool {
self.comparable_parts() == other.comparable_parts()
}
}
impl Hash for StoreObserver {
fn hash<H: hash::Hasher>(&self, h: &mut H) {
self.comparable_parts().hash(h)
}
}