# encoding=utf-8
"""
S3 Configuration and Service Module

Handles S3 credentials and asset retrieval for the MOTM application.
"""

import os
import json
import boto3
from botocore.exceptions import BotoCoreError, ClientError, NoCredentialsError
from datetime import datetime, timedelta
from typing import Optional, Dict, Any

# Fallback values shared by every code path so that the signed-URL, public-URL
# and upload helpers always agree on where an asset lives.
DEFAULT_BUCKET_PREFIX = 'assets/'
DEFAULT_REGION = 'us-east-1'
DEFAULT_SIGNED_URL_EXPIRY = 3600  # 1 hour in seconds

_TRUTHY_VALUES = ('true', '1', 'yes')
_STATIC_PREFIX = '/static/'


def _env_flag(name: str, default: str) -> bool:
    """Return True when environment variable *name* holds a truthy string.

    *default* is the string used when the variable is unset.
    """
    return os.getenv(name, default).lower() in _TRUTHY_VALUES


def _env_int(name: str, default: int) -> int:
    """Return environment variable *name* as an int.

    Falls back to *default* when the variable is unset or is not a valid
    integer, so a malformed value cannot crash module import.
    """
    raw_value = os.getenv(name)
    if raw_value is None:
        return default
    try:
        return int(raw_value)
    except ValueError:
        return default


def _resolve_minio_endpoint(endpoint: Optional[str], default_use_ssl: Any) -> tuple[str, bool]:
    """Normalise a MinIO endpoint into ``(endpoint_url, use_ssl)``.

    An explicit ``http://`` or ``https://`` scheme in *endpoint* decides
    whether SSL is used; otherwise *default_use_ssl* does. A trailing slash on
    the host part is dropped so URLs built from the result do not contain
    ``//``.
    """
    endpoint = endpoint or ''
    if endpoint.startswith('http://'):
        host, use_ssl = endpoint[len('http://'):], False
    elif endpoint.startswith('https://'):
        host, use_ssl = endpoint[len('https://'):], True
    else:
        host, use_ssl = endpoint, bool(default_use_ssl)
    scheme = 'https' if use_ssl else 'http'
    return f"{scheme}://{host.rstrip('/')}", use_ssl


def _create_s3_client(config: Dict[str, Any]):
    """Build a boto3 S3 client for the provider described by *config*.

    For ``storage_provider == 'minio'`` the client targets the configured
    endpoint with path-style addressing and SigV4 signing; any other provider
    value yields a plain AWS S3 client.
    """
    credentials = {
        'aws_access_key_id': config.get('aws_access_key_id', ''),
        'aws_secret_access_key': config.get('aws_secret_access_key', ''),
        'region_name': config.get('aws_region', DEFAULT_REGION),
    }
    if config.get('storage_provider', 'aws') != 'minio':
        return boto3.client('s3', **credentials)

    endpoint_url, use_ssl = _resolve_minio_endpoint(
        config.get('minio_endpoint', ''),
        config.get('minio_use_ssl', True),
    )
    return boto3.client(
        's3',
        endpoint_url=endpoint_url,
        use_ssl=use_ssl,
        verify=True,  # Enable SSL certificate verification
        config=boto3.session.Config(
            s3={'addressing_style': 'path'},
            signature_version='s3v4',
        ),
        **credentials,
    )


class S3ConfigManager:
    """Manages S3 configuration settings.

    Configuration priority (highest to lowest):
    1. Environment variables (from Kubernetes secrets)
    2. Database settings (admin-configurable via web UI)
    3. JSON file (local development fallback)

    Note: Credentials are ALWAYS from environment variables for security.
    """

    def __init__(self):
        self.config_file = 's3_config.json'
        self.config = self.load_config()

    def load_config(self) -> Dict[str, Any]:
        """Load S3 configuration from environment variables, database, or file.

        Priority:
        1. Environment variables (for Kubernetes/container deployments)
        2. Database settings (admin-configurable via web UI)
        3. Configuration file (for local/legacy deployments)

        Returns:
            The configuration dictionary; the disabled default configuration
            when no source is available or the file cannot be parsed.
        """
        # Priority 1: Check if running in Kubernetes/container mode with env vars
        # NOTE(review): S3_ACCESS_KEY_ID alone selects env mode, yet the
        # database loader also reads its credentials from that variable, so
        # database settings appear to be skipped whenever credentials are
        # present. Confirm whether that is intended before changing it.
        if os.getenv('S3_ENABLED') or os.getenv('S3_ACCESS_KEY_ID'):
            return self._load_from_env()

        # Priority 2: Try loading from database
        db_config = self._load_from_database()
        if db_config:
            return db_config

        # Priority 3: Fall back to file-based configuration
        if os.path.exists(self.config_file):
            try:
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except (ValueError, OSError):
                # ValueError covers malformed JSON and undecodable bytes.
                return self.get_default_config()

        # Default: Disabled, use local static files
        return self.get_default_config()

    def _load_from_env(self) -> Dict[str, Any]:
        """Load S3 configuration from environment variables."""
        return {
            'aws_access_key_id': os.getenv('S3_ACCESS_KEY_ID', ''),
            'aws_secret_access_key': os.getenv('S3_SECRET_ACCESS_KEY', ''),
            'aws_region': os.getenv('S3_REGION', DEFAULT_REGION),
            'bucket_name': os.getenv('S3_BUCKET', ''),
            'bucket_prefix': os.getenv('S3_BUCKET_PREFIX', DEFAULT_BUCKET_PREFIX),
            'enable_s3': _env_flag('S3_ENABLED', 'false'),
            'use_signed_urls': _env_flag('S3_USE_SIGNED_URLS', 'true'),
            # A malformed expiry falls back to the default instead of raising
            # during module import (the global instance is built at import).
            'signed_url_expiry': _env_int('S3_SIGNED_URL_EXPIRY', DEFAULT_SIGNED_URL_EXPIRY),
            'fallback_to_static': _env_flag('S3_FALLBACK_TO_STATIC', 'true'),
            'storage_provider': os.getenv('S3_STORAGE_PROVIDER', 'aws'),
            'minio_endpoint': os.getenv('S3_ENDPOINT', ''),
            'minio_use_ssl': _env_flag('S3_USE_SSL', 'true'),
            'endpoint_url': os.getenv('S3_ENDPOINT', ''),  # For compatibility
        }

    def _load_from_database(self) -> Optional[Dict[str, Any]]:
        """Load S3 configuration from database.

        Returns None if database is not available or no settings exist.
        Credentials are always loaded from environment variables.
        """
        try:
            # Import here to avoid circular dependency
            from database import get_db_session, S3Settings

            session = get_db_session()
            try:
                # Get settings for admin user
                settings = session.query(S3Settings).filter_by(userid='admin').first()
                if not settings:
                    return None

                # Build configuration from database settings
                # Credentials ALWAYS come from environment variables
                return {
                    'enable_s3': settings.enabled,
                    'aws_access_key_id': os.getenv('S3_ACCESS_KEY_ID', ''),
                    'aws_secret_access_key': os.getenv('S3_SECRET_ACCESS_KEY', ''),
                    'storage_provider': settings.storage_provider,
                    'aws_region': settings.region,
                    'bucket_name': settings.bucket_name,
                    'bucket_prefix': settings.bucket_prefix,
                    'use_signed_urls': settings.use_signed_urls,
                    'signed_url_expiry': settings.signed_url_expiry,
                    'fallback_to_static': settings.fallback_to_static,
                    'minio_endpoint': settings.endpoint,
                    'minio_use_ssl': settings.use_ssl,
                    'endpoint_url': settings.endpoint,  # For compatibility
                }
            finally:
                session.close()
        except Exception:
            # Database not available or table doesn't exist yet
            # This is normal during initial setup
            return None

    def get_default_config(self) -> Dict[str, Any]:
        """Get default S3 configuration (S3 disabled, local static files)."""
        return {
            'aws_access_key_id': '',
            'aws_secret_access_key': '',
            'aws_region': DEFAULT_REGION,
            'bucket_name': '',
            'bucket_prefix': DEFAULT_BUCKET_PREFIX,
            'enable_s3': False,
            'use_signed_urls': True,
            'signed_url_expiry': DEFAULT_SIGNED_URL_EXPIRY,
            'fallback_to_static': True,
            'storage_provider': 'aws',  # 'aws' or 'minio'
            'minio_endpoint': '',  # MinIO endpoint URL
            'minio_use_ssl': True,  # Whether to use SSL for MinIO
            'endpoint_url': '',  # For compatibility with env/database configs
        }

    def save_config(self, config_data: Dict[str, Any]) -> bool:
        """Save S3 configuration to database.

        Note: Credentials are NOT saved to database for security.
        They should be provided via environment variables.

        Falls back to writing the JSON config file when the database is
        unavailable or the save fails.

        Returns:
            True when the configuration was persisted (database or file),
            False when the file fallback also failed.
        """
        try:
            from database import get_db_session, S3Settings

            session = get_db_session()
            try:
                # Get or create settings for admin user
                settings = session.query(S3Settings).filter_by(userid='admin').first()
                if not settings:
                    settings = S3Settings(userid='admin')
                    session.add(settings)

                # Update settings from config_data
                # NOTE: We do NOT save credentials to database
                settings.enabled = config_data.get('enable_s3', False)
                settings.storage_provider = config_data.get('storage_provider', 'aws')
                settings.endpoint = config_data.get(
                    'minio_endpoint', config_data.get('endpoint_url', '')
                )
                settings.region = config_data.get('aws_region', DEFAULT_REGION)
                settings.bucket_name = config_data.get('bucket_name', '')
                settings.bucket_prefix = config_data.get('bucket_prefix', DEFAULT_BUCKET_PREFIX)
                settings.use_signed_urls = config_data.get('use_signed_urls', True)
                settings.signed_url_expiry = config_data.get(
                    'signed_url_expiry', DEFAULT_SIGNED_URL_EXPIRY
                )
                settings.fallback_to_static = config_data.get('fallback_to_static', True)
                settings.use_ssl = config_data.get('minio_use_ssl', True)

                session.commit()

                # Reload config from database
                self.config = self.load_config()
                return True
            except Exception as e:
                session.rollback()
                print(f"Error saving S3 config to database: {e}")
                # Fall back to file-based save for local development
                return self._save_to_file(config_data)
            finally:
                session.close()
        except Exception as e:
            print(f"Database not available, falling back to file: {e}")
            # Fall back to file-based save for local development
            return self._save_to_file(config_data)

    def _save_to_file(self, config_data: Dict[str, Any]) -> bool:
        """Save S3 configuration to JSON file (fallback for local development).

        Returns:
            True on success; False when the file cannot be written or
            *config_data* is not JSON-serialisable.
        """
        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(config_data, f, indent=2)
        except (OSError, TypeError, ValueError):
            return False
        self.config = config_data
        return True

    def get_config_dict(self) -> Dict[str, Any]:
        """Get current configuration as a (shallow-copied) dictionary."""
        return self.config.copy()

    def test_connection(self, config_data: Optional[Dict[str, Any]] = None) -> tuple[bool, str]:
        """Test S3 connection with provided or current configuration.

        Args:
            config_data: Configuration to test; the manager's current
                configuration is used when omitted or empty.

        Returns:
            ``(ok, message)`` where *message* is a human-readable result.
        """
        test_config = config_data if config_data else self.config

        if not test_config.get('enable_s3', False):
            return True, "S3 is disabled - using local static files"

        # Resolved before the try block so every handler below can use them.
        is_minio = test_config.get('storage_provider', 'aws') == 'minio'
        provider_name = "MinIO" if is_minio else "AWS S3"
        bucket_name = test_config.get('bucket_name', '')

        try:
            if is_minio and not test_config.get('minio_endpoint', ''):
                return False, "MinIO endpoint is required when using MinIO"

            # Same client construction as S3AssetService uses, so a passing
            # test reflects the client that will actually serve requests.
            s3_client = _create_s3_client(test_config)

            # Test connection by checking the bucket
            if not bucket_name:
                return False, "Bucket name is required when S3 is enabled"

            s3_client.head_bucket(Bucket=bucket_name)
            return True, f"Successfully connected to {provider_name} bucket: {bucket_name}"

        except ClientError as e:
            error_info = e.response.get('Error', {})
            error_code = error_info.get('Code', '')
            if error_code == '404':
                return False, f"{provider_name} bucket '{bucket_name}' not found"
            if error_code == '403':
                return False, f"Access denied to {provider_name} bucket. Check credentials and permissions"
            return False, f"{provider_name} error: {error_info.get('Message', str(e))}"
        except NoCredentialsError:
            return False, "AWS credentials not found"
        except Exception as e:
            return False, f"{provider_name} connection error: {str(e)}"


class S3AssetService:
    """Service for retrieving assets from S3."""

    def __init__(self, config_manager: S3ConfigManager):
        self.config_manager = config_manager
        self._s3_client = None

    def _get_public_url(self, bucket_name: str, s3_key: str) -> str:
        """Generate public URL for S3 or MinIO.

        For MinIO an explicit scheme in the configured endpoint takes
        precedence over ``minio_use_ssl``, matching how the client is built.
        """
        config = self.config_manager.config

        if config.get('storage_provider', 'aws') == 'minio':
            endpoint_url, _ = _resolve_minio_endpoint(
                config.get('minio_endpoint', ''),
                config.get('minio_use_ssl', True),
            )
            return f"{endpoint_url}/{bucket_name}/{s3_key}"

        region = config.get('aws_region', DEFAULT_REGION)
        return f"https://{bucket_name}.s3.{region}.amazonaws.com/{s3_key}"

    @property
    def s3_client(self):
        """Lazy-loaded S3 client (None while S3 is disabled).

        NOTE(review): the client is cached on first use and is not rebuilt
        when the configuration changes afterwards.
        """
        if self._s3_client is None and self.config_manager.config.get('enable_s3', False):
            self._s3_client = _create_s3_client(self.config_manager.config)
        return self._s3_client

    def _s3_location(self) -> Optional[tuple[Dict[str, Any], str, str]]:
        """Return ``(config, bucket_name, bucket_prefix)`` when S3 is usable.

        Returns None when S3 is disabled or no bucket is configured.
        """
        config = self.config_manager.config
        if not config.get('enable_s3', False):
            return None
        bucket_name = config.get('bucket_name', '')
        if not bucket_name:
            return None
        return config, bucket_name, config.get('bucket_prefix', DEFAULT_BUCKET_PREFIX)

    def _signed_or_public_url(self, config: Dict[str, Any], bucket_name: str, s3_key: str) -> str:
        """Return a presigned GET URL, or a public URL when signing is off."""
        if config.get('use_signed_urls', True):
            return self.s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket_name, 'Key': s3_key},
                ExpiresIn=config.get('signed_url_expiry', DEFAULT_SIGNED_URL_EXPIRY),
            )
        return self._get_public_url(bucket_name, s3_key)

    @staticmethod
    def _fallback_url(config: Dict[str, Any], static_url: str) -> str:
        """Return *static_url* if static fallback is enabled, else ''."""
        return static_url if config.get('fallback_to_static', True) else ''

    @staticmethod
    def _logo_key_suffix(logo_path: str) -> str:
        """Map a logo path to its key below the bucket prefix."""
        if logo_path.startswith(_STATIC_PREFIX):
            # Strip only the leading prefix, not later occurrences.
            return logo_path.removeprefix(_STATIC_PREFIX)
        if logo_path.startswith('logos/'):
            return logo_path
        return 'logos/' + logo_path

    def get_logo_url(self, logo_path: str, club_name: str = '') -> str:
        """Get logo URL from S3 or fallback to static.

        Returns the static URL when S3 is disabled or has no bucket. If URL
        generation fails, returns the static URL when ``fallback_to_static``
        is set, otherwise ''. ``club_name`` is accepted but unused.
        """
        static_url = self._get_static_logo_url(logo_path)
        location = self._s3_location()
        if location is None:
            return static_url

        config, bucket_name, bucket_prefix = location
        s3_key = bucket_prefix + self._logo_key_suffix(logo_path)
        try:
            return self._signed_or_public_url(config, bucket_name, s3_key)
        except (ClientError, BotoCoreError):
            # BotoCoreError also covers NoCredentialsError and similar
            # client-side failures that are not ClientError.
            return self._fallback_url(config, static_url)

    def get_logo_url_public(self, logo_path: str, club_name: str = '') -> str:
        """Get logo URL from S3 using public URLs (no authentication).

        ``club_name`` is accepted but unused.
        """
        static_url = self._get_static_logo_url(logo_path)
        location = self._s3_location()
        if location is None:
            return static_url

        config, bucket_name, bucket_prefix = location
        s3_key = bucket_prefix + self._logo_key_suffix(logo_path)
        try:
            # Always return public URL (no authentication)
            return self._get_public_url(bucket_name, s3_key)
        except Exception:
            return self._fallback_url(config, static_url)

    def get_asset_url(self, asset_path: str) -> str:
        """Get any asset URL from S3 or fallback to static.

        Returns the static URL when S3 is disabled or has no bucket. If URL
        generation fails, returns the static URL when ``fallback_to_static``
        is set, otherwise ''.
        """
        static_url = self._get_static_asset_url(asset_path)
        location = self._s3_location()
        if location is None:
            return static_url

        config, bucket_name, bucket_prefix = location
        s3_key = bucket_prefix + asset_path.removeprefix(_STATIC_PREFIX)
        try:
            return self._signed_or_public_url(config, bucket_name, s3_key)
        except (ClientError, BotoCoreError):
            return self._fallback_url(config, static_url)

    def get_asset_url_public(self, asset_path: str) -> str:
        """Get any asset URL from S3 using public URLs (no authentication)."""
        static_url = self._get_static_asset_url(asset_path)
        location = self._s3_location()
        if location is None:
            return static_url

        config, bucket_name, bucket_prefix = location
        s3_key = bucket_prefix + asset_path.removeprefix(_STATIC_PREFIX)
        try:
            # Always return public URL (no authentication)
            return self._get_public_url(bucket_name, s3_key)
        except Exception:
            return self._fallback_url(config, static_url)

    def upload_asset(self, file_path: str, s3_key: str) -> tuple[bool, str]:
        """Upload an asset to S3.

        Args:
            file_path: Local path of the file to upload.
            s3_key: Key below the configured bucket prefix.

        Returns:
            ``(ok, message)`` describing the outcome.
        """
        config = self.config_manager.config

        if not config.get('enable_s3', False):
            return False, "S3 is disabled"

        bucket_name = config.get('bucket_name', '')
        if not bucket_name:
            return False, "Bucket name not configured"

        full_s3_key = config.get('bucket_prefix', DEFAULT_BUCKET_PREFIX) + s3_key

        try:
            self.s3_client.upload_file(file_path, bucket_name, full_s3_key)
            return True, f"Successfully uploaded {s3_key} to S3"
        except ClientError as e:
            return False, f"Upload failed: {e.response['Error']['Message']}"
        except Exception as e:
            return False, f"Upload error: {str(e)}"

    def _get_static_logo_url(self, logo_path: str) -> str:
        """Get static logo URL as fallback."""
        if logo_path.startswith(_STATIC_PREFIX):
            return logo_path
        return f"/static/images/{logo_path}"

    def _get_static_asset_url(self, asset_path: str) -> str:
        """Get static asset URL, leaving an existing '/static/' prefix alone."""
        if asset_path.startswith(_STATIC_PREFIX):
            return asset_path
        return f"/static/{asset_path}"


# Global instances
s3_config_manager = S3ConfigManager()
s3_asset_service = S3AssetService(s3_config_manager)