Source code for ophyd_async.core._soft_signal_backend
from__future__importannotationsimporttimefromabcimportabstractmethodfromcollections.abcimportSequencefromdataclassesimportdataclassfromfunctoolsimportlru_cachefromtypingimportAny,Generic,get_originimportnumpyasnpfrombluesky.protocolsimportReadingfromevent_modelimportDataKeyfrom._signal_backendimport(Array1D,EnumT,Primitive,PrimitiveT,SignalBackend,SignalDatatype,SignalDatatypeT,SignalMetadata,TableT,make_datakey,)from._tableimportTablefrom._utilsimportCallback,get_dtype,get_enum_clsclassSoftConverter(Generic[SignalDatatypeT]):# This is Any -> SignalDatatypeT because we support coercing# value types to SignalDatatype to allow people to do things like# SignalRW[Enum].set("enum value")@abstractmethoddefwrite_value(self,value:Any)->SignalDatatypeT:...@dataclassclassPrimitiveSoftConverter(SoftConverter[PrimitiveT]):datatype:type[PrimitiveT]defwrite_value(self,value:Any)->PrimitiveT:returnself.datatype(value)ifvalueelseself.datatype()classSequenceStrSoftConverter(SoftConverter[Sequence[str]]):defwrite_value(self,value:Any)->Sequence[str]:return[str(v)forvinvalue]ifvalueelse[]@dataclassclassSequenceEnumSoftConverter(SoftConverter[Sequence[EnumT]]):datatype:type[EnumT]defwrite_value(self,value:Any)->Sequence[EnumT]:return[self.datatype(v)forvinvalue]ifvalueelse[]@dataclassclassNDArraySoftConverter(SoftConverter[Array1D]):datatype:np.dtypedefwrite_value(self,value:Any)->Array1D:returnnp.array(()ifvalueisNoneelsevalue,dtype=self.datatype)@dataclassclassEnumSoftConverter(SoftConverter[EnumT]):datatype:type[EnumT]defwrite_value(self,value:Any)->EnumT:return(self.datatype(value)ifvalueelselist(self.datatype.__members__.values())[0])@dataclassclassTableSoftConverter(SoftConverter[TableT]):datatype:type[TableT]defwrite_value(self,value:Any)->TableT:ifisinstance(value,dict):returnself.datatype(**value)elifisinstance(value,self.datatype):returnvalueelifvalueisNone:returnself.datatype()else:raiseTypeError(f"Cannot convert {value} to {self.datatype}")@lru_cachedefmake_converter(datatype:type[SignalDatatype])->SoftConverter:enum_cls=get_enum_cls(datatype)ifdatatype==Sequence[str]:returnSequenceStrSoftConverter()elifget_origin(datatype)==Sequenceandenum_cls:returnSequenceEnumSoftConverter(enum_cls)elifget_origin(datatype)==np.ndarray:returnNDArraySoftConverter(get_dtype(datatype))elifenum_cls:returnEnumSoftConverter(enum_cls)elifissubclass(datatype,Table):returnTableSoftConverter(datatype)elifissubclass(datatype,Primitive):returnPrimitiveSoftConverter(datatype)raiseTypeError(f"Can't make converter for {datatype}")
[docs]classSoftSignalBackend(SignalBackend[SignalDatatypeT]):"""An backend to a soft Signal, for test signals see ``MockSignalBackend``."""def__init__(self,datatype:type[SignalDatatypeT]|None,initial_value:SignalDatatypeT|None=None,units:str|None=None,precision:int|None=None,):# Create the right converter for the datatypeself.converter=make_converter(datatypeorfloat)# Add the extra static metadata to the dictionaryself.metadata:SignalMetadata={}ifunitsisnotNone:self.metadata["units"]=unitsifprecisionisnotNone:self.metadata["precision"]=precisionifenum_cls:=get_enum_cls(datatype):self.metadata["choices"]=[v.valueforvinenum_cls]# Create and set the initial valueself.initial_value=self.converter.write_value(initial_value)self.reading:Reading[SignalDatatypeT]self.callback:Callback[Reading[SignalDatatypeT]]|None=Noneself.set_value(self.initial_value)super().__init__(datatype)defset_value(self,value:SignalDatatypeT):self.reading=Reading(value=self.converter.write_value(value),timestamp=time.monotonic(),alarm_severity=0,)ifself.callback:self.callback(self.reading)defsource(self,name:str,read:bool)->str:returnf"soft://{name}"asyncdefconnect(self,timeout:float):passasyncdefput(self,value:SignalDatatypeT|None,wait:bool)->None:write_value=self.initial_valueifvalueisNoneelsevalueself.set_value(write_value)asyncdefget_datakey(self,source:str)->DataKey:returnmake_datakey(self.datatypeorfloat,self.reading["value"],source,self.metadata)asyncdefget_reading(self)->Reading[SignalDatatypeT]:returnself.readingasyncdefget_value(self)->SignalDatatypeT:returnself.reading["value"]asyncdefget_setpoint(self)->SignalDatatypeT:# For a soft signal, the setpoint and readback values are the same.returnself.reading["value"]defset_callback(self,callback:Callback[Reading[SignalDatatypeT]]|None)->None:ifcallback:assertnotself.callback,"Cannot set a callback when one is already set"callback(self.reading)self.callback=callback