Data Pipelines¶
A data pipeline is a series of data transformations applied to a dataset.
Transform¶
Hub Transform provides functionality to modify the samples of a dataset or to create a new dataset from an existing one.
To apply these modifications, the user adds a @hub.transform
decorator to any custom function. The user-defined transform function is applied to every sample of the input; it takes in an iterator or a dataset and outputs another dataset with the specified schema.
Optimizations are performed behind the scenes to process and store the data efficiently.
How to upload a dataset with @hub.transform¶
Define the desired schema
import hub
from hub.schema import Tensor, Text
import numpy as np
my_schema = {
    "image": Tensor((28, 28, 4), "int32", (28, 28, 4)),
    "label": Text(shape=(None,), max_shape=(20,)),
    "confidence": "float",
}
tag = "./data/test/test_pipeline_basic"
Before
ds = hub.Dataset(
    tag, mode="w+", shape=(100,), schema=my_schema
)

for i in range(len(ds)):
    ds["image", i] = np.ones((28, 28, 4), dtype="int32")
    ds["label", i] = f"hello {i}"
    ds["confidence", i] = 0.2

assert ds["confidence"][0].compute() == 0.2
After
@hub.transform(schema=my_schema)
def my_transform(index):
    return {
        "image": np.ones((28, 28, 4), dtype="int32"),
        "label": f"hello {index}",
        "confidence": 0.2,
    }

ds = my_transform(range(100))
ds = ds.store(tag)

assert ds["confidence"][0].compute() == 0.2
# Access the dataset later
ds = hub.Dataset(tag)
assert ds["confidence"][0].compute() == 0.2
Adding arguments¶
@hub.transform(schema=my_schema)
def my_transform(index, multiplier):
    return {
        "image": np.ones((28, 28, 4), dtype="int32"),
        "label": f"hello {index}",
        "confidence": 0.2 * multiplier,
    }

ds = my_transform(range(100), multiplier=10)
ds = ds.store(tag)

assert ds["confidence"][0].compute() == 2.0
Stacking multiple transforms¶
@hub.transform(schema=my_schema)
def my_transform_1(index):
    return {
        "image": np.ones((28, 28, 4), dtype="int32"),
        "label": f"hello {index}",
        "confidence": 0.2,
    }

@hub.transform(schema=my_schema)
def my_transform_2(sample, multiplier: int = 2):
    return {
        "image": sample["image"].compute() * multiplier,
        "label": sample["label"].compute(),
        "confidence": sample["confidence"].compute() * multiplier,
    }

ds = my_transform_1(range(100))
ds = my_transform_2(ds, multiplier=10)
ds = ds.store(tag)

assert ds["confidence"][0].compute() == 2.0
Returning multiple elements¶
The transformation function can return either a dictionary that corresponds to the provided schema or a list of such dictionaries. In the latter case, the number of samples in the final dataset equals the total number of returned dictionaries:
my_schema = {
    "image": Tensor(shape=(None, None, None), dtype="int32", max_shape=(32, 32, 3)),
    "label": Text(shape=(None,), max_shape=(20,)),
}

ds = hub.Dataset(
    "./data/test/test_pipeline_dynamic3",
    mode="w+",
    shape=(1,),
    schema=my_schema,
    cache=False,
)
ds["image", 0] = np.ones((30, 32, 3))

@hub.transform(schema=my_schema)
def dynamic_transform(sample, multiplier: int = 2):
    return [
        {
            "image": sample["image"].compute() * multiplier,
            "label": sample["label"].compute(),
        }
        for i in range(multiplier)
    ]

out_ds = dynamic_transform(ds, multiplier=4).store("./data/pipeline")

assert len(ds) == 1
assert len(out_ds) == 4
Local parallel execution¶
You can run a transform with multiple processes or threads by setting scheduler to "threaded" or "processed" and specifying the number of workers.
from hub.schema import Image

width = 256
channels = 3
dtype = "uint8"

my_schema = {"image": Image(shape=(width, width, channels), dtype=dtype)}

@hub.transform(schema=my_schema, scheduler="processed", workers=2)
def my_transform(x):
    # simulate an expensive computation before producing the sample
    a = np.random.random((width, width, channels))
    for i in range(10):
        a *= np.random.random((width, width, channels))

    return {
        "image": (np.ones((width, width, channels), dtype=dtype) * 255),
    }

ds_t = my_transform(range(100)).store("./data/pipeline")
Scaling compute to a cluster¶
[in development]
There is also the option of using ray
as the scheduler. In that case, RayTransform
is applied to the samples.
# my_schema here is assumed to contain "image", "label" and a nested
# "confidence": {"confidence": ...} field, matching the keys written below
ds = hub.Dataset(
    "./data/ray/ray_pipeline_basic",
    mode="w+",
    shape=(100,),
    schema=my_schema,
    cache=False,
)

for i in range(len(ds)):
    ds["image", i] = np.ones((28, 28, 4), dtype="int32")
    ds["label", i] = f"hello {i}"
    ds["confidence/confidence", i] = 0.2

@hub.transform(schema=my_schema, scheduler="ray")
def my_transform(sample, multiplier: int = 2):
    return {
        "image": sample["image"].compute() * multiplier,
        "label": sample["label"].compute(),
        "confidence": {
            "confidence": sample["confidence/confidence"].compute() * multiplier
        },
    }

out_ds = my_transform(ds, multiplier=2)
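As with the other schedulers, the resulting dataset can then be materialized with store; a minimal sketch (the output path below is a placeholder):

out_ds = out_ds.store("./data/ray/ray_pipeline_out")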
API¶
hub.compute.transform(schema, scheduler='single', workers=1)¶
Transform is a decorator for a function. The decorated function should output a dictionary per sample.

Parameters
schema (Schema) – the output format of the transformed dataset
scheduler (str) – "single" for single-threaded execution, "threaded" for multiple threads, "processed" for multiple processes, "ray" for the Ray scheduler, "dask" for the Dask scheduler
workers (int) – how many workers will be started for the process
class hub.compute.transform.Transform(func, schema, ds, scheduler: str = 'single', workers: int = 1, **kwargs)¶

__getitem__(slice_)¶
Get an item to be computed without iterating over the whole dataset. Creates a dataset view, then a temporary dataset to apply the transform.

Parameters
slice_ (slice) – a slice or slices to take from the dataset
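For example, slicing the pipeline object computes only the requested samples; a minimal sketch, assuming the my_transform pipeline defined in the examples above:

pipeline = my_transform(range(100))
# compute only the first five samples via a temporary dataset,
# without iterating over the whole input
subset = pipeline[0:5]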
__init__(func, schema, ds, scheduler: str = 'single', workers: int = 1, **kwargs)¶
Transform applies a user-defined function to each sample in a single-threaded manner.

Parameters
func (function) – user-defined function func(x, **kwargs)
schema (dict of dtypes) – the structure of the final dataset that will be created
ds (Iterable) – input dataset or a list that can be iterated
scheduler (str) – choice between "single", "threaded", "processed"
workers (int) – how many threads or processes to use
**kwargs – additional arguments that will be passed to func as static arguments for all samples
__weakref__¶
list of weak references to the object (if defined)
classmethod _flatten_dict(d: Dict, parent_key='', schema=None)¶
Helper function to flatten a dictionary of a recursive tensor.

Parameters
d (dict) – the dictionary to flatten
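A standalone sketch of the flattening idea (illustrative only, not the library implementation); nested dictionaries become path-style keys such as "confidence/confidence":

def flatten_dict_sketch(d, parent_key=""):
    # illustrative: nested dicts are flattened into "parent/child" keys
    out = {}
    for key, value in d.items():
        full_key = f"{parent_key}/{key}" if parent_key else key
        if isinstance(value, dict):
            out.update(flatten_dict_sketch(value, full_key))
        else:
            out[full_key] = value
    return out

assert flatten_dict_sketch({"confidence": {"confidence": 0.2}}) == {"confidence/confidence": 0.2}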
_pbar(show: bool = True)¶
Returns a progress bar; if show is False, it does nothing.
_split_list_to_dicts(xs)¶
Helper function that transforms a list of dicts into a dict of lists.

Parameters
xs (list of dicts) –

Returns
xs_new

Return type
dict of lists
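Conceptually, the transposition looks like this (a sketch, not the library code):

samples = [{"label": "a", "confidence": 0.1}, {"label": "b", "confidence": 0.2}]
# list of per-sample dicts -> dict of per-key lists
transposed = {key: [s[key] for s in samples] for key in samples[0]}
assert transposed == {"label": ["a", "b"], "confidence": [0.1, 0.2]}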
classmethod _unwrap(results)¶
If any element is a list, unwraps it into its elements.
call_func(fn_index, item, as_list=False)¶
Calls all the functions one after the other.

Parameters
fn_index (int) – the index starting from which the functions need to be called
item – the item on which the functions need to be applied
as_list (bool, optional) – if True, treats the item as a list

Returns
The final output obtained after all transforms

Return type
result
create_dataset(url: str, length: int = None, token: dict = None, public: bool = True)¶
Helper function to create a dataset.
classmethod dtype_from_path(path, schema)¶
Helper function to get the dtype from the path.
store(url: str, token: dict = None, length: int = None, ds: Iterable = None, progressbar: bool = True, sample_per_shard: int = None, public: bool = True)¶
Applies the transformation to each element in a batchified manner.

Parameters
url (str) – path where the data is going to be stored
token (str or dict, optional) – if url refers to a place where authorization is required, token is the parameter to pass the credentials; it can be a filepath or a dict
length (int) – in case shape is None, the user can provide the length
ds (Iterable) –
progressbar (bool) – show a progress bar
sample_per_shard (int) – how to split the iterator so as not to overfill RAM
public (bool, optional) – only applicable if using hub storage, ignored otherwise; setting this to False allows only the user who created it to access the dataset, and the dataset won't be visible in the visualizer to the public

Returns
ds – uploaded dataset

Return type
hub.Dataset
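For example, a decorated pipeline is materialized with store; a minimal sketch, assuming the my_transform pipeline from the examples above (the path and shard size are placeholders):

ds = my_transform(range(100))
# sample_per_shard bounds how many samples are computed and held in memory before each write
out_ds = ds.store("./data/pipeline_out", sample_per_shard=10, progressbar=True)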
store_shard(ds_in: Iterable, ds_out: hub.api.dataset.Dataset, offset: int, token=None)¶
Takes a shard of the iterable ds_in, computes it, and stores the result in ds_out.
upload(results, ds: hub.api.dataset.Dataset, token: dict, progressbar: bool = True)¶
Batchified upload of results. Each tensor is batchified based on its chunk size and uploaded. Dynamic tensors are still uploaded element by element; for them, dynamic mode is disabled during the upload and re-enabled afterwards.

Parameters
ds (hub.Dataset) – dataset object that should be written to
results – output of the transform function
progressbar (bool) – show a progress bar

Returns
ds – uploaded dataset

Return type
hub.Dataset
class hub.compute.ray.RayTransform(func, schema, ds, scheduler='ray', workers=1, **kwargs)¶

__init__(func, schema, ds, scheduler='ray', workers=1, **kwargs)¶
Transform applies a user-defined function to each sample in a single-threaded manner.

Parameters
func (function) – user-defined function func(x, **kwargs)
schema (dict of dtypes) – the structure of the final dataset that will be created
ds (Iterable) – input dataset or a list that can be iterated
scheduler (str) – choice between "single", "threaded", "processed"
workers (int) – how many threads or processes to use
**kwargs – additional arguments that will be passed to func as static arguments for all samples
set_dynamic_shapes(results, ds)¶
Sets shapes for dynamic tensors after the dataset is uploaded.

Parameters
results (Tuple) – results from uploading each chunk, which include a (key, slice, shape) tuple
ds – dataset to set the shapes on
store(url: str, token: dict = None, length: int = None, ds: Iterable = None, progressbar: bool = True, public: bool = True)¶
Applies the transformation to each element in a batchified manner.

Parameters
url (str) – path where the data is going to be stored
token (str or dict, optional) – if url refers to a place where authorization is required, token is the parameter to pass the credentials; it can be a filepath or a dict
length (int) – in case shape is None, the user can provide the length
ds (Iterable) –
progressbar (bool) – show a progress bar
public (bool, optional) – only applicable if using hub storage, ignored otherwise; setting this to False allows only the user who created it to access the dataset, and the dataset won't be visible in the visualizer to the public

Returns
ds – uploaded dataset

Return type
hub.Dataset
upload(results, url: str, token: dict, progressbar: bool = True, public: bool = True)¶
Batchified upload of results. Each tensor is batchified based on its chunk size and uploaded. Dynamic tensors are still uploaded element by element.

Parameters
dataset (hub.Dataset) – dataset object that should be written to
results – output of the transform function
progressbar (bool) – show a progress bar
public (bool, optional) – only applicable if using hub storage, ignored otherwise; setting this to False allows only the user who created it to access the dataset, and the dataset won't be visible in the visualizer to the public

Returns
ds – uploaded dataset

Return type
hub.Dataset