Data Stream 예제(Publish)

개요

Spring Cloud Stream 기반으로 처리되는 발행 예제 5종을 제공하며 발행된 메시지는 메시지 브로커로 전달된다.

데이터 발행 유형

순번 유형 발행 기반 Messsage Broker 비고
1 IOT 온습도 센서 라즈베리파이 RabbitMQ 센서정보를 GPIO를 통해 수신하여 Python 기반으로 송신
2 Log Logback Appender Spring Boot Apache Kafka Logback 기반의 Appender 사용
3 File Line 기반 파일 Spring Cloud Stream Apache Kafka
4 DB H2 DB Spring Cloud Stream Apache Kafka
5 Open API 지하철 전동차 정보 Spring Cloud Stream Apache Kafka

설명

1. IOT 데이타 발행 예제

python3 기반 소스코드

Adafruit_DHT 패키지및 AMQP통신을 위한 pika 라이브러리를 사용한다.

#!/usr/bin/python3 
 
import Adafruit_DHT      # 라이브러리 불러오기
import pika # AMQP 라이브러리
 
class Publisher:
    def __init__(self):
        self.__url = '192.168.100.50'
        self.__port = 5672
        self.__vhost = 'egov'
        self.__cred = pika.PlainCredentials('guest', 'guest')
        self.__exchange = 'sensor';
        self.__queue = 'q_temp_humi';
        return
 
    def main(self):
 
        sensor = Adafruit_DHT.DHT22     #  sensor 객체 생성
        pin = 4                        # Data핀의 GPIO핀 넘버
 
        humidity, temperature = Adafruit_DHT.read_retry(sensor, pin)   # 센서 객체에서 센서 값(ㅇ노도, 습도) 읽기
        result = ''
 
        if humidity is not None and temperature is not None:   #습도 및 온도 값이 모두 제대로 읽혔다면 
            result = '{{ "temp":{0:0.1f} , "humidity":{1:0.1f} }}'.format(temperature, humidity)
            print('Temp={0:0.1f}*C  Humidity={1:0.1f}%'.format(temperature, humidity))  # 온도, 습도 순으로 표시
        else:                                                  # 에러가 생겼다면 
            result = '{"temp":-100.0 , "humidity": -1.0}'
            print('Failed to get reading. Try again!')        #  에러 표시
 
        print(result)  # 온도, 습도 순으로 표시
 
        conn = pika.BlockingConnection(pika.ConnectionParameters(self.__url, self.__port, self.__vhost, self.__cred))
        chan = conn.channel()
        chan.basic_publish(
            exchange = self.__exchange,
            routing_key = self.__queue,
            body = result
        )
        conn.close()
        return
 
publisher = Publisher()
publisher.main()

구동

$ python3 send_rabbitmq_humi_temp.py
Temp=25.4*C  Humidity=24.6%
{ "temp":25.4 , "humidity":24.6 }

2. File 데이타 발행 예제

기능설명

csv파일이나 라인기반 텍스트 파일에 데이타가 누적되는 즉시 메시지 브로커로 전달한다.
PAGE_COUNT : 한번에 처리할 라인 수
FILE_PATH : 처리할 라인기반 텍스트 파일명

바인딩 설정

spring:
  cloud:
    stream:
      bindings:
        lineFileProducer-out-0:
          destination: line-topic
          binder: kafka
      function:
        definition: lineFileProducer

관련소스

유형 대상소스명 설명
Supplier Bean /src/main/java/egovframework/webflux/stream/file/EgovStreamLineFile.java 라인기반 파일 데이타 발행

3. Log 데이타 발행 예제

기능설명

Log 데이타 처리를 LogBack KafkaAppender를 통해 메시지 브로커로 간단하게 전달하는 방법을 소개한다.

관련소스

유형 대상소스명 설명
Supplier /src/main/resources/logback.xml 로그에 대해 데이타 발행

4. DB 데이타 발행 예제

기능설명

CRUD에 의한 DB 처리 수행시 AOP에 의해 필요한 데이타를 전달한다.

바인딩 설정

spring:
  cloud:
    stream:
      bindings:
        historyDb:
          destination: db-topic
          binder: kafka

관련소스

유형 대상소스명 설명
AOP /src/main/java/egovframework/webflux/stream/aspect/AspectDbHistory.java DB 처리시 AOP 수행
ListDTO /src/main/java/egovframework/webflux/stream/aspect/dto/SampleListDTO.java SampleDTO List
DTO /src/main/java/egovframework/webflux/stream/aspect/dto/SampleDTO.java DB데이타 전달용 DTO
DTO /src/main/java/egovframework/webflux/stream/aspect/dto/DefaultDTO.java AOP관련 자료 DTO
Configuration /src/main/java/egovframework/webflux/config/EgovR2dbcConfig.java R2dbc 설정
Configuration /src/main/java/egovframework/webflux/config/EgovH2ConsoleConfig.java H2 웹 콘솔 구동

5. Open API 데이타 발행 예제

기능설명

전동차 위치정보를 JSON데이타 기반의 Open API를 이용하여 수신하는데 Spring Cloud Open Feign을 활용한다.
JSON 및 XML로 정보를 수신할수 있으며 여기서는 JSON을 사용한다.

바인딩 설정

spring:
  cloud:
    stream:
      bindings:
        trainApiProducer-out-0:
          destination: api-topic
          binder: kafka
      function:
        definition: trainApiProducer

관련소스

유형 대상소스명 설명
Configuration /src/main/java/egovframework/webflux/stream/api/EgovConfigTrainApi.java Supplier Bean 생성
Configuration /src/main/java/egovframework/webflux/stream/feign/EgovConfigOpenFeign.java ConfigOpenFeign Decoder/Encoder 설정(JSON)
Configuration /src/main/java/egovframework/webflux/stream/feign/EgovTrainFeignClient.java OpenFeign Client 설정
DTO /src/main/java/egovframework/webflux/stream/feign/ErrorMessage.java 에러 메세지 처리용 DTO
DTO /src/main/java/egovframework/webflux/stream/feign/RealtimePosition.java 전동차 위치정보 DTO
DTO /src/main/java/egovframework/webflux/stream/feign/ResultTrainPosition.java 전동차 위치정보 List DTO
DTO /src/main/java/egovframework/webflux/stream/feign/XmlResultTrainPosition.java 전동차 위치정보 DTO (XML)

소스코드

참고자료

 
egovframework/rte4/rex/publish_data_stream.txt · 마지막 수정: 2024/02/06 07:19 (외부 편집기)
 
이 위키의 내용은 다음의 라이센스에 따릅니다 :CC Attribution-Noncommercial-Share Alike 3.0 Unported
전자정부 표준프레임워크 라이센스(바로가기)

전자정부 표준프레임워크 활용의 안정성 보장을 위해 위험성을 지속적으로 모니터링하고 있으나, 오픈소스의 특성상 문제가 발생할 수 있습니다.
전자정부 표준프레임워크는 Apache 2.0 라이선스를 따르고 있는 오픈소스 프로그램입니다. Apache 2.0 라이선스에 따라 표준프레임워크를 활용하여 발생된 업무중단, 컴퓨터 고장 또는 오동작으로 인한 손해 등에 대해서 책임이 없습니다.
Recent changes RSS feed CC Attribution-Noncommercial-Share Alike 3.0 Unported Donate Powered by PHP Valid XHTML 1.0 Valid CSS Driven by DokuWiki