Kafka MultiProcessing Queue Test

## Producer

from json import dumps
import logging

import numpy as np
from kafka import KafkaProducer

log = logging.getLogger(__name__)


def on_send_success(record_metadata):
    print("topic : {} , partition : {} , offset : {}".format(
        record_metadata.topic, record_metadata.partition, record_metadata.offset))


def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)


producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         key_serializer=None,
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

for i in range(200):
    value = np.random.normal(loc=0, scale=2, size=7).astype(str).tolist()
    values = ",".join(value)
    data = {'number': values}  # built as an alternative payload; str(i) is what actually gets sent below
    producer.send('test', value=str(i)) \
        .add_callback(on_send_success) \
        .add_errback(on_send_error)
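
One general kafka-python note (not part of the original script): send() only hands the record to a background sender thread, so if the process exits right after the loop, buffered records may never reach the broker. Flushing before exit makes the example self-contained:

producer.flush()   # block until every buffered record has been sent (or has failed)
producer.close()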

## Consumer

import multiprocessing
import signal
from multiprocessing import Queue
from time import sleep
from json import loads

from kafka import KafkaConsumer
from kafka import TopicPartition

class QueueConsumer(multiprocessing.Process):
    def __init__(self, topic, partition, group_id, result_queue, result_queue_max):
        super(QueueConsumer, self).__init__()
        self.topic = topic
        self.partition = partition
        self.group_id = group_id
        self.result_queue = result_queue
        self.result_queue_max = result_queue_max
        # multiprocessing.Event so that stop() called in the parent is visible inside the child process
        self._stop_event = multiprocessing.Event()

    def stop(self):
        self._stop_event.set()

    def run(self):
        # create the KafkaConsumer inside the child process (after the fork),
        # so the broker connection is not shared with the parent
        consumer = KafkaConsumer(
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='earliest',
            group_id=self.group_id,
            enable_auto_commit=True,
            value_deserializer=lambda x: loads(x.decode('utf-8')))
        consumer.assign([TopicPartition(self.topic, self.partition)])

        num_msg = 0
        announced = False
        msg = None
        while not self._stop_event.is_set():
            if self.result_queue.qsize() < self.result_queue_max:
                msg = next(consumer)
                print("partition : ", self.partition, "msg : ", msg.value)
                self.result_queue.put(msg)
                num_msg += 1
                announced = False
            else:
                if not announced and msg is not None:
                    print("consumer paused, partition : {} offset : {}".format(self.partition, msg.offset))
                    print("qsize : {} max size : {}".format(self.result_queue.qsize(), self.result_queue_max))
                    announced = True
                sleep(0.1)  # avoid busy-waiting while the queue is full
consumers = []
partitions = 10
topic = "test"
group_id = "test"
maxsize = 10
queue = Queue(maxsize=maxsize)

for partition in range(partitions):
    consumer = QueueConsumer(topic, partition, group_id, queue, maxsize)
    consumer.daemon = True
    consumers.append(consumer)

for consumer in consumers:
    consumer.start()


def finish():
    for consumer in consumers:
        consumer.stop()


def stop_handler(signum, frame):  # SIGINT / SIGTERM handler: ask every consumer to stop
    finish()


signal.signal(signal.SIGINT, stop_handler)
signal.signal(signal.SIGTERM, stop_handler)

Q : I set the Queue maxsize to 10, so why do 18 messages end up being printed?
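
My guess at what is going on (not verified, just reasoning from the code above): multiprocessing.Queue.qsize() is only an approximation (and not even implemented on macOS), the 10 consumer processes race between the qsize() check and put(), and every process prints the message before put() blocks. So even with maxsize=10, up to roughly maxsize + number-of-processes messages can get printed. The sketch below reproduces the effect without Kafka; all the names in it are made up for illustration.

# Minimal sketch (no Kafka) of the race: many processes check qsize() and then put(),
# so far more items get printed than the queue can hold at any one time.
from multiprocessing import Process, Queue
from time import sleep

MAXSIZE = 10
N_WORKERS = 10

def worker(wid, q):
    for i in range(5):
        while q.qsize() >= MAXSIZE:           # racy check, mirrors the consumer loop above
            sleep(0.01)
        item = "{}-{}".format(wid, i)
        print("printed before put :", item)   # printed even though put() may still block
        q.put(item)                           # blocks only once the queue really holds MAXSIZE items

if __name__ == "__main__":
    q = Queue(maxsize=MAXSIZE)
    workers = [Process(target=worker, args=(wid, q), daemon=True) for wid in range(N_WORKERS)]
    for p in workers:
        p.start()
    sleep(1)
    print("items actually in the queue :", q.qsize())  # stays near MAXSIZE, while many more lines were printed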

idx = 0
while True:
    msg = queue.get()
    print(idx, msg.value)
    idx += 1

I expected the results to pile up in the queue nicely in order (1, 2, 3, ...) and come out that way, but that doesn't seem to happen....

What I want is to use several partitions for faster processing, have each partition push its results into the Queue in order, and then emit them in order, but that doesn't seem to work... ㅠㅠ
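
For what it's worth, Kafka only guarantees ordering inside a single partition, and the producer above sends without a key, so the 200 records are spread across the 10 partitions; the 10 consumer processes then race to put() into one shared Queue, so the global order is lost twice. A single partition would preserve order but give up the parallelism. One alternative (a sketch under assumptions, not something tested in this post) is to put a sequence number into each message on the producer side, e.g. producer.send('test', value={'seq': i, 'number': values}), and reorder on the consumer side:

# Sketch: rebuild a global order from out-of-order messages using a 'seq' field.
# Assumes every message value is a dict like {'seq': <int>, 'number': <str>},
# which the producer above does NOT currently send - it is a hypothetical payload.
import heapq

def ordered_drain(result_queue):
    heap = []        # (seq, value) pairs that arrived early and are waiting for their turn
    next_seq = 0     # the sequence number we expect to emit next
    while True:
        msg = result_queue.get()
        heapq.heappush(heap, (msg.value['seq'], msg.value))
        # emit everything that is now contiguous starting at next_seq
        while heap and heap[0][0] == next_seq:
            seq, value = heapq.heappop(heap)
            print(seq, value)
            next_seq += 1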
