Using Ops with Custom Data Loaders¶
After using FE for a while you will likely become attached to the operator paradigm, but you might conceivably encounter a use case which is not well supported by the default FE Pipeline. As you may already be aware, one way to avoid any limitations imposed by the FE API is to pass your own PyTorch DataLoader (or TensorFlow dataset) directly into the FE Pipeline (instead of passing a PyTorch/FE Dataset). Normally this would prevent you from using FE Ops, but there is a way around this:
Op Dataset¶
FE contains an object called an OpDataset, which is what we use internally to chain Ops onto datasets within our data loader. You can construct one yourself as well for use within your own data loader. Let's see an example.
from fastestimator.dataset.data import mnist
from fastestimator.dataset.op_dataset import OpDataset
from fastestimator.op.numpyop.univariate import ExpandDims, Minmax
# Let's start by getting a simple dataset
train_data, eval_data = mnist.load_data()
# Now we can manually put this dataset into an OpDataset, along with our Op list
op_ds = OpDataset(dataset=train_data,
                  mode="train",
                  ops=[ExpandDims(inputs="x", outputs="x"),
                       Minmax(inputs="x", outputs="x")])
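Since an OpDataset is still a map-style PyTorch dataset, you can index into it directly to confirm that the Ops are being applied. The check below is just a quick sketch, and assumes the standard dict-style samples produced by the MNIST dataset above, with the image stored under "x":
# Pull a single sample through the Op chain and inspect it
sample = op_ds[0]
print(sample["x"].shape)   # (28, 28, 1) after ExpandDims
print(sample["x"].min(), sample["x"].max())   # values rescaled into [0, 1] by Minmax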
Note that while this will work for most use cases, the Batch Op and RemoveIf Op will not work as expected if you try to put them into your own custom OpDataset.
Custom Data Loaders¶
Now let's construct a custom PyTorch data loader using our OpDataset. Suppose, for example, that you want your batch size to change every step following the Fibonacci sequence. Even though the FE API lacks support for this critically important feature, you can still implement it yourself using a custom PyTorch batch sampler:
import math
import random
from torch.utils.data import Sampler
# A batch sampler that will increase the batch size based on the Fibonacci Sequence for a specified number of batches
class FibonacciSampler(Sampler):
    def __init__(self, ds_length: int, n_batches: int):
        self.ds_ln = ds_length
        self.n_batches = n_batches
        # Closed-form (Binet's) formula for the nth Fibonacci number
        self.fib_fn = lambda n: round((math.pow((1+math.sqrt(5))/2, n) - math.pow((1-math.sqrt(5))/2, n))/math.sqrt(5))

    def __len__(self):
        # A batch sampler's length is the number of batches it will yield
        return self.n_batches

    def __iter__(self):
        # Each batch is a list of random dataset indices whose length follows the Fibonacci sequence
        indices = [random.sample(range(self.ds_ln), self.fib_fn(i)) for i in range(1, self.n_batches+1)]
        return iter(indices)
my_sampler = FibonacciSampler(ds_length=len(op_ds), n_batches=10)
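Before wiring this sampler into a data loader, you can iterate over it directly to verify that the index lists it yields grow according to the Fibonacci sequence (each list will become one batch):
# Each element yielded by the sampler is a list of dataset indices for one batch
print([len(batch_indices) for batch_indices in my_sampler])  # [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]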
# Now let's build a custom data loader using this sampler:
from torch.utils.data import DataLoader
import numpy as np
loader = DataLoader(dataset=op_ds,
                    batch_sampler=my_sampler,
                    worker_init_fn=lambda _: np.random.seed(random.randint(0, 2**32 - 1)),
                    num_workers=4)
# The worker_init_fn is needed to ensure that any randomness in your pipeline behaves properly across the different worker processes
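If you prefer something deterministic over re-seeding with random.randint, one common alternative (shown here only as a sketch, not something FE requires) is to derive each worker's numpy seed from PyTorch's per-worker seed inside a named worker_init_fn:
import numpy as np
import torch

def seed_numpy_per_worker(worker_id):
    # torch.initial_seed() is already distinct for each DataLoader worker; reuse it for numpy
    np.random.seed(torch.initial_seed() % 2**32)

# loader = DataLoader(dataset=op_ds, batch_sampler=my_sampler,
#                     worker_init_fn=seed_numpy_per_worker, num_workers=4)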
Putting Things Together¶
Now that we have a custom data loader along with our op dataset, let's use them with an FE pipeline and see what happens:
from fastestimator import Pipeline
pipeline = Pipeline(train_data=loader)
data = pipeline.get_results(num_steps=10)
for idx, batch in enumerate(data):
    print(f"batch {idx}: {batch['x'].shape}")
batch 0: torch.Size([1, 28, 28, 1])
batch 1: torch.Size([1, 28, 28, 1])
batch 2: torch.Size([2, 28, 28, 1])
batch 3: torch.Size([3, 28, 28, 1])
batch 4: torch.Size([5, 28, 28, 1])
batch 5: torch.Size([8, 28, 28, 1])
batch 6: torch.Size([13, 28, 28, 1])
batch 7: torch.Size([21, 28, 28, 1])
batch 8: torch.Size([34, 28, 28, 1])
batch 9: torch.Size([55, 28, 28, 1])
As expected, our batch size now increases every step following the Fibonacci sequence, and we have successfully integrated FE Ops into our customized pipeline. Huzzah!