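How to Use Cassandra CqlBulkOutputFormat
Background
- Cassandra offers several ways to upload data, such as the Spark connector and cqlsh, but most of them build INSERT queries and execute them against Cassandra.
- When a large number of queries are issued, network I/O load becomes heavy, and Cassandra's increased memory usage and frequent compaction degrade performance.
- To avoid this, Cassandra provides a tool called SSTableLoader, which uploads SSTables that were built on an external server.
- However, building SSTables on an external server consumes a lot of memory and CPU, which led us to look for a way to distribute that load.
Cassandra Bulk Upload Methods in a Distributed Environment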
- CqlBulkOutputFormat, CQL3 based, easier to program, requires C* V2+ and JDK 7 in Hadoop environment
- BulkOutputFormat, thrift based, less abstraction and needs more low-level work, provided in C* V1 and OK with JDK 6
- CqlSSTableWriter + SSTableLoader, CQL3 based, easier to program, but no managed parallelism, requires C* V2 but no need for Hadoop environment, a multi-thread/parallel computing mechanism would be required for scalability
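[Source: https://shenghuawan.wordpress.com/2015/01/20/cassandra-bulk-loading-summary/]
-> Option 3 (CqlSSTableWriter + SSTableLoader) is not suitable for a distributed environment.
-> Option 2 (BulkOutputFormat) has many references, but it was deprecated in later Cassandra versions.
-> We decided to build on option 1, CqlBulkOutputFormat.
CqlBulkOutputFormat Source
Basic Behavior of CqlBulkOutputFormat
- Takes the List<ByteBuffer> data emitted by the Mapper or Reducer as input.
- The incoming data is split according to the buffer size and written out as multiple SSTable files.
- The generated SSTables are compressed and streamed to Cassandra at the configured bandwidth.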
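To make the buffer-size behavior concrete, here is a minimal, stdlib-only simulation. It is not Cassandra code: the class name BufferSplitSketch, the split method, and the rule "flush once the buffered bytes reach the limit" are assumptions chosen only to illustrate why a smaller buffer yields more SSTable files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; the real writer lives inside Cassandra.
public class BufferSplitSketch {

    /** Returns how many rows end up in each simulated SSTable flush. */
    static List<Integer> split(int[] rowSizesBytes, long bufferBytes) {
        List<Integer> flushes = new ArrayList<>();
        long buffered = 0;
        int rows = 0;
        for (int size : rowSizesBytes) {
            buffered += size;
            rows++;
            if (buffered >= bufferBytes) { // buffer is full: one SSTable is written
                flushes.add(rows);
                buffered = 0;
                rows = 0;
            }
        }
        if (rows > 0) {
            flushes.add(rows); // whatever is left is written when the writer closes
        }
        return flushes;
    }

    public static void main(String[] args) {
        int[] rows = new int[10];
        Arrays.fill(rows, 30); // ten rows of 30 bytes each
        System.out.println(split(rows, 100)); // prints [4, 4, 2]
        System.out.println(split(rows, 50));  // prints [2, 2, 2, 2, 2]
    }
}
```

Halving the buffer in this toy model raises the number of output files from three to five, which is the same trade-off between memory use and SSTable count that the real buffer size controls.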
Mapper Implementation
- CqlBulkOutputFormat accepts input only as List<ByteBuffer>, so the Mapper (or Reducer) has to be implemented in Java.
- Each value must be converted to a ByteBuffer according to the data type of its Cassandra table column; org.apache.cassandra.utils.ByteBufferUtil from the Cassandra API makes this convenient.
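// Needs: java.io.IOException, java.nio.ByteBuffer, java.text.SimpleDateFormat,
// java.util.ArrayList, java.util.List, org.apache.hadoop.io.LongWritable,
// org.apache.hadoop.io.Text, org.apache.hadoop.mapred.OutputCollector,
// org.apache.hadoop.mapred.Reporter, org.apache.cassandra.utils.ByteBufferUtil
// delimiter, dateformat, uploadData (String) and score (float) are assumed to be
// fields of the enclosing Mapper class that are initialized elsewhere.
public void map(LongWritable key, Text data,
        OutputCollector<Text, List<ByteBuffer>> output,
        Reporter reporter) throws IOException {
    String[] dataArr = data.toString().split(delimiter);
    int categoryId = Integer.parseInt(dataArr[0]);
    String uid = dataArr[1].toLowerCase();
    // getTime() returns milliseconds since the epoch, which is what a
    // Cassandra timestamp column stores.
    SimpleDateFormat sdf = new SimpleDateFormat(dateformat);
    long dtMillis;
    try {
        dtMillis = sdf.parse(uploadData).getTime();
    } catch (java.text.ParseException e) {
        // Skip the record rather than silently writing a row dated 1970-01-01.
        reporter.incrCounter("bulkload", "unparseable_date", 1);
        return;
    }
    // make column data, in the bind order of the INSERT statement
    List<ByteBuffer> columns = new ArrayList<ByteBuffer>();
    columns.add(ByteBufferUtil.bytes(uid));        // text
    columns.add(ByteBufferUtil.bytes(dtMillis));   // timestamp
    columns.add(ByteBufferUtil.bytes(score));      // float
    columns.add(ByteBufferUtil.bytes(categoryId)); // int
    output.collect(new Text(uid), columns);
}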
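For readers who want to see what those ByteBuffer values look like without pulling in the Cassandra jar, the following stdlib-only sketch reproduces the encodings as I understand them: UTF-8 bytes for text, an 8-byte big-endian long for timestamp, and 4 bytes each for float and int. The class ColumnEncodingSketch and its helper names are hypothetical and the sample values are made up; in the actual mapper ByteBufferUtil.bytes(...) does this work.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the ByteBufferUtil.bytes(...) calls in the mapper.
public class ColumnEncodingSketch {

    static ByteBuffer text(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    static ByteBuffer timestamp(long epochMillis) {
        ByteBuffer b = ByteBuffer.allocate(8);
        b.putLong(0, epochMillis); // absolute put keeps position at 0
        return b;
    }

    static ByteBuffer float32(float f) {
        ByteBuffer b = ByteBuffer.allocate(4);
        b.putFloat(0, f);
        return b;
    }

    static ByteBuffer int32(int i) {
        ByteBuffer b = ByteBuffer.allocate(4);
        b.putInt(0, i);
        return b;
    }

    /** Builds one row in the column order uid, date, score, category_id. */
    static List<ByteBuffer> row(String uid, long dateMillis, float score, int categoryId) {
        List<ByteBuffer> columns = new ArrayList<>();
        columns.add(text(uid));
        columns.add(timestamp(dateMillis));
        columns.add(float32(score));
        columns.add(int32(categoryId));
        return columns;
    }

    public static void main(String[] args) {
        // Sample values are invented for the demonstration.
        for (ByteBuffer column : row("user-1", 1470000000000L, 0.5f, 42)) {
            System.out.println(column.remaining() + " bytes");
        }
    }
}
```

The point of the sketch is that the width of each buffer depends on the Java type passed in, so a Java double written into a float column, or an int written into a timestamp column, would produce a buffer of the wrong size for the schema.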
CqlBulkOutputFormat Options
Set these directly with Hadoop Streaming -D options, or list them in a properties XML file passed with the -conf option.
- cassandra.output.keyspace : keyspace name
- mapreduce.output.basename : table name
- cassandra.columnfamily.schema.user_interest_category : table schema (the key suffix is the table name)
ex) CREATE TABLE dmp.user_interest_category_test ( uid text, date timestamp, score float, category_id int, PRIMARY KEY (uid, date, score, category_id) )
- cassandra.columnfamily.insert.user_interest_category : insert query (the key suffix is the table name)
ex) INSERT INTO dmp.user_interest_category_test (uid, date, score, category_id) VALUES (?, ?, ?, ?);
- cassandra.output.keyspace.username : Cassandra user name
- cassandra.output.keyspace.passwd : password
- cassandra.output.thrift.address : list of Cassandra server IPs (comma-separated)
- cassandra.output.partitioner.class : partitioner class (org.apache.cassandra.dht.Murmur3Partitioner)
- mapreduce.job.user.classpath.first
  - The Hadoop core libraries and the Cassandra libraries can conflict. Guava, for example, causes errors because the two depend on different versions, so set this option to true to give Cassandra's libraries priority.
- mapreduce.output.bulkoutputformat.buffersize
  - Buffer size used when generating SSTables (default 64MB).
  - A larger value produces a larger SSTable per flush but uses more memory, and the MR job may fail with java.lang.OutOfMemoryError: GC overhead limit exceeded.
  - A value that is too small produces more SSTables, which can make Cassandra compact more often.
- mapreduce.output.bulkoutputformat.streamthrottlembits
  - Bandwidth limit applied when streaming SSTables to Cassandra (default 0: unlimited).
  - When it is not set, SSTables are streamed very quickly, but the transfer can take up all of the Cassandra servers' inbound network bandwidth and put a heavy temporary load on them.
  - It is important to choose a value that suits the situation.
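As a rough illustration of the -conf route, the sketch below renders a few of these options in the Hadoop configuration file layout (a configuration element containing property/name/value entries). The class BulkLoadConfSketch and its methods are hypothetical helpers, not part of Hadoop or Cassandra, and the values are illustrative: the statements are copied from the examples above, while the keyspace and table name are my guesses based on those examples and should be replaced with your own.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that prints a Hadoop-style configuration XML file.
public class BulkLoadConfSketch {

    /** Escapes the characters that are not allowed in XML text nodes. */
    static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    /** Renders the options as <configuration><property>...</property></configuration>. */
    static String render(Map<String, String> options) {
        StringBuilder xml = new StringBuilder("<?xml version=\"1.0\"?>\n<configuration>\n");
        for (Map.Entry<String, String> e : options.entrySet()) {
            xml.append("  <property>\n")
               .append("    <name>").append(escape(e.getKey())).append("</name>\n")
               .append("    <value>").append(escape(e.getValue())).append("</value>\n")
               .append("  </property>\n");
        }
        return xml.append("</configuration>\n").toString();
    }

    public static void main(String[] args) {
        // Illustrative values only; adjust keyspace, table, and statements to your schema.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("cassandra.output.keyspace", "dmp");
        options.put("mapreduce.output.basename", "user_interest_category");
        options.put("cassandra.columnfamily.schema.user_interest_category",
                "CREATE TABLE dmp.user_interest_category_test ( uid text, date timestamp, "
                + "score float, category_id int, PRIMARY KEY (uid, date, score, category_id) )");
        options.put("cassandra.columnfamily.insert.user_interest_category",
                "INSERT INTO dmp.user_interest_category_test (uid, date, score, category_id) "
                + "VALUES (?, ?, ?, ?);");
        // Assumed to be expressed in MB, matching the 64MB default described above.
        options.put("mapreduce.output.bulkoutputformat.buffersize", "64");
        System.out.print(render(options));
    }
}
```

Keeping the schema and insert statements in a file like this avoids shell-quoting problems that would come up if the CQL text were passed through -D on the command line.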
Maven build
<repositories>
<repository>
<id>ngnentRepository</id>
<name>NMP Repository</name>
<url>http://nexus.nhnent.com/content/groups/public</url>
</repository>
<repository>
<id>mvnRepository</id>
<name>mvn Repository</name>
<url>https://mvnrepository.com/artifact/org.apache.cassandra/cassandra-all</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>system</scope>
<systemPath>${HADOOP_COMMON}</systemPath>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>system</scope>
<systemPath>${HADOOP_MAPREDUCE_CORE}</systemPath>
<version>0.20</version>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>2.1.15</version>
</dependency>
</dependencies>
Running the Hadoop Streaming MapReduce Job
Include the Maven-built jar file and specify the Mapper class you implemented and the output format.
hadoop jar ${HADOOP_STREAMING_JAR} \
-D mapred.job.name="dmp-interest-bulkload-"${DATE} \
-D mapred.map.tasks=2 \
-D mapreduce.job.user.classpath.first=true \
-D cassandra.output.keyspace.username=${CASSANDRA_USER} \
-D cassandra.output.keyspace.passwd=${CASSANDRA_PAWD} \
-D cassandra.output.thrift.address=${CASSANDRA_ADDR} \
-D cassandra.output.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner \
-conf ${CONF_DIR}/schema_user_interest_category_app.xml \
-libjars ${JAR_PATH} \
-input ${INPUT_PATH} \
-output ${OUTPUT_PATH} \
-mapper com.toast.exchange.dmp.interest.bulkload.AppAdidBulkload \
-reducer NONE \
-outputformat org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat
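Hadoop Streaming MapReduce Job Logs
The stdout log of each Map (or Reduce) task shows the SSTable files being created and the result of streaming them. You can also log in to the data node where the MapReduce task ran and inspect the generated SSTable files.
SSTable creation log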
09:25:46.685 [Thread-11] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-tmp-ka-1-Digest.sha1 to /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Digest.sha1
09:25:46.687 [Thread-11] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-tmp-ka-1-TOC.txt to /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-TOC.txt
09:25:46.687 [Thread-11] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-tmp-ka-1-Index.db to /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Index.db
09:25:46.688 [Thread-11] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-tmp-ka-1-Statistics.db to /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Statistics.db
09:25:46.688 [Thread-11] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-tmp-ka-1-Filter.db to /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Filter.db
09:25:46.688 [Thread-11] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-tmp-ka-1-CompressionInfo.db to /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-CompressionInfo.db
09:25:46.689 [Thread-11] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-tmp-ka-1-Data.db to /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db
SSTable transfer log
09:25:48.236 [StreamConnectionEstablisher:3] INFO o.a.c.streaming.StreamCoordinator - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401, ID#0] Beginning stream session with /x.x.x.x
09:25:48.235 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Sending File (Header (cfId: e5ec2a50-3fd2-11e5-9d62-af38b156d6d1, #0, version: ka, estimated keys: 24064, transfer size: 2436481, compressed?: true, repairedAt: 0), file: /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db)
09:25:48.235 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Sending File (Header (cfId: e5ec2a50-3fd2-11e5-9d62-af38b156d6d1, #0, version: ka, estimated keys: 22528, transfer size: 2483732, compressed?: true, repairedAt: 0), file: /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db)
09:25:48.236 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Sending File (Header (cfId: e5ec2a50-3fd2-11e5-9d62-af38b156d6d1, #0, version: ka, estimated keys: 22912, transfer size: 2574224, compressed?: true, repairedAt: 0), file: /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db)
09:25:48.315 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Start streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.40, repairedAt = 0, totalSize = 2459515
09:25:48.324 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Start streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.156, repairedAt = 0, totalSize = 2379097
09:25:48.325 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Start streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.37, repairedAt = 0, totalSize = 2482971
09:25:48.328 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Start streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.39, repairedAt = 0, totalSize = 2424141
09:25:48.333 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Start streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.21, repairedAt = 0, totalSize = 2436481
09:25:48.336 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Start streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.38, repairedAt = 0, totalSize = 2574224
09:25:48.342 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Start streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.157, repairedAt = 0, totalSize = 2483732
09:25:48.350 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Start streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.22, repairedAt = 0, totalSize = 2461039
09:27:24.853 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Finished streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.38, bytesTransferred = 2574224, totalSize = 2574224
09:27:24.863 [STREAM-IN-/x.x.x.x] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Received Received (e5ec2a50-3fd2-11e5-9d62-af38b156d6d1, #0)
09:27:24.864 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Sending Complete
09:27:24.876 [STREAM-IN-/x.x.x.x] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Received Complete
09:27:24.877 [STREAM-IN-/x.x.x.x] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Closing stream connection handler on /x.x.x.x
09:27:24.880 [STREAM-IN-/x.x.x.x] INFO o.a.c.streaming.StreamResultFuture - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Session with /x.x.x.x is complete
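CqlBulkOutputFormat Bulk Upload Performance Measurements
- Target data: 13G of data with 450 million rows
- Performance was measured while varying the buffer size and the bandwidth
- buffer size: 64MB (default), bandwidth: 0 (default)
  - Result: SSTable creation 10 minutes, transfer 5 minutes
- buffer size: 96MB, bandwidth: 1MBps
  - Result: SSTable creation 10 minutes, transfer 1 hour 50 minutes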
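A back-of-the-envelope estimate can help when picking a throttle value. The sketch below is only arithmetic under several assumptions that the measurements do not confirm: that roughly the full 13G is streamed (ignoring compression and replication), that the limit applies separately to each map task, that two map tasks run in parallel as requested by mapred.map.tasks=2 in the job command, and that the limit is 1 megabyte per second as written above. The class and method names are hypothetical.

```java
// Hypothetical estimate helper; every input below is an assumption, not a measurement.
public class ThrottleEstimateSketch {

    /** Seconds to push totalMegabytes through parallel streams that are each capped at perStreamMBps. */
    static long estimateSeconds(double totalMegabytes, int streams, double perStreamMBps) {
        return Math.round(totalMegabytes / (streams * perStreamMBps));
    }

    public static void main(String[] args) {
        double totalMegabytes = 13 * 1024; // 13G of input, assumed equal to the streamed volume
        long seconds = estimateSeconds(totalMegabytes, 2, 1.0);
        System.out.printf("%d s (~%d min)%n", seconds, seconds / 60); // prints 6656 s (~110 min)
    }
}
```

Under these assumptions the estimate lands close to the measured 1 hour 50 minutes, although that agreement may be coincidental. The practical takeaway is that transfer time scales roughly inversely with the throttle, so a limit should be sized against the data volume and the time window available for the upload.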