
Workflow Class

The Workflow class represents a computational workflow in DerivaML. Workflows define the steps and logic for ML experiments and can be associated with Python scripts, Jupyter notebooks, or programmatically defined processes.

Workflow

Bases: BaseModel

Represents a computational workflow in DerivaML.

A workflow defines a computational process or analysis pipeline. Each workflow has a unique identifier, source code location, and type. Workflows are typically associated with Git repositories for version control.

When a Workflow is retrieved via lookup_workflow(rid) or lookup_workflow_by_url(), it is bound to a catalog and its description and workflow_type properties become writable. Setting these properties will update the catalog record. If the catalog is read-only (a snapshot), attempting to set them will raise a DerivaMLException.

Attributes:

name (str): Human-readable name of the workflow.

url (str): URI to the workflow source code (typically a GitHub URL).

workflow_type (str | list[str]): Type(s) of workflow; must be controlled vocabulary terms. Accepts a single string or a list of strings, normalized internally to a list. When the workflow is bound to a writable catalog, setting this property updates the catalog record; the new values must be valid terms from the Workflow_Type vocabulary.

version (str | None): Version identifier (semantic versioning).

description (str | None): Description of workflow purpose and behavior. When the workflow is bound to a writable catalog, setting this property updates the catalog record.

rid (RID | None): Resource Identifier, if the workflow is registered in the catalog.

checksum (str | None): Git hash of the workflow source code.

is_notebook (bool): Whether the workflow is a Jupyter notebook.
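The workflow_type normalization described above can be sketched as a plain function; this mirrors the behavior of the pydantic field validator in the source below, but is a standalone illustration rather than the DerivaML implementation:

```python
def normalize_workflow_type(value):
    """Return workflow_type as a list of term names: a single string is
    wrapped in a one-element list, any other iterable is copied to a list."""
    if isinstance(value, str):
        return [value]
    return list(value)

print(normalize_workflow_type("python_notebook"))         # ['python_notebook']
print(normalize_workflow_type(["python_script", "etl"]))  # ['python_script', 'etl']
```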

Note

The recommended way to create a Workflow is via DerivaML.create_workflow(), which validates the workflow type against the catalog vocabulary::

>>> workflow = ml.create_workflow(
...     name="RNA Analysis",
...     workflow_type="python_notebook",
...     description="RNA sequence analysis"
... )
Example

Create a workflow directly (without catalog validation)::

>>> workflow = Workflow(
...     name="RNA Analysis",
...     url="https://github.com/org/repo/analysis.ipynb",
...     workflow_type="python_notebook",
...     version="1.0.0",
...     description="RNA sequence analysis"
... )

Look up an existing workflow by RID and update its properties::

>>> workflow = ml.lookup_workflow("2-ABC1")
>>> workflow.description = "Updated description for RNA analysis"
>>> workflow.workflow_type = "python_script"
>>> print(workflow.description)
Updated description for RNA analysis
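The write-through updates above rely on intercepting attribute assignment. A minimal stdlib-only sketch of that pattern, with hypothetical names standing in for the catalog binding (not the DerivaML implementation):

```python
class BoundRecord:
    # Attributes whose assignments are forwarded to the backing store.
    _SYNCED = {"description"}

    def __init__(self, store, read_only=False):
        # Bypass our own __setattr__ while wiring up internal state.
        object.__setattr__(self, "_store", store)
        object.__setattr__(self, "_read_only", read_only)

    def __setattr__(self, name, value):
        if name in self._SYNCED:
            if self._read_only:
                raise PermissionError(f"cannot set {name!r}: read-only snapshot")
            self._store[name] = value  # write through to the "catalog"
        object.__setattr__(self, name, value)

catalog = {}
rec = BoundRecord(catalog)
rec.description = "Updated description"
print(catalog["description"])  # Updated description

snapshot = BoundRecord({}, read_only=True)
try:
    snapshot.description = "nope"
except PermissionError as e:
    print(e)  # cannot set 'description': read-only snapshot
```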

Look up by URL and update::

>>> url = "https://github.com/org/repo/blob/abc123/analysis.py"
>>> workflow = ml.lookup_workflow_by_url(url)
>>> workflow.description = "New description"

Attempting to update on a read-only catalog raises an error::

>>> snapshot_ml = ml.catalog_snapshot("2023-01-15T10:30:00")
>>> workflow = snapshot_ml.lookup_workflow("2-ABC1")
>>> workflow.description = "New description"  # Raises DerivaMLException
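The checksum attribute holds a Git object hash. Git's blob hash is SHA-1 over a "blob <size>\0" header followed by the file contents, so it can be reproduced without invoking git. A sketch for reference only; the actual implementation below shells out to `git hash-object`:

```python
import hashlib

def git_blob_hash(content: bytes) -> str:
    """Compute the Git blob hash of raw file content."""
    header = b"blob %d\0" % len(content)
    return hashlib.sha1(header + content).hexdigest()

# The empty blob has a well-known hash:
print(git_blob_hash(b""))  # e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
```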
Source code in src/deriva_ml/execution/workflow.py
class Workflow(BaseModel):
    """Represents a computational workflow in DerivaML.

    A workflow defines a computational process or analysis pipeline. Each workflow has
    a unique identifier, source code location, and type. Workflows are typically
    associated with Git repositories for version control.

    When a Workflow is retrieved via ``lookup_workflow(rid)`` or ``lookup_workflow_by_url()``,
    it is bound to a catalog and its ``description`` and ``workflow_type`` properties become
    writable. Setting these properties will update the catalog record. If the catalog is
    read-only (a snapshot), attempting to set them will raise a ``DerivaMLException``.

    Attributes:
        name (str): Human-readable name of the workflow.
        url (str): URI to the workflow source code (typically a GitHub URL).
        workflow_type (str | list[str]): Type(s) of workflow (must be controlled vocabulary terms).
            Accepts a single string or a list of strings. Internally normalized to a list.
            When the workflow is bound to a writable catalog, setting this property
            will update the catalog record. The new values must be valid terms from
            the Workflow_Type vocabulary.
        version (str | None): Version identifier (semantic versioning).
        description (str | None): Description of workflow purpose and behavior.
            When the workflow is bound to a writable catalog, setting this property
            will update the catalog record.
        rid (RID | None): Resource Identifier if registered in catalog.
        checksum (str | None): Git hash of workflow source code.
        is_notebook (bool): Whether workflow is a Jupyter notebook.

    Note:
        The recommended way to create a Workflow is via :meth:`DerivaML.create_workflow()
        <deriva_ml.DerivaML.create_workflow>`, which validates the workflow type against
        the catalog vocabulary::

            >>> workflow = ml.create_workflow(
            ...     name="RNA Analysis",
            ...     workflow_type="python_notebook",
            ...     description="RNA sequence analysis"
            ... )

    Example:
        Create a workflow directly (without catalog validation)::

            >>> workflow = Workflow(
            ...     name="RNA Analysis",
            ...     url="https://github.com/org/repo/analysis.ipynb",
            ...     workflow_type="python_notebook",
            ...     version="1.0.0",
            ...     description="RNA sequence analysis"
            ... )

        Look up an existing workflow by RID and update its properties::

            >>> workflow = ml.lookup_workflow("2-ABC1")
            >>> workflow.description = "Updated description for RNA analysis"
            >>> workflow.workflow_type = "python_script"
            >>> print(workflow.description)
            Updated description for RNA analysis

        Look up by URL and update::

            >>> url = "https://github.com/org/repo/blob/abc123/analysis.py"
            >>> workflow = ml.lookup_workflow_by_url(url)
            >>> workflow.description = "New description"

        Attempting to update on a read-only catalog raises an error::

            >>> snapshot_ml = ml.catalog_snapshot("2023-01-15T10:30:00")
            >>> workflow = snapshot_ml.lookup_workflow("2-ABC1")
            >>> workflow.description = "New description"  # Raises DerivaMLException
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    name: str
    workflow_type: str | list[str]
    description: str | None = None
    url: str | None = None
    version: str | None = None
    rid: RID | None = None
    checksum: str | None = None
    is_notebook: bool = False
    git_root: Path | None = None
    allow_dirty: bool = False

    _ml_instance: "DerivaMLCatalog | None" = PrivateAttr(default=None)
    _logger: logging.Logger = PrivateAttr(default_factory=lambda: logging.getLogger("deriva_ml"))

    @field_validator("workflow_type", mode="before")
    @classmethod
    def _normalize_workflow_type(cls, v: str | list[str]) -> list[str]:
        """Normalize workflow_type to always be a list of strings."""
        if isinstance(v, str):
            return [v]
        return list(v)

    def __setattr__(self, name: str, value: Any) -> None:
        """Override setattr to intercept description and workflow_type updates.

        When the workflow is bound to a catalog (via lookup_workflow), setting
        the ``description`` or ``workflow_type`` properties will update the catalog
        record. If the catalog is read-only (a snapshot), a DerivaMLException is raised.

        Args:
            name: The attribute name being set.
            value: The value to set.

        Raises:
            DerivaMLException: If attempting to set properties on a read-only
                catalog (snapshot), or if workflow_type is not a valid vocabulary term.

        Examples:
            Update description::

                >>> workflow = ml.lookup_workflow("2-ABC1")
                >>> workflow.description = "Updated description"

            Update workflow type::

                >>> workflow = ml.lookup_workflow("2-ABC1")
                >>> workflow.workflow_type = "python_notebook"
        """
        # Only intercept updates after full initialization
        # Use __dict__ check to avoid recursion during Pydantic model construction
        if (
            "__pydantic_private__" in self.__dict__
            and self.__dict__.get("__pydantic_private__", {}).get("_ml_instance") is not None
        ):
            if name == "description":
                self._update_description_in_catalog(value)
            elif name == "workflow_type":
                # Normalize to list
                if isinstance(value, str):
                    value = [value]
                self._update_workflow_types_in_catalog(value)
        super().__setattr__(name, value)

    def _check_writable_catalog(self, operation: str) -> None:
        """Check that the catalog is writable and workflow is registered.

        Args:
            operation: Description of the operation being attempted.

        Raises:
            DerivaMLException: If the workflow is not registered (no RID),
                or if the catalog is read-only (a snapshot).
        """
        # Import here to avoid circular dependency at module load
        import importlib
        _deriva_core = importlib.import_module("deriva.core")
        ErmrestSnapshot = _deriva_core.ErmrestSnapshot

        if self.rid is None:
            raise DerivaMLException(
                f"Cannot {operation}: Workflow is not registered in the catalog (no RID)"
            )

        if isinstance(self._ml_instance.catalog, ErmrestSnapshot):
            raise DerivaMLException(
                f"Cannot {operation} on a read-only catalog snapshot. "
                "Use a writable catalog connection instead."
            )

    def _update_description_in_catalog(self, new_description: str | None) -> None:
        """Update the description field in the catalog.

        This internal method is called when the description property is set
        on a catalog-bound Workflow object.

        Args:
            new_description: The new description value.

        Raises:
            DerivaMLException: If the workflow is not registered (no RID),
                or if the catalog is read-only (a snapshot).
        """
        self._check_writable_catalog("update description")

        # Update the catalog record
        pb = self._ml_instance.pathBuilder()
        workflow_path = pb.schemas[self._ml_instance.ml_schema].Workflow
        workflow_path.update([{"RID": self.rid, "Description": new_description}])

    def _get_workflow_type_association_table(self):
        """Get the association table for workflow types.

        Returns:
            Tuple of (table_name, table_path) for the Workflow-Workflow_Type association table.
        """
        atable_name = "Workflow_Workflow_Type"
        pb = self._ml_instance.pathBuilder()
        atable_path = pb.schemas[self._ml_instance.ml_schema].tables[atable_name]
        return atable_name, atable_path

    @property
    def workflow_types(self) -> list[str]:
        """Get the workflow types from the catalog.

        This property fetches the current workflow types directly from the catalog,
        ensuring consistency when multiple Workflow instances reference the same
        workflow or when types are modified externally.

        When not bound to a catalog, returns the local ``workflow_type`` field.

        Returns:
            List of workflow type term names from the Workflow_Type vocabulary.
        """
        if self._ml_instance is not None:
            _, atable_path = self._get_workflow_type_association_table()
            wt_types = (
                atable_path.filter(atable_path.Workflow == self.rid)
                .attributes(atable_path.Workflow_Type)
                .fetch()
            )
            return [wt[MLVocab.workflow_type] for wt in wt_types]
        return list(self.workflow_type)

    def add_workflow_type(self, workflow_type: str | VocabularyTerm) -> None:
        """Add a workflow type to this workflow.

        Adds a type term to this workflow if it's not already present. The term must
        exist in the Workflow_Type vocabulary.

        Args:
            workflow_type: Term name (string) or VocabularyTerm object from Workflow_Type vocabulary.

        Raises:
            DerivaMLException: If the workflow is not registered (no RID),
                the catalog is read-only, or the term doesn't exist.
        """
        self._check_writable_catalog("add workflow_type")

        if isinstance(workflow_type, VocabularyTerm):
            vocab_term = workflow_type
        else:
            vocab_term = self._ml_instance.lookup_term(MLVocab.workflow_type, workflow_type)

        if vocab_term.name in self.workflow_types:
            return

        _, atable_path = self._get_workflow_type_association_table()
        atable_path.insert([{MLVocab.workflow_type: vocab_term.name, "Workflow": self.rid}])

    def remove_workflow_type(self, workflow_type: str | VocabularyTerm) -> None:
        """Remove a workflow type from this workflow.

        Removes a type term from this workflow if it's currently associated.

        Args:
            workflow_type: Term name (string) or VocabularyTerm object from Workflow_Type vocabulary.

        Raises:
            DerivaMLException: If the workflow is not registered (no RID),
                the catalog is read-only, or the term doesn't exist.
        """
        self._check_writable_catalog("remove workflow_type")

        if isinstance(workflow_type, VocabularyTerm):
            vocab_term = workflow_type
        else:
            vocab_term = self._ml_instance.lookup_term(MLVocab.workflow_type, workflow_type)

        if vocab_term.name not in self.workflow_types:
            return

        _, atable_path = self._get_workflow_type_association_table()
        atable_path.filter(
            (atable_path.Workflow == self.rid) & (atable_path.Workflow_Type == vocab_term.name)
        ).delete()

    def add_workflow_types(self, workflow_types: str | VocabularyTerm | list[str | VocabularyTerm]) -> None:
        """Add one or more workflow types to this workflow.

        Args:
            workflow_types: Single term or list of terms. Can be strings (term names)
                or VocabularyTerm objects.

        Raises:
            DerivaMLException: If any term doesn't exist in the Workflow_Type vocabulary.
        """
        types_to_add = [workflow_types] if not isinstance(workflow_types, list) else workflow_types

        for term in types_to_add:
            self.add_workflow_type(term)

    def _update_workflow_types_in_catalog(self, new_workflow_types: list[str]) -> None:
        """Replace all workflow types in the catalog with the given list.

        This internal method is called when the workflow_type property is set
        on a catalog-bound Workflow object. Each new type must be a valid
        term from the Workflow_Type vocabulary.

        Args:
            new_workflow_types: List of new workflow type names.

        Raises:
            DerivaMLException: If the workflow is not registered (no RID),
                the catalog is read-only (a snapshot), or any workflow_type
                is not a valid vocabulary term.
        """
        self._check_writable_catalog("update workflow_type")

        # Validate all new types exist in vocabulary
        for wt in new_workflow_types:
            self._ml_instance.lookup_term(MLVocab.workflow_type, wt)

        # Delete all existing type associations
        _, atable_path = self._get_workflow_type_association_table()
        atable_path.filter(atable_path.Workflow == self.rid).delete()

        # Insert new type associations
        if new_workflow_types:
            atable_path.insert([
                {MLVocab.workflow_type: wt, "Workflow": self.rid}
                for wt in new_workflow_types
            ])

    @model_validator(mode="after")
    def setup_url_checksum(self) -> "Workflow":
        """Creates a workflow from the current execution context.

        Identifies the currently executing program (script or notebook) and creates
        a workflow definition. Automatically determines the Git repository information
        and source code checksum.

        The behavior can be configured using environment variables:
            - DERIVA_ML_WORKFLOW_URL: Override the detected workflow URL
            - DERIVA_ML_WORKFLOW_CHECKSUM: Override the computed checksum
            - DERIVA_MCP_IN_DOCKER: Set to "true" to use Docker metadata instead of git

        Docker environment variables (used when DERIVA_MCP_IN_DOCKER=true):
            - DERIVA_MCP_VERSION: Semantic version of the Docker image
            - DERIVA_MCP_GIT_COMMIT: Git commit hash at build time
            - DERIVA_MCP_IMAGE_DIGEST: Docker image digest (unique identifier)
            - DERIVA_MCP_IMAGE_NAME: Docker image name (e.g., ghcr.io/org/repo)

        Args:

        Returns:
            Workflow: New workflow instance with detected Git information.

        Raises:
            DerivaMLException: If not in a Git repository or detection fails (non-Docker).

        Example:
            >>> workflow = Workflow.create_workflow(
            ...     name="Sample Analysis",
            ...     workflow_type="python_script",
            ...     description="Process sample data"
            ... )
        """
        self._logger = logging.getLogger("deriva_ml")

        # Check if running in Docker container (no git repo available)
        if os.environ.get("DERIVA_MCP_IN_DOCKER", "").lower() == "true":
            # Use Docker image metadata for provenance
            self.version = self.version or os.environ.get("DERIVA_MCP_VERSION", "")

            # Use image digest as checksum (unique identifier for the container)
            # Fall back to git commit if digest not available
            self.checksum = self.checksum or (
                os.environ.get("DERIVA_MCP_IMAGE_DIGEST", "")
                or os.environ.get("DERIVA_MCP_GIT_COMMIT", "")
            )

            # Build URL pointing to the Docker image or source repo
            if not self.url:
                image_name = os.environ.get(
                    "DERIVA_MCP_IMAGE_NAME",
                    "ghcr.io/informatics-isi-edu/deriva-ml-mcp",
                )
                image_digest = os.environ.get("DERIVA_MCP_IMAGE_DIGEST", "")
                if image_digest:
                    # URL format: image@sha256:digest
                    self.url = f"{image_name}@{image_digest}"
                else:
                    # Fall back to source repo with git commit
                    source_url = "https://github.com/informatics-isi-edu/deriva-ml-mcp"
                    git_commit = os.environ.get("DERIVA_MCP_GIT_COMMIT", "")
                    self.url = f"{source_url}/commit/{git_commit}" if git_commit else source_url

            return self

        # Check to see if execution file info is being passed in by calling program (notebook runner)
        if "DERIVA_ML_WORKFLOW_URL" in os.environ:
            self.url = os.environ["DERIVA_ML_WORKFLOW_URL"]
            self.checksum = os.environ.get("DERIVA_ML_WORKFLOW_CHECKSUM", "")
            notebook_path = os.environ.get("DERIVA_ML_NOTEBOOK_PATH")
            if notebook_path:
                self.git_root = Workflow._get_git_root(Path(notebook_path))
            self.is_notebook = True
            return self

        # Standard git detection for local development
        # Check env var for allow_dirty (set by CLI --allow-dirty flag or dry-run mode)
        if os.environ.get("DERIVA_ML_ALLOW_DIRTY", "").lower() == "true":
            self.allow_dirty = True
        if os.environ.get("DERIVA_ML_DRY_RUN", "").lower() == "true":
            self.allow_dirty = True

        if not self.url:
            path, self.is_notebook = Workflow._get_python_script()
            self.url, self.checksum = Workflow.get_url_and_checksum(path, allow_dirty=self.allow_dirty)
            self.git_root = Workflow._get_git_root(path)

        self.version = self.version or Workflow.get_dynamic_version(root=str(self.git_root or Path.cwd()))
        return self

    @staticmethod
    def get_url_and_checksum(executable_path: Path, allow_dirty: bool = False) -> tuple[str, str]:
        """Determines the Git URL and checksum for a file.

        Computes the Git repository URL and file checksum for the specified path.
        For notebooks, strips cell outputs before computing the checksum.

        Args:
            executable_path: Path to the workflow file.
            allow_dirty: If True, log a warning instead of raising an error
                when the file has uncommitted changes. Defaults to False.

        Returns:
            tuple[str, str]: (GitHub URL, Git object hash)

        Raises:
            DerivaMLException: If not in a Git repository.
            DerivaMLDirtyWorkflowError: If the file has uncommitted changes
                and allow_dirty is False.

        Example:
            >>> url, checksum = Workflow.get_url_and_checksum(Path("analysis.ipynb"))
            >>> print(f"URL: {url}")
            >>> print(f"Checksum: {checksum}")
        """
        try:
            subprocess.run(
                ["git", "rev-parse", "--is-inside-work-tree"],
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError:
            raise DerivaMLException("Not executing in a Git repository.")

        github_url, is_dirty = Workflow._github_url(executable_path)

        if is_dirty:
            if allow_dirty:
                logging.getLogger("deriva_ml").warning(
                    f"File {executable_path} has uncommitted changes. "
                    f"Proceeding with --allow-dirty override."
                )
            else:
                raise DerivaMLDirtyWorkflowError(str(executable_path))

        # If you are in a notebook, strip out the outputs before computing the checksum.
        if executable_path != "REPL":
            if "ipynb" == executable_path.suffix:
                strip_proc = subprocess.run(
                    ["nbstripout", "-t", str(executable_path)],
                    capture_output=True,
                )
                hash_proc = subprocess.run(
                    ["git", "hash-object", "--stdin"],
                    input=strip_proc.stdout,
                    capture_output=True,
                    text=True,
                )
                checksum = hash_proc.stdout.strip()
            else:
                checksum = subprocess.run(
                    ["git", "hash-object", str(executable_path)],
                    capture_output=True,
                    text=True,
                    check=False,
                ).stdout.strip()
        else:
            checksum = "1"
        return github_url, checksum

    @staticmethod
    def _get_git_root(executable_path: Path) -> str | None:
        """Gets the root directory of the Git repository.

        Args:
            executable_path: Path to check for Git repository.

        Returns:
            str | None: Absolute path to repository root, or None if not in repository.
        """
        try:
            result = subprocess.run(
                ["git", "rev-parse", "--show-toplevel"],
                cwd=executable_path.parent,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                text=True,
                check=True,
            )
            return result.stdout.strip()
        except subprocess.CalledProcessError:
            return None  # Not in a git repository

    @staticmethod
    def _check_nbstrip_status() -> None:
        """Checks if nbstripout is installed and configured.

        Verifies that the nbstripout tool is available and properly installed in the
        Git repository. Issues warnings if setup is incomplete.
        """
        logger = logging.getLogger("deriva_ml")
        try:
            if subprocess.run(
                ["nbstripout", "--is-installed"],
                check=False,
                capture_output=True,
            ).returncode:
                logger.warning("nbstripout is not installed in repository. Please run nbstripout --install")
        except (subprocess.CalledProcessError, FileNotFoundError):
            logger.warning("nbstripout is not found. Please install it with: pip install nbstripout")

    @staticmethod
    def _get_notebook_path() -> Path | None:
        """Gets the path of the currently executing notebook.

        Returns:
            Path | None: Absolute path to current notebook, or None if not in notebook.
        """

        server, session = Workflow._get_notebook_session()

        if server and session:
            relative_path = session["notebook"]["path"]
            # Join the notebook directory with the relative path
            return Path(server["root_dir"]) / relative_path
        else:
            return None

    @staticmethod
    def _get_notebook_session() -> tuple[dict[str, Any] | None, dict[str, Any] | None]:
        """Return the absolute path of the current notebook."""
        # Get the kernel's connection file and extract the kernel ID
        try:
            if not (connection_file := Path(get_kernel_connection()).name):
                return None, None
        except RuntimeError:
            return None, None

        # Extract kernel ID from connection filename.
        # Standard Jupyter format: "kernel-<kernel_id>.json"
        # PyCharm/other formats may vary: "<kernel_id>.json" or other patterns
        kernel_id = None
        if connection_file.startswith("kernel-") and "-" in connection_file:
            # Standard format: kernel-<uuid>.json
            parts = connection_file.split("-", 1)
            if len(parts) > 1:
                kernel_id = parts[1].rsplit(".", 1)[0]
        else:
            # Fallback: assume filename (without extension) is the kernel ID
            kernel_id = connection_file.rsplit(".", 1)[0]

        if not kernel_id:
            return None, None

        # Look through the running server sessions to find the matching kernel ID
        for server in get_servers():
            try:
                # If a token is required for authentication, include it in headers
                token = server.get("token", "")
                headers = {}
                if token:
                    headers["Authorization"] = f"token {token}"

                sessions_url = server["url"] + "api/sessions"
                response = requests.get(sessions_url, headers=headers)
                response.raise_for_status()
                sessions = response.json()
                for sess in sessions:
                    if sess["kernel"]["id"] == kernel_id:
                        return server, sess
            except Exception as _e:
                # Ignore servers we can't connect to.
                pass
        return None, None

    @staticmethod
    def _in_repl():
        # Standard Python interactive mode
        if hasattr(sys, "ps1"):
            return True

        # Interactive mode forced by -i
        if sys.flags.interactive:
            return True

        # IPython / Jupyter detection
        try:
            from IPython import get_ipython

            if get_ipython() is not None:
                return True
        except ImportError:
            pass

        return False

    @staticmethod
    def _get_python_script() -> tuple[Path, bool]:
        """Return the path to the currently executing script"""
        is_notebook = Workflow._get_notebook_path() is not None
        return Path(_get_calling_module()), is_notebook

    @staticmethod
    def _github_url(executable_path: Path) -> tuple[str, bool]:
        """Return a GitHub URL for the latest commit of the script from which this routine is called.

        This routine is used to be called from a script or notebook (e.g., python -m file). It assumes that
        the file is in a GitHub repository and committed.  It returns a URL to the last commited version of this
        file in GitHub.

        Returns: A tuple with the gethub_url and a boolean to indicate if uncommited changes
            have been made to the file.

        """

        # Get repo URL from local GitHub repo.
        if executable_path == "REPL":
            return "REPL", True
        try:
            result = subprocess.run(
                ["git", "remote", "get-url", "origin"],
                capture_output=True,
                text=True,
                cwd=executable_path.parent,
                check=True,
            )
            github_url = result.stdout.strip().removesuffix(".git")
        except subprocess.CalledProcessError:
            raise DerivaMLException("No GIT remote found")

        # Find the root directory for the repository
        repo_root = Workflow._get_git_root(executable_path)

        # Now check to see if a file has been modified since the last commit.
        try:
            result = subprocess.run(
                ["git", "status", "--porcelain"],
                cwd=executable_path.parent,
                capture_output=True,
                text=True,
                check=False,
            )
            is_dirty = "M " in result.stdout.strip()  # True if the porcelain output lists a modified file
        except subprocess.CalledProcessError:
            is_dirty = False  # If the Git command fails, assume no changes

        """Get SHA-1 hash of latest commit of the file in the repository"""

        result = subprocess.run(
            ["git", "log", "-n", "1", "--pretty=format:%H", executable_path],
            cwd=repo_root,
            capture_output=True,
            text=True,
            check=False,
        )
        sha = result.stdout.strip()
        url = f"{github_url}/blob/{sha}/{executable_path.relative_to(repo_root)}"
        return url, is_dirty

    @staticmethod
    def get_dynamic_version(root: str | os.PathLike | None = None) -> str:
        """
        Return a dynamic version string based on VCS state (setuptools_scm),
        including dirty/uncommitted changes if configured.

        Works under uv / Python 3.10+ by forcing setuptools to use stdlib distutils.
        """
        # 1) Tell setuptools to use the stdlib distutils to avoid the
        #    '_distutils_hack' assertion that can otherwise occur.
        os.environ.setdefault("SETUPTOOLS_USE_DISTUTILS", "stdlib")

        warnings.filterwarnings(
            "ignore",
            category=UserWarning,
            module="_distutils_hack",
        )
        try:
            from setuptools_scm import get_version
        except Exception as e:  # ImportError or anything environment-specific
            raise RuntimeError(f"setuptools_scm is not available: {e}") from e

        if root is None:
            # Adjust this to point at your repo root if needed
            root = Path(__file__).resolve().parents[1]

        return get_version(root=root)
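The URL assembly in `_github_url` above reduces to simple string composition: the remote URL with its `.git` suffix stripped, the commit SHA, and the file path relative to the repository root. A minimal, self-contained sketch of that composition (the remote URL, SHA, and paths below are made-up examples, not DerivaML values):

```python
from pathlib import PurePosixPath

def github_blob_url(remote: str, sha: str, repo_root: str, file_path: str) -> str:
    """Compose a GitHub blob URL from a remote URL, a commit SHA, and a file path."""
    base = remote.removesuffix(".git")
    rel = PurePosixPath(file_path).relative_to(PurePosixPath(repo_root))
    return f"{base}/blob/{sha}/{rel}"

# Hypothetical values for illustration only.
url = github_blob_url(
    "https://github.com/example-org/example-repo.git",
    "0123abc",
    "/home/user/example-repo",
    "/home/user/example-repo/src/analysis.py",
)
print(url)  # https://github.com/example-org/example-repo/blob/0123abc/src/analysis.py
```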

workflow_types property

workflow_types: list[str]

Get the workflow types from the catalog.

This property fetches the current workflow types directly from the catalog, ensuring consistency when multiple Workflow instances reference the same workflow or when types are modified externally.

When not bound to a catalog, returns the local workflow_type field.

Returns:

    list[str]: List of workflow type term names from the Workflow_Type vocabulary.

__setattr__

__setattr__(
    name: str, value: Any
) -> None

Override setattr to intercept description and workflow_type updates.

When the workflow is bound to a catalog (via lookup_workflow), setting the description or workflow_type properties will update the catalog record. If the catalog is read-only (a snapshot), a DerivaMLException is raised.

Parameters:

    name (str): The attribute name being set. Required.
    value (Any): The value to set. Required.

Raises:

    DerivaMLException: If attempting to set properties on a read-only catalog (snapshot), or if workflow_type is not a valid vocabulary term.

Examples:

Update description::

>>> workflow = ml.lookup_workflow("2-ABC1")
>>> workflow.description = "Updated description"

Update workflow type::

>>> workflow = ml.lookup_workflow("2-ABC1")
>>> workflow.workflow_type = "python_notebook"
Source code in src/deriva_ml/execution/workflow.py
def __setattr__(self, name: str, value: Any) -> None:
    """Override setattr to intercept description and workflow_type updates.

    When the workflow is bound to a catalog (via lookup_workflow), setting
    the ``description`` or ``workflow_type`` properties will update the catalog
    record. If the catalog is read-only (a snapshot), a DerivaMLException is raised.

    Args:
        name: The attribute name being set.
        value: The value to set.

    Raises:
        DerivaMLException: If attempting to set properties on a read-only
            catalog (snapshot), or if workflow_type is not a valid vocabulary term.

    Examples:
        Update description::

            >>> workflow = ml.lookup_workflow("2-ABC1")
            >>> workflow.description = "Updated description"

        Update workflow type::

            >>> workflow = ml.lookup_workflow("2-ABC1")
            >>> workflow.workflow_type = "python_notebook"
    """
    # Only intercept updates after full initialization
    # Use __dict__ check to avoid recursion during Pydantic model construction
    if (
        "__pydantic_private__" in self.__dict__
        and self.__dict__.get("__pydantic_private__", {}).get("_ml_instance") is not None
    ):
        if name == "description":
            self._update_description_in_catalog(value)
        elif name == "workflow_type":
            # Normalize to list
            if isinstance(value, str):
                value = [value]
            self._update_workflow_types_in_catalog(value)
    super().__setattr__(name, value)
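The interception logic above hinges on distinguishing "during model construction" from "after the workflow is bound". The same pattern can be sketched in plain Python, without Pydantic; here `_bound` stands in for the `__pydantic_private__` / `_ml_instance` check, and the `updates` list stands in for catalog writes:

```python
class BoundRecord:
    """Sketch of the __setattr__ interception pattern (plain Python, no Pydantic)."""

    _INTERCEPTED = {"description"}

    def __init__(self) -> None:
        self._bound = False           # becomes True once "bound to a catalog"
        self.updates = []             # stands in for catalog record updates
        self.description = "initial"  # not intercepted: _bound is still False
        self._bound = True

    def __setattr__(self, name, value):
        # Only intercept after initialization, mirroring the __pydantic_private__ check.
        if name in self._INTERCEPTED and self.__dict__.get("_bound"):
            self.updates.append((name, value))  # "update the catalog record"
        super().__setattr__(name, value)

r = BoundRecord()
r.description = "updated"
print(r.updates)  # [('description', 'updated')]
```

The key design point, as in the real method, is that the guard reads `self.__dict__` directly, so the check itself never recurses through `__setattr__`.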

add_workflow_type

add_workflow_type(
    workflow_type: str | VocabularyTerm,
) -> None

Add a workflow type to this workflow.

Adds a type term to this workflow if it's not already present. The term must exist in the Workflow_Type vocabulary.

Parameters:

    workflow_type (str | VocabularyTerm): Term name (string) or VocabularyTerm object from the Workflow_Type vocabulary. Required.

Raises:

    DerivaMLException: If the workflow is not registered (no RID), the catalog is read-only, or the term doesn't exist.

Source code in src/deriva_ml/execution/workflow.py
def add_workflow_type(self, workflow_type: str | VocabularyTerm) -> None:
    """Add a workflow type to this workflow.

    Adds a type term to this workflow if it's not already present. The term must
    exist in the Workflow_Type vocabulary.

    Args:
        workflow_type: Term name (string) or VocabularyTerm object from Workflow_Type vocabulary.

    Raises:
        DerivaMLException: If the workflow is not registered (no RID),
            the catalog is read-only, or the term doesn't exist.
    """
    self._check_writable_catalog("add workflow_type")

    if isinstance(workflow_type, VocabularyTerm):
        vocab_term = workflow_type
    else:
        vocab_term = self._ml_instance.lookup_term(MLVocab.workflow_type, workflow_type)

    if vocab_term.name in self.workflow_types:
        return

    _, atable_path = self._get_workflow_type_association_table()
    atable_path.insert([{MLVocab.workflow_type: vocab_term.name, "Workflow": self.rid}])

add_workflow_types

add_workflow_types(
    workflow_types: str
    | VocabularyTerm
    | list[str | VocabularyTerm],
) -> None

Add one or more workflow types to this workflow.

Parameters:

    workflow_types (str | VocabularyTerm | list[str | VocabularyTerm]): Single term or list of terms; strings (term names) or VocabularyTerm objects. Required.

Raises:

    DerivaMLException: If any term doesn't exist in the Workflow_Type vocabulary.

Source code in src/deriva_ml/execution/workflow.py
def add_workflow_types(self, workflow_types: str | VocabularyTerm | list[str | VocabularyTerm]) -> None:
    """Add one or more workflow types to this workflow.

    Args:
        workflow_types: Single term or list of terms. Can be strings (term names)
            or VocabularyTerm objects.

    Raises:
        DerivaMLException: If any term doesn't exist in the Workflow_Type vocabulary.
    """
    types_to_add = [workflow_types] if not isinstance(workflow_types, list) else workflow_types

    for term in types_to_add:
        self.add_workflow_type(term)
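The single-or-list normalization used above is a small but recurring pattern in this API (the `workflow_type` attribute is normalized the same way). As a standalone sketch:

```python
def ensure_list(value):
    """Normalize a single item or a list of items to a list, as add_workflow_types does."""
    return value if isinstance(value, list) else [value]

print(ensure_list("python_script"))  # ['python_script']
print(ensure_list(["a", "b"]))       # ['a', 'b']
```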

get_dynamic_version staticmethod

get_dynamic_version(
    root: str | PathLike | None = None,
) -> str

Return a dynamic version string based on VCS state (setuptools_scm), including dirty/uncommitted changes if configured.

Works under uv / Python 3.10+ by forcing setuptools to use stdlib distutils.

Source code in src/deriva_ml/execution/workflow.py
@staticmethod
def get_dynamic_version(root: str | os.PathLike | None = None) -> str:
    """
    Return a dynamic version string based on VCS state (setuptools_scm),
    including dirty/uncommitted changes if configured.

    Works under uv / Python 3.10+ by forcing setuptools to use stdlib distutils.
    """
    # 1) Tell setuptools to use the stdlib distutils to avoid the
    #    '_distutils_hack' assertion that can otherwise occur.
    os.environ.setdefault("SETUPTOOLS_USE_DISTUTILS", "stdlib")

    warnings.filterwarnings(
        "ignore",
        category=UserWarning,
        module="_distutils_hack",
    )
    try:
        from setuptools_scm import get_version
    except Exception as e:  # ImportError or anything environment-specific
        raise RuntimeError(f"setuptools_scm is not available: {e}") from e

    if root is None:
        # Adjust this to point at your repo root if needed
        root = Path(__file__).resolve().parents[1]

    return get_version(root=root)
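With setuptools_scm's default node-and-date scheme, the returned string typically looks like `1.2.3` on a clean tagged commit, `1.2.4.dev5+g1a2b3c4` a few commits past a tag, and gains a trailing `.dYYYYMMDD` segment when the working tree has uncommitted changes. A sketch of inspecting that shape (this parser is illustrative only, not part of the DerivaML API):

```python
import re

# Typical setuptools_scm (node-and-date scheme) version strings:
#   "1.2.3"                          - exactly on a tag, clean tree
#   "1.2.4.dev5+g1a2b3c4"            - 5 commits past the tag
#   "1.2.4.dev5+g1a2b3c4.d20240115"  - same, with uncommitted changes
VERSION_RE = re.compile(
    r"^(?P<release>\d+(?:\.\d+)*)"
    r"(?:\.dev(?P<distance>\d+))?"
    r"(?:\+g(?P<node>[0-9a-f]+)(?:\.d(?P<date>\d{8}))?)?$"
)

def is_dirty_version(version: str) -> bool:
    """Heuristic: a trailing .dYYYYMMDD local segment marks an unclean working tree."""
    m = VERSION_RE.match(version)
    return bool(m and m.group("date"))

print(is_dirty_version("1.2.3"))                          # False
print(is_dirty_version("1.2.4.dev5+g1a2b3c4.d20240115"))  # True
```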

get_url_and_checksum staticmethod

get_url_and_checksum(
    executable_path: Path,
    allow_dirty: bool = False,
) -> tuple[str, str]

Determines the Git URL and checksum for a file.

Computes the Git repository URL and file checksum for the specified path. For notebooks, strips cell outputs before computing the checksum.

Parameters:

    executable_path (Path): Path to the workflow file. Required.
    allow_dirty (bool): If True, log a warning instead of raising an error when the file has uncommitted changes. Defaults to False.

Returns:

    tuple[str, str]: (GitHub URL, Git object hash)

Raises:

    DerivaMLException: If not in a Git repository.
    DerivaMLDirtyWorkflowError: If the file has uncommitted changes and allow_dirty is False.

Example:

    >>> url, checksum = Workflow.get_url_and_checksum(Path("analysis.ipynb"))
    >>> print(f"URL: {url}")
    >>> print(f"Checksum: {checksum}")

Source code in src/deriva_ml/execution/workflow.py
@staticmethod
def get_url_and_checksum(executable_path: Path, allow_dirty: bool = False) -> tuple[str, str]:
    """Determines the Git URL and checksum for a file.

    Computes the Git repository URL and file checksum for the specified path.
    For notebooks, strips cell outputs before computing the checksum.

    Args:
        executable_path: Path to the workflow file.
        allow_dirty: If True, log a warning instead of raising an error
            when the file has uncommitted changes. Defaults to False.

    Returns:
        tuple[str, str]: (GitHub URL, Git object hash)

    Raises:
        DerivaMLException: If not in a Git repository.
        DerivaMLDirtyWorkflowError: If the file has uncommitted changes
            and allow_dirty is False.

    Example:
        >>> url, checksum = Workflow.get_url_and_checksum(Path("analysis.ipynb"))
        >>> print(f"URL: {url}")
        >>> print(f"Checksum: {checksum}")
    """
    try:
        subprocess.run(
            ["git", "rev-parse", "--is-inside-work-tree"],
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError:
        raise DerivaMLException("Not executing in a Git repository.")

    github_url, is_dirty = Workflow._github_url(executable_path)

    if is_dirty:
        if allow_dirty:
            logging.getLogger("deriva_ml").warning(
                f"File {executable_path} has uncommitted changes. "
                f"Proceeding with --allow-dirty override."
            )
        else:
            raise DerivaMLDirtyWorkflowError(str(executable_path))

    # If you are in a notebook, strip out the outputs before computing the checksum.
    if executable_path != "REPL":
        if "ipynb" == executable_path.suffix:
            strip_proc = subprocess.run(
                ["nbstripout", "-t", str(executable_path)],
                capture_output=True,
            )
            hash_proc = subprocess.run(
                ["git", "hash-object", "--stdin"],
                input=strip_proc.stdout,
                capture_output=True,
                text=True,
            )
            checksum = hash_proc.stdout.strip()
        else:
            checksum = subprocess.run(
                ["git", "hash-object", str(executable_path)],
                capture_output=True,
                text=True,
                check=False,
            ).stdout.strip()
    else:
        checksum = "1"
    return github_url, checksum
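The checksum returned above is a Git object hash: `git hash-object` prepends a `blob <length>\0` header to the file contents and takes the SHA-1 of the result. That makes the checksum reproducible in pure Python, as this sketch shows (it is not part of the DerivaML API):

```python
import hashlib

def git_blob_hash(data: bytes) -> str:
    """Compute the same hash as `git hash-object --stdin` for raw bytes."""
    header = f"blob {len(data)}\0".encode()
    return hashlib.sha1(header + data).hexdigest()

# The well-known blob hash of "hello\n":
print(git_blob_hash(b"hello\n"))  # ce013625030ba8dba906f756967f9e9ca394464a
```

This also explains why notebook outputs are stripped first: the hash covers the exact bytes, so transient cell outputs would otherwise change the checksum on every run.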

remove_workflow_type

remove_workflow_type(
    workflow_type: str | VocabularyTerm,
) -> None

Remove a workflow type from this workflow.

Removes a type term from this workflow if it's currently associated.

Parameters:

    workflow_type (str | VocabularyTerm): Term name (string) or VocabularyTerm object from the Workflow_Type vocabulary. Required.

Raises:

    DerivaMLException: If the workflow is not registered (no RID), the catalog is read-only, or the term doesn't exist.

Source code in src/deriva_ml/execution/workflow.py
def remove_workflow_type(self, workflow_type: str | VocabularyTerm) -> None:
    """Remove a workflow type from this workflow.

    Removes a type term from this workflow if it's currently associated.

    Args:
        workflow_type: Term name (string) or VocabularyTerm object from Workflow_Type vocabulary.

    Raises:
        DerivaMLException: If the workflow is not registered (no RID),
            the catalog is read-only, or the term doesn't exist.
    """
    self._check_writable_catalog("remove workflow_type")

    if isinstance(workflow_type, VocabularyTerm):
        vocab_term = workflow_type
    else:
        vocab_term = self._ml_instance.lookup_term(MLVocab.workflow_type, workflow_type)

    if vocab_term.name not in self.workflow_types:
        return

    _, atable_path = self._get_workflow_type_association_table()
    atable_path.filter(
        (atable_path.Workflow == self.rid) & (atable_path.Workflow_Type == vocab_term.name)
    ).delete()

setup_url_checksum

setup_url_checksum() -> 'Workflow'

Creates a workflow from the current execution context.

Identifies the currently executing program (script or notebook) and creates a workflow definition. Automatically determines the Git repository information and source code checksum.

The behavior can be configured using environment variables:

- DERIVA_ML_WORKFLOW_URL: Override the detected workflow URL
- DERIVA_ML_WORKFLOW_CHECKSUM: Override the computed checksum
- DERIVA_MCP_IN_DOCKER: Set to "true" to use Docker metadata instead of Git

Docker environment variables (used when DERIVA_MCP_IN_DOCKER=true):

- DERIVA_MCP_VERSION: Semantic version of the Docker image
- DERIVA_MCP_GIT_COMMIT: Git commit hash at build time
- DERIVA_MCP_IMAGE_DIGEST: Docker image digest (unique identifier)
- DERIVA_MCP_IMAGE_NAME: Docker image name (e.g., ghcr.io/org/repo)

Returns:

    Workflow: New workflow instance with detected Git information.

Raises:

    DerivaMLException: If not in a Git repository or detection fails (non-Docker).

Example:

    >>> workflow = Workflow.create_workflow(
    ...     name="Sample Analysis",
    ...     workflow_type="python_script",
    ...     description="Process sample data"
    ... )

Source code in src/deriva_ml/execution/workflow.py
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
@model_validator(mode="after")
def setup_url_checksum(self) -> "Workflow":
    """Creates a workflow from the current execution context.

    Identifies the currently executing program (script or notebook) and creates
    a workflow definition. Automatically determines the Git repository information
    and source code checksum.

    The behavior can be configured using environment variables:
        - DERIVA_ML_WORKFLOW_URL: Override the detected workflow URL
        - DERIVA_ML_WORKFLOW_CHECKSUM: Override the computed checksum
        - DERIVA_MCP_IN_DOCKER: Set to "true" to use Docker metadata instead of git

    Docker environment variables (used when DERIVA_MCP_IN_DOCKER=true):
        - DERIVA_MCP_VERSION: Semantic version of the Docker image
        - DERIVA_MCP_GIT_COMMIT: Git commit hash at build time
        - DERIVA_MCP_IMAGE_DIGEST: Docker image digest (unique identifier)
        - DERIVA_MCP_IMAGE_NAME: Docker image name (e.g., ghcr.io/org/repo)

    Returns:
        Workflow: New workflow instance with detected Git information.

    Raises:
        DerivaMLException: If not in a Git repository or detection fails (non-Docker).

    Example:
        >>> workflow = Workflow.create_workflow(
        ...     name="Sample Analysis",
        ...     workflow_type="python_script",
        ...     description="Process sample data"
        ... )
    """
    self._logger = logging.getLogger("deriva_ml")

    # Check if running in Docker container (no git repo available)
    if os.environ.get("DERIVA_MCP_IN_DOCKER", "").lower() == "true":
        # Use Docker image metadata for provenance
        self.version = self.version or os.environ.get("DERIVA_MCP_VERSION", "")

        # Use image digest as checksum (unique identifier for the container)
        # Fall back to git commit if digest not available
        self.checksum = self.checksum or (
            os.environ.get("DERIVA_MCP_IMAGE_DIGEST", "")
            or os.environ.get("DERIVA_MCP_GIT_COMMIT", "")
        )

        # Build URL pointing to the Docker image or source repo
        if not self.url:
            image_name = os.environ.get(
                "DERIVA_MCP_IMAGE_NAME",
                "ghcr.io/informatics-isi-edu/deriva-ml-mcp",
            )
            image_digest = os.environ.get("DERIVA_MCP_IMAGE_DIGEST", "")
            if image_digest:
                # URL format: image@sha256:digest
                self.url = f"{image_name}@{image_digest}"
            else:
                # Fall back to source repo with git commit
                source_url = "https://github.com/informatics-isi-edu/deriva-ml-mcp"
                git_commit = os.environ.get("DERIVA_MCP_GIT_COMMIT", "")
                self.url = f"{source_url}/commit/{git_commit}" if git_commit else source_url

        return self

    # Check to see if execution file info is being passed in by calling program (notebook runner)
    if "DERIVA_ML_WORKFLOW_URL" in os.environ:
        self.url = os.environ["DERIVA_ML_WORKFLOW_URL"]
        self.checksum = os.environ.get("DERIVA_ML_WORKFLOW_CHECKSUM", "")
        notebook_path = os.environ.get("DERIVA_ML_NOTEBOOK_PATH")
        if notebook_path:
            self.git_root = Workflow._get_git_root(Path(notebook_path))
        self.is_notebook = True
        return self

    # Standard git detection for local development
    # Check env var for allow_dirty (set by CLI --allow-dirty flag or dry-run mode)
    if os.environ.get("DERIVA_ML_ALLOW_DIRTY", "").lower() == "true":
        self.allow_dirty = True
    if os.environ.get("DERIVA_ML_DRY_RUN", "").lower() == "true":
        self.allow_dirty = True

    if not self.url:
        path, self.is_notebook = Workflow._get_python_script()
        self.url, self.checksum = Workflow.get_url_and_checksum(path, allow_dirty=self.allow_dirty)
        self.git_root = Workflow._get_git_root(path)

    self.version = self.version or Workflow.get_dynamic_version(root=str(self.git_root or Path.cwd()))
    return self
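The precedence the validator applies, Docker metadata first, then explicit environment overrides, then local Git detection, can be sketched as a standalone function. The environment variable names come from the docstring above; the `detect_from_git` callable and the default image name are stand-ins for illustration:

```python
def resolve_workflow_url(env: dict, detect_from_git) -> str:
    """Mirror setup_url_checksum's precedence: Docker metadata, env override, Git detection."""
    if env.get("DERIVA_MCP_IN_DOCKER", "").lower() == "true":
        # Docker: prefer image@digest, fall back to the bare image name.
        image = env.get("DERIVA_MCP_IMAGE_NAME", "ghcr.io/example/image")
        digest = env.get("DERIVA_MCP_IMAGE_DIGEST", "")
        return f"{image}@{digest}" if digest else image
    if "DERIVA_ML_WORKFLOW_URL" in env:
        # Explicit override, e.g. set by a notebook runner.
        return env["DERIVA_ML_WORKFLOW_URL"]
    # Fall through to local Git detection.
    return detect_from_git()

print(resolve_workflow_url({"DERIVA_ML_WORKFLOW_URL": "https://example.com/wf"}, lambda: "git-url"))
# https://example.com/wf
```

Passing the environment in as a mapping (rather than reading `os.environ` directly) keeps the precedence logic trivially testable.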