DerivaModel

The DerivaModel class provides schema introspection and manipulation capabilities for Deriva catalogs. It handles table relationships, associations, and catalog structure management.

Model module for DerivaML.

This module provides catalog and database model classes, as well as handle wrappers for ERMrest model objects and annotation builders.

Key components:

- DerivaModel: Schema analysis utilities
- DatabaseModel: SQLite database from BDBag
- SchemaBuilder/SchemaORM: Create ORM from Deriva Model (Phase 1)
- DataLoader: Fill database from data source (Phase 2)
- DataSource: Protocol for data sources (BagDataSource, CatalogDataSource)
- ForeignKeyOrderer: Compute FK-safe insertion order

Lazy imports are used for DatabaseModel and DerivaMLDatabase to avoid circular imports with the dataset module.
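The lazy-import note above can be made concrete. Below is a minimal sketch of the pattern, assuming nothing about the real module layout; the demonstration target (`json.JSONDecoder`) is a stand-in, not an actual deriva_ml module:

```python
import importlib

_cache: dict[str, object] = {}

def lazy_attr(module_name: str, attr: str):
    """Resolve module_name.attr on first use instead of at import time.

    Deferring the import to call time is what breaks an import cycle:
    by the time a caller needs the class, both modules are initialized.
    """
    key = f"{module_name}.{attr}"
    if key not in _cache:
        _cache[key] = getattr(importlib.import_module(module_name), attr)
    return _cache[key]

# Stand-in demonstration target (not a deriva_ml module):
decoder_cls = lazy_attr("json", "JSONDecoder")
```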

Aggregate

Bases: str, Enum

Aggregation functions for pseudo-columns.

Used when a pseudo-column follows an inbound foreign key and returns multiple values that need to be aggregated.

Attributes:

- MIN: Minimum value
- MAX: Maximum value
- CNT: Count of values
- CNT_D: Count of distinct values
- ARRAY: Array of all values
- ARRAY_D: Array of distinct values

Example

Count related records:

pc = PseudoColumn(
    source=[InboundFK("domain", "Sample_Subject_fkey"), "RID"],
    aggregate=Aggregate.CNT,
    markdown_name="Sample Count"
)

Get distinct values as array:

pc = PseudoColumn(
    source=[InboundFK("domain", "Tag_Item_fkey"), "Name"],
    aggregate=Aggregate.ARRAY_D,
    markdown_name="Tags"
)

Source code in src/deriva_ml/model/annotations.py
class Aggregate(str, Enum):
    """Aggregation functions for pseudo-columns.

    Used when a pseudo-column follows an inbound foreign key and returns
    multiple values that need to be aggregated.

    Attributes:
        MIN: Minimum value
        MAX: Maximum value
        CNT: Count of values
        CNT_D: Count of distinct values
        ARRAY: Array of all values
        ARRAY_D: Array of distinct values

    Example:
        >>> # Count related records
        >>> pc = PseudoColumn(
        ...     source=[InboundFK("domain", "Sample_Subject_fkey"), "RID"],
        ...     aggregate=Aggregate.CNT,
        ...     markdown_name="Sample Count"
        ... )
        >>>
        >>> # Get distinct values as array
        >>> pc = PseudoColumn(
        ...     source=[InboundFK("domain", "Tag_Item_fkey"), "Name"],
        ...     aggregate=Aggregate.ARRAY_D,
        ...     markdown_name="Tags"
        ... )
    """
    MIN = "min"
    MAX = "max"
    CNT = "cnt"
    CNT_D = "cnt_d"
    ARRAY = "array"
    ARRAY_D = "array_d"
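Because Aggregate mixes in str, members compare equal to their values and drop straight into annotation JSON. A quick standalone check, re-declaring a two-member version of the enum:

```python
import json
from enum import Enum

class Aggregate(str, Enum):
    # Two members are enough to show the str-mixin behavior.
    MIN = "min"
    CNT = "cnt"

# Members are also plain strings, so JSON encoding needs no converter.
assert Aggregate.CNT == "cnt"
print(json.dumps({"aggregate": Aggregate.CNT}))  # {"aggregate": "cnt"}
```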

ArrayUxMode

Bases: str, Enum

Display modes for array values in pseudo-columns.

Controls how arrays of values are rendered in the UI.

Attributes:

- RAW: Raw array display
- CSV: Comma-separated values
- OLIST: Ordered (numbered) list
- ULIST: Unordered (bulleted) list

Example

pc = PseudoColumn(
    source=[InboundFK("domain", "Tag_Item_fkey"), "Name"],
    aggregate=Aggregate.ARRAY,
    display=PseudoColumnDisplay(array_ux_mode=ArrayUxMode.CSV)
)

Source code in src/deriva_ml/model/annotations.py
class ArrayUxMode(str, Enum):
    """Display modes for array values in pseudo-columns.

    Controls how arrays of values are rendered in the UI.

    Attributes:
        RAW: Raw array display
        CSV: Comma-separated values
        OLIST: Ordered (numbered) list
        ULIST: Unordered (bulleted) list

    Example:
        >>> pc = PseudoColumn(
        ...     source=[InboundFK("domain", "Tag_Item_fkey"), "Name"],
        ...     aggregate=Aggregate.ARRAY,
        ...     display=PseudoColumnDisplay(array_ux_mode=ArrayUxMode.CSV)
        ... )
    """
    RAW = "raw"
    CSV = "csv"
    OLIST = "olist"
    ULIST = "ulist"
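The four modes map to familiar text renderings. A hypothetical renderer makes the differences concrete; the real rendering is done by the Chaise UI, not by this library:

```python
def render_array(values: list[str], mode: str) -> str:
    # Illustrative only: mirrors the intent of each ArrayUxMode value.
    if mode == "raw":
        return str(values)
    if mode == "csv":
        return ", ".join(values)
    if mode == "olist":
        return "\n".join(f"{i}. {v}" for i, v in enumerate(values, 1))
    if mode == "ulist":
        return "\n".join(f"- {v}" for v in values)
    raise ValueError(f"unknown mode: {mode}")

print(render_array(["red", "blue"], "csv"))  # red, blue
```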

BagDataSource

DataSource implementation for BDBag directories.

Reads data from CSV files in a bag's data/ directory. Handles asset URL localization via fetch.txt.

Example

source = BagDataSource(Path("/path/to/bag"))

# List available tables
print(source.list_available_tables())

# Get data for a table
for row in source.get_table_data("Image"):
    print(row["Filename"])

Source code in src/deriva_ml/model/data_sources.py
class BagDataSource:
    """DataSource implementation for BDBag directories.

    Reads data from CSV files in a bag's data/ directory.
    Handles asset URL localization via fetch.txt.

    Example:
        source = BagDataSource(Path("/path/to/bag"))

        # List available tables
        print(source.list_available_tables())

        # Get data for a table
        for row in source.get_table_data("Image"):
            print(row["Filename"])
    """

    def __init__(
        self,
        bag_path: Path,
        model: Model | None = None,
        asset_localization: bool = True,
    ):
        """Initialize from a bag path.

        Args:
            bag_path: Path to BDBag directory.
            model: Optional ERMrest Model for schema info. If not provided,
                will try to load from bag's schema.json.
            asset_localization: Whether to localize asset URLs to local paths
                using fetch.txt mapping.
        """
        self.bag_path = Path(bag_path)
        self.data_path = self.bag_path / "data"

        # Load model if not provided
        if model is None:
            schema_file = self.data_path / "schema.json"
            if schema_file.exists():
                self.model = Model.fromfile("file-system", schema_file)
            else:
                self.model = None
                logger.warning(f"No schema.json found in {self.bag_path}")
        else:
            self.model = model

        # Build asset map for URL localization
        self._asset_map = self._build_asset_map() if asset_localization else {}

        # Cache of table name -> list of csv file paths (multiple paths for nested datasets)
        self._csv_cache: dict[str, list[Path]] = {}
        self._build_csv_cache()

    def _build_csv_cache(self) -> None:
        """Build cache mapping table names to CSV file paths.

        Nested datasets can produce multiple CSV files for the same table
        at different directory depths. All paths are collected so that
        get_table_data() yields the union of all rows.
        """
        for csv_file in self.data_path.rglob("*.csv"):
            table_name = csv_file.stem
            self._csv_cache.setdefault(table_name, []).append(csv_file)

    def _build_asset_map(self) -> dict[str, str]:
        """Build a map from remote URLs to local file paths using fetch.txt.

        Returns:
            Dictionary mapping URL paths to local file paths.
        """
        fetch_map = {}
        fetch_file = self.bag_path / "fetch.txt"

        if not fetch_file.exists():
            logger.debug(f"No fetch.txt in bag {self.bag_path.name}")
            return fetch_map

        try:
            with fetch_file.open(newline="\n") as f:
                for row in f:
                    # Rows in fetch.txt are tab-separated: URL, size, local_path
                    fields = row.split("\t")
                    if len(fields) >= 3:
                        local_file = fields[2].replace("\n", "")
                        local_path = f"{self.bag_path}/{local_file}"
                        fetch_map[urlparse(fields[0]).path] = local_path
        except Exception as e:
            logger.warning(f"Error reading fetch.txt: {e}")

        return fetch_map

    def _get_table_name(self, table: DerivaTable | str) -> str:
        """Extract table name from table object or string."""
        if isinstance(table, DerivaTable):
            return table.name
        # Handle schema.table format
        if "." in table:
            return table.split(".")[-1]
        return table

    def _is_asset_table(self, table_name: str) -> bool:
        """Check if a table is an asset table."""
        if self.model is None:
            return False
        for schema in self.model.schemas.values():
            if table_name in schema.tables:
                return schema.tables[table_name].is_asset()
        return False

    def _localize_asset_row(self, row: dict[str, Any]) -> dict[str, Any]:
        """Replace URL with local path in asset table row.

        Args:
            row: Dictionary of column values.

        Returns:
            Updated dictionary with localized file path.
        """
        if "URL" in row and "Filename" in row:
            url = row.get("URL")
            if url and url in self._asset_map:
                row = dict(row)  # Copy to avoid mutating original
                row["Filename"] = self._asset_map[url]
        return row

    def get_table_data(
        self,
        table: DerivaTable | str,
    ) -> Iterator[dict[str, Any]]:
        """Read table data from CSV files.

        Nested datasets may produce multiple CSV files for the same table
        at different directory depths. This method yields rows from all of
        them so that the full dataset (including parent and child records)
        is loaded.

        Args:
            table: Table object or name.

        Yields:
            Dictionary per row with column names as keys.
        """
        table_name = self._get_table_name(table)
        csv_files = self._csv_cache.get(table_name)

        if not csv_files:
            logger.debug(f"No CSV file found for table {table_name}")
            return

        is_asset = self._is_asset_table(table_name)

        for csv_file in csv_files:
            if not csv_file.exists():
                continue
            with csv_file.open(newline="") as f:
                reader = csv.DictReader(f)
                for row in reader:
                    if is_asset and self._asset_map:
                        row = self._localize_asset_row(row)
                    yield row

    def has_table(self, table: DerivaTable | str) -> bool:
        """Check if CSV exists for table.

        Args:
            table: Table object or name.

        Returns:
            True if CSV file exists for this table.
        """
        table_name = self._get_table_name(table)
        return table_name in self._csv_cache

    def list_available_tables(self) -> list[str]:
        """List all CSV files in data directory.

        Returns:
            List of table names (without .csv extension).
        """
        return sorted(self._csv_cache.keys())

    def get_row_count(self, table: DerivaTable | str) -> int:
        """Get the number of rows across all CSV files for a table.

        Args:
            table: Table object or name.

        Returns:
            Number of data rows (excluding headers).
        """
        table_name = self._get_table_name(table)
        csv_files = self._csv_cache.get(table_name)

        if not csv_files:
            return 0

        total = 0
        for csv_file in csv_files:
            if csv_file.exists():
                with csv_file.open(newline="") as f:
                    # Count lines minus header
                    total += sum(1 for _ in f) - 1
        return total
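The fetch.txt handling in _build_asset_map above can be sketched standalone. Each fetch.txt row is tab-separated (URL, size, local_path), and the map is keyed by the URL path so asset rows can be localized later; the demo bag directory and entry below are made up:

```python
import tempfile
from pathlib import Path
from urllib.parse import urlparse

def build_asset_map(bag_path: Path) -> dict[str, str]:
    """Map remote URL paths to local file paths from fetch.txt."""
    fetch_map: dict[str, str] = {}
    fetch_file = bag_path / "fetch.txt"
    if not fetch_file.exists():
        return fetch_map
    for row in fetch_file.read_text().splitlines():
        fields = row.split("\t")
        if len(fields) >= 3:
            # Key by URL path only, matching how asset URLs are looked up.
            fetch_map[urlparse(fields[0]).path] = f"{bag_path}/{fields[2]}"
    return fetch_map

# Demo with a throwaway bag directory holding one fetch.txt entry.
with tempfile.TemporaryDirectory() as d:
    bag = Path(d)
    (bag / "fetch.txt").write_text(
        "https://example.org/hatrac/img.png\t1234\tdata/assets/img.png\n"
    )
    mapping = build_asset_map(bag)
    print(mapping["/hatrac/img.png"].endswith("data/assets/img.png"))  # True
```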

__init__

__init__(
    bag_path: Path,
    model: Model | None = None,
    asset_localization: bool = True,
)

Initialize from a bag path.

Parameters:

- bag_path (Path, required): Path to BDBag directory.
- model (Model | None, default None): Optional ERMrest Model for schema info. If not provided, will try to load from bag's schema.json.
- asset_localization (bool, default True): Whether to localize asset URLs to local paths using fetch.txt mapping.
Source code in src/deriva_ml/model/data_sources.py
def __init__(
    self,
    bag_path: Path,
    model: Model | None = None,
    asset_localization: bool = True,
):
    """Initialize from a bag path.

    Args:
        bag_path: Path to BDBag directory.
        model: Optional ERMrest Model for schema info. If not provided,
            will try to load from bag's schema.json.
        asset_localization: Whether to localize asset URLs to local paths
            using fetch.txt mapping.
    """
    self.bag_path = Path(bag_path)
    self.data_path = self.bag_path / "data"

    # Load model if not provided
    if model is None:
        schema_file = self.data_path / "schema.json"
        if schema_file.exists():
            self.model = Model.fromfile("file-system", schema_file)
        else:
            self.model = None
            logger.warning(f"No schema.json found in {self.bag_path}")
    else:
        self.model = model

    # Build asset map for URL localization
    self._asset_map = self._build_asset_map() if asset_localization else {}

    # Cache of table name -> list of csv file paths (multiple paths for nested datasets)
    self._csv_cache: dict[str, list[Path]] = {}
    self._build_csv_cache()

get_row_count

get_row_count(
    table: DerivaTable | str,
) -> int

Get the number of rows across all CSV files for a table.

Parameters:

- table (DerivaTable | str, required): Table object or name.

Returns:

- int: Number of data rows (excluding headers).

Source code in src/deriva_ml/model/data_sources.py
def get_row_count(self, table: DerivaTable | str) -> int:
    """Get the number of rows across all CSV files for a table.

    Args:
        table: Table object or name.

    Returns:
        Number of data rows (excluding headers).
    """
    table_name = self._get_table_name(table)
    csv_files = self._csv_cache.get(table_name)

    if not csv_files:
        return 0

    total = 0
    for csv_file in csv_files:
        if csv_file.exists():
            with csv_file.open(newline="") as f:
                # Count lines minus header
                total += sum(1 for _ in f) - 1
    return total
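get_row_count above counts newline-delimited lines minus one header per file. The same count can be sketched with csv.reader, which (unlike a raw line count) also treats a quoted field containing a newline as part of one row:

```python
import csv
import io

def count_rows(csv_text: str) -> int:
    # Parse with csv.reader so quoted fields with embedded newlines
    # still count as a single row; subtract the header row.
    rows = list(csv.reader(io.StringIO(csv_text)))
    return max(len(rows) - 1, 0)

print(count_rows("RID,Name\n1-A,alpha\n1-B,beta\n"))        # 2
print(count_rows('RID,Note\n1-A,"line one\nline two"\n'))   # 1
```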

get_table_data

get_table_data(
    table: DerivaTable | str,
) -> Iterator[dict[str, Any]]

Read table data from CSV files.

Nested datasets may produce multiple CSV files for the same table at different directory depths. This method yields rows from all of them so that the full dataset (including parent and child records) is loaded.

Parameters:

- table (DerivaTable | str, required): Table object or name.

Yields:

- dict[str, Any]: Dictionary per row with column names as keys.

Source code in src/deriva_ml/model/data_sources.py
def get_table_data(
    self,
    table: DerivaTable | str,
) -> Iterator[dict[str, Any]]:
    """Read table data from CSV files.

    Nested datasets may produce multiple CSV files for the same table
    at different directory depths. This method yields rows from all of
    them so that the full dataset (including parent and child records)
    is loaded.

    Args:
        table: Table object or name.

    Yields:
        Dictionary per row with column names as keys.
    """
    table_name = self._get_table_name(table)
    csv_files = self._csv_cache.get(table_name)

    if not csv_files:
        logger.debug(f"No CSV file found for table {table_name}")
        return

    is_asset = self._is_asset_table(table_name)

    for csv_file in csv_files:
        if not csv_file.exists():
            continue
        with csv_file.open(newline="") as f:
            reader = csv.DictReader(f)
            for row in reader:
                if is_asset and self._asset_map:
                    row = self._localize_asset_row(row)
                yield row

has_table

has_table(table: DerivaTable | str) -> bool

Check if CSV exists for table.

Parameters:

- table (DerivaTable | str, required): Table object or name.

Returns:

- bool: True if CSV file exists for this table.

Source code in src/deriva_ml/model/data_sources.py
def has_table(self, table: DerivaTable | str) -> bool:
    """Check if CSV exists for table.

    Args:
        table: Table object or name.

    Returns:
        True if CSV file exists for this table.
    """
    table_name = self._get_table_name(table)
    return table_name in self._csv_cache

list_available_tables

list_available_tables() -> list[str]

List all CSV files in data directory.

Returns:

- list[str]: List of table names (without .csv extension).

Source code in src/deriva_ml/model/data_sources.py
def list_available_tables(self) -> list[str]:
    """List all CSV files in data directory.

    Returns:
        List of table names (without .csv extension).
    """
    return sorted(self._csv_cache.keys())

CatalogDataSource

DataSource implementation for remote Deriva catalog.

Fetches data via ERMrest API / datapath with pagination support.

Example

catalog = server.connect_ermrest(catalog_id)
source = CatalogDataSource(catalog, schemas=['domain', 'deriva-ml'])

# List available tables
print(source.list_available_tables())

# Get data for a table
for row in source.get_table_data("Image"):
    print(row["Filename"])

Source code in src/deriva_ml/model/data_sources.py
class CatalogDataSource:
    """DataSource implementation for remote Deriva catalog.

    Fetches data via ERMrest API / datapath with pagination support.

    Example:
        catalog = server.connect_ermrest(catalog_id)
        source = CatalogDataSource(catalog, schemas=['domain', 'deriva-ml'])

        # List available tables
        print(source.list_available_tables())

        # Get data for a table
        for row in source.get_table_data("Image"):
            print(row["Filename"])
    """

    def __init__(
        self,
        catalog: ErmrestCatalog,
        schemas: list[str],
        batch_size: int = 1000,
    ):
        """Initialize from catalog connection.

        Args:
            catalog: ERMrest catalog connection.
            schemas: Schemas to fetch data from.
            batch_size: Number of rows per API request.
        """
        self.catalog = catalog
        self.schemas = schemas
        self.batch_size = batch_size
        self._pb = catalog.getPathBuilder()
        self._model = catalog.getCatalogModel()

    def _get_table_info(self, table: DerivaTable | str) -> tuple[str, str] | None:
        """Get schema and table name for a table.

        Args:
            table: Table object or name.

        Returns:
            Tuple of (schema_name, table_name) or None if not found.
        """
        if isinstance(table, DerivaTable):
            return table.schema.name, table.name

        # Handle schema.table format
        if "." in table:
            parts = table.split(".")
            schema_name, table_name = parts[0], parts[1]
            if schema_name in self.schemas:
                return schema_name, table_name
            return None

        # Search schemas for table
        for schema_name in self.schemas:
            if schema_name in self._model.schemas:
                schema = self._model.schemas[schema_name]
                if table in schema.tables:
                    return schema_name, table

        return None

    def get_table_data(
        self,
        table: DerivaTable | str,
    ) -> Iterator[dict[str, Any]]:
        """Fetch table data via ERMrest API.

        Uses pagination to handle large tables efficiently.

        Args:
            table: Table object or name.

        Yields:
            Dictionary per row with column names as keys.
        """
        table_info = self._get_table_info(table)
        if table_info is None:
            logger.warning(f"Table {table} not found in schemas {self.schemas}")
            return

        schema_name, table_name = table_info

        # Build path
        path = self._pb.schemas[schema_name].tables[table_name]

        # Paginated fetch using RID ordering
        last_rid = None
        while True:
            # Build query with optional RID filter
            query = path.entities()
            if last_rid is not None:
                query = query.filter(path.RID > last_rid)

            # Fetch batch ordered by RID
            try:
                entities = list(query.sort(path.RID).fetch(limit=self.batch_size))
            except Exception as e:
                logger.error(f"Error fetching from {schema_name}.{table_name}: {e}")
                break

            if not entities:
                break

            for entity in entities:
                yield dict(entity)

            # Track last RID for pagination
            last_rid = entities[-1]["RID"]

            if len(entities) < self.batch_size:
                break

    def has_table(self, table: DerivaTable | str) -> bool:
        """Check if table exists in catalog.

        Args:
            table: Table object or name.

        Returns:
            True if table exists in configured schemas.
        """
        return self._get_table_info(table) is not None

    def list_available_tables(self) -> list[str]:
        """List all tables in configured schemas.

        Returns:
            List of fully-qualified table names (schema.table).
        """
        tables = []
        for schema_name in self.schemas:
            if schema_name in self._model.schemas:
                schema = self._model.schemas[schema_name]
                for table_name in schema.tables.keys():
                    tables.append(f"{schema_name}.{table_name}")
        return sorted(tables)

    def get_row_count(self, table: DerivaTable | str) -> int:
        """Get the number of rows in a table.

        Args:
            table: Table object or name.

        Returns:
            Number of rows in the table.
        """
        table_info = self._get_table_info(table)
        if table_info is None:
            return 0

        schema_name, table_name = table_info
        path = self._pb.schemas[schema_name].tables[table_name]

        try:
            # Use count aggregate
            result = path.aggregates(path.RID.cnt.alias("count")).fetch()
            return result[0]["count"] if result else 0
        except Exception as e:
            logger.error(f"Error counting {schema_name}.{table_name}: {e}")
            return 0
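get_table_data above pages through a table with RID keyset pagination: fetch a batch sorted by RID, then resume with a filter on RID greater than the last one seen. The loop can be sketched against an in-memory table standing in for the datapath query:

```python
from typing import Any, Iterator

def paginate(rows: list[dict[str, Any]], batch_size: int) -> Iterator[dict[str, Any]]:
    # Keyset pagination on RID: each batch resumes strictly after the
    # last RID already yielded, so rows are never skipped or repeated
    # even if the fetch is interrupted between batches.
    last_rid = None
    while True:
        batch = sorted(
            (r for r in rows if last_rid is None or r["RID"] > last_rid),
            key=lambda r: r["RID"],
        )[:batch_size]
        if not batch:
            break
        yield from batch
        last_rid = batch[-1]["RID"]
        if len(batch) < batch_size:
            break

table = [{"RID": rid} for rid in ("1-C", "1-A", "1-B")]
print([r["RID"] for r in paginate(table, 2)])  # ['1-A', '1-B', '1-C']
```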

__init__

__init__(
    catalog: ErmrestCatalog,
    schemas: list[str],
    batch_size: int = 1000,
)

Initialize from catalog connection.

Parameters:

- catalog (ErmrestCatalog, required): ERMrest catalog connection.
- schemas (list[str], required): Schemas to fetch data from.
- batch_size (int, default 1000): Number of rows per API request.
Source code in src/deriva_ml/model/data_sources.py
def __init__(
    self,
    catalog: ErmrestCatalog,
    schemas: list[str],
    batch_size: int = 1000,
):
    """Initialize from catalog connection.

    Args:
        catalog: ERMrest catalog connection.
        schemas: Schemas to fetch data from.
        batch_size: Number of rows per API request.
    """
    self.catalog = catalog
    self.schemas = schemas
    self.batch_size = batch_size
    self._pb = catalog.getPathBuilder()
    self._model = catalog.getCatalogModel()

get_row_count

get_row_count(
    table: DerivaTable | str,
) -> int

Get the number of rows in a table.

Parameters:

- table (DerivaTable | str, required): Table object or name.

Returns:

- int: Number of rows in the table.

Source code in src/deriva_ml/model/data_sources.py
def get_row_count(self, table: DerivaTable | str) -> int:
    """Get the number of rows in a table.

    Args:
        table: Table object or name.

    Returns:
        Number of rows in the table.
    """
    table_info = self._get_table_info(table)
    if table_info is None:
        return 0

    schema_name, table_name = table_info
    path = self._pb.schemas[schema_name].tables[table_name]

    try:
        # Use count aggregate
        result = path.aggregates(path.RID.cnt.alias("count")).fetch()
        return result[0]["count"] if result else 0
    except Exception as e:
        logger.error(f"Error counting {schema_name}.{table_name}: {e}")
        return 0

get_table_data

get_table_data(
    table: DerivaTable | str,
) -> Iterator[dict[str, Any]]

Fetch table data via ERMrest API.

Uses pagination to handle large tables efficiently.

Parameters:

- table (DerivaTable | str, required): Table object or name.

Yields:

- dict[str, Any]: Dictionary per row with column names as keys.

Source code in src/deriva_ml/model/data_sources.py
def get_table_data(
    self,
    table: DerivaTable | str,
) -> Iterator[dict[str, Any]]:
    """Fetch table data via ERMrest API.

    Uses pagination to handle large tables efficiently.

    Args:
        table: Table object or name.

    Yields:
        Dictionary per row with column names as keys.
    """
    table_info = self._get_table_info(table)
    if table_info is None:
        logger.warning(f"Table {table} not found in schemas {self.schemas}")
        return

    schema_name, table_name = table_info

    # Build path
    path = self._pb.schemas[schema_name].tables[table_name]

    # Paginated fetch using RID ordering
    last_rid = None
    while True:
        # Build query with optional RID filter
        query = path.entities()
        if last_rid is not None:
            query = query.filter(path.RID > last_rid)

        # Fetch batch ordered by RID
        try:
            entities = list(query.sort(path.RID).fetch(limit=self.batch_size))
        except Exception as e:
            logger.error(f"Error fetching from {schema_name}.{table_name}: {e}")
            break

        if not entities:
            break

        for entity in entities:
            yield dict(entity)

        # Track last RID for pagination
        last_rid = entities[-1]["RID"]

        if len(entities) < self.batch_size:
            break

has_table

has_table(table: DerivaTable | str) -> bool

Check if table exists in catalog.

Parameters:

- table (DerivaTable | str, required): Table object or name.

Returns:

- bool: True if table exists in configured schemas.

Source code in src/deriva_ml/model/data_sources.py
def has_table(self, table: DerivaTable | str) -> bool:
    """Check if table exists in catalog.

    Args:
        table: Table object or name.

    Returns:
        True if table exists in configured schemas.
    """
    return self._get_table_info(table) is not None

list_available_tables

list_available_tables() -> list[str]

List all tables in configured schemas.

Returns:

- list[str]: List of fully-qualified table names (schema.table).

Source code in src/deriva_ml/model/data_sources.py
def list_available_tables(self) -> list[str]:
    """List all tables in configured schemas.

    Returns:
        List of fully-qualified table names (schema.table).
    """
    tables = []
    for schema_name in self.schemas:
        if schema_name in self._model.schemas:
            schema = self._model.schemas[schema_name]
            for table_name in schema.tables.keys():
                tables.append(f"{schema_name}.{table_name}")
    return sorted(tables)

ColumnDisplay dataclass

Bases: AnnotationBuilder

Column-display annotation builder.

Controls how column values are rendered.

Example

cd = ColumnDisplay()
cd.default(ColumnDisplayOptions(
    pre_format=PreFormat(format="%.2f")
))

# Markdown link
cd = ColumnDisplay()
cd.default(ColumnDisplayOptions(
    markdown_pattern="[Link]({{{_value}}})"
))

Source code in src/deriva_ml/model/annotations.py
@dataclass
class ColumnDisplay(AnnotationBuilder):
    """Column-display annotation builder.

    Controls how column values are rendered.

    Example:
        >>> cd = ColumnDisplay()
        >>> cd.default(ColumnDisplayOptions(
        ...     pre_format=PreFormat(format="%.2f")
        ... ))
        >>>
        >>> # Markdown link
        >>> cd = ColumnDisplay()
        >>> cd.default(ColumnDisplayOptions(
        ...     markdown_pattern="[Link]({{{_value}}})"
        ... ))
    """
    tag = TAG_COLUMN_DISPLAY

    _contexts: dict[str, ColumnDisplayOptions | str] = field(default_factory=dict)

    def set_context(
        self,
        context: str,
        options: ColumnDisplayOptions | str
    ) -> "ColumnDisplay":
        """Set options for a context."""
        self._contexts[context] = options
        return self

    def default(self, options: ColumnDisplayOptions) -> "ColumnDisplay":
        """Set default options."""
        return self.set_context(CONTEXT_DEFAULT, options)

    def compact(self, options: ColumnDisplayOptions) -> "ColumnDisplay":
        """Set options for compact view."""
        return self.set_context(CONTEXT_COMPACT, options)

    def detailed(self, options: ColumnDisplayOptions) -> "ColumnDisplay":
        """Set options for detailed view."""
        return self.set_context(CONTEXT_DETAILED, options)

    def to_dict(self) -> dict[str, Any]:
        result = {}
        for context, options in self._contexts.items():
            if isinstance(options, str):
                result[context] = options
            else:
                result[context] = options.to_dict()
        return result
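ColumnDisplay follows a fluent-builder pattern: every setter returns self, so context calls chain, and to_dict() emits the annotation payload. A stripped-down standalone version, with the options objects simplified to plain dicts:

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class DisplayBuilder:
    _contexts: dict[str, Any] = field(default_factory=dict)

    def set_context(self, context: str, options: Any) -> "DisplayBuilder":
        self._contexts[context] = options
        return self  # returning self is what enables chaining

    def to_dict(self) -> dict[str, Any]:
        return dict(self._contexts)

annotation = (
    DisplayBuilder()
    .set_context("*", {"pre_format": {"format": "%.2f"}})
    .set_context("compact", {"markdown_pattern": "{{{_value}}}"})
    .to_dict()
)
print(sorted(annotation))  # ['*', 'compact']
```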

compact

compact(
    options: ColumnDisplayOptions,
) -> "ColumnDisplay"

Set options for compact view.

Source code in src/deriva_ml/model/annotations.py
1132
1133
1134
def compact(self, options: ColumnDisplayOptions) -> "ColumnDisplay":
    """Set options for compact view."""
    return self.set_context(CONTEXT_COMPACT, options)

default

default(
    options: ColumnDisplayOptions,
) -> "ColumnDisplay"

Set default options.

Source code in src/deriva_ml/model/annotations.py
def default(self, options: ColumnDisplayOptions) -> "ColumnDisplay":
    """Set default options."""
    return self.set_context(CONTEXT_DEFAULT, options)

detailed

detailed(
    options: ColumnDisplayOptions,
) -> "ColumnDisplay"

Set options for detailed view.

Source code in src/deriva_ml/model/annotations.py
def detailed(self, options: ColumnDisplayOptions) -> "ColumnDisplay":
    """Set options for detailed view."""
    return self.set_context(CONTEXT_DETAILED, options)

set_context

set_context(
    context: str,
    options: ColumnDisplayOptions | str,
) -> "ColumnDisplay"

Set options for a context.

Source code in src/deriva_ml/model/annotations.py
def set_context(
    self,
    context: str,
    options: ColumnDisplayOptions | str
) -> "ColumnDisplay":
    """Set options for a context."""
    self._contexts[context] = options
    return self

ColumnDisplayOptions dataclass

Options for displaying a column in a specific context.

Parameters:

pre_format (PreFormat | None): Pre-formatting options. Default: None
markdown_pattern (str | None): Template for rendering. Default: None
template_engine (TemplateEngine | None): Template engine to use. Default: None
column_order (list[SortKey] | Literal[False] | None): Sort order, or False to disable. Default: None
Source code in src/deriva_ml/model/annotations.py
@dataclass
class ColumnDisplayOptions:
    """Options for displaying a column in a specific context.

    Args:
        pre_format: Pre-formatting options
        markdown_pattern: Template for rendering
        template_engine: Template engine to use
        column_order: Sort order, or False to disable
    """
    pre_format: PreFormat | None = None
    markdown_pattern: str | None = None
    template_engine: TemplateEngine | None = None
    column_order: list[SortKey] | Literal[False] | None = None

    def to_dict(self) -> dict[str, Any]:
        result = {}
        if self.pre_format is not None:
            result["pre_format"] = self.pre_format.to_dict()
        if self.markdown_pattern is not None:
            result["markdown_pattern"] = self.markdown_pattern
        if self.template_engine is not None:
            result["template_engine"] = self.template_engine.value
        if self.column_order is not None:
            if self.column_order is False:
                result["column_order"] = False
            else:
                result["column_order"] = [
                    k.to_dict() if isinstance(k, SortKey) else k
                    for k in self.column_order
                ]
        return result

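The annotation produced by to_dict is a plain mapping from display context to options. A minimal sketch of the resulting payload shape, assuming CONTEXT_DEFAULT resolves to the Deriva "*" context key and using the option names shown above:

```python
# Sketch of the dict a ColumnDisplay annotation resolves to: context
# keys ("*", "compact") mapped to serialized ColumnDisplayOptions.
annotation = {
    "*": {"pre_format": {"format": "%.2f"}},
    "compact": {"markdown_pattern": "[Link]({{{_value}}})"},
}

# The default context formats values; the compact context renders a link.
assert annotation["*"]["pre_format"]["format"] == "%.2f"
assert "markdown_pattern" in annotation["compact"]
```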
DataLoader

Loads data into a database with FK ordering.

Phase 2 of the two-phase database creation pattern. Takes a SchemaORM (from Phase 1) and populates it from a DataSource.

Automatically orders tables by FK dependencies to ensure referential integrity during loading.

Example

Phase 1: Create ORM

orm = SchemaBuilder(model, schemas).build()

Phase 2: Fill with data from bag

source = BagDataSource(bag_path)
loader = DataLoader(orm, source)
counts = loader.load_tables()  # All tables
print(f"Loaded {sum(counts.values())} total rows")

Or load specific tables

counts = loader.load_tables(['Subject', 'Image'])

With progress callback

def on_progress(table, count, total):
    print(f"Loaded {table}: {count} rows")

loader.load_tables(progress_callback=on_progress)

Source code in src/deriva_ml/model/data_loader.py
class DataLoader:
    """Loads data into a database with FK ordering.

    Phase 2 of the two-phase database creation pattern. Takes a
    SchemaORM (from Phase 1) and populates it from a DataSource.

    Automatically orders tables by FK dependencies to ensure
    referential integrity during loading.

    Example:
        # Phase 1: Create ORM
        orm = SchemaBuilder(model, schemas).build()

        # Phase 2: Fill with data from bag
        source = BagDataSource(bag_path)
        loader = DataLoader(orm, source)
        counts = loader.load_tables()  # All tables
        print(f"Loaded {sum(counts.values())} total rows")

        # Or load specific tables
        counts = loader.load_tables(['Subject', 'Image'])

        # With progress callback
        def on_progress(table, count, total):
            print(f"Loaded {table}: {count} rows")
        loader.load_tables(progress_callback=on_progress)
    """

    def __init__(
        self,
        schema_orm: SchemaORM,
        data_source: DataSource,
    ):
        """Initialize the loader.

        Args:
            schema_orm: ORM structure from SchemaBuilder.
            data_source: Source of data to load (BagDataSource, CatalogDataSource, etc.).
        """
        self.orm = schema_orm
        self.source = data_source
        self.orderer = ForeignKeyOrderer(
            schema_orm.model,
            schema_orm.schemas,
        )

    def load_tables(
        self,
        tables: list[str | DerivaTable] | None = None,
        on_conflict: str = "ignore",
        batch_size: int = 1000,
        progress_callback: Callable[[str, int, int], None] | None = None,
    ) -> dict[str, int]:
        """Load data into specified tables with FK ordering.

        Tables are automatically ordered by FK dependencies to ensure
        referenced tables are populated first.

        Args:
            tables: Tables to load. If None, loads all tables that have
                data in the source.
            on_conflict: How to handle duplicate keys:
                - "ignore": Skip rows with duplicate keys (default)
                - "replace": Replace existing rows
                - "error": Raise error on duplicates
            batch_size: Number of rows per insert batch.
            progress_callback: Optional callback(table_name, rows_loaded, total_tables)
                called after each table is loaded.

        Returns:
            Dict mapping table names to row counts loaded.
        """
        # Determine tables to load
        if tables is None:
            # Get all tables that have data in source
            available = set(self.source.list_available_tables())
            # Filter to tables that exist in ORM
            orm_tables = set(self.orm.list_tables())

            # Match available tables to ORM tables
            tables_to_load = []
            for orm_table in orm_tables:
                # Check both qualified and unqualified names
                table_name = orm_table.split(".")[-1]
                if orm_table in available or table_name in available:
                    tables_to_load.append(orm_table)
        else:
            tables_to_load = [
                t if isinstance(t, str) else f"{t.schema.name}.{t.name}"
                for t in tables
            ]

        # Compute insertion order
        try:
            ordered_tables = self.orderer.get_insertion_order(tables_to_load)
        except ValueError as e:
            # Some tables might not be in the model, just use original order
            logger.warning(f"Could not compute FK ordering: {e}")
            ordered_tables = [
                self.orderer._to_table(t) if isinstance(t, str) else t
                for t in tables_to_load
                if self._table_exists(t)
            ]

        # Load in order
        counts = {}
        total_tables = len(ordered_tables)

        for i, table in enumerate(ordered_tables):
            table_key = f"{table.schema.name}.{table.name}"

            count = self._load_table(table, on_conflict, batch_size)
            counts[table_key] = count

            if progress_callback:
                progress_callback(table_key, count, total_tables)

            if count > 0:
                logger.info(f"Loaded {count} rows into {table_key}")

        return counts

    def _table_exists(self, table: str | DerivaTable) -> bool:
        """Check if table exists in ORM."""
        try:
            if isinstance(table, str):
                self.orm.find_table(table)
            else:
                self.orm.find_table(f"{table.schema.name}.{table.name}")
            return True
        except KeyError:
            return False

    def _load_table(
        self,
        table: DerivaTable,
        on_conflict: str,
        batch_size: int,
    ) -> int:
        """Load a single table.

        Args:
            table: Table to load.
            on_conflict: Conflict handling strategy.
            batch_size: Rows per batch.

        Returns:
            Number of rows loaded.
        """
        table_key = f"{table.schema.name}.{table.name}"

        # Find SQL table
        try:
            sql_table = self.orm.find_table(table_key)
        except KeyError:
            logger.warning(f"Table {table_key} not found in ORM")
            return 0

        # Check if source has data
        if not self.source.has_table(table):
            logger.debug(f"No data for {table_key} in source")
            return 0

        # Get data from source
        rows_loaded = 0
        batch = []

        with self.orm.engine.begin() as conn:
            for row in self.source.get_table_data(table):
                batch.append(row)

                if len(batch) >= batch_size:
                    rows_loaded += self._insert_batch(
                        conn, sql_table, batch, on_conflict
                    )
                    batch = []

            # Insert remaining rows
            if batch:
                rows_loaded += self._insert_batch(
                    conn, sql_table, batch, on_conflict
                )

        return rows_loaded

    def _insert_batch(
        self,
        conn: Any,
        sql_table: Any,
        rows: list[dict[str, Any]],
        on_conflict: str,
    ) -> int:
        """Insert a batch of rows.

        Args:
            conn: Database connection.
            sql_table: SQLAlchemy table.
            rows: List of row dictionaries.
            on_conflict: Conflict handling strategy.

        Returns:
            Number of rows inserted.
        """
        if not rows:
            return 0

        try:
            if on_conflict == "ignore":
                stmt = sqlite_insert(sql_table).on_conflict_do_nothing()
            elif on_conflict == "replace":
                # For SQLite, we need to specify all columns for upsert
                stmt = sqlite_insert(sql_table)
                update_cols = {
                    c.name: c for c in stmt.excluded
                    if c.name not in ("RID",)  # Don't update primary key
                }
                stmt = stmt.on_conflict_do_update(
                    index_elements=["RID"],
                    set_=update_cols,
                )
            else:
                stmt = sql_table.insert()

            conn.execute(stmt, rows)
            return len(rows)

        except Exception as e:
            logger.error(f"Error inserting into {sql_table.name}: {e}")
            if on_conflict == "error":
                raise
            return 0

    def load_table(
        self,
        table: str | DerivaTable,
        on_conflict: str = "ignore",
        batch_size: int = 1000,
    ) -> int:
        """Load a single table (without FK ordering).

        Use this when you know the dependencies are already satisfied
        or for loading a single table.

        Args:
            table: Table to load.
            on_conflict: Conflict handling strategy.
            batch_size: Rows per batch.

        Returns:
            Number of rows loaded.
        """
        if isinstance(table, str):
            table = self.orderer._to_table(table)

        return self._load_table(table, on_conflict, batch_size)

    def get_load_order(
        self,
        tables: list[str | DerivaTable] | None = None,
    ) -> list[str]:
        """Get the FK-safe load order for tables without loading.

        Useful for previewing or manually controlling load order.

        Args:
            tables: Tables to order. If None, orders all available.

        Returns:
            List of table names in safe insertion order.
        """
        if tables is None:
            available = self.source.list_available_tables()
            tables = [t for t in available if self._table_exists(t)]

        ordered = self.orderer.get_insertion_order(tables)
        return [f"{t.schema.name}.{t.name}" for t in ordered]

    def validate_load_order(
        self,
        tables: list[str | DerivaTable],
    ) -> list[tuple[str, str, str]]:
        """Validate that tables can be loaded in the given order.

        Args:
            tables: Ordered list of tables.

        Returns:
            List of FK violations as (table, missing_dep, fk_name) tuples.
            Empty if order is valid.
        """
        return self.orderer.validate_insertion_order(tables)

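The "ignore" conflict strategy above compiles to SQLite's ON CONFLICT DO NOTHING via SQLAlchemy's sqlite_insert. A stdlib-only sketch of the same semantics, using sqlite3 and the equivalent INSERT OR IGNORE form (table and column names are hypothetical):

```python
import sqlite3

# In-memory database with a RID primary key, mirroring a Deriva table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Subject (RID TEXT PRIMARY KEY, Name TEXT)")

# Second row duplicates RID "1" and is silently skipped, as with
# on_conflict="ignore" in DataLoader.
rows = [("1", "a"), ("1", "b"), ("2", "c")]
conn.executemany("INSERT OR IGNORE INTO Subject (RID, Name) VALUES (?, ?)", rows)

loaded = conn.execute("SELECT RID, Name FROM Subject ORDER BY RID").fetchall()
print(loaded)  # [('1', 'a'), ('2', 'c')]
```

With "replace" the loader instead issues an upsert that updates all non-RID columns, and with "error" a plain insert lets the duplicate-key error propagate.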
__init__

__init__(
    schema_orm: SchemaORM,
    data_source: DataSource,
)

Initialize the loader.

Parameters:

schema_orm (SchemaORM): ORM structure from SchemaBuilder. Required.
data_source (DataSource): Source of data to load (BagDataSource, CatalogDataSource, etc.). Required.
Source code in src/deriva_ml/model/data_loader.py
def __init__(
    self,
    schema_orm: SchemaORM,
    data_source: DataSource,
):
    """Initialize the loader.

    Args:
        schema_orm: ORM structure from SchemaBuilder.
        data_source: Source of data to load (BagDataSource, CatalogDataSource, etc.).
    """
    self.orm = schema_orm
    self.source = data_source
    self.orderer = ForeignKeyOrderer(
        schema_orm.model,
        schema_orm.schemas,
    )

get_load_order

get_load_order(
    tables: list[str | Table] | None = None,
) -> list[str]

Get the FK-safe load order for tables without loading.

Useful for previewing or manually controlling load order.

Parameters:

tables (list[str | Table] | None): Tables to order. If None, orders all available. Default: None

Returns:

list[str]: List of table names in safe insertion order.

Source code in src/deriva_ml/model/data_loader.py
def get_load_order(
    self,
    tables: list[str | DerivaTable] | None = None,
) -> list[str]:
    """Get the FK-safe load order for tables without loading.

    Useful for previewing or manually controlling load order.

    Args:
        tables: Tables to order. If None, orders all available.

    Returns:
        List of table names in safe insertion order.
    """
    if tables is None:
        available = self.source.list_available_tables()
        tables = [t for t in available if self._table_exists(t)]

    ordered = self.orderer.get_insertion_order(tables)
    return [f"{t.schema.name}.{t.name}" for t in ordered]

load_table

load_table(
    table: str | Table,
    on_conflict: str = "ignore",
    batch_size: int = 1000,
) -> int

Load a single table (without FK ordering).

Use this when you know the dependencies are already satisfied or for loading a single table.

Parameters:

table (str | Table): Table to load. Required.
on_conflict (str): Conflict handling strategy. Default: 'ignore'
batch_size (int): Rows per batch. Default: 1000

Returns:

int: Number of rows loaded.

Source code in src/deriva_ml/model/data_loader.py
def load_table(
    self,
    table: str | DerivaTable,
    on_conflict: str = "ignore",
    batch_size: int = 1000,
) -> int:
    """Load a single table (without FK ordering).

    Use this when you know the dependencies are already satisfied
    or for loading a single table.

    Args:
        table: Table to load.
        on_conflict: Conflict handling strategy.
        batch_size: Rows per batch.

    Returns:
        Number of rows loaded.
    """
    if isinstance(table, str):
        table = self.orderer._to_table(table)

    return self._load_table(table, on_conflict, batch_size)

load_tables

load_tables(
    tables: list[str | Table] | None = None,
    on_conflict: str = "ignore",
    batch_size: int = 1000,
    progress_callback: Callable[[str, int, int], None] | None = None,
) -> dict[str, int]

Load data into specified tables with FK ordering.

Tables are automatically ordered by FK dependencies to ensure referenced tables are populated first.

Parameters:

tables (list[str | Table] | None): Tables to load. If None, loads all tables that have data in the source. Default: None
on_conflict (str): How to handle duplicate keys: "ignore" skips rows with duplicate keys (default), "replace" replaces existing rows, "error" raises an error on duplicates. Default: 'ignore'
batch_size (int): Number of rows per insert batch. Default: 1000
progress_callback (Callable[[str, int, int], None] | None): Optional callback(table_name, rows_loaded, total_tables) called after each table is loaded. Default: None

Returns:

dict[str, int]: Dict mapping table names to row counts loaded.

Source code in src/deriva_ml/model/data_loader.py
def load_tables(
    self,
    tables: list[str | DerivaTable] | None = None,
    on_conflict: str = "ignore",
    batch_size: int = 1000,
    progress_callback: Callable[[str, int, int], None] | None = None,
) -> dict[str, int]:
    """Load data into specified tables with FK ordering.

    Tables are automatically ordered by FK dependencies to ensure
    referenced tables are populated first.

    Args:
        tables: Tables to load. If None, loads all tables that have
            data in the source.
        on_conflict: How to handle duplicate keys:
            - "ignore": Skip rows with duplicate keys (default)
            - "replace": Replace existing rows
            - "error": Raise error on duplicates
        batch_size: Number of rows per insert batch.
        progress_callback: Optional callback(table_name, rows_loaded, total_tables)
            called after each table is loaded.

    Returns:
        Dict mapping table names to row counts loaded.
    """
    # Determine tables to load
    if tables is None:
        # Get all tables that have data in source
        available = set(self.source.list_available_tables())
        # Filter to tables that exist in ORM
        orm_tables = set(self.orm.list_tables())

        # Match available tables to ORM tables
        tables_to_load = []
        for orm_table in orm_tables:
            # Check both qualified and unqualified names
            table_name = orm_table.split(".")[-1]
            if orm_table in available or table_name in available:
                tables_to_load.append(orm_table)
    else:
        tables_to_load = [
            t if isinstance(t, str) else f"{t.schema.name}.{t.name}"
            for t in tables
        ]

    # Compute insertion order
    try:
        ordered_tables = self.orderer.get_insertion_order(tables_to_load)
    except ValueError as e:
        # Some tables might not be in the model, just use original order
        logger.warning(f"Could not compute FK ordering: {e}")
        ordered_tables = [
            self.orderer._to_table(t) if isinstance(t, str) else t
            for t in tables_to_load
            if self._table_exists(t)
        ]

    # Load in order
    counts = {}
    total_tables = len(ordered_tables)

    for i, table in enumerate(ordered_tables):
        table_key = f"{table.schema.name}.{table.name}"

        count = self._load_table(table, on_conflict, batch_size)
        counts[table_key] = count

        if progress_callback:
            progress_callback(table_key, count, total_tables)

        if count > 0:
            logger.info(f"Loaded {count} rows into {table_key}")

    return counts

validate_load_order

validate_load_order(
    tables: list[str | Table],
) -> list[tuple[str, str, str]]

Validate that tables can be loaded in the given order.

Parameters:

tables (list[str | Table]): Ordered list of tables. Required.

Returns:

list[tuple[str, str, str]]: List of FK violations as (table, missing_dep, fk_name) tuples. Empty if the order is valid.

Source code in src/deriva_ml/model/data_loader.py
def validate_load_order(
    self,
    tables: list[str | DerivaTable],
) -> list[tuple[str, str, str]]:
    """Validate that tables can be loaded in the given order.

    Args:
        tables: Ordered list of tables.

    Returns:
        List of FK violations as (table, missing_dep, fk_name) tuples.
        Empty if order is valid.
    """
    return self.orderer.validate_insertion_order(tables)

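An FK-safe insertion order is a topological sort of the table dependency graph, in which each table comes after every table it references. A minimal sketch of what ForeignKeyOrderer computes, using Kahn's algorithm over a hypothetical FK graph (table names and the dependency dict are illustrative, not the real API):

```python
from collections import deque

# Hypothetical FK graph: table -> set of tables it references,
# which must therefore be loaded first.
fks = {
    "Image": {"Subject"},
    "Subject": {"Study"},
    "Study": set(),
    "Tag": set(),
}

def insertion_order(graph):
    """Kahn's algorithm: emit a table once all tables it references are emitted."""
    pending = {t: set(deps) for t, deps in graph.items()}
    ready = deque(sorted(t for t, deps in pending.items() if not deps))
    order = []
    while ready:
        t = ready.popleft()
        order.append(t)
        for other, deps in pending.items():
            if t in deps:
                deps.discard(t)
                if not deps and other not in order and other not in ready:
                    ready.append(other)
    if len(order) != len(graph):
        # Cyclic references (e.g. self-referential FKs) need special handling.
        raise ValueError("cycle in FK graph")
    return order

print(insertion_order(fks))  # ['Study', 'Tag', 'Subject', 'Image']
```

Study precedes Subject, which precedes Image, so every insert finds its referenced rows already present.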
DataSource

Bases: Protocol

Protocol for data sources that can fill a database.

Implementations provide data for populating SQLite tables from different sources (bags, remote catalogs, etc.).

This is used with DataLoader in Phase 2 of the two-phase pattern.

Source code in src/deriva_ml/model/data_sources.py
@runtime_checkable
class DataSource(Protocol):
    """Protocol for data sources that can fill a database.

    Implementations provide data for populating SQLite tables from
    different sources (bags, remote catalogs, etc.).

    This is used with DataLoader in Phase 2 of the two-phase pattern.
    """

    def get_table_data(
        self,
        table: DerivaTable | str,
    ) -> Iterator[dict[str, Any]]:
        """Yield rows for a table as dictionaries.

        Args:
            table: Table object or name to get data for.

        Yields:
            Dictionary per row with column names as keys.
        """
        ...

    def has_table(self, table: DerivaTable | str) -> bool:
        """Check if this source has data for the table.

        Args:
            table: Table object or name to check.

        Returns:
            True if data is available for this table.
        """
        ...

    def list_available_tables(self) -> list[str]:
        """List tables with available data.

        Returns:
            List of table names (may include schema prefix).
        """
        ...

get_table_data

get_table_data(
    table: Table | str,
) -> Iterator[dict[str, Any]]

Yield rows for a table as dictionaries.

Parameters:

table (Table | str): Table object or name to get data for. Required.

Yields:

dict[str, Any]: Dictionary per row with column names as keys.

Source code in src/deriva_ml/model/data_sources.py
def get_table_data(
    self,
    table: DerivaTable | str,
) -> Iterator[dict[str, Any]]:
    """Yield rows for a table as dictionaries.

    Args:
        table: Table object or name to get data for.

    Yields:
        Dictionary per row with column names as keys.
    """
    ...

has_table

has_table(table: Table | str) -> bool

Check if this source has data for the table.

Parameters:

table (Table | str): Table object or name to check. Required.

Returns:

bool: True if data is available for this table.

Source code in src/deriva_ml/model/data_sources.py
def has_table(self, table: DerivaTable | str) -> bool:
    """Check if this source has data for the table.

    Args:
        table: Table object or name to check.

    Returns:
        True if data is available for this table.
    """
    ...

list_available_tables

list_available_tables() -> list[str]

List tables with available data.

Returns:

list[str]: List of table names (may include schema prefix).

Source code in src/deriva_ml/model/data_sources.py
def list_available_tables(self) -> list[str]:
    """List tables with available data.

    Returns:
        List of table names (may include schema prefix).
    """
    ...

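Because DataSource is a runtime-checkable Protocol, any object with these three methods satisfies it structurally, with no inheritance required. A minimal sketch of a conforming in-memory source (the DictDataSource class and its data are hypothetical; the Protocol is restated from above for a self-contained example):

```python
from typing import Any, Iterator, Protocol, runtime_checkable

@runtime_checkable
class DataSource(Protocol):
    """Restated sketch of the protocol shown above."""
    def get_table_data(self, table) -> Iterator[dict[str, Any]]: ...
    def has_table(self, table) -> bool: ...
    def list_available_tables(self) -> list[str]: ...

class DictDataSource:
    """Hypothetical source backed by an in-memory dict of row lists."""

    def __init__(self, tables: dict[str, list[dict[str, Any]]]):
        self._tables = tables

    def get_table_data(self, table) -> Iterator[dict[str, Any]]:
        yield from self._tables.get(str(table), [])

    def has_table(self, table) -> bool:
        return str(table) in self._tables

    def list_available_tables(self) -> list[str]:
        return list(self._tables)

source = DictDataSource({"Subject": [{"RID": "1"}]})
# runtime_checkable lets isinstance verify the methods are present.
assert isinstance(source, DataSource)
assert list(source.get_table_data("Subject")) == [{"RID": "1"}]
```

A real implementation such as BagDataSource would stream rows from CSV files in a BDBag instead of a dict, but the interface seen by DataLoader is identical.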
DerivaModel

Augmented interface to the deriva model class.

This class provides a number of DerivaML-specific methods that augment the interface of the deriva model class.

Attributes:

model: ERMRest model for the catalog.
catalog (ErmrestCatalog): ERMRest catalog for the model.
hostname: Hostname of the ERMRest server.
ml_schema: The ML schema name for the catalog.
domain_schemas: Frozenset of all domain schema names in the catalog.
default_schema: The default schema for table creation operations.
Source code in src/deriva_ml/model/catalog.py
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
class DerivaModel:
    """Augmented interface to deriva model class.

    This class provides a number of DerivaML specific methods that augment the interface in the deriva model class.

    Attributes:
        model: ERMRest model for the catalog.
        catalog: ERMRest catalog for the model.
        hostname: Hostname of the ERMRest server.
        ml_schema: The ML schema name for the catalog.
        domain_schemas: Frozenset of all domain schema names in the catalog.
        default_schema: The default schema for table creation operations.

    """

    def __init__(
        self,
        model: Model,
        ml_schema: str = ML_SCHEMA,
        domain_schemas: str | set[str] | None = None,
        default_schema: str | None = None,
    ):
        """Create and initialize a DerivaModel instance.

        This method will connect to a catalog and initialize schema configuration.
        This class is intended to be used as a base class on which domain-specific interfaces are built.

        Args:
            model: The ERMRest model for the catalog.
            ml_schema: The ML schema name.
            domain_schemas: Optional explicit set of domain schema names. If None,
                auto-detects all non-system schemas.
            default_schema: The default schema for table creation operations. If None
                and there is exactly one domain schema, that schema is used as default.
                If there are multiple domain schemas, default_schema must be specified.
        """
        self.model = model
        self.configuration = None
        self.catalog: ErmrestCatalog = self.model.catalog
        self.hostname = self.catalog.deriva_server.server if isinstance(self.catalog, ErmrestCatalog) else "localhost"

        self.ml_schema = ml_schema
        self._system_schemas = frozenset(SYSTEM_SCHEMAS | {ml_schema})

        # Determine domain schemas
        if domain_schemas is not None:
            if isinstance(domain_schemas, str):
                domain_schemas = {domain_schemas}
            self.domain_schemas = frozenset(domain_schemas)
        else:
            # Auto-detect all domain schemas
            self.domain_schemas = _get_domain_schemas(self.model.schemas.keys(), ml_schema)

        # Determine default schema for table creation
        if default_schema is not None:
            if default_schema not in self.domain_schemas:
                raise DerivaMLException(
                    f"default_schema '{default_schema}' is not in domain_schemas: {self.domain_schemas}"
                )
            self.default_schema = default_schema
        elif len(self.domain_schemas) == 1:
            # Single domain schema - use it as default
            self.default_schema = next(iter(self.domain_schemas))
        elif len(self.domain_schemas) == 0:
            # No domain schemas - default_schema will be None
            self.default_schema = None
        else:
            # Multiple domain schemas, no explicit default
            self.default_schema = None

    @classmethod
    def from_cached(
        cls,
        schema_dict: dict,
        *,
        catalog,
        ml_schema: str = ML_SCHEMA,
        domain_schemas: "str | set[str] | None" = None,
        default_schema: "str | None" = None,
    ) -> "DerivaModel":
        """Construct a DerivaModel from a cached ermrest /schema dict.

        No network is touched. The ``catalog`` argument is passed to
        deriva-py's ``Model(catalog, model_doc)`` constructor as the
        first positional argument; in offline mode it will be a
        :class:`~deriva_ml.core.catalog_stub.CatalogStub`, in online
        mode it is a real ``ErmrestCatalog``. ``DerivaModel.__init__``
        then reads the catalog back off ``model.catalog`` as usual.

        This replicates what ``Model.fromcatalog(catalog)`` does
        online — the online call fetches
        ``catalog.get("/schema").json()`` and passes the result to
        ``Model(catalog, dict)``. Here we pass in the already-cached
        dict from :class:`~deriva_ml.core.schema_cache.SchemaCache`.

        Args:
            schema_dict: The JSON payload from a previous
                ``catalog.get('/schema').json()`` call, as persisted
                by ``SchemaCache``.
            catalog: The catalog object to associate with the model.
                Pass a real ``ErmrestCatalog`` online, or a
                ``CatalogStub`` offline.
            ml_schema: ML schema name (default ``"deriva-ml"``).
            domain_schemas: Optional explicit set of domain schema
                names. If None, auto-detects all non-system schemas
                from the cached dict.
            default_schema: Optional default schema name.

        Returns:
            A ``DerivaModel`` wrapping a deriva-py ``Model``
            reconstructed from the dict.
        """
        from deriva.core.ermrest_model import Model

        # Model.__init__(catalog, model_doc) stores catalog as
        # self._catalog and exposes it via the .catalog property;
        # DerivaModel.__init__ then reads self.model.catalog.
        model = Model(catalog, schema_dict)
        return cls(
            model,
            ml_schema=ml_schema,
            domain_schemas=domain_schemas,
            default_schema=default_schema,
        )

    def is_system_schema(self, schema_name: str) -> bool:
        """Check if a schema is a system or ML schema.

        Args:
            schema_name: Name of the schema to check.

        Returns:
            True if the schema is a system or ML schema.
        """
        return _is_system_schema(schema_name, self.ml_schema)

    def is_domain_schema(self, schema_name: str) -> bool:
        """Check if a schema is a domain schema.

        Args:
            schema_name: Name of the schema to check.

        Returns:
            True if the schema is a domain schema.
        """
        return schema_name in self.domain_schemas

    def _require_default_schema(self) -> str:
        """Get default schema, raising an error if not set.

        Returns:
            The default schema name.

        Raises:
            DerivaMLException: If default_schema is not set.
        """
        if self.default_schema is None:
            raise DerivaMLException(
                f"No default_schema set. With multiple domain schemas {self.domain_schemas}, "
                "you must either specify a default_schema when creating DerivaML or "
                "pass an explicit schema parameter to this method."
            )
        return self.default_schema

    def refresh_model(self) -> None:
        self.model = self.catalog.getCatalogModel()

    @property
    def chaise_config(self) -> dict[str, Any]:
        """Return the chaise configuration."""
        return self.model.chaise_config

    def get_schema_description(self, include_system_columns: bool = False) -> dict[str, Any]:
        """Return a JSON description of the catalog schema structure.

        Provides a structured representation of the domain and ML schemas including
        tables, columns, foreign keys, and relationships. Useful for understanding
        the data model structure programmatically.

        Args:
            include_system_columns: If True, include RID, RCT, RMT, RCB, RMB columns.
                Default False to reduce output size.

        Returns:
            Dictionary with schema structure:
            {
                "domain_schemas": ["schema_name1", "schema_name2"],
                "default_schema": "schema_name1",
                "ml_schema": "deriva-ml",
                "schemas": {
                    "schema_name": {
                        "tables": {
                            "TableName": {
                                "comment": "description",
                                "is_vocabulary": bool,
                                "is_asset": bool,
                                "is_association": bool,
                                "columns": [...],
                                "foreign_keys": [...],
                                "features": [...]
                            }
                        }
                    }
                }
            }
        """
        system_columns = {"RID", "RCT", "RMT", "RCB", "RMB"}
        result = {
            "domain_schemas": sorted(self.domain_schemas),
            "default_schema": self.default_schema,
            "ml_schema": self.ml_schema,
            "schemas": {},
        }

        # Include all domain schemas and the ML schema
        for schema_name in [*self.domain_schemas, self.ml_schema]:
            schema = self.model.schemas.get(schema_name)
            if not schema:
                continue

            schema_info = {"tables": {}}

            for table_name, table in schema.tables.items():
                # Get columns
                columns = []
                for col in table.columns:
                    if not include_system_columns and col.name in system_columns:
                        continue
                    columns.append(
                        {
                            "name": col.name,
                            "type": str(col.type.typename),
                            "nullok": col.nullok,
                            "comment": col.comment or "",
                        }
                    )

                # Get foreign keys
                foreign_keys = []
                for fk in table.foreign_keys:
                    fk_cols = [c.name for c in fk.foreign_key_columns]
                    ref_cols = [c.name for c in fk.referenced_columns]
                    foreign_keys.append(
                        {
                            "columns": fk_cols,
                            "referenced_table": f"{fk.pk_table.schema.name}.{fk.pk_table.name}",
                            "referenced_columns": ref_cols,
                        }
                    )

                # Get features if this is a domain table
                features = []
                if self.is_domain_schema(schema_name):
                    try:
                        for f in self.find_features(table):
                            features.append(
                                {
                                    "name": f.feature_name,
                                    "feature_table": f.feature_table.name,
                                }
                            )
                    except Exception as e:
                        logger.debug(f"Could not enumerate features for table {table.name}: {e}")

                table_info = {
                    "comment": table.comment or "",
                    "is_vocabulary": self.is_vocabulary(table),
                    "is_asset": self.is_asset(table),
                    "is_association": bool(self.is_association(table)),
                    "columns": columns,
                    "foreign_keys": foreign_keys,
                }
                if features:
                    table_info["features"] = features

                schema_info["tables"][table_name] = table_info

            result["schemas"][schema_name] = schema_info

        return result

    def __getattr__(self, name: str) -> Any:
        # Called only if `name` is not found on DerivaModel. Delegate attribute lookups to the wrapped Model.
        return getattr(self.model, name)

    def name_to_table(self, table: TableInput) -> Table:
        """Return the table object corresponding to the given table name.

        Searches domain schemas first (in sorted order), then ML schema, then WWW.
        If the table name appears in more than one schema, returns the first match.

        Args:
          table: An ERMrest table object or a string that is the name of the table.

        Returns:
          Table object.

        Raises:
          DerivaMLException: If the table doesn't exist in any searchable schema.
        """
        if isinstance(table, Table):
            return table

        # Search domain schemas (sorted for deterministic order), then ML schema, then WWW
        search_order = [*sorted(self.domain_schemas), self.ml_schema, "WWW"]
        for sname in search_order:
            if sname not in self.model.schemas:
                continue
            s = self.model.schemas[sname]
            if table in s.tables:
                return s.tables[table]
        raise DerivaMLException(f"The table {table} doesn't exist.")

    def is_vocabulary(self, table_name: TableInput) -> bool:
        """Check if a given table is a controlled vocabulary table.

        Delegates to ``Table.is_vocabulary()`` in deriva-py, which enforces both
        the required column names AND their types (ermrest_curie, ermrest_uri,
        text, markdown). The type check is stricter than a column-name-only
        check — a table with an ``ID`` column of the wrong type correctly
        returns False here where the legacy name-only implementation would
        have returned True.

        Mirrors :meth:`is_asset`, which already delegates to ``Table.is_asset()``.

        Args:
            table_name: An ERMrest Table object or the name of the table.

        Returns:
            True if the table has the structure of a controlled vocabulary,
            False otherwise.

        Raises:
            DerivaMLException: if the table doesn't exist.
        """
        table = self.name_to_table(table_name)
        return table.is_vocabulary()

    def vocab_columns(self, table_name: TableInput) -> dict[str, str]:
        """Return mapping from canonical vocab column name to actual column name.

        Canonical names are TitleCase (Name, ID, URI, Description, Synonyms).
        Actual names reflect the table's schema — could be lowercase for
        FaceBase-style catalogs or TitleCase for DerivaML-native tables.

        Args:
            table_name: A table object or the name of the table.

        Returns:
            Dict mapping canonical name to actual column name in the table.
            E.g. ``{"Name": "name", "ID": "id", ...}`` for FaceBase tables
            or ``{"Name": "Name", "ID": "ID", ...}`` for DerivaML tables.
        """
        table = self.name_to_table(table_name)
        col_map = {c.name.upper(): c.name for c in table.columns}
        return {canon: col_map[canon.upper()] for canon in ("Name", "ID", "URI", "Description", "Synonyms")}
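The mapping logic above can be exercised on plain strings — a minimal sketch, independent of deriva-py Column objects:

```python
def map_vocab_columns(column_names: list[str]) -> dict[str, str]:
    """Map canonical TitleCase vocab names to the table's actual column names."""
    # Index actual names case-insensitively, then look up each canonical name.
    col_map = {name.upper(): name for name in column_names}
    return {canon: col_map[canon.upper()]
            for canon in ("Name", "ID", "URI", "Description", "Synonyms")}

# FaceBase-style lowercase columns map back to canonical TitleCase keys:
print(map_vocab_columns(["name", "id", "uri", "description", "synonyms"]))
# {'Name': 'name', 'ID': 'id', 'URI': 'uri', 'Description': 'description', 'Synonyms': 'synonyms'}
```

As in the real method, a table missing one of the canonical columns raises a KeyError.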

    def is_association(
        self,
        table_name: str | Table,
        unqualified: bool = True,
        pure: bool = True,
        min_arity: int = 2,
        max_arity: int = 2,
    ) -> bool | set[str] | int:
        """Check the specified table to see if it is an association table.

        Args:
            table_name: param unqualified:
            pure: return: (Default value = True)
            table_name: str | Table:
            unqualified:  (Default value = True)

        Returns:


        """
        table = self.name_to_table(table_name)
        return table.is_association(unqualified=unqualified, pure=pure, min_arity=min_arity, max_arity=max_arity)

    def find_association(self, table1: Table | str, table2: Table | str) -> tuple[Table, Column, Column]:
        """Given two tables, return an association table that connects the two and the two columns used to link them..

        Raises:
            DerivaML exception if there is either not an association table or more than one association table.
        """
        table1 = self.name_to_table(table1)
        table2 = self.name_to_table(table2)

        tables = [
            (a.table, a.self_fkey.columns[0].name, other_key.columns[0].name)
            for a in table1.find_associations(pure=False)
            if len(a.other_fkeys) == 1 and (other_key := a.other_fkeys.pop()).pk_table == table2
        ]

        if len(tables) == 1:
            return tables[0]
        elif len(tables) == 0:
            raise DerivaMLException(f"No association tables found between {table1.name} and {table2.name}.")
        else:
            raise DerivaMLException(
                f"There are {len(tables)} association tables between {table1.name} and {table2.name}."
            )

    def is_asset(self, table_name: TableInput) -> bool:
        """True if the specified table is a proper asset table.

        Delegates to Table.is_asset() from deriva-py which checks:
        - Required columns exist (URL, Filename, Length, MD5)
        - URL, Length, MD5 are NOT NULL
        - URL has the asset annotation

        Args:
            table_name: An ERMrest Table object or the name of the table.

        Returns:
            True if the specified table is a proper asset table.
        """
        table = self.name_to_table(table_name)
        return table.is_asset()

    def find_assets(self, with_metadata: bool = False) -> list[Table]:
        """Return the list of asset tables in the current model"""
        return [t for s in self.model.schemas.values() for t in s.tables.values() if self.is_asset(t)]

    def find_vocabularies(self) -> list[Table]:
        """Return a list of all controlled vocabulary tables in domain and ML schemas."""
        tables = []
        for schema_name in [*self.domain_schemas, self.ml_schema]:
            schema = self.model.schemas.get(schema_name)
            if schema:
                tables.extend(t for t in schema.tables.values() if self.is_vocabulary(t))
        return tables

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def find_features(self, table: TableInput | None = None) -> Iterable[Feature]:
        """List features in the catalog.

        If a table is specified, returns only features for that table.
        If no table is specified, returns all features across all tables in the catalog.

        Args:
            table: Optional table to find features for. If None, returns all features
                in the catalog.

        Returns:
            An iterable of Feature instances describing the features.
        """

        def is_feature(a: FindAssociationResult) -> bool:
            """Check if association represents a feature.

            Args:
                a: Association result to check
            Returns:
                bool: True if association represents a feature
            """
            return {
                "Feature_Name",
                "Execution",
                a.self_fkey.foreign_key_columns[0].name,
            }.issubset({c.name for c in a.table.columns})

        def find_table_features(t: Table) -> list[Feature]:
            """Find all features for a single table."""
            return [
                Feature(a, self) for a in t.find_associations(min_arity=3, max_arity=3, pure=False) if is_feature(a)
            ]

        if table is not None:
            # Find features for a specific table
            return find_table_features(self.name_to_table(table))
        else:
            # Find all features across all domain and ML schema tables
            features: list[Feature] = []
            for schema_name in [*self.domain_schemas, self.ml_schema]:
                schema = self.model.schemas.get(schema_name)
                if schema:
                    for t in schema.tables.values():
                        features.extend(find_table_features(t))
            return features

    def lookup_feature(self, table: TableInput, feature_name: str) -> Feature:
        """Lookup the named feature associated with the provided table.

        Args:
            table: param feature_name:
            table: str | Table:
            feature_name: str:

        Returns:
            A Feature class that represents the requested feature.

        Raises:
          DerivaMLException: If the feature cannot be found.
        """
        table = self.name_to_table(table)
        try:
            return [f for f in self.find_features(table) if f.feature_name == feature_name][0]
        except IndexError:
            raise DerivaMLException(f"Feature {table.name}:{feature_name} doesn't exist.")

    def asset_metadata(self, table: str | Table) -> set[str]:
        """Return the metadata columns for an asset table."""

        table = self.name_to_table(table)

        if not self.is_asset(table):
            raise DerivaMLTableTypeError("asset table", table.name)
        return {c.name for c in table.columns} - DerivaAssetColumns
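A minimal sketch of this set difference, with a literal stand-in for ``DerivaAssetColumns`` (the exact contents of that constant are assumed here, not taken from the source):

```python
# Hypothetical stand-in for DerivaAssetColumns: the system columns plus the
# standard asset columns checked by is_asset() (URL, Filename, Length, MD5).
ASSET_COLUMNS = {"RID", "RCT", "RMT", "RCB", "RMB", "URL", "Filename", "Length", "MD5"}

def metadata_columns(all_columns: list[str]) -> set[str]:
    # Everything that is not a system/asset column counts as user metadata.
    return set(all_columns) - ASSET_COLUMNS

print(metadata_columns(["RID", "URL", "Filename", "Length", "MD5", "Modality"]))  # {'Modality'}
```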

    def asset_metadata_columns(self, table: str | Table) -> list[Column]:
        """Return Column objects for the asset-metadata columns of ``table``.

        Like :meth:`asset_metadata` but returns the :class:`Column`
        instances (not just names) so callers can inspect attributes
        such as ``nullok``. Results are sorted by column name for
        deterministic iteration.

        Args:
            table: Asset table name or Table object.

        Returns:
            Sorted list of Column objects.

        Raises:
            DerivaMLTableTypeError: If ``table`` is not an asset table.
        """
        table = self.name_to_table(table)
        if not self.is_asset(table):
            raise DerivaMLTableTypeError("asset table", table.name)
        return sorted(
            (c for c in table.columns if c.name not in DerivaAssetColumns),
            key=lambda c: c.name,
        )

    def apply(self) -> None:
        """Call ERMRestModel.apply"""
        if self.catalog == "file-system":
            raise DerivaMLException("Cannot apply() to non-catalog model.")
        else:
            self.model.apply()

    def is_dataset_rid(self, rid: RID, deleted: bool = False) -> bool:
        """Check if a given RID is a dataset RID."""
        try:
            rid_info = self.model.catalog.resolve_rid(rid, self.model)
        except KeyError as e:
            raise DerivaMLException(f"Invalid RID {rid}") from e
        if rid_info.table.name != "Dataset":
            return False
        elif deleted:
            # Got a dataset rid. Now check to see if its deleted or not.
            return True
        else:
            return not list(rid_info.datapath.entities().fetch())[0]["Deleted"]

    def list_dataset_element_types(self) -> list[Table]:
        """
        Lists the data types of elements contained within a dataset.

        This method analyzes the dataset and identifies the data types for all
        elements within it. It is useful for understanding the structure and
        content of the dataset and allows for better manipulation and usage of its
        data.

        Returns:
            list[str]: A list of strings where each string represents a data type
            of an element found in the dataset.

        """

        dataset_table = self.name_to_table("Dataset")

        def is_domain_or_dataset_table(table: Table) -> bool:
            return self.is_domain_schema(table.schema.name) or table.name == dataset_table.name

        return [
            t
            for a in dataset_table.find_associations()
            if is_domain_or_dataset_table(t := a.other_fkeys.pop().pk_table)
        ]

    def _is_association_table(self, name_or_table: str | Table) -> bool:
        """Check if a table is an M:N association (link) table.

        An association table (like ``Dataset_Image`` linking ``Dataset``
        and ``Image``) has exactly two domain FKs pointing at the tables
        it links. Denormalization treats such tables as **transparent
        intermediates**: they're joined through but their columns are
        excluded from the output unless the caller explicitly lists them
        in ``include_tables``.

        **Topology, not purity**: association-ness is determined by the
        FK arity alone, not by whether the table also carries metadata
        columns. Real Deriva linkage tables routinely carry annotation
        data (``Role``, ``Ordinal``, ``Comment``, etc.) while remaining
        semantically M:N bridges — the check must permit them. If the
        user wants those metadata columns in the output, they add the
        table to ``include_tables`` and it's no longer treated as
        transparent (see the ``transparent_intermediates`` logic in
        :meth:`Denormalizer.describe`).

        Stricter than ermrest's built-in ``Table.is_association()`` in
        one direction (we ignore the system FKs RCB/RMB → ERMrest_Client,
        so a 3-arg "association" in ermrest's eyes is usually a real
        M:N table in ours), looser in another (we don't require purity).

        Extracted from a nested function in :meth:`_build_join_tree` so
        the denormalization planner can also use it.

        Args:
            name_or_table: table name (looked up via
                :meth:`name_to_table`) or a :class:`Table` instance.

        Returns:
            ``True`` if the table has exactly 2 domain FKs.

        Example::

            model._is_association_table("Dataset_Image")       # True
            model._is_association_table("Dataset_Image_Role")  # True — extra Role col OK
            model._is_association_table("Image")               # False (has ≤1 FK)
            model._is_association_table("Observation")         # False (has 1 FK)
        """
        try:
            tbl = name_or_table if hasattr(name_or_table, "foreign_keys") else self.name_to_table(name_or_table)
            fks = list(tbl.foreign_keys)
            # Domain FKs exclude the system FKs to ERMrest_Client /
            # ERMrest_Group that every table carries (for RCB/RMB).
            domain_fks = [fk for fk in fks if fk.pk_table.name not in ("ERMrest_Client", "ERMrest_Group")]
            # Association-ness is pure FK-arity topology. Metadata
            # columns on the link table (Role, Ordinal, etc.) don't
            # disqualify it — the user can pull them into output by
            # naming the table in include_tables.
            return len(domain_fks) == 2
        except Exception:
            return False
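The arity test reduces to a filter and a count. Here is a self-contained sketch with a lightweight stand-in for deriva-py foreign-key objects (the `FK` dataclass is hypothetical; only the filtering logic mirrors the method above):

```python
from dataclasses import dataclass

@dataclass
class FK:
    pk_table_name: str  # name of the table this foreign key points at

def is_association(fks: list[FK]) -> bool:
    # Ignore the system FKs (RCB/RMB) to ERMrest_Client / ERMrest_Group;
    # a table is an M:N bridge iff exactly two domain FKs remain.
    domain = [fk for fk in fks if fk.pk_table_name not in ("ERMrest_Client", "ERMrest_Group")]
    return len(domain) == 2

print(is_association([FK("Dataset"), FK("Image"), FK("ERMrest_Client")]))  # True
print(is_association([FK("Subject"), FK("ERMrest_Client")]))               # False
```

Note that metadata columns never enter the decision: only foreign-key topology does.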

    def _fk_neighbors(self, table: str | Table) -> set[Table]:
        """Return FK-neighbor tables of *table* (outbound + inbound, deduplicated).

        The undirected FK-adjacency primitive used by schema traversal.
        Follows both ``table.foreign_keys`` (outbound: tables *table*
        points at) and ``table.referenced_by`` (inbound: tables that
        point at *table*), filters to valid schemas (``domain_schemas ∪
        {ml_schema}``), and deduplicates so that multiple FKs between
        the same two tables count as one edge.

        **Direction-agnostic**: use :meth:`_downstream_fk_sources` for
        the directional (inbound-only) variant when you need to
        distinguish upstream from downstream.

        Extracted from a nested ``find_arcs`` in :meth:`_schema_to_paths`
        so the denormalization planner can reuse it as the FK-traversal
        primitive.

        Args:
            table: table name (looked up via :meth:`name_to_table`) or
                :class:`Table` instance.

        Returns:
            Set of :class:`Table` objects reachable from *table* via one
            FK arc (either direction), deduplicated by target.

        Example::

            # For Image, which has Image.Subject → Subject and is
            # referenced by Dataset_Image.Image:
            model._fk_neighbors("Image")
            # {<Table Subject>, <Table Dataset_Image>}
        """
        tbl = table if hasattr(table, "foreign_keys") else self.name_to_table(table)
        valid_schemas = self.domain_schemas | {self.ml_schema}
        # Outbound edges: tables this table's FKs point at.
        # Inbound edges: tables that have FKs pointing at this table.
        arc_list = [fk.pk_table for fk in tbl.foreign_keys] + [fk.table for fk in tbl.referenced_by]
        # Filter out system/auxiliary schemas (ERMrest_Client, public, etc.).
        arc_list = [t for t in arc_list if t.schema.name in valid_schemas]
        # Deduplicate: multi-FK targets (e.g., two FKs pointing at the
        # same table) should count as one neighbor; the set constructor
        # handles this directly. Downstream callers handle specific FK
        # selection via :meth:`_table_relationship`.
        return set(arc_list)

    def _build_join_tree(
        self,
        element_name: str,
        include_tables: set[str],
        all_paths: list[list[Table]],
        via: set[str] | None = None,
    ) -> JoinNode:
        """Build a JoinTree rooted at *element_name* that reaches all *include_tables*.

        The algorithm:

        1. Collect all FK paths from `_schema_to_paths()` that start at the element
           table and end at a table in *include_tables*.
        2. For each target table, pick the SHORTEST sub-path from the element.
           If a longer path exists but ALL its intermediates are in *include_tables*,
           prefer it (user disambiguated).  If multiple equally-short paths exist
           and cannot be disambiguated, raise an ambiguity error.
        3. Merge the selected paths into a tree rooted at the element.
        4. Mark association tables (``is_association=True``) so their columns are
           excluded from output but they are still JOINed through.
        5. Set ``join_type="left"`` when the FK column is nullable.

        Args:
            element_name: The dataset element table (tree root), e.g. ``"Image"``.
            include_tables: Set of table names the user wants in the output.
            all_paths: All FK paths from ``_schema_to_paths()``.
            via: Optional set of table names the caller passed as
                ``via=`` — path-only routing hints. Intermediates in
                this set count as "covered" during disambiguation so the
                user can route through an intermediate without adding
                its columns to the output.

        Returns:
            A ``JoinNode`` tree rooted at the element table.

        Raises:
            DerivaMLException: If ambiguous paths cannot be resolved.
        """
        via = via or set()
        covering = include_tables | via
        element_table = self.name_to_table(element_name)

        # ── Step 1: collect sub-paths from element to each include_table ─────
        # Each "all_path" has the structure [Dataset, assoc, element, ..., endpoint].
        # We extract the sub-path starting from the element: [element, ..., endpoint].
        subpaths_by_target: dict[str, list[list[Table]]] = defaultdict(list)

        for path in all_paths:
            if len(path) < 3:
                continue
            if path[2].name != element_name:
                continue
            endpoint = path[-1].name
            if endpoint not in include_tables:
                continue
            # Sub-path from element onward
            sub = path[2:]  # [element, ..., endpoint]
            subpaths_by_target[endpoint].append(sub)

        # The element itself (self-path of length 1)
        if element_name in include_tables:
            subpaths_by_target.setdefault(element_name, []).append([element_table])

        # ── Step 2: for each target, pick the best path ──────────────────────
        selected_subpaths: dict[str, list[Table]] = {}

        for target, subpaths in subpaths_by_target.items():
            if target == element_name:
                # Self-path: no join needed
                selected_subpaths[target] = [element_table]
                continue

            # Deduplicate by table-name signature
            seen_sigs: set[tuple[str, ...]] = set()
            unique: list[list[Table]] = []
            for sp in subpaths:
                sig = tuple(t.name for t in sp)
                if sig not in seen_sigs:
                    seen_sigs.add(sig)
                    unique.append(sp)

            if len(unique) == 1:
                selected_subpaths[target] = unique[0]
                continue

            # Multiple paths — disambiguate.
            # Intermediates are tables between element (sp[0]) and endpoint (sp[-1]).
            path_intermediates = [tuple(t.name for t in sp[1:-1]) for sp in unique]

            # If all have identical intermediates, no ambiguity
            if len(set(path_intermediates)) <= 1:
                selected_subpaths[target] = unique[0]
                continue

            # A path is "covered" if all its non-association intermediates
            # are in include_tables or via.  Association tables (M:N link
            # tables) are infrastructure that the user shouldn't need to
            # name explicitly — they are transparently included in the
            # join chain.
            #
            # Association tables are detected via ``self._is_association_table``
            # (which ignores ERMrest system FKs).

            def _intermediates_covered(sp: list[Table], ints: tuple[str, ...]) -> bool:
                sp_tables = {t.name: t for t in sp}
                for t in ints:
                    if t in covering:
                        # In include_tables OR in via= — explicitly routed.
                        continue
                    tbl = sp_tables.get(t)
                    if tbl is not None and self._is_association_table(tbl):
                        continue  # transparent — doesn't need to be in include_tables
                    return False
                return True

            fully_covered = [
                (sp, ints) for sp, ints in zip(unique, path_intermediates) if _intermediates_covered(sp, ints)
            ]

            if len(fully_covered) == 1:
                # Exactly one covered path. Whether it is direct (no
                # intermediates) or routed through covered intermediates,
                # it is the unique valid choice.
                selected_subpaths[target] = fully_covered[0][0]
                continue

            if len(fully_covered) > 1:
                # Multiple fully-covered paths
                has_explicit = [(sp, ints) for sp, ints in fully_covered if len(ints) > 0]
                if len(has_explicit) == 1:
                    selected_subpaths[target] = has_explicit[0][0]
                    continue
                elif len(has_explicit) == 0:
                    # All direct paths — pick shortest
                    shortest = min(fully_covered, key=lambda x: len(x[0]))
                    selected_subpaths[target] = shortest[0]
                    continue
                else:
                    # Multiple explicit — prefer longest (most specific)
                    max_ints = max(len(ints) for _, ints in has_explicit)
                    longest = [sp for sp, ints in has_explicit if len(ints) == max_ints]
                    if len(longest) == 1:
                        selected_subpaths[target] = longest[0]
                        continue

            if len(fully_covered) == 0:
                # No path is fully covered.  Check if direct path exists.
                direct = [sp for sp, ints in zip(unique, path_intermediates) if len(ints) == 0]
                if len(direct) == 1:
                    selected_subpaths[target] = direct[0]
                    continue

            # Ambiguity error
            path_descriptions = []
            all_ints: set[str] = set()
            for sp, ints in zip(unique, path_intermediates):
                names = [t.name for t in sp]
                path_descriptions.append(" → ".join(names))
                all_ints.update(ints)

            suggestion_tables = all_ints - include_tables
            suggestion = ""
            if suggestion_tables:
                suggestion = (
                    f"\nInclude an intermediate table to disambiguate "
                    f"(e.g., add {', '.join(sorted(suggestion_tables))} to include_tables)."
                )

            raise DerivaMLException(
                f"Ambiguous path between {element_name} and {target}: "
                f"found {len(unique)} FK paths:\n" + "\n".join(f"  {d}" for d in path_descriptions) + suggestion
            )

        # ── Step 3: merge selected paths into a tree ─────────────────────────
        # Build the tree by inserting each selected sub-path into the tree.
        root = JoinNode(
            table=element_table,
            table_name=element_name,
            join_type="inner",
            fk_columns=None,
            is_association=bool(self.is_association(element_name)),
            children=[],
        )

        # Map table_name -> JoinNode for quick lookup during tree building
        node_map: dict[str, JoinNode] = {element_name: root}

        for target, subpath in selected_subpaths.items():
            if target == element_name:
                continue
            # subpath = [element, ..intermediate.., target]
            # Walk the subpath, creating nodes as needed
            for i in range(1, len(subpath)):
                child_table = subpath[i]
                child_name = child_table.name
                parent_table = subpath[i - 1]
                parent_name = parent_table.name

                if child_name in node_map:
                    continue  # Already in tree

                # Get FK column pairs
                col_pairs = self._table_relationship(parent_table, child_table)

                # Determine join type: LEFT when any FK column is nullable.
                join_type = "left" if any(fk_col.nullok for fk_col, _ in col_pairs) else "inner"

                node = JoinNode(
                    table=child_table,
                    table_name=child_name,
                    join_type=join_type,
                    fk_columns=col_pairs,
                    is_association=bool(self.is_association(child_name)),
                    children=[],
                )
                node_map[child_name] = node
                # Attach to parent
                if parent_name in node_map:
                    node_map[parent_name].children.append(node)
                else:
                    # Parent not yet in tree — this shouldn't happen since we
                    # process paths from element outward, but handle gracefully
                    logger.warning(f"Parent {parent_name} not in tree when adding {child_name}")

        return root
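The prefix-sharing merge in Step 3 can be sketched independently of ERMrest on plain dicts. This is a minimal illustration with hypothetical table names, not part of this class:

```python
def merge_paths(root: str, subpaths: list[list[str]]) -> dict:
    """Merge root-anchored paths into a nested child mapping,
    creating each table's node once so shared prefixes deduplicate."""
    tree: dict = {}
    node_map = {root: tree}
    for path in subpaths:
        for parent, child in zip(path, path[1:]):
            if child in node_map:  # already placed in the tree
                continue
            node_map[child] = {}
            node_map[parent][child] = node_map[child]
    return {root: tree}


merge_paths("Image", [["Image", "Subject"], ["Image", "Dataset_Image", "Dataset"]])
# {'Image': {'Subject': {}, 'Dataset_Image': {'Dataset': {}}}}
```

As in the method above, a child already present in the node map is skipped, so paths sharing a prefix reuse the existing nodes instead of duplicating them.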

    # ------------------------------------------------------------------
    # Denormalization planner helpers (Rules 2, 5, 6)
    #
    # These methods largely compose ``_fk_neighbors`` /
    # ``_schema_to_paths`` / ``_is_association_table``; the only new FK
    # walk is the association hop inside ``_outbound_reachable``.
    # ------------------------------------------------------------------

    def _downstream_fk_sources(self, table: str | Table) -> set[Table]:
        """Return tables that have an FK pointing AT *table* (directional downstream).

        Denormalization direction vocabulary:

        - **Upstream** = fewer rows per unit. Subject is upstream of Image
          because each Image has exactly one Subject.
        - **Downstream** = more rows per unit. Image is downstream of
          Subject because each Subject can have many Images.

        In ERMrest terms: if ``Image.Subject`` is an FK pointing at
        ``Subject.RID``, then Image is downstream of Subject — which
        means Image is in ``Subject.referenced_by``.

        This method returns direct downstream neighbors only — it does
        NOT do transparent association-table hopping. Callers that need
        "all reachable downstream tables, hopping through associations"
        should use :meth:`_outbound_reachable`.

        Compare with :meth:`_fk_neighbors`, which is direction-agnostic
        and returns both upstream and downstream neighbors.

        Args:
            table: table name (looked up via :meth:`name_to_table`) or
                :class:`Table` instance.

        Returns:
            Set of :class:`Table` objects whose FK points at *table*,
            filtered to the valid schemas (``domain_schemas ∪
            {ml_schema}``).

        Example::

            # Subject is pointed at by Image.Subject and Observation.Subject:
            model._downstream_fk_sources("Subject")
            # {<Table Image>, <Table Observation>}

            # Image is pointed at by Dataset_Image.Image:
            model._downstream_fk_sources("Image")
            # {<Table Dataset_Image>}
        """
        valid_schemas = self.domain_schemas | {self.ml_schema}
        tbl = table if hasattr(table, "foreign_keys") else self.name_to_table(table)
        targets: set[Table] = set()
        # Tables with FK pointing at us are downstream
        for fk in tbl.referenced_by:
            src = fk.table
            if src.schema.name not in valid_schemas:
                continue
            targets.add(src)
        return targets

    def _outbound_reachable(
        self,
        from_table: str,
        tables_in_set: set[str],
    ) -> set[str]:
        """Return tables in ``tables_in_set`` downstream of ``from_table``.

        BFS reachability over the FK graph in the one-to-many direction.
        Composes :meth:`_downstream_fk_sources` plus association-
        transparency logic — does NOT walk FKs directly.

        **Transparent association hops**: when the walker hits an
        association table (per :meth:`_is_association_table`) that isn't
        in ``tables_in_set``, it hops through it in BOTH directions —
        both the tables that point at the association (inbound) AND the
        tables the association's FKs point at (outbound). This lets
        ``A → assoc → B`` discover B from A even when A → assoc is an
        inbound FK and assoc → B is an outbound FK. Without this
        bidirectional hop, many-to-many relationships (Dataset ↔ Image
        via Dataset_Image) wouldn't be traversable.

        **Direction matters**: with ``Image.Subject → Subject.RID``:

        - ``_outbound_reachable('Subject', {'Image','Subject'})`` returns
          ``{'Image'}`` (Image is downstream of Subject).
        - ``_outbound_reachable('Image', {'Image','Subject'})`` returns
          ``set()`` (Subject is UPSTREAM of Image, not downstream).

        Args:
            from_table: starting table (the "upstream" side of the
                one-to-many relationship).
            tables_in_set: the subgraph — only tables in this set count
                as "destinations" in the result. Association tables
                outside the set are still traversable (transparent).

        Returns:
            Set of names in ``tables_in_set`` downstream of
            ``from_table`` (excluding ``from_table`` itself).

        Example::

            # Given schema: Image.Subject → Subject, Dataset ← Dataset_Image → Image
            subgraph = {"Image", "Subject"}
            model._outbound_reachable("Subject", subgraph)  # {"Image"}
            model._outbound_reachable("Image", subgraph)    # set()

            # With Dataset_Image as a transparent hop:
            subgraph = {"Dataset", "Image"}
            model._outbound_reachable("Dataset", subgraph)  # {"Image"}
        """
        seen_names: set[str] = set()
        visited: set[str] = set()
        stack: list[str] = [from_table]
        while stack:
            t = stack.pop()
            if t in visited:
                continue
            visited.add(t)
            try:
                tbl = self.name_to_table(t)
            except Exception:
                continue

            # When the current node is itself an association table AND it's
            # not the starting point, hop through both directions: both the
            # tables that point at it (referenced_by) AND the tables it
            # points to (foreign_keys). This is the "transparent bridge"
            # semantics — M:N link tables should be traversable in both
            # directions so that A→assoc→B discovers B from A.
            hopping_through_association = t != from_table and self._is_association_table(tbl)

            neighbors: list[Table] = list(self._downstream_fk_sources(t))
            if hopping_through_association:
                # Add the association's outbound FK targets (the "other
                # side" of the M:N link) so we can see past the bridge.
                valid_schemas = self.domain_schemas | {self.ml_schema}
                for fk in tbl.foreign_keys:
                    nxt = fk.pk_table
                    if nxt.schema.name in valid_schemas:
                        neighbors.append(nxt)

            for neighbor in neighbors:
                target_name = neighbor.name
                if target_name == from_table:
                    continue
                if target_name in tables_in_set:
                    seen_names.add(target_name)
                    # Continue only if this is itself an association (transparent)
                    if self._is_association_table(neighbor):
                        stack.append(target_name)
                elif self._is_association_table(neighbor):
                    # Transparent hop: continue through the association
                    stack.append(target_name)
                # else: non-requested, non-association — dead end
        # seen_names already excludes from_table and contains only
        # names from tables_in_set, so it can be returned directly.
        return seen_names
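The bidirectional transparent-hop semantics can be reproduced on a toy adjacency mapping. A minimal sketch, assuming a hand-built `downstream` map with the association already expanded in both directions (all names hypothetical):

```python
def outbound_reachable(start, in_set, downstream, assoc):
    """Names in `in_set` reachable from `start`, treating association
    nodes as transparent bridges even when outside `in_set`."""
    found, visited, stack = set(), set(), [start]
    while stack:
        node = stack.pop()
        if node in visited:
            continue
        visited.add(node)
        for nxt in downstream.get(node, ()):
            if nxt == start:
                continue
            if nxt in in_set:
                found.add(nxt)
                if nxt in assoc:  # in-set association: keep walking
                    stack.append(nxt)
            elif nxt in assoc:    # transparent hop through the bridge
                stack.append(nxt)
    return found


# Toy schema: Image.Subject -> Subject, plus Dataset <-> Image linked
# by the association Dataset_Image (expanded in both directions).
downstream = {
    "Subject": {"Image"},
    "Dataset": {"Dataset_Image"},
    "Dataset_Image": {"Image", "Dataset"},
}
assoc = {"Dataset_Image"}

outbound_reachable("Subject", {"Image", "Subject"}, downstream, assoc)  # {'Image'}
outbound_reachable("Dataset", {"Dataset", "Image"}, downstream, assoc)  # {'Image'}
```

Note how direction matters: starting from `"Image"`, no entry in the map leads anywhere, so the result is empty, mirroring the Subject-is-upstream example in the docstring.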

    def _find_sinks(
        self,
        include_tables: list[str],
        via: list[str] | None = None,
    ) -> list[str]:
        """Find sinks in the FK subgraph on ``include_tables ∪ via`` (Rule 2).

        A **sink** is a table in ``include_tables`` with no outbound FK
        (in the one-to-many / downstream sense) to any other table in
        the set. Intuition: the "deepest" table in the requested join —
        its rows reference the others (directly or transitively), while
        nothing in the set references it, so no tables lie downstream.
        In star-schema denormalization, the sink is
        the natural ``row_per`` — one output row per sink row, with
        upstream columns hoisted.

        Composes :meth:`_outbound_reachable`; does not traverse FKs
        itself.

        Args:
            include_tables: requested tables — only these are candidates
                for the sink role (``via`` tables don't contribute columns).
            via: optional additional tables that participate in the
                subgraph for routing but aren't sink candidates.

        Returns:
            Sorted list of sink table names. Normally exactly one.
            Multiple sinks → caller should raise
            :class:`DerivaMLDenormalizeMultiLeaf`. Zero sinks → cycle,
            caller should raise :class:`DerivaMLDenormalizeNoSink`.

        Example::

            # Chain Subject ← Observation ← Image → sink is Image
            model._find_sinks(["Subject", "Observation", "Image"])
            # ["Image"]

            # Unrelated tables → multi-leaf (both are sinks)
            model._find_sinks(["Dataset", "Subject"])
            # ["Dataset", "Subject"]
        """
        via = via or []
        all_tables = set(include_tables) | set(via)
        # A sink is a requested table whose outbound-reach set, minus
        # itself, is empty — i.e., nothing else in the subgraph is
        # downstream of it.
        return sorted(
            t for t in all_tables if t in include_tables and not (self._outbound_reachable(t, all_tables) - {t})
        )
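Sink-finding reduces to a reachability test on the requested subgraph. A minimal sketch on a plain adjacency mapping (hypothetical names, independent of the ERMrest model):

```python
def find_sinks(include_tables, downstream):
    """A table is a sink when nothing else in the subgraph is
    (transitively) downstream of it."""
    tables = set(include_tables)

    def reach(start):
        seen, stack = set(), [start]
        while stack:
            for nxt in downstream.get(stack.pop(), ()):
                if nxt in tables and nxt not in seen:
                    seen.add(nxt)
                    stack.append(nxt)
        return seen - {start}

    return sorted(t for t in tables if not reach(t))


# Chain Subject <- Observation <- Image (FKs point left): Image is
# the unique sink.
chain = {"Subject": {"Observation"}, "Observation": {"Image"}}
find_sinks(["Subject", "Observation", "Image"], chain)  # ['Image']

# Unrelated tables: both are sinks (the multi-leaf error case).
find_sinks(["Dataset", "Subject"], chain)  # ['Dataset', 'Subject']
```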

    def _determine_row_per(
        self,
        include_tables: list[str],
        via: list[str] | None,
        row_per: str | None,
    ) -> str:
        """Resolve the ``row_per`` table, implementing Rules 2 and 5.

        Two paths:

        - **Explicit** (``row_per`` not None): validate the caller's
          choice. ``row_per`` must be in ``include_tables``, and no
          table in ``include_tables`` may be downstream of it (Rule 5 —
          that would require aggregation, which the current engine
          doesn't do).
        - **Auto-infer** (``row_per is None``): apply Rule 2 via
          sink-finding. Expect exactly one sink.

        Args:
            include_tables: requested tables.
            via: optional path-only tables.
            row_per: caller's explicit leaf, or None to auto-infer.

        Returns:
            The resolved ``row_per`` table name — guaranteed to be in
            ``include_tables`` and free of downstream conflicts.

        Raises:
            ValueError: ``row_per`` is not in ``include_tables``.
            DerivaMLDenormalizeDownstreamLeaf: explicit ``row_per`` has
                downstream table(s) in ``include_tables`` (Rule 5).
            DerivaMLDenormalizeNoSink: no sink found (FK cycle in the
                subgraph — pathological).
            DerivaMLDenormalizeMultiLeaf: auto-inference finds more
                than one candidate sink (Rule 2).

        Example::

            model._determine_row_per(
                include_tables=["Subject", "Image"], via=[], row_per=None
            )
            # "Image" (auto-inferred — Image is the sink)

            # Rule 5: Subject with Image downstream is rejected.
            model._determine_row_per(
                include_tables=["Subject", "Image"], via=[], row_per="Subject"
            )
            # raises DerivaMLDenormalizeDownstreamLeaf
        """
        from deriva_ml.core.exceptions import (
            DerivaMLDenormalizeDownstreamLeaf,
            DerivaMLDenormalizeMultiLeaf,
            DerivaMLDenormalizeNoSink,
        )

        via = via or []
        all_tables = set(include_tables) | set(via)

        if row_per is not None:
            if row_per not in include_tables:
                raise ValueError(f"row_per={row_per!r} must be in include_tables={include_tables}")
            downstream = self._outbound_reachable(row_per, all_tables)
            downstream_in_inc = [t for t in include_tables if t in downstream and t != row_per]
            if downstream_in_inc:
                raise DerivaMLDenormalizeDownstreamLeaf(
                    row_per=row_per,
                    downstream_tables=sorted(downstream_in_inc),
                )
            return row_per

        sinks = self._find_sinks(include_tables, via)
        if not sinks:
            raise DerivaMLDenormalizeNoSink(
                f"No sink found in include_tables={include_tables}. The FK subgraph may contain a cycle."
            )
        if len(sinks) > 1:
            raise DerivaMLDenormalizeMultiLeaf(
                candidates=sinks,
                include_tables=list(include_tables),
            )
        return sinks[0]
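Rules 2 and 5 can be demonstrated together on the same toy representation. A hedged sketch, not the class's actual implementation (which raises the dedicated DerivaML exception types rather than plain `ValueError`):

```python
def determine_row_per(include_tables, downstream, row_per=None):
    """Validate an explicit leaf (Rule 5) or auto-infer the unique
    sink (Rule 2) on a toy downstream-adjacency mapping."""
    tables = set(include_tables)

    def reach(start):  # transitive downstream reach within the subgraph
        seen, stack = set(), [start]
        while stack:
            for nxt in downstream.get(stack.pop(), ()):
                if nxt in tables and nxt not in seen:
                    seen.add(nxt)
                    stack.append(nxt)
        return seen - {start}

    if row_per is not None:
        if row_per not in tables:
            raise ValueError(f"{row_per!r} must be in include_tables")
        conflicts = sorted(reach(row_per))
        if conflicts:  # Rule 5: the leaf may not have downstream tables
            raise ValueError(f"{row_per!r} has downstream tables {conflicts}")
        return row_per
    sinks = sorted(t for t in tables if not reach(t))
    if len(sinks) != 1:  # Rule 2 requires exactly one sink
        raise ValueError(f"expected exactly one sink, found {sinks}")
    return sinks[0]


determine_row_per(["Subject", "Image"], {"Subject": {"Image"}})  # 'Image'
```

Passing `row_per="Subject"` against the same map raises, because Image is downstream of Subject, which mirrors the Rule 5 rejection in the docstring example.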

    def _enumerate_paths(
        self,
        from_table: str,
        to_table: str,
        tables_in_set: set[str],
        max_depth: int = 6,
    ) -> list[list[str]]:
        """Enumerate simple FK paths from ``from_table`` to ``to_table``.

        **Delegates the DFS** to :meth:`_schema_to_paths` (the
        authoritative FK-graph enumerator — handles cycle detection,
        vocabulary termination, schema filtering, and multi-FK
        deduplication). Uses its ``stop_at`` kwarg so inner recursion
        frames can prune eagerly rather than emitting all prefixes and
        filtering at the top. **Do NOT write a fresh DFS here.**

        The only additional work is a **transparency filter**: a path
        is kept only if every intermediate table (non-endpoint nodes)
        is either in ``tables_in_set`` (the user's requested /
        via-routed set) or is a pure association table (which acts as
        a transparent bridge).

        Args:
            from_table: path start.
            to_table: path end.
            tables_in_set: ``include_tables ∪ via``. Paths passing
                through tables NOT in this set are accepted only if
                every intermediate is a pure association table.
            max_depth: forwarded to :meth:`_schema_to_paths` as a
                safety cap against pathological schemas.

        Returns:
            List of paths, each a list of table-name strings starting
            with ``from_table`` and ending with ``to_table``. Empty if
            no transparent-valid path exists.

        Example::

            # Diamond schema: Image → Subject direct AND Image → Observation → Subject.
            # With Observation in the set, both paths are valid:
            model._enumerate_paths("Image", "Subject", {"Image", "Subject", "Observation"})
            # [["Image", "Subject"], ["Image", "Observation", "Subject"]]

            # With only Image and Subject in the set, the multi-hop path
            # requires Observation as intermediate but it's not in the
            # set and not an association → only the direct path survives:
            model._enumerate_paths("Image", "Subject", {"Image", "Subject"})
            # [["Image", "Subject"]]
        """
        # Delegate the DFS — stop_at tells _schema_to_paths to only
        # keep paths ending at to_table (inner frames can prune early).
        paths = self._schema_to_paths(
            root=self.name_to_table(from_table),
            max_depth=max_depth,
            stop_at=to_table,
        )
        result: list[list[str]] = []
        for path in paths:
            names = [t.name for t in path]
            # Transparency filter: every intermediate must be either
            # requested (in tables_in_set) or a pure association.
            if all(mid in tables_in_set or self._is_association_table(mid) for mid in names[1:-1]):
                result.append(names)
        return result
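The transparency filter is a one-liner once paths are plain name lists. A minimal sketch with hypothetical names:

```python
def transparency_filter(paths, tables_in_set, associations):
    """Keep only paths whose every intermediate is requested or is a
    transparent association table."""
    return [
        p for p in paths
        if all(mid in tables_in_set or mid in associations for mid in p[1:-1])
    ]


diamond = [["Image", "Subject"], ["Image", "Observation", "Subject"]]
transparency_filter(diamond, {"Image", "Subject", "Observation"}, set())
# both paths survive
transparency_filter(diamond, {"Image", "Subject"}, set())
# [['Image', 'Subject']], only the direct path survives
```

An association intermediate passes even when unrequested: a path like `Dataset -> Dataset_Image -> Image` survives with only `Dataset` and `Image` in the set, provided `Dataset_Image` is in `associations`.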

    def _find_path_ambiguities(
        self,
        row_per: str,
        include_tables: list[str],
        via: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """Enumerate path ambiguities between ``row_per`` and other requested tables (Rule 6).

        For each ``T`` in ``include_tables ∪ via`` (``T ≠ row_per``),
        enumerate all simple FK paths between ``row_per`` and ``T``
        using :meth:`_schema_to_paths` (full undirected DFS — we do
        NOT apply the transparency filter here, because we need to see
        the full picture to detect diamonds the user hasn't yet
        disambiguated).

        **User-signal disambiguation**: a path is considered "signaled"
        by the user if at least one of its intermediate tables is in
        ``include_tables ∪ via`` (pure association tables don't count —
        those are transparent). If exactly one path is signaled, the
        user has picked it and there's no ambiguity. If zero or >1 are
        signaled, we cannot silently choose, so an ambiguity is
        reported.

        This is distinct from :meth:`_enumerate_paths`, which applies
        the transparency filter to produce only "routable" paths given
        the current set. Here we want to see ALL candidates so we can
        warn about the diamond.

        Args:
            row_per: the leaf table (resolved earlier by
                :meth:`_determine_row_per`).
            include_tables: tables whose paths to ``row_per`` are checked.
            via: additional tables whose paths are checked (their columns
                aren't in the output, but they still participate in
                disambiguation).

        Returns:
            List of ambiguity dicts — empty when no ambiguities are
            detected. Each dict has:

            - ``from_table``: always ``row_per``.
            - ``to_table``: the ``T`` with multiple paths.
            - ``paths``: list of path lists (each path a list of table
              names, first element ``row_per``, last element ``T``).
            - ``suggested_intermediates``: non-endpoint tables that
              appear in at least one path but are not in
              ``include_tables`` and are not pure association tables
              — user could add any of these to ``include_tables`` or
              ``via`` to disambiguate.

        Example::

            # Diamond: Image→Subject direct AND Image→Observation→Subject.
            model._find_path_ambiguities(
                row_per="Image", include_tables=["Image", "Subject"]
            )
            # [{"from_table": "Image", "to_table": "Subject",
            #   "paths": [["Image", "Subject"],
            #             ["Image", "Observation", "Subject"]],
            #   "suggested_intermediates": ["Observation"]}]

            # Once Observation is added to include_tables, it "signals"
            # the multi-hop path → no ambiguity:
            model._find_path_ambiguities(
                row_per="Image", include_tables=["Image", "Observation", "Subject"]
            )
            # []
        """
        via = via or []
        all_tables = set(include_tables) | set(via)
        ambiguities: list[dict[str, Any]] = []

        for t in sorted(all_tables):
            if t == row_per:
                continue
            # Enumerate ALL simple paths (no transparency filter) — we need
            # the full picture to detect diamonds even when the user has not
            # requested the intermediate table.
            #
            # Note: we intentionally do NOT call ``_enumerate_paths`` here.
            # That helper applies a transparency filter (intermediates must
            # be requested or be association tables), which would mask the
            # very diamonds this rule must warn about. ``_enumerate_paths``
            # is for consumers who want only "routable" paths given the
            # current include_tables/via set.
            all_path_tables = self._schema_to_paths(
                root=self.name_to_table(row_per),
                max_depth=6,
                stop_at=t,
            )
            all_paths_named: list[list[str]] = [[tbl.name for tbl in p] for p in all_path_tables]
            unique = list({tuple(p): p for p in all_paths_named}.values())
            if len(unique) <= 1:
                continue

            # Monotonic-direction filter for diamond detection:
            # A genuine diamond has MULTIPLE paths that each constitute a
            # valid FK join chain — all-outbound hops (note: "downstream"
            # in this filter means the outbound-FK direction, the reverse
            # of the row-multiplicity sense in ``_downstream_fk_sources``),
            # with association tables acting as transparent bridges. Paths that
            # change direction at an interior vertex are common-neighbor
            # shortcuts, not join alternatives. For example, with::
            #
            #     Image.Observation → Observation  (direct FK)
            #     Image.Subject → Subject           (direct FK)
            #     Observation.Subject → Subject     (direct FK)
            #
            # the undirected walk ``Image → Subject → Observation`` hops
            # Image.Subject downstream then Observation.Subject UPSTREAM
            # (Subject is a shared neighbor). This does not represent an
            # FK chain from Image to Observation — it represents a
            # co-occurrence via shared Subject, which is a materially
            # different query. We exclude such paths from ambiguity
            # detection so the direct FK Image→Observation isn't
            # spuriously flagged.
            #
            # Association tables remain transparent: the walker handles
            # them correctly via ``_is_association_table`` check inside
            # the direction test.
            def _edge_direction(a: str, b: str) -> str | None:
                """Return 'down' if a has a direct FK to b (outbound from
                a); 'up' if b has a direct FK to a (inbound to a); None
                if there's no direct FK between them."""
                try:
                    ta = self.name_to_table(a)
                    tb = self.name_to_table(b)
                except Exception:
                    return None
                for fk in ta.foreign_keys:
                    if fk.pk_table == tb:
                        return "down"
                for fk in tb.foreign_keys:
                    if fk.pk_table == ta:
                        return "up"
                return None

            def _is_downstream_chain(p: list[str]) -> bool:
                """Check that the path is all-downstream, treating pure
                association tables as transparent bridges. A transparent
                bridge Image ← assoc → Subject counts as a single
                downstream step (the assoc's referenced_by connects the
                two sides). Association tables at interior positions
                don't count as direction changes."""
                i = 0
                while i < len(p) - 1:
                    a, b = p[i], p[i + 1]
                    # If b is an interior association table, hop across
                    # it: count the A → assoc → C edge as a single
                    # transparent bridge and move two steps forward.
                    if i + 2 < len(p) and self._is_association_table(b):
                        # A → assoc → C: the bridge is legitimate
                        # regardless of internal direction; advance past.
                        i += 2
                        continue
                    d = _edge_direction(a, b)
                    if d != "down":
                        return False
                    i += 1
                return True

            downstream = [p for p in unique if _is_downstream_chain(p)]
            if len(downstream) <= 1:
                # Only 0 or 1 downstream paths means no genuine diamond;
                # other "paths" were common-neighbor shortcuts. Fall back
                # to the direct/signaled path and don't flag ambiguity.
                continue
            unique = downstream

            # Disambiguation rule:
            # - A path is "signaled" if at least one of its non-endpoint
            #   intermediates is in ``include_tables ∪ via`` (user explicitly
            #   routed through it). Association tables don't count — they're
            #   transparent and the user shouldn't need to name them.
            # - If exactly one path is signaled, the user has picked it → no
            #   ambiguity.
            # - Otherwise (0 or >1 signaled), we cannot silently choose →
            #   ambiguity.
            def _is_signaled(p: list[str]) -> bool:
                intermediates = p[1:-1]
                for mid in intermediates:
                    if mid in all_tables and not self._is_association_table(mid):
                        return True
                return False

            signaled = [p for p in unique if _is_signaled(p)]
            if len(signaled) == 1:
                # Exactly one user-signaled path — use it.
                continue

            # Ambiguity: either no user signal, or conflicting signals.
            reportable = signaled if len(signaled) > 1 else unique
            all_intermediates: set[str] = set()
            for p in reportable:
                for node in p[1:-1]:
                    if node not in include_tables and not self._is_association_table(node):
                        all_intermediates.add(node)
            ambiguities.append(
                {
                    "from_table": row_per,
                    "to_table": t,
                    "paths": reportable,
                    "suggested_intermediates": sorted(all_intermediates),
                }
            )
        return ambiguities

    def _prepare_wide_table(
        self,
        dataset,
        dataset_rid: RID,
        include_tables: list[str],
        *,
        row_per: str | None = None,
        via: list[str] | None = None,
    ) -> tuple[dict[str, Any], list[tuple], bool]:
        """Generate a join plan for denormalizing a dataset into a wide table.

        Uses a **JoinTree** approach that preserves path-specific structure:

        1. **Planner guards** -- validate ``row_per`` (Rule 2 / Rule 5) and
           check for path ambiguity (Rule 6) before any join work.
        2. **Path discovery** -- ``_schema_to_paths()`` discovers all FK paths
           from Dataset through the schema.
        3. **Path filtering & deduplication** -- keep only paths relevant to
           *include_tables*, dedup duplicate association table routes.
        4. **JoinTree construction** -- for each element type, build a tree
           rooted at the element.  Each node is a table to JOIN; association
           tables are in the tree (for JOIN) but excluded from output columns.
           Nullable FK columns produce LEFT JOINs.
        5. **Flatten to legacy format** -- convert the tree to the
           ``(path, join_conditions, join_types)`` tuple expected by
           the unified ``_denormalize_impl()`` in ``local_db/denormalize.py``.

        Args:
            dataset: A DatasetLike object (DatasetBag or Dataset).
            dataset_rid: RID of the dataset.
            include_tables: List of table names to include in the output.
            row_per: Explicit leaf table (one row per this table). If None,
                the sink is auto-inferred from include_tables.
            via: Additional tables used only for path routing (their columns
                are NOT included in the output).

        Returns:
            ``(element_tables, denormalized_columns, multi_schema)`` where:

            - **element_tables** -- ``dict[str, (path, join_conditions, join_types)]``
              keyed by element table name.
              *path* is a list of table name strings in JOIN order (pre-order walk
              of the JoinTree, starting with "Dataset").
              *join_conditions* maps ``table_name -> set[(fk_col, pk_col)]``.
              *join_types* maps ``table_name -> "inner" | "left"``.
            - **denormalized_columns** -- list of
              ``(schema_name, table_name, column_name, type_name)`` for the output.
            - **multi_schema** -- True if output spans multiple domain schemas.

        Raises:
            DerivaMLDenormalizeMultiLeaf / DerivaMLDenormalizeNoSink /
            DerivaMLDenormalizeDownstreamLeaf: from :meth:`_determine_row_per`.
            DerivaMLDenormalizeAmbiguousPath: if more than one FK path exists
                between row_per and a requested table.
        """
        include_tables_set = set(include_tables)
        for t in include_tables_set:
            _ = self.name_to_table(t)  # validate existence
        via_list = list(via or [])
        for t in via_list:
            _ = self.name_to_table(t)  # validate existence

        # ── Phase 0: planner guards (Rules 2, 5, 6) ──────────────────────────
        # Empty include_tables is a legal degenerate case (caller passes no
        # requested tables and expects an empty result). Skip guards then.
        if include_tables:
            resolved_row_per = self._determine_row_per(
                include_tables=list(include_tables),
                via=via_list,
                row_per=row_per,
            )
            ambiguities = self._find_path_ambiguities(
                row_per=resolved_row_per,
                include_tables=list(include_tables),
                via=via_list,
            )
            if ambiguities:
                from deriva_ml.core.exceptions import DerivaMLDenormalizeAmbiguousPath

                a = ambiguities[0]
                raise DerivaMLDenormalizeAmbiguousPath(
                    from_table=a["from_table"],
                    to_table=a["to_table"],
                    paths=a["paths"],
                    suggested_intermediates=a["suggested_intermediates"],
                )

        # ── Phase 1: path discovery ──────────────────────────────────────────
        all_paths = self._schema_to_paths()

        # Filter paths: must end at a table in include_tables AND
        # have at least one table in include_tables along the path.
        table_paths = [
            path
            for path in all_paths
            if path[-1].name in include_tables_set and include_tables_set.intersection({p.name for p in path})
        ]

        # ── Phase 1b: deduplicate association table routes ───────────────────
        # In some catalogs (e.g., eye-ai), both Image_Dataset and Dataset_Image
        # exist.  Keep only one route per (element, endpoint) via different
        # association tables (path[1]).
        deduplicated_paths: list[list[Table]] = []
        seen_element_endpoint: dict[tuple[str, str], tuple[list[Table], Table]] = {}

        def _is_standard_assoc(assoc_name: str, element_name: str) -> bool:
            """Check if assoc table matches the Dataset_{Element} naming pattern."""
            return assoc_name == f"Dataset_{element_name}"

        for path in table_paths:
            if len(path) < 3:
                deduplicated_paths.append(path)
                continue
            assoc_table = path[1]
            element = path[2]
            endpoint = path[-1]
            key = (element.name, endpoint.name)

            if key not in seen_element_endpoint:
                seen_element_endpoint[key] = (path, assoc_table)
                deduplicated_paths.append(path)
            else:
                existing_path, existing_assoc = seen_element_endpoint[key]
                if existing_assoc.name != assoc_table.name:
                    # Duplicate route via different association table.
                    # Prefer the standard Dataset_{Element} pattern over legacy.
                    if _is_standard_assoc(assoc_table.name, element.name) and not _is_standard_assoc(
                        existing_assoc.name, element.name
                    ):
                        # Replace existing with standard pattern
                        deduplicated_paths = [
                            p for p in deduplicated_paths if not (len(p) >= 3 and (p[2].name, p[-1].name) == key)
                        ]
                        seen_element_endpoint[key] = (path, assoc_table)
                        deduplicated_paths.append(path)
                    # else: keep existing (either it's standard or both are non-standard)
                else:
                    deduplicated_paths.append(path)

        table_paths = deduplicated_paths

        # ── Phase 1c: group by element, filter to elements in include_tables ─
        paths_by_element: dict[str, list[list[Table]]] = defaultdict(list)
        for p in table_paths:
            if len(p) >= 3:
                paths_by_element[p[2].name].append(p)

        paths_by_element = {elem: paths for elem, paths in paths_by_element.items() if elem in include_tables_set}

        # ── Phase 2: build JoinTree per element ──────────────────────────────
        skip_columns = {"RCT", "RMT", "RCB", "RMB"}
        element_tables: dict[str, tuple[list[str], dict[str, set], dict[str, str]]] = {}

        for element_name, paths in paths_by_element.items():
            tree = self._build_join_tree(element_name, include_tables_set, table_paths, via=set(via_list))

            # ── Phase 3: flatten JoinTree to legacy format ───────────────────
            # Pre-order walk gives us the correct JOIN order.
            # We prepend "Dataset" and the association table that connects
            # Dataset to the element (taken from paths[0][0:3]).

            # Find the Dataset -> assoc -> element prefix from the first path
            if paths and len(paths[0]) >= 3:
                dataset_name = paths[0][0].name  # "Dataset"
                assoc_name = paths[0][1].name  # e.g. "Dataset_Image"
            else:
                dataset_name = "Dataset"
                assoc_name = None

            # Walk the tree to get the join order (element -> children)
            tree_nodes = tree.walk()

            # Build the legacy path: [Dataset, assoc, element, ...tree children...]
            path_names: list[str] = [dataset_name]
            if assoc_name:
                path_names.append(assoc_name)

            # Add tree nodes (element first, then its subtree in pre-order)
            for node in tree_nodes:
                if node.table_name not in path_names:
                    path_names.append(node.table_name)

            # Build join conditions and join types from the tree edges
            join_conditions: dict[str, set[tuple]] = {}
            join_types: dict[str, str] = {}

            # First, add the Dataset -> assoc and assoc -> element conditions
            if assoc_name:
                dataset_table = self.name_to_table(dataset_name)
                assoc_table_obj = self.name_to_table(assoc_name)
                try:
                    col_pairs = self._table_relationship(dataset_table, assoc_table_obj)
                    join_conditions[assoc_name] = set(col_pairs)
                    join_types[assoc_name] = "inner"
                except DerivaMLException:
                    pass

                try:
                    col_pairs = self._table_relationship(assoc_table_obj, tree.table)
                    join_conditions[tree.table_name] = set(col_pairs)
                    join_types[tree.table_name] = "inner"
                except DerivaMLException:
                    pass

            # Add conditions from the JoinTree edges
            for parent_node, child_node in tree.walk_edges():
                if child_node.fk_columns:
                    join_conditions[child_node.table_name] = set(child_node.fk_columns)
                    join_types[child_node.table_name] = child_node.join_type

            element_tables[element_name] = (path_names, join_conditions, join_types)

        # ── Phase 4: build denormalized column list ──────────────────────────
        denormalized_columns = []
        for table_name in include_tables_set:
            if self.is_association(table_name):
                continue
            table = self.name_to_table(table_name)
            for c in table.columns:
                if c.name not in skip_columns:
                    denormalized_columns.append((table.schema.name, table_name, c.name, c.type.typename))

        output_schemas = {s for s, _, _, _ in denormalized_columns if self.is_domain_schema(s)}
        multi_schema = len(output_schemas) > 1

        return element_tables, denormalized_columns, multi_schema

    def _table_relationship(
        self,
        table1: TableInput,
        table2: TableInput,
    ) -> list[tuple[Column, Column]]:
        """Return column pairs used to relate two tables.

        For simple FKs, returns a single-element list: [(fk_col, pk_col)].
        For composite FKs, returns multiple pairs: [(fk_col1, pk_col1), (fk_col2, pk_col2)].

        Each FK constraint counts as one relationship (even if composite),
        so ambiguity is detected when multiple separate FK constraints exist
        between the same two tables.
        """
        table1 = self.name_to_table(table1)
        table2 = self.name_to_table(table2)
        # Each FK constraint produces a list of (fk_col, pk_col) pairs
        relationships: list[list[tuple[Column, Column]]] = []
        for fk in table1.foreign_keys:
            if fk.pk_table == table2:
                pairs = list(zip(fk.foreign_key_columns, fk.referenced_columns))
                relationships.append(pairs)
        for fk in table1.referenced_by:
            if fk.table == table2:
                pairs = list(zip(fk.referenced_columns, fk.foreign_key_columns))
                relationships.append(pairs)

        if len(relationships) == 0:
            raise DerivaMLException(
                f"No FK relationship found between {table1.name} and {table2.name}. "
                f"These tables may not be directly connected. Check your include_tables list."
            )
        if len(relationships) > 1:
            path_descriptions = []
            for col_pairs in relationships:
                desc = ", ".join(
                    f"{fk_col.table.name}.{fk_col.name}{pk_col.table.name}.{pk_col.name}"
                    for fk_col, pk_col in col_pairs
                )
                path_descriptions.append(f"  {desc}")
            raise DerivaMLException(
                f"Ambiguous linkage between {table1.name} and {table2.name}: "
                f"found {len(relationships)} FK relationships:\n" + "\n".join(path_descriptions)
            )
        return relationships[0]

    # Default tables to skip during FK path traversal.
    # These are ML schema tables that create unwanted traversal branches:
    # - Dataset_Dataset: nested dataset self-reference (handled separately)
    # - Execution: execution tracking (not useful for data traversal)
    _DEFAULT_SKIP_TABLES = frozenset({"Dataset_Dataset", "Execution"})

    def _schema_to_paths(
        self,
        root: Table | None = None,
        path: list[Table] | None = None,
        exclude_tables: set[str] | None = None,
        skip_tables: frozenset[str] | None = None,
        max_depth: int | None = None,
        stop_at: str | None = None,
    ) -> list[list[Table]]:
        """Discover all FK paths through the schema graph via depth-first traversal.

        This is the shared foundation for both bag export (catalog_graph._collect_paths)
        and denormalization (_prepare_wide_table). Changes here affect both systems.

        Traversal rules:
        - Follows both outbound FKs (table.foreign_keys) and inbound FKs (table.referenced_by)
        - Only traverses tables in valid schemas (domain + ML)
        - Terminates at vocabulary tables (paths go INTO vocabs but not OUT)
        - Skips tables in exclude_tables and skip_tables
        - Detects and skips cycles (same table appearing twice in a path)
        - Prevents dataset element loopback (traversing back to Dataset via element associations)
        - When multiple FKs exist between the same two domain tables, deduplicates
          arcs to avoid redundant paths (keeps one arc per target table)

        Args:
            root: Starting table. Defaults to the Dataset table in the ML schema.
            path: Current path being built (used during recursion).
            exclude_tables: Caller-specified table names to skip. These tables and
                all paths through them are pruned from the result.
            skip_tables: Infrastructure table names to skip. Defaults to
                _DEFAULT_SKIP_TABLES (Dataset_Dataset, Execution). Override to
                customize which ML schema tables are excluded from traversal.
            max_depth: Maximum path length (number of tables). None = unlimited.
                Use to protect against pathological schemas with deep chains.
            stop_at: If given, return only paths whose final table's name equals
                ``stop_at``. The root-only path ``[root]`` is excluded unless
                ``root.name == stop_at``. Default ``None`` returns all prefixes
                (the original behavior).

        Returns:
            List of paths, where each path is a list of Table objects starting
            from root. Every prefix of a path is also included (e.g., if
            [Dataset, A, B, C] is a path, then [Dataset], [Dataset, A], and
            [Dataset, A, B] are also in the result).
        """
        exclude_tables = exclude_tables or set()
        skip_tables = skip_tables if skip_tables is not None else self._DEFAULT_SKIP_TABLES

        root = root or self.model.schemas[self.ml_schema].tables["Dataset"]
        path = path.copy() if path else []
        parent = path[-1] if path else None  # Table we are coming from.
        path.append(root)
        paths = [path]

        # Depth limit check
        if max_depth is not None and len(path) >= max_depth:
            if stop_at is not None:
                return [p for p in paths if p and p[-1].name == stop_at]
            return paths

        def is_nested_dataset_loopback(n1: Table, n2: Table) -> bool:
            """Check if traversal would loop back to Dataset via an element association.

            Prevents: Subject -> Dataset_Subject -> Dataset (looping back to root).
            Allows: Dataset -> Dataset_Subject -> Subject (the intended direction).

            Uses :meth:`_is_association_table` (FK-arity topology) rather
            than ermrest's ``find_associations(pure=True)`` so that non-
            pure association tables — bridges that carry user metadata
            like ``Image_Dataset_Legacy`` — are ALSO recognized as
            dataset-element associations and excluded from upstream
            traversal. Without this, walking Image → Image_Dataset_Legacy →
            Dataset creates a phantom "hub" path that spuriously connects
            Image to any other dataset-member table (e.g. Subject,
            Observation) through a different Dataset_X association,
            producing false Rule-6 ambiguities.
            """
            dataset_table = self.model.schemas[self.ml_schema].tables["Dataset"]
            if n1 == dataset_table:
                # Outbound from Dataset → Dataset_X is always fine.
                return False
            # Is n2 an association table that points at Dataset (i.e. one
            # of its FK targets is the Dataset root)?
            if not self._is_association_table(n2):
                return False
            for fk in n2.foreign_keys:
                if fk.pk_table == dataset_table:
                    return True
            return False

        # Vocabulary tables are terminal — traverse INTO but not OUT.
        if self.is_vocabulary(root):
            if stop_at is not None:
                return [p for p in paths if p and p[-1].name == stop_at]
            return paths

        for child in self._fk_neighbors(root):
            if child.name in skip_tables:
                continue
            if child.name in exclude_tables:
                continue
            if child == parent:
                # Don't loop back to immediate parent via referenced_by
                continue
            if is_nested_dataset_loopback(root, child):
                continue
            if child in path:
                # Cycle detected — skip to avoid infinite recursion.
                logger.warning(f"Cycle in schema path: {child.name} path:{[p.name for p in path]}, skipping")
                continue

            paths.extend(self._schema_to_paths(child, path, exclude_tables, skip_tables, max_depth, stop_at))
        if stop_at is not None:
            return [p for p in paths if p and p[-1].name == stop_at]
        return paths

    def create_table(self, table_def: TableDefinition, schema: str | None = None) -> Table:
        """Create a new table from TableDefinition.

        Args:
            table_def: Table definition (dataclass or dict).
            schema: Schema to create the table in. If None, uses default_schema.

        Returns:
            The newly created Table.

        Raises:
            DerivaMLException: If no schema specified and default_schema is not set.

        Note: @validate_call removed because TableDefinition is now a dataclass from
        deriva.core.typed and Pydantic validation doesn't work well with dataclass fields.
        """
        schema = schema or self._require_default_schema()
        # Handle both TableDefinition (dataclass with to_dict) and plain dicts
        table_dict = table_def.to_dict() if hasattr(table_def, "to_dict") else table_def
        return self.model.schemas[schema].create_table(table_dict)

    def _define_association(
        self,
        associates: list,
        metadata: list | None = None,
        table_name: str | None = None,
        comment: str | None = None,
        **kwargs,
    ) -> dict:
        """Build an association table definition with vocab-aware key selection.

        Wraps Table.define_association to ensure non-vocabulary tables use RID
        as their foreign key target. The default key search heuristic in
        define_association prefers Name/ID keys over RID, which is correct for
        vocabulary tables (FK to human-readable Name) but wrong for domain
        tables that happen to have non-nullable Name or ID keys (e.g., tables
        in cloned catalogs like FaceBase).

        Args:
            associates: Reference targets being associated (Table, Key, or tuples).
            metadata: Additional metadata fields and/or reference targets.
            table_name: Name for the association table.
            comment: Comment for the association table.
            **kwargs: Additional arguments passed to Table.define_association.

        Returns:
            Table definition dict suitable for create_table.
        """
        metadata = metadata or []

        def _resolve_key(ref):
            """Convert non-vocabulary Table references to their RID Key."""
            if isinstance(ref, tuple):
                # (name, Table) or (name, nullok, Table) — resolve the Table element
                items = list(ref)
                table_obj = items[-1]
                if isinstance(table_obj, Table) and not table_obj.is_vocabulary():
                    items[-1] = table_obj.key_by_columns(["RID"])
                return tuple(items)
            elif isinstance(ref, Table) and not ref.is_vocabulary():
                return ref.key_by_columns(["RID"])
            return ref  # Key objects or vocabulary Tables pass through

        resolved_associates = [_resolve_key(a) for a in associates]
        resolved_metadata = [_resolve_key(m) for m in metadata]

        return Table.define_association(
            associates=resolved_associates,
            metadata=resolved_metadata,
            table_name=table_name,
            comment=comment,
            **kwargs,
        )
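
The prefix-path traversal implemented by `_schema_to_paths` above can be sketched offline. The toy function below (the adjacency dict and table names are illustrative, not the library's API) mirrors the core rules: every prefix of a discovered path is returned, immediate backtracking and cycles are skipped, and a `stop_at` filter keeps only paths ending at a given table:

```python
def schema_to_paths(graph, root, path=None, stop_at=None):
    """Toy sketch of the DFS in _schema_to_paths over an adjacency dict.

    Returns every prefix of every discovered path; with stop_at set,
    keeps only paths whose final node matches.
    """
    path = (path or []) + [root]
    paths = [path]
    parent = path[-2] if len(path) > 1 else None
    for child in graph.get(root, []):
        if child == parent or child in path:  # no backtracking, no cycles
            continue
        paths.extend(schema_to_paths(graph, child, path, stop_at))
    if stop_at is not None:
        return [p for p in paths if p[-1] == stop_at]
    return paths

# Dataset -> Dataset_Subject -> Subject -> Observation (toy schema)
graph = {
    "Dataset": ["Dataset_Subject"],
    "Dataset_Subject": ["Subject"],
    "Subject": ["Observation"],
}
all_paths = schema_to_paths(graph, "Dataset")
# Every prefix is present: [Dataset], [Dataset, Dataset_Subject], ...
print(all_paths)
print(schema_to_paths(graph, "Dataset", stop_at="Observation"))
```

Note how the `stop_at` variant returns only the single full path, matching the documented behavior that the root-only path is excluded unless the root itself matches.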

chaise_config property

chaise_config: dict[str, Any]

Return the chaise configuration.

__init__

__init__(
    model: Model,
    ml_schema: str = ML_SCHEMA,
    domain_schemas: str
    | set[str]
    | None = None,
    default_schema: str | None = None,
)

Create and initialize a DerivaModel instance.

This method will connect to a catalog and initialize schema configuration. This class is intended to be used as a base class on which domain-specific interfaces are built.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model` | `Model` | The ERMRest model for the catalog. | *required* |
| `ml_schema` | `str` | The ML schema name. | `ML_SCHEMA` |
| `domain_schemas` | `str \| set[str] \| None` | Optional explicit set of domain schema names. If None, auto-detects all non-system schemas. | `None` |
| `default_schema` | `str \| None` | The default schema for table creation operations. If None and there is exactly one domain schema, that schema is used as default. If there are multiple domain schemas, default_schema must be specified. | `None` |
Source code in src/deriva_ml/model/catalog.py
def __init__(
    self,
    model: Model,
    ml_schema: str = ML_SCHEMA,
    domain_schemas: str | set[str] | None = None,
    default_schema: str | None = None,
):
    """Create and initialize a DerivaModel instance.

    This method will connect to a catalog and initialize schema configuration.
    This class is intended to be used as a base class on which domain-specific interfaces are built.

    Args:
        model: The ERMRest model for the catalog.
        ml_schema: The ML schema name.
        domain_schemas: Optional explicit set of domain schema names. If None,
            auto-detects all non-system schemas.
        default_schema: The default schema for table creation operations. If None
            and there is exactly one domain schema, that schema is used as default.
            If there are multiple domain schemas, default_schema must be specified.
    """
    self.model = model
    self.configuration = None
    self.catalog: ErmrestCatalog = self.model.catalog
    self.hostname = self.catalog.deriva_server.server if isinstance(self.catalog, ErmrestCatalog) else "localhost"

    self.ml_schema = ml_schema
    self._system_schemas = frozenset(SYSTEM_SCHEMAS | {ml_schema})

    # Determine domain schemas
    if domain_schemas is not None:
        if isinstance(domain_schemas, str):
            domain_schemas = {domain_schemas}
        self.domain_schemas = frozenset(domain_schemas)
    else:
        # Auto-detect all domain schemas
        self.domain_schemas = _get_domain_schemas(self.model.schemas.keys(), ml_schema)

    # Determine default schema for table creation
    if default_schema is not None:
        if default_schema not in self.domain_schemas:
            raise DerivaMLException(
                f"default_schema '{default_schema}' is not in domain_schemas: {self.domain_schemas}"
            )
        self.default_schema = default_schema
    elif len(self.domain_schemas) == 1:
        # Single domain schema - use it as default
        self.default_schema = next(iter(self.domain_schemas))
    elif len(self.domain_schemas) == 0:
        # No domain schemas - default_schema will be None
        self.default_schema = None
    else:
        # Multiple domain schemas, no explicit default
        self.default_schema = None

apply

apply() -> None

Call ERMRestModel.apply

Source code in src/deriva_ml/model/catalog.py
def apply(self) -> None:
    """Call ERMRestModel.apply"""
    if self.catalog == "file-system":
        raise DerivaMLException("Cannot apply() to non-catalog model.")
    else:
        self.model.apply()

asset_metadata

asset_metadata(
    table: str | Table,
) -> set[str]

Return the metadata columns for an asset table.
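
The metadata set is simply the table's column names minus the standard Deriva asset columns. A standalone sketch of that set difference (the column names and the contents of the stand-in asset-column set below are illustrative, not the library's actual `DerivaAssetColumns` definition):

```python
# Illustrative stand-in for DerivaAssetColumns (the real set lives in deriva_ml).
ASSET_COLUMNS = {"RID", "RCT", "RMT", "RCB", "RMB", "URL", "Filename", "Length", "MD5", "Description"}

def asset_metadata(column_names: set[str]) -> set[str]:
    """Toy sketch: metadata columns are everything not in the asset set."""
    return column_names - ASSET_COLUMNS

cols = {"RID", "URL", "Filename", "Length", "MD5", "Description", "Modality", "Subject"}
print(sorted(asset_metadata(cols)))  # ['Modality', 'Subject']
```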

Source code in src/deriva_ml/model/catalog.py
def asset_metadata(self, table: str | Table) -> set[str]:
    """Return the metadata columns for an asset table."""

    table = self.name_to_table(table)

    if not self.is_asset(table):
        raise DerivaMLTableTypeError("asset table", table.name)
    return {c.name for c in table.columns} - DerivaAssetColumns

asset_metadata_columns

asset_metadata_columns(
    table: str | Table,
) -> list[Column]

Return Column objects for the asset-metadata columns of table.

Like asset_metadata but returns the Column instances (not just names) so callers can inspect attributes such as nullok. Results are sorted by column name for deterministic iteration.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table` | `str \| Table` | Asset table name or Table object. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `list[Column]` | Sorted list of Column objects. |

Raises:

| Type | Description |
| --- | --- |
| `DerivaMLTableTypeError` | If table is not an asset table. |

Source code in src/deriva_ml/model/catalog.py
def asset_metadata_columns(self, table: str | Table) -> list[Column]:
    """Return Column objects for the asset-metadata columns of ``table``.

    Like :meth:`asset_metadata` but returns the :class:`Column`
    instances (not just names) so callers can inspect attributes
    such as ``nullok``. Results are sorted by column name for
    deterministic iteration.

    Args:
        table: Asset table name or Table object.

    Returns:
        Sorted list of Column objects.

    Raises:
        DerivaMLTableTypeError: If ``table`` is not an asset table.
    """
    table = self.name_to_table(table)
    if not self.is_asset(table):
        raise DerivaMLTableTypeError("asset table", table.name)
    return sorted(
        (c for c in table.columns if c.name not in DerivaAssetColumns),
        key=lambda c: c.name,
    )

create_table

create_table(
    table_def: TableDefinition,
    schema: str | None = None,
) -> Table

Create a new table from TableDefinition.

Parameters:

Name Type Description Default
table_def TableDefinition

Table definition (dataclass or dict).

required
schema str | None

Schema to create the table in. If None, uses default_schema.

None

Returns:

Type Description
Table

The newly created Table.

Raises:

Type Description
DerivaMLException

If no schema specified and default_schema is not set.

Note: @validate_call removed because TableDefinition is now a dataclass from deriva.core.typed and Pydantic validation doesn't work well with dataclass fields.

Source code in src/deriva_ml/model/catalog.py
def create_table(self, table_def: TableDefinition, schema: str | None = None) -> Table:
    """Create a new table from TableDefinition.

    Args:
        table_def: Table definition (dataclass or dict).
        schema: Schema to create the table in. If None, uses default_schema.

    Returns:
        The newly created Table.

    Raises:
        DerivaMLException: If no schema specified and default_schema is not set.

    Note: @validate_call removed because TableDefinition is now a dataclass from
    deriva.core.typed and Pydantic validation doesn't work well with dataclass fields.
    """
    schema = schema or self._require_default_schema()
    # Handle both TableDefinition (dataclass with to_dict) and plain dicts
    table_dict = table_def.to_dict() if hasattr(table_def, "to_dict") else table_def
    return self.model.schemas[schema].create_table(table_dict)

find_assets

find_assets(
    with_metadata: bool = False,
) -> list[Table]

Return the list of asset tables in the current model

Source code in src/deriva_ml/model/catalog.py
def find_assets(self, with_metadata: bool = False) -> list[Table]:
    """Return the list of asset tables in the current model"""
    return [t for s in self.model.schemas.values() for t in s.tables.values() if self.is_asset(t)]

find_association

find_association(
    table1: Table | str,
    table2: Table | str,
) -> tuple[Table, Column, Column]

Given two tables, return the association table that connects them and the two columns used to link them.

Source code in src/deriva_ml/model/catalog.py
def find_association(self, table1: Table | str, table2: Table | str) -> tuple[Table, Column, Column]:
    """Given two tables, return an association table that connects the two and the two columns used to link them..

    Raises:
        DerivaML exception if there is either not an association table or more than one association table.
    """
    table1 = self.name_to_table(table1)
    table2 = self.name_to_table(table2)

    tables = [
        (a.table, a.self_fkey.columns[0].name, other_key.columns[0].name)
        for a in table1.find_associations(pure=False)
        if len(a.other_fkeys) == 1 and (other_key := a.other_fkeys.pop()).pk_table == table2
    ]

    if len(tables) == 1:
        return tables[0]
    elif len(tables) == 0:
        raise DerivaMLException(f"No association tables found between {table1.name} and {table2.name}.")
    else:
        raise DerivaMLException(
            f"There are {len(tables)} association tables between {table1.name} and {table2.name}."
        )
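The exactly-one requirement can be illustrated on its own, with a toy stand-in for the candidate associations (names here are hypothetical, not the deriva-py API):

```python
from dataclasses import dataclass

@dataclass
class Association:
    """Toy stand-in for one candidate association between two tables."""
    table: str
    self_column: str
    other_column: str

def pick_association(candidates: list[Association], t1: str, t2: str) -> Association:
    # find_association requires exactly one candidate; zero or many is an error.
    if len(candidates) == 1:
        return candidates[0]
    if not candidates:
        raise ValueError(f"No association tables found between {t1} and {t2}.")
    raise ValueError(f"There are {len(candidates)} association tables between {t1} and {t2}.")

assoc = pick_association(
    [Association("Dataset_Image", "Dataset", "Image")], "Dataset", "Image"
)
print(assoc.table)  # Dataset_Image
```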

find_features

find_features(
    table: TableInput | None = None,
) -> Iterable[Feature]

List features in the catalog.

If a table is specified, returns only features for that table. If no table is specified, returns all features across all tables in the catalog.

Parameters:

Name Type Description Default
table TableInput | None

Optional table to find features for. If None, returns all features in the catalog.

None

Returns:

Type Description
Iterable[Feature]

An iterable of Feature instances describing the features.

Source code in src/deriva_ml/model/catalog.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def find_features(self, table: TableInput | None = None) -> Iterable[Feature]:
    """List features in the catalog.

    If a table is specified, returns only features for that table.
    If no table is specified, returns all features across all tables in the catalog.

    Args:
        table: Optional table to find features for. If None, returns all features
            in the catalog.

    Returns:
        An iterable of Feature instances describing the features.
    """

    def is_feature(a: FindAssociationResult) -> bool:
        """Check if association represents a feature.

        Args:
            a: Association result to check
        Returns:
            bool: True if association represents a feature
        """
        return {
            "Feature_Name",
            "Execution",
            a.self_fkey.foreign_key_columns[0].name,
        }.issubset({c.name for c in a.table.columns})

    def find_table_features(t: Table) -> list[Feature]:
        """Find all features for a single table."""
        return [
            Feature(a, self) for a in t.find_associations(min_arity=3, max_arity=3, pure=False) if is_feature(a)
        ]

    if table is not None:
        # Find features for a specific table
        return find_table_features(self.name_to_table(table))
    else:
        # Find all features across all domain and ML schema tables
        features: list[Feature] = []
        for schema_name in [*self.domain_schemas, self.ml_schema]:
            schema = self.model.schemas.get(schema_name)
            if schema:
                for t in schema.tables.values():
                    features.extend(find_table_features(t))
        return features

find_vocabularies

find_vocabularies() -> list[Table]

Return a list of all controlled vocabulary tables in domain and ML schemas.

Source code in src/deriva_ml/model/catalog.py
def find_vocabularies(self) -> list[Table]:
    """Return a list of all controlled vocabulary tables in domain and ML schemas."""
    tables = []
    for schema_name in [*self.domain_schemas, self.ml_schema]:
        schema = self.model.schemas.get(schema_name)
        if schema:
            tables.extend(t for t in schema.tables.values() if self.is_vocabulary(t))
    return tables

from_cached classmethod

from_cached(
    schema_dict: dict,
    *,
    catalog,
    ml_schema: str = ML_SCHEMA,
    domain_schemas: "str | set[str] | None" = None,
    default_schema: "str | None" = None,
) -> "DerivaModel"

Construct a DerivaModel from a cached ermrest /schema dict.

No network is touched. The catalog argument is passed to deriva-py's Model(catalog, model_doc) constructor as the first positional argument; in offline mode it will be a :class:~deriva_ml.core.catalog_stub.CatalogStub, in online mode it is a real ErmrestCatalog. DerivaModel.__init__ then reads the catalog back off model.catalog as usual.

This replicates what Model.fromcatalog(catalog) does online — the online call fetches catalog.get("/schema").json() and passes the result to Model(catalog, dict). Here we pass in the already-cached dict from :class:~deriva_ml.core.schema_cache.SchemaCache.

Parameters:

Name Type Description Default
schema_dict dict

The JSON payload from a previous catalog.get('/schema').json() call, as persisted by SchemaCache.

required
catalog

The catalog object to associate with the model. Pass a real ErmrestCatalog online, or a CatalogStub offline.

required
ml_schema str

ML schema name (default "deriva-ml").

ML_SCHEMA
domain_schemas 'str | set[str] | None'

Optional explicit set of domain schema names. If None, auto-detects all non-system schemas from the cached dict.

None
default_schema 'str | None'

Optional default schema name.

None

Returns:

Type Description
'DerivaModel'

A DerivaModel wrapping a deriva-py Model

'DerivaModel'

reconstructed from the dict.

Source code in src/deriva_ml/model/catalog.py
@classmethod
def from_cached(
    cls,
    schema_dict: dict,
    *,
    catalog,
    ml_schema: str = ML_SCHEMA,
    domain_schemas: "str | set[str] | None" = None,
    default_schema: "str | None" = None,
) -> "DerivaModel":
    """Construct a DerivaModel from a cached ermrest /schema dict.

    No network is touched. The ``catalog`` argument is passed to
    deriva-py's ``Model(catalog, model_doc)`` constructor as the
    first positional argument; in offline mode it will be a
    :class:`~deriva_ml.core.catalog_stub.CatalogStub`, in online
    mode it is a real ``ErmrestCatalog``. ``DerivaModel.__init__``
    then reads the catalog back off ``model.catalog`` as usual.

    This replicates what ``Model.fromcatalog(catalog)`` does
    online — the online call fetches
    ``catalog.get("/schema").json()`` and passes the result to
    ``Model(catalog, dict)``. Here we pass in the already-cached
    dict from :class:`~deriva_ml.core.schema_cache.SchemaCache`.

    Args:
        schema_dict: The JSON payload from a previous
            ``catalog.get('/schema').json()`` call, as persisted
            by ``SchemaCache``.
        catalog: The catalog object to associate with the model.
            Pass a real ``ErmrestCatalog`` online, or a
            ``CatalogStub`` offline.
        ml_schema: ML schema name (default ``"deriva-ml"``).
        domain_schemas: Optional explicit set of domain schema
            names. If None, auto-detects all non-system schemas
            from the cached dict.
        default_schema: Optional default schema name.

    Returns:
        A ``DerivaModel`` wrapping a deriva-py ``Model``
        reconstructed from the dict.
    """
    from deriva.core.ermrest_model import Model

    # Model.__init__(catalog, model_doc) stores catalog as
    # self._catalog and exposes it via the .catalog property;
    # DerivaModel.__init__ then reads self.model.catalog.
    model = Model(catalog, schema_dict)
    return cls(
        model,
        ml_schema=ml_schema,
        domain_schemas=domain_schemas,
        default_schema=default_schema,
    )

get_schema_description

get_schema_description(
    include_system_columns: bool = False,
) -> dict[str, Any]

Return a JSON description of the catalog schema structure.

Provides a structured representation of the domain and ML schemas including tables, columns, foreign keys, and relationships. Useful for understanding the data model structure programmatically.

Parameters:

Name Type Description Default
include_system_columns bool

If True, include RID, RCT, RMT, RCB, RMB columns. Default False to reduce output size.

False

Returns:

Type Description
dict[str, Any]

Dictionary with schema structure:

    {
        "domain_schemas": ["schema_name1", "schema_name2"],
        "default_schema": "schema_name1",
        "ml_schema": "deriva-ml",
        "schemas": {
            "schema_name": {
                "tables": {
                    "TableName": {
                        "comment": "description",
                        "is_vocabulary": bool,
                        "is_asset": bool,
                        "is_association": bool,
                        "columns": [...],
                        "foreign_keys": [...],
                        "features": [...]
                    }
                }
            }
        }
    }

Source code in src/deriva_ml/model/catalog.py
def get_schema_description(self, include_system_columns: bool = False) -> dict[str, Any]:
    """Return a JSON description of the catalog schema structure.

    Provides a structured representation of the domain and ML schemas including
    tables, columns, foreign keys, and relationships. Useful for understanding
    the data model structure programmatically.

    Args:
        include_system_columns: If True, include RID, RCT, RMT, RCB, RMB columns.
            Default False to reduce output size.

    Returns:
        Dictionary with schema structure:
        {
            "domain_schemas": ["schema_name1", "schema_name2"],
            "default_schema": "schema_name1",
            "ml_schema": "deriva-ml",
            "schemas": {
                "schema_name": {
                    "tables": {
                        "TableName": {
                            "comment": "description",
                            "is_vocabulary": bool,
                            "is_asset": bool,
                            "is_association": bool,
                            "columns": [...],
                            "foreign_keys": [...],
                            "features": [...]
                        }
                    }
                }
            }
        }
    """
    system_columns = {"RID", "RCT", "RMT", "RCB", "RMB"}
    result = {
        "domain_schemas": sorted(self.domain_schemas),
        "default_schema": self.default_schema,
        "ml_schema": self.ml_schema,
        "schemas": {},
    }

    # Include all domain schemas and the ML schema
    for schema_name in [*self.domain_schemas, self.ml_schema]:
        schema = self.model.schemas.get(schema_name)
        if not schema:
            continue

        schema_info = {"tables": {}}

        for table_name, table in schema.tables.items():
            # Get columns
            columns = []
            for col in table.columns:
                if not include_system_columns and col.name in system_columns:
                    continue
                columns.append(
                    {
                        "name": col.name,
                        "type": str(col.type.typename),
                        "nullok": col.nullok,
                        "comment": col.comment or "",
                    }
                )

            # Get foreign keys
            foreign_keys = []
            for fk in table.foreign_keys:
                fk_cols = [c.name for c in fk.foreign_key_columns]
                ref_cols = [c.name for c in fk.referenced_columns]
                foreign_keys.append(
                    {
                        "columns": fk_cols,
                        "referenced_table": f"{fk.pk_table.schema.name}.{fk.pk_table.name}",
                        "referenced_columns": ref_cols,
                    }
                )

            # Get features if this is a domain table
            features = []
            if self.is_domain_schema(schema_name):
                try:
                    for f in self.find_features(table):
                        features.append(
                            {
                                "name": f.feature_name,
                                "feature_table": f.feature_table.name,
                            }
                        )
                except Exception as e:
                    logger.debug(f"Could not enumerate features for table {table.name}: {e}")

            table_info = {
                "comment": table.comment or "",
                "is_vocabulary": self.is_vocabulary(table),
                "is_asset": self.is_asset(table),
                "is_association": bool(self.is_association(table)),
                "columns": columns,
                "foreign_keys": foreign_keys,
            }
            if features:
                table_info["features"] = features

            schema_info["tables"][table_name] = table_info

        result["schemas"][schema_name] = schema_info

    return result
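The `include_system_columns` behavior is just a filter over the column list. A small self-contained mirror of that step (function name is illustrative):

```python
# Mirror of the column-filtering step inside get_schema_description:
# system columns are dropped unless include_system_columns is True.
SYSTEM_COLUMNS = {"RID", "RCT", "RMT", "RCB", "RMB"}

def describe_columns(columns: list[dict], include_system_columns: bool = False) -> list[dict]:
    return [c for c in columns
            if include_system_columns or c["name"] not in SYSTEM_COLUMNS]

cols = [{"name": "RID", "type": "ermrest_rid"}, {"name": "Name", "type": "text"}]
print([c["name"] for c in describe_columns(cols)])        # ['Name']
print([c["name"] for c in describe_columns(cols, True)])  # ['RID', 'Name']
```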

is_asset

is_asset(
    table_name: TableInput,
) -> bool

True if the specified table is a proper asset table.

Delegates to Table.is_asset() from deriva-py, which checks that:

- The required columns exist (URL, Filename, Length, MD5)
- URL, Length, and MD5 are NOT NULL
- URL carries the asset annotation

Parameters:

Name Type Description Default
table_name TableInput

Table name or Table object.

required

Returns:

Type Description
bool

True if the specified table is a proper asset table.

Source code in src/deriva_ml/model/catalog.py
def is_asset(self, table_name: TableInput) -> bool:
    """True if the specified table is a proper asset table.

    Delegates to Table.is_asset() from deriva-py which checks:
    - Required columns exist (URL, Filename, Length, MD5)
    - URL, Length, MD5 are NOT NULL
    - URL has the asset annotation

    Args:
        table_name: Table name or Table object.

    Returns:
        True if the specified table is a proper asset table.
    """
    table = self.name_to_table(table_name)
    return table.is_asset()

is_association

is_association(
    table_name: str | Table,
    unqualified: bool = True,
    pure: bool = True,
    min_arity: int = 2,
    max_arity: int = 2,
) -> bool | set[str] | int

Check the specified table to see if it is an association table.

Parameters:

Name Type Description Default
table_name str | Table

Table name or Table object to check.

required
unqualified bool

Passed through to Table.is_association(). (Default value = True)

True
pure bool

Passed through to Table.is_association(). (Default value = True)

True
min_arity int

Minimum number of foreign keys in the association. (Default value = 2)

2
max_arity int

Maximum number of foreign keys in the association. (Default value = 2)

2

Returns:

Type Description
bool | set[str] | int

The result of Table.is_association(); falsy when the table is not an association table.

Source code in src/deriva_ml/model/catalog.py
def is_association(
    self,
    table_name: str | Table,
    unqualified: bool = True,
    pure: bool = True,
    min_arity: int = 2,
    max_arity: int = 2,
) -> bool | set[str] | int:
    """Check the specified table to see if it is an association table.

    Args:
        table_name: param unqualified:
        pure: return: (Default value = True)
        table_name: str | Table:
        unqualified:  (Default value = True)

    Returns:


    """
    table = self.name_to_table(table_name)
    return table.is_association(unqualified=unqualified, pure=pure, min_arity=min_arity, max_arity=max_arity)

is_dataset_rid

is_dataset_rid(
    rid: RID, deleted: bool = False
) -> bool

Check if a given RID is a dataset RID.

Source code in src/deriva_ml/model/catalog.py
def is_dataset_rid(self, rid: RID, deleted: bool = False) -> bool:
    """Check if a given RID is a dataset RID."""
    try:
        rid_info = self.model.catalog.resolve_rid(rid, self.model)
    except KeyError as e:
        raise DerivaMLException(f"Invalid RID {rid}") from e
    if rid_info.table.name != "Dataset":
        return False
    elif deleted:
        # It is a dataset RID; deleted datasets are acceptable.
        return True
    else:
        # It is a dataset RID. Now check whether it has been deleted.
        return not list(rid_info.datapath.entities().fetch())[0]["Deleted"]

is_domain_schema

is_domain_schema(
    schema_name: str,
) -> bool

Check if a schema is a domain schema.

Parameters:

Name Type Description Default
schema_name str

Name of the schema to check.

required

Returns:

Type Description
bool

True if the schema is a domain schema.

Source code in src/deriva_ml/model/catalog.py
def is_domain_schema(self, schema_name: str) -> bool:
    """Check if a schema is a domain schema.

    Args:
        schema_name: Name of the schema to check.

    Returns:
        True if the schema is a domain schema.
    """
    return schema_name in self.domain_schemas

is_system_schema

is_system_schema(
    schema_name: str,
) -> bool

Check if a schema is a system or ML schema.

Parameters:

Name Type Description Default
schema_name str

Name of the schema to check.

required

Returns:

Type Description
bool

True if the schema is a system or ML schema.

Source code in src/deriva_ml/model/catalog.py
def is_system_schema(self, schema_name: str) -> bool:
    """Check if a schema is a system or ML schema.

    Args:
        schema_name: Name of the schema to check.

    Returns:
        True if the schema is a system or ML schema.
    """
    return _is_system_schema(schema_name, self.ml_schema)

is_vocabulary

is_vocabulary(
    table_name: TableInput,
) -> bool

Check if a given table is a controlled vocabulary table.

Delegates to Table.is_vocabulary() in deriva-py, which enforces both the required column names AND their types (ermrest_curie, ermrest_uri, text, markdown). The type check is stricter than a column-name-only check — a table with an ID column of the wrong type correctly returns False here where the legacy name-only implementation would have returned True.

Mirrors :meth:is_asset, which already delegates to Table.is_asset().

Parameters:

Name Type Description Default
table_name TableInput

An ERMrest Table object or the name of the table.

required

Returns:

Type Description
bool

True if the table has the structure of a controlled vocabulary,

bool

False otherwise.

Raises:

Type Description
DerivaMLException

if the table doesn't exist.

Source code in src/deriva_ml/model/catalog.py
def is_vocabulary(self, table_name: TableInput) -> bool:
    """Check if a given table is a controlled vocabulary table.

    Delegates to ``Table.is_vocabulary()`` in deriva-py, which enforces both
    the required column names AND their types (ermrest_curie, ermrest_uri,
    text, markdown). The type check is stricter than a column-name-only
    check — a table with an ``ID`` column of the wrong type correctly
    returns False here where the legacy name-only implementation would
    have returned True.

    Mirrors :meth:`is_asset`, which already delegates to ``Table.is_asset()``.

    Args:
        table_name: An ERMrest Table object or the name of the table.

    Returns:
        True if the table has the structure of a controlled vocabulary,
        False otherwise.

    Raises:
        DerivaMLException: if the table doesn't exist.
    """
    table = self.name_to_table(table_name)
    return table.is_vocabulary()

list_dataset_element_types

list_dataset_element_types() -> (
    list[Table]
)

Lists the data types of elements contained within a dataset.

This method analyzes the dataset and identifies the data types for all elements within it. It is useful for understanding the structure and content of the dataset and allows for better manipulation and usage of its data.

Returns:

Type Description
list[Table]

A list of Table objects, one for each element type that can appear in a dataset.

Source code in src/deriva_ml/model/catalog.py
def list_dataset_element_types(self) -> list[Table]:
    """
    Lists the data types of elements contained within a dataset.

    This method analyzes the dataset and identifies the data types for all
    elements within it. It is useful for understanding the structure and
    content of the dataset and allows for better manipulation and usage of its
    data.

    Returns:
        list[Table]: A list of Table objects, one for each element type
        that can appear in a dataset.

    """

    dataset_table = self.name_to_table("Dataset")

    def is_domain_or_dataset_table(table: Table) -> bool:
        return self.is_domain_schema(table.schema.name) or table.name == dataset_table.name

    return [
        t
        for a in dataset_table.find_associations()
        if is_domain_or_dataset_table(t := a.other_fkeys.pop().pk_table)
    ]

lookup_feature

lookup_feature(
    table: TableInput, feature_name: str
) -> Feature

Lookup the named feature associated with the provided table.

Parameters:

Name Type Description Default
table TableInput

Table name or Table object the feature is associated with.

required
feature_name str

Name of the feature to look up.

required

Returns:

Type Description
Feature

A Feature class that represents the requested feature.

Raises:

Type Description
DerivaMLException

If the feature cannot be found.

Source code in src/deriva_ml/model/catalog.py
def lookup_feature(self, table: TableInput, feature_name: str) -> Feature:
    """Lookup the named feature associated with the provided table.

    Args:
        table: Table name or Table object the feature is associated with.
        feature_name: Name of the feature to look up.

    Returns:
        A Feature class that represents the requested feature.

    Raises:
      DerivaMLException: If the feature cannot be found.
    """
    table = self.name_to_table(table)
    try:
        return [f for f in self.find_features(table) if f.feature_name == feature_name][0]
    except IndexError:
        raise DerivaMLException(f"Feature {table.name}:{feature_name} doesn't exist.")
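The first-match-or-raise pattern used here can be shown on its own, with plain dicts standing in for Feature objects (names are illustrative):

```python
def lookup_by_name(items: list[dict], name: str) -> dict:
    """First item whose 'name' matches, or a clear error."""
    try:
        return [i for i in items if i["name"] == name][0]
    except IndexError:
        raise KeyError(f"Feature {name} doesn't exist.") from None

features = [{"name": "Quality"}, {"name": "BoundingBox"}]
print(lookup_by_name(features, "Quality"))  # {'name': 'Quality'}
```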

name_to_table

name_to_table(
    table: TableInput,
) -> Table

Return the table object corresponding to the given table name.

Searches domain schemas first (in sorted order), then ML schema, then WWW. If the table name appears in more than one schema, returns the first match.

Parameters:

Name Type Description Default
table TableInput

An ERMrest table object or a string naming the table.

required

Returns:

Type Description
Table

Table object.

Raises:

Type Description
DerivaMLException

If the table doesn't exist in any searchable schema.

Source code in src/deriva_ml/model/catalog.py
def name_to_table(self, table: TableInput) -> Table:
    """Return the table object corresponding to the given table name.

    Searches domain schemas first (in sorted order), then ML schema, then WWW.
    If the table name appears in more than one schema, returns the first match.

    Args:
      table: An ERMrest table object or a string naming the table.

    Returns:
      Table object.

    Raises:
      DerivaMLException: If the table doesn't exist in any searchable schema.
    """
    if isinstance(table, Table):
        return table

    # Search domain schemas (sorted for deterministic order), then ML schema, then WWW
    search_order = [*sorted(self.domain_schemas), self.ml_schema, "WWW"]
    for sname in search_order:
        if sname not in self.model.schemas:
            continue
        s = self.model.schemas[sname]
        if table in s.tables:
            return s.tables[table]
    raise DerivaMLException(f"The table {table} doesn't exist.")
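The search-order semantics (sorted domain schemas, then ML schema, then WWW) can be mirrored with plain dicts; this sketch is illustrative, not the library's implementation:

```python
def name_to_table(schemas: dict[str, dict[str, str]],
                  domain_schemas: set[str],
                  ml_schema: str,
                  table: str) -> str:
    """Mirror the search order: sorted domain schemas, then ML schema, then WWW."""
    search_order = [*sorted(domain_schemas), ml_schema, "WWW"]
    for sname in search_order:
        tables = schemas.get(sname)
        if tables and table in tables:
            return tables[table]
    raise KeyError(f"The table {table} doesn't exist.")

schemas = {"domain": {"Subject": "domain.Subject"},
           "deriva-ml": {"Dataset": "deriva-ml.Dataset",
                         "Subject": "deriva-ml.Subject"}}
# Domain schemas win over the ML schema when names collide:
print(name_to_table(schemas, {"domain"}, "deriva-ml", "Subject"))  # domain.Subject
```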

vocab_columns

vocab_columns(
    table_name: TableInput,
) -> dict[str, str]

Return mapping from canonical vocab column name to actual column name.

Canonical names are TitleCase (Name, ID, URI, Description, Synonyms). Actual names reflect the table's schema — could be lowercase for FaceBase-style catalogs or TitleCase for DerivaML-native tables.

Parameters:

Name Type Description Default
table_name TableInput

A table object or the name of the table.

required

Returns:

Type Description
dict[str, str]

Dict mapping canonical name to actual column name in the table.

dict[str, str]

E.g. {"Name": "name", "ID": "id", ...} for FaceBase tables

dict[str, str]

or {"Name": "Name", "ID": "ID", ...} for DerivaML tables.

Source code in src/deriva_ml/model/catalog.py
def vocab_columns(self, table_name: TableInput) -> dict[str, str]:
    """Return mapping from canonical vocab column name to actual column name.

    Canonical names are TitleCase (Name, ID, URI, Description, Synonyms).
    Actual names reflect the table's schema — could be lowercase for
    FaceBase-style catalogs or TitleCase for DerivaML-native tables.

    Args:
        table_name: A table object or the name of the table.

    Returns:
        Dict mapping canonical name to actual column name in the table.
        E.g. ``{"Name": "name", "ID": "id", ...}`` for FaceBase tables
        or ``{"Name": "Name", "ID": "ID", ...}`` for DerivaML tables.
    """
    table = self.name_to_table(table_name)
    col_map = {c.name.upper(): c.name for c in table.columns}
    return {canon: col_map[canon.upper()] for canon in ("Name", "ID", "URI", "Description", "Synonyms")}
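The case-insensitive canonicalization is self-contained enough to demonstrate directly on a list of column names (this sketch mirrors the method's logic rather than calling the library):

```python
def vocab_columns(column_names: list[str]) -> dict[str, str]:
    """Map canonical TitleCase vocab names to the table's actual column names."""
    col_map = {name.upper(): name for name in column_names}
    return {canon: col_map[canon.upper()]
            for canon in ("Name", "ID", "URI", "Description", "Synonyms")}

# FaceBase-style lowercase columns:
print(vocab_columns(["name", "id", "uri", "description", "synonyms"]))
# {'Name': 'name', 'ID': 'id', 'URI': 'uri', 'Description': 'description', 'Synonyms': 'synonyms'}
```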

Display dataclass

Bases: AnnotationBuilder

Display annotation for tables and columns.

Controls the display name, description/tooltip, and how null values and foreign key links are rendered. Can be applied to both tables and columns.

Parameters:

Name Type Description Default
name str | None

Display name shown in the UI (mutually exclusive with markdown_name)

None
markdown_name str | None

Markdown-formatted display name (mutually exclusive with name)

None
name_style NameStyle | None

Styling options for automatic name formatting

None
comment str | None

Description text shown as tooltip/help text

None
show_null dict[str, bool | str] | None

How to display null values, per context

None
show_foreign_key_link dict[str, bool] | None

Whether to show FK values as links, per context

None

Raises:

Type Description
ValueError

If both name and markdown_name are provided

Example

Basic display name::

>>> display = Display(name="Research Subjects")  # doctest: +SKIP
>>> handle.set_annotation(display)

With description/tooltip::

>>> display = Display(
...     name="Subjects",
...     comment="Individuals enrolled in research studies"
... )

Markdown-formatted name::

>>> display = Display(markdown_name="**Bold** _Italic_ Name")

Context-specific null display::

>>> from deriva_ml.model import CONTEXT_COMPACT, CONTEXT_DETAILED
>>> display = Display(
...     name="Value",
...     show_null={
...         CONTEXT_COMPACT: False,      # Hide nulls in lists
...         CONTEXT_DETAILED: '"N/A"'    # Show "N/A" string
...     }
... )

Control foreign key link display::

>>> display = Display(
...     name="Subject",
...     show_foreign_key_link={CONTEXT_COMPACT: False}
... )
Source code in src/deriva_ml/model/annotations.py
@dataclass
class Display(AnnotationBuilder):
    """Display annotation for tables and columns.

    Controls the display name, description/tooltip, and how null values
    and foreign key links are rendered. Can be applied to both tables
    and columns.

    Args:
        name: Display name shown in the UI (mutually exclusive with markdown_name)
        markdown_name: Markdown-formatted display name (mutually exclusive with name)
        name_style: Styling options for automatic name formatting
        comment: Description text shown as tooltip/help text
        show_null: How to display null values, per context
        show_foreign_key_link: Whether to show FK values as links, per context

    Raises:
        ValueError: If both name and markdown_name are provided

    Example:
        Basic display name::

            >>> display = Display(name="Research Subjects")  # doctest: +SKIP
            >>> handle.set_annotation(display)

        With description/tooltip::

            >>> display = Display(
            ...     name="Subjects",
            ...     comment="Individuals enrolled in research studies"
            ... )

        Markdown-formatted name::

            >>> display = Display(markdown_name="**Bold** _Italic_ Name")

        Context-specific null display::

            >>> from deriva_ml.model import CONTEXT_COMPACT, CONTEXT_DETAILED
            >>> display = Display(
            ...     name="Value",
            ...     show_null={
            ...         CONTEXT_COMPACT: False,      # Hide nulls in lists
            ...         CONTEXT_DETAILED: '"N/A"'    # Show "N/A" string
            ...     }
            ... )

        Control foreign key link display::

            >>> display = Display(
            ...     name="Subject",
            ...     show_foreign_key_link={CONTEXT_COMPACT: False}
            ... )
    """
    tag = TAG_DISPLAY

    name: str | None = None
    markdown_name: str | None = None
    name_style: NameStyle | None = None
    comment: str | None = None
    show_null: dict[str, bool | str] | None = None
    show_foreign_key_link: dict[str, bool] | None = None

    def __post_init__(self):
        if self.name and self.markdown_name:
            raise ValueError("name and markdown_name are mutually exclusive")

    def to_dict(self) -> dict[str, Any]:
        result = {}
        if self.name is not None:
            result["name"] = self.name
        if self.markdown_name is not None:
            result["markdown_name"] = self.markdown_name
        if self.name_style is not None:
            style_dict = self.name_style.to_dict()
            if style_dict:
                result["name_style"] = style_dict
        if self.comment is not None:
            result["comment"] = self.comment
        if self.show_null is not None:
            result["show_null"] = self.show_null
        if self.show_foreign_key_link is not None:
            result["show_foreign_key_link"] = self.show_foreign_key_link
        return result

Facet dataclass

A facet definition for filtering.

Parameters:

- source (str | list[str | InboundFK | OutboundFK] | None, default None): Path to source data
- sourcekey (str | None, default None): Reference to named source
- markdown_name (str | None, default None): Display name
- comment (str | None, default None): Description
- entity (bool | None, default None): Whether this is an entity facet
- open (bool | None, default None): Start expanded
- ux_mode (FacetUxMode | None, default None): UI mode (choices, ranges, check_presence)
- bar_plot (bool | None, default None): Show bar plot
- choices (list[Any] | None, default None): Preset choice values
- ranges (list[FacetRange] | None, default None): Preset range values
- not_null (bool | None, default None): Filter to non-null values
- hide_null_choice (bool | None, default None): Hide "null" option
- hide_not_null_choice (bool | None, default None): Hide "not null" option
- n_bins (int | None, default None): Number of bins for histogram
Source code in src/deriva_ml/model/annotations.py
@dataclass
class Facet:
    """A facet definition for filtering.

    Args:
        source: Path to source data
        sourcekey: Reference to named source
        markdown_name: Display name
        comment: Description
        entity: Whether this is an entity facet
        open: Start expanded
        ux_mode: UI mode (choices, ranges, check_presence)
        bar_plot: Show bar plot
        choices: Preset choice values
        ranges: Preset range values
        not_null: Filter to non-null values
        hide_null_choice: Hide "null" option
        hide_not_null_choice: Hide "not null" option
        n_bins: Number of bins for histogram
    """
    source: str | list[str | InboundFK | OutboundFK] | None = None
    sourcekey: str | None = None
    markdown_name: str | None = None
    comment: str | None = None
    entity: bool | None = None
    open: bool | None = None
    ux_mode: FacetUxMode | None = None
    bar_plot: bool | None = None
    choices: list[Any] | None = None
    ranges: list[FacetRange] | None = None
    not_null: bool | None = None
    hide_null_choice: bool | None = None
    hide_not_null_choice: bool | None = None
    n_bins: int | None = None

    def to_dict(self) -> dict[str, Any]:
        result = {}

        if self.source is not None:
            if isinstance(self.source, str):
                result["source"] = self.source
            else:
                result["source"] = [
                    item.to_dict() if hasattr(item, "to_dict") else item
                    for item in self.source
                ]

        if self.sourcekey is not None:
            result["sourcekey"] = self.sourcekey
        if self.markdown_name is not None:
            result["markdown_name"] = self.markdown_name
        if self.comment is not None:
            result["comment"] = self.comment
        if self.entity is not None:
            result["entity"] = self.entity
        if self.open is not None:
            result["open"] = self.open
        if self.ux_mode is not None:
            result["ux_mode"] = self.ux_mode.value
        if self.bar_plot is not None:
            result["bar_plot"] = self.bar_plot
        if self.choices is not None:
            result["choices"] = self.choices
        if self.ranges is not None:
            result["ranges"] = [r.to_dict() for r in self.ranges]
        if self.not_null is not None:
            result["not_null"] = self.not_null
        if self.hide_null_choice is not None:
            result["hide_null_choice"] = self.hide_null_choice
        if self.hide_not_null_choice is not None:
            result["hide_not_null_choice"] = self.hide_not_null_choice
        if self.n_bins is not None:
            result["n_bins"] = self.n_bins

        return result

FacetList dataclass

A list of facets for filtering (visible_columns.filter).

Example

    facets = FacetList([
        Facet(source="Species", open=True),
        Facet(source="Age", ux_mode=FacetUxMode.RANGES)
    ])

Source code in src/deriva_ml/model/annotations.py
@dataclass
class FacetList:
    """A list of facets for filtering (visible_columns.filter).

    Example:
        >>> facets = FacetList([
        ...     Facet(source="Species", open=True),
        ...     Facet(source="Age", ux_mode=FacetUxMode.RANGES)
        ... ])
    """
    facets: list[Facet] = field(default_factory=list)

    def add(self, facet: Facet) -> "FacetList":
        """Add a facet to the list."""
        self.facets.append(facet)
        return self

    def to_dict(self) -> dict[str, list[dict]]:
        return {"and": [f.to_dict() for f in self.facets]}

add

add(facet: Facet) -> 'FacetList'

Add a facet to the list.

Source code in src/deriva_ml/model/annotations.py
def add(self, facet: Facet) -> "FacetList":
    """Add a facet to the list."""
    self.facets.append(facet)
    return self

FacetRange dataclass

A range for facet filtering.

Parameters:

- min (float | None, default None): Minimum value
- max (float | None, default None): Maximum value
- min_exclusive (bool | None, default None): Exclude min value
- max_exclusive (bool | None, default None): Exclude max value
Source code in src/deriva_ml/model/annotations.py
@dataclass
class FacetRange:
    """A range for facet filtering.

    Args:
        min: Minimum value
        max: Maximum value
        min_exclusive: Exclude min value
        max_exclusive: Exclude max value
    """
    min: float | None = None
    max: float | None = None
    min_exclusive: bool | None = None
    max_exclusive: bool | None = None

    def to_dict(self) -> dict[str, Any]:
        result = {}
        if self.min is not None:
            result["min"] = self.min
        if self.max is not None:
            result["max"] = self.max
        if self.min_exclusive is not None:
            result["min_exclusive"] = self.min_exclusive
        if self.max_exclusive is not None:
            result["max_exclusive"] = self.max_exclusive
        return result

FacetUxMode

Bases: str, Enum

UX modes for facet filters in the search panel.

Controls how users interact with a facet filter.

Attributes:

- CHOICES: Checkbox list for selecting values
- RANGES: Range slider/inputs for numeric or date ranges
- CHECK_PRESENCE: Check if value exists or is null

Example

    # Choice-based facet
    Facet(source="Status", ux_mode=FacetUxMode.CHOICES)

    # Range-based facet for numeric values
    Facet(source="Age", ux_mode=FacetUxMode.RANGES)

    # Check presence (has value / no value)
    Facet(source="Notes", ux_mode=FacetUxMode.CHECK_PRESENCE)

Source code in src/deriva_ml/model/annotations.py
class FacetUxMode(str, Enum):
    """UX modes for facet filters in the search panel.

    Controls how users interact with a facet filter.

    Attributes:
        CHOICES: Checkbox list for selecting values
        RANGES: Range slider/inputs for numeric or date ranges
        CHECK_PRESENCE: Check if value exists or is null

    Example:
        >>> # Choice-based facet
        >>> Facet(source="Status", ux_mode=FacetUxMode.CHOICES)
        >>>
        >>> # Range-based facet for numeric values
        >>> Facet(source="Age", ux_mode=FacetUxMode.RANGES)
        >>>
        >>> # Check presence (has value / no value)
        >>> Facet(source="Notes", ux_mode=FacetUxMode.CHECK_PRESENCE)
    """
    CHOICES = "choices"
    RANGES = "ranges"
    CHECK_PRESENCE = "check_presence"
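Because `FacetUxMode` subclasses both `str` and `Enum`, members compare equal to their raw string values and serialize to plain strings without a custom JSON encoder, which is why `Facet.to_dict()` can simply emit `ux_mode.value`. A quick stdlib demonstration, using the illustrative `UxModeSketch` rather than the real class:

```python
import json
from enum import Enum


class UxModeSketch(str, Enum):
    """Illustrative mirror of the str+Enum pattern used by FacetUxMode."""
    CHOICES = "choices"
    RANGES = "ranges"
    CHECK_PRESENCE = "check_presence"


# The str mixin means members compare equal to their raw values...
assert UxModeSketch.CHOICES == "choices"
# ...and json.dumps emits the underlying string, no custom encoder needed.
print(json.dumps({"ux_mode": UxModeSketch.RANGES}))  # {"ux_mode": "ranges"}
```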

ForeignKeyOrderer

Computes insertion order for tables based on FK dependencies.

Uses topological sort to ensure referenced tables are populated before tables that reference them. Handles cycles by either raising an error or breaking them.

Example

    orderer = ForeignKeyOrderer(model, schemas=['domain', 'deriva-ml'])

    # Get insertion order
    tables_to_fill = ['Image', 'Subject', 'Diagnosis']
    ordered = orderer.get_insertion_order(tables_to_fill)
    # Returns: ['Subject', 'Image', 'Diagnosis']

    # Get all tables in safe order
    all_ordered = orderer.get_insertion_order()

    # Get FK dependencies for a table
    deps = orderer.get_dependencies('Image')
    # Returns: {'Subject', 'Dataset', ...}
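Under the hood, the ordering relies on the standard library's `graphlib.TopologicalSorter`. A self-contained sketch with a toy dependency graph (the table names are illustrative) shows the core idea:

```python
from graphlib import TopologicalSorter

# Toy FK dependency graph, shaped like _build_dependency_graph() output:
# each table maps to the set of tables it references via foreign keys.
graph = {
    "Image": {"Subject"},
    "Diagnosis": {"Image", "Subject"},
    "Subject": set(),
}

# static_order() yields dependencies before their dependents, so inserting
# rows in this order never violates an FK constraint.
order = list(TopologicalSorter(graph).static_order())
assert order.index("Subject") < order.index("Image") < order.index("Diagnosis")
print(order)
```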

Source code in src/deriva_ml/model/fk_orderer.py
class ForeignKeyOrderer:
    """Computes insertion order for tables based on FK dependencies.

    Uses topological sort to ensure referenced tables are populated
    before tables that reference them. Handles cycles by either
    raising an error or breaking them.

    Example:
        orderer = ForeignKeyOrderer(model, schemas=['domain', 'deriva-ml'])

        # Get insertion order
        tables_to_fill = ['Image', 'Subject', 'Diagnosis']
        ordered = orderer.get_insertion_order(tables_to_fill)
        # Returns: ['Subject', 'Image', 'Diagnosis']

        # Get all tables in safe order
        all_ordered = orderer.get_insertion_order()

        # Get FK dependencies for a table
        deps = orderer.get_dependencies('Image')
        # Returns: {'Subject', 'Dataset', ...}
    """

    def __init__(
        self,
        model: Model,
        schemas: list[str],
    ):
        """Initialize the orderer.

        Args:
            model: ERMrest Model object.
            schemas: Schemas to consider for FK relationships.
        """
        self.model = model
        self.schemas = set(schemas)
        self._table_cache: dict[str, DerivaTable] = {}
        self._build_table_cache()

    def _build_table_cache(self) -> None:
        """Build cache mapping table names to Table objects."""
        for schema_name in self.schemas:
            if schema_name not in self.model.schemas:
                continue
            schema = self.model.schemas[schema_name]
            for table_name, table in schema.tables.items():
                # Store both qualified and unqualified names
                self._table_cache[f"{schema_name}.{table_name}"] = table
                # Only store unqualified if not already present (avoids conflicts)
                if table_name not in self._table_cache:
                    self._table_cache[table_name] = table

    def _to_table(self, t: str | DerivaTable) -> DerivaTable:
        """Convert table name to Table object.

        Args:
            t: Table name or Table object.

        Returns:
            DerivaTable object.

        Raises:
            ValueError: If table not found.
        """
        if isinstance(t, DerivaTable):
            return t

        if t in self._table_cache:
            return self._table_cache[t]

        raise ValueError(f"Table {t} not found in schemas {self.schemas}")

    def _table_key(self, t: DerivaTable) -> str:
        """Get unique key for a table."""
        return f"{t.schema.name}.{t.name}"

    def get_dependencies(self, table: str | DerivaTable) -> set[DerivaTable]:
        """Get tables that this table depends on (FK targets).

        Args:
            table: Table name or object.

        Returns:
            Set of tables that must be populated before this table.
        """
        t = self._to_table(table)
        dependencies = set()

        for fk in t.foreign_keys:
            pk_table = fk.pk_table
            # Only include dependencies within our schemas
            if pk_table.schema.name in self.schemas:
                # Don't include self-references as dependencies
                if self._table_key(pk_table) != self._table_key(t):
                    dependencies.add(pk_table)

        return dependencies

    def get_dependents(self, table: str | DerivaTable) -> set[DerivaTable]:
        """Get tables that depend on this table (FK sources).

        Args:
            table: Table name or object.

        Returns:
            Set of tables that reference this table.
        """
        t = self._to_table(table)
        dependents = set()

        for schema_name in self.schemas:
            if schema_name not in self.model.schemas:
                continue

            for other_table in self.model.schemas[schema_name].tables.values():
                if self._table_key(other_table) == self._table_key(t):
                    continue

                for fk in other_table.foreign_keys:
                    if self._table_key(fk.pk_table) == self._table_key(t):
                        dependents.add(other_table)
                        break

        return dependents

    def _build_dependency_graph(
        self,
        tables: list[str | DerivaTable] | None = None,
    ) -> dict[str, set[str]]:
        """Build FK dependency graph.

        Args:
            tables: Tables to include. If None, includes all tables.

        Returns:
            Dict mapping table key -> set of table keys it depends on.
        """
        if tables is None:
            # Include all tables in schemas
            table_objs = []
            for schema_name in self.schemas:
                if schema_name in self.model.schemas:
                    table_objs.extend(self.model.schemas[schema_name].tables.values())
        else:
            table_objs = [self._to_table(t) for t in tables]

        table_keys = {self._table_key(t) for t in table_objs}
        graph: dict[str, set[str]] = {}

        for t in table_objs:
            key = self._table_key(t)
            deps = set()

            for fk in t.foreign_keys:
                pk_key = self._table_key(fk.pk_table)
                # Only include deps within our table set
                if pk_key in table_keys and pk_key != key:
                    deps.add(pk_key)

            graph[key] = deps

        return graph

    def get_insertion_order(
        self,
        tables: list[str | DerivaTable] | None = None,
        handle_cycles: bool = True,
    ) -> list[DerivaTable]:
        """Compute FK-safe insertion order for the given tables.

        Returns tables ordered so that all FK dependencies are satisfied
        when inserting in order.

        Args:
            tables: Tables to order. If None, orders all tables in schemas.
            handle_cycles: If True, break cycles by removing edges.
                If False, raise CycleError on cycles.

        Returns:
            Ordered list of Table objects (insert from first to last).

        Raises:
            CycleError: If handle_cycles=False and cycles exist.
        """
        graph = self._build_dependency_graph(tables)

        try:
            ts = TopologicalSorter(graph)
            ordered_keys = list(ts.static_order())
        except CycleError as e:
            if handle_cycles:
                ordered_keys = self._break_cycles_and_sort(graph, e)
            else:
                raise

        # Convert keys back to Table objects
        return [self._table_cache[key] for key in ordered_keys]

    def get_deletion_order(
        self,
        tables: list[str | DerivaTable] | None = None,
        handle_cycles: bool = True,
    ) -> list[DerivaTable]:
        """Compute FK-safe deletion order for the given tables.

        Returns tables in reverse dependency order - tables that are
        referenced should be deleted last.

        Args:
            tables: Tables to order. If None, orders all tables in schemas.
            handle_cycles: If True, break cycles. If False, raise on cycles.

        Returns:
            Ordered list of Table objects (delete from first to last).
        """
        insertion_order = self.get_insertion_order(tables, handle_cycles)
        return list(reversed(insertion_order))

    def _break_cycles_and_sort(
        self,
        graph: dict[str, set[str]],
        error: CycleError,
        _depth: int = 0,
    ) -> list[str]:
        """Handle cycles by breaking them and re-sorting.

        Uses a simple strategy of removing edges from cycle members
        until no cycles remain.

        Args:
            graph: Dependency graph.
            error: CycleError with cycle info.

        Returns:
            Ordered list of table keys.
        """
        max_depth = len(graph)  # Can't have more cycles than edges
        if _depth > max_depth:
            logger.error("Too many cycles to break, returning arbitrary order")
            return list(graph.keys())

        # Get cycle from error message.
        # CycleError.args[1] is like ['A', 'B', 'C', 'A'] where first == last.
        cycle = list(error.args[1]) if len(error.args) > 1 else []

        if cycle:
            logger.warning(f"Breaking cycle in FK dependencies: {' -> '.join(cycle)}")

            # Remove one edge from the cycle to break it.
            # cycle[-1] == cycle[0], so the unique nodes are cycle[:-1].
            # Each consecutive pair cycle[i] -> cycle[i+1] corresponds to
            # graph[cycle[i+1]] containing cycle[i] (i.e., cycle[i+1] depends on cycle[i]).
            # Remove the last real edge: cycle[-2] from graph[cycle[-1]].
            edge_removed = False
            if len(cycle) >= 3:
                dep_node = cycle[-2]  # the dependency
                node = cycle[-1]      # the node that depends on dep_node
                if node in graph and dep_node in graph[node]:
                    graph[node].remove(dep_node)
                    logger.debug(f"Removed dependency {node} -> {dep_node}")
                    edge_removed = True

            if not edge_removed:
                # Try removing any edge in the cycle
                for i in range(len(cycle) - 1):
                    dep_node, node = cycle[i], cycle[i + 1]
                    if node in graph and dep_node in graph[node]:
                        graph[node].remove(dep_node)
                        logger.debug(f"Removed dependency {node} -> {dep_node}")
                        edge_removed = True
                        break

        # Try again
        try:
            ts = TopologicalSorter(graph)
            return list(ts.static_order())
        except CycleError as e:
            # Recursively break more cycles
            return self._break_cycles_and_sort(graph, e, _depth + 1)

    def validate_insertion_order(
        self,
        tables: list[str | DerivaTable],
    ) -> list[tuple[str, str, str]]:
        """Validate that a list of tables can be inserted in order.

        Checks each table to ensure all its FK dependencies are
        satisfied by tables earlier in the list.

        Args:
            tables: Ordered list of tables to validate.

        Returns:
            List of (table, missing_dependency, fk_name) tuples for
            any unsatisfied dependencies. Empty list if valid.
        """
        table_objs = [self._to_table(t) for t in tables]
        seen_keys = set()
        violations = []

        for t in table_objs:
            key = self._table_key(t)

            for fk in t.foreign_keys:
                pk_key = self._table_key(fk.pk_table)
                # Skip self-references and tables not in our set
                if pk_key == key:
                    continue
                if pk_key not in {self._table_key(x) for x in table_objs}:
                    continue

                if pk_key not in seen_keys:
                    violations.append((key, pk_key, fk.name[1]))

            seen_keys.add(key)

        return violations

    def get_all_tables(self) -> list[DerivaTable]:
        """Get all tables in configured schemas.

        Returns:
            List of all Table objects.
        """
        tables = []
        for schema_name in self.schemas:
            if schema_name in self.model.schemas:
                tables.extend(self.model.schemas[schema_name].tables.values())
        return tables

    def find_cycles(self) -> list[list[str]]:
        """Find all FK dependency cycles in the schema.

        Returns:
            List of cycles, each cycle is a list of table keys.
        """
        graph = self._build_dependency_graph()
        cycles = []

        # Use DFS to find cycles
        visited = set()
        rec_stack = set()
        path = []

        def dfs(node: str) -> bool:
            visited.add(node)
            rec_stack.add(node)
            path.append(node)

            for neighbor in graph.get(node, set()):
                if neighbor not in visited:
                    if dfs(neighbor):
                        return True
                elif neighbor in rec_stack:
                    # Found cycle
                    idx = path.index(neighbor)
                    cycle = path[idx:] + [neighbor]
                    cycles.append(cycle)

            path.pop()
            rec_stack.remove(node)
            return False

        for node in graph:
            if node not in visited:
                dfs(node)

        return cycles
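`_break_cycles_and_sort` exploits the documented shape of `graphlib.CycleError`: its `args[1]` lists the cycle with the first node repeated at the end, and each `cycle[i]` is a dependency (predecessor) of `cycle[i+1]`. A minimal standalone sketch of the catch-break-retry strategy, on a toy graph rather than real catalog tables:

```python
from graphlib import CycleError, TopologicalSorter

# Toy graph with a two-table FK cycle: A <-> B.
graph = {"A": {"B"}, "B": {"A"}, "C": set()}

try:
    order = list(TopologicalSorter(graph).static_order())
except CycleError as err:
    # err.args[1] is e.g. ['A', 'B', 'A']: first == last, and each
    # cycle[i] is a dependency of cycle[i+1], so the last real edge
    # is cycle[-2] inside graph[cycle[-1]]. Drop that edge and retry.
    cycle = err.args[1]
    graph[cycle[-1]].discard(cycle[-2])
    order = list(TopologicalSorter(graph).static_order())

print(order)  # all three nodes, in an FK-safe order for the pruned graph
```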

__init__

__init__(model: Model, schemas: list[str])

Initialize the orderer.

Parameters:

- model (Model, required): ERMrest Model object.
- schemas (list[str], required): Schemas to consider for FK relationships.
Source code in src/deriva_ml/model/fk_orderer.py
def __init__(
    self,
    model: Model,
    schemas: list[str],
):
    """Initialize the orderer.

    Args:
        model: ERMrest Model object.
        schemas: Schemas to consider for FK relationships.
    """
    self.model = model
    self.schemas = set(schemas)
    self._table_cache: dict[str, DerivaTable] = {}
    self._build_table_cache()

find_cycles

find_cycles() -> list[list[str]]

Find all FK dependency cycles in the schema.

Returns:

- list[list[str]]: List of cycles; each cycle is a list of table keys.

Source code in src/deriva_ml/model/fk_orderer.py
def find_cycles(self) -> list[list[str]]:
    """Find all FK dependency cycles in the schema.

    Returns:
        List of cycles, each cycle is a list of table keys.
    """
    graph = self._build_dependency_graph()
    cycles = []

    # Use DFS to find cycles
    visited = set()
    rec_stack = set()
    path = []

    def dfs(node: str) -> bool:
        visited.add(node)
        rec_stack.add(node)
        path.append(node)

        for neighbor in graph.get(node, set()):
            if neighbor not in visited:
                if dfs(neighbor):
                    return True
            elif neighbor in rec_stack:
                # Found cycle
                idx = path.index(neighbor)
                cycle = path[idx:] + [neighbor]
                cycles.append(cycle)

        path.pop()
        rec_stack.remove(node)
        return False

    for node in graph:
        if node not in visited:
            dfs(node)

    return cycles

get_all_tables

get_all_tables() -> list[DerivaTable]

Get all tables in configured schemas.

Returns:

- list[DerivaTable]: List of all Table objects.

Source code in src/deriva_ml/model/fk_orderer.py
def get_all_tables(self) -> list[DerivaTable]:
    """Get all tables in configured schemas.

    Returns:
        List of all Table objects.
    """
    tables = []
    for schema_name in self.schemas:
        if schema_name in self.model.schemas:
            tables.extend(self.model.schemas[schema_name].tables.values())
    return tables

get_deletion_order

get_deletion_order(tables: list[str | DerivaTable] | None = None, handle_cycles: bool = True) -> list[DerivaTable]

Compute FK-safe deletion order for the given tables.

Returns tables in reverse dependency order - tables that are referenced should be deleted last.

Parameters:

- tables (list[str | DerivaTable] | None, default None): Tables to order. If None, orders all tables in schemas.
- handle_cycles (bool, default True): If True, break cycles. If False, raise on cycles.

Returns:

- list[DerivaTable]: Ordered list of Table objects (delete from first to last).

Source code in src/deriva_ml/model/fk_orderer.py
def get_deletion_order(
    self,
    tables: list[str | DerivaTable] | None = None,
    handle_cycles: bool = True,
) -> list[DerivaTable]:
    """Compute FK-safe deletion order for the given tables.

    Returns tables in reverse dependency order - tables that are
    referenced should be deleted last.

    Args:
        tables: Tables to order. If None, orders all tables in schemas.
        handle_cycles: If True, break cycles. If False, raise on cycles.

    Returns:
        Ordered list of Table objects (delete from first to last).
    """
    insertion_order = self.get_insertion_order(tables, handle_cycles)
    return list(reversed(insertion_order))

get_dependencies

get_dependencies(table: str | DerivaTable) -> set[DerivaTable]

Get tables that this table depends on (FK targets).

Parameters:

- table (str | DerivaTable, required): Table name or object.

Returns:

- set[DerivaTable]: Set of tables that must be populated before this table.

Source code in src/deriva_ml/model/fk_orderer.py
def get_dependencies(self, table: str | DerivaTable) -> set[DerivaTable]:
    """Get tables that this table depends on (FK targets).

    Args:
        table: Table name or object.

    Returns:
        Set of tables that must be populated before this table.
    """
    t = self._to_table(table)
    dependencies = set()

    for fk in t.foreign_keys:
        pk_table = fk.pk_table
        # Only include dependencies within our schemas
        if pk_table.schema.name in self.schemas:
            # Don't include self-references as dependencies
            if self._table_key(pk_table) != self._table_key(t):
                dependencies.add(pk_table)

    return dependencies

get_dependents

get_dependents(table: str | DerivaTable) -> set[DerivaTable]

Get tables that depend on this table (FK sources).

Parameters:

- table (str | DerivaTable, required): Table name or object.

Returns:

- set[DerivaTable]: Set of tables that reference this table.

Source code in src/deriva_ml/model/fk_orderer.py
def get_dependents(self, table: str | DerivaTable) -> set[DerivaTable]:
    """Get tables that depend on this table (FK sources).

    Args:
        table: Table name or object.

    Returns:
        Set of tables that reference this table.
    """
    t = self._to_table(table)
    dependents = set()

    for schema_name in self.schemas:
        if schema_name not in self.model.schemas:
            continue

        for other_table in self.model.schemas[schema_name].tables.values():
            if self._table_key(other_table) == self._table_key(t):
                continue

            for fk in other_table.foreign_keys:
                if self._table_key(fk.pk_table) == self._table_key(t):
                    dependents.add(other_table)
                    break

    return dependents

get_insertion_order

get_insertion_order(tables: list[str | DerivaTable] | None = None, handle_cycles: bool = True) -> list[DerivaTable]

Compute FK-safe insertion order for the given tables.

Returns tables ordered so that all FK dependencies are satisfied when inserting in order.

Parameters:

- tables (list[str | DerivaTable] | None, default None): Tables to order. If None, orders all tables in schemas.
- handle_cycles (bool, default True): If True, break cycles by removing edges. If False, raise CycleError on cycles.

Returns:

- list[DerivaTable]: Ordered list of Table objects (insert from first to last).

Raises:

- CycleError: If handle_cycles=False and cycles exist.

Source code in src/deriva_ml/model/fk_orderer.py
def get_insertion_order(
    self,
    tables: list[str | DerivaTable] | None = None,
    handle_cycles: bool = True,
) -> list[DerivaTable]:
    """Compute FK-safe insertion order for the given tables.

    Returns tables ordered so that all FK dependencies are satisfied
    when inserting in order.

    Args:
        tables: Tables to order. If None, orders all tables in schemas.
        handle_cycles: If True, break cycles by removing edges.
            If False, raise CycleError on cycles.

    Returns:
        Ordered list of Table objects (insert from first to last).

    Raises:
        CycleError: If handle_cycles=False and cycles exist.
    """
    graph = self._build_dependency_graph(tables)

    try:
        ts = TopologicalSorter(graph)
        ordered_keys = list(ts.static_order())
    except CycleError as e:
        if handle_cycles:
            ordered_keys = self._break_cycles_and_sort(graph, e)
        else:
            raise

    # Convert keys back to Table objects
    return [self._table_cache[key] for key in ordered_keys]
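
The ordering above relies on Python's standard-library TopologicalSorter; a minimal sketch with hypothetical table names shows how referenced tables sort before their referencers:

```python
from graphlib import TopologicalSorter

# Each key maps to the tables it references via FK (its predecessors,
# which must be inserted first). Table names are hypothetical.
graph = {
    "Image": {"Subject"},    # Image has an FK to Subject
    "Subject": {"Species"},  # Subject has an FK to Species
    "Species": set(),
}
order = list(TopologicalSorter(graph).static_order())
# Referenced tables come before the tables that reference them
assert order.index("Species") < order.index("Subject") < order.index("Image")
```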

validate_insertion_order

validate_insertion_order(
    tables: list[str | Table],
) -> list[tuple[str, str, str]]

Validate that a list of tables can be inserted in order.

Checks each table to ensure all its FK dependencies are satisfied by tables earlier in the list.

Parameters:

- tables (list[str | Table]): Ordered list of tables to validate. Required.

Returns:

- list[tuple[str, str, str]]: List of (table, missing_dependency, fk_name) tuples for any unsatisfied dependencies. Empty list if valid.

Source code in src/deriva_ml/model/fk_orderer.py
def validate_insertion_order(
    self,
    tables: list[str | DerivaTable],
) -> list[tuple[str, str, str]]:
    """Validate that a list of tables can be inserted in order.

    Checks each table to ensure all its FK dependencies are
    satisfied by tables earlier in the list.

    Args:
        tables: Ordered list of tables to validate.

    Returns:
        List of (table, missing_dependency, fk_name) tuples for
        any unsatisfied dependencies. Empty list if valid.
    """
    table_objs = [self._to_table(t) for t in tables]
    seen_keys = set()
    violations = []

    for t in table_objs:
        key = self._table_key(t)

        for fk in t.foreign_keys:
            pk_key = self._table_key(fk.pk_table)
            # Skip self-references and tables not in our set
            if pk_key == key:
                continue
            if pk_key not in {self._table_key(x) for x in table_objs}:
                continue

            if pk_key not in seen_keys:
                violations.append((key, pk_key, fk.name[1]))

        seen_keys.add(key)

    return violations
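
The check performed by validate_insertion_order can be sketched with plain dictionaries; the fks mapping and table names here are hypothetical:

```python
# Walk an ordered table list and flag any FK target not seen earlier.
fks = {"Image": ["Subject"], "Subject": ["Species"], "Species": []}
order = ["Image", "Subject", "Species"]  # deliberately wrong order

seen, violations = set(), []
for t in order:
    for dep in fks[t]:
        if dep not in seen:
            violations.append((t, dep))  # dep must be inserted before t
    seen.add(t)

assert violations == [("Image", "Subject"), ("Subject", "Species")]
```

Reversing the order to ["Species", "Subject", "Image"] yields no violations.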

InboundFK dataclass

An inbound foreign key path step for pseudo-column source paths.

Use this when following a foreign key FROM another table TO the current table. This is common when counting or aggregating related records.

Parameters:

- schema (str): Schema name containing the FK constraint. Required.
- constraint (str): Foreign key constraint name. Required.
Example

Count images related to a subject (Image has FK to Subject)::

>>> # In Subject table, count related images
>>> pc = PseudoColumn(
...     source=[InboundFK("domain", "Image_Subject_fkey"), "RID"],
...     aggregate=Aggregate.CNT,
...     markdown_name="Image Count"
... )
Source code in src/deriva_ml/model/annotations.py
@dataclass
class InboundFK:
    """An inbound foreign key path step for pseudo-column source paths.

    Use this when following a foreign key FROM another table TO the current table.
    This is common when counting or aggregating related records.

    Args:
        schema: Schema name containing the FK constraint
        constraint: Foreign key constraint name

    Example:
        Count images related to a subject (Image has FK to Subject)::

            >>> # In Subject table, count related images
            >>> pc = PseudoColumn(
            ...     source=[InboundFK("domain", "Image_Subject_fkey"), "RID"],
            ...     aggregate=Aggregate.CNT,
            ...     markdown_name="Image Count"
            ... )
    """
    schema: str
    constraint: str

    def to_dict(self) -> dict[str, list[str]]:
        return {"inbound": [self.schema, self.constraint]}
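
To make the serialized form concrete, here is a minimal copy of the dataclass above (taken from the source shown) with its to_dict output; the constraint name is hypothetical:

```python
from dataclasses import dataclass

@dataclass
class InboundFK:
    schema: str
    constraint: str

    def to_dict(self) -> dict[str, list[str]]:
        # Wire format: a one-key dict tagging the step direction
        return {"inbound": [self.schema, self.constraint]}

step = InboundFK("domain", "Image_Subject_fkey")
assert step.to_dict() == {"inbound": ["domain", "Image_Subject_fkey"]}
```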

NameStyle dataclass

Styling options for automatic display name formatting.

Applied to table or column names when no explicit display name is set.

Parameters:

- underline_space (bool | None): Replace underscores with spaces (e.g., "First_Name" -> "First Name"). Default: None.
- title_case (bool | None): Apply title case formatting (e.g., "firstname" -> "Firstname"). Default: None.
- markdown (bool | None): Render the name as markdown. Default: None.
Example

Transform "Subject_ID" to "Subject Id" with title case::

>>> display = Display(
...     name_style=NameStyle(underline_space=True, title_case=True)
... )

Source code in src/deriva_ml/model/annotations.py
@dataclass
class NameStyle:
    """Styling options for automatic display name formatting.

    Applied to table or column names when no explicit display name is set.

    Args:
        underline_space: Replace underscores with spaces (e.g., "First_Name" -> "First Name")
        title_case: Apply title case formatting (e.g., "firstname" -> "Firstname")
        markdown: Render the name as markdown

    Example:
        >>> # Transform "Subject_ID" to "Subject Id" with title case
        >>> display = Display(
        ...     name_style=NameStyle(underline_space=True, title_case=True)
        ... )
    """
    underline_space: bool | None = None
    title_case: bool | None = None
    markdown: bool | None = None

    def to_dict(self) -> dict[str, bool]:
        """Convert to dictionary, excluding None values."""
        result = {}
        if self.underline_space is not None:
            result["underline_space"] = self.underline_space
        if self.title_case is not None:
            result["title_case"] = self.title_case
        if self.markdown is not None:
            result["markdown"] = self.markdown
        return result

to_dict

to_dict() -> dict[str, bool]

Convert to dictionary, excluding None values.

Source code in src/deriva_ml/model/annotations.py
def to_dict(self) -> dict[str, bool]:
    """Convert to dictionary, excluding None values."""
    result = {}
    if self.underline_space is not None:
        result["underline_space"] = self.underline_space
    if self.title_case is not None:
        result["title_case"] = self.title_case
    if self.markdown is not None:
        result["markdown"] = self.markdown
    return result

OutboundFK dataclass

An outbound foreign key path step for pseudo-column source paths.

Use this when following a foreign key FROM the current table TO another table. This is common when displaying values from referenced tables.

Parameters:

- schema (str): Schema name containing the FK constraint. Required.
- constraint (str): Foreign key constraint name. Required.
Example

Show species name from a related Species table::

>>> # Subject has FK to Species, display Species.Name
>>> pc = PseudoColumn(
...     source=[OutboundFK("domain", "Subject_Species_fkey"), "Name"],
...     markdown_name="Species"
... )

Chain multiple outbound FKs::

>>> # Image -> Subject -> Species
>>> pc = PseudoColumn(
...     source=[
...         OutboundFK("domain", "Image_Subject_fkey"),
...         OutboundFK("domain", "Subject_Species_fkey"),
...         "Name"
...     ],
...     markdown_name="Species"
... )
Source code in src/deriva_ml/model/annotations.py
@dataclass
class OutboundFK:
    """An outbound foreign key path step for pseudo-column source paths.

    Use this when following a foreign key FROM the current table TO another table.
    This is common when displaying values from referenced tables.

    Args:
        schema: Schema name containing the FK constraint
        constraint: Foreign key constraint name

    Example:
        Show species name from a related Species table::

            >>> # Subject has FK to Species, display Species.Name
            >>> pc = PseudoColumn(
            ...     source=[OutboundFK("domain", "Subject_Species_fkey"), "Name"],
            ...     markdown_name="Species"
            ... )

        Chain multiple outbound FKs::

            >>> # Image -> Subject -> Species
            >>> pc = PseudoColumn(
            ...     source=[
            ...         OutboundFK("domain", "Image_Subject_fkey"),
            ...         OutboundFK("domain", "Subject_Species_fkey"),
            ...         "Name"
            ...     ],
            ...     markdown_name="Species"
            ... )
    """
    schema: str
    constraint: str

    def to_dict(self) -> dict[str, list[str]]:
        return {"outbound": [self.schema, self.constraint]}
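
A multi-hop source path serializes to a list of step dictionaries followed by the terminal column name. A sketch with a minimal copy of the dataclass above (constraint names hypothetical):

```python
from dataclasses import dataclass

@dataclass
class OutboundFK:
    schema: str
    constraint: str

    def to_dict(self) -> dict[str, list[str]]:
        return {"outbound": [self.schema, self.constraint]}

# Image -> Subject -> Species, ending in the Name column
path = [
    OutboundFK("domain", "Image_Subject_fkey"),
    OutboundFK("domain", "Subject_Species_fkey"),
    "Name",
]
wire = [s.to_dict() if hasattr(s, "to_dict") else s for s in path]
assert wire == [
    {"outbound": ["domain", "Image_Subject_fkey"]},
    {"outbound": ["domain", "Subject_Species_fkey"]},
    "Name",
]
```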

PreFormat dataclass

Pre-formatting options for column values.

Parameters:

- format (str | None): Printf-style format string (e.g., "%.2f"). Default: None.
- bool_true_value (str | None): Display value for True. Default: None.
- bool_false_value (str | None): Display value for False. Default: None.
Source code in src/deriva_ml/model/annotations.py
@dataclass
class PreFormat:
    """Pre-formatting options for column values.

    Args:
        format: Printf-style format string (e.g., "%.2f")
        bool_true_value: Display value for True
        bool_false_value: Display value for False
    """
    format: str | None = None
    bool_true_value: str | None = None
    bool_false_value: str | None = None

    def to_dict(self) -> dict[str, Any]:
        result = {}
        if self.format is not None:
            result["format"] = self.format
        if self.bool_true_value is not None:
            result["bool_true_value"] = self.bool_true_value
        if self.bool_false_value is not None:
            result["bool_false_value"] = self.bool_false_value
        return result
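
The format field takes a printf-style string; Python's own %-formatting illustrates the effect of patterns like "%.2f":

```python
# "%.2f" keeps two decimal places; "%05d" zero-pads to width 5
assert "%.2f" % 3.14159 == "3.14"
assert "%05d" % 42 == "00042"
```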

PseudoColumn dataclass

A pseudo-column definition for visible columns and foreign keys.

Pseudo-columns display computed values, values from related tables, or custom markdown patterns. They appear as columns in table views but are not actual database columns.

Parameters:

- source (str | list[str | InboundFK | OutboundFK] | None): Path to source data. Can be a column name (string) or a list of FK path steps ending with a column name. Default: None.
- sourcekey (str | None): Reference to a named source in the source-definitions annotation. Default: None.
- markdown_name (str | None): Display name for the column (supports markdown). Default: None.
- comment (str | Literal[False] | None): Description/tooltip text (or False to hide). Default: None.
- entity (bool | None): Whether this represents an entity (affects rendering). Default: None.
- aggregate (Aggregate | None): Aggregation function when source returns multiple values. Default: None.
- self_link (bool | None): Make the value a link to the current row. Default: None.
- display (PseudoColumnDisplay | None): Display formatting options. Default: None.
- array_options (dict[str, Any] | None): Options for array aggregates (max_length, order). Default: None.

Note

source and sourcekey are mutually exclusive. Use source for inline definitions, sourcekey to reference pre-defined sources.

Raises:

- ValueError: If both source and sourcekey are provided.

Example

Simple column with custom display name::

>>> PseudoColumn(source="Internal_ID", markdown_name="ID")

Outbound FK traversal (display value from referenced table)::

>>> # Subject has FK to Species - show Species.Name
>>> PseudoColumn(
...     source=[OutboundFK("domain", "Subject_Species_fkey"), "Name"],
...     markdown_name="Species"
... )

Inbound FK with aggregation (count related records)::

>>> # Count images pointing to this subject
>>> PseudoColumn(
...     source=[InboundFK("domain", "Image_Subject_fkey"), "RID"],
...     aggregate=Aggregate.CNT,
...     markdown_name="Images"
... )

Multi-hop FK path::

>>> # Image -> Subject -> Species
>>> PseudoColumn(
...     source=[
...         OutboundFK("domain", "Image_Subject_fkey"),
...         OutboundFK("domain", "Subject_Species_fkey"),
...         "Name"
...     ],
...     markdown_name="Species"
... )

With custom display formatting::

>>> PseudoColumn(
...     source="URL",
...     display=PseudoColumnDisplay(
...         markdown_pattern="[Download]({{{_value}}})",
...         show_foreign_key_link=False
...     )
... )

Array aggregate with display options::

>>> PseudoColumn(
...     source=[InboundFK("domain", "Tag_Item_fkey"), "Name"],
...     aggregate=Aggregate.ARRAY_D,
...     display=PseudoColumnDisplay(array_ux_mode=ArrayUxMode.CSV),
...     markdown_name="Tags"
... )
Source code in src/deriva_ml/model/annotations.py
@dataclass
class PseudoColumn:
    """A pseudo-column definition for visible columns and foreign keys.

    Pseudo-columns display computed values, values from related tables,
    or custom markdown patterns. They appear as columns in table views
    but are not actual database columns.

    Args:
        source: Path to source data. Can be:
            - A column name (string)
            - A list of FK path steps ending with a column name
        sourcekey: Reference to a named source in source-definitions annotation
        markdown_name: Display name for the column (supports markdown)
        comment: Description/tooltip text (or False to hide)
        entity: Whether this represents an entity (affects rendering)
        aggregate: Aggregation function when source returns multiple values
        self_link: Make the value a link to the current row
        display: Display formatting options
        array_options: Options for array aggregates (max_length, order)

    Note:
        source and sourcekey are mutually exclusive. Use source for inline
        definitions, sourcekey to reference pre-defined sources.

    Raises:
        ValueError: If both source and sourcekey are provided

    Example:
        Simple column with custom display name::

            >>> PseudoColumn(source="Internal_ID", markdown_name="ID")

        Outbound FK traversal (display value from referenced table)::

            >>> # Subject has FK to Species - show Species.Name
            >>> PseudoColumn(
            ...     source=[OutboundFK("domain", "Subject_Species_fkey"), "Name"],
            ...     markdown_name="Species"
            ... )

        Inbound FK with aggregation (count related records)::

            >>> # Count images pointing to this subject
            >>> PseudoColumn(
            ...     source=[InboundFK("domain", "Image_Subject_fkey"), "RID"],
            ...     aggregate=Aggregate.CNT,
            ...     markdown_name="Images"
            ... )

        Multi-hop FK path::

            >>> # Image -> Subject -> Species
            >>> PseudoColumn(
            ...     source=[
            ...         OutboundFK("domain", "Image_Subject_fkey"),
            ...         OutboundFK("domain", "Subject_Species_fkey"),
            ...         "Name"
            ...     ],
            ...     markdown_name="Species"
            ... )

        With custom display formatting::

            >>> PseudoColumn(
            ...     source="URL",
            ...     display=PseudoColumnDisplay(
            ...         markdown_pattern="[Download]({{{_value}}})",
            ...         show_foreign_key_link=False
            ...     )
            ... )

        Array aggregate with display options::

            >>> PseudoColumn(
            ...     source=[InboundFK("domain", "Tag_Item_fkey"), "Name"],
            ...     aggregate=Aggregate.ARRAY_D,
            ...     display=PseudoColumnDisplay(array_ux_mode=ArrayUxMode.CSV),
            ...     markdown_name="Tags"
            ... )
    """
    source: str | list[str | InboundFK | OutboundFK] | None = None
    sourcekey: str | None = None
    markdown_name: str | None = None
    comment: str | Literal[False] | None = None
    entity: bool | None = None
    aggregate: Aggregate | None = None
    self_link: bool | None = None
    display: PseudoColumnDisplay | None = None
    array_options: dict[str, Any] | None = None  # Can be complex

    def __post_init__(self):
        if self.source is not None and self.sourcekey is not None:
            raise ValueError("source and sourcekey are mutually exclusive")

    def to_dict(self) -> dict[str, Any]:
        result = {}

        if self.source is not None:
            if isinstance(self.source, str):
                result["source"] = self.source
            else:
                # Convert path elements
                result["source"] = [
                    item.to_dict() if hasattr(item, "to_dict") else item
                    for item in self.source
                ]

        if self.sourcekey is not None:
            result["sourcekey"] = self.sourcekey
        if self.markdown_name is not None:
            result["markdown_name"] = self.markdown_name
        if self.comment is not None:
            result["comment"] = self.comment
        if self.entity is not None:
            result["entity"] = self.entity
        if self.aggregate is not None:
            result["aggregate"] = self.aggregate.value
        if self.self_link is not None:
            result["self_link"] = self.self_link
        if self.display is not None:
            result["display"] = self.display.to_dict()
        if self.array_options is not None:
            result["array_options"] = self.array_options

        return result
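
Putting the pieces together, the "count related images" example above would serialize to roughly the following annotation dictionary. The FK constraint name is hypothetical, and Aggregate.CNT is assumed to have the value "cnt":

```python
# Source path: a list of step dicts ending in a column name, as
# produced by to_dict above. "cnt" is the assumed Aggregate.CNT value.
annotation = {
    "source": [{"inbound": ["domain", "Image_Subject_fkey"]}, "RID"],
    "aggregate": "cnt",
    "markdown_name": "Images",
}
assert annotation["source"][-1] == "RID"
```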

PseudoColumnDisplay dataclass

Display options for a pseudo-column.

Parameters:

- markdown_pattern (str | None): Handlebars/mustache template. Default: None.
- template_engine (TemplateEngine | None): Template engine to use. Default: None.
- show_foreign_key_link (bool | None): Show as clickable link. Default: None.
- array_ux_mode (ArrayUxMode | None): How to render array values. Default: None.
- column_order (list[SortKey] | Literal[False] | None): Sort order for the column, or False to disable. Default: None.
- wait_for (list[str] | None): Template variables to wait for before rendering. Default: None.
Source code in src/deriva_ml/model/annotations.py
@dataclass
class PseudoColumnDisplay:
    """Display options for a pseudo-column.

    Args:
        markdown_pattern: Handlebars/mustache template
        template_engine: Template engine to use
        show_foreign_key_link: Show as clickable link
        array_ux_mode: How to render array values
        column_order: Sort order for the column, or False to disable
        wait_for: Template variables to wait for before rendering
    """
    markdown_pattern: str | None = None
    template_engine: TemplateEngine | None = None
    show_foreign_key_link: bool | None = None
    array_ux_mode: ArrayUxMode | None = None
    column_order: list[SortKey] | Literal[False] | None = None
    wait_for: list[str] | None = None

    def to_dict(self) -> dict[str, Any]:
        result = {}
        if self.markdown_pattern is not None:
            result["markdown_pattern"] = self.markdown_pattern
        if self.template_engine is not None:
            result["template_engine"] = self.template_engine.value
        if self.show_foreign_key_link is not None:
            result["show_foreign_key_link"] = self.show_foreign_key_link
        if self.array_ux_mode is not None:
            result["array_ux_mode"] = self.array_ux_mode.value
        if self.column_order is not None:
            if self.column_order is False:
                result["column_order"] = False
            else:
                result["column_order"] = [
                    k.to_dict() if isinstance(k, SortKey) else k
                    for k in self.column_order
                ]
        if self.wait_for is not None:
            result["wait_for"] = self.wait_for
        return result
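
The column_order field is tri-state: None omits the key, an explicit False disables sorting, and a list supplies sort keys. A minimal sketch of that branch in to_dict above:

```python
def serialize_column_order(column_order):
    # None -> omit the key; False -> emit literal False; list -> emit as-is
    if column_order is None:
        return {}
    if column_order is False:
        return {"column_order": False}
    return {"column_order": list(column_order)}

assert serialize_column_order(None) == {}
assert serialize_column_order(False) == {"column_order": False}
assert serialize_column_order(["Name"]) == {"column_order": ["Name"]}
```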

SchemaBuilder

Creates SQLAlchemy ORM from a Deriva catalog model.

Phase 1 of the two-phase database creation pattern. This class handles only schema/ORM creation - no data loading.

The Model can come from either a live catalog or a schema.json file:

- From catalog: model = catalog.getCatalogModel()
- From file: model = Model.fromfile("file-system", "path/to/schema.json")

Example

    # Create ORM from catalog model
    model = catalog.getCatalogModel()
    builder = SchemaBuilder(model, schemas=['domain', 'deriva-ml'])
    orm = builder.build()

    # Create ORM from schema file
    model = Model.fromfile("file-system", "schema.json")
    builder = SchemaBuilder(model, schemas=['domain'], database_path="local.db")
    orm = builder.build()

    # Use the ORM
    ImageClass = orm.get_orm_class("Image")
    with Session(orm.engine) as session:
        images = session.query(ImageClass).all()

    # Clean up
    orm.dispose()

Source code in src/deriva_ml/model/schema_builder.py
class SchemaBuilder:
    """Creates SQLAlchemy ORM from a Deriva catalog model.

    Phase 1 of the two-phase database creation pattern. This class handles
    only schema/ORM creation - no data loading.

    The Model can come from either a live catalog or a schema.json file:
    - From catalog: model = catalog.getCatalogModel()
    - From file: model = Model.fromfile("file-system", "path/to/schema.json")

    Example:
        # Create ORM from catalog model
        model = catalog.getCatalogModel()
        builder = SchemaBuilder(model, schemas=['domain', 'deriva-ml'])
        orm = builder.build()

        # Create ORM from schema file
        model = Model.fromfile("file-system", "schema.json")
        builder = SchemaBuilder(model, schemas=['domain'], database_path="local.db")
        orm = builder.build()

        # Use the ORM
        ImageClass = orm.get_orm_class("Image")
        with Session(orm.engine) as session:
            images = session.query(ImageClass).all()

        # Clean up
        orm.dispose()
    """

    # Type mapping from ERMrest to SQLAlchemy
    _TYPE_MAP = {
        "boolean": ERMRestBoolean,
        "date": StringToDate,
        "float4": StringToFloat,
        "float8": StringToFloat,
        "int2": StringToInteger,
        "int4": StringToInteger,
        "int8": StringToInteger,
        "json": JSON,
        "jsonb": JSON,
        "timestamptz": StringToDateTime,
        "timestamp": StringToDateTime,
    }

    def __init__(
        self,
        model: Model,
        schemas: list[str],
        database_path: Path | str = ":memory:",
    ):
        """Initialize the schema builder.

        Args:
            model: ERMrest Model object (from catalog or schema.json file).
            schemas: List of schema names to include in the ORM.
            database_path: Path to SQLite database file. Use ":memory:" for
                in-memory database (default). If a Path or string is provided,
                separate .db files will be created for each schema.
        """
        self.model = model
        self.schemas = schemas
        self.database_path = Path(database_path) if database_path != ":memory:" else database_path

        # Will be set during build()
        self.engine: Engine | None = None
        self.metadata: MetaData | None = None
        self.Base: AutomapBase | None = None
        self._class_prefix: str = ""

    @staticmethod
    def _sql_type(deriva_type: DerivaType) -> TypeEngine:
        """Map ERMrest type to SQLAlchemy type with CSV string conversion.

        Args:
            deriva_type: ERMrest type object.

        Returns:
            SQLAlchemy type class.
        """
        return SchemaBuilder._TYPE_MAP.get(deriva_type.typename, String)

    def _is_key_column(self, column: DerivaColumn, table: DerivaTable) -> bool:
        """Check if column is the primary key (RID)."""
        return column in [key.unique_columns[0] for key in table.keys] and column.name == "RID"

    def build(self) -> SchemaORM:
        """Build the SQLAlchemy ORM structure.

        Creates SQLite tables from the ERMrest schema and generates
        ORM classes via SQLAlchemy automap.

        Returns:
            SchemaORM object containing engine, metadata, Base, and utilities.

        Note:
            In-memory databases (database_path=":memory:") do not support
            SQLite schema attachments, so all tables will be created in a
            single database without schema prefixes in table names.
        """
        # Create unique prefix for ORM class names
        self._class_prefix = f"_{id(self)}_"

        # Determine if we're using in-memory or file-based database
        self._use_schemas = self.database_path != ":memory:"

        # Create engine
        if self.database_path == ":memory:":
            self.engine = create_engine("sqlite:///:memory:", future=True)
        else:
            # Ensure the database path exists
            if isinstance(self.database_path, Path):
                if self.database_path.suffix == ".db":
                    # Single file path
                    self.database_path.parent.mkdir(parents=True, exist_ok=True)
                    main_db = self.database_path
                else:
                    # Directory path
                    self.database_path.mkdir(parents=True, exist_ok=True)
                    main_db = self.database_path / "main.db"
            else:
                main_db = Path(self.database_path)
                main_db.parent.mkdir(parents=True, exist_ok=True)

            self.engine = create_engine(f"sqlite:///{main_db.resolve()}", future=True)

            # Attach schema-specific databases
            event.listen(self.engine, "connect", self._attach_schemas)

        self.metadata = MetaData()
        self.Base = automap_base(metadata=self.metadata)

        # Build the schema
        self._create_tables()

        logger.info(
            "Built ORM for schemas %s with %d tables",
            self.schemas,
            len(self.metadata.tables),
        )

        return SchemaORM(
            engine=self.engine,
            metadata=self.metadata,
            Base=self.Base,
            model=self.model,
            schemas=self.schemas,
            class_prefix=self._class_prefix,
            use_schemas=self._use_schemas,
        )

    def _attach_schemas(self, dbapi_conn, _conn_record):
        """Attach schema-specific SQLite databases."""
        cur = dbapi_conn.cursor()
        db_dir = self.database_path if self.database_path.is_dir() else self.database_path.parent
        for schema in self.schemas:
            schema_file = (db_dir / f"{schema}.db").resolve()
            cur.execute(f"ATTACH DATABASE '{schema_file}' AS '{schema}'")
        cur.close()

    def _create_tables(self) -> None:
        """Create SQLite tables from the ERMrest schema."""

        def col(model, name: str):
            """Get column from ORM class, handling both attribute and table column access."""
            try:
                return getattr(model, name).property.columns[0]
            except AttributeError:
                return model.__table__.c[name]

        def guess_attr_name(col_name: str) -> str:
            """Generate relationship attribute name from column name."""
            return col_name[:-3] if col_name.lower().endswith("_id") else col_name

        def make_table_name(schema_name: str, table_name: str) -> str:
            """Generate table name, including schema prefix if using schemas."""
            if self._use_schemas:
                return f"{schema_name}.{table_name}"
            else:
                # For in-memory, use underscore separator to avoid conflicts
                return f"{schema_name}_{table_name}"

        database_tables: list[SQLTable] = []

        for schema_name in self.schemas:
            if schema_name not in self.model.schemas:
                logger.warning(f"Schema {schema_name} not found in model")
                continue

            for table in self.model.schemas[schema_name].tables.values():
                database_columns: list[SQLColumn] = []

                for c in table.columns:
                    database_column = SQLColumn(
                        name=c.name,
                        type_=self._sql_type(c.type),
                        comment=c.comment,
                        default=c.default,
                        primary_key=self._is_key_column(c, table),
                        nullable=c.nullok,
                    )
                    database_columns.append(database_column)

                # Use schema prefix only for file-based databases
                if self._use_schemas:
                    database_table = SQLTable(
                        table.name, self.metadata, *database_columns, schema=schema_name
                    )
                else:
                    # For in-memory, embed schema in table name
                    full_name = f"{schema_name}_{table.name}".replace("-", "_")
                    database_table = SQLTable(
                        full_name, self.metadata, *database_columns
                    )

                # Add unique constraints
                for key in table.keys:
                    key_columns = [c.name for c in key.unique_columns]
                    database_table.append_constraint(
                        SQLUniqueConstraint(*key_columns, name=key.name[1])
                    )

                # Add foreign key constraints (within same schema only for now)
                for fk in table.foreign_keys:
                    if fk.pk_table.schema.name not in self.schemas:
                        continue
                    if fk.pk_table.schema.name != schema_name:
                        continue

                    # Build reference column names
                    if self._use_schemas:
                        refcols = [
                            f"{schema_name}.{c.table.name}.{c.name}"
                            for c in fk.referenced_columns
                        ]
                    else:
                        # For in-memory, use the embedded schema name
                        ref_table_name = f"{schema_name}_{fk.pk_table.name}".replace("-", "_")
                        refcols = [
                            f"{ref_table_name}.{c.name}"
                            for c in fk.referenced_columns
                        ]

                    database_table.append_constraint(
                        SQLForeignKeyConstraint(
                            columns=[f"{c.name}" for c in fk.foreign_key_columns],
                            refcolumns=refcols,
                            name=fk.name[1],
                            comment=fk.comment,
                        )
                    )

                database_tables.append(database_table)

        # Create all tables
        with self.engine.begin() as conn:
            self.metadata.create_all(conn, tables=database_tables, checkfirst=True)

        # Configure ORM class naming
        def name_for_scalar_relationship(_base, local_cls, referred_cls, constraint):
            cols = list(constraint.columns) if constraint is not None else []
            if len(cols) == 1:
                name = cols[0].key
                if name in {c.key for c in local_cls.__table__.columns}:
                    name += "_rel"
                return name
            return constraint.name or referred_cls.__name__.lower()

        def name_for_collection_relationship(_base, local_cls, referred_cls, constraint):
            # constraint.name may be None for unnamed constraints; fall back
            # to a name derived from the referred class in that case.
            backref_name = constraint.name.replace("_fkey", "_collection") if constraint.name else None
            return backref_name or (referred_cls.__name__.lower() + "_collection")

        def classname_for_table(_base, tablename, table):
            return self._class_prefix + tablename.replace(".", "_").replace("-", "_")

        # Build ORM mappings
        self.Base.prepare(
            self.engine,
            name_for_scalar_relationship=name_for_scalar_relationship,
            name_for_collection_relationship=name_for_collection_relationship,
            classname_for_table=classname_for_table,
            reflect=True,
        )

        # Add cross-schema relationships
        for schema_name in self.schemas:
            if schema_name not in self.model.schemas:
                continue

            for table in self.model.schemas[schema_name].tables.values():
                for fk in table.foreign_keys:
                    if fk.pk_table.schema.name not in self.schemas:
                        continue
                    if fk.pk_table.schema.name == schema_name:
                        continue

                    table_name = make_table_name(schema_name, table.name)
                    table_class = self._get_orm_class_by_name(table_name)
                    foreign_key_column_name = fk.foreign_key_columns[0].name
                    foreign_key_column = col(table_class, foreign_key_column_name)

                    referenced_table_name = make_table_name(fk.pk_table.schema.name, fk.pk_table.name)
                    referenced_class = self._get_orm_class_by_name(referenced_table_name)
                    referenced_column = col(referenced_class, fk.referenced_columns[0].name)

                    relationship_attr = guess_attr_name(foreign_key_column_name)
                    backref_attr = fk.name[1].replace("_fkey", "_collection")

                    # Check if relationship already exists
                    existing_attr = getattr(table_class, relationship_attr, None)
                    from sqlalchemy.orm import RelationshipProperty
                    from sqlalchemy.orm.attributes import InstrumentedAttribute

                    is_relationship = isinstance(existing_attr, InstrumentedAttribute) and isinstance(
                        existing_attr.property, RelationshipProperty
                    )
                    if not is_relationship:
                        setattr(
                            table_class,
                            relationship_attr,
                            relationship(
                                referenced_class,
                                foreign_keys=[foreign_key_column],
                                primaryjoin=foreign(foreign_key_column) == referenced_column,
                                backref=backref(backref_attr, viewonly=True),
                                viewonly=True,
                            ),
                        )

        # Configure mappers
        self.Base.registry.configure()

    def _get_orm_class_by_name(self, table_name: str) -> Any | None:
        """Get ORM class by table name (internal use during build).

        Handles both schema.table format (file-based) and schema_table format (in-memory).
        """
        # Try exact match first
        if table_name in self.metadata.tables:
            sql_table = self.metadata.tables[table_name]
        else:
            # For in-memory databases, table names use underscore separator
            # Try converting schema.table to schema_table format
            if "." in table_name and not self._use_schemas:
                converted_name = table_name.replace(".", "_").replace("-", "_")
                if converted_name in self.metadata.tables:
                    sql_table = self.metadata.tables[converted_name]
                else:
                    sql_table = None
            else:
                # Try matching just the table name part
                sql_table = None
                for full_name, table in self.metadata.tables.items():
                    # Handle both . and _ separators
                    if "." in full_name:
                        table_part = full_name.split(".")[-1]
                    elif "_" in full_name:
                        table_part = full_name.split("_", 1)[-1]
                    else:
                        table_part = full_name
                    if table_part == table_name or full_name.endswith(f"_{table_name}"):
                        sql_table = table
                        break

        if sql_table is None:
            raise KeyError(f"Table {table_name} not found")

        for mapper in self.Base.registry.mappers:
            if mapper.persist_selectable is sql_table or sql_table in mapper.tables:
                return mapper.class_
        return None

__init__

__init__(
    model: Model,
    schemas: list[str],
    database_path: Path | str = ":memory:",
)

Initialize the schema builder.

Parameters:

Name Type Description Default
model Model

ERMrest Model object (from catalog or schema.json file).

required
schemas list[str]

List of schema names to include in the ORM.

required
database_path Path | str

Path to SQLite database file. Use ":memory:" for in-memory database (default). If a Path or string is provided, separate .db files will be created for each schema.

':memory:'
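The `database_path` handling can be sketched with a hypothetical helper (not part of the library) that mirrors how `build()` interprets this argument:

```python
from pathlib import Path


def resolve_main_db(database_path):
    # ":memory:" keeps everything in a single in-memory database; a path
    # ending in ".db" is used as the main database file directly; any other
    # path is treated as a directory that will hold main.db plus one .db
    # file per attached schema.
    if database_path == ":memory:":
        return None
    p = Path(database_path)
    return p if p.suffix == ".db" else p / "main.db"


resolve_main_db(":memory:")               # None (in-memory)
resolve_main_db("snapshots/catalog.db")   # Path("snapshots/catalog.db")
resolve_main_db("snapshots")              # Path("snapshots/main.db")
```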
Source code in src/deriva_ml/model/schema_builder.py
def __init__(
    self,
    model: Model,
    schemas: list[str],
    database_path: Path | str = ":memory:",
):
    """Initialize the schema builder.

    Args:
        model: ERMrest Model object (from catalog or schema.json file).
        schemas: List of schema names to include in the ORM.
        database_path: Path to SQLite database file. Use ":memory:" for
            in-memory database (default). If a Path or string is provided,
            separate .db files will be created for each schema.
    """
    self.model = model
    self.schemas = schemas
    self.database_path = Path(database_path) if database_path != ":memory:" else database_path

    # Will be set during build()
    self.engine: Engine | None = None
    self.metadata: MetaData | None = None
    self.Base: AutomapBase | None = None
    self._class_prefix: str = ""

build

build() -> SchemaORM

Build the SQLAlchemy ORM structure.

Creates SQLite tables from the ERMrest schema and generates ORM classes via SQLAlchemy automap.

Returns:

Type Description
SchemaORM

SchemaORM object containing engine, metadata, Base, and utilities.

Note

In-memory databases (database_path=":memory:") do not support SQLite schema attachments, so all tables will be created in a single database without schema prefixes in table names.
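The naming consequence of this note can be sketched in plain Python. The hypothetical `table_key` helper below (not part of the library) mirrors how `build()` names tables in each mode:

```python
def table_key(schema: str, table: str, use_schemas: bool) -> str:
    # File-based databases ATTACH one SQLite file per schema, so a table
    # keeps a real schema qualifier: "schema.table".
    if use_schemas:
        return f"{schema}.{table}"
    # In-memory databases cannot use ATTACH, so the schema name is embedded
    # in the table name and hyphens are normalized to underscores.
    return f"{schema}_{table}".replace("-", "_")


table_key("deriva-ml", "Dataset", use_schemas=True)   # "deriva-ml.Dataset"
table_key("deriva-ml", "Dataset", use_schemas=False)  # "deriva_ml_Dataset"
```

This is why `find_table` and `get_orm_class` accept both naming formats.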

Source code in src/deriva_ml/model/schema_builder.py
def build(self) -> SchemaORM:
    """Build the SQLAlchemy ORM structure.

    Creates SQLite tables from the ERMrest schema and generates
    ORM classes via SQLAlchemy automap.

    Returns:
        SchemaORM object containing engine, metadata, Base, and utilities.

    Note:
        In-memory databases (database_path=":memory:") do not support
        SQLite schema attachments, so all tables will be created in a
        single database without schema prefixes in table names.
    """
    # Create unique prefix for ORM class names
    self._class_prefix = f"_{id(self)}_"

    # Determine if we're using in-memory or file-based database
    self._use_schemas = self.database_path != ":memory:"

    # Create engine
    if self.database_path == ":memory:":
        self.engine = create_engine("sqlite:///:memory:", future=True)
    else:
        # Ensure the database path exists
        if isinstance(self.database_path, Path):
            if self.database_path.suffix == ".db":
                # Single file path
                self.database_path.parent.mkdir(parents=True, exist_ok=True)
                main_db = self.database_path
            else:
                # Directory path
                self.database_path.mkdir(parents=True, exist_ok=True)
                main_db = self.database_path / "main.db"
        else:
            main_db = Path(self.database_path)
            main_db.parent.mkdir(parents=True, exist_ok=True)

        self.engine = create_engine(f"sqlite:///{main_db.resolve()}", future=True)

        # Attach schema-specific databases
        event.listen(self.engine, "connect", self._attach_schemas)

    self.metadata = MetaData()
    self.Base = automap_base(metadata=self.metadata)

    # Build the schema
    self._create_tables()

    logger.info(
        "Built ORM for schemas %s with %d tables",
        self.schemas,
        len(self.metadata.tables),
    )

    return SchemaORM(
        engine=self.engine,
        metadata=self.metadata,
        Base=self.Base,
        model=self.model,
        schemas=self.schemas,
        class_prefix=self._class_prefix,
        use_schemas=self._use_schemas,
    )

SchemaORM

Container for SQLAlchemy ORM components.

Provides access to the ORM structure and utility methods for table/class lookup. This is the result of Phase 1 (SchemaBuilder).

Attributes:

Name Type Description
engine

SQLAlchemy Engine for database connections.

metadata

SQLAlchemy MetaData with table definitions.

Base

SQLAlchemy automap base for ORM classes.

model

ERMrest Model the ORM was built from.

schemas

List of schema names included.

use_schemas

Whether schema prefixes are used (False for in-memory).

Source code in src/deriva_ml/model/schema_builder.py
class SchemaORM:
    """Container for SQLAlchemy ORM components.

    Provides access to the ORM structure and utility methods for
    table/class lookup. This is the result of Phase 1 (SchemaBuilder).

    Attributes:
        engine: SQLAlchemy Engine for database connections.
        metadata: SQLAlchemy MetaData with table definitions.
        Base: SQLAlchemy automap base for ORM classes.
        model: ERMrest Model the ORM was built from.
        schemas: List of schema names included.
        use_schemas: Whether schema prefixes are used (False for in-memory).
    """

    def __init__(
        self,
        engine: Engine,
        metadata: MetaData,
        Base: AutomapBase,
        model: Model,
        schemas: list[str],
        class_prefix: str,
        use_schemas: bool = True,
    ):
        """Initialize SchemaORM container.

        Args:
            engine: SQLAlchemy Engine.
            metadata: SQLAlchemy MetaData with tables.
            Base: Automap base with ORM classes.
            model: Source ERMrest Model.
            schemas: Schemas that were included.
            class_prefix: Prefix used for ORM class names.
            use_schemas: Whether schema prefixes are used (False for in-memory).
        """
        self.engine = engine
        self.metadata = metadata
        self.Base = Base
        self.model = model
        self.schemas = schemas
        self._class_prefix = class_prefix
        self._use_schemas = use_schemas
        self._disposed = False

    def list_tables(self) -> list[str]:
        """List all tables in the database.

        Returns:
            List of fully-qualified table names (schema.table), sorted.
        """
        tables = list(self.metadata.tables.keys())
        tables.sort()
        return tables

    def find_table(self, table_name: str) -> SQLTable:
        """Find a table by name.

        Handles both schema.table format and schema_table format (for in-memory databases).

        Args:
            table_name: Table name, with or without schema prefix.
                Can be "schema.table", "schema_table", or just "table".

        Returns:
            SQLAlchemy Table object.

        Raises:
            KeyError: If table not found.
        """
        # Try exact match first
        if table_name in self.metadata.tables:
            return self.metadata.tables[table_name]

        # Try converting schema.table to schema_table format (for in-memory)
        if "." in table_name and not self._use_schemas:
            converted_name = table_name.replace(".", "_").replace("-", "_")
            if converted_name in self.metadata.tables:
                return self.metadata.tables[converted_name]

        # Try matching just the table name part
        for full_name, table in self.metadata.tables.items():
            # Handle . separator (file-based)
            if "." in full_name and full_name.split(".")[-1] == table_name:
                return table
            # Handle _ separator (in-memory) - match suffix after first _
            if "_" in full_name and "." not in full_name:
                # Check if table_name matches the part after schema prefix
                parts = full_name.split("_", 1)
                if len(parts) > 1 and parts[1] == table_name:
                    return table
                # Also check if it ends with the table name
                if full_name.endswith(f"_{table_name}"):
                    return table

        raise KeyError(f"Table {table_name} not found")

    def get_orm_class(self, table_name: str) -> Any | None:
        """Get the ORM class for a table by name.

        Args:
            table_name: Table name, with or without schema prefix.

        Returns:
            SQLAlchemy ORM class for the table.

        Raises:
            KeyError: If table not found.
        """
        sql_table = self.find_table(table_name)
        return self.get_orm_class_for_table(sql_table)

    def get_orm_class_for_table(self, table: SQLTable | DerivaTable | str) -> Any | None:
        """Get the ORM class for a table.

        Args:
            table: SQLAlchemy Table, Deriva Table, or table name.

        Returns:
            SQLAlchemy ORM class, or None if not found.
        """
        if isinstance(table, DerivaTable):
            # Try schema.table format first (file-based), then schema_table (in-memory)
            deriva_table = table
            table_key = f"{deriva_table.schema.name}.{deriva_table.name}"
            table = self.metadata.tables.get(table_key)
            if table is None and not self._use_schemas:
                # Try underscore format for in-memory databases; use the saved
                # Deriva table here, since `table` may now be None
                table_key = f"{deriva_table.schema.name}_{deriva_table.name}".replace("-", "_")
                table = self.metadata.tables.get(table_key)
        if isinstance(table, str):
            table = self.find_table(table)
        if table is None:
            return None

        for mapper in self.Base.registry.mappers:
            if mapper.persist_selectable is table or table in mapper.tables:
                return mapper.class_
        return None

    def get_table_contents(self, table: str) -> Generator[dict[str, Any], None, None]:
        """Retrieve all rows from a table as dictionaries.

        Args:
            table: Table name (with or without schema prefix).

        Yields:
            Dictionary for each row with column names as keys.
        """
        sql_table = self.find_table(table)
        with self.engine.connect() as conn:
            result = conn.execute(select(sql_table))
            for row in result.mappings():
                yield dict(row)

    @staticmethod
    def is_association_table(
        table_class,
        min_arity: int = 2,
        max_arity: int = 2,
        unqualified: bool = True,
        pure: bool = True,
        no_overlap: bool = True,
        return_fkeys: bool = False,
    ):
        """Check if an ORM class represents an association table.

        An association table links two or more tables through foreign keys,
        with a composite unique key covering those foreign keys.

        Args:
            table_class: SQLAlchemy ORM class to check.
            min_arity: Minimum number of foreign keys (default 2).
            max_arity: Maximum number of foreign keys (default 2).
            unqualified: If True, reject associations with extra key columns.
            pure: If True, reject associations with extra non-key columns.
            no_overlap: If True, reject associations with shared FK columns.
            return_fkeys: If True, return the foreign keys instead of arity.

        Returns:
            If return_fkeys=False: Integer arity if association, False otherwise.
            If return_fkeys=True: Set of foreign keys if association, False otherwise.
        """
        if min_arity < 2:
            raise ValueError("An association cannot have arity < 2")
        if max_arity is not None and max_arity < min_arity:
            raise ValueError("max_arity cannot be less than min_arity")

        mapper = inspect(table_class).mapper
        system_cols = {"RID", "RCT", "RMT", "RCB", "RMB"}

        non_sys_cols = {
            col.name for col in mapper.columns if col.name not in system_cols
        }

        unique_columns = [
            {c.name for c in constraint.columns}
            for constraint in inspect(table_class).local_table.constraints
            if isinstance(constraint, SQLUniqueConstraint)
        ]

        non_sys_key_colsets = {
            frozenset(uc)
            for uc in unique_columns
            if uc.issubset(non_sys_cols) and len(uc) > 1
        }

        if not non_sys_key_colsets:
            return False

        # Choose longest compound key
        row_key = sorted(non_sys_key_colsets, key=lambda s: len(s), reverse=True)[0]
        foreign_keys = list(inspect(table_class).relationships.values())

        covered_fkeys = {
            fkey for fkey in foreign_keys
            if {c.name for c in fkey.local_columns}.issubset(row_key)
        }
        covered_fkey_cols = set()

        if len(covered_fkeys) < min_arity:
            return False
        if max_arity is not None and len(covered_fkeys) > max_arity:
            return False

        for fkey in covered_fkeys:
            fkcols = {c.name for c in fkey.local_columns}
            if no_overlap and fkcols.intersection(covered_fkey_cols):
                return False
            covered_fkey_cols.update(fkcols)

        if unqualified and row_key.difference(covered_fkey_cols):
            return False

        if pure and non_sys_cols.difference(row_key):
            return False

        return covered_fkeys if return_fkeys else len(covered_fkeys)

    def get_association_class(
        self,
        left_cls: Type[Any],
        right_cls: Type[Any],
    ) -> tuple[Any, Any, Any] | None:
        """Find an association class connecting two ORM classes.

        Args:
            left_cls: First ORM class.
            right_cls: Second ORM class.

        Returns:
            Tuple of (association_class, left_relationship, right_relationship),
            or None if no association found.
        """
        for _, left_rel in inspect(left_cls).relationships.items():
            mid_cls = left_rel.mapper.class_
            is_assoc = self.is_association_table(mid_cls, return_fkeys=True)

            if not is_assoc:
                continue

            assoc_local_columns_left = list(is_assoc)[0].local_columns
            assoc_local_columns_right = list(is_assoc)[1].local_columns

            found_left = found_right = False

            for r in inspect(left_cls).relationships.values():
                remote_side = list(r.remote_side)[0]
                if remote_side in assoc_local_columns_left:
                    found_left = r
                if remote_side in assoc_local_columns_right:
                    found_left = r
                    # Swap if backwards
                    assoc_local_columns_left, assoc_local_columns_right = (
                        assoc_local_columns_right,
                        assoc_local_columns_left,
                    )

            for r in inspect(right_cls).relationships.values():
                remote_side = list(r.remote_side)[0]
                if remote_side in assoc_local_columns_right:
                    found_right = r

            if found_left and found_right:
                return mid_cls, found_left.class_attribute, found_right.class_attribute

        return None

    def dispose(self) -> None:
        """Dispose of SQLAlchemy resources.

        Call this when done with the database to properly clean up connections.
        After calling dispose(), the instance should not be used further.
        """
        if self._disposed:
            return

        if hasattr(self, "Base") and self.Base is not None:
            self.Base.registry.dispose()
        if hasattr(self, "engine") and self.engine is not None:
            self.engine.dispose()

        self._disposed = True

    def __del__(self) -> None:
        """Cleanup resources when garbage collected.

        Best-effort. ``__del__`` runs at unpredictable points, including
        interpreter shutdown when SQLAlchemy module-level globals
        (registries, engines) may already be partially torn down. In
        that race we'd see ``AttributeError: 'NoneType' object has no
        attribute '_dispose_registries'`` printed via ``Exception
        ignored in:`` — benign but noisy enough to make every short
        script look like it failed. Swallow everything here; the
        explicit ``dispose()`` callable from ``__exit__`` and from
        callers still raises normally.
        """
        try:
            self.dispose()
        except Exception:
            pass

    def __enter__(self) -> "SchemaORM":
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Context manager exit - dispose resources."""
        self.dispose()
        return False

__del__

__del__() -> None

Cleanup resources when garbage collected.

Best-effort. __del__ runs at unpredictable points, including interpreter shutdown when SQLAlchemy module-level globals (registries, engines) may already be partially torn down. In that race we'd see AttributeError: 'NoneType' object has no attribute '_dispose_registries' printed via Exception ignored in: — benign but noisy enough to make every short script look like it failed. Swallow everything here; the explicit dispose() callable from __exit__ and from callers still raises normally.

Source code in src/deriva_ml/model/schema_builder.py
def __del__(self) -> None:
    """Cleanup resources when garbage collected.

    Best-effort. ``__del__`` runs at unpredictable points, including
    interpreter shutdown when SQLAlchemy module-level globals
    (registries, engines) may already be partially torn down. In
    that race we'd see ``AttributeError: 'NoneType' object has no
    attribute '_dispose_registries'`` printed via ``Exception
    ignored in:`` — benign but noisy enough to make every short
    script look like it failed. Swallow everything here; the
    explicit ``dispose()`` callable from ``__exit__`` and from
    callers still raises normally.
    """
    try:
        self.dispose()
    except Exception:
        pass

__enter__

__enter__() -> 'SchemaORM'

Context manager entry.

Source code in src/deriva_ml/model/schema_builder.py
def __enter__(self) -> "SchemaORM":
    """Context manager entry."""
    return self

__exit__

__exit__(
    exc_type, exc_val, exc_tb
) -> bool

Context manager exit - dispose resources.

Source code in src/deriva_ml/model/schema_builder.py
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
    """Context manager exit - dispose resources."""
    self.dispose()
    return False

__init__

__init__(
    engine: Engine,
    metadata: MetaData,
    Base: AutomapBase,
    model: Model,
    schemas: list[str],
    class_prefix: str,
    use_schemas: bool = True,
)

Initialize SchemaORM container.

Parameters:

Name Type Description Default
engine Engine

SQLAlchemy Engine.

required
metadata MetaData

SQLAlchemy MetaData with tables.

required
Base AutomapBase

Automap base with ORM classes.

required
model Model

Source ERMrest Model.

required
schemas list[str]

Schemas that were included.

required
class_prefix str

Prefix used for ORM class names.

required
use_schemas bool

Whether schema prefixes are used (False for in-memory).

True
Source code in src/deriva_ml/model/schema_builder.py
def __init__(
    self,
    engine: Engine,
    metadata: MetaData,
    Base: AutomapBase,
    model: Model,
    schemas: list[str],
    class_prefix: str,
    use_schemas: bool = True,
):
    """Initialize SchemaORM container.

    Args:
        engine: SQLAlchemy Engine.
        metadata: SQLAlchemy MetaData with tables.
        Base: Automap base with ORM classes.
        model: Source ERMrest Model.
        schemas: Schemas that were included.
        class_prefix: Prefix used for ORM class names.
        use_schemas: Whether schema prefixes are used (False for in-memory).
    """
    self.engine = engine
    self.metadata = metadata
    self.Base = Base
    self.model = model
    self.schemas = schemas
    self._class_prefix = class_prefix
    self._use_schemas = use_schemas
    self._disposed = False

dispose

dispose() -> None

Dispose of SQLAlchemy resources.

Call this when done with the database to properly clean up connections. After calling dispose(), the instance should not be used further.
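The cleanup pattern used here (idempotent `dispose()` also invoked from `__exit__`) can be sketched with a standalone toy class, shown for illustration only:

```python
class Disposable:
    # Mirrors SchemaORM's cleanup pattern: dispose() is idempotent and is
    # also invoked by __exit__, so a `with` block guarantees cleanup even
    # if callers also dispose explicitly.
    def __init__(self):
        self._disposed = False
        self.dispose_calls = 0

    def dispose(self):
        if self._disposed:
            return
        self.dispose_calls += 1  # release engine/registry resources here
        self._disposed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.dispose()
        return False  # never suppress exceptions


with Disposable() as d:
    pass
d.dispose()  # safe: second call is a no-op
```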

Source code in src/deriva_ml/model/schema_builder.py
def dispose(self) -> None:
    """Dispose of SQLAlchemy resources.

    Call this when done with the database to properly clean up connections.
    After calling dispose(), the instance should not be used further.
    """
    if self._disposed:
        return

    if hasattr(self, "Base") and self.Base is not None:
        self.Base.registry.dispose()
    if hasattr(self, "engine") and self.engine is not None:
        self.engine.dispose()

    self._disposed = True

find_table

find_table(table_name: str) -> SQLTable

Find a table by name.

Handles both schema.table format and schema_table format (for in-memory databases).

Parameters:

Name Type Description Default
table_name str

Table name, with or without schema prefix. Can be "schema.table", "schema_table", or just "table".

required

Returns:

Type Description
Table

SQLAlchemy Table object.

Raises:

Type Description
KeyError

If table not found.

Source code in src/deriva_ml/model/schema_builder.py
def find_table(self, table_name: str) -> SQLTable:
    """Find a table by name.

    Handles both schema.table format and schema_table format (for in-memory databases).

    Args:
        table_name: Table name, with or without schema prefix.
            Can be "schema.table", "schema_table", or just "table".

    Returns:
        SQLAlchemy Table object.

    Raises:
        KeyError: If table not found.
    """
    # Try exact match first
    if table_name in self.metadata.tables:
        return self.metadata.tables[table_name]

    # Try converting schema.table to schema_table format (for in-memory)
    if "." in table_name and not self._use_schemas:
        converted_name = table_name.replace(".", "_").replace("-", "_")
        if converted_name in self.metadata.tables:
            return self.metadata.tables[converted_name]

    # Try matching just the table name part
    for full_name, table in self.metadata.tables.items():
        # Handle . separator (file-based)
        if "." in full_name and full_name.split(".")[-1] == table_name:
            return table
        # Handle _ separator (in-memory) - match suffix after first _
        if "_" in full_name and "." not in full_name:
            # Check if table_name matches the part after schema prefix
            parts = full_name.split("_", 1)
            if len(parts) > 1 and parts[1] == table_name:
                return table
            # Also check if it ends with the table name
            if full_name.endswith(f"_{table_name}"):
                return table

    raise KeyError(f"Table {table_name} not found")

get_association_class

get_association_class(
    left_cls: Type[Any],
    right_cls: Type[Any],
) -> tuple[Any, Any, Any] | None

Find an association class connecting two ORM classes.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `left_cls` | `Type[Any]` | First ORM class. | *required* |
| `right_cls` | `Type[Any]` | Second ORM class. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `tuple[Any, Any, Any] \| None` | Tuple of (association_class, left_relationship, right_relationship), or None if no association found. |

Source code in src/deriva_ml/model/schema_builder.py
def get_association_class(
    self,
    left_cls: Type[Any],
    right_cls: Type[Any],
) -> tuple[Any, Any, Any] | None:
    """Find an association class connecting two ORM classes.

    Args:
        left_cls: First ORM class.
        right_cls: Second ORM class.

    Returns:
        Tuple of (association_class, left_relationship, right_relationship),
        or None if no association found.
    """
    for _, left_rel in inspect(left_cls).relationships.items():
        mid_cls = left_rel.mapper.class_
        is_assoc = self.is_association_table(mid_cls, return_fkeys=True)

        if not is_assoc:
            continue

        assoc_local_columns_left = list(is_assoc)[0].local_columns
        assoc_local_columns_right = list(is_assoc)[1].local_columns

        found_left = found_right = False

        for r in inspect(left_cls).relationships.values():
            remote_side = list(r.remote_side)[0]
            if remote_side in assoc_local_columns_left:
                found_left = r
            if remote_side in assoc_local_columns_right:
                found_left = r
                # Swap if backwards
                assoc_local_columns_left, assoc_local_columns_right = (
                    assoc_local_columns_right,
                    assoc_local_columns_left,
                )

        for r in inspect(right_cls).relationships.values():
            remote_side = list(r.remote_side)[0]
            if remote_side in assoc_local_columns_right:
                found_right = r

        if found_left and found_right:
            return mid_cls, found_left.class_attribute, found_right.class_attribute

    return None
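The heart of the method is deciding which of the association's two foreign keys belongs to which endpoint, swapping when the first guess is backwards. A dependency-free sketch of that matching idea, with column sets standing in for FK local columns (names are illustrative):

```python
# Minimal sketch of the side-matching logic in get_association_class: given
# an association's two FK column sets and the column each endpoint's
# relationship targets, pair them up, swapping if the first guess is backwards.
def match_sides(fk_a: set[str], fk_b: set[str], left_col: str, right_col: str):
    left_fk, right_fk = fk_a, fk_b
    if left_col in right_fk:                 # first guess was backwards: swap
        left_fk, right_fk = right_fk, left_fk
    if left_col in left_fk and right_col in right_fk:
        return left_fk, right_fk
    return None                              # no association between the two

print(match_sides({"Subject"}, {"Sample"}, "Sample", "Subject"))
# -> ({'Sample'}, {'Subject'})
```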

get_orm_class

get_orm_class(
    table_name: str,
) -> Any | None

Get the ORM class for a table by name.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `table_name` | `str` | Table name, with or without schema prefix. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Any \| None` | SQLAlchemy ORM class for the table. |

Raises:

| Type | Description |
|------|-------------|
| `KeyError` | If table not found. |

Source code in src/deriva_ml/model/schema_builder.py
def get_orm_class(self, table_name: str) -> Any | None:
    """Get the ORM class for a table by name.

    Args:
        table_name: Table name, with or without schema prefix.

    Returns:
        SQLAlchemy ORM class for the table.

    Raises:
        KeyError: If table not found.
    """
    sql_table = self.find_table(table_name)
    return self.get_orm_class_for_table(sql_table)

get_orm_class_for_table

get_orm_class_for_table(
    table: SQLTable | DerivaTable | str,
) -> Any | None

Get the ORM class for a table.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `table` | `SQLTable \| DerivaTable \| str` | SQLAlchemy Table, Deriva Table, or table name. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Any \| None` | SQLAlchemy ORM class, or None if not found. |

Source code in src/deriva_ml/model/schema_builder.py
def get_orm_class_for_table(self, table: SQLTable | DerivaTable | str) -> Any | None:
    """Get the ORM class for a table.

    Args:
        table: SQLAlchemy Table, Deriva Table, or table name.

    Returns:
        SQLAlchemy ORM class, or None if not found.
    """
    if isinstance(table, DerivaTable):
        # Try schema.table format first (file-based), then schema_table (in-memory)
        table_key = f"{table.schema.name}.{table.name}"
        table = self.metadata.tables.get(table_key)
        if table is None and not self._use_schemas:
            # Try underscore format for in-memory databases
            table_key = f"{table.schema.name}_{table.name}".replace("-", "_")
            table = self.metadata.tables.get(table_key)
    if isinstance(table, str):
        table = self.find_table(table)
    if table is None:
        return None

    for mapper in self.Base.registry.mappers:
        if mapper.persist_selectable is table or table in mapper.tables:
            return mapper.class_
    return None

get_table_contents

get_table_contents(
    table: str,
) -> Generator[
    dict[str, Any], None, None
]

Retrieve all rows from a table as dictionaries.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `table` | `str` | Table name (with or without schema prefix). | *required* |

Yields:

| Type | Description |
|------|-------------|
| `dict[str, Any]` | Dictionary for each row with column names as keys. |

Source code in src/deriva_ml/model/schema_builder.py
def get_table_contents(self, table: str) -> Generator[dict[str, Any], None, None]:
    """Retrieve all rows from a table as dictionaries.

    Args:
        table: Table name (with or without schema prefix).

    Yields:
        Dictionary for each row with column names as keys.
    """
    sql_table = self.find_table(table)
    with self.engine.connect() as conn:
        result = conn.execute(select(sql_table))
        for row in result.mappings():
            yield dict(row)
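The same stream-rows-as-dicts pattern can be shown with the stdlib `sqlite3` module alone, which is what the BDBag-backed database uses under the hood; the table and column names below are illustrative:

```python
# Streaming every row of a table as a dict, one at a time, using only the
# stdlib sqlite3 module. Mirrors the get_table_contents pattern above.
import sqlite3
from typing import Any, Generator

def table_contents(conn: sqlite3.Connection, table: str) -> Generator[dict[str, Any], None, None]:
    conn.row_factory = sqlite3.Row           # rows become name-addressable
    for row in conn.execute(f"SELECT * FROM {table}"):
        yield dict(row)                      # sqlite3.Row -> plain dict

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Subject (RID TEXT, Name TEXT)")
conn.execute("INSERT INTO Subject VALUES ('1-abc', 'mouse-01')")
print(list(table_contents(conn, "Subject")))
# -> [{'RID': '1-abc', 'Name': 'mouse-01'}]
```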

is_association_table staticmethod

is_association_table(
    table_class,
    min_arity: int = 2,
    max_arity: int = 2,
    unqualified: bool = True,
    pure: bool = True,
    no_overlap: bool = True,
    return_fkeys: bool = False,
)

Check if an ORM class represents an association table.

An association table links two or more tables through foreign keys, with a composite unique key covering those foreign keys.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `table_class` | | SQLAlchemy ORM class to check. | *required* |
| `min_arity` | `int` | Minimum number of foreign keys. | `2` |
| `max_arity` | `int` | Maximum number of foreign keys. | `2` |
| `unqualified` | `bool` | If True, reject associations with extra key columns. | `True` |
| `pure` | `bool` | If True, reject associations with extra non-key columns. | `True` |
| `no_overlap` | `bool` | If True, reject associations with shared FK columns. | `True` |
| `return_fkeys` | `bool` | If True, return the foreign keys instead of arity. | `False` |

Returns:

- If `return_fkeys=False`: integer arity if association, `False` otherwise.
- If `return_fkeys=True`: set of foreign keys if association, `False` otherwise.

Source code in src/deriva_ml/model/schema_builder.py
@staticmethod
def is_association_table(
    table_class,
    min_arity: int = 2,
    max_arity: int = 2,
    unqualified: bool = True,
    pure: bool = True,
    no_overlap: bool = True,
    return_fkeys: bool = False,
):
    """Check if an ORM class represents an association table.

    An association table links two or more tables through foreign keys,
    with a composite unique key covering those foreign keys.

    Args:
        table_class: SQLAlchemy ORM class to check.
        min_arity: Minimum number of foreign keys (default 2).
        max_arity: Maximum number of foreign keys (default 2).
        unqualified: If True, reject associations with extra key columns.
        pure: If True, reject associations with extra non-key columns.
        no_overlap: If True, reject associations with shared FK columns.
        return_fkeys: If True, return the foreign keys instead of arity.

    Returns:
        If return_fkeys=False: Integer arity if association, False otherwise.
        If return_fkeys=True: Set of foreign keys if association, False otherwise.
    """
    if min_arity < 2:
        raise ValueError("An association cannot have arity < 2")
    if max_arity is not None and max_arity < min_arity:
        raise ValueError("max_arity cannot be less than min_arity")

    mapper = inspect(table_class).mapper
    system_cols = {"RID", "RCT", "RMT", "RCB", "RMB"}

    non_sys_cols = {
        col.name for col in mapper.columns if col.name not in system_cols
    }

    unique_columns = [
        {c.name for c in constraint.columns}
        for constraint in inspect(table_class).local_table.constraints
        if isinstance(constraint, SQLUniqueConstraint)
    ]

    non_sys_key_colsets = {
        frozenset(uc)
        for uc in unique_columns
        if uc.issubset(non_sys_cols) and len(uc) > 1
    }

    if not non_sys_key_colsets:
        return False

    # Choose longest compound key
    row_key = sorted(non_sys_key_colsets, key=lambda s: len(s), reverse=True)[0]
    foreign_keys = list(inspect(table_class).relationships.values())

    covered_fkeys = {
        fkey for fkey in foreign_keys
        if {c.name for c in fkey.local_columns}.issubset(row_key)
    }
    covered_fkey_cols = set()

    if len(covered_fkeys) < min_arity:
        return False
    if max_arity is not None and len(covered_fkeys) > max_arity:
        return False

    for fkey in covered_fkeys:
        fkcols = {c.name for c in fkey.local_columns}
        if no_overlap and fkcols.intersection(covered_fkey_cols):
            return False
        covered_fkey_cols.update(fkcols)

    if unqualified and row_key.difference(covered_fkey_cols):
        return False

    if pure and non_sys_cols.difference(row_key):
        return False

    return covered_fkeys if return_fkeys else len(covered_fkeys)
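The shape of this test can be reproduced without SQLAlchemy: a table is an association of arity N when its longest non-system compound unique key is exactly covered by N foreign keys and (if pure) no other columns remain. A dependency-free sketch with plain column sets (the overlap check is omitted for brevity, and all names are illustrative):

```python
# Dependency-free sketch of the association check above. Tables are modeled
# as plain sets of column names; foreign keys and unique keys as column sets.
SYSTEM_COLS = {"RID", "RCT", "RMT", "RCB", "RMB"}

def association_arity(columns: set[str], unique_keys: list[set[str]],
                      fkeys: list[set[str]], min_arity: int = 2,
                      max_arity: int = 2, pure: bool = True):
    non_sys = columns - SYSTEM_COLS
    candidates = [k for k in unique_keys if k <= non_sys and len(k) > 1]
    if not candidates:
        return False
    row_key = max(candidates, key=len)            # longest compound key
    covered = [fk for fk in fkeys if fk <= row_key]
    if not (min_arity <= len(covered) <= max_arity):
        return False
    covered_cols = set().union(*covered)
    if row_key - covered_cols:                    # key has non-FK columns
        return False
    if pure and non_sys - row_key:                # extra payload columns
        return False
    return len(covered)

# A classic two-way association: (Subject, Sample) with a composite key.
print(association_arity(
    columns={"RID", "RCT", "Subject", "Sample"},
    unique_keys=[{"Subject", "Sample"}],
    fkeys=[{"Subject"}, {"Sample"}],
))  # -> 2
```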

list_tables

list_tables() -> list[str]

List all tables in the database.

Returns:

| Type | Description |
|------|-------------|
| `list[str]` | List of fully-qualified table names (schema.table), sorted. |

Source code in src/deriva_ml/model/schema_builder.py
def list_tables(self) -> list[str]:
    """List all tables in the database.

    Returns:
        List of fully-qualified table names (schema.table), sorted.
    """
    tables = list(self.metadata.tables.keys())
    tables.sort()
    return tables

SortKey dataclass

A sort key for row ordering.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `column` | `str` | Column name to sort by | *required* |
| `descending` | `bool` | Sort in descending order | `False` |

Example

SortKey("Name")  # Ascending
SortKey("Created", descending=True)  # Descending

Source code in src/deriva_ml/model/annotations.py
@dataclass
class SortKey:
    """A sort key for row ordering.

    Args:
        column: Column name to sort by
        descending: Sort in descending order (default False)

    Example:
        >>> SortKey("Name")  # Ascending
        >>> SortKey("Created", descending=True)  # Descending
    """
    column: str
    descending: bool = False

    def to_dict(self) -> dict[str, Any] | str:
        """Convert to dict or string (if ascending)."""
        if self.descending:
            return {"column": self.column, "descending": True}
        return self.column

to_dict

to_dict() -> dict[str, Any] | str

Convert to dict or string (if ascending).

Source code in src/deriva_ml/model/annotations.py
def to_dict(self) -> dict[str, Any] | str:
    """Convert to dict or string (if ascending)."""
    if self.descending:
        return {"column": self.column, "descending": True}
    return self.column

TableDisplay dataclass

Bases: AnnotationBuilder

Table-display annotation builder.

Controls table-level display options like row naming and ordering.

Example

td = TableDisplay()
td.row_name(row_markdown_pattern="{{{Name}}} ({{{Species}}})")
td.compact(TableDisplayOptions(row_order=[SortKey("Name")]))

Source code in src/deriva_ml/model/annotations.py
@dataclass
class TableDisplay(AnnotationBuilder):
    """Table-display annotation builder.

    Controls table-level display options like row naming and ordering.

    Example:
        >>> td = TableDisplay()
        >>> td.row_name(row_markdown_pattern="{{{Name}}} ({{{Species}}})")
        >>> td.compact(TableDisplayOptions(row_order=[SortKey("Name")]))
    """
    tag = TAG_TABLE_DISPLAY

    _contexts: dict[str, TableDisplayOptions | str | None] = field(default_factory=dict)

    def set_context(
        self,
        context: str,
        options: TableDisplayOptions | str | None
    ) -> "TableDisplay":
        """Set options for a context."""
        self._contexts[context] = options
        return self

    def row_name(
        self,
        row_markdown_pattern: str,
        template_engine: TemplateEngine | None = None
    ) -> "TableDisplay":
        """Set row name pattern (used in foreign key dropdowns, etc.)."""
        return self.set_context(
            CONTEXT_ROW_NAME,
            TableDisplayOptions(
                row_markdown_pattern=row_markdown_pattern,
                template_engine=template_engine
            )
        )

    def compact(self, options: TableDisplayOptions) -> "TableDisplay":
        """Set options for compact (list) view."""
        return self.set_context(CONTEXT_COMPACT, options)

    def detailed(self, options: TableDisplayOptions) -> "TableDisplay":
        """Set options for detailed (record) view."""
        return self.set_context(CONTEXT_DETAILED, options)

    def default(self, options: TableDisplayOptions) -> "TableDisplay":
        """Set default options."""
        return self.set_context(CONTEXT_DEFAULT, options)

    def to_dict(self) -> dict[str, Any]:
        result = {}
        for context, options in self._contexts.items():
            if options is None:
                result[context] = None
            elif isinstance(options, str):
                result[context] = options
            else:
                result[context] = options.to_dict()
        return result

compact

compact(
    options: TableDisplayOptions,
) -> "TableDisplay"

Set options for compact (list) view.

Source code in src/deriva_ml/model/annotations.py
def compact(self, options: TableDisplayOptions) -> "TableDisplay":
    """Set options for compact (list) view."""
    return self.set_context(CONTEXT_COMPACT, options)

default

default(
    options: TableDisplayOptions,
) -> "TableDisplay"

Set default options.

Source code in src/deriva_ml/model/annotations.py
def default(self, options: TableDisplayOptions) -> "TableDisplay":
    """Set default options."""
    return self.set_context(CONTEXT_DEFAULT, options)

detailed

detailed(
    options: TableDisplayOptions,
) -> "TableDisplay"

Set options for detailed (record) view.

Source code in src/deriva_ml/model/annotations.py
def detailed(self, options: TableDisplayOptions) -> "TableDisplay":
    """Set options for detailed (record) view."""
    return self.set_context(CONTEXT_DETAILED, options)

row_name

row_name(
    row_markdown_pattern: str,
    template_engine: TemplateEngine
    | None = None,
) -> "TableDisplay"

Set row name pattern (used in foreign key dropdowns, etc.).

Source code in src/deriva_ml/model/annotations.py
def row_name(
    self,
    row_markdown_pattern: str,
    template_engine: TemplateEngine | None = None
) -> "TableDisplay":
    """Set row name pattern (used in foreign key dropdowns, etc.)."""
    return self.set_context(
        CONTEXT_ROW_NAME,
        TableDisplayOptions(
            row_markdown_pattern=row_markdown_pattern,
            template_engine=template_engine
        )
    )

set_context

set_context(
    context: str,
    options: TableDisplayOptions
    | str
    | None,
) -> "TableDisplay"

Set options for a context.

Source code in src/deriva_ml/model/annotations.py
def set_context(
    self,
    context: str,
    options: TableDisplayOptions | str | None
) -> "TableDisplay":
    """Set options for a context."""
    self._contexts[context] = options
    return self

TableDisplayOptions dataclass

Options for a single table display context.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `row_order` | `list[SortKey] \| None` | Sort order for rows | `None` |
| `page_size` | `int \| None` | Number of rows per page | `None` |
| `row_markdown_pattern` | `str \| None` | Template for row names | `None` |
| `page_markdown_pattern` | `str \| None` | Template for page header | `None` |
| `separator_markdown` | `str \| None` | Template between rows | `None` |
| `prefix_markdown` | `str \| None` | Template before rows | `None` |
| `suffix_markdown` | `str \| None` | Template after rows | `None` |
| `template_engine` | `TemplateEngine \| None` | Template engine for patterns | `None` |
| `collapse_toc_panel` | `bool \| None` | Collapse TOC panel | `None` |
| `hide_column_headers` | `bool \| None` | Hide column headers | `None` |
Source code in src/deriva_ml/model/annotations.py
@dataclass
class TableDisplayOptions:
    """Options for a single table display context.

    Args:
        row_order: Sort order for rows
        page_size: Number of rows per page
        row_markdown_pattern: Template for row names
        page_markdown_pattern: Template for page header
        separator_markdown: Template between rows
        prefix_markdown: Template before rows
        suffix_markdown: Template after rows
        template_engine: Template engine for patterns
        collapse_toc_panel: Collapse TOC panel
        hide_column_headers: Hide column headers
    """
    row_order: list[SortKey] | None = None
    page_size: int | None = None
    row_markdown_pattern: str | None = None
    page_markdown_pattern: str | None = None
    separator_markdown: str | None = None
    prefix_markdown: str | None = None
    suffix_markdown: str | None = None
    template_engine: TemplateEngine | None = None
    collapse_toc_panel: bool | None = None
    hide_column_headers: bool | None = None

    def to_dict(self) -> dict[str, Any]:
        result = {}
        if self.row_order is not None:
            result["row_order"] = [
                k.to_dict() if isinstance(k, SortKey) else k
                for k in self.row_order
            ]
        if self.page_size is not None:
            result["page_size"] = self.page_size
        if self.row_markdown_pattern is not None:
            result["row_markdown_pattern"] = self.row_markdown_pattern
        if self.page_markdown_pattern is not None:
            result["page_markdown_pattern"] = self.page_markdown_pattern
        if self.separator_markdown is not None:
            result["separator_markdown"] = self.separator_markdown
        if self.prefix_markdown is not None:
            result["prefix_markdown"] = self.prefix_markdown
        if self.suffix_markdown is not None:
            result["suffix_markdown"] = self.suffix_markdown
        if self.template_engine is not None:
            result["template_engine"] = self.template_engine.value
        if self.collapse_toc_panel is not None:
            result["collapse_toc_panel"] = self.collapse_toc_panel
        if self.hide_column_headers is not None:
            result["hide_column_headers"] = self.hide_column_headers
        return result

TemplateEngine

Bases: str, Enum

Template engine for markdown patterns.

Attributes:

| Name | Description |
|------|-------------|
| `HANDLEBARS` | Use Handlebars.js templating (recommended, more features) |
| `MUSTACHE` | Use Mustache templating (simpler, fewer features) |

Example

display = PseudoColumnDisplay(
    markdown_pattern="[{{{Name}}}]({{{URL}}})",
    template_engine=TemplateEngine.HANDLEBARS
)

Source code in src/deriva_ml/model/annotations.py
class TemplateEngine(str, Enum):
    """Template engine for markdown patterns.

    Attributes:
        HANDLEBARS: Use Handlebars.js templating (recommended, more features)
        MUSTACHE: Use Mustache templating (simpler, fewer features)

    Example:
        >>> display = PseudoColumnDisplay(
        ...     markdown_pattern="[{{{Name}}}]({{{URL}}})",
        ...     template_engine=TemplateEngine.HANDLEBARS
        ... )
    """
    HANDLEBARS = "handlebars"
    MUSTACHE = "mustache"
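Because TemplateEngine subclasses str, members compare equal to their string values and drop straight into annotation JSON. A quick check with the enum copied from the source:

```python
# TemplateEngine copied from the source above: a str-valued Enum compares
# and serializes as a plain string.
import json
from enum import Enum

class TemplateEngine(str, Enum):
    HANDLEBARS = "handlebars"
    MUSTACHE = "mustache"

print(TemplateEngine.HANDLEBARS == "handlebars")                  # -> True
print(json.dumps({"template_engine": TemplateEngine.MUSTACHE.value}))
# -> {"template_engine": "mustache"}
```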

VisibleColumns dataclass

Bases: AnnotationBuilder

Visible-columns annotation builder.

Controls which columns appear in different UI contexts and their order. This is one of the most commonly used annotations for customizing the Chaise interface.

Column entries can be:

- Column names (strings): "Name", "RID", "Description"
- Foreign key references: fk_constraint("schema", "constraint_name")
- Pseudo-columns: PseudoColumn(...) for computed/derived values

Contexts:

- compact: Table/list views (search results, data browser)
- detailed: Single record view (full record page)
- entry: Create/edit forms
- entry/create: Create form only
- entry/edit: Edit form only
- *: Default for all contexts

Example

Basic column lists for different contexts::

>>> vc = VisibleColumns()
>>> vc.compact(["RID", "Name", "Status"])
>>> vc.detailed(["RID", "Name", "Status", "Description", "Created"])
>>> vc.entry(["Name", "Status", "Description"])
>>> handle.set_annotation(vc)

Method chaining::

>>> vc = (VisibleColumns()
...     .compact(["RID", "Name"])
...     .detailed(["RID", "Name", "Description"])
...     .entry(["Name", "Description"]))

Including foreign key references::

>>> vc = VisibleColumns()
>>> vc.compact([
...     "RID",
...     "Name",
...     fk_constraint("domain", "Subject_Species_fkey"),
... ])

With pseudo-columns for computed values::

>>> vc = VisibleColumns()
>>> vc.compact([
...     "RID",
...     "Name",
...     PseudoColumn(
...         source=[InboundFK("domain", "Sample_Subject_fkey"), "RID"],
...         aggregate=Aggregate.CNT,
...         markdown_name="Samples"
...     ),
... ])

Context inheritance (reference another context)::

>>> vc = VisibleColumns()
>>> vc.compact(["RID", "Name"])
>>> vc.set_context("compact/brief", "compact")  # Inherit from compact

With faceted search (filter context)::

>>> vc = VisibleColumns()
>>> vc.compact(["RID", "Name", "Status"])
>>> facets = FacetList()
>>> facets.add(Facet(source="Status", open=True))
>>> vc._contexts["filter"] = facets.to_dict()
Source code in src/deriva_ml/model/annotations.py
@dataclass
class VisibleColumns(AnnotationBuilder):
    """Visible-columns annotation builder.

    Controls which columns appear in different UI contexts and their order.
    This is one of the most commonly used annotations for customizing the
    Chaise interface.

    Column entries can be:
    - Column names (strings): "Name", "RID", "Description"
    - Foreign key references: fk_constraint("schema", "constraint_name")
    - Pseudo-columns: PseudoColumn(...) for computed/derived values

    Contexts:
    - ``compact``: Table/list views (search results, data browser)
    - ``detailed``: Single record view (full record page)
    - ``entry``: Create/edit forms
    - ``entry/create``: Create form only
    - ``entry/edit``: Edit form only
    - ``*``: Default for all contexts

    Example:
        Basic column lists for different contexts::

            >>> vc = VisibleColumns()
            >>> vc.compact(["RID", "Name", "Status"])
            >>> vc.detailed(["RID", "Name", "Status", "Description", "Created"])
            >>> vc.entry(["Name", "Status", "Description"])
            >>> handle.set_annotation(vc)

        Method chaining::

            >>> vc = (VisibleColumns()
            ...     .compact(["RID", "Name"])
            ...     .detailed(["RID", "Name", "Description"])
            ...     .entry(["Name", "Description"]))

        Including foreign key references::

            >>> vc = VisibleColumns()
            >>> vc.compact([
            ...     "RID",
            ...     "Name",
            ...     fk_constraint("domain", "Subject_Species_fkey"),
            ... ])

        With pseudo-columns for computed values::

            >>> vc = VisibleColumns()
            >>> vc.compact([
            ...     "RID",
            ...     "Name",
            ...     PseudoColumn(
            ...         source=[InboundFK("domain", "Sample_Subject_fkey"), "RID"],
            ...         aggregate=Aggregate.CNT,
            ...         markdown_name="Samples"
            ...     ),
            ... ])

        Context inheritance (reference another context)::

            >>> vc = VisibleColumns()
            >>> vc.compact(["RID", "Name"])
            >>> vc.set_context("compact/brief", "compact")  # Inherit from compact

        With faceted search (filter context)::

            >>> vc = VisibleColumns()
            >>> vc.compact(["RID", "Name", "Status"])
            >>> facets = FacetList()
            >>> facets.add(Facet(source="Status", open=True))
            >>> vc._contexts["filter"] = facets.to_dict()
    """
    tag = TAG_VISIBLE_COLUMNS

    _contexts: dict[str, list[ColumnEntry] | str] = field(default_factory=dict)

    def set_context(
        self,
        context: str,
        columns: list[ColumnEntry] | str
    ) -> "VisibleColumns":
        """Set columns for a context.

        Args:
            context: Context name (e.g., "compact", "detailed", "*")
            columns: List of columns, or string referencing another context

        Returns:
            Self for chaining
        """
        self._contexts[context] = columns
        return self

    def compact(self, columns: list[ColumnEntry]) -> "VisibleColumns":
        """Set columns for compact (list) view."""
        return self.set_context(CONTEXT_COMPACT, columns)

    def detailed(self, columns: list[ColumnEntry]) -> "VisibleColumns":
        """Set columns for detailed (record) view."""
        return self.set_context(CONTEXT_DETAILED, columns)

    def entry(self, columns: list[ColumnEntry]) -> "VisibleColumns":
        """Set columns for entry (create/edit) forms."""
        return self.set_context(CONTEXT_ENTRY, columns)

    def entry_create(self, columns: list[ColumnEntry]) -> "VisibleColumns":
        """Set columns for create form only."""
        return self.set_context(CONTEXT_ENTRY_CREATE, columns)

    def entry_edit(self, columns: list[ColumnEntry]) -> "VisibleColumns":
        """Set columns for edit form only."""
        return self.set_context(CONTEXT_ENTRY_EDIT, columns)

    def default(self, columns: list[ColumnEntry]) -> "VisibleColumns":
        """Set default columns for all contexts."""
        return self.set_context(CONTEXT_DEFAULT, columns)

    def to_dict(self) -> dict[str, Any]:
        result = {}
        for context, columns in self._contexts.items():
            if isinstance(columns, str):
                result[context] = columns
            else:
                result[context] = [
                    c.to_dict() if isinstance(c, PseudoColumn) else c
                    for c in columns
                ]
        return result

compact

compact(
    columns: list[ColumnEntry],
) -> "VisibleColumns"

Set columns for compact (list) view.

Source code in src/deriva_ml/model/annotations.py
def compact(self, columns: list[ColumnEntry]) -> "VisibleColumns":
    """Set columns for compact (list) view."""
    return self.set_context(CONTEXT_COMPACT, columns)

default

default(
    columns: list[ColumnEntry],
) -> "VisibleColumns"

Set default columns for all contexts.

Source code in src/deriva_ml/model/annotations.py
def default(self, columns: list[ColumnEntry]) -> "VisibleColumns":
    """Set default columns for all contexts."""
    return self.set_context(CONTEXT_DEFAULT, columns)

detailed

detailed(
    columns: list[ColumnEntry],
) -> "VisibleColumns"

Set columns for detailed (record) view.

Source code in src/deriva_ml/model/annotations.py
def detailed(self, columns: list[ColumnEntry]) -> "VisibleColumns":
    """Set columns for detailed (record) view."""
    return self.set_context(CONTEXT_DETAILED, columns)

entry

entry(
    columns: list[ColumnEntry],
) -> "VisibleColumns"

Set columns for entry (create/edit) forms.

Source code in src/deriva_ml/model/annotations.py
def entry(self, columns: list[ColumnEntry]) -> "VisibleColumns":
    """Set columns for entry (create/edit) forms."""
    return self.set_context(CONTEXT_ENTRY, columns)

entry_create

entry_create(
    columns: list[ColumnEntry],
) -> "VisibleColumns"

Set columns for create form only.

Source code in src/deriva_ml/model/annotations.py
def entry_create(self, columns: list[ColumnEntry]) -> "VisibleColumns":
    """Set columns for create form only."""
    return self.set_context(CONTEXT_ENTRY_CREATE, columns)

entry_edit

entry_edit(
    columns: list[ColumnEntry],
) -> "VisibleColumns"

Set columns for edit form only.

Source code in src/deriva_ml/model/annotations.py
def entry_edit(self, columns: list[ColumnEntry]) -> "VisibleColumns":
    """Set columns for edit form only."""
    return self.set_context(CONTEXT_ENTRY_EDIT, columns)

set_context

set_context(
    context: str,
    columns: list[ColumnEntry] | str,
) -> "VisibleColumns"

Set columns for a context.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `context` | `str` | Context name (e.g., "compact", "detailed", "*") | *required* |
| `columns` | `list[ColumnEntry] \| str` | List of columns, or string referencing another context | *required* |

Returns:

| Type | Description |
|------|-------------|
| `VisibleColumns` | Self for chaining |

Source code in src/deriva_ml/model/annotations.py
def set_context(
    self,
    context: str,
    columns: list[ColumnEntry] | str
) -> "VisibleColumns":
    """Set columns for a context.

    Args:
        context: Context name (e.g., "compact", "detailed", "*")
        columns: List of columns, or string referencing another context

    Returns:
        Self for chaining
    """
    self._contexts[context] = columns
    return self

VisibleForeignKeys dataclass

Bases: AnnotationBuilder

Visible-foreign-keys annotation builder.

Controls which related tables appear in the UI via inbound foreign keys.

Example

vfk = VisibleForeignKeys()
vfk.detailed([
    fk_constraint("domain", "Image_Subject_fkey"),
    fk_constraint("domain", "Diagnosis_Subject_fkey")
])

Source code in src/deriva_ml/model/annotations.py
@dataclass
class VisibleForeignKeys(AnnotationBuilder):
    """Visible-foreign-keys annotation builder.

    Controls which related tables appear in the UI via inbound foreign keys.

    Example:
        >>> vfk = VisibleForeignKeys()
        >>> vfk.detailed([
        ...     fk_constraint("domain", "Image_Subject_fkey"),
        ...     fk_constraint("domain", "Diagnosis_Subject_fkey")
        ... ])
    """
    tag = TAG_VISIBLE_FOREIGN_KEYS

    _contexts: dict[str, list[ForeignKeyEntry] | str] = field(default_factory=dict)

    def set_context(
        self,
        context: str,
        foreign_keys: list[ForeignKeyEntry] | str
    ) -> "VisibleForeignKeys":
        """Set foreign keys for a context."""
        self._contexts[context] = foreign_keys
        return self

    def detailed(self, foreign_keys: list[ForeignKeyEntry]) -> "VisibleForeignKeys":
        """Set foreign keys for detailed view."""
        return self.set_context(CONTEXT_DETAILED, foreign_keys)

    def default(self, foreign_keys: list[ForeignKeyEntry]) -> "VisibleForeignKeys":
        """Set default foreign keys for all contexts."""
        return self.set_context(CONTEXT_DEFAULT, foreign_keys)

    def to_dict(self) -> dict[str, Any]:
        result = {}
        for context, fkeys in self._contexts.items():
            if isinstance(fkeys, str):
                result[context] = fkeys
            else:
                result[context] = [
                    fk.to_dict() if isinstance(fk, PseudoColumn) else fk
                    for fk in fkeys
                ]
        return result
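The serialization performed by to_dict() can be sketched in isolation: string aliases pass through unchanged, while list entries are emitted one by one (builder objects such as PseudoColumn would additionally be expanded via their own to_dict()). Plain [schema, constraint] pairs stand in for real ForeignKeyEntry values here:

```python
# Hypothetical input: one context with explicit FK entries, one aliasing it.
contexts = {
    "detailed": [["domain", "Image_Subject_fkey"],
                 ["domain", "Diagnosis_Subject_fkey"]],
    "compact": "detailed",  # string alias to another context
}

def to_dict(contexts):
    # Mirror of the serialization above, minus PseudoColumn expansion.
    result = {}
    for context, fkeys in contexts.items():
        result[context] = fkeys if isinstance(fkeys, str) else list(fkeys)
    return result

annotation = to_dict(contexts)
```

The result is plain JSON-serializable data, which is what ERMrest expects when the annotation is applied to a table.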

default

default(
    foreign_keys: list[ForeignKeyEntry],
) -> "VisibleForeignKeys"

Set default foreign keys for all contexts.

Source code in src/deriva_ml/model/annotations.py, lines 897-899
def default(self, foreign_keys: list[ForeignKeyEntry]) -> "VisibleForeignKeys":
    """Set default foreign keys for all contexts."""
    return self.set_context(CONTEXT_DEFAULT, foreign_keys)

detailed

detailed(
    foreign_keys: list[ForeignKeyEntry],
) -> "VisibleForeignKeys"

Set foreign keys for detailed view.

Source code in src/deriva_ml/model/annotations.py, lines 893-895
def detailed(self, foreign_keys: list[ForeignKeyEntry]) -> "VisibleForeignKeys":
    """Set foreign keys for detailed view."""
    return self.set_context(CONTEXT_DETAILED, foreign_keys)

set_context

set_context(
    context: str,
    foreign_keys: list[ForeignKeyEntry] | str,
) -> "VisibleForeignKeys"

Set foreign keys for a context.

Source code in src/deriva_ml/model/annotations.py, lines 884-891
def set_context(
    self,
    context: str,
    foreign_keys: list[ForeignKeyEntry] | str
) -> "VisibleForeignKeys":
    """Set foreign keys for a context."""
    self._contexts[context] = foreign_keys
    return self

__getattr__

__getattr__(name: str)

Lazy import for DatabaseModel and DerivaMLDatabase.

Source code in src/deriva_ml/model/__init__.py, lines 110-120
def __getattr__(name: str):
    """Lazy import for DatabaseModel and DerivaMLDatabase."""
    if name == "DatabaseModel":
        from deriva_ml.model.database import DatabaseModel

        return DatabaseModel
    if name == "DerivaMLDatabase":
        from deriva_ml.model.deriva_ml_database import DerivaMLDatabase

        return DerivaMLDatabase
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
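The lazy-import mechanism above relies on PEP 562 module-level `__getattr__`: the deferred import runs only on first attribute access, which is what breaks the circular dependency with the dataset module. A minimal sketch using a synthetic module (all names here are illustrative, not DerivaML's):

```python
import sys
import types

# Build a throwaway module so the sketch is self-contained.
mod = types.ModuleType("lazy_demo")

def __getattr__(name: str):
    """Resolve heavy names on first access instead of at import time."""
    if name == "HeavyThing":
        # In real code a deferred `from ... import ...` would go here.
        class HeavyThing:
            pass
        return HeavyThing
    raise AttributeError(f"module 'lazy_demo' has no attribute {name!r}")

mod.__getattr__ = __getattr__
sys.modules["lazy_demo"] = mod

import lazy_demo
cls = lazy_demo.HeavyThing  # only now does __getattr__ run
```

Because Python caches real imports in sys.modules, repeating the deferred import on every access is cheap after the first call.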

fk_constraint

fk_constraint(
    schema: str, constraint: str
) -> list[str]

Create a foreign key constraint reference for visible-columns.

Use this in visible-columns to include a foreign key column (showing the referenced row's name/link). This is different from InboundFK/OutboundFK, which are used inside PseudoColumn source paths.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| schema | str | Schema name containing the FK constraint | required |
| constraint | str | Foreign key constraint name | required |

Returns:

| Type | Description |
| --- | --- |
| list[str] | [schema, constraint] list for use in visible-columns |

Example

Include a foreign key in visible columns:

>>> vc = VisibleColumns()
>>> vc.compact([
...     "RID",
...     "Name",
...     fk_constraint("domain", "Subject_Species_fkey"),  # Shows Species
... ])

This is equivalent to the raw format:

>>> vc.compact(["RID", "Name", ["domain", "Subject_Species_fkey"]])
Source code in src/deriva_ml/model/annotations.py, lines 511-539
def fk_constraint(schema: str, constraint: str) -> list[str]:
    """Create a foreign key constraint reference for visible-columns.

    Use this in visible-columns to include a foreign key column (showing the
    referenced row's name/link). This is different from InboundFK/OutboundFK
    which are used inside PseudoColumn source paths.

    Args:
        schema: Schema name containing the FK constraint
        constraint: Foreign key constraint name

    Returns:
        [schema, constraint] list for use in visible-columns

    Example:
        Include a foreign key in visible columns::

            >>> vc = VisibleColumns()
            >>> vc.compact([
            ...     "RID",
            ...     "Name",
            ...     fk_constraint("domain", "Subject_Species_fkey"),  # Shows Species
            ... ])

        This is equivalent to the raw format::

            >>> vc.compact(["RID", "Name", ["domain", "Subject_Species_fkey"]])
    """
    return [schema, constraint]
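Since fk_constraint just returns a two-element list, the resulting visible-columns annotation is plain data with no special objects in it. A self-contained sketch (the helper is redefined locally for illustration):

```python
def fk_constraint(schema: str, constraint: str) -> list[str]:
    # Same shape as the library helper: a [schema, constraint] pair.
    return [schema, constraint]

# Mix plain column names with an FK constraint reference, exactly as in
# the example above, and place the list under a display context.
compact = ["RID", "Name", fk_constraint("domain", "Subject_Species_fkey")]
annotation = {"compact": compact}
```

The helper exists purely for readability; writing the raw pair `["domain", "Subject_Species_fkey"]` inline produces an identical annotation.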