리액터 타입


  • 리액티브 스트림은 수요 조절에 기반하고 있다. 프로젝트 리액터는 핵심 타입인 Flux<T>를 사용해서 수요를 조절한다.

  • 레스토랑에서 일하는 서빙 점원에 비유를 하자면, 주방에서 요리가 완성되면, 점원이 주방에서 요리를 받아서 손님에게 가져다주고, 다시 제자리로 돌아와서 다음 요리를 기다린다.

class KitchenService {
    Flux<Dish> getDishes() {
        return Flux.just(
            new Dish("Sesame chicken:),
            new Dish("Lo mein noodles, planin"),
            new Dish("Sweet & sour beef"));
        )
    }
}
  • 서빙 점원은 손님에게 가져다줄 Dish 객체를 달라고 요청할 수 있고, 모두 요리가 완료된 다음에 받을 수 도 있지만, Flux<Dish> 객체로 바로 받을 수 있다. Flux<Dish> 안에 포함된 요리는 아직 완성되지 않았지만, 머지 않아서 완성될 것이다. 하지만 정확히 언제 완성될지는 모른다.

  • 하지만 요리가 완성되기 전에 논블로킹이기 때문에, 점원은 다른 일을 못 한 채 계속 기다리지 않는다. 결과가 아직 정해지지 않았고 미래 시점에 알 수 있는 점에서 Future와 비슷하지만, Future는 이미 시작되었음으로 나타내는 반면에, Flux는 시작할 수 있음을 나타낸다.

Future는 제공하지 않지만, Flux 에서는 제공해주는 특징


- 하나 이상의 `Dish` 포함 가능
- 각 `Dish`가 제공될 때 어떤 일이 발생하는지 지정 가능
- 성공과 실패의 두 가지 경로 모두에 대한 처리 방향 정의 가능
- 결과 폴링 불필요
- 함수형 프로그래밍 지원
  • Future는 하나의 값을 정확하게 제공하는 것이 목적이었고, Flux는 다수의 값을 지원하는 것이 목적이다.
class SimpleServer {
    private final KitchenService kitchen;

    SimpleServer(KitchenService kitchen) {
        this.kitchen = kitchen;
    }

    Flux<Dish> doingMyJob() {
        return this.kitchen.getDishes()
            .map(dish -> Dish.deliver(dish));
    }
}
  • doingMyJob() 함수는 레스토랑 매니저가 서빙 점원을 툭, 치면 주방에 가서 요리를 받아오는 임무를 수행하는 것이다.

  • 주방에 요리를 요청한 후에는 요리 완성 후 해야할 일을 map() 함수를 호출해서 지정한다. 예제에서는 deliver(dish)를 호출해서 요리를 손님에게 가져다주는 일을 지정했다.

  • deliver(dish) 는 요리의 delivered 상태를 true로 설정한다.

  • 예제 코드는 단순한 형태의 리액티브 컨슈머이다. 리액티브 컨슈머는 다른 리액티브 서비스를 호출하고 결과를 반환한다.

  • map() 함수는 인자로 받은 맵핑 함수를 Flux에 담겨있는 각 요리에 적용해서 변환하고 Flux에 담아 반환하므로, 맵핑 함수는 무언가를 반드시 반환해야 한다. 그래서 맵핑 함수의 내부에서 호출되는 deliver() 함수는 void를 반환할 수 없고 dish를 반환한다.

  • 프로젝트 리액터는 풍부한 프로그래밍 모델을 제공한다. 함수형 프로그래밍에서 수행하는 변환 뿐만 아니라, onNext(), onError(), onComplete() 시그널처럼 Future 객체에는 없는 리액티브 스트림 수명 주기에 연결 지을 수 있다.

class PoliteServer {
    private final KitchenService kitchen;

    public PoliteServer(KitchenService kitchen) {
        this.kitchen = kitchen;
    }

    Flux<Dish> dongMyJob() {
        return this.kitchen.getDishes()
            .doOnNext(dish -> System.out.println("thank you for " + dish + "!"))
            .doOnError(error -> System.out.println("so sorry about" + error.getMessage()))
            .doOnComplete(() -> System.out.println("thanks for all your hard work!"))
            .map(Dish::deliver);
    }
}
  • doOnNext()를 사용해서 리액티브 스트림의 onNext() 시그널을 받으면 kitchen 에게 감사합니다라는 말을 하는 기능이 추가되었다.

  • doOnError() 를 사용해서 onError() 시그널을 받으면 처리해야 할 일을 지정해준다.

  • doOnComplete()를 사용해서 주방에서 모든 요리가 완료되었음을 의미하는 onComplete() 시그널을 받으면 처리해야 할 일을 지정해준다.

  • doOnNext(), doOnError(), doOnComplete() 메서드는 필요한 만큼 사용할 수 있다. 해당 메서드들은 리액티브 스트림의 시그널이라는 점을 잊지 말고, 이 시그널을 개발자가 작성하는 코드에서 직접 사용해야하는 것은 아니지만, 알아두면 더 다양한 기능을 더 손쉽게 구현할 수 있다.

  • 하지만 리액터 기반 애플리케이션에서는 실제 구독하기 전까지는 아무런 일도 일어나지 않기 때문에 main() 메서드 안에서 subscribe() 가 호출되어야 비로소 그때부터 뭔가가 동작하기 시작한다. 프로젝터 리액터는 태생적으로 lazy 방식이다. 누군가가 구독하고 결과를 당겨가기 전까지 아무런 일도 일어나지 않는다.

Mono


  • Mono는 0 또는 1개의 원소만 담을 수 있는 리액티브 발행자로서, 프로젝트 리액터에서 제공해주는 구현체이다.

  • 프로젝트 리액터 개발 초기에 Mono의 필요성에 대한 고민 끝에 하나의 원소만 비동기적으로 반환하는 경우가 압도적으로 많음을 깨닫고 Mono를 추가하기로 했다. Mono는 함수형 프로그래밍을 무기로 무장한 Future라고 생각하면 된다.

참고 문헌


>> Home