| | import pandas as pd |
| | from datasets import Dataset |
| | from transformers import pipeline, GPT2Tokenizer |
| | from sentence_transformers import SentenceTransformer, util |
| |
|
| | |
| | filename = "output_country_details.txt" |
| | retrieval_model_name = 'output/sentence-transformer-finetuned/' |
| | gpt2_model_name = "gpt2" |
| | csv_file_path = "train_dataset.csv" |
| | output_csv_file_path = "updated_train_dataset.csv" |
| | val_csv_file_path = "val_dataset.csv" |
| | output_val_csv_file_path = "updated_val_csv.csv" |
| |
|
| | tokenizer = GPT2Tokenizer.from_pretrained(gpt2_model_name) |
| |
|
| | |
| | try: |
| | retrieval_model = SentenceTransformer(retrieval_model_name) |
| | gpt_model = pipeline("text-generation", model=gpt2_model_name) |
| | print("Models loaded successfully.") |
| | except Exception as e: |
| | print(f"Failed to load models: {e}") |
| |
|
| | def load_and_preprocess_text(filename): |
| | """ |
| | Load and preprocess text data from a file. |
| | |
| | Parameters: |
| | - filename (str): Path to the text file. |
| | |
| | Returns: |
| | - list[str]: A list of preprocessed text segments. |
| | """ |
| | try: |
| | with open(filename, 'r', encoding='utf-8') as file: |
| | segments = [line.strip() for line in file if line.strip()] |
| | print("Text loaded and preprocessed successfully.") |
| | return segments |
| | except Exception as e: |
| | print(f"Failed to load or preprocess text: {e}") |
| | return [] |
| |
|
| | segments = load_and_preprocess_text(filename) |
| |
|
| | def find_relevant_segment(user_query, segments): |
| | """ |
| | Find the most relevant text segment based on a user query. |
| | |
| | Parameters: |
| | - user_query (str): The user's query. |
| | - segments (list[str]): List of text segments to search within. |
| | |
| | Returns: |
| | - str: The most relevant text segment. |
| | """ |
| | try: |
| | query_embedding = retrieval_model.encode(user_query) |
| | segment_embeddings = retrieval_model.encode(segments) |
| | similarities = util.pytorch_cos_sim(query_embedding, segment_embeddings)[0] |
| | best_idx = similarities.argmax() |
| | return segments[best_idx] |
| | except Exception as e: |
| | print(f"Error finding relevant segment: {e}") |
| | return "" |
| |
|
| | def generate_response(question): |
| | """ |
| | Generate a response to a given question by finding a relevant text segment and |
| | using it to generate a more complete answer. |
| | |
| | Parameters: |
| | - question (str): The user's question. |
| | |
| | Returns: |
| | - str: Generated response. |
| | """ |
| | relevant_segment = find_relevant_segment(question, segments) |
| | return generate_response_with_context(question, relevant_segment) |
| |
|
| | def generate_response_with_context(user_query, relevant_segment): |
| | """ |
| | Generate a response based on a user query and a relevant segment. |
| | |
| | Parameters: |
| | - user_query (str): The user's query. |
| | - relevant_segment (str): A relevant fact or detail. |
| | |
| | Returns: |
| | - str: Formatted response incorporating the relevant segment. |
| | """ |
| | try: |
| | prompt = f"Thank you for your question! Here is an additional fact about your topic: {relevant_segment}" |
| | max_tokens = len(tokenizer(prompt)['input_ids']) + 50 |
| | response = gpt_model(prompt, max_length=max_tokens, temperature=0.25)[0]['generated_text'] |
| | return clean_up_response(response, relevant_segment) |
| | except Exception as e: |
| | print(f"Error generating response: {e}") |
| | return "" |
| |
|
| | def clean_up_response(response, segment): |
| | """ |
| | Clean up the generated response to ensure it is tidy and presentable. |
| | |
| | Parameters: |
| | - response (str): The initial response generated by the model. |
| | - segment (str): The segment used to generate the response. |
| | |
| | Returns: |
| | - str: A cleaned and formatted response. |
| | """ |
| | sentences = response.split('.') |
| | cleaned_sentences = [sentence.strip() for sentence in sentences if sentence.strip() and sentence.strip() not in segment] |
| | cleaned_response = '. '.join(cleaned_sentences).strip() |
| | if cleaned_response and not cleaned_response.endswith((".", "!", "?")): |
| | cleaned_response += "." |
| | return cleaned_response |
| |
|
| | def process_dataset(csv_file_path, output_csv_file_path): |
| | """ |
| | Process the dataset by generating responses and evaluating their similarities. |
| | |
| | Parameters: |
| | - csv_file_path (str): Path to the CSV file containing the dataset. |
| | - output_csv_file_path (str): Path where the updated dataset will be saved. |
| | |
| | Prints: |
| | - Path to the saved results and the average similarity score. |
| | """ |
| | df = pd.read_csv(csv_file_path) |
| | dataset = Dataset.from_pandas(df) |
| | updated_dataset = add_model_answers(dataset) |
| | similarities = evaluate_similarity(updated_dataset) |
| | updated_dataset = updated_dataset.add_column("similarity", similarities) |
| | results_df = updated_dataset.to_pandas() |
| | results_df.to_csv(output_csv_file_path, index=False) |
| | average_similarity = sum(similarities) / len(similarities) if similarities else 0 |
| | print(f"Results saved to {output_csv_file_path}") |
| | print(f"Average Similarity Score: {average_similarity:.3f}") |
| |
|
| | def add_model_answers(dataset): |
| | """ |
| | Add generated answers to the dataset. |
| | |
| | Parameters: |
| | - dataset (datasets.Dataset): The Hugging Face dataset object. |
| | |
| | Returns: |
| | - datasets.Dataset: Updated dataset with added answers. |
| | """ |
| | answers = [generate_response(q) for q in dataset['Question']] |
| | dataset = dataset.add_column("Answer", answers) |
| | return dataset |
| |
|
| | def evaluate_similarity(dataset): |
| | """ |
| | Evaluate the similarity of generated answers against ground truth answers. |
| | |
| | Parameters: |
| | - dataset (datasets.Dataset): The dataset containing both answers and ground truths. |
| | |
| | Returns: |
| | - list[float]: List of similarity scores. |
| | """ |
| | similarities = [util.pytorch_cos_sim(retrieval_model.encode(ans), retrieval_model.encode(gt))[0][0].item() |
| | for ans, gt in zip(dataset['Answer'], dataset['GroundTruth'])] |
| | return similarities |
| |
|
| | |
| | process_dataset(csv_file_path, output_csv_file_path) |
| | process_dataset(val_csv_file_path, output_val_csv_file_path) |
| |
|