Python Kafka Producer(feat.Go)
March 2023 (2022 Words, 12 Minutes)
는 kafka 클러스터에 record를 publish하는 Kafka 클라이언트이다.
보통 Producer는 여러 스레드에서 하나의 인스턴스를 공유하는 것이 여러 인스턴스를 생성해서 사용하는 것보다 빠르다.
Kafka Producer는 record를 저장하는 buffer 공간과 클러스터로 record를 전송하는 backgroud I/O 스레드로 구성된다.
Buffer usage
정해진 batch_size
만큼 producer는 각 파티션의 아직 전송되지 않은 record를 buffer에 유지한다. 사이즈를 크게 하면 한번에 더 많은 데이터를 보낼 수 있으나, 메모리 사용량이 늘어날 수 있다. default는 16KiB(16384)이다.
초기화할 때 kafka.KafkaProducer
내부적으로 ` self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)를 실행하여
RecordAccumulator` 인스턴스로 서버로 전송할 메시지로 큐로 관리한다.
메시지를 발행할 때 producer.send(self.topic, msg)
를 실행한다.
로직은 대략 아래와 같다.
FutureRecordMetadata: resolves to RecordMetadata
partition = self._partition(topic, partition, key, value,
key_bytes, value_bytes)
tp = TopicPartition(topic, partition)
result = self._accumulator.append(tp, timestamp_ms,
key_bytes, value_bytes, headers,
future, batch_is_full, new_batch_created = result
if batch_is_full or new_batch_created:
log.debug("Waking up the sender since %s is either full or getting a new batch", tp)
return future
전송할 메시지와 그에 할당된 토픽 파티션을 매개변수로 send
를 실행한다.
는 메시지를 추가하고 결과를 반환한다. 결과에는 future metadata가 포함되어 배치가 꽉 찼는지, 배치가 새로 생성되었는지에 대한 flag를 포함한다. 대략적인 로직은 아래와 같다.
tuple: (future, batch_is_full, new_batch_created)
# 토픽 파티션에 lock을 걸어준다
if tp not in self._tp_locks:
with self._tp_locks[None]:
if tp not in self._tp_locks:
self._tp_locks[tp] = threading.Lock()
with self._tp_locks[tp]:
# 현재 진행중인 배치가 있는지 확인하여 적용한다.
dq = self._batches[tp]
if dq:
last = dq[-1]
future = last.try_append(timestamp_ms, key, value, headers)
# future metadata가 있으면 바로 결과를 반환한다.
if future is not None:
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False
# 메모리를 할당한다.
size = max(self.config['batch_size'], estimated_size)
log.debug("Allocating a new %d byte message buffer for %s", size, tp)
buf = self._free.allocate(size, max_time_to_block_ms)
with self._tp_locks[tp]:
# dequeue lock을 획득한 이후에 producer가 닫혔는지 확인한다.
assert not self._closed, 'RecordAccumulator is closed'
if dq:
last = dq[-1]
future = last.try_append(timestamp_ms, key, value, headers)
if future is not None:
# future metadata가 있으면 메모리를 해제하고 결과를 반환한다.
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False
records = MemoryRecordsBuilder(
# 현재 진행 중인 배치가 없으므로(if dq에서 걸러짐) 배치를 새로 생성하여 dequeue에 추가한다.
batch = ProducerBatch(tp, records, buf)
future = batch.try_append(timestamp_ms, key, value, headers)
if not future:
raise Exception()
batch_is_full = len(dq) > 1 or batch.records.is_full()
return future, batch_is_full, True
Kafka producer in Go
Go에서는 kafka producer가 cgo를 기반으로 구현되어 있다.
기본적인 produce
는 아래는 배치가 아닌 단일 메시지를 발행하는 메서드다. 내부적으로 큐에 메시지를 추가하고 바로 리턴하는 방식으로 비동기적으로 호출된다.
메시지 배치 발행은 Producer.produceBatch
를 적용해야 할 것으로 보인다.
/* kafka.Producer */
func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error {
err := p.verifyClient()
if err != nil {
return err
return p.produce(msg, 0, deliveryChan)
func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event) error {
if msg == nil || msg.TopicPartition.Topic == nil || len(*msg.TopicPartition.Topic) == 0 {
return newErrorFromString(ErrInvalidArg, "")
crkt := p.handle.getRkt(*msg.TopicPartition.Topic)
var valp []byte
var keyp []byte
oneByte := []byte{0}
var valIsNull
var keyIsNull
var valLen int
var keyLen int
/* if msg.Value == nil {
/* if msg.Key == nil {
var cgoid int
if deliveryChan != nil || msg.Opaque != nil {
// cgoPut은 cgo map에 객체를 추가하고 고유 id를 반환한다.(아래)
cgoid = p.handle.cgoPut(cgoDr{deliveryChan: deliveryChan, opaque: msg.Opaque})
var timestamp int64
if !msg.Timestamp.IsZero() {
timestamp = msg.Timestamp.UnixNano() / 1000000
var tmphdrs []C.tmphdr_t
tmphdrsCnt := len(msg.Headers)
if tmphdrsCnt > 0 {
tmphdrs = make([]C.tmphdr_t, tmphdrsCnt)
for n, hdr := range msg.Headers {
// C.CString : go string을 *C.char로 반환, C heap에 생성되며 메모리 해제 필요
tmphdrs[n].key = C.CString(hdr.Key) // copy
if hdr.Value != nil {
tmphdrs[n].size = C.ssize_t(len(hdr.Value))
if tmphdrs[n].size > 0 {
// C.CBytes : []byte를 unsafe.Pointer로 반환, C array는 C heap에 생성되며 메모리 해제 필요
tmphdrs[n].val = C.CBytes(hdr.Value) // copy
} else {
// null value
tmphdrs[n].size = C.ssize_t(-1)
} else {
tmphdrs = []C.tmphdr_t
// unsafe Pointer는 Go에서 임의의 메모리 조작을 허용하는 것으로 C 포인터와 사용과 관련있다.
cErr := C.do_produce(p.handle.rk, crkt,
valIsNull, unsafe.Pointer(&valp[0]), C.size_t(valLen),
keyIsNull, unsafe.Pointer(&keyp[0]), C.size_t(keyLen),
(*C.tmphdr_t)(unsafe.Pointer(&tmphdrs[0])), C.size_t(tmphdrsCnt),
if cgoid != 0 {
return newError(cErr)
return nil
/* kafka.handle */
func (h *handle) cgoPut(cg cgoif) (cgoid int) {
defer h.cgoLock.Unlock()
if h.cgoidNext == 0 {
cgoid = (int)(h.cgoidNext)
h.cgomap[cgoid] = cg
return cgoid
Send message
를 초기화할 때 메시지를 전송하기 위한 Sender
인스턴스를 생성하면서 비동기 요청 및 응답에 대응하는 네트워크 I/O를 위한 KafkaClient
를 생성한다.
client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
self._sender = Sender(client, self._metadata,
self._accumulator, self._metrics,
는 특정 노드에 요청을 보내는 클래스이고, 실제 네트워크 I/O는 socket으로 데이터를 read/write하는 함수인 .poll()
호출로 발생한다.
내부적으로 다음과 같이 명시되어 있다. 네트워크 I/O는 send_pending_requests
를 호출하여 트리거하는데, 실제로는 send_pending_requests_v2
를 실행하는 것으로 보인다.
# conn.send will queue the request internally
# we will need to call send_pending_requests()
# to trigger network I/O
future = conn.send(request, blocking=False)
는 non-blocking I/O를 통해 요청을 전송한다. non-block I/O 방식으로 처리가 완료되지 않으면 에러를 발생시켜 block 상태를 만들지 않는다.
def send_pending_requests_v2(self):
"""Attempts to send pending requests messages via non-blocking IO
If all requests have been sent, return True
Otherwise, if the socket is blocked and there are more bytes to send,
return False.
with self._lock:
if not self._can_send_recv():
return False
# _protocol.send_bytes()에서 인코딩된 요청을 반환하고, 실제로 _send_bytes()로 전송한다. 남은 바이트는 _send_buffer로 홀드
if not self._send_buffer:
self._send_buffer = self._protocol.send_bytes()
total_bytes = 0
if self._send_buffer:
total_bytes = self._send_bytes(self._send_buffer)
self._send_buffer = self._send_buffer[total_bytes:]
if self._sensors:
# Return True iff send buffer is empty
return len(self._send_buffer) == 0
except (ConnectionError, TimeoutError, Exception) as e:
log.exception("Error sending request data to %s", self)
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
return False