[Python] Practicing confluent-kafka
## Producer 1 (topic odd)
```python
from confluent_kafka import Producer
from time import sleep
import numpy as np

p = Producer({'bootstrap.servers': 'localhost'})

def delivery_report(err, msg):
    """Called once for each message produced to indicate the delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

idx = 0
while True:
    # Build a comma-separated string of 100 random normal samples.
    value = np.random.normal(loc=10, scale=20, size=100).astype(str).tolist()
    values = ",".join(value)
    # Trigger delivery report callbacks from previous produce() calls.
    p.poll(0)
    # Asynchronously produce one message to the 'odd' topic.
    p.produce('odd', values.encode('utf-8'), callback=delivery_report)
    sleep(1.0)
    if idx % 100 == 0:
        input("hi")  # pause every 100 messages until Enter is pressed
    idx += 1

# Wait for any outstanding messages to be delivered.
p.flush()
```
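Both producers assume that the `odd` and `even` topics already exist on the broker (or that topic auto-creation is enabled). If they don't, they can be created up front; the sketch below uses confluent-kafka's `AdminClient`, and the partition/replication counts are only placeholder values:

```python
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost'})

# Request creation of both topics; the counts are illustrative, not from the original post.
futures = admin.create_topics([
    NewTopic('odd', num_partitions=1, replication_factor=1),
    NewTopic('even', num_partitions=1, replication_factor=1),
])

for topic, fut in futures.items():
    try:
        fut.result()  # raises if creation failed (e.g. the topic already exists)
        print('created', topic)
    except Exception as e:
        print('failed to create {}: {}'.format(topic, e))
```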
## Producer 2 (topic even)
```python
from confluent_kafka import Producer
from time import sleep
import numpy as np

p = Producer({'bootstrap.servers': 'localhost'})

def delivery_report(err, msg):
    """Called once for each message produced to indicate the delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

idx = 0
while True:
    value = np.random.normal(loc=10, scale=20, size=100).astype(str).tolist()
    values = ",".join(value)
    # Trigger any available delivery report callbacks from previous produce() calls.
    p.poll(0)
    # Asynchronously produce a message; the delivery report callback will be
    # triggered from poll() above, or flush() below, once the message has been
    # successfully delivered or has failed permanently.
    # if idx % 2 == 0:
    p.produce('even', values.encode('utf-8'), callback=delivery_report)
    sleep(2.0)
    if idx % 100 == 0:
        input("hi")
    idx += 1

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()
```
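The two producers differ only in the target topic and the sleep interval, so they could be folded into a single parametrized loop. This is just a sketch: `run_producer` is a helper name of my own, and it reuses the `delivery_report` callback defined above.

```python
from confluent_kafka import Producer
from time import sleep
import numpy as np

def run_producer(topic, interval, callback):
    """Send a comma-separated batch of 100 random samples to `topic` every `interval` seconds."""
    p = Producer({'bootstrap.servers': 'localhost'})
    while True:
        value = np.random.normal(loc=10, scale=20, size=100).astype(str).tolist()
        p.poll(0)  # serve delivery callbacks from earlier produce() calls
        p.produce(topic, ",".join(value).encode('utf-8'), callback=callback)
        sleep(interval)

# run_producer('odd', 1.0, delivery_report)   # Producer 1
# run_producer('even', 2.0, delivery_report)  # Producer 2
```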
## Consumer
Subscribe to the two topics, even and odd, handle the work with three threads, and use a shared queue to process whatever accumulates.
```python
from confluent_kafka import Producer, Consumer
import confluent_kafka
import numpy as np
import threading

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty


def my_assign(consumer, partitions):
    # Start every assigned partition from the end of the log.
    for p in partitions:
        p.offset = confluent_kafka.OFFSET_END
    print('assign', partitions)
    consumer.assign(partitions)


class IntendedException(Exception):
    pass


def thread_run(myid, p, q):
    def do_crash(err, msg):
        # Not used below; kept from the original listing.
        raise IntendedException()

    c = Consumer({
        'bootstrap.servers': 'localhost',
        'group.id': str(myid),
        'enable.auto.commit': False,
        'auto.offset.reset': 'earliest'})
    c.subscribe(["even", "odd"], on_assign=my_assign)
    while True:
        m = c.poll(1)
        if m is None:
            continue
        if m.error() is None:
            # Each message is a comma-separated batch of floats; report its mean.
            values = m.value().decode('utf-8').split(",")
            values = [float(x) for x in values]
            print('id : {} topic : {} mean : {}'.format(str(myid), m.topic(), np.mean(values)))
            q.put(m.topic() + " " + str(m.offset()))
            print(q.qsize())
            if q.qsize() == 3:
                # Drain the queue once three entries have accumulated.
                while q.qsize():
                    print(q.get_nowait())
                with q.mutex:
                    q.queue.clear()
    print(myid, 'Done')


def test_thread_safety():
    """Basic thread safety test: three consumer threads sharing one queue."""
    q = Queue(maxsize=3)
    print("Queue maxsize", q.maxsize)
    p = Producer({'socket.timeout.ms': 10,
                  'message.timeout.ms': 10})
    threads = list()
    for i in range(1, 4):
        thr = threading.Thread(target=thread_run, name=str(i), args=[i, p, q])
        thr.start()
        threads.append(thr)

    for thr in threads:
        thr.join()

    # Count the number of threads that exited cleanly
    # while q.qsize():
    #     print(q.get_nowait())
    cnt = 0
    try:
        for x in iter(q.get_nowait, None):
            print(x)
            cnt += 1
    except Empty:
        pass
    if cnt != len(threads):
        raise Exception('Only %d/%d threads succeeded' % (cnt, len(threads)))

    print('Done')


if __name__ == '__main__':
    test_thread_safety()
```
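One thing to note: the consumers set `enable.auto.commit` to False but never commit, so no offsets are stored and each run falls back to `auto.offset.reset` (and `my_assign` then jumps to `OFFSET_END` anyway). If committing is actually wanted, a minimal sketch is to commit synchronously after each processed message, inside the poll loop of `thread_run`:

```python
# After processing a message m inside the poll loop:
if m.error() is None:
    # ... decode, compute the mean, put the result on the queue as above ...
    c.commit(message=m, asynchronous=False)  # store this message's offset synchronously
```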