Spark Application을 cluster
모드로 제작할 때 SparkConf 정보 등과 같이 다양한 리소스 파일을 읽는 경우가 있다.
이때 사용하면 좋은 옵션이 --files
옵션이다.
(리소스파일 hadoop에 올리는 방법도 있지만, 매우 낭비가 큰 방식이라 선호하지는 않는다.)
deploy-mode를 client
모드를 사용할때는 dirver에서 로컬 파일을 읽는 방식으로 리소스 파일을 읽어 사용하면 되지만 cluster
모드에서는 어떤 노드가 dirver노드가 될지 모르기 때문에 로컬 파일를 읽는 방식 처럼 사용하기는 힘들다.
이때 사용하는 방식이 spark-submit옵션 중에 --files
옵션을 사용하는 것이다.
아래와 같이 --files
다음에 리소스파일 path를 주면 사용이 가능하다.
spark-submit \
--class ${class} \
--name "${appName}_${date}_${type}" \
--master yarn \
--deploy-mode cluster \
--files ${filteringFile} \
${runjar}
spark-core소스를 보면 --files
로 deploy된 리소스파일은 각각의 executors에서 사용이 가능하다고 나온다. 즉 spark context로 배포되기때문에 어느 시점에서 로컬 파일처럼 읽어도 사용이 가능하다.
spark application에서 Files.newBufferedReader(Paths.get(fileName))
으로 읽어서 사용하면 된다.
//files로 배포되고 각 executor에 저장된 localFilePath
val executorLocalFilePath = SparkFiles.get(fileName);
//아래 코드는 NotFoundFileException을 발생시킨다.
Source.fromFile(executorLocalFilePath)
--files
옵션으로 배포된 file을 읽을때는 Path에 fileName만 주면 상대경로로 file을 찾아준다.
절대경로 file 찾기란 쉽지 않으니,, 상대경로를 사용하자!!
//files로 배포되고 소스에서 사용할려면 fileName만 주면 된다.
Source.fromFile(fileName)
(이것 때문에 많은 시간을 보냈다.ㅠㅠ)
alias
설정 spark-submit \
--class ${class} \
--name "${appName}_${date}_${type}" \
--master yarn \
--deploy-mode cluster \
--files fileName#aliasName \
${runjar}
--files
옵션 파라미터에 #
을 사용하면 #
뒤 String이 alias로 사용된다.
alias
는 cluster모드
에서만 사용이 가능하고 소스에서 사용할 때는 aliasName
으로 호출해서 사용한다. (fileName으로는 접근 안됨)
//files로 배포되고 소스에서 사용할려면 aliasName을 넣어줘야한다.
Source.fromFile(aliasName)
-> 배치 사용하기 전에 배치를 꼭 사용하는가를 먼저 생각하기
개발자가 실제로 개발하는 부분 Tip은 복잡한 step을 만들때 간단한.step을 여러개 만들어서 만드는게 낫다 배치를 만들때 어떻게 잘 나눌까?를 고민하기
저장소 개념으로 사용
Job의 영속성관리 할때 사용 DAO라고 보면 됨
Job meta데이터를 어디에 저장할 것인가를 결정하는 인터페이스임
job이름을 가져올때 사용
demo5를 보면 batch의 트렌젝션을 볼 수 있음
주의할점 wirte에서 error나면 chunk가 1로 변경되기 때문에 느려질수도 잇으니 적절하게 chunck크기를 조절하자 모니터링잘하구
Datastax에서 제공하는 spark-cassandra-connector_2.11
을 사용하여 2억 Row이상이 되는 파일을 읽어 Cassandra에 Insert하는 작업이 있다. 이 작업은 Daily로 진행 된다.
회사 클러스터 자원을 사용했을 때 작업 시간은 대략 30분 정도이다. 하지만 이 작업을 할 때 Cassandra Read 기능은 거의 사용할 수 없을 정도로 불능 상태가 된다.
해당 작업이 수행할 동안 Cassandra의 Read Latency는 다음과 같다. (오전7시30분~ 오전8시까지)
해당 작업이 2억Row가 되는 데이터를 한번에 Insert하는 작업이기 때문에 Spark-Cassandra-connector에서 throughput_mb_per_sec
옵션으로 insert throughtput양을 조절해도 워낙 들어가는 양이 많아서 Network I/O 부하가 심해지고 memTable에 작성되는 내용이기 때문에 Memory 사용량 증가와 잦은 compaction으로 C* 전체의 성능이 저하되는 현상이 있다.
즉 C*를 사용하는 Data-API서버의 Response Time을 개선하고 싶은 것이다.
아래는 Data-API서버에 Response Time이 100ms가 넘는 비율을 나타낸 그래프이다.
7시 Bulk Insert작업이 동작하는 시간이다.
일명 불기둥(7시 bulk insert작업동안 C*의 Read성능이 떨어져 API Response Time이 100m가 넘는 것)을 제거하는 것이 이번 프로젝트에 목표다.
기존 방식에서 Read성능이 떨어지는 이유를 write path와 read path보면 알 수 있다.
row cache를 쓰고 있으면 를 확인하여 있으면 응답을 보낸다
row cache
row cache에 값이 없으면 를 체크하여 sstable에 값이 있는지 확인한다
bloom filter
bloom filter 확인 결과가 true라고 하더라도 는 false positive(실제로는 없는데 있다고 판단) 가능성이 있으므로 를 한 번 더 확인한다
bloom filter
key cache
key cache에 값이 없으면 에서 partition key에 해당하는 partition index의 offset값을 찾는다(partition key에 딱 맞는 값을 주는 것이 아니라 starting point를 알려줌)
partition summary
spark-cassandra-connector
에서 C*에 데이터를 Insert하는 방식은 RDD를 mapPartition 돌면서 Insert문을 만들어 C*에 질의하는 방식이다. 그렇기 때문에 MamTable에서 SSTable로 가는 Flush가 많아지고 SSTable이 많아지면 Compaction수가 많아져 Read작업에 사용될 Memory가 부족해 wirte작업이 끝날때 까지 Read Latency의 값이 매우 크게 나타난다. 또 GC발생횟수가 증가하고, Memory에서 Disk로 Flush가 되기 때문에 System Load도 증가된다.
문제는 실시간으로 C*에서 값을 가져가 사용하는 서비스에 큰 영향을 미친다. spark-cassandra-connector
에서 제공하는 방식이 아닌 새로운 방식을 모색하게 되었다. 여러가지 실험을 진행했지만 그 중 SSTableFile을 만들어 직업 C*에 Insert하는 방식을 사용하니 ReadLatency와 C*의 성능 개선을 할 수 있었다.
라이브러리를 제작할 때 크게 두가지 기능을 구현하면 되었다.
이때 Spark를 사용하여, 위 두가지 기능을 분산 처리 하도록 하였다.
작업이 순서는 다음과 같다.
작업 시간은 10분정도 이다. (기존 30분에서 20분 가량 줄어 든 시간이다.)
아래 코드는 RDD를 Iterator돌며 SSTableFile제작 → C* upload작업을 나타낸다.
public class SSTableExportProcessor implements Serializable {
public static void process(Iterator<CustomTargetingFitModel> it, SparkCassSSTableLoaderClientStatement clientStatement, int TTL) throws IOException {
String keyspaceName = clientStatement.getKeyspaceName();
String tableName = clientStatement.getTableName();
//ssTable Directory Path에 마지막은 keyspace/tableName으로 해야함.
String tempSSTableDirectoryPath = "/tmp/" + "spark-cass-" + UUID.randomUUID().toString()
+ "/" + keyspaceName + "/" + tableName;
File tempSSTableDirectory = new File(tempSSTableDirectoryPath);
boolean makeDirCheck = tempSSTableDirectory.mkdirs();
if (makeDirCheck) {
//SSTable File 생성작업
CQLSSTableWriter writer =
CQLSSTableWriter.builder()
.inDirectory(tempSSTableDirectory)
.forTable(clientStatement.getTableSchemaStatement())
.using(clientStatement.getInsertStatement(TTL))
.build();
while (it.hasNext()) {
CustomTargetingFitModel row = it.next();
List<Object> rowValues = new ArrayList<>(2);
rowValues.add(row.getUid());
rowValues.add(row.getCode());
writer.addRow(rowValues);
}
writer.close();
//SSTable File cassandar로 load
CqlBulkRecordWriter.ExternalClient externalClient =
new CqlBulkRecordWriter.ExternalClient(clientStatement.getExternalClientConf());
try {
new SSTableLoader(tempSSTableDirectory,
externalClient,
new OutputHandler.LogOutput()).stream().get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
if (tempSSTableDirectory.exists()) {
FileUtils.deleteDirectory(tempSSTableDirectory);
}
}
}
}
매일 오전 7시30분에 Bulk Insert작업이 시작된다. 수정한 코드는 2월 16일에 배포를 했다. SystemLoad와 GC Time이 줄어든 것을 확인할 수 있다. MemTable을 거치지 않으니 당연한 결과이다.
하지만 Read Latency는 크게 변화가 없었다. (혼자 크게 뛰는 장비는 장비 이상으로 추후 제거되었다.)
그래서 원인을 찾아보았다.
위 그래프는 C*를 Datasource로 사용하는 API서버에서 ResponseTime이 100ms가 넘는 비율을 나타낸다.
SSTable File을 이용한 Bulk insert 방식은 7시33분 1분 동안 70%가 넘는 비율이 ResponseTime이 100ms를 넘겼다.
이에 반해 Spark-Cassandra-Connector를 사용한 isnert작업은 10%대를 길게 유지하였다.
Bulk Inert한 테이블의 Compaction Strategy는 LeveledCompactionStrategy
이다. 즉 많은 SSTable File이 생성되면 될 수록 Read성능이 떨어진다.(reper. datastax-doc)
기존 작업은 executor하나당 1000개가 넘는 SStable파일이 생성되어 compaction 타임에 C*의 Read성능에 악영향을 미쳤을 것으로 보인다.
그래서 수정된 작업은 다음과 같다.
즉 1만개가 넘는 SSTableFile 갯수를 Repartitions을 하여 10개로 만들어서 테스트를 하니 API에 100ms넘는 비율이 확연하게 줄어든 것을 알 수 있다.
그리고 7시 불기둥도 사라졌다.
2억 Row이상의 데이터를 C*로 Insert하는 작업을 할때 line-by-line으로 memTable로 insert하는 것 보다 SSTable에 바로 write하는 것이 유리하다.
C* doc에서 권장하는 Heap Size는 8GB였다. 그래서 장비의 memory에 큰 관심이 없었다. 하지만 compaction, read할때 장비의 memory가 크면 클 수록 유리하다는 것을 doc(datastax-doc)을 읽으면서 알 수 있었다.
특히 compaction이 발생할때…
그리고 SSTable File용량보다는 File갯수가 성능에 더 큰 영향을 준다. 즉 SSTableFile갯수가 적을 수록 성능에 더 유리하다.
아래는 C*에서 사용하는 Memory구조이다.
위 내용을 Spark Lib으로 제작하여 배포하였다.
libraryDependencies += "com.joswlv.spark.cassandra.bulk" %% "Spark2CassandraBulkLoad" % "1.0.1"
<dependency>
<groupId>com.joswlv.spark.cassandra.bulk</groupId>
<artifactId>Spark2CassandraBulkLoad</artifactId>
<version>1.0.1</version>
</dependency>
compile 'com.joswlv.spark.cassandra.bulk:Spark2CassandraBulkLoad:1.0.1'
// Import the following to have access to the `bulkLoadToCass()` function for RDDs or DataFrames.
import com.joswlv.spark.cassandra.bulk.rdd._
import com.joswlv.spark.cassandra.bulk.sql._
// Specify the `keyspaceName` and the `tableName` to write.
rdd.bulkLoadToCass(
keyspaceName = "keyspaceName",
tableName = "tableName"
)
// Specify the `keyspaceName` and the `tableName` to write.
df.bulkLoadToCass(
keyspaceName = "keyspaceName",
tableName = "tableName"
)
[출처: https://shenghuawan.wordpress.com/2015/01/20/cassandra-bulk-loading-summary/]
–> 3번은 분산 환경에서 적합하지 않음
–> 2번은 레퍼런스는 많으나 Cassandra 버전업 이후 deprecated 됨
–> 1번 방법인 CqlBulkOutputFormat을 이용해 개발하기로 결정함
List<ByteBuffer>
데이터를 입력으로 받음List<ByteBuffer>
로만 입력을 받으므로 Mapper(혹은 Reducer)를 Java로 구현하는 게 필요함public void map(LongWritable key, Text data,
OutputCollector<Text, List<ByteBuffer>> output,
Reporter reporter) throws IOException {
String[] dataArr = data.toString().split(delimiter);
int categoryId = Integer.parseInt(dataArr[0]);
String uid = dataArr[1].toLowerCase();
SimpleDateFormat sdf = new SimpleDateFormat(dateformat);
long dtSeconds = 0;
try {
Date dt = sdf.parse(uploadData);
dtSeconds = dt.getTime();
} catch (java.text.ParseException e) { }
// make column data
List<ByteBuffer> columns = new ArrayList<ByteBuffer>();
columns.add(ByteBufferUtil.bytes(uid));
columns.add(ByteBufferUtil.bytes(dtSeconds));
columns.add(ByteBufferUtil.bytes(score));
columns.add(ByteBufferUtil.bytes(categoryId));
output.collect(new Text(uid), columns);
}
Hadoop Streaming -D 옵션으로 직접 설정하거나, -conf 옵션에 properties xml 파일로 지정함
ex) CREATE TABLE dmp.user_interest_category_test ( uid text, date timestamp, score float, category_id int, PRIMARY KEY (uid, date, score, category_id) )
ex) INSERT INTO dmp.user_interest_category_test (uid, date, score, category_id) VALUES (?, ?, ?, ?);
<repositories>
<repository>
<id>ngnentRepository</id>
<name>NMP Repository</name>
<url><http://nexus.nhnent.com/content/groups/public></url>
</repository>
<repository>
<id>mvnRepository</id>
<name>mvn Repository</name>
<url><https://mvnrepository.com/artifact/org.apache.cassandra/cassandra-all></url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>system</scope>
<systemPath>${HADOOP_COMMON}</systemPath>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>system</scope>
<systemPath>${HADOOP_MAPREDUCE_CORE}</systemPath>
<version>0.20</version>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>2.1.15</version>
</dependency>
</dependencies>
Maven 빌드한 jar 파일을 포함 시키고, 작업한 Mapper class와 outputformat을 명시한다.
hadoop jar ${HADOOP_STREAMING_JAR} \\
-D mapred.job.name="dmp-interest-bulkload-"${DATE} \\
-D mapred.map.tasks=2 \\
-D mapreduce.job.user.classpath.first=true \\
-D cassandra.output.keyspace.username=${CASSANDRA_USER} \\
-D cassandra.output.keyspace.passwd=${CASSANDRA_PAWD} \\
-D cassandra.output.thrift.address=${CASSANDRA_ADDR} \\
-D cassandra.output.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner \\
-conf ${CONF_DIR}/schema_user_interest_category_app.xml \\
-libjars ${JAR_PATH} \\
-input ${INPUT_PATH} \\
-output ${OUTPUT_PATH} \\
-mapper com.toast.exchange.dmp.interest.bulkload.AppAdidBulkload \\
-reducer NONE \\
-outputformat org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat
각 Map (혹은 Reduce)의 stdout 로그를 살펴보면 sstable 파일들이 생성되고, 전송된 결과를 확인할 수 있다. MapReduce작업이 실행된 data node를 접근해 생성된 sstable 파일들을 확인해볼 수도 있다.
sstable 생성로그
09:25:46.685 [Thread-11] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-tmp-ka-1-Digest.sha1 to /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Digest.sha1
09:25:46.687 [Thread-11] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-tmp-ka-1-TOC.txt to /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-TOC.txt
09:25:46.687 [Thread-11] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-tmp-ka-1-Index.db to /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Index.db
09:25:46.688 [Thread-11] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-tmp-ka-1-Statistics.db to /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Statistics.db
09:25:46.688 [Thread-11] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-tmp-ka-1-Filter.db to /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Filter.db
09:25:46.688 [Thread-11] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-tmp-ka-1-CompressionInfo.db to /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-CompressionInfo.db
09:25:46.689 [Thread-11] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-tmp-ka-1-Data.db to /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db
sstable 전송로그
09:25:48.236 [StreamConnectionEstablisher:3] INFO o.a.c.streaming.StreamCoordinator - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401, ID#0] Beginning stream session with /x.x.x.x
09:25:48.235 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Sending File (Header (cfId: e5ec2a50-3fd2-11e5-9d62-af38b156d6d1, #0, version: ka, estimated keys: 24064, transfer size: 2436481, compressed?: true, repairedAt: 0), file: /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db)
09:25:48.235 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Sending File (Header (cfId: e5ec2a50-3fd2-11e5-9d62-af38b156d6d1, #0, version: ka, estimated keys: 22528, transfer size: 2483732, compressed?: true, repairedAt: 0), file: /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db)
09:25:48.236 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Sending File (Header (cfId: e5ec2a50-3fd2-11e5-9d62-af38b156d6d1, #0, version: ka, estimated keys: 22912, transfer size: 2574224, compressed?: true, repairedAt: 0), file: /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db)
09:25:48.315 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Start streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.40, repairedAt = 0, totalSize = 2459515
09:25:48.324 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Start streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.156, repairedAt = 0, totalSize = 2379097
09:25:48.325 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Start streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.37, repairedAt = 0, totalSize = 2482971
09:25:48.328 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Start streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.39, repairedAt = 0, totalSize = 2424141
09:25:48.333 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Start streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.21, repairedAt = 0, totalSize = 2436481
09:25:48.336 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Start streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.38, repairedAt = 0, totalSize = 2574224
09:25:48.342 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Start streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.157, repairedAt = 0, totalSize = 2483732
09:25:48.350 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Start streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.22, repairedAt = 0, totalSize = 2461039
09:27:24.853 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.s.c.CompressedStreamWriter - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Finished streaming file /data8/yarn/nm/usercache/irteam/appcache/application_1456906327100_79670/container_1456906327100_79670_01_000004/tmp/dmp/user_interest_category-c485093c-ef8b-4d06-b870-9465cd076a3b/dmp-user_interest_category-ka-1-Data.db to /10.161.26.38, bytesTransferred = 2574224, totalSize = 2574224
09:27:24.863 [STREAM-IN-/x.x.x.x] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Received Received (e5ec2a50-3fd2-11e5-9d62-af38b156d6d1, #0)
09:27:24.864 [STREAM-OUT-/x.x.x.x] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Sending Complete
09:27:24.876 [STREAM-IN-/x.x.x.x] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Received Complete
09:27:24.877 [STREAM-IN-/x.x.x.x] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Closing stream connection handler on /x.x.x.x
09:27:24.880 [STREAM-IN-/x.x.x.x] INFO o.a.c.streaming.StreamResultFuture - [Stream #5082a580-6023-11e6-a70e-5fc5e5829401] Session with /x.x.x.x is complete
사내정보 시스템 SSO
연동을 위해 Apache사용이 불가피함
사내에서 발급받은 서버에는 기본적으로 Apahce2.2가 설치되어 있어 Apahce설치과정은 생략한다.
mod_jk를 설치과정 및 Apache설정은 다음과 같다.
1) tomcat-connectors설치
cd /usr/local/src
wget http://www.apache.org/dist/tomcat/tomcat-connectors/jk/tomcat-connectors-1.2.44-src.tar.gz
tar -xzf tomcat-connectors-1.2.44-src.tar.gz
2) native 디렉토리로 이동
cd tomcat-connectors-1.2.44-src/native
3) 컴파일
./configure --with-apxs=/usr/sbin/apxs
make
make install
apxs path를 입력해줘야하는데, defulat는 /usr/sbin/apxs
이다. 만약 이 경로에 없다면 찾아서 알맞게 입력하자.
만약 없는 경우
sudo yum install httpd-devel
4) httpd.conf수정
cd /etc/httpd/conf
sudo vi httpd.conf
(httpd.conf) 맨아래추가
LoadModule jk_module modules/mod_jk.so
include conf/mod_jk.conf
include conf/http_vhost.conf
5) mod_jk.conf 추가
vi mod_jk.conf
(mod_jk.conf)
<IfModule mod_jk.c>
JkWorkersFile conf/workers.properties
JkLogFile logs/mod_jk.log
JkLogLevel info
</IfModule>
6) workers.properties 추가
vi worker.properties
(workers.properties)
worker.list=worker1,worker2
worker.worker1.port=18009
worker.worker1.host=127.0.0.1
worker.worker1.type=ajp13
worker.worker1.lbfactor=1
worker.worker2.port=28009
worker.worker2.host=127.0.0.1
worker.worker2.type=ajp13
worker.worker2.lbfactor=1
work가 여러개이면 woker.list에 콤마(,)구분자로 추가
7) http_vhost.conf 추가
vi http_vhost.conf
(http_vhost.conf)
<VirtualHost *:80>
ServerName customtargeting.nhnent.com
JkMount /* worker1
</VirtualHost>
<VirtualHost *:80>
ServerName addinfra-site.nhnent.com
JkMount /* worker2
</VirtualHost>
서비스를 추가할려면 workers.properties
와 http_vhost.conf
에 값을 추가하고 apache를 재시작해주면 된다.
SpringBoot 1.x와 SpringBoot 2.x의 AJP포트 설정 법에 차이가 있다. 그 이유는 SpringBoot1.x에서는 내장톰캣을 기본으로 사용하고 있었지만 SpringBoot2.x에서는 내장톰캣대신에 netty를 사용하기 때문이다.
1) application.properties 값추가
tomcat.ajp.protocol=AJP/1.3
tomcat.ajp.port=18009
tomcat.ajp.enabled=true
2-1) SpringBoot1.x ContainerConfig 클래스 추가
import org.apache.catalina.connector.Connector;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.embedded.EmbeddedServletContainerCustomizer;
import org.springframework.boot.context.embedded.tomcat.TomcatEmbeddedServletContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ContainerConfig {
@Value("${tomcat.ajp.protocol}")
String ajpProtocol;
@Value("${tomcat.ajp.port}")
int ajpPort;
@Value("${tomcat.ajp.enabled}")
boolean tomcatAjpEnabled;
@Bean
public EmbeddedServletContainerCustomizer containerCustomizer() {
return container -> {
TomcatEmbeddedServletContainerFactory tomcat = (TomcatEmbeddedServletContainerFactory) container;
if (tomcatAjpEnabled) {
Connector ajpConnector = new Connector(ajpProtocol);
ajpConnector.setPort(ajpPort);
ajpConnector.setSecure(false);
ajpConnector.setAllowTrace(false);
ajpConnector.setScheme("http");
tomcat.addAdditionalTomcatConnectors(true);
}
};
}
}
2-2) SpringBoot2.x ContainerConfig 클래스 추가
import org.apache.catalina.connector.Connector;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ContainerConfig {
@Value("${tomcat.ajp.protocol}")
String ajpProtocol;
@Value("${tomcat.ajp.port}")
int ajpPort;
@Bean
public ServletWebServerFactory servletContainer() {
TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory();
tomcat.addAdditionalTomcatConnectors(createAjpConnector());
return tomcat;
}
private Connector createAjpConnector() {
Connector ajpConnector = new Connector(ajpProtocol);
ajpConnector.setPort(ajpPort);
ajpConnector.setSecure(false);
ajpConnector.setAllowTrace(false);
ajpConnector.setScheme("http");
return ajpConnector;
}
}
3) 부팅 로그 확인
10:50:32.520 [main] [INFO ] o.a.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8080"]
10:50:32.538 [main] [INFO ] o.a.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8080"]
10:50:32.544 [main] [INFO ] o.a.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
10:50:32.560 [main] [INFO ] o.apache.coyote.ajp.AjpNioProtocol - Initializing ProtocolHandler ["ajp-nio-18009"]
10:50:32.563 [main] [INFO ] o.a.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
10:50:32.563 [main] [INFO ] o.apache.coyote.ajp.AjpNioProtocol - Starting ProtocolHandler ["ajp-nio-18009"]
10:50:32.576 [main] [INFO ] o.s.b.c.e.t.TomcatEmbeddedServletContainer - Tomcat started on port(s): 8080 (http) 18009 (http)
ajp포트가 올라왔는지 확인한다.
toDebugString()
메소드로 RDD의 가계도 출력 가능파이프라이닝 외에도 Spark의 내부 스케줄러는 RDD가 이미 Cluster 메모리나 디스크에 Caching되어 있는 경우 RDD 그래프의 가계도를 제거
특정 Action을 위해 생성되는 Stage들이 모여 Job을 이룸 즉, count() 같은 Action을 호출할 때마다 하나 이상의 Stage로 구성된 Job이 생성 됨
한 번 Stage 그래프가 정의되면 Task들이 만들어지고 사용하는 배포 모드에 따라 다양하게 내부 스케줄러로 전송 됨
Physical Plan에서 Stage들은 RDD 가계도에 따라 각자 의존성을 가지게 되므로 그에 맞는 순서로 실행
Stage는 실행되는 Partition은 서로 다르지만 같은 일을 수행하는 Task들을 실행 시킴
RDD의 연산들은 새로운 RDD를 만들고 이것들은 부모를 참조하게 되면서 이에 따라 그래프가 만들어 진다.
RDD에서 Action을 호출하면 그때는 반드시 연산이 수행되어야 한다. 이는 또한 부모 RDD에게도 연산을 요구하게 된다. Spark의 스케줄러는 RDD들이 필요한 모든 연산을 수행하도록 Job을 제출한다. 이 Job은 하나 이상의 Stage를 갖게 되며, 이는 Task들로 구성된 병렬 집단 연산들을 말한다. 각 Stage는 DAG에서 하나 이상의 RDD들과 연계된다. 하나의 Stage는 파이프라이닝에 의해 여러개의 RDD와 연계될 수도 있다.
Stage들은 순서대로 실행되며 RDD의 조각들을 연산하기 위한 Task들을 실행한다. Stage의 최종 단계가 끝나면 Action이 완료된다.