SparkStreaming & Kafka 연동
팀에서 실시간 리포트입력 소스를 kafka로 변경하는 작업을 했다. 이 글은 작업을 하는 동안 배운 것을 정리한다.
먼저 스트림 처리를 신뢰도에 따라 다음과 같이 세가지로 구분법을 알아보자!
- At-most-once(최대 한 번): 데이터 유실이 있을 수 있어, 추천하지 않는 방식
- At-least-once(적어도 한 번): 데이터 유실은 없으나 재전송으로 인해 중복이 생길 수 있음. 대부분의 경우 충분한 방식
- Exactly-once(딱 한 번): 데이터가 오직 한 번만 처리되어 유실도 중복도 없음. 모든 상황에 대해 완벽히 보장하기 어렵지만 가장 바라는 방식
회사에서 사용하고 있는 Spark 버전이 1.5이므로 1.5까지 SparkStreaming의 변천사를 보자.
- v1.0 - Graceful shutdow기능
- Driver로 부터 중단 신호를 받으면 Receiver는 더 이상 데이터를 가져오지 않고, Driver는 Receiver가 이미 받아온 데이터가 처리될 때까지 기다렸다가 중지한다.
- v1.2 - 데이터 유실방지를 위한 WAL(Write Ahead Logs)
- v1.3 - Kafka Direct API
- WAL를 사용한 처리방식은 성능저하를 발생시킴
- Kafka Direct API는 receiver를 사용하지 않고 executor에서 직접 Kafka의 Simple Consumer API를 사용하여 offset 범위를 지정한 만큼 데이터를 받아서 처리한다. 스트림 처리를 위한 KafkaUtils.createDirectStream() 외에 KafkaUtils.createRDD()를 제공해 Kafka 데이터를 배치 처리할 수 있는 기능도 추가됨
- Kafka Direct API로 exactly-once를 보장하게 되어 비로소 유실 없는 스트림 처리에 Spark Streaming을 사용할 수 있게 됨.
- v1.4 - UI향상 DataBricks Blog
- Processing Time, Scheduling Delay를 확인 할 수 있어 배치 시간을 설정하는데 도움이 됨.
- v1.5 - Back Pressure
- Back Pressure은 시스템이 불안정 상태일 경우에 데이터를 받아오는 양을 조절해 주는 기능이다.
- 갑자기 많은 양의 데이터가 몰려 들어오거나 분산환경에서 일시적으로 처리지연이 발생할 경우 시스템이 동적으로 Input Rate을 조절해 다시 안정을 찾도록 한다.
앞선 버전별 설명에서 보면 SparkStreming에서 Kafka를 연동에 두가지 방법이 존재한다.
1) Kafka API
이방식은 Receiver를 통해 kafka로 부터 Data를 전달 받는다. 그리고 데이터 유실을 방지하기 위해 WAL를 사용해 kafka로 부터 받은 데이터를 저장을 하면서 처리한다. Receiver를 사용하는 경우, 처음 receiver를 시작할 때 만들어진 Kafka 연결을 계속 사용하기 때문에, 매 배치마다 연결 지연이 발생하지 않음.
WAL을 사용한 receiver 방식은 Kafka로부터 받은 데이터가 WAL에 먼저 저장되는데, Zookeeper에 Kafka의 offset을 갱신하기 전에 시스템이 실패할 경우, Spark Streaming은 데이터를 받았다고 인식하지만 Kafka는 데이터를 전달하지 않았다고 인식하는 불일치가 발생합니다. 이후 시스템이 복구되면 Spark Streaming은 WAL에 저장된 데이터를 읽어 복구 처리를 하고 receiver를 동작시킵니다. Receiver는 Kafka로부터 다음 데이터를 받아오려고 하는데, Kafka는 Zookeeper에 저장된 offset 정보를 보고, 이전 데이터를 전달하지 않았다고 보게 되므로, 보낼 필요가 없는 데이터를 다시 전송한다. 결국 복구 처리를 두 번하게 된다.(At-least-once)
그리고 다음과 같이 DStream을 생성한다.
JavaPairReceiverInputDStream<String, String> kafkaStream =
KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);
2) Kafka Dircet API
가장 큰 특징은 Receiver를 사용하지 않는다. 그래서 다음과 같은 특성이 있다.
- 매 배치마다 새로 할당받은 executor에서 kafka에 연결해서 데이터를 가져와 약간의 지연이 발생함 하지만 Exactly-once를 보장한다.
- Kafka의 각 파티션은 하나의 executor에서 직접 받아오고, 하나의 RDD 파티션이 생성된다. 즉, Kafka의 파티션 수만큼 병렬화하여 데이터를 가져온다. kafka파티션수로 병렬수준을 쉽게 설정할 수 있다.
- Zookeeper에 offset 정보를 갱신하지 않는다. offset정보를 직접 Handling해야한다.
Kafka Direct API 방식에서 checkpoint를 사용하면 driver는 장애 복구를 위해 어플리케이션의 메타 데이터를 HDFS나 S3처럼 신뢰성 있는 저장소에 저장한다. 메타 데이터에는 어플리케이션 설정 정보와 스케쥴러에 등록되었지만(queued) 아직 완료되지 않은 배치 작업에 대한 정보를 저장한다. 배치 작업 정보에는 offset 정보도 포함되어 있어 driver 장애로 중단된 스트림 처리를 복구할 때 저장된 메타 데이터를 읽어 불필요한 재전송 없이 장애가 발생했던 부분부터 처리할 수 있습니다.
여기서 offset를 관리하는 방법은 다음과 같다.
- kafka에서 offset정보를 읽어온다.
- shuffle이 발생하기전에 offset을 Hbase, HDFS, Kafka, Zookeeper등에 저장한다.
- 비정상적으로 종료되었다가 재시작되면 저장되어 있던 offset부터 읽어 다시 처리한다.
쉽게 offset을 저장하는 방법은 Spark의 checkpoint에 저장하는 방법있다.
- Kafka의 offset 정보를 직접 접근할 필요가 없다. Spark Streaming이 알아서 checkpoint의 메타 데이터에 offset 정보를 저장한다
- Spark Streaming을 다시 시작하더라도 자동으로 checkpoint에 저장된 offset 정보를 읽어서 이어서 처리할 수 있다.
하지만 여기에 다음과 같은 문제가 있다.
- Spark Streaming이 알아서 저장하기 때문에 최종 결과를 저장했음에도 불구하고 offset을 checkpoint에 저장하지 못하고 장애가 발생하는 경우가 발생할 수 있다.
- 결과 저장 단계에서 트랜잭션을 지원하면 두 번의 트랜잭션이 발생할 수 있다. 따라서 결과 저장이 멱등해야 한다.
- checkpoint에 저장된 메타 데이터를 읽을 수 없으면, 재시작을 하더라도 마지막 처리된 이후부터 처리할 수 없다.
- checkpoint 정보는 직렬화(serialized)된 객체를 저장하는데, 역직렬화(deserialize)할 때 클래스가 바뀌면 에러가 발생하며 실패한다.
- 어플리케이션이 시스템 장애로 재시작하면 코드 변화가 없기 때문에 역직렬화 문제가 발생하지 않는다.
- 스트림 처리를 재시작하는 경우는 대부분 어플리케이션이 데이터를 잘못 처리해서 죽어버리거나 잠재적인 오류를 수정하여 어플리케이션을 재배포하는 경우이기 때문에, 어플리케이션 코드가 동일한 경우가 별로 없다. 따라서 checkpoint를 믿고 사용하기에는 불안정한 요소가 많다.
- window 연산은 checkpoint를 사용할 수 밖에 없는데, 어플리케이션 코드 변경에 취약하다는 점을 알고 사용해야 한다.
회사에서는 관리의 편의상 Zookeeper에 offset을 저장하는 방법으로 실시간리포트를 구현하였다.
소스는 다음과 같다.
3) SparkStreaming에서 주의 할점
1. Spark Executor 메모리 설정
- 데이터 원천으로부터 읽은 데이터를 executor의 메모리에 담아두어야 하기 때문에, executor의 메모리 설정은 하나의 배치 단위에서 받아올 수 있는 최대 데이터 크기보다 크게 설정해야 한다.
- Window 연산을 할 경우, window 크기만큼 여러 배치의 데이터를 모두 메모리에 올릴 수 있어야 합니다.
2. 최대 처리율(max rate) 설정
- Kafka Direct API를 사용할 때 최대 처리율을 지정하는 옵션은
spark.streaming.kafka.maxRatePerPartition
이다. 이 설정은 파티션당 초당 처리량이라는 점을 주의 해야한다. 배치 간격이 10초이고 최대 처리율이 1000이고, 파티션 수가 4개라면 데이터는 최대 4 * 1000 * 10 = 4000만큼 받아옵니다. - 최대 처리율을 설정하지 않으면 Spark Streaming을 중단했다가 재시작할 때, 그동안 처리하지 않고 쌓인 데이터를 한꺼번에 받아와서 처리하려고 한다.
- 배치 간격 동안 충분히 처리할 수 있는 데이터 양이라면 문제가 없지만, 중단 시간이 길어질수록 데이터 양은 많아지기 때문에 배치 간격 내에 처리하기 어려워지고 지연시간이 증가하거나 메모리 부족이 발생한다.
- 중단과 재시작에서 설명하였듯이 재시작시 초기 배치는 시간이 더 걸리기 때문에, 안심하고 재시작을 자유롭게 하려면 최대 처리율을 설정하는 것이 좋다.
- 최대 처리율은 여러 번의 테스트를 통해 배치 간격 동안 처리할 수 있는 최대로 지정합니다. 단, 재시작 초기의 처리는 평소보다 느리니 초기 결과를 사용하지 않는다.
- 최대 처리율을 낮게 설정하면, 오랫동안 중단된 후 재시작하는 경우에 중단된 시간만큼의 데이터를 따라잡는데 오래 걸릴 수 있다.
- 최대 처리율을 높게 설정했음에도 불구하고 따라잡는 시간이 오래 걸릴 경우, KafkaUtils.createRDD()을 사용해 배치 처리를 할 수 있다. 단, offsetsRange를 지정해야 하므로, 마지막 처리 offset과 현재 Kafka에 들어온 마지막 offset을 알아야 한다.
- 여러 개의 Kafka topic을 동시에 처리하면, 최대 처리율은 당연히 데이터 입수율이 가장 높은 topic을 기준으로 설정한다.
3. BackPressure 설정
- BackPressure 설정은
spark.streaming.backpressure.enabled
를 true로 설정합니다. - 최대 처리율을 설정하더라도 역 압력을 설정하면 분산 환경과 입력 데이터의 변화에 대응하기 좋습니다.
- 분산 환경의 특성상 처리 시간이 배치 간격보다 커지면, 다음 배치의 일정 지연이 생기고 불안정(unstable) 상태가 됩니다. 불안정 상태가 되면 역 압력에 의해 입력율(Input Rate)을 줄임으로써 처리 시간을 줄이고, 배치 간격을 기다리지 않고 연속해서 다음 배치를 실행해 일정 지연을 0으로 만든다.
- 일정 지연이 0이 되면 입력율을 조금씩 높여서 버퍼링 되는 데이터를 줄인다.
- 입력율은 안정 상태가 지속되는 한 최대 처리율까지 올라갈 수 있다.
- 버퍼링 되는 데이터를 모두 소진하면 입력율은 다시 평소처럼 낮아진다.
- BackPressure 설정을 하면 처리 시간이 배치 간격보다 클 때마다 입력율이 낮아졌다 올라갔다 다시 정상으로 돌아오는 과정이 반복됩니다.
4. 번외
중단된 시간이 너무 길어지면, Kafka에서 버퍼링하며 저장할 수 있는 데이터 크기를 넘어설 수 있다. Kafka가 보관하고 있어서 언제든지 전송할 수 있는 offset을 지나치면, 스트림 처리에서 offset을 잘 저장하고 있더라도 재시작할 때 OffsetOutOfRangeException
이 발생한다. 당연히 유실 없는 스트림 처리는 불가능한 상황이 된다. 따라서 중단될 수 있는 최대 예상 시간 동안 데이터를 충분히 버퍼링 할 수 있도록 Kafka를 설정해 둬야 한다.
회사에서는 kafka에 저장된 시간이 20분이다. 20분넘게 장애가 발생하고 실시간 리포트 Job을 재시작하면 OffsetOutOfRangeException
이 발생한다. offset이 가리키고 있는 데이터가 삭제되어 존재하지 않기 때문이다. 이때 예외처리는 offset을 최근으로 재설정하는 방법을 취했다.