본문 바로가기

데이터엔지니어링

[데이터엔지니어링] Apache Kafka Connect 설명

Kafka Connect

https://www.confluent.io/blog/no-more-silos-how-to-integrate-your-databases-with-apache-kafka-and-cdc/

  • 데이터베이스, 키-밸류 스토어와 같은 외부 시스템을 Kafka에 연결해 주는 구성 요소.
  • Kafka와 외부 시스템 간의 연결을 쉽고, 간편하게 해준다.
  • Consumer 와 Producer의 기능을 Code 작성 없이 설정만으로 구성할 수 있다.

Kafka connect의 도입 배경

Single Kafka System with Producer & Consumer

  • 카프카(kafka)는 프로듀서(Producer)와 컨슈머(Consumer)를 통해 다양한 외부 시스템 데이터를 주고 받으며 메세지 파이프라인 아키텍쳐를 구성.

Multiple Kafka System with Producer & Consumer

  • 하지만 이러한 파이프라인을 매번 구성하며 프로듀서와 컨슈머를 개발하는 것은 비용, 시간, 반복작업 측면에서 좋지 않습니다.
    • 반복적인 파이프라인 생성 작업이 있을 때, 매번 프로듀서, 컨슈머 애플리케이션을 개발하고 배포, 운영해야 하기 때문이죠.

Multiple Kafka System with Connect & Connector

  • Kafka Connect는 특정한 작업 형태를 템플릿으로 만들어놓은 커넥터(connector)를 실행함으로써 반복 작업을 줄일 수 있다.

실생활 예시

  • 집 하나를 만들 때
    • 집 템플릿이 있는 상진이는 템플릿을 가져와 바로 만듬.
    • 태혁이는 레고를 하나하나 조립해 집 하나를 만듬.

  • 하나하나 집을 만들어야 하는 태혁이와는 다르게 상진이는 집 템플릿이 있어서 편하고, 빠르게 마을 구성 가능!

  • 위의 집처럼 Kafka Connect는 미리 정의된 Connector 템플릿을 통해 자신이 원하는 환경설정만 적어주기만 하면 반복적인 파이프라인을 쉽게 작성할 수 있음

코드 예시

Connector을 이용하는 경우

이제 Connector를 이용하는 경우 단순히 어떤 connector를 사용할 지, 어떤 topic에 넣을지, DB의 주소, 비밀번호는 뭔지 등을 설정하는 json 파일을 통해 쉽게 구성이 가능합니다.

{
    "name": "postgres-source-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://postgres-server:5433/mydatabase",
        "connection.user": "myuser",
        "connection.password": "mypassword",
        "table.whitelist": "boaz",
        "topic.prefix": "postgres-source-",
        "topic.creation.default.partitions": 1,
        "topic.creation.default.replication.factor": 1,
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "tasks.max": 2
    }
}

Python으로 연결하려는 경우

그러나 이 작업을 gpt한테 Python으로 작성해보라고 시켜봤는데요. python으로 연결하려면 아래와 같이 코드를 작성하는 복잡한 과정이 존재합니다.

import psycopg2
from kafka import KafkaProducer

# Kafka configuration
bootstrap_servers = 'localhost:9092'
topic_prefix = 'postgres-source_'

# PostgreSQL configuration
db_host = 'postgres-server'
db_port = 5433
db_name = 'mydatabase'
db_user = 'myuser'
db_password = 'mypassword'
table_name = 'boaz'
incrementing_column = 'id'

# Connect to PostgreSQL
connection = psycopg2.connect(
    host=db_host,
    port=db_port,
    dbname=db_name,
    user=db_user,
    password=db_password
)
cursor = connection.cursor()

# Initialize Kafka Producer
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

# Query database and produce messages to Kafka
def produce_messages():
    cursor.execute(f"SELECT * FROM {table_name}")
    for row in cursor.fetchall():
        key = str(row[0]).encode('utf-8')  # Assuming the incrementing column is the first column
        value = ','.join([str(item) for item in row]).encode('utf-8')
        topic = topic_prefix + table_name
        producer.send(topic, key=key, value=value)
    producer.flush()

# Close database connection
def close_connection():
    cursor.close()
    connection.close()

# Run the producer
produce_messages()

# Close the connection
close_connection()

Kafka connect의 특징

Kafka connect는 대표적으로 다음과 같은 5가지 특징을 가지고 있습니다.

 

데이터 중심 파이프라인

  • 카프카 커넥트를 이용해 카프카로 데이터를 보내거나, 카프카로 데이터를 가져옴

유연성과 확장성

  • 커넥트는 테스트를 위한 단독 모드(standalone mode)와 대규모 운영 환경을 위한 분산 모드(distributed mode)를 제공

재사용성과 기능 확장

  • 커넥트는 기존 커넥터를 활용할 수도 있고 운영 환경에서의 요구사항에 맞춰 확장이 가능

편리한 운영과 관리

  • 카프카 커넥트가 제공하는 REST API로 빠르고 간단하게 커넥트 운영 가능
    • 좀 있다 실습에서 REST API를 이용해 Connect에 Connector 연결 요청 진행 예정.

장애 및 복구

  • 카프카 커넥트를 분산 모드로 실행하면 워커 노드의 장애 상황에도 메타데이터를 백업함으로써 대응 가능하며 고가용성 보장

Kafka Connect의 구성 요소

더보기

💡 Connect와 Connector의 차이

Connect

  • Connector를 동작하게 하기 위해 실행해주는 프로세스 

Connector

  • 실질적으로 Data Source(DB)의 데이터를 처리하는 코드가 들어있는 jar파일
  • 쉽게 말하면, 일련의 템플릿과 같은 특정 동작을 하는 코드 뭉치라고 볼 수 있다.
  • 파이프라인에 필요한 여러가지 동작, 설정, 실행하는 메서드를 포함. 

Connector

Source Connector

  • 프로듀서 역할을 하는 커넥터
  • 데이터베이스로부터 데이터를 가져와서 토픽에 넣는 역할.
  • 어렵게 말하자면, Source system 의 데이터를 브로커의 토픽으로 publish 하는 Connector

Sink Connector

  • 컨슈머 역할을 하는 커넥터
  • Kafka의 특정 Topic에 있는 데이터를 Oracle, MySQL, Elastic Search와 같은 특정 저장소에 저장을 하는 역할을 함.

위의 예시

  • Source Connector : MySQL 소스 커넥터, PostgreSQL 소스 커넥터
  • Sink Connector : ElasticSearch Sink Connector, …

Task

  • 데이터를 Kafka로 또는 Kafka에서 복사(전송)하는 방법 구현

Worker

  • Connector와 Task를 실행하는 프로세스

Worker - Standalone Mode

  • 단일 워커로 이루어진 카프카 커넥트
  • 테스트 및 개발 환경에서만 사용.

Worker - Distributed Mode

  • 다수의 워커로 이루어진 카프카 커넥트
    • group.id를 사용해 worker들을 클러스터링
  • 분산 모드는 특정 워커에 장애가 발생하더라도 다른 워커로 이동해 연속해서 동작 가능

Converter

  • 데이터 처리를 하기 전에 스키마를 변경하도록 도와준다.
    • 쉽게 얘기하면 보내거나, 받을 데이터를 원하는 형식으로 변환하는 역할.

Transform

  • Connector에서 생성하거나 Connector로 전송되는 메시지 내용 변환
    • 데이터 처리 시 각 메시지 단위로 데이터를 간단하게 변환하기 위한 용도로 사용.

Connector 종류

Confluent Connector Portfolio | KR

실제 기업 Kafka Connect 살펴보기

  • MSA 아키텍처로 인해 각 서비스가 독립적으로 개발되고, 서비스(팀)마다 사용하는 DB가 다양해짐.
    • 신세계 개발팀 예시
      • Postgresql DB를 Command를 위한 DB로, Mongo DB를 View를 위한 DB로 사용
      • Postgresql DB와 Mongo DB 사이에 데이터 동기화에 Kafka Connect를 이용.
    • 네이버 쇼핑
      • MongoDB와 MySQL에 쌓이는 로그를 Kafka Connect를 이용해 Kafka로 전달
      • Kafka Connect를 이용해 다양한 이벤트들을 쉽게 모을 수 있고, 새로운 이벤트 Source가 생기더라도 Kafka Connect를 통해 쉽게 추가 가능.

Reference

Kafka Connect | Confluent Documentation

3) Connect & Connector | ML Engineer를 위한 MLOps

Kafka와 MongoDB, Kubernetes로 유연하고 확장 가능한 LINE 쇼핑 플랫폼 구축하기

[Kafka] Kafka Connect 개념/예제

Kafka Connector 이론 | Notion

카프카 커넥트 적용기