1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
use_prelude!();

use std::collections::BTreeMap;

use ffi_sdk::COrderByParam;

use crate::{
    ditto::{TryUpgrade, WeakDittoHandleWrapper},
    error::{DittoError, ErrorKind},
    store::{collection::document_id::DocumentId, live_query::LiveQuery, update::UpdateResult},
    subscription::Subscription,
};

pub struct PendingCursorOperation<'order_by> {
    pub(super) ditto: WeakDittoHandleWrapper,
    pub(super) collection_name: char_p::Box,
    pub(super) query: char_p::Box,
    pub(super) query_args: Option<Vec<u8>>,
    pub(super) offset: u32,
    pub(super) limit: i32,
    pub(super) order_by: Vec<COrderByParam<'order_by>>,
}

impl<'order_by> PendingCursorOperation<'order_by> {
    pub fn new(
        ditto: WeakDittoHandleWrapper,
        collection_name: char_p::Box,
        query: &str,
        query_args: Option<Vec<u8>>,
    ) -> Self {
        let query = char_p::new(query);
        Self {
            ditto,
            collection_name,
            query,
            query_args,
            offset: 0,
            limit: -1,
            order_by: vec![],
        }
    }

    /// Execute the query generated by the preceding function chaining and
    /// return the list of matching documents. This occurs immediately.
    pub fn exec(&self) -> Result<Vec<BoxedDocument>, DittoError> {
        self.exec_internal(None)
    }

    fn exec_internal(
        &self,
        txn: Option<&'_ mut ffi_sdk::CWriteTransaction>,
    ) -> Result<Vec<BoxedDocument>, DittoError> {
        let ditto = self.ditto.try_upgrade()?;
        unsafe {
            ffi_sdk::ditto_collection_exec_query_str(
                &*ditto,
                self.collection_name.as_ref(),
                txn,
                self.query.as_ref(),
                self.query_args.as_ref().map(|qa| (&qa[..]).into()),
                (&self.order_by[..]).into(),
                self.limit,
                self.offset,
            )
        }
        .ok_or(ErrorKind::InvalidInput)
        .map(|it| it.into())
    }

    /// Enables you to subscribe to changes that occur on a collection. Having a
    /// subscription acts as a signal to others that you are interested in
    /// receiving updates when local or remote changes are made to documents
    /// that match the query generated by the chain of operations
    /// that precedes the call to subscribe. The returned DittoSubscription
    /// object must be kept in scope for as long as you want to keep
    /// receiving updates.
    /// # Panics
    /// Panics if Ditto has been closed.
    pub fn subscribe(&self) -> Subscription {
        let ditto = self.ditto.try_upgrade().unwrap();
        Subscription::new(
            ditto,
            self.collection_name.clone(),
            self.query.to_str(),
            self.query_args.clone(),
            &self.order_by,
            self.limit,
            self.offset,
        )
    }

    /// Update the document with the matching ID.
    /// * `updater` - a Fn which will be called on _all_ matching documents
    ///
    /// Note that fetching the documents occurs in one transaction and then
    /// applying `updater` to _all_ fetched documents occurs in a single,
    /// second transaction.
    pub fn update<Updater>(
        &self,
        updater: Updater,
    ) -> Result<BTreeMap<DocumentId, Vec<UpdateResult>>, DittoError>
    where
        Updater: Fn(&mut [BoxedDocument]),
    {
        let ditto = self
            .ditto
            .upgrade()
            .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
        let mut write_txn = unsafe { ffi_sdk::ditto_write_transaction(&*ditto).ok()? };

        let mut docs = self.exec_internal(Some(&mut write_txn))?;

        // Apply the closure to the document,
        updater(&mut docs);
        let diff = BTreeMap::<DocumentId, Vec<UpdateResult>>::new();

        let code = unsafe {
            ffi_sdk::ditto_collection_update_multiple(
                &ditto,
                self.collection_name.as_ref(),
                &mut write_txn,
                docs.into(),
            )
        };
        if code != 0 {
            unsafe { ffi_sdk::ditto_write_transaction_rollback(&ditto, write_txn) };
            return Err(DittoError::from_ffi(ErrorKind::InvalidInput));
        }
        unsafe { ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn) };
        Ok(diff)
    }

    /// Limit the number of documents that get returned when querying a
    /// collection for matching documents.
    pub fn limit(&mut self, limit: i32) -> &mut Self {
        self.limit = limit;
        self
    }

    /// Offset the resulting set of matching documents. This is useful if you
    /// aren’t interested in the first N matching documents for one reason
    /// or another. For example, you might already have queried the
    /// collection and obtained the first 20 matching documents and so you might
    /// want to run the same query as you did previously but ignore the first 20
    /// matching documents, and that is where you would use offset.
    pub fn offset(&mut self, offset: u32) -> &mut Self {
        self.offset = offset;
        self
    }

    /// Remove all documents that match the query generated by the preceding
    /// function chaining. Returns the IDs of all documents removed
    pub fn remove(&self) -> Result<Vec<DocumentId>, DittoError> {
        let ditto = self
            .ditto
            .upgrade()
            .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
        let mut write_txn = unsafe { ffi_sdk::ditto_write_transaction(&*ditto).ok()? };

        let ids = unsafe {
            ffi_sdk::ditto_collection_remove_query_str(
                &*ditto,
                self.collection_name.as_ref(),
                &mut write_txn,
                self.query.as_ref(),
                self.query_args.as_ref().map(|qa| (&qa[..]).into()),
                (&self.order_by[..]).into(),
                self.limit,
                self.offset,
            )
            .ok_or(ErrorKind::InvalidInput)?
        };
        unsafe {
            ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn);
        }
        Ok(ids
            .to::<Vec<_>>()
            .into_iter()
            .map(|s| s.to::<Box<[u8]>>().into())
            .collect())
    }

    /// Evict all documents that match the query generated by the preceding
    /// function chaining.
    pub fn evict(&self) -> Result<Vec<DocumentId>, DittoError> {
        let ditto = self
            .ditto
            .upgrade()
            .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
        let mut write_txn = unsafe { ffi_sdk::ditto_write_transaction(&*ditto).ok()? };

        let ids = unsafe {
            ffi_sdk::ditto_collection_evict_query_str(
                &*ditto,
                self.collection_name.as_ref(),
                &mut write_txn,
                self.query.as_ref(),
                self.query_args.as_ref().map(|qa| (&qa[..]).into()),
                (&self.order_by[..]).into(),
                self.limit,
                self.offset,
            )
            .ok_or(ErrorKind::InvalidInput)?
        };
        unsafe {
            ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn);
        }
        Ok(ids
            .to::<Vec<_>>()
            .into_iter()
            .map(|s| s.to::<Box<[u8]>>().into())
            .collect())
    }

    /// Enables you to listen for changes that occur on a collection. This won’t
    /// subscribe to receive changes made remotely by others and so it will only
    /// fire updates when a local change is made. If you want to receive
    /// remotely performed updates as well then you need to also call subscribe
    /// with the relevant query. The returned DittoLiveQuery object must be kept
    /// in scope for as long as you want the provided eventHandler to be called
    /// when an update occurs.
    pub fn observe_local<Handler>(&self, handler: Handler) -> Result<LiveQuery, DittoError>
    where
        Handler: EventHandler,
    {
        LiveQuery::with_handler(
            self.ditto.clone(),
            self.query.clone(),
            self.query_args.clone(),
            self.collection_name.clone(),
            &self.order_by,
            self.limit,
            self.offset,
            None,
            handler,
        )
    }

    // FIXME: To bring this in line with the other SDKs this should accept a single
    // "order_by" expression, which should then be added to the `order_by` vec
    /// Sort the documents that match the query provided in the preceding
    /// find-like function call.
    pub fn sort(&mut self, sort_param: Vec<COrderByParam<'order_by>>) -> &mut Self {
        self.order_by = sort_param;
        self
    }
}