Source code for euring.converters

from __future__ import annotations

from .fields import EURING_FIELDS
from .formats import (
    FORMAT_EURING2000,
    FORMAT_EURING2000PLUS,
    FORMAT_EURING2020,
    normalize_format,
    unknown_format_error,
)
from .utils import euring_lat_to_dms, euring_lng_to_dms


[docs] def convert_euring2000_record(value: str, target_format: str = FORMAT_EURING2020) -> str: """Convert a fixed-width euring2000 record to euring2000plus or euring2020.""" return convert_euring_record(value, source_format=FORMAT_EURING2000, target_format=target_format)
[docs] def convert_euring_record( value: str, source_format: str | None = None, target_format: str = FORMAT_EURING2020, force: bool = False, ) -> str: """Convert EURING records between euring2000, euring2000plus, and euring2020.""" normalized_target, values_by_key, target_fields = convert_euring_record_data( value, source_format=source_format, target_format=target_format, force=force ) if normalized_target == FORMAT_EURING2000: return _format_fixed_width(values_by_key, target_fields) output_values = [values_by_key.get(field["key"], "") for field in target_fields] return "|".join(output_values)
def convert_euring_record_data( value: str, *, source_format: str | None = None, target_format: str, force: bool = False, ) -> tuple[str, dict[str, str], list[dict[str, object]]]: """Convert and return the normalized target format plus field values by key.""" normalized_target = _normalize_target_format(target_format) normalized_source = _normalize_source_format(source_format, value) if normalized_source == FORMAT_EURING2000: fields = _split_fixed_width(value) source_fields = _fixed_width_fields() else: fields = _split_pipe_delimited(value) source_fields = _target_fields(normalized_source) if len(fields) > len(source_fields) and any(part.strip() for part in fields[len(source_fields) :]): raise ValueError( "Input has more fields than expected for the declared format. " f"Use {FORMAT_EURING2020} when 2020-only fields are present." ) values_by_key = _map_fields_to_values(source_fields, fields) _require_force_on_loss(values_by_key, normalized_source, normalized_target, force) _apply_coordinate_downgrade(values_by_key, normalized_source, normalized_target, force) if normalized_target == FORMAT_EURING2000: target_fields = _fixed_width_fields() else: target_fields = _target_fields(normalized_target) return normalized_target, values_by_key, target_fields def _split_fixed_width(value: str) -> list[str]: """Split a fixed-width EURING2000 record into field values.""" if "|" in value: raise ValueError(f"Input appears to be pipe-delimited, not fixed-width {FORMAT_EURING2000}.") if len(value) < 94: raise ValueError(f"{FORMAT_EURING2000} record must be 94 characters long.") if len(value) > 94 and value[94:].strip(): raise ValueError(f"{FORMAT_EURING2000} record contains extra data beyond position 94.") fields: list[str] = [] start = 0 for field in _fixed_width_fields(): length = field["length"] end = start + length chunk = value[start:end] if len(chunk) < length: chunk = chunk.ljust(length) fields.append(chunk) start = end return fields def _split_pipe_delimited(value: str) -> list[str]: """Split a pipe-delimited record into field values.""" return value.split("|") def _map_fields_to_values(fields: list[dict[str, object]], values: list[str]) -> dict[str, str]: """Map field definitions to values by key.""" mapping: dict[str, str] = {} for index, field in enumerate(fields): key = field["key"] mapping[key] = values[index] if index < len(values) else "" return mapping def _require_force_on_loss(values_by_key: dict[str, str], source_format: str, target_format: str, force: bool) -> None: """Raise when conversion would lose data without force.""" reasons: list[str] = [] if target_format in {FORMAT_EURING2000, FORMAT_EURING2000PLUS}: for key in ("latitude", "longitude", "current_place_code", "more_other_marks"): if values_by_key.get(key): reasons.append(f"drop {key}") accuracy = values_by_key.get("accuracy_of_coordinates", "") if accuracy.isalpha(): reasons.append("alphabetic coordinate accuracy") if target_format == FORMAT_EURING2000: fixed_keys = {field["key"] for field in _fixed_width_fields()} for key, value in values_by_key.items(): if key not in fixed_keys and value: reasons.append(f"drop {key}") if reasons and not force: summary = ", ".join(sorted(set(reasons))) raise ValueError(f"Conversion would lose data. Use --force to proceed. Potential losses: {summary}.") def _apply_coordinate_downgrade( values_by_key: dict[str, str], source_format: str, target_format: str, force: bool ) -> None: """Apply lossy coordinate downgrade rules when needed.""" if target_format not in {FORMAT_EURING2000, FORMAT_EURING2000PLUS}: return accuracy = values_by_key.get("accuracy_of_coordinates", "") if accuracy.isalpha(): if not force: raise ValueError( f"Alphabetic accuracy codes are only valid in {FORMAT_EURING2020}. Use --force to apply lossy mapping." ) mapped = _map_alpha_accuracy_to_numeric(accuracy) if mapped is None: raise ValueError(f'Unsupported alphabetic accuracy code "{accuracy}".') values_by_key["accuracy_of_coordinates"] = mapped coords = values_by_key.get("geographical_coordinates", "") if coords.strip(): return latitude = values_by_key.get("latitude", "") longitude = values_by_key.get("longitude", "") if not latitude or not longitude: return lat = euring_lat_to_dms(float(latitude)) lng = euring_lng_to_dms(float(longitude)) values_by_key["geographical_coordinates"] = f"{lat}{lng}" def _map_alpha_accuracy_to_numeric(code: str) -> str | None: """Map alphabetic accuracy codes to numeric values.""" mapping = { "A": "0", "B": "0", "C": "0", "D": "0", "E": "0", "F": "0", "G": "0", "H": "1", "I": "2", "J": "4", "K": "5", "L": "6", "M": "7", "Z": "9", } return mapping.get(code.upper()) def _format_fixed_width(values_by_key: dict[str, str], fields: list[dict[str, object]]) -> str: """Serialize values into a fixed-width record.""" parts: list[str] = [] for field in fields: key = field["key"] length = field["length"] value = values_by_key.get(key, "") if not value: parts.append("-" * length) continue if len(value) < length: value = value.ljust(length, "-") parts.append(value[:length]) return "".join(parts) def _target_fields(target_format: str) -> list[dict[str, object]]: """Return the field definitions for a target format.""" if target_format == FORMAT_EURING2000PLUS: for index, field in enumerate(EURING_FIELDS): if field.get("key") == "reference": return EURING_FIELDS[: index + 1] return EURING_FIELDS def _normalize_target_format(target_format: str) -> str: """Normalize a target format string to an internal constant.""" try: return normalize_format(target_format) except ValueError: raise ValueError(unknown_format_error(target_format, "target format")) def _normalize_source_format(source_format: str | None, value: str) -> str: """Normalize a source format string or auto-detect from the value.""" if source_format is None: if "|" not in value: return FORMAT_EURING2000 values = value.split("|") reference_index = _field_index("reference") accuracy_index = _field_index("accuracy_of_coordinates") accuracy_value = values[accuracy_index] if accuracy_index < len(values) else "" has_2020_fields = len(values) > reference_index + 1 if (accuracy_value and accuracy_value.isalpha()) or has_2020_fields: return FORMAT_EURING2020 return FORMAT_EURING2000PLUS try: return normalize_format(source_format) except ValueError: raise ValueError(unknown_format_error(source_format, "source format")) def _field_index(key: str) -> int: """Return the field index for a given key.""" for index, field in enumerate(EURING_FIELDS): if field.get("key") == key: return index raise ValueError(f'Unknown field key "{key}".') def _fixed_width_fields() -> list[dict[str, object]]: """Return field definitions for the EURING2000 fixed-width layout.""" fields: list[dict[str, object]] = [] start = 0 for field in EURING_FIELDS: if start >= 94: break length = field.get("length", field.get("max_length")) if not length: break fields.append({"key": field["key"], "length": length}) start += length return fields