Source code for ssp.kafka.producer.twitter_producer

#!/usr/bin/env python

__author__ = "Mageswaran Dhandapani"
__copyright__ = "Copyright 2020, The Spark Structured Playground Project"
__credits__ = []
__license__ = "Apache License"
__version__ = "2.0"
__maintainer__ = "Mageswaran Dhandapani"
__email__ = "mageswaran1989@gmail.com"
__status__ = "Education Purpose"

import json
import re
# import socket
import threading
import time

import gin
from kafka import KafkaProducer
from tweepy.auth import OAuthHandler
from tweepy.streaming import Stream
from tweepy.streaming import StreamListener

from ssp.utils.ai_key_words import AIKeyWords
from ssp.logger.pretty_print import print_error, print_info

def pick_text(text, rtext, etext):
    """
    Choose the richest tweet text that is available and normalise it.

    Twitter JSON payloads can carry the tweet body at three levels. The first
    truthy candidate in the priority order ``etext`` > ``rtext`` > ``text`` is
    used; when none of them is truthy an empty string is used instead.

    :param text: Plain text at top level of the Json, with stripped content and an URL
    :param rtext: Retweeted full text
    :param etext: Extended tweet full text
    :return: The chosen text with all newline and carriage-return characters
             removed and surrounding whitespace stripped (always a str)
    """
    candidates = (etext, rtext, text)
    chosen = next((candidate for candidate in candidates if candidate), "")
    return re.sub("\n|\r", "", chosen).strip()
class TweetsListener(StreamListener):
    """
    Tweepy StreamListener that prints each tweet's text and forwards the raw
    stream message to a Kafka topic.

    Reference: http://docs.tweepy.org/en/latest/streaming_how_to.html

    :param kafka_addr: (str) Kafka host address <host_url:port>
    :param topic: (str) Kafka topic
    :param is_ai: (bool) Used to differentiate AI tweets with green color and red for other category tweets
    """

    def __init__(self, kafka_addr='localhost:9092', topic='ai_tweets_topic', is_ai=False):
        StreamListener.__init__(self)
        # Kafka settings
        self._kafka_producer = KafkaProducer(bootstrap_servers=kafka_addr)
        self._kafka_topic = topic
        self._is_ai = is_ai

    @staticmethod
    def _extract_texts(data_dict):
        """
        Pull the three candidate text levels out of a decoded stream message.

        :param data_dict: Decoded Json message from the Twitter stream
        :return: Tuple ``(text, rtext, etext)``; a level that is absent is None
        """
        text = data_dict.get("text")
        # `or {}` also guards against an explicit null value for the nested objects
        etext = (data_dict.get("extended_tweet") or {}).get("full_text")
        retweeted = data_dict.get("retweeted_status") or {}
        rtext = (retweeted.get("extended_tweet") or {}).get("full_text")
        return text, rtext, etext

    def on_data(self, data):
        """
        Gets triggered by the Twitter stream API for every raw message.

        Prints the best available text (see :func:`pick_text`) in green for AI
        tweets and red otherwise, then sends the unmodified raw message to the
        configured Kafka topic.

        :param data: Tweet Json data (str)
        :return: True, so that tweepy keeps the stream open
        """
        data_dict = json.loads(data)
        text, rtext, etext = self._extract_texts(data_dict)
        text = pick_text(text=text, rtext=rtext, etext=etext)

        if self._is_ai:
            print_info(text)
        else:
            print_error(text)

        # this is where the data is dumped into the Kafka topic; waiting on the
        # send future (max 10 s) makes delivery errors surface here
        self._kafka_producer.send(self._kafka_topic, data.encode('utf-8')).get(timeout=10)
        return True

    def on_error(self, status):
        """
        Handle a non-200 HTTP status reported by the tweepy stream.

        tweepy invokes ``on_error`` (the old ``if_error`` name was never called,
        so errors fell through to the inherited handler, which disconnects).
        Returning a non-False value asks tweepy to reconnect with its own backoff.

        :param status: HTTP status code received from Twitter
        :return: True, to keep the stream alive
        """
        print(status)
        return True

    # Backward-compatible alias for the original (misnamed) handler.
    if_error = on_error
@gin.configurable
class TwitterProducer(object):
    """
    Twitter data ingestion.
    Gets the twitter stream data and dumps the data into Kafka topic(s).

    :param twitter_consumer_key: (str) Twitter Consumer Key
    :param twitter_consumer_secret: (str) Twitter Consumer secret
    :param twitter_access_token: (str) Twitter Access token
    :param twitter_access_secret: (str) Twitter Access secret
    :param kafka_address: (str) Kafka host address <host_url:port>
    :param kafka_topic_1: (str) Tweet stream Kafka topic defaults to use :func:`~ssp.utils.AIKeyWords.POSITIVE`
    :param kafka_topic_2: (str) Tweet stream Kafka topic
    :param topic_2_filter_words: (list) Filter words to be used for second stream;
        when None, :meth:`run` fills it with ``AIKeyWords.ALL.split("|")``
    """

    def __init__(self,
                 twitter_consumer_key=None,
                 twitter_consumer_secret=None,
                 twitter_access_token=None,
                 twitter_access_secret=None,
                 kafka_address='localhost:9092',
                 kafka_topic_1='ai_tweets_topic',
                 kafka_topic_2='mix_tweets_topic',
                 topic_2_filter_words=None):
        self._twitter_consumer_key = twitter_consumer_key
        self._twitter_consumer_secret = twitter_consumer_secret
        self._twitter_access_token = twitter_access_token
        self._twitter_access_secret = twitter_access_secret

        self._kafka_addr = kafka_address
        self._kafka_topic_1 = kafka_topic_1
        self._kafka_topic_2 = kafka_topic_2
        self._topic_2_filter_words = topic_2_filter_words

    def _twitter_kafka_stream(self, kafka_topic, keywords, is_ai=False,
                              initial_retry_delay=1.0, max_retry_delay=60.0):
        """
        Stream tweets matching ``keywords`` into ``kafka_topic``, reconnecting forever.

        Whenever the stream ends (``filter`` returns) or raises, the error is
        logged and the stream is restarted after an exponential backoff delay
        (doubling from ``initial_retry_delay`` up to ``max_retry_delay`` seconds).
        The delay is reset once a session has stayed up longer than
        ``max_retry_delay`` seconds.

        :param kafka_topic: (str) Kafka topic the raw tweets are written to
        :param keywords: (list) Keywords passed to the Twitter ``track`` filter
        :param is_ai: (bool) Colour flag forwarded to :class:`TweetsListener`
        :param initial_retry_delay: (float) First reconnect delay in seconds
        :param max_retry_delay: (float) Upper bound for the reconnect delay in seconds
        :return: Never returns
        """
        auth = OAuthHandler(self._twitter_consumer_key, self._twitter_consumer_secret)
        auth.set_access_token(self._twitter_access_token, self._twitter_access_secret)

        print_info("\n\n---------------------------------------------------------------------------------\n\n")
        print_info(f"Kafka topic : {kafka_topic}")
        print_info(f"Twitter Keywords : {keywords}")
        print_info("\n\n---------------------------------------------------------------------------------\n\n")

        listener = None
        retry_delay = initial_retry_delay
        while True:
            started = time.monotonic()
            try:
                # Reuse one listener (and its KafkaProducer) across reconnects instead of
                # leaking a fresh producer on every restart; build it lazily inside the
                # try so a broker that is not up yet is retried rather than fatal.
                if listener is None:
                    listener = TweetsListener(kafka_addr=self._kafka_addr,
                                              topic=kafka_topic,
                                              is_ai=is_ai)
                twitter_stream = Stream(auth, listener)
                # https://developer.twitter.com/en/docs/tweets/filter-realtime/api-reference/post-statuses-filter
                # https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters
                twitter_stream.filter(track=keywords, languages=["en"])
            except Exception as e:
                print_error(f"Error: Restarting the twitter stream ({e!r})")

            # Reaching here means the connection is gone (returned or raised).
            if time.monotonic() - started > max_retry_delay:
                # The last session was healthy; start the backoff over.
                retry_delay = initial_retry_delay
            time.sleep(retry_delay)
            retry_delay = min(retry_delay * 2, max_retry_delay)

    def run(self, stream="both"):
        """
        Start the Twitter -> Kafka producer thread(s) and block until they finish.

        :param stream: (str) ``"topic1"`` streams ``AIKeyWords.POSITIVE`` keywords into
            ``kafka_topic_1``; ``"topic2"`` streams ``topic_2_filter_words`` into
            ``kafka_topic_2``; any other value (default ``"both"``) starts both.
        :return: None
        """
        if self._topic_2_filter_words is None:
            self._topic_2_filter_words = AIKeyWords.ALL.split("|")

        ai_args = (self._kafka_topic_1, AIKeyWords.POSITIVE.split("|"), True)
        non_ai_args = (self._kafka_topic_2, self._topic_2_filter_words)

        if stream == "topic1":
            selected = [ai_args]
        elif stream == "topic2":
            selected = [non_ai_args]
        else:
            selected = [ai_args, non_ai_args]

        # daemon=True replaces the deprecated Thread.setDaemon(True)
        threads = [threading.Thread(target=self._twitter_kafka_stream, args=args, daemon=True)
                   for args in selected]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()