Kafka MultiProcessing Queue Test
데이터분석뉴비
2019. 10. 21. 00:25
## Producer
from time import sleep
from json import dumps
import logging

import numpy as np
from kafka import KafkaProducer

log = logging.getLogger(__name__)

def on_send_success(record_metadata):
    print("topic : {} , partition : {} , offset : {}".format(
        record_metadata.topic, record_metadata.partition, record_metadata.offset))

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         key_serializer=None,
                         value_serializer=lambda x: dumps(x).encode('utf-8'))
##
for i in range(200):
    value = np.random.normal(loc=0, scale=2, size=7).astype(str).tolist()
    values = ",".join(value)
    data = {'number': values}  # built here but not sent; str(i) goes out instead
    producer.send('test', value=str(i)).\
        add_callback(on_send_success).add_errback(on_send_error)
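One small note I'd add here (not part of the original test): KafkaProducer.send() is asynchronous, so if the script exits right after the loop, records still sitting in the client buffer may never reach the broker. Flushing at the end avoids that:

# make sure all 200 buffered records are actually delivered before exiting
producer.flush()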
## Consumer
from kafka import KafkaConsumer
from kafka import TopicPartition
from multiprocessing import Process, Queue, Array, Value
import multiprocessing
from time import sleep
from json import loads
class QueueConsumer(multiprocessing.Process):
    def __init__(self, topic, partition, group_id, result_queue, result_queue_max):
        super(QueueConsumer, self).__init__()
        # note: the KafkaConsumer is created in the parent process here, so this
        # relies on the 'fork' start method (Linux default) to hand it to the child
        self.consumer = KafkaConsumer(
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='earliest',
            group_id=group_id,
            enable_auto_commit=True,
            value_deserializer=lambda x: loads(x.decode('utf-8')))
        self.partition = partition
        self._tp = TopicPartition(topic, partition)
        self.consumer.assign([self._tp])
        self.result_queue = result_queue
        self.result_queue_max = result_queue_max
        # a plain boolean set from the parent would not be visible inside the child
        # process, so use a shared Event as the stop flag
        self._stop = multiprocessing.Event()

    def stop(self):
        self._stop.set()

    def run(self):
        num_msg = 0
        it = 0
        msg = None
        while not self._stop.is_set():
            if self.result_queue.qsize() < self.result_queue_max:
                msg = next(self.consumer)
                print("partition : ", self.partition, "msg : ", msg.value)
                self.result_queue.put(msg)
                num_msg += 1
            else:
                if it == 0 and msg is not None:
                    print("consumer finished partition : {} offset : {}".format(self.partition, msg.offset))
                    print("qsize : {} max size : {}".format(self.result_queue.qsize(), self.result_queue_max))
                it += 1
consumers = []
partitions = 10
topic = "test"
group_id = "test"
import numpy as np
maxsize = 10
queue = Queue(maxsize=maxsize)
for partition in np.arange(partitions):
    consumer = QueueConsumer(topic, partition, group_id, queue, maxsize)
    consumer.daemon = True
    consumers.append(consumer)
for consumer in consumers:
    consumer.start()
def finish():
    for consumer in consumers:
        consumer.stop()

import signal

def stop_handler(signal, frame):  # define a SIGINT handler
    finish()

signal.signal(signal.SIGINT, stop_handler)
signal.signal(signal.SIGTERM, stop_handler)
Q: I set the Queue maxsize to 10, so why do 18 messages end up being printed? (See the sketch right below for my best guess at an explanation.)
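My guess (not something the original test verified): multiprocessing.Queue.put() only blocks once the queue is already full, and each consumer process prints *before* it calls put(), so with 10 worker processes up to roughly maxsize + 10 printed lines can appear; qsize() is also only an approximation across processes, so the qsize() < max check can pass in several workers at once. A minimal sketch of the same effect without Kafka (the worker() name is mine):

# minimal sketch (assumed names, no Kafka): each worker prints BEFORE its put(),
# and put() only blocks once the queue already holds `maxsize` items, so with
# 10 workers the number of printed lines can exceed maxsize.
from multiprocessing import Process, Queue

def worker(q, wid):
    for i in range(5):
        print("worker", wid, "about to put", i)  # runs even when the queue is already full
        q.put((wid, i))                          # blocks here once the queue holds 10 items

if __name__ == "__main__":
    q = Queue(maxsize=10)
    workers = [Process(target=worker, args=(q, w), daemon=True) for w in range(10)]
    for w in workers:
        w.start()
    # nothing is drained: the queue caps at 10 items, yet more than 10 "about to put" lines print
    for w in workers:
        w.join(timeout=2)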
idx = 0
while True:
    msg = queue.get()
    print(idx, msg.value)
    idx += 1
I expected the results to pile up in the queue in order (1, 2, 3, ...) and come back out in that order, but that doesn't seem to happen....
What I want is to use several partitions so processing is fast, have each partition push its results into the Queue in order, and then pop them back out in overall order, but that doesn't seem to work either... ㅠㅠ
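One way this could be made to work (my sketch, not something the post tested): since the producer above sends the loop index str(i) as the value, that index can serve as a global sequence number, and the drain loop can hold back early arrivals in a heap until the next expected number shows up. The ordered_drain name is mine:

# sketch assuming every message's value is its global sequence number
# (the producer above sends str(i)); the helper name ordered_drain is made up
import heapq

def ordered_drain(queue):
    expected = 0
    pending = []  # min-heap of (seq, msg) pairs that arrived ahead of their turn
    while True:
        msg = queue.get()
        heapq.heappush(pending, (int(msg.value), msg))
        # emit everything that is now contiguous with the expected sequence number
        while pending and pending[0][0] == expected:
            _, ready = heapq.heappop(pending)
            yield ready
            expected += 1

# usage, in place of the plain while-loop above:
# for idx, msg in enumerate(ordered_drain(queue)):
#     print(idx, msg.value)

The caveat is that this only re-sequences by a number the producer put into the message; Kafka itself only guarantees ordering within a single partition, so without such a key there is no global order to recover across 10 partitions.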