커맨드와 이벤트
-
이벤트와 마찬가지로 커맨드도 이벤트의 일종이다. 시스템의 한 부분에서 다른 부분으로 전달되는 명령이 바로 커맨드이다.
-
보통은 커맨드를 아무 메서드도 들어있지 않은 데이터구조로 표현하고 이벤트와 거의 같은 방식으로 처리한다. 하지만 커맨드와 이벤트의 차이는 중요하다.
-
커맨드는 한 행위자로부터 다른 구체적인 행위자에게 전달된다. 보내는 행위자는 받는 행위자가 커맨드를 받고 구체적인 작업을 수행하길 바란다.
-
API 핸들러에 폼을 전달하는 행위는 커맨드를 전달하는 행위와 같다. 커맨드는 의도를 잡아내며, 어떤 일을 수행하기 바라는 의도를 드러낸다. 그 결과, 커맨드를 보내는 행위자는 커맨드 수신자가 커맨드 처리에 실패했을 때 오류 정보를 돌려 받기를 원한다.
-
이벤트는 행위자가 관심있는 모든 리스너에게 보내는 메시지이다. 이벤트를 발행하더라도 이벤트를 발행해도 발행하는 행위자는 누가 이 이벤트를 받는지에 대해서 모른다. 이벤트를 보내는 쪽은 받는 쪽의 성공이나 실패에 관심이 없다.
이벤트와 커맨드의 비교
-
이벤트의 이름은 과거형이고 오류 처리를 (송신하는 쪽과) 독립적으로 실패한다. 그리고 받는 주체는 모든 리스너에서 이벤트를 처리한다.
-
커맨드는 (송신하는 쪽에 오류를 돌려주면서) 시끄럽게 실패하고 정해진 수신자가 있다.
class Command:
pass
@dataclass
class Allocate(Command):
orderid: str
sku: str
qty: int
@dataclass
class CreateBatch(Command):
ref: str
sku: str
qty: int
eta: Optional[date] = None
@dataclass
class ChangeBatchQuantity(Command):
ref: str
qty: int
예외 방식의 차이점
- 이름과 동사를 변경하는 것은 아무런 문제가 없고 변경하더라도 시스템의 동작이 바뀌지는 않는다. 이제 이벤트와 커맨드를 비슷하게 처리하지만 완전히 똑같이 처리하지는 않을 것이다.
def handle(
message: Message,
uow: unit_of_work.AbstractUnitOfWork,
):
results = []
queue = [message]
while queue:
message = queue.pop(0)
if isinstance(message, events.Event):
handle_event(message, queue, uow)
elif isinstance(message, commands.Command):
cmd_result = handle_command(message, queue, uow)
results.append(cmd_result)
else:
raise Exception(f"{message} was not an Event or Command")
return results
def handle_event(
event: events.Event,
queue: List[Message],
uow: unit_of_work.AbstractUnitOfWork
):
for handler in EVENT_HANDLERS[type(event)]: # 한 이벤트를 여러 핸들러가 처리하도록 위임할 수 있는 디스패처로 이벤트가 처리된다.
try:
logger.debug('handling event %s with handler %s', event, handler)
handler(event, uow=uow)
queue.extend(uow.collect_new_events())
except Exception:
logger.exception('Exception handling event %s',
event) # 오류가 발생하면 오류를 찾아서 로그에 남기지만, 오류가 메시지 처리를 방해하지는 못하게 한다.
continue
def handle_command(
command: commands.Command,
queue: List[Message],
uow: unit_of_work.AbstractUnitOfWork
):
logger.debug('handling command %s', command)
try:
handler = COMMAND_HANDLERS[type(command)] # 커맨드 디스패처는 커맨드 한 개에 핸들러 한 개만 허용한다.
result = handler(command, uow=uow)
queue.extend(uow.collect_new_events())
return result
except Exception:
logger.exception('Exception handling command %s', command)
raise # 오류가 발생하면 빠르게 실패하면서 오류를 위로 전파한다.
EVENT_HANDLERS = {
events.OutOfStock: [handlers.send_out_of_stock_notification],
} # type: Dict[Type[events.Event], List[Callable]]
COMMAND_HANDLERS = {
commands.Allocate: handlers.allocate,
commands.CreateBatch: handlers.add_batch,
commands.ChnageBatchQuantity: handlers.change_batch_quantity,
} # type: Dict[Type[commands.Command], Coallable]
논의: 이벤트, 커맨드, 오류 처리
-
사용자가 시스템이 어떤 일을 하기 원한다면 이 요청을 커맨드로 표현한다. 커맨드는 한 애그리게이트를 변경해야 하고, 전체적으로 성공하거나 전체적으로 실패햐아 한다.
-
시스템이 수행하는 다른 북키핑, 정리, 통지는 이벤트를 통해서 발생한다. 커맨드가 성공하기 위해 이벤트 핸들러가 성공할 필요는 없다.
-
예를 들어서 비싼 명품을 파는 전자상거래 사이트에서 주문 이력이 3번 이상인 고객에는 VIP 표시 및 이메일을 전송하는 요구가 있다고 가정을 하자.
주문 이력이 2개 있는 고객이 3번째 주문을 넣을 때 이 고객을 VIP로 설정한다. 처음 VIP 로 변경된 고객에게는 축하 이메일을 보낸다.
class History:
def __init__(self, customer_id: int):
self.orders = set()
self.customer_id = customer_id
def record_order(self, order_id, str, order_amount: int):
entry = HistoryEntry(order_id, order_amount)
if entry in self.sorders:
return
self.orders.add(entry)
if len(self.orders) == 3:
self.events.append(
CustomerBecameVIP(self.customer_id)
)
def create_order_from_basket(uow, cmd: CreateOrder):
with uow:
order = Order.from_basket(cmd.customer_id, cmd.basket_items)
uow.orders.add(order)
uow.commit()
def update_customer_history(uow, event: OrderCreated):
with uow:
history = uow.order_history.get(event.customer_id)
history.record_order(event.order_id, event.order_amount)
uow.commit()
def congratulate_vip_customer(uow, event: CustomerBecameVIP):
with uow:
customer = uow.customer.get(event.customer_id)
email.send(
customer.email_address,
f'Congratulations {customer.first_name}'
)
-
이 코드를 보면 이벤트 기반 시스템에서 오류를 어떻게 처리하는지에 대한 직관을 얻을 수 있다.
-
현재 구현에서 애그리게이트는 상태를 데이터베이스에 영속화한 다음에 이벤트를 발생시킨다.
-
만약 각 시스템이 독립적이지 않다면, 이메일 서버가 과부하 상태이며, 다른 곳에 버그가 있으면 결제가 처리되지 않는다.
-
이러한 관심사를 분리하면 실패할 수 있는 요소들이 서로 격리되어 실패할 수 있으며 이렇게 하면 시스템 전체의 신뢰성이 높아진다. 이 코드에서 성공해야하는 부분은 주문을 만드는 커맨드 핸들러 뿐이고 비즈니스 관계자들이 중요하게 여기는 부분이다.
동기적으로 오류 복구 하기
-
이벤트가 이벤트를 발생시킨 커맨드와 독립적으로 실패해도 된다는 사실을 이해했다면, 불가피하게 오류가 발생한 경우에 오류를 복구 시킬 수 있다고 확신하려면 어떻게 해야하는가?
-
제일 먼저 알아야하는 것은
언제
오류가 일어났는지 확인하는 것이다. 보통은 오류가 일어난 시점을 찾을 때 로그를 사용한다. -
시스템에서 메시지를 처리할 때 가장 먼저 하는 일은 무슨 일을 할지에 대한 로그를 기록하는 것이다.
-
오류가 발생하면 로그에 저장된 데이터를 사용해 문제를 단위 테스트로 재현하거나 시스템에서 메시지를 다시 실행할 수 있다.
-
이벤트를 다시 처리하기 전에 버그를 수정해야 한다면 수동 재실행이 잘 작동한다. 하지만 시스템은 언제나 백그라운드에서 일시적인 실패가 일정 수준 존재할 것이다. 이런 실패에는 네트워크의 일시적인 문제, 데이터베이스의 교착 상태, 배치로 인해서 잠깐 발생하는 서비스 중단 등이 있다.
-
이런 경우에는 재시도를 통해서 시스템 상태를 우아하게 복구 할 수 있다. 처음에는 성공하지 못했지만, 지수적으로 증가되는 백오프 시간 이후에 연산을 재시도 한다는 뜻이다.
def handle_event(
event: events.Event,
queue: List[Message],
uow: unit_of_work.AbstractUnitOfWork
):
for handler in EVENT_HANDLERS[type(event)]: # 한 이벤트를 여러 핸들러가 처리하도록 위임할 수 있는 디스패처로 이벤트가 처리된다.
try:
for attempt in Retrying(
stop=stop_after_attemp(3),
wait=wait_exponential()
):
with attempt:
logger.debug("handling event %s with hanler %s", event, handler)
handler(event, uow=uow)
queue.extend(uow.collect_new_events())
except Exception:
logger.error(
'Failed to handle event %s times, giving up', retry_failure.last_attempt.attempt_number
)
continue
-
실패할 수도 있는 연산을 재시도 하는 것은 아마도 시스템의 회복 탄력성을 향상시키는 최선의 방식일 것이다.
-
여기서도 작업 단위와 명령 핸들러 패턴이 각 재시도가 일관성 있는 시스템 상태를 보장하고, 시스템이 작업이 반쯤 끝난 상태로 남지 않게 해준다.
커맨드와 이벤트 분리의 트레이드 오프
-
장점은 커맨드와 이벤트를 다른 방식으로 처리하면 어떤 부분이 꼭 성공해야 하는지, 어떤 부분은 나중에 정리해도 되는지를 구분하는데 도움이 된다.
-
CreateBatch
는BatchCreated
라는 이름 보다 분명히 덜 혼란스러운 이름이다. 사용자에게 의도를 명시적으로 보여줄 수 있다. 명시성은 암시성보다 더 낫다. -
단점은 커맨드와 이벤트 사이의 의미적인 차이가 미묘하다. 이로인해서 둘 사이의 차이에 대해 고민하는데 시간을 낭비할 수 있다.
-
실패를 명확히 구분한다. 때로는 프로그램이 깨질 수 있다는 사실을 알고, 실패를 더 작게 만들고 격리시키는 방식으로 처리하기로 결정한다. 이로 인해서 시스템에 대해서 추론하기가 더 어려워질 수 있고 모니터링을 더 잘해야 할 필요가 생긴다.
참고 문헌
>> Home