first commit

This commit is contained in:
Your Name
2026-01-14 01:41:03 -08:00
commit 9d31399518
18 changed files with 2191 additions and 0 deletions

586
tools/mtscripter.py Normal file
View File

@@ -0,0 +1,586 @@
# --- START OF FILE mtscripter.py ---
import os
from dotenv import load_dotenv
# generate_mikrotik_CPE_script.py
import datetime # Already imported, good.
from datetime import datetime # Explicitly import datetime class for clarity
import ipaddress # For potential future validation, though not used in core generation
import json # For result formatting
# import slack
from slack_sdk.web import WebClient # Explicitly import WebClient
from slack_sdk.errors import SlackApiError
import logging
from typing import Dict, Any # Added for type hinting
# Mimic the C# GlobalVariables.AppVersion
# You might want to manage this version more dynamically in your actual project
MTSCRIPTER_VERSION = "4.1"
# Initialize Slack client
slack_client = WebClient(token=os.environ['SLACK_TOKEN'])
# --- Tool Definition (for LLM) ---
TOOL_DEFINITION = {
"name": "generate_mikrotik_CPE_script",
"description": "Generate and send a MikroTik CPE configuration script directly to the Slack channel. Ask the user to clarify any parameters not provided.",
"input_schema": {
"type": "object",
"properties": {
"channel": {
"type": "string",
"description": "The Slack channel ID to send the script to"
},
"selected_hardware": {
"type": "string",
"description": "The hardware model (e.g., 'hap ax2', 'hap ac2')"
},
"selected_mode": {
"type": "string",
"description": "The connection mode ('PPPoE', 'Bridge', 'DHCP')"
},
"wan_int": {
"type": "string",
"description": "The designated WAN interface (e.g., 'ether1'). Ignored if mode is 'Bridge'. Defaults to 'ether1' if not provided."
},
"user": {
"type": "string",
"description": "The username (PPPoE user or device identifier). Should be lowercase"
},
"password": {
"type": "string",
"description": "The password (PPPoE password or device admin password)"
},
"ssid": {
"type": "string",
"description": "The Wi-Fi network name"
},
"wpa": {
"type": "string",
"description": "The Wi-Fi WPA2/WPA3 passphrase"
},
"technician_name": {
"type": "string",
"description": "The name of the first step technician requesting the script (the name of the person prompting you from the slack channel)."
},
"has_mgt": {
"type": "boolean",
"description": "Boolean indicating if a static management IP should be configured"
},
"mgt_ip": {
"type": "string",
"description": "The static management IP address (e.g., '10.1.1.5'). Required ONLY if has_mgt is True."
},
"has_wifi": {
"type": "boolean",
"description": "Boolean indicating if Wi-Fi should be configured and enabled. Defaults to True if not provided."
}
},
# List only the truly required arguments for the tool to function at all.
# Conditional requirements (like mgt_ip) are handled in the function logic.
"required": ["channel", "selected_hardware", "selected_mode", "user", "password", "ssid", "wpa", "technician_name", "has_mgt", "has_wifi"]
}
}
# --- End Tool Definition ---
# --- Tool Implementation ---
def generate_mikrotik_CPE_script(**kwargs: Any) -> Dict[str, Any]:
"""
Generates a MikroTik CPE configuration script based on input parameters,
validates them, saves the script locally to the 'cpe-configs' folder
with a timestamp, sends the script to the specified Slack channel,
and returns a confirmation message.
Args:
**kwargs (Any): Keyword arguments matching the tool's input_schema properties.
Returns:
A dictionary with the result of the operation, containing either a success message or error details.
"""
# --- Extract Arguments and Set Defaults ---
channel = kwargs.get('channel')
selected_hardware = kwargs.get('selected_hardware')
selected_mode = kwargs.get('selected_mode')
wan_int = kwargs.get('wan_int', "ether1") # Default WAN if not provided
user = kwargs.get('user')
password = kwargs.get('password')
ssid = kwargs.get('ssid')
wpa = kwargs.get('wpa')
has_mgt = kwargs.get('has_mgt', False) # Default False if missing (handle potential None from LLM)
mgt_ip = kwargs.get('mgt_ip') # No default, checked later if has_mgt is True
has_wifi = kwargs.get('has_wifi', True) # Default True if missing (handle potential None from LLM)
technician_name = kwargs.get('technician_name') # this is for some reason failing later validation even though it should be a string
# so let's print it out for debugging
logging.info(f"[mtscripter] technician_name received: {technician_name}")
# maybe we type cast it to str here to ensure it's a string
# tech_str = str(technician_name) if technician_name else "" # Ensure it's a string or empty
# Explicitly handle None for booleans if LLM might pass it
if has_mgt is None: has_mgt = False
if has_wifi is None: has_wifi = True
# --- Input Validation Moved Here ---
validation_errors = []
required_args_from_schema = TOOL_DEFINITION["input_schema"].get("required", [])
# Check static required args first (using the extracted/defaulted values)
arg_values = { # Map arg names to their extracted/defaulted values for checking
"channel": channel, "selected_hardware": selected_hardware, "selected_mode": selected_mode,
"user": user, "password": password, "ssid": ssid, "wpa": wpa,
"has_mgt": has_mgt, "has_wifi": has_wifi, "technician_name": technician_name # Added technician_name here for check below
}
for arg_name in required_args_from_schema:
value = arg_values.get(arg_name)
# Check for None or empty string (booleans are handled by their explicit check below)
# if not isinstance(value, bool) and not value:
# explicitly exclude args handled by specific elif blocks below
if arg_name not in ["has_mgt", "has_wifi"] and not isinstance(value, bool) and not value: # Removed technician_name from exclusion
validation_errors.append(f"Missing or empty required argument: '{arg_name}'.")
# Ensure booleans are actually booleans
elif arg_name in ["has_mgt", "has_wifi"] and not isinstance(value, bool):
validation_errors.append(f"Argument '{arg_name}' must be a boolean (true/false). Received: {type(value)}")
# # specific check for technician_name (must be non-empty string) < - this is broken for some reason
# # Re-enabled technician_name check - let's see if it passes now
# # It seems technician_name was missing from arg_values dict before
elif arg_name == "technician_name" and (not value or not isinstance(value, str)):
validation_errors.append(f"Argument 'technician_name' is required and must be a non-empty string.")
# Conditional validation for mgt_ip
if has_mgt and (not mgt_ip or not isinstance(mgt_ip, str)):
validation_errors.append("Management IP ('mgt_ip') is required and must be a non-empty string when 'has_mgt' is True.")
# Validate specific values/types
if selected_hardware and not isinstance(selected_hardware, str):
validation_errors.append(f"Argument 'selected_hardware' must be a string.")
elif selected_hardware and selected_hardware.lower() not in ["hap ax2", "hap ac2"]:
# logging.warning(f"Unrecognized hardware '{selected_hardware}'. Defaulting to 'hap ax2' for script generation, but LLM should be corrected.")
# selected_hardware = "hap ax2" # Default internally, but maybe flag as warning? No, let's add error. # No longer defaulting, return error
validation_errors.append(f"Unrecognized hardware '{selected_hardware}'. Expected 'hap ax2' or 'hap ac2'.")
if selected_mode and not isinstance(selected_mode, str):
validation_errors.append(f"Argument 'selected_mode' must be a string.")
elif selected_mode and selected_mode not in ["PPPoE", "Bridge", "DHCP"]:
validation_errors.append(f"Unrecognized mode '{selected_mode}'. Expected 'PPPoE', 'Bridge', or 'DHCP'.")
# Check other types if needed (e.g., wan_int should be string)
if wan_int and not isinstance(wan_int, str):
validation_errors.append(f"Argument 'wan_int' must be a string. Received: {type(wan_int)}")
# If validation errors, return them
if validation_errors:
error_message = "Input validation failed: " + ", ".join(validation_errors)
logging.error(f"[mtscripter] {error_message}")
return {"error": error_message}
# --- End Input Validation ---
try:
# --- Variable Setup (use validated/defaulted variables) ---
# `selected_hardware` might have been defaulted above if invalid input was given and we chose to proceed -> No longer defaulting
logical_wan = wan_int # Use the defaulted or provided wan_int
mgt_wan = wan_int
# date_time = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S") # Use specific class
date_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") # Simplified using explicit import
mgt_gw = ""
# Adjust WAN based on mode (after validation ensures selected_mode is valid)
if selected_mode == "Bridge":
logical_wan = "bridge"
mgt_wan = "bridge"
elif selected_mode == "PPPoE":
logical_wan = "pppoe-out1"
# else DHCP mode uses wan_int directly for logical_wan
# Calculate management GW (after validation ensures mgt_ip exists if has_mgt)
if has_mgt:
try:
ip_parts = mgt_ip.split('.')
if len(ip_parts) == 4:
mgt_gw = f"{ip_parts[0]}.{ip_parts[1]}.{ip_parts[2]}.1"
else:
# This should ideally be caught by more robust IP validation if added
raise ValueError(f"Invalid Management IP format for gateway calculation: {mgt_ip}")
except Exception as e:
# This error indicates a problem despite passing initial validation (e.g., malformed IP string)
error_msg = f"Error calculating management gateway from '{mgt_ip}': {e}"
logging.error(f"[mtscripter] {error_msg}")
return {"error": error_msg}
# --- Determine Hardware Specific Interface Names ---
interface1 = ""
interface2 = ""
interface_type = ""
hardware_lower = selected_hardware.lower() # Already validated as 'hap ax2' or 'hap ac2'
if hardware_lower == "hap ax2":
interface1 = "wifi1"
interface2 = "wifi2"
interface_type = "wifiwave2"
elif hardware_lower == "hap ac2":
interface1 = "wlan1"
interface2 = "wlan2"
interface_type = "wireless"
# --- Build Configuration Script ---
# Using f-strings for easy variable insertion
# Required arguments are guaranteed to exist and be non-empty strings/booleans by validation
user_str = str(user)
pass_str = str(password)
ssid_str = str(ssid)
wpa_str = str(wpa)
tech_str = str(technician_name) # technician_name passed validation
# --- Start of Script Generation (Identical core logic) ---
# Note: Using triple double quotes for the main script block
# and relying on f-string interpolation within it.
# This maintains the original script structure.
config_text = f"""
# Router Configuration - Copy and Paste into Terminal
### Create Configuration Script ###
/system script
add name=configure.rsc comment="default configuration" source={{
:local USER "{user_str}"
:local PASS "{pass_str}"
:local SSID "{ssid_str}"
:local WPA2 "{wpa_str}"
:local WANI "{wan_int}"
### Create Bridge ###
{{
/interface bridge
add auto-mac=yes fast-forward=no name=bridge protocol-mode=none
/interface bridge port
add bridge=bridge interface=ether1
add bridge=bridge interface=ether2
add bridge=bridge interface=ether3
add bridge=bridge interface=ether4
add bridge=bridge interface=ether5
add bridge=bridge interface={interface1}
:if ([:len [/interface {interface_type} find ]]=2) do={{ add bridge=bridge interface={interface2} }}
/log info message="bridge created"
}}
"""
# --- Mode Specific Configuration ---
if selected_mode != "Bridge":
config_text += f"""
### disable WAN port from bridge ###
/int bridge port set [find interface=$WANI] disabled=yes
"""
if selected_mode == "PPPoE":
config_text += f"""
### PPPoE ###
{{
/interface pppoe-client
add add-default-route=yes disabled=no interface=$WANI keepalive-timeout=60 \\
name=pppoe-out1 password=$PASS use-peer-dns=yes user=$USER
#Set PPP Profile VOIP workaround
/ppp profile set default on-up=":delay 5; /ip firewall connection remove [find protocol=udp and (dst-address~\\":5060\\\\\\$\\" or dst-address~\\":15062\\\\\\$\\")]"
/log info message="PPPoE Interface Configured"
}}
"""
elif selected_mode == "DHCP":
# logical_wan is already set to wan_int for DHCP mode
config_text += f"""
### DHCP Client on WAN ###
{{
/ip dhcp-client add disabled=no interface=$WANI use-peer-dns=yes add-default-route=yes
}}
"""
# Common configuration for non-Bridge modes (PPPoE, DHCP)
config_text += f"""
### DHCP and IP pool ###
{{
/ip pool
add name=dhcp ranges=192.168.88.10-192.168.88.254
/ip dhcp-server
add address-pool=dhcp authoritative=yes disabled=no interface=bridge name=standard lease-time=3d
/ip dhcp-server option
add name=VCI code=60 value="'Cambium-WiFi-AP'"
add name="cnPilot URL" code=43 value="'https://cnpilot.fsr.com'"
/ip address
add address=192.168.88.1/24 interface=bridge network=192.168.88.0
/ip dhcp-server network
add address=192.168.88.0/24 gateway=192.168.88.1 dns-server=192.168.88.1
/ip dns static
add address=192.168.88.1 name=router
/ip dns set allow-remote-requests=yes
/log info message="DHCP and IP Pool Configured"
}}
#### Firewall ####
{{
/interface list add name=WAN
/interface list member add list=WAN interface=$WANI
:foreach x in=[/interface pppoe-client find] do={{/interface list member add list=WAN interface=$x}}
/ip firewall filter
add action=fasttrack-connection chain=forward comment="fasttrack" \\
connection-state=established,related
add action=accept chain=forward comment="fasttrack" \\
connection-state=established,related
add action=drop chain=input comment="Drop inbound DNS requests" dst-port=53 in-interface-list=WAN protocol=udp
add action=drop chain=input comment="Drop inbound TCP DNS requests" dst-port=53 in-interface-list=WAN protocol=tcp
add action=accept chain=forward comment="accept established,related" \\
connection-state=established,related
add action=drop chain=forward comment="drop invalid" connection-state=invalid
add action=drop chain=forward comment="drop all from WAN not DSTNATed" connection-nat-state=!dstnat \\
connection-state=new in-interface-list=WAN
}}
{{
/ip firewall nat
### no interface ###
add action=masquerade chain=srcnat comment="masquerade" out-interface-list=WAN src-address=192.168.88.0/24
/log info message="Firewall Configured"
}}
#### Enable UPNP and Configure Interfaces ####
{{
/ip upnp set enabled=yes
/ip upnp interfaces add interface={logical_wan} type=external
/ip upnp interfaces add interface=bridge type=internal
/log info message="UPNP enabled"
}}
#### IPv6 enable
{{
/ipv6 settings set disable-ipv6=no
}}
#### IPv6 LAN
{{
/ipv6 nd set [ find default=yes ] interface=bridge ra-delay=0s ra-interval=30s-45s ra-lifetime=10m
/ipv6 nd prefix default set preferred-lifetime=5m valid-lifetime=10m
/ipv6 dhcp-server add prefix-pool=isp-pd interface=bridge lease-time=1m name=default
/ipv6 address add eui-64=no address=::1 from-pool=isp-pd interface=bridge
}}
#### IPv6 WAN
{{
/ipv6 dhcp-client add add-default-route=yes interface={logical_wan} pool-name=isp-pd rapid-commit=no request=prefix
}}
#### IPv6 Firewall
{{
/ipv6 firewall address-list
add address=::/128 comment="defconf: unspecified address" list=bad_ipv6
add address=::1/128 comment="defconf: lo" list=bad_ipv6
add address=::ffff:0.0.0.0/96 comment="defconf: ipv4-mapped" list=bad_ipv6
add address=::/96 comment="defconf: ipv4 compat" list=bad_ipv6
add address=100::/64 comment="defconf: discard only " list=bad_ipv6
add address=2001:db8::/32 comment="defconf: documentation" list=bad_ipv6
add address=2001:10::/28 comment="defconf: ORCHID" list=bad_ipv6
add address=3ffe::/16 comment="defconf: 6bone" list=bad_ipv6
/ipv6 firewall filter
add action=fasttrack-connection chain=forward comment="defconf: fasttrack" connection-state=established,related
add action=accept chain=forward comment="defconf: fasttrack" connection-state=established,related
add action=accept chain=input comment="defconf: accept established,related,untracked" connection-state=established,related,untracked
add action=accept chain=input comment="defconf: accept ICMPv6" protocol=icmpv6
add action=drop chain=input comment="defconf: drop invalid" connection-state=invalid
add action=accept chain=input comment="defconf: accept UDP traceroute" port=33434-33534 protocol=udp
add action=accept chain=input comment="defconf: accept DHCPv6-Client prefix delegation" dst-port=546 protocol=udp src-address=fe80::/10
add action=accept chain=input comment="defconf: accept IKE" dst-port=500,4500 protocol=udp
add action=accept chain=input comment="defconf: accept ipsec AH" protocol=ipsec-ah
add action=accept chain=input comment="defconf: accept ipsec ESP" protocol=ipsec-esp
add action=accept chain=input comment="defconf: accept all that matches ipsec policy" ipsec-policy=in,ipsec
add action=drop chain=input comment="defconf: drop everything else not coming from LAN" disabled=yes in-interface-list=WAN
add action=accept chain=forward comment="defconf: accept untracked" connection-state=untracked
add action=drop chain=forward comment="defconf: drop invalid" connection-state=invalid
add action=drop chain=forward comment="defconf: drop packets with bad src ipv6" src-address-list=bad_ipv6
add action=drop chain=forward comment="defconf: drop packets with bad dst ipv6" dst-address-list=bad_ipv6
add action=drop chain=forward comment="defconf: rfc4890 drop hop-limit=1" disabled=yes hop-limit=equal:1 protocol=icmpv6
add action=accept chain=forward comment="defconf: accept ICMPv6" protocol=icmpv6
add action=accept chain=forward comment="defconf: accept HIP" protocol=139
add action=accept chain=forward comment="defconf: accept IKE" dst-port=500,4500 protocol=udp
add action=accept chain=forward comment="defconf: accept ipsec AH" protocol=ipsec-ah
add action=accept chain=forward comment="defconf: accept ipsec ESP" protocol=ipsec-esp
add action=accept chain=forward comment="defconf: accept all that matches ipsec policy" ipsec-policy=in,ipsec
add action=drop chain=forward comment="defconf: drop everything else not coming from LAN" in-interface-list=WAN
}}
"""
# --- Management IP Configuration ---
if has_mgt:
config_text += f"""
### Configure Static Management Address ###
{{
/ip address add address={mgt_ip}/24 comment=management-ip interface={mgt_wan}
/routing table add name=management fib
/ip route add gateway={mgt_gw} distance=10 routing-table=management
/routing rule add action=lookup-only-in-table src-address={mgt_ip} table=management
/log info message="Management IP Configured: {mgt_ip}"
}}
"""
# --- DHCP Client on Bridge (Bridge mode without static MGT IP) ---
elif not has_mgt and selected_mode == "Bridge":
config_text += f"""
### DHCP Client on Bridge ###
{{
/ip dhcp-client add disabled=no interface=bridge use-peer-dns=yes add-default-route=yes
}}
"""
# --- Wi-Fi Configuration ---
if has_wifi: # Check the boolean flag (validated/defaulted earlier)
if hardware_lower == "hap ax2":
config_text += f"""
### Wireless for hap ax2 ###
{{
# DFS channel availability check (1 min)
/interface wifiwave2 channel
add name=5ghz band=5ghz-ax disabled=no skip-dfs-channels=10min-cac width=20/40/80mhz
add name=2ghz band=2ghz-ax disabled=no skip-dfs-channels=10min-cac width=20/40mhz
/interface wifiwave2 security
add name=$SSID authentication-types=wpa2-psk disabled=no encryption=ccmp \\
group-encryption=ccmp passphrase=$WPA2
/interface wifiwave2 configuration
add name=$SSID country="United States" disabled=no mode=ap ssid=$SSID \\
security=$SSID multicast-enhance=enabled
/interface wifiwave2
set [ find ] disabled=no configuration.mode=ap configuration=$SSID
set [ find default-name=wifi1 ] channel=5ghz
set [ find default-name=wifi2 ] channel=2ghz
/log info message="Wireless Configured for hap ax2"
}}
"""
elif hardware_lower == "hap ac2":
config_text += f"""
### Wireless for hap ac2 ###
{{
/interface wireless
set [ find ] disabled=no distance=indoors frequency=auto mode=ap-bridge \\
wireless-protocol=802.11 multicast-helper=full ssid=$SSID
set [ find default-name=wlan1 ] band=2ghz-b/g/n channel-width=20/40mhz-XX
:if ([:len [/interface wireless find ]]=2) do={{ \\
set [ find default-name=wlan2 ] band=5ghz-a/n/ac channel-width=20/40/80mhz-XXXX }}
/interface wireless security-profiles
set [ find default=yes ] authentication-types=wpa2-psk mode=dynamic-keys \\
wpa2-pre-shared-key=$WPA2
/log info message="Wireless Configured for hap ac2"
}}
"""
# --- Standard FSR Config & Script Closing ---
# Placeholder for FSR password - this should ideally be handled securely, maybe via env var?
# fsr_password_placeholder = "xxxxxxxx" # Placeholder removed, using env var below
# Attempt to get FSR password from environment variable
fsr_admin_password = os.environ.get('MIKROTIK_CPE_PASSWORD', 'ERROR_PASSWORD_NOT_SET') # Provide default if missing
if fsr_admin_password == 'ERROR_PASSWORD_NOT_SET':
logging.warning("[mtscripter] MIKROTIK_CPE_PASSWORD environment variable not set. Using placeholder in script.")
config_text += f"""
#### Set Identity and Login Password ####
{{
/system identity
set name=("router-".$USER)
/user set 0 password=$PASS
/user add name=fsr group=full password={fsr_admin_password}; ### Password comes from env var ###
/log info message="Identity and Password Set"
}}
### SNMP ###
/snmp {{
community set [find default=yes] addresses=0.0.0.0/0 name=fsr;
set contact=sysadmin@fsr.com enabled=yes location=$USER trap-community=fsr;
}};
#### Restrict IP Services to FSR Network ####
{{
/ip service set [/ip service find] address=64.126.128.0/18,204.52.244.0/22,10.0.0.0/8,192.168.0.0/16,172.16.0.0/12,100.64.0.0/10,2604:2a00::/32
/log info message="IP Services Restricted to FSR Network"
}}
#### Set log memory to 5000 ####
{{
/system log action set memory memory-lines=5000
}}
#### Set NTP server and time zone ####
{{
/ip cloud set update-time=no
/system ntp client set enabled=yes servers=199.245.242.36
/system clock set time-zone-name=PST8PDT
}}
#### Disable Insecure Services ####
{{
/ip service set [ find name=www ] disabled=yes
/ip service set [ find name=telnet ] disabled=yes
}}
#### Version and date comment ####
{{
/interface set [ find default-name=ether1 ] comment="configured with version {MTSCRIPTER_VERSION} on {date_time} by {tech_str}"
}}
#### Create Backup ####
{{
/file add name=flash/baseline.backup
/system backup save name=flash/baseline
}}
}}
/system script
run configure.rsc
# End of Additional Configuration
"""
# --- End of Script Generation ---
# --- Post-process config_text for LF line endings and no leading spaces ---
lines = config_text.splitlines() # Split into lines, automatically handles various line endings
processed_lines = [line.lstrip() for line in lines] # Remove leading whitespace from each line
config_text = '\r\n'.join(processed_lines) # Join back with CRLF endings
# --- End Post-processing ---
# --- Prepare Filenames (Slack and Local) ---
# Determine base filename for Slack upload
cpe_type = "router" if selected_mode in ["DHCP", "PPPoE"] else "bridge"
base_filename_slack = f"{cpe_type}-{user_str}.rsc" # Existing logic
# Determine unique filename for local save
save_dir = "cpe-configs"
timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
name_part, ext_part = os.path.splitext(base_filename_slack)
local_filename = f"{name_part}_{timestamp_str}{ext_part}"
save_path = os.path.join(save_dir, local_filename)
# --- Save Script Locally ---
try:
os.makedirs(save_dir, exist_ok=True) # Ensure directory exists
with open(save_path, 'w', encoding='utf-8', newline='\n') as f: # <-- Added newline='\n'
f.write(config_text)
logging.info(f"[mtscripter] Successfully saved generated script locally to: {save_path}")
except IOError as e:
# Log the error but continue to attempt Slack upload
logging.error(f"[mtscripter] Failed to save generated script locally to {save_path}: {e}")
# Optionally, you could add a note to the Slack message or return value here
# For now, we just log it.
# --- Send Script to Slack ---
try:
slack_client.files_upload_v2(
channel=channel, # Use validated channel ID
content=config_text,
filename=base_filename_slack, # Use the original base filename for Slack
initial_comment=f"MikroTik configuration script generated for `{user_str}` (Mode: {selected_mode}, Hardware: {selected_hardware}, Technician: {tech_str}).\nCopy the content below or download the attached `.rsc` file.",
title=base_filename_slack # Title matches filename
)
logging.info(f"[mtscripter] Successfully sent MikroTik script for {user_str} to channel {channel}")
except SlackApiError as e:
# Handle Slack API errors
error_message = f"Slack API error sending script to channel {channel}: {e.response['error']}"
logging.error(f"[mtscripter] {error_message}")
# Return error to LLM so it knows it failed
# Include note about local save attempt
return {"error": f"Failed to send the script to Slack. Error: {e.response['error']}. Note: Script was attempted to be saved locally to '{save_path}'."}
# Return success message to LLM
return {"message": f"Script for user '{user_str}' (Mode: {selected_mode}) has been successfully generated, saved locally as '{local_filename}', and sent to the Slack channel {channel} as '{base_filename_slack}'."}
except ValueError as ve:
# Handle specific value errors raised during script generation (like GW calculation)
error_message = str(ve)
logging.error(f"[mtscripter] Value error generating MikroTik script: {error_message}")
return {"error": error_message} # Return specific error to LLM
except Exception as e:
# Handle any other unexpected exceptions during script generation or Slack sending
error_message = f"Unexpected error generating or sending MikroTik script: {str(e)}"
logging.error(f"[mtscripter] {error_message}", exc_info=True) # Log traceback
return {"error": f"An unexpected internal error occurred while generating the script: {str(e)}"} # Return generic error
# --- End Tool Implementation ---
# --- END OF FILE mtscripter.py ---

196
tools/user_lookup_tool.py Normal file
View File

@@ -0,0 +1,196 @@
# --- START OF FILE user_lookup_tool.py ---
import os
import json
import logging
from typing import Dict, Any
# --- Tool Definition (for LLM) ---
TOOL_DEFINITION = {
"name": "get_user_info",
"description": "Look up information about Slack users from the cache file by searching across all available fields",
"input_schema": {
"type": "object",
"properties": {
"search_term": {
"type": "string",
"description": "Term to search for - matches against any field in user records including ID, name, real name, display name, email, title, phone, etc."
}
},
"required": ["search_term"]
}
}
# --- End Tool Definition ---
# --- Tool Implementation ---
def get_user_info(**kwargs: Any) -> Dict[str, Any]:
"""
Retrieve user information from the user cache file by searching across all available fields,
validating input arguments internally.
Args:
**kwargs (Any): Keyword arguments matching the tool's input_schema properties
(expects 'search_term').
Returns:
Dict[str, Any]: A dictionary containing matched user information or an error message
"""
search_term = kwargs.get('search_term')
# --- Input Validation Moved Here ---
# More forgiving search term validation
# If search_term is None or empty after stripping, return error
if search_term is None:
logging.error("get_user_info validation failed: Missing 'search_term' argument.")
return {
"found": False,
"error": "Missing required argument: 'search_term'."
}
# Ensure search_term is a string and strip whitespace
try:
search_term = str(search_term).strip()
except Exception as e:
logging.error(f"get_user_info validation failed: Could not convert search_term to string: {e}")
return {
"found": False,
"error": f"Invalid search_term format: {e}"
}
if not search_term:
logging.error("get_user_info validation failed: Empty 'search_term' provided.")
return {
"found": False,
"error": "Empty search term provided after stripping whitespace."
}
# --- End Input Validation ---
try:
logging.info(f"get_user_info: Attempting to find user with search term: {search_term}")
# Debug input received
logging.info(f"Search term type: {type(search_term)}, value: '{search_term}'")
# Normalize search term
search_term_lower = search_term.lower() # Use a different variable name
# Load the user cache
try:
# Specify encoding for broader compatibility
with open('user_cache.json', 'r', encoding='utf-8') as f:
user_cache = json.load(f)
except FileNotFoundError:
logging.error("User cache file 'user_cache.json' not found.")
return {"found": False, "error": "User cache file not found."}
except json.JSONDecodeError:
logging.error("Invalid JSON in user cache file 'user_cache.json'.")
return {"found": False, "error": "Invalid user cache format."}
# Search for matches across all users
matches = []
for user_id, user_data in user_cache.items():
# Check if user_data is valid
if not isinstance(user_data, dict):
logging.warning(f"Skipping invalid user data entry for ID {user_id} in cache.")
continue
# Flag to track if this user matches
user_matches = False
# Check every field in the user data for matches
for field_name, field_value in user_data.items():
# Skip non-string fields or None values
if field_value is None:
continue
# Convert field value to string and check for match
try:
str_value = str(field_value).lower()
if search_term_lower in str_value:
user_matches = True
break # Found a match in this user, no need to check other fields
except Exception as e:
# If any error in string conversion, just skip this field
logging.debug(f"Could not convert field '{field_name}' to string for user {user_id}: {e}")
continue
# If we found a match, add to our results
if user_matches:
# Create a clean user record with common fields
user_record = {
"id": user_data.get("id", user_id), # Use key as fallback ID
"name": user_data.get("name", ""),
"real_name": user_data.get("real_name", ""),
"display_name": user_data.get("display_name", ""),
"email": user_data.get("email", ""),
}
# Add optional fields if they exist and are not None
optional_fields = ["title", "phone", "first_name", "last_name", "cached_at", "status_text", "team"]
for field in optional_fields:
if field in user_data and user_data[field] is not None:
user_record[field] = user_data[field]
matches.append(user_record)
# Return results
if matches:
logging.info(f"Found {len(matches)} match(es) for search term '{search_term}'.")
return {
"found": True,
"match_count": len(matches),
"matches": matches
}
else:
logging.info(f"No users found matching '{search_term}'.")
return {
"found": False,
"error": f"No users found matching '{search_term}'"
}
except Exception as e:
logging.error(f"Unexpected error retrieving user info from cache: {e}", exc_info=True) # Added exc_info
return {
"found": False,
"error": f"An unexpected error occurred while accessing user cache: {str(e)}"
}
# --- End Tool Implementation ---
# Example Usage
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# Create a dummy cache file for testing if it doesn't exist
dummy_cache_path = 'user_cache.json'
if not os.path.exists(dummy_cache_path):
print(f"Creating dummy {dummy_cache_path} for testing...")
dummy_data = {
"U123": {"id": "U123", "name": "john.doe", "real_name": "John Doe", "display_name": "Johnny", "email": "john.doe@example.com", "title": "Engineer"},
"U456": {"id": "U456", "name": "jane.smith", "real_name": "Jane Smith", "display_name": "Janey", "email": "jane.smith@example.com", "title": "Manager"},
"U789": {"id": "U789", "name": "test.user", "real_name": "Test User", "display_name": "", "email": "test@example.com", "phone": "555-1234"}
}
with open(dummy_cache_path, 'w', encoding='utf-8') as f:
json.dump(dummy_data, f, indent=2)
print("--- Testing get_user_info ---")
# Example: Find users with 'john' in any field
result1 = get_user_info(search_term="john")
print(f"Result (search='john'): {json.dumps(result1, indent=2)}")
# Example: Find user by specific email
result2 = get_user_info(search_term="jane.smith@example.com") # Use dummy email
print(f"Result (search='jane.smith@example.com'): {json.dumps(result2, indent=2)}")
# Example: Search for empty string (should fail validation)
result3 = get_user_info(search_term=" ")
print(f"Result (search=' '): {json.dumps(result3, indent=2)}")
# Example: Search for None (should fail validation)
result4 = get_user_info(search_term=None)
print(f"Result (search=None): {json.dumps(result4, indent=2)}")
# Example: Search term not found
result5 = get_user_info(search_term="NonExistentXYZ123")
print(f"Result (search='NonExistentXYZ123'): {json.dumps(result5, indent=2)}")
# --- END OF FILE user_lookup_tool.py ---

201
tools/weather_tool.py Normal file
View File

@@ -0,0 +1,201 @@
# --- START OF FILE weather_tool.py ---
from typing import Dict, Any
import os
import logging
import requests
import json
import slack
from slack import WebClient
from slack.errors import SlackApiError
# Initialize Slack client
slack_client = slack.WebClient(token=os.environ['SLACK_TOKEN'])
# --- Tool Definition (for LLM) ---
TOOL_DEFINITION = {
"name": "get_weather",
"description": "Retrieve current weather information for a given location so that you can provide that info to the user",
"input_schema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city name or city,country code to get weather for. Input MUST be formatted as {city name},{state code},{country code}. eg. Lewiston,ID,US. if state or country aren't provided by the USER, guess between WA and ID for state, and assume US for country"
},
"channel": {
"type": "string",
"description": "The Slack channel ID to post a webcam image to, if a webcam is available for the location"
}
},
"required": ["location", "channel"] # Both required as per original schema
}
}
# --- End Tool Definition ---
# Define a dictionary of placenames and their corresponding webcam URLs
WEBCAM_URLS = {
"Elk City": {"url": "https://511.idaho.gov/map/Cctv/205.C1--2", "info": "SH-14 Eastbound View"},
"Grangeville": {"url": "https://www.deq.idaho.gov/wp-content/uploads/cameras/Grangeville.jpg", "info": "North off High Camp"},
"Lewiston": {"url": "https://www.deq.idaho.gov/wp-content/uploads/cameras/Lewiston.jpg", "info": "South off Lewiston Rim"},
"Lapwai": {"url": "https://www.deq.idaho.gov/wp-content/uploads/cameras/Lapwai.jpg", "info": "East off Lewiston Rim"},
"Potlatch": {"url": "https://www.deq.idaho.gov/wp-content/uploads/cameras/Potlatch.jpg", "info": "North off West Twin"},
"Teakean Butte": {"url": "https://www.deq.idaho.gov/wp-content/uploads/cameras/Teakean.jpg", "info": "West off Teakean Butte"},
"Moscow": {"url": "https://media.kuoi.org/camera/latest.jpg", "info": "NNE off Morill Hall"}
# Add more locations as needed
}
# --- Tool Implementation ---
def get_weather(**kwargs: Any) -> Dict[str, Any]:
"""
Retrieve current weather information for a given location using OpenWeatherMap API,
validating input arguments internally.
Args:
**kwargs (Any): Keyword arguments matching the tool's input_schema properties
(expects 'location' and 'channel').
Returns:
Dict[str, Any]: A dictionary containing weather information or an error message.
"""
location = kwargs.get('location')
channel = kwargs.get('channel')
# --- Input Validation Moved Here ---
if not location or not isinstance(location, str):
logging.error("get_weather validation failed: Missing or invalid 'location' argument.")
return {"error": "Missing or invalid required argument: 'location'."}
if not channel or not isinstance(channel, str):
# This check is important because the function needs the channel to post webcams
logging.error("get_weather validation failed: Missing or invalid 'channel' argument.")
# Although the schema requires it, the LLM might still fail. Provide clear error.
return {"error": "Missing or invalid required argument: 'channel'."}
# --- End Input Validation ---
try:
# Log the incoming location request
logging.info(f"Attempting to fetch weather for location: {location}")
# Retrieve API key from environment variables
api_key = os.environ.get('OPENWEATHERMAP_API_KEY')
if not api_key:
logging.error("OpenWeatherMap API key not found in environment variables.")
return {
"error": "OpenWeatherMap API key not found. Please set OPENWEATHERMAP_API_KEY in your environment variables."
}
# Construct the API URL
base_url = "http://api.openweathermap.org/data/2.5/weather"
params = {
"q": location,
"appid": api_key,
"units": "imperial" # don't use metric units (Celsius) but rather, imperial (Fahrenheit)
}
# Log the API request details
logging.info(f"API Request URL: {base_url}")
logging.info(f"API Request Params: {params}")
# check if the location is in the webcam URLs dictionary, and if so, get the webcam URL. we should see if any part of the location input
# matches any of the locations in the dictionary, regardless of case, and if so, send the the webcam URL and info to the slack channel, and
# proceed on
webcam_posted = False # Flag to track if webcam was posted
for loc, webcam in WEBCAM_URLS.items():
if loc.lower() in location.lower():
webcam_url = webcam['url']
webcam_info = webcam['info']
logging.info(f"Webcam URL found for location: {loc}")
logging.info(f"Webcam URL: {webcam_url}, Info: {webcam_info}")
# Send the webcam URL and info to the Slack channel
try:
slack_client.chat_postMessage(
channel=channel,
text=f"Webcam for {loc} ({webcam_info}): {webcam_url}" # Added context
)
webcam_posted = True
except SlackApiError as e:
logging.error(f"Error sending message to Slack channel {channel}: {e.response['error']}")
# Decide if this should be returned as part of the tool result error
# For now, just log it and continue with weather lookup
break # Stop checking after the first match
# Make the API request
try:
response = requests.get(base_url, params=params, timeout=10)
# Log the response status
logging.info(f"API Response Status Code: {response.status_code}")
logging.debug(f"API Response Content: {response.text}")
# Check if the request was successful
if response.status_code == 200:
data = response.json()
weather_info = {
"location": data['name'],
"country": data['sys']['country'],
"temperature": data['main']['temp'],
"feels_like": data['main']['feels_like'],
"description": data['weather'][0]['description'],
"humidity": data['main']['humidity'],
"wind_speed": data['wind']['speed'],
"webcam_posted": webcam_posted # Include status of webcam post
}
# Log successful weather retrieval
logging.info(f"Successfully retrieved weather for {location}")
logging.info(f"Weather details: {weather_info}")
return weather_info
else:
# Log unsuccessful API response
logging.error(f"Failed to retrieve weather. Status code: {response.status_code}")
logging.error(f"Response content: {response.text}")
return {
"error": f"Failed to retrieve weather. Status code: {response.status_code}, Response: {response.text}",
"webcam_posted": webcam_posted # Include status even on error
}
except requests.exceptions.RequestException as req_err:
# Log network-related errors
logging.error(f"Request error occurred: {req_err}")
return {
"error": f"Network error occurred: {str(req_err)}",
"webcam_posted": webcam_posted
}
except Exception as e:
# Log any unexpected errors
logging.error(f"Unexpected error occurred while fetching weather: {str(e)}", exc_info=True) # Added exc_info
# Attempt to get webcam_posted status if it was set before the error
wc_status = 'unknown'
if 'webcam_posted' in locals():
wc_status = webcam_posted
return {
"error": f"An unexpected error occurred while fetching weather: {str(e)}",
"webcam_posted": wc_status
}
# --- End Tool Implementation ---
# Example usage remains the same
if __name__ == "__main__":
from dotenv import load_dotenv
load_dotenv()
test_channel = os.environ.get("TEST_SLACK_CHANNEL_ID", "C08B9A6RPN1")
print("--- Testing get_weather ---")
result1 = get_weather(location="Lewiston,ID,US", channel=test_channel)
print(f"Result (Lewiston): {result1}")
result2 = get_weather(location="London", channel=test_channel)
print(f"Result (London): {result2}")
result3 = get_weather(location="", channel=test_channel) # Test validation
print(f"Result (Empty Location): {result3}")
result4 = get_weather(location="Paris,FR") # Missing channel - kwargs will be missing 'channel'
print(f"Result (Missing Channel): {result4}")
result5 = get_weather(location="Grangeville", channel=test_channel) # With webcam
print(f"Result (Grangeville with Webcam): {result5}")
# --- END OF FILE weather_tool.py ---