From f7adbcc0def0635bed6f707038731c1511d6d276 Mon Sep 17 00:00:00 2001 From: sstent Date: Mon, 11 Aug 2025 12:30:41 -0700 Subject: [PATCH] trying to fix history issues --- consul-monitor/app.py | 59 ++++--- consul-monitor/background_poller.py | 50 +++--- consul-monitor/consul_client.py | 38 ++++- consul-monitor/database.py | 159 ++++++++++++++--- consul-monitor/templates/index.html | 254 +++++++++++++++------------- plan_phase3.md | 73 ++++++++ 6 files changed, 447 insertions(+), 186 deletions(-) create mode 100644 plan_phase3.md diff --git a/consul-monitor/app.py b/consul-monitor/app.py index d16cce6..d52b51d 100644 --- a/consul-monitor/app.py +++ b/consul-monitor/app.py @@ -43,18 +43,23 @@ def index(): # Get thread-local database connection db_conn = get_db() - # Get initial service data - services = database.get_all_services_with_health(db_conn) - consul_available = consul_client.is_consul_available() - - # Generate URLs for services - for service in services: - if service['port']: - service['url'] = f"http://{service['name']}.service.dc1.consul:{service['port']}" - else: - service['url'] = None - - return render_template('index.html', services=services, consul_available=consul_available) + try: + # Get services grouped by name + services = database.get_all_services_grouped(db_conn) + consul_available = consul_client.is_consul_available() + + # Generate URLs for each instance in each service + for service in services: + for instance in service['instances']: + if instance['port']: + instance['url'] = f"http://{service['name']}.service.dc1.consul:{instance['port']}" + else: + instance['url'] = None + + return render_template('index.html', services=services, consul_available=consul_available) + except Exception as e: + # Fallback in case of errors + return render_template('index.html', services=[], consul_available=False, error=str(e)) @app.route('/api/services') def get_services(): @@ -63,16 +68,17 @@ def get_services(): db_conn = get_db() try: - # Always use database data since background polling updates it - services = database.get_all_services_with_health(db_conn) + # Get services grouped by name + services = database.get_all_services_grouped(db_conn) consul_available = consul_client.is_consul_available() - # Generate URLs for services + # Generate URLs for each instance in each service for service in services: - if service['port']: - service['url'] = f"http://{service['name']}.service.dc1.consul:{service['port']}" - else: - service['url'] = None + for instance in service['instances']: + if instance['port']: + instance['url'] = f"http://{service['name']}.service.dc1.consul:{instance['port']}" + else: + instance['url'] = None response = { 'status': 'success', @@ -134,8 +140,8 @@ def update_config(): session.permanent = True return jsonify({'status': 'success'}) -@app.route('/api/services//history') -def get_service_history(service_id): +@app.route('/api/services//history') +def get_service_history(service_name): """Get historical health data for charts""" # Get thread-local database connection db_conn = get_db() @@ -144,15 +150,19 @@ def get_service_history(service_id): granularity = int(request.args.get('granularity', session.get('history_granularity', 15))) + # Get instance address from query params + instance_address = request.args.get('instance', '') + try: # Get raw history data (24 hours) - history = database.get_service_history(db_conn, service_id, 24) + history = database.get_service_history(db_conn, service_name, instance_address, 24) # Aggregate data by granularity for Chart.js chart_data = aggregate_health_data(history, granularity) return jsonify({ - 'service_id': service_id, + 'service_name': service_name, + 'instance_address': instance_address, 'granularity': granularity, 'data': chart_data }) @@ -160,7 +170,8 @@ def get_service_history(service_id): except Exception as e: return jsonify({ 'error': str(e), - 'service_id': service_id, + 'service_name': service_name, + 'instance_address': instance_address, 'data': [] }), 500 diff --git a/consul-monitor/background_poller.py b/consul-monitor/background_poller.py index 879cbb8..226474b 100644 --- a/consul-monitor/background_poller.py +++ b/consul-monitor/background_poller.py @@ -61,8 +61,14 @@ class ConsulPoller: logger.warning("Consul unavailable during background poll") return - # Get fresh data from Consul - service_data = consul_client.fetch_all_service_data() + # Get fresh data from Consul (now returns services and instances) + consul_data = consul_client.fetch_all_service_data() + if not consul_data: + logger.warning("No data received from Consul") + return + + service_data = consul_data['services'] + instances = consul_data['instances'] if not service_data: logger.warning("No service data received from Consul") @@ -80,26 +86,28 @@ class ConsulPoller: services_updated = 0 health_checks_inserted = 0 - for service_id, data in service_data.items(): - # Upsert service - database.upsert_service(conn, { - 'id': service_id, - 'name': data['name'], - 'address': data['address'], - 'port': data['port'], - 'tags': data['tags'], - 'meta': data['meta'] - }) - services_updated += 1 + # Process instances + for address, instance in instances.items(): + # Upsert instance with composite health + database.upsert_instance(conn, address, instance['health_status']) - # Insert health checks - raw data points every minute - for check in data['health_checks']: - database.insert_health_check( - conn, service_id, - check['check_name'], - check['status'] - ) - health_checks_inserted += 1 + # Record instance health + database.insert_instance_health(conn, address, instance['health_status']) + + # Process services in this instance + for service in instance['services']: + # Upsert service with instance address + database.upsert_service(conn, service, address) + services_updated += 1 + + # Insert health checks + for check in service['health_checks']: + database.insert_health_check( + conn, service['id'], + check['check_name'], + check['status'] + ) + health_checks_inserted += 1 conn.close() diff --git a/consul-monitor/consul_client.py b/consul-monitor/consul_client.py index fddc096..1944ef7 100644 --- a/consul-monitor/consul_client.py +++ b/consul-monitor/consul_client.py @@ -50,8 +50,38 @@ def is_consul_available(): except requests.exceptions.RequestException: return False +def calculate_composite_health(services): + """Calculate overall health status for a group of services""" + status_priority = {'critical': 3, 'warning': 2, 'passing': 1} + worst_status = 'passing' + + for service in services: + for check in service['health_checks']: + if status_priority[check['status']] > status_priority[worst_status]: + worst_status = check['status'] + return worst_status + +def group_services_by_instance(services): + """Group services by their instance address""" + instances = {} + for service in services.values(): + address = service['address'] + if address not in instances: + instances[address] = { + 'address': address, + 'services': [], + 'health_status': 'passing' + } + instances[address]['services'].append(service) + + # Calculate composite health for each instance + for instance in instances.values(): + instance['health_status'] = calculate_composite_health(instance['services']) + + return instances + def fetch_all_service_data(): - """Fetch service data and health status for all services""" + """Fetch service data and health status for all services, grouped by instance""" try: services = get_consul_services() service_data = {} @@ -76,7 +106,11 @@ def fetch_all_service_data(): 'health_checks': health_checks } - return service_data + # Return both individual services and grouped instances + return { + 'services': service_data, + 'instances': group_services_by_instance(service_data) + } except requests.exceptions.RequestException: logger.error("Failed to fetch service data from Consul") return {} diff --git a/consul-monitor/database.py b/consul-monitor/database.py index 78200d4..1121303 100644 --- a/consul-monitor/database.py +++ b/consul-monitor/database.py @@ -4,12 +4,22 @@ from datetime import datetime def create_tables(conn): cursor = conn.cursor() + # Create instances table + cursor.execute(''' + CREATE TABLE IF NOT EXISTS instances ( + address TEXT PRIMARY KEY, + health_status TEXT NOT NULL DEFAULT 'unknown', + first_seen DATETIME DEFAULT CURRENT_TIMESTAMP, + last_seen DATETIME DEFAULT CURRENT_TIMESTAMP + ) + ''') + # Create services table cursor.execute(''' CREATE TABLE IF NOT EXISTS services ( id TEXT PRIMARY KEY, name TEXT NOT NULL, - address TEXT, + address TEXT REFERENCES instances(address) ON DELETE CASCADE, port INTEGER, tags TEXT, meta TEXT, @@ -30,6 +40,16 @@ def create_tables(conn): ) ''') + # Create instance health table + cursor.execute(''' + CREATE TABLE IF NOT EXISTS instance_health ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + address TEXT NOT NULL REFERENCES instances(address) ON DELETE CASCADE, + health_status TEXT NOT NULL, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP + ) + ''') + # Create indexes cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_health_checks_service_timestamp @@ -49,7 +69,20 @@ def init_database(): create_tables(conn) return conn -def upsert_service(conn, service_data): +def upsert_instance(conn, address, health_status): + """Insert or update an instance record""" + cursor = conn.cursor() + cursor.execute(''' + INSERT INTO instances (address, health_status, last_seen) + VALUES (?, ?, CURRENT_TIMESTAMP) + ON CONFLICT(address) DO UPDATE SET + health_status = excluded.health_status, + last_seen = excluded.last_seen + ''', (address, health_status)) + conn.commit() + +def upsert_service(conn, service_data, instance_address): + """Insert or update a service record with instance reference""" cursor = conn.cursor() cursor.execute(''' INSERT INTO services (id, name, address, port, tags, meta) @@ -64,13 +97,22 @@ def upsert_service(conn, service_data): ''', ( service_data['id'], service_data['name'], - service_data.get('address'), + instance_address, service_data.get('port'), json.dumps(service_data.get('tags', [])), json.dumps(service_data.get('meta', {})) )) conn.commit() +def insert_instance_health(conn, address, health_status): + """Insert an instance health record""" + cursor = conn.cursor() + cursor.execute(''' + INSERT INTO instance_health (address, health_status) + VALUES (?, ?) + ''', (address, health_status)) + conn.commit() + def insert_health_check(conn, service_id, check_name, status): cursor = conn.cursor() cursor.execute(''' @@ -79,40 +121,72 @@ def insert_health_check(conn, service_id, check_name, status): ''', (service_id, check_name, status)) conn.commit() -def get_all_services_with_health(conn): +def get_all_services_grouped(conn): + """Get all services grouped by name with composite health status""" cursor = conn.cursor() cursor.execute(''' - SELECT s.id, s.name, s.address, s.port, s.tags, s.meta, - h.status, MAX(h.timestamp) AS last_check + WITH latest_health AS ( + SELECT + service_id, + status, + MAX(timestamp) as last_check + FROM health_checks + GROUP BY service_id + ) + SELECT + s.name, + json_group_array(json_object( + 'address', s.address, + 'port', s.port, + 'id', s.id, + 'tags', s.tags, + 'meta', s.meta, + 'current_status', lh.status, + 'last_check', lh.last_check + )) AS instances, + MIN(CASE + WHEN lh.status = 'critical' THEN 1 + WHEN lh.status = 'warning' THEN 2 + WHEN lh.status = 'passing' THEN 3 + ELSE 4 END) as composite_status_order FROM services s - LEFT JOIN health_checks h ON s.id = h.service_id - GROUP BY s.id + LEFT JOIN latest_health lh ON s.id = lh.service_id + GROUP BY s.name + ORDER BY s.name ''') services = [] for row in cursor.fetchall(): service = { - 'id': row[0], - 'name': row[1], - 'address': row[2], - 'port': row[3], - 'tags': json.loads(row[4]) if row[4] else [], - 'meta': json.loads(row[5]) if row[5] else {}, - 'current_status': row[6] or 'unknown', - 'last_check': row[7] + 'name': row[0], + 'instances': json.loads(row[1]) if row[1] else [], + 'composite_status': 'passing' # Default } + + # Determine composite status based on worst status + if any(inst.get('current_status') == 'critical' for inst in service['instances']): + service['composite_status'] = 'critical' + elif any(inst.get('current_status') == 'warning' for inst in service['instances']): + service['composite_status'] = 'warning' + elif all(inst.get('current_status') == 'passing' for inst in service['instances']): + service['composite_status'] = 'passing' + else: + service['composite_status'] = 'unknown' + services.append(service) return services -def get_service_history(conn, service_id, hours=24): +def get_service_history(conn, service_name, instance_address, hours=24): cursor = conn.cursor() cursor.execute(''' - SELECT status, timestamp - FROM health_checks - WHERE service_id = ? - AND timestamp >= datetime('now', ?) - ORDER BY timestamp ASC - ''', (service_id, f'-{hours} hours')) + SELECT hc.status, hc.timestamp + FROM health_checks hc + JOIN services s ON hc.service_id = s.id + WHERE s.name = ? + AND s.address = ? + AND hc.timestamp >= datetime('now', ?) + ORDER BY hc.timestamp ASC + ''', (service_name, instance_address, f'-{hours} hours')) return cursor.fetchall() def get_service_history_detailed(conn, service_id, hours=24): @@ -135,3 +209,42 @@ def is_database_available(conn): return True except sqlite3.Error: return False + +# Keep the old function for now but we'll remove it later +def get_all_instances_with_services(conn): + """Get all instances with their services and health status""" + cursor = conn.cursor() + cursor.execute(''' + SELECT i.address, i.health_status, + s.id, s.name, s.port, s.tags, s.meta, + h.status, MAX(h.timestamp) AS last_check + FROM instances i + LEFT JOIN services s ON i.address = s.address + LEFT JOIN health_checks h ON s.id = h.service_id + GROUP BY i.address, s.id + ''') + + instances = {} + for row in cursor.fetchall(): + address = row[0] + if address not in instances: + instances[address] = { + 'address': address, + 'health_status': row[1], + 'services': [] + } + + # Only add service if it exists + if row[2]: # service id + service = { + 'id': row[2], + 'name': row[3], + 'port': row[4], + 'tags': json.loads(row[5]) if row[5] else [], + 'meta': json.loads(row[6]) if row[6] else {}, + 'current_status': row[7] or 'unknown', + 'last_check': row[8] + } + instances[address]['services'].append(service) + + return list(instances.values()) diff --git a/consul-monitor/templates/index.html b/consul-monitor/templates/index.html index 030ecea..7adb088 100644 --- a/consul-monitor/templates/index.html +++ b/consul-monitor/templates/index.html @@ -53,36 +53,46 @@ Service Name - Status - URL - Tags - 24h History + Health + Instances + History + Details -