캐시 evict 규칙을 가장 오래동안 사용하지 않는 순으로 삭제하는 것이다.
LinkedList
와 LinkedHashMap
으로 쉽게 구현이 가능하다.
LinkedList
보다 LinkedHashMap
으로 구현한게 성능은 더 좋다.
import java.util.LinkedList;
public class LRUList<T> {
private final LinkedList<T> cache = new LinkedList<>();
private int cacheSize;
public LRUList(int cacheSize) {
if (cacheSize <= 0) {
throw new IllegalArgumentException("The maximum of the size should be a positive integer.");
}
this.cacheSize = cacheSize;
}
public T getData(T data) {
// cache hit
if(cache.remove(data)){
cache.addFirst(data);
// cache miss
} else {
int currentSize = cache.size();
if(currentSize == cacheSize){
cache.pollLast();
}
cache.addFirst(data);
}
return cache.getFirst();
}
}
import java.util.LinkedHashMap;
import java.util.Map;
public class LRUHashMap<K, V> extends LinkedHashMap<K, V> {
private int size;
private LRUHashMap(int size) {
super(size, 0.75f, true);
this.size = size;
}
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
return size() > size;
}
public static <K, V> LRUHashMap<K, V> newInstance(int size) {
return new LRUHashMap<K, V>(size);
}
}
LinkedHashMap
에서 accessOrder
를 true
로 주면 LRU처럼 동장한다. 만약 ‘false`이면 insert되는 순서로 저장된다.
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Set;
public class LRUMap<K, V> implements Serializable {
private static final long serialVersionUID = 99123123123L;
// 성능 개선을 위해 accessOrder를 변경
private final LinkedHashMap<K, V> cache = new LinkedHashMap<>(0, 0.75f, false);
private int cacheSize;
public LRUMap(int cacheSize) {
if (cacheSize <= 0) {
throw new IllegalArgumentException("The maximum of the size should be a positive integer.");
}
this.cacheSize = cacheSize;
}
// get과 put이 동시에 발생하므로 access에 대한 put을 별도로 진행하지 않음
public V get(K key) {
synchronized (cache) {
return cache.get(key);
}
}
public void put(K key, V v) {
this.put(key, v, true);
}
public void put(K key, V v, boolean check) {
if (check) {
if (key == null || v == null) {
throw new IllegalArgumentException("The key or the value is null.");
}
synchronized (cache) {
if (cache.containsKey(key)) {
cache.remove(key);
} else {
if (cache.size() >= cacheSize) {
cache.remove(cache.keySet().iterator().next());
}
}
// 테스트를 통해 삭제 후 put을 수행하는 것이 유리함을 확인함
cache.put(key, v);
}
} else {
cache.put(key, v);
}
}
public void remove(K k) {
synchronized (cache) {
cache.remove(k);
}
}
public boolean contains(K k) {
synchronized (cache) {
return cache.containsKey(k);
}
}
public int size() {
synchronized (cache) {
return cache.size();
}
}
public Set<K> keySet() {
return cache.keySet();
}
@Override
public String toString() {
return "LRUMap{" +
"cache=" + cache +
", cacheSize=" + cacheSize +
'}';
}
}
회사에서 parquet로 적재된 데이터를 특정row의 특정컬럼 데이터를 Null로 update하는 작업을 수행하던 중 발견한 이슈이다.
impala로 insert overwirte하면, 불필요한 file I/O가 발생하여 직접 parquet rewriter를 만들어 rowGroup단위로 필요한 부분을 update하는 엔진을 만들었다. 여기서 최적화를 한번더하면, rowGroup보다 더 작은 단위인 page단위로 update를 하는 처리를 할 수 있는데, rowGroup단위로 해도 충분히 성능이 나와 stop했음 해당 엔진을 만들 때 참고한 소스)
Java를 이용해서 Hadoop Parquet read/write를 하기 위해서 많이 사용되는 것이 avro-parquet라이브러리 일 것이다.
하지만 이 라이브러리에서는 INT96
type (Decimal, timestamp)를 지원하지 않는다. (parquet-format은 INT96타입을 지원하지 않음 참고)
Decimal타입을 사용하지 않으면, 굳이 해당 내용을 생각할 필요가 없다.
PrimitiveType asPrimitive = parquetType.asPrimitiveType();
PrimitiveTypeName parquetPrimitiveTypeName = asPrimitive.getPrimitiveTypeName();
final OriginalType annotation = parquetType.getOriginalType();
Schema schema = (Schema)parquetPrimitiveTypeName.convert(new PrimitiveTypeNameConverter<Schema, RuntimeException>() {
public Schema convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
return Schema.create(Type.BOOLEAN);
}
public Schema convertINT32(PrimitiveTypeName primitiveTypeName) {
return Schema.create(Type.INT);
}
public Schema convertINT64(PrimitiveTypeName primitiveTypeName) {
return Schema.create(Type.LONG);
}
public Schema convertINT96(PrimitiveTypeName primitiveTypeName) {
throw new IllegalArgumentException("INT96 not yet implemented.");
}
public Schema convertFLOAT(PrimitiveTypeName primitiveTypeName) {
return Schema.create(Type.FLOAT);
}
public Schema convertDOUBLE(PrimitiveTypeName primitiveTypeName) {
return Schema.create(Type.DOUBLE);
}
public Schema convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) {
int size = parquetType.asPrimitiveType().getTypeLength();
return Schema.createFixed(parquetType.getName(), (String)null, (String)null, size);
}
public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
return annotation != OriginalType.UTF8 && annotation != OriginalType.ENUM ? Schema.create(Type.BYTES) : Schema.create(Type.STRING);
}
위 코드는 avro-parquet 내부소스인데, INT96
type에 대해서 컨버터를 만들어 주지 않아.
java.lang.IllegalArgumentException: INT96 not yet implemented
Exception을 표시한다.
Parquet-writer를 만들때 이슈를 정리하면 다음과 같다.
Sets which Parquet timestamp type to use when Spark writes data to Parquet files.
INT96 is a non-standard but commonly used timestamp type in Parquet.
TIMESTAMP_MICROS is a standard timestamp type in Parquet, which stores number of microseconds from the Unix epoch.
TIMESTAMP_MILLIS is also standard, but with millisecond precision, which means Spark has to truncate the microsecond portion of its timestamp value.
2019년 1월 올해는 꼭 해야할 일을 정리했다. 그 중 OpenSource활동하기
라는 막연한 계획이 있었다.
2월에 Cassandra에 BulkLoad하는 기능이 필요해 이것 저것 찾던 중에 요구사항에 만족할 만한 것들이 없었다. 그래서 한번 내가 만들어 볼까라는 생각을 했고, 그렇게 회사 프로젝트에 CassandraBulkLoad 모듈을 만들어 적용했다. 하지만 원시 소스형태였고, 사용하기 불편한 형태의 코드였다.
작업한 코드는 BulkLoad하는 동안 Cassandra Read성능이 크게 떨어지지 않고, BulkLoad를 하는 방법이 단순해서 이거 Spark Lib으로 제작해보자라는 생각만 가지고 시간을 보냈다.
8월에 회사를 퇴사하게되면서, 자유시간이 많아졌다. 2월부터 생각만 했던 작업이 생각났고 자연스레 코드를 다시보았다. 그리고 Spark Lib형태로 제작하기 시작했다. 그렇게 탄생한게 Spark2CassandraBulkLoad이다.
해당 현재 해당 Repo는 Star가 5개이고, 대부분 프로젝트 내용을 알고 있는 회사사람들이다.
Lib을 제작하면서, 처음으로 JCenter에 배포도 해보고 spark-package.org에 등록도 하였다.
퇴사를 하면서 intellij License가 만료되어 걱정이였는데, 해당 repo로 JetBrains에 opensource lincense를 신청해 1년치 All Products License 받았다.
3년 회사생활를 잘 마무리한 것 같아 뿌듯하다. 다음 주부터 새로운 집에서 새로운 회사로 출근한다.
2019년 남은 4개월동안 새로운 곳에서 의미있는 일을 한번 해보자!
현재 그리고 앞으로의 프로젝트에서 java8이하 버전을 사용할 가능성이 거의 없다고 생각한다. 그래서 java8 java.time.*
을 정리하고자 한다.
(데이터를 날짜별로 분류하거나 로드할때 날짜 연산이 많이 사용되는데, 할때마다 날짜관련 메소드가 햇갈려서 이번에 정리하고자 한다.)
Period : 두 날짜 사이의 [년,월,일]로 표현되는 기간 (시간을 다루지 않다 보니 LocalDate를 사용한다)
Duration : 두 시간 사이의 [일,시,분,초]로 표현되는 기간 (Instant 클래스를 사용하고, seconds와 nanoseconds로 측정 되지만 [일,시,분,초]로 변환해 주는 메쏘드를 제공)
LocalDate.now(); // 오늘
LocalDateTime.now(); // 지금
LocalDate.of(2019, 7, 22); // 2019년7월22일
LocalDateTime.of(2019, 7, 22, 23, 23, 50); // 2019년7월22일23시23분50초
Year.of(2017).atMonth(7).atDay(22).atTime(10, 30); // 2019년7월22일 10시30분00초
Period.ofYears(2); // 2년간(P2Y)
Period.ofMonths(5); // 5개월간(P5M)
Period.ofWeeks(3); // 3주간(P21D)
Period.ofDays(20); // 20일간(P20D)
Duration.ofDays(2); // 48시간(PT48H)
Duration.ofHours(8); // 8시간(PT8H)
Duration.ofMinutes(10); // 10분간(PT10M)
Duration.ofSeconds(30); // 30초간(PT30S)
LocalTime.of(9, 0, 0).plus(Duration.ofMinutes(10)); // (9시 + 10분간) = 9시10분
LocalDate.of(2019, 7, 22).plus(Period.ofDays(1)); // (2019년7월22일 + 1일간) = 2019년7월23일
LocalDateTime.of(2019, 7, 1, 23, 47, 5).minus(Period.ofWeeks(3)); // (2019년7월1일 23시47분05초 - 3주간) = 2019년6월10일 23시47분05초
LocalDate.now().plusDays(1); // (오늘 + 1일) = 내일
LocalTime.now().minusHours(3); // (지금 - 3시간) = 3시간 전
Period.between(LocalDate.of(1950, 6, 25), LocalDate.of(1953, 7, 27)); // (1953년7월27일 - 1950년6월25일) = 3년1개월2일간(P3Y1M2D)
Period.between(LocalDate.of(1950, 6, 25), LocalDate.of(1953, 7, 27)).getDays(); // 3년1개월2일간 => 2일간
LocalDate.of(1950, 6, 25).until(LocalDate.of(1953, 7, 27), ChronoUnit.DAYS); // 3년1개월2일간 => 1128일간
ChronoUnit.DAYS.between(LocalDate.of(1950, 6, 25), LocalDate.of(1953, 7, 27)); // 3년1개월2일간 => 1128일간
Duration.between(LocalTime.of(10, 50), LocalTime.of(19, 0)); // (19시00분00초 - 10시50분00초) = 8시간10분간(PT8H10M)
Duration.between(LocalDateTime.of(2015, 1, 1, 0, 0), LocalDateTime.of(2016, 1, 1, 0, 0)).toDays(); // 365일간
ChronoUnit.YEARS.between(LocalDate.of(2015, 5, 5), LocalDate.of(2017, 2, 1)); // 1년간
LocalDate.of(2020, 12, 12).format(DateTimeFormatter.BASIC_ISO_DATE); // 20201212
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); // 2015-04-18 00:42:24
Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()); // Sat Apr 18 01:00:30 KST 2015
Date.valueOf(LocalDate.of(2015, 5, 5)); // 2015-05-05
Timestamp.valueOf(LocalDateTime.now()); // 2015-04-18 01:06:55.323
LocalDate.parse("2002-05-09"); // 2002-05-09
LocalDate.parse("20081004", DateTimeFormatter.BASIC_ISO_DATE); // 2008-10-04
LocalDateTime.parse("2007-12-03T10:15:30"); // 2007-12-03T10:15:30
LocalDateTime.parse("2010-11-25 12:30:00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); // 2010-11-25T12:30
LocalDateTime.ofInstant(new Date().toInstant(), ZoneId.systemDefault()); // 2015-04-18T01:16:46.755
new Date(System.currentTimeMillis()).toLocalDate(); // 2015-04-18
new Timestamp(System.currentTimeMillis()).toLocalDateTime(); // 2015-04-18T01:20:07.364
LocalDate.from(LocalDateTime.now()); // 2015-04-18
LocalDate.now().atTime(2, 30); // 2015-04-18T02:30
LocalDate.now().with(TemporalAdjusters.next(DayOfWeek.SATURDAY)); // 다음 토요일
LocalDate.of(2016, 5, 1).with(TemporalAdjusters.dayOfWeekInMonth(3, DayOfWeek.SUNDAY)); // 2016년5월 세번째 일요일
LocalDate.of(2015, 7, 1).with(TemporalAdjusters.firstInMonth(DayOfWeek.MONDAY)); // 2015년7월 첫번째 월요일
DayOfWeek.MONDAY.getDisplayName(TextStyle.FULL, Locale.ENGLISH); // Monday
DayOfWeek.MONDAY.getDisplayName(TextStyle.NARROW, Locale.ENGLISH); // M
DayOfWeek.MONDAY.getDisplayName(TextStyle.SHORT, Locale.ENGLISH); // Mon
DayOfWeek.MONDAY.getDisplayName(TextStyle.FULL, Locale.KOREAN); // 월요일
DayOfWeek.MONDAY.getDisplayName(TextStyle.NARROW, Locale.KOREAN); // 월
DayOfWeek.MONDAY.getDisplayName(TextStyle.SHORT, Locale.KOREAN); // 월
Month.FEBRUARY.getDisplayName(TextStyle.FULL, Locale.US); // February
Month.FEBRUARY.getDisplayName(TextStyle.FULL, Locale.KOREA); // 2월
NoSQL, 분산스토리지를 사용하면서 많이 들었던 용어를 정리할 필요가 있다고 느껴 정리하고자 한다.
블룸필터(Bloom Filter)는 1970년도에 Burton H. Bloom이 고안한 것으로 공간 효율적인 probabilistic data structure이며 구성요소가 집합의 구성원인지 점검하는데 사용된다. False positive(없다고 예측하고 실제로 없는경우)들이 가능하며, false negative(없다고 예측하고 실제로 있는경우)들은 불가능하다. 요소들은 집합에 추가될 수 있으나 제거는 되지 않는다. 그리고 블룸 필터는 메모리를 좀 더 사용함으로써 자주 호출되는 비싼 함수들의 성능을 크게 향상시키는 방법 중에 하나이다.
(즉 집합에 원소가 없으면 진짜 없다는 것을 보장해준다.)
+-+-+-+-+-+-+-+-+-+-+
|0|0|0|0|0|0|0|0|0|0|
+-+-+-+-+-+-+-+-+-+-+
0 1 2 3 4 5 6 7 8 9
데이터 A를 삽입한다고 가정하면, 데이터 A를 두 해시 함수 h1, h2를 거쳐 해시 값을 계산한다. 만일 3=h1(A), 7=h2(A) 였다고 하자. 이 해시 값 3, 7에 해당하는 곳에 1을 셋팅한다. 이제 배열은 아래와 같아진다.
+-+-+-+-+-+-+-+-+-+-+
|0|0|0|1|0|0|0|1|0|0|
+-+-+-+-+-+-+-+-+-+-+
0 1 2 3 4 5 6 7 8 9
그 다음 데이터 A를 검색한다면, 해시 함수를 거치면 3=h1(A), 7=h2(A)이다. 3, 7에 대응하는 배열을 보면 양쪽 모두 1로되어 있어서 리턴은 true로 던저질 것이다.
+-+-+-+-+-+-+-+-+-+-+
|0|0|1|1|0|0|0|1|0|0|
+-+-+-+-+-+-+-+-+-+-+
0 1 2 3 4 5 6 7 8 9
* *
그 다음 데이터 C를 검사한다면, 해시 함수를 거치면 3=h1(C), 6=h2(C)이다. 그런데 3은 1이나 6이 0이므로 false가 리턴되어 없는 것으로 간주하게 된다.
+-+-+-+-+-+-+-+-+-+-+
|0|0|1|1|0|0|0|1|0|0|
+-+-+-+-+-+-+-+-+-+-+
0 1 2 3 4 5 6 7 8 9
* *
이런 형태로 동작하는 방식이다. Bloom Filter는 기존 hash table의 key-value의 쌍으로 자정하지 않고 hash table상에 키가 존재하는지 안하는지 true, false 정보의 조각(Bit의 Set)이 저장된다고 보면 된다. 즉, 각 Value를 저장하는 대신에 Bloom Filter는 Key가 존재하는 지점(해시 함수에 의해)을 가르키는 Bits의 배열이라고 봐도 된다. 찾는 비트의 조합이 모두 1일 경우 제대로 찾을 확률이 높아진다는 것이다. Space와 Time을 적절하게 배합하고 고른 분포도를 가지며 Collision을 대처한다면 좋은 이득을 얻을 수 있다. 많은 비트를 할당할수록 성능은 좋을 수 있으나 많은 메모리가 필요하고 해싱 함수를 늘리게 되면 연산이 많아지게 되어 성능은 느리나 메모리를 덜 차지하게 되는 trade-off 관계가 존재한다. Optimizing하는게 관건이다.
https://medium.com/@balrajasubbiah/lamport-clocks-and-vector-clocks-b713db1890d7
Decentralization, Partition tolerance를 지원하기 위해 Cassandra는 Gossip Protocol을 사용한다. Gossip Protocol은 각 노드가 클러스터의 다른 노드의 상태정보를 추적하는 것이고 이는 매초 발생한다.
Two Phase Commit(a.k.a. 2PC)은 distributed system에서 atomic commit을 보장하는 프로토콜이다.
2PC에서 노드는 한 개의 coordinator와 여러 개의 cohort로 나누어진다. Coordinator는 commit 할 transaction을 만드는 노드고, cohort들은 coordinator가 보낸 transaction을 commit 하거나 revert 한다. 2PC는 이때 fail 하지 않은 모든 cohort가 같은 상태를 유지하도록 하는 것이다. 즉, fail 하지 않은 모든 노드는 다 같이 commit 하거나 revert 한다.
이때 coordinator를 어떻게 선정할지는 2PC의 영역이 아니다. 고정된 coordinator를 계속 사용할 수도 있고, 차례대로 돌아가면서 coordinator가 될 수도 있고, 별도의 leader election을 사용하여 coordinator를 선정할 수도 있다. 2PC는 어떻게든 coordinator가 선정된 뒤의 일이다.
2PC는 이름 그대로 2가지 phase로 나누어져 있다. 첫 번째 phase는 voting phase라고 부르고, 두 번째 phase는 commit phase라고 불린다. 각 phase의 시작은 coordinator가 보내는 메시지로 시작한다.
Voting phase에서 coordinator는 commit 하고 싶은 transaction을 commit 할지 말지 투표하는 요청을 cohort에게 보낸다. VOTE 메시지를 받은 cohort들은 이 transaction을 바로 commit 하지 않는다. 해당 transaction을 커밋할 수 있으면 YES 메시지를, 없으면 NO 메시지를 coordinator에게 보낸다.
Voting phase에서 coordinator가 quorum 이상의 YES 메시지나 NO 메시지를 모으면 commit phase를 시작한다. 이때 일부 cohort에 문제가 생겨서 더 진행되지 않는 것을 방지하기 위해서 일정 시간 응답을 주지 않는 cohort는 NO 메시지를 보냈다고 가정한다.
이때 quorum을 얼마로 잡는가에 따라서 시스템의 consistency model과 resilience가 결정된다. 예를 들어 N개의 coordinator가 있는 시스템에서 N개의 YES 메시지를 모아야 한다면, 하나의 failure도 용납하지 않는 resilient 하지 않지만, strong consistency를 보장하는 시스템이 된다. Quorum이 얼마가 되어야 하는지는 정해지지 않았다. 하지만 non-partition 상황에서 consistency를 보장하기 위해서는 최소 N/2 이상의 YES 메시지를 모아야 한다.
Coordinator가 quorum 이상의 YES 메시지를 받았으면 cohort들에게 COMMIT 메시지를 보내고, quorum 이상의 NO 메시지를 받았으면 cohort 들에게 ROLLBACK 메시지를 보낸다. cohort는 COMMIT 메시지를 받았으면 voting phase에서 받았던 transaction을 커밋하고, ROLLBACK 메시지를 받았으면 그 transaction을 버린다. COMMIT이든 ROLLBACK이든 메시지를 처리하고 나면 cohort는 coordinator에게 처리했다는 메시지를 보낸다. Coordinator가 cohort들에게 처리가 끝났다는 메시지를 받으면 commit phase가 끝난다.
위의 과정을 거쳐 2PC가 진행된다. 앞서 말했듯이 2PC는 괜찮은 resilience를 보이면서, 성능도 나쁘지 않기 때문에 많이 사용된다. 특히 atomic commit을 지원하는 프로토콜 중에서는 가장 적은 메시지 수로 commit 될 수 있는 프로토콜이다.
하지만 2PC에는 심각한 문제가 하나 있다. 2PC는 VOTE 메시지를 보낸 coordinator가 죽어서 COMMIT이나 ROLLBACK 메시지를 보내지 못하면 YES 메시지를 보낸 cohort가 안전하게 상태를 회복할 방법이 없다. 이는 YES 메시지를 보낸 cohort의 상태가 undefined이기 때문이다.
“분산 시스템에서는 3개 속성을 모두 가지는 것이 불가능하다!” 이다. 각 속성은 아래 3가지이다.
서버가 여러대인 분산시스템에서 데이터를 조회했을 때 특정 서버는 변경된 데이터가 조회되고 일부는 변경되지 않은 상태로 조회될 수 있다. 그 때 데이터의 일관성을 위해서 모든 서버에 결과값을 질의하고 N개 이상이 같은 값을 반환할 때 사용자에게 해당 값을 보여주는 것을 의미한다.
ex > Cassandra에서 Consistency read level을 설정하는 것
데이터의 저장을 칼럼단위로 처리하는 데이터베이스를 말한다.
칼럼 단위의 값은 데이터가 유사할 가능성이 높다. 이로 인해 높은 압축율을 얻을 수 있다.
MIN, MAX, SUM, COUNT 와 같은 연산에서 높은 성능을 얻을 수 있다.
Spark로 작업한 결과를 Mysql로 적재하는 일이 있었다.
자주 이용하는 방식은 Mysql Connection관리와 key 중복이 발생할때 update를 하기 위해서 아래 두가지 방식을 많이 사용했다.
collect()
하여 드라이버노드에서 insert 하는 방식foreachPartition
를 돌면서 insert 하는 방식하지만 데이터가 매우 커지면 위 방식들은 OOM이나, IOException을 발생 시켰다.
그래서 Dataset의 jdbc write 방식을 이용하게 되었다. 이를 이용하면서 몇가지 문제점을 기록하고자 한다.
val prop = new Properties()
prop.setProperty("driver", "com.mysql.jdbc.Driver")
prop.setProperty("user", "mysql.user")
prop.setProperty("password", "mysql.password")
resultDataset.write.mode(SaveMode.Append)
.jdbc("Mysql.url", "tableName", prop)
이렇게만 하면 각 executor에서 insert문을 실행한다.
SaveMode
에 따라서 Mysql table을 다루는 방식이 다른다.
.option("truncate", true)
)사용하는 점에서는 문제가 없지만 해결할 방법이 없는 경우가 있다.
INSERT INTO table (column_list)
VALUES (value_list)
ON DUPLICATE KEY UPDATE
c1 = v1,
c2 = v2,
...;
위 Query처럼 key겹칠때 update를 하고 싶을 때가 있다. 하지만 SparkSql에서는 해당 기능을 지원하지 않는다.
그래서 다른 방법을 사용해야한다.(파일에 쓰고 insert 등)
또하나 주의할 점은
SaveMode.Overwrite
에서 cretae 될때 테이블 정보가 없기 때문에 Dataset에 있는 colum정보로 table이 생성된다.
그렇기 때문에 SaveMode.Overwrite
를 사용할때는 truncate
옵션을 true로 사용하는 것이 좀 더 안전하다.