Skip to content

Log Analysis and Management

This guide covers comprehensive log management, analysis techniques, and troubleshooting strategies using logs for the Studio Platform.

📊 Logging Architecture

Log Categories

  • Application Logs - Application events, errors, and performance data
  • Security Logs - Authentication, authorization, and security events
  • System Logs - Infrastructure and system-level events
  • Audit Logs - Compliance and audit trail information
  • Performance Logs - Response times, resource usage, and bottlenecks

Log Levels

# Python logging levels
DEBUG = 10      # Detailed information for debugging
INFO = 20       # General information about system events
WARNING = 30    # Warning about potential issues
ERROR = 40      # Error events that may affect functionality
CRITICAL = 50   # Critical errors that may cause system failure

🔧 Log Configuration

Django Logging Setup

# settings.py
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            'style': '{',
        },
        'simple': {
            'format': '{levelname} {message}',
            'style': '{',
        },
        'json': {
            'format': '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "module": "%(module)s", "message": "%(message)s", "request_id": "%(request_id)s"}',
            'style': '{',
        },
    },
    'filters': {
        'request_id': {
            '()': 'log_request_id.RequestIdFilter',
        },
    },
    'handlers': {
        'file': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/app/logs/studio.log',
            'maxBytes': 10485760,  # 10MB
            'backupCount': 5,
            'formatter': 'verbose',
            'filters': ['request_id'],
        },
        'error_file': {
            'level': 'ERROR',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/app/logs/studio_error.log',
            'maxBytes': 10485760,  # 10MB
            'backupCount': 10,
            'formatter': 'verbose',
            'filters': ['request_id'],
        },
        'security_file': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/app/logs/security.log',
            'maxBytes': 10485760,  # 10MB
            'backupCount': 20,
            'formatter': 'json',
            'filters': ['request_id'],
        },
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'simple',
        },
        'syslog': {
            'level': 'INFO',
            'class': 'logging.handlers.SysLogHandler',
            'address': ('localhost', 514),
            'facility': 'local0',
            'formatter': 'json',
        },
    },
    'loggers': {
        'django': {
            'handlers': ['file', 'console'],
            'level': 'INFO',
            'propagate': False,
        },
        'studio': {
            'handlers': ['file', 'error_file', 'console'],
            'level': 'DEBUG',
            'propagate': False,
        },
        'security': {
            'handlers': ['security_file', 'console'],
            'level': 'INFO',
            'propagate': False,
        },
        'celery': {
            'handlers': ['file', 'console'],
            'level': 'INFO',
            'propagate': False,
        },
    },
    'root': {
        'handlers': ['console'],
        'level': 'WARNING',
    },
}

Request ID Middleware

# middleware.py
import uuid
import logging

class RequestIdMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        request_id = str(uuid.uuid4())
        request.request_id = request_id

        # Add request ID to all log records
        old_factory = logging.getLogRecordFactory()

        def record_factory(*args, **kwargs):
            record = old_factory(*args, **kwargs)
            record.request_id = getattr(request, 'request_id', None)
            return record

        logging.setLogRecordFactory(record_factory)

        response = self.get_response(request)

        # Add request ID to response headers
        response['X-Request-ID'] = request_id

        # Restore old factory
        logging.setLogRecordFactory(old_factory)

        return response

🔍 Log Analysis Techniques

Real-time Log Monitoring

# log_monitor.py
import re
import time
from collections import defaultdict, deque
from datetime import datetime, timedelta

class LogMonitor:
    def __init__(self, log_file='/app/logs/studio.log'):
        self.log_file = log_file
        self.patterns = {
            'error': re.compile(r'ERROR'),
            'critical': re.compile(r'CRITICAL'),
            'slow_request': re.compile(r'response_time.*>(\d+)ms'),
            'database_error': re.compile(r'database.*error'),
            'authentication_failure': re.compile(r'login.*failed'),
            'api_error': re.compile(r'API.*error'),
        }
        self.alerts = deque(maxlen=1000)
        self.stats = defaultdict(int)

    def monitor_logs(self, interval=60):
        """Monitor logs in real-time"""
        last_position = 0

        while True:
            try:
                with open(self.log_file, 'r') as f:
                    f.seek(last_position)
                    new_lines = f.readlines()
                    last_position = f.tell()

                for line in new_lines:
                    self.analyze_line(line.strip())

                self.check_alerts()
                time.sleep(interval)

            except Exception as e:
                print(f"Error monitoring logs: {e}")
                time.sleep(interval)

    def analyze_line(self, line):
        """Analyze individual log line"""
        timestamp = self.extract_timestamp(line)

        for pattern_name, pattern in self.patterns.items():
            if pattern.search(line):
                self.stats[pattern_name] += 1

                alert = {
                    'timestamp': timestamp,
                    'type': pattern_name,
                    'message': line,
                    'severity': self.get_severity(pattern_name)
                }

                self.alerts.append(alert)

                # Immediate alert for critical issues
                if alert['severity'] == 'critical':
                    self.send_immediate_alert(alert)

    def extract_timestamp(self, line):
        """Extract timestamp from log line"""
        try:
            # Assuming format: 2024-01-01 12:00:00
            timestamp_str = line.split()[0] + ' ' + line.split()[1]
            return datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
        except:
            return datetime.now()

    def get_severity(self, pattern_name):
        severity_map = {
            'error': 'warning',
            'critical': 'critical',
            'slow_request': 'info',
            'database_error': 'warning',
            'authentication_failure': 'warning',
            'api_error': 'warning'
        }
        return severity_map.get(pattern_name, 'info')

    def check_alerts(self):
        """Check for alert conditions"""
        recent_alerts = [
            alert for alert in self.alerts 
            if datetime.now() - alert['timestamp'] < timedelta(minutes=5)
        ]

        # Check for error rate spike
        error_count = len([a for a in recent_alerts if a['type'] in ['error', 'critical']])
        if error_count > 10:
            self.send_alert('High error rate detected', 'warning')

        # Check for authentication failures
        auth_failures = len([a for a in recent_alerts if a['type'] == 'authentication_failure'])
        if auth_failures > 5:
            self.send_alert('Multiple authentication failures detected', 'warning')

    def send_immediate_alert(self, alert):
        """Send immediate alert for critical issues"""
        message = f"CRITICAL: {alert['message']}"
        self.send_alert(message, 'critical')

    def send_alert(self, message, severity):
        """Send alert to monitoring system"""
        print(f"ALERT [{severity.upper()}]: {message}")
        # Integration with alerting system (Slack, email, etc.)
        # self.alerting_system.send_alert(message, severity)

Log Analysis Scripts

#!/bin/bash
# log_analysis.sh

# Function to count error types by hour
count_errors_by_hour() {
    local log_file=$1
    local date=$2

    echo "Error count for $date:"
    grep "$date" "$log_file" | grep -E "(ERROR|CRITICAL)" | \
    awk '{print $2}' | cut -d: -f1 | sort | uniq -c | sort -nr
}

# Function to find slow requests
find_slow_requests() {
    local log_file=$1
    local threshold=$2

    echo "Requests slower than ${threshold}ms:"
    grep "response_time.*>$threshold" "$log_file" | \
    awk '{print $7, $8}' | sort | uniq -c | sort -nr | head -10
}

# Function to analyze authentication patterns
analyze_auth_patterns() {
    local log_file=$1

    echo "Authentication Analysis:"
    echo "Failed logins:"
    grep "login.*failed" "$log_file" | awk '{print $1, $2}' | sort | uniq -c | sort -nr | head -10

    echo "Successful logins:"
    grep "login.*success" "$log_file" | awk '{print $1, $2}' | sort | uniq -c | sort -nr | head -10
}

# Function to check for database errors
check_database_errors() {
    local log_file=$1

    echo "Database Errors:"
    grep -i "database.*error" "$log_file" | tail -20
}

# Usage examples
if [ "$1" == "errors" ]; then
    count_errors_by_hour "/app/logs/studio.log" "$(date +%Y-%m-%d)"
elif [ "$1" == "slow" ]; then
    find_slow_requests "/app/logs/studio.log" "${2:-1000}"
elif [ "$1" == "auth" ]; then
    analyze_auth_patterns "/app/logs/studio.log"
elif [ "$1" == "db" ]; then
    check_database_errors "/app/logs/studio.log"
else
    echo "Usage: $0 {errors|slow|auth|db} [threshold]"
fi

Python Log Analysis

# log_analyzer.py
import re
import json
from datetime import datetime, timedelta
from collections import Counter, defaultdict
import pandas as pd
import matplotlib.pyplot as plt

class LogAnalyzer:
    def __init__(self, log_file):
        self.log_file = log_file
        self.logs = []
        self.parse_logs()

    def parse_logs(self):
        """Parse log file into structured data"""
        with open(self.log_file, 'r') as f:
            for line in f:
                try:
                    log_entry = self.parse_log_line(line.strip())
                    if log_entry:
                        self.logs.append(log_entry)
                except Exception as e:
                    print(f"Error parsing line: {e}")

    def parse_log_line(self, line):
        """Parse individual log line"""
        # Assuming format: LEVEL timestamp module message
        parts = line.split(' ', 3)
        if len(parts) < 4:
            return None

        return {
            'level': parts[0],
            'timestamp': parts[1] + ' ' + parts[2],
            'module': parts[3].split()[0] if parts[3] else '',
            'message': ' '.join(parts[3].split()[1:]) if len(parts[3].split()) > 1 else '',
            'raw': line
        }

    def get_error_trends(self, days=7):
        """Analyze error trends over time"""
        cutoff_date = datetime.now() - timedelta(days=days)
        recent_logs = [log for log in self.logs 
                      if datetime.strptime(log['timestamp'], '%Y-%m-%d %H:%M:%S') > cutoff_date]

        errors = [log for log in recent_logs if log['level'] in ['ERROR', 'CRITICAL']]

        # Group by hour
        hourly_errors = defaultdict(int)
        for error in errors:
            hour = datetime.strptime(error['timestamp'], '%Y-%m-%d %H:%M:%S').replace(minute=0, second=0)
            hourly_errors[hour] += 1

        return dict(hourly_errors)

    def get_top_errors(self, limit=10):
        """Get most common error messages"""
        errors = [log for log in self.logs if log['level'] in ['ERROR', 'CRITICAL']]
        error_messages = [log['message'] for log in errors]

        return Counter(error_messages).most_common(limit)

    def get_slow_requests(self, threshold=1000):
        """Find slow requests from logs"""
        slow_requests = []

        for log in self.logs:
            if 'response_time' in log['message']:
                # Extract response time
                match = re.search(r'response_time.*?(\d+)ms', log['message'])
                if match and int(match.group(1)) > threshold:
                    slow_requests.append({
                        'timestamp': log['timestamp'],
                        'response_time': int(match.group(1)),
                        'message': log['message']
                    })

        return sorted(slow_requests, key=lambda x: x['response_time'], reverse=True)

    def get_authentication_patterns(self):
        """Analyze authentication patterns"""
        auth_logs = [log for log in self.logs if 'login' in log['message'].lower()]

        successful_logins = []
        failed_logins = []

        for log in auth_logs:
            if 'success' in log['message'].lower():
                successful_logins.append(log)
            elif 'failed' in log['message'].lower():
                failed_logins.append(log)

        return {
            'successful': len(successful_logins),
            'failed': len(failed_logins),
            'success_rate': len(successful_logins) / (len(successful_logins) + len(failed_logins)) * 100 if (len(successful_logins) + len(failed_logins)) > 0 else 0,
            'recent_failures': [log for log in failed_logins if datetime.strptime(log['timestamp'], '%Y-%m-%d %H:%M:%S') > datetime.now() - timedelta(hours=24)]
        }

    def generate_report(self):
        """Generate comprehensive log analysis report"""
        report = {
            'analysis_date': datetime.now().isoformat(),
            'total_logs': len(self.logs),
            'error_trends': self.get_error_trends(),
            'top_errors': self.get_top_errors(),
            'slow_requests': self.get_slow_requests(),
            'authentication_patterns': self.get_authentication_patterns()
        }

        return report

    def export_to_csv(self, filename):
        """Export logs to CSV for analysis"""
        df = pd.DataFrame(self.logs)
        df.to_csv(filename, index=False)

    def plot_error_trends(self):
        """Plot error trends over time"""
        trends = self.get_error_trends()

        if not trends:
            print("No error data to plot")
            return

        dates = list(trends.keys())
        counts = list(trends.values())

        plt.figure(figsize=(12, 6))
        plt.plot(dates, counts, marker='o')
        plt.title('Error Trends Over Time')
        plt.xlabel('Time')
        plt.ylabel('Error Count')
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()

🚨 Log-Based Troubleshooting

Common Log Patterns and Solutions

Database Connection Errors

# Troubleshoot database issues from logs
def troubleshoot_database_logs(log_file):
    """Analyze database-related log entries"""

    with open(log_file, 'r') as f:
        logs = f.readlines()

    db_errors = []
    connection_issues = []
    slow_queries = []

    for log in logs:
        if 'database' in log.lower() and 'error' in log.lower():
            db_errors.append(log.strip())
        elif 'connection' in log.lower() and 'failed' in log.lower():
            connection_issues.append(log.strip())
        elif 'slow query' in log.lower() or 'query time' in log.lower():
            slow_queries.append(log.strip())

    print(f"Database Errors Found: {len(db_errors)}")
    for error in db_errors[-5:]:  # Show last 5 errors
        print(f"  - {error}")

    print(f"\nConnection Issues: {len(connection_issues)}")
    for issue in connection_issues[-3:]:  # Show last 3 issues
        print(f"  - {issue}")

    print(f"\nSlow Queries: {len(slow_queries)}")
    for query in slow_queries[-3:]:  # Show last 3 slow queries
        print(f"  - {query}")

    # Provide recommendations
    if len(db_errors) > 10:
        print("\nRecommendation: Check database configuration and connectivity")

    if len(connection_issues) > 5:
        print("\nRecommendation: Review connection pool settings and database availability")

    if len(slow_queries) > 10:
        print("\nRecommendation: Optimize slow queries and add appropriate indexes")

Authentication Issues

# Troubleshoot authentication from logs
def troubleshoot_auth_logs(log_file):
    """Analyze authentication-related log entries"""

    with open(log_file, 'r') as f:
        logs = f.readlines()

    failed_logins = []
    successful_logins = []
    suspicious_activity = []

    for log in logs:
        if 'login' in log.lower():
            if 'failed' in log.lower() or 'error' in log.lower():
                failed_logins.append(log.strip())
            elif 'success' in log.lower():
                successful_logins.append(log.strip())

        # Check for suspicious patterns
        if 'login' in log.lower() and ('multiple' in log.lower() or 'rapid' in log.lower()):
            suspicious_activity.append(log.strip())

    print(f"Failed Logins: {len(failed_logins)}")
    if failed_logins:
        print("Recent failed logins:")
        for login in failed_logins[-5:]:
            print(f"  - {login}")

    print(f"\nSuccessful Logins: {len(successful_logins)}")

    print(f"\nSuspicious Activity: {len(suspicious_activity)}")
    for activity in suspicious_activity:
        print(f"  - {activity}")

    # Check for brute force patterns
    ip_failures = defaultdict(int)
    for login in failed_logins:
        # Extract IP address (simplified)
        ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', login)
        if ip_match:
            ip_failures[ip_match.group(1)] += 1

    suspicious_ips = {ip: count for ip, count in ip_failures.items() if count > 5}

    if suspicious_ips:
        print(f"\nSuspicious IPs (multiple failed attempts):")
        for ip, count in suspicious_ips.items():
            print(f"  - {ip}: {count} failed attempts")

Performance Issues

# Troubleshoot performance from logs
def troubleshoot_performance_logs(log_file):
    """Analyze performance-related log entries"""

    with open(log_file, 'r') as f:
        logs = f.readlines()

    slow_requests = []
    memory_issues = []
    cpu_issues = []

    for log in logs:
        if 'response_time' in log.lower():
            # Extract response time
            match = re.search(r'response_time.*?(\d+)ms', log.lower())
            if match:
                response_time = int(match.group(1))
                if response_time > 2000:  # > 2 seconds
                    slow_requests.append({
                        'line': log.strip(),
                        'response_time': response_time
                    })

        elif 'memory' in log.lower() and ('error' in log.lower() or 'warning' in log.lower()):
            memory_issues.append(log.strip())

        elif 'cpu' in log.lower() and ('high' in log.lower() or 'error' in log.lower()):
            cpu_issues.append(log.strip())

    print(f"Slow Requests (>2s): {len(slow_requests)}")
    if slow_requests:
        print("Slowest requests:")
        sorted_requests = sorted(slow_requests, key=lambda x: x['response_time'], reverse=True)
        for req in sorted_requests[:5]:
            print(f"  - {req['response_time']}ms: {req['line']}")

    print(f"\nMemory Issues: {len(memory_issues)}")
    for issue in memory_issues[-3:]:
        print(f"  - {issue}")

    print(f"\nCPU Issues: {len(cpu_issues)}")
    for issue in cpu_issues[-3:]:
        print(f"  - {issue}")

📊 Log Management Best Practices

Log Rotation and Retention

# docker-compose.yml
services:
  backend:
    volumes:
      - ./logs:/app/logs
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"

  nginx:
    volumes:
      - ./logs/nginx:/var/log/nginx
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"

Centralized Logging with ELK Stack

# docker-compose.elk.yml
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.0
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"

  logstash:
    image: docker.elastic.co/logstash/logstash:7.15.0
    volumes:
      - ./logstash/config:/usr/share/logstash/pipeline
    ports:
      - "5044:5044"
    depends_on:
      - elasticsearch

  kibana:
    image: docker.elastic.co/kibana/kibana:7.15.0
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on:
      - elasticsearch

volumes:
  elasticsearch_data:

Logstash Configuration

# logstash/pipeline/studio.conf
input {
  beats {
    port => 5044
  }
}

filter {
  if [fields][service] == "studio" {
    grok {
      match => { 
        "message" => "%{LOGLEVEL:level} %{TIMESTAMP_ISO8601:timestamp} %{DATA:module} %{GREEDYDATA:message}"
      }
    }

    date {
      match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ]
    }

    if [level] == "ERROR" or [level] == "CRITICAL" {
      mutate {
        add_tag => [ "error" ]
      }
    }

    if [message] =~ /response_time/ {
      grok {
        match => { 
          "message" => "response_time:%{NUMBER:response_time:int}ms"
        }
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "studio-logs-%{+YYYY.MM.dd}"
  }
}

🔧 Advanced Log Analysis

Machine Learning for Anomaly Detection

# anomaly_detection.py
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import pandas as pd

class LogAnomalyDetector:
    def __init__(self):
        self.scaler = StandardScaler()
        self.model = IsolationForest(contamination=0.1, random_state=42)
        self.is_trained = False

    def extract_features(self, logs):
        """Extract features from logs for anomaly detection"""
        features = []

        for log in logs:
            # Time-based features
            timestamp = datetime.strptime(log['timestamp'], '%Y-%m-%d %H:%M:%S')
            hour = timestamp.hour
            day_of_week = timestamp.weekday()

            # Log level features
            level_numeric = {'DEBUG': 0, 'INFO': 1, 'WARNING': 2, 'ERROR': 3, 'CRITICAL': 4}
            level_value = level_numeric.get(log['level'], 1)

            # Message length
            message_length = len(log['message'])

            # Response time if present
            response_time = 0
            if 'response_time' in log['message']:
                match = re.search(r'response_time.*?(\d+)ms', log['message'])
                if match:
                    response_time = int(match.group(1))

            features.append([hour, day_of_week, level_value, message_length, response_time])

        return np.array(features)

    def train(self, logs):
        """Train the anomaly detection model"""
        features = self.extract_features(logs)
        features_scaled = self.scaler.fit_transform(features)
        self.model.fit(features_scaled)
        self.is_trained = True

    def detect_anomalies(self, logs):
        """Detect anomalies in logs"""
        if not self.is_trained:
            self.train(logs)

        features = self.extract_features(logs)
        features_scaled = self.scaler.transform(features)

        predictions = self.model.predict(features_scaled)
        anomalies = []

        for i, prediction in enumerate(predictions):
            if prediction == -1:  # Anomaly detected
                anomalies.append({
                    'log': logs[i],
                    'anomaly_score': self.model.decision_function([features_scaled[i]])[0],
                    'features': features[i]
                })

        return anomalies

    def explain_anomaly(self, anomaly):
        """Explain why a log entry is considered anomalous"""
        log = anomaly['log']
        features = anomaly['features']

        explanations = []

        # Check time-based anomalies
        hour = int(features[0])
        if hour < 6 or hour > 22:
            explanations.append(f"Unusual hour: {hour}")

        # Check level anomalies
        level = log['level']
        if level in ['ERROR', 'CRITICAL']:
            explanations.append(f"High severity level: {level}")

        # Check message length anomalies
        message_length = int(features[3])
        if message_length > 1000:
            explanations.append(f"Unusually long message: {message_length} characters")

        # Check response time anomalies
        response_time = int(features[4])
        if response_time > 5000:
            explanations.append(f"Very slow response: {response_time}ms")

        return explanations

Log Monitoring

Set up real-time log monitoring with automated alerts for critical errors and unusual patterns.

Log Storage

Monitor log storage usage and implement proper retention policies to avoid disk space issues.

Log Security

Ensure logs don't contain sensitive information like passwords, API keys, or personal data.