// dittolive_ditto/sync/sync_subscription.rs

1use std::{
2    cmp::Ordering,
3    hash::{self, Hash},
4};
5
6use ffi_sdk::{
7    ffi_utils::{c_slice, char_p, repr_c},
8    FfiSyncSubscription,
9};
10
11use crate::{
12    ditto::Ditto,
13    error::DittoError,
14    utils::{extension_traits::FfiResultIntoRustResult, zstr::zstr},
15};
16
17/// Use [`ditto.sync().register_subscription(...)`] to create a `SyncSubscription`
18///
19/// The subscription will remain active until either:
20///
21/// - the `SyncSubscription` is explicitly canceled via [`.cancel()`], or
22/// - the owning [`Ditto`] object goes out of scope
23///
24/// [See the `sync` module documentation for more details][0].
25///
26/// [`ditto.sync().register_subscription(...)`]: crate::sync::Sync::register_subscription
27/// [`.cancel()`]: Self::cancel
28/// [0]: crate::sync
29pub struct SyncSubscription {
30    pub(crate) handle: repr_c::Box<FfiSyncSubscription>,
31}
32
33impl SyncSubscription {
34    pub(crate) fn new(
35        ditto: &Ditto,
36        query: &zstr,
37        query_args: Option<&[u8]>,
38    ) -> Result<Self, DittoError> {
39        let handle = ffi_sdk::dittoffi_sync_register_subscription_throws(
40            &ditto.ditto,
41            query.into(),
42            query_args.map(|a| a.into()),
43        )
44        .into_rust_result()?;
45
46        Ok(Self { handle })
47    }
48
49    /// Returns the DQL query string that this [`SyncSubscription`] is subscribed to.
50    ///
51    /// # Example
52    ///
53    /// ```
54    /// use dittolive_ditto::prelude::*;
55    ///
56    /// # fn main() -> anyhow::Result<()> {
57    /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
58    /// let sync_subscription = ditto.sync().register_subscription("SELECT * FROM cars")?;
59    ///
60    /// assert_eq!(sync_subscription.query_string(), "SELECT * FROM cars");
61    /// # Ok(())
62    /// # }
63    /// ```
64    pub fn query_string(&self) -> String {
65        let cbox: char_p::Box = ffi_sdk::dittoffi_sync_subscription_query_string(&self.handle);
66        cbox.into_string()
67    }
68
69    /// Returns the DQL query arguments that this [`SyncSubscription`] is subscribed to.
70    ///
71    /// # Example
72    ///
73    /// ```
74    /// use dittolive_ditto::prelude::*;
75    ///
76    /// # fn main() -> anyhow::Result<()> {
77    /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
78    /// let sync_subscription = ditto
79    ///     .sync()
80    ///     .register_subscription((
81    ///         "SELECT * FROM cars WHERE color = :color",
82    ///         serde_json::json!({
83    ///             "color": "red",
84    ///         })
85    ///     ))?;
86    ///
87    /// let maybe_args = sync_subscription.query_arguments();
88    /// let args = maybe_args.expect("expected query arguments");
89    /// let args_json = serde_json::to_value(&args)?;
90    ///
91    /// assert_eq!(args_json, serde_json::json!({
92    ///    "color": "red",
93    /// }));
94    ///
95    /// # Ok(())
96    /// # }
97    /// ```
98    pub fn query_arguments(&self) -> Option<serde_cbor::Value> {
99        let buffer: c_slice::Box<u8> =
100            ffi_sdk::dittoffi_sync_subscription_query_arguments_cbor(&self.handle)?;
101
102        let cbor = serde_cbor::from_slice(buffer.as_slice())
103            .unwrap_or_else(|error| panic!("bug: failed to deserialize CBOR from FFI: {error}"));
104        Some(cbor)
105    }
106
107    /// Returns the DQL query arguments as raw CBOR-encoded bytes.
108    ///
109    /// # Example
110    ///
111    /// ```
112    /// use dittolive_ditto::prelude::*;
113    ///
114    /// # fn main() -> anyhow::Result<()> {
115    /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
116    /// let sync_subscription = ditto
117    ///     .sync()
118    ///     .register_subscription((
119    ///         "SELECT * FROM cars WHERE color = :color",
120    ///         serde_json::json!({
121    ///             "color": "red",
122    ///         })
123    ///     ))?;
124    ///
125    /// let maybe_args = sync_subscription.query_arguments_cbor_data();
126    /// let args = maybe_args.expect("expected query arguments");
127    /// let args_json: serde_json::Value = serde_cbor::from_slice(&args)?;
128    ///
129    /// assert_eq!(args_json, serde_json::json!({
130    ///    "color": "red",
131    /// }));
132    ///
133    /// # Ok(())
134    /// # }
135    /// ```
136    pub fn query_arguments_cbor_data(&self) -> Option<Vec<u8>> {
137        let buffer: c_slice::Box<u8> =
138            ffi_sdk::dittoffi_sync_subscription_query_arguments_cbor(&self.handle)?;
139        Some(buffer.as_slice().to_vec())
140    }
141
142    /// Returns the DQL query arguments as a JSON string.
143    ///
144    /// # Example
145    ///
146    /// ```
147    /// use dittolive_ditto::prelude::*;
148    ///
149    /// # fn main() -> anyhow::Result<()> {
150    /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
151    /// let args = serde_json::json!({
152    ///     "color": "red",
153    /// });
154    /// let query = (
155    ///     "SELECT * FROM cars WHERE color = :color",
156    ///     args.clone(),
157    /// );
158    /// let sync_subscription = ditto.sync().register_subscription(query)?;
159    ///
160    /// let maybe_args = sync_subscription.query_arguments_json_str();
161    /// let args_str = maybe_args.expect("expected query arguments");
162    ///
163    /// assert_eq!(args_str, serde_json::to_string(&args).unwrap());
164    ///
165    /// # Ok(())
166    /// # }
167    /// ```
168    pub fn query_arguments_json_str(&self) -> Option<String> {
169        let buffer: c_slice::Box<u8> =
170            ffi_sdk::dittoffi_sync_subscription_query_arguments_json(&self.handle)?;
171
172        let json = String::from_utf8(buffer.as_slice().to_vec())
173            .unwrap_or_else(|error| panic!("bug: failed to deserialize JSON from FFI: {error}"));
174        Some(json)
175    }
176
177    /// Cancels this [`SyncSubscription`], so that changes matching the query are no longer
178    /// synced from other peers to this one.
179    ///
180    /// # Example
181    ///
182    /// ```
183    /// use dittolive_ditto::Ditto;
184    ///
185    /// # fn main() -> anyhow::Result<()> {
186    /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
187    /// let subscription = ditto.sync().register_subscription("SELECT * FROM cars")?;
188    /// assert!(!subscription.is_cancelled());
189    ///
190    /// subscription.cancel();
191    /// assert!(subscription.is_cancelled());
192    /// # Ok(())
193    /// # }
194    /// ```
195    pub fn cancel(&self) {
196        ffi_sdk::dittoffi_sync_subscription_cancel(&self.handle);
197    }
198
199    /// Returns `true` if this [`SyncSubscription`] has been cancelled, `false` otherwise.
200    ///
201    /// # Example
202    ///
203    /// ```
204    /// use dittolive_ditto::Ditto;
205    ///
206    /// # fn main() -> anyhow::Result<()> {
207    /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
208    /// let subscription = ditto.sync().register_subscription("SELECT * FROM cars")?;
209    /// assert!(!subscription.is_cancelled());
210    ///
211    /// subscription.cancel();
212    /// assert!(subscription.is_cancelled());
213    /// # Ok(())
214    /// # }
215    /// ```
216    pub fn is_cancelled(&self) -> bool {
217        ffi_sdk::dittoffi_sync_subscription_is_cancelled(&self.handle)
218    }
219
220    /// Intentionally left non-public as the ID representation is an implementation detail
221    ///
222    /// The only reason this is here is to power the Ord/Hash/Debug impls
223    fn id(&self) -> impl '_ + Ord + Hash + core::fmt::Debug {
224        ffi_sdk::dittoffi_sync_subscription_id(&self.handle)
225    }
226}
227
228impl std::fmt::Debug for SyncSubscription {
229    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230        f.debug_struct("SyncSubscription")
231            .field("id", &self.id())
232            .finish_non_exhaustive()
233    }
234}
235
236impl std::fmt::Display for SyncSubscription {
237    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238        std::fmt::Debug::fmt(self, f)
239    }
240}
241
242impl Ord for SyncSubscription {
243    fn cmp(&self, other: &Self) -> Ordering {
244        Ord::cmp(&self.id(), &other.id())
245    }
246}
247
248impl PartialOrd for SyncSubscription {
249    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
250        Some(Ord::cmp(self, other))
251    }
252}
253
254impl Eq for SyncSubscription {}
255impl PartialEq for SyncSubscription {
256    fn eq(&self, other: &Self) -> bool {
257        self.id() == other.id()
258    }
259}
260
261impl Hash for SyncSubscription {
262    fn hash<H: hash::Hasher>(&self, h: &mut H) {
263        self.id().hash(h)
264    }
265}