๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ

Java

์™œ ๋‚ด CompletableFuture ๋Š” Console ์— ๋กœ๊ทธ๊ฐ€ ์—†๋Š”๊ฐ€? ForkJoinPool ์˜ ๋ฐ๋ชฌ ์Šค๋ ˆ๋“œ๋ฅผ ์•Œ์•„๋ณด์ž

CompletableFuture๋ฅผ ํ™œ์šฉํ•œ ๋น„๋™๊ธฐ์—์„œ ์ฝ˜์†”์— ๋กœ๊ทธ๊ฐ€ ๋‚จ์ง€ ์•Š๋Š” ๋ฌธ์ œ

๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•ด CompletableFuture ์‚ฌ์šฉ๋ฒ•์„ ์ตํžˆ๋Š” ์ค‘, ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ ๋กœ์ง(Task ํด๋ž˜์Šค์˜ doAsyncWithForkJoinPool( ))์—์„œ ์ฝ˜์†”์— ๋กœ๊ทธ๊ฐ€ ๋‚จ์ง€ ์•Š๋Š” ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค. ์ด์— ๋Œ€ํ•ด ์•Œ์•„๋ณด๊ณ ์ž ํ•ฉ๋‹ˆ๋‹ค.

 

๋ฌธ์ œ๊ฐ€ ๋˜์—ˆ๋˜ ์ฝ”๋“œ๋กœ ์‹คํ–‰ ๊ฒฐ๊ณผ๋Š” "๋ฉ”์ธ ์ข…๋ฃŒ" ๋งŒ ๋‚˜์˜ฌ ๋ฟ, Thread.currentThread().getName() ๊ด€๋ จ ๋กœ๊ทธ๋Š” ์ฝ˜์†”์— ๋‚˜์˜ค์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.

// Main ํด๋ž˜์Šค
public static void main(String[] args) {
        
   Task task = new Task();
   CompletableFuture<Void> future = task.doAsyncWithForkJoinPool().thenAccept(s -> {
       System.out.println("[main] Thread.currentThread().getName() = "
                            + Thread.currentThread().getName());
       System.out.println(s);
    });
    System.out.println("๋ฉ”์ธ ์ข…๋ฃŒ");
}
    
// Task ํด๋ž˜์Šค
public CompletableFuture<String> doAsyncWithForkJoinPool() {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("[doAsync] Thread.currentThread().getName() = "
                            + Thread.currentThread().getName());
        return "doAsync";
    });
}

 

CompletableFuture๋ž€?

CompletableFuture๋Š” Java 8๋ถ€ํ„ฐ ์ œ๊ณต๋œ ๋น„๋™๊ธฐ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ์œ„ํ•œ ๊ฐ•๋ ฅํ•œ ๋„๊ตฌ์ž…๋‹ˆ๋‹ค.
์ด๋ฅผ ํ†ตํ•ด ๋น„๋™๊ธฐ ์ž‘์—…์˜ ์ƒํƒœ์™€ ๊ฒฐ๊ณผ๋ฅผ ์‰ฝ๊ฒŒ ๊ด€๋ฆฌํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ, ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๊ธฐ๋Šฅ์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค:

  • ์ฝœ๋ฐฑ ์ถ”๊ฐ€: ๋น„๋™๊ธฐ ์ž‘์—…์ด ์™„๋ฃŒ๋œ ํ›„ ์‹คํ–‰ํ•  ์ž‘์—…์„ ๋“ฑ๋ก.
  • ์ž‘์—… ์กฐํ•ฉ: ์—ฌ๋Ÿฌ ๋น„๋™๊ธฐ ์ž‘์—…์„ ์กฐํ•ฉํ•ด ์ƒˆ๋กœ์šด ์ž‘์—…์„ ์ƒ์„ฑ.
  • ์˜ˆ์™ธ ์ฒ˜๋ฆฌ: ๋น„๋™๊ธฐ ์ž‘์—… ์ค‘ ๋ฐœ์ƒํ•œ ์˜ˆ์™ธ๋ฅผ ๊ฐ„ํŽธํ•˜๊ฒŒ ์ฒ˜๋ฆฌ.

๊ธฐ๋ณธ์ ์œผ๋กœ CompletableFuture๋ฅผ ํ†ตํ•ด ๋น„๋™๊ธฐ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•˜๋ฉด ForkJoinPool.commonPool()์˜ ์›Œ์ปค ์Šค๋ ˆ๋“œ๊ฐ€ ๋น„๋™๊ธฐ ์ž‘์—…์„ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

 

๊ทธ๋ฆฌ๊ณ , ์ด ForkJoinPool ์ด ๋ฌธ์ œ์˜ ์›์ธ์ด์—ˆ์Šต๋‹ˆ๋‹ค.

 

ForkJoinPool์˜ ์“ฐ๋ ˆ๋“œ ์ƒ์„ฑ ์›๋ฆฌ

ForkJoinPool์˜ ์›Œ์ปค ์Šค๋ ˆ๋“œ๋Š” ๋ชจ๋‘ ๋ฐ๋ชฌ ์Šค๋ ˆ๋“œ๋กœ ์ดˆ๊ธฐํ™”๋œ๋‹ค๊ณ  ๋ช…์‹œ๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.

All worker threads are initialized with Thread.isDaemon set true.

 

๋˜ํ•œ, ForkJoinPool.commonPool()์˜ Javadoc์˜ ๋ฐ‘์ค„์„ ํ•ด์„ํ•˜์ž๋ฉด ๋น„๋™๊ธฐ ์ž‘์—…์ด ํ”„๋กœ๊ทธ๋žจ ์ข…๋ฃŒ ์ „์— ๋ฐ˜๋“œ์‹œ ์™„๋ฃŒ๋˜์–ด์•ผ ํ•˜๋Š” ํ”„๋กœ๊ทธ๋žจ์—์„œ๋Š” commonPool().awaitQuiescence๋ฅผ ํ˜ธ์ถœํ•ด์•ผ ํ•œ๋‹ค๊ณ  ๋‚˜์™€์žˆ์Šต๋‹ˆ๋‹ค.

Returns the common pool instance. This pool is statically constructed; its run state is unaffected by attempts to shutdown or shutdownNow. However this pool and any ongoing processing are automatically terminated upon program System.exit. Any program that relies on asynchronous task processing to complete before program termination should invoke commonPool().awaitQuiescence, before exit.

Returns: the common pool instance Since: 1.8


๋˜ํ•œ, ForkJoinPool ์€ ๋ณ‘๋ ฌ ์ŠคํŠธ๋ฆผ(parallel())์„ ์‚ฌ์šฉํ•  ๋•Œ ์ฃผ๋กœ ๋‚˜์˜ค๋Š” ์šฉ์–ด์ž…๋‹ˆ๋‹ค. ๋ณ‘๋ ฌ ์ŠคํŠธ๋ฆผ์„ ์‚ฌ์šฉํ•  ๋•Œ, ForkJoinPool() ์˜ ์“ฐ๋ ˆ๋“œ ์ˆ˜๋Š” Runtime.availableProcessors ์˜ ๋ฐ˜ํ™˜๊ฐ’์œผ๋กœ ํ’€์— ์‚ฌ์šฉํ•  ์Šค๋ ˆ๋“œ ์ˆ˜๋ฅผ ๊ฒฐ์ •ํ•ฉ๋‹ˆ๋‹ค. (์ถœ์ฒ˜: ๋ชจ๋˜ ์ž๋ฐ” ์ธ ์•ก์…˜ ไธญ). ๋ณ‘๋ ฌ ์ŠคํŠธ๋ฆผ์— ๊ด€ํ•ด์„œ๋Š” ์ด๋ฒˆ ๋‚ด์šฉ๊ณผ ๋ฌด๊ด€ํ•ด์„œ ์ด๋ฒˆ ๊ธ€์—์„œ๋Š” ์„ค๋ช…ํ•˜์ง€ ์•Š๊ฒ ์Šต๋‹ˆ๋‹ค.

 

๋ฐ๋ชฌ ์Šค๋ ˆ๋“œ ์ค‘์š”ํ•œ๊ฐ€?

JVM์€ ๋ชจ๋“  ์‚ฌ์šฉ์ž ์Šค๋ ˆ๋“œ๊ฐ€ ์ข…๋ฃŒ๋˜์—ˆ์„ ๋•Œ ์ข…๋ฃŒ๋ฉ๋‹ˆ๋‹ค.
์ฆ‰, ๋ฐ๋ชฌ ์Šค๋ ˆ๋“œ๋Š” ์‚ฌ์šฉ์ž ์Šค๋ ˆ๋“œ๊ฐ€ ์ข…๋ฃŒ๋˜๋ฉด ์ž‘์—… ์ค‘์ด๋ผ๋„ JVM๊ณผ ํ•จ๊ป˜ ๊ฐ•์ œ๋กœ ์ข…๋ฃŒ๋ฉ๋‹ˆ๋‹ค.

Main ์“ฐ๋ ˆ๋“œ๊ฐ€ ์ข…๋ฃŒ๋  ๋•Œ, JVM์ด ์ข…๋ฃŒ๋˜๋Š” ๊ฒƒ์ด ์•„๋‹™๋‹ˆ๋‹ค.(์ถœ์ฒ˜ - ๊น€์˜ํ•œ๋‹˜์˜ ๊ณ ๊ธ‰ ์ž๋ฐ” 1ํŽธ ไธญ)

๋”ฐ๋ผ์„œ, ๋ฐ๋ชฌ ์Šค๋ ˆ๋“œ๊ฐ€ ์ฒ˜๋ฆฌ ์ค‘์ธ ์ž‘์—…์€ JVM์ด ์ข…๋ฃŒ๋˜๋ฉด์„œ ์ž‘์—… ์†์‹ค์ด ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

 

์˜ˆ์ œ ์ฝ”๋“œ: ForkJoinPool ์™€ ExecutorService ๋น„๊ต

์•„๋ž˜ ์ฝ”๋“œ๋Š” CompletableFuture๋ฅผ ํ™œ์šฉํ•ด ๋น„๋™๊ธฐ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•˜๋Š” ์˜ˆ์ œ์ž…๋‹ˆ๋‹ค.
๋น„๋™๊ธฐ ์ž‘์—…์€ ๊ธฐ๋ณธ์ ์œผ๋กœ ForkJoinPool.commonPool()์˜ ๋ฐ๋ชฌ ์Šค๋ ˆ๋“œ์—์„œ ์‹คํ–‰๋ฉ๋‹ˆ๋‹ค.

 

์ฝ”๋“œ๋ฅผ ๋ณด๋ฉด ๋ฉ”์ธ ์“ฐ๋ ˆ๋“œ์—์„œ CompletableFuture ๋ฅผ ํ™œ์šฉํ•ด doAsyncWithForkJoinPool ๋ฉ”์„œ๋“œ๋ฅผ ๋น„๋™๊ธฐ๋กœ ํ˜ธ์ถœํ•˜๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค. ์ด ๋•Œ, ForkJoinPool.commonPool() ์˜ ๋ฐ๋ชฌ์“ฐ๋ ˆ๋“œ๊ฐ€ doAsyncWithForkJoinPool ๋ฉ”์„œ๋“œ๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

// Main ํด๋ž˜์Šค
public static void main(String[] args) {
        
   Task task = new Task();
   CompletableFuture<Void> future = task.doAsyncWithForkJoinPool().thenAccept(s -> {
       System.out.println("[main] Thread.currentThread().getName() = "
                            + Thread.currentThread().getName());
       System.out.println(s);
    });
    System.out.println("๋ฉ”์ธ ์ข…๋ฃŒ");
}
    
// Task ํด๋ž˜์Šค
public CompletableFuture<String> doAsyncWithForkJoinPool() {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("[doAsync] Thread.currentThread().getName() = "
                            + Thread.currentThread().getName());
        return "doAsync";
    });
}
๊ฒฐ๊ณผ๋Š” '๋ฉ”์ธ ์ข…๋ฃŒ' ๊ฐ€ ๋‚˜์™”์Šต๋‹ˆ๋‹ค.

 

๋น„๋™๊ธฐ ์ž‘์—…์„ ์‹คํ–‰ํ•˜๋Š” ๋ฐ๋ชฌ ์Šค๋ ˆ๋“œ์—์„œ์˜ ๋กœ๊ทธ๊ฐ€ ์ถœ๋ ฅ๋˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ์ด๋Š” ๋ฉ”์ธ ์Šค๋ ˆ๋“œ๊ฐ€ ๋น ๋ฅด๊ฒŒ ์ข…๋ฃŒ๋˜๋ฉด์„œ JVM๋„ ํ•จ๊ป˜ ์ข…๋ฃŒ๋˜์—ˆ๊ธฐ ๋•Œ๋ฌธ์ž…๋‹ˆ๋‹ค.

 

ํ•ด๊ฒฐ ๋ฐฉ๋ฒ• 1: ์‚ฌ์šฉ์ž ์Šค๋ ˆ๋“œ ์‚ฌ์šฉ

๋ฐ๋ชฌ์“ฐ๋ ˆ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ForkJoinPool ๋Œ€์‹  ์‚ฌ์šฉ์ž์“ฐ๋ ˆ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ExecutorService๋กœ ๋น„๋™๊ธฐ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•˜๋ฉด, ๋น„๋™๊ธฐ ์ž‘์—…์ด ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ JVM์ด ์ข…๋ฃŒ๋˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

// Main ํด๋ž˜์Šค
public static void main(String[] args) {
        
   Task task = new Task();
   CompletableFuture<Void> future = task.doAsyncWithExecutorService().thenAccept(s -> {
       System.out.println("[main] Thread.currentThread().getName() = " 
                           + Thread.currentThread().getName());
       System.out.println(s);
    });
    System.out.println("๋ฉ”์ธ ์ข…๋ฃŒ");
}

// Task ํด๋ž˜์Šค
public CompletableFuture<String> doAsyncWithExecutorService() {
    ExecutorService es = Executors.newCachedThreadPool();

    return CompletableFuture.supplyAsync(() -> {
        System.out.println("[doAsync] Thread.currentThread().getName() = " 
                            + Thread.currentThread().getName());
        return "doAsync";
    }, es);
}
๊ฒฐ๊ณผ๋Š” ์ด 4์ค„์˜ ๋กœ๊ทธ๊ฐ€ ๋‚จ์•˜์Šต๋‹ˆ๋‹ค.

๋ฉ”์ธ ์ข…๋ฃŒ
[doAsync] Thread.currentThread().getName() = pool-2-thread-1
[main] Thread.currentThread().getName() = pool-2-thread-1
doAsync

 

ํ•ด๊ฒฐ ๋ฐฉ๋ฒ• 2: CompletableFuture ์˜ join() / get() ํ˜ธ์ถœ

join() / get() ์€ ๋น„๋™๊ธฐ์˜ ์ž‘์—…์ด ์™„๋ฃŒ๋  ๋•Œ ๊นŒ์ง€ ์ฝœ๋Ÿฌ ์“ฐ๋ ˆ๋“œ๋ฅผ Blocking ํ•ฉ๋‹ˆ๋‹ค. ์ด ๋•๋ถ„์— ๋น„๋™๊ธฐ ์ž‘์—…์ด ์™„๋ฃŒ๋œ ํ›„์— ์ฝœ๋Ÿฌ ์“ฐ๋ ˆ๋“œ๋ฅผ ์ข…๋ฃŒ์‹œํ‚ต๋‹ˆ๋‹ค.

[doAsync] Thread.currentThread().getName() = ForkJoinPool.commonPool-worker-1
[main] Thread.currentThread().getName() = ForkJoinPool.commonPool-worker-1
doAsync
๋ฉ”์ธ ์ข…๋ฃŒ

 

์ฆ‰, ExecutorService๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ์‚ฌ์šฉ์ž ์Šค๋ ˆ๋“œ๊ฐ€ ๋น„๋™๊ธฐ ์ž‘์—…์„ ์‹คํ–‰ํ•˜๋ฉฐ, JVM์€ ๋ชจ๋“  ์‚ฌ์šฉ์ž ์Šค๋ ˆ๋“œ๊ฐ€ ๋น„๋™๊ธฐ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•  ๋•Œ๊นŒ์ง€ ์ข…๋ฃŒํ•˜์ง€ ์•Š๊ณ  ๋Œ€๊ธฐํ•ฉ๋‹ˆ๋‹ค. ์ด๋กœ ์ธํ•ด ๋น„๋™๊ธฐ ์ž‘์—…์˜ ๋ชจ๋“  ๋กœ๊ทธ๊ฐ€ ์ •์ƒ์ ์œผ๋กœ ์ถœ๋ ฅ๋ฉ๋‹ˆ๋‹ค.

 

๊ฒฐ๋ก  ๋ฐ ๊ฐœ์ธ์ ์ธ ์˜๊ฒฌ

์—ฌ๊ธฐ์„œ๋ถ€ํ„ฐ๋Š” ์ด ๊ฒฝํ—˜์„ ํ†ตํ•ด ์ƒ๊ฐํ•˜๊ฒŒ ๋œ ์ œ ๊ฐœ์ธ์ ์ธ ์˜๊ฒฌ์ž…๋‹ˆ๋‹ค.

๋น„๋™๊ธฐ๋ฅผ ์‚ฌ์šฉํ•  ๋•Œ๋Š” ์ฃผ๋กœ I/O ์ž‘์—…๊ณผ ๊ฐ™์ด ์‹œ๊ฐ„์ด ์˜ค๋ž˜ ๊ฑธ๋ฆฌ๋Š” ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.
์ด๋Ÿฌํ•œ ์ž‘์—…์„ ForkJoinPool.commonPool()์„ ์‚ฌ์šฉํ•˜๋Š” CompletableFuture๋กœ ์ฒ˜๋ฆฌํ•œ๋‹ค๋ฉด, JVM์ด ์ •์ƒ์ ์œผ๋กœ ์ข…๋ฃŒ๋  ๋•Œ ์ž‘์—…์ด ์™„๋ฃŒ๋˜์ง€ ์•Š์•˜์Œ์—๋„ JVM์ด ์ข…๋ฃŒ๋˜๋ฉด์„œ ์ž‘์—… ์†์‹ค์ด ๋ฐœ์ƒํ•  ๊ฐ€๋Šฅ์„ฑ์ด ์žˆ์Šต๋‹ˆ๋‹ค. ๋ฌผ๋ก  join() ์ด๋‚˜ get() ์œผ๋กœ ํ•ด๊ฒฐํ•  ์ˆœ ์žˆ์ง€๋งŒ ๋น„๋™๊ธฐ ์ž‘์—…์— Blocking ์€ ์–ด์šธ๋ฆฌ์ง€ ์•Š์Šต๋‹ˆ๋‹ค.(ํ•„์š”ํ•  ๋•Œ๋„ ์žˆ๊ฒ ์ง€๋งŒ์š”)

 

์Šค๋ ˆ๋“œ ํ’€์„ ํ•™์Šตํ•˜๋‹ค ๋ณด๋ฉด ์šฐ์•„ํ•œ ์ข…๋ฃŒ(Graceful Shutdown)๋ผ๋Š” ๊ฐœ๋…์„ ์ ‘ํ•˜๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.
์šฐ์•„ํ•œ ์ข…๋ฃŒ๋ž€, ํ์— ๋‚จ์•„ ์žˆ๋Š” ์ž‘์—…์„ ๋ชจ๋‘ ์ฒ˜๋ฆฌํ•œ ํ›„ ์ข…๋ฃŒํ•˜์—ฌ ์ž‘์—… ์†์‹ค์„ ๋ฐฉ์ง€ํ•˜๋Š” ๋ฐฉ์‹์ž…๋‹ˆ๋‹ค.
์ด๋Š” ์ž‘์—…์˜ ์‹ ๋ขฐ์„ฑ์„ ๋ณด์žฅํ•˜๊ธฐ ์œ„ํ•œ ํ•„์ˆ˜์ ์ธ ์Šต๊ด€์œผ๋กœ, ๋ชจ๋˜ ์ž๋ฐ” ์ธ ์•ก์…˜์—์„œ๋„ ์ด๋Ÿฌํ•œ ์ค‘์š”์„ฑ์„ ๊ฐ•์กฐํ•˜๊ณ  ์žˆ์—ˆ๋Š”๋ฐ์š”.

 

์ด์™€ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ, CompletableFuture ์ž‘์—…๋„ ์šฐ์•„ํ•œ ์ข…๋ฃŒ์ฒ˜๋Ÿผ ์ž‘์—… ์†์‹ค์ด ์—†์–ด์•ผ ํ•˜์ง€ ์•Š์„๊นŒ? ๋ผ๊ณ  ์ƒ๊ฐํ•ด๋ด…๋‹ˆ๋‹ค.
๋”ฐ๋ผ์„œ, ๋ฐ๋ชฌ ์Šค๋ ˆ๋“œ๋ฅผ ์›Œ์ปค ์Šค๋ ˆ๋“œ๋กœ ์‚ฌ์šฉํ•˜๋Š” ForkJoinPool ๋Œ€์‹ , ExecutorService ๊ธฐ๋ฐ˜์˜ ์‚ฌ์šฉ์ž ์Šค๋ ˆ๋“œ ํ’€์„ ์‚ฌ์šฉํ•ด์„œ ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ๊ฐ€ ์ •์ƒ์ ์œผ๋กœ ์ข…๋ฃŒ ๋  ์ˆ˜ ์žˆ๊ฒŒ ํ•˜๋ฉด ์ข‹์„ ๊ฒƒ ๊ฐ™๋‹ค๋Š” ๊ฒฐ๋ก ์„ ๋ง์”€๋“œ๋ฆฌ๋ฉฐ ๊ธ€์„ ๋งˆ๋ฌด๋ฆฌ ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค. 

 

 

์ฐธ๊ณ 

- ์Šคํƒ์˜ค๋ฒ„ํ”Œ๋กœ์šฐ: Why is CompletableFuture.supplyAsync succeeding a random number of times?