[ Python ] Trying out Kafka consume with multiprocessing


Reference: https://stackoverflow.com/questions/46491616/python-kafka-multiprocess-vs-thread

"Python Kafka multiprocess vs thread" (stackoverflow.com): "I can use KafkaConsumer to consume messages in separate threads. However, when I use multiprocessing.Process instead of threading.Thread, I get an error: OSError: [Errno 9] Bad file descriptor ..."

 

Consuming from the same topic with consumers that share the same group_id (offsets are handed out sequentially within the group).

What does this buy you? Reading messages faster: consumers in one group split the topic's partitions between them, so consumption runs in parallel across processes. (With a single-partition topic, only one member of the group actually receives messages, so there is no speedup.)

from json import loads

from kafka import KafkaConsumer


class KafkaWrapper():
    def __init__(self):
        # Note: this consumer is created in the parent process. That is the
        # setup the Stack Overflow question reports as failing with OSError
        # once consuming happens inside a multiprocessing.Process.
        self.consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                                      group_id='sr',
                                      value_deserializer=lambda x: loads(x.decode('utf-8')))

    def consume(self, topic):
        self.consumer.subscribe([topic])
        for message in self.consumer:
            print("topic=%s partition=%d offset=%d: key=%s value=%s"
                  % (message.topic, message.partition,
                     message.offset, message.key, message.value))
            
class ServiceInterface():
    def __init__(self):
        self.kafka_wrapper = KafkaWrapper()

    def start(self, topic):
        self.kafka_wrapper.consume(topic)
class ServiceA(ServiceInterface):
    pass

class ServiceB(ServiceInterface):
    pass
    
from multiprocessing import Process
def main():
    serviceA = ServiceA()
    serviceB = ServiceB()

    jobs = []
    # The original Stack Overflow code only works with threading.Thread here;
    # with multiprocessing.Process the consumer created before the fork fails
    # (see the note after this example).
    jobs.append(Process(target=serviceA.start, args=("numtest",)))
    jobs.append(Process(target=serviceB.start, args=("numtest",)))

    for job in jobs:
        job.start()

    for job in jobs:
        job.join()
if __name__ == "__main__":
    main()
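
Why the Process version trips over OSError: [Errno 9] Bad file descriptor: KafkaConsumer holds open sockets, and sockets created in the parent process do not survive being shared with forked children. The usual fix, a minimal sketch of my reading of the linked question rather than code from the post, is to build the consumer inside the child process:

from json import loads
from multiprocessing import Process

from kafka import KafkaConsumer


def consume(topic, group):
    # Build the consumer *inside* the child process, after the fork,
    # so every process owns its own sockets.
    consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                             group_id=group,
                             value_deserializer=lambda x: loads(x.decode('utf-8')))
    consumer.subscribe([topic])
    for message in consumer:
        print("topic=%s partition=%d offset=%d"
              % (message.topic, message.partition, message.offset))


if __name__ == "__main__":
    jobs = [Process(target=consume, args=("numtest", "sr")) for _ in range(2)]
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()

This is exactly the pattern the next example follows: only the producer still lives in the parent process there, which is what bites later on.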

Reading data from two different topics

from json import loads, dumps

from kafka import KafkaConsumer, KafkaProducer


class KafkaWrapper():
    def __init__(self):
        # Created in the parent process; as the note further down explains,
        # sending with this producer from inside a child process fails.
        self.producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                                      value_serializer=lambda x: dumps(x).encode('utf-8'))

    def on_send_success(self, record_metadata):
        print("topic : {} , partition : {} , offset : {}".format(
            record_metadata.topic, record_metadata.partition, record_metadata.offset))

    def consume(self, topic, group):
        # The consumer, by contrast, is created here, inside the child process.
        consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                                 group_id=group,
                                 value_deserializer=lambda x: loads(x.decode('utf-8')))
        consumer.subscribe([topic])
        for message in consumer:
            print("topic=%s partition=%d offset=%d: key=%s value=%s"
                  % (message.topic, message.partition,
                     message.offset, message.key, message.value))
            data = {group: str(message.offset)}
            # self.producer.send(group + "A", value=data) \
            #     .add_callback(self.on_send_success)
            
class ServiceInterface():
    def __init__(self):
        self.kafka_wrapper = KafkaWrapper()

    def start(self, topic , group):
        self.kafka_wrapper.consume(topic , group)
class ServiceA(ServiceInterface):
    pass

class ServiceB(ServiceInterface):
    pass
    
from multiprocessing import Process
def main():
    serviceA = ServiceA()
    serviceB = ServiceB()

    jobs = []
    # One process per (topic, group_id) pair; note that my_topic and my-topic
    # are two different topics.
    jobs.append(Process(target=serviceA.start, args=("my_topic", "SR1",)))  # my_topic
    jobs.append(Process(target=serviceB.start, args=("my-topic", "SR2",)))  # my-topic

    for job in jobs:
        job.start()
        

    for job in jobs:
        job.join()
if __name__ == "__main__":
    main()
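
For this to consume anything, both my_topic and my-topic have to exist (underscore vs. hyphen: two different topics). A sketch for creating them from Python with kafka-python's admin client; the single partition and replication factor 1 are assumptions for a local one-broker setup:

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
# Assumed settings for a local, single-broker test cluster.
admin.create_topics([
    NewTopic(name='my_topic', num_partitions=1, replication_factor=1),
    NewTopic(name='my-topic', num_partitions=1, replication_factor=1),
])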

When messages have already piled up in the topic

 

When the data coming in in real time has an order,

it looks like it is simply processed in arrival order (see the note after the code below for what Kafka actually guarantees).

from json import dumps

import numpy as np
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

for i in range(200):
    if i % 2 == 0:
        # Even messages: five N(10, 20) samples, sent to my_topic
        value = np.random.normal(loc=10, scale=20, size=5).astype(str).tolist()
        data = {'number_1': ",".join(value)}
        producer.send('my_topic', value=data)
    else:
        # Odd messages: five N(0, 10) samples, sent to my-topic
        value = np.random.normal(loc=0, scale=10, size=5).astype(str).tolist()
        data = {'number_2': ",".join(value)}
        producer.send('my-topic', value=data)

# send() is asynchronous; flush so nothing is lost when the script exits.
producer.flush()
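
A note on that ordering observation: Kafka only guarantees order within a single partition. With single-partition topics (the default for auto-created topics on a stock local install), arrival order is preserved; on a multi-partition topic you would send with a key, since all messages sharing a key land in the same partition. A sketch, where the key_serializer and the key name are my assumptions, not from the post:

from json import dumps

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         key_serializer=lambda k: k.encode('utf-8'),
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

# Every message with key 'number_1' goes to the same partition, so the
# relative order of these messages survives even with many partitions.
producer.send('my_topic', key='number_1', value={'number_1': '1.0,2.0'})
producer.flush()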
    

 

But at this stage, taking a consumed message and producing it back out with Producer.send still didn't work.........ㅠ

It works once the Producer is defined inside: created in the child process just like the consumer, rather than in the parent before the fork.

from json import loads, dumps

from kafka import KafkaConsumer, KafkaProducer


def on_send_success(record_metadata):
    print("topic : {} , partition : {} , offset : {}".format(
        record_metadata.topic, record_metadata.partition, record_metadata.offset))


class KafkaWrapper():
    def __init__(self):
        # Deliberately empty: a producer created here would belong to the
        # parent process and break after the fork.
        pass

    def consume(self, topic, group):
        # Both clients are created inside the child process.
        consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                                 group_id=group,
                                 value_deserializer=lambda x: loads(x.decode('utf-8')))
        # Create the producer once per process rather than once per message.
        producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                                 value_serializer=lambda x: dumps(x).encode('utf-8'))
        consumer.subscribe([topic])
        for message in consumer:
            print("topic=%s partition=%d offset=%d: key=%s value=%s"
                  % (message.topic, message.partition,
                     message.offset, message.key, message.value))
            data = {group: str(message.offset)}
            producer.send("result", value=data).add_callback(on_send_success)
            
class ServiceInterface():
    def __init__(self):
        self.kafka_wrapper = KafkaWrapper()

    def start(self, topic , group):
        self.kafka_wrapper.consume(topic , group)
class ServiceA(ServiceInterface):
    pass

class ServiceB(ServiceInterface):
    pass
    
from multiprocessing import Process
def main():
    serviceA = ServiceA()
    serviceB = ServiceB()

    jobs = []
    # Each process consumes its own topic under its own group_id.
    jobs.append(Process(target=serviceA.start, args=("my_topic", "SR1",)))   # my_topic
    jobs.append(Process(target=serviceB.start, args=("my-topic2", "SR3",)))  # my-topic2

    for job in jobs:
        job.start()
        

    for job in jobs:
        job.join()
if __name__ == "__main__":
    main()

Hmm, as for how to put this to use: when there are several models, give each one its own topic and its own group_id, so every model pulls its stream sequentially with its offsets guaranteed.
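
A minimal sketch of that layout, reusing ServiceInterface from the example above; the topic and group names are placeholders. One process per model, each (topic, group_id) pair independent of the others:

from multiprocessing import Process

# Hypothetical (topic, group_id) assignments, one per model.
MODEL_STREAMS = [
    ("my_topic", "SR1"),
    ("my-topic2", "SR3"),
]

def main():
    jobs = []
    for topic, group in MODEL_STREAMS:
        # ServiceInterface is the class defined above; every process gets
        # its own topic and group_id, hence its own committed offsets.
        service = ServiceInterface()
        jobs.append(Process(target=service.start, args=(topic, group)))
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()

if __name__ == "__main__":
    main()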

 

It seems like the results produced this way could then just be read from Java?

 

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic result
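
The same check from Python, for reference; consumer_timeout_ms is my addition so the loop exits once the topic is drained instead of blocking forever:

from json import loads

from kafka import KafkaConsumer

# Read the "result" topic from the beginning and stop after five seconds
# with no new messages (consumer_timeout_ms).
consumer = KafkaConsumer('result',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=5000,
                         value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
    print(message.offset, message.value)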
