Scala Examples
Using Spark
Before doing anything, we need to get a working Spark context, i.e. an org.apache.spark.SparkContext.
Locally
The “local” context is well explained in the Getting Started Guide. Create it like this:
```scala
val jarFiles = List("target/scala-2.10/<your-big-jar>-assembly-1.0.jar")
val sparkContext = new SparkContext(
  "local",              // Yes, it's local
  "Simple Test App",    // The name that will be displayed in the monitoring web-app
  /* sparkHome = */ null, // Where to find spark
  jarFiles              // What to run
)
```
We can leave sparkHome = null when Spark is included in the Jar (see below for an SBT example); otherwise, we need a path to the installation directory (like /hpc/users/<user>/usr/build/spark-0.9.1/).
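For completeness, here is a sketch of the same constructor with sparkHome pointing at an installation directory instead of null (the path is the example installation directory mentioned above; adjust it to your own build):

```scala
// Same constructor, but with Spark looked up on disk instead of inside the Jar.
// The path below is just the example installation directory from above.
val sparkContextWithHome = new SparkContext(
  "local",
  "Simple Test App",
  /* sparkHome = */ "/hpc/users/<user>/usr/build/spark-0.9.1/",
  jarFiles)
```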
On Demeter
This time we use the class SparkConf to configure the run (cf. some of the possible key/value pairs):
```scala
val sparkMaster = "spark://demeter-mgmt1.demeter.hpc.mssm.edu:7077"
val applicationName = "Some Test"
val configuration = new SparkConf()
configuration.setMaster(sparkMaster)
configuration.setAppName(applicationName)
configuration.set("spark.executor.memory", "1g")
configuration.set("spark.cores.max", "4")
configuration.setJars(jarFiles)
val sparkContext = new SparkContext(configuration)
```
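Since every setter returns the SparkConf itself, the same configuration can also be written in the chained style (an equivalent sketch, not a different setup):

```scala
// Equivalent, chained version of the configuration above:
val chainedConfiguration = new SparkConf()
  .setMaster(sparkMaster)
  .setAppName(applicationName)
  .set("spark.executor.memory", "1g")
  .set("spark.cores.max", "4")
  .setJars(jarFiles)
```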
Computing Something
Once the context is well configured, this should work on Demeter (if you run locally, change the inputFile):
```scala
// We can give full HDFS URLs:
val inputFile = "hdfs://demeter-nn1.demeter.hpc.mssm.edu//user/ahujaa01/yangc07_sample1_filelist.txt"
val logData = sparkContext.textFile(inputFile, minSplits = 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
val result = "Lines with a: %s, Lines with b: %s".format(numAs, numBs)
println("[%s] %s".format(SparkContext.jarOfClass(this.getClass), result))
```
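For a local run, a minimal sketch would point inputFile at any plain-text file on the local filesystem (the path below is hypothetical, reusing the example installation directory):

```scala
// Hypothetical local input; any readable text file will do:
val localInputFile = "/hpc/users/<user>/usr/build/spark-0.9.1/README.md"
val localLogData = sparkContext.textFile(localInputFile, minSplits = 2).cache()
```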
Check out some other examples.
Access HDFS
Spark gives access to Hadoop's Configuration (org.apache.hadoop.conf.Configuration), but we still need to provide the URI:
```scala
val fileSystem = org.apache.hadoop.fs.FileSystem.get(
  new java.net.URI("hdfs://demeter-nn1.demeter.hpc.mssm.edu"),
  sparkContext.hadoopConfiguration)
```
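As a quick sanity check (a sketch on our part, not part of the original example), the handle can be asked for the home directory before we write anything:

```scala
// If this prints an hdfs:// path, the FileSystem handle is wired up correctly:
println("Home on HDFS: %s".format(fileSystem.getHomeDirectory()))
```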
We get an org.apache.hadoop.fs.FileSystem, from which we can keep going with an org.apache.hadoop.fs.FSDataOutputStream:
```scala
val outputStream = fileSystem.create(
  new org.apache.hadoop.fs.Path("/user/<user>/result_from_simple_test.txt"),
  true /* overwrite */)
outputStream.writeBytes("Hello World!")
outputStream.close()
```
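To double-check the write, a minimal sketch (same path as above) can read the file back through an org.apache.hadoop.fs.FSDataInputStream:

```scala
// Read the file back to verify the write went through:
val inputStream = fileSystem.open(
  new org.apache.hadoop.fs.Path("/user/<user>/result_from_simple_test.txt"))
val firstLine = new java.io.BufferedReader(
  new java.io.InputStreamReader(inputStream)).readLine()
println("Read back: %s".format(firstLine)) // should print "Hello World!"
inputStream.close()
```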
Monitoring
As explained elsewhere:
- one can monitor the job here: http://demeter-mgmt1.demeter.hpc.mssm.edu:18080/ (with the proxy, Demeter credentials),
- the file system is browsable on Hue (without the proxy, 2-factor auth).
Killing Jobs
Killing a job should be as easy as:

```
./bin/spark-class org.apache.spark.deploy.Client kill spark://demeter-mgmt1.demeter.hpc.mssm.edu:7077 app-20140423113720-0026
```

but it seems broken.
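As a fallback (an assumption on our side, not a documented workaround), the driver program can always shut its own application down cleanly:

```scala
// Stopping the context from the driver releases the application's executors and cores:
sparkContext.stop()
```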
Building a Heavy Jar With SBT
To deploy a fully loaded Jar file with SBT, one needs to use the sbt-assembly plugin, which requires two files on top of the main SBT config:
```
./<main-sbt-file.sbt>
./assembly.sbt
./project/plugins.sbt
```
The file ./project/plugins.sbt registers the plugin:

```scala
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
```
The file assembly.sbt configures the plugin (the standard setup):

```scala
import AssemblyKeys._ // put this at the top of the file

assemblySettings

// your assembly settings here
```
In the main SBT file, one needs to exclude duplicate dependencies from the package:
```scala
Seq(
  name := "SparklingTest", // Name used to generate the Jar filename
  version := "1.0",
  scalaVersion := "2.10.3",
  libraryDependencies ++= Seq(
    // Here is how we force the version of `hadoop-client`:
    "org.apache.hadoop" % "hadoop-client" % "2.2.0",
    "org.apache.spark" %% "spark-core" % "0.9.1"
  ).map({ dep =>
    // There are conflicts while trying to assemble the big Jar;
    // this is how they are solved (cf. https://github.com/sbt/sbt-assembly):
    dep.exclude("org.mortbay.jetty", "servlet-api").
      exclude("commons-beanutils", "commons-beanutils-core").
      exclude("commons-collections", "commons-collections").
      exclude("com.esotericsoftware.minlog", "minlog").
      exclude("asm", "asm").
      exclude("org.apache.hadoop", "hadoop-yarn-common")
  }),
  resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
)
```
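With the three files in place, building the heavy Jar should come down to a single command (assuming the standard sbt-assembly workflow, in which the plugin adds an `assembly` task):

```
sbt assembly
```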