mirror of
https://github.com/sstent/AICyclingCoach.git
synced 2026-01-25 08:34:51 +00:00
215 lines
7.8 KiB
Python
215 lines
7.8 KiB
Python
"""
|
|
Route service for TUI application.
|
|
Manages GPX routes and route visualization without HTTP dependencies.
|
|
"""
|
|
import gpxpy
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
from sqlalchemy import select, desc
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from backend.app.models.route import Route
|
|
from backend.app.models.section import Section
|
|
|
|
|
|
class RouteService:
|
|
"""Service for route and GPX operations."""
|
|
|
|
def __init__(self, db: AsyncSession):
|
|
self.db = db
|
|
|
|
async def get_routes(self) -> List[Dict]:
|
|
"""Get all routes."""
|
|
try:
|
|
result = await self.db.execute(
|
|
select(Route).order_by(desc(Route.created_at))
|
|
)
|
|
routes = result.scalars().all()
|
|
|
|
return [
|
|
{
|
|
"id": r.id,
|
|
"name": r.name,
|
|
"description": r.description,
|
|
"total_distance": r.total_distance,
|
|
"elevation_gain": r.elevation_gain,
|
|
"gpx_file_path": r.gpx_file_path,
|
|
"created_at": r.created_at.isoformat() if r.created_at else None
|
|
} for r in routes
|
|
]
|
|
|
|
except Exception as e:
|
|
raise Exception(f"Error fetching routes: {str(e)}")
|
|
|
|
async def get_route(self, route_id: int) -> Optional[Dict]:
|
|
"""Get a specific route by ID."""
|
|
try:
|
|
route = await self.db.get(Route, route_id)
|
|
if not route:
|
|
return None
|
|
|
|
return {
|
|
"id": route.id,
|
|
"name": route.name,
|
|
"description": route.description,
|
|
"total_distance": route.total_distance,
|
|
"elevation_gain": route.elevation_gain,
|
|
"gpx_file_path": route.gpx_file_path,
|
|
"created_at": route.created_at.isoformat() if route.created_at else None
|
|
}
|
|
|
|
except Exception as e:
|
|
raise Exception(f"Error fetching route {route_id}: {str(e)}")
|
|
|
|
async def load_gpx_file(self, file_path: str) -> Dict:
|
|
"""Load and parse GPX file."""
|
|
try:
|
|
gpx_path = Path(file_path)
|
|
if not gpx_path.exists():
|
|
raise Exception(f"GPX file not found: {file_path}")
|
|
|
|
with open(gpx_path, 'r', encoding='utf-8') as gpx_file:
|
|
gpx = gpxpy.parse(gpx_file)
|
|
|
|
# Extract track points
|
|
track_points = []
|
|
total_distance = 0
|
|
min_elevation = float('inf')
|
|
max_elevation = float('-inf')
|
|
|
|
for track in gpx.tracks:
|
|
for segment in track.segments:
|
|
for point in segment.points:
|
|
track_points.append({
|
|
"lat": point.latitude,
|
|
"lon": point.longitude,
|
|
"ele": point.elevation if point.elevation else 0,
|
|
"time": point.time.isoformat() if point.time else None
|
|
})
|
|
|
|
if point.elevation:
|
|
min_elevation = min(min_elevation, point.elevation)
|
|
max_elevation = max(max_elevation, point.elevation)
|
|
|
|
# Calculate total distance and elevation gain
|
|
if track_points:
|
|
total_distance = self._calculate_total_distance(track_points)
|
|
elevation_gain = max_elevation - min_elevation if max_elevation != float('-inf') else 0
|
|
|
|
return {
|
|
"name": gpx_path.stem,
|
|
"total_distance": total_distance,
|
|
"elevation_gain": elevation_gain,
|
|
"track_points": track_points,
|
|
"gpx_file_path": file_path
|
|
}
|
|
|
|
except Exception as e:
|
|
raise Exception(f"Error loading GPX file {file_path}: {str(e)}")
|
|
|
|
def _calculate_total_distance(self, track_points: List[Dict]) -> float:
|
|
"""Calculate total distance from track points."""
|
|
if len(track_points) < 2:
|
|
return 0
|
|
|
|
total_distance = 0
|
|
for i in range(1, len(track_points)):
|
|
prev_point = track_points[i-1]
|
|
curr_point = track_points[i]
|
|
|
|
# Simple haversine distance calculation
|
|
distance = self._haversine_distance(
|
|
prev_point["lat"], prev_point["lon"],
|
|
curr_point["lat"], curr_point["lon"]
|
|
)
|
|
total_distance += distance
|
|
|
|
return total_distance
|
|
|
|
def _haversine_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
|
|
"""Calculate distance between two points using haversine formula."""
|
|
import math
|
|
|
|
R = 6371000 # Earth's radius in meters
|
|
|
|
lat1_rad = math.radians(lat1)
|
|
lat2_rad = math.radians(lat2)
|
|
delta_lat = math.radians(lat2 - lat1)
|
|
delta_lon = math.radians(lon2 - lon1)
|
|
|
|
a = (math.sin(delta_lat / 2) ** 2 +
|
|
math.cos(lat1_rad) * math.cos(lat2_rad) *
|
|
math.sin(delta_lon / 2) ** 2)
|
|
|
|
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
|
|
|
|
return R * c
|
|
|
|
async def create_route(self, name: str, description: str, gpx_file_path: str) -> Dict:
|
|
"""Create a new route from GPX data."""
|
|
try:
|
|
# Load GPX data
|
|
gpx_data = await self.load_gpx_file(gpx_file_path)
|
|
|
|
# Create route record
|
|
db_route = Route(
|
|
name=name,
|
|
description=description,
|
|
total_distance=gpx_data["total_distance"],
|
|
elevation_gain=gpx_data["elevation_gain"],
|
|
gpx_file_path=gpx_file_path
|
|
)
|
|
self.db.add(db_route)
|
|
await self.db.commit()
|
|
await self.db.refresh(db_route)
|
|
|
|
return {
|
|
"id": db_route.id,
|
|
"name": db_route.name,
|
|
"description": db_route.description,
|
|
"total_distance": db_route.total_distance,
|
|
"elevation_gain": db_route.elevation_gain,
|
|
"gpx_file_path": db_route.gpx_file_path,
|
|
"created_at": db_route.created_at.isoformat() if db_route.created_at else None
|
|
}
|
|
|
|
except Exception as e:
|
|
raise Exception(f"Error creating route: {str(e)}")
|
|
|
|
async def delete_route(self, route_id: int) -> Dict:
|
|
"""Delete a route."""
|
|
try:
|
|
route = await self.db.get(Route, route_id)
|
|
if not route:
|
|
raise Exception("Route not found")
|
|
|
|
await self.db.delete(route)
|
|
await self.db.commit()
|
|
|
|
return {"message": "Route deleted successfully"}
|
|
|
|
except Exception as e:
|
|
raise Exception(f"Error deleting route: {str(e)}")
|
|
|
|
async def get_route_sections(self, route_id: int) -> List[Dict]:
|
|
"""Get sections for a specific route."""
|
|
try:
|
|
result = await self.db.execute(
|
|
select(Section).where(Section.route_id == route_id)
|
|
)
|
|
sections = result.scalars().all()
|
|
|
|
return [
|
|
{
|
|
"id": s.id,
|
|
"route_id": s.route_id,
|
|
"gpx_file_path": s.gpx_file_path,
|
|
"distance_m": s.distance_m,
|
|
"grade_avg": s.grade_avg,
|
|
"min_gear": s.min_gear,
|
|
"est_time_minutes": s.est_time_minutes
|
|
} for s in sections
|
|
]
|
|
|
|
except Exception as e:
|
|
raise Exception(f"Error fetching route sections: {str(e)}") |