This lab will cover how to set up and use Apache Spark and Jupyter notebooks on Cloud Dataproc.

Jupyter notebooks are widely used for exploratory data analysis and building machine learning models as they allow you to interactively run your code and immediately see your results.

However, setting up and using Apache Spark and Jupyter notebooks can be complicated.


Cloud Dataproc makes this fast and easy by allowing you to create a Dataproc cluster with Apache Spark, the Jupyter component, and Component Gateway in around 90 seconds.

What you'll learn

In this codelab, you'll learn how to:

The total cost to run this lab on Google Cloud is about $1. Full details on Cloud Dataproc pricing can be found here.

First, open up Cloud Shell by clicking the button in the top right-hand corner of the cloud console:

After the Cloud Shell loads, run the following command to set the project ID from the previous step:

gcloud config set project <project_id>

The project ID can also be found by clicking on your project in the top left of the cloud console:
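You can also confirm which project Cloud Shell is currently pointed at. This is a quick sanity check, assuming gcloud is already initialized:

```shell
# Print the project ID that gcloud is currently configured to use
gcloud config get-value project
```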

Next, enable the Dataproc, Compute Engine, Cloud Storage, BigQuery, and BigQuery Storage APIs.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

Alternatively this can be done in the Cloud Console. Click on the menu icon in the top left of the screen.

Select API Manager from the drop down.

Click on Enable APIs and Services.

Search for and enable the following APIs:

Create a Google Cloud Storage bucket in the region closest to your data and give it a unique name.

This will be used for the Dataproc cluster.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

You should see the following output

Creating gs://<your-bucket-name>/...
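Optionally, you can verify the bucket exists and check its location. This is a sketch using gsutil's bucket-level listing:

```shell
# Show bucket-level metadata (including Location) to confirm creation
gsutil ls -L -b gs://${BUCKET_NAME}
```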

Creating your cluster

Set the environment variables for your cluster:

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

Then run this gcloud command to create your cluster with all the necessary components to work with Jupyter on your cluster.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

You should see the following output while your cluster is being created

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

It should take about 90 seconds to create your cluster and once it is ready you will be able to access your cluster from the Dataproc Cloud console UI.
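While the cluster is being created, you can also check its state from Cloud Shell instead of the console. This is a sketch; the cluster is ready once the reported state is RUNNING:

```shell
# Describe the cluster and print only its current status
gcloud dataproc clusters describe ${CLUSTER_NAME} \
  --region=${REGION} \
  --format="value(status.state)"
```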

While you are waiting, you can carry on reading below to learn more about the flags used in the gcloud command.

You should see the following output once the cluster is created:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

Flags used in gcloud dataproc create command

Here is a breakdown of the flags used in the gcloud dataproc create command.

--region=${REGION}

Specifies the region where the cluster will be created. You can see the list of available regions here.

--image-version=1.4

The image version to use in your cluster. You can see the list of available versions here.

--bucket=${BUCKET_NAME}

Specifies the Google Cloud Storage bucket you created earlier to use for the cluster. If you do not supply a GCS bucket, one will be created for you.

This is also where your notebooks will be saved even if you delete your cluster as the GCS bucket is not deleted.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

The machine types to use for your Dataproc cluster. You can see a list of available machine types here.

By default, 1 master node and 2 worker nodes are created if you do not set the flag --num-workers.
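If you want a different number of workers, you can set the flag explicitly at creation time. As a hypothetical variant of the create command above, requesting 4 workers instead of the default 2:

```shell
# Hypothetical: same create command, with an explicit worker count of 4
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
  --region=${REGION} \
  --image-version=1.4 \
  --master-machine-type=n1-standard-4 \
  --worker-machine-type=n1-standard-4 \
  --num-workers=4 \
  --bucket=${BUCKET_NAME} \
  --optional-components=ANACONDA,JUPYTER \
  --enable-component-gateway
```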

--optional-components=ANACONDA,JUPYTER

Setting these values for optional components will install all the necessary libraries for Jupyter and Anaconda (which is required for Jupyter notebooks) on your cluster.

--enable-component-gateway

Enabling Component Gateway creates an App Engine link using Apache Knox and Inverting Proxy, which gives easy, secure, and authenticated access to the Jupyter and JupyterLab web interfaces, meaning you no longer need to create SSH tunnels.

It will also create links for other tools on the cluster, including the YARN Resource Manager and Spark History Server, which are useful for seeing the performance of your jobs and cluster usage patterns.

Accessing the JupyterLab web interface

Once the cluster is ready, you can find the Component Gateway link to the JupyterLab web interface by going to Dataproc Clusters in the Cloud console, clicking on the cluster you created, and going to the Web Interfaces tab.

You will notice that you have access to Jupyter, the classic notebook interface, and JupyterLab, which is described as the next-generation UI for Project Jupyter.

JupyterLab has a lot of great new UI features, so if you are new to using notebooks or looking for the latest improvements, it is recommended to use JupyterLab, as it will eventually replace the classic Jupyter interface according to the official docs.

Create a notebook with a Python 3 kernel

From the Launcher tab, click on the Python 3 notebook icon to create a notebook with a Python 3 kernel (not the PySpark kernel). This allows you to configure the SparkSession in the notebook and include the spark-bigquery-connector, which is required to use the BigQuery Storage API.

Rename the notebook

Right-click on the notebook name in the sidebar on the left or in the top navigation, and rename the notebook to "BigQuery Storage & Spark DataFrames.ipynb".

Run your Spark code in the notebook



In this notebook, you will use the spark-bigquery-connector, a tool for reading and writing data between BigQuery and Spark that makes use of the BigQuery Storage API.

The BigQuery Storage API brings significant improvements to accessing data in BigQuery by using an RPC-based protocol. It supports data reads and writes in parallel as well as different serialization formats such as Apache Avro and Apache Arrow. At a high level, this translates to significantly improved performance, especially on larger data sets.

In the first cell check the Scala version of your cluster so you can include the correct version of the spark-bigquery-connector jar.

Input [1]:

!scala -version

Output [1]:

Create a Spark session and include the spark-bigquery-connector package.

If your Scala version is 2.11 use the following package.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

If your Scala version is 2.12 use the following package.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

Input [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

Enable repl.eagerEval

This will output the results of DataFrames in each step without the need to call df.show(), and also improves the formatting of the output.

Input [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)


Read BigQuery table into Spark DataFrame

Create a Spark DataFrame by reading in data from a public BigQuery dataset. This makes use of the spark-bigquery-connector and BigQuery Storage API to load the data into the Spark cluster.

Create a Spark DataFrame and load data from the BigQuery public dataset for Wikipedia pageviews. You will notice that you are not running a query on the data; you are using the spark-bigquery-connector to load the data into Spark, where the processing will occur. When this code is run, it will not actually load the table, because evaluation is lazy in Spark and execution will occur in the next step.

Input [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

Output [4]:

Select the required columns and apply a filter using `where()` which is an alias for `filter()`.

When this code is run it triggers a Spark action and the data is read from BigQuery Storage at this point.

Input [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')")

df_wiki_en

Output [5]:

Group by datehour and sum the views, then order by total views to see the hours with the most pageviews

Input [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

Output [6]:

You can make use of the various plotting libraries that are available in Python to plot the output of your Spark jobs.

Convert Spark DataFrame to Pandas DataFrame

Convert the Spark DataFrame to Pandas DataFrame and set the datehour as the index. This is useful if you want to work with the data directly in Python and plot the data using the many available Python plotting libraries.

Input [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

Output [7]:
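As a quick illustration of what you can do once the data is in Pandas, here is a minimal sketch that resamples hourly totals into a daily total. It uses synthetic stand-in data rather than the real pageview output, so it runs without a cluster:

```python
import pandas as pd

# Synthetic stand-in for pandas_datehour_totals: 24 hourly rows for one day
idx = pd.date_range("2020-03-01", periods=24, freq="h", name="datehour")
hourly = pd.DataFrame({"total_views": range(1000, 1024)}, index=idx)

# Resample the hourly totals into a single daily total
daily = hourly.resample("D").sum()
print(daily)
```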

Plotting Pandas Dataframe

Import the matplotlib library which is required to display the plots in the notebook

Input [8]:

import matplotlib.pyplot as plt


Use the Pandas plot function to create a line chart from the Pandas DataFrame.

Input [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

Output [9]:

Check the notebook was saved in GCS

You should now have your first Jupyter notebook up and running on your Dataproc cluster. Give your notebook a name and it will be auto-saved to the GCS bucket used when creating the cluster.

You can check this using the following gsutil command in Cloud Shell:

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

You should see the following output

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

There might be scenarios where you want the data in memory instead of reading from BigQuery Storage every time.

This job will read the data from BigQuery and push the filter down to BigQuery. The aggregation will then be computed in Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
  .groupBy("title") \
  .agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

You can modify the job above to cache the table; the filter on the wiki column will then be applied in memory by Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
  .groupBy("title") \
  .agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

You can then filter for another wiki language using the cached data instead of reading from BigQuery storage again, so the job will run much faster.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
  .groupBy("title") \
  .agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

You can remove the cache by running

df_wiki_all.unpersist()

The Cloud Dataproc GitHub repo features Jupyter notebooks with common Apache Spark patterns for loading data, saving data, and plotting your data with various Google Cloud Platform products and open-source tools: