dittolive_ditto/sync/sync_subscription.rs
1use std::{
2 cmp::Ordering,
3 hash::{self, Hash},
4};
5
6use ffi_sdk::{
7 ffi_utils::{c_slice, char_p, repr_c},
8 FfiSyncSubscription,
9};
10
11use crate::{
12 ditto::Ditto,
13 error::DittoError,
14 utils::{extension_traits::FfiResultIntoRustResult, zstr::zstr},
15};
16
17/// Use [`ditto.sync().register_subscription(...)`] to create a `SyncSubscription`
18///
19/// The subscription will remain active until either:
20///
21/// - the `SyncSubscription` is explicitly canceled via [`.cancel()`], or
22/// - the owning [`Ditto`] object goes out of scope
23///
24/// [See the `sync` module documentation for more details][0].
25///
26/// [`ditto.sync().register_subscription(...)`]: crate::sync::Sync::register_subscription
27/// [`.cancel()`]: Self::cancel
28/// [0]: crate::sync
29pub struct SyncSubscription {
30 pub(crate) handle: repr_c::Box<FfiSyncSubscription>,
31}
32
33impl SyncSubscription {
34 pub(crate) fn new(
35 ditto: &Ditto,
36 query: &zstr,
37 query_args: Option<&[u8]>,
38 ) -> Result<Self, DittoError> {
39 let handle = ffi_sdk::dittoffi_sync_register_subscription_throws(
40 &ditto.ditto,
41 query.into(),
42 query_args.map(|a| a.into()),
43 )
44 .into_rust_result()?;
45
46 Ok(Self { handle })
47 }
48
49 /// Returns the DQL query string that this [`SyncSubscription`] is subscribed to.
50 ///
51 /// # Example
52 ///
53 /// ```
54 /// use dittolive_ditto::prelude::*;
55 ///
56 /// # fn main() -> anyhow::Result<()> {
57 /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
58 /// let sync_subscription = ditto.sync().register_subscription("SELECT * FROM cars")?;
59 ///
60 /// assert_eq!(sync_subscription.query_string(), "SELECT * FROM cars");
61 /// # Ok(())
62 /// # }
63 /// ```
64 pub fn query_string(&self) -> String {
65 let cbox: char_p::Box = ffi_sdk::dittoffi_sync_subscription_query_string(&self.handle);
66 cbox.into_string()
67 }
68
69 /// Returns the DQL query arguments that this [`SyncSubscription`] is subscribed to.
70 ///
71 /// # Example
72 ///
73 /// ```
74 /// use dittolive_ditto::prelude::*;
75 ///
76 /// # fn main() -> anyhow::Result<()> {
77 /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
78 /// let sync_subscription = ditto
79 /// .sync()
80 /// .register_subscription((
81 /// "SELECT * FROM cars WHERE color = :color",
82 /// serde_json::json!({
83 /// "color": "red",
84 /// })
85 /// ))?;
86 ///
87 /// let maybe_args = sync_subscription.query_arguments();
88 /// let args = maybe_args.expect("expected query arguments");
89 /// let args_json = serde_json::to_value(&args)?;
90 ///
91 /// assert_eq!(args_json, serde_json::json!({
92 /// "color": "red",
93 /// }));
94 ///
95 /// # Ok(())
96 /// # }
97 /// ```
98 pub fn query_arguments(&self) -> Option<serde_cbor::Value> {
99 let buffer: c_slice::Box<u8> =
100 ffi_sdk::dittoffi_sync_subscription_query_arguments_cbor(&self.handle)?;
101
102 let cbor = serde_cbor::from_slice(buffer.as_slice())
103 .unwrap_or_else(|error| panic!("bug: failed to deserialize CBOR from FFI: {error}"));
104 Some(cbor)
105 }
106
107 /// Returns the DQL query arguments as raw CBOR-encoded bytes.
108 ///
109 /// # Example
110 ///
111 /// ```
112 /// use dittolive_ditto::prelude::*;
113 ///
114 /// # fn main() -> anyhow::Result<()> {
115 /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
116 /// let sync_subscription = ditto
117 /// .sync()
118 /// .register_subscription((
119 /// "SELECT * FROM cars WHERE color = :color",
120 /// serde_json::json!({
121 /// "color": "red",
122 /// })
123 /// ))?;
124 ///
125 /// let maybe_args = sync_subscription.query_arguments_cbor_data();
126 /// let args = maybe_args.expect("expected query arguments");
127 /// let args_json: serde_json::Value = serde_cbor::from_slice(&args)?;
128 ///
129 /// assert_eq!(args_json, serde_json::json!({
130 /// "color": "red",
131 /// }));
132 ///
133 /// # Ok(())
134 /// # }
135 /// ```
136 pub fn query_arguments_cbor_data(&self) -> Option<Vec<u8>> {
137 let buffer: c_slice::Box<u8> =
138 ffi_sdk::dittoffi_sync_subscription_query_arguments_cbor(&self.handle)?;
139 Some(buffer.as_slice().to_vec())
140 }
141
142 /// Returns the DQL query arguments as a JSON string.
143 ///
144 /// # Example
145 ///
146 /// ```
147 /// use dittolive_ditto::prelude::*;
148 ///
149 /// # fn main() -> anyhow::Result<()> {
150 /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
151 /// let args = serde_json::json!({
152 /// "color": "red",
153 /// });
154 /// let query = (
155 /// "SELECT * FROM cars WHERE color = :color",
156 /// args.clone(),
157 /// );
158 /// let sync_subscription = ditto.sync().register_subscription(query)?;
159 ///
160 /// let maybe_args = sync_subscription.query_arguments_json_str();
161 /// let args_str = maybe_args.expect("expected query arguments");
162 ///
163 /// assert_eq!(args_str, serde_json::to_string(&args).unwrap());
164 ///
165 /// # Ok(())
166 /// # }
167 /// ```
168 pub fn query_arguments_json_str(&self) -> Option<String> {
169 let buffer: c_slice::Box<u8> =
170 ffi_sdk::dittoffi_sync_subscription_query_arguments_json(&self.handle)?;
171
172 let json = String::from_utf8(buffer.as_slice().to_vec())
173 .unwrap_or_else(|error| panic!("bug: failed to deserialize JSON from FFI: {error}"));
174 Some(json)
175 }
176
177 /// Cancels this [`SyncSubscription`], so that changes matching the query are no longer
178 /// synced from other peers to this one.
179 ///
180 /// # Example
181 ///
182 /// ```
183 /// use dittolive_ditto::Ditto;
184 ///
185 /// # fn main() -> anyhow::Result<()> {
186 /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
187 /// let subscription = ditto.sync().register_subscription("SELECT * FROM cars")?;
188 /// assert!(!subscription.is_cancelled());
189 ///
190 /// subscription.cancel();
191 /// assert!(subscription.is_cancelled());
192 /// # Ok(())
193 /// # }
194 /// ```
195 pub fn cancel(&self) {
196 ffi_sdk::dittoffi_sync_subscription_cancel(&self.handle);
197 }
198
199 /// Returns `true` if this [`SyncSubscription`] has been cancelled, `false` otherwise.
200 ///
201 /// # Example
202 ///
203 /// ```
204 /// use dittolive_ditto::Ditto;
205 ///
206 /// # fn main() -> anyhow::Result<()> {
207 /// # let (_root, ditto) = dittolive_ditto::doctest_helpers::doctest_ditto();
208 /// let subscription = ditto.sync().register_subscription("SELECT * FROM cars")?;
209 /// assert!(!subscription.is_cancelled());
210 ///
211 /// subscription.cancel();
212 /// assert!(subscription.is_cancelled());
213 /// # Ok(())
214 /// # }
215 /// ```
216 pub fn is_cancelled(&self) -> bool {
217 ffi_sdk::dittoffi_sync_subscription_is_cancelled(&self.handle)
218 }
219
220 /// Intentionally left non-public as the ID representation is an implementation detail
221 ///
222 /// The only reason this is here is to power the Ord/Hash/Debug impls
223 fn id(&self) -> impl '_ + Ord + Hash + core::fmt::Debug {
224 ffi_sdk::dittoffi_sync_subscription_id(&self.handle)
225 }
226}
227
228impl std::fmt::Debug for SyncSubscription {
229 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230 f.debug_struct("SyncSubscription")
231 .field("id", &self.id())
232 .finish_non_exhaustive()
233 }
234}
235
236impl std::fmt::Display for SyncSubscription {
237 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238 std::fmt::Debug::fmt(self, f)
239 }
240}
241
242impl Ord for SyncSubscription {
243 fn cmp(&self, other: &Self) -> Ordering {
244 Ord::cmp(&self.id(), &other.id())
245 }
246}
247
248impl PartialOrd for SyncSubscription {
249 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
250 Some(Ord::cmp(self, other))
251 }
252}
253
254impl Eq for SyncSubscription {}
255impl PartialEq for SyncSubscription {
256 fn eq(&self, other: &Self) -> bool {
257 self.id() == other.id()
258 }
259}
260
261impl Hash for SyncSubscription {
262 fn hash<H: hash::Hasher>(&self, h: &mut H) {
263 self.id().hash(h)
264 }
265}