Spring Cloud Stream 기반으로 처리되는 발행 예제 5종을 제공하며 발행된 메시지는 메시지 브로커로 전달된다.
순번 | 유형 | 발행 | 기반 | Messsage Broker | 비고 |
---|---|---|---|---|---|
1 | IOT | 온습도 센서 | 라즈베리파이 | RabbitMQ | 센서정보를 GPIO를 통해 수신하여 Python 기반으로 송신 |
2 | Log | Logback Appender | Spring Boot | Apache Kafka | Logback 기반의 Appender 사용 |
3 | File | Line 기반 파일 | Spring Cloud Stream | Apache Kafka | |
4 | DB | H2 DB | Spring Cloud Stream | Apache Kafka | |
5 | Open API | 지하철 전동차 정보 | Spring Cloud Stream | Apache Kafka |
Adafruit_DHT 패키지및 AMQP통신을 위한 pika 라이브러리를 사용한다.
#!/usr/bin/python3 import Adafruit_DHT # 라이브러리 불러오기 import pika # AMQP 라이브러리 class Publisher: def __init__(self): self.__url = '192.168.100.50' self.__port = 5672 self.__vhost = 'egov' self.__cred = pika.PlainCredentials('guest', 'guest') self.__exchange = 'sensor'; self.__queue = 'q_temp_humi'; return def main(self): sensor = Adafruit_DHT.DHT22 # sensor 객체 생성 pin = 4 # Data핀의 GPIO핀 넘버 humidity, temperature = Adafruit_DHT.read_retry(sensor, pin) # 센서 객체에서 센서 값(ㅇ노도, 습도) 읽기 result = '' if humidity is not None and temperature is not None: #습도 및 온도 값이 모두 제대로 읽혔다면 result = '{{ "temp":{0:0.1f} , "humidity":{1:0.1f} }}'.format(temperature, humidity) print('Temp={0:0.1f}*C Humidity={1:0.1f}%'.format(temperature, humidity)) # 온도, 습도 순으로 표시 else: # 에러가 생겼다면 result = '{"temp":-100.0 , "humidity": -1.0}' print('Failed to get reading. Try again!') # 에러 표시 print(result) # 온도, 습도 순으로 표시 conn = pika.BlockingConnection(pika.ConnectionParameters(self.__url, self.__port, self.__vhost, self.__cred)) chan = conn.channel() chan.basic_publish( exchange = self.__exchange, routing_key = self.__queue, body = result ) conn.close() return publisher = Publisher() publisher.main()
$ python3 send_rabbitmq_humi_temp.py Temp=25.4*C Humidity=24.6% { "temp":25.4 , "humidity":24.6 }
csv파일이나 라인기반 텍스트 파일에 데이타가 누적되는 즉시 메시지 브로커로 전달한다.
PAGE_COUNT : 한번에 처리할 라인 수
FILE_PATH : 처리할 라인기반 텍스트 파일명
spring: cloud: stream: bindings: lineFileProducer-out-0: destination: line-topic binder: kafka function: definition: lineFileProducer
유형 | 대상소스명 | 설명 |
---|---|---|
Supplier Bean | /src/main/java/egovframework/webflux/stream/file/EgovStreamLineFile.java | 라인기반 파일 데이타 발행 |
Log 데이타 처리를 LogBack KafkaAppender를 통해 메시지 브로커로 간단하게 전달하는 방법을 소개한다.
유형 | 대상소스명 | 설명 |
---|---|---|
Supplier | /src/main/resources/logback.xml | 로그에 대해 데이타 발행 |
CRUD에 의한 DB 처리 수행시 AOP에 의해 필요한 데이타를 전달한다.
spring: cloud: stream: bindings: historyDb: destination: db-topic binder: kafka
유형 | 대상소스명 | 설명 |
---|---|---|
AOP | /src/main/java/egovframework/webflux/stream/aspect/AspectDbHistory.java | DB 처리시 AOP 수행 |
ListDTO | /src/main/java/egovframework/webflux/stream/aspect/dto/SampleListDTO.java | SampleDTO List |
DTO | /src/main/java/egovframework/webflux/stream/aspect/dto/SampleDTO.java | DB데이타 전달용 DTO |
DTO | /src/main/java/egovframework/webflux/stream/aspect/dto/DefaultDTO.java | AOP관련 자료 DTO |
Configuration | /src/main/java/egovframework/webflux/config/EgovR2dbcConfig.java | R2dbc 설정 |
Configuration | /src/main/java/egovframework/webflux/config/EgovH2ConsoleConfig.java | H2 웹 콘솔 구동 |
전동차 위치정보를 JSON데이타 기반의 Open API를 이용하여 수신하는데 Spring Cloud Open Feign을 활용한다.
JSON 및 XML로 정보를 수신할수 있으며 여기서는 JSON을 사용한다.
spring: cloud: stream: bindings: trainApiProducer-out-0: destination: api-topic binder: kafka function: definition: trainApiProducer
유형 | 대상소스명 | 설명 |
---|---|---|
Configuration | /src/main/java/egovframework/webflux/stream/api/EgovConfigTrainApi.java | Supplier Bean 생성 |
Configuration | /src/main/java/egovframework/webflux/stream/feign/EgovConfigOpenFeign.java | ConfigOpenFeign Decoder/Encoder 설정(JSON) |
Configuration | /src/main/java/egovframework/webflux/stream/feign/EgovTrainFeignClient.java | OpenFeign Client 설정 |
DTO | /src/main/java/egovframework/webflux/stream/feign/ErrorMessage.java | 에러 메세지 처리용 DTO |
DTO | /src/main/java/egovframework/webflux/stream/feign/RealtimePosition.java | 전동차 위치정보 DTO |
DTO | /src/main/java/egovframework/webflux/stream/feign/ResultTrainPosition.java | 전동차 위치정보 List DTO |
DTO | /src/main/java/egovframework/webflux/stream/feign/XmlResultTrainPosition.java | 전동차 위치정보 DTO (XML) |