use_prelude!();
use std::collections::BTreeMap;
use ffi_sdk::COrderByParam;
use serde::Serialize;
use super::*;
use crate::{
error::{DittoError, ErrorKind},
ffi_sdk,
store::{update::UpdateResult, WriteStrategy},
};
/// Handle to one named collection inside an in-progress batch.
///
/// Holds a mutable borrow of the batch's `ScopedStore`; the methods in this
/// file use that borrow to reach the Ditto handle (`batch.store.ditto`), the
/// batch transaction (`batch.txn`) and the list of recorded changes
/// (`batch.results`).
pub struct ScopedCollection<'coll, 'batch> {
/// The batch this collection handle belongs to.
pub(super) batch: &'coll mut ScopedStore<'batch>,
/// Name of the collection, passed to the collection-level FFI calls.
pub(super) collection_name: char_p::Box,
}
impl ScopedCollection<'_, '_> {
/// Serializes `content` to CBOR and inserts it into this collection using
/// `WriteStrategy::Merge`.
///
/// This is shorthand for [`Self::upsert_with_strategy`] with the strategy
/// fixed to `WriteStrategy::Merge`.
///
/// Returns a reference to the document id as recorded in the batch results.
///
/// # Panics
///
/// Panics if `content` cannot be serialized to CBOR.
///
/// # Errors
///
/// Propagates any error reported by the underlying insert.
pub fn upsert<V: ::serde::Serialize, C: Borrow<V>>(
    &'_ mut self,
    content: C,
) -> Result<&'_ DocumentId, DittoError> {
    self.upsert_with_strategy::<V, C>(content, WriteStrategy::Merge)
}
/// Serializes `content` to CBOR and inserts it into this collection using
/// the caller-supplied `write_strategy`.
///
/// Returns a reference to the document id as recorded in the batch results.
///
/// # Panics
///
/// Panics if `content` cannot be serialized to CBOR.
///
/// # Errors
///
/// Propagates any error reported by the underlying insert.
pub fn upsert_with_strategy<V: ::serde::Serialize, C: Borrow<V>>(
    &'_ mut self,
    content: C,
    write_strategy: WriteStrategy,
) -> Result<&'_ DocumentId, DittoError> {
    let encoded = ::serde_cbor::to_vec(content.borrow()).unwrap();
    self.insert_cbor(encoded.as_slice(), write_strategy)
}
/// Inserts an already CBOR-encoded document inside the batch transaction.
///
/// A temporary `Collection` handle is built from a weak reference to the
/// batch's Ditto instance and a copy of this collection's name; the insert
/// is performed through it with `Some(self.batch.txn)` as the transaction.
///
/// On success a `WriteTransactionResult` is appended to the batch results.
/// Its kind is always `DocChangeKind::Inserted`, whatever `write_strategy`
/// was supplied. The returned reference points at the id stored in that
/// freshly appended entry.
///
/// # Errors
///
/// Propagates the error returned by `Collection::insert_cbor`; nothing is
/// recorded in that case.
fn insert_cbor(
    &'_ mut self,
    cbor: &'_ [u8],
    write_strategy: WriteStrategy,
) -> Result<&'_ DocumentId, DittoError> {
    let target = Collection {
        ditto: Arc::downgrade(&self.batch.store.ditto),
        collection_name: self.collection_name.clone(),
    };
    let doc_id = target.insert_cbor(cbor, write_strategy, Some(self.batch.txn))?;

    let entry = WriteTransactionResult {
        doc_id,
        collection_name: self.collection_name.to_owned(),
        kind: DocChangeKind::Inserted,
    };
    self.batch.results.push(entry);

    // The vector cannot be empty here: an entry was pushed just above.
    let recorded = self.batch.results.last().unwrap();
    Ok(&recorded.doc_id)
}
}
impl<'coll, 'batch> ScopedCollection<'coll, 'batch> {
/// Starts a query-based operation on this collection.
///
/// The returned `BatchCursorOperation` carries no query arguments, an
/// offset of `0`, a limit of `-1` and no ordering; nothing is executed
/// until one of its terminal methods is called.
///
/// NOTE(review): `char_p::new` presumably panics if `query` contains an
/// interior NUL byte — confirm against the `char_p` documentation.
pub fn find<'find, 'order_by>(
    self: &'find mut ScopedCollection<'coll, 'batch>,
    query: &'_ str,
) -> BatchCursorOperation<'find, 'coll, 'batch, 'order_by> {
    BatchCursorOperation {
        collection: self,
        query: char_p::new(query),
        query_args: None,
        order_by: Vec::new(),
        limit: -1,
        offset: 0,
    }
}
/// Starts a query-based operation on this collection with query arguments.
///
/// `query_args` is serialized to CBOR and stored alongside the query. The
/// remaining settings start out as an offset of `0`, a limit of `-1` and no
/// ordering; nothing is executed until a terminal method is called.
///
/// # Panics
///
/// Panics if `query_args` cannot be serialized to CBOR.
///
/// NOTE(review): `char_p::new` presumably also panics if `query` contains
/// an interior NUL byte — confirm against the `char_p` documentation.
pub fn find_with_args<'find, 'order_by, V: ::serde::Serialize, C: Borrow<V>>(
    self: &'find mut ScopedCollection<'coll, 'batch>,
    query: &'_ str,
    query_args: C,
) -> BatchCursorOperation<'find, 'coll, 'batch, 'order_by> {
    // Convert the query first, then the arguments, so that failures surface
    // in the same order as before.
    let query = char_p::new(query);
    let encoded_args = serde_cbor::to_vec(query_args.borrow()).unwrap();
    BatchCursorOperation {
        collection: self,
        query,
        query_args: Some(encoded_args),
        order_by: Vec::new(),
        limit: -1,
        offset: 0,
    }
}
pub fn find_all<'find, 'order_by>(
self: &'find mut ScopedCollection<'coll, 'batch>,
) -> BatchCursorOperation<'find, 'coll, 'batch, 'order_by> {
self.find("true")
}
/// Starts an operation that targets the single document `document_id`.
///
/// Nothing is executed until a method of the returned `BatchIdOperation`
/// is called.
pub fn find_by_id(
    self: &'_ mut ScopedCollection<'coll, 'batch>,
    document_id: DocumentId,
) -> BatchIdOperation<'_, 'coll, 'batch> {
    let collection = self;
    BatchIdOperation {
        document_id,
        collection,
    }
}
}
/// A pending query-based operation on a `ScopedCollection`, created by
/// `find`, `find_with_args` or `find_all`.
///
/// `limit`, `offset` and `sort` adjust the query; `exec`, `update`, `remove`
/// and `evict` run it inside the batch's transaction.
pub struct BatchCursorOperation<'find, 'coll, 'batch, 'order_by> {
/// Query string passed to the FFI query calls.
query: char_p::Box,
/// CBOR-encoded query arguments, or `None` when none were supplied.
query_args: Option<Vec<u8>>,
/// Collection (and, through it, the batch) the operation runs against.
collection: &'find mut ScopedCollection<'coll, 'batch>,
/// Offset forwarded to the FFI query calls; starts at `0`.
offset: u32,
/// Limit forwarded to the FFI query calls; starts at `-1`
/// (presumably "no limit" — confirm against the FFI contract).
limit: i32,
/// Ordering parameters forwarded to the FFI query calls; empty by default.
order_by: Vec<COrderByParam<'order_by>>,
}
impl<'order_by> BatchCursorOperation<'_, '_, '_, 'order_by> {
/// Runs the query inside the batch's transaction and returns the matching
/// documents.
///
/// The stored query, query arguments, ordering, limit and offset are all
/// forwarded to `ditto_collection_exec_query_str`.
///
/// # Errors
///
/// Returns the error produced by `ok_or(ErrorKind::InvalidInput)` when the
/// FFI call does not succeed.
pub fn exec(&mut self) -> Result<Vec<BoxedDocument>, DittoError> {
    let matched = ffi_sdk::ditto_collection_exec_query_str(
        &*self.collection.batch.store.ditto,
        self.collection.collection_name.as_ref(),
        Some(self.collection.batch.txn),
        self.query.as_ref(),
        self.query_args.as_ref().map(|qa| (&qa[..]).into()),
        (&self.order_by[..]).into(),
        self.limit,
        self.offset,
    )
    .ok_or(ErrorKind::InvalidInput)?;
    Ok(matched.into())
}
/// Runs the query, lets `updater` modify the matching documents in place,
/// and writes them all back inside the batch's transaction.
///
/// `updater` is called exactly once with the matching documents as a
/// mutable slice (which may be empty).
///
/// The returned map is currently always empty: no per-document diff is
/// collected. This method also does not append anything to the batch
/// results.
///
/// # Errors
///
/// * The error produced by `ok_or(ErrorKind::InvalidInput)` if the query
///   does not succeed; `updater` is not called in that case.
/// * An `ErrorKind::InvalidInput` error if
///   `ditto_collection_update_multiple` returns a non-zero status.
pub fn update<Updater>(
    self,
    updater: Updater,
) -> Result<BTreeMap<DocumentId, Vec<UpdateResult>>, DittoError>
where
    Updater: FnOnce(&mut [BoxedDocument]),
{
    let mut matched = ffi_sdk::ditto_collection_exec_query_str(
        &*self.collection.batch.store.ditto,
        self.collection.collection_name.as_ref(),
        Some(self.collection.batch.txn),
        self.query.as_ref(),
        self.query_args.as_ref().map(|qa| (&qa[..]).into()),
        (&self.order_by[..]).into(),
        self.limit,
        self.offset,
    )
    .ok_or(ErrorKind::InvalidInput)?;

    updater(&mut matched);

    let status = ffi_sdk::ditto_collection_update_multiple(
        &self.collection.batch.store.ditto,
        self.collection.collection_name.as_ref(),
        self.collection.batch.txn,
        matched,
    );
    if status != 0 {
        Err(DittoError::from_ffi(ErrorKind::InvalidInput))
    } else {
        Ok(BTreeMap::<DocumentId, Vec<UpdateResult>>::new())
    }
}
/// Sets the limit forwarded to the FFI query calls and returns `self` so
/// that further calls can be chained.
pub fn limit(&mut self, limit: i32) -> &mut Self {
self.limit = limit;
self
}
/// Sets the offset forwarded to the FFI query calls and returns `self` so
/// that further calls can be chained.
pub fn offset(&mut self, offset: u32) -> &mut Self {
self.offset = offset;
self
}
/// Replaces the ordering parameters forwarded to the FFI query calls and
/// returns `self` so that further calls can be chained.
///
/// Any previously set ordering is discarded, not appended to.
pub fn sort(&mut self, sort_param: Vec<COrderByParam<'order_by>>) -> &mut Self {
self.order_by = sort_param;
self
}
/// Removes the documents selected by the query inside the batch's
/// transaction and returns the ids reported by the FFI call.
///
/// The stored query, query arguments, ordering, limit and offset are all
/// forwarded to `ditto_collection_remove_query_str`. This method does not
/// append anything to the batch results.
///
/// # Errors
///
/// Returns the error produced by `ok_or(ErrorKind::InvalidInput)` when the
/// FFI call does not succeed.
pub fn remove(&mut self) -> Result<Vec<DocumentId>, DittoError> {
    let raw_ids = ffi_sdk::ditto_collection_remove_query_str(
        &*self.collection.batch.store.ditto,
        self.collection.collection_name.as_ref(),
        self.collection.batch.txn,
        self.query.as_ref(),
        self.query_args.as_ref().map(|qa| (&qa[..]).into()),
        (&self.order_by[..]).into(),
        self.limit,
        self.offset,
    )
    .ok_or(ErrorKind::InvalidInput)?
    .to::<Vec<_>>();

    // Each raw id is turned into boxed bytes and from there into a `DocumentId`.
    let removed: Vec<DocumentId> = raw_ids
        .into_iter()
        .map(|raw| raw.to::<Box<[u8]>>().into())
        .collect();
    Ok(removed)
}
/// Evicts the documents selected by the query inside the batch's
/// transaction and returns the ids reported by the FFI call.
///
/// The stored query, query arguments, ordering, limit and offset are all
/// forwarded to `ditto_collection_evict_query_str`. This method does not
/// append anything to the batch results.
///
/// # Errors
///
/// Returns the error produced by `ok_or(ErrorKind::InvalidInput)` when the
/// FFI call does not succeed.
pub fn evict(&mut self) -> Result<Vec<DocumentId>, DittoError> {
    let raw_ids = ffi_sdk::ditto_collection_evict_query_str(
        &*self.collection.batch.store.ditto,
        self.collection.collection_name.as_ref(),
        self.collection.batch.txn,
        self.query.as_ref(),
        self.query_args.as_ref().map(|qa| (&qa[..]).into()),
        (&self.order_by[..]).into(),
        self.limit,
        self.offset,
    )
    .ok_or(ErrorKind::InvalidInput)?
    .to::<Vec<_>>();

    // Each raw id is turned into boxed bytes and from there into a `DocumentId`.
    let evicted: Vec<DocumentId> = raw_ids
        .into_iter()
        .map(|raw| raw.to::<Box<[u8]>>().into())
        .collect();
    Ok(evicted)
}
}
/// A pending operation on a single document of a `ScopedCollection`,
/// created by `find_by_id`.
///
/// `remove`, `evict`, `update` and `update_doc` run inside the batch's
/// transaction.
pub struct BatchIdOperation<'find_by_id, 'coll, 'batch> {
/// Collection (and, through it, the batch) the operation runs against.
collection: &'find_by_id mut ScopedCollection<'coll, 'batch>,
/// Id of the document this operation targets.
pub(super) document_id: DocumentId,
}
impl BatchIdOperation<'_, '_, '_> {
/// Removes the targeted document inside the batch's transaction.
///
/// On success a `DocChangeKind::Removed` entry is appended to the batch
/// results.
///
/// # Errors
///
/// * Propagates the error from `ditto_collection_remove` if the call fails.
/// * Returns an `ErrorKind::NonExtant` error if the call succeeds but
///   reports that nothing was removed; nothing is recorded in that case.
pub fn remove(self) -> Result<(), DittoError> {
    let was_removed = ffi_sdk::ditto_collection_remove(
        &*self.collection.batch.store.ditto,
        self.collection.collection_name.as_ref(),
        self.collection.batch.txn,
        self.document_id.as_ref().into(),
    )
    .ok()?;

    if !was_removed {
        return Err(DittoError::from_ffi(ErrorKind::NonExtant));
    }

    let entry = WriteTransactionResult {
        doc_id: self.document_id.clone(),
        collection_name: self.collection.collection_name.to_owned(),
        kind: DocChangeKind::Removed,
    };
    self.collection.batch.results.push(entry);
    Ok(())
}
/// Evicts the targeted document inside the batch's transaction.
///
/// On success a `DocChangeKind::Evicted` entry is appended to the batch
/// results.
///
/// # Errors
///
/// * Propagates the error from `ditto_collection_evict` if the call fails.
/// * Returns an `ErrorKind::NonExtant` error if the call succeeds but
///   reports that nothing was evicted; nothing is recorded in that case.
pub fn evict(&mut self) -> Result<(), DittoError> {
    let was_evicted = ffi_sdk::ditto_collection_evict(
        &*self.collection.batch.store.ditto,
        self.collection.collection_name.as_ref(),
        self.collection.batch.txn,
        self.document_id.as_ref().into(),
    )
    .ok()?;

    if !was_evicted {
        return Err(DittoError::from_ffi(ErrorKind::NonExtant));
    }

    let entry = WriteTransactionResult {
        doc_id: self.document_id.clone(),
        collection_name: self.collection.collection_name.to_owned(),
        kind: DocChangeKind::Evicted,
    };
    self.collection.batch.results.push(entry);
    Ok(())
}
/// Fetches the targeted document, lets `updater` modify it in place, and
/// writes it back inside the batch's transaction.
///
/// The document is read through a fresh read transaction
/// (`ditto_read_transaction`), not through the batch's write transaction.
/// NOTE(review): this presumably means changes made earlier in the same
/// batch are not visible to `updater` — confirm against the FFI semantics.
///
/// `updater` always receives `Some(..)`: a failed lookup is reported as an
/// error before the closure is ever called.
///
/// On success a `DocChangeKind::Updated` entry is appended to the batch
/// results. The returned vector is currently always empty: no per-field
/// diff is collected.
///
/// # Errors
///
/// * Propagates the error from `ditto_read_transaction` if the read
///   transaction cannot be opened.
/// * Returns the error produced by `ok_or(ErrorKind::NonExtant)` if the
///   lookup does not yield a document.
/// * Returns an `ErrorKind::InvalidInput` error if `ditto_collection_update`
///   returns a non-zero status; nothing is recorded in that case.
pub fn update<Updater>(self, updater: Updater) -> Result<Vec<UpdateResult>, DittoError>
where
    Updater: FnOnce(Option<&mut BoxedDocument>),
{
    let mut document: BoxedDocument = {
        let mut read_txn =
            ffi_sdk::ditto_read_transaction(&*self.collection.batch.store.ditto).ok()?;
        ffi_sdk::ditto_collection_get(
            &*self.collection.batch.store.ditto,
            self.collection.collection_name.as_ref(),
            self.document_id.as_ref().into(),
            &mut read_txn,
        )
        .ok_or(ErrorKind::NonExtant)?
    };

    updater(Some(&mut document));

    // The write-back status must be checked: previously it was dropped, so a
    // failed update was still recorded as `Updated` and reported as success.
    let status = ffi_sdk::ditto_collection_update(
        &*self.collection.batch.store.ditto,
        self.collection.collection_name.as_ref(),
        self.collection.batch.txn,
        document,
    );
    if status != 0 {
        return Err(DittoError::from_ffi(ErrorKind::InvalidInput));
    }

    self.collection.batch.results.push(WriteTransactionResult {
        doc_id: self.document_id.clone(),
        collection_name: self.collection.collection_name.to_owned(),
        kind: DocChangeKind::Updated,
    });
    Ok(Vec::new())
}
pub fn update_doc<T>(self, new_value: &T) -> Result<(), DittoError>
where
T: Serialize,
{
let mut document: BoxedDocument = {
let mut read_txn =
ffi_sdk::ditto_read_transaction(&*self.collection.batch.store.ditto).ok()?;
ffi_sdk::ditto_collection_get(
&*self.collection.batch.store.ditto,
self.collection.collection_name.as_ref(),
self.document_id.as_ref().into(),
&mut read_txn,
)
.ok_or(ErrorKind::NonExtant)?
};
let new_content = ::serde_cbor::to_vec(new_value).unwrap();
if ffi_sdk::ditto_document_update(&mut document, new_content.as_slice().into()) != 0 {
return Err(DittoError::from_ffi(ErrorKind::InvalidInput));
}
let status = ffi_sdk::ditto_collection_update(
&*self.collection.batch.store.ditto,
self.collection.collection_name.as_ref(),
self.collection.batch.txn,
document,
);
if status != 0 {
Err(DittoError::from_ffi(ErrorKind::InvalidInput))
} else {
Ok(())
}
}
}