import os import json import logging import requests from datetime import datetime, timedelta, timezone from zoneinfo import ZoneInfo # Import ZoneInfo for proper timezone handling from pathlib import Path # User information cache management def load_user_cache(cache_file='user_cache.json'): """Load user cache from file""" try: with open(cache_file, 'r') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return {} def save_user_cache(users, cache_file='user_cache.json'): """Save user cache to file""" with open(cache_file, 'w') as f: json.dump(users, f, indent=2) def get_slack_user_info(slack_client, user_id, bot_user_id=None, users=None): """ Retrieve comprehensive user information, fetching from Slack if not in cache Args: slack_client: The initialized Slack client user_id (str): Slack user ID bot_user_id (str, optional): The bot's user ID users (dict, optional): User cache dictionary Returns: str: Formatted user name """ # Initialize users if not provided if users is None: users = load_user_cache() # Check if user is in cache and not stale (older than 30 days) if user_id in users: cached_user = users[user_id] cache_time = datetime.fromisoformat(cached_user.get('cached_at', '1970-01-01')) if (datetime.now() - cache_time) < timedelta(days=30): # Return preferred name format return ( cached_user.get('real_name') or cached_user.get('display_name') or cached_user.get('name') or user_id ) # Fetch user info from Slack try: response = slack_client.users_info(user=user_id) if response['ok']: user_info = response['user'] profile = user_info.get('profile', {}) # Extract comprehensive user details user_details = { 'id': user_id, 'name': user_info.get('name', ''), 'real_name': user_info.get('real_name', ''), 'display_name': profile.get('display_name', ''), 'email': profile.get('email', ''), 'title': profile.get('title', ''), 'first_name': profile.get('first_name', ''), 'last_name': profile.get('last_name', ''), 'cached_at': datetime.now().isoformat() } # Update cache users[user_id] = user_details save_user_cache(users) # Return preferred name format return ( user_details.get('real_name') or user_details.get('display_name') or user_details.get('name') or user_id ) except Exception as e: logging.error(f"Error fetching user info for {user_id}: {e}") return user_id # Fallback to user ID def load_channel_cache(cache_file='channel_cache.json'): """Load channel cache from file""" try: with open(cache_file, 'r') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return {} def save_channel_cache(channels, cache_file='channel_cache.json'): """Save channel cache to file""" with open(cache_file, 'w') as f: json.dump(channels, f, indent=2) def get_slack_channel_info(slack_client, channel_id, bot_user_id=None, channels=None): """ Retrieve channel information, fetching from Slack if not in cache Args: slack_client: The initialized Slack client channel_id (str): Slack channel ID bot_user_id (str, optional): The bot's user ID channels (dict, optional): Channel cache dictionary Returns: str: Channel name """ # Initialize channels if not provided if channels is None: channels = load_channel_cache() # Check if channel is in cache and not stale (older than 30 days) if channel_id in channels: cached_channel = channels[channel_id] cache_time = datetime.fromisoformat(cached_channel.get('cached_at', '1970-01-01')) if (datetime.now() - cache_time) < timedelta(days=30): return cached_channel.get('name', channel_id) # Fetch channel info from Slack try: # Different API methods for different channel types if channel_id.startswith('C'): # Public channel response = slack_client.conversations_info(channel=channel_id) elif channel_id.startswith('G'): # Private channel or multi-person DM response = slack_client.conversations_info(channel=channel_id) elif channel_id.startswith('D'): # Direct message # For DMs, we'll use a custom name since there's no channel name user_ids = slack_client.conversations_members(channel=channel_id) if user_ids['ok'] and len(user_ids['members']) == 2: # Get the other user's name (not the bot) other_user_id = [uid for uid in user_ids['members'] if uid != bot_user_id][0] other_user_name = get_slack_user_info(slack_client, other_user_id, bot_user_id) channel_details = { 'id': channel_id, 'name': f"dm-{other_user_name}", 'is_dm': True, 'cached_at': datetime.now().isoformat() } channels[channel_id] = channel_details save_channel_cache(channels) return channel_details['name'] return f"dm-channel" if response['ok']: channel_info = response['channel'] # Extract channel details channel_details = { 'id': channel_id, 'name': channel_info.get('name', ''), 'is_private': channel_info.get('is_private', False), 'is_dm': False, 'cached_at': datetime.now().isoformat() } # Update cache channels[channel_id] = channel_details save_channel_cache(channels) return channel_details['name'] except Exception as e: logging.error(f"Error fetching channel info for {channel_id}: {e}") return channel_id # Fallback to channel ID def format_slack_timestamp(ts): """Convert Slack timestamp to human-readable format in Pacific time""" try: timestamp = float(ts) # Create UTC datetime and then convert to Pacific time dt = datetime.fromtimestamp(timestamp, tz=timezone.utc) # Use ZoneInfo for proper timezone handling, including DST transitions pacific_tz = ZoneInfo("America/Los_Angeles") return dt.astimezone(pacific_tz).strftime("%Y-%m-%d %H:%M:%S %Z") except (ValueError, TypeError): return "Unknown Time" def log_slack_message(slack_client, channel_id, user_id, text, ts, bot_user_id=None): """ Log Slack messages to files organized by date and channel Args: slack_client: The initialized Slack client channel_id (str): Slack channel ID user_id (str): Slack user ID text (str): Message text ts (str): Message timestamp bot_user_id (str, optional): The bot's user ID """ # Create logs directory if it doesn't exist log_dir = Path('./slack-logs') log_dir.mkdir(exist_ok=True) # Get channel name channel_name = get_slack_channel_info(slack_client, channel_id, bot_user_id) # Get user name user_name = get_slack_user_info(slack_client, user_id, bot_user_id) # Format the timestamp for filename and log entry, using Pacific time with ZoneInfo timestamp = float(ts) dt = datetime.fromtimestamp(timestamp, tz=timezone.utc) # Use ZoneInfo for America/Los_Angeles timezone (Pacific Time) pacific_tz = ZoneInfo("America/Los_Angeles") pacific_dt = dt.astimezone(pacific_tz) date_str = pacific_dt.strftime("%Y-%m-%d") time_str = pacific_dt.strftime("%H:%M:%S") # Create filename with date and channel filename = f"{date_str}-{channel_name}-log.txt" log_path = log_dir / filename # Format the log entry log_entry = f"[{date_str} {time_str}] {user_name}: {text}\n" # Write to log file with open(log_path, 'a', encoding='utf-8') as f: f.write(log_entry) def get_recent_channel_history(channel_id, max_messages=100): """ Read recent messages from the channel log file for the current date Args: channel_id (str): Slack channel ID max_messages (int): Maximum number of messages to retrieve Returns: str: Formatted string of recent messages """ try: # Get channel name channels = load_channel_cache() channel_name = channels.get(channel_id, {}).get('name', channel_id) # Get current date in Pacific time pacific_tz = ZoneInfo("America/Los_Angeles") current_date = datetime.now(pacific_tz).strftime("%Y-%m-%d") # Construct log file path log_dir = Path('./slack-logs') log_file = f"{current_date}-{channel_name}-log.txt" log_path = log_dir / log_file # Check if the log file exists if not log_path.exists(): return "No recent channel history available." # Read the log file and extract the most recent messages with open(log_path, 'r', encoding='utf-8') as f: lines = f.readlines() # Get the last max_messages lines recent_messages = lines[-max_messages:] if len(lines) > max_messages else lines # Format the messages as a single string return "".join(recent_messages) except Exception as e: logging.error(f"Error retrieving channel history: {e}") return "Error retrieving channel history." def handle_slack_attachments(slack_client, message, bot_user_id=None): """ Process file attachments in Slack messages and save them to appropriate directories Args: slack_client: The initialized Slack client message (dict): The Slack message object containing file attachments bot_user_id (str, optional): The bot's user ID Returns: bool: True if attachments were handled, False otherwise """ # Check if the message contains files if 'files' not in message: return False channel_id = message.get('channel') user_id = message.get('user') ts = message.get('ts') # Get channel and user info channel_name = get_slack_channel_info(slack_client, channel_id, bot_user_id) user_name = get_slack_user_info(slack_client, user_id, bot_user_id) # Process each file in the message for file_data in message['files']: try: file_type = file_data.get('filetype', '').lower() file_mimetype = file_data.get('mimetype', '') file_url = file_data.get('url_private') or file_data.get('url_private_download') file_name = file_data.get('name', 'unknown_file') if not file_url: logging.warning(f"No download URL found for file: {file_name}") continue # Determine file category and destination directory if file_mimetype.startswith('audio/') or file_type in ['mp3', 'wav', 'ogg', 'm4a']: save_dir = Path('./slack-audio-files') category = 'audio' elif file_mimetype.startswith('image/') or file_type in ['jpg', 'jpeg', 'png', 'gif']: save_dir = Path('./slack-images') category = 'image' else: save_dir = Path('./slack-other-files') category = 'file' # Create the directory if it doesn't exist save_dir.mkdir(parents=True, exist_ok=True) # Format timestamp for filename pacific_tz = ZoneInfo("America/Los_Angeles") timestamp = float(ts) dt = datetime.fromtimestamp(timestamp, tz=pacific_tz) date_str = dt.strftime("%Y-%m-%d") time_str = dt.strftime("%H%M%S") # Create a unique filename that matches your logging format # Format: date-channel-user-time-originalfilename.extension file_extension = os.path.splitext(file_name)[1] save_filename = f"{date_str}-{channel_name}-{user_name.replace(' ', '_')}-{time_str}{file_extension}" save_path = save_dir / save_filename # Download and save the file download_slack_file(slack_client, file_url, save_path) # Log the file save logging.info(f"Saved {category} file from {user_name} in {channel_name}: {save_path}") # Also log to the channel log file that a file was shared log_file_message(slack_client, channel_id, user_id, f"[Shared {category} file: {file_name}]", ts, bot_user_id) except Exception as e: logging.error(f"Error processing file attachment: {e}") return True def download_slack_file(slack_client, file_url, save_path): """ Download a file from Slack using the private URL Args: slack_client: The initialized Slack client (for authentication) file_url (str): The private URL of the file to download save_path (Path): Path where the file should be saved Returns: bool: True if download succeeded, False otherwise """ try: # Get the OAuth token from the slack client for authorization token = slack_client.token # Download the file with authorization headers = {"Authorization": f"Bearer {token}"} response = requests.get(file_url, headers=headers, stream=True) response.raise_for_status() # Save the file with open(save_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) return True except Exception as e: logging.error(f"Error downloading file from Slack: {e}") return False def log_file_message(slack_client, channel_id, user_id, text, ts, bot_user_id=None): """ A wrapper around log_slack_message to log file attachments in the same format This uses the existing logging mechanism but with a special message indicating a file was shared """ log_slack_message(slack_client, channel_id, user_id, text, ts, bot_user_id)