Skip to content

Upload

This module provides functions that help structure local directories for uploading to a DerivaML catalog, and generating an upload specification for those directories.

Here is the directory layout we support:

deriva-ml/
    execution/
        <exec_rid>/
            execution-asset/
                file1, file2, ...          <- Need to update execution_asset association table.
            execution-metadata/
            feature/
                asset/
                    file1, file2, ...
                <feature>.jsonl            <- needs to have asset_name column remapped before uploading
            asset/
                file1, file2, ...
            asset-type/
                file1.jsonl, file2.jsonl
    table/
        record_table.csv

asset_file_path

asset_file_path(
    prefix: Path | str,
    exec_rid: RID,
    asset_table: Table,
    file_name: str,
    metadata: dict[str, Any],
) -> Path

Return the file in which assets of a specified type are to be placed for upload.

Parameters:

Name Type Description Default
prefix Path | str

Path prefix to use.

required
exec_rid RID

RID to use.

required
asset_table Table

Table in which to place assets.

required
file_name str

File name to use.

required
metadata dict[str, Any]

Any additional metadata to add to the asset

required

Returns: Path to the file in which to place the named asset of type asset_type.

Source code in src/deriva_ml/dataset/upload.py
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
def asset_file_path(
    prefix: Path | str,
    exec_rid: RID,
    asset_table: Table,
    file_name: str,
    metadata: dict[str, Any],
) -> Path:
    """Compute the full path at which an asset file of a specified type should be staged for upload.

    Args:
        prefix: Path prefix to use.
        exec_rid: RID to use.
        asset_table: Table in which to place assets.
        file_name: File name to use.
        metadata: Any additional metadata to add to the asset

    Returns:
        Path of the file in which to place an asset destined for asset_table.

    Raises:
        DerivaMLException: If metadata has keys that are not metadata columns of the asset table.
    """
    metadata = metadata or {}

    # Columns that every asset table carries; everything else is asset-specific metadata.
    builtin_columns = set(DerivaSystemColumns) | {
        "Filename",
        "URL",
        "Length",
        "MD5",
        "Description",
    }
    metadata_columns = {col.name for col in asset_table.columns} - builtin_columns
    if not set(metadata.keys()) <= metadata_columns:
        raise DerivaMLException(f"Metadata {metadata} does not match asset metadata {metadata_columns}")

    path = execution_root(prefix, exec_rid) / "asset" / asset_table.schema.name / asset_table.name
    # One directory level per metadata column; a missing value becomes the literal "None".
    # NOTE(review): metadata_columns is a set, so segment order follows set iteration
    # order — confirm it matches the column order used by the upload-spec path regex.
    for column in metadata_columns:
        path = path / metadata.get(column, "None")
    path.mkdir(parents=True, exist_ok=True)
    return path / file_name

asset_root

asset_root(
    prefix: Path | str, exec_rid: str
) -> Path

Return the path to the directory in which assets for the specified execution should be placed.

Source code in src/deriva_ml/dataset/upload.py
134
135
136
137
138
def asset_root(prefix: Path | str, exec_rid: str) -> Path:
    """Return the path to the directory in which assets for the specified execution should be placed.

    Args:
        prefix: Location of the upload root directory.
        exec_rid: RID of the execution.

    Returns:
        The per-execution "asset" directory (created if needed).
    """
    path = execution_root(prefix, exec_rid) / "asset"
    path.mkdir(parents=True, exist_ok=True)
    return path

asset_table_upload_spec

asset_table_upload_spec(
    model: DerivaModel,
    asset_table: str | Table,
)

Generate upload specification for an asset table.

Parameters:

Name Type Description Default
model DerivaModel

The DerivaModel instance.

required
asset_table str | Table

The asset table name or Table object.

required

Returns:

Type Description

A dictionary containing the upload specification for the asset table.

Source code in src/deriva_ml/dataset/upload.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def asset_table_upload_spec(model: DerivaModel, asset_table: str | Table):
    """Generate an upload specification for an asset table that has metadata columns.

    Args:
        model: The DerivaModel instance.
        asset_table: The asset table name or Table object.

    Returns:
        A dictionary containing the upload specification for the asset table.
    """
    metadata_columns = model.asset_metadata(asset_table)
    asset_table = model.name_to_table(asset_table)
    schema = asset_table.schema.name

    # One regex path component per metadata column, captured by column name so the
    # uploader can map directory names back into column values.
    metadata_path = "/".join([rf"(?P<{c}>[-\w]+)" for c in metadata_columns])
    asset_path = f"{exec_dir_regex}/asset/{schema}/{asset_table.name}/{metadata_path}/{asset_file_regex}"

    # Create upload specification
    spec = {
        # Upload assets into an asset table, filling metadata columns from the path.
        "column_map": {
            "MD5": "{md5}",
            "URL": "{URI}",
            "Length": "{file_size}",
            "Filename": "{file_name}",
        }
        | {c: f"{{{c}}}" for c in metadata_columns},
        "file_pattern": asset_path,  # Sets schema, asset_table, file
        "asset_type": "file",
        "target_table": [schema, asset_table.name],
        "checksum_types": ["sha256", "md5"],
        "hatrac_options": {"versioned_urls": True},
        "hatrac_templates": {
            "hatrac_uri": f"/hatrac/{asset_table.name}/{{md5}}.{{file_name}}",
            "content-disposition": "filename*=UTF-8''{file_name}",
        },
        "record_query_template": "/entity/{target_table}/MD5={md5}&Filename={file_name}",
    }
    return spec

asset_type_path

asset_type_path(
    prefix: Path | str,
    exec_rid: RID,
    asset_table: Table,
) -> Path

Return the path to a JSON line file in which to place asset_type information.

Parameters:

Name Type Description Default
prefix Path | str

Location of upload root directory

required
exec_rid RID

Execution RID

required
asset_table Table

Table in which to place assets.

required

Returns:

Type Description
Path

Path to the file in which to place asset_type values for the named asset.

Source code in src/deriva_ml/dataset/upload.py
424
425
426
427
428
429
430
431
432
433
434
435
436
437
def asset_type_path(prefix: Path | str, exec_rid: RID, asset_table: Table) -> Path:
    """Return the path to a JSON-lines file in which to record asset_type information.

    Args:
        prefix: Location of upload root directory
        exec_rid: Execution RID
        asset_table: Table in which to place assets.

    Returns:
        Path of the <table>.jsonl file holding asset_type values for the named asset table.
    """
    type_dir = execution_root(prefix, exec_rid=exec_rid) / "asset-type" / asset_table.schema.name
    type_dir.mkdir(exist_ok=True, parents=True)
    return type_dir / (asset_table.name + ".jsonl")

bulk_upload_configuration

bulk_upload_configuration(
    model: DerivaModel,
) -> dict[str, Any]

Return an upload specification for deriva-ml Arguments: model: Model from which to generate the upload configuration

Source code in src/deriva_ml/dataset/upload.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def bulk_upload_configuration(model: DerivaModel) -> dict[str, Any]:
    """Return a deriva bulk-upload specification for deriva-ml.

    Arguments:
        model: Model from which to generate the upload configuration

    Returns:
        A dictionary suitable for use as a deriva upload configuration, covering
        asset tables with metadata, asset tables without metadata, and table records.
    """
    # Asset tables with extra metadata columns each need a dedicated spec so their
    # metadata values can be recovered from path components (see asset_table_upload_spec).
    asset_tables_with_metadata = [
        asset_table_upload_spec(model=model, asset_table=t) for t in model.find_assets() if model.asset_metadata(t)
    ]
    return {
        "asset_mappings": asset_tables_with_metadata
        + [
            {
                # Upload assets into an asset table without any metadata columns.
                "column_map": {
                    "MD5": "{md5}",
                    "URL": "{URI}",
                    "Length": "{file_size}",
                    "Filename": "{file_name}",
                },
                "asset_type": "file",
                "target_table": ["{schema}", "{asset_table}"],
                "file_pattern": asset_path_regex + "/" + asset_file_regex,  # Sets schema, asset_table, name, ext
                "checksum_types": ["sha256", "md5"],
                "hatrac_options": {"versioned_urls": True},
                "hatrac_templates": {
                    "hatrac_uri": "/hatrac/{asset_table}/{md5}.{file_name}",
                    "content-disposition": "filename*=UTF-8''{file_name}",
                },
                "record_query_template": "/entity/{target_table}/MD5={md5}&Filename={file_name}",
            },
            {
                # Upload the records of a table from a CSV or JSON file.
                "asset_type": "table",
                "default_columns": ["RID", "RCB", "RMB", "RCT", "RMT"],
                "file_pattern": table_regex,  # Sets schema, table,
                "ext_pattern": "^.*[.](?P<file_ext>json|csv)$",
                "target_table": ["{schema}", "{table}"],
            },
        ],
        "version_update_url": "https://github.com/informatics-isi-edu/deriva-client",
        "version_compatibility": [[">=1.4.0", "<2.0.0"]],
    }

execution_rids

execution_rids(
    prefix: Path | str,
) -> list[RID]

Return a list of all the execution RIDS that have files waiting to be uploaded.

Source code in src/deriva_ml/dataset/upload.py
114
115
116
117
def execution_rids(prefix: Path | str) -> list[RID]:
    """Return a list of all the execution RIDs that have files waiting to be uploaded.

    Args:
        prefix: Location of the upload root directory.

    Returns:
        RIDs (subdirectory names) under the execution upload directory; empty if no
        execution has staged any files yet.
    """
    path = upload_root(prefix) / "execution"
    # upload_root only guarantees "deriva-ml/" exists; without this guard, iterdir()
    # would raise FileNotFoundError before the first execution stages anything.
    if not path.is_dir():
        return []
    # Each execution stages its files in a subdirectory named by its RID.
    return [d.name for d in path.iterdir() if d.is_dir()]

execution_root

execution_root(
    prefix: Path | str, exec_rid
) -> Path

Path to directory to place execution specific upload files.

Source code in src/deriva_ml/dataset/upload.py
120
121
122
123
124
def execution_root(prefix: Path | str, exec_rid: str) -> Path:
    """Return the path to the directory in which execution-specific upload files are placed.

    Args:
        prefix: Location of the upload root directory.
        exec_rid: RID of the execution.

    Returns:
        The per-execution directory (created if needed).
    """
    path = upload_root(prefix) / "execution" / exec_rid
    path.mkdir(exist_ok=True, parents=True)
    return path

feature_dir

feature_dir(
    prefix: Path | str,
    exec_rid: str,
    schema: str,
    target_table: str,
    feature_name: str,
) -> Path

Return the path to the directory in which a named feature for an execution should be placed.

Source code in src/deriva_ml/dataset/upload.py
141
142
143
144
145
def feature_dir(prefix: Path | str, exec_rid: str, schema: str, target_table: str, feature_name: str) -> Path:
    """Return the path to the directory in which a named feature for an execution should be placed.

    Args:
        prefix: Location of upload root directory.
        exec_rid: RID of the execution.
        schema: Domain schema name.
        target_table: Table the feature is attached to.
        feature_name: Name of the feature.

    Returns:
        The feature's directory (created if needed).
    """
    feature_path = feature_root(prefix, exec_rid) / schema / target_table / feature_name
    feature_path.mkdir(exist_ok=True, parents=True)
    return feature_path

feature_root

feature_root(
    prefix: Path | str, exec_rid: str
) -> Path

Return the path to the directory in which features for the specified execution should be placed.

Source code in src/deriva_ml/dataset/upload.py
127
128
129
130
131
def feature_root(prefix: Path | str, exec_rid: str) -> Path:
    """Return the per-execution directory under which all feature files are staged."""
    root = execution_root(prefix, exec_rid) / "feature"
    root.mkdir(exist_ok=True, parents=True)
    return root

feature_value_path

feature_value_path(
    prefix: Path | str,
    exec_rid: str,
    schema: str,
    target_table: str,
    feature_name: str,
) -> Path

Return the path to a JSON-lines file in which to place feature values that are to be uploaded.

Parameters:

Name Type Description Default
prefix Path | str

Location of upload root directory

required
exec_rid str

RID of the execution to be associated with this feature.

required
schema str

Domain schema name

required
target_table str

Target table name for the feature.

required
feature_name str

Name of the feature.

required

Returns:

Type Description
Path

Path to the JSON-lines (.jsonl) file in which to place feature values

Source code in src/deriva_ml/dataset/upload.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def feature_value_path(prefix: Path | str, exec_rid: str, schema: str, target_table: str, feature_name: str) -> Path:
    """Return the path to a JSON-lines file in which to place feature values that are to be uploaded.

    Args:
        prefix: Location of upload root directory
        exec_rid: RID of the execution to be associated with this feature.
        schema: Domain schema name
        target_table: Target table name for the feature.
        feature_name: Name of the feature.

    Returns:
        Path to the .jsonl file in which to place feature values.
    """
    # The file is named after the feature and written in JSON-lines form.
    return feature_dir(prefix, exec_rid, schema, target_table, feature_name) / f"{feature_name}.jsonl"

is_feature_dir

is_feature_dir(
    path: Path,
) -> Optional[re.Match]

Path matches the pattern for where the table for a feature would go.

Source code in src/deriva_ml/dataset/upload.py
87
88
89
def is_feature_dir(path: Path) -> Optional[re.Match]:
    """Return the match object if path is where the table for a feature would go, else None."""
    posix_path = path.as_posix()
    # Anchor with "$" so the entire path, not just a prefix, must match.
    return re.match(feature_table_dir_regex + "$", posix_path)

normalize_asset_dir

normalize_asset_dir(
    path: str,
) -> Optional[tuple[str, str]]

Parse a path to an asset file and return the asset table name and file name.

Parameters:

Name Type Description Default
path str

Path to the asset file

required

Returns:

Type Description
Optional[tuple[str, str]]

Tuple of (schema/table, filename) or None if path doesn't match pattern

Source code in src/deriva_ml/dataset/upload.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def normalize_asset_dir(path: str) -> Optional[tuple[str, str]]:
    """Parse a path to an asset file and return the asset table name and file name.

    Args:
        path: Path to the asset file

    Returns:
        Tuple of (schema/table, filename) or None if path doesn't match pattern
    """
    # Normalize via Path so separators and trailing components are canonical.
    asset_path = Path(path)
    match = re.match(asset_path_regex, str(asset_path))
    if match is None:
        return None
    return f"{match['schema']}/{match['asset_table']}", asset_path.name

table_path

table_path(
    prefix: Path | str,
    schema: str,
    table: str,
) -> Path

Return the path to a CSV file in which to place table values that are to be uploaded.

Parameters:

Name Type Description Default
prefix Path | str

Location of upload root directory

required
schema str

Domain schema

required
table str

Name of the table to be uploaded.

required

Returns:

Type Description
Path

Path to the file in which to place table values that are to be uploaded.

Source code in src/deriva_ml/dataset/upload.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def table_path(prefix: Path | str, schema: str, table: str) -> Path:
    """Return the path to a CSV file in which to place table values that are to be uploaded.

    Args:
        prefix: Location of upload root directory
        schema: Domain schema
        table: Name of the table to be uploaded.

    Returns:
        Path to the file in which to place table values that are to be uploaded.
    """
    table_dir = upload_root(prefix) / "table" / schema / table
    table_dir.mkdir(exist_ok=True, parents=True)
    return table_dir / (table + ".csv")

upload_asset

upload_asset(
    model: DerivaModel,
    file: Path | str,
    table: Table,
    **kwargs: Any,
) -> dict

Upload the specified file into Hatrac and update the associated asset table.

Parameters:

Name Type Description Default
file Path | str

path to the file to upload.

required
table Table

Name of the asset table

required
model DerivaModel

Model to upload assets to.

required
kwargs Any

Keyword arguments for values of additional columns to be added to the asset table.

{}

Returns:

Source code in src/deriva_ml/dataset/upload.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def upload_asset(model: DerivaModel, file: Path | str, table: Table, **kwargs: Any) -> dict:
    """Upload the specified file into Hatrac and update the associated asset table.

    Args:
        model: Model to upload assets to.
        file: Path to the file to upload.
        table: The asset table in which to record the file.
        kwargs: Keyword arguments for values of additional columns to be added to the asset table.

    Returns:
        The inserted asset table record.

    Raises:
        DerivaMLException: If table is not an asset table.
    """
    if not model.is_asset(table):
        raise DerivaMLException(f"Table {table} is not an asset table.")

    file_path = Path(file)
    file_name = file_path.name
    file_size = file_path.stat().st_size

    hs = HatracStore(
        "https",
        server=model.catalog.deriva_server.server,
        credentials=model.catalog.deriva_server.credentials,
    )
    # md5_hashes[0] is used as the hex digest, md5_hashes[1] as the digest form
    # expected by put_obj — presumably (hex, base64); TODO confirm against deriva hash_utils.
    md5_hashes = hash_utils.compute_file_hashes(file, ["md5"])["md5"]
    # Prefix the object name with the MD5 digest and replace unsafe characters.
    sanitized_filename = urlquote(re.sub("[^a-zA-Z0-9_.-]", "_", md5_hashes[0] + "." + file_name))
    hatrac_path = f"/hatrac/{table.name}/{sanitized_filename}"

    # Upload the file to hatrac; failures propagate to the caller.
    # (The original wrapped this and the insert below in try/except blocks that
    # only re-raised the same exception, which added nothing.)
    hatrac_uri = hs.put_obj(
        hatrac_path,
        file,
        md5=md5_hashes[1],
        content_type=mime_utils.guess_content_type(file),
        content_disposition="filename*=UTF-8''" + file_name,
    )

    # Now record the uploaded object in the asset table.
    ipath = model.catalog.getPathBuilder().schemas[table.schema.name].tables[table.name]
    return list(
        ipath.insert(
            [
                {
                    "URL": hatrac_uri,
                    "Filename": file_name,
                    "Length": file_size,
                    "MD5": md5_hashes[0],
                }
                | kwargs
            ]
        )
    )[0]

upload_directory

upload_directory(
    model: DerivaModel,
    directory: Path | str,
) -> dict[Any, FileUploadState] | None

Upload assets from a directory. This routine assumes that the current upload specification includes a configuration for the specified directory. Every asset in the specified directory is uploaded

Parameters:

Name Type Description Default
model DerivaModel

Model to upload assets to.

required
directory Path | str

Directory containing the assets and tables to upload.

required

Returns:

Type Description
dict[Any, FileUploadState] | None

Results of the upload operation.

Raises:

Type Description
DerivaMLException

If there is an issue with uploading the assets.

Source code in src/deriva_ml/dataset/upload.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def upload_directory(model: DerivaModel, directory: Path | str) -> dict[Any, FileUploadState] | None:
    """Upload every asset and table file found under a directory.

    Assumes the current upload specification includes a configuration for the
    specified directory; every asset in the directory is uploaded.

    Args:
        model: Model to upload assets to.
        directory: Directory containing the assets and tables to upload.

    Returns:
        Mapping from file path to FileUploadState describing each upload's outcome.

    Raises:
        DerivaMLException: If there is an issue with uploading the assets.
    """
    directory = Path(directory)
    if not directory.is_dir():
        raise DerivaMLException("Directory does not exist")

    # Write the generated upload specification to a temporary config file and hand
    # it to the generic deriva uploader.
    with TemporaryDirectory() as temp_dir:
        spec_file = Path(temp_dir) / "config.json"
        with spec_file.open("w+") as cfile:
            json.dump(bulk_upload_configuration(model), cfile)

        server_desc = {
            "host": model.hostname,
            "protocol": "https",
            "catalog_id": model.catalog.catalog_id,
        }
        uploader = GenericUploader(server=server_desc, config_file=spec_file)
        try:
            uploader.getUpdatedConfig()
            uploader.scanDirectory(directory)
            results = {}
            for file_path, outcome in uploader.uploadFiles().items():
                results[file_path] = FileUploadState(
                    state=UploadState(outcome["State"]),
                    status=outcome["Status"],
                    result=outcome["Result"],
                )
        finally:
            # Always release the uploader's resources, even if an upload step fails.
            uploader.cleanup()
        return results

upload_root

upload_root(prefix: Path | str) -> Path

Return the top level directory of where to put files to be uploaded.

Source code in src/deriva_ml/dataset/upload.py
107
108
109
110
111
def upload_root(prefix: Path | str) -> Path:
    """Return the top level directory of where to put files to be uploaded."""
    path = Path(prefix) / "deriva-ml"
    path.mkdir(exist_ok=True, parents=True)
    return path