// Returning T, throwing the exception on failure
@annotation.tailrec
def retry[T](n: Int)(fn: => T): T = {
util.Try { fn } match {
case util.Success(x) => x
case _ if n > 1 => retry(n - 1)(fn)
case util.Failure(e) => throw e
}
}
// Returning a Try[T] wrapper
@annotation.tailrec
def retry[T](n: Int)(fn: => T): util.Try[T] = {
util.Try { fn } match {
case x: util.Success[T] => x
case _ if n > 1 => retry(n - 1)(fn)
case fn => fn
}
}
val retryResult = retry(3) {
if (validateObject(esClient)) {
esClient
} else {
null
}
}
회사에서는 아직 Spark1.6
을 사용하고 있어 hiveContext
를 사용하여 HDFS에 있는 orc, parquet, gz등을 읽을 때가 많다.
이때 filePath
입력시 주의할 점이 있다.
file를 읽을때 아래와 같은 코드를 사용한다.
JavaRDD<String> tempRDD = hiveContext.read().format("orc").load(path)
basePath/1.orc
basepath/2.orc
path자리에 basePath/*
로 입력하면 file을 읽지 못한다.
또 이 문제는 Java Spark에서만 발생한다.
basePath/*.orc
로 확장자 아스타를 넣어주는 방식basePath/
까지만 path로 입력한다.대부분의 spark job을 처리할때 hadoop에 처리할 파일을 업로드하여 사용한다. 하지만 Meta file이나 config file같이 용량이 얼마 되지 않은 파일을 Hadoop에 업로드하여 사용하기에는 부담이 있다.(HDFS 블록갯수가 많아지는 이슈)
그래서 resource(Maven프로젝트시)에 관련 파일을 읽는 법이 필요하다.
총 4가지 방법이 존재 한다.
예제 소스는 다음과 같다.
평소에 위와 같이 많이 사용한다. 이번에 Spark에서 Meta File을 읽는 과정에서 getResource
를 사용했지만 동작이 되지 않아 테스트를 진행했다.
위 예제소스를 가지고 Test를 진행했다.
def main(args: Array[String]): Unit = {
val sc = new SparkContext()
//class에서 getResource(AsStream)을 이용한 방식 (상대경로 절대경로 test도 추가)
readFileByResourceAsStream("config.txt")
readFileByResourceAsStream("/config.txt")
readFileByResource("r1/config.txt")
readFileByResource("/r1/config.txt")
//classLoader에서 getResource(AsStream)을 이용한 방식 (상대경로 절대경로 test도 추가)
readFileByClassLoaderResourceAsStream("r1/config.txt")
readFileByClassLoaderResourceAsStream("/r1/config.txt")
readFileByClassLoaderResource("r1/config.txt")
readFileByClassLoaderResource("/r1/config.txt")
println("class loaders start")
var loader = SparkResourceTest.getClass.getClassLoader
do {
println(s"class loader : ${loader}")
} while ( {
loader = loader.getParent;
loader != null
})
println("class loaders end")
}
결과는 다음과 같았다.
read file by ResourceAsStream : config.txt
getClass file read error : java.lang.NullPointerException
read file by ResourceAsStream : /config.txt
file line : 1,a
file line : 2,b
after read file
read file by Resource : config.txt
getClass file read error : java.lang.NullPointerException
read file by Resource : /config.txt
getClass file read error : java.nio.file.FileSystemNotFoundException
read file by ClassloaderResourceAsStream : config.txt
file line : 1,a
file line : 2,b
after read file
read file by ClassloaderResourceAsStream : /config.txt
getClass file read error : java.lang.NullPointerException
read file by ClassloaderResource : config.txt
getClass file read error : java.nio.file.FileSystemNotFoundException
read file by ClassloaderResource : /config.txt
getClass file read error : java.lang.NullPointerException
정리하면
class, classLoader에 상관없이 getResourceAsStream만 동작하였다.
왜 getResourceAsStream만 동작할까 Scala나 Java에서는 getResource도 사용가능한데…
Spark와 Scala(Java)에 차이가 있다면 File URL Path를 찾을 때 참조하는 classLoader 갯수가 달랐다.
MutableURLClassLoader
클래스가 private이라 디버깅은 못해봤지만,,,
Spark는 File URL Path를 찾아 나가는 방식이 Scala(Java)와는 반대 방향이고 찾는 규칙이 Scala(Java)보다 한단계 더 있을 것으로 추측된다.
Spark에서 Resouce File을 읽을 때 절대경로로 읽고 싶으면 class.getResourceAsStream
상대경로로 읽고 싶으면 class.getClassLoader.getResourceAsStream
으로 사용하면 된다.
spark-submit \
--master yarn-cluster
--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true
...
dynamicAllocation
옵션을 사용할때 항상 같이 사용하는 옵션이 spark.shuffle.service.enabled=true
이다. 하지만 해당 shuffle 옵션은 yarn-cluster
모드에서는 동작하지 않는다.
이 문제를 해결하기 위해서는 spark-submit
을 할때 shuffle서비스 관련 jar를 함께 배포하는 것이다.
아래와 같이 작성한다.
spark-submit \
--master yarn-cluster
--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true \
--jars "/spark/lib/spark-1.6.3-yarn-shuffle.jar,/spark/lib/datanucleus-rdbms-3.2.9.jar,/spark/lib/datanucleus-core-3.2.10.jar,/spark/lib/datanucleus-api-jdo-3.2.6.jar" \
...
spark 2.x대와 1.x대 버전에서 shuffle관련 jar path가 다르다. 2.x대 버전에서는 1.x대 버전에 있는 여러개 jar파일들을 하나로 합친것 같다.
spark 1.6버전에서 shuffle관련 jar path ==>
-$sparkHome/lib/spark-1.6.3-yarn-shuffle.jar
-$sparkHome/lib/datanucleus-rdbms-3.2.9.jar
-$sparkHome/lib/datanucleus-core-3.2.10.jar
-$sparkHome/lib/datanucleus-api-jdo-3.2.6.jar
spark 2.x버전에서 suffle관련 jar path ==>
-$sparkHome/yarn/spark-2.1.1-yarn-shuffle.jar
<dependency>
<groupId>it.nerdammer.bigdata</groupId>
<artifactId>spark-hbase-connector_2.10</artifactId>
<version>1.0.3</version>
</dependency>
위 dependency를 이용하는 프로젝트를 jekins을 이용하여 maven build를 할려고 했다.
하지만 다음과 같은 문제가 발생하여 빌드가 진행되지 않았다.
여기서 주목할 점은 Received fatal alert: protocol_version
이다!!
maven repo에서 다운로드를 못 하고 있는 것이다.
문제를 찾아보니 TLS library
문제라고 한다. client에서 사용하는 java버전에 따라서 TLS이 기본으로 활성화 되있거나 비활성화 되있다고 한다.
불행하게도 회사 jekins의 기본 java version은 java7이고 공식 문서에 따르면 java7에서는 TLS버전 1.2을 지원하지만 default상태는 비활성화 되어 있다.
해결 방안은 두가지 방법이 존재하는데,
-Dhttps.protocols=TLSv1.2
을 추가해준다.1.8.0_60-b27
보다 높은 버전을 사용해야한다.)두가지 방법중 어느 것을 택해도 문제는 해결된다.
아래 글은 https://mapr.com/blog/in-depth-look-hbase-architecture/ 글을 정리 한것입니다.
HBase는 물리적으로는 Master-Slave 구조로 3가지 서버의 구성 요소를 가집니다.
Region Server가 담당하는 데이터들은 Hadoop의 DataNode에 저장된다.
모든 HBase Data는 HDFS 파일안에 저장된다.
Region Server는 HDFS DataNode와 협력을 함으로써, 데이터 지역성을 가능하게 한다.
NameNode는 물리적인 데이터 블럭에 대한 metadata 정보를 유지한다.
zookeeper는 분산 시스템의 멤버들에게 상태정보를 공유하기 위해서 사용되는 서비스이다.
Region 서버와 Active HMaster는 session을 이용해서 zookeeper에 연결한다.
zookeeper는 ephemeral node를 유지한다.
각각의 Resion Serversms ephemeral node를 하나 생성한다.
HMaster는 이 노드를 모니터해서 사용가능한 region server를 발견하고, 또 server의 failure도 모니터한다.
HMaster는 ephemeral node하나 생성하는 것을 경쟁한다.
zookeeper는 첫번째 것을 고려하고, 하나의 master가 active할 수 있게 한다.
Actvie Master는 하트비트를 zookeeper에게 보내고, inactive HMaster는 active HMaster의 실패의 알림을 듣는 역할을 한다.
만약 Region server나 active HMaster가 하트비트 보내는 것을 실패하면, session이 만료되고, 대응하는 노드는 삭제된다.
Active HMaster는 region server를 듣고, 실패시 region server의 복구할 것이고, Inactive HMaster는 active HMaster의 실패를 듣고 있다가, active HMaster가 실패하면 inactive HMaster가 active가 된다.
정리
1) Active HMaster 2) Inactive HMaster 3) Region Server 이렇게 있는데, HMaster와 Regsion Server는 ephemeral node를 만들어서 세션을 zookeeper와 연결한다. 그 노드를 통해서, ActiveHMaster는 Region Server의 상태를 체크하고, 어떤 서버가 이용가능한지 보고, 그리고 Region server가 실패하면 복구를 명령한다. Inactive HMaster는 ActiveHMaster를 감시한다. Active HMaster가 실패하면, 자기 자신이 Active가 된다.
HBase에는 Meta Table이라는 특별한 Catalog Table이 있다. Cluster내에 있는 region의 위치를 담고 있다.
zookeeper는 region의 위치를 Meta Table에 저장한다.
처음으로 읽고 쓰는 작업
나중에 다시 읽을 때는, client는 meta location을 가지고 오기 위해 캐쉬를 이용한다. (이전에 미리 읽어놨기 때문에 캐쉬에 저장되어 있다.)
region이 다른 곳으로 이동되어서, 정보를 가져오는 것에 실패하지 않는 한, Meta Table에 쿼리를 할 필요가 없다. (정보를 가져오는 것을 실패 했을 때는 re-query를 하고, 캐쉬를 update한다.)
.META
table은 B-Tree같은 구조이다..META
table 구조는 다음과 같다.
정리
Meta Table은 시작되는 row key정보를 갖고 있고, region id를 갖고 있으며, 어느 server지를 가르키는 region server를 value로 갖고 있다.
Region Server는 HDFS Data Node 위에서 실행되고 다음 component를 가진다.
client가 put request를 날리면, 첫번째로 WAL이 일어난다.
Edits가 disk에 저장되어 있는 WAL파일 끝에 로그를 쓴다.
WAL는 Server Crash가 발생시 not-yet-persisted data를 복구하기 위해 사용된곤한다.
일단 데이터가 WAL에 쓰여지지만, MemStore에 놓여진다. 그런후에, Put Request는 client에게 Acknowledgement 반환할 것을 요청한다.
MemStore는 정렬된 KeyValue로써, 메모리안에 업데이트를 저장한다. HFile에 저장되어 진것과 같은 모양이다.
column family당 하나의 MemStore이 있다. Update는 column family마다 정렬되어 진다.
MemStore에 충분한 데이터를 축적하면 저장되어진 데이터가 HDFS내에 새로운 HFile로 쓰여진다.
HBase는 하나의 column family당 여러개의 HFile을 사용하고, 실제의 cell 또는 Key-Value객체를 담고 있다.
MemStore안에서 Key-Value로 정려로디어진채로 생성되어진 이들 파일은 디스크에 파일로써 flush된다.
하나의 CF마다 하나의 MemStore가 있다. 하나가 full이 되면 모든것이 flush가 된다. 또 마지막으로 쓰여진 스퀀스 번호를 저장하여, 시스템이 지금까지 저장된 것이 무엇인지 알수 있게 한다. 이것이 왜 column family 수의 제한이 있는지 이유이다.
가장 큰 시퀀스 숫자는 각각의 HFile안에 meta filed로써 저장 되어 진다. 지속하고 있는것이 끝나지고 어디서 계속되는지 반영하기 위해서 meta file에 저장한다.
데이터는 정렬된 key-value를 담고있는 HFile로 저장된다. MemStore에 적당한 데이터를 모우면 전체 정려로딘 key-value set이 새로운 Hfile로 HDFS안에 쓰여진다. 이것은 연속적인 쓰기작업이다. Disk Drive Head 움직임을 피하기 때문에 매우 빠르다.
하나의 HFile은 Multi-Layered Index를 가지고, 이것 때문에 전체 파일을 읽을 필요없이 HBase내에 데이터를 찾을 수 있게 해준다. (Multi-level index는 b+tree와 같다.)
Trailer는 meta block을 가리킨다. 그리고 이것은 file에 지속적인 데이터의 끝에 쓰여진다. 또 Trailer는 bloom filter와 같은 정보와 시간 범위 정보를 가진다. (bloom filter는 특정한 row key를 포함하지 않은 파일을 read skip을 해주어 read비용을 줄여준다.) 시간 범위 정보는 읽고자 하는 시간 범위가 아닐경우 파일을 skipping하는데 유용하다.
읽는 순서
MemStore마다 많은 HFile이 존재한다. 이 의미는 한번 읽을때 마다, 여러개의 파일이 검사되어진다는 의미이고, 이것은 성능에 영향을 줄 가능성이 크다
이것을 Read Amplification
이라고 부른다.
HBase는 자동적으로 어떤 작은양의 HFile을 집어와서, 약간 더 큰 HFile안에 다시 쓴다. 이 과정을 Minor Compaction
이라고 한다.
Minor Compaction은 Storage File 숫자를 줄이고 작은 HFile을 좀 더 큰 HFile로 쓴다. 이 작업은 Merge Sort로 진행된다.
Major Compaction은 한 Region안에 있는 모든 Hfile을 하나의 Column Family마다의 HFile로 머지하고 다시 쓰는 작업이다. 그리고 이 과정에서, 셀을 drop, 삭제하거나 expire한다.
이 작업이 이뤄지면, 읽기 성능을 향상시킨다. 그러나 Major Compaction이 모든 파일을 새로 쓰기 때문에 Disk I/O와 Network Traffic이 많이 발생한다. 이 작업을 write amplification
이라고 한다.
Major Compaction은 자동적으로 실행되도록 스케줄되어 있다. write amplification 때문에 Major Compaction은 보통 주말 또는 오후에 일어난다. (Note that MapR-DB has made improvements and does not need to do compactions)
Table은 하나또는 여러개의 Region으로 나눠진다. Region은 start key와 end key사이에 연속적으로 정렬된 범위의 row를 갖는다. default로 각각의 Region은 1G 사이즈를 갖는다. 각 Region은 RegionServer에 의해 client에게 서비스 제공한다. Region server은 1000개 정도의 Region을 관리한다.
초기에는 하나의 테이블에 하나의 Region이 있다. Region이 점점 커질수로, 이것은 두개의 child Region으로 나눠진다. 두개의 child Region이 같은 Region Server에서 평행하여 열리고, split 가 HMaster에게 보고한다. Load balancing 이유 때문에, HMaster는 새로운 region을 다른 서버로 보내도록 스케줄링 할 수도 있다.
하나의 Region이 커져서, split되어서 두개의 Region 으로 나눠지고, load balancing 문제 때문에 하나의 Region이 다른 서버로 옮겨지게 된다. 이것은, major compaction이 데이터 file을 Region 서버의 local node로 옮길때까지, 새로운 Region server serving으로 끝난다.
HBase data는 처음 쓸 때는, local이지만, region이 다른 서버로 옮겨지면, 더이상 local이 아니다. ( 정확히는 major compaction이 일어날 때까지)
이 말인즉, 처음에 Region 이 분리 되어지면, 다른 Region Server가 해당 Region을 맡게 된다. 이때는 Remote 로 데이터를 맡는 것이다. 이후에 Major compaction이 일어나면 그때는 local이 된다.
모든 쓰가와 읽그는 primary node로부터 이고, primary node 까지이다. HDFS는 WAL와 HFile block을 복제한다. HFile block replication은 자동적으로 이루어진다. Hbase은 HDFS에 의존하여 이것의 파일을 저장하듯이 하여 데이터 안전을 제공한다. 데이터가 HDFS에 쓰여지면 하나의 copy는 locally하게 저장되고, 그런 후에 replicated가 secondary node로 된다. 세번째 copy는 tertiary node에 진행된다.
zookeeper는 region server의 heart beat를 잃게 되면 Node failure를 고려할것이다. HMaster은 Region Server가 실패되어진다는 알림을 받을 것이다.
HMaster는 region server가 crash 되었다는것을 알아차리고, HMaster는 region을 충돌한 서버로부터 active Region server로 재할당 할것이다. 충돌난 region server에서 아직 disk에 쓰여지지 않은 memstore edit를 복구 하기 위해서, HMaster는 충돌난 서버에 존재하는 WAL을 여러개의 file로 나눌 것이고, 이 파일을 새로운 region server의 data node에 저장한다. 각각의 Region server은 그런뒤에 WAL을 replay 해서, memstore을 rebuild 한다.
WAL 파일들은 많은 작업의 리스트를 가진다. 하나의 edit는 하나의 put/delete를 나타낸다. Edits는 시간순서로 일어나고, 저장을 위해서 추가적인것은 디스크에 저장되는 WAL 파일의 끝에 시간순서로 일어난다.
그러니깐, 만약 data가 여전히 memory에 있고, HFile로 저장되어진게 아닌채로 실패가 발생하면 무슨일이 일어날까?
WAL이 replayed 되어진다. 추가와 정렬은 current MemStore에 이뤄진다. 끝으로 MemStore은 HFile에 change본이 flush 된다.