[Python] confluent-kafka 연습하기

2019. 9. 3. 17:42 · 꿀팁 분석 환경 설정/Kafka

## Producer 1 (topic odd)

from time import sleep

from confluent_kafka import Producer
import numpy as np

# Single-broker producer config; 'localhost' is given without an explicit port.
p = Producer({
    'bootstrap.servers': 'localhost',
})

def delivery_report(err, msg):
    """Print the outcome of one produced message.

    Passed as the ``callback`` of ``Producer.produce``; it is triggered from
    ``poll()`` or ``flush()``.

    Args:
        err: ``None`` when delivery succeeded, otherwise the delivery error.
        msg: The message; only ``topic()`` and ``partition()`` are read, and
            only on success.
    """
    if err is None:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
        return
    print('Message delivery failed: {}'.format(err))

idx = 0
try:
    while True:
        # 100 samples from a normal distribution (loc=10, scale=20), sent as
        # a single comma-separated UTF-8 string.
        value = np.random.normal(loc=10, scale=20, size=100).astype(str).tolist()
        values = ",".join(value)
        # Serve delivery callbacks for earlier produce() calls without blocking.
        p.poll(0)
        # Asynchronous; delivery_report fires later from poll() or flush().
        p.produce('odd', values.encode('utf-8'), callback=delivery_report)
        # Requires `from time import sleep` at the top of this snippet.
        sleep(1.0)
        # Pause for Enter after the first message and every 100th one after it.
        if idx % 100 == 0:
            input("hi")
        idx += 1
finally:
    # After a bare `while True` this was unreachable; in `finally` it runs when
    # the loop is interrupted (e.g. Ctrl+C) so still-queued messages are sent.
    p.flush()

## Producer 2 (topic even)

from confluent_kafka import Producer
import numpy as np
from time import sleep
# Single-broker producer config; 'localhost' is given without an explicit port.
p = Producer({
    'bootstrap.servers': 'localhost',
})

def delivery_report(err, msg):
    """Print a one-line delivery result for a produced message.

    Used as the ``callback`` argument of ``Producer.produce``; it is triggered
    from ``poll()`` or ``flush()``.

    Args:
        err: Falsy-``None`` on success; otherwise the error that is printed.
        msg: The message whose ``topic()`` and ``partition()`` are shown on
            success.
    """
    failed = err is not None
    if failed:
        line = 'Message delivery failed: {}'.format(err)
    else:
        line = 'Message delivered to {} [{}]'.format(msg.topic(), msg.partition())
    print(line)

idx = 0
try:
    while True:
        # 100 samples from a normal distribution (loc=10, scale=20), sent as
        # a single comma-separated UTF-8 string.
        value = np.random.normal(loc=10, scale=20, size=100).astype(str).tolist()
        values = ",".join(value)
        # Trigger any available delivery report callbacks from previous
        # produce() calls (timeout 0: never blocks).
        p.poll(0)
        # Asynchronously produce a message; delivery_report is triggered from
        # poll() above or flush() below once the message is delivered or has
        # failed permanently.
        p.produce('even', values.encode('utf-8'), callback=delivery_report)
        sleep(2.0)
        # Pause for Enter after the first message and every 100th one after it.
        if idx % 100 == 0:
            input("hi")
        idx += 1
finally:
    # Wait for outstanding messages to be delivered and their callbacks to run.
    # After a bare `while True` this was unreachable; in `finally` it runs when
    # the loop is interrupted (e.g. Ctrl+C).
    p.flush()

## Consumer

even, odd 두 topic을 받은 다음 multithreading(스레드 3개)으로 작업을 처리하고, queue를 이용해서 쌓인 결과를 처리되게 하기

from confluent_kafka import Producer
from confluent_kafka import Consumer, KafkaError
import numpy as np
from confluent_kafka import TopicPartition ,libversion
import confluent_kafka
from confluent_kafka import OFFSET_BEGINNING  , OFFSET_END
import threading
import time
try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty

def my_assign(consumer, partitions):
    """``on_assign`` callback: start every assigned partition at its end offset.

    Each partition's ``offset`` is overwritten in place with
    ``confluent_kafka.OFFSET_END`` before the list is printed and assigned.
    Only messages produced after assignment are therefore read.

    Args:
        consumer: The consumer whose subscription triggered the callback.
        partitions: Partitions handed to the callback (``TopicPartition``
            objects per the confluent-kafka ``on_assign`` contract).
    """
    end_offset = confluent_kafka.OFFSET_END
    for tp in partitions:
        tp.offset = end_offset
    print('assign', partitions)
    consumer.assign(partitions)
    
class IntendedException(Exception):
    """Plain ``Exception`` subclass with no extra behavior.

    Presumably meant for deliberately triggered failures, going by its name.
    """

def _report_message(myid, m, q):
    """Print the mean of one message's float payload and queue ``"<topic> <offset>"``.

    Messages with a null or unparsable payload are reported and skipped
    instead of raising, which previously terminated the consumer thread.
    """
    raw = m.value()
    if raw is None:
        print('id : {} topic : {} skipped empty payload'.format(str(myid), m.topic()))
        return
    try:
        values = [float(x) for x in raw.decode('utf-8').split(",")]
    except ValueError as exc:  # also covers UnicodeDecodeError (a ValueError)
        print('id : {} topic : {} skipped bad payload : {}'.format(str(myid), m.topic(), exc))
        return
    print('id : {} topic : {} mean : {}'.format(str(myid), m.topic(), np.mean(values)))
    q.put(m.topic() + " " + str(m.offset()))
    print(q.qsize())


def _drain_queue(q):
    """Print and remove every item currently in ``q``.

    Stops on ``Empty`` rather than trusting ``qsize()``, so a concurrent drain
    by another thread cannot make this raise. Nothing is discarded without
    being printed: the old ``q.queue.clear()`` could drop entries queued after
    the loop.
    """
    while True:
        try:
            item = q.get_nowait()
        except Empty:
            return
        print(item)


def thread_run(myid, p, q, stop_event=None, batch_size=3):
    """Consume the ``even`` and ``odd`` topics and report batches via a shared queue.

    Each call creates its own ``Consumer`` whose ``group.id`` is ``str(myid)``.
    Threads with different ids therefore each receive every message. For each
    message the mean of its comma-separated float payload is printed, and
    ``"<topic> <offset>"`` is pushed onto ``q``. Once ``q`` holds
    ``batch_size`` entries, this thread drains and prints them.

    Args:
        myid: Thread identifier; also used as the consumer group id.
        p: Unused; kept so existing callers passing a Producer still work.
        q: Queue shared by all consumer threads (``put``/``get_nowait``).
        stop_event: Optional ``threading.Event``-like object. When
            ``is_set()`` returns true, the loop exits and the consumer is
            closed. Without it the thread runs forever, as before.
        batch_size: Queue size that triggers a drain (default 3).
    """
    c = Consumer({
        'bootstrap.servers': 'localhost',
        'group.id': str(myid),
        "enable.auto.commit": False,
        'auto.offset.reset': 'earliest'})
    # Note: my_assign sets each assigned partition's offset to OFFSET_END, so
    # consumption starts from the end rather than per 'auto.offset.reset'.
    c.subscribe(["even", "odd"], on_assign=my_assign)
    try:
        while stop_event is None or not stop_event.is_set():
            m = c.poll(1)
            if m is None:
                continue

            if m.error() is not None:
                # Previously dropped silently.
                print('id : {} error : {}'.format(str(myid), m.error()))
            else:
                _report_message(myid, m, q)

            if q.qsize() >= batch_size:
                _drain_queue(q)
    finally:
        # The consumer was never closed before; close it on exit or error.
        c.close()

    print(myid, 'Done')
    

def test_thread_safety():
    """Run three consumer threads (ids 1..3) that share one bounded queue.

    Starts ``thread_run`` for each id with a shared ``Queue(maxsize=3)`` and
    waits for them. ``thread_run`` as written loops until stopped, so this
    normally blocks until interrupted. The workers are daemon threads, so
    Ctrl+C on the main thread actually ends the process.

    If the workers do return, any entries still left in the queue are printed
    and counted. The queue holds ``"<topic> <offset>"`` strings, not
    per-thread success markers, so the count says nothing about how many
    threads succeeded. The former ``cnt != len(threads)`` check was therefore
    meaningless and has been dropped.
    """
    q = Queue(maxsize=3)
    print("Queue maxsize", q.maxsize)
    # Passed through as thread_run's `p` argument; thread_run does not use it.
    p = Producer({'socket.timeout.ms': 10,
                  'message.timeout.ms': 10})

    threads = []
    for i in range(1, 4):
        thr = threading.Thread(target=thread_run, name=str(i), args=[i, p, q])
        # Non-daemon workers that never return would keep the interpreter
        # alive after Ctrl+C interrupts join() below.
        thr.daemon = True
        thr.start()
        threads.append(thr)

    for thr in threads:
        thr.join()

    # Print whatever the workers left behind (less than a full batch).
    leftover = 0
    while True:
        try:
            print(q.get_nowait())
        except Empty:
            break
        leftover += 1
    print('Leftover queue entries:', leftover)

    print('Done')


if __name__ == "__main__":
    # Run the consumer demo only when executed as a script, not on import.
    test_thread_safety()
728x90