#!/usr/bin/env python
__author__ = "Mageswaran Dhandapani"
__copyright__ = "Copyright 2020, The Spark Structured Playground Project"
__credits__ = []
__license__ = "Apache License"
__version__ = "2.0"
__maintainer__ = "Mageswaran Dhandapani"
__email__ = "mageswaran1989@gmail.com"
__status__ = "Education Purpose"
import argparse
import gin
from pyspark.sql import SparkSession
from ssp.spark.streaming.common.twitter_streamer_base import TwitterStreamerBase
from ssp.spark.streaming.ml import SentimentSparkModel
# from ssp.customudf.textblob_sentiment import textblob_sentiment_analysis_udf
[docs]@gin.configurable
class SentimentAnalysis(TwitterStreamerBase):
    """
    Uses the :ssp.spark.streaming.ml.SentimentSparkModel to classify the stream text
    :param kafka_bootstrap_servers:
    :param kafka_topic: Kafka topic to listen for
    :param checkpoint_dir: Spark Streaming checkpoint directory
    :param parquet_dir: Parquet directory to read the streamed data
    :param warehouse_location: Spark warehouse location
    :param spark_master: Spark Master URL
    :param is_live_stream: (bool) Use live stream or parquet diretory
    :param processing_time: (bool) Spark Streaming processing trigger time delay
    """
    def __init__(self,
                 kafka_bootstrap_servers="localhost:9092",
                 kafka_topic="ai_tweets_topic",
                 checkpoint_dir="hdfs://localhost:9000/tmp/ssp/data/lake/checkpoint/",
                 parquet_dir="hdfs://localhost:9000/tmp/ssp/data/lake/silver/",
                 warehouse_location="/opt/spark-warehouse/",
                 spark_master="spark://IMCHLT276:7077",
                 is_live_stream=True,
                 processing_time='5 seconds'):
        TwitterStreamerBase.__init__(self,
                                     spark_master=spark_master,
                                     checkpoint_dir=checkpoint_dir,
                                     warehouse_location=warehouse_location,
                                     kafka_bootstrap_servers=kafka_bootstrap_servers,
                                     kafka_topic=kafka_topic,
                                     processing_time=processing_time)
        self._spark_master = spark_master
        self._checkpoint_dir = checkpoint_dir
        self._parquet_dir = parquet_dir
        self._warehouse_location = warehouse_location
        self.spark = self._get_spark()
        self._model = SentimentSparkModel(spark=self.spark)
        self._is_live_stream = is_live_stream
[docs]    def online_process(self):
        tweet_stream = self._get_source_stream()
        return tweet_stream 
[docs]    def hdfs_process(self):
        userSchema = self.spark.read.parquet(self._bronze_parquet_dir).schema
        tweet_stream = self.spark.readStream. \
            
schema(userSchema).\
            
format("parquet"). \
            
option("ignoreChanges", "true"). \
            
option("failOnDataLoss", "false"). \
            
load(self._bronze_parquet_dir)
        return tweet_stream 
[docs]    def process(self):
        if self._is_live_stream:
            tweet_stream = self.online_process()
        else:
            tweet_stream = self.hdfs_process()
        def foreach_batch_function(df, epoch_id):
            # Transform and write batchDF
            df = self._model.predict(df).select(["text", "prediction"])
            # df = df.withColumn("sentiment", textblob_sentiment_analysis_udf("text"))
            df.show(50, False)
        tweet_stream.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination()