
Scala Examples

Using Spark

Before doing anything, we need a working Spark context, i.e. an org.apache.spark.SparkContext.

Locally

The “local” context is well explained in the Getting Started Guide. Create it like this:

val jarFiles = List("target/scala-2.10/<your-big-jar>-assembly-1.0.jar")
val sparkContext = new SparkContext(
  "local",                // Yes, it's local
  "Simple Test App",      // The name that will be displayed in the monitoring web-app
  /* sparkHome = */ null, // Where to find spark
  jarFiles                // What to run
)

We can leave sparkHome = null when Spark is included in the Jar (see below for an SBT example); otherwise we need to pass the path to the Spark installation directory (like /hpc/users/<user>/usr/build/spark-0.9.1/).
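For instance, here is a minimal sketch of the same local context with an explicit sparkHome (the path below is just the example installation directory from above; adjust it to your setup):

val jarFiles = List("target/scala-2.10/<your-big-jar>-assembly-1.0.jar")
val sparkContext = new SparkContext(
  "local",
  "Simple Test App",
  /* sparkHome = */ "/hpc/users/<user>/usr/build/spark-0.9.1/", // explicit Spark installation
  jarFiles
)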

On Demeter

This time we use the class SparkConf to configure the run (cf. the possible key/value pairs):

val sparkMaster = "spark://demeter-mgmt1.demeter.hpc.mssm.edu:7077"
val applicationName = "Some Test"
val configuration = new SparkConf()
configuration.setMaster(sparkMaster)
configuration.setAppName(applicationName)
configuration.set("spark.executor.memory", "1g")
configuration.set("spark.cores.max", "4")
configuration.setJars(jarFiles)
val sparkContext = new SparkContext(configuration)

Computing Something

Once the context is well configured, this should work on Demeter (if you run locally, change the inputFile):

// We can give full HDFS URLs:
val inputFile = "hdfs://demeter-nn1.demeter.hpc.mssm.edu//user/ahujaa01/yangc07_sample1_filelist.txt"
val logData = sparkContext.textFile(inputFile, minSplits = 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
val result = "Lines with a: %s, Lines with b: %s".format(numAs, numBs)

println("[%s] Lines with '/': %s, Lines with b: %s".format(SparkContext.jarOfClass(this.getClass), numAs, numBs))

Check out some other examples.

Access HDFS

Spark gives access to Hadoop's Configuration (org.apache.hadoop.conf.Configuration), but we still need to provide the URI:

val fileSystem = org.apache.hadoop.fs.FileSystem.get(
  new java.net.URI("hdfs://demeter-nn1.demeter.hpc.mssm.edu"),
  sparkContext.hadoopConfiguration)

We get an org.apache.hadoop.fs.FileSystem, from which we can keep going with an org.apache.hadoop.fs.FSDataOutputStream:

val outputStream = fileSystem.create(
  new org.apache.hadoop.fs.Path("/user/<user>/result_from_simple_test.txt"),
  true /* overwrite */)
outputStream.writeBytes("Hello World!")
outputStream.close()
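To verify the write, here is a minimal sketch that reads the file back through the same FileSystem handle (open is the standard counterpart of create; the buffer size is arbitrary):

val inputStream = fileSystem.open(
  new org.apache.hadoop.fs.Path("/user/<user>/result_from_simple_test.txt"))
val buffer = new Array[Byte](1024)        // plenty for "Hello World!"
val bytesRead = inputStream.read(buffer)  // the file is small, one read is enough
println(new String(buffer, 0, bytesRead)) // prints: Hello World!
inputStream.close()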

Monitoring

As explained elsewhere.
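(For reference, a Spark standalone master typically serves a monitoring web UI on port 8080, here presumably http://demeter-mgmt1.demeter.hpc.mssm.edu:8080, and each running application exposes its own UI on port 4040 of the driver node; those are the stock defaults and may be configured differently on Demeter.)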

Killing Jobs

Killing an application should be easy:

./bin/spark-class org.apache.spark.deploy.Client kill spark://demeter-mgmt1.demeter.hpc.mssm.edu:7077  app-20140423113720-0026

but it seems broken.
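If the kill command does not work, a fallback (assuming you still control the driver process) is to shut the application down from the driver itself with the standard stop() call:

// Stops the context, releases the executors and unregisters the application
// from the master; after this the app disappears from the monitoring UI.
sparkContext.stop()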

Building a Heavy Jar With SBT

To deploy a fully loaded Jar file with SBT, one needs to use the sbt-assembly plugin, which requires two files on top of the main SBT config.

./<main-sbt-file.sbt>
./assembly.sbt
./project/plugins.sbt

The file ./project/plugins.sbt registers the plugin:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

The file assembly.sbt configures the plugin (the standard settings are enough):

import AssemblyKeys._ // put this at the top of the file

assemblySettings

// your assembly settings here

In the main SBT file, one needs to exclude duplicate dependencies from the package:

Seq(
  name := "SparklingTest", // Name used to generate the Jar filename
  version := "1.0",
  scalaVersion := "2.10.3",
  libraryDependencies ++= Seq(
    // Here is how we force the version of `hadoop-client`:
    "org.apache.hadoop" % "hadoop-client" % "2.2.0", 
    "org.apache.spark" %% "spark-core" % "0.9.1").map({ dep =>
      // There are conflicts while trying to assemble the big Jar,
      // this is how they are solved:
      // (cf. https://github.com/sbt/sbt-assembly)
      dep.exclude("org.mortbay.jetty", "servlet-api").
        exclude("commons-beanutils", "commons-beanutils-core").
        exclude("commons-collections", "commons-collections").
        exclude("commons-collections", "commons-collections").
        exclude("com.esotericsoftware.minlog", "minlog").
        exclude("asm", "asm").
        exclude("org.apache.hadoop", "hadoop-yarn-common")
    }),
  resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
  )
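
With those three files in place, the heavy Jar is built with sbt-assembly's standard task (the exact file name follows the name, scalaVersion and version settings above):

sbt assembly
# produces something like ./target/scala-2.10/SparklingTest-assembly-1.0.jar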
