{{breadcrumbs}}
← BACK TO HUB
HUB / lib_bejson_core.py

lib_bejson_core.py

Runtime
Python
Category
Core
Path
/storage/emulated/0/Projects/Management/Libraries/py/Core/lib_bejson_core.py
FILE // lib_bejson_core.py
"""
Library:     lib_bejson_core.py
Family:      Core
Jurisdiction: ["PYTHON", "BEJSON_LIBRARIES"]
Status:      OFFICIAL
Author:      Elton Boehnen
Version:     1.3 OFFICIAL
MFDB Version: 1.3.1
Date:        2026-05-01
Description: BEJSON (Boehnen Elton JSON) core library — document creation, mutation, validation,
             atomic file I/O with fsync, and query/sort utilities.
             MFDB relational functions are in lib_mfdb_core.py (decoupled).
"""
import time

import copy
import json
import os
import sys
import shutil
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any, Callable

LIB_DIR = os.path.dirname(os.path.abspath(__file__))
if LIB_DIR not in sys.path:
    sys.path.append(LIB_DIR)

from lib_bejson_validator import (
    BEJSONValidationError,
    bejson_validator_get_report,
    bejson_validator_validate_file,
    bejson_validator_validate_string,
)

# ---------------------------------------------------------------------------
# Error codes
# ---------------------------------------------------------------------------

E_CORE_INVALID_VERSION = 20
E_CORE_INVALID_OPERATION = 21
E_CORE_INDEX_OUT_OF_BOUNDS = 22
E_CORE_FIELD_NOT_FOUND = 23
E_CORE_TYPE_CONVERSION_FAILED = 24
E_CORE_BACKUP_FAILED = 25
E_CORE_WRITE_FAILED = 26
E_CORE_QUERY_FAILED = 27


class BEJSONCoreError(Exception):
    """
    Exception raised by the BEJSON core library.

    The human-readable message is passed to ``Exception``; ``code`` stores
    the numeric error code supplied by the raiser (the module defines the
    ``E_CORE_*`` constants for this purpose).
    """

    def __init__(self, message: str, code: int) -> None:
        # Record the code first; the base initialiser only stores the message.
        self.code = code
        super().__init__(message)


# ---------------------------------------------------------------------------
# ATOMIC FILE OPERATIONS
# ---------------------------------------------------------------------------

def __bejson_core_atomic_backup(file_path: str, backup_suffix: str = ".backup") -> str:
    """
    Copy file_path to a timestamped sibling file and return the copy's path.

    The backup is named ``<name><backup_suffix>.<YYYYmmdd_HHMMSS>`` and lives
    in the same directory as the original. When file_path does not exist
    there is nothing to back up and '' is returned.

    Raises BEJSONCoreError (E_CORE_BACKUP_FAILED) if the copy fails.
    Mirrors __bejson_core_atomic_backup.
    """
    source = Path(file_path)
    if source.exists():
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        destination = source.with_name(f"{source.name}{backup_suffix}.{stamp}")
        try:
            # copy2 also carries over file metadata where the platform allows.
            shutil.copy2(source, destination)
        except OSError as exc:
            raise BEJSONCoreError(f"Backup failed: {exc}", E_CORE_BACKUP_FAILED)
        return str(destination)
    return ""


def __bejson_core_restore_backup(file_path: str, backup_path: str) -> bool:
    """
    Move backup_path onto file_path.

    Returns True when the backup existed and was moved, False when there was
    no backup file to restore.
    Mirrors __bejson_core_restore_backup.
    """
    backup = Path(backup_path)
    if not backup.exists():
        return False
    shutil.move(str(backup), file_path)
    return True


def bejson_core_atomic_write(file_path: str, content: dict, create_backup: bool = True) -> None:
    """
    Validate content and write it atomically to file_path.

    Sequence:
      1. Serialize content to JSON before touching the filesystem, so an
         unserializable document leaves no backup or temp file behind.
      2. Optionally back up the existing target file.
      3. Write the JSON to a temp file created as a sibling of the target
         (same directory), then flush and fsync it.
      4. Validate the temp file with bejson_validator_validate_file.
      5. Move the temp file over the target with os.replace. If that fails
         (for example the temp file lives on another filesystem because the
         TMPDIR fallback was used), fall back to shutil.copy2 + os.unlink.
      6. Best-effort fsync of the parent directory, then delete the backup.

    On a failure in steps 3-5 the temp file is removed and, if a backup was
    taken, it is moved back over file_path.

    Args:
        file_path: Destination path; missing parent directories are created.
        content: Document to serialize with json.dumps(indent=2).
        create_backup: When True, back up an existing target before writing.

    Raises:
        BEJSONCoreError: E_CORE_WRITE_FAILED when the target directory is not
            writable and TMPDIR is unset, or when writing, validation or the
            final move fails; whatever the backup helper raises when the
            backup cannot be created.
        TypeError, ValueError: Propagated from json.dumps for content that
            cannot be serialized.

    Mirrors bejson_core_atomic_write.
    """
    # Serialize first: this is the only step that can fail without any
    # filesystem involvement, so do it before creating a backup.
    json_text = json.dumps(content, indent=2)

    backup_path = ""
    if create_backup:
        backup_path = __bejson_core_atomic_backup(file_path)

    path = Path(file_path)
    # Ensure output parent directory exists
    path.parent.mkdir(parents=True, exist_ok=True)

    # Create the temp file as a SIBLING of the target so the final move
    # stays on one filesystem.
    temp_dir = str(path.parent)
    # Only fall back to TMPDIR if the parent is not writable AND the user
    # explicitly set TMPDIR (opt-in cross-partition fallback).
    if not os.access(temp_dir, os.W_OK):
        user_tmp = os.environ.get("TMPDIR", "")
        if user_tmp:
            temp_dir = user_tmp
            os.makedirs(temp_dir, exist_ok=True)
        else:
            raise BEJSONCoreError(
                f"Cannot write to target directory '{temp_dir}' and "
                "TMPDIR not set. Set TMPDIR to a writable location on the "
                "same partition as the target, or grant write permission.",
                E_CORE_WRITE_FAILED,
            )

    tmp_path = ""

    def _rollback() -> None:
        """Remove the temp file (if any) and put the backup back in place."""
        if tmp_path:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
        if backup_path:
            __bejson_core_restore_backup(file_path, backup_path)

    try:
        fd, tmp_path = tempfile.mkstemp(
            dir=temp_dir, suffix=".tmp", prefix=".bejson_"
        )
        # The file object owns the descriptor from here on and closes it when
        # the with-block exits, including on error. Do NOT os.close(fd) again:
        # the number may already have been reused by another open file.
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            tmp.write(json_text)
            # Push data to disk before the file becomes visible at file_path.
            tmp.flush()
            os.fsync(tmp.fileno())
    except OSError as exc:
        _rollback()
        raise BEJSONCoreError(f"Write failed: {exc}", E_CORE_WRITE_FAILED) from exc

    # Validate the temp file BEFORE committing
    try:
        bejson_validator_validate_file(tmp_path)
    except BEJSONValidationError as exc:
        _rollback()
        raise BEJSONCoreError(f"Validation failed: {exc}", E_CORE_WRITE_FAILED) from exc

    # Commit. os.replace overwrites an existing target on every platform.
    try:
        os.replace(tmp_path, file_path)
    except OSError as exc:
        # Most likely a cross-filesystem move: fall back to copy + unlink.
        try:
            shutil.copy2(tmp_path, file_path)
            os.unlink(tmp_path)
        except OSError:
            _rollback()
            raise BEJSONCoreError(
                f"Atomic move failed: {exc}", E_CORE_WRITE_FAILED
            ) from exc

    # Durability of the directory entry is best-effort. The data is already
    # committed at this point, so a platform that cannot open or fsync a
    # directory must not turn a successful write into a rollback.
    try:
        dir_fd = os.open(str(path.parent), os.O_RDONLY)
    except OSError:
        dir_fd = None
    if dir_fd is not None:
        try:
            os.fsync(dir_fd)
        except OSError:
            pass
        finally:
            os.close(dir_fd)

    # Clean up old backup (write succeeded)
    if backup_path:
        Path(backup_path).unlink(missing_ok=True)


# ---------------------------------------------------------------------------
# DOCUMENT CREATION
# ---------------------------------------------------------------------------

def bejson_core_create_104(records_type: str, fields: list[dict], values: list[list]) -> dict:
    """
    Build a BEJSON 104 document.

    The single records_type is wrapped in a one-element list; fields and
    values are stored as given (not copied).
    Mirrors bejson_core_create_104.
    """
    document = dict(
        Format="BEJSON",
        Format_Version="104",
        Format_Creator="Elton Boehnen",
    )
    document["Records_Type"] = [records_type]
    document["Fields"] = fields
    document["Values"] = values
    return document


def bejson_core_create_104a(
    records_type: str,
    fields: list[dict],
    values: list[list],
    **custom_headers,
) -> dict:
    """
    Build a BEJSON 104a document, merging in optional custom top-level headers.

    Custom headers are applied after the standard keys, so a header whose
    name matches a standard key replaces that key's value.
    Mirrors bejson_core_create_104a.
    """
    standard_keys = {
        "Format": "BEJSON",
        "Format_Version": "104a",
        "Format_Creator": "Elton Boehnen",
        "Records_Type": [records_type],
        "Fields": fields,
        "Values": values,
    }
    return {**standard_keys, **custom_headers}


def bejson_core_create_104db(
    records_types: list[str], fields: list[dict], values: list[list]
) -> dict:
    """
    Build a BEJSON 104db (multi-type) document.

    records_types is stored as the Records_Type list as given; fields and
    values are likewise stored without copying.
    Mirrors bejson_core_create_104db.
    """
    document = dict(
        Format="BEJSON",
        Format_Version="104db",
        Format_Creator="Elton Boehnen",
    )
    document["Records_Type"] = records_types
    document["Fields"] = fields
    document["Values"] = values
    return document


# ---------------------------------------------------------------------------
# DOCUMENT LOADING & PARSING
# ---------------------------------------------------------------------------

def bejson_core_load_file(file_path: str) -> dict:
    """
    Read a BEJSON file from disk, validating it before parsing.

    Raises BEJSONCoreError (code E_CORE_FIELD_NOT_FOUND) when file_path does
    not exist; validation errors from bejson_validator_validate_file
    propagate unchanged.
    Mirrors bejson_core_load_file.
    """
    source = Path(file_path)
    if source.exists():
        bejson_validator_validate_file(file_path)
        raw_text = source.read_text(encoding="utf-8")
        return json.loads(raw_text)
    raise BEJSONCoreError(f"File not found: {file_path}", E_CORE_FIELD_NOT_FOUND)


def bejson_core_load_string(json_string: str) -> dict:
    """
    Validate a BEJSON JSON string, then parse and return it.

    Validation runs first via bejson_validator_validate_string; its errors
    propagate unchanged.
    Mirrors bejson_core_load_string.
    """
    bejson_validator_validate_string(json_string)
    parsed = json.loads(json_string)
    return parsed


def bejson_core_get_version(doc: dict) -> str:
    """
    Look up the document's Format_Version entry.
    Mirrors bejson_core_get_version.
    """
    version = doc["Format_Version"]
    return version


def bejson_core_get_records_types(doc: dict) -> list[str]:
    """
    Look up the document's Records_Type list (the stored object, not a copy).
    Mirrors bejson_core_get_records_types.
    """
    declared_types = doc["Records_Type"]
    return declared_types


def bejson_core_get_fields(doc: dict) -> list[dict]:
    """
    Look up the document's Fields list (the stored object, not a copy).
    Mirrors bejson_core_get_fields.
    """
    field_defs = doc["Fields"]
    return field_defs


def bejson_core_get_field_index(doc: dict, field_name: str) -> int:
    """
    Find the zero-based position of the first field named field_name.

    Raises BEJSONCoreError (E_CORE_FIELD_NOT_FOUND) when no field matches.
    Mirrors bejson_core_get_field_index.
    """
    position = next(
        (
            pos
            for pos, field_def in enumerate(doc["Fields"])
            if field_def["name"] == field_name
        ),
        None,
    )
    if position is None:
        raise BEJSONCoreError(f"Field not found: {field_name}", E_CORE_FIELD_NOT_FOUND)
    return position


def bejson_core_get_field_def(doc: dict, field_name: str) -> dict:
    """
    Find the definition dict of the first field named field_name.

    The stored dict is returned, not a copy.
    Raises BEJSONCoreError (E_CORE_FIELD_NOT_FOUND) when no field matches.
    Mirrors bejson_core_get_field_def.
    """
    matching = (
        field_def for field_def in doc["Fields"] if field_def["name"] == field_name
    )
    found = next(matching, None)
    if found is None:
        raise BEJSONCoreError(f"Field not found: {field_name}", E_CORE_FIELD_NOT_FOUND)
    return found


def bejson_core_get_field_count(doc: dict) -> int:
    """
    Count the entries in the document's Fields list.
    Mirrors bejson_core_get_field_count.
    """
    field_defs = doc["Fields"]
    return len(field_defs)


def bejson_core_get_record_count(doc: dict) -> int:
    """
    Count the entries in the document's Values list.
    Mirrors bejson_core_get_record_count.
    """
    rows = doc["Values"]
    return len(rows)


# ---------------------------------------------------------------------------
# POSITION-BASED INDEXING & QUERYING
# ---------------------------------------------------------------------------

def _check_record_bounds(doc: dict, record_index: int):
    """
    Raise BEJSONCoreError (E_CORE_INDEX_OUT_OF_BOUNDS) unless
    0 <= record_index < number of records. Negative indices are rejected.
    """
    is_negative = record_index < 0
    # The record count is only consulted when the index is non-negative.
    if is_negative or record_index >= bejson_core_get_record_count(doc):
        raise BEJSONCoreError(
            f"Record index {record_index} out of bounds", E_CORE_INDEX_OUT_OF_BOUNDS
        )


def _check_field_bounds(doc: dict, field_index: int):
    """
    Raise BEJSONCoreError (E_CORE_INDEX_OUT_OF_BOUNDS) unless
    0 <= field_index < number of fields. Negative indices are rejected.
    """
    is_negative = field_index < 0
    # The field count is only consulted when the index is non-negative.
    if is_negative or field_index >= bejson_core_get_field_count(doc):
        raise BEJSONCoreError(
            f"Field index {field_index} out of bounds", E_CORE_INDEX_OUT_OF_BOUNDS
        )


def bejson_core_get_value_at(doc: dict, record_index: int, field_index: int) -> Any:
    """
    Fetch the cell at Values[record_index][field_index].

    Both indices are bounds-checked first; an out-of-range index raises
    BEJSONCoreError (E_CORE_INDEX_OUT_OF_BOUNDS).
    Mirrors bejson_core_get_value_at.
    """
    _check_record_bounds(doc, record_index)
    _check_field_bounds(doc, field_index)
    row = doc["Values"][record_index]
    return row[field_index]


def bejson_core_get_record(doc: dict, record_index: int) -> list:
    """
    Fetch the record (list of values) stored at record_index.

    The stored list is returned, not a copy. An out-of-range index raises
    BEJSONCoreError (E_CORE_INDEX_OUT_OF_BOUNDS).
    Mirrors bejson_core_get_record.
    """
    _check_record_bounds(doc, record_index)
    rows = doc["Values"]
    return rows[record_index]


def bejson_core_get_field_values(doc: dict, field_name: str) -> list:
    """
    Collect the value of field_name from every record, in record order.

    Raises BEJSONCoreError (E_CORE_FIELD_NOT_FOUND) for an unknown field.
    Mirrors bejson_core_get_field_values.
    """
    column = bejson_core_get_field_index(doc, field_name)
    collected = []
    for row in doc["Values"]:
        collected.append(row[column])
    return collected


def bejson_core_query_records(doc: dict, field_name: str, search_value: Any) -> list[list]:
    """
    Select every record whose field_name value equals search_value (==).

    The returned list holds the stored record lists, not copies.
    Raises BEJSONCoreError (E_CORE_FIELD_NOT_FOUND) for an unknown field.
    Mirrors bejson_core_query_records.
    """
    column = bejson_core_get_field_index(doc, field_name)
    matches = []
    for row in doc["Values"]:
        if row[column] == search_value:
            matches.append(row)
    return matches


def bejson_core_query_records_advanced(doc: dict, **conditions) -> list[list]:
    """
    Select records that satisfy every keyword condition (AND logic).

    Each keyword names a field and gives the value it must equal.
    Example: bejson_core_query_records_advanced(doc, age=30, city="NYC")
    With no conditions every record matches.
    Raises BEJSONCoreError (E_CORE_FIELD_NOT_FOUND) for an unknown field.
    Mirrors bejson_core_query_records_advanced.
    """
    # Resolve every field name up front so unknown fields fail before scanning.
    criteria = [
        (bejson_core_get_field_index(doc, name), wanted)
        for name, wanted in conditions.items()
    ]
    matches = []
    for row in doc["Values"]:
        for column, wanted in criteria:
            if not (row[column] == wanted):
                break
        else:
            # No condition failed for this record.
            matches.append(row)
    return matches


# ---------------------------------------------------------------------------
# 104DB SPECIFIC OPERATIONS
# ---------------------------------------------------------------------------

def bejson_core_get_records_by_type(doc: dict, record_type: str) -> list[list]:
    """
    Select records whose first value equals record_type.

    Only valid for 104db documents; any other version raises
    BEJSONCoreError (E_CORE_INVALID_OPERATION).
    Mirrors bejson_core_get_records_by_type.
    """
    if bejson_core_get_version(doc) == "104db":
        selected = []
        for row in doc["Values"]:
            # Position 0 is compared against the requested record type.
            if row[0] == record_type:
                selected.append(row)
        return selected
    raise BEJSONCoreError("Operation requires 104db document", E_CORE_INVALID_OPERATION)


def bejson_core_has_record_type(doc: dict, record_type: str) -> bool:
    """
    Report whether record_type appears in the document's Records_Type list.
    Mirrors bejson_core_has_record_type.
    """
    declared_types = doc["Records_Type"]
    return record_type in declared_types


def bejson_core_get_field_applicability(doc: dict, field_name: str) -> str:
    """
    Return the Record_Type_Parent declared on a field.

    For 104db documents the key must be present; a field without it raises
    BEJSONCoreError (E_CORE_INVALID_OPERATION), with a dedicated message when
    the field still carries the legacy 'applies_to' key. In every other case
    a missing or empty Record_Type_Parent yields 'common'.
    """
    definition = bejson_core_get_field_def(doc, field_name)
    parent = definition.get("Record_Type_Parent")

    is_104db = bejson_core_get_version(doc) == "104db"
    if is_104db and parent is None:
        # Check for legacy 'applies_to' only to provide a helpful error
        if "applies_to" in definition:
            raise BEJSONCoreError(f"Field '{field_name}' uses legacy 'applies_to'. 104db requires 'Record_Type_Parent'.", E_CORE_INVALID_OPERATION)
        raise BEJSONCoreError(f"Field '{field_name}' missing Record_Type_Parent in 104db", E_CORE_INVALID_OPERATION)

    if parent:
        return parent
    return "common"


# ---------------------------------------------------------------------------
# DATA MODIFICATION
# All mutation functions return a new document dict (immutable style).
# ---------------------------------------------------------------------------

def _coerce_value(value: Any, field_type: str) -> Any:
    """
    Coerce and validate a raw Python value to the declared field type.

    Rules:
      - None is returned unchanged for every type. Null cells are produced
        elsewhere in this module (bejson_core_add_column's default) and must
        not be turned into the string "None" or rejected.
      - "string":  str(value)
      - "integer": int(value)
      - "number":  float(value)
      - "boolean": a bool is kept; the strings "true"/"false" (any case)
        are converted; anything else is rejected.
      - any other field_type: value is returned unchanged.

    Raises:
        BEJSONCoreError: E_CORE_TYPE_CONVERSION_FAILED when the value cannot
            be converted to the requested integer, number or boolean type.
    """
    if value is None:
        return None
    if field_type == "string":
        return str(value)
    if field_type in ("integer", "number"):
        converter = int if field_type == "integer" else float
        try:
            return converter(value)
        except (TypeError, ValueError, OverflowError) as exc:
            # OverflowError covers int(float("inf")) and float(<huge int>).
            raise BEJSONCoreError(
                f"Cannot convert '{value}' to {field_type}", E_CORE_TYPE_CONVERSION_FAILED
            ) from exc
    if field_type == "boolean":
        if isinstance(value, bool):
            return value
        if isinstance(value, str) and value.lower() in ("true", "false"):
            return value.lower() == "true"
        raise BEJSONCoreError(
            f"Cannot convert '{value}' to boolean", E_CORE_TYPE_CONVERSION_FAILED
        )
    return value


def bejson_core_set_value_at(
    doc: dict, record_index: int, field_index: int, new_value: Any
) -> dict:
    """
    Produce a deep copy of doc with the cell [record_index][field_index] replaced.

    new_value is coerced to the target field's declared type before the copy
    is made, so a failed coercion or an out-of-range index leaves nothing
    half-built. The input document is not modified.
    Mirrors bejson_core_set_value_at.
    """
    _check_record_bounds(doc, record_index)
    _check_field_bounds(doc, field_index)

    declared_type = doc["Fields"][field_index]["type"]
    replacement = _coerce_value(new_value, declared_type)

    updated = copy.deepcopy(doc)
    updated["Values"][record_index][field_index] = replacement
    return updated


def bejson_core_add_record(doc: dict, values: list) -> dict:
    """
    Produce a deep copy of doc with one record appended.

    values must contain exactly one entry per field, otherwise
    BEJSONCoreError (E_CORE_INVALID_OPERATION) is raised. Each entry is
    coerced to its field's declared type. The input document is not modified.
    Mirrors bejson_core_add_record.
    """
    expected = bejson_core_get_field_count(doc)
    supplied = len(values)
    if supplied != expected:
        raise BEJSONCoreError(
            f"Record must have {expected} values, got {supplied}",
            E_CORE_INVALID_OPERATION,
        )
    new_row = []
    # Lengths are equal at this point, so zip pairs every value with its field.
    for raw, field_def in zip(values, doc["Fields"]):
        new_row.append(_coerce_value(raw, field_def["type"]))
    updated = copy.deepcopy(doc)
    updated["Values"].append(new_row)
    return updated


def bejson_core_remove_record(doc: dict, record_index: int) -> dict:
    """
    Produce a deep copy of doc without the record at record_index.

    An out-of-range index raises BEJSONCoreError
    (E_CORE_INDEX_OUT_OF_BOUNDS). The input document is not modified.
    Mirrors bejson_core_remove_record.
    """
    _check_record_bounds(doc, record_index)
    updated = copy.deepcopy(doc)
    updated["Values"].pop(record_index)
    return updated


def bejson_core_update_field(
    doc: dict, record_index: int, field_name: str, new_value: Any
) -> dict:
    """
    Produce a new document with field_name updated in one record.

    Resolves the field name to its column position and delegates to
    bejson_core_set_value_at.
    Mirrors bejson_core_update_field.
    """
    column = bejson_core_get_field_index(doc, field_name)
    updated = bejson_core_set_value_at(doc, record_index, column, new_value)
    return updated


# ---------------------------------------------------------------------------
# TABLE OPERATIONS (COLUMN / ROW MANIPULATION)
# ---------------------------------------------------------------------------

def bejson_core_add_column(
    doc: dict,
    field_name: str,
    field_type: str,
    default_value: Any = None,
    record_type_parent: str = "",
) -> dict:
    """
    Return a new document with a new column appended.

    Args:
        doc: Source document; it is not modified.
        field_name: Name of the new field; must not already exist.
        field_type: Declared type stored in the new field definition.
        default_value: Value placed in the new column of every record. It is
            stored as given (not coerced to field_type).
        record_type_parent: When non-empty, stored on the field definition
            under "Record_Type_Parent".

    Raises:
        BEJSONCoreError: E_CORE_INVALID_OPERATION if field_name already exists.

    Mirrors bejson_core_add_column.
    """
    if any(existing["name"] == field_name for existing in doc["Fields"]):
        raise BEJSONCoreError(f"Field '{field_name}' already exists", E_CORE_INVALID_OPERATION)

    new_field: dict = {"name": field_name, "type": field_type}
    if record_type_parent:
        new_field["Record_Type_Parent"] = record_type_parent

    doc = copy.deepcopy(doc)
    doc["Fields"].append(new_field)
    for record in doc["Values"]:
        # Give each record its own copy: a mutable default (list/dict) must
        # not be shared between records or with the caller's object.
        record.append(copy.deepcopy(default_value))
    return doc


def bejson_core_remove_column(doc: dict, field_name: str) -> dict:
    """
    Produce a deep copy of doc with field_name's definition and values removed.

    Raises BEJSONCoreError (E_CORE_FIELD_NOT_FOUND) for an unknown field.
    The input document is not modified.
    Mirrors bejson_core_remove_column.
    """
    column = bejson_core_get_field_index(doc, field_name)
    updated = copy.deepcopy(doc)
    updated["Fields"].pop(column)
    for row in updated["Values"]:
        row.pop(column)
    return updated


def bejson_core_rename_column(doc: dict, old_name: str, new_name: str) -> dict:
    """
    Produce a deep copy of doc with the field old_name renamed to new_name.

    Raises BEJSONCoreError with E_CORE_FIELD_NOT_FOUND when old_name is
    unknown, or E_CORE_INVALID_OPERATION when new_name is already in use.
    The input document is not modified.
    Mirrors bejson_core_rename_column.
    """
    column = bejson_core_get_field_index(doc, old_name)
    try:
        bejson_core_get_field_index(doc, new_name)
    except BEJSONCoreError as exc:
        # "Not found" is the expected outcome; anything else is a real error.
        if exc.code != E_CORE_FIELD_NOT_FOUND:
            raise
    else:
        raise BEJSONCoreError(f"Field '{new_name}' already exists", E_CORE_INVALID_OPERATION)

    updated = copy.deepcopy(doc)
    updated["Fields"][column]["name"] = new_name
    return updated


def bejson_core_get_column(doc: dict, field_name: str) -> list:
    """
    Collect every value of the named column.

    Thin alias: the work is done by bejson_core_get_field_values.
    Mirrors bejson_core_get_column.
    """
    column_values = bejson_core_get_field_values(doc, field_name)
    return column_values


def bejson_core_set_column(doc: dict, field_name: str, values: list) -> dict:
    """
    Produce a deep copy of doc with the whole field_name column replaced.

    values must hold exactly one entry per record, otherwise
    BEJSONCoreError (E_CORE_INVALID_OPERATION) is raised. Entries are stored
    as given (no type coercion). The input document is not modified.
    Mirrors bejson_core_set_column.
    """
    column = bejson_core_get_field_index(doc, field_name)
    expected = bejson_core_get_record_count(doc)
    if len(values) != expected:
        raise BEJSONCoreError(
            f"Value count ({len(values)}) must match record count ({expected})",
            E_CORE_INVALID_OPERATION,
        )
    updated = copy.deepcopy(doc)
    # Lengths match, so zip pairs each record with its replacement value.
    for row, replacement in zip(updated["Values"], values):
        row[column] = replacement
    return updated


def bejson_core_filter_rows(doc: dict, predicate) -> dict:
    """
    Produce a deep copy of doc keeping only records the predicate accepts.

    predicate is called with each record's raw value list (from the copy,
    not from the input document) and a truthy result keeps the record.
    Mirrors bejson_core_filter_rows.
    """
    filtered = copy.deepcopy(doc)
    kept = []
    for row in filtered["Values"]:
        if predicate(row):
            kept.append(row)
    filtered["Values"] = kept
    return filtered


def bejson_core_sort_by_field(doc: dict, field_name: str, ascending: bool = True) -> dict:
    """
    Produce a deep copy of doc with Values ordered by field_name.

    The sort key is the pair (value is None, value), so in ascending order
    None values come after all others; ascending=False reverses the whole
    ordering. The input document is not modified.
    Raises BEJSONCoreError (E_CORE_FIELD_NOT_FOUND) for an unknown field.
    Mirrors bejson_core_sort_by_field.
    """
    column = bejson_core_get_field_index(doc, field_name)

    def none_aware_key(row: list):
        cell = row[column]
        return (cell is None, cell)

    ordered = copy.deepcopy(doc)
    ordered["Values"] = sorted(
        ordered["Values"], key=none_aware_key, reverse=not ascending
    )
    return ordered


# ---------------------------------------------------------------------------
# UTILITY FUNCTIONS
# ---------------------------------------------------------------------------

def bejson_core_pretty_print(doc: dict) -> str:
    """
    Serialize doc to JSON text indented by two spaces.
    Mirrors bejson_core_pretty_print.
    """
    rendered = json.dumps(doc, indent=2)
    return rendered


def bejson_core_compact_print(doc: dict) -> str:
    """
    Serialize doc to JSON text with no whitespace after separators.
    Mirrors bejson_core_compact_print.
    """
    tight_separators = (",", ":")
    return json.dumps(doc, separators=tight_separators)


def bejson_core_is_valid(doc: dict) -> bool:
    """
    Report whether doc passes BEJSON validation.

    The document is serialized and handed to bejson_validator_validate_string.
    Any exception raised along the way (validation or serialization) yields
    False; nothing is propagated to the caller.
    Mirrors bejson_core_is_valid.
    """
    try:
        serialized = json.dumps(doc)
        bejson_validator_validate_string(serialized)
    except Exception:
        # BEJSONValidationError is an Exception subclass, so this one clause
        # covers both validator failures and everything else.
        return False
    return True


def bejson_core_get_stats(doc: dict) -> dict:
    """
    Summarize a document as a dict with the keys
    "version", "field_count", "record_count" and "records_types".
    Mirrors bejson_core_get_stats.
    """
    stats = {}
    stats["version"] = bejson_core_get_version(doc)
    stats["field_count"] = bejson_core_get_field_count(doc)
    stats["record_count"] = bejson_core_get_record_count(doc)
    stats["records_types"] = bejson_core_get_records_types(doc)
    return stats

# ---------------------------------------------------------------------------
# LOCKING
# ---------------------------------------------------------------------------

def bejson_core_acquire_lock(file_path: str, timeout: int = 10) -> bool:
    """
    Acquire a lock file for the given file_path.

    The lock is the file ``file_path + ".lock"``, created exclusively and
    filled with the current process id. Creation is retried every 0.1 s
    while the lock file already exists.

    Args:
        file_path: Path of the file to lock (the lock file is its sibling).
        timeout: Maximum number of seconds to keep retrying. At least one
            attempt is always made, so timeout=0 means "try once, don't wait".

    Returns:
        True if the lock was acquired, False if it was still held when the
        timeout expired.
    """
    lock_path = file_path + ".lock"
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Use O_EXCL to ensure atomic creation
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except FileExistsError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.1)
            continue
        with os.fdopen(fd, "w") as f:
            f.write(str(os.getpid()))
        return True

def bejson_core_release_lock(file_path: str) -> None:
    """
    Release the lock file for the given file_path.

    Removes ``file_path + ".lock"``. A lock file that does not exist (never
    acquired, or already removed by someone else) is not an error.
    """
    lock_path = file_path + ".lock"
    try:
        os.unlink(lock_path)
    except FileNotFoundError:
        # Already gone; checking existence first would race with other
        # releasers between the check and the unlink.
        pass