tomato.daemon package
tomato.daemon: module of functions comprising the tomato daemon
Code author: Peter Kraus
- tomato.daemon.setup_logging(daemon: Daemon)
Helper function to set up logging (folder, filename, verbosity, format) based on the passed daemon state.
- tomato.daemon.tomato_daemon()
The function called when tomato-daemon is executed.
Manages the state of the tomato daemon, including recovery of state via io, processing state updates via cmd, and the manager threads for both jobs (job) and drivers (driver).
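As an illustration of this structure, the following is a minimal, hypothetical sketch of such a main loop using pyzmq; the socket configuration, the command dispatch, and the Daemon fields used here are assumptions made for the sketch, not the actual tomato wiring.

```python
import zmq

def daemon_main_loop(daemon, handle_command, port: int, timeout: int = 1000) -> None:
    """Hypothetical main loop: reply to incoming commands until told to stop."""
    context = zmq.Context()
    rep = context.socket(zmq.REP)
    rep.bind(f"tcp://127.0.0.1:{port}")
    poller = zmq.Poller()
    poller.register(rep, zmq.POLLIN)
    while daemon.status != "stop":
        # process any pending state update and send back a Reply
        if poller.poll(timeout=timeout):
            msg = rep.recv_pyobj()
            rep.send_pyobj(handle_command(msg, daemon))
```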
Submodules
tomato.daemon.cmd: command parsing for tomato daemon
Code author: Peter Kraus
All functions in this module expect a dict containing the command specification and a Daemon object as arguments. The Daemon object is altered by the command.
All functions in this module return a Reply.
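For illustration, a hypothetical command function following this contract could look as follows; the Reply fields and the import path of the models are assumptions made for the sketch.

```python
from tomato.models import Daemon, Reply  # assumed import path for the models

def set_verbosity(msg: dict, daemon: Daemon) -> Reply:
    """Hypothetical command: update the daemon's logging verbosity."""
    if "verbosity" not in msg:
        return Reply(success=False, msg="no verbosity provided")
    daemon.verbosity = msg["verbosity"]          # the Daemon object is altered
    return Reply(success=True, msg="verbosity set", data=daemon.verbosity)
```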
- tomato.daemon.cmd.merge_pipelines(cur: dict[str, Pipeline], new: dict[str, Pipeline]) dict[str, Pipeline]
Helper function for merging a dict of new Pipelines into the current dict.
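A minimal sketch of one plausible merge policy is shown below; whether running Pipelines are preserved exactly like this in tomato is an assumption of the sketch.

```python
def merge_pipelines_sketch(cur: dict, new: dict) -> dict:
    """Merge new Pipelines into the current dict, keeping busy ones as-is."""
    ret = {}
    for name, pip in new.items():
        old = cur.get(name)
        if old is not None and old.jobid is not None:
            ret[name] = old      # a Pipeline with a running job is kept untouched
        else:
            ret[name] = pip      # otherwise the new definition wins
    return ret
```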
tomato.daemon.driver: the driver manager of tomato daemon
Code author: Peter Kraus
- tomato.daemon.driver.tomato_driver_bootstrap(req: Socket, logger: Logger, interface: ModelInterface, driver: str)
- tomato.daemon.driver.tomato_driver() None
The function called when tomato-driver is executed.
This function is responsible for managing all activities involving devices of a single driver type.
First, the list of devices (and their channel/address) for the specified driver is fetched from the tomato-daemon. Then, a new instance of the specified driver is spawned, populating its device map using the above list. If successful, the current process information is fed back to the tomato-daemon.
Afterwards, the main loop handles all requests related to each of the devices managed by this driver process, including job commands. Finally, if the driver is instructed to stop, it attempts to perform a teardown before exiting.
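A purely illustrative outline of such a process is sketched below; the method names on the interface object, the message format, and the dispatch callable are invented for this sketch and are not the real ModelInterface API.

```python
def driver_process_outline(interface, devices, rep_socket, dispatch) -> None:
    """Hypothetical driver loop: register devices, serve requests, tear down."""
    for dev in devices:
        # assumed registration method; the real ModelInterface API may differ
        interface.dev_register(address=dev.address, channel=dev.channel)
    running = True
    while running:
        msg = rep_socket.recv_pyobj()           # job commands and status queries
        if msg.get("action") == "stop":
            running = False
        rep_socket.send_pyobj(dispatch(interface, msg))
    interface.teardown()                        # assumed teardown hook before exit
```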
- tomato.daemon.driver.spawn_tomato_driver(port: int, driver: str, req: Socket, verbosity: int, logpath: str)
- tomato.daemon.driver.stop_tomato_driver(port: int, context)
- tomato.daemon.driver.manager(port: int, timeout: int = 1000)
The driver manager thread of tomato-daemon.
This manager ensures individual driver processes are (re-)spawned and instructed to quit as necessary.
tomato.daemon.io: functions for storing and loading data
Code author: Peter Kraus
- tomato.daemon.io.load(daemon: Daemon)
Restores a saved status from a pickle file into the provided Daemon.
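A minimal sketch of this restore step is shown below, assuming the saved snapshot and the live Daemon share attributes such as pips, devs, drvs and jobs; these field names are assumptions of the sketch.

```python
import pickle
from pathlib import Path

def load_state(daemon, statefile: Path) -> None:
    """Restore selected fields from a pickled snapshot into the live daemon."""
    if not statefile.exists():
        return                                     # nothing to restore on first start
    with statefile.open("rb") as inf:
        saved = pickle.load(inf)
    for attr in ("pips", "devs", "drvs", "jobs"):  # assumed field names
        setattr(daemon, attr, getattr(saved, attr))
```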
- tomato.daemon.io.merge_netcdfs(job: Job, snapshot=False)
Merges the individual pickled xr.Datasets of each Component found in job.jobpath into a single xr.DataTree, which is then stored in the NetCDF file, using the Component role as the group label.
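The following sketch shows the general idea using xarray's DataTree (available in recent xarray releases); deriving the group label from the pickle filename stands in for the Component role, which is an assumption of this sketch.

```python
import pickle
from pathlib import Path
import xarray as xr

def merge_component_pickles(jobpath: Path, outfile: Path) -> None:
    """Collect per-Component Datasets into one DataTree and write a NetCDF file."""
    groups: dict[str, xr.Dataset] = {}
    for pkl in sorted(jobpath.glob("*.pkl")):
        with pkl.open("rb") as inf:
            groups[pkl.stem] = pickle.load(inf)   # file stem stands in for the role
    tree = xr.DataTree.from_dict(groups)
    tree.to_netcdf(outfile)
```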
- tomato.daemon.io.data_to_pickle(ds: Dataset, path: Path, role: str)
Dumps the data provided as xr.Dataset into a pickle. Concatenates with any existing data stored in the pickle.
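A sketch of this dump-and-concatenate behaviour is given below, assuming the Datasets share a time-like dimension along which they can be concatenated; the default dimension name "uts" is an assumption of the sketch.

```python
import pickle
from pathlib import Path
import xarray as xr

def append_dataset_pickle(ds: xr.Dataset, path: Path, dim: str = "uts") -> None:
    """Append a Dataset to a pickle, concatenating with any existing data."""
    if path.exists():
        with path.open("rb") as inf:
            old = pickle.load(inf)
        ds = xr.concat([old, ds], dim=dim)
    with path.open("wb") as out:
        pickle.dump(ds, out)
```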
tomato.daemon.job: the job manager of tomato daemon
Code author: Peter Kraus
Note
Functions in this module that receive the Daemon state object should be acting on a copy. All changes to the Daemon state have to be propagated via the tomato.daemon.cmd set of functions.
- tomato.daemon.job.find_matching_pipelines(pips: dict[str, Pipeline], cmps: dict[str, Component], method: list[Task]) list[Pipeline]
- tomato.daemon.job.kill_tomato_job(process: Process)
Wrapper around psutil.terminate().
Here we kill the (grand)children of the process with the name of tomato-job, i.e. the individual task functions. This allows the tomato-job process to exit gracefully once the task functions join.
Note that on Windows, the tomato-job.exe process has two children: a python.exe which is the actual process running the job, and conhost.exe, which we want to avoid killing.
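An illustrative psutil-based version of this behaviour is sketched below; matching children by name and the five-second wait are assumptions of the sketch.

```python
import psutil

def kill_job_children(process: psutil.Process) -> None:
    """Terminate the task (grand)children of a tomato-job process."""
    victims = []
    for child in process.children(recursive=True):
        if child.name() == "conhost.exe":
            continue                     # leave the Windows console host alone
        try:
            child.terminate()            # ask the task processes to exit
            victims.append(child)
        except psutil.NoSuchProcess:
            pass
    psutil.wait_procs(victims, timeout=5)
```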
- tomato.daemon.job.manage_running_pips(daemon: Daemon, req)
Function that manages jobs and tomato-daemon pipelines.
The function only affects pipelines marked as running, i.e. with a set jobid. Jobs scheduled for killing (i.e. status == "rd") are terminated. Jobs that are supposed to be running but have crashed are given appropriate status ("ce"). Pipelines of both are reset.
Successful job completions are not processed here, but within the job process.
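A simplified sketch of that bookkeeping follows, reusing kill_job_children from the sketch above; how a crash is detected and the reset_pipeline helper are assumptions of the sketch.

```python
import psutil

def manage_running_sketch(daemon, reset_pipeline) -> None:
    """Terminate killed jobs, flag crashed ones, and reset their pipelines."""
    for pip in daemon.pips.values():
        if pip.jobid is None:
            continue                               # only running pipelines
        job = daemon.jobs[pip.jobid]
        if job.status == "rd":                     # scheduled for killing
            kill_job_children(psutil.Process(job.pid))
            reset_pipeline(pip)
        elif job.status == "r" and not psutil.pid_exists(job.pid):
            job.status = "ce"                      # crashed while running
            reset_pipeline(pip)
```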
- tomato.daemon.job.check_queued_jobs(daemon: Daemon, req) dict[int, list[Pipeline]]
Function to check whether the queued jobs can be submitted onto any pipeline.
Returns a dict containing the jobids as keys and lists of matched Pipelines as values.
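A minimal sketch of this matching step, reusing find_matching_pipelines from this module, is shown below; which job statuses count as queued and the names of the job payload fields are assumptions of the sketch.

```python
def check_queued_sketch(daemon) -> dict[int, list]:
    """Map each queued jobid to the Pipelines it could run on."""
    matched: dict[int, list] = {}
    for jobid, job in daemon.jobs.items():
        if job.status not in {"q", "qw"}:          # assumed queued statuses
            continue
        pips = find_matching_pipelines(daemon.pips, daemon.cmps, job.payload.method)
        if pips:
            matched[jobid] = pips
    return matched
```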
- tomato.daemon.job.action_queued_jobs(daemon, matched, req)
Function that assigns jobs if a matched pipeline contains the requested sample.
The tomato-job process is launched from this function.
- tomato.daemon.job.manager(port: int, timeout: int = 500)
The job manager thread of tomato-daemon.
This manager ensures the job queue is iterated over and pipelines are managed/reset. Note that we poll the tomato-daemon for status only once per iteration of the main loop.
- tomato.daemon.job.lazy_pirate(pyobj: Any, retries: int, timeout: int, address: str, context: Context) Any
- tomato.daemon.job.tomato_job() None
The function called when tomato-job is executed.
This function is responsible for managing all activities of a single job, including contacting the daemon about job pid, spawning of sub-processes to run tasks on each Component of the Pipeline, merging data at the end of the job, and reporting back to the daemon once the job is successfully finished.
- tomato.daemon.job.job_thread(tasks: list, component: Component, device: Device, driver: Driver, jobpath: Path, logpath: Path)
A subthread of tomato-job, responsible for tasks on one Component of a Pipeline.
For each task in tasks, starts the task, then monitors the Component status and polls for data, and moves on to the next task as instructed in the payload.
Stores the data for that Component as a pickle of an xr.Dataset.
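A simplified sketch of this per-Component loop is shown below, reusing append_dataset_pickle from the io sketch above; the request format sent to the driver process and the polling interval are assumptions made for the sketch.

```python
import time

def run_component_tasks(tasks, component, req_socket, datapath,
                        poll_interval: float = 1.0) -> None:
    """Start each task, poll status and data, and append data to a pickle."""
    for task in tasks:
        req_socket.send_pyobj(dict(action="task_start",
                                   channel=component.channel, task=task))
        req_socket.recv_pyobj()
        while True:
            req_socket.send_pyobj(dict(action="task_data", channel=component.channel))
            data = req_socket.recv_pyobj()
            if data is not None:
                append_dataset_pickle(data, datapath)   # see the io sketch above
            req_socket.send_pyobj(dict(action="task_status", channel=component.channel))
            status = req_socket.recv_pyobj()
            if not status.get("running"):
                break                                   # move on to the next task
            time.sleep(poll_interval)
```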