Spark is a Big Data framework which allows you to run batch jobs, query data interactively, and process incoming information as it streams into your system.

Spark runs on top of normal Hadoop infrastructure – if you already have a Hadoop Distributed File System (HDFS) cluster set up, you can run it on top of that, and run your Spark jobs on it without modifying or disturbing anything you’re already doing. Like Hadoop, it can do batch processing, although it’s typically quite a bit faster than Hadoop due to aggressive caching of data in RAM. Spark has several additional tricks, though.

Hadoop workloads are typically centred around the concept of doing batch jobs on large amounts of data, and typically it’s not used for interactive querying. Spark, in contrast, has the ability to chainsaw through large amounts of data, store pared-down views of the data in structures called Resilient Distributed Datasets (RDDs), and do interactive queries with sub-second response times.

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are currently two types of RDDs: parallelized collections, which take an existing Scala collection and run functions on it in parallel, and Hadoop datasets, which run functions on each record of a file in HDFS or any other storage system supported by Hadoop. Both types of RDDs can be operated on through the same methods. An RDD is a queryable collection of information stored across multiple machines in your cluster. You can think of an RDD as a persisted Scala collection, ready for manipulation with Scala’s (awesome) collection functions.
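
As a rough sketch of the two kinds of RDD, assuming the Spark 0.9.1 dependency that is added to the build later in this article is on the classpath. The object name RddKinds and its helper methods are made up for illustration, and /etc/hosts simply stands in for any readable text file:

```scala
import org.apache.spark.SparkContext

// Hypothetical example object; only SparkContext and its methods come from Spark.
object RddKinds {
  // Parallelized collection: an ordinary Scala collection turned into an RDD.
  def countEvens(sc: SparkContext): Long =
    sc.parallelize(1 to 10).filter(_ % 2 == 0).count()

  // Hadoop dataset: one element per line of a file that Hadoop's file APIs
  // can read (a plain local path works as well).
  def countNonEmptyLines(sc: SparkContext, path: String): Long =
    sc.textFile(path).filter(_.trim.nonEmpty).count()

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "RDD Kinds")
    println("Even numbers: " + countEvens(sc))
    println("Non-empty lines: " + countNonEmptyLines(sc, "/etc/hosts"))
    sc.stop()
  }
}
```

Both helpers use the same filter and count calls, regardless of where the RDD came from.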

You can ingest data with Spark, clean it up, join up data from multiple sources, boil it down to the smallest data set you need, and save the resultant data set as a new RDD for speedy querying. Spark distributes the RDD across your HDFS cluster and takes care of all low-level details for you.
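
To make that pipeline a little more concrete, here is a minimal sketch that joins two small in-memory data sets and boils them down to one total per user. The data, the object name SpendPerUser, and the choice of integer cents are all assumptions made for the example; a real job would more likely load both sides with textFile:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // brings join and reduceByKey into scope for pair RDDs

// Hypothetical example object with made-up data.
object SpendPerUser {
  def totals(sc: SparkContext): Map[String, Int] = {
    // (userId, name) and (userId, amount in cents)
    val users = sc.parallelize(Seq((1, "alice"), (2, "bob"), (3, "carol")))
    val purchases = sc.parallelize(Seq((1, 999), (1, 2000), (3, 550), (4, 100)))

    val spend = users.join(purchases)                    // inner join on userId
      .map { case (_, (name, cents)) => (name, cents) }  // drop the id
      .reduceByKey(_ + _)                                // one total per user
      .cache()                                           // keep the small result in memory

    spend.collect().toMap
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "Spend Per User")
    println(totals(sc))
    sc.stop()
  }
}
```

Because the join is an inner join, user 2 (no purchases) and purchase owner 4 (no user record) both drop out of the result.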

Spark can also process very high volume streams of incoming information in real time. So, you could attach to the Twitter firehose, the Amazon product-purchase firehose, a data stream coming from multiple point of sale systems, or any other source of rapid-fire information. You can then process the stream as it arrives, saving results to the Spark cluster as a new or existing RDD.
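
A minimal streaming sketch, modelled on the network word count idea, might look like the following. It assumes you have also added the spark-streaming 0.9.1 artifact to your build (this article only adds spark-core), and that something is writing lines of text to a socket on localhost port 9999; the object name and the words helper are invented for the example:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-stream operations such as reduceByKey

// Hypothetical example object; host and port are placeholders.
object StreamingWordCount {
  // Split a line on whitespace and drop empty tokens.
  def words(line: String): Seq[String] =
    line.split("\\s+").filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    // "local[2]": one thread receives data while the other processes it.
    val ssc = new StreamingContext("local[2]", "Streaming Word Count", Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999)

    val counts = lines
      .flatMap(line => words(line))
      .map(word => (word, 1))
      .reduceByKey(_ + _) // word counts within each one-second batch

    counts.print()        // print the first few counts of every batch
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The transformations are the same flatMap, map, and reduceByKey calls you would use on a stored RDD; only the source of the data differs.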

All of these use cases (batch, query, and stream processing) can use exactly the same algorithms, data structures, and RDDs. You might do an initial run of batch processing on stored data, then switch to stream processing for new data as it arrives, and then do interactive querying of the RDDs you’ve created. Having only one stack for all three purposes greatly simplifies both the necessary skillsets and the infrastructure you need to set up.

Spark is very easy to get started with. The easiest way to start is by setting it up to run on a single machine. Spark works natively in both Java and Scala.

If you haven’t built an application in Scala before, it’s fairly easy to install the prerequisites: Java, sbt, Giter8, and Conscript. Let’s generate a new sbt project skeleton, so that we can try running a Spark job in it.

g8 ./basic-project

That will generate a very basic sbt project skeleton to work from. The g8 utility will ask you a few questions about your project. Answer them somewhat like this:

name [Basic Project]: Spark Simple App
organization [com.example]: com.orbifold
version [0.1.0-SNAPSHOT]: 1.0

As a test, build it. First run sbt. All the project dependencies will be downloaded, and you’ll end up at an sbt shell.

cd spark-simple-app
sbt

If you don’t yet have sbt installed, install it for your platform then re-run the command.

Compile the application and run it:

compile
run

It should build and run, with output that looks something like this:

$ sbt
[info] Loading project definition from ~/Desktop/spark-simple-app/project
[info] Updating {file:~/Desktop/spark-simple-app/project/}spark-simple-app-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Set current project to Spark Simple App (in build file:~/Desktop/spark-simple-app/)
> compile
[info] Updating {file:~/Desktop/spark-simple-app/}spark-simple-app...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Compiling 1 Scala source to ~/Desktop/spark-simple-app/target/scala-2.10/classes...
[success] Total time: 1 s, completed 22-May-2014 10:11:36
> run
[info] Running com.constructiveproof.sparksimpleapp.App
Hello com.constructiveproof.Spark Simple App!
[success] Total time: 0 s, completed 22-May-2014 10:11:38
>

Now it’s time to make some additions to the build file.

An sbt project usually contains a file called build.sbt. This defines all the project’s dependencies, where libraries are downloaded from, what version of Scala the project uses, and many other settings. Add the Spark 0.9.1 dependency and the Akka resolver to it. Both are necessary in order to get Spark downloaded onto your machine.

name := "Spark Simple App"

organization := "com.orbifold"

version := "1.0"

scalaVersion := "2.10.3"

crossScalaVersions := Seq("2.10.3", "2.11.0-M8")

libraryDependencies ++= Seq(
 "org.apache.spark" %% "spark-core" % "0.9.1", // add this
 "org.scalatest" %% "scalatest" % "2.1.RC1" % "test",
 "org.scalacheck" %% "scalacheck" % "1.11.3" % "test"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/" // and add this

initialCommands := "import com.orbifold.sparksimpleapp._"

The build file lists the jar files that your project depends on. This line adds the Spark jar:

"org.apache.spark" %% "spark-core" % "0.9.1", // add this

The Spark jar files depend on another library, called Akka. You add this resolver so that sbt knows where to download the Akka jars from:

resolvers += "Akka Repository" at "http://repo.akka.io/releases/" // and add this

In order to pick up the dependencies, you can either type exit at the sbt prompt and run sbt again, or just type reload at the sbt prompt.

Now let’s turn our attention to the Spark app itself. We’ll adapt the Simple App example from the Spark 0.9.1 documentation to get something running quickly. By default, the generated project template has placed an object called App in the file src/main/scala/App.scala. Delete that (or rename it to SimpleApp), and enter the following code in src/main/scala/SimpleApp.scala:

// src/main/scala/SimpleApp.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val textFile = "/etc/hosts" // Should be some file on your system
    val sc = new SparkContext("local", "Simple App")
    val logData = sc.textFile(textFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

This is a really simple Spark program. Let’s take it a few lines at a time to see what it does. First, we pointed our app at some example data. It’s a simple text file:

val textFile = "/etc/hosts" // Should be some file on your system

Assuming you’re on a *nix system, you’ll have a file at /etc/hosts, so that should work. If you’re on Windows, pick any text file you’ve got kicking around. If you want to do some speed testing, I’d suggest that you download one of the many publicly available large datasets on the internet. My system rips through the 1.8GB subset of the Million Song Dataset in about 2 seconds.

Next, we set up what’s called a SparkContext:

val sc = new SparkContext("local", "Simple App")

This tells our SimpleApp where Spark lives, and the name of our Spark job. The name serves as a job identifier, and shows up in the Spark web interface if you’re running a cluster. Right now, since we’re saying our Spark cluster is local, there’s no need to have a cluster set up.
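
The first constructor argument is the master URL. As a small sketch of how you might switch between local mode and a real cluster without editing the code, the hypothetical helper below reads the master from the command line. Values such as "local[2]" (two local worker threads) or "spark://master-host:7077" (a standalone cluster, where master-host is a placeholder host name) are the kind of thing you would pass in:

```scala
import org.apache.spark.SparkContext

// Hypothetical example object; masterFor is not part of Spark.
object MasterChoice {
  // Use the first command-line argument as the master URL, falling back to
  // two local worker threads when none is given.
  def masterFor(args: Array[String]): String =
    if (args.nonEmpty) args(0) else "local[2]"

  def main(args: Array[String]): Unit = {
    val master = masterFor(args)
    val sc = new SparkContext(master, "Simple App")
    println("Running against master: " + master)
    sc.stop()
  }
}
```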

val logData = sc.textFile(textFile, 2).cache()

This uses the SparkContext sc to create an RDD from the file, split into at least 2 partitions. We tell it to cache() the result, although in the case of /etc/hosts, this is not going to make a difference. If we were ingesting a huge file on a cluster, though, Spark would read the data the first time we ran a computation on it and keep it in memory across the machines in our cluster, so you’d see a major performance boost the next time you used this data source.
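
One way to see the effect of cache() for yourself is to time the same action twice. The sketch below is only an illustration: the timed helper and the CacheDemo name are made up, and on a tiny file the two timings will mostly reflect startup overhead, so point it at something large if you want a visible difference:

```scala
import org.apache.spark.SparkContext

// Hypothetical example object for comparing a cold and a cached run.
object CacheDemo {
  // Runs a block and returns its result together with the elapsed milliseconds.
  def timed[A](block: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = block
    (result, (System.nanoTime() - start) / 1000000L)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "Cache Demo")
    // cache() only marks the RDD for caching; nothing is read yet.
    val logData = sc.textFile("/etc/hosts", 2).cache()

    val (first, firstMs) = timed(logData.count())   // reads the file and fills the cache
    val (second, secondMs) = timed(logData.count()) // can be served from memory

    println("First count: %s in %s ms".format(first, firstMs))
    println("Second count: %s in %s ms".format(second, secondMs))
    sc.stop()
  }
}
```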

val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

Now this is truly interesting, at least to me. Our logData value is an RDD[String] with one element per line of the file. We can filter and count those lines using the methods Spark provides on RDDs. RDDs have most of the methods of the regular Scala collections, but they’re modified to operate on data spread across your Spark cluster. When you use them for querying data, it feels very much as though you’re working with regular Scala collections (e.g. Lists, Maps, etc). In fact, you’re using a special set of collections which are transparently running map-reduce algorithms, locally or across a multi-machine cluster, with Akka handling the communication between nodes. That’s basically what Spark is.
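
For comparison, here is roughly the same computation written against an ordinary in-memory Scala Seq, with no Spark involved. The sample lines and the object name are made up for the example:

```scala
// Plain Scala collections only; nothing here depends on Spark.
object PlainCollections {
  // Count the lines that contain the given substring.
  def countContaining(lines: Seq[String], needle: String): Int =
    lines.filter(line => line.contains(needle)).size

  def main(args: Array[String]): Unit = {
    // Made-up sample data standing in for the contents of a hosts file.
    val lines = Seq(
      "127.0.0.1 localhost",
      "::1 ip6-localhost",
      "255.255.255.255 broadcasthost"
    )
    val numAs = countContaining(lines, "a")
    val numBs = countContaining(lines, "b")
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
```

The filter call takes the same function literal as in SimpleApp. The practical difference is that a Seq lives in one JVM's memory, while an RDD can be partitioned across many machines.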

That’s pretty much it. You can compile, package, and run your job with this command in the sbt console:

~; compile; package; run

That tilde at the front of the command will ensure that your code is recompiled, re-packaged, and re-run every time you modify and save the SimpleApp.scala file:

> ~; compile; package; run
[info] Compiling 1 Scala source to /code-examples/spark-simple-app/target/scala-2.10/classes...
[success] Total time: 1 s, completed 22-May-2014 10:37:44
[info] Packaging /orbifold/code-examples/spark-simple-app/target/scala-2.10/spark-simple-app_2.10-1.0.jar ...
[info] Done packaging.
[success] Total time: 0 s, completed 22-May-2014 10:37:44
[info] Running SimpleApp
<snip>
Lines with a: 7, Lines with b: 4
14/05/22 10:37:46 INFO network.ConnectionManager: Selector thread was interrupted!
[success] Total time: 2 s, completed 22-May-2014 10:37:46
1. Waiting for source changes... (press enter to interrupt)

As you can see, Spark is remarkably easy to set up and play with in local mode.

Running locally is a great way to try Spark. There’s not much infrastructure to set up, and you can easily get a feel for what it’s like to load up your data and use Spark’s collections to manipulate it. In real life, you’ll want to run your jobs on a cluster.