dittolive_ditto/store/
mod.rs

1//! Use [`ditto.store()`] to access the [`Store`] API to read, write, and remove documents.
2//!
3//! The `Store` provides two interfaces for interacting with data: Ditto Query Language
4//! (DQL), and the legacy "Query Builder" API. Where possible, we recommend developing new
5//! functionality using DQL, as we will eventually deprecate Query Builder.
6//!
7//! - [See the `dql` module docs for examples of DQL queries in action][0]
8//!
9//! [`ditto.store()`]: crate::prelude::Ditto::store
10//! [0]: crate::dql
11
12use_prelude!();
13
14use std::{
15    ops::Deref,
16    sync::{
17        atomic::{self, AtomicU64},
18        RwLock, Weak,
19    },
20};
21
22use ffi_sdk::FfiStoreObserver;
23
24use crate::{debug, error};
25
26pub mod attachment;
27mod document_id;
28mod observer;
29pub mod transactions;
30
31use self::attachment::{DittoAttachmentFetcher, FetcherVersion};
32pub use self::{
33    document_id::DocumentId,
34    observer::{ChangeHandler, ChangeHandlerWithSignalNext, SignalNext, StoreObserver},
35};
36use crate::{
37    ditto::DittoFields,
38    dql::{query::IntoQuery, *},
39    error::{DittoError, ErrorKind},
40    utils::{extension_traits::FfiResultIntoRustResult, SetArc},
41};
42
43type CancelToken = u64;
44
/// Use [`ditto.store()`] to access the [`Store`] API to read, write, and remove documents.
///
/// [See the `store` module for guide-level docs and examples][0].
///
/// Every field is a reference-counted handle (`Arc`/`Weak`), so clones of a `Store`
/// share the same attachment fetcher registry.
///
/// [`ditto.store()`]: crate::prelude::Ditto::store
/// [0]: crate::store
#[derive(Clone)]
pub struct Store {
    /// Shared handle to the underlying FFI Ditto instance; passed to the `ffi_sdk`
    /// calls made by this type.
    ditto: Arc<ffi_sdk::BoxedDitto>,
    /// Weak reference to the owning Ditto's fields. It is upgraded on demand (when
    /// registering observers and fetching attachments) and yields an error once the
    /// instance has been released.
    // FIXME(Daniel): unify this field with `.ditto`
    weak_ditto_fields: Weak<DittoFields>,
    /// Registry of attachment fetchers, keyed by cancel token.
    ///
    /// The `bool` stored next to each fetcher is the `was_zero` flag returned by
    /// `cancel_token_ensure_unique()`; when it is set, token `0` (rather than the map
    /// key) is handed to the FFI layer on cancellation.
    #[allow(clippy::type_complexity)]
    attachment_fetchers: Arc<
        RwLock<HashMap<CancelToken, (bool, DittoAttachmentFetcher<'static, FetcherVersion::V2>)>>,
    >,
}
61
62impl Store {
63    pub(crate) fn new(
64        ditto: Arc<ffi_sdk::BoxedDitto>,
65        weak_ditto_fields: Weak<DittoFields>,
66    ) -> Self {
67        Self {
68            ditto,
69            weak_ditto_fields,
70            attachment_fetchers: <_>::default(),
71        }
72    }
73
74    /// Installs and returns a store observer for a query, configuring Ditto to
75    /// trigger the passed in change handler whenever documents in the local
76    /// store change such that the result of the matching query changes. The
77    /// passed in query must be a `SELECT` query, otherwise an error will be
78    /// returned.
79    ///
80    /// # Example
81    ///
82    /// ```
83    /// use dittolive_ditto::prelude::*;
84    /// # fn main() -> anyhow::Result<()> {
85    /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
86    ///
87    /// let store = ditto.store();
88    /// let _observer = store.register_observer(
89    ///     "SELECT * FROM cars WHERE color = 'blue'",
90    ///     move |query_result| {
91    ///         for item in query_result.iter() {
92    ///             // ... handle each item
93    ///         }
94    ///     },
95    /// )?;
96    /// # Ok(())
97    /// # }
98    /// ```
99    ///
100    /// The first invocation of the change handler will always happen after
101    /// this method has returned.
102    ///
103    /// The observer will remain active until:
104    ///
105    /// - the [`StoreObserver`] handle gets dropped,
106    /// - the [`observer.cancel()`] method is called, or
107    /// - the owning [`Ditto`] instance has shut down
108    ///
109    /// Observer callbacks will never be called concurrently. That is, one callback
110    /// must return before the observer will call the handler again. See
111    /// [`ditto.store().register_observer_with_signal_next(...)`] if you want
112    /// to manually signal readiness for the next callback.
113    ///
114    /// [`ditto.store().register_observer_with_signal_next(...)`]: crate::store::Store::register_observer_with_signal_next
115    /// [`observer.cancel()`]: crate::store::StoreObserver::cancel
116    /// [`Ditto`]: crate::prelude::Ditto
117    pub fn register_observer<Q, F>(
118        &self,
119        query: Q,
120        on_change: F,
121    ) -> Result<Arc<StoreObserver>, DittoError>
122    where
123        Q: IntoQuery,
124        Q::Args: Serialize,
125        F: ChangeHandler,
126    {
127        let ditto = Ditto::upgrade(&self.weak_ditto_fields)?;
128        let query = query.into_query()?;
129
130        let observer = Arc::new(StoreObserver::new(
131            &ditto,
132            &query.string,
133            query.args_cbor.as_deref(),
134            on_change,
135        )?);
136        Ok(observer)
137    }
138
139    /// Installs and returns a store observer for a query, configuring Ditto to
140    /// trigger the passed in change handler whenever documents in the local
141    /// store change such that the result of the matching query changes. The
142    /// passed in query must be a `SELECT` query, otherwise an error will be
143    /// returned.
144    ///
145    /// Here, a function is passed as an additional argument to the change
146    /// handler. This allows the change handler to control how frequently
147    /// it is called. See [`register_observer`] for a convenience method that
148    /// automatically signals the next invocation.
149    ///
150    /// # Example
151    ///
152    /// ```
153    /// use dittolive_ditto::prelude::*;
154    /// # fn main() -> anyhow::Result<()> {
155    /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
156    ///
157    /// let store = ditto.store();
158    /// let _observer = store.register_observer_with_signal_next(
159    ///     "SELECT * FROM cars WHERE color = 'blue'",
160    ///     move |query_result, signal_next| {
161    ///         for item in query_result.iter() {
162    ///             // ... handle each item
163    ///         }
164    ///
165    ///         // Call `signal_next` when you're ready for the next callback
166    ///         signal_next();
167    ///     },
168    /// )?;
169    /// # Ok(())
170    /// # }
171    /// ```
172    ///
173    /// The first invocation of the change handler will always happen after
174    /// this method has returned.
175    ///
176    /// The observer will remain active until:
177    ///
178    /// - the [`StoreObserver`] handle gets dropped,
179    /// - the [`StoreObserver::cancel`] method is called, or
180    /// - the owning [`Ditto`] instance has shut down
181    ///
182    /// After invoking the callback once, the observer will wait to deliver
183    /// another callback until after you've called [`signal_next`].
184    ///
185    /// [`register_observer`]: Store::register_observer
186    /// [`signal_next`]: crate::store::SignalNext
187    pub fn register_observer_with_signal_next<Q, F>(
188        &self,
189        query: Q,
190        on_change: F,
191    ) -> Result<Arc<StoreObserver>, DittoError>
192    where
193        Q: IntoQuery,
194        Q::Args: Serialize,
195        F: ChangeHandlerWithSignalNext,
196    {
197        let ditto = Ditto::upgrade(&self.weak_ditto_fields)?;
198        let query = query.into_query()?;
199
200        let new_obs = Arc::new(StoreObserver::with_signal_next(
201            &ditto,
202            &query.string,
203            query.args_cbor.as_deref(),
204            on_change,
205        )?);
206        Ok(new_obs)
207    }
208
209    /// Gets temporary access to the set of currently registered observers.
210    ///
211    /// A (read) lock is held until the return value is dropped: this means
212    /// that neither [`Self::register_observer()`] nor
213    /// [`StoreObserver::cancel()`] can make progress until this read
214    /// lock is released.
215    pub fn observers(&self) -> impl '_ + Deref<Target = SetArc<StoreObserver>> {
216        let observers: repr_c::Vec<repr_c::Box<FfiStoreObserver>> =
217            ffi_sdk::dittoffi_store_observers(&self.ditto);
218        let observers: Vec<_> = observers.into();
219        let observers = observers
220            .into_iter()
221            .map(|handle: repr_c::Box<FfiStoreObserver>| Arc::new(StoreObserver { handle }))
222            .collect::<SetArc<_>>();
223
224        Box::new(observers)
225    }
226
227    /// Executes the given query in the local store and returns the result.
228    ///
229    /// # Example
230    ///
231    /// ```
232    /// use dittolive_ditto::prelude::*;
233    /// use dittolive_ditto::dql::QueryResult;
234    /// # #[tokio::main]
235    /// # async fn main() -> anyhow::Result<()> {
236    /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
237    ///
238    /// // Query a collection
239    /// let result = ditto.store().execute("SELECT * FROM cars").await?;
240    ///
241    /// // Insert a document into a collection
242    /// let insert_result: QueryResult = ditto
243    ///     .store()
244    ///     .execute((
245    ///          "INSERT INTO cars DOCUMENTS (:newCar)",
246    ///          serde_json::json!({
247    ///              "newCar": {
248    ///                  "make": "ford",
249    ///                  "color": "blue"
250    ///              }
251    ///          })
252    ///     ))
253    ///     .await?;
254    /// # Ok(())
255    /// # }
256    /// ```
257    ///
258    /// Use placeholders to incorporate values from the optional `query_args`
259    /// parameter into the query. The keys of the query arguments object must
260    /// match the placeholders used within the query. You can not use placeholders
261    /// in the `FROM` clause.
262    ///
263    /// This method only returns results from the local store without waiting for any
264    /// [`SyncSubscription`]s to have caught up with the
265    /// latest changes. Only use this method if your program must proceed with immediate results.
266    ///
267    /// Use [`ditto.store().register_observer(...)`] to receive updates to query results
268    /// as soon as they have been synced to this peer.
269    ///
270    /// ## Query parameter
271    ///
272    /// The `query` parameter must implement [`IntoQuery`], which is a trait that is implemented by
273    /// objects that can be turned into a query string along with the relevant query arguments.
274    ///
275    /// For queries with no arguments, a [`String`] is sufficient.
276    ///
277    /// [`SyncSubscription`]: crate::sync::SyncSubscription
278    /// [`ditto.store().register_observer(...)`]: crate::store::Store::register_observer
279    pub async fn execute<Q>(&self, query: Q) -> Result<QueryResult, DittoError>
280    where
281        Q: IntoQuery,
282        Q::Args: serde::Serialize,
283    {
284        let query = query.into_query()?;
285        let query_string = (&*query.string).into();
286        let query_args = query.args_cbor.as_deref().map(Into::into);
287
288        let ffi_query_result =
289            ffi_sdk::dittoffi_try_exec_statement(&self.ditto, query_string, query_args)
290                .into_rust_result()?;
291
292        Ok(QueryResult::from(ffi_query_result))
293    }
294
295    /// Creates a new attachment, which can then be inserted into a document.
296    ///
297    /// The file residing at the provided path will be copied into Ditto’s store. The
298    /// [`DittoAttachment`] object that is returned is what you can
299    /// then use to insert an attachment into a document.
300    ///
301    /// You can provide custom user data about the attachment, which will be replicated to other
302    /// peers alongside the file attachment.
303    pub async fn new_attachment(
304        &self,
305        filepath: &(impl ?Sized + AsRef<Path>),
306        user_data: HashMap<String, String>,
307    ) -> Result<DittoAttachment, DittoError> {
308        DittoAttachment::from_file_and_metadata(filepath, user_data, &self.ditto)
309    }
310
311    /// Creates a new attachment from in-memory data
312    ///
313    /// Refer to [`new_attachment`](Self::new_attachment) for additional information.
314    pub async fn new_attachment_from_bytes(
315        &self,
316        bytes: &(impl ?Sized + AsRef<[u8]>),
317        user_data: HashMap<String, String>,
318    ) -> Result<DittoAttachment, DittoError> {
319        DittoAttachment::from_bytes_and_metadata(bytes, user_data, &self.ditto)
320    }
321
    /// Fetches the attachment corresponding to the provided attachment token.
    /// - `attachment_token`: can be either a [`DittoAttachmentToken`], or a `&BTreeMap<CborValue,
    ///   CborValue>`, that is, the output of a [`QueryResultItem::value()`] once cast
    ///   [`.as_object()`][crate::prelude::CborValueGetters::as_object()].
    ///
    /// - `on_fetch_event`: A closure that will be called when the status of the request to fetch
    ///   the attachment has changed. If the attachment is already available then this will be
    ///   called almost immediately with a completed status value.
    ///
    /// The returned [`DittoAttachmentFetcher`] is a handle which is safe to discard, unless you
    /// wish to be able to [`.cancel()`][DittoAttachmentFetcher::cancel] the fetching operation.
    /// When not explicitly cancelled, the fetching operation will remain active until it either
    /// completes, the attachment is deleted, or the owning [`Ditto`] object is dropped.
    ///
    /// # Errors
    ///
    /// Returns an error if the token cannot be parsed, if the owning Ditto instance has already
    /// been released (`ErrorKind::ReleasedDittoInstance`), or if creating the fetcher fails.
    ///
    /// # Panics
    ///
    /// Panics if the attachment fetcher registry lock is poisoned.
    pub fn fetch_attachment(
        &self,
        attachment_token: impl attachment::DittoAttachmentTokenLike,
        on_fetch_event: impl 'static + Send + Sync + Fn(DittoAttachmentFetchEvent),
    ) -> Result<DittoAttachmentFetcher<'static, FetcherVersion::V2>, DittoError> {
        let attachment_token = attachment_token.parse_attachment_token()?;

        // The weak handle is cloned so it can be moved into the callback below without
        // the callback keeping the Ditto fields alive.
        let weak_ditto = self.weak_ditto_fields.clone();
        let ditto = weak_ditto
            .upgrade()
            .ok_or(ErrorKind::ReleasedDittoInstance)?;

        // The registry write lock is taken *before* the fetcher is created and is held until
        // the fetcher has been inserted at the end of this function. The completion shim
        // below takes a write lock on `ditto.store.attachment_fetchers` before unregistering,
        // so — presumably — a completion delivered on another thread waits until the
        // insertion has happened.
        // NOTE(review): if the callback can ever run synchronously inside
        // `DittoAttachmentFetcher::new`, re-locking on the same thread would not succeed —
        // confirm that it is always delivered from elsewhere.
        let mut attachment_fetchers_lockguard = self.attachment_fetchers.write().unwrap();
        let fetcher = DittoAttachmentFetcher::new(
            attachment_token,
            Some(&ditto),
            &self.ditto,
            // Shim around `on_fetch_event` to `cancel` on completion.
            move |event, cancel_token: &AtomicU64| {
                // Computed up front because `event` is moved into the user callback next.
                let has_finished = matches! {
                    event,
                    | DittoAttachmentFetchEvent::Completed { .. }
                    | DittoAttachmentFetchEvent::Deleted
                };
                on_fetch_event(event);
                if has_finished {
                    // Nothing to clean up if the Ditto fields are already gone.
                    if let Some(ditto) = weak_ditto.upgrade() {
                        let mut attachment_fetchers_inner_lockguard =
                            ditto.store.attachment_fetchers.write().unwrap();
                        // Relaxed is fine thanks to the lock.
                        let cancel_token = cancel_token.load(atomic::Ordering::Relaxed);
                        // The already-held guard is passed along so `unregister_fetcher`
                        // does not try to lock the registry a second time.
                        ditto.store.unregister_fetcher(
                            cancel_token,
                            Some(&mut *attachment_fetchers_inner_lockguard),
                        );
                    }
                }
            },
        )?;
        // Register a clone under the fetcher's cancel token; the `was_zero` flag is kept
        // alongside it for use by `unregister_fetcher`.
        let (cancel_token, was_zero) = fetcher.cancel_token_ensure_unique();
        attachment_fetchers_lockguard.insert(cancel_token, (was_zero, fetcher.clone()));
        Ok(fetcher)
    }
378
379    fn unregister_fetcher(
380        &self,
381        mut fetcher_cancel_token: CancelToken,
382        fetchers: Option<
383            &mut HashMap<CancelToken, (bool, DittoAttachmentFetcher<'static, FetcherVersion::V2>)>,
384        >,
385    ) -> bool {
386        let mut lock_guard = None;
387        let fetchers = fetchers.unwrap_or_else(|| {
388            &mut **lock_guard.get_or_insert(self.attachment_fetchers.write().unwrap())
389        });
390
391        let Some((was_zero, removed_fetcher)) = fetchers.remove(&fetcher_cancel_token) else {
392            return false;
393        };
394        drop(lock_guard);
395
396        if was_zero {
397            fetcher_cancel_token = 0;
398        }
399
400        let att_token = &removed_fetcher.context.token;
401
402        #[allow(deprecated)] // Workaround for patched tracing
403        {
404            debug!(
405                token_id = %att_token.id(),
406                %fetcher_cancel_token,
407                "unregistering ditto attachment fetcher"
408            );
409        }
410
411        let status = ffi_sdk::ditto_cancel_resolve_attachment(
412            &self.ditto,
413            att_token.id.as_ref().into(),
414            fetcher_cancel_token,
415        );
416
417        if status != 0 {
418            #[allow(deprecated)] // Workaround for patched tracing
419            {
420                error!(
421                    token_id = %att_token.id(),
422                    %fetcher_cancel_token,
423                    "failed to clean up attachment fetcher"
424                );
425            }
426        }
427        status == 0
428    }
429
430    /// Gets a copy of the set of currently registered attachment fetchers.
431    ///
432    /// A (read) lock is held during the copy: this contends with [`Self::fetch_attachment()`] and
433    /// with [`DittoAttachmentFetcher::cancel()`].
434    pub fn attachment_fetchers(&self) -> Vec<DittoAttachmentFetcher<'static, FetcherVersion::V2>> {
435        self.attachment_fetchers
436            .read()
437            .unwrap()
438            .iter()
439            .map(|(_, (_, fetcher))| fetcher.clone())
440            .collect()
441    }
442}
443
/// Specify the order of returned Documents in a query.
///
/// The enum is `#[non_exhaustive]`, so `match` expressions outside this crate need a
/// wildcard arm.
#[non_exhaustive]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum SortDirection {
    /// First result is "smallest", last result is "largest"
    Ascending,

    /// First result is "largest", last result is "smallest"
    Descending,
}