dittolive_ditto/ditto/init/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
use std::{
    ffi::CString,
    future::Future,
    os::raw::{c_uint, c_void},
    str::FromStr,
    sync::{Arc, Weak},
    time::Duration,
};

use async_fn_traits::AsyncFn2;
use async_trait::async_trait;
pub(crate) use config_or_identity::ConfigOrIdentity;
use extern_c::extern_c;
use ffi_sdk::{FsComponent, TransportConfigMode};

pub use self::config::{DittoConfig, DittoConfigConnect};
use crate::{
    ditto::DittoFields,
    identity::DittoAuthenticator,
    small_peer_info::SmallPeerInfo,
    utils::{make_continuation, prelude::*},
};

mod config;
mod config_or_identity;

impl Ditto {
    /// Open a new Ditto instance using a [`DittoConfig`]
    ///
    /// # Example
    ///
    /// ```
    /// # use dittolive_ditto::prelude::*;
    /// # async fn example() -> anyhow::Result<()> {
    /// // Load your database ID somehow, ENV is a good option
    /// let database_id = std::env::var("DITTO_DATABASE_ID")?;
    ///
    /// // Choose one of the following types of connection config
    /// let connect = DittoConfigConnect::Server {
    ///     url: "https://example.com/your-server-url".parse().unwrap(),
    /// };
    /// let connect = DittoConfigConnect::SmallPeersOnly {
    ///     private_key: Some("https://example.com/your-server-url".bytes().collect()),
    /// };
    /// let connect = DittoConfigConnect::SmallPeersOnly { private_key: None };
    ///
    /// let config = DittoConfig::new(database_id, connect);
    /// let ditto = Ditto::open(config).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn open(config: DittoConfig) -> Result<Ditto, DittoError> {
        let config_cbor =
            serde_cbor::to_vec(&config).expect("should serialize well-formed DittoConfig");
        let config_cbor_ref = (&*config_cbor).into();
        let (continuation, recv) = make_continuation();
        let default_root_dir = default_root_directory();
        let default_root_dir_ref = default_root_dir
            .as_deref()
            .and_then(|root| root.to_str())
            .ok_or_else(|| {
                DittoError::new(
                    ErrorKind::IO,
                    "Unable to resolve a default data directory on this platform".to_string(),
                )
            })?;
        let default_root_dir_cstring = CString::from_str(default_root_dir_ref)
            .expect("should construct CString from no-nulls &str");
        let default_root_dir_cstr = &*default_root_dir_cstring;
        let default_root_dir_charp: char_p::Ref<'_> = default_root_dir_cstr.into();

        // CLIPPY: continuation.into() needed to pass CI
        #[allow(clippy::useless_conversion)]
        ffi_sdk::dittoffi_ditto_open_async_throws(
            config_cbor_ref,
            TransportConfigMode::PlatformIndependent,
            default_root_dir_charp,
            continuation.into(),
        );

        let ffi_result = recv.await.unwrap();
        let ffi_ditto: repr_c::Box<ffi_sdk::Ditto> = ffi_result.into_rust_result()?;

        Self::finish_open(ffi_ditto, config)
    }

    /// Open a new Ditto instance using a [`DittoConfig`]
    ///
    /// This is a synchronous blocking variant of [`Ditto::open()`] that will wait until
    /// initialization is complete.
    ///
    /// # Example
    ///
    /// ```
    /// # use dittolive_ditto::prelude::*;
    /// # fn example() -> anyhow::Result<()> {
    /// // Load your database ID somehow, ENV is a good option
    /// let database_id = std::env::var("DITTO_DATABASE_ID")?;
    ///
    /// // Choose one of the following types of connection config
    /// let connect = DittoConfigConnect::Server {
    ///     url: "https://example.com/your-server-url".parse().unwrap(),
    /// };
    /// let connect = DittoConfigConnect::SmallPeersOnly {
    ///     private_key: Some("https://example.com/your-server-url".bytes().collect()),
    /// };
    /// let connect = DittoConfigConnect::SmallPeersOnly { private_key: None };
    ///
    /// let config = DittoConfig::new(database_id, connect);
    /// let ditto = Ditto::open_sync(config)?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn open_sync(config: DittoConfig) -> Result<Ditto, DittoError> {
        let config_cbor =
            serde_cbor::to_vec(&config).expect("should serialize well-formed DittoConfig");
        let config_cbor_ref = (&(*config_cbor)).into();

        let default_root_dir = default_root_directory();
        let default_root_dir_ref = default_root_dir
            .as_deref()
            .and_then(|root| root.to_str())
            .ok_or_else(|| {
                DittoError::new(
                    ErrorKind::IO,
                    "Unable to resolve a default data directory on this platform".to_string(),
                )
            })?;
        let default_root_dir_cstring = CString::from_str(default_root_dir_ref)
            .expect("should construct CString from no-nulls &str");
        let default_root_dir_cstr = &*default_root_dir_cstring;
        let default_root_dir_charp: char_p::Ref<'_> = default_root_dir_cstr.into();

        let ffi_ditto: repr_c::Box<ffi_sdk::Ditto> = ffi_sdk::dittoffi_ditto_open_throws(
            config_cbor_ref,
            TransportConfigMode::PlatformIndependent,
            default_root_dir_charp,
        )
        .into_rust_result()?;

        Self::finish_open(ffi_ditto, config)
    }

    fn finish_open(
        ffi_ditto: repr_c::Box<ffi_sdk::Ditto>,
        config: DittoConfig,
    ) -> Result<Ditto, DittoError> {
        let ditto: Arc<repr_c::Box<ffi_sdk::Ditto>> = Arc::new(ffi_ditto);
        let has_auth = matches!(&config.connect, DittoConfigConnect::Server { .. });
        #[allow(deprecated)]
        let site_id: SiteId = ffi_sdk::ditto_auth_client_get_site_id(&ditto);

        let ditto_root = {
            let ditto_root = if let Some(dir) = &config.persistence_directory {
                PersistentRoot::new(dir)?
            } else {
                let default_dir = default_root_directory().ok_or_else(|| {
                    DittoError::new(
                        ErrorKind::IO,
                        "failed to determine default Ditto persistence directory",
                    )
                })?;
                PersistentRoot::new(default_dir)?
            };
            Arc::new(ditto_root) as Arc<dyn DittoRoot>
        };
        let disk_usage = DiskUsage::new(ditto.retain(), FsComponent::Root);
        let small_peer_info = SmallPeerInfo::new(ditto.retain());
        let fields = Arc::new_cyclic(|weak_fields: &arc::Weak<_>| {
            let store = Store::new(ditto.retain(), weak_fields.clone());
            let sync = crate::sync::Sync::new(weak_fields.clone());
            let presence = Arc::new(Presence::new(weak_fields.clone()));

            DittoFields {
                ditto: ditto.retain(),
                has_auth,
                state: ConfigOrIdentity::Config(config),
                store,
                sync,
                site_id,
                ditto_root,
                presence,
                disk_usage,
                small_peer_info,
            }
        });

        let ditto = Ditto {
            fields,
            is_shut_down_able: true,
        };

        Ok(ditto)
    }
}

fn default_root_directory() -> Option<PathBuf> {
    std::env::current_exe()
        .ok()
        .and_then(|abspath| abspath.parent().map(|x| x.to_path_buf()))
}

impl DittoAuthenticator {
    /// Set a callback to notify the client when the authentication is expiring.
    ///
    /// When using `DittoConfigConnect::Server { .. }` mode, Ditto _requires_ you to register an
    /// "expiration handler" for authentication. This handler is called for an initial
    /// authentication and periodically thereafter when the current authentication is near to
    /// expiration.
    ///
    /// For more details about authentication, see the [Ditto Auth and Authorization docs][0].
    ///
    /// [0]: https://docs.ditto.live/sdk/latest/auth-and-authorization/cloud-authentication#login
    ///
    /// For more details about the expiration handler, see the [`DittoAuthExpirationHandler`] trait.
    ///
    /// # Example
    ///
    /// ```
    /// # #[allow(deprecated)]
    /// # use tracing::error;
    /// # use dittolive_ditto::prelude::*;
    /// # fn example(ditto: &Ditto) -> anyhow::Result<()> {
    /// async fn sample_get_token() -> anyhow::Result<String> {
    ///     // e.g. reqwest::get("https://example.com/token").await?.text().await?;
    ///     Ok("token".to_string())
    /// }
    ///
    /// let auth = ditto
    ///     .auth()
    ///     .expect("Auth is available in Server connect mode");
    /// auth.set_expiration_handler(async |ditto: &Ditto, duration_remaining| {
    ///     let auth = ditto
    ///         .auth()
    ///         .expect("Auth is available in Server connect mode");
    ///
    ///     // Call your auth service to get a new token
    ///     let token = sample_get_token().await.unwrap();
    ///
    ///     // Where "my-provider" is the name of the Authentication Webhook
    ///     // you've configured on the Ditto Portal
    ///     let result = auth.login(&token, "my-provider");
    ///
    ///     if let Err(login_error) = result {
    ///         // Handle login error, e.g.:
    ///         error!("Failed to login: {}", login_error);
    ///     }
    /// });
    /// # Ok(())
    /// # }
    /// ```
    pub fn set_expiration_handler<F>(&self, handler: F)
    where
        F: DittoAuthExpirationHandler,
    {
        let Some(ditto) = self.ditto_fields.upgrade() else {
            error!("Failed to set expiration handler, Ditto has shut down");
            return;
        };

        let login_provider = make_login_provider(self.ditto_fields.clone(), Arc::new(handler));
        ffi_sdk::ditto_auth_set_login_provider(&ditto.ditto, Some(login_provider));
    }

    /// Clear the expiration handler, if set.
    pub fn clear_expiration_handler(&self) {
        let Some(ditto) = self.ditto_fields.upgrade() else {
            error!("Failed to clear expiration handler, Ditto has shut down");
            return;
        };

        ffi_sdk::ditto_auth_set_login_provider(&ditto.ditto, None);
    }
}

/// Trait describing types which can be used as an authentication expiration handler for Ditto.
///
/// When using `DittoConfigConnect::Server { .. }` mode, Ditto _requires_ you to register an
/// "expiration handler" for authentication. This handler is called for an initial authentication
/// and periodically thereafter when the current authentication is near to expiration.
///
/// This trait is implemented for async closures and for functions returning an
/// `impl Future<Output = ()>`, which take the expected arguments of an auth expiration handler.
///
/// Expiration handlers are expected to call `auth.login(...)` with a valid authentication token.
///
/// For more details about authentication, see the [Ditto Auth and Authorization docs][0].
///
/// [0]: https://docs.ditto.live/sdk/latest/auth-and-authorization/cloud-authentication#login
///
/// **NOTE**: Because expiration handlers are asynchronous, they must not block the thread.
///
/// # Example
///
/// ```
/// # use std::time::Duration;
/// # use dittolive_ditto::prelude::*;
/// # fn example(ditto: &Ditto) {
/// let auth = ditto
///     .auth()
///     .expect("Auth is available for DittoConfigConnect::Server mode");
///
/// // Option 1: Use an async closure
/// auth.set_expiration_handler(async |ditto: &Ditto, duration| {
///     // Your authentication handler code here
/// });
///
/// // Option 2: Use a closure returning an async block
/// auth.set_expiration_handler(|ditto: &Ditto, duration| async {
///     // Your authentication handler code here
/// });
///
/// // Option 3: Use a custom type and trait impl
/// struct MyAuthHandler;
/// impl DittoAuthExpirationHandler for MyAuthHandler {
///     async fn on_expiration(&self, ditto: &Ditto, duration_remaining: Duration) {
///         // Your authentication handler code here
///     }
/// }
/// auth.set_expiration_handler(MyAuthHandler);
/// # }
/// ```
pub trait DittoAuthExpirationHandler: 'static + Send + Sync {
    fn on_expiration(
        &self,
        ditto: &Ditto,
        duration_remaining: Duration,
    ) -> impl Send + Future<Output = ()>;
}

impl<F> DittoAuthExpirationHandler for F
where
    F: 'static + Send + Sync,
    F: for<'r> AsyncFn2<&'r Ditto, Duration, Output = (), OutputFuture: Send>,
{
    async fn on_expiration(&self, ditto: &Ditto, duration_remaining: Duration) {
        self(ditto, duration_remaining).await
    }
}

#[async_trait]
pub(crate) trait DynDittoAuthExpirationHandler: 'static + Send + Sync {
    async fn dyn_on_expiration(&self, ditto: &Ditto, duration_remaining: Duration);
}

#[async_trait]
impl<F: DittoAuthExpirationHandler> DynDittoAuthExpirationHandler for F {
    async fn dyn_on_expiration(&self, ditto: &Ditto, duration_remaining: Duration) {
        self.on_expiration(ditto, duration_remaining).await
    }
}

impl DittoAuthExpirationHandler for dyn '_ + DynDittoAuthExpirationHandler {
    async fn on_expiration(&self, ditto: &Ditto, duration_remaining: Duration) {
        self.dyn_on_expiration(ditto, duration_remaining).await
    }
}

pub(crate) fn make_login_provider(
    ditto_fields: Weak<DittoFields>,
    auth_expiration_handler: Arc<dyn DynDittoAuthExpirationHandler>,
) -> repr_c::Box<ffi_sdk::LoginProvider> {
    struct LoginProviderCtx {
        ditto_fields: Weak<DittoFields>,
        auth_expiration_handler: Arc<dyn DynDittoAuthExpirationHandler>,
    }

    let login_provider_ctx = Arc::new(LoginProviderCtx {
        auth_expiration_handler,
        ditto_fields,
    });

    let ffi_ctx = Arc::as_ptr(&login_provider_ctx) as *mut c_void;
    let ffi_retain = Some(extern_c(|ctx: *mut c_void| unsafe {
        Arc::<LoginProviderCtx>::increment_strong_count(ctx.cast())
    }) as unsafe extern "C" fn(_));
    let ffi_release = Some(extern_c(|ctx: *mut c_void| unsafe {
        Arc::<LoginProviderCtx>::decrement_strong_count(ctx.cast())
    }) as unsafe extern "C" fn(_));

    // This callback is just a "trigger" to initiate the caller's authentication handler
    // The handler is async and spawned in a separate task
    let ffi_handler = extern_c(|ctx: *mut c_void, secs_remaining: c_uint| {
        let login_provider_ctx: &LoginProviderCtx = unsafe { &*ctx.cast() };
        let auth_expiration_handler = login_provider_ctx.auth_expiration_handler.retain();
        let Ok(ditto) = Ditto::upgrade(&login_provider_ctx.ditto_fields) else {
            error!("Failed to dispatch auth handler, Ditto has been shut down");
            return;
        };

        dispatch_auth_handler(
            auth_expiration_handler,
            ditto,
            Duration::from_secs(secs_remaining.into()),
        );
    });

    unsafe {
        ffi_sdk::ditto_auth_client_make_login_provider(
            ffi_ctx,
            ffi_retain,
            ffi_release,
            ffi_handler,
        )
    }
}

/// Dispatches the users's authentication expiration handler.
///
/// This function checks whether the current execution context is within a tokio runtime.
///
/// - If a tokio runtime is available, the handler is spawned onto a task.
/// - If no tokio runtime is available, a temporary current-thread runtime is created and the
///   dispatcher will block until the handler completes.
///
/// # Panics
///
/// This function will panic if:
///
/// - There is no current tokio runtime available, and
/// - We fail to create a temporary current-thread runtime
fn dispatch_auth_handler(
    auth_expiration_handler: Arc<dyn DynDittoAuthExpirationHandler>,
    ditto: Ditto,
    duration_remaining: Duration,
) {
    // Check if we're in a tokio runtime context
    match tokio::runtime::Handle::try_current() {
        Ok(handle) => {
            // We have a tokio runtime, spawn the handler as usual
            handle.spawn(async move {
                auth_expiration_handler
                    .on_expiration(&ditto, duration_remaining)
                    .await;
            });
        }
        Err(_) => {
            // No tokio runtime available, create a temporary current-thread runtime
            warn!("No tokio runtime available for expiration handler. Creating temporary runtime.");

            match tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
            {
                Ok(rt) => {
                    rt.block_on(async move {
                        auth_expiration_handler
                            .on_expiration(&ditto, duration_remaining)
                            .await;
                    });
                }
                Err(e) => {
                    panic!(
                        "Failed to create tokio runtime for expiration handler: {}. Consider \
                         running within a tokio runtime context.",
                        e
                    );
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {

    #[test]
    fn test_runtime_detection_behavior() {
        // This test demonstrates the runtime detection behavior
        println!("Testing runtime detection...");

        // Check if we're in a tokio context (should be false in regular test)
        match tokio::runtime::Handle::try_current() {
            Ok(_handle) => {
                println!("✓ Tokio runtime detected - handlers will be spawned");
            }
            Err(_) => {
                println!("✗ No tokio runtime - will create temporary runtime");

                // Demonstrate temporary runtime creation (as done in make_login_provider)
                match tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                {
                    Ok(_rt) => {
                        println!("✓ Successfully created temporary runtime");
                    }
                    Err(e) => {
                        println!("✗ Failed to create temporary runtime: {}", e);
                    }
                }
            }
        }
    }

    #[tokio::test]
    async fn test_with_tokio_runtime_available() {
        // This test runs with a tokio runtime available
        println!("Testing with tokio runtime available...");

        // This should succeed
        match tokio::runtime::Handle::try_current() {
            Ok(_handle) => {
                println!("✓ Tokio runtime is available for async handlers");
            }
            Err(_) => {
                panic!("Expected tokio runtime to be available in tokio::test");
            }
        }

        println!("✓ Runtime detection works correctly in async context");
    }

    #[test]
    fn test_improved_error_handling() {
        // Test that we can create the same type of runtime as the implementation
        let rt_result = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build();

        match rt_result {
            Ok(rt) => {
                println!("✓ Temporary runtime creation works");

                // Demonstrate that we can use it for async work
                rt.block_on(async {
                    println!("✓ Async work executes successfully in temporary runtime");
                });
            }
            Err(e) => {
                println!("✗ Failed to create runtime: {}", e);
            }
        }
    }
}