from pyspark.sql.functions import udf
import requests
import ast
from pyspark.sql.types import *
# Interesting use case: what if the endpoint goes down and the
# stream has to be recomputed for the failed cases?
from ssp.logger.pretty_print import print_info
from ssp.logger.pretty_print import print_error
def get_ner(text, url):
    """Call the NER REST endpoint and return the extracted entity tags.

    :param text: Input sentence to tag.
    :param url: Full URL of the NER service endpoint.
    :return: List of ``(entity_label, word)`` string tuples. On an HTTP
             error status the sentinel ``[("URL_ERROR", "URL_ERROR")]``
             is returned instead.
    :raises requests.exceptions.ConnectionError: when the host is
            unreachable. Deliberately NOT caught here so that callers
            (see ``__main__``) can fall back to an alternate URL.
    """
    payload = {'text': text}
    tags = []
    try:
        response = requests.post(url=url, json=payload)
        response.raise_for_status()
        # "res" carries a string repr of a dict; parse it with
        # ast.literal_eval instead of eval() — never eval() data that
        # came over the network.
        entities = ast.literal_eval(response.json()["res"])
        for key, value in entities.items():
            tags.append((str(key), str(value)))
    except requests.exceptions.HTTPError as err:
        print(err)
        # Tuple shape matches `schema` (ner, word) so Spark can still
        # serialize the error sentinel through the UDF return type.
        tags = [("URL_ERROR", "URL_ERROR")]
    return tags
# Spark return type of the NER UDF: an array of (ner, word) string pairs.
_ner_struct_fields = [
    StructField("ner", StringType(), False),
    StructField("word", StringType(), False),
]
schema = ArrayType(StructType(_ner_struct_fields))
# TODO fixed for now, expose/configure the URL through gin config
def get_ner_udf(is_docker):
    """Build a Spark UDF that tags a text column via the NER REST service.

    :param is_docker: When True, reach the service through the Docker
                      host gateway (``host.docker.internal``); otherwise
                      use the local loopback address.
    :return: A pyspark ``udf`` returning ``schema``
             (array of (ner, word) structs).
    """
    # Only the host differs between the two environments — compute the
    # URL once instead of duplicating the udf construction per branch.
    host = "host.docker.internal" if is_docker else "127.0.0.1"
    url = f"http://{host}:30123/text/ner/spacy"
    return udf(lambda x: get_ner(text=x, url=url), schema)
if __name__ == "__main__":
    # Smoke test: hit the Docker-gateway URL first; if that host is
    # unreachable, fall back to the loopback address.
    SAMPLE_TEXT = "Wow! this is Wednesday night now and here the lazy Mageswaran coding me"
    try:
        URL = "http://host.docker.internal:30123/text/ner/spacy"
        print_info(f"Trying URL : {URL} ")
        # sending get request and saving the response as response object
        data = get_ner(text=SAMPLE_TEXT, url=URL)
        print(data)
    except Exception:
        # `except Exception`, not bare `except:` — a bare except would
        # also swallow KeyboardInterrupt/SystemExit.
        print_error("Failed!")
        URL = "http://127.0.0.1:30123/text/ner/spacy"
        print_info(f"Trying URL : {URL} ")
        # sending get request and saving the response as response object
        data = get_ner(text=SAMPLE_TEXT, url=URL)
        print(data)