DerivaML Class

The DerivaML class provides a range of methods for interacting with a Deriva catalog. These methods assume that the catalog contains both a deriva-ml schema and a domain schema.

Data Catalog: The catalog must include both the domain schema and a standard ML schema for effective data management.

[ERD: entity-relationship diagram of the domain and ML schemas]

  • Domain schema: The domain schema includes the data collected or generated by domain-specific experiments or systems.
  • ML schema: Each entity in the ML schema is designed to capture details of the ML development process. It includes the following tables:
    • A Dataset represents a data collection, such as an aggregation identified for training, validation, and testing purposes.
    • A Workflow represents a specific sequence of computational steps or human interactions.
    • An Execution is an instance of a workflow that a user instantiates at a specific time.
    • An Execution Asset is an output file that results from the execution of a workflow.
    • An Execution Metadata is an asset entity for saving metadata files referencing a given execution.
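
A minimal sketch of how these pieces are typically exercised through the DerivaML API is shown below. The hostname, catalog name, and RID are placeholders, and the top-level import path is assumed from the src/deriva_ml layout:

    from deriva_ml import DerivaML, DatasetSpec  # import path assumed

    # Connect to a catalog containing both the deriva-ml and domain schemas.
    ml = DerivaML('deriva.example.org', 'my_catalog')

    # Datasets defined in the ML schema can be fetched locally as bags,
    # e.g., when assembling training and validation collections.
    bag = ml.download_dataset_bag(DatasetSpec(rid='1-abc123'))
    print(bag.path)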

BuiltinTypes

Bases: Enum

ERMrest built-in data types.

Maps ERMrest's built-in data types to their type names. These types are used for defining column types in tables and for type validation.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| text | str | Text/string type. |
| int2 | str | 16-bit integer. |
| jsonb | str | Binary JSON. |
| float8 | str | 64-bit float. |
| timestamp | str | Timestamp without timezone. |
| int8 | str | 64-bit integer. |
| boolean | str | Boolean type. |
| json | str | JSON type. |
| float4 | str | 32-bit float. |
| int4 | str | 32-bit integer. |
| timestamptz | str | Timestamp with timezone. |
| date | str | Date type. |
| ermrest_rid | str | Resource identifier. |
| ermrest_rcb | str | Record created by. |
| ermrest_rmb | str | Record modified by. |
| ermrest_rct | str | Record creation time. |
| ermrest_rmt | str | Record modification time. |
| markdown | str | Markdown text. |
| longtext | str | Long text. |
| ermrest_curie | str | Compact URI. |
| ermrest_uri | str | URI type. |
| color_rgb_hex | str | RGB color in hex. |
| serial2 | str | 16-bit auto-incrementing. |
| serial4 | str | 32-bit auto-incrementing. |
| serial8 | str | 64-bit auto-incrementing. |
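
Each enum member's value is the corresponding ERMrest typename string, so members can be used anywhere a type name is expected. A quick illustrative sketch (module path taken from the source reference below):

    from deriva_ml.core.enums import BuiltinTypes

    print(BuiltinTypes.float8.value)  # ERMrest typename for 64-bit floats
    print(BuiltinTypes.int4.value)    # ERMrest typename for 32-bit integers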

Source code in src/deriva_ml/core/enums.py
class BuiltinTypes(Enum):
    """ERMrest built-in data types.

    Maps ERMrest's built-in data types to their type names. These types are used for defining
    column types in tables and for type validation.

    Attributes:
        text (str): Text/string type.
        int2 (str): 16-bit integer.
        jsonb (str): Binary JSON.
        float8 (str): 64-bit float.
        timestamp (str): Timestamp without timezone.
        int8 (str): 64-bit integer.
        boolean (str): Boolean type.
        json (str): JSON type.
        float4 (str): 32-bit float.
        int4 (str): 32-bit integer.
        timestamptz (str): Timestamp with timezone.
        date (str): Date type.
        ermrest_rid (str): Resource identifier.
        ermrest_rcb (str): Record created by.
        ermrest_rmb (str): Record modified by.
        ermrest_rct (str): Record creation time.
        ermrest_rmt (str): Record modification time.
        markdown (str): Markdown text.
        longtext (str): Long text.
        ermrest_curie (str): Compact URI.
        ermrest_uri (str): URI type.
        color_rgb_hex (str): RGB color in hex.
        serial2 (str): 16-bit auto-incrementing.
        serial4 (str): 32-bit auto-incrementing.
        serial8 (str): 64-bit auto-incrementing.
    """

    text = builtin_types.text.typename
    int2 = builtin_types.int2.typename
    jsonb = builtin_types.jsonb.typename
    float8 = builtin_types.float8.typename
    timestamp = builtin_types.timestamp.typename
    int8 = builtin_types.int8.typename
    boolean = builtin_types.boolean.typename
    json = builtin_types.json.typename
    float4 = builtin_types.float4.typename
    int4 = builtin_types.int4.typename
    timestamptz = builtin_types.timestamptz.typename
    date = builtin_types.date.typename
    ermrest_rid = builtin_types.ermrest_rid.typename
    ermrest_rcb = builtin_types.ermrest_rcb.typename
    ermrest_rmb = builtin_types.ermrest_rmb.typename
    ermrest_rct = builtin_types.ermrest_rct.typename
    ermrest_rmt = builtin_types.ermrest_rmt.typename
    markdown = builtin_types.markdown.typename
    longtext = builtin_types.longtext.typename
    ermrest_curie = builtin_types.ermrest_curie.typename
    ermrest_uri = builtin_types.ermrest_uri.typename
    color_rgb_hex = builtin_types.color_rgb_hex.typename
    serial2 = builtin_types.serial2.typename
    serial4 = builtin_types.serial4.typename
    serial8 = builtin_types.serial8.typename

ColumnDefinition

Bases: BaseModel

Defines a column in an ERMrest table.

Provides a Pydantic model for defining columns with their types, constraints, and metadata. Maps to deriva_py's Column.define functionality.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| name | str | Name of the column. |
| type | BuiltinTypes | ERMrest data type for the column. |
| nullok | bool | Whether NULL values are allowed. Defaults to True. |
| default | Any | Default value for the column. |
| comment | str \| None | Description of the column's purpose. |
| acls | dict | Access control lists. |
| acl_bindings | dict | Dynamic access control bindings. |
| annotations | dict | Additional metadata annotations. |

Example:

    >>> col = ColumnDefinition(
    ...     name="score",
    ...     type=BuiltinTypes.float4,
    ...     nullok=False,
    ...     comment="Confidence score between 0 and 1"
    ... )

Source code in src/deriva_ml/core/ermrest.py
class ColumnDefinition(BaseModel):
    """Defines a column in an ERMrest table.

    Provides a Pydantic model for defining columns with their types, constraints, and metadata.
    Maps to deriva_py's Column.define functionality.

    Attributes:
        name (str): Name of the column.
        type (BuiltinTypes): ERMrest data type for the column.
        nullok (bool): Whether NULL values are allowed. Defaults to True.
        default (Any): Default value for the column.
        comment (str | None): Description of the column's purpose.
        acls (dict): Access control lists.
        acl_bindings (dict): Dynamic access control bindings.
        annotations (dict): Additional metadata annotations.

    Example:
        >>> col = ColumnDefinition(
        ...     name="score",
        ...     type=BuiltinTypes.float4,
        ...     nullok=False,
        ...     comment="Confidence score between 0 and 1"
        ... )
    """
    name: str
    type: BuiltinTypes
    nullok: bool = True
    default: Any = None
    comment: str | None = None
    acls: dict = Field(default_factory=dict)
    acl_bindings: dict = Field(default_factory=dict)
    annotations: dict = Field(default_factory=dict)

    @field_validator("type", mode="before")
    @classmethod
    def extract_type_name(cls, value: Any) -> Any:
        if isinstance(value, dict):
            return BuiltinTypes(value["typename"])
        else:
            return value

    @model_serializer()
    def serialize_column_definition(self):
        return em.Column.define(
            self.name,
            builtin_types[self.type.value],
            nullok=self.nullok,
            default=self.default,
            comment=self.comment,
            acls=self.acls,
            acl_bindings=self.acl_bindings,
            annotations=self.annotations,
        )
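
Because of the model_serializer above, dumping a ColumnDefinition produces the dictionary returned by deriva_py's Column.define, ready to hand to table-creation calls. A small sketch (variable names are illustrative):

    col = ColumnDefinition(
        name="score",
        type=BuiltinTypes.float4,
        nullok=False,
        comment="Confidence score between 0 and 1",
    )
    column_def = col.model_dump()  # same dict as em.Column.define("score", ...)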

DerivaML

Bases: Dataset

Core class for machine learning operations on a Deriva catalog.

This class provides core functionality for managing ML workflows, features, and datasets in a Deriva catalog. It handles data versioning, feature management, vocabulary control, and execution tracking.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| host_name | str | Hostname of the Deriva server (e.g., 'deriva.example.org'). |
| catalog_id | Union[str, int] | Catalog identifier or name. |
| domain_schema | str | Schema name for domain-specific tables and relationships. |
| model | DerivaModel | ERMRest model for the catalog. |
| working_dir | Path | Directory for storing computation data and results. |
| cache_dir | Path | Directory for caching downloaded datasets. |
| ml_schema | str | Schema name for ML-specific tables (default: 'deriva_ml'). |
| configuration | ExecutionConfiguration | Current execution configuration. |
| project_name | str | Name of the current project. |
| start_time | datetime | Timestamp when this instance was created. |
| status | str | Current status of operations. |

Example:

    >>> ml = DerivaML('deriva.example.org', 'my_catalog')
    >>> ml.create_feature('my_table', 'new_feature')
    >>> ml.add_term('vocabulary_table', 'new_term', description='Description of term')
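
The methods documented below compose naturally. For instance, a controlled-vocabulary workflow might look like the following sketch (table and term names are illustrative):

    ml = DerivaML('deriva.example.org', 'my_catalog')

    # Create a vocabulary table in the domain schema and populate it.
    ml.create_vocabulary("tissue_types", comment="Standard tissue classifications")
    ml.add_term("tissue_types", "epithelial",
                description="Epithelial tissue type", synonyms=["epithelium"])

    # Terms can be looked up by primary name or by synonym.
    term = ml.lookup_term("tissue_types", "epithelium")
    for t in ml.list_vocabulary_terms("tissue_types"):
        print(t.name, t.description)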

Source code in src/deriva_ml/core/base.py
class DerivaML(Dataset):
    """Core class for machine learning operations on a Deriva catalog.

    This class provides core functionality for managing ML workflows, features, and datasets in a Deriva catalog.
    It handles data versioning, feature management, vocabulary control, and execution tracking.

    Attributes:
        host_name (str): Hostname of the Deriva server (e.g., 'deriva.example.org').
        catalog_id (Union[str, int]): Catalog identifier or name.
        domain_schema (str): Schema name for domain-specific tables and relationships.
        model (DerivaModel): ERMRest model for the catalog.
        working_dir (Path): Directory for storing computation data and results.
        cache_dir (Path): Directory for caching downloaded datasets.
        ml_schema (str): Schema name for ML-specific tables (default: 'deriva_ml').
        configuration (ExecutionConfiguration): Current execution configuration.
        project_name (str): Name of the current project.
        start_time (datetime): Timestamp when this instance was created.
        status (str): Current status of operations.

    Example:
        >>> ml = DerivaML('deriva.example.org', 'my_catalog')
        >>> ml.create_feature('my_table', 'new_feature')
        >>> ml.add_term('vocabulary_table', 'new_term', description='Description of term')
    """

    def __init__(
        self,
        hostname: str,
        catalog_id: str | int,
        domain_schema: str | None = None,
        project_name: str | None = None,
        cache_dir: str | Path | None = None,
        working_dir: str | Path | None = None,
        ml_schema: str = ML_SCHEMA,
        logging_level=logging.WARNING,
        credential=None,
        use_minid: bool = True,
    ):
        """Initializes a DerivaML instance.

        This method will connect to a catalog and initialize local configuration for the ML execution.
        This class is intended to be used as a base class on which domain-specific interfaces are built.

        Args:
            hostname: Hostname of the Deriva server.
            catalog_id: Catalog ID. Either an identifier or a catalog name.
            domain_schema: Schema name for domain-specific tables and relationships. Defaults to the name of the
                schema that is not one of the standard schemas. If there is more than one user-defined schema,
                a value must be provided for this argument.
            ml_schema: Schema name for ML schema. Used if you have a non-standard configuration of deriva-ml.
            project_name: Project name. Defaults to name of domain schema.
            cache_dir: Directory path for caching data downloaded from the Deriva server as bdbag.
            working_dir: Directory path for storing data used by or generated by any computations.
            use_minid: Use the MINID service when downloading dataset bags.
        """
        # Get or use provided credentials for server access
        self.credential = credential or get_credential(hostname)

        # Initialize server connection and catalog access
        server = DerivaServer(
            "https",
            hostname,
            credentials=self.credential,
            session_config=self._get_session_config(),
        )
        self.catalog = server.connect_ermrest(catalog_id)
        self.model = DerivaModel(self.catalog.getCatalogModel(), domain_schema=domain_schema)

        # Set up working and cache directories
        default_workdir = self.__class__.__name__ + "_working"
        self.working_dir = (
            Path(working_dir) / getpass.getuser() if working_dir else Path.home() / "deriva-ml"
        ) / default_workdir

        self.working_dir.mkdir(parents=True, exist_ok=True)
        self.cache_dir = Path(cache_dir) if cache_dir else self.working_dir / "cache"
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Initialize dataset functionality from the parent class
        super().__init__(self.model, self.cache_dir, self.working_dir, use_minid=use_minid)

        # Set up logging
        self._logger = logging.getLogger("deriva_ml")
        self._logger.setLevel(logging_level)

        # Store instance configuration
        self.host_name = hostname
        self.catalog_id = catalog_id
        self.ml_schema = ml_schema
        self.configuration = None
        self._execution: Execution | None = None
        self.domain_schema = self.model.domain_schema
        self.project_name = project_name or self.domain_schema
        self.start_time = datetime.now()
        self.status = Status.pending.value

        # Configure logging format
        logging.basicConfig(
            level=logging_level,
            format="%(asctime)s - %(name)s.%(levelname)s - %(message)s",
        )

        # Set Deriva library logging level
        deriva_logger = logging.getLogger("deriva")
        deriva_logger.setLevel(logging_level)

    def __del__(self):
        """Cleanup method to handle incomplete executions."""
        try:
            # Mark execution as aborted if not completed
            if self._execution and self._execution.status != Status.completed:
                self._execution.update_status(Status.aborted, "Execution Aborted")
        except (AttributeError, requests.HTTPError):
            pass

    @staticmethod
    def _get_session_config():
        """Returns customized HTTP session configuration.

        Configures retry behavior and connection settings for HTTP requests to the Deriva server. Settings include:
        - Idempotent retry behavior for all HTTP methods
        - Increased retry attempts for read and connect operations
        - Exponential backoff for retries

        Returns:
            dict: Session configuration dictionary with retry and connection settings.

        Example:
            >>> config = DerivaML._get_session_config()
            >>> print(config['retry_read']) # 8
        """
        # Start with a default configuration
        session_config = DEFAULT_SESSION_CONFIG.copy()

        # Customize retry behavior for robustness
        session_config.update(
            {
                # Allow retries for all HTTP methods (PUT/POST are idempotent)
                "allow_retry_on_all_methods": True,
                # Increase retry attempts for better reliability
                "retry_read": 8,
                "retry_connect": 5,
                # Use exponential backoff for retries
                "retry_backoff_factor": 5,
            }
        )
        return session_config

    @property
    def pathBuilder(self) -> SchemaWrapper:
        """Returns catalog path builder for queries.

        The path builder provides a fluent interface for constructing complex queries against the catalog.
        This is a core component used by many other methods to interact with the catalog.

        Returns:
            datapath._CatalogWrapper: A new instance of the catalog path builder.

        Example:
            >>> path = ml.pathBuilder.schemas['my_schema'].tables['my_table']
            >>> results = path.entities().fetch()
        """
        return self.catalog.getPathBuilder()

    @property
    def domain_path(self) -> datapath.DataPath:
        """Returns path builder for domain schema.

        Provides a convenient way to access tables and construct queries within the domain-specific schema.

        Returns:
            datapath._CatalogWrapper: Path builder object scoped to the domain schema.

        Example:
            >>> domain = ml.domain_path
            >>> results = domain.my_table.entities().fetch()
        """
        return self.pathBuilder.schemas[self.domain_schema]

    def table_path(self, table: str | Table) -> Path:
        """Returns a local filesystem path for table CSV files.

        Generates a standardized path where CSV files should be placed when preparing to upload data to a table.
        The path follows the project's directory structure conventions.

        Args:
            table: Name of the table or Table object to get the path for.

        Returns:
            Path: Filesystem path where the CSV file should be placed.

        Example:
            >>> path = ml.table_path("experiment_results")
            >>> df.to_csv(path) # Save data for upload
        """
        return table_path(
            self.working_dir,
            schema=self.domain_schema,
            table=self.model.name_to_table(table).name,
        )

    def download_dir(self, cached: bool = False) -> Path:
        """Returns the appropriate download directory.

        Provides the appropriate directory path for storing downloaded files, either in the cache or working directory.

        Args:
            cached: If True, returns the cache directory path. If False, returns the working directory path.

        Returns:
            Path: Directory path where downloaded files should be stored.

        Example:
            >>> cache_dir = ml.download_dir(cached=True)
            >>> work_dir = ml.download_dir(cached=False)
        """
        # Return cache directory if cached=True, otherwise working directory
        return self.cache_dir if cached else self.working_dir

    @staticmethod
    def globus_login(host: str) -> None:
        """Authenticates with Globus for accessing Deriva services.

        Performs authentication using Globus Auth to access Deriva services. If already logged in, notifies the user.
        Uses non-interactive authentication flow without a browser or local server.

        Args:
            host: The hostname of the Deriva server to authenticate with (e.g., 'deriva.example.org').

        Example:
            >>> DerivaML.globus_login('deriva.example.org')
            'Login Successful'
        """
        gnl = GlobusNativeLogin(host=host)
        if gnl.is_logged_in([host]):
            print("You are already logged in.")
        else:
            gnl.login(
                [host],
                no_local_server=True,
                no_browser=True,
                refresh_tokens=True,
                update_bdbag_keychain=True,
            )
            print("Login Successful")

    def chaise_url(self, table: RID | Table | str) -> str:
        """Generates Chaise web interface URL.

        Chaise is Deriva's web interface for data exploration. This method creates a URL that directly links to
        the specified table or record.

        Args:
            table: Table to generate URL for (name, Table object, or RID).

        Returns:
            str: URL in format: https://{host}/chaise/recordset/#{catalog}/{schema}:{table}

        Raises:
            DerivaMLException: If table or RID cannot be found.

        Examples:
            Using table name:
                >>> ml.chaise_url("experiment_table")
                'https://deriva.org/chaise/recordset/#1/schema:experiment_table'

            Using RID:
                >>> ml.chaise_url("1-abc123")
        """
        # Get the table object and build base URI
        table_obj = self.model.name_to_table(table)
        try:
            uri = self.catalog.get_server_uri().replace("ermrest/catalog/", "chaise/recordset/#")
        except DerivaMLException:
            # Handle RID case
            uri = self.cite(cast(str, table))
        return f"{uri}/{urlquote(table_obj.schema.name)}:{urlquote(table_obj.name)}"

    def cite(self, entity: Dict[str, Any] | str) -> str:
        """Generates permanent citation URL.

        Creates a versioned URL that can be used to reference a specific entity in the catalog. The URL includes
        the catalog snapshot time to ensure version stability.

        Args:
            entity: Either a RID string or a dictionary containing entity data with a 'RID' key.

        Returns:
            str: Permanent citation URL in format: https://{host}/id/{catalog}/{rid}@{snapshot_time}

        Raises:
            DerivaMLException: If an entity doesn't exist or lacks a RID.

        Examples:
            Using a RID string:
                >>> url = ml.cite("1-abc123")
                >>> print(url)
                'https://deriva.org/id/1/1-abc123@2024-01-01T12:00:00'

            Using a dictionary:
                >>> url = ml.cite({"RID": "1-abc123"})
        """
        # Return if already a citation URL
        if isinstance(entity, str) and entity.startswith(f"https://{self.host_name}/id/{self.catalog_id}/"):
            return entity

        try:
            # Resolve RID and create citation URL with snapshot time
            self.resolve_rid(rid := entity if isinstance(entity, str) else entity["RID"])
            return f"https://{self.host_name}/id/{self.catalog_id}/{rid}@{self.catalog.latest_snapshot().snaptime}"
        except KeyError as e:
            raise DerivaMLException(f"Entity {e} does not have RID column")
        except DerivaMLException as _e:
            raise DerivaMLException("Entity RID does not exist")

    def user_list(self) -> List[Dict[str, str]]:
        """Returns catalog user list.

        Retrieves basic information about all users who have access to the catalog, including their
        identifiers and full names.

        Returns:
            List[Dict[str, str]]: List of user information dictionaries, each containing:
                - 'ID': User identifier
                - 'Full_Name': User's full name

        Examples:

            >>> users = ml.user_list()
            >>> for user in users:
            ...     print(f"{user['Full_Name']} ({user['ID']})")
        """
        # Get the user table path and fetch basic user info
        user_path = self.pathBuilder.public.ERMrest_Client.path
        return [{"ID": u["ID"], "Full_Name": u["Full_Name"]} for u in user_path.entities().fetch()]

    def resolve_rid(self, rid: RID) -> ResolveRidResult:
        """Resolves RID to catalog location.

        Looks up a RID and returns information about where it exists in the catalog, including schema,
        table, and column metadata.

        Args:
            rid: Resource Identifier to resolve.

        Returns:
            ResolveRidResult: Named tuple containing:
                - schema: Schema name
                - table: Table name
                - columns: Column definitions
                - datapath: Path builder for accessing the entity

        Raises:
            DerivaMLException: If RID doesn't exist in catalog.

        Examples:
            >>> result = ml.resolve_rid("1-abc123")
            >>> print(f"Found in {result.schema}.{result.table}")
            >>> data = result.datapath.entities().fetch()
        """
        try:
            # Attempt to resolve RID using catalog model
            return self.catalog.resolve_rid(rid, self.model.model)
        except KeyError as _e:
            raise DerivaMLException(f"Invalid RID {rid}")

    def retrieve_rid(self, rid: RID) -> dict[str, Any]:
        """Retrieves complete record for RID.

        Fetches all column values for the entity identified by the RID.

        Args:
            rid: Resource Identifier of the record to retrieve.

        Returns:
            dict[str, Any]: Dictionary containing all column values for the entity.

        Raises:
            DerivaMLException: If the RID doesn't exist in the catalog.

        Example:
            >>> record = ml.retrieve_rid("1-abc123")
            >>> print(f"Name: {record['name']}, Created: {record['creation_date']}")
        """
        # Resolve RID and fetch the first (only) matching record
        return self.resolve_rid(rid).datapath.entities().fetch()[0]

    def add_page(self, title: str, content: str) -> None:
        """Adds page to web interface.

        Creates a new page in the catalog's web interface with the specified title and content. The page will be
        accessible through the catalog's navigation system.

        Args:
            title: The title of the page to be displayed in navigation and headers.
            content: The main content of the page; may include HTML markup.

        Raises:
            DerivaMLException: If the page creation fails or the user lacks necessary permissions.

        Example:
            >>> ml.add_page(
            ...     title="Analysis Results",
            ...     content="<h1>Results</h1><p>Analysis completed successfully...</p>"
            ... )
        """
        # Insert page into www tables with title and content
        self.pathBuilder.www.tables[self.domain_schema].insert([{"Title": title, "Content": content}])

    def create_vocabulary(self, vocab_name: str, comment: str = "", schema: str | None = None) -> Table:
        """Creates a controlled vocabulary table.

        A controlled vocabulary table maintains a list of standardized terms and their definitions. Each term can have
        synonyms and descriptions to ensure consistent terminology usage across the dataset.

        Args:
            vocab_name: Name for the new vocabulary table. Must be a valid SQL identifier.
            comment: Description of the vocabulary's purpose and usage. Defaults to empty string.
            schema: Schema name to create the table in. If None, uses domain_schema.

        Returns:
            Table: ERMRest table object representing the newly created vocabulary table.

        Raises:
            DerivaMLException: If vocab_name is invalid or already exists.

        Examples:
            Create a vocabulary for tissue types:

                >>> table = ml.create_vocabulary(
                ...     vocab_name="tissue_types",
                ...     comment="Standard tissue classifications",
                ...     schema="bio_schema"
                ... )
        """
        # Use domain schema if none specified
        schema = schema or self.domain_schema

        # Create and return vocabulary table with RID-based URI pattern
        try:
            vocab_table = self.model.schemas[schema].create_table(
                Table.define_vocabulary(vocab_name, f"{self.project_name}:{{RID}}", comment=comment)
            )
        except ValueError:
            raise DerivaMLException(f"Table {vocab_name} already exist")
        return vocab_table

    def create_table(self, table: TableDefinition) -> Table:
        """Creates a new table in the catalog.

        Creates a table using the provided TableDefinition object, which specifies the table structure including
        columns, keys, and foreign key relationships.

        Args:
            table: A TableDefinition object containing the complete specification of the table to create.

        Returns:
            Table: The newly created ERMRest table object.

        Raises:
            DerivaMLException: If table creation fails or the definition is invalid.

        Example:

            >>> table_def = TableDefinition(
            ...     name="experiments",
            ...     column_definitions=[
            ...         ColumnDefinition(name="name", type=BuiltinTypes.text),
            ...         ColumnDefinition(name="date", type=BuiltinTypes.date)
            ...     ]
            ... )
            >>> new_table = ml.create_table(table_def)
        """
        # Create table in domain schema using provided definition
        return self.model.schemas[self.domain_schema].create_table(table.model_dump())

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def create_asset(
        self,
        asset_name: str,
        column_defs: Iterable[ColumnDefinition] | None = None,
        fkey_defs: Iterable[ColumnDefinition] | None = None,
        referenced_tables: Iterable[Table] | None = None,
        comment: str = "",
        schema: str | None = None,
    ) -> Table:
        """Creates an asset table.

        Args:
            asset_name: Name of the asset table.
            column_defs: Iterable of ColumnDefinition objects to provide additional metadata for asset.
            fkey_defs: Iterable of ForeignKeyDefinition objects to provide additional metadata for asset.
            referenced_tables: Iterable of Table objects to which asset should provide foreign-key references to.
            comment: Description of the asset table. (Default value = '')
            schema: Schema in which to create the asset table.  Defaults to domain_schema.

        Returns:
            Table object for the asset table.
        """
        # Initialize empty collections if None provided
        column_defs = column_defs or []
        fkey_defs = fkey_defs or []
        referenced_tables = referenced_tables or []
        schema = schema or self.domain_schema

        # Add an asset type to vocabulary
        self.add_term(MLVocab.asset_type, asset_name, description=f"A {asset_name} asset")

        # Create the main asset table
        asset_table = self.model.schemas[schema].create_table(
            Table.define_asset(
                schema,
                asset_name,
                column_defs=[c.model_dump() for c in column_defs],
                fkey_defs=[fk.model_dump() for fk in fkey_defs],
                comment=comment,
            )
        )

        # Create an association table between asset and asset type
        self.model.schemas[self.domain_schema].create_table(
            Table.define_association(
                [
                    (asset_table.name, asset_table),
                    ("Asset_Type", self.model.name_to_table("Asset_Type")),
                ]
            )
        )

        # Create references to other tables if specified
        for t in referenced_tables:
            asset_table.create_reference(self.model.name_to_table(t))

        # Create an association table for tracking execution
        atable = self.model.schemas[self.domain_schema].create_table(
            Table.define_association(
                [
                    (asset_name, asset_table),
                    (
                        "Execution",
                        self.model.schemas[self.ml_schema].tables["Execution"],
                    ),
                ]
            )
        )
        atable.create_reference(self.model.name_to_table("Asset_Role"))

        # Add asset annotations
        asset_annotation(asset_table)
        return asset_table
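
    # Illustrative use of create_asset (a hedged sketch; the asset name and
    # extra metadata column below are hypothetical, not part of deriva-ml):
    #
    #     weights = ml.create_asset(
    #         "model_weights",
    #         column_defs=[ColumnDefinition(name="epoch", type=BuiltinTypes.int4)],
    #         comment="Trained model weight files",
    #     )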

    def list_assets(self, asset_table: Table | str) -> list[dict[str, Any]]:
        """Lists contents of an asset table.

        Returns a list of assets with their types for the specified asset table.

        Args:
            asset_table: Table or name of the asset table to list assets for.

        Returns:
            list[dict[str, Any]]: List of asset records, each containing:
                - RID: Resource identifier
                - Type: Asset type
                - Metadata: Asset metadata

        Raises:
            DerivaMLException: If the table is not an asset table or doesn't exist.

        Example:
            >>> assets = ml.list_assets("tissue_types")
            >>> for asset in assets:
            ...     print(f"{asset['RID']}: {asset['Type']}")
        """
        # Validate and get asset table reference
        asset_table = self.model.name_to_table(asset_table)
        if not self.model.is_asset(asset_table):
            raise DerivaMLException(f"Table {asset_table.name} is not an asset")

        # Get path builders for asset and type tables
        pb = self._model.catalog.getPathBuilder()
        asset_path = pb.schemas[asset_table.schema.name].tables[asset_table.name]
        (
            asset_type_table,
            _,
            _,
        ) = self._model.find_association(asset_table, MLVocab.asset_type)
        type_path = pb.schemas[asset_type_table.schema.name].tables[asset_type_table.name]

        # Build a list of assets with their types
        assets = []
        for asset in asset_path.entities().fetch():
            # Get associated asset types for each asset
            asset_types = (
                type_path.filter(type_path.columns[asset_table.name] == asset["RID"])
                .attributes(type_path.Asset_Type)
                .fetch()
            )
            # Combine asset data with its types
            assets.append(
                asset | {MLVocab.asset_type.value: [asset_type[MLVocab.asset_type.value] for asset_type in asset_types]}
            )
        return assets

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def create_feature(
        self,
        target_table: Table | str,
        feature_name: str,
        terms: list[Table | str] | None = None,
        assets: list[Table | str] | None = None,
        metadata: list[ColumnDefinition | Table | Key | str] | None = None,
        optional: list[str] | None = None,
        comment: str = "",
    ) -> type[FeatureRecord]:
        """Creates a new feature definition.

        A feature represents a measurable property or characteristic that can be associated with records in the target
        table. Features can include vocabulary terms, asset references, and additional metadata.

        Args:
            target_table: Table to associate the feature with (name or Table object).
            feature_name: Unique name for the feature within the target table.
            terms: Optional vocabulary tables/names whose terms can be used as feature values.
            assets: Optional asset tables/names that can be referenced by this feature.
            metadata: Optional columns, tables, or keys to include in a feature definition.
            optional: Column names that are not required when creating feature instances.
            comment: Description of the feature's purpose and usage.

        Returns:
            type[FeatureRecord]: Feature class for creating validated instances.

        Raises:
            DerivaMLException: If a feature definition is invalid or conflicts with existing features.

        Examples:
            Create a feature with confidence score:
                >>> feature_class = ml.create_feature(
                ...     target_table="samples",
                ...     feature_name="expression_level",
                ...     terms=["expression_values"],
                ...     metadata=[ColumnDefinition(name="confidence", type=BuiltinTypes.float4)],
                ...     comment="Gene expression measurement"
                ... )
        """
        # Initialize empty collections if None provided
        terms = terms or []
        assets = assets or []
        metadata = metadata or []
        optional = optional or []

        def normalize_metadata(m: Key | Table | ColumnDefinition | str):
            """Helper function to normalize metadata references."""
            if isinstance(m, str):
                return self.model.name_to_table(m)
            elif isinstance(m, ColumnDefinition):
                return m.model_dump()
            else:
                return m

        # Validate asset and term tables
        if not all(map(self.model.is_asset, assets)):
            raise DerivaMLException("Invalid create_feature asset table.")
        if not all(map(self.model.is_vocabulary, terms)):
            raise DerivaMLException("Invalid create_feature asset table.")

        # Get references to required tables
        target_table = self.model.name_to_table(target_table)
        execution = self.model.schemas[self.ml_schema].tables["Execution"]
        feature_name_table = self.model.schemas[self.ml_schema].tables["Feature_Name"]

        # Add feature name to vocabulary
        feature_name_term = self.add_term("Feature_Name", feature_name, description=comment)
        atable_name = f"Execution_{target_table.name}_{feature_name_term.name}"
        # Create an association table implementing the feature
        atable = self.model.schemas[self.domain_schema].create_table(
            target_table.define_association(
                table_name=atable_name,
                associates=[execution, target_table, feature_name_table],
                metadata=[normalize_metadata(m) for m in chain(assets, terms, metadata)],
                comment=comment,
            )
        )
        # Configure optional columns and default feature name
        for c in optional:
            atable.columns[c].alter(nullok=True)
        atable.columns["Feature_Name"].alter(default=feature_name_term.name)

        # Return feature record class for creating instances
        return self.feature_record_class(target_table, feature_name)

    def feature_record_class(self, table: str | Table, feature_name: str) -> type[FeatureRecord]:
        """Returns a pydantic model class for feature records.

        Creates a typed interface for creating new instances of the specified feature. The returned class includes
        validation and type checking based on the feature's definition.

        Args:
            table: The table containing the feature, either as name or Table object.
            feature_name: Name of the feature to create a record class for.

        Returns:
            type[FeatureRecord]: A pydantic model class for creating validated feature records.

        Raises:
            DerivaMLException: If the feature doesn't exist or the table is invalid.

        Example:
            >>> ExpressionFeature = ml.feature_record_class("samples", "expression_level")
            >>> feature = ExpressionFeature(value="high", confidence=0.95)
        """
        # Look up a feature and return its record class
        return self.lookup_feature(table, feature_name).feature_record_class()

    def delete_feature(self, table: Table | str, feature_name: str) -> bool:
        """Removes a feature definition and its data.

        Deletes the feature and its implementation table from the catalog. This operation cannot be undone and
        will remove all feature values associated with this feature.

        Args:
            table: The table containing the feature, either as name or Table object.
            feature_name: Name of the feature to delete.

        Returns:
            bool: True if the feature was successfully deleted, False if it didn't exist.

        Raises:
            DerivaMLException: If deletion fails due to constraints or permissions.

        Example:
            >>> success = ml.delete_feature("samples", "obsolete_feature")
            >>> print("Deleted" if success else "Not found")
        """
        # Get table reference and find feature
        table = self.model.name_to_table(table)
        try:
            # Find and delete the feature's implementation table
            feature = next(f for f in self.model.find_features(table) if f.feature_name == feature_name)
            feature.feature_table.drop()
            return True
        except StopIteration:
            return False

    def lookup_feature(self, table: str | Table, feature_name: str) -> Feature:
        """Retrieves a Feature object.

        Looks up and returns a Feature object that provides an interface to work with an existing feature
        definition in the catalog.

        Args:
            table: The table containing the feature, either as name or Table object.
            feature_name: Name of the feature to look up.

        Returns:
            Feature: An object representing the feature and its implementation.

        Raises:
            DerivaMLException: If the feature doesn't exist in the specified table.

        Example:
            >>> feature = ml.lookup_feature("samples", "expression_level")
            >>> print(feature.feature_name)
            'expression_level'
        """
        return self.model.lookup_feature(table, feature_name)

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def list_feature_values(self, table: Table | str, feature_name: str) -> datapath._ResultSet:
        """Retrieves all values for a feature.

        Returns all instances of the specified feature that have been created, including their associated
        metadata and references.

        Args:
            table: The table containing the feature, either as name or Table object.
            feature_name: Name of the feature to retrieve values for.

        Returns:
            datapath._ResultSet: A result set containing all feature values and their metadata.

        Raises:
            DerivaMLException: If the feature doesn't exist or cannot be accessed.

        Example:
            >>> values = ml.list_feature_values("samples", "expression_level")
            >>> for value in values:
            ...     print(f"Sample {value['RID']}: {value['value']}")
        """
        # Get table and feature references
        table = self.model.name_to_table(table)
        feature = self.lookup_feature(table, feature_name)

        # Build and execute query for feature values
        pb = self.catalog.getPathBuilder()
        return pb.schemas[feature.feature_table.schema.name].tables[feature.feature_table.name].entities().fetch()

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def add_term(
        self,
        table: str | Table,
        term_name: str,
        description: str,
        synonyms: list[str] | None = None,
        exists_ok: bool = True,
    ) -> VocabularyTerm:
        """Adds a term to a vocabulary table.

        Creates a new standardized term with description and optional synonyms in a vocabulary table.
        Can either create a new term or return an existing one if it already exists.

        Args:
            table: Vocabulary table to add term to (name or Table object).
            term_name: Primary name of the term (must be unique within vocabulary).
            description: Explanation of term's meaning and usage.
            synonyms: Alternative names for the term.
            exists_ok: If True, return the existing term if found. If False, raise error.

        Returns:
            VocabularyTerm: Object representing the created or existing term.

        Raises:
            DerivaMLException: If a term exists and exists_ok=False, or if the table is not a vocabulary table.

        Examples:
            Add a new tissue type:
                >>> term = ml.add_term(
                ...     table="tissue_types",
                ...     term_name="epithelial",
                ...     description="Epithelial tissue type",
                ...     synonyms=["epithelium"]
                ... )

            Attempt to add an existing term:
                >>> term = ml.add_term("tissue_types", "epithelial", "...", exists_ok=True)
        """
        # Initialize an empty synonyms list if None
        synonyms = synonyms or []

        # Get table reference and validate if it is a vocabulary table
        table = self.model.name_to_table(table)
        pb = self.catalog.getPathBuilder()
        if not (self.model.is_vocabulary(table)):
            raise DerivaMLTableTypeError("vocabulary", table.name)

        # Get schema and table names for path building
        schema_name = table.schema.name
        table_name = table.name

        try:
            # Attempt to insert a new term
            term_id = VocabularyTerm.model_validate(
                pb.schemas[schema_name]
                .tables[table_name]
                .insert(
                    [
                        {
                            "Name": term_name,
                            "Description": description,
                            "Synonyms": synonyms,
                        }
                    ],
                    defaults={"ID", "URI"},
                )[0]
            )
        except DataPathException:
            # Term exists - look it up or raise an error
            term_id = self.lookup_term(table, term_name)
            if not exists_ok:
                raise DerivaMLInvalidTerm(table.name, term_name, msg="term already exists")
        return term_id

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def lookup_term(self, table: str | Table, term_name: str) -> VocabularyTerm:
        """Finds a term in a vocabulary table.

        Searches for a term in the specified vocabulary table, matching either the primary name
        or any of its synonyms.

        Args:
            table: Vocabulary table to search in (name or Table object).
            term_name: Name or synonym of the term to find.

        Returns:
            VocabularyTerm: The matching vocabulary term.

        Raises:
            DerivaMLVocabularyException: If the table is not a vocabulary table, or term is not found.

        Examples:
            Look up by primary name:
                >>> term = ml.lookup_term("tissue_types", "epithelial")
                >>> print(term.description)

            Look up by synonym:
                >>> term = ml.lookup_term("tissue_types", "epithelium")
        """
        # Get and validate vocabulary table reference
        vocab_table = self.model.name_to_table(table)
        if not self.model.is_vocabulary(vocab_table):
            raise DerivaMLException(f"The table {table} is not a controlled vocabulary")

        # Get schema and table paths
        schema_name, table_name = vocab_table.schema.name, vocab_table.name
        schema_path = self.catalog.getPathBuilder().schemas[schema_name]

        # Search for term by name or synonym
        for term in schema_path.tables[table_name].entities().fetch():
            if term_name == term["Name"] or (term["Synonyms"] and term_name in term["Synonyms"]):
                return VocabularyTerm.model_validate(term)

        # Term not found
        raise DerivaMLInvalidTerm(table_name, term_name)

    def list_vocabulary_terms(self, table: str | Table) -> list[VocabularyTerm]:
        """Lists all terms in a vocabulary table.

        Retrieves all terms, their descriptions, and synonyms from a controlled vocabulary table.

        Args:
            table: Vocabulary table to list terms from (name or Table object).

        Returns:
            list[VocabularyTerm]: List of vocabulary terms with their metadata.

        Raises:
            DerivaMLException: If table doesn't exist or is not a vocabulary table.

        Examples:
            >>> terms = ml.list_vocabulary_terms("tissue_types")
            >>> for term in terms:
            ...     print(f"{term.name}: {term.description}")
            ...     if term.synonyms:
            ...         print(f"  Synonyms: {', '.join(term.synonyms)}")
        """
        # Get path builder and table reference
        pb = self.catalog.getPathBuilder()
        table = self.model.name_to_table(table.value if isinstance(table, MLVocab) else table)

        # Validate table is a vocabulary table
        if not (self.model.is_vocabulary(table)):
            raise DerivaMLException(f"The table {table} is not a controlled vocabulary")

        # Fetch and convert all terms to VocabularyTerm objects
        return [VocabularyTerm(**v) for v in pb.schemas[table.schema.name].tables[table.name].entities().fetch()]

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def download_dataset_bag(
        self,
        dataset: DatasetSpec,
        execution_rid: RID | None = None,
    ) -> DatasetBag:
        """Downloads a dataset to the local filesystem and creates a MINID if needed.

        Downloads a dataset specified by DatasetSpec to the local filesystem. If the dataset doesn't have
        a MINID (Minimal Viable Identifier), one will be created. The dataset can optionally be associated
        with an execution record.

        Args:
            dataset: Specification of the dataset to download, including version and materialization options.
            execution_rid: Optional execution RID to associate the download with.

        Returns:
            DatasetBag: Object containing:
                - path: Local filesystem path to downloaded dataset
                - rid: Dataset's Resource Identifier
                - minid: Dataset's Minimal Viable Identifier

        Examples:
            Download with default options:
                >>> spec = DatasetSpec(rid="1-abc123")
                >>> bag = ml.download_dataset_bag(dataset=spec)
                >>> print(f"Downloaded to {bag.path}")

            Download with execution tracking:
                >>> bag = ml.download_dataset_bag(
                ...     dataset=DatasetSpec(rid="1-abc123", materialize=True),
                ...     execution_rid="1-xyz789"
                ... )
        """
        if not self._is_dataset_rid(dataset.rid):
            raise DerivaMLTableTypeError("Dataset", dataset.rid)
        return self._download_dataset_bag(
            dataset=dataset,
            execution_rid=execution_rid,
            snapshot_catalog=DerivaML(self.host_name, self._version_snapshot(dataset)),
        )

    def _update_status(self, new_status: Status, status_detail: str, execution_rid: RID):
        """Update the status of an execution in the catalog.

        Args:
            new_status: New status.
            status_detail: Details of the status.
            execution_rid: Resource Identifier (RID) of the execution.
            new_status: Status:
            status_detail: str:
             execution_rid: RID:

        Returns:

        """
        self.status = new_status.value
        self.pathBuilder.schemas[self.ml_schema].Execution.update(
            [
                {
                    "RID": execution_rid,
                    "Status": self.status,
                    "Status_Detail": status_detail,
                }
            ]
        )

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def add_files(
        self,
        files: Iterable[FileSpec],
        dataset_types: str | list[str] | None = None,
        description: str = "",
        execution_rid: RID | None = None,
    ) -> RID:
        """Adds files to the catalog with their metadata.

        Registers files in the catalog along with their metadata (MD5, length, URL) and associates them with
        specified file types. Optionally links files to an execution record.

        Args:
            files: File specifications containing MD5 checksum, length, and URL.
            dataset_types: One or more dataset type terms from the Dataset_Type vocabulary.
            description: Description of the files.
            execution_rid: Optional execution RID to associate files with.

        Returns:
            RID: Resource Identifier of the dataset that represents the newly added files.

        Raises:
            DerivaMLException: If file_types are invalid or execution_rid is not an execution record.

        Examples:
            Add files with a single dataset type:
                >>> files = [FileSpec(url="path/to/file.txt", md5="abc123", length=1000)]
                >>> dataset_rid = ml.add_files(files, dataset_types="text")

            Add files with multiple dataset types:
                >>> dataset_rid = ml.add_files(
                ...     files=[FileSpec(url="image.png", md5="def456", length=2000)],
                ...     dataset_types=["image", "png"],
                ...     execution_rid="1-xyz789"
                ... )
        """
        if execution_rid and self.resolve_rid(execution_rid).table.name != "Execution":
            raise DerivaMLTableTypeError("Execution", execution_rid)

        filespec_list = list(files)

        # Get a list of all defined file types and their synonyms.
        defined_types = set(
            chain.from_iterable([[t.name] + t.synonyms for t in self.list_vocabulary_terms(MLVocab.asset_type)])
        )

        # Get a list of all of the file types used in the filespec_list.
        spec_types = set(chain.from_iterable(filespec.file_types for filespec in filespec_list))

        # Now make sure that all of the file types in the spec list are defined.
        if spec_types - defined_types:
            raise DerivaMLInvalidTerm(MLVocab.asset_type.name, f"{spec_types - defined_types}")

        # Normalize dataset_types, making sure the "File" type is included.
        if isinstance(dataset_types, list):
            dataset_types = ["File"] + dataset_types if "File" not in dataset_types else dataset_types
        else:
            dataset_types = ["File", dataset_types] if dataset_types else ["File"]
        for ds_type in dataset_types:
            self.lookup_term(MLVocab.dataset_type, ds_type)

        # Add files to the file table, and collect up the resulting entries by directory name.
        pb = self._model.catalog.getPathBuilder()
        file_records = list(
            pb.schemas[self.ml_schema].tables["File"].insert([f.model_dump(by_alias=True) for f in filespec_list])
        )

        # Get the name of the association table between file_table and file_type and add file_type records
        atable = self.model.find_association(MLTable.file, MLVocab.asset_type)[0].name
        # Build a map from each file's MD5 to the asset types to link to its record.
        type_map = {file_spec.md5: file_spec.file_types for file_spec in filespec_list}
        file_type_records = [
            {MLVocab.asset_type.value: file_type, "File": file_record["RID"]}
            for file_record in file_records
            for file_type in type_map[file_record["MD5"]]
        ]
        pb.schemas[self._ml_schema].tables[atable].insert(file_type_records)

        if execution_rid:
            # Get the name of the association table between file_table and execution.
            pb.schemas[self._ml_schema].File_Execution.insert(
                [
                    {"File": file_record["RID"], "Execution": execution_rid, "Asset_Role": "Output"}
                    for file_record in file_records
                ]
            )

        # Now create datasets to capture the original directory structure of the files.
        dir_rid_map = defaultdict(list)
        for e in file_records:
            dir_rid_map[Path(urlsplit(e["URL"]).path).parent].append(e["RID"])

        nested_datasets = []
        path_length = 0
        dataset = None
        # Start with the longest path so we get subdirectories first.
        for p, rids in sorted(dir_rid_map.items(), key=lambda kv: len(kv[0].parts), reverse=True):
            dataset = self.create_dataset(
                dataset_types=dataset_types, execution_rid=execution_rid, description=description
            )
            members = rids
            if len(p.parts) < path_length:
                # Going up one level in the directory tree, so create a nested dataset.
                members = nested_datasets + rids
                nested_datasets = []
            self.add_dataset_members(dataset_rid=dataset, members=members, execution_rid=execution_rid)
            nested_datasets.append(dataset)
            path_length = len(p.parts)

        return dataset

    def list_files(self, file_types: list[str] | None = None) -> list[dict[str, Any]]:
        """Lists files in the catalog with their metadata.

        Returns a list of files with their metadata including URL, MD5 hash, length, description,
        and associated file types. Files can be optionally filtered by type.

        Args:
            file_types: Filter results to only include these file types.

        Returns:
            list[dict[str, Any]]: List of file records, each containing:
                - RID: Resource identifier
                - URL: File location
                - MD5: File hash
                - Length: File size
                - Description: File description
                - File_Types: List of associated file types

        Examples:
            List all files:
                >>> files = ml.list_files()
                >>> for f in files:
                ...     print(f"{f['RID']}: {f['URL']}")

            Filter by file type:
                >>> image_files = ml.list_files(["image", "png"])
        """

        asset_type_atable, file_fk, asset_type_fk = self.model.find_association("File", "Asset_Type")
        ml_path = self.pathBuilder.schemas[self._ml_schema]
        file = ml_path.File
        asset_type = ml_path.tables[asset_type_atable.name]

        path = file.path
        path = path.link(asset_type.alias("AT"), on=file.RID == asset_type.columns[file_fk], join_type="left")
        if file_types:
            path = path.filter(asset_type.columns[asset_type_fk] == datapath.Any(*file_types))
        path = path.attributes(
            path.File.RID,
            path.File.URL,
            path.File.MD5,
            path.File.Length,
            path.File.Description,
            path.AT.columns[asset_type_fk],
        )

        file_map = {}
        for f in path.fetch():
            entry = file_map.setdefault(f["RID"], {**f, "File_Types": []})
            if ft := f.get("Asset_Type"):  # assign-and-test in one go
                entry["File_Types"].append(ft)

        # Drop the per-row Asset_Type key; the aggregated File_Types list remains.
        for f in file_map.values():
            f.pop("Asset_Type", None)
        return list(file_map.values())

    def list_workflows(self) -> list[Workflow]:
        """Lists all workflows in the catalog.

        Retrieves all workflow definitions, including their names, URLs, types, versions,
        and descriptions.

        Returns:
            list[Workflow]: List of workflow objects, each containing:
                - name: Workflow name
                - url: Source code URL
                - workflow_type: Type of workflow
                - version: Version identifier
                - description: Workflow description
                - rid: Resource identifier
                - checksum: Source code checksum

        Examples:
            >>> workflows = ml.list_workflows()
            >>> for w in workflows:
            ...     print(f"{w.name} (v{w.version}): {w.description}")
            ...     print(f"  Source: {w.url}")
        """
        # Get a workflow table path and fetch all workflows
        workflow_path = self.pathBuilder.schemas[self.ml_schema].Workflow
        return [
            Workflow(
                name=w["Name"],
                url=w["URL"],
                workflow_type=w["Workflow_Type"],
                version=w["Version"],
                description=w["Description"],
                rid=w["RID"],
                checksum=w["Checksum"],
            )
            for w in workflow_path.entities().fetch()
        ]

    def add_workflow(self, workflow: Workflow) -> RID:
        """Adds a workflow to the catalog.

        Registers a new workflow in the catalog or returns the RID of an existing workflow with the same
        URL or checksum.

        Each workflow represents a specific computational process or analysis pipeline.

        Args:
            workflow: Workflow object containing name, URL, type, version, and description.

        Returns:
            RID: Resource Identifier of the added or existing workflow.

        Raises:
            DerivaMLException: If workflow insertion fails or required fields are missing.

        Examples:
            >>> workflow = Workflow(
            ...     name="Gene Analysis",
            ...     url="https://github.com/org/repo/workflows/gene_analysis.py",
            ...     workflow_type="python_script",
            ...     version="1.0.0",
            ...     description="Analyzes gene expression patterns"
            ... )
            >>> workflow_rid = ml.add_workflow(workflow)
        """
        # Check if the workflow already exists by checksum or URL.
        if workflow_rid := self.lookup_workflow(workflow.checksum or workflow.url):
            return workflow_rid

        # Get an ML schema path for the workflow table
        ml_schema_path = self.pathBuilder.schemas[self.ml_schema]

        try:
            # Create a workflow record
            workflow_record = {
                "URL": workflow.url,
                "Name": workflow.name,
                "Description": workflow.description,
                "Checksum": workflow.checksum,
                "Version": workflow.version,
                MLVocab.workflow_type: self.lookup_term(MLVocab.workflow_type, workflow.workflow_type).name,
            }
            # Insert a workflow and get its RID
            workflow_rid = ml_schema_path.Workflow.insert([workflow_record])[0]["RID"]
        except Exception as e:
            error = format_exception(e)
            raise DerivaMLException(f"Failed to insert workflow. Error: {error}")
        return workflow_rid

    def lookup_workflow(self, url_or_checksum: str) -> RID | None:
        """Finds a workflow by URL.

        Args:
            url_or_checksum: URL or checksum of the workflow.
        Returns:
            RID: Resource Identifier of the workflow if found, None otherwise.

        Example:
            >>> rid = ml.lookup_workflow("https://github.com/org/repo/workflow.py")
            >>> if rid:
            ...     print(f"Found workflow: {rid}")
        """
        # Get a workflow table path
        workflow_path = self.pathBuilder.schemas[self.ml_schema].Workflow
        try:
            # Search for the workflow by URL or checksum.
            url_column = workflow_path.URL
            checksum_column = workflow_path.Checksum
            return list(
                workflow_path.path.filter(
                    (url_column == url_or_checksum) | (checksum_column == url_or_checksum)
                ).entities()
            )[0]["RID"]
        except IndexError:
            return None

    def create_workflow(self, name: str, workflow_type: str, description: str = "") -> Workflow:
        """Creates a new workflow definition.

        Creates a Workflow object that represents a computational process or analysis pipeline. The workflow type
        must be a term from the controlled vocabulary. This method is typically used to define new analysis
        workflows before execution.

        Args:
            name: Name of the workflow.
            workflow_type: Type of workflow (must exist in workflow_type vocabulary).
            description: Description of what the workflow does.

        Returns:
            Workflow: New workflow object ready for registration.

        Raises:
            DerivaMLException: If workflow_type is not in the vocabulary.

        Examples:
            >>> workflow = ml.create_workflow(
            ...     name="RNA Analysis",
            ...     workflow_type="python_notebook",
            ...     description="RNA sequence analysis pipeline"
            ... )
            >>> rid = ml.add_workflow(workflow)
        """
        # Validate workflow type exists in vocabulary
        self.lookup_term(MLVocab.workflow_type, workflow_type)

        # Create and return a new workflow object
        return Workflow(name=name, workflow_type=workflow_type, description=description)

    def create_execution(self, configuration: ExecutionConfiguration, dry_run: bool = False) -> "Execution":
        """Creates an execution environment.

        Given an execution configuration, initialize the local compute environment to prepare for executing an
        ML or analytic routine.  This routine has a number of side effects.

        1. The datasets specified in the configuration are downloaded and placed in the cache-dir. If a version is
        not specified in the configuration, then a new minor version number is created for the dataset and downloaded.

        2. If any execution assets are provided in the configuration, they are downloaded
        and placed in the working directory.


        Args:
            configuration: ExecutionConfiguration describing the workflow, datasets, and assets for this execution.
            dry_run: Do not create an execution record or upload results.

        Returns:
            An execution object.
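
        Example (illustrative sketch; assumes an existing Workflow object and that datasets are given as DatasetSpec entries):
            >>> config = ExecutionConfiguration(
            ...     workflow=workflow,
            ...     datasets=[DatasetSpec(rid="1-abc123")],
            ...     description="Training run",
            ... )
            >>> execution = ml.create_execution(config)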
        """
        # Import here to avoid circular dependency
        from deriva_ml.execution.execution import Execution

        # Create and store an execution instance
        self._execution = Execution(configuration, self, dry_run=dry_run)
        return self._execution

    def restore_execution(self, execution_rid: RID | None = None) -> Execution:
        """Restores a previous execution.

        Given an execution RID, retrieves the execution configuration and restores the local compute environment.
        This routine has a number of side effects.

        1. The datasets specified in the configuration are downloaded and placed in the cache-dir. If a version is
        not specified in the configuration, then a new minor version number is created for the dataset and downloaded.

        2. If any execution assets are provided in the configuration, they are downloaded and placed
        in the working directory.

        Args:
            execution_rid: Resource Identifier (RID) of the execution to restore.

        Returns:
            Execution: An execution object representing the restored execution environment.

        Raises:
            DerivaMLException: If execution_rid is not valid or execution cannot be restored.

        Example:
            >>> execution = ml.restore_execution("1-abc123")
        """
        # Import here to avoid circular dependency
        from deriva_ml.execution.execution import Execution

        # If no RID provided, try to find single execution in working directory
        if not execution_rid:
            e_rids = execution_rids(self.working_dir)
            if len(e_rids) != 1:
                raise DerivaMLException(f"Multiple execution RIDs were found {e_rids}.")
            execution_rid = e_rids[0]

        # Try to load configuration from a file
        cfile = asset_file_path(
            prefix=self.working_dir,
            exec_rid=execution_rid,
            file_name="configuration.json",
            asset_table=self.model.name_to_table("Execution_Metadata"),
            metadata={},
        )

        # Load configuration from a file or create from an execution record
        if cfile.exists():
            configuration = ExecutionConfiguration.load_configuration(cfile)
        else:
            execution = self.retrieve_rid(execution_rid)
            configuration = ExecutionConfiguration(
                workflow=execution["Workflow"],
                description=execution["Description"],
            )

        # Create and return an execution instance
        return Execution(configuration, self, reload=execution_rid)

domain_path property

domain_path: DataPath

Returns path builder for domain schema.

Provides a convenient way to access tables and construct queries within the domain-specific schema.

Returns:

| Type | Description |
| --- | --- |
| DataPath | datapath._CatalogWrapper: Path builder object scoped to the domain schema. |

Example:

    domain = ml.domain_path
    results = domain.my_table.entities().fetch()

pathBuilder property

pathBuilder: _SchemaWrapper

Returns catalog path builder for queries.

The path builder provides a fluent interface for constructing complex queries against the catalog. This is a core component used by many other methods to interact with the catalog.

Returns:

| Type | Description |
| --- | --- |
| _SchemaWrapper | datapath._CatalogWrapper: A new instance of the catalog path builder. |

Example:

    path = ml.pathBuilder.schemas['my_schema'].tables['my_table']
    results = path.entities().fetch()
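
The same fluent interface supports filtering and column projection. A minimal sketch, assuming a hypothetical table with Status and Name columns:

    t = ml.pathBuilder.schemas['my_schema'].tables['my_table']
    path = t.path.filter(t.Status == 'complete')
    rows = path.attributes(t.RID, t.Name).fetch()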

__del__

__del__()

Cleanup method to handle incomplete executions.

Source code in src/deriva_ml/core/base.py
def __del__(self):
    """Cleanup method to handle incomplete executions."""
    try:
        # Mark execution as aborted if not completed
        if self._execution and self._execution.status != Status.completed:
            self._execution.update_status(Status.aborted, "Execution Aborted")
    except (AttributeError, requests.HTTPError):
        pass

__init__

__init__(
    hostname: str,
    catalog_id: str | int,
    domain_schema: str | None = None,
    project_name: str | None = None,
    cache_dir: str | Path | None = None,
    working_dir: str | Path | None = None,
    ml_schema: str = ML_SCHEMA,
    logging_level=logging.WARNING,
    credential=None,
    use_minid: bool = True,
)

Initializes a DerivaML instance.

This method will connect to a catalog and initialize local configuration for the ML execution. This class is intended to be used as a base class on which domain-specific interfaces are built.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| hostname | str | Hostname of the Deriva server. | required |
| catalog_id | str \| int | Catalog ID. Either an identifier or a catalog name. | required |
| domain_schema | str \| None | Schema name for domain-specific tables and relationships. Defaults to the name of the schema that is not one of the standard schemas. If there is more than one user-defined schema, then this argument must be provided a value. | None |
| ml_schema | str | Schema name for ML schema. Used if you have a non-standard configuration of deriva-ml. | ML_SCHEMA |
| project_name | str \| None | Project name. Defaults to name of domain schema. | None |
| cache_dir | str \| Path \| None | Directory path for caching data downloaded from the Deriva server as bdbag. | None |
| working_dir | str \| Path \| None | Directory path for storing data used by or generated by any computations. | None |
| use_minid | bool | Use the MINID service when downloading dataset bags. | True |
Source code in src/deriva_ml/core/base.py
def __init__(
    self,
    hostname: str,
    catalog_id: str | int,
    domain_schema: str | None = None,
    project_name: str | None = None,
    cache_dir: str | Path | None = None,
    working_dir: str | Path | None = None,
    ml_schema: str = ML_SCHEMA,
    logging_level=logging.WARNING,
    credential=None,
    use_minid: bool = True,
):
    """Initializes a DerivaML instance.

    This method will connect to a catalog and initialize local configuration for the ML execution.
    This class is intended to be used as a base class on which domain-specific interfaces are built.

    Args:
        hostname: Hostname of the Deriva server.
        catalog_id: Catalog ID. Either an identifier or a catalog name.
        domain_schema: Schema name for domain-specific tables and relationships. Defaults to the name of the
            schema that is not one of the standard schemas.  If there is more than one user-defined schema, then
            this argument must be provided a value.
        ml_schema: Schema name for ML schema. Used if you have a non-standard configuration of deriva-ml.
        project_name: Project name. Defaults to name of domain schema.
        cache_dir: Directory path for caching data downloaded from the Deriva server as bdbag.
        working_dir: Directory path for storing data used by or generated by any computations.
        use_minid: Use the MINID service when downloading dataset bags.
    """
    # Get or use provided credentials for server access
    self.credential = credential or get_credential(hostname)

    # Initialize server connection and catalog access
    server = DerivaServer(
        "https",
        hostname,
        credentials=self.credential,
        session_config=self._get_session_config(),
    )
    self.catalog = server.connect_ermrest(catalog_id)
    self.model = DerivaModel(self.catalog.getCatalogModel(), domain_schema=domain_schema)

    # Set up working and cache directories
    default_workdir = self.__class__.__name__ + "_working"
    self.working_dir = (
        Path(working_dir) / getpass.getuser() if working_dir else Path.home() / "deriva-ml"
    ) / default_workdir

    self.working_dir.mkdir(parents=True, exist_ok=True)
    self.cache_dir = Path(cache_dir) if cache_dir else self.working_dir / "cache"
    self.cache_dir.mkdir(parents=True, exist_ok=True)

    # Initialize dataset functionality from the parent class
    super().__init__(self.model, self.cache_dir, self.working_dir, use_minid=use_minid)

    # Set up logging
    self._logger = logging.getLogger("deriva_ml")
    self._logger.setLevel(logging_level)

    # Store instance configuration
    self.host_name = hostname
    self.catalog_id = catalog_id
    self.ml_schema = ml_schema
    self.configuration = None
    self._execution: Execution | None = None
    self.domain_schema = self.model.domain_schema
    self.project_name = project_name or self.domain_schema
    self.start_time = datetime.now()
    self.status = Status.pending.value

    # Configure logging format
    logging.basicConfig(
        level=logging_level,
        format="%(asctime)s - %(name)s.%(levelname)s - %(message)s",
    )

    # Set Deriva library logging level
    deriva_logger = logging.getLogger("deriva")
    deriva_logger.setLevel(logging_level)
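
A minimal connection sketch (hostname and catalog ID are placeholders):

    from deriva_ml import DerivaML  # assumed import path

    ml = DerivaML(hostname="deriva.example.org", catalog_id=1)
    print(ml.domain_schema, ml.working_dir)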

add_dataset_element_type

add_dataset_element_type(element: str | Table) -> Table

A dataset_table is a heterogeneous collection of objects, each of which comes from a different table. This routine makes it possible to add objects from the specified table to a dataset_table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| element | str \| Table | Name of the table or table object that is to be added to the dataset_table. | required |

Returns:

| Type | Description |
| --- | --- |
| Table | The table object that was added to the dataset_table. |

Source code in src/deriva_ml/dataset/dataset.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def add_dataset_element_type(self, element: str | Table) -> Table:
    """A dataset_table is a heterogeneous collection of objects, each of which comes from a different table. This
    routine makes it possible to add objects from the specified table to a dataset_table.

    Args:
        element: Name of the table or table object that is to be added to the dataset_table.

    Returns:
        The table object that was added to the dataset_table.
    """
    # Add table to map
    element_table = self._model.name_to_table(element)
    atable_def = Table.define_association([self._dataset_table, element_table])
    try:
        table = self._model.schemas[self._model.domain_schema].create_table(atable_def)
    except ValueError as e:
        if "already exists" in str(e):
            table = self._model.name_to_table(atable_def["table_name"])
        else:
            raise e

    # self.model = self.catalog.getCatalogModel()
    self._dataset_table.annotations.update(self._generate_dataset_download_annotations())
    self._model.model.apply()
    return table
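
Illustrative usage, assuming the domain schema has a table named "Image":

    image_assoc = ml.add_dataset_element_type("Image")
    # Records from Image can now be added to datasets via add_dataset_members().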

add_dataset_members

add_dataset_members(
    dataset_rid: RID,
    members: list[RID] | dict[str, list[RID]],
    validate: bool = True,
    description: str | None = "",
    execution_rid: RID | None = None,
) -> None

Adds members to a dataset.

Associates one or more records with a dataset. Can optionally validate member types and create a new dataset version to track the changes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dataset_rid | RID | Resource Identifier of the dataset. | required |
| members | list[RID] \| dict[str, list[RID]] | List of RIDs to add as dataset members. Can be organized into a dictionary that indicates the table that the member RIDs belong to. | required |
| validate | bool | Whether to validate member types. Defaults to True. | True |
| description | str \| None | Optional description of the member additions. | '' |
| execution_rid | RID \| None | Optional execution RID to associate with changes. | None |

Raises:

| Type | Description |
| --- | --- |
| DerivaMLException | If dataset_rid is invalid, members are invalid or of wrong type, adding members would create a cycle, or validation fails. |

Example:

    ml.add_dataset_members(
        dataset_rid="1-abc123",
        members=["1-def456", "1-ghi789"],
        description="Added sample data"
    )

Source code in src/deriva_ml/dataset/dataset.py
@validate_call
def add_dataset_members(
    self,
    dataset_rid: RID,
    members: list[RID] | dict[str, list[RID]],
    validate: bool = True,
    description: str | None = "",
    execution_rid: RID | None = None,
) -> None:
    """Adds members to a dataset.

    Associates one or more records with a dataset. Can optionally validate member types
    and create a new dataset version to track the changes.

    Args:
        dataset_rid: Resource Identifier of the dataset.
        members: List of RIDs to add as dataset members. Can be organized into a dictionary that indicates the
            table that the member RIDs belong to.
        validate: Whether to validate member types. Defaults to True.
        description: Optional description of the member additions.
        execution_rid: Optional execution RID to associate with changes.

    Raises:
        DerivaMLException: If:
            - dataset_rid is invalid
            - members are invalid or of wrong type
            - adding members would create a cycle
            - validation fails

    Example:
        >>> ml.add_dataset_members(
        ...     dataset_rid="1-abc123",
        ...     members=["1-def456", "1-ghi789"],
        ...     description="Added sample data"
        ... )
    """
    description = description or "Updated dataset via add_dataset_members"

    def check_dataset_cycle(member_rid, path=None):
        """Return True if member_rid already appears on the dataset nesting path."""
        # Use a singleton set: set(dataset_rid) would be a set of the RID's characters.
        path = path or {dataset_rid}
        return member_rid in path

    if validate:
        existing_rids = set(m["RID"] for ms in self.list_dataset_members(dataset_rid).values() for m in ms)
        if overlap := set(existing_rids).intersection(members):
            raise DerivaMLException(f"Attempting to add existing member to dataset_table {dataset_rid}: {overlap}")

    # Now go through every rid to be added to the data set and sort them based on what association table entries
    # need to be made.
    dataset_elements = {}
    association_map = {
        a.other_fkeys.pop().pk_table.name: a.table.name for a in self._dataset_table.find_associations()
    }

    # Get a list of all the object types that can be linked to a dataset_table.
    if type(members) is list:
        members = set(members)
        for m in members:
            try:
                rid_info = self._model.catalog.resolve_rid(m)
            except KeyError:
                raise DerivaMLException(f"Invalid RID: {m}")
            if rid_info.table.name not in association_map:
                raise DerivaMLException(f"RID table: {rid_info.table.name} not part of dataset_table")
            if rid_info.table == self._dataset_table and check_dataset_cycle(rid_info.rid):
                raise DerivaMLException("Creating cycle of datasets is not allowed")
            dataset_elements.setdefault(rid_info.table.name, []).append(rid_info.rid)
    else:
        dataset_elements = {t: set(ms) for t, ms in members.items()}
    # Now make the entries into the association tables.
    pb = self._model.catalog.getPathBuilder()
    for table, elements in dataset_elements.items():
        schema_path = pb.schemas[
            self._ml_schema if (table == "Dataset" or table == "File") else self._model.domain_schema
        ]
        fk_column = "Nested_Dataset" if table == "Dataset" else table
        if len(elements):
            # Find out the name of the column in the association table.
            schema_path.tables[association_map[table]].insert(
                [{"Dataset": dataset_rid, fk_column: e} for e in elements]
            )
    self.increment_dataset_version(
        dataset_rid,
        VersionPart.minor,
        description=description,
        execution_rid=execution_rid,
    )
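
The dictionary form skips per-RID resolution by stating each member's table explicitly. A sketch with hypothetical domain tables:

    ml.add_dataset_members(
        dataset_rid="1-abc123",
        members={"Image": ["1-def456"], "Subject": ["1-ghi789"]},
        description="Added images and subjects",
    )
    # Each call also bumps the dataset's minor version via increment_dataset_version().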

add_files

add_files(
    files: Iterable[FileSpec],
    dataset_types: str | list[str] | None = None,
    description: str = "",
    execution_rid: RID | None = None,
) -> RID

Adds files to the catalog with their metadata.

Registers files in the catalog along with their metadata (MD5, length, URL) and associates them with specified file types. Optionally links files to an execution record.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| files | Iterable[FileSpec] | File specifications containing MD5 checksum, length, and URL. | required |
| dataset_types | str \| list[str] \| None | One or more dataset type terms from the Dataset_Type vocabulary. | None |
| description | str | Description of the files. | '' |
| execution_rid | RID \| None | Optional execution RID to associate files with. | None |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| RID | RID | Resource Identifier of the dataset that represents the newly added files. |

Raises:

| Type | Description |
| --- | --- |
| DerivaMLException | If file_types are invalid or execution_rid is not an execution record. |

Examples:

Add files with a single dataset type:

    >>> files = [FileSpec(url="path/to/file.txt", md5="abc123", length=1000)]
    >>> dataset_rid = ml.add_files(files, dataset_types="text")

Add files with multiple dataset types:

    >>> dataset_rid = ml.add_files(
    ...     files=[FileSpec(url="image.png", md5="def456", length=2000)],
    ...     dataset_types=["image", "png"],
    ...     execution_rid="1-xyz789"
    ... )

Source code in src/deriva_ml/core/base.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def add_files(
    self,
    files: Iterable[FileSpec],
    dataset_types: str | list[str] | None = None,
    description: str = "",
    execution_rid: RID | None = None,
) -> RID:
    """Adds files to the catalog with their metadata.

    Registers files in the catalog along with their metadata (MD5, length, URL) and associates them with
    specified file types. Optionally links files to an execution record.

    Args:
        files: File specifications containing MD5 checksum, length, and URL.
        dataset_types: One or more dataset type terms from the Dataset_Type vocabulary.
        description: Description of the files.
        execution_rid: Optional execution RID to associate files with.

    Returns:
        RID: Resource Identifier of the dataset that represents the newly added files.

    Raises:
        DerivaMLException: If file_types are invalid or execution_rid is not an execution record.

    Examples:
        Add files with a single dataset type:
            >>> files = [FileSpec(url="path/to/file.txt", md5="abc123", length=1000)]
            >>> dataset_rid = ml.add_files(files, dataset_types="text")

        Add files with multiple dataset types:
            >>> dataset_rid = ml.add_files(
            ...     files=[FileSpec(url="image.png", md5="def456", length=2000)],
            ...     dataset_types=["image", "png"],
            ...     execution_rid="1-xyz789"
            ... )
    """
    if execution_rid and self.resolve_rid(execution_rid).table.name != "Execution":
        raise DerivaMLTableTypeError("Execution", execution_rid)

    filespec_list = list(files)

    # Get a list of all defined file types and their synonyms.
    defined_types = set(
        chain.from_iterable([[t.name] + t.synonyms for t in self.list_vocabulary_terms(MLVocab.asset_type)])
    )

    # Get a list of all of the file types used in the filespec_list.
    spec_types = set(chain.from_iterable(filespec.file_types for filespec in filespec_list))

    # Now make sure that all of the file types in the spec list are defined.
    if spec_types - defined_types:
        raise DerivaMLInvalidTerm(MLVocab.asset_type.name, f"{spec_types - defined_types}")

    # Normalize dataset_types, making sure the "File" type is included.
    if isinstance(dataset_types, list):
        dataset_types = ["File"] + dataset_types if "File" not in dataset_types else dataset_types
    else:
        dataset_types = ["File", dataset_types] if dataset_types else ["File"]
    for ds_type in dataset_types:
        self.lookup_term(MLVocab.dataset_type, ds_type)

    # Add files to the file table, and collect up the resulting entries by directory name.
    pb = self._model.catalog.getPathBuilder()
    file_records = list(
        pb.schemas[self.ml_schema].tables["File"].insert([f.model_dump(by_alias=True) for f in filespec_list])
    )

    # Get the name of the association table between file_table and file_type and add file_type records
    atable = self.model.find_association(MLTable.file, MLVocab.asset_type)[0].name
    # Build a map from each file's MD5 to the asset types to link to its record.
    type_map = {file_spec.md5: file_spec.file_types for file_spec in filespec_list}
    file_type_records = [
        {MLVocab.asset_type.value: file_type, "File": file_record["RID"]}
        for file_record in file_records
        for file_type in type_map[file_record["MD5"]]
    ]
    pb.schemas[self._ml_schema].tables[atable].insert(file_type_records)

    if execution_rid:
        # Get the name of the association table between file_table and execution.
        pb.schemas[self._ml_schema].File_Execution.insert(
            [
                {"File": file_record["RID"], "Execution": execution_rid, "Asset_Role": "Output"}
                for file_record in file_records
            ]
        )

    # Now create datasets to capture the original directory structure of the files.
    dir_rid_map = defaultdict(list)
    for e in file_records:
        dir_rid_map[Path(urlsplit(e["URL"]).path).parent].append(e["RID"])

    nested_datasets = []
    path_length = 0
    dataset = None
    # Start with the longest path so we get subdirectories first.
    for p, rids in sorted(dir_rid_map.items(), key=lambda kv: len(kv[0].parts), reverse=True):
        dataset = self.create_dataset(
            dataset_types=dataset_types, execution_rid=execution_rid, description=description
        )
        members = rids
        if len(p.parts) < path_length:
            # Going up one level in the directory tree, so create a nested dataset.
            members = nested_datasets + rids
            nested_datasets = []
        self.add_dataset_members(dataset_rid=dataset, members=members, execution_rid=execution_rid)
        nested_datasets.append(dataset)
        path_length = len(p.parts)

    return dataset
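
Because files are grouped by the directory portion of their URLs, file specs drawn from nested directories yield nested datasets, one per directory. A sketch with hypothetical URLs, checksums, and vocabulary terms (all of which must already be defined):

    specs = [
        FileSpec(url="data/train/a.png", md5="aaa111", length=100, file_types=["image"]),
        FileSpec(url="data/train/b.png", md5="bbb222", length=120, file_types=["image"]),
        FileSpec(url="data/summary.csv", md5="ccc333", length=64, file_types=["text"]),
    ]
    root_dataset = ml.add_files(specs, dataset_types="training_data")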

add_page

add_page(title: str, content: str) -> None

Adds page to web interface.

Creates a new page in the catalog's web interface with the specified title and content. The page will be accessible through the catalog's navigation system.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| title | str | The title of the page to be displayed in navigation and headers. | required |
| content | str | The main content of the page; can include HTML markup. | required |

Raises:

| Type | Description |
| --- | --- |
| DerivaMLException | If the page creation fails or the user lacks necessary permissions. |

Example:

    ml.add_page(
        title="Analysis Results",
        content="<h1>Results</h1><p>Analysis completed successfully...</p>"
    )

Source code in src/deriva_ml/core/base.py
def add_page(self, title: str, content: str) -> None:
    """Adds page to web interface.

    Creates a new page in the catalog's web interface with the specified title and content. The page will be
    accessible through the catalog's navigation system.

    Args:
        title: The title of the page to be displayed in navigation and headers.
        content: The main content of the page can include HTML markup.

    Raises:
        DerivaMLException: If the page creation fails or the user lacks necessary permissions.

    Example:
        >>> ml.add_page(
        ...     title="Analysis Results",
        ...     content="<h1>Results</h1><p>Analysis completed successfully...</p>"
        ... )
    """
    # Insert page into www tables with title and content
    self.pathBuilder.www.tables[self.domain_schema].insert([{"Title": title, "Content": content}])

add_term

add_term(
    table: str | Table,
    term_name: str,
    description: str,
    synonyms: list[str] | None = None,
    exists_ok: bool = True,
) -> VocabularyTerm

Adds a term to a vocabulary table.

Creates a new standardized term with description and optional synonyms in a vocabulary table. Can either create a new term or return an existing one if it already exists.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| table | str \| Table | Vocabulary table to add term to (name or Table object). | required |
| term_name | str | Primary name of the term (must be unique within vocabulary). | required |
| description | str | Explanation of term's meaning and usage. | required |
| synonyms | list[str] \| None | Alternative names for the term. | None |
| exists_ok | bool | If True, return the existing term if found. If False, raise error. | True |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| VocabularyTerm | VocabularyTerm | Object representing the created or existing term. |

Raises:

| Type | Description |
| --- | --- |
| DerivaMLException | If a term exists and exists_ok=False, or if the table is not a vocabulary table. |

Examples:

Add a new tissue type:

    >>> term = ml.add_term(
    ...     table="tissue_types",
    ...     term_name="epithelial",
    ...     description="Epithelial tissue type",
    ...     synonyms=["epithelium"]
    ... )

Attempt to add an existing term:

    >>> term = ml.add_term("tissue_types", "epithelial", "...", exists_ok=True)

Source code in src/deriva_ml/core/base.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def add_term(
    self,
    table: str | Table,
    term_name: str,
    description: str,
    synonyms: list[str] | None = None,
    exists_ok: bool = True,
) -> VocabularyTerm:
    """Adds a term to a vocabulary table.

    Creates a new standardized term with description and optional synonyms in a vocabulary table.
    Can either create a new term or return an existing one if it already exists.

    Args:
        table: Vocabulary table to add term to (name or Table object).
        term_name: Primary name of the term (must be unique within vocabulary).
        description: Explanation of term's meaning and usage.
        synonyms: Alternative names for the term.
        exists_ok: If True, return the existing term if found. If False, raise error.

    Returns:
        VocabularyTerm: Object representing the created or existing term.

    Raises:
        DerivaMLException: If a term exists and exists_ok=False, or if the table is not a vocabulary table.

    Examples:
        Add a new tissue type:
            >>> term = ml.add_term(
            ...     table="tissue_types",
            ...     term_name="epithelial",
            ...     description="Epithelial tissue type",
            ...     synonyms=["epithelium"]
            ... )

        Attempt to add an existing term:
            >>> term = ml.add_term("tissue_types", "epithelial", "...", exists_ok=True)
    """
    # Initialize an empty synonyms list if None
    synonyms = synonyms or []

    # Get table reference and validate if it is a vocabulary table
    table = self.model.name_to_table(table)
    pb = self.catalog.getPathBuilder()
    if not (self.model.is_vocabulary(table)):
        raise DerivaMLTableTypeError("vocabulary", table.name)

    # Get schema and table names for path building
    schema_name = table.schema.name
    table_name = table.name

    try:
        # Attempt to insert a new term
        term_id = VocabularyTerm.model_validate(
            pb.schemas[schema_name]
            .tables[table_name]
            .insert(
                [
                    {
                        "Name": term_name,
                        "Description": description,
                        "Synonyms": synonyms,
                    }
                ],
                defaults={"ID", "URI"},
            )[0]
        )
    except DataPathException:
        # Term exists - look it up or raise an error
        term_id = self.lookup_term(table, term_name)
        if not exists_ok:
            raise DerivaMLInvalidTerm(table.name, term_name, msg="term already exists")
    return term_id
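
Since lookup_term() matches both primary names and synonyms, a term registered with synonyms can be retrieved under any of them. A short sketch with a hypothetical vocabulary table:

    term = ml.add_term("tissue_types", "epithelial", "Epithelial tissue", synonyms=["epithelium"])
    same_term = ml.lookup_term("tissue_types", "epithelium")
    assert term.name == same_term.name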

add_workflow

add_workflow(workflow: Workflow) -> RID

Adds a workflow to the catalog.

Registers a new workflow in the catalog or returns the RID of an existing workflow with the same URL or checksum.

Each workflow represents a specific computational process or analysis pipeline.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| workflow | Workflow | Workflow object containing name, URL, type, version, and description. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| RID | RID | Resource Identifier of the added or existing workflow. |

Raises:

| Type | Description |
| --- | --- |
| DerivaMLException | If workflow insertion fails or required fields are missing. |

Examples:

>>> workflow = Workflow(
...     name="Gene Analysis",
...     url="https://github.com/org/repo/workflows/gene_analysis.py",
...     workflow_type="python_script",
...     version="1.0.0",
...     description="Analyzes gene expression patterns"
... )
>>> workflow_rid = ml.add_workflow(workflow)
Source code in src/deriva_ml/core/base.py
def add_workflow(self, workflow: Workflow) -> RID:
    """Adds a workflow to the catalog.

    Registers a new workflow in the catalog or returns the RID of an existing workflow with the same
    URL or checksum.

    Each workflow represents a specific computational process or analysis pipeline.

    Args:
        workflow: Workflow object containing name, URL, type, version, and description.

    Returns:
        RID: Resource Identifier of the added or existing workflow.

    Raises:
        DerivaMLException: If workflow insertion fails or required fields are missing.

    Examples:
        >>> workflow = Workflow(
        ...     name="Gene Analysis",
        ...     url="https://github.com/org/repo/workflows/gene_analysis.py",
        ...     workflow_type="python_script",
        ...     version="1.0.0",
        ...     description="Analyzes gene expression patterns"
        ... )
        >>> workflow_rid = ml.add_workflow(workflow)
    """
    # Check if the workflow already exists by checksum or URL.
    if workflow_rid := self.lookup_workflow(workflow.checksum or workflow.url):
        return workflow_rid

    # Get an ML schema path for the workflow table
    ml_schema_path = self.pathBuilder.schemas[self.ml_schema]

    try:
        # Create a workflow record
        workflow_record = {
            "URL": workflow.url,
            "Name": workflow.name,
            "Description": workflow.description,
            "Checksum": workflow.checksum,
            "Version": workflow.version,
            MLVocab.workflow_type: self.lookup_term(MLVocab.workflow_type, workflow.workflow_type).name,
        }
        # Insert a workflow and get its RID
        workflow_rid = ml_schema_path.Workflow.insert([workflow_record])[0]["RID"]
    except Exception as e:
        error = format_exception(e)
        raise DerivaMLException(f"Failed to insert workflow. Error: {error}")
    return workflow_rid
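
Because registration first consults lookup_workflow() with the workflow's checksum or URL, add_workflow() is idempotent. A brief sketch:

    rid_first = ml.add_workflow(workflow)
    rid_again = ml.add_workflow(workflow)  # returns the existing RID
    assert rid_first == rid_again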

chaise_url

chaise_url(table: RID | Table | str) -> str

Generates Chaise web interface URL.

Chaise is Deriva's web interface for data exploration. This method creates a URL that directly links to the specified table or record.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| table | RID \| Table \| str | Table to generate URL for (name, Table object, or RID). | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| str | str | URL in format: https://{host}/chaise/recordset/#{catalog}/{schema}:{table} |

Raises:

| Type | Description |
| --- | --- |
| DerivaMLException | If table or RID cannot be found. |

Examples:

Using table name:

    >>> ml.chaise_url("experiment_table")
    'https://deriva.org/chaise/recordset/#1/schema:experiment_table'

Using RID:

    >>> ml.chaise_url("1-abc123")

Source code in src/deriva_ml/core/base.py
def chaise_url(self, table: RID | Table | str) -> str:
    """Generates Chaise web interface URL.

    Chaise is Deriva's web interface for data exploration. This method creates a URL that directly links to
    the specified table or record.

    Args:
        table: Table to generate URL for (name, Table object, or RID).

    Returns:
        str: URL in format: https://{host}/chaise/recordset/#{catalog}/{schema}:{table}

    Raises:
        DerivaMLException: If table or RID cannot be found.

    Examples:
        Using table name:
            >>> ml.chaise_url("experiment_table")
            'https://deriva.org/chaise/recordset/#1/schema:experiment_table'

        Using RID:
            >>> ml.chaise_url("1-abc123")
    """
    # Get the table object and build base URI
    table_obj = self.model.name_to_table(table)
    try:
        uri = self.catalog.get_server_uri().replace("ermrest/catalog/", "chaise/recordset/#")
    except DerivaMLException:
        # Handle RID case
        uri = self.cite(cast(str, table))
    return f"{uri}/{urlquote(table_obj.schema.name)}:{urlquote(table_obj.name)}"

cite

cite(entity: Dict[str, Any] | str) -> str

Generates permanent citation URL.

Creates a versioned URL that can be used to reference a specific entity in the catalog. The URL includes the catalog snapshot time to ensure version stability.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entity | Dict[str, Any] \| str | Either a RID string or a dictionary containing entity data with a 'RID' key. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| str | str | Permanent citation URL in format: https://{host}/id/{catalog}/{rid}@{snapshot_time} |

Raises:

| Type | Description |
| --- | --- |
| DerivaMLException | If an entity doesn't exist or lacks a RID. |

Examples:

Using a RID string:

    >>> url = ml.cite("1-abc123")
    >>> print(url)
    'https://deriva.org/id/1/1-abc123@2024-01-01T12:00:00'

Using a dictionary:

    >>> url = ml.cite({"RID": "1-abc123"})

Source code in src/deriva_ml/core/base.py
def cite(self, entity: Dict[str, Any] | str) -> str:
    """Generates permanent citation URL.

    Creates a versioned URL that can be used to reference a specific entity in the catalog. The URL includes
    the catalog snapshot time to ensure version stability.

    Args:
        entity: Either a RID string or a dictionary containing entity data with a 'RID' key.

    Returns:
        str: Permanent citation URL in format: https://{host}/id/{catalog}/{rid}@{snapshot_time}

    Raises:
        DerivaMLException: If an entity doesn't exist or lacks a RID.

    Examples:
        Using a RID string:
            >>> url = ml.cite("1-abc123")
            >>> print(url)
            'https://deriva.org/id/1/1-abc123@2024-01-01T12:00:00'

        Using a dictionary:
            >>> url = ml.cite({"RID": "1-abc123"})
    """
    # Return if already a citation URL
    if isinstance(entity, str) and entity.startswith(f"https://{self.host_name}/id/{self.catalog_id}/"):
        return entity

    try:
        # Resolve RID and create citation URL with snapshot time
        self.resolve_rid(rid := entity if isinstance(entity, str) else entity["RID"])
        return f"https://{self.host_name}/id/{self.catalog_id}/{rid}@{self.catalog.latest_snapshot().snaptime}"
    except KeyError as e:
        raise DerivaMLException(f"Entity {e} does not have RID column")
    except DerivaMLException as _e:
        raise DerivaMLException("Entity RID does not exist")

create_asset

create_asset(
    asset_name: str,
    column_defs: Iterable[ColumnDefinition] | None = None,
    fkey_defs: Iterable[ColumnDefinition] | None = None,
    referenced_tables: Iterable[Table] | None = None,
    comment: str = "",
    schema: str | None = None,
) -> Table

Creates an asset table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| asset_name | str | Name of the asset table. | required |
| column_defs | Iterable[ColumnDefinition] \| None | Iterable of ColumnDefinition objects to provide additional metadata for asset. | None |
| fkey_defs | Iterable[ColumnDefinition] \| None | Iterable of ForeignKeyDefinition objects to provide additional metadata for asset. | None |
| referenced_tables | Iterable[Table] \| None | Iterable of Table objects to which the asset should provide foreign-key references. | None |
| comment | str | Description of the asset table. | '' |
| schema | str \| None | Schema in which to create the asset table. Defaults to domain_schema. | None |

Returns:

| Type | Description |
| --- | --- |
| Table | Table object for the asset table. |

Source code in src/deriva_ml/core/base.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def create_asset(
    self,
    asset_name: str,
    column_defs: Iterable[ColumnDefinition] | None = None,
    fkey_defs: Iterable[ColumnDefinition] | None = None,
    referenced_tables: Iterable[Table] | None = None,
    comment: str = "",
    schema: str | None = None,
) -> Table:
    """Creates an asset table.

    Args:
        asset_name: Name of the asset table.
        column_defs: Iterable of ColumnDefinition objects to provide additional metadata for asset.
        fkey_defs: Iterable of ForeignKeyDefinition objects to provide additional metadata for asset.
        referenced_tables: Iterable of Table objects to which the asset should provide foreign-key references.
        comment: Description of the asset table. (Default value = '')
        schema: Schema in which to create the asset table.  Defaults to domain_schema.

    Returns:
        Table object for the asset table.
    """
    # Initialize empty collections if None provided
    column_defs = column_defs or []
    fkey_defs = fkey_defs or []
    referenced_tables = referenced_tables or []
    schema = schema or self.domain_schema

    # Add an asset type to vocabulary
    self.add_term(MLVocab.asset_type, asset_name, description=f"A {asset_name} asset")

    # Create the main asset table
    asset_table = self.model.schemas[schema].create_table(
        Table.define_asset(
            schema,
            asset_name,
            column_defs=[c.model_dump() for c in column_defs],
            fkey_defs=[fk.model_dump() for fk in fkey_defs],
            comment=comment,
        )
    )

    # Create an association table between asset and asset type
    self.model.schemas[self.domain_schema].create_table(
        Table.define_association(
            [
                (asset_table.name, asset_table),
                ("Asset_Type", self.model.name_to_table("Asset_Type")),
            ]
        )
    )

    # Create references to other tables if specified
    for t in referenced_tables:
        asset_table.create_reference(self.model.name_to_table(t))

    # Create an association table for tracking execution
    atable = self.model.schemas[self.domain_schema].create_table(
        Table.define_association(
            [
                (asset_name, asset_table),
                (
                    "Execution",
                    self.model.schemas[self.ml_schema].tables["Execution"],
                ),
            ]
        )
    )
    atable.create_reference(self.model.name_to_table("Asset_Role"))

    # Add asset annotations
    asset_annotation(asset_table)
    return asset_table
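
Illustrative usage, assuming a hypothetical "Image" asset with one extra metadata column (the ColumnDefinition import path and field names are assumptions):

    from deriva_ml import ColumnDefinition, BuiltinTypes  # assumed import path

    image_asset = ml.create_asset(
        "Image",
        # ColumnDefinition field names are assumed for illustration.
        column_defs=[ColumnDefinition(name="Width", type=BuiltinTypes.int4)],
        comment="Microscopy images",
    )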

create_dataset

create_dataset(
    dataset_types: str
    | list[str]
    | None = None,
    description: str = "",
    execution_rid: RID | None = None,
    version: DatasetVersion
    | None = None,
) -> RID

Creates a new dataset in the catalog.

Creates a dataset with specified types and description. The dataset can be associated with an execution and initialized with a specific version.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset_types` | `str \| list[str] \| None` | One or more dataset type terms from the Dataset_Type vocabulary. | `None` |
| `description` | `str` | Description of the dataset's purpose and contents. | `''` |
| `execution_rid` | `RID \| None` | Optional execution RID to associate with dataset creation. | `None` |
| `version` | `DatasetVersion \| None` | Optional initial version number. Defaults to 0.1.0. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `RID` | `RID` | Resource Identifier of the newly created dataset. |

Raises:

| Type | Description |
| --- | --- |
| `DerivaMLException` | If dataset_types are invalid or creation fails. |

Example:

>>> rid = ml.create_dataset(
...     dataset_types=["experiment", "raw_data"],
...     description="RNA sequencing experiment data",
...     version=DatasetVersion(1, 0, 0)
... )

Source code in src/deriva_ml/dataset/dataset.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def create_dataset(
    self,
    dataset_types: str | list[str] | None = None,
    description: str = "",
    execution_rid: RID | None = None,
    version: DatasetVersion | None = None,
) -> RID:
    """Creates a new dataset in the catalog.

    Creates a dataset with specified types and description. The dataset can be associated
    with an execution and initialized with a specific version.

    Args:
        dataset_types: One or more dataset type terms from Dataset_Type vocabulary.
        description: Description of the dataset's purpose and contents.
        execution_rid: Optional execution RID to associate with dataset creation.
        version: Optional initial version number. Defaults to 0.1.0.

    Returns:
        RID: Resource Identifier of the newly created dataset.

    Raises:
        DerivaMLException: If dataset_types are invalid or creation fails.

    Example:
        >>> rid = ml.create_dataset(
        ...     dataset_types=["experiment", "raw_data"],
        ...     description="RNA sequencing experiment data",
        ...     version=DatasetVersion(1, 0, 0)
        ... )
    """

    version = version or DatasetVersion(0, 1, 0)
    dataset_types = dataset_types or []

    type_path = self._model.catalog.getPathBuilder().schemas[self._ml_schema].tables[MLVocab.dataset_type.value]
    defined_types = list(type_path.entities().fetch())

    def check_dataset_type(dtype: str) -> bool:
        for term in defined_types:
            if dtype == term["Name"] or (term["Synonyms"] and ds_type in term["Synonyms"]):
                return True
        return False

    # Create the entry for the new dataset_table and get its RID.
    ds_types = [dataset_types] if isinstance(dataset_types, str) else dataset_types
    pb = self._model.catalog.getPathBuilder()
    for ds_type in ds_types:
        if not check_dataset_type(ds_type):
            raise DerivaMLException("Dataset type must be a vocabulary term.")
    dataset_table_path = pb.schemas[self._dataset_table.schema.name].tables[self._dataset_table.name]
    dataset_rid = dataset_table_path.insert(
        [
            {
                "Description": description,
                "Deleted": False,
            }
        ]
    )[0]["RID"]

    # Get the name of the association table between dataset_table and dataset_type.
    associations = list(self._model.schemas[self._ml_schema].tables[MLVocab.dataset_type].find_associations())
    atable = associations[0].name if associations else None
    pb.schemas[self._ml_schema].tables[atable].insert(
        [{MLVocab.dataset_type: ds_type, "Dataset": dataset_rid} for ds_type in ds_types]
    )
    if execution_rid is not None:
        pb.schemas[self._ml_schema].Dataset_Execution.insert([{"Dataset": dataset_rid, "Execution": execution_rid}])
    self._insert_dataset_versions(
        [DatasetSpec(rid=dataset_rid, version=version)],
        execution_rid=execution_rid,
        description="Initial dataset creation.",
    )
    return dataset_rid

create_execution

create_execution(
    configuration: ExecutionConfiguration,
    dry_run: bool = False,
) -> "Execution"

Creates an execution environment.

Given an execution configuration, initialize the local compute environment to prepare for executing an ML or analytic routine. This routine has a number of side effects.

  1. The datasets specified in the configuration are downloaded and placed in the cache-dir. If a version is not specified in the configuration, then a new minor version number is created for the dataset and downloaded.

  2. If any execution assets are provided in the configuration, they are downloaded and placed in the working directory.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `configuration` | `ExecutionConfiguration` | The execution configuration, describing the datasets, assets, and workflow to use. | required |
| `dry_run` | `bool` | Do not create an execution record or upload results. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `'Execution'` | An execution object. |

Source code in src/deriva_ml/core/base.py
def create_execution(self, configuration: ExecutionConfiguration, dry_run: bool = False) -> "Execution":
    """Creates an execution environment.

    Given an execution configuration, initialize the local compute environment to prepare for executing an
    ML or analytic routine.  This routine has a number of side effects.

    1. The datasets specified in the configuration are downloaded and placed in the cache-dir. If a version is
    not specified in the configuration, then a new minor version number is created for the dataset and downloaded.

    2. If any execution assets are provided in the configuration, they are downloaded
    and placed in the working directory.


    Args:
        configuration: The execution configuration, describing the datasets, assets, and workflow to use.
        dry_run: Do not create an execution record or upload results.

    Returns:
        An execution object.
    """
    # Import here to avoid circular dependency
    from deriva_ml.execution.execution import Execution

    # Create and store an execution instance
    self._execution = Execution(configuration, self, dry_run=dry_run)
    return self._execution
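
A minimal usage sketch (the configuration fields are elided; see ExecutionConfiguration for what it accepts):

>>> config = ExecutionConfiguration(...)  # datasets, assets, workflow, etc.
>>> execution = ml.create_execution(config)
>>> trial = ml.create_execution(config, dry_run=True)  # no record created, no uploads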

create_feature

create_feature(
    target_table: Table | str,
    feature_name: str,
    terms: list[Table | str] | None = None,
    assets: list[Table | str] | None = None,
    metadata: list[ColumnDefinition | Table | Key | str] | None = None,
    optional: list[str] | None = None,
    comment: str = "",
) -> type[FeatureRecord]

Creates a new feature definition.

A feature represents a measurable property or characteristic that can be associated with records in the target table. Features can include vocabulary terms, asset references, and additional metadata.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `target_table` | `Table \| str` | Table to associate the feature with (name or Table object). | required |
| `feature_name` | `str` | Unique name for the feature within the target table. | required |
| `terms` | `list[Table \| str] \| None` | Optional vocabulary tables/names whose terms can be used as feature values. | `None` |
| `assets` | `list[Table \| str] \| None` | Optional asset tables/names that can be referenced by this feature. | `None` |
| `metadata` | `list[ColumnDefinition \| Table \| Key \| str] \| None` | Optional columns, tables, or keys to include in a feature definition. | `None` |
| `optional` | `list[str] \| None` | Column names that are not required when creating feature instances. | `None` |
| `comment` | `str` | Description of the feature's purpose and usage. | `''` |

Returns:

| Type | Description |
| --- | --- |
| `type[FeatureRecord]` | Feature class for creating validated instances. |

Raises:

| Type | Description |
| --- | --- |
| `DerivaMLException` | If a feature definition is invalid or conflicts with existing features. |

Examples:

Create a feature with confidence score:

>>> feature_class = ml.create_feature(
...     target_table="samples",
...     feature_name="expression_level",
...     terms=["expression_values"],
...     metadata=[ColumnDefinition(name="confidence", type=BuiltinTypes.float4)],
...     comment="Gene expression measurement"
... )

Source code in src/deriva_ml/core/base.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def create_feature(
    self,
    target_table: Table | str,
    feature_name: str,
    terms: list[Table | str] | None = None,
    assets: list[Table | str] | None = None,
    metadata: list[ColumnDefinition | Table | Key | str] | None = None,
    optional: list[str] | None = None,
    comment: str = "",
) -> type[FeatureRecord]:
    """Creates a new feature definition.

    A feature represents a measurable property or characteristic that can be associated with records in the target
    table. Features can include vocabulary terms, asset references, and additional metadata.

    Args:
        target_table: Table to associate the feature with (name or Table object).
        feature_name: Unique name for the feature within the target table.
        terms: Optional vocabulary tables/names whose terms can be used as feature values.
        assets: Optional asset tables/names that can be referenced by this feature.
        metadata: Optional columns, tables, or keys to include in a feature definition.
        optional: Column names that are not required when creating feature instances.
        comment: Description of the feature's purpose and usage.

    Returns:
        type[FeatureRecord]: Feature class for creating validated instances.

    Raises:
        DerivaMLException: If a feature definition is invalid or conflicts with existing features.

    Examples:
        Create a feature with confidence score:
            >>> feature_class = ml.create_feature(
            ...     target_table="samples",
            ...     feature_name="expression_level",
            ...     terms=["expression_values"],
            ...     metadata=[ColumnDefinition(name="confidence", type=BuiltinTypes.float4)],
            ...     comment="Gene expression measurement"
            ... )
    """
    # Initialize empty collections if None provided
    terms = terms or []
    assets = assets or []
    metadata = metadata or []
    optional = optional or []

    def normalize_metadata(m: Key | Table | ColumnDefinition | str):
        """Helper function to normalize metadata references."""
        if isinstance(m, str):
            return self.model.name_to_table(m)
        elif isinstance(m, ColumnDefinition):
            return m.model_dump()
        else:
            return m

    # Validate asset and term tables
    if not all(map(self.model.is_asset, assets)):
        raise DerivaMLException("Invalid create_feature asset table.")
    if not all(map(self.model.is_vocabulary, terms)):
        raise DerivaMLException("Invalid create_feature asset table.")

    # Get references to required tables
    target_table = self.model.name_to_table(target_table)
    execution = self.model.schemas[self.ml_schema].tables["Execution"]
    feature_name_table = self.model.schemas[self.ml_schema].tables["Feature_Name"]

    # Add feature name to vocabulary
    feature_name_term = self.add_term("Feature_Name", feature_name, description=comment)
    atable_name = f"Execution_{target_table.name}_{feature_name_term.name}"
    # Create an association table implementing the feature
    atable = self.model.schemas[self.domain_schema].create_table(
        target_table.define_association(
            table_name=atable_name,
            associates=[execution, target_table, feature_name_table],
            metadata=[normalize_metadata(m) for m in chain(assets, terms, metadata)],
            comment=comment,
        )
    )
    # Configure optional columns and default feature name
    for c in optional:
        atable.columns[c].alter(nullok=True)
    atable.columns["Feature_Name"].alter(default=feature_name_term.name)

    # Return feature record class for creating instances
    return self.feature_record_class(target_table, feature_name)

create_table

create_table(
    table: TableDefinition,
) -> Table

Creates a new table in the catalog.

Creates a table using the provided TableDefinition object, which specifies the table structure including columns, keys, and foreign key relationships.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table` | `TableDefinition` | A TableDefinition object containing the complete specification of the table to create. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Table` | `Table` | The newly created ERMRest table object. |

Raises:

| Type | Description |
| --- | --- |
| `DerivaMLException` | If table creation fails or the definition is invalid. |

Example:

>>> table_def = TableDefinition(
...     name="experiments",
...     column_definitions=[
...         ColumnDefinition(name="name", type=BuiltinTypes.text),
...         ColumnDefinition(name="date", type=BuiltinTypes.date)
...     ]
... )
>>> new_table = ml.create_table(table_def)
Source code in src/deriva_ml/core/base.py
def create_table(self, table: TableDefinition) -> Table:
    """Creates a new table in the catalog.

    Creates a table using the provided TableDefinition object, which specifies the table structure including
    columns, keys, and foreign key relationships.

    Args:
        table: A TableDefinition object containing the complete specification of the table to create.

    Returns:
        Table: The newly created ERMRest table object.

    Raises:
        DerivaMLException: If table creation fails or the definition is invalid.

    Example:

        >>> table_def = TableDefinition(
        ...     name="experiments",
        ...     column_definitions=[
        ...         ColumnDefinition(name="name", type=BuiltinTypes.text),
        ...         ColumnDefinition(name="date", type=BuiltinTypes.date)
        ...     ]
        ... )
        >>> new_table = ml.create_table(table_def)
    """
    # Create table in domain schema using provided definition
    return self.model.schemas[self.domain_schema].create_table(table.model_dump())

create_vocabulary

create_vocabulary(
    vocab_name: str,
    comment: str = "",
    schema: str | None = None,
) -> Table

Creates a controlled vocabulary table.

A controlled vocabulary table maintains a list of standardized terms and their definitions. Each term can have synonyms and descriptions to ensure consistent terminology usage across the dataset.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `vocab_name` | `str` | Name for the new vocabulary table. Must be a valid SQL identifier. | required |
| `comment` | `str` | Description of the vocabulary's purpose and usage. Defaults to empty string. | `''` |
| `schema` | `str \| None` | Schema name to create the table in. If None, uses domain_schema. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Table` | `Table` | ERMRest table object representing the newly created vocabulary table. |

Raises:

| Type | Description |
| --- | --- |
| `DerivaMLException` | If vocab_name is invalid or already exists. |

Examples:

Create a vocabulary for tissue types:

>>> table = ml.create_vocabulary(
...     vocab_name="tissue_types",
...     comment="Standard tissue classifications",
...     schema="bio_schema"
... )
Source code in src/deriva_ml/core/base.py
def create_vocabulary(self, vocab_name: str, comment: str = "", schema: str | None = None) -> Table:
    """Creates a controlled vocabulary table.

    A controlled vocabulary table maintains a list of standardized terms and their definitions. Each term can have
    synonyms and descriptions to ensure consistent terminology usage across the dataset.

    Args:
        vocab_name: Name for the new vocabulary table. Must be a valid SQL identifier.
        comment: Description of the vocabulary's purpose and usage. Defaults to empty string.
        schema: Schema name to create the table in. If None, uses domain_schema.

    Returns:
        Table: ERMRest table object representing the newly created vocabulary table.

    Raises:
        DerivaMLException: If vocab_name is invalid or already exists.

    Examples:
        Create a vocabulary for tissue types:

            >>> table = ml.create_vocabulary(
            ...     vocab_name="tissue_types",
            ...     comment="Standard tissue classifications",
            ...     schema="bio_schema"
            ... )
    """
    # Use domain schema if none specified
    schema = schema or self.domain_schema

    # Create and return vocabulary table with RID-based URI pattern
    try:
        vocab_table = self.model.schemas[schema].create_table(
            Table.define_vocabulary(vocab_name, f"{self.project_name}:{{RID}}", comment=comment)
        )
    except ValueError:
        raise DerivaMLException(f"Table {vocab_name} already exist")
    return vocab_table

create_workflow

create_workflow(
    name: str,
    workflow_type: str,
    description: str = "",
) -> Workflow

Creates a new workflow definition.

Creates a Workflow object that represents a computational process or analysis pipeline. The workflow type must be a term from the controlled vocabulary. This method is typically used to define new analysis workflows before execution.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Name of the workflow. | required |
| `workflow_type` | `str` | Type of workflow (must exist in workflow_type vocabulary). | required |
| `description` | `str` | Description of what the workflow does. | `''` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Workflow` | `Workflow` | New workflow object ready for registration. |

Raises:

| Type | Description |
| --- | --- |
| `DerivaMLException` | If workflow_type is not in the vocabulary. |

Examples:

>>> workflow = ml.create_workflow(
...     name="RNA Analysis",
...     workflow_type="python_notebook",
...     description="RNA sequence analysis pipeline"
... )
>>> rid = ml.add_workflow(workflow)
Source code in src/deriva_ml/core/base.py
def create_workflow(self, name: str, workflow_type: str, description: str = "") -> Workflow:
    """Creates a new workflow definition.

    Creates a Workflow object that represents a computational process or analysis pipeline. The workflow type
    must be a term from the controlled vocabulary. This method is typically used to define new analysis
    workflows before execution.

    Args:
        name: Name of the workflow.
        workflow_type: Type of workflow (must exist in workflow_type vocabulary).
        description: Description of what the workflow does.

    Returns:
        Workflow: New workflow object ready for registration.

    Raises:
        DerivaMLException: If workflow_type is not in the vocabulary.

    Examples:
        >>> workflow = ml.create_workflow(
        ...     name="RNA Analysis",
        ...     workflow_type="python_notebook",
        ...     description="RNA sequence analysis pipeline"
        ... )
        >>> rid = ml.add_workflow(workflow)
    """
    # Validate workflow type exists in vocabulary
    self.lookup_term(MLVocab.workflow_type, workflow_type)

    # Create and return a new workflow object
    return Workflow(name=name, workflow_type=workflow_type, description=description)

dataset_history

dataset_history(
    dataset_rid: RID,
) -> list[DatasetHistory]

Retrieves the version history of a dataset.

Returns a chronological list of dataset versions, including their version numbers, creation times, and associated metadata.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset_rid` | `RID` | Resource Identifier of the dataset. | required |

Returns:

| Type | Description |
| --- | --- |
| `list[DatasetHistory]` | List of history entries, each containing: dataset_version (major.minor.patch), minid (Minimal Viable Identifier), snapshot (catalog snapshot time), dataset_rid (dataset Resource Identifier), version_rid (version Resource Identifier), description, and execution_rid (associated execution RID). |

Raises:

| Type | Description |
| --- | --- |
| `DerivaMLException` | If dataset_rid is not a valid dataset RID. |

Example:

>>> history = ml.dataset_history("1-abc123")
>>> for entry in history:
...     print(f"Version {entry.dataset_version}: {entry.description}")

Source code in src/deriva_ml/dataset/dataset.py
def dataset_history(self, dataset_rid: RID) -> list[DatasetHistory]:
    """Retrieves the version history of a dataset.

    Returns a chronological list of dataset versions, including their version numbers,
    creation times, and associated metadata.

    Args:
        dataset_rid: Resource Identifier of the dataset.

    Returns:
        list[DatasetHistory]: List of history entries, each containing:
            - dataset_version: Version number (major.minor.patch)
            - minid: Minimal Viable Identifier
            - snapshot: Catalog snapshot time
            - dataset_rid: Dataset Resource Identifier
            - version_rid: Version Resource Identifier
            - description: Version description
            - execution_rid: Associated execution RID

    Raises:
        DerivaMLException: If dataset_rid is not a valid dataset RID.

    Example:
        >>> history = ml.dataset_history("1-abc123")
        >>> for entry in history:
        ...     print(f"Version {entry.dataset_version}: {entry.description}")
    """

    if not self._is_dataset_rid(dataset_rid):
        raise DerivaMLException(f"RID is not for a data set: {dataset_rid}")
    version_path = self._model.catalog.getPathBuilder().schemas[self._ml_schema].tables["Dataset_Version"]
    return [
        DatasetHistory(
            dataset_version=DatasetVersion.parse(v["Version"]),
            minid=v["Minid"],
            snapshot=v["Snapshot"],
            dataset_rid=dataset_rid,
            version_rid=v["RID"],
            description=v["Description"],
            execution_rid=v["Execution"],
        )
        for v in version_path.filter(version_path.Dataset == dataset_rid).entities().fetch()
    ]

dataset_version

dataset_version(
    dataset_rid: RID,
) -> DatasetVersion

Retrieve the current version of the specified dataset_table.

Given a RID, return the most recent version of the dataset. It is important to remember that this version captures the state of the catalog at the time the version was created, not the current state of the catalog. This means that the values associated with an object in the catalog may differ from the values of that object in the dataset.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset_rid` | `RID` | The RID of the dataset to retrieve the version for. | required |

Returns:

| Type | Description |
| --- | --- |
| `DatasetVersion` | The semantic version of the dataset. |

Source code in src/deriva_ml/dataset/dataset.py
@validate_call
def dataset_version(self, dataset_rid: RID) -> DatasetVersion:
    """Retrieve the current version of the specified dataset_table.

    Given a RID, return the most recent version of the dataset. It is important to remember that this version
    captures the state of the catalog at the time the version was created, not the current state of the catalog.
    This means that the values associated with an object in the catalog may differ from the values of that
    object in the dataset.

    Args:
        dataset_rid: The RID of the dataset to retrieve the version for.

    Returns:
        The semantic version of the dataset as a DatasetVersion.
    """
    history = self.dataset_history(dataset_rid)
    if not history:
        return DatasetVersion(0, 1, 0)
    else:
        # Ensure we return a DatasetVersion, not a string
        versions = [h.dataset_version for h in history]
        return max(versions) if versions else DatasetVersion(0, 1, 0)
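
A minimal usage sketch (the RID is illustrative):

>>> version = ml.dataset_version("1-abc123")
>>> print(version)  # e.g., "1.2.0"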

delete_dataset

delete_dataset(
    dataset_rid: RID,
    recurse: bool = False,
) -> None

Delete a dataset_table from the catalog.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset_rid` | `RID` | RID of the dataset_table to delete. | required |
| `recurse` | `bool` | If True, delete the dataset_table along with any nested datasets. | `False` |

Source code in src/deriva_ml/dataset/dataset.py
@validate_call
def delete_dataset(self, dataset_rid: RID, recurse: bool = False) -> None:
    """Delete a dataset_table from the catalog.

    Args:
        dataset_rid: RID of the dataset_table to delete.
        recurse: If True, delete the dataset_table along with any nested datasets. (Default value = False)
    """
    # Verify that the RID identifies a dataset and that it is not nested inside another dataset.
    if not self._is_dataset_rid(dataset_rid):
        raise DerivaMLException("Dataset_rid is not a dataset.")

    if parents := self.list_dataset_parents(dataset_rid):
        raise DerivaMLException(f'Dataset_rid "{dataset_rid}" is in a nested dataset: {parents}.')

    pb = self._model.catalog.getPathBuilder()
    dataset_path = pb.schemas[self._dataset_table.schema.name].tables[self._dataset_table.name]

    rid_list = [dataset_rid] + (self.list_dataset_children(dataset_rid=dataset_rid) if recurse else [])
    dataset_path.update([{"RID": r, "Deleted": True} for r in rid_list])

delete_dataset_members

delete_dataset_members(
    dataset_rid: RID,
    members: list[RID],
    description: str = "",
    execution_rid: RID | None = None,
) -> None

Remove elements from an existing dataset.

Delete elements from an existing dataset. In addition to deleting members, the minor version number of the dataset is incremented and the description, if provided, is applied to the new version.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset_rid` | `RID` | RID of the dataset_table to remove members from. | required |
| `members` | `list[RID]` | List of member RIDs to remove from the dataset_table. | required |
| `description` | `str` | Markdown description of the updated dataset. | `''` |
| `execution_rid` | `RID \| None` | Optional RID of execution associated with this operation. | `None` |

Source code in src/deriva_ml/dataset/dataset.py
@validate_call
def delete_dataset_members(
    self,
    dataset_rid: RID,
    members: list[RID],
    description: str = "",
    execution_rid: RID | None = None,
) -> None:
    """Remove elements to an existing dataset_table.

    Delete elements from an existing dataset. In addition to deleting members, the minor version number of the
    dataset is incremented and the description, if provide is applied to that new version.

    Args:
        dataset_rid: RID of dataset_table to extend or None if a new dataset_table is to be created.
        members: List of member RIDs to add to the dataset_table.
        description: Markdown description of the updated dataset.
        execution_rid: Optional RID of execution associated with this operation.
    """

    members = set(members)
    description = description or "Deletes dataset members"

    # Now go through every rid to be added to the data set and sort them based on what association table entries
    # need to be made.
    dataset_elements = {}
    association_map = {
        a.other_fkeys.pop().pk_table.name: a.table.name for a in self._dataset_table.find_associations()
    }
    # Get a list of all the object types that can be linked to a dataset_table.
    for m in members:
        try:
            rid_info = self._model.catalog.resolve_rid(m)
        except KeyError:
            raise DerivaMLException(f"Invalid RID: {m}")
        if rid_info.table.name not in association_map:
            raise DerivaMLException(f"RID table: {rid_info.table.name} not part of dataset_table")
        dataset_elements.setdefault(rid_info.table.name, []).append(rid_info.rid)
    # Now make the entries into the association tables.
    pb = self._model.catalog.getPathBuilder()
    for table, elements in dataset_elements.items():
        schema_path = pb.schemas[self._ml_schema if table == "Dataset" else self._model.domain_schema]
        fk_column = "Nested_Dataset" if table == "Dataset" else table

        if len(elements):
            atable_path = schema_path.tables[association_map[table]]
            # Find out the name of the column in the association table.
            for e in elements:
                entity = atable_path.filter(
                    (atable_path.Dataset == dataset_rid) & (atable_path.columns[fk_column] == e),
                )
                entity.delete()
    self.increment_dataset_version(
        dataset_rid,
        VersionPart.minor,
        description=description,
        execution_rid=execution_rid,
    )
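
A minimal usage sketch (the RIDs are illustrative); note that the dataset's minor version is incremented as a side effect:

>>> ml.delete_dataset_members(
...     dataset_rid="1-abc123",
...     members=["1-def456", "1-ghi789"],
...     description="Removed samples that failed QC"
... )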

delete_feature

delete_feature(
    table: Table | str,
    feature_name: str,
) -> bool

Removes a feature definition and its data.

Deletes the feature and its implementation table from the catalog. This operation cannot be undone and will remove all feature values associated with this feature.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table` | `Table \| str` | The table containing the feature, either as name or Table object. | required |
| `feature_name` | `str` | Name of the feature to delete. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `bool` | `bool` | True if the feature was successfully deleted, False if it didn't exist. |

Raises:

| Type | Description |
| --- | --- |
| `DerivaMLException` | If deletion fails due to constraints or permissions. |

Example:

>>> success = ml.delete_feature("samples", "obsolete_feature")
>>> print("Deleted" if success else "Not found")

Source code in src/deriva_ml/core/base.py
def delete_feature(self, table: Table | str, feature_name: str) -> bool:
    """Removes a feature definition and its data.

    Deletes the feature and its implementation table from the catalog. This operation cannot be undone and
    will remove all feature values associated with this feature.

    Args:
        table: The table containing the feature, either as name or Table object.
        feature_name: Name of the feature to delete.

    Returns:
        bool: True if the feature was successfully deleted, False if it didn't exist.

    Raises:
        DerivaMLException: If deletion fails due to constraints or permissions.

    Example:
        >>> success = ml.delete_feature("samples", "obsolete_feature")
        >>> print("Deleted" if success else "Not found")
    """
    # Get table reference and find feature
    table = self.model.name_to_table(table)
    try:
        # Find and delete the feature's implementation table
        feature = next(f for f in self.model.find_features(table) if f.feature_name == feature_name)
        feature.feature_table.drop()
        return True
    except StopIteration:
        return False

download_dataset_bag

download_dataset_bag(
    dataset: DatasetSpec,
    execution_rid: RID | None = None,
) -> DatasetBag

Downloads a dataset to the local filesystem and creates a MINID if needed.

Downloads a dataset specified by DatasetSpec to the local filesystem. If the dataset doesn't have a MINID (Minimal Viable Identifier), one will be created. The dataset can optionally be associated with an execution record.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset` | `DatasetSpec` | Specification of the dataset to download, including version and materialization options. | required |
| `execution_rid` | `RID \| None` | Optional execution RID to associate the download with. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `DatasetBag` | `DatasetBag` | Object containing: path (local filesystem path to the downloaded dataset), rid (the dataset's Resource Identifier), and minid (the dataset's Minimal Viable Identifier). |

Examples:

Download with default options:

>>> spec = DatasetSpec(rid="1-abc123")
>>> bag = ml.download_dataset_bag(dataset=spec)
>>> print(f"Downloaded to {bag.path}")

Download with execution tracking:

>>> bag = ml.download_dataset_bag(
...     dataset=DatasetSpec(rid="1-abc123", materialize=True),
...     execution_rid="1-xyz789"
... )

Source code in src/deriva_ml/core/base.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def download_dataset_bag(
    self,
    dataset: DatasetSpec,
    execution_rid: RID | None = None,
) -> DatasetBag:
    """Downloads a dataset to the local filesystem and creates a MINID if needed.

    Downloads a dataset specified by DatasetSpec to the local filesystem. If the dataset doesn't have
    a MINID (Minimal Viable Identifier), one will be created. The dataset can optionally be associated
    with an execution record.

    Args:
        dataset: Specification of the dataset to download, including version and materialization options.
        execution_rid: Optional execution RID to associate the download with.

    Returns:
        DatasetBag: Object containing:
            - path: Local filesystem path to downloaded dataset
            - rid: Dataset's Resource Identifier
            - minid: Dataset's Minimal Viable Identifier

    Examples:
        Download with default options:
            >>> spec = DatasetSpec(rid="1-abc123")
            >>> bag = ml.download_dataset_bag(dataset=spec)
            >>> print(f"Downloaded to {bag.path}")

        Download with execution tracking:
            >>> bag = ml.download_dataset_bag(
            ...     dataset=DatasetSpec(rid="1-abc123", materialize=True),
            ...     execution_rid="1-xyz789"
            ... )
    """
    if not self._is_dataset_rid(dataset.rid):
        raise DerivaMLTableTypeError("Dataset", dataset.rid)
    return self._download_dataset_bag(
        dataset=dataset,
        execution_rid=execution_rid,
        snapshot_catalog=DerivaML(self.host_name, self._version_snapshot(dataset)),
    )

download_dir

download_dir(
    cached: bool = False,
) -> Path

Returns the appropriate download directory.

Provides the appropriate directory path for storing downloaded files, either in the cache or working directory.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `cached` | `bool` | If True, returns the cache directory path. If False, returns the working directory path. | `False` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Path` | `Path` | Directory path where downloaded files should be stored. |

Example:

>>> cache_dir = ml.download_dir(cached=True)
>>> work_dir = ml.download_dir(cached=False)

Source code in src/deriva_ml/core/base.py
def download_dir(self, cached: bool = False) -> Path:
    """Returns the appropriate download directory.

    Provides the appropriate directory path for storing downloaded files, either in the cache or working directory.

    Args:
        cached: If True, returns the cache directory path. If False, returns the working directory path.

    Returns:
        Path: Directory path where downloaded files should be stored.

    Example:
        >>> cache_dir = ml.download_dir(cached=True)
        >>> work_dir = ml.download_dir(cached=False)
    """
    # Return cache directory if cached=True, otherwise working directory
    return self.cache_dir if cached else self.working_dir

feature_record_class

feature_record_class(
    table: str | Table,
    feature_name: str,
) -> type[FeatureRecord]

Returns a pydantic model class for feature records.

Creates a typed interface for creating new instances of the specified feature. The returned class includes validation and type checking based on the feature's definition.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table` | `str \| Table` | The table containing the feature, either as name or Table object. | required |
| `feature_name` | `str` | Name of the feature to create a record class for. | required |

Returns:

| Type | Description |
| --- | --- |
| `type[FeatureRecord]` | A pydantic model class for creating validated feature records. |

Raises:

| Type | Description |
| --- | --- |
| `DerivaMLException` | If the feature doesn't exist or the table is invalid. |

Example:

>>> ExpressionFeature = ml.feature_record_class("samples", "expression_level")
>>> feature = ExpressionFeature(value="high", confidence=0.95)

Source code in src/deriva_ml/core/base.py
def feature_record_class(self, table: str | Table, feature_name: str) -> type[FeatureRecord]:
    """Returns a pydantic model class for feature records.

    Creates a typed interface for creating new instances of the specified feature. The returned class includes
    validation and type checking based on the feature's definition.

    Args:
        table: The table containing the feature, either as name or Table object.
        feature_name: Name of the feature to create a record class for.

    Returns:
        type[FeatureRecord]: A pydantic model class for creating validated feature records.

    Raises:
        DerivaMLException: If the feature doesn't exist or the table is invalid.

    Example:
        >>> ExpressionFeature = ml.feature_record_class("samples", "expression_level")
        >>> feature = ExpressionFeature(value="high", confidence=0.95)
    """
    # Look up a feature and return its record class
    return self.lookup_feature(table, feature_name).feature_record_class()

find_datasets

find_datasets(
    deleted: bool = False,
) -> Iterable[dict[str, Any]]

Returns a list of currently available datasets.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `deleted` | `bool` | If True, include datasets that have been deleted. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `Iterable[dict[str, Any]]` | List of currently available datasets. |

Source code in src/deriva_ml/dataset/dataset.py
def find_datasets(self, deleted: bool = False) -> Iterable[dict[str, Any]]:
    """Returns a list of currently available datasets.

    Args:
        deleted: If True, include datasets that have been deleted.

    Returns:
         list of currently available datasets.
    """
    # Get datapath to all the tables we will need: Dataset, DatasetType and the association table.
    pb = self._model.catalog.getPathBuilder()
    dataset_path = pb.schemas[self._dataset_table.schema.name].tables[self._dataset_table.name]
    associations = list(self._model.schemas[self._ml_schema].tables[MLVocab.dataset_type].find_associations())
    atable = associations[0].name if associations else None
    ml_path = pb.schemas[self._ml_schema]
    atable_path = ml_path.tables[atable]

    if deleted:
        filtered_path = dataset_path
    else:
        filtered_path = dataset_path.filter(
            (dataset_path.Deleted == False) | (dataset_path.Deleted == None)  # noqa: E711, E712
        )

    # Get a list of all the dataset_type values associated with this dataset_table.
    datasets = []
    for dataset in filtered_path.entities().fetch():
        ds_types = (
            atable_path.filter(atable_path.Dataset == dataset["RID"]).attributes(atable_path.Dataset_Type).fetch()
        )
        datasets.append(dataset | {MLVocab.dataset_type: [ds[MLVocab.dataset_type] for ds in ds_types]})
    return datasets
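
A minimal usage sketch:

>>> for ds in ml.find_datasets():
...     print(ds["RID"], ds["Description"])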

globus_login staticmethod

globus_login(host: str) -> None

Authenticates with Globus for accessing Deriva services.

Performs authentication using Globus Auth to access Deriva services. If already logged in, notifies the user. Uses non-interactive authentication flow without a browser or local server.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `host` | `str` | The hostname of the Deriva server to authenticate with (e.g., 'deriva.example.org'). | required |

Example:

>>> DerivaML.globus_login('deriva.example.org')
'Login Successful'

Source code in src/deriva_ml/core/base.py
@staticmethod
def globus_login(host: str) -> None:
    """Authenticates with Globus for accessing Deriva services.

    Performs authentication using Globus Auth to access Deriva services. If already logged in, notifies the user.
    Uses non-interactive authentication flow without a browser or local server.

    Args:
        host: The hostname of the Deriva server to authenticate with (e.g., 'deriva.example.org').

    Example:
        >>> DerivaML.globus_login('deriva.example.org')
        'Login Successful'
    """
    gnl = GlobusNativeLogin(host=host)
    if gnl.is_logged_in([host]):
        print("You are already logged in.")
    else:
        gnl.login(
            [host],
            no_local_server=True,
            no_browser=True,
            refresh_tokens=True,
            update_bdbag_keychain=True,
        )
        print("Login Successful")

increment_dataset_version

increment_dataset_version(
    dataset_rid: RID,
    component: VersionPart,
    description: str | None = "",
    execution_rid: RID | None = None,
) -> DatasetVersion

Increments a dataset's version number.

Creates a new version of the dataset by incrementing the specified version component (major, minor, or patch). The new version is recorded with an optional description and execution reference.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset_rid` | `RID` | Resource Identifier of the dataset to version. | required |
| `component` | `VersionPart` | Which version component to increment ('major', 'minor', or 'patch'). | required |
| `description` | `str \| None` | Optional description of the changes in this version. | `''` |
| `execution_rid` | `RID \| None` | Optional execution RID to associate with this version. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `DatasetVersion` | `DatasetVersion` | The new version number. |

Raises:

| Type | Description |
| --- | --- |
| `DerivaMLException` | If dataset_rid is invalid or version increment fails. |

Example:

>>> new_version = ml.increment_dataset_version(
...     dataset_rid="1-abc123",
...     component="minor",
...     description="Added new samples"
... )
>>> print(f"New version: {new_version}")  # e.g., "1.2.0"

Source code in src/deriva_ml/dataset/dataset.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def increment_dataset_version(
    self,
    dataset_rid: RID,
    component: VersionPart,
    description: str | None = "",
    execution_rid: RID | None = None,
) -> DatasetVersion:
    """Increments a dataset's version number.

    Creates a new version of the dataset by incrementing the specified version component
    (major, minor, or patch). The new version is recorded with an optional description
    and execution reference.

    Args:
        dataset_rid: Resource Identifier of the dataset to version.
        component: Which version component to increment ('major', 'minor', or 'patch').
        description: Optional description of the changes in this version.
        execution_rid: Optional execution RID to associate with this version.

    Returns:
        DatasetVersion: The new version number.

    Raises:
        DerivaMLException: If dataset_rid is invalid or version increment fails.

    Example:
        >>> new_version = ml.increment_dataset_version(
        ...     dataset_rid="1-abc123",
        ...     component="minor",
        ...     description="Added new samples"
        ... )
        >>> print(f"New version: {new_version}")  # e.g., "1.2.0"
    """

    # Find all the datasets that are reachable from this dataset and determine their new version numbers.
    related_datasets = list(self._build_dataset_graph(dataset_rid=dataset_rid))
    version_update_list = [
        DatasetSpec(
            rid=ds_rid,
            version=self.dataset_version(ds_rid).increment_version(component),
        )
        for ds_rid in related_datasets
    ]
    self._insert_dataset_versions(version_update_list, description=description, execution_rid=execution_rid)
    return next((d.version for d in version_update_list if d.rid == dataset_rid))

list_assets

list_assets(
    asset_table: Table | str,
) -> list[dict[str, Any]]

Lists contents of an asset table.

Returns a list of assets with their types for the specified asset table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `asset_table` | `Table \| str` | Table or name of the asset table to list assets for. | required |

Returns:

| Type | Description |
| --- | --- |
| `list[dict[str, Any]]` | List of asset records, each containing: RID (resource identifier), Type (asset type), and Metadata (asset metadata). |

Raises:

| Type | Description |
| --- | --- |
| `DerivaMLException` | If the table is not an asset table or doesn't exist. |

Example:

>>> assets = ml.list_assets("tissue_types")
>>> for asset in assets:
...     print(f"{asset['RID']}: {asset['Type']}")

Source code in src/deriva_ml/core/base.py
def list_assets(self, asset_table: Table | str) -> list[dict[str, Any]]:
    """Lists contents of an asset table.

    Returns a list of assets with their types for the specified asset table.

    Args:
        asset_table: Table or name of the asset table to list assets for.

    Returns:
        list[dict[str, Any]]: List of asset records, each containing:
            - RID: Resource identifier
            - Type: Asset type
            - Metadata: Asset metadata

    Raises:
        DerivaMLException: If the table is not an asset table or doesn't exist.

    Example:
        >>> assets = ml.list_assets("tissue_types")
        >>> for asset in assets:
        ...     print(f"{asset['RID']}: {asset['Type']}")
    """
    # Validate and get asset table reference
    asset_table = self.model.name_to_table(asset_table)
    if not self.model.is_asset(asset_table):
        raise DerivaMLException(f"Table {asset_table.name} is not an asset")

    # Get path builders for asset and type tables
    pb = self._model.catalog.getPathBuilder()
    asset_path = pb.schemas[asset_table.schema.name].tables[asset_table.name]
    (
        asset_type_table,
        _,
        _,
    ) = self._model.find_association(asset_table, MLVocab.asset_type)
    type_path = pb.schemas[asset_type_table.schema.name].tables[asset_type_table.name]

    # Build a list of assets with their types
    assets = []
    for asset in asset_path.entities().fetch():
        # Get associated asset types for each asset
        asset_types = (
            type_path.filter(type_path.columns[asset_table.name] == asset["RID"])
            .attributes(type_path.Asset_Type)
            .fetch()
        )
        # Combine asset data with its types
        assets.append(
            asset | {MLVocab.asset_type.value: [asset_type[MLVocab.asset_type.value] for asset_type in asset_types]}
        )
    return assets

list_dataset_children

list_dataset_children(
    dataset_rid: RID,
    recurse: bool = False,
) -> list[RID]

Given a dataset_table RID, return a list of RIDs for any nested datasets.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset_rid` | `RID` | A dataset_table RID. | required |
| `recurse` | `bool` | If True, recursively include the children of nested datasets. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `list[RID]` | List of nested dataset RIDs. |

Source code in src/deriva_ml/dataset/dataset.py
@validate_call
def list_dataset_children(self, dataset_rid: RID, recurse: bool = False) -> list[RID]:
    """Given a dataset_table RID, return a list of RIDs for any nested datasets.

    Args:
        dataset_rid: A dataset_table RID.
        recurse: If True, recursively include the children of nested datasets.

    Returns:
      list of nested dataset RIDs.

    """
    dataset_dataset_path = self._model.catalog.getPathBuilder().schemas[self._ml_schema].tables["Dataset_Dataset"]
    nested_datasets = list(dataset_dataset_path.entities().fetch())

    def find_children(rid: RID):
        children = [child["Nested_Dataset"] for child in nested_datasets if child["Dataset"] == rid]
        if recurse:
            for child in children.copy():
                children.extend(find_children(child))
        return children

    return find_children(dataset_rid)
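
A minimal usage sketch (the RID is illustrative):

>>> children = ml.list_dataset_children("1-abc123", recurse=True)
>>> print(children)  # RIDs of all nested datasets, at any depth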

list_dataset_element_types

list_dataset_element_types() -> Iterable[Table]

List the types of entities that can be added to a dataset_table.

Returns:

| Type | Description |
| --- | --- |
| `Iterable[Table]` | An iterable of Table objects that can be included as an element of a dataset_table. |

Source code in src/deriva_ml/dataset/dataset.py
def list_dataset_element_types(self) -> Iterable[Table]:
    """List the types of entities that can be added to a dataset_table.

    Returns:
        An iterable of Table objects that can be included as an element of a dataset_table.
    """

    def domain_table(table: Table) -> bool:
        return table.schema.name == self._model.domain_schema or table.name == self._dataset_table.name

    return [t for a in self._dataset_table.find_associations() if domain_table(t := a.other_fkeys.pop().pk_table)]
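
A minimal usage sketch:

>>> for table in ml.list_dataset_element_types():
...     print(table.name)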

list_dataset_members

list_dataset_members(
    dataset_rid: RID,
    recurse: bool = False,
    limit: int | None = None,
) -> dict[str, list[dict[str, Any]]]

Lists members of a dataset.

Returns a dictionary mapping member types to lists of member records. Can optionally recurse through nested datasets and limit the number of results.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset_rid` | `RID` | Resource Identifier of the dataset. | required |
| `recurse` | `bool` | Whether to include members of nested datasets. Defaults to False. | `False` |
| `limit` | `int \| None` | Maximum number of members to return per type. None for no limit. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, list[dict[str, Any]]]` | Dictionary mapping member types to lists of members. Each member is a dictionary containing the record's attributes. |

Raises:

| Type | Description |
| --- | --- |
| `DerivaMLException` | If dataset_rid is invalid. |

Example:

>>> members = ml.list_dataset_members("1-abc123", recurse=True)
>>> for type_name, records in members.items():
...     print(f"{type_name}: {len(records)} records")

Source code in src/deriva_ml/dataset/dataset.py
def list_dataset_members(
    self, dataset_rid: RID, recurse: bool = False, limit: int | None = None
) -> dict[str, list[dict[str, Any]]]:
    """Lists members of a dataset.

    Returns a dictionary mapping member types to lists of member records. Can optionally
    recurse through nested datasets and limit the number of results.

    Args:
        dataset_rid: Resource Identifier of the dataset.
        recurse: Whether to include members of nested datasets. Defaults to False.
        limit: Maximum number of members to return per type. None for no limit.

    Returns:
        dict[str, list[dict[str, Any]]]: Dictionary mapping member types to lists of members.
            Each member is a dictionary containing the record's attributes.

    Raises:
        DerivaMLException: If dataset_rid is invalid.

    Example:
        >>> members = ml.list_dataset_members("1-abc123", recurse=True)
        >>> for type_name, records in members.items():
        ...     print(f"{type_name}: {len(records)} records")
    """

    if not self._is_dataset_rid(dataset_rid):
        raise DerivaMLException(f"RID is not for a dataset_table: {dataset_rid}")

    # Look at each of the element types that might be in the dataset_table and get the list of rid for them from
    # the appropriate association table.
    members = defaultdict(list)
    pb = self._model.catalog.getPathBuilder()
    for assoc_table in self._dataset_table.find_associations():
        other_fkey = assoc_table.other_fkeys.pop()
        target_table = other_fkey.pk_table
        member_table = assoc_table.table

        # Look at domain tables and nested datasets.
        if target_table.schema.name != self._model.domain_schema and not (
            target_table == self._dataset_table or target_table.name == "File"
        ):
            continue
        member_column = (
            "Nested_Dataset" if target_table == self._dataset_table else other_fkey.foreign_key_columns[0].name
        )

        target_path = pb.schemas[target_table.schema.name].tables[target_table.name]
        member_path = pb.schemas[member_table.schema.name].tables[member_table.name]

        path = member_path.filter(member_path.Dataset == dataset_rid).link(
            target_path,
            on=(member_path.columns[member_column] == target_path.columns["RID"]),
        )
        target_entities = list(path.entities().fetch(limit=limit) if limit else path.entities().fetch())
        members[target_table.name].extend(target_entities)
        if recurse and target_table == self._dataset_table:
            # Get the members for all the nested datasets and add to the member list.
            nested_datasets = [d["RID"] for d in target_entities]
            for ds in nested_datasets:
                for k, v in self.list_dataset_members(ds, recurse=recurse).items():
                    members[k].extend(v)
    return dict(members)

list_dataset_parents

list_dataset_parents(
    dataset_rid: RID,
) -> list[str]

Given a dataset_table RID, return a list of RIDs of the parent datasets if this dataset is included in a nested dataset.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset_rid` | `RID` | RID of the dataset whose parents are to be found. | required |

Returns:

| Type | Description |
| --- | --- |
| `list[str]` | List of RIDs of the parent datasets. |

Source code in src/deriva_ml/dataset/dataset.py
@validate_call
def list_dataset_parents(self, dataset_rid: RID) -> list[str]:
    """Given a dataset_table RID, return a list of RIDs of the parent datasets if this is included in a
    nested dataset.

    Args:
        dataset_rid: return: RID of the parent dataset_table.

    Returns:
        RID of the parent dataset_table.
    """
    if not self._is_dataset_rid(dataset_rid):
        raise DerivaMLException(f"RID: {dataset_rid} does not belong to dataset_table {self._dataset_table.name}")
    # Get association table for nested datasets
    pb = self._model.catalog.getPathBuilder()
    atable_path = pb.schemas[self._ml_schema].Dataset_Dataset
    return [p["Dataset"] for p in atable_path.filter(atable_path.Nested_Dataset == dataset_rid).entities().fetch()]

list_feature_values

list_feature_values(
    table: Table | str,
    feature_name: str,
) -> datapath._ResultSet

Retrieves all values for a feature.

Returns all instances of the specified feature that have been created, including their associated metadata and references.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table` | `Table \| str` | The table containing the feature, either as name or Table object. | required |
| `feature_name` | `str` | Name of the feature to retrieve values for. | required |

Returns:

| Type | Description |
| --- | --- |
| `datapath._ResultSet` | A result set containing all feature values and their metadata. |

Raises:

| Type | Description |
| --- | --- |
| `DerivaMLException` | If the feature doesn't exist or cannot be accessed. |

Example:

>>> values = ml.list_feature_values("samples", "expression_level")
>>> for value in values:
...     print(f"Sample {value['RID']}: {value['value']}")

Source code in src/deriva_ml/core/base.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def list_feature_values(self, table: Table | str, feature_name: str) -> datapath._ResultSet:
    """Retrieves all values for a feature.

    Returns all instances of the specified feature that have been created, including their associated
    metadata and references.

    Args:
        table: The table containing the feature, either as name or Table object.
        feature_name: Name of the feature to retrieve values for.

    Returns:
        datapath._ResultSet: A result set containing all feature values and their metadata.

    Raises:
        DerivaMLException: If the feature doesn't exist or cannot be accessed.

    Example:
        >>> values = ml.list_feature_values("samples", "expression_level")
        >>> for value in values:
        ...     print(f"Sample {value['RID']}: {value['value']}")
    """
    # Get table and feature references
    table = self.model.name_to_table(table)
    feature = self.lookup_feature(table, feature_name)

    # Build and execute query for feature values
    pb = self.catalog.getPathBuilder()
    return pb.schemas[feature.feature_table.schema.name].tables[feature.feature_table.name].entities().fetch()

list_files

list_files(
    file_types: list[str] | None = None,
) -> list[dict[str, Any]]

Lists files in the catalog with their metadata.

Returns a list of files with their metadata including URL, MD5 hash, length, description, and associated file types. Files can be optionally filtered by type.

Parameters:

Name Type Description Default
file_types list[str] | None

Filter results to only include these file types.

None

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: List of file records, each containing:

- RID: Resource identifier
- URL: File location
- MD5: File hash
- Length: File size
- Description: File description
- File_Types: List of associated file types

Examples:

List all files:

>>> files = ml.list_files()
>>> for f in files:
...     print(f"{f['RID']}: {f['URL']}")

Filter by file type:

>>> image_files = ml.list_files(["image", "png"])

Source code in src/deriva_ml/core/base.py
def list_files(self, file_types: list[str] | None = None) -> list[dict[str, Any]]:
    """Lists files in the catalog with their metadata.

    Returns a list of files with their metadata including URL, MD5 hash, length, description,
    and associated file types. Files can be optionally filtered by type.

    Args:
        file_types: Filter results to only include these file types.

    Returns:
        list[dict[str, Any]]: List of file records, each containing:
            - RID: Resource identifier
            - URL: File location
            - MD5: File hash
            - Length: File size
            - Description: File description
            - File_Types: List of associated file types

    Examples:
        List all files:
            >>> files = ml.list_files()
            >>> for f in files:
            ...     print(f"{f['RID']}: {f['URL']}")

        Filter by file type:
            >>> image_files = ml.list_files(["image", "png"])
    """

    asset_type_atable, file_fk, asset_type_fk = self.model.find_association("File", "Asset_Type")
    ml_path = self.pathBuilder.schemas[self._ml_schema]
    file = ml_path.File
    asset_type = ml_path.tables[asset_type_atable.name]

    path = file.path
    path = path.link(asset_type.alias("AT"), on=file.RID == asset_type.columns[file_fk], join_type="left")
    if file_types:
        path = path.filter(asset_type.columns[asset_type_fk] == datapath.Any(*file_types))
    path = path.attributes(
        path.File.RID,
        path.File.URL,
        path.File.MD5,
        path.File.Length,
        path.File.Description,
        path.AT.columns[asset_type_fk],
    )

    file_map = {}
    for f in path.fetch():
        entry = file_map.setdefault(f["RID"], {**f, "File_Types": []})
        if ft := f.get("Asset_Type"):  # assign-and-test in one go
            entry["File_Types"].append(ft)

    # Now remove the raw Asset_Type key and return the result
    return [(f, f.pop("Asset_Type"))[0] for f in file_map.values()]

list_vocabulary_terms

list_vocabulary_terms(
    table: str | Table,
) -> list[VocabularyTerm]

Lists all terms in a vocabulary table.

Retrieves all terms, their descriptions, and synonyms from a controlled vocabulary table.

Parameters:

Name Type Description Default
table str | Table

Vocabulary table to list terms from (name or Table object).

required

Returns:

Type Description
list[VocabularyTerm]

list[VocabularyTerm]: List of vocabulary terms with their metadata.

Raises:

Type Description
DerivaMLException

If table doesn't exist or is not a vocabulary table.

Examples:

>>> terms = ml.list_vocabulary_terms("tissue_types")
>>> for term in terms:
...     print(f"{term.name}: {term.description}")
...     if term.synonyms:
...         print(f"  Synonyms: {', '.join(term.synonyms)}")
Source code in src/deriva_ml/core/base.py
def list_vocabulary_terms(self, table: str | Table) -> list[VocabularyTerm]:
    """Lists all terms in a vocabulary table.

    Retrieves all terms, their descriptions, and synonyms from a controlled vocabulary table.

    Args:
        table: Vocabulary table to list terms from (name or Table object).

    Returns:
        list[VocabularyTerm]: List of vocabulary terms with their metadata.

    Raises:
        DerivaMLException: If table doesn't exist or is not a vocabulary table.

    Examples:
        >>> terms = ml.list_vocabulary_terms("tissue_types")
        >>> for term in terms:
        ...     print(f"{term.name}: {term.description}")
        ...     if term.synonyms:
        ...         print(f"  Synonyms: {', '.join(term.synonyms)}")
    """
    # Get path builder and table reference
    pb = self.catalog.getPathBuilder()
    table = self.model.name_to_table(table.value if isinstance(table, MLVocab) else table)

    # Validate table is a vocabulary table
    if not (self.model.is_vocabulary(table)):
        raise DerivaMLException(f"The table {table} is not a controlled vocabulary")

    # Fetch and convert all terms to VocabularyTerm objects
    return [VocabularyTerm(**v) for v in pb.schemas[table.schema.name].tables[table.name].entities().fetch()]

list_workflows

list_workflows() -> list[Workflow]

Lists all workflows in the catalog.

Retrieves all workflow definitions, including their names, URLs, types, versions, and descriptions.

Returns:

Type Description
list[Workflow]

list[Workflow]: List of workflow objects, each containing:

- name: Workflow name
- url: Source code URL
- workflow_type: Type of workflow
- version: Version identifier
- description: Workflow description
- rid: Resource identifier
- checksum: Source code checksum

Examples:

>>> workflows = ml.list_workflows()
>>> for w in workflows:
...     print(f"{w.name} (v{w.version}): {w.description}")
...     print(f"  Source: {w.url}")
Source code in src/deriva_ml/core/base.py
def list_workflows(self) -> list[Workflow]:
    """Lists all workflows in the catalog.

    Retrieves all workflow definitions, including their names, URLs, types, versions,
    and descriptions.

    Returns:
        list[Workflow]: List of workflow objects, each containing:
            - name: Workflow name
            - url: Source code URL
            - workflow_type: Type of workflow
            - version: Version identifier
            - description: Workflow description
            - rid: Resource identifier
            - checksum: Source code checksum

    Examples:
        >>> workflows = ml.list_workflows()
        >>> for w in workflows:
        ...     print(f"{w.name} (v{w.version}): {w.description}")
        ...     print(f"  Source: {w.url}")
    """
    # Get a workflow table path and fetch all workflows
    workflow_path = self.pathBuilder.schemas[self.ml_schema].Workflow
    return [
        Workflow(
            name=w["Name"],
            url=w["URL"],
            workflow_type=w["Workflow_Type"],
            version=w["Version"],
            description=w["Description"],
            rid=w["RID"],
            checksum=w["Checksum"],
        )
        for w in workflow_path.entities().fetch()
    ]

lookup_feature

lookup_feature(
    table: str | Table,
    feature_name: str,
) -> Feature

Retrieves a Feature object.

Looks up and returns a Feature object that provides an interface to work with an existing feature definition in the catalog.

Parameters:

Name Type Description Default
table str | Table

The table containing the feature, either as name or Table object.

required
feature_name str

Name of the feature to look up.

required

Returns:

Name Type Description
Feature Feature

An object representing the feature and its implementation.

Raises:

Type Description
DerivaMLException

If the feature doesn't exist in the specified table.

Example

>>> feature = ml.lookup_feature("samples", "expression_level")
>>> print(feature.feature_name)
'expression_level'

Source code in src/deriva_ml/core/base.py
def lookup_feature(self, table: str | Table, feature_name: str) -> Feature:
    """Retrieves a Feature object.

    Looks up and returns a Feature object that provides an interface to work with an existing feature
    definition in the catalog.

    Args:
        table: The table containing the feature, either as name or Table object.
        feature_name: Name of the feature to look up.

    Returns:
        Feature: An object representing the feature and its implementation.

    Raises:
        DerivaMLException: If the feature doesn't exist in the specified table.

    Example:
        >>> feature = ml.lookup_feature("samples", "expression_level")
        >>> print(feature.feature_name)
        'expression_level'
    """
    return self.model.lookup_feature(table, feature_name)

lookup_term

lookup_term(
    table: str | Table, term_name: str
) -> VocabularyTerm

Finds a term in a vocabulary table.

Searches for a term in the specified vocabulary table, matching either the primary name or any of its synonyms.

Parameters:

Name Type Description Default
table str | Table

Vocabulary table to search in (name or Table object).

required
term_name str

Name or synonym of the term to find.

required

Returns:

Name Type Description
VocabularyTerm VocabularyTerm

The matching vocabulary term.

Raises:

Type Description
DerivaMLVocabularyException

If the table is not a vocabulary table, or term is not found.

Examples:

Look up by primary name:

>>> term = ml.lookup_term("tissue_types", "epithelial")
>>> print(term.description)

Look up by synonym:

>>> term = ml.lookup_term("tissue_types", "epithelium")

Source code in src/deriva_ml/core/base.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def lookup_term(self, table: str | Table, term_name: str) -> VocabularyTerm:
    """Finds a term in a vocabulary table.

    Searches for a term in the specified vocabulary table, matching either the primary name
    or any of its synonyms.

    Args:
        table: Vocabulary table to search in (name or Table object).
        term_name: Name or synonym of the term to find.

    Returns:
        VocabularyTerm: The matching vocabulary term.

    Raises:
        DerivaMLVocabularyException: If the table is not a vocabulary table, or term is not found.

    Examples:
        Look up by primary name:
            >>> term = ml.lookup_term("tissue_types", "epithelial")
            >>> print(term.description)

        Look up by synonym:
            >>> term = ml.lookup_term("tissue_types", "epithelium")
    """
    # Get and validate vocabulary table reference
    vocab_table = self.model.name_to_table(table)
    if not self.model.is_vocabulary(vocab_table):
        raise DerivaMLException(f"The table {table} is not a controlled vocabulary")

    # Get schema and table paths
    schema_name, table_name = vocab_table.schema.name, vocab_table.name
    schema_path = self.catalog.getPathBuilder().schemas[schema_name]

    # Search for term by name or synonym
    for term in schema_path.tables[table_name].entities().fetch():
        if term_name == term["Name"] or (term["Synonyms"] and term_name in term["Synonyms"]):
            return VocabularyTerm.model_validate(term)

    # Term not found
    raise DerivaMLInvalidTerm(table_name, term_name)

lookup_workflow

lookup_workflow(
    url_or_checksum: str,
) -> RID | None

Finds a workflow by URL or checksum.

Parameters:

Name Type Description Default
url_or_checksum str

URL or checksum of the workflow.

required

Returns:

Type Description
RID | None

Resource Identifier of the workflow if found, None otherwise.

Example

>>> rid = ml.lookup_workflow("https://github.com/org/repo/workflow.py")
>>> if rid:
...     print(f"Found workflow: {rid}")

Source code in src/deriva_ml/core/base.py
def lookup_workflow(self, url_or_checksum: str) -> RID | None:
    """Finds a workflow by URL.

    Args:
        url_or_checksum: URL or checksum of the workflow.

    Returns:
        RID: Resource Identifier of the workflow if found, None otherwise.

    Example:
        >>> rid = ml.lookup_workflow("https://github.com/org/repo/workflow.py")
        >>> if rid:
        ...     print(f"Found workflow: {rid}")
    """
    # Get a workflow table path
    workflow_path = self.pathBuilder.schemas[self.ml_schema].Workflow
    try:
        # Search for workflow by URL or checksum
        url_column = workflow_path.URL
        checksum_column = workflow_path.Checksum
        return list(
            workflow_path.path.filter(
                (url_column == url_or_checksum) | (checksum_column == url_or_checksum)
            ).entities()
        )[0]["RID"]
    except IndexError:
        return None

resolve_rid

resolve_rid(
    rid: RID,
) -> ResolveRidResult

Resolves RID to catalog location.

Looks up a RID and returns information about where it exists in the catalog, including schema, table, and column metadata.

Parameters:

Name Type Description Default
rid RID

Resource Identifier to resolve.

required

Returns:

Name Type Description
ResolveRidResult ResolveRidResult

Named tuple containing:

- schema: Schema name
- table: Table name
- columns: Column definitions
- datapath: Path builder for accessing the entity

Raises:

Type Description
DerivaMLException

If RID doesn't exist in catalog.

Examples:

>>> result = ml.resolve_rid("1-abc123")
>>> print(f"Found in {result.schema}.{result.table}")
>>> data = result.datapath.entities().fetch()
Source code in src/deriva_ml/core/base.py
def resolve_rid(self, rid: RID) -> ResolveRidResult:
    """Resolves RID to catalog location.

    Looks up a RID and returns information about where it exists in the catalog, including schema,
    table, and column metadata.

    Args:
        rid: Resource Identifier to resolve.

    Returns:
        ResolveRidResult: Named tuple containing:
            - schema: Schema name
            - table: Table name
            - columns: Column definitions
            - datapath: Path builder for accessing the entity

    Raises:
        DerivaMLException: If RID doesn't exist in catalog.

    Examples:
        >>> result = ml.resolve_rid("1-abc123")
        >>> print(f"Found in {result.schema}.{result.table}")
        >>> data = result.datapath.entities().fetch()
    """
    try:
        # Attempt to resolve RID using catalog model
        return self.catalog.resolve_rid(rid, self.model.model)
    except KeyError as _e:
        raise DerivaMLException(f"Invalid RID {rid}")

restore_execution

restore_execution(
    execution_rid: RID | None = None,
) -> Execution

Restores a previous execution.

Given an execution RID, retrieves the execution configuration and restores the local compute environment. This routine has a number of side effects.

  1. The datasets specified in the configuration are downloaded and placed in the cache-dir. If a version is not specified in the configuration, then a new minor version number is created for the dataset and downloaded.

  2. If any execution assets are provided in the configuration, they are downloaded and placed in the working directory.

Parameters:

Name Type Description Default
execution_rid RID | None

Resource Identifier (RID) of the execution to restore.

None

Returns:

Name Type Description
Execution Execution

An execution object representing the restored execution environment.

Raises:

Type Description
DerivaMLException

If execution_rid is not valid or execution cannot be restored.

Example

>>> execution = ml.restore_execution("1-abc123")

Source code in src/deriva_ml/core/base.py
def restore_execution(self, execution_rid: RID | None = None) -> Execution:
    """Restores a previous execution.

    Given an execution RID, retrieves the execution configuration and restores the local compute environment.
    This routine has a number of side effects.

    1. The datasets specified in the configuration are downloaded and placed in the cache-dir. If a version is
    not specified in the configuration, then a new minor version number is created for the dataset and downloaded.

    2. If any execution assets are provided in the configuration, they are downloaded and placed
    in the working directory.

    Args:
        execution_rid: Resource Identifier (RID) of the execution to restore.

    Returns:
        Execution: An execution object representing the restored execution environment.

    Raises:
        DerivaMLException: If execution_rid is not valid or execution cannot be restored.

    Example:
        >>> execution = ml.restore_execution("1-abc123")
    """
    # Import here to avoid circular dependency
    from deriva_ml.execution.execution import Execution

    # If no RID provided, try to find single execution in working directory
    if not execution_rid:
        e_rids = execution_rids(self.working_dir)
        if len(e_rids) != 1:
            raise DerivaMLException(f"Expected exactly one execution RID in the working directory, found: {e_rids}.")
        execution_rid = e_rids[0]

    # Try to load configuration from a file
    cfile = asset_file_path(
        prefix=self.working_dir,
        exec_rid=execution_rid,
        file_name="configuration.json",
        asset_table=self.model.name_to_table("Execution_Metadata"),
        metadata={},
    )

    # Load configuration from a file or create from an execution record
    if cfile.exists():
        configuration = ExecutionConfiguration.load_configuration(cfile)
    else:
        execution = self.retrieve_rid(execution_rid)
        configuration = ExecutionConfiguration(
            workflow=execution["Workflow"],
            description=execution["Description"],
        )

    # Create and return an execution instance
    return Execution(configuration, self, reload=execution_rid)

retrieve_rid

retrieve_rid(
    rid: RID,
) -> dict[str, Any]

Retrieves complete record for RID.

Fetches all column values for the entity identified by the RID.

Parameters:

Name Type Description Default
rid RID

Resource Identifier of the record to retrieve.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Dictionary containing all column values for the entity.

Raises:

Type Description
DerivaMLException

If the RID doesn't exist in the catalog.

Example

>>> record = ml.retrieve_rid("1-abc123")
>>> print(f"Name: {record['name']}, Created: {record['creation_date']}")

Source code in src/deriva_ml/core/base.py
def retrieve_rid(self, rid: RID) -> dict[str, Any]:
    """Retrieves complete record for RID.

    Fetches all column values for the entity identified by the RID.

    Args:
        rid: Resource Identifier of the record to retrieve.

    Returns:
        dict[str, Any]: Dictionary containing all column values for the entity.

    Raises:
        DerivaMLException: If the RID doesn't exist in the catalog.

    Example:
        >>> record = ml.retrieve_rid("1-abc123")
        >>> print(f"Name: {record['name']}, Created: {record['creation_date']}")
    """
    # Resolve RID and fetch the first (only) matching record
    return self.resolve_rid(rid).datapath.entities().fetch()[0]

table_path

table_path(table: str | Table) -> Path

Returns a local filesystem path for table CSV files.

Generates a standardized path where CSV files should be placed when preparing to upload data to a table. The path follows the project's directory structure conventions.

Parameters:

Name Type Description Default
table str | Table

Name of the table or Table object to get the path for.

required

Returns:

Name Type Description
Path Path

Filesystem path where the CSV file should be placed.

Example

>>> path = ml.table_path("experiment_results")
>>> df.to_csv(path)  # Save data for upload

Source code in src/deriva_ml/core/base.py
def table_path(self, table: str | Table) -> Path:
    """Returns a local filesystem path for table CSV files.

    Generates a standardized path where CSV files should be placed when preparing to upload data to a table.
    The path follows the project's directory structure conventions.

    Args:
        table: Name of the table or Table object to get the path for.

    Returns:
        Path: Filesystem path where the CSV file should be placed.

    Example:
        >>> path = ml.table_path("experiment_results")
        >>> df.to_csv(path) # Save data for upload
    """
    return table_path(
        self.working_dir,
        schema=self.domain_schema,
        table=self.model.name_to_table(table).name,
    )

user_list

user_list() -> List[Dict[str, str]]

Returns catalog user list.

Retrieves basic information about all users who have access to the catalog, including their identifiers and full names.

Returns:

Type Description
List[Dict[str, str]]

List[Dict[str, str]]: List of user information dictionaries, each containing:

- 'ID': User identifier
- 'Full_Name': User's full name

Examples:

>>> users = ml.user_list()
>>> for user in users:
...     print(f"{user['Full_Name']} ({user['ID']})")
Source code in src/deriva_ml/core/base.py
def user_list(self) -> List[Dict[str, str]]:
    """Returns catalog user list.

    Retrieves basic information about all users who have access to the catalog, including their
    identifiers and full names.

    Returns:
        List[Dict[str, str]]: List of user information dictionaries, each containing:
            - 'ID': User identifier
            - 'Full_Name': User's full name

    Examples:

        >>> users = ml.user_list()
        >>> for user in users:
        ...     print(f"{user['Full_Name']} ({user['ID']})")
    """
    # Get the user table path and fetch basic user info
    user_path = self.pathBuilder.public.ERMrest_Client.path
    return [{"ID": u["ID"], "Full_Name": u["Full_Name"]} for u in user_path.entities().fetch()]

DerivaMLException

Bases: Exception

Exception class specific to DerivaML module.

Parameters:

Name Type Description Default
msg str

Optional message for the exception.

''
Source code in src/deriva_ml/core/exceptions.py
class DerivaMLException(Exception):
    """Exception class specific to DerivaML module.

    Args:
        msg (str): Optional message for the exception.
    """

    def __init__(self, msg=""):
        super().__init__(msg)
        self._msg = msg

DerivaMLInvalidTerm

Bases: DerivaMLException

Exception class for invalid terms in DerivaML controlled vocabulary.

Source code in src/deriva_ml/core/exceptions.py
class DerivaMLInvalidTerm(DerivaMLException):
    """Exception class for invalid terms in DerivaML controlled vocabulary."""
    def __init__(self, vocabulary, term: str, msg: str = "Term doesn't exist"):
        """Exception indicating undefined term type"""
        super().__init__(f"Invalid term {term} in vocabulary {vocabulary}: {msg}.")

__init__

__init__(
    vocabulary,
    term: str,
    msg: str = "Term doesn't exist",
)

Exception indicating an undefined vocabulary term

Source code in src/deriva_ml/core/exceptions.py
def __init__(self, vocabulary, term: str, msg: str = "Term doesn't exist"):
    """Exception indicating undefined term type"""
    super().__init__(f"Invalid term {term} in vocabulary {vocabulary}: {msg}.")

DerivaMLTableTypeError

Bases: DerivaMLException

Raised when a table is not of the expected type.

Source code in src/deriva_ml/core/exceptions.py
class DerivaMLTableTypeError(DerivaMLException):
    """RID for table is not of correct type."""
    def __init__(self, table_type, table: str):
        """Exception indicating undefined term type"""
        super().__init__(f"Table  {table} is not of type {table_type}.")

__init__

__init__(table_type, table: str)

Exception indicating a table type mismatch

Source code in src/deriva_ml/core/exceptions.py
def __init__(self, table_type, table: str):
    """Exception indicating undefined term type"""
    super().__init__(f"Table  {table} is not of type {table_type}.")

ExecAssetType

Bases: BaseStrEnum

Execution asset type identifiers.

Defines the types of assets that can be produced during an execution.

Attributes:

Name Type Description
input_file str

Input file used by the execution.

output_file str

Output file produced by the execution.

notebook_output str

Jupyter notebook output from the execution.

model_file str

Model file produced by the execution.

Source code in src/deriva_ml/core/enums.py
class ExecAssetType(BaseStrEnum):
    """Execution asset type identifiers.

    Defines the types of assets that can be produced during an execution.

    Attributes:
        input_file (str): Input file used by the execution.
        output_file (str): Output file produced by the execution.
        notebook_output (str): Jupyter notebook output from the execution.
        model_file (str): Model file produced by the execution.
    """

    input_file = "Input_File"
    output_file = "Output_File"
    notebook_output = "Notebook_Output"
    model_file = "Model_File"

ExecMetadataType

Bases: BaseStrEnum

Execution metadata type identifiers.

Defines the types of metadata that can be associated with an execution.

Attributes:

Name Type Description
execution_config str

Execution configuration data.

runtime_env str

Runtime environment information.

Source code in src/deriva_ml/core/enums.py
class ExecMetadataType(BaseStrEnum):
    """Execution metadata type identifiers.

    Defines the types of metadata that can be associated with an execution.

    Attributes:
        execution_config (str): Execution configuration data.
        runtime_env (str): Runtime environment information.
    """

    execution_config = "Execution_Config"
    runtime_env = "Runtime_Env"

FileSpec

Bases: BaseModel

An entry in the File table.

Attributes:

Name Type Description
url str

The URL of the file.

description str | None

The description of the file.

md5 str

The MD5 hash of the file.

length int

The length of the file in bytes.

file_types conlist(str) | None

A list of file types. Each file type should be a defined term in the Asset_Type controlled vocabulary (MLVocab.asset_type).

Source code in src/deriva_ml/core/filespec.py
class FileSpec(BaseModel):
    """An entry into the File table

    Attributes:
        url: The File url to the url.
        description: The description of the file.
        md5: The MD5 hash of the file.
        length: The length of the file in bytes.
        file_types: A list of file types.  Each files_type should be a defined term in MLVocab.file_type vocabulary.
    """

    url: str = Field(alias="URL", validation_alias="url")
    md5: str = Field(alias="MD5", validation_alias="md5")
    length: int = Field(alias="Length", validation_alias="length")
    description: str | None = Field(default="", alias="Description", validation_alias="description")
    file_types: conlist(str) | None = []

    @field_validator("url")
    @classmethod
    def validate_file_url(cls, url: str) -> str:
        """Examine the provided URL. If it's a local path, convert it into a tag URL.

        Args:
            url: The URL to validate and potentially convert

        Returns:
            The validated/converted URL

        Raises:
            ValidationError: If the URL is not a file URL
        """
        url_parts = urlparse(url)
        if url_parts.scheme == "tag":
            # Already a tag URL, so just return it.
            return url
        elif (not url_parts.scheme) or url_parts.scheme == "file":
            # There is no scheme part of the URL, or it is a file URL, so it is a local file path.
            # Convert to a tag URL.
            return f"tag://{gethostname()},{date.today()}:file://{url_parts.path}"
        else:
            raise ValueError("url is not a file URL")

    @classmethod
    def create_filespecs(
        cls, path: Path | str, description: str, file_types: list[str] | Callable[[Path], list[str]] | None = None
    ) -> Generator[FileSpec, None, None]:
        """Given a file or directory, generate the sequence of corresponding FileSpecs suitable to create a File table.

        Args:
            path: Path to the file or directory.
            description: The description of the file(s)
            file_types: A list of file types or a function that takes a file path and returns a list of file types.

        Returns:
            An iterable of FileSpecs for each file in the directory.
        """

        path = Path(path)
        file_types = file_types or []
        file_types_fn = file_types if callable(file_types) else lambda _x: file_types

        def create_spec(file_path: Path) -> FileSpec:
            hashes = hash_utils.compute_file_hashes(file_path, hashes=frozenset(["md5", "sha256"]))
            md5 = hashes["md5"][0]
            type_list = file_types_fn(file_path)
            return FileSpec(
                length=file_path.stat().st_size,
                md5=md5,
                description=description,
                url=file_path.as_posix(),
                file_types=type_list if "File" in type_list else ["File"] + type_list,
            )

        files = [path] if path.is_file() else [f for f in Path(path).rglob("*") if f.is_file()]
        return (create_spec(file) for file in files)

    @staticmethod
    def read_filespec(path: Path | str) -> Generator[FileSpec, None, None]:
        """Get FileSpecs from a JSON lines file.

        Args:
            path: Path to the .jsonl file (string or Path).

        Yields:
            A FileSpec object.
        """
        path = Path(path)
        with path.open("r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                yield FileSpec(**json.loads(line))

create_filespecs classmethod

create_filespecs(
    path: Path | str,
    description: str,
    file_types: list[str]
    | Callable[[Path], list[str]]
    | None = None,
) -> Generator[FileSpec, None, None]

Given a file or directory, generate the sequence of corresponding FileSpecs suitable to create a File table.

Parameters:

Name Type Description Default
path Path | str

Path to the file or directory.

required
description str

The description of the file(s)

required
file_types list[str] | Callable[[Path], list[str]] | None

A list of file types or a function that takes a file path and returns a list of file types.

None

Returns:

Type Description
Generator[FileSpec, None, None]

An iterable of FileSpecs for each file in the directory.

Source code in src/deriva_ml/core/filespec.py
@classmethod
def create_filespecs(
    cls, path: Path | str, description: str, file_types: list[str] | Callable[[Path], list[str]] | None = None
) -> Generator[FileSpec, None, None]:
    """Given a file or directory, generate the sequence of corresponding FileSpecs suitable to create a File table.

    Args:
        path: Path to the file or directory.
        description: The description of the file(s)
        file_types: A list of file types or a function that takes a file path and returns a list of file types.

    Returns:
        An iterable of FileSpecs for each file in the directory.
    """

    path = Path(path)
    file_types = file_types or []
    file_types_fn = file_types if callable(file_types) else lambda _x: file_types

    def create_spec(file_path: Path) -> FileSpec:
        hashes = hash_utils.compute_file_hashes(file_path, hashes=frozenset(["md5", "sha256"]))
        md5 = hashes["md5"][0]
        type_list = file_types_fn(file_path)
        return FileSpec(
            length=file_path.stat().st_size,
            md5=md5,
            description=description,
            url=file_path.as_posix(),
            file_types=type_list if "File" in type_list else ["File"] + type_list,
        )

    files = [path] if path.is_file() else [f for f in Path(path).rglob("*") if f.is_file()]
    return (create_spec(file) for file in files)

read_filespec staticmethod

read_filespec(
    path: Path | str,
) -> Generator[FileSpec, None, None]

Get FileSpecs from a JSON lines file.

Parameters:

Name Type Description Default
path Path | str

Path to the .jsonl file (string or Path).

required

Yields:

Type Description
FileSpec

A FileSpec object.

Source code in src/deriva_ml/core/filespec.py
@staticmethod
def read_filespec(path: Path | str) -> Generator[FileSpec, None, None]:
    """Get FileSpecs from a JSON lines file.

    Args:
        path: Path to the .jsonl file (string or Path).

    Yields:
        A FileSpec object.
    """
    path = Path(path)
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            yield FileSpec(**json.loads(line))
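
Reading the hypothetical `filespecs.jsonl` from the sketch above back into FileSpec objects:

>>> for spec in FileSpec.read_filespec("filespecs.jsonl"):
...     print(spec.url, spec.length)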

validate_file_url classmethod

validate_file_url(url: str) -> str

Examine the provided URL. If it's a local path, convert it into a tag URL.

Parameters:

Name Type Description Default
url str

The URL to validate and potentially convert

required

Returns:

Type Description
str

The validated/converted URL

Raises:

Type Description
ValidationError

If the URL is not a file URL

Source code in src/deriva_ml/core/filespec.py
@field_validator("url")
@classmethod
def validate_file_url(cls, url: str) -> str:
    """Examine the provided URL. If it's a local path, convert it into a tag URL.

    Args:
        url: The URL to validate and potentially convert

    Returns:
        The validated/converted URL

    Raises:
        ValidationError: If the URL is not a file URL
    """
    url_parts = urlparse(url)
    if url_parts.scheme == "tag":
        # Already a tag URL, so just return it.
        return url
    elif (not url_parts.scheme) or url_parts.scheme == "file":
        # There is no scheme part of the URL, or it is a file URL, so it is a local file path.
        # Convert to a tag URL.
        return f"tag://{gethostname()},{date.today()}:file://{url_parts.path}"
    else:
        raise ValueError("url is not a file URL")
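
A small sketch of the conversion behavior; the MD5 shown is the well-known hash of an empty file, and the host and date embedded in the resulting tag URL depend on the local machine:

>>> spec = FileSpec(url="/data/scan.png", md5="d41d8cd98f00b204e9800998ecf8427e", length=0)
>>> spec.url.startswith("tag://")
True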

FileUploadState

Bases: BaseModel

Tracks the state and result of a file upload operation.

Attributes:

Name Type Description
state UploadState

Current state of the upload (success, failed, etc.).

status str

Detailed status message.

result Any

Upload result data, if any.

rid RID | None

Resource identifier of the uploaded file, if successful.

Source code in src/deriva_ml/core/ermrest.py
class FileUploadState(BaseModel):
    """Tracks the state and result of a file upload operation.

    Attributes:
        state (UploadState): Current state of the upload (success, failed, etc.).
        status (str): Detailed status message.
        result (Any): Upload result data, if any.
        rid (RID | None): Resource identifier of the uploaded file, if successful.
    """
    state: UploadState
    status: str
    result: Any

    @computed_field
    @property
    def rid(self) -> RID | None:
        return self.result and self.result["RID"]
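
A hedged usage sketch, assuming `upload` is a FileUploadState returned by an upload operation:

>>> if upload.state == UploadState.success:
...     print(f"Uploaded as {upload.rid}")
... else:
...     print(f"Upload failed: {upload.status}")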

MLAsset

Bases: BaseStrEnum

Asset type identifiers.

Defines the types of assets that can be associated with executions.

Attributes:

Name Type Description
execution_metadata str

Metadata about an execution.

execution_asset str

Asset produced by an execution.

Source code in src/deriva_ml/core/enums.py
class MLAsset(BaseStrEnum):
    """Asset type identifiers.

    Defines the types of assets that can be associated with executions.

    Attributes:
        execution_metadata (str): Metadata about an execution.
        execution_asset (str): Asset produced by an execution.
    """

    execution_metadata = "Execution_Metadata"
    execution_asset = "Execution_Asset"

MLVocab

Bases: BaseStrEnum

Controlled vocabulary type identifiers.

Defines the names of controlled vocabulary tables used in DerivaML for various types of entities and attributes.

Attributes:

Name Type Description
dataset_type str

Dataset classification vocabulary.

workflow_type str

Workflow classification vocabulary.

asset_type str

Asset classification vocabulary.

asset_role str

Asset role classification vocabulary.

feature_name str

Feature name vocabulary.

Source code in src/deriva_ml/core/enums.py
class MLVocab(BaseStrEnum):
    """Controlled vocabulary type identifiers.

    Defines the names of controlled vocabulary tables used in DerivaML for various types
    of entities and attributes.

    Attributes:
        dataset_type (str): Dataset classification vocabulary.
        workflow_type (str): Workflow classification vocabulary.
        asset_type (str): Asset classification vocabulary.
        asset_role (str): Asset role classification vocabulary.
        feature_name (str): Feature name vocabulary.
    """

    dataset_type = "Dataset_Type"
    workflow_type = "Workflow_Type"
    asset_type = "Asset_Type"
    asset_role = "Asset_Role"
    feature_name = "Feature_Name"

TableDefinition

Bases: BaseModel

Defines a complete table structure in ERMrest.

Provides a Pydantic model for defining tables with their columns, keys, and relationships. Maps to deriva_py's Table.define functionality.

Attributes:

Name Type Description
name str

Name of the table.

column_defs Iterable[ColumnDefinition]

Column definitions.

key_defs Iterable[KeyDefinition]

Key constraint definitions.

fkey_defs Iterable[ForeignKeyDefinition]

Foreign key relationship definitions.

comment str | None

Description of the table's purpose.

acls dict

Access control lists.

acl_bindings dict

Dynamic access control bindings.

annotations dict

Additional metadata annotations.

Example

>>> table = TableDefinition(
...     name="experiment",
...     column_defs=[
...         ColumnDefinition(name="id", type=BuiltinTypes.text),
...         ColumnDefinition(name="date", type=BuiltinTypes.date)
...     ],
...     comment="Experimental data records"
... )

Source code in src/deriva_ml/core/ermrest.py
class TableDefinition(BaseModel):
    """Defines a complete table structure in ERMrest.

    Provides a Pydantic model for defining tables with their columns, keys, and relationships.
    Maps to deriva_py's Table.define functionality.

    Attributes:
        name (str): Name of the table.
        column_defs (Iterable[ColumnDefinition]): Column definitions.
        key_defs (Iterable[KeyDefinition]): Key constraint definitions.
        fkey_defs (Iterable[ForeignKeyDefinition]): Foreign key relationship definitions.
        comment (str | None): Description of the table's purpose.
        acls (dict): Access control lists.
        acl_bindings (dict): Dynamic access control bindings.
        annotations (dict): Additional metadata annotations.

    Example:
        >>> table = TableDefinition(
        ...     name="experiment",
        ...     column_defs=[
        ...         ColumnDefinition(name="id", type=BuiltinTypes.text),
        ...         ColumnDefinition(name="date", type=BuiltinTypes.date)
        ...     ],
        ...     comment="Experimental data records"
        ... )
    """
    name: str
    column_defs: Iterable[ColumnDefinition]
    key_defs: Iterable[KeyDefinition] = Field(default_factory=list)
    fkey_defs: Iterable[ForeignKeyDefinition] = Field(default_factory=list)
    comment: str | None = None
    acls: dict = Field(default_factory=dict)
    acl_bindings: dict = Field(default_factory=dict)
    annotations: dict = Field(default_factory=dict)

    @model_serializer()
    def serialize_table_definition(self):
        return em.Table.define(
            tname=self.name,
            column_defs=[c.model_dump() for c in self.column_defs],
            key_defs=[k.model_dump() for k in self.key_defs],
            fkey_defs=[fk.model_dump() for fk in self.fkey_defs],
            comment=self.comment,
            acls=self.acls,
            acl_bindings=self.acl_bindings,
            annotations=self.annotations,
        )
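
Because of the custom model serializer, dumping the model yields an ERMrest table definition dictionary suitable for deriva_py (a sketch using the `table` object from the example above):

>>> table_doc = table.model_dump()  # equivalent to em.Table.define(...)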

UploadState

Bases: Enum

File upload operation states.

Represents the various states a file upload operation can be in, from initiation to completion.

Attributes:

Name Type Description
success int

Upload completed successfully.

failed int

Upload failed.

pending int

Upload is queued.

running int

Upload is in progress.

paused int

Upload is temporarily paused.

aborted int

Upload was aborted.

cancelled int

Upload was cancelled.

timeout int

Upload timed out.

Source code in src/deriva_ml/core/enums.py
class UploadState(Enum):
    """File upload operation states.

    Represents the various states a file upload operation can be in, from initiation to completion.

    Attributes:
        success (int): Upload completed successfully.
        failed (int): Upload failed.
        pending (int): Upload is queued.
        running (int): Upload is in progress.
        paused (int): Upload is temporarily paused.
        aborted (int): Upload was aborted.
        cancelled (int): Upload was cancelled.
        timeout (int): Upload timed out.
    """

    success = 0
    failed = 1
    pending = 2
    running = 3
    paused = 4
    aborted = 5
    cancelled = 6
    timeout = 7