import datetime
import logging
import os
import sys

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import func

# Setup environment: add this script's directory to sys.path before
# importing from ``src``.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from src.models.activity import Activity
from src.models.segment import Segment
from src.services.postgresql_manager import PostgreSQLManager
from src.utils.config import config
from src.services.job_manager import job_manager

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _scan_can_be_skipped(last_scan, max_seg_date):
    """Return True when the recorded scan is at least as new as the newest segment.

    A missing value on either side means "cannot skip". Both datetimes must
    be of the same kind (both timezone-aware or both naive); Python raises
    ``TypeError`` when ordering a naive datetime against an aware one.
    """
    return bool(last_scan and max_seg_date and last_scan >= max_seg_date)


def test_optimization(activity_id=256):
    """Manually exercise the "skip segment scan" check against the database.

    The script runs three passes and prints the outcome of each:

    1. Clears ``last_segment_scan_timestamp`` on the activity and checks that
       a scan is signalled.
    2. Records a scan timestamp of "now" (UTC) and checks that the scan is
       skipped.
    3. Inserts a temporary segment and checks that the newer segment forces
       a re-scan; the temporary segment is always deleted afterwards.

    Args:
        activity_id: Primary key of the ``Activity`` row to test against.
            Defaults to 256, the id this script originally hard-coded.

    Note:
        The activity's ``last_segment_scan_timestamp`` is modified and
        committed; it is not restored to its previous value.
    """
    db_manager = PostgreSQLManager(config.DATABASE_URL)
    with db_manager.get_db_session() as db:
        # 1. Reset
        activity = db.query(Activity).filter(Activity.id == activity_id).first()
        if not activity:
            print(f"Activity {activity_id} not found")
            return

        print("Resetting last_segment_scan_timestamp...")
        activity.last_segment_scan_timestamp = None
        db.commit()

        # 2. Get Max Segment Date
        max_seg_date = db.query(func.max(Segment.created_at)).scalar()
        print(f"Max Segment Date: {max_seg_date}")

        # 3. Simulate Logic Check (Pass 1)
        last_scan = activity.last_segment_scan_timestamp
        if _scan_can_be_skipped(last_scan, max_seg_date):
            print("PASS 1: Optimization INCORRECTLY skipped!")
        else:
            print("PASS 1: Optimization correctly signaled to scan.")
            # Simulate scan completion
            activity.last_segment_scan_timestamp = datetime.datetime.now(
                datetime.timezone.utc
            )
            db.commit()

        # 4. Refresh & Check Pass 2
        db.refresh(activity)
        last_scan = activity.last_segment_scan_timestamp
        print(f"Activity Last Scan: {last_scan}")

        if _scan_can_be_skipped(last_scan, max_seg_date):
            print("PASS 2: Optimization CORRECTLY skipped.")
        else:
            print(
                f"PASS 2: Optimization FAILED to skip. "
                f"last_scan={last_scan} max_seg={max_seg_date}"
            )

        # 5. Add New Segment
        print("Creating new segment...")
        new_seg = Segment(
            name="Future Segment",
            points="[]",
            bounds="[]",
            distance=100,
            activity_type="cycling",
            # created_at defaults to now
        )
        db.add(new_seg)
        db.commit()  # created_at set

        # From here on the temporary row exists in the database, so the
        # cleanup must run even if the check below raises.
        try:
            # Refresh max date logic
            max_seg_date_new = db.query(func.max(Segment.created_at)).scalar()
            print(f"New Max Segment Date: {max_seg_date_new}")

            if _scan_can_be_skipped(last_scan, max_seg_date_new):
                print("PASS 3: Optimization FAILED (Generated False Positive Skip).")
            else:
                print("PASS 3: Optimization CORRECTLY signaled re-scan (Partial).")
        finally:
            # Cleanup
            db.delete(new_seg)
            db.commit()


if __name__ == "__main__":
    test_optimization()