본문 바로가기

옥탑방주인/Spark

Spark 2.2.0 Quick Start




Quick Start


이 튜토리얼은 빠르게 스파크를 사용하는 방법을 제공한다. 먼저 스파크 대화형(interactive) 쉘(파이썬 또는 스칼라로)을 통해 API를 소개하고, 어플리케이션을 어떻게 Java, Scala, Python으로 쓰는지를 보여줄 것이다.


이 가이드를 따라하려면, 먼저 Spark website 에서 스파크가 패키징되어있는것을 다운받아라. 여기서는 HDFS를 사용하지않겠지만, 릴리즈된 하둡 버전을 다운받을 수 있다.


Spark 2.0 이전에는 스파크의 메인프로그래밍 인터페이스(main programming interface)가 RDD(Resilient Distributed Dataset)였다. 스파크 2.0 이후에는, RDDs는 강력하게 형상화된 데이터셋으로 변경되었지만, 후드 아래에서 풍부한(richer) 최적화가 이루어졌다. RDD 인터페이스는 계속해서 지원되고 있고, RDD programming guide 에서 더 많이 완벽해진 레퍼런스를 확인할 수 있다. 그러나, 우리는 Dataset을 사용해서 바꾸는(switch)것을 매우 추천한다, Dataset은 RDD보다 낫은(better) 퍼포먼스를 갖고있다. SQL programming guide 에서 Dataset에 관해 더 많은 정보를 얻을 수 있다.


Interactive Analysis with the Spark Shell


Basics


Spark shell은 API를 배우는 간단한 방법을 제공한다. 마찬가지로 데이터를 대화식으로 분석하는 강력한 툴(powerful tool)을 제공한다. 이것은 Scala(Java VM에서 작동하고 현재 존재하는 Java libraries들을 사용할 수 있다) 또는 Python으로 실행가능하다. Spark 폴더에서 실행해보자.

./bin/spark-shell
스파크 중요한 추상적인 개념은 Dataset이라고 불리는 아이템의 분산된 콜렉션이다. Datasets은 하둡 인풋포멧 또는 다른 Datasets으로 변형하여 만들 수 있다. 스파크 소스 폴더안에 있는 README 파일의 텍스트로부터 새로운 데이터셋을 만들어 보자.
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [Value: string]
action으로 불리는 직접적인 데이터셋의 값 또는 변형된 데이터셋을 변환시켜 새롭운 값을 얻을 수 있다. 좀 더 자세한 내용은 API doc에 있다.


scala> textFile.count()
scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark


이제 이 데이터셋을 변형시켜 새로운 것으로 만들어보자. 파일에서 아이템의 subset과 새로운 데이터셋을 리턴하는것을 filter라고 부를 것이다.

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]


변형(transformation)과 액션(actions)을 한꺼번에 묶어 처리할 수 있다.


scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15


More on Dataset Operations


Dataset 액션(action)과 변형(transformations)은 더 복잡한 연산에 사용할 수 있다.


scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15


먼저 line에 정수값을 매핑하여 새로운 dataset을 생성한다. reduce는 Dataset에서 제일 큰 word count를 찾기위해 호출된다. map과 reduce에서 인자(argument)는 스칼라 함수 리터럴(Scala function literal)이고 Scala/Java library 또는 모든 언어 feature를 사용할 수 있다(요약해서 정리해보면 scala나 java library에 포함되어있는 기능들을 여기에서 다 사용할 수 있다는 것 같다). 예를들어, 다른곳에서 선언된 함수를 쉽게 호출할 수 있다. Math.max() 함수를 사용해서 이해를 좀 더 쉽게 해주겠다.


scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15


하나의 일반적인 데이터 플로우 패턴은 MapReduce이고 Hadoop으로 많이 알려져 있는 기능이다. Spark는 MapReduce를 좀 더 쉽게 실행할 수 있다. 


scala> val wordCount = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org,apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]


데이터셋의 단어에서 데이터셋의 라인으로 변형시키는것은 flatmap이고 file에서 데이터셋의 짝(String, Long 타입)을 단어 단위로 계산해주는것은 groupByKey와 count 이다.


scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means, 1), (under, 2), (this, 3), (Because, 1), (Python, 2), (agree, 1), (cluster., 1), ...)


caching


Spark는 데이터셋을 클러스터 전체 in-memory cache에서 가져오는것을 지원한다. 작은 "hot" dataset을 쿼리하거나 PageRank같이 반복적인 알고리즘을 실행하는것과 같은 반복적인 접근을 하는 데이터를 사용할 때 매우 유용하다. dataset에 캐시되는 linesWithSpark를 만들어 보자.


scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15


100라인 택스트파일이 캐시된것을 스파크를 사용해서 탐색하는것이 매우 바보같아 보이지만, 흥미로운점은 10개 또는 100개 노드의에서 매우 큰 데이터를 처리할때도 같은 함수가 사용된다. 마찬가지로 클러스터에서 bin/spark-shell을 연결해서 반응형 쉘을 사용할 수 있다. RDD programming guide에 자세한 내용이 나와있으니 참조해라.



Self-Contained Applications


Spark API를 사용해서 독립적인 어플리케이션을 사용할 수 있다. 간단한 어플리케이션을 통해 Scala,(with sbt), Java(with Maven), and  Python에서 어떻게 작성하는지 서술해놓았다.


SimpleApp.scala라고 불리는 매우 간단한 Spark application을 만들었다.


/* SimpleApp.scala */ import org.apache.spark.sql.SparkSession object SimpleApp{ def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system val spark = SparkSession.builder.appName("Simple Application").getOrCreate() val logData = spark.read.textFile(logFIle).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println(s"Line with a: $numAs, Lines with b: &numBs") spark.stop() } }


어플리케이션은 scala.App을 확장하는 대신에 main() method를 정의해야되고, scala.App의 서브 클래스가 올바르게 작동하지 않을 수 있습니다.


위에 코드는 'a'를 포함하고 있는 라인의 횟수를 카운트하고 Spark README에서 'b'를 포함하고 있는 숫자를 카운트 합니다. YOUR_SPARK_HOME의 위치를 스파크가 설치된 곳으로 바꿔야 된다. SparkSession을 Spark shell에서 실행했던 예제와는 달리, 프로그램의 한 부분으로 SparkSession을 설치할수있다.


[[SparkSession]]을 제작하는것을 SparkSession.builder라 부른다. 그 후 어플리케이션 이름을 정하고, [[SparkSession]] instance를 얻는것을 getOrCreate라 부른다.


우리의 어플리케이션은 Spark API에 의존한다. Spark의 dependency의 설정이 포함된 sbt 구성파일인 build.sbt를 아래에 적어놓았다.


name := "Simple Project"


version := "1.0"


scalaVersion := "2.11.8"


libraryDependencies += "org.apache,spark" %% "spark-sql" % "2.2.0" 


sbt가 정상적으로 동작한다면, 일반적인 폴더 구성에 따라 SimpleApp.scala와 build.sbt를 레이아웃하는것이 필요하다. 이러한 구성들이 완성되면 어플리케이션 코드를 포함하고있는 JAR 패키지를 생성할 수 있고 우리의 프로그램에서 실행할 수 있는 spark-submit을 사용할 수 있다.



 # Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
   --class "SimpleApp" \
   --master local[4] \
   target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23





'옥탑방주인 > Spark' 카테고리의 다른 글

Spark 2.2.0 Programming Guide  (0) 2017.10.23
Spark 실행옵션  (0) 2017.08.18
[Spark] Spark Streaming - A QUICK Example 에서 에러  (0) 2017.07.20