Skip to content

Commit

Permalink
Naive lock to prevent any concurrency issues
Browse files Browse the repository at this point in the history
  • Loading branch information
hatchan committed Jul 15, 2024
1 parent d23c0ab commit c0458ba
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 18 deletions.
53 changes: 38 additions & 15 deletions fpx/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,32 @@ use std::collections::BTreeMap;
use std::fmt::Display;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{Mutex, MutexGuard};
use tracing::{error, instrument};

pub mod migrations;
pub mod models;
#[cfg(test)]
mod tests;

pub struct Transaction(libsql::Transaction);
pub struct Transaction<'a> {
tx: libsql::Transaction,
guard: MutexGuard<'a, ()>,
}

impl Deref for Transaction {
impl<'a> Transaction<'a> {
pub fn new(tx: libsql::Transaction, guard: MutexGuard<'a, ()>) -> Self {
Self { tx, guard }
}
}

impl<'a> Deref for Transaction<'a> {
type Target = libsql::Transaction;

fn deref(&self) -> &Self::Target {
&self.0
&self.tx
}
}

Expand All @@ -32,6 +43,7 @@ impl Deref for Transaction {
/// not have to do do anything there.
#[derive(Clone)]
pub struct Store {
lock: Arc<Mutex<()>>,
connection: Connection,
}

Expand Down Expand Up @@ -72,28 +84,39 @@ impl Store {
.connect()
.with_context(|| format!("failed to connect to libSQL database: {}", path))?;

Ok(Store { connection })
Ok(Store {
connection,
lock: Arc::new(Mutex::new(())),
})
}

pub async fn in_memory() -> Result<Self> {
Self::open(DataPath::InMemory).await
}

pub async fn start_transaction(&self) -> Result<Transaction, DbError> {
self.connection
let guard = self.lock.lock().await;

let tx = self
.connection
.transaction()
.await
.map(Transaction)
.map_err(DbError::InternalError)
.map_err(DbError::InternalError)?;

Ok(Transaction::new(tx, guard))
}

pub async fn commit_transaction(&self, tx: Transaction) -> Result<(), DbError> {
tx.0.commit().await.map_err(DbError::InternalError)
pub async fn commit_transaction(&self, tx: Transaction<'_>) -> Result<(), DbError> {
let result = tx.tx.commit().await.map_err(DbError::InternalError);

drop(tx.guard);

result
}

#[tracing::instrument(skip_all)]
pub async fn request_create(
tx: &Transaction,
tx: &Transaction<'_>,
method: &str,
url: &str,
body: &str,
Expand All @@ -115,12 +138,12 @@ impl Store {
}

#[tracing::instrument(skip_all)]
pub async fn request_list(_tx: &Transaction) -> Result<Vec<Request>> {
pub async fn request_list(_tx: &Transaction<'_>) -> Result<Vec<Request>> {
todo!()
}

#[tracing::instrument(skip_all)]
pub async fn request_get(&self, tx: &Transaction, id: i64) -> Result<Request, DbError> {
pub async fn request_get(&self, tx: &Transaction<'_>, id: i64) -> Result<Request, DbError> {
let request: models::Request = tx
.query("SELECT * FROM requests WHERE id = ?", params!(id))
.await?
Expand All @@ -135,7 +158,7 @@ impl Store {
#[instrument(skip(self, tx, span))]
pub async fn span_create(
&self,
tx: &Transaction,
tx: &Transaction<'_>,
span: models::Span,
) -> Result<models::Span, DbError> {
let span = tx
Expand Down Expand Up @@ -194,7 +217,7 @@ impl Store {
#[instrument(skip(self, tx))]
pub async fn span_get(
&self,
tx: &Transaction,
tx: &Transaction<'_>,
trace_id: Vec<u8>,
span_id: Vec<u8>,
) -> Result<models::Span, DbError> {
Expand All @@ -214,7 +237,7 @@ impl Store {
#[instrument(skip(self, tx))]
pub async fn span_list_by_trace(
&self,
tx: &Transaction,
tx: &Transaction<'_>,
trace_id: Vec<u8>,
) -> Result<Vec<models::Span>, DbError> {
let span = tx
Expand Down
4 changes: 2 additions & 2 deletions fpx/src/data/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub async fn migrate(store: &Store) -> Result<()> {
}

/// Create the new migrations table if it does not exist.
async fn migrations_bootstrap(tx: &Transaction) -> Result<()> {
async fn migrations_bootstrap(tx: &Transaction<'_>) -> Result<()> {
// First check if the migrations table exist
let sql = "SELECT name FROM sqlite_master WHERE type='table' AND name='_fpx_migrations'";
let mut results = tx
Expand Down Expand Up @@ -109,7 +109,7 @@ async fn migrations_bootstrap(tx: &Transaction) -> Result<()> {
}

/// List already applied migrations.
async fn migrations_list(tx: &Transaction) -> Result<Vec<String>> {
async fn migrations_list(tx: &Transaction<'_>) -> Result<Vec<String>> {
let mut results = vec![];

let sql = "SELECT name, created_at FROM _fpx_migrations ORDER BY name ASC";
Expand Down
41 changes: 41 additions & 0 deletions fpx/src/data/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use crate::models::SpanKind;
use libsql::params;
use serde::Deserialize;
use std::collections::BTreeMap;
use std::time::Duration;

Check warning on line 7 in fpx/src/data/tests.rs

View workflow job for this annotation

GitHub Actions / Create binary for x86_64-unknown-linux-gnu

unused import: `std::time::Duration`
use test_log::test;
use tokio::join;
use tokio::time::sleep;

Check warning on line 10 in fpx/src/data/tests.rs

View workflow job for this annotation

GitHub Actions / Create binary for x86_64-unknown-linux-gnu

unused import: `tokio::time::sleep`
use tracing::info;

/// Initialize a in memory database, and run the migrations on it.
async fn create_test_database() -> Store {
Expand Down Expand Up @@ -99,3 +103,40 @@ async fn test_create_span() {
let span = store.span_create(&tx, span).await.unwrap();
assert_eq!(span.kind, SpanKind::Server);
}

#[test(tokio::test(flavor = "multi_thread"))]
async fn test_concurrent_transactions() {
let store = create_test_database().await;

let store1 = store.clone();
let tx1_task = tokio::spawn(async move {
info!("Starting tx1");
let tx1 = store1
.start_transaction()
.await
.expect("Failed to start tx1");
// sleep(Duration::from_secs(1)).await;
store1
.commit_transaction(tx1)
.await
.expect("Failed to commit tx1");
});

let store2 = store.clone();
let tx2_task = tokio::spawn(async move {
info!("Starting tx2");
let tx2 = store2
.start_transaction()
.await
.expect("Failed to start tx2");
// sleep(Duration::from_secs(1)).await;
store2
.commit_transaction(tx2)
.await
.expect("Failed to commit tx2");
});

let (result1, result2) = join!(tx1_task, tx2_task);
assert!(result1.is_ok());
assert!(result2.is_ok());
}
2 changes: 1 addition & 1 deletion scripts/otel_collector/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ service:
pipelines:
traces:
receivers: [otlp]
exporters: [debug, otlphttp]
exporters: [debug, otlphttp, otlp]

telemetry:
logs:
Expand Down

0 comments on commit c0458ba

Please sign in to comment.