Dataflow is a unified programming model and a managed service for developing and executing a wide range of data processing patterns including ETL, batch computation, and continuous computation. Because Dataflow is a managed service, it can allocate resources on demand to minimize latency while maintaining high utilization efficiency.

The Dataflow model combines batch and stream processing so developers don't have to make tradeoffs between correctness, cost, and processing time. In this codelab, you'll learn how to run a Dataflow pipeline that counts the occurrences of unique words in a text file.

This tutorial is adapted from https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven

What you'll learn

What you'll need


Codelab-at-a-conference setup

The instructor will share temporary accounts with existing projects that are already set up, so you do not need to worry about enabling billing or any costs associated with running this codelab. Note that all of these accounts will be disabled soon after the codelab is over.

Once you have received a temporary username and password from the instructor, log in to the Google Cloud Platform Console: https://console.cloud.google.com/.

Here's what you should see once logged in:

Note the project ID you were assigned ("codelab-test003" in the screenshot above). It will be referred to later in this codelab as PROJECT_ID.

Enable the APIs

Click on the menu icon in the top left of the screen.

Select API Manager from the drop-down menu.

Search for "Google Compute Engine" in the search box. Click on "Google Compute Engine API" in the results list that appears.

On the Google Compute Engine page, click Enable.

Once the API has been enabled, click the arrow to go back.

Now search for the following APIs and enable them as well:

In the Google Cloud Platform Console, click the Menu icon on the top left of the screen:

Scroll down and select Cloud Storage in the Storage subsection:

You should now see the Cloud Storage Browser. Assuming you are using a project that does not currently have any Cloud Storage buckets, you will see a dialog box inviting you to create one:

Press the Create bucket button to create one:

Enter a name for your bucket. As the dialog box notes, bucket names must be unique across all of Cloud Storage, so if you choose an obvious name, such as "test", you will probably find that someone else has already created a bucket with that name, and you will receive an error.

There are also some rules regarding what characters are allowed in bucket names. If you start and end your bucket name with a letter or number, and only use dashes in the middle, then you'll be fine. If you try to use special characters, or try to start or end your bucket name with something other than a letter or number, the dialog box will remind you of the rules.

Enter a unique name for your bucket and press Create. If you choose something that's already in use, you will see the error message shown above. When you have successfully created a bucket, you will be taken to your new, empty, bucket in the browser:

The bucket name you see will, of course, be different, since bucket names must be unique across all of Cloud Storage.
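If you prefer working from a terminal, you can also create the bucket with gsutil from the Cloud Shell described in the next step. The bucket name below is just a placeholder; substitute the unique name you chose:

 gsutil mb gs://my-unique-bucket-name/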

You will do all of the work from the Google Cloud Shell, a command line environment running in the Cloud. This Debian-based virtual machine is loaded with all the development tools you'll need (gcloud, git and others) and offers a persistent 5GB home directory. Open the Google Cloud Shell by clicking on the icon on the top right of the screen:

After Cloud Shell launches, let's get started by creating a Maven project containing the Cloud Dataflow SDK for Java.

Run the mvn archetype:generate command in your shell as follows:

  mvn archetype:generate \
     -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
     -DarchetypeGroupId=com.google.cloud.dataflow \
     -DarchetypeVersion=1.9.0 \
     -DgroupId=com.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -Dpackage=com.example \
     -DinteractiveMode=false

After running the command, you should see a new directory called first-dataflow under your current directory. first-dataflow contains a Maven project that includes the Cloud Dataflow SDK for Java and example pipelines.
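If you'd like to confirm where the example source landed, you can search the generated project from your shell. The path below assumes the standard Maven layout produced by the archetype:

 find first-dataflow/src -name "WordCount.java"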

Change to the first-dataflow/ directory.

 cd first-dataflow

We're going to run a pipeline called WordCount, which reads text, tokenizes the text lines into individual words, and performs a frequency count on each of those words. First we'll run the pipeline, and while it's running we'll take a look at what's happening in each step.

Start the pipeline by running the command mvn compile exec:java in your shell or terminal window. For the --project argument, you'll need to specify the Project ID for the Cloud Platform project that you created. For the --stagingLocation and --output arguments, you'll need to specify the name of the Cloud Storage bucket you created as part of the path.

For example, if your Cloud Platform Project ID is my-cloud-project and your Cloud Storage bucket name is my-wordcount-storage-bucket, enter the following command to run the WordCount pipeline:

 mvn compile exec:java \
      -Dexec.mainClass=com.example.WordCount \
      -Dexec.args="--project=my-cloud-project \
      --stagingLocation=gs://my-wordcount-storage-bucket/staging/ \
      --output=gs://my-wordcount-storage-bucket/output \
      --runner=BlockingDataflowPipelineRunner"

While the job is running, let's find the job in the job list.

Open the Cloud Dataflow Monitoring UI in the Google Cloud Platform Console. You should see your wordcount job with a status of Running:
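If you'd rather check from Cloud Shell, you can also list your Dataflow jobs with the Cloud SDK (on older gcloud releases this command may live under gcloud beta); replace the project ID with your own:

 gcloud dataflow jobs list --project=my-cloud-project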

Now, let's look at the pipeline parameters. Start by clicking on the name of your job:

When you select a job, you can view the execution graph. A pipeline's execution graph represents each transform in the pipeline as a box that contains the transform name and some status information. You can click on the caret in the top right corner of each step to see more details:

Let's see how the pipeline transforms the data at each step:

  /**
   * A PTransform that converts a PCollection containing lines of text 
   * into a PCollection of formatted word counts.
   */
  public static class CountWords extends PTransform<PCollection<String>,
      PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
          ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
          words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }

We'll take a look at the resulting output from the pipeline in a few minutes.

Now take a look at the Summary page to the right of the graph, which includes pipeline parameters that we included in the mvn compile exec:java command.

You can also see Custom counters for the pipeline, which in this case shows how many empty lines have been encountered so far during execution. You can add new counters to your pipeline in order to track application-specific metrics.
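The same counter values can usually be read from the command line as well; the job ID below is a placeholder for the ID shown in the monitoring UI, and older gcloud releases may require the beta component:

 gcloud dataflow metrics list <JOB_ID> --project=my-cloud-project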

You can click the Job Log tab to view any error messages reported for the job. You can filter the messages that appear in the Job Log tab by using the Minimum Severity drop-down menu.

You can use the Worker Logs button in the logs tab to view worker logs for the Compute Engine instances that run your pipeline. Worker Logs consist of log lines generated by your code and the Dataflow-generated code running it.

If you are trying to debug a failure in the pipeline, oftentimes there will be additional logging in the Worker Logs that helps solve the problem. Keep in mind that these logs are aggregated across all workers, and can be filtered and searched.
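If you find it easier to search from a terminal, the Cloud SDK can also fetch a job's log messages; as above, the job ID is a placeholder, and on older SDK versions the command may sit under gcloud beta:

 gcloud dataflow logs list <JOB_ID>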

In the next step, we'll check that your job succeeded.

Open the Cloud Dataflow Monitoring UI in the Google Cloud Platform Console.

You should see your wordcount job with a status of Running at first, and then Succeeded:

The job will take approximately 3-4 minutes to run.

Remember when you ran the pipeline and specified an output bucket? Let's take a look at the result (because don't you want to see how many times each word in King Lear occurred?!). Navigate back to the Cloud Storage Browser in the Google Cloud Platform Console. In your bucket, you should see the output files and staging files that your job created:
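You can inspect those results straight from Cloud Shell too. Using the example bucket name from earlier (substitute your own), something like this lists the output shards and prints the first few word counts:

 gsutil ls gs://my-wordcount-storage-bucket/output*
 gsutil cat gs://my-wordcount-storage-bucket/output* | head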

You can shut down your resources from the Google Cloud Platform Console.

Open the Cloud Storage browser in the Google Cloud Platform Console.

Select the checkbox next to the bucket that you created.

Click DELETE to permanently delete the bucket and its contents.
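Alternatively, you can remove the bucket and everything in it from Cloud Shell; this is just as permanent, and the bucket name is again the example used earlier:

 gsutil rm -r gs://my-wordcount-storage-bucket/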

You learned how to create a Maven project with the Cloud Dataflow SDK, run an example pipeline using the Google Cloud Platform Console, and delete the associated Cloud Storage bucket and its contents.

Learn More

License

This work is licensed under a Creative Commons Attribution 3.0 Generic License, and Apache 2.0 license.