Main automation module

class automan.automation.Automator(simulation_dir, output_dir, all_problems, cluster_manager_factory=None)[source]

Main class to automate a collection of problems.

This processes command line options and runs all tasks with a scheduler that is configured using the config.json file, if it is present. Here is typical usage:

>>> all_problems = [EllipticalDrop]
>>> automator = Automator('outputs', 'figures', all_problems)
>>> automator.run()

The class also creates an automan.cluster_manager.ClusterManager instance and integrates the cluster management features. This allows a user to automate their results across a collection of remote machines accessible only by ssh.

run(argv=None)[source]

Start the automation.

class automan.automation.CommandTask(command, output_dir, job_info=None, depends=None)[source]

Convenience class to run a command via the framework. The class provides a method to run the simulation and also check if the simulation is completed. The command should ideally produce all of its outputs inside an output directory that is specified.
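
A minimal sketch of constructing one directly is shown below; the command and paths are illustrative, and tasks are normally created for you by Problem instances and executed by a TaskRunner:

>>> from automan.automation import CommandTask
>>> task = CommandTask(
...     'python sim.py --output-dir outputs/case1',
...     output_dir='outputs/case1'
... )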

clean()[source]

Clean out any generated results.

This completely removes the output directory.

complete()[source]

Should return True/False indicating success of task.

job

output()[source]

Return list of output paths.

requires()[source]

Return iterable of tasks this task requires.

It is important to either return tasks that are idempotent or return the same instances each time, as this method is called repeatedly.

run(scheduler)[source]

Run the task, using the given scheduler.

Using the scheduler is optional but recommended for any long-running tasks. It is safe to raise an exception immediately when running the task, but for long-running tasks the exception will not matter and the complete method should signal any failure instead.

class automan.automation.Problem(simulation_dir, output_dir)[source]

This class represents a numerical or computational problem of interest that needs to be solved.

The class helps one run a variety of commands (or simulations), and then assemble/compare the results from those in the run method. This is perhaps most easily understood with an example. Let us say one wishes to run the elliptical drop example problem with the standard SPH and TVF and compare the results and their convergence properties while also keeping track of the computational time. To do this one will have to run several simulations, then collect and process the results. This is achieved by subclassing this class and implementing the following methods:

  • get_name(self): returns a string of the name of the problem. All results and simulations are collected inside a directory with this name.
  • get_commands(self): returns a sequence of (directory_name, command_string, job_info, depends) tuples. These are to be executed before the run method is called.
  • get_requires(self): returns a sequence of (name, task) tuples. These are to be executed before the run method is called.
  • run(self): Processes the completed simulations to make plots etc.

See the EllipticalDrop example class below for a full implementation.
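
A skeletal sketch of such a subclass is given below; it is not the full EllipticalDrop implementation, and the commands, the --tvf flag, and the result handling are illustrative assumptions:

from automan.automation import Problem

class EllipticalDrop(Problem):
    def get_name(self):
        return 'elliptical_drop'

    def get_commands(self):
        # (directory_name, command_string, job_info) tuples; job_info may be None.
        return [
            ('sph', 'pysph run elliptical_drop', None),
            ('tvf', 'pysph run elliptical_drop --tvf', None),
        ]

    def run(self):
        self.make_output_dir()
        # Read results from self.input_path('sph', ...) and
        # self.input_path('tvf', ...), then write plots via self.output_path(...).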

clean()[source]

Clean up any generated output from the analysis code. This does not clean the output of any nested commands.

get_commands()[source]

Return a sequence of (name, command_string, job_info_dict) or (name, command_string, job_info_dict, depends) tuples.

The name represents the command being run and is used as a subdirectory for generated output.

The command_string is the command that needs to be run.

The job_info_dict is a dictionary with any additional info to be used by the job; these are additional arguments to the automan.jobs.Job class. It may be None if nothing special needs to be passed.

The depends is any dependencies this simulation has in terms of other simulations/tasks.

get_name()[source]

Return the name of this problem. This name is used as a directory for the simulation and the outputs.

get_outputs()[source]

Get a list of outputs generated by this problem. By default it returns the output directory (as a single element of a list).

get_requires()[source]

Return a sequence of tuples of form (name, task).

The name represents the command being run and is used as a subdirectory for generated output.

The task is an automan.automation.Task instance.
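
A hedged sketch of an explicit override (the command string is illustrative; task_cls defaults to CommandTask as documented below):

def get_requires(self):
    cmd = 'pysph run elliptical_drop'
    return [
        ('sph', self.task_cls(cmd, self.input_path('sph'))),
    ]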

input_path(*args)[source]

Given any path components relative to the simulation directory, return the absolute path.

make_output_dir()[source]

Convenience to make the output directory if needed.

output_path(*args)[source]

Given any path components relative to the output directory, return the absolute path.

run()[source]

Run any analysis code for the completed simulations. This is usually run after the simulation commands have completed.

setup()[source]

Called by __init__, so add any initialization here.

task_cls

alias of CommandTask

class automan.automation.PySPHProblem(simulation_dir, output_dir)[source]
task_cls

alias of PySPHTask

class automan.automation.PySPHTask(command, output_dir, job_info=None, depends=None)[source]

Convenience class to run a PySPH simulation via an automation framework.

This task automatically adds the output directory specification for pysph so users do not need to add it.

class automan.automation.RunAll(simulation_dir, output_dir, problem_classes, force=False, match='')[source]

Solves a given collection of problems.

requires()[source]

Return iterable of tasks this task requires.

It is important to either return tasks that are idempotent or return the same instances each time, as this method is called repeatedly.

class automan.automation.Simulation(root, base_command, job_info=None, depends=None, **kw)[source]

A convenient class to abstract code for a particular simulation. Simulation objects are typically created by Problem instances in order to abstract and simplify repetitive code for a particular simulation.

For example, if one were comparing the elliptical_drop example, one could instantiate a Simulation object as follows:

>>> s = Simulation('outputs/sph', 'pysph run elliptical_drop')

One can pass any additional command line arguments as follows:

>>> s = Simulation(
...     'outputs/sph', 'pysph run elliptical_drop', timestep=0.005
... )
>>> s.command
'pysph run elliptical_drop --timestep=0.005'
>>> s.input_path('results.npz')
'outputs/sph/results.npz'

The extra parameters can be used to filter and compare different simulations. One can define additional plot methods for a particular subclass and use these to easily plot results for different cases.
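
For instance, a subclass might add a plot method that the problem's run method (or compare_runs, documented below) can call. This sketch assumes matplotlib, hypothetical keys inside the results.npz file shown above, and that get_labels returns a printable label:

import numpy as np
import matplotlib.pyplot as plt

from automan.automation import Simulation

class MySimulation(Simulation):
    def plot_error(self, **kw):
        # Load results written by the simulation command (hypothetical keys).
        data = np.load(self.input_path('results.npz'))
        plt.plot(data['t'], data['error'],
                 label=self.get_labels(['timestep']), **kw)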

One can also pass any additional parameters to the automan.jobs.Job class via the job_info kwarg so as to run the command suitably. For example:

>>> s = Simulation('outputs/sph', 'pysph run elliptical_drop',
...               job_info=dict(n_thread=4))

The object has other methods that are convenient when comparing plots. Along with compare_cases, filter_cases and filter_by_name, this is an extremely powerful way to automate and compare results.

command
data
get_command_line_args()[source]
get_labels(labels)[source]
input_path(*args)[source]

Given any path components relative to the simulation directory, return the absolute path.

kwargs_to_command_line(kwargs)[source]
render_parameter(param)[source]

Return string to be used for labels for given parameter.

class automan.automation.SolveProblem(problem, match='', force=False)[source]

Solves a particular Problem. This runs all the commands that the problem requires and then runs the problem instance’s run method.

The match argument is a string which, when provided, runs only the matching subset of the requirements for the problem.

The force argument specifies that the problem should be cleaned, so as to re-run any post-processing.

complete()[source]

Should return True/False indicating success of task.

If the task was just executed (in this invocation) but failed, raise an exception (any subclass of Exception), as this signals an error to the task execution engine.

If the task was executed in an earlier invocation of the automation, then just return True/False so as to be able to re-run the simulation.

output()[source]

Return list of output paths.

requires()[source]

Return iterable of tasks this task requires.

It is important to either return tasks that are idempotent or return the same instances each time, as this method is called repeatedly.

run(scheduler)[source]

Run the task, using the given scheduler.

Using the scheduler is optional but recommended for any long-running tasks. It is safe to raise an exception immediately when running the task, but for long-running tasks the exception will not matter and the complete method should signal any failure instead.

class automan.automation.Task[source]

Basic task to run. Subclass this to do whatever is needed.

This class is very similar to luigi’s Task class.

complete()[source]

Should return True/False indicating success of task.

If the task was just executed (in this invocation) but failed, raise an exception (any subclass of Exception), as this signals an error to the task execution engine.

If the task was executed in an earlier invocation of the automation, then just return True/False so as to be able to re-run the simulation.

output()[source]

Return list of output paths.

requires()[source]

Return iterable of tasks this task requires.

It is important to either return tasks that are idempotent or return the same instances each time, as this method is called repeatedly.

run(scheduler)[source]

Run the task, using the given scheduler.

Using the scheduler is optional but recommended for any long-running tasks. It is safe to raise an exception immediately when running the task, but for long-running tasks the exception will not matter and the complete method should signal any failure instead.

class automan.automation.TaskRunner(tasks, scheduler)[source]

Run given tasks using the given scheduler.

add_task(task)[source]
run(wait=5)[source]

Run the tasks that were given.

Wait for the given amount of time to poll for completed tasks.

Returns the number of tasks that had errors.
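
A minimal sketch of driving tasks manually is shown below; the worker configuration and command are illustrative, and the Automator normally sets this up for you:

>>> from automan.jobs import Scheduler
>>> from automan.automation import CommandTask, TaskRunner
>>> scheduler = Scheduler(root='.', worker_config=[dict(host='localhost')])
>>> task = CommandTask('python sim.py', output_dir='outputs/sim')
>>> runner = TaskRunner([task], scheduler)
>>> n_errors = runner.run(wait=5)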

class automan.automation.WrapperTask[source]

A task that wraps other tasks and is done when all its requirements are done.

complete()[source]

Should return True/False indicating success of task.

If the task was just executed (in this invocation) but failed, raise an exception (any subclass of Exception), as this signals an error to the task execution engine.

If the task was executed in an earlier invocation of the automation, then just return True/False so as to be able to re-run the simulation.

automan.automation.compare_runs(sims, method, labels, exact=None)[source]

Given a sequence of Simulation instances, a method name, the labels to compare and an optional method name for an exact solution, this calls the methods with the appropriate parameters for each simulation.

Parameters

sims: sequence
Sequence of Simulation objects.
method: str or callable
Name of a method to call on each simulation for plotting, or a callable which is passed the simulation instance and any kwargs.
labels: sequence
Sequence of parameters to use as labels for the plot.
exact: str or callable
Name of a method that produces an exact solution plot or a callable that will be called.
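
A hedged usage sketch, assuming sims is a sequence of Simulation instances whose class defines a plot_error method (as in the Simulation sketch above) and an exact_solution method:

>>> compare_runs(
...     sims, method='plot_error', labels=['timestep'],
...     exact='exact_solution'
... )
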
automan.automation.filter_by_name(cases, names)[source]

Filter a sequence of Simulations by their names. That is, if the case has a name contained in the given names, it will be selected.

automan.automation.filter_cases(runs, predicate=None, **params)[source]

Given a sequence of simulations and any additional parameters, select all the cases having exactly those parameters and return them as a list.

One may also pass a callable to filter the cases using the predicate keyword argument. If this is not a callable, it is treated as a parameter. If predicate is passed though, the other keyword arguments are ignored.
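
For example, assuming sims is a sequence of Simulation instances and that each has a name attribute (as used by filter_by_name above):

>>> slow = filter_cases(sims, timestep=0.005)
>>> # With a callable predicate, the other keyword arguments are ignored:
>>> sph_cases = filter_cases(sims, predicate=lambda s: 'sph' in s.name)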

automan.automation.key_to_option(key)[source]

Convert a dictionary key to a valid command line option. This simply replaces underscores with dashes.

automan.automation.kwargs_to_command_line(kwargs)[source]

Convert a dictionary of keyword arguments to a list of command-line options. If the value of the key is None, no value is passed.

Examples

>>> sorted(kwargs_to_command_line(dict(some_arg=1, something_else=None)))
['--some-arg=1', '--something-else']

automan.automation.linestyles()[source]

Cycles over a set of possible linestyles to use for plotting.

Low-level job management module

class automan.jobs.Job(command, output_dir, n_core=1, n_thread=1, env=None)[source]
clean(force=False)[source]
get_info()[source]
get_stderr()[source]
get_stdout()[source]
join()[source]
pretty_command()[source]
run()[source]
status()[source]
substitute_in_command(basename, substitute)[source]

Replace occurrence of given basename with the substitute.

This is useful where the user asks to run [‘python’, ‘script.py’] and we wish to change the ‘python’ to a specific Python. Normally this is not needed as the PATH is set to pick up the right Python. However, in the rare cases where this rewriting is needed, this method is available.
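
A hedged illustration (the interpreter path is hypothetical):

>>> job = Job(command=['python', 'script.py'], output_dir='outputs/case')
>>> job.substitute_in_command('python', '/opt/python3.9/bin/python')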

to_dict()[source]
class automan.jobs.JobProxy(worker, job_id, job)[source]
clean(force=False)[source]
copy_output(dest)[source]
free_cores()[source]
get_info()[source]
get_stderr()[source]
get_stdout()[source]
run()[source]
status()[source]
total_cores()[source]
class automan.jobs.LocalWorker[source]
clean(job_id, force=False)[source]
copy_output(job_id, dest)[source]
get_config()[source]
get_info(job_id)[source]
get_stderr(job_id)[source]
get_stdout(job_id)[source]
run(job)[source]

Runs the job and returns a JobProxy for the job.

status(job_id)[source]

Returns status of the job.

class automan.jobs.RemoteWorker(host, python, chdir=None, testing=False, nfs=False)[source]
clean(job_id, force=False)[source]
copy_output(job_id, dest)[source]
free_cores()[source]
get_config()[source]
get_info(job_id)[source]
get_stderr(job_id)[source]
get_stdout(job_id)[source]
run(job)[source]

Runs the job and returns a JobProxy for the job.

status(job_id)[source]

Returns status of the job.

total_cores()[source]
class automan.jobs.Scheduler(root='.', worker_config=(), wait=5)[source]
add_worker(conf)[source]
load(fname)[source]
save(fname)[source]
submit(job)[source]
class automan.jobs.Worker[source]
can_run(n_core)[source]

Returns True if the worker can run a job with the required cores.

clean(job_id, force=False)[source]
copy_output(job_id, dest)[source]
free_cores()[source]
get_info(job_id)[source]
get_stderr(job_id)[source]
get_stdout(job_id)[source]
run(job)[source]

Runs the job and returns a JobProxy for the job.

status(job_id)[source]

Returns status of the job.

total_cores()[source]
automan.jobs.free_cores()[source]
automan.jobs.serve(channel)[source]

Serve the remote manager via execnet.

automan.jobs.total_cores()[source]

Cluster management module

Code to bootstrap and update the project so a remote host can be used as a worker to help with the automation of tasks.

This requires ssh/scp and rsync to work on all machines.

This is currently only tested on Linux machines.

exception automan.cluster_manager.BootstrapError[source]
class automan.cluster_manager.ClusterManager(root='automan', sources=None, config_fname='config.json', exclude_paths=None, testing=False)[source]

The cluster manager class.

This class primarily helps setup software on a remote worker machine such that it can run any computational jobs from the automation framework.

The general directory structure of a remote worker machine is as follows:

remote_home/           # Could be ~
    automan/           # Root of automation directory (configurable)
        envs/          # python virtual environments for use.
        my_project/    # Current directory for specific projects.

The project directories are synced from this machine to the remote worker.

The “my_project” directory is the root of the directory with the automation script and should contain the required sources that need to be executed. One can use a list of source directories which will be copied over, but it is probably most convenient to put everything in the root of the project directory to keep things self-contained.

The ClusterManager class manages these remote workers by helping set up the directories, bootstrapping the Python virtualenv, and keeping these up-to-date as the project directory changes on the local machine.

The class therefore has two primary public methods:

  1. add_worker(self, host, home, nfs), which adds a new worker machine by bootstrapping it with the software and the appropriate source directories.
  2. update(), which keeps the directory and software up-to-date.

The class variables BOOTSTRAP and UPDATE are the content of scripts uploaded to these machines and should be extended by users to do what they wish.

The class creates a config.json in the current working directory that may be edited by a user. It also creates a directory called .{self.root} which defaults to .automan. The bootstrap and update scripts are put here and may be edited by the user for any new hosts.

One may override the _get_python, _get_helper_scripts, _get_bootstrap_code, and _get_update_code methods to change this to use other package managers like edm or conda. See the conda_cluster_manager for an example.
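
A brief sketch of typical use (the hostname and home directory are illustrative):

>>> from automan.cluster_manager import ClusterManager
>>> cm = ClusterManager()
>>> cm.add_worker('user@remote-host', home='/home/user', nfs=False)
>>> cm.update()
>>> scheduler = cm.create_scheduler()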

BOOTSTRAP = '#!/bin/bash\n\nset -e\nif hash virtualenv 2>/dev/null; then\n virtualenv --system-site-packages envs/{project_name}\nelse\n python virtualenv.py --system-site-packages envs/{project_name}\nfi\nsource envs/{project_name}/bin/activate\n\npip install automan\n\n# Run any requirements.txt from the user\ncd {project_name}\nif [ -f "requirements.txt" ] ; then\n pip install -r requirements.txt\nfi\n'
UPDATE = '#!/bin/bash\n\nset -e\nsource envs/{project_name}/bin/activate\n# Run any requirements.txt from the user\ncd {project_name}\nif [ -f "requirements.txt" ] ; then\n pip install -r requirements.txt\nfi\n'
add_worker(host, home, nfs)[source]
cli(argv=None)[source]

This is just a demonstration of how this class could be used.

create_scheduler()[source]

Return an automan.jobs.Scheduler from the configuration.

update(rebuild=True)[source]
class automan.conda_cluster_manager.CondaClusterManager(root='automan', sources=None, config_fname='config.json', exclude_paths=None, testing=False)[source]
BOOTSTRAP = '#!/bin/bash\n\nset -e\nCONDA_ROOT={conda_root}\nENV_FILE="{project_name}/environments.yml"\nif [ -f $ENV_FILE ] ; then\n ~/$CONDA_ROOT/bin/conda env create -q -f $ENV_FILE -n {project_name}\nelse\n ~/$CONDA_ROOT/bin/conda create -y -q -n {project_name} psutil execnet\nfi\n\nsource ~/$CONDA_ROOT/bin/activate {project_name}\npip install automan\n\ncd {project_name}\nif [ -f "requirements.txt" ] ; then\n pip install -r requirements.txt\nfi\n'
CONDA_ROOT = 'miniconda3'
UPDATE = '#!/bin/bash\n\nset -e\nCONDA_ROOT={conda_root}\nENV_FILE="{project_name}/environments.yml"\nif [ -f $ENV_FILE ] ; then\n ~/$CONDA_ROOT/bin/conda env update -q -f $ENV_FILE -n {project_name}\nfi\n\nsource ~/$CONDA_ROOT/bin/activate {project_name}\n\ncd {project_name}\nif [ -f "requirements.txt" ] ; then\n pip install -r requirements.txt\nfi\n'
class automan.edm_cluster_manager.EDMClusterManager(root='automan', sources=None, config_fname='config.json', exclude_paths=None, testing=False)[source]
BOOTSTRAP = '#!/bin/bash\n\nset -e\nENV_FILE="{project_name}/{env_file}"\n\nif hash edm 2>/dev/null; then\n EDM_EXE=edm\nelse\n EDM_EXE=~/{edm_root}/bin/edm\nfi\n\nif [ -f $ENV_FILE ] ; then\n $EDM_EXE -q envs import --force {project_name} -f $ENV_FILE\nelse\n $EDM_EXE -q envs create --force {project_name} --version 3.6\n $EDM_EXE -q install psutil execnet -y -e {project_name}\nfi\n\n$EDM_EXE run -e {project_name} -- pip install automan\n\ncd {project_name}\nif [ -f "requirements.txt" ] ; then\n $EDM_EXE run -e {project_name} -- pip install -r requirements.txt\nfi\n'
EDM_ROOT = '.edm'
ENV_FILE = 'bundled_env.json'
UPDATE = '#!/bin/bash\n\nset -e\nENV_FILE="{project_name}/{env_file}"\n\nif hash edm 2>/dev/null; then\n EDM_EXE=edm\nelse\n EDM_EXE=~/{edm_root}/bin/edm\nfi\n\nif [ -f $ENV_FILE ] ; then\n $EDM_EXE -q envs import --force {project_name} -f $ENV_FILE\nfi\n\ncd {project_name}\nif [ -f "requirements.txt" ] ; then\n $EDM_EXE run -e {project_name} -- pip install -r requirements.txt\nfi\n'