Skip to content

Commit

Permalink
broken-pipe-fix & dependencies update (#166)
Browse files Browse the repository at this point in the history
* fix children to be fast and not create all tiles in memory

* fix children and clippy

* Bump tower-http from 0.5.2 to 0.6.0

Bumps [tower-http](https://github.com/tower-rs/tower-http) from 0.5.2 to 0.6.0.
- [Release notes](https://github.com/tower-rs/tower-http/releases)
- [Commits](tower-rs/tower-http@tower-http-0.5.2...tower-http-0.6.0)

---
updated-dependencies:
- dependency-name: tower-http
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <[email protected]>

* fix unused fn

* cargo update

* fix broken pipe problemos

* updated tower and other dependencies

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
  • Loading branch information
jessekrubin and dependabot[bot] authored Sep 26, 2024
1 parent c3efd0f commit 3ee3cd6
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 80 deletions.
80 changes: 21 additions & 59 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/utiles-oxipng/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ clap = { workspace = true, features = ["derive", "color", "wrap_help"] }
futures.workspace = true
indicatif.workspace = true
oxipng = { version = "9.1.2", features = [] }
#oxipng = { version = "9.1.2", features = ["parallel"] }
size.workspace = true
tokio = { workspace = true, features = ["fs"] }
tokio-stream.workspace = true
Expand Down
16 changes: 8 additions & 8 deletions crates/utiles/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ required-features = [
[dependencies]
anyhow.workspace = true
async-sqlite = { workspace = true, features = ["bundled", "functions", "trace", "blob"] }
async-trait = "0.1.80"
axum = { version = "0.7.5", features = ["tokio", "json", "macros"] }
async-trait = "0.1.83"
axum = { version = "0.7.6", features = ["tokio", "json", "macros"] }
chrono = "0.4.38"
clap = { workspace = true, features = ["derive", "color", "wrap_help"], optional = true }
fnv = "1.0.7"
futures = "0.3.29"
futures = "0.3.30"
geo-types.workspace = true
geojson.workspace = true
globset = "0.4.15"
hex = "0.4.3"
image = "0.25.1"
image = "0.25.2"
imagesize = "0.13.0"
indicatif.workspace = true
indoc = { workspace = true }
Expand All @@ -49,7 +49,7 @@ rusqlite = { workspace = true, features = ["bundled", "blob", "backup", "functio
serde.workspace = true
serde_json = { workspace = true, features = ["preserve_order"] }
size.workspace = true
sqlite-hashes = { version = "0.7.3", default-features = false, features = ["hex", "window", "md5", "fnv", "xxhash"] }
sqlite-hashes = { version = "0.7.5", default-features = false, features = ["hex", "window", "md5", "fnv", "xxhash"] }
strum.workspace = true
strum_macros.workspace = true
thiserror.workspace = true
Expand All @@ -58,11 +58,11 @@ time = "0.3.36"
tokio = { workspace = true, features = ["fs"] }
tokio-stream.workspace = true
tower = { version = "0.5.1", features = ["timeout"] }
tower-http = { version = "0.5.1", features = ["trace", "timeout", "add-extension", "util", "request-id", "compression-gzip", "compression-zstd", "async-compression"] }
tower-http = { version = "0.6.1", features = ["trace", "timeout", "add-extension", "util", "request-id", "compression-gzip", "compression-zstd", "async-compression"] }
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["fmt", "json", "env-filter", "chrono"] }
utiles-core = { path = "../utiles-core", version = "0.7.0-alpha.1" }
walkdir = "2.4.0"
utiles-core = { path = "../utiles-core", version = "0.7.0-alpha.9" }
walkdir = "2.5.0"
xxhash-rust = { workspace = true, features = ["const_xxh3", "const_xxh64", "const_xxh32", "xxh3", "xxh64", "xxh32"] }

# deadpool = { version = "0.12.1", features = ["managed"], default-features = false }
Expand Down
21 changes: 13 additions & 8 deletions crates/utiles/src/cli/commands/enumerate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::io::{BufWriter, Write};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tracing::{debug, error};
use tracing::debug;

async fn enumerate_db(
fspath: &str,
Expand All @@ -27,10 +27,10 @@ async fn enumerate_db(
let s = mbt.enumerate_rx(Some(&query))?;
let mut tiles = ReceiverStream::new(s);
while let Some(tile) = tiles.next().await {
// let tile_str = format!("{} {}", fspath, tile.json_arr());
let tile_str = tformatter.fmt_tile(&tile);
if let Err(e) = tx.send(tile_str).await {
return Err(crate::UtilesError::Error(format!("enumerate_db: {e:?}")));
debug!("recv dropped: {:?}", e);
break;
}
}
Ok(())
Expand All @@ -55,20 +55,25 @@ pub async fn enumerate_main(args: &EnumerateArgs) -> UtilesResult<()> {
let mut buf = BufWriter::with_capacity(32 * 1024, lock);
let mut count: usize = 0;
while let Some(tile_str) = rx.blocking_recv() {
buf.write_all(tile_str.as_bytes())?;
buf.write_all(b"\n")?;
let tile_str_newline = format!("{tile_str}\n");

if let Err(e) = buf.write_all(tile_str_newline.as_bytes()) {
debug!("write_all err: {:?}", e);
break;
}
count += 1;
if count % 1024 == 0 {
buf.flush()?;
if let Err(e) = buf.flush() {
error!("write_task: {:?}", e);
debug!("flushing err: {:?}", e);
break;
}
}
}

// flush remaining
buf.flush()?;
if let Err(e) = buf.flush() {
debug!("final flush err: {:?}", e);
}
Ok(())
});
let tfilter = args.filter_args.tiles_filter_maybe();
Expand Down
20 changes: 16 additions & 4 deletions crates/utiles/src/mbt/tiles_stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use rusqlite::Connection;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, warn};
use tracing::{debug, error, warn};

use utiles_core::prelude::*;

Expand Down Expand Up @@ -85,20 +85,32 @@ pub fn make_enumerate_rx(
let z_column = s.column_index("zoom_level")?;
let x_column = s.column_index("tile_column")?;
let y_column = s.column_index("tile_row")?;
let tx = tx.clone();
let tiles_iters = s.query_map(rusqlite::params![], |row| {
let z: u8 = row.get(z_column)?;
let x: u32 = row.get(x_column)?;
let yup: u32 = row.get(y_column)?;
let tile = utile_yup!(x, yup, z);
let tx = tx.clone();
if let Err(e) = tx.blocking_send(tile) {
warn!("Blocking send error: {:?}", e);
debug!("Blocking send error: {:?}", e);
Ok(false)
} else {
Ok(true)
}
Ok(())
})?;
// Consume the iterator
for row in tiles_iters {
let _ = row;
match row {
Ok(true) => {}
Ok(false) => {
break;
}
Err(e) => {
error!("enum tiles iter error: {:?}", e);
break;
}
}
}
Ok(())
})
Expand Down

0 comments on commit 3ee3cd6

Please sign in to comment.