====== Data Stream 예제(Publish) ======
===== 개요 =====
Spring Cloud Stream 기반으로 처리되는 발행 예제 5종을 제공하며 발행된 메시지는 메시지 브로커로 전달된다.
=== 데이터 발행 유형 ===
^ 순번 ^ 유형 ^ 발행 ^ 기반 ^ Messsage Broker ^ 비고 ^
| 1 | IOT | 온습도 센서 | 라즈베리파이 | RabbitMQ | 센서정보를 GPIO를 통해 수신하여 Python 기반으로 송신 |
| 2 | Log | Logback Appender | Spring Boot | Apache Kafka | Logback 기반의 Appender 사용 |
| 3 | File | Line 기반 파일 | Spring Cloud Stream | Apache Kafka | |
| 4 | DB | H2 DB | Spring Cloud Stream | Apache Kafka | |
| 5 | Open API | 지하철 전동차 정보 | Spring Cloud Stream | Apache Kafka | |
===== 설명 =====
==== 1. IOT 데이타 발행 예제 ====
=== python3 기반 소스코드 ===
Adafruit_DHT 패키지및 AMQP통신을 위한 pika 라이브러리를 사용한다.
#!/usr/bin/python3
import Adafruit_DHT # 라이브러리 불러오기
import pika # AMQP 라이브러리
class Publisher:
def __init__(self):
self.__url = '192.168.100.50'
self.__port = 5672
self.__vhost = 'egov'
self.__cred = pika.PlainCredentials('guest', 'guest')
self.__exchange = 'sensor';
self.__queue = 'q_temp_humi';
return
def main(self):
sensor = Adafruit_DHT.DHT22 # sensor 객체 생성
pin = 4 # Data핀의 GPIO핀 넘버
humidity, temperature = Adafruit_DHT.read_retry(sensor, pin) # 센서 객체에서 센서 값(ㅇ노도, 습도) 읽기
result = ''
if humidity is not None and temperature is not None: #습도 및 온도 값이 모두 제대로 읽혔다면
result = '{{ "temp":{0:0.1f} , "humidity":{1:0.1f} }}'.format(temperature, humidity)
print('Temp={0:0.1f}*C Humidity={1:0.1f}%'.format(temperature, humidity)) # 온도, 습도 순으로 표시
else: # 에러가 생겼다면
result = '{"temp":-100.0 , "humidity": -1.0}'
print('Failed to get reading. Try again!') # 에러 표시
print(result) # 온도, 습도 순으로 표시
conn = pika.BlockingConnection(pika.ConnectionParameters(self.__url, self.__port, self.__vhost, self.__cred))
chan = conn.channel()
chan.basic_publish(
exchange = self.__exchange,
routing_key = self.__queue,
body = result
)
conn.close()
return
publisher = Publisher()
publisher.main()
=== 구동 ===
$ python3 send_rabbitmq_humi_temp.py
Temp=25.4*C Humidity=24.6%
{ "temp":25.4 , "humidity":24.6 }
==== 2. File 데이타 발행 예제 ====
=== 기능설명 ===
csv파일이나 라인기반 텍스트 파일에 데이타가 누적되는 즉시 메시지 브로커로 전달한다.\\
PAGE_COUNT : 한번에 처리할 라인 수\\
FILE_PATH : 처리할 라인기반 텍스트 파일명\\
=== 바인딩 설정 ===
spring:
cloud:
stream:
bindings:
lineFileProducer-out-0:
destination: line-topic
binder: kafka
function:
definition: lineFileProducer
=== 관련소스 ===
^ 유형 ^ 대상소스명 ^ 설명 ^
| Supplier Bean | /src/main/java/egovframework/webflux/stream/file/EgovStreamLineFile.java | 라인기반 파일 데이타 발행 |
==== 3. Log 데이타 발행 예제 ====
=== 기능설명 ===
Log 데이타 처리를 LogBack KafkaAppender를 통해 메시지 브로커로 간단하게 전달하는 방법을 소개한다.
=== 관련소스 ===
^ 유형 ^ 대상소스명 ^ 설명 ^
| Supplier | /src/main/resources/logback.xml | 로그에 대해 데이타 발행 |
==== 4. DB 데이타 발행 예제 ====
=== 기능설명 ===
CRUD에 의한 DB 처리 수행시 AOP에 의해 필요한 데이타를 전달한다.
=== 바인딩 설정 ===
spring:
cloud:
stream:
bindings:
historyDb:
destination: db-topic
binder: kafka
=== 관련소스 ===
^ 유형 ^ 대상소스명 ^ 설명 ^
| AOP| /src/main/java/egovframework/webflux/stream/aspect/AspectDbHistory.java | DB 처리시 AOP 수행 |
| ListDTO | /src/main/java/egovframework/webflux/stream/aspect/dto/SampleListDTO.java | SampleDTO List |
| DTO | /src/main/java/egovframework/webflux/stream/aspect/dto/SampleDTO.java | DB데이타 전달용 DTO |
| DTO | /src/main/java/egovframework/webflux/stream/aspect/dto/DefaultDTO.java | AOP관련 자료 DTO |
| Configuration | /src/main/java/egovframework/webflux/config/EgovR2dbcConfig.java | R2dbc 설정 |
| Configuration | /src/main/java/egovframework/webflux/config/EgovH2ConsoleConfig.java | H2 웹 콘솔 구동 |
==== 5. Open API 데이타 발행 예제 ====
=== 기능설명 ===
전동차 위치정보를 JSON데이타 기반의 Open API를 이용하여 수신하는데 Spring Cloud Open Feign을 활용한다.\\
JSON 및 XML로 정보를 수신할수 있으며 여기서는 JSON을 사용한다.
=== 바인딩 설정 ===
spring:
cloud:
stream:
bindings:
trainApiProducer-out-0:
destination: api-topic
binder: kafka
function:
definition: trainApiProducer
=== 관련소스 ===
^ 유형 ^ 대상소스명 ^ 설명 ^
| Configuration | /src/main/java/egovframework/webflux/stream/api/EgovConfigTrainApi.java | Supplier Bean 생성 |
| Configuration | /src/main/java/egovframework/webflux/stream/feign/EgovConfigOpenFeign.java | ConfigOpenFeign Decoder/Encoder 설정(JSON) |
| Configuration | /src/main/java/egovframework/webflux/stream/feign/EgovTrainFeignClient.java | OpenFeign Client 설정 |
| DTO | /src/main/java/egovframework/webflux/stream/feign/ErrorMessage.java | 에러 메세지 처리용 DTO |
| DTO | /src/main/java/egovframework/webflux/stream/feign/RealtimePosition.java | 전동차 위치정보 DTO |
| DTO | /src/main/java/egovframework/webflux/stream/feign/ResultTrainPosition.java | 전동차 위치정보 List DTO |
| DTO | /src/main/java/egovframework/webflux/stream/feign/XmlResultTrainPosition.java | 전동차 위치정보 DTO (XML) |
===== 소스코드 =====
===== 참고자료 =====
* [[egovframework:rte2:rex:clouddatastream|Cloud Data Stream 환경설정]]
* [[https://ko.wikipedia.org/wiki/GPIO|GPIO(general-purpose input/output)]]