목차

Data Stream 예제(Publish)

개요

Spring Cloud Stream 기반으로 처리되는 발행 예제 5종을 제공하며 발행된 메시지는 메시지 브로커로 전달된다.

데이터 발행 유형

순번 유형 발행 기반 Messsage Broker 비고
1 IOT 온습도 센서 라즈베리파이 RabbitMQ 센서정보를 GPIO를 통해 수신하여 Python 기반으로 송신
2 Log Logback Appender Spring Boot Apache Kafka Logback 기반의 Appender 사용
3 File Line 기반 파일 Spring Cloud Stream Apache Kafka
4 DB H2 DB Spring Cloud Stream Apache Kafka
5 Open API 지하철 전동차 정보 Spring Cloud Stream Apache Kafka

설명

1. IOT 데이타 발행 예제

python3 기반 소스코드

Adafruit_DHT 패키지및 AMQP통신을 위한 pika 라이브러리를 사용한다.

#!/usr/bin/python3 
 
import Adafruit_DHT      # 라이브러리 불러오기
import pika # AMQP 라이브러리
 
class Publisher:
    def __init__(self):
        self.__url = '192.168.100.50'
        self.__port = 5672
        self.__vhost = 'egov'
        self.__cred = pika.PlainCredentials('guest', 'guest')
        self.__exchange = 'sensor';
        self.__queue = 'q_temp_humi';
        return
 
    def main(self):
 
        sensor = Adafruit_DHT.DHT22     #  sensor 객체 생성
        pin = 4                        # Data핀의 GPIO핀 넘버
 
        humidity, temperature = Adafruit_DHT.read_retry(sensor, pin)   # 센서 객체에서 센서 값(ㅇ노도, 습도) 읽기
        result = ''
 
        if humidity is not None and temperature is not None:   #습도 및 온도 값이 모두 제대로 읽혔다면 
            result = '{{ "temp":{0:0.1f} , "humidity":{1:0.1f} }}'.format(temperature, humidity)
            print('Temp={0:0.1f}*C  Humidity={1:0.1f}%'.format(temperature, humidity))  # 온도, 습도 순으로 표시
        else:                                                  # 에러가 생겼다면 
            result = '{"temp":-100.0 , "humidity": -1.0}'
            print('Failed to get reading. Try again!')        #  에러 표시
 
        print(result)  # 온도, 습도 순으로 표시
 
        conn = pika.BlockingConnection(pika.ConnectionParameters(self.__url, self.__port, self.__vhost, self.__cred))
        chan = conn.channel()
        chan.basic_publish(
            exchange = self.__exchange,
            routing_key = self.__queue,
            body = result
        )
        conn.close()
        return
 
publisher = Publisher()
publisher.main()

구동

$ python3 send_rabbitmq_humi_temp.py
Temp=25.4*C  Humidity=24.6%
{ "temp":25.4 , "humidity":24.6 }

2. File 데이타 발행 예제

기능설명

csv파일이나 라인기반 텍스트 파일에 데이타가 누적되는 즉시 메시지 브로커로 전달한다.
PAGE_COUNT : 한번에 처리할 라인 수
FILE_PATH : 처리할 라인기반 텍스트 파일명

바인딩 설정

spring:
  cloud:
    stream:
      bindings:
        lineFileProducer-out-0:
          destination: line-topic
          binder: kafka
      function:
        definition: lineFileProducer

관련소스

유형 대상소스명 설명
Supplier Bean /src/main/java/egovframework/webflux/stream/file/EgovStreamLineFile.java 라인기반 파일 데이타 발행

3. Log 데이타 발행 예제

기능설명

Log 데이타 처리를 LogBack KafkaAppender를 통해 메시지 브로커로 간단하게 전달하는 방법을 소개한다.

관련소스

유형 대상소스명 설명
Supplier /src/main/resources/logback.xml 로그에 대해 데이타 발행

4. DB 데이타 발행 예제

기능설명

CRUD에 의한 DB 처리 수행시 AOP에 의해 필요한 데이타를 전달한다.

바인딩 설정

spring:
  cloud:
    stream:
      bindings:
        historyDb:
          destination: db-topic
          binder: kafka

관련소스

유형 대상소스명 설명
AOP /src/main/java/egovframework/webflux/stream/aspect/AspectDbHistory.java DB 처리시 AOP 수행
ListDTO /src/main/java/egovframework/webflux/stream/aspect/dto/SampleListDTO.java SampleDTO List
DTO /src/main/java/egovframework/webflux/stream/aspect/dto/SampleDTO.java DB데이타 전달용 DTO
DTO /src/main/java/egovframework/webflux/stream/aspect/dto/DefaultDTO.java AOP관련 자료 DTO
Configuration /src/main/java/egovframework/webflux/config/EgovR2dbcConfig.java R2dbc 설정
Configuration /src/main/java/egovframework/webflux/config/EgovH2ConsoleConfig.java H2 웹 콘솔 구동

5. Open API 데이타 발행 예제

기능설명

전동차 위치정보를 JSON데이타 기반의 Open API를 이용하여 수신하는데 Spring Cloud Open Feign을 활용한다.
JSON 및 XML로 정보를 수신할수 있으며 여기서는 JSON을 사용한다.

바인딩 설정

spring:
  cloud:
    stream:
      bindings:
        trainApiProducer-out-0:
          destination: api-topic
          binder: kafka
      function:
        definition: trainApiProducer

관련소스

유형 대상소스명 설명
Configuration /src/main/java/egovframework/webflux/stream/api/EgovConfigTrainApi.java Supplier Bean 생성
Configuration /src/main/java/egovframework/webflux/stream/feign/EgovConfigOpenFeign.java ConfigOpenFeign Decoder/Encoder 설정(JSON)
Configuration /src/main/java/egovframework/webflux/stream/feign/EgovTrainFeignClient.java OpenFeign Client 설정
DTO /src/main/java/egovframework/webflux/stream/feign/ErrorMessage.java 에러 메세지 처리용 DTO
DTO /src/main/java/egovframework/webflux/stream/feign/RealtimePosition.java 전동차 위치정보 DTO
DTO /src/main/java/egovframework/webflux/stream/feign/ResultTrainPosition.java 전동차 위치정보 List DTO
DTO /src/main/java/egovframework/webflux/stream/feign/XmlResultTrainPosition.java 전동차 위치정보 DTO (XML)

소스코드

참고자료