====== Data Stream 예제(Publish) ====== ===== 개요 ===== Spring Cloud Stream 기반으로 처리되는 발행 예제 5종을 제공하며 발행된 메시지는 메시지 브로커로 전달된다. === 데이터 발행 유형 === ^ 순번 ^ 유형 ^ 발행 ^ 기반 ^ Messsage Broker ^ 비고 ^ | 1 | IOT | 온습도 센서 | 라즈베리파이 | RabbitMQ | 센서정보를 GPIO를 통해 수신하여 Python 기반으로 송신 | | 2 | Log | Logback Appender | Spring Boot | Apache Kafka | Logback 기반의 Appender 사용 | | 3 | File | Line 기반 파일 | Spring Cloud Stream | Apache Kafka | | | 4 | DB | H2 DB | Spring Cloud Stream | Apache Kafka | | | 5 | Open API | 지하철 전동차 정보 | Spring Cloud Stream | Apache Kafka | | ===== 설명 ===== ==== 1. IOT 데이타 발행 예제 ==== === python3 기반 소스코드 === Adafruit_DHT 패키지및 AMQP통신을 위한 pika 라이브러리를 사용한다. #!/usr/bin/python3 import Adafruit_DHT # 라이브러리 불러오기 import pika # AMQP 라이브러리 class Publisher: def __init__(self): self.__url = '192.168.100.50' self.__port = 5672 self.__vhost = 'egov' self.__cred = pika.PlainCredentials('guest', 'guest') self.__exchange = 'sensor'; self.__queue = 'q_temp_humi'; return def main(self): sensor = Adafruit_DHT.DHT22 # sensor 객체 생성 pin = 4 # Data핀의 GPIO핀 넘버 humidity, temperature = Adafruit_DHT.read_retry(sensor, pin) # 센서 객체에서 센서 값(ㅇ노도, 습도) 읽기 result = '' if humidity is not None and temperature is not None: #습도 및 온도 값이 모두 제대로 읽혔다면 result = '{{ "temp":{0:0.1f} , "humidity":{1:0.1f} }}'.format(temperature, humidity) print('Temp={0:0.1f}*C Humidity={1:0.1f}%'.format(temperature, humidity)) # 온도, 습도 순으로 표시 else: # 에러가 생겼다면 result = '{"temp":-100.0 , "humidity": -1.0}' print('Failed to get reading. Try again!') # 에러 표시 print(result) # 온도, 습도 순으로 표시 conn = pika.BlockingConnection(pika.ConnectionParameters(self.__url, self.__port, self.__vhost, self.__cred)) chan = conn.channel() chan.basic_publish( exchange = self.__exchange, routing_key = self.__queue, body = result ) conn.close() return publisher = Publisher() publisher.main() === 구동 === $ python3 send_rabbitmq_humi_temp.py Temp=25.4*C Humidity=24.6% { "temp":25.4 , "humidity":24.6 } ==== 2. File 데이타 발행 예제 ==== === 기능설명 === csv파일이나 라인기반 텍스트 파일에 데이타가 누적되는 즉시 메시지 브로커로 전달한다.\\ PAGE_COUNT : 한번에 처리할 라인 수\\ FILE_PATH : 처리할 라인기반 텍스트 파일명\\ === 바인딩 설정 === spring: cloud: stream: bindings: lineFileProducer-out-0: destination: line-topic binder: kafka function: definition: lineFileProducer === 관련소스 === ^ 유형 ^ 대상소스명 ^ 설명 ^ | Supplier Bean | /src/main/java/egovframework/webflux/stream/file/EgovStreamLineFile.java | 라인기반 파일 데이타 발행 | ==== 3. Log 데이타 발행 예제 ==== === 기능설명 === Log 데이타 처리를 LogBack KafkaAppender를 통해 메시지 브로커로 간단하게 전달하는 방법을 소개한다. === 관련소스 === ^ 유형 ^ 대상소스명 ^ 설명 ^ | Supplier | /src/main/resources/logback.xml | 로그에 대해 데이타 발행 | ==== 4. DB 데이타 발행 예제 ==== === 기능설명 === CRUD에 의한 DB 처리 수행시 AOP에 의해 필요한 데이타를 전달한다. === 바인딩 설정 === spring: cloud: stream: bindings: historyDb: destination: db-topic binder: kafka === 관련소스 === ^ 유형 ^ 대상소스명 ^ 설명 ^ | AOP| /src/main/java/egovframework/webflux/stream/aspect/AspectDbHistory.java | DB 처리시 AOP 수행 | | ListDTO | /src/main/java/egovframework/webflux/stream/aspect/dto/SampleListDTO.java | SampleDTO List | | DTO | /src/main/java/egovframework/webflux/stream/aspect/dto/SampleDTO.java | DB데이타 전달용 DTO | | DTO | /src/main/java/egovframework/webflux/stream/aspect/dto/DefaultDTO.java | AOP관련 자료 DTO | | Configuration | /src/main/java/egovframework/webflux/config/EgovR2dbcConfig.java | R2dbc 설정 | | Configuration | /src/main/java/egovframework/webflux/config/EgovH2ConsoleConfig.java | H2 웹 콘솔 구동 | ==== 5. Open API 데이타 발행 예제 ==== === 기능설명 === 전동차 위치정보를 JSON데이타 기반의 Open API를 이용하여 수신하는데 Spring Cloud Open Feign을 활용한다.\\ JSON 및 XML로 정보를 수신할수 있으며 여기서는 JSON을 사용한다. === 바인딩 설정 === spring: cloud: stream: bindings: trainApiProducer-out-0: destination: api-topic binder: kafka function: definition: trainApiProducer === 관련소스 === ^ 유형 ^ 대상소스명 ^ 설명 ^ | Configuration | /src/main/java/egovframework/webflux/stream/api/EgovConfigTrainApi.java | Supplier Bean 생성 | | Configuration | /src/main/java/egovframework/webflux/stream/feign/EgovConfigOpenFeign.java | ConfigOpenFeign Decoder/Encoder 설정(JSON) | | Configuration | /src/main/java/egovframework/webflux/stream/feign/EgovTrainFeignClient.java | OpenFeign Client 설정 | | DTO | /src/main/java/egovframework/webflux/stream/feign/ErrorMessage.java | 에러 메세지 처리용 DTO | | DTO | /src/main/java/egovframework/webflux/stream/feign/RealtimePosition.java | 전동차 위치정보 DTO | | DTO | /src/main/java/egovframework/webflux/stream/feign/ResultTrainPosition.java | 전동차 위치정보 List DTO | | DTO | /src/main/java/egovframework/webflux/stream/feign/XmlResultTrainPosition.java | 전동차 위치정보 DTO (XML) | ===== 소스코드 ===== ===== 참고자료 ===== * [[egovframework:rte2:rex:clouddatastream|Cloud Data Stream 환경설정]] * [[https://ko.wikipedia.org/wiki/GPIO|GPIO(general-purpose input/output)]]