tomato.daemon package

tomato.daemon: module of functions comprising the tomato daemon

Code author: Peter Kraus

tomato.daemon.setup_logging(daemon: Daemon)

Helper function to set up logging (folder, filename, verbosity, format) based on the passed daemon state.

tomato.daemon.tomato_daemon()

The function called when tomato-daemon is executed.

Manages the state of the tomato daemon, including recovery of state via io, processing state updates via cmd, and the manager threads for both jobs (job) and drivers (driver).

Submodules

tomato.daemon.cmd: command parsing for tomato daemon

Code author: Peter Kraus

All functions in this module expect a dict containing the command specification and a Daemon object as arguments. The Daemon object is altered by the command.

All functions in this module return a Reply.
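The calling convention described above can be illustrated with a minimal sketch. The Reply and Daemon classes below are hypothetical stand-ins, not tomato's own definitions; their field names, the "cmd" key of the command dict, and the dispatch helper are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Reply:
    # Hypothetical stand-in for tomato's Reply; field names are assumptions.
    success: bool
    msg: str
    data: Any = None


@dataclass
class Daemon:
    # Hypothetical stand-in for the Daemon state object.
    status: str = "bootstrap"
    verbosity: int = 20


def status(msg: dict, daemon: Daemon) -> Reply:
    # Read-only command: report the current state back to the caller.
    return Reply(success=True, msg=f"daemon is {daemon.status}", data=daemon)


def stop(msg: dict, daemon: Daemon) -> Reply:
    # Mutating command: the Daemon object passed in is altered in place.
    daemon.status = "stop"
    return Reply(success=True, msg="daemon stopping", data=daemon)


# Dispatch on a key of the command dict; the "cmd" key is an assumption.
HANDLERS = {"status": status, "stop": stop}


def dispatch(msg: dict, daemon: Daemon) -> Reply:
    handler = HANDLERS.get(msg.get("cmd"))
    if handler is None:
        return Reply(success=False, msg=f"unknown command: {msg.get('cmd')!r}")
    return handler(msg, daemon)
```

Because every handler shares the same (dict, Daemon) signature and return type, a single lookup table is enough to route commands, and unknown commands can be rejected with an unsuccessful Reply rather than an exception.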

tomato.daemon.cmd.merge_pipelines(cur: dict[str, Pipeline], new: dict[str, Pipeline]) → dict[str, Pipeline]

Helper function for merging a dict of new Pipelines into the current dict.

tomato.daemon.cmd.status(msg: dict, daemon: Daemon) → Reply
tomato.daemon.cmd.stop(msg: dict, daemon: Daemon) → Reply
tomato.daemon.cmd.setup(msg: dict, daemon: Daemon) → Reply
tomato.daemon.cmd.pipeline(msg: dict, daemon: Daemon) → Reply
tomato.daemon.cmd.job(msg: dict, daemon: Daemon) → Reply
tomato.daemon.cmd.driver(msg: dict, daemon: Daemon) → Reply
tomato.daemon.cmd.device(msg: dict, daemon: Daemon) → Reply
tomato.daemon.cmd.component(msg: dict, daemon: Daemon) → Reply

tomato.daemon.driver: the driver manager of tomato daemon

Code author: Peter Kraus

tomato.daemon.driver.tomato_driver_bootstrap(req: Socket, logger: Logger, interface: ModelInterface, driver: str)
tomato.daemon.driver.tomato_driver() → None

The function called when tomato-driver is executed.

This function is responsible for managing all activities involving devices of a single driver type.

First, the list of devices (and their channel/address) for the specified driver is fetched from the tomato-daemon. Then, a new instance of the specified driver is spawned, populating its device map using the above list. If successful, the current process information is fed back to the tomato-daemon.

Afterwards, the main loop handles all requests related to each of the devices managed by this driver process, including job commands. Finally, if the driver is instructed to stop, it attempts to perform a teardown before exiting.

tomato.daemon.driver.spawn_tomato_driver(port: int, driver: str, req: Socket, verbosity: int, logpath: str)
tomato.daemon.driver.stop_tomato_driver(port: int, context)
tomato.daemon.driver.manager(port: int, timeout: int = 1000)

The driver manager thread of tomato-daemon.

This manager ensures individual driver processes are (re-)spawned and instructed to quit as necessary.

tomato.daemon.io: functions for storing and loading data

Code author: Peter Kraus

tomato.daemon.io.store(daemon: Daemon)

Stores the status of the provided Daemon as a pickle file.

tomato.daemon.io.load(daemon: Daemon)

Restores a saved status from a pickle file into the provided Daemon.
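The store/load pair can be pictured as a pickle round trip. The sketch below is not tomato's implementation: DaemonState, its fields, and the explicit path argument are hypothetical, and the attribute dict is pickled instead of the object itself purely to keep the example independent of where the class is defined.

```python
import pickle
import tempfile
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class DaemonState:
    # Hypothetical stand-in for the Daemon object; field names are assumptions.
    port: int = 1234
    jobs: dict = field(default_factory=dict)


def store(state: DaemonState, path: Path) -> None:
    # Serialise the attributes of the state object into a single pickle file.
    with path.open("wb") as f:
        pickle.dump(vars(state), f)


def load(state: DaemonState, path: Path) -> None:
    # Restore the saved attributes into the object that was passed in,
    # so that callers holding a reference see the recovered state.
    with path.open("rb") as f:
        saved = pickle.load(f)
    state.__dict__.update(saved)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        target = Path(tmp) / "state.pkl"
        store(DaemonState(port=5555, jobs={1: "queued"}), target)
        fresh = DaemonState()
        load(fresh, target)
        print(fresh)
```

Updating the passed object in place, rather than returning a new one, mirrors the documented signature, in which load receives the Daemon to restore into.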

tomato.daemon.io.merge_netcdfs(job: Job, snapshot=False)

Merges the individual pickled xr.Datasets of each Component found in job.jobpath into a single xr.DataTree, which is then stored in the NetCDF file, using the Component role as the group label.

tomato.daemon.io.data_to_pickle(ds: Dataset, path: Path, role: str)

Dumps the data provided as an xr.Dataset into a pickle. If the pickle already contains data, the new data is concatenated with it.

tomato.daemon.job: the job manager of tomato daemon

Code author: Peter Kraus

Note

Functions in this module that receive the Daemon state object should be acting on a copy. All changes to the Daemon state have to be propagated via the tomato.daemon.cmd set of functions.

tomato.daemon.job.find_matching_pipelines(pips: dict[str, Pipeline], cmps: dict[str, Component], method: list[Task]) → list[Pipeline]
tomato.daemon.job.kill_tomato_job(process: Process)

Wrapper around psutil.Process.terminate().

Here we kill the (grand)children of the process with the name of tomato-job, i.e. the individual task functions. This allows the tomato-job process to exit gracefully once the task functions join.

Note that on Windows, the tomato-job.exe process has two children: a python.exe which is the actual process running the job, and conhost.exe, which we want to avoid killing.
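The selection of processes to terminate can be sketched as follows. This is an illustration under stated assumptions, not the function's actual body: FakeProcess is a hypothetical stand-in exposing only the name(), children() and terminate() methods of psutil.Process, the windows flag replaces a platform check, and the assumption that the task functions are direct children of tomato-job outside Windows is inferred from the "(grand)children" wording.

```python
from dataclasses import dataclass, field


@dataclass
class FakeProcess:
    # Minimal stand-in exposing the psutil.Process methods used below.
    _name: str
    _children: list = field(default_factory=list)
    terminated: bool = False

    def name(self) -> str:
        return self._name

    def children(self) -> list:
        return self._children

    def terminate(self) -> None:
        self.terminated = True


def select_targets(job, windows: bool) -> list:
    """Return the task processes below a tomato-job process."""
    if not windows:
        # Assumed layout: the task functions are direct children.
        return list(job.children())
    targets = []
    for child in job.children():
        if child.name().lower() == "conhost.exe":
            # The console host is left alone.
            continue
        # python.exe runs the job; its children are the task functions.
        targets.extend(child.children())
    return targets


def kill_tasks(job, windows: bool) -> list:
    # Terminate only the task processes, so that the job process itself
    # can exit gracefully once they have joined.
    targets = select_targets(job, windows)
    for proc in targets:
        proc.terminate()
    return targets
```

Keeping the selection separate from the termination makes the platform-specific part easy to check without spawning real processes.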

tomato.daemon.job.manage_running_pips(daemon: Daemon, req)

Function that manages jobs and tomato-daemon pipelines.

The function only affects pipelines marked as running, i.e. with a set jobid. Jobs scheduled for killing (i.e. status == "rd") are terminated. Jobs that are supposed to be running but have crashed are given appropriate status ("ce"). Pipelines of both are reset.

Successful job completions are not processed here, but within the job process.
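The decision made for each running pipeline can be summarised in a small sketch. Only the "rd" and "ce" codes come from the description above; the function name, the returned action labels, and the "r" code for a running job are assumptions made for illustration.

```python
from typing import Optional


def next_action(status: str, process_alive: bool, running_status: str = "r") -> Optional[str]:
    """Decide what to do with the job attached to a running pipeline."""
    if status == "rd":
        # Job is scheduled for killing: terminate it and reset the pipeline.
        return "terminate"
    if status == running_status and not process_alive:
        # Job should be running but its process is gone: mark it as "ce"
        # and reset the pipeline.
        return "mark_ce"
    # Healthy running jobs, and successful completions, are not handled here.
    return None
```

For example, next_action("rd", True) selects termination, while next_action("r", True) leaves a healthy job untouched.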

tomato.daemon.job.check_queued_jobs(daemon: Daemon, req) → dict[int, list[Pipeline]]

Function to check whether the queued jobs can be submitted onto any pipeline.

Returns a dict containing the jobids as keys and lists of matched Pipelines as values.

tomato.daemon.job.action_queued_jobs(daemon, matched, req)

Function that assigns jobs if a matched pipeline contains the requested sample.

The tomato-job process is launched from this function.

tomato.daemon.job.manager(port: int, timeout: int = 500)

The job manager thread of tomato-daemon.

This manager ensures the job queue is iterated over and pipelines are managed/reset. Note that we poll the tomato-daemon for status only once per iteration of the main loop.

tomato.daemon.job.lazy_pirate(pyobj: Any, retries: int, timeout: int, address: str, context: Context) → Any
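The name and parameters of lazy_pirate suggest the "Lazy Pirate" client-side reliability pattern from the ZeroMQ guide: send a request, poll for a reply with a timeout, and on silence discard the socket and retry with a fresh one. The sketch below illustrates that pattern only and is not the function's actual body. To stay self-contained it takes a hypothetical make_socket factory in place of the address and context arguments; the returned object is assumed to offer the send_pyobj, poll, recv_pyobj and close methods of a pyzmq socket.

```python
from typing import Any, Callable


def lazy_pirate_sketch(
    pyobj: Any,
    retries: int,
    timeout: int,
    make_socket: Callable[[], Any],
) -> Any:
    """Send pyobj and wait for a reply, retrying on a fresh socket each time."""
    for _ in range(retries):
        # With pyzmq, the factory would create a REQ socket, set LINGER to 0
        # and connect it to the server address (assumption).
        sock = make_socket()
        try:
            sock.send_pyobj(pyobj)
            # poll() returns a non-zero event mask once a reply is waiting,
            # and 0 if the timeout (in milliseconds) expires first.
            if sock.poll(timeout):
                return sock.recv_pyobj()
        finally:
            # A REQ socket that never got its reply cannot send again,
            # so every attempt closes its socket and the next one rebuilds it.
            sock.close()
    raise TimeoutError(f"no reply after {retries} attempts")
```

Rebuilding the socket on every attempt is slightly more wasteful than reusing it after a success, but for a single request and reply the observable behaviour is the same.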
tomato.daemon.job.tomato_job() → None

The function called when tomato-job is executed.

This function is responsible for managing all activities of a single job, including contacting the daemon about the job pid, spawning sub-processes to run tasks on each Component of the Pipeline, merging data at the end of the job, and reporting back to the daemon once the job has finished successfully.

tomato.daemon.job.job_thread(tasks: list, component: Component, device: Device, driver: Driver, jobpath: Path, logpath: Path)

A subthread of tomato-job, responsible for tasks on one Component of a Pipeline.

For each task in tasks, starts the task, then monitors the Component status and polls for data, and moves on to the next task as instructed in the payload.

Stores the data for that Component as a pickle of an xr.Dataset.

tomato.daemon.job.job_main_loop(context: Context, port: int, job: Job, pipname: str, logpath: Path) → None

The main loop function of tomato-job, split for better readability.