====== Data Stream 예제(Subscribe) ====== ===== 개요 ===== Spring Cloud Stream 기반으로 처리되는 구독(Subscribe) 예제 5종을 제공하며 메시지 브로커의 메시지를 전달 받아 다양한 방법으로 처리한다. === 데이터 구독 유형 === ^ 순번 ^ 유형 ^ Messsage Broker ^ 기반 ^ 구독 데이터 ^ UI URL ^ 비고 ^ | 1 | IOT | RabbitMQ | Spring Cloud Stream | Websocket(Google Chart) 출력 | /sensor | | 2 | Log | Apache Kafka | OpenSearch | OpenDashboard로 출력 | | OpenLogstash로 자료 처리 및 연동 | | 3 | File | Apache Kafka | Spring Cloud Stream | Websocket이용하여 출력 | /file_line | | | 4 | DB | Apache Kafka | Spring Cloud Stream | Websocket(Google Chart) 출력 | /db | | | 5 | Open API | Apache Kafka | Spring Cloud Stream | MongoDB 저장 | | | ===== 설명 ===== ==== 1. IOT 데이타 구독 예제 ==== === 기능 설명 === 메시지 브로커에서 온도 및 습도 정보를 구독하여 관련 데이타를 가져온 다음 Websocket을 이용하여 화면에 구글차트로 결과를 보여준다. === 관련 소스 === ^ 유형 ^ 대상소스명 ^ 설명 ^ | Consumer Bean | /src/main/java/egovframework/webflux/stream/sensor/EgovConsumerSensorThermohygrometer.java | 온/습도 데이타 구독 | | Controller | /src/main/java/egovframework/webflux/stream/controller/EgovSocketController.java | 온/습도 컨트롤러 클래스 | | Thymeleaf | /src/main/resources/templates/thymeleaf/egovSensor.html | 온/습도 수신 데이타 화면 표시 | | Websocket | /src/main/java/egovframework/webflux/websocket/SensorWebSocketHandler.java | 온/습도 데이타 웹소켓 설정 | === 바인딩 설정 === spring: cloud: stream: bindings: sensorThermo-in-0: destination: sensor binder: rabbit function: definition: sensorThermo === 결과 화면 === 수집된 온습도 데이터에 대해 실시간으로 차트에 결과를 보여준다.\\ {{:egovframework:rte4:rex:consumer_sesor_chart.png?700|}} ==== 2. File 데이타 구독 예제 ==== 메시지 브로커에서 라인기반 파일 정보를 구독하여 관련 데이타를 가져온 Websocket을 이용하여 화면에 결과를 보여준다. === 관련 소스 === ^ 유형 ^ 대상소스명 ^ 설명 ^ | Consumer Bean | /src/main/java/egovframework/webflux/stream/sensor/EgovConsumerSensorThermohygrometer.java | 라인기반 파일 데이타 구독 | | Controller | /src/main/java/egovframework/webflux/stream/controller/EgovSocketController.java | 라인기반 파일 컨트롤러 클래스 | | Thymeleaf | /src/main/resources/templates/thymeleaf/egovConsumerFileLine.html | 라인기반 파일 구독 데이타 결과 표시 | | Websocket | /src/main/java/egovframework/webflux/websocket/FileWebSocketHandler.java | 파일 데이타 웹소켓 설정 | === 바인딩 설정 === spring: cloud: stream: bindings: fileLine-in-0: destination: line-topic binder: kafka function: definition: fileLine === 결과 화면 === 전달된 텍스트 데이타에 대해 실시간으로 화면에 결과를 보여준다.\\ {{:egovframework:rte4:rex:consumer_file_line_result.png?600|}} ==== 3. Log 데이타 구독 예제 ==== Elastic Search가 상용화 되면서 해당 OSS(Open Source Software)에서 분기한 OpenSearch를 이용하면 편리하게 데이터를 수신하여 시각화 할 수 있다. 이때 메시지 서버와 연계는 Logstash를 이용하여 처리한다. === Logstash === Logstash는 로그 형태의 데이타를 OpenSearch로 쉽게 전달할 수 있다. == logstash.conf == * input > group_id : consumer 그룹 지정\\ * input > topics : egov-logs (Kafka 토픽 명)\\ * input > decorate_events => 이걸 설정해야 @metadata 같이 옵션 사용가능\\ * filter > grok 플러그인 : 메시지 패턴 적용\\ * output > index : YYYY.MM.dd로 일별 구분\\ input { kafka { bootstrap_servers => "192.168.100.50:9092" group_id => "logstash" topics => ["egov-logs"] consumer_threads => 1 decorate_events => true } } filter { grok { match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}] \[%{DATA:thread}] %{LOGLEVEL:logLevel} %{JAVACLASS:packageClass} - %{GREEDYDATA:msg}" } } } output { stdout {} opensearch { hosts => ["https://192.168.100.50:9200"] index => "egov-logs-%{+YYYY.MM.dd}" user => "admin" password => "admin" ssl => true ssl_certificate_verification => false } } === 결과 화면 === {{:egovframework:rte4:rex:consumer_opensearch_result.png?800|}} ==== 4. DB 데이타 구독 예제 ==== 메시지 브로커에서 DB CRUD 정보를 구독하여 Websocket을 이용하여 화면에 결과를 보여준다. === 관련 소스 === ^ 유형 ^ 대상소스명 ^ 설명 ^ | Consumer Bean | /src/main/java/egovframework/webflux/stream/db/EgovConsumerDb.java | DB 데이터 구독 | | Controller | /src/main/java/egovframework/webflux/stream/controller/EgovSocketController.java | DB 데이터 컨트롤러 클래스 | | Thymeleaf | /src/main/resources/templates/thymeleaf/egovConsumerDb.html | DB 데이터 구독 결과 표시 | | Websocket | /src/main/java/egovframework/webflux/websocket/DbWebSocketHandler.java | DB 데이터 웹소켓 설정 | === 바인딩 설정 === spring: cloud: stream: bindings: historyDb-in-0: destination: db-topic binder: kafka function: definition: historyDb === 결과 화면 === CRUD에 의해 조회된 이력정보를 구독하여 실시간으로 차트에 출력한다.\\ {{:egovframework:rte4:rex:consumer_db_result.png?800|}} ==== 5. Open API 데이타 구독 예제 ==== 메시지 브로커에서 Open API 정보를 구독하여 MongoDB에 저장한다. === 관련 소스 === ^ 유형 ^ 대상소스명 ^ 설명 ^ | Consumer Bean | /src/main/java/egovframework/webflux/stream/api/EgovConsumerTrainApi.java | Open API 데이터 구독 | | DTO | /src/main/java/egovframework/webflux/stream/api/dto/RealtimePositionDTO.java | Open API 데이터 DTO | | Entity | /src/main/java/egovframework/webflux/stream/api/entity/RealtimePosition.java | Open API 데이터 Entity | === 바인딩 설정 === spring: cloud: stream: bindings: trainApi-in-0: destination: api-topic binder: kafka function: definition: trainApi === 결과 화면 === 구독된 데이타는 즉시 MongoDB의 Collection에 저장된다.\\ MongoDB에서 제공하는 Compass client를 사용하면 손쉽게 Collection 확인이 가능하다.\\ {{:egovframework:rte4:rex:comsumer_store_mongo.png?800|}} ===== 소스코드 ===== ===== 참고자료 ===== * [[egovframework:rte2:rex:clouddatastream|Cloud Data Stream 환경설정]] * [[https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html|grok 문서]] * [[https://github.com/elastic/logstash/blob/v1.4.0/patterns/grok-patterns|grok 패턴 정의]] * [[https://grokconstructor.appspot.com/do/match#result|grok 패턴 테스트 URL]] * [[https://www.mongodb.com/ko-kr/products/tools/compass|MongoDB Compass]]