@traceable()
class OneOf(TensorOp):
    """Perform one of several possible TensorOps.

    Exactly one of the wrapped ops is selected at random on every call to `forward`. The inputs, outputs, mode, ds_id,
    in_list, and out_list of this Op are taken from the first wrapped op, and every other wrapped op must match them.

    Args:
        *tensor_ops: Ops to choose between with a specified (or uniform) probability.
        probs: List of probabilities, must sum to 1. When None, the probabilities will be equally distributed.

    Raises:
        AssertionError: If the wrapped ops disagree on inputs, outputs, mode, ds_id, or their input / output list
            configuration, or if `probs` has the wrong length or does not sum to 1.
    """
    def __init__(self, *tensor_ops: TensorOp, probs: Optional[List[float]] = None) -> None:
        # The first op serves as the reference configuration which all of the other ops must match.
        inputs = tensor_ops[0].inputs
        outputs = tensor_ops[0].outputs
        mode = tensor_ops[0].mode
        ds_id = tensor_ops[0].ds_id
        super().__init__(inputs=inputs, outputs=outputs, mode=mode, ds_id=ds_id)
        self.in_list = tensor_ops[0].in_list
        self.out_list = tensor_ops[0].out_list
        for op in tensor_ops[1:]:
            assert inputs == op.inputs, "All ops within a OneOf must share the same inputs"
            assert self.in_list == op.in_list, "All ops within OneOf must share the same input configuration"
            assert outputs == op.outputs, "All ops within a OneOf must share the same outputs"
            assert self.out_list == op.out_list, "All ops within OneOf must share the same output configuration"
            assert mode == op.mode, "All ops within a OneOf must share the same mode"
            assert ds_id == op.ds_id, "All ops within a OneOf must share the same ds_id"
        if probs:
            assert len(tensor_ops) == len(probs), "The number of probabilities do not match with number of Operators"
            assert abs(sum(probs) - 1) < 1e-8, "Probabilities must sum to 1"
        else:
            # No (or empty) probabilities given: fall back to a uniform distribution over the ops.
            probs = [1 / len(tensor_ops) for _ in tensor_ops]
        self.ops = tensor_ops
        self.probs = probs
        # Populated by build(); decides which sampling strategy forward() uses.
        self.framework = None

    def build(self, framework: str, device: Optional[torch.device] = None) -> None:
        """Record the framework and build every wrapped op.

        Args:
            framework: Which framework is in use, either "tf" or "torch".
            device: Passed through unchanged to the `build` method of each wrapped op.

        Raises:
            AssertionError: If `framework` is neither "tf" nor "torch".
        """
        assert framework in {"tf", "torch"}, "unrecognized framework: {}".format(framework)
        self.framework = framework
        for op in self.ops:
            op.build(framework, device)

    def get_fe_loss_keys(self) -> Set[str]:
        """Return the union of the loss keys reported by all of the wrapped ops."""
        return set.union(*[op.get_fe_loss_keys() for op in self.ops])

    def get_fe_models(self) -> Set[Model]:
        """Return the union of the models reported by all of the wrapped ops."""
        return set.union(*[op.get_fe_models() for op in self.ops])

    def fe_retain_graph(self, retain: Optional[bool] = None) -> Optional[bool]:
        """Forward a retain-graph query / setting to every wrapped op.

        Args:
            retain: Value handed to the `fe_retain_graph` method of each wrapped op.

        Returns:
            The responses of the wrapped ops folded together with `or`: the first truthy response if there is one,
            otherwise the response of the last wrapped op.
        """
        resp = None
        for op in self.ops:
            # Invoke the op before combining the results. Writing `resp or op.fe_retain_graph(retain)` would
            # short-circuit as soon as one op answered truthily, so later ops would never receive `retain`.
            op_resp = op.fe_retain_graph(retain)
            resp = resp or op_resp
        return resp

    def __getstate__(self) -> Dict[str, List[Dict[Any, Any]]]:
        """Return a state dictionary holding the state of each wrapped op.

        Returns:
            A dictionary with a single key, 'ops', mapping to one entry per wrapped op: the result of the op's own
            `__getstate__` when it defines one, otherwise an empty dictionary.
        """
        return {'ops': [elem.__getstate__() if hasattr(elem, '__getstate__') else {} for elem in self.ops]}

    def forward(self, data: Union[Tensor, List[Tensor]], state: Dict[str, Any]) -> Union[Tensor, List[Tensor]]:
        """Execute a randomly selected op from the list of `tensor_ops`.

        Args:
            data: The information to be passed to one of the wrapped operators.
            state: Information about the current execution context, for example {"mode": "train"}.

        Returns:
            The `data` after application of one of the available TensorOps.
        """
        if self.framework == 'tf':
            # tf.random.categorical expects log-probabilities with a leading batch dimension, hence the log of
            # [self.probs]. A single sample is drawn and [0, 0] extracts it as a scalar index.
            idx = cast(tf.random.categorical(tf.math.log([self.probs]), 1), dtype='int32')[0, 0]
            # `op=op` binds each op at lambda creation time, avoiding the late-binding closure pitfall.
            results = tf.switch_case(idx, [lambda op=op: op.forward(data, state) for op in self.ops])
        else:
            results = np.random.choice(self.ops, p=self.probs).forward(data, state)
        return results