본문 바로가기

DevOps & Cloud

8. 데이터 파이프라인

728x90
반응형

연구실 프로젝트 Season1 Ep8

 

 

Change Stream은 MongoDB의 변경을 애플리케이션에 실시간으로 전달해 주는 기능으로, MongoDB v3.6에 추가되었다. MongoDB가 Publisher, 애플리케이션이 Subscriber가 되는 형태이다. 애플리케이션은 데이터의 변경을 전달받아서, 목적에 따라 여러 가지 작업을 수행할 수 있다. 예를 들면, 데이터 변경에 대한 알림 메일을 전송한다거나, 데이터 분석을 위해 별도 시스템으로 데이터를 전송하는 것이다.

 

while 1 안에서 collection.watch() 함수에 새로운 데이터가 추가된 것이 포착되면 해당 데이터를 change_stream에 반환한다.이렇게 하면 실시간으로 데이터 파이프라인 처럼 구현이 가능하다.

 

또한 해당 스트림을 사용하기 위해서는 하위 노드에 몽고 DB를 설치하고 Replica Set을 구성해야 사용가능하다.Replica Set을 구성하는 것은 하단 블로그를 참고했다.https://jinyes-tistory.tistory.com/229

 

몽고디비 레플리카셋 구축하기

복제 셋(replica set)을 구축하기 위한 단계는 다음과 같다. 1. 각 노드에 몽고 디비 설치 (나는 전부 4.4.5로 버전으로 통일시켰다.) 2. mongod.conf 설정 3. 각 노드에서 몽고디비 실행 4. Primary로 사용할

jinyes-tistory.tistory.com

 

결과적으로 아래와 같은 파이프라인을 작성해서 얻고자 하는 것은 실시간으로 데이터를 DB에 쌓으면서 동시에 모델을 사용해서 이상탐지를 할 수 있도록 하기 위함이다.

#데이터 파이프 라인 코드

from pymongo import MongoClient
from pymongo.errors import PyMongoError

# Connect to MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["logs"]
collection = db["network_flows"]

# Set up change stream to listen for new insertions
while 1:
    try:
        change_stream = collection.watch()
        for change in change_stream:
            if change["operationType"] == "insert":
                new_data = change["fullDocument"]
                print(new_data)

    except PyMongoError as e:
        print(e)

 

위에서 학습된 모델을 호출해서 predict 함수를 통해서 평가하면 분류가 가능할 것이다.

 

kafka를 사용하기 위해서 파이썬 버전 변경도 했으나 아직 사용하지 않음.. kafka는 계속 오류 발생.

 

파이썬 버전 변경: https://mickael-k.tistory.com/92

 

[Ubuntu] 파이썬(Python) 버전 변경 방법

기본 우분투를 깔고 터미널창을 열고 'python'이란 명령어를 치면 자동으로 python 2.x버전이 잡힙니다. 이번 포스트에서는 리눅스 Alternatives를 활용하여 Python의 기본 default인 2에서 새롭게 3으로 바

mickael-k.tistory.com

 

온라인 학습을 하려고 했으나, 일단 모델이 잘 작동하는지 부터 확인하고 싶었다. 해당 db, 해당 collection에 존재하는 필드들을 csv 형태로 추출하는 명령어다.

#몽고DB csv로 추출하는 명령어 
mongoexport --db logs --port 27017 --collection network_flows --type=csv --fields IP_source,IP_destination,l4_TCP_source_port,l4_TCP_destination_port,l4_TCP_flags_ACK,source_identity,destination_identity,destination_labels_0,destination_labels_1,destination_labels_2,destination_labels_3,destination_labels_4,Type,traffic_direction,trace_observation_point,event_type_type,Summary --out ./train.csv

 

해당 명령어는 mongosh 안에서 수행해야한다. 

728x90
반응형

'DevOps & Cloud' 카테고리의 다른 글

클라우드 컴퓨팅 정리  (0) 2023.04.13
9. 인공지능 모델 학습  (0) 2023.01.21
추천 프로젝트 By Chat GPT  (0) 2023.01.18
7. 네트워크 로그 수집 자동화(Protobuf, gRPC)  (0) 2023.01.13
6. Go & Mongo DB  (0) 2023.01.13