#!/usr/bin/env python
__author__ = "Mageswaran Dhandapani"
__copyright__ = "Copyright 2020, The Spark Structured Playground Project"
__credits__ = []
__license__ = "Apache License"
__version__ = "2.0"
__maintainer__ = "Mageswaran Dhandapani"
__email__ = "mageswaran1989@gmail.com"
__status__ = "Education Purpose"
import os
import argparse
import pyarrow as pa
import gin
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.ml.feature import IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import CountVectorizer
from pyspark.ml import Pipeline, PipelineModel
from ssp.logger.pretty_print import print_info
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import NGram, VectorAssembler
# https://github.com/tthustla/setiment_analysis_pyspark/blob/master/Sentiment%20Analysis%20with%20PySpark.ipynb
@gin.configurable
class SentimentSparkModel(object):
"""
    Build a text classification model for tweet sentiment classification.
    If HDFS details are given, the model is stored in / loaded from HDFS.

    :param spark: SparkSession instance; a new session is created when None
    :param spark_master: Spark master URL
    :param sentiment_dataset_path: Kaggle Sentiment140 Twitter dataset path
    :param model_dir: Model save directory
    :param hdfs_host: HDFS host URL
    :param hdfs_port: HDFS port
    """
def __init__(self,
spark=None,
spark_master="spark://IMCHLT276:7077",
sentiment_dataset_path="data/dataset/sentiment140/",
model_dir="~/ssp/data/model/sentiment/",
hdfs_host=None,
hdfs_port=None):
self._spark_master = spark_master
self._model_dir = os.path.expanduser(model_dir)
self._hdfs_host, self._hdfs_port = hdfs_host, hdfs_port
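        # Load a previously trained PipelineModel if one exists, either from
        # HDFS (when host/port are given) or from the local filesystem.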
if hdfs_host and hdfs_port:
self._hdfs_fs = pa.hdfs.connect(hdfs_host, hdfs_port)
self._is_local_dir = False
if self._hdfs_fs.exists(self._model_dir):
self._pre_trained = True
print_info(f"Loading model...{self._model_dir}")
self._model = PipelineModel.load(self._model_dir)
else:
self._model = None
else:
self._is_local_dir = True
if os.path.exists(self._model_dir):
self._model = PipelineModel.load(self._model_dir)
else:
self._model = None
if spark:
self._spark = spark
else:
            self._spark = SparkSession.builder \
                .appName("twitter_stream") \
                .master(self._spark_master) \
                .enableHiveSupport() \
                .getOrCreate()
self._spark.sparkContext.setLogLevel("error")
self._pipeline = None
self._ai_tweets_topicset_path = "file:///" + os.path.abspath(sentiment_dataset_path)
self._ai_tweets_topicset_schema = StructType([
StructField("target", StringType(), False),
StructField("id", StringType(), False),
StructField("date", StringType(), False),
StructField("flag", StringType(), False),
StructField("user", StringType(), False),
StructField("text", StringType(), False)
])
self._train_df, self._val_df, self._test_df = None, None, None
    def prepare_data(self):
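        # Read the Sentiment140 CSV with the fixed schema and split it
        # 80/10/10 into train/validation/test sets.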
df = self._spark.read.csv(self._ai_tweets_topicset_path, schema=self._ai_tweets_topicset_schema)
self._train_df, self._val_df, self._test_df = df.randomSplit([0.8, 0.1, 0.1])
print_info("Train data count : {}".format(self._train_df.count()))
print_info("Val data count : {}".format(self._val_df.count()))
print_info("Test data count : {}".format(self._test_df.count()))
    def build_naive_pipeline(self, input_col="text"):
# Tokenize the text
tokenizer = Tokenizer(inputCol=input_col, outputCol="words")
# Count each word and use the count as its weight
cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='tf')
# IDF
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) # minDocFreq: remove sparse terms
label_string_idx = StringIndexer(inputCol="target", outputCol="label")
lr = LogisticRegression(maxIter=100)
self._pipeline = Pipeline(stages=[tokenizer, cv, idf, label_string_idx, lr])
    def build_ngrams_wocs(self, inputcol="text", outputcol="target", n=3):
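        # Build a unigram-to-n-gram TF-IDF pipeline feeding a logistic
        # regression classifier; "wocs" likely stands for "without
        # ChiSqSelector", following the notebook referenced above.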
tokenizer = [Tokenizer(inputCol=inputcol, outputCol="words")]
ngrams = [
NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
for i in range(1, n + 1)
]
cv = [
CountVectorizer(vocabSize=5460, inputCol="{0}_grams".format(i),
outputCol="{0}_tf".format(i))
for i in range(1, n + 1)
]
idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]
assembler = [VectorAssembler(
inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
outputCol="features"
)]
label_stringIdx = [StringIndexer(inputCol=outputcol, outputCol="label")]
lr = [LogisticRegression(maxIter=100)]
return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + lr)
    def train(self):
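        # Fit the n-gram TF-IDF pipeline on the training split and persist
        # the fitted PipelineModel to the configured model directory.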
self.prepare_data()
pipeline = self.build_ngrams_wocs()
        # Preview a few training rows
        self._train_df.show()
self._model = pipeline.fit(self._train_df)
# TODO
self._model.write().overwrite().save(self._model_dir)
return self._model
    def evaluate(self, model):
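        # Report simple accuracy on the validation split: the fraction of
        # rows where the predicted label matches the indexed target label.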
# evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
predictions = model.transform(self._val_df)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())
print_info("Accuracy : {}".format(accuracy))
    def predict(self, df):
predicted_df = self._model.transform(df)
return predicted_df
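
# A minimal usage sketch (an assumption, not part of the original module):
# trains the sentiment model on a local copy of the Sentiment140 dataset and
# evaluates it on the validation split. The default paths mirror the class
# defaults; adjust them for your environment.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Train the tweet sentiment classification model")
    parser.add_argument("--sentiment_dataset_path", default="data/dataset/sentiment140/",
                        help="Path to the Kaggle Sentiment140 CSV dataset")
    parser.add_argument("--model_dir", default="~/ssp/data/model/sentiment/",
                        help="Directory where the trained PipelineModel is saved")
    args = parser.parse_args()

    sentiment_model = SentimentSparkModel(sentiment_dataset_path=args.sentiment_dataset_path,
                                          model_dir=args.model_dir)
    trained_model = sentiment_model.train()
    sentiment_model.evaluate(trained_model)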