dittolive_ditto/store/attachment/
fetcher.rs

1use_prelude!();
2
3use core::ffi::c_void;
4use std::sync::{
5    atomic::{self, AtomicU64},
6    Arc, Weak,
7};
8
9use ffi_sdk::BoxedAttachmentHandle;
10
11use crate::{
12    error,
13    error::DittoError,
14    store::{
15        attachment::{DittoAttachment, DittoAttachmentFetchEvent, DittoAttachmentToken},
16        DittoFields,
17    },
18};
19
/// Type-level `enum` telling apart a [`DittoAttachmentFetcher`] that can and must be
/// cancelled explicitly from the legacy flavor, which was auto-cancelled on drop.
///
/// ```rust ,ignore
/// // Conceptually the following, but lifted to the type level (a "const generic", so to speak).
/// enum FetcherVersion /* : Sealed */ {
///     /// The deprecated `.collection.fetch_attachment(...)` API.
///     V1,
///
///     /// The proper `.store.fetch_attachment(...)` API.
///     V2,
/// }
/// ```
// The module is deliberately named like a type, since it stands in for an `enum`.
#[allow(nonstandard_style)]
pub mod FetcherVersion {
    mod private {
        /// Marker bound for the version types below. It lives in a private module and is
        /// only re-exported crate-wide, so it can only be named from within this crate.
        pub trait Sealed: 'static {}
    }
    pub(crate) use private::Sealed;

    /// The deprecated V1 API (removed).
    pub enum V1 {}
    impl Sealed for V1 {}

    /// The proper [`store.fetch_attachment(...)`][crate::store::Store::fetch_attachment()] API.
    pub enum V2 {}
    impl Sealed for V2 {}
}
48
/// The handle returned by [`store.fetch_attachment()`](crate::store::Store::fetch_attachment).
///
///   - In the deprecated [`FetcherVersion::V1`] case, the handle must be kept alive for the
///     fetching of the attachment to proceed and for you to be notified once the status of the
///     fetch request has changed: dropping it cancels the fetch.
///
///   - In the proper [`FetcherVersion::V2`] case, the fetch is only
///     [`.cancel()`][DittoAttachmentFetcher::cancel]-ed *explicitly*, that is, the handle is
///     safe to discard and does not need to be kept around.
pub struct DittoAttachmentFetcher<'a, Version: FetcherVersion::Sealed = FetcherVersion::V1> {
    /// State shared with the FFI callbacks; the same allocation is handed to the FFI as an
    /// opaque pointer when the fetcher is created.
    pub(crate) context: Arc<DittoAttachmentFetcherCtx<'a>>,
    /// Zero-sized marker that records `Version` without storing a value of that type.
    _phantom: ::core::marker::PhantomData<fn() -> Version>,
}
62
63impl Clone for DittoAttachmentFetcher<'static, FetcherVersion::V2> {
64    fn clone(&self) -> Self {
65        Self {
66            context: self.context.retain(),
67            ..*self
68        }
69    }
70}
71
72impl<'a, Version: FetcherVersion::Sealed> DittoAttachmentFetcher<'a, Version> {
73    pub(crate) fn new(
74        token: DittoAttachmentToken,
75        ditto: Option<&Arc<DittoFields>>,
76        raw_ditto: &Arc<BoxedDitto>,
77        on_fetch_event: impl 'a + Send + Sync + Fn(DittoAttachmentFetchEvent, &AtomicU64),
78    ) -> Result<Self, DittoError> {
79        let context = Arc::new(DittoAttachmentFetcherCtx {
80            token: token.clone(),
81            ditto: ditto.map_or_else(::std::sync::Weak::new, Arc::downgrade),
82            raw_ditto: Arc::downgrade(raw_ditto),
83            on_fetch_event: Box::new(on_fetch_event),
84            cancel_token: 0.into(),
85        });
86        let raw_context = Arc::as_ptr(&context) as *mut c_void;
87
88        let cancel_token = unsafe {
89            ffi_sdk::ditto_resolve_attachment(
90                raw_ditto,
91                token.id.as_ref().into(),
92                raw_context,
93                Some(DittoAttachmentFetcherCtx::retain),
94                Some(DittoAttachmentFetcherCtx::release),
95                DittoAttachmentFetcherCtx::on_complete_cb,
96                DittoAttachmentFetcherCtx::on_progress_cb,
97                DittoAttachmentFetcherCtx::on_deleted_cb,
98            )
99            .ok()?
100        };
101        // HACK: until `dittoffi` exposes the `cancel_token` / `resolve_id`
102        // to the callback itself, we manually back-smuggle it using the shared
103        // context.
104        //
105        // This value is later accessed in the callback registered by
106        // `Store::fetch_attachment()`, in the `on_complete` case, after having
107        // acquired a lock which happens to be held during this whole `new()` call,
108        // hence why `Relaxed` suffices.
109        context
110            .cancel_token
111            .store(cancel_token as _, atomic::Ordering::Relaxed);
112
113        Ok(Self {
114            context,
115            _phantom: ::core::marker::PhantomData,
116        })
117    }
118}
119
120impl<Version: FetcherVersion::Sealed> DittoAttachmentFetcher<'_, Version> {
121    pub(crate) fn cancel_token(&self) -> u64 {
122        self.context.cancel_token.load(atomic::Ordering::SeqCst)
123    }
124}
125
126impl DittoAttachmentFetcher<'static, FetcherVersion::V2> {
127    /// Yields `true` if the original `cancel_token` was `0`.
128    pub(crate) fn cancel_token_ensure_unique(&self) -> (u64, bool) {
129        let mut cancel_token = self.cancel_token();
130        let was_zero = cancel_token == 0;
131        if was_zero {
132            // Currently, FFI `.resolve_attachment()` will yield `0` for fetchers
133            // it believes to have invoked synchronously (fast path); even
134            // though our callback is —outside of Wasm— dispatching these
135            // invocations onto the `attachments_signal_sender` queue.
136            //
137            // So, to keep the property of unicity of these tokens across
138            // the lifetime of a ditto instance (TODO: process-wide?), we
139            // currently hack a fallback unicity mechanism. The `core`
140            // auto-increments off `1..`, so let us do `(..-2).rev()`.
141            static FALLBACK_UNIQUE_CANCEL_TOKEN: AtomicU64 = AtomicU64::new(u64::MAX - 1);
142            cancel_token =
143                FALLBACK_UNIQUE_CANCEL_TOKEN.fetch_sub(1, ::std::sync::atomic::Ordering::Relaxed);
144            self.context
145                .cancel_token
146                .store(cancel_token, atomic::Ordering::SeqCst);
147        };
148        (cancel_token, was_zero)
149    }
150}
151
152impl DittoAttachmentFetcher<'static, FetcherVersion::V2> {
153    /// Stops fetching the fetcher's associated attachment and cleans up any
154    /// associated resources.
155    ///
156    /// Note that you are not required to call it once your attachment
157    /// fetch operation has finished. The method primarily exists to allow you
158    /// to cancel an attachment fetch request while it is ongoing if you no
159    /// longer wish for the attachment to be made available locally to the
160    /// device nor for its evolution to be observed.
161    pub fn cancel(&self) {
162        if let Some(ditto) = self.context.ditto.upgrade() {
163            ditto.store.unregister_fetcher(self.cancel_token(), None);
164        }
165    }
166}
167
168// Does nothing for V2.
169impl<V1: FetcherVersion::Sealed> Drop for DittoAttachmentFetcher<'_, V1> {
170    fn drop(&mut self) {
171        /// Poorman's specialization.
172        use core::any::TypeId;
173
174        if TypeId::of::<V1>() == TypeId::of::<FetcherVersion::V1>() {
175            if let Some(ditto) = self.context.raw_ditto.upgrade() {
176                let status = ffi_sdk::ditto_cancel_resolve_attachment(
177                    &ditto,
178                    self.context.token.id.as_ref().into(),
179                    self.cancel_token(),
180                );
181                if status != 0 {
182                    #[allow(deprecated)] // Workaround for patched tracing
183                    {
184                        error!("failed to clean up attachment fetcher");
185                    }
186                }
187            }
188        }
189    }
190}
191
/// State shared between a [`DittoAttachmentFetcher`] and the FFI callbacks registered for it.
pub(crate) struct DittoAttachmentFetcherCtx<'a> {
    /// Token of the attachment being fetched; cloned into the attachment handed out on
    /// completion.
    pub(crate) token: DittoAttachmentToken,
    /// Weak handle used by `cancel()` to reach the store; empty when no handle was supplied
    /// at construction.
    ditto: Weak<DittoFields>,
    /// Weak handle to the FFI ditto instance the fetch was registered with.
    raw_ditto: Weak<BoxedDitto>,
    /// User callback, invoked with each fetch event and a reference to `cancel_token`.
    #[allow(clippy::type_complexity)]
    on_fetch_event: Box<dyn Fn(DittoAttachmentFetchEvent, &AtomicU64) + Send + Sync + 'a>,
    /// Token identifying this fetch for cancellation; starts at `0` and is filled in once the
    /// FFI registration call has returned.
    cancel_token: atomic::AtomicU64,
}
200
201impl DittoAttachmentFetcherCtx<'_> {
202    pub(crate) unsafe extern "C" fn retain(ctx: *mut c_void) {
203        let ptr = ctx.cast::<DittoAttachmentFetcherCtx<'_>>();
204        Arc::increment_strong_count(ptr);
205    }
206
207    pub(crate) unsafe extern "C" fn release(ctx: *mut c_void) {
208        let ptr = ctx.cast::<DittoAttachmentFetcherCtx<'_>>();
209        Arc::decrement_strong_count(ptr);
210    }
211
212    pub(crate) unsafe extern "C" fn on_complete_cb(
213        ctx: *mut c_void,
214        attachment_handle: BoxedAttachmentHandle,
215    ) {
216        let ctx_ref = ctx
217            .cast::<DittoAttachmentFetcherCtx<'_>>()
218            .as_ref()
219            .expect("got null");
220
221        let ditto_attachment = DittoAttachment::new_with_token(
222            ctx_ref.token.clone(),
223            ctx_ref.raw_ditto.clone(),
224            attachment_handle,
225        );
226        let event = DittoAttachmentFetchEvent::Completed {
227            attachment: ditto_attachment,
228        };
229        (ctx_ref.on_fetch_event)(event, &ctx_ref.cancel_token);
230    }
231
232    pub(crate) unsafe extern "C" fn on_progress_cb(
233        ctx: *mut c_void,
234        downloaded_bytes: u64,
235        total_bytes: u64,
236    ) {
237        let ctx_ref = ctx
238            .cast::<DittoAttachmentFetcherCtx<'_>>()
239            .as_ref()
240            .expect("got null");
241
242        let event = DittoAttachmentFetchEvent::Progress {
243            downloaded_bytes,
244            total_bytes,
245        };
246        (ctx_ref.on_fetch_event)(event, &ctx_ref.cancel_token);
247    }
248
249    pub(crate) unsafe extern "C" fn on_deleted_cb(ctx: *mut c_void) {
250        let ctx_ref = ctx
251            .cast::<DittoAttachmentFetcherCtx<'_>>()
252            .as_ref()
253            .expect("got null");
254
255        let event = DittoAttachmentFetchEvent::Deleted;
256        (ctx_ref.on_fetch_event)(event, &ctx_ref.cancel_token);
257    }
258}