Spring Cloud Stream 기반으로 처리되는 구독(Subscribe) 예제 5종을 제공하며 메시지 브로커의 메시지를 전달 받아 다양한 방법으로 처리한다.
순번 | 유형 | Messsage Broker | 기반 | 구독 데이터 | UI URL | 비고 |
---|---|---|---|---|---|---|
1 | IOT | RabbitMQ | Spring Cloud Stream | Websocket(Google Chart) 출력 | /sensor | |
2 | Log | Apache Kafka | OpenSearch | OpenDashboard로 출력 | OpenLogstash로 자료 처리 및 연동 | |
3 | File | Apache Kafka | Spring Cloud Stream | Websocket이용하여 출력 | /file_line | |
4 | DB | Apache Kafka | Spring Cloud Stream | Websocket(Google Chart) 출력 | /db | |
5 | Open API | Apache Kafka | Spring Cloud Stream | MongoDB 저장 |
메시지 브로커에서 온도 및 습도 정보를 구독하여 관련 데이타를 가져온 다음 Websocket을 이용하여 화면에 구글차트로 결과를 보여준다.
유형 | 대상소스명 | 설명 |
---|---|---|
Consumer Bean | /src/main/java/egovframework/webflux/stream/sensor/EgovConsumerSensorThermohygrometer.java | 온/습도 데이타 구독 |
Controller | /src/main/java/egovframework/webflux/stream/controller/EgovSocketController.java | 온/습도 컨트롤러 클래스 |
Thymeleaf | /src/main/resources/templates/thymeleaf/egovSensor.html | 온/습도 수신 데이타 화면 표시 |
Websocket | /src/main/java/egovframework/webflux/websocket/SensorWebSocketHandler.java | 온/습도 데이타 웹소켓 설정 |
spring: cloud: stream: bindings: sensorThermo-in-0: destination: sensor binder: rabbit function: definition: sensorThermo
메시지 브로커에서 라인기반 파일 정보를 구독하여 관련 데이타를 가져온 Websocket을 이용하여 화면에 결과를 보여준다.
유형 | 대상소스명 | 설명 |
---|---|---|
Consumer Bean | /src/main/java/egovframework/webflux/stream/sensor/EgovConsumerSensorThermohygrometer.java | 라인기반 파일 데이타 구독 |
Controller | /src/main/java/egovframework/webflux/stream/controller/EgovSocketController.java | 라인기반 파일 컨트롤러 클래스 |
Thymeleaf | /src/main/resources/templates/thymeleaf/egovConsumerFileLine.html | 라인기반 파일 구독 데이타 결과 표시 |
Websocket | /src/main/java/egovframework/webflux/websocket/FileWebSocketHandler.java | 파일 데이타 웹소켓 설정 |
spring: cloud: stream: bindings: fileLine-in-0: destination: line-topic binder: kafka function: definition: fileLine
Elastic Search가 상용화 되면서 해당 OSS(Open Source Software)에서 분기한 OpenSearch를 이용하면 편리하게 데이터를 수신하여 시각화 할 수 있다. 이때 메시지 서버와 연계는 Logstash를 이용하여 처리한다.
Logstash는 로그 형태의 데이타를 OpenSearch로 쉽게 전달할 수 있다.
* input > group_id : consumer 그룹 지정
* input > topics : egov-logs (Kafka 토픽 명)
* input > decorate_events ⇒ 이걸 설정해야 @metadata 같이 옵션 사용가능
* filter > grok 플러그인 : 메시지 패턴 적용
* output > index : YYYY.MM.dd로 일별 구분
input { kafka { bootstrap_servers => "192.168.100.50:9092" group_id => "logstash" topics => ["egov-logs"] consumer_threads => 1 decorate_events => true } } filter { grok { match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}] \[%{DATA:thread}] %{LOGLEVEL:logLevel} %{JAVACLASS:packageClass} - %{GREEDYDATA:msg}" } } } output { stdout {} opensearch { hosts => ["https://192.168.100.50:9200"] index => "egov-logs-%{+YYYY.MM.dd}" user => "admin" password => "admin" ssl => true ssl_certificate_verification => false } }
메시지 브로커에서 DB CRUD 정보를 구독하여 Websocket을 이용하여 화면에 결과를 보여준다.
유형 | 대상소스명 | 설명 |
---|---|---|
Consumer Bean | /src/main/java/egovframework/webflux/stream/db/EgovConsumerDb.java | DB 데이터 구독 |
Controller | /src/main/java/egovframework/webflux/stream/controller/EgovSocketController.java | DB 데이터 컨트롤러 클래스 |
Thymeleaf | /src/main/resources/templates/thymeleaf/egovConsumerDb.html | DB 데이터 구독 결과 표시 |
Websocket | /src/main/java/egovframework/webflux/websocket/DbWebSocketHandler.java | DB 데이터 웹소켓 설정 |
spring: cloud: stream: bindings: historyDb-in-0: destination: db-topic binder: kafka function: definition: historyDb
메시지 브로커에서 Open API 정보를 구독하여 MongoDB에 저장한다.
유형 | 대상소스명 | 설명 |
---|---|---|
Consumer Bean | /src/main/java/egovframework/webflux/stream/api/EgovConsumerTrainApi.java | Open API 데이터 구독 |
DTO | /src/main/java/egovframework/webflux/stream/api/dto/RealtimePositionDTO.java | Open API 데이터 DTO |
Entity | /src/main/java/egovframework/webflux/stream/api/entity/RealtimePosition.java | Open API 데이터 Entity |
spring: cloud: stream: bindings: trainApi-in-0: destination: api-topic binder: kafka function: definition: trainApi