ssp.kafka.producer


class ssp.kafka.producer.twitter_producer.TweetsListener(kafka_addr='localhost:9092', topic='ai_tweets_topic', is_ai=False)

Bases: tweepy.streaming.StreamListener

Tweepy StreamListener. Reference: http://docs.tweepy.org/en/latest/streaming_how_to.html

Parameters
  • kafka_addr – (str) Kafka host address <host_url:port>

  • topic – (str) Kafka topic

  • is_ai – (bool) Used to distinguish AI tweets, shown in green, from tweets of other categories, shown in red

if_error(status)

on_data(data)

Gets triggered by the Twitter stream API.

Parameters
  • data – Tweet JSON data

Returns
  Dumps the data into the Kafka topic
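The forwarding step described above can be sketched as follows. This is a minimal illustration, not the package's actual implementation: forward_tweet and RecordingProducer are hypothetical names, and the producer is assumed to be any object that exposes a send(topic, value) method, so that the sketch runs without a Kafka broker.

```python
import json


class RecordingProducer:
    """Hypothetical stand-in for a Kafka client; it only records what is sent."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        # Store the (topic, payload) pair instead of contacting a broker.
        self.sent.append((topic, value))


def forward_tweet(producer, topic, data):
    """Check that data is valid JSON, then forward it unchanged to the topic.

    Returns True if the payload was forwarded and False if it was skipped.
    """
    try:
        json.loads(data)  # Validation only; the raw string is what gets sent.
    except ValueError:
        return False
    # Assumption: the payload is sent as UTF-8 encoded bytes.
    producer.send(topic, data.encode("utf-8"))
    return True


producer = RecordingProducer()
forward_tweet(producer, "ai_tweets_topic", '{"text": "hello"}')
```

Skipping malformed payloads rather than raising is a design choice of this sketch; the real listener may handle errors differently.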

class ssp.kafka.producer.twitter_producer.TwitterProducer(twitter_consumer_key=None, twitter_consumer_secret=None, twitter_access_token=None, twitter_access_secret=None, kafka_address='localhost:9092', kafka_topic_1='ai_tweets_topic', kafka_topic_2='mix_tweets_topic', topic_2_filter_words=None)

Bases: object

Twitter data ingestion. Reads the Twitter stream and dumps the data into one or more Kafka topics.

Parameters
  • twitter_consumer_key – (str) Twitter Consumer Key

  • twitter_consumer_secret – (str) Twitter Consumer secret

  • twitter_access_token – (str) Twitter Access token

  • twitter_access_secret – (str) Twitter Access secret

  • kafka_address – (str) Kafka host address <host_url:port>

  • kafka_topic_1 – (str) Tweet stream Kafka topic defaults to use POSITIVE()

  • kafka_topic_2 – (str) Tweet stream Kafka topic

  • topic_2_filter_words – (list) Filter words to be used for second stream

run(stream='both')

Starts two Kafka producers.

Returns
  None

ssp.kafka.producer.twitter_producer.pick_text(text, rtext, etext)

Twitter JSON data has three levels of text. This function picks whichever is available, in the order etext > rtext > text.

Parameters
  • text – Plain text at the top level of the JSON, with stripped content and a URL

  • rtext – Retweeted full text

  • etext – Extended retweeted full text
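The selection order etext > rtext > text can be illustrated with a small sketch. The function name pick_text_sketch is hypothetical, and treating None or an empty string as "not available" is an assumption; the actual pick_text may test availability differently.

```python
from typing import Optional


def pick_text_sketch(
    text: Optional[str], rtext: Optional[str], etext: Optional[str]
) -> Optional[str]:
    """Return the first available text in the order etext > rtext > text."""
    for candidate in (etext, rtext, text):
        # Assumption: None and empty strings both count as "not available".
        if candidate:
            return candidate
    return None


# The extended text wins whenever it is present.
picked = pick_text_sketch("short text https://t.co/x", "retweeted text", "extended text")
```

If only the top-level text is present, the sketch falls back to it, so a caller always gets the most complete version of the tweet that the payload carries.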