AWS EMRにspark-jobserverを構築してREST経由でJob実行させる

1 Star2 Stars3 Stars4 Stars5 Stars (まだ評価されていません)
Loading...

Sparkを利用して、ジョブをパシパシ叩きたかったのですが、どうやってジョブ実行をしようかな、と悩むことになりました。

クラウド依存になりたくなかったので、できればKinesisストリームは使いたくないな。。と思いつつも下記のように調べてみました。

利用可否 Streamingか単発ジョブか アドホックな引数設定の可否 懸念
KinesisとSpark Streaming Streaming やや難しい。 GCPとかMS使いたくなったら書き直しが多い
spark-jobserver 単発ジョブ RestAPIのパラメータ設定なので簡単 今後も利用され続けるか疑問。
luigiとか使ってspark-submitを叩く 単発ジョブ ジョブ作りこめば簡単 Sparkのためだけのジョブサーバを構築・運用するかどうするか
Akka Stream 不可 implementされてない(2016/4/21時点) おそらくStreaming おそらく難しい。 いつ出るんだろ。。

どれもあまり乗り気になれませんでしたが、RestAPIで叩けるようにしておけば、開発関係者が気軽にバッチを叩いてMLを試したりできるかも?ということで、spark-jobserverを使ってみることにします。

構築について

sparkの利用できるEMRの起動後に、sshでmasterノードにログインします。

sshログインしたら、下記のMarkdownにしたがって、job-server.tar.gzを作成します。

m1.mediumのインスタンスを利用しましたがビルドは遅かったです。。
assemblyでjob-server.tar.gzを作成するだけなので、CI系サービスか、自前の環境で作成してartifactoryやs3等に保存してもいいかもしれません。(これだけのために毎度EMRを立ち上げる必要はなし。)

利用してみる

job-server.tar.gzを展開します。

mkdir /mnt/lib/spark-jobserver
cd /mnt/lib/spark-jobserver
tar zxf job-server.tar.gz

JAVA_OPTSのserver_start.shの中のjmx周りの設定をコメントアウトします。
これがあると動かないことがありました。

テスト用のアプリケーションを用意してみます。

build.sbt

name := "example-spark-jobs"
organization := "org.triplew.dfree"
version := "1.0-SNAPSHOT"
enablePlugins(JavaAppPackaging, sbtdocker.DockerPlugin)
scalaVersion := "2.10.6"
scalacOptions := Seq("-unchecked", "-deprecation", "-encoding", "utf8")
libraryDependencies ++= {
val sparkV      = "1.6.1"
Seq(
"org.apache.spark" % "spark-core_2.10" % sparkV,
"spark.jobserver" %% "job-server-api" % "0.6.1" % "provided",
"com.github.seratch" %% "awscala" % "0.5.+",
"com.github.levkhomich" %% "akka-tracing-core" % "0.4",
"commons-configuration" % "commons-configuration" % "1.10"
)
}
dependencyOverrides ++= Set(
"com.fasterxml.jackson.core" % "jackson-databind" % "2.4.4"
)
Revolver.settings
resolvers += "Job Server Bintray" at "https://dl.bintray.com/spark-jobserver/maven"
(以下割愛)

scalaのバージョンを2.11にすると、sparkジョブは動きませんでした。
これ、大丈夫だろうか、という不安を覚えますね。。

setMasterで指定しているmasterについては、spark-shellでいったん、sc.masterで確認しちゃいました。こういうのは外だししたいですね。。

package org.triplew.dfree.example.spark.jobs
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import spark.jobserver.{SparkJob, SparkJobInvalid, SparkJobValid, SparkJobValidation}
import scala.util.Try
object Hoge extends SparkJob {
override def runJob(sc:SparkContext, jobConfig: Config): Any = {
sc.parallelize(jobConfig.getString("input.string").split("-").toSeq).count()
}
override def validate(sc:SparkContext, config: Config): SparkJobValidation = {
Try(config.getString("input.string"))
.map(x => SparkJobValid)
.getOrElse(SparkJobInvalid("No input.string config param"))
}
def main(args: Array[String]): Unit = {
val appName: String = "dfree-example-spark-jobs"
val conf = new SparkConf().setAppName(appName).setMaster("local[*]")
val sc = new SparkContext(conf)
val config = ConfigFactory.parseString("")
val result = runJob(sc, config)
println(result)
}
}

このファイルを、sbt pacakgeでコンパイルして生成されたjarを、curlコマンドで、jobserverにputします。

また、jarに対しては名前を与えることができるようです。

curl --data-binary @target/scala-2.10/example-spark-jobs_2.10-1.0-SNAPSHOT.jar http://*.*.*.*:8090/jars/my-test

次にcontextを用意します。

curl -d "" 'http://*.*.*.*:8090/contexts/test?num-cpu-cores=1&memory-per-node=512m&spark.executor.instances=1

上記で作成したcontextとjarを指定して、sparkジョブを実行します。

curl -d "input.string = a-b-c" '*.*.*.*:8090/jobs?appName=my-test&classPath=org.triplew.dfree.example.spark.jobs.Hoge&context=test&sync=true'

そしてアウトプットとして以下を取得します。

{
"result": 3
}

動いてるようです。

所感

  • Restのパラメータに指定するsyncで、非同期リクエストも可能のようなので、重たいバッチや機械学習をasync実行させ、jobidのみを控えておいて、ステータス繰り上がりあとに利用する、みたいな感じでしょうか。
  • 全部そこらへんもScalaでやるならFutureなどを利用するのもいいかもしれません。
  • Akka Streaming早く使ってみたい。。

本日は以上となります。


1 Star2 Stars3 Stars4 Stars5 Stars (まだ評価されていません)
Loading...
      この投稿は審査処理中  | 元のサイトへ