Running Apache Spark jobs from applications

19/8/2015

Big data problems are often very easy to solve using Apache Spark: You create a Spark context and call textFile() to get your input. After that, everything mostly looks like regular, concise Scala code: you map and filter as you please, before eventually collecting your result.
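
To make the rest of the post concrete, here is a minimal sketch of what such a job might look like. It is not taken from the actual project; the input path and the transformations are placeholders, but the object name matches the Importer main class that we submit later.

import org.apache.spark.{ SparkConf, SparkContext }

object Importer {
  def main(args: Array[String]): Unit = {
    // The master is not set here; spark-submit (and thus SparkLauncher) provides it.
    val sc = new SparkContext(new SparkConf().setAppName("My Spark Job"))
    try {
      val lines = sc.textFile("hdfs:///data/input") // placeholder input path
      val result = lines
        .filter(_.nonEmpty)
        .map(_.toUpperCase)
        .collect()
      result.take(10).foreach(println)
    } finally {
      sc.stop()
    }
  }
}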

As easy as it is to write Spark jobs, it is surprisingly cumbersome to call them from application code, mostly because of the way Spark is designed.

So, running Spark jobs from your application sucks, and there is nothing you can do to make it better. We can, however, make it work, duct tape style. To do so, we need to overcome a few challenges.

Challenge 1: Publishing your fat JAR

Okay, this one is actually easy and well-documented, but not recommended. ("[...] because non-modular JARs cause much sadness." I'm still giggling.)

Configure the sbt-assembly plugin in project/assembly.sbt:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

Then add this to your build.sbt in your Spark job project:

// Publish the assembly (fat) JAR as an additional artifact with the "assembly" classifier.
artifact in (Compile, assembly) := {
  val art = (artifact in (Compile, assembly)).value
  art.copy(`classifier` = Some("assembly"))
}

addArtifact(artifact in (Compile, assembly), assembly)

Now when you run sbt publish or sbt publishLocal, your fat JAR is published as well, as an additional artifact with the "assembly" classifier. Success.

Challenge 2: Loading your fat JAR from Ivy/Maven, but keeping it off the classpath

Now this is harder. Having just published the fat JAR, it would be nice to pull it from your Ivy or Maven repository into the application that you want to run the Spark job from.

Here is the tricky part: the fat JAR comes with a ton of stuff that you do not want on the classpath. For a start, it is compiled against a different Scala version. Also, it may contain a lot of dependencies that your application also uses – but in the Scala 2.11 variant, or in a different version. Hence you cannot have the fat JAR on the classpath.

If you think about it, the JAR is more like a resource. You never call it from code, you just need to have the file lying around. We can thus use some messy SBT to turn it into a resource.

val AsResource = config("asResource")
ivyConfigurations += AsResource

// Depend on the fat JAR in the custom configuration only, so it never ends up on the compile classpath.
libraryDependencies ++= Seq(
  "my.org" % "my-job_2.10" % "0.1.0" % AsResource classifier "assembly" notTransitive()
)

val sparkJobs = Seq("my-job")
resources in Compile ++= update.value.select(configurationFilter(AsResource.name)).map { f =>
  val finalName = sparkJobs.find(f.name.startsWith).getOrElse(f.name)
  val target = (resourceManaged in Compile).value / s"$finalName.jar"
  IO.copyFile(f, target)
  target
}

We first create a custom configuration called "asResource", and then add a dependency on our fat JAR in that configuration, so it is not added to the classpath. Then we let SBT find the file in the update report of that custom configuration and copy it to the managed resources (which is where generated files go). While we are at it, we also map the file name to a known value.

As a result, the file is added to the root of the application resources with a fixed name (in this example, "my-job.jar").
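
If you want to convince yourself that the packaging works, you can look the JAR up as a classpath resource at runtime, for example from a test. This is just a sanity-check sketch; the resource name is the fixed name chosen above.

// Should print the location of the packaged fat JAR, e.g. inside the application's own JAR.
val jarUrl = Option(getClass.getClassLoader.getResource("my-job.jar"))
jarUrl match {
  case Some(url) => println(s"Found Spark job JAR at $url")
  case None      => sys.error("my-job.jar is missing from the classpath resources")
}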

Challenge 3: Getting Spark to run the job

The weapon of choice for running Spark jobs from code is SparkLauncher, which provides a builder for you to add all the required settings, and then simply shells out to spark-submit.

I've created two classes to help run the job:

import java.net.URI

import org.apache.commons.io.IOUtils
import org.apache.spark.launcher.SparkLauncher

import scala.collection.JavaConverters._
import scala.concurrent.{ ExecutionContext, Future }
import scala.io.Source
import scala.reflect.io.File

class SparkImportJob(process: Process)(implicit executionContext: ExecutionContext) {

  def exitCode: Future[Int] = Future {
    process.waitFor()
  }

  def stderrIterator: Iterator[String] = {
    Source.fromInputStream(process.getErrorStream).getLines()
  }

  def stdoutIterator: Iterator[String] = {
    Source.fromInputStream(process.getInputStream).getLines()
  }

}

class SparkImportService(sparkHome: String, hadoopConfDir: String, yarnConfDir: String, 
    someCustomSetting: String)(implicit executionContext: ExecutionContext) {

  val jarLocation = "my-job.jar"

  def runSparkJob(): SparkImportJob = {
    val tempFileUri = copyJarToTempFile()

    val env = Map(
      "HADOOP_CONF_DIR" -> hadoopConfDir,
      "YARN_CONF_DIR" -> yarnConfDir,
      "some-custom-setting" -> someCustomSetting
    )

    val process = new SparkLauncher(env.asJava)
      .setSparkHome(sparkHome)
      .setAppResource(tempFileUri.toString)
      .setAppName("My Spark Job")
      .setMainClass("Importer") //Main class in fat JAR
      .setMaster("yarn-client")
      .setConf("spark.driver.memory", "2g")
      .setConf("spark.yarn.queue", "root.imports")
      .setConf("spark.yarn.am.memory", "1g") //This does not actually work.
      .setConf("spark.driver.memory", "1g") //Neither does this.
      .setConf("spark.akka.frameSize", "200")
      .setConf("spark.executor.memory", "8g")
      .setConf("spark.executor.instances", "8")
      .setConf("spark.executor.cores", "12")
      .setConf("spark.default.parallelism", "5000")
      .setVerbose(true)
      .launch()
    new SparkImportJob(process)
  }

  private def copyJarToTempFile(): URI = {
    val tempFile = File.makeTemp("my-job", ".jar")
    val out = tempFile.bufferedOutput()
    val in = getClass.getClassLoader.getResourceAsStream(jarLocation)
    try {
      // Copy the fat JAR from the application resources to a temp file on disk,
      // because spark-submit needs an actual file to submit.
      IOUtils.copy(in, out)
      tempFile.jfile.toURI
    } finally {
      IOUtils.closeQuietly(in)
      IOUtils.closeQuietly(out)
    }
  }

}

This looks mostly straightforward, but there are some subtleties to it.

We finally call the whole thing from a Play controller:

import play.api.mvc.{ Action, Controller }

import scala.concurrent.{ Future, ExecutionContext }

class ImportController(sparkImportService: SparkImportService)(implicit executionContext: ExecutionContext) extends Controller {

  def runImport() = Action.async {
    val importJob = sparkImportService.runSparkJob()

    Future {
      importJob.stderrIterator.foreach { line =>
        println(line)
      }
    }

    Future {
      importJob.stdoutIterator.foreach { line =>
        println(line)
      }
    }

    importJob.exitCode.map {
      case 0 => Ok("Import done, exit code 0.")
      case exitCode => InternalServerError(s"Error, process ended with exit code $exitCode.")
    }
  }

}

When instantiating the controller, we pass in the same ExecutionContext as for the SparkImportService. On it, the controller runs two extra tasks whose only job is to copy the stderr and stdout of the Spark job to our own stdout, which gives us the chance to actually see the log output of the job.
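
For completeness, here is a sketch of how the pieces might be wired together by hand. The thread pool size and all configuration values are placeholders; in a real Play application you would plug this into whatever dependency injection mechanism you already use.

import java.util.concurrent.Executors

import scala.concurrent.ExecutionContext

// Each running job blocks three threads: one waiting for the exit code and two pumping stdout/stderr.
implicit val importContext: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

val importService = new SparkImportService(
  sparkHome = "/opt/spark",           // placeholder
  hadoopConfDir = "/etc/hadoop/conf", // placeholder
  yarnConfDir = "/etc/hadoop/conf",   // placeholder
  someCustomSetting = "placeholder"
)

val importController = new ImportController(importService)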

Summary

The good:

The bad:

Overall, I think this solution stinks, but it works.
