Apache Spark 2.3: writing a simple data processing application with the Dataset/DataFrame API

in #utopian-io, 6 years ago (edited)

What Will I Learn?

Apache Spark is an open-source framework for distributed data processing. It provides a relatively simple and easy-to-use API for efficient distributed computation. You will learn how to develop a simple project that uses Apache Spark's functionality.

  • Basic principles of Spark's API design
  • Setting up an sbt/Scala/Spark environment
  • Writing a simple Spark application

Requirements

This tutorial requires the reader to be proficient in programming, preferably in Scala, and to have a general understanding of the idea of distributed computation.

  • Understanding of programming principles and concepts
  • General understanding of the idea of distributed computation
  • Preferably some Scala experience

Difficulty

  • Advanced

Tutorial Contents

In this tutorial we will explain the general principles of Spark's design and gain some practical Spark programming experience by creating a simple project that counts the words in a text file with the Dataset and DataFrame APIs.

Your questions and suggestions are welcome in the comment section, as always. The examples were tested on macOS, but should work without changes on Linux as well. Windows users are advised to use a virtual machine to emulate a Linux environment.

Core Spark concepts

Spark's architecture revolves around representing distributed data as a single complex immutable datatype inside the application. There are 3 different kinds of such datatypes:

  • Resilient Distributed Dataset (RDD) - the standard datatype in Spark 1.x. It is still fully supported in Spark 2.x, including the latest release, Spark 2.3.
  • Dataset - the recommended datatype for Spark 2.x. It is as flexible as RDD, but usually faster because its operations go through Spark SQL's query optimizer.
  • DataFrame - a Dataset whose data is organized in named columns, similar to a table in a relational database. In Scala, DataFrame is an alias for Dataset[Row].
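To make the difference between the three datatypes concrete, here is a minimal sketch that builds each of them from the same data. The sample words, the object name and the local[2] master are assumptions made for illustration; they are not part of the tutorial project.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object DatatypesSketch {
  def main(args: Array[String]): Unit = {
    // Local session for experimenting; app name and master are arbitrary.
    val spark = SparkSession.builder()
      .appName("DatatypesSketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    val words = Seq("spark", "scala", "spark") // hypothetical sample data

    // RDD: the low-level distributed collection.
    val rdd: RDD[String] = spark.sparkContext.parallelize(words)

    // Dataset: a typed distributed collection; for String the single column is named "value".
    val ds: Dataset[String] = words.toDS()

    // DataFrame: the same data with an explicitly named column.
    val df: DataFrame = ds.toDF("word")

    println(rdd.count())
    ds.printSchema()
    df.printSchema()
    spark.stop()
  }
}
```

The default column name value of a Dataset[String] is the reason the word count code later in this tutorial groups on $"value".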

Environment setup

Spark programs can be written in Scala, Java, Python and R. We will focus on Scala in this tutorial, because in my experience it is the most common language for working with Spark, and because Spark itself is written in Scala.

Ensure you have a Java runtime on your machine by running java -version. If needed, navigate to the Java download web page and follow the instructions to install Java. Spark 2.3 targets Java 8, and newer Java releases may not work with it, so prefer Java 8 over the latest version.

To install Scala, go to the Scala web site and choose the Download SBT option. SBT (often expanded as Scala Build Tool) handles everything related to project configuration and dependency management for us, including running the correct version of Scala. We will show how to use SBT for a Scala/Spark project shortly.

Next we need to download Apache Spark. Head to the Apache Spark download web page and choose Spark release 2.3.0, package type "Pre-built for Apache Hadoop 2.7 and later". Save the resulting archive and unpack it to a suitable location on your system.

App Structure

Spark applications look a bit different from normal Scala applications. Normally, the entry point of a Scala application is an object that inherits from App, but a Spark application is usually just an object without the App parent that defines a main method. main accepts an array of strings: the command line parameters passed when the application starts.

So an empty Spark application template looks like this:

object Tutorial {
    def main(args: Array[String]): Unit = {
        //... main application code goes here
    }
}
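For contrast, here is a small sketch of both entry point styles side by side. The object names and the greeting helper are hypothetical. Spark's own quick start guide advises defining main() rather than extending scala.App, because subclasses of App may not work correctly with Spark.

```scala
// Plain Scala style: the body of the object is the program.
object HelloApp extends App {
  println("Hello from App")
}

// Style used for Spark applications: an explicit main method.
object HelloMain {
  // Hypothetical helper so the message can be checked without capturing stdout.
  def greeting(args: Array[String]): String =
    s"Hello from main, got ${args.length} argument(s)"

  def main(args: Array[String]): Unit =
    println(greeting(args))
}
```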

Spark Session

In this tutorial we will focus on the Spark Dataset API, as it is a newer and more convenient way of writing Spark applications than the RDD API.

With the Dataset API, the application context is managed in a SparkSession. A SparkSession represents the set of configuration parameters for the connection between the Spark application and the Spark computing cluster.

A SparkSession is created with the SparkSession builder:

import org.apache.spark.sql.SparkSession // import SparkSession

object Tutorial {

    def main(args: Array[String]): Unit = {
        val spark = SparkSession
            .builder()
            .appName("LaxamSparkTutorial")
            .master("local[4]")
            .getOrCreate()
        import spark.implicits._
        //... main application code goes here
    }
}

This Spark session has its application name set to LaxamSparkTutorial, and the master URL is configured to be local[4], which is Spark's way of configuring execution on the local machine with 4 worker threads (typically one per core).

import spark.implicits._ uses the created SparkSession to import a number of syntactic helpers, such as the $"column" syntax and the encoders needed by map and flatMap. Normally you will do this for each Spark session, even in the most trivial cases.
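As a rough illustration of what the implicits import provides, the sketch below uses three of its helpers. The sample data, the object name and the local[2] master are assumptions made for this example only.

```scala
import org.apache.spark.sql.SparkSession

object ImplicitsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ImplicitsSketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // toDS() on a local Seq comes from spark.implicits._
    val ds = Seq("one", "two", "three").toDS()

    // The $"..." column syntax also comes from spark.implicits._
    val notOne = ds.filter($"value" =!= "one")

    // map needs an implicit Encoder[Int], again supplied by spark.implicits._
    val lengths = ds.map(_.length)

    notOne.show()
    lengths.show()
    spark.stop()
  }
}
```

Without the import, each of the three marked lines would fail to compile, which is why the import appears in practically every Spark program.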

Setting up sbt project

The "Word Count" program is to the data processing world what "Hello World" is to classical programming. We will keep up this tradition and write a simple program that counts the words in a file. As input we will use my latest Steemit post, "Bitcoin: how it supposed to be, and what it turned out to be", but you can use any text you like.

Before extending our previous example with the word count code, we need to organise our code into a proper Scala project structure. Create a directory for the project and navigate into it.

$ mkdir laxam_spark_tutorial && cd laxam_spark_tutorial

Copy the text file you want to analyze into this directory. In my case it's called laxam_bitcoin_post.md.

Now create a file named build.sbt and open it in your favorite text editor. This is the configuration file for sbt. For our project we just need to set four values:

  • name - human readable project name
  • version - current version of the project
  • scalaVersion - version of Scala language to use
  • libraryDependencies - external libraries to load

Here is how build.sbt will look for our project:

name := "Laxam Spark Tutorial"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0"

Note that older sbt releases (before 0.13.7) required a blank line after each setting in build.sbt. Newer releases do not, but keeping the blank lines is harmless.
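As an optional variant, and only a sketch, the Spark dependency can be marked as "provided", because spark-submit supplies the Spark classes at run time. This matters mostly if you later build a fat jar with a plugin such as sbt-assembly; plain sbt package does not bundle dependencies either way. Be aware that with "provided" the application can no longer be started with sbt run, since the Spark classes are then missing from the run classpath.

```scala
// build.sbt (variant): Spark is supplied by spark-submit at run time,
// so it is marked "provided" and kept out of any packaged fat jar.
name := "Laxam Spark Tutorial"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0" % "provided"
```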

Now you need to create the subfolder src/main/scala, which is where sbt looks for Scala sources and where we will put the code for our app:

$ mkdir -p src/main/scala

Now create and open the file src/main/scala/Tutorial.scala: this will be the location for our main program.

WordCount with spark

The first thing we add to our Spark application is, of course, reading the data from a file:

val inFile = "<path_to_project_directory>/laxam_bitcoin_post.md"
val txt = spark.read.textFile(inFile).cache()

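This line creates a Dataset of strings with one element per line of the file. Spark evaluates lazily, so the file contents are not actually read until an action (such as show) runs. At that point, thanks to the cache method, the data is persisted in memory, spilling to disk if it does not fit.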
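The interplay between cache and lazy evaluation can be observed with a small sketch. It uses a hypothetical in-memory Dataset instead of the text file, and the object name and local[2] master are likewise assumptions.

```scala
import org.apache.spark.sql.SparkSession

object CacheSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CacheSketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical in-memory stand-in for the lines of the text file.
    val lines = Seq("first line", "", "second line").toDS()

    // cache() only marks the Dataset for caching; nothing is computed yet.
    val cached = lines.cache()
    println(cached.storageLevel) // the storage level chosen by cache()

    // count() is an action: it triggers the computation and fills the cache.
    println(cached.count())

    // Release the cached data once it is no longer needed.
    cached.unpersist()
    spark.stop()
  }
}
```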

Now it's time to perform a series of transformations on our data. The Dataset API provides a multitude of methods to choose from, among them the classical map, flatMap and filter, which we are going to use.
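These three methods behave much like their counterparts on ordinary Scala collections, so their effect can be tried out without Spark. A minimal sketch, with hypothetical sample lines standing in for the text file:

```scala
object CollectionOps {
  // Hypothetical sample input standing in for the lines of a text file.
  val lines: Seq[String] = Seq("Spark is fast", "", "Scala is fun")

  // flatMap: each line becomes an array of words, flattened into one sequence.
  // Note that an empty line yields a single empty string.
  val words: Seq[String] = lines.flatMap(_.split(" "))

  // filter: keep only the elements that satisfy the predicate.
  val nonEmpty: Seq[String] = words.filter(_ != "")

  // map: transform every element one-to-one.
  val lower: Seq[String] = nonEmpty.map(_.toLowerCase)

  def main(args: Array[String]): Unit =
    println(lower.mkString(", "))
}
```

The main difference in Spark is that the same calls describe work to be distributed over the cluster instead of running immediately on a local collection.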

val txt = spark.read.textFile(inFile).cache()
val df = txt
    .flatMap(_.split(" ")) // split lines into words
    .filter(_ != "") // remove empty strings (they don't count as words)
    .map(_.toLowerCase()) // convert all words to lowercase
    .toDF() // convert to DataFrame
    /* Conversion to DataFrame is needed because
    it is much more convenient to use the DataFrame API
    for operations that have to do with grouping and
    aggregation. The count function used below comes from
    org.apache.spark.sql.functions._
    Let's continue...  */
    .groupBy($"value") // group data on values
    .agg(count("*") as "count") // aggregate count
    .orderBy($"count".desc) // sort in descending order
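The DataFrame detour is a convenience, not a requirement. As a sketch of an alternative, the grouping can also stay in the typed Dataset API with groupByKey, the approach used in Spark's quick start guide. The sample lines, the object name and the column names word and num are assumptions made for this example.

```scala
import org.apache.spark.sql.SparkSession

object TypedWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TypedWordCount")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample lines instead of the text file.
    val txt = Seq("Spark and Scala", "spark again").toDS()

    val counts = txt
      .flatMap(_.split(" "))
      .filter(_ != "")
      .map(_.toLowerCase)
      .groupByKey(identity)  // typed grouping: the key is the word itself
      .count()               // Dataset[(String, Long)]
      .toDF("word", "num")   // name the two columns explicitly
      .orderBy($"num".desc)  // most frequent words first

    counts.show()
    spark.stop()
  }
}
```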

That's the whole operation. It performs the calculation through a series of interconnected operations on immutable datasets, which allows Spark to decide how to split the work between execution nodes. What's left is just to ask Spark to display the results for us and to stop the application. The complete Tutorial.scala will look like this:

/* Tutorial.scala */
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._ // provides count

object Tutorial {
  def main(args: Array[String]): Unit = {
    // Relative path: resolved against the directory spark-submit is run from
    val inFile = "./laxam_bitcoin_post.md"
    val spark = SparkSession.builder.appName("WordCount").getOrCreate()
    import spark.implicits._

    val txt = spark.read.textFile(inFile).cache()
    val df = txt
      .flatMap(_.split(" "))    // split lines into words
      .filter(_ != "")          // remove empty strings
      .map(_.toLowerCase())     // convert all words to lowercase
      .toDF()
      .groupBy($"value")
      .agg(count("*") as "num") // number of occurrences per word
      .orderBy($"num".desc)     // most frequent words first
    df.show()
    spark.stop()
  }
}

To build this application we need to run a single command:

$ sbt package
< ... job logs omitted ... >
[info] Done packaging.
[success] Total time: 6 s, completed Mar 4, 2018 9:38:48 PM

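The resulting jar file is generated in target/scala-2.11/laxam-spark-tutorial_2.11-1.0.jar (note that the file name is derived from the project name in your build.sbt, so it might be different for you). Now we can use this jar to execute our program in Spark. Because the program opens ./laxam_bitcoin_post.md relative to the current working directory, either copy the text file into the Spark folder or replace the path in the code with an absolute one. Then navigate to the Spark folder and execute the spark-submit binary like this: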

./bin/spark-submit --class "Tutorial" ../laxam_spark_tutorial/target/scala-2.11/laxam-spark-tutorial_2.11-1.0.jar

Spark will execute our program and print the top 20 rows of the DataFrame (I had to insert this illustration as a picture because the Utopian editor breaks the formatting):

[Screenshot: terminal output of df.show()]
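The limit of 20 rows is only the default of show. The sketch below shows a few ways to control how much of the result is printed; the small hand-made DataFrame is a hypothetical stand-in for the real word count result, and the object name and local[2] master are assumptions.

```scala
import org.apache.spark.sql.SparkSession

object ShowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ShowSketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for the word count result.
    val df = Seq(("spark", 3L), ("scala", 2L), ("sbt", 1L)).toDF("value", "num")

    df.show()                     // default: up to 20 rows, long cells truncated
    df.show(2)                    // only the first 2 rows
    df.show(50, truncate = false) // up to 50 rows, cells printed in full

    // Bring the first rows to the driver as ordinary Scala values.
    val top = df.take(2)
    top.foreach(row => println(s"${row.getString(0)}: ${row.getLong(1)}"))

    spark.stop()
  }
}
```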

Summary

Spark is a very flexible framework for distributed computation. In this tutorial we have learned how to set up a Spark environment, how to create a simple Spark project in Scala with sbt, and how to write a WordCount program using the Dataset and DataFrame APIs.

I hope this tutorial was as fun to read as it was to write.



Posted on Utopian.io - Rewarding Open Source Contributors
