dittolive_ditto/store/attachment/
fetcher.rs1use_prelude!();
2
3use core::ffi::c_void;
4use std::sync::{
5 atomic::{self, AtomicU64},
6 Arc, Weak,
7};
8
9use ffi_sdk::BoxedAttachmentHandle;
10
11use crate::{
12 error,
13 error::DittoError,
14 store::{
15 attachment::{DittoAttachment, DittoAttachmentFetchEvent, DittoAttachmentToken},
16 DittoFields,
17 },
18};
19
/// Type-level markers used as the `Version` parameter of
/// `DittoAttachmentFetcher`.
///
/// The markers are uninhabited enums: they can never be instantiated and only
/// exist to be named in type position.
// The module name is `UpperCamelCase` rather than `snake_case`, which the
// `nonstandard_style` lint group would otherwise flag.
#[allow(nonstandard_style)]
pub mod FetcherVersion {
    mod seal {
        /// Bound implemented only for the marker types of this module.
        ///
        /// The trait lives in a private module and is re-exported with crate
        /// visibility only, so code outside the crate cannot name it.
        pub trait Sealed: 'static {}
    }
    pub(crate) use seal::Sealed;

    /// Marker for the first fetcher version (the default of
    /// `DittoAttachmentFetcher`'s `Version` parameter).
    pub enum V1 {}
    impl Sealed for V1 {}

    /// Marker for the second fetcher version.
    pub enum V2 {}
    impl Sealed for V2 {}
}
48
49pub struct DittoAttachmentFetcher<'a, Version: FetcherVersion::Sealed = FetcherVersion::V1> {
59 pub(crate) context: Arc<DittoAttachmentFetcherCtx<'a>>,
60 _phantom: ::core::marker::PhantomData<fn() -> Version>,
61}
62
63impl Clone for DittoAttachmentFetcher<'static, FetcherVersion::V2> {
64 fn clone(&self) -> Self {
65 Self {
66 context: self.context.retain(),
67 ..*self
68 }
69 }
70}
71
72impl<'a, Version: FetcherVersion::Sealed> DittoAttachmentFetcher<'a, Version> {
73 pub(crate) fn new(
74 token: DittoAttachmentToken,
75 ditto: Option<&Arc<DittoFields>>,
76 raw_ditto: &Arc<BoxedDitto>,
77 on_fetch_event: impl 'a + Send + Sync + Fn(DittoAttachmentFetchEvent, &AtomicU64),
78 ) -> Result<Self, DittoError> {
79 let context = Arc::new(DittoAttachmentFetcherCtx {
80 token: token.clone(),
81 ditto: ditto.map_or_else(::std::sync::Weak::new, Arc::downgrade),
82 raw_ditto: Arc::downgrade(raw_ditto),
83 on_fetch_event: Box::new(on_fetch_event),
84 cancel_token: 0.into(),
85 });
86 let raw_context = Arc::as_ptr(&context) as *mut c_void;
87
88 let cancel_token = unsafe {
89 ffi_sdk::ditto_resolve_attachment(
90 raw_ditto,
91 token.id.as_ref().into(),
92 raw_context,
93 Some(DittoAttachmentFetcherCtx::retain),
94 Some(DittoAttachmentFetcherCtx::release),
95 DittoAttachmentFetcherCtx::on_complete_cb,
96 DittoAttachmentFetcherCtx::on_progress_cb,
97 DittoAttachmentFetcherCtx::on_deleted_cb,
98 )
99 .ok()?
100 };
101 context
110 .cancel_token
111 .store(cancel_token as _, atomic::Ordering::Relaxed);
112
113 Ok(Self {
114 context,
115 _phantom: ::core::marker::PhantomData,
116 })
117 }
118}
119
120impl<Version: FetcherVersion::Sealed> DittoAttachmentFetcher<'_, Version> {
121 pub(crate) fn cancel_token(&self) -> u64 {
122 self.context.cancel_token.load(atomic::Ordering::SeqCst)
123 }
124}
125
126impl DittoAttachmentFetcher<'static, FetcherVersion::V2> {
127 pub(crate) fn cancel_token_ensure_unique(&self) -> (u64, bool) {
129 let mut cancel_token = self.cancel_token();
130 let was_zero = cancel_token == 0;
131 if was_zero {
132 static FALLBACK_UNIQUE_CANCEL_TOKEN: AtomicU64 = AtomicU64::new(u64::MAX - 1);
142 cancel_token =
143 FALLBACK_UNIQUE_CANCEL_TOKEN.fetch_sub(1, ::std::sync::atomic::Ordering::Relaxed);
144 self.context
145 .cancel_token
146 .store(cancel_token, atomic::Ordering::SeqCst);
147 };
148 (cancel_token, was_zero)
149 }
150}
151
152impl DittoAttachmentFetcher<'static, FetcherVersion::V2> {
153 pub fn cancel(&self) {
162 if let Some(ditto) = self.context.ditto.upgrade() {
163 ditto.store.unregister_fetcher(self.cancel_token(), None);
164 }
165 }
166}
167
168impl<V1: FetcherVersion::Sealed> Drop for DittoAttachmentFetcher<'_, V1> {
170 fn drop(&mut self) {
171 use core::any::TypeId;
173
174 if TypeId::of::<V1>() == TypeId::of::<FetcherVersion::V1>() {
175 if let Some(ditto) = self.context.raw_ditto.upgrade() {
176 let status = ffi_sdk::ditto_cancel_resolve_attachment(
177 &ditto,
178 self.context.token.id.as_ref().into(),
179 self.cancel_token(),
180 );
181 if status != 0 {
182 #[allow(deprecated)] {
184 error!("failed to clean up attachment fetcher");
185 }
186 }
187 }
188 }
189 }
190}
191
192pub(crate) struct DittoAttachmentFetcherCtx<'a> {
193 pub(crate) token: DittoAttachmentToken,
194 ditto: Weak<DittoFields>,
195 raw_ditto: Weak<BoxedDitto>,
196 #[allow(clippy::type_complexity)]
197 on_fetch_event: Box<dyn Fn(DittoAttachmentFetchEvent, &AtomicU64) + Send + Sync + 'a>,
198 cancel_token: atomic::AtomicU64,
199}
200
201impl DittoAttachmentFetcherCtx<'_> {
202 pub(crate) unsafe extern "C" fn retain(ctx: *mut c_void) {
203 let ptr = ctx.cast::<DittoAttachmentFetcherCtx<'_>>();
204 Arc::increment_strong_count(ptr);
205 }
206
207 pub(crate) unsafe extern "C" fn release(ctx: *mut c_void) {
208 let ptr = ctx.cast::<DittoAttachmentFetcherCtx<'_>>();
209 Arc::decrement_strong_count(ptr);
210 }
211
212 pub(crate) unsafe extern "C" fn on_complete_cb(
213 ctx: *mut c_void,
214 attachment_handle: BoxedAttachmentHandle,
215 ) {
216 let ctx_ref = ctx
217 .cast::<DittoAttachmentFetcherCtx<'_>>()
218 .as_ref()
219 .expect("got null");
220
221 let ditto_attachment = DittoAttachment::new_with_token(
222 ctx_ref.token.clone(),
223 ctx_ref.raw_ditto.clone(),
224 attachment_handle,
225 );
226 let event = DittoAttachmentFetchEvent::Completed {
227 attachment: ditto_attachment,
228 };
229 (ctx_ref.on_fetch_event)(event, &ctx_ref.cancel_token);
230 }
231
232 pub(crate) unsafe extern "C" fn on_progress_cb(
233 ctx: *mut c_void,
234 downloaded_bytes: u64,
235 total_bytes: u64,
236 ) {
237 let ctx_ref = ctx
238 .cast::<DittoAttachmentFetcherCtx<'_>>()
239 .as_ref()
240 .expect("got null");
241
242 let event = DittoAttachmentFetchEvent::Progress {
243 downloaded_bytes,
244 total_bytes,
245 };
246 (ctx_ref.on_fetch_event)(event, &ctx_ref.cancel_token);
247 }
248
249 pub(crate) unsafe extern "C" fn on_deleted_cb(ctx: *mut c_void) {
250 let ctx_ref = ctx
251 .cast::<DittoAttachmentFetcherCtx<'_>>()
252 .as_ref()
253 .expect("got null");
254
255 let event = DittoAttachmentFetchEvent::Deleted;
256 (ctx_ref.on_fetch_event)(event, &ctx_ref.cancel_token);
257 }
258}