Advanced Tutorial 2: Pipeline
Overview
In Beginner Tutorial 4, we learned how to build a data pipeline that handles data loading and preprocessing tasks efficiently. Now that you understand some basic Pipeline operations, this tutorial demonstrates some advanced concepts and how to leverage them to create efficient Pipelines.
In this tutorial, we will discuss the following topics:
- Iterating Through Pipeline
  - Basic Concept
  - Example
- Advanced Batching Control
  - Dropping Last Batch
  - Padding Batch Data
  - Numpy Ops on Batches of Data
  - Filtering Data
- Benchmark Pipeline Speed
Iterating Through Pipeline
In many deep learning tasks, some preprocessing parameters are precomputed by looping through the dataset. For example, on the ImageNet dataset, people usually normalize images with a precomputed global pixel average for each channel.
Basic Concept
In this section, we will see how to iterate through a Pipeline in FastEstimator. First, we will create a sample NumpyDataset from a data dictionary and load it into a Pipeline:
```python
import numpy as np

import fastestimator as fe
from fastestimator.dataset.numpy_dataset import NumpyDataset

# sample numpy arrays to later create datasets from them
x_train, y_train = (np.random.sample((10, 2)), np.random.sample((10, 1)))
train_data = {"x": x_train, "y": y_train}

# create NumpyDataset from the sample data
dataset_fe = NumpyDataset(train_data)
pipeline_fe = fe.Pipeline(train_data=dataset_fe, batch_size=3)
```
Let's get the loader object for the Pipeline, then iterate through the loader with a for loop:
```python
with pipeline_fe(mode="train") as loader_fe:
    for batch in loader_fe:
        print(batch)
```
{'x': tensor([[0.7792, 0.4546], [0.6361, 0.7613], [0.5676, 0.9048]], dtype=torch.float64), 'y': tensor([[0.9328], [0.9089], [0.1312]], dtype=torch.float64)}
{'x': tensor([[0.0175, 0.2374], [0.3992, 0.8328], [0.7125, 0.0620]], dtype=torch.float64), 'y': tensor([[0.4130], [0.9074], [0.3998]], dtype=torch.float64)}
{'x': tensor([[0.0584, 0.7026], [0.9152, 0.5944], [0.5536, 0.4152]], dtype=torch.float64), 'y': tensor([[0.0863], [0.5301], [0.6771]], dtype=torch.float64)}
{'x': tensor([[0.4141, 0.0859]], dtype=torch.float64), 'y': tensor([[0.5408]], dtype=torch.float64)}
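The loader output above shows four batches: three full batches of three samples and a final batch holding the single leftover sample. As a rough mental model only (plain NumPy, not how FastEstimator implements its loader), batching a dictionary of arrays without shuffling can be sketched with a hypothetical `iterate_batches` helper:

```python
import numpy as np


def iterate_batches(data, batch_size):
    """Yield dictionaries of consecutive slices, like a simple unshuffled loader.

    `iterate_batches` is a hypothetical helper for illustration only.
    """
    num_samples = len(next(iter(data.values())))
    for start in range(0, num_samples, batch_size):
        # Slice every key with the same index range so "x" and "y" stay aligned
        yield {key: value[start:start + batch_size] for key, value in data.items()}


x_train, y_train = np.random.sample((10, 2)), np.random.sample((10, 1))
batches = list(iterate_batches({"x": x_train, "y": y_train}, batch_size=3))
print([len(batch["x"]) for batch in batches])  # [3, 3, 3, 1]
```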
Example
Let's say we have the ciFAIR-10 dataset and we want to find the global average pixel value for each of its three channels:
```python
from fastestimator.dataset.data import cifair10

# load the ciFAIR-10 training split (the second returned split is not needed here)
cifair_train, _ = cifair10.load_data()
```
We will use a batch_size of 64 and load the data into a Pipeline:
```python
pipeline_cifair = fe.Pipeline(train_data=cifair_train, batch_size=64)
```
Now we will iterate through the batches and compute the mean pixel value of each of the three channels over the dataset:
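```python
with pipeline_cifair(mode="train", shuffle=False) as loader_fe:
    mean_arr = np.zeros((3))
    for i, batch in enumerate(loader_fe):
        # batch["x"] is (batch, height, width, channels); average over all axes except channels
        mean_arr = mean_arr + np.mean(batch["x"].numpy(), axis=(0, 1, 2))
    # average the per-batch means over the number of batches
    mean_arr = mean_arr / (i + 1)
print("Mean pixel value over the channels are: ", mean_arr)
```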
Mean pixel value over the channels are: [125.32287898 122.96682199 113.8856495 ]
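Averaging the per-batch means treats every batch equally. ciFAIR-10's 50,000 training images do not divide evenly by 64, so the final batch holds only 16 images and gets more weight than its share. The printed values are therefore a close approximation rather than the exact dataset mean. A minimal sketch of an exact alternative keeps a running per-channel sum and pixel count instead. It uses random NumPy arrays in place of the loader so that it runs on its own; `channel_mean` is a hypothetical helper, and batches are assumed to be channel-last `(batch, height, width, channels)` arrays like `batch["x"].numpy()` above.

```python
import numpy as np


def channel_mean(batches):
    """Exact per-channel mean over (N, H, W, C) batches of possibly unequal size."""
    channel_sum = None
    pixel_count = 0
    for x in batches:
        x = np.asarray(x, dtype=np.float64)
        # Sum over batch, height and width; keep the channel axis
        batch_sum = x.sum(axis=(0, 1, 2))
        channel_sum = batch_sum if channel_sum is None else channel_sum + batch_sum
        # Number of pixels this batch contributes to each channel
        pixel_count += x.shape[0] * x.shape[1] * x.shape[2]
    return channel_sum / pixel_count


rng = np.random.default_rng(0)
images = rng.integers(0, 256, size=(100, 8, 8, 3))
# Batches of 64 and 36 images, mimicking an incomplete last batch
batches = [images[:64], images[64:]]

exact = channel_mean(batches)
print(np.allclose(exact, images.mean(axis=(0, 1, 2))))  # True
```

Weighting by pixel count makes the result independent of how the data happens to be split into batches.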
Advanced Batching Control
Sometimes you may need finer control over how the pipeline batches data, or you may want to run pipeline ops on an entire batch of data at once rather than on individual samples. Both use cases are enabled by the Batch Op. We'll start by looking at how to customize batching behavior with it:
Dropping Last Batch
If the total number of dataset elements is not divisible by the batch_size, then by default the last batch contains fewer samples than the others. To drop that incomplete last batch instead, set drop_last to True:
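```python
from fastestimator.op.numpyop import Batch

# the batch size is given to the Batch op here instead of to the Pipeline;
# with 10 samples and batch_size=3, the incomplete fourth batch (1 sample) is dropped
pipeline_fe = fe.Pipeline(train_data=dataset_fe, ops=[Batch(batch_size=3, drop_last=True)])
```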
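The effect of drop_last on the number of batches per epoch comes down to floor versus ceiling division. A small arithmetic sketch (the `num_batches` function is hypothetical, not part of FastEstimator):

```python
def num_batches(num_samples, batch_size, drop_last=False):
    """Number of batches produced by one pass over the data."""
    if drop_last:
        # Floor division: the incomplete final batch is discarded
        return num_samples // batch_size
    # Ceiling division in integer arithmetic: the incomplete final batch is kept
    return (num_samples + batch_size - 1) // batch_size


print(num_batches(10, 3))                      # 4 (sizes 3, 3, 3, 1)
print(num_batches(10, 3, drop_last=True))      # 3 (sizes 3, 3, 3)
print(num_batches(50000, 64))                  # 782
print(num_batches(50000, 64, drop_last=True))  # 781
```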
Since Batch is an op, you can schedule its behavior to change over different epochs (see Advanced Tutorial 5), as well as for specific modes or datasets (see Advanced Tutorial 13).
Padding Batch Data
There may be scenarios where the input tensors within a batch have different dimensions. In Natural Language Processing, for example, input sentences have different lengths. In that case, the data must be padded to the maximum length within the batch.
To illustrate this in code, we will create a NumPy array whose elements are arrays of different lengths and load it into a Pipeline.
```python
# define numpy arrays with different shapes
elem1 = np.array([4, 5])
elem2 = np.array([1, 2, 6])
elem3 = np.array([3])

# create train dataset; dtype=object is required because the elements have different lengths
x_train = np.array([elem1, elem2, elem3], dtype=object)
train_data = {"x": x_train}
dataset_fe = NumpyDataset(train_data)
```
We then set pad_value to the value that should be appended to the end of each shorter array. pad_value can be either an int or a float:
```python
pipeline_fe = fe.Pipeline(train_data=dataset_fe, ops=[Batch(batch_size=3, pad_value=0)])
```
Now let's print the batch data after padding:
```python
with pipeline_fe(mode="train", shuffle=False) as loader_fe:
    for elem in loader_fe:
        print(elem)
```
{'x': tensor([[4, 5, 0], [1, 2, 6], [3, 0, 0]])}
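The padding above amounts to finding the longest array in the batch and filling the rest of every shorter row with pad_value. A NumPy sketch that handles one-dimensional samples only (`pad_batch` is a hypothetical helper written for illustration, not the Batch op's code):

```python
import numpy as np


def pad_batch(samples, pad_value=0):
    """Stack 1-D arrays of different lengths into a (batch, max_len) array."""
    max_len = max(len(s) for s in samples)
    # Start from an array filled entirely with the padding value
    dtype = np.result_type(*samples, pad_value)
    batch = np.full((len(samples), max_len), pad_value, dtype=dtype)
    for row, sample in enumerate(samples):
        # Copy each sample into the front of its row; the tail stays padded
        batch[row, :len(sample)] = sample
    return batch


padded = pad_batch([np.array([4, 5]), np.array([1, 2, 6]), np.array([3])], pad_value=0)
print(padded)
# [[4 5 0]
#  [1 2 6]
#  [3 0 0]]
```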
Numpy Ops on Batches of Data
Normally, Pipeline ops run on individual data instances before they are combined into a batch. There may, however, be cases where you need to run an op on the entire batch of data at once. You could use a TensorOp in the Network to accomplish this, but it is also possible in the Pipeline: place your NumpyOp after the Batch Op in the op list. This is generally less efficient than pre-processing each instance individually, so we recommend using this feature only when you are certain that you need it.
Batch-level execution uses the forward_batch method of NumpyOps. Its default implementation splits the batch apart, runs the forward method on each individual instance, and then recombines the batch. A handful of Ops override this default to take advantage of the full batch information. If you want to implement a custom op that leverages all of the available batch information, take a look at the NumpyOp base class implementation for more details.
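The default forward_batch behavior described above (split the batch, run forward on each instance, recombine) can be modeled in a few lines of NumPy. This is a simplified sketch, not the NumpyOp source: `default_forward_batch` and the per-instance `forward` function are hypothetical stand-ins, and the sketch assumes a single stacked array rather than the multiple inputs a real op may receive.

```python
import numpy as np


def default_forward_batch(batch, forward):
    """Apply a per-instance `forward` to every sample, then restack the results."""
    # Split the batch along its first axis and process each instance on its own
    instances = [forward(instance) for instance in batch]
    # Recombine the processed instances into a single batch
    return np.stack(instances)


def forward(instance):
    # Hypothetical per-instance op: scale each sample so its maximum is 1
    return instance / instance.max()


batch = np.array([[1.0, 2.0, 4.0], [3.0, 6.0, 12.0]])
result = default_forward_batch(batch, forward)
print(result)
```

Because each call to `forward` sees only one instance, an op relying on this default cannot use statistics of the whole batch; that is what overriding forward_batch makes possible.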
For now, let's consider a simple example using a LambdaOp that subtracts the batch-global mean from each sample in the batch:
```python
from fastestimator.backend import reduce_mean
from fastestimator.op.numpyop import LambdaOp

# create train dataset
train_data = {"x": np.array([[1.0, 2.0, 3.0], [4, 5, 6], [7, 8, 9]])}
dataset_fe = NumpyDataset(train_data)

# set up the pipeline: the LambdaOp comes after Batch, so it sees the whole batch
pipeline_fe = fe.Pipeline(train_data=dataset_fe,
                          ops=[Batch(batch_size=3),
                               LambdaOp(inputs="x", outputs="x", fn=lambda x: x - reduce_mean(x))])

# check the results
pipeline_fe.get_results()
```
{'x': tensor([[-4., -3., -2.], [-1., 0., 1.], [ 2., 3., 4.]], dtype=torch.float64)}
As you can see, the batch mean (5) was successfully subtracted from each sample.
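To see why the op has to come after Batch here, compare the two placements in plain NumPy. Subtracting the batch-global mean removes 5 from every entry, since (1 + 2 + ... + 9) / 9 = 45 / 9 = 5. Running the same lambda on each sample before batching would instead subtract each row's own mean (2, 5 and 8). This sketch mirrors the arithmetic only; it does not call FastEstimator:

```python
import numpy as np

x = np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]])

# LambdaOp placed after Batch: one mean (5.0) for the whole batch
batch_level = x - x.mean()
# LambdaOp placed before Batch: each sample is centered on its own mean
per_sample = np.stack([row - row.mean() for row in x])

# batch_level == [[-4, -3, -2], [-1, 0, 1], [2, 3, 4]]
# per_sample  == [[-1, 0, 1], [-1, 0, 1], [-1, 0, 1]]
print(batch_level)
print(per_sample)
```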
Filtering Data
Suppose that you want more control over the composition of a particular batch of data. For example, you might have some bad data that you want to exclude, or difficult samples that you want to save for later in training. While it would be more computationally efficient to modify your dataset to exclude undesirable samples, you can also apply a filter inside the Pipeline using the RemoveIf Op. It can be applied either before or after the Batch Op, depending on your requirements. Let's take a look at an example:
```python
from fastestimator.backend import reduce_max, reduce_mean
from fastestimator.dataset import GeneratorDataset
from fastestimator.op.numpyop import Batch, RemoveIf

# Let's start with a dataset that generates random 5x5x3 'images'
# (iter(int, 1) never stops because int() always returns 0, so the generator is endless)
image_generator = ({'x': np.random.rand(5, 5, 3)} for _ in iter(int, 1))
train_data = GeneratorDataset(samples_per_epoch=5, generator=image_generator)

# Now let's remove individual images if they don't have at least 1 value greater than 0.9
# Let's also remove batches of images if the mean of the batch is less than 0.5
pipeline_fe = fe.Pipeline(train_data=train_data,
                          ops=[RemoveIf(inputs='x', fn=lambda x: reduce_max(x) <= 0.9),
                               Batch(batch_size=4),
                               RemoveIf(inputs='x', fn=lambda x: reduce_mean(x) < 0.5)])

# Check the results
batches = pipeline_fe.get_results(num_steps=2)
for batch in batches:
    print(f"batch mean: {reduce_mean(batch['x'])}")
    for sample in batch['x']:
        print(f"sample max: {reduce_max(sample)}")
    print('---')
```
batch mean: 0.5012458402998198
sample max: 0.959835264378248
sample max: 0.9998099671968909
sample max: 0.9553317550086283
sample max: 0.9985566586444284
---
batch mean: 0.5615718094782679
sample max: 0.9934757632773238
---
Note that since the dataset specified that it contains 5 samples (samples_per_epoch=5), the output still contains 5 samples after filtering (4 in the first batch and 1 in the second). The RemoveIf op filters with replacement by default, meaning that discarded samples are replaced with other samples from the dataset. If you wish to discard samples without replacement (for example, in eval mode), you can set replacement=False. Even when replacement is True, the system draws all of the available data once before repeating samples. See the RemoveIf docs for more detailed information.
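The difference between the two modes can be illustrated with a simplified pure-Python model. This is a sketch of the behavior described above under explicit assumptions, not RemoveIf's actual implementation: `draw_filtered` is hypothetical, each pass shuffles the full dataset, and with replacement it keeps making passes until the requested number of samples survive the filter.

```python
import random


def draw_filtered(samples, remove_if, num_needed, replacement=True, seed=0):
    """Collect up to `num_needed` samples that survive `remove_if`.

    Without replacement, a single pass is made and rejected samples are simply lost.
    With replacement, every sample is drawn once before any sample repeats, and
    further passes fill the gaps left by rejected samples.
    """
    rng = random.Random(seed)
    kept = []
    while len(kept) < num_needed:
        epoch = list(samples)
        rng.shuffle(epoch)  # one full pass over the data in random order
        survivors = 0
        for sample in epoch:
            if remove_if(sample):
                continue  # rejected: nothing is kept for this draw
            survivors += 1
            kept.append(sample)
            if len(kept) == num_needed:
                break
        if not replacement:
            break  # single pass only; rejected samples are not replaced
        if survivors == 0:
            raise ValueError("every sample was rejected; the request cannot be filled")
    return kept


def is_odd(n):
    return n % 2 == 1


data = list(range(10))
with_replacement = draw_filtered(data, is_odd, num_needed=7)
without_replacement = draw_filtered(data, is_odd, num_needed=7, replacement=False)
print(len(with_replacement), len(without_replacement))  # 7 5
```

In this model, the first five samples kept with replacement are exactly the five even numbers, each seen once, before any of them repeats.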
Benchmark Pipeline Speed
The data pipeline is often the bottleneck in deep learning training, which leaves the GPU underutilized. FastEstimator provides a way to measure the speed of a Pipeline to help diagnose such problems: simply call Pipeline.benchmark.
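Conceptually, a pipeline speed benchmark is a throughput measurement: how many steps (batches) the loader yields per second of wall-clock time. A rough pure-Python sketch of that kind of measurement follows. `measure_steps_per_sec` and `slow_loader` are hypothetical, and the sketch says nothing about how FastEstimator times its loader internally:

```python
import time


def measure_steps_per_sec(loader, log_interval=100):
    """Return the throughput (steps/second) of each window of `log_interval` steps."""
    rates = []
    window_start = time.perf_counter()
    for step, _ in enumerate(loader, start=1):
        if step % log_interval == 0:
            now = time.perf_counter()
            rates.append(log_interval / (now - window_start))
            print(f"Step: {step}, Steps/sec: {rates[-1]:.1f}")
            window_start = now
    return rates


def slow_loader(num_steps, delay=0.001):
    """Stand-in for a data loader: producing each 'batch' takes at least `delay` seconds."""
    for step in range(num_steps):
        time.sleep(delay)
        yield step


rates = measure_steps_per_sec(slow_loader(300), log_interval=100)
```

Reporting one rate per window rather than a single average makes warm-up effects and fluctuations visible.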
For illustration, we will create a Pipeline for the ciFAIR-10 dataset with a list of Numpy operators that apply Minmax, Rotate the input images, and finally expand their dimensions:
```python
from fastestimator.op.numpyop.multivariate import Rotate
from fastestimator.op.numpyop.univariate import ExpandDims, Minmax

pipeline = fe.Pipeline(train_data=cifair_train,
                       ops=[Minmax(inputs="x", outputs="x_out"),
                            Rotate(image_in="x_out", image_out="x_out", limit=180),
                            # ExpandDims only runs in train mode
                            ExpandDims(inputs="x_out", outputs="x_out", mode="train")],
                       batch_size=64)
```
Let's benchmark the pre-processing speed for this pipeline in training mode:
```python
pipeline.benchmark(mode="train")
```
FastEstimator-Benchmark: Dataset: , Step: 100, Epoch: 1, Steps/sec: 189.0564267416377
FastEstimator-Benchmark: Dataset: , Step: 200, Epoch: 1, Steps/sec: 254.93623924284935
FastEstimator-Benchmark: Dataset: , Step: 300, Epoch: 1, Steps/sec: 243.89103090633392
FastEstimator-Benchmark: Dataset: , Step: 400, Epoch: 1, Steps/sec: 281.2587460839808
FastEstimator-Benchmark: Dataset: , Step: 500, Epoch: 1, Steps/sec: 262.6512477872427
FastEstimator-Benchmark: Dataset: , Step: 600, Epoch: 1, Steps/sec: 260.0439313025715
FastEstimator-Benchmark: Dataset: , Step: 700, Epoch: 1, Steps/sec: 235.81544039811544
FastEstimator-Benchmark: Dataset: , Step: 800, Epoch: 1, Steps/sec: 231.6578051316986
FastEstimator-Benchmark: Dataset: , Step: 900, Epoch: 1, Steps/sec: 228.99282500888296
FastEstimator-Benchmark: Dataset: , Step: 1000, Epoch: 1, Steps/sec: 221.91052155741298
Breakdown of time taken by Pipeline Operations (mode:train epoch:1, ds_id:)
Op         : Inputs : Outputs : Time
--------------------------------------
Minmax     : x      : x_out   : 39.91%
Rotate     : x_out  : x_out   : 49.76%
ExpandDims : x_out  : x_out   : 10.33%
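The percentages in the breakdown table sum to 100 (39.91 + 49.76 + 10.33 = 100.00), so they read as each op's share of the time spent in pipeline ops. Here Rotate is the most expensive step. A minimal sketch of producing that kind of breakdown, assuming a chain of per-sample functions applied in order. `op_time_breakdown` and the three stand-in ops are hypothetical and only loosely imitate Minmax, Rotate and ExpandDims:

```python
import time

import numpy as np


def op_time_breakdown(ops, samples):
    """Run each (name, fn) op in order on every sample and return time shares in %."""
    elapsed = {name: 0.0 for name, _ in ops}
    for sample in samples:
        for name, fn in ops:
            start = time.perf_counter()
            sample = fn(sample)
            elapsed[name] += time.perf_counter() - start
    total = sum(elapsed.values())
    return {name: 100.0 * t / total for name, t in elapsed.items()}


ops = [
    # Stand-in for Minmax: rescale pixel values into [0, 1]
    ("Minmax", lambda x: (x - x.min()) / (x.max() - x.min() + 1e-7)),
    # Stand-in for Rotate: a fixed 90-degree rotation instead of a random angle
    ("Rotate", lambda x: np.rot90(x)),
    # Stand-in for ExpandDims: add a trailing axis
    ("ExpandDims", lambda x: np.expand_dims(x, axis=-1)),
]
rng = np.random.default_rng(0)
samples = [rng.integers(0, 256, size=(32, 32, 3)).astype(np.float32) for _ in range(200)]
shares = op_time_breakdown(ops, samples)
for name, share in shares.items():
    print(f"{name:<10} : {share:6.2f}%")
```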