Data Pipelines

Data pipelines are usually a series of data transformations on datasets.

Transform

Hub Transform provides functionality to modify the samples of a dataset or to create a new dataset from an existing one. To apply these modifications, the user needs to add the @hub.transform decorator to any custom generator function.

Examples

Basic transform pipeline creation:

import numpy as np

import hub
from hub.schema import Tensor

# schema describing each sample: a fixed-shape image, a text label, and a confidence score
my_schema = {
    "image": Tensor((28, 28, 4), "int32", (28, 28, 4)),
    "label": "<U20",
    "confidence": "float",
}

# create a dataset of 100 samples and fill it with placeholder values
ds = hub.Dataset(
   "./data/test/test_pipeline_basic", mode="w", shape=(100,), schema=my_schema
)

for i in range(len(ds)):
   ds["image", i] = np.ones((28, 28, 4), dtype="int32")
   ds["label", i] = f"hello {i}"
   ds["confidence", i] = 0.2

@hub.transform(schema=my_schema)
def my_transform(sample, multiplier: int = 2):
   return {
      "image": sample["image"].compute() * multiplier,
      "label": sample["label"].compute(),
      "confidence": sample["confidence"].compute() * multiplier
   }

out_ds = my_transform(ds, multiplier=2)
res_ds = out_ds.store("./data/test/test_pipeline_basic_output")
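
Calling the decorated function returns a transform object; the actual computation happens when .store() is called, which writes the result and returns a hub.Dataset. A minimal sketch of reading the output back (the expected values follow from multiplier=2 and the placeholder data above):

# "image" and "confidence" were multiplied by 2, "label" was copied unchanged
print(res_ds["image", 0].compute()[0, 0, 0])   # expected: 2
print(res_ds["label", 0].compute())            # expected: hello 0
print(res_ds["confidence", 0].compute())       # expected: 0.4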

The transformation function can return either a single dictionary that corresponds to the provided schema or a list of such dictionaries. In the latter case, the number of samples in the final dataset equals the total number of returned dictionaries:

dynamic_schema = {
    "image": Tensor(shape=(None, None, None), dtype="int32", max_shape=(32, 32, 3)),
    "label": "<U20",
}

ds = hub.Dataset(
   "./data/test/test_pipeline_dynamic3", mode="w", shape=(1,), schema=dynamic_schema, cache=False
)

ds["image", 0] = np.ones((30, 32, 3), dtype="int32")

@hub.transform(schema=dynamic_schema, scheduler="threaded", workers=8)
def dynamic_transform(sample, multiplier: int = 2):
   return [{
      "image": sample["image"].compute() * multiplier,
      "label": sample["label"].compute(),
   } for i in range(4)]

out_ds = dynamic_transform(ds, multiplier=4).store("./data/test/test_pipeline_dynamic_output2")
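
Because the function returns a list of four dictionaries for the single input sample, the stored dataset contains four samples. A quick sanity check (a sketch using the variables from the example above):

print(len(out_ds))                          # expected: 4
print(out_ds["image", 0].compute().shape)   # expected: (30, 32, 3)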

You can run a transform with multiple processes or threads by adding the scheduler and workers arguments (the width, channels, dtype, and sample_size values below are placeholders):

# example values for illustration
width, channels = 256, 3
dtype = "uint8"
sample_size = 200

my_schema = {
   "image": Tensor((width, width, channels), dtype, (width, width, channels), chunks=(sample_size // 20, width, width, channels)),
}

@hub.transform(schema=my_schema, scheduler="processed", workers=2)
def my_transform(x):

   # simulate a compute-heavy workload
   a = np.random.random((width, width, channels))
   for i in range(10):
         a *= np.random.random((width, width, channels))

   return {
         "image": (np.ones((width, width, channels), dtype=dtype) * 255),
   }

ds = hub.Dataset(
   "./data/test/test_pipeline_basic_4", mode="w", shape=(sample_size,), schema=my_schema, cache=0
)

ds_t = my_transform(ds).store("./data/test/test_pipeline_basic_4_output")
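
The scheduler and workers arguments only change how the pipeline is executed, not what it computes. A hedged sketch of running the same kind of transform on threads instead of processes (the function name and output path below are illustrative):

@hub.transform(schema=my_schema, scheduler="threaded", workers=8)
def my_transform_threaded(x):
   return {
      "image": np.ones((width, width, channels), dtype=dtype) * 255,
   }

ds_threaded = my_transform_threaded(ds).store("./data/test/test_pipeline_basic_4_threaded")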

Ray Transform

There is also an option to use Ray as the scheduler. In this case RayTransform is used to process the samples.

# nested schema for this example: "confidence" holds a nested "confidence" field,
# which is why the sample is addressed as "confidence/confidence" below
my_schema = {
   "image": Tensor((28, 28, 4), "int32", (28, 28, 4)),
   "label": "<U20",
   "confidence": {"confidence": "float"},
}

ds = hub.Dataset(
   "./data/ray/ray_pipeline_basic",
   mode="w",
   shape=(100,),
   schema=my_schema,
   cache=False,
)

for i in range(len(ds)):
   ds["image", i] = np.ones((28, 28, 4), dtype="int32")
   ds["label", i] = f"hello {i}"
   ds["confidence/confidence", i] = 0.2

@hub.transform(schema=my_schema, scheduler="ray")
def my_transform(sample, multiplier: int = 2):
   return {
      "image": sample["image"].compute() * multiplier,
      "label": sample["label"].compute(),
      "confidence": {
            "confidence": sample["confidence/confidence"].compute() * multiplier
      },
   }

out_ds = my_transform(ds, multiplier=2)
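
The ray scheduler requires the ray package to be installed. As with the other schedulers, the pipeline runs when .store() is called. A minimal sketch of finishing the example (the output path and the explicit ray.init() call are illustrative; RayTransform may initialize Ray on its own, in which case the guarded call below is a no-op):

import ray

if not ray.is_initialized():
   ray.init()

res_ds = out_ds.store("./data/ray/ray_pipeline_basic_output")
print(res_ds["confidence/confidence", 0].compute())   # expected: 0.4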

API

hub.compute.transform(schema, scheduler='single', workers=1)

Transform is a decorator for a function. The decorated function should output one dictionary (or a list of dictionaries) per sample.

Parameters
schema: Schema

The output format of the transformed dataset

scheduler: str

“single” for single-threaded execution, “threaded” for multiple threads, “processed” for multiple processes, “ray” for the Ray scheduler, “dask” for the Dask scheduler

workers: int

how many workers will be started for processing
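
Since the input can be any iterable (see the __init__ parameters below, where ds may be "a list that can be iterated"), the decorator also works on a plain Python list. A hedged sketch, assuming the imports from the examples above (depending on the version, you may need to pass length to store() when the input has no shape):

@hub.transform(schema={"label": "<U20"}, scheduler="threaded", workers=4)
def label_from_item(item):
   return {"label": str(item)}

out = label_from_item(["cat", "dog", "bird"]).store("./data/test/list_input_output")
print(out["label", 1].compute())   # expected: dog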

class hub.compute.transform.Transform(func, schema, ds, scheduler: str = 'single', workers: int = 1, **kwargs)
__getitem__(slice_)

Get an item to be computed without iterating over the whole dataset. Creates a dataset view, then a temporary dataset to apply the transform.

slice_: slice

Gets a slice or slices from the dataset

__init__(func, schema, ds, scheduler: str = 'single', workers: int = 1, **kwargs)

Transform applies a user-defined function to each sample in a single-threaded manner

Parameters
  • func (function) – user-defined function func(x, **kwargs)

  • schema (dict of dtypes) – the structure of the final dataset that will be created

  • ds (Iterable) – input dataset or a list that can be iterated

  • scheduler (str) – choice between “single”, “threaded”, “processed”

  • workers (int) – how many threads or processes to use

  • **kwargs – additional arguments that will be passed to func as static arguments for all samples

__weakref__

list of weak references to the object (if defined)

classmethod _flatten(items, schema)

Takes a dictionary or a list of dictionaries and returns a dictionary of concatenated values that follows the schema

classmethod _flatten_dict(d: Dict, parent_key='', schema=None)

Helper function to flatten a dictionary of a recursive tensor

Parameters

d (dict) –

_pbar(show: bool = True)

Returns a progress bar; if show is False, it does nothing

_split_list_to_dicts(xs)

Helper function that transforms a list of dicts into a dict of lists

Parameters

xs (list of dicts) –

Returns

xs_new

Return type

dict of lists

classmethod _unwrap(results)

If there is any list then unwrap it into its elements

create_dataset(url, length=None, token=None)

Helper function to create a dataset

classmethod dtype_from_path(path, schema)

Helper function to get the dtype from the path

store(url: str, token: dict = None, length: int = None, ds: Iterable = None, progressbar: bool = True, sample_per_shard=None)

Applies the transformation to each element in a batchified manner

Parameters
  • url (str) – path where the data is going to be stored

  • token (str or dict, optional) – if url refers to a location where authorization is required, token is the parameter used to pass the credentials; it can be a filepath or a dict

  • length (int) – in case shape is None, the user can provide the length

  • ds (Iterable) –

  • progressbar (bool) – Show progress bar

  • sample_per_shard (int) – how many samples to process per shard, so the iterator does not overfill RAM

Returns

ds – uploaded dataset

Return type

hub.Dataset
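
For large inputs, sample_per_shard bounds memory usage: the transform is computed and uploaded shard by shard rather than all at once. A hedged usage sketch (the path and shard size are illustrative):

out_ds = my_transform(ds, multiplier=2)
res_ds = out_ds.store(
   "./data/test/test_pipeline_sharded_output",
   sample_per_shard=32,    # compute and upload 32 samples at a time
   progressbar=True,
)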

store_shard(ds_in: Iterable, ds_out: hub.api.dataset.Dataset, offset: int, token=None)

Takes a shard of the iterable ds_in, computes it and stores it in the DatasetView

upload(results, ds: hub.api.dataset.Dataset, token: dict, progressbar: bool = True)

Batchified upload of results. For each tensor, batchify based on its chunk size and upload. If a tensor is dynamic, it is still uploaded element by element; for dynamic tensors, dynamicness is disabled during the upload and re-enabled afterwards.

Parameters
  • ds (hub.Dataset) – Dataset object that should be written to

  • results – Output of transform function

  • progressbar (bool) –

Returns

ds – Uploaded dataset

Return type

hub.Dataset

class hub.compute.ray.RayTransform(func, schema, ds, scheduler='ray', workers=1, **kwargs)
__init__(func, schema, ds, scheduler='ray', workers=1, **kwargs)

Transform applies a user-defined function to each sample in a single-threaded manner

Parameters
  • func (function) – user-defined function func(x, **kwargs)

  • schema (dict of dtypes) – the structure of the final dataset that will be created

  • ds (Iterable) – input dataset or a list that can be iterated

  • scheduler (str) – choice between “single”, “threaded”, “processed”

  • workers (int) – how many threads or processes to use

  • **kwargs – additional arguments that will be passed to func as static arguments for all samples

store(url: str, token: dict = None, length: int = None, ds: Iterable = None, progressbar: bool = True)

Applies the transformation to each element in a batchified manner

Parameters
  • url (str) – path where the data is going to be stored

  • token (str or dict, optional) – if url refers to a location where authorization is required, token is the parameter used to pass the credentials; it can be a filepath or a dict

  • length (int) – in case shape is None, the user can provide the length

  • ds (Iterable) –

  • progressbar (bool) – Show progress bar

Returns

ds – uploaded dataset

Return type

hub.Dataset

upload(results, url: str, token: dict, progressbar: bool = True)

Batchified upload of results. For each tensor, batchify based on its chunk size and upload. If a tensor is dynamic, it is still uploaded element by element.

Parameters
  • dataset (hub.Dataset) – Dataset object that should be written to

  • results – Output of transform function

  • progressbar (bool) –

Returns

ds – Uploaded dataset

Return type

hub.Dataset