-
이전장을 통해서 내부적으로 애플리케이션의 핵심은 메시지 처리기가 되었는데, 이러한 구성에 따라서 메시지 처리기가 외부로도 메시지를 처리하도록 변경해보겠다.
-
애플리케이션은 외부에서 들어오는 이벤트를 외부 메시지 버스를 통해서 받고 (레디스 발행 / 구독 대기열을 예제로 사용한다) 출력을 이벤트 형태로 외부 메시지 버스에 발행한다.
분산된 진흙공, 명사로 생각하기
- 마이크로서비스 아키텍처를 구축하는 엔지니어와 정기적으로 이야기를 할 때 기존 애플리케이션을 마이그레이션 하는 경우가 자주 있는데, 마이그레이션을 할 때 엔지니어들이 본능적으로 하는 첫 번째 일을 시스템을 명사로 나누는 것이다.
분산 시스템에서 오류 처리하기
-
‘모든 것은 망가진다’는 소프트웨어 엔지니어링에서 일반적인 규칙이다. 어떤 요청이 실패하면 시스템에 어떤 일이 생길까?
-
두 가지를 함께 바꿔야 하는 경우를 서로 결합됐다고 말한다. 앞 단락에서 본 연쇄적인 실패를 시간적 결합이라고 부른다.
-
시스템의 모든 부분이 동시에 제대로 작동할 때만 정상적으로 작동하는 경우를 시간적 결합이라고 한다. 시스템이 커지면 시스템 부품 중 일부의 성능이 나빠질 확률이 지수적으로 높아진다.
대안 : 비동기 메시징을 사용한 시간적 결합
-
어떻게 하면 적절한 결합을 얻을수 있을까? 이미 해답의 일부, 즉 명사가 아니라 동사로 생각해야 한다는 점에서 살펴봤다.
-
도메인 모델은 비즈니스 프로세스를 모델링하기 위함이다. 도메인 모델은 어떤 물건에 대한 정적인 데이터 모델이 아닌 동사에 대한 모델이다.
-
따라서 주문에 대한 시스템과 배치에 대한 시스템을 생각하는 대신에 주문 행위에 대한 시스템과 할당 행위에 대한 시스템등을 생각한다.
-
이런 방식으로 사물을 구분하면 어떤 시스템이 어떤일을 하는지에 대해서 생각하기가 쉽다. 주문 행위에 대해 생각해보면 우리가 정말 원한느 일은 주문을 넣었을 떄 주문이 들어가는 것이다. 다른 모든 일은 언젠가 발생한다는 것만 보장할 수 있다면 나중에 발생할 수 있다.
-
애그리게이트와 비슷하게 마이크로서비스도 일관성 경계여야 한다. 두 서비스 사이에는 최종 일관성을 받아들일 수 있고, 이는 동기화된 호출에 의존하지 않아도 된다는 뜻이다.
-
각 서비스는 외부 세계에서 커맨드를 받고 결과를 저장하기 위해 이벤트를 발생시킨다. 이런 이벤트를 리슨하는 다른 서비스는 워크 플로우의 다른 단계를 촉발한다.
-
분산 진흙공 안티 패턴을 방지하기 위해 시간적으로 결합된 HTTP API를 호출하는 대신에 비동기 메시지로 시스템을 통합한다.
이러한 구조가 더 좋은 이유
-
각 부분이 서로 독립적으로 실패할 수 있어서 잘못된 동작이 발생했을 때 처리하기가 더 쉽다. 할당 시스템이 좋지 않은 날이라도 여전히 주문을 받을 수 있다.
-
둘째, 시스템 사이의 결합 강도를 감소시킬 수 있다. 처리 과정을 이루는 연산 순서를 바꾸거나 새로운 단계를 도입하고 싶을 떄 이를 지역적으로 할 수 있다.
레디스 발생 / 구독 채널을 통합에 사용하기
-
이 모든것이 어떻게 구체적으로 작동하는지 살펴보자. 이벤트를 시스템 밖으로 보내고 다른 시스템 안으로 넣는 서비스를 위한 메시지 버스 같은 것이 필요하다. 이런 인프라를 종종 메시지 브로커 라고부른다.
-
메시지 브로커의 역할은 발행자로부터 메시지를 받아서 구독자에게 배달하는 것이다.
레디스는 메시지 버스를 감싸는 다른 얇은 어댑터
- 우리가 사용하는 레디스 발행/구독 리스너 (또는 이벤트 소비자)는 플라스크와 아주 비슷하다. 이벤트 리스너는 외부 세계를 변환해서 이벤트로 만든다.
r = redis.Redis(**config.get_redis_host_and_port())
def main():
orm.start_mappers()
pubsub = r.pubsub(ignore_subscribe_messages=True)
pubsub.subscribe('change_batch_quantity')
for m in pubsub.listen():
handle_change_batch_quantity(m)
def handle_change_batch_quantity(m):
logging.debug('handling %s', m)
data = json.loads(m['data']) # 시스템 진입점에서 해야할 일은 JSON을 역직렬화 하고 역직렬화한 객체를 Command로 변환해서 서비스 계층으로 넘기는 일이다.
cmd = commands.ChnageBatchQuantity(ref=data['batchref'], qty=data['qty'])
messagebus.handle(cmd, uow=unit_of_work.SqlAlchemyUnitOfWork())
-
반대 방향의 변환 (도메인 이벤트를 공개 이벤트로 변환)을 처리하는 다운 스트림 어댑터를 만든다.
-
메시지 발행은 아래 코드와 같이 발행한다.
r = redis.Redis(**config.get_redis_host_and_port())
def publish(channel, event: events.Event): # 여기서는 하드 코딩한 채널을 사용하지만, 이벤트 클래스 / 이름과 적절한 채널을 맵핑하는 정보를 저장할 수도 있다. 이렇게 하면 메시지 유형 중 일부에 대해 다른 채널을 사용할 수도 있다.
logging.debug('publishing: channel=%s, event=%s', channel, event)
r.publish(channel, json.dumps(asdict(event)))
외부로 나가는 새 이벤트
@dataclass
class Allocated(Event):
orderid: str
sku: str
qty: int
batchref: str
- 이 이벤트는 주문 라인 상세 정보, 어떤 배치에 주문 라인이 할당됐는지 등 할당에 대해 알아야할 필요가 있는 모든 내용을 저장한다.
내부 이벤트와 외부 이벤트 비교
-
내부와 외부 이벤트의 구분을 명확히 하면 좋다. 일부 이벤트는 밖에서 들어오지만 일부 이벤트는 승격되면서 외부에 이벤트를 발행할 수 있다.
-
하지만 모든 이벤트가 다 외부에 이벤트를 내보내지는 않는다. 이벤트 소싱을 사용할 경우에는 이런 특징이 특히 중요하다.
정리
-
이벤트는 외부에서 들어올 수도 내부에서 외부로 발행할 수도 있다. 우리가 만든
publish
핸들러는 이벤트를 레디스 메시지 채널의 메시지로 변환한다. -
우리는 이벤트를 통해서 외부 세계와 이야기를 하며, 이런 종류의 시간적인 결합을 사용하면 애플리케이션 통합 시 상당한 유연성을 얻을 수 있다.
-
하지만 여러 가지 이벤트 통지에 대해 실행되는 논리적인 흐름이 존재한다면 프로그램 본문에서는 이러한 흐름이 명시적이지 않기 때문에 이러한 흐름을 알아보기 어렵다. 따라서 이벤트 통지를 사용한 시스템의 디버깅이나 변경이 어려워 질 수 있다.
참고 문헌
>> Home