Kafka MultiProcesisng Queue Test
·
개발/Kafka
도움이 되셨다면, 광고 한번만 눌러주세요. 블로그 관리에 큰 힘이 됩니다 ^^ ## Producer from time import sleep from json import dumps from kafka import KafkaProducer import numpy as np def on_send_success(record_metadata): print("topic : {} , partition : {} , offset : {}".\ format( record_metadata.topic , record_metadata.partition , record_metadata.offset)) def on_send_error(excp): log.error('I am an errback', exc_info=excp) p..
[ Python ] aiokafka 가 python-kafka 보다 나은 점
·
개발/Kafka
기존에 작업을 python-kafka로 겨우 익숙해졌는데, 비동기적인 처리 방법이 필요하게 됐다. 그래서 찾아보니 비동기 코 루틴인 asyncio 방식이 필요하게 됐다. 처음에는 지나친 패키지였지만 asyncio 개념이 필요하게 돼서 다시 보게 되니 생각보다 간편했다. 그래서 찾은 것이 aiokafka이다! 일단 이걸 하게 된 이유는 다음과 같다. asyncio를 적극적으로 쓴다. python-kafka 보다 거의 동일한 형태라서 익숙하다. python-kafka 보다 좀 더 성능이 좋다고 한다. URL python-kafka는 속도를 중시하지 않았지만, 자바 클라이언트 변화에 빠르게 변화했다. Producer 뿐만 아니라 Consumer도 기능이 python-kafka 보다 훨씬 다양하다. batch ..
[Python] confluent-Kafka 연습하기
·
개발/Kafka
## Producer 1 (topic odd) from confluent_kafka import Producer import numpy as np p = Producer({'bootstrap.servers': 'localhost'}) def delivery_report(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(..
[ Python ] Kafka 유용한 Command Class로 만들기
·
개발/Kafka
import os from kafka import KafkaConsumer class Commad : def __init__(self, path = "/usr/local/kafka/bin/" ) : self.consumer = KafkaConsumer( bootstrap_servers=['localhost:9092'], auto_offset_reset='latest' , # 'earliest', enable_auto_commit= True ,) self.path = path def show_topic(self,) : """ topic 보여주기 """ print("Topic : " , list(self.consumer.topics())) def create_topic(self, partition , rep..
kafka 자주 사용 명령어 모음
·
개발/Kafka
## 키기 zookeeper -> server /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties ## show topic list /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181 ## create the topic /usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --rep..
[ Python ] kafka consume multiprocessing 해보기
·
개발/Kafka
https://stackoverflow.com/questions/46491616/python-kafka-multiprocess-vs-thread Python Kafka multiprocess vs thread I can use KafkaConsumer to consume messages in separate threads. However, when I use multiprocessing.Process instead of threading.Thread, I get an error: OSError: [Errno 9] Bad file descriptor T... stackoverflow.com 같은 group_id를 가지면서( offset 순차적 ) 같은 토픽에서 정보 가져오기 이렇게 하면 얻는 효과는 더 빨..
[Python] Kafka offset 확인
·
개발/Kafka
Producer from time import sleep from json import dumps from kafka import KafkaProducer import numpy as np producer = KafkaProducer(bootstrap_servers=['localhost:9092'], key_serializer = None , value_serializer=lambda x: dumps(x).encode('utf-8')) ## for _ in range(100): value = np.random.normal(loc = 10 , scale = 20 ,size= 3).astype(str).tolist() values = ",".join( value ) data = {'number' : valu..
카프카 데이터 플랫폼의 최강자 (1장 ~50pg)
·
개발/Kafka
현재 카프카를 공부할 일이 있어서 원래는 그냥 인터넷에 있는 글들을 읽어서 파악하려고 했지만, 영어 바보인 관계로 두리뭉실하게 알고 있는 것 같아서 카프카, 데이터 플랫폼의 최강자 책을 읽어보려고한다. 책에서는 현재 자바/파이썬으로 코드가 조금 있는 것 같아서 더 적합한 것 같다. 다른 책들은 일단 도서관에 없었으므로... 일단 이 책도 좋은 것 같아서 시작한다! 더 읽다가 괜찮으면 사야겠다! ㅎㅎㅎ http://www.yes24.com/Product/Goods/59789254 카프카, 데이터 플랫폼의 최강자 데이터 플랫폼의 핵심 컴포넌트로 각광받고 있는, 이벤트 기반 비동기 아키텍처를 위한 고가용성 실시간 분산 스트리밍 솔루션 카프카(Kafka)의 모든 것!국내 최대 모바일 플랫폼 회사인 카카오에서 `..
java.net.BindException: 주소가 이미 사용 중
·
꿀팁 분석 환경 설정/Linux
Kafka에서 server를 다시 키려고 하는데 사용중이라는 이야기가 나온다. bin/zookeeper-server-start.sh config/zookeeper.properties 그래서 찾아보니 이미 해결해준 사람이 있어서 공유합니다 ## grep으로 port 확인 netstat -nap | grep 2173 ## PID 번호 지우기 맨 오른쪽에 PID/java 있을 것이다. kill PID https://nickjoit.tistory.com/56 java.net.BindException: 주소가 이미 사용 중입니다 tomcat 실행중 해당 오류가 발생했다. 8080 포트가 이미 사용 중인게 문제 이다. 해당 포트의 PID번호를 확인 후 kill 시킨 후에 다시 진행하자. # 우분투 - 포트 확인을 ..
Kafka 설치할 때 참고한 것 - Ubuntu
·
꿀팁 분석 환경 설정/Linux
이걸 보고 설치를 했다. 실제로 하다가, 내가 환경을 잘 못 설정해서 생긴 오류인 dpkg install 오류 해결한것도 다른 곳에 포스팅을 하였다. https://data-newbie.tistory.com/217 Ubuntu16.04 dpkg: error processing package install-info 에러 해결하기 갑자기 apt-get install이 안되는 사태가 벌어져서 허둥지둥 여러가직 시도를 해보고 있다. apt-get install default-jre apt-get install zookeeperd 이걸 설치하려고 했는데, 다음과 같은 에러가 발생했다. dpkg.. data-newbie.tistory.com 정상적으로 작동하는 것을 확인 https://tecadmin.net/ins..
Kafka topic 만들고 써보고 제거해보기
·
개발/Kafka
Kafka에 대해서 알아봐야해서 기초도 몰라서 여러가지 찾아보고 있다 다음 코드는 만들고 확인하는 것이다. 사실 아직 topic이 정확히 이해가 안되지만,,, 일단 해보면서 깨닫기로... ## 하기전 해야하는 것 /kafka/config/server.properties 에 들어가서 delete.topic.enable = True 로 설정해주기 ## topic 만들기 bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic x ## 만들어졌는지 확인 bin/kafka-topics.sh --list --zookeeper localhost ## 안에 머라고 궁시렁 궁시렁 b..
Kafka 자료 찾기
·
개발/Kafka
Kafka에 대한 기본 개념들 Topic, Broker, Publish-subscribe messaging system ... and Use Cases https://www.cloudkarafka.com/blog/2016-11-30-part1-kafka-for-beginners-what-is-apache-kafka.html Part 1: Apache Kafka for beginners - What is Apache Kafka? - CloudKarafka, Apache Kafka Message streaming as a Service The first part of Apache Kafka for beginners explains what Kafka is - a publish-subscribe-based d..

AI 도구

AI 도구 사이드 패널

아래 AI 서비스 중 하나를 선택하여 블로그를 보면서 동시에 사용해보세요.

API 키를 입력하세요API 키를 저장하려면 저장 버튼을 클릭하세요API 키가 저장되었습니다
API 키를 입력하세요API 키를 저장하려면 저장 버튼을 클릭하세요API 키가 저장되었습니다
API 키를 입력하세요API 키를 저장하려면 저장 버튼을 클릭하세요API 키가 저장되었습니다
URL과 모델을 입력하세요설정을 저장하려면 저장 버튼을 클릭하세요설정이 저장되었습니다