fix: MCP OAuth discovery via Protected Resource metadata flow

When an MCP server's OAuth authorization server is on a different domain
(e.g., Todoist MCP at ai.todoist.net with OAuth at todoist.com), the
current implementation fails because it only looks for OAuth metadata at
the MCP server's domain.

This commit implements the full MCP Protected Resource discovery flow as
specified in the MCP authorization spec:

1. Make an unauthenticated request to the MCP endpoint
2. Parse the WWW-Authenticate header to get the resource_metadata URL
3. Fetch the Protected Resource metadata
4. Extract the authorization_servers array
5. Use those servers for OAuth metadata discovery

The fix is backwards-compatible - if Protected Resource discovery fails,
it falls back to the existing behavior.

Fixes #19794
This commit is contained in:
jamie 2025-12-07 12:53:22 +11:00
parent 6f1486ffd0
commit b766a23e36
No known key found for this signature in database

View file

@ -239,6 +239,61 @@ def is_in_blocked_groups(group_name: str, groups: list) -> bool:
return False return False
async def discover_authorization_server_from_mcp(mcp_server_url: str) -> list[str]:
"""
Discover OAuth authorization servers by following the MCP Protected Resource flow.
According to the MCP spec (https://modelcontextprotocol.io/specification/2025-03-26/basic/authorization):
1. Make an unauthenticated request to the MCP endpoint
2. Parse WWW-Authenticate header to get resource_metadata URL
3. Fetch Protected Resource metadata to get authorization_servers
Returns:
List of authorization server base URLs, or empty list if discovery fails
"""
authorization_servers = []
try:
# Step 1: Make unauthenticated request to MCP endpoint to get WWW-Authenticate header
async with aiohttp.ClientSession(trust_env=True) as session:
async with session.post(
mcp_server_url,
json={"jsonrpc": "2.0", "method": "initialize", "params": {}, "id": 1},
headers={"Content-Type": "application/json"},
ssl=AIOHTTP_CLIENT_SESSION_SSL,
) as response:
if response.status == 401:
www_auth = response.headers.get("WWW-Authenticate", "")
# Parse resource_metadata from WWW-Authenticate header
# Format: Bearer resource_metadata="https://..."
match = re.search(r'resource_metadata="([^"]+)"', www_auth)
if match:
resource_metadata_url = match.group(1)
log.debug(f"Found resource_metadata URL: {resource_metadata_url}")
# Step 2: Fetch Protected Resource metadata
async with session.get(
resource_metadata_url, ssl=AIOHTTP_CLIENT_SESSION_SSL
) as resource_response:
if resource_response.status == 200:
resource_metadata = await resource_response.json()
# Step 3: Extract authorization_servers
servers = resource_metadata.get(
"authorization_servers", []
)
if servers:
authorization_servers = servers
log.debug(
f"Discovered authorization servers: {servers}"
)
except Exception as e:
log.debug(f"MCP Protected Resource discovery failed: {e}")
return authorization_servers
def get_parsed_and_base_url(server_url) -> tuple[urllib.parse.ParseResult, str]: def get_parsed_and_base_url(server_url) -> tuple[urllib.parse.ParseResult, str]:
parsed = urllib.parse.urlparse(server_url) parsed = urllib.parse.urlparse(server_url)
base_url = f"{parsed.scheme}://{parsed.netloc}" base_url = f"{parsed.scheme}://{parsed.netloc}"
@ -301,9 +356,29 @@ async def get_oauth_client_info_with_dynamic_client_registration(
response_types=["code"], response_types=["code"],
) )
# First, try MCP Protected Resource discovery flow
# This handles cases where the OAuth server is on a different domain than the MCP server
# (e.g., Todoist MCP at ai.todoist.net, OAuth at todoist.com)
authorization_servers = await discover_authorization_server_from_mcp(
oauth_server_url
)
# Build discovery URLs - prioritize authorization servers from MCP discovery
all_discovery_urls = []
for auth_server in authorization_servers:
auth_server = auth_server.rstrip("/")
all_discovery_urls.extend(
[
f"{auth_server}/.well-known/oauth-authorization-server",
f"{auth_server}/.well-known/openid-configuration",
]
)
# Fall back to standard discovery URLs based on the MCP server URL
all_discovery_urls.extend(get_discovery_urls(oauth_server_url))
# Attempt to fetch OAuth server metadata to get registration endpoint & scopes # Attempt to fetch OAuth server metadata to get registration endpoint & scopes
discovery_urls = get_discovery_urls(oauth_server_url) for url in all_discovery_urls:
for url in discovery_urls:
async with aiohttp.ClientSession(trust_env=True) as session: async with aiohttp.ClientSession(trust_env=True) as session:
async with session.get( async with session.get(
url, ssl=AIOHTTP_CLIENT_SESSION_SSL url, ssl=AIOHTTP_CLIENT_SESSION_SSL