Data Pipelines

A data pipeline is a series of data transformations applied to a dataset.

Transform

Hub Transform provides functionality to modify the samples of a dataset or to create a new dataset from an existing one. To apply these modifications, add the @hub.transform decorator to a custom function. The user-defined transform function is applied to every sample of the input. It takes in an iterator or a dataset and outputs another dataset with the specified schema.

Optimizations are applied behind the scenes to process and store the data efficiently.

How to upload a dataset with @hub.transform

Define the desired schema

import hub
from hub.schema import Tensor, Text
import numpy as np

my_schema = {
    "image": Tensor((28, 28, 4), "int32", (28, 28, 4)),
    "label": Text(shape=(None,), max_shape=(20,)),
    "confidence": "float",
}
tag = "./data/test/test_pipeline_basic"

Before

ds = hub.Dataset(
    tag, mode="w+", shape=(100,), schema=my_schema
)

for i in range(len(ds)):
    ds["image", i] = np.ones((28, 28, 4), dtype="int32")
    ds["label", i] = f"hello {i}"
    ds["confidence", i] = 0.2

assert ds["confidence"][0].compute() == 0.2

After

@hub.transform(schema=my_schema)
def my_transform(index):
    return {
        "image": np.ones((28, 28, 4), dtype="int32"),
        "label": f"hello {index}",
        "confidence": 0.2,
    }


ds = my_transform(range(100))
ds = ds.store(tag)

assert ds["confidence"][0].compute() == 0.2

# Access the dataset later
ds = hub.Dataset(tag)
assert ds["confidence"][0].compute() == 0.2

Adding arguments

@hub.transform(schema=my_schema)
def my_transform(index, multiplier):
    return {
        "image": np.ones((28, 28, 4), dtype="int32"),
        "label": f"hello {index}",
        "confidence": 0.2 * multiplier,
    }


ds = my_transform(range(100), multiplier=10)
ds = ds.store(tag)

assert ds["confidence"][0].compute() == 2.0

Stacking multiple transforms

@hub.transform(schema=my_schema)
def my_transform_1(index):
    return {
        "image": np.ones((28, 28, 4), dtype="int32"),
        "label": f"hello {index}",
        "confidence": 0.2,
    }


@hub.transform(schema=my_schema)
def my_transform_2(sample, multiplier: int = 2):
    return {
        "image": sample["image"].compute() * multiplier,
        "label": sample["label"].compute(),
        "confidence": sample["confidence"].compute() * multiplier,
    }


ds = my_transform_1(range(100))
ds = my_transform_2(ds, multiplier=10)
ds = ds.store(tag)

assert ds["confidence"][0].compute() == 2.0

Returning multiple elements

The transform function can return either a dictionary that corresponds to the provided schema or a list of such dictionaries. In the latter case, the number of samples in the final dataset will be equal to the total number of returned dictionaries:

my_schema = {
    "image": Tensor(shape=(None, None, None), dtype="int32", max_shape=(32, 32, 3)),
    "label": Text(shape=(None,), max_shape=(20,)),
}

ds = hub.Dataset(
    "./data/test/test_pipeline_dynamic3",
    mode="w+",
    shape=(1,),
    schema=my_schema,
    cache=False,
)

ds["image", 0] = np.ones((30, 32, 3))


@hub.transform(schema=my_schema)
def dynamic_transform(sample, multiplier: int = 2):
    return [
        {
            "image": sample["image"].compute() * multiplier,
            "label": sample["label"].compute(),
        }
        for i in range(multiplier)
    ]


out_ds = dynamic_transform(ds, multiplier=4).store("./data/pipeline")
assert len(ds) == 1
assert len(out_ds) == 4

Local parallel execution

You can run a transform with multiple processes or threads by setting the scheduler to "threaded" or "processed" and setting the number of workers.


from hub.schema import Image  # needed for the Image schema used below

width = 256
channels = 3
dtype = "uint8"

my_schema = {"image": Image(shape=(width, width, channels), dtype=dtype)}


@hub.transform(schema=my_schema, scheduler="processed", workers=2)
def my_transform(x):
    # Simulate a heavy computation so the benefit of multiple workers is visible
    a = np.random.random((width, width, channels))
    for i in range(10):
        a *= np.random.random((width, width, channels))

    return {
        "image": (np.ones((width, width, channels), dtype=dtype) * 255),
    }


ds_t = my_transform(range(100)).store("./data/pipeline")

Scaling compute to a cluster

[in development]

There is also an option of using Ray as the scheduler. In this case, RayTransform will be applied to the samples.

ds = hub.Dataset(
    "./data/ray/ray_pipeline_basic",
    mode="w+",
    shape=(100,),
    schema=my_schema,
    cache=False,
)

# Note: this example assumes my_schema defines "confidence" as a nested group,
# e.g. "confidence": {"confidence": "float"}, hence the "confidence/confidence" keys.
for i in range(len(ds)):
    ds["image", i] = np.ones((28, 28, 4), dtype="int32")
    ds["label", i] = f"hello {i}"
    ds["confidence/confidence", i] = 0.2


@hub.transform(schema=my_schema, scheduler="ray")
def my_transform(sample, multiplier: int = 2):
    return {
        "image": sample["image"].compute() * multiplier,
        "label": sample["label"].compute(),
        "confidence": {
            "confidence": sample["confidence/confidence"].compute() * multiplier
        },
    }

out_ds = my_transform(ds, multiplier=2)
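
As with the other schedulers, the pipeline runs and the output dataset is persisted once .store(...) is called on the returned object, for example (the output path is illustrative):

res_ds = out_ds.store("./data/ray/ray_pipeline_output")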

API


hub.compute.transform(schema, scheduler='single', workers=1)
Transform is a decorator for a function. The decorated function should output a dictionary per sample.
schema: Schema

The output format of the transformed dataset

scheduler: str

“single” for single-threaded execution, “threaded” for multiple threads, “processed” for multiple processes, “ray” for the Ray scheduler, “dask” for the Dask scheduler

workers: int

how many workers will be started for processing
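
A minimal sketch tying these parameters together, reusing the imports and my_schema from the examples above (the output path is illustrative):

@hub.transform(schema=my_schema, scheduler="threaded", workers=4)
def my_threaded_transform(index):
    # Same sample generator as in the earlier examples, now run by 4 threads
    return {
        "image": np.ones((28, 28, 4), dtype="int32"),
        "label": f"hello {index}",
        "confidence": 0.2,
    }

ds = my_threaded_transform(range(100)).store("./data/test/threaded_pipeline")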

class hub.compute.transform.Transform(func, schema, ds, scheduler: str = 'single', workers: int = 1, **kwargs)
__getitem__(slice_)
Get an item to be computed without iterating on the whole dataset.
Creates a dataset view, then a temporary dataset to apply the transform.
slice_: slice

Gets a slice or slices from the dataset
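
For example, a minimal sketch that computes only a slice of a pipeline built with the decorator from the examples above, without iterating over the whole input:

pipeline = my_transform(range(100))
# Apply the transform to the first five samples only
subset = pipeline[0:5]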

__init__(func, schema, ds, scheduler: str = 'single', workers: int = 1, **kwargs)
Transform applies a user-defined function to each sample in a single-threaded manner.
Parameters
  • func (function) – user-defined function func(x, **kwargs)

  • schema (dict of dtypes) – the structure of the final dataset that will be created

  • ds (Iterable) – input dataset or a list that can be iterated

  • scheduler (str) – choice between “single”, “threaded”, “processed”

  • workers (int) – how many threads or processes to use

  • **kwargs – additional arguments that will be passed to func as static arguments for all samples

__weakref__

list of weak references to the object (if defined)

classmethod _flatten_dict(d: Dict, parent_key='', schema=None)
Helper function to flatten dictionary of a recursive tensor
Parameters

d (dict) –

_pbar(show: bool = True)

Returns a progress bar; if show is False, it does nothing

_split_list_to_dicts(xs)
Helper function that transforms a list of dicts into dicts of lists
Parameters

xs (list of dicts) –

Returns

xs_new

Return type

dicts of lists

classmethod _unwrap(results)

If there is any list then unwrap it into its elements

call_func(fn_index, item, as_list=False)

Calls all the functions one after the other

Parameters
  • fn_index (int) – The index starting from which the functions need to be called

  • item – The item on which functions need to be applied

  • as_list (bool, optional) – If true then treats the item as a list.

Returns

The final output obtained after all transforms

Return type

result

create_dataset(url: str, length: Optional[int] = None, token: Optional[dict] = None, public: bool = True)

Helper function to create a dataset

classmethod dtype_from_path(path, schema)

Helper function to get the dtype from the path

store(url: str, token: Optional[dict] = None, length: Optional[int] = None, ds: Optional[Iterable] = None, progressbar: bool = True, sample_per_shard: Optional[int] = None, public: bool = True)
The function to apply the transformation to each element in a batchified manner
Parameters
  • url (str) – path where the data is going to be stored

  • token (str or dict, optional) – If url refers to a place where authorization is required, token is the parameter to pass the credentials; it can be a filepath or a dict

  • length (int) – in case shape is None, the user can provide the length

  • ds (Iterable) –

  • progressbar (bool) – Show progress bar

  • sample_per_shard (int) – How many samples to process per shard, so the iterator does not overfill RAM

  • public (bool, optional) – only applicable if using hub storage, ignored otherwise; setting this to False allows only the user who created it to access the dataset, and the dataset won’t be visible in the visualizer to the public

Returns

ds – uploaded dataset

Return type

hub.Dataset
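
For example, a hedged sketch of storing a pipeline in shards to bound memory usage, reusing a transform from the examples above (the output path and shard size are illustrative):

out_ds = my_transform(range(1000)).store(
    "./data/test/sharded_pipeline",
    sample_per_shard=100,  # process and upload 100 samples at a time
)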

store_shard(ds_in: Iterable, ds_out: hub.api.dataset.Dataset, offset: int, token=None)

Takes a shard of the iterable ds_in, computes it, and stores it in a DatasetView

upload(results, ds: hub.api.dataset.Dataset, token: dict, progressbar: bool = True)

Batchified upload of results. For each tensor, batchify based on its chunk and upload. If a tensor is dynamic, it is still uploaded element by element. For dynamic tensors, it disables dynamicness during the upload and then enables it back.

Parameters
  • dataset (hub.Dataset) – Dataset object that should be written to

  • results – Output of transform function

  • progressbar (bool) –

Returns

ds – Uploaded dataset

Return type

hub.Dataset

class hub.compute.ray.RayTransform(func, schema, ds, scheduler='ray', workers=1, **kwargs)
__init__(func, schema, ds, scheduler='ray', workers=1, **kwargs)
Transform applies a user-defined function to each sample in a single-threaded manner.
Parameters
  • func (function) – user-defined function func(x, **kwargs)

  • schema (dict of dtypes) – the structure of the final dataset that will be created

  • ds (Iterable) – input dataset or a list that can be iterated

  • scheduler (str) – choice between “single”, “threaded”, “processed”

  • workers (int) – how many threads or processes to use

  • **kwargs – additional arguments that will be passed to func as static arguments for all samples

set_dynamic_shapes(results, ds)

Sets shapes for dynamic tensors after the dataset is uploaded

Parameters
  • results (Tuple) – results from uploading each chunk which includes (key, slice, shape) tuple

  • ds – Dataset to set the shapes to

store(url: str, token: Optional[dict] = None, length: Optional[int] = None, ds: Optional[Iterable] = None, progressbar: bool = True, public: bool = True)

The function to apply the transformation to each element in a batchified manner

Parameters
  • url (str) – path where the data is going to be stored

  • token (str or dict, optional) – If url refers to a place where authorization is required, token is the parameter to pass the credentials; it can be a filepath or a dict

  • length (int) – in case shape is None, the user can provide the length

  • ds (Iterable) –

  • progressbar (bool) – Show progress bar

  • public (bool, optional) – only applicable if using hub storage, ignored otherwise; setting this to False allows only the user who created it to access the dataset, and the dataset won’t be visible in the visualizer to the public

Returns

ds – uploaded dataset

Return type

hub.Dataset

upload(results, url: str, token: dict, progressbar: bool = True, public: bool = True)

Batchified upload of results. For each tensor, batchify based on its chunk and upload. If a tensor is dynamic, it is still uploaded element by element.

Parameters
  • dataset (hub.Dataset) – Dataset object that should be written to

  • results – Output of transform function

  • progressbar (bool) –

  • public (bool, optional) – only applicable if using hub storage, ignored otherwise; setting this to False allows only the user who created it to access the dataset, and the dataset won’t be visible in the visualizer to the public

Returns

ds – Uploaded dataset

Return type

hub.Dataset