Data Stream 예제(Subscribe)

개요

Spring Cloud Stream 기반으로 처리되는 구독(Subscribe) 예제 5종을 제공하며 메시지 브로커의 메시지를 전달 받아 다양한 방법으로 처리한다.

데이터 구독 유형

순번 유형 Messsage Broker 기반 구독 데이터 비고
1 IOT RabbitMQ Spring Cloud Stream Websocket(Google Chart) 출력
2 Log Apache Kafka OpenSearch OpenDashboard로 출력 OpenLogstash로 자료 처리 및 연동
3 File Apache Kafka Spring Cloud Stream Websocket이용하여 출력
4 DB Apache Kafka Spring Cloud Stream Websocket(Google Chart) 출력
5 Open API Apache Kafka Spring Cloud Stream MongoDB 저장

설명

1. IOT 데이타 구독 예제

기능 설명

메시지 브로커에서 온도 및 습도 정보를 구독하여 관련 데이타를 가져온 다음 Websocket을 이용하여 화면에 구글차트로 결과를 보여준다.

관련 소스

유형 대상소스명 설명
Consumer Bean /src/main/java/egovframework/webflux/stream/sensor/EgovConsumerSensorThermohygrometer.java 온/습도 데이타 구독
Controller /src/main/java/egovframework/webflux/stream/controller/EgovSocketController.java 온/습도 컨트롤러 클래스
Thymeleaf /src/main/resources/templates/thymeleaf/egovSensor.html 온/습도 수신 데이타 화면 표시
Websocket /src/main/java/egovframework/webflux/websocket/SensorWebSocketHandler.java 온/습도 데이타 웹소켓 설정

바인딩 설정

spring:
  cloud:
    stream:
      bindings:
        sensorThermo-in-0:
          destination: sensor
          binder: rabbit
      function:
        definition: sensorThermo

결과 화면

수집된 온습도 데이터에 대해 실시간으로 차트에 결과를 보여준다.

2. File 데이타 구독 예제

메시지 브로커에서 라인기반 파일 정보를 구독하여 관련 데이타를 가져온 Websocket을 이용하여 화면에 결과를 보여준다.

관련 소스

유형 대상소스명 설명
Consumer Bean /src/main/java/egovframework/webflux/stream/sensor/EgovConsumerSensorThermohygrometer.java 라인기반 파일 데이타 구독
Controller /src/main/java/egovframework/webflux/stream/controller/EgovSocketController.java 라인기반 파일 컨트롤러 클래스
Thymeleaf /src/main/resources/templates/thymeleaf/egovConsumerFileLine.html 라인기반 파일 구독 데이타 결과 표시
Websocket /src/main/java/egovframework/webflux/websocket/FileWebSocketHandler.java 파일 데이타 웹소켓 설정

바인딩 설정

spring:
  cloud:
    stream:
      bindings:
        fileLine-in-0:
          destination: line-topic
          binder: kafka
      function:
        definition: fileLine

결과 화면

전달된 텍스트 데이타에 대해 실시간으로 화면에 결과를 보여준다.

3. Log 데이타 구독 예제

Elastic Search가 상용화 되면서 해당 OSS(Open Source Software)에서 분기한 OpenSearch를 이용하면 편리하게 데이터를 수신하여 시각화 할 수 있다. 이때 메시지 서버와 연계는 Logstash를 이용하여 처리한다.

Logstash

Logstash는 로그 형태의 데이타를 OpenSearch로 쉽게 전달할 수 있다.

logstash.conf

* input > group_id : consumer 그룹 지정
* input > topics : egov-logs (Kafka 토픽 명)
* input > decorate_events ⇒ 이걸 설정해야 @metadata 같이 옵션 사용가능
* filter > grok 플러그인 : 메시지 패턴 적용
* output > index : YYYY.MM.dd로 일별 구분

input {
  kafka {
    bootstrap_servers => "192.168.100.50:9092"
    group_id => "logstash"
    topics => ["egov-logs"]
    consumer_threads => 1
    decorate_events => true
  }
 
}
 
filter {
  grok {
    match => {
	  "message" => "\[%{TIMESTAMP_ISO8601:timestamp}] \[%{DATA:thread}] %{LOGLEVEL:logLevel} %{JAVACLASS:packageClass} - %{GREEDYDATA:msg}"
    }
  }
}
 
output {
 
  stdout {}
 
  opensearch {
    hosts => ["https://192.168.100.50:9200"]
    index => "egov-logs-%{+YYYY.MM.dd}"
    user => "admin"
    password => "admin"
    ssl => true
    ssl_certificate_verification => false
  }
 
}

결과 화면

4. DB 데이타 구독 예제

메시지 브로커에서 DB CRUD 정보를 구독하여 Websocket을 이용하여 화면에 결과를 보여준다.

관련 소스

유형 대상소스명 설명
Consumer Bean /src/main/java/egovframework/webflux/stream/db/EgovConsumerDb.java DB 데이터 구독
Controller /src/main/java/egovframework/webflux/stream/controller/EgovSocketController.java DB 데이터 컨트롤러 클래스
Thymeleaf /src/main/resources/templates/thymeleaf/egovConsumerDb.html DB 데이터 구독 결과 표시
Websocket /src/main/java/egovframework/webflux/websocket/DbWebSocketHandler.java DB 데이터 웹소켓 설정

바인딩 설정

spring:
  cloud:
    stream:
      bindings:
        historyDb-in-0:
          destination: db-topic
          binder: kafka
      function:
        definition: historyDb

결과 화면

CRUD에 의해 조회된 이력정보를 구독하여 실시간으로 차트에 출력한다.

5. Open API 데이타 구독 예제

메시지 브로커에서 Open API 정보를 구독하여 MongoDB에 저장한다.

관련 소스

유형 대상소스명 설명
Consumer Bean /src/main/java/egovframework/webflux/stream/api/EgovConsumerTrainApi.java Open API 데이터 구독
DTO /src/main/java/egovframework/webflux/stream/api/dto/RealtimePositionDTO.java Open API 데이터 DTO
Entity /src/main/java/egovframework/webflux/stream/api/entity/RealtimePosition.java Open API 데이터 Entity

바인딩 설정

spring:
  cloud:
    stream:
      bindings:
        trainApi-in-0:
          destination: api-topic
          binder: kafka
      function:
        definition: trainApi

결과 화면

구독된 데이타는 즉시 MongoDB의 Collection에 저장된다.
MongoDB에서 제공하는 Compass client를 사용하면 손쉽게 Collection 확인이 가능하다.

소스코드

참고자료

 
egovframework/rte4/rex/subscribe_data_stream.txt · 마지막 수정: 2024/02/06 07:24 (외부 편집기)
 
이 위키의 내용은 다음의 라이센스에 따릅니다 :CC Attribution-Noncommercial-Share Alike 3.0 Unported
전자정부 표준프레임워크 라이센스(바로가기)

전자정부 표준프레임워크 활용의 안정성 보장을 위해 위험성을 지속적으로 모니터링하고 있으나, 오픈소스의 특성상 문제가 발생할 수 있습니다.
전자정부 표준프레임워크는 Apache 2.0 라이선스를 따르고 있는 오픈소스 프로그램입니다. Apache 2.0 라이선스에 따라 표준프레임워크를 활용하여 발생된 업무중단, 컴퓨터 고장 또는 오동작으로 인한 손해 등에 대해서 책임이 없습니다.
Recent changes RSS feed CC Attribution-Noncommercial-Share Alike 3.0 Unported Donate Powered by PHP Valid XHTML 1.0 Valid CSS Driven by DokuWiki