home..

Python Kafka Producer(feat.Go)

kafka.KafkaProducer는 kafka 클러스터에 record를 publish하는 Kafka 클라이언트이다.
보통 Producer는 여러 스레드에서 하나의 인스턴스를 공유하는 것이 여러 인스턴스를 생성해서 사용하는 것보다 빠르다.

Kafka Producer는 record를 저장하는 buffer 공간과 클러스터로 record를 전송하는 backgroud I/O 스레드로 구성된다.

Buffer usage

정해진 batch_size만큼 producer는 각 파티션의 아직 전송되지 않은 record를 buffer에 유지한다. 사이즈를 크게 하면 한번에 더 많은 데이터를 보낼 수 있으나, 메모리 사용량이 늘어날 수 있다. default는 16KiB(16384)이다.

초기화할 때 kafka.KafkaProducer 내부적으로 ` self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)를 실행하여 RecordAccumulator` 인스턴스로 서버로 전송할 메시지로 큐로 관리한다.

메시지를 발행할 때 producer.send(self.topic, msg)를 실행한다.

send 로직은 대략 아래와 같다.

    """
    Returns:
                FutureRecordMetadata: resolves to RecordMetadata
    """            
            partition = self._partition(topic, partition, key, value,
                                        key_bytes, value_bytes)
            ...

            tp = TopicPartition(topic, partition)
            ...
            result = self._accumulator.append(tp, timestamp_ms,
                                              key_bytes, value_bytes, headers,
                                              self.config['max_block_ms'],
                                              estimated_size=message_size)
            future, batch_is_full, new_batch_created = result
            if batch_is_full or new_batch_created:
                log.debug("Waking up the sender since %s is either full or getting a new batch", tp)
                self._sender.wakeup()

            return future

전송할 메시지와 그에 할당된 토픽 파티션을 매개변수로 send를 실행한다.

self._accumulator.append는 메시지를 추가하고 결과를 반환한다. 결과에는 future metadata가 포함되어 배치가 꽉 찼는지, 배치가 새로 생성되었는지에 대한 flag를 포함한다. 대략적인 로직은 아래와 같다.

        """
        Returns:
                tuple: (future, batch_is_full, new_batch_created)
        """         
        try:
            # 토픽 파티션에 lock을 걸어준다
            if tp not in self._tp_locks:
                with self._tp_locks[None]:
                    if tp not in self._tp_locks:
                        self._tp_locks[tp] = threading.Lock()

            with self._tp_locks[tp]:
                # 현재 진행중인 배치가 있는지 확인하여 적용한다.
                dq = self._batches[tp]
                if dq:
                    last = dq[-1]
                    future = last.try_append(timestamp_ms, key, value, headers)
                    # future metadata가 있으면 바로 결과를 반환한다.
                    if future is not None:
                        batch_is_full = len(dq) > 1 or last.records.is_full()
                        return future, batch_is_full, False

            # 메모리를 할당한다.
            size = max(self.config['batch_size'], estimated_size)
            log.debug("Allocating a new %d byte message buffer for %s", size, tp)
            buf = self._free.allocate(size, max_time_to_block_ms)
            with self._tp_locks[tp]:
                # dequeue lock을 획득한 이후에 producer가 닫혔는지 확인한다.
                assert not self._closed, 'RecordAccumulator is closed'

                if dq:
                    last = dq[-1]
                    future = last.try_append(timestamp_ms, key, value, headers)
                    if future is not None:
                        # future metadata가 있으면 메모리를 해제하고 결과를 반환한다.
                        self._free.deallocate(buf)
                        batch_is_full = len(dq) > 1 or last.records.is_full()
                        return future, batch_is_full, False

                records = MemoryRecordsBuilder(
                    self.config['message_version'],
                    self.config['compression_attrs'],
                    self.config['batch_size']
                )
                
                # 현재 진행 중인 배치가 없으므로(if dq에서 걸러짐) 배치를 새로 생성하여 dequeue에 추가한다.
                batch = ProducerBatch(tp, records, buf)
                future = batch.try_append(timestamp_ms, key, value, headers)
                if not future:
                    raise Exception()

                dq.append(batch)
                self._incomplete.add(batch)
                batch_is_full = len(dq) > 1 or batch.records.is_full()
                return future, batch_is_full, True
        finally:
            self._appends_in_progress.decrement()

Kafka producer in Go

Go에서는 kafka producer가 cgo를 기반으로 구현되어 있다.

기본적인 produce는 아래는 배치가 아닌 단일 메시지를 발행하는 메서드다. 내부적으로 큐에 메시지를 추가하고 바로 리턴하는 방식으로 비동기적으로 호출된다.
메시지 배치 발행은 Producer.produceBatch를 적용해야 할 것으로 보인다.

/* kafka.Producer */
func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error {
	err := p.verifyClient()
	if err != nil {
		return err
	}
	return p.produce(msg, 0, deliveryChan)
}

func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event) error {
	if msg == nil || msg.TopicPartition.Topic == nil || len(*msg.TopicPartition.Topic) == 0 {
		return newErrorFromString(ErrInvalidArg, "")
	}

	crkt := p.handle.getRkt(*msg.TopicPartition.Topic)
	var valp []byte
	var keyp []byte
	oneByte := []byte{0}
	var valIsNull C.int
	var keyIsNull C.int
	var valLen int
	var keyLen int

	/* if msg.Value == nil {
        ...
    */

	/* if msg.Key == nil {
        ...
    */

	var cgoid int

	if deliveryChan != nil || msg.Opaque != nil {
        // cgoPut은 cgo map에 객체를 추가하고 고유 id를 반환한다.(아래)
		cgoid = p.handle.cgoPut(cgoDr{deliveryChan: deliveryChan, opaque: msg.Opaque})
	}

	var timestamp int64
	if !msg.Timestamp.IsZero() {
		timestamp = msg.Timestamp.UnixNano() / 1000000
	}

	var tmphdrs []C.tmphdr_t
	tmphdrsCnt := len(msg.Headers)

	if tmphdrsCnt > 0 {
		tmphdrs = make([]C.tmphdr_t, tmphdrsCnt)

		for n, hdr := range msg.Headers {
            // C.CString : go string을 *C.char로 반환, C heap에 생성되며 메모리 해제 필요
			tmphdrs[n].key = C.CString(hdr.Key) // copy
			if hdr.Value != nil {
				tmphdrs[n].size = C.ssize_t(len(hdr.Value))
				if tmphdrs[n].size > 0 {
                    // C.CBytes : []byte를 unsafe.Pointer로 반환, C array는 C heap에 생성되며 메모리 해제 필요
					tmphdrs[n].val = C.CBytes(hdr.Value)  // copy
				}
			} else {
				// null value
				tmphdrs[n].size = C.ssize_t(-1)
			}
		}
	} else {
		tmphdrs = []C.tmphdr_t
	}

    // unsafe Pointer는 Go에서 임의의 메모리 조작을 허용하는 것으로 C 포인터와 사용과 관련있다.
	cErr := C.do_produce(p.handle.rk, crkt,
		C.int32_t(msg.TopicPartition.Partition),
		C.int(msgFlags)|C.RD_KAFKA_MSG_F_COPY,
		valIsNull, unsafe.Pointer(&valp[0]), C.size_t(valLen),
		keyIsNull, unsafe.Pointer(&keyp[0]), C.size_t(keyLen),
		C.int64_t(timestamp),
		(*C.tmphdr_t)(unsafe.Pointer(&tmphdrs[0])), C.size_t(tmphdrsCnt),
		(C.uintptr_t)(cgoid))
	if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
		if cgoid != 0 {
			p.handle.cgoGet(cgoid)
		}
		return newError(cErr)
	}
	return nil
}

/* kafka.handle */
func (h *handle) cgoPut(cg cgoif) (cgoid int) {
	h.cgoLock.Lock()
	defer h.cgoLock.Unlock()

	h.cgoidNext++
	if h.cgoidNext == 0 {
		h.cgoidNext++
	}
	cgoid = (int)(h.cgoidNext)
	h.cgomap[cgoid] = cg
	return cgoid
}

Send message

kafka.KafkaProducer를 초기화할 때 메시지를 전송하기 위한 Sender 인스턴스를 생성하면서 비동기 요청 및 응답에 대응하는 네트워크 I/O를 위한 KafkaClient를 생성한다.

        client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
                                    wakeup_timeout_ms=self.config['max_block_ms'],
                                    **self.config)
        ...
        self._sender = Sender(client, self._metadata,
                              self._accumulator, self._metrics,
                              guarantee_message_order=guarantee_message_order,
                              **self.config)

KafkaClient는 특정 노드에 요청을 보내는 클래스이고, 실제 네트워크 I/O는 socket으로 데이터를 read/write하는 함수인 .poll() 호출로 발생한다.

KafkaClient.send 내부적으로 다음과 같이 명시되어 있다. 네트워크 I/O는 send_pending_requests를 호출하여 트리거하는데, 실제로는 send_pending_requests_v2를 실행하는 것으로 보인다.

        # conn.send will queue the request internally
        # we will need to call send_pending_requests()
        # to trigger network I/O
        future = conn.send(request, blocking=False)
        self._sending.add(conn)

send_pending_requests_v2는 non-blocking I/O를 통해 요청을 전송한다. non-block I/O 방식으로 처리가 완료되지 않으면 에러를 발생시켜 block 상태를 만들지 않는다.

 def send_pending_requests_v2(self):
        """Attempts to send pending requests messages via non-blocking IO
        If all requests have been sent, return True
        Otherwise, if the socket is blocked and there are more bytes to send,
        return False.
        """
        try:
            with self._lock:
                if not self._can_send_recv():
                    return False

                # _protocol.send_bytes()에서 인코딩된 요청을 반환하고, 실제로 _send_bytes()로 전송한다. 남은 바이트는 _send_buffer로 홀드
                if not self._send_buffer:
                    self._send_buffer = self._protocol.send_bytes()

                total_bytes = 0
                if self._send_buffer:
                    total_bytes = self._send_bytes(self._send_buffer)
                    self._send_buffer = self._send_buffer[total_bytes:]

            if self._sensors:
                self._sensors.bytes_sent.record(total_bytes)
            # Return True iff send buffer is empty
            return len(self._send_buffer) == 0

        except (ConnectionError, TimeoutError, Exception) as e:
            log.exception("Error sending request data to %s", self)
            error = Errors.KafkaConnectionError("%s: %s" % (self, e))
            self.close(error=error)
            return False

Reference

© 2025 Yujin Lee   •  Powered by Soopr   •  Theme  Moonwalk