kafka(13)
-
Kafka MultiProcesisng Queue Test
도움이 되셨다면, 광고 한번만 눌러주세요. 블로그 관리에 큰 힘이 됩니다 ^^ ## Producer from time import sleep from json import dumps from kafka import KafkaProducer import numpy as np def on_send_success(record_metadata): print("topic : {} , partition : {} , offset : {}".\ format( record_metadata.topic , record_metadata.partition , record_metadata.offset)) def on_send_error(excp): log.error('I am an errback', exc_info=excp) p..
2019.10.21 -
[ Python ] aiokafka 가 python-kafka 보다 나은 점
기존에 작업을 python-kafka로 겨우 익숙해졌는데, 비동기적인 처리 방법이 필요하게 됐다. 그래서 찾아보니 비동기 코 루틴인 asyncio 방식이 필요하게 됐다. 처음에는 지나친 패키지였지만 asyncio 개념이 필요하게 돼서 다시 보게 되니 생각보다 간편했다. 그래서 찾은 것이 aiokafka이다! 일단 이걸 하게 된 이유는 다음과 같다. asyncio를 적극적으로 쓴다. python-kafka 보다 거의 동일한 형태라서 익숙하다. python-kafka 보다 좀 더 성능이 좋다고 한다. URL python-kafka는 속도를 중시하지 않았지만, 자바 클라이언트 변화에 빠르게 변화했다. Producer 뿐만 아니라 Consumer도 기능이 python-kafka 보다 훨씬 다양하다. batch ..
2019.10.01 -
[Python] confluent-Kafka 연습하기
## Producer 1 (topic odd) from confluent_kafka import Producer import numpy as np p = Producer({'bootstrap.servers': 'localhost'}) def delivery_report(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(..
2019.09.03 -
[ Python ] Kafka 유용한 Command Class로 만들기
import os from kafka import KafkaConsumer class Commad : def __init__(self, path = "/usr/local/kafka/bin/" ) : self.consumer = KafkaConsumer( bootstrap_servers=['localhost:9092'], auto_offset_reset='latest' , # 'earliest', enable_auto_commit= True ,) self.path = path def show_topic(self,) : """ topic 보여주기 """ print("Topic : " , list(self.consumer.topics())) def create_topic(self, partition , rep..
2019.08.24 -
kafka 자주 사용 명령어 모음
## 키기 zookeeper -> server /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties ## show topic list /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181 ## create the topic /usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --rep..
2019.08.17 -
[ Python ] kafka consume multiprocessing 해보기
https://stackoverflow.com/questions/46491616/python-kafka-multiprocess-vs-thread Python Kafka multiprocess vs thread I can use KafkaConsumer to consume messages in separate threads. However, when I use multiprocessing.Process instead of threading.Thread, I get an error: OSError: [Errno 9] Bad file descriptor T... stackoverflow.com 같은 group_id를 가지면서( offset 순차적 ) 같은 토픽에서 정보 가져오기 이렇게 하면 얻는 효과는 더 빨..
2019.08.17 -
[Python] Kafka offset 확인
Producer from time import sleep from json import dumps from kafka import KafkaProducer import numpy as np producer = KafkaProducer(bootstrap_servers=['localhost:9092'], key_serializer = None , value_serializer=lambda x: dumps(x).encode('utf-8')) ## for _ in range(100): value = np.random.normal(loc = 10 , scale = 20 ,size= 3).astype(str).tolist() values = ",".join( value ) data = {'number' : valu..
2019.08.17 -
카프카 데이터 플랫폼의 최강자 (1장 ~50pg)
현재 카프카를 공부할 일이 있어서 원래는 그냥 인터넷에 있는 글들을 읽어서 파악하려고 했지만, 영어 바보인 관계로 두리뭉실하게 알고 있는 것 같아서 카프카, 데이터 플랫폼의 최강자 책을 읽어보려고한다. 책에서는 현재 자바/파이썬으로 코드가 조금 있는 것 같아서 더 적합한 것 같다. 다른 책들은 일단 도서관에 없었으므로... 일단 이 책도 좋은 것 같아서 시작한다! 더 읽다가 괜찮으면 사야겠다! ㅎㅎㅎ http://www.yes24.com/Product/Goods/59789254 카프카, 데이터 플랫폼의 최강자 데이터 플랫폼의 핵심 컴포넌트로 각광받고 있는, 이벤트 기반 비동기 아키텍처를 위한 고가용성 실시간 분산 스트리밍 솔루션 카프카(Kafka)의 모든 것!국내 최대 모바일 플랫폼 회사인 카카오에서 `..
2019.08.17 -
java.net.BindException: 주소가 이미 사용 중
Kafka에서 server를 다시 키려고 하는데 사용중이라는 이야기가 나온다. bin/zookeeper-server-start.sh config/zookeeper.properties 그래서 찾아보니 이미 해결해준 사람이 있어서 공유합니다 ## grep으로 port 확인 netstat -nap | grep 2173 ## PID 번호 지우기 맨 오른쪽에 PID/java 있을 것이다. kill PID https://nickjoit.tistory.com/56 java.net.BindException: 주소가 이미 사용 중입니다 tomcat 실행중 해당 오류가 발생했다. 8080 포트가 이미 사용 중인게 문제 이다. 해당 포트의 PID번호를 확인 후 kill 시킨 후에 다시 진행하자. # 우분투 - 포트 확인을 ..
2019.08.04 -
Kafka 설치할 때 참고한 것 - Ubuntu
이걸 보고 설치를 했다. 실제로 하다가, 내가 환경을 잘 못 설정해서 생긴 오류인 dpkg install 오류 해결한것도 다른 곳에 포스팅을 하였다. https://data-newbie.tistory.com/217 Ubuntu16.04 dpkg: error processing package install-info 에러 해결하기 갑자기 apt-get install이 안되는 사태가 벌어져서 허둥지둥 여러가직 시도를 해보고 있다. apt-get install default-jre apt-get install zookeeperd 이걸 설치하려고 했는데, 다음과 같은 에러가 발생했다. dpkg.. data-newbie.tistory.com 정상적으로 작동하는 것을 확인 https://tecadmin.net/ins..
2019.08.04 -
Kafka topic 만들고 써보고 제거해보기
Kafka에 대해서 알아봐야해서 기초도 몰라서 여러가지 찾아보고 있다 다음 코드는 만들고 확인하는 것이다. 사실 아직 topic이 정확히 이해가 안되지만,,, 일단 해보면서 깨닫기로... ## 하기전 해야하는 것 /kafka/config/server.properties 에 들어가서 delete.topic.enable = True 로 설정해주기 ## topic 만들기 bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic x ## 만들어졌는지 확인 bin/kafka-topics.sh --list --zookeeper localhost ## 안에 머라고 궁시렁 궁시렁 b..
2019.08.04 -
Kafka 자료 찾기
Kafka에 대한 기본 개념들 Topic, Broker, Publish-subscribe messaging system ... and Use Cases https://www.cloudkarafka.com/blog/2016-11-30-part1-kafka-for-beginners-what-is-apache-kafka.html Part 1: Apache Kafka for beginners - What is Apache Kafka? - CloudKarafka, Apache Kafka Message streaming as a Service The first part of Apache Kafka for beginners explains what Kafka is - a publish-subscribe-based d..
2019.08.02