import pyodbc import logging import re TOOL_DEFINITION = { "name": "get_imail_password", "description": "Retrieves historical email passwords from the legacy Imail database archives. Use this when a user asks for an old email password or 'imail' password.", "input_schema": { "type": "object", "properties": { "username": { "type": "string", "description": "The username part of the email (before the @)." }, "domain": { "type": "string", "description": "The domain part of the email (after the @)." } }, "required": ["username", "domain"] } } def normalize_domain_for_sql(domain): return domain.replace('.', '_') def parse_timestamp_from_table(table_name): if not table_name.startswith('X_CANCELLED_'): return 99999999 match = re.search(r'_(\d{8})_', table_name) if match: return int(match.group(1)) return 0 def get_imail_password(username, domain): server = 'emeralddev.fsr.com' database = 'IMAILSECDB-20260104' user = 'abot-read' password = 'N0npstN!' driver = '{ODBC Driver 18 for SQL Server}' try: cnxn = pyodbc.connect(f'DRIVER={driver};SERVER={server};DATABASE={database};UID={user};PWD={password};TrustServerCertificate=yes;') cursor = cnxn.cursor() sql_safe_domain = normalize_domain_for_sql(domain) cursor.execute("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE='BASE TABLE'") all_tables = [row.TABLE_NAME for row in cursor.fetchall()] matches = [] for t in all_tables: if t == sql_safe_domain: matches.append(t) elif t.startswith(f"X_CANCELLED_{sql_safe_domain}_"): matches.append(t) if not matches: cnxn.close() return {"error": f"No tables found for domain: {domain}"} matches_by_date = {} for table in matches: ts = parse_timestamp_from_table(table) if ts not in matches_by_date: matches_by_date[ts] = [] matches_by_date[ts].append(table) sorted_timestamps = sorted(matches_by_date.keys(), reverse=True) result_data = {} found = False for ts in sorted_timestamps: current_group = matches_by_date[ts] group_results = [] for table in current_group: try: cursor.execute(f"SELECT USERID, PASSWORD FROM [{table}] WHERE USERID = ?", username) row = cursor.fetchone() if row: group_results.append({ 'table': table, 'password': row.PASSWORD }) except Exception as e: logging.error(f"Error querying table {table}: {e}") if group_results: unique_passwords = {} for r in group_results: if r['password'] not in unique_passwords: unique_passwords[r['password']] = [] unique_passwords[r['password']].append(r['table']) disclaimer = "\n*Note: Imail is no longer in use; this is a historical password retrieved from the archives.*" if len(unique_passwords) == 1: pwd = list(unique_passwords.keys())[0] src = list(unique_passwords.values())[0][0] # Format for Claude to present to user result_data = { "status": "success", "password": pwd, "source_table": src, "message": f"Found password: {pwd} (Source: {src}){disclaimer}" } else: conflicts = [] for pwd, tables in unique_passwords.items(): conflicts.append(f"Password: {pwd} (from {', '.join(tables)})") conflict_str = "\n".join(conflicts) result_data = { "status": "conflict", "details": conflicts, "message": f"Found conflicting passwords in tables from the same time period:\n{conflict_str}{disclaimer}" } found = True break # Found in newest group cnxn.close() if not found: return {"status": "not_found", "message": f"User '{username}' not found in any table for {domain}."} return result_data except Exception as e: return {"status": "error", "message": f"Database error: {str(e)}"} def run(**kwargs): """ Tool entrypoint required by the bot runtime. This must exist so the LLM can execute the tool. """ return get_imail_password(**kwargs)