[Python] Kafka offset 확인
2019. 8. 17. 16:03ㆍ꿀팁 분석 환경 설정/Kafka
Producer
from time import sleep
from json import dumps
from kafka import KafkaProducer
import numpy as np
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
key_serializer = None ,
value_serializer=lambda x:
dumps(x).encode('utf-8'))
##
for _ in range(100):
value = np.random.normal(loc = 10 , scale = 20 ,size= 3).astype(str).tolist()
values = ",".join( value )
data = {'number' : values }
producer.send('numtest', value= data )
sleep(0.1)
python-kafka에서는 Consumer에서 group_id를 이용하면 offset을 지정 가능하다.
실험 결과
1. Consumer를 껐다 켜었다 해도 되는지? -> 됨
2. group_id 다르게 하면? -> offset 다시 시작
3. groupd_id 다시 원래대로 하면? -> groupd_id에 맞게 다시 시작함
4. 같은 group_id 다른 topic일 때는? offset이 어떻게 될까? -> 메시지가 안 읽힘
5. 다시 원래 topic으로 보내면? -> offset 유지돼서 보내짐
from kafka import KafkaConsumer
from json import loads
print("Consumer 연결")
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit= True ,
group_id='sr',
value_deserializer=lambda x: loads(x.decode('utf-8')))
topic 확인
consumer.topics()
## {'my', 'my-topic', 'my_topic', 'numtest'}
사용할 topic 지정
consumer.subscribe("numtest")
어떤 토픽 사용할지?
consumer.subscription()
## {'numtest'}
메시지 읽기
for message in consumer:
print ("topic=%s partition=%d offset=%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
조금 사이즈를 늘려서 다시 Producer에서 Topic으로 보내기
Consumer 부분 껐다 켜도 같은 group_id 일 때는 유지가 될지?!
> 유지가 됨 (offset =93으로 들어온 순서 정보 가지고 있음 개수도 2개 다르게 수정된 거 방영됨)
같은 group_id 다른 topic이면?
다시 시작하는 것을 알 수 있음!
위에서는 topic numtest
지금은 topic numtest2
for _ in range(100):
value = np.random.normal(loc = 10 , scale = 20 ,size= 2).astype(str).tolist()
values = ",".join( value )
data = {'number' : values }
producer.send('numtest2', value= data )
sleep(0.1)
다시 원래 topic(numtest)에서 보낸다면?
> offset 유지되면서 보내짐 다만 먼가 조금 찾느라고 시간이 걸리게 됨
(추가) 문서에는 multiple Consumer가 있어서 해보려고 하는데 실패
https://kafka-python.readthedocs.io/en/master/usage.html
from kafka import KafkaConsumer
from json import loads
consumer1 = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
#auto_offset_reset='earliest' , # 'earliest',
#enable_auto_commit= False ,
group_id='sr',
value_deserializer=lambda x: loads(x.decode('utf-8')))
consumer1.subscribe("numtest")
consumer2 = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
#auto_offset_reset='earliest' , # 'earliest',
#enable_auto_commit= False ,
group_id='sr',
value_deserializer=lambda x: loads(x.decode('utf-8')))
consumer2.subscribe("numtest")
from time import sleep
from json import dumps
from kafka import KafkaProducer
import numpy as np
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
key_serializer = None ,
value_serializer=lambda x:
dumps(x).encode('utf-8'))
def on_send_success(record_metadata):
print("topic : {} , partition : {} , offset : {}".\
format( record_metadata.topic , record_metadata.partition , record_metadata.offset))
for msg , message in zip(consumer1 , consumer2) :
print("="*50)
print ("topic=%s partition=%d offset=%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
print ("topic=%s partition=%d offset=%d: key=%s value=%s" % (msg.topic, msg.partition,
msg.offset, msg.key,
msg.value))
key = str(message.offset) + " " + str(msg.offset)
producer.send('output', value= { key : key } ).add_callback(on_send_success)
print("="*50)
> 결과물이 보이지도 않고 send도 안됨!
728x90
'꿀팁 분석 환경 설정 > Kafka' 카테고리의 다른 글
kafka 자주 사용 명령어 모음 (0) | 2019.08.17 |
---|---|
[ Python ] kafka consume multiprocessing 해보기 (0) | 2019.08.17 |
카프카 데이터 플랫폼의 최강자 (1장 ~50pg) (0) | 2019.08.17 |
Celery란? 좋은 자료 (1) | 2019.08.10 |
python-Kafka Example (0) | 2019.08.06 |