278 lines
9.1 KiB
Python
278 lines
9.1 KiB
Python
|
|
# test_token_manager.py - Token 管理器測試
|
|||
|
|
"""
|
|||
|
|
Tests for TokenManager class.
|
|||
|
|
Uses sandbox testing to avoid modifying real files.
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import json
|
|||
|
|
import pytest
|
|||
|
|
import tempfile
|
|||
|
|
from pathlib import Path
|
|||
|
|
from datetime import datetime
|
|||
|
|
|
|||
|
|
# Put the parent of this test directory on sys.path so that `src.services.token_manager` can be imported
|
|||
|
|
import sys
|
|||
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|||
|
|
|
|||
|
|
from src.services.token_manager import TokenManager
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TestTokenManager:
    """Core test suite for TokenManager.

    Every test works on a throw-away JSON file created under pytest's
    ``tmp_path``, so no real configuration file is read or modified.
    """

    @pytest.fixture
    def temp_json_file(self, tmp_path):
        """Write a sample config to a temporary ``openclaw.json`` and return its path."""
        json_file = tmp_path / "openclaw.json"
        test_data = {
            "channels": {
                "matrix": {
                    "accessToken": "old_token_12345",
                    "homeserver": "https://matrix.org",
                },
                "telegram": {
                    "botToken": "telegram_token",
                },
            },
            "settings": {
                "debug": True,
            },
        }
        json_file.write_text(json.dumps(test_data, indent=2))
        return json_file

    @pytest.fixture
    def token_manager(self, temp_json_file):
        """Return a TokenManager bound to the temporary file and the matrix token key."""
        return TokenManager(
            json_path=temp_json_file,
            token_key="channels.matrix.accessToken",
        )

    def test_create_backup(self, token_manager, temp_json_file):
        """create_backup() should return an existing ``.bak`` file mirroring the source."""
        backup_path = token_manager.create_backup()

        assert backup_path is not None
        assert backup_path.exists()
        assert temp_json_file.name in backup_path.name
        assert ".bak" in backup_path.name

        # The backup must hold the same JSON data as the original file.
        original = json.loads(temp_json_file.read_text())
        backup = json.loads(backup_path.read_text())
        assert original == backup

    def test_verify_backup_exists(self, token_manager):
        """verify_backup_exists() should flip from False to True once a backup is made."""
        # No backup has been created yet.
        assert token_manager.verify_backup_exists() is False

        # After creating one, verification is expected to succeed.
        token_manager.create_backup()
        assert token_manager.verify_backup_exists() is True

    def test_get_nested_value(self, token_manager):
        """_get_nested_value() should resolve dotted paths to string leaves only."""
        data = {
            "level1": {
                "level2": {
                    "level3": "value",
                },
            },
        }

        assert token_manager._get_nested_value(data, "level1.level2.level3") == "value"
        # The value at this path is a dict, not a string, so None is expected.
        assert token_manager._get_nested_value(data, "level1.level2") is None
        assert token_manager._get_nested_value(data, "nonexistent") is None

    def test_set_nested_value(self, token_manager):
        """_set_nested_value() should create the intermediate dicts for a dotted path."""
        data = {}

        result = token_manager._set_nested_value(data, "a.b.c", "new_value")

        assert result["a"]["b"]["c"] == "new_value"

    @pytest.mark.asyncio
    async def test_read_current_token(self, token_manager):
        """read_current_token() should return the token stored in the fixture file."""
        token = await token_manager.read_current_token()

        assert token == "old_token_12345"

    @pytest.mark.asyncio
    async def test_update_access_token(self, token_manager, temp_json_file):
        """update_access_token() (the core feature) should write the token and keep the rest."""
        new_token = "new_token_67890"

        # Perform the update.
        success = await token_manager.update_access_token(new_token)

        # The call reports success.
        assert success is True

        # A backup is expected to exist after the update.
        assert token_manager.verify_backup_exists() is True

        # The new token was written to the file.
        updated_data = json.loads(temp_json_file.read_text())
        assert updated_data["channels"]["matrix"]["accessToken"] == new_token

        # Unrelated data must be left intact.
        assert updated_data["channels"]["telegram"]["botToken"] == "telegram_token"
        assert updated_data["settings"]["debug"] is True

    def test_restore_from_backup(self, token_manager, temp_json_file):
        """restore_from_backup() should bring back the content captured by the backup."""
        # Create the backup and make sure it really was created; otherwise a
        # later restore failure would point at the wrong culprit.
        backup_path = token_manager.create_backup()
        assert backup_path is not None

        # Overwrite the original file with unrelated content.
        temp_json_file.write_text('{"corrupted": true}')

        # Restore.
        success = token_manager.restore_from_backup()

        assert success is True

        # The original token must be back in place.
        restored_data = json.loads(temp_json_file.read_text())
        assert restored_data["channels"]["matrix"]["accessToken"] == "old_token_12345"
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TestTokenManagerEdgeCases:
    """Edge-case tests: TokenManager pointed at a JSON file that does not exist."""

    def test_backup_nonexistent_file(self, tmp_path):
        """create_backup() is expected to return None when the source file is missing."""
        missing_file = tmp_path / "nonexistent.json"
        manager = TokenManager(json_path=missing_file, token_key="test")

        result = manager.create_backup()

        assert result is None

    @pytest.mark.asyncio
    async def test_update_nonexistent_file(self, tmp_path):
        """update_access_token() is expected to report False when the file is missing."""
        missing_file = tmp_path / "nonexistent.json"
        manager = TokenManager(json_path=missing_file, token_key="test")

        outcome = await manager.update_access_token("new_token")

        assert outcome is False
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TestBackupRotation:
    """Backup rotation tests, run in a sandbox under pytest's ``tmp_path``."""

    @pytest.fixture
    def temp_json_file(self, tmp_path):
        """Write a minimal config to a temporary ``openclaw.json`` and return its path."""
        json_file = tmp_path / "openclaw.json"
        test_data = {"channels": {"matrix": {"accessToken": "test_token"}}}
        json_file.write_text(json.dumps(test_data))
        return json_file

    @pytest.fixture
    def token_manager(self, temp_json_file):
        """Return a TokenManager bound to the temporary file and the matrix token key."""
        return TokenManager(
            json_path=temp_json_file,
            token_key="channels.matrix.accessToken",
        )

    @staticmethod
    def _create_backups(token_manager, count):
        """Call ``create_backup()`` ``count`` times; return the results in creation order.

        Not collected by pytest (the name does not start with ``test``).
        """
        # Imported here, once per call, because the module's import block
        # does not provide ``time``.
        import time

        created = []
        for _ in range(count):
            created.append(token_manager.create_backup())
            # Short pause intended to keep consecutive backup timestamps distinct.
            time.sleep(0.01)
        return created

    def test_max_backups_constant(self):
        """The rotation limit constant should be 5."""
        assert TokenManager.MAX_BACKUPS == 5

    def test_get_backup_files_empty(self, token_manager):
        """With no backups created, _get_backup_files() should return an empty list."""
        backups = token_manager._get_backup_files()
        assert backups == []

    def test_get_backup_files_with_backups(self, token_manager, tmp_path):
        """_get_backup_files() should list every backup, newest first."""
        # Create a few backups.
        self._create_backups(token_manager, 3)

        backups = token_manager._get_backup_files()
        assert len(backups) == 3

        # Sorted by file name: the timestamp is part of the name, so
        # lexicographic order is chronological order, newest first.
        for newer, older in zip(backups, backups[1:]):
            assert newer.name > older.name

    def test_rotation_under_limit(self, token_manager):
        """Below the limit, nothing should be deleted."""
        # 3 backups: under the limit.
        self._create_backups(token_manager, 3)

        assert token_manager.get_backup_count() == 3

    def test_rotation_at_limit(self, token_manager):
        """Exactly at the limit, all backups should be kept."""
        # 5 backups: exactly the limit.
        self._create_backups(token_manager, 5)

        assert token_manager.get_backup_count() == 5

    def test_rotation_over_limit(self, token_manager):
        """Above the limit, the oldest backups should be removed automatically."""
        # 7 backups: over the limit.
        self._create_backups(token_manager, 7)

        # Only 5 should remain.
        assert token_manager.get_backup_count() == 5

    def test_rotation_preserves_newest(self, token_manager, temp_json_file):
        """Rotation should keep the most recently created backups."""
        # Create 7 backups.
        backup_paths = self._create_backups(token_manager, 7)

        # Fail loudly if any backup was not created; silently skipping a None
        # would shift the "newest 5" / "oldest 2" slices below.
        assert all(path is not None for path in backup_paths), (
            "every create_backup() call should return a path"
        )

        # Exactly 5 backups should remain.
        remaining_backups = token_manager._get_backup_files()
        assert len(remaining_backups) == 5

        # The 5 newest are the last 5 created.
        for path in backup_paths[-5:]:
            assert path.exists(), f"{path} should exist"

        # The 2 oldest should have been deleted.
        for path in backup_paths[:2]:
            assert not path.exists(), f"{path} should be deleted"

    def test_get_backup_count(self, token_manager):
        """get_backup_count() should track the number of backups created."""
        assert token_manager.get_backup_count() == 0

        token_manager.create_backup()
        assert token_manager.get_backup_count() == 1

        token_manager.create_backup()
        assert token_manager.get_backup_count() == 2
|