Skip to content

dataloader

FEDataLoader

Bases: DataLoader

Source code in fastestimator\fastestimator\dataset\dataloader.py
class FEDataLoader(DataLoader):
    """A DataLoader which remembers its active iterator so that worker processes can be closed explicitly.

    Each call to `__iter__` either reuses the existing iterator (persistent workers) or shuts the old one down before
    building a new one. The pids of the workers behind multi-process iterators are recorded in a class-level registry.
    """
    # Registry shared by every FEDataLoader instance. Despite the name, it stores the pids of the worker processes
    # of the multi-process iterators that have been handed out. It is emptied whenever any instance shuts down.
    _current_threads = []

    def shutdown(self) -> None:
        """Close the worker processes used by this iterator.

        The hope is that this will prevent "RuntimeError: DataLoader worker (pid(s) XXXX) exited unexpectedly" during
        the test suites.
        """
        if isinstance(self._iterator, _MultiProcessingDataLoaderIter):
            self._iterator._shutdown_workers()
        self._iterator = None
        # NOTE(review): this clears the registry for all instances, not only the pids owned by this loader.
        FEDataLoader._current_threads.clear()

    def __iter__(self) -> _BaseDataLoaderIter:
        """Return an iterator over the data, closing any previous non-persistent iterator first.

        Similar to the original iter method, but iterators are remembered in order to manually close them when new
        ones are created.

        Returns:
            The iterator now stored on this loader.
        """
        if self.persistent_workers and self.num_workers > 0:
            if self._iterator is None:
                self._iterator = self._get_iterator()
            else:
                # Persistent workers are kept alive between epochs, so the existing iterator is simply rewound.
                self._iterator._reset(self)
        else:
            self.shutdown()
            self._iterator = self._get_iterator()
        if isinstance(self._iterator, _MultiProcessingDataLoaderIter):
            registry = FEDataLoader._current_threads
            for worker in self._iterator._workers:
                # A reused (persistent) iterator still has the same workers, so only record pids which are not
                # already registered. Otherwise the registry would gain duplicate entries on every epoch.
                if worker.pid not in registry:
                    registry.append(worker.pid)
        return self._iterator

shutdown

Close the worker threads used by this iterator.

The hope is that this will prevent "RuntimeError: DataLoader worker (pid(s) XXXX) exited unexpectedly" during the test suites.

Source code in fastestimator\fastestimator\dataset\dataloader.py
def shutdown(self) -> None:
    """Stop the workers behind the current iterator and forget that iterator.

    This is intended to avoid "RuntimeError: DataLoader worker (pid(s) XXXX) exited unexpectedly" showing up while the
    test suites run.
    """
    active_iterator = self._iterator
    # Only multi-process iterators own workers which need an explicit shutdown.
    if isinstance(active_iterator, _MultiProcessingDataLoaderIter):
        active_iterator._shutdown_workers()
    self._iterator = None
    # The pid registry lives on the class, so this empties it for every loader instance.
    FEDataLoader._current_threads.clear()