1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
//! [`Subscription`]s are used to register interest in receiving updates
//! for specified [`DittoDocument`]s.
use_prelude!();
use std::sync::Arc;
use ffi_sdk::COrderByParam;
use tracing::{debug, error};
use crate::{
ditto::{DittoHandleWrapper, WeakDittoHandleWrapper},
error::{DittoError, ErrorKind},
};
/// Use [`cursor_op.subscribe()`] or [`id_op.subscribe()`] to sync documents from remote peers.
///
/// Holding a `Subscription` will cause documents matching the query to continue to sync
/// from remote peers.
///
/// Dropping a `Subscription` will cancel the sync of this query's documents from remote peers.
///
/// The fields below record the exact parameters the subscription was registered with, so
/// that the same values can be passed again when the subscription is removed on drop.
///
/// [`cursor_op.subscribe()`]: crate::store::query_builder::PendingCursorOperation::subscribe
/// [`id_op.subscribe()`]: crate::store::query_builder::PendingIdSpecificOperation::subscribe
#[must_use = "Dropping a `Subscription` will cancel it"]
pub struct Subscription {
/// Weak handle to the owning Ditto instance; it is upgraded on drop, and removal is
/// skipped if the instance no longer exists.
pub(super) ditto: WeakDittoHandleWrapper,
/// Name of the collection the subscription applies to, stored as an owned C string.
pub(super) collection_name: char_p::Box,
/// The query string, stored as an owned C string.
pub(super) query: char_p::Box,
/// Optional query arguments as raw bytes, forwarded to the FFI as a byte slice.
/// NOTE(review): the byte encoding is not visible in this file — confirm with callers.
pub(super) query_args: Option<Vec<u8>>,
/// Ordering as `(expression, direction)` pairs. Each expression string is stored
/// *including* its trailing NUL byte so it can be re-borrowed as a C string on drop.
pub(super) order_by: Vec<(String, QuerySortDirection)>,
/// Result limit forwarded unchanged to the FFI.
/// NOTE(review): the meaning of negative values is not visible here — confirm.
pub(super) limit: i32,
/// Result offset forwarded unchanged to the FFI.
pub(super) offset: u32,
}
impl Subscription {
    /// Registers a subscription with the Ditto core and returns the handle that keeps it alive.
    ///
    /// `query` is converted to a C string and passed, together with the collection name, the
    /// optional query-argument bytes, the ordering, the limit and the offset, to
    /// `ffi_sdk::ditto_add_subscription`. A non-zero status code is reported through
    /// `error!`; the `Subscription` is returned either way.
    ///
    /// Only a weak reference to `ditto` is kept by the returned value.
    // Every parameter maps onto one argument of the FFI call, hence the lint exemption.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        ditto: DittoHandleWrapper,
        collection_name: char_p::Box,
        query: &str,
        query_args: Option<Vec<u8>>,
        order_by: &'_ [COrderByParam<'_>],
        limit: i32,
        offset: u32,
    ) -> Self {
        let query = char_p::new(query);

        let status = ffi_sdk::ditto_add_subscription(
            &ditto,
            collection_name.as_ref(),
            query.as_ref(),
            query_args.as_deref().map(|bytes| bytes.into()),
            order_by.into(),
            limit,
            offset,
        );
        if status != 0 {
            error!(
                collection = %collection_name,
                %query,
                error = %DittoError::from_ffi(ErrorKind::InvalidInput),
                "error adding subscription",
            );
        }

        // Keep an owned copy of the ordering so the identical parameters can be rebuilt
        // when the subscription is dropped. The trailing NUL is deliberately retained:
        // the `Drop` impl turns each string back into a borrowed C string via `try_into`,
        // and using `o.query_c_str.to_owned().into_string()` here instead was observed to
        // fail with an `InvalidNulTerminator` error.
        let owned_order_by: Vec<(String, QuerySortDirection)> = order_by
            .iter()
            .map(|param| {
                let expression = param.query_c_str.to_str_with_null().to_owned();
                (expression, param.direction)
            })
            .collect();

        let weak_ditto = Arc::downgrade(&ditto);
        Self {
            ditto: weak_ditto,
            collection_name,
            query,
            query_args,
            order_by: owned_order_by,
            limit,
            offset,
        }
    }
}
// Marker implementation: `Subscription` adds no methods of its own to `Observer`.
// The `allow` silences the deprecation lint triggered by this impl.
// NOTE(review): presumably `Observer` itself is the deprecated item and this impl is
// kept for backward compatibility — confirm against `crate::observer`.
#[allow(deprecated)]
impl crate::observer::Observer for Subscription {}
impl Drop for Subscription {
    /// Cancels the subscription by removing it from the Ditto core.
    ///
    /// The removal call receives the same collection name, query, query arguments,
    /// ordering, limit and offset that were recorded when the subscription was created.
    /// If the owning Ditto instance has already been dropped, nothing is done.
    ///
    /// This never panics: a stored ordering expression that cannot be borrowed as a
    /// C string is logged and the removal is skipped, because a panic inside `drop`
    /// while another panic is unwinding would abort the process.
    fn drop(&mut self) {
        debug!(
            collection = %self.collection_name,
            query = %self.query,
            "dropping subscription",
        );

        // Without a live Ditto instance there is nothing to unregister from.
        let ditto = match self.ditto.upgrade() {
            Some(ditto) => ditto,
            None => return,
        };

        // Re-borrow the stored ordering as FFI parameters. The strings are expected to
        // carry their trailing NUL (that is how `new` stores them), but the fields are
        // `pub(super)`, so the conversion is treated as fallible rather than asserted.
        let order_by = self
            .order_by
            .iter()
            .map(|(expression, direction)| match expression.as_str().try_into() {
                Ok(query_c_str) => Some(COrderByParam {
                    query_c_str,
                    direction: *direction,
                }),
                Err(_) => None,
            })
            .collect::<Option<Vec<COrderByParam<'_>>>>();
        let order_by = match order_by {
            Some(order_by) => order_by,
            None => {
                error!(
                    collection = %self.collection_name,
                    query = %self.query,
                    "error removing subscription: invalid order-by expression",
                );
                return;
            }
        };

        let code = ffi_sdk::ditto_remove_subscription(
            &ditto,
            self.collection_name.as_ref(),
            self.query.as_ref(),
            self.query_args.as_ref().map(|qa| (&qa[..]).into()),
            order_by[..].into(),
            self.limit,
            self.offset,
        );
        if code != 0 {
            error!(
                collection = %self.collection_name,
                query = %self.query,
                error = %DittoError::from_ffi(ErrorKind::InvalidInput),
                "error removing subscription",
            );
        }
    }
}