use_prelude!();
use std::{
hash::{self, Hash},
sync::{Arc, Weak},
};
pub use ffi_sdk::LiveQueryAvailability;
pub use super::*;
use crate::{ditto::DittoFields, error::DittoError};
/// Handle to a change observer registered on the store.
///
/// Dropping the handle calls [`StoreObserver::cancel`]. Two handles compare (and hash)
/// by their live query id, query and query arguments.
pub struct StoreObserver {
/// Weak handle to the owning Ditto instance's fields; upgraded on demand so the
/// observer does not keep Ditto alive.
ditto: Weak<DittoFields>,
/// Identifier returned by the FFI registration call for this observer's live query.
pub(crate) live_query_id: i64,
/// Query this observer was registered with; participates in equality and hashing.
pub(crate) _query: Query,
/// Optional arguments the query was registered with; participates in equality and hashing.
_query_args: Option<QueryArguments>,
}
// `ChangeHandler`: bound required of the `on_change` callback accepted by
// `StoreObserver::new`, i.e. a `FnMut(QueryResult)` that is also `Send + 'static`.
// NOTE(review): `trait_alias!` is defined elsewhere; presumably it expands to a trait
// plus a blanket impl for every type satisfying the right-hand side — confirm.
trait_alias! {
pub trait ChangeHandler =
FnMut(QueryResult)
+ Send + 'static }
// `SignalNext`: bound for the one-shot `signal_next` closure that is boxed and handed to
// change handlers alongside each `QueryResult`: a `FnOnce()` that is `Send + 'static`.
// In this file, the closure passed under this bound asks the live query for its next
// result, or only logs when Ditto has already been closed.
trait_alias! {
pub trait SignalNext =
FnOnce()
+ Send + 'static }
// `ChangeHandlerWithSignalNext`: bound required of the callback accepted by
// `StoreObserver::with_signal_next`. Unlike `ChangeHandler`, the callback also receives
// the boxed `signal_next` closure and decides itself when to invoke it.
trait_alias! {
pub trait ChangeHandlerWithSignalNext =
FnMut(QueryResult, Box<dyn Send + SignalNext>)
+ Send + 'static }
/// Registers `observer` as a change observer for `query` through the FFI layer, then
/// starts the resulting live query and returns its [`StoreObserver`] handle.
///
/// The live query is registered with `LiveQueryAvailability::WhenSignalled`. On every
/// change the FFI callback invokes `observer` with the new [`QueryResult`] and a boxed
/// `signal_next` closure; calling that closure forwards to
/// `ditto_live_query_signal_available_next` for this live query, or only logs a debug
/// message if the Ditto instance has already been dropped.
///
/// # Errors
///
/// Returns the error produced by the FFI registration call if it fails; in that case no
/// live query is started.
///
/// # Panics
///
/// Panics if the context mutex is poisoned. The FFI callback additionally panics if it is
/// handed a null context or runs before the live query id has been stored.
fn dittoffi_try_experimental_register_change_observer_str_safe<F>(
    ditto: &'_ Ditto,
    query: Query,
    query_args: Option<QueryArguments>,
    observer: F,
) -> Result<StoreObserver>
where
    F: 'static + Send + Sync + Fn(QueryResult, Box<dyn Send + SignalNext>),
{
    /// State shared with the FFI callback through a type-erased `Arc` pointer.
    struct CbContext<F> {
        cb: F,
        ditto_weak: Weak<DittoFields>,
        // `None` until the registration call has returned the id.
        live_query_id: ::std::sync::Mutex<Option<i64>>,
    }

    let cb_context = CbContext {
        cb: observer,
        ditto_weak: Arc::downgrade(&ditto.fields),
        live_query_id: ::std::sync::Mutex::new(None),
    };
    let arc_cb_context = Arc::new(cb_context);

    // SAFETY (both closures): `ctx` is expected to be the pointer produced by
    // `Arc::as_ptr(&arc_cb_context)` below, so it designates a live `Arc<CbContext<F>>`
    // allocation — presumably the FFI layer only calls these with that pointer; confirm.
    let retain_ctx = ::extern_c::extern_c(|ctx: *mut c_void| unsafe {
        Arc::<CbContext<F>>::increment_strong_count(ctx.cast());
    });
    let release_ctx = ::extern_c::extern_c(|ctx: *mut c_void| unsafe {
        drop(Arc::<CbContext<F>>::from_raw(ctx.cast()))
    });

    let c_cb = ::extern_c::extern_c(
        |ctx: *mut c_void, c_params: ffi_sdk::ChangeHandlerWithQueryResult| unsafe {
            let ctx: *const CbContext<F> = ctx.cast();
            let callback_context: &CbContext<F> = ctx.as_ref().expect("Got NULL");
            let observer = &callback_context.cb;
            // Clone the `Weak` instead of borrowing it: the reference obtained from the raw
            // pointer has an unbounded lifetime, and the boxed `signal_next` closure is
            // `'static` and may be kept by the handler after this context has been
            // released. Owning the handle keeps a late `signal_next()` call from reading
            // freed memory.
            let ditto_weak = Weak::clone(&callback_context.ditto_weak);
            // The guard is a temporary, so the lock is released before `observer` runs.
            let live_query_id = callback_context
                .live_query_id
                .lock()
                .expect("mutex poisoned")
                .expect("live_query_id should be set");
            let signal_next = move || {
                let Some(ditto) = Weak::upgrade(&ditto_weak) else {
                    tracing::debug!("Live Query signal_next fired after Ditto has closed");
                    return;
                };
                ffi_sdk::ditto_live_query_signal_available_next(&ditto.ditto, live_query_id);
            };
            let signal_next = Box::new(signal_next);
            observer(QueryResult::from(c_params.query_result), signal_next);
        },
    );

    let c_query = query.prepare_ffi();
    let c_query_cbor = query_args.as_ref().map(|qa| qa.cbor());

    // The lock is taken before registering and released only once the id is stored, so a
    // callback running in between would wait for the id rather than find `None`.
    let mut live_query_id_lock = arc_cb_context.live_query_id.lock().expect("mutex poisoned");
    let live_query_id = unsafe {
        ffi_sdk::dittoffi_try_experimental_register_change_observer_str(
            &ditto.ditto,
            c_query,
            c_query_cbor.map(|qa| qa.into()),
            LiveQueryAvailability::WhenSignalled,
            // Borrowed pointer: our own `Arc` is dropped when this function returns, so
            // the context outlives it only through references taken via `retain_ctx`.
            Arc::as_ptr(&arc_cb_context) as *mut c_void,
            Some(retain_ctx),
            Some(release_ctx),
            c_cb,
        )
    }
    .into_rust_result()?;
    *live_query_id_lock = Some(live_query_id);
    drop(live_query_id_lock);

    let this = StoreObserver {
        _query: query,
        _query_args: query_args,
        ditto: Arc::downgrade(&ditto.fields),
        live_query_id,
    };
    ffi_sdk::ditto_live_query_start(&ditto.ditto, this.live_query_id);
    Ok(this)
}
impl StoreObserver {
    /// Registers an observer whose handler only receives query results.
    ///
    /// After each call to `on_change` the next result is requested automatically by
    /// invoking the `signal_next` closure on the handler's behalf.
    pub(crate) fn new<F>(
        ditto: &Ditto,
        query: Query,
        query_args: Option<QueryArguments>,
        mut on_change: F,
    ) -> Result<Self, DittoError>
    where
        F: ChangeHandler,
    {
        Self::with_signal_next(ditto, query, query_args, move |query_result, request_next| {
            on_change(query_result);
            request_next();
        })
    }

    /// Registers an observer whose handler controls when the next result is delivered.
    ///
    /// The `FnMut` handler is placed behind a mutex so it can be driven from the
    /// `Fn + Sync` callback required by the registration helper.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the registration helper.
    pub(crate) fn with_signal_next<F>(
        ditto: &Ditto,
        query: Query,
        query_args: Option<QueryArguments>,
        on_change: F,
    ) -> Result<Self, DittoError>
    where
        F: ChangeHandlerWithSignalNext,
    {
        let guarded_handler = ::std::sync::Mutex::new(on_change);
        let shared_handler = move |arg: QueryResult, signal_next: Box<dyn Send + SignalNext>| {
            let mut handler = guarded_handler
                .lock()
                .expect("`on_change` observer not to be poisoned");
            handler(arg, signal_next)
        };
        dittoffi_try_experimental_register_change_observer_str_safe(
            ditto,
            query,
            query_args,
            shared_handler,
        )
    }

    /// Unregisters this observer from the store; does nothing if Ditto is already gone.
    pub fn cancel(&self) {
        let Ok(ditto) = Ditto::upgrade(&self.ditto) else {
            return;
        };
        ditto.store.unregister_observer(self);
    }

    /// Returns `true` when Ditto can no longer be reached or when the store's observer
    /// collection does not contain this observer.
    pub fn is_cancelled(&self) -> bool {
        match Ditto::upgrade(&self.ditto) {
            Ok(ditto) => !ditto.store.observers().contains(self),
            Err(_) => true,
        }
    }
}
impl Drop for StoreObserver {
    /// Cancels the observer when its handle goes out of scope.
    fn drop(&mut self) {
        Self::cancel(self);
    }
}
impl StoreObserver {
    /// Borrows the state that defines this observer's identity for equality and hashing:
    /// the live query id, the query, and the optional query arguments.
    fn comparable_parts(&self) -> impl '_ + Eq + Hash {
        let Self {
            live_query_id,
            _query: query,
            _query_args: query_args,
            ..
        } = self;
        (*live_query_id, query, query_args)
    }
}
// Marker impl: equality itself comes from `PartialEq`, which compares the tuple returned
// by `comparable_parts` (live query id, query, query arguments).
impl Eq for StoreObserver {}
impl PartialEq for StoreObserver {
    /// Two observers are equal when their comparable parts are equal.
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.comparable_parts();
        let rhs = other.comparable_parts();
        lhs == rhs
    }
}
impl Hash for StoreObserver {
    /// Feeds the same parts used for equality into the hasher, keeping `Hash` and `Eq`
    /// consistent with each other.
    fn hash<H: hash::Hasher>(&self, h: &mut H) {
        let identity = self.comparable_parts();
        Hash::hash(&identity, h);
    }
}