1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
use_prelude!();
use ffi_sdk::{self, COrderByParam, ChangeHandlerWithDocsDiff, LiveQueryAvailability};
use tracing::{debug, error};
use crate::{
ditto::{TryUpgrade, WeakDittoHandleWrapper},
error::{DittoError, ErrorKind},
subscription::Subscription,
};
mod event;
mod single_document_event;
#[path = "move.rs"]
#[doc(hidden)]
mod move_module;
pub use event::LiveQueryEvent;
pub use move_module::LiveQueryMove;
pub use single_document_event::SingleDocumentLiveQueryEvent;
trait_alias! {
/// A closure which is called on each event for a single document live query
/// (`find_by_id(...).observe_local(...)`)
pub
trait SingleDocumentEventHandler =
FnMut(Option<BoxedDocument>, SingleDocumentLiveQueryEvent)
+ Send // can be dropped in another thread
+ 'static // cannot dangle
}
trait_alias! {
/// A closure which is called on each event
pub
trait EventHandler =
FnMut(Vec<BoxedDocument>, LiveQueryEvent)
+ Send // can be dropped in another thread
+ 'static // cannot dangle
}
/// The type that is returned when calling [`observe_local`] on a [`PendingCursorOperation`]
/// object. It handles the logic for calling the [`EventHandler`] that is provided to
/// [`observe_local`] calls. [`LiveQuery`] objects must be kept in scope for as long as you wish to
/// have your event handler be called when there is an update to a document matching the query you
/// provide.
///
/// [`observe_local`]:
/// crate::store::collection::pending_cursor_operation::PendingCursorOperation::observe_local
pub struct LiveQuery {
ditto: WeakDittoHandleWrapper,
pub query: char_p::Box,
pub collection_name: char_p::Box,
// TODO: remove, as this is now always `None`. But removing it is technically a breaking change
pub subscription: Option<Box<Subscription>>,
pub id: i64,
}
const UNASSIGNED_ID_SENTINEL: i64 = -1;
impl LiveQuery {
#[rustfmt::skip]
#[allow(clippy::too_many_arguments)]
// TODO(pub_check)
pub
fn with_handler<F> (
ditto: WeakDittoHandleWrapper,
query: char_p::Box,
query_args: Option<Vec<u8>>,
collection_name: char_p::Box,
order_by: &'_ [COrderByParam<'_>], // Note COrderByParam is not opaque
limit: i32,
offset: u32,
event_handler: F,
) -> Result<Self, DittoError>
where
F : EventHandler,
{
let strong_ditto = ditto.try_upgrade()?;
let event_handler: Arc<F> = Arc::new(event_handler);
let ctx = Arc::as_ptr(&event_handler) as *mut c_void;
let retain_ctx = {
unsafe extern "C"
fn retain<F>(ctx: *mut c_void) {
Arc::<F>::increment_strong_count(ctx.cast());
}
retain::<F>
};
let release_ctx = {
unsafe extern "C"
fn release<F>(ctx: *mut c_void) {
drop(Arc::<F>::from_raw(ctx.cast()));
}
release::<F>
};
#[allow(improper_ctypes_definitions)] // false positive
unsafe extern "C"
fn c_cb<F : EventHandler> (ctx: *mut c_void, p: ChangeHandlerWithDocsDiff)
{
let ctx: *mut F = ctx.cast();
// Note: this assumes non-concurrent calls from the FFI, which is currently true.
// Should it not be the case, we'd have to `Mutex`-wrap the `F` so as to `.lock()`,
// here (or require `EventHandler : Sync + Fn`, which would make the caller be the one
// having to do that, but it would be a breaking change).
let event_handler: &mut F = ctx.as_mut().expect("Got NULL");
let event = if p.is_initial { // more explicit flag for "initial" status
LiveQueryEvent::Initial
} else {
let moves: Vec<LiveQueryMove> =
p .moves
.as_ref()
.unwrap()
.as_slice()
.chunks_exact(2)
.map(|it| if let [from, to] = *it {
LiveQueryMove { from, to }
} else { unreachable!() })
.collect()
;
LiveQueryEvent::Update {
old_documents: p.old_documents.unwrap().into(),
insertions: p.insertions.unwrap().into(),
deletions: p.deletions.unwrap().into(),
updates: p.updates.unwrap().into(),
moves,
}
};
event_handler(p.documents.into(), event);
}
let mut this = LiveQuery {
ditto,
query,
collection_name,
subscription: None,
id: UNASSIGNED_ID_SENTINEL,
};
let id = &mut this.id;
let availability = LiveQueryAvailability::Always;
*id = unsafe {
ffi_sdk::ditto_live_query_register_str(
&*strong_ditto,
this.collection_name.as_ref(),
this.query.as_ref(),
query_args.as_ref().map(|qa| (&qa[..]).into()),
order_by.into(),
limit,
offset,
availability,
ctx,
Some(retain_ctx),
Some(release_ctx),
c_cb::<F>,
).ok_or(ErrorKind::InvalidInput)?
};
if *id == UNASSIGNED_ID_SENTINEL {
error!("live query was not given a valid id");
} else {
debug!(%id, "live query id");
}
// start the query
{
ffi_sdk::ditto_live_query_start(&*strong_ditto, *id);
}
Ok(this)
}
}
impl Drop for LiveQuery {
fn drop(self: &'_ mut LiveQuery) {
// Signals to the FFI that it is now free to *eventually* release the
// live query's event handler.
debug!(id = %self.id, "dropping LiveQuery");
if let Some(ditto) = self.ditto.upgrade() {
ffi_sdk::ditto_live_query_stop(&*ditto, self.id);
}
}
}
impl crate::observer::Observer for LiveQuery {}