Source code for event_model.documents.event
from typing import Any, Dict, Union
from typing_extensions import Annotated, NotRequired, TypedDict
from .generate.type_wrapper import Field, add_extra_schema
EVENT_EXTRA_SCHEMA = {"additionalProperties": False}
[docs]
class PartialEvent(TypedDict):
    data: Annotated[Dict[str, Any], Field(description="The actual measurement data")]
    filled: NotRequired[
        Annotated[
            Dict[str, Union[bool, str]],
            Field(
                description="Mapping each of the keys of externally-stored data to the "
                "boolean False, indicating that the data has not been loaded, or to "
                "foreign keys (moved here from 'data' when the data was loaded)"
            ),
        ]
    ]
    time: Annotated[
        float,
        Field(
            description="The event time. This maybe different than the timestamps on "
            "each of the data entries.",
        ),
    ]
    timestamps: Annotated[
        Dict[str, Any],
        Field(description="The timestamps of the individual measurement data"),
    ] 
@add_extra_schema(EVENT_EXTRA_SCHEMA)
class Event(PartialEvent):
    """Document to record a quanta of collected data"""
    descriptor: Annotated[
        str, Field(description="UID of the EventDescriptor to which this Event belongs")
    ]
    seq_num: Annotated[
        int,
        Field(
            description="Sequence number to identify the location of this Event in the "
            "Event stream",
        ),
    ]
    uid: Annotated[str, Field(description="Globally unique identifier for this Event")]