ssp.spark.streaming.consumer

ssp.spark.streaming.consumer.get_local_spark_master()[source]

SSP module that handles all data ingestion from the Twitter stream

class ssp.spark.streaming.consumer.twiteer_stream_consumer.TwitterDataset(kafka_bootstrap_servers='localhost:9092', kafka_topic='mix_tweets_topic', checkpoint_dir='hdfs://localhost:9000/tmp/ssp/data/lake/checkpoint/', bronze_parquet_dir='hdfs://localhost:9000/tmp/ssp/data/lake/bronze/', warehouse_location='/opt/spark-warehouse/', spark_master='spark://IMCHLT276:7077', postgresql_host='localhost', postgresql_port='5432', postgresql_database='sparkstreamingdb', postgresql_user='sparkstreaming', postgresql_password='sparkstreaming', raw_tweet_table_name_prefix='raw_tweet_dataset', processing_time='5 seconds')[source]

Bases: ssp.spark.streaming.common.twitter_streamer_base.TwitterStreamerBase

Twitter ingestion class

  • Starts Spark Structured Streaming against the configured Kafka topic and dumps the incoming tweet data to HDFS (bronze parquet) and/or PostgreSQL; a construction sketch follows the parameter list below

Parameters
  • kafka_bootstrap_servers – (str) Kafka bootstrap servers

  • kafka_topic – (str) Kafka topic

  • checkpoint_dir – (str) Checkpoint directory for fault tolerance

  • bronze_parquet_dir – (str) Storage path for dumping raw tweet data as parquet (bronze layer)

  • warehouse_location – (str) Spark warehouse location

  • spark_master – (str) Spark master URL

  • postgresql_host – (str) PostgreSQL host

  • postgresql_port – (str) PostgreSQL port

  • postgresql_database – (str) PostgreSQL database name

  • postgresql_user – (str) PostgreSQL user name

  • postgresql_password – (str) PostgreSQL user password

  • raw_tweet_table_name_prefix – (str) PostgreSQL table name prefix

  • processing_time – (str) Processing-time trigger interval (e.g. '5 seconds')
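
A minimal construction sketch, assuming local Kafka, HDFS and PostgreSQL services; every value below is illustrative, and every argument has a default:

    from ssp.spark.streaming.consumer.twiteer_stream_consumer import TwitterDataset

    # Override only what differs from the defaults in your setup.
    dataset = TwitterDataset(
        kafka_bootstrap_servers="localhost:9092",
        kafka_topic="mix_tweets_topic",
        checkpoint_dir="hdfs://localhost:9000/tmp/ssp/data/lake/checkpoint/",
        bronze_parquet_dir="hdfs://localhost:9000/tmp/ssp/data/lake/bronze/",
        spark_master="local[4]",  # illustrative; a spark:// cluster URL also works
        postgresql_host="localhost",
        postgresql_port="5432",
        postgresql_database="sparkstreamingdb",
        postgresql_user="sparkstreaming",
        postgresql_password="sparkstreaming",
        raw_tweet_table_name_prefix="raw_tweet_dataset",
        processing_time="5 seconds",
    )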

check_n_define_table_schema(table_name)[source]
check_n_stop_streaming(topic, query, num_records, raw_tweet_table_name)[source]
dump_into_bronze_lake()[source]
dump_into_postgresql(run_id, num_records=50000)[source]
get_postgresql_connection()[source]
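
The dump_into_* methods start the actual streaming queries. A usage sketch, under the assumption that dump_into_bronze_lake() runs until stopped externally, and that dump_into_postgresql() relies on check_n_stop_streaming() to stop its query once num_records tweets have been stored for the given run (run_id is a caller-chosen identifier):

    # Continuously append the Kafka tweet stream to the HDFS bronze layer.
    dataset.dump_into_bronze_lake()

    # Or stream into PostgreSQL, stopping after ~50000 collected tweets.
    dataset.dump_into_postgresql(run_id=0, num_records=50000)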
ssp.spark.streaming.consumer.twiteer_stream_consumer.check_n_mk_dirs(path, is_remove=False)[source]
ssp.spark.streaming.consumer.twiteer_stream_consumer.check_table_exists(table_name, schema='public')[source]
ssp.spark.streaming.consumer.twiteer_stream_consumer.filter_possible_ai_tweet(text)[source]
ssp.spark.streaming.consumer.twiteer_stream_consumer.filter_possible_ai_tweet_udf(text)
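
filter_possible_ai_tweet() screens a single tweet's text, and filter_possible_ai_tweet_udf wraps the same check as a Spark UDF for DataFrame columns. A sketch, assuming the UDF yields a boolean column (the sample data below is made up):

    from pyspark.sql import SparkSession

    from ssp.spark.streaming.consumer.twiteer_stream_consumer import (
        filter_possible_ai_tweet,
        filter_possible_ai_tweet_udf,
    )

    # Plain-Python check on a single string.
    is_ai = filter_possible_ai_tweet("Training a new transformer model today")

    # Same check applied column-wise to keep only AI-related tweets.
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([("deep learning is fun",)], ["text"])
    ai_tweets = df.filter(filter_possible_ai_tweet_udf(df["text"]))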
ssp.spark.streaming.consumer.twiteer_stream_consumer.get_create_table_sql(table_name)[source]
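
The remaining helpers cover filesystem and PostgreSQL table bootstrapping. A sketch, assuming check_table_exists() picks up the module's PostgreSQL connection settings and with an illustrative table name:

    from ssp.spark.streaming.consumer.twiteer_stream_consumer import (
        check_n_mk_dirs,
        check_table_exists,
        get_create_table_sql,
    )

    # Recreate a local scratch directory, removing any stale copy first.
    check_n_mk_dirs("/tmp/ssp/scratch", is_remove=True)

    # Build the CREATE TABLE statement only when the table is missing
    # from the public schema.
    if not check_table_exists("raw_tweet_dataset_0", schema="public"):
        create_sql = get_create_table_sql("raw_tweet_dataset_0")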
ssp.spark.streaming.consumer.twiteer_stream_consumer.get_raw_dataset(path)[source]

Combines all parquet files under the given path into a single dataset.

Parameters
  • path – (str) Folder path
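
A one-line usage sketch, assuming the combined dataset comes back as a Spark DataFrame:

    from ssp.spark.streaming.consumer.twiteer_stream_consumer import get_raw_dataset

    # Read every parquet file under the bronze layer as one DataFrame.
    raw_df = get_raw_dataset("hdfs://localhost:9000/tmp/ssp/data/lake/bronze/")
    raw_df.show(5)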
