팀에서 실시간 리포트입력 소스를 kafka로 변경하는 작업을 했다. 이 글은 작업을 하는 동안 배운 것을 정리한다.
먼저 스트림 처리를 신뢰도에 따라 다음과 같이 세가지로 구분법을 알아보자!
회사에서 사용하고 있는 Spark 버전이 1.5이므로 1.5까지 SparkStreaming의 변천사를 보자.
앞선 버전별 설명에서 보면 SparkStreming에서 Kafka를 연동에 두가지 방법이 존재한다.
이방식은 Receiver를 통해 kafka로 부터 Data를 전달 받는다. 그리고 데이터 유실을 방지하기 위해 WAL를 사용해 kafka로 부터 받은 데이터를 저장을 하면서 처리한다. Receiver를 사용하는 경우, 처음 receiver를 시작할 때 만들어진 Kafka 연결을 계속 사용하기 때문에, 매 배치마다 연결 지연이 발생하지 않음.
WAL을 사용한 receiver 방식은 Kafka로부터 받은 데이터가 WAL에 먼저 저장되는데, Zookeeper에 Kafka의 offset을 갱신하기 전에 시스템이 실패할 경우, Spark Streaming은 데이터를 받았다고 인식하지만 Kafka는 데이터를 전달하지 않았다고 인식하는 불일치가 발생합니다. 이후 시스템이 복구되면 Spark Streaming은 WAL에 저장된 데이터를 읽어 복구 처리를 하고 receiver를 동작시킵니다. Receiver는 Kafka로부터 다음 데이터를 받아오려고 하는데, Kafka는 Zookeeper에 저장된 offset 정보를 보고, 이전 데이터를 전달하지 않았다고 보게 되므로, 보낼 필요가 없는 데이터를 다시 전송한다. 결국 복구 처리를 두 번하게 된다.(At-least-once)
그리고 다음과 같이 DStream을 생성한다.
JavaPairReceiverInputDStream<String, String> kafkaStream =
KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);
가장 큰 특징은 Receiver를 사용하지 않는다. 그래서 다음과 같은 특성이 있다.
Kafka Direct API 방식에서 checkpoint를 사용하면 driver는 장애 복구를 위해 어플리케이션의 메타 데이터를 HDFS나 S3처럼 신뢰성 있는 저장소에 저장한다. 메타 데이터에는 어플리케이션 설정 정보와 스케쥴러에 등록되었지만(queued) 아직 완료되지 않은 배치 작업에 대한 정보를 저장한다. 배치 작업 정보에는 offset 정보도 포함되어 있어 driver 장애로 중단된 스트림 처리를 복구할 때 저장된 메타 데이터를 읽어 불필요한 재전송 없이 장애가 발생했던 부분부터 처리할 수 있습니다.
여기서 offset를 관리하는 방법은 다음과 같다.
쉽게 offset을 저장하는 방법은 Spark의 checkpoint에 저장하는 방법있다.
하지만 여기에 다음과 같은 문제가 있다.
회사에서는 관리의 편의상 Zookeeper에 offset을 저장하는 방법으로 실시간리포트를 구현하였다.
소스는 다음과 같다.
spark.streaming.kafka.maxRatePerPartition
이다.
이 설정은 파티션당 초당 처리량이라는 점을 주의 해야한다. 배치 간격이 10초이고 최대 처리율이 1000이고, 파티션 수가 4개라면 데이터는 최대 4 * 1000 * 10 = 4000만큼 받아옵니다.spark.streaming.backpressure.enabled
를 true로 설정합니다.중단된 시간이 너무 길어지면, Kafka에서 버퍼링하며 저장할 수 있는 데이터 크기를 넘어설 수 있다. Kafka가 보관하고 있어서 언제든지 전송할 수 있는 offset을 지나치면, 스트림 처리에서 offset을 잘 저장하고 있더라도 재시작할 때 OffsetOutOfRangeException
이 발생한다. 당연히 유실 없는 스트림 처리는 불가능한 상황이 된다. 따라서 중단될 수 있는 최대 예상 시간 동안 데이터를 충분히 버퍼링 할 수 있도록 Kafka를 설정해 둬야 한다.
회사에서는 kafka에 저장된 시간이 20분이다. 20분넘게 장애가 발생하고 실시간 리포트 Job을 재시작하면 OffsetOutOfRangeException
이 발생한다. offset이 가리키고 있는 데이터가 삭제되어 존재하지 않기 때문이다. 이때 예외처리는 offset을 최근으로 재설정하는 방법을 취했다.
in-memory technology라고 하는 것은, 데이터를 처리하는 동안 RAM에 persist 하는것 이라고 한다.
하지만 Spark의 겨우 모든 겨우 데이터를 persist하는 것이 아니라 cache를 한다.
따라서 Spark는 in-memory technology가 아니다.
Spark 메인페이지에서 볼 수 있는 위와 같은 성능 측정 결과는 Logistic regreesion에 대한 것이다. 머신러닝에 대한 일을 처리할 경우 테스트하여 성능 결과를 측정해 놓은 것이다.
보통 머신러닝에서는 동일한 dataset에 대해서 repeatedly iteration을 많이 하곤한다. 이 경우 Spark의 LRU 캐쉬를 이용한 in-memory cache가 빛이나는 경우이다.!!
반복적으로 같은 dataset에 여러번 접근할 경우에 처음에만 읽고 원할때 마다 데이터에 접근 할 수 있는데 결국 memory에 있는 것을 읽기만 하면 된다. 이 경우는 BestCase이다.
그러나 불행하게도, Hadoop상에서 위와 같은 반복작업이 실행될 경우에는 HDFS cache 능력을 잘 활용하지 못하는거 같다. (Haddop-cache)
대략적으로 3x ~ 4x배 정도 Spark가 빠르다고 여겨지는데,
결론적으로 짧은 실행의 job의 경우에는 100x배까지 빠를수는 있어도 보통의 실제 경우의 작업에서는 Hadoop의 성능보다 최대 2.5 ~3배를 넘지 않을 것이다.
Spark에 의해서 혁명적으로 새로운것이 소개된게 없다. 효과적으로 LRU cache를 이용하고 data processing pipelining을 이용했지만 비단 Spark만 그랫던것은 아니다.
만약에 이 문제에 대해서 마음을 좀 열어둔다면, 너는 MPP databases에 의해서 소개되어진( query execution pipelinig, no itermediate data materializaion, LRU cache) 과 거의 같은 컨셉이라는것을 알아차릴 수 있다.
물론 Spark는 이것을 open source로 해서 무료로 사용할 수 있게 제공하고 있다. ( 대부분의 회사들이 MPP database가 좋다고 해도, 엔터프라이즈 MPP 을 돈 주고 쓰고 있지 않는다.. )
|-src/
|--main/
|----resources/
| <files to include in main jar here>
|----scala/
| <main Scala sources>
|----java/
| <main Java sources>
|--test/
|----resources
| <files to include in test jar here>
|----scala/
| <test Scala sources>
|----java/
<test Java sources>
빌드 definition 파일 - 베이스 디렉토리에 있는 build.sbt 파일
빌드 support 파일 - 베이스 디렉토리 하위 project라는 디렉토리에 포함한 파일들을 의미한다.
빌드 target 디렉토리 - 기본으로 베이스 디렉토리 하위의 target
maven과 유사하다.
간단한 테스크를 실행하면 아래와 같다.
➜ scala-Learning-spark sbt clean package
[info] Loading global plugins from /Users/nhnent/.sbt/0.13/plugins
[info] Loading project definition from /Users/nhnent/dev/scala-Learning-spark/project
[info] Set current project to scala-Learning-spark (in build file:/Users/nhnent/dev/scala-Learning-spark/)
[success] Total time: 0 s, completed 2017. 8. 7 오후 8:14:27
[info] Updating {file:/Users/nhnent/dev/scala-Learning-spark/}scala-learning-spark...
[info] Resolving jline#jline;2.12.1 ...
[info] Done updating.
[info] Packaging /Users/nhnent/dev/scala-Learning-spark/target/scala-2.11/scala-learning-spark_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 1 s, completed 2017. 8. 7 오후 8:14:28
기본적인 명령어
clean - 타겟 디렉토리에 생성된 모든 파일을 삭제한다.
compile - 메인 리소스에 있는 모든 소스를 컴파일 한다.
test - 컴파일을 하고 모든 테스트케이스를 수행한다.
package - src/main 하위의 자바와 스칼라의 컴파일된 클래스와 리소스를 패키징한다./java.
reload - 빌드 설정을 리로드한다.
기본적인 사용법은 maven과 매우 유사한 것을 알 수 있다.
빌드 버전은 project/build.properties
에서 설정할 수 있다.
maven의 pom.xml
파일처럼 sbt에서는 build.sbt에 명세한다.
intellij에서 sbt프로젝트를 생성하면 다음과 같이 적혀있다.
name := "scala-Learning-spark"
version := "1.0"
scalaVersion := "2.11.8"
규칙은 다음과 같다.
:=
이다.maven처럼 dependencies를 설정할려면 다음과 같이 추가 해주면 된다.
libraryDependencies ++= Seq(
groupID % artifactID % revision,
groupID % otherID % otherRevision
)
예를 들면 다음과 같다.
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.6.0" % "provided",
"org.apache.spark" %% "spark-sql" % "1.6.0" % "provided",
"org.apache.spark" %% "spark-hive" % "1.6.0" % "provided"
)
기본적으로 Dependency 소스저장소는 Maven2 repository
를 사용한다. 하지만 사내망안에 있는 reopsitory가 필요할 때가 있다. 이럴 때 maven에서는 <repositories>
태그를 추가해서 해결 했지만 sbt는 다음과 같이 한다.
resolvers += name at location
#ex> resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
maven에서 dependency libraries를 함께 packaging을 할 수 있었다. sbt에서도 똑같이 할 수 있는데 방법은 다음과 같다.
build.sbt에 다음 내용을 추가한면된다.
#jar이름 설정
assemblyJarName in assembly := artifact.value.name + "-" + version.value + "." + artifact.value.extension
###예시###
assemblyMergeStrategy in assembly := {
case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "unwanted.txt" => MergeStrategy.discard
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
###예시 end###
assemblyMergeStrategy in assembly := {
case PathList("com","payco",xs @ _*) => MergeStrategy.last
case _ => MergeStrategy.discard
}
이렇게 설정한뒤에 다음 명령어를 사용하면 원하는 데로 packaging을 할 수 있다.
sbt clean assembly
:=
you can assign a value to a setting and a computation to a task. For a setting, the value will be computed once at project load time. For a task, the computation will be re-run each time the task is executed.
+=
will append a single element to the sequence.
++=
will concatenate another sequence.
예를들어 기본적으로 컴파일 폴더는 src/main/scala가 되는데, Compile의 Seq[File] 값을 가지는 sourceDirectories라는 키를 통해 컴파일 단계에서 source라는 디렉토리를 컴파일에 포함시키고 싶다면 아래와 같이 하면된다.
sourceDirectories in Compile += new File("source")
아니면 sbt 패키지에 포함되어 있는 file() 함수로 더 쉽게 할 수도 있다.
sourceDirectories in Compile += file("source")
(file() just creates a new File.)
++= 메소드를 사용하면 여러 디렉토리를 한번에 추가 하는 것도 가능하다.
sourceDirectories in Compile ++= Seq(file("sources1"), file("sources2"))
default source directory를 완전 수정하고 싶다면 :=
메소드를 사용하자.
sourceDirectories in Compile := Seq(file("sources1"), file("sources2")
>>> a = "comdementor"
>>> print "Reverse is",a[::-1]
Reverse is rotnemedoc
>>> mat = [[1,2,3], [4,5,6]]
>>> zip(*mat)
[(1,4), (2,5), (3,6)]
>>> a = [1,2,3]
>>> x, y, z = a
>>> x
1
>>> y
2
>>> z
3
>>> a = ["a", "b", "c", "d"]
>>> print " ".join(a)
a b c d
>>> list1 = ['a','b','c','d']
>>> list2 = ['p','q','r','s']
>>> for x, y in zip(list1,list2):
... print x, y
...
a p
b q
c r
d s
>>> a=7
>>> b=5
>>> b, a = a, b
>>> a
5
>>> b
7
>>> print "code"*4+' '+"good"*5
codecodecodecode goodgoodgoodgoodgood
>>> list1 = [ [1 , 2] , [ 3, 4] ]
>>> answer = sum( list1 , [ ] )
>>> print answer
1,2,3,4
>>> wantToInt = "1 2 3 4"
>>> map(int, wantToInt.split())
[1, 2, 3, 4]
brew install mysql
mysql.server start
DB를 만들기 전에 MySql 기본 캐리터셋을 설정해야한다.
MySQL에서는 설정 파일을 다음 순서데로 읽는다.
그래서 /etc/my.cnf
파일 있는지 확인 하고 없으면 cp /usr/local/mysql/support-files/my-default.cnf /etc/my.cnf
으로 생성하고 다음내용을 추가한다.
[mysqld]
character-set-server=utf8
collation-server=utf8_general_ci
init_connect=SET collation_connection=utf8_general_ci
init_connect=SET NAMES utf8
[client]
default-character-set=utf8
[mysql]
default-character-set=utf8
그리고 서버 재시작
[root@test ~]# /usr/bin/mysql_secure_installation
mysql> create database testdb;
Query OK, 1 row affected (0.00 sec)
mysql> show create database testdb;
+----------+------------------------------------------------------------------+
| Database | Create Database |
+----------+------------------------------------------------------------------+
| testdb | CREATE DATABASE `testdb` /*!40100 DEFAULT CHARACTER SET utf-8 */ |
+----------+------------------------------------------------------------------+
1 row in set (0.00 sec)
캐릭터 셋이 utf-8이 되어 있다.
유저생성은 다음과 같다.
// localhost로 접속만 허용
mysql> grant all privileges on testdb.* to dev@localhost identified by 'password123';
// 원격 접속도 허용
mysql> grant all privileges on testdb.* to dev@'%' identified by 'password123';
$> mysql -u dev -p
Enter password: (앞에서 지정한 패스워드 입력)
... 중략 ...
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| testdb |
+--------------------+
3 rows in set (0.00 sec)
/etc/localtime
파일 설정을 한국시간으로 바꾸기만 하면 된다.
$> ln -sf /usr/share/zoneinfo/Asia/Seoul /etc/localtime
$> ls -al | grep /etc/locatime
lrwxrwxrwx 1 root root 30 7월 31 12:17 localtime -> /usr/share/zoneinfo/Asia/Seoul
$> date
2017. 07. 31. (월) 12:18:08 KST
[root@test ~]# chkconfig --list mysqld
mysqld 0:off 1:off 2:off 3:off 4:off 5:off 6:off
[root@test ~]# chkconfig mysqld on
[root@test ~]# chkconfig --list mysqld
mysqld 0:off 1:off 2:on 3:on 4:on 5:on 6:off
mutt
- 서버에서 메일 보내기 기능zcat
- 압축안풀고 내용보기같은 명령어 gzip -d -c
wc -l
- 파일 라인수 세기Usage: wc [OPTION]... [FILE]...
or: wc [OPTION]... --files0-from=F
Print newline, word, and byte counts for each FILE, and a total line if
more than one FILE is specified. With no FILE, or when FILE is -,
read standard input.
-c, --bytes print the byte counts
-m, --chars print the character counts
-l, --lines print the newline counts
--files0-from=F read input from the files specified by
NUL-terminated names in file F;
If F is - then read names from standard input
-L, --max-line-length print the length of the longest line
-w, --words print the word counts
sort
- 정렬sort [옵션] "fileName"
sort -k2 -r data
: data파일의 2번째 필드을 기준으로 내림차순 정렬옵션종류
-f
=> 대소문자를 구분하지않음-r
=> 내림차순정렬-k
=> 필드 번호를 나타냄-t
=> 필드 구분자 지정-n
=>숫자 순서로 정렬file -bi
- 파일 인코딩 확인> file -bi test.txt
=> text/html; charset=utf-8iconv
- 파일 인코딩 변환iconv -c -f utf-8 -t euc-kr ttt.php > ttt2.php
: utf-8 인코딩이었던 ttt.php 를 euc-kr 로 변환하여 ttt2.php 로 저장^M
문자 제거sed -i -e "s/\r//g" fileName
ln -Tfs
- 심볼릭링크 타겟변경ex)
#심볼릭 링크 설정
ln -s /home/data1 /home/data
lrwxrwxrwx 1 root root 5 12월 13 19:28 data -> data1
#심볼릭 링크 타겟 변경
ln -Tfs /home/data2 /home/data
lrwxrwxrwx 1 root root 5 12월 13 19:28 data -> data2
sed
- 스트리밍 편집기sed [-e script][-f script-file][file...]
기본적인 기능은 ed에서 따 왔으며, 이 기능들은 모두 sed에 적용이 된다. 다만 ed는 대화형 편집기이며, sed는 스트리밍 편집기이다. 대화형 편집기와 스트리밍 편집기의 차이점은 대화형 편집기는 입력 및 출력이 하나로 이루어지며, 스트리밍 편집기는 하나의 입력이 하나의 출력을 낸다는 것이다. \n 을 개행문자로 사용하는 스트리밍 에디터이다.
sed -n '/abd/p' list.txt
: list.txt 파일을 한줄씩 읽으면서(-n : 읽은 것을 출력하지 않음) abd 문자를 찾으면 그 줄을 출력(p)한다.
sed 's/addrass/address/' list.txt
: addrass를 address로 바꾼다. 단, 원본파일을 바꾸지 않고 출력을 바꿔서 한다. sed 's/addrass/address/' list.txt > list2.txt
sed 's/\t/\ /' list.txt
: 탭문자를 엔터로 변환
sed 's/□□*/□/' list.txt
: ( *표시: □ 는 공백 문자를 표시한다. ) 위의 구문은 한개이상의 공백문자열을 하나의 공백으로 바꾼다.
scriptfile - s/
sed '/TD/d' 1.html
: TD 문자가 포함된 줄을 삭제하여 출력한다.
sed '/Src/!d' 1.html
: Src 문자가 있는 줄만 지우지 않는다.
sed '1,2d' 1.html
: 처음 1줄, 2줄을 지운다.
sed '/^$/d 1.html
: 공백라인을 삭제하는 명령이다
s/^.*\/\([a-zA-Z0-9.]*\)".*$/\1/
: ^는 라인의 맨 처음, .* 아무문자열, (, )은 정규표현식을 그룹화, $ 는 라인의 맨 끝. ( s;^.*\/\([a-zA-Z0-9.]*\)".*$;\1;)
\1는 그룹화된 첫번째 요소를 말한다.
[a-zA-Z0-9.]
는 알파벳과 숫자 및 .(콤마)를 표현하는 문자(character)를 말한다.
즉 GF02.jpg와 같은 문자열을 첫번째 그룹화하고 난 다음 라인 전체를 그룹화된 내용으로 바꾸는 것이다.
/g
: global을 의미 한줄에 대상문자가 여러개일 때도 처리하도록 한다.
who | sed -e 's; .*$;;'
: 각 라인의 첫 번째 공백에서부터 마지막까지 삭제하라.
who | sed -e 's;^.* ;;'
: 각 라인의 처음부터 맨 마지막 공백까지 삭제하라.
who | sed -e 's;^.*:;;'
: 각 라인의 처음부터 : 문자가 있는 곳(:문자포함)까지 삭제하라.
sed는 항상 표준 출력에서 입력 받은 각 라인을 나타낸다는 것을 알아냈다. 그러나 때때로 한 파일로부터 몇 개의 라인들을 추출해 내기 위해 sed를 사용하기를 원할 때도 있다. 이러한 경우에 -n옵션을 사용한다. 이 옵션은 사용자가 만약 해야 할 일을 정확히 말해 주지 않는다면 임의의 라인을 프린트하는 것을 원하지 않는다고 sed에게 말한다. 따라서 p명령이 같이 쓰인다. 라인 번호와 라인 번호의 범위를 나타냄으로써 sed를 사용하여 텍스트의 라인들을 선택적으로 프린트할 수 있게 한다. 다음에서 볼 수 있는 바와 같이, 한 파일로부터 첫 번째 두 라인이 프린트되었다.
$ sed -n '1,2p' intro Just print the first 2 lines from intro file.
만약 라인 번호 대신에 슬래시로 에워 싸인 문자열과 함께 p명령이 쓰인다면 sed는 이들 문자들이 포함하고 있는 표준 입력을 통해서 라인들을 프린트하게 된다. 따라서 하나의 파일로부터 처음의 두 라인을 프린트하기 위하여 다음과 같이 사용될 수 있다.
$ sed -n '/UNIX/p' intro Just print lines containing UNIX
sed '5d'
: 라인 5를 삭제
sed '/[Tt]est/d'
: Test 또는 test를 포함하는 모든 라인들을 삭제
sed -n '20,25p' text
: text로부터 20에서 25까지의 라인들만 프린트
sed '1,10s/unix/UNIX/g' intro
: intro의 처음 10개의 라인들의 unix를 UNIX로 변경
sed '/jan/s/-1/-5'
: jan을 포함하는 모든 라인들 위의 첫 번째 -1을 -5로 변경
sed 's/...//' data
: 각 data라인으로부터 처음 세 개의 문자들을 삭제
sed 's/...$//' data
: 각 데이터 라인으로부터 마지막 3문자들을 삭제
sed -n '1' text
: 비 프린트 문자들을 \nn으로 (여기서 nn은 그 문자의 8진수 값임),
그 리고 탭 문자들을 > 로 나타내는 각 텍스트로부터의 모든 라인들을 프린트