Skip to content

KonaFiber用户文档 292

wxiang edited this page Jul 26, 2021 · 1 revision

KonaFiber用户文档-292

KonaFiber介绍

参考OpenJDK社区Loom项目( https://github.com/openjdk/loom )以及早期的Java协程原型实现[1],在Kona JDK8中提供协程方案。协程方案相比Java原生线程,切换速度更快、占用内存更少,在保持原有Java同步编程模式的情况下,降低了线程创建、切换的开销,适用于高并发、IO密集的的业务场景。

KonaFiber整体目标包括:

  • 兼容Loom的大部分API,保持后续可以无缝升级到OpenJDK的Loom协程方案;相关的API可以查看文档以及Loom的wiki [2];
  • 实现类似JKU方案的有栈协程方案,相比Loom更侧重于协程切换效率

从Java应用开发者的角度看,协程主要提供了三个概念:Continuation、VirtualThread、协程调度器

Continuation

Continuation:[5][6]与Coroutine概念类似,记录了一段程序的执行状态;提供了可重入的状态、以及Yield方法;但一般不推荐直接使用Continuation。

ContinuationScope scope = new ContinuationScope("scope");
Runnable target = new Runnable() {
    public void run() {
        System.out.println("before yield");
        Continuation.yield(scope);
        System.out.println("resume yield");
    }
};
Continuation cont = new Continuation(scope, target);
cont.run();
System.out.println("out of continuation");
cont.run();
System.out.println("finish continuation");
    
 输出:
 before yield
 out of continuation
 result yield
 finish continuation

ContinuationScope

ContinuationScope用于支持嵌套的Continuation切换;在KonaFiber当前实现中不支持嵌套Continuation使用。为了兼容Loom的API,保留ContinuationScope,后续增加支持。

void Continuation::run()

在当前线程执行Continuation构造时传入的Runnable方法。

第一次调用,执行Continuation构造函数中的Runnable方法; 后续run方法的调用从上一次Yield的位置继续执行; 整个Runnable执行完毕后,再次调用会抛出IllegalStateException;

boolean Continuation::yield(ContinuationScope scope)

从Continuation返回到run的调用点,Scope必须是当前Continuation对应的Scope。 Yield可能存在失败的情况,yield成功返回true,失败则返回false。 -当前Continuation的栈上有frame上持有ObjectMonitor锁; -当前Continuation的栈上有Native Frame;

Continuation Pin

当yield失败,Continuation继续执行,在Loom/KonaFiber中这种情况称为Pin,当前线程被Continuation绑定了,两种失败条件消失后,再次yield可以成功。

VirtualThread

VirtualThread:封装Continuation成为一个协程;作为Thread的子类加上了状态、线程方法的协程实现(例如Thread.yield会被替换成协程切换)。 VirtualThread不是public类,用户不能直接创建VirtualThread,需要通过Thread接口提供的OfVirtual方法、startVirtualThread方法。VirtualThread作为Thread的子类基本支持所有Thread的操作,start、join、sleep、interrupt等。应用代码中Thread的操作基本不用做大的修改。

CarrierThread

协程需要在运行时绑定一个线程作为承载,因此KonaFiber(与Loom相同)引入了CarrierThread的概念。协程在执行时被调度器调度到某一个CarrierThread上执行,这个动作成为Mount;在协程被切换出去后与carrierThread解除绑定(这个动作是Unmount),协程下一次执行时,调度器可以选择别的carrierThread执行。 Fiber1

协程调度器

协程调度器:java.util.concurrent.Executor类型;默认调度器实现为ForkJoinPool,有work stealing和动态扩展的能力; Fiber2

Java API

KonaFiber协程相关的接口在java.lang.Thread、sun.misc.VirtualThreads中:

  • Thread提供了创建协程的API,协程操作的接口都直接复用了线程的接口,包括start、join、sleep、interrupt等。
    • VirtualThread不是public类,必须通过Thread类的Builder方法创建协程。
    • 在Java应用中只需要将创建线程替换成创建协程,创建线程工厂替换成协程工厂,就可以将现有运行在线程上的代码运行到协程上。
  • VirtualThreads提供协程的挂起(park)、恢复(unpark)的操作接口,在调用前需要通过Thread.isVirtual()接口确定当前park/unpark的是协程还是线程。

详细接口描述可以查看API,下面会给出常见的协程创建、使用API示例。

Thread

创建、启动协程

startVirtualThread

startVirtualThread在默认协程调度器上创建协程执行task任务。

public static Thread startVirtualThread(Runnable task); 
Thread.OfVirtual创建协程

使用Thread.OfVirtual创建协程,指定协程执行的Runnable任务和调度器,使用start/unstarted返回一个Thread对象(实际是VirtualThread对象)。

  • allowSetThreadLocals(),配置协程是否使用ThreadLocal
  • inheritThreadLocals,配置协程是否继承ThreadLocal
Thread.ofVirtual().scheduler([Executor scheduler])
    [name(String name [, int start])]
    [allowSetThreadLocals(boolean allow)]
    [inheritInheritableThreadLocals(boolean inherit)]
    [uncaughtExceptionHandler(UncaughtExceptionHandler ueh)]
    unstarted(Runnable task) | start(Runnable task);

协程创建用例:

Thread vt = Thread.startVirtualThread(() -> {
    System.out.println("run in virtual thread");    // perform some computation
    Future<> f = send_requent();                        
    T result = f.get();                                    //  will not block "Thread" but park "VirtualThread"
    use result                                              // automatic unpark if reqeust is back and continuation execution
});
vt.join();
协程工厂

也可以使用Thread.OfVirtual创建协程工厂,区别是不需要指定Runnable task,最后调用factory方法。

Thread.ofVirtual().scheduler([Executor scheduler])
    [name(String name [, int start])]
    [allowSetThreadLocals(boolean allow)]
    [inheritInheritableThreadLocals(boolean inherit)]
    [uncaughtExceptionHandler(UncaughtExceptionHandler ueh)]
    factory();

调用线程工厂,这个线程工厂创建的线程符合前面定义的属性。 创建最多一万个协程的FixedThreadPool

ThreadFactory factory = Thread.ofVirtual().scheduler(scheduler).name("my_vt", 0).factory();
ExecutorService executor = Executors.newFixedThreadPool(10000, factory);

public final boolean isVirtual()

判断线程是否是VirtualThread协程,如果是协程返回true。

    public static void unpark(Thread thread) {
        if (thread != null) {
            if (thread.isVirtual()) {
                VirtualThreads.unpark(thread);
            } else {
                UNSAFE.unpark(thread);
            }
        }
    }

VirtualThreads

KonaFiber中的sun.misc.VirutalThreads对应Loom中的jdk.internal.misc.VirtualThreads。VirtualThreads提供了一些针对VirtualThread的静态方法,用于进行协程的操作,主要包括协程的Park、Unpark、得到协程mount的线程(currentCarrierThread) 通常应用开发者在改造一些Block接口的时会用到这些API,用于控制协程的状态。

public static Thread currentCarrierThread()

返回执行协程的Java线程。

public static void park()

挂起当前协程,协程进入Block状态;如果当前执行的线程不是VirutalThred,会抛InternalError;协程只支持自己park自己,不支持被其它线程、协程park。

协程Park时可能会发生失败的情况,比如

  1. 协程持有Object Monitor:使用了synchronized
  2. 调用了JNI frame再调用Java代码 协程park失败会block协程的carrier thread(carrier thread无法执行其它runnable任务),需要等待协程被unpark,carrier thread才能继续执行。可以通过打开jdk.tracePinnedThreads环境变量在park失败时抓取stack trace和失败的原因(支持中)。
    public static void unpark(Thread thread) {
        if (thread != null) {
            if (thread.isVirtual()) {
                VirtualThreads.unpark(thread);
            } else {
                UNSAFE.unpark(thread);
            }
        }
    }

public static void park(long nanos)

挂起当前协程并给定timeout的时间,协程进入Block状态;如果当前执行的线程不是VirutalThred,InternalError;时间到后协程会自动从Block状态重新进入runnable的状态。

public static void unpark(Thread thread)

恢复给定协程到Runnable状态,可以被调度器调度执行,如果给定的Thread不是协程,会抛InternalError,需要在使用前判断。如果被unpark的协程不在Block状态,unpark会抵消下一次的park。如果在park之前连续有两次unpark,只能抵消一次park。Unpark不会有失败的情况。

    public static void unpark(Thread thread) {
        if (thread != null) {
            if (thread.isVirtual()) {
                VirtualThreads.unpark(thread);
            } else {
                UNSAFE.unpark(thread);
            }
        }
    }

Loom差异

KonaFiber的协程相关接口尽量保持为Loom的一个子集,以方便后续JDK版本升级、Loom版本的切换等操作。除了实现上的差异,这里主要描述API和功能上的差异;

  • 不支持结构化并发-structured concurrency[4];
  • Continuation
    • 不支持嵌套的Continuation
    • 不支持Continuation被抢占式的切换(被动调度)
  • VirtualThread
  • Thread
  • 协程调度器
    • 不支持UnboundedScheduler
  • 其它
    • Socket相关API的协程化改造不支持;主要通过Netty等异步网络框架和KonaFiber协同,参见Demo netty_stress_test

Demo

continuation_launch_test

  • 测试continuation创建/切换/跨线程切换的性能

mysql_sync_stress_demo

  • 直接使用同步的mysql库,访问数据库时,协程会退化成线程,因此推荐将mysql操作提交到一个独立的线程池;
  • 本demo是为了验证,将mysql操作提交到独立线程池的方案是高效的;
  • 分别测试mysql操作,“直接由线程执行”/“由线程提交到线程池”/“由协程提交到线程池”/“直接使用异步编程”,对比它们的性能差异;

netty_stress_test

  • netty是一个异步框架,本用例展示“如何将异步框架封装成同步的方式”;
  • 对比“使用协程”和“使用线程”将异步框架封装成同步以后的性能差异;

sleep_stress_test

  • 测试“指定个数的协程/线程不停的sleep(1),比较协程和线程的性能差异“

yield_stress_test

  • 测试“指定个数的协程/线程不停的切换“,比较协程/线程的性能差异

参考文档

[1] https://ssw.jku.at/Research/Projects/JVM/Coroutines.html

[2] https://wiki.openjdk.java.net/display/loom/Main

[3] https://cr.openjdk.java.net/~rpressler/loom/loom/sol1_part1.html

[4] https://cr.openjdk.java.net/~rpressler/loom/loom/sol1_part2.html

[5] https://en.wikipedia.org/wiki/Coroutine

[6] https://en.wikipedia.org/wiki/Continuation