✅ MongoDB Sink Connector 설정에 유용한 주요 옵션들

1 min read

✅ MongoDB Sink Connector 설정에 유용한 주요 옵션들

설명
connection.uri필수. MongoDB 접속 주소 (mongodb://user:pass@host:port/?authSource=…)
topicsKafka에서 수신할 토픽 이름 (쉼표로 여러 개 가능)
databaseMongoDB 내 사용할 데이터베이스 이름
collectionMongoDB 내 컬렉션 이름
tasks.max병렬 태스크 수 (성능 확장용)
writemodel.strategyReplaceOneDefaultStrategy, InsertOneDefaultStrategy 등 쓰기 전략
document.id.strategy문서의 _id 생성 방식 (예: BsonOidStrategy, ProvidedInKeyStrategy, FullKeyStrategy)
document.id.strategy.overwrite.existing기존 문서를 덮어쓸지 여부 (true/false)
transformsKafka 메시지 가공용 SMT(Single Message Transform) 이름
transforms.X.typeSMT 타입 (HoistField, ExtractField, ValueToKey, 등)
errors.tolerance오류 허용 수준 (none, all)
errors.deadletterqueue.topic.name에러 메시지를 보낼 DLQ 토픽 이름
errors.deadletterqueue.context.headers.enableDLQ 메시지에 원본 메타데이터 포함 여부
value.converterKafka 메시지 value 처리 방식 (JsonConverter, AvroConverter, 등)
value.converter.schemas.enable스키마 포함 여부 (false 권장)
key.converterKafka 메시지 key 처리 방식
key.converter.schemas.enablekey에도 스키마 붙일지 여부
delete.on.null.values메시지 값이 null일 때 Mongo 문서 삭제 여부 (true 시 tombstone 처리됨)
max.batch.sizeMongoDB에 한번에 쓸 레코드 수
max.buffer.size버퍼에 보관할 레코드 수 (flush 전)
rate.limiting.timeout쓰기 실패 시 재시도 시간 제한 (ms)
rate.limiting.every.n지정 수만큼 문서마다 지연 (백프레셔 대응)

✅ SMT(Single Message Transform) 관련 예시

MongoDB Sink에서 자주 쓰는 SMT 설정 예시도 함께 알려드릴게요:

메시지의 payload 필드만 MongoDB에 넣고 싶을 때:

"transforms": "HoistField",
"transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.HoistField.field": "payload"

메시지의 특정 필드(예: id)를 Mongo 문서 _id로 쓰고 싶을 때:

"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"document.id.strategy.partial.value.projection.list": "id",
"document.id.strategy.partial.value.projection.type": "AllowList"

✅ 샘플: 전체 예시


{
  // MongoDB Kafka Sink Connector 클래스 지정
  "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",

  // 기존 문서가 있을 경우 덮어쓰기 설정
  "document.id.strategy.overwrite.existing": "true",

  // MongoDB에 저장할 때 ReplaceOne 방식 사용 (기존 문서를 교체)
  "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy",

  // JSON payload를 루트로 감싸는 HoistField 변환기 설정
  "transforms.HoistField.field": "payload",

  // 최대 태스크 수 지정 (1개만 사용하여 단일 처리)
  "tasks.max": "1",

  // Kafka에서 수신할 토픽 이름
  "topics": "user-tp",

  // 사용할 Kafka Connect 변환(transform) 체인 설정
  "transforms": "HoistField",

  // HoistField 변환기 타입 지정 (value 기준으로 작동)
  "transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",

  // MongoDB에 저장할 컬렉션 이름
  "collection": "user",

  // DLQ(Dead Letter Queue) 전송 시 헤더 정보 포함 여부
  "errors.deadletterqueue.context.headers.enable": "true",

  // Kafka 메시지 key에 스키마 포함 여부 (false로 설정하여 스키마 생략)
  "key.converter.schemas.enable": "false",

  // MongoDB에 저장할 DB 이름
  "database": "portal",

  // 문서 ID 전략: MongoDB ObjectId 사용
  "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy",

  // DLQ 토픽 이름 설정 (에러 발생 시 메시지 저장 위치)
  "errors.deadletterqueue.topic.name": "dlq-users",

  // MongoDB 접속 URI (보안을 위해 값은 빈 문자열로 비워둠 — 실제 운영 시 연결 정보 입력 필요)
  "connection.uri": "",

  // Kafka 메시지 value에 스키마 포함 여부 (false로 설정하여 스키마 생략)
  "value.converter.schemas.enable": "false",

  // 이 커넥터 인스턴스의 이름 (Kafka Connect 내부 식별자)
  "name": "mongodb-sink-user",

  // 오류 허용 수준: 모든 오류 허용 (DLQ로 전송)
  "errors.tolerance": "all",

  // DLQ 토픽의 복제 팩터 (브로커 수가 1개인 경우 1로 설정)
  "errors.deadletterqueue.topic.replication.factor": "1",

  // Kafka 메시지 value 변환기 (JSON 사용)
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",

  // Kafka 메시지 key 변환기 (JSON 사용)
  "key.converter": "org.apache.kafka.connect.json.JsonConverter"
}

루아 Lua 프로그래밍 : 모듈과 패키지 가이드

지금까지 우리는 함수로 코드를 묶고, 테이블로 데이터를 구조화하는 방법을 익혔습니다. 하지만 프로젝트의 규모가 커지기 시작하면, 모든 코드를 단 하나의 파일에 담는 것은 금세 한계에...
eve
53 sec read

루아 (Lua) 프로그래밍: 테이블과 메타테이블의 모든 것

Lua 프로그래밍의 여정에서 가장 중요하고 흥미로운 지점에 도달했습니다. 바로 Lua 언어의 심장이자 가장 중심적인 기능인 테이블(Table)입니다. Lua에는 배열, 딕셔너리, 리스트, 객체 등을 위한 별도의...
eve
1 min read

루아(Lua) 프로그래밍: 제어 구조 조건과 반복

지금까지 우리는 변수에 데이터를 저장하고, 연산자로 이 데이터들을 계산하고 비교하는 방법을 배웠습니다. 하지만 프로그램이 단순히 위에서 아래로 순서대로만 실행된다면, 매우 단순한 작업밖에 할 수...
eve
1 min read