dittolive_ditto/store/mod.rs
1//! Use [`ditto.store()`] to access the [`Store`] API to read, write, and remove documents.
2//!
3//! The `Store` provides two interfaces for interacting with data: Ditto Query Language
4//! (DQL), and the legacy "Query Builder" API. Where possible, we recommend developing new
5//! functionality using DQL, as we will eventually deprecate Query Builder.
6//!
7//! - [See the `dql` module docs for examples of DQL queries in action][0]
8//!
9//! [`ditto.store()`]: crate::prelude::Ditto::store
10//! [0]: crate::dql
11
12use_prelude!();
13
14use std::{
15 ops::Deref,
16 sync::{
17 atomic::{self, AtomicU64},
18 RwLock, Weak,
19 },
20};
21
22use ffi_sdk::FfiStoreObserver;
23
24use crate::{debug, error};
25
26pub mod attachment;
27mod document_id;
28mod observer;
29pub mod transactions;
30
31use self::attachment::{DittoAttachmentFetcher, FetcherVersion};
32pub use self::{
33 document_id::DocumentId,
34 observer::{ChangeHandler, ChangeHandlerWithSignalNext, SignalNext, StoreObserver},
35};
36use crate::{
37 ditto::DittoFields,
38 dql::{query::IntoQuery, *},
39 error::{DittoError, ErrorKind},
40 utils::{extension_traits::FfiResultIntoRustResult, SetArc},
41};
42
/// Numeric token under which an attachment fetcher is tracked.
///
/// It is the key type of the `Store::attachment_fetchers` map and the type of
/// the cancel token passed to `ffi_sdk::ditto_cancel_resolve_attachment` when
/// a fetcher is unregistered.
type CancelToken = u64;
44
45/// Use [`ditto.store()`] to access the [`Store`] API to read, write, and remove documents.
46///
47/// [See the `store` module for guide-level docs and examples][0].
48///
49/// [`ditto.store()`]: crate::prelude::Ditto::store
50/// [0]: crate::store
51#[derive(Clone)]
52pub struct Store {
53 ditto: Arc<ffi_sdk::BoxedDitto>,
54 // FIXME(Daniel): unify this field with `.ditto`
55 weak_ditto_fields: Weak<DittoFields>,
56 #[allow(clippy::type_complexity)]
57 attachment_fetchers: Arc<
58 RwLock<HashMap<CancelToken, (bool, DittoAttachmentFetcher<'static, FetcherVersion::V2>)>>,
59 >,
60}
61
62impl Store {
63 pub(crate) fn new(
64 ditto: Arc<ffi_sdk::BoxedDitto>,
65 weak_ditto_fields: Weak<DittoFields>,
66 ) -> Self {
67 Self {
68 ditto,
69 weak_ditto_fields,
70 attachment_fetchers: <_>::default(),
71 }
72 }
73
74 /// Installs and returns a store observer for a query, configuring Ditto to
75 /// trigger the passed in change handler whenever documents in the local
76 /// store change such that the result of the matching query changes. The
77 /// passed in query must be a `SELECT` query, otherwise an error will be
78 /// returned.
79 ///
80 /// # Example
81 ///
82 /// ```
83 /// use dittolive_ditto::prelude::*;
84 /// # fn main() -> anyhow::Result<()> {
85 /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
86 ///
87 /// let store = ditto.store();
88 /// let _observer = store.register_observer(
89 /// "SELECT * FROM cars WHERE color = 'blue'",
90 /// move |query_result| {
91 /// for item in query_result.iter() {
92 /// // ... handle each item
93 /// }
94 /// },
95 /// )?;
96 /// # Ok(())
97 /// # }
98 /// ```
99 ///
100 /// The first invocation of the change handler will always happen after
101 /// this method has returned.
102 ///
103 /// The observer will remain active until:
104 ///
105 /// - the [`StoreObserver`] handle gets dropped,
106 /// - the [`observer.cancel()`] method is called, or
107 /// - the owning [`Ditto`] instance has shut down
108 ///
109 /// Observer callbacks will never be called concurrently. That is, one callback
110 /// must return before the observer will call the handler again. See
111 /// [`ditto.store().register_observer_with_signal_next(...)`] if you want
112 /// to manually signal readiness for the next callback.
113 ///
114 /// [`ditto.store().register_observer_with_signal_next(...)`]: crate::store::Store::register_observer_with_signal_next
115 /// [`observer.cancel()`]: crate::store::StoreObserver::cancel
116 /// [`Ditto`]: crate::prelude::Ditto
117 pub fn register_observer<Q, F>(
118 &self,
119 query: Q,
120 on_change: F,
121 ) -> Result<Arc<StoreObserver>, DittoError>
122 where
123 Q: IntoQuery,
124 Q::Args: Serialize,
125 F: ChangeHandler,
126 {
127 let ditto = Ditto::upgrade(&self.weak_ditto_fields)?;
128 let query = query.into_query()?;
129
130 let observer = Arc::new(StoreObserver::new(
131 &ditto,
132 &query.string,
133 query.args_cbor.as_deref(),
134 on_change,
135 )?);
136 Ok(observer)
137 }
138
139 /// Installs and returns a store observer for a query, configuring Ditto to
140 /// trigger the passed in change handler whenever documents in the local
141 /// store change such that the result of the matching query changes. The
142 /// passed in query must be a `SELECT` query, otherwise an error will be
143 /// returned.
144 ///
145 /// Here, a function is passed as an additional argument to the change
146 /// handler. This allows the change handler to control how frequently
147 /// it is called. See [`register_observer`] for a convenience method that
148 /// automatically signals the next invocation.
149 ///
150 /// # Example
151 ///
152 /// ```
153 /// use dittolive_ditto::prelude::*;
154 /// # fn main() -> anyhow::Result<()> {
155 /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
156 ///
157 /// let store = ditto.store();
158 /// let _observer = store.register_observer_with_signal_next(
159 /// "SELECT * FROM cars WHERE color = 'blue'",
160 /// move |query_result, signal_next| {
161 /// for item in query_result.iter() {
162 /// // ... handle each item
163 /// }
164 ///
165 /// // Call `signal_next` when you're ready for the next callback
166 /// signal_next();
167 /// },
168 /// )?;
169 /// # Ok(())
170 /// # }
171 /// ```
172 ///
173 /// The first invocation of the change handler will always happen after
174 /// this method has returned.
175 ///
176 /// The observer will remain active until:
177 ///
178 /// - the [`StoreObserver`] handle gets dropped,
179 /// - the [`StoreObserver::cancel`] method is called, or
180 /// - the owning [`Ditto`] instance has shut down
181 ///
182 /// After invoking the callback once, the observer will wait to deliver
183 /// another callback until after you've called [`signal_next`].
184 ///
185 /// [`register_observer`]: Store::register_observer
186 /// [`signal_next`]: crate::store::SignalNext
187 pub fn register_observer_with_signal_next<Q, F>(
188 &self,
189 query: Q,
190 on_change: F,
191 ) -> Result<Arc<StoreObserver>, DittoError>
192 where
193 Q: IntoQuery,
194 Q::Args: Serialize,
195 F: ChangeHandlerWithSignalNext,
196 {
197 let ditto = Ditto::upgrade(&self.weak_ditto_fields)?;
198 let query = query.into_query()?;
199
200 let new_obs = Arc::new(StoreObserver::with_signal_next(
201 &ditto,
202 &query.string,
203 query.args_cbor.as_deref(),
204 on_change,
205 )?);
206 Ok(new_obs)
207 }
208
209 /// Gets temporary access to the set of currently registered observers.
210 ///
211 /// A (read) lock is held until the return value is dropped: this means
212 /// that neither [`Self::register_observer()`] nor
213 /// [`StoreObserver::cancel()`] can make progress until this read
214 /// lock is released.
215 pub fn observers(&self) -> impl '_ + Deref<Target = SetArc<StoreObserver>> {
216 let observers: repr_c::Vec<repr_c::Box<FfiStoreObserver>> =
217 ffi_sdk::dittoffi_store_observers(&self.ditto);
218 let observers: Vec<_> = observers.into();
219 let observers = observers
220 .into_iter()
221 .map(|handle: repr_c::Box<FfiStoreObserver>| Arc::new(StoreObserver { handle }))
222 .collect::<SetArc<_>>();
223
224 Box::new(observers)
225 }
226
227 /// Executes the given query in the local store and returns the result.
228 ///
229 /// # Example
230 ///
231 /// ```
232 /// use dittolive_ditto::prelude::*;
233 /// use dittolive_ditto::dql::QueryResult;
234 /// # #[tokio::main]
235 /// # async fn main() -> anyhow::Result<()> {
236 /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
237 ///
238 /// // Query a collection
239 /// let result = ditto.store().execute("SELECT * FROM cars").await?;
240 ///
241 /// // Insert a document into a collection
242 /// let insert_result: QueryResult = ditto
243 /// .store()
244 /// .execute((
245 /// "INSERT INTO cars DOCUMENTS (:newCar)",
246 /// serde_json::json!({
247 /// "newCar": {
248 /// "make": "ford",
249 /// "color": "blue"
250 /// }
251 /// })
252 /// ))
253 /// .await?;
254 /// # Ok(())
255 /// # }
256 /// ```
257 ///
258 /// Use placeholders to incorporate values from the optional `query_args`
259 /// parameter into the query. The keys of the query arguments object must
260 /// match the placeholders used within the query. You can not use placeholders
261 /// in the `FROM` clause.
262 ///
263 /// This method only returns results from the local store without waiting for any
264 /// [`SyncSubscription`]s to have caught up with the
265 /// latest changes. Only use this method if your program must proceed with immediate results.
266 ///
267 /// Use [`ditto.store().register_observer(...)`] to receive updates to query results
268 /// as soon as they have been synced to this peer.
269 ///
270 /// ## Query parameter
271 ///
272 /// The `query` parameter must implement [`IntoQuery`], which is a trait that is implemented by
273 /// objects that can be turned into a query string along with the relevant query arguments.
274 ///
275 /// For queries with no arguments, a [`String`] is sufficient.
276 ///
277 /// [`SyncSubscription`]: crate::sync::SyncSubscription
278 /// [`ditto.store().register_observer(...)`]: crate::store::Store::register_observer
279 pub async fn execute<Q>(&self, query: Q) -> Result<QueryResult, DittoError>
280 where
281 Q: IntoQuery,
282 Q::Args: serde::Serialize,
283 {
284 let query = query.into_query()?;
285 let query_string = (&*query.string).into();
286 let query_args = query.args_cbor.as_deref().map(Into::into);
287
288 let ffi_query_result =
289 ffi_sdk::dittoffi_try_exec_statement(&self.ditto, query_string, query_args)
290 .into_rust_result()?;
291
292 Ok(QueryResult::from(ffi_query_result))
293 }
294
295 /// Creates a new attachment, which can then be inserted into a document.
296 ///
297 /// The file residing at the provided path will be copied into Ditto’s store. The
298 /// [`DittoAttachment`] object that is returned is what you can
299 /// then use to insert an attachment into a document.
300 ///
301 /// You can provide custom user data about the attachment, which will be replicated to other
302 /// peers alongside the file attachment.
303 pub async fn new_attachment(
304 &self,
305 filepath: &(impl ?Sized + AsRef<Path>),
306 user_data: HashMap<String, String>,
307 ) -> Result<DittoAttachment, DittoError> {
308 DittoAttachment::from_file_and_metadata(filepath, user_data, &self.ditto)
309 }
310
311 /// Creates a new attachment from in-memory data
312 ///
313 /// Refer to [`new_attachment`](Self::new_attachment) for additional information.
314 pub async fn new_attachment_from_bytes(
315 &self,
316 bytes: &(impl ?Sized + AsRef<[u8]>),
317 user_data: HashMap<String, String>,
318 ) -> Result<DittoAttachment, DittoError> {
319 DittoAttachment::from_bytes_and_metadata(bytes, user_data, &self.ditto)
320 }
321
322 /// Fetches the attachment corresponding to the provided attachment token.
323 /// - `attachment_token`: can be either a [`DittoAttachmentToken`], or a `&BTreeMap<CborValue,
324 /// CborValue>`, that is, the output of a [`QueryResultItem::value()`] once casted
325 /// [`.as_object()`][crate::prelude::CborValueGetters::as_object()].
326 ///
327 /// - `on_fetch_event`: A closure that will be called when the status of the request to fetch
328 /// the attachment has changed. If the attachment is already available then this will be
329 /// called almost immediately with a completed status value.
330 ///
331 /// The returned [`DittoAttachmentFetcher`] is a handle which is safe to discard, unless you
332 /// wish to be able to [`.cancel()`][DittoAttachmentFetcher::cancel] the fetching operation.
333 /// When not explicitly cancelled, the fetching operation will remain active until it either
334 /// completes, the attachment is deleted, or the owning [`Ditto`] object is dropped.
335 pub fn fetch_attachment(
336 &self,
337 attachment_token: impl attachment::DittoAttachmentTokenLike,
338 on_fetch_event: impl 'static + Send + Sync + Fn(DittoAttachmentFetchEvent),
339 ) -> Result<DittoAttachmentFetcher<'static, FetcherVersion::V2>, DittoError> {
340 let attachment_token = attachment_token.parse_attachment_token()?;
341
342 let weak_ditto = self.weak_ditto_fields.clone();
343 let ditto = weak_ditto
344 .upgrade()
345 .ok_or(ErrorKind::ReleasedDittoInstance)?;
346
347 let mut attachment_fetchers_lockguard = self.attachment_fetchers.write().unwrap();
348 let fetcher = DittoAttachmentFetcher::new(
349 attachment_token,
350 Some(&ditto),
351 &self.ditto,
352 // Shim around `on_fetch_event` to `cancel` on completion.
353 move |event, cancel_token: &AtomicU64| {
354 let has_finished = matches! {
355 event,
356 | DittoAttachmentFetchEvent::Completed { .. }
357 | DittoAttachmentFetchEvent::Deleted
358 };
359 on_fetch_event(event);
360 if has_finished {
361 if let Some(ditto) = weak_ditto.upgrade() {
362 let mut attachment_fetchers_inner_lockguard =
363 ditto.store.attachment_fetchers.write().unwrap();
364 // Relaxed is fine thanks to the lock.
365 let cancel_token = cancel_token.load(atomic::Ordering::Relaxed);
366 ditto.store.unregister_fetcher(
367 cancel_token,
368 Some(&mut *attachment_fetchers_inner_lockguard),
369 );
370 }
371 }
372 },
373 )?;
374 let (cancel_token, was_zero) = fetcher.cancel_token_ensure_unique();
375 attachment_fetchers_lockguard.insert(cancel_token, (was_zero, fetcher.clone()));
376 Ok(fetcher)
377 }
378
379 fn unregister_fetcher(
380 &self,
381 mut fetcher_cancel_token: CancelToken,
382 fetchers: Option<
383 &mut HashMap<CancelToken, (bool, DittoAttachmentFetcher<'static, FetcherVersion::V2>)>,
384 >,
385 ) -> bool {
386 let mut lock_guard = None;
387 let fetchers = fetchers.unwrap_or_else(|| {
388 &mut **lock_guard.get_or_insert(self.attachment_fetchers.write().unwrap())
389 });
390
391 let Some((was_zero, removed_fetcher)) = fetchers.remove(&fetcher_cancel_token) else {
392 return false;
393 };
394 drop(lock_guard);
395
396 if was_zero {
397 fetcher_cancel_token = 0;
398 }
399
400 let att_token = &removed_fetcher.context.token;
401
402 #[allow(deprecated)] // Workaround for patched tracing
403 {
404 debug!(
405 token_id = %att_token.id(),
406 %fetcher_cancel_token,
407 "unregistering ditto attachment fetcher"
408 );
409 }
410
411 let status = ffi_sdk::ditto_cancel_resolve_attachment(
412 &self.ditto,
413 att_token.id.as_ref().into(),
414 fetcher_cancel_token,
415 );
416
417 if status != 0 {
418 #[allow(deprecated)] // Workaround for patched tracing
419 {
420 error!(
421 token_id = %att_token.id(),
422 %fetcher_cancel_token,
423 "failed to clean up attachment fetcher"
424 );
425 }
426 }
427 status == 0
428 }
429
430 /// Gets a copy of the set of currently registered attachment fetchers.
431 ///
432 /// A (read) lock is held during the copy: this contends with [`Self::fetch_attachment()`] and
433 /// with [`DittoAttachmentFetcher::cancel()`].
434 pub fn attachment_fetchers(&self) -> Vec<DittoAttachmentFetcher<'static, FetcherVersion::V2>> {
435 self.attachment_fetchers
436 .read()
437 .unwrap()
438 .iter()
439 .map(|(_, (_, fetcher))| fetcher.clone())
440 .collect()
441 }
442}
443
/// Direction in which the documents returned by a query are ordered.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortDirection {
    /// Smallest value first, largest value last.
    Ascending,

    /// Largest value first, smallest value last.
    Descending,
}