본문 바로가기

공부 기록/영상 후기

kafka 조금 아는 척하기 2 (개발자용) - 프로듀서

https://youtu.be/geMtm17ofPY?si=aDYPrpk_JttKW-ii

프로듀서

- 토픽에 메시지 전송(토픽, 키, 값)

 

 

Sender와 send()는 서로 다른 쓰레드로 동작

 

 

전송 결과 확인 안 함

producer.send(new ProducerRecord<>("simple", "value"));

- 전송 실패를 알 수 없음

- 실패에 대한 별도 처리가 필요 없는 메시지 전송에 사용

 

전송 결과 확인함 - Future 사용

Future<RecordMetadata> f = producer.send(new ProducerRecord<>("topic", "value"));
try {
    RecordMetadata meta = f.get(); // 블로킹
} catch (ExecutionException e) {
}

- 배치 효과 떨어짐 -> 처리량 저하

   => 처리량이 낮아도 되는 경우에만 사용

 

전송 결과 확인함 - Callback 사용

producer.send(new ProducerRecord<>("simple", "value"),
    new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception ex) {
        }
    });

- 블로킹 방식이 아니기 때문에 처리량 저하 없음

 

전송 보장과 ack

- ack = 0

   => 서버 응답을 기다리지 않음

   => 전송 보장도 zero

- ack = 1

   => 파티션의 리더에 저장되면 응답 받음

   => 리더 장애 시 메시지 유실 가능

- ack = all (또는 -1)

   => 모든 리플리카에 저장되면 응답 받음

      ==> 브로커 min.insync.replicas 설정에 따라 달라짐

 

ack + min.insync.replicas

- min.insync.replicas(브로커 옵션)

   => 프로듀서 ack 옵션이 all일 때 저장에 성공했다고 응답할 수 있는 동기화된 리플리카의 최소 개수

예1) 리플리카 개수 3, ack = all, min.insync.replicas = 2 일 때, 리더에 저장하고 팔로워 중 한 개에 저장하면 성공 응답

예2) 리플리카 개수 3, ack = all, min.insync.replicas = 1 일 때, 리더에 저장되면 성공 응답(ack = 1과 동일, 리더 장애 시 메시지 유실 가능)

예3) 리플리카 개수 3, ack = all, min.insync.replicas = 3 일 때, 리더와 팔로워 2개에 저장되면 성공 응답(팔로워 중 한 개라도 장애가 나면 리플리카 부족으로 저장에 실패)

 

에러 유형

- 전송 과정에서 실패

   => 전송 타임 아웃(일시적인 네트워크 오류 등)

   => 리더 다운에 의한 새 리더 선출 진행 중

   => 브로커 설정 메시지 크기 한도 초과

   => 기타 등등

- 전송 전 실패

   => 직렬화 실패, 프로듀서 자체 요청 크기 제한 초과

   => 프로듀서 버퍼가 차서 기다린 시간이 최대 대기 시간 초과

   => 기타 등등

 

실패 대응1 - 재시도

- 재시도 가능한 에러는 재시도 처리(ex. 브로커 응답 타임 아웃, 일시적인 리더 없음 등)

- 재시도 위치

   => 프로듀서는 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해 재전송 시도 -> retries 속성

   => send() 메서드에서 익셉션 발생 시 익셉션 타입에 따라 send() 재호출

   => 콜백 메서드에서 익셉션 발생 시 익셉션 타입에 따라 send() 재호출

- 아주 아주 특별한 이유가 없다면 무한 재시도 X -> 전체 메시지가 밀리지 않도록

 

실패 대응2 - 기록

- 추후 처리를 위해 기록

   => 별도 파일, DB 등을 이용해서 실패한 메시지 기록

   => 추후에 수동(또는 자동) 보정 작업 진행

- 기록 위치

   => send() 메서드에서 익셉션 발생 시

   => send() 메서드에 전달한 콜백에서 익셉션을 받는 경우

   => send() 메서드가 리턴한 Future의 get() 메서드에서 익셉션 발생 시

 

* 참고 - enable.idempotence 속성

 

재시도와 순서

- max.in.flight.requests.per.connection

   => 블로킹 없이 한 커넥션에서 전송할 수 있는 최대 전송 중인 요청 개수

   => 이 값이 1보다 크면 재시도 시점에 따라 메시지 순서가 바뀔 수 있음 -> 전송 순서가 중요하면 이 값을 1로 지정