Cloud Dataproc is a managed Spark and Hadoop service that lets you take advantage of open source data tools for batch processing, querying, streaming, and machine learning. Cloud Dataproc automation helps you create clusters quickly, manage them easily, and save money by turning clusters off when you don't need them. With less time and money spent on administration, you can focus on your jobs and your data.

What you'll learn

How to create a managed Cloud Dataproc cluster
How to write and build a Spark job with sbt that outlines faces in images using OpenCV
How to submit the job to your cluster and shut the cluster down when you are done

What you'll need

A browser and the temporary Google Cloud Platform account and project provided by the instructor


Codelab-at-a-conference setup

The instructor will share temporary accounts with existing projects that are already set up, so you do not need to worry about enabling billing or incurring any costs from running this codelab. Note that these accounts will be disabled shortly after the codelab is finished.

After the instructor gives you a temporary username and password, log in to the Google Cloud Console: https://console.cloud.google.com/.

Here's what you should do after you are logged in:

Note the project ID you were assigned (for example, "codelab-test003"). It will be referred to later in this codelab as PROJECT_ID.

Enable the Cloud Dataproc and Google Compute Engine APIs

Click on the menu icon in the top left of the screen.

Select API Manager from the drop down.

Search for "Google Compute Engine" in the search box. Click on "Google Compute Engine" in the results list that appears.

On the Google Compute Engine page click "Enable".

After the API is enabled, click the arrow to go back.

Now, search for "Google Cloud Dataproc API" and enable it.

You will do all of the work from the Google Cloud Shell, a command-line environment running in the cloud. This Debian-based virtual machine is loaded with common development tools (gcloud, git, and others) and offers a persistent 5GB home directory. Open the Google Cloud Shell by clicking the Cloud Shell icon in the top right of the screen.

You will be using the gcloud command from your Cloud Shell window. To simplify those commands, set a default zone:

gcloud config set compute/zone us-central1-a

Install Scala and sbt:

echo "deb https://dl.bintray.com/sbt/debian /" |
sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 642AC823
sudo apt-get update
sudo apt-get install -y scala apt-transport-https sbt

You will use sbt, an open source build tool, to build the JAR for the job you will submit to the Cloud Dataproc cluster. This JAR will contain your program and the packages required to run the job. The job detects faces in a set of image files in a Google Cloud Storage (GCS) bucket of your choice and writes out image files, with the faces outlined, to the same or another Cloud Storage bucket.

Download the code from GitHub

The code for this codelab is available in the Cloud Dataproc repository on GitHub. Clone the repository, then cd into the directory for this codelab:

git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/opencv-haarcascade

If you cloned the Cloud Dataproc repository in the previous step, you can skip this section and go to Build the Feature Detector.

If you did not clone the Cloud Dataproc github repository, follow the instructions below to set up the codelab files.

Use the mkdir command to create the project directory, which will contain the source files, the configuration files, and the build objects. Then, use the cd command to change to the project directory:

mkdir feature_detector
cd feature_detector

Copy the following text into a file named FeatureDetector.scala. The program detects the feature described by the given classifier and writes the images, with the feature outlined, to the output directory (which can optionally be specified as a third command-line argument).

package com.google.spark.feature_detector

import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkFiles
import org.bytedeco.javacpp.opencv_core.Mat
import org.bytedeco.javacpp.opencv_core.RectVector
import org.bytedeco.javacpp.opencv_core.Scalar
import org.bytedeco.javacpp.opencv_imgcodecs.imread
import org.bytedeco.javacpp.opencv_imgcodecs.imwrite
import org.bytedeco.javacpp.opencv_imgproc.rectangle
import org.bytedeco.javacpp.opencv_objdetect.CascadeClassifier

object FeatureDetector {

  def main(args: Array[String]) {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    
    val timestamp1 = System.currentTimeMillis()

    // The 1st argument is the path of the classifier.
    // The 2nd argument is the directory where the input images are located.
    // The optional 3rd argument is the directory where the output images will be written.
    if (args.length < 2 || args.length > 3) {
      println("usage: <classifier-path> <input-directory> [output-directory]")
      System.exit(-1)
    }
    val classifierPath = args(0)
    val inputDirName = args(1)
    val outputDirName = if (args.length == 2) inputDirName else args(2)

    // Copy the classifier file to the driver and register it with Spark so
    // that each worker can later retrieve it with SparkFiles.get().
    val classifier = downloadToCluster(classifierPath, sc)

    val inputDir = new Path(inputDirName)
    val fs = inputDir.getFileSystem(new Configuration())
    val files = fs.listStatus(inputDir)
    val filePaths = files.map(_.getPath().toString())

    // Distribute the image paths across the cluster and process each image
    // on a worker node.
    sc.parallelize(filePaths).foreach({
      processImage(_, classifier.getName(), outputDirName)
    })

    val timestamp2 = System.currentTimeMillis()

    println("\n\n")
    println("The output files can be found at " + outputDirName)
    println("Total time: " + (timestamp2-timestamp1) + " milliseconds.")
    println("\n\n")
  }

  def processImage(inPath: String, classifier: String, outputDir: String): String = {
    val classifierName = SparkFiles.get(classifier)

    // Download the input file to the worker.
    val suffix = inPath.lastIndexOf(".")
    val localIn = File.createTempFile("pic_",
        inPath.substring(suffix),
        new File("/tmp/"))
    copyFile(inPath, "file://" + localIn.getPath())

    val inImg = imread(localIn.getPath())
    val detector = new CascadeClassifier(classifierName)
    val outImg = detectFeatures(inImg, detector)

    // Write the output image to a temporary file.
    val localOut = File.createTempFile("out_",
        inPath.substring(suffix),
        new File("/tmp/"))
    imwrite(localOut.getPath(), outImg)

    // Copy the file to the output directory.
    val filename = inPath.substring(inPath.lastIndexOf("/") + 1)
    val extension = filename.substring(filename.lastIndexOf("."))
    val name = filename.substring(0, filename.lastIndexOf("."))
    val outPath = outputDir + name +"_output" + extension
    copyFile("file://" + localOut.getPath(), outPath)
    return outPath
  }

  // Outlines the detected features in a given image.
  def detectFeatures(img: Mat, detector: CascadeClassifier): Mat = {
    val features = new RectVector()
    detector.detectMultiScale(img, features)
    val numFeatures = features.size().toInt
    val outlined = img.clone()

    // Draws the rectangles on the detected features.
    val green = new Scalar(0, 255, 0, 0)
    for (f <- 0 until numFeatures) {
      val currentFeature = features.get(f)
      rectangle(outlined, currentFeature, green)
    }
    return outlined
  }

  // Copies a file from one location to another, using the Hadoop FileSystem
  // appropriate to each path (local or GCS).
  def copyFile(from: String, to: String) {
    val conf = new Configuration()
    val fromPath = new Path(from)
    val toPath = new Path(to)
    val is = fromPath.getFileSystem(conf).open(fromPath)
    val os = toPath.getFileSystem(conf).create(toPath)
    IOUtils.copyBytes(is, os, conf)
    is.close()
    os.close()
  }

  // Downloads the file to a local temporary copy and registers it with the
  // SparkContext so it is distributed to each node in the cluster.
  def downloadToCluster(path: String, sc: SparkContext): File = {
    val suffix = path.substring(path.lastIndexOf("."))
    val file = File.createTempFile("file_", suffix, new File("/tmp/"))
    copyFile(path, "file://" + file.getPath())
    sc.addFile("file://" + file.getPath())
    return file
  }
}
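
Before submitting anything to a cluster, it can help to see the core OpenCV flow in isolation. The following is a minimal local sketch (not one of the codelab's files, and not required for the codelab); the /tmp paths are placeholders for a classifier and an image you would supply yourself, and Spark is not involved:

import org.bytedeco.javacpp.opencv_core.{RectVector, Scalar}
import org.bytedeco.javacpp.opencv_imgcodecs.{imread, imwrite}
import org.bytedeco.javacpp.opencv_imgproc.rectangle
import org.bytedeco.javacpp.opencv_objdetect.CascadeClassifier

// Minimal, Spark-free sketch of the detection step used by FeatureDetector.
object LocalDetectorSketch {
  def main(args: Array[String]) {
    // Placeholder local paths; substitute your own classifier and image.
    val classifierPath = "/tmp/haarcascade_frontalface_default.xml"
    val inputPath = "/tmp/input.jpg"
    val outputPath = "/tmp/input_output.jpg"

    val img = imread(inputPath)
    val detector = new CascadeClassifier(classifierPath)

    // Detect features and outline each one in green, as detectFeatures does.
    val features = new RectVector()
    detector.detectMultiScale(img, features)
    val outlined = img.clone()
    val green = new Scalar(0, 255, 0, 0)
    for (i <- 0 until features.size().toInt) {
      rectangle(outlined, features.get(i), green)
    }

    imwrite(outputPath, outlined)
    println("Wrote " + outputPath)
  }
}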

The build.sbt file defines the build settings and lists the project dependencies, which sbt uses to build a JAR containing all of the required packages.

Copy the following text into a file called build.sbt:

lazy val root = (project in file(".")).
  settings(
    name := "feature_detector",
    version := "1.0",
    scalaVersion := "2.10.6"
  )

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2" % "provided"

libraryDependencies += "org.bytedeco" % "javacv" % "1.2"

libraryDependencies += "org.bytedeco.javacpp-presets" % "opencv" % "3.1.0-1.2" classifier "linux-x86_64"

libraryDependencies += "org.bytedeco.javacpp-presets" % "opencv" % "3.1.0-1.2"

classpathTypes += "maven-plugin"

// EclipseKeys.withSource := true

// EclipseKeys.withJavadoc := true

Run the following commands to add the assembly plugin to sbt. This enables sbt to create a fat JAR of your project with all of its dependencies:

mkdir project
cd project
echo "addSbtPlugin(\"com.eed3si9n\" % \"sbt-assembly\" % \"0.14.3\")" > plugins.sbt
cd ..
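
If sbt assembly later fails with "deduplicate" errors while merging the dependency JARs, a merge strategy can be added to build.sbt. The following is a hedged sketch of the standard sbt-assembly pattern; it is only needed if your build actually hits such conflicts:

// Optional: resolve duplicate-file conflicts while assembling the fat JAR.
assemblyMergeStrategy in assembly := {
  // Drop duplicated metadata entries from the merged JAR.
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  // Defer to sbt-assembly's default strategy for everything else.
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}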

If you have Eclipse installed, you may wish to uncomment the last two lines in build.sbt (the ones referencing EclipseKeys) by removing the leading double slash (//) on each line.

Build the JAR file by running the following command in the main project directory:

sbt assembly

The JAR file containing your program will be: target/scala-2.10/feature_detector-assembly-1.0.jar.

Note: building the JAR file for the first time can take up to ten minutes while sbt downloads the necessary dependencies and compiles everything. This may be a good time to take a break.

The program in this codelab reads a collection of images from a bucket, looks for faces in those images, and writes modified images back to a bucket.

Select a name for your bucket. In this step, we set a shell variable to your bucket name. This shell variable is used in the following commands to refer to your bucket.

MYBUCKET="${USER/_/-}-images-${RANDOM}"
echo MYBUCKET=${MYBUCKET}

Use the gsutil program, which is included with the Cloud SDK, to create a bucket to hold your sample images:

gsutil mb gs://${MYBUCKET}

Download some sample images into your bucket:

curl http://www.publicdomainpictures.net/pictures/20000/velka/family-of-three-871290963799xUk.jpg | gsutil cp - gs://${MYBUCKET}/imgs/family-of-three.jpg
curl http://www.publicdomainpictures.net/pictures/10000/velka/african-woman-331287912508yqXc.jpg | gsutil cp - gs://${MYBUCKET}/imgs/african-woman.jpg
curl http://www.publicdomainpictures.net/pictures/10000/velka/296-1246658839vCW7.jpg | gsutil cp - gs://${MYBUCKET}/imgs/classroom.jpg

View the contents of your bucket:

gsutil ls -R gs://${MYBUCKET}

Select a name to use for your cluster. In this step, we set a shell variable to your cluster name. This shell variable is used in the following commands to refer to your cluster.

MYCLUSTER="${USER/_/-}-codelab"
echo MYCLUSTER=${MYCLUSTER}

Set a default Compute Engine zone to use (if you did not already do so earlier in this codelab):

gcloud config set compute/zone us-central1-a

Create a new cluster:

gcloud dataproc clusters create --worker-machine-type=n1-standard-2 ${MYCLUSTER}

The default cluster settings, which include two worker nodes, should be sufficient for this codelab. We specify n1-standard-2 as the worker machine type to reduce the overall number of cores used by our cluster. See the Cloud SDK gcloud dataproc clusters create command for information on using command line flags to customize cluster settings.

In this codelab, the program is used as a face detector, so the Haar classifier you supply must describe faces. A Haar classifier is an XML file that describes the features the program will detect. The file comes from the OpenCV repository and will be copied to your GCS bucket in the next step; you will pass its GCS path as the first argument when you submit your job to your Cloud Dataproc cluster.
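
Inside the program, the classifier is simply a file path handed to OpenCV's CascadeClassifier (see processImage above). If you ever need to confirm that the cascade file was fetched and parsed correctly, you could add a check right after the classifier is constructed. This is a hedged sketch; it assumes the JavaCPP wrapper exposes OpenCV's standard empty() method:

    val detector = new CascadeClassifier(classifierName)
    // empty() returns true when the cascade file failed to load or parse.
    if (detector.empty()) {
      sys.error("Could not load Haar cascade from " + classifierName)
    }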

Load the face detection configuration file into your bucket:

curl https://raw.githubusercontent.com/opencv/opencv/master/data/haarcascades/haarcascade_frontalface_default.xml | gsutil cp - gs://${MYBUCKET}/haarcascade_frontalface_default.xml

You will be using the set of images you uploaded into the imgs directory in your GCS bucket as input to your Feature Detector. You must include the path to that directory as the second argument of your job-submission command.

Submit your job to Cloud Dataproc:

gcloud dataproc jobs submit spark \
--cluster ${MYCLUSTER} \
--jar target/scala-2.10/feature_detector-assembly-1.0.jar -- \
gs://${MYBUCKET}/haarcascade_frontalface_default.xml \
gs://${MYBUCKET}/imgs/ \
gs://${MYBUCKET}/out/

You can process additional images by adding them to the GCS directory passed as the second argument.

After the job finishes, the output images appear in the out folder of your GCS bucket; each output file is named after its input image with an _output suffix. List them with:

gsutil ls -lR gs://${MYBUCKET}

If you want to experiment, you can edit the FeatureDetector code, rerun sbt assembly, and resubmit the job with the same gcloud dataproc jobs submit command.
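
For example, a one-line change in detectFeatures switches the outlines from green to red (OpenCV stores colors in BGR order, so red is Scalar(0, 0, 255, 0)):

    // In detectFeatures, replace the green Scalar with a red one.
    val red = new Scalar(0, 0, 255, 0)
    for (f <- 0 until numFeatures) {
      val currentFeature = features.get(f)
      rectangle(outlined, currentFeature, red)
    }

Rebuild the JAR and resubmit the job to see the new outlines in the output bucket.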

Shut down the cluster using the Cloud Shell command line:

gcloud dataproc clusters delete ${MYCLUSTER}

Delete the bucket you created for this codelab, including all of the files within it:

gsutil rm "gs://${MYBUCKET}/**"
gsutil rb gs://${MYBUCKET}

This codelab created a directory in your Cloud Shell home directory called cloud-dataproc. Remove that directory:

cd
rm -rf cloud-dataproc

Within approximately 30 minutes after you close your Cloud Shell session, other software that you installed, such as Scala and sbt, will be cleaned up automatically. End your session now:

exit

You have learned how to create a Cloud Dataproc cluster, write a Spark program that outlines faces in an image using OpenCV, build the job using sbt, submit the job to your Cloud Dataproc cluster, and shut down your cluster using gcloud!

Learn More

License

This work is licensed under the Creative Commons Attribution 3.0 Generic License and the Apache 2.0 License.