プラットフォーム・スレッド(スレッドプール/CompletableFuture)との比較で理解する並行処理の変遷

はじめに

Java21から「仮想スレッド(Virtual Threads)」が正式版として導入されました。Spring Boot でも3.2以降、仮想スレッドがサポートされています。(Spring Boot 3.2 Release Notes

本記事では、この仮想スレッドがどのようなものなのか、従来の「プラットフォーム・スレッド」と比較し、Javaの並行処理の変遷を追いながら、コード例と共に説明していきます。

対象読者

  • Javaの基本は理解しておりスレッド処理について知りたい人
  • Javaの従来のスレッドと仮想スレッドについて興味がある人
  • Java21で正式導入された仮想スレッドの概要を理解したい人

この記事で分かること

  • 従来のJavaのスレッド(プラットフォーム・スレッド)が抱えていた課題
  • 仮想スレッドと従来のプラットフォーム・スレッドとの違い
  • プラットフォーム・スレッド(スレッドプール・CompletableFuture)と仮想スレッドの変遷
  • 仮想スレッドの仕組み
  • 仮想スレッド利用時の注意点

並行処理の変遷

ここからはJavaの並行処理について、次の①~④の方法を、順に説明していきます。

仮想スレッド登場の背景を知ることで、従来のスレッドとの違いやその特徴を理解することができます。

  • ① プラットフォーム・スレッド
  • ② スレッドプール(プラットフォーム・スレッド)
  • ③ CompletableFuture(プラットフォーム・スレッド)
  • ④ 仮想スレッド - Virtual Threads(Java21以降)

①~③は、全てプラットフォーム・スレッドを使っています。様々な工夫によって、プラットフォーム・スレッドをいかに効率よく使うか、という改善の歴史です。一方で、④の仮想スレッドは根本的に異なるアプローチを取っており、これまで1対1の対応関係であったOSスレッドとJavaスレッドを、少数のOSスレッドに多数の仮想スレッドをマッピング(JEP 444)することで、これまでの工夫(スレッドプールやCompletableFutureの使用)を全て不要にします。

① プラットフォーム・スレッド

Javaで並行処理を行う最も基本的な方法が、java.lang.Threadクラスを使ったスレッドの生成です。

Threadでは、通常、オペレーティング・システムによってスケジュールされたカーネル・スレッドに1:1でマップされる「プラットフォーム・スレッド」の作成がサポートされています。

引用:Java21 API仕様

これまでJavaで単に「スレッド」と呼ばれていたものは、Java21以降で仮想スレッドと区別するため「プラットフォーム・スレッド」として説明されています。

ThreadにはRunnableを引数とするコンストラクタが定義されており、これを用いて作成することができます。(後述の通り、仮想スレッドと対になるようプラットフォーム・スレッドの作成についても、Java21でビルダーを返すファクトリメソッドが提供されています。そのためThread.ofPlatform()で作成することも可能です。)

Java
Runnabletask=()->System.out.println("実行");// スレッド内で実行する処理
Threadthread=newThread(task);// スレッドを1つ作成
thread.start();// スレッドを開始
thread.join();// スレッドの終了を待機
サンプルコード全文(Example01.java
Example01.java
packagecom.example;

importjava.util.ArrayList;
importjava.util.List;

publicclassExample01{

    publicstaticvoidmain(String[]args)throwsInterruptedException{

        longstart=System.currentTimeMillis();
        System.out.println("===== 開始 =====");

        // スレッド数を指定
        intthreadCount=5;
        List<Thread>threads=newArrayList<Thread>(threadCount);

        // スレッドを作成してリストに追加
        for(inti=1;i<=threadCount;i++){
            Threadthread=newThread(task(i));
            threads.add(thread);
        }

        // スレッドを開始
        for(Threadthread:threads){
            thread.start();
        }

        // スレッドの終了を待機
        for(Threadthread:threads){
            thread.join();
        }

        System.out.println("===== 終了 =====");

        longend=System.currentTimeMillis();
        System.out.println("全体の処理時間: "+(end-start)+"ms");
    }

    privatestaticRunnabletask(inttaskId){
        return()->{
            longstart=System.currentTimeMillis();
            System.out.println("開始(タスク"+taskId+"): "+Thread.currentThread());

            try{
                Thread.sleep(2_000);
            }catch(InterruptedExceptione){
                Thread.currentThread().interrupt();
            }

            longend=System.currentTimeMillis();
            System.out.println("終了(タスク"+taskId+"): "+Thread.currentThread()+" (処理時間: "+(end-start)+"ms)");
        };
    }
}

出力は次のようになります。1つ実行するのに2秒かかるタスクを5つ処理するのに、10秒かからず2秒で全体の処理が終了していることが分かります。

サンプルコード(Example01.java)の実行結果
$ 実行結果
===== 開始 =====
開始(タスク3): Thread[#23,Thread-2,5,main]
開始(タスク1): Thread[#21,Thread-0,5,main]
開始(タスク5): Thread[#25,Thread-4,5,main]
開始(タスク4): Thread[#24,Thread-3,5,main]
開始(タスク2): Thread[#22,Thread-1,5,main]
終了(タスク4): Thread[#24,Thread-3,5,main] (処理時間: 2084ms)
終了(タスク5): Thread[#25,Thread-4,5,main] (処理時間: 2068ms)
終了(タスク2): Thread[#22,Thread-1,5,main] (処理時間: 2084ms)
終了(タスク1): Thread[#21,Thread-0,5,main] (処理時間: 2073ms)
終了(タスク3): Thread[#23,Thread-2,5,main] (処理時間: 2068ms)
===== 終了 =====
全体の処理時間: 2110ms

このようにシンプルなコードで記述できますが、従来のJavaスレッドはOSスレッドの薄いラッパーであり、1対1の対応関係であるため、使用できるスレッド数はOSスレッドの数に制限されます。また、生成コストが高く、大量のスレッドはメモリ消費も問題になります。

プラットフォーム・スレッドは、オペレーティング・システム(OS)スレッドを囲むthinラッパーとして実装されます。プラットフォーム・スレッドは、基礎となるOSスレッド上でJavaコードを実行します。また、プラットフォーム・スレッドはプラットフォーム・スレッドの存続期間中ずっとOSスレッドを保持します。このため、使用可能なプラットフォーム・スレッドの数はOSスレッドの数に制限されます。

引用:プラットフォーム・スレッドとは

② スレッドプール(プラットフォーム・スレッド)

前述の通り、プラットフォーム・スレッドは、生成コストが高いという課題があります。

これを解消するため「スレッドプール」という考え方が用いられます。毎回新しいスレッドを生成するのではなく、あらかじめ決まった数のスレッドを事前に作成してプールしておき、これを使い回すというものです。

Executors.newFixedThreadPool(n)を使用すると、固定数のスレッドを生成して確保しておくことができます。

Java
ExecutorServiceservice=Executors.newFixedThreadPool(5);// 固定数(5)のスレッドを確保
Runnabletask=()->System.out.println("実行");// スレッド内で実行する処理
service.submit(task);// タスクを登録して実行
service.shutdown();// スレッドの終了
service.awaitTermination(Long.MAX_VALUE,TimeUnit.NANOSECONDS);// スレッドの終了を待機
サンプルコード全文(Example02.java
Example02.java
packagecom.example;

importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
importjava.util.concurrent.TimeUnit;

publicclassExample02{

    publicstaticvoidmain(String[]args)throwsInterruptedException{

        longprogramStart=System.currentTimeMillis();
        System.out.println("===== 開始 =====");

        // スレッド数を指定
        intthreadCount=5;
        // 固定で5件のスレッドを事前作成しプールしておく
        ExecutorServiceexecutorService=Executors.newFixedThreadPool(threadCount);

        // 処理件数(ここでは5件の処理を並列実行する例)
        intprocessCount=5;

        // スレッドプールから取り出したスレッドにタスクを登録して実行
        for(inti=1;i<=processCount;i++){
            executorService.submit(task(i));
        }

        // スレッドの終了と待機
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE,TimeUnit.NANOSECONDS);

        System.out.println("===== 終了 =====");

        longprogramEnd=System.currentTimeMillis();
        System.out.println("全体の処理時間: "+(programEnd-programStart)+"ms");
    }

    privatestaticRunnabletask(inttaskId){
        return()->{
            longstart=System.currentTimeMillis();
            System.out.println("開始(タスク"+taskId+"): "+Thread.currentThread());

            try{
                Thread.sleep(2_000);
            }catch(InterruptedExceptione){
                Thread.currentThread().interrupt();
            }

            longend=System.currentTimeMillis();
            System.out.println("終了(タスク"+taskId+"): "+Thread.currentThread()+" (処理時間: "+(end-start)+"ms)");
        };
    }
}
サンプルコード(Example02.java)の実行結果
$ 実行結果
===== 開始 =====
開始(タスク1): Thread[#21,pool-1-thread-1,5,main]
開始(タスク5): Thread[#25,pool-1-thread-5,5,main]
開始(タスク4): Thread[#24,pool-1-thread-4,5,main]
開始(タスク2): Thread[#22,pool-1-thread-2,5,main]
開始(タスク3): Thread[#23,pool-1-thread-3,5,main]
終了(タスク1): Thread[#21,pool-1-thread-1,5,main] (処理時間: 2074ms)
終了(タスク5): Thread[#25,pool-1-thread-5,5,main] (処理時間: 2074ms)
終了(タスク2): Thread[#22,pool-1-thread-2,5,main] (処理時間: 2075ms)
終了(タスク3): Thread[#23,pool-1-thread-3,5,main] (処理時間: 2075ms)
終了(タスク4): Thread[#24,pool-1-thread-4,5,main] (処理時間: 2074ms)
===== 終了 =====
全体の処理時間: 2162ms

この例では5スレッドで5タスクを処理しているため、①の例との違いが分かりにくいかもしれません。次のように、3スレッドで5タスクを処理する例を見てみると、違いが顕著になります。

サンプルコード全文(Example03.java
Example03.java
packagecom.example;

importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
importjava.util.concurrent.TimeUnit;

publicclassExample03{

    publicstaticvoidmain(String[]args)throwsInterruptedException{

        longprogramStart=System.currentTimeMillis();
        System.out.println("===== 開始 =====");

        // スレッド数を指定
        intthreadCount=3;
        // 固定で3件のスレッドを事前作成しプールしておく
        ExecutorServiceexecutorService=Executors.newFixedThreadPool(threadCount);

        // 処理件数(ここでは5件の処理を並列実行する例)
        intprocessCount=5;

        // スレッドプールから取り出したスレッドにタスクを登録して実行
        for(inti=1;i<=processCount;i++){
            executorService.submit(task(i));
        }

        // スレッドの終了と待機
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE,TimeUnit.NANOSECONDS);

        System.out.println("===== 終了 =====");

        longprogramEnd=System.currentTimeMillis();
        System.out.println("全体の処理時間: "+(programEnd-programStart)+"ms");
    }

    privatestaticRunnabletask(inttaskId){
        return()->{
            longstart=System.currentTimeMillis();
            System.out.println("開始(タスク"+taskId+"): "+Thread.currentThread());

            try{
                Thread.sleep(2_000);
            }catch(InterruptedExceptione){
                Thread.currentThread().interrupt();
            }

            longend=System.currentTimeMillis();
            System.out.println("終了(タスク"+taskId+"): "+Thread.currentThread()+" (処理時間: "+(end-start)+"ms)");
        };
    }
}
サンプルコード(Example03.java)の実行結果
$ 実行結果
===== 開始 =====
開始(タスク1): Thread[#21,pool-1-thread-1,5,main]
開始(タスク3): Thread[#23,pool-1-thread-3,5,main]
開始(タスク2): Thread[#22,pool-1-thread-2,5,main]
終了(タスク1): Thread[#21,pool-1-thread-1,5,main] (処理時間: 2057ms)
終了(タスク2): Thread[#22,pool-1-thread-2,5,main] (処理時間: 2060ms)
終了(タスク3): Thread[#23,pool-1-thread-3,5,main] (処理時間: 2057ms)
開始(タスク4): Thread[#21,pool-1-thread-1,5,main]
開始(タスク5): Thread[#23,pool-1-thread-3,5,main]
終了(タスク4): Thread[#21,pool-1-thread-1,5,main] (処理時間: 2013ms)
終了(タスク5): Thread[#23,pool-1-thread-3,5,main] (処理時間: 2014ms)
===== 終了 =====
全体の処理時間: 4119ms

プールされたスレッドが3つだけのため、まず3つのタスクが先に処理され、それが終わると残りの2つのタスクが処理されていることが分かります。これにより全体の処理時間は2秒でなく4秒になります。またスレッドIDからも、pool-1-thread-1, pool-1-thread-2, pool-1-thread-3 の3つが使い回されていることを確認できます。

補足:try-with-resources文の利用

ExecutorServiceAutoCloseableを実装し(JEP 425)、close()メソッドが定義されているため、try-with-resources文で記述することにより、tryブロックを抜ける時にスレッドの終了と待機が実行され、明示的に終了と待機の処理を書く必要がなくなります。

Java
// スレッドプール作成(tryブロックを抜けると自動でスレッドの終了と待機)
try(ExecutorServiceexecutorService=Executors.newFixedThreadPool(5)){
    Runnabletask=()->System.out.println("実行");// スレッド内で実行する処理
    executorService.submit(task);// タスクを登録して実行
}
Example04.java
packagecom.example;

importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;

publicclassExample04{

    publicstaticvoidmain(String[]args)throwsInterruptedException{

        longprogramStart=System.currentTimeMillis();
        System.out.println("===== 開始 =====");

        // スレッド数を指定
        intthreadCount=3;

        // 固定で3件のスレッドを事前作成しプールしておく(try-with-resourcesで記述)
        try(ExecutorServiceexecutorService=Executors.newFixedThreadPool(threadCount)){

            // 処理件数(ここでは5件の処理を並列実行する例)
            intprocessCount=5;

            // スレッドプールから取り出したスレッドにタスクを登録して実行
            for(inti=1;i<=processCount;i++){
                executorService.submit(task(i));
            }

            /* ↑
* try-with-resourcesの使用により、tryブロックを抜けると自動的にスレッドの終了と待機が処理されるため、
* 明示的にshutdown()やawaitTermination()を呼び出す必要がなくなる。
*/
            // executorService.shutdown();
            // executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        }

        System.out.println("===== 終了 =====");

        longprogramEnd=System.currentTimeMillis();
        System.out.println("全体の処理時間: "+(programEnd-programStart)+"ms");
    }

    privatestaticRunnabletask(inttaskId){
        return()->{
            longstart=System.currentTimeMillis();
            System.out.println("開始(タスク"+taskId+"): "+Thread.currentThread());

            try{
                Thread.sleep(2_000);
            }catch(InterruptedExceptione){
                Thread.currentThread().interrupt();
            }

            longend=System.currentTimeMillis();
            System.out.println("終了(タスク"+taskId+"): "+Thread.currentThread()+" (処理時間: "+(end-start)+"ms)");
        };
    }
}

このようにスレッドプールを利用するという工夫により、スレッド生成コストの問題を改善することができます。

しかしスレッド処理には、この方法だけでは解決できない問題がまだ残ります。例えばHTTPリクエストを行うような処理の場合、外部のAPIへリクエストを送信してからレスポンスが返ってくるまで、そのスレッドは何も処理を行うことなく、ただ待っているだけになってしまいます。3スレッドで5タスクを処理する例で見た通り、スレッドプールを使用すると、事前に確保されたスレッドの数以上のタスクは、その前のタスクの処理が完了するまで実行されません。実際にはスレッドは何もせずただレスポンスを待っているだけなのに、この間も次のタスクが実行されず、処理効率が落ちることに繋がってしまいます。この、スレッド処理におけるI/O待ちの問題についても、なんらかの方法で解決する必要があります。

③ CompletableFuture(プラットフォーム・スレッド)

CompletableFutureとは、

明示的に(その値とステータスを設定して)完了できるFutureです。

そしてFutureは、

非同期計算の結果を表します。

つまり、完了可能(Completable)な(=事前に完了後の処理を定義しておき、完了前に処理結果を返し、後から完了を通知できる)、未来,将来(Future)の結果、を表すオブジェクトです。

スレッドプールの問題はI/O待ち中もスレッドを占有することでした。CompletableFutureを使用することで、コールバック関数による非同期処理を記述できるようになります。

Java
// CompletableFutureタスクを作成(コールバック関数による非同期プログラミング)
CompletableFuture<Void>task=CompletableFuture.supplyAsync(()->any)
                                                .thenCompose(any->client.sendAsync(request,responseBodyHandler))
                                                .thenAccept(response->System.out.println(response));
// 完了を待機して結果を返却
task.join();

supplyAsync()にタスク開始時の処理を、thenCompose()にノンブロッキングI/O処理を、thenAccept()に結果が返ってきた後の完了処理を、といった具合に、事前に各段階で実行してほしい関数を渡しておき、そうして作成されたCompletableFutureをまとめて実行する、という流れになります。処理を実行順に記述していくのではなく、関数を定義して記述しておくという非同期プログラミングの方式が採用されていることが分かります。

補足:ブロッキングI/OとノンブロッキングI/O

I/Oには、ブロッキングI/OとノンブロッキングI/Oの2種類があります。java.ioパッケージによるファイル入出力のようなブロッキングI/Oは、処理が完了するまでスレッドがその場で待ち続けます。一方、HttpClient.sendAsync()のようなノンブロッキングI/Oは、処理をOSに委ねた瞬間にスレッドを解放します。CompletableFutureがスレッドを解放できるのは、このノンブロッキングI/Oと組み合わせた時です。同じくHttpClientに定義されたsendメソッドでは、ブロッキングI/Oになってしまうため注意が必要です。

また上記の例では単一のCompletableFutureを作成していますが、次に示すサンプルコードのように複数タスクの場合は、allOf()でまとめてから、join()で全タスクの完了を待ちます。

サンプルコード全文(Example05.java

ここまでの例では時間のかかるタスクの再現にThread.sleep()を利用してきましたが、Thread.sleep()もブロッキング処理のため、前述の通りCompletableFutureの検証には使えません。そのためここではWireMockを利用してモックWebサーバーを起動し、非同期HTTPリクエストを再現しています。

Example05.java
packagecom.example;

importstaticcom.github.tomakehurst.wiremock.client.WireMock.aResponse;
importstaticcom.github.tomakehurst.wiremock.client.WireMock.get;
importstaticcom.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;

importjava.net.URI;
importjava.net.http.HttpClient;
importjava.net.http.HttpRequest;
importjava.net.http.HttpResponse;
importjava.util.ArrayList;
importjava.util.List;
importjava.util.concurrent.CompletableFuture;

importcom.github.tomakehurst.wiremock.WireMockServer;

publicclassExample05{

    publicstaticvoidmain(String[]args)throwsInterruptedException{

        // モックサーバーを起動(1秒の固定遅延を設定)
        WireMockServerwireMockServer=startWireMockServer();

        longprogramStart=System.currentTimeMillis();
        System.out.println("===== 開始 =====");

        // スレッド数を指定
        intthreadCount=3;

        // 処理件数(ここでは5件の処理を並列実行する例)
        intprocessCount=5;

        // タスク(CompletableFuture)のリストを作成
        List<CompletableFuture<Void>>completableFutures=newArrayList<>(threadCount);
        for(inti=1;i<=processCount;i++){
            completableFutures.add(task(i));
        }

        // 全てのタスク(CompletableFuture)が完了するまで待機
        CompletableFuture.allOf(completableFutures.toArray(newCompletableFuture[0])).join();

        System.out.println("===== 終了 =====");

        longprogramEnd=System.currentTimeMillis();
        System.out.println("全体の処理時間: "+(programEnd-programStart)+"ms");

        // モックサーバーを停止
        wireMockServer.stop();
    }

    // タスク(CompletableFuture)を作成
    privatestaticCompletableFuture<Void>task(inttaskId){

        longstart=System.currentTimeMillis();

        HttpClientclient=HttpClient.newHttpClient();
        HttpRequestrequest=HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/api"))
                .build();

        returnCompletableFuture
                // 非同期処理の起点(別スレッドで最初に実行したい処理)
                .supplyAsync(()->{
                    System.out.println("開始(タスク"+taskId+"): "+Thread.currentThread());
                    returntaskId;
                })
                // 前の処理結果を受け取り別のCompletableFutureに繋ぐ(ノンブロッキングI/Oなど)
                .thenCompose(id->{
                    System.out.println("送信(タスク"+id+"): "+Thread.currentThread());
                    returnclient.sendAsync(request,HttpResponse.BodyHandlers.ofString());
                })
                // 非同期処理の終点(前の処理結果を受け取る最終的な完了処理)
                .thenAccept(response->{
                    longend=System.currentTimeMillis();
                    System.out.println("終了(タスク"+taskId+"): "+Thread.currentThread()+" (処理時間: "+(end-start)+"ms)");
                });
    }

    // WireMockを利用してノンブロッキングI/O(非同期HTTPリクエスト)を再現
    privatestaticWireMockServerstartWireMockServer(){
        WireMockServerwireMockServer=newWireMockServer(8080);
        wireMockServer.stubFor(get(urlEqualTo("/api"))
                      .willReturn(aResponse().withStatus(200)
                                             .withFixedDelay(1_000)));
        wireMockServer.start();
        returnwireMockServer;
    }
}

出力は次のようになります。リクエスト送信後に一度スレッドが解放されているため、終了時のスレッドID(ForkJoinPool.commonPool-worker-n)が開始および送信時と異なっていることが分かります。

サンプルコード(Example05.java)の実行結果
$ 実行結果
===== 開始 =====
開始(タスク2): Thread[#42,ForkJoinPool.commonPool-worker-2,5,main]
開始(タスク1): Thread[#40,ForkJoinPool.commonPool-worker-1,5,main]
送信(タスク2): Thread[#42,ForkJoinPool.commonPool-worker-2,5,main]
開始(タスク3): Thread[#44,ForkJoinPool.commonPool-worker-3,5,main]
送信(タスク3): Thread[#44,ForkJoinPool.commonPool-worker-3,5,main]
送信(タスク1): Thread[#40,ForkJoinPool.commonPool-worker-1,5,main]
開始(タスク4): Thread[#46,ForkJoinPool.commonPool-worker-4,5,main]
送信(タスク4): Thread[#46,ForkJoinPool.commonPool-worker-4,5,main]
開始(タスク5): Thread[#48,ForkJoinPool.commonPool-worker-5,5,main]
送信(タスク5): Thread[#48,ForkJoinPool.commonPool-worker-5,5,main]
終了(タスク5): Thread[#40,ForkJoinPool.commonPool-worker-1,5,main] (処理時間: 1937ms)
終了(タスク4): Thread[#42,ForkJoinPool.commonPool-worker-2,5,main] (処理時間: 1927ms)
終了(タスク2): Thread[#46,ForkJoinPool.commonPool-worker-4,5,main] (処理時間: 1929ms)
終了(タスク3): Thread[#44,ForkJoinPool.commonPool-worker-3,5,main] (処理時間: 1939ms)
終了(タスク1): Thread[#48,ForkJoinPool.commonPool-worker-5,5,main] (処理時間: 2049ms)
===== 終了 =====
全体の処理時間: 2086ms

また②スレッドプールの例では、Executors.newFixedThreadPool()の引数にプールするスレッド数を明示的に指定する必要があり、事前に確保しておくスレッド数は開発者に委ねられていましたが、CompletableFutureは指定がない場合デフォルト値が自動で決定されます。

明示的なExecutor引数を持たないすべての非同期メソッドは、ForkJoinPool.commonPool()を使用して実行されます

引用:Java21 API仕様
補足:②スレッドプールと③CompletableFutureの比較

②スレッドプールでは、プールサイズよりもタスク数が多い場合、次のようにI/O待ちの間スレッドは他の処理を行えないため、並行処理であっても2周分の時間がかかってしまいます。一方で③CompletableFutureでは、ノンブロッキングI/O処理に入った時点で処理を戻しスレッドが解放されるため、少ないスレッド数でも効率的にタスクを処理することが可能です。

補足:②スレッドプールと③CompletableFutureの比較

サンプルコードで実際に比較してみると、結果は次のようになります。

②スレッドプールのサンプルコード全文(Example06.java

CompletableFutureと比較するため、これまでThread.sleep()で検証していた部分をHTTPリクエスト処理に書き換えています。

Example06.java
packagecom.example;

importstaticcom.github.tomakehurst.wiremock.client.WireMock.aResponse;
importstaticcom.github.tomakehurst.wiremock.client.WireMock.get;
importstaticcom.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;

importjava.io.IOException;
importjava.net.URI;
importjava.net.http.HttpClient;
importjava.net.http.HttpRequest;
importjava.net.http.HttpResponse;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
importjava.util.concurrent.TimeUnit;

importcom.github.tomakehurst.wiremock.WireMockServer;

publicclassExample06{

    publicstaticvoidmain(String[]args)throwsInterruptedException{

        // モックサーバーを起動(1秒の固定遅延を設定)
        WireMockServerwireMockServer=startWireMockServer();

        longprogramStart=System.currentTimeMillis();
        System.out.println("===== 開始 =====");

        // スレッド数を指定
        intthreadCount=3;
        // 固定で3件のスレッドを事前作成しプールしておく
        ExecutorServiceexecutorService=Executors.newFixedThreadPool(threadCount);

        // 処理件数(ここでは5件の処理を並列実行する例)
        intprocessCount=5;

        // スレッドプールから取り出したスレッドにタスクを登録して実行
        for(inti=1;i<=processCount;i++){
            executorService.submit(task(i));
        }

        // スレッドの終了と待機
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE,TimeUnit.NANOSECONDS);

        System.out.println("===== 終了 =====");

        longprogramEnd=System.currentTimeMillis();
        System.out.println("全体の処理時間: "+(programEnd-programStart)+"ms");

        // モックサーバーを停止
        wireMockServer.stop();
    }

    // ブロッキングI/O(同期HTTPリクエスト)を行うタスク
    privatestaticRunnabletask(inttaskId){
        return()->{
            longstart=System.currentTimeMillis();
            System.out.println("開始(タスク"+taskId+"): "+Thread.currentThread());

            HttpClientclient=HttpClient.newHttpClient();
            HttpRequestrequest=HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:8080/api"))
                    .build();

            try{
                System.out.println("送信(タスク"+taskId+"): "+Thread.currentThread());
                client.send(request,HttpResponse.BodyHandlers.ofString());
            }catch(IOException|InterruptedExceptione){
                Thread.currentThread().interrupt();
            }

            longend=System.currentTimeMillis();
            System.out.println("終了(タスク"+taskId+"): "+Thread.currentThread()+" (処理時間: "+(end-start)+"ms)");
        };
    }

    // WireMockを利用してノンブロッキングI/O(非同期HTTPリクエスト)を再現
    privatestaticWireMockServerstartWireMockServer(){
        WireMockServerwireMockServer=newWireMockServer(8080);
        wireMockServer.stubFor(get(urlEqualTo("/api"))
                      .willReturn(aResponse().withStatus(200)
                                             .withFixedDelay(1_000)));
        wireMockServer.start();
        returnwireMockServer;
    }
}
③CompletableFutureのサンプルコード全文(Example07.java

CompletableFutureではデフォルトのプールサイズが自動で決定されてしまうため、ExecutorServiceを渡して固定スレッド数3で条件を揃えられるように書き換えています。

Example07.java
packagecom.example;

importstaticcom.github.tomakehurst.wiremock.client.WireMock.aResponse;
importstaticcom.github.tomakehurst.wiremock.client.WireMock.get;
importstaticcom.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;

importjava.net.URI;
importjava.net.http.HttpClient;
importjava.net.http.HttpRequest;
importjava.net.http.HttpResponse;
importjava.util.ArrayList;
importjava.util.List;
importjava.util.concurrent.CompletableFuture;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;

importcom.github.tomakehurst.wiremock.WireMockServer;

publicclassExample07{

    publicstaticvoidmain(String[]args)throwsInterruptedException{

        // スレッドプール固定数3のExecutorService(CompletableFuture.supplyAsync()に指定するために作成)
        try(ExecutorServicefixedThreadPoolExecutorService=Executors.newFixedThreadPool(3)){

            // モックサーバーを起動(1秒の固定遅延を設定)
            WireMockServerwireMockServer=startWireMockServer();

            longprogramStart=System.currentTimeMillis();
            System.out.println("===== 開始 =====");

            // スレッド数を指定
            intthreadCount=3;

            // 処理件数(ここでは5件の処理を並列実行する例)
            intprocessCount=5;

            // タスク(CompletableFuture)のリストを作成
            List<CompletableFuture<Void>>completableFutures=newArrayList<>(threadCount);
            for(inti=1;i<=processCount;i++){
                completableFutures.add(task(i,fixedThreadPoolExecutorService));
            }

            // 全てのタスク(CompletableFuture)が完了するまで待機
            CompletableFuture.allOf(completableFutures.toArray(newCompletableFuture[0])).join();

            System.out.println("===== 終了 =====");

            longprogramEnd=System.currentTimeMillis();
            System.out.println("全体の処理時間: "+(programEnd-programStart)+"ms");

            // モックサーバーを停止
            wireMockServer.stop();
        }
    }

    // タスク(CompletableFuture)を作成
    // プールするスレッド数をデフォルト値から変更するためExecutorServiceを受け取る
    privatestaticCompletableFuture<Void>task(inttaskId,ExecutorServiceexecutorService){

        longstart=System.currentTimeMillis();

        HttpClientclient=HttpClient.newHttpClient();
        HttpRequestrequest=HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/api"))
                .build();

        returnCompletableFuture
                // 非同期処理の起点(別スレッドで最初に実行したい処理)
                .supplyAsync(()->{
                    System.out.println("開始(タスク"+taskId+"): "+Thread.currentThread());
                    returntaskId;
                },executorService)
                // 前の処理結果を受け取り別のCompletableFutureに繋ぐ(ノンブロッキングI/Oなど)
                .thenCompose(id->{
                    System.out.println("送信(タスク"+id+"): "+Thread.currentThread());
                    returnclient.sendAsync(request,HttpResponse.BodyHandlers.ofString());
                })
                // 非同期処理の終点(前の処理結果を受け取る最終的な完了処理)
                .thenAccept(response->{
                    longend=System.currentTimeMillis();
                    System.out.println("終了(タスク"+taskId+"): "+Thread.currentThread()+" (処理時間: "+(end-start)+"ms)");
                });
    }

    // WireMockを利用してノンブロッキングI/O(非同期HTTPリクエスト)を再現
    privatestaticWireMockServerstartWireMockServer(){
        WireMockServerwireMockServer=newWireMockServer(8080);
        wireMockServer.stubFor(get(urlEqualTo("/api"))
                      .willReturn(aResponse().withStatus(200)
                                             .withFixedDelay(1_000)));
        wireMockServer.start();
        returnwireMockServer;
    }
}

②スレッドプール(Example06.java)の結果

$ 実行結果
===== 開始 =====
開始(タスク1): Thread[#39,pool-2-thread-1,5,main]
開始(タスク2): Thread[#40,pool-2-thread-2,5,main]
開始(タスク3): Thread[#41,pool-2-thread-3,5,main]
送信(タスク3): Thread[#41,pool-2-thread-3,5,main]
送信(タスク1): Thread[#39,pool-2-thread-1,5,main]
送信(タスク2): Thread[#40,pool-2-thread-2,5,main]
終了(タスク3): Thread[#41,pool-2-thread-3,5,main] (処理時間: 1815ms)
終了(タスク2): Thread[#40,pool-2-thread-2,5,main] (処理時間: 1815ms)
終了(タスク1): Thread[#39,pool-2-thread-1,5,main] (処理時間: 1816ms)
開始(タスク5): Thread[#40,pool-2-thread-2,5,main]
開始(タスク4): Thread[#41,pool-2-thread-3,5,main]
送信(タスク5): Thread[#40,pool-2-thread-2,5,main]
送信(タスク4): Thread[#41,pool-2-thread-3,5,main]
終了(タスク5): Thread[#40,pool-2-thread-2,5,main] (処理時間: 1049ms)
終了(タスク4): Thread[#41,pool-2-thread-3,5,main] (処理時間: 1050ms)
===== 終了 =====
全体の処理時間: 2885ms

③CompletableFuture(Example07.java)の結果

$ 実行結果
===== 開始 =====
開始(タスク1): Thread[#40,pool-1-thread-1,5,main]
開始(タスク2): Thread[#42,pool-1-thread-2,5,main]
送信(タスク1): Thread[#40,pool-1-thread-1,5,main]
送信(タスク2): Thread[#42,pool-1-thread-2,5,main]
開始(タスク3): Thread[#44,pool-1-thread-3,5,main]
送信(タスク3): Thread[#44,pool-1-thread-3,5,main]
開始(タスク4): Thread[#42,pool-1-thread-2,5,main]
送信(タスク4): Thread[#42,pool-1-thread-2,5,main]
開始(タスク5): Thread[#40,pool-1-thread-1,5,main]
送信(タスク5): Thread[#40,pool-1-thread-1,5,main]
終了(タスク1): Thread[#74,ForkJoinPool.commonPool-worker-2,5,main] (処理時間: 1968ms)
終了(タスク4): Thread[#78,ForkJoinPool.commonPool-worker-3,5,main] (処理時間: 1843ms)
終了(タスク2): Thread[#75,ForkJoinPool.commonPool-worker-1,5,main] (処理時間: 1862ms)
終了(タスク5): Thread[#76,ForkJoinPool.commonPool-worker-5,5,main] (処理時間: 1840ms)
終了(タスク3): Thread[#77,ForkJoinPool.commonPool-worker-4,5,main] (処理時間: 1862ms)
===== 終了 =====
全体の処理時間: 2016ms

どちらも3スレッドで5タスクを処理している点は変わりませんが、③CompletableFutureの方では、送信後レスポンスを待つことなく、すぐに処理が戻され残りのタスク4と5もスレッド1と2によって送信が処理されているため、全体の処理時間が2秒になっています。

補足:Spring WebFlux(Project Reactor)について

CompletableFutureのように少ないスレッドで同時実行を処理できるようにする考え方をWebアプリケーション全体に適用したものが Spring WebFluxProject Reactor)です。「ノンブロッキング」と「関数型」による「リアクティブ」プログラミングモデルにより、Mono/Fluxというリアクティブ型でデータの流れを記述しますが、やや学習コストの高いWebフレームワークといえます。

CompletableFuture(や Spring WebFlux )によりスレッド効率を改善することはできますが、サンプルコードを見ても分かる通り、ある程度コードは複雑になってしまいます。その他、順次処理の基本的な演算子を使用できなくなる、ステップ実行できずデバッグ機能の使用が難しくなる等、様々な課題が残ります。

これにより、OSスレッドの不足によって課されていたスループットの制限は解消されますが、その代償は大きいものです。すなわち、いわゆる非同期プログラミングスタイルを採用する必要があり、I/O操作の完了を待たずに、後でコールバックに完了を通知する、独立した一連のI/Oメソッドを使用することになります。専用のスレッドがないため、開発者はリクエスト処理ロジックを小さな段階に分割し(通常はラムダ式として記述される)、API を使用してそれらを順次的なパイプラインとして構成する必要があります(例えば、CompletableFuture や、いわゆる「リアクティブ」フレームワークを参照)。したがって、ループや try/catch ブロックといった、言語の基本的な順次構成演算子は使用できなくなります。

引用:JEP 444: Virtual Threads > Motivation > Improving scalability with the asynchronous style

④ 仮想スレッド - Virtual Threads(Java21以降)

OSスレッドとJavaスレッドが1対1の薄いラッパーである①のプラットフォーム・スレッドに対して、根本的なアプローチを変えることで、②や③のような工夫を不要なものとするのが仮想スレッドです。これにより、シンプルなコード記述とスレッド効率の改善を両立させることができます。

ThreadクラスおよびThread.Builderインタフェースを使用した仮想スレッドの作成の通り、ビルダーを返すファクトリメソッドThread.ofVirtual()が用意されているため、①プラットフォーム・スレッドのようにシンプルな記述で仮想スレッドを作成し、実行することが可能です。

Java
Runnabletask=()->System.out.println("実行");// スレッド内で実行する処理
Threadthread=Thread.ofVirtual().unstarted(task);// 未開始の仮想スレッドを1つ作成
thread.start();// スレッドを開始
thread.join();// スレッドの終了を待機

(①プラットフォーム・スレッドも、従来のコンストラクタ(new Thread)を使用する方法の他、Java21以降はThread.ofPlatform()で作成することが可能となっています。)

サンプルコード全文(Example08.java
Example08.java
packagecom.example;

importstaticcom.github.tomakehurst.wiremock.client.WireMock.aResponse;
importstaticcom.github.tomakehurst.wiremock.client.WireMock.get;
importstaticcom.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;

importjava.io.IOException;
importjava.net.URI;
importjava.net.http.HttpClient;
importjava.net.http.HttpRequest;
importjava.net.http.HttpResponse;
importjava.util.ArrayList;
importjava.util.List;

importcom.github.tomakehurst.wiremock.WireMockServer;

publicclassExample08{

    publicstaticvoidmain(String[]args)throwsInterruptedException{

        // モックサーバーを起動(1秒の固定遅延を設定)
        WireMockServerwireMockServer=startWireMockServer();

        longstart=System.currentTimeMillis();
        System.out.println("===== 開始 =====");

        // スレッド数を指定
        intthreadCount=5;
        List<Thread>threads=newArrayList<Thread>(threadCount);

        // 仮想スレッドを作成してリストに追加
        for(inti=1;i<=threadCount;i++){
            Threadthread=Thread.ofVirtual().unstarted(task(i));
            threads.add(thread);
        }

        // スレッドを開始
        for(Threadthread:threads){
            thread.start();
        }

        // スレッドの終了を待機
        for(Threadthread:threads){
            thread.join();
        }

        System.out.println("===== 終了 =====");

        longend=System.currentTimeMillis();
        System.out.println("全体の処理時間: "+(end-start)+"ms");

        // モックサーバーを停止
        wireMockServer.stop();
    }

    privatestaticRunnabletask(inttaskId){
        return()->{
            longstart=System.currentTimeMillis();
            System.out.println("開始(タスク"+taskId+"): "+Thread.currentThread());

            HttpClientclient=HttpClient.newHttpClient();
            HttpRequestrequest=HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:8080/api"))
                    .build();

            try{
                System.out.println("送信(タスク"+taskId+"): "+Thread.currentThread());
                client.send(request,HttpResponse.BodyHandlers.ofString());
            }catch(IOException|InterruptedExceptione){
                Thread.currentThread().interrupt();
            }

            longend=System.currentTimeMillis();
            System.out.println("終了(タスク"+taskId+"): "+Thread.currentThread()+" (処理時間: "+(end-start)+"ms)");
        };
    }

    privatestaticWireMockServerstartWireMockServer(){
        WireMockServerwireMockServer=newWireMockServer(8080);
        wireMockServer.stubFor(get(urlEqualTo("/api"))
                      .willReturn(aResponse().withStatus(200)
                                             .withFixedDelay(1_000)));
        wireMockServer.start();
        returnwireMockServer;
    }
}

ブロッキングHTTPリクエストのHttpClient.send()を使用し、CompletableFutureを用いることなく従来通りRunnableを渡していますが、出力結果を見ると、開始・送信・終了の各処理で異なるスレッドIDが割り当てられ、I/O待ち中にOSスレッドがアンマウントされ、仮想スレッドがI/O待ちの間、OSスレッドは解放されている(終了時は別のOSスレッドがマウントされ、キャリアとなっている)ことが分かります。

サンプルコード(Example08.java)の実行結果
$ 実行結果
===== 開始 =====
開始(タスク1): VirtualThread[#39]/runnable@ForkJoinPool-1-worker-1
開始(タスク3): VirtualThread[#41]/runnable@ForkJoinPool-1-worker-3
開始(タスク5): VirtualThread[#43]/runnable@ForkJoinPool-1-worker-5
開始(タスク4): VirtualThread[#42]/runnable@ForkJoinPool-1-worker-4
開始(タスク2): VirtualThread[#40]/runnable@ForkJoinPool-1-worker-2
送信(タスク3): VirtualThread[#41]/runnable@ForkJoinPool-1-worker-6
送信(タスク4): VirtualThread[#42]/runnable@ForkJoinPool-1-worker-1
送信(タスク1): VirtualThread[#39]/runnable@ForkJoinPool-1-worker-4
送信(タスク2): VirtualThread[#40]/runnable@ForkJoinPool-1-worker-3
送信(タスク5): VirtualThread[#43]/runnable@ForkJoinPool-1-worker-2
終了(タスク4): VirtualThread[#42]/runnable@ForkJoinPool-1-worker-2 (処理時間: 1978ms)
終了(タスク2): VirtualThread[#40]/runnable@ForkJoinPool-1-worker-9 (処理時間: 1978ms)
終了(タスク1): VirtualThread[#39]/runnable@ForkJoinPool-1-worker-6 (処理時間: 1980ms)
終了(タスク5): VirtualThread[#43]/runnable@ForkJoinPool-1-worker-4 (処理時間: 1975ms)
終了(タスク3): VirtualThread[#41]/runnable@ForkJoinPool-1-worker-1 (処理時間: 1980ms)
===== 終了 =====
全体の処理時間: 1999ms

また、JEP 444: Virtual Threads > Description > Using virtual threads vs. platform threadsのように、ExecutorServiceを使用した記述も可能です。

前述した補足の通り、try-with-resources文で記述すればスレッドの終了と待機は明示的に書く必要がなくなるため、こちらも次のようにシンプルなコードになります。

Java
// 仮想スレッド作成(tryブロックを抜けると自動でスレッドの終了と待機)
try(ExecutorServiceexecutorService=Executors.newVirtualThreadPerTaskExecutor()){
    Runnabletask=()->System.out.println("実行");// スレッド内で実行する処理
    executorService.submit(task);// タスクを登録して実行
}
サンプルコード全文(Example09.java
Example09.java
packagecom.example;

importstaticcom.github.tomakehurst.wiremock.client.WireMock.aResponse;
importstaticcom.github.tomakehurst.wiremock.client.WireMock.get;
importstaticcom.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;

importjava.io.IOException;
importjava.net.URI;
importjava.net.http.HttpClient;
importjava.net.http.HttpRequest;
importjava.net.http.HttpResponse;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;

importcom.github.tomakehurst.wiremock.WireMockServer;

publicclassExample09{

    publicstaticvoidmain(String[]args)throwsInterruptedException{

        // モックサーバーを起動(1秒の固定遅延を設定)
        WireMockServerwireMockServer=startWireMockServer();

        longprogramStart=System.currentTimeMillis();
        System.out.println("===== 開始 =====");

        // 仮想スレッド作成(ExecutorServiceがAutoCloseableを実装しているためtry-with-resourcesで記述)
        try(ExecutorServiceexecutorService=Executors.newVirtualThreadPerTaskExecutor()){

            // 処理件数(ここでは5件の処理を並列実行する例)
            intprocessCount=5;

            // タスクを登録して実行
            for(inti=1;i<=processCount;i++){
                executorService.submit(task(i));
            }
        }

        System.out.println("===== 終了 =====");

        longprogramEnd=System.currentTimeMillis();
        System.out.println("全体の処理時間: "+(programEnd-programStart)+"ms");

        // モックサーバーを停止
        wireMockServer.stop();
    }

    privatestaticRunnabletask(inttaskId){
        return()->{
            longstart=System.currentTimeMillis();
            System.out.println("開始(タスク"+taskId+"): "+Thread.currentThread());

            HttpClientclient=HttpClient.newHttpClient();
            HttpRequestrequest=HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:8080/api"))
                    .build();

            try{
                System.out.println("送信(タスク"+taskId+"): "+Thread.currentThread());
                client.send(request,HttpResponse.BodyHandlers.ofString());
            }catch(IOException|InterruptedExceptione){
                Thread.currentThread().interrupt();
            }

            longend=System.currentTimeMillis();
            System.out.println("終了(タスク"+taskId+"): "+Thread.currentThread()+" (処理時間: "+(end-start)+"ms)");
        };
    }

    privatestaticWireMockServerstartWireMockServer(){
        WireMockServerwireMockServer=newWireMockServer(8080);
        wireMockServer.stubFor(get(urlEqualTo("/api"))
                      .willReturn(aResponse().withStatus(200)
                                             .withFixedDelay(1_000)));
        wireMockServer.start();
        returnwireMockServer;
    }
}
サンプルコード(Example09.java)の実行結果
$ 実行結果
===== 開始 =====
開始(タスク2): VirtualThread[#41]/runnable@ForkJoinPool-1-worker-2
開始(タスク5): VirtualThread[#44]/runnable@ForkJoinPool-1-worker-5
開始(タスク3): VirtualThread[#42]/runnable@ForkJoinPool-1-worker-3
開始(タスク1): VirtualThread[#39]/runnable@ForkJoinPool-1-worker-1
開始(タスク4): VirtualThread[#43]/runnable@ForkJoinPool-1-worker-4
送信(タスク4): VirtualThread[#43]/runnable@ForkJoinPool-1-worker-2
送信(タスク3): VirtualThread[#42]/runnable@ForkJoinPool-1-worker-4
送信(タスク1): VirtualThread[#39]/runnable@ForkJoinPool-1-worker-5
送信(タスク5): VirtualThread[#44]/runnable@ForkJoinPool-1-worker-1
送信(タスク2): VirtualThread[#41]/runnable@ForkJoinPool-1-worker-3
終了(タスク3): VirtualThread[#42]/runnable@ForkJoinPool-1-worker-4 (処理時間: 1972ms)
終了(タスク5): VirtualThread[#44]/runnable@ForkJoinPool-1-worker-5 (処理時間: 1969ms)
終了(タスク1): VirtualThread[#39]/runnable@ForkJoinPool-1-worker-3 (処理時間: 1973ms)
終了(タスク2): VirtualThread[#41]/runnable@ForkJoinPool-1-worker-2 (処理時間: 1973ms)
終了(タスク4): VirtualThread[#43]/runnable@ForkJoinPool-1-worker-1 (処理時間: 1970ms)
===== 終了 =====
全体の処理時間: 2000ms

仮想スレッドの仕組み(キャリア/マウントとアンマウント/仮想スレッドスケジューラ)

仮想スレッドは、OSスレッドをラップしたプラットフォーム・スレッドと異なり、作成コストが低く大量に作成することが可能です。(Do not pool virtual threads

②の例で確認した通り、スレッドの実行中には、I/O待ちなど実際に処理を実行していないタイミングがあります。仮想スレッドはCPU上で計算を実行している間だけOSスレッドを消費し、プラットフォーム・スレッドにマウントして実行されます。マウントされたプラットフォーム・スレッドは仮想スレッドの「キャリア」と呼ばれます。I/O待ちが発生すると仮想スレッドはプラットフォーム・スレッドをアンマウントします。これにより解放されたプラットフォーム・スレッドは、別の仮想スレッドの「キャリア」となることができるようになります。

これら割り当てやタイミングの管理は、仮想スレッドスケジューラが行います。

このような仕組みにより、仮想スレッドは見かけ上は非同期に処理を戻すことなく従来のスレッドのようにI/O待ちを行い、実際のプラットフォーム・スレッドは効率よく待ちの発生していないスレッドの必要な処理を実行することができます。

仮想スレッドの仕組み

仮想スレッドでコードを実行するために、JDKの仮想スレッドスケジューラは、その仮想スレッドをプラットフォームスレッドにマウントし、プラットフォームスレッド上で実行されるように割り当てます。これにより、そのプラットフォームスレッドは仮想スレッドの「キャリア」となります。その後、コードの実行が一段落すると、仮想スレッドはキャリアからアンマウントされることがあります。その時点でプラットフォームスレッドは解放されるため、スケジューラは別の仮想スレッドをそこにマウントし、再びキャリアとして機能させることができます。

引用:JEP 444: Virtual Threads > Description > Executing virtual threads

仮想スレッド利用時の注意点

CPUに依存する処理には効果がない

ここまでの説明からも分かる通り、仮想スレッドとは、I/O待ちの間プラットフォーム・スレッドを効率的に活用するための技術です。従って、I/O待ちが発生しない、CPU処理に時間のかかる処理の場合、処理速度の向上は見込めません。

仮想スレッドは高速スレッドではありません。つまりプラットフォーム・スレッドよりも速くコードが実行されることはありません。速度(低レイテンシ)ではなく、スケール(高スループット)を提供するために存在します。

引用:仮想スレッドを使用する理由

仮想スレッドのPinning(固定化・ピン留め)

通常、仮想スレッドはブロッキング操作が発生するとアンマウントされますが、一部の操作では仮想スレッドがキャリアに固定され、マウントの解除ができなくなります。アプリケーションが正しく動作しなくなるわけではありませんが、アンマウントできないということはすなわちスレッドの効率的な利用を妨げることになるため、パフォーマンスの向上に影響を及ぼす可能性に繋がります。(Pinning

仮想スレッドのスケジュールおよび固定された仮想スレッドでも次の2つが挙げられています。

  • 仮想スレッドがsynchronizedブロックまたはメソッド内でコードを実行する場合(Java24で解消
  • 仮想スレッドがnativeメソッドまたは外部関数を実行する場合

Javaのスレッド処理を全て置き換えるものではない

サンプルコードからも分かる通り、コード自体はシンプルであり、かつ既存のThreadExecutorServiceを利用しているため、技術的に移行は比較的容易といえます。

Java21以降のシステムで新規にI/Oバウンドな処理を作成する際には積極的に採用を検討することができますが、こと置き換えに関しては、JEP 444 にて Non-Goals とされていることから、慎重な検討が求められます。

従来のスレッド実装を削除することや、既存のアプリケーションを仮想スレッドを使用するように密かに移行させることは、目標ではありません。Javaの基本的な並行処理モデルを変更することは、目標ではありません。

引用:JEP 444: Virtual Threads > Non-Goals

まとめ

本記事では、Javaの並行処理の変遷を追いつつ、従来のプラットフォーム・スレッドと比較することで、仮想スレッドがどのようなものかを確認しました。

並行処理課題解決策残った問題
① プラットフォーム・スレッドスレッド数に上限・メモリを消費スレッド生成コストが高い
② スレッドプールスレッド生成コストが高い事前生成と使い回しでコスト削減I/O待ち中もスレッドを占有
③ CompletableFutureI/O待ち中もスレッドを占有ノンブロッキングI/Oでスレッドを解放非同期プログラミングでコードが複雑
④ 仮想スレッドスレッドの占有・複雑なコード軽量な仮想スレッドによるマウントCPUバウンドには効果なし・Pinning

①~③は全てプラットフォーム・スレッドを使っており、その制約をいかに工夫で乗り越えるかという歴史でした。④の仮想スレッドはその制約そのものを取り除き、シンプルなコードのまま高いスループットを実現します。

仮想スレッドはJavaの並行処理を根本から変える技術です。本記事が、仮想スレッドを理解するための一助になれば幸いです。