first commit
This commit is contained in:
14
utils/__init__.py
Normal file
14
utils/__init__.py
Normal file
@@ -0,0 +1,14 @@
|
||||
"""
|
||||
Utility functions for qBittorrent connection monitoring
|
||||
"""
|
||||
|
||||
from .time_utils import format_human_readable_time, calculate_time_remaining
|
||||
from .formatters import format_connection_status, format_debug_metrics, format_torrent_list
|
||||
|
||||
__all__ = [
|
||||
'format_human_readable_time',
|
||||
'calculate_time_remaining',
|
||||
'format_connection_status',
|
||||
'format_debug_metrics',
|
||||
'format_torrent_list'
|
||||
]
|
||||
BIN
utils/__pycache__/__init__.cpython-313.pyc
Normal file
BIN
utils/__pycache__/__init__.cpython-313.pyc
Normal file
Binary file not shown.
BIN
utils/__pycache__/formatters.cpython-313.pyc
Normal file
BIN
utils/__pycache__/formatters.cpython-313.pyc
Normal file
Binary file not shown.
BIN
utils/__pycache__/time_utils.cpython-313.pyc
Normal file
BIN
utils/__pycache__/time_utils.cpython-313.pyc
Normal file
Binary file not shown.
56
utils/formatters.py
Normal file
56
utils/formatters.py
Normal file
@@ -0,0 +1,56 @@
|
||||
from typing import Any, Dict
|
||||
|
||||
def format_connection_status(status: Dict[str, Any]) -> str:
|
||||
"""
|
||||
Format connection status for logging
|
||||
|
||||
Args:
|
||||
status: Connection status dictionary
|
||||
|
||||
Returns:
|
||||
Formatted status string
|
||||
"""
|
||||
connection_status = status.get('connection_status', 'unknown')
|
||||
dht_nodes = status.get('dht_nodes', 0)
|
||||
return f"Status: {connection_status}, DHT Nodes: {dht_nodes}"
|
||||
|
||||
def format_debug_metrics(metrics: Dict[str, Any]) -> str:
|
||||
"""
|
||||
Format debug metrics for logging
|
||||
|
||||
Args:
|
||||
metrics: Dictionary of debug metrics
|
||||
|
||||
Returns:
|
||||
Formatted debug metrics string
|
||||
"""
|
||||
parts = []
|
||||
|
||||
if 'transition_info' in metrics:
|
||||
parts.append(metrics['transition_info'])
|
||||
if 'failure_info' in metrics:
|
||||
parts.append(metrics['failure_info'])
|
||||
if 'last_failure_info' in metrics:
|
||||
parts.append(metrics['last_failure_info'])
|
||||
if 'remediation_info' in metrics:
|
||||
parts.append(metrics['remediation_info'])
|
||||
if 'stability_info' in metrics:
|
||||
parts.append(metrics['stability_info'])
|
||||
|
||||
return " - ".join(parts)
|
||||
|
||||
def format_torrent_list(torrents: list) -> str:
|
||||
"""
|
||||
Format torrent list for logging
|
||||
|
||||
Args:
|
||||
torrents: List of torrent dictionaries
|
||||
|
||||
Returns:
|
||||
Formatted torrent list string
|
||||
"""
|
||||
if not torrents:
|
||||
return "No torrents"
|
||||
|
||||
hashes = [torrent.get('hash', 'unknown') for torrent in torrents]
|
||||
return f"{len(hashes)} torrents: {'|'.join(hashes[:3])}{'...' if len(hashes) > 3 else ''}"
|
||||
49
utils/time_utils.py
Normal file
49
utils/time_utils.py
Normal file
@@ -0,0 +1,49 @@
|
||||
from datetime import timedelta
|
||||
from typing import Optional
|
||||
|
||||
def format_human_readable_time(seconds: float) -> str:
|
||||
"""
|
||||
Convert seconds to human-readable time format using datetime.timedelta
|
||||
|
||||
Args:
|
||||
seconds: Number of seconds to format
|
||||
|
||||
Returns:
|
||||
Human-readable time string (e.g., "1 day 2 hours 3 minutes 4 seconds")
|
||||
"""
|
||||
td = timedelta(seconds=seconds)
|
||||
|
||||
# Extract components
|
||||
days = td.days
|
||||
hours, remainder = divmod(td.seconds, 3600)
|
||||
minutes, seconds_remaining = divmod(remainder, 60)
|
||||
|
||||
# Build the human-readable string
|
||||
parts = []
|
||||
if days > 0:
|
||||
parts.append(f"{days} day" if days == 1 else f"{days} days")
|
||||
if hours > 0:
|
||||
parts.append(f"{hours} hour" if hours == 1 else f"{hours} hours")
|
||||
if minutes > 0:
|
||||
parts.append(f"{minutes} minute" if minutes == 1 else f"{minutes} minutes")
|
||||
if seconds_remaining > 0 or not parts: # Include seconds if no other parts or if seconds exist
|
||||
parts.append(f"{seconds_remaining} second" if seconds_remaining == 1 else f"{seconds_remaining} seconds")
|
||||
|
||||
return " ".join(parts)
|
||||
|
||||
def calculate_time_remaining(start_time: float, duration_required: float, current_time: Optional[float] = None) -> float:
|
||||
"""
|
||||
Calculate remaining time for a duration requirement
|
||||
|
||||
Args:
|
||||
start_time: When the timer started
|
||||
duration_required: Total duration required
|
||||
current_time: Current time (defaults to time.time())
|
||||
|
||||
Returns:
|
||||
Remaining time in seconds (0 if elapsed time exceeds requirement)
|
||||
"""
|
||||
import time
|
||||
current = current_time or time.time()
|
||||
elapsed = current - start_time
|
||||
return max(0, duration_required - elapsed)
|
||||
Reference in New Issue
Block a user