{{breadcrumbs}}
← BACK TO HUB
HUB / lib_mfdb_validator.py

lib_mfdb_validator.py

Runtime
Python
Category
Core
Path
/storage/emulated/0/Projects/Management/Libraries/py/Core/lib_mfdb_validator.py
FILE // lib_mfdb_validator.py
"""
Library:     lib_mfdb_validator.py
Family:      Core
Jurisdiction: ["PYTHON", "BEJSON_LIBRARIES"]
Status:      OFFICIAL — Core-Command/Lib (v1.3.1)
Author:      Elton Boehnen
Version:     1.3.1 OFFICIAL
MFDB Version: 1.3.1
Format_Creator: Elton Boehnen
Date:        2026-05-01
Description: Standard validator for MFDB (Multifile Database) structures.
             Layers on lib_bejson_validator.py.
             v1.2 adds support for validating .mfdb.zip archives.
             v1.3 adds support for Federation headers.
"""
import json
import os
import zipfile
from dataclasses import dataclass, field as dc_field
from pathlib import Path
from typing import Any, List, Dict, Optional

from lib_bejson_validator import (
    BEJSONValidationError,
    bejson_validator_validate_file,
)

# ---------------------------------------------------------------------------
# Error codes (30–49)
# ---------------------------------------------------------------------------

E_MFDB_NOT_MANIFEST           = 30
E_MFDB_NOT_ENTITY_FILE        = 31
E_MFDB_MANIFEST_RECORDS_TYPE  = 32
E_MFDB_ENTITY_NOT_FOUND       = 33
E_MFDB_ENTITY_NAME_MISMATCH   = 34
E_MFDB_DUPLICATE_ENTRY        = 35
E_MFDB_NO_PARENT_HIERARCHY    = 36
E_MFDB_MANIFEST_NOT_FOUND     = 37
E_MFDB_BIDIRECTIONAL_FAIL     = 38
E_MFDB_FK_UNRESOLVED          = 39
E_MFDB_MISSING_REQUIRED_FIELD = 40
E_MFDB_NULL_REQUIRED          = 41
E_MFDB_INVALID_ARCHIVE        = 42


class MFDBValidationError(Exception):
    """Raised when MFDB-level validation fails."""
    def __init__(self, message: str, code: int, context: dict = None):
        super().__init__(message)
        self.code = code
        self.context = context or {}


# ---------------------------------------------------------------------------
# Validation state
# ---------------------------------------------------------------------------

@dataclass
class _MFDBValidationState:
    errors:   list[str] = dc_field(default_factory=list)
    warnings: list[str] = dc_field(default_factory=list)
    findings: dict      = dc_field(default_factory=dict)

    def reset(self):
        self.errors.clear()
        self.warnings.clear()
        self.findings.clear()

    def add_error(self, message: str, location: str = ""):
        entry = "ERROR"
        if location:
            entry += f" | Location: {location}"
        entry += f" | Message: {message}"
        self.errors.append(entry)

    def add_warning(self, message: str, location: str = ""):
        entry = "WARNING"
        if location:
            entry += f" | Location: {location}"
        entry += f" | Message: {message}"
        self.warnings.append(entry)

    def add_finding(self, key: str, value: Any):
        self.findings[key] = value

    def has_errors(self)   -> bool:     return bool(self.errors)
    def has_warnings(self) -> bool:     return bool(self.warnings)


_mstate = _MFDBValidationState()


# ---------------------------------------------------------------------------
# Internal helpers (also imported by lib_mfdb_core)
# ---------------------------------------------------------------------------


def _load_json(path: str) -> dict:
    """Load raw JSON without BEJSON validation. Supports .mfdb.zip archives."""
    p = Path(path)
    # Scenario A: Regular file
    if p.is_file() and not path.lower().endswith(".zip"):
        return json.loads(p.read_text(encoding="utf-8"))
    
    # Scenario B: Zip Archive (defaulting to manifest)
    if path.lower().endswith(".zip") and p.is_file():
        with zipfile.ZipFile(path, "r") as z:
            # For MFDB archives, we assume the manifest is the target
            if "104a.mfdb.bejson" in z.namelist():
                return json.loads(z.read("104a.mfdb.bejson").decode("utf-8"))
            raise FileNotFoundError(f"104a.mfdb.bejson not found in archive: {path}")

    # Scenario C: Logical path inside zip (e.g. archive.zip/data/user.bejson)
    # This is a bit more complex, we check if a part of the path is a zip
    parts = p.parts
    for i, part in enumerate(parts):
        if part.lower().endswith(".zip"):
            zip_path = str(Path(*parts[:i+1]))
            inner_path = "/".join(parts[i+1:])
            if os.path.exists(zip_path):
                with zipfile.ZipFile(zip_path, "r") as z:
                    if inner_path in z.namelist():
                        return json.loads(z.read(inner_path).decode("utf-8"))
    
    # Fallback to standard path handling
    return json.loads(p.read_text(encoding="utf-8"))



def _rows_as_dicts(doc: dict) -> list[dict]:
    """Convert a BEJSON document's Values into a list of field-keyed dicts."""
    names = [f["name"] for f in doc["Fields"]]
    return [dict(zip(names, row)) for row in doc["Values"]]



def _resolve_entity_path(manifest_path: str, file_path_rel: str) -> str:
    """Resolve a relative file_path (from manifest record) to an absolute path."""
    if manifest_path.lower().endswith(".zip"):
        return os.path.join(manifest_path, file_path_rel)
    manifest_dir = os.path.dirname(os.path.abspath(manifest_path))
    return os.path.normpath(os.path.join(manifest_dir, file_path_rel))



# ---------------------------------------------------------------------------
# Archive Validation (v1.2 Feature)
# ---------------------------------------------------------------------------

def mfdb_validator_validate_archive(archive_path: str) -> bool:
    """
    Validate an MFDB .zip archive.
    Ensures the archive contains a valid 104a.mfdb.bejson manifest at the root.
    """
    _mstate.reset()
    p = Path(archive_path)
    if not p.exists():
        _mstate.add_error(f"Archive not found: {archive_path}", "File System")
        raise MFDBValidationError(f"Archive not found: {archive_path}", E_MFDB_MANIFEST_NOT_FOUND)

    try:
        with zipfile.ZipFile(archive_path, 'r') as zip_ref:
            nl = zip_ref.namelist()
            if "104a.mfdb.bejson" not in nl:
                _mstate.add_error("Archive missing 104a.mfdb.bejson at root", "Zip Structure")
                raise MFDBValidationError("Missing manifest inside archive", E_MFDB_INVALID_ARCHIVE)
    except zipfile.BadZipFile as exc:
        _mstate.add_error(f"Invalid zip file: {exc}", "Zip Parser")
        raise MFDBValidationError(str(exc), E_MFDB_INVALID_ARCHIVE) from exc

    return True


# ---------------------------------------------------------------------------
# Manifest Validation  (Spec §8.1)
# ---------------------------------------------------------------------------

def mfdb_validator_validate_manifest(manifest_path: str, reset_state: bool = True) -> bool:
    """
    Validate an MFDB manifest file (104a.mfdb.bejson).
    """
    if reset_state:
        _mstate.reset()
    p = Path(manifest_path)

    if not p.exists():
        _mstate.add_error(f"Manifest file not found: {manifest_path}", "File System")
        raise MFDBValidationError(f"File not found: {manifest_path}", E_MFDB_MANIFEST_NOT_FOUND)

    # Delegate BEJSON 104a structural validation
    try:
        bejson_validator_validate_file(manifest_path)
    except BEJSONValidationError as exc:
        _mstate.add_error(f"BEJSON 104a validation failed: {exc}", "BEJSON Validation")
        raise MFDBValidationError(str(exc), E_MFDB_NOT_MANIFEST) from exc

    doc = _load_json(manifest_path)

    # Federation Finding (v1.3)
    if "Network_Role" in doc:
        _mstate.add_finding("Network_Role", doc["Network_Role"])

    if doc.get("Format_Version") != "104a":
        _mstate.add_error("Manifest must be Format_Version '104a'", "Format_Version")
        raise MFDBValidationError("Manifest must be 104a", E_MFDB_NOT_MANIFEST)

    rt = doc.get("Records_Type", [])
    if rt != ["mfdb"]:
        _mstate.add_error(
            f'Records_Type must be ["mfdb"]. Found: {rt}',
            "Records_Type",
        )
        raise MFDBValidationError("Bad manifest Records_Type", E_MFDB_MANIFEST_RECORDS_TYPE)

    field_names = [f["name"] for f in doc.get("Fields", [])]
    for required in ("entity_name", "file_path"):
        if required not in field_names:
            _mstate.add_error(f"Manifest Fields must include '{required}'", "Fields")
            raise MFDBValidationError(
                f"Missing required field '{required}'", E_MFDB_MISSING_REQUIRED_FIELD
            )

    entries = _rows_as_dicts(doc)
    seen_names: set[str] = set()
    seen_paths: set[str] = set()

    for i, entry in enumerate(entries):
        entity_name = entry.get("entity_name")
        file_path   = entry.get("file_path")

        if not entity_name:
            _mstate.add_error(f"Record {i}: entity_name is null or missing", f"Values[{i}]")
            raise MFDBValidationError("Null entity_name", E_MFDB_NULL_REQUIRED)

        if not file_path:
            _mstate.add_error(f"Record {i}: file_path is null or missing", f"Values[{i}]")
            raise MFDBValidationError("Null file_path", E_MFDB_NULL_REQUIRED)

        if entity_name in seen_names:
            _mstate.add_error(f"Duplicate entity_name: '{entity_name}'", f"Values[{i}]")
            raise MFDBValidationError(f"Duplicate entity_name: {entity_name}", E_MFDB_DUPLICATE_ENTRY)
        seen_names.add(entity_name)

        if file_path in seen_paths:
            _mstate.add_error(f"Duplicate file_path: '{file_path}'", f"Values[{i}]")
            raise MFDBValidationError(f"Duplicate file_path: {file_path}", E_MFDB_DUPLICATE_ENTRY)
        seen_paths.add(file_path)

        resolved = _resolve_entity_path(manifest_path, file_path)
        if not os.path.exists(resolved):
            _mstate.add_error(
                f"Entity file '{file_path}' not found (resolved: {resolved})",
                f"Values[{i}]/file_path",
            )
            raise MFDBValidationError(
                f"Entity file not found: {resolved}", 
                E_MFDB_ENTITY_NOT_FOUND,
                context={"entity_name": entity_name, "file_path_rel": file_path, "resolved_path": resolved}
            )

    return True


# ---------------------------------------------------------------------------
# Entity File Validation  (Spec §8.2)
# ---------------------------------------------------------------------------

def mfdb_validator_validate_entity_file(
    entity_path: str,
    check_bidirectional: bool = True,
    reset_state: bool = True,
) -> bool:
    """
    Validate an MFDB entity file (BEJSON 104 with Parent_Hierarchy back-link).
    """
    if reset_state:
        _mstate.reset()
    p = Path(entity_path)

    if not p.exists():
        _mstate.add_error(f"Entity file not found: {entity_path}", "File System")
        raise MFDBValidationError(f"File not found: {entity_path}", E_MFDB_ENTITY_NOT_FOUND)

    try:
        bejson_validator_validate_file(entity_path)
    except BEJSONValidationError as exc:
        _mstate.add_error(f"BEJSON 104 validation failed: {exc}", "BEJSON Validation")
        raise MFDBValidationError(str(exc), E_MFDB_NOT_ENTITY_FILE) from exc

    doc = _load_json(entity_path)

    if doc.get("Format_Version") != "104":
        _mstate.add_error("Entity file must be Format_Version '104'", "Format_Version")
        raise MFDBValidationError("Entity file must be 104", E_MFDB_NOT_ENTITY_FILE)

    rt = doc.get("Records_Type", [])
    entity_name = rt[0] if isinstance(rt, list) and len(rt) > 0 else "Unknown"

    parent_hierarchy = doc.get("Parent_Hierarchy")
    if not parent_hierarchy:
        _mstate.add_error(
            "Entity file must contain Parent_Hierarchy pointing to the manifest",
            "Parent_Hierarchy",
        )
        raise MFDBValidationError("Missing Parent_Hierarchy", E_MFDB_NO_PARENT_HIERARCHY)

    entity_dir    = os.path.dirname(os.path.abspath(entity_path))
    manifest_path = os.path.normpath(os.path.join(entity_dir, parent_hierarchy))

    if not os.path.exists(manifest_path):
        _mstate.add_error(
            f"Parent_Hierarchy '{parent_hierarchy}' resolves to '{manifest_path}' which does not exist",
            "Parent_Hierarchy",
        )
        raise MFDBValidationError(
            f"Manifest not found: {manifest_path}", 
            E_MFDB_MANIFEST_NOT_FOUND,
            context={
                "entity_name": entity_name,
                "actual_path": os.path.abspath(entity_path),
                "suggested_hierarchy": "../104a.mfdb.bejson"
            }
        )

    if not os.path.basename(manifest_path).endswith(".mfdb.bejson"):
        _mstate.add_warning(
            f"Parent_Hierarchy target '{manifest_path}' does not end in '.mfdb.bejson'. "
            f"Expected filename: 104a.mfdb.bejson",
            "Parent_Hierarchy",
        )

    rt = doc.get("Records_Type", [])
    if len(rt) != 1:
        _mstate.add_error(
            f"Entity file Records_Type must contain exactly one string. Found: {rt}",
            "Records_Type",
        )
        raise MFDBValidationError("Entity Records_Type must be single-entry", E_MFDB_NOT_ENTITY_FILE)

    entity_name = rt[0]

    try:
        manifest_doc = _load_json(manifest_path)
        entries      = _rows_as_dicts(manifest_doc)
        manifest_entity_names = [e.get("entity_name") for e in entries]
    except Exception as exc:
        _mstate.add_error(f"Could not read manifest: {exc}", "Manifest")
        raise MFDBValidationError(f"Cannot read manifest: {exc}", E_MFDB_MANIFEST_NOT_FOUND) from exc

    if entity_name not in manifest_entity_names:
        _mstate.add_error(
            f"Records_Type '{entity_name}' does not appear as entity_name in the manifest",
            "Records_Type vs Manifest",
        )
        raise MFDBValidationError(
            f"Entity '{entity_name}' not registered in manifest", E_MFDB_ENTITY_NAME_MISMATCH
        )

    if check_bidirectional:
        match = next((e for e in entries if e.get("entity_name") == entity_name), None)
        if match:
            manifest_dir  = os.path.dirname(os.path.abspath(manifest_path))
            from_manifest = os.path.normpath(
                os.path.join(manifest_dir, match.get("file_path", ""))
            )
            this_file     = os.path.normpath(os.path.abspath(entity_path))
            if from_manifest != this_file:
                _mstate.add_error(
                    f"Bidirectional check failed for entity '{entity_name}': "
                    f"manifest points to '{from_manifest}', but this file is '{this_file}'",
                    "Bidirectional Path Check",
                )
                raise MFDBValidationError(
                    "Bidirectional path check failed", 
                    E_MFDB_BIDIRECTIONAL_FAIL,
                    context={
                        "entity_name": entity_name,
                        "manifest_path": manifest_path,
                        "expected_path": from_manifest,
                        "actual_path": this_file,
                        "suggested_hierarchy": os.path.relpath(manifest_path, entity_dir)
                    }
                )

    return True


# ---------------------------------------------------------------------------
# Database-Level Validation  (Spec §8.3)
# ---------------------------------------------------------------------------

def mfdb_validator_check_integrity(manifest_path: str) -> bool:
    """
    Strict integrity check for MFDB boot.
    Asserts that the record_count in the manifest matches the actual row count
    for every entity file.
    """
    manifest_doc = _load_json(manifest_path)
    entries      = _rows_as_dicts(manifest_doc)
    
    for entry in entries:
        entity_name    = entry["entity_name"]
        file_path_rel  = entry["file_path"]
        declared_count = entry.get("record_count")
        
        if declared_count is None:
            continue
            
        resolved = _resolve_entity_path(manifest_path, file_path_rel)
        if not os.path.exists(resolved):
            continue
            
        entity_doc   = _load_json(resolved)
        actual_count = len(entity_doc.get("Values", []))
        
        if actual_count != declared_count:
            msg = f"Integrity Failure: Entity '{entity_name}' declares {declared_count} records, but found {actual_count}."
            _mstate.add_error(msg, f"Entity/{entity_name}/record_count")
            raise MFDBValidationError(msg, E_MFDB_BIDIRECTIONAL_FAIL)
            
    return True

def mfdb_validator_validate_database(
    manifest_path: str,
    strict_fk: bool = False,
    reset_state: bool = True,
) -> bool:
    """
    Full MFDB database validation.
    """
    if reset_state:
        _mstate.reset()

    # Step 1: Validate the manifest itself.
    try:
        mfdb_validator_validate_manifest(manifest_path, reset_state=False)
    except MFDBValidationError:
        raise

    manifest_doc = _load_json(manifest_path)
    entries      = _rows_as_dicts(manifest_doc)

    pk_map = {
        e["entity_name"]: e.get("primary_key")
        for e in entries
        if e.get("primary_key")
    }

    # Federation Finding: Registry presence (v1.3)
    if "ConnectedSlave" in pk_map or any(e["entity_name"] == "ConnectedSlave" for e in entries):
        _mstate.add_finding("Has_Federation_Registry", True)

    # Step 2: Validate each entity file.
    for entry in entries:
        entity_name    = entry["entity_name"]
        file_path_rel  = entry["file_path"]
        declared_count = entry.get("record_count")

        resolved = _resolve_entity_path(manifest_path, file_path_rel)

        try:
            mfdb_validator_validate_entity_file(resolved, check_bidirectional=True, reset_state=False)
        except MFDBValidationError as exc:
            _mstate.add_error(
                f"Entity '{entity_name}' failed validation: {exc}",
                f"Entity/{entity_name}",
            )
            raise

        if declared_count is not None:
            entity_doc   = _load_json(resolved)
            actual_count = len(entity_doc.get("Values", []))
            if actual_count != declared_count:
                _mstate.add_warning(
                    f"Entity '{entity_name}': manifest declares record_count={declared_count}, "
                    f"actual={actual_count}. Call mfdb_core_sync_all_counts() to correct.",
                    f"Entity/{entity_name}/record_count",
                )

        if strict_fk:
            entity_doc = _load_json(resolved)
            fk_fields  = [
                f["name"]
                for f in entity_doc.get("Fields", [])
                if f["name"].endswith("_fk")
            ]
            for fk_field in fk_fields:
                target_found = any(
                    pk and (pk in fk_field or en.lower() in fk_field.lower())
                    for en, pk in pk_map.items()
                )
                if not target_found:
                    _mstate.add_warning(
                        f"Entity '{entity_name}': FK field '{fk_field}' has no matching "
                        f"primary_key declaration in the manifest.",
                        f"Entity/{entity_name}/{fk_field}",
                    )

    return True


# ---------------------------------------------------------------------------
# Validation report
# ---------------------------------------------------------------------------

def mfdb_validator_get_report(manifest_path: str, strict_fk: bool = False) -> str:
    """Run full database validation and return a human-readable report string."""
    valid = False
    try:
        valid = mfdb_validator_validate_database(manifest_path, strict_fk=strict_fk)
    except (MFDBValidationError, Exception):
        pass

    lines = [
        "=== MFDB Validation Report ===",
        f"Manifest : {manifest_path}",
        f"Status   : {'VALID' if valid else 'INVALID'}",
        "",
        f"Errors   : {len(_mstate.errors)}",
    ]
    if _mstate.has_errors():
        lines.append("---")
        lines.extend(_mstate.errors)

    lines += ["", f"Warnings : {len(_mstate.warnings)}"]
    if _mstate.has_warnings():
        lines.append("---")
        lines.extend(_mstate.warnings)

    return "\n".join(lines)


# ---------------------------------------------------------------------------
# State accessors
# ---------------------------------------------------------------------------

def mfdb_validator_reset_state():                   _mstate.reset()
def mfdb_validator_has_errors()    -> bool:         return _mstate.has_errors()
def mfdb_validator_has_warnings()  -> bool:         return _mstate.has_warnings()
def mfdb_validator_get_errors()    -> list[str]:    return list(_mstate.errors)
def mfdb_validator_get_warnings()  -> list[str]:    return list(_mstate.warnings)
def mfdb_validator_get_findings()  -> dict:         return dict(_mstate.findings)
def mfdb_validator_error_count()   -> int:          return len(_mstate.errors)
def mfdb_validator_warning_count() -> int:          return len(_mstate.warnings)