1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
use std::{
    cmp::Ordering,
    hash::{self, Hash},
    sync::{Arc, Weak},
};

use crate::{
    ditto::{Ditto, DittoFields},
    error::DittoError,
    store::{Query, QueryArguments},
    utils::extension_traits::FfiResultIntoRustResult,
};

/// A replication subscription that receives updates from remote peers about
/// documents matching the replication subscription's query.
///
/// The replication subscription will remain active until
/// [`SyncSubscription::cancel`] is called or the replication subscription object
/// goes out of scope (dropping the object calls `cancel`).
///
/// Equality, ordering and hashing of subscriptions consider only the query and
/// its arguments, never the owning Ditto instance.
pub struct SyncSubscription {
    /// Weak handle to the owning Ditto instance's fields. It is upgraded on
    /// demand, so holding a subscription does not keep that instance alive.
    ditto: Weak<DittoFields>,
    /// The query this subscription was registered with.
    pub(crate) query: Query,
    /// Optional arguments that were supplied alongside the query.
    pub(crate) query_args: Option<QueryArguments>,
}

/// Displays a subscription by delegating to the `fmt` of its query's
/// `inner_string` (presumably the query text — confirm against `Query`).
impl ::core::fmt::Display for SyncSubscription {
    fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::fmt::Result {
        // The formatter is forwarded as-is, so any flags set by the caller
        // are handled by the delegate.
        self.query.inner_string.fmt(f)
    }
}

impl SyncSubscription {
    /// Starts replication for `query` (with optional `query_args`) on `ditto`
    /// and returns the handle describing that subscription.
    ///
    /// The returned value only keeps a weak reference to the Ditto instance.
    ///
    /// # Errors
    ///
    /// Returns a [`DittoError`] when the FFI call that adds the DQL
    /// subscription fails; no subscription object is created in that case.
    pub(crate) fn new(
        ditto: &Ditto,
        query: Query,
        query_args: Option<QueryArguments>,
    ) -> Result<Self, DittoError> {
        // start replication
        ffi_sdk::dittoffi_try_experimental_add_dql_subscription(
            &ditto.ditto,
            query.prepare_ffi(),
            query_args.as_ref().map(|qa| qa.cbor().into()),
        )
        .into_rust_result()?;

        // create object
        Ok(Self {
            ditto: Arc::downgrade(&ditto.fields),
            query,
            query_args,
        })
    }

    /// Cancels the [`SyncSubscription`] so that new changes matching the query are
    /// no longer received from other peers. No-op if it's already cancelled or
    /// the owning [`Ditto`](crate::ditto::Ditto) object has gone out of scope.
    pub fn cancel(&self) {
        if let Ok(ditto) = Ditto::upgrade(&self.ditto) {
            ditto.sync.unregister_subscription(self);
        }
    }

    /// Returns `true` if this [`SyncSubscription`] is no longer active.
    ///
    /// A subscription counts as cancelled when the owning
    /// [`Ditto`](crate::ditto::Ditto) object can no longer be reached, or when
    /// it is not (or no longer) among that instance's sync subscriptions.
    pub fn is_cancelled(&self) -> bool {
        // A subscription that is still present in the set is active, hence
        // the negation.
        Ditto::upgrade(&self.ditto)
            .map_or(true, |ditto| !ditto.sync.subscriptions().contains(self))
    }
}

/// Dropping a [`SyncSubscription`] cancels it.
impl Drop for SyncSubscription {
    fn drop(&mut self) {
        // Going out of scope follows the same path as an explicit cancel.
        Self::cancel(self);
    }
}

impl SyncSubscription {
    /// Borrows the fields that define a subscription's identity — the query
    /// and its optional arguments — as a single comparable, hashable value.
    ///
    /// The reference to the Ditto instance is deliberately left out.
    fn comparable_parts(&self) -> impl '_ + Ord + Hash {
        let Self {
            query, query_args, ..
        } = self;
        (query, query_args)
    }
}

/// Total ordering over subscriptions, derived from their comparable parts.
impl Ord for SyncSubscription {
    fn cmp(&self, other: &Self) -> Ordering {
        let lhs = self.comparable_parts();
        let rhs = other.comparable_parts();
        lhs.cmp(&rhs)
    }
}

/// Partial ordering that always defers to the total [`Ord`] implementation.
impl PartialOrd for SyncSubscription {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let ordering = self.cmp(other);
        Some(ordering)
    }
}

/// Two subscriptions are equal when their comparable parts are equal.
impl PartialEq for SyncSubscription {
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.comparable_parts();
        let rhs = other.comparable_parts();
        lhs == rhs
    }
}
impl Eq for SyncSubscription {}

/// Hashes exactly the parts used for equality, keeping `Hash` and `Eq` in
/// agreement.
impl Hash for SyncSubscription {
    fn hash<H: hash::Hasher>(&self, state: &mut H) {
        let parts = self.comparable_parts();
        Hash::hash(&parts, state);
    }
}