1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
use_prelude!();
use std::collections::BTreeMap;
use ffi_sdk::COrderByParam;
use crate::{
ditto::{TryUpgrade, WeakDittoHandleWrapper},
error::{DittoError, ErrorKind},
store::{collection::document_id::DocumentId, live_query::LiveQuery, update::UpdateResult},
subscription::Subscription,
};
/// These objects are returned when using `find`-like functionality on `Collection`s.
///
/// They allow chaining of further query-related functions to do things like add a limit to the
/// number of documents you want returned or specify how you want the documents to be sorted and
/// ordered. You can either call `exec` on the object to get an array of `Document`s as an
/// immediate return value, or you can establish either a live query or a subscription, which both
/// work over time.
///
/// A live query, established by calling `observe_local`, will notify you every time there's an
/// update to a document that matches the query you provided in the preceding `find`-like call.
///
/// A subscription, established by calling `subscribe`, will act as a signal to other peers that
/// the device connects to that you would like to receive updates from them about documents
/// that match the query you provided in the preceding `find`-like call.
///
/// Typically, an app would set up a `subscribe` in some part of the application which is long-lived
/// to ensure the device receives updates from the mesh. These updates will be automatically
/// received and written into the local store. Elsewhere, where you need to use this data, an
/// `observeLocal` can be used to notify you of the data, and all subsequent changes to the data.
///
/// Update and remove functionality is also exposed through this object.
pub struct PendingCursorOperation<'order_by> {
pub(super) ditto: WeakDittoHandleWrapper,
pub(super) collection_name: char_p::Box,
pub(super) query: char_p::Box,
pub(super) query_args: Option<Vec<u8>>,
pub(super) offset: u32,
pub(super) limit: i32,
pub(super) order_by: Vec<COrderByParam<'order_by>>,
}
impl<'order_by> PendingCursorOperation<'order_by> {
// TODO(pub_check)
pub fn new(
ditto: WeakDittoHandleWrapper,
collection_name: char_p::Box,
query: &str,
query_args: Option<Vec<u8>>,
) -> Self {
let query = char_p::new(query);
Self {
ditto,
collection_name,
query,
query_args,
offset: 0,
limit: -1,
order_by: vec![],
}
}
/// Execute the query generated by the preceding function chaining and
/// return the list of matching documents. This occurs immediately.
pub fn exec(&self) -> Result<Vec<BoxedDocument>, DittoError> {
self.exec_internal(None)
}
fn exec_internal(
&self,
txn: Option<&'_ mut ffi_sdk::CWriteTransaction>,
) -> Result<Vec<BoxedDocument>, DittoError> {
let ditto = self.ditto.try_upgrade()?;
{
ffi_sdk::ditto_collection_exec_query_str(
&*ditto,
self.collection_name.as_ref(),
txn,
self.query.as_ref(),
self.query_args.as_ref().map(|qa| (&qa[..]).into()),
(&self.order_by[..]).into(),
self.limit,
self.offset,
)
}
.ok_or(ErrorKind::InvalidInput)
.map(|it| it.into())
}
/// Enables you to subscribe to changes that occur on a collection.
///
/// Having a subscription acts as a signal to others that you are interested in receiving
/// updates when local or remote changes are made to documents that match the query generated by
/// the chain of operations that precedes the call to subscribe. The returned
/// [`Subscription`](crate::prelude::Subscription) object must be kept in scope for as long as
/// you want to keep receiving updates.
///
/// # Panics
/// Panics if Ditto has been closed.
pub fn subscribe(&self) -> Subscription {
let ditto = self.ditto.try_upgrade().unwrap();
Subscription::new(
ditto,
self.collection_name.clone(),
self.query.to_str(),
self.query_args.clone(),
&self.order_by,
self.limit,
self.offset,
)
}
/// Update the document with the matching document Id.
///
/// - `updater`: am `Fn` which will be called on _all_ matching documents.
pub fn update<Updater>(
&self,
updater: Updater,
) -> Result<BTreeMap<DocumentId, Vec<UpdateResult>>, DittoError>
where
Updater: Fn(&mut [BoxedDocument]),
{
let ditto = self
.ditto
.upgrade()
.ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
let hint: Option<char_p::Ref<'_>> = None;
let mut write_txn = ffi_sdk::ditto_write_transaction(&*ditto, hint).ok()?;
let mut docs = self.exec_internal(Some(&mut write_txn))?;
// Apply the closure to the document,
updater(&mut docs);
let diff = BTreeMap::<DocumentId, Vec<UpdateResult>>::new();
let code = {
ffi_sdk::ditto_collection_update_multiple(
&ditto,
self.collection_name.as_ref(),
&mut write_txn,
docs.into(),
)
};
if code != 0 {
ffi_sdk::ditto_write_transaction_rollback(&ditto, write_txn);
return Err(DittoError::from_ffi(ErrorKind::InvalidInput));
}
ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn);
Ok(diff)
}
/// Limit the number of documents that get returned when querying a
/// collection for matching documents.
pub fn limit(&mut self, limit: i32) -> &mut Self {
self.limit = limit;
self
}
/// Offset the resulting set of matching documents. This is useful if you
/// aren’t interested in the first N matching documents for one reason
/// or another. For example, you might already have queried the
/// collection and obtained the first 20 matching documents and so you might
/// want to run the same query as you did previously but ignore the first 20
/// matching documents, and that is where you would use offset.
pub fn offset(&mut self, offset: u32) -> &mut Self {
self.offset = offset;
self
}
/// Remove all documents that match the query generated by the preceding
/// function chaining. Returns the IDs of all documents removed
pub fn remove(&self) -> Result<Vec<DocumentId>, DittoError> {
let ditto = self
.ditto
.upgrade()
.ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
let hint: Option<char_p::Ref<'_>> = None;
let mut write_txn = ffi_sdk::ditto_write_transaction(&*ditto, hint).ok()?;
let ids = {
ffi_sdk::ditto_collection_remove_query_str(
&*ditto,
self.collection_name.as_ref(),
&mut write_txn,
self.query.as_ref(),
self.query_args.as_ref().map(|qa| (&qa[..]).into()),
(&self.order_by[..]).into(),
self.limit,
self.offset,
)
.ok_or(ErrorKind::InvalidInput)?
};
ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn);
Ok(ids
.to::<Vec<_>>()
.into_iter()
.map(|s| s.to::<Box<[u8]>>().into())
.collect())
}
/// Evict all documents that match the query generated by the preceding
/// function chaining.
pub fn evict(&self) -> Result<Vec<DocumentId>, DittoError> {
let ditto = self
.ditto
.upgrade()
.ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
let hint: Option<char_p::Ref<'_>> = None;
let mut write_txn = ffi_sdk::ditto_write_transaction(&*ditto, hint).ok()?;
let ids = {
ffi_sdk::ditto_collection_evict_query_str(
&*ditto,
self.collection_name.as_ref(),
&mut write_txn,
self.query.as_ref(),
self.query_args.as_ref().map(|qa| (&qa[..]).into()),
(&self.order_by[..]).into(),
self.limit,
self.offset,
)
.ok_or(ErrorKind::InvalidInput)?
};
ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn);
Ok(ids
.to::<Vec<_>>()
.into_iter()
.map(|s| s.to::<Box<[u8]>>().into())
.collect())
}
/// Enables you to listen for changes that occur on a collection.
///
/// This won’t subscribe to receive changes made remotely by others and so it will only fire
/// updates when a local change is made. If you want to receive remotely performed updates as
/// well then you need to also call [`subscribe`](PendingCursorOperation::subscribe) with the
/// relevant query. The returned [`LiveQuery`](crate::prelude::LiveQuery) object must be kept in
/// scope for as long as you want the provided `Handler` to be called when an update occurs.
pub fn observe_local<Handler>(&self, handler: Handler) -> Result<LiveQuery, DittoError>
where
Handler: EventHandler,
{
LiveQuery::with_handler(
self.ditto.clone(),
self.query.clone(),
self.query_args.clone(),
self.collection_name.clone(),
&self.order_by,
self.limit,
self.offset,
handler,
)
}
// FIXME: To bring this in line with the other SDKs this should accept a single
// "order_by" expression, which should then be added to the `order_by` vec
/// Sort the documents that match the query provided in the preceding
/// find-like function call.
pub fn sort(&mut self, sort_param: Vec<COrderByParam<'order_by>>) -> &mut Self {
self.order_by = sort_param;
self
}
}