Source code for ssp.spark.udf.spacy_ner_udf

import ast

import requests
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, StructField, StringType


# Interesting use case: what if the endpoint goes down and
# the streaming job has to recompute the failed cases?
from ssp.logger.pretty_print import print_info
from ssp.logger.pretty_print import print_error


def get_ner(text, url):
    """Call the spaCy NER REST endpoint and return the result as a list of string pairs."""
    data = {'text': text}
    tags = []
    try:
        r = requests.post(url=url, json=data)
        r.raise_for_status()
        # The service returns the result as a stringified dict under "res";
        # parse it safely with ast.literal_eval instead of eval()
        data = ast.literal_eval(r.json()["res"])
        for key, value in data.items():
            tags.append((str(key), str(value)))
    except requests.exceptions.HTTPError as err:
        print(err)
        # Sentinel value returned when the endpoint call fails
        tags = ["URL_ERROR"]
    return tags
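
# Illustration only (an assumed response shape, not taken from the service docs):
# if the endpoint answered with
#   {"res": "{'PERSON': 'Mageswaran', 'DATE': 'Wednesday'}"}
# then get_ner() would return [('PERSON', 'Mageswaran'), ('DATE', 'Wednesday')],
# i.e. one (key, value) pair per entity, matching the (ner, word) schema below.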
# Spark return type for the UDF: an array of (ner, word) string structs
schema = ArrayType(StructType([
    StructField("ner", StringType(), False),
    StructField("word", StringType(), False)
]))


# TODO: the URL is fixed for now; expose/configure it through gin config
def get_ner_udf(is_docker):
    # Pick the endpoint host depending on whether Spark runs inside Docker
    if is_docker:
        url = "http://host.docker.internal:30123/text/ner/spacy"
    else:
        url = "http://127.0.0.1:30123/text/ner/spacy"
    return udf(lambda x: get_ner(text=x, url=url), schema)
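
# Usage sketch (hypothetical; the DataFrame, the column name "text" and a reachable
# NER service on port 30123 are assumptions, not part of this module):
#
#   from pyspark.sql import SparkSession
#   spark = SparkSession.builder.appName("spacy-ner-udf-demo").getOrCreate()
#   df = spark.createDataFrame([("Mageswaran was coding on Wednesday night",)], ["text"])
#   ner_udf = get_ner_udf(is_docker=False)
#   df.withColumn("ner_tags", ner_udf("text")).show(truncate=False)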
if __name__ == "__main__": try: URL = "http://host.docker.internal:30123/text/ner/spacy" print_info(f"Trying URL : {URL} ") # sending get request and saving the response as response object data = get_ner(text="Wow! this is Wednesday night now and here the lazy Mageswaran coding me", url=URL) print(data) except: print_error("Failed!") URL = "http://127.0.0.1:30123/text/ner/spacy" print_info(f"Trying URL : {URL} ") # sending get request and saving the response as response object data = get_ner(text="Wow! this is Wednesday night now and here the lazy Mageswaran coding me", url=URL) print(data)