Source code for euring.record

from __future__ import annotations

import json
import warnings
from dataclasses import replace

from .coordinates import _lat_to_euring_coordinate, _lng_to_euring_coordinate
from .exceptions import EuringConstraintException, EuringException
from .field_schema import EuringField, coerce_field
from .fields import EURING2000_FIELDS, EURING2000PLUS_FIELDS, EURING2020_FIELDS
from .formats import (
    FORMAT_EURING2000,
    FORMAT_EURING2000PLUS,
    FORMAT_EURING2020,
    FORMAT_JSON,
    format_display_name,
    normalize_format,
    unknown_format_error_message,
)
from .rules import record_rule_errors, requires_euring2020
from .utils import is_all_hyphens, is_empty


[docs] class EuringRecord: """Build or decode EURING records.""" def __init__(self, format: str, *, strict: bool = True) -> None: """Initialize a record with the given EURING format.""" self.format = normalize_format(format) self.strict = strict self._fields: dict[str, dict[str, object]] = {} self.errors: dict[str, list] = {"record": [], "fields": []}
[docs] @classmethod def decode(cls, value: str, format: str | None = None) -> EuringRecord: """Decode a EURING record string into an EuringRecord.""" record_format, values_by_key, record_errors = _decode_raw_record(value, format) record = cls(record_format, strict=False) for key, raw_value in values_by_key.items(): record._set_raw_value(key, raw_value) errors = record.validate() if record_errors: errors["record"] = record_errors + errors.get("record", []) record.errors = errors return record
@property def fields(self) -> dict[str, dict[str, object]]: """Return the decoded field data.""" return self._fields
[docs] def set(self, key: str, value: object) -> EuringRecord: """Set a field value by key.""" field = _FIELD_MAP.get(key) if field is None: raise ValueError(f'Unknown field key "{key}".') # Setting a typed value should clear any previously captured raw EURING text. self._fields[key] = {"name": field["name"], "value": value, "order": field["order"]} return self
def _set_raw_value(self, key: str, value: object) -> None: """Set a field from decoded input without normalization.""" field = _FIELD_MAP.get(key) if field is None: return self._fields[key] = { "name": field["name"], "raw_value": "" if value is None else f"{value}", "value": "" if value is None else f"{value}", "order": field["order"], }
[docs] def update(self, values: dict[str, object]) -> EuringRecord: """Update multiple field values.""" for key, value in values.items(): self.set(key, value) return self
[docs] def serialize(self, output_format: str | None = None) -> str: """Serialize and validate a EURING record string or JSON payload.""" if output_format is not None and output_format != FORMAT_JSON: normalized = normalize_format(output_format) if normalized != self.format: raise ValueError(f'Record format is "{self.format}". Use that format or "{FORMAT_JSON}".') errors = self.validate() if self.has_errors(errors): if self.strict or self._has_non_optional_errors(errors): raise ValueError(f"Record validation failed: {errors}") if output_format == FORMAT_JSON: return json.dumps(self.to_dict()) return self._serialize()
[docs] def export(self, output_format: str, *, force: bool = False, warn_on_loss: bool = True) -> str: """Export the record to another EURING string format.""" if output_format == FORMAT_JSON: raise ValueError("Use serialize(output_format='json') for JSON output.") normalized = normalize_format(output_format) if normalized == self.format: return self.serialize() record = self.serialize() if force and warn_on_loss: try: return _convert_record_string( record, source_format=self.format, target_format=normalized, force=False, ) except ValueError as exc: warnings.warn(str(exc), UserWarning) return _convert_record_string(record, source_format=self.format, target_format=normalized, force=force)
[docs] def has_errors(self, errors: object) -> bool: """Return True when a structured errors payload contains entries.""" if not isinstance(errors, dict): return bool(errors) record_errors = errors.get("record", []) field_errors = errors.get("fields", []) return bool(record_errors) or bool(field_errors)
[docs] def validate(self, record: str | None = None) -> dict[str, list]: """Validate all fields, then apply multi-field and record-level checks.""" errors = {"record": [], "fields": []} field_errors = self._validate_fields() errors["fields"].extend(field_errors) errors["fields"].extend(self._validate_record_rules()) self.errors = errors return self.errors
def _validate_fields(self) -> list[dict[str, object]]: """Validate each field value against its definition.""" errors: list[dict[str, object]] = [] fields = _fields_for_format(self.format) positions = _field_positions(fields) if self.format == FORMAT_EURING2000 else {} needs_geo_dots = False if self.format == FORMAT_EURING2020: lat_value = self._fields.get("latitude", {}).get("value") lng_value = self._fields.get("longitude", {}).get("value") needs_geo_dots = not is_empty(lat_value) or not is_empty(lng_value) for index, field in enumerate(fields): key = field["key"] field_state = self._fields.get(key, {}) value = field_state.get("value", "") had_empty_value = is_empty(value) try: field_obj = field if isinstance(field, EuringField) else coerce_field(field) if self.format == FORMAT_EURING2000 and field_obj.get("variable_length"): # EURING2000 does not support variable-length encoding. field_obj = replace(field_obj, variable_length=False) encoded_value = _serialize_field_value(field, value, self.format) raw_value = encoded_value if key == "date" and had_empty_value and is_all_hyphens(raw_value): # Treat placeholder dashes for missing required dates as empty so # non-strict mode only reports a missing-required-field error. raw_value = "" if key == "geographical_coordinates" and had_empty_value and needs_geo_dots: raw_value = "." * 15 encoded_value = raw_value parsed_value = field_obj.parse(raw_value) if had_empty_value and raw_value: parsed_value = None description_value = parsed_value if field_obj.get("lookup") is not None and raw_value != "" and parsed_value is not None: if field_obj.get("parser") is None: description_value = raw_value elif field_obj.get("value_type") == "date": # Date lookups operate on the encoded ddmmyyyy string. description_value = raw_value description = field_obj.describe(description_value) if key in self._fields: self._fields[key]["value"] = parsed_value self._fields[key]["encoded_value"] = encoded_value if field_obj.get("parser") is not None: self._fields[key]["parsed_value"] = parsed_value if description is not None: self._fields[key]["description"] = description except EuringException as exc: payload = { "field": field["name"], "message": f"{exc}", "value": "" if value is None else f"{value}", "key": key, "index": index, } position = positions.get(key) if position: payload["position"] = position["position"] payload["length"] = position["length"] errors.append(payload) return errors def _has_non_optional_errors(self, errors: dict[str, list]) -> bool: """Return True if errors include anything beyond missing required fields.""" if errors.get("record"): return True for error in errors.get("fields", []): message = error.get("message", "") if message != 'Required field, empty value "" is not permitted.': return True return False def _validate_record_rules(self) -> list[dict[str, object]]: """Validate multi-field and record-level rules.""" values_by_key: dict[str, str] = {} for field in _fields_for_format(self.format): key = field["key"] field_state = self._fields.get(key, {}) source_raw = field_state.get("raw_value") if source_raw is not None: values_by_key[key] = source_raw continue value = field_state.get("value", "") try: values_by_key[key] = _serialize_field_value(field, value, self.format) except EuringException: values_by_key[key] = "" if self.format == FORMAT_EURING2020: lat_value = values_by_key.get("latitude", "") lng_value = values_by_key.get("longitude", "") if (lat_value or lng_value) and not values_by_key.get("geographical_coordinates"): values_by_key["geographical_coordinates"] = "." * 15 errors: list[dict[str, object]] = [] for error in record_rule_errors(self.format, values_by_key): errors.append(_record_error_for_key(error["key"], error["message"], value=error["value"])) return errors
[docs] def to_dict(self) -> dict[str, object]: """Return a JSON-serializable representation of the record.""" return {"record": {"format": format_display_name(self.format)}, "fields": self._fields, "errors": self.errors}
@property def display_format(self) -> str: """Return the formal EURING format name.""" return format_display_name(self.format) def _serialize(self) -> str: """Serialize current field values without strict completeness checks.""" fields = _fields_for_format(self.format) values_by_key: dict[str, str] = {} geo_placeholder = None if self.format == FORMAT_EURING2020: lat_value = self._fields.get("latitude", {}).get("value") lng_value = self._fields.get("longitude", {}).get("value") if not is_empty(lat_value) or not is_empty(lng_value): geo_placeholder = "." * 15 for field in fields: key = field["key"] value = self._fields.get(key, {}).get("value") if key == "geographical_coordinates": if is_empty(value) and geo_placeholder: values_by_key[key] = geo_placeholder continue values_by_key[key] = _serialize_field_value(field, value, self.format) if self.format == FORMAT_EURING2000: return _format_fixed_width(values_by_key, EURING2000_FIELDS) return "|".join(values_by_key.get(field["key"], "") for field in fields)
def _fields_for_format(format: str) -> list[dict[str, object]]: """Return the field list for the target format.""" if format == FORMAT_EURING2000: return EURING2000_FIELDS if format == FORMAT_EURING2000PLUS: return EURING2000PLUS_FIELDS if format == FORMAT_EURING2020: return EURING2020_FIELDS raise EuringException(f"Unknown EuringRecord format: {format}.") def _format_fixed_width(values_by_key: dict[str, str], fields: list[dict[str, object]]) -> str: """Serialize values into a fixed-width record.""" parts: list[str] = [] for field in fields: key = field["key"] length = field["length"] value = values_by_key.get(key, "") if not value: parts.append("-" * length) continue if len(value) < length: value = value.ljust(length, "-") parts.append(value[:length]) return "".join(parts) def _serialize_field_value(field: dict[str, object], value: object, format: str) -> str: """Encode a typed field value into a EURING raw string.""" field_obj = coerce_field(field) return field_obj.encode_for_format(value, format=format) def _convert_record_string( value: str, *, source_format: str | None = None, target_format: str, force: bool = False, ) -> str: """Convert EURING records between euring2000, euring2000plus, and euring2020.""" normalized_target, values_by_key, target_fields = _convert_record_data( value, source_format=source_format, target_format=target_format, force=force ) if normalized_target == FORMAT_EURING2000: return _format_fixed_width(values_by_key, target_fields) output_values = [values_by_key.get(field["key"], "") for field in target_fields] return "|".join(output_values) def _convert_record_data( value: str, *, source_format: str | None = None, target_format: str, force: bool = False, ) -> tuple[str, dict[str, str], list[dict[str, object]]]: """Convert and return the normalized target format plus field values by key.""" normalized_target = _normalize_target_format(target_format) normalized_source = _normalize_source_format(source_format, value) if normalized_source == FORMAT_EURING2000: fields = _split_fixed_width(value) source_fields = EURING2000_FIELDS else: fields = _split_pipe_delimited(value) source_fields = _fields_for_format(normalized_source) if len(fields) > len(source_fields) and any(part.strip() for part in fields[len(source_fields) :]): raise ValueError( "Input has more fields than expected for the declared format. " f"Use {FORMAT_EURING2020} when 2020-only fields are present." ) values_by_key = _map_fields_to_values(source_fields, fields) _require_force_on_loss(values_by_key, normalized_source, normalized_target, force) _apply_coordinate_downgrade(values_by_key, normalized_source, normalized_target, force) target_fields = _fields_for_format(normalized_target) return normalized_target, values_by_key, target_fields def _split_fixed_width(value: str) -> list[str]: """Split a fixed-width EURING2000 record into field values.""" if "|" in value: raise ValueError(f"Input appears to be pipe-delimited, not fixed-width {FORMAT_EURING2000}.") if len(value) < 94: raise ValueError(f"{FORMAT_EURING2000} record must be 94 characters long.") if len(value) > 94 and value[94:].strip(): raise ValueError(f"{FORMAT_EURING2000} record contains extra data beyond position 94.") fields: list[str] = [] start = 0 for field in EURING2000_FIELDS: length = field["length"] end = start + length chunk = value[start:end] if len(chunk) < length: chunk = chunk.ljust(length) fields.append(chunk) start = end return fields def _split_pipe_delimited(value: str) -> list[str]: """Split a pipe-delimited record into field values.""" return value.split("|") def _map_fields_to_values(fields: list[dict[str, object]], values: list[str]) -> dict[str, str]: """Map field definitions to values by key.""" mapping: dict[str, str] = {} for index, field in enumerate(fields): key = field["key"] mapping[key] = values[index] if index < len(values) else "" return mapping def _require_force_on_loss(values_by_key: dict[str, str], source_format: str, target_format: str, force: bool) -> None: """Raise when conversion would lose data without force.""" reasons: list[str] = [] if target_format in {FORMAT_EURING2000, FORMAT_EURING2000PLUS}: for key in ("latitude", "longitude", "current_place_code", "more_other_marks"): if values_by_key.get(key): reasons.append(f"drop {key}") accuracy = values_by_key.get("accuracy_of_coordinates", "") if accuracy.isalpha(): reasons.append("alphabetic coordinate accuracy") if target_format == FORMAT_EURING2000: fixed_keys = {field["key"] for field in EURING2000_FIELDS} for key, value in values_by_key.items(): if key not in fixed_keys and value: reasons.append(f"drop {key}") if reasons and not force: summary = ", ".join(sorted(set(reasons))) raise ValueError(f"Conversion would lose data. Use --force to proceed. Potential losses: {summary}.") def _apply_coordinate_downgrade( values_by_key: dict[str, str], source_format: str, target_format: str, force: bool ) -> None: """Apply lossy coordinate downgrade rules when needed.""" if target_format not in {FORMAT_EURING2000, FORMAT_EURING2000PLUS}: return accuracy = values_by_key.get("accuracy_of_coordinates", "") if accuracy.isalpha(): if not force: raise ValueError( f"Alphabetic accuracy codes are only valid in {FORMAT_EURING2020}. Use --force to apply lossy mapping." ) mapped = _map_alpha_accuracy_to_numeric(accuracy) if mapped is None: raise ValueError(f'Unsupported alphabetic accuracy code "{accuracy}".') values_by_key["accuracy_of_coordinates"] = mapped coords = values_by_key.get("geographical_coordinates", "") if coords.strip() and set(coords) != {"."}: return latitude = values_by_key.get("latitude", "") longitude = values_by_key.get("longitude", "") if not latitude or not longitude: return lat = _lat_to_euring_coordinate(float(latitude)) lng = _lng_to_euring_coordinate(float(longitude)) values_by_key["geographical_coordinates"] = f"{lat}{lng}" def _map_alpha_accuracy_to_numeric(code: str) -> str | None: """Map alphabetic accuracy codes to numeric values.""" mapping = { "A": "0", "B": "0", "C": "0", "D": "0", "E": "0", "F": "0", "G": "0", "H": "1", "I": "2", "J": "4", "K": "5", "L": "6", "M": "7", "Z": "9", } return mapping.get(code.upper()) def _normalize_target_format(target_format: str) -> str: """Normalize a target format string to an internal constant.""" try: return normalize_format(target_format) except ValueError: raise ValueError(unknown_format_error_message(target_format, "target format")) def _normalize_source_format(source_format: str | None, value: str) -> str: """Normalize a source format string or auto-detect from the value.""" if source_format is None: if "|" not in value: return FORMAT_EURING2000 values = value.split("|") reference_index = _field_index("reference") accuracy_index = _field_index("accuracy_of_coordinates") accuracy_value = values[accuracy_index] if accuracy_index < len(values) else "" has_2020_fields = len(values) > reference_index + 1 if (accuracy_value and accuracy_value.isalpha()) or has_2020_fields: return FORMAT_EURING2020 return FORMAT_EURING2000PLUS try: return normalize_format(source_format) except ValueError: raise ValueError(unknown_format_error_message(source_format, "source format")) def _field_index(key: str) -> int: """Return the field index for a given key.""" for index, field in enumerate(EURING2020_FIELDS): if field.get("key") == key: return index raise ValueError(f'Unknown field key "{key}".') def _normalize_decode_format(format: str | None) -> str | None: """Normalize a user-provided format string or raise.""" if not format: return None try: return normalize_format(format) except ValueError: raise EuringConstraintException(unknown_format_error_message(format)) def _decode_raw_record(value: object, format: str | None) -> tuple[str, dict[str, str], list[dict[str, str]]]: """Decode raw field values from an encoded EURING record string.""" normalized = _normalize_decode_format(format) record_errors: list[dict[str, str]] = [] values_by_key: dict[str, str] = {} if not isinstance(value, str): record_errors.append({"message": f'Value "{value}" cannot be split with pipe character.'}) return normalized or FORMAT_EURING2000PLUS, values_by_key, record_errors fields = value.split("|") if len(fields) <= 1: if normalized and normalized != FORMAT_EURING2000: record_errors.append( {"message": f'Format "{format_display_name(normalized)}" conflicts with fixed-width EURING2000 data.'} ) start = 0 for field in EURING2000_FIELDS: length = field["length"] end = start + length values_by_key[field["key"]] = value[start:end] start = end remainder = value[start:] if remainder.strip(): record_errors.append({"message": f'Value "{value}" invalid EURING2000 code beyond position {start}.'}) current_format = FORMAT_EURING2000 else: if normalized == FORMAT_EURING2000: record_errors.append( {"message": f'Format "{format_display_name(normalized)}" conflicts with pipe-delimited data.'} ) current_format = normalized or FORMAT_EURING2000PLUS for index, raw_value in enumerate(fields): if index >= len(EURING2020_FIELDS): break values_by_key[EURING2020_FIELDS[index]["key"]] = raw_value if normalized is None and current_format in {FORMAT_EURING2000PLUS, FORMAT_EURING2020}: if requires_euring2020(values_by_key): current_format = FORMAT_EURING2020 return current_format, values_by_key, record_errors _FIELD_MAP = {field["key"]: {**field, "order": index} for index, field in enumerate(EURING2020_FIELDS)} def _field_positions(fields: list[dict[str, object]]) -> dict[str, dict[str, int]]: """Return position metadata for fixed-width fields.""" positions: dict[str, dict[str, int]] = {} start = 1 for field in fields: length = field.get("length") if not length: continue positions[field["key"]] = {"position": start, "length": length} start += length return positions def _record_error_for_key(key: str, message: str, *, value: str) -> dict[str, object]: """Build a field error payload for a record-level rule.""" field = _FIELD_MAP.get(key, {}) return { "field": field.get("name", key), "message": message, "value": "" if value is None else f"{value}", "key": key, "index": field.get("order"), }