Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| from sentence_transformers import SentenceTransformer | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import numpy as np | |
| from huggingface_hub import upload_file, hf_hub_download, InferenceClient | |
| from flask import Flask, request, jsonify | |
| import time | |
# Point the Hugging Face / sentence-transformers cache variables at a single
# directory under /tmp, then make sure that directory and the output
# directory both exist.
_CACHE_ROOT = "/tmp/.cache"
for _cache_env_var in ("HF_HOME", "HF_DATASETS_CACHE", "SENTENCE_TRANSFORMERS_HOME"):
    os.environ[_cache_env_var] = _CACHE_ROOT
for _scratch_dir in (_CACHE_ROOT, "/tmp/outputs"):
    os.makedirs(_scratch_dir, exist_ok=True)

# Sentence encoder shared by every similarity lookup in this module.
embedding_model = SentenceTransformer("paraphrase-mpnet-base-v2")

# HF_TOKEN wins when set; NEW_PUP_AI_Project is the fallback variable.
token = os.getenv("HF_TOKEN") or os.getenv("NEW_PUP_AI_Project")

# Client for the hosted instruct model used by the dev-mode answer path.
inference_client = InferenceClient(
    token=token,
    model="mistralai/Mixtral-8x7B-Instruct-v0.1",
)
# dataset.json is expected to sit in the same directory as this script.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATASET_PATH = os.path.join(BASE_DIR, "dataset.json")

with open(DATASET_PATH, "r") as dataset_file:
    dataset = json.load(dataset_file)

# Parallel lists: questions[i] and answers[i] both come from dataset[i].
questions = [entry["question"] for entry in dataset]
answers = [entry["answer"] for entry in dataset]

# Encode every dataset question once at import time so that each incoming
# query only needs a single encode call of its own.
question_embeddings = embedding_model.encode(
    questions,
    convert_to_tensor=True,
)
# Files under /tmp/outputs that hold feedback records and free-text comments.
feedback_path = "/tmp/outputs/feedback.json"
COMMENTS_PATH = "/tmp/outputs/Comments.json"

# In-memory feedback store. feedback_embeddings stays None until there is at
# least one feedback question to encode.
feedback_data = []
feedback_questions = []
feedback_embeddings = None

# Last dev-mode flag received by the chat endpoint.
dev_mode = {"enabled": False}

# Seed the comments file with an empty JSON list the first time through so
# that later reads always find a file to parse.
if not os.path.exists(COMMENTS_PATH):
    with open(COMMENTS_PATH, "w") as comments_file:
        comments_file.write(json.dumps([], indent=4))
# Startup sync: pull feedback.json from the Hugging Face dataset repo and
# rebuild the in-memory feedback store from it.
#
# Everything is staged in temporaries and only published to the module-level
# names once the download, parse and encode steps have all succeeded. That
# keeps feedback_data, feedback_questions and feedback_embeddings describing
# the same records: if the embeddings were populated while feedback_data was
# reset to [], a later similarity lookup would index into an empty list.
try:
    hf_token = os.getenv("NEW_PUP_AI_Project")
    downloaded_path = hf_hub_download(
        repo_id="oceddyyy/University_Inquiries_Feedback",
        filename="feedback.json",
        repo_type="dataset",
        token=hf_token,
        # huggingface_hub is imported before HF_HOME is assigned in this
        # file, so name the cache directory explicitly rather than relying
        # on the environment variable being picked up.
        cache_dir="/tmp/.cache",
    )
    with open(downloaded_path, "r") as f:
        _remote_feedback = json.load(f)
    _remote_questions = [item["question"] for item in _remote_feedback]
    _remote_embeddings = None
    if _remote_questions:
        _remote_embeddings = embedding_model.encode(_remote_questions, convert_to_tensor=True)
except Exception as e:
    print(f"[Startup] Feedback not loaded from Hugging Face. Using local only. Reason: {e}")
    # Reset all three pieces of state together so they cannot disagree.
    feedback_data = []
    feedback_questions = []
    feedback_embeddings = None
else:
    feedback_data = _remote_feedback
    feedback_questions = _remote_questions
    feedback_embeddings = _remote_embeddings
    # Mirroring to local disk is best-effort: a write failure must not throw
    # away feedback that was downloaded and encoded successfully.
    try:
        with open(feedback_path, "w") as f_local:
            json.dump(feedback_data, f_local, indent=4)
    except OSError as e:
        print(f"[Startup] Could not write local feedback copy: {e}")
def upload_file_to_hf(local_path, remote_filename):
    """Push a local file into the feedback dataset repo on Hugging Face.

    Args:
        local_path: Path of the file on disk to upload.
        remote_filename: Name the file should have inside the repo.

    Raises:
        ValueError: If the NEW_PUP_AI_Project environment variable is unset
            or empty.

    Errors raised by the upload itself are caught and printed, not re-raised.
    """
    hf_token = os.environ.get("NEW_PUP_AI_Project")
    if hf_token:
        upload_kwargs = {
            "path_or_fileobj": local_path,
            "path_in_repo": remote_filename,
            "repo_id": "oceddyyy/University_Inquiries_Feedback",
            "repo_type": "dataset",
            "token": hf_token,
        }
        try:
            upload_file(**upload_kwargs)
            print(f"{remote_filename} uploaded to Hugging Face successfully.")
        except Exception as e:
            print(f"Error uploading {remote_filename} to HF: {e}")
        return
    raise ValueError("Hugging Face token not found in environment variables!")
def _field(container, name):
    """Read ``name`` from a dict-style or attribute-style object (None if absent)."""
    if isinstance(container, dict):
        return container.get(name)
    return getattr(container, name, None)


def _extract_llm_text(llm_response):
    """Return the generated text from an inference-client result, or "" if none is found."""
    # A plain string result is already the generated text.
    if isinstance(llm_response, str):
        return llm_response
    # Chat-style result: choices[0].message.content, as dict keys or attributes.
    choices = _field(llm_response, "choices")
    if choices:
        message = _field(choices[0], "message")
        content = _field(message, "content") if message is not None else None
        if isinstance(content, str) and content:
            return content
    generated = _field(llm_response, "generated_text")
    return generated if isinstance(generated, str) else ""


def _format_handbook_answer(matched_item, matched_a):
    """Prefix a handbook answer with its date when the item carries one, else the default citation."""
    if "month" in matched_item and "year" in matched_item:
        return f"As of {matched_item['month']}, {matched_item['year']}, {matched_a}"
    return f"According to 2019 Proposed PUP Handbook, {matched_a}"


def _lookup_feedback_match(query_vector):
    """Return the stored feedback record that clears its vote-adjusted threshold, or None."""
    if feedback_embeddings is None or not feedback_data:
        return None
    feedback_scores = cosine_similarity(query_vector, feedback_embeddings.cpu().numpy())[0]
    best_idx = int(np.argmax(feedback_scores))
    # Embeddings and records are separate module-level objects; skip the
    # feedback path rather than raise IndexError if they ever disagree.
    if best_idx >= len(feedback_data):
        return None
    matched_feedback = feedback_data[best_idx]
    base_threshold = 0.8
    upvotes = matched_feedback.get("upvotes", 0)
    downvotes = matched_feedback.get("downvotes", 0)
    # Each upvote lowers the bar by 0.01 and each downvote raises it by 0.01,
    # clamped to [0.4, 1.0].
    adjusted_threshold = base_threshold - (0.01 * upvotes) + (0.01 * downvotes)
    dynamic_threshold = min(max(adjusted_threshold, 0.4), 1.0)
    if feedback_scores[best_idx] >= dynamic_threshold:
        return matched_feedback
    return None


def chatbot_response(query, dev_mode_flag):
    """Answer ``query`` from stored feedback, the handbook dataset, or the LLM.

    Lookup order:
      1. Stored feedback: reused when the closest feedback question clears
         its vote-adjusted similarity threshold.
      2. Handbook dataset: the three most similar dataset questions are
         retrieved.
         - If ``dev_mode_flag`` is truthy, their answers are passed to the
           inference client as context. A blank reply, or one identical to
           the top answer, falls back to the formatted handbook answer.
         - Otherwise the top answer is returned when its score is at least
           0.4, and an apology message is returned below that.

    Args:
        query: The user's question text.
        dev_mode_flag: Truthy to generate the answer with the hosted LLM.

    Returns:
        A ``(response_text, source, elapsed_seconds)`` tuple. ``source`` is
        "Feedback" for a feedback hit, otherwise the matched item's "source"
        (default "PUP Handbook"). ``elapsed_seconds`` is the LLM call time and
        is 0.0 on every non-LLM path. An inference failure is reported inside
        ``response_text`` rather than raised.
    """
    query_embedding = embedding_model.encode([query], convert_to_tensor=True)
    query_vector = query_embedding.cpu().numpy()

    matched_feedback = _lookup_feedback_match(query_vector)
    if matched_feedback is not None:
        return matched_feedback["response"], "Feedback", 0.0

    similarity_scores = cosine_similarity(query_vector, question_embeddings.cpu().numpy())[0]
    top_k = 3
    # argsort is ascending: take the last top_k and reverse for best-first.
    top_k_indices = np.argsort(similarity_scores)[-top_k:][::-1]
    top_k_items = [dataset[idx] for idx in top_k_indices]
    top_k_scores = [similarity_scores[idx] for idx in top_k_indices]
    matched_item = top_k_items[0]
    matched_a = matched_item.get("answer", "")
    matched_source = matched_item.get("source", "PUP Handbook")
    best_score = top_k_scores[0]

    if dev_mode_flag:
        context = "".join(
            f"Relevant info #{rank} (score: {score:.2f}):\n\"{item.get('answer', '')}\"\n\n"
            for rank, (item, score) in enumerate(zip(top_k_items, top_k_scores), start=1)
        )
        prompt = (
            f"You are an expert university assistant. "
            f"A student asked: \"{query}\"\n"
            f"Here are the most relevant handbook information snippets:\n{context}"
            f"Using only the information above, answer the student's question in your own words. "
            f"If the handbook info is not relevant, say you don't know."
        )
        try:
            start_time = time.time()
            if hasattr(inference_client, "chat_completion"):
                conversation = [
                    {"role": "system", "content": "You are an expert university assistant."},
                    {"role": "user", "content": prompt},
                ]
                llm_response = inference_client.chat_completion(
                    messages=conversation,
                    model="mistralai/Mixtral-8x7B-Instruct-v0.1",
                    max_tokens=200,
                    temperature=0.7,
                )
            else:
                llm_response = inference_client.text_generation(
                    prompt,
                    max_new_tokens=200,
                    temperature=0.7,
                )
            # Accept a plain string, a dict, or an attribute-style object.
            response = _extract_llm_text(llm_response)
            elapsed = time.time() - start_time
            if not response.strip() or response.strip() == matched_a.strip():
                response = _format_handbook_answer(matched_item, matched_a)
            return response.strip(), matched_source, elapsed
        except Exception as e:
            error_msg = f"[ERROR] HF inference failed: {e}"
            return f"(UnivAI+++ error: {error_msg})", matched_source, 0.0

    if best_score < 0.4:
        response = "Sorry, but the PUP handbook does not contain such information."
    else:
        response = _format_handbook_answer(matched_item, matched_a)
    return response.strip(), matched_source, 0.0
def record_feedback(feedback, query, response, comment=None):
    """Record a vote on a (query, response) pair plus an optional comment.

    A stored record counts as the same pair when its question has cosine
    similarity >= 0.8 with ``query`` and its response equals ``response``;
    the first such record gets its vote counter incremented. Otherwise a
    new record is appended. Feedback and comments are written to their local
    JSON files first and uploaded to the Hugging Face dataset repo afterwards.

    Args:
        feedback: "positive" or "negative". Any other value increments no
            counter (a new record is still stored, with zero votes).
        query: The question the user asked.
        response: The answer that was shown to the user.
        comment: Optional free-text comment; ignored when not a non-blank
            string.

    Raises:
        ValueError: Propagated from ``upload_file_to_hf`` when no upload
            token is configured. Local files are already written by then.
    """
    global feedback_embeddings, feedback_questions

    # None for an unknown feedback value, so it cannot raise KeyError below.
    vote_field = {"positive": "upvotes", "negative": "downvotes"}.get(feedback)
    new_embedding = embedding_model.encode([query], convert_to_tensor=True).cpu().numpy()

    matched_item = None
    existing_embeddings = None
    if feedback_data:
        # Reuse the cached embeddings when they line up with the records;
        # otherwise encode all stored questions in a single batch instead of
        # one model call per record.
        cache_in_sync = (
            feedback_embeddings is not None
            and len(feedback_embeddings) == len(feedback_data)
        )
        if cache_in_sync:
            existing_embeddings = feedback_embeddings
        else:
            existing_embeddings = embedding_model.encode(
                [item["question"] for item in feedback_data],
                convert_to_tensor=True,
            )
        similarities = cosine_similarity(existing_embeddings.cpu().numpy(), new_embedding)[:, 0]
        for item, similarity in zip(feedback_data, similarities):
            if similarity >= 0.8 and item["response"] == response:
                matched_item = item
                break

    if matched_item is not None:
        if vote_field is not None:
            matched_item[vote_field] = matched_item.get(vote_field, 0) + 1
        # The question list is unchanged, so the embeddings stay valid.
        feedback_embeddings = existing_embeddings
    else:
        feedback_data.append({
            "question": query,
            "response": response,
            "feedback": feedback,
            "upvotes": 1 if feedback == "positive" else 0,
            "downvotes": 1 if feedback == "negative" else 0,
        })
        feedback_embeddings = embedding_model.encode(
            [item["question"] for item in feedback_data],
            convert_to_tensor=True,
        )
    feedback_questions = [item["question"] for item in feedback_data]

    with open(feedback_path, "w") as f:
        json.dump(feedback_data, f, indent=4)

    # Persist the comment locally before any upload, so that an upload
    # problem cannot cause the comment to be lost.
    has_comment = isinstance(comment, str) and bool(comment.strip())
    if has_comment:
        try:
            with open(COMMENTS_PATH, "r") as f:
                comments_list = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            # Missing or corrupt comments file: start a fresh list.
            comments_list = []
        comments_list.append({
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
            "question": query,
            "response": response,
            "feedback": feedback,
            "comment": comment.strip(),
        })
        with open(COMMENTS_PATH, "w") as f:
            json.dump(comments_list, f, indent=4)

    upload_file_to_hf(feedback_path, "feedback.json")
    if has_comment:
        upload_file_to_hf(COMMENTS_PATH, "Comments.json")
app = Flask(__name__)


def _json_payload():
    """Return the request body as a dict, or {} when it is missing or not a JSON object."""
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


# The views are registered on ``app`` explicitly; without registration the
# application would answer every request with 404. The two API paths are the
# ones named in the usage message returned by index().
@app.route("/api/chat", methods=["POST"])
def chat():
    """Answer a chat query.

    Expects a JSON object with "query" (string) and optional "dev_mode".
    Returns JSON with "response", "source" and "response_time", or a 400
    error when "query" is missing or blank.
    """
    data = _json_payload()
    query = data.get("query", "")
    if not isinstance(query, str) or not query.strip():
        return jsonify({"error": "Field 'query' must be a non-empty string."}), 400
    dev = data.get("dev_mode", False)
    dev_mode["enabled"] = dev
    response, source, elapsed = chatbot_response(query, dev)
    return jsonify({"response": response, "source": source, "response_time": elapsed})


@app.route("/api/feedback", methods=["POST"])
def feedback():
    """Store a vote (and optional comment) for a previously returned answer.

    Expects a JSON object with "query", "response", "feedback" ("positive"
    or "negative") and optional "comment". Returns {"status": "success"},
    or a 400 error when "feedback" is not one of the two accepted values.
    """
    data = _json_payload()
    query = data.get("query", "")
    response = data.get("response", "")
    feedback_type = data.get("feedback", "")
    comment = data.get("comment", None)
    # Reject unknown vote types here instead of letting them reach the store.
    if feedback_type not in ("positive", "negative"):
        return jsonify({"error": "Field 'feedback' must be 'positive' or 'negative'."}), 400
    record_feedback(feedback_type, query, response, comment)
    return jsonify({"status": "success"})


# NOTE(review): "/" is assumed as the path for this usage message — confirm.
@app.route("/")
def index():
    """Return a plain-text usage hint for the API."""
    return "University Inquiries AI Chatbot API. Use POST /api/chat or /api/feedback.", 200
if __name__ == "__main__":
    # Direct execution only: serve on all interfaces, port 7861.
    server_options = {"host": "0.0.0.0", "port": 7861}
    app.run(**server_options)