[Python] Kafka offset 확인

2019. 8. 17. 16:03꿀팁 분석 환경 설정/Kafka

728x90

Producer

from time import sleep
from json import dumps
from kafka import KafkaProducer
import numpy as np

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         key_serializer = None ,
                         value_serializer=lambda x: 
                         dumps(x).encode('utf-8'))
## 
for _ in range(100):
    value = np.random.normal(loc = 10 , scale = 20 ,size= 3).astype(str).tolist()
    values = ",".join( value )
    data = {'number' : values }
    producer.send('numtest', value= data )
    sleep(0.1)

 

python-kafka에서는 Consumer에서 group_id를 이용하면 offset을 지정 가능하다.

 

실험 결과

1. Consumer를 껐다 켜었다 해도 되는지? -> 됨

2. group_id 다르게 하면? -> offset 다시 시작

3. groupd_id 다시 원래대로 하면? -> groupd_id에 맞게 다시 시작함

4. 같은 group_id 다른 topic일 때는? offset이 어떻게 될까? -> 메시지가 안 읽힘

5. 다시 원래 topic으로 보내면? -> offset 유지돼서 보내짐

 

 

from kafka import KafkaConsumer
from json import loads

print("Consumer 연결")
consumer = KafkaConsumer(
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='earliest',
     enable_auto_commit= True ,
     group_id='sr',
     value_deserializer=lambda x: loads(x.decode('utf-8')))

topic 확인

consumer.topics()
## {'my', 'my-topic', 'my_topic', 'numtest'}

사용할 topic 지정

consumer.subscribe("numtest")

 

어떤 토픽 사용할지?

consumer.subscription()

## {'numtest'}

메시지 읽기

for message in consumer:
    print ("topic=%s partition=%d offset=%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

메시지 보내면 다음과 같이 
마지막 offset = 40

조금 사이즈를 늘려서 다시 Producer에서 Topic으로 보내기

offset = 41  보내는 값이 다른 것을 알 수 있음!

Consumer 부분 껐다 켜도 같은 group_id 일 때는 유지가 될지?!

> 유지가 됨 (offset =93으로 들어온 순서 정보 가지고 있음 개수도 2개 다르게 수정된 거 방영됨)

같은 group_id 다른 topic이면?

다시 시작하는 것을 알 수 있음!

위에서는 topic numtest
지금은 topic numtest2

for _ in range(100):
    value = np.random.normal(loc = 10 , scale = 20 ,size= 2).astype(str).tolist()
    values = ",".join( value )
    data = {'number' : values }
    producer.send('numtest2', value= data )
    sleep(0.1)

다시 원래 topic(numtest)에서 보낸다면?

> offset 유지되면서 보내짐 다만 먼가 조금 찾느라고 시간이 걸리게 됨

 

(추가) 문서에는 multiple Consumer가 있어서 해보려고 하는데 실패

 

https://kafka-python.readthedocs.io/en/master/usage.html

 

Usage — kafka-python 1.4.7.dev documentation

© Copyright 2016 -- Dana Powers, David Arthur, and Contributors Revision 279a7dd8.

kafka-python.readthedocs.io

from kafka import KafkaConsumer
from json import loads
consumer1 = KafkaConsumer(
     bootstrap_servers=['localhost:9092'],
     #auto_offset_reset='earliest' , # 'earliest',
     #enable_auto_commit= False ,
     group_id='sr',
     value_deserializer=lambda x: loads(x.decode('utf-8')))
consumer1.subscribe("numtest")
consumer2 = KafkaConsumer(
     bootstrap_servers=['localhost:9092'],
     #auto_offset_reset='earliest' , # 'earliest',
     #enable_auto_commit= False ,
     group_id='sr',
     value_deserializer=lambda x: loads(x.decode('utf-8')))
consumer2.subscribe("numtest")

from time import sleep
from json import dumps
from kafka import KafkaProducer
import numpy as np

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         key_serializer = None ,                         
                         value_serializer=lambda x: 
                         dumps(x).encode('utf-8'))
def on_send_success(record_metadata):
    print("topic : {} , partition : {} , offset : {}".\
          format( record_metadata.topic , record_metadata.partition , record_metadata.offset))

for msg , message in zip(consumer1 , consumer2) :
    print("="*50)
    print ("topic=%s partition=%d offset=%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))
    print ("topic=%s partition=%d offset=%d: key=%s value=%s" % (msg.topic, msg.partition,
                                          msg.offset, msg.key,
                                          msg.value))
    key = str(message.offset) + " " + str(msg.offset)
    producer.send('output', value= {  key : key }  ).add_callback(on_send_success)
    print("="*50)

> 결과물이 보이지도 않고 send도 안됨!

728x90