Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import requests | |
| import isort | |
| import black | |
| import flair | |
| import time | |
| from bs4 import BeautifulSoup | |
| import re | |
| import numpy as np | |
| import os | |
| from flair.data import Sentence | |
| from flair.models import SequenceTagger | |
| from transformers import AutoTokenizer, AutoModelForQuestionAnswering, pipeline | |
| import string | |
| import textwrap | |
| import tweepy | |
| import gradio as gr | |
| URL = "https://www.formula1.com/content/fom-website/en/latest/all.xml" | |
| api_key = os.environ['api_key'] | |
| secret_api_key = os.environ['secret_api_key'] | |
| access_token = os.environ['access_token'] | |
| secret_access_token = os.environ['secret_access_token'] | |
| bearer_token = os.environ['bearer_token'] | |
| def get_xml(url): | |
| # xpath is only for formula1 | |
| # use urllib.parse to check for formula1.com website or other news | |
| xml = pd.read_xml(url,xpath='channel/item') | |
| return xml | |
| cols_list = ['title', 'description', 'link', 'creator', 'guid'] | |
| previous_xml = pd.DataFrame(columns=cols_list) | |
| # care taken to only consider results where there are more words not a single word quotes | |
| def extract_quote(string): | |
| # Use the re.findall function to extract the quoted text | |
| results = re.findall(r'[β\"](.*?)[β\"]', string) | |
| quotes = [] | |
| for result in results: | |
| split_result = result.split() | |
| if len(split_result) >3: | |
| quotes.append(result) | |
| return quotes | |
| def get_names(text): | |
| # # load the NER tagger | |
| tagger = SequenceTagger.load('ner') | |
| sentence = Sentence(text) | |
| tagger.predict(sentence) | |
| names = [] | |
| for label in sentence.get_labels('ner'): | |
| if label.value == "PER": | |
| names.append(f"{label.data_point.text}") | |
| # convert to a set to remove some of the repetitions | |
| names = list(set(names)) | |
| return names | |
| def get_text(new_articles_df): | |
| """ | |
| quotes outputs a list of quotes | |
| """ | |
| dfs_dict = {} | |
| for article in new_articles_df.iterrows(): | |
| link = article[1]["guid"] | |
| request = requests.get(link) | |
| soup = BeautifulSoup(request.content, "html.parser") | |
| # class_ below will be different for different websites | |
| s = soup.find("div", class_="col-lg-8 col-xl-7 offset-xl-1 f1-article--content") | |
| lines = s.find_all("p") | |
| text_content = pd.DataFrame(data={"text": []}) | |
| for i, line in enumerate(lines): | |
| df = pd.DataFrame(data={"text": [line.text]}) | |
| text_content = pd.concat([text_content, df], ignore_index=True) | |
| strongs = s.find_all("strong") | |
| strong_content = pd.DataFrame(data={"text": []}) | |
| for i, strong in enumerate(strongs): | |
| if i > 0: | |
| df = pd.DataFrame(data={"text": [strong.text]}) | |
| strong_content = pd.concat([strong_content, df], ignore_index=True) | |
| # df has content | |
| df = text_content[~text_content["text"].isin(strong_content["text"])].reset_index( | |
| drop=True | |
| ) | |
| # df["quote"] = df["text"].apply(lambda row: extract_quote(row)) | |
| # # combine all rows into context | |
| context = "" | |
| for i,row in df.iterrows(): | |
| context += f" {row['text']}" | |
| quotes = extract_quote(context) | |
| # to save some time not computing unnecessary NER | |
| if len(quotes) != 0: | |
| speakers = get_names(context) | |
| else: | |
| speakers = () | |
| dfs_dict[link] = {'context':context, 'quotes':quotes, 'speakers':speakers} | |
| return dfs_dict | |
| def load_speaker_model(): | |
| model_name = f"deepset/xlm-roberta-large-squad2" | |
| tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| model = AutoModelForQuestionAnswering.from_pretrained(model_name) | |
| question_answerer = pipeline("question-answering", model=model, tokenizer=tokenizer) | |
| return question_answerer | |
| question_answerer = load_speaker_model() | |
| def remove_punctuations(text): | |
| modified_text = "".join([character for character in text if character not in string.punctuation]) | |
| modified_text = modified_text.lstrip(" ") | |
| modified_text = modified_text.rstrip(" ") | |
| return modified_text | |
| def get_speaker_quotes(dfs_dict, question_answerer): | |
| speaker_quote = [] | |
| for link in dfs_dict: | |
| context = dfs_dict[link]['context'] | |
| quotes = dfs_dict[link]['quotes'] | |
| potential_speakers = dfs_dict[link]['speakers'] | |
| if len(quotes) != 0: | |
| #loop through the list of quotes | |
| for quote in quotes: | |
| # max_seq_len == 384 : https://huggingface.co/deepset/roberta-base-squad2 | |
| full_quote = quote | |
| if len(quote) >380: | |
| quote = quote[:384] | |
| speaker_dict = question_answerer(question=f"Who said '{quote}'?", context=context) | |
| speaker = speaker_dict['answer'] | |
| if len(speaker) >0: | |
| speaker = remove_punctuations(speaker_dict['answer']) | |
| if speaker not in potential_speakers: | |
| speaker = "" | |
| quote = "" | |
| else: | |
| pair = {'speaker':speaker, 'quote': quote, 'source':link} | |
| speaker_quote.append(pair) | |
| return speaker_quote | |
| def post_to_twitter(): | |
| twitter_api_key = api_key | |
| twitter_secret_api_key = secret_api_key | |
| twitter_access_token = access_token | |
| twitter_secret_access_token = secret_access_token | |
| twitter_bearer_token = bearer_token | |
| api = tweepy.Client(bearer_token=twitter_bearer_token, consumer_key=twitter_api_key, | |
| consumer_secret=twitter_secret_api_key, access_token=twitter_access_token, | |
| access_token_secret=twitter_secret_access_token,wait_on_rate_limit=True | |
| ) | |
| #tweet = api.create_tweet(text=post_title, in_reply_to_tweet_id=in_reply_to_tweet_id) | |
| return api | |
| def split_near_space(string, max_length): | |
| # Split the string into lines based on the maximum line width, breaking only at spaces | |
| lines = textwrap.wrap(string, width=max_length,) | |
| return lines | |
| def send_tweets(speaker_quote): | |
| for i, pair in enumerate(speaker_quote): | |
| speaker = pair['speaker'] | |
| quote = pair['quote'] | |
| source = pair['source'] | |
| total_tweet_length = len(speaker) + len(quote) + 10 # 10 is for emojis and #f1 hashtag | |
| tweet_text = f"π£οΈ | {speaker}: '{quote}'" | |
| api = post_to_twitter() | |
| if total_tweet_length < 280: | |
| try: | |
| first_tweet = api.create_tweet(text=tweet_text, ) | |
| first_tweet_id = first_tweet.data['id'] | |
| second_tweet = api.create_tweet(text=f"Source: {source}", in_reply_to_tweet_id=first_tweet_id) | |
| except: | |
| continue | |
| else: | |
| quotes_list = split_near_space(quote, 280 - len(speaker) -10) | |
| thread_id = None | |
| try: | |
| for i, quote in enumerate(quotes_list): | |
| tweet_text = f"'...{quote}...'" | |
| if i == 0: | |
| tweet_text = f"π£οΈ | {speaker}: '{quote}...'" | |
| if i ==len(quotes_list) -1: | |
| tweet_text = f"'...{quote}'" | |
| recent_tweet = api.create_tweet(text=tweet_text, in_reply_to_tweet_id=thread_id) | |
| thread_id = recent_tweet.data['id'] | |
| last_tweet = api.create_tweet(text=f"Source: {source}", in_reply_to_tweet_id=thread_id) | |
| except: | |
| continue | |
| def check_updates(every=300): | |
| while True: | |
| time.sleep(every) | |
| latest_xml = get_xml(URL) | |
| if ~previous_xml.equals(latest_xml): | |
| print('New articles found') | |
| new_articles_df = latest_xml[~latest_xml["guid"].isin(previous_xml["guid"])] | |
| # loops through new articles and gets the necessary text, quotes and speakers | |
| dfs_dict = get_text(new_articles_df) | |
| speaker_quote = get_speaker_quotes(dfs_dict, question_answerer) | |
| send_tweets(speaker_quote) | |
| else: | |
| print('No New article is found') | |
| demo = gr.Interface(fn=check_updates, inputs="number", outputs="text", analytics_enabled=True) | |
| demo.launch(max_threads=1, show_api=False) | |