class Device(HasName):
    """Base class shared by every Ophyd Async Device.

    Any instance attribute that is itself a ``Device`` is treated as a child:
    it is named and connected together with its parent.
    """

    _name: str = ""
    #: The Device that owns this one, or None when there is no parent
    parent: Optional[Device] = None

    def __init__(self, name: str = "") -> None:
        self.set_name(name)

    @property
    def name(self) -> str:
        """The name currently assigned to this Device."""
        return self._name

    def children(self) -> Iterator[Tuple[str, Device]]:
        """Yield ``(attribute_name, device)`` for each child Device.

        Only instance attributes are inspected. The ``parent`` attribute is
        skipped so that the walk never goes back up the tree.
        """
        for key, value in self.__dict__.items():
            if key == "parent":
                continue
            if not isinstance(value, Device):
                continue
            yield key, value

    def set_name(self, name: str):
        """Rename this Device and, recursively, every child Device.

        Each child is named ``<name>-<attribute>``, where trailing underscores
        are stripped from the attribute name. When ``name`` is empty the
        children are given an empty name as well. Every child also has its
        ``parent`` set to this Device.

        Parameters
        ----------
        name:
            New name to set
        """
        self._name = name
        for attr_name, child in self.children():
            if name:
                child_name = f"{name}-{attr_name.rstrip('_')}"
            else:
                child_name = ""
            child.set_name(child_name)
            child.parent = self

    async def connect(self, sim: bool = False, timeout: float = DEFAULT_TIMEOUT):
        """Connect every child Device.

        Parameters
        ----------
        sim:
            Forwarded to each child's ``connect``; if True then connect in
            simulation mode.
        timeout:
            Forwarded unchanged to each child's ``connect`` call.
        """
        pending = {}
        for child_name, child in self.children():
            pending[child_name] = child.connect(sim, timeout=timeout)
        if not pending:
            # Leaf device: nothing to wait for
            return
        await wait_for_connection(**pending)
class DeviceCollector:
    """Collector of top level Device instances to be used as a context manager

    On entry the names of the caller's local variables are recorded. On exit
    the caller's locals are read again, and every ``Device`` bound to a name
    that did not exist on entry is (optionally) named and connected.

    Parameters
    ----------
    set_name:
        If True, call ``device.set_name(variable_name)`` on each collected
        Device whose name is still empty
    connect:
        If True, call ``device.connect(sim)`` in parallel on all
        collected Devices
    sim:
        If True, connect Signals in simulation mode
    timeout:
        Timeout passed to each ``device.connect`` call

    Notes
    -----
    Example usage::

        [async] with DeviceCollector():
            t1x = motor.Motor("BLxxI-MO-TABLE-01:X")
            t1y = motor.Motor("pva://BLxxI-MO-TABLE-01:Y")
            # Names and connects devices here
        assert t1x.comm.velocity.source
        assert t1x.name == "t1x"
    """

    def __init__(
        self,
        set_name=True,
        connect=True,
        sim=False,
        timeout: float = 10.0,
    ):
        self._set_name = set_name
        self._connect = connect
        self._sim = sim
        self._timeout = timeout
        self._names_on_enter: Set[str] = set()
        self._objects_on_exit: Dict[str, Any] = {}

    def _caller_locals(self):
        """Return the locals of the nearest stack frame that is not ours.

        A frame counts as "ours" when its local ``self`` is this collector.
        The walk stops at the outermost frame even if that one is ours too,
        so it never dereferences a missing ``f_back``.
        """
        frame = sys._getframe()
        while frame.f_back is not None and frame.f_locals.get("self", None) is self:
            frame = frame.f_back
        return frame.f_locals

    def __enter__(self) -> "DeviceCollector":
        # Stash the names that were defined before we were called
        self._names_on_enter = set(self._caller_locals())
        return self

    async def __aenter__(self) -> "DeviceCollector":
        return self.__enter__()

    async def _on_exit(self) -> None:
        """Name and connect every Device created inside the ``with`` block."""
        # Name and kick off connect for devices
        connect_coroutines: Dict[str, Coroutine] = {}
        for name, obj in self._objects_on_exit.items():
            if name not in self._names_on_enter and isinstance(obj, Device):
                # Only name devices that have not been named explicitly
                if self._set_name and not obj.name:
                    obj.set_name(name)
                if self._connect:
                    connect_coroutines[name] = obj.connect(
                        self._sim, timeout=self._timeout
                    )
        # Connect to all the devices
        if connect_coroutines:
            await wait_for_connection(**connect_coroutines)

    async def __aexit__(self, type, value, traceback):
        self._objects_on_exit = self._caller_locals()
        await self._on_exit()

    def __exit__(self, type_, value, traceback):
        """Run the naming/connection step via ``call_in_bluesky_event_loop``.

        Raises
        ------
        NotConnected
            If ``call_in_bluesky_event_loop`` raises ``RuntimeError``; the
            original error is attached as ``__cause__``.
        """
        self._objects_on_exit = self._caller_locals()
        try:
            fut = call_in_bluesky_event_loop(self._on_exit())
        except RuntimeError as err:
            raise NotConnected(
                "Could not connect devices. Is the bluesky event loop running? See "
                "https://blueskyproject.io/ophyd-async/main/"
                "user/explanations/event-loop-choice.html for more info."
            ) from err
        # NOTE(review): a truthy return from __exit__ suppresses exceptions
        # raised in the with-body; presumably this value is None - confirm.
        return fut