diff --git a/Cargo.lock b/Cargo.lock index 08c8f6c1a..4c5f1276a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,7 +94,7 @@ dependencies = [ "polling", "rustix 0.37.23", "slab", - "socket2", + "socket2 0.4.9", "waker-fn", ] @@ -146,6 +146,55 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.68" @@ -475,7 +524,7 @@ checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" dependencies = [ "errno-dragonfly", "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -534,6 +583,15 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + [[package]] name = "futures" version = "0.3.28" @@ -705,7 +763,7 @@ dependencies = [ "signal-hook", "sketches-ddsketch", "smallvec", - "socket2", + "socket2 0.4.9", "tracing", "typenum", ] @@ -769,6 +827,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" + [[package]] name = "httparse" version = "1.8.0" @@ -798,6 +862,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", + "socket2 0.4.9", "tokio", "tower-service", "tracing", @@ -858,7 +923,7 @@ checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" dependencies = [ "hermit-abi", "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -869,7 +934,7 @@ checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi", "rustix 0.38.4", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -898,11 +963,12 @@ dependencies = [ [[package]] name = "laminarmq" -version = "0.0.5-rc2" +version = "0.0.5" dependencies = [ "async-io", "async-stream", "async-trait", + "axum", "bincode", "byteorder", "bytes", @@ -923,6 +989,8 @@ dependencies = [ "route-recognizer", "serde", "tokio", + "tower", + "tower-http", "tower-service", "tracing", "tracing-subscriber", @@ -936,9 +1004,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.147" +version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" [[package]] name = "linux-raw-sys" @@ -977,6 +1045,21 @@ version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.5.0" @@ -1019,6 +1102,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "miniz_oxide" version = "0.7.1" @@ -1030,13 +1119,13 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.8" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", "wasi", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1238,9 +1327,15 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-targets", + "windows-targets 0.48.1", ] +[[package]] +name = "percent-encoding" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" + [[package]] name = "pin-project" version = "1.1.2" @@ -1263,9 +1358,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.10" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" [[package]] name = "pin-utils" @@ -1314,7 +1409,7 @@ dependencies = [ "libc", "log", "pin-project-lite", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1341,9 +1436,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.64" +version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78803b62cbf1f46fde80d7c0e803111524b9877184cfe7c3033659490ac7a7da" +checksum = "0b33eb56c327dec362a9e55b3ad14f9d2f0904fb5a5b03b513ab5465399e9f43" dependencies = [ "unicode-ident", ] @@ -1359,9 +1454,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.29" +version = "1.0.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "573015e8ab27661678357f27dc26460738fd2b6c86e46f386fde94cb5d913105" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" dependencies = [ "proc-macro2", ] @@ -1405,8 +1500,17 @@ checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.3.3", + "regex-syntax 0.7.4", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -1417,9 +1521,15 @@ checksum = "39354c10dd07468c2e73926b23bb9c2caca74c5501e38a35da70406f1d923310" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.7.4", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.7.4" @@ -1476,7 +1586,7 @@ dependencies = [ "io-lifetimes", "libc", "linux-raw-sys 0.3.8", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1489,9 +1599,15 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.3", - "windows-sys", + "windows-sys 0.48.0", ] +[[package]] +name = "rustversion" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" + [[package]] name = "ryu" version = "1.0.14" @@ -1521,18 +1637,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "serde" -version = "1.0.171" +version = "1.0.202" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9" +checksum = "226b61a0d411b2ba5ff6d7f73a476ac4f8bb900373459cd00fab8512828ba395" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.171" +version = "1.0.202" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682" +checksum = "6048858004bcff69094cd972ed40a32500f153bd3be9f716b2eed2e8217c4838" dependencies = [ "proc-macro2", "quote", @@ -1550,6 +1666,28 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sharded-slab" version = "0.1.4" @@ -1609,6 +1747,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "spin" version = "0.9.8" @@ -1661,15 +1809,21 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.25" +version = "2.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15e3fc8c0c74267e2df136e5e5fb656a464158aa57624053375eb9c8c6e25ae2" +checksum = "c42f3f41a2de00b01c0aaad383c5a45241efc8b2d1eda5661812fda5f3cdcff5" dependencies = [ "proc-macro2", "quote", "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "tempfile" version = "3.6.0" @@ -1681,7 +1835,7 @@ dependencies = [ "fastrand", "redox_syscall", "rustix 0.37.23", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1726,27 +1880,27 @@ dependencies = [ [[package]] name = "tokio" -version = "1.29.1" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ - "autocfg", "backtrace", "bytes", "libc", "mio", "num_cpus", "pin-project-lite", - "socket2", + "signal-hook-registry", + "socket2 0.5.7", "tokio-macros", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] name = "tokio-macros" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", @@ -1767,6 +1921,47 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" +dependencies = [ + "bitflags 2.3.3", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -1780,6 +1975,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -1823,10 +2019,14 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] @@ -1999,7 +2199,16 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.1", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.5", ] [[package]] @@ -2008,13 +2217,29 @@ version = "0.48.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05d4b17490f70499f20b9e791dcf6a299785ce8af4d709018206dc5b4953e95f" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.0", + "windows_aarch64_msvc 0.48.0", + "windows_i686_gnu 0.48.0", + "windows_i686_msvc 0.48.0", + "windows_x86_64_gnu 0.48.0", + "windows_x86_64_gnullvm 0.48.0", + "windows_x86_64_msvc 0.48.0", +] + +[[package]] +name = "windows-targets" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" +dependencies = [ + "windows_aarch64_gnullvm 0.52.5", + "windows_aarch64_msvc 0.52.5", + "windows_i686_gnu 0.52.5", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.5", + "windows_x86_64_gnu 0.52.5", + "windows_x86_64_gnullvm 0.52.5", + "windows_x86_64_msvc 0.52.5", ] [[package]] @@ -2023,38 +2248,86 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" + [[package]] name = "windows_aarch64_msvc" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" + [[package]] name = "windows_i686_gnu" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" +[[package]] +name = "windows_i686_gnu" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" + [[package]] name = "windows_i686_msvc" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" +[[package]] +name = "windows_i686_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" + [[package]] name = "windows_x86_64_gnu" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" + [[package]] name = "windows_x86_64_msvc" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" diff --git a/Cargo.toml b/Cargo.toml index bd02dd5dc..12ab2b89c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ license = "MIT" categories = ["web-programming"] keywords = ["message-queue", "distributed-systems", "segmented-log", "io-uring"] exclude = [".github/", "assets/"] -version = "0.0.5-rc2" +version = "0.0.5" edition = "2021" rust-version = "1.62" @@ -55,6 +55,15 @@ bench = false rlimit = "0.10.1" criterion = { version = "0.5", features = ["html_reports", "async_futures", "async_tokio"] } pprof = { version = "0.12", features = ["flamegraph", "criterion"] } +axum = "0.6.20" +crc32fast = "1.3.2" +hyper = "0.14.27" +serde = { version = "1.0.188", features = ["derive"] } +tokio = { version = "1.32.0", features = ["rt", "rt-multi-thread", "sync", "net", "fs", "signal"] } +tower = { version = "0.4.13", features = ["util", "timeout"] } +tower-http = { version = "0.4.4", features = ["add-extension", "trace"] } +tracing = "0.1.37" +tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } [[bench]] name = "commit_log_append" diff --git a/README.md b/README.md index 1738b351c..3b92abe22 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ to use `laminarmq` as a library, add the following to your `Cargo.toml`: ```toml [dependencies] -laminarmq = "0.0.5-rc2" +laminarmq = "0.0.5" ``` Refer to latest git [API Documentation](https://arindas.github.io/laminarmq/docs/laminarmq/) or diff --git a/examples/laminarmq-tokio-commit-log-server/README.md b/examples/laminarmq-tokio-commit-log-server/README.md new file mode 100644 index 000000000..b03eb3511 --- /dev/null +++ b/examples/laminarmq-tokio-commit-log-server/README.md @@ -0,0 +1,72 @@ +# laminarmq-tokio-commit-log-server + +A simple persistent commit log server using the tokio runtime. + +## Endpoints + +This server exposes the following HTTP endpoints: + +```rust +.route("/index_bounds", get(index_bounds)) // obtain the index bounds +.route("/records/:index", get(read)) // obtain the record at given index +.route("/records", post(append)) // append a new record at the end of the commit log + +.route("/rpc/truncate", post(truncate)) // truncate the commit log + // expects JSON: { "truncate_index": } + // records starting from truncate_index are removed +``` + +## Usage + +Run the server as follows: + +```sh +cargo run --example laminarmq-tokio-commit-log-server --release +``` + +The server optionally expects an environment variable: `STORAGE_DIRECTORY`. + +The default value is: + +```rust +const DEFAULT_STORAGE_DIRECTORY: &str = "./.storage/laminarmq_tokio_commit_log_server/commit_log"; +``` + +You may specify it as follows: + +```sh +STORAGE_DIRECTORY="" cargo run --release +``` + +Once the server is running you may make requests as follows: + +```sh +curl -w "\n" "http://127.0.0.1:3000/index_bounds" + +curl -w "\n" --request POST --data "Hello World" "http://127.0.0.1:3000/records" +curl -w "\n" --request POST --data "Moshi moshi" "http://127.0.0.1:3000/records" +curl -w "\n" --request POST --data "Bonjour <3" "http://127.0.0.1:3000/records" + +curl -w "\n" "http://127.0.0.1:3000/index_bounds" + +curl -w "\n" "http://127.0.0.1:3000/records/1" + +curl -w "\n" --header "Content-Type: application/json" --request POST \ + --data "{\"truncate_index\": 1}" \ + "http://127.0.0.1:3000/rpc/truncate" + +curl -w "\n" "http://127.0.0.1:3000/index_bounds" +``` + +Here's what's happening above: + +- First request find the index_bounds, (highest_index) is exclusive +- We append three records with the given data +- We lookup the current index_bounds after appending to the commit_log +- We read the record at index 1 +- We truncate the commit_log at index 1. All records starting from index 1 are + removed. After this operation the bounds are [0, 1) +- We lookup the current index_bounds after truncating the commit_log + +> Note: The `-w "\n"` flag is for appending a "\n" to the output of curl. This way +> the output is more readable. diff --git a/examples/laminarmq-tokio-commit-log-server/main.rs b/examples/laminarmq-tokio-commit-log-server/main.rs new file mode 100644 index 000000000..4775db5e9 --- /dev/null +++ b/examples/laminarmq-tokio-commit-log-server/main.rs @@ -0,0 +1,556 @@ +use axum::{ + error_handling::HandleErrorLayer, + extract::{Path, State}, + http::StatusCode, + response::{IntoResponse, Response}, + routing::{get, post}, + Json, Router, +}; +use hyper::{Body, Request}; + +extern crate laminarmq; + +use laminarmq::{ + common::{cache::NoOpCache, serde_compat::bincode}, + storage::{ + commit_log::{ + segmented_log::{segment::Config as SegmentConfig, Config, MetaWithIdx, SegmentedLog}, + CommitLog, Record, + }, + impls::{ + common::DiskBackedSegmentStorageProvider, + in_mem::{segment::InMemSegmentStorageProvider, storage::InMemStorage}, + tokio::storage::std_seek_read::{ + StdSeekReadFileStorage, StdSeekReadFileStorageProvider, + }, + }, + }, +}; +use serde::{Deserialize, Serialize}; +use std::{ + env, + fmt::Debug, + future::Future, + io, + net::SocketAddr, + rc::Rc, + thread::{self, JoinHandle}, + time::Duration, +}; +use tokio::{ + signal, + sync::{mpsc, oneshot, AcquireError, RwLock, Semaphore}, + task, +}; +use tower::{BoxError, ServiceBuilder}; +use tower_http::trace::TraceLayer; +use tracing::{error, error_span, info, info_span, Instrument}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +pub type InMemSegLog = SegmentedLog< + InMemStorage, + (), + crc32fast::Hasher, + u32, + usize, + bincode::BinCode, + InMemSegmentStorageProvider, + NoOpCache, +>; + +#[allow(unused)] +#[derive(Clone)] +struct AppState { + message_tx: mpsc::Sender, +} + +#[derive(Debug)] +pub enum ChannelError { + SendError, + RecvError, +} + +impl AppState { + pub async fn enqueue_request( + &self, + request: AppRequest, + ) -> Result, ChannelError> { + let (resp_tx, resp_rx) = oneshot::channel(); + + let message = Message::Connection { resp_tx, request }; + + self.message_tx + .send(message) + .await + .map_err(|_| ChannelError::SendError)?; + + Ok(resp_rx) + } +} + +pub struct CommitLogServerConfig { + message_buffer_size: usize, + max_connections: usize, +} + +#[allow(unused)] +const IN_MEMORY_SEGMENTED_LOG_CONFIG: Config = Config { + segment_config: SegmentConfig { + max_store_size: 1048576, // = 1MiB + max_store_overflow: 524288, + max_index_size: 1048576, + }, + initial_index: 0, + num_index_cached_read_segments: None, +}; + +const PERSISTENT_SEGMENTED_LOG_CONFIG: Config = Config { + segment_config: SegmentConfig { + max_store_size: 10000000, // ~ 10MB + max_store_overflow: 10000000 / 2, + max_index_size: 10000000, + }, + initial_index: 0, + num_index_cached_read_segments: None, +}; + +const DEFAULT_STORAGE_DIRECTORY: &str = "./.storage/laminarmq_tokio_commit_log_server/commit_log"; + +#[tokio::main] +async fn main() { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { + "laminarmq_tokio_commit_log_server=debug,tower_http=debug".into() + }), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + + let storage_directory = + env::var("STORAGE_DIRECTORY").unwrap_or(DEFAULT_STORAGE_DIRECTORY.into()); + + let (join_handle, message_tx) = CommitLogServer::orchestrate( + CommitLogServerConfig { + message_buffer_size: 1024, + max_connections: 512, + }, + || async { + let disk_backed_storage_provider = + DiskBackedSegmentStorageProvider::<_, _, u32>::with_storage_directory_path_and_provider( + storage_directory, + StdSeekReadFileStorageProvider, + ) + .unwrap(); + + SegmentedLog::< + StdSeekReadFileStorage, + (), + crc32fast::Hasher, + u32, + u64, + bincode::BinCode, + _, + NoOpCache, + >::new( + PERSISTENT_SEGMENTED_LOG_CONFIG, + disk_backed_storage_provider, + ) + .await + .unwrap() + }, + ); + + // Compose the routes + let app = Router::new() + .route("/index_bounds", get(index_bounds)) + .route("/records/:index", get(read)) + .route("/records", post(append)) + .route("/rpc/truncate", post(truncate)) + // Add middleware to all routes + .layer( + ServiceBuilder::new() + .layer(HandleErrorLayer::new(|error: BoxError| async move { + if error.is::() { + Ok(StatusCode::REQUEST_TIMEOUT) + } else { + Err(( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Unhandled internal error: {}", error), + )) + } + })) + .timeout(Duration::from_secs(10)) + .layer(TraceLayer::new_for_http()) + .into_inner(), + ) + .with_state(AppState { + message_tx: message_tx.clone(), + }); + + let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); + + tracing::debug!("listening on {}", addr); + + hyper::Server::bind(&addr) + .serve(app.into_make_service()) + .with_graceful_shutdown(shutdown_signal()) + .await + .unwrap(); + + message_tx.send(Message::Terminate).await.unwrap(); + + tokio::task::spawn_blocking(|| join_handle.join()) + .await + .unwrap() + .unwrap() + .unwrap(); + + info!("Exiting application."); +} + +async fn shutdown_signal() { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => {}, + _ = terminate => {}, + } + + tracing::info!("signal received, starting graceful shutdown"); +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct IndexBoundsResponse { + highest_index: u32, + lowest_index: u32, +} + +pub struct StringError(String); + +impl From for StringError { + fn from(value: String) -> Self { + Self(value) + } +} + +impl IntoResponse for StringError { + fn into_response(self) -> Response { + (StatusCode::INTERNAL_SERVER_ERROR, self.0).into_response() + } +} + +async fn index_bounds( + State(state): State, +) -> Result, StringError> { + let resp_rx = state + .enqueue_request(AppRequest::IndexBounds) + .await + .map_err(|err| format!("error sending request to commit_log_server: {:?}", err))?; + + let response = resp_rx + .await + .map_err(|err| format!("error receiving response: {:?}", err))??; + + if let AppResponse::IndexBounds(index_bounds_response) = response { + Ok(Json(index_bounds_response)) + } else { + Err(StringError("invalid response type".into())) + } +} + +async fn read( + Path(index): Path, + State(state): State, +) -> Result, StringError> { + let resp_rx = state + .enqueue_request(AppRequest::Read { index }) + .await + .map_err(|err| format!("error sending request to commit_log_server: {:?}", err))?; + + let response = resp_rx + .await + .map_err(|err| format!("error receiving response: {:?}", err))??; + + if let AppResponse::Read { record_value } = response { + Ok(record_value) + } else { + Err(StringError("invalid response type".into())) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct AppendResponse { + write_index: u32, +} + +async fn append( + State(state): State, + request: Request, +) -> Result, StringError> { + let resp_rx = state + .enqueue_request(AppRequest::Append { + record_value: request.into_body(), + }) + .await + .map_err(|err| format!("error sending request to commit_log_server: {:?}", err))?; + + let response = resp_rx + .await + .map_err(|err| format!("error receiving reponse: {:?}", err))??; + + if let AppResponse::Append(append_reponse) = response { + Ok(Json(append_reponse)) + } else { + Err(StringError("invalid response type".into())) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct TruncateRequest { + truncate_index: u32, +} + +async fn truncate( + State(state): State, + Json(truncate_request): Json, +) -> Result<(), StringError> { + let resp_rx = state + .enqueue_request(AppRequest::Truncate(truncate_request)) + .await + .map_err(|err| format!("error sending request to commit_log_server: {:?}", err))?; + + let response = resp_rx + .await + .map_err(|err| format!("error receiving response: {:?}", err))??; + + if let AppResponse::Truncate = response { + Ok(()) + } else { + Err(StringError("invalid response type".into())) + } +} + +#[derive(Debug)] +pub enum AppResponse { + IndexBounds(IndexBoundsResponse), + Read { record_value: Vec }, + Append(AppendResponse), + Truncate, +} + +#[derive(Debug)] +pub enum AppRequest { + IndexBounds, + Read { index: u32 }, + Append { record_value: Body }, + Truncate(TruncateRequest), +} + +type ResponseResult = Result; + +pub enum Message { + Connection { + resp_tx: oneshot::Sender, + request: AppRequest, + }, + + Terminate, +} + +#[allow(unused)] +pub struct CommitLogServer { + message_rx: mpsc::Receiver, + commit_log: CL, + max_connections: usize, +} + +impl CommitLogServer { + pub fn new( + message_rx: mpsc::Receiver, + commit_log: CL, + max_connections: usize, + ) -> Self { + Self { + message_rx, + commit_log, + max_connections, + } + } +} + +#[derive(Debug)] +pub enum CommitLogServerError { + ConnPermitAcquireError(AcquireError), + CommitLogError(CLE), + IoError(io::Error), + ResponseSendError, +} + +pub type CommitLogServerResult = Result>; + +impl CommitLogServer +where + CL: CommitLog, Vec, Idx = u32> + 'static, +{ + pub async fn handle_request( + commit_log: Rc>, + request: AppRequest, + ) -> Result> { + match request { + AppRequest::IndexBounds => { + let commit_log = commit_log.read().await; + + Ok(AppResponse::IndexBounds(IndexBoundsResponse { + highest_index: commit_log.highest_index(), + lowest_index: commit_log.lowest_index(), + })) + } + + AppRequest::Read { index: idx } => commit_log + .read() + .await + .read(&idx) + .await + .map(|Record { metadata: _, value }| AppResponse::Read { + record_value: value, + }) + .map_err(CommitLogServerError::CommitLogError), + + AppRequest::Append { record_value } => commit_log + .write() + .await + .append(Record { + metadata: MetaWithIdx { + metadata: (), + index: None, + }, + value: record_value, + }) + .await + .map(|write_index| AppResponse::Append(AppendResponse { write_index })) + .map_err(CommitLogServerError::CommitLogError), + + AppRequest::Truncate(TruncateRequest { + truncate_index: idx, + }) => commit_log + .write() + .await + .truncate(&idx) + .await + .map(|_| AppResponse::Truncate) + .map_err(CommitLogServerError::CommitLogError), + } + } + + pub async fn serve(self) { + let (mut message_rx, commit_log, max_connections) = + (self.message_rx, self.commit_log, self.max_connections); + + let conn_semaphore = Rc::new(Semaphore::new(max_connections)); + let commit_log = Rc::new(RwLock::new(commit_log)); + + let commit_log_copy = commit_log.clone(); + + let local = task::LocalSet::new(); + + local + .run_until(async move { + while let Some(Message::Connection { resp_tx, request }) = message_rx.recv().await { + let (conn_semaphore, commit_log_copy) = + (conn_semaphore.clone(), commit_log_copy.clone()); + + task::spawn_local( + async move { + let response = async move { + let _semaphore_permit = conn_semaphore + .acquire() + .await + .map_err(CommitLogServerError::ConnPermitAcquireError)?; + + let commit_log = commit_log_copy; + + let response = Self::handle_request(commit_log, request).await?; + + Ok::<_, CommitLogServerError>(response) + } + .await + .map_err(|err| format!("{:?}", err)); + + if let Err(err) = resp_tx.send(response) { + error!("error sending response: {:?}", err) + } + } + .instrument(error_span!("commit_log_server_handler_task")), + ); + } + }) + .await; + + match Rc::into_inner(commit_log) { + Some(commit_log) => match commit_log.into_inner().close().await { + Ok(_) => {} + Err(err) => error!("error closing commit_log: {:?}", err), + }, + None => error!("unable to unrwap commit_log Rc"), + }; + + info!("Closed commit_log."); + } + + pub fn orchestrate( + server_config: CommitLogServerConfig, + commit_log_provider: CLP, + ) -> (JoinHandle>, mpsc::Sender) + where + CLP: FnOnce() -> CLF + Send + 'static, + CLF: Future, + CL::Error: Send + 'static, + { + let CommitLogServerConfig { + message_buffer_size, + max_connections, + } = server_config; + + let (message_tx, message_rx) = mpsc::channel::(message_buffer_size); + + ( + thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread().build()?; + + rt.block_on( + async move { + let commit_log_server = CommitLogServer::new( + message_rx, + commit_log_provider().await, + max_connections, + ); + + commit_log_server.serve().await; + + info!("Done serving requests."); + } + .instrument(info_span!("commit_log_server")), + ); + + Ok(()) + }), + message_tx, + ) + } +}