Files
abot/tools/imail_tool.py
2026-01-14 11:44:13 -08:00

131 lines
4.9 KiB
Python

import pyodbc
import logging
import re
TOOL_DEFINITION = {
"name": "get_imail_password",
"description": "Retrieves historical email passwords from the legacy Imail database archives. Use this when a user asks for an old email password or 'imail' password.",
"input_schema": {
"type": "object",
"properties": {
"username": {
"type": "string",
"description": "The username part of the email (before the @)."
},
"domain": {
"type": "string",
"description": "The domain part of the email (after the @)."
}
},
"required": ["username", "domain"]
}
}
def normalize_domain_for_sql(domain):
return domain.replace('.', '_')
def parse_timestamp_from_table(table_name):
if not table_name.startswith('X_CANCELLED_'):
return 99999999
match = re.search(r'_(\d{8})_', table_name)
if match:
return int(match.group(1))
return 0
def get_imail_password(username, domain):
server = 'emeralddev.fsr.com'
database = 'IMAILSECDB-20260104'
user = 'abot-read'
password = 'N0npstN!'
driver = '{ODBC Driver 18 for SQL Server}'
try:
cnxn = pyodbc.connect(f'DRIVER={driver};SERVER={server};DATABASE={database};UID={user};PWD={password};TrustServerCertificate=yes;')
cursor = cnxn.cursor()
sql_safe_domain = normalize_domain_for_sql(domain)
cursor.execute("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE='BASE TABLE'")
all_tables = [row.TABLE_NAME for row in cursor.fetchall()]
matches = []
for t in all_tables:
if t == sql_safe_domain:
matches.append(t)
elif t.startswith(f"X_CANCELLED_{sql_safe_domain}_"):
matches.append(t)
if not matches:
cnxn.close()
return {"error": f"No tables found for domain: {domain}"}
matches_by_date = {}
for table in matches:
ts = parse_timestamp_from_table(table)
if ts not in matches_by_date:
matches_by_date[ts] = []
matches_by_date[ts].append(table)
sorted_timestamps = sorted(matches_by_date.keys(), reverse=True)
result_data = {}
found = False
for ts in sorted_timestamps:
current_group = matches_by_date[ts]
group_results = []
for table in current_group:
try:
cursor.execute(f"SELECT USERID, PASSWORD FROM [{table}] WHERE USERID = ?", username)
row = cursor.fetchone()
if row:
group_results.append({
'table': table,
'password': row.PASSWORD
})
except Exception as e:
logging.error(f"Error querying table {table}: {e}")
if group_results:
unique_passwords = {}
for r in group_results:
if r['password'] not in unique_passwords:
unique_passwords[r['password']] = []
unique_passwords[r['password']].append(r['table'])
disclaimer = "\n*Note: Imail is no longer in use; this is a historical password retrieved from the archives.*"
if len(unique_passwords) == 1:
pwd = list(unique_passwords.keys())[0]
src = list(unique_passwords.values())[0][0]
# Format for Claude to present to user
result_data = {
"status": "success",
"password": pwd,
"source_table": src,
"message": f"Found password: {pwd} (Source: {src}){disclaimer}"
}
else:
conflicts = []
for pwd, tables in unique_passwords.items():
conflicts.append(f"Password: {pwd} (from {', '.join(tables)})")
conflict_str = "\n".join(conflicts)
result_data = {
"status": "conflict",
"details": conflicts,
"message": f"Found conflicting passwords in tables from the same time period:\n{conflict_str}{disclaimer}"
}
found = True
break # Found in newest group
cnxn.close()
if not found:
return {"status": "not_found", "message": f"User '{username}' not found in any table for {domain}."}
return result_data
except Exception as e:
return {"status": "error", "message": f"Database error: {str(e)}"}