DerivaModel

The DerivaModel class provides schema introspection and manipulation capabilities for Deriva catalogs. It handles table relationships, associations, and catalog structure management.

Model module for DerivaML.

This module provides catalog and database model classes, as well as handle wrappers for ERMrest model objects and annotation builders.

Key components:

- DerivaModel: Schema analysis utilities
- DatabaseModel: SQLite database from BDBag
- SchemaBuilder/SchemaORM: Create ORM from Deriva Model (Phase 1)
- DataLoader: Fill database from data source (Phase 2)
- DataSource: Protocol for data sources (BagDataSource, CatalogDataSource)
- ForeignKeyOrderer: Compute FK-safe insertion order

Lazy imports are used for DatabaseModel and DerivaMLDatabase to avoid circular imports with the dataset module.

Aggregate

Bases: str, Enum

Aggregation functions for pseudo-columns.

Used when a pseudo-column follows an inbound foreign key and returns multiple values that need to be aggregated.

Attributes:

- MIN: Minimum value
- MAX: Maximum value
- CNT: Count of values
- CNT_D: Count of distinct values
- ARRAY: Array of all values
- ARRAY_D: Array of distinct values

Example:

# Count related records
pc = PseudoColumn(
    source=[InboundFK("domain", "Sample_Subject_fkey"), "RID"],
    aggregate=Aggregate.CNT,
    markdown_name="Sample Count"
)

# Get distinct values as array
pc = PseudoColumn(
    source=[InboundFK("domain", "Tag_Item_fkey"), "Name"],
    aggregate=Aggregate.ARRAY_D,
    markdown_name="Tags"
)

Source code in src/deriva_ml/model/annotations.py
class Aggregate(str, Enum):
    """Aggregation functions for pseudo-columns.

    Used when a pseudo-column follows an inbound foreign key and returns
    multiple values that need to be aggregated.

    Attributes:
        MIN: Minimum value
        MAX: Maximum value
        CNT: Count of values
        CNT_D: Count of distinct values
        ARRAY: Array of all values
        ARRAY_D: Array of distinct values

    Example:
        >>> # Count related records
        >>> pc = PseudoColumn(
        ...     source=[InboundFK("domain", "Sample_Subject_fkey"), "RID"],
        ...     aggregate=Aggregate.CNT,
        ...     markdown_name="Sample Count"
        ... )
        >>>
        >>> # Get distinct values as array
        >>> pc = PseudoColumn(
        ...     source=[InboundFK("domain", "Tag_Item_fkey"), "Name"],
        ...     aggregate=Aggregate.ARRAY_D,
        ...     markdown_name="Tags"
        ... )
    """
    MIN = "min"
    MAX = "max"
    CNT = "cnt"
    CNT_D = "cnt_d"
    ARRAY = "array"
    ARRAY_D = "array_d"
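Because `Aggregate` mixes in `str`, its members compare equal to their wire values and serialize directly to JSON, which is why they can be dropped into annotation dictionaries as-is. A small self-contained illustration (re-declaring two members locally):

```python
import json
from enum import Enum

class Aggregate(str, Enum):
    CNT = "cnt"
    ARRAY_D = "array_d"

# str mixin: members behave as their string values for comparison and JSON
assert Aggregate.CNT == "cnt"
assert json.dumps({"aggregate": Aggregate.ARRAY_D}) == '{"aggregate": "array_d"}'
```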

ArrayUxMode

Bases: str, Enum

Display modes for array values in pseudo-columns.

Controls how arrays of values are rendered in the UI.

Attributes:

- RAW: Raw array display
- CSV: Comma-separated values
- OLIST: Ordered (numbered) list
- ULIST: Unordered (bulleted) list

Example:

pc = PseudoColumn(
    source=[InboundFK("domain", "Tag_Item_fkey"), "Name"],
    aggregate=Aggregate.ARRAY,
    display=PseudoColumnDisplay(array_ux_mode=ArrayUxMode.CSV)
)

Source code in src/deriva_ml/model/annotations.py
class ArrayUxMode(str, Enum):
    """Display modes for array values in pseudo-columns.

    Controls how arrays of values are rendered in the UI.

    Attributes:
        RAW: Raw array display
        CSV: Comma-separated values
        OLIST: Ordered (numbered) list
        ULIST: Unordered (bulleted) list

    Example:
        >>> pc = PseudoColumn(
        ...     source=[InboundFK("domain", "Tag_Item_fkey"), "Name"],
        ...     aggregate=Aggregate.ARRAY,
        ...     display=PseudoColumnDisplay(array_ux_mode=ArrayUxMode.CSV)
        ... )
    """
    RAW = "raw"
    CSV = "csv"
    OLIST = "olist"
    ULIST = "ulist"

BagDataSource

DataSource implementation for BDBag directories.

Reads data from CSV files in a bag's data/ directory. Handles asset URL localization via fetch.txt.

Example:

source = BagDataSource(Path("/path/to/bag"))

# List available tables
print(source.list_available_tables())

# Get data for a table
for row in source.get_table_data("Image"):
    print(row["Filename"])

Source code in src/deriva_ml/model/data_sources.py
class BagDataSource:
    """DataSource implementation for BDBag directories.

    Reads data from CSV files in a bag's data/ directory.
    Handles asset URL localization via fetch.txt.

    Example:
        source = BagDataSource(Path("/path/to/bag"))

        # List available tables
        print(source.list_available_tables())

        # Get data for a table
        for row in source.get_table_data("Image"):
            print(row["Filename"])
    """

    def __init__(
        self,
        bag_path: Path,
        model: Model | None = None,
        asset_localization: bool = True,
    ):
        """Initialize from a bag path.

        Args:
            bag_path: Path to BDBag directory.
            model: Optional ERMrest Model for schema info. If not provided,
                will try to load from bag's schema.json.
            asset_localization: Whether to localize asset URLs to local paths
                using fetch.txt mapping.
        """
        self.bag_path = Path(bag_path)
        self.data_path = self.bag_path / "data"

        # Load model if not provided
        if model is None:
            schema_file = self.data_path / "schema.json"
            if schema_file.exists():
                self.model = Model.fromfile("file-system", schema_file)
            else:
                self.model = None
                logger.warning(f"No schema.json found in {self.bag_path}")
        else:
            self.model = model

        # Build asset map for URL localization
        self._asset_map = self._build_asset_map() if asset_localization else {}

        # Cache of table name -> list of csv file paths (multiple paths for nested datasets)
        self._csv_cache: dict[str, list[Path]] = {}
        self._build_csv_cache()

    def _build_csv_cache(self) -> None:
        """Build cache mapping table names to CSV file paths.

        Nested datasets can produce multiple CSV files for the same table
        at different directory depths. All paths are collected so that
        get_table_data() yields the union of all rows.
        """
        for csv_file in self.data_path.rglob("*.csv"):
            table_name = csv_file.stem
            self._csv_cache.setdefault(table_name, []).append(csv_file)

    def _build_asset_map(self) -> dict[str, str]:
        """Build a map from remote URLs to local file paths using fetch.txt.

        Returns:
            Dictionary mapping URL paths to local file paths.
        """
        fetch_map = {}
        fetch_file = self.bag_path / "fetch.txt"

        if not fetch_file.exists():
            logger.debug(f"No fetch.txt in bag {self.bag_path.name}")
            return fetch_map

        try:
            with fetch_file.open(newline="\n") as f:
                for row in f:
                    # Rows in fetch.txt are tab-separated: URL, size, local_path
                    fields = row.split("\t")
                    if len(fields) >= 3:
                        local_file = fields[2].replace("\n", "")
                        local_path = f"{self.bag_path}/{local_file}"
                        fetch_map[urlparse(fields[0]).path] = local_path
        except Exception as e:
            logger.warning(f"Error reading fetch.txt: {e}")

        return fetch_map

    def _get_table_name(self, table: DerivaTable | str) -> str:
        """Extract table name from table object or string."""
        if isinstance(table, DerivaTable):
            return table.name
        # Handle schema.table format
        if "." in table:
            return table.split(".")[-1]
        return table

    def _is_asset_table(self, table_name: str) -> bool:
        """Check if a table is an asset table (has Filename, URL, etc. columns)."""
        if self.model is None:
            return False

        for schema in self.model.schemas.values():
            if table_name in schema.tables:
                table = schema.tables[table_name]
                return ASSET_COLUMNS.issubset({c.name for c in table.columns})
        return False

    def _localize_asset_row(self, row: dict[str, Any]) -> dict[str, Any]:
        """Replace URL with local path in asset table row.

        Args:
            row: Dictionary of column values.

        Returns:
            Updated dictionary with localized file path.
        """
        if "URL" in row and "Filename" in row:
            url = row.get("URL")
            if url and url in self._asset_map:
                row = dict(row)  # Copy to avoid mutating original
                row["Filename"] = self._asset_map[url]
        return row

    def get_table_data(
        self,
        table: DerivaTable | str,
    ) -> Iterator[dict[str, Any]]:
        """Read table data from CSV files.

        Nested datasets may produce multiple CSV files for the same table
        at different directory depths. This method yields rows from all of
        them so that the full dataset (including parent and child records)
        is loaded.

        Args:
            table: Table object or name.

        Yields:
            Dictionary per row with column names as keys.
        """
        table_name = self._get_table_name(table)
        csv_files = self._csv_cache.get(table_name)

        if not csv_files:
            logger.debug(f"No CSV file found for table {table_name}")
            return

        is_asset = self._is_asset_table(table_name)

        for csv_file in csv_files:
            if not csv_file.exists():
                continue
            with csv_file.open(newline="") as f:
                reader = csv.DictReader(f)
                for row in reader:
                    if is_asset and self._asset_map:
                        row = self._localize_asset_row(row)
                    yield row

    def has_table(self, table: DerivaTable | str) -> bool:
        """Check if CSV exists for table.

        Args:
            table: Table object or name.

        Returns:
            True if CSV file exists for this table.
        """
        table_name = self._get_table_name(table)
        return table_name in self._csv_cache

    def list_available_tables(self) -> list[str]:
        """List all CSV files in data directory.

        Returns:
            List of table names (without .csv extension).
        """
        return sorted(self._csv_cache.keys())

    def get_row_count(self, table: DerivaTable | str) -> int:
        """Get the number of rows across all CSV files for a table.

        Args:
            table: Table object or name.

        Returns:
            Number of data rows (excluding headers).
        """
        table_name = self._get_table_name(table)
        csv_files = self._csv_cache.get(table_name)

        if not csv_files:
            return 0

        total = 0
        for csv_file in csv_files:
            if csv_file.exists():
                with csv_file.open(newline="") as f:
                    # Count lines minus header
                    total += sum(1 for _ in f) - 1
        return total
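The asset map built in `_build_asset_map` comes from the bag's fetch.txt, whose rows are tab-separated URL, byte size, and bag-relative path, keyed on the URL path. A minimal parse of one (hypothetical) line, assuming that format:

```python
from urllib.parse import urlparse

# A hypothetical fetch.txt row: tab-separated URL, size, bag-relative path
line = "https://example.org/hatrac/images/img.png\t12345\tdata/assets/img.png\n"

url, size, local_path = line.rstrip("\n").split("\t")
key = urlparse(url).path  # the asset map is keyed on the URL path
```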

__init__

__init__(
    bag_path: Path,
    model: Model | None = None,
    asset_localization: bool = True,
)

Initialize from a bag path.

Parameters:

- bag_path (Path, required): Path to BDBag directory.
- model (Model | None, default None): Optional ERMrest Model for schema info. If not provided, will try to load from bag's schema.json.
- asset_localization (bool, default True): Whether to localize asset URLs to local paths using fetch.txt mapping.
Source code in src/deriva_ml/model/data_sources.py
def __init__(
    self,
    bag_path: Path,
    model: Model | None = None,
    asset_localization: bool = True,
):
    """Initialize from a bag path.

    Args:
        bag_path: Path to BDBag directory.
        model: Optional ERMrest Model for schema info. If not provided,
            will try to load from bag's schema.json.
        asset_localization: Whether to localize asset URLs to local paths
            using fetch.txt mapping.
    """
    self.bag_path = Path(bag_path)
    self.data_path = self.bag_path / "data"

    # Load model if not provided
    if model is None:
        schema_file = self.data_path / "schema.json"
        if schema_file.exists():
            self.model = Model.fromfile("file-system", schema_file)
        else:
            self.model = None
            logger.warning(f"No schema.json found in {self.bag_path}")
    else:
        self.model = model

    # Build asset map for URL localization
    self._asset_map = self._build_asset_map() if asset_localization else {}

    # Cache of table name -> list of csv file paths (multiple paths for nested datasets)
    self._csv_cache: dict[str, list[Path]] = {}
    self._build_csv_cache()

get_row_count

get_row_count(
    table: Table | str,
) -> int

Get the number of rows across all CSV files for a table.

Parameters:

- table (Table | str, required): Table object or name.

Returns:

- int: Number of data rows (excluding headers).

Source code in src/deriva_ml/model/data_sources.py
def get_row_count(self, table: DerivaTable | str) -> int:
    """Get the number of rows across all CSV files for a table.

    Args:
        table: Table object or name.

    Returns:
        Number of data rows (excluding headers).
    """
    table_name = self._get_table_name(table)
    csv_files = self._csv_cache.get(table_name)

    if not csv_files:
        return 0

    total = 0
    for csv_file in csv_files:
        if csv_file.exists():
            with csv_file.open(newline="") as f:
                # Count lines minus header
                total += sum(1 for _ in f) - 1
    return total
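Note that the line-count approach above assumes no quoted CSV field contains an embedded newline. Where that assumption might not hold, counting parsed records with `csv.reader` is safer; this is a sketch of the alternative, not the library's implementation:

```python
import csv
import io

def count_csv_rows(f) -> int:
    """Count data rows by parsing records, so quoted newlines are handled."""
    reader = csv.reader(f)
    next(reader, None)  # skip the header row
    return sum(1 for _ in reader)

# A quoted field with an embedded newline: two records, but three raw lines
data = 'A,B\n1,"line one\nline two"\n2,x\n'
n = count_csv_rows(io.StringIO(data))
```

A raw line count minus header would report 3 here, while the parsed count is 2.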

get_table_data

get_table_data(
    table: Table | str,
) -> Iterator[dict[str, Any]]

Read table data from CSV files.

Nested datasets may produce multiple CSV files for the same table at different directory depths. This method yields rows from all of them so that the full dataset (including parent and child records) is loaded.

Parameters:

- table (Table | str, required): Table object or name.

Yields:

- dict[str, Any]: Dictionary per row with column names as keys.

Source code in src/deriva_ml/model/data_sources.py
def get_table_data(
    self,
    table: DerivaTable | str,
) -> Iterator[dict[str, Any]]:
    """Read table data from CSV files.

    Nested datasets may produce multiple CSV files for the same table
    at different directory depths. This method yields rows from all of
    them so that the full dataset (including parent and child records)
    is loaded.

    Args:
        table: Table object or name.

    Yields:
        Dictionary per row with column names as keys.
    """
    table_name = self._get_table_name(table)
    csv_files = self._csv_cache.get(table_name)

    if not csv_files:
        logger.debug(f"No CSV file found for table {table_name}")
        return

    is_asset = self._is_asset_table(table_name)

    for csv_file in csv_files:
        if not csv_file.exists():
            continue
        with csv_file.open(newline="") as f:
            reader = csv.DictReader(f)
            for row in reader:
                if is_asset and self._asset_map:
                    row = self._localize_asset_row(row)
                yield row

has_table

has_table(table: Table | str) -> bool

Check if CSV exists for table.

Parameters:

- table (Table | str, required): Table object or name.

Returns:

- bool: True if CSV file exists for this table.

Source code in src/deriva_ml/model/data_sources.py
def has_table(self, table: DerivaTable | str) -> bool:
    """Check if CSV exists for table.

    Args:
        table: Table object or name.

    Returns:
        True if CSV file exists for this table.
    """
    table_name = self._get_table_name(table)
    return table_name in self._csv_cache

list_available_tables

list_available_tables() -> list[str]

List all CSV files in data directory.

Returns:

- list[str]: List of table names (without .csv extension).

Source code in src/deriva_ml/model/data_sources.py
261
262
263
264
265
266
267
def list_available_tables(self) -> list[str]:
    """List all CSV files in data directory.

    Returns:
        List of table names (without .csv extension).
    """
    return sorted(self._csv_cache.keys())

CatalogDataSource

DataSource implementation for remote Deriva catalog.

Fetches data via ERMrest API / datapath with pagination support.

Example:

catalog = server.connect_ermrest(catalog_id)
source = CatalogDataSource(catalog, schemas=['domain', 'deriva-ml'])

# List available tables
print(source.list_available_tables())

# Get data for a table
for row in source.get_table_data("Image"):
    print(row["Filename"])

Source code in src/deriva_ml/model/data_sources.py
class CatalogDataSource:
    """DataSource implementation for remote Deriva catalog.

    Fetches data via ERMrest API / datapath with pagination support.

    Example:
        catalog = server.connect_ermrest(catalog_id)
        source = CatalogDataSource(catalog, schemas=['domain', 'deriva-ml'])

        # List available tables
        print(source.list_available_tables())

        # Get data for a table
        for row in source.get_table_data("Image"):
            print(row["Filename"])
    """

    def __init__(
        self,
        catalog: ErmrestCatalog,
        schemas: list[str],
        batch_size: int = 1000,
    ):
        """Initialize from catalog connection.

        Args:
            catalog: ERMrest catalog connection.
            schemas: Schemas to fetch data from.
            batch_size: Number of rows per API request.
        """
        self.catalog = catalog
        self.schemas = schemas
        self.batch_size = batch_size
        self._pb = catalog.getPathBuilder()
        self._model = catalog.getCatalogModel()

    def _get_table_info(self, table: DerivaTable | str) -> tuple[str, str] | None:
        """Get schema and table name for a table.

        Args:
            table: Table object or name.

        Returns:
            Tuple of (schema_name, table_name) or None if not found.
        """
        if isinstance(table, DerivaTable):
            return table.schema.name, table.name

        # Handle schema.table format
        if "." in table:
            parts = table.split(".")
            schema_name, table_name = parts[0], parts[1]
            if schema_name in self.schemas:
                return schema_name, table_name
            return None

        # Search schemas for table
        for schema_name in self.schemas:
            if schema_name in self._model.schemas:
                schema = self._model.schemas[schema_name]
                if table in schema.tables:
                    return schema_name, table

        return None

    def get_table_data(
        self,
        table: DerivaTable | str,
    ) -> Iterator[dict[str, Any]]:
        """Fetch table data via ERMrest API.

        Uses pagination to handle large tables efficiently.

        Args:
            table: Table object or name.

        Yields:
            Dictionary per row with column names as keys.
        """
        table_info = self._get_table_info(table)
        if table_info is None:
            logger.warning(f"Table {table} not found in schemas {self.schemas}")
            return

        schema_name, table_name = table_info

        # Build path
        path = self._pb.schemas[schema_name].tables[table_name]

        # Paginated fetch using RID ordering
        last_rid = None
        while True:
            # Build query with optional RID filter
            query = path.entities()
            if last_rid is not None:
                query = query.filter(path.RID > last_rid)

            # Fetch batch ordered by RID
            try:
                entities = list(query.sort(path.RID).fetch(limit=self.batch_size))
            except Exception as e:
                logger.error(f"Error fetching from {schema_name}.{table_name}: {e}")
                break

            if not entities:
                break

            for entity in entities:
                yield dict(entity)

            # Track last RID for pagination
            last_rid = entities[-1]["RID"]

            if len(entities) < self.batch_size:
                break

    def has_table(self, table: DerivaTable | str) -> bool:
        """Check if table exists in catalog.

        Args:
            table: Table object or name.

        Returns:
            True if table exists in configured schemas.
        """
        return self._get_table_info(table) is not None

    def list_available_tables(self) -> list[str]:
        """List all tables in configured schemas.

        Returns:
            List of fully-qualified table names (schema.table).
        """
        tables = []
        for schema_name in self.schemas:
            if schema_name in self._model.schemas:
                schema = self._model.schemas[schema_name]
                for table_name in schema.tables.keys():
                    tables.append(f"{schema_name}.{table_name}")
        return sorted(tables)

    def get_row_count(self, table: DerivaTable | str) -> int:
        """Get the number of rows in a table.

        Args:
            table: Table object or name.

        Returns:
            Number of rows in the table.
        """
        table_info = self._get_table_info(table)
        if table_info is None:
            return 0

        schema_name, table_name = table_info
        path = self._pb.schemas[schema_name].tables[table_name]

        try:
            # Use count aggregate
            result = path.aggregates(path.RID.cnt.alias("count")).fetch()
            return result[0]["count"] if result else 0
        except Exception as e:
            logger.error(f"Error counting {schema_name}.{table_name}: {e}")
            return 0

__init__

__init__(
    catalog: ErmrestCatalog,
    schemas: list[str],
    batch_size: int = 1000,
)

Initialize from catalog connection.

Parameters:

- catalog (ErmrestCatalog, required): ERMrest catalog connection.
- schemas (list[str], required): Schemas to fetch data from.
- batch_size (int, default 1000): Number of rows per API request.
Source code in src/deriva_ml/model/data_sources.py
def __init__(
    self,
    catalog: ErmrestCatalog,
    schemas: list[str],
    batch_size: int = 1000,
):
    """Initialize from catalog connection.

    Args:
        catalog: ERMrest catalog connection.
        schemas: Schemas to fetch data from.
        batch_size: Number of rows per API request.
    """
    self.catalog = catalog
    self.schemas = schemas
    self.batch_size = batch_size
    self._pb = catalog.getPathBuilder()
    self._model = catalog.getCatalogModel()

get_row_count

get_row_count(
    table: Table | str,
) -> int

Get the number of rows in a table.

Parameters:

- table (Table | str, required): Table object or name.

Returns:

- int: Number of rows in the table.

Source code in src/deriva_ml/model/data_sources.py
def get_row_count(self, table: DerivaTable | str) -> int:
    """Get the number of rows in a table.

    Args:
        table: Table object or name.

    Returns:
        Number of rows in the table.
    """
    table_info = self._get_table_info(table)
    if table_info is None:
        return 0

    schema_name, table_name = table_info
    path = self._pb.schemas[schema_name].tables[table_name]

    try:
        # Use count aggregate
        result = path.aggregates(path.RID.cnt.alias("count")).fetch()
        return result[0]["count"] if result else 0
    except Exception as e:
        logger.error(f"Error counting {schema_name}.{table_name}: {e}")
        return 0

get_table_data

get_table_data(
    table: Table | str,
) -> Iterator[dict[str, Any]]

Fetch table data via ERMrest API.

Uses pagination to handle large tables efficiently.

Parameters:

- table (Table | str, required): Table object or name.

Yields:

- dict[str, Any]: Dictionary per row with column names as keys.

Source code in src/deriva_ml/model/data_sources.py
def get_table_data(
    self,
    table: DerivaTable | str,
) -> Iterator[dict[str, Any]]:
    """Fetch table data via ERMrest API.

    Uses pagination to handle large tables efficiently.

    Args:
        table: Table object or name.

    Yields:
        Dictionary per row with column names as keys.
    """
    table_info = self._get_table_info(table)
    if table_info is None:
        logger.warning(f"Table {table} not found in schemas {self.schemas}")
        return

    schema_name, table_name = table_info

    # Build path
    path = self._pb.schemas[schema_name].tables[table_name]

    # Paginated fetch using RID ordering
    last_rid = None
    while True:
        # Build query with optional RID filter
        query = path.entities()
        if last_rid is not None:
            query = query.filter(path.RID > last_rid)

        # Fetch batch ordered by RID
        try:
            entities = list(query.sort(path.RID).fetch(limit=self.batch_size))
        except Exception as e:
            logger.error(f"Error fetching from {schema_name}.{table_name}: {e}")
            break

        if not entities:
            break

        for entity in entities:
            yield dict(entity)

        # Track last RID for pagination
        last_rid = entities[-1]["RID"]

        if len(entities) < self.batch_size:
            break
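The RID-ordered fetch loop above is an instance of keyset pagination. The pattern can be sketched generically; the names here are hypothetical, and an in-memory `fetch_page` stands in for the ERMrest call:

```python
from typing import Any, Callable, Iterator

def paginate(
    fetch_page: Callable[[Any, int], list[dict]],
    batch_size: int,
) -> Iterator[dict]:
    """Yield rows from fetch_page(after_key, limit) until exhausted."""
    last_key = None
    while True:
        page = fetch_page(last_key, batch_size)
        if not page:
            break
        yield from page
        last_key = page[-1]["RID"]   # resume after the last key seen
        if len(page) < batch_size:   # short page means no more rows
            break

# In-memory stand-in for the catalog: filter past the key, then limit
rows = [{"RID": i} for i in range(5)]
def fetch_page(after, limit):
    remaining = [r for r in rows if after is None or r["RID"] > after]
    return remaining[:limit]

result = list(paginate(fetch_page, batch_size=2))
```

Keyset pagination avoids the offset-skew problem of LIMIT/OFFSET: because each request resumes after a stable sort key, rows inserted or deleted mid-scan cannot shift the window.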

has_table

has_table(table: Table | str) -> bool

Check if table exists in catalog.

Parameters:

- table (Table | str, required): Table object or name.

Returns:

- bool: True if table exists in configured schemas.

Source code in src/deriva_ml/model/data_sources.py
def has_table(self, table: DerivaTable | str) -> bool:
    """Check if table exists in catalog.

    Args:
        table: Table object or name.

    Returns:
        True if table exists in configured schemas.
    """
    return self._get_table_info(table) is not None

list_available_tables

list_available_tables() -> list[str]

List all tables in configured schemas.

Returns:

- list[str]: List of fully-qualified table names (schema.table).

Source code in src/deriva_ml/model/data_sources.py
def list_available_tables(self) -> list[str]:
    """List all tables in configured schemas.

    Returns:
        List of fully-qualified table names (schema.table).
    """
    tables = []
    for schema_name in self.schemas:
        if schema_name in self._model.schemas:
            schema = self._model.schemas[schema_name]
            for table_name in schema.tables.keys():
                tables.append(f"{schema_name}.{table_name}")
    return sorted(tables)

ColumnDisplay dataclass

Bases: AnnotationBuilder

Column-display annotation builder.

Controls how column values are rendered.

Example:

cd = ColumnDisplay()
cd.default(ColumnDisplayOptions(
    pre_format=PreFormat(format="%.2f")
))

# Markdown link
cd = ColumnDisplay()
cd.default(ColumnDisplayOptions(
    markdown_pattern="[Link]({{{_value}}})"
))

Source code in src/deriva_ml/model/annotations.py
@dataclass
class ColumnDisplay(AnnotationBuilder):
    """Column-display annotation builder.

    Controls how column values are rendered.

    Example:
        >>> cd = ColumnDisplay()
        >>> cd.default(ColumnDisplayOptions(
        ...     pre_format=PreFormat(format="%.2f")
        ... ))
        >>>
        >>> # Markdown link
        >>> cd = ColumnDisplay()
        >>> cd.default(ColumnDisplayOptions(
        ...     markdown_pattern="[Link]({{{_value}}})"
        ... ))
    """
    tag = TAG_COLUMN_DISPLAY

    _contexts: dict[str, ColumnDisplayOptions | str] = field(default_factory=dict)

    def set_context(
        self,
        context: str,
        options: ColumnDisplayOptions | str
    ) -> "ColumnDisplay":
        """Set options for a context."""
        self._contexts[context] = options
        return self

    def default(self, options: ColumnDisplayOptions) -> "ColumnDisplay":
        """Set default options."""
        return self.set_context(CONTEXT_DEFAULT, options)

    def compact(self, options: ColumnDisplayOptions) -> "ColumnDisplay":
        """Set options for compact view."""
        return self.set_context(CONTEXT_COMPACT, options)

    def detailed(self, options: ColumnDisplayOptions) -> "ColumnDisplay":
        """Set options for detailed view."""
        return self.set_context(CONTEXT_DETAILED, options)

    def to_dict(self) -> dict[str, Any]:
        result = {}
        for context, options in self._contexts.items():
            if isinstance(options, str):
                result[context] = options
            else:
                result[context] = options.to_dict()
        return result

compact

compact(
    options: ColumnDisplayOptions,
) -> "ColumnDisplay"

Set options for compact view.

Source code in src/deriva_ml/model/annotations.py
def compact(self, options: ColumnDisplayOptions) -> "ColumnDisplay":
    """Set options for compact view."""
    return self.set_context(CONTEXT_COMPACT, options)

default

default(
    options: ColumnDisplayOptions,
) -> "ColumnDisplay"

Set default options.

Source code in src/deriva_ml/model/annotations.py
def default(self, options: ColumnDisplayOptions) -> "ColumnDisplay":
    """Set default options."""
    return self.set_context(CONTEXT_DEFAULT, options)

detailed

detailed(
    options: ColumnDisplayOptions,
) -> "ColumnDisplay"

Set options for detailed view.

Source code in src/deriva_ml/model/annotations.py
def detailed(self, options: ColumnDisplayOptions) -> "ColumnDisplay":
    """Set options for detailed view."""
    return self.set_context(CONTEXT_DETAILED, options)

set_context

set_context(
    context: str,
    options: ColumnDisplayOptions | str,
) -> "ColumnDisplay"

Set options for a context.

Source code in src/deriva_ml/model/annotations.py
def set_context(
    self,
    context: str,
    options: ColumnDisplayOptions | str
) -> "ColumnDisplay":
    """Set options for a context."""
    self._contexts[context] = options
    return self

ColumnDisplayOptions dataclass

Options for displaying a column in a specific context.

Parameters:

- pre_format (PreFormat | None): Pre-formatting options. Default: None
- markdown_pattern (str | None): Template for rendering. Default: None
- template_engine (TemplateEngine | None): Template engine to use. Default: None
- column_order (list[SortKey] | Literal[False] | None): Sort order, or False to disable. Default: None
Source code in src/deriva_ml/model/annotations.py
@dataclass
class ColumnDisplayOptions:
    """Options for displaying a column in a specific context.

    Args:
        pre_format: Pre-formatting options
        markdown_pattern: Template for rendering
        template_engine: Template engine to use
        column_order: Sort order, or False to disable
    """
    pre_format: PreFormat | None = None
    markdown_pattern: str | None = None
    template_engine: TemplateEngine | None = None
    column_order: list[SortKey] | Literal[False] | None = None

    def to_dict(self) -> dict[str, Any]:
        result = {}
        if self.pre_format is not None:
            result["pre_format"] = self.pre_format.to_dict()
        if self.markdown_pattern is not None:
            result["markdown_pattern"] = self.markdown_pattern
        if self.template_engine is not None:
            result["template_engine"] = self.template_engine.value
        if self.column_order is not None:
            if self.column_order is False:
                result["column_order"] = False
            else:
                result["column_order"] = [
                    k.to_dict() if isinstance(k, SortKey) else k
                    for k in self.column_order
                ]
        return result
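
Putting the two classes together, a chained builder call serializes to a small per-context JSON payload. The dict below is a sketch of that payload; the context keys "*" (for CONTEXT_DEFAULT) and "compact" (for CONTEXT_COMPACT) are assumptions based on common ERMrest annotation context names, not taken from the source above.

```python
# Sketch of the annotation payload produced by chaining contexts, e.g.:
#   cd = ColumnDisplay()
#   cd.default(ColumnDisplayOptions(pre_format=PreFormat(format="%.2f")))
#   cd.compact(ColumnDisplayOptions(markdown_pattern="[Link]({{{_value}}})"))
# Assumption: CONTEXT_DEFAULT is "*" and CONTEXT_COMPACT is "compact".
payload = {
    "*": {"pre_format": {"format": "%.2f"}},
    "compact": {"markdown_pattern": "[Link]({{{_value}}})"},
}

print(sorted(payload))  # ['*', 'compact']
```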

DataLoader

Loads data into a database with FK ordering.

Phase 2 of the two-phase database creation pattern. Takes a SchemaORM (from Phase 1) and populates it from a DataSource.

Automatically orders tables by FK dependencies to ensure referential integrity during loading.

Example

# Phase 1: Create ORM
orm = SchemaBuilder(model, schemas).build()

# Phase 2: Fill with data from bag
source = BagDataSource(bag_path)
loader = DataLoader(orm, source)
counts = loader.load_tables()  # All tables
print(f"Loaded {sum(counts.values())} total rows")

# Or load specific tables
counts = loader.load_tables(['Subject', 'Image'])

# With progress callback
def on_progress(table, count, total):
    print(f"Loaded {table}: {count} rows")
loader.load_tables(progress_callback=on_progress)

Source code in src/deriva_ml/model/data_loader.py
class DataLoader:
    """Loads data into a database with FK ordering.

    Phase 2 of the two-phase database creation pattern. Takes a
    SchemaORM (from Phase 1) and populates it from a DataSource.

    Automatically orders tables by FK dependencies to ensure
    referential integrity during loading.

    Example:
        # Phase 1: Create ORM
        orm = SchemaBuilder(model, schemas).build()

        # Phase 2: Fill with data from bag
        source = BagDataSource(bag_path)
        loader = DataLoader(orm, source)
        counts = loader.load_tables()  # All tables
        print(f"Loaded {sum(counts.values())} total rows")

        # Or load specific tables
        counts = loader.load_tables(['Subject', 'Image'])

        # With progress callback
        def on_progress(table, count, total):
            print(f"Loaded {table}: {count} rows")
        loader.load_tables(progress_callback=on_progress)
    """

    def __init__(
        self,
        schema_orm: SchemaORM,
        data_source: DataSource,
    ):
        """Initialize the loader.

        Args:
            schema_orm: ORM structure from SchemaBuilder.
            data_source: Source of data to load (BagDataSource, CatalogDataSource, etc.).
        """
        self.orm = schema_orm
        self.source = data_source
        self.orderer = ForeignKeyOrderer(
            schema_orm.model,
            schema_orm.schemas,
        )

    def load_tables(
        self,
        tables: list[str | DerivaTable] | None = None,
        on_conflict: str = "ignore",
        batch_size: int = 1000,
        progress_callback: Callable[[str, int, int], None] | None = None,
    ) -> dict[str, int]:
        """Load data into specified tables with FK ordering.

        Tables are automatically ordered by FK dependencies to ensure
        referenced tables are populated first.

        Args:
            tables: Tables to load. If None, loads all tables that have
                data in the source.
            on_conflict: How to handle duplicate keys:
                - "ignore": Skip rows with duplicate keys (default)
                - "replace": Replace existing rows
                - "error": Raise error on duplicates
            batch_size: Number of rows per insert batch.
            progress_callback: Optional callback(table_name, rows_loaded, total_tables)
                called after each table is loaded.

        Returns:
            Dict mapping table names to row counts loaded.
        """
        # Determine tables to load
        if tables is None:
            # Get all tables that have data in source
            available = set(self.source.list_available_tables())
            # Filter to tables that exist in ORM
            orm_tables = set(self.orm.list_tables())

            # Match available tables to ORM tables
            tables_to_load = []
            for orm_table in orm_tables:
                # Check both qualified and unqualified names
                table_name = orm_table.split(".")[-1]
                if orm_table in available or table_name in available:
                    tables_to_load.append(orm_table)
        else:
            tables_to_load = [
                t if isinstance(t, str) else f"{t.schema.name}.{t.name}"
                for t in tables
            ]

        # Compute insertion order
        try:
            ordered_tables = self.orderer.get_insertion_order(tables_to_load)
        except ValueError as e:
            # Some tables might not be in the model, just use original order
            logger.warning(f"Could not compute FK ordering: {e}")
            ordered_tables = [
                self.orderer._to_table(t) if isinstance(t, str) else t
                for t in tables_to_load
                if self._table_exists(t)
            ]

        # Load in order
        counts = {}
        total_tables = len(ordered_tables)

        for i, table in enumerate(ordered_tables):
            table_key = f"{table.schema.name}.{table.name}"

            count = self._load_table(table, on_conflict, batch_size)
            counts[table_key] = count

            if progress_callback:
                progress_callback(table_key, count, total_tables)

            if count > 0:
                logger.info(f"Loaded {count} rows into {table_key}")

        return counts

    def _table_exists(self, table: str | DerivaTable) -> bool:
        """Check if table exists in ORM."""
        try:
            if isinstance(table, str):
                self.orm.find_table(table)
            else:
                self.orm.find_table(f"{table.schema.name}.{table.name}")
            return True
        except KeyError:
            return False

    def _load_table(
        self,
        table: DerivaTable,
        on_conflict: str,
        batch_size: int,
    ) -> int:
        """Load a single table.

        Args:
            table: Table to load.
            on_conflict: Conflict handling strategy.
            batch_size: Rows per batch.

        Returns:
            Number of rows loaded.
        """
        table_key = f"{table.schema.name}.{table.name}"

        # Find SQL table
        try:
            sql_table = self.orm.find_table(table_key)
        except KeyError:
            logger.warning(f"Table {table_key} not found in ORM")
            return 0

        # Check if source has data
        if not self.source.has_table(table):
            logger.debug(f"No data for {table_key} in source")
            return 0

        # Get data from source
        rows_loaded = 0
        batch = []

        with self.orm.engine.begin() as conn:
            for row in self.source.get_table_data(table):
                batch.append(row)

                if len(batch) >= batch_size:
                    rows_loaded += self._insert_batch(
                        conn, sql_table, batch, on_conflict
                    )
                    batch = []

            # Insert remaining rows
            if batch:
                rows_loaded += self._insert_batch(
                    conn, sql_table, batch, on_conflict
                )

        return rows_loaded

    def _insert_batch(
        self,
        conn: Any,
        sql_table: Any,
        rows: list[dict[str, Any]],
        on_conflict: str,
    ) -> int:
        """Insert a batch of rows.

        Args:
            conn: Database connection.
            sql_table: SQLAlchemy table.
            rows: List of row dictionaries.
            on_conflict: Conflict handling strategy.

        Returns:
            Number of rows inserted.
        """
        if not rows:
            return 0

        try:
            if on_conflict == "ignore":
                stmt = sqlite_insert(sql_table).on_conflict_do_nothing()
            elif on_conflict == "replace":
                # For SQLite, we need to specify all columns for upsert
                stmt = sqlite_insert(sql_table)
                update_cols = {
                    c.name: c for c in stmt.excluded
                    if c.name not in ("RID",)  # Don't update primary key
                }
                stmt = stmt.on_conflict_do_update(
                    index_elements=["RID"],
                    set_=update_cols,
                )
            else:
                stmt = sql_table.insert()

            conn.execute(stmt, rows)
            return len(rows)

        except Exception as e:
            logger.error(f"Error inserting into {sql_table.name}: {e}")
            if on_conflict == "error":
                raise
            return 0

    def load_table(
        self,
        table: str | DerivaTable,
        on_conflict: str = "ignore",
        batch_size: int = 1000,
    ) -> int:
        """Load a single table (without FK ordering).

        Use this when you know the dependencies are already satisfied
        or for loading a single table.

        Args:
            table: Table to load.
            on_conflict: Conflict handling strategy.
            batch_size: Rows per batch.

        Returns:
            Number of rows loaded.
        """
        if isinstance(table, str):
            table = self.orderer._to_table(table)

        return self._load_table(table, on_conflict, batch_size)

    def get_load_order(
        self,
        tables: list[str | DerivaTable] | None = None,
    ) -> list[str]:
        """Get the FK-safe load order for tables without loading.

        Useful for previewing or manually controlling load order.

        Args:
            tables: Tables to order. If None, orders all available.

        Returns:
            List of table names in safe insertion order.
        """
        if tables is None:
            available = self.source.list_available_tables()
            tables = [t for t in available if self._table_exists(t)]

        ordered = self.orderer.get_insertion_order(tables)
        return [f"{t.schema.name}.{t.name}" for t in ordered]

    def validate_load_order(
        self,
        tables: list[str | DerivaTable],
    ) -> list[tuple[str, str, str]]:
        """Validate that tables can be loaded in the given order.

        Args:
            tables: Ordered list of tables.

        Returns:
            List of FK violations as (table, missing_dep, fk_name) tuples.
            Empty if order is valid.
        """
        return self.orderer.validate_insertion_order(tables)
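
The FK-safe insertion order that ForeignKeyOrderer supplies to this class is, at its core, a topological sort of the table dependency graph. A minimal stdlib sketch of that idea, using graphlib (the table names and FK edges below are hypothetical, not part of DerivaML):

```python
from graphlib import TopologicalSorter

# Hypothetical FK graph: each table maps to the set of tables it references.
# A table can only be inserted after every table it references exists.
fk_deps = {
    "Subject": set(),               # no outbound FKs
    "Image": {"Subject"},           # Image.Subject -> Subject.RID
    "Image_Annotation": {"Image"},  # association row references Image
}

# static_order() yields each node only after all of its predecessors,
# which is exactly an FK-safe insertion order.
order = list(TopologicalSorter(fk_deps).static_order())
print(order)  # ['Subject', 'Image', 'Image_Annotation']
```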

__init__

__init__(
    schema_orm: SchemaORM,
    data_source: DataSource,
)

Initialize the loader.

Parameters:

- schema_orm (SchemaORM): ORM structure from SchemaBuilder. Required.
- data_source (DataSource): Source of data to load (BagDataSource, CatalogDataSource, etc.). Required.
Source code in src/deriva_ml/model/data_loader.py
def __init__(
    self,
    schema_orm: SchemaORM,
    data_source: DataSource,
):
    """Initialize the loader.

    Args:
        schema_orm: ORM structure from SchemaBuilder.
        data_source: Source of data to load (BagDataSource, CatalogDataSource, etc.).
    """
    self.orm = schema_orm
    self.source = data_source
    self.orderer = ForeignKeyOrderer(
        schema_orm.model,
        schema_orm.schemas,
    )

get_load_order

get_load_order(tables: list[str | DerivaTable] | None = None) -> list[str]

Get the FK-safe load order for tables without loading.

Useful for previewing or manually controlling load order.

Parameters:

- tables (list[str | DerivaTable] | None): Tables to order. If None, orders all available. Default: None

Returns:

- list[str]: List of table names in safe insertion order.

Source code in src/deriva_ml/model/data_loader.py
def get_load_order(
    self,
    tables: list[str | DerivaTable] | None = None,
) -> list[str]:
    """Get the FK-safe load order for tables without loading.

    Useful for previewing or manually controlling load order.

    Args:
        tables: Tables to order. If None, orders all available.

    Returns:
        List of table names in safe insertion order.
    """
    if tables is None:
        available = self.source.list_available_tables()
        tables = [t for t in available if self._table_exists(t)]

    ordered = self.orderer.get_insertion_order(tables)
    return [f"{t.schema.name}.{t.name}" for t in ordered]

load_table

load_table(table: str | DerivaTable, on_conflict: str = "ignore", batch_size: int = 1000) -> int

Load a single table (without FK ordering).

Use this when you know the dependencies are already satisfied or for loading a single table.

Parameters:

- table (str | DerivaTable): Table to load. Required.
- on_conflict (str): Conflict handling strategy. Default: 'ignore'
- batch_size (int): Rows per batch. Default: 1000

Returns:

- int: Number of rows loaded.

Source code in src/deriva_ml/model/data_loader.py
def load_table(
    self,
    table: str | DerivaTable,
    on_conflict: str = "ignore",
    batch_size: int = 1000,
) -> int:
    """Load a single table (without FK ordering).

    Use this when you know the dependencies are already satisfied
    or for loading a single table.

    Args:
        table: Table to load.
        on_conflict: Conflict handling strategy.
        batch_size: Rows per batch.

    Returns:
        Number of rows loaded.
    """
    if isinstance(table, str):
        table = self.orderer._to_table(table)

    return self._load_table(table, on_conflict, batch_size)

load_tables

load_tables(
    tables: list[str | DerivaTable] | None = None,
    on_conflict: str = "ignore",
    batch_size: int = 1000,
    progress_callback: Callable[[str, int, int], None] | None = None,
) -> dict[str, int]

Load data into specified tables with FK ordering.

Tables are automatically ordered by FK dependencies to ensure referenced tables are populated first.

Parameters:

- tables (list[str | DerivaTable] | None): Tables to load. If None, loads all tables that have data in the source. Default: None
- on_conflict (str): How to handle duplicate keys: "ignore" skips rows with duplicate keys, "replace" replaces existing rows, "error" raises an error on duplicates. Default: 'ignore'
- batch_size (int): Number of rows per insert batch. Default: 1000
- progress_callback (Callable[[str, int, int], None] | None): Optional callback(table_name, rows_loaded, total_tables) called after each table is loaded. Default: None

Returns:

- dict[str, int]: Dict mapping table names to row counts loaded.

Source code in src/deriva_ml/model/data_loader.py
def load_tables(
    self,
    tables: list[str | DerivaTable] | None = None,
    on_conflict: str = "ignore",
    batch_size: int = 1000,
    progress_callback: Callable[[str, int, int], None] | None = None,
) -> dict[str, int]:
    """Load data into specified tables with FK ordering.

    Tables are automatically ordered by FK dependencies to ensure
    referenced tables are populated first.

    Args:
        tables: Tables to load. If None, loads all tables that have
            data in the source.
        on_conflict: How to handle duplicate keys:
            - "ignore": Skip rows with duplicate keys (default)
            - "replace": Replace existing rows
            - "error": Raise error on duplicates
        batch_size: Number of rows per insert batch.
        progress_callback: Optional callback(table_name, rows_loaded, total_tables)
            called after each table is loaded.

    Returns:
        Dict mapping table names to row counts loaded.
    """
    # Determine tables to load
    if tables is None:
        # Get all tables that have data in source
        available = set(self.source.list_available_tables())
        # Filter to tables that exist in ORM
        orm_tables = set(self.orm.list_tables())

        # Match available tables to ORM tables
        tables_to_load = []
        for orm_table in orm_tables:
            # Check both qualified and unqualified names
            table_name = orm_table.split(".")[-1]
            if orm_table in available or table_name in available:
                tables_to_load.append(orm_table)
    else:
        tables_to_load = [
            t if isinstance(t, str) else f"{t.schema.name}.{t.name}"
            for t in tables
        ]

    # Compute insertion order
    try:
        ordered_tables = self.orderer.get_insertion_order(tables_to_load)
    except ValueError as e:
        # Some tables might not be in the model, just use original order
        logger.warning(f"Could not compute FK ordering: {e}")
        ordered_tables = [
            self.orderer._to_table(t) if isinstance(t, str) else t
            for t in tables_to_load
            if self._table_exists(t)
        ]

    # Load in order
    counts = {}
    total_tables = len(ordered_tables)

    for i, table in enumerate(ordered_tables):
        table_key = f"{table.schema.name}.{table.name}"

        count = self._load_table(table, on_conflict, batch_size)
        counts[table_key] = count

        if progress_callback:
            progress_callback(table_key, count, total_tables)

        if count > 0:
            logger.info(f"Loaded {count} rows into {table_key}")

    return counts
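
The three on_conflict strategies map directly onto SQLite's conflict-resolution clauses. A self-contained sketch using the stdlib sqlite3 module (the item table and its rows are hypothetical, not part of DerivaML):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE item (RID TEXT PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO item VALUES ('1-A', 'original')")

# "ignore": skip rows whose key already exists; the original row survives
conn.execute("INSERT OR IGNORE INTO item VALUES ('1-A', 'dup')")

# "replace": overwrite the existing row with the new values
conn.execute("INSERT OR REPLACE INTO item VALUES ('1-A', 'replaced')")

# "error": a plain INSERT raises sqlite3.IntegrityError on a duplicate key
try:
    conn.execute("INSERT INTO item VALUES ('1-A', 'boom')")
except sqlite3.IntegrityError:
    pass

print(conn.execute("SELECT name FROM item WHERE RID = '1-A'").fetchone())
# ('replaced',)
```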

validate_load_order

validate_load_order(tables: list[str | DerivaTable]) -> list[tuple[str, str, str]]

Validate that tables can be loaded in the given order.

Parameters:

- tables (list[str | DerivaTable]): Ordered list of tables. Required.

Returns:

- list[tuple[str, str, str]]: List of FK violations as (table, missing_dep, fk_name) tuples. Empty if order is valid.

Source code in src/deriva_ml/model/data_loader.py
def validate_load_order(
    self,
    tables: list[str | DerivaTable],
) -> list[tuple[str, str, str]]:
    """Validate that tables can be loaded in the given order.

    Args:
        tables: Ordered list of tables.

    Returns:
        List of FK violations as (table, missing_dep, fk_name) tuples.
        Empty if order is valid.
    """
    return self.orderer.validate_insertion_order(tables)

DataSource

Bases: Protocol

Protocol for data sources that can fill a database.

Implementations provide data for populating SQLite tables from different sources (bags, remote catalogs, etc.).

This is used with DataLoader in Phase 2 of the two-phase pattern.

Source code in src/deriva_ml/model/data_sources.py
@runtime_checkable
class DataSource(Protocol):
    """Protocol for data sources that can fill a database.

    Implementations provide data for populating SQLite tables from
    different sources (bags, remote catalogs, etc.).

    This is used with DataLoader in Phase 2 of the two-phase pattern.
    """

    def get_table_data(
        self,
        table: DerivaTable | str,
    ) -> Iterator[dict[str, Any]]:
        """Yield rows for a table as dictionaries.

        Args:
            table: Table object or name to get data for.

        Yields:
            Dictionary per row with column names as keys.
        """
        ...

    def has_table(self, table: DerivaTable | str) -> bool:
        """Check if this source has data for the table.

        Args:
            table: Table object or name to check.

        Returns:
            True if data is available for this table.
        """
        ...

    def list_available_tables(self) -> list[str]:
        """List tables with available data.

        Returns:
            List of table names (may include schema prefix).
        """
        ...

get_table_data

get_table_data(table: DerivaTable | str) -> Iterator[dict[str, Any]]

Yield rows for a table as dictionaries.

Parameters:

- table (DerivaTable | str): Table object or name to get data for. Required.

Yields:

- dict[str, Any]: Dictionary per row with column names as keys.

Source code in src/deriva_ml/model/data_sources.py
def get_table_data(
    self,
    table: DerivaTable | str,
) -> Iterator[dict[str, Any]]:
    """Yield rows for a table as dictionaries.

    Args:
        table: Table object or name to get data for.

    Yields:
        Dictionary per row with column names as keys.
    """
    ...

has_table

has_table(table: DerivaTable | str) -> bool

Check if this source has data for the table.

Parameters:

- table (DerivaTable | str): Table object or name to check. Required.

Returns:

- bool: True if data is available for this table.

Source code in src/deriva_ml/model/data_sources.py
def has_table(self, table: DerivaTable | str) -> bool:
    """Check if this source has data for the table.

    Args:
        table: Table object or name to check.

    Returns:
        True if data is available for this table.
    """
    ...

list_available_tables

list_available_tables() -> list[str]

List tables with available data.

Returns:

- list[str]: List of table names (may include schema prefix).

Source code in src/deriva_ml/model/data_sources.py
def list_available_tables(self) -> list[str]:
    """List tables with available data.

    Returns:
        List of table names (may include schema prefix).
    """
    ...
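
Because DataSource is declared with @runtime_checkable, any object providing these three methods qualifies; no inheritance is needed. A minimal in-memory implementation as a sketch (the InMemorySource class and sample rows are hypothetical, and the table parameter is simplified to str here):

```python
from typing import Any, Iterator, Protocol, runtime_checkable


@runtime_checkable
class DataSource(Protocol):
    """Structural type mirroring the protocol documented above."""
    def get_table_data(self, table: str) -> Iterator[dict[str, Any]]: ...
    def has_table(self, table: str) -> bool: ...
    def list_available_tables(self) -> list[str]: ...


class InMemorySource:
    """Serves rows from a plain dict; handy for tests."""
    def __init__(self, data: dict[str, list[dict[str, Any]]]):
        self._data = data

    def get_table_data(self, table: str) -> Iterator[dict[str, Any]]:
        yield from self._data.get(table, [])

    def has_table(self, table: str) -> bool:
        return table in self._data

    def list_available_tables(self) -> list[str]:
        return list(self._data)


source = InMemorySource({"Subject": [{"RID": "1-A", "Name": "s1"}]})
assert isinstance(source, DataSource)  # structural check, no subclassing
print(list(source.get_table_data("Subject")))
# [{'RID': '1-A', 'Name': 's1'}]
```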

DerivaModel

Augmented interface to deriva model class.

This class provides a number of DerivaML specific methods that augment the interface in the deriva model class.

Attributes:

- model: ERMRest model for the catalog.
- catalog (ErmrestCatalog): ERMRest catalog for the model.
- hostname: Hostname of the ERMRest server.
- ml_schema: The ML schema name for the catalog.
- domain_schemas: Frozenset of all domain schema names in the catalog.
- default_schema: The default schema for table creation operations.

Source code in src/deriva_ml/model/catalog.py
class DerivaModel:
    """Augmented interface to deriva model class.

    This class provides a number of DerivaML specific methods that augment the interface in the deriva model class.

    Attributes:
        model: ERMRest model for the catalog.
        catalog: ERMRest catalog for the model.
        hostname: Hostname of the ERMRest server.
        ml_schema: The ML schema name for the catalog.
        domain_schemas: Frozenset of all domain schema names in the catalog.
        default_schema: The default schema for table creation operations.

    """

    def __init__(
        self,
        model: Model,
        ml_schema: str = ML_SCHEMA,
        domain_schemas: str | set[str] | None = None,
        default_schema: str | None = None,
    ):
        """Create and initialize a DerivaModel instance.

        This method wraps an existing ERMrest model and initializes the schema configuration.
        This class is intended to be used as a base class on which domain-specific interfaces are built.

        Args:
            model: The ERMRest model for the catalog.
            ml_schema: The ML schema name.
            domain_schemas: Optional explicit set of domain schema names. If None,
                auto-detects all non-system schemas.
            default_schema: The default schema for table creation operations. If None
                and there is exactly one domain schema, that schema is used as default.
                If there are multiple domain schemas, default_schema must be specified.
        """
        self.model = model
        self.configuration = None
        self.catalog: ErmrestCatalog = self.model.catalog
        self.hostname = self.catalog.deriva_server.server if isinstance(self.catalog, ErmrestCatalog) else "localhost"

        self.ml_schema = ml_schema
        self._system_schemas = frozenset(SYSTEM_SCHEMAS | {ml_schema})

        # Determine domain schemas
        if domain_schemas is not None:
            if isinstance(domain_schemas, str):
                domain_schemas = {domain_schemas}
            self.domain_schemas = frozenset(domain_schemas)
        else:
            # Auto-detect all domain schemas
            self.domain_schemas = get_domain_schemas(self.model.schemas.keys(), ml_schema)

        # Determine default schema for table creation
        if default_schema is not None:
            if default_schema not in self.domain_schemas:
                raise DerivaMLException(
                    f"default_schema '{default_schema}' is not in domain_schemas: {self.domain_schemas}"
                )
            self.default_schema = default_schema
        elif len(self.domain_schemas) == 1:
            # Single domain schema - use it as default
            self.default_schema = next(iter(self.domain_schemas))
        elif len(self.domain_schemas) == 0:
            # No domain schemas - default_schema will be None
            self.default_schema = None
        else:
            # Multiple domain schemas, no explicit default
            self.default_schema = None

    def is_system_schema(self, schema_name: str) -> bool:
        """Check if a schema is a system or ML schema.

        Args:
            schema_name: Name of the schema to check.

        Returns:
            True if the schema is a system or ML schema.
        """
        return is_system_schema(schema_name, self.ml_schema)

    def is_domain_schema(self, schema_name: str) -> bool:
        """Check if a schema is a domain schema.

        Args:
            schema_name: Name of the schema to check.

        Returns:
            True if the schema is a domain schema.
        """
        return schema_name in self.domain_schemas

    def _require_default_schema(self) -> str:
        """Get default schema, raising an error if not set.

        Returns:
            The default schema name.

        Raises:
            DerivaMLException: If default_schema is not set.
        """
        if self.default_schema is None:
            raise DerivaMLException(
                f"No default_schema set. With multiple domain schemas {self.domain_schemas}, "
                "you must either specify a default_schema when creating DerivaML or "
                "pass an explicit schema parameter to this method."
            )
        return self.default_schema

    def refresh_model(self) -> None:
        self.model = self.catalog.getCatalogModel()

    @property
    def chaise_config(self) -> dict[str, Any]:
        """Return the chaise configuration."""
        return self.model.chaise_config

    def get_schema_description(self, include_system_columns: bool = False) -> dict[str, Any]:
        """Return a JSON description of the catalog schema structure.

        Provides a structured representation of the domain and ML schemas including
        tables, columns, foreign keys, and relationships. Useful for understanding
        the data model structure programmatically.

        Args:
            include_system_columns: If True, include RID, RCT, RMT, RCB, RMB columns.
                Default False to reduce output size.

        Returns:
            Dictionary with schema structure:
            {
                "domain_schemas": ["schema_name1", "schema_name2"],
                "default_schema": "schema_name1",
                "ml_schema": "deriva-ml",
                "schemas": {
                    "schema_name": {
                        "tables": {
                            "TableName": {
                                "comment": "description",
                                "is_vocabulary": bool,
                                "is_asset": bool,
                                "is_association": bool,
                                "columns": [...],
                                "foreign_keys": [...],
                                "features": [...]
                            }
                        }
                    }
                }
            }
        """
        system_columns = {"RID", "RCT", "RMT", "RCB", "RMB"}
        result = {
            "domain_schemas": sorted(self.domain_schemas),
            "default_schema": self.default_schema,
            "ml_schema": self.ml_schema,
            "schemas": {},
        }

        # Include all domain schemas and the ML schema
        for schema_name in [*self.domain_schemas, self.ml_schema]:
            schema = self.model.schemas.get(schema_name)
            if not schema:
                continue

            schema_info = {"tables": {}}

            for table_name, table in schema.tables.items():
                # Get columns
                columns = []
                for col in table.columns:
                    if not include_system_columns and col.name in system_columns:
                        continue
                    columns.append({
                        "name": col.name,
                        "type": str(col.type.typename),
                        "nullok": col.nullok,
                        "comment": col.comment or "",
                    })

                # Get foreign keys
                foreign_keys = []
                for fk in table.foreign_keys:
                    fk_cols = [c.name for c in fk.foreign_key_columns]
                    ref_cols = [c.name for c in fk.referenced_columns]
                    foreign_keys.append({
                        "columns": fk_cols,
                        "referenced_table": f"{fk.pk_table.schema.name}.{fk.pk_table.name}",
                        "referenced_columns": ref_cols,
                    })

                # Get features if this is a domain table
                features = []
                if self.is_domain_schema(schema_name):
                    try:
                        for f in self.find_features(table):
                            features.append({
                                "name": f.feature_name,
                                "feature_table": f.feature_table.name,
                            })
                    except Exception as e:
                        logger.debug(f"Could not enumerate features for table {table.name}: {e}")

                table_info = {
                    "comment": table.comment or "",
                    "is_vocabulary": self.is_vocabulary(table),
                    "is_asset": self.is_asset(table),
                    "is_association": bool(self.is_association(table)),
                    "columns": columns,
                    "foreign_keys": foreign_keys,
                }
                if features:
                    table_info["features"] = features

                schema_info["tables"][table_name] = table_info

            result["schemas"][schema_name] = schema_info

        return result

    def __getattr__(self, name: str) -> Any:
        # Called only if `name` is not found in Manager.  Delegate attributes to model class.
        return getattr(self.model, name)

    def name_to_table(self, table: TableInput) -> Table:
        """Return the table object corresponding to the given table name.

        Searches domain schemas first (in sorted order), then ML schema, then WWW.
        If the table name appears in more than one schema, returns the first match.

        Args:
          table: An ERMrest table object or a string that is the name of the table.

        Returns:
          Table object.

        Raises:
          DerivaMLException: If the table doesn't exist in any searchable schema.
        """
        if isinstance(table, Table):
            return table

        # Search domain schemas (sorted for deterministic order), then ML schema, then WWW
        search_order = [*sorted(self.domain_schemas), self.ml_schema, "WWW"]
        for sname in search_order:
            if sname not in self.model.schemas:
                continue
            s = self.model.schemas[sname]
            if table in s.tables:
                return s.tables[table]
        raise DerivaMLException(f"The table {table} doesn't exist.")
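The search order can be illustrated with plain dicts standing in for ERMrest schemas. The schema and table names below are made up for the example:

```python
# Sketch of name_to_table's schema search order: domain schemas (sorted)
# first, then the ML schema, then WWW. Names here are illustrative only.
schemas = {
    "clinical": {"Image": "clinical.Image"},
    "deriva-ml": {"Dataset": "deriva-ml.Dataset", "Image": "deriva-ml.Image"},
    "WWW": {"Page": "WWW.Page"},
}
domain_schemas = {"clinical"}

def name_to_table(name: str) -> str:
    for sname in [*sorted(domain_schemas), "deriva-ml", "WWW"]:
        tables = schemas.get(sname, {})
        if name in tables:
            return tables[name]
    raise KeyError(f"The table {name} doesn't exist.")

print(name_to_table("Image"))    # domain schema match wins over deriva-ml
print(name_to_table("Dataset"))  # falls through to the ML schema
```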

    def is_vocabulary(self, table_name: TableInput) -> bool:
        """Check if a given table is a controlled vocabulary table.

        Args:
          table_name: A ERMRest table object or the name of the table.

        Returns:
          Table object if the table is a controlled vocabulary, False otherwise.

        Raises:
          DerivaMLException: if the table doesn't exist.

        """
        vocab_columns = {"NAME", "URI", "SYNONYMS", "DESCRIPTION", "ID"}
        table = self.name_to_table(table_name)
        return vocab_columns.issubset({c.name.upper() for c in table.columns})
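The vocabulary test is a case-insensitive column-name subset check, which is what lets both TitleCase and lowercase catalogs qualify. A minimal sketch, with column lists standing in for ERMrest `Table.columns`:

```python
# Case-insensitive vocabulary detection, as in is_vocabulary above.
VOCAB_COLUMNS = {"NAME", "URI", "SYNONYMS", "DESCRIPTION", "ID"}

def is_vocabulary(column_names: list[str]) -> bool:
    return VOCAB_COLUMNS.issubset({c.upper() for c in column_names})

# TitleCase (DerivaML-native) and lowercase (FaceBase-style) both qualify:
print(is_vocabulary(["RID", "Name", "ID", "URI", "Description", "Synonyms"]))  # True
print(is_vocabulary(["rid", "name", "id", "uri", "description", "synonyms"]))  # True
print(is_vocabulary(["RID", "Name", "URL"]))                                   # False
```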

    def vocab_columns(self, table_name: TableInput) -> dict[str, str]:
        """Return mapping from canonical vocab column name to actual column name.

        Canonical names are TitleCase (Name, ID, URI, Description, Synonyms).
        Actual names reflect the table's schema — could be lowercase for
        FaceBase-style catalogs or TitleCase for DerivaML-native tables.

        Args:
            table_name: A table object or the name of the table.

        Returns:
            Dict mapping canonical name to actual column name in the table.
            E.g. ``{"Name": "name", "ID": "id", ...}`` for FaceBase tables
            or ``{"Name": "Name", "ID": "ID", ...}`` for DerivaML tables.
        """
        table = self.name_to_table(table_name)
        col_map = {c.name.upper(): c.name for c in table.columns}
        return {canon: col_map[canon.upper()] for canon in ("Name", "ID", "URI", "Description", "Synonyms")}
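The canonical-to-actual mapping works by upper-casing every actual column name once and then looking up each canonical name. A sketch with an illustrative lowercase table:

```python
# Sketch of vocab_columns: map canonical TitleCase names to a table's actual
# column names via an upper-cased lookup table. Column names are illustrative.
def vocab_columns(column_names: list[str]) -> dict[str, str]:
    col_map = {c.upper(): c for c in column_names}
    return {canon: col_map[canon.upper()] for canon in ("Name", "ID", "URI", "Description", "Synonyms")}

print(vocab_columns(["rid", "name", "id", "uri", "description", "synonyms"]))
# {'Name': 'name', 'ID': 'id', 'URI': 'uri', 'Description': 'description', 'Synonyms': 'synonyms'}
```

Like the real method, this raises `KeyError` if a canonical column is missing, so callers should check `is_vocabulary` first.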

    def is_association(
        self,
        table_name: str | Table,
        unqualified: bool = True,
        pure: bool = True,
        min_arity: int = 2,
        max_arity: int = 2,
    ) -> bool | set[str] | int:
        """Check the specified table to see if it is an association table.

        Args:
            table_name: param unqualified:
            pure: return: (Default value = True)
            table_name: str | Table:
            unqualified:  (Default value = True)

        Returns:


        """
        table = self.name_to_table(table_name)
        return table.is_association(unqualified=unqualified, pure=pure, min_arity=min_arity, max_arity=max_arity)
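The intuition behind a "pure" binary association can be shown with plain sets: exactly two domain foreign keys and no user columns beyond the FK and system columns. This mirrors the heuristic used later in `_build_join_tree`; the table shapes are illustrative dicts, not ERMrest objects:

```python
# Minimal "pure association" test: two domain FKs, no extra user columns.
SYSTEM_COLS = {"RID", "RCT", "RMT", "RCB", "RMB"}

def is_pure_association(columns: set[str], fk_columns: set[str], n_domain_fks: int) -> bool:
    user_cols = columns - SYSTEM_COLS - fk_columns
    return n_domain_fks == 2 and not user_cols

# A Dataset_Image-style link table: system columns plus two FK columns.
print(is_pure_association({"RID", "RCT", "RMT", "RCB", "RMB", "Dataset", "Image"},
                          {"Dataset", "Image"}, 2))  # True
# An extra payload column makes it impure:
print(is_pure_association({"RID", "Dataset", "Image", "Notes"}, {"Dataset", "Image"}, 2))  # False
```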

    def find_association(self, table1: Table | str, table2: Table | str) -> tuple[Table, str, str]:
        """Given two tables, return the association table that connects them and the two linking column names.

        Args:
            table1: A table object or the name of the first table.
            table2: A table object or the name of the second table.

        Returns:
            A tuple of the association table, the name of its column linking to
            ``table1``, and the name of its column linking to ``table2``.

        Raises:
            DerivaMLException: If there is no association table between the two
                tables, or if there is more than one.
        """
        table1 = self.name_to_table(table1)
        table2 = self.name_to_table(table2)

        tables = [
            (a.table, a.self_fkey.columns[0].name, other_key.columns[0].name)
            for a in table1.find_associations(pure=False)
            if len(a.other_fkeys) == 1 and (other_key := a.other_fkeys.pop()).pk_table == table2
        ]

        if len(tables) == 1:
            return tables[0]
        elif len(tables) == 0:
            raise DerivaMLException(f"No association tables found between {table1.name} and {table2.name}.")
        else:
            raise DerivaMLException(
                f"There are {len(tables)} association tables between {table1.name} and {table2.name}."
            )

    def is_asset(self, table_name: TableInput) -> bool:
        """True if the specified table is an asset table.

        Args:
            table_name: A table object or the name of the table.

        Returns:
            True if the specified table is an asset table, False otherwise.

        """
        asset_columns = {"Filename", "URL", "Length", "MD5", "Description"}
        table = self.name_to_table(table_name)
        return asset_columns.issubset({c.name for c in table.columns})

    def find_assets(self, with_metadata: bool = False) -> list[Table]:
        """Return the list of asset tables in the current model"""
        return [t for s in self.model.schemas.values() for t in s.tables.values() if self.is_asset(t)]

    def find_vocabularies(self) -> list[Table]:
        """Return a list of all controlled vocabulary tables in domain and ML schemas."""
        tables = []
        for schema_name in [*self.domain_schemas, self.ml_schema]:
            schema = self.model.schemas.get(schema_name)
            if schema:
                tables.extend(t for t in schema.tables.values() if self.is_vocabulary(t))
        return tables

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def find_features(self, table: TableInput | None = None) -> Iterable[Feature]:
        """List features in the catalog.

        If a table is specified, returns only features for that table.
        If no table is specified, returns all features across all tables in the catalog.

        Args:
            table: Optional table to find features for. If None, returns all features
                in the catalog.

        Returns:
            An iterable of Feature instances describing the features.
        """

        def is_feature(a: FindAssociationResult) -> bool:
            """Check if association represents a feature.

            Args:
                a: Association result to check
            Returns:
                bool: True if association represents a feature
            """
            return {
                "Feature_Name",
                "Execution",
                a.self_fkey.foreign_key_columns[0].name,
            }.issubset({c.name for c in a.table.columns})

        def find_table_features(t: Table) -> list[Feature]:
            """Find all features for a single table."""
            return [
                Feature(a, self) for a in t.find_associations(min_arity=3, max_arity=3, pure=False) if is_feature(a)
            ]

        if table is not None:
            # Find features for a specific table
            return find_table_features(self.name_to_table(table))
        else:
            # Find all features across all domain and ML schema tables
            features: list[Feature] = []
            for schema_name in [*self.domain_schemas, self.ml_schema]:
                schema = self.model.schemas.get(schema_name)
                if schema:
                    for t in schema.tables.values():
                        features.extend(find_table_features(t))
            return features

    def lookup_feature(self, table: TableInput, feature_name: str) -> Feature:
        """Lookup the named feature associated with the provided table.

        Args:
            table: A table object or the name of the table.
            feature_name: The name of the feature to look up.

        Returns:
            A Feature class that represents the requested feature.

        Raises:
          DerivaMLException: If the feature cannot be found.
        """
        table = self.name_to_table(table)
        try:
            return [f for f in self.find_features(table) if f.feature_name == feature_name][0]
        except IndexError:
            raise DerivaMLException(f"Feature {table.name}:{feature_name} doesn't exist.")

    def asset_metadata(self, table: str | Table) -> set[str]:
        """Return the metadata columns for an asset table."""

        table = self.name_to_table(table)

        if not self.is_asset(table):
            raise DerivaMLTableTypeError("asset table", table.name)
        return {c.name for c in table.columns} - DerivaAssetColumns
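`is_asset` and `asset_metadata` combine into one pattern: require the five standard asset columns, then treat whatever remains after removing asset and system columns as metadata. A sketch with sets; `DERIVA_ASSET_COLUMNS` below is a stand-in for the real `DerivaAssetColumns` constant:

```python
# Combined sketch of is_asset + asset_metadata. The column sets are assumed
# stand-ins; DerivaAssetColumns in DerivaML may include additional names.
ASSET_COLUMNS = {"Filename", "URL", "Length", "MD5", "Description"}
DERIVA_ASSET_COLUMNS = ASSET_COLUMNS | {"RID", "RCT", "RMT", "RCB", "RMB"}

def asset_metadata(column_names: set[str]) -> set[str]:
    if not ASSET_COLUMNS.issubset(column_names):
        raise ValueError("not an asset table")
    return column_names - DERIVA_ASSET_COLUMNS

cols = {"RID", "Filename", "URL", "Length", "MD5", "Description", "Modality"}
print(asset_metadata(cols))  # {'Modality'}
```

Note that unlike the vocabulary check, the asset check is case-sensitive on column names.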

    def apply(self) -> None:
        """Call ERMRestModel.apply"""
        if self.catalog == "file-system":
            raise DerivaMLException("Cannot apply() to non-catalog model.")
        else:
            self.model.apply()

    def is_dataset_rid(self, rid: RID, deleted: bool = False) -> bool:
        """Check if a given RID is a dataset RID."""
        try:
            rid_info = self.model.catalog.resolve_rid(rid, self.model)
        except KeyError as e:
            raise DerivaMLException(f"Invalid RID {rid}") from e
        if rid_info.table.name != "Dataset":
            return False
        elif deleted:
            # Caller accepts deleted datasets, so any Dataset RID qualifies.
            return True
        else:
            return not list(rid_info.datapath.entities().fetch())[0]["Deleted"]

    def list_dataset_element_types(self) -> list[Table]:
        """
        Lists the tables whose entries can be elements of a dataset.

        This method analyzes the Dataset association tables and identifies the
        element tables that a dataset can contain. It is useful for understanding
        the structure and content of a dataset and allows for better manipulation
        and usage of its data.

        Returns:
            list[Table]: A list of tables whose entries can be members of a
            dataset.

        """

        dataset_table = self.name_to_table("Dataset")

        def is_domain_or_dataset_table(table: Table) -> bool:
            return self.is_domain_schema(table.schema.name) or table.name == dataset_table.name

        return [t for a in dataset_table.find_associations() if is_domain_or_dataset_table(t := a.other_fkeys.pop().pk_table)]

    def _build_join_tree(
        self,
        element_name: str,
        include_tables: set[str],
        all_paths: list[list[Table]],
    ) -> JoinNode:
        """Build a JoinTree rooted at *element_name* that reaches all *include_tables*.

        The algorithm:

        1. Collect all FK paths from `_schema_to_paths()` that start at the element
           table and end at a table in *include_tables*.
        2. For each target table, pick the SHORTEST sub-path from the element.
           If a longer path exists but ALL its intermediates are in *include_tables*,
           prefer it (user disambiguated).  If multiple equally-short paths exist
           and cannot be disambiguated, raise an ambiguity error.
        3. Merge the selected paths into a tree rooted at the element.
        4. Mark association tables (``is_association=True``) so their columns are
           excluded from output but they are still JOINed through.
        5. Set ``join_type="left"`` when the FK column is nullable.

        Args:
            element_name: The dataset element table (tree root), e.g. ``"Image"``.
            include_tables: Set of table names the user wants in the output.
            all_paths: All FK paths from ``_schema_to_paths()``.

        Returns:
            A ``JoinNode`` tree rooted at the element table.

        Raises:
            DerivaMLException: If ambiguous paths cannot be resolved.
        """
        element_table = self.name_to_table(element_name)

        # ── Step 1: collect sub-paths from element to each include_table ─────
        # Each "all_path" has the structure [Dataset, assoc, element, ..., endpoint].
        # We extract the sub-path starting from the element: [element, ..., endpoint].
        subpaths_by_target: dict[str, list[list[Table]]] = defaultdict(list)

        for path in all_paths:
            if len(path) < 3:
                continue
            if path[2].name != element_name:
                continue
            endpoint = path[-1].name
            if endpoint not in include_tables:
                continue
            # Sub-path from element onward
            sub = path[2:]  # [element, ..., endpoint]
            subpaths_by_target[endpoint].append(sub)

        # The element itself (self-path of length 1)
        if element_name in include_tables:
            subpaths_by_target.setdefault(element_name, []).append([element_table])

        # ── Step 2: for each target, pick the best path ──────────────────────
        selected_subpaths: dict[str, list[Table]] = {}

        for target, subpaths in subpaths_by_target.items():
            if target == element_name:
                # Self-path: no join needed
                selected_subpaths[target] = [element_table]
                continue

            # Deduplicate by table-name signature
            seen_sigs: set[tuple[str, ...]] = set()
            unique: list[list[Table]] = []
            for sp in subpaths:
                sig = tuple(t.name for t in sp)
                if sig not in seen_sigs:
                    seen_sigs.add(sig)
                    unique.append(sp)

            if len(unique) == 1:
                selected_subpaths[target] = unique[0]
                continue

            # Multiple paths — disambiguate.
            # Intermediates are tables between element (sp[0]) and endpoint (sp[-1]).
            path_intermediates = [tuple(t.name for t in sp[1:-1]) for sp in unique]

            # If all have identical intermediates, no ambiguity
            if len(set(path_intermediates)) <= 1:
                selected_subpaths[target] = unique[0]
                continue

            # A path is "selected" if all its non-association intermediates are
            # in include_tables.  Association tables (M:N link tables) are
            # infrastructure that the user shouldn't need to name explicitly —
            # they are transparently included in the join chain.
            #
            # We detect association tables by checking if the Table object has
            # exactly 2 FKs (the definition of a pure association table).
            # This works regardless of model context (bag or catalog).
            def _is_likely_association(tbl: Table) -> bool:
                """Check if table is an association table (M:N link table).

                An association table has only system columns (RID, RCT, RMT,
                RCB, RMB) plus FK columns to the tables it connects.  ERMrest's
                built-in is_association() counts system FKs (RCB/RMB → ERMrest_Client),
                so we use our own check that ignores them.
                """
                system_cols = {'RID', 'RCT', 'RMT', 'RCB', 'RMB'}
                try:
                    cols = {c.name for c in tbl.columns}
                    fks = list(tbl.foreign_keys)
                    # Domain FKs: those NOT to system tables like ERMrest_Client
                    domain_fks = [
                        fk for fk in fks
                        if fk.pk_table.name not in ('ERMrest_Client', 'ERMrest_Group')
                    ]
                    # FK column names
                    fk_col_names = set()
                    for fk in domain_fks:
                        for col in fk.columns:
                            fk_col_names.add(col.name if hasattr(col, 'name') else str(col))
                    # Non-system, non-FK columns
                    user_cols = cols - system_cols - fk_col_names
                    # Association = exactly 2 domain FKs and no other user columns
                    return len(domain_fks) == 2 and len(user_cols) == 0
                except Exception:
                    return False

            def _intermediates_covered(sp: list[Table], ints: tuple[str, ...]) -> bool:
                sp_tables = {t.name: t for t in sp}
                for t in ints:
                    if t in include_tables:
                        continue
                    tbl = sp_tables.get(t)
                    if tbl is not None and _is_likely_association(tbl):
                        continue  # transparent — doesn't need to be in include_tables
                    return False
                return True

            fully_covered = [
                (sp, ints)
                for sp, ints in zip(unique, path_intermediates)
                if _intermediates_covered(sp, ints)
            ]

            if len(fully_covered) == 1:
                sp, ints = fully_covered[0]
                # Either the user explicitly named the intermediates, or this is
                # a direct path (the shortest); in both cases it is the one to use.
                selected_subpaths[target] = sp
                continue

            if len(fully_covered) > 1:
                # Multiple fully-covered paths
                has_explicit = [(sp, ints) for sp, ints in fully_covered if len(ints) > 0]
                if len(has_explicit) == 1:
                    selected_subpaths[target] = has_explicit[0][0]
                    continue
                elif len(has_explicit) == 0:
                    # All direct paths — pick shortest
                    shortest = min(fully_covered, key=lambda x: len(x[0]))
                    selected_subpaths[target] = shortest[0]
                    continue
                else:
                    # Multiple explicit — prefer longest (most specific)
                    max_ints = max(len(ints) for _, ints in has_explicit)
                    longest = [sp for sp, ints in has_explicit if len(ints) == max_ints]
                    if len(longest) == 1:
                        selected_subpaths[target] = longest[0]
                        continue

            if len(fully_covered) == 0:
                # No path is fully covered.  Check if direct path exists.
                direct = [sp for sp, ints in zip(unique, path_intermediates) if len(ints) == 0]
                if len(direct) == 1:
                    selected_subpaths[target] = direct[0]
                    continue

            # Ambiguity error
            path_descriptions = []
            all_ints: set[str] = set()
            for sp, ints in zip(unique, path_intermediates):
                names = [t.name for t in sp]
                path_descriptions.append(" → ".join(names))
                all_ints.update(ints)

            suggestion_tables = all_ints - include_tables
            suggestion = ""
            if suggestion_tables:
                suggestion = (
                    f"\nInclude an intermediate table to disambiguate "
                    f"(e.g., add {', '.join(sorted(suggestion_tables))} to include_tables)."
                )

            raise DerivaMLException(
                f"Ambiguous path between {element_name} and {target}: "
                f"found {len(unique)} FK paths:\n"
                + "\n".join(f"  {d}" for d in path_descriptions)
                + suggestion
            )

        # ── Step 3: merge selected paths into a tree ─────────────────────────
        # Build the tree by inserting each selected sub-path into the tree.
        root = JoinNode(
            table=element_table,
            table_name=element_name,
            join_type="inner",
            fk_columns=None,
            is_association=bool(self.is_association(element_name)),
            children=[],
        )

        # Map table_name -> JoinNode for quick lookup during tree building
        node_map: dict[str, JoinNode] = {element_name: root}

        for target, subpath in selected_subpaths.items():
            if target == element_name:
                continue
            # subpath = [element, ..intermediate.., target]
            # Walk the subpath, creating nodes as needed
            for i in range(1, len(subpath)):
                child_table = subpath[i]
                child_name = child_table.name
                parent_table = subpath[i - 1]
                parent_name = parent_table.name

                if child_name in node_map:
                    continue  # Already in tree

                # Get FK column pairs
                col_pairs = self._table_relationship(parent_table, child_table)

                # Determine join type: LEFT for nullable FK columns
                join_type = "inner"
                for fk_col, pk_col in col_pairs:
                    if fk_col.nullok:
                        join_type = "left"
                        break

                node = JoinNode(
                    table=child_table,
                    table_name=child_name,
                    join_type=join_type,
                    fk_columns=col_pairs,
                    is_association=bool(self.is_association(child_name)),
                    children=[],
                )
                node_map[child_name] = node
                # Attach to parent
                if parent_name in node_map:
                    node_map[parent_name].children.append(node)
                else:
                    # Parent not yet in tree — this shouldn't happen since we
                    # process paths from element outward, but handle gracefully
                    logger.warning(
                        f"Parent {parent_name} not in tree when adding {child_name}"
                    )

        return root
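Step 3 above (merging the selected sub-paths into a tree keyed by table name) can be sketched independently of ERMrest. `JoinNode` here is a simplified stand-in for the real dataclass, and the table names are illustrative:

```python
# Sketch of the path-merge in _build_join_tree: insert each sub-path into a
# tree rooted at the element, reusing nodes that are already present.
from dataclasses import dataclass, field

@dataclass
class JoinNode:
    table_name: str
    children: list["JoinNode"] = field(default_factory=list)

def merge_paths(element: str, subpaths: list[list[str]]) -> JoinNode:
    root = JoinNode(element)
    node_map = {element: root}
    for subpath in subpaths:  # each subpath = [element, ...intermediates..., target]
        for parent, child in zip(subpath, subpath[1:]):
            if child in node_map:
                continue  # already joined in via another path
            node = JoinNode(child)
            node_map[child] = node
            node_map[parent].children.append(node)
    return root

tree = merge_paths("Image", [["Image", "Subject"], ["Image", "Subject", "Study"]])
print([c.table_name for c in tree.children])              # ['Subject']
print([c.table_name for c in tree.children[0].children])  # ['Study']
```

Because `node_map` is keyed by table name, two paths that share a prefix share the corresponding JOINs instead of duplicating them.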

    def _prepare_wide_table(
        self, dataset, dataset_rid: RID, include_tables: list[str]
    ) -> tuple[dict[str, Any], list[tuple], bool]:
        """Generate a join plan for denormalizing a dataset into a wide table.

        Uses a **JoinTree** approach that preserves path-specific structure:

        1. **Path discovery** -- ``_schema_to_paths()`` discovers all FK paths
           from Dataset through the schema.
        2. **Path filtering & deduplication** -- keep only paths relevant to
           *include_tables*, dedup duplicate association table routes.
        3. **JoinTree construction** -- for each element type, build a tree
           rooted at the element.  Each node is a table to JOIN; association
           tables are in the tree (for JOIN) but excluded from output columns.
           Nullable FK columns produce LEFT JOINs.
        4. **Flatten to legacy format** -- convert the tree to the
           ``(path, join_conditions, join_types)`` tuple expected by
           ``_denormalize()`` and ``_denormalize_datapath()``.

        Args:
            dataset: A DatasetLike object (DatasetBag or Dataset).
            dataset_rid: RID of the dataset.
            include_tables: List of table names to include in the output.

        Returns:
            ``(element_tables, denormalized_columns, multi_schema)`` where:

            - **element_tables** -- ``dict[str, (path, join_conditions, join_types)]``
              keyed by element table name.
              *path* is a list of table name strings in JOIN order (pre-order walk
              of the JoinTree, starting with "Dataset").
              *join_conditions* maps ``table_name -> set[(fk_col, pk_col)]``.
              *join_types* maps ``table_name -> "inner" | "left"``.
            - **denormalized_columns** -- list of
              ``(schema_name, table_name, column_name, type_name)`` for the output.
            - **multi_schema** -- True if output spans multiple domain schemas.
        """
        include_tables_set = set(include_tables)
        for t in include_tables_set:
            _ = self.name_to_table(t)  # validate existence

        # ── Phase 1: path discovery ──────────────────────────────────────────
        all_paths = self._schema_to_paths()

        # Filter paths: must end at a table in include_tables AND
        # have at least one table in include_tables along the path.
        table_paths = [
            path
            for path in all_paths
            if path[-1].name in include_tables_set
            and include_tables_set.intersection({p.name for p in path})
        ]

        # ── Phase 1b: deduplicate association table routes ───────────────────
        # In some catalogs (e.g., eye-ai), both Image_Dataset and Dataset_Image
        # exist.  Keep only one route per (element, endpoint) via different
        # association tables (path[1]).
        deduplicated_paths: list[list[Table]] = []
        seen_element_endpoint: dict[tuple[str, str], tuple[list[Table], Table]] = {}

        def _is_standard_assoc(assoc_name: str, element_name: str) -> bool:
            """Check if assoc table matches the Dataset_{Element} naming pattern."""
            return assoc_name == f"Dataset_{element_name}"

        for path in table_paths:
            if len(path) < 3:
                deduplicated_paths.append(path)
                continue
            assoc_table = path[1]
            element = path[2]
            endpoint = path[-1]
            key = (element.name, endpoint.name)

            if key not in seen_element_endpoint:
                seen_element_endpoint[key] = (path, assoc_table)
                deduplicated_paths.append(path)
            else:
                existing_path, existing_assoc = seen_element_endpoint[key]
                if existing_assoc.name != assoc_table.name:
                    # Duplicate route via different association table.
                    # Prefer the standard Dataset_{Element} pattern over legacy.
                    if _is_standard_assoc(assoc_table.name, element.name) and not _is_standard_assoc(
                        existing_assoc.name, element.name
                    ):
                        # Replace the legacy route with the standard-pattern one.
                        deduplicated_paths = [
                            p
                            for p in deduplicated_paths
                            if not (len(p) >= 3 and (p[2].name, p[-1].name) == key)
                        ]
                        seen_element_endpoint[key] = (path, assoc_table)
                        deduplicated_paths.append(path)
                    # else: keep existing (either it's standard or both are non-standard)
                else:
                    deduplicated_paths.append(path)

        table_paths = deduplicated_paths

        # ── Phase 1c: group by element, filter to elements in include_tables ─
        paths_by_element: dict[str, list[list[Table]]] = defaultdict(list)
        for p in table_paths:
            if len(p) >= 3:
                paths_by_element[p[2].name].append(p)

        paths_by_element = {
            elem: paths
            for elem, paths in paths_by_element.items()
            if elem in include_tables_set
        }

        # ── Phase 2: build JoinTree per element ──────────────────────────────
        skip_columns = {"RCT", "RMT", "RCB", "RMB"}
        element_tables: dict[str, tuple[list[str], dict[str, set], dict[str, str]]] = {}

        for element_name, paths in paths_by_element.items():
            tree = self._build_join_tree(element_name, include_tables_set, table_paths)

            # ── Phase 3: flatten JoinTree to legacy format ───────────────────
            # Pre-order walk gives us the correct JOIN order.
            # We prepend "Dataset" and the association table that connects
            # Dataset to the element (taken from paths[0][0:3]).

            # Find the Dataset -> assoc -> element prefix from the first path
            if paths and len(paths[0]) >= 3:
                dataset_name = paths[0][0].name  # "Dataset"
                assoc_name = paths[0][1].name    # e.g. "Dataset_Image"
            else:
                dataset_name = "Dataset"
                assoc_name = None

            # Walk the tree to get the join order (element -> children)
            tree_nodes = tree.walk()

            # Build the legacy path: [Dataset, assoc, element, ...tree children...]
            path_names: list[str] = [dataset_name]
            if assoc_name:
                path_names.append(assoc_name)

            # Add tree nodes (element first, then its subtree in pre-order)
            for node in tree_nodes:
                if node.table_name not in path_names:
                    path_names.append(node.table_name)

            # Build join conditions and join types from the tree edges
            join_conditions: dict[str, set[tuple]] = {}
            join_types: dict[str, str] = {}

            # First, add the Dataset -> assoc and assoc -> element conditions
            if assoc_name:
                dataset_table = self.name_to_table(dataset_name)
                assoc_table_obj = self.name_to_table(assoc_name)
                try:
                    col_pairs = self._table_relationship(dataset_table, assoc_table_obj)
                    join_conditions[assoc_name] = set(col_pairs)
                    join_types[assoc_name] = "inner"
                except DerivaMLException:
                    pass

                try:
                    col_pairs = self._table_relationship(assoc_table_obj, tree.table)
                    join_conditions[tree.table_name] = set(col_pairs)
                    join_types[tree.table_name] = "inner"
                except DerivaMLException:
                    pass

            # Add conditions from the JoinTree edges
            for parent_node, child_node in tree.walk_edges():
                if child_node.fk_columns:
                    join_conditions[child_node.table_name] = set(child_node.fk_columns)
                    join_types[child_node.table_name] = child_node.join_type

            element_tables[element_name] = (path_names, join_conditions, join_types)

        # ── Phase 4: build denormalized column list ──────────────────────────
        denormalized_columns = []
        for table_name in include_tables_set:
            if self.is_association(table_name):
                continue
            table = self.name_to_table(table_name)
            for c in table.columns:
                if c.name not in skip_columns:
                    denormalized_columns.append(
                        (table.schema.name, table_name, c.name, c.type.typename)
                    )

        output_schemas = {s for s, _, _, _ in denormalized_columns if self.is_domain_schema(s)}
        multi_schema = len(output_schemas) > 1

        return element_tables, denormalized_columns, multi_schema

    def _table_relationship(
        self,
        table1: TableInput,
        table2: TableInput,
    ) -> list[tuple[Column, Column]]:
        """Return column pairs used to relate two tables.

        For simple FKs, returns a single-element list: [(fk_col, pk_col)].
        For composite FKs, returns multiple pairs: [(fk_col1, pk_col1), (fk_col2, pk_col2)].

        Each FK constraint counts as one relationship (even if composite),
        so ambiguity is detected when multiple separate FK constraints exist
        between the same two tables.
        """
        table1 = self.name_to_table(table1)
        table2 = self.name_to_table(table2)
        # Each FK constraint produces a list of (fk_col, pk_col) pairs
        relationships: list[list[tuple[Column, Column]]] = []
        for fk in table1.foreign_keys:
            if fk.pk_table == table2:
                pairs = list(zip(fk.foreign_key_columns, fk.referenced_columns))
                relationships.append(pairs)
        for fk in table1.referenced_by:
            if fk.table == table2:
                pairs = list(zip(fk.referenced_columns, fk.foreign_key_columns))
                relationships.append(pairs)

        if len(relationships) == 0:
            raise DerivaMLException(
                f"No FK relationship found between {table1.name} and {table2.name}. "
                f"These tables may not be directly connected. Check your include_tables list."
            )
        if len(relationships) > 1:
            path_descriptions = []
            for col_pairs in relationships:
                desc = ", ".join(
                    f"{fk_col.table.name}.{fk_col.name} → {pk_col.table.name}.{pk_col.name}"
                    for fk_col, pk_col in col_pairs
                )
                path_descriptions.append(f"  {desc}")
            raise DerivaMLException(
                f"Ambiguous linkage between {table1.name} and {table2.name}: "
                f"found {len(relationships)} FK relationships:\n"
                + "\n".join(path_descriptions)
            )
        return relationships[0]

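The pairing and ambiguity rules of `_table_relationship` can be shown with plain column-name lists instead of ERMrest objects (the table and column names here are illustrative): each FK constraint zips its foreign-key columns against its referenced columns, and a second constraint between the same pair of tables is an error, not extra pairs.

```python
# Each constraint is (fk_column_names, referenced_column_names).
def relate(constraints: list[tuple[list[str], list[str]]]) -> list[tuple[str, str]]:
    relationships = [list(zip(fk_cols, pk_cols)) for fk_cols, pk_cols in constraints]
    if not relationships:
        raise ValueError("no FK relationship between these tables")
    if len(relationships) > 1:
        raise ValueError(f"ambiguous linkage: {len(relationships)} FK constraints")
    return relationships[0]


# A single composite FK (two columns) yields two (fk_col, pk_col) pairs.
pairs = relate([(["Subject_Schema", "Subject_RID"], ["Schema", "RID"])])
print(pairs)  # [('Subject_Schema', 'Schema'), ('Subject_RID', 'RID')]
```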
    # Default tables to skip during FK path traversal.
    # These are ML schema tables that create unwanted traversal branches:
    # - Dataset_Dataset: nested dataset self-reference (handled separately)
    # - Execution: execution tracking (not useful for data traversal)
    _DEFAULT_SKIP_TABLES = frozenset({"Dataset_Dataset", "Execution"})

    def _schema_to_paths(
        self,
        root: Table | None = None,
        path: list[Table] | None = None,
        exclude_tables: set[str] | None = None,
        skip_tables: frozenset[str] | None = None,
        max_depth: int | None = None,
    ) -> list[list[Table]]:
        """Discover all FK paths through the schema graph via depth-first traversal.

        This is the shared foundation for both bag export (catalog_graph._collect_paths)
        and denormalization (_prepare_wide_table). Changes here affect both systems.

        Traversal rules:
        - Follows both outbound FKs (table.foreign_keys) and inbound FKs (table.referenced_by)
        - Only traverses tables in valid schemas (domain + ML)
        - Terminates at vocabulary tables (paths go INTO vocabs but not OUT)
        - Skips tables in exclude_tables and skip_tables
        - Detects and skips cycles (same table appearing twice in a path)
        - Prevents dataset element loopback (traversing back to Dataset via element associations)
        - When multiple FKs exist between the same two domain tables, deduplicates
          arcs to avoid redundant paths (keeps one arc per target table)

        Args:
            root: Starting table. Defaults to the Dataset table in the ML schema.
            path: Current path being built (used during recursion).
            exclude_tables: Caller-specified table names to skip. These tables and
                all paths through them are pruned from the result.
            skip_tables: Infrastructure table names to skip. Defaults to
                _DEFAULT_SKIP_TABLES (Dataset_Dataset, Execution). Override to
                customize which ML schema tables are excluded from traversal.
            max_depth: Maximum path length (number of tables). None = unlimited.
                Use to protect against pathological schemas with deep chains.

        Returns:
            List of paths, where each path is a list of Table objects starting
            from root. Every prefix of a path is also included (e.g., if
            [Dataset, A, B, C] is a path, then [Dataset], [Dataset, A], and
            [Dataset, A, B] are also in the result).
        """
        exclude_tables = exclude_tables or set()
        skip_tables = skip_tables if skip_tables is not None else self._DEFAULT_SKIP_TABLES

        root = root or self.model.schemas[self.ml_schema].tables["Dataset"]
        path = path.copy() if path else []
        parent = path[-1] if path else None  # Table we are coming from.
        path.append(root)
        paths = [path]

        # Depth limit check
        if max_depth is not None and len(path) >= max_depth:
            return paths

        def find_arcs(table: Table) -> set[Table]:
            """Return reachable tables via FK arcs, deduplicating multi-FK targets."""
            valid_schemas = self.domain_schemas | {self.ml_schema}
            arc_list = (
                [fk.pk_table for fk in table.foreign_keys]
                + [fk.table for fk in table.referenced_by]
            )
            arc_list = [t for t in arc_list if t.schema.name in valid_schemas]
            # Deduplicate: when multiple FKs point to the same target table,
            # keep only one arc. This prevents redundant path branching.
            # Downstream code (_prepare_wide_table, _table_relationship) handles
            # the specific FK selection and ambiguity detection.
            seen = set()
            deduped = []
            for t in arc_list:
                if t not in seen:
                    seen.add(t)
                    deduped.append(t)
            return set(deduped)

        def is_nested_dataset_loopback(n1: Table, n2: Table) -> bool:
            """Check if traversal would loop back to Dataset via an element association.

            Prevents: Subject -> Dataset_Subject -> Dataset (looping back to root).
            Allows: Dataset -> Dataset_Subject -> Subject (the intended direction).
            """
            dataset_table = self.model.schemas[self.ml_schema].tables["Dataset"]
            assoc_table = [a for a in dataset_table.find_associations() if a.table == n2]
            return len(assoc_table) == 1 and n1 != dataset_table

        # Vocabulary tables are terminal — traverse INTO but not OUT.
        if self.is_vocabulary(root):
            return paths

        for child in find_arcs(root):
            if child.name in skip_tables:
                continue
            if child.name in exclude_tables:
                continue
            if child == parent:
                # Don't loop back to immediate parent via referenced_by
                continue
            if is_nested_dataset_loopback(root, child):
                continue
            if child in path:
                # Cycle detected — skip to avoid infinite recursion.
                logger.warning(
                    f"Cycle in schema path: {child.name} "
                    f"path:{[p.name for p in path]}, skipping"
                )
                continue

            paths.extend(
                self._schema_to_paths(child, path, exclude_tables, skip_tables, max_depth)
            )
        return paths

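Two properties of `_schema_to_paths` are worth seeing in isolation: every prefix of a discovered path is itself in the result, and a table already on the current path is skipped to break cycles. A toy version over a plain adjacency dict (the graph below is invented, not a real catalog schema):

```python
def schema_paths(graph: dict[str, list[str]], root: str, path=None):
    """Depth-first path enumeration; returns every path prefix, skips cycles."""
    path = (path or []) + [root]
    paths = [path]
    for child in graph.get(root, []):
        if child in path:  # cycle: the table is already on this path
            continue
        paths.extend(schema_paths(graph, child, path))
    return paths


graph = {"Dataset": ["Dataset_Image"], "Dataset_Image": ["Image", "Dataset"]}
result = schema_paths(graph, "Dataset")
print(result)
# [['Dataset'], ['Dataset', 'Dataset_Image'], ['Dataset', 'Dataset_Image', 'Image']]
```

The arc from `Dataset_Image` back to `Dataset` is silently dropped, mirroring the cycle check in the real traversal.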
    def create_table(self, table_def: TableDefinition, schema: str | None = None) -> Table:
        """Create a new table from TableDefinition.

        Args:
            table_def: Table definition (dataclass or dict).
            schema: Schema to create the table in. If None, uses default_schema.

        Returns:
            The newly created Table.

        Raises:
            DerivaMLException: If no schema specified and default_schema is not set.

        Note: @validate_call removed because TableDefinition is now a dataclass from
        deriva.core.typed and Pydantic validation doesn't work well with dataclass fields.
        """
        schema = schema or self._require_default_schema()
        # Handle both TableDefinition (dataclass with to_dict) and plain dicts
        table_dict = table_def.to_dict() if hasattr(table_def, 'to_dict') else table_def
        return self.model.schemas[schema].create_table(table_dict)

    def _define_association(
        self,
        associates: list,
        metadata: list | None = None,
        table_name: str | None = None,
        comment: str | None = None,
        **kwargs,
    ) -> dict:
        """Build an association table definition with vocab-aware key selection.

        Wraps Table.define_association to ensure non-vocabulary tables use RID
        as their foreign key target. The default key search heuristic in
        define_association prefers Name/ID keys over RID, which is correct for
        vocabulary tables (FK to human-readable Name) but wrong for domain
        tables that happen to have non-nullable Name or ID keys (e.g., tables
        in cloned catalogs like FaceBase).

        Args:
            associates: Reference targets being associated (Table, Key, or tuples).
            metadata: Additional metadata fields and/or reference targets.
            table_name: Name for the association table.
            comment: Comment for the association table.
            **kwargs: Additional arguments passed to Table.define_association.

        Returns:
            Table definition dict suitable for create_table.
        """
        metadata = metadata or []

        def _resolve_key(ref):
            """Convert non-vocabulary Table references to their RID Key."""
            if isinstance(ref, tuple):
                # (name, Table) or (name, nullok, Table) — resolve the Table element
                items = list(ref)
                table_obj = items[-1]
                if isinstance(table_obj, Table) and not table_obj.is_vocabulary():
                    items[-1] = table_obj.key_by_columns(["RID"])
                return tuple(items)
            elif isinstance(ref, Table) and not ref.is_vocabulary():
                return ref.key_by_columns(["RID"])
            return ref  # Key objects or vocabulary Tables pass through

        resolved_associates = [_resolve_key(a) for a in associates]
        resolved_metadata = [_resolve_key(m) for m in metadata]

        return Table.define_association(
            associates=resolved_associates,
            metadata=resolved_metadata,
            table_name=table_name,
            comment=comment,
            **kwargs,
        )

chaise_config property

chaise_config: dict[str, Any]

Return the chaise configuration.

__init__

__init__(
    model: Model,
    ml_schema: str = ML_SCHEMA,
    domain_schemas: str
    | set[str]
    | None = None,
    default_schema: str | None = None,
)

Create and initialize a DerivaModel instance.

This method will connect to a catalog and initialize schema configuration. This class is intended to be used as a base class on which domain-specific interfaces are built.

Parameters:

Name Type Description Default
model Model

The ERMRest model for the catalog.

required
ml_schema str

The ML schema name.

ML_SCHEMA
domain_schemas str | set[str] | None

Optional explicit set of domain schema names. If None, auto-detects all non-system schemas.

None
default_schema str | None

The default schema for table creation operations. If None and there is exactly one domain schema, that schema is used as default. If there are multiple domain schemas, default_schema must be specified.

None
Source code in src/deriva_ml/model/catalog.py
def __init__(
    self,
    model: Model,
    ml_schema: str = ML_SCHEMA,
    domain_schemas: str | set[str] | None = None,
    default_schema: str | None = None,
):
    """Create and initialize a DerivaModel instance.

    This method will connect to a catalog and initialize schema configuration.
    This class is intended to be used as a base class on which domain-specific interfaces are built.

    Args:
        model: The ERMRest model for the catalog.
        ml_schema: The ML schema name.
        domain_schemas: Optional explicit set of domain schema names. If None,
            auto-detects all non-system schemas.
        default_schema: The default schema for table creation operations. If None
            and there is exactly one domain schema, that schema is used as default.
            If there are multiple domain schemas, default_schema must be specified.
    """
    self.model = model
    self.configuration = None
    self.catalog: ErmrestCatalog = self.model.catalog
    self.hostname = self.catalog.deriva_server.server if isinstance(self.catalog, ErmrestCatalog) else "localhost"

    self.ml_schema = ml_schema
    self._system_schemas = frozenset(SYSTEM_SCHEMAS | {ml_schema})

    # Determine domain schemas
    if domain_schemas is not None:
        if isinstance(domain_schemas, str):
            domain_schemas = {domain_schemas}
        self.domain_schemas = frozenset(domain_schemas)
    else:
        # Auto-detect all domain schemas
        self.domain_schemas = get_domain_schemas(self.model.schemas.keys(), ml_schema)

    # Determine default schema for table creation
    if default_schema is not None:
        if default_schema not in self.domain_schemas:
            raise DerivaMLException(
                f"default_schema '{default_schema}' is not in domain_schemas: {self.domain_schemas}"
            )
        self.default_schema = default_schema
    elif len(self.domain_schemas) == 1:
        # Single domain schema - use it as default
        self.default_schema = next(iter(self.domain_schemas))
    elif len(self.domain_schemas) == 0:
        # No domain schemas - default_schema will be None
        self.default_schema = None
    else:
        # Multiple domain schemas, no explicit default
        self.default_schema = None

apply

apply() -> None

Call ERMRestModel.apply

Source code in src/deriva_ml/model/catalog.py
def apply(self) -> None:
    """Call ERMRestModel.apply"""
    if self.catalog == "file-system":
        raise DerivaMLException("Cannot apply() to non-catalog model.")
    else:
        self.model.apply()

asset_metadata

asset_metadata(
    table: str | Table,
) -> set[str]

Return the metadata columns for an asset table.

Source code in src/deriva_ml/model/catalog.py
def asset_metadata(self, table: str | Table) -> set[str]:
    """Return the metadata columns for an asset table."""

    table = self.name_to_table(table)

    if not self.is_asset(table):
        raise DerivaMLTableTypeError("asset table", table.name)
    return {c.name for c in table.columns} - DerivaAssetColumns

create_table

create_table(
    table_def: TableDefinition,
    schema: str | None = None,
) -> Table

Create a new table from TableDefinition.

Parameters:

Name Type Description Default
table_def TableDefinition

Table definition (dataclass or dict).

required
schema str | None

Schema to create the table in. If None, uses default_schema.

None

Returns:

Type Description
Table

The newly created Table.

Raises:

Type Description
DerivaMLException

If no schema specified and default_schema is not set.

Note: @validate_call removed because TableDefinition is now a dataclass from deriva.core.typed and Pydantic validation doesn't work well with dataclass fields.

Source code in src/deriva_ml/model/catalog.py
def create_table(self, table_def: TableDefinition, schema: str | None = None) -> Table:
    """Create a new table from TableDefinition.

    Args:
        table_def: Table definition (dataclass or dict).
        schema: Schema to create the table in. If None, uses default_schema.

    Returns:
        The newly created Table.

    Raises:
        DerivaMLException: If no schema specified and default_schema is not set.

    Note: @validate_call removed because TableDefinition is now a dataclass from
    deriva.core.typed and Pydantic validation doesn't work well with dataclass fields.
    """
    schema = schema or self._require_default_schema()
    # Handle both TableDefinition (dataclass with to_dict) and plain dicts
    table_dict = table_def.to_dict() if hasattr(table_def, 'to_dict') else table_def
    return self.model.schemas[schema].create_table(table_dict)

find_assets

find_assets(
    with_metadata: bool = False,
) -> list[Table]

Return the list of asset tables in the current model

Source code in src/deriva_ml/model/catalog.py
def find_assets(self, with_metadata: bool = False) -> list[Table]:
    """Return the list of asset tables in the current model"""
    return [t for s in self.model.schemas.values() for t in s.tables.values() if self.is_asset(t)]

find_association

find_association(
    table1: Table | str,
    table2: Table | str,
) -> tuple[Table, Column, Column]

Given two tables, return the association table that connects them and the two columns used to link them.

Source code in src/deriva_ml/model/catalog.py
def find_association(self, table1: Table | str, table2: Table | str) -> tuple[Table, Column, Column]:
    """Given two tables, return an association table that connects the two and the two columns used to link them..

    Raises:
        DerivaMLException: If there is no association table between the two tables, or more than one.
    """
    table1 = self.name_to_table(table1)
    table2 = self.name_to_table(table2)

    tables = [
        (a.table, a.self_fkey.columns[0].name, other_key.columns[0].name)
        for a in table1.find_associations(pure=False)
        if len(a.other_fkeys) == 1 and (other_key := a.other_fkeys.pop()).pk_table == table2
    ]

    if len(tables) == 1:
        return tables[0]
    elif len(tables) == 0:
        raise DerivaMLException(f"No association tables found between {table1.name} and {table2.name}.")
    else:
        raise DerivaMLException(
            f"There are {len(tables)} association tables between {table1.name} and {table2.name}."
        )

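The selection rule is: among association tables touching `table1`, exactly one must land on `table2`; zero or several is an error. A sketch with toy `(association_name, endpoint)` pairs standing in for `FindAssociationResult` objects (names are illustrative):

```python
def pick_association(candidates: list[tuple[str, str]], target: str) -> str:
    """Pick the unique association table whose far endpoint is `target`."""
    matches = [assoc for assoc, endpoint in candidates if endpoint == target]
    if len(matches) == 0:
        raise ValueError(f"no association table reaches {target}")
    if len(matches) > 1:
        raise ValueError(f"{len(matches)} association tables reach {target}")
    return matches[0]


candidates = [("Dataset_Image", "Image"), ("Dataset_Subject", "Subject")]
print(pick_association(candidates, "Subject"))  # Dataset_Subject
```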
find_features

find_features(
    table: TableInput | None = None,
) -> Iterable[Feature]

List features in the catalog.

If a table is specified, returns only features for that table. If no table is specified, returns all features across all tables in the catalog.

Parameters:

Name Type Description Default
table TableInput | None

Optional table to find features for. If None, returns all features in the catalog.

None

Returns:

Type Description
Iterable[Feature]

An iterable of Feature instances describing the features.

Source code in src/deriva_ml/model/catalog.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def find_features(self, table: TableInput | None = None) -> Iterable[Feature]:
    """List features in the catalog.

    If a table is specified, returns only features for that table.
    If no table is specified, returns all features across all tables in the catalog.

    Args:
        table: Optional table to find features for. If None, returns all features
            in the catalog.

    Returns:
        An iterable of Feature instances describing the features.
    """

    def is_feature(a: FindAssociationResult) -> bool:
        """Check if association represents a feature.

        Args:
            a: Association result to check
        Returns:
            bool: True if association represents a feature
        """
        return {
            "Feature_Name",
            "Execution",
            a.self_fkey.foreign_key_columns[0].name,
        }.issubset({c.name for c in a.table.columns})

    def find_table_features(t: Table) -> list[Feature]:
        """Find all features for a single table."""
        return [
            Feature(a, self) for a in t.find_associations(min_arity=3, max_arity=3, pure=False) if is_feature(a)
        ]

    if table is not None:
        # Find features for a specific table
        return find_table_features(self.name_to_table(table))
    else:
        # Find all features across all domain and ML schema tables
        features: list[Feature] = []
        for schema_name in [*self.domain_schemas, self.ml_schema]:
            schema = self.model.schemas.get(schema_name)
            if schema:
                for t in schema.tables.values():
                    features.extend(find_table_features(t))
        return features

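The `is_feature` test above reduces to a set-inclusion check: the association table must carry `Feature_Name`, `Execution`, and the FK column back to its target table. Sketched with plain column-name sets (the column names below are illustrative):

```python
def looks_like_feature(columns: set[str], self_fk_column: str) -> bool:
    """A feature association carries Feature_Name, Execution, and its target FK."""
    return {"Feature_Name", "Execution", self_fk_column}.issubset(columns)


cols = {"RID", "Feature_Name", "Execution", "Image", "Quality"}
print(looks_like_feature(cols, "Image"))    # True
print(looks_like_feature(cols, "Subject"))  # False
```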
find_vocabularies

find_vocabularies() -> list[Table]

Return a list of all controlled vocabulary tables in domain and ML schemas.

Source code in src/deriva_ml/model/catalog.py
def find_vocabularies(self) -> list[Table]:
    """Return a list of all controlled vocabulary tables in domain and ML schemas."""
    tables = []
    for schema_name in [*self.domain_schemas, self.ml_schema]:
        schema = self.model.schemas.get(schema_name)
        if schema:
            tables.extend(t for t in schema.tables.values() if self.is_vocabulary(t))
    return tables

get_schema_description

get_schema_description(
    include_system_columns: bool = False,
) -> dict[str, Any]

Return a JSON description of the catalog schema structure.

Provides a structured representation of the domain and ML schemas including tables, columns, foreign keys, and relationships. Useful for understanding the data model structure programmatically.

Parameters:

Name Type Description Default
include_system_columns bool

If True, include RID, RCT, RMT, RCB, RMB columns. Default False to reduce output size.

False

Returns:

Type Description
dict[str, Any]

Dictionary with schema structure:

{
    "domain_schemas": ["schema_name1", "schema_name2"],
    "default_schema": "schema_name1",
    "ml_schema": "deriva-ml",
    "schemas": {
        "schema_name": {
            "tables": {
                "TableName": {
                    "comment": "description",
                    "is_vocabulary": bool,
                    "is_asset": bool,
                    "is_association": bool,
                    "columns": [...],
                    "foreign_keys": [...],
                    "features": [...]
                }
            }
        }
    }
}

Source code in src/deriva_ml/model/catalog.py
def get_schema_description(self, include_system_columns: bool = False) -> dict[str, Any]:
    """Return a JSON description of the catalog schema structure.

    Provides a structured representation of the domain and ML schemas including
    tables, columns, foreign keys, and relationships. Useful for understanding
    the data model structure programmatically.

    Args:
        include_system_columns: If True, include RID, RCT, RMT, RCB, RMB columns.
            Default False to reduce output size.

    Returns:
        Dictionary with schema structure:
        {
            "domain_schemas": ["schema_name1", "schema_name2"],
            "default_schema": "schema_name1",
            "ml_schema": "deriva-ml",
            "schemas": {
                "schema_name": {
                    "tables": {
                        "TableName": {
                            "comment": "description",
                            "is_vocabulary": bool,
                            "is_asset": bool,
                            "is_association": bool,
                            "columns": [...],
                            "foreign_keys": [...],
                            "features": [...]
                        }
                    }
                }
            }
        }
    """
    system_columns = {"RID", "RCT", "RMT", "RCB", "RMB"}
    result = {
        "domain_schemas": sorted(self.domain_schemas),
        "default_schema": self.default_schema,
        "ml_schema": self.ml_schema,
        "schemas": {},
    }

    # Include all domain schemas and the ML schema
    for schema_name in [*self.domain_schemas, self.ml_schema]:
        schema = self.model.schemas.get(schema_name)
        if not schema:
            continue

        schema_info = {"tables": {}}

        for table_name, table in schema.tables.items():
            # Get columns
            columns = []
            for col in table.columns:
                if not include_system_columns and col.name in system_columns:
                    continue
                columns.append({
                    "name": col.name,
                    "type": str(col.type.typename),
                    "nullok": col.nullok,
                    "comment": col.comment or "",
                })

            # Get foreign keys
            foreign_keys = []
            for fk in table.foreign_keys:
                fk_cols = [c.name for c in fk.foreign_key_columns]
                ref_cols = [c.name for c in fk.referenced_columns]
                foreign_keys.append({
                    "columns": fk_cols,
                    "referenced_table": f"{fk.pk_table.schema.name}.{fk.pk_table.name}",
                    "referenced_columns": ref_cols,
                })

            # Get features if this is a domain table
            features = []
            if self.is_domain_schema(schema_name):
                try:
                    for f in self.find_features(table):
                        features.append({
                            "name": f.feature_name,
                            "feature_table": f.feature_table.name,
                        })
                except Exception as e:
                    logger.debug(f"Could not enumerate features for table {table.name}: {e}")

            table_info = {
                "comment": table.comment or "",
                "is_vocabulary": self.is_vocabulary(table),
                "is_asset": self.is_asset(table),
                "is_association": bool(self.is_association(table)),
                "columns": columns,
                "foreign_keys": foreign_keys,
            }
            if features:
                table_info["features"] = features

            schema_info["tables"][table_name] = table_info

        result["schemas"][schema_name] = schema_info

    return result
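As a sketch of how the returned structure can be consumed, the following walks a hand-built sample dict shaped like the documented output (the schema and table names are hypothetical, not from a live catalog):

```python
# Hypothetical sample shaped like get_schema_description() output.
sample = {
    "domain_schemas": ["isa"],
    "default_schema": "isa",
    "ml_schema": "deriva-ml",
    "schemas": {
        "isa": {
            "tables": {
                "Subject": {
                    "comment": "Study subjects",
                    "is_vocabulary": False,
                    "is_asset": False,
                    "is_association": False,
                    "columns": [
                        {"name": "Species", "type": "text", "nullok": True, "comment": ""}
                    ],
                    "foreign_keys": [],
                }
            }
        }
    },
}

def summarize(description):
    """Return {table_name: column_count} for every table in every schema."""
    counts = {}
    for schema in description["schemas"].values():
        for name, info in schema["tables"].items():
            counts[name] = len(info["columns"])
    return counts

counts = summarize(sample)
```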

is_asset

is_asset(
    table_name: TableInput,
) -> bool

True if the specified table is an asset table.

Parameters:

Name Type Description Default
table_name TableInput

The table to check, given as a table name (str) or Table object.

required

Returns:

Type Description
bool

True if the specified table is an asset table, False otherwise.

Source code in src/deriva_ml/model/catalog.py
def is_asset(self, table_name: TableInput) -> bool:
    """True if the specified table is an asset table.

    Args:
        table_name: The table to check, given as a table name or Table object.

    Returns:
        True if the specified table is an asset table, False otherwise.

    """
    asset_columns = {"Filename", "URL", "Length", "MD5", "Description"}
    table = self.name_to_table(table_name)
    return asset_columns.issubset({c.name for c in table.columns})
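The check above is a plain subset test. A standalone sketch of the same logic, without a catalog connection:

```python
# A table is treated as an asset table when its columns include the
# five standard asset columns used by is_asset.
ASSET_COLUMNS = {"Filename", "URL", "Length", "MD5", "Description"}

def looks_like_asset(column_names: set[str]) -> bool:
    """Subset check mirroring is_asset, over a plain set of column names."""
    return ASSET_COLUMNS.issubset(column_names)
```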

is_association

is_association(
    table_name: str | Table,
    unqualified: bool = True,
    pure: bool = True,
    min_arity: int = 2,
    max_arity: int = 2,
) -> bool | set[str] | int

Check the specified table to see if it is an association table.

Parameters:

Name Type Description Default
table_name str | Table

The table to check, given as a table name or Table object.

required
unqualified bool

If True, reject qualified associations, i.e. tables with extra foreign keys beyond those forming the association. (Default value = True)

True
pure bool

If True, reject impure associations, i.e. tables with extra attribute columns beyond the association keys. (Default value = True)

True
min_arity int

Minimum number of associated foreign keys.

2
max_arity int

Maximum number of associated foreign keys.

2

Returns:

False if the table is not an association; otherwise a truthy value (the arity or the set of associated foreign key names), as returned by deriva-py's Table.is_association.
Source code in src/deriva_ml/model/catalog.py
def is_association(
    self,
    table_name: str | Table,
    unqualified: bool = True,
    pure: bool = True,
    min_arity: int = 2,
    max_arity: int = 2,
) -> bool | set[str] | int:
    """Check the specified table to see if it is an association table.

    Args:
        table_name: param unqualified:
        pure: return: (Default value = True)
        table_name: str | Table:
        unqualified:  (Default value = True)

    Returns:


    """
    table = self.name_to_table(table_name)
    return table.is_association(unqualified=unqualified, pure=pure, min_arity=min_arity, max_arity=max_arity)
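To illustrate the idea behind a "pure" binary association (this is a conceptual sketch, not deriva-py's implementation): a table whose substantive columns are exactly the two foreign keys linking the associated tables.

```python
# Illustrative sketch of the pure binary association check: exactly two
# FK columns, and no extra attribute columns beyond FKs and system columns.
def is_pure_binary_association(
    fk_columns: set[str],
    all_columns: set[str],
    system_columns: frozenset[str] = frozenset({"RID", "RCT", "RMT", "RCB", "RMB"}),
) -> bool:
    if len(fk_columns) != 2:  # binary: exactly two FK columns
        return False
    extras = all_columns - fk_columns - system_columns
    return not extras  # pure: no extra attribute columns
```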

is_dataset_rid

is_dataset_rid(
    rid: RID, deleted: bool = False
) -> bool

Check if a given RID is a dataset RID.

Source code in src/deriva_ml/model/catalog.py
def is_dataset_rid(self, rid: RID, deleted: bool = False) -> bool:
    """Check if a given RID is a dataset RID.

    Args:
        rid: The RID to check.
        deleted: If True, deleted datasets also count; otherwise they return False.
    """
    try:
        rid_info = self.model.catalog.resolve_rid(rid, self.model)
    except KeyError as e:
        raise DerivaMLException(f"Invalid RID {rid}") from e
    if rid_info.table.name != "Dataset":
        return False
    elif deleted:
        # Got a dataset RID. Now check whether it's deleted or not.
        return True
    else:
        return not list(rid_info.datapath.entities().fetch())[0]["Deleted"]
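The control flow above can be sketched standalone with a stand-in resolver dict in place of a live catalog (the RIDs and table names here are made up):

```python
# Stand-in for catalog RID resolution: RID -> owning table and Deleted flag.
FAKE_CATALOG = {
    "1-abc": {"table": "Dataset", "Deleted": False},
    "1-def": {"table": "Dataset", "Deleted": True},
    "1-xyz": {"table": "Subject", "Deleted": False},
}

def is_dataset_rid_sketch(rid: str, deleted: bool = False) -> bool:
    """Mirror is_dataset_rid: resolve, require Dataset, honor the Deleted flag."""
    try:
        info = FAKE_CATALOG[rid]
    except KeyError as e:
        raise ValueError(f"Invalid RID {rid}") from e
    if info["table"] != "Dataset":
        return False
    return True if deleted else not info["Deleted"]
```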

is_domain_schema

is_domain_schema(
    schema_name: str,
) -> bool

Check if a schema is a domain schema.

Parameters:

Name Type Description Default
schema_name str

Name of the schema to check.

required

Returns:

Type Description
bool

True if the schema is a domain schema.

Source code in src/deriva_ml/model/catalog.py
def is_domain_schema(self, schema_name: str) -> bool:
    """Check if a schema is a domain schema.

    Args:
        schema_name: Name of the schema to check.

    Returns:
        True if the schema is a domain schema.
    """
    return schema_name in self.domain_schemas

is_system_schema

is_system_schema(
    schema_name: str,
) -> bool

Check if a schema is a system or ML schema.

Parameters:

Name Type Description Default
schema_name str

Name of the schema to check.

required

Returns:

Type Description
bool

True if the schema is a system or ML schema.

Source code in src/deriva_ml/model/catalog.py
def is_system_schema(self, schema_name: str) -> bool:
    """Check if a schema is a system or ML schema.

    Args:
        schema_name: Name of the schema to check.

    Returns:
        True if the schema is a system or ML schema.
    """
    return is_system_schema(schema_name, self.ml_schema)

is_vocabulary

is_vocabulary(
    table_name: TableInput,
) -> bool

Check if a given table is a controlled vocabulary table.

Parameters:

Name Type Description Default
table_name TableInput

An ERMrest table object or the name of the table.

required

Returns:

Type Description
bool

True if the table is a controlled vocabulary, False otherwise.

Raises:

Type Description
DerivaMLException

if the table doesn't exist.

Source code in src/deriva_ml/model/catalog.py
def is_vocabulary(self, table_name: TableInput) -> bool:
    """Check if a given table is a controlled vocabulary table.

    Args:
      table_name: An ERMrest table object or the name of the table.

    Returns:
      True if the table is a controlled vocabulary, False otherwise.

    Raises:
      DerivaMLException: if the table doesn't exist.

    """
    vocab_columns = {"NAME", "URI", "SYNONYMS", "DESCRIPTION", "ID"}
    table = self.name_to_table(table_name)
    return vocab_columns.issubset({c.name.upper() for c in table.columns})
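The case-insensitive subset check above can be sketched standalone; matching without regard to case is what lets both FaceBase-style lowercase and DerivaML TitleCase tables qualify:

```python
# Mirror of the is_vocabulary check: the table must contain Name, URI,
# Synonyms, Description, and ID columns, matched case-insensitively.
VOCAB_COLUMNS = {"NAME", "URI", "SYNONYMS", "DESCRIPTION", "ID"}

def looks_like_vocabulary(column_names) -> bool:
    return VOCAB_COLUMNS.issubset({c.upper() for c in column_names})
```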

list_dataset_element_types

list_dataset_element_types() -> (
    list[Table]
)

Lists the data types of elements contained within a dataset.

This method analyzes the dataset and identifies the data types for all elements within it. It is useful for understanding the structure and content of the dataset and allows for better manipulation and usage of its data.

Returns:

Type Description
list[Table]

A list of Table objects, one for each element type that can be contained in a dataset.

Source code in src/deriva_ml/model/catalog.py
def list_dataset_element_types(self) -> list[Table]:
    """
    Lists the data types of elements contained within a dataset.

    This method analyzes the dataset and identifies the data types for all
    elements within it. It is useful for understanding the structure and
    content of the dataset and allows for better manipulation and usage of its
    data.

    Returns:
        list[Table]: A list of Table objects, one for each element type
        that can be contained in the dataset.

    """

    dataset_table = self.name_to_table("Dataset")

    def is_domain_or_dataset_table(table: Table) -> bool:
        return self.is_domain_schema(table.schema.name) or table.name == dataset_table.name

    return [
        t
        for a in dataset_table.find_associations()
        if is_domain_or_dataset_table(t := a.other_fkeys.pop().pk_table)
    ]

lookup_feature

lookup_feature(
    table: TableInput, feature_name: str
) -> Feature

Lookup the named feature associated with the provided table.

Parameters:

Name Type Description Default
table TableInput

The table the feature is associated with, given as a table name or Table object.

required
feature_name str

Name of the feature to look up.

required

Returns:

Type Description
Feature

A Feature class that represents the requested feature.

Raises:

Type Description
DerivaMLException

If the feature cannot be found.

Source code in src/deriva_ml/model/catalog.py
def lookup_feature(self, table: TableInput, feature_name: str) -> Feature:
    """Lookup the named feature associated with the provided table.

    Args:
        table: The table the feature is associated with, given as a table
            name or Table object.
        feature_name: Name of the feature to look up.

    Returns:
        A Feature class that represents the requested feature.

    Raises:
      DerivaMLException: If the feature cannot be found.
    """
    table = self.name_to_table(table)
    try:
        return [f for f in self.find_features(table) if f.feature_name == feature_name][0]
    except IndexError:
        raise DerivaMLException(f"Feature {table.name}:{feature_name} doesn't exist.")

name_to_table

name_to_table(
    table: TableInput,
) -> Table

Return the table object corresponding to the given table name.

Searches domain schemas first (in sorted order), then ML schema, then WWW. If the table name appears in more than one schema, returns the first match.

Parameters:

Name Type Description Default
table TableInput

An ERMrest table object or a string that is the name of the table.

required

Returns:

Type Description
Table

Table object.

Raises:

Type Description
DerivaMLException

If the table doesn't exist in any searchable schema.

Source code in src/deriva_ml/model/catalog.py
def name_to_table(self, table: TableInput) -> Table:
    """Return the table object corresponding to the given table name.

    Searches domain schemas first (in sorted order), then ML schema, then WWW.
    If the table name appears in more than one schema, returns the first match.

    Args:
      table: An ERMrest table object or a string that is the name of the table.

    Returns:
      Table object.

    Raises:
      DerivaMLException: If the table doesn't exist in any searchable schema.
    """
    if isinstance(table, Table):
        return table

    # Search domain schemas (sorted for deterministic order), then ML schema, then WWW
    search_order = [*sorted(self.domain_schemas), self.ml_schema, "WWW"]
    for sname in search_order:
        if sname not in self.model.schemas:
            continue
        s = self.model.schemas[sname]
        if table in s.tables:
            return s.tables[table]
    raise DerivaMLException(f"The table {table} doesn't exist.")

vocab_columns

vocab_columns(
    table_name: TableInput,
) -> dict[str, str]

Return mapping from canonical vocab column name to actual column name.

Canonical names are TitleCase (Name, ID, URI, Description, Synonyms). Actual names reflect the table's schema — could be lowercase for FaceBase-style catalogs or TitleCase for DerivaML-native tables.

Parameters:

Name Type Description Default
table_name TableInput

A table object or the name of the table.

required

Returns:

Type Description
dict[str, str]

Dict mapping canonical name to actual column name in the table, e.g. {"Name": "name", "ID": "id", ...} for FaceBase tables or {"Name": "Name", "ID": "ID", ...} for DerivaML tables.

Source code in src/deriva_ml/model/catalog.py
def vocab_columns(self, table_name: TableInput) -> dict[str, str]:
    """Return mapping from canonical vocab column name to actual column name.

    Canonical names are TitleCase (Name, ID, URI, Description, Synonyms).
    Actual names reflect the table's schema — could be lowercase for
    FaceBase-style catalogs or TitleCase for DerivaML-native tables.

    Args:
        table_name: A table object or the name of the table.

    Returns:
        Dict mapping canonical name to actual column name in the table.
        E.g. ``{"Name": "name", "ID": "id", ...}`` for FaceBase tables
        or ``{"Name": "Name", "ID": "ID", ...}`` for DerivaML tables.
    """
    table = self.name_to_table(table_name)
    col_map = {c.name.upper(): c.name for c in table.columns}
    return {canon: col_map[canon.upper()] for canon in ("Name", "ID", "URI", "Description", "Synonyms")}
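The mapping construction above can be demonstrated standalone for a FaceBase-style lowercase table:

```python
# Build the canonical-to-actual column mapping the way vocab_columns does,
# over a lowercase (FaceBase-style) column list.
columns = ["rid", "name", "id", "uri", "description", "synonyms"]
col_map = {c.upper(): c for c in columns}
canonical = ("Name", "ID", "URI", "Description", "Synonyms")
mapping = {canon: col_map[canon.upper()] for canon in canonical}
```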

Display dataclass

Bases: AnnotationBuilder

Display annotation for tables and columns.

Controls the display name, description/tooltip, and how null values and foreign key links are rendered. Can be applied to both tables and columns.

Parameters:

Name Type Description Default
name str | None

Display name shown in the UI (mutually exclusive with markdown_name)

None
markdown_name str | None

Markdown-formatted display name (mutually exclusive with name)

None
name_style NameStyle | None

Styling options for automatic name formatting

None
comment str | None

Description text shown as tooltip/help text

None
show_null dict[str, bool | str] | None

How to display null values, per context

None
show_foreign_key_link dict[str, bool] | None

Whether to show FK values as links, per context

None

Raises:

Type Description
ValueError

If both name and markdown_name are provided

Example

Basic display name::

>>> display = Display(name="Research Subjects")
>>> handle.set_annotation(display)

With description/tooltip::

>>> display = Display(
...     name="Subjects",
...     comment="Individuals enrolled in research studies"
... )

Markdown-formatted name::

>>> display = Display(markdown_name="**Bold** _Italic_ Name")

Context-specific null display::

>>> from deriva_ml.model import CONTEXT_COMPACT, CONTEXT_DETAILED
>>> display = Display(
...     name="Value",
...     show_null={
...         CONTEXT_COMPACT: False,      # Hide nulls in lists
...         CONTEXT_DETAILED: '"N/A"'    # Show "N/A" string
...     }
... )

Control foreign key link display::

>>> display = Display(
...     name="Subject",
...     show_foreign_key_link={CONTEXT_COMPACT: False}
... )
Source code in src/deriva_ml/model/annotations.py
@dataclass
class Display(AnnotationBuilder):
    """Display annotation for tables and columns.

    Controls the display name, description/tooltip, and how null values
    and foreign key links are rendered. Can be applied to both tables
    and columns.

    Args:
        name: Display name shown in the UI (mutually exclusive with markdown_name)
        markdown_name: Markdown-formatted display name (mutually exclusive with name)
        name_style: Styling options for automatic name formatting
        comment: Description text shown as tooltip/help text
        show_null: How to display null values, per context
        show_foreign_key_link: Whether to show FK values as links, per context

    Raises:
        ValueError: If both name and markdown_name are provided

    Example:
        Basic display name::

            >>> display = Display(name="Research Subjects")
            >>> handle.set_annotation(display)

        With description/tooltip::

            >>> display = Display(
            ...     name="Subjects",
            ...     comment="Individuals enrolled in research studies"
            ... )

        Markdown-formatted name::

            >>> display = Display(markdown_name="**Bold** _Italic_ Name")

        Context-specific null display::

            >>> from deriva_ml.model import CONTEXT_COMPACT, CONTEXT_DETAILED
            >>> display = Display(
            ...     name="Value",
            ...     show_null={
            ...         CONTEXT_COMPACT: False,      # Hide nulls in lists
            ...         CONTEXT_DETAILED: '"N/A"'    # Show "N/A" string
            ...     }
            ... )

        Control foreign key link display::

            >>> display = Display(
            ...     name="Subject",
            ...     show_foreign_key_link={CONTEXT_COMPACT: False}
            ... )
    """
    tag = TAG_DISPLAY

    name: str | None = None
    markdown_name: str | None = None
    name_style: NameStyle | None = None
    comment: str | None = None
    show_null: dict[str, bool | str] | None = None
    show_foreign_key_link: dict[str, bool] | None = None

    def __post_init__(self):
        if self.name and self.markdown_name:
            raise ValueError("name and markdown_name are mutually exclusive")

    def to_dict(self) -> dict[str, Any]:
        result = {}
        if self.name is not None:
            result["name"] = self.name
        if self.markdown_name is not None:
            result["markdown_name"] = self.markdown_name
        if self.name_style is not None:
            style_dict = self.name_style.to_dict()
            if style_dict:
                result["name_style"] = style_dict
        if self.comment is not None:
            result["comment"] = self.comment
        if self.show_null is not None:
            result["show_null"] = self.show_null
        if self.show_foreign_key_link is not None:
            result["show_foreign_key_link"] = self.show_foreign_key_link
        return result

Facet dataclass

A facet definition for filtering.

Parameters:

Name Type Description Default
source str | list[str | InboundFK | OutboundFK] | None

Path to source data

None
sourcekey str | None

Reference to named source

None
markdown_name str | None

Display name

None
comment str | None

Description

None
entity bool | None

Whether this is an entity facet

None
open bool | None

Start expanded

None
ux_mode FacetUxMode | None

UI mode (choices, ranges, check_presence)

None
bar_plot bool | None

Show bar plot

None
choices list[Any] | None

Preset choice values

None
ranges list[FacetRange] | None

Preset range values

None
not_null bool | None

Filter to non-null values

None
hide_null_choice bool | None

Hide "null" option

None
hide_not_null_choice bool | None

Hide "not null" option

None
n_bins int | None

Number of bins for histogram

None
Source code in src/deriva_ml/model/annotations.py
@dataclass
class Facet:
    """A facet definition for filtering.

    Args:
        source: Path to source data
        sourcekey: Reference to named source
        markdown_name: Display name
        comment: Description
        entity: Whether this is an entity facet
        open: Start expanded
        ux_mode: UI mode (choices, ranges, check_presence)
        bar_plot: Show bar plot
        choices: Preset choice values
        ranges: Preset range values
        not_null: Filter to non-null values
        hide_null_choice: Hide "null" option
        hide_not_null_choice: Hide "not null" option
        n_bins: Number of bins for histogram
    """
    source: str | list[str | InboundFK | OutboundFK] | None = None
    sourcekey: str | None = None
    markdown_name: str | None = None
    comment: str | None = None
    entity: bool | None = None
    open: bool | None = None
    ux_mode: FacetUxMode | None = None
    bar_plot: bool | None = None
    choices: list[Any] | None = None
    ranges: list[FacetRange] | None = None
    not_null: bool | None = None
    hide_null_choice: bool | None = None
    hide_not_null_choice: bool | None = None
    n_bins: int | None = None

    def to_dict(self) -> dict[str, Any]:
        result = {}

        if self.source is not None:
            if isinstance(self.source, str):
                result["source"] = self.source
            else:
                result["source"] = [
                    item.to_dict() if hasattr(item, "to_dict") else item
                    for item in self.source
                ]

        if self.sourcekey is not None:
            result["sourcekey"] = self.sourcekey
        if self.markdown_name is not None:
            result["markdown_name"] = self.markdown_name
        if self.comment is not None:
            result["comment"] = self.comment
        if self.entity is not None:
            result["entity"] = self.entity
        if self.open is not None:
            result["open"] = self.open
        if self.ux_mode is not None:
            result["ux_mode"] = self.ux_mode.value
        if self.bar_plot is not None:
            result["bar_plot"] = self.bar_plot
        if self.choices is not None:
            result["choices"] = self.choices
        if self.ranges is not None:
            result["ranges"] = [r.to_dict() for r in self.ranges]
        if self.not_null is not None:
            result["not_null"] = self.not_null
        if self.hide_null_choice is not None:
            result["hide_null_choice"] = self.hide_null_choice
        if self.hide_not_null_choice is not None:
            result["hide_not_null_choice"] = self.hide_not_null_choice
        if self.n_bins is not None:
            result["n_bins"] = self.n_bins

        return result

FacetList dataclass

A list of facets for filtering (visible_columns.filter).

Example

facets = FacetList([
    Facet(source="Species", open=True),
    Facet(source="Age", ux_mode=FacetUxMode.RANGES)
])

Source code in src/deriva_ml/model/annotations.py
@dataclass
class FacetList:
    """A list of facets for filtering (visible_columns.filter).

    Example:
        >>> facets = FacetList([
        ...     Facet(source="Species", open=True),
        ...     Facet(source="Age", ux_mode=FacetUxMode.RANGES)
        ... ])
    """
    facets: list[Facet] = field(default_factory=list)

    def add(self, facet: Facet) -> "FacetList":
        """Add a facet to the list."""
        self.facets.append(facet)
        return self

    def to_dict(self) -> dict[str, list[dict]]:
        return {"and": [f.to_dict() for f in self.facets]}

add

add(facet: Facet) -> 'FacetList'

Add a facet to the list.

Source code in src/deriva_ml/model/annotations.py
def add(self, facet: Facet) -> "FacetList":
    """Add a facet to the list."""
    self.facets.append(facet)
    return self

FacetRange dataclass

A range for facet filtering.

Parameters:

Name Type Description Default
min float | None

Minimum value

None
max float | None

Maximum value

None
min_exclusive bool | None

Exclude min value

None
max_exclusive bool | None

Exclude max value

None
Source code in src/deriva_ml/model/annotations.py
@dataclass
class FacetRange:
    """A range for facet filtering.

    Args:
        min: Minimum value
        max: Maximum value
        min_exclusive: Exclude min value
        max_exclusive: Exclude max value
    """
    min: float | None = None
    max: float | None = None
    min_exclusive: bool | None = None
    max_exclusive: bool | None = None

    def to_dict(self) -> dict[str, Any]:
        result = {}
        if self.min is not None:
            result["min"] = self.min
        if self.max is not None:
            result["max"] = self.max
        if self.min_exclusive is not None:
            result["min_exclusive"] = self.min_exclusive
        if self.max_exclusive is not None:
            result["max_exclusive"] = self.max_exclusive
        return result

FacetUxMode

Bases: str, Enum

UX modes for facet filters in the search panel.

Controls how users interact with a facet filter.

Attributes:

Name Type Description
CHOICES

Checkbox list for selecting values

RANGES

Range slider/inputs for numeric or date ranges

CHECK_PRESENCE

Check if value exists or is null

Example

Choice-based facet

Facet(source="Status", ux_mode=FacetUxMode.CHOICES)

Range-based facet for numeric values

Facet(source="Age", ux_mode=FacetUxMode.RANGES)

Check presence (has value / no value)

Facet(source="Notes", ux_mode=FacetUxMode.CHECK_PRESENCE)

Source code in src/deriva_ml/model/annotations.py
class FacetUxMode(str, Enum):
    """UX modes for facet filters in the search panel.

    Controls how users interact with a facet filter.

    Attributes:
        CHOICES: Checkbox list for selecting values
        RANGES: Range slider/inputs for numeric or date ranges
        CHECK_PRESENCE: Check if value exists or is null

    Example:
        >>> # Choice-based facet
        >>> Facet(source="Status", ux_mode=FacetUxMode.CHOICES)
        >>>
        >>> # Range-based facet for numeric values
        >>> Facet(source="Age", ux_mode=FacetUxMode.RANGES)
        >>>
        >>> # Check presence (has value / no value)
        >>> Facet(source="Notes", ux_mode=FacetUxMode.CHECK_PRESENCE)
    """
    CHOICES = "choices"
    RANGES = "ranges"
    CHECK_PRESENCE = "check_presence"

ForeignKeyOrderer

Computes insertion order for tables based on FK dependencies.

Uses topological sort to ensure referenced tables are populated before tables that reference them. Handles cycles by either raising an error or breaking them.

Example

orderer = ForeignKeyOrderer(model, schemas=['domain', 'deriva-ml'])

# Get insertion order
tables_to_fill = ['Image', 'Subject', 'Diagnosis']
ordered = orderer.get_insertion_order(tables_to_fill)
# Returns: ['Subject', 'Image', 'Diagnosis']

# Get all tables in safe order
all_ordered = orderer.get_insertion_order()

# Get FK dependencies for a table
deps = orderer.get_dependencies('Image')
# Returns: {'Subject', 'Dataset', ...}
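The FK-safe ordering described here is a topological sort. As a sketch of the underlying idea (not the class's implementation), Python's standard-library graphlib can order a hand-written dependency map; the table names below are hypothetical:

```python
# Dependency map: table -> tables it references via foreign keys.
from graphlib import TopologicalSorter

dependencies = {
    "Image": {"Subject", "Dataset"},
    "Diagnosis": {"Image"},
    "Subject": set(),
    "Dataset": set(),
}

# static_order() yields each table only after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
```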

Source code in src/deriva_ml/model/fk_orderer.py
class ForeignKeyOrderer:
    """Computes insertion order for tables based on FK dependencies.

    Uses topological sort to ensure referenced tables are populated
    before tables that reference them. Handles cycles by either
    raising an error or breaking them.

    Example:
        orderer = ForeignKeyOrderer(model, schemas=['domain', 'deriva-ml'])

        # Get insertion order
        tables_to_fill = ['Image', 'Subject', 'Diagnosis']
        ordered = orderer.get_insertion_order(tables_to_fill)
        # Returns: ['Subject', 'Image', 'Diagnosis']

        # Get all tables in safe order
        all_ordered = orderer.get_insertion_order()

        # Get FK dependencies for a table
        deps = orderer.get_dependencies('Image')
        # Returns: {'Subject', 'Dataset', ...}
    """

    def __init__(
        self,
        model: Model,
        schemas: list[str],
    ):
        """Initialize the orderer.

        Args:
            model: ERMrest Model object.
            schemas: Schemas to consider for FK relationships.
        """
        self.model = model
        self.schemas = set(schemas)
        self._table_cache: dict[str, DerivaTable] = {}
        self._build_table_cache()

    def _build_table_cache(self) -> None:
        """Build cache mapping table names to Table objects."""
        for schema_name in self.schemas:
            if schema_name not in self.model.schemas:
                continue
            schema = self.model.schemas[schema_name]
            for table_name, table in schema.tables.items():
                # Store both qualified and unqualified names
                self._table_cache[f"{schema_name}.{table_name}"] = table
                # Only store unqualified if not already present (avoids conflicts)
                if table_name not in self._table_cache:
                    self._table_cache[table_name] = table

    def _to_table(self, t: str | DerivaTable) -> DerivaTable:
        """Convert table name to Table object.

        Args:
            t: Table name or Table object.

        Returns:
            DerivaTable object.

        Raises:
            ValueError: If table not found.
        """
        if isinstance(t, DerivaTable):
            return t

        if t in self._table_cache:
            return self._table_cache[t]

        raise ValueError(f"Table {t} not found in schemas {self.schemas}")

    def _table_key(self, t: DerivaTable) -> str:
        """Get unique key for a table."""
        return f"{t.schema.name}.{t.name}"

    def get_dependencies(self, table: str | DerivaTable) -> set[DerivaTable]:
        """Get tables that this table depends on (FK targets).

        Args:
            table: Table name or object.

        Returns:
            Set of tables that must be populated before this table.
        """
        t = self._to_table(table)
        dependencies = set()

        for fk in t.foreign_keys:
            pk_table = fk.pk_table
            # Only include dependencies within our schemas
            if pk_table.schema.name in self.schemas:
                # Don't include self-references as dependencies
                if self._table_key(pk_table) != self._table_key(t):
                    dependencies.add(pk_table)

        return dependencies

    def get_dependents(self, table: str | DerivaTable) -> set[DerivaTable]:
        """Get tables that depend on this table (FK sources).

        Args:
            table: Table name or object.

        Returns:
            Set of tables that reference this table.
        """
        t = self._to_table(table)
        dependents = set()

        for schema_name in self.schemas:
            if schema_name not in self.model.schemas:
                continue

            for other_table in self.model.schemas[schema_name].tables.values():
                if self._table_key(other_table) == self._table_key(t):
                    continue

                for fk in other_table.foreign_keys:
                    if self._table_key(fk.pk_table) == self._table_key(t):
                        dependents.add(other_table)
                        break

        return dependents

    def _build_dependency_graph(
        self,
        tables: list[str | DerivaTable] | None = None,
    ) -> dict[str, set[str]]:
        """Build FK dependency graph.

        Args:
            tables: Tables to include. If None, includes all tables.

        Returns:
            Dict mapping table key -> set of table keys it depends on.
        """
        if tables is None:
            # Include all tables in schemas
            table_objs = []
            for schema_name in self.schemas:
                if schema_name in self.model.schemas:
                    table_objs.extend(self.model.schemas[schema_name].tables.values())
        else:
            table_objs = [self._to_table(t) for t in tables]

        table_keys = {self._table_key(t) for t in table_objs}
        graph: dict[str, set[str]] = {}

        for t in table_objs:
            key = self._table_key(t)
            deps = set()

            for fk in t.foreign_keys:
                pk_key = self._table_key(fk.pk_table)
                # Only include deps within our table set
                if pk_key in table_keys and pk_key != key:
                    deps.add(pk_key)

            graph[key] = deps

        return graph

    def get_insertion_order(
        self,
        tables: list[str | DerivaTable] | None = None,
        handle_cycles: bool = True,
    ) -> list[DerivaTable]:
        """Compute FK-safe insertion order for the given tables.

        Returns tables ordered so that all FK dependencies are satisfied
        when inserting in order.

        Args:
            tables: Tables to order. If None, orders all tables in schemas.
            handle_cycles: If True, break cycles by removing edges.
                If False, raise CycleError on cycles.

        Returns:
            Ordered list of Table objects (insert from first to last).

        Raises:
            CycleError: If handle_cycles=False and cycles exist.
        """
        graph = self._build_dependency_graph(tables)

        try:
            ts = TopologicalSorter(graph)
            ordered_keys = list(ts.static_order())
        except CycleError as e:
            if handle_cycles:
                ordered_keys = self._break_cycles_and_sort(graph, e)
            else:
                raise

        # Convert keys back to Table objects
        return [self._table_cache[key] for key in ordered_keys]

    def get_deletion_order(
        self,
        tables: list[str | DerivaTable] | None = None,
        handle_cycles: bool = True,
    ) -> list[DerivaTable]:
        """Compute FK-safe deletion order for the given tables.

        Returns tables in reverse dependency order - tables that are
        referenced should be deleted last.

        Args:
            tables: Tables to order. If None, orders all tables in schemas.
            handle_cycles: If True, break cycles. If False, raise on cycles.

        Returns:
            Ordered list of Table objects (delete from first to last).
        """
        insertion_order = self.get_insertion_order(tables, handle_cycles)
        return list(reversed(insertion_order))

    def _break_cycles_and_sort(
        self,
        graph: dict[str, set[str]],
        error: CycleError,
        _depth: int = 0,
    ) -> list[str]:
        """Handle cycles by breaking them and re-sorting.

        Uses a simple strategy of removing edges from cycle members
        until no cycles remain.

        Args:
            graph: Dependency graph.
            error: CycleError with cycle info.
            _depth: Current recursion depth (internal cycle-breaking guard).

        Returns:
            Ordered list of table keys.
        """
        max_depth = len(graph)  # Safety bound: one edge is removed per pass
        if _depth > max_depth:
            logger.error("Too many cycles to break, returning arbitrary order")
            return list(graph.keys())

        # Get cycle from error message.
        # CycleError.args[1] is like ['A', 'B', 'C', 'A'] where first == last.
        cycle = list(error.args[1]) if len(error.args) > 1 else []

        if cycle:
            logger.warning(f"Breaking cycle in FK dependencies: {' -> '.join(cycle)}")

            # Remove one edge from the cycle to break it.
            # cycle[-1] == cycle[0], so the unique nodes are cycle[:-1].
            # Each consecutive pair cycle[i] -> cycle[i+1] corresponds to
            # graph[cycle[i+1]] containing cycle[i] (i.e., cycle[i+1] depends on cycle[i]).
            # Remove the last real edge: cycle[-2] from graph[cycle[-1]].
            edge_removed = False
            if len(cycle) >= 3:
                dep_node = cycle[-2]  # the dependency
                node = cycle[-1]      # the node that depends on dep_node
                if node in graph and dep_node in graph[node]:
                    graph[node].remove(dep_node)
                    logger.debug(f"Removed dependency {node} -> {dep_node}")
                    edge_removed = True

            if not edge_removed:
                # Try removing any edge in the cycle
                for i in range(len(cycle) - 1):
                    dep_node, node = cycle[i], cycle[i + 1]
                    if node in graph and dep_node in graph[node]:
                        graph[node].remove(dep_node)
                        logger.debug(f"Removed dependency {node} -> {dep_node}")
                        edge_removed = True
                        break

        # Try again
        try:
            ts = TopologicalSorter(graph)
            return list(ts.static_order())
        except CycleError as e:
            # Recursively break more cycles
            return self._break_cycles_and_sort(graph, e, _depth + 1)

    def validate_insertion_order(
        self,
        tables: list[str | DerivaTable],
    ) -> list[tuple[str, str, str]]:
        """Validate that a list of tables can be inserted in order.

        Checks each table to ensure all its FK dependencies are
        satisfied by tables earlier in the list.

        Args:
            tables: Ordered list of tables to validate.

        Returns:
            List of (table, missing_dependency, fk_name) tuples for
            any unsatisfied dependencies. Empty list if valid.
        """
        table_objs = [self._to_table(t) for t in tables]
        seen_keys = set()
        violations = []

        for t in table_objs:
            key = self._table_key(t)

            for fk in t.foreign_keys:
                pk_key = self._table_key(fk.pk_table)
                # Skip self-references and tables not in our set
                if pk_key == key:
                    continue
                if pk_key not in {self._table_key(x) for x in table_objs}:
                    continue

                if pk_key not in seen_keys:
                    violations.append((key, pk_key, fk.name[1]))

            seen_keys.add(key)

        return violations

    def get_all_tables(self) -> list[DerivaTable]:
        """Get all tables in configured schemas.

        Returns:
            List of all Table objects.
        """
        tables = []
        for schema_name in self.schemas:
            if schema_name in self.model.schemas:
                tables.extend(self.model.schemas[schema_name].tables.values())
        return tables

    def find_cycles(self) -> list[list[str]]:
        """Find all FK dependency cycles in the schema.

        Returns:
            List of cycles, each cycle is a list of table keys.
        """
        graph = self._build_dependency_graph()
        cycles = []

        # Use DFS to find cycles
        visited = set()
        rec_stack = set()
        path = []

        def dfs(node: str) -> bool:
            visited.add(node)
            rec_stack.add(node)
            path.append(node)

            for neighbor in graph.get(node, set()):
                if neighbor not in visited:
                    if dfs(neighbor):
                        return True
                elif neighbor in rec_stack:
                    # Found cycle
                    idx = path.index(neighbor)
                    cycle = path[idx:] + [neighbor]
                    cycles.append(cycle)

            path.pop()
            rec_stack.remove(node)
            return False

        for node in graph:
            if node not in visited:
                dfs(node)

        return cycles

__init__

__init__(
    model: Model, schemas: list[str]
)

Initialize the orderer.

Parameters:

Name Type Description Default
model Model

ERMrest Model object.

required
schemas list[str]

Schemas to consider for FK relationships.

required
Source code in src/deriva_ml/model/fk_orderer.py
def __init__(
    self,
    model: Model,
    schemas: list[str],
):
    """Initialize the orderer.

    Args:
        model: ERMrest Model object.
        schemas: Schemas to consider for FK relationships.
    """
    self.model = model
    self.schemas = set(schemas)
    self._table_cache: dict[str, DerivaTable] = {}
    self._build_table_cache()

find_cycles

find_cycles() -> list[list[str]]

Find all FK dependency cycles in the schema.

Returns:

Type Description
list[list[str]]

List of cycles, each cycle is a list of table keys.

Source code in src/deriva_ml/model/fk_orderer.py
def find_cycles(self) -> list[list[str]]:
    """Find all FK dependency cycles in the schema.

    Returns:
        List of cycles, each cycle is a list of table keys.
    """
    graph = self._build_dependency_graph()
    cycles = []

    # Use DFS to find cycles
    visited = set()
    rec_stack = set()
    path = []

    def dfs(node: str) -> bool:
        visited.add(node)
        rec_stack.add(node)
        path.append(node)

        for neighbor in graph.get(node, set()):
            if neighbor not in visited:
                if dfs(neighbor):
                    return True
            elif neighbor in rec_stack:
                # Found cycle
                idx = path.index(neighbor)
                cycle = path[idx:] + [neighbor]
                cycles.append(cycle)

        path.pop()
        rec_stack.remove(node)
        return False

    for node in graph:
        if node not in visited:
            dfs(node)

    return cycles
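
The DFS above operates on a plain dependency dict, so the technique can be sketched standalone, independent of any Deriva model (table names here are illustrative):

```python
def find_cycles(graph: dict[str, set[str]]) -> list[list[str]]:
    """Mirror of the DFS above: graph maps node -> nodes it depends on."""
    cycles: list[list[str]] = []
    visited: set[str] = set()
    rec_stack: set[str] = set()
    path: list[str] = []

    def dfs(node: str) -> None:
        visited.add(node)
        rec_stack.add(node)
        path.append(node)
        for neighbor in graph.get(node, set()):
            if neighbor not in visited:
                dfs(neighbor)
            elif neighbor in rec_stack:
                # Back edge found: slice the current path into a cycle.
                idx = path.index(neighbor)
                cycles.append(path[idx:] + [neighbor])
        path.pop()
        rec_stack.remove(node)

    for node in graph:
        if node not in visited:
            dfs(node)
    return cycles

# "A" and "B" reference each other; "C" only points into the cycle.
demo = {"A": {"B"}, "B": {"A"}, "C": {"A"}}
found = find_cycles(demo)  # [['A', 'B', 'A']]
```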

get_all_tables

get_all_tables() -> list[DerivaTable]

Get all tables in configured schemas.

Returns:

Type Description
list[DerivaTable]

List of all Table objects.

Source code in src/deriva_ml/model/fk_orderer.py
def get_all_tables(self) -> list[DerivaTable]:
    """Get all tables in configured schemas.

    Returns:
        List of all Table objects.
    """
    tables = []
    for schema_name in self.schemas:
        if schema_name in self.model.schemas:
            tables.extend(self.model.schemas[schema_name].tables.values())
    return tables

get_deletion_order

get_deletion_order(
    tables: list[str | DerivaTable] | None = None,
    handle_cycles: bool = True,
) -> list[DerivaTable]

Compute FK-safe deletion order for the given tables.

Returns tables in reverse dependency order: tables that are referenced are deleted last.

Parameters:

Name Type Description Default
tables list[str | DerivaTable] | None

Tables to order. If None, orders all tables in schemas.

None
handle_cycles bool

If True, break cycles. If False, raise on cycles.

True

Returns:

Type Description
list[DerivaTable]

Ordered list of Table objects (delete from first to last).

Source code in src/deriva_ml/model/fk_orderer.py
def get_deletion_order(
    self,
    tables: list[str | DerivaTable] | None = None,
    handle_cycles: bool = True,
) -> list[DerivaTable]:
    """Compute FK-safe deletion order for the given tables.

    Returns tables in reverse dependency order - tables that are
    referenced should be deleted last.

    Args:
        tables: Tables to order. If None, orders all tables in schemas.
        handle_cycles: If True, break cycles. If False, raise on cycles.

    Returns:
        Ordered list of Table objects (delete from first to last).
    """
    insertion_order = self.get_insertion_order(tables, handle_cycles)
    return list(reversed(insertion_order))
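
Since deletion order is simply the reverse of insertion order, the relationship can be sketched with a plain dependency dict and the standard library's `TopologicalSorter` (table names are illustrative):

```python
from graphlib import TopologicalSorter

# node -> set of nodes it depends on (FK targets that must exist first)
graph = {"Subject": set(), "Image": {"Subject"}, "Diagnosis": {"Image"}}

# Insertion: referenced tables first; deletion: referencing tables first.
insertion_order = list(TopologicalSorter(graph).static_order())
deletion_order = list(reversed(insertion_order))
```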

get_dependencies

get_dependencies(
    table: str | DerivaTable,
) -> set[DerivaTable]

Get tables that this table depends on (FK targets).

Parameters:

Name Type Description Default
table str | DerivaTable

Table name or object.

required

Returns:

Type Description
set[DerivaTable]

Set of tables that must be populated before this table.

Source code in src/deriva_ml/model/fk_orderer.py
def get_dependencies(self, table: str | DerivaTable) -> set[DerivaTable]:
    """Get tables that this table depends on (FK targets).

    Args:
        table: Table name or object.

    Returns:
        Set of tables that must be populated before this table.
    """
    t = self._to_table(table)
    dependencies = set()

    for fk in t.foreign_keys:
        pk_table = fk.pk_table
        # Only include dependencies within our schemas
        if pk_table.schema.name in self.schemas:
            # Don't include self-references as dependencies
            if self._table_key(pk_table) != self._table_key(t):
                dependencies.add(pk_table)

    return dependencies

get_dependents

get_dependents(
    table: str | DerivaTable,
) -> set[DerivaTable]

Get tables that depend on this table (FK sources).

Parameters:

Name Type Description Default
table str | DerivaTable

Table name or object.

required

Returns:

Type Description
set[DerivaTable]

Set of tables that reference this table.

Source code in src/deriva_ml/model/fk_orderer.py
def get_dependents(self, table: str | DerivaTable) -> set[DerivaTable]:
    """Get tables that depend on this table (FK sources).

    Args:
        table: Table name or object.

    Returns:
        Set of tables that reference this table.
    """
    t = self._to_table(table)
    dependents = set()

    for schema_name in self.schemas:
        if schema_name not in self.model.schemas:
            continue

        for other_table in self.model.schemas[schema_name].tables.values():
            if self._table_key(other_table) == self._table_key(t):
                continue

            for fk in other_table.foreign_keys:
                if self._table_key(fk.pk_table) == self._table_key(t):
                    dependents.add(other_table)
                    break

    return dependents

get_insertion_order

get_insertion_order(
    tables: list[str | DerivaTable] | None = None,
    handle_cycles: bool = True,
) -> list[DerivaTable]

Compute FK-safe insertion order for the given tables.

Returns tables ordered so that all FK dependencies are satisfied when inserting in order.

Parameters:

Name Type Description Default
tables list[str | DerivaTable] | None

Tables to order. If None, orders all tables in schemas.

None
handle_cycles bool

If True, break cycles by removing edges. If False, raise CycleError on cycles.

True

Returns:

Type Description
list[DerivaTable]

Ordered list of Table objects (insert from first to last).

Raises:

Type Description
CycleError

If handle_cycles=False and cycles exist.

Source code in src/deriva_ml/model/fk_orderer.py
def get_insertion_order(
    self,
    tables: list[str | DerivaTable] | None = None,
    handle_cycles: bool = True,
) -> list[DerivaTable]:
    """Compute FK-safe insertion order for the given tables.

    Returns tables ordered so that all FK dependencies are satisfied
    when inserting in order.

    Args:
        tables: Tables to order. If None, orders all tables in schemas.
        handle_cycles: If True, break cycles by removing edges.
            If False, raise CycleError on cycles.

    Returns:
        Ordered list of Table objects (insert from first to last).

    Raises:
        CycleError: If handle_cycles=False and cycles exist.
    """
    graph = self._build_dependency_graph(tables)

    try:
        ts = TopologicalSorter(graph)
        ordered_keys = list(ts.static_order())
    except CycleError as e:
        if handle_cycles:
            ordered_keys = self._break_cycles_and_sort(graph, e)
        else:
            raise

    # Convert keys back to Table objects
    return [self._table_cache[key] for key in ordered_keys]
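
The core of this method is `graphlib` from the standard library; a minimal sketch of both paths (the clean sort and the `CycleError` case) on a plain dict, with illustrative table names:

```python
from graphlib import TopologicalSorter, CycleError

# node -> set of nodes it depends on
acyclic = {"Subject": set(), "Image": {"Subject"}}
order = list(TopologicalSorter(acyclic).static_order())  # dependencies first

# A mutual FK pair raises CycleError, which handle_cycles=True would
# recover from by removing an edge and re-sorting.
cyclic = {"A": {"B"}, "B": {"A"}}
try:
    list(TopologicalSorter(cyclic).static_order())
    raised = False
except CycleError:
    raised = True
```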

validate_insertion_order

validate_insertion_order(
    tables: list[str | DerivaTable],
) -> list[tuple[str, str, str]]

Validate that a list of tables can be inserted in order.

Checks each table to ensure all its FK dependencies are satisfied by tables earlier in the list.

Parameters:

Name Type Description Default
tables list[str | DerivaTable]

Ordered list of tables to validate.

required

Returns:

Type Description
list[tuple[str, str, str]]

List of (table, missing_dependency, fk_name) tuples for any unsatisfied dependencies. Empty list if valid.

Source code in src/deriva_ml/model/fk_orderer.py
def validate_insertion_order(
    self,
    tables: list[str | DerivaTable],
) -> list[tuple[str, str, str]]:
    """Validate that a list of tables can be inserted in order.

    Checks each table to ensure all its FK dependencies are
    satisfied by tables earlier in the list.

    Args:
        tables: Ordered list of tables to validate.

    Returns:
        List of (table, missing_dependency, fk_name) tuples for
        any unsatisfied dependencies. Empty list if valid.
    """
    table_objs = [self._to_table(t) for t in tables]
    seen_keys = set()
    violations = []

    for t in table_objs:
        key = self._table_key(t)

        for fk in t.foreign_keys:
            pk_key = self._table_key(fk.pk_table)
            # Skip self-references and tables not in our set
            if pk_key == key:
                continue
            if pk_key not in {self._table_key(x) for x in table_objs}:
                continue

            if pk_key not in seen_keys:
                violations.append((key, pk_key, fk.name[1]))

        seen_keys.add(key)

    return violations
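
The validation walk can be sketched over a plain FK map (names illustrative): a violation is recorded whenever a table appears before a dependency that is also in the list.

```python
# table -> set of tables it references via FK
fk_map = {"Subject": set(), "Image": {"Subject"}, "Diagnosis": {"Image"}}

def validate_order(order: list[str]) -> list[tuple[str, str]]:
    """Return (table, missing_dependency) pairs, mirroring the check above."""
    in_set = set(order)
    seen: set[str] = set()
    violations: list[tuple[str, str]] = []
    for table in order:
        for dep in fk_map.get(table, set()):
            # Only deps that are in the list but not yet inserted count.
            if dep in in_set and dep not in seen:
                violations.append((table, dep))
        seen.add(table)
    return violations

good = validate_order(["Subject", "Image"])      # []
bad = validate_order(["Image", "Subject"])       # Image before its dependency
```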

InboundFK dataclass

An inbound foreign key path step for pseudo-column source paths.

Use this when following a foreign key FROM another table TO the current table. This is common when counting or aggregating related records.

Parameters:

Name Type Description Default
schema str

Schema name containing the FK constraint

required
constraint str

Foreign key constraint name

required
Example

Count images related to a subject (Image has FK to Subject)::

>>> # In Subject table, count related images
>>> pc = PseudoColumn(
...     source=[InboundFK("domain", "Image_Subject_fkey"), "RID"],
...     aggregate=Aggregate.CNT,
...     markdown_name="Image Count"
... )
Source code in src/deriva_ml/model/annotations.py
@dataclass
class InboundFK:
    """An inbound foreign key path step for pseudo-column source paths.

    Use this when following a foreign key FROM another table TO the current table.
    This is common when counting or aggregating related records.

    Args:
        schema: Schema name containing the FK constraint
        constraint: Foreign key constraint name

    Example:
        Count images related to a subject (Image has FK to Subject)::

            >>> # In Subject table, count related images
            >>> pc = PseudoColumn(
            ...     source=[InboundFK("domain", "Image_Subject_fkey"), "RID"],
            ...     aggregate=Aggregate.CNT,
            ...     markdown_name="Image Count"
            ... )
    """
    schema: str
    constraint: str

    def to_dict(self) -> dict[str, list[str]]:
        return {"inbound": [self.schema, self.constraint]}
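
`to_dict` emits the ERMrest source-path step form; since the class body is shown in full above, the output can be checked standalone (the constraint name is illustrative):

```python
from dataclasses import dataclass

@dataclass
class InboundFK:
    schema: str
    constraint: str

    def to_dict(self) -> dict[str, list[str]]:
        return {"inbound": [self.schema, self.constraint]}

step = InboundFK("domain", "Image_Subject_fkey").to_dict()
# {'inbound': ['domain', 'Image_Subject_fkey']}
```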

NameStyle dataclass

Styling options for automatic display name formatting.

Applied to table or column names when no explicit display name is set.

Parameters:

Name Type Description Default
underline_space bool | None

Replace underscores with spaces (e.g., "First_Name" -> "First Name")

None
title_case bool | None

Apply title case formatting (e.g., "firstname" -> "Firstname")

None
markdown bool | None

Render the name as markdown

None
Example

>>> # Transform "Subject_ID" to "Subject Id" with title case
>>> display = Display(
...     name_style=NameStyle(underline_space=True, title_case=True)
... )

Source code in src/deriva_ml/model/annotations.py
@dataclass
class NameStyle:
    """Styling options for automatic display name formatting.

    Applied to table or column names when no explicit display name is set.

    Args:
        underline_space: Replace underscores with spaces (e.g., "First_Name" -> "First Name")
        title_case: Apply title case formatting (e.g., "firstname" -> "Firstname")
        markdown: Render the name as markdown

    Example:
        >>> # Transform "Subject_ID" to "Subject Id" with title case
        >>> display = Display(
        ...     name_style=NameStyle(underline_space=True, title_case=True)
        ... )
    """
    underline_space: bool | None = None
    title_case: bool | None = None
    markdown: bool | None = None

    def to_dict(self) -> dict[str, bool]:
        """Convert to dictionary, excluding None values."""
        result = {}
        if self.underline_space is not None:
            result["underline_space"] = self.underline_space
        if self.title_case is not None:
            result["title_case"] = self.title_case
        if self.markdown is not None:
            result["markdown"] = self.markdown
        return result

to_dict

to_dict() -> dict[str, bool]

Convert to dictionary, excluding None values.

Source code in src/deriva_ml/model/annotations.py
def to_dict(self) -> dict[str, bool]:
    """Convert to dictionary, excluding None values."""
    result = {}
    if self.underline_space is not None:
        result["underline_space"] = self.underline_space
    if self.title_case is not None:
        result["title_case"] = self.title_case
    if self.markdown is not None:
        result["markdown"] = self.markdown
    return result

OutboundFK dataclass

An outbound foreign key path step for pseudo-column source paths.

Use this when following a foreign key FROM the current table TO another table. This is common when displaying values from referenced tables.

Parameters:

Name Type Description Default
schema str

Schema name containing the FK constraint

required
constraint str

Foreign key constraint name

required
Example

Show species name from a related Species table::

>>> # Subject has FK to Species, display Species.Name
>>> pc = PseudoColumn(
...     source=[OutboundFK("domain", "Subject_Species_fkey"), "Name"],
...     markdown_name="Species"
... )

Chain multiple outbound FKs::

>>> # Image -> Subject -> Species
>>> pc = PseudoColumn(
...     source=[
...         OutboundFK("domain", "Image_Subject_fkey"),
...         OutboundFK("domain", "Subject_Species_fkey"),
...         "Name"
...     ],
...     markdown_name="Species"
... )
Source code in src/deriva_ml/model/annotations.py
@dataclass
class OutboundFK:
    """An outbound foreign key path step for pseudo-column source paths.

    Use this when following a foreign key FROM the current table TO another table.
    This is common when displaying values from referenced tables.

    Args:
        schema: Schema name containing the FK constraint
        constraint: Foreign key constraint name

    Example:
        Show species name from a related Species table::

            >>> # Subject has FK to Species, display Species.Name
            >>> pc = PseudoColumn(
            ...     source=[OutboundFK("domain", "Subject_Species_fkey"), "Name"],
            ...     markdown_name="Species"
            ... )

        Chain multiple outbound FKs::

            >>> # Image -> Subject -> Species
            >>> pc = PseudoColumn(
            ...     source=[
            ...         OutboundFK("domain", "Image_Subject_fkey"),
            ...         OutboundFK("domain", "Subject_Species_fkey"),
            ...         "Name"
            ...     ],
            ...     markdown_name="Species"
            ... )
    """
    schema: str
    constraint: str

    def to_dict(self) -> dict[str, list[str]]:
        return {"outbound": [self.schema, self.constraint]}
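
The only difference from InboundFK is the direction key in the emitted step; a standalone check with the class body above (constraint name illustrative):

```python
from dataclasses import dataclass

@dataclass
class OutboundFK:
    schema: str
    constraint: str

    def to_dict(self) -> dict[str, list[str]]:
        return {"outbound": [self.schema, self.constraint]}

step = OutboundFK("domain", "Subject_Species_fkey").to_dict()
# {'outbound': ['domain', 'Subject_Species_fkey']}
```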

PreFormat dataclass

Pre-formatting options for column values.

Parameters:

Name Type Description Default
format str | None

Printf-style format string (e.g., "%.2f")

None
bool_true_value str | None

Display value for True

None
bool_false_value str | None

Display value for False

None
Source code in src/deriva_ml/model/annotations.py
@dataclass
class PreFormat:
    """Pre-formatting options for column values.

    Args:
        format: Printf-style format string (e.g., "%.2f")
        bool_true_value: Display value for True
        bool_false_value: Display value for False
    """
    format: str | None = None
    bool_true_value: str | None = None
    bool_false_value: str | None = None

    def to_dict(self) -> dict[str, Any]:
        result = {}
        if self.format is not None:
            result["format"] = self.format
        if self.bool_true_value is not None:
            result["bool_true_value"] = self.bool_true_value
        if self.bool_false_value is not None:
            result["bool_false_value"] = self.bool_false_value
        return result

PseudoColumn dataclass

A pseudo-column definition for visible columns and foreign keys.

Pseudo-columns display computed values, values from related tables, or custom markdown patterns. They appear as columns in table views but are not actual database columns.

Parameters:

Name Type Description Default
source str | list[str | InboundFK | OutboundFK] | None

Path to source data. Either a column name (string), or a list of FK path steps ending with a column name

None
sourcekey str | None

Reference to a named source in source-definitions annotation

None
markdown_name str | None

Display name for the column (supports markdown)

None
comment str | Literal[False] | None

Description/tooltip text (or False to hide)

None
entity bool | None

Whether this represents an entity (affects rendering)

None
aggregate Aggregate | None

Aggregation function when source returns multiple values

None
self_link bool | None

Make the value a link to the current row

None
display PseudoColumnDisplay | None

Display formatting options

None
array_options dict[str, Any] | None

Options for array aggregates (max_length, order)

None
Note

source and sourcekey are mutually exclusive. Use source for inline definitions, sourcekey to reference pre-defined sources.

Raises:

Type Description
ValueError

If both source and sourcekey are provided

Example

Simple column with custom display name::

>>> PseudoColumn(source="Internal_ID", markdown_name="ID")

Outbound FK traversal (display value from referenced table)::

>>> # Subject has FK to Species - show Species.Name
>>> PseudoColumn(
...     source=[OutboundFK("domain", "Subject_Species_fkey"), "Name"],
...     markdown_name="Species"
... )

Inbound FK with aggregation (count related records)::

>>> # Count images pointing to this subject
>>> PseudoColumn(
...     source=[InboundFK("domain", "Image_Subject_fkey"), "RID"],
...     aggregate=Aggregate.CNT,
...     markdown_name="Images"
... )

Multi-hop FK path::

>>> # Image -> Subject -> Species
>>> PseudoColumn(
...     source=[
...         OutboundFK("domain", "Image_Subject_fkey"),
...         OutboundFK("domain", "Subject_Species_fkey"),
...         "Name"
...     ],
...     markdown_name="Species"
... )

With custom display formatting::

>>> PseudoColumn(
...     source="URL",
...     display=PseudoColumnDisplay(
...         markdown_pattern="[Download]({{{_value}}})",
...         show_foreign_key_link=False
...     )
... )

Array aggregate with display options::

>>> PseudoColumn(
...     source=[InboundFK("domain", "Tag_Item_fkey"), "Name"],
...     aggregate=Aggregate.ARRAY_D,
...     display=PseudoColumnDisplay(array_ux_mode=ArrayUxMode.CSV),
...     markdown_name="Tags"
... )
Source code in src/deriva_ml/model/annotations.py
@dataclass
class PseudoColumn:
    """A pseudo-column definition for visible columns and foreign keys.

    Pseudo-columns display computed values, values from related tables,
    or custom markdown patterns. They appear as columns in table views
    but are not actual database columns.

    Args:
        source: Path to source data. Can be:
            - A column name (string)
            - A list of FK path steps ending with a column name
        sourcekey: Reference to a named source in source-definitions annotation
        markdown_name: Display name for the column (supports markdown)
        comment: Description/tooltip text (or False to hide)
        entity: Whether this represents an entity (affects rendering)
        aggregate: Aggregation function when source returns multiple values
        self_link: Make the value a link to the current row
        display: Display formatting options
        array_options: Options for array aggregates (max_length, order)

    Note:
        source and sourcekey are mutually exclusive. Use source for inline
        definitions, sourcekey to reference pre-defined sources.

    Raises:
        ValueError: If both source and sourcekey are provided

    Example:
        Simple column with custom display name::

            >>> PseudoColumn(source="Internal_ID", markdown_name="ID")

        Outbound FK traversal (display value from referenced table)::

            >>> # Subject has FK to Species - show Species.Name
            >>> PseudoColumn(
            ...     source=[OutboundFK("domain", "Subject_Species_fkey"), "Name"],
            ...     markdown_name="Species"
            ... )

        Inbound FK with aggregation (count related records)::

            >>> # Count images pointing to this subject
            >>> PseudoColumn(
            ...     source=[InboundFK("domain", "Image_Subject_fkey"), "RID"],
            ...     aggregate=Aggregate.CNT,
            ...     markdown_name="Images"
            ... )

        Multi-hop FK path::

            >>> # Image -> Subject -> Species
            >>> PseudoColumn(
            ...     source=[
            ...         OutboundFK("domain", "Image_Subject_fkey"),
            ...         OutboundFK("domain", "Subject_Species_fkey"),
            ...         "Name"
            ...     ],
            ...     markdown_name="Species"
            ... )

        With custom display formatting::

            >>> PseudoColumn(
            ...     source="URL",
            ...     display=PseudoColumnDisplay(
            ...         markdown_pattern="[Download]({{{_value}}})",
            ...         show_foreign_key_link=False
            ...     )
            ... )

        Array aggregate with display options::

            >>> PseudoColumn(
            ...     source=[InboundFK("domain", "Tag_Item_fkey"), "Name"],
            ...     aggregate=Aggregate.ARRAY_D,
            ...     display=PseudoColumnDisplay(array_ux_mode=ArrayUxMode.CSV),
            ...     markdown_name="Tags"
            ... )
    """
    source: str | list[str | InboundFK | OutboundFK] | None = None
    sourcekey: str | None = None
    markdown_name: str | None = None
    comment: str | Literal[False] | None = None
    entity: bool | None = None
    aggregate: Aggregate | None = None
    self_link: bool | None = None
    display: PseudoColumnDisplay | None = None
    array_options: dict[str, Any] | None = None  # Can be complex

    def __post_init__(self):
        if self.source is not None and self.sourcekey is not None:
            raise ValueError("source and sourcekey are mutually exclusive")

    def to_dict(self) -> dict[str, Any]:
        result = {}

        if self.source is not None:
            if isinstance(self.source, str):
                result["source"] = self.source
            else:
                # Convert path elements
                result["source"] = [
                    item.to_dict() if hasattr(item, "to_dict") else item
                    for item in self.source
                ]

        if self.sourcekey is not None:
            result["sourcekey"] = self.sourcekey
        if self.markdown_name is not None:
            result["markdown_name"] = self.markdown_name
        if self.comment is not None:
            result["comment"] = self.comment
        if self.entity is not None:
            result["entity"] = self.entity
        if self.aggregate is not None:
            result["aggregate"] = self.aggregate.value
        if self.self_link is not None:
            result["self_link"] = self.self_link
        if self.display is not None:
            result["display"] = self.display.to_dict()
        if self.array_options is not None:
            result["array_options"] = self.array_options

        return result
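
The serialized form mirrors ERMrest's source-path syntax. As a rough standalone sketch (assuming `InboundFK` serializes to an `{"inbound": [schema, constraint]}` pair and `Aggregate.CNT` to `"cnt"`; `pseudo_column_dict` is an illustrative helper, not part of the API), the "Images" count example above would produce a dict like this:

```python
def pseudo_column_dict(source, aggregate=None, markdown_name=None):
    """Assemble a pseudo-column annotation fragment from plain values,
    omitting keys that were not set (mirrors PseudoColumn.to_dict)."""
    result = {"source": source}
    if aggregate is not None:
        result["aggregate"] = aggregate
    if markdown_name is not None:
        result["markdown_name"] = markdown_name
    return result

# Count images pointing at this subject, rendered as plain dicts.
images = pseudo_column_dict(
    source=[{"inbound": ["domain", "Image_Subject_fkey"]}, "RID"],
    aggregate="cnt",
    markdown_name="Images",
)
print(images)
```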

PseudoColumnDisplay dataclass

Display options for a pseudo-column.

Parameters:

- markdown_pattern (str | None): Handlebars/mustache template. Default: None
- template_engine (TemplateEngine | None): Template engine to use. Default: None
- show_foreign_key_link (bool | None): Show as clickable link. Default: None
- array_ux_mode (ArrayUxMode | None): How to render array values. Default: None
- column_order (list[SortKey] | Literal[False] | None): Sort order for the column, or False to disable. Default: None
- wait_for (list[str] | None): Template variables to wait for before rendering. Default: None
Source code in src/deriva_ml/model/annotations.py
@dataclass
class PseudoColumnDisplay:
    """Display options for a pseudo-column.

    Args:
        markdown_pattern: Handlebars/mustache template
        template_engine: Template engine to use
        show_foreign_key_link: Show as clickable link
        array_ux_mode: How to render array values
        column_order: Sort order for the column, or False to disable
        wait_for: Template variables to wait for before rendering
    """
    markdown_pattern: str | None = None
    template_engine: TemplateEngine | None = None
    show_foreign_key_link: bool | None = None
    array_ux_mode: ArrayUxMode | None = None
    column_order: list[SortKey] | Literal[False] | None = None
    wait_for: list[str] | None = None

    def to_dict(self) -> dict[str, Any]:
        result = {}
        if self.markdown_pattern is not None:
            result["markdown_pattern"] = self.markdown_pattern
        if self.template_engine is not None:
            result["template_engine"] = self.template_engine.value
        if self.show_foreign_key_link is not None:
            result["show_foreign_key_link"] = self.show_foreign_key_link
        if self.array_ux_mode is not None:
            result["array_ux_mode"] = self.array_ux_mode.value
        if self.column_order is not None:
            if self.column_order is False:
                result["column_order"] = False
            else:
                result["column_order"] = [
                    k.to_dict() if isinstance(k, SortKey) else k
                    for k in self.column_order
                ]
        if self.wait_for is not None:
            result["wait_for"] = self.wait_for
        return result
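
One subtlety in `to_dict` above: `column_order=False` is meaningful (it disables sorting), so the code tests `is not None` before `is False` rather than relying on truthiness. A minimal standalone sketch of that rule (the helper name is illustrative, not part of the API):

```python
def serialize_column_order(column_order):
    """Serialize a column_order value the way PseudoColumnDisplay.to_dict does."""
    if column_order is None:
        return {}                       # unset: omit the key entirely
    if column_order is False:
        return {"column_order": False}  # explicit "disable sorting"
    return {"column_order": list(column_order)}

print(serialize_column_order(None))
print(serialize_column_order(False))
print(serialize_column_order(["RCT"]))
```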

SchemaBuilder

Creates SQLAlchemy ORM from a Deriva catalog model.

Phase 1 of the two-phase database creation pattern. This class handles only schema/ORM creation - no data loading.

The Model can come from either a live catalog or a schema.json file:

- From catalog: model = catalog.getCatalogModel()
- From file: model = Model.fromfile("file-system", "path/to/schema.json")

Example

    # Create ORM from catalog model
    model = catalog.getCatalogModel()
    builder = SchemaBuilder(model, schemas=['domain', 'deriva-ml'])
    orm = builder.build()

    # Create ORM from schema file
    model = Model.fromfile("file-system", "schema.json")
    builder = SchemaBuilder(model, schemas=['domain'], database_path="local.db")
    orm = builder.build()

    # Use the ORM
    ImageClass = orm.get_orm_class("Image")
    with Session(orm.engine) as session:
        images = session.query(ImageClass).all()

    # Clean up
    orm.dispose()

Source code in src/deriva_ml/model/schema_builder.py
class SchemaBuilder:
    """Creates SQLAlchemy ORM from a Deriva catalog model.

    Phase 1 of the two-phase database creation pattern. This class handles
    only schema/ORM creation - no data loading.

    The Model can come from either a live catalog or a schema.json file:
    - From catalog: model = catalog.getCatalogModel()
    - From file: model = Model.fromfile("file-system", "path/to/schema.json")

    Example:
        # Create ORM from catalog model
        model = catalog.getCatalogModel()
        builder = SchemaBuilder(model, schemas=['domain', 'deriva-ml'])
        orm = builder.build()

        # Create ORM from schema file
        model = Model.fromfile("file-system", "schema.json")
        builder = SchemaBuilder(model, schemas=['domain'], database_path="local.db")
        orm = builder.build()

        # Use the ORM
        ImageClass = orm.get_orm_class("Image")
        with Session(orm.engine) as session:
            images = session.query(ImageClass).all()

        # Clean up
        orm.dispose()
    """

    # Type mapping from ERMrest to SQLAlchemy
    _TYPE_MAP = {
        "boolean": ERMRestBoolean,
        "date": StringToDate,
        "float4": StringToFloat,
        "float8": StringToFloat,
        "int2": StringToInteger,
        "int4": StringToInteger,
        "int8": StringToInteger,
        "json": JSON,
        "jsonb": JSON,
        "timestamptz": StringToDateTime,
        "timestamp": StringToDateTime,
    }

    def __init__(
        self,
        model: Model,
        schemas: list[str],
        database_path: Path | str = ":memory:",
    ):
        """Initialize the schema builder.

        Args:
            model: ERMrest Model object (from catalog or schema.json file).
            schemas: List of schema names to include in the ORM.
            database_path: Path to SQLite database file. Use ":memory:" for
                in-memory database (default). If a Path or string is provided,
                separate .db files will be created for each schema.
        """
        self.model = model
        self.schemas = schemas
        self.database_path = Path(database_path) if database_path != ":memory:" else database_path

        # Will be set during build()
        self.engine: Engine | None = None
        self.metadata: MetaData | None = None
        self.Base: AutomapBase | None = None
        self._class_prefix: str = ""

    @staticmethod
    def _sql_type(deriva_type: DerivaType) -> TypeEngine:
        """Map ERMrest type to SQLAlchemy type with CSV string conversion.

        Args:
            deriva_type: ERMrest type object.

        Returns:
            SQLAlchemy type class.
        """
        return SchemaBuilder._TYPE_MAP.get(deriva_type.typename, String)

    def _is_key_column(self, column: DerivaColumn, table: DerivaTable) -> bool:
        """Check if column is the primary key (RID)."""
        return column in [key.unique_columns[0] for key in table.keys] and column.name == "RID"

    def build(self) -> SchemaORM:
        """Build the SQLAlchemy ORM structure.

        Creates SQLite tables from the ERMrest schema and generates
        ORM classes via SQLAlchemy automap.

        Returns:
            SchemaORM object containing engine, metadata, Base, and utilities.

        Note:
            In-memory databases (database_path=":memory:") do not support
            SQLite schema attachments, so all tables will be created in a
            single database without schema prefixes in table names.
        """
        # Create unique prefix for ORM class names
        self._class_prefix = f"_{id(self)}_"

        # Determine if we're using in-memory or file-based database
        self._use_schemas = self.database_path != ":memory:"

        # Create engine
        if self.database_path == ":memory:":
            self.engine = create_engine("sqlite:///:memory:", future=True)
        else:
            # Ensure the database path exists
            if isinstance(self.database_path, Path):
                if self.database_path.suffix == ".db":
                    # Single file path
                    self.database_path.parent.mkdir(parents=True, exist_ok=True)
                    main_db = self.database_path
                else:
                    # Directory path
                    self.database_path.mkdir(parents=True, exist_ok=True)
                    main_db = self.database_path / "main.db"
            else:
                main_db = Path(self.database_path)
                main_db.parent.mkdir(parents=True, exist_ok=True)

            self.engine = create_engine(f"sqlite:///{main_db.resolve()}", future=True)

            # Attach schema-specific databases
            event.listen(self.engine, "connect", self._attach_schemas)

        self.metadata = MetaData()
        self.Base = automap_base(metadata=self.metadata)

        # Build the schema
        self._create_tables()

        logger.info(
            "Built ORM for schemas %s with %d tables",
            self.schemas,
            len(self.metadata.tables),
        )

        return SchemaORM(
            engine=self.engine,
            metadata=self.metadata,
            Base=self.Base,
            model=self.model,
            schemas=self.schemas,
            class_prefix=self._class_prefix,
            use_schemas=self._use_schemas,
        )

    def _attach_schemas(self, dbapi_conn, _conn_record):
        """Attach schema-specific SQLite databases."""
        cur = dbapi_conn.cursor()
        db_dir = self.database_path if self.database_path.is_dir() else self.database_path.parent
        for schema in self.schemas:
            schema_file = (db_dir / f"{schema}.db").resolve()
            cur.execute(f"ATTACH DATABASE '{schema_file}' AS '{schema}'")
        cur.close()

    def _create_tables(self) -> None:
        """Create SQLite tables from the ERMrest schema."""

        def col(model, name: str):
            """Get column from ORM class, handling both attribute and table column access."""
            try:
                return getattr(model, name).property.columns[0]
            except AttributeError:
                return model.__table__.c[name]

        def guess_attr_name(col_name: str) -> str:
            """Generate relationship attribute name from column name."""
            return col_name[:-3] if col_name.lower().endswith("_id") else col_name

        def make_table_name(schema_name: str, table_name: str) -> str:
            """Generate table name, including schema prefix if using schemas."""
            if self._use_schemas:
                return f"{schema_name}.{table_name}"
            else:
                # For in-memory, use underscore separator to avoid conflicts
                return f"{schema_name}_{table_name}"

        database_tables: list[SQLTable] = []

        for schema_name in self.schemas:
            if schema_name not in self.model.schemas:
                logger.warning(f"Schema {schema_name} not found in model")
                continue

            for table in self.model.schemas[schema_name].tables.values():
                database_columns: list[SQLColumn] = []

                for c in table.columns:
                    database_column = SQLColumn(
                        name=c.name,
                        type_=self._sql_type(c.type),
                        comment=c.comment,
                        default=c.default,
                        primary_key=self._is_key_column(c, table),
                        nullable=c.nullok,
                    )
                    database_columns.append(database_column)

                # Use schema prefix only for file-based databases
                if self._use_schemas:
                    database_table = SQLTable(
                        table.name, self.metadata, *database_columns, schema=schema_name
                    )
                else:
                    # For in-memory, embed schema in table name
                    full_name = f"{schema_name}_{table.name}".replace("-", "_")
                    database_table = SQLTable(
                        full_name, self.metadata, *database_columns
                    )

                # Add unique constraints
                for key in table.keys:
                    key_columns = [c.name for c in key.unique_columns]
                    database_table.append_constraint(
                        SQLUniqueConstraint(*key_columns, name=key.name[1])
                    )

                # Add foreign key constraints (within same schema only for now)
                for fk in table.foreign_keys:
                    if fk.pk_table.schema.name not in self.schemas:
                        continue
                    if fk.pk_table.schema.name != schema_name:
                        continue

                    # Build reference column names
                    if self._use_schemas:
                        refcols = [
                            f"{schema_name}.{c.table.name}.{c.name}"
                            for c in fk.referenced_columns
                        ]
                    else:
                        # For in-memory, use the embedded schema name
                        ref_table_name = f"{schema_name}_{fk.pk_table.name}".replace("-", "_")
                        refcols = [
                            f"{ref_table_name}.{c.name}"
                            for c in fk.referenced_columns
                        ]

                    database_table.append_constraint(
                        SQLForeignKeyConstraint(
                            columns=[f"{c.name}" for c in fk.foreign_key_columns],
                            refcolumns=refcols,
                            name=fk.name[1],
                            comment=fk.comment,
                        )
                    )

                database_tables.append(database_table)

        # Create all tables
        with self.engine.begin() as conn:
            self.metadata.create_all(conn, tables=database_tables, checkfirst=True)

        # Configure ORM class naming
        def name_for_scalar_relationship(_base, local_cls, referred_cls, constraint):
            cols = list(constraint.columns) if constraint is not None else []
            if len(cols) == 1:
                name = cols[0].key
                if name in {c.key for c in local_cls.__table__.columns}:
                    name += "_rel"
                return name
            return constraint.name or referred_cls.__name__.lower()

        def name_for_collection_relationship(_base, local_cls, referred_cls, constraint):
            backref_name = constraint.name.replace("_fkey", "_collection")
            return backref_name or (referred_cls.__name__.lower() + "_collection")

        def classname_for_table(_base, tablename, table):
            return self._class_prefix + tablename.replace(".", "_").replace("-", "_")

        # Build ORM mappings
        self.Base.prepare(
            self.engine,
            name_for_scalar_relationship=name_for_scalar_relationship,
            name_for_collection_relationship=name_for_collection_relationship,
            classname_for_table=classname_for_table,
            reflect=True,
        )

        # Add cross-schema relationships
        for schema_name in self.schemas:
            if schema_name not in self.model.schemas:
                continue

            for table in self.model.schemas[schema_name].tables.values():
                for fk in table.foreign_keys:
                    if fk.pk_table.schema.name not in self.schemas:
                        continue
                    if fk.pk_table.schema.name == schema_name:
                        continue

                    table_name = make_table_name(schema_name, table.name)
                    table_class = self._get_orm_class_by_name(table_name)
                    foreign_key_column_name = fk.foreign_key_columns[0].name
                    foreign_key_column = col(table_class, foreign_key_column_name)

                    referenced_table_name = make_table_name(fk.pk_table.schema.name, fk.pk_table.name)
                    referenced_class = self._get_orm_class_by_name(referenced_table_name)
                    referenced_column = col(referenced_class, fk.referenced_columns[0].name)

                    relationship_attr = guess_attr_name(foreign_key_column_name)
                    backref_attr = fk.name[1].replace("_fkey", "_collection")

                    # Check if relationship already exists
                    existing_attr = getattr(table_class, relationship_attr, None)
                    from sqlalchemy.orm import RelationshipProperty
                    from sqlalchemy.orm.attributes import InstrumentedAttribute

                    is_relationship = isinstance(existing_attr, InstrumentedAttribute) and isinstance(
                        existing_attr.property, RelationshipProperty
                    )
                    if not is_relationship:
                        setattr(
                            table_class,
                            relationship_attr,
                            relationship(
                                referenced_class,
                                foreign_keys=[foreign_key_column],
                                primaryjoin=foreign(foreign_key_column) == referenced_column,
                                backref=backref(backref_attr, viewonly=True),
                                viewonly=True,
                            ),
                        )

        # Configure mappers
        self.Base.registry.configure()

    def _get_orm_class_by_name(self, table_name: str) -> Any | None:
        """Get ORM class by table name (internal use during build).

        Handles both schema.table format (file-based) and schema_table format (in-memory).
        """
        # Try exact match first
        if table_name in self.metadata.tables:
            sql_table = self.metadata.tables[table_name]
        else:
            # For in-memory databases, table names use underscore separator
            # Try converting schema.table to schema_table format
            if "." in table_name and not self._use_schemas:
                converted_name = table_name.replace(".", "_").replace("-", "_")
                if converted_name in self.metadata.tables:
                    sql_table = self.metadata.tables[converted_name]
                else:
                    sql_table = None
            else:
                # Try matching just the table name part
                sql_table = None
                for full_name, table in self.metadata.tables.items():
                    # Handle both . and _ separators
                    table_part = full_name.split(".")[-1] if "." in full_name else full_name.split("_", 1)[-1] if "_" in full_name else full_name
                    if table_part == table_name or full_name.endswith(f"_{table_name}"):
                        sql_table = table
                        break

        if sql_table is None:
            raise KeyError(f"Table {table_name} not found")

        for mapper in self.Base.registry.mappers:
            if mapper.persist_selectable is sql_table or sql_table in mapper.tables:
                return mapper.class_
        return None
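
The naming rules applied in `_create_tables` can be shown in isolation. A small sketch (the standalone function names here are illustrative, not part of the public API): file-based databases keep a real `schema.table` name, in-memory databases embed the schema with an underscore, and ORM class names fold `.` and `-` into `_` so the result is a valid Python identifier:

```python
def make_table_name(schema_name, table_name, use_schemas):
    # File-based databases use attached-schema prefixes; in-memory
    # databases embed the schema name with an underscore instead.
    if use_schemas:
        return f"{schema_name}.{table_name}"
    return f"{schema_name}_{table_name}"

def classname_for_table(prefix, tablename):
    # SQLite- and Python-unfriendly characters are folded into
    # underscores to produce a valid class name.
    return prefix + tablename.replace(".", "_").replace("-", "_")

print(make_table_name("deriva-ml", "Dataset", use_schemas=True))   # deriva-ml.Dataset
print(make_table_name("deriva-ml", "Dataset", use_schemas=False))  # deriva-ml_Dataset
print(classname_for_table("_1_", "deriva-ml.Dataset"))             # _1_deriva_ml_Dataset
```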

__init__

__init__(
    model: Model,
    schemas: list[str],
    database_path: Path | str = ":memory:",
)

Initialize the schema builder.

Parameters:

- model (Model): ERMrest Model object (from catalog or schema.json file). Required.
- schemas (list[str]): List of schema names to include in the ORM. Required.
- database_path (Path | str): Path to SQLite database file. Use ":memory:" for in-memory database (default). If a Path or string is provided, separate .db files will be created for each schema. Default: ":memory:"
Source code in src/deriva_ml/model/schema_builder.py
def __init__(
    self,
    model: Model,
    schemas: list[str],
    database_path: Path | str = ":memory:",
):
    """Initialize the schema builder.

    Args:
        model: ERMrest Model object (from catalog or schema.json file).
        schemas: List of schema names to include in the ORM.
        database_path: Path to SQLite database file. Use ":memory:" for
            in-memory database (default). If a Path or string is provided,
            separate .db files will be created for each schema.
    """
    self.model = model
    self.schemas = schemas
    self.database_path = Path(database_path) if database_path != ":memory:" else database_path

    # Will be set during build()
    self.engine: Engine | None = None
    self.metadata: MetaData | None = None
    self.Base: AutomapBase | None = None
    self._class_prefix: str = ""

build

build() -> SchemaORM

Build the SQLAlchemy ORM structure.

Creates SQLite tables from the ERMrest schema and generates ORM classes via SQLAlchemy automap.

Returns:

- SchemaORM: SchemaORM object containing engine, metadata, Base, and utilities.

Note

In-memory databases (database_path=":memory:") do not support SQLite schema attachments, so all tables will be created in a single database without schema prefixes in table names.

Source code in src/deriva_ml/model/schema_builder.py
def build(self) -> SchemaORM:
    """Build the SQLAlchemy ORM structure.

    Creates SQLite tables from the ERMrest schema and generates
    ORM classes via SQLAlchemy automap.

    Returns:
        SchemaORM object containing engine, metadata, Base, and utilities.

    Note:
        In-memory databases (database_path=":memory:") do not support
        SQLite schema attachments, so all tables will be created in a
        single database without schema prefixes in table names.
    """
    # Create unique prefix for ORM class names
    self._class_prefix = f"_{id(self)}_"

    # Determine if we're using in-memory or file-based database
    self._use_schemas = self.database_path != ":memory:"

    # Create engine
    if self.database_path == ":memory:":
        self.engine = create_engine("sqlite:///:memory:", future=True)
    else:
        # Ensure the database path exists
        if isinstance(self.database_path, Path):
            if self.database_path.suffix == ".db":
                # Single file path
                self.database_path.parent.mkdir(parents=True, exist_ok=True)
                main_db = self.database_path
            else:
                # Directory path
                self.database_path.mkdir(parents=True, exist_ok=True)
                main_db = self.database_path / "main.db"
        else:
            main_db = Path(self.database_path)
            main_db.parent.mkdir(parents=True, exist_ok=True)

        self.engine = create_engine(f"sqlite:///{main_db.resolve()}", future=True)

        # Attach schema-specific databases
        event.listen(self.engine, "connect", self._attach_schemas)

    self.metadata = MetaData()
    self.Base = automap_base(metadata=self.metadata)

    # Build the schema
    self._create_tables()

    logger.info(
        "Built ORM for schemas %s with %d tables",
        self.schemas,
        len(self.metadata.tables),
    )

    return SchemaORM(
        engine=self.engine,
        metadata=self.metadata,
        Base=self.Base,
        model=self.model,
        schemas=self.schemas,
        class_prefix=self._class_prefix,
        use_schemas=self._use_schemas,
    )

SchemaORM

Container for SQLAlchemy ORM components.

Provides access to the ORM structure and utility methods for table/class lookup. This is the result of Phase 1 (SchemaBuilder).

Attributes:

- engine: SQLAlchemy Engine for database connections.
- metadata: SQLAlchemy MetaData with table definitions.
- Base: SQLAlchemy automap base for ORM classes.
- model: ERMrest Model the ORM was built from.
- schemas: List of schema names included.
- use_schemas: Whether schema prefixes are used (False for in-memory).
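
`find_table` (shown in the source below) resolves a name in three steps: exact match, dotted-to-underscore conversion for in-memory databases, then a suffix match on the bare table name. A simplified standalone sketch of that lookup order, operating on a plain dict instead of SQLAlchemy MetaData:

```python
def find_table(tables, name, use_schemas):
    """Resolve a table name against a {full_name: table} mapping,
    mimicking SchemaORM.find_table's lookup order (simplified)."""
    # 1. Exact match on the fully qualified name.
    if name in tables:
        return tables[name]
    # 2. In-memory databases store "schema_table", so convert "schema.table".
    if "." in name and not use_schemas:
        converted = name.replace(".", "_").replace("-", "_")
        if converted in tables:
            return tables[converted]
    # 3. Fall back to matching the bare table name after the schema prefix.
    for full, table in tables.items():
        if "." in full and full.split(".")[-1] == name:
            return table
        if "." not in full and full.split("_", 1)[-1] == name:
            return table
    raise KeyError(f"Table {name} not found")

tables = {"domain_Image": "image-table", "deriva_ml_Dataset": "dataset-table"}
print(find_table(tables, "domain.Image", use_schemas=False))  # image-table
print(find_table(tables, "Image", use_schemas=False))         # image-table
```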

Source code in src/deriva_ml/model/schema_builder.py
class SchemaORM:
    """Container for SQLAlchemy ORM components.

    Provides access to the ORM structure and utility methods for
    table/class lookup. This is the result of Phase 1 (SchemaBuilder).

    Attributes:
        engine: SQLAlchemy Engine for database connections.
        metadata: SQLAlchemy MetaData with table definitions.
        Base: SQLAlchemy automap base for ORM classes.
        model: ERMrest Model the ORM was built from.
        schemas: List of schema names included.
        use_schemas: Whether schema prefixes are used (False for in-memory).
    """

    def __init__(
        self,
        engine: Engine,
        metadata: MetaData,
        Base: AutomapBase,
        model: Model,
        schemas: list[str],
        class_prefix: str,
        use_schemas: bool = True,
    ):
        """Initialize SchemaORM container.

        Args:
            engine: SQLAlchemy Engine.
            metadata: SQLAlchemy MetaData with tables.
            Base: Automap base with ORM classes.
            model: Source ERMrest Model.
            schemas: Schemas that were included.
            class_prefix: Prefix used for ORM class names.
            use_schemas: Whether schema prefixes are used (False for in-memory).
        """
        self.engine = engine
        self.metadata = metadata
        self.Base = Base
        self.model = model
        self.schemas = schemas
        self._class_prefix = class_prefix
        self._use_schemas = use_schemas
        self._disposed = False

    def list_tables(self) -> list[str]:
        """List all tables in the database.

        Returns:
            List of fully-qualified table names (schema.table), sorted.
        """
        tables = list(self.metadata.tables.keys())
        tables.sort()
        return tables

    def find_table(self, table_name: str) -> SQLTable:
        """Find a table by name.

        Handles both schema.table format and schema_table format (for in-memory databases).

        Args:
            table_name: Table name, with or without schema prefix.
                Can be "schema.table", "schema_table", or just "table".

        Returns:
            SQLAlchemy Table object.

        Raises:
            KeyError: If table not found.
        """
        # Try exact match first
        if table_name in self.metadata.tables:
            return self.metadata.tables[table_name]

        # Try converting schema.table to schema_table format (for in-memory)
        if "." in table_name and not self._use_schemas:
            converted_name = table_name.replace(".", "_").replace("-", "_")
            if converted_name in self.metadata.tables:
                return self.metadata.tables[converted_name]

        # Try matching just the table name part
        for full_name, table in self.metadata.tables.items():
            # Handle . separator (file-based)
            if "." in full_name and full_name.split(".")[-1] == table_name:
                return table
            # Handle _ separator (in-memory) - match suffix after first _
            if "_" in full_name and "." not in full_name:
                # Check if table_name matches the part after schema prefix
                parts = full_name.split("_", 1)
                if len(parts) > 1 and parts[1] == table_name:
                    return table
                # Also check if it ends with the table name
                if full_name.endswith(f"_{table_name}"):
                    return table

        raise KeyError(f"Table {table_name} not found")

    def get_orm_class(self, table_name: str) -> Any | None:
        """Get the ORM class for a table by name.

        Args:
            table_name: Table name, with or without schema prefix.

        Returns:
            SQLAlchemy ORM class for the table.

        Raises:
            KeyError: If table not found.
        """
        sql_table = self.find_table(table_name)
        return self.get_orm_class_for_table(sql_table)

    def get_orm_class_for_table(self, table: SQLTable | DerivaTable | str) -> Any | None:
        """Get the ORM class for a table.

        Args:
            table: SQLAlchemy Table, Deriva Table, or table name.

        Returns:
            SQLAlchemy ORM class, or None if not found.
        """
        if isinstance(table, DerivaTable):
            # Try schema.table format first (file-based), then schema_table (in-memory)
            table_key = f"{table.schema.name}.{table.name}"
            table = self.metadata.tables.get(table_key)
            if table is None and not self._use_schemas:
                # Try underscore format for in-memory databases
                table_key = f"{table.schema.name}_{table.name}".replace("-", "_")
                table = self.metadata.tables.get(table_key)
        if isinstance(table, str):
            table = self.find_table(table)
        if table is None:
            return None

        for mapper in self.Base.registry.mappers:
            if mapper.persist_selectable is table or table in mapper.tables:
                return mapper.class_
        return None

    def get_table_contents(self, table: str) -> Generator[dict[str, Any], None, None]:
        """Retrieve all rows from a table as dictionaries.

        Args:
            table: Table name (with or without schema prefix).

        Yields:
            Dictionary for each row with column names as keys.
        """
        sql_table = self.find_table(table)
        with self.engine.connect() as conn:
            result = conn.execute(select(sql_table))
            for row in result.mappings():
                yield dict(row)

    @staticmethod
    def is_association_table(
        table_class,
        min_arity: int = 2,
        max_arity: int = 2,
        unqualified: bool = True,
        pure: bool = True,
        no_overlap: bool = True,
        return_fkeys: bool = False,
    ):
        """Check if an ORM class represents an association table.

        An association table links two or more tables through foreign keys,
        with a composite unique key covering those foreign keys.

        Args:
            table_class: SQLAlchemy ORM class to check.
            min_arity: Minimum number of foreign keys (default 2).
            max_arity: Maximum number of foreign keys (default 2).
            unqualified: If True, reject associations with extra key columns.
            pure: If True, reject associations with extra non-key columns.
            no_overlap: If True, reject associations with shared FK columns.
            return_fkeys: If True, return the foreign keys instead of arity.

        Returns:
            If return_fkeys=False: Integer arity if association, False otherwise.
            If return_fkeys=True: Set of foreign keys if association, False otherwise.
        """
        if min_arity < 2:
            raise ValueError("An association cannot have arity < 2")
        if max_arity is not None and max_arity < min_arity:
            raise ValueError("max_arity cannot be less than min_arity")

        mapper = inspect(table_class).mapper
        system_cols = {"RID", "RCT", "RMT", "RCB", "RMB"}

        non_sys_cols = {
            col.name for col in mapper.columns if col.name not in system_cols
        }

        unique_columns = [
            {c.name for c in constraint.columns}
            for constraint in inspect(table_class).local_table.constraints
            if isinstance(constraint, SQLUniqueConstraint)
        ]

        non_sys_key_colsets = {
            frozenset(uc)
            for uc in unique_columns
            if uc.issubset(non_sys_cols) and len(uc) > 1
        }

        if not non_sys_key_colsets:
            return False

        # Choose longest compound key
        row_key = sorted(non_sys_key_colsets, key=lambda s: len(s), reverse=True)[0]
        foreign_keys = list(inspect(table_class).relationships.values())

        covered_fkeys = {
            fkey for fkey in foreign_keys
            if {c.name for c in fkey.local_columns}.issubset(row_key)
        }
        covered_fkey_cols = set()

        if len(covered_fkeys) < min_arity:
            return False
        if max_arity is not None and len(covered_fkeys) > max_arity:
            return False

        for fkey in covered_fkeys:
            fkcols = {c.name for c in fkey.local_columns}
            if no_overlap and fkcols.intersection(covered_fkey_cols):
                return False
            covered_fkey_cols.update(fkcols)

        if unqualified and row_key.difference(covered_fkey_cols):
            return False

        if pure and non_sys_cols.difference(row_key):
            return False

        return covered_fkeys if return_fkeys else len(covered_fkeys)

    def get_association_class(
        self,
        left_cls: Type[Any],
        right_cls: Type[Any],
    ) -> tuple[Any, Any, Any] | None:
        """Find an association class connecting two ORM classes.

        Args:
            left_cls: First ORM class.
            right_cls: Second ORM class.

        Returns:
            Tuple of (association_class, left_relationship, right_relationship),
            or None if no association found.
        """
        for _, left_rel in inspect(left_cls).relationships.items():
            mid_cls = left_rel.mapper.class_
            is_assoc = self.is_association_table(mid_cls, return_fkeys=True)

            if not is_assoc:
                continue

            assoc_local_columns_left = list(is_assoc)[0].local_columns
            assoc_local_columns_right = list(is_assoc)[1].local_columns

            found_left = found_right = False

            for r in inspect(left_cls).relationships.values():
                remote_side = list(r.remote_side)[0]
                if remote_side in assoc_local_columns_left:
                    found_left = r
                if remote_side in assoc_local_columns_right:
                    found_left = r
                    # Swap if backwards
                    assoc_local_columns_left, assoc_local_columns_right = (
                        assoc_local_columns_right,
                        assoc_local_columns_left,
                    )

            for r in inspect(right_cls).relationships.values():
                remote_side = list(r.remote_side)[0]
                if remote_side in assoc_local_columns_right:
                    found_right = r

            if found_left and found_right:
                return mid_cls, found_left.class_attribute, found_right.class_attribute

        return None

    def dispose(self) -> None:
        """Dispose of SQLAlchemy resources.

        Call this when done with the database to properly clean up connections.
        After calling dispose(), the instance should not be used further.
        """
        if self._disposed:
            return

        if hasattr(self, "Base") and self.Base is not None:
            self.Base.registry.dispose()
        if hasattr(self, "engine") and self.engine is not None:
            self.engine.dispose()

        self._disposed = True

    def __del__(self) -> None:
        """Cleanup resources when garbage collected."""
        self.dispose()

    def __enter__(self) -> "SchemaORM":
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Context manager exit - dispose resources."""
        self.dispose()
        return False
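SchemaORM is intended to be used as a context manager so that dispose() always runs, releasing the engine's connection pool and the automap registry. A minimal, framework-free sketch of that lifecycle (using a stand-in class, since constructing a real SchemaORM requires a catalog connection):

```python
# Stand-in class illustrating the SchemaORM resource lifecycle: __exit__
# delegates to dispose(), and dispose() is safe to call more than once.
class FakeSchemaORM:
    def __init__(self):
        self.disposed = False

    def dispose(self):
        if self.disposed:          # idempotent, like SchemaORM.dispose()
            return
        self.disposed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.dispose()
        return False               # never swallow exceptions

with FakeSchemaORM() as orm:
    pass                           # run queries here
print(orm.disposed)
```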

__del__

__del__() -> None

Cleanup resources when garbage collected.

Source code in src/deriva_ml/model/schema_builder.py
def __del__(self) -> None:
    """Cleanup resources when garbage collected."""
    self.dispose()

__enter__

__enter__() -> 'SchemaORM'

Context manager entry.

Source code in src/deriva_ml/model/schema_builder.py
def __enter__(self) -> "SchemaORM":
    """Context manager entry."""
    return self

__exit__

__exit__(
    exc_type, exc_val, exc_tb
) -> bool

Context manager exit - dispose resources.

Source code in src/deriva_ml/model/schema_builder.py
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
    """Context manager exit - dispose resources."""
    self.dispose()
    return False

__init__

__init__(
    engine: Engine,
    metadata: MetaData,
    Base: AutomapBase,
    model: Model,
    schemas: list[str],
    class_prefix: str,
    use_schemas: bool = True,
)

Initialize SchemaORM container.

Parameters:

    engine (Engine): SQLAlchemy Engine. [required]
    metadata (MetaData): SQLAlchemy MetaData with tables. [required]
    Base (AutomapBase): Automap base with ORM classes. [required]
    model (Model): Source ERMrest Model. [required]
    schemas (list[str]): Schemas that were included. [required]
    class_prefix (str): Prefix used for ORM class names. [required]
    use_schemas (bool): Whether schema prefixes are used (False for in-memory). [default: True]
Source code in src/deriva_ml/model/schema_builder.py
def __init__(
    self,
    engine: Engine,
    metadata: MetaData,
    Base: AutomapBase,
    model: Model,
    schemas: list[str],
    class_prefix: str,
    use_schemas: bool = True,
):
    """Initialize SchemaORM container.

    Args:
        engine: SQLAlchemy Engine.
        metadata: SQLAlchemy MetaData with tables.
        Base: Automap base with ORM classes.
        model: Source ERMrest Model.
        schemas: Schemas that were included.
        class_prefix: Prefix used for ORM class names.
        use_schemas: Whether schema prefixes are used (False for in-memory).
    """
    self.engine = engine
    self.metadata = metadata
    self.Base = Base
    self.model = model
    self.schemas = schemas
    self._class_prefix = class_prefix
    self._use_schemas = use_schemas
    self._disposed = False

dispose

dispose() -> None

Dispose of SQLAlchemy resources.

Call this when done with the database to properly clean up connections. After calling dispose(), the instance should not be used further.

Source code in src/deriva_ml/model/schema_builder.py
def dispose(self) -> None:
    """Dispose of SQLAlchemy resources.

    Call this when done with the database to properly clean up connections.
    After calling dispose(), the instance should not be used further.
    """
    if self._disposed:
        return

    if hasattr(self, "Base") and self.Base is not None:
        self.Base.registry.dispose()
    if hasattr(self, "engine") and self.engine is not None:
        self.engine.dispose()

    self._disposed = True

find_table

find_table(table_name: str) -> SQLTable

Find a table by name.

Handles both schema.table format and schema_table format (for in-memory databases).

Parameters:

    table_name (str): Table name, with or without schema prefix. Can be "schema.table", "schema_table", or just "table". [required]

Returns:

    Table: SQLAlchemy Table object.

Raises:

    KeyError: If table not found.

Source code in src/deriva_ml/model/schema_builder.py
def find_table(self, table_name: str) -> SQLTable:
    """Find a table by name.

    Handles both schema.table format and schema_table format (for in-memory databases).

    Args:
        table_name: Table name, with or without schema prefix.
            Can be "schema.table", "schema_table", or just "table".

    Returns:
        SQLAlchemy Table object.

    Raises:
        KeyError: If table not found.
    """
    # Try exact match first
    if table_name in self.metadata.tables:
        return self.metadata.tables[table_name]

    # Try converting schema.table to schema_table format (for in-memory)
    if "." in table_name and not self._use_schemas:
        converted_name = table_name.replace(".", "_").replace("-", "_")
        if converted_name in self.metadata.tables:
            return self.metadata.tables[converted_name]

    # Try matching just the table name part
    for full_name, table in self.metadata.tables.items():
        # Handle . separator (file-based)
        if "." in full_name and full_name.split(".")[-1] == table_name:
            return table
        # Handle _ separator (in-memory) - match suffix after first _
        if "_" in full_name and "." not in full_name:
            # Check if table_name matches the part after schema prefix
            parts = full_name.split("_", 1)
            if len(parts) > 1 and parts[1] == table_name:
                return table
            # Also check if it ends with the table name
            if full_name.endswith(f"_{table_name}"):
                return table

    raise KeyError(f"Table {table_name} not found")
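The three accepted name forms can be exercised with a framework-free sketch of the same fallback logic. The table names below are illustrative; real lookups go through SQLAlchemy MetaData.

```python
# Sketch of find_table's lookup order over a plain dict of qualified names.
def lookup(tables, name, use_schemas=True):
    if name in tables:                              # 1. exact match
        return tables[name]
    if "." in name and not use_schemas:             # 2. schema.table -> schema_table
        converted = name.replace(".", "_").replace("-", "_")
        if converted in tables:
            return tables[converted]
    for full, t in tables.items():                  # 3. bare table-name suffix
        if "." in full and full.split(".")[-1] == name:
            return t
        if "_" in full and "." not in full and full.endswith(f"_{name}"):
            return t
    raise KeyError(f"Table {name} not found")

tables = {"domain.Sample": "sample", "deriva_ml_Dataset": "dataset"}
print(lookup(tables, "domain.Sample"))                          # exact match
print(lookup(tables, "Sample"))                                 # bare name
print(lookup(tables, "deriva-ml.Dataset", use_schemas=False))   # in-memory form
```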

get_association_class

get_association_class(
    left_cls: Type[Any],
    right_cls: Type[Any],
) -> tuple[Any, Any, Any] | None

Find an association class connecting two ORM classes.

Parameters:

    left_cls (Type[Any]): First ORM class. [required]
    right_cls (Type[Any]): Second ORM class. [required]

Returns:

    tuple[Any, Any, Any] | None: Tuple of (association_class, left_relationship, right_relationship), or None if no association found.

Source code in src/deriva_ml/model/schema_builder.py
def get_association_class(
    self,
    left_cls: Type[Any],
    right_cls: Type[Any],
) -> tuple[Any, Any, Any] | None:
    """Find an association class connecting two ORM classes.

    Args:
        left_cls: First ORM class.
        right_cls: Second ORM class.

    Returns:
        Tuple of (association_class, left_relationship, right_relationship),
        or None if no association found.
    """
    for _, left_rel in inspect(left_cls).relationships.items():
        mid_cls = left_rel.mapper.class_
        is_assoc = self.is_association_table(mid_cls, return_fkeys=True)

        if not is_assoc:
            continue

        assoc_local_columns_left = list(is_assoc)[0].local_columns
        assoc_local_columns_right = list(is_assoc)[1].local_columns

        found_left = found_right = False

        for r in inspect(left_cls).relationships.values():
            remote_side = list(r.remote_side)[0]
            if remote_side in assoc_local_columns_left:
                found_left = r
            if remote_side in assoc_local_columns_right:
                found_left = r
                # Swap if backwards
                assoc_local_columns_left, assoc_local_columns_right = (
                    assoc_local_columns_right,
                    assoc_local_columns_left,
                )

        for r in inspect(right_cls).relationships.values():
            remote_side = list(r.remote_side)[0]
            if remote_side in assoc_local_columns_right:
                found_right = r

        if found_left and found_right:
            return mid_cls, found_left.class_attribute, found_right.class_attribute

    return None

get_orm_class

get_orm_class(
    table_name: str,
) -> Any | None

Get the ORM class for a table by name.

Parameters:

    table_name (str): Table name, with or without schema prefix. [required]

Returns:

    Any | None: SQLAlchemy ORM class for the table.

Raises:

    KeyError: If table not found.

Source code in src/deriva_ml/model/schema_builder.py
def get_orm_class(self, table_name: str) -> Any | None:
    """Get the ORM class for a table by name.

    Args:
        table_name: Table name, with or without schema prefix.

    Returns:
        SQLAlchemy ORM class for the table.

    Raises:
        KeyError: If table not found.
    """
    sql_table = self.find_table(table_name)
    return self.get_orm_class_for_table(sql_table)

get_orm_class_for_table

get_orm_class_for_table(
    table: SQLTable | DerivaTable | str,
) -> Any | None

Get the ORM class for a table.

Parameters:

    table (SQLTable | DerivaTable | str): SQLAlchemy Table, Deriva Table, or table name. [required]

Returns:

    Any | None: SQLAlchemy ORM class, or None if not found.

Source code in src/deriva_ml/model/schema_builder.py
def get_orm_class_for_table(self, table: SQLTable | DerivaTable | str) -> Any | None:
    """Get the ORM class for a table.

    Args:
        table: SQLAlchemy Table, Deriva Table, or table name.

    Returns:
        SQLAlchemy ORM class, or None if not found.
    """
    if isinstance(table, DerivaTable):
        # Try schema.table format first (file-based), then schema_table (in-memory)
        table_key = f"{table.schema.name}.{table.name}"
        table = self.metadata.tables.get(table_key)
        if table is None and not self._use_schemas:
            # Try underscore format for in-memory databases
            table_key = f"{table.schema.name}_{table.name}".replace("-", "_")
            table = self.metadata.tables.get(table_key)
    if isinstance(table, str):
        table = self.find_table(table)
    if table is None:
        return None

    for mapper in self.Base.registry.mappers:
        if mapper.persist_selectable is table or table in mapper.tables:
            return mapper.class_
    return None

get_table_contents

get_table_contents(
    table: str,
) -> Generator[
    dict[str, Any], None, None
]

Retrieve all rows from a table as dictionaries.

Parameters:

    table (str): Table name (with or without schema prefix). [required]

Yields:

    dict[str, Any]: Dictionary for each row with column names as keys.

Source code in src/deriva_ml/model/schema_builder.py
def get_table_contents(self, table: str) -> Generator[dict[str, Any], None, None]:
    """Retrieve all rows from a table as dictionaries.

    Args:
        table: Table name (with or without schema prefix).

    Yields:
        Dictionary for each row with column names as keys.
    """
    sql_table = self.find_table(table)
    with self.engine.connect() as conn:
        result = conn.execute(select(sql_table))
        for row in result.mappings():
            yield dict(row)
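The same row-streaming pattern can be demonstrated end to end with the standard-library sqlite3 module (the table and column names here are illustrative, not part of any catalog schema):

```python
import sqlite3

# Stdlib analogue of get_table_contents: stream each row as a dict
# keyed by column name.
def get_table_contents(conn, table):
    conn.row_factory = sqlite3.Row      # rows become mapping-like objects
    for row in conn.execute(f'SELECT * FROM "{table}"'):
        yield dict(row)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Sample (RID TEXT, Name TEXT)")
conn.executemany("INSERT INTO Sample VALUES (?, ?)",
                 [("1-A", "liver"), ("1-B", "heart")])
rows = list(get_table_contents(conn, "Sample"))
print(rows)
```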

is_association_table staticmethod

is_association_table(
    table_class,
    min_arity: int = 2,
    max_arity: int = 2,
    unqualified: bool = True,
    pure: bool = True,
    no_overlap: bool = True,
    return_fkeys: bool = False,
)

Check if an ORM class represents an association table.

An association table links two or more tables through foreign keys, with a composite unique key covering those foreign keys.

Parameters:

    table_class: SQLAlchemy ORM class to check. [required]
    min_arity (int): Minimum number of foreign keys. [default: 2]
    max_arity (int): Maximum number of foreign keys. [default: 2]
    unqualified (bool): If True, reject associations with extra key columns. [default: True]
    pure (bool): If True, reject associations with extra non-key columns. [default: True]
    no_overlap (bool): If True, reject associations with shared FK columns. [default: True]
    return_fkeys (bool): If True, return the foreign keys instead of arity. [default: False]

Returns:

    If return_fkeys=False: Integer arity if association, False otherwise.
    If return_fkeys=True: Set of foreign keys if association, False otherwise.

Source code in src/deriva_ml/model/schema_builder.py
@staticmethod
def is_association_table(
    table_class,
    min_arity: int = 2,
    max_arity: int = 2,
    unqualified: bool = True,
    pure: bool = True,
    no_overlap: bool = True,
    return_fkeys: bool = False,
):
    """Check if an ORM class represents an association table.

    An association table links two or more tables through foreign keys,
    with a composite unique key covering those foreign keys.

    Args:
        table_class: SQLAlchemy ORM class to check.
        min_arity: Minimum number of foreign keys (default 2).
        max_arity: Maximum number of foreign keys (default 2).
        unqualified: If True, reject associations with extra key columns.
        pure: If True, reject associations with extra non-key columns.
        no_overlap: If True, reject associations with shared FK columns.
        return_fkeys: If True, return the foreign keys instead of arity.

    Returns:
        If return_fkeys=False: Integer arity if association, False otherwise.
        If return_fkeys=True: Set of foreign keys if association, False otherwise.
    """
    if min_arity < 2:
        raise ValueError("An association cannot have arity < 2")
    if max_arity is not None and max_arity < min_arity:
        raise ValueError("max_arity cannot be less than min_arity")

    mapper = inspect(table_class).mapper
    system_cols = {"RID", "RCT", "RMT", "RCB", "RMB"}

    non_sys_cols = {
        col.name for col in mapper.columns if col.name not in system_cols
    }

    unique_columns = [
        {c.name for c in constraint.columns}
        for constraint in inspect(table_class).local_table.constraints
        if isinstance(constraint, SQLUniqueConstraint)
    ]

    non_sys_key_colsets = {
        frozenset(uc)
        for uc in unique_columns
        if uc.issubset(non_sys_cols) and len(uc) > 1
    }

    if not non_sys_key_colsets:
        return False

    # Choose longest compound key
    row_key = sorted(non_sys_key_colsets, key=lambda s: len(s), reverse=True)[0]
    foreign_keys = list(inspect(table_class).relationships.values())

    covered_fkeys = {
        fkey for fkey in foreign_keys
        if {c.name for c in fkey.local_columns}.issubset(row_key)
    }
    covered_fkey_cols = set()

    if len(covered_fkeys) < min_arity:
        return False
    if max_arity is not None and len(covered_fkeys) > max_arity:
        return False

    for fkey in covered_fkeys:
        fkcols = {c.name for c in fkey.local_columns}
        if no_overlap and fkcols.intersection(covered_fkey_cols):
            return False
        covered_fkey_cols.update(fkcols)

    if unqualified and row_key.difference(covered_fkey_cols):
        return False

    if pure and non_sys_cols.difference(row_key):
        return False

    return covered_fkeys if return_fkeys else len(covered_fkeys)
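The rule being applied can be restated without SQLAlchemy: a table is a binary association exactly when its longest compound unique key is covered by two non-overlapping foreign keys and carries no extra user columns. A simplified sketch over plain column/key/FK descriptions (data shapes are illustrative):

```python
SYSTEM_COLS = {"RID", "RCT", "RMT", "RCB", "RMB"}

def is_association(columns, unique_keys, fkeys, min_arity=2, max_arity=2):
    """columns: set of column names; unique_keys: list of column-name sets;
    fkeys: list of column-name sets (the local columns of each FK)."""
    non_sys = set(columns) - SYSTEM_COLS
    candidates = [frozenset(k) for k in unique_keys
                  if set(k) <= non_sys and len(k) > 1]
    if not candidates:
        return False
    row_key = max(candidates, key=len)              # longest compound key
    covered = [set(fk) for fk in fkeys if set(fk) <= row_key]
    if not (min_arity <= len(covered) <= max_arity):
        return False
    seen = set()
    for fk in covered:
        if fk & seen:                               # FKs share a column
            return False
        seen |= fk
    if row_key - seen:                              # key has non-FK columns
        return False
    if non_sys - row_key:                           # extra payload columns
        return False
    return len(covered)

# A pure binary association: key (Dataset, Sample) covered by two FKs.
print(is_association({"RID", "RCT", "Dataset", "Sample"},
                     [{"Dataset", "Sample"}],
                     [{"Dataset"}, {"Sample"}]))
```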

list_tables

list_tables() -> list[str]

List all tables in the database.

Returns:

    list[str]: List of fully-qualified table names (schema.table), sorted.

Source code in src/deriva_ml/model/schema_builder.py
def list_tables(self) -> list[str]:
    """List all tables in the database.

    Returns:
        List of fully-qualified table names (schema.table), sorted.
    """
    tables = list(self.metadata.tables.keys())
    tables.sort()
    return tables

SortKey dataclass

A sort key for row ordering.

Parameters:

    column (str): Column name to sort by. [required]
    descending (bool): Sort in descending order. [default: False]

Example:

    SortKey("Name")                       # Ascending
    SortKey("Created", descending=True)   # Descending

Source code in src/deriva_ml/model/annotations.py
@dataclass
class SortKey:
    """A sort key for row ordering.

    Args:
        column: Column name to sort by
        descending: Sort in descending order (default False)

    Example:
        >>> SortKey("Name")  # Ascending
        >>> SortKey("Created", descending=True)  # Descending
    """
    column: str
    descending: bool = False

    def to_dict(self) -> dict[str, Any] | str:
        """Convert to dict or string (if ascending)."""
        if self.descending:
            return {"column": self.column, "descending": True}
        return self.column

to_dict

to_dict() -> dict[str, Any] | str

Convert to dict or string (if ascending).

Source code in src/deriva_ml/model/annotations.py
def to_dict(self) -> dict[str, Any] | str:
    """Convert to dict or string (if ascending)."""
    if self.descending:
        return {"column": self.column, "descending": True}
    return self.column

TableDisplay dataclass

Bases: AnnotationBuilder

Table-display annotation builder.

Controls table-level display options like row naming and ordering.

Example

    td = TableDisplay()
    td.row_name(row_markdown_pattern="{{{Name}}} ({{{Species}}})")
    td.compact(row_order=[SortKey("Name")])

Source code in src/deriva_ml/model/annotations.py
@dataclass
class TableDisplay(AnnotationBuilder):
    """Table-display annotation builder.

    Controls table-level display options like row naming and ordering.

    Example:
        >>> td = TableDisplay()
        >>> td.row_name(row_markdown_pattern="{{{Name}}} ({{{Species}}})")
        >>> td.compact(row_order=[SortKey("Name")])
    """
    tag = TAG_TABLE_DISPLAY

    _contexts: dict[str, TableDisplayOptions | str | None] = field(default_factory=dict)

    def set_context(
        self,
        context: str,
        options: TableDisplayOptions | str | None
    ) -> "TableDisplay":
        """Set options for a context."""
        self._contexts[context] = options
        return self

    def row_name(
        self,
        row_markdown_pattern: str,
        template_engine: TemplateEngine | None = None
    ) -> "TableDisplay":
        """Set row name pattern (used in foreign key dropdowns, etc.)."""
        return self.set_context(
            CONTEXT_ROW_NAME,
            TableDisplayOptions(
                row_markdown_pattern=row_markdown_pattern,
                template_engine=template_engine
            )
        )

    def compact(self, options: TableDisplayOptions) -> "TableDisplay":
        """Set options for compact (list) view."""
        return self.set_context(CONTEXT_COMPACT, options)

    def detailed(self, options: TableDisplayOptions) -> "TableDisplay":
        """Set options for detailed (record) view."""
        return self.set_context(CONTEXT_DETAILED, options)

    def default(self, options: TableDisplayOptions) -> "TableDisplay":
        """Set default options."""
        return self.set_context(CONTEXT_DEFAULT, options)

    def to_dict(self) -> dict[str, Any]:
        result = {}
        for context, options in self._contexts.items():
            if options is None:
                result[context] = None
            elif isinstance(options, str):
                result[context] = options
            else:
                result[context] = options.to_dict()
        return result
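The shape of the payload to_dict() assembles can be illustrated with plain dicts: one entry per context, each either None, a named-context string, or a serialized options dict. The context keys and option fields below are assumptions based on the CONTEXT_* constants and TableDisplayOptions shown in this module, not verified output:

```python
# Hedged sketch of a table-display annotation payload: context name ->
# serialized options (or None to fall back to defaults for that context).
annotation = {
    "row_name": {"row_markdown_pattern": "{{{Name}}} ({{{Species}}})"},
    "compact": {"row_order": ["Name"], "page_size": 25},
    "detailed": None,
}
print(sorted(annotation))
```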

compact

compact(
    options: TableDisplayOptions,
) -> "TableDisplay"

Set options for compact (list) view.

Source code in src/deriva_ml/model/annotations.py
def compact(self, options: TableDisplayOptions) -> "TableDisplay":
    """Set options for compact (list) view."""
    return self.set_context(CONTEXT_COMPACT, options)

default

default(
    options: TableDisplayOptions,
) -> "TableDisplay"

Set default options.

Source code in src/deriva_ml/model/annotations.py
def default(self, options: TableDisplayOptions) -> "TableDisplay":
    """Set default options."""
    return self.set_context(CONTEXT_DEFAULT, options)

detailed

detailed(
    options: TableDisplayOptions,
) -> "TableDisplay"

Set options for detailed (record) view.

Source code in src/deriva_ml/model/annotations.py
def detailed(self, options: TableDisplayOptions) -> "TableDisplay":
    """Set options for detailed (record) view."""
    return self.set_context(CONTEXT_DETAILED, options)

row_name

row_name(
    row_markdown_pattern: str,
    template_engine: TemplateEngine
    | None = None,
) -> "TableDisplay"

Set row name pattern (used in foreign key dropdowns, etc.).

Source code in src/deriva_ml/model/annotations.py
def row_name(
    self,
    row_markdown_pattern: str,
    template_engine: TemplateEngine | None = None
) -> "TableDisplay":
    """Set row name pattern (used in foreign key dropdowns, etc.)."""
    return self.set_context(
        CONTEXT_ROW_NAME,
        TableDisplayOptions(
            row_markdown_pattern=row_markdown_pattern,
            template_engine=template_engine
        )
    )

set_context

set_context(
    context: str,
    options: TableDisplayOptions
    | str
    | None,
) -> "TableDisplay"

Set options for a context.

Source code in src/deriva_ml/model/annotations.py
def set_context(
    self,
    context: str,
    options: TableDisplayOptions | str | None
) -> "TableDisplay":
    """Set options for a context."""
    self._contexts[context] = options
    return self
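The context-keyed builder pattern shown above (and reused by VisibleColumns and VisibleForeignKeys below) can be sketched standalone. ContextBuilder here is a hypothetical stand-in, not part of deriva_ml:

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ContextBuilder:
    """Hypothetical stand-in for TableDisplay's context-keyed builder."""

    _contexts: dict[str, Any] = field(default_factory=dict)

    def set_context(self, context: str, value: Any) -> "ContextBuilder":
        # Store per-context options; returning self enables method chaining.
        self._contexts[context] = value
        return self

    def compact(self, value: Any) -> "ContextBuilder":
        return self.set_context("compact", value)

    def detailed(self, value: Any) -> "ContextBuilder":
        return self.set_context("detailed", value)


b = ContextBuilder().compact({"page_size": 25}).detailed({"page_size": 1})
```

Each named method (`compact`, `detailed`, …) is just `set_context` with the context name baked in, which is why chaining works uniformly across all of them.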

TableDisplayOptions dataclass

Options for a single table display context.

Parameters:

Name                   Type                   Description                   Default
row_order              list[SortKey] | None   Sort order for rows           None
page_size              int | None             Number of rows per page       None
row_markdown_pattern   str | None             Template for row names        None
page_markdown_pattern  str | None             Template for page header      None
separator_markdown     str | None             Template between rows         None
prefix_markdown        str | None             Template before rows          None
suffix_markdown        str | None             Template after rows           None
template_engine        TemplateEngine | None  Template engine for patterns  None
collapse_toc_panel     bool | None            Collapse TOC panel            None
hide_column_headers    bool | None            Hide column headers           None
Source code in src/deriva_ml/model/annotations.py
@dataclass
class TableDisplayOptions:
    """Options for a single table display context.

    Args:
        row_order: Sort order for rows
        page_size: Number of rows per page
        row_markdown_pattern: Template for row names
        page_markdown_pattern: Template for page header
        separator_markdown: Template between rows
        prefix_markdown: Template before rows
        suffix_markdown: Template after rows
        template_engine: Template engine for patterns
        collapse_toc_panel: Collapse TOC panel
        hide_column_headers: Hide column headers
    """
    row_order: list[SortKey] | None = None
    page_size: int | None = None
    row_markdown_pattern: str | None = None
    page_markdown_pattern: str | None = None
    separator_markdown: str | None = None
    prefix_markdown: str | None = None
    suffix_markdown: str | None = None
    template_engine: TemplateEngine | None = None
    collapse_toc_panel: bool | None = None
    hide_column_headers: bool | None = None

    def to_dict(self) -> dict[str, Any]:
        result = {}
        if self.row_order is not None:
            result["row_order"] = [
                k.to_dict() if isinstance(k, SortKey) else k
                for k in self.row_order
            ]
        if self.page_size is not None:
            result["page_size"] = self.page_size
        if self.row_markdown_pattern is not None:
            result["row_markdown_pattern"] = self.row_markdown_pattern
        if self.page_markdown_pattern is not None:
            result["page_markdown_pattern"] = self.page_markdown_pattern
        if self.separator_markdown is not None:
            result["separator_markdown"] = self.separator_markdown
        if self.prefix_markdown is not None:
            result["prefix_markdown"] = self.prefix_markdown
        if self.suffix_markdown is not None:
            result["suffix_markdown"] = self.suffix_markdown
        if self.template_engine is not None:
            result["template_engine"] = self.template_engine.value
        if self.collapse_toc_panel is not None:
            result["collapse_toc_panel"] = self.collapse_toc_panel
        if self.hide_column_headers is not None:
            result["hide_column_headers"] = self.hide_column_headers
        return result

TemplateEngine

Bases: str, Enum

Template engine for markdown patterns.

Attributes:

Name Type Description
HANDLEBARS

Use Handlebars.js templating (recommended, more features)

MUSTACHE

Use Mustache templating (simpler, fewer features)

Example

>>> display = PseudoColumnDisplay(
...     markdown_pattern="[{{{Name}}}]({{{URL}}})",
...     template_engine=TemplateEngine.HANDLEBARS
... )

Source code in src/deriva_ml/model/annotations.py
class TemplateEngine(str, Enum):
    """Template engine for markdown patterns.

    Attributes:
        HANDLEBARS: Use Handlebars.js templating (recommended, more features)
        MUSTACHE: Use Mustache templating (simpler, fewer features)

    Example:
        >>> display = PseudoColumnDisplay(
        ...     markdown_pattern="[{{{Name}}}]({{{URL}}})",
        ...     template_engine=TemplateEngine.HANDLEBARS
        ... )
    """
    HANDLEBARS = "handlebars"
    MUSTACHE = "mustache"
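Because TemplateEngine mixes in str, its members compare equal to plain strings, which is why `.value` round-trips cleanly when serialized into annotation dicts. A self-contained sketch (Engine is a toy stand-in):

```python
from enum import Enum


class Engine(str, Enum):
    """Sketch of the str-Enum mixin used by TemplateEngine:
    each member is a real string as well as an enum member."""

    HANDLEBARS = "handlebars"
    MUSTACHE = "mustache"


e = Engine.HANDLEBARS
```

Value lookup also works in reverse: `Engine("mustache")` returns the `MUSTACHE` member, which makes deserializing stored annotations straightforward.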

VisibleColumns dataclass

Bases: AnnotationBuilder

Visible-columns annotation builder.

Controls which columns appear in different UI contexts and their order. This is one of the most commonly used annotations for customizing the Chaise interface.

Column entries can be:

- Column names (strings): "Name", "RID", "Description"
- Foreign key references: fk_constraint("schema", "constraint_name")
- Pseudo-columns: PseudoColumn(...) for computed/derived values

Contexts:

- compact: Table/list views (search results, data browser)
- detailed: Single record view (full record page)
- entry: Create/edit forms
- entry/create: Create form only
- entry/edit: Edit form only
- *: Default for all contexts

Example

Basic column lists for different contexts::

>>> vc = VisibleColumns()
>>> vc.compact(["RID", "Name", "Status"])
>>> vc.detailed(["RID", "Name", "Status", "Description", "Created"])
>>> vc.entry(["Name", "Status", "Description"])
>>> handle.set_annotation(vc)

Method chaining::

>>> vc = (VisibleColumns()
...     .compact(["RID", "Name"])
...     .detailed(["RID", "Name", "Description"])
...     .entry(["Name", "Description"]))

Including foreign key references::

>>> vc = VisibleColumns()
>>> vc.compact([
...     "RID",
...     "Name",
...     fk_constraint("domain", "Subject_Species_fkey"),
... ])

With pseudo-columns for computed values::

>>> vc = VisibleColumns()
>>> vc.compact([
...     "RID",
...     "Name",
...     PseudoColumn(
...         source=[InboundFK("domain", "Sample_Subject_fkey"), "RID"],
...         aggregate=Aggregate.CNT,
...         markdown_name="Samples"
...     ),
... ])

Context inheritance (reference another context)::

>>> vc = VisibleColumns()
>>> vc.compact(["RID", "Name"])
>>> vc.set_context("compact/brief", "compact")  # Inherit from compact

With faceted search (filter context)::

>>> vc = VisibleColumns()
>>> vc.compact(["RID", "Name", "Status"])
>>> facets = FacetList()
>>> facets.add(Facet(source="Status", open=True))
>>> vc._contexts["filter"] = facets.to_dict()
Source code in src/deriva_ml/model/annotations.py
@dataclass
class VisibleColumns(AnnotationBuilder):
    """Visible-columns annotation builder.

    Controls which columns appear in different UI contexts and their order.
    This is one of the most commonly used annotations for customizing the
    Chaise interface.

    Column entries can be:
    - Column names (strings): "Name", "RID", "Description"
    - Foreign key references: fk_constraint("schema", "constraint_name")
    - Pseudo-columns: PseudoColumn(...) for computed/derived values

    Contexts:
    - ``compact``: Table/list views (search results, data browser)
    - ``detailed``: Single record view (full record page)
    - ``entry``: Create/edit forms
    - ``entry/create``: Create form only
    - ``entry/edit``: Edit form only
    - ``*``: Default for all contexts

    Example:
        Basic column lists for different contexts::

            >>> vc = VisibleColumns()
            >>> vc.compact(["RID", "Name", "Status"])
            >>> vc.detailed(["RID", "Name", "Status", "Description", "Created"])
            >>> vc.entry(["Name", "Status", "Description"])
            >>> handle.set_annotation(vc)

        Method chaining::

            >>> vc = (VisibleColumns()
            ...     .compact(["RID", "Name"])
            ...     .detailed(["RID", "Name", "Description"])
            ...     .entry(["Name", "Description"]))

        Including foreign key references::

            >>> vc = VisibleColumns()
            >>> vc.compact([
            ...     "RID",
            ...     "Name",
            ...     fk_constraint("domain", "Subject_Species_fkey"),
            ... ])

        With pseudo-columns for computed values::

            >>> vc = VisibleColumns()
            >>> vc.compact([
            ...     "RID",
            ...     "Name",
            ...     PseudoColumn(
            ...         source=[InboundFK("domain", "Sample_Subject_fkey"), "RID"],
            ...         aggregate=Aggregate.CNT,
            ...         markdown_name="Samples"
            ...     ),
            ... ])

        Context inheritance (reference another context)::

            >>> vc = VisibleColumns()
            >>> vc.compact(["RID", "Name"])
            >>> vc.set_context("compact/brief", "compact")  # Inherit from compact

        With faceted search (filter context)::

            >>> vc = VisibleColumns()
            >>> vc.compact(["RID", "Name", "Status"])
            >>> facets = FacetList()
            >>> facets.add(Facet(source="Status", open=True))
            >>> vc._contexts["filter"] = facets.to_dict()
    """
    tag = TAG_VISIBLE_COLUMNS

    _contexts: dict[str, list[ColumnEntry] | str] = field(default_factory=dict)

    def set_context(
        self,
        context: str,
        columns: list[ColumnEntry] | str
    ) -> "VisibleColumns":
        """Set columns for a context.

        Args:
            context: Context name (e.g., "compact", "detailed", "*")
            columns: List of columns, or string referencing another context

        Returns:
            Self for chaining
        """
        self._contexts[context] = columns
        return self

    def compact(self, columns: list[ColumnEntry]) -> "VisibleColumns":
        """Set columns for compact (list) view."""
        return self.set_context(CONTEXT_COMPACT, columns)

    def detailed(self, columns: list[ColumnEntry]) -> "VisibleColumns":
        """Set columns for detailed (record) view."""
        return self.set_context(CONTEXT_DETAILED, columns)

    def entry(self, columns: list[ColumnEntry]) -> "VisibleColumns":
        """Set columns for entry (create/edit) forms."""
        return self.set_context(CONTEXT_ENTRY, columns)

    def entry_create(self, columns: list[ColumnEntry]) -> "VisibleColumns":
        """Set columns for create form only."""
        return self.set_context(CONTEXT_ENTRY_CREATE, columns)

    def entry_edit(self, columns: list[ColumnEntry]) -> "VisibleColumns":
        """Set columns for edit form only."""
        return self.set_context(CONTEXT_ENTRY_EDIT, columns)

    def default(self, columns: list[ColumnEntry]) -> "VisibleColumns":
        """Set default columns for all contexts."""
        return self.set_context(CONTEXT_DEFAULT, columns)

    def to_dict(self) -> dict[str, Any]:
        result = {}
        for context, columns in self._contexts.items():
            if isinstance(columns, str):
                result[context] = columns
            else:
                result[context] = [
                    c.to_dict() if isinstance(c, PseudoColumn) else c
                    for c in columns
                ]
        return result
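In to_dict, a string context value passes through verbatim (that is how context inheritance is expressed), while list values are serialized entry by entry. A self-contained sketch of that branch:

```python
def serialize_contexts(contexts: dict) -> dict:
    """Sketch of VisibleColumns.to_dict's inheritance handling:
    a string value names another context and is kept as-is."""
    out = {}
    for ctx, cols in contexts.items():
        # Strings reference another context; lists are column entries.
        out[ctx] = cols if isinstance(cols, str) else list(cols)
    return out


ann = serialize_contexts({
    "compact": ["RID", "Name"],
    "compact/brief": "compact",  # inherit from the compact context
})
```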

compact

compact(
    columns: list[ColumnEntry],
) -> "VisibleColumns"

Set columns for compact (list) view.

Source code in src/deriva_ml/model/annotations.py
def compact(self, columns: list[ColumnEntry]) -> "VisibleColumns":
    """Set columns for compact (list) view."""
    return self.set_context(CONTEXT_COMPACT, columns)

default

default(
    columns: list[ColumnEntry],
) -> "VisibleColumns"

Set default columns for all contexts.

Source code in src/deriva_ml/model/annotations.py
def default(self, columns: list[ColumnEntry]) -> "VisibleColumns":
    """Set default columns for all contexts."""
    return self.set_context(CONTEXT_DEFAULT, columns)

detailed

detailed(
    columns: list[ColumnEntry],
) -> "VisibleColumns"

Set columns for detailed (record) view.

Source code in src/deriva_ml/model/annotations.py
def detailed(self, columns: list[ColumnEntry]) -> "VisibleColumns":
    """Set columns for detailed (record) view."""
    return self.set_context(CONTEXT_DETAILED, columns)

entry

entry(
    columns: list[ColumnEntry],
) -> "VisibleColumns"

Set columns for entry (create/edit) forms.

Source code in src/deriva_ml/model/annotations.py
def entry(self, columns: list[ColumnEntry]) -> "VisibleColumns":
    """Set columns for entry (create/edit) forms."""
    return self.set_context(CONTEXT_ENTRY, columns)

entry_create

entry_create(
    columns: list[ColumnEntry],
) -> "VisibleColumns"

Set columns for create form only.

Source code in src/deriva_ml/model/annotations.py
def entry_create(self, columns: list[ColumnEntry]) -> "VisibleColumns":
    """Set columns for create form only."""
    return self.set_context(CONTEXT_ENTRY_CREATE, columns)

entry_edit

entry_edit(
    columns: list[ColumnEntry],
) -> "VisibleColumns"

Set columns for edit form only.

Source code in src/deriva_ml/model/annotations.py
def entry_edit(self, columns: list[ColumnEntry]) -> "VisibleColumns":
    """Set columns for edit form only."""
    return self.set_context(CONTEXT_ENTRY_EDIT, columns)

set_context

set_context(
    context: str,
    columns: list[ColumnEntry] | str,
) -> "VisibleColumns"

Set columns for a context.

Parameters:

Name     Type                     Description                                             Default
context  str                      Context name (e.g., "compact", "detailed", "*")         required
columns  list[ColumnEntry] | str  List of columns, or string referencing another context  required

Returns:

Type              Description
'VisibleColumns'  Self for chaining

Source code in src/deriva_ml/model/annotations.py
def set_context(
    self,
    context: str,
    columns: list[ColumnEntry] | str
) -> "VisibleColumns":
    """Set columns for a context.

    Args:
        context: Context name (e.g., "compact", "detailed", "*")
        columns: List of columns, or string referencing another context

    Returns:
        Self for chaining
    """
    self._contexts[context] = columns
    return self

VisibleForeignKeys dataclass

Bases: AnnotationBuilder

Visible-foreign-keys annotation builder.

Controls which related tables appear in the UI via inbound foreign keys.

Example

>>> vfk = VisibleForeignKeys()
>>> vfk.detailed([
...     fk_constraint("domain", "Image_Subject_fkey"),
...     fk_constraint("domain", "Diagnosis_Subject_fkey")
... ])

Source code in src/deriva_ml/model/annotations.py
@dataclass
class VisibleForeignKeys(AnnotationBuilder):
    """Visible-foreign-keys annotation builder.

    Controls which related tables appear in the UI via inbound foreign keys.

    Example:
        >>> vfk = VisibleForeignKeys()
        >>> vfk.detailed([
        ...     fk_constraint("domain", "Image_Subject_fkey"),
        ...     fk_constraint("domain", "Diagnosis_Subject_fkey")
        ... ])
    """
    tag = TAG_VISIBLE_FOREIGN_KEYS

    _contexts: dict[str, list[ForeignKeyEntry] | str] = field(default_factory=dict)

    def set_context(
        self,
        context: str,
        foreign_keys: list[ForeignKeyEntry] | str
    ) -> "VisibleForeignKeys":
        """Set foreign keys for a context."""
        self._contexts[context] = foreign_keys
        return self

    def detailed(self, foreign_keys: list[ForeignKeyEntry]) -> "VisibleForeignKeys":
        """Set foreign keys for detailed view."""
        return self.set_context(CONTEXT_DETAILED, foreign_keys)

    def default(self, foreign_keys: list[ForeignKeyEntry]) -> "VisibleForeignKeys":
        """Set default foreign keys for all contexts."""
        return self.set_context(CONTEXT_DEFAULT, foreign_keys)

    def to_dict(self) -> dict[str, Any]:
        result = {}
        for context, fkeys in self._contexts.items():
            if isinstance(fkeys, str):
                result[context] = fkeys
            else:
                result[context] = [
                    fk.to_dict() if isinstance(fk, PseudoColumn) else fk
                    for fk in fkeys
                ]
        return result

default

default(
    foreign_keys: list[ForeignKeyEntry],
) -> "VisibleForeignKeys"

Set default foreign keys for all contexts.

Source code in src/deriva_ml/model/annotations.py
def default(self, foreign_keys: list[ForeignKeyEntry]) -> "VisibleForeignKeys":
    """Set default foreign keys for all contexts."""
    return self.set_context(CONTEXT_DEFAULT, foreign_keys)

detailed

detailed(
    foreign_keys: list[ForeignKeyEntry],
) -> "VisibleForeignKeys"

Set foreign keys for detailed view.

Source code in src/deriva_ml/model/annotations.py
def detailed(self, foreign_keys: list[ForeignKeyEntry]) -> "VisibleForeignKeys":
    """Set foreign keys for detailed view."""
    return self.set_context(CONTEXT_DETAILED, foreign_keys)

set_context

set_context(
    context: str,
    foreign_keys: list[ForeignKeyEntry] | str,
) -> "VisibleForeignKeys"

Set foreign keys for a context.

Source code in src/deriva_ml/model/annotations.py
def set_context(
    self,
    context: str,
    foreign_keys: list[ForeignKeyEntry] | str
) -> "VisibleForeignKeys":
    """Set foreign keys for a context."""
    self._contexts[context] = foreign_keys
    return self

__getattr__

__getattr__(name: str)

Lazy import for DatabaseModel and DerivaMLDatabase.

Source code in src/deriva_ml/model/__init__.py
def __getattr__(name: str):
    """Lazy import for DatabaseModel and DerivaMLDatabase."""
    if name == "DatabaseModel":
        from deriva_ml.model.database import DatabaseModel

        return DatabaseModel
    if name == "DerivaMLDatabase":
        from deriva_ml.model.deriva_ml_database import DerivaMLDatabase

        return DerivaMLDatabase
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
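The lazy import relies on PEP 562 module-level `__getattr__`: Python calls it only when normal attribute lookup on the module fails, so the heavy import is deferred until first access. A runnable sketch using a throwaway module (`demo_mod` is hypothetical, not part of deriva_ml):

```python
import sys
import types

# Build a throwaway module to demonstrate the PEP 562 fallback.
mod = types.ModuleType("demo_mod")


def _lazy_getattr(name: str):
    # Invoked only when the attribute is absent from the module dict;
    # the real __getattr__ performs the deferred import at this point.
    if name == "DatabaseModelStub":
        return "imported on first access"
    raise AttributeError(f"module 'demo_mod' has no attribute {name!r}")


mod.__getattr__ = _lazy_getattr
sys.modules["demo_mod"] = mod

import demo_mod

value = demo_mod.DatabaseModelStub
```

Because the import happens inside `__getattr__`, `deriva_ml.model` can be imported without pulling in the dataset module, breaking the circular-import cycle described at the top of this page.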

fk_constraint

fk_constraint(schema: str, constraint: str) -> list[str]

Create a foreign key constraint reference for visible-columns.

Use this in visible-columns to include a foreign key column (showing the referenced row's name/link). This is different from InboundFK/OutboundFK which are used inside PseudoColumn source paths.

Parameters:

Name        Type  Description                               Default
schema      str   Schema name containing the FK constraint  required
constraint  str   Foreign key constraint name               required

Returns:

Type       Description
list[str]  [schema, constraint] list for use in visible-columns

Example

Include a foreign key in visible columns::

>>> vc = VisibleColumns()
>>> vc.compact([
...     "RID",
...     "Name",
...     fk_constraint("domain", "Subject_Species_fkey"),  # Shows Species
... ])

This is equivalent to the raw format::

>>> vc.compact(["RID", "Name", ["domain", "Subject_Species_fkey"]])
Source code in src/deriva_ml/model/annotations.py
def fk_constraint(schema: str, constraint: str) -> list[str]:
    """Create a foreign key constraint reference for visible-columns.

    Use this in visible-columns to include a foreign key column (showing the
    referenced row's name/link). This is different from InboundFK/OutboundFK
    which are used inside PseudoColumn source paths.

    Args:
        schema: Schema name containing the FK constraint
        constraint: Foreign key constraint name

    Returns:
        [schema, constraint] list for use in visible-columns

    Example:
        Include a foreign key in visible columns::

            >>> vc = VisibleColumns()
            >>> vc.compact([
            ...     "RID",
            ...     "Name",
            ...     fk_constraint("domain", "Subject_Species_fkey"),  # Shows Species
            ... ])

        This is equivalent to the raw format::

            >>> vc.compact(["RID", "Name", ["domain", "Subject_Species_fkey"]])
    """
    return [schema, constraint]
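fk_constraint carries no state; per the source above it simply pairs the two names. Reproduced standalone for illustration:

```python
def fk_constraint(schema: str, constraint: str) -> list[str]:
    # As in the source above: a visible-columns FK reference is just
    # a two-element [schema, constraint] list.
    return [schema, constraint]


ref = fk_constraint("domain", "Subject_Species_fkey")
```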