"""TechSpark assistant: data-lookup helpers, smolagents tools and agent setup.

The data frames (``staff_df``, ``courses_df``, ``tools_df``, ``nodes_df``,
``edges_df``) and the constants ``NUMERIC_PROFILE`` / ``OPENAI_API`` are
expected to be provided by the star imports from ``settings`` and ``data``.
"""

# Imports
import difflib
import json

import networkx as nx
import numpy as np
import openai
import pandas as pd
import smolagents
from langchain_community.utilities.wikipedia import WikipediaAPIWrapper

from settings import *
from data import *


# General Functions
def vector_1st_distance(x: list, y: list):
    """
    Calculate the average signed element-wise difference ``x - y``.

    Raises:
        ValueError: if the vectors differ in length or are empty (the mean
            is undefined for an empty vector).
    """
    if len(x) != len(y):
        raise ValueError("x and y must have the same length")
    if len(x) == 0:
        raise ValueError("x and y must not be empty")
    diff = np.asarray(x, dtype=float) - np.asarray(y, dtype=float)
    return float(diff.sum() / len(x))


def skill_score(
    skill_profile: dict,  # The skill profile that we want to analyze
    laser_cutting: float = None,
    wood_working: float = None,
    wood_cnc: float = None,
    metal_machining: float = None,
    metal_cnc: float = None,
    three_d_printer: float = None,
    welding: float = None,
    electronics: float = None,
):
    """
    Calculate the skill score for a given skill profile.
    Useful for both staff and courses skill profiles.

    Only the skills whose requirement is not None take part in the score.
    The score is the mean of (profile value - required value), so a positive
    score means the profile exceeds the requirement on average.

    Returns:
        The score, or None when no skill requirement was given at all
        (callers already filter on ``score is not None``).
    """
    # (profile key, required level) pairs, in a fixed order.
    requirements = [
        ("Laser Cutting", laser_cutting),
        ("Wood Working", wood_working),
        ("Wood CNC", wood_cnc),
        ("Metal Machining", metal_machining),
        ("Metal CNC", metal_cnc),
        ("3D Printer", three_d_printer),
        ("Welding", welding),
        ("Electronics", electronics),
    ]
    x = []
    y = []
    for key, required in requirements:
        if required is not None:
            x.append(skill_profile[key])
            y.append(required)
    if not x:
        # Nothing to compare: avoid dividing by zero downstream.
        return None
    return vector_1st_distance(x, y)


# Staff Functions
def all_staff():
    """
    Return a list of all staff.
    """
    return staff_df["Name"].dropna().tolist()


def match_staff_name(name: str):
    """
    Match the staff name to the closest match in the staff list.

    Returns the best fuzzy match, or None when nothing is close enough.
    """
    matches = difflib.get_close_matches(name, all_staff(), n=1, cutoff=0.2)
    return matches[0] if matches else None


def all_available_staff(exclude: list = None):
    """
    Return a list of all staff with exclusion.

    ``exclude`` may be None, a single name, or any iterable of names; each
    entry is fuzzy-matched against the staff list before being removed.
    """
    if exclude is None:
        return all_staff()
    if isinstance(exclude, str):
        # list("Bob") would split the name into characters.
        exclude = [exclude]
    else:
        try:
            exclude = list(exclude)
        except TypeError:
            # Not iterable (e.g. a stray number): nothing usable to exclude.
            return all_staff()
    if not exclude:
        return all_staff()

    excluded_names = set()
    for raw_name in exclude:
        excluded_name = match_staff_name(str(raw_name))
        if excluded_name:
            excluded_names.add(excluded_name)
    return [name for name in all_staff() if name not in excluded_names]


def get_staff_full_profile(name: str):
    """
    Get the staff full profile given its name (including description and skill).

    Returns None when no staff member matches.
    """
    name = match_staff_name(name)
    if name:
        return staff_df[staff_df["Name"] == name].iloc[0].to_dict()
    return None


def get_staff_skills_profile(name: str):
    """
    Get the staff skills profile given its name.

    Returns None when no staff member matches.
    """
    full_profile = get_staff_full_profile(name)
    if full_profile is None:
        return None
    return {k: full_profile[k] for k in NUMERIC_PROFILE}


def get_staff_profile(name: str):
    """
    Get the staff profile without skill part.

    Returns None when no staff member matches.
    """
    full_profile = get_staff_full_profile(name)
    if full_profile is None:
        return None
    return {k: v for k, v in full_profile.items() if k not in NUMERIC_PROFILE}


def search_staff_by_skills(
    laser_cutting: float = None,
    wood_working: float = None,
    wood_cnc: float = None,
    metal_machining: float = None,
    metal_cnc: float = None,
    three_d_printer: float = None,
    welding: float = None,
    electronics: float = None,
    exclude: list = None,
    n_results: int = 1,
):
    """
    Return up to ``n_results`` staff names ranked by skill score.

    Staff with a positive score are kept and sorted ascending, i.e. the
    staff member who exceeds the requirement by the smallest margin first.
    """
    scored = []
    for name in all_available_staff(exclude):
        skills_profile = get_staff_skills_profile(name)
        if skills_profile is None:
            continue
        score = skill_score(
            skill_profile=skills_profile,
            laser_cutting=laser_cutting,
            wood_working=wood_working,
            wood_cnc=wood_cnc,
            metal_machining=metal_machining,
            metal_cnc=metal_cnc,
            three_d_printer=three_d_printer,
            welding=welding,
            electronics=electronics,
        )
        # keep only positive scores
        # NOTE(review): a staff member who meets the requirement exactly
        # scores 0 and is dropped here - confirm this is intended.
        if score is not None and score > 0:
            scored.append((name, score))

    scored.sort(key=lambda item: item[1])  # sort by score ascending (lower = better)
    return [name for name, _ in scored[:n_results]]


class SearchStaffInformation(smolagents.tools.Tool):
    """Tool: look up a staff member's non-skill profile by (fuzzy) name."""

    name = "search_staff_information"
    description = (
        "Search the staff information by its name."
    )
    inputs = {
        "name": {"type": "string", "description": "Name of the staff member."},
    }
    output_type = "object"

    def forward(self, name: str) -> str:
        """Return the profile as JSON (``null`` when nobody matches)."""
        return json.dumps(get_staff_profile(name))


class FindSuitableStaff(smolagents.tools.Tool):
    """Tool: suggest the two best-scoring staff members for given skills."""

    name = "find_suitable_staff"
    description = (
        "Find the most suitable staff member for the task based on required skills."
    )
    inputs = {
        "laser_cutting": {
            "type": "number",
            "nullable": True,
            "description": (
                "Laser cutting skill required for the task. "
                "It is a number between 0 (no expertise required) to 3 (high expertise expertise). "
                "Default is None. If left None, it will be ignored. (Optional)"
            ),
        },
        "wood_working": {
            "type": "number",
            "nullable": True,
            "description": (
                "Wood working skill required for the task. "
                "It is a number between 0 (no expertise required) to 3 (high expertise expertise). "
                "Default is None. If left None, it will be ignored. (Optional)"
            ),
        },
        "wood_cnc": {
            "type": "number",
            "nullable": True,
            "description": (
                "Wood CNC skill required for the task. "
                "It is a number between 0 (no expertise required) to 3 (high expertise expertise). "
                "Default is None. If left None, it will be ignored. (Optional)"
            ),
        },
        "metal_machining": {
            "type": "number",
            "nullable": True,
            "description": (
                "Metal machining skill required for the task. "
                "It is a number between 0 (no expertise required) to 3 (high expertise expertise). "
                "Default is None. If left None, it will be ignored. (Optional)"
            ),
        },
        "metal_cnc": {
            "type": "number",
            "nullable": True,
            "description": (
                "Metal CNC skill required for the task. "
                "It is a number between 0 (no expertise required) to 3 (high expertise expertise). "
                "Default is None. If left None, it will be ignored. (Optional)"
            ),
        },
        "three_d_printer": {
            "type": "number",
            "nullable": True,
            "description": (
                "3D printer skill required for the task. "
                "It is a number between 0 (no expertise required) to 3 (high expertise expertise). "
                "Default is None. If left None, it will be ignored. (Optional)"
            ),
        },
        "welding": {
            "type": "number",
            "nullable": True,
            "description": (
                "Welding skill required for the task. "
                "It is a number between 0 (no expertise required) to 3 (high expertise expertise). "
                "Default is None. If left None, it will be ignored. (Optional)"
            ),
        },
        "electronics": {
            "type": "number",
            "nullable": True,
            "description": (
                "Electronics skill required for the task. "
                "It is a number between 0 (no expertise required) to 3 (high expertise expertise). "
                "Default is None. If left None, it will be ignored. (Optional)"
            ),
        },
        # The value is a list of names, so the schema type is "array"
        # (it was previously declared as "number").
        "exclude": {
            "type": "array",
            "nullable": True,
            "description": (
                "A list of names that we want to exclude from searching. "
                "Default is None or an empty list."
            ),
        },
    }
    output_type = "object"

    def forward(
        self,
        laser_cutting: float = None,
        wood_working: float = None,
        wood_cnc: float = None,
        metal_machining: float = None,
        metal_cnc: float = None,
        three_d_printer: float = None,
        welding: float = None,
        electronics: float = None,
        exclude: list = None,
    ) -> str:
        """Return a JSON list with the profiles of the top two matches."""
        names = search_staff_by_skills(
            laser_cutting=laser_cutting,
            wood_working=wood_working,
            wood_cnc=wood_cnc,
            metal_machining=metal_machining,
            metal_cnc=metal_cnc,
            three_d_printer=three_d_printer,
            welding=welding,
            electronics=electronics,
            exclude=exclude,
            n_results=2,
        )
        staff_profiles = [get_staff_profile(name) for name in names]
        return json.dumps(staff_profiles)


# Course Functions
def all_courses_code():
    """
    Return a list of all course codes.
    """
    return courses_df["Code"].dropna().astype(str).tolist()


def all_courses_name():
    """
    Return a list of all course names.
    """
    return courses_df["Name"].dropna().tolist()


def course_name_to_code(course_name):
    """
    Convert the course name to course code.

    Raises IndexError when no course has exactly this name.
    """
    return str(courses_df[courses_df["Name"] == course_name]["Code"].iloc[0])


def course_code_to_name(course_code):
    """
    Convert the course code to course name.

    Raises IndexError when no course has exactly this code.
    """
    same_code = courses_df["Code"].astype(str) == str(course_code)
    return str(courses_df[same_code]["Name"].iloc[0])


def match_course_name_code(input):
    """
    Match the course to the closest match in the course list and return their codes.

    The query is fuzzy-matched against course codes (up to 3 hits) and
    course names (up to 2 hits, converted to codes). Code hits come first.
    Returns None when nothing matches.
    """
    query = str(input)
    code_matches = difflib.get_close_matches(query, all_courses_code(), n=3, cutoff=0.2)
    name_hits = difflib.get_close_matches(query, all_courses_name(), n=2, cutoff=0.3)
    name_matches = [course_name_to_code(name) for name in name_hits]
    matches = code_matches + name_matches
    return matches if matches else None


def get_course_full_profile(course):
    """
    Get the course full profile given its code (including description and skill).

    ``course`` may be a code or a name; the best fuzzy match is used.
    Returns None when nothing matches.
    """
    matches = match_course_name_code(course)
    code = matches[0] if matches else None
    if code:
        # Compare as strings: the codes returned by the matcher are strings.
        return courses_df[courses_df["Code"].astype(str) == code].iloc[0].to_dict()
    return None


def get_course_skills_profile(course_code):
    """
    Get the course skills profile given its code.

    Returns None when no course matches.
    """
    full_profile = get_course_full_profile(course_code)
    if full_profile is None:
        return None
    return {k: full_profile[k] for k in NUMERIC_PROFILE}


def get_course_profile(course_code):
    """
    Get the course profile without skill part.

    Returns None when no course matches.
    """
    full_profile = get_course_full_profile(course_code)
    if full_profile is None:
        return None
    return {k: v for k, v in full_profile.items() if k not in NUMERIC_PROFILE}


def search_course_by_skills(
    laser_cutting: float = None,
    wood_working: float = None,
    wood_cnc: float = None,
    metal_machining: float = None,
    metal_cnc: float = None,
    three_d_printer: float = None,
    welding: float = None,
    electronics: float = None,
    n_results: int = 1,
):
    """
    Return up to ``n_results`` course codes whose skill profile is closest
    to the requested levels (smallest absolute score first).
    """
    scored_courses = []
    for code in all_courses_code():
        skills_profile = get_course_skills_profile(code)
        if skills_profile is None:
            continue
        score = skill_score(
            skill_profile=skills_profile,
            laser_cutting=laser_cutting,
            wood_working=wood_working,
            wood_cnc=wood_cnc,
            metal_machining=metal_machining,
            metal_cnc=metal_cnc,
            three_d_printer=three_d_printer,
            welding=welding,
            electronics=electronics,
        )
        if score is not None:
            scored_courses.append((abs(score), code))  # store (absolute_score, course_code)

    scored_courses.sort(key=lambda item: item[0])
    return [code for _, code in scored_courses[:n_results]]


class SearchCourseInformation(smolagents.tools.Tool):
    """Tool: look up a course's non-skill profile by name or code."""

    name = "search_course_information"
    description = (
        "Search the course information by the course name or course number (code)."
    )
    inputs = {
        "name": {"type": "string", "description": "Course name or course number (code)."},
    }
    output_type = "object"

    def forward(self, name: str) -> str:
        """Return the profile as JSON (``null`` when nothing matches)."""
        return json.dumps(get_course_profile(name))


class FindSuitableCourses(smolagents.tools.Tool):
    """Tool: suggest the three courses closest to the requested skills."""

    name = "find_suitable_courses"
    description = (
        "Find the top 3 most suitable courses for the task based on required skills. "
        "The first element is the best match."
    )
    inputs = {
        "laser_cutting": {
            "type": "number",
            "nullable": True,
            "description": (
                "Laser cutting skill being taught during the course. "
                "It is a number between 0 (no expertise required) to 3 (high expertise expertise). "
                "Default is None. If left None, it will be ignored. (Optional)"
            ),
        },
        "wood_working": {
            "type": "number",
            "nullable": True,
            "description": (
                "Wood working skill being taught during the course. "
                "It is a number between 0 (no expertise required) to 3 (high expertise expertise). "
                "Default is None. If left None, it will be ignored. (Optional)"
            ),
        },
        "wood_cnc": {
            "type": "number",
            "nullable": True,
            "description": (
                "Wood CNC skill being taught during the course. "
                "It is a number between 0 (no expertise required) to 3 (high expertise expertise). "
                "Default is None. If left None, it will be ignored. (Optional)"
            ),
        },
        "metal_machining": {
            "type": "number",
            "nullable": True,
            "description": (
                "Metal machining skill being taught during the course. "
                "It is a number between 0 (no expertise required) to 3 (high expertise expertise). "
                "Default is None. If left None, it will be ignored. (Optional)"
            ),
        },
        "metal_cnc": {
            "type": "number",
            "nullable": True,
            "description": (
                "Metal CNC skill being taught during the course. "
                "It is a number between 0 (no expertise required) to 3 (high expertise expertise). "
                "Default is None. If left None, it will be ignored. (Optional)"
            ),
        },
        "three_d_printer": {
            "type": "number",
            "nullable": True,
            "description": (
                "3D printer skill being taught during the course. "
                "It is a number between 0 (no expertise required) to 3 (high expertise expertise). "
                "Default is None. If left None, it will be ignored. (Optional)"
            ),
        },
        "welding": {
            "type": "number",
            "nullable": True,
            "description": (
                "Welding skill being taught during the course. "
                "It is a number between 0 (no expertise required) to 3 (high expertise expertise). "
                "Default is None. If left None, it will be ignored. (Optional)"
            ),
        },
        "electronics": {
            "type": "number",
            "nullable": True,
            "description": (
                "Electronics skill being taught during the course. "
                "It is a number between 0 (no expertise required) to 3 (high expertise expertise). "
                "Default is None. If left None, it will be ignored. (Optional)"
            ),
        },
    }
    output_type = "object"

    def forward(
        self,
        laser_cutting: float = None,
        wood_working: float = None,
        wood_cnc: float = None,
        metal_machining: float = None,
        metal_cnc: float = None,
        three_d_printer: float = None,
        welding: float = None,
        electronics: float = None,
    ) -> str:
        """Return a JSON list with the profiles of the top three matches."""
        matches = search_course_by_skills(
            laser_cutting=laser_cutting,
            wood_working=wood_working,
            wood_cnc=wood_cnc,
            metal_machining=metal_machining,
            metal_cnc=metal_cnc,
            three_d_printer=three_d_printer,
            welding=welding,
            electronics=electronics,
            n_results=3,
        )
        options = [get_course_profile(course) for course in matches]
        return json.dumps(options)


# Machines and Tools Functions
def all_tools():
    """
    Return a list of all tools and machines.
    """
    return tools_df["Name"].dropna().astype(str).tolist()


def match_tool_name(input):
    """
    Match the input to the closest tool or machine name.

    Returns the best fuzzy match, or None when nothing is close enough.
    """
    query = str(input)
    matches = difflib.get_close_matches(query, all_tools(), n=1, cutoff=0.4)
    return matches[0] if matches else None


def _tool_row(tool_name: str):
    """Return the first ``tools_df`` row whose name equals ``tool_name``."""
    # Names are compared as strings because all_tools() stringifies them.
    return tools_df[tools_df["Name"].astype(str) == tool_name].iloc[0]


def _clean_course_code(value):
    """Normalise a 'Required Course' cell: missing -> None, 24101.0 -> 24101."""
    if value is None or pd.isna(value):
        # An empty cell is read by pandas as NaN, which is truthy.
        return None
    if isinstance(value, float) and value.is_integer():
        # Presumably the column was upcast to float because of empty cells;
        # restore the integer form so it matches the course table.
        return int(value)
    return value


def get_tool_location(name: str):
    """
    Get the tool location given its name.

    Raises:
        ValueError: when no tool matches ``name``.
    """
    tool_name = match_tool_name(name)
    if tool_name is None:
        raise ValueError("Not found.")
    return _tool_row(tool_name)["Location"]


def is_tool_accessible(name):
    """
    Check if the machine is accessible to students, and if they require taking mandatory courses.

    Returns:
        A JSON string with the keys "name", "short answer" ("Yes",
        "Conditional" or "No") and "description".

    Raises:
        ValueError: when no tool matches ``name``.
    """
    tool_name = match_tool_name(name)
    if tool_name is None:
        raise ValueError("Not found.")

    row = _tool_row(tool_name)
    accessible = bool(row["Accessible by Students"])
    course_code = _clean_course_code(row["Required Course"])

    if accessible:
        # Accessible
        result_short = "Yes"
        if course_code:
            result_description = f"Student can access it, but they may benefit from taking the course {course_code}: {course_code_to_name(course_code)}"
        else:
            result_description = "Student can access it."
    elif course_code:
        # Accessible but conditional (only by passing the course)
        result_short = "Conditional"
        result_description = f"Student can access it only if they take the course {course_code}: {course_code_to_name(course_code)}."
    else:
        # Not accessible by students. Need staff members!
        result_short = "No"
        result_description = "Student cannot access it. Only available to staff memebers. Ask them to do your task for you."

    result = {
        "name": tool_name,
        "short answer": result_short,
        "description": result_description,
    }
    return json.dumps(result)


class SearchMachineLocation(smolagents.tools.Tool):
    """Tool: report where a tool or machine is located."""

    name = "search_machine_location"
    description = (
        "Search the machine or tool location in the TechSpark."
    )
    inputs = {
        "name": {"type": "string", "description": "Tool or machine name."},
    }
    output_type = "object"

    def forward(self, name: str) -> str:
        """Return the location as JSON."""
        return json.dumps(get_tool_location(name))


class CheckMachineAccessibility(smolagents.tools.Tool):
    """Tool: report whether students may use a tool or machine."""

    name = "check_machine_accessibility"
    description = (
        "Check whether machine or tool is accessible to students. "
        "Some are accessible, some need to take a course to become accessible, "
        "and some are only available to staff members."
    )
    inputs = {
        "name": {"type": "string", "description": "Tool or machine name."},
    }
    output_type = "object"

    def forward(self, name: str) -> str:
        """Return the accessibility report as JSON."""
        # is_tool_accessible() already returns JSON; dumping it again would
        # wrap the whole payload in an escaped string.
        return is_tool_accessible(name)


# Wikipedia Functions
class WikipediaSearch(smolagents.Tool):
    """
    Create tool for searching Wikipedia
    """

    name = "wikipedia_search"
    description = (
        "Search Wikipedia, the free encyclopedia. "
        "Use this to learn about the topics that you don't have enough information."
    )
    inputs = {
        "query": {"type": "string", "nullable": False, "description": "The search terms"},
    }
    output_type = "string"

    def forward(self, query: str | None = None) -> str:
        """Return the top Wikipedia result for ``query`` as text."""
        if not query:
            return "Error: 'query' is required."
        wikipedia_api = WikipediaAPIWrapper(top_k_results=1)
        return wikipedia_api.run(query)


# Map Functions
# Node IDs tried as starting points when no source is given.
ENTRANCE_NODE_IDS = (0, 7)


def all_nodes():
    """
    Return a list of all nodes name.
    """
    return nodes_df["Name"].dropna().astype(str).tolist()


def all_locations():
    """
    Return a list of all locations.
    """
    return list(set(all_nodes()))


def match_node_name(input):
    """
    Match the input to the closest match in the nodes list and return its name.

    Returns None when nothing is close enough.
    """
    query = str(input)
    matches = difflib.get_close_matches(query, all_locations(), n=1, cutoff=0.2)
    return matches[0] if matches else None


def node_pos(id: int):
    """Return ``[X, Y]`` of the node with this ID, or None if unknown."""
    row = nodes_df.loc[nodes_df["ID"] == id, ["X", "Y"]]
    if row.empty:
        return None
    return row.iloc[0].tolist()


def node_name(id: int):
    """Return the name of the node with this ID, or None if unknown."""
    row = nodes_df.loc[nodes_df["ID"] == id, ["Name"]]
    if row.empty:
        return None
    return row.iloc[0]["Name"]


def node_id(name: str):
    """Return the ID of the first node with this name, or None if unknown."""
    row = nodes_df.loc[nodes_df["Name"] == name, ["ID"]]
    if row.empty:
        return None
    return row.iloc[0]["ID"]


def load_graph(nodes_df, edges_df):
    """Build an undirected graph from the node IDs and the edge ID pairs."""
    graph = nx.Graph()
    graph.add_nodes_from(nodes_df["ID"])
    graph.add_edges_from(zip(edges_df["ID 1"], edges_df["ID 2"]))
    return graph


G = load_graph(nodes_df, edges_df)


def path_finder(destination: int, source: int):
    """
    Return the shortest path from ``source`` to ``destination`` as a list of
    ``[from_id, to_id]`` steps, or None when no path exists or either node
    is not in the graph.
    """
    try:
        nodes = nx.shortest_path(G, source=source, target=destination)
    except (nx.NetworkXNoPath, nx.NodeNotFound):
        return None
    return [[int(a), int(b)] for a, b in zip(nodes[:-1], nodes[1:])]


def shortest_path(destination: int, source: int = None):
    """
    Return the shortest path to ``destination``.

    Without a ``source`` the path is computed from every entrance and the
    one with the fewest steps wins. Returns None when no path exists.
    """
    if source is not None:
        return path_finder(destination, source)

    candidates = []
    for entrance in ENTRANCE_NODE_IDS:
        path = path_finder(destination, entrance)
        if path is not None:  # an unreachable entrance must not break min()
            candidates.append(path)
    if not candidates:
        return None
    return min(candidates, key=len)


def path_to_vector(path):
    """Convert each ``[from_id, to_id]`` step into its ``[dx, dy]`` vector."""
    path_vector = []
    for start, end in path:
        start_pos = node_pos(start)
        end_pos = node_pos(end)
        path_vector.append(
            [
                end_pos[0] - start_pos[0],
                end_pos[1] - start_pos[1],
            ]
        )
    return path_vector


def path_to_names(path):
    """Return the names of the nodes visited along ``path``, in order."""
    if not path:
        return []
    names = [node_name(path[0][0])]
    names.extend(node_name(step[1]) for step in path)
    return names


def vector_angle_signed(v1, v2):
    """
    Return the signed angle in degrees from ``v1`` to ``v2`` (2D vectors).

    Returns 0.0 when either vector has zero length, since the direction is
    undefined and the normalisation would otherwise produce NaN.
    """
    v1 = np.array(v1, dtype=float)
    v2 = np.array(v2, dtype=float)

    len1 = np.linalg.norm(v1)
    len2 = np.linalg.norm(v2)
    if len1 == 0 or len2 == 0:
        return 0.0

    # Normalize
    n1 = v1 / len1
    n2 = v2 / len2

    # Dot and cross
    dot = np.dot(n1, n2)
    cross = n1[0] * n2[1] - n1[1] * n2[0]  # z-component of cross product in 2D

    # Angle (radians -> degrees)
    return np.degrees(np.arctan2(cross, dot))


def turn_side(v1, v2):
    """
    Describe the turn between two consecutive step vectors.

    Angles below 10 degrees count as going straight; a positive angle is
    reported as a left turn and a negative one as a right turn.
    """
    # NOTE(review): left/right assumes the Y axis points up - confirm
    # against the coordinate system of the map data.
    angle = vector_angle_signed(v1, v2)
    threshold = 10
    if abs(angle) < threshold:
        return "go straight"
    if angle > 0:
        return "turn left"
    return "turn right"


def path_human(destination, source=None):
    """
    Return human-readable directions to ``destination``.

    Both ``destination`` and ``source`` are fuzzy-matched against the
    location names. Without a usable ``source`` the route starts from the
    nearest entrance.

    Raises:
        ValueError: when the destination is unknown or unreachable.
    """
    destination_name = match_node_name(destination)
    if destination_name is None:
        raise ValueError("Not found.")
    destination_id = node_id(destination_name)

    source_id = None
    if source is not None:
        # node_id() yields None for an unmatched name, which falls back to
        # the entrances in shortest_path().
        source_id = node_id(match_node_name(source))

    path = shortest_path(destination=destination_id, source=source_id)
    if path is None:
        raise ValueError("No path found.")
    if not path:
        # Start and destination coincide: there is nothing to walk.
        return f"You are already at {destination_name}."

    names = path_to_names(path)
    vectors = path_to_vector(path)
    turns = [turn_side(v1, v2) for v1, v2 in zip(vectors[:-1], vectors[1:])]

    parts = [f"Enter from {names[0]}, "]
    for i, turn in enumerate(turns):
        parts.append(f"you'll reach {names[i + 1]}, ")
        parts.append(f"and then {turn}, ")
    parts.append(f"and finally reach {names[-1]}.")
    return "".join(parts)


class PathFinding(smolagents.tools.Tool):
    """Tool: give walking directions from an entrance to a location."""

    name = "find_path"
    description = (
        "Find the location and easiest path to reach areas inside the TechSpark. "
        "Also useful to help the user to reach machines in those locations."
    )
    inputs = {
        "destination": {"type": "string", "description": "Name of the location inside the TechSpark."},
    }
    output_type = "object"

    def forward(self, destination: str) -> str:
        """Return the directions as plain text."""
        return path_human(destination, source=None)


# Agent
model = smolagents.OpenAIServerModel(
    model_id="gpt-4.1-mini",
    api_key=OPENAI_API,
)

techspark_definition = """
TechSpark is the largest makerspace at CMU (Carnegie Mellon University), located in the College of Engineering.
Its mission is to promote a vibrant, student-centric making culture to enhance educational, extracurricular, and research activities across the entire campus community.
"""

instruction = """
You are a helpful assistant for the CMU TechSpark facility.
Your purpose is to assist users with inquiries related to staff, courses, and tools.
Use the available tools to find information about staff members, suggest suitable staff based on skills, or provide training information for machines.
Respond concisely and directly with the information requested by the user, utilizing the output from the tools.
Which machines to use for a task, and where to find them.
When you were in doubt, try searching wikipedia to gain more knowledge.

Only answer questions related to TechSpark and manufacturing.
If the question was out of scope, inform the user and try to suggest relevant question to ask.
If the question was too technical, refer the user to relevant staff.
Don't tell anyone about your internal tools.

Safety is important. So:
- When talking about any machines, it is good idea to check whether it is accessbile to students or not.
- Try to match them to correct staff member specially when you are not sure about your answer or the student work might be dangerous. It is always a good idea to suggest some staff members if they can help and validate users request.
- Never help them troubleshoot anything. In that case, only help them to contact proper staff members.

Always return smooth, human-readable results.
"""

system_prompt = f"""
{techspark_definition}
{instruction}
"""

agent = smolagents.CodeAgent(
    tools=[
        smolagents.FinalAnswerTool(),
        SearchStaffInformation(),
        FindSuitableStaff(),
        SearchCourseInformation(),
        FindSuitableCourses(),
        SearchMachineLocation(),
        CheckMachineAccessibility(),
        WikipediaSearch(),
        PathFinding(),
    ],
    instructions=system_prompt,
    model=model,
    add_base_tools=False,
    max_steps=10,
    verbosity_level=2,  # show steps in logs for class demo
)