1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
//! [`Subscription`]s are used to register interest in receiving updates
//! for specified [`DittoDocument`]s.

use_prelude!();

use std::sync::Arc;

use ffi_sdk::COrderByParam;

use crate::{
    ditto::{DittoHandleWrapper, WeakDittoHandleWrapper},
    error::{DittoError, ErrorKind},
};

/// A handle for a Ditto Subscription to documents in a collection matching a
/// query.
///
/// When the Subscription is dropped, it will automatically trigger a clean up
/// of itself.
pub struct Subscription {
    pub(super) ditto: WeakDittoHandleWrapper,
    pub(super) collection_name: char_p::Box,
    pub(super) query: char_p::Box,
    pub(super) query_args: Option<Vec<u8>>,
    pub(super) order_by: Vec<(String, QuerySortDirection)>,
    pub(super) limit: i32,
    pub(super) offset: u32,
}

impl Subscription {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        ditto: DittoHandleWrapper,
        collection_name: char_p::Box,
        query: &str,
        query_args: Option<Vec<u8>>,
        order_by: &'_ [COrderByParam<'_>],
        limit: i32,
        offset: u32,
    ) -> Self {
        let query = char_p::new(query);
        {
            let code = ffi_sdk::ditto_add_subscription(
                &ditto,
                collection_name.as_ref(),
                query.as_ref(),
                query_args.as_ref().map(|qa| (&qa[..]).into()),
                order_by.into(),
                limit,
                offset,
            );
            if code != 0 {
                ::log::error!(
                    "Error adding subscription for collection {} query {}. Error: {}",
                    &collection_name,
                    &query,
                    DittoError::from_ffi(ErrorKind::InvalidInput)
                );
            }
        };
        let order_by: Vec<(String, QuerySortDirection)> = order_by
            .iter()
            // TODO: Doing `o.query_c_str.to_owned().into_string()` fails with
            // an InvalidNulTerminator error, but AFAICT it should be equivalent
            // to the below?
            .map(|o| (o.query_c_str.to_str_with_null().to_string(), o.direction))
            .collect();

        Subscription {
            ditto: Arc::downgrade(&ditto),
            collection_name,
            query,
            query_args,
            order_by,
            limit,
            offset,
        }
    }
}

impl crate::observer::Observer for Subscription {}

impl Drop for Subscription {
    fn drop(&mut self) {
        ::log::debug!(
            "Dropping subscription for collection {} query {}",
            &self.collection_name,
            &self.query
        );
        if let Some(ditto) = self.ditto.upgrade() {
            {
                let order_by = self
                    .order_by
                    .iter()
                    .map(|query_and_direction| COrderByParam {
                        query_c_str: query_and_direction
                            .0
                            .as_str()
                            .try_into()
                            .expect("valid string"),
                        direction: query_and_direction.1,
                    })
                    .collect::<Vec<COrderByParam<'_>>>();

                let code = ffi_sdk::ditto_remove_subscription(
                    &ditto,
                    self.collection_name.as_ref(),
                    self.query.as_ref(),
                    self.query_args.as_ref().map(|qa| (&qa[..]).into()),
                    order_by[..].into(),
                    self.limit,
                    self.offset,
                );
                if code != 0 {
                    ::log::error!(
                        "Error removing subscription for collection {} query {}. Error: {}",
                        &self.collection_name,
                        &self.query,
                        DittoError::from_ffi(ErrorKind::InvalidInput)
                    );
                }
            }
        }
    }
}