dittolive_ditto/store/transactions/
api.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
use crate::dql::QueryResult;

use_prelude!();

impl Store {
    /// Executes multiple DQL queries within a single atomic transaction.
    ///
    /// ```rust,no_run
    /// # use dittolive_ditto::prelude::*;
    /// # use serde_json::json;
    /// # #[tokio::main]
    /// # async fn main() {
    /// # let ditto: Ditto = todo!();
    /// ditto.store().transaction(async |txn| {
    ///   txn.execute((
    ///     "INSERT INTO users DOCUMENTS (:doc)",
    ///     json!({"name": "alice"}),
    ///   )).await?;
    ///
    ///   txn.execute((
    ///     "INSERT INTO users DOCUMENTS (:doc)",
    ///     json!({"name": "bob"}),
    ///   )).await?;
    ///
    ///   Ok::<_, DittoError>(TransactionCompletionAction::Commit)
    /// }).await.unwrap();
    /// # }
    /// ```
    /// The closure returns a [`Result<T, E>`][core::result::Result], and in the success case, this
    /// value will be returned from the call to
    /// [`ditto.store().transaction()`][Store::transaction]. However, if the value is a
    /// [`TransactionCompletionAction`] (i.e. if `T = TransactionCompletionAction`), then this
    /// value is used to determine the behaviour of the transaction:
    /// - if the value is [`TransactionCompletionAction::Commit`], the transaction is committed
    /// - if the value is [`TransactionCompletionAction::Rollback`], the transaction is rolled back
    /// - if the value is any other type, the transaction is implicitly committed
    ///
    /// See the "errors" section below for more information on the error case.
    ///
    /// This ensures that either all statements are executed successfully, or none are executed at
    /// all, providing strong consistency guarantees. Certain mesh configurations may impose
    /// limitations on these guarantees. For more details, refer to the [Ditto
    /// documentation](https://docs.ditto.live/sdk/latest/crud/transactions). Transactions are
    /// initiated as read-write transactions by default, and only a single read-write transaction
    /// is being executed at any given time. Any other read-write transaction started concurrently
    /// will wait until the current transaction has been committed or rolled back. Therefore, it is
    /// crucial to make sure a transaction finishes as early as possible so other read-write
    /// transactions aren't blocked for a long time.
    ///
    /// A transaction can also be configured to be read-only, or given an option debugging `hint`
    /// via [`ditto.store.transaction_with_options`][Store::transaction_with_options]. See the docs
    /// for more information.
    ///
    /// # Errors
    ///
    /// See the [module-level docs][module] for more details on error types in transactions.
    ///
    /// If errors occur in an [`execute()`][Transaction::execute] call within a transaction block,
    /// this error can be handled like a normal [`Result`], and the transaction will continue
    /// without being rolled back. If the transaction callback itself returns an [`Err`], the
    /// transaction is implicitly rolled back and the error is propagated to the called.
    ///
    /// For a complete guide on transactions, please refer to the [Ditto
    /// documentation](https://docs.ditto.live/sdk/latest/crud/transactions).
    ///
    /// See also - [`Transaction`]
    ///
    /// [module]: crate::store::transactions
    pub async fn transaction<T, E>(
        &self,
        scope: impl AsyncFnOnce(&Transaction) -> Result<T, E>,
    ) -> Result<T, E>
    where
        T: core::any::Any,
        E: From<DittoError>,
    {
        let options = CreateTransactionOptions::default();
        self.transaction_with_options(options, scope).await
    }

    /// Create a transaction with the provided options.
    ///
    /// ```rust,no_run
    /// # use dittolive_ditto::prelude::*;
    /// # use serde_json::json;
    /// # #[tokio::main]
    /// # async fn main() {
    /// # let ditto: Ditto = todo!();
    /// let mut opts = CreateTransactionOptions::new();
    /// opts.hint = Some("debug transaction name");
    /// opts.is_read_only = false;
    ///
    /// ditto.store().transaction_with_options(opts, async |txn| {
    ///   txn.execute((
    ///     "INSERT INTO users DOCUMENTS (:doc)",
    ///     json!({"name": "alice"}),
    ///   )).await?;
    ///
    ///   txn.execute((
    ///     "INSERT INTO users DOCUMENTS (:doc)",
    ///     json!({"name": "bob"}),
    ///   )).await?;
    ///
    ///   Ok::<_, DittoError>(TransactionCompletionAction::Commit)
    /// }).await.unwrap();
    /// # }
    /// ```
    /// If [`is_read_only`][CreateTransactionOptions::is_read_only] is `true`, mutating DQL queries
    /// will error, even if no actual mutation would have occurred.
    ///
    /// See [`ditto.store().transaction()`][Store::transaction] for more documentation on the
    /// behaviour of transactions in general.
    pub async fn transaction_with_options<T, E>(
        &self,
        options: CreateTransactionOptions<'_>,
        scope: impl AsyncFnOnce(&Transaction) -> Result<T, E>,
    ) -> Result<T, E>
    where
        T: core::any::Any,
        E: From<DittoError>,
    {
        let hint = options.hint.map(char_p::new);

        let options = ffi_sdk::BeginTransactionOptions {
            hint: hint.as_ref().map(|s| s.as_ref()),
            is_read_only: options.is_read_only,
        };

        self._transaction_with_options(options, scope).await
    }
}

/// Options for customizing a transaction. Used with [`Store::transaction_with_options`].
///
/// ```rust,no_run
/// # use dittolive_ditto::prelude::*;
/// # #[tokio::main]
/// # async fn main() {
/// # let ditto: Ditto = todo!();
/// let mut options = CreateTransactionOptions::new();
/// options.hint = Some("my transaction name");
/// options.is_read_only = true;
///
/// ditto
///     .store()
///     .transaction_with_options(options, async |txn| {
///         // do transaction stuff
///         Ok::<_, DittoError>(TransactionCompletionAction::Commit)
///     })
///     .await
///     .unwrap();
/// # }
/// ```
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct CreateTransactionOptions<'hint> {
    /// A hint used for debugging and logging
    pub hint: Option<&'hint str>,

    /// Whether the transaction should be created read-only
    pub is_read_only: bool,
}

#[allow(
    clippy::derivable_impls,
    reason = "writing it out makes it clearer what the defaults are"
)]
impl Default for CreateTransactionOptions<'_> {
    fn default() -> Self {
        Self {
            hint: None,
            is_read_only: false,
        }
    }
}

impl CreateTransactionOptions<'_> {
    /// Create a new, default [`CreateTransactionOptions`].
    pub fn new() -> Self {
        Self::default()
    }
}

/// Encapsulates information about a transaction.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct TransactionInfo {
    /// A globally unique ID of the transaction.
    pub id: String,

    /// The user hint passed when creating the transaction, useful for debugging
    /// and testing.
    pub hint: Option<String>,

    /// Indicates whether mutating DQL statements can be executed in the transaction. Defaults to
    /// `false`. See [`ditto.store().transaction()`][Store::transaction] for more information.
    pub is_read_only: bool,
}

/// Represents an action that completes a transaction, by either committing it or
/// rolling it back.
#[derive(Debug, Clone, PartialEq)]
#[must_use = "Code should handle whether a transaction succeeded"]
pub enum TransactionCompletionAction {
    /// Represents the action of committing a transaction.
    Commit,

    /// Represents the action of rolling back a transaction.
    Rollback,
}

/// Represents a transaction in the Ditto store.
///
/// A [`Transaction`] is used to group multiple operations into a single atomic unit. This ensures
/// that either all operations within the transaction are applied, or none of them are, maintaining
/// the integrity of the data.
///
/// Please consult the documentation of [`ditto.store().transaction()`][Store::transaction] or the
/// [module-level docs][module] for more information on how to create and use transactions. For a
/// complete guide on transactions, please refer to the [Ditto documentation][docs]
///
/// [module]: crate::store::transactions
/// [docs]: https://docs.ditto.live/sdk/latest/crud/transactions
pub struct Transaction {
    pub(crate) ptr: repr_c::Box<ffi_sdk::FfiTransaction>,
}

impl Transaction {
    /// Information about the current transaction.
    pub fn info(&self) -> TransactionInfo {
        self._info()
    }

    /// Executes a DQL query and returns matching items as a [`QueryResult`].
    ///
    /// Note that this method only returns results from the local store without waiting for any
    /// [`SyncSubscription`][crate::sync::SyncSubscription]s to have caught up with the latest
    /// changes. Only use this method if your program must proceed with immediate results. Use a
    /// [`StoreObserver`][crate::store::StoreObserver] (obtained from
    /// [`ditto.store().register_observer()`][crate::store::Store::register_observer]) to receive
    /// updates to query results as soon as they have been synced to this peer.
    pub async fn execute<Q>(&self, query: Q) -> Result<QueryResult, DittoError>
    where
        Q: IntoQuery,
        Q::Args: serde::Serialize,
    {
        self._execute(query).await
    }
}