ssp.posgress


class ssp.posgress.dataset_base.PostgresqlConnection(postgresql_host='localhost', postgresql_port='5432', postgresql_database='sparkstreamingdb', postgresql_user='sparkstreaming', postgresql_password='sparkstreaming')

Bases: object

PostgreSQL utility class for reading and writing tables and executing queries.

Parameters
  • postgresql_host – Postgresql Host address

  • postgresql_port – Postgresql port number

  • postgresql_database – Postgresql database name

  • postgresql_user – Postgresql user name

  • postgresql_password – Postgresql password
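
The five parameters above map naturally onto a SQLAlchemy-style database URL. The following is a minimal sketch of that mapping, not the class's actual implementation: build_postgresql_url is a hypothetical helper, and only the parameter names and default values are taken from the signature above.

```python
from urllib.parse import quote_plus


def build_postgresql_url(
    postgresql_host="localhost",
    postgresql_port="5432",
    postgresql_database="sparkstreamingdb",
    postgresql_user="sparkstreaming",
    postgresql_password="sparkstreaming",
):
    """Hypothetical helper: assemble a postgresql:// URL from the documented defaults."""
    # Percent-encode the credentials so characters such as '@' or ':'
    # cannot be mistaken for URL separators.
    user = quote_plus(postgresql_user)
    password = quote_plus(postgresql_password)
    return (
        f"postgresql://{user}:{password}"
        f"@{postgresql_host}:{postgresql_port}/{postgresql_database}"
    )


print(build_postgresql_url())
```

A URL of this shape is what sqlalchemy.create_engine accepts; whether PostgresqlConnection builds its connection this way is an assumption.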

get_sqlalchemy_connection()
Returns

SQLAlchemy connection to the PostgreSQL database

get_sqlalchemy_session()

get_table(table_name)

Returns the PostgreSQL table as a Pandas DataFrame.

Parameters

table_name – Name of the table to read

Returns

Pandas DataFrame

get_tables_list(table_schema='public')
Parameters

table_schema – Postgresql schema. Default is public

Returns

List of tables in the given schema
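
In PostgreSQL, the tables of a schema are usually listed through the standard information_schema.tables view. The sketch below shows one way such a lookup could be phrased; tables_list_query is a hypothetical helper, the %(name)s placeholder style assumes a psycopg2-compatible driver, and none of this is confirmed to be what get_tables_list runs internally.

```python
def tables_list_query(table_schema="public"):
    """Hypothetical helper: return (sql, params) that list the tables of one schema."""
    sql = (
        "SELECT table_name FROM information_schema.tables "
        "WHERE table_schema = %(table_schema)s "
        "ORDER BY table_name"
    )
    # Passing the schema as a bound parameter avoids building SQL by string concatenation.
    return sql, {"table_schema": table_schema}


sql, params = tables_list_query()
print(sql)
print(params)
```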

query_to_df(query)

run_query(query)

store_df_as_parquet(df, path, overwrite=False)

Stores the DataFrame as a Parquet file.

Parameters
  • df – Pandas DataFrame

  • path – Local machine path

Returns

None

to_posgresql_table(df, table_name, schema='public', if_exists='fail')

Stores the DataFrame as a PostgreSQL table.

Parameters
  • df – Pandas DataFrame

  • table_name – Name of the table

  • if_exists – {'fail', 'replace', 'append'}, default 'fail'

How to behave if the table already exists.

  • fail: Raise a ValueError.

  • replace: Drop the table before inserting new values.

  • append: Insert new values to the existing table.
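
The three if_exists modes can be illustrated without a running PostgreSQL server. The sketch below mimics them on an in-memory SQLite database from the standard library; write_rows is a hypothetical stand-in rather than the library's method, and the two-column layout (text, slabel) is only an illustrative schema.

```python
import sqlite3


def write_rows(conn, table_name, rows, if_exists="fail"):
    """Hypothetical stand-in that mirrors the three if_exists modes on SQLite."""
    if if_exists not in ("fail", "replace", "append"):
        raise ValueError(f"'{if_exists}' is not valid for if_exists")
    exists = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        (table_name,),
    ).fetchone() is not None
    if exists and if_exists == "fail":
        # fail: refuse to touch an existing table.
        raise ValueError(f"Table '{table_name}' already exists.")
    if exists and if_exists == "replace":
        # replace: drop the table so it is recreated below.
        conn.execute(f'DROP TABLE "{table_name}"')
        exists = False
    if not exists:
        conn.execute(f'CREATE TABLE "{table_name}" ("text" TEXT, "slabel" INTEGER)')
    # append (and the first write): insert into the current table.
    conn.executemany(f'INSERT INTO "{table_name}" VALUES (?, ?)', rows)


conn = sqlite3.connect(":memory:")
write_rows(conn, "tweets", [("hello", 1)])
write_rows(conn, "tweets", [("world", 0)], if_exists="append")
write_rows(conn, "tweets", [("fresh", 1)], if_exists="replace")
print(conn.execute('SELECT * FROM "tweets"').fetchall())
```

After the final replace call only the last row remains, because the earlier table was dropped before the insert.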


class ssp.posgress.dataset_base.PostgresqlDatasetBase(text_column='text', label_output_column='slabel', raw_tweet_table_name_prefix='raw_tweet_dataset', postgresql_host='localhost', postgresql_port='5432', postgresql_database='sparkstreamingdb', postgresql_user='sparkstreaming', postgresql_password='sparkstreaming')

Bases: ssp.posgress.dataset_base.PostgresqlConnection

get_latest_raw_dataset_name(version=0)

Returns the name of the raw tweet table for a specific version.

Parameters

version – (int) Run id/version used while dumping the data with ssp.spark.consumer.TwitterDataset

Returns

(str) Name of the table with version

get_processed_datasets(version=0)

get_raw_dump_tables_list()

Returns the list of raw data tables dumped by ssp.spark.consumer.TwitterDataset.

Returns

List of strings, e.g. [raw_tweet_dataset_0, raw_tweet_dataset_1]
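
The example names suggest that raw tables follow a "<prefix>_<version>" pattern, with the prefix given by raw_tweet_table_name_prefix. The sketch below shows how such names could be built and picked out of a table list; both helpers are hypothetical, and the exact matching rule used by the class is an assumption.

```python
import re


def raw_dataset_name(version=0, prefix="raw_tweet_dataset"):
    """Hypothetical helper: table name for one run id/version."""
    return f"{prefix}_{version}"


def filter_raw_dump_tables(table_names, prefix="raw_tweet_dataset"):
    """Hypothetical helper: keep '<prefix>_<int>' names, ordered by version number."""
    pattern = re.compile(rf"^{re.escape(prefix)}_(\d+)$")
    versioned = []
    for name in table_names:
        match = pattern.match(name)
        if match:
            versioned.append((int(match.group(1)), name))
    # Sort numerically so that version 10 comes after version 2.
    return [name for _, name in sorted(versioned)]


tables = ["users", "raw_tweet_dataset_1", "raw_tweet_dataset_0"]
print(raw_dataset_name(1))
print(filter_raw_dump_tables(tables))
```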

split_dataset_table(version=0)

ssp.posgress.dataset_base.main(argv)