Joswlv

Spark serialize에대한 깨달음!

2018-05-28

Java spark streaming을 scala로 바꾸면서 발생한 시행착오 공유

  • spark job 처리 시 transform 작업이 간단한 경우 lambda 식으로 바로 적기도 하지만 재사용 등의 이유로 클래스로 만들어 사용하기도 한다
  • 일반적인 경우에는 처리 프로세스들의 모음이므로 클래스의 인스턴스가 매번 새롭게 생성될 필요가 없으므로 object(java의 static)로 만들어서 사용한다
  • 하지만 처리 프로세스 안에서 외부의 값을 받아서 써야 하는 일이 생겨 class로 정의하고 인스턴스를 만들어 전달을 하게 되었다 -> 문제의 시작

문제 발생 - 1

  • 처리 프로세스들을 class로 묶으면서 hbase에 접근하여 값을 가져오는 처리가 필요했다
  • hbase connection은 재사용이 가능하므로 한 번만 만들어 놓고 계속 쓰는 것이 효율적이라 클래스 변수로 hbase connection을 선언하고 클래스 생성 시 hbase connection에 필요한 값들얼 파라메터로 받아 connection을 생성했다
class Functions(props: Properties) extends Serializable {
    @transient
    lazy val hbaseConnection = {
        val hbaseConf = HBaseConfiguration.create()
        props.entrySet().asScala.foreach(entry => {
        val key = entry.getKey.toString
        if (key.startsWith("hbase")) {
          hbaseConf.set(key, entry.getValue.toString)
        }
      })
      ConnectionFactory.createConnection(hbaseConf)
    }
    ...
    def getItem(log: Log) = {
        val table = hbaseConnection.getTable("itemdb")
        ...
    }
}

Object Driver {
    ....
    val funtions = new Functions(props)
    ...
    streaming.map(functions.getItem)
}

  • transform을 통해 executor로 instance(위에서는 funtions)가 전달되려면 serialize가 가능해야 하므로 Serializable을 implements 하고, hbaseConnection이 serializable하지 않으므로 transient 키워드를 붙여 serialize 처리 시 빠지게 하고 lazy 키워드를 붙여 처음 사용 시 한 번 생성하도록 했다
  • 위와 같은 코드로 streaming을 실행하고 시간이 지나자 zookeeper connection과 관련한 문제가 발생하였다
    • hbaseConnection이 생성될때마다 zookeeper 연결도 같이 하게 되는데 hbaseConnection이 많아져서 zookeeper와 관련된 오류가 발생하는 것을 확인하였다
  • driver를 통해 전달된 Funtions instance가 executor에 한 번 전달되면 deserialize된 hbaseConnection이 lazy가 적용되어 최초 한 번 호출 시 생성이 되고 그 다음부터는 재사용될 것이라고 생각했는데 streaming이 처리되는 주기마다 매 번 새로 생성이 되었다
  • transient와 lazy를 같이 써서 hbaseConnection이 매번 생성되는 것인가 생각을 하고 lazy를 빼고 적용을 하니 zookeeper 연결과 관련된 문제가 발생하지 않아 문제가 해결됐다고 생각했다 -> 착각은 자유
    • deserialize를 통해 executor에서 instance가 생성되고 transient된 필드는 코드대로 초기화가 수행될 것이라고 생각했는데 초기화부분이 처리되지 않아 hbaseConnection이 null이었고, 그로 인해 hbase에서 값을 가져오지 못하는 문제가 발생하였다

Spark와 Serialize

  • spark는 transform 처리 시 외부에서 생성된 instance를 serialize하여 executor로 보내고 executor에서는 deserialize하여 다시 instance를 만들어 이를 사용한다
  • 해당 transform이 수행될 때마다 executor는 매 번 deserialize를 수행하며 이 때 매 번 새로운 instance가 생성된다
    • 일반적인 spark job의 경우 별 문제가 되지 않지만 streaming의 경우 짧은 주기로 처리가 되므로 life cycle을 길게 가져가야 하는 connection 등의 instance 관리에 문제가 된다
  • java의 경우 function class에 static으로 connection같은 필드를 지정하면 executor 프로세스가 재시작되지 않는 이상 재사용이 가능하지만 scala에는 static 키워드가 없다
    • 대신 companion object를 이용하여 static과 비슷한 처리를 할 수 있다
object Functions {
    private var hbaseConnection: Connection = null

    def getHbaseConnection(props:Properties) = {
        if (hbaseConnection == null) {
            val hbaseConf = HBaseConfiguration.create()
            props.entrySet().asScala.foreach(entry => {
            val key = entry.getKey.toString
            if (key.startsWith("hbase")) {
              hbaseConf.set(key, entry.getValue.toString)
            }
          })
          hbaseConnection = ConnectionFactory.createConnection(hbaseConf)
      }
      hbaseConnection
    }
}

class Functions(props: Properties) extends Serializable {
    ...
    def getItem(log: Log) = {
        val table = Functions.hbaseConnection(props).getTable("itemdb")
        ...
    }
}

Comments