Source code for ophyd_async.panda._hdf_panda
from __future__ import annotations
from typing import Sequence
from ophyd_async.core import (
DEFAULT_TIMEOUT,
PathProvider,
SignalR,
StandardDetector,
)
from ophyd_async.epics.pvi import create_children_from_annotations, fill_pvi_entries
from ._common_blocks import CommonPandaBlocks
from ._panda_controller import PandaPcapController
from .writers._hdf_writer import PandaHDFWriter
[docs]
class HDFPanda(CommonPandaBlocks, StandardDetector):
    """Detector that pairs a ``PandaPcapController`` with a ``PandaHDFWriter``.

    The controller is built around this device's ``pcap`` block and the writer
    is given this device itself, so both operate on the same set of blocks.
    """

    def __init__(
        self,
        prefix: str,
        path_provider: PathProvider,
        config_sigs: Sequence[SignalR] = (),
        name: str = "",
    ):
        """Build the controller and writer, then initialise the detector base.

        Args:
            prefix: Prefix forwarded to the HDF writer; ``connect`` also
                appends ``"PVI"`` to it when filling the PVI entries.
            path_provider: Forwarded unchanged to ``PandaHDFWriter``.
            config_sigs: Forwarded unchanged to the ``StandardDetector``
                initialiser.
            name: Initial device name, forwarded to the ``StandardDetector``
                initialiser.
        """
        # Kept so that connect() can build the "<prefix>PVI" address later.
        self._prefix = prefix

        # Runs before ``self.pcap`` is read on the next statement; presumably
        # this is what creates the annotated blocks -- keep the ordering.
        create_children_from_annotations(self)
        controller = PandaPcapController(pcap=self.pcap)
        writer = PandaHDFWriter(
            prefix=prefix,
            path_provider=path_provider,
            # Look the name up when the provider is called instead of freezing
            # the constructor argument in a closure: a device renamed after
            # construction would otherwise leave the writer with a stale
            # (often empty) name.
            # NOTE(review): relies on the base class exposing ``name`` --
            # it is handed ``name=name`` below; confirm against Device.
            name_provider=lambda: self.name,
            panda_device=self,
        )
        super().__init__(
            controller=controller,
            writer=writer,
            config_sigs=config_sigs,
            name=name,
        )

    async def connect(
        self, mock: bool = False, timeout: float = DEFAULT_TIMEOUT
    ) -> None:
        """Fill the PVI entries for this device, then run the parent connect.

        Args:
            mock: Forwarded to both ``fill_pvi_entries`` and the parent
                ``connect``.
            timeout: Forwarded to both ``fill_pvi_entries`` and the parent
                ``connect``.
        """
        # PVI entries are filled first, from "<prefix>PVI", before the parent
        # connect is awaited.
        await fill_pvi_entries(self, self._prefix + "PVI", timeout=timeout, mock=mock)
        await super().connect(mock=mock, timeout=timeout)