network

BaseNetwork

A base class for Network objects.

Networks are used to define the computation graph surrounding one or more models during training.

Parameters:

ops (Iterable[Union[TensorOp, Scheduler[TensorOp]]], required):
    The operators to be executed throughout training / testing / inference. These are likely to contain one or more model ops, as well as loss ops and update ops.
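
For orientation, a minimal sketch of such a graph, assuming the fe.build, ModelOp, CrossEntropy, and UpdateOp APIs documented elsewhere in this module:

```python
import fastestimator as fe
from fastestimator.op.tensorop.loss import CrossEntropy
from fastestimator.op.tensorop.model import ModelOp, UpdateOp

model = fe.build(model_fn=fe.architecture.tensorflow.LeNet, optimizer_fn="adam")
network = fe.Network(ops=[
    ModelOp(model=model, inputs="x", outputs="y_pred"),  # model op: forward pass
    CrossEntropy(inputs=("y_pred", "y"), outputs="ce"),  # loss op
    UpdateOp(model=model, loss_name="ce")                # update op: apply gradients
])
```
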
Source code in fastestimator/fastestimator/network.py
class BaseNetwork:
    """A base class for Network objects.

    Networks are used to define the computation graph surrounding one or more models during training.

    Args:
        ops: The operators to be executed throughout training / testing / inference. These are likely to contain one or
            more model ops, as well as loss ops and update ops.
    """
    def __init__(self, ops: Iterable[Union[TensorOp, Scheduler[TensorOp]]]) -> None:
        self.ops = to_list(ops)
        self.models = to_list(_collect_models(ops))
        self._verify_inputs()
        self.effective_inputs = dict()
        self.effective_outputs = dict()
        self.epoch_ops = []
        self.epoch_models = set()
        self.epoch_state = dict()

    def _verify_inputs(self) -> None:
        """Ensure that all ops are TensorOps.

        Raises:
            AssertionError: If any of the ops are not TensorOps.
        """
        for op in get_current_items(self.ops):
            assert isinstance(op, TensorOp), "unsupported op format, must provide TensorOp in Network"

    def get_scheduled_items(self, mode: str) -> List[Any]:
        """Get a list of items considered for scheduling.

        Args:
            mode: Current execution mode.

        Returns:
            List of schedulable items in Network.
        """
        if mode == "train":
            all_items = self.ops + [model.optimizer for model in self.models]
        else:
            all_items = self.ops
        return all_items

    def load_epoch(self, mode: str, epoch: int, output_keys: Optional[Set[str]] = None, warmup: bool = False) -> None:
        """Prepare the network to run a given epoch and mode.

        This method is necessary since schedulers and op mode restrictions may result in different computation graphs
        every epoch.

        Args:
            mode: The mode to prepare to execute. One of 'train', 'eval', 'test', or 'infer'.
            epoch: The epoch to prepare to execute.
            output_keys: What keys must be moved from the GPU back to the CPU after executing a step.
            warmup: Whether to prepare to execute in warmup mode or not (end users can likely ignore this argument).
        """
        self.effective_inputs[mode] = self.get_effective_input_keys(mode, epoch)
        self.effective_outputs[mode] = self.get_all_output_keys(mode, epoch)
        if output_keys:
            self.effective_outputs[mode] = self.effective_outputs[mode].intersection(output_keys)
        self.epoch_ops = get_current_items(self.ops, mode, epoch)
        self.epoch_models = set(op.model for op in self.epoch_ops if isinstance(op, (UpdateOp, ModelOp)))
        gradient_ops = [op for op in self.epoch_ops if hasattr(op, "retain_graph")]
        for idx, gradient_op in enumerate(gradient_ops):
            gradient_op.retain_graph = idx != len(gradient_ops) - 1
        self.epoch_state = {"warmup": warmup, "mode": mode, "req_grad": len(gradient_ops) > 0}
        for model in self.epoch_models:
            if hasattr(model, "optimizer") and model.optimizer is not None:
                if isinstance(model.optimizer, Scheduler):
                    model.current_optimizer = model.optimizer.get_current_value(epoch)
                else:
                    model.current_optimizer = model.optimizer

    def unload_epoch(self) -> None:
        """Clean up the network after running an epoch.
        """
        pass

    def get_loss_keys(self) -> Set[str]:
        """Find all of the keys associated with model losses.

        Returns:
            All of the keys associated with model losses in this network.
        """
        loss_keys = set()
        for op in get_current_items(self.ops):
            if isinstance(op, UpdateOp):
                loss_keys.update(op.inputs)
        return loss_keys

    def get_effective_input_keys(self, mode: str, epoch: int) -> Set[str]:
        """Determine which keys need to be provided as input to the network during the given `epoch`.

        Args:
            mode: The execution mode to consider. One of 'train', 'eval', 'test', or 'infer'.
            epoch: The epoch number to consider for determining inputs.

        Returns:
            The necessary inputs for the network to execute the given `epoch` and `mode`.
        """
        input_keys = set()
        produced_keys = set()
        for op in get_current_items(self.ops, mode, epoch):
            input_keys.update(set(key for key in op.inputs if key not in produced_keys))
            produced_keys.update(op.outputs)
        return input_keys

    def get_all_output_keys(self, mode: str, epoch: int) -> Set[str]:
        """Get all of the keys that will be generated by the network during the given `epoch` and `mode`.

        Args:
            mode: The execution mode to consider. One of 'train', 'eval', 'test', or 'infer'.
            epoch: The epoch number to consider when searching for outputs.

        Returns:
            The keys that will be generated by the network's Ops during the `epoch` for the given `mode`.
        """
        output_keys = set()
        for op in get_current_items(self.ops, mode, epoch):
            output_keys.update(op.outputs)
        return output_keys

    @staticmethod
    def _forward_batch(batch: MutableMapping[str, Any], state: Dict[str, Any], ops: List[TensorOp]) -> None:
        """Run a forward pass through the network's Op chain given a `batch` of data.

        Args:
            batch: A batch of input data. Predictions from the network will be written back into this dictionary.
            state: A dictionary holding information about the current execution context. The TF gradient tape, for
                example, will be stored here.
            ops: Which ops to execute.
        """
        for op in ops:
            data = get_inputs_by_op(op, batch)
            data = op.forward(data, state)
            if op.outputs:
                write_outputs_by_op(op, batch, data)

    def run_step(self, batch: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:  # Batch, Prediction
        """Run a forward step through the Network on a batch of data.

        Implementations of this method within derived classes should handle bringing the prediction data back from the
        (multi-)GPU environment to the CPU. This method expects that Network.load_epoch() has already been invoked.

        Args:
            batch: The batch of data serving as input to the Network.

        Returns:
            (batch_data, prediction_data)
        """
        raise NotImplementedError

    def transform(self, data: Dict[str, Any], mode: str, epoch: int = 1) -> Dict[str, Any]:
        """Run a forward step through the Network on an element of data.

        Args:
            data: The element of data to use as input.
            mode: The mode in which to run the transform. One of 'train', 'eval', 'test', or 'infer'.
            epoch: The epoch in which to run the transform.

        Returns:
            The transformed data, with prediction keys merged into the input dictionary.
        """
        raise NotImplementedError
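
The _forward_batch helper above threads a shared batch dictionary through the op chain. A simplified sketch of that data flow (the real get_inputs_by_op / write_outputs_by_op helpers handle more input/output arrangements than this):

```python
# A simplified sketch of the _forward_batch data flow, not the FE implementation itself.
# Each op reads its inputs from the shared batch dict and writes its outputs back,
# so later ops can consume keys produced by earlier ones.
def forward_batch_sketch(batch, state, ops):
    for op in ops:
        data = [batch[key] for key in op.inputs]     # gather inputs by key
        results = op.forward(data, state)            # run the op
        if not isinstance(results, (list, tuple)):
            results = [results]
        for key, value in zip(op.outputs, results):  # write outputs back by key
            batch[key] = value
```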

get_all_output_keys

Get all of the keys that will be generated by the network during the given epoch and mode.

Parameters:

mode (str, required):
    The execution mode to consider. One of 'train', 'eval', 'test', or 'infer'.

epoch (int, required):
    The epoch number to consider when searching for outputs.

Returns:

Set[str]:
    The keys that will be generated by the network's Ops during the epoch for the given mode.

Source code in fastestimator/fastestimator/network.py
def get_all_output_keys(self, mode: str, epoch: int) -> Set[str]:
    """Get all of the keys that will be generated by the network during the given `epoch` and `mode`.

    Args:
        mode: The execution mode to consider. One of 'train', 'eval', 'test', or 'infer'.
        epoch: The epoch number to consider when searching for outputs.

    Returns:
        The keys that will be generated by the network's Ops during the `epoch` for the given `mode`.
    """
    output_keys = set()
    for op in get_current_items(self.ops, mode, epoch):
        output_keys.update(op.outputs)
    return output_keys

get_effective_input_keys

Determine which keys need to be provided as input to the network during the given epoch.

Parameters:

mode (str, required):
    The execution mode to consider. One of 'train', 'eval', 'test', or 'infer'.

epoch (int, required):
    The epoch number to consider for determining inputs.

Returns:

Set[str]:
    The necessary inputs for the network to execute the given epoch and mode.

Source code in fastestimator/fastestimator/network.py
def get_effective_input_keys(self, mode: str, epoch: int) -> Set[str]:
    """Determine which keys need to be provided as input to the network during the given `epoch`.

    Args:
        mode: The execution mode to consider. One of 'train', 'eval', 'test', or 'infer'.
        epoch: The epoch number to consider for determining inputs.

    Returns:
        The necessary inputs for the network to execute the given `epoch` and `mode`.
    """
    input_keys = set()
    produced_keys = set()
    for op in get_current_items(self.ops, mode, epoch):
        input_keys.update(set(key for key in op.inputs if key not in produced_keys))
        produced_keys.update(op.outputs)
    return input_keys
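
As a worked illustration, reusing the op graph sketched under BaseNetwork above (the key names are assumptions from that sketch): ModelOp produces "y_pred" from "x", and CrossEntropy consumes "y_pred" and "y". Since "y_pred" is produced inside the network, only "x" and "y" must come from the pipeline:

```python
network.get_effective_input_keys(mode="train", epoch=1)  # {"x", "y"}
network.get_all_output_keys(mode="train", epoch=1)       # {"y_pred", "ce"}
```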

get_loss_keys

Find all of the keys associated with model losses.

Returns:

Set[str]:
    All of the keys associated with model losses in this network.

Source code in fastestimator/fastestimator/network.py
def get_loss_keys(self) -> Set[str]:
    """Find all of the keys associated with model losses.

    Returns:
        All of the keys associated with model losses in this network.
    """
    loss_keys = set()
    for op in get_current_items(self.ops):
        if isinstance(op, UpdateOp):
            loss_keys.update(op.inputs)
    return loss_keys
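
Since loss keys are simply the inputs of any UpdateOp, the op graph sketched under BaseNetwork above (whose UpdateOp consumes the CrossEntropy output "ce") would report:

```python
network.get_loss_keys()  # {"ce"}
```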

get_scheduled_items

Get a list of items considered for scheduling.

Parameters:

mode (str, required):
    Current execution mode.

Returns:

List[Any]:
    List of schedulable items in Network.

Source code in fastestimator/fastestimator/network.py
def get_scheduled_items(self, mode: str) -> List[Any]:
    """Get a list of items considered for scheduling.

    Args:
        mode: Current execution mode.

    Returns:
        List of schedulable items in Network.
    """
    if mode == "train":
        all_items = self.ops + [model.optimizer for model in self.models]
    else:
        all_items = self.ops
    return all_items

load_epoch

Prepare the network to run a given epoch and mode.

This method is necessary since schedulers and op mode restrictions may result in different computation graphs every epoch.

Parameters:

mode (str, required):
    The mode to prepare to execute. One of 'train', 'eval', 'test', or 'infer'.

epoch (int, required):
    The epoch to prepare to execute.

output_keys (Optional[Set[str]], default None):
    What keys must be moved from the GPU back to the CPU after executing a step.

warmup (bool, default False):
    Whether to prepare to execute in warmup mode or not (end users can likely ignore this argument).
Source code in fastestimator/fastestimator/network.py
def load_epoch(self, mode: str, epoch: int, output_keys: Optional[Set[str]] = None, warmup: bool = False) -> None:
    """Prepare the network to run a given epoch and mode.

    This method is necessary since schedulers and op mode restrictions may result in different computation graphs
    every epoch.

    Args:
        mode: The mode to prepare to execute. One of 'train', 'eval', 'test', or 'infer'.
        epoch: The epoch to prepare to execute.
        output_keys: What keys must be moved from the GPU back to the CPU after executing a step.
        warmup: Whether to prepare to execute in warmup mode or not (end users can likely ignore this argument).
    """
    self.effective_inputs[mode] = self.get_effective_input_keys(mode, epoch)
    self.effective_outputs[mode] = self.get_all_output_keys(mode, epoch)
    if output_keys:
        self.effective_outputs[mode] = self.effective_outputs[mode].intersection(output_keys)
    self.epoch_ops = get_current_items(self.ops, mode, epoch)
    self.epoch_models = set(op.model for op in self.epoch_ops if isinstance(op, (UpdateOp, ModelOp)))
    gradient_ops = [op for op in self.epoch_ops if hasattr(op, "retain_graph")]
    for idx, gradient_op in enumerate(gradient_ops):
        gradient_op.retain_graph = idx != len(gradient_ops) - 1
    self.epoch_state = {"warmup": warmup, "mode": mode, "req_grad": len(gradient_ops) > 0}
    for model in self.epoch_models:
        if hasattr(model, "optimizer") and model.optimizer is not None:
            if isinstance(model.optimizer, Scheduler):
                model.current_optimizer = model.optimizer.get_current_value(epoch)
            else:
                model.current_optimizer = model.optimizer
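
The retain_graph bookkeeping above exists because autodiff frameworks free the computation graph after the first backward pass, so every gradient op except the last must retain it. A minimal standalone PyTorch illustration of the underlying requirement:

```python
import torch

x = torch.tensor(2.0, requires_grad=True)
y = x * x
y.backward(retain_graph=True)  # keep the graph alive for a second pass
y.backward()                   # would raise a RuntimeError without retain_graph above
print(x.grad)                  # tensor(8.) -- gradients from both passes accumulate
```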

run_step

Run a forward step through the Network on a batch of data.

Implementations of this method within derived classes should handle bringing the prediction data back from the (multi-)GPU environment to the CPU. This method expects that Network.load_epoch() has already been invoked.

Parameters:

batch (Dict[str, Any], required):
    The batch of data serving as input to the Network.

Returns:

Tuple[Dict[str, Any], Dict[str, Any]]:
    (batch_data, prediction_data)

Source code in fastestimator/fastestimator/network.py
def run_step(self, batch: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:  # Batch, Prediction
    """Run a forward step through the Network on a batch of data.

    Implementations of this method within derived classes should handle bringing the prediction data back from the
    (multi-)GPU environment to the CPU. This method expects that Network.load_epoch() has already been invoked.

    Args:
        batch: The batch of data serving as input to the Network.

    Returns:
        (batch_data, prediction_data)
    """
    raise NotImplementedError

transform

Run a forward step through the Network on an element of data.

Parameters:

data (Dict[str, Any], required):
    The element of data to use as input.

mode (str, required):
    The mode in which to run the transform. One of 'train', 'eval', 'test', or 'infer'.

epoch (int, default 1):
    The epoch in which to run the transform.

Returns:

Dict[str, Any]:
    The transformed data, with prediction keys merged into the input dictionary.

Source code in fastestimator/fastestimator/network.py
def transform(self, data: Dict[str, Any], mode: str, epoch: int = 1) -> Dict[str, Any]:
    """Run a forward step through the Network on an element of data.

    Args:
        data: The element of data to use as input.
        mode: The mode in which to run the transform. One of 'train', 'eval', 'test', or 'infer'.
        epoch: The epoch in which to run the transform.

    Returns:
        The transformed data, with prediction keys merged into the input dictionary.
    """
    raise NotImplementedError

unload_epoch

Clean up the network after running an epoch.

Source code in fastestimator/fastestimator/network.py
def unload_epoch(self) -> None:
    """Clean up the network after running an epoch.
    """
    pass

TFNetwork

Bases: BaseNetwork

An extension of BaseNetwork for TensorFlow models.

Source code in fastestimator/fastestimator/network.py
class TFNetwork(BaseNetwork):
    """An extension of BaseNetwork for TensorFlow models.
    """
    def run_step(self, batch: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Run a forward step through the Network on a batch of data.

        Implementations of this method within derived classes should handle bringing the prediction data back from the
        (multi-)GPU environment to the CPU. This method expects that Network.load_epoch() has already been invoked.

        Args:
            batch: The batch of data serving as input to the Network.

        Returns:
            (batch_data, prediction_data)
        """
        mode = self.epoch_state["mode"]
        batch_in = self._get_effective_batch_input(batch, mode)
        strategy = tf.distribute.get_strategy()
        if isinstance(strategy, tf.distribute.MirroredStrategy):
            if self.epoch_state["warmup"]:
                prediction = strategy.experimental_run_v2(
                    self._forward_step_eager,
                    args=(batch_in, self.epoch_state, self.epoch_ops, to_list(self.effective_outputs[mode])))
            else:
                prediction = strategy.experimental_run_v2(
                    self._forward_step_static,
                    args=(batch_in, self.epoch_state, self.epoch_ops, to_list(self.effective_outputs[mode])))
            batch = self._per_replica_to_global(batch)
            prediction = self._per_replica_to_global(prediction)
        else:
            if self.epoch_state["warmup"]:
                prediction = self._forward_step_eager(batch_in,
                                                      self.epoch_state,
                                                      self.epoch_ops,
                                                      to_list(self.effective_outputs[mode]))
            else:
                prediction = self._forward_step_static(batch_in,
                                                       self.epoch_state,
                                                       self.epoch_ops,
                                                       to_list(self.effective_outputs[mode]))
        return batch, prediction

    def _per_replica_to_global(self, data: T) -> T:
        """Combine data from "per-replica" values recursively.

        For multi-GPU training, data are distributed using `tf.distribute.Strategy.experimental_distribute_dataset`.
        This method collects data from all replicas and combines them into one.

        Args:
            data: Distributed data.

        Returns:
            Combined data from all replicas.
        """
        if isinstance(data, DistributedValues):
            if data.values[0].shape.rank == 0:
                return tf.reduce_mean(tuple(d for d in data.values if not tf.math.is_nan(d)))
            else:
                return tf.concat(data.values, axis=0)
        elif isinstance(data, dict):
            result = {}
            for key, val in data.items():
                result[key] = self._per_replica_to_global(val)
            return result
        elif isinstance(data, list):
            return [self._per_replica_to_global(val) for val in data]
        elif isinstance(data, tuple):
            return tuple([self._per_replica_to_global(val) for val in data])
        elif isinstance(data, set):
            return set([self._per_replica_to_global(val) for val in data])
        else:
            return data

    def _get_effective_batch_input(self, batch: MutableMapping[str, Any], mode: str) -> Dict[str, Any]:
        """Filter input data so that only the data required by the Network is moved onto the GPU.

        Args:
            batch: An unfiltered batch of input data.
            mode: The current execution mode. One of 'train', 'eval', 'test', or 'infer'.

        Returns:
            The filtered input data ready for use on GPU(s).
        """
        new_batch = {}
        for key in self.effective_inputs[mode]:
            if key in batch:
                new_batch[key] = batch[key]
        return new_batch

    def _forward_step_eager(self,
                            batch: Dict[str, Any],
                            state: Dict[str, Any],
                            ops: List[TensorOp],
                            effective_outputs: List[str]) -> Dict[str, Any]:
        """Run a forward step of the Network in eager (non-static graph) mode.

        Args:
            batch: The input data for the Network.
            state: A dictionary containing information about the current execution environment, including the active
                gradient tape.
            ops: A list of Ops to run during the forward step.
            effective_outputs: Which outputs should be copied from the GPU back onto the CPU for further use in Traces.

        Returns:
            The prediction dictionary resulting from a forward pass of the Network.
        """
        batch = ChainMap({}, batch)
        prediction = {}
        with tf.GradientTape(persistent=True) if state["req_grad"] else NonContext() as tape:
            state['tape'] = tape
            self._forward_batch(batch, state, ops)
        del state['tape']
        del tape
        for key in effective_outputs:
            if key in batch:
                prediction[key] = batch[key]
        return prediction

    @tf.function
    def _forward_step_static(self,
                             batch: Dict[str, Any],
                             state: Dict[str, Any],
                             ops: List[TensorOp],
                             effective_outputs: List[str]) -> Dict[str, Any]:
        """Run a forward step of the Network in static graph mode.

        Args:
            batch: The input data for the Network.
            state: A dictionary containing information about the current execution environment, including the active
                gradient tape.
            ops: A list of Ops to run during the forward step.
            effective_outputs: Which outputs should be copied from the GPU back onto the CPU for further use in Traces.

        Returns:
            The prediction dictionary resulting from a forward pass of the Network.
        """
        batch = ChainMap({}, batch)
        prediction = {}
        with tf.GradientTape(persistent=True) if state["req_grad"] else NonContext() as tape:
            state['tape'] = tape
            self._forward_batch(batch, state, ops)
        del state['tape']
        del tape
        for key in effective_outputs:
            if key in batch:
                prediction[key] = batch[key]
        return prediction

    def transform(self, data: Dict[str, Any], mode: str, epoch: int = 1) -> Dict[str, Any]:
        """Run a forward step through the Network on an element of data.

        Args:
            data: The element of data to use as input.
            mode: The mode in which to run the transform. One of 'train', 'eval', 'test', or 'infer'.
            epoch: The epoch in which to run the transform.

        Returns:
            The transformed data, with prediction keys merged into the input dictionary.
        """
        self.load_epoch(mode, epoch, warmup=True)
        data = to_tensor(data, target_type="tf")
        data, prediction = self.run_step(data)
        self.unload_epoch()
        # handle the tensorflow multi-gpu inference issue where data gets replicated on each device
        if isinstance(tf.distribute.get_strategy(), tf.distribute.MirroredStrategy):
            prediction = self._subsample_data(prediction, get_batch_size(data))
        data.update(prediction)
        return data

    def _subsample_data(self, data: T, n: int) -> T:
        """Subsample data by selecting the first n indices recursively.

        Args:
            data: The data to be subsampled.
            n: The number of samples to keep along the batch dimension.

        Returns:
            Subsampled data.
        """
        if isinstance(data, dict):
            return {key: self._subsample_data(val, n) for (key, val) in data.items()}
        elif isinstance(data, list):
            return [self._subsample_data(val, n) for val in data]
        elif isinstance(data, tuple):
            return tuple([self._subsample_data(val, n) for val in data])
        elif isinstance(data, set):
            return set([self._subsample_data(val, n) for val in data])
        elif hasattr(data, "shape") and list(data.shape) and data.shape[0] > n:
            return data[0:n]
        else:
            return data
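
The reduction rule in _per_replica_to_global can be illustrated without an actual distribution strategy: non-scalar replica outputs are concatenated along the batch axis, while scalar values (e.g. per-replica losses) are averaged after dropping NaNs from idle replicas. A sketch of the two cases:

```python
import tensorflow as tf

# non-scalar per-replica outputs are concatenated along the batch axis
replica_preds = [tf.ones([2, 10]), tf.zeros([2, 10])]
merged = tf.concat(replica_preds, axis=0)  # shape (4, 10)

# scalar per-replica losses are averaged, skipping NaN values
losses = tf.stack([0.5, float("nan"), 0.7])
valid = tf.math.logical_not(tf.math.is_nan(losses))
mean_loss = tf.reduce_mean(tf.boolean_mask(losses, valid))  # 0.6
```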

run_step

Run a forward step through the Network on a batch of data.

Implementations of this method within derived classes should handle bringing the prediction data back from the (multi-)GPU environment to the CPU. This method expects that Network.load_epoch() has already been invoked.

Parameters:

batch (Dict[str, Any], required):
    The batch of data serving as input to the Network.

Returns:

Tuple[Dict[str, Any], Dict[str, Any]]:
    (batch_data, prediction_data)

Source code in fastestimator/fastestimator/network.py
def run_step(self, batch: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Run a forward step through the Network on a batch of data.

    Implementations of this method within derived classes should handle bringing the prediction data back from the
    (multi-)GPU environment to the CPU. This method expects that Network.load_epoch() has already been invoked.

    Args:
        batch: The batch of data serving as input to the Network.

    Returns:
        (batch_data, prediction_data)
    """
    mode = self.epoch_state["mode"]
    batch_in = self._get_effective_batch_input(batch, mode)
    strategy = tf.distribute.get_strategy()
    if isinstance(strategy, tf.distribute.MirroredStrategy):
        if self.epoch_state["warmup"]:
            prediction = strategy.experimental_run_v2(
                self._forward_step_eager,
                args=(batch_in, self.epoch_state, self.epoch_ops, to_list(self.effective_outputs[mode])))
        else:
            prediction = strategy.experimental_run_v2(
                self._forward_step_static,
                args=(batch_in, self.epoch_state, self.epoch_ops, to_list(self.effective_outputs[mode])))
        batch = self._per_replica_to_global(batch)
        prediction = self._per_replica_to_global(prediction)
    else:
        if self.epoch_state["warmup"]:
            prediction = self._forward_step_eager(batch_in,
                                                  self.epoch_state,
                                                  self.epoch_ops,
                                                  to_list(self.effective_outputs[mode]))
        else:
            prediction = self._forward_step_static(batch_in,
                                                   self.epoch_state,
                                                   self.epoch_ops,
                                                   to_list(self.effective_outputs[mode]))
    return batch, prediction

transform

Run a forward step through the Network on an element of data.

Parameters:

data (Dict[str, Any], required):
    The element of data to use as input.

mode (str, required):
    The mode in which to run the transform. One of 'train', 'eval', 'test', or 'infer'.

epoch (int, default 1):
    The epoch in which to run the transform.

Returns:

Dict[str, Any]:
    The transformed data, with prediction keys merged into the input dictionary.

Source code in fastestimator/fastestimator/network.py
def transform(self, data: Dict[str, Any], mode: str, epoch: int = 1) -> Dict[str, Any]:
    """Run a forward step through the Network on an element of data.

    Args:
        data: The element of data to use as input.
        mode: The mode in which to run the transform. One of 'train', 'eval', 'test', or 'infer'.
        epoch: The epoch in which to run the transform.

    Returns:
        The transformed data, with prediction keys merged into the input dictionary.
    """
    self.load_epoch(mode, epoch, warmup=True)
    data = to_tensor(data, target_type="tf")
    data, prediction = self.run_step(data)
    self.unload_epoch()
    # handle the tensorflow multi-gpu inference issue where data gets replicated on each device
    if isinstance(tf.distribute.get_strategy(), tf.distribute.MirroredStrategy):
        prediction = self._subsample_data(prediction, get_batch_size(data))
    data.update(prediction)
    return data
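
A minimal inference sketch using transform, assuming the LeNet architecture and key names from the earlier examples:

```python
import numpy as np
import fastestimator as fe
from fastestimator.op.tensorop.model import ModelOp

model = fe.build(model_fn=fe.architecture.tensorflow.LeNet, optimizer_fn="adam")
network = fe.Network(ops=[ModelOp(model=model, inputs="x", outputs="y_pred")])

batch = {"x": np.random.rand(1, 28, 28, 1).astype("float32")}  # channels-last for TF
result = network.transform(batch, mode="infer")
print(result["y_pred"].shape)  # (1, 10): class scores from the (untrained) model
```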

TorchNetwork

Bases: BaseNetwork

An extension of BaseNetwork for PyTorch models.

Parameters:

ops (Iterable[Union[TensorOp, Scheduler[TensorOp]]], required):
    The ops defining the execution graph for this Network.
Source code in fastestimator/fastestimator/network.py
class TorchNetwork(BaseNetwork):
    """An extension of BaseNetwork for PyTorch models.

    Args:
        ops: The ops defining the execution graph for this Network.
    """
    def __init__(self, ops: Iterable[Union[TensorOp, Scheduler[TensorOp]]]) -> None:
        super().__init__(ops)
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    def load_epoch(self, mode: str, epoch: int, output_keys: Optional[Set[str]] = None, warmup: bool = False) -> None:
        """Prepare the network to run a given epoch and mode.

        This method is necessary since schedulers and op mode restrictions may result in different computation graphs
        every epoch. This also moves all of the necessary models from the CPU onto the GPU(s).

        Args:
            mode: The mode to prepare to execute. One of 'train', 'eval', 'test', or 'infer'.
            epoch: The epoch to prepare to execute.
            output_keys: What keys must be moved from the GPU back to the CPU after executing a step.
            warmup: Whether to prepare to execute in warmup mode or not (end users can likely ignore this argument).
        """
        super().load_epoch(mode, epoch, output_keys, warmup)
        if self.device.type == "cuda":
            for model in self.epoch_models:
                # move model variables to gpu
                model.to(self.device)
                # move optimizer variables to gpu
                self._move_optimizer_between_device(model.current_optimizer.state, self.device)

    def _move_optimizer_between_device(self, data: Dict[str, Any], device: str) -> None:
        """Move optimizer state between gpu and cpu recursively.

        Args:
            data: Optimizer state.
            device: The target device.
        """
        for key in data:
            if isinstance(data[key], dict):
                self._move_optimizer_between_device(data[key], device)
            else:
                try:
                    data[key] = data[key].to(device)
                except AttributeError:
                    # non-tensor optimizer state (e.g. step counters) has no .to() method
                    pass

    def unload_epoch(self) -> None:
        """Clean up the network after running an epoch.

        In this case we move all of the models from the GPU(s) back to the CPU.
        """
        if self.device.type == "cuda":
            for model in self.epoch_models:
                # move model variables to cpu
                model.to("cpu")
                # move optimizer variables to cpu
                self._move_optimizer_between_device(model.current_optimizer.state, "cpu")

    def _get_effective_batch_input(self, batch: MutableMapping[str, Any], mode: str) -> Dict[str, Any]:
        """Copy input data from the CPU onto the GPU(s).

        This method will filter inputs from the batch so that only data required by the network during execution will be
        copied to the GPU.

        Args:
            batch: The input data to be moved.
            mode: The current execution mode. One of 'train', 'eval', 'test', or 'infer'.

        Returns:
            The input data ready for use on GPU(s).
        """
        if self.device.type == "cuda":
            new_batch = {
                key: self._move_tensor_between_device(batch[key], self.device)
                for key in self.effective_inputs[mode] if key in batch
            }
        else:
            new_batch = {key: batch[key] for key in self.effective_inputs[mode] if key in batch}
        return new_batch

    def run_step(self, batch: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Run a forward step through the Network on a batch of data.

        Implementations of this method within derived classes should handle bringing the prediction data back from the
        (multi-)GPU environment to the CPU. This method expects that Network.load_epoch() has already been invoked.

        Args:
            batch: The batch of data serving as input to the Network.

        Returns:
            (batch_data, prediction_data)
        """
        mode = self.epoch_state["mode"]
        batch_in = self._get_effective_batch_input(batch, mode)
        self.epoch_state["tape"] = NonContext()
        # gpu operation
        with torch.no_grad() if not self.epoch_state["req_grad"] else NonContext():
            self._forward_batch(batch_in, self.epoch_state, self.epoch_ops)
        # copy data to cpu
        if self.device.type == "cuda":
            prediction = {
                key: self._move_tensor_between_device(self._detach_tensor(batch_in[key]), "cpu")
                for key in self.effective_outputs[mode] if key in batch_in
            }
        else:
            prediction = {
                key: self._detach_tensor(batch_in[key])
                for key in self.effective_outputs[mode] if key in batch_in
            }
        return batch, prediction

    def _move_tensor_between_device(self, data: T, device: str) -> T:
        """Move tensor between gpu and cpu recursively.

        Args:
            data: The input data to be moved.
            device: The target device.

        Returns:
            Output data.
        """
        if isinstance(data, dict):
            return {key: self._move_tensor_between_device(value, device) for (key, value) in data.items()}
        elif isinstance(data, list):
            return [self._move_tensor_between_device(val, device) for val in data]
        elif isinstance(data, tuple):
            return tuple([self._move_tensor_between_device(val, device) for val in data])
        elif isinstance(data, set):
            return set([self._move_tensor_between_device(val, device) for val in data])
        elif isinstance(data, torch.Tensor):
            return data.to(device)
        else:
            return data

    def _detach_tensor(self, data: T) -> T:
        """Detach tensor from current graph recursively.

        Args:
            data: The data to be detached.

        Returns:
            Output data.
        """
        if isinstance(data, dict):
            return {key: self._detach_tensor(value) for (key, value) in data.items()}
        elif isinstance(data, list):
            return [self._detach_tensor(val) for val in data]
        elif isinstance(data, tuple):
            return tuple([self._detach_tensor(val) for val in data])
        elif isinstance(data, set):
            return set([self._detach_tensor(val) for val in data])
        elif isinstance(data, torch.Tensor):
            return data.detach()
        else:
            return data

    def transform(self, data: Dict[str, Any], mode: str, epoch: int = 1) -> Dict[str, Any]:
        """Run a forward step through the Network on an element of data.

        Args:
            data: The element of data to use as input.
            mode: The mode in which to run the transform. One of 'train', 'eval', 'test', or 'infer'.
            epoch: The epoch in which to run the transform.

        Returns:
            The transformed data, with prediction keys merged into the input dictionary.
        """
        self.load_epoch(mode, epoch, warmup=True)
        data = to_tensor(data, "torch")
        data, prediction = self.run_step(data)
        self.unload_epoch()
        data.update(prediction)
        return data

load_epoch

Prepare the network to run a given epoch and mode.

This method is necessary since schedulers and op mode restrictions may result in different computation graphs every epoch. This also moves all of the necessary models from the CPU onto the GPU(s).

Parameters:

mode (str, required):
    The mode to prepare to execute. One of 'train', 'eval', 'test', or 'infer'.

epoch (int, required):
    The epoch to prepare to execute.

output_keys (Optional[Set[str]], default None):
    What keys must be moved from the GPU back to the CPU after executing a step.

warmup (bool, default False):
    Whether to prepare to execute in warmup mode or not (end users can likely ignore this argument).
Source code in fastestimator/fastestimator/network.py
def load_epoch(self, mode: str, epoch: int, output_keys: Optional[Set[str]] = None, warmup: bool = False) -> None:
    """Prepare the network to run a given epoch and mode.

    This method is necessary since schedulers and op mode restrictions may result in different computation graphs
    every epoch. This also moves all of the necessary models from the CPU onto the GPU(s).

    Args:
        mode: The mode to prepare to execute. One of 'train', 'eval', 'test', or 'infer'.
        epoch: The epoch to prepare to execute.
        output_keys: What keys must be moved from the GPU back to the CPU after executing a step.
        warmup: Whether to prepare to execute in warmup mode or not (end users can likely ignore this argument).
    """
    super().load_epoch(mode, epoch, output_keys, warmup)
    if self.device.type == "cuda":
        for model in self.epoch_models:
            # move model variables to gpu
            model.to(self.device)
            # move optimizer variables to gpu
            self._move_optimizer_between_device(model.current_optimizer.state, self.device)

run_step

Run a forward step through the Network on a batch of data.

Implementations of this method within derived classes should handle bringing the prediction data back from the (multi-)GPU environment to the CPU. This method expects that Network.load_epoch() has already been invoked.

Parameters:

batch (Dict[str, Any], required):
    The batch of data serving as input to the Network.

Returns:

Tuple[Dict[str, Any], Dict[str, Any]]:
    (batch_data, prediction_data)

Source code in fastestimator/fastestimator/network.py
def run_step(self, batch: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Run a forward step through the Network on a batch of data.

    Implementations of this method within derived classes should handle bringing the prediction data back from the
    (multi-)GPU environment to the CPU. This method expects that Network.load_epoch() has already been invoked.

    Args:
        batch: The batch of data serving as input to the Network.

    Returns:
        (batch_data, prediction_data)
    """
    mode = self.epoch_state["mode"]
    batch_in = self._get_effective_batch_input(batch, mode)
    self.epoch_state["tape"] = NonContext()
    # gpu operation
    with torch.no_grad() if not self.epoch_state["req_grad"] else NonContext():
        self._forward_batch(batch_in, self.epoch_state, self.epoch_ops)
    # copy data to cpu
    if self.device.type == "cuda":
        prediction = {
            key: self._move_tensor_between_device(self._detach_tensor(batch_in[key]), "cpu")
            for key in self.effective_outputs[mode] if key in batch_in
        }
    else:
        prediction = {
            key: self._detach_tensor(batch_in[key])
            for key in self.effective_outputs[mode] if key in batch_in
        }
    return batch, prediction

transform

Run a forward step through the Network on an element of data.

Parameters:

data (Dict[str, Any], required):
    The element of data to use as input.

mode (str, required):
    The mode in which to run the transform. One of 'train', 'eval', 'test', or 'infer'.

epoch (int, default 1):
    The epoch in which to run the transform.

Returns:

Dict[str, Any]:
    The transformed data, with prediction keys merged into the input dictionary.

Source code in fastestimator/fastestimator/network.py
def transform(self, data: Dict[str, Any], mode: str, epoch: int = 1) -> Dict[str, Any]:
    """Run a forward step through the Network on an element of data.

    Args:
        data: The element of data to use as input.
        mode: The mode in which to run the transform. One of 'train', 'eval', 'test', or 'infer'.
        epoch: The epoch in which to run the transform.

    Returns:
        The transformed data, with prediction keys merged into the input dictionary.
    """
    self.load_epoch(mode, epoch, warmup=True)
    data = to_tensor(data, "torch")
    data, prediction = self.run_step(data)
    self.unload_epoch()
    data.update(prediction)
    return data
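
The PyTorch variant behaves the same way, with the usual channels-first layout (again assuming the LeNet architecture and key names used in the earlier examples):

```python
import numpy as np
import fastestimator as fe
from fastestimator.op.tensorop.model import ModelOp

model = fe.build(model_fn=fe.architecture.pytorch.LeNet, optimizer_fn="adam")
network = fe.Network(ops=[ModelOp(model=model, inputs="x", outputs="y_pred")])

batch = {"x": np.random.rand(1, 1, 28, 28).astype("float32")}  # channels-first for torch
result = network.transform(batch, mode="infer")
print(result["y_pred"].shape)  # torch.Size([1, 10])
```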

unload_epoch

Clean up the network after running an epoch.

In this case we move all of the models from the GPU(s) back to the CPU.

Source code in fastestimator/fastestimator/network.py
def unload_epoch(self) -> None:
    """Clean up the network after running an epoch.

    In this case we move all of the models from the GPU(s) back to the CPU.
    """
    if self.device.type == "cuda":
        for model in self.epoch_models:
            # move model variables to cpu
            model.to("cpu")
            # move optimizer variables to cpu
            self._move_optimizer_between_device(model.current_optimizer.state, "cpu")

Network

A function to automatically instantiate the correct Network derived class based on the given ops.

Parameters:

ops (Iterable[Union[TensorOp, Scheduler[TensorOp]]], required):
    A collection of Ops defining the graph for this Network. It should contain at least one ModelOp, and all models should be either TensorFlow or PyTorch. We currently do not support mixing TensorFlow and PyTorch models within the same network.

Returns:

BaseNetwork:
    A network instance containing the given ops.

Raises:

AssertionError:
    If TensorFlow and PyTorch models are mixed, or if no models are provided.

ValueError:
    If a model is provided whose type cannot be identified as either TensorFlow or PyTorch.

Source code in fastestimator/fastestimator/network.py
def Network(ops: Iterable[Union[TensorOp, Scheduler[TensorOp]]]) -> BaseNetwork:
    """A function to automatically instantiate the correct Network derived class based on the given `ops`.

    Args:
        ops: A collection of Ops defining the graph for this Network. It should contain at least one ModelOp, and all
            models should be either TensorFlow or PyTorch. We currently do not support mixing TensorFlow and PyTorch
            models within the same network.

    Returns:
        A network instance containing the given `ops`.

    Raises:
        AssertionError: If TensorFlow and PyTorch models are mixed, or if no models are provided.
        ValueError: If a model is provided whose type cannot be identified as either TensorFlow or PyTorch.
    """
    models = _collect_models(ops)
    assert models, "cannot find model in Network ops"
    framework = set()
    for model in models:
        if isinstance(model, tf.keras.Model):
            framework.add("tf")
        elif isinstance(model, torch.nn.Module):
            framework.add("torch")
        else:
            framework.add("unknown")
    assert len(framework) == 1, "please make sure either tensorflow or torch model is used in network"

    framework = framework.pop()
    if framework == "tf":
        network = TFNetwork(ops)
    elif framework == "torch":
        network = TorchNetwork(ops)
    else:
        raise ValueError("Unknown model type")
    return network
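
A short sketch of the dispatch behavior, assuming the LeNet architecture used in the build examples below:

```python
import fastestimator as fe
from fastestimator.op.tensorop.model import ModelOp

torch_model = fe.build(model_fn=fe.architecture.pytorch.LeNet, optimizer_fn="adam")
network = fe.Network(ops=[ModelOp(model=torch_model, inputs="x", outputs="y_pred")])
print(type(network).__name__)  # TorchNetwork, because the model is a torch.nn.Module
```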

build

Build model instances and associate them with optimizers.

This method can be used with TensorFlow models / optimizers:

model_def = fe.architecture.tensorflow.LeNet
model = fe.build(model_fn = model_def, optimizer_fn="adam")
model = fe.build(model_fn = model_def, optimizer_fn=lambda: tf.optimizers.Adam(lr=0.1))
model = fe.build(model_fn = model_def, optimizer_fn="adam", weights_path="~/weights.h5")

This method can be used with PyTorch models / optimizers:

model_def = fe.architecture.pytorch.LeNet
model = fe.build(model_fn = model_def, optimizer_fn="adam")
model = fe.build(model_fn = model_def, optimizer_fn=lambda x: torch.optim.Adam(params=x, lr=0.1))
model = fe.build(model_fn = model_def, optimizer_fn="adam", weights_path="~/weights.pt")

Parameters:

model_fn (Callable[[], Union[Model, List[Model]]], required):
    A function that defines model(s).

optimizer_fn (Union[str, Scheduler, Callable, List[str], List[Callable], List[Scheduler], None], required):
    Optimizer string/definition or a list of optimizer instances/strings. The number of optimizers provided here should match the number of models generated by the model_fn.

model_name (Union[str, List[str], None], default None):
    Name(s) of the model(s) that will be used for logging purposes. If None, a name will be automatically generated and assigned.

weights_path (Union[str, None, List[Union[str, None]]], default None):
    Path(s) from which to load model weights. If not None, then the number of weight paths provided should match the number of models generated by the model_fn.

Returns:

models (Union[Model, List[Model]]):
    The model(s) built by FastEstimator.

Source code in fastestimator/fastestimator/network.py
def build(model_fn: Callable[[], Union[Model, List[Model]]],
          optimizer_fn: Union[str, Scheduler, Callable, List[str], List[Callable], List[Scheduler], None],
          weights_path: Union[str, None, List[Union[str, None]]] = None,
          model_name: Union[str, List[str], None] = None) -> Union[Model, List[Model]]:
    """Build model instances and associate them with optimizers.

    This method can be used with TensorFlow models / optimizers:
    ```python
    model_def = fe.architecture.tensorflow.LeNet
    model = fe.build(model_fn = model_def, optimizer_fn="adam")
    model = fe.build(model_fn = model_def, optimizer_fn=lambda: tf.optimizers.Adam(lr=0.1))
    model = fe.build(model_fn = model_def, optimizer_fn="adam", weights_path="~/weights.h5")
    ```

    This method can be used with PyTorch models / optimizers:
    ```python
    model_def = fe.architecture.pytorch.LeNet
    model = fe.build(model_fn = model_def, optimizer_fn="adam")
    model = fe.build(model_fn = model_def, optimizer_fn=lambda x: torch.optim.Adam(params=x, lr=0.1))
    model = fe.build(model_fn = model_def, optimizer_fn="adam", weights_path="~/weights.pt")
    ```

    Args:
        model_fn: A function that defines model(s).
        optimizer_fn: Optimizer string/definition or a list of optimizer instances/strings. The number of optimizers
            provided here should match the number of models generated by the `model_fn`.
        model_name: Name(s) of the model(s) that will be used for logging purposes. If None, a name will be
            automatically generated and assigned.
        weights_path: Path(s) from which to load model weights. If not None, then the number of weight paths provided
            should match the number of models generated by the `model_fn`.

    Returns:
        models: The model(s) built by FastEstimator.
    """
    def _generate_model_names(num_names):
        names = ["model" if i + build.count == 0 else "model{}".format(i + build.count) for i in range(num_names)]
        build.count += num_names
        return names

    if not hasattr(build, "count"):
        build.count = 0
    models, optimizer_fn = to_list(model_fn()), to_list(optimizer_fn)
    # fill optimizer
    if not optimizer_fn:
        optimizer_fn = [None]
    # check framework
    if isinstance(models[0], tf.keras.Model):
        framework = "tf"
    elif isinstance(models[0], torch.nn.Module):
        framework = "torch"
    else:
        raise ValueError("unrecognized model format: {}".format(type(models[0])))
    # multi-gpu handling
    if torch.cuda.device_count() > 1:
        if framework == "tf" and not isinstance(tf.distribute.get_strategy(), tf.distribute.MirroredStrategy):
            tf.distribute.experimental_set_strategy(tf.distribute.MirroredStrategy())
            models = to_list(model_fn())
        if framework == "torch":
            models = [torch.nn.DataParallel(model) for model in models]
    # generate names
    if not model_name:
        model_name = _generate_model_names(len(models))
    model_name = to_list(model_name)
    # load weights
    if weights_path:
        weights_path = to_list(weights_path)
    else:
        weights_path = [None] * len(models)
    assert len(models) == len(optimizer_fn) == len(weights_path) == len(model_name), \
        "Found inconsistency in number of models, optimizers, model_name or weights"
    # create optimizer
    for idx, (model, optimizer_def, weight, name) in enumerate(zip(models, optimizer_fn, weights_path, model_name)):
        models[idx] = _fe_compile(model, optimizer_def, weight, name, framework)
    if len(models) == 1:
        models = models[0]
    return models
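
When model_fn returns multiple models, the optimizer, name, and weight arguments are zipped one-to-one with them. A hedged sketch of a two-model build (the model names here are illustrative):

```python
import fastestimator as fe

models = fe.build(
    model_fn=lambda: [fe.architecture.tensorflow.LeNet(), fe.architecture.tensorflow.LeNet()],
    optimizer_fn=["adam", "sgd"],               # one optimizer per model
    model_name=["generator", "discriminator"])  # one name per model
generator, discriminator = models               # a single model is returned unwrapped
```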