1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//! [`Subscription`]s are used to register interest in receiving updates
//! for specified [`DittoDocument`]s.

use_prelude!();

use std::sync::Arc;

use ffi_sdk::COrderByParam;
use tracing::{debug, error};

use crate::{
    ditto::{DittoHandleWrapper, WeakDittoHandleWrapper},
    error::{DittoError, ErrorKind},
};

/// Use [`cursor_op.subscribe()`] or [`id_op.subscribe()`] to sync documents from remote peers.
///
/// Holding a `Subscription` will cause documents matching the query to continue to sync
/// from remote peers.
///
/// Dropping a `Subscription` will cancel the sync of this query's documents from remote peers.
///
/// Every parameter that was used to register the subscription is kept in the
/// struct so that the `Drop` implementation can pass the same values to the
/// removal call.
///
/// [`cursor_op.subscribe()`]: crate::store::query_builder::PendingCursorOperation::subscribe
/// [`id_op.subscribe()`]: crate::store::query_builder::PendingIdSpecificOperation::subscribe
#[must_use = "Dropping a `Subscription` will cancel it"]
pub struct Subscription {
    /// Weak handle to the Ditto instance; upgraded on drop, and the removal
    /// call is skipped if the instance is already gone.
    pub(super) ditto: WeakDittoHandleWrapper,
    /// Name of the collection the subscription was registered for.
    pub(super) collection_name: char_p::Box,
    /// Query string, stored as an owned C string.
    pub(super) query: char_p::Box,
    /// Optional serialized query arguments, forwarded to the FFI as a byte slice.
    // NOTE(review): the byte encoding is not visible here — confirm with the caller.
    pub(super) query_args: Option<Vec<u8>>,
    /// Owned copy of the ordering parameters as `(expression, direction)` pairs.
    // NOTE(review): the strings are produced via `to_str_with_null()`, so they
    // presumably keep their trailing NUL byte — confirm before reusing them as
    // plain Rust strings.
    pub(super) order_by: Vec<(String, QuerySortDirection)>,
    /// Limit value forwarded unchanged to the FFI add/remove calls.
    pub(super) limit: i32,
    /// Offset value forwarded unchanged to the FFI add/remove calls.
    pub(super) offset: u32,
}

impl Subscription {
    /// Registers a subscription through the FFI and returns the value that
    /// represents it.
    ///
    /// `query` is converted to an owned C string and passed, together with the
    /// collection name, the optional query arguments, the ordering parameters,
    /// `limit` and `offset`, to `ffi_sdk::ditto_add_subscription`. A non-zero
    /// status code is reported through `tracing::error!`; a `Subscription` is
    /// returned in either case.
    ///
    /// The returned value keeps only a weak reference to `ditto`, plus owned
    /// copies of all other parameters so that `Drop` can issue the matching
    /// removal call.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        ditto: DittoHandleWrapper,
        collection_name: char_p::Box,
        query: &str,
        query_args: Option<Vec<u8>>,
        order_by: &'_ [COrderByParam<'_>],
        limit: i32,
        offset: u32,
    ) -> Self {
        let query = char_p::new(query);

        let status = ffi_sdk::ditto_add_subscription(
            &ditto,
            collection_name.as_ref(),
            query.as_ref(),
            query_args.as_deref().map(|bytes| bytes.into()),
            order_by.into(),
            limit,
            offset,
        );
        if status != 0 {
            error!(
                collection = %collection_name,
                %query,
                error = %DittoError::from_ffi(ErrorKind::InvalidInput),
                "error adding subscription",
            );
        }

        // Take owned copies of the borrowed ordering parameters so they can
        // outlive this call and be replayed when the subscription is dropped.
        //
        // TODO: Doing `param.query_c_str.to_owned().into_string()` fails with
        // an InvalidNulTerminator error, but AFAICT it should be equivalent
        // to the conversion below?
        let mut owned_order_by: Vec<(String, QuerySortDirection)> =
            Vec::with_capacity(order_by.len());
        for param in order_by {
            let expression = param.query_c_str.to_str_with_null().to_owned();
            owned_order_by.push((expression, param.direction));
        }

        let ditto = Arc::downgrade(&ditto);
        Subscription {
            ditto,
            collection_name,
            query,
            query_args,
            order_by: owned_order_by,
            limit,
            offset,
        }
    }
}

// Empty `Observer` implementation for `Subscription`; no methods are overridden here.
// NOTE(review): the `allow(deprecated)` suggests the `Observer` trait itself is
// deprecated and this impl is kept for backward compatibility — confirm.
#[allow(deprecated)]
impl crate::observer::Observer for Subscription {}

impl Drop for Subscription {
    /// Cancels the subscription by calling `ffi_sdk::ditto_remove_subscription`
    /// with the same parameters that were stored at construction time.
    ///
    /// If the Ditto instance has already been released (the weak handle can
    /// no longer be upgraded) nothing is sent to the FFI. A non-zero status
    /// code from the removal call is reported through `tracing::error!`.
    fn drop(&mut self) {
        debug!(
            collection = %self.collection_name,
            query = %self.query,
            "dropping subscription",
        );

        // Without a live Ditto instance there is nothing to unregister from.
        let ditto = match self.ditto.upgrade() {
            Some(handle) => handle,
            None => return,
        };

        // Rebuild the borrowed FFI ordering parameters from the owned copies.
        let order_by: Vec<COrderByParam<'_>> = self
            .order_by
            .iter()
            .map(|(expression, direction)| COrderByParam {
                query_c_str: expression.as_str().try_into().expect("valid string"),
                direction: *direction,
            })
            .collect();

        let status = ffi_sdk::ditto_remove_subscription(
            &ditto,
            self.collection_name.as_ref(),
            self.query.as_ref(),
            self.query_args.as_deref().map(|bytes| bytes.into()),
            order_by.as_slice().into(),
            self.limit,
            self.offset,
        );
        if status != 0 {
            error!(
                collection = %self.collection_name,
                query = %self.query,
                error = %DittoError::from_ffi(ErrorKind::InvalidInput),
                "error removing subscription",
            );
        }
    }
}