1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
use_prelude!();

use ffi_sdk::COrderByParam;

use super::event::CollectionsEvent;
use crate::{
    ditto::WeakDittoHandleWrapper, error::DittoError,
    store::collection::pending_cursor_operation::*, subscription::Subscription,
};

trait_alias! {
    /// A closure which is called on each event relating to changes in the known
    /// about collections.
    ///
    /// Ditto may call this handler in parallel such as when a second invocation
    /// happens before the first invocation has completed. This typically occurs
    /// when the closure makes slow, external calls.
    pub
    trait CollectionsEventHandler =
        FnMut(CollectionsEvent) // callable sequentially
        + Send // can be dropped in another thread
        + 'static // cannot dangle
}

/// These objects are returned when calling `collections`[crate::store::Store::collections] on
/// `Store`.
///
/// They allow chaining of further collections-related functions. You can either call `exec` on the
/// object to get an array of `Collection`s as an immediate return value, or you can establish
/// either a live query or a subscription, which both work over time.
///
/// A live query, established by calling `observe_local`, will notify you every time there's a
/// change in the collections that the device knows about.
///
/// A subscription, established by calling `subscribe`, will act as a signal to other peers that
/// the device connects to that you would like to receive updates from them about the collections
/// that they know about.
///
/// Typically, an app would set up a `subscribe` in some part of the application which is long-lived
/// to ensure the device receives updates from the mesh. These updates will be automatically
/// received and written into the local store. Elsewhere, where you need to use this data, an
/// `observe_local` can be used to notify you of the data, and all subsequent changes to the data.
pub struct PendingCollectionsOperation<'order_by> {
    pub(super) ditto: WeakDittoHandleWrapper,
    pub(super) pending_cursor_operation: PendingCursorOperation<'order_by>,
}

impl<'order_by> PendingCollectionsOperation<'order_by> {
    pub(crate) fn new(ditto: WeakDittoHandleWrapper) -> Self {
        let pending_cursor_operation =
            PendingCursorOperation::new(ditto.clone(), char_p::new("__collections"), "true", None);
        Self {
            ditto,
            pending_cursor_operation,
        }
    }

    /// Limit the number of documents that get returned when querying a
    /// collection for matching documents.
    pub fn limit(&mut self, limit: i32) -> &mut Self {
        self.pending_cursor_operation.limit(limit);
        self
    }

    /// Offset the resulting set of matching documents. This is useful if you
    /// aren’t interested in the first N matching documents for one reason
    /// or another. For example, you might already have queried the
    /// collection and obtained the first 20 matching documents and so you might
    /// want to run the same query as you did previously but ignore the first 20
    /// matching documents, and that is where you would use offset.
    pub fn offset(&mut self, offset: u32) -> &mut Self {
        self.pending_cursor_operation.offset(offset);
        self
    }

    // FIXME: To bring this in line with the other SDKs this should accept a single
    // "order_by" expression, which should then be added to the `order_by` vec
    /// Sort the documents that match the query provided in the preceding
    /// find-like function call.
    pub fn sort(&mut self, sort_param: Vec<COrderByParam<'order_by>>) -> &mut Self {
        self.pending_cursor_operation.sort(sort_param);
        self
    }

    /// Execute the query generated by the preceding function chaining and
    /// return the list of matching documents. This occurs immediately.
    pub fn exec(&self) -> Result<Vec<Collection>, DittoError> {
        let docs = self.pending_cursor_operation.exec()?;
        Ok(PendingCollectionsOperation::collections_from_docs(
            docs,
            self.ditto.clone(),
        ))
    }

    /// Enables you to subscribe to changes that occur on a collection. Having a subscription acts
    /// as a signal to others that you are interested in receiving updates when local or remote
    /// changes are made to documents that match the query generated by the chain of operations that
    /// precedes the call to subscribe. The returned [`Subscription`](crate::prelude::Subscription)
    /// object must be kept in scope for as long as you want to keep receiving updates.
    pub fn subscribe(&self) -> Subscription {
        self.pending_cursor_operation.subscribe()
    }

    /// Enables you to listen for changes that occur on a collection. This won’t subscribe to
    /// receive changes made remotely by others and so it will only fire updates when a local change
    /// is made. If you want to receive remotely performed updates as well then also call subscribe
    /// with the relevant query. The returned [`LiveQuery`](crate::prelude::Subscription) object
    /// must be kept in scope for as long as you want the provided `Handler` to be called when an
    /// update occurs.
    pub fn observe_local<Handler>(&self, handler: Handler) -> Result<LiveQuery, DittoError>
    where
        Handler: CollectionsEventHandler,
    {
        let ditto = self.ditto.clone();
        let mut handler = handler;
        let handler = move |docs: Vec<BoxedDocument>, event: LiveQueryEvent| {
            let colls_ev = PendingCollectionsOperation::collections_event_from_docs_and_event(
                docs,
                event,
                ditto.clone(),
            );
            handler(colls_ev);
        };
        self.pending_cursor_operation.observe_local(handler)
    }

    fn collections_from_docs(
        docs: Vec<BoxedDocument>,
        ditto: WeakDittoHandleWrapper,
    ) -> Vec<Collection> {
        docs.iter()
            .filter_map(|doc| {
                doc.get::<String>("name")
                    .ok()
                    .map(|coll_name| Collection::new(ditto.clone(), coll_name))
            })
            .collect()
    }

    fn collections_event_from_docs_and_event(
        docs: Vec<BoxedDocument>,
        event: LiveQueryEvent,
        ditto: WeakDittoHandleWrapper,
    ) -> CollectionsEvent {
        match event {
            LiveQueryEvent::Initial => CollectionsEvent::initial(
                PendingCollectionsOperation::collections_from_docs(docs, ditto),
            ),
            LiveQueryEvent::Update {
                old_documents,
                insertions,
                deletions,
                updates,
                moves,
            } => CollectionsEvent::update(
                PendingCollectionsOperation::collections_from_docs(docs, ditto.clone()),
                PendingCollectionsOperation::collections_from_docs(old_documents, ditto),
                insertions,
                deletions,
                updates,
                moves,
            ),
        }
    }
}