Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only a single cpu core is utilized when join-ing on tokio runtime #185

Open
manifest opened this issue May 4, 2024 · 8 comments
Open

Only a single cpu core is utilized when join-ing on tokio runtime #185

manifest opened this issue May 4, 2024 · 8 comments
Labels
bug Something isn't working

Comments

@manifest
Copy link

manifest commented May 4, 2024

Only a single CPU core is utilized when join-ing futures on tokio runtime. The issue does not arise when using async_std.

some stats:

# async_std:naive
Total duration: 2.3061 s
Rate: 433627.7135 reqs/s

# async_std:futures_concurrency_join
Total duration: 2.8707 s
Rate: 348346.6330 reqs/s

# tokio:naive
Total duration: 2.4885 s
Rate: 401842.6764 reqs/s

# tokio:futures_concurrency_join
Terminated after a few minutes. Utilization here is limited to a single CPU core.

tokio test cases:

use std::time::{Duration, Instant};
use tokio::{spawn, time::sleep};

#[derive(Debug, Clone)]
struct Workload {
    task_duration: Duration,
    n_tasks: usize,
    start: Instant,
}

#[tokio::main]
async fn main() {
    let workload = Workload {
        task_duration: Duration::from_millis(100),
        n_tasks: 1000000,
        start: Instant::now(),
    };

    // naive(workload.clone()).await;
    futures_concurrency_join(workload.clone()).await;

    let total_duration = (Instant::now() - workload.start).as_secs_f64();
    let rate = workload.n_tasks as f64 / total_duration;
    println!("Total duration: {:.4} s", total_duration);
    println!("Rate: {:.4} reqs/s", rate);
}

async fn naive(workload: Workload) {
    let tasks = (0..workload.n_tasks)
        .map(|_| {
            spawn({
                async move {
                    sleep(workload.task_duration.clone()).await;
                }
            })
        })
        .collect::<Vec<_>>();

    for task in tasks {
        task.await.ok();
    }
}

async fn futures_concurrency_join(workload: Workload) {
    use futures_concurrency::prelude::*;

    let tasks = (0..workload.n_tasks)
        .map(|_| async move {
            sleep(workload.task_duration.clone()).await;
        })
        .collect::<Vec<_>>();

    tasks.join().await;
}

async_std test cases:

use std::time::{Duration, Instant};
use async_std::task::{spawn, sleep};

#[derive(Debug, Clone)]
struct Workload {
    task_duration: Duration,
    n_tasks: usize,
    start: Instant,
}

#[async_std::main]
async fn main() {
    let workload = Workload {
        task_duration: Duration::from_millis(100),
        n_tasks: 1000000,
        start: Instant::now(),
    };

    // naive(workload.clone()).await;
    futures_concurrency_join(workload.clone()).await;

    let total_duration = (Instant::now() - workload.start).as_secs_f64();
    let rate = workload.n_tasks as f64 / total_duration;
    println!("Total duration: {:.4} s", total_duration);
    println!("Rate: {:.4} reqs/s", rate);
}

async fn naive(workload: Workload) {
    let tasks = (0..workload.n_tasks)
        .map(|_| {
            spawn({
                async move {
                    sleep(workload.task_duration.clone()).await;
                }
            })
        })
        .collect::<Vec<_>>();

    for task in tasks {
        task.await
    }
}

async fn futures_concurrency_join(workload: Workload) {
    use futures_concurrency::prelude::*;

    let tasks = (0..workload.n_tasks)
        .map(|_| async move {
            sleep(workload.task_duration.clone()).await;
        })
        .collect::<Vec<_>>();

    tasks.join().await;
}
@manifest
Copy link
Author

manifest commented May 4, 2024

async-std v1.12.0
futures-concurrency v7.6.0
tokio v1.37.0

macOS Sonoma 14.4.1

@yoshuawuyts
Copy link
Owner

Thanks for reporting this @manifest. I can reproduce this locally, and we should look into the root cause of this further.

You mentioned Tokio only uses a single core with this; how did you measure that? I've been able to see the slowdown, but I didn't yet look at core utilization.

I'd like to test the scaling with FuturesUnordered to see whether this has something to do with futures-concurrency specifically, or whether it's something to do with Tokio and intermediate wakers.

@manifest
Copy link
Author

manifest commented May 5, 2024

You mentioned Tokio only uses a single core with this; how did you measure that? I've been able to see the slowdown, but I didn't yet look at core utilization.

I've just checked the CPU usage in htop. The process with the latest test case using Tokio runtime utilized up to 100% (unitilization of a single cpu core). My comment isn't entirely accurate because different CPU cores were involved, but it seemed as sequential execution.

@manifest
Copy link
Author

manifest commented May 5, 2024

There is another interesting case resulting in a stalled execution. At least on my machine, I can constantly reproduce it when n_tasks is greater than 4.

#[derive(Debug, Clone)]
struct Workload {
    task_duration: Duration,
    n_tasks: usize,
    start: Instant,
}

#[tokio::main]
async fn main() {
    let workload = Workload {
        task_duration: Duration::from_millis(100),
        n_tasks: 10,
        start: Instant::now(),
    };

    futures_concurrency_co_stream(workload.clone()).await;

    let total_duration = (Instant::now() - workload.start).as_secs_f64();
    let rate = workload.n_tasks as f64 / total_duration;
    println!("Total duration: {:.4} s", total_duration);
    println!("Rate: {:.4} tasks/s", rate);
}

async fn futures_concurrency_co_stream(workload: Workload) {
    use futures_concurrency::prelude::*;

    (0..workload.n_tasks)
        .collect::<Vec<_>>()
        .into_co_stream()
        .for_each(|_| async {
            sleep(workload.task_duration.clone()).await;
        })
        .await;
}

@yoshuawuyts
Copy link
Owner

yoshuawuyts commented May 5, 2024

There is another interesting case resulting in a stalled execution. At least on my machine, I can constantly reproduce it when n_tasks is greater than 4.

@manifest The last test case you posted almost certainly will hit #182 - tokio::sleep + ConcurrentStream currently don't work as expected. 4+ elements hit a relocation, which currently violate referential stability. We still need to fix that.

I've just checked the CPU usage in htop. The process with the latest test case using Tokio runtime utilized up to 100% (unitilization of a single cpu core). My comment isn't entirely accurate because different CPU cores were involved, but it seemed as sequential execution.

I see, thank you for clarifying!

@yoshuawuyts yoshuawuyts added the bug Something isn't working label May 6, 2024
@yoshuawuyts
Copy link
Owner

I've created two more test cases using FuturesUnordered and futures_util::future::join_all:

async fn future_util_join_all(workload: Workload) {
    let tasks = (0..workload.n_tasks)
        .map(|_| async move {
            sleep(workload.task_duration.clone()).await;
        })
        .collect::<Vec<_>>();

    join_all(tasks).await;
}

async fn futures_unordered_join(workload: Workload) {
    let tasks = (0..workload.n_tasks)
        .map(|_| async move {
            sleep(workload.task_duration.clone()).await;
        })
        .collect::<Vec<_>>();

    let mut group = FuturesUnordered::from_iter(tasks.into_iter());

    while let Some(_) = group.next().await {}
}

The results for n_tasks = 10_000 is as follows:

tokio naive
Total duration: 0.1288 s
Rate: 77654.0458 reqs/s

futures_concurrency_join
Total duration: 0.6355 s
Rate: 15736.2799 reqs/s

futures_join_all
Total duration: 0.7635 s
Rate: 13097.1088 reqs/s

futures_unordered
Total duration: 0.8850 s
Rate: 11299.5670 reqs/s

Comparatively the futures_concurrency impl seems to perform better than the alternative concurrency impls, but all of them seem to do significantly worse than using tokio's for..in loop over handles. My hypothesis right now is that there are two things happening here which we should separate:

  1. Tokio isn't particularly happy with handles being placed inside of concurrency operators. That's something to figure out on the Tokio side.
  2. If we go from 10_000 to 100_000 the futures_concurrency::Join impl isn't too happy about it, and seems to hit a pathological case. That's likely something on our side, and we should figure out why that happens.

@manifest
Copy link
Author

manifest commented May 6, 2024

Interestingly, the excessive awaiting points seems to be hurting performance. For me that is counter-intuitive, because I was thinking about them as zero-cost abstractions. There is almost 30% degradation in performance between the following test cases. I use n_tasks = 1_000_000.

futures_unordered_join
Total duration: 0.2884 s
Rate: 3467896.8878 tasks/s

futures_unordered_join_excessive_await_points
Total duration: 0.4054 s
Rate: 2466827.3394 tasks/s
async fn futures_unordered_join(workload: Workload) {
    use futures::stream::{FuturesUnordered, StreamExt};

    let tasks = (0..workload.n_tasks)
        .map(|_| {
            sleep(workload.task_duration.clone())
        })
        .collect::<Vec<_>>();

    let mut group = FuturesUnordered::from_iter(tasks.into_iter());
    while let Some(_) = group.next().await {}
}

async fn futures_unordered_join_excessive_await_points(workload: Workload) {
    use futures::stream::{FuturesUnordered, StreamExt};

    let tasks = (0..workload.n_tasks)
        .map(|_| async {
            sleep(workload.task_duration.clone()).await
        })
        .collect::<Vec<_>>();

    let mut group = FuturesUnordered::from_iter(tasks.into_iter());
    while let Some(_) = group.next().await {}
}

I've compared naive vs futures_unordered_join on n_tasks = 1_000_000 just for the sake of curiosity. futures_unordered_join performs more then 3 times faster on my machine.

naive
Total duration: 0.8474 s
Rate: 1180038.8759 tasks/s

futures_unordered_join
Total duration: 0.2884 s
Rate: 3467896.8878 tasks/s
async fn naive(workload: Workload) {
    let tasks = (0..workload.n_tasks)
        .map(|_| spawn({
            sleep(workload.task_duration)
        }))
        .collect::<Vec<_>>();

    for task in tasks {
        task.await.ok();
    }
}

Here is stats for n_tasks = 10_000. All implementations behave on par with naive and futures_concurrency_join slightly behind.

naive
Total duration: 0.1079 s
Rate: 92649.3189 tasks/s

futures_unordered_join
Total duration: 0.1049 s
Rate: 95305.6797 tasks/s

futures_unordered_join_excessive_await_points
Total duration: 0.1048 s
Rate: 95426.9807 tasks/s

futures_concurrency_join
Total duration: 0.1198 s
Rate: 83464.3257 tasks/s

@yoshuawuyts
Copy link
Owner

My previous analysis wasn't right; I spent some time on a plane debugging this today, and I realized a number of things:

  1. I indeed wasn't measuring things right - that means it's not a Tokio issue and it's squarely on us.
  2. What we're seeing is quadratic scaling, that is almost certainly because our iterations are fairly expensive.
  3. The reason why futures-concurrency uses a single core is because it's only concurrent, not parallel. The examples should use task.spawn to also be parallel.
  4. When using task.spawn, FuturesUnordered actually ends up performing slightly better than the naive tokio impl.

I think what needs to happen is change the internal slab impl to one with better iteration speeds as well as referential stability. That would not only solve the quadratic scaling, it would also address the issues in #182.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants