
use_prelude!();
use ffi_sdk::{self, COrderByParam, ChangeHandlerWithDocsDiff, LiveQueryAvailability};
use tracing::{debug, error};
use crate::{
ditto::{TryUpgrade, WeakDittoHandleWrapper},
error::{DittoError, ErrorKind},
subscription::Subscription,
};
mod event;
mod single_document_event;
#[path = "move.rs"]
#[doc(hidden)]
mod move_module;
pub use event::LiveQueryEvent;
pub use move_module::LiveQueryMove;
pub use single_document_event::SingleDocumentLiveQueryEvent;
trait_alias! {
/// A closure which is called on each event for a single document live query
/// (`find_by_id(...).observe_local(...)`).
///
/// The closure receives an optional [`BoxedDocument`] together with the
/// [`SingleDocumentLiveQueryEvent`] describing the event.
// NOTE(review): presumably the document is `None` when no document matches the id — confirm
// against the code that invokes this handler.
pub
trait SingleDocumentEventHandler =
FnMut(Option<BoxedDocument>, SingleDocumentLiveQueryEvent)
+ Send // can be dropped in another thread
+ 'static // cannot dangle
}
trait_alias! {
/// A closure which is called on each event of a live query.
///
/// The closure receives the documents delivered with the event together with the
/// [`LiveQueryEvent`] describing it (either the initial event or an update).
pub
trait EventHandler =
FnMut(Vec<BoxedDocument>, LiveQueryEvent)
+ Send // can be dropped in another thread
+ 'static // cannot dangle
}
/// The type that is returned when calling [`observe_local`] on a [`PendingCursorOperation`]
/// object. It handles the logic for calling the [`EventHandler`] that is provided to
/// [`observe_local`] calls. [`LiveQuery`] objects must be kept in scope for as long as you wish to
/// have your event handler be called when there is an update to a document matching the query you
/// provide.
///
/// Dropping a [`LiveQuery`] asks the FFI to stop the underlying live query (as long as the Ditto
/// instance it belongs to is still alive).
///
/// [`observe_local`]:
/// crate::store::collection::pending_cursor_operation::PendingCursorOperation::observe_local
pub struct LiveQuery {
/// Weak handle to the Ditto instance; it is upgraded on drop in order to stop the live query.
ditto: WeakDittoHandleWrapper,
/// The query string that was handed to the FFI when the live query was registered.
pub query: char_p::Box,
/// The name of the collection that was handed to the FFI when the live query was registered.
pub collection_name: char_p::Box,
/// Always `None` when the value is created through `with_handler`.
// TODO: remove, as this is now always `None`. But removing it is technically a breaking change
pub subscription: Option<Box<Subscription>>,
/// Identifier of the live query as produced by the FFI registration call; it is passed back to
/// the FFI to start the query and, on drop, to stop it.
pub id: i64,
}
/// Id value that `LiveQuery::with_handler` treats as "no valid id was assigned": an id equal to
/// this value is reported through an `error!` log line.
const UNASSIGNED_ID_SENTINEL: i64 = -1;
impl LiveQuery {
    /// Registers `event_handler` as a live query over `collection_name` / `query` with the FFI
    /// and starts it.
    ///
    /// The handler is placed in an `Arc`; the raw pointer to it is handed to the FFI as the
    /// callback context, together with `retain` / `release` callbacks which respectively add and
    /// give up one strong reference. The local `Arc` is dropped when this function returns, so
    /// from then on the handler is only kept alive by the references the FFI retained.
    ///
    /// # Parameters
    ///
    /// - `ditto`: weak handle to the Ditto instance; stored in the returned value.
    /// - `query`, `collection_name`: forwarded to the FFI and stored in the returned value.
    /// - `query_args`: optional bytes, forwarded to the FFI as a byte slice.
    /// - `order_by`, `limit`, `offset`: forwarded to the FFI unchanged.
    /// - `event_handler`: called with the documents and a [`LiveQueryEvent`] for each FFI callback.
    ///
    /// # Errors
    ///
    /// - Propagates the error of `ditto.try_upgrade()`.
    /// - Fails with [`ErrorKind::InvalidInput`] when the FFI registration call yields no id. No
    ///   `LiveQuery` value exists yet at that point, so the `Drop` impl never asks the FFI to
    ///   stop a query that was never registered.
    ///
    /// # Panics
    ///
    /// The callback installed here panics if it is invoked with a NULL context, or if a
    /// non-initial event lacks its `moves`, `old_documents`, `insertions`, `deletions` or
    /// `updates` payload.
    #[rustfmt::skip]
    // The arguments mirror the FFI registration call one-to-one.
    #[allow(clippy::too_many_arguments)]
    // TODO(pub_check)
    pub fn with_handler<F>(
        ditto: WeakDittoHandleWrapper,
        query: char_p::Box,
        query_args: Option<Vec<u8>>,
        collection_name: char_p::Box,
        order_by: &'_ [COrderByParam<'_>], // Note COrderByParam is not opaque
        limit: i32,
        offset: u32,
        event_handler: F,
    ) -> Result<Self, DittoError>
    where
        F: EventHandler,
    {
        let strong_ditto = ditto.try_upgrade()?;
        let event_handler: Arc<F> = Arc::new(event_handler);
        // Borrowed (non-owning) pointer: ownership is only shared with the FFI through the
        // `retain` / `release` callbacks below.
        let ctx = Arc::as_ptr(&event_handler) as *mut c_void;

        let retain_ctx = {
            // SAFETY (caller contract): `ctx` must be the pointer obtained from
            // `Arc::<F>::as_ptr` above, with at least one strong reference still alive.
            unsafe extern "C" fn retain<F>(ctx: *mut c_void) {
                Arc::<F>::increment_strong_count(ctx.cast());
            }
            retain::<F>
        };
        let release_ctx = {
            // SAFETY (caller contract): same pointer as for `retain`; each call gives up exactly
            // one strong reference.
            unsafe extern "C" fn release<F>(ctx: *mut c_void) {
                drop(Arc::<F>::from_raw(ctx.cast()));
            }
            release::<F>
        };

        #[allow(improper_ctypes_definitions)] // false positive
        unsafe extern "C" fn c_cb<F: EventHandler>(ctx: *mut c_void, p: ChangeHandlerWithDocsDiff) {
            let ctx: *mut F = ctx.cast();
            // Note: this assumes non-concurrent calls from the FFI, which is currently true.
            // Should it not be the case, we'd have to `Mutex`-wrap the `F` so as to `.lock()`,
            // here (or require `EventHandler : Sync + Fn`, which would make the caller be the one
            // having to do that, but it would be a breaking change).
            let event_handler: &mut F = ctx.as_mut().expect("Got NULL");
            let event = if p.is_initial {
                // more explicit flag for "initial" status
                LiveQueryEvent::Initial
            } else {
                // `moves` is a flat list of indices, consumed two at a time as (`from`, `to`).
                let moves: Vec<LiveQueryMove> = p
                    .moves
                    .as_ref()
                    .unwrap()
                    .as_slice()
                    .chunks_exact(2)
                    .map(|pair| match *pair {
                        [from, to] => LiveQueryMove { from, to },
                        // `chunks_exact(2)` only ever yields slices of length 2.
                        _ => unreachable!(),
                    })
                    .collect();
                LiveQueryEvent::Update {
                    old_documents: p.old_documents.unwrap().into(),
                    insertions: p.insertions.unwrap().into(),
                    deletions: p.deletions.unwrap().into(),
                    updates: p.updates.unwrap().into(),
                    moves,
                }
            };
            event_handler(p.documents.into(), event);
        }

        let availability = LiveQueryAvailability::Always;
        // Register *before* building the `LiveQuery`: should registration fail, the early return
        // must not drop a half-initialised `LiveQuery`, whose `Drop` impl would otherwise ask the
        // FFI to stop a query using an id that was never assigned.
        //
        // SAFETY: `ctx` points to the `F` owned by `event_handler`, which is alive for the whole
        // call, and `retain_ctx` / `release_ctx` / `c_cb` are all instantiated for that same `F`.
        let id: i64 = unsafe {
            ffi_sdk::ditto_live_query_register_str(
                &*strong_ditto,
                collection_name.as_ref(),
                query.as_ref(),
                query_args.as_ref().map(|qa| (&qa[..]).into()),
                order_by.into(),
                limit,
                offset,
                availability,
                ctx,
                Some(retain_ctx),
                Some(release_ctx),
                c_cb::<F>,
            ).ok_or(ErrorKind::InvalidInput)?
        };

        // From here on the query is registered, so the value (and its `Drop` impl) owns it.
        let this = LiveQuery {
            ditto,
            query,
            collection_name,
            subscription: None,
            id,
        };

        if id == UNASSIGNED_ID_SENTINEL {
            error!("live query was not given a valid id");
        } else {
            debug!(%id, "live query id");
        }

        // start the query
        {
            ffi_sdk::ditto_live_query_start(&*strong_ditto, id);
        }
        Ok(this)
    }
}
impl Drop for LiveQuery {
    /// Logs the drop and, if the Ditto instance can still be reached through the weak handle,
    /// asks the FFI to stop the live query identified by `self.id`.
    fn drop(&mut self) {
        debug!(id = %self.id, "dropping LiveQuery");
        match self.ditto.upgrade() {
            Some(strong_ditto) => {
                // Signals to the FFI that it is now free to *eventually* release the
                // live query's event handler.
                ffi_sdk::ditto_live_query_stop(&*strong_ditto, self.id);
            }
            // The weak handle could not be upgraded: there is nothing to call stop on.
            None => {}
        }
    }
}
// `LiveQuery` opts into the crate's `Observer` trait; the impl body is empty, so no trait items
// are defined or overridden here.
impl crate::observer::Observer for LiveQuery {}