Source code for ophyd_async.fastcs.panda._hdf_panda
from __future__ import annotations
from typing import Sequence
from ophyd_async.core import DEFAULT_TIMEOUT, PathProvider, SignalR, StandardDetector
from ophyd_async.epics.pvi import create_children_from_annotations, fill_pvi_entries
from ._block import CommonPandaBlocks
from ._control import PandaPcapController
from ._writer import PandaHDFWriter
[docs]
class HDFPanda(CommonPandaBlocks, StandardDetector):
    """PandA detector pairing a PCAP controller with an HDF writer.

    Combines the blocks declared on ``CommonPandaBlocks`` with the
    ``StandardDetector`` machinery: a ``PandaPcapController`` is built around
    ``self.pcap`` and a ``PandaHDFWriter`` around ``self.data``.
    """

    def __init__(
        self,
        prefix: str,
        path_provider: PathProvider,
        config_sigs: Sequence[SignalR] = (),
        name: str = "",
    ):
        """Build the controller and writer and hand them to ``StandardDetector``.

        Args:
            prefix: Passed to the writer as ``prefix`` and stored so that
                ``connect`` can derive the PVI address (``prefix + "PVI"``).
            path_provider: Forwarded unchanged to ``PandaHDFWriter``.
            config_sigs: Forwarded unchanged to ``StandardDetector.__init__``.
            name: Forwarded to ``StandardDetector.__init__`` and returned by
                the writer's name provider.
        """
        # Remembered for connect(), which appends "PVI" to it.
        self._prefix = prefix

        # NOTE(review): presumably this instantiates the annotated child
        # blocks (e.g. ``pcap`` and ``data``) that are read just below, so it
        # must stay ahead of them - confirm against the pvi helpers.
        create_children_from_annotations(self)

        def _device_name() -> str:
            # NOTE(review): this closes over the constructor argument, so a
            # later rename of the device would not be seen by the writer -
            # confirm that this is intended.
            return name

        pcap_controller = PandaPcapController(pcap=self.pcap)
        hdf_writer = PandaHDFWriter(
            prefix=prefix,
            path_provider=path_provider,
            name_provider=_device_name,
            panda_data_block=self.data,
        )

        super().__init__(
            controller=pcap_controller,
            writer=hdf_writer,
            config_sigs=config_sigs,
            name=name,
        )

    async def connect(
        self, mock: bool = False, timeout: float = DEFAULT_TIMEOUT
    ) -> None:
        """Fill the PVI entries for this device, then run the parent connect.

        Args:
            mock: Forwarded to both ``fill_pvi_entries`` and the parent
                ``connect``.
            timeout: Forwarded to both ``fill_pvi_entries`` and the parent
                ``connect``.
        """
        pvi_address = self._prefix + "PVI"
        # PVI entries are filled before delegating to the parent connect.
        await fill_pvi_entries(self, pvi_address, timeout=timeout, mock=mock)
        await super().connect(mock=mock, timeout=timeout)