Source code for ssp.spark.streaming.consumer.twiteer_stream_consumer
#!/usr/bin/env python
"""
SSP module that handles all data at the ingestion level from the Twitter stream
"""
__author__ = "Mageswaran Dhandapani"
__copyright__ = "Copyright 2020, The Spark Structured Playground Project"
__credits__ = []
__license__ = "Apache License"
__version__ = "2.0"
__maintainer__ = "Mageswaran Dhandapani"
__email__ = "mageswaran1989@gmail.com"
__status__ = "Education Purpose"
import os
import shutil
import gin
import time
import pandas as pd
import glob
from tqdm import tqdm
import psycopg2
# import socket
import threading
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from ssp.logger.pretty_print import print_info
from ssp.utils.ai_key_words import AIKeyWords
from ssp.spark.streaming.common.twitter_streamer_base import TwitterStreamerBase
# Twitter Filter Tags
def get_create_table_sql(table_name):
    """
    Builds the CREATE TABLE statement for the raw tweet table.

    :param table_name: Postgresql table name
    :return: SQL string
    """
    sql_str = """
    CREATE TABLE IF NOT EXISTS {} (
        id_str VARCHAR (100) NOT NULL,
        created_at VARCHAR (100) NOT NULL,
        source VARCHAR (50) NOT NULL,
        text VARCHAR (2048) NOT NULL,
        expanded_url VARCHAR (1024),
        media_url_https VARCHAR (1024),
        hash VARCHAR (512));
    """.format(table_name)
    return sql_str

def check_table_exists(table_name, schema="public"):
    """
    Builds a `to_regclass` query that returns NULL when the table does not exist.
    """
    sql_str = """
    SELECT to_regclass('{}.{}') as table;
    """.format(schema, table_name)
    return sql_str

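# Hedged usage sketch (not part of the original module): the two helpers above only
# build SQL strings; executing them is left to the caller. With a psycopg2 connection
# (connection parameters below are assumptions for illustration, matching the class
# defaults further down):
#
#   conn = psycopg2.connect(host="localhost", port="5432",
#                           database="sparkstreamingdb",
#                           user="sparkstreaming", password="sparkstreaming")
#   cur = conn.cursor()
#   cur.execute(check_table_exists("raw_tweet_dataset_0"))
#   exists = cur.fetchone()[0] is not None  # to_regclass returns NULL when missing
#   if not exists:
#       cur.execute(get_create_table_sql("raw_tweet_dataset_0"))
#       conn.commit()
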
def check_n_mk_dirs(path, is_remove=False):
    """
    Creates the directory if missing; with `is_remove`, wipes and recreates it.
    """
    if os.path.exists(path):
        if is_remove:
            shutil.rmtree(path)
            os.makedirs(path)
    else:
        os.makedirs(path)

def get_raw_dataset(path):
    """
    Combines all parquet files in the given path into one DataFrame

    :param path: Folder path
    :return: pandas DataFrame
    """
    all_files = glob.glob(path + "/*.parquet")
    files = []
    for filename in tqdm(all_files):
        df = pd.read_parquet(filename, engine="fastparquet")
        files.append(df)
    df = pd.concat(files, axis=0, ignore_index=True)
    return df

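# Hedged usage sketch (not part of the original module): combine the bronze-lake
# parquet dumps into a single pandas DataFrame for offline inspection. The path must
# be locally readable (e.g. copied out of HDFS with `hdfs dfs -get` first); the path
# shown is an assumption based on the defaults used below.
#
#   bronze_df = get_raw_dataset("/tmp/ssp/data/lake/bronze")
#   print(bronze_df.shape)
#   print(bronze_df["text"].head())
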
def filter_possible_ai_tweet(text):
    """
    Returns 1 when the text contains any positive AI keyword, else 0.
    """
    text = text.replace("#", "").replace("@", "")
    for tag in AIKeyWords.POSITIVE.split("|"):
        if f' {tag.lower()} ' in f' {text.lower()} ':
            return 1
    return 0

filter_possible_ai_tweet_udf = udf(filter_possible_ai_tweet, IntegerType())
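# Hedged usage sketch (not part of the original module): the UDF can be applied to a
# batch or streaming DataFrame that has a `text` column; `tweet_df` and the output
# column name `possible_ai_tweet` are assumptions for illustration.
#
#   tagged_df = tweet_df.withColumn("possible_ai_tweet",
#                                   filter_possible_ai_tweet_udf("text"))
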
@gin.configurable
class TwitterDataset(TwitterStreamerBase):
    """
    Twitter ingestion class \n
    - Starts the Spark Structured Streaming against the Kafka topic and dumps the data to HDFS / Postgresql

    :param kafka_bootstrap_servers: (str) Kafka bootstrap server
    :param kafka_topic: (str) Kafka topic
    :param checkpoint_dir: (str) Checkpoint directory for fault tolerance
    :param bronze_parquet_dir: (str) Store path to dump tweet data
    :param warehouse_location: Spark warehouse location
    :param spark_master: Spark Master URL
    :param postgresql_host: Postgresql host
    :param postgresql_port: Postgresql port
    :param postgresql_database: Postgresql database
    :param postgresql_user: Postgresql user name
    :param postgresql_password: Postgresql user password
    :param raw_tweet_table_name_prefix: Postgresql table name prefix
    :param processing_time: Processing time trigger
    """
    def __init__(self,
                 kafka_bootstrap_servers="localhost:9092",
                 kafka_topic="mix_tweets_topic",
                 checkpoint_dir="hdfs://localhost:9000/tmp/ssp/data/lake/checkpoint/",
                 bronze_parquet_dir="hdfs://localhost:9000/tmp/ssp/data/lake/bronze/",
                 warehouse_location="/opt/spark-warehouse/",
                 spark_master="spark://IMCHLT276:7077",
                 postgresql_host="localhost",
                 postgresql_port="5432",
                 postgresql_database="sparkstreamingdb",
                 postgresql_user="sparkstreaming",
                 postgresql_password="sparkstreaming",
                 raw_tweet_table_name_prefix="raw_tweet_dataset",
                 processing_time='5 seconds'):
        TwitterStreamerBase.__init__(self,
                                     spark_master=spark_master,
                                     checkpoint_dir=checkpoint_dir,
                                     warehouse_location=warehouse_location,
                                     kafka_bootstrap_servers=kafka_bootstrap_servers,
                                     kafka_topic=kafka_topic,
                                     processing_time=processing_time)
        self._spark_master = spark_master
        self._kafka_bootstrap_servers = kafka_bootstrap_servers
        self._checkpoint_dir = checkpoint_dir
        self._bronze_parquet_dir = bronze_parquet_dir
        self._warehouse_location = warehouse_location
        self._postgresql_host = postgresql_host
        self._postgresql_port = postgresql_port
        self._postgresql_database = postgresql_database
        self._postgresql_user = postgresql_user
        self._postgresql_password = postgresql_password
        self._broad_cast_raw_tweet_count = self._get_spark().sparkContext.accumulator(0)
        self._kafka_topic = kafka_topic
        self._kafka_ai_topic = "ai_tweets_topic"
        self._raw_tweet_table_name_prefix = raw_tweet_table_name_prefix
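    # Hedged note (assumption, not from the original source): because the class is
    # decorated with @gin.configurable, the constructor arguments above can also be
    # bound from a gin config file instead of being passed explicitly, e.g.:
    #
    #   TwitterDataset.kafka_bootstrap_servers = "localhost:9092"
    #   TwitterDataset.kafka_topic = "mix_tweets_topic"
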
    def dump_into_bronze_lake(self):
        self.structured_streaming_dump(path=self._bronze_parquet_dir)

    def get_postgresql_connection(self):
        return psycopg2.connect(host=self._postgresql_host,
                                port=self._postgresql_port,
                                database=self._postgresql_database,
                                user=self._postgresql_user,
                                password=self._postgresql_password)

    def check_n_define_table_schema(self, table_name):
        conn = self.get_postgresql_connection()
        # Create the table if it does not exist yet
        try:
            cur = conn.cursor()
            # Execute the CREATE TABLE statement
            cur.execute(get_create_table_sql(table_name=table_name))
            conn.commit()
            cur.close()
        except Exception as e:
            print_info("Postgresql Error! {}".format(e))

    def check_n_stop_streaming(self, topic, query, num_records, raw_tweet_table_name):
        while True:
            conn = self.get_postgresql_connection()
            try:
                count = pd.read_sql(f"select count(*) from {raw_tweet_table_name}", conn)["count"][0]
            except Exception:
                count = 0
            finally:
                conn.close()
            try:
                print_info(query.lastProgress)
            except Exception:
                pass  # print_info("No stream progress")
            if count > num_records:
                print_info(f"Number of records received so far in topic {topic} is {count}")
                query.stop()
                break
            else:
                print_info(f"Number of records received so far in topic {topic} is {count}")
                time.sleep(1)

    def _dump_into_postgresql_internal(self, run_id, kafka_topic, num_records=25000):
        """
        Dumps the live tweet stream into the Postgresql DB

        :param run_id: Run id used to suffix the Postgresql table name
        :param kafka_topic: Kafka topic to consume
        :param num_records: Number of records after which the stream is stopped
        :return: None
        """
        tweet_stream = self._get_source_stream(kafka_topic)
        raw_tweet_table_name = self._raw_tweet_table_name_prefix + "_{}".format(run_id)

        def postgresql_all_tweets_data_dump(df,
                                            epoch_id,
                                            raw_tweet_table_name):
            # DROP TABLE IF EXISTS ssp_raw_tweet_dataset_0 CASCADE;
            # Transform and write batchDF
            print_info("Raw Tweets...")
            df.select(["text"]).show(50, False)
            mode = "append"
            url = "jdbc:postgresql://{}:{}/{}".format(self._postgresql_host,
                                                      self._postgresql_port,
                                                      self._postgresql_database)
            properties = {"user": self._postgresql_user,
                          "password": self._postgresql_password,
                          "driver": "org.postgresql.Driver"}
            df.write.jdbc(url=url, table=raw_tweet_table_name, mode=mode, properties=properties)

        query = tweet_stream.writeStream.outputMode("append"). \
            foreachBatch(lambda df, batch_id:
                         postgresql_all_tweets_data_dump(df=df,
                                                         epoch_id=batch_id,
                                                         raw_tweet_table_name=raw_tweet_table_name)). \
            trigger(processingTime=self._processing_time). \
            start()

        monitor_thread = threading.Thread(target=self.check_n_stop_streaming,
                                          args=(kafka_topic, query, num_records, raw_tweet_table_name,))
        monitor_thread.daemon = True
        monitor_thread.start()
        query.awaitTermination()
        monitor_thread.join()

    def dump_into_postgresql(self, run_id, num_records=50000):
        # First fill roughly half the records from the mixed topic, then top up from the AI topic
        self._dump_into_postgresql_internal(run_id=run_id, kafka_topic=self._kafka_topic, num_records=num_records // 2)
        self._dump_into_postgresql_internal(run_id=run_id, kafka_topic=self._kafka_ai_topic, num_records=num_records)
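
# Hedged usage sketch (not part of the original module), assuming Kafka, HDFS and
# Postgresql are reachable at the default addresses configured above:
if __name__ == "__main__":
    dataset = TwitterDataset()
    # Either stream raw tweets into the HDFS bronze lake...
    dataset.dump_into_bronze_lake()
    # ...or dump a bounded number of tweets into Postgresql for a given run id:
    # dataset.dump_into_postgresql(run_id=0, num_records=50000)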