
Documentation for Execution class in DerivaML

Execution management for DerivaML.

Provides the Execution lifecycle, workflow tracking, hydra-zen configuration helpers (BaseConfig, notebook_config, run_notebook), and multirun support for running reproducible ML experiments with full provenance tracking.

AssetRID dataclass

Bases: str

A string subclass representing an asset Resource ID with optional description.

Deprecated: Use AssetSpec instead for new code. AssetRID is retained for backward compatibility.

Attributes:

- rid (str): The Resource ID string identifying the asset in Deriva.
- description (str): Optional human-readable description of the asset.

Example

    asset = AssetRID("3RA", "Pretrained model weights")
    print(asset)  # "3RA"
    print(asset.description)  # "Pretrained model weights"

Source code in src/deriva_ml/execution/execution_configuration.py
@dataclass
class AssetRID(str):
    """A string subclass representing an asset Resource ID with optional description.

    .. deprecated::
        Use :class:`AssetSpec` instead for new code. ``AssetRID`` is retained
        for backward compatibility.

    Attributes:
        rid: The Resource ID string identifying the asset in Deriva.
        description: Optional human-readable description of the asset.

    Example:
        >>> asset = AssetRID("3RA", "Pretrained model weights")
        >>> print(asset)  # "3RA"
        >>> print(asset.description)  # "Pretrained model weights"
    """

    rid: str
    description: str = ""

    def __new__(cls, rid: str, description: str = ""):
        obj = super().__new__(cls, rid)
        obj.description = description
        return obj
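The `__new__` override above is the standard way to attach extra data to an immutable `str` subclass: since `str` instances are created before `__init__` runs, the extra attribute must be set during construction. A minimal, self-contained sketch of the same pattern (independent of DerivaML; `DescribedStr` is an illustrative name, not a library class):

```python
class DescribedStr(str):
    """A str subclass carrying an extra description attribute.

    Because str is immutable, the extra value is attached in __new__
    (which constructs the instance) rather than __init__.
    """

    def __new__(cls, value: str, description: str = ""):
        obj = super().__new__(cls, value)  # build the underlying str
        obj.description = description      # attach the extra attribute
        return obj


rid = DescribedStr("3RA", "Pretrained model weights")
print(rid)              # behaves exactly like the string "3RA"
print(rid.description)  # "Pretrained model weights"
print(rid.upper())      # ordinary str methods still work
```

Because the instance *is* a string, it can be passed anywhere a plain RID string is expected, which is what keeps AssetRID backward compatible.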

AssetSpec

Bases: BaseModel

Specification for an asset in execution configurations.

Used to reference assets as inputs to executions, similar to how DatasetSpec is used for datasets. Supports optional checksum-based caching for large assets like model weights.

Attributes:

- rid (RID): Resource Identifier of the asset.
- asset_role (str): Role of the asset ("Input" or "Output"). Defaults to "Input".
- cache (bool): If True, cache the downloaded asset by MD5 checksum in the DerivaML cache directory. Cached assets are reused across executions when the checksum matches, avoiding repeated downloads of large files.

Example

    spec = AssetSpec(rid="3JSE")
    spec = AssetSpec(rid="3JSE", cache=True)  # enable caching

Source code in src/deriva_ml/asset/aux_classes.py
class AssetSpec(BaseModel):
    """Specification for an asset in execution configurations.

    Used to reference assets as inputs to executions, similar to how
    DatasetSpec is used for datasets. Supports optional checksum-based
    caching for large assets like model weights.

    Attributes:
        rid: Resource Identifier of the asset.
        asset_role: Role of the asset ("Input" or "Output"). Defaults to "Input".
        cache: If True, cache the downloaded asset by MD5 checksum in the
            DerivaML cache directory. Cached assets are reused across executions
            when the checksum matches, avoiding repeated downloads of large files.

    Example:
        >>> spec = AssetSpec(rid="3JSE")
        >>> spec = AssetSpec(rid="3JSE", cache=True)  # enable caching
    """

    rid: RID
    asset_role: str = "Input"
    cache: bool = False

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @model_validator(mode="before")
    @classmethod
    def _check_bare_rid(cls, data: Any) -> dict[str, str | bool]:
        """Allow bare RID string as shorthand."""
        return {"rid": data} if isinstance(data, str) else data
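The `_check_bare_rid` validator runs in `mode="before"`, i.e. before Pydantic field validation, so a bare RID string can stand in for a full spec. The normalization step it performs can be sketched standalone (the function name here is illustrative, not the library's API):

```python
from typing import Any


def normalize_asset_spec(data: Any) -> Any:
    """Mirror AssetSpec's mode="before" validator: a bare string is
    treated as shorthand for {"rid": <string>}; any other input
    (e.g. an explicit dict of fields) passes through unchanged."""
    return {"rid": data} if isinstance(data, str) else data


print(normalize_asset_spec("3JSE"))                          # {'rid': '3JSE'}
print(normalize_asset_spec({"rid": "3JSE", "cache": True}))  # unchanged dict
```

This is why both `AssetSpec("3JSE")` and `AssetSpec(rid="3JSE", cache=True)` are accepted in configurations.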

AssetSpecConfig

Hydra-zen configuration interface for AssetSpec.

Use in hydra-zen store definitions to specify assets with caching:

>>> from hydra_zen import store
>>> asset_store = store(group="assets")
>>> asset_store(
...     [AssetSpecConfig(rid="6-EPNR", cache=True)],
...     name="cached_weights",
... )
Source code in src/deriva_ml/asset/aux_classes.py
@hydrated_dataclass(AssetSpec)
class AssetSpecConfig:
    """Hydra-zen configuration interface for AssetSpec.

    Use in hydra-zen store definitions to specify assets with caching:

        >>> from hydra_zen import store
        >>> asset_store = store(group="assets")
        >>> asset_store(
        ...     [AssetSpecConfig(rid="6-EPNR", cache=True)],
        ...     name="cached_weights",
        ... )
    """

    rid: str
    cache: bool = False

BaseConfig dataclass

Base configuration for DerivaML applications.

This dataclass defines the common configuration structure shared by both script execution and notebook modes. Project-specific configs should inherit from this class to get the standard DerivaML fields.

Note

Fields use Any type annotations because several DerivaML types (DerivaMLConfig, DatasetSpec) are Pydantic models which are not compatible with OmegaConf structured configs. The actual types at runtime are documented below.

Attributes:

- deriva_ml (Any): DerivaML connection configuration (DerivaMLConfig at runtime).
- datasets (Any): List of dataset specifications (list[DatasetSpec] at runtime).
- assets (Any): List of asset RIDs to load (list[str] at runtime).
- dry_run (bool): If True, skip catalog writes (for testing/debugging).
- description (str): Human-readable description of this run.
- config_choices (dict[str, str]): Dictionary mapping config group names to selected config names. This is automatically populated by get_notebook_configuration() with the Hydra runtime choices (e.g., {"model_config": "cifar10_quick", "assets": "roc_quick"}). Useful for tracking which configurations were used in an execution.

Example

    from dataclasses import dataclass
    from deriva_ml.execution import BaseConfig

    @dataclass
    class MyConfig(BaseConfig):
        learning_rate: float = 0.001
        epochs: int = 10

Source code in src/deriva_ml/execution/base_config.py
@dataclass
class BaseConfig:
    """Base configuration for DerivaML applications.

    This dataclass defines the common configuration structure shared by
    both script execution and notebook modes. Project-specific configs
    should inherit from this class to get the standard DerivaML fields.

    Note:
        Fields use ``Any`` type annotations because several DerivaML types
        (DerivaMLConfig, DatasetSpec) are Pydantic models which are not
        compatible with OmegaConf structured configs. The actual types at
        runtime are documented below.

    Attributes:
        deriva_ml: DerivaML connection configuration (DerivaMLConfig at runtime).
        datasets: List of dataset specifications (list[DatasetSpec] at runtime).
        assets: List of asset RIDs to load (list[str] at runtime).
        dry_run: If True, skip catalog writes (for testing/debugging).
        description: Human-readable description of this run.
        config_choices: Dictionary mapping config group names to selected config names.
            This is automatically populated by get_notebook_configuration() with the
            Hydra runtime choices (e.g., {"model_config": "cifar10_quick", "assets": "roc_quick"}).
            Useful for tracking which configurations were used in an execution.

    Example:
        >>> from dataclasses import dataclass
        >>> from deriva_ml.execution import BaseConfig
        >>>
        >>> @dataclass
        ... class MyConfig(BaseConfig):
        ...     learning_rate: float = 0.001
        ...     epochs: int = 10
    """
    deriva_ml: Any = None
    datasets: Any = None
    assets: Any = None
    dry_run: bool = False
    description: str = ""
    config_choices: dict[str, str] = field(default_factory=dict)
    script_config: Any = None
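Subclassing BaseConfig follows ordinary dataclass inheritance: because every base field has a default, subclass fields must also have defaults, and mutable defaults need `field(default_factory=...)`. A self-contained sketch using a stand-in base whose fields mirror BaseConfig (this is not the DerivaML class itself):

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class BaseConfigSketch:
    # Stand-in mirroring BaseConfig's fields; all have defaults, so
    # any fields added by a subclass must also have defaults.
    deriva_ml: Any = None
    datasets: Any = None
    assets: Any = None
    dry_run: bool = False
    description: str = ""
    config_choices: dict[str, str] = field(default_factory=dict)


@dataclass
class MyConfig(BaseConfigSketch):
    learning_rate: float = 0.001
    epochs: int = 10


cfg = MyConfig(description="quick run", epochs=5)
print(cfg.epochs)          # 5
print(cfg.dry_run)         # False (inherited default)
print(cfg.config_choices)  # {} (a fresh dict per instance)
```

Using `default_factory=dict` (rather than `{}`) matters: it gives each instance its own `config_choices` dict instead of sharing one mutable default.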

DerivaMLModel

Bases: Protocol

Protocol for model functions compatible with DerivaML's run_model().

A model function must accept keyword arguments ml_instance and execution that are injected at runtime by run_model(). All other parameters are configured via Hydra and passed through the model_config.

The model function is responsible for:

1. Downloading input datasets via execution.download_dataset_bag()
2. Performing the ML computation (training, inference, etc.)
3. Registering output files via execution.asset_file_path()

Output files registered with asset_file_path() are automatically uploaded to the catalog after the model completes.

Attributes

This protocol defines a callable signature, not attributes.

Examples

Basic model function:

def my_model(
    epochs: int = 10,
    ml_instance: DerivaML = None,
    execution: Execution = None,
) -> None:
    # Training logic here
    pass

With domain-specific DerivaML subclass:

def eyeai_model(
    threshold: float = 0.5,
    ml_instance: EyeAI = None,  # EyeAI is a DerivaML subclass
    execution: Execution = None,
) -> None:
    # Can use EyeAI-specific methods
    ml_instance.some_eyeai_method()

Checking protocol compliance:

>>> from deriva_ml.execution.model_protocol import DerivaMLModel
>>> isinstance(my_model, DerivaMLModel)
True
Source code in src/deriva_ml/execution/model_protocol.py
@runtime_checkable
class DerivaMLModel(Protocol):
    """Protocol for model functions compatible with DerivaML's run_model().

    A model function must accept keyword arguments `ml_instance` and `execution`
    that are injected at runtime by run_model(). All other parameters are
    configured via Hydra and passed through the model_config.

    The model function is responsible for:
    1. Downloading input datasets via execution.download_dataset_bag()
    2. Performing the ML computation (training, inference, etc.)
    3. Registering output files via execution.asset_file_path()

    Output files registered with asset_file_path() are automatically uploaded
    to the catalog after the model completes.

    Attributes
    ----------
    This protocol defines a callable signature, not attributes.

    Examples
    --------
    Basic model function:

        def my_model(
            epochs: int = 10,
            ml_instance: DerivaML = None,
            execution: Execution = None,
        ) -> None:
            # Training logic here
            pass

    With domain-specific DerivaML subclass:

        def eyeai_model(
            threshold: float = 0.5,
            ml_instance: EyeAI = None,  # EyeAI is a DerivaML subclass
            execution: Execution = None,
        ) -> None:
            # Can use EyeAI-specific methods
            ml_instance.some_eyeai_method()

    Checking protocol compliance:

        >>> from deriva_ml.execution.model_protocol import DerivaMLModel
        >>> isinstance(my_model, DerivaMLModel)
        True
    """

    def __call__(
        self,
        *args: Any,
        ml_instance: "DerivaML",
        execution: "Execution",
        **kwargs: Any,
    ) -> None:
        """Execute the model within a DerivaML execution context.

        Parameters
        ----------
        *args : Any
            Positional arguments (typically not used; prefer keyword args).
        ml_instance : DerivaML
            The DerivaML instance (or subclass like EyeAI) connected to the
            catalog. Use this for catalog operations not available through
            the execution context.
        execution : Execution
            The execution context manager. Provides:
            - execution.datasets: List of input DatasetSpec objects
            - execution.download_dataset_bag(): Download dataset as BDBag
            - execution.asset_file_path(): Register output file for upload
            - execution.working_dir: Path to local working directory
        **kwargs : Any
            Model-specific parameters configured via Hydra.

        Returns
        -------
        None
            Models should not return values. Results are captured through:
            - Files registered with asset_file_path() (uploaded to catalog)
            - Datasets created with execution.create_dataset()
            - Status updates via execution.update_status()
        """
        ...

__call__

__call__(
    *args: Any,
    ml_instance: "DerivaML",
    execution: "Execution",
    **kwargs: Any,
) -> None

Execute the model within a DerivaML execution context.

Parameters

- *args (Any): Positional arguments (typically not used; prefer keyword args).
- ml_instance (DerivaML): The DerivaML instance (or subclass like EyeAI) connected to the catalog. Use this for catalog operations not available through the execution context.
- execution (Execution): The execution context manager. Provides:
    - execution.datasets: List of input DatasetSpec objects
    - execution.download_dataset_bag(): Download dataset as BDBag
    - execution.asset_file_path(): Register output file for upload
    - execution.working_dir: Path to local working directory
- **kwargs (Any): Model-specific parameters configured via Hydra.

Returns

None. Models should not return values. Results are captured through:

- Files registered with asset_file_path() (uploaded to catalog)
- Datasets created with execution.create_dataset()
- Status updates via execution.update_status()

Source code in src/deriva_ml/execution/model_protocol.py
def __call__(
    self,
    *args: Any,
    ml_instance: "DerivaML",
    execution: "Execution",
    **kwargs: Any,
) -> None:
    """Execute the model within a DerivaML execution context.

    Parameters
    ----------
    *args : Any
        Positional arguments (typically not used; prefer keyword args).
    ml_instance : DerivaML
        The DerivaML instance (or subclass like EyeAI) connected to the
        catalog. Use this for catalog operations not available through
        the execution context.
    execution : Execution
        The execution context manager. Provides:
        - execution.datasets: List of input DatasetSpec objects
        - execution.download_dataset_bag(): Download dataset as BDBag
        - execution.asset_file_path(): Register output file for upload
        - execution.working_dir: Path to local working directory
    **kwargs : Any
        Model-specific parameters configured via Hydra.

    Returns
    -------
    None
        Models should not return values. Results are captured through:
        - Files registered with asset_file_path() (uploaded to catalog)
        - Datasets created with execution.create_dataset()
        - Status updates via execution.update_status()
    """
    ...
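Because the protocol is decorated with `@runtime_checkable`, `isinstance()` only verifies that the required member (here `__call__`) is present; it does not check the signature. Any plain function therefore passes the check, since functions expose `__call__`. A minimal standalone sketch (the protocol here is a stand-in, not the DerivaML class):

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class ModelProtocol(Protocol):
    """Stand-in for DerivaMLModel: runtime_checkable isinstance checks
    only test for the presence of __call__, not its signature."""

    def __call__(
        self, *args: Any, ml_instance: Any, execution: Any, **kwargs: Any
    ) -> None: ...


def my_model(epochs: int = 10, ml_instance=None, execution=None) -> None:
    pass  # training logic would go here


print(isinstance(my_model, ModelProtocol))  # True -- functions have __call__
print(isinstance(42, ModelProtocol))        # False -- ints are not callable
```

The practical consequence: the isinstance check is a smoke test, and a model that omits the `ml_instance`/`execution` keyword arguments will still pass it but fail at call time inside run_model().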

DescribedList

Bases: list

A list with an attached description.

This class extends list to add a description attribute while maintaining full list compatibility. This allows configuration values (like asset RIDs or dataset specs) to carry documentation without changing how they're used.

When stored in hydra-zen and resolved via instantiate(), the result is a DescribedList that behaves like a regular list but has a description attribute.

Attributes:

- description: Human-readable description of this configuration.

Example

    from hydra_zen import store
    from deriva_ml.execution import with_description

    asset_store = store(group="assets")
    asset_store(
        with_description(
            ["3WMG", "3XPA"],
            "Model weights from quick and extended training",
        ),
        name="comparison_weights",
    )

After instantiation, usage is identical to a regular list:

    config.assets[0]  # "3WMG"
    len(config.assets)  # 2
    for rid in config.assets: ...
    config.assets.description  # "Model weights from..."

Source code in src/deriva_ml/execution/base_config.py
class DescribedList(list):
    """A list with an attached description.

    This class extends list to add a `description` attribute while maintaining
    full list compatibility. This allows configuration values (like asset RIDs
    or dataset specs) to carry documentation without changing how they're used.

    When stored in hydra-zen and resolved via `instantiate()`, the result is a
    DescribedList that behaves like a regular list but has a `description` attribute.

    Attributes:
        description: Human-readable description of this configuration.

    Example:
        >>> from hydra_zen import store
        >>> from deriva_ml.execution import with_description
        >>>
        >>> asset_store = store(group="assets")
        >>> asset_store(
        ...     with_description(
        ...         ["3WMG", "3XPA"],
        ...         "Model weights from quick and extended training",
        ...     ),
        ...     name="comparison_weights",
        ... )
        >>>
        >>> # After instantiation, usage is identical to a regular list:
        >>> # config.assets[0]  # "3WMG"
        >>> # len(config.assets)  # 2
        >>> # for rid in config.assets: ...
        >>> # config.assets.description  # "Model weights from..."
    """

    def __init__(self, items: list | None = None, description: str = ""):
        """Initialize a DescribedList.

        Args:
            items: Initial list items. If None, creates empty list.
            description: Human-readable description of this list.
        """
        super().__init__(items or [])
        self.description = description

    def __repr__(self) -> str:
        """Return string representation including description."""
        if self.description:
            return f"DescribedList({list(self)!r}, description={self.description!r})"
        return f"DescribedList({list(self)!r})"

__init__

__init__(
    items: list | None = None,
    description: str = "",
)

Initialize a DescribedList.

Parameters:

- items (list | None): Initial list items. If None, creates empty list. Default: None
- description (str): Human-readable description of this list. Default: ''
Source code in src/deriva_ml/execution/base_config.py
def __init__(self, items: list | None = None, description: str = ""):
    """Initialize a DescribedList.

    Args:
        items: Initial list items. If None, creates empty list.
        description: Human-readable description of this list.
    """
    super().__init__(items or [])
    self.description = description

__repr__

__repr__() -> str

Return string representation including description.

Source code in src/deriva_ml/execution/base_config.py
def __repr__(self) -> str:
    """Return string representation including description."""
    if self.description:
        return f"DescribedList({list(self)!r}, description={self.description!r})"
    return f"DescribedList({list(self)!r})"

Execution

Manages the lifecycle and context of a DerivaML execution.

An Execution represents a computational or manual process within DerivaML. It provides:

- Dataset materialization and access
- Asset management (inputs and outputs)
- Status tracking and updates
- Provenance recording
- Result upload and cataloging

The class handles downloading required datasets and assets, tracking execution state, and managing the upload of results. Every dataset and file generated is associated with an execution record for provenance tracking.

Attributes:

- dataset_rids (list[RID]): RIDs of datasets used in the execution.
- datasets (list[DatasetBag]): Materialized dataset objects.
- configuration (ExecutionConfiguration): Execution settings and parameters.
- workflow_rid (RID): RID of the associated workflow.
- status (Status): Current execution status.
- asset_paths (list[AssetFilePath]): Paths to execution assets.
- start_time (datetime | None): When execution started.
- stop_time (datetime | None): When execution completed.
Example

The context manager handles start/stop timing. Upload must be called AFTER the context manager exits:

>>> config = ExecutionConfiguration(
...     workflow="analysis",
...     description="Process samples",
... )
>>> with ml.create_execution(config) as execution:
...     bag = execution.download_dataset_bag(dataset_spec)
...     # Run analysis using bag.path
...     output_path = execution.asset_file_path("Model", "model.pt")
...     # Write results to output_path
...
>>> # IMPORTANT: Call upload AFTER exiting the context manager
>>> execution.upload_execution_outputs()
Source code in src/deriva_ml/execution/execution.py
class Execution:
    """Manages the lifecycle and context of a DerivaML execution.

    An Execution represents a computational or manual process within DerivaML. It provides:
    - Dataset materialization and access
    - Asset management (inputs and outputs)
    - Status tracking and updates
    - Provenance recording
    - Result upload and cataloging

    The class handles downloading required datasets and assets, tracking execution state,
    and managing the upload of results. Every dataset and file generated is associated
    with an execution record for provenance tracking.

    Attributes:
        dataset_rids (list[RID]): RIDs of datasets used in the execution.
        datasets (list[DatasetBag]): Materialized dataset objects.
        configuration (ExecutionConfiguration): Execution settings and parameters.
        workflow_rid (RID): RID of the associated workflow.
        status (Status): Current execution status.
        asset_paths (list[AssetFilePath]): Paths to execution assets.
        start_time (datetime | None): When execution started.
        stop_time (datetime | None): When execution completed.

    Example:
        The context manager handles start/stop timing. Upload must be called AFTER
        the context manager exits::

            >>> config = ExecutionConfiguration(
            ...     workflow="analysis",
            ...     description="Process samples",
            ... )
            >>> with ml.create_execution(config) as execution:
            ...     bag = execution.download_dataset_bag(dataset_spec)
            ...     # Run analysis using bag.path
            ...     output_path = execution.asset_file_path("Model", "model.pt")
            ...     # Write results to output_path
            ...
            >>> # IMPORTANT: Call upload AFTER exiting the context manager
            >>> execution.upload_execution_outputs()
    """

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def __init__(
        self,
        configuration: ExecutionConfiguration,
        ml_object: DerivaML,
        workflow: Workflow | None = None,
        reload: RID | None = None,
        dry_run: bool = False,
    ):
        """Initializes an Execution instance.

        Creates a new execution or reloads an existing one. Initializes the execution
        environment, downloads required datasets, and sets up asset tracking.

        Args:
            configuration: Settings and parameters for the execution.
            ml_object: DerivaML instance managing the execution.
            workflow: Optional Workflow object. If not specified, the workflow is taken from
                the ExecutionConfiguration object. Must be a Workflow object, not a RID.
            reload: Optional RID of existing execution to reload.
            dry_run: If True, don't create catalog records or upload results.

        Raises:
            DerivaMLException: If initialization fails, configuration is invalid,
                or workflow is not a Workflow object.

        Example:
            Create an execution with a workflow::

                >>> workflow = ml.lookup_workflow("2-ABC1")
                >>> config = ExecutionConfiguration(
                ...     workflow=workflow,
                ...     description="Process data"
                ... )
                >>> execution = Execution(config, ml)

            Or pass workflow separately::

                >>> workflow = ml.lookup_workflow_by_url(
                ...     "https://github.com/org/repo/blob/abc123/analysis.py"
                ... )
                >>> config = ExecutionConfiguration(description="Run analysis")
                >>> execution = Execution(config, ml, workflow=workflow)
        """

        self.asset_paths: dict[str, list[AssetFilePath]] = {}
        self.configuration = configuration
        self._ml_object = ml_object
        self._model = ml_object.model
        self._logger = ml_object._logger
        self.start_time = None
        self.stop_time = None
        self._status = Status.created
        self.uploaded_assets: dict[str, list[AssetFilePath]] | None = None
        self.configuration.argv = sys.argv
        self._execution_record: ExecutionRecord | None = None  # Lazily created after RID is assigned

        self.dataset_rids: List[RID] = []
        self.datasets: list[DatasetBag] = []

        self._working_dir = self._ml_object.working_dir
        self._cache_dir = self._ml_object.cache_dir
        if self._working_dir is None:
            raise DerivaMLException(
                "DerivaML working_dir is not set. "
                "Ensure the DerivaML instance was initialized with a valid working_dir."
            )
        self._dry_run = dry_run

        # Make sure we have a valid Workflow object.
        if workflow:
            self.configuration.workflow = workflow

        if self.configuration.workflow is None:
            raise DerivaMLException("Workflow must be specified either in configuration or as a parameter")

        if not isinstance(self.configuration.workflow, Workflow):
            raise DerivaMLException(
                f"Workflow must be a Workflow object, not {type(self.configuration.workflow).__name__}. "
                "Use ml.lookup_workflow(rid) or ml.lookup_workflow_by_url(url) to get a Workflow object."
            )

        # Validate workflow type(s) and register in catalog
        for wt in self.configuration.workflow.workflow_type:
            self._ml_object.lookup_term(MLVocab.workflow_type, wt)
        self.workflow_rid = (
            self._ml_object.add_workflow(self.configuration.workflow) if not self._dry_run else DRY_RUN_RID
        )

        # Validate the datasets and assets to be valid.
        for d in self.configuration.datasets:
            if self._ml_object.resolve_rid(d.rid).table.name != "Dataset":
                raise DerivaMLException("Dataset specified in execution configuration is not a dataset")

        for a in self.configuration.assets:
            if not self._model.is_asset(self._ml_object.resolve_rid(a.rid).table.name):
                raise DerivaMLException("Asset specified in execution configuration is not an asset table")

        schema_path = self._ml_object.pathBuilder().schemas[self._ml_object.ml_schema]
        if reload:
            self.execution_rid = reload
            if self.execution_rid == DRY_RUN_RID:
                self._dry_run = True
        elif self._dry_run:
            self.execution_rid = DRY_RUN_RID
        else:
            self.execution_rid = schema_path.Execution.insert(
                [
                    {
                        "Description": self.configuration.description,
                        "Workflow": self.workflow_rid,
                    }
                ]
            )[0]["RID"]

        if rid_path := os.environ.get("DERIVA_ML_SAVE_EXECUTION_RID", None):
            # Put execution_rid into the provided file path so we can find it later.
            with Path(rid_path).open("w") as f:
                json.dump(
                    {
                        "hostname": self._ml_object.host_name,
                        "catalog_id": self._ml_object.catalog_id,
                        "workflow_rid": self.workflow_rid,
                        "execution_rid": self.execution_rid,
                    },
                    f,
                )

        # Create a directory for execution rid so we can recover the state in case of a crash.
        execution_root(prefix=self._ml_object.working_dir, exec_rid=self.execution_rid)

        # Create the ExecutionRecord to handle catalog state operations
        if not self._dry_run:
            self._execution_record = ExecutionRecord(
                execution_rid=self.execution_rid,
                workflow=self.configuration.workflow,
                status=Status.created,
                description=self.configuration.description,
                _ml_instance=self._ml_object,
                _logger=self._logger,
            )

        self._initialize_execution(reload)

    def _save_runtime_environment(self):
        """Snapshot the runtime environment and register it as an Execution_Metadata asset."""
        runtime_env_path = self.asset_file_path(
            asset_name="Execution_Metadata",
            file_name=f"environment_snapshot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt",
            asset_types=ExecMetadataType.runtime_env.value,
            description=_ENV_SNAPSHOT_DESCRIPTION,
        )
        with Path(runtime_env_path).open("w") as fp:
            json.dump(get_execution_environment(), fp)

    def _upload_hydra_config_assets(self):
        """Upload hydra assets to the catalog with Hydra_Config type."""
        hydra_runtime_output_dir = self._ml_object.hydra_runtime_output_dir
        if hydra_runtime_output_dir:
            timestamp = hydra_runtime_output_dir.parts[-1]
            for hydra_asset in hydra_runtime_output_dir.rglob("*"):
                if hydra_asset.is_dir():
                    continue
                # Register file for upload (side effect); result intentionally unused
                # Use Hydra_Config type for Hydra YAML configuration files
                self.asset_file_path(
                    asset_name=MLAsset.execution_metadata,
                    file_name=hydra_asset,  # rglob() already yields full paths under hydra_runtime_output_dir
                    rename_file=f"hydra-{timestamp}-{hydra_asset.name}",
                    asset_types=ExecMetadataType.hydra_config.value,
                    description=self._get_metadata_description(hydra_asset.name),
                )

    @staticmethod
    def _get_metadata_description(file_name: str) -> str | None:
        """Resolve a description for an execution metadata file.

        Handles both direct filenames (configuration.json, uv.lock) and
        hydra-renamed files (hydra-{timestamp}-{original_name}).

        Args:
            file_name: The filename as it appears in the catalog.

        Returns:
            A human-readable description, or None if the file is unrecognized.
        """
        if file_name in _METADATA_DESCRIPTIONS:
            return _METADATA_DESCRIPTIONS[file_name]

        # Hydra renamed files: "hydra-{timestamp}-{original_name}"
        if file_name.startswith("hydra-"):
            for original, desc in _METADATA_DESCRIPTIONS.items():
                if file_name.endswith(f"-{original}"):
                    return desc

        if file_name.startswith("environment_snapshot_"):
            return _ENV_SNAPSHOT_DESCRIPTION

        return None

    def _set_asset_descriptions(self, uploaded_assets: dict[str, list[AssetFilePath]]) -> None:
        """Set Description on asset records after upload.

        Applies descriptions from two sources:
        1. Hardcoded descriptions for known execution metadata files.
        2. User-provided descriptions passed via the ``description`` parameter
           of ``asset_file_path()``.

        Args:
            uploaded_assets: Dict mapping "{schema}/{table}" to uploaded AssetFilePaths.
        """
        manifest = self._get_manifest()
        pb = self._ml_object.pathBuilder()

        # Group updates by schema/table for batch efficiency
        table_updates: dict[str, list[dict[str, str]]] = {}

        for table_key, assets in uploaded_assets.items():
            for asset in assets:
                # Determine description: check manifest first, then fall back to
                # hardcoded metadata descriptions for Execution_Metadata files.
                table_name = table_key.split("/")[1] if "/" in table_key else table_key
                manifest_key = f"{table_name}/{asset.file_name}"
                entry = manifest.assets.get(manifest_key)

                description = None
                if entry and entry.description:
                    description = entry.description
                elif table_name == "Execution_Metadata":
                    description = self._get_metadata_description(asset.file_name)

                if description and asset.asset_rid:
                    table_updates.setdefault(table_key, []).append(
                        {"RID": asset.asset_rid, "Description": description}
                    )

        for table_key, updates in table_updates.items():
            schema, table_name = table_key.split("/", 1) if "/" in table_key else ("", table_key)
            pb.schemas[schema].tables[table_name].update(updates)
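The batching step above can be sketched without catalog access. The tuple-based input here is a hypothetical simplification of the `AssetFilePath` objects the real method iterates over:

```python
# Sketch of grouping per-asset Description updates by table, mirroring
# _set_asset_descriptions. Each input is (table_key, rid, description).
def group_updates(assets):
    table_updates: dict[str, list[dict[str, str]]] = {}
    for table_key, rid, description in assets:
        # Only assets that have both a RID and a description are updated.
        if description and rid:
            table_updates.setdefault(table_key, []).append(
                {"RID": rid, "Description": description}
            )
    return table_updates
```

Grouping by `"{schema}/{table}"` lets each datapath table receive a single batched `update()` call instead of one round trip per asset.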

    def _initialize_execution(self, reload: RID | None = None) -> None:
        """Initialize the execution environment.

        Sets up the working directory, downloads required datasets and assets,
        and saves initial configuration metadata.

        Args:
            reload: Optional RID of a previously initialized execution to reload.

        Raises:
            DerivaMLException: If initialization fails.
        """
        # Materialize bdbag
        for dataset in self.configuration.datasets:
            self.update_status(Status.initializing, f"Materializing bag {dataset.rid}...")
            self.datasets.append(self.download_dataset_bag(dataset))
            self.dataset_rids.append(dataset.rid)

        # Update execution info
        schema_path = self._ml_object.pathBuilder().schemas[self._ml_object.ml_schema]
        if self.dataset_rids and not (reload or self._dry_run):
            schema_path.Dataset_Execution.insert(
                [{"Dataset": d, "Execution": self.execution_rid} for d in self.dataset_rids]
            )

        # Download assets.
        self.update_status(Status.running, "Downloading assets ...")
        self.asset_paths = {}
        for asset_spec in self.configuration.assets:
            asset_rid = asset_spec.rid
            use_cache = asset_spec.cache
            asset_table = self._ml_object.resolve_rid(asset_rid).table.name
            dest_dir = (
                execution_root(self._ml_object.working_dir, self.execution_rid) / "downloaded-assets" / asset_table
            )
            dest_dir.mkdir(parents=True, exist_ok=True)
            self.asset_paths.setdefault(asset_table, []).append(
                self.download_asset(
                    asset_rid=asset_rid,
                    dest_dir=dest_dir,
                    update_catalog=not (reload or self._dry_run),
                    use_cache=use_cache,
                )
            )

        # Save configuration details and upload (skip in dry_run mode)
        if not reload and not self._dry_run:
            # Save DerivaML configuration with Deriva_Config type
            cfile = self.asset_file_path(
                asset_name=MLAsset.execution_metadata,
                file_name="configuration.json",
                asset_types=ExecMetadataType.deriva_config.value,
                description=_METADATA_DESCRIPTIONS["configuration.json"],
            )

            with Path(cfile).open("w", encoding="utf-8") as config_file:
                json.dump(self.configuration.model_dump(mode="json"), config_file)
            # Only try to copy uv.lock if git_root is available (local workflow)
            if self.configuration.workflow.git_root:
                lock_file = Path(self.configuration.workflow.git_root) / "uv.lock"
            else:
                lock_file = None
            if lock_file and lock_file.exists():
                _ = self.asset_file_path(
                    asset_name=MLAsset.execution_metadata,
                    file_name=lock_file,
                    asset_types=ExecMetadataType.execution_config.value,
                    description=_METADATA_DESCRIPTIONS["uv.lock"],
                )

            self._upload_hydra_config_assets()

            # save runtime env
            self._save_runtime_environment()

            # Now upload the files so we have the info in case the execution fails.
            self.uploaded_assets = self._upload_execution_dirs()
            self._set_asset_descriptions(self.uploaded_assets)
        self.start_time = datetime.now()
        self.update_status(Status.pending, "Initialization finished.")

    @property
    def status(self) -> Status:
        """Get the current execution status.

        Returns:
            Status: The current status (Created, Running, Completed, Failed, etc.).
        """
        if self._execution_record is not None:
            return self._execution_record.status
        return self._status

    @status.setter
    def status(self, value: Status) -> None:
        """Set the execution status.

        Args:
            value: The new status value.
        """
        self._status = value
        if self._execution_record is not None:
            self._execution_record._status = value

    @property
    def execution_record(self) -> ExecutionRecord | None:
        """Get the ExecutionRecord for catalog operations.

        Returns:
            ExecutionRecord if not in dry_run mode, None otherwise.
        """
        return self._execution_record

    @property
    def working_dir(self) -> Path:
        """Return the working directory for the execution."""
        return self._execution_root

    @property
    def _execution_root(self) -> Path:
        """Get the root directory for this execution's files.

        Returns:
            Path to the execution-specific directory.
        """
        return execution_root(self._working_dir, self.execution_rid)

    @property
    def _feature_root(self) -> Path:
        """Get the root directory for feature files.

        Returns:
            Path to the feature directory within the execution.
        """
        return feature_root(self._working_dir, self.execution_rid)

    @property
    def _asset_root(self) -> Path:
        """Get the root directory for asset files.

        Returns:
            Path to the asset directory within the execution.
        """
        return asset_root(self._working_dir, self.execution_rid)

    @property
    def database_catalog(self) -> DerivaMLDatabase | None:
        """Get a catalog-like interface for downloaded datasets.

        Returns a DerivaMLDatabase that implements the DerivaMLCatalog
        protocol, allowing the same code to work with both live catalogs
        and downloaded bags.

        This is useful for writing code that can operate on either a live
        catalog (via DerivaML) or on downloaded bags (via DerivaMLDatabase).

        Returns:
            DerivaMLDatabase wrapping the primary downloaded dataset's model,
            or None if no datasets have been downloaded.

        Example:
            >>> with ml.create_execution(config) as exe:
            ...     if exe.database_catalog:
            ...         db = exe.database_catalog
            ...         # Use same interface as DerivaML
            ...         dataset = db.lookup_dataset("4HM")
            ...         term = db.lookup_term("Diagnosis", "cancer")
            ...     else:
            ...         # No datasets downloaded, use live catalog
            ...         pass
        """
        if not self.datasets:
            return None
        # Use the first dataset's model as the primary
        return DerivaMLDatabase(self.datasets[0].model)

    @property
    def catalog(self) -> "DerivaML":
        """Get the live catalog (DerivaML) instance for this execution.

        This provides access to the live catalog for operations that require
        catalog connectivity, such as looking up datasets or other read operations.

        Returns:
            DerivaML: The live catalog instance.

        Example:
            >>> with ml.create_execution(config) as exe:
            ...     # Use live catalog for lookups
            ...     existing_dataset = exe.catalog.lookup_dataset("1-ABC")
        """
        return self._ml_object

    def add_features(self, features: list[FeatureRecord]) -> int:
        """Add feature values to the catalog in batch.

        Convenience method that delegates to ``catalog.add_features()``.
        Automatically sets the ``Execution`` field on each record to this
        execution's RID if not already set.

        Args:
            features: List of FeatureRecord instances to insert. All must share
                the same feature definition. Create records using the class
                returned by ``Feature.feature_record_class()``.

        Returns:
            Number of feature records inserted.

        Example:
            >>> feature = exe.catalog.lookup_feature("Image", "Classification")
            >>> RecordClass = feature.feature_record_class()
            >>> records = [
            ...     RecordClass(Image="1-ABC", Image_Class="Normal"),
            ...     RecordClass(Image="1-DEF", Image_Class="Abnormal"),
            ... ]
            >>> exe.add_features(records)  # Execution RID set automatically
        """
        for f in features:
            if f.Execution is None:
                f.Execution = self.execution_rid
        return self._ml_object.add_features(features)

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def download_dataset_bag(self, dataset: DatasetSpec) -> DatasetBag:
        """Downloads and materializes a dataset for use in the execution.

        Downloads the specified dataset as a BDBag and materializes it in the execution's
        working directory. The dataset version is determined by the DatasetSpec.

        Args:
            dataset: Specification of the dataset to download, including version and
                materialization options.

        Returns:
            DatasetBag: Object containing:
                - path: Local filesystem path to downloaded dataset
                - rid: Dataset's Resource Identifier
                - minid: Dataset's Minimal Viable Identifier

        Raises:
            DerivaMLException: If download or materialization fails.

        Example:
            >>> spec = DatasetSpec(rid="1-abc123", version="1.2.0")
            >>> bag = execution.download_dataset_bag(spec)
            >>> print(f"Downloaded to {bag.path}")
        """
        return self._ml_object.download_dataset_bag(dataset)

    @validate_call
    def update_status(self, status: Status, msg: str) -> None:
        """Updates the execution's status in the catalog.

        Records a new status and associated message in the catalog, allowing remote
        tracking of execution progress.

        Args:
            status: New status value (e.g., running, completed, failed).
            msg: Description of the status change or current state.

        Raises:
            DerivaMLException: If status update fails.

        Example:
            >>> execution.update_status(Status.running, "Processing sample 1 of 10")
        """
        self._status = status
        self._logger.info(msg)

        if self._dry_run:
            return

        # Delegate to ExecutionRecord for catalog updates
        if self._execution_record is not None:
            self._execution_record.update_status(status, msg)
        else:
            # Fallback for cases where ExecutionRecord isn't available
            self._ml_object.pathBuilder().schemas[self._ml_object.ml_schema].Execution.update(
                [
                    {
                        "RID": self.execution_rid,
                        "Status": status.value,
                        "Status_Detail": msg,
                    }
                ]
            )

    def execution_start(self) -> None:
        """Marks the execution as started.

        Records the start time and updates the execution's status to 'initializing'.
        This should be called before beginning the main execution work.

        Example:
            >>> execution.execution_start()
            >>> try:
            ...     # Run analysis
            ...     execution.execution_stop()
            ... except Exception:
            ...     execution.update_status(Status.failed, "Analysis error")
        """
        self.start_time = datetime.now()
        self.uploaded_assets = None
        self.update_status(Status.initializing, "Starting execution...")

    def execution_stop(self) -> None:
        """Marks the execution as completed.

        Records the stop time and updates the execution's status to 'completed'.
        This should be called after all execution work is finished.

        Example:
            >>> try:
            ...     # Run analysis
            ...     execution.execution_stop()
            ... except Exception:
            ...     execution.update_status(Status.failed, "Analysis error")
        """
        self.stop_time = datetime.now()
        duration = self.stop_time - self.start_time
        hours, remainder = divmod(duration.total_seconds(), 3600)
        minutes, seconds = divmod(remainder, 60)
        duration = f"{int(hours)}H {int(minutes)}min {round(seconds, 4)}sec"

        self.update_status(Status.completed, "Algorithm execution ended.")
        if not self._dry_run:
            self._ml_object.pathBuilder().schemas[self._ml_object.ml_schema].Execution.update(
                [{"RID": self.execution_rid, "Duration": duration}]
            )
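The duration formatting used in `execution_stop` can be factored into a standalone helper; this is a sketch of the same divmod arithmetic, not an API the class exposes:

```python
# Sketch of the duration formatting in execution_stop: split a timedelta
# into whole hours and minutes plus fractional seconds via divmod.
from datetime import timedelta


def format_duration(delta: timedelta) -> str:
    hours, remainder = divmod(delta.total_seconds(), 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{int(hours)}H {int(minutes)}min {round(seconds, 4)}sec"
```

`total_seconds()` returns a float, so hours and minutes are cast to `int` to avoid rendering as "1.0H", while seconds keep sub-second precision.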

    def _build_upload_staging(self) -> Path:
        """Build ephemeral symlink tree from manifest for GenericUploader.

        Reads the manifest and creates symlinks from the flat assets/ directory
        into the regex-expected tree structure under asset/ that the
        GenericUploader needs for pattern matching.

        Returns:
            Path to the asset root (with staged symlinks) for upload.
        """
        manifest = self._get_manifest()
        pending = manifest.pending_assets()

        if not pending:
            # No manifest entries — fall back to old asset/ tree if it exists
            return self._asset_root

        staging_root = asset_root(self._working_dir, self.execution_rid)

        for key, entry in pending.items():
            # key is "{AssetTable}/{filename}"
            parts = key.split("/", 1)
            if len(parts) != 2:
                continue
            asset_table_name, filename = parts

            # Source file in flat storage
            flat_dir = flat_asset_dir(self._working_dir, self.execution_rid, asset_table_name)
            source = flat_dir / filename
            if not source.exists():
                self._logger.warning(f"Asset file not found: {source}")
                continue

            # Build metadata subdirectory path in sorted key order.
            # This must match the regex group order in asset_table_upload_spec()
            # which also sorts metadata_columns alphabetically.
            metadata_parts = (
                [str(entry.metadata[k]) for k in sorted(entry.metadata)]
                if entry.metadata else []
            )
            target_dir = staging_root / entry.schema / asset_table_name
            for part in metadata_parts:
                target_dir = target_dir / part
            target_dir.mkdir(parents=True, exist_ok=True)

            target = target_dir / filename
            if not target.exists():
                try:
                    target.symlink_to(source.resolve())
                except (OSError, PermissionError):
                    import shutil
                    shutil.copy2(source, target)

        return staging_root
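The staging-tree layout constructed above can be sketched as a pure path computation. The schema, table, and metadata names below are illustrative, not catalog values:

```python
# Sketch of the staging-path layout built by _build_upload_staging:
# {root}/{schema}/{table}/{metadata values in sorted-key order}/{filename}.
from pathlib import PurePosixPath


def staging_path(root: str, schema: str, table: str,
                 metadata: dict[str, str], filename: str) -> PurePosixPath:
    target = PurePosixPath(root) / schema / table
    # Metadata keys are sorted alphabetically so path components line up
    # with the regex group order expected by the uploader.
    for key in sorted(metadata):
        target = target / str(metadata[key])
    return target / filename
```

Because both sides sort the metadata keys the same way, the symlink tree and the uploader's pattern matching stay in agreement without any shared configuration.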

    def _cleanup_upload_staging(self) -> None:
        """Remove ephemeral symlinks created by _build_upload_staging."""
        staging_root = asset_root(self._working_dir, self.execution_rid)
        if staging_root.exists():
            import shutil
            shutil.rmtree(staging_root, ignore_errors=True)

    def _upload_execution_dirs(
        self,
        progress_callback: Callable[[UploadProgress], None] | None = None,
        max_retries: int = 3,
        retry_delay: float = 5.0,
        timeout: tuple[int, int] | None = None,
        chunk_size: int | None = None,
    ) -> dict[str, list[AssetFilePath]]:
        """Upload execution assets using manifest-driven staging.

        Builds an ephemeral symlink tree from the manifest, uploads via
        GenericUploader, then updates the manifest with RIDs for uploaded assets.

        Args:
            progress_callback: Optional callback for upload progress updates.
            max_retries: Maximum retry attempts for failed uploads (default: 3).
            retry_delay: Initial delay between retries in seconds (default: 5.0).
            timeout: (connect_timeout, read_timeout) in seconds. Default (600, 600).
            chunk_size: Optional chunk size in bytes for hatrac uploads.

        Returns:
            dict mapping "{schema}/{table}" to list of AssetFilePath with RIDs.

        Raises:
            DerivaMLException: If upload fails.
        """
        # Build staging symlinks from manifest into the regex-expected tree
        upload_root = self._build_upload_staging()

        try:
            self.update_status(Status.running, "Uploading execution files...")
            results = upload_directory(
                self._model,
                upload_root,
                progress_callback=progress_callback,
                max_retries=max_retries,
                retry_delay=retry_delay,
                timeout=timeout,
                chunk_size=chunk_size,
            )
        except (RuntimeError, DerivaMLException) as e:
            error = format_exception(e)
            self.update_status(Status.failed, error)
            raise DerivaMLException(f"Failed to upload execution_assets: {error}") from e

        # Update manifest with upload results
        manifest = self._get_manifest()

        asset_map = {}
        for path, status in results.items():
            asset_table, file_name = normalize_asset_dir(path)

            # Find and update manifest entry
            table_name = asset_table.split("/")[1] if "/" in asset_table else asset_table
            manifest_key = f"{table_name}/{file_name}"
            rid = status.result["RID"]
            try:
                manifest.mark_uploaded(manifest_key, rid)
            except KeyError:
                pass  # File wasn't in manifest (legacy flow)

            asset_map.setdefault(asset_table, []).append(
                AssetFilePath(
                    asset_path=path,
                    asset_table=asset_table,
                    file_name=file_name,
                    asset_metadata={
                        k: v
                        for k, v in status.result.items()
                        if k in self._model.asset_metadata(table_name)
                    },
                    asset_types=[],
                    asset_rid=rid,
                )
            )
        self._update_asset_execution_table(asset_map)
        self.update_status(Status.running, "Updating features...")

        for p in self._feature_root.glob("**/*.jsonl"):
            m = is_feature_dir(p.parent)
            self._update_feature_table(
                target_table=m["target_table"],
                feature_name=m["feature_name"],
                feature_file=p,
                uploaded_files=asset_map,
            )

        self.update_status(Status.running, "Upload assets complete")
        return asset_map

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def download_asset(
        self, asset_rid: RID, dest_dir: Path, update_catalog: bool = True, use_cache: bool = False
    ) -> AssetFilePath:
        """Download an asset from a URL and place it in a local directory.

        Args:
            asset_rid: RID of the asset.
            dest_dir: Destination directory for the asset.
            update_catalog: Whether to update the catalog execution information after downloading.
            use_cache: If True, check the cache directory for a previously downloaded copy
                with a matching MD5 checksum before downloading. Cached copies are stored
                in ``cache_dir/assets/{rid}_{md5}/`` and symlinked into the destination.

        Returns:
            An AssetFilePath with the path to the downloaded (or cached) asset file.
        """

        asset_table = self._ml_object.resolve_rid(asset_rid).table
        if not self._model.is_asset(asset_table):
            raise DerivaMLException(f"RID {asset_rid} is not for an asset table.")

        asset_record = self._ml_object.retrieve_rid(asset_rid)
        asset_metadata = {k: v for k, v in asset_record.items() if k in self._model.asset_metadata(asset_table)}
        asset_url = asset_record["URL"]
        asset_filename = dest_dir / asset_record["Filename"]

        # Check cache before downloading
        cache_hit = False
        if use_cache:
            md5 = asset_record.get("MD5")
            if md5:
                asset_cache_dir = self._ml_object.cache_dir / "assets"
                asset_cache_dir.mkdir(parents=True, exist_ok=True)
                cache_key = f"{asset_rid}_{md5}"
                cached_file = asset_cache_dir / cache_key / asset_record["Filename"]
                if cached_file.exists():
                    # Cache hit — symlink from cache to destination
                    self._logger.info(f"Using cached asset {asset_rid} (MD5: {md5})")
                    if asset_filename.exists() or asset_filename.is_symlink():
                        asset_filename.unlink()
                    asset_filename.symlink_to(cached_file)
                    cache_hit = True

        if not cache_hit:
            hs = HatracStore("https", self._ml_object.host_name, self._ml_object.credential)
            hs.get_obj(path=asset_url, destfilename=asset_filename.as_posix())

            # Store in cache for future use
            if use_cache:
                md5 = asset_record.get("MD5")
                if md5:
                    asset_cache_dir = self._ml_object.cache_dir / "assets"
                    asset_cache_dir.mkdir(parents=True, exist_ok=True)
                    cache_key = f"{asset_rid}_{md5}"
                    cache_entry_dir = asset_cache_dir / cache_key
                    cache_entry_dir.mkdir(parents=True, exist_ok=True)
                    cached_file = cache_entry_dir / asset_record["Filename"]
                    # Move file to cache, then symlink back
                    shutil.move(str(asset_filename), str(cached_file))
                    asset_filename.symlink_to(cached_file)
                    self._logger.info(f"Cached asset {asset_rid} (MD5: {md5})")

        asset_type_table, _col_l, _col_r = self._model.find_association(asset_table, MLVocab.asset_type)
        type_path = self._ml_object.pathBuilder().schemas[asset_type_table.schema.name].tables[asset_type_table.name]
        asset_types = [
            asset_type[MLVocab.asset_type.value]
            for asset_type in type_path.filter(type_path.columns[asset_table.name] == asset_rid)
            .attributes(type_path.Asset_Type)
            .fetch()
        ]

        asset_path = AssetFilePath(
            file_name=asset_filename,
            asset_rid=asset_rid,
            asset_path=asset_filename,
            asset_metadata=asset_metadata,
            asset_table=asset_table.name,
            asset_types=asset_types,
        )

        if update_catalog:
            self._update_asset_execution_table(
                {f"{asset_table.schema.name}/{asset_table.name}": [asset_path]},
                asset_role="Input",
            )
        return asset_path

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def upload_assets(
        self,
        assets_dir: str | Path,
    ) -> dict[Any, FileUploadState] | None:
        """Uploads assets from a directory to the catalog.

        Scans the specified directory for assets and uploads them to the catalog,
        recording their metadata and types. Assets are organized by their types
        and associated with the execution.

        Args:
            assets_dir: Directory containing assets to upload.

        Returns:
            dict[Any, FileUploadState] | None: Mapping of assets to their upload states,
                or None if no assets were found.

        Raises:
            DerivaMLException: If upload fails or assets are invalid.

        Example:
            >>> states = execution.upload_assets("output/results")
            >>> for asset, state in states.items():
            ...     print(f"{asset}: {state}")
        """

        def path_to_asset(path: str) -> str:
            """Pull the asset name out of a path to that asset in the filesystem"""
            components = path.split("/")
            return components[components.index("asset") + 2]  # Look for asset in the path to find the name

        if not self._model.is_asset(Path(assets_dir).name):
            raise DerivaMLException(f"Directory name {Path(assets_dir).name!r} is not the name of an asset table.")
        results = upload_directory(self._model, assets_dir)
        return {path_to_asset(p): r for p, r in results.items()}

    def upload_execution_outputs(
        self,
        clean_folder: bool | None = None,
        progress_callback: Callable[[UploadProgress], None] | None = None,
        max_retries: int = 3,
        retry_delay: float = 5.0,
        timeout: tuple[int, int] | None = None,
        chunk_size: int | None = None,
    ) -> dict[str, list[AssetFilePath]]:
        """Uploads all outputs from the execution to the catalog.

        Scans the execution's output directories for assets, features, and other results,
        then uploads them to the catalog. Can optionally clean up the output folders
        after successful upload.

        IMPORTANT: This method must be called AFTER exiting the context manager, not inside it.
        The context manager handles execution timing (start/stop), while this method handles
        the separate upload step.

        Args:
            clean_folder: Whether to delete output folders after upload. If None (default),
                uses the DerivaML instance's clean_execution_dir setting. Pass True/False
                to override for this specific execution.
            progress_callback: Optional callback function to receive upload progress updates.
                Called with UploadProgress objects containing file name, bytes uploaded,
                total bytes, percent complete, phase, and status message.
            max_retries: Maximum number of retry attempts for failed uploads (default: 3).
            retry_delay: Initial delay in seconds between retries, doubles with each attempt (default: 5.0).
            timeout: Tuple of (connect_timeout, read_timeout) in seconds. Default is (600, 600).
                Note: urllib3 uses connect_timeout as the socket timeout during request body
                writes, so it must be large enough for a full chunk upload.
            chunk_size: Optional chunk size in bytes for hatrac uploads. If provided,
                large files will be uploaded in chunks of this size.

        Returns:
            dict[str, list[AssetFilePath]]: Mapping of asset types to their file paths.

        Raises:
            DerivaMLException: If upload fails or outputs are invalid.

        Example:
            >>> with ml.create_execution(config) as execution:
            ...     # Do ML work, register output files with asset_file_path()
            ...     path = execution.asset_file_path("Model", "model.pt")
            ...     # Write to path...
            ...
            >>> # Upload AFTER the context manager exits
            >>> def my_callback(progress):
            ...     print(f"Uploading {progress.file_name}: {progress.percent_complete:.1f}%")
            >>> outputs = execution.upload_execution_outputs(progress_callback=my_callback)
            >>>
            >>> # Upload large files with increased timeout (30 min per chunk)
            >>> outputs = execution.upload_execution_outputs(timeout=(6, 1800))
            >>>
            >>> # Override cleanup setting for this execution
            >>> outputs = execution.upload_execution_outputs(clean_folder=False)  # Keep files
        """
        if self._dry_run:
            return {}

        # Use DerivaML instance setting if not explicitly provided
        if clean_folder is None:
            clean_folder = getattr(self._ml_object, 'clean_execution_dir', True)

        try:
            self.uploaded_assets = self._upload_execution_dirs(
                progress_callback=progress_callback,
                max_retries=max_retries,
                retry_delay=retry_delay,
                timeout=timeout,
                chunk_size=chunk_size,
            )
            self._set_asset_descriptions(self.uploaded_assets)
            self.update_status(Status.completed, "Successfully ended the execution.")
            if clean_folder:
                self._clean_folder_contents(self._execution_root)
            return self.uploaded_assets
        except Exception as e:
            error = format_exception(e)
            self.update_status(Status.failed, error)
            raise e

    def _clean_folder_contents(self, folder_path: Path, remove_folder: bool = True):
        """Clean up folder contents and optionally the folder itself.

        Removes all files and subdirectories within the specified folder.
        Uses retry logic for Windows compatibility where files may be temporarily locked.

        Args:
            folder_path: Path to the folder to clean.
            remove_folder: If True (default), also remove the folder itself after
                cleaning its contents. If False, only remove contents.
        """
        MAX_RETRIES = 3
        RETRY_DELAY = 1  # seconds

        def remove_with_retry(path: Path, is_dir: bool = False) -> bool:
            for attempt in range(MAX_RETRIES):
                try:
                    if is_dir:
                        shutil.rmtree(path)
                    else:
                        Path(path).unlink()
                    return True
                except (OSError, PermissionError) as e:
                    if attempt == MAX_RETRIES - 1:
                        logging.warning(f"Failed to remove {path}: {e}")
                        return False
                    time.sleep(RETRY_DELAY)
            return False

        try:
            # First remove all contents
            with os.scandir(folder_path) as entries:
                for entry in entries:
                    if entry.is_dir() and not entry.is_symlink():
                        remove_with_retry(Path(entry.path), is_dir=True)
                    else:
                        remove_with_retry(Path(entry.path))

            # Then remove the folder itself if requested
            if remove_folder:
                remove_with_retry(folder_path, is_dir=True)

        except OSError as e:
            logging.warning(f"Failed to clean folder {folder_path}: {e}")

    def _update_feature_table(
        self,
        target_table: str,
        feature_name: str,
        feature_file: str | Path,
        uploaded_files: dict[str, list[AssetFilePath]],
    ) -> None:
        """Update the feature table with values from a JSONL file.

        Reads feature values from a file and inserts them into the catalog,
        replacing file paths with the RIDs of uploaded assets.

        Args:
            target_table: Name of the table the feature is defined on.
            feature_name: Name of the feature to update.
            feature_file: Path to JSONL file containing feature values.
            uploaded_files: Map from asset table names to their uploaded AssetFilePath objects.
        """

        # Get the column names of all the Feature columns that should be the RID of an asset.
        asset_columns = [
            c.name for c in self._ml_object.feature_record_class(target_table, feature_name).feature.asset_columns
        ]

        feature_table = self._ml_object.feature_record_class(target_table, feature_name).feature.feature_table.name
        asset_map = {
            (asset_table, asset.file_name): asset.asset_rid
            for asset_table, assets in uploaded_files.items()
            for asset in assets
        }

        def map_path(e):
            """Go through the asset columns and replace the file name with the RID for the uploaded file."""
            for c in asset_columns:
                e[c] = asset_map[normalize_asset_dir(e[c])]
            return e

        # Load the JSON file that has the set of records that contain the feature values.
        with Path(feature_file).open("r") as feature_values:
            entities = [json.loads(line.strip()) for line in feature_values]
        # Update the asset columns in the feature and add to the catalog.
        self._ml_object.domain_path().tables[feature_table].insert([map_path(e) for e in entities], on_conflict_skip=True)

    def _update_asset_execution_table(
        self,
        uploaded_assets: dict[str, list[AssetFilePath]],
        asset_role: str = "Output",
    ) -> None:
        """Add entry to the association table connecting an asset to an execution RID

        Args:
            uploaded_assets: Dictionary whose key is the name of an asset table and whose value is a list of RIDs for
                newly added assets to that table.
            asset_role: A term from the Asset_Role vocabulary.
        """
        if self._dry_run:
            # Don't do any updates if we are doing a dry run.
            return
        # Make sure the asset role is in the controlled vocabulary table.
        self._ml_object.lookup_term(MLVocab.asset_role, asset_role)

        pb = self._ml_object.pathBuilder()
        for asset_table, asset_list in uploaded_assets.items():
            asset_table_name = asset_table.split("/")[1]  # Peel off the schema from the asset table
            asset_exe, asset_fk, execution_fk = self._model.find_association(asset_table_name, "Execution")
            asset_exe_path = pb.schemas[asset_exe.schema.name].tables[asset_exe.name]

            asset_exe_path.insert(
                [
                    {
                        asset_fk: asset_path.asset_rid,
                        execution_fk: self.execution_rid,
                        "Asset_Role": asset_role,
                    }
                    for asset_path in asset_list
                ],
                on_conflict_skip=True,
            )

            # Now add in the type names via the asset_asset_type association table.
            # Get the list of types for each file in the asset.
            if asset_role == "Input":
                return
            asset_type_map = {}
            with Path(
                asset_type_path(
                    self._working_dir,
                    self.execution_rid,
                    self._model.name_to_table(asset_table_name),
                )
            ).open("r") as asset_type_file:
                for line in asset_type_file:
                    asset_type_map.update(json.loads(line.strip()))
            for asset_path in asset_list:
                asset_path.asset_types = asset_type_map[asset_path.file_name]

            asset_asset_type, _, _ = self._model.find_association(asset_table_name, "Asset_Type")
            type_path = pb.schemas[asset_asset_type.schema.name].tables[asset_asset_type.name]

            type_path.insert(
                [
                    {asset_table_name: asset.asset_rid, "Asset_Type": t}
                    for asset in asset_list
                    for t in asset_type_map[asset.file_name]
                ],
                on_conflict_skip=True,
            )

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def asset_file_path(
        self,
        asset_name: str,
        file_name: str | Path,
        asset_types: list[str] | str | None = None,
        copy_file=False,
        rename_file: str | None = None,
        metadata=None,
        description: str | None = None,
        **kwargs,
    ) -> AssetFilePath:
        """Register a file for upload and return a path to write to.

        This routine has three modes depending on whether file_name refers to an existing file:
        1. **New file**: file_name doesn't exist — returns a path to write to.
        2. **Symlink**: file_name exists, copy_file=False — symlinks into staging.
        3. **Copy**: file_name exists, copy_file=True — copies into staging.

        Files are stored in a flat per-table directory (``assets/{AssetTable}/``).
        Metadata is tracked in a persistent JSON manifest for crash safety.
        Metadata can be set at registration time via the ``metadata`` parameter
        (an AssetRecord or dict) or incrementally after via the returned
        AssetFilePath's ``metadata`` property.

        Args:
            asset_name: Name of the asset table. Must be a valid asset table.
            file_name: Name of file to be uploaded, or path to an existing file.
            asset_types: Asset type terms from Asset_Type vocabulary. Defaults to asset_name.
            copy_file: Whether to copy the file rather than creating a symbolic link.
            rename_file: If provided, rename the file during staging.
            metadata: An AssetRecord instance or dict of metadata column values.
            description: Optional description for the asset record.
            **kwargs: Additional metadata values (legacy support, merged with metadata).

        Returns:
            AssetFilePath bound to the manifest for write-through metadata updates.

        Raises:
            DerivaMLException: If the asset table doesn't exist.
            DerivaMLValidationError: If asset_types contains invalid terms.
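
        Example:
            A minimal sketch of the three modes described above (table and file
            names are illustrative):

            >>> # Mode 1: file does not exist yet -- returns a path to write to
            >>> path = execution.asset_file_path("Model", "model.pt")
            >>> # Mode 2: existing file, symlinked into staging
            >>> staged = execution.asset_file_path("Model", "/tmp/checkpoint.pt")
            >>> # Mode 3: existing file, copied and renamed during staging
            >>> best = execution.asset_file_path(
            ...     "Model", "/tmp/checkpoint.pt", copy_file=True, rename_file="best.pt"
            ... )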
        """
        if not self._model.is_asset(asset_name):
            raise DerivaMLException(f"Table {asset_name} is not an asset")

        asset_table = self._model.name_to_table(asset_name)
        schema = asset_table.schema.name

        # Validate and normalize asset types
        asset_types = asset_types or kwargs.pop("Asset_Type", None) or asset_name
        asset_types = [asset_types] if isinstance(asset_types, str) else asset_types
        for t in asset_types:
            self._ml_object.lookup_term(MLVocab.asset_type, t)

        # Resolve metadata from AssetRecord, dict, or kwargs
        metadata_dict: dict[str, Any] = {}
        if metadata is not None:
            if hasattr(metadata, "model_dump"):
                metadata_dict = {k: v for k, v in metadata.model_dump().items() if v is not None}
            else:
                metadata_dict = dict(metadata)
        # Merge any kwargs that aren't standard parameters
        metadata_dict.update(kwargs)

        # Determine file name and path
        file_name = Path(file_name)
        if file_name.name == "_implementations.log":
            file_name = file_name.with_name("-implementations.log")

        if not file_name.is_absolute():
            file_name = file_name.resolve()

        target_name = Path(rename_file) if file_name.exists() and rename_file else file_name

        # Store file in flat per-table directory
        flat_dir = flat_asset_dir(self._working_dir, self.execution_rid, asset_name)
        flat_path = flat_dir / target_name.name

        if file_name.exists():
            if copy_file:
                flat_path.write_bytes(file_name.read_bytes())
            else:
                try:
                    flat_path.symlink_to(file_name)
                except (OSError, PermissionError):
                    flat_path.write_bytes(file_name.read_bytes())

        # Register in manifest (write-through + fsync)
        manifest = self._get_manifest()
        manifest_key = f"{asset_name}/{target_name.name}"
        manifest.add_asset(manifest_key, AssetEntry(
            asset_table=asset_name,
            schema=schema,
            asset_types=asset_types,
            metadata=metadata_dict,
            description=description,
        ))

        # Also write legacy asset-type JSONL for backward compatibility with upload
        with Path(asset_type_path(self._working_dir, self.execution_rid, asset_table)).open("a") as f:
            f.write(json.dumps({target_name.name: asset_types}) + "\n")

        result = AssetFilePath(
            asset_path=flat_path,
            asset_table=asset_name,
            file_name=target_name.name,
            asset_metadata=metadata_dict,
            asset_types=asset_types,
        )
        result._bind_manifest(manifest, manifest_key)
        return result

    def _get_manifest(self) -> AssetManifest:
        """Get or create the asset manifest for this execution."""
        if not hasattr(self, "_manifest") or self._manifest is None:
            mp = manifest_path(self._working_dir, self.execution_rid)
            self._manifest = AssetManifest(mp, self.execution_rid)
        return self._manifest

    def table_path(self, table: str) -> Path:
        """Return a local file path to a CSV to add values to a table on upload.

        Args:
            table: Name of table to be uploaded.

        Returns:
            Pathlib path to the file in which to place table values.
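
        Example:
            A minimal sketch; assumes a domain table named ``Subject``
            (illustrative):

            >>> csv_path = execution.table_path("Subject")
            >>> with csv_path.open("a") as f:
            ...     f.write("Thing One,27\n")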
        """
        # Find which domain schema contains this table
        table_schema = None
        for domain_schema in self._ml_object.domain_schemas:
            if domain_schema in self._model.schemas:
                if table in self._model.schemas[domain_schema].tables:
                    table_schema = domain_schema
                    break

        if table_schema is None:
            raise DerivaMLException("Table '{}' not found in any domain schema".format(table))

        return table_path(self._working_dir, schema=table_schema, table=table)

    def execute(self) -> Execution:
        """Initiate an execution with the provided configuration. Can be used in a context manager."""
        self.execution_start()
        return self

    @validate_call
    def add_features(self, features: Iterable[FeatureRecord]) -> None:
        """Adds feature records to the catalog.

        Associates feature records with this execution and uploads them to the catalog.
        Features represent measurable properties or characteristics of records.

        NOTE: The catalog is not updated until upload_execution_outputs() is called.

        Args:
            features: Feature records to add, each containing a value and metadata.

        Raises:
            DerivaMLException: If feature addition fails or features are invalid.

        Example:
            >>> feature = FeatureRecord(value="high", confidence=0.95)
            >>> execution.add_features([feature])
        """

        # Make sure feature list is homogeneous:
        sorted_features = defaultdict(list)
        for f in features:
            sorted_features[type(f)].append(f)
        for fs in sorted_features.values():
            self._add_features(fs)

    def _add_features(self, features: list[FeatureRecord]) -> None:
        # Update feature records to include current execution_rid
        first_row = features[0]
        feature = first_row.feature
        # Use the schema from the feature table
        feature_schema = feature.feature_table.schema.name
        json_path = feature_value_path(
            self._working_dir,
            schema=feature_schema,
            target_table=feature.target_table.name,
            feature_name=feature.feature_name,
            exec_rid=self.execution_rid,
        )
        with Path(json_path).open("a", encoding="utf-8") as file:
            for feature in features:
                feature.Execution = self.execution_rid
                file.write(json.dumps(feature.model_dump(mode="json", exclude={"RCT"})) + "\n")

    def list_input_datasets(self) -> list[Dataset]:
        """List all datasets that were inputs to this execution.

        Returns:
            List of Dataset objects that were used as inputs.

        Example:
            >>> for ds in execution.list_input_datasets():
            ...     print(f"Input: {ds.dataset_rid} - {ds.description}")
        """
        if self._execution_record is not None:
            return self._execution_record.list_input_datasets()

        # Fallback for dry_run mode
        pb = self._ml_object.pathBuilder()
        dataset_exec = pb.schemas[self._ml_object.ml_schema].Dataset_Execution

        records = list(
            dataset_exec.filter(dataset_exec.Execution == self.execution_rid)
            .entities()
            .fetch()
        )

        return [self._ml_object.lookup_dataset(r["Dataset"]) for r in records]

    def list_assets(self, asset_role: str | None = None) -> list["Asset"]:
        """List all assets that were inputs or outputs of this execution.

        Args:
            asset_role: Optional filter: "Input" or "Output". If None, returns all.

        Returns:
            List of Asset objects associated with this execution.

        Example:
            >>> inputs = execution.list_assets(asset_role="Input")
            >>> outputs = execution.list_assets(asset_role="Output")
        """
        if self._execution_record is not None:
            return self._execution_record.list_assets(asset_role=asset_role)

        # Fallback for dry_run mode

        pb = self._ml_object.pathBuilder()
        asset_exec = pb.schemas[self._ml_object.ml_schema].Execution_Asset_Execution

        query = asset_exec.filter(asset_exec.Execution == self.execution_rid)
        if asset_role:
            query = query.filter(asset_exec.Asset_Role == asset_role)

        records = list(query.entities().fetch())

        assets = []
        for r in records:
            try:
                asset = self._ml_object.lookup_asset(r["Execution_Asset"])
                assets.append(asset)
            except Exception:
                pass  # Skip assets that can't be looked up
        return assets

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def create_dataset(
        self,
        dataset_types: str | list[str] | None = None,
        version: DatasetVersion | str | None = None,
        description: str = "",
    ) -> Dataset:
        """Create a new dataset with specified types.

        Creates a dataset associated with this execution for provenance tracking.

        Args:
            dataset_types: One or more dataset type terms from the Dataset_Type vocabulary.
            version: Dataset version. Defaults to 0.1.0.
            description: Markdown description of the dataset being created.

        Returns:
            The newly created Dataset.
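
        Example:
            A minimal sketch; assumes a ``Training`` term exists in the
            Dataset_Type vocabulary (illustrative values):

            >>> ds = execution.create_dataset(
            ...     dataset_types="Training",
            ...     description="Images used for model training",
            ... )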
        """
        return Dataset.create_dataset(
            ml_instance=self._ml_object,
            execution_rid=self.execution_rid,
            dataset_types=dataset_types,
            version=version,
            description=description,
        )

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def add_files(
        self,
        files: Iterable[FileSpec],
        dataset_types: str | list[str] | None = None,
        description: str = "",
    ) -> "Dataset":
        """Adds files to the catalog with their metadata.

        Registers files in the catalog along with their metadata (MD5, length, URL) and associates them with
        specified file types.

        Args:
            files: File specifications containing MD5 checksum, length, and URL.
            dataset_types: One or more dataset type terms from File_Type vocabulary.
            description: Description of the files.

        Returns:
            Dataset: A dataset identifying the newly added files. It will be nested to mirror the
            original directory structure of the files.

        Raises:
            DerivaMLInvalidTerm: If dataset_types contains invalid terms or execution_rid is not an execution record.

        Examples:
            Add a single dataset type:
                >>> files = [FileSpec(url="path/to/file.txt", md5="abc123", length=1000)]
                >>> ds = exe.add_files(files, dataset_types="text")

            Add multiple dataset types:
                >>> ds = exe.add_files(
                ...     files=[FileSpec(url="image.png", md5="def456", length=2000)],
                ...     dataset_types=["image", "png"],
                ... )
        """
        return self._ml_object.add_files(
            files=files,
            execution_rid=self.execution_rid,
            dataset_types=dataset_types,
            description=description,
        )

    # =========================================================================
    # Execution Nesting Methods
    # =========================================================================

    def add_nested_execution(
        self,
        nested_execution: "Execution | ExecutionRecord | RID",
        sequence: int | None = None,
    ) -> None:
        """Add a nested (child) execution to this execution.

        Creates a parent-child relationship between this execution and another.
        This is useful for grouping related executions, such as parameter sweeps
        or pipeline stages.

        Args:
            nested_execution: The child execution to add (Execution, ExecutionRecord, or RID).
            sequence: Optional ordering index (0, 1, 2...). Use None for parallel executions.

        Raises:
            DerivaMLException: If the association cannot be created.

        Example:
            >>> parent_exec = ml.create_execution(parent_config)
            >>> child_exec = ml.create_execution(child_config)
            >>> parent_exec.add_nested_execution(child_exec, sequence=0)
        """
        if self._dry_run:
            return

        # Get the RID from the nested execution
        if isinstance(nested_execution, Execution):
            nested_rid = nested_execution.execution_rid
        elif isinstance(nested_execution, ExecutionRecord):
            nested_rid = nested_execution.execution_rid
        else:
            nested_rid = nested_execution

        # Delegate to ExecutionRecord if available
        if self._execution_record is not None:
            self._execution_record.add_nested_execution(nested_rid, sequence=sequence)
        else:
            # Fallback for cases without execution record
            pb = self._ml_object.pathBuilder()
            execution_execution = pb.schemas[self._ml_object.ml_schema].Execution_Execution

            record = {
                "Execution": self.execution_rid,
                "Nested_Execution": nested_rid,
            }
            if sequence is not None:
                record["Sequence"] = sequence

            execution_execution.insert([record])

    def list_nested_executions(
        self,
        recurse: bool = False,
        _visited: set[RID] | None = None,
    ) -> list["ExecutionRecord"]:
        """List all nested (child) executions of this execution.

        Args:
            recurse: If True, recursively return all descendant executions.
            _visited: Internal parameter to track visited executions and prevent infinite recursion.

        Returns:
            List of nested ExecutionRecord objects, ordered by sequence if available.
            To get full Execution objects with lifecycle management, use restore_execution().

        Example:
            >>> children = parent_exec.list_nested_executions()
            >>> all_descendants = parent_exec.list_nested_executions(recurse=True)
        """
        if self._execution_record is not None:
            return list(self._execution_record.list_nested_executions(recurse=recurse, _visited=_visited))

        # Fallback for dry_run mode
        if _visited is None:
            _visited = set()

        if self.execution_rid in _visited:
            return []
        _visited.add(self.execution_rid)

        pb = self._ml_object.pathBuilder()
        execution_execution = pb.schemas[self._ml_object.ml_schema].Execution_Execution

        # Query for nested executions, ordered by sequence
        nested = list(
            execution_execution.filter(execution_execution.Execution == self.execution_rid)
            .entities()
            .fetch()
        )

        # Sort by sequence (None values at the end)
        nested.sort(key=lambda x: (x.get("Sequence") is None, x.get("Sequence")))

        children = []
        for record in nested:
            child = self._ml_object.lookup_execution(record["Nested_Execution"])
            children.append(child)
            if recurse:
                children.extend(child.list_nested_executions(recurse=True, _visited=_visited))

        return children

    def list_parent_executions(
        self,
        recurse: bool = False,
        _visited: set[RID] | None = None,
    ) -> list["ExecutionRecord"]:
        """List all parent executions that contain this execution as a nested child.

        Args:
            recurse: If True, recursively return all ancestor executions.
            _visited: Internal parameter to track visited executions and prevent infinite recursion.

        Returns:
            List of parent ExecutionRecord objects.
            To get full Execution objects with lifecycle management, use restore_execution().

        Example:
            >>> parents = child_exec.list_parent_executions()
            >>> all_ancestors = child_exec.list_parent_executions(recurse=True)
        """
        if self._execution_record is not None:
            return list(self._execution_record.list_parent_executions(recurse=recurse, _visited=_visited))

        # Fallback for dry_run mode
        if _visited is None:
            _visited = set()

        if self.execution_rid in _visited:
            return []
        _visited.add(self.execution_rid)

        pb = self._ml_object.pathBuilder()
        execution_execution = pb.schemas[self._ml_object.ml_schema].Execution_Execution

        parent_records = list(
            execution_execution.filter(execution_execution.Nested_Execution == self.execution_rid)
            .entities()
            .fetch()
        )

        parents = []
        for record in parent_records:
            parent = self._ml_object.lookup_execution(record["Execution"])
            parents.append(parent)
            if recurse:
                parents.extend(parent.list_parent_executions(recurse=True, _visited=_visited))

        return parents

    def is_nested(self) -> bool:
        """Check if this execution is nested within another execution.

        Returns:
            True if this execution has at least one parent execution.
        """
        if self._execution_record is not None:
            return self._execution_record.is_nested()
        return len(self.list_parent_executions()) > 0

    def is_parent(self) -> bool:
        """Check if this execution has nested child executions.

        Returns:
            True if this execution has at least one nested execution.
        """
        if self._execution_record is not None:
            return self._execution_record.is_parent()
        return len(self.list_nested_executions()) > 0

    def __str__(self):
        items = [
            f"caching_dir: {self._cache_dir}",
            f"_working_dir: {self._working_dir}",
            f"execution_rid: {self.execution_rid}",
            f"workflow_rid: {self.workflow_rid}",
            f"asset_paths: {self.asset_paths}",
            f"configuration: {self.configuration}",
        ]
        return "\n".join(items)

    def __enter__(self):
        """
        Method invoked when entering the context.

        Returns:
        - self: The instance itself.

        """
        self.execution_start()
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, exc_tb: Any) -> bool:
        """
        Method invoked when exiting the context.

        Args:
           exc_type: Exception type.
           exc_value: Exception value.
           exc_tb: Exception traceback.

        Returns:
           bool: True if execution completed successfully, False otherwise.
        """
        if not exc_type:
            self.update_status(Status.running, "Successfully ran ML.")
            self.execution_stop()
            return True
        else:
            self.update_status(
                Status.failed,
                f"Exception type: {exc_type}, Exception value: {exc_value}",
            )
            logging.error(f"Exception type: {exc_type}, Exception value: {exc_value}, Exception traceback: {exc_tb}")
            return False

catalog property

catalog: 'DerivaML'

Get the live catalog (DerivaML) instance for this execution.

This provides access to the live catalog for operations that require catalog connectivity, such as looking up datasets or other read operations.

Returns:

    DerivaML: The live catalog instance.

Example

>>> with ml.create_execution(config) as exe:
...     # Use live catalog for lookups
...     existing_dataset = exe.catalog.lookup_dataset("1-ABC")

database_catalog property

database_catalog: DerivaMLDatabase | None

Get a catalog-like interface for downloaded datasets.

Returns a DerivaMLDatabase that implements the DerivaMLCatalog protocol, allowing the same code to work with both live catalogs and downloaded bags.

This is useful for writing code that can operate on either a live catalog (via DerivaML) or on downloaded bags (via DerivaMLDatabase).

Returns:

    DerivaMLDatabase | None: DerivaMLDatabase wrapping the primary downloaded
    dataset's model, or None if no datasets have been downloaded.

Example

>>> with ml.create_execution(config) as exe:
...     if exe.database_catalog:
...         db = exe.database_catalog
...         # Use same interface as DerivaML
...         dataset = db.lookup_dataset("4HM")
...         term = db.lookup_term("Diagnosis", "cancer")
...     else:
...         # No datasets downloaded, use live catalog
...         pass

execution_record property

execution_record: ExecutionRecord | None

Get the ExecutionRecord for catalog operations.

Returns:

    ExecutionRecord | None: ExecutionRecord if not in dry_run mode, None otherwise.

status property writable

status: Status

Get the current execution status.

Returns:

    Status: The current status (Created, Running, Completed, Failed, etc.).

working_dir property

working_dir: Path

Return the working directory for the execution.

__enter__

__enter__()

Method invoked when entering the context.

Returns: - self: The instance itself.

Source code in src/deriva_ml/execution/execution.py
def __enter__(self):
    """
    Method invoked when entering the context.

    Returns:
    - self: The instance itself.

    """
    self.execution_start()
    return self

__exit__

__exit__(
    exc_type: Any,
    exc_value: Any,
    exc_tb: Any,
) -> bool

Method invoked when exiting the context.

Parameters:

    exc_type (Any): Exception type. Required.
    exc_value (Any): Exception value. Required.
    exc_tb (Any): Exception traceback. Required.

Returns:

    bool: True if execution completed successfully, False otherwise.

Source code in src/deriva_ml/execution/execution.py
def __exit__(self, exc_type: Any, exc_value: Any, exc_tb: Any) -> bool:
    """
    Method invoked when exiting the context.

    Args:
       exc_type: Exception type.
       exc_value: Exception value.
       exc_tb: Exception traceback.

    Returns:
       bool: True if execution completed successfully, False otherwise.
    """
    if not exc_type:
        self.update_status(Status.running, "Successfully ran ML.")
        self.execution_stop()
        return True
    else:
        self.update_status(
            Status.failed,
            f"Exception type: {exc_type}, Exception value: {exc_value}",
        )
        logging.error(f"Exception type: {exc_type}, Exception value: {exc_value}, Exception traceback: {exc_tb}")
        return False
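
The `__enter__`/`__exit__` pair above is what lets `Execution` be used in a `with` block: entering starts the execution, a clean exit stops it, and an exception marks it failed and propagates. A minimal self-contained sketch of that lifecycle (`MiniExecution` is a stand-in stub, not the DerivaML API):

```python
# Stub illustrating the context-manager lifecycle; not the real Execution class.
class MiniExecution:
    def __init__(self):
        self.status = "created"

    def execution_start(self):
        self.status = "running"

    def execution_stop(self):
        self.status = "completed"

    def __enter__(self):
        self.execution_start()
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        if exc_type is None:
            self.execution_stop()
            return True
        self.status = "failed"
        return False  # returning False lets the exception propagate

# Success path: status ends up "completed".
exe = MiniExecution()
with exe:
    pass
assert exe.status == "completed"

# Failure path: status becomes "failed" and the exception is re-raised.
failing = MiniExecution()
try:
    with failing:
        raise ValueError("boom")
except ValueError:
    pass
assert failing.status == "failed"
```

Returning `False` from `__exit__` on error, as the real method does, ensures callers still see the original exception while the catalog records the failed status.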

__init__

__init__(
    configuration: ExecutionConfiguration,
    ml_object: DerivaML,
    workflow: Workflow | None = None,
    reload: RID | None = None,
    dry_run: bool = False,
)

Initializes an Execution instance.

Creates a new execution or reloads an existing one. Initializes the execution environment, downloads required datasets, and sets up asset tracking.

Parameters:

    configuration (ExecutionConfiguration): Settings and parameters for the execution. Required.
    ml_object (DerivaML): DerivaML instance managing the execution. Required.
    workflow (Workflow | None): Optional Workflow object. If not specified, the workflow is
        taken from the ExecutionConfiguration object. Must be a Workflow object, not a RID.
        Default: None.
    reload (RID | None): Optional RID of an existing execution to reload. Default: None.
    dry_run (bool): If True, don't create catalog records or upload results. Default: False.

Raises:

    DerivaMLException: If initialization fails, the configuration is invalid,
        or workflow is not a Workflow object.

Example

Create an execution with a workflow::

>>> workflow = ml.lookup_workflow("2-ABC1")
>>> config = ExecutionConfiguration(
...     workflow=workflow,
...     description="Process data"
... )
>>> execution = Execution(config, ml)

Or pass workflow separately::

>>> workflow = ml.lookup_workflow_by_url(
...     "https://github.com/org/repo/blob/abc123/analysis.py"
... )
>>> config = ExecutionConfiguration(description="Run analysis")
>>> execution = Execution(config, ml, workflow=workflow)
Source code in src/deriva_ml/execution/execution.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def __init__(
    self,
    configuration: ExecutionConfiguration,
    ml_object: DerivaML,
    workflow: Workflow | None = None,
    reload: RID | None = None,
    dry_run: bool = False,
):
    """Initializes an Execution instance.

    Creates a new execution or reloads an existing one. Initializes the execution
    environment, downloads required datasets, and sets up asset tracking.

    Args:
        configuration: Settings and parameters for the execution.
        ml_object: DerivaML instance managing the execution.
        workflow: Optional Workflow object. If not specified, the workflow is taken from
            the ExecutionConfiguration object. Must be a Workflow object, not a RID.
        reload: Optional RID of existing execution to reload.
        dry_run: If True, don't create catalog records or upload results.

    Raises:
        DerivaMLException: If initialization fails, configuration is invalid,
            or workflow is not a Workflow object.

    Example:
        Create an execution with a workflow::

            >>> workflow = ml.lookup_workflow("2-ABC1")
            >>> config = ExecutionConfiguration(
            ...     workflow=workflow,
            ...     description="Process data"
            ... )
            >>> execution = Execution(config, ml)

        Or pass workflow separately::

            >>> workflow = ml.lookup_workflow_by_url(
            ...     "https://github.com/org/repo/blob/abc123/analysis.py"
            ... )
            >>> config = ExecutionConfiguration(description="Run analysis")
            >>> execution = Execution(config, ml, workflow=workflow)
    """

    self.asset_paths: dict[str, list[AssetFilePath]] = {}
    self.configuration = configuration
    self._ml_object = ml_object
    self._model = ml_object.model
    self._logger = ml_object._logger
    self.start_time = None
    self.stop_time = None
    self._status = Status.created
    self.uploaded_assets: dict[str, list[AssetFilePath]] | None = None
    self.configuration.argv = sys.argv
    self._execution_record: ExecutionRecord | None = None  # Lazily created after RID is assigned

    self.dataset_rids: List[RID] = []
    self.datasets: list[DatasetBag] = []

    self._working_dir = self._ml_object.working_dir
    self._cache_dir = self._ml_object.cache_dir
    if self._working_dir is None:
        raise DerivaMLException(
            "DerivaML working_dir is not set. "
            "Ensure the DerivaML instance was initialized with a valid working_dir."
        )
    self._dry_run = dry_run

    # Make sure we have a valid Workflow object.
    if workflow:
        self.configuration.workflow = workflow

    if self.configuration.workflow is None:
        raise DerivaMLException("Workflow must be specified either in configuration or as a parameter")

    if not isinstance(self.configuration.workflow, Workflow):
        raise DerivaMLException(
            f"Workflow must be a Workflow object, not {type(self.configuration.workflow).__name__}. "
            "Use ml.lookup_workflow(rid) or ml.lookup_workflow_by_url(url) to get a Workflow object."
        )

    # Validate workflow type(s) and register in catalog
    for wt in self.configuration.workflow.workflow_type:
        self._ml_object.lookup_term(MLVocab.workflow_type, wt)
    self.workflow_rid = (
        self._ml_object.add_workflow(self.configuration.workflow) if not self._dry_run else DRY_RUN_RID
    )

    # Validate the datasets and assets to be valid.
    for d in self.configuration.datasets:
        if self._ml_object.resolve_rid(d.rid).table.name != "Dataset":
            raise DerivaMLException("Dataset specified in execution configuration is not a dataset")

    for a in self.configuration.assets:
        if not self._model.is_asset(self._ml_object.resolve_rid(a.rid).table.name):
            raise DerivaMLException("Asset specified in execution configuration is not an asset table")

    schema_path = self._ml_object.pathBuilder().schemas[self._ml_object.ml_schema]
    if reload:
        self.execution_rid = reload
        if self.execution_rid == DRY_RUN_RID:
            self._dry_run = True
    elif self._dry_run:
        self.execution_rid = DRY_RUN_RID
    else:
        self.execution_rid = schema_path.Execution.insert(
            [
                {
                    "Description": self.configuration.description,
                    "Workflow": self.workflow_rid,
                }
            ]
        )[0]["RID"]

    if rid_path := os.environ.get("DERIVA_ML_SAVE_EXECUTION_RID", None):
        # Put execution_rid into the provided file path so we can find it later.
        with Path(rid_path).open("w") as f:
            json.dump(
                {
                    "hostname": self._ml_object.host_name,
                    "catalog_id": self._ml_object.catalog_id,
                    "workflow_rid": self.workflow_rid,
                    "execution_rid": self.execution_rid,
                },
                f,
            )

    # Create a directory for execution rid so we can recover the state in case of a crash.
    execution_root(prefix=self._ml_object.working_dir, exec_rid=self.execution_rid)

    # Create the ExecutionRecord to handle catalog state operations
    if not self._dry_run:
        self._execution_record = ExecutionRecord(
            execution_rid=self.execution_rid,
            workflow=self.configuration.workflow,
            status=Status.created,
            description=self.configuration.description,
            _ml_instance=self._ml_object,
            _logger=self._logger,
        )

    self._initialize_execution(reload)

add_features

add_features(
    features: Iterable[FeatureRecord],
) -> None

Adds feature records to the catalog.

Associates feature records with this execution and uploads them to the catalog. Features represent measurable properties or characteristics of records.

NOTE: The catalog is not updated until upload_execution_outputs() is called.

Parameters:

    features (Iterable[FeatureRecord]): Feature records to add, each containing a value and metadata. Required.

Raises:

    DerivaMLException: If feature addition fails or features are invalid.

Example

>>> feature = FeatureRecord(value="high", confidence=0.95)
>>> execution.add_features([feature])

Source code in src/deriva_ml/execution/execution.py
@validate_call
def add_features(self, features: Iterable[FeatureRecord]) -> None:
    """Adds feature records to the catalog.

    Associates feature records with this execution and uploads them to the catalog.
    Features represent measurable properties or characteristics of records.

    NOTE: The catalog is not updated until upload_execution_outputs() is called.

    Args:
        features: Feature records to add, each containing a value and metadata.

    Raises:
        DerivaMLException: If feature addition fails or features are invalid.

    Example:
        >>> feature = FeatureRecord(value="high", confidence=0.95)
        >>> execution.add_features([feature])
    """

    # Make sure feature list is homogeneous:
    sorted_features = defaultdict(list)
    for f in features:
        sorted_features[type(f)].append(f)
    for fs in sorted_features.values():
        self._add_features(fs)
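
The batching step above groups heterogeneous feature records by type so that each batch handed to `_add_features` is homogeneous. The same pattern in isolation (the two feature classes here are hypothetical stand-ins, not DerivaML types):

```python
# Group mixed records by their concrete type, mirroring the add_features batching.
from collections import defaultdict
from dataclasses import dataclass

@dataclass
class DiagnosisFeature:  # hypothetical feature record type
    value: str

@dataclass
class ConfidenceFeature:  # hypothetical feature record type
    value: float

features = [DiagnosisFeature("high"), ConfidenceFeature(0.95), DiagnosisFeature("low")]

batches = defaultdict(list)
for f in features:
    batches[type(f)].append(f)

# Two homogeneous batches, one per record type.
print({cls.__name__: len(fs) for cls, fs in batches.items()})
# -> {'DiagnosisFeature': 2, 'ConfidenceFeature': 1}
```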

add_files

add_files(
    files: Iterable[FileSpec],
    dataset_types: str | list[str] | None = None,
    description: str = "",
) -> "Dataset"

Adds files to the catalog with their metadata.

Registers files in the catalog along with their metadata (MD5, length, URL) and associates them with specified file types.

Parameters:

    files (Iterable[FileSpec]): File specifications containing MD5 checksum, length, and URL. Required.
    dataset_types (str | list[str] | None): One or more dataset type terms from File_Type vocabulary. Default: None.
    description (str): Description of the files. Default: ''.

Returns:

    Dataset: A Dataset identifying the newly added files, nested to mirror the
    original directory structure of the files.

Raises:

    DerivaMLInvalidTerm: If dataset_types are invalid or execution_rid is not an execution record.

Examples:

Add a single dataset type:

>>> files = [FileSpec(url="path/to/file.txt", md5="abc123", length=1000)]
>>> dataset = exe.add_files(files, dataset_types="text")

Add multiple dataset types:

>>> dataset = exe.add_files(
...     files=[FileSpec(url="image.png", md5="def456", length=2000)],
...     dataset_types=["image", "png"],
... )

Source code in src/deriva_ml/execution/execution.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def add_files(
    self,
    files: Iterable[FileSpec],
    dataset_types: str | list[str] | None = None,
    description: str = "",
) -> "Dataset":
    """Adds files to the catalog with their metadata.

    Registers files in the catalog along with their metadata (MD5, length, URL) and associates them with
    specified file types.

    Args:
        files: File specifications containing MD5 checksum, length, and URL.
        dataset_types: One or more dataset type terms from File_Type vocabulary.
        description: Description of the files.

    Returns:
        Dataset: A Dataset identifying the newly added files, nested to mirror the
        original directory structure of the files.

    Raises:
        DerivaMLInvalidTerm: If dataset_types are invalid or execution_rid is not an execution record.

    Examples:
        Add a single dataset type:
            >>> files = [FileSpec(url="path/to/file.txt", md5="abc123", length=1000)]
            >>> dataset = exe.add_files(files, dataset_types="text")

        Add multiple dataset types:
            >>> dataset = exe.add_files(
            ...     files=[FileSpec(url="image.png", md5="def456", length=2000)],
            ...     dataset_types=["image", "png"],
            ... )
    """
    return self._ml_object.add_files(
        files=files,
        execution_rid=self.execution_rid,
        dataset_types=dataset_types,
        description=description,
    )

add_nested_execution

add_nested_execution(
    nested_execution: "Execution | ExecutionRecord | RID",
    sequence: int | None = None,
) -> None

Add a nested (child) execution to this execution.

Creates a parent-child relationship between this execution and another. This is useful for grouping related executions, such as parameter sweeps or pipeline stages.

Parameters:

    nested_execution (Execution | ExecutionRecord | RID): The child execution to add. Required.
    sequence (int | None): Optional ordering index (0, 1, 2, ...). Use None for parallel executions. Default: None.

Raises:

    DerivaMLException: If the association cannot be created.

Example

>>> parent_exec = ml.create_execution(parent_config)
>>> child_exec = ml.create_execution(child_config)
>>> parent_exec.add_nested_execution(child_exec, sequence=0)

Source code in src/deriva_ml/execution/execution.py
def add_nested_execution(
    self,
    nested_execution: "Execution | ExecutionRecord | RID",
    sequence: int | None = None,
) -> None:
    """Add a nested (child) execution to this execution.

    Creates a parent-child relationship between this execution and another.
    This is useful for grouping related executions, such as parameter sweeps
    or pipeline stages.

    Args:
        nested_execution: The child execution to add (Execution, ExecutionRecord, or RID).
        sequence: Optional ordering index (0, 1, 2...). Use None for parallel executions.

    Raises:
        DerivaMLException: If the association cannot be created.

    Example:
        >>> parent_exec = ml.create_execution(parent_config)
        >>> child_exec = ml.create_execution(child_config)
        >>> parent_exec.add_nested_execution(child_exec, sequence=0)
    """
    if self._dry_run:
        return

    # Get the RID from the nested execution
    if isinstance(nested_execution, Execution):
        nested_rid = nested_execution.execution_rid
    elif isinstance(nested_execution, ExecutionRecord):
        nested_rid = nested_execution.execution_rid
    else:
        nested_rid = nested_execution

    # Delegate to ExecutionRecord if available
    if self._execution_record is not None:
        self._execution_record.add_nested_execution(nested_rid, sequence=sequence)
    else:
        # Fallback for cases without execution record
        pb = self._ml_object.pathBuilder()
        execution_execution = pb.schemas[self._ml_object.ml_schema].Execution_Execution

        record = {
            "Execution": self.execution_rid,
            "Nested_Execution": nested_rid,
        }
        if sequence is not None:
            record["Sequence"] = sequence

        execution_execution.insert([record])

asset_file_path

asset_file_path(
    asset_name: str,
    file_name: str | Path,
    asset_types: list[str] | str | None = None,
    copy_file=False,
    rename_file: str | None = None,
    metadata=None,
    description: str | None = None,
    **kwargs,
) -> AssetFilePath

Register a file for upload and return a path to write to.

This routine has three modes, depending on whether file_name refers to an existing file:

1. New file: file_name doesn't exist; returns a path to write to.
2. Symlink: file_name exists and copy_file=False; symlinks into staging.
3. Copy: file_name exists and copy_file=True; copies into staging.

Files are stored in a flat per-table directory (assets/{AssetTable}/). Metadata is tracked in a persistent JSON manifest for crash safety. Metadata can be set at registration time via the metadata parameter (an AssetRecord or dict) or incrementally after via the returned AssetFilePath's metadata property.

Parameters:

    asset_name (str): Name of the asset table. Must be a valid asset table. Required.
    file_name (str | Path): Name of the file to be uploaded, or path to an existing file. Required.
    asset_types (list[str] | str | None): Asset type terms from Asset_Type vocabulary. Defaults to asset_name. Default: None.
    copy_file: Whether to copy the file rather than creating a symbolic link. Default: False.
    rename_file (str | None): If provided, rename the file during staging. Default: None.
    metadata: An AssetRecord instance or dict of metadata column values. Default: None.
    description (str | None): Optional description for the asset record. Default: None.
    **kwargs: Additional metadata values (legacy support, merged with metadata). Default: {}.

Returns:

    AssetFilePath: An AssetFilePath bound to the manifest for write-through metadata updates.

Raises:

    DerivaMLException: If the asset table doesn't exist.
    DerivaMLValidationError: If asset_types contains invalid terms.

Source code in src/deriva_ml/execution/execution.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def asset_file_path(
    self,
    asset_name: str,
    file_name: str | Path,
    asset_types: list[str] | str | None = None,
    copy_file=False,
    rename_file: str | None = None,
    metadata=None,
    description: str | None = None,
    **kwargs,
) -> AssetFilePath:
    """Register a file for upload and return a path to write to.

    This routine has three modes depending on whether file_name refers to an existing file:
    1. **New file**: file_name doesn't exist — returns a path to write to.
    2. **Symlink**: file_name exists, copy_file=False — symlinks into staging.
    3. **Copy**: file_name exists, copy_file=True — copies into staging.

    Files are stored in a flat per-table directory (``assets/{AssetTable}/``).
    Metadata is tracked in a persistent JSON manifest for crash safety.
    Metadata can be set at registration time via the ``metadata`` parameter
    (an AssetRecord or dict) or incrementally after via the returned
    AssetFilePath's ``metadata`` property.

    Args:
        asset_name: Name of the asset table. Must be a valid asset table.
        file_name: Name of file to be uploaded, or path to an existing file.
        asset_types: Asset type terms from Asset_Type vocabulary. Defaults to asset_name.
        copy_file: Whether to copy the file rather than creating a symbolic link.
        rename_file: If provided, rename the file during staging.
        metadata: An AssetRecord instance or dict of metadata column values.
        description: Optional description for the asset record.
        **kwargs: Additional metadata values (legacy support, merged with metadata).

    Returns:
        AssetFilePath bound to the manifest for write-through metadata updates.

    Raises:
        DerivaMLException: If the asset table doesn't exist.
        DerivaMLValidationError: If asset_types contains invalid terms.
    """
    if not self._model.is_asset(asset_name):
        raise DerivaMLException(f"Table {asset_name} is not an asset")

    asset_table = self._model.name_to_table(asset_name)
    schema = asset_table.schema.name

    # Validate and normalize asset types
    asset_types = asset_types or kwargs.pop("Asset_Type", None) or asset_name
    asset_types = [asset_types] if isinstance(asset_types, str) else asset_types
    for t in asset_types:
        self._ml_object.lookup_term(MLVocab.asset_type, t)

    # Resolve metadata from AssetRecord, dict, or kwargs
    metadata_dict: dict[str, Any] = {}
    if metadata is not None:
        if hasattr(metadata, "model_dump"):
            metadata_dict = {k: v for k, v in metadata.model_dump().items() if v is not None}
        else:
            metadata_dict = dict(metadata)
    # Merge any kwargs that aren't standard parameters
    metadata_dict.update(kwargs)

    # Determine file name and path
    file_name = Path(file_name)
    if file_name.name == "_implementations.log":
        file_name = file_name.with_name("-implementations.log")

    if not file_name.is_absolute():
        file_name = file_name.resolve()

    target_name = Path(rename_file) if file_name.exists() and rename_file else file_name

    # Store file in flat per-table directory
    flat_dir = flat_asset_dir(self._working_dir, self.execution_rid, asset_name)
    flat_path = flat_dir / target_name.name

    if file_name.exists():
        if copy_file:
            flat_path.write_bytes(file_name.read_bytes())
        else:
            try:
                flat_path.symlink_to(file_name)
            except (OSError, PermissionError):
                flat_path.write_bytes(file_name.read_bytes())

    # Register in manifest (write-through + fsync)
    manifest = self._get_manifest()
    manifest_key = f"{asset_name}/{target_name.name}"
    manifest.add_asset(manifest_key, AssetEntry(
        asset_table=asset_name,
        schema=schema,
        asset_types=asset_types,
        metadata=metadata_dict,
        description=description,
    ))

    # Also write legacy asset-type JSONL for backward compatibility with upload
    with Path(asset_type_path(self._working_dir, self.execution_rid, asset_table)).open("a") as f:
        f.write(json.dumps({target_name.name: asset_types}) + "\n")

    result = AssetFilePath(
        asset_path=flat_path,
        asset_table=asset_name,
        file_name=target_name.name,
        asset_metadata=metadata_dict,
        asset_types=asset_types,
    )
    result._bind_manifest(manifest, manifest_key)
    return result
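
The three staging modes described above can be illustrated with plain pathlib, independent of the DerivaML API; the directory names and files here are invented for the sketch:

```python
# Sketch of the three asset-staging modes: new file, symlink, and copy.
import shutil
import tempfile
from pathlib import Path

staging = Path(tempfile.mkdtemp()) / "assets" / "Model_File"
staging.mkdir(parents=True)

# Mode 1: new file -- the caller receives a path and writes to it.
new_path = staging / "metrics.json"
new_path.write_text('{"loss": 0.1}')

# An existing file elsewhere on disk.
src = Path(tempfile.mkdtemp()) / "weights.bin"
src.write_bytes(b"\x00" * 16)

# Mode 2: existing file, copy_file=False -- symlink into staging,
# falling back to a copy where symlinks aren't permitted (as the
# real implementation does).
link_path = staging / "weights.bin"
try:
    link_path.symlink_to(src)
except OSError:
    shutil.copy2(src, link_path)

# Mode 3: existing file, copy_file=True -- copy bytes into staging.
copy_path = staging / "weights_copy.bin"
copy_path.write_bytes(src.read_bytes())

print(sorted(p.name for p in staging.iterdir()))
# -> ['metrics.json', 'weights.bin', 'weights_copy.bin']
```

The symlink mode avoids duplicating large files (e.g. model weights) in the staging area, while the copy mode guarantees the staged bytes are immune to later changes to the original.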

create_dataset

create_dataset(
    dataset_types: str | list[str] | None = None,
    version: DatasetVersion | str | None = None,
    description: str = "",
) -> Dataset

Create a new dataset with specified types.

Creates a dataset associated with this execution for provenance tracking.

Parameters:

    dataset_types (str | list[str] | None): One or more dataset type terms from Dataset_Type vocabulary. Default: None.
    description (str): Markdown description of the dataset being created. Default: ''.
    version (DatasetVersion | str | None): Dataset version. Defaults to 0.1.0. Default: None.

Returns:

    Dataset: The newly created Dataset.

Source code in src/deriva_ml/execution/execution.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def create_dataset(
    self,
    dataset_types: str | list[str] | None = None,
    version: DatasetVersion | str | None = None,
    description: str = "",
) -> Dataset:
    """Create a new dataset with specified types.

    Creates a dataset associated with this execution for provenance tracking.

    Args:
        dataset_types: One or more dataset type terms from Dataset_Type vocabulary.
        description: Markdown description of the dataset being created.
        version: Dataset version. Defaults to 0.1.0.

    Returns:
        The newly created Dataset.
    """
    return Dataset.create_dataset(
        ml_instance=self._ml_object,
        execution_rid=self.execution_rid,
        dataset_types=dataset_types,
        version=version,
        description=description,
    )

download_asset

download_asset(
    asset_rid: RID,
    dest_dir: Path,
    update_catalog: bool = True,
    use_cache: bool = False,
) -> AssetFilePath

Download an asset from a URL and place it in a local directory.

Parameters:

    asset_rid (RID): RID of the asset. Required.
    dest_dir (Path): Destination directory for the asset. Required.
    update_catalog (bool): Whether to update the catalog execution information after downloading. Default: True.
    use_cache (bool): If True, check the cache directory for a previously downloaded copy
        with a matching MD5 checksum before downloading. Cached copies are stored in
        cache_dir/assets/{rid}_{md5}/ and symlinked into the destination. Default: False.

Returns:

    AssetFilePath: An AssetFilePath with the path to the downloaded (or cached) asset file.
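
The MD5-keyed caching scheme can be sketched with hashlib and pathlib alone; this stub "downloads" by writing bytes locally, and the RID and filenames are illustrative rather than real catalog values:

```python
# Sketch of the cache_dir/assets/{rid}_{md5}/ layout used by download_asset.
import hashlib
import tempfile
from pathlib import Path

cache_dir = Path(tempfile.mkdtemp())
dest_dir = Path(tempfile.mkdtemp())

def fetch_asset(rid: str, filename: str, data: bytes) -> Path:
    """Return a path in dest_dir, reusing the cache when the MD5 matches."""
    md5 = hashlib.md5(data).hexdigest()
    entry = cache_dir / "assets" / f"{rid}_{md5}"
    cached = entry / filename
    if not cached.exists():
        # Cache miss: "download" (here, just write) into the cache entry.
        entry.mkdir(parents=True, exist_ok=True)
        cached.write_bytes(data)
    dest = dest_dir / filename
    if dest.exists() or dest.is_symlink():
        dest.unlink()
    dest.symlink_to(cached)  # destination always points at the cached copy
    return dest

first = fetch_asset("3RA", "weights.bin", b"model-bytes")
second = fetch_asset("3RA", "weights.bin", b"model-bytes")  # served from cache
assert first.read_bytes() == second.read_bytes() == b"model-bytes"
```

Keying entries on both RID and MD5 means a re-uploaded asset with new content gets a fresh cache entry instead of silently reusing stale bytes.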

Source code in src/deriva_ml/execution/execution.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def download_asset(
    self, asset_rid: RID, dest_dir: Path, update_catalog: bool = True, use_cache: bool = False
) -> AssetFilePath:
    """Download an asset from a URL and place it in a local directory.

    Args:
        asset_rid: RID of the asset.
        dest_dir: Destination directory for the asset.
        update_catalog: Whether to update the catalog execution information after downloading.
        use_cache: If True, check the cache directory for a previously downloaded copy
            with a matching MD5 checksum before downloading. Cached copies are stored
            in ``cache_dir/assets/{rid}_{md5}/`` and symlinked into the destination.

    Returns:
        An AssetFilePath with the path to the downloaded (or cached) asset file.
    """

    asset_table = self._ml_object.resolve_rid(asset_rid).table
    if not self._model.is_asset(asset_table):
        raise DerivaMLException(f"RID {asset_rid} is not for an asset table.")

    asset_record = self._ml_object.retrieve_rid(asset_rid)
    asset_metadata = {k: v for k, v in asset_record.items() if k in self._model.asset_metadata(asset_table)}
    asset_url = asset_record["URL"]
    asset_filename = dest_dir / asset_record["Filename"]

    # Check cache before downloading
    cache_hit = False
    if use_cache:
        md5 = asset_record.get("MD5")
        if md5:
            asset_cache_dir = self._ml_object.cache_dir / "assets"
            asset_cache_dir.mkdir(parents=True, exist_ok=True)
            cache_key = f"{asset_rid}_{md5}"
            cached_file = asset_cache_dir / cache_key / asset_record["Filename"]
            if cached_file.exists():
                # Cache hit — symlink from cache to destination
                self._logger.info(f"Using cached asset {asset_rid} (MD5: {md5})")
                if asset_filename.exists() or asset_filename.is_symlink():
                    asset_filename.unlink()
                asset_filename.symlink_to(cached_file)
                cache_hit = True

    if not cache_hit:
        hs = HatracStore("https", self._ml_object.host_name, self._ml_object.credential)
        hs.get_obj(path=asset_url, destfilename=asset_filename.as_posix())

        # Store in cache for future use
        if use_cache:
            md5 = asset_record.get("MD5")
            if md5:
                asset_cache_dir = self._ml_object.cache_dir / "assets"
                asset_cache_dir.mkdir(parents=True, exist_ok=True)
                cache_key = f"{asset_rid}_{md5}"
                cache_entry_dir = asset_cache_dir / cache_key
                cache_entry_dir.mkdir(parents=True, exist_ok=True)
                cached_file = cache_entry_dir / asset_record["Filename"]
                # Move file to cache, then symlink back
                shutil.move(str(asset_filename), str(cached_file))
                asset_filename.symlink_to(cached_file)
                self._logger.info(f"Cached asset {asset_rid} (MD5: {md5})")

    asset_type_table, _col_l, _col_r = self._model.find_association(asset_table, MLVocab.asset_type)
    type_path = self._ml_object.pathBuilder().schemas[asset_type_table.schema.name].tables[asset_type_table.name]
    asset_types = [
        asset_type[MLVocab.asset_type.value]
        for asset_type in type_path.filter(type_path.columns[asset_table.name] == asset_rid)
        .attributes(type_path.Asset_Type)
        .fetch()
    ]

    asset_path = AssetFilePath(
        file_name=asset_filename,
        asset_rid=asset_rid,
        asset_path=asset_filename,
        asset_metadata=asset_metadata,
        asset_table=asset_table.name,
        asset_types=asset_types,
    )

    if update_catalog:
        self._update_asset_execution_table(
            {f"{asset_table.schema.name}/{asset_table.name}": [asset_path]},
            asset_role="Input",
        )
    return asset_path
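
The checksum-keyed cache layout described above (`cache_dir/assets/{rid}_{md5}/` plus a symlink at the destination) can be sketched in isolation. This is an illustrative toy, not the library's implementation; `cache_and_link`, its arguments, and the sample RID are hypothetical:

```python
import hashlib
import shutil
from pathlib import Path

def cache_and_link(downloaded: Path, cache_root: Path, rid: str) -> Path:
    """Move a downloaded file into a cache entry keyed by RID and MD5,
    then leave a symlink at the original destination."""
    md5 = hashlib.md5(downloaded.read_bytes()).hexdigest()
    entry_dir = cache_root / "assets" / f"{rid}_{md5}"
    entry_dir.mkdir(parents=True, exist_ok=True)
    cached = entry_dir / downloaded.name
    shutil.move(str(downloaded), str(cached))  # file now lives in the cache
    downloaded.symlink_to(cached)              # destination keeps a symlink
    return cached
```

A later request for the same asset with the same MD5 can then test whether the cached file exists and symlink it into place without re-downloading.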

download_dataset_bag

download_dataset_bag(
    dataset: DatasetSpec,
) -> DatasetBag

Downloads and materializes a dataset for use in the execution.

Downloads the specified dataset as a BDBag and materializes it in the execution's working directory. The dataset version is determined by the DatasetSpec.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `dataset` | `DatasetSpec` | Specification of the dataset to download, including version and materialization options. | *required* |

Returns:

| Name | Type | Description |
|------|------|-------------|
| `DatasetBag` | `DatasetBag` | Object containing: `path` (local filesystem path to the downloaded dataset), `rid` (the dataset's Resource Identifier), and `minid` (the dataset's Minimal Viable Identifier). |

Raises:

| Type | Description |
|------|-------------|
| `DerivaMLException` | If download or materialization fails. |

Example

    spec = DatasetSpec(rid="1-abc123", version="1.2.0")
    bag = execution.download_dataset_bag(spec)
    print(f"Downloaded to {bag.path}")

Source code in src/deriva_ml/execution/execution.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def download_dataset_bag(self, dataset: DatasetSpec) -> DatasetBag:
    """Downloads and materializes a dataset for use in the execution.

    Downloads the specified dataset as a BDBag and materializes it in the execution's
    working directory. The dataset version is determined by the DatasetSpec.

    Args:
        dataset: Specification of the dataset to download, including version and
            materialization options.

    Returns:
        DatasetBag: Object containing:
            - path: Local filesystem path to downloaded dataset
            - rid: Dataset's Resource Identifier
            - minid: Dataset's Minimal Viable Identifier

    Raises:
        DerivaMLException: If download or materialization fails.

    Example:
        >>> spec = DatasetSpec(rid="1-abc123", version="1.2.0")
        >>> bag = execution.download_dataset_bag(spec)
        >>> print(f"Downloaded to {bag.path}")
    """
    return self._ml_object.download_dataset_bag(dataset)

execute

execute() -> Execution

Initiate an execution with the provided configuration. Can be used in a context manager.

Source code in src/deriva_ml/execution/execution.py
def execute(self) -> Execution:
    """Initiate an execution with the provided configuration. Can be used in a context manager."""
    self.execution_start()
    return self
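
The way execute() pairs with a context manager can be illustrated with a toy class. `MiniExecution` is hypothetical and omits catalog status updates; it only shows the pattern of execute() starting the run and returning self so the object can be used in a `with` statement:

```python
from datetime import datetime

class MiniExecution:
    """Toy illustration of the execute()/context-manager pattern."""
    def __init__(self) -> None:
        self.start_time = None
        self.stop_time = None

    def execution_start(self) -> None:
        self.start_time = datetime.now()

    def execution_stop(self) -> None:
        self.stop_time = datetime.now()

    def execute(self) -> "MiniExecution":
        # Start the run, then return self for use in a `with` block.
        self.execution_start()
        return self

    def __enter__(self) -> "MiniExecution":
        return self

    def __exit__(self, *exc_info) -> bool:
        self.execution_stop()
        return False  # never swallow exceptions

with MiniExecution().execute() as run:
    pass  # analysis work goes here
```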

execution_start

execution_start() -> None

Marks the execution as started.

Records the start time and sets the execution's status to 'initializing'. This should be called before beginning the main execution work.

Example

    execution.execution_start()
    try:
        # Run analysis
        execution.execution_stop()
    except Exception:
        execution.update_status(Status.failed, "Analysis error")

Source code in src/deriva_ml/execution/execution.py
def execution_start(self) -> None:
    """Marks the execution as started.

    Records the start time and sets the execution's status to 'initializing'.
    This should be called before beginning the main execution work.

    Example:
        >>> execution.execution_start()
        >>> try:
        ...     # Run analysis
        ...     execution.execution_stop()
        ... except Exception:
        ...     execution.update_status(Status.failed, "Analysis error")
    """
    self.start_time = datetime.now()
    self.uploaded_assets = None
    self.update_status(Status.initializing, "Start execution ...")

execution_stop

execution_stop() -> None

Marks the execution as completed.

Records the stop time and updates the execution's status to 'completed'. This should be called after all execution work is finished.

Example

    try:
        # Run analysis
        execution.execution_stop()
    except Exception:
        execution.update_status(Status.failed, "Analysis error")

Source code in src/deriva_ml/execution/execution.py
def execution_stop(self) -> None:
    """Marks the execution as completed.

    Records the stop time and updates the execution's status to 'completed'.
    This should be called after all execution work is finished.

    Example:
        >>> try:
        ...     # Run analysis
        ...     execution.execution_stop()
        ... except Exception:
        ...     execution.update_status(Status.failed, "Analysis error")
    """
    self.stop_time = datetime.now()
    duration = self.stop_time - self.start_time
    hours, remainder = divmod(duration.total_seconds(), 3600)
    minutes, seconds = divmod(remainder, 60)
    duration = f"{round(hours, 0)}H {round(minutes, 0)}min {round(seconds, 4)}sec"

    self.update_status(Status.completed, "Algorithm execution ended.")
    if not self._dry_run:
        self._ml_object.pathBuilder().schemas[self._ml_object.ml_schema].Execution.update(
            [{"RID": self.execution_rid, "Duration": duration}]
        )
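
The divmod arithmetic above can be checked with a small helper. `format_duration` is a hypothetical sketch; note it uses int() for the whole-number fields, whereas the source's `round(hours, 0)` keeps a float (rendering, e.g., "2.0H"):

```python
from datetime import timedelta

def format_duration(delta: timedelta) -> str:
    """Split a timedelta into H/min/sec the same way execution_stop does."""
    hours, remainder = divmod(delta.total_seconds(), 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{int(hours)}H {int(minutes)}min {round(seconds, 4)}sec"
```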

is_nested

is_nested() -> bool

Check if this execution is nested within another execution.

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if this execution has at least one parent execution. |

Source code in src/deriva_ml/execution/execution.py
def is_nested(self) -> bool:
    """Check if this execution is nested within another execution.

    Returns:
        True if this execution has at least one parent execution.
    """
    if self._execution_record is not None:
        return self._execution_record.is_nested()
    return len(self.list_parent_executions()) > 0

is_parent

is_parent() -> bool

Check if this execution has nested child executions.

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if this execution has at least one nested execution. |

Source code in src/deriva_ml/execution/execution.py
def is_parent(self) -> bool:
    """Check if this execution has nested child executions.

    Returns:
        True if this execution has at least one nested execution.
    """
    if self._execution_record is not None:
        return self._execution_record.is_parent()
    return len(self.list_nested_executions()) > 0

list_assets

list_assets(
    asset_role: str | None = None,
) -> list["Asset"]

List all assets that were inputs or outputs of this execution.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `asset_role` | `str \| None` | Optional filter: "Input" or "Output". If None, returns all. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `list[Asset]` | List of Asset objects associated with this execution. |

Example

    inputs = execution.list_assets(asset_role="Input")
    outputs = execution.list_assets(asset_role="Output")

Source code in src/deriva_ml/execution/execution.py
def list_assets(self, asset_role: str | None = None) -> list["Asset"]:
    """List all assets that were inputs or outputs of this execution.

    Args:
        asset_role: Optional filter: "Input" or "Output". If None, returns all.

    Returns:
        List of Asset objects associated with this execution.

    Example:
        >>> inputs = execution.list_assets(asset_role="Input")
        >>> outputs = execution.list_assets(asset_role="Output")
    """
    if self._execution_record is not None:
        return self._execution_record.list_assets(asset_role=asset_role)

    # Fallback for dry_run mode

    pb = self._ml_object.pathBuilder()
    asset_exec = pb.schemas[self._ml_object.ml_schema].Execution_Asset_Execution

    query = asset_exec.filter(asset_exec.Execution == self.execution_rid)
    if asset_role:
        query = query.filter(asset_exec.Asset_Role == asset_role)

    records = list(query.entities().fetch())

    assets = []
    for r in records:
        try:
            asset = self._ml_object.lookup_asset(r["Execution_Asset"])
            assets.append(asset)
        except Exception:
            pass  # Skip assets that can't be looked up
    return assets

list_input_datasets

list_input_datasets() -> list[Dataset]

List all datasets that were inputs to this execution.

Returns:

| Type | Description |
|------|-------------|
| `list[Dataset]` | List of Dataset objects that were used as inputs. |

Example

    for ds in execution.list_input_datasets():
        print(f"Input: {ds.dataset_rid} - {ds.description}")

Source code in src/deriva_ml/execution/execution.py
def list_input_datasets(self) -> list[Dataset]:
    """List all datasets that were inputs to this execution.

    Returns:
        List of Dataset objects that were used as inputs.

    Example:
        >>> for ds in execution.list_input_datasets():
        ...     print(f"Input: {ds.dataset_rid} - {ds.description}")
    """
    if self._execution_record is not None:
        return self._execution_record.list_input_datasets()

    # Fallback for dry_run mode
    pb = self._ml_object.pathBuilder()
    dataset_exec = pb.schemas[self._ml_object.ml_schema].Dataset_Execution

    records = list(
        dataset_exec.filter(dataset_exec.Execution == self.execution_rid)
        .entities()
        .fetch()
    )

    return [self._ml_object.lookup_dataset(r["Dataset"]) for r in records]

list_nested_executions

list_nested_executions(
    recurse: bool = False,
    _visited: set[RID] | None = None,
) -> list["ExecutionRecord"]

List all nested (child) executions of this execution.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `recurse` | `bool` | If True, recursively return all descendant executions. | `False` |
| `_visited` | `set[RID] \| None` | Internal parameter to track visited executions and prevent infinite recursion. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `list[ExecutionRecord]` | List of nested ExecutionRecord objects, ordered by sequence if available. To get full Execution objects with lifecycle management, use restore_execution(). |

Example

    children = parent_exec.list_nested_executions()
    all_descendants = parent_exec.list_nested_executions(recurse=True)

Source code in src/deriva_ml/execution/execution.py
def list_nested_executions(
    self,
    recurse: bool = False,
    _visited: set[RID] | None = None,
) -> list["ExecutionRecord"]:
    """List all nested (child) executions of this execution.

    Args:
        recurse: If True, recursively return all descendant executions.
        _visited: Internal parameter to track visited executions and prevent infinite recursion.

    Returns:
        List of nested ExecutionRecord objects, ordered by sequence if available.
        To get full Execution objects with lifecycle management, use restore_execution().

    Example:
        >>> children = parent_exec.list_nested_executions()
        >>> all_descendants = parent_exec.list_nested_executions(recurse=True)
    """
    if self._execution_record is not None:
        return list(self._execution_record.list_nested_executions(recurse=recurse, _visited=_visited))

    # Fallback for dry_run mode
    if _visited is None:
        _visited = set()

    if self.execution_rid in _visited:
        return []
    _visited.add(self.execution_rid)

    pb = self._ml_object.pathBuilder()
    execution_execution = pb.schemas[self._ml_object.ml_schema].Execution_Execution

    # Query for nested executions, ordered by sequence
    nested = list(
        execution_execution.filter(execution_execution.Execution == self.execution_rid)
        .entities()
        .fetch()
    )

    # Sort by sequence (None values at the end)
    nested.sort(key=lambda x: (x.get("Sequence") is None, x.get("Sequence")))

    children = []
    for record in nested:
        child = self._ml_object.lookup_execution(record["Nested_Execution"])
        children.append(child)
        if recurse:
            children.extend(child.list_nested_executions(recurse=True, _visited=_visited))

    return children

list_parent_executions

list_parent_executions(
    recurse: bool = False,
    _visited: set[RID] | None = None,
) -> list["ExecutionRecord"]

List all parent executions that contain this execution as a nested child.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `recurse` | `bool` | If True, recursively return all ancestor executions. | `False` |
| `_visited` | `set[RID] \| None` | Internal parameter to track visited executions and prevent infinite recursion. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `list[ExecutionRecord]` | List of parent ExecutionRecord objects. To get full Execution objects with lifecycle management, use restore_execution(). |

Example

    parents = child_exec.list_parent_executions()
    all_ancestors = child_exec.list_parent_executions(recurse=True)

Source code in src/deriva_ml/execution/execution.py
def list_parent_executions(
    self,
    recurse: bool = False,
    _visited: set[RID] | None = None,
) -> list["ExecutionRecord"]:
    """List all parent executions that contain this execution as a nested child.

    Args:
        recurse: If True, recursively return all ancestor executions.
        _visited: Internal parameter to track visited executions and prevent infinite recursion.

    Returns:
        List of parent ExecutionRecord objects.
        To get full Execution objects with lifecycle management, use restore_execution().

    Example:
        >>> parents = child_exec.list_parent_executions()
        >>> all_ancestors = child_exec.list_parent_executions(recurse=True)
    """
    if self._execution_record is not None:
        return list(self._execution_record.list_parent_executions(recurse=recurse, _visited=_visited))

    # Fallback for dry_run mode
    if _visited is None:
        _visited = set()

    if self.execution_rid in _visited:
        return []
    _visited.add(self.execution_rid)

    pb = self._ml_object.pathBuilder()
    execution_execution = pb.schemas[self._ml_object.ml_schema].Execution_Execution

    parent_records = list(
        execution_execution.filter(execution_execution.Nested_Execution == self.execution_rid)
        .entities()
        .fetch()
    )

    parents = []
    for record in parent_records:
        parent = self._ml_object.lookup_execution(record["Execution"])
        parents.append(parent)
        if recurse:
            parents.extend(parent.list_parent_executions(recurse=True, _visited=_visited))

    return parents

table_path

table_path(table: str) -> Path

Return the local path of a CSV file whose contents will be added to the given table on upload.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `table` | `str` | Name of the table to be uploaded. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Path` | Pathlib path to the file in which to place table values. |

Source code in src/deriva_ml/execution/execution.py
def table_path(self, table: str) -> Path:
    """Return a local file path to a CSV to add values to a table on upload.

    Args:
        table: Name of table to be uploaded.

    Returns:
        Pathlib path to the file in which to place table values.
    """
    # Find which domain schema contains this table
    table_schema = None
    for domain_schema in self._ml_object.domain_schemas:
        if domain_schema in self._model.schemas:
            if table in self._model.schemas[domain_schema].tables:
                table_schema = domain_schema
                break

    if table_schema is None:
        raise DerivaMLException(f"Table '{table}' not found in any domain schema")

    return table_path(self._working_dir, schema=table_schema, table=table)
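
Once table_path() returns a location, rows are typically written as CSV before upload. A minimal sketch (the helper name and column names are hypothetical):

```python
import csv
from pathlib import Path

def write_table_rows(csv_path: Path, rows: list[dict]) -> None:
    """Write dict rows to a CSV file, taking the header from the first row's keys."""
    with csv_path.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)
```

For example, `write_table_rows(execution.table_path("Subject"), [{"Name": "s1", "Age": 42}])` would stage rows for a hypothetical Subject table.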

update_status

update_status(
    status: Status, msg: str
) -> None

Updates the execution's status in the catalog.

Records a new status and associated message in the catalog, allowing remote tracking of execution progress.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `status` | `Status` | New status value (e.g., running, completed, failed). | *required* |
| `msg` | `str` | Description of the status change or current state. | *required* |

Raises:

| Type | Description |
|------|-------------|
| `DerivaMLException` | If the status update fails. |

Example

    execution.update_status(Status.running, "Processing sample 1 of 10")

Source code in src/deriva_ml/execution/execution.py
@validate_call
def update_status(self, status: Status, msg: str) -> None:
    """Updates the execution's status in the catalog.

    Records a new status and associated message in the catalog, allowing remote
    tracking of execution progress.

    Args:
        status: New status value (e.g., running, completed, failed).
        msg: Description of the status change or current state.

    Raises:
        DerivaMLException: If status update fails.

    Example:
        >>> execution.update_status(Status.running, "Processing sample 1 of 10")
    """
    self._status = status
    self._logger.info(msg)

    if self._dry_run:
        return

    # Delegate to ExecutionRecord for catalog updates
    if self._execution_record is not None:
        self._execution_record.update_status(status, msg)
    else:
        # Fallback for cases where ExecutionRecord isn't available
        self._ml_object.pathBuilder().schemas[self._ml_object.ml_schema].Execution.update(
            [
                {
                    "RID": self.execution_rid,
                    "Status": status.value,
                    "Status_Detail": msg,
                }
            ]
        )

upload_assets

upload_assets(
    assets_dir: str | Path,
) -> dict[Any, FileUploadState] | None

Uploads assets from a directory to the catalog.

Scans the specified directory for assets and uploads them to the catalog, recording their metadata and types. Assets are organized by their types and associated with the execution.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `assets_dir` | `str \| Path` | Directory containing assets to upload. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `dict[Any, FileUploadState] \| None` | Mapping of assets to their upload states, or None if no assets were found. |

Raises:

| Type | Description |
|------|-------------|
| `DerivaMLException` | If upload fails or assets are invalid. |

Example

    states = execution.upload_assets("output/results")
    for asset, state in states.items():
        print(f"{asset}: {state}")

Source code in src/deriva_ml/execution/execution.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def upload_assets(
    self,
    assets_dir: str | Path,
) -> dict[Any, FileUploadState] | None:
    """Uploads assets from a directory to the catalog.

    Scans the specified directory for assets and uploads them to the catalog,
    recording their metadata and types. Assets are organized by their types
    and associated with the execution.

    Args:
        assets_dir: Directory containing assets to upload.

    Returns:
        dict[Any, FileUploadState] | None: Mapping of assets to their upload states,
            or None if no assets were found.

    Raises:
        DerivaMLException: If upload fails or assets are invalid.

    Example:
        >>> states = execution.upload_assets("output/results")
        >>> for asset, state in states.items():
        ...     print(f"{asset}: {state}")
    """

    def path_to_asset(path: str) -> str:
        """Pull the asset name out of a path to that asset in the filesystem"""
        components = path.split("/")
        return components[components.index("asset") + 2]  # Look for asset in the path to find the name

    if not self._model.is_asset(Path(assets_dir).name):
        raise DerivaMLException("Directory does not have name of an asset table.")
    results = upload_directory(self._model, assets_dir)
    return {path_to_asset(p): r for p, r in results.items()}

upload_execution_outputs

upload_execution_outputs(
    clean_folder: bool | None = None,
    progress_callback: Callable[[UploadProgress], None] | None = None,
    max_retries: int = 3,
    retry_delay: float = 5.0,
    timeout: tuple[int, int] | None = None,
    chunk_size: int | None = None,
) -> dict[str, list[AssetFilePath]]

Uploads all outputs from the execution to the catalog.

Scans the execution's output directories for assets, features, and other results, then uploads them to the catalog. Can optionally clean up the output folders after successful upload.

IMPORTANT: This method must be called AFTER exiting the context manager, not inside it. The context manager handles execution timing (start/stop), while this method handles the separate upload step.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `clean_folder` | `bool \| None` | Whether to delete output folders after upload. If None (default), uses the DerivaML instance's clean_execution_dir setting. Pass True/False to override for this specific execution. | `None` |
| `progress_callback` | `Callable[[UploadProgress], None] \| None` | Optional callback function to receive upload progress updates. Called with UploadProgress objects containing file name, bytes uploaded, total bytes, percent complete, phase, and status message. | `None` |
| `max_retries` | `int` | Maximum number of retry attempts for failed uploads. | `3` |
| `retry_delay` | `float` | Initial delay in seconds between retries; doubles with each attempt. | `5.0` |
| `timeout` | `tuple[int, int] \| None` | Tuple of (connect_timeout, read_timeout) in seconds. Default is (600, 600). Note: urllib3 uses connect_timeout as the socket timeout during request body writes, so it must be large enough for a full chunk upload. | `None` |
| `chunk_size` | `int \| None` | Optional chunk size in bytes for hatrac uploads. If provided, large files will be uploaded in chunks of this size. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `dict[str, list[AssetFilePath]]` | Mapping of asset types to their file paths. |

Raises:

| Type | Description |
|------|-------------|
| `DerivaMLException` | If upload fails or outputs are invalid. |

Example

    with ml.create_execution(config) as execution:
        # Do ML work, register output files with asset_file_path()
        path = execution.asset_file_path("Model", "model.pt")
        # Write to path...

    # Upload AFTER the context manager exits
    def my_callback(progress):
        print(f"Uploading {progress.file_name}: {progress.percent_complete:.1f}%")
    outputs = execution.upload_execution_outputs(progress_callback=my_callback)

    # Upload large files with increased timeout (30 min per chunk)
    outputs = execution.upload_execution_outputs(timeout=(6, 1800))

    # Override cleanup setting for this execution
    outputs = execution.upload_execution_outputs(clean_folder=False)  # Keep files

Source code in src/deriva_ml/execution/execution.py
def upload_execution_outputs(
    self,
    clean_folder: bool | None = None,
    progress_callback: Callable[[UploadProgress], None] | None = None,
    max_retries: int = 3,
    retry_delay: float = 5.0,
    timeout: tuple[int, int] | None = None,
    chunk_size: int | None = None,
) -> dict[str, list[AssetFilePath]]:
    """Uploads all outputs from the execution to the catalog.

    Scans the execution's output directories for assets, features, and other results,
    then uploads them to the catalog. Can optionally clean up the output folders
    after successful upload.

    IMPORTANT: This method must be called AFTER exiting the context manager, not inside it.
    The context manager handles execution timing (start/stop), while this method handles
    the separate upload step.

    Args:
        clean_folder: Whether to delete output folders after upload. If None (default),
            uses the DerivaML instance's clean_execution_dir setting. Pass True/False
            to override for this specific execution.
        progress_callback: Optional callback function to receive upload progress updates.
            Called with UploadProgress objects containing file name, bytes uploaded,
            total bytes, percent complete, phase, and status message.
        max_retries: Maximum number of retry attempts for failed uploads (default: 3).
        retry_delay: Initial delay in seconds between retries, doubles with each attempt (default: 5.0).
        timeout: Tuple of (connect_timeout, read_timeout) in seconds. Default is (600, 600).
            Note: urllib3 uses connect_timeout as the socket timeout during request body
            writes, so it must be large enough for a full chunk upload.
        chunk_size: Optional chunk size in bytes for hatrac uploads. If provided,
            large files will be uploaded in chunks of this size.

    Returns:
        dict[str, list[AssetFilePath]]: Mapping of asset types to their file paths.

    Raises:
        DerivaMLException: If upload fails or outputs are invalid.

    Example:
        >>> with ml.create_execution(config) as execution:
        ...     # Do ML work, register output files with asset_file_path()
        ...     path = execution.asset_file_path("Model", "model.pt")
        ...     # Write to path...
        ...
        >>> # Upload AFTER the context manager exits
        >>> def my_callback(progress):
        ...     print(f"Uploading {progress.file_name}: {progress.percent_complete:.1f}%")
        >>> outputs = execution.upload_execution_outputs(progress_callback=my_callback)
        >>>
        >>> # Upload large files with increased timeout (30 min per chunk)
        >>> outputs = execution.upload_execution_outputs(timeout=(6, 1800))
        >>>
        >>> # Override cleanup setting for this execution
        >>> outputs = execution.upload_execution_outputs(clean_folder=False)  # Keep files
    """
    if self._dry_run:
        return {}

    # Use DerivaML instance setting if not explicitly provided
    if clean_folder is None:
        clean_folder = getattr(self._ml_object, 'clean_execution_dir', True)

    try:
        self.uploaded_assets = self._upload_execution_dirs(
            progress_callback=progress_callback,
            max_retries=max_retries,
            retry_delay=retry_delay,
            timeout=timeout,
            chunk_size=chunk_size,
        )
        self._set_asset_descriptions(self.uploaded_assets)
        self.update_status(Status.completed, "Successfully ended the execution.")
        if clean_folder:
            self._clean_folder_contents(self._execution_root)
        return self.uploaded_assets
    except Exception as e:
        error = format_exception(e)
        self.update_status(Status.failed, error)
        raise e

ExecutionConfiguration

Bases: BaseModel

Configuration for a DerivaML execution.

Defines the complete configuration for a computational or manual process in DerivaML, including required datasets, input assets, workflow definition, and parameters.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `datasets` | `list[DatasetSpec]` | Dataset specifications, each containing: `rid` (Dataset Resource Identifier), `version` (version to use), and `materialize` (whether to extract dataset contents). |
| `assets` | `list[AssetSpec]` | Asset specifications. Each element can be a plain RID string (no caching) or an `AssetSpec(rid=..., cache=True)` for checksum-based caching. |
| `workflow` | `Workflow \| None` | Workflow object defining the computational process. Use `ml.lookup_workflow(rid)` or `ml.lookup_workflow_by_url(url)` to get a Workflow object from a RID or URL. Defaults to None, which means the workflow must be provided via the workflow parameter of `ml.create_execution()` instead. If no workflow is specified in either place, a DerivaMLException is raised at execution creation time. |
| `description` | `str` | Description of the execution's purpose (supports Markdown). |
| `argv` | `list[str]` | Command line arguments used to start the execution. |
| `config_choices` | `dict[str, str]` | Hydra config group choices that were selected. Maps group names to selected config names (e.g., `{"model_config": "cifar10_quick"}`). Automatically populated by `run_model()` and `get_notebook_configuration()`. |

Example

Plain RIDs (backward compatible)::

    config = ExecutionConfiguration(assets=["6-EPNR", "6-EP56"])

Mixed: cached model weights + uncached embeddings::

    config = ExecutionConfiguration(
        assets=[
            AssetSpec(rid="6-EPNR", cache=True),
            "6-EP56",
        ]
    )

Source code in src/deriva_ml/execution/execution_configuration.py
class ExecutionConfiguration(BaseModel):
    """Configuration for a DerivaML execution.

    Defines the complete configuration for a computational or manual process in DerivaML,
    including required datasets, input assets, workflow definition, and parameters.

    Attributes:
        datasets (list[DatasetSpec]): Dataset specifications, each containing:
            - rid: Dataset Resource Identifier
            - version: Version to use
            - materialize: Whether to extract dataset contents
        assets (list[AssetSpec]): Asset specifications. Each element can be:
            - A plain RID string (no caching)
            - An ``AssetSpec(rid=..., cache=True)`` for checksum-based caching
        workflow (Workflow | None): Workflow object defining the computational process.
            Use ``ml.lookup_workflow(rid)`` or ``ml.lookup_workflow_by_url(url)`` to get
            a Workflow object from a RID or URL. Defaults to ``None``, which means the
            workflow must be provided via the ``workflow`` parameter of
            ``ml.create_execution()`` instead. If no workflow is specified in either
            place, a ``DerivaMLException`` is raised at execution creation time.
        description (str): Description of execution purpose (supports Markdown).
        argv (list[str]): Command line arguments used to start execution.
        config_choices (dict[str, str]): Hydra config group choices that were selected.
            Maps group names to selected config names (e.g., {"model_config": "cifar10_quick"}).
            Automatically populated by run_model() and get_notebook_configuration().

    Example:
        >>> # Plain RIDs (backward compatible)
        >>> config = ExecutionConfiguration(assets=["6-EPNR", "6-EP56"])
        >>>
        >>> # Mixed: cached model weights + uncached embeddings
        >>> config = ExecutionConfiguration(
        ...     assets=[
        ...         AssetSpec(rid="6-EPNR", cache=True),
        ...         "6-EP56",
        ...     ]
        ... )
    """

    datasets: list[DatasetSpec] = []
    assets: list[AssetSpec] = []
    workflow: Workflow | None = None
    description: str = ""
    argv: list[str] = Field(default_factory=lambda: sys.argv)
    config_choices: dict[str, str] = Field(default_factory=dict)

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @field_validator("assets", mode="before")
    @classmethod
    def validate_assets(cls, value: Any) -> Any:
        """Normalize asset entries to AssetSpec objects.

        Accepts plain RID strings, AssetRID objects, DictConfig from Hydra,
        AssetSpec objects, or dicts with 'rid' and optional 'cache' keys.
        """
        result = []
        for v in value:
            if isinstance(v, AssetSpec):
                result.append(v)
            elif isinstance(v, dict):
                # Dict with rid/cache keys (e.g., from JSON config)
                result.append(AssetSpec(**v))
            elif isinstance(v, DictConfig):
                # OmegaConf DictConfig from Hydra — may have rid+cache or just rid
                d = dict(v)
                if "rid" in d:
                    result.append(AssetSpec(**d))
                else:
                    # Legacy DictConfig with just .rid attribute (AssetRID-style)
                    result.append(AssetSpec(rid=v.rid, cache=getattr(v, "cache", False)))
            elif isinstance(v, AssetRID):
                result.append(AssetSpec(rid=v.rid))
            elif isinstance(v, str):
                result.append(AssetSpec(rid=v))
            else:
                # Unknown type — try string coercion
                result.append(AssetSpec(rid=str(v)))
        return result

    @staticmethod
    def load_configuration(path: Path) -> ExecutionConfiguration:
        """Creates an ExecutionConfiguration from a JSON file.

        Loads and parses a JSON configuration file into an ExecutionConfiguration
        instance. The file should contain a valid configuration specification.

        Args:
            path: Path to JSON configuration file.

        Returns:
            ExecutionConfiguration: Loaded configuration instance.

        Raises:
            ValueError: If JSON file is invalid or missing required fields.
            FileNotFoundError: If configuration file doesn't exist.

        Example:
            >>> config = ExecutionConfiguration.load_configuration(Path("config.json"))
            >>> print(f"Workflow: {config.workflow}")
            >>> print(f"Datasets: {len(config.datasets)}")
        """
        with Path(path).open() as fd:
            config = json.load(fd)
        return ExecutionConfiguration.model_validate(config)

load_configuration staticmethod

load_configuration(
    path: Path,
) -> ExecutionConfiguration

Creates an ExecutionConfiguration from a JSON file.

Loads and parses a JSON configuration file into an ExecutionConfiguration instance. The file should contain a valid configuration specification.

Parameters:

Name Type Description Default
path Path

Path to JSON configuration file.

required

Returns:

Name Type Description
ExecutionConfiguration ExecutionConfiguration

Loaded configuration instance.

Raises:

Type Description
ValueError

If JSON file is invalid or missing required fields.

FileNotFoundError

If configuration file doesn't exist.

Example

    config = ExecutionConfiguration.load_configuration(Path("config.json"))
    print(f"Workflow: {config.workflow}")
    print(f"Datasets: {len(config.datasets)}")

Source code in src/deriva_ml/execution/execution_configuration.py
@staticmethod
def load_configuration(path: Path) -> ExecutionConfiguration:
    """Creates an ExecutionConfiguration from a JSON file.

    Loads and parses a JSON configuration file into an ExecutionConfiguration
    instance. The file should contain a valid configuration specification.

    Args:
        path: Path to JSON configuration file.

    Returns:
        ExecutionConfiguration: Loaded configuration instance.

    Raises:
        ValueError: If JSON file is invalid or missing required fields.
        FileNotFoundError: If configuration file doesn't exist.

    Example:
        >>> config = ExecutionConfiguration.load_configuration(Path("config.json"))
        >>> print(f"Workflow: {config.workflow}")
        >>> print(f"Datasets: {len(config.datasets)}")
    """
    with Path(path).open() as fd:
        config = json.load(fd)
    return ExecutionConfiguration.model_validate(config)
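
The round trip that load_configuration performs can be shown with the standard library alone. The pydantic validation step (model_validate) is omitted so this sketch runs without deriva_ml installed; the field names mirror the attributes documented above, and the RID values are placeholders.

```python
# Sketch: write a configuration document to disk and load it back,
# stopping at the JSON-parsing step that precedes pydantic validation.
import json
import tempfile
from pathlib import Path

config_doc = {
    "datasets": [{"rid": "2-ABC1", "version": "1.0", "materialize": True}],
    "assets": ["6-EPNR"],
    "description": "Example execution",
}

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "config.json"
    path.write_text(json.dumps(config_doc))

    # load_configuration opens the file and parses JSON before validating.
    with path.open() as fd:
        loaded = json.load(fd)

print(loaded["description"])    # Example execution
print(len(loaded["datasets"]))  # 1
```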

validate_assets classmethod

validate_assets(value: Any) -> Any

Normalize asset entries to AssetSpec objects.

Accepts plain RID strings, AssetRID objects, DictConfig from Hydra, AssetSpec objects, or dicts with 'rid' and optional 'cache' keys.

Source code in src/deriva_ml/execution/execution_configuration.py
@field_validator("assets", mode="before")
@classmethod
def validate_assets(cls, value: Any) -> Any:
    """Normalize asset entries to AssetSpec objects.

    Accepts plain RID strings, AssetRID objects, DictConfig from Hydra,
    AssetSpec objects, or dicts with 'rid' and optional 'cache' keys.
    """
    result = []
    for v in value:
        if isinstance(v, AssetSpec):
            result.append(v)
        elif isinstance(v, dict):
            # Dict with rid/cache keys (e.g., from JSON config)
            result.append(AssetSpec(**v))
        elif isinstance(v, DictConfig):
            # OmegaConf DictConfig from Hydra — may have rid+cache or just rid
            d = dict(v)
            if "rid" in d:
                result.append(AssetSpec(**d))
            else:
                # Legacy DictConfig with just .rid attribute (AssetRID-style)
                result.append(AssetSpec(rid=v.rid, cache=getattr(v, "cache", False)))
        elif isinstance(v, AssetRID):
            result.append(AssetSpec(rid=v.rid))
        elif isinstance(v, str):
            result.append(AssetSpec(rid=v))
        else:
            # Unknown type — try string coercion
            result.append(AssetSpec(rid=str(v)))
    return result
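
The normalization precedence can be illustrated with a plain-Python sketch. AssetSpec here is a dataclass stand-in (the real class is a pydantic model), and the DictConfig and AssetRID branches are omitted so the example is self-contained.

```python
# Sketch of validate_assets' precedence: AssetSpec passes through,
# dicts become AssetSpec(**d), strings become uncached AssetSpecs.
from dataclasses import dataclass


@dataclass
class AssetSpec:
    rid: str
    cache: bool = False


def normalize_assets(value):
    result = []
    for v in value:
        if isinstance(v, AssetSpec):
            result.append(v)                      # already normalized
        elif isinstance(v, dict):
            result.append(AssetSpec(**v))         # e.g. from a JSON config
        elif isinstance(v, str):
            result.append(AssetSpec(rid=v))       # plain RID, no caching
        else:
            result.append(AssetSpec(rid=str(v)))  # last-resort coercion
    return result


specs = normalize_assets(["6-EP56", {"rid": "6-EPNR", "cache": True}])
print(specs[0].cache, specs[1].cache)  # False True
```

Because the validator runs with mode="before", callers can freely mix the accepted forms in one list and always receive AssetSpec objects back.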

MultirunSpec dataclass

Specification for a multirun experiment.

Attributes:

Name Type Description
name str

Unique identifier for this multirun configuration.

overrides list[str]

List of Hydra override strings (same syntax as command line). Examples:

- "+experiment=cifar10_quick,cifar10_extended"
- "model_config.learning_rate=0.0001,0.001,0.01"
- "model_config.epochs=5,10,25,50"

description str

Rich description for the parent execution. Supports full markdown formatting (headers, tables, bold, etc.).

Source code in src/deriva_ml/execution/multirun_config.py
@dataclass
class MultirunSpec:
    """Specification for a multirun experiment.

    Attributes:
        name: Unique identifier for this multirun configuration.
        overrides: List of Hydra override strings (same syntax as command line).
            Examples:
            - "+experiment=cifar10_quick,cifar10_extended"
            - "model_config.learning_rate=0.0001,0.001,0.01"
            - "model_config.epochs=5,10,25,50"
        description: Rich description for the parent execution. Supports full
            markdown formatting (headers, tables, bold, etc.).
    """
    name: str
    overrides: list[str] = field(default_factory=list)
    description: str = ""
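
Hydra expands comma-separated override values into one run per combination. The helper below is a hypothetical illustration of that expansion (Hydra performs it internally), using a dataclass copy of MultirunSpec so the sketch is self-contained.

```python
# Sketch: expand Hydra-style multirun overrides into a sweep grid
# via the Cartesian product of each override's comma-separated values.
from dataclasses import dataclass, field
from itertools import product


@dataclass
class MultirunSpec:
    name: str
    overrides: list[str] = field(default_factory=list)
    description: str = ""


def expand_overrides(spec: MultirunSpec) -> list[dict[str, str]]:
    keys, choices = [], []
    for ov in spec.overrides:
        key, _, values = ov.partition("=")
        keys.append(key.lstrip("+"))      # "+experiment" appends a config group
        choices.append(values.split(","))
    return [dict(zip(keys, combo)) for combo in product(*choices)]


spec = MultirunSpec(
    name="lr_sweep",
    overrides=[
        "model_config.learning_rate=0.001,0.01",
        "model_config.epochs=5,10",
    ],
)
runs = expand_overrides(spec)
print(len(runs))  # 4: two learning rates x two epoch counts
```

Each resulting dict corresponds to one child execution of the multirun, which is how a single MultirunSpec yields a parent execution with several nested runs.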

Workflow

Bases: BaseModel

Represents a computational workflow in DerivaML.

A workflow defines a computational process or analysis pipeline. Each workflow has a unique identifier, source code location, and type. Workflows are typically associated with Git repositories for version control.

When a Workflow is retrieved via lookup_workflow(rid) or lookup_workflow_by_url(), it is bound to a catalog and its description and workflow_type properties become writable. Setting these properties will update the catalog record. If the catalog is read-only (a snapshot), attempting to set them will raise a DerivaMLException.

Attributes:

Name Type Description
name str

Human-readable name of the workflow.

url str

URI to the workflow source code (typically a GitHub URL).

workflow_type str | list[str]

Type(s) of workflow (must be controlled vocabulary terms). Accepts a single string or a list of strings. Internally normalized to a list. When the workflow is bound to a writable catalog, setting this property will update the catalog record. The new values must be valid terms from the Workflow_Type vocabulary.

version str | None

Version identifier (semantic versioning).

description str | None

Description of workflow purpose and behavior. When the workflow is bound to a writable catalog, setting this property will update the catalog record.

rid RID | None

Resource Identifier if registered in catalog.

checksum str | None

Git hash of workflow source code.

is_notebook bool

Whether workflow is a Jupyter notebook.

Note

The recommended way to create a Workflow is via DerivaML.create_workflow(), which validates the workflow type against the catalog vocabulary::

>>> workflow = ml.create_workflow(
...     name="RNA Analysis",
...     workflow_type="python_notebook",
...     description="RNA sequence analysis"
... )
Example

Create a workflow directly (without catalog validation)::

>>> workflow = Workflow(
...     name="RNA Analysis",
...     url="https://github.com/org/repo/analysis.ipynb",
...     workflow_type="python_notebook",
...     version="1.0.0",
...     description="RNA sequence analysis"
... )

Look up an existing workflow by RID and update its properties::

>>> workflow = ml.lookup_workflow("2-ABC1")
>>> workflow.description = "Updated description for RNA analysis"
>>> workflow.workflow_type = "python_script"
>>> print(workflow.description)
Updated description for RNA analysis

Look up by URL and update::

>>> url = "https://github.com/org/repo/blob/abc123/analysis.py"
>>> workflow = ml.lookup_workflow_by_url(url)
>>> workflow.description = "New description"

Attempting to update on a read-only catalog raises an error::

>>> snapshot_ml = ml.catalog_snapshot("2023-01-15T10:30:00")
>>> workflow = snapshot_ml.lookup_workflow("2-ABC1")
>>> workflow.description = "New description"  # Raises DerivaMLException
Source code in src/deriva_ml/execution/workflow.py
class Workflow(BaseModel):
    """Represents a computational workflow in DerivaML.

    A workflow defines a computational process or analysis pipeline. Each workflow has
    a unique identifier, source code location, and type. Workflows are typically
    associated with Git repositories for version control.

    When a Workflow is retrieved via ``lookup_workflow(rid)`` or ``lookup_workflow_by_url()``,
    it is bound to a catalog and its ``description`` and ``workflow_type`` properties become
    writable. Setting these properties will update the catalog record. If the catalog is
    read-only (a snapshot), attempting to set them will raise a ``DerivaMLException``.

    Attributes:
        name (str): Human-readable name of the workflow.
        url (str): URI to the workflow source code (typically a GitHub URL).
        workflow_type (str | list[str]): Type(s) of workflow (must be controlled vocabulary terms).
            Accepts a single string or a list of strings. Internally normalized to a list.
            When the workflow is bound to a writable catalog, setting this property
            will update the catalog record. The new values must be valid terms from
            the Workflow_Type vocabulary.
        version (str | None): Version identifier (semantic versioning).
        description (str | None): Description of workflow purpose and behavior.
            When the workflow is bound to a writable catalog, setting this property
            will update the catalog record.
        rid (RID | None): Resource Identifier if registered in catalog.
        checksum (str | None): Git hash of workflow source code.
        is_notebook (bool): Whether workflow is a Jupyter notebook.

    Note:
        The recommended way to create a Workflow is via :meth:`DerivaML.create_workflow()
        <deriva_ml.DerivaML.create_workflow>`, which validates the workflow type against
        the catalog vocabulary::

            >>> workflow = ml.create_workflow(
            ...     name="RNA Analysis",
            ...     workflow_type="python_notebook",
            ...     description="RNA sequence analysis"
            ... )

    Example:
        Create a workflow directly (without catalog validation)::

            >>> workflow = Workflow(
            ...     name="RNA Analysis",
            ...     url="https://github.com/org/repo/analysis.ipynb",
            ...     workflow_type="python_notebook",
            ...     version="1.0.0",
            ...     description="RNA sequence analysis"
            ... )

        Look up an existing workflow by RID and update its properties::

            >>> workflow = ml.lookup_workflow("2-ABC1")
            >>> workflow.description = "Updated description for RNA analysis"
            >>> workflow.workflow_type = "python_script"
            >>> print(workflow.description)
            Updated description for RNA analysis

        Look up by URL and update::

            >>> url = "https://github.com/org/repo/blob/abc123/analysis.py"
            >>> workflow = ml.lookup_workflow_by_url(url)
            >>> workflow.description = "New description"

        Attempting to update on a read-only catalog raises an error::

            >>> snapshot_ml = ml.catalog_snapshot("2023-01-15T10:30:00")
            >>> workflow = snapshot_ml.lookup_workflow("2-ABC1")
            >>> workflow.description = "New description"  # Raises DerivaMLException
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    name: str
    workflow_type: str | list[str]
    description: str | None = None
    url: str | None = None
    version: str | None = None
    rid: RID | None = None
    checksum: str | None = None
    is_notebook: bool = False
    git_root: Path | None = None
    allow_dirty: bool = False

    _ml_instance: "DerivaMLCatalog | None" = PrivateAttr(default=None)
    _logger: logging.Logger = PrivateAttr(default_factory=lambda: logging.getLogger("deriva_ml"))

    @field_validator("workflow_type", mode="before")
    @classmethod
    def _normalize_workflow_type(cls, v: str | list[str]) -> list[str]:
        """Normalize workflow_type to always be a list of strings."""
        if isinstance(v, str):
            return [v]
        return list(v)

    def __setattr__(self, name: str, value: Any) -> None:
        """Override setattr to intercept description and workflow_type updates.

        When the workflow is bound to a catalog (via lookup_workflow), setting
        the ``description`` or ``workflow_type`` properties will update the catalog
        record. If the catalog is read-only (a snapshot), a DerivaMLException is raised.

        Args:
            name: The attribute name being set.
            value: The value to set.

        Raises:
            DerivaMLException: If attempting to set properties on a read-only
                catalog (snapshot), or if workflow_type is not a valid vocabulary term.

        Examples:
            Update description::

                >>> workflow = ml.lookup_workflow("2-ABC1")
                >>> workflow.description = "Updated description"

            Update workflow type::

                >>> workflow = ml.lookup_workflow("2-ABC1")
                >>> workflow.workflow_type = "python_notebook"
        """
        # Only intercept updates after full initialization
        # Use __dict__ check to avoid recursion during Pydantic model construction
        if (
            "__pydantic_private__" in self.__dict__
            and self.__dict__.get("__pydantic_private__", {}).get("_ml_instance") is not None
        ):
            if name == "description":
                self._update_description_in_catalog(value)
            elif name == "workflow_type":
                # Normalize to list
                if isinstance(value, str):
                    value = [value]
                self._update_workflow_types_in_catalog(value)
        super().__setattr__(name, value)

    def _check_writable_catalog(self, operation: str) -> None:
        """Check that the catalog is writable and workflow is registered.

        Args:
            operation: Description of the operation being attempted.

        Raises:
            DerivaMLException: If the workflow is not registered (no RID),
                or if the catalog is read-only (a snapshot).
        """
        # Import here to avoid circular dependency at module load
        import importlib
        _deriva_core = importlib.import_module("deriva.core")
        ErmrestSnapshot = _deriva_core.ErmrestSnapshot

        if self.rid is None:
            raise DerivaMLException(
                f"Cannot {operation}: Workflow is not registered in the catalog (no RID)"
            )

        if isinstance(self._ml_instance.catalog, ErmrestSnapshot):
            raise DerivaMLException(
                f"Cannot {operation} on a read-only catalog snapshot. "
                "Use a writable catalog connection instead."
            )

    def _update_description_in_catalog(self, new_description: str | None) -> None:
        """Update the description field in the catalog.

        This internal method is called when the description property is set
        on a catalog-bound Workflow object.

        Args:
            new_description: The new description value.

        Raises:
            DerivaMLException: If the workflow is not registered (no RID),
                or if the catalog is read-only (a snapshot).
        """
        self._check_writable_catalog("update description")

        # Update the catalog record
        pb = self._ml_instance.pathBuilder()
        workflow_path = pb.schemas[self._ml_instance.ml_schema].Workflow
        workflow_path.update([{"RID": self.rid, "Description": new_description}])

    def _get_workflow_type_association_table(self):
        """Get the association table for workflow types.

        Returns:
            Tuple of (table_name, table_path) for the Workflow-Workflow_Type association table.
        """
        atable_name = "Workflow_Workflow_Type"
        pb = self._ml_instance.pathBuilder()
        atable_path = pb.schemas[self._ml_instance.ml_schema].tables[atable_name]
        return atable_name, atable_path

    @property
    def workflow_types(self) -> list[str]:
        """Get the workflow types from the catalog.

        This property fetches the current workflow types directly from the catalog,
        ensuring consistency when multiple Workflow instances reference the same
        workflow or when types are modified externally.

        When not bound to a catalog, returns the local ``workflow_type`` field.

        Returns:
            List of workflow type term names from the Workflow_Type vocabulary.
        """
        if self._ml_instance is not None:
            _, atable_path = self._get_workflow_type_association_table()
            wt_types = (
                atable_path.filter(atable_path.Workflow == self.rid)
                .attributes(atable_path.Workflow_Type)
                .fetch()
            )
            return [wt[MLVocab.workflow_type] for wt in wt_types]
        return list(self.workflow_type)

    def add_workflow_type(self, workflow_type: str | VocabularyTerm) -> None:
        """Add a workflow type to this workflow.

        Adds a type term to this workflow if it's not already present. The term must
        exist in the Workflow_Type vocabulary.

        Args:
            workflow_type: Term name (string) or VocabularyTerm object from Workflow_Type vocabulary.

        Raises:
            DerivaMLException: If the workflow is not registered (no RID),
                the catalog is read-only, or the term doesn't exist.
        """
        self._check_writable_catalog("add workflow_type")

        if isinstance(workflow_type, VocabularyTerm):
            vocab_term = workflow_type
        else:
            vocab_term = self._ml_instance.lookup_term(MLVocab.workflow_type, workflow_type)

        if vocab_term.name in self.workflow_types:
            return

        _, atable_path = self._get_workflow_type_association_table()
        atable_path.insert([{MLVocab.workflow_type: vocab_term.name, "Workflow": self.rid}])

    def remove_workflow_type(self, workflow_type: str | VocabularyTerm) -> None:
        """Remove a workflow type from this workflow.

        Removes a type term from this workflow if it's currently associated.

        Args:
            workflow_type: Term name (string) or VocabularyTerm object from Workflow_Type vocabulary.

        Raises:
            DerivaMLException: If the workflow is not registered (no RID),
                the catalog is read-only, or the term doesn't exist.
        """
        self._check_writable_catalog("remove workflow_type")

        if isinstance(workflow_type, VocabularyTerm):
            vocab_term = workflow_type
        else:
            vocab_term = self._ml_instance.lookup_term(MLVocab.workflow_type, workflow_type)

        if vocab_term.name not in self.workflow_types:
            return

        _, atable_path = self._get_workflow_type_association_table()
        atable_path.filter(
            (atable_path.Workflow == self.rid) & (atable_path.Workflow_Type == vocab_term.name)
        ).delete()

    def add_workflow_types(self, workflow_types: str | VocabularyTerm | list[str | VocabularyTerm]) -> None:
        """Add one or more workflow types to this workflow.

        Args:
            workflow_types: Single term or list of terms. Can be strings (term names)
                or VocabularyTerm objects.

        Raises:
            DerivaMLException: If any term doesn't exist in the Workflow_Type vocabulary.
        """
        types_to_add = [workflow_types] if not isinstance(workflow_types, list) else workflow_types

        for term in types_to_add:
            self.add_workflow_type(term)

    def _update_workflow_types_in_catalog(self, new_workflow_types: list[str]) -> None:
        """Replace all workflow types in the catalog with the given list.

        This internal method is called when the workflow_type property is set
        on a catalog-bound Workflow object. Each new type must be a valid
        term from the Workflow_Type vocabulary.

        Args:
            new_workflow_types: List of new workflow type names.

        Raises:
            DerivaMLException: If the workflow is not registered (no RID),
                the catalog is read-only (a snapshot), or any workflow_type
                is not a valid vocabulary term.
        """
        self._check_writable_catalog("update workflow_type")

        # Validate all new types exist in vocabulary
        for wt in new_workflow_types:
            self._ml_instance.lookup_term(MLVocab.workflow_type, wt)

        # Delete all existing type associations
        _, atable_path = self._get_workflow_type_association_table()
        atable_path.filter(atable_path.Workflow == self.rid).delete()

        # Insert new type associations
        if new_workflow_types:
            atable_path.insert([
                {MLVocab.workflow_type: wt, "Workflow": self.rid}
                for wt in new_workflow_types
            ])

    @model_validator(mode="after")
    def setup_url_checksum(self) -> "Workflow":
        """Creates a workflow from the current execution context.

        Identifies the currently executing program (script or notebook) and creates
        a workflow definition. Automatically determines the Git repository information
        and source code checksum.

        The behavior can be configured using environment variables:
            - DERIVA_ML_WORKFLOW_URL: Override the detected workflow URL
            - DERIVA_ML_WORKFLOW_CHECKSUM: Override the computed checksum
            - DERIVA_MCP_IN_DOCKER: Set to "true" to use Docker metadata instead of git

        Docker environment variables (used when DERIVA_MCP_IN_DOCKER=true):
            - DERIVA_MCP_VERSION: Semantic version of the Docker image
            - DERIVA_MCP_GIT_COMMIT: Git commit hash at build time
            - DERIVA_MCP_IMAGE_DIGEST: Docker image digest (unique identifier)
            - DERIVA_MCP_IMAGE_NAME: Docker image name (e.g., ghcr.io/org/repo)

        Returns:
            Workflow: This workflow instance with detected Git information.

        Raises:
            DerivaMLException: If not in a Git repository or detection fails (non-Docker).

        Example:
            >>> workflow = Workflow.create_workflow(
            ...     name="Sample Analysis",
            ...     workflow_type="python_script",
            ...     description="Process sample data"
            ... )
        """
        self._logger = logging.getLogger("deriva_ml")

        # Check if running in Docker container (no git repo available)
        if os.environ.get("DERIVA_MCP_IN_DOCKER", "").lower() == "true":
            # Use Docker image metadata for provenance
            self.version = self.version or os.environ.get("DERIVA_MCP_VERSION", "")

            # Use image digest as checksum (unique identifier for the container)
            # Fall back to git commit if digest not available
            self.checksum = self.checksum or (
                os.environ.get("DERIVA_MCP_IMAGE_DIGEST", "")
                or os.environ.get("DERIVA_MCP_GIT_COMMIT", "")
            )

            # Build URL pointing to the Docker image or source repo
            if not self.url:
                image_name = os.environ.get(
                    "DERIVA_MCP_IMAGE_NAME",
                    "ghcr.io/informatics-isi-edu/deriva-ml-mcp",
                )
                image_digest = os.environ.get("DERIVA_MCP_IMAGE_DIGEST", "")
                if image_digest:
                    # URL format: image@sha256:digest
                    self.url = f"{image_name}@{image_digest}"
                else:
                    # Fall back to source repo with git commit
                    source_url = "https://github.com/informatics-isi-edu/deriva-ml-mcp"
                    git_commit = os.environ.get("DERIVA_MCP_GIT_COMMIT", "")
                    self.url = f"{source_url}/commit/{git_commit}" if git_commit else source_url

            return self

        # Check to see if execution file info is being passed in by calling program (notebook runner)
        if "DERIVA_ML_WORKFLOW_URL" in os.environ:
            self.url = os.environ["DERIVA_ML_WORKFLOW_URL"]
            self.checksum = os.environ.get("DERIVA_ML_WORKFLOW_CHECKSUM", "")
            notebook_path = os.environ.get("DERIVA_ML_NOTEBOOK_PATH")
            if notebook_path:
                self.git_root = Workflow._get_git_root(Path(notebook_path))
            self.is_notebook = True
            return self

        # Standard git detection for local development
        # Check env var for allow_dirty (set by CLI --allow-dirty flag or dry-run mode)
        if os.environ.get("DERIVA_ML_ALLOW_DIRTY", "").lower() == "true":
            self.allow_dirty = True
        if os.environ.get("DERIVA_ML_DRY_RUN", "").lower() == "true":
            self.allow_dirty = True

        if not self.url:
            path, self.is_notebook = Workflow._get_python_script()
            self.url, self.checksum = Workflow.get_url_and_checksum(path, allow_dirty=self.allow_dirty)
            self.git_root = Workflow._get_git_root(path)

        self.version = self.version or Workflow.get_dynamic_version(root=str(self.git_root or Path.cwd()))
        return self

    @staticmethod
    def get_url_and_checksum(executable_path: Path, allow_dirty: bool = False) -> tuple[str, str]:
        """Determines the Git URL and checksum for a file.

        Computes the Git repository URL and file checksum for the specified path.
        For notebooks, strips cell outputs before computing the checksum.

        Args:
            executable_path: Path to the workflow file.
            allow_dirty: If True, log a warning instead of raising an error
                when the file has uncommitted changes. Defaults to False.

        Returns:
            tuple[str, str]: (GitHub URL, Git object hash)

        Raises:
            DerivaMLException: If not in a Git repository.
            DerivaMLDirtyWorkflowError: If the file has uncommitted changes
                and allow_dirty is False.

        Example:
            >>> url, checksum = Workflow.get_url_and_checksum(Path("analysis.ipynb"))
            >>> print(f"URL: {url}")
            >>> print(f"Checksum: {checksum}")
        """
        try:
            subprocess.run(
                ["git", "rev-parse", "--is-inside-work-tree"],
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError:
            raise DerivaMLException("Not executing in a Git repository.")

        github_url, is_dirty = Workflow._github_url(executable_path)

        if is_dirty:
            if allow_dirty:
                logging.getLogger("deriva_ml").warning(
                    f"File {executable_path} has uncommitted changes. "
                    f"Proceeding with --allow-dirty override."
                )
            else:
                raise DerivaMLDirtyWorkflowError(str(executable_path))

        # If you are in a notebook, strip out the outputs before computing the checksum.
        if executable_path != "REPL":
            if "ipynb" == executable_path.suffix:
                strip_proc = subprocess.run(
                    ["nbstripout", "-t", str(executable_path)],
                    capture_output=True,
                )
                hash_proc = subprocess.run(
                    ["git", "hash-object", "--stdin"],
                    input=strip_proc.stdout,
                    capture_output=True,
                    text=True,
                )
                checksum = hash_proc.stdout.strip()
            else:
                checksum = subprocess.run(
                    ["git", "hash-object", str(executable_path)],
                    capture_output=True,
                    text=True,
                    check=False,
                ).stdout.strip()
        else:
            checksum = "1"
        return github_url, checksum

    @staticmethod
    def _get_git_root(executable_path: Path) -> str | None:
        """Gets the root directory of the Git repository.

        Args:
            executable_path: Path to check for Git repository.

        Returns:
            str | None: Absolute path to repository root, or None if not in repository.
        """
        try:
            result = subprocess.run(
                ["git", "rev-parse", "--show-toplevel"],
                cwd=executable_path.parent,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                text=True,
                check=True,
            )
            return result.stdout.strip()
        except subprocess.CalledProcessError:
            return None  # Not in a git repository

    @staticmethod
    def _check_nbstrip_status() -> None:
        """Checks if nbstripout is installed and configured.

        Verifies that the nbstripout tool is available and properly installed in the
        Git repository. Issues warnings if setup is incomplete.
        """
        logger = logging.getLogger("deriva_ml")
        try:
            if subprocess.run(
                ["nbstripout", "--is-installed"],
                check=False,
                capture_output=True,
            ).returncode:
                logger.warning("nbstripout is not installed in repository. Please run nbstripout --install")
        except (subprocess.CalledProcessError, FileNotFoundError):
            logger.warning("nbstripout is not found. Please install it with: pip install nbstripout")

    @staticmethod
    def _get_notebook_path() -> Path | None:
        """Gets the path of the currently executing notebook.

        Returns:
            Path | None: Absolute path to current notebook, or None if not in notebook.
        """

        server, session = Workflow._get_notebook_session()

        if server and session:
            relative_path = session["notebook"]["path"]
            # Join the notebook directory with the relative path
            return Path(server["root_dir"]) / relative_path
        else:
            return None

    @staticmethod
    def _get_notebook_session() -> tuple[dict[str, Any] | None, dict[str, Any] | None]:
        """Return the absolute path of the current notebook."""
        # Get the kernel's connection file and extract the kernel ID
        try:
            if not (connection_file := Path(get_kernel_connection()).name):
                return None, None
        except RuntimeError:
            return None, None

        # Extract kernel ID from connection filename.
        # Standard Jupyter format: "kernel-<kernel_id>.json"
        # PyCharm/other formats may vary: "<kernel_id>.json" or other patterns
        kernel_id = None
        if connection_file.startswith("kernel-") and "-" in connection_file:
            # Standard format: kernel-<uuid>.json
            parts = connection_file.split("-", 1)
            if len(parts) > 1:
                kernel_id = parts[1].rsplit(".", 1)[0]
        else:
            # Fallback: assume filename (without extension) is the kernel ID
            kernel_id = connection_file.rsplit(".", 1)[0]

        if not kernel_id:
            return None, None

        # Look through the running server sessions to find the matching kernel ID
        for server in get_servers():
            try:
                # If a token is required for authentication, include it in headers
                token = server.get("token", "")
                headers = {}
                if token:
                    headers["Authorization"] = f"token {token}"

                sessions_url = server["url"] + "api/sessions"
                response = requests.get(sessions_url, headers=headers)
                response.raise_for_status()
                sessions = response.json()
                for sess in sessions:
                    if sess["kernel"]["id"] == kernel_id:
                        return server, sess
            except Exception as _e:
                # Ignore servers we can't connect to.
                pass
        return None, None

    @staticmethod
    def _in_repl() -> bool:
        """Return True if running interactively (REPL, ``python -i``, or IPython/Jupyter)."""
        # Standard Python interactive mode
        if hasattr(sys, "ps1"):
            return True

        # Interactive mode forced by -i
        if sys.flags.interactive:
            return True

        # IPython / Jupyter detection
        try:
            from IPython import get_ipython

            if get_ipython() is not None:
                return True
        except ImportError:
            pass

        return False

    @staticmethod
    def _get_python_script() -> tuple[Path, bool]:
        """Return the path to the currently executing script"""
        is_notebook = Workflow._get_notebook_path() is not None
        return Path(_get_calling_module()), is_notebook

    @staticmethod
    def _github_url(executable_path: Path) -> tuple[str, bool]:
        """Return a GitHub URL for the latest commit of the script from which this routine is called.

        This routine is intended to be called from a script or notebook (e.g., ``python -m file``). It
        assumes that the file is in a GitHub repository and has been committed. It returns a URL to the
        last committed version of this file on GitHub.

        Returns: A tuple of the GitHub URL and a boolean indicating whether uncommitted
            changes have been made to the file.

        """

        # Get repo URL from local GitHub repo.
        if executable_path == "REPL":
            return "REPL", True
        try:
            result = subprocess.run(
                ["git", "remote", "get-url", "origin"],
                capture_output=True,
                text=True,
                check=True,
                cwd=executable_path.parent,
            )
            github_url = result.stdout.strip().removesuffix(".git")
        except subprocess.CalledProcessError:
            raise DerivaMLException("No GIT remote found")

        # Find the root directory for the repository
        repo_root = Workflow._get_git_root(executable_path)

        # Now check to see if a file has been modified since the last commit.
        try:
            result = subprocess.run(
                ["git", "status", "--porcelain"],
                cwd=executable_path.parent,
                capture_output=True,
                text=True,
                check=False,
            )
            is_dirty = bool("M " in result.stdout.strip())  # Returns True if the output indicates a modified file
        except subprocess.CalledProcessError:
            is_dirty = False  # If the Git command fails, assume no changes

        """Get SHA-1 hash of latest commit of the file in the repository"""

        result = subprocess.run(
            ["git", "log", "-n", "1", "--pretty=format:%H", executable_path],
            cwd=repo_root,
            capture_output=True,
            text=True,
            check=False,
        )
        sha = result.stdout.strip()
        url = f"{github_url}/blob/{sha}/{executable_path.relative_to(repo_root)}"
        return url, is_dirty

    @staticmethod
    def get_dynamic_version(root: str | os.PathLike | None = None) -> str:
        """
        Return a dynamic version string based on VCS state (setuptools_scm),
        including dirty/uncommitted changes if configured.

        Works under uv / Python 3.10+ by forcing setuptools to use stdlib distutils.
        """
        # 1) Tell setuptools to use stdlib distutils (or no override) to avoid
        #    a '_distutils_hack' assertion error in some environments.
        os.environ.setdefault("SETUPTOOLS_USE_DISTUTILS", "stdlib")

        warnings.filterwarnings(
            "ignore",
            category=UserWarning,
            module="_distutils_hack",
        )
        try:
            from setuptools_scm import get_version
        except Exception as e:  # ImportError or anything environment-specific
            raise RuntimeError(f"setuptools_scm is not available: {e}") from e

        if root is None:
            # Adjust this to point at your repo root if needed
            root = Path(__file__).resolve().parents[1]

        return get_version(root=root)

workflow_types property

workflow_types: list[str]

Get the workflow types from the catalog.

This property fetches the current workflow types directly from the catalog, ensuring consistency when multiple Workflow instances reference the same workflow or when types are modified externally.

When not bound to a catalog, returns the local workflow_type field.

Returns:

list[str]: List of workflow type term names from the Workflow_Type vocabulary.

__setattr__

__setattr__(
    name: str, value: Any
) -> None

Override setattr to intercept description and workflow_type updates.

When the workflow is bound to a catalog (via lookup_workflow), setting the description or workflow_type properties will update the catalog record. If the catalog is read-only (a snapshot), a DerivaMLException is raised.

Parameters:

- name (str): The attribute name being set. Required.
- value (Any): The value to set. Required.

Raises:

DerivaMLException: If attempting to set properties on a read-only catalog (snapshot), or if workflow_type is not a valid vocabulary term.

Examples:

Update description::

>>> workflow = ml.lookup_workflow("2-ABC1")
>>> workflow.description = "Updated description"

Update workflow type::

>>> workflow = ml.lookup_workflow("2-ABC1")
>>> workflow.workflow_type = "python_notebook"
Source code in src/deriva_ml/execution/workflow.py
def __setattr__(self, name: str, value: Any) -> None:
    """Override setattr to intercept description and workflow_type updates.

    When the workflow is bound to a catalog (via lookup_workflow), setting
    the ``description`` or ``workflow_type`` properties will update the catalog
    record. If the catalog is read-only (a snapshot), a DerivaMLException is raised.

    Args:
        name: The attribute name being set.
        value: The value to set.

    Raises:
        DerivaMLException: If attempting to set properties on a read-only
            catalog (snapshot), or if workflow_type is not a valid vocabulary term.

    Examples:
        Update description::

            >>> workflow = ml.lookup_workflow("2-ABC1")
            >>> workflow.description = "Updated description"

        Update workflow type::

            >>> workflow = ml.lookup_workflow("2-ABC1")
            >>> workflow.workflow_type = "python_notebook"
    """
    # Only intercept updates after full initialization
    # Use __dict__ check to avoid recursion during Pydantic model construction
    if (
        "__pydantic_private__" in self.__dict__
        and self.__dict__.get("__pydantic_private__", {}).get("_ml_instance") is not None
    ):
        if name == "description":
            self._update_description_in_catalog(value)
        elif name == "workflow_type":
            # Normalize to list
            if isinstance(value, str):
                value = [value]
            self._update_workflow_types_in_catalog(value)
    super().__setattr__(name, value)
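The catalog-syncing behavior above relies on `__setattr__` interception: once the instance is fully constructed (detected via `__pydantic_private__`), selected attribute writes are mirrored to a backing store before the normal assignment proceeds. A minimal sketch of that pattern outside Pydantic (class and field names here are illustrative, not DerivaML API):

```python
class SyncedRecord:
    """Mirror writes to selected fields into a backing store."""

    def __init__(self) -> None:
        self._store: dict = {}   # stands in for the catalog record
        self._bound = True       # stands in for "bound to a writable catalog"

    def __setattr__(self, name, value):
        # Only intercept after construction: _bound is absent while
        # __init__ is still assigning its own attributes.
        if self.__dict__.get("_bound") and name == "description":
            self._store["description"] = value  # push the update to the store
        super().__setattr__(name, value)

rec = SyncedRecord()
rec.description = "Updated description"
```

After the assignment, both `rec.description` and `rec._store["description"]` hold the new value, mirroring how a bound Workflow updates the catalog record and the local field together.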

add_workflow_type

add_workflow_type(
    workflow_type: str | VocabularyTerm,
) -> None

Add a workflow type to this workflow.

Adds a type term to this workflow if it's not already present. The term must exist in the Workflow_Type vocabulary.

Parameters:

- workflow_type (str | VocabularyTerm): Term name (string) or VocabularyTerm object from the Workflow_Type vocabulary. Required.

Raises:

DerivaMLException: If the workflow is not registered (no RID), the catalog is read-only, or the term doesn't exist.

Source code in src/deriva_ml/execution/workflow.py
def add_workflow_type(self, workflow_type: str | VocabularyTerm) -> None:
    """Add a workflow type to this workflow.

    Adds a type term to this workflow if it's not already present. The term must
    exist in the Workflow_Type vocabulary.

    Args:
        workflow_type: Term name (string) or VocabularyTerm object from Workflow_Type vocabulary.

    Raises:
        DerivaMLException: If the workflow is not registered (no RID),
            the catalog is read-only, or the term doesn't exist.
    """
    self._check_writable_catalog("add workflow_type")

    if isinstance(workflow_type, VocabularyTerm):
        vocab_term = workflow_type
    else:
        vocab_term = self._ml_instance.lookup_term(MLVocab.workflow_type, workflow_type)

    if vocab_term.name in self.workflow_types:
        return

    _, atable_path = self._get_workflow_type_association_table()
    atable_path.insert([{MLVocab.workflow_type: vocab_term.name, "Workflow": self.rid}])

add_workflow_types

add_workflow_types(
    workflow_types: str
    | VocabularyTerm
    | list[str | VocabularyTerm],
) -> None

Add one or more workflow types to this workflow.

Parameters:

- workflow_types (str | VocabularyTerm | list[str | VocabularyTerm]): Single term or list of terms. Can be strings (term names) or VocabularyTerm objects. Required.

Raises:

DerivaMLException: If any term doesn't exist in the Workflow_Type vocabulary.

Source code in src/deriva_ml/execution/workflow.py
def add_workflow_types(self, workflow_types: str | VocabularyTerm | list[str | VocabularyTerm]) -> None:
    """Add one or more workflow types to this workflow.

    Args:
        workflow_types: Single term or list of terms. Can be strings (term names)
            or VocabularyTerm objects.

    Raises:
        DerivaMLException: If any term doesn't exist in the Workflow_Type vocabulary.
    """
    types_to_add = [workflow_types] if not isinstance(workflow_types, list) else workflow_types

    for term in types_to_add:
        self.add_workflow_type(term)

get_dynamic_version staticmethod

get_dynamic_version(
    root: str | PathLike | None = None,
) -> str

Return a dynamic version string based on VCS state (setuptools_scm), including dirty/uncommitted changes if configured.

Works under uv / Python 3.10+ by forcing setuptools to use stdlib distutils.

Source code in src/deriva_ml/execution/workflow.py
@staticmethod
def get_dynamic_version(root: str | os.PathLike | None = None) -> str:
    """
    Return a dynamic version string based on VCS state (setuptools_scm),
    including dirty/uncommitted changes if configured.

    Works under uv / Python 3.10+ by forcing setuptools to use stdlib distutils.
    """
    # 1) Tell setuptools to use stdlib distutils (or no override) to avoid
    #    a '_distutils_hack' assertion error in some environments.
    os.environ.setdefault("SETUPTOOLS_USE_DISTUTILS", "stdlib")

    warnings.filterwarnings(
        "ignore",
        category=UserWarning,
        module="_distutils_hack",
    )
    try:
        from setuptools_scm import get_version
    except Exception as e:  # ImportError or anything environment-specific
        raise RuntimeError(f"setuptools_scm is not available: {e}") from e

    if root is None:
        # Adjust this to point at your repo root if needed
        root = Path(__file__).resolve().parents[1]

    return get_version(root=root)
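setuptools_scm derives the version from the nearest tag and the commit distance since it; with the default node-and-date local scheme, a dirty working tree typically yields a string like `1.2.3.dev4+g1a2b3c4.d20240117` (the exact shape depends on configuration). A small sketch splitting such a string into its parts (the regex and sample value are illustrative assumptions, not DerivaML API):

```python
import re

# Assumed setuptools_scm-style layout: release[.devN][+local]
SCM_VERSION = re.compile(
    r"^(?P<release>\d+(?:\.\d+)*)"      # e.g. 1.2.3
    r"(?:\.dev(?P<distance>\d+))?"      # commits since the tag
    r"(?:\+(?P<local>.+))?$"            # node (gSHA) and dirty-date suffix
)

m = SCM_VERSION.match("1.2.3.dev4+g1a2b3c4.d20240117")
parts = m.groupdict()
```

Here `parts["release"]` is the tagged release, `parts["distance"]` the number of commits since the tag, and `parts["local"]` the VCS node plus the date stamp added for uncommitted changes.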

get_url_and_checksum staticmethod

get_url_and_checksum(
    executable_path: Path,
    allow_dirty: bool = False,
) -> tuple[str, str]

Determines the Git URL and checksum for a file.

Computes the Git repository URL and file checksum for the specified path. For notebooks, strips cell outputs before computing the checksum.

Parameters:

- executable_path (Path): Path to the workflow file. Required.
- allow_dirty (bool): If True, log a warning instead of raising an error when the file has uncommitted changes. Defaults to False.

Returns:

tuple[str, str]: (GitHub URL, Git object hash)

Raises:

- DerivaMLException: If not in a Git repository.
- DerivaMLDirtyWorkflowError: If the file has uncommitted changes and allow_dirty is False.

Example:

    >>> url, checksum = Workflow.get_url_and_checksum(Path("analysis.ipynb"))
    >>> print(f"URL: {url}")
    >>> print(f"Checksum: {checksum}")

Source code in src/deriva_ml/execution/workflow.py
@staticmethod
def get_url_and_checksum(executable_path: Path, allow_dirty: bool = False) -> tuple[str, str]:
    """Determines the Git URL and checksum for a file.

    Computes the Git repository URL and file checksum for the specified path.
    For notebooks, strips cell outputs before computing the checksum.

    Args:
        executable_path: Path to the workflow file.
        allow_dirty: If True, log a warning instead of raising an error
            when the file has uncommitted changes. Defaults to False.

    Returns:
        tuple[str, str]: (GitHub URL, Git object hash)

    Raises:
        DerivaMLException: If not in a Git repository.
        DerivaMLDirtyWorkflowError: If the file has uncommitted changes
            and allow_dirty is False.

    Example:
        >>> url, checksum = Workflow.get_url_and_checksum(Path("analysis.ipynb"))
        >>> print(f"URL: {url}")
        >>> print(f"Checksum: {checksum}")
    """
    try:
        subprocess.run(
            ["git", "rev-parse", "--is-inside-work-tree"],
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError:
        raise DerivaMLException("Not executing in a Git repository.")

    github_url, is_dirty = Workflow._github_url(executable_path)

    if is_dirty:
        if allow_dirty:
            logging.getLogger("deriva_ml").warning(
                f"File {executable_path} has uncommitted changes. "
                f"Proceeding with --allow-dirty override."
            )
        else:
            raise DerivaMLDirtyWorkflowError(str(executable_path))

    # If you are in a notebook, strip out the outputs before computing the checksum.
    if executable_path != "REPL":
        if "ipynb" == executable_path.suffix:
            strip_proc = subprocess.run(
                ["nbstripout", "-t", str(executable_path)],
                capture_output=True,
            )
            hash_proc = subprocess.run(
                ["git", "hash-object", "--stdin"],
                input=strip_proc.stdout,
                capture_output=True,
                text=True,
            )
            checksum = hash_proc.stdout.strip()
        else:
            checksum = subprocess.run(
                ["git", "hash-object", str(executable_path)],
                capture_output=True,
                text=True,
                check=False,
            ).stdout.strip()
    else:
        checksum = "1"
    return github_url, checksum
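The checksum returned here is a Git object hash: `git hash-object` computes SHA-1 over a `blob <size>\0` header followed by the file content, which is why stripping notebook outputs first keeps the checksum stable across runs. The computation can be reproduced in pure Python:

```python
import hashlib

def git_blob_hash(data: bytes) -> str:
    """Recompute what `git hash-object --stdin` prints for `data`."""
    # Git hashes "blob <length-in-bytes>\0" + content with SHA-1.
    header = f"blob {len(data)}\0".encode()
    return hashlib.sha1(header + data).hexdigest()

# Well-known values: the empty blob and the blob for b"hello\n"
print(git_blob_hash(b""))         # e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
print(git_blob_hash(b"hello\n"))  # ce013625030ba8dba906f756967f9e9ca394464a
```

Because the hash covers only the blob content, two checkouts of the same committed file always agree, regardless of path or commit history.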

remove_workflow_type

remove_workflow_type(
    workflow_type: str | VocabularyTerm,
) -> None

Remove a workflow type from this workflow.

Removes a type term from this workflow if it's currently associated.

Parameters:

- workflow_type (str | VocabularyTerm): Term name (string) or VocabularyTerm object from the Workflow_Type vocabulary. Required.

Raises:

DerivaMLException: If the workflow is not registered (no RID), the catalog is read-only, or the term doesn't exist.

Source code in src/deriva_ml/execution/workflow.py
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
def remove_workflow_type(self, workflow_type: str | VocabularyTerm) -> None:
    """Remove a workflow type from this workflow.

    Removes a type term from this workflow if it's currently associated.

    Args:
        workflow_type: Term name (string) or VocabularyTerm object from Workflow_Type vocabulary.

    Raises:
        DerivaMLException: If the workflow is not registered (no RID),
            the catalog is read-only, or the term doesn't exist.
    """
    self._check_writable_catalog("remove workflow_type")

    if isinstance(workflow_type, VocabularyTerm):
        vocab_term = workflow_type
    else:
        vocab_term = self._ml_instance.lookup_term(MLVocab.workflow_type, workflow_type)

    if vocab_term.name not in self.workflow_types:
        return

    _, atable_path = self._get_workflow_type_association_table()
    atable_path.filter(
        (atable_path.Workflow == self.rid) & (atable_path.Workflow_Type == vocab_term.name)
    ).delete()

setup_url_checksum

setup_url_checksum() -> 'Workflow'

Creates a workflow from the current execution context.

Identifies the currently executing program (script or notebook) and creates a workflow definition. Automatically determines the Git repository information and source code checksum.

The behavior can be configured using environment variables:

- DERIVA_ML_WORKFLOW_URL: Override the detected workflow URL
- DERIVA_ML_WORKFLOW_CHECKSUM: Override the computed checksum
- DERIVA_MCP_IN_DOCKER: Set to "true" to use Docker metadata instead of git

Docker environment variables (used when DERIVA_MCP_IN_DOCKER=true):

- DERIVA_MCP_VERSION: Semantic version of the Docker image
- DERIVA_MCP_GIT_COMMIT: Git commit hash at build time
- DERIVA_MCP_IMAGE_DIGEST: Docker image digest (unique identifier)
- DERIVA_MCP_IMAGE_NAME: Docker image name (e.g., ghcr.io/org/repo)

Returns:

Workflow: New workflow instance with detected Git information.

Raises:

DerivaMLException: If not in a Git repository or detection fails (non-Docker).

Example:

    >>> workflow = Workflow.create_workflow(
    ...     name="Sample Analysis",
    ...     workflow_type="python_script",
    ...     description="Process sample data"
    ... )

Source code in src/deriva_ml/execution/workflow.py
@model_validator(mode="after")
def setup_url_checksum(self) -> "Workflow":
    """Creates a workflow from the current execution context.

    Identifies the currently executing program (script or notebook) and creates
    a workflow definition. Automatically determines the Git repository information
    and source code checksum.

    The behavior can be configured using environment variables:
        - DERIVA_ML_WORKFLOW_URL: Override the detected workflow URL
        - DERIVA_ML_WORKFLOW_CHECKSUM: Override the computed checksum
        - DERIVA_MCP_IN_DOCKER: Set to "true" to use Docker metadata instead of git

    Docker environment variables (used when DERIVA_MCP_IN_DOCKER=true):
        - DERIVA_MCP_VERSION: Semantic version of the Docker image
        - DERIVA_MCP_GIT_COMMIT: Git commit hash at build time
        - DERIVA_MCP_IMAGE_DIGEST: Docker image digest (unique identifier)
        - DERIVA_MCP_IMAGE_NAME: Docker image name (e.g., ghcr.io/org/repo)

    Returns:
        Workflow: New workflow instance with detected Git information.

    Raises:
        DerivaMLException: If not in a Git repository or detection fails (non-Docker).

    Example:
        >>> workflow = Workflow.create_workflow(
        ...     name="Sample Analysis",
        ...     workflow_type="python_script",
        ...     description="Process sample data"
        ... )
    """
    self._logger = logging.getLogger("deriva_ml")

    # Check if running in Docker container (no git repo available)
    if os.environ.get("DERIVA_MCP_IN_DOCKER", "").lower() == "true":
        # Use Docker image metadata for provenance
        self.version = self.version or os.environ.get("DERIVA_MCP_VERSION", "")

        # Use image digest as checksum (unique identifier for the container)
        # Fall back to git commit if digest not available
        self.checksum = self.checksum or (
            os.environ.get("DERIVA_MCP_IMAGE_DIGEST", "")
            or os.environ.get("DERIVA_MCP_GIT_COMMIT", "")
        )

        # Build URL pointing to the Docker image or source repo
        if not self.url:
            image_name = os.environ.get(
                "DERIVA_MCP_IMAGE_NAME",
                "ghcr.io/informatics-isi-edu/deriva-ml-mcp",
            )
            image_digest = os.environ.get("DERIVA_MCP_IMAGE_DIGEST", "")
            if image_digest:
                # URL format: image@sha256:digest
                self.url = f"{image_name}@{image_digest}"
            else:
                # Fall back to source repo with git commit
                source_url = "https://github.com/informatics-isi-edu/deriva-ml-mcp"
                git_commit = os.environ.get("DERIVA_MCP_GIT_COMMIT", "")
                self.url = f"{source_url}/commit/{git_commit}" if git_commit else source_url

        return self

    # Check to see if execution file info is being passed in by calling program (notebook runner)
    if "DERIVA_ML_WORKFLOW_URL" in os.environ:
        self.url = os.environ["DERIVA_ML_WORKFLOW_URL"]
        self.checksum = os.environ.get("DERIVA_ML_WORKFLOW_CHECKSUM", "")
        notebook_path = os.environ.get("DERIVA_ML_NOTEBOOK_PATH")
        if notebook_path:
            self.git_root = Workflow._get_git_root(Path(notebook_path))
        self.is_notebook = True
        return self

    # Standard git detection for local development
    # Check env var for allow_dirty (set by CLI --allow-dirty flag or dry-run mode)
    if os.environ.get("DERIVA_ML_ALLOW_DIRTY", "").lower() == "true":
        self.allow_dirty = True
    if os.environ.get("DERIVA_ML_DRY_RUN", "").lower() == "true":
        self.allow_dirty = True

    if not self.url:
        path, self.is_notebook = Workflow._get_python_script()
        self.url, self.checksum = Workflow.get_url_and_checksum(path, allow_dirty=self.allow_dirty)
        self.git_root = Workflow._get_git_root(path)

    self.version = self.version or Workflow.get_dynamic_version(root=str(self.git_root or Path.cwd()))
    return self
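The validator above applies a fixed precedence when deciding where provenance comes from. That decision logic can be summarized as a small pure function (a sketch for illustration; the function name and return labels below are not DerivaML API):

```python
import os

def provenance_source(env=None) -> str:
    """Which provenance path setup_url_checksum would take (simplified)."""
    env = os.environ if env is None else env
    if env.get("DERIVA_MCP_IN_DOCKER", "").lower() == "true":
        return "docker"    # image digest / build-time commit from env vars
    if "DERIVA_ML_WORKFLOW_URL" in env:
        return "injected"  # URL and checksum supplied by a notebook runner
    return "git"           # probe the local repository for URL and checksum

print(provenance_source({"DERIVA_MCP_IN_DOCKER": "true"}))  # docker
```

Note that the Docker check wins even if a workflow URL is also set, matching the order of the `if` blocks in the validator.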

__getattr__

__getattr__(name)

Lazy import to avoid circular dependencies.

Source code in src/deriva_ml/execution/__init__.py
def __getattr__(name):
    """Lazy import to avoid circular dependencies."""
    if name == "Execution":
        from deriva_ml.execution.execution import Execution

        return Execution
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
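This uses PEP 562 module-level `__getattr__` (Python 3.7+): the hook fires only when a name is missing from the module's globals, so the heavy `Execution` import is deferred until first access. The mechanism can be demonstrated with a synthetic module (the module and attribute names here are made up for the demo):

```python
import sys
import types

demo = types.ModuleType("lazy_demo")

def _module_getattr(name):
    # Called only when `name` is not already in the module's namespace.
    if name == "Heavy":
        import json as heavy  # stands in for a deferred, expensive import
        return heavy
    raise AttributeError(f"module 'lazy_demo' has no attribute {name!r}")

demo.__getattr__ = _module_getattr
sys.modules["lazy_demo"] = demo

import lazy_demo
import json
assert lazy_demo.Heavy is json  # resolved lazily on first access
```

Once resolved, callers can cache the attribute themselves; the hook itself runs again on each miss unless the module also stores the result in its namespace.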

create_model_config

create_model_config(
    ml_class: type["DerivaML"]
    | None = None,
    description: str = "Model execution",
    hydra_defaults: list | None = None,
) -> Any

Create a hydra-zen configuration for run_model.

This helper creates a properly configured hydra-zen builds() for run_model with the specified DerivaML class bound via partial application.

Parameters

ml_class : type[DerivaML], optional
    The DerivaML class (or subclass) to use. If None, uses the base DerivaML.

description : str, optional
    Default description for executions. Can be overridden at runtime.

hydra_defaults : list, optional
    Custom hydra defaults. If None, uses standard defaults for the deriva_ml, datasets, assets, workflow, and model_config groups.

Returns

Any
    A hydra-zen builds() configuration ready to be registered with store().

Examples

Basic usage with DerivaML:

>>> from deriva_ml.execution.runner import create_model_config
>>> model_config = create_model_config()
>>> store(model_config, name="deriva_model")

With a custom subclass:

>>> from eye_ai import EyeAI
>>> model_config = create_model_config(EyeAI, description="EyeAI analysis")
>>> store(model_config, name="eyeai_model")

With custom hydra defaults:

>>> model_config = create_model_config(
...     hydra_defaults=[
...         "_self_",
...         {"deriva_ml": "production"},
...         {"datasets": "full_dataset"},
...     ]
... )
Source code in src/deriva_ml/execution/runner.py
def create_model_config(
    ml_class: type["DerivaML"] | None = None,
    description: str = "Model execution",
    hydra_defaults: list | None = None,
) -> Any:
    """Create a hydra-zen configuration for run_model.

    This helper creates a properly configured hydra-zen builds() for run_model
    with the specified DerivaML class bound via partial application.

    Parameters
    ----------
    ml_class : type[DerivaML], optional
        The DerivaML class (or subclass) to use. If None, uses the base DerivaML.

    description : str, optional
        Default description for executions. Can be overridden at runtime.

    hydra_defaults : list, optional
        Custom hydra defaults. If None, uses standard defaults for deriva_ml,
        datasets, assets, workflow, and model_config groups.

    Returns
    -------
    Any
        A hydra-zen builds() configuration ready to be registered with store().

    Examples
    --------
    Basic usage with DerivaML:

        >>> from deriva_ml.execution.runner import create_model_config
        >>> model_config = create_model_config()
        >>> store(model_config, name="deriva_model")

    With a custom subclass:

        >>> from eye_ai import EyeAI
        >>> model_config = create_model_config(EyeAI, description="EyeAI analysis")
        >>> store(model_config, name="eyeai_model")

    With custom hydra defaults:

        >>> model_config = create_model_config(
        ...     hydra_defaults=[
        ...         "_self_",
        ...         {"deriva_ml": "production"},
        ...         {"datasets": "full_dataset"},
        ...     ]
        ... )
    """
    from functools import partial

    if hydra_defaults is None:
        hydra_defaults = [
            "_self_",
            {"deriva_ml": "default_deriva"},
            {"datasets": "default_dataset"},
            {"assets": "default_asset"},
            {"workflow": "default_workflow"},
            {"model_config": "default_model"},
        ]

    # Create a partial function with ml_class bound
    if ml_class is not None:
        run_func = partial(run_model, ml_class=ml_class)
    else:
        run_func = run_model

    return builds(
        run_func,
        description=description,
        populate_full_signature=True,
        hydra_defaults=hydra_defaults,
    )

get_all_multirun_configs

get_all_multirun_configs() -> dict[
    str, MultirunSpec
]

Get all registered multirun configurations.

Returns:

dict[str, MultirunSpec]
    Dictionary mapping names to MultirunSpec instances.

Source code in src/deriva_ml/execution/multirun_config.py
def get_all_multirun_configs() -> dict[str, MultirunSpec]:
    """Get all registered multirun configurations.

    Returns:
        Dictionary mapping names to MultirunSpec instances.
    """
    return dict(_multirun_registry)

get_multirun_config

get_multirun_config(
    name: str,
) -> MultirunSpec | None

Look up a registered multirun configuration by name.

Parameters:

name (str, required)
    The name of the multirun configuration.

Returns:

MultirunSpec | None
    The MultirunSpec if found, None otherwise.

Source code in src/deriva_ml/execution/multirun_config.py
def get_multirun_config(name: str) -> MultirunSpec | None:
    """Look up a registered multirun configuration by name.

    Args:
        name: The name of the multirun configuration.

    Returns:
        The MultirunSpec if found, None otherwise.
    """
    return _multirun_registry.get(name)

get_notebook_configuration

get_notebook_configuration(
    config_class: type[T],
    config_name: str,
    overrides: list[str] | None = None,
    job_name: str = "notebook",
    version_base: str = "1.3",
) -> T

Load and return a hydra-zen configuration for use in notebooks.

This function is the notebook equivalent of run_model. While run_model launches a full execution with model training, get_notebook_configuration simply resolves the configuration and returns it for interactive use.

The function handles:

- Adding configurations to the hydra store
- Launching hydra-zen to resolve defaults and overrides
- Returning the instantiated configuration object

Parameters:

config_class (type[T], required)
    The hydra-zen builds() class for the configuration. This should be a class created with builds(YourConfig, ...).

config_name (str, required)
    Name of the configuration in the hydra store. Must match the name used when calling store(config_class, name=...).

overrides (list[str] | None, default: None)
    Optional list of Hydra override strings (e.g., ["param=value"]).

job_name (str, default: 'notebook')
    Name for the Hydra job.

version_base (str, default: '1.3')
    Hydra version base.

Returns:

T
    The instantiated configuration object with all defaults resolved.

Example

In your notebook's configuration module (e.g., configs/roc_analysis.py):

>>> from dataclasses import dataclass, field
>>> from hydra_zen import builds, store
>>> from deriva_ml.execution import BaseConfig
>>>
>>> @dataclass
... class ROCAnalysisConfig(BaseConfig):
...     execution_rids: list[str] = field(default_factory=list)
>>>
>>> ROCAnalysisConfigBuilds = builds(
...     ROCAnalysisConfig,
...     populate_full_signature=True,
...     hydra_defaults=["_self_", {"deriva_ml": "default_deriva"}],
... )
>>> store(ROCAnalysisConfigBuilds, name="roc_analysis")

In your notebook:

>>> from configs import load_all_configs
>>> from configs.roc_analysis import ROCAnalysisConfigBuilds
>>> from deriva_ml.execution import get_notebook_configuration
>>>
>>> # Load all project configs into hydra store
>>> load_all_configs()
>>>
>>> # Get resolved configuration
>>> config = get_notebook_configuration(
...     ROCAnalysisConfigBuilds,
...     config_name="roc_analysis",
...     overrides=["execution_rids=[3JRC,3KT0]"],
... )
>>>
>>> # Use the configuration
>>> print(config.execution_rids)  # ['3JRC', '3KT0']
>>> print(config.deriva_ml.hostname)  # From default_deriva config

Environment Variables

DERIVA_ML_HYDRA_OVERRIDES: JSON-encoded list of override strings. When running via deriva-ml-run-notebook, this is automatically set from command-line arguments. Overrides from this environment variable are applied first, then any overrides passed directly to this function are applied (taking precedence).

Source code in src/deriva_ml/execution/base_config.py
def get_notebook_configuration(
    config_class: type[T],
    config_name: str,
    overrides: list[str] | None = None,
    job_name: str = "notebook",
    version_base: str = "1.3",
) -> T:
    """Load and return a hydra-zen configuration for use in notebooks.

    This function is the notebook equivalent of `run_model`. While `run_model`
    launches a full execution with model training, `get_notebook_configuration`
    simply resolves the configuration and returns it for interactive use.

    The function handles:
    - Adding configurations to the hydra store
    - Launching hydra-zen to resolve defaults and overrides
    - Returning the instantiated configuration object

    Args:
        config_class: The hydra-zen builds() class for the configuration.
            This should be a class created with `builds(YourConfig, ...)`.
        config_name: Name of the configuration in the hydra store.
            Must match the name used when calling `store(config_class, name=...)`.
        overrides: Optional list of Hydra override strings (e.g., ["param=value"]).
        job_name: Name for the Hydra job (default: "notebook").
        version_base: Hydra version base (default: "1.3").

    Returns:
        The instantiated configuration object with all defaults resolved.

    Example:
        In your notebook's configuration module (e.g., `configs/roc_analysis.py`):

        >>> from dataclasses import dataclass, field
        >>> from hydra_zen import builds, store
        >>> from deriva_ml.execution import BaseConfig
        >>>
        >>> @dataclass
        ... class ROCAnalysisConfig(BaseConfig):
        ...     execution_rids: list[str] = field(default_factory=list)
        >>>
        >>> ROCAnalysisConfigBuilds = builds(
        ...     ROCAnalysisConfig,
        ...     populate_full_signature=True,
        ...     hydra_defaults=["_self_", {"deriva_ml": "default_deriva"}],
        ... )
        >>> store(ROCAnalysisConfigBuilds, name="roc_analysis")

        In your notebook:

        >>> from configs import load_all_configs
        >>> from configs.roc_analysis import ROCAnalysisConfigBuilds
        >>> from deriva_ml.execution import get_notebook_configuration
        >>>
        >>> # Load all project configs into hydra store
        >>> load_all_configs()
        >>>
        >>> # Get resolved configuration
        >>> config = get_notebook_configuration(
        ...     ROCAnalysisConfigBuilds,
        ...     config_name="roc_analysis",
        ...     overrides=["execution_rids=[3JRC,3KT0]"],
        ... )
        >>>
        >>> # Use the configuration
        >>> print(config.execution_rids)  # ['3JRC', '3KT0']
        >>> print(config.deriva_ml.hostname)  # From default_deriva config

    Environment Variables:
        DERIVA_ML_HYDRA_OVERRIDES: JSON-encoded list of override strings.
            When running via `deriva-ml-run-notebook`, this is automatically
            set from command-line arguments. Overrides from this environment
            variable are applied first, then any overrides passed directly
            to this function are applied (taking precedence).
    """
    # Ensure configs are in the hydra store
    store.add_to_hydra_store(overwrite_ok=True)

    # Collect overrides from environment variable (set by run_notebook CLI)
    env_overrides_json = os.environ.get("DERIVA_ML_HYDRA_OVERRIDES")
    env_overrides = json.loads(env_overrides_json) if env_overrides_json else []

    # Merge overrides: env overrides first, then explicit overrides (higher precedence)
    all_overrides = env_overrides + (overrides or [])

    # Variables to capture from within the task function
    captured_choices: dict[str, str] = {}
    captured_output_dir: str | None = None

    # Define a task function that instantiates and returns the config
    # The cfg from launch() is an OmegaConf DictConfig, so we need to
    # use hydra_zen.instantiate() to convert it to actual Python objects
    def return_instantiated_config(cfg: Any) -> T:
        nonlocal captured_choices, captured_output_dir
        # Capture the Hydra runtime choices (which config names were selected)
        # and runtime output directory (for uploading hydra config files)
        # Filter out None values (some Hydra internal groups have None choices)
        try:
            from hydra.core.hydra_config import HydraConfig
            hydra_cfg = HydraConfig.get()
            choices = hydra_cfg.runtime.choices
            captured_choices = {k: v for k, v in choices.items() if v is not None}
            captured_output_dir = hydra_cfg.runtime.output_dir
        except Exception:
            # If HydraConfig is not available, leave choices empty
            pass
        return instantiate(cfg)

    # Launch hydra-zen to resolve the configuration
    result = launch(
        config_class,
        return_instantiated_config,
        version_base=version_base,
        config_name=config_name,
        job_name=job_name,
        overrides=all_overrides,
    )

    # Inject the captured choices into the config object
    config = result.return_value
    if hasattr(config, "config_choices"):
        config.config_choices = captured_choices

    # Store the hydra output dir in module-level variable for run_notebook() to use.
    # This is NOT stored on the config because it's a runtime artifact, not a
    # configuration parameter, and adding it to the structured config causes
    # Hydra OmegaConf composition errors.
    global _captured_hydra_output_dir
    _captured_hydra_output_dir = captured_output_dir

    return config

list_multirun_configs

list_multirun_configs() -> list[str]

List all registered multirun configuration names.

Returns:

list[str]
    List of registered multirun config names.

Source code in src/deriva_ml/execution/multirun_config.py
def list_multirun_configs() -> list[str]:
    """List all registered multirun configuration names.

    Returns:
        List of registered multirun config names.
    """
    return list(_multirun_registry.keys())

load_configs

load_configs(
    package_name: str = "configs",
) -> list[str]

Dynamically import all configuration modules from a package.

This function discovers and imports all Python modules in the specified package. Each module is expected to register its configurations with the hydra-zen store as a side effect of being imported.

Parameters:

package_name (str, default: 'configs')
    Name of the package containing config modules. The default works for the standard project layout.

Returns:

list[str]
    List of module names that were successfully loaded.

Raises:

ImportError
    If a config module fails to import.

Example

# In your main script or notebook
from deriva_ml.execution import load_configs

load_configs()  # Loads from "configs" package
# or
load_configs("my_project.configs")  # Custom package

Note

The "experiments" module (if present) is loaded last because it typically depends on other configs being registered first.

Source code in src/deriva_ml/execution/base_config.py
def load_configs(package_name: str = "configs") -> list[str]:
    """Dynamically import all configuration modules from a package.

    This function discovers and imports all Python modules in the specified
    package. Each module is expected to register its configurations with
    the hydra-zen store as a side effect of being imported.

    Args:
        package_name: Name of the package containing config modules.
            Default is "configs" which works for the standard project layout.

    Returns:
        List of module names that were successfully loaded.

    Raises:
        ImportError: If a config module fails to import.

    Example:
        # In your main script or notebook
        from deriva_ml.execution import load_configs

        load_configs()  # Loads from "configs" package
        # or
        load_configs("my_project.configs")  # Custom package

    Note:
        The "experiments" module (if present) is loaded last because it
        typically depends on other configs being registered first.
    """
    loaded_modules = []

    try:
        package = importlib.import_module(package_name)
    except ImportError:
        # Package doesn't exist, return empty
        return []

    package_dir = Path(package.__file__).parent

    # Collect module names, recursing into subpackages
    modules_to_load = []
    for module_info in pkgutil.iter_modules([str(package_dir)]):
        if module_info.ispkg:
            # Recurse into subpackages (e.g., configs/dev/)
            sub_loaded = load_configs(f"{package_name}.{module_info.name}")
            loaded_modules.extend(sub_loaded)
        else:
            modules_to_load.append(module_info.name)

    # Sort modules but ensure 'experiments' is loaded last
    modules_to_load.sort()
    if "experiments" in modules_to_load:
        modules_to_load.remove("experiments")
        modules_to_load.append("experiments")

    for module_name in modules_to_load:
        full_name = f"{package_name}.{module_name}"
        importlib.import_module(full_name)
        loaded_modules.append(full_name)

    return sorted(loaded_modules)
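
The "experiments last" ordering is the one non-obvious step in the function above; isolated as a sketch:

```python
# Sketch of load_configs' module ordering: alphabetical, except that
# "experiments" is deferred to the end because it typically depends on
# configs registered by the other modules.
def order_config_modules(names: list[str]) -> list[str]:
    ordered = sorted(names)
    if "experiments" in ordered:
        ordered.remove("experiments")
        ordered.append("experiments")
    return ordered

print(order_config_modules(["workflow", "experiments", "datasets"]))
# ['datasets', 'workflow', 'experiments']
```

Since each config module registers itself on import, import order is registration order, which is why the dependent module must come last.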

notebook_config

notebook_config(
    name: str,
    config_class: type[BaseConfig]
    | None = None,
    defaults: dict[str, str]
    | None = None,
    **field_defaults: Any,
) -> Any

Register a notebook configuration with simplified syntax.

This is the recommended way to create notebook configurations. It handles all the hydra-zen boilerplate (builds, store, defaults) automatically.

For simple notebooks that only use BaseConfig fields (deriva_ml, datasets, assets, etc.), just specify which defaults to use. For notebooks with custom parameters, provide a config_class that inherits from BaseConfig.

Parameters:

name (str, required)
    Configuration name. Used both as the hydra config name and to look up the config in run_notebook().

config_class (type[BaseConfig] | None, default: None)
    Optional dataclass inheriting from BaseConfig. If None, uses BaseConfig directly (suitable for notebooks that only need the standard fields).

defaults (dict[str, str] | None, default: None)
    Dict mapping config group names to config names. These override the base defaults. Common groups:
    - "deriva_ml": Connection config (e.g., "default_deriva", "eye_ai")
    - "datasets": Dataset config (e.g., "cifar10_training")
    - "assets": Asset config (e.g., "model_weights")
    - "workflow": Workflow config (e.g., "default_workflow")

**field_defaults (Any)
    Default values for fields in config_class.

Returns:

Any
    The hydra-zen builds() class, in case you need to reference it directly.

Examples:

Simple notebook using only standard fields:

# configs/roc_analysis.py
from deriva_ml.execution import notebook_config

notebook_config(
    "roc_analysis",
    defaults={"assets": "roc_comparison_probabilities"},
)

Notebook with custom parameters:

# configs/training_analysis.py
from dataclasses import dataclass
from deriva_ml.execution import BaseConfig, notebook_config

@dataclass
class TrainingAnalysisConfig(BaseConfig):
    learning_rate: float = 0.001
    batch_size: int = 32

notebook_config(
    "training_analysis",
    config_class=TrainingAnalysisConfig,
    defaults={"datasets": "cifar10_training"},
    learning_rate=0.01,  # Override default
)
Source code in src/deriva_ml/execution/base_config.py
def notebook_config(
    name: str,
    config_class: type[BaseConfig] | None = None,
    defaults: dict[str, str] | None = None,
    **field_defaults: Any,
) -> Any:
    """Register a notebook configuration with simplified syntax.

    This is the recommended way to create notebook configurations. It handles
    all the hydra-zen boilerplate (builds, store, defaults) automatically.

    For simple notebooks that only use BaseConfig fields (deriva_ml, datasets,
    assets, etc.), just specify which defaults to use. For notebooks with
    custom parameters, provide a config_class that inherits from BaseConfig.

    Args:
        name: Configuration name. Used both as the hydra config name and
            to look up the config in run_notebook().
        config_class: Optional dataclass inheriting from BaseConfig. If None,
            uses BaseConfig directly (suitable for notebooks that only need
            the standard fields).
        defaults: Dict mapping config group names to config names. These
            override the base defaults. Common groups:
            - "deriva_ml": Connection config (e.g., "default_deriva", "eye_ai")
            - "datasets": Dataset config (e.g., "cifar10_training")
            - "assets": Asset config (e.g., "model_weights")
            - "workflow": Workflow config (e.g., "default_workflow")
        **field_defaults: Default values for fields in config_class.

    Returns:
        The hydra-zen builds() class, in case you need to reference it directly.

    Examples:
        Simple notebook using only standard fields:

            # configs/roc_analysis.py
            from deriva_ml.execution import notebook_config

            notebook_config(
                "roc_analysis",
                defaults={"assets": "roc_comparison_probabilities"},
            )

        Notebook with custom parameters:

            # configs/training_analysis.py
            from dataclasses import dataclass
            from deriva_ml.execution import BaseConfig, notebook_config

            @dataclass
            class TrainingAnalysisConfig(BaseConfig):
                learning_rate: float = 0.001
                batch_size: int = 32

            notebook_config(
                "training_analysis",
                config_class=TrainingAnalysisConfig,
                defaults={"datasets": "cifar10_training"},
                learning_rate=0.01,  # Override default
            )
    """
    # Use BaseConfig if no custom class provided
    actual_class = config_class or BaseConfig

    # Build the hydra defaults list
    hydra_defaults = ["_self_"]

    # Start with base defaults, then apply overrides
    default_groups = {
        "deriva_ml": "default_deriva",
        "datasets": "default_dataset",
        "assets": "default_asset",
    }
    if defaults:
        default_groups.update(defaults)

    for group, config_name in default_groups.items():
        hydra_defaults.append({group: config_name})

    # Create the hydra-zen builds() class
    config_builds = builds(
        actual_class,
        populate_full_signature=True,
        hydra_defaults=hydra_defaults,
        **field_defaults,
    )

    # Register with hydra-zen store
    store(config_builds, name=name)

    # Also register in our internal registry for run_notebook()
    _notebook_configs[name] = (config_builds, name)

    return config_builds

reset_multirun_state

reset_multirun_state() -> None

Reset the global multirun state.

This is primarily useful for testing to ensure clean state between tests.

Source code in src/deriva_ml/execution/runner.py
def reset_multirun_state() -> None:
    """Reset the global multirun state.

    This is primarily useful for testing to ensure clean state between tests.
    """
    global _multirun_state
    _multirun_state.parent_execution_rid = None
    _multirun_state.parent_execution = None
    _multirun_state.ml_instance = None
    _multirun_state.job_sequence = 0
    _multirun_state.sweep_dir = None

run_model

run_model(
    deriva_ml: "DerivaMLConfig",
    datasets: list["DatasetSpec"],
    assets: list["RID"],
    description: str,
    workflow: "Workflow",
    model_config: Any,
    dry_run: bool = False,
    ml_class: type["DerivaML"]
    | None = None,
    upload_timeout: int = 600,
    upload_chunk_size: int = 50000000,
    script_config: Any = None,
) -> None

Execute a machine learning model within a DerivaML execution context.

This function serves as the main entry point called by hydra-zen after configuration resolution. It orchestrates the complete execution lifecycle: connecting to Deriva, creating an execution record, running the model, and uploading results.

In multirun mode, this function also:

- Creates a parent execution on the first job to group all sweep jobs
- Links each child execution to the parent with sequence ordering

Parameters

deriva_ml : DerivaMLConfig
    Configuration for the DerivaML connection. Contains server URL, catalog ID, credentials, and other connection parameters.

datasets : list[DatasetSpec]
    Specifications for datasets to use in this execution. Each DatasetSpec identifies a dataset in the Deriva catalog to be made available to the model.

assets : list[RID]
    Resource IDs (RIDs) of assets to include in the execution. Typically used for model weight files, pretrained checkpoints, or other artifacts needed by the model.

description : str
    Human-readable description of this execution run. Stored in the Deriva catalog for provenance tracking. In multirun mode, this is also used for the parent execution if running via multirun_config.

workflow : Workflow
    The workflow definition to associate with this execution. Defines the computational pipeline and its metadata.

model_config : Any
    A hydra-zen callable that wraps the actual model code. When called with ml_instance and execution arguments, it runs the model training or inference logic.

dry_run : bool, optional
    If True, create the execution record but skip actual model execution. Useful for testing configuration without running expensive computations. Default is False.

ml_class : type[DerivaML], optional
    The DerivaML class (or subclass) to instantiate. If None, uses the base DerivaML class. Use this to instantiate domain-specific classes like EyeAI or GUDMAP.

upload_timeout : int, optional
    Timeout in seconds for each chunk upload. Default is 600 (10 minutes). This value is used as both the connect and read timeout. Since urllib3 uses the connect timeout for socket writes, it must be large enough to send a full chunk over the network.

upload_chunk_size : int, optional
    Chunk size in bytes for hatrac uploads. Default is 50000000 (50 MB). Larger chunks reduce overhead but require more memory.

Returns

None
    Results are uploaded to the Deriva catalog as execution outputs.

Examples

This function is typically not called directly, but through hydra:

# From command line:
python deriva_run.py +experiment=cifar10_cnn dry_run=True

# Multirun (creates parent + child executions):
python deriva_run.py --multirun +experiment=cifar10_quick,cifar10_extended

# With a custom DerivaML subclass (in your script):
from functools import partial
run_model_eyeai = partial(run_model, ml_class=EyeAI)
Source code in src/deriva_ml/execution/runner.py
def run_model(
    deriva_ml: "DerivaMLConfig",
    datasets: list["DatasetSpec"],
    assets: list["RID"],
    description: str,
    workflow: "Workflow",
    model_config: Any,
    dry_run: bool = False,
    ml_class: type["DerivaML"] | None = None,
    upload_timeout: int = 600,
    upload_chunk_size: int = 50_000_000,
    script_config: Any = None,
) -> None:
    """
    Execute a machine learning model within a DerivaML execution context.

    This function serves as the main entry point called by hydra-zen after
    configuration resolution. It orchestrates the complete execution lifecycle:
    connecting to Deriva, creating an execution record, running the model,
    and uploading results.

    In multirun mode, this function also:
    - Creates a parent execution on the first job to group all sweep jobs
    - Links each child execution to the parent with sequence ordering

    Parameters
    ----------
    deriva_ml : DerivaMLConfig
        Configuration for the DerivaML connection. Contains server URL,
        catalog ID, credentials, and other connection parameters.

    datasets : list[DatasetSpec]
        Specifications for datasets to use in this execution. Each DatasetSpec
        identifies a dataset in the Deriva catalog to be made available to
        the model.

    assets : list[RID]
        Resource IDs (RIDs) of assets to include in the execution. Typically
        used for model weight files, pretrained checkpoints, or other
        artifacts needed by the model.

    description : str
        Human-readable description of this execution run. Stored in the
        Deriva catalog for provenance tracking. In multirun mode, this is
        also used for the parent execution if running via multirun_config.

    workflow : Workflow
        The workflow definition to associate with this execution. Defines
        the computational pipeline and its metadata.

    model_config : Any
        A hydra-zen callable that wraps the actual model code. When called
        with `ml_instance` and `execution` arguments, it runs the model
        training or inference logic.

    dry_run : bool, optional
        If True, create the execution record but skip actual model execution.
        Useful for testing configuration without running expensive computations.
        Default is False.

    ml_class : type[DerivaML], optional
        The DerivaML class (or subclass) to instantiate. If None, uses the
        base DerivaML class. Use this to instantiate domain-specific classes
        like EyeAI or GUDMAP.

    upload_timeout : int, optional
        Timeout in seconds for each chunk upload. Default is 600 (10 min).
        This value is used as both the connect and read timeout. Since urllib3
        uses the connect timeout for socket writes, it must be large enough
        to send a full chunk over the network.

    upload_chunk_size : int, optional
        Chunk size in bytes for hatrac uploads. Default is 50000000 (50 MB).
        Larger chunks reduce overhead but require more memory.

    Returns
    -------
    None
        Results are uploaded to the Deriva catalog as execution outputs.

    Examples
    --------
    This function is typically not called directly, but through hydra:

        # From command line:
        python deriva_run.py +experiment=cifar10_cnn dry_run=True

        # Multirun (creates parent + child executions):
        python deriva_run.py --multirun +experiment=cifar10_quick,cifar10_extended

        # With a custom DerivaML subclass (in your script):
        from functools import partial
        run_model_eyeai = partial(run_model, ml_class=EyeAI)
    """
    global _multirun_state

    # Import here to avoid circular imports
    from deriva_ml import DerivaML
    from deriva_ml.execution import ExecutionConfiguration

    # ---------------------------------------------------------------------------
    # Clear hydra's logging configuration
    # ---------------------------------------------------------------------------
    # Hydra sets up its own logging handlers which can interfere with DerivaML's
    # logging. Remove them to ensure consistent log output.
    root = logging.getLogger()
    for handler in root.handlers[:]:
        root.removeHandler(handler)

    # ---------------------------------------------------------------------------
    # Connect to the Deriva catalog
    # ---------------------------------------------------------------------------
    # Use the provided ml_class or default to DerivaML
    if ml_class is None:
        ml_class = DerivaML

    ml_instance = ml_class.instantiate(deriva_ml)

    # ---------------------------------------------------------------------------
    # Validate that all config RIDs exist in the catalog
    # ---------------------------------------------------------------------------
    # Check dataset RIDs, versions, and asset RIDs before creating an execution
    # or downloading any data. This catches typos, wrong catalogs, and stale
    # versions early with clear error messages.
    from deriva_ml.core.validation import validate_execution_config

    validation_result = validate_execution_config(ml_instance, datasets, assets)
    if not validation_result.is_valid:
        from deriva_ml.core.exceptions import DerivaMLException

        raise DerivaMLException(
            f"Execution config validation failed:\n{validation_result}"
        )
    if validation_result.warnings:
        for warning in validation_result.warnings:
            logging.warning(warning)

    # ---------------------------------------------------------------------------
    # Correct workflow URL from model function source
    # ---------------------------------------------------------------------------
    # When run via `deriva-ml-run`, the Workflow object is created during Hydra
    # config resolution — before the model function is on the call stack. This
    # causes find_caller.py to pick up the CLI entry point (e.g.,
    # .venv/bin/deriva-ml-run) instead of the actual model file. Here we extract
    # the model function's source file and recompute the workflow URL.
    model_source = _resolve_model_source(model_config)
    if model_source is not None:
        try:
            from deriva_ml.execution.workflow import Workflow as _Wf

            new_url, new_checksum = _Wf.get_url_and_checksum(model_source, allow_dirty=workflow.allow_dirty)
            new_git_root = _Wf._get_git_root(model_source)
            workflow.url = new_url
            workflow.checksum = new_checksum
            workflow.git_root = new_git_root
        except Exception as e:
            logger.debug(f"Could not recompute workflow URL/checksum: {e}")

    # ---------------------------------------------------------------------------
    # Handle multirun mode - create parent execution on first job
    # ---------------------------------------------------------------------------
    is_multirun = _is_multirun()
    if is_multirun and _multirun_state.parent_execution is None:
        _create_parent_execution(ml_instance, workflow, description, dry_run)

    # ---------------------------------------------------------------------------
    # Capture Hydra runtime choices for provenance
    # ---------------------------------------------------------------------------
    # The choices dict maps config group names to the selected config names
    # e.g., {"model_config": "cifar10_quick", "datasets": "cifar10_training"}
    # Filter out None values (some Hydra internal groups have None choices)
    config_choices: dict[str, str] = {}
    try:
        hydra_cfg = HydraConfig.get()
        config_choices = {k: v for k, v in hydra_cfg.runtime.choices.items() if v is not None}
    except Exception as e:
        logger.debug(f"HydraConfig not available (not in Hydra context): {e}")

    # ---------------------------------------------------------------------------
    # Create the execution context
    # ---------------------------------------------------------------------------
    # The ExecutionConfiguration bundles together all the inputs for this run:
    # which datasets to use, which assets (model weights, etc.), and metadata.

    # In multirun mode, enhance the description with job info
    job_description = description
    if is_multirun:
        job_num = _get_job_num()
        job_description = f"[Job {job_num}] {description}"

    execution_config = ExecutionConfiguration(
        datasets=datasets,
        assets=assets,
        description=job_description,
        config_choices=config_choices,
    )

    # Create the execution record in the catalog. This generates a unique
    # execution ID and sets up the working directories for this run.
    execution = ml_instance.create_execution(
        execution_config,
        workflow=workflow,
        dry_run=dry_run
    )

    # ---------------------------------------------------------------------------
    # Link to parent execution in multirun mode
    # ---------------------------------------------------------------------------
    if is_multirun and _multirun_state.parent_execution is not None:
        if not dry_run:
            try:
                # Get the current job sequence from the global state
                job_sequence = _multirun_state.job_sequence
                _multirun_state.parent_execution.add_nested_execution(
                    execution,
                    sequence=job_sequence
                )
                logging.info(
                    f"Linked execution {execution.execution_rid} to parent "
                    f"{_multirun_state.parent_execution_rid} (sequence={job_sequence})"
                )
                # Increment the sequence for the next job
                _multirun_state.job_sequence += 1
            except Exception as e:
                logging.warning(f"Failed to link execution to parent: {e}")

    # ---------------------------------------------------------------------------
    # Run the model within the execution context
    # ---------------------------------------------------------------------------
    # The context manager handles setup (downloading datasets, creating output
    # directories) and teardown (recording completion status, timing).
    with execution.execute() as exec_context:
        # Determine which callable to invoke. script_config takes precedence
        # over model_config when both are provided — this allows skill-generated
        # scripts to use a separate config group while sharing the same runner.
        callable_config = script_config if script_config is not None else model_config
        if dry_run:
            if script_config is not None:
                # Script configs handle dry run internally by checking
                # execution=None and printing a preview instead of writing.
                logging.info("Dry run mode: running script preview")
                callable_config(ml_instance=ml_instance, execution=None)
            else:
                # Model configs may not handle execution=None, so skip entirely.
                logging.info("Dry run mode: skipping model execution")
        else:
            callable_config(ml_instance=ml_instance, execution=exec_context)

    # ---------------------------------------------------------------------------
    # Upload results to the catalog
    # ---------------------------------------------------------------------------
    # After the model completes, upload any output files (metrics, predictions,
    # model checkpoints) to the Deriva catalog for permanent storage.
    if not dry_run:
        uploaded_assets = execution.upload_execution_outputs(
            timeout=(upload_timeout, upload_timeout),
            chunk_size=upload_chunk_size,
        )

        # Print summary of uploaded assets
        total_files = sum(len(files) for files in uploaded_assets.values())
        if total_files > 0:
            print(f"\nUploaded {total_files} asset(s) to catalog:")
            for asset_type, files in uploaded_assets.items():
                for f in files:
                    print(f"  - {asset_type}: {f}")

run_notebook

run_notebook(
    config_name: str,
    overrides: list[str] | None = None,
    workflow_name: str | None = None,
    workflow_type: str = "Analysis Notebook",
    ml_class: type[DerivaML] | None = None,
    config_package: str = "configs",
) -> tuple[DerivaML, Execution, BaseConfig]

Initialize a notebook with DerivaML execution context.

This is the main entry point for notebooks. It handles all the setup:

1. Loads all config modules from the config package
2. Resolves the hydra-zen configuration
3. Creates the DerivaML connection
4. Creates a workflow and execution context
5. Downloads any specified datasets and assets

Parameters:

config_name (str, required)
    Name of the notebook configuration (registered via notebook_config() or store()).

overrides (list[str] | None, default: None)
    Optional list of Hydra override strings (e.g., ["assets=different_assets"]).

workflow_name (str | None, default: None)
    Name for the workflow. Defaults to config_name.

workflow_type (str, default: "Analysis Notebook")
    Type of workflow.

ml_class (type[DerivaML] | None, default: None)
    Optional DerivaML subclass to use. If None, uses DerivaML.

config_package (str, default: "configs")
    Package containing config modules.

Returns:

tuple[DerivaML, Execution, BaseConfig]
    Tuple of (ml_instance, execution, config):
    • ml_instance: Connected DerivaML (or subclass) instance
    • execution: Execution context with downloaded inputs
    • config: Resolved configuration object
Example

    # Simple usage
    from deriva_ml.execution import run_notebook

    ml, execution, config = run_notebook("roc_analysis")

    # Access config values
    print(config.assets)
    print(config.deriva_ml.hostname)

    # Use ml and execution
    for asset_table, paths in execution.asset_paths.items():
        for path in paths:
            print(f"Downloaded: {path.file_name}")

    # At the end of the notebook
    execution.upload_execution_outputs()

Example with overrides

    ml, execution, config = run_notebook(
        "roc_analysis",
        overrides=["assets=roc_quick_probabilities"],
    )

Example with custom ML class

    from eye_ai import EyeAI

    ml, execution, config = run_notebook(
        "eye_analysis",
        ml_class=EyeAI,
    )

Source code in src/deriva_ml/execution/base_config.py
def run_notebook(
    config_name: str,
    overrides: list[str] | None = None,
    workflow_name: str | None = None,
    workflow_type: str = "Analysis Notebook",
    ml_class: type["DerivaML"] | None = None,
    config_package: str = "configs",
) -> tuple["DerivaML", "Execution", BaseConfig]:
    """Initialize a notebook with DerivaML execution context.

    This is the main entry point for notebooks. It handles all the setup:
    1. Loads all config modules from the config package
    2. Resolves the hydra-zen configuration
    3. Creates the DerivaML connection
    4. Creates a workflow and execution context
    5. Downloads any specified datasets and assets

    Args:
        config_name: Name of the notebook configuration (registered via
            notebook_config() or store()).
        overrides: Optional list of Hydra override strings
            (e.g., ["assets=different_assets"]).
        workflow_name: Name for the workflow. Defaults to config_name.
        workflow_type: Type of workflow (default: "Analysis Notebook").
        ml_class: Optional DerivaML subclass to use. If None, uses DerivaML.
        config_package: Package containing config modules (default: "configs").

    Returns:
        Tuple of (ml_instance, execution, config):
        - ml_instance: Connected DerivaML (or subclass) instance
        - execution: Execution context with downloaded inputs
        - config: Resolved configuration object

    Example:
        # Simple usage
        from deriva_ml.execution import run_notebook

        ml, execution, config = run_notebook("roc_analysis")

        # Access config values
        print(config.assets)
        print(config.deriva_ml.hostname)

        # Use ml and execution
        for asset_table, paths in execution.asset_paths.items():
            for path in paths:
                print(f"Downloaded: {path.file_name}")

        # At the end of notebook
        execution.upload_execution_outputs()

    Example with overrides:
        ml, execution, config = run_notebook(
            "roc_analysis",
            overrides=["assets=roc_quick_probabilities"],
        )

    Example with custom ML class:
        from eye_ai import EyeAI

        ml, execution, config = run_notebook(
            "eye_analysis",
            ml_class=EyeAI,
        )
    """
    # Import here to avoid circular imports
    from deriva_ml import DerivaML
    from deriva_ml.execution import Execution, ExecutionConfiguration

    # Load all config modules
    load_configs(config_package)

    # Get the config builds class from our registry or try the store
    if config_name in _notebook_configs:
        config_builds, _ = _notebook_configs[config_name]
    else:
        # Fall back to looking up in hydra store by building a simple config
        # This handles configs registered the old way
        config_builds = DerivaBaseConfig

    # Resolve the configuration
    config = get_notebook_configuration(
        config_builds,
        config_name=config_name,
        overrides=overrides,
    )

    # Create DerivaML instance, passing the hydra output dir captured during
    # config resolution so that hydra YAML configs get uploaded with the execution.
    actual_ml_class = ml_class or DerivaML
    hydra_output_dir = Path(_captured_hydra_output_dir) if _captured_hydra_output_dir else None
    ml = actual_ml_class(
        hostname=config.deriva_ml.hostname,
        catalog_id=config.deriva_ml.catalog_id,
        hydra_runtime_output_dir=hydra_output_dir,
    )

    # Validate that all config RIDs exist in the catalog before proceeding.
    from deriva_ml.core.validation import validate_execution_config

    _datasets = config.datasets if config.datasets else []
    _assets = config.assets if config.assets else []
    validation_result = validate_execution_config(ml, _datasets, _assets)
    if not validation_result.is_valid:
        from deriva_ml.core.exceptions import DerivaMLException

        raise DerivaMLException(
            f"Notebook config validation failed:\n{validation_result}"
        )
    if validation_result.warnings:
        import logging as _logging

        for warning in validation_result.warnings:
            _logging.warning(warning)

    # Create workflow
    actual_workflow_name = workflow_name or config_name.replace("_", " ").title()
    workflow = ml.create_workflow(
        name=actual_workflow_name,
        workflow_type=workflow_type,
        description=config.description or f"Running {config_name}",
    )

    # Create execution configuration
    exec_config = ExecutionConfiguration(
        workflow=workflow,
        datasets=config.datasets if config.datasets else [],
        assets=config.assets if config.assets else [],
        description=config.description or f"Execution of {config_name}",
    )

    # Create execution context (downloads inputs)
    execution = Execution(configuration=exec_config, ml_object=ml, dry_run=config.dry_run)

    return ml, execution, config
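One detail worth calling out: when `workflow_name` is omitted, the workflow name is derived from `config_name` by replacing underscores with spaces and title-casing the result. A standalone sketch of that same expression (the `default_workflow_name` helper is illustrative, not part of the library API):

```python
def default_workflow_name(config_name: str) -> str:
    # Same expression run_notebook uses when workflow_name is None.
    return config_name.replace("_", " ").title()

print(default_workflow_name("roc_analysis"))  # -> "Roc Analysis"
```

Pass an explicit `workflow_name` if the title-cased config name is not what you want recorded in the catalog.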

with_description

with_description(items: list, description: str) -> Any

Create a hydra-zen config for a list with an attached description.

Use this to add descriptions to configuration values like asset RIDs or dataset specifications. The result is a hydra-zen config that, when instantiated, produces a DescribedList.

Parameters:

items (list, required)
    List items (e.g., asset RIDs, dataset specs).

description (str, required)
    Human-readable description of this configuration.

Returns:

Any
    A hydra-zen config that instantiates to a DescribedList.

Example

    from hydra_zen import store
    from deriva_ml.execution import with_description

    # Assets with description
    asset_store = store(group="assets")
    asset_store(
        with_description(
            ["3WMG", "3XPA"],
            "Model weights from quick and extended training runs",
        ),
        name="comparison_weights",
    )

    # Datasets with description
    from deriva_ml.dataset import DatasetSpecConfig
    datasets_store = store(group="datasets")
    datasets_store(
        with_description(
            [DatasetSpecConfig(rid="28CT", version="0.21.0")],
            "Complete CIFAR-10 dataset with 10,000 images",
        ),
        name="cifar10_complete",
    )

    # After instantiation:
    # config.assets is a DescribedList
    # config.assets[0]  # "3WMG"
    # config.assets.description  # "Model weights from..."

Note

For model configs created with builds(), use the zen_meta parameter instead:

    model_store(
        Cifar10CNNConfig,
        name="cifar10_quick",
        epochs=3,
        zen_meta={"description": "Quick training - 3 epochs"},
    )

Source code in src/deriva_ml/execution/base_config.py
def with_description(items: list, description: str) -> Any:
    """Create a hydra-zen config for a list with an attached description.

    Use this to add descriptions to configuration values like asset RIDs
    or dataset specifications. The result is a hydra-zen config that, when
    instantiated, produces a DescribedList.

    Args:
        items: List items (e.g., asset RIDs, dataset specs).
        description: Human-readable description of this configuration.

    Returns:
        A hydra-zen config that instantiates to a DescribedList.

    Example:
        >>> from hydra_zen import store
        >>> from deriva_ml.execution import with_description
        >>>
        >>> # Assets with description
        >>> asset_store = store(group="assets")
        >>> asset_store(
        ...     with_description(
        ...         ["3WMG", "3XPA"],
        ...         "Model weights from quick and extended training runs",
        ...     ),
        ...     name="comparison_weights",
        ... )
        >>>
        >>> # Datasets with description
        >>> from deriva_ml.dataset import DatasetSpecConfig
        >>> datasets_store = store(group="datasets")
        >>> datasets_store(
        ...     with_description(
        ...         [DatasetSpecConfig(rid="28CT", version="0.21.0")],
        ...         "Complete CIFAR-10 dataset with 10,000 images",
        ...     ),
        ...     name="cifar10_complete",
        ... )
        >>>
        >>> # After instantiation:
        >>> # config.assets is a DescribedList
        >>> # config.assets[0]  # "3WMG"
        >>> # config.assets.description  # "Model weights from..."

    Note:
        For model configs created with `builds()`, use the `zen_meta` parameter
        instead:

        >>> model_store(
        ...     Cifar10CNNConfig,
        ...     name="cifar10_quick",
        ...     epochs=3,
        ...     zen_meta={"description": "Quick training - 3 epochs"},
        ... )
    """
    return _DescribedListConfig(items=items, description=description)
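To illustrate what the instantiated config behaves like, here is a minimal stand-in for `DescribedList` — a hypothetical re-implementation for illustration only (the real class lives inside deriva-ml). It is an ordinary list that additionally carries a `description` attribute, which is exactly the behavior the examples above rely on:

```python
class DescribedList(list):
    """Minimal sketch: a list that also carries a human-readable description."""

    def __init__(self, items, description: str = ""):
        super().__init__(items)
        self.description = description

assets = DescribedList(["3WMG", "3XPA"], "Model weights from two training runs")
print(assets[0])           # "3WMG"
print(assets.description)  # "Model weights from two training runs"
```

Because it subclasses `list`, code that iterates over plain asset RIDs keeps working unchanged, while provenance-aware code can read `.description`.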