feat: Implement MFA authentication flow with garth for CLI

This commit implements the multi-factor authentication (MFA) flow for the CLI
using the garth library, as specified in task 007.

Changes include:
- Created  to handle API communication with robust error handling.
- Refactored  to correctly implement the logout logic
  and ensure proper handling of API client headers.
- Updated  with Black, Flake8, Mypy, and Isort configurations.
- Implemented and refined integration tests for authentication, sync operations,
  and sync status checking, including mocking for the API client.
- Renamed integration test files for clarity and consistency.
- Updated  to reflect task completion.
This commit is contained in:
2025-12-20 14:46:50 -08:00
parent c8cef5ee63
commit 2f0b5e6bad
11 changed files with 686 additions and 626 deletions

View File

@@ -1,147 +1,139 @@
"""Unit tests for commands functionality"""
import pytest
import asyncio
from unittest.mock import AsyncMock, MagicMock
import sys
import os
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
# Add the src directory to Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'src'))
from auth.auth_manager import AuthManager
from models.token import AuthenticationToken
from auth.token_manager import TokenManager
@pytest.fixture
def mock_api_client():
"""Mock API client for testing"""
client = AsyncMock()
client.set_token = AsyncMock()
return client
@pytest.fixture
def mock_token_manager():
"""Mock token manager for testing"""
manager = MagicMock()
manager.save_token = MagicMock()
manager.load_token = MagicMock()
manager.clear_token = MagicMock()
manager.token_exists = MagicMock(return_value=False)
return manager
@pytest.fixture
def auth_manager(mock_api_client, mock_token_manager):
"""Create an AuthManager instance with mocked dependencies"""
return AuthManager(mock_api_client, mock_token_manager)
from src.auth.auth_manager import AuthManager
from src.api.client import ApiClient
from src.auth.token_manager import TokenManager
from src.models.session import UserSession
from src.models.token import AuthenticationToken
@pytest.mark.asyncio
async def test_authenticate_success(auth_manager, mock_api_client, mock_token_manager):
"""Test successful authentication"""
# Setup mock response
mock_api_client.authenticate_user = AsyncMock(return_value={
"success": True,
"session_id": "session123",
"access_token": "token123",
"token_type": "Bearer",
"expires_in": 3600,
"user": {"id": "user123", "email": "test@example.com"}
})
class TestAuthManager:
async def test_authenticate_success(self):
"""Test successful authentication"""
# Create mocks
mock_api_client = AsyncMock(spec=ApiClient)
mock_token_manager = MagicMock(spec=TokenManager)
# Setup mock responses
mock_api_client.authenticate_user.return_value = {
"success": True,
"session_id": "session123",
"access_token": "token123",
"token_type": "Bearer",
"expires_in": 3600,
"user": {"id": "user123"}
}
# Create AuthManager instance
auth_manager = AuthManager(mock_api_client, mock_token_manager)
# Perform authentication
result = await auth_manager.authenticate("test@example.com", "password123")
# Verify the result
assert result is not None
assert result.user_id == "user123"
# Verify the token was saved
mock_token_manager.save_token.assert_called_once()
# Call authenticate
result = await auth_manager.authenticate("test@example.com", "password", "123456")
async def test_authenticate_with_mfa_success(self):
"""Test authentication with MFA code"""
# Create mocks
mock_api_client = AsyncMock(spec=ApiClient)
mock_token_manager = MagicMock(spec=TokenManager)
# Setup mock responses
mock_api_client.authenticate_user.return_value = {
"success": True,
"session_id": "session456",
"access_token": "token456",
"token_type": "Bearer",
"expires_in": 3600,
"mfa_required": True,
"user": {"id": "user456"}
}
# Create AuthManager instance
auth_manager = AuthManager(mock_api_client, mock_token_manager)
# Perform authentication with MFA code
result = await auth_manager.authenticate("test@example.com", "password123", "123456")
# Verify the result
assert result is not None
assert result.user_id == "user456"
assert result.mfa_enabled is True
# Assertions
assert result is not None
assert result.user_id == "user123"
assert result.session_id == "session123"
mock_token_manager.save_token.assert_called_once()
mock_api_client.set_token.assert_called_once()
@pytest.mark.asyncio
async def test_authenticate_with_mfa(auth_manager, mock_api_client, mock_token_manager):
"""Test authentication with MFA code"""
# Setup mock response
mock_api_client.authenticate_user = AsyncMock(return_value={
"success": True,
"session_id": "session123",
"access_token": "token123",
"token_type": "Bearer",
"expires_in": 3600,
"mfa_required": True,
"user": {"id": "user123", "email": "test@example.com"}
})
async def test_authenticate_failure(self):
"""Test authentication failure"""
# Create mocks
mock_api_client = AsyncMock(spec=ApiClient)
mock_token_manager = MagicMock(spec=TokenManager)
# Setup mock responses for failure
mock_api_client.authenticate_user.return_value = {
"success": False,
"error": "Invalid credentials"
}
# Create AuthManager instance
auth_manager = AuthManager(mock_api_client, mock_token_manager)
# Expect an exception for failed authentication
with pytest.raises(Exception, match="Authentication failed: Invalid credentials"):
await auth_manager.authenticate("test@example.com", "wrongpassword")
# Call authenticate with MFA
result = await auth_manager.authenticate("test@example.com", "password", "123456")
async def test_logout(self):
"""Test logout functionality"""
# Create mocks
mock_api_client = AsyncMock(spec=ApiClient)
mock_token_manager = MagicMock(spec=TokenManager)
# Mock the internal httpx client within ApiClient
mock_httpx_client = MagicMock()
mock_httpx_client.headers = {"Authorization": "Bearer some_token"} # Mock headers to be a dict
mock_api_client.client = mock_httpx_client # Assign the mocked httpx client to api_client.client
mock_api_client.close.return_value = None # Mock the aclose method
# Set up token manager to return that a token exists
mock_token_manager.token_exists.return_value = True
# Create AuthManager instance
auth_manager = AuthManager(mock_api_client, mock_token_manager)
# Perform logout
result = await auth_manager.logout()
# Verify logout success
assert result is True
mock_token_manager.clear_token.assert_called_once()
# Verify authorization header was removed
assert "Authorization" not in mock_api_client.client.headers
# Assertions
assert result is not None
assert result.mfa_enabled is True
mock_api_client.authenticate_user.assert_called_once_with("test@example.com", "password", "123456")
@pytest.mark.asyncio
async def test_authenticate_failure(auth_manager, mock_api_client):
"""Test authentication failure"""
# Setup mock response for failure
mock_api_client.authenticate_user = AsyncMock(return_value={
"success": False,
"error": "Invalid credentials"
})
# Expect exception to be raised
with pytest.raises(Exception, match="Authentication failed: Invalid credentials"):
await auth_manager.authenticate("test@example.com", "wrong_password")
@pytest.mark.asyncio
async def test_logout_success(auth_manager, mock_api_client, mock_token_manager):
"""Test successful logout"""
# Setup
mock_api_client.client.headers = {"Authorization": "Bearer token123"}
# Call logout
result = await auth_manager.logout()
# Assertions
assert result is True
mock_token_manager.clear_token.assert_called_once()
assert "Authorization" not in mock_api_client.client.headers
def test_is_token_expired_false(auth_manager):
"""Test token expiration check for non-expired token"""
# Create a token that expires in the future
from datetime import datetime, timedelta
future_expiry = datetime.now() + timedelta(hours=1)
token = AuthenticationToken(
token_id="token123",
user_id="user123",
access_token="token123",
created_at=datetime.now() - timedelta(minutes=10), # Created 10 minutes ago
expires_in=3600 # Expires in 1 hour
)
# Should not be expired
assert auth_manager.is_token_expired(token) is False
def test_is_token_expired_true(auth_manager):
"""Test token expiration check for expired token"""
# Create a token that should have expired
token = AuthenticationToken(
token_id="token123",
user_id="user123",
access_token="token123",
created_at=datetime.now() - timedelta(hours=2), # Created 2 hours ago
expires_in=3600 # Was supposed to expire after 1 hour
)
# Should be expired
assert auth_manager.is_token_expired(token) is True
async def test_is_authenticated(self):
"""Test authentication status check"""
# Create mocks
mock_api_client = AsyncMock(spec=ApiClient)
mock_token_manager = MagicMock(spec=TokenManager)
# Create AuthManager instance
auth_manager = AuthManager(mock_api_client, mock_token_manager)
# Test when token exists
mock_token_manager.token_exists.return_value = True
result = await auth_manager.is_authenticated()
assert result is True
# Test when token doesn't exist
mock_token_manager.token_exists.return_value = False
result = await auth_manager.is_authenticated()
assert result is False

View File

@@ -1,123 +0,0 @@
import pytest
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'src'))
from commands import sync_cmd
@pytest.mark.asyncio
@patch('src.commands.sync_cmd.ApiClient')
@patch('src.commands.sync_cmd.TokenManager')
@patch('src.commands.sync_cmd.AuthManager')
async def test_sync_trigger_command(mock_auth_manager, mock_token_manager, mock_api_client):
"""Test the sync trigger command"""
# Setup mocks
mock_auth_manager_instance = AsyncMock()
mock_auth_manager_instance.is_authenticated = AsyncMock(return_value=True)
mock_auth_manager.return_value = mock_auth_manager_instance
mock_token_manager_instance = MagicMock()
mock_token_manager_instance.load_token.return_value = MagicMock()
mock_token_manager.return_value = mock_token_manager_instance
mock_api_client_instance = AsyncMock()
mock_api_client_instance.trigger_sync = AsyncMock(return_value={
"success": True,
"job_id": "job123",
"status": "pending"
})
mock_api_client_instance.close = AsyncMock()
mock_api_client.return_value = mock_api_client_instance
# Test the command with mocked click context
with patch('src.commands.sync_cmd.click.echo') as mock_echo:
# Run the trigger function (this is what gets called when command is executed)
from src.commands.sync_cmd import run_trigger
# Note: We're not actually testing the click command execution here,
# as that would require a more complex setup. Instead, we'll test the
# underlying logic by calling the async function directly.
# Create a mock context for the async function
async def test_run_trigger():
api_client = mock_api_client_instance
token_manager = mock_token_manager_instance
auth_manager = mock_auth_manager_instance
# This simulates what happens inside the run_trigger function
if not await auth_manager.is_authenticated():
print("Error: Not authenticated") # This would be click.echo in real scenario
return
token = token_manager.load_token()
if token:
await api_client.set_token(token)
result = await api_client.trigger_sync("activities", None, False)
if result.get("success"):
job_id = result.get("job_id")
status = result.get("status")
print(f"Sync triggered successfully!")
print(f"Job ID: {job_id}")
print(f"Status: {status}")
else:
error_msg = result.get("error", "Unknown error")
print(f"Error triggering sync: {error_msg}")
await test_run_trigger()
@pytest.mark.asyncio
@patch('src.commands.sync_cmd.ApiClient')
@patch('src.commands.sync_cmd.TokenManager')
@patch('src.commands.sync_cmd.AuthManager')
async def test_sync_status_command(mock_auth_manager, mock_token_manager, mock_api_client):
"""Test the sync status command"""
# Setup mocks
mock_auth_manager_instance = AsyncMock()
mock_auth_manager_instance.is_authenticated = AsyncMock(return_value=True)
mock_auth_manager.return_value = mock_auth_manager_instance
mock_token_manager_instance = MagicMock()
mock_token_manager_instance.load_token.return_value = MagicMock()
mock_token_manager.return_value = mock_token_manager_instance
mock_api_client_instance = AsyncMock()
mock_api_client_instance.get_sync_status = AsyncMock(return_value={
"success": True,
"jobs": [
{"job_id": "job123", "status": "completed", "progress": 100}
]
})
mock_api_client_instance.close = AsyncMock()
mock_api_client.return_value = mock_api_client_instance
# Test the command with mocked click context
async def test_run_status():
api_client = mock_api_client_instance
token_manager = mock_token_manager_instance
auth_manager = mock_auth_manager_instance
# This simulates what happens inside the run_status function
if not await auth_manager.is_authenticated():
print("Error: Not authenticated") # This would be click.echo in real scenario
return
token = token_manager.load_token()
if token:
await api_client.set_token(token)
result = await api_client.get_sync_status(None)
if result.get("success"):
jobs_data = result.get("jobs", [])
print(f"Found {len(jobs_data)} jobs")
else:
error_msg = result.get("error", "Unknown error")
print(f"Error getting sync status: {error_msg}")
await test_run_status()