본문 바로가기

공부 기록/영상 후기

kafka 조금 아는 척하기 3 (개발자용)- 컨슈머

https://youtu.be/xqrIDHbGjOY?si=yvPFI6gIrgZF68JM

컨슈머

- 토픽 파티션에서 레코드 조회

 

토픽 파티션은 그룹 단위 할당

- 컨슈머 그룹 단위로 파티션 할당

- 컨슈머 개수가 파티션 개수보다 커지면 X

 

 

커밋된 오프셋이 없는 경우

- 처음 접근이거나 커밋한 오프셋이 없는 경우

- auto.offset.reset 설정 사용

   => earliest : 맨 처음 오프셋 사용

   => latest : 가장 마지막 오프셋 사용(기본값)

   => none : 컨슈머 그룹에 대한 이전 커밋이 없으면 익셉션 발생

 

컨슈머 설정 - 조회에 영향을 주는 주요 설정

- fetch.min.bytes : 조회 시 브로커가 전송할 최소 데이터 크기

   => 기본값 1

   => 이 값이 크면 대기 시간은 늘지만 처리량이 증가

- fetch.max.wait.ms : 데이터가 최소 크기가 될 때까지 기다릴 시간

   => 기본값 500

   => 브로커가 리턴할 때까지 대기하는 시간으로 poll() 메서드의 대기 시간과 다름

- max.partition.fetch.bytes : 파티션 당 서버가 리턴할 수 있는 최대 크기

   => 기본값 1048576(1MB)

 

자동 커밋/수동 커밋

- enable.auto.commit 설정

   => true : 일정 주기로 컨슈머가 읽은 오프셋을 커밋(기본값)

   => false : 수동으로 커밋 실행

- auto.commit.interval.ms : 자동 커밋 주기

   => 기본값 5000

- poll(), close() 메서드 호출 시 자동 커밋 실행

 

 

재처리와 순서

- 동일 메시지 조회 가능성 -> 일시적 커밋 실패, 리밸런스 등에 의해 발생

- 컨슈머는 멱등성(idempotence)을 고려해야 함

   => 예 : { 조회수 1 증가 -> 좋아요 1 증가 -> 조회수 1 증가 } 를 재처리할 경우, 단순 처리하면 조회수는 2가 아닌 4가 될 수 있음

- 데이터 특성에 따라 타임스탬프, 일련 번호 등을 활용

 

세션 타임아웃, 하트비트, 최대 poll 간격

- 컨슈머는 하트비트를 전송해서 연결 유지

   => 브로커는 일정 시간 컨슈머로부터 하트비트가 없으면 컨슈머를 그룹에서 빼고 리밸런스 진행

   => 관련 설정 : session.timeout.ms(세션 타임 아웃 시간(기본값 10초)), heartbeat.interval.ms(하트비트 전송 주기(기본값 3초), session.timeout.ms의 1/3 이하 추천)

- max.poll.interval.ms : poll() 메서드의 최대 호출 간격

   => 이 시간이 지나도록 poll()하지 않으면 컨슈머를 그룹에서 빼고 리밸런스 진행

 

 

주의점!

- 카프카 컨슈머는 쓰레드에 안전하지 않음

   => 여러 쓰레드에서 동시에 사용하지 말 것

   => wakeup() 메서드는 예외