# Source code for pystow.utils.pydantic_utils

"""Utilities for working with Pydantic."""

from __future__ import annotations

import logging
import typing
from collections.abc import Callable, Generator, Iterable, Mapping
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, TextIO, TypeAlias

from tqdm import tqdm

from .safe_open import (
    _open_read_text,
    _open_write_text,
    safe_open_dict_reader,
    safe_open_json,
    safe_open_yaml,
    write_json,
    write_yaml,
)

if TYPE_CHECKING:
    import pydantic

# Names exported by ``from ... import *``; every entry is defined in this module.
__all__ = [
    "ModelValidateFailureAction",
    "iter_pydantic_jsonl",
    "iter_pydantic_tsv",
    "model_dump_yaml",
    "read_pydantic_json",
    "read_pydantic_jsonl",
    "read_pydantic_tsv",
    "read_pydantic_yaml",
    "stream_write_pydantic_jsonl",
    "write_pydantic_json",
    "write_pydantic_jsonl",
    "write_pydantic_yaml",
]

# Module-level logger; the iterators below report skipped records at DEBUG level.
logger = logging.getLogger(__name__)
# Type variable for "some subclass of pydantic.BaseModel". The bound is given as
# a string because pydantic is only imported under TYPE_CHECKING in this module.
BaseModelVar = typing.TypeVar("BaseModelVar", bound="pydantic.BaseModel")

#: The action to take on model validation failure: ``"raise"`` re-raises the
#: validation error, ``"skip"`` logs the failure and moves on to the next record.
ModelValidateFailureAction: TypeAlias = Literal["raise", "skip"]


def iter_pydantic_jsonl(
    file: str | Path | TextIO,
    model_cls: type[BaseModelVar],
    *,
    progress: bool = False,
    failure_action: ModelValidateFailureAction = "skip",
    encoding: str | None = None,
    newline: str | None = None,
    tqdm_kwargs: Mapping[str, Any] | None = None,
) -> Generator[BaseModelVar, None, None]:
    """Iterate over models read from a JSONL file, one model per line.

    :param file: The path or text handle to read, passed to ``_open_read_text``.
    :param model_cls: The Pydantic model class used to validate each line.
    :param progress: If true, show a tqdm progress bar while reading.
    :param failure_action: What to do when a line fails validation: ``"raise"``
        re-raises the :class:`pydantic.ValidationError`, anything else logs the
        line number at DEBUG level and skips the line.
    :param encoding: Forwarded to ``_open_read_text``.
    :param newline: Forwarded to ``_open_read_text``.
    :param tqdm_kwargs: Extra keyword arguments for tqdm; these override the
        defaults set below.
    :yields: One validated ``model_cls`` instance per line that parses.
    :raises pydantic.ValidationError: If a line fails validation and
        ``failure_action`` is ``"raise"``.
    """
    # Imported lazily: at module level pydantic is only imported for type checking.
    import pydantic

    # Default progress bar settings; caller-supplied entries take precedence.
    _tqdm_kwargs: dict[str, Any] = {
        "desc": "Reading mappings",
        "leave": False,
        "unit": "mapping",
        "unit_scale": True,
    }
    if tqdm_kwargs is not None:
        _tqdm_kwargs.update(tqdm_kwargs)

    # Use a separate name for the opened handle so the ``file`` argument is not shadowed.
    with _open_read_text(file, encoding=encoding, newline=newline) as lines:
        # Count from 1 so the logged number matches the line number in the file.
        numbered_lines = enumerate(tqdm(lines, disable=not progress, **_tqdm_kwargs), start=1)
        for line_number, line in numbered_lines:
            try:
                yv = model_cls.model_validate_json(line.strip())
            except pydantic.ValidationError:
                if failure_action == "raise":
                    raise
                logger.debug("[line:%d] failed to parse JSON", line_number)
                continue
            else:
                yield yv
def read_pydantic_jsonl(
    file: str | Path | TextIO, model_cls: type[BaseModelVar], **kwargs: Any
) -> list[BaseModelVar]:
    """Read all models from a JSONL file into a list.

    :param file: The path or text handle to read.
    :param model_cls: The Pydantic model class used to validate each line.
    :param kwargs: Keyword arguments forwarded to :func:`iter_pydantic_jsonl`.
    :returns: The models produced by :func:`iter_pydantic_jsonl`, in order.
    """
    models = iter_pydantic_jsonl(file, model_cls, **kwargs)
    return [*models]
def write_pydantic_jsonl(
    models: Iterable[pydantic.BaseModel], file: str | Path | TextIO, **kwargs: Any
) -> None:
    """Write models to a file as JSONL, one JSON document per line.

    :param models: The models to serialize.
    :param file: The path or text handle to write, passed to ``_open_write_text``.
    :param kwargs: Keyword arguments forwarded to ``model_dump_json``. Unless
        given explicitly, ``exclude_none``, ``exclude_unset`` and
        ``exclude_defaults`` are all set to ``True``.
    """
    # Defaults come first so that anything the caller passed wins.
    dump_kwargs = {
        "exclude_none": True,
        "exclude_unset": True,
        "exclude_defaults": True,
        **kwargs,
    }
    with _open_write_text(file) as handle:
        for model in models:
            serialized = model.model_dump_json(**dump_kwargs)
            handle.write(serialized + "\n")
def stream_write_pydantic_jsonl(
    models: Iterable[BaseModelVar], file: str | Path | TextIO, **kwargs: Any
) -> Generator[BaseModelVar, None, None]:
    """Write models to a file as JSONL, yielding each model after it is written.

    This is a generator, so nothing is written until it is iterated.

    :param models: The models to serialize and pass through.
    :param file: The path or text handle to write, passed to ``_open_write_text``.
    :param kwargs: Keyword arguments forwarded to ``model_dump_json``. Unless
        given explicitly, ``exclude_none``, ``exclude_unset`` and
        ``exclude_defaults`` are all set to ``True``.
    :yields: Each input model, right after its line has been written.
    """
    # Defaults come first so that anything the caller passed wins.
    dump_kwargs = {
        "exclude_none": True,
        "exclude_unset": True,
        "exclude_defaults": True,
        **kwargs,
    }
    with _open_write_text(file) as handle:
        for model in models:
            serialized = model.model_dump_json(**dump_kwargs)
            handle.write(serialized + "\n")
            yield model
def read_pydantic_tsv(
    path: str | Path | TextIO,
    model: type[BaseModelVar],
    *,
    process: Callable[[dict[str, Any]], dict[str, Any]] | None = None,
    failure_action: ModelValidateFailureAction = "skip",
) -> list[BaseModelVar]:
    """Read all models from a TSV file into a list.

    :param path: The path or text handle to read.
    :param model: The Pydantic model class used to validate each record.
    :param process: Optional function applied to each record before validation.
    :param failure_action: What to do when a record fails validation; see
        :func:`iter_pydantic_tsv`.
    :returns: The models produced by :func:`iter_pydantic_tsv`, in order.
    """
    rows = iter_pydantic_tsv(path, model, process=process, failure_action=failure_action)
    return [*rows]
def iter_pydantic_tsv(
    path: str | Path | TextIO,
    model_cls: type[BaseModelVar],
    *,
    process: Callable[[dict[str, Any]], dict[str, Any]] | None = None,
    failure_action: ModelValidateFailureAction = "skip",
) -> Generator[BaseModelVar, None, None]:
    """Read models from a TSV file, iteratively.

    :param path: The path or text handle to read, passed to
        ``safe_open_dict_reader``.
    :param model_cls: The Pydantic model class used to validate each record.
    :param process: Optional function applied to each record before validation.
    :param failure_action: What to do when a record fails validation: ``"raise"``
        re-raises the :class:`pydantic.ValidationError`, anything else logs the
        record number at DEBUG level and skips the record.
    :yields: One validated ``model_cls`` instance per record that parses.
    :raises pydantic.ValidationError: If a record fails validation and
        ``failure_action`` is ``"raise"``.
    """
    # pydantic is only imported under TYPE_CHECKING at module level, so it has
    # to be imported here for the ``except`` clause to resolve at runtime.
    import pydantic

    with safe_open_dict_reader(path) as reader:
        records: Iterable[dict[str, Any]]
        if process is None:
            records = iter(reader)
        else:
            records = (process(record) for record in reader)

        # The number is the 1-based position of the record among those yielded
        # by the reader. It is what ``%d`` in the log message needs; passing the
        # record dict itself is a logging format error.
        for record_number, record in enumerate(records, start=1):
            try:
                yv = model_cls.model_validate(record)
            except pydantic.ValidationError:
                if failure_action == "raise":
                    raise
                logger.debug("[line:%d] failed to parse row", record_number)
                continue
            else:
                yield yv
def read_pydantic_json(
    path_or_url: str | Path | TextIO,
    model_cls: type[BaseModelVar],
    *,
    encoding: str | None = None,
    newline: str | None = None,
) -> BaseModelVar:
    """Load a JSON file and validate its content as a single model.

    :param path_or_url: The location or text handle to read, passed to
        ``safe_open_json``.
    :param model_cls: The Pydantic model class used to validate the content.
    :param encoding: Forwarded to ``safe_open_json``.
    :param newline: Forwarded to ``safe_open_json``.
    :returns: The validated ``model_cls`` instance.
    """
    raw = safe_open_json(path_or_url, encoding=encoding, newline=newline)
    return model_cls.model_validate(raw)
def read_pydantic_yaml(
    path_or_url: str | Path | TextIO,
    model_cls: type[BaseModelVar],
    *,
    encoding: str | None = None,
    newline: str | None = None,
) -> BaseModelVar:
    """Load a YAML file and validate its content as a single model.

    :param path_or_url: The location or text handle to read, passed to
        ``safe_open_yaml``.
    :param model_cls: The Pydantic model class used to validate the content.
    :param encoding: Forwarded to ``safe_open_yaml``.
    :param newline: Forwarded to ``safe_open_yaml``.
    :returns: The validated ``model_cls`` instance.
    """
    raw = safe_open_yaml(path_or_url, encoding=encoding, newline=newline)
    return model_cls.model_validate(raw)
def model_dump_yaml(
    model: pydantic.BaseModel,
    *,
    exclude_none: bool = False,
    exclude_unset: bool = False,
    exclude: set[str] | None = None,
) -> str:
    """Serialize a model to a YAML string.

    The model is first dumped in ``"json"`` mode and the result is then
    emitted with ``yaml.safe_dump``, keeping non-ASCII characters unescaped.

    :param model: The model to serialize.
    :param exclude_none: Forwarded to ``model_dump``.
    :param exclude_unset: Forwarded to ``model_dump``.
    :param exclude: Forwarded to ``model_dump``.
    :returns: The YAML text.
    """
    import yaml

    dump_options: dict[str, Any] = {
        "mode": "json",
        "exclude_none": exclude_none,
        "exclude_unset": exclude_unset,
        "exclude": exclude,
    }
    dumped = model.model_dump(**dump_options)
    return yaml.safe_dump(dumped, allow_unicode=True)
def write_pydantic_yaml(
    model: pydantic.BaseModel,
    path: str | Path | TextIO,
    *,
    exclude_none: bool = False,
    exclude_unset: bool = False,
    encoding: str | None = None,
    newline: str | None = None,
) -> None:
    """Write a model to a YAML file.

    :param model: The model to serialize.
    :param path: The path or text handle to write, passed to ``write_yaml``.
    :param exclude_none: Forwarded to :func:`model_dump_yaml`.
    :param exclude_unset: Forwarded to :func:`model_dump_yaml`.
    :param encoding: Forwarded to ``write_yaml``.
    :param newline: Forwarded to ``write_yaml``.
    """
    yaml_text = model_dump_yaml(
        model,
        exclude_none=exclude_none,
        exclude_unset=exclude_unset,
    )
    # NOTE(review): ``yaml_text`` is already a YAML string. If ``write_yaml``
    # serializes its first argument (as ``write_json`` is given a plain object
    # in ``write_pydantic_json``), the file would hold a YAML-encoded string
    # instead of a mapping -- confirm against ``write_yaml``.
    write_yaml(yaml_text, path, encoding=encoding, newline=newline)
def write_pydantic_json(
    model: pydantic.BaseModel,
    path: str | Path | TextIO,
    *,
    exclude_none: bool = False,
    exclude_unset: bool = False,
    encoding: str | None = None,
    newline: str | None = None,
) -> None:
    """Write a model to a JSON file.

    The model is dumped in ``"json"`` mode and the resulting object is handed
    to ``write_json``.

    :param model: The model to serialize.
    :param path: The path or text handle to write, passed to ``write_json``.
    :param exclude_none: Forwarded to ``model_dump``.
    :param exclude_unset: Forwarded to ``model_dump``.
    :param encoding: Forwarded to ``write_json``.
    :param newline: Forwarded to ``write_json``.
    """
    payload = model.model_dump(
        mode="json",
        exclude_none=exclude_none,
        exclude_unset=exclude_unset,
    )
    write_json(payload, path, encoding=encoding, newline=newline)