====== Data Stream 예제(Subscribe) ======
===== 개요 =====
Spring Cloud Stream 기반으로 처리되는 구독(Subscribe) 예제 5종을 제공하며 메시지 브로커의 메시지를 전달 받아 다양한 방법으로 처리한다.
=== 데이터 구독 유형 ===
^ 순번 ^ 유형 ^ Messsage Broker ^ 기반 ^ 구독 데이터 ^ UI URL ^ 비고 ^
| 1 | IOT | RabbitMQ | Spring Cloud Stream | Websocket(Google Chart) 출력 | /sensor |
| 2 | Log | Apache Kafka | OpenSearch | OpenDashboard로 출력 | | OpenLogstash로 자료 처리 및 연동 |
| 3 | File | Apache Kafka | Spring Cloud Stream | Websocket이용하여 출력 | /file_line | |
| 4 | DB | Apache Kafka | Spring Cloud Stream | Websocket(Google Chart) 출력 | /db | |
| 5 | Open API | Apache Kafka | Spring Cloud Stream | MongoDB 저장 | | |
===== 설명 =====
==== 1. IOT 데이타 구독 예제 ====
=== 기능 설명 ===
메시지 브로커에서 온도 및 습도 정보를 구독하여 관련 데이타를 가져온 다음 Websocket을 이용하여 화면에 구글차트로 결과를 보여준다.
=== 관련 소스 ===
^ 유형 ^ 대상소스명 ^ 설명 ^
| Consumer Bean | /src/main/java/egovframework/webflux/stream/sensor/EgovConsumerSensorThermohygrometer.java | 온/습도 데이타 구독 |
| Controller | /src/main/java/egovframework/webflux/stream/controller/EgovSocketController.java | 온/습도 컨트롤러 클래스 |
| Thymeleaf | /src/main/resources/templates/thymeleaf/egovSensor.html | 온/습도 수신 데이타 화면 표시 |
| Websocket | /src/main/java/egovframework/webflux/websocket/SensorWebSocketHandler.java | 온/습도 데이타 웹소켓 설정 |
=== 바인딩 설정 ===
spring:
cloud:
stream:
bindings:
sensorThermo-in-0:
destination: sensor
binder: rabbit
function:
definition: sensorThermo
=== 결과 화면 ===
수집된 온습도 데이터에 대해 실시간으로 차트에 결과를 보여준다.\\
{{:egovframework:rte4:rex:consumer_sesor_chart.png?700|}}
==== 2. File 데이타 구독 예제 ====
메시지 브로커에서 라인기반 파일 정보를 구독하여 관련 데이타를 가져온 Websocket을 이용하여 화면에 결과를 보여준다.
=== 관련 소스 ===
^ 유형 ^ 대상소스명 ^ 설명 ^
| Consumer Bean | /src/main/java/egovframework/webflux/stream/sensor/EgovConsumerSensorThermohygrometer.java | 라인기반 파일 데이타 구독 |
| Controller | /src/main/java/egovframework/webflux/stream/controller/EgovSocketController.java | 라인기반 파일 컨트롤러 클래스 |
| Thymeleaf | /src/main/resources/templates/thymeleaf/egovConsumerFileLine.html | 라인기반 파일 구독 데이타 결과 표시 |
| Websocket | /src/main/java/egovframework/webflux/websocket/FileWebSocketHandler.java | 파일 데이타 웹소켓 설정 |
=== 바인딩 설정 ===
spring:
cloud:
stream:
bindings:
fileLine-in-0:
destination: line-topic
binder: kafka
function:
definition: fileLine
=== 결과 화면 ===
전달된 텍스트 데이타에 대해 실시간으로 화면에 결과를 보여준다.\\
{{:egovframework:rte4:rex:consumer_file_line_result.png?600|}}
==== 3. Log 데이타 구독 예제 ====
Elastic Search가 상용화 되면서 해당 OSS(Open Source Software)에서 분기한 OpenSearch를 이용하면 편리하게 데이터를 수신하여 시각화 할 수 있다.
이때 메시지 서버와 연계는 Logstash를 이용하여 처리한다.
=== Logstash ===
Logstash는 로그 형태의 데이타를 OpenSearch로 쉽게 전달할 수 있다.
== logstash.conf ==
* input > group_id : consumer 그룹 지정\\
* input > topics : egov-logs (Kafka 토픽 명)\\
* input > decorate_events => 이걸 설정해야 @metadata 같이 옵션 사용가능\\
* filter > grok 플러그인 : 메시지 패턴 적용\\
* output > index : YYYY.MM.dd로 일별 구분\\
input {
kafka {
bootstrap_servers => "192.168.100.50:9092"
group_id => "logstash"
topics => ["egov-logs"]
consumer_threads => 1
decorate_events => true
}
}
filter {
grok {
match => {
"message" => "\[%{TIMESTAMP_ISO8601:timestamp}] \[%{DATA:thread}] %{LOGLEVEL:logLevel} %{JAVACLASS:packageClass} - %{GREEDYDATA:msg}"
}
}
}
output {
stdout {}
opensearch {
hosts => ["https://192.168.100.50:9200"]
index => "egov-logs-%{+YYYY.MM.dd}"
user => "admin"
password => "admin"
ssl => true
ssl_certificate_verification => false
}
}
=== 결과 화면 ===
{{:egovframework:rte4:rex:consumer_opensearch_result.png?800|}}
==== 4. DB 데이타 구독 예제 ====
메시지 브로커에서 DB CRUD 정보를 구독하여 Websocket을 이용하여 화면에 결과를 보여준다.
=== 관련 소스 ===
^ 유형 ^ 대상소스명 ^ 설명 ^
| Consumer Bean | /src/main/java/egovframework/webflux/stream/db/EgovConsumerDb.java | DB 데이터 구독 |
| Controller | /src/main/java/egovframework/webflux/stream/controller/EgovSocketController.java | DB 데이터 컨트롤러 클래스 |
| Thymeleaf | /src/main/resources/templates/thymeleaf/egovConsumerDb.html | DB 데이터 구독 결과 표시 |
| Websocket | /src/main/java/egovframework/webflux/websocket/DbWebSocketHandler.java | DB 데이터 웹소켓 설정 |
=== 바인딩 설정 ===
spring:
cloud:
stream:
bindings:
historyDb-in-0:
destination: db-topic
binder: kafka
function:
definition: historyDb
=== 결과 화면 ===
CRUD에 의해 조회된 이력정보를 구독하여 실시간으로 차트에 출력한다.\\
{{:egovframework:rte4:rex:consumer_db_result.png?800|}}
==== 5. Open API 데이타 구독 예제 ====
메시지 브로커에서 Open API 정보를 구독하여 MongoDB에 저장한다.
=== 관련 소스 ===
^ 유형 ^ 대상소스명 ^ 설명 ^
| Consumer Bean | /src/main/java/egovframework/webflux/stream/api/EgovConsumerTrainApi.java | Open API 데이터 구독 |
| DTO | /src/main/java/egovframework/webflux/stream/api/dto/RealtimePositionDTO.java | Open API 데이터 DTO |
| Entity | /src/main/java/egovframework/webflux/stream/api/entity/RealtimePosition.java | Open API 데이터 Entity |
=== 바인딩 설정 ===
spring:
cloud:
stream:
bindings:
trainApi-in-0:
destination: api-topic
binder: kafka
function:
definition: trainApi
=== 결과 화면 ===
구독된 데이타는 즉시 MongoDB의 Collection에 저장된다.\\
MongoDB에서 제공하는 Compass client를 사용하면 손쉽게 Collection 확인이 가능하다.\\
{{:egovframework:rte4:rex:comsumer_store_mongo.png?800|}}
===== 소스코드 =====
===== 참고자료 =====
* [[egovframework:rte2:rex:clouddatastream|Cloud Data Stream 환경설정]]
* [[https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html|grok 문서]]
* [[https://github.com/elastic/logstash/blob/v1.4.0/patterns/grok-patterns|grok 패턴 정의]]
* [[https://grokconstructor.appspot.com/do/match#result|grok 패턴 테스트 URL]]
* [[https://www.mongodb.com/ko-kr/products/tools/compass|MongoDB Compass]]