회사에서 자바 비동기 처리 및 Future, CompletableFuture에 대해서 세미나를 진행하였다.

과제를 진행하면서, 비동기 프로그래밍 및 자바에서 비동기 프로그래밍을 어떻게 사용하는지 생각해 볼 수 있었다.

Screen Shot 2021-03-13 at 11 50 40 AM

친절하게 피드백을 해주셨기 때문에, 내가 ComputableFuture를 잘못사용하고 있다는 사실을 알게 되었다.

repository.retrieveCategories().parallelStream()
                .map(category -> CompletableFuture.supplyAsync(() -> repository.retrieveBooksByCategory(category)))
                .collect(Collectors.toList())
        .parallelStream()
        .map(CompletableFuture::join)     // (1)
        .flatMap(Collection::parallelStream)
        .collect(Collectors.toList())
        .parallelStream()
        .map(book -> CompletableFuture.runAsync(() -> repository.updateAuthor(book, author), executors))
        .collect(Collectors.toList())
        .forEach(CompletableFuture::join); // (2)
        executors.shutdown();

위의 코드는 내가 처음에 제출한 코드이다. 문제점을 보면 다음과 같다.

  • 코드에서 join을 사용한 곳이 2개정도 존재한다.

  • join은 현재 Thread를 블록킹 하여 결과를 기다리는 연산이다.

  • 따라서 ComputableFuture는 이러한 join 없이 CompletionStage라는 약속을 기반으로 Non blocking 기반의 비동기 처리를 할 수 있게 도와준다.

  • 하지만, 나는 이러한 특징을 이해하지 못하고, join을 남발하였다.

  • 따라서 join 연산은 스트림의 수집이 List와 같이 완료된 후에 개별적으로 루프를 돌면서 수행하는 것이 좋다.

  • 또한 스트림의 중간 연산 단계에서 join을 호출하면, 게으른 연산을 하는 특성으로 인해서 순차적인 blocking이 걸릴 수 있다. 따라서 join 연산은 스트림의 수집이 List와 같이 완료된 후에 개별적으로 루프를 돌면서 수행하는 것이 좋다.

  • stream으로 변환할 때, parallelStream은 조심해서 사용을 해야한다. parallelStream 작업을 멀티코어에서 병렬처리 하는 것은 일반적으로 데이터가 많을 때 유리하다. 데이터가 적으면 오히려 분할하는데 시간이 더 걸린다.

  • 따라서 이러한 최적화는 여러번 수행해보면서 반드시 측정을 기반으로 선택이 되어야한다. 그러한 경우가 아니라면 stream도 충분하다.

  • 꼭 필요한 곳이 아니라면, join 연산은 최종적으로 “동기"가 필요한 곳에서 사용하는 것이 좋다.

따라서 피드백을 받은 부분을 개선한 결과는 다음과 같다.

final List<CompletableFuture<List<Book>>> futures
            = repository.retrieveCategories().stream()
            .map(category -> supplyAsync(() -> repository.retrieveBooksByCategory(category), executors))
            .collect(toList());

        final List<CompletableFuture<Void>> updateFutures
            = futures.stream()
            .map(future -> future.thenCompose(books -> allOf(
                books.stream()
                    .map(book -> runAsync(() -> repository.updateAuthor(book, author), executors))
                    .toArray(CompletableFuture[]::new)
            )))
            .collect(toList());
        updateFutures.forEach(CompletableFuture::join);
  • ComputableFuture로 작업을 처리한 후에, 최종적으로 한꺼번에 join 을 해주고 있다.

참고 문헌

>> Home