Architecture Patterns with Python - Chapter 8 (Events and the Message Bus)

이동욱

2021/12/29

First, keep the web controller from getting messy


@app.route("/allocate", methods=['POST'])
def allocate_endpoint():
    line = model.OrderLine(
        request.json['orderid'],
        request.json['sku'],
        request.json['qty'],
    )
    try:
        uow = unit_of_work.SqlAlchemyUnitOfWork()
        batchref = services.allocate(line, uow)
    except (model.OutOfStock, services.InvalidSku) as e:
        send_mail(
            'out of stock',
            'stock_admin@made.com',
            f'{line.orderid} - {line.sku}'
        )
        return jsonify({'message': str(e)}), 400
    return jsonify({'batchref': batchref}), 201

Keep the model from getting messy


def allocate(line: OrderLine, batches: List[Batch]) -> str:
    try:
        batch = next(
            b for b in sorted(batches) if b.can_allocate(line)
        )
        batch.allocate(line)
        return batch.reference
    except StopIteration:
        email.send_mail('stock@made.com', f'Out of stock for {line.sku}')
        raise OutOfStock(f'Out of stock for sku {line.sku}')

Keep the service layer from getting messy


def allocate(orderid: str, sku: str, qty: int, uow: unit_of_work.AbstractUnitOfWork) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f'Invalid sku {line.sku}')
        try:
            batchref = product.allocate(line)
            uow.commit()
            return batchref
        except model.OutOfStock:
            email.send_mail('stock@made.com', f'Out of stock for {line.sku}')
            raise

Message bus


class Event:
    pass


@dataclass
class OutOfStock(Event):
    sku: str

# The aggregate records an OutOfStock event instead of raising an exception.
def test_records_out_of_stock_event_if_cannot_allocate():
    batch = Batch('batch1', 'SMALL-FORK', 10, eta=today)
    product = Product(sku="SMALL-FORK", batches=[batch])
    product.allocate(OrderLine('order1', 'SMALL-FORK', 10))

    allocation = product.allocate(OrderLine('order2', 'SMALL-FORK', 1))

    assert product.events[-1] == events.OutOfStock(sku="SMALL-FORK")
    assert allocation is None


class Product:

    def __init__(self, sku: str, batches: List[Batch], version_number: int = 0):
        self.sku = sku
        self.batches = batches
        self.version_number = version_number
        self.events = []

    def allocate(self, line: OrderLine) -> str:
        try:
            ...
        except StopIteration:
            self.events.append(events.OutOfStock(line.sku))
            return None
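
The idea of recording an event instead of raising can be tried in isolation. The following is a minimal sketch, not the book's model: the `Batch` here is a simplified, hypothetical stand-in that tracks only a remaining quantity (no ETA ordering), and the body of `allocate` is one possible way to write it.

```python
from dataclasses import dataclass
from typing import List, Optional


class Event:
    pass


@dataclass
class OutOfStock(Event):
    sku: str


@dataclass(frozen=True)
class OrderLine:
    orderid: str
    sku: str
    qty: int


class Batch:
    # Simplified assumption: only a reference, a SKU and a remaining quantity.
    def __init__(self, ref: str, sku: str, qty: int):
        self.reference = ref
        self.sku = sku
        self.available_quantity = qty

    def can_allocate(self, line: OrderLine) -> bool:
        return self.sku == line.sku and self.available_quantity >= line.qty

    def allocate(self, line: OrderLine) -> None:
        self.available_quantity -= line.qty


class Product:
    def __init__(self, sku: str, batches: List[Batch]):
        self.sku = sku
        self.batches = batches
        self.events: List[Event] = []

    def allocate(self, line: OrderLine) -> Optional[str]:
        try:
            batch = next(b for b in self.batches if b.can_allocate(line))
        except StopIteration:
            # Record what happened; no email and no exception in the model.
            self.events.append(OutOfStock(line.sku))
            return None
        batch.allocate(line)
        return batch.reference


product = Product("SMALL-FORK", [Batch("batch1", "SMALL-FORK", 10)])
first = product.allocate(OrderLine("order1", "SMALL-FORK", 10))
second = product.allocate(OrderLine("order2", "SMALL-FORK", 1))
```

After the second call the model has done nothing except append an event to `product.events`; deciding what to do with it is left to whoever reads that list.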

# Dispatch an event to every handler registered for its type.
def handle(event: events.Event):
    for handler in HANDLERS[type(event)]:
        handler(event)

def send_out_of_stock_notification(event: events.OutOfStock):
    email.send_mail(
        'stock@made.com',
        f'Out of stock for {event.sku}',
    )

HANDLERS = {
    events.OutOfStock: [send_out_of_stock_notification],
}
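
To watch the dispatch run end to end, here is a self-contained sketch of the same message bus. It assumes an in-memory `SENT` list in place of `email.send_mail`, so it runs without a mail server; `SENT` and the type annotations are illustrative additions, not part of the original code.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple, Type


class Event:
    pass


@dataclass
class OutOfStock(Event):
    sku: str


# Hypothetical stand-in for email.send_mail: stores (recipient, message) pairs.
SENT: List[Tuple[str, str]] = []


def send_out_of_stock_notification(event: OutOfStock) -> None:
    SENT.append(("stock@made.com", f"Out of stock for {event.sku}"))


# Maps each event type to the list of handlers that should run for it.
HANDLERS: Dict[Type[Event], List[Callable]] = {
    OutOfStock: [send_out_of_stock_notification],
}


def handle(event: Event) -> None:
    for handler in HANDLERS[type(event)]:
        handler(event)


handle(OutOfStock(sku="SMALL-FORK"))
```

Because `HANDLERS` is a plain dictionary, adding a second reaction to `OutOfStock` means appending another function to the list, with no change to the code that raises the event.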

Option 1: The service layer takes events from the model and puts them on the message bus

def allocate(
        orderid: str, sku: str, qty: int,
        uow: unit_of_work.AbstractUnitOfWork
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f'Invalid sku {line.sku}')
        try:
            batchref = product.allocate(line)
            uow.commit()
            return batchref
        finally:
            # handle() takes a single event, so dispatch them one at a time
            for event in product.events:
                messagebus.handle(event)
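
One property of this option is that the `finally` block runs whether or not the commit succeeds. The sketch below illustrates that with hypothetical fakes (`FakeProduct`, `failing_commit`, and a `DISPATCHED` list standing in for the message bus); it is a simplified illustration of Python's `try`/`finally` behaviour, not the service function itself.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class OutOfStock:
    sku: str


# Hypothetical stand-in for messagebus.handle: just remembers what it was given.
DISPATCHED: List[OutOfStock] = []


def handle(event: OutOfStock) -> None:
    DISPATCHED.append(event)


class FakeProduct:
    def __init__(self, sku: str):
        self.sku = sku
        self.events: List[OutOfStock] = []

    def allocate(self) -> Optional[str]:
        # Always out of stock in this sketch.
        self.events.append(OutOfStock(self.sku))
        return None


def allocate(product: FakeProduct, commit: Callable[[], None]) -> Optional[str]:
    try:
        batchref = product.allocate()
        commit()
        return batchref
    finally:
        # Runs on success and on failure alike.
        for event in product.events:
            handle(event)


def failing_commit() -> None:
    raise RuntimeError("commit failed")


try:
    allocate(FakeProduct("SMALL-FORK"), failing_commit)
except RuntimeError:
    pass
```

Here the event reaches the handler even though the commit raised, which is worth keeping in mind when handlers have side effects such as sending email.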

Option 2: The service layer raises its own events

def allocate(orderid: str, sku: str, qty: int,
             uow: unit_of_work.AbstractUnitOfWork) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f'Invalid sku {line.sku}')
        batchref = product.allocate(line)
        uow.commit()

        if batchref is None:
            messagebus.handle(events.OutOfStock(line.sku))
        return batchref

Option 3: The UoW publishes events to the message bus

unit_of_work.py

import abc


class AbstractUnitOfWork(abc.ABC):

    def commit(self):
        self._commit()  # delegate to the subclass; calling self.commit() here would recurse forever
        self.publish_events()

    def publish_events(self):
        for product in self.products.seen:
            while product.events:
                event = product.events.pop(0)
                messagebus.handle(event)

    @abc.abstractmethod
    def _commit(self):
        raise NotImplementedError


class SqlAlchemyUnitOfWork(AbstractUnitOfWork):

    def _commit(self):
        self.session.commit()


class AbstractRepository(abc.ABC):

    def __init__(self):
        self.seen = set()

    def add(self, product: model.Product):
        self._add(product)
        self.seen.add(product)

    def get(self, sku) -> model.Product:
        product = self._get(sku)
        if product:
            self.seen.add(product)
        return product
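
The interaction between `seen`, `commit()` and `publish_events()` can be exercised without a database. The following is a minimal sketch under stated assumptions: `FakeRepository` and `FakeUnitOfWork` are hypothetical in-memory stand-ins, and `HANDLED` replaces the real handlers so the result can be inspected.

```python
from typing import Callable, Dict, List, Optional, Set


class OutOfStock:
    def __init__(self, sku: str):
        self.sku = sku


class Product:
    def __init__(self, sku: str):
        self.sku = sku
        self.events: List[object] = []


# Hypothetical sink used instead of real handlers such as sending email.
HANDLED: List[str] = []

HANDLERS: Dict[type, List[Callable]] = {
    OutOfStock: [lambda event: HANDLED.append(event.sku)],
}


def handle(event: object) -> None:
    for handler in HANDLERS[type(event)]:
        handler(event)


class FakeRepository:
    def __init__(self, products: List[Product]):
        self._products = {p.sku: p for p in products}
        self.seen: Set[Product] = set()

    def get(self, sku: str) -> Optional[Product]:
        product = self._products.get(sku)
        if product:
            # Remember every aggregate handed out during this unit of work.
            self.seen.add(product)
        return product


class FakeUnitOfWork:
    def __init__(self, products: List[Product]):
        self.products = FakeRepository(products)
        self.committed = False

    def commit(self) -> None:
        self._commit()
        self.publish_events()

    def _commit(self) -> None:
        self.committed = True

    def publish_events(self) -> None:
        # Drain the event list of each aggregate that was loaded.
        for product in self.products.seen:
            while product.events:
                handle(product.events.pop(0))


uow = FakeUnitOfWork([Product("SMALL-FORK")])
product = uow.products.get("SMALL-FORK")
product.events.append(OutOfStock("SMALL-FORK"))
uow.commit()
```

With this arrangement the service layer only calls `uow.commit()`; it never touches the message bus directly, and each aggregate's event list is empty again once the commit returns.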

Pros and cons of domain events


Pros of domain events

Cons of domain events

Summary



