528 lines
22 KiB
Python
528 lines
22 KiB
Python
# encoding=utf-8
|
|
"""
|
|
S3 Configuration and Service Module
|
|
Handles S3 credentials and asset retrieval for the MOTM application.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import boto3
|
|
from botocore.exceptions import ClientError, NoCredentialsError
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Dict, Any
|
|
|
|
|
|
class S3ConfigManager:
|
|
"""Manages S3 configuration settings.
|
|
|
|
Configuration priority (highest to lowest):
|
|
1. Environment variables (from Kubernetes secrets)
|
|
2. Database settings (admin-configurable via web UI)
|
|
3. JSON file (local development fallback)
|
|
|
|
Note: Credentials are ALWAYS from environment variables for security.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.config_file = 's3_config.json'
|
|
self.config = self.load_config()
|
|
|
|
def load_config(self) -> Dict[str, Any]:
|
|
"""Load S3 configuration from environment variables, database, or file.
|
|
|
|
Priority:
|
|
1. Environment variables (for Kubernetes/container deployments)
|
|
2. Database settings (admin-configurable via web UI)
|
|
3. Configuration file (for local/legacy deployments)
|
|
"""
|
|
# Priority 1: Check if running in Kubernetes/container mode with env vars
|
|
if os.getenv('S3_ENABLED') or os.getenv('S3_ACCESS_KEY_ID'):
|
|
return self._load_from_env()
|
|
|
|
# Priority 2: Try loading from database
|
|
db_config = self._load_from_database()
|
|
if db_config:
|
|
return db_config
|
|
|
|
# Priority 3: Fall back to file-based configuration
|
|
if os.path.exists(self.config_file):
|
|
try:
|
|
with open(self.config_file, 'r') as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, IOError):
|
|
return self.get_default_config()
|
|
|
|
# Default: Disabled, use local static files
|
|
return self.get_default_config()
|
|
|
|
def _load_from_env(self) -> Dict[str, Any]:
|
|
"""Load S3 configuration from environment variables."""
|
|
return {
|
|
'aws_access_key_id': os.getenv('S3_ACCESS_KEY_ID', ''),
|
|
'aws_secret_access_key': os.getenv('S3_SECRET_ACCESS_KEY', ''),
|
|
'aws_region': os.getenv('S3_REGION', 'us-east-1'),
|
|
'bucket_name': os.getenv('S3_BUCKET', ''),
|
|
'bucket_prefix': os.getenv('S3_BUCKET_PREFIX', 'assets/'),
|
|
'enable_s3': os.getenv('S3_ENABLED', 'false').lower() in ('true', '1', 'yes'),
|
|
'use_signed_urls': os.getenv('S3_USE_SIGNED_URLS', 'true').lower() in ('true', '1', 'yes'),
|
|
'signed_url_expiry': int(os.getenv('S3_SIGNED_URL_EXPIRY', '3600')),
|
|
'fallback_to_static': os.getenv('S3_FALLBACK_TO_STATIC', 'true').lower() in ('true', '1', 'yes'),
|
|
'storage_provider': os.getenv('S3_STORAGE_PROVIDER', 'aws'),
|
|
'minio_endpoint': os.getenv('S3_ENDPOINT', ''),
|
|
'minio_use_ssl': os.getenv('S3_USE_SSL', 'true').lower() in ('true', '1', 'yes'),
|
|
'endpoint_url': os.getenv('S3_ENDPOINT', '') # For compatibility
|
|
}
|
|
|
|
def _load_from_database(self) -> Optional[Dict[str, Any]]:
|
|
"""Load S3 configuration from database.
|
|
|
|
Returns None if database is not available or no settings exist.
|
|
Credentials are always loaded from environment variables.
|
|
"""
|
|
try:
|
|
# Import here to avoid circular dependency
|
|
from database import get_db_session, S3Settings
|
|
|
|
session = get_db_session()
|
|
try:
|
|
# Get settings for admin user
|
|
settings = session.query(S3Settings).filter_by(userid='admin').first()
|
|
|
|
if not settings:
|
|
return None
|
|
|
|
# Build configuration from database settings
|
|
# Credentials ALWAYS come from environment variables
|
|
return {
|
|
'enable_s3': settings.enabled,
|
|
'aws_access_key_id': os.getenv('S3_ACCESS_KEY_ID', ''),
|
|
'aws_secret_access_key': os.getenv('S3_SECRET_ACCESS_KEY', ''),
|
|
'storage_provider': settings.storage_provider,
|
|
'aws_region': settings.region,
|
|
'bucket_name': settings.bucket_name,
|
|
'bucket_prefix': settings.bucket_prefix,
|
|
'use_signed_urls': settings.use_signed_urls,
|
|
'signed_url_expiry': settings.signed_url_expiry,
|
|
'fallback_to_static': settings.fallback_to_static,
|
|
'minio_endpoint': settings.endpoint,
|
|
'minio_use_ssl': settings.use_ssl,
|
|
'endpoint_url': settings.endpoint # For compatibility
|
|
}
|
|
finally:
|
|
session.close()
|
|
|
|
except Exception as e:
|
|
# Database not available or table doesn't exist yet
|
|
# This is normal during initial setup
|
|
return None
|
|
|
|
def get_default_config(self) -> Dict[str, Any]:
|
|
"""Get default S3 configuration."""
|
|
return {
|
|
'aws_access_key_id': '',
|
|
'aws_secret_access_key': '',
|
|
'aws_region': 'us-east-1',
|
|
'bucket_name': '',
|
|
'bucket_prefix': 'assets/',
|
|
'enable_s3': False,
|
|
'use_signed_urls': True,
|
|
'signed_url_expiry': 3600, # 1 hour in seconds
|
|
'fallback_to_static': True,
|
|
'storage_provider': 'aws', # 'aws' or 'minio'
|
|
'minio_endpoint': '', # MinIO endpoint URL
|
|
'minio_use_ssl': True # Whether to use SSL for MinIO
|
|
}
|
|
|
|
def save_config(self, config_data: Dict[str, Any]) -> bool:
|
|
"""Save S3 configuration to database.
|
|
|
|
Note: Credentials are NOT saved to database for security.
|
|
They should be provided via environment variables.
|
|
"""
|
|
try:
|
|
from database import get_db_session, S3Settings
|
|
|
|
session = get_db_session()
|
|
try:
|
|
# Get or create settings for admin user
|
|
settings = session.query(S3Settings).filter_by(userid='admin').first()
|
|
|
|
if not settings:
|
|
settings = S3Settings(userid='admin')
|
|
session.add(settings)
|
|
|
|
# Update settings from config_data
|
|
# NOTE: We do NOT save credentials to database
|
|
settings.enabled = config_data.get('enable_s3', False)
|
|
settings.storage_provider = config_data.get('storage_provider', 'aws')
|
|
settings.endpoint = config_data.get('minio_endpoint', config_data.get('endpoint_url', ''))
|
|
settings.region = config_data.get('aws_region', 'us-east-1')
|
|
settings.bucket_name = config_data.get('bucket_name', '')
|
|
settings.bucket_prefix = config_data.get('bucket_prefix', 'assets/')
|
|
settings.use_signed_urls = config_data.get('use_signed_urls', True)
|
|
settings.signed_url_expiry = config_data.get('signed_url_expiry', 3600)
|
|
settings.fallback_to_static = config_data.get('fallback_to_static', True)
|
|
settings.use_ssl = config_data.get('minio_use_ssl', True)
|
|
|
|
session.commit()
|
|
|
|
# Reload config from database
|
|
self.config = self.load_config()
|
|
return True
|
|
|
|
except Exception as e:
|
|
session.rollback()
|
|
print(f"Error saving S3 config to database: {e}")
|
|
# Fall back to file-based save for local development
|
|
return self._save_to_file(config_data)
|
|
finally:
|
|
session.close()
|
|
|
|
except Exception as e:
|
|
print(f"Database not available, falling back to file: {e}")
|
|
# Fall back to file-based save for local development
|
|
return self._save_to_file(config_data)
|
|
|
|
def _save_to_file(self, config_data: Dict[str, Any]) -> bool:
|
|
"""Save S3 configuration to JSON file (fallback for local development)."""
|
|
try:
|
|
with open(self.config_file, 'w') as f:
|
|
json.dump(config_data, f, indent=2)
|
|
self.config = config_data
|
|
return True
|
|
except IOError:
|
|
return False
|
|
|
|
def get_config_dict(self) -> Dict[str, Any]:
|
|
"""Get current configuration as dictionary."""
|
|
return self.config.copy()
|
|
|
|
def test_connection(self, config_data: Optional[Dict[str, Any]] = None) -> tuple[bool, str]:
|
|
"""Test S3 connection with provided or current configuration."""
|
|
if config_data:
|
|
test_config = config_data
|
|
else:
|
|
test_config = self.config
|
|
|
|
if not test_config.get('enable_s3', False):
|
|
return True, "S3 is disabled - using local static files"
|
|
|
|
try:
|
|
# Determine storage provider
|
|
storage_provider = test_config.get('storage_provider', 'aws')
|
|
|
|
if storage_provider == 'minio':
|
|
# Create MinIO client
|
|
minio_endpoint = test_config.get('minio_endpoint', '')
|
|
if not minio_endpoint:
|
|
return False, "MinIO endpoint is required when using MinIO"
|
|
|
|
# Parse endpoint to remove protocol if present
|
|
if minio_endpoint.startswith('http://'):
|
|
endpoint_host = minio_endpoint[7:]
|
|
use_ssl = False
|
|
elif minio_endpoint.startswith('https://'):
|
|
endpoint_host = minio_endpoint[8:]
|
|
use_ssl = True
|
|
else:
|
|
endpoint_host = minio_endpoint
|
|
use_ssl = test_config.get('minio_use_ssl', True)
|
|
|
|
# Construct the full endpoint URL
|
|
endpoint_url = f"{'https' if use_ssl else 'http'}://{endpoint_host}"
|
|
|
|
s3_client = boto3.client(
|
|
's3',
|
|
aws_access_key_id=test_config.get('aws_access_key_id', ''),
|
|
aws_secret_access_key=test_config.get('aws_secret_access_key', ''),
|
|
region_name=test_config.get('aws_region', 'us-east-1'),
|
|
endpoint_url=endpoint_url,
|
|
use_ssl=use_ssl,
|
|
verify=True # Enable SSL certificate verification
|
|
)
|
|
provider_name = "MinIO"
|
|
else:
|
|
# Create AWS S3 client
|
|
s3_client = boto3.client(
|
|
's3',
|
|
aws_access_key_id=test_config.get('aws_access_key_id', ''),
|
|
aws_secret_access_key=test_config.get('aws_secret_access_key', ''),
|
|
region_name=test_config.get('aws_region', 'us-east-1')
|
|
)
|
|
provider_name = "AWS S3"
|
|
|
|
# Test connection by listing bucket
|
|
bucket_name = test_config.get('bucket_name', '')
|
|
if not bucket_name:
|
|
return False, "Bucket name is required when S3 is enabled"
|
|
|
|
s3_client.head_bucket(Bucket=bucket_name)
|
|
return True, f"Successfully connected to {provider_name} bucket: {bucket_name}"
|
|
|
|
except ClientError as e:
|
|
error_code = e.response['Error']['Code']
|
|
provider_name = "MinIO" if test_config.get('storage_provider', 'aws') == 'minio' else "AWS S3"
|
|
if error_code == '404':
|
|
return False, f"{provider_name} bucket '{bucket_name}' not found"
|
|
elif error_code == '403':
|
|
return False, f"Access denied to {provider_name} bucket. Check credentials and permissions"
|
|
else:
|
|
return False, f"{provider_name} error: {e.response['Error']['Message']}"
|
|
except NoCredentialsError:
|
|
return False, "AWS credentials not found"
|
|
except Exception as e:
|
|
provider_name = "MinIO" if test_config.get('storage_provider', 'aws') == 'minio' else "AWS S3"
|
|
return False, f"{provider_name} connection error: {str(e)}"
|
|
|
|
|
|
class S3AssetService:
|
|
"""Service for retrieving assets from S3."""
|
|
|
|
def __init__(self, config_manager: S3ConfigManager):
|
|
self.config_manager = config_manager
|
|
self._s3_client = None
|
|
|
|
def _get_public_url(self, bucket_name: str, s3_key: str) -> str:
|
|
"""Generate public URL for S3 or MinIO."""
|
|
config = self.config_manager.config
|
|
storage_provider = config.get('storage_provider', 'aws')
|
|
|
|
if storage_provider == 'minio':
|
|
minio_endpoint = config.get('minio_endpoint', '')
|
|
use_ssl = config.get('minio_use_ssl', True)
|
|
protocol = 'https' if use_ssl else 'http'
|
|
|
|
# Parse endpoint to remove protocol if present
|
|
if minio_endpoint.startswith('http://'):
|
|
minio_endpoint = minio_endpoint[7:]
|
|
elif minio_endpoint.startswith('https://'):
|
|
minio_endpoint = minio_endpoint[8:]
|
|
|
|
return f"{protocol}://{minio_endpoint}/{bucket_name}/{s3_key}"
|
|
else:
|
|
return f"https://{bucket_name}.s3.{config.get('aws_region', 'us-east-1')}.amazonaws.com/{s3_key}"
|
|
|
|
@property
|
|
def s3_client(self):
|
|
"""Lazy-loaded S3 client."""
|
|
if self._s3_client is None and self.config_manager.config.get('enable_s3', False):
|
|
config = self.config_manager.config
|
|
storage_provider = config.get('storage_provider', 'aws')
|
|
|
|
if storage_provider == 'minio':
|
|
# Create MinIO client
|
|
minio_endpoint = config.get('minio_endpoint', '')
|
|
use_ssl = config.get('minio_use_ssl', True)
|
|
|
|
# Parse endpoint to remove protocol if present
|
|
if minio_endpoint.startswith('http://'):
|
|
endpoint_host = minio_endpoint[7:]
|
|
use_ssl = False
|
|
elif minio_endpoint.startswith('https://'):
|
|
endpoint_host = minio_endpoint[8:]
|
|
use_ssl = True
|
|
else:
|
|
endpoint_host = minio_endpoint
|
|
|
|
# Construct the full endpoint URL
|
|
endpoint_url = f"{'https' if use_ssl else 'http'}://{endpoint_host}"
|
|
|
|
self._s3_client = boto3.client(
|
|
's3',
|
|
aws_access_key_id=config.get('aws_access_key_id', ''),
|
|
aws_secret_access_key=config.get('aws_secret_access_key', ''),
|
|
region_name=config.get('aws_region', 'us-east-1'),
|
|
endpoint_url=endpoint_url,
|
|
use_ssl=use_ssl,
|
|
verify=True, # Enable SSL certificate verification
|
|
config=boto3.session.Config(
|
|
s3={
|
|
'addressing_style': 'path'
|
|
},
|
|
signature_version='s3v4'
|
|
)
|
|
)
|
|
else:
|
|
# Create AWS S3 client
|
|
self._s3_client = boto3.client(
|
|
's3',
|
|
aws_access_key_id=config.get('aws_access_key_id', ''),
|
|
aws_secret_access_key=config.get('aws_secret_access_key', ''),
|
|
region_name=config.get('aws_region', 'us-east-1')
|
|
)
|
|
return self._s3_client
|
|
|
|
def get_logo_url(self, logo_path: str, club_name: str = '') -> str:
|
|
"""Get logo URL from S3 or fallback to static."""
|
|
config = self.config_manager.config
|
|
|
|
if not config.get('enable_s3', False):
|
|
return self._get_static_logo_url(logo_path)
|
|
|
|
bucket_name = config.get('bucket_name', '')
|
|
bucket_prefix = config.get('bucket_prefix', 'motm-assets/')
|
|
|
|
if not bucket_name:
|
|
return self._get_static_logo_url(logo_path)
|
|
|
|
# Clean up the logo path
|
|
if logo_path.startswith('/static/'):
|
|
s3_key = bucket_prefix + logo_path.replace('/static/', '')
|
|
elif logo_path.startswith('logos/'):
|
|
s3_key = bucket_prefix + logo_path
|
|
else:
|
|
s3_key = bucket_prefix + 'logos/' + logo_path
|
|
|
|
try:
|
|
if config.get('use_signed_urls', True):
|
|
# Generate signed URL
|
|
expiry = config.get('signed_url_expiry', 3600)
|
|
signed_url = self.s3_client.generate_presigned_url(
|
|
'get_object',
|
|
Params={'Bucket': bucket_name, 'Key': s3_key},
|
|
ExpiresIn=expiry
|
|
)
|
|
return signed_url
|
|
else:
|
|
# Return public URL
|
|
return self._get_public_url(bucket_name, s3_key)
|
|
|
|
except ClientError:
|
|
# Fallback to static if S3 fails
|
|
if config.get('fallback_to_static', True):
|
|
return self._get_static_logo_url(logo_path)
|
|
return ''
|
|
|
|
def get_logo_url_public(self, logo_path: str, club_name: str = '') -> str:
|
|
"""Get logo URL from S3 using public URLs (no authentication)."""
|
|
config = self.config_manager.config
|
|
|
|
if not config.get('enable_s3', False):
|
|
return self._get_static_logo_url(logo_path)
|
|
|
|
bucket_name = config.get('bucket_name', '')
|
|
bucket_prefix = config.get('bucket_prefix', 'assets/')
|
|
|
|
if not bucket_name:
|
|
return self._get_static_logo_url(logo_path)
|
|
|
|
# Clean up the logo path
|
|
if logo_path.startswith('/static/'):
|
|
s3_key = bucket_prefix + logo_path.replace('/static/', '')
|
|
elif logo_path.startswith('logos/'):
|
|
s3_key = bucket_prefix + logo_path
|
|
else:
|
|
s3_key = bucket_prefix + 'logos/' + logo_path
|
|
|
|
try:
|
|
# Always return public URL (no authentication)
|
|
return self._get_public_url(bucket_name, s3_key)
|
|
|
|
except Exception:
|
|
# Fallback to static if S3 fails
|
|
if config.get('fallback_to_static', True):
|
|
return self._get_static_logo_url(logo_path)
|
|
return ''
|
|
|
|
def get_asset_url(self, asset_path: str) -> str:
|
|
"""Get any asset URL from S3 or fallback to static."""
|
|
config = self.config_manager.config
|
|
|
|
if not config.get('enable_s3', False):
|
|
return f"/static/{asset_path}"
|
|
|
|
bucket_name = config.get('bucket_name', '')
|
|
bucket_prefix = config.get('bucket_prefix', 'motm-assets/')
|
|
|
|
if not bucket_name:
|
|
return f"/static/{asset_path}"
|
|
|
|
# Clean up the asset path
|
|
if asset_path.startswith('/static/'):
|
|
s3_key = bucket_prefix + asset_path.replace('/static/', '')
|
|
else:
|
|
s3_key = bucket_prefix + asset_path
|
|
|
|
try:
|
|
if config.get('use_signed_urls', True):
|
|
# Generate signed URL
|
|
expiry = config.get('signed_url_expiry', 3600)
|
|
signed_url = self.s3_client.generate_presigned_url(
|
|
'get_object',
|
|
Params={'Bucket': bucket_name, 'Key': s3_key},
|
|
ExpiresIn=expiry
|
|
)
|
|
return signed_url
|
|
else:
|
|
# Return public URL
|
|
return self._get_public_url(bucket_name, s3_key)
|
|
|
|
except ClientError:
|
|
# Fallback to static if S3 fails
|
|
if config.get('fallback_to_static', True):
|
|
return f"/static/{asset_path}"
|
|
return ''
|
|
|
|
def get_asset_url_public(self, asset_path: str) -> str:
|
|
"""Get any asset URL from S3 using public URLs (no authentication)."""
|
|
config = self.config_manager.config
|
|
|
|
if not config.get('enable_s3', False):
|
|
return f"/static/{asset_path}"
|
|
|
|
bucket_name = config.get('bucket_name', '')
|
|
bucket_prefix = config.get('bucket_prefix', 'assets/')
|
|
|
|
if not bucket_name:
|
|
return f"/static/{asset_path}"
|
|
|
|
# Clean up the asset path
|
|
if asset_path.startswith('/static/'):
|
|
s3_key = bucket_prefix + asset_path.replace('/static/', '')
|
|
else:
|
|
s3_key = bucket_prefix + asset_path
|
|
|
|
try:
|
|
# Always return public URL (no authentication)
|
|
return self._get_public_url(bucket_name, s3_key)
|
|
|
|
except Exception:
|
|
# Fallback to static if S3 fails
|
|
if config.get('fallback_to_static', True):
|
|
return f"/static/{asset_path}"
|
|
return ''
|
|
|
|
def upload_asset(self, file_path: str, s3_key: str) -> tuple[bool, str]:
|
|
"""Upload an asset to S3."""
|
|
config = self.config_manager.config
|
|
|
|
if not config.get('enable_s3', False):
|
|
return False, "S3 is disabled"
|
|
|
|
bucket_name = config.get('bucket_name', '')
|
|
bucket_prefix = config.get('bucket_prefix', 'motm-assets/')
|
|
|
|
if not bucket_name:
|
|
return False, "Bucket name not configured"
|
|
|
|
full_s3_key = bucket_prefix + s3_key
|
|
|
|
try:
|
|
self.s3_client.upload_file(file_path, bucket_name, full_s3_key)
|
|
return True, f"Successfully uploaded {s3_key} to S3"
|
|
except ClientError as e:
|
|
return False, f"Upload failed: {e.response['Error']['Message']}"
|
|
except Exception as e:
|
|
return False, f"Upload error: {str(e)}"
|
|
|
|
def _get_static_logo_url(self, logo_path: str) -> str:
|
|
"""Get static logo URL as fallback."""
|
|
if logo_path.startswith('/static/'):
|
|
return logo_path
|
|
return f"/static/images/{logo_path}"
|
|
|
|
|
|
# Global instances
|
|
s3_config_manager = S3ConfigManager()
|
|
s3_asset_service = S3AssetService(s3_config_manager)
|