Files
abot/slack_functions.py
2026-01-14 11:44:13 -08:00

385 lines
15 KiB
Python

import os
import json
import logging
import requests
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo # Import ZoneInfo for proper timezone handling
from pathlib import Path
# User information cache management
def load_user_cache(cache_file='user_cache.json'):
    """Load the user cache from a JSON file.

    Args:
        cache_file (str): Path of the JSON cache file.

    Returns:
        dict: The cached entries, or an empty dict when the file is missing,
        cannot be decoded/parsed, or does not contain a JSON object.
    """
    try:
        with open(cache_file, 'r') as f:
            cache = json.load(f)
    except (FileNotFoundError, ValueError):
        # ValueError covers json.JSONDecodeError as well as UnicodeDecodeError
        # raised while reading a corrupted file.
        return {}
    # Callers look entries up by ID and assign into the cache, so any other
    # JSON payload (list, null, number, ...) is treated like a corrupt file.
    return cache if isinstance(cache, dict) else {}
def save_user_cache(users, cache_file='user_cache.json'):
    """Write the user cache to disk as JSON indented by two spaces.

    Args:
        users (dict): Cache contents to serialize.
        cache_file (str): Path of the file to (over)write.
    """
    with open(cache_file, mode='w') as cache_handle:
        json.dump(users, cache_handle, indent=2)
def _preferred_user_name(details, user_id):
    """Return the first non-empty of real_name, display_name, name, else user_id."""
    return (
        details.get('real_name')
        or details.get('display_name')
        or details.get('name')
        or user_id
    )


def get_slack_user_info(slack_client, user_id, bot_user_id=None, users=None):
    """
    Retrieve a display name for a user, fetching from Slack if not in cache

    Cache entries younger than 30 days are served directly. Otherwise the
    user is fetched with ``slack_client.users_info``, stored in ``users`` and
    persisted with ``save_user_cache``.

    Args:
        slack_client: The initialized Slack client
        user_id (str): Slack user ID
        bot_user_id (str, optional): The bot's user ID (not used here)
        users (dict, optional): User cache dictionary; loaded from disk when omitted
    Returns:
        str: real_name, display_name or name (first non-empty), or ``user_id``
        when the lookup fails or Slack reports ``ok`` as false
    """
    # Initialize users if not provided
    if users is None:
        users = load_user_cache()

    # Serve from cache unless the entry is stale (older than 30 days)
    cached_user = users.get(user_id)
    if isinstance(cached_user, dict):
        try:
            cache_time = datetime.fromisoformat(cached_user.get('cached_at', '1970-01-01'))
            is_fresh = (datetime.now() - cache_time) < timedelta(days=30)
        except (TypeError, ValueError):
            # A missing/garbled timestamp must not crash the lookup; treat the
            # entry as stale so it gets refreshed below.
            is_fresh = False
        if is_fresh:
            return _preferred_user_name(cached_user, user_id)

    # Fetch user info from Slack
    try:
        response = slack_client.users_info(user=user_id)
        if not response['ok']:
            return user_id
        user_info = response['user']
        profile = user_info.get('profile', {})
        user_details = {
            'id': user_id,
            'name': user_info.get('name', ''),
            'real_name': user_info.get('real_name', ''),
            'display_name': profile.get('display_name', ''),
            'email': profile.get('email', ''),
            'title': profile.get('title', ''),
            'first_name': profile.get('first_name', ''),
            'last_name': profile.get('last_name', ''),
            'cached_at': datetime.now().isoformat(),
        }
    except Exception as e:
        logging.error(f"Error fetching user info for {user_id}: {e}")
        return user_id  # Fallback to user ID

    # Update cache. Persisting is best-effort: a write failure must not throw
    # away the name that was just fetched.
    users[user_id] = user_details
    try:
        save_user_cache(users)
    except (OSError, TypeError, ValueError) as e:
        logging.error(f"Error saving user cache: {e}")

    return _preferred_user_name(user_details, user_id)
def load_channel_cache(cache_file='channel_cache.json'):
    """Load the channel cache from a JSON file.

    Args:
        cache_file (str): Path of the JSON cache file.

    Returns:
        dict: The cached entries, or an empty dict when the file is missing,
        cannot be decoded/parsed, or does not contain a JSON object.
    """
    try:
        with open(cache_file, 'r') as f:
            cache = json.load(f)
    except (FileNotFoundError, ValueError):
        # ValueError covers json.JSONDecodeError as well as UnicodeDecodeError
        # raised while reading a corrupted file.
        return {}
    # Callers look entries up by ID and assign into the cache, so any other
    # JSON payload (list, null, number, ...) is treated like a corrupt file.
    return cache if isinstance(cache, dict) else {}
def save_channel_cache(channels, cache_file='channel_cache.json'):
    """Write the channel cache to disk as JSON indented by two spaces.

    Args:
        channels (dict): Cache contents to serialize.
        cache_file (str): Path of the file to (over)write.
    """
    with open(cache_file, mode='w') as cache_handle:
        json.dump(channels, cache_handle, indent=2)
def get_slack_channel_info(slack_client, channel_id, bot_user_id=None, channels=None):
    """
    Retrieve a channel name, fetching from Slack if not in cache

    Cache entries younger than 30 days are served directly. IDs starting with
    'C' or 'G' are resolved with ``conversations_info``; IDs starting with 'D'
    are named ``dm-<other member's name>`` via ``conversations_members``.
    Any other ID is returned unchanged without calling Slack.

    Args:
        slack_client: The initialized Slack client
        channel_id (str): Slack channel ID
        bot_user_id (str, optional): The bot's user ID, used to pick the
            other participant of a direct message
        channels (dict, optional): Channel cache dictionary; loaded from disk when omitted
    Returns:
        str: Channel name; "dm-channel" for a DM whose members cannot be
        resolved to exactly two; ``channel_id`` when the lookup fails
    """
    # Initialize channels if not provided
    if channels is None:
        channels = load_channel_cache()

    # Serve from cache unless the entry is stale (older than 30 days)
    cached_channel = channels.get(channel_id)
    if isinstance(cached_channel, dict):
        try:
            cache_time = datetime.fromisoformat(cached_channel.get('cached_at', '1970-01-01'))
            is_fresh = (datetime.now() - cache_time) < timedelta(days=30)
        except (TypeError, ValueError):
            # A missing/garbled timestamp must not crash the lookup; treat the
            # entry as stale so it gets refreshed below.
            is_fresh = False
        if is_fresh:
            return cached_channel.get('name', channel_id)

    # Fetch channel info from Slack
    try:
        if channel_id.startswith('D'):  # Direct message
            # DMs have no channel name, so one is built from the other member.
            members = slack_client.conversations_members(channel=channel_id)
            if not (members['ok'] and len(members['members']) == 2):
                return "dm-channel"
            # NOTE(review): when bot_user_id is None nothing is filtered out
            # and the first listed member is used - confirm callers pass it.
            other_user_id = [uid for uid in members['members'] if uid != bot_user_id][0]
            other_user_name = get_slack_user_info(slack_client, other_user_id, bot_user_id)
            channel_details = {
                'id': channel_id,
                'name': f"dm-{other_user_name}",
                'is_dm': True,
                'cached_at': datetime.now().isoformat(),
            }
        elif channel_id.startswith(('C', 'G')):  # Public / private channel or group
            response = slack_client.conversations_info(channel=channel_id)
            if not response['ok']:
                return channel_id
            channel_info = response['channel']
            channel_details = {
                'id': channel_id,
                'name': channel_info.get('name', ''),
                'is_private': channel_info.get('is_private', False),
                'is_dm': False,
                'cached_at': datetime.now().isoformat(),
            }
        else:
            # Unrecognized ID prefix: no lookup is attempted for it.
            return channel_id
    except Exception as e:
        logging.error(f"Error fetching channel info for {channel_id}: {e}")
        return channel_id  # Fallback to channel ID

    # Update cache. Persisting is best-effort: a write failure must not throw
    # away the name that was just resolved.
    channels[channel_id] = channel_details
    try:
        save_channel_cache(channels)
    except (OSError, TypeError, ValueError) as e:
        logging.error(f"Error saving channel cache: {e}")

    return channel_details['name']
def format_slack_timestamp(ts):
    """Convert a Slack timestamp to a human-readable Pacific-time string.

    Args:
        ts: Seconds since the Unix epoch, as a string or number.

    Returns:
        str: The time formatted as "%Y-%m-%d %H:%M:%S %Z" in the
        America/Los_Angeles zone, or "Unknown Time" when ``ts`` is not a
        number or lies outside the range datetime can represent.
    """
    try:
        # Build an aware UTC datetime first, then convert to Pacific time.
        dt = datetime.fromtimestamp(float(ts), tz=timezone.utc)
        # ZoneInfo handles DST transitions for the conversion.
        pacific_tz = ZoneInfo("America/Los_Angeles")
        return dt.astimezone(pacific_tz).strftime("%Y-%m-%d %H:%M:%S %Z")
    except (ValueError, TypeError, OverflowError, OSError):
        # OverflowError/OSError are what out-of-range values such as "inf" or
        # "1e20" raise; they are as unusable as a non-numeric timestamp.
        return "Unknown Time"
def log_slack_message(slack_client, channel_id, user_id, text, ts, bot_user_id=None):
    """
    Append one Slack message to the per-day, per-channel log file

    The entry goes to ``./slack-logs/<date>-<channel name>-log.txt`` as
    ``[<date> <time>] <user name>: <text>``, with date and time in Pacific time.

    Args:
        slack_client: The initialized Slack client
        channel_id (str): Slack channel ID
        user_id (str): Slack user ID
        text (str): Message text
        ts (str): Message timestamp (seconds since the epoch)
        bot_user_id (str, optional): The bot's user ID
    """
    # Make sure the log directory is there before anything else
    logs_root = Path('./slack-logs')
    logs_root.mkdir(exist_ok=True)

    # Resolve human-readable names for the channel and the author
    channel_label = get_slack_channel_info(slack_client, channel_id, bot_user_id)
    author = get_slack_user_info(slack_client, user_id, bot_user_id)

    # Interpret the timestamp as UTC and shift it to Pacific time
    moment = datetime.fromtimestamp(float(ts), tz=timezone.utc).astimezone(
        ZoneInfo("America/Los_Angeles")
    )
    day = moment.strftime("%Y-%m-%d")
    clock = moment.strftime("%H:%M:%S")

    # One file per day and channel; entries are appended
    destination = logs_root / f"{day}-{channel_label}-log.txt"
    with open(destination, 'a', encoding='utf-8') as log_handle:
        log_handle.write(f"[{day} {clock}] {author}: {text}\n")
def get_recent_channel_history(channel_id, max_messages=100):
    """
    Read recent messages from the channel log file for the current date

    The channel name is taken from the channel cache (falling back to the raw
    ID) and the date is today's date in Pacific time, matching the file name
    ``./slack-logs/<date>-<channel name>-log.txt``.

    Args:
        channel_id (str): Slack channel ID
        max_messages (int): Maximum number of log lines to retrieve; zero or
            a negative value yields an empty string
    Returns:
        str: The last ``max_messages`` lines of today's log joined together,
        "No recent channel history available." when there is no log file for
        today, or "Error retrieving channel history." on any failure
    """
    # Local import keeps this block self-contained; deque is standard library.
    from collections import deque

    try:
        # Get channel name
        channels = load_channel_cache()
        channel_name = channels.get(channel_id, {}).get('name', channel_id)

        # Get current date in Pacific time
        pacific_tz = ZoneInfo("America/Los_Angeles")
        current_date = datetime.now(pacific_tz).strftime("%Y-%m-%d")

        # Construct log file path
        log_path = Path('./slack-logs') / f"{current_date}-{channel_name}-log.txt"
        if not log_path.exists():
            return "No recent channel history available."

        # A bounded deque keeps only the tail while streaming the file, and a
        # maxlen of 0 correctly keeps nothing (slicing with [-0:] would have
        # returned every line).
        with open(log_path, 'r', encoding='utf-8') as f:
            recent_messages = deque(f, maxlen=max(max_messages, 0))

        return "".join(recent_messages)
    except Exception as e:
        logging.error(f"Error retrieving channel history: {e}")
        return "Error retrieving channel history."
def _path_safe(value):
    """Return str(value) with path separators replaced so it stays one path component."""
    return str(value).replace('/', '_').replace('\\', '_')


def handle_slack_attachments(slack_client, message, bot_user_id=None):
    """
    Process file attachments in Slack messages and save them to appropriate directories

    Audio goes to ``./slack-audio-files``, images to ``./slack-images`` and
    everything else to ``./slack-other-files``. Each file is saved as
    ``<date>-<channel>-<user>-<time>-<original name><extension>`` (Pacific
    time); a numeric suffix is added when two files of one message would
    otherwise get the same name. Every file with a download URL is also
    recorded in the channel log.

    Args:
        slack_client: The initialized Slack client
        message (dict): The Slack message object containing file attachments
        bot_user_id (str, optional): The bot's user ID
    Returns:
        bool: False when the message has no 'files' key, True otherwise
        (even if an individual file could not be processed)
    """
    # Check if the message contains files
    if 'files' not in message:
        return False

    channel_id = message.get('channel')
    user_id = message.get('user')
    ts = message.get('ts')

    # Get channel and user info
    channel_name = get_slack_channel_info(slack_client, channel_id, bot_user_id)
    user_name = get_slack_user_info(slack_client, user_id, bot_user_id)

    # Paths already assigned during this call, so files sharing a name and
    # timestamp do not overwrite each other.
    used_paths = set()

    # Process each file in the message
    for file_data in message['files']:
        try:
            file_type = file_data.get('filetype', '').lower()
            file_mimetype = file_data.get('mimetype', '')
            file_url = file_data.get('url_private') or file_data.get('url_private_download')
            file_name = file_data.get('name', 'unknown_file')

            if not file_url:
                logging.warning(f"No download URL found for file: {file_name}")
                continue

            # Determine file category and destination directory
            if file_mimetype.startswith('audio/') or file_type in ['mp3', 'wav', 'ogg', 'm4a']:
                save_dir = Path('./slack-audio-files')
                category = 'audio'
            elif file_mimetype.startswith('image/') or file_type in ['jpg', 'jpeg', 'png', 'gif']:
                save_dir = Path('./slack-images')
                category = 'image'
            else:
                save_dir = Path('./slack-other-files')
                category = 'file'

            # Create the directory if it doesn't exist
            save_dir.mkdir(parents=True, exist_ok=True)

            # Format timestamp for filename
            pacific_tz = ZoneInfo("America/Los_Angeles")
            dt = datetime.fromtimestamp(float(ts), tz=pacific_tz)
            date_str = dt.strftime("%Y-%m-%d")
            time_str = dt.strftime("%H%M%S")

            # Build the name from sanitized parts: the names come from Slack
            # and must not be able to introduce extra path components.
            file_stem, file_extension = os.path.splitext(os.path.basename(file_name))
            file_extension = _path_safe(file_extension)
            base_name = "-".join([
                date_str,
                _path_safe(channel_name),
                _path_safe(user_name).replace(' ', '_'),
                time_str,
                _path_safe(file_stem).replace(' ', '_'),
            ])
            save_path = save_dir / f"{base_name}{file_extension}"
            duplicate = 1
            while save_path in used_paths:
                save_path = save_dir / f"{base_name}-{duplicate}{file_extension}"
                duplicate += 1
            used_paths.add(save_path)

            # Download and save the file; only report success when it worked
            if download_slack_file(slack_client, file_url, save_path):
                logging.info(f"Saved {category} file from {user_name} in {channel_name}: {save_path}")
            else:
                logging.warning(f"Could not save {category} file from {user_name} in {channel_name}: {file_name}")

            # Also log to the channel log file that a file was shared (the
            # file was shared whether or not the download succeeded)
            log_file_message(slack_client, channel_id, user_id, f"[Shared {category} file: {file_name}]", ts, bot_user_id)
        except Exception as e:
            logging.error(f"Error processing file attachment: {e}")

    return True
def download_slack_file(slack_client, file_url, save_path, timeout=30):
    """
    Download a file from Slack using the private URL

    The body is streamed into ``<save_path>.part`` and moved over
    ``save_path`` only after the transfer completed, so a failed download
    never leaves a truncated file at the destination.

    Args:
        slack_client: The initialized Slack client; its ``token`` attribute
            is sent as a Bearer token
        file_url (str): The private URL of the file to download
        save_path (Path): Path where the file should be saved
        timeout (float, optional): Seconds passed to ``requests.get`` as its
            timeout, so a stalled connection cannot block forever
    Returns:
        bool: True if download succeeded, False otherwise (the error is logged)
    """
    partial_path = None
    try:
        destination = Path(save_path)
        partial_path = destination.with_name(destination.name + '.part')

        # Get the OAuth token from the slack client for authorization
        token = slack_client.token
        headers = {"Authorization": f"Bearer {token}"}

        # The context manager releases the streamed connection on every path.
        with requests.get(file_url, headers=headers, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(partial_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        os.replace(partial_path, destination)
        return True
    except Exception as e:
        logging.error(f"Error downloading file from Slack: {e}")
        if partial_path is not None:
            try:
                partial_path.unlink(missing_ok=True)
            except OSError:
                # Best-effort cleanup; the download failure is already logged.
                pass
        return False
def log_file_message(slack_client, channel_id, user_id, text, ts, bot_user_id=None):
    """
    Record a shared-file notice in the channel log

    Thin pass-through to log_slack_message: the notice text is written in the
    same format as an ordinary message.
    """
    log_slack_message(
        slack_client,
        channel_id,
        user_id,
        text,
        ts,
        bot_user_id,
    )