Unverified Commit 83573c2d authored by Philipp Sommer's avatar Philipp Sommer
Browse files

first draft of pydantic based progress api

parent 00297c0e
Pipeline #35099 failed with stage
from ._version import get_versions
from .progress_report import ( # noqa: F401
FormalProgressInterface,
ProgressReport,
)
from .progress_report import ProgressReport # noqa: F401
__version__ = get_versions()["version"]
del get_versions
import abc
import json
from typing import List
from __future__ import annotations
import curses
from enum import Enum
from typing import TYPE_CHECKING, Dict, List, Optional
def report_to_json(report: "ProgressReport") -> dict:
return {
"step_message": report.step_message,
"steps": report.steps,
"completed": report.completed,
"children": [report_to_json(child) for child in report.children],
}
from deprogressapi.settings import ReportMethod, ReportSettings
from pydantic import BaseModel, Field
try:
from demessaging.PulsarMessageConstants import Status
except (ImportError, ModuleNotFoundError):
class FormalProgressInterface(metaclass=abc.ABCMeta):
def __init__(self):
self.__root_progress = None
class Status(str, Enum): # type: ignore
def set_root_progress_report(self, progress_report_root: "ProgressReport"):
self.__root_progress = progress_report_root
self.submit()
SUCCESS = "success"
ERROR = "error"
RUNNING = "running"
def has_root(self) -> bool:
return self.__root_progress is not None
def submit(self):
if self.__root_progress:
self.send(json.dumps(report_to_json(self.__root_progress)))
try:
from typing import Literal # type: ignore
except ImportError:
from typing_extensions import Literal # type: ignore
if TYPE_CHECKING:
from demessaging.PulsarMessageConsumer import PulsarMessageConsumer
def get_data(self):
return report_to_json(self.__root_progress)
@abc.abstractmethod
def send(self, progress_json: str):
pass
class BaseReport(BaseModel):
"""A base report for sending messages via pulsar."""
class Config:
underscore_attrs_are_private = True
_pulsar: Optional[PulsarMessageConsumer] = None
_request: Optional[Dict] = None
_response_properties: Optional[Dict] = None
_settings: ReportSettings = ReportSettings()
_window: Optional[curses.window] = None # type: ignore
# selector for the report type. should be changed by subclasses to make
# sure we select the correct model when deserializing
report_type: Literal["basic"] = Field(
"basic", description="Selector for the report type."
)
# status of the process that we report about.
status: Status = Field(
default=Status.RUNNING, description="Status of the underlying process."
)
class ProgressReport:
def __init__(
self,
step_message: str,
send_handler: FormalProgressInterface,
steps: int = 0,
):
self.step_message = step_message
self.steps = steps
self.send_handler = send_handler
self.completed = False
self.children: List[ProgressReport] = []
if not send_handler.has_root():
# this is the first report for the given send handler
# so it automatically becomes the root report
send_handler.set_root_progress_report(self)
def complete(self):
self.completed = True
# re-submit the now completed report
self.send_handler.submit()
def create_subreport(
self, step_message: str, steps: int = 0
) -> "ProgressReport":
child = ProgressReport(
step_message=step_message,
send_handler=self.send_handler,
steps=steps,
)
*args,
pulsar: Optional[PulsarMessageConsumer] = None,
request: Optional[Dict] = None,
response_properties: Optional[Dict] = None,
submit: bool = False,
**kwargs,
) -> None:
super().__init__(*args, **kwargs)
self._pulsar = pulsar
self._request = request
self._response_properties = response_properties
if submit:
self.submit()
@property
def window(self) -> curses.window: # type: ignore
if self._window is None:
self._window = curses.initscr()
return self._window
def submit(self) -> None:
"""Submit the report.
This method submits the report and submits it via the pulsar or
calls the :meth:`report` method."""
if self._pulsar is not None:
from demessaging.PulsarMessageConstants import (
MessageType,
PropertyKeys,
)
response_properties = self._response_properties or {}
response_properties[PropertyKeys.STATUS] = self.status
self._pulsar.send_response(
request=self._request,
msg_type=MessageType.PROGRESS,
response_payload=self.json(),
response_properties=response_properties,
)
else:
self.report()
def report(self) -> None:
"""Output the report.
This method, supposed to be implemented by subclasses, reports to the
final user on the command-line.
The default implementation just prints the json string of this report.
"""
method = self._settings.get_report_method()
if method == ReportMethod.print_:
self.print_()
elif method == ReportMethod.curses:
self.print_curses()
def print_(self):
print(self.render_report())
def print_curses(self):
"""Update the :attr:`window`."""
self.window.addstr(1, 0, self.render_report())
self.window.refresh()
def render_report(self) -> str:
return self.json(indent=2)
def complete(self, status: Status = Status.SUCCESS):
self.status = status
self.submit()
if self._window is not None:
curses.endwin()
self._window = None
def __enter__(self):
self.status = Status.RUNNING
self.submit()
return self
def __exit__(self, exc_type, exc_value, exc_tb):
self.complete(Status.SUCCESS if exc_value is None else Status.ERROR)
class ProgressReport(BaseReport):
"""A tree-like structured progress report."""
report_type: Literal["tree"] = Field( # type: ignore
"tree", description="Selector for the report type."
)
step_message: str = Field(description="The description of the process.")
steps: int = Field(
default=0, description="The number of subprocesses in this report."
)
children: List[ProgressReport] = Field(default_factory=list)
_parent: Optional[ProgressReport] = None
def submit(self):
if self._parent is not None:
self._parent.submit()
else:
super().submit()
def create_subreport(self, *args, **kwargs):
submit = kwargs.pop("submit", False)
child = ProgressReport(*args, **kwargs)
child._parent = self
self.children.append(child)
self.send_handler.submit()
if submit:
self.submit()
return child
ProgressReport.update_forward_refs()
"""Settings for the DASF progress API."""
from enum import Enum
from pydantic import BaseSettings, Field
def is_running_in_notebook():
"""Test if we are running inside a notebook."""
class ReportMethod(str, Enum):
"""Methods for reporting."""
auto = "AUTO"
print_ = "PRINT"
curses = "WINDOWED"
jupyter = "JUPYTER"
class ReportSettings(BaseSettings):
"""Settings for displaying the reports."""
class Config:
env_prefix = "DASF_REPORT_"
report_method: ReportMethod = Field(
ReportMethod.auto,
description="The method to use for printing the reports.",
)
use_curses: bool = Field(
True,
description="Whether to use curses or not for displaying reports.",
)
@property
def auto_report_method(self) -> ReportMethod:
"""Automatically determine the report method."""
method = (
ReportMethod.curses if self.use_curses else ReportMethod.print_
)
try:
from IPython import get_ipython
except (ImportError, ModuleNotFoundError):
return ReportMethod.curses
try:
shell = get_ipython().__class__.__name__
if shell == "ZMQInteractiveShell":
method = ReportMethod.jupyter
except NameError:
pass
return method
def get_report_method(self) -> ReportMethod:
if self.report_method == ReportMethod.auto:
return self.auto_report_method
else:
return self.report_method
import time
if __name__ == "__main__":
from deprogressapi import ProgressReport, FormalProgressInterface
# dummy send handler
class ProgressImplementation(FormalProgressInterface):
def send(self, progress_json: str):
pass
# create the (dummy) send handler
progress_send_handler = ProgressImplementation()
from deprogressapi import ProgressReport
# create the root report
pr1 = ProgressReport('Init', send_handler=progress_send_handler, steps=2)
# create a sub report
pr1_sub1 = pr1.create_subreport(step_message='Step 1.1 - Download')
# run the corresponding task ...
# complete the sub report
pr1_sub1.complete()
# create another sub report with pre-determined number of steps
pr1_sub2 = pr1.create_subreport(step_message='Step 1.2 - Process Download', steps=2)
# create corresponding sub-sub reports
pr1_sub21 = pr1_sub2.create_subreport(step_message='Step 1.2.1 - Process Download')
# ...
pr1_sub21.complete()
pr1_sub22 = pr1_sub2.create_subreport(step_message='Step 1.2.2 - Process Download')
# ...
pr1_sub22.complete()
pr1_sub2.complete()
pr1.complete()
with ProgressReport(step_message="Init", steps=2) as pr1:
time.sleep(1)
# create a sub report
with pr1.create_subreport(
step_message="Step 1.1 - Download"
) as pr1_sub1:
# run the corresponding task ...
time.sleep(1)
with pr1.create_subreport(
step_message="Step 1.2 - Process Download", steps=2
) as pr1_sub2:
# create another sub report with pre-determined number of steps
time.sleep(1)
# we can also do this without the `with` clause. Then we have to
# use `submit` during the initialization and run `complete` at the
# end
pr1_sub21 = pr1_sub2.create_subreport(
step_message="Step 1.2.1 - Process Download", submit=True
)
time.sleep(1)
pr1_sub21.complete()
with pr1_sub2.create_subreport(
step_message="Step 1.2.2 - Process Download"
) as pr1_sub22:
time.sleep(1)
time.sleep(1)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment