본문 바로가기

개발스터디/MSA 스터디 (22년)

[MSA 1.0] (3) RabbitMQ 메시징 큐 서비스 구현하기

 

 

먼저 여러분이 MQ에 대해서 생소하다면 일단 기존 Legacy Application은 어떻게 구현되있는지를 알아야한다.

 

여러분은 백엔드라고 하면 spring framework, django, jsp 등등을 생각하는 경향이 있다.

뭐 틀린말은 아니지만 이는 backend중에서는 앞단이다.

내부적으로 데이터를 처리하고 가공하는 backend중에서도 뒷단이 존재한다.

즉 backend에서도 front, back이 나뉘게 된다.

 

back의 front와 back의 back, 보통 앞은 RESTful, SOAP, GraphQL등의 API서버라고 불리게 되고 뒷단은 Analysis서버라고 불리게된다.

이 둘을 한 서버에서 돌리는건 위험하다(실무에서는 한 컴퓨터에서 한 서버를 돌리니까 정확히 말하면 한 컴퓨터에서 돌리는게 위험하다고 볼 수 있다.)

왜냐하면 사용자가 접속해서 트래픽을 발생시키는 API와 데이터를 계속해서 처리하고 있는 분석서버가 같이 돌아가면

사용자가 갑자기 트래픽이 폭주하는 상황에서 뜬금없이 분석서버가 사망,

반대로 갑자기 분석할 데이터양이 늘어나서 폭주해서 API서버가 뜬금없이 죽어서 분명 분석서버가 죽었는데 로그인까지 못하는 어처구니 없는 상황이 일어난다.

그래서 서버는 계속해서 분할하는게 가용성측면에서 이득을 본다. 그리고 이렇게 해야 확장성도 용이하게 된다.

이러한 구조를 MSA(마이크로 서비스 아키텍쳐)라고 부르게된다. 어찌됬던 서버들이 분할된다는 것이다.

 

문제는 이렇게 분할된 서버들을 어떻게 데이터를 주고받을 것이냐는 것이다.

 

사실 그냥 다이렉트로 주고받으면 안되냐?라고 생각할 수 있다.

하지만 그렇게 하지 않는 정말 실무적인 이유가 있다.

 

가령 API서버와 분석서버간의 통신을 예시로 들어보자. 분석서버가 데이터 분석을 끝내고 이를 API서버로 요청해서 데이터를 적재한다고 가정하자.

근데 분석서버의 데이터가 갑자기 폭주했다고 가정하자. 이 때 API서버가 폭주하는 데이터를 버텨내지 못하고... 죽어버렸다면?

서비스는 멈추게 된다. 

 

그래서 큐에 적재하고 큐에서 완충역활을 하는 녀석이 있어주면 좋다.

데이터가 폭주하는 최악에 상황에서 어짜피 죽는건 큐다. 그럼 API서버는 무사하므로 일반 웹은 계속 돌아간다.

그럼 분석은 추가적으로 안되겠지만 DB도 무사하고 인증서버도 무사하므로 그냥 평소처럼 웹을 사용할 수 있다.

이번시간에는 우리가 개발한 Django와 Flask의 통신마다 Message 큐를 입혀서 통신할수 있도록 

코드를 개발하도록 하겠다.

 

1. RabbitMQ as a Service 포탈 가입

 

아래 사이트에 접속하여 회원가입 후 1개월 무료 이용가능

https://www.cloudamqp.com/

 

 

 

CloudAMQP - RabbitMQ as a Service

Managed RabbitMQ servers hosted in the cloud. Decouple your applications with the speed of CloudAMQP, a highly available message queuing service.

www.cloudamqp.com

 

 

2. Create new instance

 

 

  • Plan: Little Lemur ( 1달 Free 버전 )
  • Datacenter: AP-northeast-1 (AWS- Tokyo 리젼)

 

 

3. 인스턴스 클릭 후 URL 확인

 

아래 AMQP details의 URL을 복사, 해당 url을 통해 각 프로젝트 마다 producer.py와 consumer.py를 생성하여

connection 함.

 

 

 

 

 

4. consumer, producer 파일 만들기

consumer.py는 각 프로젝트의 상단에 생성한다. (/consumer.py)

producer.py는 각 프로젝트의 내부에 생성한다. (products/producer.py), 

기본적으로 예제에서는 Flask는 consumer.py에서 Django로 부터 json 데이터를 받거나 호출 받아서 해당 데이터를 이용하여 CRUD 기능을 구현한다.

 

구현에 대한 내용은 github를 참고하고, 아래 내용은 기본적인 연결에 대한 가이드이다. 

구현한 상태에서 장고 가상 env backend 서버에 접속하여 

python consumer.py로 consumer을 실행시킨다면 RabbitMQ를 통한 메시징 큐 기능을 확인해볼수 있을것이다.

 

# 장고의 producer.py

import pika, json

params = pika.URLParameters('your_rabbitmq_url')

connection = pika.BlockingConnection(params)

channel = connection.channel()


def publish():
    channel.basic_publish(exchange='', routing_key='main', body='hello')

# 함수 구현을 producer.py 페이지에 해두고 후에, views.py에서 각 crud 메서드에서 호출한다.


# pika 라이브러리를 통해 Application과 RabbitMQ를 연결한다.
# Connection - Application과 RabbitMQ Broker사이의 TCP 연결
# Channel - connection내부에 정의된 가상의 연결. queue에서 데이터를 관리할 때 생기는 일종의 통로같은 개념
# products/views.py

... 중략


class ProductViewSet(viewsets.ViewSet):
    def list(self, request): # /api/products
        products = Product.objects.all()
        serializer = ProductSerializer(products, many=True)
        publish()
        return Response(serializer.data
        
        
        
        ... 중략

 

# 장고의 consumer.py
mport pika


params = pika.URLParameters('amqps://wjdihkuz:13LRz2uHVsbK80oPLnLmPfwwowPFbdBs@cougar.rmq.cloudamqp.com/wjdihkuz')
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue='admin')


def callback(channel, method, properties, body):
    print('Received in admin')
    print(body)


channel.basic_consume(queue='admin', on_message_callback=callback)

print('Started Consuming')

channel.start_consuming()

channel.close()