이번에 회사에서 C* 2.0에서 3.x로 버전업을 하였다. 이에 설치방법을 정리해본다.
$ sudo chmod u+x /etc/rc.local
$ sudo systemctl start rc-local
$ sudo vi /etc/rc.local
$ echo noop > /sys/block/sda/queue/scheduler
$ echo 0 > /sys/class/block/sda/queue/rotational
$ echo 8 > /sys/class/block/sda/queue/read_ahead_kb
설정 값 설명) disk i/o scheduler - noop 으로 설정 sysFS 회전 플래그 - false(0) 으로 설정 readahead 블록 장치 값을 8kb 로 설정
$ sudo sudo vi /etc/fstab
UUID=d0241535-bee1-4798-a0dc-678157eea087 /data xfs discard,noatime 0 0
설정 값 설명) 트림(TRIM)은 SSD 디스크를 사용하면서 운영체제에서 삭제한 쓰레기 파일을 SSD 디스크 자체에서도 삭제시켜서 SSD가 사용하면서 느려지는 것을 개선해주는 기능
$ sudo yum install numactl
$ numactl --show
$ numactl -H
$ nuastat -cm
$ cat /proc/buddyinfo
요약 : NUMA - 메모리 접근의 대기시간을 줄이기 위해서 CPU Processor 마다 특정 범위의 메모리를 할당해서 지역성을 부여하는 방식 / vm.zone_reclaim_mode - 특정 영역의 메모리가 부족할 경우 다른 영역의 메모리를 할당하는데 zone 안에서 재할당하지 않음(다른 zone 에서 가져와서 사용함)
$ sudo vi /etc/rc.local
$ echo 0 > /proc/sys/vm/zone_reclaim_mode
$ sudo vi /etc/sysctl.conf
$ vm.max_map_count = 1048575
요약 : max_map_count 는 특정 프로세스가 소유할 수 있는 VMA (가상 메모리 영역) 수의 제한을 허용하는 것이다. 프로그램이 메모리에 맵핑을 시도할 때 파일 공유, 공유 메모리에 대한 세그먼트에 대한 링크 또는 힙 공간을 할당 할 때 프로세스의 수명동안 생성됩니다. 이 값을 조정하면 프로세스가 소유할 수 있는 VMA의 양이 제한됩니다. 프로세스가 소유할 수 있는 VMA의 양을 제한하는 것이다. 이를 제한하면 프로세스가 VMA 제한에 도달했을 때 메모리 부족 오류가 발생한다.
주의!! OpenJDK, OracleJDK 1.8.161/1.8.162 버전에서는 RMI 에러가 발생하여 카산드라가 실행이 되지 않는다.
jdk rpm파일을 다운로드하여 각 노드에 설치한다.
$ sudo yum localinstall -y jdk-8u151-linux-x64.rpm
$ sudo yum install -y jna
요약 : Cassandra는 최근에 JNI(Java Native Interface) 대신 JNA (Java Native Access) 방식을 이용해서 native 영역에 메모리를 copy 하는 작업이 많은 경우에 사용되어 플랫폼의 속도를 향상시키는 방법을 쓰고 있다.
참고링크 : http://channy.creation.net/project/dev.kthcorp.com/2011/05/12/last-free-lunch-facebooks-memory-allocator-jemalloc/index.html http://www.databasetorque.com/2017/09/jemalloc-and-cassandra.html
$ wget http://dl.fedoraproject.org/pub/epel/7/x86_64/Packages/j/jemalloc-3.6.0-1.el7.x86_64.rpm
$ sudo yum localinstall -y jemalloc-3.6.0-1.el7.x86_64.rpm
요약 : facebook 이 소개한 메모리 할당자(malloc) 로서 즉, 시스템에서 메모리를 받아오는 역할이다. 최근 멀티코어, 멀티스레드에 동작하는 프로그램에서는 속도, 공간효율성의 측면에서 굉장히 중요하다.
Cassandra에서 Jemalloc 를 활성화하면 Java heap 공간을 줄이게 된다. Cassandra 에 쓰여진 데이터는 먼저 Heap 메모리에 있는 MemTable 에 저장되고, MemTable 이 가득차면 SStable 로 Flush 된다. JVM GC는 Heap Memory를 지우는데 사용된다. 때때로 이러한 수집 프로세스로 인해 가비지 수집 일시 중지가 발생하여 Cassandra 에 문제가 발생하기도 한다.
malloc : 동적으로 메모리를 할당하는 함수 (힙 영역에 메모리를 할당)
sudo vi /etc/yum.repos.d/cassandra.repo
[cassandra]
name=Apache Cassandra
baseurl=https://www.apache.org/dist/cassandra/redhat/311x/
gpgcheck=1
repo_gpgcheck=1
gpgkey=https://www.apache.org/dist/cassandra/KEYS
sudo yum -y install cassandra
vi /etc/cassandra/conf/cassandra.yaml
10 cluster_name: 'testCassandra'
25 num_token: 256
73 hints_directory: /data/cassandra/hints
103 authenticator: PasswordAuthenticator
190 data_file_directories:
191 - /data/cassandra/data
196 commitlog_directory: /data/cassandra/commitlog
368 saved_caches_directory: /data/cassandra/saved_caches
425 - seeds: 시드 노드 IP
438 concurrent_reads: 128
439 concurrent_writes: 256
440 concurrent_counter_writes: 128
444 concurrent_materialized_view_writes: 128
466 disk_optimization_strategy: ssd
539 memtable_flush_writers: 128
575 trickle_fsync: true
599 listen_address: # 각 노드별 IP
676 rpc_address: # 각 노드별 IP
811 concurrent_compactors: 32
819 compaction_throughput_mb_per_sec: 32
1074 internode_compression: all
1094 enable_user_defined_functions: true
$ sudo mkdir -p /data/cassandra
$ sudo chown cassandra:cassandra /data/cassandra
$ sudo systemctl start cassandra
$ sudo systemctl enable cassandra
nodetool status
nodetool describecluster
UN => UP / NORMAL Cluster Name => testCassandra Snitch => SimpleSnitch Partitioner => Murmur3Partitioner
$ cqlsh -u cassandra -p cassandra
CREATE USER testuser WITH PASSWORD 'testuser123' SUPERUSER;
ALTER USER cassandra WITH PASSWORD 'newpasswd' NOSUPERUSER;
LIST USER;
ALTER KEYSPACE system_auth WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 3};
$ sudo vi /etc/cassandra/conf/cassandra-env.sh
243 if [ "x$LOCAL_JMX" = "x" ]; then
244 LOCAL_JMX=no
$ sudo cp /usr/java/jdk1.8.0_151/jre/lib/management/jmxremote.password.template /etc/cassandra/jmxremote.password
$ sudo vi /etc/cassandra/jmxremote.password
testuser testuser123
$ sudo chown cassandra:cassandra /etc/cassandra/jmxremote.password
$ sudo chmod 400 /etc/cassandra/jmxremote.password
nodetool -u testuser -pw testuser123 status
회사에서 용량이 30G정도 되는 gz파일 두개의 subtract를 구해 cassandra에 insert하는 작업이 있었다.(꽤 오래전일..)
처음 생각할 수 있는 코드 다음과 같을 것이다.
클러스터의 memory를 많이 사용할 수 있고 spark버전이 2.0이상이면 문제 없이(?) 많은 리소스를 사용하면서 잘 동작하겠지만 나에게 주어진 환경은 spark1.6버전 (spark 2.0이하 버전에서는 container 메모리가 일정 수준을 넘어가면 yarn에서 강제로 죽여버린다.
참고) 한정된 리소스였다.
그래서 위 코드 plan으로 코드를 작성하면 subtract연산을 할때 많은 데이터가 메모리 위에 존재하게 되고 다음과 같은 에러를 보게 된다.
다시 생각해보면 MapReduce작업이고 Reduce작업 할때 병목현상이 생겨 메모리를 많이 사용하게 된다.
이러한 병목현상을 줄이기 위해서 reduceByKey
, groupByKey
, MapPartitions
, RepartitionAndSortWithinPartitions
이런 연산들이 있다.
여기서
groupByKey
연산은 조심해서 사용해야하는데, groupBy연산은 shuffle이 발생한 뒤에 single in-memory에서 연산을 하므로 memory를 많이 사용하게 된다.
그래서 결론은 repartitionAndSortWithinPartitions
연산을 추가해서 문제를 해결하였다.
Spark-Sql를 사용하면 이런 튜닝은 자동으로 해준다. 똑같은 연산을 Dataframe join으로 풀어보면 실행계획 중에
repartitionAndSortWithinPartitions
이 들어간 것을 확인 할 수 있었다.
MapPartitoins
map연산과 HashPartitoiner
를 이용해서 hash함수에 key를 통과 시켜 같은 HashCode를 가지는 데이터는 같은 partition에 넣는 작업을 한다.
이게 groupByKey
연산과 가장 크게 다른점은 record를 하나의 memory에 모으지 않고 iterator를 사용해 stream처리해 memory를 적게 사용한다.
좋은점은 iterator를 직접 만들어서 partitions되는동안에 logic을 넣을 수 있다. 이렇게 되서 얻는 장점은 logic에 사용되는 memory 사용량을 줄일 수 있다.
val outputRDD = partitionedRDD.mapPartitions(v => new CustomIterator(v))
class CustomIterator(iter: Iterator[(String, String)]) extends Iterator[(String, String)] {
var lastKeyBase = null: String
var attr = null: String
def hasNext : Boolean = {
iter.hasNext
}
def next : (String, String) = {
var record = iter.next
var key = record._1.split("\\|")
var keyBase = key(0)
var keyFlag = key(1)
// Parse value
var value = record._2.split(",")
var inAttr = value(3)
if (lastKeyBase != null && !lastKeyBase.equals(keyBase)) {
// Reset group attribute
attr = null
lastKeyBase = keyBase
}
if (keyFlag.equals("A") && attr == null) {
attr = inAttr
}
(keyBase, value(0) + "," + value(1) + "," + value(2) + "," + (if (attr != null) attr else inAttr))
}
}
repartitionAndSortWithinPartitions
repartitionAndSortWithinPartitions
연산은 글자 그대로 partition연산을 할때 key에대해서 sorting을 해준다.
이 연산이 가지는 장점은 여러개의 RDD에대해서 join연산을 할때 장점을 가진다. subtarct연산을 할때도 역시 장점을 가진다 key에 대해 sorting이 되어 있으니, 동일 key가 발견되면 해당 key 뒤 내용을 전혀 볼 필요가 없으니 말이다.
hashcode를 통한 key로 partition을 할때 사용하는 partitoner
class KeyBasePartitioner(partitions: Int) extends Partitioner {
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = {
//KEY를 자유롭게 설정하면 된다.
Math.abs(key.hashCode() % numPartitions)
}
}
class Functions(props: Properties) extends Serializable {
@transient
lazy val hbaseConnection = {
val hbaseConf = HBaseConfiguration.create()
props.entrySet().asScala.foreach(entry => {
val key = entry.getKey.toString
if (key.startsWith("hbase")) {
hbaseConf.set(key, entry.getValue.toString)
}
})
ConnectionFactory.createConnection(hbaseConf)
}
...
def getItem(log: Log) = {
val table = hbaseConnection.getTable("itemdb")
...
}
}
Object Driver {
....
val funtions = new Functions(props)
...
streaming.map(functions.getItem)
}
object Functions {
private var hbaseConnection: Connection = null
def getHbaseConnection(props:Properties) = {
if (hbaseConnection == null) {
val hbaseConf = HBaseConfiguration.create()
props.entrySet().asScala.foreach(entry => {
val key = entry.getKey.toString
if (key.startsWith("hbase")) {
hbaseConf.set(key, entry.getValue.toString)
}
})
hbaseConnection = ConnectionFactory.createConnection(hbaseConf)
}
hbaseConnection
}
}
class Functions(props: Properties) extends Serializable {
...
def getItem(log: Log) = {
val table = Functions.hbaseConnection(props).getTable("itemdb")
...
}
}
Covariance(공변성)에 대해서 서술하기전에 Polymorphism(다형성)에 대해서 알아보자
매개변수 다형성은 정적 타입의 장점을 포기하지 않으면서 일반적 코드(즉 여러 타입의 값을 처리할 수 있는 코드)를 만들기 위해 사용한다.
trait List[T] {
def isEmpty: Boolean
def: head: T
def tail: List[T]
}
class Cons[T](val head: T, val tail: List[T]) extends List[T] {
def isEmpty = false
}
class Nil[T] extends List[T] {
def isEmpty = true
}
def singleton[T](elem: T) =
new Cons[T](elem, new Nil[T]
singleton[Int](1)
singleton[Boolean](true)
java의 generic같은 느낌이다.
abstract class Animal {
def name: String
}
abstract class Pet extends Animal {}
class Cat extends Pet {
override def name = "Cat"
}
class Dog extends Pet {
override def name = "Dog"
}
class Lion extends Aniaml {
override def name = "Lion"
}
class PetContainer[P](p:P) {
def pet: P = p
}
val dogContainer = new PetContainer(new Dog)
val catContainer = new PetContainer(new Cat)
val lionContainer = new PetContainer(new Lion). // <- 주목
PetContainer에 Type은 무엇이든 올 수 있다. 이때 Type을 한정할 수 있는데, 이를 Type Bounds라고 한다.
예를 들어 PetContainer의 타입을 Pet이하의 것으로 한정하면 다음과 같이 된다.
class PetContainer[P <: Pet](p: P) {
def pet: P = p
}
val dogContainer = new PetContainer(new Dog)
val catContainer = new PetContainer(new Cat)
val lionContainer = new PetContainer(new Lion). // <- 컴파일 Error
Pet은 Animal을 상속 받기 때문에 Lion이 PetContainer에 들어 갈 수 없다.
앞선 예제를 생각해보자.
어떤 객체가 Dog <: Pet
이면 List[Dog] <: List[Pet]
일까?
List가 공변성을 가지고 있으면 맞는 말이다. 만약 List가 반 공변적이면 Error이다.
scala의 List는 공변적이다.
abstract class List[+A]
다시 정리하면 하위타입 관계가 실제 의미하는 것은 “어떤 타입 A에 대해 B이 하위 타입(A <: B)이라면, B으로 A를 대치할 수 있는가?” 하는 문제이다(역주: 이를 리스코프치환원칙(Liskov Substitution Principle이라 한다.)
이는 상위타입이 쓰이는 곳에는 언제나 하위 타입의 인스턴스를 넣어도 이상이 없이 동작해야 한다는 의미이다. 이를 위해 하위 타입은 상위 타입의 인터페이스(자바의 인터페이스가 아니라 외부와의 접속을 위해 노출시키는 인터페이스)를 모두 지원하고, 상위타입에서 가지고 있는 가정을 어겨서는 안된다).
C[T], A <: B
C[A] <: C[B] convariant - 공변적
C[A] >: C[B] contravariant - 반공변적
C[A] C[B] invariant - 무공변적
class C[+A] {...} convariant - 공변적
class C[-A] {...} contravariant - 반공변적
class C[A] {...} invariant - 무공변적
예제를 보면
def process(animals: List[Animal]) = ...
val cats = List(new Cat())
val dogs = List(new Dog())
process(cats)
process(dogs)
List가 공변적이기 때문에 Aniaml의 하위 type을 List에 담을 수 있다. 만약 List가 반공변적이면 Animal보다 상위 type만 List에 담을 수 있다.
그래서 다음 예제도 어디가 틀렸는지 바로 알 수 있다.
val dogs = Array(new Dog)
val pets: Array[Pet] = dogs <--Error
pets(0) = new Cat
val dog = dogs(0)
UserDetails
로 로그인/로그아웃 구현 방식에 대해(REST방식 로그인)SecurityContextHolder
에대해서 이해한 것들로그인 관련 MySql Table정보
create table user (
username varchar(20),
password varchar(500),
name varchar(20),
isAccountNonExpired boolean,
isAccountNonLocked boolean,
isCredentialsNonExpired boolean,
isEnabled boolean
);
create table authority (
username varchar(20),
authority_name varchar(20)
);
UserDetails
를 implements한 클래스를 만든다.@Getter
@Setter
@ToString
public class CustomUserModel implements UserDetails {
private String username;
private String password;
private boolean isAccountNonExpired = true;
private boolean isAccountNonLocked = true;
private boolean isCredentialsNonExpired = true;
private boolean isEnabled = true;
private Collection<? extends GrantedAuthority> authorities;
}
UserDetailsService
를 implements한 클래스를 만든다.@Service
public class UserManageService implements UserDetailsService {
@Autowired
private CustomUserDao customUserDao;
...
public CustomUserModel readUser(String userName) {
return customUserDao.readUser(userName);
}
public Collection<GrantedAuthority> getAuthorities(String username) {
return customUserDao.readAuthority(username);
}
@Override
public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException {
CustomUserModel customUserModel = customUserDao.readUser(username);
customUserModel.setAuthorities(getAuthorities(username));
return customUserModel;
}
}
참고 Mybatis를 사용할 경우 ResultMap으로
Collection<GrantedAuthority>
return 받음!!
<resultMap id="authorityMap" type="org.springframework.security.core.authority.SimpleGrantedAuthority">
<constructor>
<idArg column="authority_name" javaType="String"/>
</constructor>
</resultMap>
REST방법을 사용하는 경우 다음과 같이 사용한다. 아이디/패스워드를 json을 받아서 처리하니.. formLogin으로 처리해도 되지만, 따로 json처리를 해줘야하는 귀찮음이 있다.
@Configuration
@EnableWebSecurity
@EnableGlobalMethodSecurity(prePostEnabled = true)
@ComponentScan(basePackages = {"com.addinfra.customtargeting.security"})
public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Autowired
UserManageService userManageService;
@Autowired
RestUnauthorizedEntryPoint authenticationEntryPoint;
@Autowired
RestAccessDeniedHandler restAccessDeniedHandler;
@Autowired
LogoutHandler logoutHandler;
@Autowired
public void configureGlobal(AuthenticationManagerBuilder auth) throws Exception {
auth.userDetailsService(userManageService);
}
@Override
public void configure(WebSecurity web) throws Exception {
web.ignoring()
.antMatchers("/*.js", "/*.css,/*.png", "/*.jpg", "/*.otf", "/*.eot", "/*.svg", "/*.ttf", "/*.woff", "/*.woff2", "/signin.html");
}
@Override
protected void configure(HttpSecurity http) throws Exception {
http.csrf().disable();
http.authorizeRequests()
.antMatchers(HttpMethod.POST, "/login").permitAll()
.antMatchers(HttpMethod.POST, "/logout").permitAll()
.antMatchers(HttpMethod.OPTIONS, "/**").permitAll()
.antMatchers("/confirmAdList.html").hasAnyAuthority("admin")
.antMatchers("/userList.html").hasAnyAuthority("admin")
.anyRequest().authenticated()
.and()
.exceptionHandling()
.authenticationEntryPoint(authenticationEntryPoint)
.accessDeniedHandler(restAccessDeniedHandler);
http.logout()
.invalidateHttpSession(true)
.deleteCookies("JSESSIONID")
.logoutSuccessHandler(logoutHandler);
}
@Bean
@Override
public AuthenticationManager authenticationManagerBean() throws Exception {
return super.authenticationManagerBean();
}
}
다양한 Handler를 사용할 수 있다. 각 상황에 맞는 Handler를 구현해서 사용하면 된다.
프론트를 AngularJs를 사용하여 구현했다. 이때 logoutSuccessHandler를 사용해 pageRedirct를 할려고 했지만 잘되지 않아.. 우회적으로 Response를 프론트에서 받아 pageRedirct를 했다.
그리고 기본적으로 Spring Security에서
/logout
url요청에 대해서filter
가 걸려있다. 따로 Controller에서/logout
을 구현하지 않아도 자동으로 Session과 SecurityContextHolder가 초기화 된다.@Component public class LogoutHandler implements LogoutSuccessHandler { @Override public void onLogoutSuccess(HttpServletRequest request, HttpServletResponse response, Authentication authentication) throws IOException, ServletException { SecurityUtils.sendResponse(response,HttpServletResponse.SC_OK,"logoutOk"); } }
formLogin과 달리 login처리를 따로 해줘야한다.
@RestController
public class LoginController {
@Autowired
UserManageService userManageService;
@Autowired
AuthenticationManager authenticationManager;
@PostMapping("/login")
public String login(@RequestBody AuthenticationRequest authenticationRequest, HttpSession session) {
try {
String username = authenticationRequest.getUsername();
String password = authenticationRequest.getPassword();
UsernamePasswordAuthenticationToken token = new UsernamePasswordAuthenticationToken(username, password);
Authentication authentication = authenticationManager.authenticate(token);
SecurityContextHolder.getContext().setAuthentication(authentication);
session.setAttribute(HttpSessionSecurityContextRepository.SPRING_SECURITY_CONTEXT_KEY,
SecurityContextHolder.getContext());
return new AuthenticationToken(authentication.getName(), userManageService.getLevel(authentication.getAuthorities()), session.getId());
} catch (BadCredentialsException e) {
return JSONObject.quote("check_pw");
} catch (InternalAuthenticationServiceException e) {
return JSONObject.quote("check_id");
} catch (Exception e) {
return JSONObject.quote(e.getMessage());
}
}
}
UsernamePasswordAuthenticationToken
에서 id/pw로 token을 만들어authenticationManager
에서 authenticate(인증)을 받고SecurityContextHolder
에 인증정보를 저장한다. 즉 authorization(인가)된 사용자라고 저장한다.
Spring Scurity에 LogoutFilter
클래스가 있다. 여기서 하는 일은 /logout
url로 요청이 오면 SecurityContextHolder
를 clear하고 session을 삭제한다.
public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain) throws IOException, ServletException {
HttpServletRequest request = (HttpServletRequest)req;
HttpServletResponse response = (HttpServletResponse)res;
if (this.requiresLogout(request, response)) {
Authentication auth = SecurityContextHolder.getContext().getAuthentication();
if (this.logger.isDebugEnabled()) {
this.logger.debug("Logging out user '" + auth + "' and transferring to logout destination");
}
this.handler.logout(request, response, auth);
this.logoutSuccessHandler.onLogoutSuccess(request, response, auth);
} else {
chain.doFilter(request, response);
}
}
아무런 설정을 하지 않으면 기본적으로 생성된 ScurityContextHolder는 ThreadLocalSecurityContextHolderStrategy
에 저장된다.
여기서 궁금한점은 같은 아디로 여러 곳에서 로그인을 했을 때 한곳에서 로그아웃을 하면 다른 곳에서 로그인한 사용자도 로그아웃이 될까? (결론은 다른곳 사용자는 로그아웃이 안된다.)
public void logout(HttpServletRequest request, HttpServletResponse response, Authentication authentication) {
Assert.notNull(request, "HttpServletRequest required");
if (this.invalidateHttpSession) {
HttpSession session = request.getSession(false);
if (session != null) {
this.logger.debug("Invalidating session: " + session.getId());
session.invalidate();
}
}
if (this.clearAuthentication) {
SecurityContext context = SecurityContextHolder.getContext();
context.setAuthentication((Authentication)null);
}
SecurityContextHolder.clearContext();
}
위 소스는 혼란이 왔던 부분이다.
id/pw로만 인증을 할텐데.. 어떻게 다른 곳에서 로그인한 것을 구분할까… 여기에 답은 AuthenticationManager
에서 사용자 session값도 함께 저장을 하기 때문에 각각에 사용자에 대해서 로그아웃처리를 할 수 있던것이다.
그럼 AuthenticationManager를 사용하지 않고 id/pw로만 인증과정을 직접구현해서 사용하면 한사람이 로그아웃하면 동일아이디로 로그인 한사용자 모두가 로그아웃 되겠지..
conf/log4j.properties에서 log level를 수정한다.
log4j.rootCategory=INFO, console
to
log4j.rootCategory=ERROR, console
import org.apache.log4j.Logger
import org.apache.log4j.Level
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)