This commit is contained in:
Dieu 2025-07-13 23:42:33 +02:00
parent 39bcee3f7b
commit c9fda793e2
5 changed files with 7 additions and 878 deletions

View file

@ -14,7 +14,7 @@ from pydantic import BaseModel, Field, ConfigDict
from open_webui.models.users import Users, UserModel
from open_webui.models.groups import Groups, GroupModel
from open_webui.utils.auth import get_admin_user, get_current_user, decode_token
from open_webui.utils.auth import get_admin_user, get_current_user, decode_token, get_verified_user
from open_webui.constants import ERROR_MESSAGES
from open_webui.env import SRC_LOG_LEVELS
@ -236,8 +236,13 @@ def get_scim_auth(request: Request, authorization: Optional[str] = Header(None))
)
return True
except HTTPException:
# Re-raise HTTP exceptions as-is
raise
except Exception as e:
log.error(f"SCIM authentication error: {e}")
import traceback
log.error(f"Traceback: {traceback.format_exc()}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication failed",
@ -312,6 +317,7 @@ def group_to_scim(group: GroupModel, request: Request) -> SCIMGroup:
)
# SCIM Service Provider Config
@router.get("/ServiceProviderConfig")
async def get_service_provider_config():

View file

@ -1,347 +0,0 @@
"""
Tests for SCIM 2.0 endpoints
"""
import json
import pytest
from unittest.mock import patch, MagicMock
from fastapi.testclient import TestClient
from datetime import datetime, timezone
from open_webui.main import app
from open_webui.models.users import UserModel
from open_webui.models.groups import GroupModel
class TestSCIMEndpoints:
"""Test SCIM 2.0 endpoints"""
@pytest.fixture
def client(self):
return TestClient(app)
@pytest.fixture
def admin_token(self):
"""Mock admin token for authentication"""
return "mock-admin-token"
@pytest.fixture
def mock_admin_user(self):
"""Mock admin user"""
return UserModel(
id="admin-123",
name="Admin User",
email="admin@example.com",
role="admin",
profile_image_url="/user.png",
created_at=1234567890,
updated_at=1234567890,
last_active_at=1234567890
)
@pytest.fixture
def mock_user(self):
"""Mock regular user"""
return UserModel(
id="user-456",
name="Test User",
email="test@example.com",
role="user",
profile_image_url="/user.png",
created_at=1234567890,
updated_at=1234567890,
last_active_at=1234567890
)
@pytest.fixture
def mock_group(self):
"""Mock group"""
return GroupModel(
id="group-789",
user_id="admin-123",
name="Test Group",
description="Test group description",
user_ids=["user-456"],
created_at=1234567890,
updated_at=1234567890
)
@pytest.fixture
def auth_headers(self, admin_token):
"""Authorization headers for requests"""
return {"Authorization": f"Bearer {admin_token}"}
# Service Provider Config Tests
def test_get_service_provider_config(self, client):
"""Test getting SCIM Service Provider Configuration"""
response = client.get("/api/v1/scim/v2/ServiceProviderConfig")
assert response.status_code == 200
data = response.json()
assert "schemas" in data
assert data["schemas"] == ["urn:ietf:params:scim:schemas:core:2.0:ServiceProviderConfig"]
assert "patch" in data
assert data["patch"]["supported"] == True
assert "filter" in data
assert data["filter"]["supported"] == True
# Resource Types Tests
def test_get_resource_types(self, client):
"""Test getting SCIM Resource Types"""
response = client.get("/api/v1/scim/v2/ResourceTypes")
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert len(data) == 2
# Check User resource type
user_type = next(r for r in data if r["id"] == "User")
assert user_type["name"] == "User"
assert user_type["endpoint"] == "/Users"
assert user_type["schema"] == "urn:ietf:params:scim:schemas:core:2.0:User"
# Check Group resource type
group_type = next(r for r in data if r["id"] == "Group")
assert group_type["name"] == "Group"
assert group_type["endpoint"] == "/Groups"
assert group_type["schema"] == "urn:ietf:params:scim:schemas:core:2.0:Group"
# Schemas Tests
def test_get_schemas(self, client):
"""Test getting SCIM Schemas"""
response = client.get("/api/v1/scim/v2/Schemas")
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert len(data) == 2
# Check User schema
user_schema = next(s for s in data if s["id"] == "urn:ietf:params:scim:schemas:core:2.0:User")
assert user_schema["name"] == "User"
assert "attributes" in user_schema
# Check Group schema
group_schema = next(s for s in data if s["id"] == "urn:ietf:params:scim:schemas:core:2.0:Group")
assert group_schema["name"] == "Group"
assert "attributes" in group_schema
# User Tests
@patch('open_webui.routers.scim.decode_token')
@patch('open_webui.models.users.Users.get_user_by_id')
@patch('open_webui.models.users.Users.get_users')
@patch('open_webui.models.groups.Groups.get_groups_by_member_id')
def test_get_users(self, mock_get_groups, mock_get_users, mock_get_user_by_id, mock_decode_token, client, auth_headers, mock_admin_user, mock_user):
"""Test listing SCIM users"""
mock_decode_token.return_value = {"id": "admin-123"}
mock_get_user_by_id.return_value = mock_admin_user
mock_get_users.return_value = {
"users": [mock_user],
"total": 1
}
mock_get_groups.return_value = []
response = client.get("/api/v1/scim/v2/Users", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["schemas"] == ["urn:ietf:params:scim:api:messages:2.0:ListResponse"]
assert data["totalResults"] == 1
assert data["itemsPerPage"] == 1
assert data["startIndex"] == 1
assert len(data["Resources"]) == 1
user = data["Resources"][0]
assert user["id"] == "user-456"
assert user["userName"] == "test@example.com"
assert user["displayName"] == "Test User"
assert user["active"] == True
@patch('open_webui.routers.scim.decode_token')
@patch('open_webui.models.users.Users.get_user_by_id')
@patch('open_webui.models.groups.Groups.get_groups_by_member_id')
def test_get_user_by_id(self, mock_get_groups, mock_get_user_by_id, mock_decode_token, client, auth_headers, mock_admin_user, mock_user):
"""Test getting a specific SCIM user"""
mock_decode_token.return_value = {"id": "admin-123"}
mock_get_user_by_id.side_effect = lambda id: mock_admin_user if id == "admin-123" else mock_user
mock_get_groups.return_value = []
response = client.get("/api/v1/scim/v2/Users/user-456", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["id"] == "user-456"
assert data["userName"] == "test@example.com"
assert data["displayName"] == "Test User"
@patch('open_webui.routers.scim.decode_token')
@patch('open_webui.models.users.Users.get_user_by_id')
@patch('open_webui.models.users.Users.get_user_by_email')
@patch('open_webui.models.users.Users.insert_new_user')
def test_create_user(self, mock_insert_user, mock_get_user_by_email, mock_get_user_by_id, mock_decode_token, client, auth_headers, mock_admin_user):
"""Test creating a SCIM user"""
mock_decode_token.return_value = {"id": "admin-123"}
mock_get_user_by_id.return_value = mock_admin_user
mock_get_user_by_email.return_value = None
new_user = UserModel(
id="new-user-123",
name="New User",
email="newuser@example.com",
role="user",
profile_image_url="/user.png",
created_at=1234567890,
updated_at=1234567890,
last_active_at=1234567890
)
mock_insert_user.return_value = new_user
create_data = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"userName": "newuser@example.com",
"displayName": "New User",
"emails": [{"value": "newuser@example.com", "primary": True}],
"active": True
}
response = client.post("/api/v1/scim/v2/Users", headers=auth_headers, json=create_data)
assert response.status_code == 201
data = response.json()
assert data["userName"] == "newuser@example.com"
assert data["displayName"] == "New User"
@patch('open_webui.routers.scim.decode_token')
@patch('open_webui.models.users.Users.get_user_by_id')
@patch('open_webui.models.users.Users.update_user_by_id')
def test_update_user(self, mock_update_user, mock_get_user_by_id, mock_decode_token, client, auth_headers, mock_admin_user, mock_user):
"""Test updating a SCIM user"""
mock_decode_token.return_value = {"id": "admin-123"}
mock_get_user_by_id.side_effect = lambda id: mock_admin_user if id == "admin-123" else mock_user
updated_user = mock_user.model_copy()
updated_user.name = "Updated User"
mock_update_user.return_value = updated_user
update_data = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"displayName": "Updated User"
}
response = client.put(f"/api/v1/scim/v2/Users/{mock_user.id}", headers=auth_headers, json=update_data)
assert response.status_code == 200
data = response.json()
assert data["displayName"] == "Updated User"
@patch('open_webui.routers.scim.decode_token')
@patch('open_webui.models.users.Users.get_user_by_id')
@patch('open_webui.models.users.Users.update_user_by_id')
def test_patch_user(self, mock_update_user, mock_get_user_by_id, mock_decode_token, client, auth_headers, mock_admin_user, mock_user):
"""Test patching a SCIM user"""
mock_decode_token.return_value = {"id": "admin-123"}
mock_get_user_by_id.side_effect = lambda id: mock_admin_user if id == "admin-123" else mock_user
updated_user = mock_user.model_copy()
updated_user.role = "pending"
mock_update_user.return_value = updated_user
patch_data = {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
"Operations": [
{
"op": "replace",
"path": "active",
"value": False
}
]
}
response = client.patch(f"/api/v1/scim/v2/Users/{mock_user.id}", headers=auth_headers, json=patch_data)
assert response.status_code == 200
data = response.json()
assert data["active"] == False
@patch('open_webui.routers.scim.decode_token')
@patch('open_webui.models.users.Users.get_user_by_id')
@patch('open_webui.models.users.Users.delete_user_by_id')
def test_delete_user(self, mock_delete_user, mock_get_user_by_id, mock_decode_token, client, auth_headers, mock_admin_user, mock_user):
"""Test deleting a SCIM user"""
mock_decode_token.return_value = {"id": "admin-123"}
mock_get_user_by_id.side_effect = lambda id: mock_admin_user if id == "admin-123" else mock_user
mock_delete_user.return_value = True
response = client.delete(f"/api/v1/scim/v2/Users/{mock_user.id}", headers=auth_headers)
assert response.status_code == 204
# Group Tests
@patch('open_webui.routers.scim.decode_token')
@patch('open_webui.models.users.Users.get_user_by_id')
@patch('open_webui.models.groups.Groups.get_groups')
def test_get_groups(self, mock_get_groups, mock_get_user_by_id, mock_decode_token, client, auth_headers, mock_admin_user, mock_group):
"""Test listing SCIM groups"""
mock_decode_token.return_value = {"id": "admin-123"}
mock_get_user_by_id.return_value = mock_admin_user
mock_get_groups.return_value = [mock_group]
response = client.get("/api/v1/scim/v2/Groups", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["schemas"] == ["urn:ietf:params:scim:api:messages:2.0:ListResponse"]
assert data["totalResults"] == 1
assert len(data["Resources"]) == 1
group = data["Resources"][0]
assert group["id"] == "group-789"
assert group["displayName"] == "Test Group"
@patch('open_webui.routers.scim.decode_token')
@patch('open_webui.models.users.Users.get_user_by_id')
@patch('open_webui.models.users.Users.get_super_admin_user')
@patch('open_webui.models.groups.Groups.insert_new_group')
def test_create_group(self, mock_insert_group, mock_get_super_admin, mock_get_user_by_id, mock_decode_token, client, auth_headers, mock_admin_user, mock_group):
"""Test creating a SCIM group"""
mock_decode_token.return_value = {"id": "admin-123"}
mock_get_user_by_id.return_value = mock_admin_user
mock_get_super_admin.return_value = mock_admin_user
mock_insert_group.return_value = mock_group
create_data = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
"displayName": "Test Group"
}
response = client.post("/api/v1/scim/v2/Groups", headers=auth_headers, json=create_data)
assert response.status_code == 201
data = response.json()
assert data["displayName"] == "Test Group"
# Error Cases
def test_unauthorized_access(self, client):
"""Test accessing SCIM endpoints without authentication"""
response = client.get("/api/v1/scim/v2/Users")
assert response.status_code == 401
@patch('open_webui.routers.scim.decode_token')
@patch('open_webui.models.users.Users.get_user_by_id')
def test_non_admin_access(self, mock_get_user_by_id, mock_decode_token, client, mock_user):
"""Test accessing SCIM endpoints as non-admin user"""
mock_decode_token.return_value = {"id": "user-456"}
mock_get_user_by_id.return_value = mock_user
response = client.get("/api/v1/scim/v2/Users", headers={"Authorization": "Bearer non-admin-token"})
assert response.status_code == 403
@patch('open_webui.routers.scim.decode_token')
@patch('open_webui.models.users.Users.get_user_by_id')
def test_user_not_found(self, mock_get_user_by_id, mock_decode_token, client, auth_headers, mock_admin_user):
"""Test getting non-existent user"""
mock_decode_token.return_value = {"id": "admin-123"}
mock_get_user_by_id.side_effect = lambda id: mock_admin_user if id == "admin-123" else None
response = client.get("/api/v1/scim/v2/Users/non-existent", headers=auth_headers)
assert response.status_code == 404

View file

@ -1,237 +0,0 @@
"""
Fixed tests for SCIM 2.0 endpoints with proper authentication mocking
"""
import json
import pytest
from unittest.mock import patch, MagicMock, Mock
from fastapi.testclient import TestClient
from datetime import datetime, timezone
import time
from open_webui.main import app
from open_webui.models.users import UserModel
from open_webui.models.groups import GroupModel
class TestSCIMEndpointsFixed:
"""Test SCIM 2.0 endpoints with proper auth mocking"""
@pytest.fixture
def client(self):
return TestClient(app)
@pytest.fixture
def admin_token(self):
"""Mock admin token for authentication"""
return "mock-admin-token"
@pytest.fixture
def mock_admin_user(self):
"""Mock admin user"""
return UserModel(
id="admin-123",
name="Admin User",
email="admin@example.com",
role="admin",
profile_image_url="/user.png",
created_at=1234567890,
updated_at=1234567890,
last_active_at=1234567890
)
@pytest.fixture
def mock_user(self):
"""Mock regular user"""
return UserModel(
id="user-456",
name="Test User",
email="test@example.com",
role="user",
profile_image_url="/user.png",
created_at=1234567890,
updated_at=1234567890,
last_active_at=1234567890
)
@pytest.fixture
def mock_group(self):
"""Mock group"""
return GroupModel(
id="group-789",
user_id="admin-123",
name="Test Group",
description="Test group description",
user_ids=["user-456"],
created_at=1234567890,
updated_at=1234567890
)
@pytest.fixture
def auth_headers(self, admin_token):
"""Authorization headers for requests"""
return {"Authorization": f"Bearer {admin_token}"}
@pytest.fixture
def valid_token_data(self):
"""Valid token data"""
return {
"id": "admin-123",
"email": "admin@example.com",
"name": "Admin User",
"role": "admin",
"exp": int(time.time()) + 3600 # Valid for 1 hour
}
# Service Provider Config Tests (No auth required)
def test_get_service_provider_config(self, client):
"""Test getting SCIM Service Provider Configuration"""
response = client.get("/api/v1/scim/v2/ServiceProviderConfig")
assert response.status_code == 200
data = response.json()
assert "schemas" in data
assert data["schemas"] == ["urn:ietf:params:scim:schemas:core:2.0:ServiceProviderConfig"]
assert "patch" in data
assert data["patch"]["supported"] == True
assert "filter" in data
assert data["filter"]["supported"] == True
# Mock the entire authentication dependency
@patch('open_webui.routers.scim.get_scim_auth')
@patch('open_webui.models.users.Users.get_users')
@patch('open_webui.models.groups.Groups.get_groups_by_member_id')
def test_get_users_with_mocked_auth(self, mock_get_groups, mock_get_users, mock_get_scim_auth, client, auth_headers, mock_user):
"""Test listing SCIM users with mocked authentication"""
# Mock the authentication to always return True
mock_get_scim_auth.return_value = True
# Mock the database calls
mock_get_users.return_value = {
"users": [mock_user],
"total": 1
}
mock_get_groups.return_value = []
response = client.get("/api/v1/scim/v2/Users", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["schemas"] == ["urn:ietf:params:scim:api:messages:2.0:ListResponse"]
assert data["totalResults"] == 1
assert data["itemsPerPage"] == 1
assert data["startIndex"] == 1
assert len(data["Resources"]) == 1
user = data["Resources"][0]
assert user["id"] == "user-456"
assert user["userName"] == "test@example.com"
assert user["displayName"] == "Test User"
assert user["active"] == True
# Alternative approach: Mock at the decode_token level
def test_get_users_with_token_mock(self, client, auth_headers, mock_admin_user, mock_user, valid_token_data):
"""Test listing SCIM users with token decoding mocked"""
with patch('open_webui.routers.scim.decode_token') as mock_decode_token, \
patch('open_webui.models.users.Users.get_user_by_id') as mock_get_user_by_id, \
patch('open_webui.models.users.Users.get_users') as mock_get_users, \
patch('open_webui.models.groups.Groups.get_groups_by_member_id') as mock_get_groups:
# Setup mocks
mock_decode_token.return_value = valid_token_data
mock_get_user_by_id.return_value = mock_admin_user
mock_get_users.return_value = {
"users": [mock_user],
"total": 1
}
mock_get_groups.return_value = []
response = client.get("/api/v1/scim/v2/Users", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["totalResults"] == 1
# Test authentication failures
def test_unauthorized_access_no_header(self, client):
"""Test accessing SCIM endpoints without authentication header"""
response = client.get("/api/v1/scim/v2/Users")
assert response.status_code == 401
def test_unauthorized_access_invalid_token(self, client):
"""Test accessing SCIM endpoints with invalid token"""
with patch('open_webui.routers.scim.decode_token') as mock_decode_token:
mock_decode_token.return_value = None # Invalid token
response = client.get("/api/v1/scim/v2/Users", headers={"Authorization": "Bearer invalid-token"})
assert response.status_code == 401
def test_non_admin_access(self, client, mock_user):
"""Test accessing SCIM endpoints as non-admin user"""
with patch('open_webui.routers.scim.decode_token') as mock_decode_token, \
patch('open_webui.models.users.Users.get_user_by_id') as mock_get_user_by_id:
# Mock token for non-admin user
mock_decode_token.return_value = {"id": "user-456"}
mock_get_user_by_id.return_value = mock_user # Non-admin user
response = client.get("/api/v1/scim/v2/Users", headers={"Authorization": "Bearer user-token"})
assert response.status_code == 403
# Create user test with proper mocking
@patch('open_webui.routers.scim.get_scim_auth')
@patch('open_webui.models.users.Users.get_user_by_email')
@patch('open_webui.models.users.Users.insert_new_user')
def test_create_user(self, mock_insert_user, mock_get_user_by_email, mock_get_scim_auth, client, auth_headers):
"""Test creating a SCIM user"""
mock_get_scim_auth.return_value = True
mock_get_user_by_email.return_value = None # User doesn't exist
new_user = UserModel(
id="new-user-123",
name="New User",
email="newuser@example.com",
role="user",
profile_image_url="/user.png",
created_at=1234567890,
updated_at=1234567890,
last_active_at=1234567890
)
mock_insert_user.return_value = new_user
create_data = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"userName": "newuser@example.com",
"displayName": "New User",
"emails": [{"value": "newuser@example.com", "primary": True}],
"active": True
}
response = client.post("/api/v1/scim/v2/Users", headers=auth_headers, json=create_data)
assert response.status_code == 201
data = response.json()
assert data["userName"] == "newuser@example.com"
assert data["displayName"] == "New User"
# Group tests
@patch('open_webui.routers.scim.get_scim_auth')
@patch('open_webui.models.groups.Groups.get_groups')
@patch('open_webui.models.users.Users.get_user_by_id')
def test_get_groups(self, mock_get_user_by_id, mock_get_groups, mock_get_scim_auth, client, auth_headers, mock_group, mock_user):
"""Test listing SCIM groups"""
mock_get_scim_auth.return_value = True
mock_get_groups.return_value = [mock_group]
mock_get_user_by_id.return_value = mock_user
response = client.get("/api/v1/scim/v2/Groups", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["schemas"] == ["urn:ietf:params:scim:api:messages:2.0:ListResponse"]
assert data["totalResults"] == 1
assert len(data["Resources"]) == 1
group = data["Resources"][0]
assert group["id"] == "group-789"
assert group["displayName"] == "Test Group"

View file

@ -1,163 +0,0 @@
"""
SCIM tests with dependency override approach
"""
import pytest
from unittest.mock import Mock, patch
from fastapi.testclient import TestClient
from fastapi import Depends
from open_webui.main import app
from open_webui.routers.scim import get_scim_auth
from open_webui.models.users import UserModel
from open_webui.models.groups import GroupModel
# Override the authentication dependency
async def override_get_scim_auth():
"""Override SCIM auth to always return True for tests"""
return True
class TestSCIMWithOverride:
"""Test SCIM endpoints by overriding dependencies"""
@pytest.fixture
def client(self):
# Override the dependency before creating the test client
app.dependency_overrides[get_scim_auth] = override_get_scim_auth
client = TestClient(app)
yield client
# Clean up
app.dependency_overrides.clear()
@pytest.fixture
def mock_user(self):
"""Mock regular user"""
return UserModel(
id="user-456",
name="Test User",
email="test@example.com",
role="user",
profile_image_url="/user.png",
created_at=1234567890,
updated_at=1234567890,
last_active_at=1234567890
)
@pytest.fixture
def mock_group(self):
"""Mock group"""
return GroupModel(
id="group-789",
user_id="admin-123",
name="Test Group",
description="Test group description",
user_ids=["user-456"],
created_at=1234567890,
updated_at=1234567890
)
# Now test without worrying about auth
@patch('open_webui.models.users.Users.get_users')
@patch('open_webui.models.groups.Groups.get_groups_by_member_id')
def test_get_users(self, mock_get_groups, mock_get_users, client, mock_user):
"""Test listing SCIM users"""
mock_get_users.return_value = {
"users": [mock_user],
"total": 1
}
mock_get_groups.return_value = []
# No need for auth headers since we overrode the dependency
response = client.get("/api/v1/scim/v2/Users")
assert response.status_code == 200
data = response.json()
assert data["schemas"] == ["urn:ietf:params:scim:api:messages:2.0:ListResponse"]
assert data["totalResults"] == 1
assert data["itemsPerPage"] == 1
assert len(data["Resources"]) == 1
user = data["Resources"][0]
assert user["id"] == "user-456"
assert user["userName"] == "test@example.com"
assert user["displayName"] == "Test User"
assert user["active"] == True
@patch('open_webui.models.users.Users.get_user_by_id')
@patch('open_webui.models.groups.Groups.get_groups_by_member_id')
def test_get_user_by_id(self, mock_get_groups, mock_get_user_by_id, client, mock_user):
"""Test getting a specific SCIM user"""
mock_get_user_by_id.return_value = mock_user
mock_get_groups.return_value = []
response = client.get(f"/api/v1/scim/v2/Users/{mock_user.id}")
assert response.status_code == 200
data = response.json()
assert data["id"] == "user-456"
assert data["userName"] == "test@example.com"
@patch('open_webui.models.users.Users.get_user_by_email')
@patch('open_webui.models.users.Users.insert_new_user')
def test_create_user(self, mock_insert_user, mock_get_user_by_email, client):
"""Test creating a SCIM user"""
mock_get_user_by_email.return_value = None
new_user = UserModel(
id="new-user-123",
name="New User",
email="newuser@example.com",
role="user",
profile_image_url="/user.png",
created_at=1234567890,
updated_at=1234567890,
last_active_at=1234567890
)
mock_insert_user.return_value = new_user
create_data = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"userName": "newuser@example.com",
"displayName": "New User",
"emails": [{"value": "newuser@example.com", "primary": True}],
"active": True
}
response = client.post("/api/v1/scim/v2/Users", json=create_data)
assert response.status_code == 201
data = response.json()
assert data["userName"] == "newuser@example.com"
assert data["displayName"] == "New User"
@patch('open_webui.models.groups.Groups.get_groups')
@patch('open_webui.models.users.Users.get_user_by_id')
def test_get_groups(self, mock_get_user_by_id, mock_get_groups, client, mock_group, mock_user):
"""Test listing SCIM groups"""
mock_get_groups.return_value = [mock_group]
mock_get_user_by_id.return_value = mock_user
response = client.get("/api/v1/scim/v2/Groups")
assert response.status_code == 200
data = response.json()
assert data["totalResults"] == 1
assert len(data["Resources"]) == 1
group = data["Resources"][0]
assert group["id"] == "group-789"
assert group["displayName"] == "Test Group"
def test_service_provider_config(self, client):
"""Test service provider config (no auth needed)"""
# Remove the override for this test since it doesn't need auth
app.dependency_overrides.clear()
response = client.get("/api/v1/scim/v2/ServiceProviderConfig")
assert response.status_code == 200
data = response.json()
assert data["patch"]["supported"] == True
assert data["filter"]["supported"] == True

View file

@ -1,130 +0,0 @@
"""
SCIM tests using actual JWT tokens for more realistic testing
"""
import json
import pytest
import jwt
import time
from unittest.mock import patch, MagicMock
from fastapi.testclient import TestClient
from datetime import datetime, timezone, timedelta
from open_webui.main import app
from open_webui.models.users import UserModel
from open_webui.models.groups import GroupModel
from open_webui.env import WEBUI_SECRET_KEY
class TestSCIMWithJWT:
"""Test SCIM endpoints with real JWT tokens"""
@pytest.fixture
def client(self):
return TestClient(app)
@pytest.fixture
def mock_admin_user(self):
"""Mock admin user"""
return UserModel(
id="admin-123",
name="Admin User",
email="admin@example.com",
role="admin",
profile_image_url="/user.png",
created_at=1234567890,
updated_at=1234567890,
last_active_at=1234567890
)
@pytest.fixture
def mock_user(self):
"""Mock regular user"""
return UserModel(
id="user-456",
name="Test User",
email="test@example.com",
role="user",
profile_image_url="/user.png",
created_at=1234567890,
updated_at=1234567890,
last_active_at=1234567890
)
def create_test_token(self, user_id: str, email: str, role: str = "admin"):
"""Create a valid JWT token for testing"""
payload = {
"id": user_id,
"email": email,
"name": "Test User",
"role": role,
"exp": int(time.time()) + 3600, # Valid for 1 hour
"iat": int(time.time()),
}
# Use the same secret key and algorithm as the application
# You might need to mock or set WEBUI_SECRET_KEY for tests
secret_key = "test-secret-key" # or use WEBUI_SECRET_KEY if available
token = jwt.encode(payload, secret_key, algorithm="HS256")
return token
@pytest.fixture
def admin_token(self):
"""Create admin token"""
return self.create_test_token("admin-123", "admin@example.com", "admin")
@pytest.fixture
def user_token(self):
"""Create regular user token"""
return self.create_test_token("user-456", "test@example.com", "user")
@pytest.fixture
def auth_headers_admin(self, admin_token):
"""Admin authorization headers"""
return {"Authorization": f"Bearer {admin_token}"}
@pytest.fixture
def auth_headers_user(self, user_token):
"""User authorization headers"""
return {"Authorization": f"Bearer {user_token}"}
# Test with proper JWT token and mocked database
@patch('open_webui.env.WEBUI_SECRET_KEY', 'test-secret-key')
@patch('open_webui.models.users.Users.get_user_by_id')
@patch('open_webui.models.users.Users.get_users')
@patch('open_webui.models.groups.Groups.get_groups_by_member_id')
def test_get_users_with_jwt(self, mock_get_groups, mock_get_users, mock_get_user_by_id,
client, auth_headers_admin, mock_admin_user, mock_user):
"""Test listing users with JWT token"""
# Mock the database calls
mock_get_user_by_id.return_value = mock_admin_user
mock_get_users.return_value = {
"users": [mock_user],
"total": 1
}
mock_get_groups.return_value = []
response = client.get("/api/v1/scim/v2/Users", headers=auth_headers_admin)
# If still getting 401, the token validation might need different mocking
if response.status_code == 401:
pytest.skip("JWT token validation requires full auth setup")
assert response.status_code == 200
data = response.json()
assert data["totalResults"] == 1
# Test non-admin access
@patch('open_webui.env.WEBUI_SECRET_KEY', 'test-secret-key')
@patch('open_webui.models.users.Users.get_user_by_id')
def test_non_admin_forbidden(self, mock_get_user_by_id, client, auth_headers_user, mock_user):
"""Test that non-admin users get 403"""
mock_get_user_by_id.return_value = mock_user
response = client.get("/api/v1/scim/v2/Users", headers=auth_headers_user)
# Should get 403 Forbidden for non-admin
if response.status_code == 401:
pytest.skip("JWT token validation requires full auth setup")
assert response.status_code == 403