Source code for euring.record

from __future__ import annotations

import json
import warnings

from .converters import convert_euring_record
from .exceptions import EuringParseException
from .fields import EURING_FIELDS
from .formats import (
    FORMAT_EURING2000,
    FORMAT_EURING2000PLUS,
    FORMAT_EURING2020,
    FORMAT_JSON,
    format_display_name,
    normalize_format,
    unknown_format_error,
)
from .parsing import euring_decode_value
from .rules import record_rule_errors, requires_euring2020


[docs] class EuringRecord: """Build or decode EURING records.""" def __init__(self, format: str, *, strict: bool = True) -> None: """Initialize a record with the given EURING format.""" self.format = normalize_format(format) self.strict = strict self._fields: dict[str, dict[str, object]] = {} self.errors: dict[str, list] = {"record": [], "fields": []}
[docs] @classmethod def decode(cls, value: str, format: str | None = None) -> EuringRecord: """Decode a EURING record string into an EuringRecord.""" record_format, values_by_key, record_errors = _decode_raw_record(value, format) record = cls(record_format, strict=False) for key, raw_value in values_by_key.items(): record._set_raw_value(key, raw_value) errors = record.validate() if record_errors: errors["record"] = record_errors + errors.get("record", []) record.errors = errors return record
@property def fields(self) -> dict[str, dict[str, object]]: """Return the decoded field data.""" return self._fields
[docs] def set(self, key: str, value: object) -> EuringRecord: """Set a field value by key.""" field = _FIELD_MAP.get(key) if field is None: raise ValueError(f'Unknown field key "{key}".') self._fields[key] = { "name": field["name"], "value": "" if value is None else str(value), "order": field["order"], } return self
def _set_raw_value(self, key: str, value: object) -> None: """Set a field from decoded input without normalization.""" field = _FIELD_MAP.get(key) if field is None: return self._fields[key] = { "name": field["name"], "value": "" if value is None else value, "order": field["order"], }
[docs] def update(self, values: dict[str, object]) -> EuringRecord: """Update multiple field values.""" for key, value in values.items(): self.set(key, value) return self
[docs] def serialize(self, output_format: str | None = None) -> str: """Serialize and validate a EURING record string or JSON payload.""" if output_format is not None and output_format != FORMAT_JSON: normalized = normalize_format(output_format) if normalized != self.format: raise ValueError(f'Record format is "{self.format}". Use that format or "{FORMAT_JSON}".') errors = self.validate() if self.has_errors(errors): if self.strict or self._has_non_optional_errors(errors): raise ValueError(f"Record validation failed: {errors}") if output_format == FORMAT_JSON: return json.dumps(self.to_dict()) return self._serialize()
[docs] def export(self, output_format: str, *, force: bool = False, warn_on_loss: bool = True) -> str: """Export the record to another EURING string format.""" if output_format == FORMAT_JSON: raise ValueError("Use serialize(output_format='json') for JSON output.") normalized = normalize_format(output_format) if normalized == self.format: return self.serialize() record = self.serialize() if force and warn_on_loss: try: return convert_euring_record(record, source_format=self.format, target_format=normalized, force=False) except ValueError as exc: warnings.warn(str(exc), UserWarning) return convert_euring_record(record, source_format=self.format, target_format=normalized, force=force)
[docs] def has_errors(self, errors: object) -> bool: """Return True when a structured errors payload contains entries.""" if not isinstance(errors, dict): return bool(errors) record_errors = errors.get("record", []) field_errors = errors.get("fields", []) return bool(record_errors) or bool(field_errors)
[docs] def validate(self, record: str | None = None) -> dict[str, list]: """Validate all fields, then apply multi-field and record-level checks.""" errors = {"record": [], "fields": []} field_errors = self._validate_fields() errors["fields"].extend(field_errors) errors["fields"].extend(self._validate_record_rules()) self.errors = errors return self.errors
def _validate_fields(self) -> list[dict[str, object]]: """Validate each field value against its definition.""" errors: list[dict[str, object]] = [] fields = _fields_for_format(self.format) positions = _field_positions(fields) if self.format == FORMAT_EURING2000 else {} for index, field in enumerate(fields): key = field["key"] value = self._fields.get(key, {}).get("value", "") value = "" if value is None else value try: euring_decode_value( value, field["type"], required=field.get("required", True), length=field.get("length"), min_length=field.get("min_length"), max_length=field.get("max_length"), parser=field.get("parser"), lookup=field.get("lookup"), ) except EuringParseException as exc: payload = { "field": field["name"], "message": f"{exc}", "value": "" if value is None else f"{value}", "key": key, "index": index, } position = positions.get(key) if position: payload["position"] = position["position"] payload["length"] = position["length"] errors.append(payload) return errors def _has_non_optional_errors(self, errors: dict[str, list]) -> bool: """Return True if errors include anything beyond missing required fields.""" if errors.get("record"): return True for error in errors.get("fields", []): message = error.get("message", "") if message != 'Required field, empty value "" is not permitted.': return True return False def _validate_record_rules(self) -> list[dict[str, object]]: """Validate multi-field and record-level rules.""" values_by_key = {key: field.get("value", "") for key, field in self._fields.items()} errors: list[dict[str, object]] = [] for error in record_rule_errors(self.format, values_by_key): errors.append(_record_error_for_key(error["key"], error["message"], value=error["value"])) return errors
[docs] def to_dict(self) -> dict[str, object]: """Return a JSON-serializable representation of the record.""" return {"record": {"format": format_display_name(self.format)}, "fields": self._fields, "errors": self.errors}
@property def display_format(self) -> str: """Return the formal EURING format name.""" return format_display_name(self.format) def _serialize(self) -> str: """Serialize current field values without strict completeness checks.""" fields = _fields_for_format(self.format) values_by_key: dict[str, str] = {} for field in fields: key = field["key"] value = self._fields.get(key, {}).get("value", "") values_by_key[key] = "" if value is None else value if self.format == FORMAT_EURING2000: return _format_fixed_width(values_by_key, _fixed_width_fields()) return "|".join(values_by_key.get(field["key"], "") for field in fields)
def _fields_for_format(format: str) -> list[dict[str, object]]: """Return the field list for the target format.""" if format == FORMAT_EURING2000: return _fixed_width_fields() if format == FORMAT_EURING2000PLUS: for index, field in enumerate(EURING_FIELDS): if field.get("key") == "reference": return EURING_FIELDS[: index + 1] return EURING_FIELDS def _fixed_width_fields() -> list[dict[str, object]]: """Return field definitions for the EURING2000 fixed-width layout.""" fields: list[dict[str, object]] = [] start = 0 for field in EURING_FIELDS: if start >= 94: break length = field.get("length", field.get("max_length")) if not length: break fields.append({**field, "length": length}) start += length return fields def _format_fixed_width(values_by_key: dict[str, str], fields: list[dict[str, object]]) -> str: """Serialize values into a fixed-width record.""" parts: list[str] = [] for field in fields: key = field["key"] length = field["length"] value = values_by_key.get(key, "") if not value: parts.append("-" * length) continue if len(value) < length: value = value.ljust(length, "-") parts.append(value[:length]) return "".join(parts) def _normalize_decode_format(format: str | None) -> str | None: """Normalize a user-provided format string or raise.""" if not format: return None try: return normalize_format(format) except ValueError: raise EuringParseException(unknown_format_error(format)) def _decode_raw_record(value: object, format: str | None) -> tuple[str, dict[str, str], list[dict[str, str]]]: """Decode raw field values from an encoded EURING record string.""" normalized = _normalize_decode_format(format) record_errors: list[dict[str, str]] = [] values_by_key: dict[str, str] = {} if not isinstance(value, str): record_errors.append({"message": f'Value "{value}" cannot be split with pipe character.'}) return normalized or FORMAT_EURING2000PLUS, values_by_key, record_errors fields = value.split("|") if len(fields) <= 1: if normalized and normalized != FORMAT_EURING2000: record_errors.append( {"message": f'Format "{format_display_name(normalized)}" conflicts with fixed-width EURING2000 data.'} ) start = 0 for field in _fixed_width_fields(): length = field["length"] end = start + length values_by_key[field["key"]] = value[start:end] start = end remainder = value[start:] if remainder.strip(): record_errors.append({"message": f'Value "{value}" invalid EURING2000 code beyond position {start}.'}) current_format = FORMAT_EURING2000 else: if normalized == FORMAT_EURING2000: record_errors.append( {"message": f'Format "{format_display_name(normalized)}" conflicts with pipe-delimited data.'} ) current_format = normalized or FORMAT_EURING2000PLUS for index, raw_value in enumerate(fields): if index >= len(EURING_FIELDS): break values_by_key[EURING_FIELDS[index]["key"]] = raw_value if normalized is None and current_format in {FORMAT_EURING2000PLUS, FORMAT_EURING2020}: if requires_euring2020(values_by_key): current_format = FORMAT_EURING2020 return current_format, values_by_key, record_errors _FIELD_MAP = {field["key"]: {**field, "order": index} for index, field in enumerate(EURING_FIELDS)} def _field_positions(fields: list[dict[str, object]]) -> dict[str, dict[str, int]]: """Return position metadata for fixed-width fields.""" positions: dict[str, dict[str, int]] = {} start = 1 for field in fields: length = field.get("length") if not length: continue positions[field["key"]] = {"position": start, "length": length} start += length return positions def _record_error_for_key(key: str, message: str, *, value: str) -> dict[str, object]: """Build a field error payload for a record-level rule.""" field = _FIELD_MAP.get(key, {}) return { "field": field.get("name", key), "message": message, "value": "" if value is None else f"{value}", "key": key, "index": field.get("order"), }