added segments
This commit is contained in:
108
FitnessSync/scratch/auto_create_segments.py
Normal file
108
FitnessSync/scratch/auto_create_segments.py
Normal file
@@ -0,0 +1,108 @@
|
||||
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
# Add backend to path
|
||||
sys.path.append(os.path.join(os.getcwd(), 'backend'))
|
||||
|
||||
from src.models.activity import Activity
|
||||
from src.models.segment import Segment
|
||||
from src.services.segment_matcher import SegmentMatcher
|
||||
from src.services.parsers import extract_points_from_file
|
||||
from src.utils.geo import haversine_distance, calculate_bounds, ramer_douglas_peucker
|
||||
from src.utils.config import config
|
||||
|
||||
# Setup DB: build an engine from app config and open one module-level session
# that every function in this scratch script shares (never closed — fine for
# a short-lived ad-hoc script).
engine = create_engine(config.DATABASE_URL)
Session = sessionmaker(bind=engine)
db = Session()
||||
def auto_create_segments(garmin_activity_id, split_dist_meters=1000):
    """Split an activity's GPS track into fixed-length segments and persist them.

    Walks the activity's point stream accumulating haversine distance; every
    time `split_dist_meters` is covered, a Segment row is created (with RDP-
    simplified geometry and a bounding box), then the matcher is run over the
    activity so the new segments immediately get efforts.
    """
    print(f"\n--- Auto Creating Segments for {garmin_activity_id} (Split: {split_dist_meters}m) ---")

    activity = (
        db.query(Activity)
        .filter(Activity.garmin_activity_id == garmin_activity_id)
        .first()
    )
    if activity is None:
        print(f"Activity {garmin_activity_id} not found in DB.")
        return

    points = extract_points_from_file(activity.file_content, activity.file_type)
    if not points or len(points) < 2:
        print("No points found in activity.")
        return

    print(f"Total Points: {len(points)}")

    segments_created = []
    window_start = 0      # index of the first point of the current split
    split_count = 1       # 1-based label for the next segment
    travelled = 0.0       # metres accumulated inside the current split

    for idx in range(1, len(points)):
        prev_pt, cur_pt = points[idx - 1], points[idx]
        # points are stored [lon, lat, (ele)]; haversine takes (lat, lon) pairs
        travelled += haversine_distance(prev_pt[1], prev_pt[0], cur_pt[1], cur_pt[0])

        if travelled < split_dist_meters:
            continue

        # Close out this split at the current point (inclusive).
        window_end = idx
        seg_points = points[window_start:window_end + 1]

        # Simplify geometry for storage; bounds come from the raw points.
        simple_points = ramer_douglas_peucker(seg_points, epsilon=5.0)
        bounds = calculate_bounds(seg_points)

        # Recompute exact distance & positive elevation gain over raw points.
        seg_dist = 0.0
        seg_elev_gain = 0.0
        for a, b in zip(seg_points, seg_points[1:]):
            seg_dist += haversine_distance(a[1], a[0], b[1], b[0])
            if len(a) > 2 and len(b) > 2 and a[2] is not None and b[2] is not None:
                climb = b[2] - a[2]
                if climb > 0:
                    seg_elev_gain += climb

        name = f"AutoSplit #{split_count} ({garmin_activity_id})"
        print(f"Creating Segment: {name} | Dist: {seg_dist:.1f}m | Elev: {seg_elev_gain:.1f}m | Indices: {window_start}-{window_end}")

        segment = Segment(
            name=name,
            description=f"Auto-generated {split_dist_meters}m split",
            distance=seg_dist,
            elevation_gain=seg_elev_gain,
            activity_type=activity.activity_type or 'cycling',
            points=json.dumps(simple_points),
            bounds=json.dumps(bounds)
        )
        db.add(segment)
        segments_created.append(segment)

        # Next split starts at the current point (1-point overlap).
        travelled = 0.0
        window_start = idx
        split_count += 1

    db.commit()
    print(f"\nCreated {len(segments_created)} segments.")

    # Run the matcher so the freshly created segments get efforts right away.
    if segments_created:
        print("\nTriggering Segment Matcher...")
        SegmentMatcher(db).match_activity(activity, points)
        print("Matching complete.")
|
||||
if __name__ == "__main__":
    # Ad-hoc run: hard-coded Garmin activity id, 1000 m splits.
    auto_create_segments("21249259141", 1000)
123
FitnessSync/scratch/debug_segment_match.py
Normal file
123
FitnessSync/scratch/debug_segment_match.py
Normal file
@@ -0,0 +1,123 @@
|
||||
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
# Add backend to path
|
||||
sys.path.append(os.path.join(os.getcwd(), 'backend'))
|
||||
|
||||
from src.models.activity import Activity
|
||||
from src.models.segment import Segment
|
||||
from src.utils.geo import haversine_distance, perpendicular_distance
|
||||
from src.services.segment_matcher import SegmentMatcher
|
||||
from src.utils.config import config
|
||||
|
||||
# Setup DB: module-level engine/session shared by the debug helpers below.
engine = create_engine(config.DATABASE_URL)
Session = sessionmaker(bind=engine)
db = Session()
|
||||
# Helpers
def _min_dist_to_segment_path(point, seg_points):
    """Smallest perpendicular distance from *point* to any leg of the polyline.

    Returns +inf when the polyline has fewer than two points (no legs).
    """
    return min(
        (perpendicular_distance(point, a, b) for a, b in zip(seg_points, seg_points[1:])),
        default=float('inf'),
    )
|
||||
def debug_match(activity_garmin_id, segment_name):
    """Step-by-step diagnostic of why an activity does/doesn't match a segment.

    Re-implements the matcher's entry/corridor/finish logic with verbose
    printing: finds every activity point near the segment start, then traces
    each candidate forward via _trace() until it finishes or deviates.
    """
    print(f"\n--- Debugging Match: Activity {activity_garmin_id} vs Segment {segment_name} ---")

    activity = db.query(Activity).filter(Activity.garmin_activity_id == activity_garmin_id).first()
    if not activity:
        print("Activity not found")
        return

    segment = db.query(Segment).filter(Segment.name == segment_name).first()
    if not segment:
        print(f"Segment {segment_name} not found")
        return

    # Load points (local import mirrors the other scratch scripts)
    from src.services.parsers import extract_points_from_file
    act_points = extract_points_from_file(activity.file_content, activity.file_type)
    # segment.points may be a JSON string or an already-decoded list
    seg_points = json.loads(segment.points) if isinstance(segment.points, str) else segment.points

    print(f"Activity Points: {len(act_points)}")
    print(f"Segment Points: {len(seg_points)}")
    print(f"Segment DB Distance: {segment.distance:.2f}m")

    # Parameters (meters) — presumably mirror the matcher's thresholds; confirm
    # against SegmentMatcher before relying on exact numbers.
    ENTRY_RADIUS = 25.0
    CORRIDOR_RADIUS = 35.0

    start_node = seg_points[0]
    end_node = seg_points[-1]

    # 1. Find all start candidates: activity points within ENTRY_RADIUS of the
    # segment's first point. Points are [lon, lat, ...]; haversine takes (lat, lon).
    start_candidates = []
    for i, p in enumerate(act_points):
        dist = haversine_distance(p[1], p[0], start_node[1], start_node[0])
        if dist <= ENTRY_RADIUS:
            start_candidates.append(i)

    print(f"Found {len(start_candidates)} candidates for Start.")
    for idx in start_candidates:
        print(f"  Candidate Index: {idx}")

    if not start_candidates:
        print("No matches expected (No start found).")
        return

    # 2. Trace each candidate; stop at the first one that completes the segment.
    match_found = False
    for start_idx in start_candidates:
        print(f"\n--- Tracing Candidate {start_idx} ---")
        if _trace(start_idx, act_points, seg_points, segment.distance, end_node, ENTRY_RADIUS, CORRIDOR_RADIUS):
            print("MATCH SUCCESS FOUND!")
            match_found = True
            break
        else:
            print("Candidate failed.")

    if not match_found:
        print("\nAll candidates failed.")
|
||||
def _trace(start_idx, act_points, seg_points, seg_dist, end_node, ENTRY_RADIUS, CORRIDOR_RADIUS):
    """Follow the activity stream from `start_idx` and report how the trace ends.

    Returns True when the trace reaches the segment end having covered at
    least 80% of the segment's recorded distance; False when it drifts
    outside the corridor or the stream runs out first. Prints a progress
    line every 100 points and on every notable event.
    """
    covered = 0.0
    prev_pt = None
    for offset, pt in enumerate(act_points[start_idx:]):
        j = start_idx + offset
        # Accumulate distance travelled since the candidate start.
        if prev_pt is not None:
            covered += haversine_distance(pt[1], pt[0], prev_pt[1], prev_pt[0])
        prev_pt = pt

        corridor_gap = _min_dist_to_segment_path(pt, seg_points)
        finish_gap = haversine_distance(pt[1], pt[0], end_node[1], end_node[0])

        note = "OK"
        # Leaving the corridor kills this candidate immediately.
        if corridor_gap > CORRIDOR_RADIUS:
            print(f"  Idx {j} (Accum {covered:.1f}m): DEVIATED (DistPath={corridor_gap:.2f}m)")
            return False

        # Near the finish: only accept if enough distance was actually covered
        # (guards against clipping the end point without riding the segment).
        if finish_gap <= ENTRY_RADIUS:
            if covered >= 0.8 * seg_dist:
                print(f"  Idx {j} (Accum {covered:.1f}m): FINISHED! (Valid Distance)")
                return True
            else:
                note = f"NEAR_END (Short: {covered:.1f}/{seg_dist:.1f}m)"

        # Periodic / event logging.
        if offset % 100 == 0 or note != "OK":
            print(f"  Idx {j}: Path={corridor_gap:.1f}m End={finish_gap:.1f}m Accum={covered:.0f}m -> {note}")

    print("  End of activity stream reached.")
    return False
|
||||
if __name__ == "__main__":
    # Ad-hoc run against a known activity/segment pair.
    debug_match("21249259141", "Climb1")
||||
39
FitnessSync/scratch/inspect_fit_fields.py
Normal file
39
FitnessSync/scratch/inspect_fit_fields.py
Normal file
@@ -0,0 +1,39 @@
|
||||
|
||||
import sys
|
||||
import os
|
||||
import io
|
||||
import fitdecode
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
# Add backend to path
|
||||
sys.path.append(os.path.join(os.getcwd(), 'backend'))
|
||||
|
||||
from src.models.activity import Activity
|
||||
from src.utils.config import config
|
||||
|
||||
# Setup DB: module-level engine/session used by inspect_activity below.
engine = create_engine(config.DATABASE_URL)
Session = sessionmaker(bind=engine)
db = Session()
|
||||
def inspect_activity(garmin_activity_id):
    """Print the fields (name/value/units) of the first 'record' frame of an
    activity's stored FIT file, then stop."""
    activity = (
        db.query(Activity)
        .filter(Activity.garmin_activity_id == garmin_activity_id)
        .first()
    )
    if activity is None:
        print("Activity not found")
        return

    raw = activity.file_content
    print(f"File Size: {len(raw)} bytes")

    # Decode from memory; stop after the first data record — they all share a
    # field layout, so one is enough to see what the device recorded.
    with io.BytesIO(raw) as stream, fitdecode.FitReader(stream) as reader:
        for frame in reader:
            is_record = frame.frame_type == fitdecode.FIT_FRAME_DATA and frame.name == 'record'
            if not is_record:
                continue
            print("First Record Fields:")
            for field in frame.fields:
                print(f"  - {field.name}: {field.value} (units: {field.units})")
            break
|
||||
if __name__ == "__main__":
    # Ad-hoc run against a known activity.
    inspect_activity("21249259141")
||||
38
FitnessSync/scratch/inspect_segment.py
Normal file
38
FitnessSync/scratch/inspect_segment.py
Normal file
@@ -0,0 +1,38 @@
|
||||
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
# Add backend to path
|
||||
sys.path.append(os.path.join(os.getcwd(), 'backend'))
|
||||
|
||||
from src.models.segment import Segment
|
||||
from src.utils.config import config
|
||||
|
||||
# Setup DB: module-level engine/session used by inspect_segment below.
engine = create_engine(config.DATABASE_URL)
Session = sessionmaker(bind=engine)
db = Session()
|
||||
def inspect_segment(segment_name):
    """Print a segment's id, name, distance, point count and endpoints."""
    print(f"--- Inspecting Segment: {segment_name} ---")

    segment = db.query(Segment).filter(Segment.name == segment_name).first()
    if segment is None:
        print("Segment not found")
        return

    print(f"ID: {segment.id}")
    print(f"Name: {segment.name}")
    print(f"Distance: {segment.distance} meters")

    # points may be stored as a JSON string or an already-decoded list
    raw_points = segment.points
    points = json.loads(raw_points) if isinstance(raw_points, str) else raw_points
    print(f"Point Logic: {len(points)} points")
    if points:
        print(f"Start: {points[0]}")
        print(f"End: {points[-1]}")
|
||||
if __name__ == "__main__":
    # Ad-hoc run against a known segment name.
    inspect_segment("TEST3")
||||
54
FitnessSync/scratch/rematch_segments.py
Normal file
54
FitnessSync/scratch/rematch_segments.py
Normal file
@@ -0,0 +1,54 @@
|
||||
|
||||
import sys
|
||||
import os
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
# Add backend to path
|
||||
sys.path.append(os.path.join(os.getcwd(), 'backend'))
|
||||
|
||||
from src.models.activity import Activity
|
||||
from src.models.segment_effort import SegmentEffort
|
||||
from src.models.segment import Segment
|
||||
from src.services.segment_matcher import SegmentMatcher
|
||||
from src.services.parsers import extract_activity_data # verify import works
|
||||
from src.utils.config import config
|
||||
|
||||
# Setup DB: module-level engine/session used by rematch_activity below.
engine = create_engine(config.DATABASE_URL)
Session = sessionmaker(bind=engine)
db = Session()
|
||||
def rematch_activity(garmin_activity_id):
    """Delete an activity's stored segment efforts and re-run the matcher.

    Useful after changing matcher parameters or segment geometry: wipes the
    SegmentEffort rows for the activity, reloads its point stream, and calls
    SegmentMatcher.match_activity from a clean slate.
    """
    print(f"\n--- Rematching Segments for {garmin_activity_id} ---")

    activity = (
        db.query(Activity)
        .filter(Activity.garmin_activity_id == garmin_activity_id)
        .first()
    )
    if activity is None:
        print(f"Activity {garmin_activity_id} not found.")
        return

    # Wipe any previous efforts so the rematch starts clean.
    removed = (
        db.query(SegmentEffort)
        .filter(SegmentEffort.activity_id == activity.id)
        .delete()
    )
    db.commit()
    print(f"Deleted {removed} existing efforts.")

    # The matcher only needs the geometric point stream for the match itself.
    from src.services.parsers import extract_points_from_file
    track = extract_points_from_file(activity.file_content, activity.file_type)

    if not track:
        print("No points found.")
        return

    print(f"Loaded {len(track)} points. triggering match...")

    efforts = SegmentMatcher(db).match_activity(activity, track)

    print(f"Matched {len(efforts)} segments.")
    for eff in efforts:
        print(f"  - Segment: {eff.segment_id} | Time: {eff.elapsed_time}s | HR: {eff.avg_hr} | Power: {eff.avg_power}")
|
||||
if __name__ == "__main__":
    # Ad-hoc run against a known activity.
    rematch_activity("21249259141")
||||
138
FitnessSync/scratch/test_segment_splitting.py
Normal file
138
FitnessSync/scratch/test_segment_splitting.py
Normal file
@@ -0,0 +1,138 @@
|
||||
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
# Add backend to path
|
||||
sys.path.append(os.path.join(os.getcwd(), 'backend'))
|
||||
|
||||
from src.models.activity import Activity
|
||||
from src.models.segment import Segment
|
||||
from src.utils.geo import haversine_distance, calculate_bounds
|
||||
from src.services.parsers import extract_points_from_file
|
||||
from src.services.segment_matcher import SegmentMatcher
|
||||
from src.utils.config import config
|
||||
|
||||
# Setup DB: module-level engine/session shared by the test routine below.
engine = create_engine(config.DATABASE_URL)
Session = sessionmaker(bind=engine)
db = Session()

# Verbose logging so the matcher's own INFO output is visible during the run.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("SegmentTest")
|
||||
def test_segment_splitting(activity_garmin_id):
    """Split an activity into consecutive ~1-mile chunks and verify the matcher
    can re-detect every chunk within the same activity.

    The chunk segments are transient (fake ids, never added to the session):
    SegmentMatcher.match_activity queries the DB for segments, so to avoid
    polluting the DB we call the matcher's private _match_segment directly
    with in-memory objects instead.
    """
    print(f"--- Segment Splitting Test: Activity {activity_garmin_id} ---")

    activity = db.query(Activity).filter(Activity.garmin_activity_id == activity_garmin_id).first()
    if not activity:
        print("Activity not found")
        return

    if not activity.file_content:
        print("No file content")
        return

    points = extract_points_from_file(activity.file_content, activity.file_type)
    print(f"Total Points: {len(points)}")

    if len(points) < 2:
        print("Not enough points")
        return

    # Split into 1-mile segments (1609.34 meters)
    MILE_IN_METERS = 1609.34

    segments_to_test = []

    current_segment_points = [points[0]]
    current_dist = 0.0
    seg_count = 1

    # Simple splitting logic: accumulate haversine distance point-to-point and
    # cut a chunk every time a mile has been covered.
    for i in range(1, len(points)):
        p1 = points[i-1]
        p2 = points[i]
        d = haversine_distance(p1[1], p1[0], p2[1], p2[0])
        current_dist += d
        current_segment_points.append(p2)

        if current_dist >= MILE_IN_METERS:
            # Finalize this chunk — a mile of GPS points is plenty for matching.
            seg_name = f"Test_{seg_count}"

            # Kept in memory only; _match_segment is invoked directly below, so
            # nothing needs to be persisted for the matcher to see these.
            segments_to_test.append({
                "name": seg_name,
                "points": current_segment_points,
                "distance": current_dist
            })

            # Reset for next segment: start from the current point (1-point overlap).
            current_segment_points = [p2]
            current_dist = 0.0
            seg_count += 1

    print(f"Created {len(segments_to_test)} mock segments.")

    matcher = SegmentMatcher(db)

    # Test each segment
    success_count = 0

    for mock_seg in segments_to_test:
        print(f"\nTesting {mock_seg['name']} ({mock_seg['distance']:.2f}m)...")

        # Transient Segment with a fake id; never added to the session.
        seg_obj = Segment(
            id=9999 + int(mock_seg['name'].split('_')[1]),  # Fake ID
            name=mock_seg['name'],
            activity_type=activity.activity_type,
            points=json.dumps(mock_seg['points']),  # matcher json.loads() string points
            distance=mock_seg['distance'],
            bounds=json.dumps(calculate_bounds(mock_seg['points']))
        )
        # _match_segment(segment, seg_points, activity, act_points)
        #   -> Optional[Tuple[start_idx, end_idx]] into the activity stream.
        try:
            indices = matcher._match_segment(seg_obj, mock_seg['points'], activity, points)

            if indices:
                s, e = indices
                print(f"  [PASS] Matched! Activity indexes {s} to {e}")
                success_count += 1
            else:
                print(f"  [FAIL] No match found.")

        except Exception as e:
            print(f"  [ERROR] {e}")

    print(f"\nSummary: {success_count}/{len(segments_to_test)} segments matched.")
|
||||
if __name__ == "__main__":
    # Ad-hoc run against a known activity.
    test_segment_splitting("21368342318")
||||
74
FitnessSync/scratch/verify_timeout.py
Normal file
74
FitnessSync/scratch/verify_timeout.py
Normal file
@@ -0,0 +1,74 @@
|
||||
|
||||
import sys
|
||||
import os
|
||||
import unittest
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
# Adjust path to find backend modules: 'backend' must be on sys.path so 'src'
# is importable as a top-level package.
# This script lives at <project_root>/scratch/verify_timeout.py.
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
backend_path = os.path.join(project_root, 'backend')
sys.path.append(backend_path)

# Stub out the model modules BEFORE the client code imports them, so importing
# GarminClient below requires neither a database nor the full model setup.
sys.modules['src.models'] = MagicMock()
sys.modules['src.models.api_token'] = MagicMock()
sys.modules['src.models.base'] = MagicMock()
|
||||
# Now import the client from the src package
|
||||
from src.services.garmin.client import GarminClient
|
||||
import garth
|
||||
|
||||
|
||||
class TestGarminTimeout(unittest.TestCase):
    """Verify GarminClient wraps garth's session so every HTTP request gets a timeout."""

    def test_timeout_injection(self):
        """The session wrapper must forward requests with timeout=30.

        GarminClient.__init__ replaces `self.client.garth.sess.request` with a
        wrapper that captures the original bound method in a closure and calls
        it with a `timeout` kwarg. To observe that call without hitting the
        network, `requests.Session.request` is patched BEFORE the client is
        constructed, so the closure captures the mock as its original.
        (The original version of this test built an extra, unpatched
        GarminClient first, whose wrapper closed over the REAL request method
        — a stray instance that risked a live network call; removed.)
        """
        with unittest.mock.patch('requests.Session.request') as mock_session_request:
            client = GarminClient("test", "test")

            # client.client.garth.sess.request is now the timeout-injecting
            # wrapper; calling it should delegate to the mocked Session.request.
            client.client.garth.sess.request("GET", "http://example.com")

            # Verify the wrapper added the timeout kwarg on the way through.
            kwargs = mock_session_request.call_args.kwargs
            print(f"Call kwargs: {kwargs}")

            self.assertIn('timeout', kwargs, "Timeout parameter missing from request")
            self.assertEqual(kwargs['timeout'], 30, "Timeout value incorrect")
|
||||
if __name__ == '__main__':
    # exit=False stops unittest from calling sys.exit(), so output stays
    # visible when this script runs as one step of a multi-step invocation.
    # (Replaces the hand-rolled `try: unittest.main() except SystemExit: pass`,
    # which bound an unused exception variable to emulate the same behavior.)
    unittest.main(exit=False)
|
||||
Reference in New Issue
Block a user