monitor_names : Optional[Union[List[str], str]]
    List of keys to monitor. If None then all metrics will be recorded. If you want to record 'all
    the usual stuff' plus a particular key which isn't normally recorded, you can use a '*' character
    here. For example: monitor_names=['*', 'y_true']. When recording intermediate variables in the
    pipeline or network, you will need to add their names to the monitor_names argument when calling
    fe.Estimator.
    Default: None

instance_id_key : Optional[str]
    A key corresponding to data instance ids. If provided, the CSV logger will record per-instance
    metric information into the csv file in addition to the standard metrics.
    Default: None

mode : Union[None, str, Iterable[str]]
    What mode(s) to execute this Trace in. For example, "train", "eval", "test", or "infer". To
    execute regardless of mode, pass None. To execute in all modes except for a particular one, you
    can pass an argument like "!infer" or "!train".
    Default: None
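For orientation, here is a minimal usage sketch. The `pipeline` and `network` objects and the filename are placeholders assumed to be defined elsewhere, not part of the documented API:

```python
import fastestimator as fe
from fastestimator.trace.io import CSVLogger

# Sketch only: assumes `pipeline` and `network` are built elsewhere.
logger = CSVLogger(filename="train_log.csv",       # output CSV path (placeholder name)
                   monitor_names=["*", "y_true"],  # everything usual, plus the extra key 'y_true'
                   mode="!infer")                  # run in all modes except 'infer'

estimator = fe.Estimator(pipeline=pipeline,
                         network=network,
                         epochs=2,
                         traces=[logger],
                         monitor_names=["y_true"])  # expose the intermediate key, per the note above
estimator.fit()
```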
Source code in fastestimator/fastestimator/trace/io/csv_logger.py
```python
@traceable()
class CSVLogger(Trace):
    """Log monitored quantities in a CSV file.

    Args:
        filename: Output filename.
        monitor_names: List of keys to monitor. If None then all metrics will be recorded. If you want to record 'all
            the usual stuff' plus a particular key which isn't normally recorded, you can use a '*' character here.
            For example: monitor_names=['*', 'y_true']. When recording intermediate variables in the pipeline or
            network, you will need to add their names in the monitor_names argument when calling fe.Estimator.
        instance_id_key: A key corresponding to data instance ids. If provided, the CSV logger will record
            per-instance metric information into the csv file in addition to the standard metrics.
        mode: What mode(s) to execute this Trace in. For example, "train", "eval", "test", or "infer". To execute
            regardless of mode, pass None. To execute in all modes except for a particular one, you can pass an
            argument like "!infer" or "!train".
    """
    def __init__(self,
                 filename: str,
                 monitor_names: Optional[Union[List[str], str]] = None,
                 instance_id_key: Optional[str] = None,
                 mode: Union[None, str, Iterable[str]] = None) -> None:
        self.instance_id_key = instance_id_key
        monitor_names = to_list(monitor_names)
        instance_id_key = to_list(instance_id_key)
        inputs = monitor_names if monitor_names else ["*"]
        inputs.extend(instance_id_key)
        super().__init__(inputs=inputs, mode=mode)
        self.filename = filename
        self.df_agg = None  # DataFrame for aggregate metrics
        self.df_ins = None  # DataFrame for instance metrics

    def on_begin(self, data: Data) -> None:
        base_keys = ["instance_id", "mode", "step", "epoch"] if self.instance_id_key else ["mode", "step", "epoch"]
        self.df_agg = pd.DataFrame(columns=base_keys)
        self.df_ins = pd.DataFrame(columns=base_keys)

    def on_epoch_end(self, data: Data) -> None:
        keys = set(self.inputs) if "*" not in self.inputs else set(self.inputs) | data.read_logs().keys()
        keys = keys - {'*', self.instance_id_key}
        tmpdic = {}
        for key in keys:
            tmpdic[key] = self._parse_val(data.read_logs().get(key, ''))
            if key not in self.df_agg.columns:
                self.df_agg[key] = ''
        for col in set(self.df_agg.columns) - {'mode', 'step', 'epoch'} - tmpdic.keys():
            tmpdic[col] = ''
        # Only record an entry if there is at least one piece of actual information present
        if any(tmpdic.values()):
            self.df_agg = pd.concat(
                objs=[
                    self.df_agg,
                    pd.DataFrame([{
                        "mode": self.system.mode,
                        "step": self.system.global_step,
                        "epoch": self.system.epoch_idx,
                        **tmpdic
                    }])
                ],
                ignore_index=True)
        self._save()  # Write on epoch end so that people can see results sooner if debugging

    def on_batch_end(self, data: Data) -> None:
        if self.instance_id_key:
            ins_data = data.read_per_instance_logs()
            keys = set(self.inputs) if "*" not in self.inputs else set(self.inputs) | ins_data.keys()
            keys = list(keys - {'*', self.instance_id_key})
            ids = data[self.instance_id_key]
            batch_size = len(ids)
            vals = [ins_data.get(key, data.get(key, _SKIP())) for key in keys]
            # Ignore vals which are not batched
            vals = [
                val if (hasattr(val, 'ndim') and val.ndim > 0 and val.shape[0] == batch_size) or
                (isinstance(val, (list, tuple)) and len(val) == batch_size) else _SKIP() for val in vals
            ]
            # Don't bother recording instance if no data is available
            if any((not isinstance(val, _SKIP) for val in vals)):
                for key in keys:
                    if key not in self.df_ins.columns:
                        self.df_ins[key] = ''
                rows = []
                for sample in zip_longest(ids, *vals, fillvalue=''):
                    row = {
                        "instance_id": self._parse_val(sample[0]),
                        "mode": self.system.mode,
                        "step": self.system.global_step,
                        "epoch": self.system.epoch_idx,
                        **{key: self._parse_val(val) for key, val in zip(keys, sample[1:])}
                    }
                    for col in self.df_ins.columns:
                        if col not in row.keys():
                            row[col] = ''
                    rows.append(row)
                self.df_ins = pd.concat(objs=[self.df_ins, pd.DataFrame(rows)], ignore_index=True)
        if self.system.mode == "train" and self.system.log_steps and (
                self.system.global_step % self.system.log_steps == 0 or self.system.global_step == 1):
            keys = set(self.inputs) if "*" not in self.inputs else set(self.inputs) | data.read_logs().keys()
            keys = keys - {'*', self.instance_id_key}
            tmpdic = {}
            for key in keys:
                if self.instance_id_key:
                    # If you are using an instance_id key, then don't report per-instance values at the agg level
                    tmpdic[key] = self._parse_val(data.read_logs().get(key, ''))
                else:
                    tmpdic[key] = self._parse_val(data.get(key, ''))
                if key not in self.df_agg.columns:
                    self.df_agg[key] = ''
            for col in set(self.df_agg.columns) - {'mode', 'step', 'epoch'} - tmpdic.keys():
                tmpdic[col] = ''
            # Only record an entry if there's at least 1 piece of actual information
            if any(tmpdic.values()):
                self.df_agg = pd.concat(
                    objs=[
                        self.df_agg,
                        pd.DataFrame([{
                            "mode": self.system.mode,
                            "step": self.system.global_step,
                            "epoch": self.system.epoch_idx,
                            **tmpdic
                        }])
                    ],
                    ignore_index=True)

    def _save(self) -> None:
        """Write the current state to disk."""
        stack = [self.df_ins, self.df_agg]
        if self.system.mode == "test":
            if os.path.exists(self.filename):
                df1 = pd.read_csv(self.filename, dtype=str)
                stack.insert(0, df1)
        stack = pd.concat(stack, axis=0, ignore_index=True)
        stack.to_csv(self.filename, index=False)

    @staticmethod
    def _parse_val(val: Any) -> str:
        """Convert values into string representations.

        Args:
            val: A value to be printed.

        Returns:
            A formatted version of `val` appropriate for a csv file.
        """
        if isinstance(val, str):
            return val
        if isinstance(val, ValWithError):
            return str(val).replace(',', ';')
        val = to_number(val)
        if val.size > 1:
            return np.array2string(val, separator=';')
        if val.dtype.kind in {'U', 'S'}:  # Unicode or String
            # remove the b'' from strings stored in tensors
            return str(val, 'utf-8')
        return str(val)
```
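Because `_save` concatenates `df_ins` and `df_agg` into a single file, per-instance and aggregate rows share the same columns, with the `instance_id` cell left blank on aggregate rows. A hedged sketch for splitting the two back apart after training (the filename and the presence of an `instance_id` column are assumptions carried over from the usage example above):

```python
import pandas as pd

# Sketch: separate per-instance rows from aggregate rows in the log.
# Assumes CSVLogger was constructed with an instance_id_key, so the
# file contains an 'instance_id' column (blank for aggregate rows).
df = pd.read_csv("train_log.csv", dtype=str)  # values are stored as strings
mask = df["instance_id"].notna() & (df["instance_id"] != "")
per_instance = df[mask]
aggregate = df[~mask]
print(aggregate[["mode", "step", "epoch"]].tail())
```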