Source code for ssp.spark.streaming.common.twitter_streamer_base

#!/usr/bin/env python

__author__ = "Mageswaran Dhandapani"
__copyright__ = "Copyright 2020, The Spark Structured Playground Project"
__credits__ = []
__license__ = "Apache License"
__version__ = "2.0"
__maintainer__ = "Mageswaran Dhandapani"
__email__ = "mageswaran1989@gmail.com"
__status__ = "Education Purpose"

from pyspark.sql import SparkSession

import re

from ssp.logger.pretty_print import print_error, print_info
from ssp.spark.streaming.common.streamer_base import StreamerBase
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
from pyspark.sql.functions import col, isnull
from pyspark.sql.functions import sha2, udf
from pyspark.sql.functions import from_json, regexp_replace

def pick_text(text, rtext, etext):
    """Pick the fullest available text variant from a tweet payload,
    preferring the extended tweet text, then the retweeted status text,
    then the raw (possibly truncated) text."""
    ret = ""
    if etext:
        ret = etext
    elif rtext:
        ret = rtext
    elif text:
        ret = text
    else:
        ret = ""
    return re.sub("\n|\r", "", ret).strip()
pick_text_udf = udf(pick_text, StringType())
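
# A minimal sanity sketch (plain Python, no Spark session needed) of the
# precedence pick_text implements: the extended tweet's full text wins over
# the retweeted status' full text, which wins over the possibly truncated
# raw text. The sample strings are illustrative assumptions, not real tweets.
if __name__ == "__main__":
    assert pick_text("truncated raw", None, "the full extended text") == "the full extended text"
    assert pick_text("truncated raw", "the retweet full text", None) == "the retweet full text"
    assert pick_text("plain text\r\n", None, None) == "plain text"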
class TwitterStreamerBase(StreamerBase):
    """
    Base class for consuming a Twitter stream from Kafka with Spark Structured Streaming.

    :param spark_master: Spark master URL
    :param checkpoint_dir: Checkpoint directory for the streaming query
    :param warehouse_location: Spark SQL warehouse location
    :param kafka_bootstrap_servers: Kafka bootstrap server(s), as host:port
    :param kafka_topic: Kafka topic to subscribe to
    :param processing_time: Trigger interval, e.g. '5 seconds'
    """
    def __init__(self,
                 spark_master,
                 checkpoint_dir,
                 warehouse_location,
                 kafka_bootstrap_servers,
                 kafka_topic,
                 processing_time='5 seconds'):
        StreamerBase.__init__(self,
                              spark_master=spark_master,
                              checkpoint_dir=checkpoint_dir,
                              warehouse_location=warehouse_location,
                              kafka_bootstrap_servers=kafka_bootstrap_servers,
                              kafka_topic=kafka_topic,
                              processing_time=processing_time)

    @staticmethod
    def _get_schema():
        # Define the schema for just the fields we are interested in.
        # https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/tweet-object
        urls = ArrayType(StructType(). \
            add("expanded_url", StringType(), True))
        media = ArrayType(StructType(). \
            add("media_url", StringType(), True). \
            add("media_url_https", StringType(), True))
        # Tweet -> Entities{} -> Urls[] / Media[]
        entities = StructType(). \
            add("urls", urls, True). \
            add("media", media)
        schema = StructType(). \
            add('id_str', StringType(), False). \
            add('created_at', StringType(), False). \
            add('source', StringType(), False). \
            add('text', StringType(), False). \
            add('extended_tweet', StructType().add("full_text", StringType(), True), True). \
            add('entities', entities, False). \
            add('retweeted_status', StructType().add('extended_tweet', StructType(). \
                add("full_text", StringType(), True), True). \
                add('user', StructType().add('description', StringType())), True). \
            add('geo', StringType(), True). \
            add('retweet_count', IntegerType(), True)
        return schema

    def _get_source_stream(self, kafka_topic="mix_tweets_topic"):
        print_info("\n\n------------------------------------------------------------------------------------------\n\n")
        print_info(f"\t\t\t Kafka topic is {kafka_topic}")
        print_info("\n\n------------------------------------------------------------------------------------------\n\n")
        spark = self._get_spark()

        # Read the tweets from the Kafka topic.
        tweet_stream = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", self._kafka_bootstrap_servers) \
            .option("subscribe", kafka_topic) \
            .option("startingOffsets", "latest") \
            .option("failOnDataLoss", "false") \
            .load()

        tweet_stream.printSchema()

        # - Cast the incoming Kafka value to a string
        # - Parse it with Spark's built-in JSON parser against our schema
        # - Extract the fields we need and strip HTML tags from `source`
        # - Pick the best available `text` variant from the tweet
        # - Add a SHA-256 hash column over the text
        # - Filter out rows with a null id column
        # Watermark: how late data can arrive and still be considered for
        # aggregation. Processing time: how often to emit updates; generally
        # handled on the writeStream side.
        tweet_df = tweet_stream. \
            selectExpr("cast (value as STRING)"). \
            select(from_json("value", TwitterStreamerBase._get_schema()).alias("tweet")). \
            select(col("tweet.id_str"),
                   col("tweet.created_at"),
                   col("tweet.source"),
                   col("tweet.text"),
                   col("tweet.extended_tweet.full_text").alias("etext"),
                   col("tweet.retweeted_status.extended_tweet.full_text").alias("rtext"),
                   col("tweet.entities.urls.expanded_url"),
                   col("tweet.entities.media.media_url_https")). \
            withColumn("source", regexp_replace("source", "<[^>]*>", "")). \
            withColumn("text", pick_text_udf(col("text"), col("rtext"), col("etext"))). \
            withColumn("hash", sha2("text", 256)). \
            drop("rtext", "etext"). \
            where(~isnull(col("id_str")))
        # TODO https://stackoverflow.com/questions/45474270/how-to-expire-state-of-dropduplicates-in-structured-streaming-to-avoid-oom
        # withWatermark("timestamp", "10 minutes"). \
        # dropDuplicates("id_str")

        return tweet_df
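
# A hypothetical usage sketch, not part of the original module: a subclass
# that consumes the parsed tweet stream and dumps each micro-batch to the
# console sink. The topic name reuses the default of _get_source_stream and
# the trigger interval mirrors the constructor's processing_time default;
# all settings here are assumptions for illustration.
class ConsoleTwitterStreamer(TwitterStreamerBase):
    def dump_into_console(self):
        # Kafka -> JSON parse -> cleaned/hashed tweet rows, as built above.
        tweet_df = self._get_source_stream(kafka_topic="mix_tweets_topic")
        # Print each micro-batch to stdout; append mode emits only new rows.
        query = tweet_df.writeStream \
            .format("console") \
            .outputMode("append") \
            .option("truncate", "false") \
            .trigger(processingTime="5 seconds") \
            .start()
        query.awaitTermination()

# Example invocation (paths and broker address are placeholders):
#   ConsoleTwitterStreamer(spark_master="local[4]",
#                          checkpoint_dir="/tmp/ssp/checkpoint",
#                          warehouse_location="/tmp/ssp/warehouse",
#                          kafka_bootstrap_servers="localhost:9092",
#                          kafka_topic="mix_tweets_topic").dump_into_console()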