Skip to content

Commit

Permalink
DocumentDB support (#706)
Browse files Browse the repository at this point in the history
* insert event draft

* abstract change event

* Added documentdb delete event

* Added support to change event drop

* Added support to dropDatabase Event

* - MongoDB v6.0 Change Event Fields removed
- ChangeEvent enum tagged
- AnyDocument common type created

* replace event support

* added support to invalidate event in documentdb

* Adding DocumentDB Rename event.

* run cargo fmt

* Excluding 'to' parameter

* Add DocumentDB Update event

* fixed 'to' parameter and run cargo fmt

* Refactoring 'Rename' event declaration as a single type not a commum type

* InsertNs renamed to DatabaseCollection for code reuse

* unused field removed

* cfg fix

* fix lines

* fmt and makefile fixed

* makefile reord

---------

Co-authored-by: nich.morgan <[email protected]>
Co-authored-by: erso <[email protected]>
Co-authored-by: Luca Barcelos <[email protected]>
Co-authored-by: Vinicius Brisotti <[email protected]>
Co-authored-by: Pedro Rabello Sato <[email protected]>
Co-authored-by: darwish <[email protected]>
  • Loading branch information
7 people authored Oct 19, 2023
1 parent bcd3f97 commit 2675fa9
Show file tree
Hide file tree
Showing 23 changed files with 533 additions and 1 deletion.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ check-event-features:
cargo test --package aws_lambda_events --no-default-features --features cognito
cargo test --package aws_lambda_events --no-default-features --features config
cargo test --package aws_lambda_events --no-default-features --features connect
cargo test --package aws_lambda_events --no-default-features --features documentdb
cargo test --package aws_lambda_events --no-default-features --features dynamodb
cargo test --package aws_lambda_events --no-default-features --features ecr_scan
cargo test --package aws_lambda_events --no-default-features --features eventbridge
cargo test --package aws_lambda_events --no-default-features --features firehose
cargo test --package aws_lambda_events --no-default-features --features iam
cargo test --package aws_lambda_events --no-default-features --features iot
Expand All @@ -101,7 +103,6 @@ check-event-features:
cargo test --package aws_lambda_events --no-default-features --features sns
cargo test --package aws_lambda_events --no-default-features --features sqs
cargo test --package aws_lambda_events --no-default-features --features streams
cargo test --package aws_lambda_events --no-default-features --features eventbridge

fmt:
cargo +nightly fmt --all
2 changes: 2 additions & 0 deletions lambda-events/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ default = [
"sns",
"sqs",
"streams",
"documentdb",
"eventbridge",
]

Expand Down Expand Up @@ -118,4 +119,5 @@ ses = ["chrono"]
sns = ["chrono", "serde_with"]
sqs = ["serde_with"]
streams = []
documentdb = []
eventbridge = ["chrono", "serde_with"]
44 changes: 44 additions & 0 deletions lambda-events/src/event/documentdb/events/commom_types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use std::collections::HashMap;

use serde::{Deserialize, Serialize};
use serde_json::Value;

pub type AnyDocument = HashMap<String, Value>;

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DatabaseCollection {
db: String,
#[serde(default)]
coll: Option<String>,
}

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct DocumentId {
#[serde(rename = "_data")]
pub data: String,
}

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct DocumentKeyIdOid {
#[serde(rename = "$oid")]
pub oid: String,
}

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct DocumentKeyId {
#[serde(rename = "_id")]
pub id: DocumentKeyIdOid,
}

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct InnerTimestamp {
t: usize,
i: usize,
}

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct Timestamp {
#[serde(rename = "$timestamp")]
pub timestamp: InnerTimestamp,
}
20 changes: 20 additions & 0 deletions lambda-events/src/event/documentdb/events/delete_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use serde::{Deserialize, Serialize};

use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, DocumentKeyId, Timestamp};

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChangeDeleteEvent {
#[serde(rename = "_id")]
id: DocumentId,
#[serde(default)]
cluster_time: Option<Timestamp>,
document_key: DocumentKeyId,
#[serde(default)]
#[serde(rename = "lsid")]
ls_id: Option<AnyDocument>,
ns: DatabaseCollection,
// operation_type: String,
#[serde(default)]
txn_number: Option<String>,
}
17 changes: 17 additions & 0 deletions lambda-events/src/event/documentdb/events/drop_database_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use serde::{Deserialize, Serialize};

use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, Timestamp};

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChangeDropDatabaseEvent {
#[serde(rename = "_id")]
id: DocumentId,
cluster_time: Timestamp,
#[serde(rename = "lsid")]
ls_id: Option<AnyDocument>,
ns: DatabaseCollection,
// operation_type: String,
#[serde(default)]
txn_number: Option<String>,
}
17 changes: 17 additions & 0 deletions lambda-events/src/event/documentdb/events/drop_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, Timestamp};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChangeDropEvent {
#[serde(rename = "_id")]
id: DocumentId,
cluster_time: Timestamp,
#[serde(default)]
#[serde(rename = "lsid")]
ls_id: Option<AnyDocument>,
ns: DatabaseCollection,
// operation_type: String,
#[serde(default)]
txn_number: Option<String>,
}
21 changes: 21 additions & 0 deletions lambda-events/src/event/documentdb/events/insert_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use serde::{Deserialize, Serialize};

use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, DocumentKeyId, Timestamp};

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]

pub struct ChangeInsertEvent {
#[serde(rename = "_id")]
id: DocumentId,
#[serde(default)]
cluster_time: Option<Timestamp>,
document_key: DocumentKeyId,
#[serde(default)]
#[serde(rename = "lsid")]
ls_id: Option<String>,
ns: DatabaseCollection,
//operation_type: String,
#[serde(default)]
txn_number: Option<AnyDocument>,
}
13 changes: 13 additions & 0 deletions lambda-events/src/event/documentdb/events/invalidate_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use serde::{Deserialize, Serialize};

use super::commom_types::{DocumentId, Timestamp};

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChangeInvalidateEvent {
#[serde(rename = "_id")]
id: DocumentId,
#[serde(default)]
cluster_time: Option<Timestamp>,
// operation_type: String,
}
9 changes: 9 additions & 0 deletions lambda-events/src/event/documentdb/events/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
pub mod commom_types;
pub mod delete_event;
pub mod drop_database_event;
pub mod drop_event;
pub mod insert_event;
pub mod invalidate_event;
pub mod rename_event;
pub mod replace_event;
pub mod update_event;
21 changes: 21 additions & 0 deletions lambda-events/src/event/documentdb/events/rename_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use serde::{Deserialize, Serialize};

use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, Timestamp};

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChangeRenameEvent {
#[serde(rename = "_id")]
id: DocumentId,
#[serde(default)]
cluster_time: Option<Timestamp>,

#[serde(default)]
#[serde(rename = "lsid")]
ls_id: Option<AnyDocument>,
ns: DatabaseCollection,
//operation_type: String,
#[serde(default)]
txn_number: Option<String>,
to: DatabaseCollection,
}
20 changes: 20 additions & 0 deletions lambda-events/src/event/documentdb/events/replace_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use serde::{Deserialize, Serialize};

use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, DocumentKeyId, Timestamp};

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChangeReplaceEvent {
#[serde(rename = "_id")]
id: DocumentId,
#[serde(default)]
cluster_time: Option<Timestamp>,
document_key: DocumentKeyId,
#[serde(default)]
#[serde(rename = "lsid")]
ls_id: Option<String>,
ns: DatabaseCollection,
// operation_type: String,
#[serde(default)]
txn_number: Option<AnyDocument>,
}
19 changes: 19 additions & 0 deletions lambda-events/src/event/documentdb/events/update_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use serde::{Deserialize, Serialize};

use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, DocumentKeyId, Timestamp};

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChangeUpdateEvent {
#[serde(rename = "_id")]
id: DocumentId,
#[serde(default)]
cluster_time: Option<Timestamp>,
document_key: DocumentKeyId,
#[serde(rename = "lsid")]
ls_id: Option<String>,
ns: DatabaseCollection,
// operation_type: String,
#[serde(default)]
txn_number: Option<AnyDocument>,
}
96 changes: 96 additions & 0 deletions lambda-events/src/event/documentdb/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
pub mod events;

use self::events::{
delete_event::ChangeDeleteEvent, drop_database_event::ChangeDropDatabaseEvent, drop_event::ChangeDropEvent,
insert_event::ChangeInsertEvent, invalidate_event::ChangeInvalidateEvent, rename_event::ChangeRenameEvent,
replace_event::ChangeReplaceEvent, update_event::ChangeUpdateEvent,
};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "operationType", rename_all = "camelCase")]
pub enum ChangeEvent {
Insert(ChangeInsertEvent),
Delete(ChangeDeleteEvent),
Drop(ChangeDropEvent),
DropDatabase(ChangeDropDatabaseEvent),
Invalidate(ChangeInvalidateEvent),
Replace(ChangeReplaceEvent),
Update(ChangeUpdateEvent),
Rename(ChangeRenameEvent),
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct DocumentDbInnerEvent {
pub event: ChangeEvent,
}

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DocumentDbEvent {
#[serde(default)]
pub event_source_arn: Option<String>,
pub events: Vec<DocumentDbInnerEvent>,
#[serde(default)]
pub event_source: Option<String>,
}

#[cfg(test)]
#[cfg(feature = "documentdb")]
mod test {
use super::*;

pub type Event = DocumentDbEvent;

fn test_example(data: &[u8]) {
let parsed: Event = serde_json::from_slice(data).unwrap();
let output: String = serde_json::to_string(&parsed).unwrap();
let reparsed: Event = serde_json::from_slice(output.as_bytes()).unwrap();

assert_eq!(parsed, reparsed);
}

#[test]
fn example_documentdb_insert_event() {
test_example(include_bytes!("../../fixtures/example-documentdb-insert-event.json"));
}

#[test]
fn example_documentdb_delete_event() {
test_example(include_bytes!("../../fixtures/example-documentdb-delete-event.json"));
}

#[test]
fn example_documentdb_drop_event() {
test_example(include_bytes!("../../fixtures/example-documentdb-drop-event.json"));
}

#[test]
fn example_documentdb_replace_event() {
test_example(include_bytes!("../../fixtures/example-documentdb-replace-event.json"));
}

#[test]
fn example_documentdb_update_event() {
test_example(include_bytes!("../../fixtures/example-documentdb-update-event.json"));
}

#[test]
fn example_documentdb_rename_event() {
test_example(include_bytes!("../../fixtures/example-documentdb-rename-event.json"));
}

#[test]
fn example_documentdb_invalidate_event() {
test_example(include_bytes!(
"../../fixtures/example-documentdb-invalidate-event.json"
));
}

#[test]
fn example_documentdb_drop_database_event() {
test_example(include_bytes!(
"../../fixtures/example-documentdb-drop-database-event.json"
));
}
}
4 changes: 4 additions & 0 deletions lambda-events/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ pub mod sqs;
#[cfg(feature = "streams")]
pub mod streams;

// AWS Lambda event definitions for DocumentDB
#[cfg(feature = "documentdb")]
pub mod documentdb;

/// AWS Lambda event definitions for EventBridge.
#[cfg(feature = "eventbridge")]
pub mod eventbridge;
30 changes: 30 additions & 0 deletions lambda-events/src/fixtures/example-documentdb-delete-event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"eventSourceArn": "arn:aws:rds:us-east-1:123456789012:cluster:canaryclusterb2a659a2-qo5tcmqkcl03",
"events": [
{
"event": {
"_id": {
"_data": "0163eeb6e7000000090100000009000041e1"
},
"clusterTime": {
"$timestamp": {
"t": 1676588775,
"i": 9
}
},
"documentKey": {
"_id": {
"$oid": "63eeb6e7d418cd98afb1c1d7"
}
},
"ns": {
"db": "test_database",
"coll": "test_collection"
},
"operationType": "delete"
}
}
],
"eventSource": "aws:docdb"
}

Loading

0 comments on commit 2675fa9

Please sign in to comment.