[ Python ] Trying out Kafka consume with multiprocessing
2019. 8. 17. 17:10
https://stackoverflow.com/questions/46491616/python-kafka-multiprocess-vs-thread
Consuming from the same topic with consumers that share a group_id (offsets are tracked per group, so messages are read sequentially)
The benefit here should be faster message consumption, since the topic's partitions get divided among the consumers in the group?
from kafka import KafkaConsumer
from json import loads
from multiprocessing import Process


class KafkaWrapper():
    def __init__(self):
        self.consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                                      group_id='sr',
                                      value_deserializer=lambda x: loads(x.decode('utf-8')))

    def consume(self, topic):
        self.consumer.subscribe([topic])
        for message in self.consumer:
            print("topic=%s partition=%d offset=%d: key=%s value=%s" % (message.topic, message.partition,
                                                                        message.offset, message.key,
                                                                        message.value))


class ServiceInterface():
    def __init__(self):
        self.kafka_wrapper = KafkaWrapper()

    def start(self, topic):
        self.kafka_wrapper.consume(topic)


class ServiceA(ServiceInterface):
    pass


class ServiceB(ServiceInterface):
    pass


def main():
    serviceA = ServiceA()
    serviceB = ServiceB()
    jobs = []
    # The code works fine if I use threading.Thread here instead of Process
    jobs.append(Process(target=serviceA.start, args=("numtest",)))
    jobs.append(Process(target=serviceB.start, args=("numtest",)))
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()


if __name__ == "__main__":
    main()
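One thing to keep in mind: sharing a group_id only speeds things up if the topic has at least as many partitions as consumers; with a single partition, the second process just sits idle. A minimal sketch, assuming kafka-python's admin client is available (the two-partition count is just for this illustration), to create numtest accordingly:

from kafka.admin import KafkaAdminClient, NewTopic

# Create numtest with two partitions so that each consumer process in the
# 'sr' group can be assigned one partition.
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
admin.create_topics([NewTopic(name='numtest', num_partitions=2, replication_factor=1)])
admin.close()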
Reading data from different topics
from kafka import KafkaConsumer
from kafka import KafkaProducer
from json import loads
from json import dumps
from multiprocessing import Process


class KafkaWrapper():
    def __init__(self):
        self.producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                                      value_serializer=lambda x: dumps(x).encode('utf-8'))

    def on_send_success(self, record_metadata):
        print("topic : {} , partition : {} , offset : {}".format(
            record_metadata.topic, record_metadata.partition, record_metadata.offset))

    def consume(self, topic, group):
        consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                                 group_id=group,
                                 value_deserializer=lambda x: loads(x.decode('utf-8')))
        consumer.subscribe([topic])
        for message in consumer:
            print("topic=%s partition=%d offset=%d: key=%s value=%s" % (message.topic, message.partition,
                                                                        message.offset, message.key,
                                                                        message.value))
            data = {group: str(message.offset)}
            # self.producer.send(group + "A", value=data).add_callback(self.on_send_success)


class ServiceInterface():
    def __init__(self):
        self.kafka_wrapper = KafkaWrapper()

    def start(self, topic, group):
        self.kafka_wrapper.consume(topic, group)


class ServiceA(ServiceInterface):
    pass


class ServiceB(ServiceInterface):
    pass


def main():
    serviceA = ServiceA()
    serviceB = ServiceB()
    jobs = []
    # The code works fine if I use threading.Thread here instead of Process
    jobs.append(Process(target=serviceA.start, args=("my_topic", "SR1")))  # my_topic
    jobs.append(Process(target=serviceB.start, args=("my-topic", "SR2")))  # my-topic
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()


if __name__ == "__main__":
    main()
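To double-check what each process actually ends up owning, kafka-python exposes the current partition assignment. A small sketch (the helper below is hypothetical; assignment() stays empty until a poll() has triggered the group join):

from kafka import KafkaConsumer

# Hypothetical helper: show which topic-partitions this consumer owns.
def print_assignment(consumer):
    consumer.poll(timeout_ms=1000)  # the first poll triggers the group join
    for tp in consumer.assignment():
        print("assigned: topic=%s partition=%d" % (tp.topic, tp.partition))

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'], group_id='SR1')
consumer.subscribe(['my_topic'])
print_assignment(consumer)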
It looks like messages are just processed in the order they arrive.
from json import dumps
from kafka import KafkaProducer
import numpy as np

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

for i in range(200):
    if i % 2 == 0:
        # even iterations -> my_topic
        value = np.random.normal(loc=10, scale=20, size=5).astype(str).tolist()
        values = ",".join(value)
        data = {'number_1': values}
        producer.send('my_topic', value=data)
    else:
        # odd iterations -> my-topic
        value = np.random.normal(loc=0, scale=10, size=5).astype(str).tolist()
        values = ",".join(value)
        data = {'number_2': values}
        producer.send('my-topic', value=data)

# send() is asynchronous: flush so nothing is lost when the script exits
producer.flush()
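Note that producer.send() is asynchronous and returns a future; to confirm that a particular message actually landed (and at which offset), the future can be awaited synchronously. A minimal sketch reusing the same serializer setup (the sample payload is made up):

from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

# Block on the returned future; raises a KafkaError if the send failed.
future = producer.send('my_topic', value={'number_1': '1.0,2.0'})
metadata = future.get(timeout=10)
print("delivered to %s[%d] at offset %d" % (metadata.topic, metadata.partition, metadata.offset))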
But producing the message back out again after processing it didn't work at first.........ㅠ
It works if the producer is defined inside consume instead: the producer created in __init__ belongs to the parent process, and its network connections and background I/O thread presumably don't survive the handoff to a child Process.
from kafka import KafkaConsumer
from kafka import KafkaProducer
from json import loads
from json import dumps
from multiprocessing import Process


def on_send_success(record_metadata):
    print("topic : {} , partition : {} , offset : {}".format(
        record_metadata.topic, record_metadata.partition, record_metadata.offset))


class KafkaWrapper():
    def consume(self, topic, group):
        consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                                 group_id=group,
                                 value_deserializer=lambda x: loads(x.decode('utf-8')))
        consumer.subscribe([topic])
        # Define the producer here, inside the child process, rather than in
        # __init__ (which runs in the parent before the Process is started).
        producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                                 value_serializer=lambda x: dumps(x).encode('utf-8'))
        for message in consumer:
            print("topic=%s partition=%d offset=%d: key=%s value=%s" % (message.topic, message.partition,
                                                                        message.offset, message.key,
                                                                        message.value))
            data = {group: str(message.offset)}
            producer.send("result", value=data).add_callback(on_send_success)


class ServiceInterface():
    def __init__(self):
        self.kafka_wrapper = KafkaWrapper()

    def start(self, topic, group):
        self.kafka_wrapper.consume(topic, group)


class ServiceA(ServiceInterface):
    pass


class ServiceB(ServiceInterface):
    pass


def main():
    serviceA = ServiceA()
    serviceB = ServiceB()
    jobs = []
    # The code works fine if I use threading.Thread here instead of Process
    jobs.append(Process(target=serviceA.start, args=("my_topic", "SR1")))   # my_topic
    jobs.append(Process(target=serviceB.start, args=("my-topic2", "SR3")))  # my-topic2
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()


if __name__ == "__main__":
    main()
Hmm, as for how to put this to use: when there are several models, give each one its own topic and its own group_id, so each model has its offsets tracked and pulls its messages in order.
Then the results produced this way could be read from Java?
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic result
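The same check works from Python instead of the console consumer; a rough equivalent (the group id here is hypothetical, and auto_offset_reset='earliest' mirrors --from-beginning):

from json import loads
from kafka import KafkaConsumer

# Read the result topic from the beginning, like the console consumer above.
checker = KafkaConsumer('result',
                        bootstrap_servers=['localhost:9092'],
                        group_id='result-checker',
                        auto_offset_reset='earliest',
                        value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in checker:
    print(message.value)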