use_prelude!();
use core::ffi::c_void;
use std::sync::{
atomic::{self, AtomicU64},
Arc, Weak,
};
use ffi_sdk::BoxedAttachmentHandle;
use tracing::error;
use crate::{
ditto::{DittoHandleWrapper, WeakDittoHandleWrapper},
error::DittoError,
store::{
ditto_attachment::DittoAttachment, ditto_attachment_fetch_event::DittoAttachmentFetchEvent,
ditto_attachment_token::DittoAttachmentToken, DittoFields,
},
};
/// Type-level markers selecting which flavour of attachment fetcher a
/// `DittoAttachmentFetcher` is.
///
/// `V1` and `V2` are uninhabited enums: they are never instantiated and only
/// ever appear as type parameters. The set of versions is closed through the
/// `Sealed` trait, which lives in a private module so that no other module
/// can name it through that path.
// The module is named like a type (presumably so `FetcherVersion::V1` reads
// like an enum variant), hence the lint exemption.
#[allow(nonstandard_style)]
pub mod FetcherVersion {
    mod seal {
        /// Marker trait bounding the version parameter; `'static` so that
        /// `TypeId` can be taken of implementors.
        pub trait Sealed: 'static {}
    }
    pub(crate) use seal::Sealed;

    /// First-generation fetcher marker.
    pub enum V1 {}
    impl Sealed for V1 {}

    /// Second-generation fetcher marker.
    pub enum V2 {}
    impl Sealed for V2 {}
}
/// Handle onto an in-flight attachment fetch.
///
/// The `Version` parameter is a compile-time marker (defaulting to
/// `FetcherVersion::V1`); it carries no data and only selects behaviour,
/// e.g. whether dropping the handle cancels the fetch.
pub struct DittoAttachmentFetcher<'a, Version = FetcherVersion::V1>
where
    Version: FetcherVersion::Sealed,
{
    /// State shared with the FFI callbacks registered for this fetch.
    pub(crate) context: Arc<DittoAttachmentFetcherCtx<'a>>,
    /// Ties `Version` to the type without storing a value of it.
    _phantom: ::core::marker::PhantomData<fn() -> Version>,
}
/// Shorthand for the `V2` fetcher, whose callback lifetime is fixed to `'static`.
pub(crate) type DittoAttachmentFetcherV2 = DittoAttachmentFetcher<'static, FetcherVersion::V2>;
/// Cloning a `V2` fetcher produces a second handle backed by the same
/// context value returned from `retain()`.
impl Clone for DittoAttachmentFetcherV2 {
    fn clone(&self) -> Self {
        // NOTE(review): `retain()` on the `Arc` is not defined in this file;
        // presumably a prelude extension that bumps the strong count — confirm.
        let context = self.context.retain();
        Self {
            context,
            _phantom: ::core::marker::PhantomData,
        }
    }
}
impl<'a, Version: FetcherVersion::Sealed> DittoAttachmentFetcher<'a, Version> {
    /// Starts resolving the attachment identified by `token` and returns a
    /// handle onto the fetch.
    ///
    /// * `token` – identifies the attachment; a copy is kept in the shared
    ///   context.
    /// * `ditto` – optional strong handle that is downgraded and stored; when
    ///   `None`, an empty `Weak` is stored instead.
    /// * `raw_ditto` – handle passed to the FFI call; a weak reference to it
    ///   is kept in the context.
    /// * `on_fetch_event` – invoked from the FFI callbacks with each event and
    ///   a reference to the context's cancel-token cell.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `ditto_resolve_attachment(..).ok()` when
    /// the FFI call does not succeed.
    pub(crate) fn new(
        token: DittoAttachmentToken,
        ditto: Option<&Arc<DittoFields>>,
        raw_ditto: &DittoHandleWrapper,
        on_fetch_event: impl 'a + Send + Sync + Fn(DittoAttachmentFetchEvent, &AtomicU64),
    ) -> Result<Self, DittoError> {
        // The cancel token starts at 0 and is only filled in after the FFI
        // call returns, so any callback that runs before that point observes 0.
        let context = Arc::new(DittoAttachmentFetcherCtx {
            token: token.clone(),
            ditto: ditto.map_or_else(::std::sync::Weak::new, Arc::downgrade),
            raw_ditto: Arc::downgrade(raw_ditto),
            on_fetch_event: Box::new(on_fetch_event),
            cancel_token: 0.into(),
        });
        let raw_context = Arc::as_ptr(&context) as *mut c_void;
        // SAFETY: `raw_context` comes from `Arc::as_ptr` on `context`, which is
        // alive for the whole call, and the `retain`/`release` callbacks passed
        // alongside it adjust that same `Arc`'s strong count. The callbacks
        // cast the pointer back to `DittoAttachmentFetcherCtx`, matching the
        // type it was created from.
        let cancel_token = unsafe {
            ffi_sdk::ditto_resolve_attachment(
                raw_ditto,
                token.id.as_ref().into(),
                raw_context,
                Some(DittoAttachmentFetcherCtx::retain),
                Some(DittoAttachmentFetcherCtx::release),
                DittoAttachmentFetcherCtx::on_complete_cb,
                DittoAttachmentFetcherCtx::on_progress_cb,
                DittoAttachmentFetcherCtx::on_deleted_cb,
            )
            .ok()?
        };
        // Use the same ordering as every other access to this atomic
        // (`cancel_token()` loads and `cancel_token_ensure_unique()` stores
        // with `SeqCst`); the cell is also handed to callbacks by reference.
        context
            .cancel_token
            .store(cancel_token as _, atomic::Ordering::SeqCst);
        Ok(Self {
            context,
            _phantom: ::core::marker::PhantomData,
        })
    }
}
impl<Version: FetcherVersion::Sealed> DittoAttachmentFetcher<'_, Version> {
    /// Returns the cancel token currently stored in the shared context.
    pub(crate) fn cancel_token(&self) -> u64 {
        let cell = &self.context.cancel_token;
        cell.load(atomic::Ordering::SeqCst)
    }
}
impl DittoAttachmentFetcherV2 {
    /// Returns the fetcher's cancel token, replacing a zero token with a
    /// freshly generated one.
    ///
    /// The result is `(token, was_zero)`: `was_zero` is `true` when the stored
    /// token was `0` and a fallback value was generated and written back into
    /// the shared context.
    ///
    /// Fallback values are drawn from a process-wide counter that starts at
    /// `u64::MAX - 1` and counts downwards.
    // NOTE(review): the zero check and the subsequent store are two separate
    // atomic operations; concurrent callers on clones sharing one context
    // could each install a different fallback. Confirm callers serialise this.
    pub(crate) fn cancel_token_ensure_unique(&self) -> (u64, bool) {
        static FALLBACK_UNIQUE_CANCEL_TOKEN: AtomicU64 = AtomicU64::new(u64::MAX - 1);

        let current = self.cancel_token();
        if current != 0 {
            return (current, false);
        }

        // `fetch_sub` yields the value held *before* the decrement.
        let fallback = FALLBACK_UNIQUE_CANCEL_TOKEN.fetch_sub(1, atomic::Ordering::Relaxed);
        self.context
            .cancel_token
            .store(fallback, atomic::Ordering::SeqCst);
        (fallback, true)
    }
}
impl DittoAttachmentFetcherV2 {
pub fn cancel(&self) {
if let Some(ditto) = self.context.ditto.upgrade() {
ditto.store.unregister_fetcher(self.cancel_token(), None);
}
}
}
// Marker impl only: `Observer` has no items to provide here. The lint
// exemption suggests the trait is deprecated — confirm against its definition.
#[allow(deprecated)]
impl crate::observer::Observer for DittoAttachmentFetcher<'_, FetcherVersion::V1> {}
/// Dropping a `V1` fetcher cancels the underlying attachment resolution;
/// for any other version the drop is a no-op.
impl<'a, Version: FetcherVersion::Sealed> Drop for DittoAttachmentFetcher<'a, Version> {
    fn drop(&mut self) {
        use core::any::TypeId;

        // One `Drop` impl serves every version, so the version is told apart
        // at run time by comparing type ids.
        if TypeId::of::<Version>() != TypeId::of::<FetcherVersion::V1>() {
            return;
        }

        // Nothing to cancel against once the raw handle is gone.
        let handle = match self.context.raw_ditto.upgrade() {
            Some(handle) => handle,
            None => return,
        };

        let status = ffi_sdk::ditto_cancel_resolve_attachment(
            &handle,
            self.context.token.id.as_ref().into(),
            self.cancel_token(),
        );
        // A non-zero status is only logged; `drop` cannot report an error.
        if status != 0 {
            error!("failed to clean up attachment fetcher");
        }
    }
}
/// State shared between a `DittoAttachmentFetcher` handle and the FFI
/// callbacks registered for its fetch.
///
/// A raw pointer to this value (taken from its `Arc`) is what the callbacks
/// receive as their `ctx` argument.
pub(crate) struct DittoAttachmentFetcherCtx<'a> {
/// Token of the attachment being fetched; cloned into the attachment built
/// on completion.
pub(crate) token: DittoAttachmentToken,
/// Weak handle used by `DittoAttachmentFetcherV2::cancel` to reach the store;
/// empty when the fetcher was created without a `DittoFields`.
ditto: Weak<DittoFields>,
/// Weak raw handle, used to cancel on drop of a `V1` fetcher and passed on
/// to completed attachments.
raw_ditto: WeakDittoHandleWrapper,
/// User callback invoked for every fetch event, together with a reference
/// to `cancel_token`.
#[allow(clippy::type_complexity)]
on_fetch_event: Box<dyn Fn(DittoAttachmentFetchEvent, &AtomicU64) + Send + Sync + 'a>,
/// Cancel token for this fetch: `0` at construction, then set from the value
/// returned by `ditto_resolve_attachment`, and possibly replaced by
/// `cancel_token_ensure_unique`.
cancel_token: atomic::AtomicU64,
}
impl<'a> DittoAttachmentFetcherCtx<'a> {
pub(crate) unsafe extern "C" fn retain(ctx: *mut c_void) {
let ptr = ctx.cast::<DittoAttachmentFetcherCtx<'_>>();
Arc::increment_strong_count(ptr);
}
pub(crate) unsafe extern "C" fn release(ctx: *mut c_void) {
let ptr = ctx.cast::<DittoAttachmentFetcherCtx<'_>>();
Arc::decrement_strong_count(ptr);
}
pub(crate) unsafe extern "C" fn on_complete_cb(
ctx: *mut c_void,
attachment_handle: BoxedAttachmentHandle,
) {
let ctx_ref = ctx
.cast::<DittoAttachmentFetcherCtx<'_>>()
.as_ref()
.expect("got null");
let ditto_attachment = DittoAttachment::new_with_token(
ctx_ref.token.clone(),
ctx_ref.raw_ditto.clone(),
attachment_handle,
);
let event = DittoAttachmentFetchEvent::Completed {
attachment: ditto_attachment,
};
(ctx_ref.on_fetch_event)(event, &ctx_ref.cancel_token);
}
pub(crate) unsafe extern "C" fn on_progress_cb(
ctx: *mut c_void,
downloaded_bytes: u64,
total_bytes: u64,
) {
let ctx_ref = ctx
.cast::<DittoAttachmentFetcherCtx<'_>>()
.as_ref()
.expect("got null");
let event = DittoAttachmentFetchEvent::Progress {
downloaded_bytes,
total_bytes,
};
(ctx_ref.on_fetch_event)(event, &ctx_ref.cancel_token);
}
pub(crate) unsafe extern "C" fn on_deleted_cb(ctx: *mut c_void) {
let ctx_ref = ctx
.cast::<DittoAttachmentFetcherCtx<'_>>()
.as_ref()
.expect("got null");
let event = DittoAttachmentFetchEvent::Deleted;
(ctx_ref.on_fetch_event)(event, &ctx_ref.cancel_token);
}
}