Real-time text classification with Dataflow and PubSub

Deploy ML model at scale with Apache Beam Python streaming pipelines

Bouthaina Abichou
4 min read · Jan 9, 2021

In this post, I will cover how to deploy a machine learning model through a scalable processing pipeline for unbounded streaming data. I will use the Dataflow service, which runs jobs written with the Apache Beam programming model. Text data streams are collected through a subscription to the Google Pub/Sub messaging service, and results are pushed to BigQuery as a data warehouse. The machine learning models are stored in a Google Cloud Storage bucket. The use case for this project is spam/ham detection for SMS messages.

Train text classification model

The data set used here is from Kaggle. It is a collection of over 5,000 SMS messages in English, each labeled as ham (legitimate) or spam. To train the classification model that detects spam messages, two main Python libraries are used: the NLTK text-processing library and the Scikit-learn package for machine learning tools.

The following script is used to perform these tasks:

  • First, 15% of the raw SMS messages are put aside for real-time processing.
  • For the remaining data, the NLTK library is used for text processing: lowercasing, removing non-alphabetic characters, and stemming non-stop-words.
  • Then, the word-embedding step is performed with the bag-of-words method, and the resulting embedding model is saved.
  • Finally, a Random Forest classifier is retained for classification. This training pipeline gives an accuracy score of 0.97 and no false positives (precision = 1). The trained classification model is also saved as a pickle file.
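Here is a minimal sketch of this training flow, assuming the raw data sits in a spam.csv file with "label" and "text" columns and that the models are saved as bow_model.pkl and classifier.pkl (all of these names are placeholders, not necessarily those of the original script):

import pickle
import re

import nltk
import pandas as pd
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split

nltk.download("stopwords")  # one-time download of the NLTK stop-word list

data = pd.read_csv("spam.csv")  # placeholder file with "label" and "text" columns

# Put 15% of the raw messages aside for the real-time simulation.
train_df, unseen_df = train_test_split(data, test_size=0.15, random_state=42)
train_df = train_df.copy()
unseen_df.to_csv("unseen_sms.csv", index=False)

stemmer = PorterStemmer()
stop_words = set(stopwords.words("english"))

def clean_text(text):
    """Lowercase, drop non-alphabetic characters, stem non-stop-words."""
    words = re.sub("[^a-zA-Z]", " ", text).lower().split()
    return " ".join(stemmer.stem(w) for w in words if w not in stop_words)

train_df["clean"] = train_df["text"].apply(clean_text)

# Bag-of-words embedding followed by a Random Forest classifier.
vectorizer = CountVectorizer()
X = vectorizer.fit_transform(train_df["clean"])
y = (train_df["label"] == "spam").astype(int)

classifier = RandomForestClassifier(n_estimators=100, random_state=42)
classifier.fit(X, y)

# Save both models as pickle files for the streaming pipeline.
with open("bow_model.pkl", "wb") as f:
    pickle.dump(vectorizer, f)
with open("classifier.pkl", "wb") as f:
    pickle.dump(classifier, f)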

Pipeline architecture

As illustrated in the figure below, the streaming pipeline is built in two main steps:

  • Continuous publishing of messages to a Pub/Sub topic. This step is handled by the Publisher.py script and uses the unseen SMS data CSV file saved in a Google Cloud Storage bucket.
  • A Pub/Sub-to-BigQuery pipeline that processes the text data stream and predicts the spam/ham label. The processing jobs in Pipeline.py are written with the Apache Beam SDK and the Dataflow runner. The ML models generated in the training step are loaded from the same Cloud Storage bucket mentioned above.
Streaming pipeline architecture

These Python scripts are run from two Google Cloud Shell terminal windows. This way, there is no need to generate credential keys to access the Google Cloud service APIs.

Push data to Pub/Sub

In order to produce a text data stream, these steps are required:

  • Create a storage bucket and upload the unseen data CSV file to it.
  • Create a Pub/Sub topic.
  • In the Publisher.py script, read the data file into a pandas DataFrame. Then, in an infinite loop, push a randomly chosen message from the DataFrame to the Pub/Sub topic and wait one, two, or three seconds. Pushed data must be a bytestring (see the sketch after this list).
  • Use the python3 command from the project root within a Google Cloud Shell terminal to run this script until shutting it down with Ctrl+C. The required Python libraries should be checked or installed on the Cloud Shell VM.
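A minimal sketch of what Publisher.py could look like (the project ID, topic name, bucket path, and "text" column are placeholders, and reading a gs:// path with pandas assumes the gcsfs package is available):

import random
import time

import pandas as pd
from google.cloud import pubsub_v1

PROJECT_ID = "your_project_id"  # placeholder
TOPIC_ID = "sms_topic"          # placeholder

# Read the unseen SMS data from the Cloud Storage bucket.
df = pd.read_csv("gs://your_bucket/unseen_sms.csv")

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)

# Push a randomly chosen message every one to three seconds until Ctrl+C.
while True:
    message = df["text"].sample(n=1).iloc[0]
    publisher.publish(topic_path, data=message.encode("utf-8"))  # must be a bytestring
    time.sleep(random.randint(1, 3))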

Build the Apache Beam pipeline

The pipeline transforms are defined within the main run() function. The first step ingests streaming SMS messages through the Apache Beam standard I/O pattern for reading from a Pub/Sub topic. Then, a beam.Map transform applies the text_process() function, a simple one-to-one mapping over each element of the collection produced by the previous step. For each raw SMS message, it performs a UTF-8 decoding and the same text-processing tasks used in the training script. The processing timestamp is added as a dictionary field in the output.
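A sketch of how these first transforms might look inside run(); the topic path and field names are assumptions, and the full NLTK cleaning from the training script is abbreviated to keep the snippet short:

import re
from datetime import datetime

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def text_process(message):
    """Decode the Pub/Sub payload and apply the training-time text cleaning."""
    text = message.decode("utf-8")
    # In the real script this reuses the same cleaning (stemming, stop-word
    # removal) as in training; only the basic normalisation is shown here.
    clean = " ".join(re.sub("[^a-zA-Z]", " ", text).lower().split())
    return {
        "text": text,
        "clean": clean,
        "processing_time": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
    }

def run():
    options = PipelineOptions(streaming=True, save_main_session=True)
    with beam.Pipeline(options=options) as p:
        processed = (
            p
            | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(
                topic="projects/your_project_id/topics/sms_topic")
            | "Process text" >> beam.Map(text_process)
        )
        # The ParDo and BigQuery transforms sketched below plug in here.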

For the embedding and classification transform, the beam.ParDo method is chosen. It is a generic transform for parallel processing and can produce multiple outputs for each input element. To avoid repeated, expensive initialisations of the ML models, and since DoFn.setup() is executed once per DoFn instance rather than once per bundle, importing the models from the storage bucket and unpickling them are done in this method.
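A sketch of such a DoFn, assuming the two models were pickled to the bucket as bow_model.pkl and classifier.pkl (bucket and blob names are placeholders):

import pickle

import apache_beam as beam

class PredictSpam(beam.DoFn):
    """Load the pickled models once per DoFn instance, then classify each message."""

    def setup(self):
        # Local import keeps this class picklable with --save_main_session.
        from google.cloud import storage

        client = storage.Client()
        bucket = client.bucket("your_bucket")  # placeholder bucket name
        self.vectorizer = pickle.loads(bucket.blob("bow_model.pkl").download_as_bytes())
        self.classifier = pickle.loads(bucket.blob("classifier.pkl").download_as_bytes())

    def process(self, element):
        features = self.vectorizer.transform([element["clean"]])
        element["label"] = "spam" if self.classifier.predict(features)[0] == 1 else "ham"
        yield element

In the pipeline, this would be applied with something like predictions = processed | "Classify" >> beam.ParDo(PredictSpam()).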

The final transform dumps the labeled SMS data into BigQuery. This transform needs a table schema if the default create disposition CREATE_IF_NEEDED is used. However, the BigQuery dataset must already exist.
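A sketch of this final write, continuing from the previous snippets where predictions is the PCollection produced by the ParDo step (the dataset, table name, and schema are placeholders):

import apache_beam as beam

# Continuing inside run(): write the labeled messages to BigQuery.
table_schema = "text:STRING, clean:STRING, processing_time:TIMESTAMP, label:STRING"

_ = (
    predictions
    | "Write to BigQuery" >> beam.io.WriteToBigQuery(
        table="your_project_id:sms_dataset.sms_labels",  # the dataset must already exist
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    )
)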

To run this pipeline, once again, the handy Google Cloud Shell is used. Within the project folder, a requirements.txt file listing the google-cloud-storage package to be installed on the Dataflow workers is also needed.
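For reference, the requirements.txt can be as small as this (adding nltk here is an assumption, on the basis that the text-processing step also runs on the workers):

google-cloud-storage
nltk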

To avoid importing the modules needed by each transform inside its processing methods, the --save_main_session pipeline option is set to True. This pickles the whole main session's objects and loads them on the Cloud Dataflow workers. The exception is the import_model() function, which needs a local import of the required Python module in order to be picklable.

The steps needed to run the pipeline from the terminal window for the first time are the following:

#Create a virtual environment
python3 -m virtualenv env
source env/bin/activate
#Install required packages
pip3 install 'apache-beam[gcp]'
pip3 install google-cloud-storage
pip3 install nltk
#Launch the streaming job on Dataflow
python3 Pipeline.py \
--runner DataflowRunner \
--project your_project_id \
--temp_location gs://your_bucket/temp \
--staging_location gs://your_bucket/staging \
--streaming \
--region selected_region \
--job_name your_job_name \
--save_main_session \
--requirements_file requirements.txt

During the pipeline execution, go to the Dataflow page in the Google Cloud Console to check the monitoring features for the different transforms: progress and timing, logs from the workers, real-time input/output throughput, etc. We can also check whether the output results appear in the destination BigQuery table.

Code files used for this project can be cloned from my GitHub repository.

I hope this post is helpful for you. Deploying ML in real time and at scale becomes easier for data scientists thanks to these serverless services, which can be a great help for accomplishing end-to-end data science projects.
