Spaces:
Sleeping
Sleeping
| from datetime import datetime | |
| from db.schema import Feedback, Response | |
| from db.setup import db_setup | |
| # Create operation (push a new feedback entry to Firebase; performs no user_id uniqueness check) | |
def ingest(data: Feedback):
    """Push *data* to the ``feedback`` node as a new entry.

    The model is dumped to a plain dict and its ``time_stamp`` value is
    replaced by its ISO-8601 string before the push. No lookup for an
    existing entry is made here, so each call adds another child.
    """
    root = db_setup()  # Ensure Firebase is initialized
    print(f"Attempting to ingest feedback data from user '{data.user_id}'...")

    record = data.model_dump()
    iso_stamp = record['time_stamp'].isoformat()
    record["time_stamp"] = iso_stamp

    root.child('feedback').push(record)
    print(f"Feedback data from user '{data.user_id}' pushed to Firebase!")
def save_feedback(data: Feedback):
    """Insert or refresh the feedback stored for ``data.user_id``.

    The ``feedback`` node is queried for children whose ``user_id`` equals
    ``data.user_id``. When the query returns entries, each of them is
    updated with the new payload; otherwise the payload is pushed as a new
    child.
    """
    root = db_setup()  # Ensure Firebase is initialized
    print(f"Processing feedback for user '{data.user_id}'...")

    # Plain-dict form of the model, with the timestamp as an ISO string.
    payload = data.model_dump()
    payload["time_stamp"] = payload["time_stamp"].isoformat()

    query = root.child("feedback").order_by_child("user_id").equal_to(data.user_id)
    matches = query.get()

    if not matches:
        # Nothing stored for this user yet: insert.
        root.child("feedback").push(payload)
        print(f"(Ingestion) New feedback added for user '{data.user_id}'!")
        return

    # Refresh every entry the query returned.
    for entry_key in matches:
        root.child("feedback").child(entry_key).update(payload)
    print(f"Feedback updated for user '{data.user_id}'!")
| # Read operation (Fetch feedback data from Firebase) | |
def read(user_id: str):
    """Return one stored feedback entry for *user_id*, or ``None``.

    Queries the ``feedback`` node for children whose ``user_id`` equals
    *user_id* and returns the value of the first entry in the result.
    """
    root = db_setup()
    query = root.child('feedback').order_by_child('user_id').equal_to(user_id)
    matches = query.get()

    if not matches:
        return None  # No feedback found

    # Only the first entry of the result is handed back.
    return next((entry for _key, entry in matches.items()), None)
| # Update operation (Update feedback data in Firebase) | |
def update(feedback_id: int, updated_data: Feedback):
    """Overwrite stored feedback whose ``id`` child equals *feedback_id*.

    Every entry returned by the query is updated with the contents of
    *updated_data*; when the query returns nothing, a message is printed
    and no write happens.

    Args:
        feedback_id: Value matched against the ``id`` child of each entry.
        updated_data: Model holding the new field values.
    """
    ref = db_setup()
    matches = ref.child('feedback').order_by_child('id').equal_to(feedback_id).get()
    if not matches:
        print("Feedback not found to update!")
        return

    # Serialise the same way ingest() and save_feedback() do: model_dump()
    # plus an ISO-8601 string for the timestamp. The raw value exposes
    # .isoformat() (a date/datetime), which is not JSON-serialisable, so
    # sending it unconverted is expected to make the write fail.
    payload = updated_data.model_dump()
    payload["time_stamp"] = payload["time_stamp"].isoformat()

    for key in matches:
        ref.child('feedback').child(key).update(payload)
    print("Feedback data updated in Firebase!")
| # Delete operation (Remove feedback data from Firebase) | |
def delete(feedback_id: int):
    """Remove stored feedback whose ``id`` child equals *feedback_id*.

    Every entry returned by the query is deleted. A message is printed
    either way, saying whether anything was found.
    """
    root = db_setup()
    query = root.child('feedback').order_by_child('id').equal_to(feedback_id)
    matches = query.get()

    if not matches:
        print("Feedback not found to delete!")
        return

    # Drop each matching child by its key.
    for entry_key in matches:
        root.child('feedback').child(entry_key).delete()
    print("Feedback data deleted from Firebase!")
def reset_feedback_in_db(user_id: str):
    """Delete the feedback entries stored for *user_id*.

    Queries the ``feedback`` node for children whose ``user_id`` equals
    *user_id* and deletes each entry the query returns.
    """
    root = db_setup()
    query = root.child('feedback').order_by_child('user_id').equal_to(user_id)
    matches = query.get()

    if not matches:
        return

    for entry_key in matches:
        root.child('feedback').child(entry_key).delete()
    print(f"Feedback data for user '{user_id}' reset in Firebase.")