RabbitMQ IN DEPTH 책을 공부하며 작성한 글.
이 책은 RabbitMQ 3.6.3 을 기준으로 저술되었다.
예제 코드
이 책에 나오는 모든 예제 코드는 매닝 웹사이트 (https://www.manning.com/books/rabbitmq-in-depth) 와 깃허브 저장소 (https://github.com/gmr/RabbitMQ-in-Depth) 에서 무료로 다운로드할 수 있다.
에이콘출판사 도서정보 페이지 http://www.acornpub.co.kr/book/rabbitmq-depth 에서도 예제 코드를 다운로드할 수 있다.
1부 RabbitMQ 와 애플리케이션 아키텍처
1. RabbitMQ 살펴보기
RabbitMQ 의 기능과 장점
- 오픈소스
- 플랫폼과 업체 중립성
- 경량성
- 다양한 클라이언트 라이브러리
- 유연한 성능과 안정성 절충 제어
- 대기 시간이 긴 네트워크 환경을 위한 플로그인
- 서드파티 플러그인
- 보안 계층
1.1.1 RabbitMQ 와 얼랭
RabbitMQ 는 얼랭으로 구현됨
1.1.2 RabbitMQ와 AMQP
2007년 처음 출시했을 때 AMQP 스펙을 구현한 최초의 메시지 브로커 중 하나였다. AMQP 자체가 RabbitMQ 에 영향을 많이 줬다.
AMQP에 대한 자세한 내용은 https://www.amqp.org/ 에서 확인할 수 있다.
메시지 프로토콜과 메시지 브로커는 다양하므로, 애플리케이션에 미치는 영향을 고려해 메시지 프로토콜과 메시지 브로커를 선택해야 한다. RabbitMQ 는 AMQP 를 기반으로 구현됐지만 MQTT, STOMP, XMPP 등 다양한 프로토콜도 제공한다. 멀티 프로토콜 애플리케이션 아키텍처에는 좋은 선택이다.
1.2 RabbitMQ 를 사용하는 곳들
- NASA 의 Nebula
- 아구라 게임즈
- Ocean Observations Initiative
- 래포티브
- 메르카도리브레
- 구글의 AdMob
- Andhaar
1.3 느슨하게 결합된 아키텍처의 장점
로그인 서비스에서 마지막으로 로그인한 시간을 비동기로 작업하므로, 애플리케이션은 즉시 인증된 회원 페이지로 이동한다.
1.3.1 애플리케이션의 의존성 제거
애플리케이션 아키텍처는 더 이상 데이터베이스 쓰기 성능에 영향을 받지 않는다. 핵심 애플리케이션 코드를 수정하지 않고도 데이터를 처리하는 새로운 애플리케이션을 쉽게 추가할 수 있다.
1.3.4 데이터와 이벤트 복제
Federation 플러그인은 WAN 허용 오차 및 네트워크 단절을 고려해서 원격 RabbitMQ 인스턴스에 메시지를 전달한다.
1.3.5 다중 마스터 Federation
노트
메시지 지향 미들웨어를 사용하면 어느 정도의 운영상 복잡성이 발생한다. 메시지 브로커는 구조상 중심점으로 애플리케이션 설계상 새로운 단일 장애 지점 (Single point of failure) 이 된다.
1.3.6 AMQ 모델
HTTPj, SMTP 와 같은 프로토콜과 달리 AMQP 스펙은 네트워크 프로토콜의 정의뿐 아니라 서버 측 서비스와 동작 방식도 정의하는데, AMQ (Advanced Message Queuing) 모델을 살펴보면 확인할 수 있다. AMQ 모델은 메시지 라우팅 동작을 정의하는 메시지 브로커의 세 가지 추상 컴포넌트를 다음과 같이 논리적으로 정의한다.
- 익스체인지: 메시지 브로커에서 큐에 메시지를 전달하는 컴포넌트
- 큐: 메시지를 저장하는 디스크상이나 메모리상의 자료 구조
- 바인딩: 익스체인지에 전달된 메시지가 어떤 큐에 저장돼야 하는지 정의하느 컴포넌트
익스체인지
익스체인지 Exchange 는 RabbitMQ 에서 메시지를 적절한 목적지로 전달하기 위해 필요한 첫 번째 입력 값으로 AMQ 모델이 정의하는 세 컴포넌트 중 하나다. 여러 유형의 익스체인지가 있다. 특히 플러그인을 사용해서 직접 커스텀 익스체인지도 정의할 수 있다.
큐
큐는 수신한 메시지를 저장하는 역할
바인딩
바인딩을 이용해 큐와 익스체인지의 관계를 정의한다.
바인딩 키는 익스체인지가 어떤 큐에 메시지를 전달해야 하는지를 의미한다.
익스체인지에 메시지를 발행할 때 애플리케이션은 라우팅 키 (routing-key) 속성을 사용한다. 라우팅 키는 때로는 큐의 이름이거나 의미적으로 메시지를 설명하는 문자열이다. 익스체인지는 라우팅 키를 바인딩 키에 맞춰서 평가한다. 즉, 바인딩 키는 큐를 익스체인지에 연결하고 라우팅 키를 평가하는 기준이다.
RabbitMQ 에서느 AMQP 스펙을 확장해 특정 익스체인지를 다른 익스체인지에 연결할 수 있는데, 이는 메시지를 라우팅하는 데 상당한 유연성을 제공한다.
2. AMQP와 RabbitMQ 코드 작성하기
2.1 RPC 전송으로서의 AMQP
RabbitMQ 는 거의 모든 부분에서 RPC (Remote Procedure Call) 패턴으로 엄격하게 통신한다. RPC 는 한 컴퓨터에서 다른 컴퓨터의 프로그램이나 프로그램의 메서드를 원격에서 실행할 수 있게 해주는 컴퓨터 간의 통신 유형 중 하나다. 원격 API 와 통신하는 웹 프로그램이 있다면, 이는 일반적인 RPC 패턴을 사용했다고 볼 수 있다.
AMQP 스펙은 서버와 클라이언트 모두 명령을 실행할 수 있다.
2.1.1 대화 시작하기
AMQP 로 통신을 시작할 때, 인사말은 프로토콜 헤더 (Protocol header) 에 해당되는데 클라이언트가 서버로 전송한다. RabbitMQ 는 Connection.start 명령으로 인사말에 응답해 명령/응답 흐름을 시작하고 클라이언트는 Connection.StartOk 응답 프레임으로 RPC 요청에 응답한다.
2.1.2 올바른 채널로 튜닝
RabbitMQ 를 이용해서 클라이언트 애플리케이션을 구현할 때는 복잡하게 너무 많은 채널을 사용하지 않는 것이 좋다.
2.2 AMQP 의 RPC 프레임 구조
Connection.Start (클래스.메소드)
2.2.1 AMQP 프레임 컴포넌트
저수준 AMQP 프레임은 다섯 개의 별개 구성 요소로 구성된다.
- 프레임 유형
- 채널 번호
- 프레임 크기 (바이트)
- 프레임 페이로드 (Payload)
- 끝 바이트 표식 (ASCII 값 206)
2.2.4 메소드 프레임 해부하기
노트
메시지를 발행할 때 mandatory 플래그를 사용하는 경우 애플리케이션은 RabbitMQ 에서 응답한 Basic.Return 명령을 수신해야 한다. RabbitMQ 가 mandatory 플래그에 설정한 요구 사항을 충족하지 못하면 Basic.Return 명령을 동일한 채널의 클라이언트에 전송한다. 자세한건 4장에서 다룬다.
2.3 프로토콜 사용하기
메시지를 큐에 발행하기 전에 몇 가지 설정 단계를 거쳐야 하는데, 최소한 익스체인지와 큐를 설정한 후 둘을 연결해야 한다.
2.3.1 익스체인지 선언하기
AMQ 모델에서 익스체인지는 큐와 같이 '1급 시민' 으로 AMQP 스펙에 해당 클래스가 존재한다.
Exchange.Declare 명령을 전송하면 RabbitMQ 는 익스체인지를 생성한 후 Exchange.DeclarOk 메소드 프레임을 응답으로 전송한다. Declare 명령이 실패하면 RabbitMQ sms Exchange.Declare 가 실패하고 채널이 닫힌 이유를 나타내는 숫자 응답 코드와 텍스트 값을 Channel.Close 명령에 포함시켜 전송하고 Exchange.Declare 명령이 전송된 채널을 닫는다.
2.3.2 큐 선언하기
Queue.Declare 명령이 실패하면 채널이 닫힌다.
큐를 선언할 때 동일한 Queue.Declare 명령을 두 번 이상 전송해도 문제는 발생하지 않는다. RabbitMQ 는 중복된 큐 선언을 감지해 큐에 대기 중인 메시지의 수와 구독 중인 구독자의 수와 같이 큐에 대한 유용한 상태를 반환한다.
정상적으로 에러 처리하기
이미 생성한 큐와 같은 이름이지만, 속성이 다른 큐를 선언하려고 시도하면 RabbitMQ 는 RPC 요청을 발행한 채널을 닫는다. 예를 들어 가상 호스트 (virtual host) 의 설정에 대해 접근 권한이 없는 사용자가 Queue.Declare 명령을 실행하면 403 에러가 반환되고 채널은 닫힌다.
클라이언트 애플리케이션이 에러를 정상적으로 처리하려면 RabbitMQ 로부터 Channel.Close 명령을 전달받아 적절하게 응답해야 한다. 특정 클라이언트 라이브러리는 에러 응답을 애플리케이션이 처리할 수 있는 예외로 변환해 처리하며, 다른 유형의 라이브러리는 사용자가 메소드를 등록할 때 콜백을 추가하도록 하고 Channel.Close 명령을 보낼 때 콜백을 호출하는 식으로 처리하기도 한다.
클라이언트 애플리케이션이 서버에서 전송하는 이벤트를 수신하지 않거나 적절하게 처리하지 않으면 메시지가 손실될 수 있다. 존재하지 않거나 이미 닫힌 채널에 메시지를 발행하는 경우 RabbitMQ 는 연결을 종료한다. 메시지를 소비하는 애플리케이션이 RabbitMQ 가 채널을 닫은 사실을 모르는 경우 RabbitMQ 가 메시지를 더는 전송하지 않지만, 클라이언트는 빈 큐를 구독하고 있다고 간주하는 문제가 발생한다.
2.3.3 큐와 익스체인지 연결하기
Queue.Declare 와 유사하게 큐를 익스체인지에 연결하는 명령인 Queue.Bind 는 한번에 하나의 큐만 지정한다. 성공적으로 처리된 경우 클라이언트 애플리케이션에 Queue.BindOk 메소드 프레임을 전송한다.
위는 모두 동기이며, 비동기 명령들도 있다.
2.3.4 RabbitMQ 에 메시지 발행하기
Basic.Publish 메소드 프레임에는 익스체인지의 이름과 라우팅 키가 들어있는데, 이를 RabbitMQ 는 익스체인지의 이름을 저장한 데이터베이스와 비교한다.
팁
RabbitMQ 에 존재하지 않는 익스체인지에 메시지를 발행하는 경우, 기본적으로 메시지는 자동으로 버려진다. 메시지가 제대로 발행됐는지 확인하려면, 메시지 발행 시에 mandatory 플래그를 true 로 설정하거나 발행자 확인 (publisher confirmations) 을 사용해야 하는데, 이 옵션에 대해서는 4장에서 자세히 알아본다. 이 중 하나를 사용하면 애플리케이션의 메시지 발행 속도가 저하될 수 있으니 주의해야 한다.
큐를 구독하는 소비자가 없어서 메시지를 소비하지 않는다면 메시지는 큐에 계속 저장되고 메시지를 더 추가할수록 큐의 크기도 커진다. RabbitMQ 는 메시지의 Basic.Properties 에 지정된 배달 모드 (delivery mode) 에 따라 메시지를 메모리에 보관하거나 디스크에 기록한다. 배달 모드는 매우 중요하므로 다음 4장에 걸쳐 자세히 알아본다.
2.3.5 RabbitMQ 에서 메시지 소비하기
소비자 애플리케이션은 Basic.Consume 명령을 실행해서 RabbitMQ 의 큐를 구독한다.
Basic.Cunsume 이 발급되면 특정 상황이 발생하기 전까지 활성 상태를 유지한다. 소비자가 메시지 수신을 중지하려면 Basic.Cancel 명령을 발행해야 한다.
소비자는 Basic.CancelOk 응답 프레임을 받기 전에 RabbitMQ 가 미리 할당된 메시지 수만큼 메시지를 받을 수 있다.
메시지를 소비할 때 RabbitMQ 에는 소비자의 수신 방식을 알 수 있는 몇 가지 설정이 있는데, 그 중 하나는 Basic.Consume 명령의 no_ack 인수다. no_ack 를 true 로 설정하면 RabbitMQ 는 소비자가 Basic.Cancel 명령을 보내거나 연결을 끊을 때 까지 계속 메시지를 보낸다. no_ack 플래그를 false 로 설정하면 소비자는 Basic.Ack RPC 요청을 전송해 수신한 각 메시지를 확인해야 한다.
2.4 파이썬으로 메시지 발행자 작성하기
일단 도커로 RabbitMQ 를 실행해 보자.
docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:3.8-management
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_management
그리고
http://localhost:15672
로 접속을 해서 ID PASSWORD 를 guest/guest 로 접속해보자.
exchange 를 선언해보자.
python example/Examples/2.4\ Publisher\ Example.py
# exchange = rabbitpy.Exchange(channel, 'chapter2-example')


메시지 발행방법 요약
- connection.channel 로 채널 생성
- exchange 생성 후 declare
- queue declare
- queue 에 exchange 와 bind
- 메시지에 채널 명시해 생성 후, exchange 에 routing key 와 함께 publish
팁
운영환경에서 발행자 애플리케이션을 작성하는 경우 JSON 또는 XML 과 같은 데이터로 직렬화하면 소비자가 메시지를 쉽게 디코딩할 수 있으므로 문제가 발생하는 경우 더 쉽게 원인을 찾을 수 있다.

Get messages 를 하면

3. 메시지 속성 심층 탐사
메시지를 설명하기 위한 일관된 방법을 찾기 위해 RabbitMQ 에 발행된 모든 메시지와 함께 전달되는 데이터 구조인 AMQP 스펙의 Basic.Properties 를 살펴봤다. Basic.Properties 를 활용하면 메시지가 자동으로 제거되거나, 처리하기 전에 메시지의 출처와 유형을 검증할 수 있는 더욱 지능적인 소비자 애플리케이션의 작성이 가능했다. 3장에서는 메시지의 각 속성과 다양한 용도로 사용할 수 있는 Basic.Properties 에 대해 자세히 살펴본다.
3.1 메시지 속성 적절히 사용하기
발행한 메시지는 세 가지 유형의 프레임인 Basic.Publish 메소드 프레임, 콘텐츠 헤더 프레임, 바디 프레임으로 구성된다.
콘텐츠 헤더 프레임에 있는 메시지 속성은 Basic.Properties 데이터 구조로 사전에 정의한 값이 있는 집합이다. delivery-mode 와 같은 일부 속성은 AMQP 스펙에서 정의한 의미를 갖지만, type 과 같이 정확한 스펙이 없는 속성들도 있다.
delivery-mode 속성은 메시지가 큐에 있을 때 메시지를 메모리에 보관할지, 디스크에 먼저 저장해야 할지 RabbitMQ 에 알리는 데 사용된다.
팁
RabbitMQ 에서 MQTT 와 같은 프로토콜을 사용하는 경우, AMQP 에 특정된 속성은 사용할 수 없게 되므로 특정 속성이 손실되지 않도록 유의해야 한다.
각 기본 속성을 살펴보자.
- content-type: 메시지 본문 해석하는 방법
- content-encoding: 메시지 본문이 어떤 방법으로 압축되거나 인코딩됐는지
- message-id, correlation-id: 메시지와 메시지 응답을 고유하게 식별해 메시지 추적
- timestamp: 메시지 생성 시점에 대한 표준시간 전달
- expriation: 메시지 만료 전달
- delivery-mode: 큐에 메시지 추가할 때, 디스크 또는 메모리에 저장할지를 전달
- app-id, user-id: 문제가 발생한 애플리케이션을 추적
- type: 발행자와 소비자 사이에 계약 (contract) 정의
- reply-to: 패턴을 값으로 전달해 응답 메시지를 라우팅할 때 사용
- headers: RabbitMQ 에 메시지를 라우팅할 때, 사용자 정의 형식의 속성을 정의하는 데 사용
3.2 content-type 으로 명시적 메시지 계약 작성하기
HTTP 의 content-type 과 같다. application/json 같은 것
파이썬에서 라이브러리는 content-type 헤더에서 메시지가 어떤 유형으로 직렬화됐는지 감지하고, 이를 사용해 메시지 본문을 자동으로 디코딩해 dict, list 또는 다른 원시 데이터 유형으로 변환할 수 있다. 이를 통해 소비자 애플리케이션의 코드 복잡성을 현저하게 줄일 수 있다.
3.3 gzip, content-encoding 으로 메시지 크기 줄이기
AMQP 를 이용해서 전달한 메시지는 기본적으로 압축되지 않는다. 이는 XML 과 같이 지나치게 자세한 마크업 문법이나 큰 메시지에는 JSON, YAML 과 같이 마크업을 사용하지 않는 경우에도 문제가 될 수 있다. 서버에서 웹 페이지를 gzip 으로 압축하고 브라우저가 렌더링하기 전에 압축을 푸는 것과 마찬가지로 발행자는 메시지를 발행하기 전에 압축하고 소비자로부터 메시지를 전달받아 압축을 풀 수 있다.
content-encoding 속성으로 메시지 본문이 base64 혹은 gzip 과 같은 특수한 형식으로 인코딩됐는지 알 수 있다.
운영 환경에서는 발행자와 소비자의 메시지 계약을 운영 중에 변경하지 않는 것이 바람직하며 기존 코드에 대한 잠재적 영향을 최소화하는 것이 좋다. 그러나 메시지 크기가 애플리케이션의 전체 성능이나 안정성에 영향을 미쳐서 본문 인코딩의 변경이 불가피한 경우, content-encoding 헤더를 사용하면 소비자가 메시지의 형식을 사전에 확인할 수 있으므로 메시지 본문을 적절하게 디코딩할 수 있다.
노트
AMQP 클라이언트는 자동으로 content-encoding 값을 UTF-8 로 설정하지만, 이는 잘못된 동작이다. AMQP 스펙에는 content-encoding 이 MIME 콘텐츠 인코딩을 저장하기 위한 것이라고 명시돼 있다.
MIME 이메일 마크업은 병렬 처리가 가능하도록 content-encoding 필드를 사용해 이메일의 각 파트에 대한 인코딩을 표현한다. 이메일에서 가장 일반적인 인코딩 유형은 Base64 와 QP (Quoted-Printable) 인코딩이다. Base64 인코딩은 메시지에서 전송할 바이너리 데이터가 텍스트 전용인 SMTP 프로토콜의 범위를 넘지 않도록 사용된다. 예를 들어 이미지가 포함된 HTML 의 이메일 본문을 만드는 경우 포함된 이미지는 Base64 로 인코딩된다.
하지만 SMTP 와 달리 AMQP 는 바이너리 프로토콜이다.
클라이언트 라이브러리 활용하기
클라이언트 라이브러리를 사용해 소비자 코드를 작성하는 경우, content-encoding 속성을 사용해 수신 시 메시지를 자동으로 디코딩할 수 있다. 보통 라이브러리가 메시지의 전처리, 디코딩, 압축 해제를 처리하므로 직접 작성해야 하는 소비자 애플리케이션의 로직과 코드는 단순하다. 따라서 개발자는 메시지 본문을 처리하는 작업에 집중할 수 있다.
애플리케이션의 운영 중에 메시지 본문을 bzip2 로 압축하는 것이 더 적합하다고 판단돼 변경하더라도 content-encoding 속성을 검사하도록 소비자 애플리케이션을 구현하면 디코딩할 수 없는 메시지를 거부할 수 있다. zlib 압축만 해제할 수 있는 소비자 애플리케이션은 bzip2 로 압축된 메시지를 거부해 bzip2 압축을 풀 수 있는 다른 소비자 애플리케이션이 처리할 수 있도록 큐에 메시지를 남겨둔다.
3.4 message-id 와 correlation-id 를 이용한 메시지 참조
AMQP 스펙에서 message-id 와 correlation-id 는 '애플리케이션 용도' 로 지정됐으며, 공식적으로 정의된 동작은 없다. 두 필드는 최대 255 바이트의 UTF-8 로 인코딩된 값을 가지며 Basic.Properties 데이터 구조에 포함된 압축되지 않은 값으로 저장된다.
- message-id: 메시지를 고유하게 식별
- correlation-id: 메시지가 다른 메시지에 대한 응답임을 나타내는 데 사용, 이 경우 이전 메시지의 meesage-id 를 값으로 포함
3.4.1 message-id
판매 주문이나 지원 요청과 같은 경우 메시지를 쉽게 파악하는 데 도움이 된다.
3.4.2 correlation-id
다른 메시지에 대한 응답임을 표시. 또 다른 사용 예는 트랜잭션 ID 나 메시지가 참조하는 다른 데이터를 전달하는 데 이 속성을 사용하는 것이다.
3.5 timestamp
timestamp 속성을 사용해 메시지 생성 시점을 기록하면 메시지를 발행할 때 성능을 추적할 수 있다.
프로세스가 시행해야하는 서비스 수준 계약 (SLA, Service Level Agreement) 이 있다면, 소비자 애플리케이션에서 메시지의 timestamp 속성을 평가해 메시지를 처리할 지 여부를 결정하거나 메시지의 수명이 지정한 값을 초과한 경우 모니터링 애플리케이션에 경고 메시지를 발행해서 누군게에게 알릴 수 있다.
timestamp 는 유닉스 시간 (Unix epoch) 또는 1970년 1월 1일 자정 이래로 경과된 초를 나타내는 정수로 전송된다. 불행히도 timestamp 는 시간대 (time zone) 정보가 없으므로 UTC 혹은 다른 일관된 시간대를 약속해 사용하는 것이 좋다.
3.6 자동으로 메시지 만료하기
expiration 속성은 RabbitMQ 에서 소비하지 않은 메시지를 버려야 할 때를 파악하는데 사용한다. expiration 속성은 AMQP 스펙 0-8 과 0-9-1 버전에 모두 존재하지만, RabbitMQ 3.0 버전 이전에는 지원되지 않았다. '구현할 때 사용할 수 있지만 공식적인 동작은 없음' 으로 정의됐다. timestamp 와 동일하게 유닉스 시간을 값으로 갖지만, 타입은 255자의 짧은 문자열이라는 것이다.
RabbitMQ 에서 expiration 속성을 사용해 메시지를 자동으로 만료 처리하려면 유닉스 시간 또는 정수 기반 timestamp 를 값으로 가져야 하지만, 타입은 문자열로 저장돼야 한다. "2002-02-20T00:00:00-00" 과 같이 ISO-8601 형식의 타임스탬프를 저장하는 대신 문자열 값인 "1329696000" 과 동일한 형식으로 값을 설정해야 한다.
expiration 속성을 사용하는 메시지가 서버에 도착한 후 시간이 만료된 경우 메시지는 큐로 삽입되지 앉고 삭제된다.
RabbitMQ 에는 특정 상황에서만 메시지가 만료되는 다른 기능이 있다. 큐를 선언할 때 큐의 정의와 함께 x-message-ttl 속성을 인자로 전달해서 메시지를 만료할 수 있는데, 유닉스 시간이지만 밀리세컨드 정밀도 (유닉스 시간 X 1000) 의 정수로 값을 설정한다. 큐의 x-message-ttl 속성은 지정된 시간이 경과되면 메시지를 자동으로 삭제한다.
3.7 배달 모드를 이용해 안정성과 속도 조절하기
메시지를 디스크에 저장하면 RabbitMQ 서버를 정지하고 다시 시작하더라도 메시지가 소비될 때까지 큐에 남아있게 된다. delivery-mode 속성은 메시지를 저장하지 않을 경우 1, 메시지를 저장하는 경우 2, 이렇게 두 가지 값으로 지정된다.
노트
RabbitMQ 의 다양한 용어와 설정에 대해 처음 접한다면, 메시지 지속성 (persistence) 이 큐의 내구성 (durable) 속성과 혼동될 수 있다. 큐의 durable 속성은 RabbitMQ 서버나 클러스터를 다시 시작한 후에도 큐 정의가 유지돼야 하는지를 나타내는 반면, delevery-mode 는 메시지를 유지할지 여부를 나타낸다. 하나의 큐에는 디스크에 저장되는 지속성 메시지와 메모리에만 보관되는 비지속성 메시지가 동시에 포함될 수 있다.
웹 애플리케이션의 로그인 이벤트의 경우 delivery-mode 속성을 선택하기가 다른 케이스보다 쉽다. 로그인 이벤트가 없어진다고 해서 비즈니스가 위험에 빠지지는 않으므로 이벤트를 메모리에만 보관하는 것도 합리적인 선택이 된다. 이 경우 delivery-mode 를 1로 설정한다. 그러나 RabbitMQ 를 사용해 금융 거래 데이터를 발행하고 애플리케이션 아키텍처가 메시지 처리량보다는 정확한 전달에 초첨을 맞춘다면 delivery-mode 를 2로 지정해 지속성을 활성화한다.
배달모드는 성능에 중요한 영향을 미친다.
3.8 app-id 및 user-id 를 사용해 메시지의 출처 확인하기
app-id 와 user-id 는 소비자 애플리케이션에서 메시지를 처리하기 전에 유효성을 검증하는 용도로 활용된다.
app-id 속성은 발행자 애플리케이션에서 자유롭게 사용할 수 있는 문자열 값이다.
RabbitMQ 는 메시지를 발행하는 RabbitMQ 사용자에 대해 user-id 속성을 이용해 유효성을 검사한다.
**메시지 출처를 확인하는 데 사용한다.
3.8.1 app-id 속성
app-id 속성은 최대 255자의 짧은 UTF-8 문자열이다. 애플리케이션이 API 중심으로 디자인돼 버전 관리가 필요한 경우 app-id 를 사용해 생성된 메시지와 함께 특정 API 와 버전을 달리할 수 있다. 발행자와 소비자 간에 계약을 맺는 방법 중 하나로 사용한다면, 메시지를 처리하기 전에 app-id 를 검사해 알 수 없거나 지원하지 않는 출처의 메시지의 경우 애플리케이션에서 메시지를 삭제할 수 있다.
app-id 의 다른 사용법은 통계 데이터로 수집하는 것이다. 예를 들어 메시지를 사용해 로그인 이벤트를 전달하는 경우, app-id 속성을 로그인 이벤트를 발생시키는 애플리케이션의 플랫폼과 버전으로 설정한다. 웹 기반이나 데스크톱 그리고 모바일 클라이언트 애플리케이션을 사용하는 환경에서는 메시지 본문을 검사하지 않고도 플랫폼별로 로그인을 추적하기 위해 계약을 맺어서 데이터를 추출할 수 있다. 통계 수집 전용 소비자를 구현하고 로그인 이벤트를 처리하는 소비자와 동일한 메시지를 구독한다면 이 기능이 특히 유용하다. app-id 속성을 제공하면 통계 수집 전용 소비자가 메시지 본문을 디코딩할 필요가 없다.
사용법
- 버전관리
- 출처확인
- 통계
팁
큐에 대기하는 문제가 발생한 메시지의 출처를 추적할 때, app-id 를 이용하면 메시지의 출처를 쉽게 추적할 수 있으며, 이는 다수 애플리케이션이 동일한 RabbitMQ 인프라를 공유하는 대규모 환경에서 특히 유용하다. 기존 발행 애플리케이션과 동일한 익스체인지와 라우팅 키를 새로운 발행자 애플리케이션이 잘못 사용하는 경우가 종종 발생한다.
3.8.2 user-id 속성
사용자 인증의 경우에는 로그인한 사용자를 식별하기 위해 user-id 속성을 사용하는 것이 유용해 보이지만, 대부분의 경우 권장되지 않는다. RabbitMQ 는 메시지를 발행하는 사용자에 대해 user-id 속성의 값으로 발행된 모든 메시지를 검사하고 두 값이 일치하지 않으면 메시지가 거부된다. 예를 들어 애플리케이션이 RabbitMQ 를 사용해 사용자 'www' 로 인증하고 메시지의 user-id 속성을 'linus' 로 설정할 경우 메시지가 거부된다.
물론 작성하는 애플리케이션이 채팅이나 인스턴트 메시징 서비스라면 한 채팅방의 모든 사용자가 같은 user-id 를 사용해야 하며, 실제로 로그인한 실제 사용자를 식별하기 위해 user-id 를 사용할 수 는 없다.
3.9 type 속성을 이용해 메시지 특정하기
AMQP 0-9-1 버전에서 Basic.Properties 의 type 속성은 '메시지 유형 이름' 으로 정의돼 있는데, 애플리케이션 전용으로 공식적인 동작은 정해지지 않았다는 의미다. 익스체인지와 결합된 라우팅 키 값은 메시지 내용을 결정하는 데 필요한 만큼 메시지에 대한 많은 정보를 전달하는 데 반해, type 속성은 애플리케이션이 메시지 처리 방법을 결정 하는 데 또 다른 수단으로 사용된다.
**자유 형식 문자열 값으로 메시지 유형을 정의하는 데 주로 사용된다.
자체 설명 직렬화 형식이 충분히 빠르지 않은 경우
type 속성은 자체 설명 (self-describing) 메시지를 만들 때, 특히 메시지 본문이 자체 설명 데이터 형식으로 직렬화되지 않은 경우 매우 유용하다. JSON, XML 같은 형식은 너무 장황한 편이다. 이럴 경우 Apache Thrift 혹은 Protobug 와 같은 직렬화 형식을 선택하기도 한다. 이들은 MessagePack 과는 달리 이진 코드화 메시지 형식에 자체 설명이 포함되지 않았으므로 직렬화 및 역직렬화를 위한 외부 정의 파일이 필요한데, 메시지에 자체 설명이 빠져있으므로 유선에서 더 적은 페이로드가 사용된다.
발행자와 소비자 간에 실행 가능한 계약을 자체 설명하는 메시지와 달리 자체 설명하지 않는 메시지의 본문은 메시지를 소비자가 처리할 수 있는지 결정하기 전에 메시지 본문을 역직렬화해야 한다. 이 경우 type 속성을 사용해 레코드 유형이나 외부 정의 파일을 지정함으로써 소비자가 메시지를 처리하는데 사용하는 적절한 .thrift 나 .proto 파일에 접근할 수 없는 경우 처리할 수 없는 메시지로 판단해 거부할 수 있다.
이제 이벤트를 데이터 웨어하우스에 저장하자. 단일 소비자가 모든 메시지를 처리하기 위해 일반 큐를 사용해 ETL (추출 extract_변환 transform-로드 load) 단계를 수행한다. ETL 을 위한 큐의 소비자는 여러 유형의 메시지를 처리하고 type 속성을 사용해 추출된 데이터를 저장할 시스템이나 테이블 또는 클러스터를 결정한다.
노트
ETL 은 최종적인 보고 목적으로 OLTP 데이터를 추출해서 데이터 웨어하우스에 로드하는 표준 방식이다.
3.10 동적인 작업 흐름을 위한 reply-to 속성 사용하기
AMQP 스펙에서 reply-to 속성은 공식적으로 정의된 동작은 없고 '애플리케이션 용도' 로만 지정돼 있다. 앞서 언급한 속성들과는 달리, 메시지에 대한 응답을 위한 개인 응답 큐를 지정하는 데 사용될 수 있다는 점을 주목할 만 하다. AMQP 스펙에 개인 응답 큐에 대한 명확한 정의가 명시돼 있지는 않지만, reply-to 속성은 특정 큐 이름이나 메시지가 원래 발행된 동일한 익스체인지의 응답 키를 전달하는 데 사용할 수 있다.
경고
AMQP 0-9-1 버전에는 reply-to 속성에 대해 '요청 메시지에 사용될 때 개인 응답 큐의 읆을 보유할 수 있다.'는 경고가 있다. 이 정의는 모호하므로 사용할 때 주의해야 한다. reply-to 속성의 값 때문에 응답 메시지가 라우팅될 수 없는 경우, RabbitMQ 가 메시지 발행을 거부할 수 있다.
**reply-to 속성은 RPC 스타일의 메시지의 응답에 소비자가 사용해야 하는 라우팅 키를 전달하는 데 사용할 수 있다.
3.11 headers 를 사용해 사용자 속성 지정하기
사용자 정의 키와 값을 갖는 테이블이다. 키는 최대 255자의 길이를 갖는 ASCII 또는 유니코드 문자열을 설정할 수 있다. 값은 유효한 AMQP 값 유형을 설정할 수 있다. 다른 속성들과 달리 headers 속성을 사용하면 원하는 모든 데이터를 headers 테이블에 추가할 수 있다. headers 속성에는 특별한 기능이 있는데, RabbitMQ 는 라우팅 키를 사용하는 대신 헤더 테이블에 채워진 값을 기반으로 메시지를 라우팅할 수 있다는 점이다. headers 속성을 통한 메시지 라우팅은 6장에서 알아본다.
예시
키 (ASCII 또는 유니코드 문자열) | 값 (모든 AMQP 데이터 유형) |
foo | bar |
corge | 1 |
grault | True |
graply | 1921-08-19 |
3.12 priority 속성
RabbitMQ 3.5.0 부터 AMQP 스펙에 맞춰서 priority 필드가 구현됐다. priority 속성의 값은 큐에 포함된 메시지의 우선순위 지정에 사용하며, 0~9까지의 값을 갖는 정수로 정의된다. priority 가 9인 메시지가 발행되고 나서 priority 가 0 인 메시지가 발행되면 새로 연결된 소비자 애플리케이션은 priority 가 9인 메시지보다 0인 메시지를 먼저 받게 된다. (낮은 것 부터 받음) 흥미롭게도 RabbitMQ 는 priority 속성을 부호 없는 바이트 (unsigned byte) 로 구현해 0에서 255사이의 값을 지정할 수 있지만, AMQP 스펙과 상호운용성을 유지하려면 priority 를 0에서 9로 제한해야 한다.
3.13 사용할 수 없는 속성: cluster-id/reserved
cluster-id 는 AMQP 0-8 에서 정의됐으며 AMQP 0-9-1 에서 제거됐다.
AMQP 0-9-1 에서 예약됨 (reserved) 으로 변경됐으므로 사용하지 말아야 한다.
이 속성은 가급적 사용하지 않는 것이 좋다.
3.14 요약
속성 | 유형 | 사용처 | 명시된 내용 |
app-id | 짧은 문자열 | 애플리케이션 | 메시지르 발행하는 애플리케이션을 정의 |
content-encoding | 짧은 문자열 | 애플리케이션 | 메시지 본문이 zlib, deflate 또는 Base64 와 같은 특별한 방법으로 인코딩 되는지 지정 |
content-type | 짧은 문자열 | 애플리케이션 | mime-types 를 사용해 메시지 본문의 유형 지정 |
correlation-id | 짧은 문자열 | 애플리케이션 | 메시지가 참조하는 내용 |
delivery-mode | octet | RabbitMQ | 1은 RabbitMQ 가 메시지를 메모리에 보관 2는 디스크에 기록 |
expiration | 짧은 문자열 | RabbitMQ | 메시지가 만료되는 시기를 나타내는 데 사용하는 텍스트 문자열의 유닉스 시간 값 |
headers | 테이블 | 양쪽 모두 | 메시지에 대한 추가적인 메타데이터를 첨부하는 데 사용하는 자유 형식 키/값 테이블. 원하는 경우 RabbitMQ 가 이 값을 기반으로 라우팅 가능 |
message-id | 짧은 문자열 | 애플리케이션 | 메시지를 식별하는 데 사용할 수 있는 UUID 와 같은 고유 식별자 |
priority | octet | RabbitMQ | 큐에서 메시지의 우선순위를 지정하는 속성 |
timestamp | timestamp | 애플리케이션 | 메시지 작성 시점을 나타내는 데 사용하는 유닉스 시간 값 |
type | 짧은 문자열 | 애플리케이션 | 에플리케이션이 메시지 유형 또는 페이로드를 설명하는 데 사용할 수 있는 텍스트 문자열 |
user-id | 짧은 문자열 | 양쪽 모두 | 연결된 사용자에 대해 유효성을 검증하고 일치하지 않는 메시지를 삭제하는 자유 형식 문자열 |
4. 메시지 발행에서 성능 절충
여러 서버에 걸쳐 있는 HA 큐와 같은 다양한 메시지 배달 보장 수준을 선택할 수 있다. 4장에서는 이러한 기능을 사용하는 데 관련된 성능과 배달 보장의 성능 절충에 대해 알아보고 RabbitMQ 가 자동으로 발행자의 메시지를 조절하는 방법을 알아본다.
4.1 발행 속도와 배달 보장의 균형 잡기
RabbitMQ 서버를 재부팅해도 메시지가 유지되도록 하는 등의 일부 기능은 특정 애플리케이션에서는 너무 느리고 적합하지 않을 수 있다. 반면에 추가적인 배달 보장 없이 메시지를 발행한다면, 속도는 훨씬 빠르지만 미션 크리티컬 애플리케이션에서는 안전한 환경을 제공하지 못한다.
토끼 <-> 거북이
- 배달 보장 없음
- 실패 통보
- 발행자 확인
- 대체 익스체인지
- HA 큐
- 트랜잭션
- 트랜잭션 HA 큐
- 메시지 디스크에 저장
자체적인 성능 벤치마크를 수행해 성능과 배달 보장 간에 적절한 균형을 결정하는 것이 좋다.
올바른 솔루션을 위한 고성능과 메시지 배달 보장 사이에 적절한 균형을 찾는 과정에서는 다음 질문을 염두에 두길 바란다.
- 발행 시에 미시지에 큐를 넣는 것이 얼마나 중요한가?
- 메시지를 라우팅할 수 없는 경우, 발행자에게 메시지를 보내야 하는가?
- 메시지를 라우팅할 수 없는 경우, 차후에 조정하는 다른 곳으로 메시지를 보내야 하는가?
- RabbitMQ 서버에 장애가 발생할 때, 메시지가 손실돼도 괜찮은가?
- RabitMQ 가 새 메시지를 처리할 때, 요청한 모든 메시지를 라우팅한 후 디스크에 저장하는 작업이 정상적으로 수행했는지 발행자가 확인해야 하는가?
- 발행자가 메시지를 한꺼번에 전달하면 RabbitMQ 는 메시지를 라우팅하고 디스크에 저장한 후 작업이 정상적으로 실행됐는지를 발행자에게 다시 알려야 하는가?
- 다수 메시지를 라우팅한 후 디스크에 정상적으로 저장됐는지 확인하는 작업을 일괄 처리하는 경우, 메시지를 저장할 큐에 원자 커밋 (atomic commit) 이 필요한가?
- 발행자가 적절한 성능과 메시지 처리량을 달성하는데, 메시지 배달 보장 기능 간에 절충점이 있는가?
- 메시지 발행의 다른 측면이 메시지 처리량 및 성능에 영향을 미치는가?
HA 큐와 필수 (mandatory) 라우팅을 결합해서 선택하거나 delivery-mode 를 2로 설정하고 트랜잭션 발행을 선택해 메시지를 디스크에 보관할 수 있다. 애플리케이션 개발 시에 균형 잡힌 조합을 발견할 때 까지, 각각 다른 옵션을 조합해 시도하는 것을 추천한다.
4.1.1 배달 보장을 사용하지 않는 환경
이상적인 환경이라면 RabbitMQ 는 추가 구성이나 설정 없이 메시지를 안정적으로 전달한다. 올바른 Basic.Publish 를 통해 익스체인지, 라우팅 정보와 함께 메시지를 발행하면 RabbitMQ 가 메시지를 수신하고 적절한 큐에 전달한다. 네트워크 문제가 없고 서버 하드웨어가 안정적이며 장애도 발생하지 않는다면, 운영체제가 RabbitMQ 메시지 브로커의 운영 상태에 영향을 미치는 문제는 발생하지 않는다.
불행하게도 현실 세계에는 완벽한 환경에서는 결코 일어나지 않을 일들이 정기적으로 발생한다.
미션 크리티컬한 애플리케이션이 아닌 경우, 일반적인 메시지 발행 중 발생 가능한 모든 장애를 처리할 필요는 없으며, 적절한 처리만 해도 안정적이고 예측 가능한 가동 시간을 확보할 수 있다.
미션 크리티컬한 애플리케이션이 아닌 경우에는 RabbitMQ 의 기본 설정으로도 적절한 수준의 안정적인 메시징 환경을 구축할 수 있다. 배달 보장을 사용하지 않아도 적절한 환경에 대해 살펴보자.
이 메시지는 서버의 CPU 로드, 메모리, 네트워크 사용과 같은 정보를 전달한다.
RabbbitMQ 와의 연결이 끊어지면 다음 번에 통계 데이터를 보내야 할 때 다시 연결을 시도한다. 마찬가지로 소비자 애플리케이션은 연결이 끊어지면 다시 연결하고 이전에 사용하던 동일한 큐에서 다시 메시지를 소비한다.
4.1.2 mandatory 플래그를 설정한 메시지를 라우팅할 수 없을 때
서버 모니터링 데이터가 항상 RabbitMQ 로 배달되도록 보장하려면, collectd 에서 RabbitMQ 에 발행하는 메시지의 mandatory 를 설정한다. mandatory 플래그는 Basic.Publish RPC 명령과 함께 전달되는 인수인데, 메시지를 라우팅할 수 없으면 Basic.Return RPC 를 통해 RabbitMQ 가 메시지를 발행자에게 다시 보내도록 지시한다. mandatory 플래그는 오류 감지 모드를 켜는 것으로 간주할 수 있는데, 메시지 라우팅 실패를 알리는 데 사용한다. 메시지 라우팅이 올바르게 처리되면 발행자에게 별도의 메시지를 전송하지 않는다. mandatory 플래그를 설정한 메시지를 발행하기 위해 다음과 같이 익스체인지, 라우팅 키, 메시지, 속성을 전달한 후 인수를 전달한다.
노트
위 예제 코드에서는 Connection 과 Channel 객체를 호출하는 새로운 방법을 사용했는데, 두 객체 모두 컨텍스츠 관리자로 생성된다. (with xxx as connection: 문법)
Rabbit MQ 의 Basic.Return 은 비동기로 동작하며 메시지가 발행된 후 언제든지 발생할 수 있다. 예를 들어 RabbitMQ 에 통계 데이터를 발행하는 데 실패할 경우, collectd 가 Basic.Return 호출을 받기 전에 다른 데이터를 계속 발행할 수 있다.
mandatory 예외처리
import datetime
import rabbitpy
import time
# Connect to the default URL of amqp://guest:guest@localhost:15672/%2F
url = 'amqp://guest:guest@localhost:5672/%2F'
connection = rabbitpy.Connection(url)
try:
with connection.channel() as channel:
properties = {'content_type': 'text/plain',
'timestamp': datetime.datetime.now(),
'message_type': 'graphite metric'}
body = 'server.cpu.utilization 25.5 1350884514'
message = rabbitpy.Message(channel, body, properties)
message.publish('chapter2-example',
'server-metrics',
mandatory=True)
except rabbitpy.exceptions.MessageReturnedException as error:
print('Publish failure: %s' % error)
다른 라이브러리의 경우, 메시지를 발행할 때 RabbitMQ 에서 Basic.Return RPC 를 전달받으면 실행할 콜백 메소드를 등록해야 할 수 있다. 비동기적으로 Basic.Return 메시지를 처리할 때 다른 메시지를 소비하는 것 처럼 Basic.Return 메소드 프레임, 콘텐츠 헤더 프레임, 바디 프레임을 받게 되는데, 복잡해 보여도 크게 걱정할 필요는 없다. 프로세스를 단순화하고 메시지 라우팅 오류를 처리하는 다른 방법이 있는데, 그 중 하나는 RabbitMQ 에서 발행자 확인을 사용하는 것이다.
노트
rabbitpy 라이브러리는 Basic.Publish 명령을 보낼 때 최대 세 개의 인수만 받는데, 이는 추가 인수로 immediate 플래그가 포함된 AMQP 스펙과 대조적이다. immediate 플래그는 메시지 브로커가 메시지를 즉시 라우팅할 수 없는 경우 Basic.Return 을 반환하도록 지시한다. immediate 플래그는 RabitMQ 2.9 이후로 더 이상 사용되지 않으며, 사용할 경우 예외가 발생되고 채널이 닫힌다.
4.1.3 트랜잭션보다 가벼운 발행자 확인
발행자 확인 (Publisher Confirms) 은 AMQP 스펙의 확장 기능으로 RabitMQ 관련 확장을 지원하는 클라이언트 라이브러리에서만 지원된다. 디스크에 메시지를 저장하는 것으로도 메시지 손실을 막을 수 있지만, 이것으로는 발행자와 RabbitMQ 사이에 메시지가 전달됐음을 확신할 수 없다. 메시지를 발행하기 전에 메시지 발행자는 RabbitMQ 에 Confirm.Select PRC 요청을 전송하고 메시지가 전달됐는지 확인하기 위해 Confirm.SelectOk 응답을 기다린다. 이 시점부터 발행자가 RabbitMQ 에 보내는 각 메시지에 대해 서버는 수신 확인 (Baisc.Ack) 또는 부정 수신 확인 (Basic.Nack) 으로 응답하며, 메시지의 오프셋을 지정하는 정수 값을 포함하거나 확인한다. 확인 번호는 Confirm.Slect RPC 요청 다음에 수신된 순서에 따라 메시지를 참조한다.
python 4.1.3\ Publisher\ Confirms.py

노트
발행자 확인의 사용 여부와 상관없이 존재하지 않는 익스체인지에 메시지를 발행할 경우, 채널은 RabbitMQ 에 의해 종료된다. 이 경우 rabbitpy 에서는 rabbitpy.exceptions.RemoteClosedChannelException 예외가 발생한다.
4.1.4 라우팅할 수 없는 메시지를 위한 대체 익스체인지 사용하기
대체 익스체인지 (alternate exchange) 는 라우팅할 수 없는 메시지를 처리하기 위해 RabbitMQ 팀이 AMQP 를 확장한 또 다른 예다. 대체 익스체인지는 처음 익스체인지를 선언할 때 명시되며, RabbitMQ 에서 익스체인지가 라우팅할 수 없으면, 새로운 익스체인지가 메시지를 라우팅할 기존의 익스체인지를 대신해 지정된다.
**라우팅할 수 없는 메시지가 대체 익스체인지를 정의한 익스체인지에 발행되면, 메시지느 대체 익스체인지에 전달된다.
노트
대체 익스체인지가 설정된 익스체인지로 메시지를 보낼 때 mandatory 플래그를 설정하면 의도한 익스체인지가 메시지를 정상적으로 라우팅할 수 없는 경우 Basic.Return 이 발행자에게 직접 전송되지 않는다. 라우팅할 수 없는 메시지를 대체 익스체인지에 보내는 동작은 mandatory 플래그를 True 로 설정한 메시지에도 동일하게 적용된다. RabbitMQ 의 메시지 라우팅 패턴이 다른 익스체인지와 마찬가지로 대체 익스체인지에도 동일하게 적용된다는 점을 주의해야 한다. 큐가 원래 라우팅 키가 명시된 메시지를 수신하도록 바인딩되지 않은 경우 메시지는 큐에 추가되지 않고 손실된다.
대체 익스체인지를 사용하려면, 먼저 라우팅 할 수 없는 메시지를 전송할 익스체인지를 설정해야 한다. 기본 익스체인지를 서정한 후, 메시지를 발행할 때 Exchange.Declare 명령에 alternate-exchange 인수를 추가한다. 다음 예제 코드에는 이전 예제 코드에 라우팅할 수 없는 메시지를 저장하는 메시지 큐가 더 추가됐다.

대체 익스체인지를 팬아웃 (fanout) 유형으로 생성했지만, graphite 익스체인지는 토픽 (topic) 유형으로 생성했다. 팬아웃 익스체인지는 자신이 알고 있는 모든 큐에 메시지를 전달하고 토픽 익스체인지는 라우팅 키를 기반으로 선택적으로 메시지를 라우팅할 수 있다. 익스체인지의 유형에 대해서는 5장에서 자세히 알아본다. 두 익스체인지를 생성하고 unroutable-message 라는 이름의 큐를 대체 익스체인지에 연결한다.

이후 graphite 익스체인지에 메시지가 발행되고 라우팅될 수 없는 메시지는 unroutable-message 큐에 저장된다.
4.1.5 트랜잭션으로 배치 처리하기
RabbitMQ 확장 스펙인 발행자 확인을 구현하기 전에 메시지 전달을 보장하기 위한 유일한 방법은 트랜잭션이었다. AMQP 트랜잭션 혹은 TX 클래스는 메시지를 일괄로 RabbitMQ 에 발행한 후 큐에 커밋하거나 롤백할 수 있는 매커니즘을 제공한다. 다음 예제는 '4.1.5 Transactional Publishing' 노트북에 포함돼 있다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
tx = rabbitpy.Tx(channel)
tx.select()
message = rabbitpy.Message(channel,
'This is an important message',
{'content_type': 'text/plain',
'delivery_mode': 2,
'message_type': 'important'})
message.publish('chapter4-example', 'important.message')
try:
if tx.commit():
print('Transaction committed')
except rabbitpy.exceptions.NoActiveTransactionError:
print('Tried to commit without active transaction')
존재하지 않는 익스체인지와 같은 오류로 인해 RabbitMQ 가 메시지를 라우팅할 수 없으면 TX.CommitOk 응답을 보내기 전에 Basic.Return 응답이 반환된다. 발행자가 트랜잭션을 중단하려는 경우 TX.Rollback RPC 요청을 보내고 계속 진행하기 전에 미싲 브로커의 TX.RollbackOk 응답을 기다려야 한다.
RabbitMQ 와 원자 트랜잭션
불행하게도 RabbitMQ 는 발행한 모든 명령이 단일 큐에 영향을 줄 때만 원자 트랜잭션을 지원한다. 트랜잭션의 명령이 둘 이상의 큐에 영향을 주면 커밋은 원자적으로 동작하지 않는다.
메시지 발행 확인을 목적으로 트랜잭션의 사용을 고려한다면 좀 더 단순한 발행자 확인을 사용하는 것을 추천하는데, 발행자 확인이 더 빠르며 성공과 실패를 확인할 수 있다.
그러나 대부분의 경우 발행자 확인 뿐 아니라 메시지가 큐에 있는 동안 손실되지 않는 것이 중요한데, 이는 HA 큐로 보장할 수 있다.
4.1.6 HA 큐를 사용해 노드 장애 대응하기
미션 크리티컬 메시징 아키텍처에서는 가용성이 높은 HA 가 중요한 역할을 한다. HA 큐도 AMQP 스펙이 아닌 RabbitMQ 팀이 만든 확장 기능이며, 큐를 여러 서버에 중복해 복사본을 저장하는 기능을 제공한다.
HA 큐는 클러스터로 구성된 RabbitMQ 환경이 필요하며 AMQP API 또는 웹 기반 관리자 UI 로 설정할 수 있다. 이어서는 AMQP API 를 이용해 설정하는 방법을 알아보고, 웹 기반 관리자 UI 를 사용해 HA 큐에 대한 정책을 설정하는 방법은 8장에서 자세히 알아보자.
import rabbitpy
connection = rabbitpy.Connection()
try:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel,
'my-ha-queue',
arguments={'x-ha-policy': 'all'})
if queue.declare():
print('Queue declared')
except rabbitpy.exceptions.RemoteClosedChannelException as error:
print('Queue declare failed: %s' % error)
메시지가 HA 큐로 설정된 큐에 발행되면 HA 큐를 담당하는 클러스터의 각 서버로 메시지가 전송된다. 클러스터의 노드가 메시지를 소비하면 다른 노드의 모든 메시지 복사본이 즉시 제거된다.
개별 노드를 지정하려면 x-ha-policy: all 대신 nodes 를 인수로 전달하고 다음 인수인 x-ha-nodes 에 큐의 노드 목록을 지정한다.
4.1.6 Selective HA Queue Declaration
import rabbitpy
connection = rabbitpy.Connection()
try:
with connection.channel() as channel:
arguments = {'x-ha-policy': 'nodes',
'x-ha-nodes': ['rabbit@node1',
'rabbit@node2',
'rabbit@node3']}
queue = rabbitpy.Queue(channel,
'my-2nd-ha-queue',
arguments=arguments)
if queue.declare():
print('Queue declared')
except rabbitpy.exceptions.RemoteClosedChannelException as error:
print('Queue declare failed: %s' % error)
노트
node1, node2, node3 가 정의돼 있지 않더라도 큐를 정의할 수 있으며 메시지를 발행하는 경우 큐에 전달된다. 나열된 노드 중 하나 이상이 존재하는 경우 메시지는 해당 서버에 저장된다.
다운된 노드가 다시 추가되거나 새 노드가 클러스터에 추가되더라도 기존 노드의 큐에 이미 존재하는 메시지는 포함되지 않는다. 대신 이전에 발행한 모든 메시지가 소비되면 모든 새 메시지가 수신되고 동기화된다.
4.1.7 HA 큐 트랜잭션
HA 큐에서 트랜잭션 또는 발행자 확인을 사용하는 경우, 메시지가 HA 큐의 모든 활성 노드에 있는 것으로 확인될 때 까지 RabbitMQ 는 성공 응답을 보내지 않는다. 이로 인해 발행자 애플리케이션에 대한 응답이 지연될 수 있다.
4.1.8 delivery-mode 2를 사용해 메시지를 디스크에 저장하기
이어서 또 다른 배달 보장에 대해 알아보자. RabbitMQ 서버가 메시지를 소비하기 전에 특정 이유로 노드가 다운될 경우, RabbitMQ 에 메시지를 발행할 때 디스크에 저장하도록 설정하지 않는다면 메시지는 영원히 손실된다.
노트
delivery-mode 2 외에 RabbitMQ 서버를 다시 시작한 후에도 메시지가 남아있게 하려면 큐를 만들 때 durable 로 선언돼야 한다.
**OLTP (On-line Transaction Processing, 온라인 트랜잭션 처리)
일반적으로 대부분의 웹 애플리케이션에서 쓰기의 비율은 낮다.
대부분의 운영체제에서 커널은 디스크에서 읽은 페이지를 버퍼링하기 위해 RAM 의 여유분을 사용하지만, 디스크 쓰기를 캐시하는 유일한 컴포넌트는 디스크 컨트롤러와 디스크다. 이 때문에 메시지를 디스크에 저장할 때 하드웨어의 스펙을 올바르게 설정하는 것이 중요하다. 소형 서버에서는 쓰기 작업량이 과한 경우 RabbitMQ 의 동작이 매우 느려질 수 있다.
4.2 RabbitMQ 푸시백
AMQP 스펙에는 RabbitMQ 서버 구현에 유리하지 않은 발행자의 가정이 있다. RabbitMQ 버전 2.0 이전에는 발행자 애플리케이션이 너무 빨리 메시지를 발행해 RabbitMQ 를 압도하기 시작한 경우 발행자에게 Channel.Flow RPC 메소드를 보내 차단하고 다른 Channel.Flow 명령을 받을 때 까지 더 이상 메시지를 보내지 않도록 지시한다.
이는 Channel.Flow 명령을 처리하지 않거나 잘못 처리하는 발행자 애플리케이션의 경우, 메시지 발행을 늦추는데 상당히 비효율적인 방법으로 알려졌다. RabbitMQ 3.2 버전 이전에 RabbitMQ 팀은 Channel.Flow 의 사용을 중단했으며, 이를 TCP 배압 (Back-Pressure) 매커니즘으로 대체해 문제를 해결했다. 발행자에게 정중하게 요청하지 않고 RabbitMQ 는 TCP 소켓에서 하위 수준의 데이터 수신을 중지한다. 이는 단일 발행자에게 RabbitMQ 가 압도당하지 않도록 보호한다.
내부적으로 RabbitMQ 는 크레딧 (credit) 이라는 개념을 사용해 발행자에 대해 언제 푸시백을 할 것인지를 관리한다. 새로운 연결이 생성되면 이 연결에 미리 사용할 수 있는 크레딧의 양이 할당되고 RabbitMQ 가 각 RPC 명령을 수신하면 크레딧이 감소한다.
연결에 남은 크레딧이 없으면 크레딧이 생길 때까지 무시한다.
RabbitMQ 3.2 부터 RabbitMQ 팀은 AMQP 스펙을 확장해 연결에 대한 크레딧이 임곗값에 도달했을 때 전송되는 알림을 추가하고 클라이언트에 연결이 차단됐다는 사실을 알린다. Connection.Blocked 와 Connection.Unblocked 는 RabbitMQ 가 발행자 클라이언트를 차단하거나 해당 블록이 제거됐을 때 언제든지 클라이언트에 알릴 수 있는 비동기 메소드다. 대부분의 주요 클라이언트는 이 기능을 구현했다. 사용중인 클라이언트 라이브러리에도 애플리케이션의 연결 상태를 결정하는 방법이 구현됐는지 확인해야 한다. 이어지는 절에는 rabbitpy 로 이 검사를 수행하는 방법과 3.2 이전의 RabbitMQ 버전에서 관리 API 를 활용해 연결의 채널이 차단됐는지 확인하는 방법을 설명한다.
4.2.1 rabbitpy 로 연결 상태 확인하기
Connection.Blocked 알림을 지원하는 RabbitMQ 버전을 사용하거나 그렇지 않더라도 rabbitpy 라이브러리는 이 기능을 사용하기 쉬운 하나의 API 로 제공한다. Connection.Blocked 알림을 지원하는 RabbitMQ 버전에 연결되면 rabbitpy 는 알림을 수신하고 연결이 차단됐다는 내부 플래그를 설정한다.
import rabbitpy
connection = rabbitpy.Connection()
print('Channel is Blocked? %s' % connection.blocked)

4.2.2 연결 상태 확인을 위한 관리자 API 사용하기
RabbitMQ 3.2 이전 버전을 사용하는 경우 애플리케이션은 웹 기반 관리 API 를 사용해 연결 상태를 지속적으로 폴링 (polling) 해 확인할 수 있다.
채널의 상태를 질의하면 name, node, connection_details, consumer_count, client_flow_blocked 와 같은 여러 필드를 확인할 수 있다. client_flow_blocked 필드는 RabbitMQ 가 TCP 배압을 연결에 적용했는지를 나타낸다.
채널 상태를 확인하기 전에 먼저 해당 이름의 채널을 생성해야 한다. 채널의 이름은 연결 이름과 해당 채널 ID 를 기반으로 하는데, 연결 이름을 구성하려면 다음이 필요하다.
- 로컬호스트 IP 주소 및 출력 TCP 포트
- 원격호스트 IP 주소 및 TCP 포트
연결의 형식은 "LOCAL_ADDR : PORT -> REMOTE_ADDDR : PORT" 고, 이를 확장한 채널 이름의 형식은 "LOCAL_ADDR : PORT -> REMOTE_ADDDR : PORT (CHANNEL_ID)" 다.
RabbitMQ 의 관리자 API 에 채널 상태를 질의하는 API 는 http://host:port/api/channels/[CHANNEL_NAME] 이다.
API 결과는 JSON 직렬화된 객체로 반환된다.
4.3 요약
- 발행자가 메시지를 디스크에 저장해야 하는가?
- 애플리케이션의 다양한 구성 요소는 발행된 메시지가 수신됐는지 보장해야 하는가?
- TCP 배압으로 애플리케이션이 차단되거나 RabbitMQ 에 메시지를 발행하는 동안 연결이 차단된 경우 어떻게 되는가?
- 메시지가 얼마나 중요한가? 메시지 처리량을 높이기 위해 배달 보장을 희생할 수 있는가?
이러한 질문은 올바른 애플리케이션 아키텍처를 만드는 데 도움이 된다.
5. 메시지를 받지 않고 소비하기
이 장에서 다루는 내용
- 메시지 소비하기
- 소비자 메시지 처리량 조정
- 소비자와 큐가 배타적인 경우
- 사용자를 위한 서비스 품질 지정
애플리케이션이 메시지를 소비하는 방법을 결정하는 것은 이 균형을 찾는 첫 번째 단계로 '메시지를 단순히 가져올지' 아니면 '메시지를 소비해야 하는지' 라는 질문으로 시작한다.
- 단순히 메시지를 전달받지 않고 소비하는 것이 좋은 이유
- 메시지 배달 보장과 성능 간의 균형을 유지하는 방법
- RabbitMQ 의 큐별 설정을 사용해 큐를 자동을 삭제하고, 메시지의 수명을 제한하는 등의 작업
5.1 Basic.Get vs. Basic.Consume
RabbitMQ 는 큐에서 메시지를 가져오는 두 AMQP RPC 명령인 Basic.Get 과 Basic.Consume 을 구현했다. 이 장의 제목에서 알 수 있듯이 Basic.Get 은 서버에서 메시지를 검색하는 이상적인 방법은 아니다. 간단히 말해 Basic.Get 은 폴링 모델이지만 Baisc.Consume 은 푸시 기반 모델이다.
5.1.1 Basic.Get
소비자 애플리케이션이 Basic.Get 요청을 사용해 메시지를 검색한다면 큐에 메시지가 여러 개 있어도 메시지를 받을 때마다 새 요청을 보내야 한다. Basic.Get 을 발행할 때 메시지를 검색하는 큐에 대기 중인 메시지가 있으면 RabitMQ 는 Basic.GetOk RPC 로 응답한다. 없으면 Basic.GetEmpty 로 응답한다.
5.1.2 Basic.Consume
이를 흔히 발행자-구독자 패턴 또는 Pub-Sub 이라고 부른다.
소비자 태그
Basic.Consume 을 실행하면 RabbitMQ 로 열린 채널에서 애플리케이션을 식별하는 고유한 문자열이 생성된다. 소비자 태그 (consumer tag) 라고 불리는 이 문자열은 RabbitMQ 의 각 메시지와 함께 애플리케이션에 전송된다.
소비자 태그는 이후에 Basic.Cancel RPC 명령으로 RabbitMQ 에 메시지 수신을 취소할 때 사용할 수 있다.
애플리케이션이 다른 큐에서 받은 메시지에 대해 다른 작업을 수행해야 하는 경우, Basic.Consume 요청에 전달된 소비자 태그를 사용해 메시지 처리 방법을 구분할 수 있다. 그러나 대부분의 경우 소비자 태그는 클라이언트 라이브러리가 자동으로 처리하므로 크게 걱정할 필요는 없다.
consumer
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'test-messages')
for message in queue.consume_messages(no_ack=True):
message.pprint()
publisher
import rabbitpy
for iteration in range(10):
rabbitpy.publish('amqp://guest:guest@localhost:5672/%2f', '', 'test-messages', 'go')
rabbitpy.publish('amqp://guest:guest@localhost:5672/%2f', '', 'test-messages', 'stop')

반복문이 종료될 때 rabbitpy 라이브러리의 내부에는 몇 가지 동작이 실행된다. 먼저 라이브러리는 RabbitMQ 에 Basic.Cancel 명령을 전송한다. 이후 Basic.CancelOk RPC 응답이 수신되면 RabbitMQ 가 클라이언트에 처리되지 않은 메시지를 보낸 경우 rabbitpy 는 Basic.Nack 명령을 전송하고 RabbitMQ 메시지를 다시 큐에 삽입하도록 지시한다.
5.2 소비자 성능 조정
5.2.1 빠른 성능을 위해 무응답 모드로 메시지 소비하기
메시지를 소비할 때 애플리케이션은 자신을 RabbitMQ 에 등록하고 메시지를 사용할 수 있을 때 전달되도록 요청한다. 애플리케이션이 Basic.Consume RPC 요청을 보낼 때, 인자에는 no-ack 플래그가 있다. 이 플래그가 활성화되면 RabbitMQ 는 소비자가 메시지 수신확인을 하지 않으므로 RabbitMQ 가 가능한 한 빨리 메시지를 보낼 수 있다.
RabbitMQ 는 열려 있는 연결을 통해 메시지를 보낼 때 TCP 소켓 연결을 통해 클라이언트와 통신한다. 이 연결이 열려 있고 쓰기 가능한 경우 RabbitMQ 는 모든 것이 정상적으로 동작하며 메시지가 전달됐다고 가정한다.
리눅스에서 수신 소켓 버퍼 늘리기
리눅스 운영체제에서 수신 소켓 버퍼 수를 늘리려면 net.core.rmem_default 와 net.core.rmem_max 값을 기본 128KB 값에서 더 큰 값으로 늘려야 한다. 대부분의 환경에는 16MB (16777216) 값이 적합하다. 대부분의 배포판에서는 /etc/sysctl.conf 에서 이 값들을 변경하는데, 다음 명령을 실행해 수동으로 값을 설정할 수도 있다.
echo 16777216 > /proc/sys/net/core/rmem_default
echo 16777216 > /proc/sys/net/core/rmem_max
일회용 메시지의 경우, 가능한 한 가장 빠른 메시지 처리 속도를 만드는 가장 이상적인 방법이지만 중요한 위험 요소가 없는 것은 아니다. 소비자 애플리케이션이 운영체제의 수신 소켓 버퍼에 100KB 메시지를 버퍼링한 상태로 장애가 발생할 경우 어떻게 될지 생각해보면 알 수 있다. RabbitMQ 는 이미 메시지들을 보낸 것으로 간주하며 애플리케이션이 충돌하고 소켓이 닫힐 때 운영체제에서 읽어야 하는 메시지의 수를 표시하지 않는다. 애플리케이션이 직면하는 문제는 운영체제의 소켓 수신 버퍼 크기와 함께 메시지 크기 및 수량에 따라 달라진다.
메시지를 이런 방식으로 소비하는 것이 애플리케이션 이카텍처에 맞지 않지만, 단일 메시지 전달 후 메시지 수신이 제공하는 것보다 빠른 메시지 처리를 원한다면 소비자 채널 서비스 품질 설정의 프리페치 (prefetching) 를 제어하는 것이 좋다.
5.2.2 서비스 품질 설정을 통한 소비자 프리페치 제어
AMQP 스펙에서는 소비자가 메시지 수신을 확인하기 전에 미리 지정된 수의 메시지를 수신하도록 처리할 수 있는 서비스 품질 (QoS, Quality of Service) 설정을 채널에 요청 할 수 있다.
수신 확인을 비활성화 (no_ack=True) 한 소비자와 달리, 소비자 애프리케이션은 메시지를 확인하기 전에 충돌하는 경우 소켓을 닫으면 미리 가져온 모든 메시지가 큐로 반환된다.
프로토콜 수준에서 채널에 Basic.QoS RPC 요청을 보내면 서비스 품질이 지정된다.
이때 요청을 전송하는 채널에 대해서만 QoS 를 설정할지 혹은 연결된 모든 채널에 대해 QoS 설정할지 지정할 수 있다. Basic.QoS 요청은 언제든지 보낼 수 있지만, 보통 소비자가 Basic.Consume RPC 요청을 발행하기 전에 수행된다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
channel.prefetch_count(10)
for message in rabbitpy.Queue(channel, 'test-messages'):
message.pprint()
message.ack()

노트
AMQP 스펙에는 Basic.QoS 메소드의 프리페치 수와 프리페치 크기를 모두 설정하지만, no-ack 옵션을 설정하면 프리페치 크기가 무시된다.
프리페치 값을 최적의 수준으로 교정
프리페치 수를 지나치게 할당하면 메시지 처리량에 부정적인 영향을 미칠 수 있음을 인식하는 것이 중요하다.
RabbitMQ 에서 동일한 큐에 연결된 다수 소비자는 라운드 로빈 (Round-Robin) 방식으로 메시지를 수신하는데, 고속의 소비자 애플리케이션에서는 프리페치 수가 성능에 영향을 미치는지 벤치마크하는 것이 중요하다. 특정 설정은 메시지 구성, 소비자 애플리케이션의 행동 혹은 운영체제 및 언어와 같은 다른 요인에 따라 영향도가 달라질 수 있다.
2500의 프리페치 카운트 값이 최고 메시지 속도에 가장 적합한 설정임을 확인할 수 있다.
한 번에 여러 메시지 확인하기
QoS 설정 중 유용한 또 다른 점은 Basic.Ack RPC 응답과 함께 받은 각 메시지를 개별적으로 하나씩 확인하지 않아도 되는 점이다.
대신 Basic.Ack RPC 응답의 multiple 속성을 True 로 설정해 반환하면 RabbitMQ 는 수신 확인하지 않은 모든 메시지를 수신 확인으로 처리한다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
channel.prefetch_count(10)
unacknowledged = 0
for message in rabbitpy.Queue(channel, 'test-messages'):
message.pprint()
unacknowledged += 1
if unacknowledged == 10:
message.ack(all_previous=True)
unacknowledged = 0
동시에 여러 메시지를 수신 확인하면 메시지 처리에 필요한 네트워크 통신을 최소화해 메시지 처리량을 향상시킬 수 있다. 이 방식의 수신확인은 인정 수준의 위험이 따른다는 점을 주의해야 한다.
QoS 뿐만 아니라 트랜잭션도 애플리케이션에 대한 메시지 전달 보장을 향상시키는 방법이다.
초당 메시지 처리 수를 예상해 prefetch count 를 설정하자.
5.2.3 소비자 애플리케이션에서 트랜잭션 사용하기
메시지를 RabbitMQ 에 발행할 때와 마찬가지로 트랜잭션을 사용해 소비자 애플리케이션에서 일괄 작업을 커밋하고 롤백할 수 있다. 트랜잭션 (AMQP TX 클래스) 은 한 가지 예외적인 상황을 제외하고는 메시지 처리량에 부정적인 영향을 미칠 수 있다.
노트
메시지 수신 확인이 비활성화된 소비자 애플리케이션은 트랜잭션이 작동하지 않는다.
5.3 메시지 거부하기
메시지를 처리하는 중에 문제가 발생하면 어떻게 해야 할까? 이러한 경우 RabbitMQ 는 메시지를 메시지 브로커에 다시 전달하는 Basic.Reject, Basic.Nack 두 가지 매커니즘을 제공한다. 이번 절에서는 두 기능의 차이점과 거부된 메시지를 일괄 처리해서 시스템 문제를 파악하는데 용이한 RabbitMQ 전용 확장, 데드 레터 익스체인지에 대해 알아본다.
Basic.Nack 은 한 번에 여러 메시지를 거부할 수 있지만, Basic.Reject 는 한 번에 하나의 메시지만 거부할 수 있다.
5.3.1 Basic.Reject
전달받은 메시지를 처리할 수 없음을 메시지 브로커에 알리는 AMQP 의 RPC 응답이다.
Basic.Ack 와 마찬가지로 RabbitMQ 가 생성한 배달 태그 (delivery tag) 와 함께 소비자가 RabbitMQ 와 통신하는 채널의 메시지를 고유하게 식별한다. 소비자가 메시지를 거부하면 RabbitMQ 가 메시지를 삭제하거나 큐에 있는 메시지를 다시 삽입되도록 지시할 수 있다. 재삽입 (requeue) 플래그가 활성화되면 RabbitMQ 는 차후에 다시 처리되도록 큐에 메시지를 넣는다.
requeue 플래그는 데이터베이스나 원격 API 와 같은 다른 서비스와 통신하는 소비자 애플리케이션을 작성하는 데 종종 사용된다. Basic.Ack 는 소비자 애플리케이션에서 RabbitMQ 와 통신하고 있는 채널의 메시지를 고유하게 식별하기 위해 RabbitMQ 에서 만든 배달 태그를 전달한다. 소비자가 메시지를 거부하면 RabbitMQ 에서 메시지를 삭제하거나 큐에 메시지를 다시 삽입할 수 있다.
재삽입 플래그는 데이터베이스와의 연결이 끊어지거나 원격 API 에 연결하는 것이 실패하는 경우와 같은 원격지의 예외로 인해 실패한 경우, 재시도를 위해 소비자 애플리케이션에 로직을 구현하는 대신, 예외를 잡아서 재삽입 플래그를 활성화한 메시지를 거부해서 처리한다.
다음은 '두 번 실패 시 제거' 정책으로 구현돼 있다. 문제의 원인이 메시지 혹은 다른 이유인지가 불확실한 경우, redelivered 플래그를 검사해서 메시지를 다시 재삽입해야 하는지, 폐기해야 하는지를 결정할 수 있다.
import rabbitpy
for message in rabbitpy.consume('amqp://guest:guest@localhost:5672/%2f',
'test-messages'):
message.pprint()
print('Redelivered: %s' % message.redelivered)
message.reject(True)

Basic.Ack 와 마찬가지로 Basic.Reject 를 사용하면 no-ack 가 활성화되지 않은 상태로 전달된 후 메시지가 제거된다.
5.3.2 Basic.Nack
아쉽게도 AMQP 스펙에는 다중 메시지 거부 기능이 제공되지 않는다. RabbitMQ 팀은 AMQP 스펙의 단점을 보완하기 위해 Basic.Nack 이라는 새로운 RPC 응답 메소드를 구현했다.
Basic.Nack 은 '부정적인 수신 확인' 의 줄임말로, Basic.Reject 응답 메소드와 뜻이 유사해 혼동될 수 있다.
경고
다른 RabbitMQ 확장 스펙과 마찬가지로, Basic.Nack 은 QPID, ActiveMQ 와 같은 다른 AMQP 메시지 브로커에 존재하지 않을 수도 있다.
5.3.3 데드 레터 익스체인지
데드 레터 익스체인지 (DLX, Dead-Letter Exchange) 는 RabbitMQ 확장 스펙이며, 전달된 메시지를 거부할 수 있는 추가적인 기능이다. 이는 특정 메시지를 소비하는 데 발생한 문제의 원인을 찾는 데 유용하다.
예를 들어 한 소비자 애플리케이션에서 XML 기반 메시지를 가져와서 PDF 파일로 변환한다고 하자. 이 작업이 이전에는 잘 동작했지만, 현재는 계속 실패한다고 가정하자.
데드 레터 익스체인지가 없다면, 소비자 애플리케이션에 XML 문서를 나중에 접근할 수 있는 특정 디렉토리에 저장하는 코드를 작성해야 한다. 코드를 직접 작성하는 대신, 데드 레터 익스체인지를 이용해 또 다른 큐를 지정한 후 터미널에서 소비자 코드를 직접 실행해 메시지 발행자 애플리케이션이 문서를 생성할 때, 유니코드 문자가 처리되는 방식과 관련된 문제 등의 원인을 확인할 수 있다.
큐에 없는 메시지가 거부되면 RabbitMQ 는 메시지를 큐의 x-dead-letter-exchange 인수에 지정된 익스체인지로 라우팅한다.
노트
데드 레터 익스체인지는 4장에서 알아본 대체 일스체인지와는 다르다. 만료되거나 거절된 메시지가 데드 레터 익스체인지를 통해 전달되는 반면, 대체 인스체인지는 RabbitMQ 가 라우팅 할 수 없는 메시지가 전달된다.
큐를 선언할 때 데드 레터 익스체인지를 지정하는 것은 간단하다. Queue.Declare RPC 요청을 발행할 때 rabbitpy Queue 객체를 생성하고 익스체인지의 이름과 dead_letter_exchange 인수 혹은 x-dead-letter-exchange 인수를 함께 전달하면 된다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
rabbitpy.Exchange(channel, 'rejected-messages').declare()
queue = rabbitpy.Queue(channel, 'dlx-example',
dead_letter_exchange='rejected-messages')
queue.declare()

익스체인지를 지정하느 것 외에, 라우팅 키를 미리 지정된 값으로 대체할 수도 있다. 이렇게 하면 데드 레터 (dead-lettered) 메시지에 대해 동일한 익스체인지를 사용하지만 데드 레터 메시지를 서로 다른 큐에 전달할 수 있다. 미리 지정된 라우팅 키를 설정하려면 큐를 선언할 때 x-dead-letter-routing-key 라는 추가 인수를 지정해야 한다.
노트
AMQP 표준에 따라 RabbitMQ 의 모든 큐는 선언 후에 설정을 변경할 수 없다. 특정 큐의 데드 레터 익스체인지 설정을 변경하려면, 큐를 삭제하고 재선언해야 한다.
데드 레터 익스체인지가 애플리케이션 아키텍처에서 활용될 수 있는 시나리오는 많다. 잘못된 형식의 메시지를 안전한 저장소에 저장하는 것부터 거부된 신용카드 승인 처리와 같은 직접적인 기능까지 데드 레터 익스체인지는 매우 강력하지만, 큐를 선언할 때 간단한 옵션으로 지정할 수 있으므로 종종 유용성이 간과되는 경향이 있다.
5.4 큐 제어하기
일부 애플리케이션은 여러 소비자가 동일한 큐를 구독해야 할 수도 있고 다른 경우 단일 소비자만 있어야 할 수 있다. 채팅 애플리케이션은 큐의 메시지를 임시로 저장하고 사용자당 하나의 큐를 만들어야 하지만, 신용카드 처리 애플리케이션에서는 항상 존재하는 영구적인 큐를 만들어야 할 수 있다.
큐를 정의할 때, 큐의 동작을 결정하는 설정은 다음과 같이 다양하다.
- 자동 삭제 큐
- 큐 독점 설정
- 자동 메시지 만료
- 대기 메시지 수 제한
- 오래된 메시지 큐에서 제거
AMQP 스펙에 따라 큐 설정이 불변 (immutable) 이라는 점을 유의하자. 큐를 선언한 후에는 큐를 만들 때 지정한 설정을 변경할 수 없고, 큐의 설정을 변경하려면 큐를 삭제하고 다시 만들어야 한다.
큐를 생성할 때 사용할 수 있는 다양한 설정을 알아보기 위해 우선 임시 큐를 생성하는 방법을 알아보자.
5.4.1 임시 큐
자동삭제 큐
소비자와 연결을 맺고 메시지를 전달한 후 연결을 끊으면 큐는 제거된다. 자동삭제 큐를 생성하는 것은 쉬운 작업으로 Queue.Declare RPC 요청에서 auto_delete 플래그를 True 로 설정하면 된다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'ad-example', auto_delete=True)
queue.declare()

자동 삭제 큐를 다수 소비자가 구독할 수 있다는 점에 유의해야 한다. 큐는 구독자가 더 이상 없을 때만 자신을 삭제한다.
한 사용 사례로서 채팅 애플리케이션에서 각 큐를 사용자의 입력 버퍼로 사용하는 경우가 있다. 채팅 애플리케이션에서 사용자의 연결이 끊어지면 큐에 읽지 않은 메시지가 있더라도 자동으로 삭제하는 데 이용하기도 한다.
또 다른 예를 살펴보면, RPC 스타일 애플리케이션에서 소비자에게 RPC 요청을 보내고 응답을 RabbitMQ 가 전달할 경우에 애플리케이션이 종료되거나 연결이 끊어질 때 자신을 삭제하는 큐를 만들어서 RabbitMQ 가 애플리케이션을 종료할 때 자동으로 큐를 정리하도록 할 수 있다. 이 경우 RPC 응답 큐는 원래 RPC 요청을 발행한 애플리케이션에서만 사용할 수 있어야 한다.
큐 독점 설정
RabbitMQ 에서 큐의 독점 (exclusive) 설정을 사용하지 않는다면 다수 소비자가 큐를 구독할 수 있는데, 큐를 구독해서 메시지를 소비하는 소비자의 수에 대한 제한은 없다. 실제로 큐는 메시지를 수신하는 모든 구독자에게 라운드 로빈으로 메시지를 전달한다.
큐에 독점 기능을 활성화하면 소비자가 연결 해제된 후 큐가 자동으로 제거된다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'exclusive-example',
exclusive=True)
queue.declare()
독점 큐는 큐를 생성한 채널이 닫히면 자동으로 삭제되는데, 이는 자동 삭제를 설정한 큐를 소비자가 더 이상 구독하지 않으면 삭제되는 것과 유사하다. 자동 삭제 큐와는 달리 채널이 닫힐 때까지 독점 큐를 구독하는 사용자가 원하는 만큼 여러 번 소비하고 취소할 수 있다. 또한 자동 삭제 큐와 달리 Basic.Consume 요청이 발행됐는지에 관계없이 독점 큐를 자동 삭제가 발생한다는 점도 중요하다.
자동 만료 큐
자동 만료 큐는 시간에 민감한 작업에 대해 RPC 응답을 무기한으로 대기하지 않을 경우 유용하다. 만료 값이 설정된 RPC 응답 큐를 만든 후에 해당 큐가 만료되면 큐가 자동으로 삭제된다. 큐를 선언하고 큐의 존재를 폴링해서 대기 중인 메시지가 있거나 큐가 더 이상 존재하지 않는지 확인할 수 있다. x-expires 인수로 큐를 선언하면 된다.
import rabbitpy
import time
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'expiring-queue',
arguments={'x-expires': 1000})
queue.declare()
messages, consumers = queue.declare(passive=True)
time.sleep(2)
try:
messages, consumers = queue.declare(passive=True)
except rabbitpy.exceptions.AMQPNotFound:
print('The queue no longer exists')

자동 만료 큐와 관련해 다음과 같은 몇 가지 규칙이 있다.
- 큐는 소비자가 없으면 만료된다. 연결된 소비자가 있을 경우 큐는 Basic.Cancel 을 실행하거나 연결을 해제한 후에만 자동으로 제거된다.
- 큐는 TTL 지속 시간 동안 Basic.Get 요청이 없으면 만료된다. 하나의 Basic.Get 요청이 만료 값이 있는 큐로 전송되면 만료 설정이 초기화되고 큐는 자동으로 삭제되지 않는다.
- 다른 일반적인 큐와 마찬가지로 x-expires 설정은 다시 설정되거나 변경될 수 없다.
- RabbitMQ 는 큐가 만료될 때 즉시 제거되는 것을 보장하지 않는다.
5.4.2 영구적인 큐
내구성 큐
서버를 재시작한 후에도 계속 유지돼야 하는 큐를 선언하려면 내구성 (durability) 플래그를 True 로 설정해야 한다. 큐의 내구성은 종종 메시지 지속성 (persistence) 과 혼동된다. 이전 장에서 알아본 것 처럼 delivery-mode 속성을 2로 설정해 메시지를 발행하면 메시지가 디스크에 저장된다. 반대로 내구성 플래그는 Queue.Delete 가 호출되기 전까지 RabbitMQ 가 큐를 삭제하지 않도록 설정한다.
일반적으로 RPC 스타일 애플리케이션은 소비자와 주고받는 큐를 사용하지만, 내구성 큐는 여러 소비자를 같은 큐에 연결한 후 메시지 흐름이 동적으로 변경되지 않는 애플리케이션에 매우 편리하다.

큐에서 메시지의 자동 만료
미션 크리티컬한 애플리케이션이 아닌 경우, 메시지를 너무 오랫동안 소비하지 않을 때 자동으로 삭제하는 기능이 필요할 수 있다. 메시지 단위 TTL 설정을 통해 서버 측에서 메시지의 최대 수명에 대한 제약 조건을 걸 수 있다. 데드 레터 익스체인지와 TTL 값을 모두 설정한 큐는 만료 시에 메시지를 데드 레터로 간주한다.
메시지마다 적용되는 메시지의 expiration 속성과 달리 x-message-ttl 큐 설정은 큐에 있는 모든 메시지의 최대 수명을 적용한다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'expiring-msg-queue',
arguments={'x-message-ttl': 1000})
queue.declare()


큐에 있는 메시지별 TTL 은 메시지의 사용처에 따라 서로 다른 값으로 지정할 때 사용한다. 실시간 대시보드에서 메시지를 자동으로 만료시키는 큐를 작성해 큐에서 수신 대기중인 오래된 정보를 수신하는 것을 방지할 수도 있다.
제한된 수의 메시지 보관
RabbitMQ 3.1.0 부터 큐의 메시지 최대 크기를 설정할 수 있다. 큐에 x-max-length 인수를 설정한 후 대기 중인 메시지 수가 최대 크기에 도달하면 RabbitMQ 는 새로운 메시지가 추가될 때 가장 먼저 받은 메시지를 삭제한다. x-max-length 로 선언한 큐는 클라이언트가 채팅방의 n 개의 최신 메시지를 받는 데 사용할 수 있다.
메시지 최대 크기를 설정한 큐의 맨 앞에서 제거된 메시지는 데드 레터 익스체인지를 설정한 경우 해당 익스체인지로 이동한다.


5.4.3 임의의 큐 설정
RabbitMQ 팀은 큐와 관련된 AMQP 스펙을 확장하는 새로운 기능을 구현했는데, 각 기능들은 큐를 정의할 때 인수로 전달한다.
인수 이름 | 목적 |
x-dead-letter-exchange | 메시지가 재삽입되지 않고 거부될 때, 라우팅할 익스체인지 |
x-dead-letter-routing-key | 거부된 메시지를 라우팅하는 데 사용하는 라우팅 키 |
x-expires | 지정된 시간 (밀리초 단위) 후에 큐를 삭제 |
x-ha-policy | HA 큐를 만들 때, 노드 간에 HA 를 적용하는 정책 지점 |
x-ha-nodes | HA 큐를 분산할 노드 (4.1.6절 참고) |
x-max-length | 큐에서 지정하는 메시지 만료 시간 (밀리초 단위) |
x-message-ttl | 큐의 최대 메시지 수 |
x-max-priority | 최대 값이 255인 큐의 우선순위를 지정하는 데 사용 (RabbitMQ 버전 3.5.0 이상) |
5.5 요약
소비자 애플리케이션을 작성할 때는 다음 질문을 고려해 애플리케이션에 적합한 옵션을 찾는 것을 추천한다.
- 모든 메시지를 수신했는지 또는 폐기할 수 있는지 확인해야 하는가?
- 메시지를 받은 다음 일괄적으로 수신 확인하거나 거부해야 하는가?
- 그렇지 않다면, 개별 작업을 자동으로 일괄 처리하고 트랜잭션을 사용해 성능을 향상시킬 수 있는가?
- 소비자 애플리케이션에서 트랜잭션 커밋 및 롤백 기능이 정말로 필요한가?
- 소비자가 구독하는 큐의 메시지를 독점적으로 접근해야 하는가?
- 소비자 애플리케이션에 오류가 발생했을 때 어떻게 처리해야 하는가? 메시지를 버려야 하는가? 큐에 재삽입해야 하는가? 혹은 데드 레터 익스체인지로 보내야 하는가?
6장에서는 여러 가지 메시징 패턴 및 사용 사례를 사용해 실제로 활용하는 방법을 알아보자.
6. 익스체인지 라우팅을 통한 메시지 패턴
이 장에서 다루는 내용
- RabbitMQ 가 제공하는 기본 유형 익스체인지와 플러그인 익스체인지
- 애플리케이션 아키텍처에 적합한 익스체인지 유형 선택하기
- 익스체인지 간 라우팅을 통한 메시지의 다양한 라우팅 옵션
아마도 RabbitMQ 의 최대 강점은 발행자 애플리케이션이 제공한 라우팅 정보를 기반으로 메시지를 서로 다른 큐로 유연하게 라우팅할 수 있다는 점일 것이다.
간단한 애플리케이션에는 복잡한 라우팅 로직이 필요하지 않지만, 올바른 유형의 익스체인지를 선택하는 것은 전체 애플리케이션 아키텍처에 큰 영향을 줄 수 있다.
이 장에서는 네 가지 기본 유형의 익스체인지와 각 익스체인지에 적절한 아키텍처 유형을 알아본다.
- 다이렉트 (direct) 익스체인지
- 팬아웃 (fanout) 익스체인지
- 토픽 (topic) 익스체인지
- 헤더 (headers) 익스체인지
토픽 익스체인지는 라우팅 키의 와일드카드 매칭을 기반으로 메시지를 선택적으로 전달하며, 헤더 익스체인지는 메시지 라우팅에 메시지 자체를 사용하는 새로운 방식을 제공한다.
마지막으로 단일 큐를 공유하는 여러 소비자 애플리케이션에 소비자의 처리량을 올리기 위해 사용하는 플러그인, 컨시스턴트 해싱 (consistent hashing) 익스체인지에 대해 알아본다.
6.1 다이렉트 익스체인지를 사용한 간단한 메시지 라우팅
다이렉트 익스체인지는 특정 큐 또는 특정 큐 그룹에 메시지를 전달할 떄 유용하다. 메시지를 발행할 때 사용하는 라우팅 키와 동일한 키로 익스체인지에 바인딩된 모든 큐에 메시지가 전달된다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
exchange = rabbitpy.Exchange(channel, 'direct-example',
exchange_type='direct')
exchange.declare()
다이렉트 익스체인지는 매우 단순하며 RPC 메시징 패턴에서 응답 메시지의 라우팅에 적합하다.
6.1.1 애플리케이션 아키텍처 만들기
이미지를 익스체인지에 발행할 때 응답 큐 이름은 메시지 속성의 reply-to 필드에, 요청 ID 는 correlation-id 필드에 저장된다.
익스체인지 선언하기
RabbitMQ 에 연결되면 RPC 요청 메시지를 라우팅하는 익스체인지와 RPC 응답을 라우팅하는 익스체인지를 선언한다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
for exchange_name in ['rpc-replies', 'direct-rpc-requests']:
exchange = rabbitpy.Exchange(channel, exchange_name,
exchange_type='direct')
exchange.declare()
RPC 작업 처리를 위한 익스체인지를 선언한 후에는 이어서 RPC 작업자를 만든다.
6.1.2 RPC 작업자 생성하기
노트
큐를 생성할 때 이름을 생략하면 RabbitMQ 가 자동으로 큐의 이름을 생성한다.
RPC 요청 처리하기
rabbitpy.Queue.consume_messages 메소드를 사용하는데, 이는 파이썬 컨텍스트 매니저로도 사용된다. 파이썬 컨텍스트 매니저는 with 문에 의해 호출되는 언어 구조다. 종료할 때 실행되는 메소드인 __enter__ 와 __exit__ 를 정의해야 한다.
콘텐츠 매니저를 사용해 코드를 호출하면 rabbitpy 가 AMQP RPC 요청인 Basic.Consume 과 Basic.Cancel 을 처리하기 때문에 로직 코드에만 집중할 수 있다.
rabbitpy 는 timestamp 속성을 파이썬 datetime 객체로 자동 변환하므로 메시지를 발행한 후 초 단위로 계산하려면 값을 유닉스 UNIX 시간으로 다시 변환해야 한다.
결과 응답하기
headers 속성에 메시지를 처음 발행할 때의 timestamp 를 설정하는데, 발행자 애플리케이션에서 요청에서 응답까지의 총 시간을 측정해 모니터링하는 데 사용할 수 있다.
6.1.3 간단한 RPC 발행자 코드 작성하기
외부 라이브러리 설정하기
..,.
2부 데이터 센터 또는 클라우드에서 RabbitMQ 운영하기
3부 통합과 맞춤 설정
'DevOps > 스터디' 카테고리의 다른 글
따라하며 배우는 도커와 CI 환경 (1) | 2023.03.26 |
---|---|
시작하세요! 도커 / 쿠버네티스 (0) | 2023.02.05 |
시작하세요! 도커 / 쿠버네티스 legacy (0) | 2022.06.17 |
따라하며 배우는 도커와 CI환경 (0) | 2022.03.11 |
RabbitMQ IN DEPTH 책을 공부하며 작성한 글.
이 책은 RabbitMQ 3.6.3 을 기준으로 저술되었다.
예제 코드
이 책에 나오는 모든 예제 코드는 매닝 웹사이트 (https://www.manning.com/books/rabbitmq-in-depth) 와 깃허브 저장소 (https://github.com/gmr/RabbitMQ-in-Depth) 에서 무료로 다운로드할 수 있다.
에이콘출판사 도서정보 페이지 http://www.acornpub.co.kr/book/rabbitmq-depth 에서도 예제 코드를 다운로드할 수 있다.
1부 RabbitMQ 와 애플리케이션 아키텍처
1. RabbitMQ 살펴보기
RabbitMQ 의 기능과 장점
- 오픈소스
- 플랫폼과 업체 중립성
- 경량성
- 다양한 클라이언트 라이브러리
- 유연한 성능과 안정성 절충 제어
- 대기 시간이 긴 네트워크 환경을 위한 플로그인
- 서드파티 플러그인
- 보안 계층
1.1.1 RabbitMQ 와 얼랭
RabbitMQ 는 얼랭으로 구현됨
1.1.2 RabbitMQ와 AMQP
2007년 처음 출시했을 때 AMQP 스펙을 구현한 최초의 메시지 브로커 중 하나였다. AMQP 자체가 RabbitMQ 에 영향을 많이 줬다.
AMQP에 대한 자세한 내용은 https://www.amqp.org/ 에서 확인할 수 있다.
메시지 프로토콜과 메시지 브로커는 다양하므로, 애플리케이션에 미치는 영향을 고려해 메시지 프로토콜과 메시지 브로커를 선택해야 한다. RabbitMQ 는 AMQP 를 기반으로 구현됐지만 MQTT, STOMP, XMPP 등 다양한 프로토콜도 제공한다. 멀티 프로토콜 애플리케이션 아키텍처에는 좋은 선택이다.
1.2 RabbitMQ 를 사용하는 곳들
- NASA 의 Nebula
- 아구라 게임즈
- Ocean Observations Initiative
- 래포티브
- 메르카도리브레
- 구글의 AdMob
- Andhaar
1.3 느슨하게 결합된 아키텍처의 장점
로그인 서비스에서 마지막으로 로그인한 시간을 비동기로 작업하므로, 애플리케이션은 즉시 인증된 회원 페이지로 이동한다.
1.3.1 애플리케이션의 의존성 제거
애플리케이션 아키텍처는 더 이상 데이터베이스 쓰기 성능에 영향을 받지 않는다. 핵심 애플리케이션 코드를 수정하지 않고도 데이터를 처리하는 새로운 애플리케이션을 쉽게 추가할 수 있다.
1.3.4 데이터와 이벤트 복제
Federation 플러그인은 WAN 허용 오차 및 네트워크 단절을 고려해서 원격 RabbitMQ 인스턴스에 메시지를 전달한다.
1.3.5 다중 마스터 Federation
노트
메시지 지향 미들웨어를 사용하면 어느 정도의 운영상 복잡성이 발생한다. 메시지 브로커는 구조상 중심점으로 애플리케이션 설계상 새로운 단일 장애 지점 (Single point of failure) 이 된다.
1.3.6 AMQ 모델
HTTPj, SMTP 와 같은 프로토콜과 달리 AMQP 스펙은 네트워크 프로토콜의 정의뿐 아니라 서버 측 서비스와 동작 방식도 정의하는데, AMQ (Advanced Message Queuing) 모델을 살펴보면 확인할 수 있다. AMQ 모델은 메시지 라우팅 동작을 정의하는 메시지 브로커의 세 가지 추상 컴포넌트를 다음과 같이 논리적으로 정의한다.
- 익스체인지: 메시지 브로커에서 큐에 메시지를 전달하는 컴포넌트
- 큐: 메시지를 저장하는 디스크상이나 메모리상의 자료 구조
- 바인딩: 익스체인지에 전달된 메시지가 어떤 큐에 저장돼야 하는지 정의하느 컴포넌트
익스체인지
익스체인지 Exchange 는 RabbitMQ 에서 메시지를 적절한 목적지로 전달하기 위해 필요한 첫 번째 입력 값으로 AMQ 모델이 정의하는 세 컴포넌트 중 하나다. 여러 유형의 익스체인지가 있다. 특히 플러그인을 사용해서 직접 커스텀 익스체인지도 정의할 수 있다.
큐
큐는 수신한 메시지를 저장하는 역할
바인딩
바인딩을 이용해 큐와 익스체인지의 관계를 정의한다.
바인딩 키는 익스체인지가 어떤 큐에 메시지를 전달해야 하는지를 의미한다.
익스체인지에 메시지를 발행할 때 애플리케이션은 라우팅 키 (routing-key) 속성을 사용한다. 라우팅 키는 때로는 큐의 이름이거나 의미적으로 메시지를 설명하는 문자열이다. 익스체인지는 라우팅 키를 바인딩 키에 맞춰서 평가한다. 즉, 바인딩 키는 큐를 익스체인지에 연결하고 라우팅 키를 평가하는 기준이다.
RabbitMQ 에서느 AMQP 스펙을 확장해 특정 익스체인지를 다른 익스체인지에 연결할 수 있는데, 이는 메시지를 라우팅하는 데 상당한 유연성을 제공한다.
2. AMQP와 RabbitMQ 코드 작성하기
2.1 RPC 전송으로서의 AMQP
RabbitMQ 는 거의 모든 부분에서 RPC (Remote Procedure Call) 패턴으로 엄격하게 통신한다. RPC 는 한 컴퓨터에서 다른 컴퓨터의 프로그램이나 프로그램의 메서드를 원격에서 실행할 수 있게 해주는 컴퓨터 간의 통신 유형 중 하나다. 원격 API 와 통신하는 웹 프로그램이 있다면, 이는 일반적인 RPC 패턴을 사용했다고 볼 수 있다.
AMQP 스펙은 서버와 클라이언트 모두 명령을 실행할 수 있다.
2.1.1 대화 시작하기
AMQP 로 통신을 시작할 때, 인사말은 프로토콜 헤더 (Protocol header) 에 해당되는데 클라이언트가 서버로 전송한다. RabbitMQ 는 Connection.start 명령으로 인사말에 응답해 명령/응답 흐름을 시작하고 클라이언트는 Connection.StartOk 응답 프레임으로 RPC 요청에 응답한다.
2.1.2 올바른 채널로 튜닝
RabbitMQ 를 이용해서 클라이언트 애플리케이션을 구현할 때는 복잡하게 너무 많은 채널을 사용하지 않는 것이 좋다.
2.2 AMQP 의 RPC 프레임 구조
Connection.Start (클래스.메소드)
2.2.1 AMQP 프레임 컴포넌트
저수준 AMQP 프레임은 다섯 개의 별개 구성 요소로 구성된다.
- 프레임 유형
- 채널 번호
- 프레임 크기 (바이트)
- 프레임 페이로드 (Payload)
- 끝 바이트 표식 (ASCII 값 206)
2.2.4 메소드 프레임 해부하기
노트
메시지를 발행할 때 mandatory 플래그를 사용하는 경우 애플리케이션은 RabbitMQ 에서 응답한 Basic.Return 명령을 수신해야 한다. RabbitMQ 가 mandatory 플래그에 설정한 요구 사항을 충족하지 못하면 Basic.Return 명령을 동일한 채널의 클라이언트에 전송한다. 자세한건 4장에서 다룬다.
2.3 프로토콜 사용하기
메시지를 큐에 발행하기 전에 몇 가지 설정 단계를 거쳐야 하는데, 최소한 익스체인지와 큐를 설정한 후 둘을 연결해야 한다.
2.3.1 익스체인지 선언하기
AMQ 모델에서 익스체인지는 큐와 같이 '1급 시민' 으로 AMQP 스펙에 해당 클래스가 존재한다.
Exchange.Declare 명령을 전송하면 RabbitMQ 는 익스체인지를 생성한 후 Exchange.DeclarOk 메소드 프레임을 응답으로 전송한다. Declare 명령이 실패하면 RabbitMQ sms Exchange.Declare 가 실패하고 채널이 닫힌 이유를 나타내는 숫자 응답 코드와 텍스트 값을 Channel.Close 명령에 포함시켜 전송하고 Exchange.Declare 명령이 전송된 채널을 닫는다.
2.3.2 큐 선언하기
Queue.Declare 명령이 실패하면 채널이 닫힌다.
큐를 선언할 때 동일한 Queue.Declare 명령을 두 번 이상 전송해도 문제는 발생하지 않는다. RabbitMQ 는 중복된 큐 선언을 감지해 큐에 대기 중인 메시지의 수와 구독 중인 구독자의 수와 같이 큐에 대한 유용한 상태를 반환한다.
정상적으로 에러 처리하기
이미 생성한 큐와 같은 이름이지만, 속성이 다른 큐를 선언하려고 시도하면 RabbitMQ 는 RPC 요청을 발행한 채널을 닫는다. 예를 들어 가상 호스트 (virtual host) 의 설정에 대해 접근 권한이 없는 사용자가 Queue.Declare 명령을 실행하면 403 에러가 반환되고 채널은 닫힌다.
클라이언트 애플리케이션이 에러를 정상적으로 처리하려면 RabbitMQ 로부터 Channel.Close 명령을 전달받아 적절하게 응답해야 한다. 특정 클라이언트 라이브러리는 에러 응답을 애플리케이션이 처리할 수 있는 예외로 변환해 처리하며, 다른 유형의 라이브러리는 사용자가 메소드를 등록할 때 콜백을 추가하도록 하고 Channel.Close 명령을 보낼 때 콜백을 호출하는 식으로 처리하기도 한다.
클라이언트 애플리케이션이 서버에서 전송하는 이벤트를 수신하지 않거나 적절하게 처리하지 않으면 메시지가 손실될 수 있다. 존재하지 않거나 이미 닫힌 채널에 메시지를 발행하는 경우 RabbitMQ 는 연결을 종료한다. 메시지를 소비하는 애플리케이션이 RabbitMQ 가 채널을 닫은 사실을 모르는 경우 RabbitMQ 가 메시지를 더는 전송하지 않지만, 클라이언트는 빈 큐를 구독하고 있다고 간주하는 문제가 발생한다.
2.3.3 큐와 익스체인지 연결하기
Queue.Declare 와 유사하게 큐를 익스체인지에 연결하는 명령인 Queue.Bind 는 한번에 하나의 큐만 지정한다. 성공적으로 처리된 경우 클라이언트 애플리케이션에 Queue.BindOk 메소드 프레임을 전송한다.
위는 모두 동기이며, 비동기 명령들도 있다.
2.3.4 RabbitMQ 에 메시지 발행하기
Basic.Publish 메소드 프레임에는 익스체인지의 이름과 라우팅 키가 들어있는데, 이를 RabbitMQ 는 익스체인지의 이름을 저장한 데이터베이스와 비교한다.
팁
RabbitMQ 에 존재하지 않는 익스체인지에 메시지를 발행하는 경우, 기본적으로 메시지는 자동으로 버려진다. 메시지가 제대로 발행됐는지 확인하려면, 메시지 발행 시에 mandatory 플래그를 true 로 설정하거나 발행자 확인 (publisher confirmations) 을 사용해야 하는데, 이 옵션에 대해서는 4장에서 자세히 알아본다. 이 중 하나를 사용하면 애플리케이션의 메시지 발행 속도가 저하될 수 있으니 주의해야 한다.
큐를 구독하는 소비자가 없어서 메시지를 소비하지 않는다면 메시지는 큐에 계속 저장되고 메시지를 더 추가할수록 큐의 크기도 커진다. RabbitMQ 는 메시지의 Basic.Properties 에 지정된 배달 모드 (delivery mode) 에 따라 메시지를 메모리에 보관하거나 디스크에 기록한다. 배달 모드는 매우 중요하므로 다음 4장에 걸쳐 자세히 알아본다.
2.3.5 RabbitMQ 에서 메시지 소비하기
소비자 애플리케이션은 Basic.Consume 명령을 실행해서 RabbitMQ 의 큐를 구독한다.
Basic.Cunsume 이 발급되면 특정 상황이 발생하기 전까지 활성 상태를 유지한다. 소비자가 메시지 수신을 중지하려면 Basic.Cancel 명령을 발행해야 한다.
소비자는 Basic.CancelOk 응답 프레임을 받기 전에 RabbitMQ 가 미리 할당된 메시지 수만큼 메시지를 받을 수 있다.
메시지를 소비할 때 RabbitMQ 에는 소비자의 수신 방식을 알 수 있는 몇 가지 설정이 있는데, 그 중 하나는 Basic.Consume 명령의 no_ack 인수다. no_ack 를 true 로 설정하면 RabbitMQ 는 소비자가 Basic.Cancel 명령을 보내거나 연결을 끊을 때 까지 계속 메시지를 보낸다. no_ack 플래그를 false 로 설정하면 소비자는 Basic.Ack RPC 요청을 전송해 수신한 각 메시지를 확인해야 한다.
2.4 파이썬으로 메시지 발행자 작성하기
일단 도커로 RabbitMQ 를 실행해 보자.
docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:3.8-management
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_management
그리고
http://localhost:15672
로 접속을 해서 ID PASSWORD 를 guest/guest 로 접속해보자.
exchange 를 선언해보자.
python example/Examples/2.4\ Publisher\ Example.py
# exchange = rabbitpy.Exchange(channel, 'chapter2-example')


메시지 발행방법 요약
- connection.channel 로 채널 생성
- exchange 생성 후 declare
- queue declare
- queue 에 exchange 와 bind
- 메시지에 채널 명시해 생성 후, exchange 에 routing key 와 함께 publish
팁
운영환경에서 발행자 애플리케이션을 작성하는 경우 JSON 또는 XML 과 같은 데이터로 직렬화하면 소비자가 메시지를 쉽게 디코딩할 수 있으므로 문제가 발생하는 경우 더 쉽게 원인을 찾을 수 있다.

Get messages 를 하면

3. 메시지 속성 심층 탐사
메시지를 설명하기 위한 일관된 방법을 찾기 위해 RabbitMQ 에 발행된 모든 메시지와 함께 전달되는 데이터 구조인 AMQP 스펙의 Basic.Properties 를 살펴봤다. Basic.Properties 를 활용하면 메시지가 자동으로 제거되거나, 처리하기 전에 메시지의 출처와 유형을 검증할 수 있는 더욱 지능적인 소비자 애플리케이션의 작성이 가능했다. 3장에서는 메시지의 각 속성과 다양한 용도로 사용할 수 있는 Basic.Properties 에 대해 자세히 살펴본다.
3.1 메시지 속성 적절히 사용하기
발행한 메시지는 세 가지 유형의 프레임인 Basic.Publish 메소드 프레임, 콘텐츠 헤더 프레임, 바디 프레임으로 구성된다.
콘텐츠 헤더 프레임에 있는 메시지 속성은 Basic.Properties 데이터 구조로 사전에 정의한 값이 있는 집합이다. delivery-mode 와 같은 일부 속성은 AMQP 스펙에서 정의한 의미를 갖지만, type 과 같이 정확한 스펙이 없는 속성들도 있다.
delivery-mode 속성은 메시지가 큐에 있을 때 메시지를 메모리에 보관할지, 디스크에 먼저 저장해야 할지 RabbitMQ 에 알리는 데 사용된다.
팁
RabbitMQ 에서 MQTT 와 같은 프로토콜을 사용하는 경우, AMQP 에 특정된 속성은 사용할 수 없게 되므로 특정 속성이 손실되지 않도록 유의해야 한다.
각 기본 속성을 살펴보자.
- content-type: 메시지 본문 해석하는 방법
- content-encoding: 메시지 본문이 어떤 방법으로 압축되거나 인코딩됐는지
- message-id, correlation-id: 메시지와 메시지 응답을 고유하게 식별해 메시지 추적
- timestamp: 메시지 생성 시점에 대한 표준시간 전달
- expriation: 메시지 만료 전달
- delivery-mode: 큐에 메시지 추가할 때, 디스크 또는 메모리에 저장할지를 전달
- app-id, user-id: 문제가 발생한 애플리케이션을 추적
- type: 발행자와 소비자 사이에 계약 (contract) 정의
- reply-to: 패턴을 값으로 전달해 응답 메시지를 라우팅할 때 사용
- headers: RabbitMQ 에 메시지를 라우팅할 때, 사용자 정의 형식의 속성을 정의하는 데 사용
3.2 content-type 으로 명시적 메시지 계약 작성하기
HTTP 의 content-type 과 같다. application/json 같은 것
파이썬에서 라이브러리는 content-type 헤더에서 메시지가 어떤 유형으로 직렬화됐는지 감지하고, 이를 사용해 메시지 본문을 자동으로 디코딩해 dict, list 또는 다른 원시 데이터 유형으로 변환할 수 있다. 이를 통해 소비자 애플리케이션의 코드 복잡성을 현저하게 줄일 수 있다.
3.3 gzip, content-encoding 으로 메시지 크기 줄이기
AMQP 를 이용해서 전달한 메시지는 기본적으로 압축되지 않는다. 이는 XML 과 같이 지나치게 자세한 마크업 문법이나 큰 메시지에는 JSON, YAML 과 같이 마크업을 사용하지 않는 경우에도 문제가 될 수 있다. 서버에서 웹 페이지를 gzip 으로 압축하고 브라우저가 렌더링하기 전에 압축을 푸는 것과 마찬가지로 발행자는 메시지를 발행하기 전에 압축하고 소비자로부터 메시지를 전달받아 압축을 풀 수 있다.
content-encoding 속성으로 메시지 본문이 base64 혹은 gzip 과 같은 특수한 형식으로 인코딩됐는지 알 수 있다.
운영 환경에서는 발행자와 소비자의 메시지 계약을 운영 중에 변경하지 않는 것이 바람직하며 기존 코드에 대한 잠재적 영향을 최소화하는 것이 좋다. 그러나 메시지 크기가 애플리케이션의 전체 성능이나 안정성에 영향을 미쳐서 본문 인코딩의 변경이 불가피한 경우, content-encoding 헤더를 사용하면 소비자가 메시지의 형식을 사전에 확인할 수 있으므로 메시지 본문을 적절하게 디코딩할 수 있다.
노트
AMQP 클라이언트는 자동으로 content-encoding 값을 UTF-8 로 설정하지만, 이는 잘못된 동작이다. AMQP 스펙에는 content-encoding 이 MIME 콘텐츠 인코딩을 저장하기 위한 것이라고 명시돼 있다.
MIME 이메일 마크업은 병렬 처리가 가능하도록 content-encoding 필드를 사용해 이메일의 각 파트에 대한 인코딩을 표현한다. 이메일에서 가장 일반적인 인코딩 유형은 Base64 와 QP (Quoted-Printable) 인코딩이다. Base64 인코딩은 메시지에서 전송할 바이너리 데이터가 텍스트 전용인 SMTP 프로토콜의 범위를 넘지 않도록 사용된다. 예를 들어 이미지가 포함된 HTML 의 이메일 본문을 만드는 경우 포함된 이미지는 Base64 로 인코딩된다.
하지만 SMTP 와 달리 AMQP 는 바이너리 프로토콜이다.
클라이언트 라이브러리 활용하기
클라이언트 라이브러리를 사용해 소비자 코드를 작성하는 경우, content-encoding 속성을 사용해 수신 시 메시지를 자동으로 디코딩할 수 있다. 보통 라이브러리가 메시지의 전처리, 디코딩, 압축 해제를 처리하므로 직접 작성해야 하는 소비자 애플리케이션의 로직과 코드는 단순하다. 따라서 개발자는 메시지 본문을 처리하는 작업에 집중할 수 있다.
애플리케이션의 운영 중에 메시지 본문을 bzip2 로 압축하는 것이 더 적합하다고 판단돼 변경하더라도 content-encoding 속성을 검사하도록 소비자 애플리케이션을 구현하면 디코딩할 수 없는 메시지를 거부할 수 있다. zlib 압축만 해제할 수 있는 소비자 애플리케이션은 bzip2 로 압축된 메시지를 거부해 bzip2 압축을 풀 수 있는 다른 소비자 애플리케이션이 처리할 수 있도록 큐에 메시지를 남겨둔다.
3.4 message-id 와 correlation-id 를 이용한 메시지 참조
AMQP 스펙에서 message-id 와 correlation-id 는 '애플리케이션 용도' 로 지정됐으며, 공식적으로 정의된 동작은 없다. 두 필드는 최대 255 바이트의 UTF-8 로 인코딩된 값을 가지며 Basic.Properties 데이터 구조에 포함된 압축되지 않은 값으로 저장된다.
- message-id: 메시지를 고유하게 식별
- correlation-id: 메시지가 다른 메시지에 대한 응답임을 나타내는 데 사용, 이 경우 이전 메시지의 meesage-id 를 값으로 포함
3.4.1 message-id
판매 주문이나 지원 요청과 같은 경우 메시지를 쉽게 파악하는 데 도움이 된다.
3.4.2 correlation-id
다른 메시지에 대한 응답임을 표시. 또 다른 사용 예는 트랜잭션 ID 나 메시지가 참조하는 다른 데이터를 전달하는 데 이 속성을 사용하는 것이다.
3.5 timestamp
timestamp 속성을 사용해 메시지 생성 시점을 기록하면 메시지를 발행할 때 성능을 추적할 수 있다.
프로세스가 시행해야하는 서비스 수준 계약 (SLA, Service Level Agreement) 이 있다면, 소비자 애플리케이션에서 메시지의 timestamp 속성을 평가해 메시지를 처리할 지 여부를 결정하거나 메시지의 수명이 지정한 값을 초과한 경우 모니터링 애플리케이션에 경고 메시지를 발행해서 누군게에게 알릴 수 있다.
timestamp 는 유닉스 시간 (Unix epoch) 또는 1970년 1월 1일 자정 이래로 경과된 초를 나타내는 정수로 전송된다. 불행히도 timestamp 는 시간대 (time zone) 정보가 없으므로 UTC 혹은 다른 일관된 시간대를 약속해 사용하는 것이 좋다.
3.6 자동으로 메시지 만료하기
expiration 속성은 RabbitMQ 에서 소비하지 않은 메시지를 버려야 할 때를 파악하는데 사용한다. expiration 속성은 AMQP 스펙 0-8 과 0-9-1 버전에 모두 존재하지만, RabbitMQ 3.0 버전 이전에는 지원되지 않았다. '구현할 때 사용할 수 있지만 공식적인 동작은 없음' 으로 정의됐다. timestamp 와 동일하게 유닉스 시간을 값으로 갖지만, 타입은 255자의 짧은 문자열이라는 것이다.
RabbitMQ 에서 expiration 속성을 사용해 메시지를 자동으로 만료 처리하려면 유닉스 시간 또는 정수 기반 timestamp 를 값으로 가져야 하지만, 타입은 문자열로 저장돼야 한다. "2002-02-20T00:00:00-00" 과 같이 ISO-8601 형식의 타임스탬프를 저장하는 대신 문자열 값인 "1329696000" 과 동일한 형식으로 값을 설정해야 한다.
expiration 속성을 사용하는 메시지가 서버에 도착한 후 시간이 만료된 경우 메시지는 큐로 삽입되지 앉고 삭제된다.
RabbitMQ 에는 특정 상황에서만 메시지가 만료되는 다른 기능이 있다. 큐를 선언할 때 큐의 정의와 함께 x-message-ttl 속성을 인자로 전달해서 메시지를 만료할 수 있는데, 유닉스 시간이지만 밀리세컨드 정밀도 (유닉스 시간 X 1000) 의 정수로 값을 설정한다. 큐의 x-message-ttl 속성은 지정된 시간이 경과되면 메시지를 자동으로 삭제한다.
3.7 배달 모드를 이용해 안정성과 속도 조절하기
메시지를 디스크에 저장하면 RabbitMQ 서버를 정지하고 다시 시작하더라도 메시지가 소비될 때까지 큐에 남아있게 된다. delivery-mode 속성은 메시지를 저장하지 않을 경우 1, 메시지를 저장하는 경우 2, 이렇게 두 가지 값으로 지정된다.
노트
RabbitMQ 의 다양한 용어와 설정에 대해 처음 접한다면, 메시지 지속성 (persistence) 이 큐의 내구성 (durable) 속성과 혼동될 수 있다. 큐의 durable 속성은 RabbitMQ 서버나 클러스터를 다시 시작한 후에도 큐 정의가 유지돼야 하는지를 나타내는 반면, delevery-mode 는 메시지를 유지할지 여부를 나타낸다. 하나의 큐에는 디스크에 저장되는 지속성 메시지와 메모리에만 보관되는 비지속성 메시지가 동시에 포함될 수 있다.
웹 애플리케이션의 로그인 이벤트의 경우 delivery-mode 속성을 선택하기가 다른 케이스보다 쉽다. 로그인 이벤트가 없어진다고 해서 비즈니스가 위험에 빠지지는 않으므로 이벤트를 메모리에만 보관하는 것도 합리적인 선택이 된다. 이 경우 delivery-mode 를 1로 설정한다. 그러나 RabbitMQ 를 사용해 금융 거래 데이터를 발행하고 애플리케이션 아키텍처가 메시지 처리량보다는 정확한 전달에 초첨을 맞춘다면 delivery-mode 를 2로 지정해 지속성을 활성화한다.
배달모드는 성능에 중요한 영향을 미친다.
3.8 app-id 및 user-id 를 사용해 메시지의 출처 확인하기
app-id 와 user-id 는 소비자 애플리케이션에서 메시지를 처리하기 전에 유효성을 검증하는 용도로 활용된다.
app-id 속성은 발행자 애플리케이션에서 자유롭게 사용할 수 있는 문자열 값이다.
RabbitMQ 는 메시지를 발행하는 RabbitMQ 사용자에 대해 user-id 속성을 이용해 유효성을 검사한다.
**메시지 출처를 확인하는 데 사용한다.
3.8.1 app-id 속성
app-id 속성은 최대 255자의 짧은 UTF-8 문자열이다. 애플리케이션이 API 중심으로 디자인돼 버전 관리가 필요한 경우 app-id 를 사용해 생성된 메시지와 함께 특정 API 와 버전을 달리할 수 있다. 발행자와 소비자 간에 계약을 맺는 방법 중 하나로 사용한다면, 메시지를 처리하기 전에 app-id 를 검사해 알 수 없거나 지원하지 않는 출처의 메시지의 경우 애플리케이션에서 메시지를 삭제할 수 있다.
app-id 의 다른 사용법은 통계 데이터로 수집하는 것이다. 예를 들어 메시지를 사용해 로그인 이벤트를 전달하는 경우, app-id 속성을 로그인 이벤트를 발생시키는 애플리케이션의 플랫폼과 버전으로 설정한다. 웹 기반이나 데스크톱 그리고 모바일 클라이언트 애플리케이션을 사용하는 환경에서는 메시지 본문을 검사하지 않고도 플랫폼별로 로그인을 추적하기 위해 계약을 맺어서 데이터를 추출할 수 있다. 통계 수집 전용 소비자를 구현하고 로그인 이벤트를 처리하는 소비자와 동일한 메시지를 구독한다면 이 기능이 특히 유용하다. app-id 속성을 제공하면 통계 수집 전용 소비자가 메시지 본문을 디코딩할 필요가 없다.
사용법
- 버전관리
- 출처확인
- 통계
팁
큐에 대기하는 문제가 발생한 메시지의 출처를 추적할 때, app-id 를 이용하면 메시지의 출처를 쉽게 추적할 수 있으며, 이는 다수 애플리케이션이 동일한 RabbitMQ 인프라를 공유하는 대규모 환경에서 특히 유용하다. 기존 발행 애플리케이션과 동일한 익스체인지와 라우팅 키를 새로운 발행자 애플리케이션이 잘못 사용하는 경우가 종종 발생한다.
3.8.2 user-id 속성
사용자 인증의 경우에는 로그인한 사용자를 식별하기 위해 user-id 속성을 사용하는 것이 유용해 보이지만, 대부분의 경우 권장되지 않는다. RabbitMQ 는 메시지를 발행하는 사용자에 대해 user-id 속성의 값으로 발행된 모든 메시지를 검사하고 두 값이 일치하지 않으면 메시지가 거부된다. 예를 들어 애플리케이션이 RabbitMQ 를 사용해 사용자 'www' 로 인증하고 메시지의 user-id 속성을 'linus' 로 설정할 경우 메시지가 거부된다.
물론 작성하는 애플리케이션이 채팅이나 인스턴트 메시징 서비스라면 한 채팅방의 모든 사용자가 같은 user-id 를 사용해야 하며, 실제로 로그인한 실제 사용자를 식별하기 위해 user-id 를 사용할 수 는 없다.
3.9 type 속성을 이용해 메시지 특정하기
AMQP 0-9-1 버전에서 Basic.Properties 의 type 속성은 '메시지 유형 이름' 으로 정의돼 있는데, 애플리케이션 전용으로 공식적인 동작은 정해지지 않았다는 의미다. 익스체인지와 결합된 라우팅 키 값은 메시지 내용을 결정하는 데 필요한 만큼 메시지에 대한 많은 정보를 전달하는 데 반해, type 속성은 애플리케이션이 메시지 처리 방법을 결정 하는 데 또 다른 수단으로 사용된다.
**자유 형식 문자열 값으로 메시지 유형을 정의하는 데 주로 사용된다.
자체 설명 직렬화 형식이 충분히 빠르지 않은 경우
type 속성은 자체 설명 (self-describing) 메시지를 만들 때, 특히 메시지 본문이 자체 설명 데이터 형식으로 직렬화되지 않은 경우 매우 유용하다. JSON, XML 같은 형식은 너무 장황한 편이다. 이럴 경우 Apache Thrift 혹은 Protobug 와 같은 직렬화 형식을 선택하기도 한다. 이들은 MessagePack 과는 달리 이진 코드화 메시지 형식에 자체 설명이 포함되지 않았으므로 직렬화 및 역직렬화를 위한 외부 정의 파일이 필요한데, 메시지에 자체 설명이 빠져있으므로 유선에서 더 적은 페이로드가 사용된다.
발행자와 소비자 간에 실행 가능한 계약을 자체 설명하는 메시지와 달리 자체 설명하지 않는 메시지의 본문은 메시지를 소비자가 처리할 수 있는지 결정하기 전에 메시지 본문을 역직렬화해야 한다. 이 경우 type 속성을 사용해 레코드 유형이나 외부 정의 파일을 지정함으로써 소비자가 메시지를 처리하는데 사용하는 적절한 .thrift 나 .proto 파일에 접근할 수 없는 경우 처리할 수 없는 메시지로 판단해 거부할 수 있다.
이제 이벤트를 데이터 웨어하우스에 저장하자. 단일 소비자가 모든 메시지를 처리하기 위해 일반 큐를 사용해 ETL (추출 extract_변환 transform-로드 load) 단계를 수행한다. ETL 을 위한 큐의 소비자는 여러 유형의 메시지를 처리하고 type 속성을 사용해 추출된 데이터를 저장할 시스템이나 테이블 또는 클러스터를 결정한다.
노트
ETL 은 최종적인 보고 목적으로 OLTP 데이터를 추출해서 데이터 웨어하우스에 로드하는 표준 방식이다.
3.10 동적인 작업 흐름을 위한 reply-to 속성 사용하기
AMQP 스펙에서 reply-to 속성은 공식적으로 정의된 동작은 없고 '애플리케이션 용도' 로만 지정돼 있다. 앞서 언급한 속성들과는 달리, 메시지에 대한 응답을 위한 개인 응답 큐를 지정하는 데 사용될 수 있다는 점을 주목할 만 하다. AMQP 스펙에 개인 응답 큐에 대한 명확한 정의가 명시돼 있지는 않지만, reply-to 속성은 특정 큐 이름이나 메시지가 원래 발행된 동일한 익스체인지의 응답 키를 전달하는 데 사용할 수 있다.
경고
AMQP 0-9-1 버전에는 reply-to 속성에 대해 '요청 메시지에 사용될 때 개인 응답 큐의 읆을 보유할 수 있다.'는 경고가 있다. 이 정의는 모호하므로 사용할 때 주의해야 한다. reply-to 속성의 값 때문에 응답 메시지가 라우팅될 수 없는 경우, RabbitMQ 가 메시지 발행을 거부할 수 있다.
**reply-to 속성은 RPC 스타일의 메시지의 응답에 소비자가 사용해야 하는 라우팅 키를 전달하는 데 사용할 수 있다.
3.11 headers 를 사용해 사용자 속성 지정하기
사용자 정의 키와 값을 갖는 테이블이다. 키는 최대 255자의 길이를 갖는 ASCII 또는 유니코드 문자열을 설정할 수 있다. 값은 유효한 AMQP 값 유형을 설정할 수 있다. 다른 속성들과 달리 headers 속성을 사용하면 원하는 모든 데이터를 headers 테이블에 추가할 수 있다. headers 속성에는 특별한 기능이 있는데, RabbitMQ 는 라우팅 키를 사용하는 대신 헤더 테이블에 채워진 값을 기반으로 메시지를 라우팅할 수 있다는 점이다. headers 속성을 통한 메시지 라우팅은 6장에서 알아본다.
예시
키 (ASCII 또는 유니코드 문자열) | 값 (모든 AMQP 데이터 유형) |
foo | bar |
corge | 1 |
grault | True |
graply | 1921-08-19 |
3.12 priority 속성
RabbitMQ 3.5.0 부터 AMQP 스펙에 맞춰서 priority 필드가 구현됐다. priority 속성의 값은 큐에 포함된 메시지의 우선순위 지정에 사용하며, 0~9까지의 값을 갖는 정수로 정의된다. priority 가 9인 메시지가 발행되고 나서 priority 가 0 인 메시지가 발행되면 새로 연결된 소비자 애플리케이션은 priority 가 9인 메시지보다 0인 메시지를 먼저 받게 된다. (낮은 것 부터 받음) 흥미롭게도 RabbitMQ 는 priority 속성을 부호 없는 바이트 (unsigned byte) 로 구현해 0에서 255사이의 값을 지정할 수 있지만, AMQP 스펙과 상호운용성을 유지하려면 priority 를 0에서 9로 제한해야 한다.
3.13 사용할 수 없는 속성: cluster-id/reserved
cluster-id 는 AMQP 0-8 에서 정의됐으며 AMQP 0-9-1 에서 제거됐다.
AMQP 0-9-1 에서 예약됨 (reserved) 으로 변경됐으므로 사용하지 말아야 한다.
이 속성은 가급적 사용하지 않는 것이 좋다.
3.14 요약
속성 | 유형 | 사용처 | 명시된 내용 |
app-id | 짧은 문자열 | 애플리케이션 | 메시지르 발행하는 애플리케이션을 정의 |
content-encoding | 짧은 문자열 | 애플리케이션 | 메시지 본문이 zlib, deflate 또는 Base64 와 같은 특별한 방법으로 인코딩 되는지 지정 |
content-type | 짧은 문자열 | 애플리케이션 | mime-types 를 사용해 메시지 본문의 유형 지정 |
correlation-id | 짧은 문자열 | 애플리케이션 | 메시지가 참조하는 내용 |
delivery-mode | octet | RabbitMQ | 1은 RabbitMQ 가 메시지를 메모리에 보관 2는 디스크에 기록 |
expiration | 짧은 문자열 | RabbitMQ | 메시지가 만료되는 시기를 나타내는 데 사용하는 텍스트 문자열의 유닉스 시간 값 |
headers | 테이블 | 양쪽 모두 | 메시지에 대한 추가적인 메타데이터를 첨부하는 데 사용하는 자유 형식 키/값 테이블. 원하는 경우 RabbitMQ 가 이 값을 기반으로 라우팅 가능 |
message-id | 짧은 문자열 | 애플리케이션 | 메시지를 식별하는 데 사용할 수 있는 UUID 와 같은 고유 식별자 |
priority | octet | RabbitMQ | 큐에서 메시지의 우선순위를 지정하는 속성 |
timestamp | timestamp | 애플리케이션 | 메시지 작성 시점을 나타내는 데 사용하는 유닉스 시간 값 |
type | 짧은 문자열 | 애플리케이션 | 에플리케이션이 메시지 유형 또는 페이로드를 설명하는 데 사용할 수 있는 텍스트 문자열 |
user-id | 짧은 문자열 | 양쪽 모두 | 연결된 사용자에 대해 유효성을 검증하고 일치하지 않는 메시지를 삭제하는 자유 형식 문자열 |
4. 메시지 발행에서 성능 절충
여러 서버에 걸쳐 있는 HA 큐와 같은 다양한 메시지 배달 보장 수준을 선택할 수 있다. 4장에서는 이러한 기능을 사용하는 데 관련된 성능과 배달 보장의 성능 절충에 대해 알아보고 RabbitMQ 가 자동으로 발행자의 메시지를 조절하는 방법을 알아본다.
4.1 발행 속도와 배달 보장의 균형 잡기
RabbitMQ 서버를 재부팅해도 메시지가 유지되도록 하는 등의 일부 기능은 특정 애플리케이션에서는 너무 느리고 적합하지 않을 수 있다. 반면에 추가적인 배달 보장 없이 메시지를 발행한다면, 속도는 훨씬 빠르지만 미션 크리티컬 애플리케이션에서는 안전한 환경을 제공하지 못한다.
토끼 <-> 거북이
- 배달 보장 없음
- 실패 통보
- 발행자 확인
- 대체 익스체인지
- HA 큐
- 트랜잭션
- 트랜잭션 HA 큐
- 메시지 디스크에 저장
자체적인 성능 벤치마크를 수행해 성능과 배달 보장 간에 적절한 균형을 결정하는 것이 좋다.
올바른 솔루션을 위한 고성능과 메시지 배달 보장 사이에 적절한 균형을 찾는 과정에서는 다음 질문을 염두에 두길 바란다.
- 발행 시에 미시지에 큐를 넣는 것이 얼마나 중요한가?
- 메시지를 라우팅할 수 없는 경우, 발행자에게 메시지를 보내야 하는가?
- 메시지를 라우팅할 수 없는 경우, 차후에 조정하는 다른 곳으로 메시지를 보내야 하는가?
- RabbitMQ 서버에 장애가 발생할 때, 메시지가 손실돼도 괜찮은가?
- RabitMQ 가 새 메시지를 처리할 때, 요청한 모든 메시지를 라우팅한 후 디스크에 저장하는 작업이 정상적으로 수행했는지 발행자가 확인해야 하는가?
- 발행자가 메시지를 한꺼번에 전달하면 RabbitMQ 는 메시지를 라우팅하고 디스크에 저장한 후 작업이 정상적으로 실행됐는지를 발행자에게 다시 알려야 하는가?
- 다수 메시지를 라우팅한 후 디스크에 정상적으로 저장됐는지 확인하는 작업을 일괄 처리하는 경우, 메시지를 저장할 큐에 원자 커밋 (atomic commit) 이 필요한가?
- 발행자가 적절한 성능과 메시지 처리량을 달성하는데, 메시지 배달 보장 기능 간에 절충점이 있는가?
- 메시지 발행의 다른 측면이 메시지 처리량 및 성능에 영향을 미치는가?
HA 큐와 필수 (mandatory) 라우팅을 결합해서 선택하거나 delivery-mode 를 2로 설정하고 트랜잭션 발행을 선택해 메시지를 디스크에 보관할 수 있다. 애플리케이션 개발 시에 균형 잡힌 조합을 발견할 때 까지, 각각 다른 옵션을 조합해 시도하는 것을 추천한다.
4.1.1 배달 보장을 사용하지 않는 환경
이상적인 환경이라면 RabbitMQ 는 추가 구성이나 설정 없이 메시지를 안정적으로 전달한다. 올바른 Basic.Publish 를 통해 익스체인지, 라우팅 정보와 함께 메시지를 발행하면 RabbitMQ 가 메시지를 수신하고 적절한 큐에 전달한다. 네트워크 문제가 없고 서버 하드웨어가 안정적이며 장애도 발생하지 않는다면, 운영체제가 RabbitMQ 메시지 브로커의 운영 상태에 영향을 미치는 문제는 발생하지 않는다.
불행하게도 현실 세계에는 완벽한 환경에서는 결코 일어나지 않을 일들이 정기적으로 발생한다.
미션 크리티컬한 애플리케이션이 아닌 경우, 일반적인 메시지 발행 중 발생 가능한 모든 장애를 처리할 필요는 없으며, 적절한 처리만 해도 안정적이고 예측 가능한 가동 시간을 확보할 수 있다.
미션 크리티컬한 애플리케이션이 아닌 경우에는 RabbitMQ 의 기본 설정으로도 적절한 수준의 안정적인 메시징 환경을 구축할 수 있다. 배달 보장을 사용하지 않아도 적절한 환경에 대해 살펴보자.
이 메시지는 서버의 CPU 로드, 메모리, 네트워크 사용과 같은 정보를 전달한다.
RabbbitMQ 와의 연결이 끊어지면 다음 번에 통계 데이터를 보내야 할 때 다시 연결을 시도한다. 마찬가지로 소비자 애플리케이션은 연결이 끊어지면 다시 연결하고 이전에 사용하던 동일한 큐에서 다시 메시지를 소비한다.
4.1.2 mandatory 플래그를 설정한 메시지를 라우팅할 수 없을 때
서버 모니터링 데이터가 항상 RabbitMQ 로 배달되도록 보장하려면, collectd 에서 RabbitMQ 에 발행하는 메시지의 mandatory 를 설정한다. mandatory 플래그는 Basic.Publish RPC 명령과 함께 전달되는 인수인데, 메시지를 라우팅할 수 없으면 Basic.Return RPC 를 통해 RabbitMQ 가 메시지를 발행자에게 다시 보내도록 지시한다. mandatory 플래그는 오류 감지 모드를 켜는 것으로 간주할 수 있는데, 메시지 라우팅 실패를 알리는 데 사용한다. 메시지 라우팅이 올바르게 처리되면 발행자에게 별도의 메시지를 전송하지 않는다. mandatory 플래그를 설정한 메시지를 발행하기 위해 다음과 같이 익스체인지, 라우팅 키, 메시지, 속성을 전달한 후 인수를 전달한다.
노트
위 예제 코드에서는 Connection 과 Channel 객체를 호출하는 새로운 방법을 사용했는데, 두 객체 모두 컨텍스츠 관리자로 생성된다. (with xxx as connection: 문법)
Rabbit MQ 의 Basic.Return 은 비동기로 동작하며 메시지가 발행된 후 언제든지 발생할 수 있다. 예를 들어 RabbitMQ 에 통계 데이터를 발행하는 데 실패할 경우, collectd 가 Basic.Return 호출을 받기 전에 다른 데이터를 계속 발행할 수 있다.
mandatory 예외처리
import datetime
import rabbitpy
import time
# Connect to the default URL of amqp://guest:guest@localhost:15672/%2F
url = 'amqp://guest:guest@localhost:5672/%2F'
connection = rabbitpy.Connection(url)
try:
with connection.channel() as channel:
properties = {'content_type': 'text/plain',
'timestamp': datetime.datetime.now(),
'message_type': 'graphite metric'}
body = 'server.cpu.utilization 25.5 1350884514'
message = rabbitpy.Message(channel, body, properties)
message.publish('chapter2-example',
'server-metrics',
mandatory=True)
except rabbitpy.exceptions.MessageReturnedException as error:
print('Publish failure: %s' % error)
다른 라이브러리의 경우, 메시지를 발행할 때 RabbitMQ 에서 Basic.Return RPC 를 전달받으면 실행할 콜백 메소드를 등록해야 할 수 있다. 비동기적으로 Basic.Return 메시지를 처리할 때 다른 메시지를 소비하는 것 처럼 Basic.Return 메소드 프레임, 콘텐츠 헤더 프레임, 바디 프레임을 받게 되는데, 복잡해 보여도 크게 걱정할 필요는 없다. 프로세스를 단순화하고 메시지 라우팅 오류를 처리하는 다른 방법이 있는데, 그 중 하나는 RabbitMQ 에서 발행자 확인을 사용하는 것이다.
노트
rabbitpy 라이브러리는 Basic.Publish 명령을 보낼 때 최대 세 개의 인수만 받는데, 이는 추가 인수로 immediate 플래그가 포함된 AMQP 스펙과 대조적이다. immediate 플래그는 메시지 브로커가 메시지를 즉시 라우팅할 수 없는 경우 Basic.Return 을 반환하도록 지시한다. immediate 플래그는 RabitMQ 2.9 이후로 더 이상 사용되지 않으며, 사용할 경우 예외가 발생되고 채널이 닫힌다.
4.1.3 트랜잭션보다 가벼운 발행자 확인
발행자 확인 (Publisher Confirms) 은 AMQP 스펙의 확장 기능으로 RabitMQ 관련 확장을 지원하는 클라이언트 라이브러리에서만 지원된다. 디스크에 메시지를 저장하는 것으로도 메시지 손실을 막을 수 있지만, 이것으로는 발행자와 RabbitMQ 사이에 메시지가 전달됐음을 확신할 수 없다. 메시지를 발행하기 전에 메시지 발행자는 RabbitMQ 에 Confirm.Select PRC 요청을 전송하고 메시지가 전달됐는지 확인하기 위해 Confirm.SelectOk 응답을 기다린다. 이 시점부터 발행자가 RabbitMQ 에 보내는 각 메시지에 대해 서버는 수신 확인 (Baisc.Ack) 또는 부정 수신 확인 (Basic.Nack) 으로 응답하며, 메시지의 오프셋을 지정하는 정수 값을 포함하거나 확인한다. 확인 번호는 Confirm.Slect RPC 요청 다음에 수신된 순서에 따라 메시지를 참조한다.
python 4.1.3\ Publisher\ Confirms.py

노트
발행자 확인의 사용 여부와 상관없이 존재하지 않는 익스체인지에 메시지를 발행할 경우, 채널은 RabbitMQ 에 의해 종료된다. 이 경우 rabbitpy 에서는 rabbitpy.exceptions.RemoteClosedChannelException 예외가 발생한다.
4.1.4 라우팅할 수 없는 메시지를 위한 대체 익스체인지 사용하기
대체 익스체인지 (alternate exchange) 는 라우팅할 수 없는 메시지를 처리하기 위해 RabbitMQ 팀이 AMQP 를 확장한 또 다른 예다. 대체 익스체인지는 처음 익스체인지를 선언할 때 명시되며, RabbitMQ 에서 익스체인지가 라우팅할 수 없으면, 새로운 익스체인지가 메시지를 라우팅할 기존의 익스체인지를 대신해 지정된다.
**라우팅할 수 없는 메시지가 대체 익스체인지를 정의한 익스체인지에 발행되면, 메시지느 대체 익스체인지에 전달된다.
노트
대체 익스체인지가 설정된 익스체인지로 메시지를 보낼 때 mandatory 플래그를 설정하면 의도한 익스체인지가 메시지를 정상적으로 라우팅할 수 없는 경우 Basic.Return 이 발행자에게 직접 전송되지 않는다. 라우팅할 수 없는 메시지를 대체 익스체인지에 보내는 동작은 mandatory 플래그를 True 로 설정한 메시지에도 동일하게 적용된다. RabbitMQ 의 메시지 라우팅 패턴이 다른 익스체인지와 마찬가지로 대체 익스체인지에도 동일하게 적용된다는 점을 주의해야 한다. 큐가 원래 라우팅 키가 명시된 메시지를 수신하도록 바인딩되지 않은 경우 메시지는 큐에 추가되지 않고 손실된다.
대체 익스체인지를 사용하려면, 먼저 라우팅 할 수 없는 메시지를 전송할 익스체인지를 설정해야 한다. 기본 익스체인지를 서정한 후, 메시지를 발행할 때 Exchange.Declare 명령에 alternate-exchange 인수를 추가한다. 다음 예제 코드에는 이전 예제 코드에 라우팅할 수 없는 메시지를 저장하는 메시지 큐가 더 추가됐다.

대체 익스체인지를 팬아웃 (fanout) 유형으로 생성했지만, graphite 익스체인지는 토픽 (topic) 유형으로 생성했다. 팬아웃 익스체인지는 자신이 알고 있는 모든 큐에 메시지를 전달하고 토픽 익스체인지는 라우팅 키를 기반으로 선택적으로 메시지를 라우팅할 수 있다. 익스체인지의 유형에 대해서는 5장에서 자세히 알아본다. 두 익스체인지를 생성하고 unroutable-message 라는 이름의 큐를 대체 익스체인지에 연결한다.

이후 graphite 익스체인지에 메시지가 발행되고 라우팅될 수 없는 메시지는 unroutable-message 큐에 저장된다.
4.1.5 트랜잭션으로 배치 처리하기
RabbitMQ 확장 스펙인 발행자 확인을 구현하기 전에 메시지 전달을 보장하기 위한 유일한 방법은 트랜잭션이었다. AMQP 트랜잭션 혹은 TX 클래스는 메시지를 일괄로 RabbitMQ 에 발행한 후 큐에 커밋하거나 롤백할 수 있는 매커니즘을 제공한다. 다음 예제는 '4.1.5 Transactional Publishing' 노트북에 포함돼 있다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
tx = rabbitpy.Tx(channel)
tx.select()
message = rabbitpy.Message(channel,
'This is an important message',
{'content_type': 'text/plain',
'delivery_mode': 2,
'message_type': 'important'})
message.publish('chapter4-example', 'important.message')
try:
if tx.commit():
print('Transaction committed')
except rabbitpy.exceptions.NoActiveTransactionError:
print('Tried to commit without active transaction')
존재하지 않는 익스체인지와 같은 오류로 인해 RabbitMQ 가 메시지를 라우팅할 수 없으면 TX.CommitOk 응답을 보내기 전에 Basic.Return 응답이 반환된다. 발행자가 트랜잭션을 중단하려는 경우 TX.Rollback RPC 요청을 보내고 계속 진행하기 전에 미싲 브로커의 TX.RollbackOk 응답을 기다려야 한다.
RabbitMQ 와 원자 트랜잭션
불행하게도 RabbitMQ 는 발행한 모든 명령이 단일 큐에 영향을 줄 때만 원자 트랜잭션을 지원한다. 트랜잭션의 명령이 둘 이상의 큐에 영향을 주면 커밋은 원자적으로 동작하지 않는다.
메시지 발행 확인을 목적으로 트랜잭션의 사용을 고려한다면 좀 더 단순한 발행자 확인을 사용하는 것을 추천하는데, 발행자 확인이 더 빠르며 성공과 실패를 확인할 수 있다.
그러나 대부분의 경우 발행자 확인 뿐 아니라 메시지가 큐에 있는 동안 손실되지 않는 것이 중요한데, 이는 HA 큐로 보장할 수 있다.
4.1.6 HA 큐를 사용해 노드 장애 대응하기
미션 크리티컬 메시징 아키텍처에서는 가용성이 높은 HA 가 중요한 역할을 한다. HA 큐도 AMQP 스펙이 아닌 RabbitMQ 팀이 만든 확장 기능이며, 큐를 여러 서버에 중복해 복사본을 저장하는 기능을 제공한다.
HA 큐는 클러스터로 구성된 RabbitMQ 환경이 필요하며 AMQP API 또는 웹 기반 관리자 UI 로 설정할 수 있다. 이어서는 AMQP API 를 이용해 설정하는 방법을 알아보고, 웹 기반 관리자 UI 를 사용해 HA 큐에 대한 정책을 설정하는 방법은 8장에서 자세히 알아보자.
import rabbitpy
connection = rabbitpy.Connection()
try:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel,
'my-ha-queue',
arguments={'x-ha-policy': 'all'})
if queue.declare():
print('Queue declared')
except rabbitpy.exceptions.RemoteClosedChannelException as error:
print('Queue declare failed: %s' % error)
메시지가 HA 큐로 설정된 큐에 발행되면 HA 큐를 담당하는 클러스터의 각 서버로 메시지가 전송된다. 클러스터의 노드가 메시지를 소비하면 다른 노드의 모든 메시지 복사본이 즉시 제거된다.
개별 노드를 지정하려면 x-ha-policy: all 대신 nodes 를 인수로 전달하고 다음 인수인 x-ha-nodes 에 큐의 노드 목록을 지정한다.
4.1.6 Selective HA Queue Declaration
import rabbitpy
connection = rabbitpy.Connection()
try:
with connection.channel() as channel:
arguments = {'x-ha-policy': 'nodes',
'x-ha-nodes': ['rabbit@node1',
'rabbit@node2',
'rabbit@node3']}
queue = rabbitpy.Queue(channel,
'my-2nd-ha-queue',
arguments=arguments)
if queue.declare():
print('Queue declared')
except rabbitpy.exceptions.RemoteClosedChannelException as error:
print('Queue declare failed: %s' % error)
노트
node1, node2, node3 가 정의돼 있지 않더라도 큐를 정의할 수 있으며 메시지를 발행하는 경우 큐에 전달된다. 나열된 노드 중 하나 이상이 존재하는 경우 메시지는 해당 서버에 저장된다.
다운된 노드가 다시 추가되거나 새 노드가 클러스터에 추가되더라도 기존 노드의 큐에 이미 존재하는 메시지는 포함되지 않는다. 대신 이전에 발행한 모든 메시지가 소비되면 모든 새 메시지가 수신되고 동기화된다.
4.1.7 HA 큐 트랜잭션
HA 큐에서 트랜잭션 또는 발행자 확인을 사용하는 경우, 메시지가 HA 큐의 모든 활성 노드에 있는 것으로 확인될 때 까지 RabbitMQ 는 성공 응답을 보내지 않는다. 이로 인해 발행자 애플리케이션에 대한 응답이 지연될 수 있다.
4.1.8 delivery-mode 2를 사용해 메시지를 디스크에 저장하기
이어서 또 다른 배달 보장에 대해 알아보자. RabbitMQ 서버가 메시지를 소비하기 전에 특정 이유로 노드가 다운될 경우, RabbitMQ 에 메시지를 발행할 때 디스크에 저장하도록 설정하지 않는다면 메시지는 영원히 손실된다.
노트
delivery-mode 2 외에 RabbitMQ 서버를 다시 시작한 후에도 메시지가 남아있게 하려면 큐를 만들 때 durable 로 선언돼야 한다.
**OLTP (On-line Transaction Processing, 온라인 트랜잭션 처리)
일반적으로 대부분의 웹 애플리케이션에서 쓰기의 비율은 낮다.
대부분의 운영체제에서 커널은 디스크에서 읽은 페이지를 버퍼링하기 위해 RAM 의 여유분을 사용하지만, 디스크 쓰기를 캐시하는 유일한 컴포넌트는 디스크 컨트롤러와 디스크다. 이 때문에 메시지를 디스크에 저장할 때 하드웨어의 스펙을 올바르게 설정하는 것이 중요하다. 소형 서버에서는 쓰기 작업량이 과한 경우 RabbitMQ 의 동작이 매우 느려질 수 있다.
4.2 RabbitMQ 푸시백
AMQP 스펙에는 RabbitMQ 서버 구현에 유리하지 않은 발행자의 가정이 있다. RabbitMQ 버전 2.0 이전에는 발행자 애플리케이션이 너무 빨리 메시지를 발행해 RabbitMQ 를 압도하기 시작한 경우 발행자에게 Channel.Flow RPC 메소드를 보내 차단하고 다른 Channel.Flow 명령을 받을 때 까지 더 이상 메시지를 보내지 않도록 지시한다.
이는 Channel.Flow 명령을 처리하지 않거나 잘못 처리하는 발행자 애플리케이션의 경우, 메시지 발행을 늦추는데 상당히 비효율적인 방법으로 알려졌다. RabbitMQ 3.2 버전 이전에 RabbitMQ 팀은 Channel.Flow 의 사용을 중단했으며, 이를 TCP 배압 (Back-Pressure) 매커니즘으로 대체해 문제를 해결했다. 발행자에게 정중하게 요청하지 않고 RabbitMQ 는 TCP 소켓에서 하위 수준의 데이터 수신을 중지한다. 이는 단일 발행자에게 RabbitMQ 가 압도당하지 않도록 보호한다.
내부적으로 RabbitMQ 는 크레딧 (credit) 이라는 개념을 사용해 발행자에 대해 언제 푸시백을 할 것인지를 관리한다. 새로운 연결이 생성되면 이 연결에 미리 사용할 수 있는 크레딧의 양이 할당되고 RabbitMQ 가 각 RPC 명령을 수신하면 크레딧이 감소한다.
연결에 남은 크레딧이 없으면 크레딧이 생길 때까지 무시한다.
RabbitMQ 3.2 부터 RabbitMQ 팀은 AMQP 스펙을 확장해 연결에 대한 크레딧이 임곗값에 도달했을 때 전송되는 알림을 추가하고 클라이언트에 연결이 차단됐다는 사실을 알린다. Connection.Blocked 와 Connection.Unblocked 는 RabbitMQ 가 발행자 클라이언트를 차단하거나 해당 블록이 제거됐을 때 언제든지 클라이언트에 알릴 수 있는 비동기 메소드다. 대부분의 주요 클라이언트는 이 기능을 구현했다. 사용중인 클라이언트 라이브러리에도 애플리케이션의 연결 상태를 결정하는 방법이 구현됐는지 확인해야 한다. 이어지는 절에는 rabbitpy 로 이 검사를 수행하는 방법과 3.2 이전의 RabbitMQ 버전에서 관리 API 를 활용해 연결의 채널이 차단됐는지 확인하는 방법을 설명한다.
4.2.1 rabbitpy 로 연결 상태 확인하기
Connection.Blocked 알림을 지원하는 RabbitMQ 버전을 사용하거나 그렇지 않더라도 rabbitpy 라이브러리는 이 기능을 사용하기 쉬운 하나의 API 로 제공한다. Connection.Blocked 알림을 지원하는 RabbitMQ 버전에 연결되면 rabbitpy 는 알림을 수신하고 연결이 차단됐다는 내부 플래그를 설정한다.
import rabbitpy
connection = rabbitpy.Connection()
print('Channel is Blocked? %s' % connection.blocked)

4.2.2 연결 상태 확인을 위한 관리자 API 사용하기
RabbitMQ 3.2 이전 버전을 사용하는 경우 애플리케이션은 웹 기반 관리 API 를 사용해 연결 상태를 지속적으로 폴링 (polling) 해 확인할 수 있다.
채널의 상태를 질의하면 name, node, connection_details, consumer_count, client_flow_blocked 와 같은 여러 필드를 확인할 수 있다. client_flow_blocked 필드는 RabbitMQ 가 TCP 배압을 연결에 적용했는지를 나타낸다.
채널 상태를 확인하기 전에 먼저 해당 이름의 채널을 생성해야 한다. 채널의 이름은 연결 이름과 해당 채널 ID 를 기반으로 하는데, 연결 이름을 구성하려면 다음이 필요하다.
- 로컬호스트 IP 주소 및 출력 TCP 포트
- 원격호스트 IP 주소 및 TCP 포트
연결의 형식은 "LOCAL_ADDR : PORT -> REMOTE_ADDDR : PORT" 고, 이를 확장한 채널 이름의 형식은 "LOCAL_ADDR : PORT -> REMOTE_ADDDR : PORT (CHANNEL_ID)" 다.
RabbitMQ 의 관리자 API 에 채널 상태를 질의하는 API 는 http://host:port/api/channels/[CHANNEL_NAME] 이다.
API 결과는 JSON 직렬화된 객체로 반환된다.
4.3 요약
- 발행자가 메시지를 디스크에 저장해야 하는가?
- 애플리케이션의 다양한 구성 요소는 발행된 메시지가 수신됐는지 보장해야 하는가?
- TCP 배압으로 애플리케이션이 차단되거나 RabbitMQ 에 메시지를 발행하는 동안 연결이 차단된 경우 어떻게 되는가?
- 메시지가 얼마나 중요한가? 메시지 처리량을 높이기 위해 배달 보장을 희생할 수 있는가?
이러한 질문은 올바른 애플리케이션 아키텍처를 만드는 데 도움이 된다.
5. 메시지를 받지 않고 소비하기
이 장에서 다루는 내용
- 메시지 소비하기
- 소비자 메시지 처리량 조정
- 소비자와 큐가 배타적인 경우
- 사용자를 위한 서비스 품질 지정
애플리케이션이 메시지를 소비하는 방법을 결정하는 것은 이 균형을 찾는 첫 번째 단계로 '메시지를 단순히 가져올지' 아니면 '메시지를 소비해야 하는지' 라는 질문으로 시작한다.
- 단순히 메시지를 전달받지 않고 소비하는 것이 좋은 이유
- 메시지 배달 보장과 성능 간의 균형을 유지하는 방법
- RabbitMQ 의 큐별 설정을 사용해 큐를 자동을 삭제하고, 메시지의 수명을 제한하는 등의 작업
5.1 Basic.Get vs. Basic.Consume
RabbitMQ 는 큐에서 메시지를 가져오는 두 AMQP RPC 명령인 Basic.Get 과 Basic.Consume 을 구현했다. 이 장의 제목에서 알 수 있듯이 Basic.Get 은 서버에서 메시지를 검색하는 이상적인 방법은 아니다. 간단히 말해 Basic.Get 은 폴링 모델이지만 Baisc.Consume 은 푸시 기반 모델이다.
5.1.1 Basic.Get
소비자 애플리케이션이 Basic.Get 요청을 사용해 메시지를 검색한다면 큐에 메시지가 여러 개 있어도 메시지를 받을 때마다 새 요청을 보내야 한다. Basic.Get 을 발행할 때 메시지를 검색하는 큐에 대기 중인 메시지가 있으면 RabitMQ 는 Basic.GetOk RPC 로 응답한다. 없으면 Basic.GetEmpty 로 응답한다.
5.1.2 Basic.Consume
이를 흔히 발행자-구독자 패턴 또는 Pub-Sub 이라고 부른다.
소비자 태그
Basic.Consume 을 실행하면 RabbitMQ 로 열린 채널에서 애플리케이션을 식별하는 고유한 문자열이 생성된다. 소비자 태그 (consumer tag) 라고 불리는 이 문자열은 RabbitMQ 의 각 메시지와 함께 애플리케이션에 전송된다.
소비자 태그는 이후에 Basic.Cancel RPC 명령으로 RabbitMQ 에 메시지 수신을 취소할 때 사용할 수 있다.
애플리케이션이 다른 큐에서 받은 메시지에 대해 다른 작업을 수행해야 하는 경우, Basic.Consume 요청에 전달된 소비자 태그를 사용해 메시지 처리 방법을 구분할 수 있다. 그러나 대부분의 경우 소비자 태그는 클라이언트 라이브러리가 자동으로 처리하므로 크게 걱정할 필요는 없다.
consumer
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'test-messages')
for message in queue.consume_messages(no_ack=True):
message.pprint()
publisher
import rabbitpy
for iteration in range(10):
rabbitpy.publish('amqp://guest:guest@localhost:5672/%2f', '', 'test-messages', 'go')
rabbitpy.publish('amqp://guest:guest@localhost:5672/%2f', '', 'test-messages', 'stop')

반복문이 종료될 때 rabbitpy 라이브러리의 내부에는 몇 가지 동작이 실행된다. 먼저 라이브러리는 RabbitMQ 에 Basic.Cancel 명령을 전송한다. 이후 Basic.CancelOk RPC 응답이 수신되면 RabbitMQ 가 클라이언트에 처리되지 않은 메시지를 보낸 경우 rabbitpy 는 Basic.Nack 명령을 전송하고 RabbitMQ 메시지를 다시 큐에 삽입하도록 지시한다.
5.2 소비자 성능 조정
5.2.1 빠른 성능을 위해 무응답 모드로 메시지 소비하기
메시지를 소비할 때 애플리케이션은 자신을 RabbitMQ 에 등록하고 메시지를 사용할 수 있을 때 전달되도록 요청한다. 애플리케이션이 Basic.Consume RPC 요청을 보낼 때, 인자에는 no-ack 플래그가 있다. 이 플래그가 활성화되면 RabbitMQ 는 소비자가 메시지 수신확인을 하지 않으므로 RabbitMQ 가 가능한 한 빨리 메시지를 보낼 수 있다.
RabbitMQ 는 열려 있는 연결을 통해 메시지를 보낼 때 TCP 소켓 연결을 통해 클라이언트와 통신한다. 이 연결이 열려 있고 쓰기 가능한 경우 RabbitMQ 는 모든 것이 정상적으로 동작하며 메시지가 전달됐다고 가정한다.
리눅스에서 수신 소켓 버퍼 늘리기
리눅스 운영체제에서 수신 소켓 버퍼 수를 늘리려면 net.core.rmem_default 와 net.core.rmem_max 값을 기본 128KB 값에서 더 큰 값으로 늘려야 한다. 대부분의 환경에는 16MB (16777216) 값이 적합하다. 대부분의 배포판에서는 /etc/sysctl.conf 에서 이 값들을 변경하는데, 다음 명령을 실행해 수동으로 값을 설정할 수도 있다.
echo 16777216 > /proc/sys/net/core/rmem_default
echo 16777216 > /proc/sys/net/core/rmem_max
일회용 메시지의 경우, 가능한 한 가장 빠른 메시지 처리 속도를 만드는 가장 이상적인 방법이지만 중요한 위험 요소가 없는 것은 아니다. 소비자 애플리케이션이 운영체제의 수신 소켓 버퍼에 100KB 메시지를 버퍼링한 상태로 장애가 발생할 경우 어떻게 될지 생각해보면 알 수 있다. RabbitMQ 는 이미 메시지들을 보낸 것으로 간주하며 애플리케이션이 충돌하고 소켓이 닫힐 때 운영체제에서 읽어야 하는 메시지의 수를 표시하지 않는다. 애플리케이션이 직면하는 문제는 운영체제의 소켓 수신 버퍼 크기와 함께 메시지 크기 및 수량에 따라 달라진다.
메시지를 이런 방식으로 소비하는 것이 애플리케이션 이카텍처에 맞지 않지만, 단일 메시지 전달 후 메시지 수신이 제공하는 것보다 빠른 메시지 처리를 원한다면 소비자 채널 서비스 품질 설정의 프리페치 (prefetching) 를 제어하는 것이 좋다.
5.2.2 서비스 품질 설정을 통한 소비자 프리페치 제어
AMQP 스펙에서는 소비자가 메시지 수신을 확인하기 전에 미리 지정된 수의 메시지를 수신하도록 처리할 수 있는 서비스 품질 (QoS, Quality of Service) 설정을 채널에 요청 할 수 있다.
수신 확인을 비활성화 (no_ack=True) 한 소비자와 달리, 소비자 애프리케이션은 메시지를 확인하기 전에 충돌하는 경우 소켓을 닫으면 미리 가져온 모든 메시지가 큐로 반환된다.
프로토콜 수준에서 채널에 Basic.QoS RPC 요청을 보내면 서비스 품질이 지정된다.
이때 요청을 전송하는 채널에 대해서만 QoS 를 설정할지 혹은 연결된 모든 채널에 대해 QoS 설정할지 지정할 수 있다. Basic.QoS 요청은 언제든지 보낼 수 있지만, 보통 소비자가 Basic.Consume RPC 요청을 발행하기 전에 수행된다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
channel.prefetch_count(10)
for message in rabbitpy.Queue(channel, 'test-messages'):
message.pprint()
message.ack()

노트
AMQP 스펙에는 Basic.QoS 메소드의 프리페치 수와 프리페치 크기를 모두 설정하지만, no-ack 옵션을 설정하면 프리페치 크기가 무시된다.
프리페치 값을 최적의 수준으로 교정
프리페치 수를 지나치게 할당하면 메시지 처리량에 부정적인 영향을 미칠 수 있음을 인식하는 것이 중요하다.
RabbitMQ 에서 동일한 큐에 연결된 다수 소비자는 라운드 로빈 (Round-Robin) 방식으로 메시지를 수신하는데, 고속의 소비자 애플리케이션에서는 프리페치 수가 성능에 영향을 미치는지 벤치마크하는 것이 중요하다. 특정 설정은 메시지 구성, 소비자 애플리케이션의 행동 혹은 운영체제 및 언어와 같은 다른 요인에 따라 영향도가 달라질 수 있다.
2500의 프리페치 카운트 값이 최고 메시지 속도에 가장 적합한 설정임을 확인할 수 있다.
한 번에 여러 메시지 확인하기
QoS 설정 중 유용한 또 다른 점은 Basic.Ack RPC 응답과 함께 받은 각 메시지를 개별적으로 하나씩 확인하지 않아도 되는 점이다.
대신 Basic.Ack RPC 응답의 multiple 속성을 True 로 설정해 반환하면 RabbitMQ 는 수신 확인하지 않은 모든 메시지를 수신 확인으로 처리한다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
channel.prefetch_count(10)
unacknowledged = 0
for message in rabbitpy.Queue(channel, 'test-messages'):
message.pprint()
unacknowledged += 1
if unacknowledged == 10:
message.ack(all_previous=True)
unacknowledged = 0
동시에 여러 메시지를 수신 확인하면 메시지 처리에 필요한 네트워크 통신을 최소화해 메시지 처리량을 향상시킬 수 있다. 이 방식의 수신확인은 인정 수준의 위험이 따른다는 점을 주의해야 한다.
QoS 뿐만 아니라 트랜잭션도 애플리케이션에 대한 메시지 전달 보장을 향상시키는 방법이다.
초당 메시지 처리 수를 예상해 prefetch count 를 설정하자.
5.2.3 소비자 애플리케이션에서 트랜잭션 사용하기
메시지를 RabbitMQ 에 발행할 때와 마찬가지로 트랜잭션을 사용해 소비자 애플리케이션에서 일괄 작업을 커밋하고 롤백할 수 있다. 트랜잭션 (AMQP TX 클래스) 은 한 가지 예외적인 상황을 제외하고는 메시지 처리량에 부정적인 영향을 미칠 수 있다.
노트
메시지 수신 확인이 비활성화된 소비자 애플리케이션은 트랜잭션이 작동하지 않는다.
5.3 메시지 거부하기
메시지를 처리하는 중에 문제가 발생하면 어떻게 해야 할까? 이러한 경우 RabbitMQ 는 메시지를 메시지 브로커에 다시 전달하는 Basic.Reject, Basic.Nack 두 가지 매커니즘을 제공한다. 이번 절에서는 두 기능의 차이점과 거부된 메시지를 일괄 처리해서 시스템 문제를 파악하는데 용이한 RabbitMQ 전용 확장, 데드 레터 익스체인지에 대해 알아본다.
Basic.Nack 은 한 번에 여러 메시지를 거부할 수 있지만, Basic.Reject 는 한 번에 하나의 메시지만 거부할 수 있다.
5.3.1 Basic.Reject
전달받은 메시지를 처리할 수 없음을 메시지 브로커에 알리는 AMQP 의 RPC 응답이다.
Basic.Ack 와 마찬가지로 RabbitMQ 가 생성한 배달 태그 (delivery tag) 와 함께 소비자가 RabbitMQ 와 통신하는 채널의 메시지를 고유하게 식별한다. 소비자가 메시지를 거부하면 RabbitMQ 가 메시지를 삭제하거나 큐에 있는 메시지를 다시 삽입되도록 지시할 수 있다. 재삽입 (requeue) 플래그가 활성화되면 RabbitMQ 는 차후에 다시 처리되도록 큐에 메시지를 넣는다.
requeue 플래그는 데이터베이스나 원격 API 와 같은 다른 서비스와 통신하는 소비자 애플리케이션을 작성하는 데 종종 사용된다. Basic.Ack 는 소비자 애플리케이션에서 RabbitMQ 와 통신하고 있는 채널의 메시지를 고유하게 식별하기 위해 RabbitMQ 에서 만든 배달 태그를 전달한다. 소비자가 메시지를 거부하면 RabbitMQ 에서 메시지를 삭제하거나 큐에 메시지를 다시 삽입할 수 있다.
재삽입 플래그는 데이터베이스와의 연결이 끊어지거나 원격 API 에 연결하는 것이 실패하는 경우와 같은 원격지의 예외로 인해 실패한 경우, 재시도를 위해 소비자 애플리케이션에 로직을 구현하는 대신, 예외를 잡아서 재삽입 플래그를 활성화한 메시지를 거부해서 처리한다.
다음은 '두 번 실패 시 제거' 정책으로 구현돼 있다. 문제의 원인이 메시지 혹은 다른 이유인지가 불확실한 경우, redelivered 플래그를 검사해서 메시지를 다시 재삽입해야 하는지, 폐기해야 하는지를 결정할 수 있다.
import rabbitpy
for message in rabbitpy.consume('amqp://guest:guest@localhost:5672/%2f',
'test-messages'):
message.pprint()
print('Redelivered: %s' % message.redelivered)
message.reject(True)

Basic.Ack 와 마찬가지로 Basic.Reject 를 사용하면 no-ack 가 활성화되지 않은 상태로 전달된 후 메시지가 제거된다.
5.3.2 Basic.Nack
아쉽게도 AMQP 스펙에는 다중 메시지 거부 기능이 제공되지 않는다. RabbitMQ 팀은 AMQP 스펙의 단점을 보완하기 위해 Basic.Nack 이라는 새로운 RPC 응답 메소드를 구현했다.
Basic.Nack 은 '부정적인 수신 확인' 의 줄임말로, Basic.Reject 응답 메소드와 뜻이 유사해 혼동될 수 있다.
경고
다른 RabbitMQ 확장 스펙과 마찬가지로, Basic.Nack 은 QPID, ActiveMQ 와 같은 다른 AMQP 메시지 브로커에 존재하지 않을 수도 있다.
5.3.3 데드 레터 익스체인지
데드 레터 익스체인지 (DLX, Dead-Letter Exchange) 는 RabbitMQ 확장 스펙이며, 전달된 메시지를 거부할 수 있는 추가적인 기능이다. 이는 특정 메시지를 소비하는 데 발생한 문제의 원인을 찾는 데 유용하다.
예를 들어 한 소비자 애플리케이션에서 XML 기반 메시지를 가져와서 PDF 파일로 변환한다고 하자. 이 작업이 이전에는 잘 동작했지만, 현재는 계속 실패한다고 가정하자.
데드 레터 익스체인지가 없다면, 소비자 애플리케이션에 XML 문서를 나중에 접근할 수 있는 특정 디렉토리에 저장하는 코드를 작성해야 한다. 코드를 직접 작성하는 대신, 데드 레터 익스체인지를 이용해 또 다른 큐를 지정한 후 터미널에서 소비자 코드를 직접 실행해 메시지 발행자 애플리케이션이 문서를 생성할 때, 유니코드 문자가 처리되는 방식과 관련된 문제 등의 원인을 확인할 수 있다.
큐에 없는 메시지가 거부되면 RabbitMQ 는 메시지를 큐의 x-dead-letter-exchange 인수에 지정된 익스체인지로 라우팅한다.
노트
데드 레터 익스체인지는 4장에서 알아본 대체 일스체인지와는 다르다. 만료되거나 거절된 메시지가 데드 레터 익스체인지를 통해 전달되는 반면, 대체 인스체인지는 RabbitMQ 가 라우팅 할 수 없는 메시지가 전달된다.
큐를 선언할 때 데드 레터 익스체인지를 지정하는 것은 간단하다. Queue.Declare RPC 요청을 발행할 때 rabbitpy Queue 객체를 생성하고 익스체인지의 이름과 dead_letter_exchange 인수 혹은 x-dead-letter-exchange 인수를 함께 전달하면 된다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
rabbitpy.Exchange(channel, 'rejected-messages').declare()
queue = rabbitpy.Queue(channel, 'dlx-example',
dead_letter_exchange='rejected-messages')
queue.declare()

익스체인지를 지정하느 것 외에, 라우팅 키를 미리 지정된 값으로 대체할 수도 있다. 이렇게 하면 데드 레터 (dead-lettered) 메시지에 대해 동일한 익스체인지를 사용하지만 데드 레터 메시지를 서로 다른 큐에 전달할 수 있다. 미리 지정된 라우팅 키를 설정하려면 큐를 선언할 때 x-dead-letter-routing-key 라는 추가 인수를 지정해야 한다.
노트
AMQP 표준에 따라 RabbitMQ 의 모든 큐는 선언 후에 설정을 변경할 수 없다. 특정 큐의 데드 레터 익스체인지 설정을 변경하려면, 큐를 삭제하고 재선언해야 한다.
데드 레터 익스체인지가 애플리케이션 아키텍처에서 활용될 수 있는 시나리오는 많다. 잘못된 형식의 메시지를 안전한 저장소에 저장하는 것부터 거부된 신용카드 승인 처리와 같은 직접적인 기능까지 데드 레터 익스체인지는 매우 강력하지만, 큐를 선언할 때 간단한 옵션으로 지정할 수 있으므로 종종 유용성이 간과되는 경향이 있다.
5.4 큐 제어하기
일부 애플리케이션은 여러 소비자가 동일한 큐를 구독해야 할 수도 있고 다른 경우 단일 소비자만 있어야 할 수 있다. 채팅 애플리케이션은 큐의 메시지를 임시로 저장하고 사용자당 하나의 큐를 만들어야 하지만, 신용카드 처리 애플리케이션에서는 항상 존재하는 영구적인 큐를 만들어야 할 수 있다.
큐를 정의할 때, 큐의 동작을 결정하는 설정은 다음과 같이 다양하다.
- 자동 삭제 큐
- 큐 독점 설정
- 자동 메시지 만료
- 대기 메시지 수 제한
- 오래된 메시지 큐에서 제거
AMQP 스펙에 따라 큐 설정이 불변 (immutable) 이라는 점을 유의하자. 큐를 선언한 후에는 큐를 만들 때 지정한 설정을 변경할 수 없고, 큐의 설정을 변경하려면 큐를 삭제하고 다시 만들어야 한다.
큐를 생성할 때 사용할 수 있는 다양한 설정을 알아보기 위해 우선 임시 큐를 생성하는 방법을 알아보자.
5.4.1 임시 큐
자동삭제 큐
소비자와 연결을 맺고 메시지를 전달한 후 연결을 끊으면 큐는 제거된다. 자동삭제 큐를 생성하는 것은 쉬운 작업으로 Queue.Declare RPC 요청에서 auto_delete 플래그를 True 로 설정하면 된다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'ad-example', auto_delete=True)
queue.declare()

자동 삭제 큐를 다수 소비자가 구독할 수 있다는 점에 유의해야 한다. 큐는 구독자가 더 이상 없을 때만 자신을 삭제한다.
한 사용 사례로서 채팅 애플리케이션에서 각 큐를 사용자의 입력 버퍼로 사용하는 경우가 있다. 채팅 애플리케이션에서 사용자의 연결이 끊어지면 큐에 읽지 않은 메시지가 있더라도 자동으로 삭제하는 데 이용하기도 한다.
또 다른 예를 살펴보면, RPC 스타일 애플리케이션에서 소비자에게 RPC 요청을 보내고 응답을 RabbitMQ 가 전달할 경우에 애플리케이션이 종료되거나 연결이 끊어질 때 자신을 삭제하는 큐를 만들어서 RabbitMQ 가 애플리케이션을 종료할 때 자동으로 큐를 정리하도록 할 수 있다. 이 경우 RPC 응답 큐는 원래 RPC 요청을 발행한 애플리케이션에서만 사용할 수 있어야 한다.
큐 독점 설정
RabbitMQ 에서 큐의 독점 (exclusive) 설정을 사용하지 않는다면 다수 소비자가 큐를 구독할 수 있는데, 큐를 구독해서 메시지를 소비하는 소비자의 수에 대한 제한은 없다. 실제로 큐는 메시지를 수신하는 모든 구독자에게 라운드 로빈으로 메시지를 전달한다.
큐에 독점 기능을 활성화하면 소비자가 연결 해제된 후 큐가 자동으로 제거된다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'exclusive-example',
exclusive=True)
queue.declare()
독점 큐는 큐를 생성한 채널이 닫히면 자동으로 삭제되는데, 이는 자동 삭제를 설정한 큐를 소비자가 더 이상 구독하지 않으면 삭제되는 것과 유사하다. 자동 삭제 큐와는 달리 채널이 닫힐 때까지 독점 큐를 구독하는 사용자가 원하는 만큼 여러 번 소비하고 취소할 수 있다. 또한 자동 삭제 큐와 달리 Basic.Consume 요청이 발행됐는지에 관계없이 독점 큐를 자동 삭제가 발생한다는 점도 중요하다.
자동 만료 큐
자동 만료 큐는 시간에 민감한 작업에 대해 RPC 응답을 무기한으로 대기하지 않을 경우 유용하다. 만료 값이 설정된 RPC 응답 큐를 만든 후에 해당 큐가 만료되면 큐가 자동으로 삭제된다. 큐를 선언하고 큐의 존재를 폴링해서 대기 중인 메시지가 있거나 큐가 더 이상 존재하지 않는지 확인할 수 있다. x-expires 인수로 큐를 선언하면 된다.
import rabbitpy
import time
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'expiring-queue',
arguments={'x-expires': 1000})
queue.declare()
messages, consumers = queue.declare(passive=True)
time.sleep(2)
try:
messages, consumers = queue.declare(passive=True)
except rabbitpy.exceptions.AMQPNotFound:
print('The queue no longer exists')

자동 만료 큐와 관련해 다음과 같은 몇 가지 규칙이 있다.
- 큐는 소비자가 없으면 만료된다. 연결된 소비자가 있을 경우 큐는 Basic.Cancel 을 실행하거나 연결을 해제한 후에만 자동으로 제거된다.
- 큐는 TTL 지속 시간 동안 Basic.Get 요청이 없으면 만료된다. 하나의 Basic.Get 요청이 만료 값이 있는 큐로 전송되면 만료 설정이 초기화되고 큐는 자동으로 삭제되지 않는다.
- 다른 일반적인 큐와 마찬가지로 x-expires 설정은 다시 설정되거나 변경될 수 없다.
- RabbitMQ 는 큐가 만료될 때 즉시 제거되는 것을 보장하지 않는다.
5.4.2 영구적인 큐
내구성 큐
서버를 재시작한 후에도 계속 유지돼야 하는 큐를 선언하려면 내구성 (durability) 플래그를 True 로 설정해야 한다. 큐의 내구성은 종종 메시지 지속성 (persistence) 과 혼동된다. 이전 장에서 알아본 것 처럼 delivery-mode 속성을 2로 설정해 메시지를 발행하면 메시지가 디스크에 저장된다. 반대로 내구성 플래그는 Queue.Delete 가 호출되기 전까지 RabbitMQ 가 큐를 삭제하지 않도록 설정한다.
일반적으로 RPC 스타일 애플리케이션은 소비자와 주고받는 큐를 사용하지만, 내구성 큐는 여러 소비자를 같은 큐에 연결한 후 메시지 흐름이 동적으로 변경되지 않는 애플리케이션에 매우 편리하다.

큐에서 메시지의 자동 만료
미션 크리티컬한 애플리케이션이 아닌 경우, 메시지를 너무 오랫동안 소비하지 않을 때 자동으로 삭제하는 기능이 필요할 수 있다. 메시지 단위 TTL 설정을 통해 서버 측에서 메시지의 최대 수명에 대한 제약 조건을 걸 수 있다. 데드 레터 익스체인지와 TTL 값을 모두 설정한 큐는 만료 시에 메시지를 데드 레터로 간주한다.
메시지마다 적용되는 메시지의 expiration 속성과 달리 x-message-ttl 큐 설정은 큐에 있는 모든 메시지의 최대 수명을 적용한다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'expiring-msg-queue',
arguments={'x-message-ttl': 1000})
queue.declare()


큐에 있는 메시지별 TTL 은 메시지의 사용처에 따라 서로 다른 값으로 지정할 때 사용한다. 실시간 대시보드에서 메시지를 자동으로 만료시키는 큐를 작성해 큐에서 수신 대기중인 오래된 정보를 수신하는 것을 방지할 수도 있다.
제한된 수의 메시지 보관
RabbitMQ 3.1.0 부터 큐의 메시지 최대 크기를 설정할 수 있다. 큐에 x-max-length 인수를 설정한 후 대기 중인 메시지 수가 최대 크기에 도달하면 RabbitMQ 는 새로운 메시지가 추가될 때 가장 먼저 받은 메시지를 삭제한다. x-max-length 로 선언한 큐는 클라이언트가 채팅방의 n 개의 최신 메시지를 받는 데 사용할 수 있다.
메시지 최대 크기를 설정한 큐의 맨 앞에서 제거된 메시지는 데드 레터 익스체인지를 설정한 경우 해당 익스체인지로 이동한다.


5.4.3 임의의 큐 설정
RabbitMQ 팀은 큐와 관련된 AMQP 스펙을 확장하는 새로운 기능을 구현했는데, 각 기능들은 큐를 정의할 때 인수로 전달한다.
인수 이름 | 목적 |
x-dead-letter-exchange | 메시지가 재삽입되지 않고 거부될 때, 라우팅할 익스체인지 |
x-dead-letter-routing-key | 거부된 메시지를 라우팅하는 데 사용하는 라우팅 키 |
x-expires | 지정된 시간 (밀리초 단위) 후에 큐를 삭제 |
x-ha-policy | HA 큐를 만들 때, 노드 간에 HA 를 적용하는 정책 지점 |
x-ha-nodes | HA 큐를 분산할 노드 (4.1.6절 참고) |
x-max-length | 큐에서 지정하는 메시지 만료 시간 (밀리초 단위) |
x-message-ttl | 큐의 최대 메시지 수 |
x-max-priority | 최대 값이 255인 큐의 우선순위를 지정하는 데 사용 (RabbitMQ 버전 3.5.0 이상) |
5.5 요약
소비자 애플리케이션을 작성할 때는 다음 질문을 고려해 애플리케이션에 적합한 옵션을 찾는 것을 추천한다.
- 모든 메시지를 수신했는지 또는 폐기할 수 있는지 확인해야 하는가?
- 메시지를 받은 다음 일괄적으로 수신 확인하거나 거부해야 하는가?
- 그렇지 않다면, 개별 작업을 자동으로 일괄 처리하고 트랜잭션을 사용해 성능을 향상시킬 수 있는가?
- 소비자 애플리케이션에서 트랜잭션 커밋 및 롤백 기능이 정말로 필요한가?
- 소비자가 구독하는 큐의 메시지를 독점적으로 접근해야 하는가?
- 소비자 애플리케이션에 오류가 발생했을 때 어떻게 처리해야 하는가? 메시지를 버려야 하는가? 큐에 재삽입해야 하는가? 혹은 데드 레터 익스체인지로 보내야 하는가?
6장에서는 여러 가지 메시징 패턴 및 사용 사례를 사용해 실제로 활용하는 방법을 알아보자.
6. 익스체인지 라우팅을 통한 메시지 패턴
이 장에서 다루는 내용
- RabbitMQ 가 제공하는 기본 유형 익스체인지와 플러그인 익스체인지
- 애플리케이션 아키텍처에 적합한 익스체인지 유형 선택하기
- 익스체인지 간 라우팅을 통한 메시지의 다양한 라우팅 옵션
아마도 RabbitMQ 의 최대 강점은 발행자 애플리케이션이 제공한 라우팅 정보를 기반으로 메시지를 서로 다른 큐로 유연하게 라우팅할 수 있다는 점일 것이다.
간단한 애플리케이션에는 복잡한 라우팅 로직이 필요하지 않지만, 올바른 유형의 익스체인지를 선택하는 것은 전체 애플리케이션 아키텍처에 큰 영향을 줄 수 있다.
이 장에서는 네 가지 기본 유형의 익스체인지와 각 익스체인지에 적절한 아키텍처 유형을 알아본다.
- 다이렉트 (direct) 익스체인지
- 팬아웃 (fanout) 익스체인지
- 토픽 (topic) 익스체인지
- 헤더 (headers) 익스체인지
토픽 익스체인지는 라우팅 키의 와일드카드 매칭을 기반으로 메시지를 선택적으로 전달하며, 헤더 익스체인지는 메시지 라우팅에 메시지 자체를 사용하는 새로운 방식을 제공한다.
마지막으로 단일 큐를 공유하는 여러 소비자 애플리케이션에 소비자의 처리량을 올리기 위해 사용하는 플러그인, 컨시스턴트 해싱 (consistent hashing) 익스체인지에 대해 알아본다.
6.1 다이렉트 익스체인지를 사용한 간단한 메시지 라우팅
다이렉트 익스체인지는 특정 큐 또는 특정 큐 그룹에 메시지를 전달할 떄 유용하다. 메시지를 발행할 때 사용하는 라우팅 키와 동일한 키로 익스체인지에 바인딩된 모든 큐에 메시지가 전달된다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
exchange = rabbitpy.Exchange(channel, 'direct-example',
exchange_type='direct')
exchange.declare()
다이렉트 익스체인지는 매우 단순하며 RPC 메시징 패턴에서 응답 메시지의 라우팅에 적합하다.
6.1.1 애플리케이션 아키텍처 만들기
이미지를 익스체인지에 발행할 때 응답 큐 이름은 메시지 속성의 reply-to 필드에, 요청 ID 는 correlation-id 필드에 저장된다.
익스체인지 선언하기
RabbitMQ 에 연결되면 RPC 요청 메시지를 라우팅하는 익스체인지와 RPC 응답을 라우팅하는 익스체인지를 선언한다.
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
for exchange_name in ['rpc-replies', 'direct-rpc-requests']:
exchange = rabbitpy.Exchange(channel, exchange_name,
exchange_type='direct')
exchange.declare()
RPC 작업 처리를 위한 익스체인지를 선언한 후에는 이어서 RPC 작업자를 만든다.
6.1.2 RPC 작업자 생성하기
노트
큐를 생성할 때 이름을 생략하면 RabbitMQ 가 자동으로 큐의 이름을 생성한다.
RPC 요청 처리하기
rabbitpy.Queue.consume_messages 메소드를 사용하는데, 이는 파이썬 컨텍스트 매니저로도 사용된다. 파이썬 컨텍스트 매니저는 with 문에 의해 호출되는 언어 구조다. 종료할 때 실행되는 메소드인 __enter__ 와 __exit__ 를 정의해야 한다.
콘텐츠 매니저를 사용해 코드를 호출하면 rabbitpy 가 AMQP RPC 요청인 Basic.Consume 과 Basic.Cancel 을 처리하기 때문에 로직 코드에만 집중할 수 있다.
rabbitpy 는 timestamp 속성을 파이썬 datetime 객체로 자동 변환하므로 메시지를 발행한 후 초 단위로 계산하려면 값을 유닉스 UNIX 시간으로 다시 변환해야 한다.
결과 응답하기
headers 속성에 메시지를 처음 발행할 때의 timestamp 를 설정하는데, 발행자 애플리케이션에서 요청에서 응답까지의 총 시간을 측정해 모니터링하는 데 사용할 수 있다.
6.1.3 간단한 RPC 발행자 코드 작성하기
외부 라이브러리 설정하기
..,.
2부 데이터 센터 또는 클라우드에서 RabbitMQ 운영하기
3부 통합과 맞춤 설정
'DevOps > 스터디' 카테고리의 다른 글
따라하며 배우는 도커와 CI 환경 (1) | 2023.03.26 |
---|---|
시작하세요! 도커 / 쿠버네티스 (0) | 2023.02.05 |
시작하세요! 도커 / 쿠버네티스 legacy (0) | 2022.06.17 |
따라하며 배우는 도커와 CI환경 (0) | 2022.03.11 |