kafka centos 확인사항

kafka 2017. 7. 12. 11:14

single instance 이고 zookeeper는 kafka 기본제공되는것으로 사용함

zookeeper와 broker 도 1:1로 설정해서 기타 설정 건드리지 않음. 

실행시 입력값은 window와 동일 



download https://kafka.apache.org/downloads


wget 으로 원하는 버전 download


tar -xvf kafka.tar


${kafka_home|/config/zookeeper.properties dir 경로 수정

${kafka_home|/config/server.properties dir 경로 수정


버전  9이하

host.name = 0.0.0.0 혹은 주석

advertised.host.name = public ip 혹은 public host name (consumer나 producer에서 접속할 ip혹은 도메인)



버전 10이상

listeners=PLAINTEXT://:9092

advertised.listeners=PLAINTEXT://{public ip혹은 hostname}(consumer나 producer에서 접속할 ip혹은 도메인):9092


consumer 콘솔이나 producer 콘솔을 다른 서버에서 접속할 경우 

config/consumer.properties 에 zookeeper 접속 주키버 주소로 변경

config/producer.properies에 meta.broker 머시기 접속할 broker 주소로 변경



--------------------------------

하단 참고>> https://free-strings.blogspot.kr/2016/04/blog-post.html


broker.id

기본값은 0이다. 클러스터내에서 브로커를 구분 하는 이름으로 사용되며, 브로커를 구분하는 유니크한 값이기 때문에 브로커 호스트가 바뀌거나(다른 서버로 옮기거나) 포트가 바뀌어도 컨슈머(consumer) 설정을 바꾸지 않아도 된다.


host.name

기본값은 null이다. 값을 넣지 않으면 시스템에 있는 모든 인터페이스에 바인딩하고, 지정하면 특정 IP에 바인딩을 한다. 그래서 클라이언트가 특정 인터페이스에 접속하게 하려면 host.name을 지정하면 된다. 

port

카프카 브로커가 열고 있는 소켓 포트를 말한다.

log.dirs

카프카 브로커가 메세지를 파일로 저장 할 디렉토리를 말하는데,  콤마(,)로 구분해 여러 디렉토리를 지정 할 수 있다. 기본값은 /tmp/kafka-logs이다. 

기본 설정으로 브로커 하나를 띄워도 되고, 위 4가지만 설정만 조정하면 여러 브로커 인스턴스로 구성된 카프카 클러스터도 구성 할 수도 있다.  아래에서 살펴보자.

그 밖에,  advertised.host.name, advertised.port 가 있는데,
advertised.host.name은 프로듀서, 컨슈머 그리고 다른 브로커들에 주어지는 호스트 이름으로 지정하지 않으면 host.name이 사용된다. advertised.port도 마찬가지로 프로듀서나 컨슈머 그리고 브로커에 접속 할 때 지정하는 포트로 지정되지 않으면 port 설정이 사용된다.

브로커가 시작될때 주키퍼에 IP/PORT가 등록 될때, InetAddress.getLocalHost.getHostAddress가 사용된다. 그래서 클라우드 환경이나 Docker, Vagrant 같은 환경일 경우는 가끔 consumer/producer가 브로커에 접속이 되지 않는 경우가 생긴다.  이때는 host.name을 속성을 지정하면 해결 할 수있다. 드물게 브로커에 바인딩된 IP/PORT와 클라이언트가 사용하는 IP/PORT가 다를 경우는 advertise.host.name, advertise.port를 지정해 주면 된다.

쓰레드 및 네트워크 설정


message.max.bytes

서버가 받을 수 있는 메세지의 최대 크기를 말한다. 그래서 컨슈머의 fetch.message.max.bytes와 값을 맞추는게 중요하다.  프로듀서에 값을 지정해서 예상치 않은 큰 메세지를 보내는 것을 방지 하기위해 값을 조절하기도 하지만, 기본값 보다 큰 길이의 메세지를 처리하려면 조절해야 하는 값이다. 기본값은 1000000(1M)이다. 

스택오버플로우 참

Consumer side:fetch.message.max.bytes - this will determine the largest size of a message that can be fetched by the consumer.

Broker side: replica.fetch.max.bytes - this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).

Broker side: message.max.bytes - this is the largest size of the message that can be received by the broker from a producer. 

num.network.threads

네트워크 요청을 처리하는 쓰레드 수. 기본값은 3.

네트워크 요청 처리란 NIO Selector (java.nio.channels.Selector)개수를 말한다.

SocketServer.scala#L54SocketServer.scala#L92 , SocketServer.scala#394 , Selector.java#L109

    /**
     * Create a new nioSelector
     */
    public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder) {
        try {
            this.nioSelector = java.nio.channels.Selector.open();
        } catch (IOException e) {
            throw new KafkaException(e);
        }

num.io.threads

IO 가 생길때 마다 spawn되는 쓰레드 수로, 로그를 저장하는 데 사용 되는 디스크 수와 동일하게 지정 하는것을 권장한다. 기본값은 8.

background.threads

여러가지 백그라운드 작업을 하는 쓰레드 수를 말함. 오래된 로그를 삭제하는 쓰레드도 포함하고 있다. 기본 값은 10

queued.max.requests

IO 쓰레드가 처리하는 동안 지연된 메세지를 보관하는 큐의 크기. 큐가 가득차면 네트워크 쓰레드는 더이상 메세지를 받지 않는다.

socket.send.buffer.bytes

소켓 커넥션에 사용될 SO_SNDBUF 버퍼 크기. 기본값은 102400.

소켓을 생성하면 소켓에는 send buffer와 receive buffer가 각각 할당 되는데,  send buffer에 대한 설정이다. 커널에 힌트를 주는 값으로 어프리케이션에서 설정한 값대로 적용되지는 않는다. 가령 0으로 설정하거나 가용한 최대값을 넘었을 경우 커널이 자동 조절 한다.
Set a hint the size of the underlying buffers used by the platform for outgoing network I/O. When used in set, this is a suggestion to the kernel from the application about the size of buffers to use for the data to be sent over the socket. When used in get, this must return the size of the buffer actually used by the platform when sending out data on this socket.

socket.receive.buffer.bytes

소켓 커넥션에 사용될 SO_RCVBUFF 버퍼 크기. 기본값은 102400.

socket.request.max.bytes

서버가 받을 수 있는 최대 요청 개수.  서버 메모리가 고갈 되는걸 방지한다. 자바 힙크기 보다 작게 설정해야 한다. 기본값은 104857600.

num.partitions

토픽당 로그 파티션의 기본 개수. 파티션 크기를 명시적으로 지정하지 않아도 된다.


로그 설정


log.segment.bytes

단일 로그파일의 최대 크기를 의미한다. 로그파일이 세그먼트 크기만큼 되면 새로운 세그먼트 파일이 생긴다. 토픽은 디렉토리안에 여러 세그먼트 파일로 저장된다. 토픽마다 기본적용 되며(per-topic) 기본 값은 1GB.

log.roll.{ms,hours}

새로운 세그먼트 파일이 생기는 주기. 세그먼트 크기가 되지 않아도 주기별로 파일이 나뉜다. 토픽 기본설정. 기본값은 7일

log.cleanup.policy

"delete"와 "compact" 두가지 모드가 있다. "delete"는 시간 설정이나 크기 설정에 따라 주기적으로 로그 세그먼트를 삭제하고 "compact"는 필요없는 레코드를 지운다. 토픽 기본설정(per-topic)이다.

log.retention.{ms,minutes,hours}

로그 세그먼트가 보관될 시간. 토픽 기본설정(per-topic). 기본 7일.

log.retention.bytes

삭제전까지 파티션별로 보관할 로그 바이트 수. 토픽 기본설정(per-topic).  설정한 로그 시간이나 크기가 되면 세그먼트는 삭제됨.

log.retention.check.interval.ms

로그를 삭제 하기 위해 체크하는 주기. 기본 5분.

log.cleaner.enable

로그 compaction 설정. 기본 true.

log.cleaner.threads

로그 compaction 작업을 할 쓰레드 수. 기본 1.

log.cleaner.backoff.ms

로그 cleaning이 필요한지 체크하는 주기. 기본 15000

log.index.size.max.bytes

로그 세그먼트의 최대 오프셋 인덱스. 토픽 기본(per-topic). 기본값 10485760
인덱스에 대한 설명(OffsetIndex.scala) ... 인덱스는 특정 로그 세그먼트의 위치를 물리적인 파일에 오프셋을 매핑한다. 이 인덱스는 Sparse 하다: 로그에 전체 메세지를 위한 엔트리를 보관하지 않는다.
인덱스는 최대 8-byte entry의 고정된 숫자로 미리 할당되어 파일로 저장된다.
인덱스는 이 파일의 메모리맵 검색을 지원한다. 타겟 오프셋과 같거나 가장 큰 오프셋보다 작은 오프셋/위치 쌍의 위치를 찾기위해 간단한 변종 이진검색을 한다.
인덱스 파일은 두가지 방식으로 열린다: 비어있거나 변경될 수 있는 인덱스로 추가가 될 수 있는 인덱스 파일 또는 이전에 내용이 덧붙여져(populated) 변경될 수 없는 읽기전용 인덱스 파일... 

why-index-file-exists-in-kafka-log-directory 

log.index.interval.bytes

오프셋 인덱스에 새엔트리가 추가될 주기로, 브로커는 패치 요청마다 로그파일을 정확한 위치를 찾을 때 까지 순차적으로 스캔한다. 기본값은 4096.


log.flush.interval.message

디스크에 플러쉬 될때 까지 메모리에 유지될 메세지의 숫자.


로그 압축(compaction) (관련링크) 로그 압축은 카프카가 단일 파티션의 데이터를 담은 로그에 있는 각 키값들이 항상 가장 마지막 값을 보관 할 수 있게 보장한다. 어플리케이션이 크래시 난 후, 시스템이 실패했을때 또는 Operational maintenance 동안 어플리케이션이 재시작 한 후 크래시 된것들을 재로딩하는 유즈케이스나 시나리오등에 사용 될 수 있다.  ... 이런 작업은 각 레코드가 독립적인(stands alone) 로깅 즉, 임시 이벤트 데이타 같은 데는 적합하다. 그렇지만, 데이터 스트림에서 중요한것은 변경이 중심이된(keyed)로그, 변경 데이타(mutable data)다 – 예, 데이터베이스 테이블 변경)
스트림과 같은 구체적인 예를 다뤄보자. 우리가 사용자 메일 주소를 포함하는 토픽을 가지고 있다고 하자; 매시간 사용자는 메일 주소를 업데이트 한다. 우리는 사용자의 USER ID를 주키(primary key)로 이 토픽에 메세지를 보낸다. 자 우리가 특정 시간에 걸쳐 아래 메세지를 보낸다고 해보자.
123 => bill@microsoft.com
            .
            .
            .
123 => bill@gatesfoundation.org
            .
            .
            .
123 => bill@gmail.com


각 메세지 주키에 대해 최소한 가장 마지막 업데이트를 보관 될 수 있게 보장하기 위해서 로그 압축은 너무 잘게 나눠지지 않은(granular)보관 메커니즘을 제공한다. 이 메커니즘은 단지 최근에 변경된 키만 보관하는게 아니라 모든 키의 마지막 값을 가지는 전체 스냅샷 로그를 보장하는 것이다. 이것은 다운스트림 컨슈머가 전체 변경로그를 보관하지 않고 토픽과 별개로 스스로의 상태를 복구 할 수 있음을 의미한다.

몇가지 유용한 사용예를 살펴보면,,

1. 데이터베이스 변화 구독. 여러 데이터 시스템에 데이터셋을 가질 필요가 종종 있고 대개 이 시스템중 하나가 어떤 종류의 데이터베이스(RDBMS, key-value store,,) 이기도 하다. 예를들어, 데이터베이스,  캐쉬, 검색 클러스터, 그리고 하둡 클러스트를 가지고 있을 수 있다. 각 시스템이 데이터베이스에 가하는 변경은 캐쉬에, 검색 클러스터에, 그리고 결과적으로 하둡에 반영될 필요가 있다.  시스템중 하나가 실시간 업데이트 처리만 하는 경우는 단지 최신 로그만 필요하다. 그러나 만약 캐쉬를 다시 로딩할 수 있는걸 원하거나 실패한 검색노드를 복구하길 원한다면, 일부가 아닌 전체 데이터 셋이 필요할 수 있다.
2. 이벤트 소싱https://msdn.microsoft.com/en-us/library/jj591559.aspx#sec1, http://martinfowler.com/eaaDev/EventSourcing.html  이벤트 소싱은 어프리케이션의 현재 상태를 결정하는 히스토리를 저장해서 어프리케이션의 상태를 보관하는 방법.
3. 저널링 파일 시스템 : https://ko.wikipedia.org/wiki/%EC%A0%80%EB%84%90%EB%A7%81_%ED%8C%8C%EC%9D%BC_%EC%8B%9C%EC%8A%A4%ED%85%9C
고가용성을 위한 저널링. 변경을 로깅 함으로써 로컬 계산 처리가 Fault-tolerant 될 수 있게 한다. 이 변경들은 로컬 변경을 만들 수 있게 해서 다른 프로세스가 이 변경을 재로딩 할 수 있고 만약 실패 하더라도 계속 진행 할 수 있게 한다. 구체적인 예로 카운트 처리, 데이터 통합, 그리고 "group by"  (스트림 쿼리 시스템에서 처리와 같은)게 있다.  실시간 스트림 처리 프레임워크인 Samza는 정확히 이 목적으로 사용한다.

각각의 경우에서 한가지는 주로 변경의 실시간 피드 처리가 주로 필요하다. 그러나 경우에 따라, 머신이 크래쉬 되거나 데이터가 재로딩 될 필요가 있거나 재처리 될 필요가 있을때 한가지는 전체 로딩을 필요로 한다는 것이다. 로그 압축은 같은 토픽에서 이런 사례 모두를 피딩되게 한다. 이런 로그의 사용 스타일은 https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying 에서 더 자세히 설명 되었다.

보통 아이디어는 꽤 단순하다. 만약 우리가 제약없이 로그를 보관 한다면, 그리고 우리가 위 케이스들에서 각 변경을 로깅 한다면, 우리는 최초 시작부터 매번 시스템의 상태를 캡쳐 해야 한다. 이 전체 로그를 사용함으로써 처음 N개의 레코드를 재연(replay)해서 제때 어떤 지점으로  복구 할 수 있다. 이 가상적인 전체 로그는 아주 현실적이지지 않다. 로그로 단일 레코드를 많이 업데이트 하는 시스템이라면 끝없이 커질것이다. 오래된 업데이트를 버리는 간단한 로그 보관 메커니즘은 공간을 제약하지만 로그는 더이상 현재상태를 복구 할 방법이 없다.
로그 압축은 레코드당 보관법에 finer-grained를 주는 메커니즘이다. coarser-grained한 시간기반 보관법보다 이 아이디어는 같은 키로 더 최신 업데이트를 가져 선택적으로 레코드를 삭제한다. 이 방법은 최소한 키별로 마지막 상태를 가지는 로그가 보장된다.
이 보관 정책은 토픽단 설정 될 수 있다. 그래서 단일 클러스터는 크기나 시간으로 보관이 강제된 몇몇 토픽을 가질 수 있고 다른  토픽들은 압축이 강제된 보관 정책을  가질 수 있다.
이 기능은 링크드인의 가장 오래된것들중 하나에서 영감을 받았다 그리고 가장 성공한 인프라의 부분이다. 데이터베이스 변경 로그 캐싱 서비스인 Databus -  대부분의 구조화된 로그 시스템 같이 않게 카프카는 구독을 위해 만들어 졌다 그리고 빠른 순차 읽기와 쓰기를 위해데이터를 조직화 한다. Databus와 달리, 카프카는 source-of-truth 저장 동작을 한다. 그래서 심지어 업스트림 데이터 소스가 재연(replayable) 될 수 없는 상황에서도 유용하다.  

복제 세팅



default.replication.factor

자동으로 생성된 토픽에 대한 기본 복제 인수(factor)다. 기본 1

replica.fetch.max.bytes

팔로워가 리더에서 복제 할때 패치할 데이터의 최대 바이트 양. 1048576

replica.fetch.wait.max.ms

복제 패치 요청에 리더가 응답할 최대 시간. 기본 500

num.replica.fetchers

리더에서 메세지를 복제할때 사용할 쓰레드 수로 쓰레드 수를 늘리면 IO 비율도 높아짐. 기본 1

replica.high.watermark.checkpoint.interval.ms

 각 복제가 복구를 위해 디스크에 워터마크를 저장하는 빈도. 기본 5000


주키퍼 세팅


zookeeper.connect

hostname:port 형식으로 주키퍼 커넥션을 지정한다. 콤파(,)로 구분해 여러 주키퍼 노드를 지정 할 수 있다.  여러 노드를 지정하는 것은 주키퍼 노드 하나가 다운 되어도 카프카 클러스터가 신뢰도와 지속성을 가지게 하기 위함. 주키퍼는 특정 패스에 카프카 데이타를 만들 수 있게  chroot 패스를 허용한다.  chroot는 동일 주키퍼 클러스터가 여러 카프카 클러스터를 지원 할 수 있게 한다.  예) host1:port1,host2:port2,host3:port3/chroot/path. 마지막 설정은 모든 클러스터 데이터를 /chroot/path에 넣는다. 이 패스는 반드시 카프카가 시작되기 전에 생성되어야 한다. 그리고 컨슈머는 반드시 동일한 문자열(패스)을 사용해야 한다.

zookeeper.session.timeout.ms

서버로부터 하트비트를 받지 못하면 죽은것으로 간주한다. 이 값은 반드시 주의깊게 설정되어야 하는데, 만약 이 하트비트가 주기 너무 길면 죽은서버를 감지하지 못하고, 짧으면 살아 있는 서버가 죽은것으로 간주 된다. 기본값은 6000

zookeeper.sync.time.ms

주키퍼 팔로워가 리더 뒤에 있을 수 있는 시간을 지정한다? 기본 2000.


기타 설정


auto.create.topics.enable

이 값을 true로 놓았을때, 만약 존재하지 않는 토픽에 메타데이터를 패치하거나 메세지를 생성하면,  자동으로 생성된다. 운영환경에서는 false로 둬야한다.

controlled.shutdown.enable

true로 놓았을때, 브로커에 셧다운이 호출되면, 그리고 이 브로커가 어떤 토픽의 리더일때, 리더를 셧다운전에 다른 브로커로 옮긴다. 이 설정은 시스템 가용성을 높일 수 있다. 기본값은 true.

auto.leader.rebalance.enable

true일때, 가능하다면 각 파티션의 우선하는 복제에 주기적으로 리더쉽을 세팅해서,  브로커는 자동으로 브로커간 파티션의 리더쉽 균형을 맞추려고 시도한다. (우선하는 복제??) 기본값 true.

leader.imbalance.per.broker.percentage

브로커별로 허락된 리더 불균형을 백분율로 지정. 만약 세팅된 비율을 웃돌면 클러스터는 리더쉽 재균형을 맞춘다. 기본값 10.

offset.metadata.max.bytes

클라이언트에 오프셋과 함께 저장될 수 있게 허락된 최대 메타데이타 양. 기본값 4096.

max.connections.per.ip

브로커가 지정된 IP에서 수용 할 수 있는 최대 커넥션 수. 기본 214748367 (=Int.MaxValue)

connections.max.idle.ms

브로커가 소켓을 닫기전 기다리는 최대 idle 시간. 기본 600000 (10 * 60 * 1000L)

unclean.leader.election.enable

true 일때,  ISR(in-sync replicas) 가 아닌(=out-of-sync) 복제가 리더가 된다. 데이터 소실이 생길 수 있다. 기본 true

새로 선출된 리더에 동기화 되지 않은 메세지들은 소실된다. 이 기능은 가용성과 신뢰성간 균형을 제공한다. 이 옵션이 꺼질때(false), 만약 브로커가 향후 사용하지  않게 될 파티션에 대한 리더 복제를 포함 하고 있고, 동기화 되지 않았지만 리더와 대체할 복제가 존재하면, 리더 복제나 다른 동기화된 복재가 온라인이 될때 까지 그 파티션은 사용할 수 없게 된다.  

offsets.topic.num.partitions

오프셋 커밋 토픽에 대한 파티션 수. 이 값은 디플로이 되고 나면 변경 될 수 없어서, 제한보다 높게 설정되는게 좋다. 기본 값은 50

offsets.topic.replication.factor

오프셋 커밋 토픽에 복제 인수(factor)를 지정. 지정값보다 더 높으면,  그 시점에 토픽을 생성하고 지정값 보다 낮으면 생성된 복제 수가 브로커 수와 같게 된다. 기본값 3.

offsets.topic.segment.bytes

오프셋 토픽의 세그먼트 크기. 값이 작게 유지되면 리더가 로그 압축과 로딩이 더 빠르다.

offsets.load.buffer.size

오프셋 세그먼트를 오프셋 메니저의 캐시로 읽어들이는데 사용될 버퍼 크기.

offsets.commit.required.acks

오프셋 커밋이 받아들여지기 전에 요구되는 ack(acknowledgement) 수. 기본값 -1.


'kafka' 카테고리의 다른 글

카프카 윈도우  (0) 2017.07.07
Posted by 보통 사랑 :D
,

카프카 윈도우

kafka 2017. 7. 7. 10:02

apache kafka(아파치 카프카)

- open-source message broker

- 메시지 전송, 메시지 검증, 메시지 라우팅 프로젝트

 

윈도우 환경에서의 카프카 서버 실행
JAVA_HOME이 환경변수에 잡혀있어야 한다.(자바 sdk 경로)

다운로드
- http://kafka.apache.org/downloads.html 에서 binary 버전을 다운받는다.

서버 실행(주키퍼와 카프카가 모두 실행되야 함)
1. 시작-실행-cmd 엔터
2. cd 카프카 경로/bin/windows (ex: C:\dev\kafka_2.9.2-0.8.2-beta\bin\windows)
3. 주키퍼, 카프카 실행(cmd를 2개 띄워서 각각 실행)
zookeeper-server-start.bat ../../config/zookeeper.properties
kafka-server-start.bat ../../config/server.properties

로컬에서 테스트 명령어(주키퍼와 카프카 실행 된 상태에서 각각 cmd창 띄운 상태에서 하면 됨)


토픽 생성
kafka-topics.bat --zookeeper localhost:2181 --topic test --create --partitions 1 --replication-factor 1

 

토픽 리스트확인
kafka-topics.bat --zookeeper localhost:2181

 

프로듀서 생성
kafka-console-producer.bat --broker-list localhost:9092 --topic test
컨슈머까지 생성 후 메시지 입력하면 컨슈머로 전송됨

 

컨슈머 생성 
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning 

'kafka' 카테고리의 다른 글

kafka centos 확인사항  (0) 2017.07.12
Posted by 보통 사랑 :D
,

구글 스프레드시트로 엑셀파일 편집하기



1. 구글 로그인 후 우측 상단 툴박스 클릭 드라이브 선택



2. create 클릭





3. Speradsheet 클릭





4. 스프레드 시트 열리면 File>Open 선택




4.Upload 선택후 드래그앤드롭 혹은 Select a file ypur computer 선택하여 파일 업로드




5. 파일편집




6. FIle>Download as>Microsoft Excel(.xlsx) 선택




7.다운로드 된 파일 클릭 (아래 화면은 크롬으로 다운로드 한 경우입니다)




8.파일을 엑셀로 열어보면 정상적으로 편집되었음을 확인 할 수 있습니다. 




9. 크롬의 경우 도구바에서 다운로드를 클릭하여 다운로드 폴더열기를 할 수 있습니다.

주소창에 chrome://downloads 를 입력하여 다운로드 목록으로 이동할 수도 있습니다. 




10. 다운받은 파일은 다운로드 폴더에 저장됩니다. 



11. window7 기준 파일관리자의 다운로드 클릭시 해당 폴더로 이동가능합니다


Posted by 보통 사랑 :D
,