Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConcurrentStream: issues with Send and Sync bounds #195

Open
sgued opened this issue Sep 29, 2024 · 2 comments
Open

ConcurrentStream: issues with Send and Sync bounds #195

sgued opened this issue Sep 29, 2024 · 2 comments
Assignees
Labels
help wanted Extra attention is needed

Comments

@sgued
Copy link

sgued commented Sep 29, 2024

There appears to be some invariance introduced by the IntoConcurrentStream and Map implementation that confuses the compiler for the Send and Sync autotraits

The following:

use futures_concurrency::concurrent_stream::{ConcurrentStream, IntoConcurrentStream};

fn assert_value_send_sync<T: Send + Sync>(_v: &T) {}

async fn iterative_method(context: &str) -> Vec<String> {
    let vec: Vec<u32> = Vec::new();
    let mut acc = Vec::new();
    for item in vec {
        acc.push(handle_item(&item, &context).await);
    }
    acc
}

async fn concurrent_method(context: &str) -> Vec<String> {
    let vec: Vec<u32> = Vec::new();
    vec.into_co_stream()
        .map(|d| async move { handle_item(&d, &*context).await })
        .collect()
        .await
}

async fn handle_item(item: &u32, context: &str) -> String {
    format!("{context}: {item}")
}

fn main() {
    let context = String::new();
    let iterative = async {
        iterative_method(&context).await;
    };
    assert_value_send_sync(&iterative);

    let concurrent = async {
        concurrent_method(&context).await;
    };
    assert_value_send_sync(&concurrent);
}

fails with the error:

rror[E0308]: mismatched types
  --> src/main.rs:36:5
   |
17 |         .map(|d| async move { handle_item(&d, &*context).await })
   |                  ----------
   |                  |
   |                  the expected `async` block
   |                  the found `async` block
...
36 |     assert_value_send_sync(&concurrent);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ one type is more general than the other
   |
   = note: expected `async` block `{async block@src/main.rs:17:18: 17:28}`
              found `async` block `{async block@src/main.rs:17:18: 17:28}`
   = note: no two async blocks, even if identical, have the same type
   = help: consider pinning your async block and casting it to a trait object
note: the lifetime requirement is introduced here
  --> src/main.rs:3:30
   |
3  | fn assert_value_send_sync<T: Send + Sync>(_v: &T) {}
   |                              ^^^^

error[E0308]: mismatched types
  --> src/main.rs:36:5
   |
17 |         .map(|d| async move { handle_item(&d, &*context).await })
   |                  ----------
   |                  |
   |                  the expected `async` block
   |                  the found `async` block
...
36 |     assert_value_send_sync(&concurrent);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ one type is more general than the other
   |
   = note: expected `async` block `{async block@src/main.rs:17:18: 17:28}`
              found `async` block `{async block@src/main.rs:17:18: 17:28}`
   = note: no two async blocks, even if identical, have the same type
   = help: consider pinning your async block and casting it to a trait object
note: the lifetime requirement is introduced here
  --> src/main.rs:3:37
   |
3  | fn assert_value_send_sync<T: Send + Sync>(_v: &T) {}
   |                                     ^^^^

For more information about this error, try `rustc --explain E0308`.
error: could not compile `future-concurrency-repro` (bin "future-concurrency-repro") due to 2 previous errors

I can't make a minimal reproducer, but I first encountered this error message, which I think it's the same issue. To me this suggests this might even be a bug in rustc. Even if it isn't a bug, the error message makes no sense and should be fixed.

error: implementation of `Send` is not general enough
   --> src/main.rs:348:34
    |
348 |         .route("/dashboard/:id", post(dashboard_post))
    |                                  ^^^^^^^^^^^^^^^^^^^^ implementation of `Send` is not general enough
    |
    = note: `Send` would have to be implemented for the type `&str`
    = note: ...but `Send` is actually implemented for the type `&'0 str`, for some specific lifetime `'0`
@yoshuawuyts
Copy link
Owner

Thanks for raising this! Yeah having a minimal reproducer for this would be helpful. I'm also a little short on time so diving into this is a little tricky right now for me too.

Minimizing this test case would actually be a really good contribution if someone wants to make one!

@yoshuawuyts yoshuawuyts added the help wanted Extra attention is needed label Nov 20, 2024
@matheus-consoli matheus-consoli self-assigned this Dec 5, 2024
@matheus-consoli
Copy link
Collaborator

I'm working on the pin violation first (#188), but here is a minimized test if anyone wants to take a look

fn assert_value_is_send_sync<T: Send + Sync>(_v: &T) {}

async fn concurrent_method(ctx: &str) -> Vec<String> {
    let vec: Vec<u32> = Vec::new();
    vec.into_co_stream()
        .map(|d| async move { a(&d, ctx).await })
        .collect()
        .await
}

async fn a(i: &u32, c: &str) -> String {
    format!("{c}: {i}")
}

assert_value_is_send_sync(&concurrent_method("asd"));

changing the map combinator to take a for<'a> F: Fn(Self::Item) -> Fut + 'a (since each call to the closure gets a new loan until the closure's future is completed) fixes the first issue of the bounds, but the next problem is the classic that ctx may not live long enough

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
help wanted Extra attention is needed
Projects
None yet
Development

No branches or pull requests

3 participants