본문 바로가기

옥탑방주인/Spark

Spark 2.2.0 Programming Guide



Overview


모든 스파크 어플리케이션은 유저의 main function 실행하는것과 클러스터상에서 다양한 병렬처리를하는 driver program으로 구성되어 있다. Spark에서 제공하는 main abstraction은 resilient distributed dataset(RDD)인데, RDD는 클러스터의 노드에서 파티션된 인자(element)의 콜렉션인데, 이것들은 병렬 처리가 가능하다. RDDs는 Hadoop file system(또는 하둡이 지원하는 다른 파일 시스템)또는 현재 diver program에서 Scala collection 과 함께 생성되고 변형시킨다. 유저들은 메모리에서 RDD가 유지하도록 요청하여, 병렬작업에서 효율적으로 재사용된다. 끝으로 RDDs는 노드에서 실패(node failures)를 자동으로 복구된다.


Spark에서 두번째 abstraction은 병럴작업에서 사용되는 공유 변수들(shared variables)이다. 기본적으로, Spark은 다른 노드에서 작업 집합과 같이 병렬로 함수를 실행할 때 함수에 사용 된 각 변수의 복사본을 각 작업에 제공합니다. 때때로, 변수는 작업들(tasks)또는, 작업들과 driver program사이에 공유되길 원한다. Spark는 두가지 타입의 공유변수(shared variable)를 지원한다.: broadcast variable은 모든 노트의 메모리에서 값을 캐시하는되 사용된다. accumulators는 더하거나 계산하는것데 사용된다.


이 가이드는 스파크가 서포트하는 언어에서의 특징을 보여준다. 이것은 스파크 반응형 쉘(interactive shell)에서 쉽게 따라할 수 있다. bin/spark-shell에서는 스파크 쉘이 있고, bin/pyspark에서는 파이썬 쉘이 있다.


Linking with Spark


스파크 2.2.0은 Scala 2.11 버전을 기준으로 작업한다.(Scala 다른버전에서도 마찬가지로 작업하는것을 빌드할 수 있다.) 스칼라에서 어플리케이션을 사용하려면, Scala 버전을 맞춰서 사용해야 한다(e.g. 2.11.X).


스파크 어플리케이션을 사용하는것은, Spark에서 Maven dependency를 추가하는것이 필요하다. Spark는 Maven Central을 통해 이용가능하다.


groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.2.0


만약 HDFS 클러스터를 추가하고 싶다면, 스파크 버전에 맞는 hadoop-client dependency를 추가해주면 된다.


groupId = org.apache.hadoop
artifactId = hadoop-client
version = 


마지막으로, 프로그램에서 몇개의 스파크 클래스를 추가하는것이 필요하다.

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf


(스파크 1.3.0이전버전이면 import org.apache.spark.SparkContext. 도 추가해줘야 된다.)




Initializing Spark


스파크 프로그램에서 가장먼저 해야될 것은 SparkContext object를 만드는 것이다. 이것은 스파크가 클러스터에 어떻게 접근하지는지를 알려준다. SparkContext를 생성하기위해 첫번째로 당신이 작성한 어플리케이션에 관한 정보를 포함하고있는 SparkConf object를 빌드하는것이 필요하다.


오직 하나의 SparkContext만 각 JVM에서 실행가능하다. 새로운것을 만들기 전에 SparkContext의 동작을 stop() 해야한다.


val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)


appName 인자(parameter)는 당신의 어플리케이션을 클러스터 UI에서 보여주는 이름이다. master는 Spark, Mesos or YARN cluster URL, 또는 로컬모드에서 실행할수있는 특별한 "local" 스트링이다. cluster상에서 작동할 때, 프로그램에서 master를 하드코드(hardcode)하고싶지는 않을 것이다, 어플리케이션에 spark-submit을 런치하기도 하지만 받기도 한다. 그러나, local 테스트와 unit테스트는 Spark in-process에서 작동한다.


Using the Shell


스파크 쉘이서는, 특별한 인터프리터(interpreter)가 있다. SparkContext는 sc라고 불리는 변수로 당신을 위해 이미 생성되어 있다. 나만의 SparkContext를 만드는것은 효과가 없다.  --master 인자(argument)를 사용해서 문맥(context)가 연결되는 master를 설정할 수 있고, JAR파일은 --jar 인자(argument) 명령어를 사용해서 사용할 수 있다. Maven을 추가하는것도 --package 인자를 사용해서 가능하다. 추가적인 저장소(repositories)는 --repositories 인자를 사용할 수 있다. bin/spark-shell을 사용해서 예제를 아래에서 보여줄 것이다.


$ ./bin/spark-shell --master local[4]


code.jar 경로 추가:

$ ./bin/spark-shell --master local[4] --jars code.jar


Maven coordinate사용해서 dependency 추가

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"


이러한 옵션들이 있고, spark-shell에서 --help를 실행하면 여러가지 명령어들이 있다. spark-shell은 많은 spark-submit script를 호출할수있다.



Resilient Distributed Datasets (RDDs)


스파크는 resilient distributed dataset(RDD)의 개념으로 중심이 되어있다. RDD는 병렬에서 작동되는 인자의 컬렉션에 내고장성(fault-tolerant)을 지원한다. RDD 생성하는것은 두가지 방법이 있다: 당신의 드라이버 프로그램에서 현재 컬렉션을 parallelizing 하는것과 HDFS, HBase, 또는 하둡 입력포맷을 제공하는 어떤 데이터와 같은 공유 파일시스템(shared filesystem)인 외부 저장 시스템에서 테이터셋을 참조(referencing)하는것이 있다.



Parallelized Collections


병렬하된 컬렉션은 당신의 프로그램(Scala Seq)의 기존 컬렉션에서 SparkContext'S parallelize 메소드(method)로 호출된다. 컬렉션의 요소가 복사되어 분산 된 데이터 집합을 형성하며이 데이터 집합을 병렬로 작동 할 수 있습니다. 예를들어, 1에서 5를 가지고있는 병렬화된 컬렉션(palatalized collection)을 생성하는법을 보여주겠다.

 val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelized(data)


이것이 생성되었다면, 분산된 데이터셋(distData)는 병렬에서 작동한다. 예를들어, 어레이의 인자를 더하는것을 disData,.reduce((a, b) => a + b)로 호출한다. 추후에 이런내용을 다룰 것이다.


병렬 컬렉션(parallel collection)에서 하나의 중요한 인자는 데이터셋에서 잘려진 partition의 개수이다.

스파크는 클러스터의 각 파티션에서 하나의 작업(task)를 실행할 수 있다. 보통 클러스터의 각 CPU에서 2~4개 partition을 원할 것이다. 보통, 스파크는 클러스터기반으로 파티션의 갯수를 설정하는것을 자동적으로 시도한다. 그러나, 두번째 매개변수는 parallelize(e.g., sc.parallelize(data, 10)) 에서 수동설정할 수 있다. 참고 : 코드의 일부 위치에서는 이전 버전과의 호환성을 유지하기 위해 슬라이스(slice)라는 용어 (파티션의 동의어)를 사용합니다.


External Datasets


스파크는 Hadoop에서 지원되는 모든 스토리지 소스로부터 분산 데이터셋을 생성할 수 있다. HDFS, Cassandra, HBase Amazon S3, etc.. 와 같은 당신의 로컬 파일 시스템도 지원한다. 스파크는 texfile, SequenceFiles, 그리고 모든 하둡 입력포맷(InputFormat)도 지원한다.


Text file RDD는 SparkContext'S textFile 메소드를 사용해서 생성된다. 이 메소드는 file(머신의 로컬 패스이거나 hdfs://, s3n://, etc URI)에서 URI를 가져오고 라인의 컬렉션과 같은것을 읽어들인다.

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at :26


생성이 되었다면, distFile은 데이터셋 오퍼레이션(dataset operations)으로 행동될 것이다. 예를들어, map과 reduce를 사용해서 모든 라인의 사이즈를 더하는것을 :distFIle.map(s => s.length).reduce((a, b) => a + b)와 같이 나타낼 수 있다.


아래의 읽을거리들이 몇개 있다.


  • 만약 경로를 local filesystem에서 사용하려면, 워커 노드에서도 접근가능한 경로를 설정해야된다. 모든 워커에서 파일을 복사하거나 네트워크에 마운트된 공유 파일시스템(shared file system)을 사용해라.

  

  • textFile을 포함한 Spark의 모든 파일 기반 입력 방법은 디렉토리, 압축 파일 및 와일드 카드에서도 실행될 수 있습니다. 예를들어, textFile("/my/directory"), textFile("/my/directory/*.txt"), 그리고 textFile("/my/directory/*.gz")로 사용할 수 있다.


  • textFile 메소드는 파일의 파티션의 갯수를 컨트롤하는 선택적인 두번째 인자(argument)를 가져올 수 있다. 기본적으로, 스파크는 파일의 각 블록(HDFS에서 기본적으로 블록은 128MB로 되어있다)에서 하나의 파티션으로 생성한다.


텍스트 파일 외에도 Spark의 Scala API는 여러 가지 다른 데이터 형식을 지원합니다.

s

  • SparkContext.wholeTextFiles은 다양한 작은 text files을 포함하고있는 경로를 읽고 그것들(filename, content)을 짝(pair)짓어서 리턴시켜준다.

  • SequenceFiles에선, SparkContext's는 sequenceFile[K, V]메소드를 사용합니다. k는 파일에서 key이고, v는 value입니다. IntWritable과 Text와 같은 하둡의 Writable한 인터페이스의 서브클래들이여만 된다. 게다가, 스파크를 사용하면 몇가지 일반적인 Writable에 대한 기본 유형을 지정할 수 있다; 예를들어, sequenceFile[Int, String]은 자동적으로 IntWritables와 Text로 읽어진다.

  • 다른 하둡 입력포맷(InputFormat)에선, SparkContext.hadoopRdd 메소드를 사용한다. 이 메소드는 key class와 value class, format class입력과 임의의 JobConf를 가지고잇다. 당신의 input source와 Hadoop job과 설정을 똑같게 해라. "새로운" MapReduce API(org.apache.hadoop.mapreduce)기반인 InputFormat에서 SparkContext.newAPIHadoopRDD를 사용할 수 있을것이다.

  • RDD.saveAsObjectFile과 SparkContext.objectFile은 RDD에서 시리얼라이즈된 자바 오브젝트로 구성된 간단한 포맷을 저장한다. Avro와 같은 특수 형식만큼 효율적이지는 않지만 RDD를 쉽게 저장할 수 있습니다.



RDD Operations


RDDs는 두가지 타입의 동작을 지원한다: transformations는 현재 하나로부터 새로운 데이터셋을 생성하는 것이고 action은 dataset에서 계산이 실행된 후 driver program에서 값을 리턴하는 것이다. 예를들어, map은 함수(function)을 통해 각 데이터셋의 인자를(element)를 넘겨준 것과 새로운 RDDs에서 표현된 결과의 리턴값을 변형시킨다. 반면에, reduce는 몇개의 함수(function)과 driver program(분산 데이터셋에서 리턴된 병렬 reduceByKey이긴 하지만)에서의 마지막 결과를 사용한 RDD의 모든 인자(element)를 합하는 action을 한다.


스파크에서 모든 변형(transformation)은 lazy(Scala lazy를 참조하면 이해가 빠를것이다)이다. 결과를 바로 계산하지 않는다. 그러나, persist(또는 cache)메소드를 사용해서 메모리에서 RDD를 persist하는것은 가능하다. 그런경우에 스파크는 다음번에 너가 쿼리하는것에대해 더욱빠르게 접근할 수 있도록 인자(element)를 보관한다. 또한 disk에 RDDs를 유지(persist)하거나 다중 노드에서 복제되는것을 지원한다.



Basics


RDD 기본을 설명하자면, 간단한 아래 프로그램을 보면 된다:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)


외부파일로부터 기본 RDD를 정의하는것은 첫번째 라인이다. 이 데이터 세트는 메모리에 로드되지 않았거나 다른 방법으로 작동하지 않습니다. 라인은 파일에 대한 포인터 일뿐입니다. 두번째 라인은 map 함수의 변형의 결과를 lineLengths로 정의한 것이다. 다시말해, lineLengths는 즉시 계산되는것이 아니고, laziness(호출되는순간 값이 초기화)처럼 동작한다. 마지막으로, reduce는 액션처럼 동작한다. 이 시점에서는 스파크는 계산을 개별 컴퓨터에서 실행되는 작업으로 분해하고 각 머신마다 map의 부분과 local reduction을 실행하고, driver program에  답하는것을 리턴한다.


lineLengths를 나중에 사용하길 원하면 아래처럼 사용하면 된다:

lineLengths.persist()


reduce하기전에, reduce를 지정하면 lineLength가 처음 계산 된 후 메모리에 저장됩니다.



Passing Functions to Spark



Spark의 API는 드라이버 프로그램에서 함수를 전달하여 클러스터에서 실행하는 것에 크게 의존합니다. 두개의 추천사항을 제시해주겠다.



object MyFunctions {     def func1(s: String): String = { ... }

}


myRdd.map(MyFunctions.func1)

클래스 인스턴스 (싱글 톤 객체가 아닌)에서 메소드에 대한 참조를 전달하는 것도 가능하지만,이 경우 메소드와 함께 해당 클래스가 포함 된 객체를 보내야합니다. 예를 들어 다음을 고려하십시오.

class MyClass {     def func1(s: String): String = { ... }

def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }

}

MyClass 인스턴스를 새로 만들고 doStuff를 호출하면 그 안에있는 맵이 해당 MyClass 인스턴스의 func1 메소드를 참조하므로 전체 객체를 클러스터로 보내야합니다. 이러한 것들은 rdd.map(x => this.func1(x))를 사용하는것과 비슷하다.


비슷한 내용으론, 바깥 쪽 개체의 필드에 액세스하면 전체 개체를 참조하게됩니다.

class MyClass {     val field = "Hello"

def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x)

}

this의 모든것을 참조하는것은 rdd.map(x => this.field +x)를 작성하는것과 동일하다. 오류를 방지하기위해서 간단한 방법은 외부에서 액세스하는 대신 필드를 로컬 변수에 복사하는 것이다:

def doStuff(rdd:RDD[String]): RDD[String] = {     val field_ = this.field

rdd.map(x => field_ + x)

}




Understanding closures


스파크에 관해 어려운것중에 하나는 범위(scope)와 변수(variables)의 라이프 사이클 그리고 클러스터에서 코드를 실행할 때의 메소드를 이해하는 것이다. 범위를 벗어나는 변수를 수정하는 RDD 연산은 자주 혼란을 일으킬 수 있습니다.  예제에선 증가하는 연산에서 foreach()를 사용하는 코드를 볼거다. 하지만 다른 실행과 마찬가지로 오류가 발생할 수 있다.


Example


아래의 native RDD 인자 합계를 생각해보면. 실행은 동일한 JVM 내에서 발생하는지 여부에 따라 다르게 동작 할 수 있습니다. 이에 대한 일반적인 예로 로컬 모드 (--master = local [n])에서 Spark 응용 프로그램을 클러스터에 배포하는 경우 (예 : Spark-YARN )를 실행하는 경우를들 수 있습니다.

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)


Local vs cluster modes



위에 코드의 동작은 정의되지 않았고, 의도한대로 작동하지 않을것이다. 잡(jobs)들을 실행하려면, 스파크는 작업(tasks)에서 RDD 작동(RDD operations)의 과정을 해야된다.






'옥탑방주인 > Spark' 카테고리의 다른 글

Spark 2.2.0 Quick Start  (0) 2017.10.13
Spark 실행옵션  (0) 2017.08.18
[Spark] Spark Streaming - A QUICK Example 에서 에러  (0) 2017.07.20