DerivaML Execution
DerivaML is a class library built on the Deriva scientific asset management system. It is designed to simplify many of the basic operations associated with building and testing ML models based on common toolkits such as TensorFlow. This notebook reviews the basic features of the DerivaML library.
%load_ext autoreload
%autoreload 2
import builtins
from deriva.core.utils.globus_auth_utils import GlobusNativeLogin
from deriva_ml import ExecutionConfiguration, MLVocab, DerivaSystemColumns, DatasetSpec
from deriva_ml.demo_catalog import create_demo_catalog, DemoML
from IPython.display import display, Markdown, JSON
import itertools
import pandas as pd
Set the details for the catalog we want and authenticate to the server if needed.
hostname = 'dev.eye-ai.org'
domain_schema = 'demo-schema'
gnl = GlobusNativeLogin(host=hostname)
if gnl.is_logged_in([hostname]):
    print("You are already logged in.")
else:
    gnl.login([hostname], no_local_server=True, no_browser=True, refresh_tokens=True, update_bdbag_keychain=True)
    print("Login Successful")
Create a test catalog and get an instance of the DerivaML class. Use options so that we create some initial datasets and features. Use the exploration API to find out what features and datasets we have.
test_catalog = create_demo_catalog(hostname, domain_schema, create_features=True, create_datasets=True)
ml_instance = DemoML(hostname, test_catalog.catalog_id)
print(f'Creating catalog at {ml_instance.catalog_id}')
display(
    Markdown('## Datasets'),
    pd.DataFrame(ml_instance.find_datasets()).drop(columns=DerivaSystemColumns),
    Markdown('## Features'),
    [f'{f.target_table.name}:{f.feature_name}' for f in ml_instance.find_features("Subject")],
    [f'{f.target_table.name}:{f.feature_name}' for f in ml_instance.find_features("Image")],
)
datasets = pd.DataFrame(ml_instance.find_datasets()).drop(columns=DerivaSystemColumns)
training_dataset_rid = [ds['RID'] for ds in ml_instance.find_datasets() if 'Training' in ds['Dataset_Type']][0]
testing_dataset_rid = [ds['RID'] for ds in ml_instance.find_datasets() if 'Testing' in ds['Dataset_Type']][0]
display(
    Markdown(f'Training Dataset: {training_dataset_rid}'),
    Markdown('## Datasets'),
    datasets,
)
Initializing the environment for an execution
In DerivaML, the catalog is the source of record for all of the data created and used by a machine learning experiment. While we can use the Deriva API to interact directly with the catalog, DerivaML provides a much simpler way of retrieving and adding data to a catalog.
The core concept in this process is an execution. An execution can be the process of training a model, running a model, executing analysis scripts, or even performing a manual operation. Every execution in DerivaML is uniquely identified by a resource identifier (RID).
The steps involved in creating and using an execution are:
- Create an ExecutionConfiguration object that identifies the inputs and code for the execution.
- Create a workflow object to represent the code or operation that you will perform.
- Create an execution instance, which will download all of the required inputs from the catalog.
- Locate the input files using methods in the execution instance.
- Perform your computation, placing output files in locations provided by the execution instance methods.
- Upload the results of the computation using the execution instance methods. This will upload all of your files and tag them with the execution RID so that you know how they were generated. In addition, any new tabular data in CSV format will be uploaded to corresponding tables in the catalog.
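Putting these steps together, the following sketch shows the overall lifecycle using the same DerivaML calls demonstrated in the rest of this notebook. The workflow name, type, and file names are placeholders, and the sketch assumes that the "Execution Notebook" workflow type term already exists in the catalog.
# A minimal sketch of the execution lifecycle; the names below are placeholders.
workflow = ml_instance.create_workflow(
    name="Sketch Workflow",
    workflow_type="Execution Notebook",  # Must be an existing workflow_type term.
    description="Sketch of the execution lifecycle",
)
execution = ml_instance.create_execution(
    ExecutionConfiguration(description="Lifecycle sketch", workflow=workflow)
)
with execution.execute() as exec_ctx:
    # Perform the computation here, reading inputs via execution.asset_paths
    # and writing outputs to paths handed out by the execution instance.
    out_file = execution.asset_file_path("Execution_Asset", "results.txt")
    with builtins.open(out_file, "w") as fp:
        fp.write("results")
# Upload the outputs; each file is tagged with this execution's RID.
execution.upload_execution_outputs()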
Creating an ExecutionConfiguration
An execution can be described by the datasets and files that it needs, the code that it runs, and the resulting files that it creates. This information is captured in an ExecutionConfiguration object:
class ExecutionConfiguration:
    """
    Define the parameters that are used to configure a specific execution.

    Arguments:
        datasets: List of dataset RIDs or MINIDs for datasets to be downloaded prior to execution. By default,
            all of the datasets are materialized. However, if the assets associated with a dataset are not
            needed, a dictionary that defines the rid and the materialization parameter for the
            download_dataset_bag method can be specified, e.g. datasets=[{'rid': RID, 'materialize': True}].
        assets: List of assets to be downloaded prior to execution. The values must be RIDs in an asset table.
        workflow: A workflow instance. Must have a name, a URI to the workflow instance, and a type.
        description: A description of the execution. Can use markdown format.
    """
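For example, a configuration that stages one fully materialized dataset, one metadata-only dataset, and a single input asset might look like the following sketch. The RID variables and the workflow here are placeholders for values created later in this notebook.
# Sketch only: the RIDs and workflow below stand in for real catalog records.
example_config = ExecutionConfiguration(
    datasets=[
        DatasetSpec(rid=training_dataset_rid,
                    version=ml_instance.dataset_version(training_dataset_rid)),
        DatasetSpec(rid=testing_dataset_rid,
                    version=ml_instance.dataset_version(testing_dataset_rid),
                    materialize=False),  # Download the metadata but not the assets.
    ],
    assets=[training_model_rid],  # RIDs of assets to stage locally.
    workflow=api_workflow,
    description="Example configuration",  # Markdown is allowed here.
)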
Creating a Workflow
The actual code that is being run is represented by a Workflow class. A workflow is intended to be quite general and could be a Python script, a Jupyter notebook, a manual process, or even an Airflow pipeline or some other type of workflow system. In order to create a workflow class instance, we will need a name for the workflow, a URI that names the resource the workflow captures, and a workflow type.
The URL for the workflow will depend on what the workflow is actually doing. In general, it's a good idea to make the URL a reference to tagged code or a repository in GitHub. This will require some discipline in your process to ensure that you always have workflows that are committed and tagged in a repo.
The workflow type is a controlled vocabulary. You can create new workflow types using the standard APIs for adding terms.
ml_instance.add_term(MLVocab.workflow_type, "Execution Notebook", description="Notebook for demonstrating executions")
ml_instance.add_term(MLVocab.asset_type, "API_Model", description="Model for our API workflow")
api_workflow = ml_instance.create_workflow(
    name="Execution Notebook Workflow",
    workflow_type="Execution Notebook",
    description="Demonstration notebook",
)
notebook_execution = ml_instance.create_execution(
    ExecutionConfiguration(description="Sample Execution", workflow=api_workflow)
)
# Now let's create a model configuration for our program.
model_file = notebook_execution.asset_file_path("Execution_Asset", 'modelfile.txt', asset_types="API_Model")
with builtins.open(model_file, "w") as fp:
    fp.write("My model")
# Now upload the file and retrieve the RID of the new asset from the returned results.
uploaded_assets = notebook_execution.upload_execution_outputs()
training_model_rid = [a.asset_rid for a in uploaded_assets['deriva-ml/Execution_Asset'] if 'API_Model' in a.asset_types][0]
display(
    Markdown(f'## Training Model: {training_model_rid}'),
    JSON(ml_instance.retrieve_rid(training_model_rid)),
)
Setup for an ML run
ml_instance.add_term(MLVocab.workflow_type, "ML Demo", description="An ML workflow that uses the Deriva ML API")
config = ExecutionConfiguration(
    assets=[training_model_rid],
    description="Notebook ML Execution",
    workflow=api_workflow,
    datasets=[
        DatasetSpec(rid=training_dataset_rid, version=ml_instance.dataset_version(training_dataset_rid)),
        DatasetSpec(rid=testing_dataset_rid, version=ml_instance.dataset_version(testing_dataset_rid), materialize=False),
    ],
)
ml_execution = ml_instance.create_execution(config)
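# List the local paths of the input assets that were staged for this execution.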
ml_execution.asset_paths
with ml_execution.execute() as deriva_exec:
    # Get the input datasets:
    training_dataset = ml_execution.datasets[0]  # Input dataset
    image_rids = training_dataset.get_table_as_dataframe('Image')['RID']

    # Get the input files
    with open(ml_execution.asset_paths[0], 'rt') as model_file:
        training_model = model_file.read()
    print(f'Got model file: {training_model}')

    # Put your ML code here....
    pass

    # Write a new model
    model_file = ml_execution.asset_path('API_Model', 'modelfile.txt')
    with open(model_file, 'w') as f:
        f.write("Hello there a new model;\n")

    # Create some new feature values.
    bb_csv_path, bb_asset_paths = ml_execution.execution_asset_path('BoundingBox')
    bounding_box_files = []
    for i in range(10):
        fn = bb_asset_paths['BoundingBox'] / f"box{i}.txt"
        bounding_box_files.append(fn)
        with builtins.open(fn, "w") as fp:
            fp.write(f"Hi there {i}")

    ImageBoundingboxFeature = ml_instance.feature_record_class("Image", "BoundingBox")
    image_bounding_box_feature_list = [
        ImageBoundingboxFeature(Image=image_rid,
                                Execution=ml_execution.execution_rid,
                                BoundingBox=asset_rid)
        for image_rid, asset_rid in zip(image_rids, itertools.cycle(bounding_box_files))
    ]
    ml_execution.add_features(image_bounding_box_feature_list)
upload_status = ml_execution.upload_execution_outputs()
Now let's check the assets produced by this execution to make sure that they are what we expect.
# Get datapath to the ML schema.
schema_path = ml_instance.pathBuilder.schemas[ml_instance.ml_schema]
# Now get the path to the Execution table and fetch our execution record,
# filtering on the RID of the execution we are looking for.
executions = schema_path.Execution.filter(schema_path.Execution.RID == ml_execution.execution_rid)
execution_info = list(executions.entities().fetch())[0]
# To get the assets for the execution, we need to go through the linking table to the assets.
asset_path = executions.link(schema_path.Execution_Asset_Execution).link(schema_path.Execution_Asset)
pd.DataFrame(asset_path.entities().fetch()).drop(columns=DerivaSystemColumns + ['MD5'])
# Now let's display our results.
display(
    Markdown(f'### Execution: {ml_execution.execution_rid}'),
    JSON(execution_info),
    Markdown('### Execution Assets'),
    pd.DataFrame(asset_path.entities().fetch()).drop(columns=DerivaSystemColumns + ['MD5']),
)
test_catalog.delete_ermrest_catalog(really=True)