278 lines
9.1 KiB
Python
278 lines
9.1 KiB
Python
# test_token_manager.py - Token 管理器測試
|
||
"""
|
||
Tests for TokenManager class.
|
||
Uses sandbox testing to avoid modifying real files.
|
||
"""
|
||
|
||
import json
|
||
import pytest
|
||
import tempfile
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
|
||
# 需要確保可以導入
|
||
import sys
|
||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||
|
||
from src.services.token_manager import TokenManager
|
||
|
||
|
||
class TestTokenManager:
|
||
"""TokenManager 測試套件"""
|
||
|
||
@pytest.fixture
|
||
def temp_json_file(self, tmp_path):
|
||
"""創建臨時 JSON 文件用於沙箱測試"""
|
||
json_file = tmp_path / "openclaw.json"
|
||
test_data = {
|
||
"channels": {
|
||
"matrix": {
|
||
"accessToken": "old_token_12345",
|
||
"homeserver": "https://matrix.org"
|
||
},
|
||
"telegram": {
|
||
"botToken": "telegram_token"
|
||
}
|
||
},
|
||
"settings": {
|
||
"debug": True
|
||
}
|
||
}
|
||
json_file.write_text(json.dumps(test_data, indent=2))
|
||
return json_file
|
||
|
||
@pytest.fixture
|
||
def token_manager(self, temp_json_file):
|
||
"""創建 TokenManager 實例"""
|
||
return TokenManager(
|
||
json_path=temp_json_file,
|
||
token_key="channels.matrix.accessToken"
|
||
)
|
||
|
||
def test_create_backup(self, token_manager, temp_json_file):
|
||
"""測試備份創建"""
|
||
backup_path = token_manager.create_backup()
|
||
|
||
assert backup_path is not None
|
||
assert backup_path.exists()
|
||
assert temp_json_file.name in backup_path.name
|
||
assert ".bak" in backup_path.name
|
||
|
||
# 驗證備份內容與原文件相同
|
||
original = json.loads(temp_json_file.read_text())
|
||
backup = json.loads(backup_path.read_text())
|
||
assert original == backup
|
||
|
||
def test_verify_backup_exists(self, token_manager):
|
||
"""測試備份驗證"""
|
||
# 未創建備份時應該返回 False
|
||
assert token_manager.verify_backup_exists() is False
|
||
|
||
# 創建備份後應該返回 True
|
||
token_manager.create_backup()
|
||
assert token_manager.verify_backup_exists() is True
|
||
|
||
def test_get_nested_value(self, token_manager):
|
||
"""測試嵌套值讀取"""
|
||
data = {
|
||
"level1": {
|
||
"level2": {
|
||
"level3": "value"
|
||
}
|
||
}
|
||
}
|
||
|
||
assert token_manager._get_nested_value(data, "level1.level2.level3") == "value"
|
||
assert token_manager._get_nested_value(data, "level1.level2") is None # 不是字符串
|
||
assert token_manager._get_nested_value(data, "nonexistent") is None
|
||
|
||
def test_set_nested_value(self, token_manager):
|
||
"""測試嵌套值設置"""
|
||
data = {}
|
||
|
||
result = token_manager._set_nested_value(data, "a.b.c", "new_value")
|
||
|
||
assert result["a"]["b"]["c"] == "new_value"
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_read_current_token(self, token_manager):
|
||
"""測試讀取當前 token"""
|
||
token = await token_manager.read_current_token()
|
||
|
||
assert token == "old_token_12345"
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_update_access_token(self, token_manager, temp_json_file):
|
||
"""測試更新 access token(核心功能)"""
|
||
new_token = "new_token_67890"
|
||
|
||
# 執行更新
|
||
success = await token_manager.update_access_token(new_token)
|
||
|
||
# 驗證成功
|
||
assert success is True
|
||
|
||
# 驗證備份存在
|
||
assert token_manager.verify_backup_exists() is True
|
||
|
||
# 驗證新 token 已寫入
|
||
updated_data = json.loads(temp_json_file.read_text())
|
||
assert updated_data["channels"]["matrix"]["accessToken"] == new_token
|
||
|
||
# 驗證其他數據未被破壞
|
||
assert updated_data["channels"]["telegram"]["botToken"] == "telegram_token"
|
||
assert updated_data["settings"]["debug"] is True
|
||
|
||
def test_restore_from_backup(self, token_manager, temp_json_file):
|
||
"""測試從備份恢復"""
|
||
# 創建備份
|
||
backup_path = token_manager.create_backup()
|
||
|
||
# 修改原文件
|
||
temp_json_file.write_text('{"corrupted": true}')
|
||
|
||
# 恢復
|
||
success = token_manager.restore_from_backup()
|
||
|
||
assert success is True
|
||
|
||
# 驗證恢復結果
|
||
restored_data = json.loads(temp_json_file.read_text())
|
||
assert restored_data["channels"]["matrix"]["accessToken"] == "old_token_12345"
|
||
|
||
|
||
class TestTokenManagerEdgeCases:
|
||
"""TokenManager 邊緣情況測試"""
|
||
|
||
def test_backup_nonexistent_file(self, tmp_path):
|
||
"""測試備份不存在的文件"""
|
||
manager = TokenManager(
|
||
json_path=tmp_path / "nonexistent.json",
|
||
token_key="test"
|
||
)
|
||
|
||
backup_path = manager.create_backup()
|
||
assert backup_path is None
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_update_nonexistent_file(self, tmp_path):
|
||
"""測試更新不存在的文件"""
|
||
manager = TokenManager(
|
||
json_path=tmp_path / "nonexistent.json",
|
||
token_key="test"
|
||
)
|
||
|
||
success = await manager.update_access_token("new_token")
|
||
assert success is False
|
||
|
||
|
||
class TestBackupRotation:
|
||
"""備份輪轉功能測試(沙箱測試)"""
|
||
|
||
@pytest.fixture
|
||
def temp_json_file(self, tmp_path):
|
||
"""創建臨時 JSON 文件"""
|
||
json_file = tmp_path / "openclaw.json"
|
||
test_data = {"channels": {"matrix": {"accessToken": "test_token"}}}
|
||
json_file.write_text(json.dumps(test_data))
|
||
return json_file
|
||
|
||
@pytest.fixture
|
||
def token_manager(self, temp_json_file):
|
||
"""創建 TokenManager 實例"""
|
||
return TokenManager(
|
||
json_path=temp_json_file,
|
||
token_key="channels.matrix.accessToken"
|
||
)
|
||
|
||
def test_max_backups_constant(self):
|
||
"""測試最大備份數量常量"""
|
||
assert TokenManager.MAX_BACKUPS == 5
|
||
|
||
def test_get_backup_files_empty(self, token_manager):
|
||
"""測試沒有備份時返回空列表"""
|
||
backups = token_manager._get_backup_files()
|
||
assert backups == []
|
||
|
||
def test_get_backup_files_with_backups(self, token_manager, tmp_path):
|
||
"""測試獲取備份文件列表"""
|
||
# 創建幾個備份
|
||
for i in range(3):
|
||
token_manager.create_backup()
|
||
import time
|
||
time.sleep(0.01) # 確保時間戳不同
|
||
|
||
backups = token_manager._get_backup_files()
|
||
assert len(backups) == 3
|
||
|
||
# 確認按文件名排序(時間戳在名稱中,字典序=時間序,最新在前)
|
||
for i in range(len(backups) - 1):
|
||
assert backups[i].name > backups[i + 1].name
|
||
|
||
def test_rotation_under_limit(self, token_manager):
|
||
"""測試備份數量未超過限制時不刪除"""
|
||
# 創建 3 個備份(低於限制)
|
||
for i in range(3):
|
||
token_manager.create_backup()
|
||
import time
|
||
time.sleep(0.01)
|
||
|
||
assert token_manager.get_backup_count() == 3
|
||
|
||
def test_rotation_at_limit(self, token_manager):
|
||
"""測試備份數量剛好達到限制"""
|
||
# 創建 5 個備份(剛好等於限制)
|
||
for i in range(5):
|
||
token_manager.create_backup()
|
||
import time
|
||
time.sleep(0.01)
|
||
|
||
assert token_manager.get_backup_count() == 5
|
||
|
||
def test_rotation_over_limit(self, token_manager):
|
||
"""測試備份數量超過限制時自動刪除最舊的"""
|
||
# 創建 7 個備份(超過限制)
|
||
for i in range(7):
|
||
token_manager.create_backup()
|
||
import time
|
||
time.sleep(0.01)
|
||
|
||
# 應該只保留 5 個
|
||
assert token_manager.get_backup_count() == 5
|
||
|
||
def test_rotation_preserves_newest(self, token_manager, temp_json_file):
|
||
"""測試輪轉保留最新的備份"""
|
||
backup_paths = []
|
||
|
||
# 創建 7 個備份
|
||
for i in range(7):
|
||
path = token_manager.create_backup()
|
||
if path:
|
||
backup_paths.append(path)
|
||
import time
|
||
time.sleep(0.01)
|
||
|
||
# 檢查最新的 5 個備份仍然存在
|
||
remaining_backups = token_manager._get_backup_files()
|
||
assert len(remaining_backups) == 5
|
||
|
||
# 最新的 5 個應該是最後創建的 5 個
|
||
newest_5 = backup_paths[-5:]
|
||
for path in newest_5:
|
||
assert path.exists(), f"{path} should exist"
|
||
|
||
# 最舊的 2 個應該被刪除
|
||
oldest_2 = backup_paths[:2]
|
||
for path in oldest_2:
|
||
assert not path.exists(), f"{path} should be deleted"
|
||
|
||
def test_get_backup_count(self, token_manager):
|
||
"""測試獲取備份數量"""
|
||
assert token_manager.get_backup_count() == 0
|
||
|
||
token_manager.create_backup()
|
||
assert token_manager.get_backup_count() == 1
|
||
|
||
token_manager.create_backup()
|
||
assert token_manager.get_backup_count() == 2
|