[ Python ] Kafka 유용한 Command Class로 만들기
2019. 8. 24. 19:13ㆍ꿀팁 분석 환경 설정/Kafka
import os
from kafka import KafkaConsumer
class Commad :
def __init__(self, path = "/usr/local/kafka/bin/" ) :
self.consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest' , # 'earliest',
enable_auto_commit= True ,)
self.path = path
def show_topic(self,) :
"""
topic 보여주기
"""
print("Topic : " , list(self.consumer.topics()))
def create_topic(self, partition , replication , topic) :
"""
partition : 파티션 (int)
replication : 죽었을 때 방지? (int)
"""
command = "{}kafka-topics.sh --create --bootstrap-server localhost:9092".format(self.path)
command2 = "{} --replication-factor {} --partitions {} --topic {}".\
format(command , replication , partition , topic)
if os.system(command2) == 0 :
return "topic `{}` 생성 partition : {} replication : {} ".\
format(topic , partition , replication)
else :
return "topic `{}` 생성 실패".format(topic)
def delete_topic(self , topics) :
"""
topics : topic list (list)
"""
if type(topics) == list :
pass
else :
topics = [topics]
for topic in topics :
c = "{}kafka-topics.sh --zookeeper localhost:2181 --delete --topic {}".\
format(self.path , topic)
if os.system(c) == 0 :
print("Topic `{}` 제거 완료".format(topic))
else :
print("Topic `{}` 제거 실패 or 이미 제거".format(topic))
def delete_Consumer_group(self, group) :
c = "{}kafka-consumer-groups.sh --zookeeper localhost:2181 --delete --group {}".\
format(self.path , group)
if os.system(c) == 0 :
print("Consumer Group `{}` 제거 완료".format(group))
else :
print("Consumer Group `{}` 제거 실패 or 이미 제거".format(group))
728x90
'꿀팁 분석 환경 설정 > Kafka' 카테고리의 다른 글
[ Python ] aiokafka 가 python-kafka 보다 나은 점 (0) | 2019.10.01 |
---|---|
[Python] confluent-Kafka 연습하기 (0) | 2019.09.03 |
kafka 자주 사용 명령어 모음 (0) | 2019.08.17 |
[ Python ] kafka consume multiprocessing 해보기 (0) | 2019.08.17 |
[Python] Kafka offset 확인 (0) | 2019.08.17 |