single instance 이고 zookeeper는 kafka 기본제공되는것으로 사용함
zookeeper와 broker 도 1:1로 설정해서 기타 설정 건드리지 않음.
실행시 입력값은 window와 동일
download https://kafka.apache.org/downloads
wget 으로 원하는 버전 download
tar -xvf kafka.tar
${kafka_home|/config/zookeeper.properties dir 경로 수정
${kafka_home|/config/server.properties dir 경로 수정
버전 9이하
host.name = 0.0.0.0 혹은 주석
advertised.host.name = public ip 혹은 public host name (consumer나 producer에서 접속할 ip혹은 도메인)
버전 10이상
listeners=PLAINTEXT://:9092
consumer 콘솔이나 producer 콘솔을 다른 서버에서 접속할 경우
config/consumer.properties 에 zookeeper 접속 주키버 주소로 변경
config/producer.properies에 meta.broker 머시기 접속할 broker 주소로 변경
--------------------------------
하단 참고>> https://free-strings.blogspot.kr/2016/04/blog-post.html
broker.id
기본값은 0이다. 클러스터내에서 브로커를 구분 하는 이름으로 사용되며, 브로커를 구분하는 유니크한 값이기 때문에 브로커 호스트가 바뀌거나(다른 서버로 옮기거나) 포트가 바뀌어도 컨슈머(consumer) 설정을 바꾸지 않아도 된다.
host.name
port
log.dirs
브로커가 시작될때 주키퍼에 IP/PORT가 등록 될때, InetAddress.getLocalHost.getHostAddress가 사용된다. 그래서 클라우드 환경이나 Docker, Vagrant 같은 환경일 경우는 가끔 consumer/producer가 브로커에 접속이 되지 않는 경우가 생긴다. 이때는 host.name을 속성을 지정하면 해결 할 수있다. 드물게 브로커에 바인딩된 IP/PORT와 클라이언트가 사용하는 IP/PORT가 다를 경우는 advertise.host.name, advertise.port를 지정해 주면 된다.
쓰레드 및 네트워크 설정
message.max.bytes
스택오버플로우 참
Consumer side:fetch.message.max.bytes - this will determine the largest size of a message that can be fetched by the consumer.
Broker side: replica.fetch.max.bytes - this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).
Broker side: message.max.bytes - this is the largest size of the message that can be received by the broker from a producer.
num.network.threads
네트워크 요청 처리란 NIO Selector (java.nio.channels.Selector)개수를 말한다.
SocketServer.scala#L54, SocketServer.scala#L92 , SocketServer.scala#394 , Selector.java#L109
/** * Create a new nioSelector */ public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder) { try { this.nioSelector = java.nio.channels.Selector.open(); } catch (IOException e) { throw new KafkaException(e); }
num.io.threads
IO 가 생길때 마다 spawn되는 쓰레드 수로, 로그를 저장하는 데 사용 되는 디스크 수와 동일하게 지정 하는것을 권장한다. 기본값은 8.background.threads
여러가지 백그라운드 작업을 하는 쓰레드 수를 말함. 오래된 로그를 삭제하는 쓰레드도 포함하고 있다. 기본 값은 10queued.max.requests
IO 쓰레드가 처리하는 동안 지연된 메세지를 보관하는 큐의 크기. 큐가 가득차면 네트워크 쓰레드는 더이상 메세지를 받지 않는다.socket.send.buffer.bytes
소켓 커넥션에 사용될 SO_SNDBUF 버퍼 크기. 기본값은 102400.소켓을 생성하면 소켓에는 send buffer와 receive buffer가 각각 할당 되는데, send buffer에 대한 설정이다. 커널에 힌트를 주는 값으로 어프리케이션에서 설정한 값대로 적용되지는 않는다. 가령 0으로 설정하거나 가용한 최대값을 넘었을 경우 커널이 자동 조절 한다.
Set a hint the size of the underlying buffers used by the platform for outgoing network I/O. When used in set, this is a suggestion to the kernel from the application about the size of buffers to use for the data to be sent over the socket. When used in get, this must return the size of the buffer actually used by the platform when sending out data on this socket.
socket.receive.buffer.bytes
소켓 커넥션에 사용될 SO_RCVBUFF 버퍼 크기. 기본값은 102400.socket.request.max.bytes
서버가 받을 수 있는 최대 요청 개수. 서버 메모리가 고갈 되는걸 방지한다. 자바 힙크기 보다 작게 설정해야 한다. 기본값은 104857600.num.partitions
토픽당 로그 파티션의 기본 개수. 파티션 크기를 명시적으로 지정하지 않아도 된다.로그 설정
log.segment.bytes
단일 로그파일의 최대 크기를 의미한다. 로그파일이 세그먼트 크기만큼 되면 새로운 세그먼트 파일이 생긴다. 토픽은 디렉토리안에 여러 세그먼트 파일로 저장된다. 토픽마다 기본적용 되며(per-topic) 기본 값은 1GB.log.roll.{ms,hours}
새로운 세그먼트 파일이 생기는 주기. 세그먼트 크기가 되지 않아도 주기별로 파일이 나뉜다. 토픽 기본설정. 기본값은 7일log.cleanup.policy
"delete"와 "compact" 두가지 모드가 있다. "delete"는 시간 설정이나 크기 설정에 따라 주기적으로 로그 세그먼트를 삭제하고 "compact"는 필요없는 레코드를 지운다. 토픽 기본설정(per-topic)이다.log.retention.{ms,minutes,hours}
로그 세그먼트가 보관될 시간. 토픽 기본설정(per-topic). 기본 7일.log.retention.bytes
삭제전까지 파티션별로 보관할 로그 바이트 수. 토픽 기본설정(per-topic). 설정한 로그 시간이나 크기가 되면 세그먼트는 삭제됨.log.retention.check.interval.ms
로그를 삭제 하기 위해 체크하는 주기. 기본 5분.log.cleaner.enable
로그 compaction 설정. 기본 true.log.cleaner.threads
로그 compaction 작업을 할 쓰레드 수. 기본 1.log.cleaner.backoff.ms
로그 cleaning이 필요한지 체크하는 주기. 기본 15000log.index.size.max.bytes
로그 세그먼트의 최대 오프셋 인덱스. 토픽 기본(per-topic). 기본값 10485760인덱스에 대한 설명(OffsetIndex.scala) ... 인덱스는 특정 로그 세그먼트의 위치를 물리적인 파일에 오프셋을 매핑한다. 이 인덱스는 Sparse 하다: 로그에 전체 메세지를 위한 엔트리를 보관하지 않는다.
인덱스는 최대 8-byte entry의 고정된 숫자로 미리 할당되어 파일로 저장된다.
인덱스는 이 파일의 메모리맵 검색을 지원한다. 타겟 오프셋과 같거나 가장 큰 오프셋보다 작은 오프셋/위치 쌍의 위치를 찾기위해 간단한 변종 이진검색을 한다.
인덱스 파일은 두가지 방식으로 열린다: 비어있거나 변경될 수 있는 인덱스로 추가가 될 수 있는 인덱스 파일 또는 이전에 내용이 덧붙여져(populated) 변경될 수 없는 읽기전용 인덱스 파일...
why-index-file-exists-in-kafka-log-directory
log.index.interval.bytes
오프셋 인덱스에 새엔트리가 추가될 주기로, 브로커는 패치 요청마다 로그파일을 정확한 위치를 찾을 때 까지 순차적으로 스캔한다. 기본값은 4096.log.flush.interval.message
디스크에 플러쉬 될때 까지 메모리에 유지될 메세지의 숫자.로그 압축(compaction) (관련링크) 로그 압축은 카프카가 단일 파티션의 데이터를 담은 로그에 있는 각 키값들이 항상 가장 마지막 값을 보관 할 수 있게 보장한다. 어플리케이션이 크래시 난 후, 시스템이 실패했을때 또는 Operational maintenance 동안 어플리케이션이 재시작 한 후 크래시 된것들을 재로딩하는 유즈케이스나 시나리오등에 사용 될 수 있다. ... 이런 작업은 각 레코드가 독립적인(stands alone) 로깅 즉, 임시 이벤트 데이타 같은 데는 적합하다. 그렇지만, 데이터 스트림에서 중요한것은 변경이 중심이된(keyed)로그, 변경 데이타(mutable data)다 – 예, 데이터베이스 테이블 변경)
스트림과 같은 구체적인 예를 다뤄보자. 우리가 사용자 메일 주소를 포함하는 토픽을 가지고 있다고 하자; 매시간 사용자는 메일 주소를 업데이트 한다. 우리는 사용자의 USER ID를 주키(primary key)로 이 토픽에 메세지를 보낸다. 자 우리가 특정 시간에 걸쳐 아래 메세지를 보낸다고 해보자.
123 => bill@microsoft.com
.
.
.
123 => bill@gatesfoundation.org
.
.
.
123 => bill@gmail.com
각 메세지 주키에 대해 최소한 가장 마지막 업데이트를 보관 될 수 있게 보장하기 위해서 로그 압축은 너무 잘게 나눠지지 않은(granular)보관 메커니즘을 제공한다. 이 메커니즘은 단지 최근에 변경된 키만 보관하는게 아니라 모든 키의 마지막 값을 가지는 전체 스냅샷 로그를 보장하는 것이다. 이것은 다운스트림 컨슈머가 전체 변경로그를 보관하지 않고 토픽과 별개로 스스로의 상태를 복구 할 수 있음을 의미한다.
몇가지 유용한 사용예를 살펴보면,,
1. 데이터베이스 변화 구독. 여러 데이터 시스템에 데이터셋을 가질 필요가 종종 있고 대개 이 시스템중 하나가 어떤 종류의 데이터베이스(RDBMS, key-value store,,) 이기도 하다. 예를들어, 데이터베이스, 캐쉬, 검색 클러스터, 그리고 하둡 클러스트를 가지고 있을 수 있다. 각 시스템이 데이터베이스에 가하는 변경은 캐쉬에, 검색 클러스터에, 그리고 결과적으로 하둡에 반영될 필요가 있다. 시스템중 하나가 실시간 업데이트 처리만 하는 경우는 단지 최신 로그만 필요하다. 그러나 만약 캐쉬를 다시 로딩할 수 있는걸 원하거나 실패한 검색노드를 복구하길 원한다면, 일부가 아닌 전체 데이터 셋이 필요할 수 있다.
2. 이벤트 소싱: https://msdn.microsoft.com/en-us/library/jj591559.aspx#sec1, http://martinfowler.com/eaaDev/EventSourcing.html 이벤트 소싱은 어프리케이션의 현재 상태를 결정하는 히스토리를 저장해서 어프리케이션의 상태를 보관하는 방법.
3. 저널링 파일 시스템 : https://ko.wikipedia.org/wiki/%EC%A0%80%EB%84%90%EB%A7%81_%ED%8C%8C%EC%9D%BC_%EC%8B%9C%EC%8A%A4%ED%85%9C
고가용성을 위한 저널링. 변경을 로깅 함으로써 로컬 계산 처리가 Fault-tolerant 될 수 있게 한다. 이 변경들은 로컬 변경을 만들 수 있게 해서 다른 프로세스가 이 변경을 재로딩 할 수 있고 만약 실패 하더라도 계속 진행 할 수 있게 한다. 구체적인 예로 카운트 처리, 데이터 통합, 그리고 "group by" (스트림 쿼리 시스템에서 처리와 같은)게 있다. 실시간 스트림 처리 프레임워크인 Samza는 정확히 이 목적으로 사용한다.
각각의 경우에서 한가지는 주로 변경의 실시간 피드 처리가 주로 필요하다. 그러나 경우에 따라, 머신이 크래쉬 되거나 데이터가 재로딩 될 필요가 있거나 재처리 될 필요가 있을때 한가지는 전체 로딩을 필요로 한다는 것이다. 로그 압축은 같은 토픽에서 이런 사례 모두를 피딩되게 한다. 이런 로그의 사용 스타일은 https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying 에서 더 자세히 설명 되었다.
보통 아이디어는 꽤 단순하다. 만약 우리가 제약없이 로그를 보관 한다면, 그리고 우리가 위 케이스들에서 각 변경을 로깅 한다면, 우리는 최초 시작부터 매번 시스템의 상태를 캡쳐 해야 한다. 이 전체 로그를 사용함으로써 처음 N개의 레코드를 재연(replay)해서 제때 어떤 지점으로 복구 할 수 있다. 이 가상적인 전체 로그는 아주 현실적이지지 않다. 로그로 단일 레코드를 많이 업데이트 하는 시스템이라면 끝없이 커질것이다. 오래된 업데이트를 버리는 간단한 로그 보관 메커니즘은 공간을 제약하지만 로그는 더이상 현재상태를 복구 할 방법이 없다.
로그 압축은 레코드당 보관법에 finer-grained를 주는 메커니즘이다. coarser-grained한 시간기반 보관법보다 이 아이디어는 같은 키로 더 최신 업데이트를 가져 선택적으로 레코드를 삭제한다. 이 방법은 최소한 키별로 마지막 상태를 가지는 로그가 보장된다.
이 보관 정책은 토픽단 설정 될 수 있다. 그래서 단일 클러스터는 크기나 시간으로 보관이 강제된 몇몇 토픽을 가질 수 있고 다른 토픽들은 압축이 강제된 보관 정책을 가질 수 있다.
이 기능은 링크드인의 가장 오래된것들중 하나에서 영감을 받았다 그리고 가장 성공한 인프라의 부분이다. 데이터베이스 변경 로그 캐싱 서비스인 Databus - 대부분의 구조화된 로그 시스템 같이 않게 카프카는 구독을 위해 만들어 졌다 그리고 빠른 순차 읽기와 쓰기를 위해데이터를 조직화 한다. Databus와 달리, 카프카는 source-of-truth 저장 동작을 한다. 그래서 심지어 업스트림 데이터 소스가 재연(replayable) 될 수 없는 상황에서도 유용하다.
복제 세팅
default.replication.factor
자동으로 생성된 토픽에 대한 기본 복제 인수(factor)다. 기본 1replica.fetch.max.bytes
팔로워가 리더에서 복제 할때 패치할 데이터의 최대 바이트 양. 1048576replica.fetch.wait.max.ms
복제 패치 요청에 리더가 응답할 최대 시간. 기본 500num.replica.fetchers
리더에서 메세지를 복제할때 사용할 쓰레드 수로 쓰레드 수를 늘리면 IO 비율도 높아짐. 기본 1replica.high.watermark.checkpoint.interval.ms
각 복제가 복구를 위해 디스크에 워터마크를 저장하는 빈도. 기본 5000주키퍼 세팅
zookeeper.connect
hostname:port 형식으로 주키퍼 커넥션을 지정한다. 콤파(,)로 구분해 여러 주키퍼 노드를 지정 할 수 있다. 여러 노드를 지정하는 것은 주키퍼 노드 하나가 다운 되어도 카프카 클러스터가 신뢰도와 지속성을 가지게 하기 위함. 주키퍼는 특정 패스에 카프카 데이타를 만들 수 있게 chroot 패스를 허용한다. chroot는 동일 주키퍼 클러스터가 여러 카프카 클러스터를 지원 할 수 있게 한다. 예) host1:port1,host2:port2,host3:port3/chroot/path. 마지막 설정은 모든 클러스터 데이터를 /chroot/path에 넣는다. 이 패스는 반드시 카프카가 시작되기 전에 생성되어야 한다. 그리고 컨슈머는 반드시 동일한 문자열(패스)을 사용해야 한다.zookeeper.session.timeout.ms
서버로부터 하트비트를 받지 못하면 죽은것으로 간주한다. 이 값은 반드시 주의깊게 설정되어야 하는데, 만약 이 하트비트가 주기 너무 길면 죽은서버를 감지하지 못하고, 짧으면 살아 있는 서버가 죽은것으로 간주 된다. 기본값은 6000zookeeper.sync.time.ms
주키퍼 팔로워가 리더 뒤에 있을 수 있는 시간을 지정한다? 기본 2000.기타 설정
auto.create.topics.enable
이 값을 true로 놓았을때, 만약 존재하지 않는 토픽에 메타데이터를 패치하거나 메세지를 생성하면, 자동으로 생성된다. 운영환경에서는 false로 둬야한다.controlled.shutdown.enable
true로 놓았을때, 브로커에 셧다운이 호출되면, 그리고 이 브로커가 어떤 토픽의 리더일때, 리더를 셧다운전에 다른 브로커로 옮긴다. 이 설정은 시스템 가용성을 높일 수 있다. 기본값은 true.auto.leader.rebalance.enable
true일때, 가능하다면 각 파티션의 우선하는 복제에 주기적으로 리더쉽을 세팅해서, 브로커는 자동으로 브로커간 파티션의 리더쉽 균형을 맞추려고 시도한다. (우선하는 복제??) 기본값 true.leader.imbalance.per.broker.percentage
브로커별로 허락된 리더 불균형을 백분율로 지정. 만약 세팅된 비율을 웃돌면 클러스터는 리더쉽 재균형을 맞춘다. 기본값 10.offset.metadata.max.bytes
클라이언트에 오프셋과 함께 저장될 수 있게 허락된 최대 메타데이타 양. 기본값 4096.max.connections.per.ip
브로커가 지정된 IP에서 수용 할 수 있는 최대 커넥션 수. 기본 214748367 (=Int.MaxValue)connections.max.idle.ms
브로커가 소켓을 닫기전 기다리는 최대 idle 시간. 기본 600000 (10 * 60 * 1000L)unclean.leader.election.enable
true 일때, ISR(in-sync replicas) 가 아닌(=out-of-sync) 복제가 리더가 된다. 데이터 소실이 생길 수 있다. 기본 true새로 선출된 리더에 동기화 되지 않은 메세지들은 소실된다. 이 기능은 가용성과 신뢰성간 균형을 제공한다. 이 옵션이 꺼질때(false), 만약 브로커가 향후 사용하지 않게 될 파티션에 대한 리더 복제를 포함 하고 있고, 동기화 되지 않았지만 리더와 대체할 복제가 존재하면, 리더 복제나 다른 동기화된 복재가 온라인이 될때 까지 그 파티션은 사용할 수 없게 된다.