ssp.spark.streaming.analytics


class ssp.spark.streaming.analytics.sentiment_analysis.SentimentAnalysis(kafka_bootstrap_servers='localhost:9092', kafka_topic='ai_tweets_topic', checkpoint_dir='hdfs://localhost:9000/tmp/ssp/data/lake/checkpoint/', parquet_dir='hdfs://localhost:9000/tmp/ssp/data/lake/silver/', warehouse_location='/opt/spark-warehouse/', spark_master='spark://IMCHLT276:7077', is_live_stream=True, processing_time='5 seconds')

Bases: ssp.spark.streaming.common.twitter_streamer_base.TwitterStreamerBase

Uses ssp.spark.streaming.ml.SentimentSparkModel to classify the streamed text.

Parameters
  • kafka_bootstrap_servers – (str) Kafka bootstrap servers as host_url:port

  • kafka_topic – (str) Kafka topic to listen to

  • checkpoint_dir – (str) Spark Streaming checkpoint directory

  • parquet_dir – (str) Parquet directory to read the previously streamed data from

  • warehouse_location – (str) Spark warehouse location

  • spark_master – (str) Spark master URL

  • is_live_stream – (bool) Use the live stream or the Parquet directory as input

  • processing_time – (str) Spark Streaming processing trigger interval
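The parameters above split into a source side (live Kafka topic or Parquet directory, chosen by is_live_stream) and a sink side (checkpoint directory and trigger interval). The sketch below shows one plausible mapping of these arguments onto Spark Structured Streaming options. It assumes, without confirmation from this page, that the class feeds them to spark.readStream and writeStream; the helper names stream_source_config and sink_config are hypothetical and not part of ssp. The option keys kafka.bootstrap.servers, subscribe and checkpointLocation are standard Spark option names.

```python
from typing import Dict, Tuple


def stream_source_config(
    kafka_bootstrap_servers: str = "localhost:9092",
    kafka_topic: str = "ai_tweets_topic",
    parquet_dir: str = "hdfs://localhost:9000/tmp/ssp/data/lake/silver/",
    is_live_stream: bool = True,
) -> Tuple[str, Dict[str, str]]:
    """Return a (source format, options) pair for a streaming reader.

    Hypothetical helper, not part of ssp: it only illustrates how the
    constructor arguments could map onto Structured Streaming options.
    """
    if is_live_stream:
        # Standard Spark Kafka source options: broker list and topic subscription.
        return "kafka", {
            "kafka.bootstrap.servers": kafka_bootstrap_servers,
            "subscribe": kafka_topic,
        }
    # Otherwise replay previously streamed data from the Parquet directory.
    return "parquet", {"path": parquet_dir}


def sink_config(
    checkpoint_dir: str = "hdfs://localhost:9000/tmp/ssp/data/lake/checkpoint/",
    processing_time: str = "5 seconds",
) -> Dict[str, str]:
    """Return sink settings (hypothetical helper).

    checkpointLocation is the standard writeStream option; processingTime is
    the interval string one would pass to DataStreamWriter.trigger().
    """
    return {"checkpointLocation": checkpoint_dir, "processingTime": processing_time}


if __name__ == "__main__":
    print(stream_source_config())
    print(stream_source_config(is_live_stream=False))
    print(sink_config())
```

With PySpark available, the pair could be consumed as spark.readStream.format(fmt).options(**opts).load(). Note that streaming file sources such as Parquet normally need an explicit schema via .schema(...) unless schema inference for streaming is enabled.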

hdfs_process()
online_process()
process()
ssp.spark.streaming.analytics.sentiment_analysis_test.test_sentiment_analysis_members()
class ssp.spark.streaming.analytics.trending_hashtags.TrendingHashTags(kafka_bootstrap_servers='localhost:9092', kafka_topic='ai_tweets_topic', checkpoint_dir='hdfs://localhost:9000/tmp/ssp/data/lake/checkpoint/', bronze_parquet_dir='hdfs://localhost:9000/tmp/ssp/data/lake/bronze/', warehouse_location='/opt/spark-warehouse/', spark_master='spark://IMCHLT304:7077', postgresql_host='localhost', postgresql_port='5432', postgresql_database='sparkstreamingdb', postgresql_user='sparkstreaming', postgresql_password='sparkstreaming', processing_time='5 seconds', is_live_stream=True)

Bases: ssp.spark.streaming.common.twitter_streamer_base.TwitterStreamerBase

Extracts hashtags from the streamed text, counts the occurrences of each tag, and writes the counts to a PostgreSQL database table.
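The extract-and-count step described above can be sketched in plain Python for a single micro-batch of tweet texts. This is a minimal illustration, not the class's actual implementation: the function name count_hashtags, the #\w+ pattern, and the lower-casing of tags are all assumptions. In Spark the same aggregation would typically be written with pyspark.sql.functions.explode over a hashtag array column followed by groupBy(...).count().

```python
import re
from collections import Counter
from typing import Iterable

# Assumed hashtag pattern: '#' followed by one or more word characters.
HASHTAG_RE = re.compile(r"#(\w+)")


def count_hashtags(tweets: Iterable[str]) -> Counter:
    """Count occurrences of each hashtag across a batch of tweet texts.

    Hypothetical helper: tags are lower-cased (an assumption) so that
    '#AI' and '#ai' are counted as the same tag.
    """
    counts: Counter = Counter()
    for text in tweets:
        counts.update(tag.lower() for tag in HASHTAG_RE.findall(text or ""))
    return counts


if __name__ == "__main__":
    batch = ["Loving #AI and #ML", "#ai is everywhere", "no tags here"]
    print(count_hashtags(batch).most_common(2))  # [('ai', 2), ('ml', 1)]
```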

Parameters
  • kafka_bootstrap_servers – (str) host_url:port

  • kafka_topic – (str) Live stream Kafka topic

  • checkpoint_dir – (str) Spark Streaming checkpoint directory

  • bronze_parquet_dir – (str) Input stream directory path. For local paths, prefix it with “file:///”

  • warehouse_location – (str) Spark warehouse location

  • spark_master – (str) Spark master url

  • postgresql_host – (str) PostgreSQL host URL

  • postgresql_port – (str) PostgreSQL port

  • postgresql_database – (str) PostgreSQL database name

  • postgresql_user – (str) PostgreSQL user name

  • postgresql_password – (str) PostgreSQL user password

  • processing_time – (str) Spark Streaming processing trigger interval

  • is_live_stream – (bool) Use the live Kafka stream or the streamed Parquet directory as input
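The postgresql_* parameters are the pieces needed to reach a PostgreSQL table from Spark. The sketch below assembles them into options for Spark's built-in JDBC data source (url, dbtable, user, password and driver are standard option names, and org.postgresql.Driver is the PostgreSQL JDBC driver class). Whether TrendingHashTags writes this way is not stated on this page; the helper postgres_jdbc_options and the table name trending_hashtags are hypothetical.

```python
from typing import Dict


def postgres_jdbc_options(
    table: str,
    postgresql_host: str = "localhost",
    postgresql_port: str = "5432",
    postgresql_database: str = "sparkstreamingdb",
    postgresql_user: str = "sparkstreaming",
    postgresql_password: str = "sparkstreaming",
) -> Dict[str, str]:
    """Build options for Spark's JDBC data source (hypothetical helper)."""
    # Standard PostgreSQL JDBC URL layout: jdbc:postgresql://host:port/database
    url = f"jdbc:postgresql://{postgresql_host}:{postgresql_port}/{postgresql_database}"
    return {
        "url": url,
        "dbtable": table,
        "user": postgresql_user,
        "password": postgresql_password,
        "driver": "org.postgresql.Driver",
    }


if __name__ == "__main__":
    print(postgres_jdbc_options("trending_hashtags")["url"])
```

Spark has no streaming JDBC sink, so a common pattern is to call batch_df.write.format("jdbc").options(**opts).mode("append").save() inside a writeStream.foreachBatch callback. This requires the PostgreSQL JDBC driver jar on the Spark classpath.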

process()
ssp.spark.streaming.analytics.trending_hashtags.extract_hashtag(text)

Extracts the Twitter #hashtags from the text.

Parameters
  • text – (str) Twitter text

Returns
  (list) List of hashtags
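A regular-expression sketch of the behavior described above follows, assuming a hashtag is '#' followed by word characters and that the '#' is kept in the returned strings; the real extract_hashtag may tokenize differently. The function is deliberately named extract_hashtag_sketch to mark it as hypothetical. It also returns an empty list for None, since a function used as a Spark UDF can receive nulls.

```python
import re
from typing import List, Optional

# Assumed pattern: '#' followed by one or more word characters (Unicode-aware in Python 3).
_HASHTAG_RE = re.compile(r"#\w+")


def extract_hashtag_sketch(text: Optional[str]) -> List[str]:
    """Return the #hashtags found in text, in order of appearance (sketch)."""
    if not text:
        # Null or empty tweet text yields no hashtags.
        return []
    return _HASHTAG_RE.findall(text)


if __name__ == "__main__":
    print(extract_hashtag_sketch("Reading about #AI and #DeepLearning today"))
    # ['#AI', '#DeepLearning']
```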

ssp.spark.streaming.analytics.trending_hashtags.extract_hashtag_udf(text)

Extracts the Twitter #hashtags from the text.

Parameters
  • text – (str) Twitter text

Returns
  (list) List of hashtags