from tabulate import tabulate # ANSI Color Codes GREEN = "\033[92m" RED = "\033[91m" CYAN = "\033[96m" YELLOW = "\033[93m" RESET = "\033[0m" BOLD = "\033[1m" def colorize(text, color, use_color=True): if not use_color: return text return f"{color}{text}{RESET}" def format_summary(cluster_data, use_color=True): """ Formats the cluster health summary. """ health = cluster_data["health"] color = GREEN if health == "Healthy" else RED if health == "Split Brain Detected (Multiple Primaries)": color = YELLOW summary = [ f"{BOLD}Cluster Health:{RESET} {colorize(health, color, use_color)}", f"{BOLD}Total Nodes:{RESET} {len(cluster_data['nodes'])}", f"{BOLD}Primaries:{RESET} {cluster_data['primary_count']}", "-" * 30 ] return "\n".join(summary) def format_node_table(nodes, use_color=True): """ Formats the node list as a table. """ headers = ["Node", "Role", "Consul Status", "LiteFS Role", "Uptime", "Replication Lag", "LiteFS Info"] table_data = [] for node in nodes: # Consul status color status = node["status"] status_color = GREEN if status == "passing" else RED colored_status = colorize(status, status_color, use_color) # Role color role = node["role"] role_color = CYAN if role == "primary" else RESET colored_role = colorize(role, role_color, use_color) # LiteFS role color & consistency check litefs_primary = node["litefs_primary"] litefs_role = "primary" if litefs_primary else "replica" # Highlight discrepancy if consul and litefs disagree litefs_role_color = RESET if (role == "primary" and not litefs_primary) or (role == "replica" and litefs_primary): litefs_role_color = YELLOW litefs_role = f"!! {litefs_role} !!" elif litefs_primary: litefs_role_color = CYAN colored_litefs_role = colorize(litefs_role, litefs_role_color, use_color) # Error info info = "" if node.get("litefs_error"): info = colorize("LiteFS API Error", RED, use_color) else: info = node.get("advertise_url", "") table_data.append([ colorize(node["node"], BOLD, use_color), colored_role, colored_status, colored_litefs_role, node["uptime"], node["replication_lag"], info ]) return tabulate(table_data, headers=headers, tablefmt="simple")