Source code for ssp.spark.streaming.analytics.sentiment_analysis

#!/usr/bin/env python

__author__ = "Mageswaran Dhandapani"
__copyright__ = "Copyright 2020, The Spark Structured Playground Project"
__credits__ = []
__license__ = "Apache License"
__version__ = "2.0"
__maintainer__ = "Mageswaran Dhandapani"
__email__ = "mageswaran1989@gmail.com"
__status__ = "Education Purpose"

import argparse
import gin
from pyspark.sql import SparkSession

from ssp.spark.streaming.common.twitter_streamer_base import TwitterStreamerBase
from ssp.spark.streaming.ml import SentimentSparkModel
# from ssp.customudf.textblob_sentiment import textblob_sentiment_analysis_udf

@gin.configurable
class SentimentAnalysis(TwitterStreamerBase):
    """
    Uses the :class:`ssp.spark.streaming.ml.SentimentSparkModel` to classify the stream text

    :param kafka_bootstrap_servers: Kafka bootstrap server(s)
    :param kafka_topic: Kafka topic to listen for
    :param checkpoint_dir: Spark Streaming checkpoint directory
    :param parquet_dir: Parquet directory to read the streamed data
    :param warehouse_location: Spark warehouse location
    :param spark_master: Spark Master URL
    :param is_live_stream: (bool) Use the live stream when True, else the parquet directory
    :param processing_time: (str) Spark Streaming processing trigger time delay
    """

    def __init__(self,
                 kafka_bootstrap_servers="localhost:9092",
                 kafka_topic="ai_tweets_topic",
                 checkpoint_dir="hdfs://localhost:9000/tmp/ssp/data/lake/checkpoint/",
                 parquet_dir="hdfs://localhost:9000/tmp/ssp/data/lake/silver/",
                 warehouse_location="/opt/spark-warehouse/",
                 spark_master="spark://IMCHLT276:7077",
                 is_live_stream=True,
                 processing_time='5 seconds'):
        TwitterStreamerBase.__init__(self,
                                     spark_master=spark_master,
                                     checkpoint_dir=checkpoint_dir,
                                     warehouse_location=warehouse_location,
                                     kafka_bootstrap_servers=kafka_bootstrap_servers,
                                     kafka_topic=kafka_topic,
                                     processing_time=processing_time)
        self._spark_master = spark_master
        self._checkpoint_dir = checkpoint_dir
        self._parquet_dir = parquet_dir
        self._warehouse_location = warehouse_location

        self.spark = self._get_spark()
        self._model = SentimentSparkModel(spark=self.spark)
        self._is_live_stream = is_live_stream

    def online_process(self):
        # Read the live tweet stream from Kafka
        tweet_stream = self._get_source_stream()
        return tweet_stream

    def hdfs_process(self):
        # Infer the schema from the parquet files already on HDFS, then
        # stream-read the same directory with that schema
        userSchema = self.spark.read.parquet(self._parquet_dir).schema
        tweet_stream = self.spark.readStream. \
            schema(userSchema). \
            format("parquet"). \
            option("ignoreChanges", "true"). \
            option("failOnDataLoss", "false"). \
            load(self._parquet_dir)
        return tweet_stream

    def process(self):
        if self._is_live_stream:
            tweet_stream = self.online_process()
        else:
            tweet_stream = self.hdfs_process()

        def foreach_batch_function(df, epoch_id):
            # Transform and write batchDF
            df = self._model.predict(df).select(["text", "prediction"])
            # df = df.withColumn("sentiment", textblob_sentiment_analysis_udf("text"))
            df.show(50, False)

        tweet_stream.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination()
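

# A minimal entry point sketch. The otherwise unused ``argparse`` and ``gin``
# imports above suggest the module is launched with a gin config file; the
# ``--config_file`` flag name and its default path below are assumptions for
# illustration, not part of the original listing.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Tweet sentiment analysis on the Kafka stream")
    parser.add_argument("--config_file",
                        default="config/sentiment_analysis.gin",
                        help="Gin config file that binds SentimentAnalysis parameters")
    args = parser.parse_args()
    # Bind the @gin.configurable parameters of SentimentAnalysis from the config
    gin.parse_config_file(args.config_file)
    SentimentAnalysis().process()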