Creating a Pyspark ML model pipeline
Objective:
This post outlines the end-to-end Data + ML model pipeline creation process in a Pyspark environment. It provides a brief comparison of Pyspark with the ubiquitous pandas library (at least in Python circles). It then moves directly to a code example pipeline, which starts with training data and ends with a prediction dataframe.
Premise:
Sources and sinks can be databases, files, applications, devices and products which generate or hold data of some kind. Broadly, data comes in two forms: structured and unstructured. With Pyspark it is possible to read in both types of data as RDDs and process them with map-filter-reduce style operations. I had written earlier about how Pyspark can be leveraged to ingest the AirBnB dataset into a PostgreSQL database served locally / online (attach link here) from your machine. That post only looked at data ingestion and the data analysis capability provided by Pyspark's SQL functions and Spark's SQL module.
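To make the RDD idea concrete, here is a minimal sketch, assuming a hypothetical plain-text file orders.csv whose lines look like order_id,order_subtotal, of reading it as an RDD and summing a column with map / filter / reduce:
```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rdd-sketch").getOrCreate()
sc = spark.sparkContext

# Hypothetical text file with lines of the form: order_id,order_subtotal
lines = sc.textFile("orders.csv")

total = (
    lines.map(lambda line: line.split(","))        # map: split each row into fields
         .filter(lambda fields: len(fields) == 2)  # filter: drop malformed rows
         .map(lambda fields: float(fields[1]))     # map: keep the subtotal as a float
         .reduce(lambda a, b: a + b)               # reduce: sum across partitions
)
print(total)
```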
Regular Python / Javascript code is sufficient to move volumes of data through servers and get insights, and Pandas and Dask are notable libraries that support such data activities. Where Pyspark shines is in creating distributed data pipelines:
1) Pipelines that include ML models in the middle.
2) Data can be distributed across multiple nodes.
3) RAM and processing power can be consumed from multiple nodes.
We will see how Pyspark leverages these capabilities to build pipelines.
RDDs and (Pyspark + SQL) Dataframe:
Pyspark and the underlying HDFS / Hive engines abstract away the distributed nature of the pipeline. As a user, what we see is a Dataframe or a Resilient Distributed Dataset (RDD). To us Data Engineers, Scientists and Analysts, these are Python objects that simply contain data and expose an interesting set of methods to manipulate that data. Under the hood, if you are running a cluster and your data size is in the multiples of GB, then Pyspark is doing a lot of heavy lifting.
The Pyspark dataframe object provides multiple ways to use SQL commands on dataframe instances. The SQL-style methods can be chained, as shown below.
from pyspark.sql.functions import col
ordersView.select(col("order_item_order_id"), col("order_subtotal").alias("subtotal")).show()
Alternatively, Spark's SQL module can be leveraged to query the Pyspark dataframe after registering it as a view in the execution environment.
spark.sql("SELECT * FROM ordersView LIMIT 5").show()
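For completeness, a minimal sketch of how such a view could be registered before the query above runs; the dataframe name and the orders.csv source file are assumptions:
```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical orders file; adjust the path and schema to your data
ordersView = spark.read.csv("orders.csv", header=True, inferSchema=True)

# Register the dataframe as a temporary view so spark.sql() can find it by name
ordersView.createOrReplaceTempView("ordersView")

spark.sql("SELECT * FROM ordersView LIMIT 5").show()
```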
That lays the foundation of how closely Pyspark is integrated with SQL and relational data. Now for the detour…
Do I need a multi-node cluster and Hadoop / Hive knowledge to practice Pyspark?
Nope, it is not mandatory if you work at the Pyspark API level. Having a basic understanding will be very helpful, and will let you appreciate the work of Big Data engineers. Let me explain using something that we are all familiar with.
Pandas is an all-time favourite for anybody working with data. Pandas can easily be used to create pipelines from database to database, or from an external file (csv, text, etc.) to a database. I had written about it here, with an example. Everything happens in Python and on your local hard disk itself.
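As a rough illustration of that pandas-only flow, here is a sketch assuming a hypothetical orders.csv file and a local PostgreSQL connection:
```
import pandas as pd
from sqlalchemy import create_engine

# Hypothetical CSV and Postgres connection; adjust names and credentials to your setup
df = pd.read_csv("orders.csv")

engine = create_engine("postgresql://user:password@localhost:5432/mydb")
df.to_sql("orders", engine, if_exists="replace", index=False)
```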
Spark, on the contrary, works on a distributed computer, which simply means that the data and the processing live on multiple computers inside a network, like your office LAN. The processor, hard disk and RAM are all distributed, and the Spark framework helps in controlling them. Spark interfaces with the Hive engine and works with the HDFS file system; that subject matter belongs to the Big Data engineers. Data engineers / scientists work a level above in the Big Data stack, through the Python / Scala interfaces called the APIs. Reiterating again: it is not necessary to learn the Big Data implementation details to use Pyspark. All you need is an i3 / 2GB RAM Linux machine.
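On such a modest machine, a local SparkSession is enough to follow along. A minimal sketch (the app name is arbitrary):
```
from pyspark.sql import SparkSession

# "local[*]" runs Spark inside this one machine, using all available cores;
# no Hadoop cluster or HDFS setup is needed to practice
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("local-practice")
    .getOrCreate()
)

print(spark.version)
```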
Pyspark's MLlib makes things very intuitive:
Let me start at the deep end of "pipelines". Below is a pipeline command that moves the raw text data through multiple functions and finally provides the prediction results for the test data. If you are new to Pyspark this can be overwhelming; just bear with the post and continue reading. I am purposefully giving the overall picture of how MLlib and Pyspark work together and then connect with data sources. That is the only way to show the possibilities that Pyspark opens up. Let's begin.
i) Multistage Pipeline:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[ham_spam_to_numeric, tokenizer, stop_remove, count_vec, idf, clean_up])
The individual parts of the pipeline are functions that are configured as below. Do you observe the pattern there? Each function has an inputCol and an outputCol. Finally, the VectorAssembler outputs the 'features' column and the StringIndexer the 'label'.
ii) Pipeline Stages are Functions:
from pyspark.ml.feature import (Tokenizer, StopWordsRemover, CountVectorizer,
                                IDF, StringIndexer, VectorAssembler)

tokenizer = Tokenizer(inputCol='text', outputCol='token_text')
stop_remove = StopWordsRemover(inputCol='token_text', outputCol='stop_token')
count_vec = CountVectorizer(inputCol='stop_token', outputCol='c_vec')
idf = IDF(inputCol='c_vec', outputCol='tf_idf')
ham_spam_to_numeric = StringIndexer(inputCol='class', outputCol='label')
clean_up = VectorAssembler(inputCols=['tf_idf', 'length'], outputCol='features')
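One detail these stages assume is an input dataframe that already has 'class', 'text' and 'length' columns. A minimal sketch of preparing such a dataframe; the file name, separator and 70/30 split are assumptions (the data is the usual ham/spam SMS collection):
```
from pyspark.sql import SparkSession
from pyspark.sql.functions import length

spark = SparkSession.builder.getOrCreate()

# Hypothetical tab-separated file with two columns: class (ham/spam) and text
data = spark.read.csv("smsspamcollection.tsv", sep="\t", inferSchema=True, header=True)

# Add the 'length' column that the VectorAssembler stage expects
data = data.withColumn("length", length(data["text"]))

# Split into the train_data / test_data used later in the post
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)
```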
That pipeline is used to get the data ready for the NaiveBayes classifier. The initialisation signature for NB looks like this:
iii) ML model with default arguments:
nb = NaiveBayes(
    *,
    featuresCol: str = 'features',
    labelCol: str = 'label',
    predictionCol: str = 'prediction',
    probabilityCol: str = 'probability',
    rawPredictionCol: str = 'rawPrediction',
    smoothing: float = 1.0,
    modelType: str = 'multinomial',
    thresholds: Optional[List[float]] = None,
    weightCol: Optional[str] = None,
)
Notice that the init function takes all its arguments as keyword arguments with default values. If you review the pipeline stages, you will see the outputCol values line up with those defaults: the VectorAssembler outputs 'features' and the StringIndexer outputs 'label', so the classifier can be used without overriding anything.
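A minimal sketch of instantiating the classifier while relying on those defaults:
```
from pyspark.ml.classification import NaiveBayes

# featuresCol='features' and labelCol='label' already match the columns
# produced by the VectorAssembler and StringIndexer stages above
nb = NaiveBayes()
```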
iv) Prediction on the Test Data:
All that is required now is to execute the following commands:
```
pipeModel = pipeline.fit(train_data)
transformedData = pipeModel.transform(train_data)
model = nb.fit(transformedData)
predictionData = model.transform(pipeModel.transform(test_data))
```
The model has been trained, and the prediction output has been generated from the test_data. The predictionData is again a Pyspark Dataframe, which (see the sketch after this list):
1) Can be queried with SQL commands, with the results sent to an API endpoint.
2) Can be written entirely as a table to a database.
3) Can be written out to csv / parquet / json / avro or any other supported format.
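As a rough illustration of options 2) and 3), here is a sketch of writing predictionData out; the file path, table name and connection details are assumptions:
```
# Write selected prediction columns out as parquet files
predictionData.select("text", "label", "prediction") \
    .write.mode("overwrite").parquet("predictions.parquet")

# Or append them as a table in a PostgreSQL database over JDBC
# (the Postgres JDBC driver jar must be available to Spark)
(predictionData
    .select("text", "label", "prediction")
    .write
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/mydb")
    .option("dbtable", "spam_predictions")
    .option("user", "user")
    .option("password", "password")
    .mode("append")
    .save())
```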
That is the entire ML flow:
The flow above takes in data from a data source like a database or file, converts it to a Pyspark Dataframe, moves it through the pipeline including the model generation, and outputs a Pyspark Dataframe which can be written out to any database or file.
The other post here shows how the Amazon Research Analyst dataset (WIP) can be ingested into the Pyspark environment, the data analysed and encoded, and an ML pipeline built. The prediction result is then written into the database. The dataset is taken from Kaggle, located at this link.