Source code for event_model.documents.run_stop

from typing import Dict

from typing_extensions import Annotated, Literal, NotRequired, TypedDict

from .generate.type_wrapper import DataType, Field, add_extra_schema

RUN_STOP_EXTRA_SCHEMA = {
    "patternProperties": {"^([^./]+)$": {"$ref": "#/$defs/DataType"}},
    "additionalProperties": False,
}


[docs] @add_extra_schema(RUN_STOP_EXTRA_SCHEMA) class RunStop(TypedDict): """ Document for the end of a run indicating the success/fail state of the run and the end time """ data_type: NotRequired[Annotated[DataType, Field(description="data_type")]] exit_status: Annotated[ Literal["success", "abort", "fail"], Field(description="State of the run when it ended"), ] num_events: NotRequired[ Annotated[ Dict[str, int], Field( description="Number of Events per named stream", ), ] ] reason: NotRequired[ Annotated[str, Field(description="Long-form description of why the run ended")] ] run_start: Annotated[ str, Field( description="Reference back to the run_start document that this document " "is paired with.", ), ] time: Annotated[float, Field(description="The time the run ended. Unix epoch")] uid: Annotated[str, Field(description="Globally unique ID for this document")]