Kafka Connect

- 데이터베이스, 키-밸류 스토어와 같은 외부 시스템을 Kafka에 연결해 주는 구성 요소.
- Kafka와 외부 시스템 간의 연결을 쉽고, 간편하게 해준다.
- Consumer 와 Producer의 기능을 Code 작성 없이 설정만으로 구성할 수 있다.
Kafka connect의 도입 배경
Single Kafka System with Producer & Consumer

- 카프카(kafka)는 프로듀서(Producer)와 컨슈머(Consumer)를 통해 다양한 외부 시스템 데이터를 주고 받으며 메세지 파이프라인 아키텍쳐를 구성.
Multiple Kafka System with Producer & Consumer

- 하지만 이러한 파이프라인을 매번 구성하며 프로듀서와 컨슈머를 개발하는 것은 비용, 시간, 반복작업 측면에서 좋지 않습니다.
- 반복적인 파이프라인 생성 작업이 있을 때, 매번 프로듀서, 컨슈머 애플리케이션을 개발하고 배포, 운영해야 하기 때문이죠.
Multiple Kafka System with Connect & Connector

- Kafka Connect는 특정한 작업 형태를 템플릿으로 만들어놓은 커넥터(connector)를 실행함으로써 반복 작업을 줄일 수 있다.
실생활 예시

- 집 하나를 만들 때
- 집 템플릿이 있는 상진이는 템플릿을 가져와 바로 만듬.
- 태혁이는 레고를 하나하나 조립해 집 하나를 만듬.

- 하나하나 집을 만들어야 하는 태혁이와는 다르게 상진이는 집 템플릿이 있어서 편하고, 빠르게 마을 구성 가능!

- 위의 집처럼 Kafka Connect는 미리 정의된 Connector 템플릿을 통해 자신이 원하는 환경설정만 적어주기만 하면 반복적인 파이프라인을 쉽게 작성할 수 있음
코드 예시
Connector을 이용하는 경우
이제 Connector를 이용하는 경우 단순히 어떤 connector를 사용할 지, 어떤 topic에 넣을지, DB의 주소, 비밀번호는 뭔지 등을 설정하는 json 파일을 통해 쉽게 구성이 가능합니다.
{
"name": "postgres-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres-server:5433/mydatabase",
"connection.user": "myuser",
"connection.password": "mypassword",
"table.whitelist": "boaz",
"topic.prefix": "postgres-source-",
"topic.creation.default.partitions": 1,
"topic.creation.default.replication.factor": 1,
"mode": "incrementing",
"incrementing.column.name": "id",
"tasks.max": 2
}
}
Python으로 연결하려는 경우
그러나 이 작업을 gpt한테 Python으로 작성해보라고 시켜봤는데요. python으로 연결하려면 아래와 같이 코드를 작성하는 복잡한 과정이 존재합니다.
import psycopg2
from kafka import KafkaProducer
# Kafka configuration
bootstrap_servers = 'localhost:9092'
topic_prefix = 'postgres-source_'
# PostgreSQL configuration
db_host = 'postgres-server'
db_port = 5433
db_name = 'mydatabase'
db_user = 'myuser'
db_password = 'mypassword'
table_name = 'boaz'
incrementing_column = 'id'
# Connect to PostgreSQL
connection = psycopg2.connect(
host=db_host,
port=db_port,
dbname=db_name,
user=db_user,
password=db_password
)
cursor = connection.cursor()
# Initialize Kafka Producer
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
# Query database and produce messages to Kafka
def produce_messages():
cursor.execute(f"SELECT * FROM {table_name}")
for row in cursor.fetchall():
key = str(row[0]).encode('utf-8') # Assuming the incrementing column is the first column
value = ','.join([str(item) for item in row]).encode('utf-8')
topic = topic_prefix + table_name
producer.send(topic, key=key, value=value)
producer.flush()
# Close database connection
def close_connection():
cursor.close()
connection.close()
# Run the producer
produce_messages()
# Close the connection
close_connection()
Kafka connect의 특징
Kafka connect는 대표적으로 다음과 같은 5가지 특징을 가지고 있습니다.
데이터 중심 파이프라인
- 카프카 커넥트를 이용해 카프카로 데이터를 보내거나, 카프카로 데이터를 가져옴
유연성과 확장성
- 커넥트는 테스트를 위한 단독 모드(standalone mode)와 대규모 운영 환경을 위한 분산 모드(distributed mode)를 제공
재사용성과 기능 확장
- 커넥트는 기존 커넥터를 활용할 수도 있고 운영 환경에서의 요구사항에 맞춰 확장이 가능
편리한 운영과 관리
- 카프카 커넥트가 제공하는 REST API로 빠르고 간단하게 커넥트 운영 가능
- 좀 있다 실습에서 REST API를 이용해 Connect에 Connector 연결 요청 진행 예정.
장애 및 복구
- 카프카 커넥트를 분산 모드로 실행하면 워커 노드의 장애 상황에도 메타데이터를 백업함으로써 대응 가능하며 고가용성 보장
Kafka Connect의 구성 요소
더보기
💡 Connect와 Connector의 차이
Connect
- Connector를 동작하게 하기 위해 실행해주는 프로세스
Connector
- 실질적으로 Data Source(DB)의 데이터를 처리하는 코드가 들어있는 jar파일
- 쉽게 말하면, 일련의 템플릿과 같은 특정 동작을 하는 코드 뭉치라고 볼 수 있다.
- 파이프라인에 필요한 여러가지 동작, 설정, 실행하는 메서드를 포함.
Connector

Source Connector
- 프로듀서 역할을 하는 커넥터
- 데이터베이스로부터 데이터를 가져와서 토픽에 넣는 역할.
- 어렵게 말하자면, Source system 의 데이터를 브로커의 토픽으로 publish 하는 Connector
Sink Connector
- 컨슈머 역할을 하는 커넥터
- Kafka의 특정 Topic에 있는 데이터를 Oracle, MySQL, Elastic Search와 같은 특정 저장소에 저장을 하는 역할을 함.

위의 예시
- Source Connector : MySQL 소스 커넥터, PostgreSQL 소스 커넥터
- Sink Connector : ElasticSearch Sink Connector, …
Task
- 데이터를 Kafka로 또는 Kafka에서 복사(전송)하는 방법 구현
Worker
- Connector와 Task를 실행하는 프로세스
Worker - Standalone Mode

- 단일 워커로 이루어진 카프카 커넥트
- 테스트 및 개발 환경에서만 사용.
Worker - Distributed Mode

- 다수의 워커로 이루어진 카프카 커넥트
- group.id를 사용해 worker들을 클러스터링
- 분산 모드는 특정 워커에 장애가 발생하더라도 다른 워커로 이동해 연속해서 동작 가능
Converter
- 데이터 처리를 하기 전에 스키마를 변경하도록 도와준다.
- 쉽게 얘기하면 보내거나, 받을 데이터를 원하는 형식으로 변환하는 역할.
Transform
- Connector에서 생성하거나 Connector로 전송되는 메시지 내용 변환
- 데이터 처리 시 각 메시지 단위로 데이터를 간단하게 변환하기 위한 용도로 사용.
Connector 종류

Confluent Connector Portfolio | KR
실제 기업 Kafka Connect 살펴보기
- MSA 아키텍처로 인해 각 서비스가 독립적으로 개발되고, 서비스(팀)마다 사용하는 DB가 다양해짐.
- 신세계 개발팀 예시
- Postgresql DB를 Command를 위한 DB로, Mongo DB를 View를 위한 DB로 사용
- Postgresql DB와 Mongo DB 사이에 데이터 동기화에 Kafka Connect를 이용.
- 네이버 쇼핑
- MongoDB와 MySQL에 쌓이는 로그를 Kafka Connect를 이용해 Kafka로 전달
- Kafka Connect를 이용해 다양한 이벤트들을 쉽게 모을 수 있고, 새로운 이벤트 Source가 생기더라도 Kafka Connect를 통해 쉽게 추가 가능.
- 신세계 개발팀 예시
Reference
Kafka Connect | Confluent Documentation
3) Connect & Connector | ML Engineer를 위한 MLOps
'데이터엔지니어링' 카테고리의 다른 글
서버리스(Serverless) 데이터 파이프라인 구축기 (2) | 2025.01.31 |
---|---|
[데이터엔지니어링] Docker 개념 (2) | 2024.05.03 |
[데이터엔지니어링] Apache Kafka 개념 알아보기 (2) | 2024.04.04 |
[데이터엔지니어링] Airflow 실습(2) - Airflow 간단 설명 (1) | 2024.01.28 |
[데이터엔지니어링] Airflow 실습(1) (0) | 2023.11.29 |