Source code for ssp.spark.streaming.ml.sentiment_analysis_model

#!/usr/bin/env python

__author__ = "Mageswaran Dhandapani"
__copyright__ = "Copyright 2020, The Spark Structured Playground Project"
__credits__ = []
__license__ = "Apache License"
__version__ = "2.0"
__maintainer__ = "Mageswaran Dhandapani"
__email__ = "mageswaran1989@gmail.com"
__status__ = "Education Purpose"

import os

import argparse
import pyarrow as pa
import gin
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import CountVectorizer
from pyspark.ml import Pipeline, PipelineModel
from ssp.logger.pretty_print import print_info
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import NGram, VectorAssembler


# https://github.com/tthustla/setiment_analysis_pyspark/blob/master/Sentiment%20Analysis%20with%20PySpark.ipynb
@gin.configurable
class SentimentSparkModel(object):
    """
    Builds a text classification model for tweet sentiment classification.
    If HDFS details are given, the model is stored in HDFS.

    :param spark: SparkSession
    :param spark_master: Spark master URL
    :param sentiment_dataset_path: Twitter Kaggle sentiment dataset path
    :param model_dir: Model save directory
    :param hdfs_host: HDFS host URL
    :param hdfs_port: HDFS port
    """

    def __init__(self,
                 spark=None,
                 spark_master="spark://IMCHLT276:7077",
                 sentiment_dataset_path="data/dataset/sentiment140/",
                 model_dir="~/ssp/data/model/sentiment/",
                 hdfs_host=None,
                 hdfs_port=None):
        self._spark_master = spark_master
        self._model_dir = os.path.expanduser(model_dir)
        self._hdfs_host, self._hdfs_port = hdfs_host, hdfs_port

        # Load a pre-trained model if one exists, from HDFS or the local disk
        if hdfs_host and hdfs_port:
            self._hdfs_fs = pa.hdfs.connect(hdfs_host, hdfs_port)
            self._is_local_dir = False
            if self._hdfs_fs.exists(self._model_dir):
                self._pre_trained = True
                print_info(f"Loading model...{self._model_dir}")
                self._model = PipelineModel.load(self._model_dir)
            else:
                self._model = None
        else:
            self._is_local_dir = True
            if os.path.exists(self._model_dir):
                self._model = PipelineModel.load(self._model_dir)
            else:
                self._model = None

        if spark:
            self._spark = spark
        else:
            self._spark = SparkSession.builder. \
                appName("twitter_stream"). \
                master(self._spark_master). \
                enableHiveSupport(). \
                getOrCreate()

        self._spark.sparkContext.setLogLevel("error")

        self._pipeline = None
        self._ai_tweets_topicset_path = "file:///" + os.path.abspath(sentiment_dataset_path)
        # Sentiment140 CSV column layout
        self._ai_tweets_topicset_schema = StructType([
            StructField("target", StringType(), False),
            StructField("id", StringType(), False),
            StructField("date", StringType(), False),
            StructField("flag", StringType(), False),
            StructField("user", StringType(), False),
            StructField("text", StringType(), False)
        ])

        self._train_df, self._val_df, self._test_df = None, None, None
    def prepare_data(self):
        df = self._spark.read.csv(self._ai_tweets_topicset_path,
                                  schema=self._ai_tweets_topicset_schema)
        self._train_df, self._val_df, self._test_df = df.randomSplit([0.8, 0.1, 0.1])
        print_info("Train data count : {}".format(self._train_df.count()))
        print_info("Val data count : {}".format(self._val_df.count()))
        print_info("Test data count : {}".format(self._test_df.count()))
    def build_naive_pipeline(self, input_col="text"):
        # Tokenize the text
        tokenizer = Tokenizer(inputCol=input_col, outputCol="words")
        # Count each word and use the count as its weight
        cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='tf')
        # IDF
        idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5)  # minDocFreq: remove sparse terms
        label_string_idx = StringIndexer(inputCol="target", outputCol="label")
        lr = LogisticRegression(maxIter=100)
        self._pipeline = Pipeline(stages=[tokenizer, cv, idf, label_string_idx, lr])
    def build_ngrams_wocs(self, inputcol="text", outputcol="target", n=3):
        tokenizer = [Tokenizer(inputCol=inputcol, outputCol="words")]
        ngrams = [
            NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
            for i in range(1, n + 1)
        ]
        cv = [
            CountVectorizer(vocabSize=5460,
                            inputCol="{0}_grams".format(i),
                            outputCol="{0}_tf".format(i))
            for i in range(1, n + 1)
        ]
        idf = [
            IDF(inputCol="{0}_tf".format(i),
                outputCol="{0}_tfidf".format(i),
                minDocFreq=5)
            for i in range(1, n + 1)
        ]
        assembler = [VectorAssembler(
            inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
            outputCol="features"
        )]
        label_stringIdx = [StringIndexer(inputCol=outputcol, outputCol="label")]
        lr = [LogisticRegression(maxIter=100)]
        return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + lr)
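    # Design note: "wocs" appears to stand for "without ChiSqSelector",
    # following the referenced notebook. The pipeline fans the tokenized text
    # into parallel 1..n-gram TF/IDF branches and concatenates their TF-IDF
    # vectors with a VectorAssembler, so the classifier sees unigram, bigram
    # and trigram evidence in a single "features" vector.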
    def train(self):
        self.prepare_data()
        pipeline = self.build_ngrams_wocs()
        self._train_df.show()  # DataFrame.show() prints and returns None, so call it directly
        self._model = pipeline.fit(self._train_df)
        # TODO
        self._model.write().overwrite().save(self._model_dir)
        return self._model
    def evaluate(self, model):
        # evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
        predictions = model.transform(self._val_df)
        accuracy = predictions.filter(predictions.label == predictions.prediction).count() / \
                   float(predictions.count())
        print_info("Accuracy : {}".format(accuracy))
    def predict(self, df):
        predicted_df = self._model.transform(df)
        return predicted_df
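
# A minimal usage sketch, not part of the original module: it assumes a local
# Spark master ("local[*]") and the default Sentiment140 CSV path from the
# constructor; both are placeholders to adjust for a real deployment.
if __name__ == "__main__":
    model = SentimentSparkModel(spark_master="local[*]",
                                sentiment_dataset_path="data/dataset/sentiment140/")
    trained_model = model.train()    # fit the n-gram pipeline and save it
    model.evaluate(trained_model)    # print accuracy on the validation split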