Skip to content

Jsonschema-path upgrade #398

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions openapi_spec_validator/readers.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
import sys
from collections.abc import Hashable
from os import path
from pathlib import Path
from typing import Any
from typing import Mapping
from typing import Tuple

from jsonschema_path.handlers import all_urls_handler
from jsonschema_path.handlers import file_handler
from jsonschema_path.typing import Schema


def read_from_stdin(filename: str) -> Tuple[Mapping[Hashable, Any], str]:
def read_from_stdin(filename: str) -> Tuple[Schema, str]:
return file_handler(sys.stdin), "" # type: ignore


def read_from_filename(filename: str) -> Tuple[Mapping[Hashable, Any], str]:
def read_from_filename(filename: str) -> Tuple[Schema, str]:
if not path.isfile(filename):
raise OSError(f"No such file: {filename}")

Expand Down
8 changes: 3 additions & 5 deletions openapi_spec_validator/schemas/utils.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
"""OpenAIP spec validator schemas utils module."""
from collections.abc import Hashable
from importlib.resources import as_file
from importlib.resources import files
from os import path
from typing import Any
from typing import Mapping
from typing import Tuple

from jsonschema_path.readers import FilePathReader
from jsonschema_path.typing import Schema


def get_schema(version: str) -> Tuple[Mapping[Hashable, Any], str]:
def get_schema(version: str) -> Tuple[Schema, str]:
schema_path = f"resources/schemas/v{version}/schema.json"
ref = files("openapi_spec_validator") / schema_path
with as_file(ref) as resource_path:
schema_path_full = path.join(path.dirname(__file__), resource_path)
return FilePathReader(schema_path_full).read()


def get_schema_content(version: str) -> Mapping[Hashable, Any]:
def get_schema_content(version: str) -> Schema:
content, _ = get_schema(version)
return content
58 changes: 35 additions & 23 deletions openapi_spec_validator/validation/keywords.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import string
from typing import TYPE_CHECKING
from typing import Any
from typing import Sequence
from typing import Iterator
from typing import List
from typing import Optional
Expand Down Expand Up @@ -78,7 +79,8 @@ def _collect_properties(self, schema: SchemaPath) -> set[str]:
props: set[str] = set()

if "properties" in schema:
props.update((schema / "properties").keys())
schema_props = (schema / "properties").keys()
props.update(cast(Sequence[str], schema_props))

for kw in ("allOf", "anyOf", "oneOf"):
if kw in schema:
Expand All @@ -96,11 +98,12 @@ def _collect_properties(self, schema: SchemaPath) -> set[str]:
def __call__(
self, schema: SchemaPath, require_properties: bool = True
) -> Iterator[ValidationError]:
if not hasattr(schema.content(), "__getitem__"):
schema_value = schema.read_value()
if not hasattr(schema_value, "__getitem__"):
return

assert self.schema_ids_registry is not None
schema_id = id(schema.content())
schema_id = id(schema_value)
if schema_id in self.schema_ids_registry:
return
self.schema_ids_registry.append(schema_id)
Expand Down Expand Up @@ -151,8 +154,8 @@ def __call__(
require_properties=False,
)

required = schema.getkey("required", [])
properties = schema.get("properties", {}).keys()
required = "required" in schema and (schema / "required").read_value() or []
properties = "properties" in schema and (schema / "properties").keys() or []
if "allOf" in schema:
extra_properties = list(
set(required) - set(properties) - set(nested_properties)
Expand All @@ -166,10 +169,12 @@ def __call__(
)

if "default" in schema:
default = schema["default"]
nullable = schema.get("nullable", False)
if default is not None or nullable is not True:
yield from self.default_validator(schema, default)
default_value = (schema / "default").read_value()
nullable_value = False
if "nullable" in schema:
nullable_value = (schema / "nullable").read_value()
if default_value is not None or nullable_value is not True:
yield from self.default_validator(schema, default_value)


class SchemasValidator(KeywordValidator):
Expand Down Expand Up @@ -203,9 +208,9 @@ def __call__(self, parameter: SchemaPath) -> Iterator[ValidationError]:

if "default" in parameter:
# only possible in swagger 2.0
default = parameter.getkey("default")
if default is not None:
yield from self.default_validator(parameter, default)
if "default" in parameter:
default_value = (parameter / "default").read_value()
yield from self.default_validator(parameter, default_value)


class ParametersValidator(KeywordValidator):
Expand Down Expand Up @@ -246,6 +251,7 @@ def media_type_validator(self) -> MediaTypeValidator:

def __call__(self, content: SchemaPath) -> Iterator[ValidationError]:
for mimetype, media_type in content.items():
assert isinstance(mimetype, str)
yield from self.media_type_validator(mimetype, media_type)


Expand Down Expand Up @@ -291,6 +297,7 @@ def response_validator(self) -> ResponseValidator:

def __call__(self, responses: SchemaPath) -> Iterator[ValidationError]:
for response_code, response in responses.items():
assert isinstance(response_code, str)
yield from self.response_validator(response_code, response)


Expand All @@ -317,15 +324,17 @@ def __call__(
) -> Iterator[ValidationError]:
assert self.operation_ids_registry is not None

operation_id = operation.getkey("operationId")
if (
operation_id is not None
and operation_id in self.operation_ids_registry
):
yield DuplicateOperationIDError(
f"Operation ID '{operation_id}' for '{name}' in '{url}' is not unique"
)
self.operation_ids_registry.append(operation_id)
if "operationId" in operation:
operation_id_value = (operation / "operationId").read_value()
if (
operation_id_value is not None
and operation_id_value in self.operation_ids_registry
):
yield DuplicateOperationIDError(
f"Operation ID '{operation_id_value}' for "
f"'{name}' in '{url}' is not unique"
)
self.operation_ids_registry.append(operation_id_value)

if "responses" in operation:
responses = operation / "responses"
Expand Down Expand Up @@ -392,6 +401,7 @@ def __call__(
yield from self.parameters_validator(parameters)

for field_name, operation in path_item.items():
assert isinstance(field_name, str)
if field_name not in self.OPERATIONS:
continue

Expand All @@ -407,6 +417,7 @@ def path_validator(self) -> PathValidator:

def __call__(self, paths: SchemaPath) -> Iterator[ValidationError]:
for url, path_item in paths.items():
assert isinstance(url, str)
yield from self.path_validator(url, path_item)


Expand All @@ -416,8 +427,9 @@ def schemas_validator(self) -> SchemasValidator:
return cast(SchemasValidator, self.registry["schemas"])

def __call__(self, components: SchemaPath) -> Iterator[ValidationError]:
schemas = components.get("schemas", {})
yield from self.schemas_validator(schemas)
if "schemas" in components:
schemas = components / "schemas"
yield from self.schemas_validator(schemas)


class RootValidator(KeywordValidator):
Expand Down
11 changes: 5 additions & 6 deletions openapi_spec_validator/validation/protocols.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,29 @@
from typing import Any
from typing import Hashable
from typing import Iterator
from typing import Mapping
from typing import Optional
from typing import Protocol
from typing import runtime_checkable

from jsonschema_path.typing import Schema

from openapi_spec_validator.validation.exceptions import OpenAPIValidationError


@runtime_checkable
class SupportsValidation(Protocol):
def is_valid(self, instance: Mapping[Hashable, Any]) -> bool:
def is_valid(self, instance: Schema) -> bool:
...

def iter_errors(
self,
instance: Mapping[Hashable, Any],
instance: Schema,
base_uri: str = "",
spec_url: Optional[str] = None,
) -> Iterator[OpenAPIValidationError]:
...

def validate(
self,
instance: Mapping[Hashable, Any],
instance: Schema,
base_uri: str = "",
spec_url: Optional[str] = None,
) -> None:
Expand Down
8 changes: 4 additions & 4 deletions openapi_spec_validator/validation/proxies.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ class DetectValidatorProxy:
def __init__(self, choices: Mapping[Tuple[str, str], SpecValidatorProxy]):
self.choices = choices

def detect(self, instance: Mapping[Hashable, Any]) -> SpecValidatorProxy:
def detect(self, instance: Schema) -> SpecValidatorProxy:
for (key, value), validator in self.choices.items():
if key in instance and instance[key].startswith(value):
return validator
raise ValidatorDetectError("Spec schema version not detected")

def validate(
self,
instance: Mapping[Hashable, Any],
instance: Schema,
base_uri: str = "",
spec_url: Optional[str] = None,
) -> None:
Expand All @@ -80,14 +80,14 @@ def validate(
):
raise err

def is_valid(self, instance: Mapping[Hashable, Any]) -> bool:
def is_valid(self, instance: Schema) -> bool:
validator = self.detect(instance)
error = next(validator.iter_errors(instance), None)
return error is None

def iter_errors(
self,
instance: Mapping[Hashable, Any],
instance: Schema,
base_uri: str = "",
spec_url: Optional[str] = None,
) -> Iterator[OpenAPIValidationError]:
Expand Down
2 changes: 1 addition & 1 deletion openapi_spec_validator/validation/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(

if isinstance(schema, SchemaPath):
self.schema_path = schema
self.schema = schema.contents()
self.schema = schema.read_value()
else:
self.schema = schema
self.schema_path = SchemaPath.from_dict(
Expand Down
Loading
Loading