DatasetBag Class

The DatasetBag class represents a downloaded dataset packaged as a BDBag. It provides methods to access dataset contents, metadata, and associated files from the local filesystem.

SQLite-backed dataset access for downloaded BDBags.

This module provides the DatasetBag class, which allows querying and navigating downloaded dataset bags using SQLite. When a dataset is downloaded from a Deriva catalog, it is stored as a BDBag (Big Data Bag) containing:

  • CSV files with table data
  • Asset files (images, documents, etc.)
  • A schema.json describing the catalog structure
  • A fetch.txt manifest of referenced files

The DatasetBag class provides a read-only interface to this data, mirroring the Dataset class API where possible. This allows code to work uniformly with both live catalog datasets and downloaded bags.
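To make the "CSV files plus SQLite" idea concrete, here is a minimal sketch of loading one table's CSV into an in-memory SQLite database using only the standard library. The table name, column names, and the `load_csv_into_sqlite` helper are all hypothetical; the real loader presumably consults schema.json for column types, which this sketch does not attempt.

```python
import csv
import io
import sqlite3

# Hypothetical CSV content standing in for one table file inside a bag.
IMAGE_CSV = """RID,Filename
1-A,scan_001.png
1-B,scan_002.png
"""


def load_csv_into_sqlite(conn: sqlite3.Connection, table: str, csv_text: str) -> int:
    """Create `table` from the CSV header, insert every row, and return the row count."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    # Every column is stored as TEXT here; a real loader would use the schema's types.
    columns = ", ".join(f'"{name}" TEXT' for name in header)
    conn.execute(f'CREATE TABLE "{table}" ({columns})')
    placeholders = ", ".join("?" for _ in header)
    rows = list(reader)
    conn.executemany(f'INSERT INTO "{table}" VALUES ({placeholders})', rows)
    return len(rows)


conn = sqlite3.connect(":memory:")
loaded = load_csv_into_sqlite(conn, "Image", IMAGE_CSV)
filenames = [row[0] for row in conn.execute('SELECT "Filename" FROM "Image" ORDER BY "RID"')]
print(loaded, filenames)
```

Once the rows are in SQLite, membership and join queries can be expressed in SQL rather than by scanning CSV files repeatedly.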

Key concepts:

  • DatasetBag wraps a single dataset within a downloaded bag
  • A bag may contain multiple datasets (nested/hierarchical)
  • All operations are read-only (bags are immutable snapshots)
  • Queries use SQLite via SQLAlchemy ORM
  • Table-level access (get_table_as_dict, lookup_term) is on the catalog (DerivaMLDatabase)
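The read-only guarantee can be illustrated at the SQLite level. The sketch below is an assumption about how such a guarantee could be enforced, not a description of what DatasetBag does internally: it opens a database file through a `mode=ro` URI and shows that a write is rejected. The file name, table name, and `write_is_rejected` helper are hypothetical.

```python
import sqlite3
import tempfile
from pathlib import Path


def write_is_rejected(db_path: Path) -> bool:
    """Open the database read-only and report whether an INSERT is refused."""
    conn = sqlite3.connect(f"{db_path.as_uri()}?mode=ro", uri=True)
    try:
        conn.execute("INSERT INTO Image VALUES ('1-C')")
    except sqlite3.OperationalError:
        # SQLite refuses writes on a connection opened with mode=ro.
        return True
    finally:
        conn.close()
    return False


with tempfile.TemporaryDirectory() as tmp:
    db_path = Path(tmp) / "bag.db"
    # Build a tiny database with a normal read-write connection first.
    writer = sqlite3.connect(db_path)
    writer.execute("CREATE TABLE Image (RID TEXT)")
    writer.execute("INSERT INTO Image VALUES ('1-A')")
    writer.commit()
    writer.close()
    rejected = write_is_rejected(db_path)

print(rejected)
```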

Typical usage

Download a dataset from a catalog

bag = ml.download_dataset_bag(dataset_spec)

List dataset members by type

members = bag.list_dataset_members(recurse=True)
for image in members.get("Image", []):
    print(image["Filename"])
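The value returned by list_dataset_members() is a dictionary keyed by table name, with each value a list of row dictionaries. As a small sketch of working with that shape, the hypothetical helper below counts members per table; the sample data is made up and stands in for a real bag.

```python
from typing import Any


def summarize_members(members: dict[str, list[dict[str, Any]]]) -> dict[str, int]:
    """Return the number of member rows for each table name, sorted by name."""
    return {table: len(rows) for table, rows in sorted(members.items())}


# Made-up data in the same shape that list_dataset_members() is documented to return.
members = {
    "Image": [
        {"RID": "1-A", "Filename": "scan_001.png"},
        {"RID": "1-B", "Filename": "scan_002.png"},
    ],
    "Subject": [{"RID": "2-A", "Name": "subject-1"}],
}
counts = summarize_members(members)
print(counts)
```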

DatasetBag

Read-only interface to a downloaded dataset bag.

DatasetBag manages access to a materialized BDBag (Big Data Bag) that contains a snapshot of dataset data from a Deriva catalog. It provides methods for:

  • Listing dataset members and their attributes
  • Navigating dataset relationships (parents, children)
  • Accessing feature values
  • Denormalizing data across related tables

A bag may contain multiple datasets when nested datasets are involved. Each DatasetBag instance represents a single dataset within the bag - use list_dataset_children() to navigate to nested datasets.

For catalog-level operations like querying arbitrary tables or looking up vocabulary terms, use the DerivaMLDatabase class instead.

The class implements the DatasetLike protocol, providing the same read interface as the Dataset class. This allows code to work with both live catalogs and downloaded bags interchangeably.
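The benefit of a shared protocol can be sketched with `typing.Protocol`. The `SupportsMembers` protocol and both stub classes below are hypothetical and much smaller than the real DatasetLike protocol, whose full method set is not shown in this section. The point is only that a function written against the protocol accepts either implementation.

```python
from typing import Any, Protocol


class SupportsMembers(Protocol):
    """Hypothetical, reduced stand-in for the DatasetLike protocol."""

    dataset_rid: str

    def list_dataset_members(self, recurse: bool = False) -> dict[str, list[dict[str, Any]]]:
        ...


class StubLiveDataset:
    """Pretends to be a dataset backed by a live catalog."""

    def __init__(self, dataset_rid: str) -> None:
        self.dataset_rid = dataset_rid

    def list_dataset_members(self, recurse: bool = False) -> dict[str, list[dict[str, Any]]]:
        return {"Image": [{"RID": "1-A", "Filename": "scan_001.png"}]}


class StubBag:
    """Pretends to be a dataset backed by a downloaded bag."""

    def __init__(self, dataset_rid: str) -> None:
        self.dataset_rid = dataset_rid

    def list_dataset_members(self, recurse: bool = False) -> dict[str, list[dict[str, Any]]]:
        return {"Image": [{"RID": "1-A", "Filename": "scan_001.png"}]}


def image_filenames(dataset: SupportsMembers) -> list[str]:
    """Works with any object that satisfies the protocol structurally."""
    return [row["Filename"] for row in dataset.list_dataset_members().get("Image", [])]


live_names = image_filenames(StubLiveDataset("1-ROOT"))
bag_names = image_filenames(StubBag("1-ROOT"))
print(live_names, bag_names)
```

Neither stub inherits from the protocol; matching attribute and method signatures is enough for a static type checker to accept both.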

Attributes:

  • dataset_rid (RID): The unique Resource Identifier for this dataset.
  • dataset_types (list[str]): List of vocabulary terms describing the dataset type.
  • description (str): Human-readable description of the dataset.
  • execution_rid (RID | None): RID of the execution associated with this dataset version, if any.
  • model (DatabaseModel): The DatabaseModel providing SQLite access to bag data.
  • engine (Engine): SQLAlchemy engine for database queries.
  • metadata (MetaData): SQLAlchemy metadata with table definitions.

Example

Download a dataset

bag = dataset.download_dataset_bag(version="1.0.0")

List members by type

members = bag.list_dataset_members()
for image in members.get("Image", []):
    print(f"File: {image['Filename']}")

Navigate to nested datasets

for child in bag.list_dataset_children():
    print(f"Nested: {child.dataset_rid}")

Source code in src/deriva_ml/dataset/dataset_bag.py
class DatasetBag:
    """Read-only interface to a downloaded dataset bag.

    DatasetBag manages access to a materialized BDBag (Big Data Bag) that contains
    a snapshot of dataset data from a Deriva catalog. It provides methods for:

    - Listing dataset members and their attributes
    - Navigating dataset relationships (parents, children)
    - Accessing feature values
    - Denormalizing data across related tables

    A bag may contain multiple datasets when nested datasets are involved. Each
    DatasetBag instance represents a single dataset within the bag - use
    list_dataset_children() to navigate to nested datasets.

    For catalog-level operations like querying arbitrary tables or looking up
    vocabulary terms, use the DerivaMLDatabase class instead.

    The class implements the DatasetLike protocol, providing the same read interface
    as the Dataset class. This allows code to work with both live catalogs and
    downloaded bags interchangeably.

    Attributes:
        dataset_rid (RID): The unique Resource Identifier for this dataset.
        dataset_types (list[str]): List of vocabulary terms describing the dataset type.
        description (str): Human-readable description of the dataset.
        execution_rid (RID | None): RID of the execution associated with this dataset version, if any.
        model (DatabaseModel): The DatabaseModel providing SQLite access to bag data.
        engine (Engine): SQLAlchemy engine for database queries.
        metadata (MetaData): SQLAlchemy metadata with table definitions.

    Example:
        >>> # Download a dataset
        >>> bag = dataset.download_dataset_bag(version="1.0.0")
        >>> # List members by type
        >>> members = bag.list_dataset_members()
        >>> for image in members.get("Image", []):
        ...     print(f"File: {image['Filename']}")
        >>> # Navigate to nested datasets
        >>> for child in bag.list_dataset_children():
        ...     print(f"Nested: {child.dataset_rid}")
    """

    def __init__(
        self,
        catalog: "DerivaMLDatabase",
        dataset_rid: RID | None = None,
        dataset_types: str | list[str] | None = None,
        description: str = "",
        execution_rid: RID | None = None,
    ):
        """Initialize a DatasetBag instance for a dataset within a downloaded bag.

        This mirrors the Dataset class initialization pattern, where both classes
        take a catalog-like object as their first argument for consistency.

        Args:
            catalog: The DerivaMLDatabase instance providing access to the bag's data.
                This implements the DerivaMLCatalog protocol.
            dataset_rid: The RID of the dataset to wrap. If None, uses the primary
                dataset RID from the bag.
            dataset_types: One or more dataset type terms. Can be a single string
                or list of strings.
            description: Human-readable description of the dataset.
            execution_rid: RID of the execution associated with this dataset version.
                If None, will be looked up from the Dataset_Version table.

        Raises:
            DerivaMLException: If no dataset_rid is provided and none can be
                determined from the bag, or if the RID doesn't exist in the bag.
        """
        # Store reference to the catalog and extract the underlying model
        self._catalog = catalog
        self.model = catalog.model
        self.engine = cast(Engine, self.model.engine)
        self.metadata = self.model.metadata

        # Use provided RID or fall back to the bag's primary dataset
        self.dataset_rid = dataset_rid or self.model.dataset_rid
        self.description = description
        self.execution_rid = execution_rid or (
            self.model._get_dataset_execution(self.dataset_rid) or {}
        ).get("Execution")

        # Normalize dataset_types to always be a list of strings for consistency
        # with the Dataset class interface
        if dataset_types is None:
            self.dataset_types: list[str] = []
        elif isinstance(dataset_types, str):
            self.dataset_types: list[str] = [dataset_types]
        else:
            self.dataset_types: list[str] = list(dataset_types)

        if not self.dataset_rid:
            raise DerivaMLException("No dataset RID provided")

        # Validate that this dataset exists in the bag
        self.model.rid_lookup(self.dataset_rid)

        # Cache the version and dataset table reference
        self._current_version = self.model.dataset_version(self.dataset_rid)
        self._dataset_table = self.model.dataset_table

    def __repr__(self) -> str:
        """Return a string representation of the DatasetBag for debugging."""
        return (f"<deriva_ml.DatasetBag object at {hex(id(self))}: rid='{self.dataset_rid}', "
                f"version='{self.current_version}', types={self.dataset_types}>")

    @property
    def current_version(self) -> DatasetVersion:
        """Get the version of the dataset at the time the bag was downloaded.

        For a DatasetBag, this is the version that was current when the bag was
        created. Unlike the live Dataset class, this value is immutable since
        bags are read-only snapshots.

        Returns:
            DatasetVersion: The semantic version (major.minor.patch) of this dataset.
        """
        return self._current_version

    def list_tables(self) -> list[str]:
        """List all tables available in the bag's SQLite database.

        Returns the fully-qualified names of all tables (e.g., "domain.Image",
        "deriva-ml.Dataset") that were exported in this bag.

        Returns:
            list[str]: Table names in "schema.table" format, sorted alphabetically.
        """
        return self.model.list_tables()

    def get_table_as_dict(self, table: str) -> Generator[dict[str, Any], None, None]:
        """Get table contents as dictionaries.

        Convenience method that delegates to the underlying catalog. This provides
        access to all rows in a table, not just those belonging to this dataset.
        For dataset-filtered results, use list_dataset_members() instead.

        Args:
            table: Name of the table to retrieve (e.g., "Subject", "Image").

        Yields:
            dict: Dictionary for each row in the table.

        Example:
            >>> for subject in bag.get_table_as_dict("Subject"):
            ...     print(subject["Name"])
        """
        return self._catalog.get_table_as_dict(table)

    def get_table_as_dataframe(self, table: str) -> pd.DataFrame:
        """Get table contents as a pandas DataFrame.

        Convenience method that wraps get_table_as_dict() to return a DataFrame.
        Provides access to all rows in a table, not just those belonging to this
        dataset. For dataset-filtered results, use list_dataset_members() instead.

        Args:
            table: Name of the table to retrieve (e.g., "Subject", "Image").

        Returns:
            DataFrame with one row per record in the table.

        Example:
            >>> df = bag.get_table_as_dataframe("Image")
            >>> print(df.shape)
        """
        return pd.DataFrame(self.get_table_as_dict(table))

    @staticmethod
    def _find_relationship_attr(source, target):
        """Find the SQLAlchemy relationship attribute connecting two ORM classes.

        Searches for a relationship on `source` that points to `target`, which is
        needed to construct proper JOIN clauses in SQL queries.

        Args:
            source: Source ORM class or AliasedClass.
            target: Target ORM class or AliasedClass.

        Returns:
            InstrumentedAttribute: The relationship attribute on source pointing to target.

        Raises:
            LookupError: If no relationship exists between the two classes.

        Note:
            When multiple relationships exist, prefers MANYTOONE direction as this
            is typically the more natural join direction for denormalization.
        """
        src_mapper = inspect(source).mapper
        tgt_mapper = inspect(target).mapper

        # Collect all relationships on the source mapper that point to target
        candidates: list[RelationshipProperty] = [rel for rel in src_mapper.relationships if rel.mapper is tgt_mapper]

        if not candidates:
            raise LookupError(f"No relationship from {src_mapper.class_.__name__} to {tgt_mapper.class_.__name__}")

        # Prefer MANYTOONE when multiple paths exist (often best for joins)
        candidates.sort(key=lambda r: r.direction.name != "MANYTOONE")
        rel = candidates[0]

        # Return the bound attribute (handles AliasedClass properly)
        return getattr(source, rel.key) if isinstance(source, AliasedClass) else rel.class_attribute

    def _dataset_table_view(self, table: str) -> CompoundSelect[Any]:
        """Build a SQL query for all rows in a table that belong to this dataset.

        Creates a UNION of queries that traverse all possible paths from the
        Dataset table to the target table, filtering by this dataset's RID
        (and any nested dataset RIDs).

        This is necessary because table data may be linked to datasets through
        different relationship paths (e.g., Image might be linked directly to
        Dataset or through an intermediate Subject table).

        Args:
            table: Name of the table to query.

        Returns:
            CompoundSelect: A SQLAlchemy UNION query selecting all matching rows.
        """
        table_class = self.model.get_orm_class_by_name(table)
        dataset_table_class = self.model.get_orm_class_by_name(self._dataset_table.name)

        # Include this dataset and all nested datasets in the query
        dataset_rids = [self.dataset_rid] + [c.dataset_rid for c in self.list_dataset_children(recurse=True)]

        # Find all paths from Dataset to the target table
        paths = [[t.name for t in p] for p in self.model._schema_to_paths() if p[-1].name == table]

        # Build a SELECT query for each path and UNION them together
        sql_cmds = []
        for path in paths:
            path_sql = select(table_class)
            last_class = self.model.get_orm_class_by_name(path[0])
            # Join through each table in the path
            for t in path[1:]:
                t_class = self.model.get_orm_class_by_name(t)
                path_sql = path_sql.join(self._find_relationship_attr(last_class, t_class))
                last_class = t_class
            # Filter to only rows belonging to our dataset(s)
            path_sql = path_sql.where(dataset_table_class.RID.in_(dataset_rids))
            sql_cmds.append(path_sql)
        return union(*sql_cmds)

    def dataset_history(self) -> list[DatasetHistory]:
        """Retrieve the version history of this dataset.

        Returns a chronological list of dataset versions, including their version numbers,
        creation times, and associated metadata.

        Returns:
            list[DatasetHistory]: List of history entries, each containing:
                - dataset_version: Version number (major.minor.patch)
                - minid: Minimal Viable Identifier
                - snapshot: Catalog snapshot time
                - dataset_rid: Dataset Resource Identifier
                - version_rid: Version Resource Identifier
                - description: Version description
                - execution_rid: Associated execution RID

        Raises:
            DerivaMLException: If dataset_rid is not a valid dataset RID.

        Example:
            >>> history = bag.dataset_history()
            >>> for entry in history:
            ...     print(f"Version {entry.dataset_version}: {entry.description}")
        """
        # Query Dataset_Version table directly via the model
        return [
            DatasetHistory(
                dataset_version=DatasetVersion.parse(v["Version"]),
                minid=v["Minid"],
                snapshot=v["Snapshot"],
                dataset_rid=self.dataset_rid,
                version_rid=v["RID"],
                description=v["Description"],
                execution_rid=v["Execution"],
            )
            for v in self.model._get_table_contents("Dataset_Version")
            if v["Dataset"] == self.dataset_rid
        ]

    def list_dataset_members(
        self,
        recurse: bool = False,
        limit: int | None = None,
        _visited: set[RID] | None = None,
        version: Any = None,
        **kwargs: Any,
    ) -> dict[str, list[dict[str, Any]]]:
        """Return the entities that are members of this dataset, grouped by table name.

        Args:
            recurse: Whether to include members of nested datasets.
            limit: Maximum number of members to return per type. None for no limit.
            _visited: Internal parameter to track visited datasets and prevent infinite recursion.
            version: Ignored (bags are immutable snapshots).
            **kwargs: Additional arguments (ignored, for protocol compatibility).

        Returns:
            Dictionary mapping member types to lists of member records.
        """
        # Initialize visited set for recursion guard
        if _visited is None:
            _visited = set()

        # Prevent infinite recursion by checking if we've already visited this dataset
        if self.dataset_rid in _visited:
            return {}
        _visited.add(self.dataset_rid)

        # Look at each of the element types that might be in the _dataset_table and get the list of rid for them from
        # the appropriate association table.
        members = defaultdict(list)

        dataset_class = self.model.get_orm_class_for_table(self._dataset_table)
        for element_table in self.model.list_dataset_element_types():
            element_class = self.model.get_orm_class_for_table(element_table)

            assoc_class, dataset_rel, element_rel = self.model.get_orm_association_class(dataset_class, element_class)

            element_table = inspect(element_class).mapped_table
            if not self.model.is_domain_schema(element_table.schema) and element_table.name not in ["Dataset", "File"]:
                # Only domain-schema tables, nested datasets, and files count as members; skip the rest.
                continue

            # Query the association table for the elements linked to this dataset
            with Session(self.engine) as session:
                # For Dataset_Dataset, use Nested_Dataset column to find nested datasets
                # (similar to how the live catalog does it in Dataset.list_dataset_members)
                if element_table.name == "Dataset":
                    sql_cmd = (
                        select(element_class)
                        .join(assoc_class, element_class.RID == assoc_class.__table__.c["Nested_Dataset"])
                        .where(self.dataset_rid == assoc_class.__table__.c["Dataset"])
                    )
                else:
                    # For other tables, use the original join via element_rel
                    sql_cmd = (
                        select(element_class)
                        .join(element_rel)
                        .where(self.dataset_rid == assoc_class.__table__.c["Dataset"])
                    )
                if limit is not None:
                    sql_cmd = sql_cmd.limit(limit)
                # Get back the list of ORM entities and convert them to dictionaries.
                element_entities = session.scalars(sql_cmd).all()
                element_rows = [{c.key: getattr(obj, c.key) for c in obj.__table__.columns} for obj in element_entities]
            members[element_table.name].extend(element_rows)
            if recurse and (element_table.name == self._dataset_table.name):
                # Get the members for all the nested datasets and add to the member list.
                nested_datasets = [d["RID"] for d in element_rows]
                for ds in nested_datasets:
                    nested_dataset = self._catalog.lookup_dataset(ds)
                    for k, v in nested_dataset.list_dataset_members(recurse=recurse, limit=limit, _visited=_visited).items():
                        members[k].extend(v)
        return dict(members)

    def find_features(self, table: str | Table) -> Iterable[Feature]:
        """Find all features defined on a table within this dataset bag.

        Features are measurable properties associated with records in a table,
        stored as association tables linking the target table to vocabulary
        terms, assets, or metadata columns. This method discovers all such
        feature definitions for the given table.

        Each returned ``Feature`` object provides:

        - ``feature_name``: The feature's name (e.g., ``"Classification"``)
        - ``target_table``: The table the feature applies to
        - ``feature_table``: The association table storing feature values
        - ``term_columns``, ``asset_columns``, ``value_columns``: Column role sets
        - ``feature_record_class()``: A Pydantic model for reading/writing values

        Args:
            table: The table to find features for (name or Table object).

        Returns:
            An iterable of Feature instances describing each feature
            defined on the table.

        Example:
            >>> for f in bag.find_features("Image"):
            ...     print(f"{f.feature_name}: {len(f.term_columns)} terms, "
            ...           f"{len(f.value_columns)} value columns")
        """
        return self.model.find_features(table)

    def fetch_table_features(
        self,
        table: Table | str,
        feature_name: str | None = None,
        selector: Callable[[list[FeatureRecord]], FeatureRecord] | None = None,
    ) -> dict[str, list[FeatureRecord]]:
        """Fetch all feature values for a table, grouped by feature name.

        Queries the local SQLite database within this dataset bag and returns
        a dictionary mapping feature names to lists of FeatureRecord instances.
        This is useful for retrieving all annotations on a table in a single
        call — for example, getting all classification labels, quality scores,
        and bounding boxes for a set of images at once.

        **Selector for resolving multiple values:**

        An asset may have multiple values for the same feature — for example,
        labels from different annotators or model runs. When a ``selector`` is
        provided, records are grouped by target RID and the selector is called
        once per group to pick a single value. Groups with only one record
        are passed through unchanged.

        A selector is any callable with signature
        ``(list[FeatureRecord]) -> FeatureRecord``. Built-in selectors:

        - ``FeatureRecord.select_newest`` — picks the record with the most
          recent ``RCT`` (Row Creation Time).

        Custom selector example::

            def select_highest_confidence(records):
                return max(records, key=lambda r: getattr(r, "Confidence", 0))

        Args:
            table: The table to fetch features for (name or Table object).
            feature_name: If provided, only fetch values for this specific
                feature. If ``None``, fetch all features on the table.
            selector: Optional function to select among multiple feature values
                for the same target object. Receives a list of FeatureRecord
                instances (all for the same target RID) and returns the selected
                one.

        Returns:
            dict[str, list[FeatureRecord]]: Keys are feature names, values are
            lists of FeatureRecord instances. When a selector is provided, each
            target object appears at most once per feature.

        Raises:
            DerivaMLException: If a specified ``feature_name`` doesn't exist
                on the table.

        Examples:
            Fetch all features for a table::

                >>> features = bag.fetch_table_features("Image")
                >>> for name, records in features.items():
                ...     print(f"{name}: {len(records)} values")

            Fetch a single feature with newest-value selection::

                >>> features = bag.fetch_table_features(
                ...     "Image",
                ...     feature_name="Classification",
                ...     selector=FeatureRecord.select_newest,
                ... )

            Convert results to a DataFrame::

                >>> features = bag.fetch_table_features("Image", feature_name="Quality")
                >>> import pandas as pd
                >>> df = pd.DataFrame([r.model_dump() for r in features["Quality"]])
        """
        features = list(self.find_features(table))
        if feature_name is not None:
            features = [f for f in features if f.feature_name == feature_name]
            if not features:
                table_name = table if isinstance(table, str) else table.name
                raise DerivaMLException(
                    f"Feature '{feature_name}' not found on table '{table_name}'."
                )

        result: dict[str, list[FeatureRecord]] = {}

        for feat in features:
            record_class = feat.feature_record_class()
            field_names = set(record_class.model_fields.keys())
            target_col = feat.target_table.name

            # Query raw values from SQLite
            feature_table = self.model.find_table(feat.feature_table.name)
            with Session(self.engine) as session:
                sql_cmd = select(feature_table)
                sql_result = session.execute(sql_cmd)
                rows = [dict(row._mapping) for row in sql_result]

            records: list[FeatureRecord] = []
            for raw_value in rows:
                filtered_data = {k: v for k, v in raw_value.items() if k in field_names}
                records.append(record_class(**filtered_data))

            if selector and records:
                # Group by target RID and apply selector
                grouped: dict[str, list[FeatureRecord]] = defaultdict(list)
                for rec in records:
                    target_rid = getattr(rec, target_col, None)
                    if target_rid is not None:
                        grouped[target_rid].append(rec)
                records = [
                    selector(group) if len(group) > 1 else group[0]
                    for group in grouped.values()
                ]

            result[feat.feature_name] = records

        return result

    def list_feature_values(
        self,
        table: Table | str,
        feature_name: str,
        selector: Callable[[list[FeatureRecord]], FeatureRecord] | None = None,
    ) -> Iterable[FeatureRecord]:
        """Retrieve all values for a single feature as typed FeatureRecord instances.

        Convenience wrapper around ``fetch_table_features()`` for the common
        case of querying a single feature by name. Returns a flat list of
        FeatureRecord objects — one per feature value (or one per target object
        when a ``selector`` is provided).

        Each returned record is a dynamically-generated Pydantic model with
        typed fields matching the feature's definition. For example, an
        ``Image_Classification`` feature might produce records with fields
        ``Image`` (str), ``Image_Class`` (str), ``Execution`` (str),
        ``RCT`` (str), and ``Feature_Name`` (str).

        Args:
            table: The table the feature is defined on (name or Table object).
            feature_name: Name of the feature to retrieve values for.
            selector: Optional function to resolve multiple values per target.
                See ``fetch_table_features`` for details on how selectors work.
                Use ``FeatureRecord.select_newest`` to pick the most recently
                created value.

        Returns:
            Iterable[FeatureRecord]: FeatureRecord instances with:

            - ``Execution``: RID of the execution that created this value
            - ``Feature_Name``: Name of the feature
            - ``RCT``: Row Creation Time (ISO 8601 timestamp)
            - Feature-specific columns as typed attributes (vocabulary terms,
              asset references, or value columns depending on the feature)
            - ``model_dump()``: Convert to a dictionary

        Raises:
            DerivaMLException: If the feature doesn't exist on the table.

        Examples:
            Get typed feature records::

                >>> for record in bag.list_feature_values("Image", "Quality"):
                ...     print(f"Image {record.Image}: {record.ImageQuality}")
                ...     print(f"Created by execution: {record.Execution}")

            Select newest when multiple values exist::

                >>> records = list(bag.list_feature_values(
                ...     "Image", "Quality",
                ...     selector=FeatureRecord.select_newest,
                ... ))

            Convert to a list of dicts::

                >>> dicts = [r.model_dump() for r in
                ...          bag.list_feature_values("Image", "Classification")]
        """
        result = self.fetch_table_features(table, feature_name=feature_name, selector=selector)
        return result.get(feature_name, [])

    def list_dataset_element_types(self) -> Iterable[Table]:
        """List the types of elements that can be contained in datasets.

        Reports which tables can hold dataset members, as defined by the bag's
        model. This is useful for discovering which kinds of elements (for
        example, ``Image`` or ``Subject`` records) a dataset can contain.

        Returns:
            Iterable[Table]: The tables whose records can be dataset members.

        """
        return self.model.list_dataset_element_types()

    def list_dataset_children(
        self,
        recurse: bool = False,
        _visited: set[RID] | None = None,
        version: Any = None,
        **kwargs: Any,
    ) -> list[Self]:
        """Get nested datasets.

        Args:
            recurse: Whether to include children of children.
            _visited: Internal parameter to track visited datasets and prevent infinite recursion.
            version: Ignored (bags are immutable snapshots).
            **kwargs: Additional arguments (ignored, for protocol compatibility).

        Returns:
            List of child dataset bags.
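
        The recursion guard can be illustrated on a plain dictionary. This is
        a minimal sketch under stated assumptions: ``NESTED`` and its RIDs are
        hypothetical, and ``list_children`` only mimics the control flow of
        this method rather than querying a bag.

        ```python
        # Hypothetical nesting: dataset RID -> RIDs of directly nested datasets.
        # "D" nests "A" again, which forms a cycle.
        NESTED = {"A": ["B", "C"], "B": ["D"], "C": [], "D": ["A"]}


        def list_children(rid, recurse=False, _visited=None):
            if _visited is None:
                _visited = set()
            if rid in _visited:
                return []  # already expanded, so stop instead of looping forever
            _visited.add(rid)

            children = list(NESTED.get(rid, []))
            result = list(children)
            if recurse:
                for child in children:
                    result.extend(list_children(child, recurse=True, _visited=_visited))
            return result


        direct = list_children("A")
        everything = list_children("A", recurse=True)
        ```

        In this sketch the shared ``_visited`` set stops the traversal when the
        cycle is reached, but it does not remove duplicates: ``"A"`` still
        appears once in ``everything`` because ``"D"`` lists it as a child.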
        """
        # Initialize visited set for recursion guard
        if _visited is None:
            _visited = set()

        # Prevent infinite recursion by checking if we've already visited this dataset
        if self.dataset_rid in _visited:
            return []
        _visited.add(self.dataset_rid)

        ds_table = self.model.get_orm_class_by_name(f"{self.model.ml_schema}.Dataset")
        nds_table = self.model.get_orm_class_by_name(f"{self.model.ml_schema}.Dataset_Dataset")
        dv_table = self.model.get_orm_class_by_name(f"{self.model.ml_schema}.Dataset_Version")

        with Session(self.engine) as session:
            sql_cmd = (
                select(nds_table.Nested_Dataset, dv_table.Version)
                .join_from(ds_table, nds_table, onclause=ds_table.RID == nds_table.Nested_Dataset)
                .join_from(ds_table, dv_table, onclause=ds_table.Version == dv_table.RID)
                .where(nds_table.Dataset == self.dataset_rid)
            )
            nested = [self._catalog.lookup_dataset(r[0]) for r in session.execute(sql_cmd).all()]

        result = copy(nested)
        if recurse:
            for child in nested:
                result.extend(child.list_dataset_children(recurse=recurse, _visited=_visited))
        return result

    def list_dataset_parents(
        self,
        recurse: bool = False,
        _visited: set[RID] | None = None,
        version: Any = None,
        **kwargs: Any,
    ) -> list[Self]:
        """Get the parent datasets that contain this dataset as a nested dataset.

        Args:
            recurse: If True, recursively return all ancestor datasets.
            _visited: Internal parameter to track visited datasets and prevent infinite recursion.
            version: Ignored (bags are immutable snapshots).
            **kwargs: Additional arguments (ignored, for protocol compatibility).

        Returns:
            List of parent dataset bags.
        """
        # Initialize visited set for recursion guard
        if _visited is None:
            _visited = set()

        # Prevent infinite recursion by checking if we've already visited this dataset
        if self.dataset_rid in _visited:
            return []
        _visited.add(self.dataset_rid)

        nds_table = self.model.get_orm_class_by_name(f"{self.model.ml_schema}.Dataset_Dataset")

        with Session(self.engine) as session:
            sql_cmd = select(nds_table.Dataset).where(nds_table.Nested_Dataset == self.dataset_rid)
            parents = [self._catalog.lookup_dataset(r[0]) for r in session.execute(sql_cmd).all()]

        if recurse:
            for parent in parents.copy():
                parents.extend(parent.list_dataset_parents(recurse=True, _visited=_visited))
        return parents

    def list_executions(self) -> list[RID]:
        """List all execution RIDs associated with this dataset.

        Returns all executions that used this dataset as input. This is
        tracked through the Dataset_Execution association table.

        Note:
            Unlike the live Dataset class which returns Execution objects,
            DatasetBag returns a list of execution RIDs since the bag is
            an offline snapshot and cannot look up live execution objects.

        Returns:
            List of execution RIDs associated with this dataset.

        Example:
            >>> bag = ml.download_dataset_bag(dataset_spec)
            >>> execution_rids = bag.list_executions()
            >>> for rid in execution_rids:
            ...     print(f"Associated execution: {rid}")
        """
        de_table = self.model.get_orm_class_by_name(f"{self.model.ml_schema}.Dataset_Execution")

        with Session(self.engine) as session:
            sql_cmd = select(de_table.Execution).where(de_table.Dataset == self.dataset_rid)
            return [r[0] for r in session.execute(sql_cmd).all()]

    def _denormalize(self, include_tables: list[str]) -> Select:
        """Build a SQL query that joins multiple tables into a denormalized view.

        This method creates a "wide table" by joining related tables together,
        producing a single query that returns columns from all specified tables.
        This is useful for machine learning pipelines that need flat data.

        The method:
        1. Analyzes the schema to find join paths between tables
        2. Determines the correct join order based on foreign key relationships
        3. Builds SELECT statements with properly aliased columns
        4. Creates a UNION if multiple paths exist to the same tables

        Args:
            include_tables: List of table names to include in the output. Additional
                tables may be included if they're needed to join the requested tables.

        Returns:
            Select: A SQLAlchemy query that produces the denormalized result.

        Note:
            Column names in the result are prefixed with the table name to avoid
            collisions (e.g., "Image.Filename", "Subject.RID").
        """
        # Skip over tables that we don't want to include in the denormalized dataset.
        # Also, strip off the Dataset/Dataset_X part of the path so we don't include dataset columns in the denormalized
        # table.

        def build_join_on_clause(table_name, join_condition_pairs):
            """Build a SQLAlchemy ON clause from join condition column pairs.

            For simple FKs: single equality condition.
            For composite FKs: AND of multiple equality conditions.

            Each ``(fk_col, pk_col)`` pair comes from ``_table_relationship``
            which always returns the FK column first and the PK column second.
            We use the column objects' own ``.table.name`` to find the correct
            ORM classes -- this is more robust than relying on sequential path
            order, which breaks with branching join trees.

            Args:
                table_name: Name of the table being joined (the target).
                join_condition_pairs: Set of (fk_col, pk_col) Column pairs from
                    _table_relationship(). Each pair represents one column in the FK.

            Returns:
                SQLAlchemy AND clause for use as join onclause.
            """
            conditions = []
            for fk_col, pk_col in join_condition_pairs:
                # Use the FK column's table info to get the correct ORM class
                fk_table_name = fk_col.table.name if hasattr(fk_col.table, 'name') else str(fk_col.table)
                pk_table_name = pk_col.table.name if hasattr(pk_col.table, 'name') else str(pk_col.table)
                fk_class = self.model.get_orm_class_by_name(fk_table_name)
                pk_class = self.model.get_orm_class_by_name(pk_table_name)
                left = fk_class.__table__.columns[fk_col.name]
                right = pk_class.__table__.columns[pk_col.name]
                conditions.append(left == right)
            return and_(*conditions)

        from deriva_ml.model.catalog import denormalize_column_name

        join_tables, column_specs, multi_schema = self.model._prepare_wide_table(
            self, self.dataset_rid, include_tables
        )

        denormalized_columns = [
            self.model.get_orm_class_by_name(table_name)
            .__table__.columns[column_name]
            .label(denormalize_column_name(schema_name, table_name, column_name, multi_schema))
            for schema_name, table_name, column_name, _type_name in column_specs
        ]
        sql_statements = []
        for key, (path, join_conditions, join_types) in join_tables.items():
            sql_statement = select(*denormalized_columns).select_from(
                self.model.get_orm_class_for_table(self._dataset_table)
            )
            for table_name in path[1:]:  # Skip over dataset table
                if table_name not in join_conditions:
                    continue  # No join condition — skip (not connected)
                on_clause = build_join_on_clause(
                    table_name, join_conditions[table_name]
                )
                table_class = self.model.get_orm_class_by_name(table_name)
                # Use LEFT OUTER JOIN for nullable FK columns to preserve all
                # rows from the left side (e.g., Images with null Observation FK).
                if join_types.get(table_name) == "left":
                    sql_statement = sql_statement.outerjoin(table_class, onclause=on_clause)
                else:
                    sql_statement = sql_statement.join(table_class, onclause=on_clause)
            dataset_rid_list = [self.dataset_rid] + [c.dataset_rid for c in self.list_dataset_children(recurse=True)]
            dataset_class = self.model.get_orm_class_by_name(self._dataset_table.name)
            sql_statement = sql_statement.where(dataset_class.RID.in_(dataset_rid_list))
            sql_statements.append(sql_statement)
        if not sql_statements:
            # No join paths found — return empty result with correct columns.
            if denormalized_columns:
                return select(*denormalized_columns).where(literal(False))
            return select(literal(1)).where(literal(False))
        return union(*sql_statements)

    def denormalize_as_dataframe(
        self,
        include_tables: list[str],
        version: Any = None,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Denormalize the dataset bag into a single wide table (DataFrame).

        Denormalization transforms normalized relational data into a single "wide table"
        (also called a "flat table" or "denormalized table") by joining related tables
        together. This produces a DataFrame where each row contains all related information
        from multiple source tables, with columns from each table combined side-by-side.

        Wide tables are the standard input format for most machine learning frameworks,
        which expect all features for a single observation to be in one row. This method
        bridges the gap between normalized database schemas and ML-ready tabular data.

        **How it works:**

        Tables are joined based on their foreign key relationships stored in the bag's
        schema. For example, if Image has a foreign key to Subject, denormalizing
        ["Subject", "Image"] produces rows where each image appears with its subject's
        metadata.

        **Column naming:**

        Column names are prefixed with the source table name using dots to avoid
        collisions (e.g., "Image.Filename", "Subject.RID"). This differs from the
        live Dataset class which uses underscores.
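
        The join and the dotted column names can be illustrated with the
        standard-library ``sqlite3`` module. This is only a sketch: the method
        itself builds its query with SQLAlchemy from the bag's schema, and the
        ``Subject`` and ``Image`` tables, their columns, and their rows below
        are hypothetical.

        ```python
        import sqlite3

        conn = sqlite3.connect(":memory:")
        conn.executescript(
            '''
            CREATE TABLE Subject (RID TEXT PRIMARY KEY, Name TEXT);
            CREATE TABLE Image (
                RID TEXT PRIMARY KEY,
                Filename TEXT,
                Subject TEXT REFERENCES Subject (RID)
            );
            INSERT INTO Subject VALUES ('S1', 'alice'), ('S2', 'bob');
            INSERT INTO Image VALUES
                ('I1', 'a.png', 'S1'),
                ('I2', 'b.png', 'S1'),
                ('I3', 'c.png', 'S2');
            '''
        )

        # Each output column is aliased as "Table.Column" to avoid collisions.
        query = (
            'SELECT Image.RID AS "Image.RID", '
            'Image.Filename AS "Image.Filename", '
            'Subject.RID AS "Subject.RID", '
            'Subject.Name AS "Subject.Name" '
            "FROM Image JOIN Subject ON Image.Subject = Subject.RID "
            "ORDER BY Image.RID"
        )
        cursor = conn.execute(query)
        columns = [d[0] for d in cursor.description]
        rows = [dict(zip(columns, r)) for r in cursor.fetchall()]
        conn.close()
        ```

        Each entry in ``rows`` is one image together with its subject's
        columns, which is the shape of one row in the wide table.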

        Args:
            include_tables: List of table names to include in the output. Tables
                are joined based on their foreign key relationships.
                Order doesn't matter - the join order is determined automatically.
            version: Ignored (bags are immutable snapshots of a specific version).
            **kwargs: Additional arguments (ignored, for protocol compatibility).

        Returns:
            pd.DataFrame: Wide table with columns from all included tables.

        Example:
            Create a training dataset from a downloaded bag::

                >>> # Download and materialize the dataset
                >>> bag = ml.download_dataset_bag(spec, materialize=True)

                >>> # Denormalize into a wide table
                >>> df = bag.denormalize_as_dataframe(["Image", "Diagnosis"])
                >>> print(df.columns.tolist())
                ['Image.RID', 'Image.Filename', 'Image.URL', 'Diagnosis.RID',
                 'Diagnosis.Label', 'Diagnosis.Confidence']

                >>> # Access local file paths for images
                >>> for _, row in df.iterrows():
                ...     local_path = bag.get_asset_path("Image", row["Image.RID"])
                ...     label = row["Diagnosis.Label"]
                ...     # Train on local_path with label

        See Also:
            denormalize_as_dict: Generator version for memory-efficient processing.
        """
        sql_stmt = self._denormalize(include_tables=include_tables)
        with Session(self.engine) as session:
            result = session.execute(sql_stmt)
            rows = [dict(row._mapping) for row in result]
        return pd.DataFrame(rows)

    def denormalize_as_dict(
        self,
        include_tables: list[str],
        version: Any = None,
        **kwargs: Any,
    ) -> Generator[dict[str, Any], None, None]:
        """Denormalize the dataset bag and yield rows as dictionaries.

        This is a memory-efficient alternative to denormalize_as_dataframe() that
        yields one row at a time as a dictionary instead of loading all data into
        a DataFrame. Use this when processing large datasets that may not fit in
        memory, or when you want to process rows incrementally.

        Like denormalize_as_dataframe(), this produces a "wide table" representation
        where each yielded dictionary contains all columns from the joined tables.
        See denormalize_as_dataframe() for detailed explanation of how denormalization
        works.

        **Column naming:**

        Column names are prefixed with the source table name using dots to avoid
        collisions (e.g., "Image.Filename", "Subject.RID"). This differs from the
        live Dataset class which uses underscores.

        Args:
            include_tables: List of table names to include in the output.
                Tables are joined based on their foreign key relationships.
            version: Ignored (bags are immutable snapshots of a specific version).
            **kwargs: Additional arguments (ignored, for protocol compatibility).

        Yields:
            dict[str, Any]: Dictionary representing one row of the wide table.
                Keys are column names in "Table.Column" format.

        Example:
            Stream through a large dataset for training::

                >>> bag = ml.download_dataset_bag(spec, materialize=True)
                >>> for row in bag.denormalize_as_dict(["Image", "Diagnosis"]):
                ...     # Get local file path for this image
                ...     local_path = bag.get_asset_path("Image", row["Image.RID"])
                ...     label = row["Diagnosis.Label"]
                ...     # Process image and label...

            Build a PyTorch dataset efficiently::

                >>> class BagDataset(torch.utils.data.IterableDataset):
                ...     def __init__(self, bag, tables):
                ...         self.bag = bag
                ...         self.tables = tables
                ...     def __iter__(self):
                ...         for row in self.bag.denormalize_as_dict(self.tables):
                ...             img_path = self.bag.get_asset_path("Image", row["Image.RID"])
                ...             yield load_image(img_path), row["Diagnosis.Label"]

        See Also:
            denormalize_as_dataframe: Returns all data as a pandas DataFrame.
        """
        sql_stmt = self._denormalize(include_tables=include_tables)
        with Session(self.engine) as session:
            result = session.execute(sql_stmt)
            for row in result:
                yield dict(row._mapping)

    # SQLAlchemy type name → ermrest type name mapping.
    _SQLALCHEMY_TO_ERMREST: dict[str, str] = {
        "TEXT": "text",
        "VARCHAR": "text",
        "STRING": "text",
        "INTEGER": "int4",
        "BIGINT": "int8",
        "SMALLINT": "int2",
        "FLOAT": "float8",
        "REAL": "float4",
        "NUMERIC": "float8",
        "BOOLEAN": "boolean",
        "DATE": "date",
        "DATETIME": "timestamptz",
        "TIMESTAMP": "timestamptz",
        "BLOB": "bytea",
    }

    def denormalize_columns(
        self,
        include_tables: list[str],
        **kwargs: Any,
    ) -> list[tuple[str, str]]:
        """Return the columns that denormalize would produce, without fetching data.

        Performs the same validation as :meth:`denormalize_as_dataframe` (table existence,
        FK path resolution, ambiguity detection) but stops before executing any data
        queries.

        Args:
            include_tables: List of table names to include.
            **kwargs: Additional arguments (ignored, for protocol compatibility).

        Returns:
            List of ``(column_name, column_type)`` tuples using dot notation.
            Type strings use ermrest type names (``text``, ``int4``, etc.).

        Example:
            >>> cols = bag.denormalize_columns(["Image", "Subject"])
            >>> for name, dtype in cols:
            ...     print(f"{name}: {dtype}")
            Image.RID: ermrest_rid
            Image.Filename: text
            Subject.RID: ermrest_rid
            Subject.Name: text
        """
        from deriva_ml.model.catalog import denormalize_column_name

        _, column_specs, multi_schema = self.model._prepare_wide_table(
            self, self.dataset_rid, list(include_tables)
        )

        result = []
        for schema_name, table_name, col_name, type_name in column_specs:
            prefixed = denormalize_column_name(
                schema_name, table_name, col_name, multi_schema
            )
            # _prepare_wide_table now returns ermrest type names directly,
            # so no mapping needed.
            result.append((prefixed, type_name))
        return result


    # =========================================================================
    # Asset Restructuring Methods
    # =========================================================================

    def _build_dataset_type_path_map(
        self,
        type_selector: Callable[[list[str]], str] | None = None,
    ) -> dict[RID, list[str]]:
        """Build a mapping from dataset RID to its type path in the hierarchy.

        Recursively traverses nested datasets to create a mapping where each
        dataset RID maps to its hierarchical type path (e.g., ["complete", "training"]).

        Args:
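            type_selector: Function to select a type when a dataset has multiple types.
                Receives the list of type names and returns the selected type name,
                or ``None`` to leave this dataset out of the path (its children
                then inherit the parent's path). Defaults to selecting the first
                type, or "Testing" if the dataset has no types.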

        Returns:
            Dictionary mapping dataset RID to list of type names from root to leaf.
            e.g., {"4-ABC": ["complete", "training"], "4-DEF": ["complete", "testing"]}
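
        The traversal can be sketched on a plain dictionary. Everything named
        here (``DATASETS``, the RIDs, ``first_type``, ``skip_container``) is
        hypothetical. The fallback type mirrors the default used in the method
        body, and a selector that returns ``None`` drops that level from the
        path.

        ```python
        # Hypothetical hierarchy: RID -> (dataset types, nested dataset RIDs).
        DATASETS = {
            "4-ROOT": (["complete"], ["4-ABC", "4-DEF"]),
            "4-ABC": (["training"], []),
            "4-DEF": ([], []),
        }


        def first_type(types):
            return types[0] if types else "Testing"


        def skip_container(types):
            # Returning None keeps "complete" out of the generated paths.
            return None if "complete" in types else first_type(types)


        def build_type_paths(root, type_selector=first_type):
            paths = {}

            def traverse(rid, parent_path, visited):
                if rid in visited:
                    return
                visited.add(rid)
                types, children = DATASETS[rid]
                current = type_selector(types)
                path = parent_path if current is None else parent_path + [current]
                paths[rid] = path
                for child in children:
                    traverse(child, path, visited)

            traverse(root, [], set())
            return paths


        paths = build_type_paths("4-ROOT")
        flat_paths = build_type_paths("4-ROOT", type_selector=skip_container)
        ```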
        """
        if type_selector is None:
            type_selector = lambda types: types[0] if types else "Testing"

        type_paths: dict[RID, list[str]] = {}

        def traverse(dataset: DatasetBag, parent_path: list[str], visited: set[RID]) -> None:
            if dataset.dataset_rid in visited:
                return
            visited.add(dataset.dataset_rid)

            current_type = type_selector(dataset.dataset_types)
            # None means this dataset's type is structural/container (e.g. "Split")
            # and should not contribute a path component — traverse children
            # with the same parent_path so they get clean paths.
            if current_type is None:
                current_path = parent_path
            else:
                current_path = parent_path + [current_type]
            type_paths[dataset.dataset_rid] = current_path

            for child in dataset.list_dataset_children():
                traverse(child, current_path, visited)

        traverse(self, [], set())
        return type_paths

    def _get_asset_dataset_mapping(self, asset_table: str) -> dict[RID, RID]:
        """Map asset RIDs to their containing dataset RID.

        For each asset in the specified table, determines which dataset it belongs to.
        This uses _dataset_table_view to find assets reachable through any FK path
        from the dataset, not just directly associated assets.

        Assets are mapped to their most specific (leaf) dataset in the hierarchy.
        For example, if a Split dataset contains Training and Testing children,
        and images are members of Training, the images map to Training (not Split).

        Args:
            asset_table: Name of the asset table (e.g., "Image")

        Returns:
            Dictionary mapping asset RID to the dataset RID that contains it.
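
        The leaf-first rule can be sketched with plain dictionaries. The
        ``DATASETS`` table and the RIDs are hypothetical, and ``map_assets``
        only mirrors the ordering logic of this method. It assumes that a
        parent dataset also reaches the assets of its children.

        ```python
        # Hypothetical bag: RID -> (nested dataset RIDs, asset RIDs reachable from it).
        DATASETS = {
            "Split": (["Training", "Testing"], ["img1", "img2", "img3"]),
            "Training": ([], ["img1", "img2"]),
            "Testing": ([], ["img3"]),
        }


        def map_assets(root):
            asset_to_dataset = {}

            def collect(rid, visited):
                if rid in visited:
                    return
                visited.add(rid)
                children, assets = DATASETS[rid]
                for child in children:  # children first, so leaves claim their assets
                    collect(child, visited)
                for asset in assets:
                    # Keep the first owner recorded, which is the deepest dataset.
                    asset_to_dataset.setdefault(asset, rid)

            collect(root, set())
            return asset_to_dataset


        mapping = map_assets("Split")
        ```

        Although ``Split`` reaches all three images, each one is attributed to
        the child dataset that was visited first.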
        """
        asset_to_dataset: dict[RID, RID] = {}

        def collect_from_dataset(dataset: DatasetBag, visited: set[RID]) -> None:
            if dataset.dataset_rid in visited:
                return
            visited.add(dataset.dataset_rid)

            # Process children FIRST (depth-first) so leaf datasets get priority
            # This ensures assets are mapped to their most specific dataset
            for child in dataset.list_dataset_children():
                collect_from_dataset(child, visited)

            # Then process this dataset's assets
            # Only set if not already mapped (child/leaf dataset wins)
            for asset in dataset._get_reachable_assets(asset_table):
                if asset["RID"] not in asset_to_dataset:
                    asset_to_dataset[asset["RID"]] = dataset.dataset_rid

        collect_from_dataset(self, set())
        return asset_to_dataset

    def _get_reachable_assets(self, asset_table: str) -> list[dict[str, Any]]:
        """Get all assets reachable from this dataset through any FK path.

        Unlike list_dataset_members which only returns directly associated entities,
        this method traverses foreign key relationships to find assets that are
        indirectly connected to the dataset. For example, if a dataset contains
        Subjects, and Subject -> Encounter -> Image, this method will find those
        Images even though they're not directly in the Dataset_Image association table.

        Args:
            asset_table: Name of the asset table (e.g., "Image")

        Returns:
            List of asset records as dictionaries.
        """
        # Use the _dataset_table_view query which traverses all FK paths
        sql_query = self._dataset_table_view(asset_table)

        with Session(self.engine) as session:
            result = session.execute(sql_query)
            # Convert rows to dictionaries
            rows = [dict(row._mapping) for row in result]

        return rows

    def _load_feature_values_cache(
        self,
        asset_table: str,
        group_keys: list[str],
        enforce_vocabulary: bool = True,
        value_selector: Callable | None = None,
    ) -> dict[str, dict[RID, Any]]:
        """Load feature values into a cache for efficient lookup.

        Pre-loads feature values for any group_keys that are feature names,
        organizing them by target entity RID for fast lookup.

        Args:
            asset_table: The asset table name to find features for.
            group_keys: List of potential feature names to cache. Supports two formats:
                - "FeatureName": Uses the first term column (default behavior)
                - "FeatureName.column_name": Uses the specified column from the feature table
            enforce_vocabulary: If True (default), only allow features with
                controlled vocabulary term columns and raise an error if an
                asset has multiple values. If False, allow any feature type
                and use the first value found when multiple exist.
            value_selector: Optional function to select which feature value to use
                when an asset has multiple values for the same feature. Receives a
                list of FeatureRecord objects and returns the selected one. If not
                provided and multiple values exist, raises DerivaMLException when
                enforce_vocabulary=True or uses the first value when False.

        Returns:
            Dictionary mapping group_key -> {target_rid -> feature_value}
            Only includes entries for keys that are actually features.

        Raises:
            DerivaMLException: If enforce_vocabulary is True and:
                - A feature has no term columns (not vocabulary-based), or
                - An asset has multiple different vocabulary term values for the same feature
                  and no value_selector is provided.
        """
        from deriva_ml.core.exceptions import DerivaMLException
        from deriva_ml.feature import FeatureRecord

        cache: dict[str, dict[RID, Any]] = {}
        # Store FeatureRecord objects directly for later selection
        records_cache: dict[str, dict[RID, list[FeatureRecord]]] = {}
        # Track which column to use for each group_key's value extraction
        column_for_group: dict[str, str] = {}
        logger = logging.getLogger("deriva_ml")

        # Parse group_keys to extract feature names and optional column specifications
        # Format: "FeatureName" or "FeatureName.column_name"
        feature_column_map: dict[str, str | None] = {}  # group_key -> specific column or None
        feature_names_to_check: set[str] = set()
        for key in group_keys:
            if "." in key:
                parts = key.split(".", 1)
                feature_name = parts[0]
                column_name = parts[1]
                feature_column_map[key] = column_name
                feature_names_to_check.add(feature_name)
            else:
                feature_column_map[key] = None
                feature_names_to_check.add(key)

        def process_feature(feat: Any, table_name: str, group_key: str, specific_column: str | None) -> None:
            """Process a single feature and add its values to the cache."""
            term_cols = [c.name for c in feat.term_columns]
            value_cols = [c.name for c in feat.value_columns]
            all_cols = term_cols + value_cols

            # Determine which column to use for the value
            if specific_column:
                # User specified a specific column
                if specific_column not in all_cols:
                    raise DerivaMLException(
                        f"Column '{specific_column}' not found in feature '{feat.feature_name}'. "
                        f"Available columns: {all_cols}"
                    )
                use_column = specific_column
            elif term_cols:
                # Use first term column (default behavior)
                use_column = term_cols[0]
            elif not enforce_vocabulary and value_cols:
                # Fall back to value columns if allowed
                use_column = value_cols[0]
            else:
                if enforce_vocabulary:
                    raise DerivaMLException(
                        f"Feature '{feat.feature_name}' on table '{table_name}' has no "
                        f"controlled vocabulary term columns. Only vocabulary-based features "
                        f"can be used for grouping when enforce_vocabulary=True. "
                        f"Set enforce_vocabulary=False to allow non-vocabulary features."
                    )
                return

            # Track the column used for this group_key
            column_for_group[group_key] = use_column
            records_cache[group_key] = defaultdict(list)
            feature_values = self.list_feature_values(table_name, feat.feature_name)

            for fv in feature_values:
                target_rid = getattr(fv, table_name, None)
                if target_rid is None:
                    continue

                # Check the value column is populated
                value = getattr(fv, use_column, None)
                if value is None:
                    continue

                records_cache[group_key][target_rid].append(fv)

        # Find all features on tables that this asset table references
        asset_table_obj = self.model.name_to_table(asset_table)

        # Check features on the asset table itself
        for feature in self.find_features(asset_table):
            if feature.feature_name in feature_names_to_check:
                # Find all group_keys that reference this feature
                for group_key, specific_col in feature_column_map.items():
                    # Check if this group_key references this feature
                    key_feature = group_key.split(".")[0] if "." in group_key else group_key
                    if key_feature == feature.feature_name:
                        try:
                            process_feature(feature, asset_table, group_key, specific_col)
                        except DerivaMLException:
                            raise
                        except Exception as e:
                            logger.warning(f"Could not load feature {feature.feature_name}: {e}")

        # Also check features on referenced tables (via foreign keys)
        for fk in asset_table_obj.foreign_keys:
            target_table = fk.pk_table
            for feature in self.find_features(target_table):
                if feature.feature_name in feature_names_to_check:
                    # Find all group_keys that reference this feature
                    for group_key, specific_col in feature_column_map.items():
                        # Check if this group_key references this feature
                        key_feature = group_key.split(".")[0] if "." in group_key else group_key
                        if key_feature == feature.feature_name:
                            try:
                                process_feature(feature, target_table.name, group_key, specific_col)
                            except DerivaMLException:
                                raise
                            except Exception as e:
                                logger.warning(f"Could not load feature {feature.feature_name}: {e}")

        # Now resolve multiple values using value_selector or error handling
        for group_key, target_records in records_cache.items():
            cache[group_key] = {}
            use_column = column_for_group[group_key]
            for target_rid, records in target_records.items():
                if len(records) == 1:
                    # Single value - straightforward
                    cache[group_key][target_rid] = getattr(records[0], use_column)
                elif len(records) > 1:
                    # Multiple values - need to resolve
                    unique_values = set(getattr(r, use_column) for r in records)
                    if len(unique_values) == 1:
                        # All records have same value, use it
                        cache[group_key][target_rid] = getattr(records[0], use_column)
                    elif value_selector:
                        # Use provided selector function
                        selected = value_selector(records)
                        cache[group_key][target_rid] = getattr(selected, use_column)
                    elif enforce_vocabulary:
                        # Multiple different values without selector - error
                        values_str = ", ".join(
                            f"'{getattr(r, use_column)}' (exec: {r.Execution})"
                            for r in records
                        )
                        raise DerivaMLException(
                            f"Asset '{target_rid}' has multiple different values for "
                            f"feature '{records[0].Feature_Name}': {values_str}. "
                            f"Provide a value_selector function to choose between values, "
                            f"or set enforce_vocabulary=False to use the first value."
                        )
                    else:
                        # Not enforcing - use first value
                        cache[group_key][target_rid] = getattr(records[0], use_column)

        return cache

    def _resolve_grouping_value(
        self,
        asset: dict[str, Any],
        group_key: str,
        feature_cache: dict[str, dict[RID, Any]],
    ) -> str:
        """Resolve a grouping value for an asset.

        First checks if group_key is a direct column on the asset record,
        then checks if it's a feature name in the feature cache.

        Args:
            asset: The asset record dictionary.
            group_key: Column name or feature name to group by.
            feature_cache: Pre-loaded feature values keyed by group key -> target RID -> value.

        Returns:
            The resolved value as a string, or "Unknown" if not found or None.
            Uses "Unknown" (capitalized) to match vocabulary term naming conventions.
        """
        # First check if it's a direct column on the asset table
        if group_key in asset:
            value = asset[group_key]
            if value is not None:
                return str(value)
            return "Unknown"

        # Check if it's a feature name
        if group_key in feature_cache:
            feature_values = feature_cache[group_key]
            # Check each column in the asset that might be a FK to the feature target
            for column_name, column_value in asset.items():
                if column_value and column_value in feature_values:
                    return str(feature_values[column_value])
            # Also check if the asset's own RID is in the feature values
            if asset.get("RID") in feature_values:
                return str(feature_values[asset["RID"]])

        return "Unknown"

    def _detect_asset_table(self) -> str | None:
        """Auto-detect the asset table from dataset members.

        Searches for asset tables in the dataset members by examining
        the schema. Returns the first asset table found, or None if
        no asset tables are in the dataset.

        Returns:
            Name of the detected asset table, or None if not found.
        """
        members = self.list_dataset_members(recurse=True)
        for table_name in members:
            if table_name == "Dataset":
                continue
            # Check if this table is an asset table
            try:
                table = self.model.name_to_table(table_name)
                if self.model.is_asset(table):
                    return table_name
            except (KeyError, AttributeError):
                continue
        return None

    def _validate_dataset_types(self) -> list[str] | None:
        """Validate that the dataset or its children have Training/Testing types.

        Checks if this dataset is of type Training or Testing, or if it has
        nested children of those types. Returns the valid types found.

        Returns:
            List of Training/Testing type names found, or None if validation fails.
        """
        valid_types = {"Training", "Testing"}
        found_types: set[str] = set()

        def check_dataset(ds: DatasetBag, visited: set[RID]) -> None:
            if ds.dataset_rid in visited:
                return
            visited.add(ds.dataset_rid)

            for dtype in ds.dataset_types:
                if dtype in valid_types:
                    found_types.add(dtype)

            for child in ds.list_dataset_children():
                check_dataset(child, visited)

        check_dataset(self, set())
        return list(found_types) if found_types else None

    def restructure_assets(
        self,
        output_dir: Path | str,
        asset_table: str | None = None,
        group_by: list[str] | None = None,
        use_symlinks: bool = True,
        type_selector: Callable[[list[str]], str] | None = None,
        type_to_dir_map: dict[str, str] | None = None,
        enforce_vocabulary: bool = True,
        value_selector: Callable | None = None,
        file_transformer: Callable[[Path, Path], Path] | None = None,
    ) -> dict[Path, Path]:
        """Restructure downloaded assets into a directory hierarchy.

        Creates a directory structure organizing assets by dataset types and
        grouping values. This is useful for ML workflows that expect data
        organized in conventional folder structures (e.g., PyTorch ImageFolder).

        The dataset should be of type Training or Testing, or have nested
        children of those types. The top-level directory name is determined
        by the dataset type (e.g., "Training" -> "training").

        **Finding assets through foreign key relationships:**

        Assets are found by traversing all foreign key paths from the dataset,
        not just direct associations. For example, if a dataset contains Subjects,
        and the schema has Subject -> Encounter -> Image relationships, this method
        will find all Images reachable through those paths even though they are
        not directly in a Dataset_Image association table.

        **Handling datasets without types (prediction scenarios):**

        If a dataset has no type defined, it is treated as Testing. This is
        common for prediction/inference scenarios where you want to apply a
        trained model to new unlabeled data.

        **Handling missing labels:**

        If an asset doesn't have a value for a group_by key (e.g., no label
        assigned), it is placed in an "Unknown" directory. This allows
        restructure_assets to work with unlabeled data for prediction.

        Args:
            output_dir: Base directory for restructured assets.
            asset_table: Name of the asset table (e.g., "Image"). If None,
                auto-detects the first asset table found among the dataset
                members. Raises DerivaMLException if no asset table is found.
            group_by: Names to group assets by. Each name creates a subdirectory
                level after the dataset type path. Names can be:

                - **Column names**: Direct columns on the asset table. The column
                  value becomes the subdirectory name.
                - **Feature names**: Features defined on the asset table (or tables
                  it references via foreign keys). The feature's vocabulary term
                  value becomes the subdirectory name.
                - **Feature.column**: Specify a particular column from a multi-term
                  feature (e.g., "Classification.Label" to use the Label column).

                Column names are checked first, then feature names. If a value
                is not found, "Unknown" is used as the subdirectory name.

            use_symlinks: If True (default), create symlinks to original files.
                If False, copy files. Symlinks save disk space but require
                the original bag to remain in place. Ignored when
                ``file_transformer`` is provided.
            type_selector: Function to select type when dataset has multiple types.
                Receives list of type names, returns selected type name.
                Defaults to selecting the first type. A dataset with no types
                is treated as Testing.
            type_to_dir_map: Optional mapping from dataset type names to directory
                names. Defaults to {"Training": "training", "Testing": "testing",
                "Unknown": "unknown"}. Use this to customize directory names or
                add new type mappings.
            enforce_vocabulary: If True (default), only allow features that have
                controlled vocabulary term columns, and raise an error if an asset
                has multiple different values for the same feature without a
                value_selector. This ensures clean, unambiguous directory structures.
                If False, allow any feature type and use the first value found
                when multiple values exist.
            value_selector: Optional function to select which feature value to use
                when an asset has multiple values for the same feature. Receives a
                list of FeatureRecord objects (typed Pydantic models with named
                attributes for each feature column) and returns the selected one.
                Use the Execution attribute to distinguish between values from
                different executions. Built-in selectors on FeatureRecord:
                ``select_newest``, ``select_first``, ``select_latest``,
                ``select_majority_vote(column)``.
            file_transformer: Optional callable invoked instead of the default
                symlink/copy step. Receives ``(src_path, dest_path)`` where
                ``dest_path`` is the suggested destination (preserving the original
                filename and extension). The transformer is responsible for writing
                the output file — it may change the extension or format — and must
                return the actual ``Path`` it wrote. When provided, ``use_symlinks``
                is ignored.

                Example — convert DICOM to PNG on placement::

                    def oct_to_png(src: Path, dest: Path) -> Path:
                        img = load_oct_dcm(str(src))
                        out = dest.with_suffix(".png")
                        PILImage.fromarray((img * 255).astype(np.uint8)).save(out)
                        return out

                    bag.restructure_assets(
                        output_dir="./ml_data",
                        group_by=["Diagnosis"],
                        file_transformer=oct_to_png,
                    )

        Returns:
            Manifest dict mapping each source ``Path`` to the actual output
            ``Path`` written. When no ``file_transformer`` is provided, source
            and output paths differ only in directory location. When a
            transformer is provided, the output path may also differ in name
            or extension.

        Raises:
            DerivaMLException: If asset_table cannot be determined (none is
                specified and no asset table is found among the dataset
                members), if no valid dataset types (Training/Testing) are
                found, or if enforce_vocabulary is True and an asset has
                multiple different values for a feature without value_selector.

        Examples:
            Basic restructuring with auto-detected asset table::

                manifest = bag.restructure_assets(
                    output_dir="./ml_data",
                    group_by=["Diagnosis"],
                )
                # Creates:
                # ./ml_data/training/Normal/image1.jpg
                # ./ml_data/testing/Abnormal/image2.jpg

            Custom type-to-directory mapping::

                manifest = bag.restructure_assets(
                    output_dir="./ml_data",
                    group_by=["Diagnosis"],
                    type_to_dir_map={"Training": "train", "Testing": "test"},
                )
                # Creates:
                # ./ml_data/train/Normal/image1.jpg
                # ./ml_data/test/Abnormal/image2.jpg

            Select specific feature column for multi-term features::

                manifest = bag.restructure_assets(
                    output_dir="./ml_data",
                    group_by=["Classification.Label"],  # Use Label column
                )

            Handle multiple feature values with a built-in selector::

                from deriva_ml.feature import FeatureRecord

                manifest = bag.restructure_assets(
                    output_dir="./ml_data",
                    group_by=["Diagnosis"],
                    value_selector=FeatureRecord.select_newest,
                )

            Prediction scenario with unlabeled data::

                # Dataset has no type - treated as Testing
                # Assets have no labels - placed in Unknown directory
                manifest = bag.restructure_assets(
                    output_dir="./prediction_data",
                    group_by=["Diagnosis"],
                )
                # Creates:
                # ./prediction_data/testing/Unknown/image1.jpg
                # ./prediction_data/testing/Unknown/image2.jpg

            Convert DICOM files to PNG during restructuring::

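                from pathlib import Path

                import numpy as np
                from PIL import Image as PILImage

                # load_oct_dcm is assumed to be a user-supplied DICOM loader
                # that returns a float array scaled to [0, 1].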

                def oct_to_png(src: Path, dest: Path) -> Path:
                    img = load_oct_dcm(str(src))
                    out = dest.with_suffix(".png")
                    PILImage.fromarray((img * 255).astype(np.uint8)).save(out)
                    return out

                manifest = bag.restructure_assets(
                    output_dir="./ml_data",
                    asset_table="OCT_DICOM",
                    group_by=["Image_Diagnosis.Diagnosis_Image"],
                    type_to_dir_map={"Training": "train", "Testing": "test"},
                    file_transformer=oct_to_png,
                )
                # manifest maps each source .dcm Path to its output .png Path:
                # Path(".../bag/OCT/image1.dcm") -> Path("./ml_data/train/Normal/image1.png")
        """
        logger = logging.getLogger("deriva_ml")
        group_by = group_by or []
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        # Default type-to-directory mapping
        if type_to_dir_map is None:
            type_to_dir_map = {"Training": "training", "Testing": "testing", "Unknown": "unknown"}

        # Auto-detect asset table if not provided
        if asset_table is None:
            asset_table = self._detect_asset_table()
            if asset_table is None:
                raise DerivaMLException(
                    "Could not auto-detect asset table. No asset tables found in dataset members. "
                    "Specify the asset_table parameter explicitly."
                )
            logger.info(f"Auto-detected asset table: {asset_table}")

        # Step 1: Build dataset type path map with directory name mapping
        def map_type_to_dir(types: list[str]) -> str | None:
            """Map dataset types to directory name using type_to_dir_map.

            If dataset has no types, treat it as Testing (prediction use case).
            Returns None when the type is not in type_to_dir_map, signalling
            that this dataset is a structural container (e.g. a Split parent)
            and should not contribute a path component. Its children will
            still be traversed and their own types will determine the path.
            """
            if not types:
                # No types defined - treat as Testing for prediction scenarios
                return type_to_dir_map.get("Testing", "testing")
            if type_selector:
                selected_type = type_selector(types)
            else:
                selected_type = types[0]
            if selected_type in type_to_dir_map:
                return type_to_dir_map[selected_type]
            # Type not explicitly mapped — treat as transparent container
            return None

        type_path_map = self._build_dataset_type_path_map(map_type_to_dir)

        # Step 2: Get asset-to-dataset mapping
        asset_dataset_map = self._get_asset_dataset_mapping(asset_table)

        # Step 3: Load feature values cache for relevant features
        feature_cache = self._load_feature_values_cache(
            asset_table, group_by, enforce_vocabulary, value_selector
        )

        # Step 4: Get all assets reachable through FK paths
        # This uses _get_reachable_assets which traverses FK relationships,
        # so assets connected via Subject -> Encounter -> Image are found
        # even if the dataset only contains Subjects directly.
        assets = self._get_reachable_assets(asset_table)

        manifest: dict[Path, Path] = {}

        if not assets:
            logger.warning(f"No assets found in table '{asset_table}'")
            return manifest

        # Step 5: Process each asset
        for asset in assets:
            # Get source file path
            filename = asset.get("Filename")
            if not filename:
                logger.warning(f"Asset {asset.get('RID')} has no Filename")
                continue

            source_path = Path(filename)
            if not source_path.exists():
                # Filename may be a bare basename stored in the SQLite cache
                # before image materialization.  Fall back to the canonical
                # BDBag asset layout: data/asset/{RID}/{table}/{filename}.
                try:
                    bag_root = Path(self._catalog._database_model.bag_path)
                    source_path = (
                        bag_root / "data" / "asset"
                        / asset.get("RID", "") / asset_table
                        / Path(filename).name
                    )
                except AttributeError:
                    pass  # catalog doesn't have _database_model (e.g. in tests)

            if not source_path.exists():
                logger.warning(f"Asset file not found: {filename}")
                continue

            # Get dataset type path
            dataset_rid = asset_dataset_map.get(asset["RID"])
            type_path = type_path_map.get(dataset_rid, ["unknown"])

            # Resolve grouping values
            group_path = []
            for key in group_by:
                value = self._resolve_grouping_value(asset, key, feature_cache)
                group_path.append(value)

            # Build target directory
            target_dir = output_dir.joinpath(*type_path, *group_path)
            target_dir.mkdir(parents=True, exist_ok=True)

            # Suggested destination preserves the original filename
            target_path = target_dir / source_path.name

            # Handle existing files at the suggested destination
            if target_path.exists() or target_path.is_symlink():
                target_path.unlink()

            if file_transformer is not None:
                # Transformer is responsible for writing the output file.
                # It receives the suggested dest and returns the actual path written,
                # which may differ in name or extension (e.g. DICOM -> PNG).
                actual_path = file_transformer(source_path, target_path)
            elif use_symlinks:
                try:
                    target_path.symlink_to(source_path.resolve())
                except OSError as e:
                    # Fall back to copy on platforms that don't support symlinks
                    logger.warning(f"Symlink failed, falling back to copy: {e}")
                    shutil.copy2(source_path, target_path)
                actual_path = target_path
            else:
                shutil.copy2(source_path, target_path)
                actual_path = target_path

            manifest[source_path] = actual_path

        return manifest
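
The placement step at the end of restructure_assets can be illustrated in isolation. The following is a minimal sketch using only the standard library, not the library's own code: `place_asset` is a hypothetical helper, and the file and directory names are made up for the demo. It builds the target directory from the type path and the grouping values, replaces any stale file, and falls back to copying when a symlink cannot be created.

```python
import shutil
import tempfile
from pathlib import Path


def place_asset(
    source: Path,
    output_dir: Path,
    type_path: list[str],
    group_path: list[str],
    use_symlinks: bool = True,
) -> Path:
    """Place one file under output_dir/<type...>/<group...>/ and return its path."""
    target_dir = output_dir.joinpath(*type_path, *group_path)
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / source.name

    # Replace anything left over from an earlier run.
    if target.exists() or target.is_symlink():
        target.unlink()

    if use_symlinks:
        try:
            target.symlink_to(source.resolve())
        except OSError:
            # Some platforms or filesystems refuse symlinks; copy instead.
            shutil.copy2(source, target)
    else:
        shutil.copy2(source, target)
    return target


# Demo in a throwaway directory with a fake asset file.
workdir = Path(tempfile.mkdtemp())
src = workdir / "bag" / "image1.jpg"
src.parent.mkdir(parents=True)
src.write_bytes(b"fake image bytes")

placed = place_asset(src, workdir / "ml_data", ["training"], ["Normal"])
print(placed.relative_to(workdir))
```

With symlinks, the restructured tree stays valid only while the original bag remains in place, which is why the copy mode exists.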

current_version property

current_version: DatasetVersion

Get the version of the dataset at the time the bag was downloaded.

For a DatasetBag, this is the version that was current when the bag was created. Unlike the live Dataset class, this value is immutable since bags are read-only snapshots.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| DatasetVersion | DatasetVersion | The semantic version (major.minor.patch) of this dataset. |

__init__

__init__(
    catalog: "DerivaMLDatabase",
    dataset_rid: RID | None = None,
    dataset_types: str | list[str] | None = None,
    description: str = "",
    execution_rid: RID | None = None,
)

Initialize a DatasetBag instance for a dataset within a downloaded bag.

This mirrors the Dataset class initialization pattern, where both classes take a catalog-like object as their first argument for consistency.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| catalog | 'DerivaMLDatabase' | The DerivaMLDatabase instance providing access to the bag's data. This implements the DerivaMLCatalog protocol. | required |
| dataset_rid | RID \| None | The RID of the dataset to wrap. If None, uses the primary dataset RID from the bag. | None |
| dataset_types | str \| list[str] \| None | One or more dataset type terms. Can be a single string or list of strings. | None |
| description | str | Human-readable description of the dataset. | '' |
| execution_rid | RID \| None | RID of the execution associated with this dataset version. If None, will be looked up from the Dataset_Version table. | None |

Raises:

| Type | Description |
| --- | --- |
| DerivaMLException | If no dataset_rid is provided and none can be determined from the bag, or if the RID doesn't exist in the bag. |

Source code in src/deriva_ml/dataset/dataset_bag.py
def __init__(
    self,
    catalog: "DerivaMLDatabase",
    dataset_rid: RID | None = None,
    dataset_types: str | list[str] | None = None,
    description: str = "",
    execution_rid: RID | None = None,
):
    """Initialize a DatasetBag instance for a dataset within a downloaded bag.

    This mirrors the Dataset class initialization pattern, where both classes
    take a catalog-like object as their first argument for consistency.

    Args:
        catalog: The DerivaMLDatabase instance providing access to the bag's data.
            This implements the DerivaMLCatalog protocol.
        dataset_rid: The RID of the dataset to wrap. If None, uses the primary
            dataset RID from the bag.
        dataset_types: One or more dataset type terms. Can be a single string
            or list of strings.
        description: Human-readable description of the dataset.
        execution_rid: RID of the execution associated with this dataset version.
            If None, will be looked up from the Dataset_Version table.

    Raises:
        DerivaMLException: If no dataset_rid is provided and none can be
            determined from the bag, or if the RID doesn't exist in the bag.
    """
    # Store reference to the catalog and extract the underlying model
    self._catalog = catalog
    self.model = catalog.model
    self.engine = cast(Engine, self.model.engine)
    self.metadata = self.model.metadata

    # Use provided RID or fall back to the bag's primary dataset
    self.dataset_rid = dataset_rid or self.model.dataset_rid
    self.description = description
    self.execution_rid = execution_rid or (
        self.model._get_dataset_execution(self.dataset_rid) or {}
    ).get("Execution")

    # Normalize dataset_types to always be a list of strings for consistency
    # with the Dataset class interface
    if dataset_types is None:
        self.dataset_types: list[str] = []
    elif isinstance(dataset_types, str):
        self.dataset_types: list[str] = [dataset_types]
    else:
        self.dataset_types: list[str] = list(dataset_types)

    if not self.dataset_rid:
        raise DerivaMLException("No dataset RID provided")

    # Validate that this dataset exists in the bag
    self.model.rid_lookup(self.dataset_rid)

    # Cache the version and dataset table reference
    self._current_version = self.model.dataset_version(self.dataset_rid)
    self._dataset_table = self.model.dataset_table

__repr__

__repr__() -> str

Return a string representation of the DatasetBag for debugging.

Source code in src/deriva_ml/dataset/dataset_bag.py
def __repr__(self) -> str:
    """Return a string representation of the DatasetBag for debugging."""
    return (f"<deriva_ml.DatasetBag object at {hex(id(self))}: rid='{self.dataset_rid}', "
            f"version='{self.current_version}', types={self.dataset_types}>")

dataset_history

dataset_history() -> list[DatasetHistory]

Retrieves the version history of a dataset.

Returns a chronological list of dataset versions, including their version numbers, creation times, and associated metadata.

Returns:

Type Description
list[DatasetHistory]

List of history entries, each containing:

  • dataset_version: Version number (major.minor.patch)
  • minid: Minimal Viable Identifier
  • snapshot: Catalog snapshot time
  • dataset_rid: Dataset Resource Identifier
  • version_rid: Version Resource Identifier
  • description: Version description
  • execution_rid: Associated execution RID

Raises:

| Type | Description |
| --- | --- |
| DerivaMLException | If dataset_rid is not a valid dataset RID. |

Example

>>> history = bag.dataset_history()
>>> for entry in history:
...     print(f"Version {entry.dataset_version}: {entry.description}")
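
A common follow-up is picking the most recent entry from the history. The sketch below is an illustration under stated assumptions: `HistoryEntry` is a hypothetical stand-in for DatasetHistory that stores the version as a plain "major.minor.patch" string, since the comparison behavior of the real DatasetVersion objects is not assumed here.

```python
from dataclasses import dataclass


@dataclass
class HistoryEntry:
    """Hypothetical stand-in for DatasetHistory with only two fields."""

    dataset_version: str  # "major.minor.patch"
    description: str


def version_key(entry: HistoryEntry) -> tuple[int, ...]:
    """Turn "1.10.0" into (1, 10, 0) so versions compare numerically."""
    return tuple(int(part) for part in entry.dataset_version.split("."))


history = [
    HistoryEntry("1.0.0", "Initial release"),
    HistoryEntry("1.10.0", "Added images"),
    HistoryEntry("1.2.0", "Fixed labels"),
]

latest = max(history, key=version_key)
print(f"Latest: {latest.dataset_version} ({latest.description})")
# -> Latest: 1.10.0 (Added images)
```

Comparing the parsed tuples rather than the raw strings avoids ranking "1.2.0" above "1.10.0".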

Source code in src/deriva_ml/dataset/dataset_bag.py
def dataset_history(self) -> list[DatasetHistory]:
    """Retrieves the version history of a dataset.

    Returns a chronological list of dataset versions, including their version numbers,
    creation times, and associated metadata.

    Returns:
        list[DatasetHistory]: List of history entries, each containing:
            - dataset_version: Version number (major.minor.patch)
            - minid: Minimal Viable Identifier
            - snapshot: Catalog snapshot time
            - dataset_rid: Dataset Resource Identifier
            - version_rid: Version Resource Identifier
            - description: Version description
            - execution_rid: Associated execution RID

    Raises:
        DerivaMLException: If dataset_rid is not a valid dataset RID.

    Example:
        >>> history = bag.dataset_history()
        >>> for entry in history:
        ...     print(f"Version {entry.dataset_version}: {entry.description}")
    """
    # Query Dataset_Version table directly via the model
    return [
        DatasetHistory(
            dataset_version=DatasetVersion.parse(v["Version"]),
            minid=v["Minid"],
            snapshot=v["Snapshot"],
            dataset_rid=self.dataset_rid,
            version_rid=v["RID"],
            description=v["Description"],
            execution_rid=v["Execution"],
        )
        for v in self.model._get_table_contents("Dataset_Version")
        if v["Dataset"] == self.dataset_rid
    ]
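The filtering step in the listing above can be sketched outside the library. This is a minimal illustration under stated assumptions: `VersionEntry`, `parse_version`, and `history_for` are hypothetical stand-ins, not part of deriva_ml, and the rows are made-up data shaped like `Dataset_Version` records. The explicit sort is an addition of this sketch; the method as listed returns entries in the order the table yields them.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class VersionEntry:
    """Hypothetical stand-in for a DatasetHistory entry."""

    dataset_version: tuple[int, int, int]
    dataset_rid: str
    description: str


def parse_version(text: str) -> tuple[int, int, int]:
    """Parse 'major.minor.patch' into a tuple that sorts numerically."""
    major, minor, patch = (int(part) for part in text.split("."))
    return (major, minor, patch)


def history_for(rows: list[dict], dataset_rid: str) -> list[VersionEntry]:
    """Keep only the rows for one dataset, ordered from oldest to newest version."""
    entries = [
        VersionEntry(parse_version(r["Version"]), r["Dataset"], r["Description"])
        for r in rows
        if r["Dataset"] == dataset_rid
    ]
    return sorted(entries, key=lambda e: e.dataset_version)


# Made-up rows; a bag may hold version rows for several datasets.
rows = [
    {"Dataset": "1-AAA", "Version": "1.10.0", "Description": "Added images"},
    {"Dataset": "1-BBB", "Version": "0.1.0", "Description": "Other dataset"},
    {"Dataset": "1-AAA", "Version": "1.2.0", "Description": "Initial release"},
]
history = history_for(rows, "1-AAA")
latest = history[-1]
```

Comparing versions as integer tuples rather than strings matters here: as text, "1.10.0" would sort before "1.2.0".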

denormalize_as_dataframe

denormalize_as_dataframe(
    include_tables: list[str],
    version: Any = None,
    **kwargs: Any,
) -> pd.DataFrame

Denormalize the dataset bag into a single wide table (DataFrame).

Denormalization transforms normalized relational data into a single "wide table" (also called a "flat table" or "denormalized table") by joining related tables together. This produces a DataFrame where each row contains all related information from multiple source tables, with columns from each table combined side-by-side.

Wide tables are the standard input format for most machine learning frameworks, which expect all features for a single observation to be in one row. This method bridges the gap between normalized database schemas and ML-ready tabular data.

How it works:

Tables are joined based on their foreign key relationships stored in the bag's schema. For example, if Image has a foreign key to Subject, denormalizing ["Subject", "Image"] produces rows where each image appears with its subject's metadata.

Column naming:

Column names are prefixed with the source table name using dots to avoid collisions (e.g., "Image.Filename", "Subject.RID"). This differs from the live Dataset class which uses underscores.
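The join and the dotted column names can be illustrated with plain SQLite. This is only a sketch of the idea, assuming a made-up two-table schema in which Image has a foreign key to Subject; it does not reproduce the query the library builds.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE Subject (RID TEXT PRIMARY KEY, Name TEXT);
    CREATE TABLE Image (
        RID TEXT PRIMARY KEY,
        Filename TEXT,
        Subject TEXT REFERENCES Subject(RID)
    );
    INSERT INTO Subject VALUES ('S1', 'Alice'), ('S2', 'Bob');
    INSERT INTO Image VALUES
        ('I1', 'a.png', 'S1'),
        ('I2', 'b.png', 'S1'),
        ('I3', 'c.png', 'S2');
    """
)

# Alias every column as "Table.Column" so the two RID columns do not collide.
query = """
    SELECT
        Image.RID      AS "Image.RID",
        Image.Filename AS "Image.Filename",
        Subject.RID    AS "Subject.RID",
        Subject.Name   AS "Subject.Name"
    FROM Image
    JOIN Subject ON Image.Subject = Subject.RID
    ORDER BY Image.RID
"""
cursor = conn.execute(query)
columns = [d[0] for d in cursor.description]
# One dict per image, each carrying its subject's metadata.
wide_rows = [dict(zip(columns, row)) for row in cursor]
conn.close()
```

Each image row repeats its subject's columns, which is what makes the result "wide": a subject with two images appears in two rows.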

Parameters:

Name Type Description Default
include_tables list[str]

List of table names to include in the output. Tables are joined based on their foreign key relationships. Order doesn't matter - the join order is determined automatically.

required
version Any

Ignored (bags are immutable snapshots of a specific version).

None
**kwargs Any

Additional arguments (ignored, for protocol compatibility).

{}

Returns:

Type Description
DataFrame

pd.DataFrame: Wide table with columns from all included tables.

Example

Create a training dataset from a downloaded bag::

>>> # Download and materialize the dataset
>>> bag = ml.download_dataset_bag(spec, materialize=True)

>>> # Denormalize into a wide table
>>> df = bag.denormalize_as_dataframe(["Image", "Diagnosis"])
>>> print(df.columns.tolist())
['Image.RID', 'Image.Filename', 'Image.URL', 'Diagnosis.RID',
 'Diagnosis.Label', 'Diagnosis.Confidence']

>>> # Access local file paths for images
>>> for _, row in df.iterrows():
...     local_path = bag.get_asset_path("Image", row["Image.RID"])
...     label = row["Diagnosis.Label"]
...     # Train on local_path with label
See Also

denormalize_as_dict: Generator version for memory-efficient processing.

Source code in src/deriva_ml/dataset/dataset_bag.py
def denormalize_as_dataframe(
    self,
    include_tables: list[str],
    version: Any = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Denormalize the dataset bag into a single wide table (DataFrame).

    Denormalization transforms normalized relational data into a single "wide table"
    (also called a "flat table" or "denormalized table") by joining related tables
    together. This produces a DataFrame where each row contains all related information
    from multiple source tables, with columns from each table combined side-by-side.

    Wide tables are the standard input format for most machine learning frameworks,
    which expect all features for a single observation to be in one row. This method
    bridges the gap between normalized database schemas and ML-ready tabular data.

    **How it works:**

    Tables are joined based on their foreign key relationships stored in the bag's
    schema. For example, if Image has a foreign key to Subject, denormalizing
    ["Subject", "Image"] produces rows where each image appears with its subject's
    metadata.

    **Column naming:**

    Column names are prefixed with the source table name using dots to avoid
    collisions (e.g., "Image.Filename", "Subject.RID"). This differs from the
    live Dataset class which uses underscores.

    Args:
        include_tables: List of table names to include in the output. Tables
            are joined based on their foreign key relationships.
            Order doesn't matter - the join order is determined automatically.
        version: Ignored (bags are immutable snapshots of a specific version).
        **kwargs: Additional arguments (ignored, for protocol compatibility).

    Returns:
        pd.DataFrame: Wide table with columns from all included tables.

    Example:
        Create a training dataset from a downloaded bag::

            >>> # Download and materialize the dataset
            >>> bag = ml.download_dataset_bag(spec, materialize=True)

            >>> # Denormalize into a wide table
            >>> df = bag.denormalize_as_dataframe(["Image", "Diagnosis"])
            >>> print(df.columns.tolist())
            ['Image.RID', 'Image.Filename', 'Image.URL', 'Diagnosis.RID',
             'Diagnosis.Label', 'Diagnosis.Confidence']

            >>> # Access local file paths for images
            >>> for _, row in df.iterrows():
            ...     local_path = bag.get_asset_path("Image", row["Image.RID"])
            ...     label = row["Diagnosis.Label"]
            ...     # Train on local_path with label

    See Also:
        denormalize_as_dict: Generator version for memory-efficient processing.
    """
    sql_stmt = self._denormalize(include_tables=include_tables)
    with Session(self.engine) as session:
        result = session.execute(sql_stmt)
        rows = [dict(row._mapping) for row in result]
    return pd.DataFrame(rows)

denormalize_as_dict

denormalize_as_dict(
    include_tables: list[str],
    version: Any = None,
    **kwargs: Any,
) -> Generator[
    dict[str, Any], None, None
]

Denormalize the dataset bag and yield rows as dictionaries.

This is a memory-efficient alternative to denormalize_as_dataframe() that yields one row at a time as a dictionary instead of loading all data into a DataFrame. Use this when processing large datasets that may not fit in memory, or when you want to process rows incrementally.

Like denormalize_as_dataframe(), this produces a "wide table" representation where each yielded dictionary contains all columns from the joined tables. See denormalize_as_dataframe() for detailed explanation of how denormalization works.
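The difference between yielding rows and materializing them can be sketched with the standard `sqlite3` module. The helper `stream_rows` and the Image table are hypothetical; the sketch shows the general generator pattern, not the library's implementation.

```python
import sqlite3
from collections.abc import Iterator
from typing import Any


def stream_rows(conn: sqlite3.Connection, sql: str) -> Iterator[dict[str, Any]]:
    """Yield one row at a time as a dict instead of building a full list."""
    cursor = conn.execute(sql)
    columns = [d[0] for d in cursor.description]
    for row in cursor:
        yield dict(zip(columns, row))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Image (RID TEXT, Filename TEXT)")
conn.executemany(
    "INSERT INTO Image VALUES (?, ?)",
    [(f"I{i}", f"img_{i}.png") for i in range(1000)],
)

sql = (
    'SELECT RID AS "Image.RID", Filename AS "Image.Filename" '
    "FROM Image ORDER BY rowid"
)

# Only the current row dict is held by the loop at any moment.
png_count = 0
first_row = None
for row in stream_rows(conn, sql):
    if first_row is None:
        first_row = row
    if row["Image.Filename"].endswith(".png"):
        png_count += 1
conn.close()
```

Because the generator is lazy, nothing is read until the loop starts, and the connection has to stay open until iteration finishes.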

Column naming:

Column names are prefixed with the source table name using dots to avoid collisions (e.g., "Image.Filename", "Subject.RID"). This differs from the live Dataset class which uses underscores.

Parameters:

Name Type Description Default
include_tables list[str]

List of table names to include in the output. Tables are joined based on their foreign key relationships.

required
version Any

Ignored (bags are immutable snapshots of a specific version).

None
**kwargs Any

Additional arguments (ignored, for protocol compatibility).

{}

Yields:

Type Description
dict[str, Any]

dict[str, Any]: Dictionary representing one row of the wide table. Keys are column names in "Table.Column" format.

Example

Stream through a large dataset for training::

>>> bag = ml.download_dataset_bag(spec, materialize=True)
>>> for row in bag.denormalize_as_dict(["Image", "Diagnosis"]):
...     # Get local file path for this image
...     local_path = bag.get_asset_path("Image", row["Image.RID"])
...     label = row["Diagnosis.Label"]
...     # Process image and label...

Build a PyTorch dataset efficiently::

>>> class BagDataset(torch.utils.data.IterableDataset):
...     def __init__(self, bag, tables):
...         self.bag = bag
...         self.tables = tables
...     def __iter__(self):
...         for row in self.bag.denormalize_as_dict(self.tables):
...             img_path = self.bag.get_asset_path("Image", row["Image.RID"])
...             yield load_image(img_path), row["Diagnosis.Label"]
See Also

denormalize_as_dataframe: Returns all data as a pandas DataFrame.

Source code in src/deriva_ml/dataset/dataset_bag.py
def denormalize_as_dict(
    self,
    include_tables: list[str],
    version: Any = None,
    **kwargs: Any,
) -> Generator[dict[str, Any], None, None]:
    """Denormalize the dataset bag and yield rows as dictionaries.

    This is a memory-efficient alternative to denormalize_as_dataframe() that
    yields one row at a time as a dictionary instead of loading all data into
    a DataFrame. Use this when processing large datasets that may not fit in
    memory, or when you want to process rows incrementally.

    Like denormalize_as_dataframe(), this produces a "wide table" representation
    where each yielded dictionary contains all columns from the joined tables.
    See denormalize_as_dataframe() for detailed explanation of how denormalization
    works.

    **Column naming:**

    Column names are prefixed with the source table name using dots to avoid
    collisions (e.g., "Image.Filename", "Subject.RID"). This differs from the
    live Dataset class which uses underscores.

    Args:
        include_tables: List of table names to include in the output.
            Tables are joined based on their foreign key relationships.
        version: Ignored (bags are immutable snapshots of a specific version).
        **kwargs: Additional arguments (ignored, for protocol compatibility).

    Yields:
        dict[str, Any]: Dictionary representing one row of the wide table.
            Keys are column names in "Table.Column" format.

    Example:
        Stream through a large dataset for training::

            >>> bag = ml.download_dataset_bag(spec, materialize=True)
            >>> for row in bag.denormalize_as_dict(["Image", "Diagnosis"]):
            ...     # Get local file path for this image
            ...     local_path = bag.get_asset_path("Image", row["Image.RID"])
            ...     label = row["Diagnosis.Label"]
            ...     # Process image and label...

        Build a PyTorch dataset efficiently::

            >>> class BagDataset(torch.utils.data.IterableDataset):
            ...     def __init__(self, bag, tables):
            ...         self.bag = bag
            ...         self.tables = tables
            ...     def __iter__(self):
            ...         for row in self.bag.denormalize_as_dict(self.tables):
            ...             img_path = self.bag.get_asset_path("Image", row["Image.RID"])
            ...             yield load_image(img_path), row["Diagnosis.Label"]

    See Also:
        denormalize_as_dataframe: Returns all data as a pandas DataFrame.
    """
    sql_stmt = self._denormalize(include_tables=include_tables)
    with Session(self.engine) as session:
        result = session.execute(sql_stmt)
        for row in result:
            yield dict(row._mapping)

denormalize_columns

denormalize_columns(
    include_tables: list[str],
    **kwargs: Any,
) -> list[tuple[str, str]]

Return the columns that denormalize would produce, without fetching data.

Performs the same validation as :meth:denormalize_as_dataframe (table existence, FK path resolution, ambiguity detection) but stops before executing any data queries.
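As a rough illustration of inspecting columns without reading any rows, the sketch below uses SQLite's `PRAGMA table_info` on an in-memory database. The helper `planned_columns` and the table layout are hypothetical, and the types it reports are SQLite declared types rather than the ermrest type names that denormalize_columns returns.

```python
import sqlite3


def planned_columns(
    conn: sqlite3.Connection, tables: list[str]
) -> list[tuple[str, str]]:
    """Return (column_name, declared_type) pairs in "Table.Column" form.

    Only schema metadata is consulted; no table rows are read.
    """
    result: list[tuple[str, str]] = []
    for table in tables:
        # Each info row is (cid, name, type, notnull, dflt_value, pk).
        info = conn.execute(f'PRAGMA table_info("{table}")').fetchall()
        if not info:
            # An unknown table yields no rows, so fail early.
            raise ValueError(f"Unknown table: {table}")
        result.extend((f"{table}.{row[1]}", row[2]) for row in info)
    return result


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Image (RID TEXT, Width INTEGER)")
conn.execute("CREATE TABLE Subject (RID TEXT)")

cols = planned_columns(conn, ["Image", "Subject"])
```

Checking the column plan first is a cheap way to catch a misspelled table name before running an expensive join.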

Parameters:

Name Type Description Default
include_tables list[str]

List of table names to include.

required
**kwargs Any

Additional arguments (ignored, for protocol compatibility).

{}

Returns:

Type Description
list[tuple[str, str]]

List of (column_name, column_type) tuples using dot notation. Type strings use ermrest type names (text, int4, etc.).

Example

>>> cols = bag.denormalize_columns(["Image", "Subject"])
>>> for name, dtype in cols:
...     print(f"  {name}: {dtype}")
Image.RID: ermrest_rid
Image.Filename: text
Subject.RID: ermrest_rid
Subject.Name: text

Source code in src/deriva_ml/dataset/dataset_bag.py
def denormalize_columns(
    self,
    include_tables: list[str],
    **kwargs: Any,
) -> list[tuple[str, str]]:
    """Return the columns that denormalize would produce, without fetching data.

    Performs the same validation as :meth:`denormalize_as_dataframe` (table existence,
    FK path resolution, ambiguity detection) but stops before executing any data
    queries.

    Args:
        include_tables: List of table names to include.
        **kwargs: Additional arguments (ignored, for protocol compatibility).

    Returns:
        List of ``(column_name, column_type)`` tuples using dot notation.
        Type strings use ermrest type names (``text``, ``int4``, etc.).

    Example:
        >>> cols = bag.denormalize_columns(["Image", "Subject"])
        >>> for name, dtype in cols:
        ...     print(f"  {name}: {dtype}")
        Image.RID: ermrest_rid
        Image.Filename: text
        Subject.RID: ermrest_rid
        Subject.Name: text
    """
    from deriva_ml.model.catalog import denormalize_column_name

    _, column_specs, multi_schema = self.model._prepare_wide_table(
        self, self.dataset_rid, list(include_tables)
    )

    result = []
    for schema_name, table_name, col_name, type_name in column_specs:
        prefixed = denormalize_column_name(
            schema_name, table_name, col_name, multi_schema
        )
        # _prepare_wide_table now returns ermrest type names directly,
        # so no mapping needed.
        result.append((prefixed, type_name))
    return result

fetch_table_features

fetch_table_features(
    table: Table | str,
    feature_name: str | None = None,
    selector: Callable[
        [list[FeatureRecord]],
        FeatureRecord,
    ]
    | None = None,
) -> dict[str, list[FeatureRecord]]

Fetch all feature values for a table, grouped by feature name.

Queries the local SQLite database within this dataset bag and returns a dictionary mapping feature names to lists of FeatureRecord instances. This is useful for retrieving all annotations on a table in a single call — for example, getting all classification labels, quality scores, and bounding boxes for a set of images at once.

Selector for resolving multiple values:

An asset may have multiple values for the same feature — for example, labels from different annotators or model runs. When a selector is provided, records are grouped by target RID and the selector is called once per group to pick a single value. Groups with only one record are passed through unchanged.

A selector is any callable with signature (list[FeatureRecord]) -> FeatureRecord. Built-in selectors:

  • FeatureRecord.select_newest — picks the record with the most recent RCT (Row Creation Time).

Custom selector example::

def select_highest_confidence(records):
    return max(records, key=lambda r: getattr(r, "Confidence", 0))
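The group-then-select behavior described above can be sketched with plain dataclasses. `Record` and `apply_selector` are hypothetical stand-ins for FeatureRecord and the method's internal grouping step, and the data is made up; the sketch assumes RCT values are ISO 8601 strings, which compare correctly as text.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable


@dataclass
class Record:
    """Hypothetical stand-in for a FeatureRecord."""

    Image: str  # RID of the target row
    Label: str
    RCT: str  # row creation time as an ISO 8601 string
    Confidence: float = 0.0


def select_newest(records: list[Record]) -> Record:
    """Pick the record with the most recent creation time."""
    return max(records, key=lambda r: r.RCT)


def select_highest_confidence(records: list[Record]) -> Record:
    """Pick the record with the largest Confidence value."""
    return max(records, key=lambda r: getattr(r, "Confidence", 0))


def apply_selector(
    records: list[Record],
    target_col: str,
    selector: Callable[[list[Record]], Record],
) -> list[Record]:
    """Group records by target RID, then keep one record per group."""
    grouped: dict[str, list[Record]] = defaultdict(list)
    for rec in records:
        grouped[getattr(rec, target_col)].append(rec)
    # Single-record groups are passed through without calling the selector.
    return [selector(g) if len(g) > 1 else g[0] for g in grouped.values()]


records = [
    Record("I1", "benign", "2024-01-01T00:00:00", 0.9),
    Record("I1", "malignant", "2024-03-01T00:00:00", 0.6),
    Record("I2", "benign", "2024-02-01T00:00:00", 0.8),
]
newest = apply_selector(records, "Image", select_newest)
most_confident = apply_selector(records, "Image", select_highest_confidence)
```

The two selectors disagree on image I1: the newest label is "malignant", while the most confident one is "benign". Which selector to use is a modeling decision rather than a technical one.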

Parameters:

Name Type Description Default
table Table | str

The table to fetch features for (name or Table object).

required
feature_name str | None

If provided, only fetch values for this specific feature. If None, fetch all features on the table.

None
selector Callable[[list[FeatureRecord]], FeatureRecord] | None

Optional function to select among multiple feature values for the same target object. Receives a list of FeatureRecord instances (all for the same target RID) and returns the selected one.

None

Returns:

Type Description
dict[str, list[FeatureRecord]]

Keys are feature names, values are lists of FeatureRecord instances. When a selector is provided, each target object appears at most once per feature.

Raises:

Type Description
DerivaMLException

If a specified feature_name doesn't exist on the table.

Examples:

Fetch all features for a table::

>>> features = bag.fetch_table_features("Image")
>>> for name, records in features.items():
...     print(f"{name}: {len(records)} values")

Fetch a single feature with newest-value selection::

>>> features = bag.fetch_table_features(
...     "Image",
...     feature_name="Classification",
...     selector=FeatureRecord.select_newest,
... )

Convert results to a DataFrame::

>>> features = bag.fetch_table_features("Image", feature_name="Quality")
>>> import pandas as pd
>>> df = pd.DataFrame([r.model_dump() for r in features["Quality"]])
Source code in src/deriva_ml/dataset/dataset_bag.py
def fetch_table_features(
    self,
    table: Table | str,
    feature_name: str | None = None,
    selector: Callable[[list[FeatureRecord]], FeatureRecord] | None = None,
) -> dict[str, list[FeatureRecord]]:
    """Fetch all feature values for a table, grouped by feature name.

    Queries the local SQLite database within this dataset bag and returns
    a dictionary mapping feature names to lists of FeatureRecord instances.
    This is useful for retrieving all annotations on a table in a single
    call — for example, getting all classification labels, quality scores,
    and bounding boxes for a set of images at once.

    **Selector for resolving multiple values:**

    An asset may have multiple values for the same feature — for example,
    labels from different annotators or model runs. When a ``selector`` is
    provided, records are grouped by target RID and the selector is called
    once per group to pick a single value. Groups with only one record
    are passed through unchanged.

    A selector is any callable with signature
    ``(list[FeatureRecord]) -> FeatureRecord``. Built-in selectors:

    - ``FeatureRecord.select_newest`` — picks the record with the most
      recent ``RCT`` (Row Creation Time).

    Custom selector example::

        def select_highest_confidence(records):
            return max(records, key=lambda r: getattr(r, "Confidence", 0))

    Args:
        table: The table to fetch features for (name or Table object).
        feature_name: If provided, only fetch values for this specific
            feature. If ``None``, fetch all features on the table.
        selector: Optional function to select among multiple feature values
            for the same target object. Receives a list of FeatureRecord
            instances (all for the same target RID) and returns the selected
            one.

    Returns:
        dict[str, list[FeatureRecord]]: Keys are feature names, values are
        lists of FeatureRecord instances. When a selector is provided, each
        target object appears at most once per feature.

    Raises:
        DerivaMLException: If a specified ``feature_name`` doesn't exist
            on the table.

    Examples:
        Fetch all features for a table::

            >>> features = bag.fetch_table_features("Image")
            >>> for name, records in features.items():
            ...     print(f"{name}: {len(records)} values")

        Fetch a single feature with newest-value selection::

            >>> features = bag.fetch_table_features(
            ...     "Image",
            ...     feature_name="Classification",
            ...     selector=FeatureRecord.select_newest,
            ... )

        Convert results to a DataFrame::

            >>> features = bag.fetch_table_features("Image", feature_name="Quality")
            >>> import pandas as pd
            >>> df = pd.DataFrame([r.model_dump() for r in features["Quality"]])
    """
    features = list(self.find_features(table))
    if feature_name is not None:
        features = [f for f in features if f.feature_name == feature_name]
        if not features:
            table_name = table if isinstance(table, str) else table.name
            raise DerivaMLException(
                f"Feature '{feature_name}' not found on table '{table_name}'."
            )

    result: dict[str, list[FeatureRecord]] = {}

    for feat in features:
        record_class = feat.feature_record_class()
        field_names = set(record_class.model_fields.keys())
        target_col = feat.target_table.name

        # Query raw values from SQLite
        feature_table = self.model.find_table(feat.feature_table.name)
        with Session(self.engine) as session:
            sql_cmd = select(feature_table)
            sql_result = session.execute(sql_cmd)
            rows = [dict(row._mapping) for row in sql_result]

        records: list[FeatureRecord] = []
        for raw_value in rows:
            filtered_data = {k: v for k, v in raw_value.items() if k in field_names}
            records.append(record_class(**filtered_data))

        if selector and records:
            # Group by target RID and apply selector
            grouped: dict[str, list[FeatureRecord]] = defaultdict(list)
            for rec in records:
                target_rid = getattr(rec, target_col, None)
                if target_rid is not None:
                    grouped[target_rid].append(rec)
            records = [
                selector(group) if len(group) > 1 else group[0]
                for group in grouped.values()
            ]

        result[feat.feature_name] = records

    return result

find_features

find_features(
    table: str | Table,
) -> Iterable[Feature]

Find all features defined on a table within this dataset bag.

Features are measurable properties associated with records in a table, stored as association tables linking the target table to vocabulary terms, assets, or metadata columns. This method discovers all such feature definitions for the given table.

Each returned Feature object provides:

  • feature_name: The feature's name (e.g., "Classification")
  • target_table: The table the feature applies to
  • feature_table: The association table storing feature values
  • term_columns, asset_columns, value_columns: Column role sets
  • feature_record_class(): A Pydantic model for reading/writing values

Parameters:

Name Type Description Default
table str | Table

The table to find features for (name or Table object).

required

Returns:

Type Description
Iterable[Feature]

An iterable of Feature instances describing each feature defined on the table.

Example

>>> for f in bag.find_features("Image"):
...     print(f"{f.feature_name}: {len(f.term_columns)} terms, "
...           f"{len(f.value_columns)} value columns")

Source code in src/deriva_ml/dataset/dataset_bag.py
def find_features(self, table: str | Table) -> Iterable[Feature]:
    """Find all features defined on a table within this dataset bag.

    Features are measurable properties associated with records in a table,
    stored as association tables linking the target table to vocabulary
    terms, assets, or metadata columns. This method discovers all such
    feature definitions for the given table.

    Each returned ``Feature`` object provides:

    - ``feature_name``: The feature's name (e.g., ``"Classification"``)
    - ``target_table``: The table the feature applies to
    - ``feature_table``: The association table storing feature values
    - ``term_columns``, ``asset_columns``, ``value_columns``: Column role sets
    - ``feature_record_class()``: A Pydantic model for reading/writing values

    Args:
        table: The table to find features for (name or Table object).

    Returns:
        An iterable of Feature instances describing each feature
        defined on the table.

    Example:
        >>> for f in bag.find_features("Image"):
        ...     print(f"{f.feature_name}: {len(f.term_columns)} terms, "
        ...           f"{len(f.value_columns)} value columns")
    """
    return self.model.find_features(table)

get_table_as_dataframe

get_table_as_dataframe(
    table: str,
) -> pd.DataFrame

Get table contents as a pandas DataFrame.

Convenience method that wraps get_table_as_dict() to return a DataFrame. Provides access to all rows in a table, not just those belonging to this dataset. For dataset-filtered results, use list_dataset_members() instead.

Parameters:

Name Type Description Default
table str

Name of the table to retrieve (e.g., "Subject", "Image").

required

Returns:

Type Description
DataFrame

DataFrame with one row per record in the table.

Example

>>> df = bag.get_table_as_dataframe("Image")
>>> print(df.shape)

Source code in src/deriva_ml/dataset/dataset_bag.py
def get_table_as_dataframe(self, table: str) -> pd.DataFrame:
    """Get table contents as a pandas DataFrame.

    Convenience method that wraps get_table_as_dict() to return a DataFrame.
    Provides access to all rows in a table, not just those belonging to this
    dataset. For dataset-filtered results, use list_dataset_members() instead.

    Args:
        table: Name of the table to retrieve (e.g., "Subject", "Image").

    Returns:
        DataFrame with one row per record in the table.

    Example:
        >>> df = bag.get_table_as_dataframe("Image")
        >>> print(df.shape)
    """
    return pd.DataFrame(self.get_table_as_dict(table))

get_table_as_dict

get_table_as_dict(
    table: str,
) -> Generator[
    dict[str, Any], None, None
]

Get table contents as dictionaries.

Convenience method that delegates to the underlying catalog. This provides access to all rows in a table, not just those belonging to this dataset. For dataset-filtered results, use list_dataset_members() instead.

Parameters:

Name Type Description Default
table str

Name of the table to retrieve (e.g., "Subject", "Image").

required

Yields:

Name Type Description
dict dict[str, Any]

Dictionary for each row in the table.

Example

>>> for subject in bag.get_table_as_dict("Subject"):
...     print(subject["Name"])

Source code in src/deriva_ml/dataset/dataset_bag.py
def get_table_as_dict(self, table: str) -> Generator[dict[str, Any], None, None]:
    """Get table contents as dictionaries.

    Convenience method that delegates to the underlying catalog. This provides
    access to all rows in a table, not just those belonging to this dataset.
    For dataset-filtered results, use list_dataset_members() instead.

    Args:
        table: Name of the table to retrieve (e.g., "Subject", "Image").

    Yields:
        dict: Dictionary for each row in the table.

    Example:
        >>> for subject in bag.get_table_as_dict("Subject"):
        ...     print(subject["Name"])
    """
    return self._catalog.get_table_as_dict(table)

list_dataset_children

list_dataset_children(
    recurse: bool = False,
    _visited: set[RID] | None = None,
    version: Any = None,
    **kwargs: Any,
) -> list[Self]

Get the datasets nested directly within this dataset and, when recurse is True, their descendants as well.

Parameters:

Name Type Description Default
recurse bool

Whether to include children of children.

False
_visited set[RID] | None

Internal parameter to track visited datasets and prevent infinite recursion.

None
version Any

Ignored (bags are immutable snapshots).

None
**kwargs Any

Additional arguments (ignored, for protocol compatibility).

{}

Returns:

Type Description
list[Self]

List of child dataset bags.
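The role of the `_visited` guard can be sketched on a plain adjacency map. The function `list_children` and the graph are hypothetical; the sketch mirrors only the guard-and-recurse pattern, not the SQL lookup of nested datasets.

```python
from __future__ import annotations


def list_children(
    graph: dict[str, list[str]],
    node: str,
    recurse: bool = False,
    _visited: set[str] | None = None,
) -> list[str]:
    """Return the children of node, optionally followed by all descendants."""
    if _visited is None:
        _visited = set()
    # A node that was already expanded contributes nothing further.
    if node in _visited:
        return []
    _visited.add(node)

    children = list(graph.get(node, []))
    result = list(children)
    if recurse:
        for child in children:
            result.extend(
                list_children(graph, child, recurse=True, _visited=_visited)
            )
    return result


# "c" points back at "root", so an unguarded recursion would never end.
graph = {"root": ["a", "b"], "a": ["c"], "c": ["root"], "b": []}
direct = list_children(graph, "root")
everything = list_children(graph, "root", recurse=True)
```

The guard stops the recursion from looping but does not deduplicate the result: in this sketch "root" still appears once in `everything`, as a child of "c". Callers that need unique entries should deduplicate afterwards.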

Source code in src/deriva_ml/dataset/dataset_bag.py
def list_dataset_children(
    self,
    recurse: bool = False,
    _visited: set[RID] | None = None,
    version: Any = None,
    **kwargs: Any,
) -> list[Self]:
    """Get the datasets nested within this dataset.

    Args:
        recurse: Whether to include children of children.
        _visited: Internal parameter to track visited datasets and prevent infinite recursion.
        version: Ignored (bags are immutable snapshots).
        **kwargs: Additional arguments (ignored, for protocol compatibility).

    Returns:
        List of child dataset bags.
    """
    # Initialize visited set for recursion guard
    if _visited is None:
        _visited = set()

    # Prevent infinite recursion by checking if we've already visited this dataset
    if self.dataset_rid in _visited:
        return []
    _visited.add(self.dataset_rid)

    ds_table = self.model.get_orm_class_by_name(f"{self.model.ml_schema}.Dataset")
    nds_table = self.model.get_orm_class_by_name(f"{self.model.ml_schema}.Dataset_Dataset")
    dv_table = self.model.get_orm_class_by_name(f"{self.model.ml_schema}.Dataset_Version")

    with Session(self.engine) as session:
        sql_cmd = (
            select(nds_table.Nested_Dataset, dv_table.Version)
            .join_from(ds_table, nds_table, onclause=ds_table.RID == nds_table.Nested_Dataset)
            .join_from(ds_table, dv_table, onclause=ds_table.Version == dv_table.RID)
            .where(nds_table.Dataset == self.dataset_rid)
        )
        nested = [self._catalog.lookup_dataset(r[0]) for r in session.execute(sql_cmd).all()]

    result = copy(nested)
    if recurse:
        for child in nested:
            result.extend(child.list_dataset_children(recurse=recurse, _visited=_visited))
    return result

list_dataset_element_types

list_dataset_element_types() -> (
    Iterable[Table]
)

List the types of elements that can be contained in datasets.

Delegates to the bag's model and returns the tables whose rows can be members of a dataset.

Returns:

Type Description
Iterable[Table]

The tables that can serve as dataset element types.

Source code in src/deriva_ml/dataset/dataset_bag.py
def list_dataset_element_types(self) -> Iterable[Table]:
    """List the types of elements that can be contained in datasets.

    Delegates to the bag's model and returns the tables whose rows can be
    members of a dataset.

    Returns:
        Iterable[Table]: The tables that can serve as dataset element types.

    """
    return self.model.list_dataset_element_types()

list_dataset_members

list_dataset_members(
    recurse: bool = False,
    limit: int | None = None,
    _visited: set[RID] | None = None,
    version: Any = None,
    **kwargs: Any,
) -> dict[str, list[dict[str, Any]]]

Return the entities that belong to this dataset, grouped by member type.

Parameters:

Name Type Description Default
recurse bool

Whether to include members of nested datasets.

False
limit int | None

Maximum number of members to return per type. None for no limit.

None
_visited set[RID] | None

Internal parameter to track visited datasets and prevent infinite recursion.

None
version Any

Ignored (bags are immutable snapshots).

None
**kwargs Any

Additional arguments (ignored, for protocol compatibility).

{}

Returns:

Type Description
dict[str, list[dict[str, Any]]]

Dictionary mapping member types to lists of member records.
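
To make the return shape concrete, the sketch below works on a hand-built dictionary that stands in for the result of `bag.list_dataset_members()`. The table names, RIDs, and column values are invented for illustration, and the two helper functions are hypothetical, not part of DatasetBag.

```python
from typing import Any

# Hand-built stand-in for the value returned by bag.list_dataset_members().
# Table names, RIDs, and filenames here are invented for illustration.
members: dict[str, list[dict[str, Any]]] = {
    "Image": [
        {"RID": "1-AAA", "Filename": "scan_01.png"},
        {"RID": "1-AAB", "Filename": "scan_02.png"},
    ],
    "Subject": [{"RID": "2-BBB", "Name": "S1"}],
    "Dataset": [],
}


def summarize_members(members: dict[str, list[dict[str, Any]]]) -> dict[str, int]:
    """Count member records per element type, skipping empty types."""
    return {table: len(rows) for table, rows in members.items() if rows}


def member_rids(members: dict[str, list[dict[str, Any]]], table: str) -> list[str]:
    """Return the RIDs of all members of one element type (empty if absent)."""
    return [row["RID"] for row in members.get(table, [])]


counts = summarize_members(members)
image_rids = member_rids(members, "Image")
print(counts)
print(image_rids)
```

Using `members.get(table, [])` mirrors the usage shown at the top of this page and keeps the helper safe when an element type has no entry in the result.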

Source code in src/deriva_ml/dataset/dataset_bag.py
def list_dataset_members(
    self,
    recurse: bool = False,
    limit: int | None = None,
    _visited: set[RID] | None = None,
    version: Any = None,
    **kwargs: Any,
) -> dict[str, list[dict[str, Any]]]:
    """Return the entities that belong to this dataset, grouped by element type.

    Args:
        recurse: Whether to include members of nested datasets.
        limit: Maximum number of members to return per type. None for no limit.
        _visited: Internal parameter to track visited datasets and prevent infinite recursion.
        version: Ignored (bags are immutable snapshots).
        **kwargs: Additional arguments (ignored, for protocol compatibility).

    Returns:
        Dictionary mapping member types to lists of member records.
    """
    # Initialize visited set for recursion guard
    if _visited is None:
        _visited = set()

    # Prevent infinite recursion by checking if we've already visited this dataset
    if self.dataset_rid in _visited:
        return {}
    _visited.add(self.dataset_rid)

    # Look at each of the element types that might be in the _dataset_table and get the list of rid for them from
    # the appropriate association table.
    members = defaultdict(list)

    dataset_class = self.model.get_orm_class_for_table(self._dataset_table)
    for element_table in self.model.list_dataset_element_types():
        element_class = self.model.get_orm_class_for_table(element_table)

        assoc_class, dataset_rel, element_rel = self.model.get_orm_association_class(dataset_class, element_class)

        element_table = inspect(element_class).mapped_table
        if not self.model.is_domain_schema(element_table.schema) and element_table.name not in ["Dataset", "File"]:
            # Only domain tables, nested datasets (Dataset), and File are listed; skip everything else.
            continue

        # Query the member rows of this element type through its association table.
        with Session(self.engine) as session:
            # For Dataset_Dataset, use Nested_Dataset column to find nested datasets
            # (similar to how the live catalog does it in Dataset.list_dataset_members)
            if element_table.name == "Dataset":
                sql_cmd = (
                    select(element_class)
                    .join(assoc_class, element_class.RID == assoc_class.__table__.c["Nested_Dataset"])
                    .where(self.dataset_rid == assoc_class.__table__.c["Dataset"])
                )
            else:
                # For other tables, use the original join via element_rel
                sql_cmd = (
                    select(element_class)
                    .join(element_rel)
                    .where(self.dataset_rid == assoc_class.__table__.c["Dataset"])
                )
            if limit is not None:
                sql_cmd = sql_cmd.limit(limit)
            # Get back the list of ORM entities and convert them to dictionaries.
            element_entities = session.scalars(sql_cmd).all()
            element_rows = [{c.key: getattr(obj, c.key) for c in obj.__table__.columns} for obj in element_entities]
        members[element_table.name].extend(element_rows)
        if recurse and (element_table.name == self._dataset_table.name):
            # Get the members for all the nested datasets and add to the member list.
            nested_datasets = [d["RID"] for d in element_rows]
            for ds in nested_datasets:
                nested_dataset = self._catalog.lookup_dataset(ds)
                for k, v in nested_dataset.list_dataset_members(recurse=recurse, limit=limit, _visited=_visited).items():
                    members[k].extend(v)
    return dict(members)
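
The recursion guard used above can be shown in isolation. The following is a minimal sketch on toy data, not the library's implementation: `NESTED` and `DIRECT` are invented dictionaries that replace the SQLite queries, and `collect_members` is a hypothetical function name. The nesting deliberately contains a cycle to show that the shared visited set stops the traversal.

```python
from collections import defaultdict

# Toy nesting graph: dataset RID -> RIDs of directly nested datasets.
# "D3" points back at "D1" to simulate a cycle.
NESTED = {"D1": ["D2"], "D2": ["D3"], "D3": ["D1"]}
# Toy direct members per dataset, keyed by element type.
DIRECT = {
    "D1": {"Image": ["img-a"]},
    "D2": {"Image": ["img-b"], "Subject": ["sub-x"]},
    "D3": {"Image": ["img-c"]},
}


def collect_members(rid, visited=None):
    """Gather the members of a dataset and of everything nested below it."""
    if visited is None:
        visited = set()
    if rid in visited:  # already expanded: stop here instead of looping
        return {}
    visited.add(rid)

    members = defaultdict(list)
    for table, rows in DIRECT.get(rid, {}).items():
        members[table].extend(rows)
    for child in NESTED.get(rid, []):
        # Share the same visited set across the whole traversal.
        for table, rows in collect_members(child, visited).items():
            members[table].extend(rows)
    return dict(members)


result = collect_members("D1")
print(result)
```

Passing one set through every recursive call is what makes the guard effective: a fresh set per call would only detect a dataset that nests itself directly.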

list_dataset_parents

list_dataset_parents(
    recurse: bool = False,
    _visited: set[RID] | None = None,
    version: Any = None,
    **kwargs: Any,
) -> list[Self]

Return the parent datasets that include this dataset as a nested dataset.

Parameters:

Name Type Description Default
recurse bool

If True, recursively return all ancestor datasets.

False
_visited set[RID] | None

Internal parameter to track visited datasets and prevent infinite recursion.

None
version Any

Ignored (bags are immutable snapshots).

None
**kwargs Any

Additional arguments (ignored, for protocol compatibility).

{}

Returns:

Type Description
list[Self]

List of parent dataset bags.

Source code in src/deriva_ml/dataset/dataset_bag.py
def list_dataset_parents(
    self,
    recurse: bool = False,
    _visited: set[RID] | None = None,
    version: Any = None,
    **kwargs: Any,
) -> list[Self]:
    """Return the parent datasets that include this dataset as a nested dataset.

    Args:
        recurse: If True, recursively return all ancestor datasets.
        _visited: Internal parameter to track visited datasets and prevent infinite recursion.
        version: Ignored (bags are immutable snapshots).
        **kwargs: Additional arguments (ignored, for protocol compatibility).

    Returns:
        List of parent dataset bags.
    """
    # Initialize visited set for recursion guard
    if _visited is None:
        _visited = set()

    # Prevent infinite recursion by checking if we've already visited this dataset
    if self.dataset_rid in _visited:
        return []
    _visited.add(self.dataset_rid)

    nds_table = self.model.get_orm_class_by_name(f"{self.model.ml_schema}.Dataset_Dataset")

    with Session(self.engine) as session:
        sql_cmd = select(nds_table.Dataset).where(nds_table.Nested_Dataset == self.dataset_rid)
        parents = [self._catalog.lookup_dataset(r[0]) for r in session.execute(sql_cmd).all()]

    if recurse:
        for parent in parents.copy():
            parents.extend(parent.list_dataset_parents(recurse=True, _visited=_visited))
    return parents
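
The parent lookup above amounts to a query on the Dataset_Dataset association table. The sketch below reproduces that idea with the standard-library `sqlite3` module and a simplified two-column table; the real bag uses SQLAlchemy ORM classes and a fuller schema, and the RIDs, `parent_rids`, and `ancestor_rids` are assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for the Dataset_Dataset association table.
conn.execute('CREATE TABLE "Dataset_Dataset" ("Dataset" TEXT, "Nested_Dataset" TEXT)')
conn.executemany(
    'INSERT INTO "Dataset_Dataset" VALUES (?, ?)',
    [("ROOT", "TRAIN"), ("ROOT", "TEST"), ("TRAIN", "FOLD1")],
)


def parent_rids(conn, rid):
    """RIDs of datasets that directly contain `rid` as a nested dataset."""
    rows = conn.execute(
        'SELECT "Dataset" FROM "Dataset_Dataset" '
        'WHERE "Nested_Dataset" = ? ORDER BY "Dataset"',
        (rid,),
    )
    return [r[0] for r in rows]


def ancestor_rids(conn, rid, visited=None):
    """All ancestors of `rid`, nearest first, guarded against cycles."""
    visited = set() if visited is None else visited
    if rid in visited:
        return []
    visited.add(rid)
    parents = parent_rids(conn, rid)
    result = list(parents)
    for parent in parents:
        result.extend(ancestor_rids(conn, parent, visited))
    return result


direct = parent_rids(conn, "FOLD1")
ancestors = ancestor_rids(conn, "FOLD1")
print(direct, ancestors)
```

Swapping the two column names in the `WHERE` and `SELECT` clauses gives the corresponding child lookup.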

list_executions

list_executions() -> list[RID]

List all execution RIDs associated with this dataset.

Returns all executions that used this dataset as input. This is tracked through the Dataset_Execution association table.

Note

Unlike the live Dataset class which returns Execution objects, DatasetBag returns a list of execution RIDs since the bag is an offline snapshot and cannot look up live execution objects.

Returns:

Type Description
list[RID]

List of execution RIDs associated with this dataset.

Example

>>> bag = ml.download_dataset_bag(dataset_spec)
>>> execution_rids = bag.list_executions()
>>> for rid in execution_rids:
...     print(f"Associated execution: {rid}")

Source code in src/deriva_ml/dataset/dataset_bag.py
def list_executions(self) -> list[RID]:
    """List all execution RIDs associated with this dataset.

    Returns all executions that used this dataset as input. This is
    tracked through the Dataset_Execution association table.

    Note:
        Unlike the live Dataset class which returns Execution objects,
        DatasetBag returns a list of execution RIDs since the bag is
        an offline snapshot and cannot look up live execution objects.

    Returns:
        List of execution RIDs associated with this dataset.

    Example:
        >>> bag = ml.download_dataset_bag(dataset_spec)
        >>> execution_rids = bag.list_executions()
        >>> for rid in execution_rids:
        ...     print(f"Associated execution: {rid}")
    """
    de_table = self.model.get_orm_class_by_name(f"{self.model.ml_schema}.Dataset_Execution")

    with Session(self.engine) as session:
        sql_cmd = select(de_table.Execution).where(de_table.Dataset == self.dataset_rid)
        return [r[0] for r in session.execute(sql_cmd).all()]

list_feature_values

list_feature_values(
    table: Table | str,
    feature_name: str,
    selector: Callable[
        [list[FeatureRecord]],
        FeatureRecord,
    ]
    | None = None,
) -> Iterable[FeatureRecord]

Retrieve all values for a single feature as typed FeatureRecord instances.

Convenience wrapper around fetch_table_features() for the common case of querying a single feature by name. Returns a flat list of FeatureRecord objects — one per feature value (or one per target object when a selector is provided).

Each returned record is a dynamically-generated Pydantic model with typed fields matching the feature's definition. For example, an Image_Classification feature might produce records with fields Image (str), Image_Class (str), Execution (str), RCT (str), and Feature_Name (str).

Parameters:

Name Type Description Default
table Table | str

The table the feature is defined on (name or Table object).

required
feature_name str

Name of the feature to retrieve values for.

required
selector Callable[[list[FeatureRecord]], FeatureRecord] | None

Optional function to resolve multiple values per target. See fetch_table_features for details on how selectors work. Use FeatureRecord.select_newest to pick the most recently created value.

None

Returns:

Type Description
Iterable[FeatureRecord]

FeatureRecord instances with:

  • Execution: RID of the execution that created this value
  • Feature_Name: Name of the feature
  • RCT: Row Creation Time (ISO 8601 timestamp)
  • Feature-specific columns as typed attributes (vocabulary terms, asset references, or value columns depending on the feature)
  • model_dump(): Convert to a dictionary

Raises:

Type Description
DerivaMLException

If the feature doesn't exist on the table.

Examples:

Get typed feature records::

>>> for record in bag.list_feature_values("Image", "Quality"):
...     print(f"Image {record.Image}: {record.ImageQuality}")
...     print(f"Created by execution: {record.Execution}")

Select newest when multiple values exist::

>>> records = list(bag.list_feature_values(
...     "Image", "Quality",
...     selector=FeatureRecord.select_newest,
... ))

Convert to a list of dicts::

>>> dicts = [r.model_dump() for r in
...          bag.list_feature_values("Image", "Classification")]
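
A selector receives every record for one target and returns the one to keep. The sketch below illustrates that contract without a bag: `Record` is a hypothetical dataclass standing in for a generated FeatureRecord, and `apply_selector` is an assumed grouping step, since the way `fetch_table_features` groups records internally is not shown on this page.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Record:
    """Hypothetical stand-in for a FeatureRecord with a single term column."""

    Image: str  # RID of the target object
    Quality: str  # feature value
    Execution: str  # RID of the execution that produced the value
    RCT: str  # row creation time, ISO 8601


def newest(records):
    """Selector: keep the record with the latest creation time."""
    # ISO 8601 strings in the same format and timezone sort chronologically.
    return max(records, key=lambda r: r.RCT)


def apply_selector(records, target_column, selector):
    """Group records by target and keep one record per target."""
    groups = defaultdict(list)
    for record in records:
        groups[getattr(record, target_column)].append(record)
    return [selector(group) for group in groups.values()]


records = [
    Record("IMG-1", "Good", "EX-1", "2024-01-01T10:00:00+00:00"),
    Record("IMG-1", "Bad", "EX-2", "2024-03-01T10:00:00+00:00"),
    Record("IMG-2", "Good", "EX-1", "2024-01-01T10:05:00+00:00"),
]
selected = apply_selector(records, "Image", newest)
for r in selected:
    print(r.Image, r.Quality, r.Execution)
```

Any callable with the same shape (a list of records in, one record out) can be passed as `selector`, for example one that prefers values from a particular execution.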
Source code in src/deriva_ml/dataset/dataset_bag.py
def list_feature_values(
    self,
    table: Table | str,
    feature_name: str,
    selector: Callable[[list[FeatureRecord]], FeatureRecord] | None = None,
) -> Iterable[FeatureRecord]:
    """Retrieve all values for a single feature as typed FeatureRecord instances.

    Convenience wrapper around ``fetch_table_features()`` for the common
    case of querying a single feature by name. Returns a flat list of
    FeatureRecord objects — one per feature value (or one per target object
    when a ``selector`` is provided).

    Each returned record is a dynamically-generated Pydantic model with
    typed fields matching the feature's definition. For example, an
    ``Image_Classification`` feature might produce records with fields
    ``Image`` (str), ``Image_Class`` (str), ``Execution`` (str),
    ``RCT`` (str), and ``Feature_Name`` (str).

    Args:
        table: The table the feature is defined on (name or Table object).
        feature_name: Name of the feature to retrieve values for.
        selector: Optional function to resolve multiple values per target.
            See ``fetch_table_features`` for details on how selectors work.
            Use ``FeatureRecord.select_newest`` to pick the most recently
            created value.

    Returns:
        Iterable[FeatureRecord]: FeatureRecord instances with:

        - ``Execution``: RID of the execution that created this value
        - ``Feature_Name``: Name of the feature
        - ``RCT``: Row Creation Time (ISO 8601 timestamp)
        - Feature-specific columns as typed attributes (vocabulary terms,
          asset references, or value columns depending on the feature)
        - ``model_dump()``: Convert to a dictionary

    Raises:
        DerivaMLException: If the feature doesn't exist on the table.

    Examples:
        Get typed feature records::

            >>> for record in bag.list_feature_values("Image", "Quality"):
            ...     print(f"Image {record.Image}: {record.ImageQuality}")
            ...     print(f"Created by execution: {record.Execution}")

        Select newest when multiple values exist::

            >>> records = list(bag.list_feature_values(
            ...     "Image", "Quality",
            ...     selector=FeatureRecord.select_newest,
            ... ))

        Convert to a list of dicts::

            >>> dicts = [r.model_dump() for r in
            ...          bag.list_feature_values("Image", "Classification")]
    """
    result = self.fetch_table_features(table, feature_name=feature_name, selector=selector)
    return result.get(feature_name, [])

list_tables

list_tables() -> list[str]

List all tables available in the bag's SQLite database.

Returns the fully-qualified names of all tables (e.g., "domain.Image", "deriva-ml.Dataset") that were exported in this bag.

Returns:

Type Description
list[str]

list[str]: Table names in "schema.table" format, sorted alphabetically.
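
Because the names come back in "schema.table" form, a common first step is to group them by schema. The following sketch does that on a hand-written list that stands in for the output of `bag.list_tables()`; the table names and the `group_by_schema` helper are hypothetical.

```python
from collections import defaultdict

# Hypothetical output of bag.list_tables(); real names depend on the catalog.
tables = [
    "deriva-ml.Dataset",
    "deriva-ml.Dataset_Dataset",
    "domain.Image",
    "domain.Subject",
]


def group_by_schema(qualified_names):
    """Map each schema name to the sorted list of its table names."""
    grouped = defaultdict(list)
    for name in qualified_names:
        # Split on the first dot only; assumes schema names contain no dot.
        schema, table = name.split(".", 1)
        grouped[schema].append(table)
    return {schema: sorted(names) for schema, names in grouped.items()}


by_schema = group_by_schema(tables)
print(by_schema)
```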

Source code in src/deriva_ml/dataset/dataset_bag.py
def list_tables(self) -> list[str]:
    """List all tables available in the bag's SQLite database.

    Returns the fully-qualified names of all tables (e.g., "domain.Image",
    "deriva-ml.Dataset") that were exported in this bag.

    Returns:
        list[str]: Table names in "schema.table" format, sorted alphabetically.
    """
    return self.model.list_tables()

restructure_assets

restructure_assets(
    output_dir: Path | str,
    asset_table: str | None = None,
    group_by: list[str] | None = None,
    use_symlinks: bool = True,
    type_selector: Callable[
        [list[str]], str
    ]
    | None = None,
    type_to_dir_map: dict[str, str]
    | None = None,
    enforce_vocabulary: bool = True,
    value_selector: Callable
    | None = None,
    file_transformer: Callable[
        [Path, Path], Path
    ]
    | None = None,
) -> dict[Path, Path]

Restructure downloaded assets into a directory hierarchy.

Creates a directory structure organizing assets by dataset types and grouping values. This is useful for ML workflows that expect data organized in conventional folder structures (e.g., PyTorch ImageFolder).

The dataset should be of type Training or Testing, or have nested children of those types. The top-level directory name is determined by the dataset type (e.g., "Training" -> "training").

Finding assets through foreign key relationships:

Assets are found by traversing all foreign key paths from the dataset, not just direct associations. For example, if a dataset contains Subjects, and the schema has Subject -> Encounter -> Image relationships, this method will find all Images reachable through those paths even though they are not directly in a Dataset_Image association table.

Handling datasets without types (prediction scenarios):

If a dataset has no type defined, it is treated as Testing. This is common for prediction/inference scenarios where you want to apply a trained model to new unlabeled data.

Handling missing labels:

If an asset doesn't have a value for a group_by key (e.g., no label assigned), it is placed in an "Unknown" directory. This allows restructure_assets to work with unlabeled data for prediction.
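
The layout rules described so far (type directory first, then one level per grouping value, with fallbacks for untyped datasets and missing labels) can be sketched as a small path-building function. This is an illustration only: `build_destination` is a hypothetical helper, the lowercase fallback for unmapped types is an assumption, and the method itself may resolve types and labels differently.

```python
from pathlib import Path

DEFAULT_TYPE_DIRS = {"Training": "training", "Testing": "testing", "Unknown": "unknown"}


def build_destination(output_dir, dataset_types, group_values, filename,
                      type_to_dir_map=None):
    """Compose output_dir / <type dir> / <group value>... / filename."""
    type_dirs = type_to_dir_map or DEFAULT_TYPE_DIRS
    # An untyped dataset is treated as Testing (prediction use case).
    dataset_type = dataset_types[0] if dataset_types else "Testing"
    # Assumption: types missing from the map fall back to their lowercased name.
    type_dir = type_dirs.get(dataset_type, dataset_type.lower())
    # A missing label (None) becomes an "Unknown" directory level.
    parts = [value if value is not None else "Unknown" for value in group_values]
    return Path(output_dir, type_dir, *parts, filename)


labeled = build_destination("ml_data", ["Training"], ["Normal"], "image1.jpg")
unlabeled = build_destination("prediction_data", [], [None], "image2.jpg")
print(labeled)
print(unlabeled)
```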

Parameters:

Name Type Description Default
output_dir Path | str

Base directory for restructured assets.

required
asset_table str | None

Name of the asset table (e.g., "Image"). If None, auto-detects from dataset members. Raises DerivaMLException if multiple asset tables are found and none is specified.

None
group_by list[str] | None

Names to group assets by. Each name creates a subdirectory level after the dataset type path. Names can be:

  • Column names: Direct columns on the asset table. The column value becomes the subdirectory name.
  • Feature names: Features defined on the asset table (or tables it references via foreign keys). The feature's vocabulary term value becomes the subdirectory name.
  • Feature.column: Specify a particular column from a multi-term feature (e.g., "Classification.Label" to use the Label column).

Column names are checked first, then feature names. If a value is not found, "unknown" is used as the subdirectory name.

None
use_symlinks bool

If True (default), create symlinks to original files. If False, copy files. Symlinks save disk space but require the original bag to remain in place. Ignored when file_transformer is provided.

True
type_selector Callable[[list[str]], str] | None

Function to select type when dataset has multiple types. Receives list of type names, returns selected type name. Defaults to selecting first type or "unknown" if no types.

None
type_to_dir_map dict[str, str] | None

Optional mapping from dataset type names to directory names. Defaults to {"Training": "training", "Testing": "testing", "Unknown": "unknown"}. Use this to customize directory names or add new type mappings.

None
enforce_vocabulary bool

If True (default), only allow features that have controlled vocabulary term columns, and raise an error if an asset has multiple different values for the same feature without a value_selector. This ensures clean, unambiguous directory structures. If False, allow any feature type and use the first value found when multiple values exist.

True
value_selector Callable | None

Optional function to select which feature value to use when an asset has multiple values for the same feature. Receives a list of FeatureRecord objects (typed Pydantic models with named attributes for each feature column) and returns the selected one. Use the Execution attribute to distinguish between values from different executions. Built-in selectors on FeatureRecord: select_newest, select_first, select_latest, select_majority_vote(column).

None
file_transformer Callable[[Path, Path], Path] | None

Optional callable invoked instead of the default symlink/copy step. Receives (src_path, dest_path) where dest_path is the suggested destination (preserving the original filename and extension). The transformer is responsible for writing the output file — it may change the extension or format — and must return the actual Path it wrote. When provided, use_symlinks is ignored.

Example — convert DICOM to PNG on placement::

def oct_to_png(src: Path, dest: Path) -> Path:
    img = load_oct_dcm(str(src))
    out = dest.with_suffix(".png")
    PILImage.fromarray((img * 255).astype(np.uint8)).save(out)
    return out

bag.restructure_assets(
    output_dir="./ml_data",
    group_by=["Diagnosis"],
    file_transformer=oct_to_png,
)
None

Returns:

Type Description
dict[Path, Path]

Manifest dict mapping each source Path to the actual output Path written. When no file_transformer is provided, source and output paths differ only in directory location. When a transformer is provided, the output path may also differ in name or extension.
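
The manifest can be inspected after the call, for instance to check class balance. The sketch below uses a hand-built manifest with invented paths in place of a real return value, and assumes a single `group_by` level below the type directory; both helper functions are hypothetical.

```python
from collections import Counter
from pathlib import Path

# Hand-built stand-in for the manifest returned by restructure_assets().
# All paths are invented for illustration.
manifest = {
    Path("bag/data/asset/Image/a.jpg"): Path("ml_data/training/Normal/a.jpg"),
    Path("bag/data/asset/Image/b.jpg"): Path("ml_data/training/Abnormal/b.jpg"),
    Path("bag/data/asset/Image/c.jpg"): Path("ml_data/testing/Normal/c.jpg"),
}


def count_by_split_and_label(manifest, output_dir):
    """Count output files per (type directory, first group directory)."""
    counts = Counter()
    for dest in manifest.values():
        # Assumes exactly one group_by level below the type directory.
        split, label = dest.relative_to(output_dir).parts[:2]
        counts[(split, label)] += 1
    return dict(counts)


def renamed(manifest):
    """Sources whose output name differs (e.g. after a file_transformer)."""
    return [src for src, dest in manifest.items() if src.name != dest.name]


counts = count_by_split_and_label(manifest, Path("ml_data"))
print(counts)
print(renamed(manifest))
```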

Raises:

Type Description
DerivaMLException

If asset_table cannot be determined (multiple asset tables exist without specification), if no valid dataset types (Training/Testing) are found, or if enforce_vocabulary is True and a feature has multiple values without value_selector.

Examples:

Basic restructuring with auto-detected asset table::

manifest = bag.restructure_assets(
    output_dir="./ml_data",
    group_by=["Diagnosis"],
)
# Creates:
# ./ml_data/training/Normal/image1.jpg
# ./ml_data/testing/Abnormal/image2.jpg

Custom type-to-directory mapping::

manifest = bag.restructure_assets(
    output_dir="./ml_data",
    group_by=["Diagnosis"],
    type_to_dir_map={"Training": "train", "Testing": "test"},
)
# Creates:
# ./ml_data/train/Normal/image1.jpg
# ./ml_data/test/Abnormal/image2.jpg

Select specific feature column for multi-term features::

manifest = bag.restructure_assets(
    output_dir="./ml_data",
    group_by=["Classification.Label"],  # Use Label column
)

Handle multiple feature values with a built-in selector::

from deriva_ml.feature import FeatureRecord

manifest = bag.restructure_assets(
    output_dir="./ml_data",
    group_by=["Diagnosis"],
    value_selector=FeatureRecord.select_newest,
)

Prediction scenario with unlabeled data::

# Dataset has no type - treated as Testing
# Assets have no labels - placed in Unknown directory
manifest = bag.restructure_assets(
    output_dir="./prediction_data",
    group_by=["Diagnosis"],
)
# Creates:
# ./prediction_data/testing/Unknown/image1.jpg
# ./prediction_data/testing/Unknown/image2.jpg

Convert DICOM files to PNG during restructuring::

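from pathlib import Path

import numpy as np
from PIL import Image as PILImage

# load_oct_dcm is assumed to be a user-supplied DICOM loader; it is not defined here.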

def oct_to_png(src: Path, dest: Path) -> Path:
    img = load_oct_dcm(str(src))
    out = dest.with_suffix(".png")
    PILImage.fromarray((img * 255).astype(np.uint8)).save(out)
    return out

manifest = bag.restructure_assets(
    output_dir="./ml_data",
    asset_table="OCT_DICOM",
    group_by=["Image_Diagnosis.Diagnosis_Image"],
    type_to_dir_map={"Training": "train", "Testing": "test"},
    file_transformer=oct_to_png,
)
# manifest maps each source .dcm Path to its output .png Path:
# Path(".../bag/OCT/image1.dcm") -> Path("./ml_data/train/Normal/image1.png")
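
For a transformer that can be tried without imaging libraries, here is a standard-library sketch that gzip-compresses each file. `gzip_transformer` is a hypothetical example that follows the `(src_path, dest_path) -> Path` contract described above; it creates the parent directory defensively because this page does not say whether `restructure_assets` creates it before calling the transformer.

```python
import gzip
import shutil
import tempfile
from pathlib import Path


def gzip_transformer(src: Path, dest: Path) -> Path:
    """Write a gzip-compressed copy of src at the suggested dest plus '.gz'."""
    out = dest.with_name(dest.name + ".gz")  # e.g. image1.dcm -> image1.dcm.gz
    out.parent.mkdir(parents=True, exist_ok=True)
    with open(src, "rb") as fin, gzip.open(out, "wb") as fout:
        shutil.copyfileobj(fin, fout)
    return out  # report the path that was actually written


# Exercise the transformer on a temporary file, without a bag.
with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp, "bag", "image1.dcm")
    src.parent.mkdir(parents=True)
    src.write_bytes(b"example bytes")
    written = gzip_transformer(src, Path(tmp, "ml_data", "train", "image1.dcm"))
    written_name = written.name
    with gzip.open(written, "rb") as f:
        round_trip = f.read()
print(written_name, round_trip)
```

With a real bag, the function would be passed as `file_transformer=gzip_transformer`, and the returned manifest would then map each source file to its `.gz` output.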
Source code in src/deriva_ml/dataset/dataset_bag.py
def restructure_assets(
    self,
    output_dir: Path | str,
    asset_table: str | None = None,
    group_by: list[str] | None = None,
    use_symlinks: bool = True,
    type_selector: Callable[[list[str]], str] | None = None,
    type_to_dir_map: dict[str, str] | None = None,
    enforce_vocabulary: bool = True,
    value_selector: Callable | None = None,
    file_transformer: Callable[[Path, Path], Path] | None = None,
) -> dict[Path, Path]:
    """Restructure downloaded assets into a directory hierarchy.

    Creates a directory structure organizing assets by dataset types and
    grouping values. This is useful for ML workflows that expect data
    organized in conventional folder structures (e.g., PyTorch ImageFolder).

    The dataset should be of type Training or Testing, or have nested
    children of those types. The top-level directory name is determined
    by the dataset type (e.g., "Training" -> "training").

    **Finding assets through foreign key relationships:**

    Assets are found by traversing all foreign key paths from the dataset,
    not just direct associations. For example, if a dataset contains Subjects,
    and the schema has Subject -> Encounter -> Image relationships, this method
    will find all Images reachable through those paths even though they are
    not directly in a Dataset_Image association table.

    **Handling datasets without types (prediction scenarios):**

    If a dataset has no type defined, it is treated as Testing. This is
    common for prediction/inference scenarios where you want to apply a
    trained model to new unlabeled data.

    **Handling missing labels:**

    If an asset doesn't have a value for a group_by key (e.g., no label
    assigned), it is placed in an "Unknown" directory. This allows
    restructure_assets to work with unlabeled data for prediction.

    Args:
        output_dir: Base directory for restructured assets.
        asset_table: Name of the asset table (e.g., "Image"). If None,
            auto-detects from dataset members. Raises DerivaMLException
            if multiple asset tables are found and none is specified.
        group_by: Names to group assets by. Each name creates a subdirectory
            level after the dataset type path. Names can be:

            - **Column names**: Direct columns on the asset table. The column
              value becomes the subdirectory name.
            - **Feature names**: Features defined on the asset table (or tables
              it references via foreign keys). The feature's vocabulary term
              value becomes the subdirectory name.
            - **Feature.column**: Specify a particular column from a multi-term
              feature (e.g., "Classification.Label" to use the Label column).

            Column names are checked first, then feature names. If a value
            is not found, "unknown" is used as the subdirectory name.

        use_symlinks: If True (default), create symlinks to original files.
            If False, copy files. Symlinks save disk space but require
            the original bag to remain in place. Ignored when
            ``file_transformer`` is provided.
        type_selector: Function to select type when dataset has multiple types.
            Receives list of type names, returns selected type name.
            Defaults to selecting first type or "unknown" if no types.
        type_to_dir_map: Optional mapping from dataset type names to directory
            names. Defaults to {"Training": "training", "Testing": "testing",
            "Unknown": "unknown"}. Use this to customize directory names or
            add new type mappings.
        enforce_vocabulary: If True (default), only allow features that have
            controlled vocabulary term columns, and raise an error if an asset
            has multiple different values for the same feature without a
            value_selector. This ensures clean, unambiguous directory structures.
            If False, allow any feature type and use the first value found
            when multiple values exist.
        value_selector: Optional function to select which feature value to use
            when an asset has multiple values for the same feature. Receives a
            list of FeatureRecord objects (typed Pydantic models with named
            attributes for each feature column) and returns the selected one.
            Use the Execution attribute to distinguish between values from
            different executions. Built-in selectors on FeatureRecord:
            ``select_newest``, ``select_first``, ``select_latest``,
            ``select_majority_vote(column)``.
        file_transformer: Optional callable invoked instead of the default
            symlink/copy step. Receives ``(src_path, dest_path)`` where
            ``dest_path`` is the suggested destination (preserving the original
            filename and extension). The transformer is responsible for writing
            the output file — it may change the extension or format — and must
            return the actual ``Path`` it wrote. When provided, ``use_symlinks``
            is ignored.

            Example — convert DICOM to PNG on placement::

                def oct_to_png(src: Path, dest: Path) -> Path:
                    img = load_oct_dcm(str(src))
                    out = dest.with_suffix(".png")
                    PILImage.fromarray((img * 255).astype(np.uint8)).save(out)
                    return out

                bag.restructure_assets(
                    output_dir="./ml_data",
                    group_by=["Diagnosis"],
                    file_transformer=oct_to_png,
                )

    Returns:
        Manifest dict mapping each source ``Path`` to the actual output
        ``Path`` written. When no ``file_transformer`` is provided, source
        and output paths differ only in directory location. When a
        transformer is provided, the output path may also differ in name
        or extension.

    Raises:
        DerivaMLException: If asset_table cannot be determined (multiple
            asset tables exist without specification), if no valid dataset
            types (Training/Testing) are found, or if enforce_vocabulary
            is True and a feature has multiple values without value_selector.

    Examples:
        Basic restructuring with auto-detected asset table::

            manifest = bag.restructure_assets(
                output_dir="./ml_data",
                group_by=["Diagnosis"],
            )
            # Creates:
            # ./ml_data/training/Normal/image1.jpg
            # ./ml_data/testing/Abnormal/image2.jpg

        Custom type-to-directory mapping::

            manifest = bag.restructure_assets(
                output_dir="./ml_data",
                group_by=["Diagnosis"],
                type_to_dir_map={"Training": "train", "Testing": "test"},
            )
            # Creates:
            # ./ml_data/train/Normal/image1.jpg
            # ./ml_data/test/Abnormal/image2.jpg

        Select specific feature column for multi-term features::

            manifest = bag.restructure_assets(
                output_dir="./ml_data",
                group_by=["Classification.Label"],  # Use Label column
            )

        Handle multiple feature values with a built-in selector::

            from deriva_ml.feature import FeatureRecord

            manifest = bag.restructure_assets(
                output_dir="./ml_data",
                group_by=["Diagnosis"],
                value_selector=FeatureRecord.select_newest,
            )

        Prediction scenario with unlabeled data::

            # Dataset has no type - treated as Testing
            # Assets have no labels - placed in Unknown directory
            manifest = bag.restructure_assets(
                output_dir="./prediction_data",
                group_by=["Diagnosis"],
            )
            # Creates:
            # ./prediction_data/testing/Unknown/image1.jpg
            # ./prediction_data/testing/Unknown/image2.jpg

        Convert DICOM files to PNG during restructuring::

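            from pathlib import Path

            import numpy as np
            from PIL import Image as PILImage

            def oct_to_png(src: Path, dest: Path) -> Path:
                # load_oct_dcm is a user-supplied DICOM reader (not part of
                # deriva_ml), assumed to return a float array scaled to [0, 1].
                img = load_oct_dcm(str(src))
                out = dest.with_suffix(".png")
                PILImage.fromarray((img * 255).astype(np.uint8)).save(out)
                return out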

            manifest = bag.restructure_assets(
                output_dir="./ml_data",
                asset_table="OCT_DICOM",
                group_by=["Image_Diagnosis.Diagnosis_Image"],
                type_to_dir_map={"Training": "train", "Testing": "test"},
                file_transformer=oct_to_png,
            )
            # manifest maps each source .dcm Path to its output .png Path:
            # Path(".../bag/OCT/image1.dcm") -> Path("./ml_data/train/Normal/image1.png")
    """
    logger = logging.getLogger("deriva_ml")
    group_by = group_by or []
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Default type-to-directory mapping
    if type_to_dir_map is None:
        type_to_dir_map = {"Training": "training", "Testing": "testing", "Unknown": "unknown"}

    # Auto-detect asset table if not provided
    if asset_table is None:
        asset_table = self._detect_asset_table()
        if asset_table is None:
            raise DerivaMLException(
                "Could not auto-detect asset table. No asset tables found in dataset members. "
                "Specify the asset_table parameter explicitly."
            )
        logger.info(f"Auto-detected asset table: {asset_table}")

    # Step 1: Build dataset type path map with directory name mapping
    def map_type_to_dir(types: list[str]) -> str | None:
        """Map dataset types to directory name using type_to_dir_map.

        If the dataset has no types, treat it as Testing (prediction use
        case). Otherwise pick one type with type_selector, or take the
        first type when no selector is given, and look it up in
        type_to_dir_map.

        Returns None when the selected type is not in type_to_dir_map,
        signalling that this dataset is a structural container (e.g. a
        Split parent) and should not contribute a path component. Its
        children will still be traversed and their own types will
        determine the path.
        """
        if not types:
            # No types defined - treat as Testing for prediction scenarios
            return type_to_dir_map.get("Testing", "testing")
        if type_selector:
            selected_type = type_selector(types)
        else:
            selected_type = types[0]
        if selected_type in type_to_dir_map:
            return type_to_dir_map[selected_type]
        # Type not explicitly mapped — treat as transparent container
        return None

    type_path_map = self._build_dataset_type_path_map(map_type_to_dir)

    # Step 2: Get asset-to-dataset mapping
    asset_dataset_map = self._get_asset_dataset_mapping(asset_table)

    # Step 3: Load feature values cache for relevant features
    feature_cache = self._load_feature_values_cache(
        asset_table, group_by, enforce_vocabulary, value_selector
    )

    # Step 4: Get all assets reachable through FK paths
    # This uses _get_reachable_assets which traverses FK relationships,
    # so assets connected via Subject -> Encounter -> Image are found
    # even if the dataset only contains Subjects directly.
    assets = self._get_reachable_assets(asset_table)

    manifest: dict[Path, Path] = {}

    if not assets:
        logger.warning(f"No assets found in table '{asset_table}'")
        return manifest

    # Step 5: Process each asset
    for asset in assets:
        # Get source file path
        filename = asset.get("Filename")
        if not filename:
            logger.warning(f"Asset {asset.get('RID')} has no Filename")
            continue

        source_path = Path(filename)
        if not source_path.exists():
            # Filename may be a bare basename stored in the SQLite cache
            # before image materialization.  Fall back to the canonical
            # BDBag asset layout: data/asset/{RID}/{table}/{filename}.
            try:
                bag_root = Path(self._catalog._database_model.bag_path)
                source_path = (
                    bag_root / "data" / "asset"
                    / asset.get("RID", "") / asset_table
                    / Path(filename).name
                )
            except AttributeError:
                pass  # catalog doesn't have _database_model (e.g. in tests)

        if not source_path.exists():
            logger.warning(f"Asset file not found: {filename}")
            continue

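        # Get dataset type path; assets with no mapped dataset fall back to
        # the "Unknown" directory from type_to_dir_map (default "unknown").
        dataset_rid = asset_dataset_map.get(asset.get("RID"))
        type_path = type_path_map.get(
            dataset_rid, [type_to_dir_map.get("Unknown", "unknown")]
        )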

        # Resolve grouping values
        group_path = []
        for key in group_by:
            value = self._resolve_grouping_value(asset, key, feature_cache)
            group_path.append(value)

        # Build target directory
        target_dir = output_dir.joinpath(*type_path, *group_path)
        target_dir.mkdir(parents=True, exist_ok=True)

        # Suggested destination preserves the original filename
        target_path = target_dir / source_path.name

        # Handle existing files at the suggested destination
        if target_path.exists() or target_path.is_symlink():
            target_path.unlink()

        if file_transformer is not None:
            # Transformer is responsible for writing the output file.
            # It receives the suggested dest and returns the actual path written,
            # which may differ in name or extension (e.g. DICOM -> PNG).
            actual_path = file_transformer(source_path, target_path)
        elif use_symlinks:
            try:
                target_path.symlink_to(source_path.resolve())
            except OSError as e:
                # Fall back to copy on platforms that don't support symlinks
                logger.warning(f"Symlink failed, falling back to copy: {e}")
                shutil.copy2(source_path, target_path)
            actual_path = target_path
        else:
            shutil.copy2(source_path, target_path)
            actual_path = target_path

        manifest[source_path] = actual_path

    return manifest
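
The per-asset placement in Step 5 can be tried in isolation, without a bag or catalog. The following is a minimal standalone sketch, not the library's implementation. It makes two assumptions: `map_type_to_dir` is rewritten here to take the map as an explicit argument and to omit `type_selector`, and `place_file` is a hypothetical helper name that mirrors the overwrite, transformer, symlink, and copy branches of the loop above.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Callable, Optional


def map_type_to_dir(types: list[str], type_to_dir_map: dict[str, str]) -> Optional[str]:
    """Standalone version of the nested helper (assumption: no type_selector)."""
    if not types:
        # Untyped datasets are treated as Testing (prediction scenario).
        return type_to_dir_map.get("Testing", "testing")
    # Unmapped types yield None, i.e. a transparent container.
    return type_to_dir_map.get(types[0])


def place_file(
    source: Path,
    target_dir: Path,
    use_symlinks: bool = True,
    file_transformer: Optional[Callable[[Path, Path], Path]] = None,
) -> Path:
    """Hypothetical helper: put one asset in target_dir and return the path written."""
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / source.name

    # Remove any stale file or dangling symlink at the suggested destination.
    if target.exists() or target.is_symlink():
        target.unlink()

    if file_transformer is not None:
        # The transformer writes the output and reports where it wrote it.
        return file_transformer(source, target)

    if use_symlinks:
        try:
            target.symlink_to(source.resolve())
            return target
        except OSError:
            pass  # Symlinks unsupported here; fall through to a copy.

    shutil.copy2(source, target)
    return target


if __name__ == "__main__":
    type_map = {"Training": "train", "Testing": "test"}
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        source = root / "bag" / "image1.jpg"
        source.parent.mkdir(parents=True)
        source.write_bytes(b"fake image bytes")

        type_dir = map_type_to_dir(["Training"], type_map)
        placed = place_file(source, root / "ml_data" / type_dir / "Normal")
        print(placed.relative_to(root).as_posix())  # ml_data/train/Normal/image1.jpg
```

Returning the path from `place_file` rather than assuming `target_dir / source.name` is what lets a transformer change the name or extension while the caller still records the real output location in the manifest.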