Main automation module

class automan.automation.Automator(simulation_dir, output_dir, all_problems, cluster_manager_factory=None)[source]

Main class to automate a collection of problems.

This processes command line options and runs all tasks with a scheduler that is configured using the config.json file if it is present. Here is typical usage:

>>> all_problems = [EllipticalDrop]
>>> automator = Automator('outputs', 'figures', all_problems)
>>> automator.run()

The class also creates an automan.cluster_manager.ClusterManager instance and integrates the cluster management features as well. This allows a user to automate their results across a collection of remote machines accessible only by ssh.

add_task(task, name=None, post_proc=False)[source]

Add a task or a problem instance to also execute.

If the name is specified, the task is treated as a named task, which is only run when it is explicitly requested on the command line.

If post_proc is True then the task is given an additional dependency if possible such that the task is run after the RunAll task is completed.

Parameters

task: Task or Problem instance

Task or Problem to add.

name: str

Name of the task (optional).

post_proc: bool

Add a dependency to the task with the RunAll task.

run(argv=None)[source]

Start the automation.

class automan.automation.CommandTask(command, output_dir, job_info=None, depends=None)[source]

Convenience class to run a command via the framework. The class provides a method to run the simulation and also check if the simulation is completed. The command should ideally produce all of its outputs inside an output directory that is specified.

clean()[source]

Clean out any generated results.

This completely removes the output directory.

complete()[source]

Should return True/False indicating success of task.

property job
output()[source]

Return list of output paths.

requires()[source]

Return iterable of tasks this task requires.

It is important that one either return tasks that are idempotent or return the same instance as this method is called repeatedly.

run(scheduler)[source]

Run the task, using the given scheduler.

Using the scheduler is optional but recommended for any long-running tasks. It is safe to raise an exception immediately when running the task, but for long-running tasks an exception raised here will not matter; the complete method should report the failure instead.

class automan.automation.FileCommandTask(command, files, job_info=None, depends=None)[source]

Convenience class to run a command which produces as output one or more files. The difference from the CommandTask is that this does not place its outputs in a separate directory.

clean()[source]

Clean out any generated results.

This completely removes the output directory.

output()[source]

Return list of output paths.

class automan.automation.Problem(simulation_dir, output_dir)[source]

This class represents a numerical problem or computational problem of interest that needs to be solved.

The class helps one run a variety of commands (or simulations), and then assemble/compare the results from those in the run method. This is perhaps most easily understood with an example. Let us say one wishes to run the elliptical drop example problem with the standard SPH and TVF and compare the results and their convergence properties while also keeping track of the computational time. To do this one will have to run several simulations, then collect and process the results. This is achieved by subclassing this class and implementing the following methods:

  • get_name(self): returns a string of the name of the problem. All results and simulations are collected inside a directory with this name.

  • get_commands(self): returns a sequence of (directory_name, command_string, job_info, depends) tuples. These are to be executed before the run method is called.

  • get_requires(self): returns a sequence of (name, task) tuples. These are to be executed before the run method is called.

  • run(self): Processes the completed simulations to make plots etc.

See the EllipticalDrop example class below to see a full implementation.
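The four methods listed above can be sketched roughly as follows. This is a minimal sketch rather than a full implementation: the small `Problem` base class here is a stand-in defined only so that the snippet runs without automan installed (a real script would import `Problem` from `automan.automation`), and the case names `sph` and `tvf`, the `--scheme=tvf` option and the `comparison.png` file name are assumptions made for illustration.

```python
import os


class Problem:
    """Stand-in for automan.automation.Problem (an assumption, not the real class)."""

    def __init__(self, simulation_dir, output_dir):
        self.sim_dir = simulation_dir
        self.out_dir = output_dir

    def get_name(self):
        raise NotImplementedError

    def output_path(self, *args):
        # In this sketch, outputs live under <output_dir>/<problem name>/...
        return os.path.join(self.out_dir, self.get_name(), *args)


class EllipticalDrop(Problem):
    def get_name(self):
        return 'elliptical_drop'

    def get_commands(self):
        # (name, command_string, job_info_dict) tuples; job_info may be None.
        base = 'pysph run elliptical_drop'
        return [
            ('sph', base, None),
            ('tvf', base + ' --scheme=tvf', None),  # hypothetical option
        ]

    def get_requires(self):
        # No additional tasks are needed before run() in this sketch.
        return []

    def run(self):
        # Post-processing would go here; the sketch only returns the target path.
        return self.output_path('comparison.png')


problem = EllipticalDrop('outputs', 'figures')
print(problem.get_name())
print([name for name, command, job_info in problem.get_commands()])
```

Each tuple returned by get_commands names one simulation, and that name doubles as the subdirectory in which its output is collected.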

clean()[source]

Cleanup any generated output from the analysis code. This does not clean the output of any nested commands.

get_commands()[source]

Return a sequence of (name, command_string, job_info_dict) or (name, command_string, job_info_dict, depends).

The name represents the command being run and is used as a subdirectory for generated output.

The command_string is the command that needs to be run.

The job_info_dict is a dictionary with any additional info to be used by the job; these are additional arguments to the automan.jobs.Job class. It may be None if nothing special need be passed.

The depends is any dependencies this simulation has in terms of other simulations/tasks.

get_name()[source]

Return the name of this problem. This name is used as a directory for the simulation and the outputs.

get_outputs()[source]

Get a list of outputs generated by this problem. By default it returns the output directory (as a single element of a list).

get_requires()[source]

Return a sequence of tuples of form (name, task).

The name represents the command being run and is used as a subdirectory for generated output.

The task is an automan.automation.Task instance.

input_path(*args)[source]

Given any arguments, relative to the simulation dir, return the absolute path.

make_output_dir()[source]

Convenience to make the output directory if needed.

output_path(*args)[source]

Given any arguments relative to the output_dir return the absolute path.

run()[source]

Run any analysis code for the simulations completed. This is usually run after the simulation commands are completed.

setup()[source]

Called by init, so add any initialization here.

simulation_path(*args)

Given any arguments, relative to the simulation dir, return the absolute path.

task_cls

alias of automan.automation.CommandTask

class automan.automation.PySPHProblem(simulation_dir, output_dir)[source]
task_cls

alias of automan.automation.PySPHTask

class automan.automation.PySPHTask(command, output_dir, job_info=None, depends=None)[source]

Convenience class to run a PySPH simulation via an automation framework.

This task automatically adds the output directory specification for pysph so users do not need to add it.

class automan.automation.RunAll(simulation_dir, output_dir, problem_classes, force=False, match='', depends=None)[source]

Solves a given collection of problems.

requires()[source]

Return iterable of tasks this task requires.

It is important that one either return tasks that are idempotent or return the same instance as this method is called repeatedly.

class automan.automation.Simulation(root, base_command, job_info=None, depends=None, **kw)[source]

A convenient class to abstract code for a particular simulation. Simulation objects are typically created by Problem instances in order to abstract and simulate repetitive code for a particular simulation.

For example if one were comparing the elliptical_drop example, one could instantiate a Simulation object as follows:

>>> s = Simulation('outputs/sph', 'pysph run elliptical_drop')

One can pass any additional command line arguments as follows:

>>> s = Simulation(
...     'outputs/sph', 'pysph run elliptical_drop', timestep=0.005
... )
>>> s.command
'pysph run elliptical_drop --timestep=0.005'
>>> s.input_path('results.npz')
'outputs/sph/results.npz'

The extra parameters can be used to filter and compare different simulations. One can define additional plot methods for a particular subclass and use these to easily plot results for different cases.

One can also pass any additional parameters to the automan.jobs.Job class via the job_info kwarg so as to run the command suitably. For example:

>>> s = Simulation('outputs/sph', 'pysph run elliptical_drop',
...               job_info=dict(n_thread=4))

The object has other methods that are convenient when comparing plots. Together with compare_runs, filter_cases and filter_by_name, this is an extremely powerful way to automate and compare results.

property command
property data
get_command_line_args()[source]
get_labels(labels)[source]
input_path(*args)[source]

Given any arguments, relative to the simulation dir, return the absolute path.

kwargs_to_command_line(kwargs)[source]
render_parameter(param)[source]

Return string to be used for labels for given parameter.

class automan.automation.SolveProblem(problem, match='', force=False, depends=None)[source]

Solves a particular Problem. This runs all the commands that the problem requires and then runs the problem instance’s run method.

The match argument is a string which when provided helps run only a subset of the requirements for the problem.

The force argument specifies that the problem should be cleaned, so as to re-run any post-processing.

complete()[source]

Should return True/False indicating success of task.

If the task was just executed (in this invocation) but failed, raise any Exception that is a subclass of Exception as this signals an error to the task execution engine.

If the task was executed in an earlier invocation of the automation, then just return True/False so as to be able to re-run the simulation.

output()[source]

Return list of output paths.

requires()[source]

Return iterable of tasks this task requires.

It is important that one either return tasks that are idempotent or return the same instance as this method is called repeatedly.

run(scheduler)[source]

Run the task, using the given scheduler.

Using the scheduler is optional but recommended for any long-running tasks. It is safe to raise an exception immediately when running the task, but for long-running tasks an exception raised here will not matter; the complete method should report the failure instead.

class automan.automation.Task(depends=None)[source]

Basic task to run. Subclass this to do whatever is needed.

This class is very similar to luigi’s Task class.

complete()[source]

Should return True/False indicating success of task.

If the task was just executed (in this invocation) but failed, raise any Exception that is a subclass of Exception as this signals an error to the task execution engine.

If the task was executed in an earlier invocation of the automation, then just return True/False so as to be able to re-run the simulation.

output()[source]

Return list of output paths.

requires()[source]

Return iterable of tasks this task requires.

It is important that one either return tasks that are idempotent or return the same instance as this method is called repeatedly.

run(scheduler)[source]

Run the task, using the given scheduler.

Using the scheduler is optional but recommended for any long-running tasks. It is safe to raise an exception immediately when running the task, but for long-running tasks an exception raised here will not matter; the complete method should report the failure instead.
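To illustrate how the four methods fit together, here is a minimal sketch that does not use automan itself. `MiniTask` and `run_with_requirements` are hypothetical names; the real task runner also polls for completion and handles errors, which this sketch leaves out.

```python
class MiniTask:
    """Hypothetical task exposing the same four methods as described above."""

    def __init__(self, name, depends=None, log=None):
        self.name = name
        self.depends = depends if depends is not None else []
        self.log = log if log is not None else []
        self._done = False

    def complete(self):
        return self._done

    def output(self):
        return []

    def requires(self):
        # The same instances are returned on every call.
        return self.depends

    def run(self, scheduler=None):
        # The scheduler is unused here; a long-running task would submit a job.
        self.log.append(self.name)
        self._done = True


def run_with_requirements(task):
    """Run the requirements of a task depth-first, then the task itself."""
    for dep in task.requires():
        run_with_requirements(dep)
    if not task.complete():
        task.run()


log = []
simulate = MiniTask('simulate', log=log)
plot = MiniTask('plot', depends=[simulate], log=log)
report = MiniTask('report', depends=[simulate, plot], log=log)
run_with_requirements(report)
print(log)  # ['simulate', 'plot', 'report']
```

Because requires returns the same instances each time, the shared `simulate` task is run only once even though two other tasks depend on it.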

class automan.automation.TaskRunner(tasks, scheduler)[source]

Run given tasks using the given scheduler.

add_task(task)[source]
run(wait=5)[source]

Run the tasks that were given.

Wait for the given amount of time to poll for completed tasks.

Returns the number of tasks that had errors.

class automan.automation.WrapperTask(depends=None)[source]

A task that wraps other tasks and is done when all its requirements are done.

complete()[source]

Should return True/False indicating success of task.

If the task was just executed (in this invocation) but failed, raise any Exception that is a subclass of Exception as this signals an error to the task execution engine.

If the task was executed in an earlier invocation of the automation, then just return True/False so as to be able to re-run the simulation.
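The idea of a task that is done when all of its requirements are done can be sketched as below. `Leaf` and `MiniWrapper` are hypothetical classes written for this illustration and are not part of automan.

```python
class Leaf:
    """Hypothetical task whose completion flag is set by hand."""

    def __init__(self, done=False):
        self.done = done

    def complete(self):
        return self.done


class MiniWrapper:
    """Hypothetical wrapper: complete only when every requirement is complete."""

    def __init__(self, depends=None):
        self.depends = list(depends or [])

    def requires(self):
        return self.depends

    def complete(self):
        return all(task.complete() for task in self.requires())


a, b = Leaf(done=True), Leaf(done=False)
wrapper = MiniWrapper(depends=[a, b])
before = wrapper.complete()  # one requirement is still pending
b.done = True
after = wrapper.complete()   # every requirement has finished
print(before, after)
```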

automan.automation.key_to_option(key)[source]

Convert a dictionary key to a valid command line option. This simply replaces underscores with dashes.

automan.automation.kwargs_to_command_line(kwargs)[source]

Convert a dictionary of keyword arguments to a list of command-line options. If the value of the key is None, no value is passed.

Examples

>>> sorted(kwargs_to_command_line(dict(some_arg=1, something_else=None)))
['--some-arg=1', '--something-else']
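The behaviour documented for key_to_option and kwargs_to_command_line can be reproduced with a few lines of plain Python. The functions below are a stand-alone sketch with `_sketch` suffixes to make clear that they are not the library's own code; they are only checked against the example above.

```python
def key_to_option_sketch(key):
    # Underscores become dashes, as documented.
    return key.replace('_', '-')


def kwargs_to_command_line_sketch(kwargs):
    options = []
    for key, value in kwargs.items():
        option = '--' + key_to_option_sketch(key)
        if value is None:
            # A None value produces a bare flag with no "=value" part.
            options.append(option)
        else:
            options.append('%s=%s' % (option, value))
    return options


opts = kwargs_to_command_line_sketch(dict(some_arg=1, something_else=None))
print(sorted(opts))

# Building a full command string from a base command and keyword arguments.
command = ' '.join(
    ['pysph run elliptical_drop']
    + kwargs_to_command_line_sketch(dict(timestep=0.005))
)
print(command)
```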

Utility functions for automation

Utility functions for automation scripts.

automan.utils.compare_runs(sims, method, labels, exact=None, styles=<function styles>)[source]

Given a sequence of Simulation instances, a method name, the labels to compare and an optional method name for an exact solution, this calls the methods with the appropriate parameters for each simulation.

Parameters

sims: sequence

Sequence of Simulation objects.

method: str or callable

Name of a method on each simulation to call for plotting, or a callable which is passed the simulation instance and any kwargs.

labels: sequence

Sequence of parameters to use as labels for the plot.

exact: str or callable

Name of a method that produces an exact solution plot or a callable that will be called.

styles: callable

Returns an iterator/iterable of style keyword arguments. Defaults to the styles function defined in this module.

automan.utils.dprod(a, b)[source]

Multiplies the given lists of dictionaries a and b.

This makes a list of new dictionaries which is the product of the two given lists of dictionaries.

Example

>>> dprod(mdict(a=[1, 2], b=['xy']), mdict(c='ab'))
[{'a': 1, 'b': 'xy', 'c': 'a'},
 {'a': 1, 'b': 'xy', 'c': 'b'},
 {'a': 2, 'b': 'xy', 'c': 'a'},
 {'a': 2, 'b': 'xy', 'c': 'b'}]
automan.utils.filter_by_name(cases, names)[source]

Filter a sequence of Simulations by their names. That is, if the case has a name contained in the given names, it will be selected.

automan.utils.filter_cases(runs, predicate=None, **params)[source]

Given a sequence of simulations and any additional parameters, filter out all the cases having exactly those parameters and return a list of them.

One may also pass a callable to filter the cases using the predicate keyword argument. If this is not a callable, it is treated as a parameter. If predicate is passed though, the other keyword arguments are ignored.
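The filtering rules described above can be sketched without automan as follows. The `Case` class is a hypothetical stand-in for a Simulation, and it is an assumption of this sketch that the parameters of each case are available as a `params` dictionary.

```python
class Case:
    """Hypothetical stand-in for a Simulation; only `params` is assumed."""

    def __init__(self, name, **params):
        self.name = name
        self.params = params


def filter_cases_sketch(runs, predicate=None, **params):
    if callable(predicate):
        # A callable predicate takes precedence; other kwargs are ignored.
        return [run for run in runs if predicate(run)]
    if predicate is not None:
        # A non-callable `predicate` is treated as one more parameter.
        params['predicate'] = predicate
    return [
        run for run in runs
        if all(k in run.params and run.params[k] == v
               for k, v in params.items())
    ]


cases = [
    Case('a', nx=50, scheme='sph'),
    Case('b', nx=100, scheme='sph'),
    Case('c', nx=100, scheme='tvf'),
]
by_params = filter_cases_sketch(cases, nx=100, scheme='sph')
by_predicate = filter_cases_sketch(
    cases, predicate=lambda c: c.params['nx'] > 50, scheme='sph'
)
print([c.name for c in by_params])
print([c.name for c in by_predicate])
```

In the second call the `scheme` keyword has no effect because a callable predicate was given.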

automan.utils.mdict(**kw)[source]

Expands out the passed kwargs into a list of dictionaries.

Each kwarg value is expected to be a sequence. The resulting list of dictionaries is the product of the different values and the same keys.

Example

>>> mdict(a=[1, 2], b='xy')
[{'a': 1, 'b': 'x'},
 {'a': 1, 'b': 'y'},
 {'a': 2, 'b': 'x'},
 {'a': 2, 'b': 'y'}]
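Both mdict and dprod amount to Cartesian products, which itertools.product expresses directly. The sketch below is a stand-alone illustration with `_sketch` suffixes, not the library's implementation; it is only checked against the two documented examples.

```python
from itertools import product


def mdict_sketch(**kw):
    # One dictionary per combination of the values given for each key.
    keys = list(kw)
    return [dict(zip(keys, values)) for values in product(*kw.values())]


def dprod_sketch(a, b):
    # Merge every dictionary in `a` with every dictionary in `b`.
    return [{**x, **y} for x, y in product(a, b)]


grid = mdict_sketch(a=[1, 2], b='xy')
combined = dprod_sketch(mdict_sketch(a=[1, 2], b=['xy']), mdict_sketch(c='ab'))
print(grid)
print(combined)
```

Note that a string value such as 'xy' is itself a sequence, so it is expanded character by character unless it is wrapped in a list.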
automan.utils.opts2path(opts, keys=None, ignore=None, kmap=None)[source]

Renders the given options as a path name.

Parameters

opts: dict

dictionary of options

keys: list

Keys of the options to use.

ignore: list

Ignore these keys in the options.

kmap: dict

map the key names through this dict.

Examples

>>> opts2path(dict(x=1, y='hello', z=0.1))
'x_1_hello_z_0.1'
>>> opts2path(dict(x=1, y='hello', z=0.1), keys=['x'])
'x_1'
>>> opts2path(dict(x=1, y='hello', z=0.1), ignore=['x'])
'hello_z_0.1'
>>> opts2path(dict(x=1, y='hello', z=0.1), kmap=dict(x='XX'))
'XX_1_hello_z_0.1'
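One set of rules that is consistent with the four examples above is sketched below. This is an inference from the examples only and not the library's code: it assumes that keys are visited in sorted order, that string values are rendered without their key, and that all other values are rendered as key_value after mapping the key through kmap.

```python
def opts2path_sketch(opts, keys=None, ignore=None, kmap=None):
    ignore = set(ignore or [])
    kmap = kmap or {}
    selected = opts.keys() if keys is None else keys
    parts = []
    for key in sorted(selected):  # sorted order is an assumption
        if key in ignore or key not in opts:
            continue
        value = opts[key]
        if isinstance(value, str):
            # Assumed rule: strings appear bare, without the key name.
            parts.append(value)
        else:
            parts.append('%s_%s' % (kmap.get(key, key), value))
    return '_'.join(parts)


options = dict(x=1, y='hello', z=0.1)
print(opts2path_sketch(options))
print(opts2path_sketch(options, keys=['x']))
print(opts2path_sketch(options, ignore=['x']))
print(opts2path_sketch(options, kmap=dict(x='XX')))
```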
automan.utils.styles(sims)[source]

Cycles over a set of possible styles to use for plotting.

The method is passed a sequence of the Simulation instances. This should return an iterator which produces a dictionary each time containing a set of keyword arguments to be used for a particular plot.

Parameters

sims: sequence

Sequence of Simulation objects.

Returns

An iterator which produces a dictionary containing a set of kwargs to be used for the plotting. Can also return an iterable containing dictionaries.
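A custom callable with the same contract might look like the sketch below. The name `my_styles` is hypothetical, and the `linestyle` and `color` keys assume that the plotting methods forward these keyword arguments to matplotlib; neither is confirmed by the text above.

```python
from itertools import cycle


def my_styles(sims):
    """Hypothetical replacement for the default styles callable."""
    linestyles = ['-', '--', '-.', ':']
    colors = ['k', 'b', 'r']
    # One dictionary per simulation; cycle() lets the short lists repeat.
    return [
        dict(linestyle=ls, color=c)
        for _, ls, c in zip(sims, cycle(linestyles), cycle(colors))
    ]


# Any sequence works for this demonstration; real code passes Simulation objects.
style_kwargs = my_styles(['sim0', 'sim1', 'sim2', 'sim3', 'sim4'])
for kw in style_kwargs:
    print(kw)
```

Returning a list is acceptable here because the callable may return either an iterator or an iterable of dictionaries.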

Low-level job management module

class automan.jobs.Job(command, output_dir, n_core=1, n_thread=1, env=None)[source]
clean(force=False)[source]
get_info()[source]
get_stderr()[source]
get_stdout()[source]
join()[source]
pretty_command()[source]
run()[source]
status()[source]
substitute_in_command(basename, substitute)[source]

Replace occurrence of given basename with the substitute.

This is useful where the user asks to run [‘python’, ‘script.py’] and we wish to change the ‘python’ to a specific Python. Normally this is not needed as the PATH is set to pick up the right Python. However, in the rare cases where this rewriting is needed, this method is available.
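The rewriting described above can be sketched as a function over a command given as a list of arguments. This stand-alone version is an assumption about the behaviour, not the method's actual code: it compares the basename of each argument with the given name, and the interpreter path used in the example is hypothetical.

```python
import os


def substitute_in_command_sketch(command, basename, substitute):
    # Replace any argument whose basename matches; leave the rest untouched.
    return [
        substitute if os.path.basename(arg) == basename else arg
        for arg in command
    ]


new_command = substitute_in_command_sketch(
    ['python', 'script.py'], 'python', '/opt/venv/bin/python'
)
print(new_command)
```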

to_dict()[source]
class automan.jobs.JobProxy(worker, job_id, job)[source]
clean(force=False)[source]
copy_output(dest)[source]
free_cores()[source]
get_info()[source]
get_stderr()[source]
get_stdout()[source]
run()[source]
status()[source]
total_cores()[source]
class automan.jobs.LocalWorker[source]
clean(job_id, force=False)[source]
copy_output(job_id, dest)[source]
get_config()[source]
get_info(job_id)[source]
get_stderr(job_id)[source]
get_stdout(job_id)[source]
run(job)[source]

Runs the job and returns a JobProxy for the job.

status(job_id)[source]

Returns status of the job.

class automan.jobs.RemoteWorker(host, python, chdir=None, testing=False, nfs=False)[source]
clean(job_id, force=False)[source]
copy_output(job_id, dest)[source]
free_cores()[source]
get_config()[source]
get_info(job_id)[source]
get_stderr(job_id)[source]
get_stdout(job_id)[source]
run(job)[source]

Runs the job and returns a JobProxy for the job.

status(job_id)[source]

Returns status of the job.

total_cores()[source]
class automan.jobs.Scheduler(root='.', worker_config=(), wait=5)[source]
add_worker(conf)[source]
load(fname)[source]
save(fname)[source]
submit(job)[source]
class automan.jobs.Worker[source]
can_run(req_core)[source]

Returns True if the worker can run a job with the required cores.

clean(job_id, force=False)[source]
copy_output(job_id, dest)[source]
cores_required(n_core)[source]
free_cores()[source]
get_info(job_id)[source]
get_stderr(job_id)[source]
get_stdout(job_id)[source]
run(job)[source]

Runs the job and returns a JobProxy for the job.

status(job_id)[source]

Returns status of the job.

total_cores()[source]
automan.jobs.cores_required(n_core)[source]
automan.jobs.free_cores()[source]
automan.jobs.serve(channel)[source]

Serve the remote manager via execnet.

automan.jobs.threads_required(n_thread, n_core)[source]
automan.jobs.total_cores()[source]

Cluster management module

Code to bootstrap and update the project so a remote host can be used as a worker to help with the automation of tasks.

This requires ssh/scp and rsync to work on all machines.

This is currently only tested on Linux machines.

exception automan.cluster_manager.BootstrapError[source]
class automan.cluster_manager.ClusterManager(root='automan', sources=None, config_fname='config.json', exclude_paths=None, testing=False)[source]

The cluster manager class.

This class primarily helps set up software on a remote worker machine such that it can run any computational jobs from the automation framework.

The general directory structure of a remote worker machine is as follows:

remote_home/           # Could be ~
    automan/           # Root of automation directory (configurable)
        envs/          # python virtual environments for use.
        my_project/    # Current directory for specific projects.

The project directories are synced from this machine to the remote worker.

The “my_project” is the root of the directory with the automation script and this should contain the required sources that need to be executed. One can use a list of source directories which will be copied over but it is probably most convenient to put it all in the root of the project directory to keep everything self-contained.

The ClusterManager class manages these remote workers by helping set up the directories, bootstrapping the Python virtualenv and also keeping these up-to-date as the project directory changes on the local machine.

The class therefore has two primary public methods,

  1. add_worker(self, host, home, nfs) which adds a new worker machine by bootstrapping the machine with the software and the appropriate source directories.

  2. update(), which keeps the directory and software up-to-date.

The class variables BOOTSTRAP and UPDATE are the content of scripts uploaded to these machines and should be extended by users to do what they wish.

The class creates a config.json in the current working directory that may be edited by a user. It also creates a directory called .{self.root} which defaults to .automan. The bootstrap and update scripts are put here and may be edited by the user for any new hosts.

One may override the _get_python, _get_helper_scripts, _get_bootstrap_code and _get_update_code methods to change this to use other package managers like edm or conda. See the conda_cluster_manager for an example.
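The BOOTSTRAP and UPDATE scripts are templates containing a {project_name} placeholder. The sketch below shows how such a template could be extended and filled in, assuming the placeholder is substituted with str.format-style formatting. The template is a shortened copy of the documented UPDATE value, and the extra build step appended to it is a hypothetical user extension.

```python
UPDATE_TEMPLATE = (
    '#!/bin/bash\n\n'
    'set -e\n'
    'source envs/{project_name}/bin/activate\n'
    'cd {project_name}\n'
    'if [ -f "requirements.txt" ] ; then\n'
    '    pip install -r requirements.txt\n'
    'fi\n'
)

# Hypothetical extension: run an extra build step after installing requirements.
EXTENDED_UPDATE = UPDATE_TEMPLATE + 'python setup.py build_ext --inplace\n'

script = EXTENDED_UPDATE.format(project_name='my_project')
print(script)
```

If formatting is indeed done this way, any literal braces added to a template would need to be doubled so that they are not treated as placeholders.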

BOOTSTRAP = '#!/bin/bash\n\nset -e\nif hash virtualenv 2>/dev/null; then\n    virtualenv -p python3 --system-site-packages envs/{project_name}\nelse\n    python3 virtualenv.py --system-site-packages envs/{project_name}\nfi\nsource envs/{project_name}/bin/activate\n\npip install automan\n\n# Run any requirements.txt from the user\ncd {project_name}\nif [ -f "requirements.txt" ] ; then\n    pip install -r requirements.txt\nfi\n'
UPDATE = '#!/bin/bash\n\nset -e\nsource envs/{project_name}/bin/activate\n# Run any requirements.txt from the user\ncd {project_name}\nif [ -f "requirements.txt" ] ; then\n    pip install -r requirements.txt\nfi\n'
add_worker(host, home, nfs)[source]
cli(argv=None)[source]

This is just a demonstration of how this class could be used.

create_scheduler()[source]

Return an automan.jobs.Scheduler from the configuration.

delete(sim_dir, remotes)[source]
update(rebuild=True)[source]
class automan.conda_cluster_manager.CondaClusterManager(root='automan', sources=None, config_fname='config.json', exclude_paths=None, testing=False)[source]
BOOTSTRAP = '#!/bin/bash\n\nset -e\nCONDA_ROOT={conda_root}\nENV_FILE="{project_name}/environments.yml"\nif [ -f $ENV_FILE ] ; then\n    ~/$CONDA_ROOT/bin/conda env create -q -f $ENV_FILE -n {project_name}\nelse\n    ~/$CONDA_ROOT/bin/conda create -y -q -n {project_name} psutil execnet\nfi\n\nsource ~/$CONDA_ROOT/bin/activate {project_name}\npip install automan\n\ncd {project_name}\nif [ -f "requirements.txt" ] ; then\n    pip install -r requirements.txt\nfi\n'
CONDA_ROOT = 'miniconda3'
UPDATE = '#!/bin/bash\n\nset -e\nCONDA_ROOT={conda_root}\nENV_FILE="{project_name}/environments.yml"\nif [ -f $ENV_FILE ] ; then\n    ~/$CONDA_ROOT/bin/conda env update -q -f $ENV_FILE -n {project_name}\nfi\n\nsource ~/$CONDA_ROOT/bin/activate {project_name}\n\ncd {project_name}\nif [ -f "requirements.txt" ] ; then\n    pip install -r requirements.txt\nfi\n'
class automan.edm_cluster_manager.EDMClusterManager(root='automan', sources=None, config_fname='config.json', exclude_paths=None, testing=False)[source]
BOOTSTRAP = '#!/bin/bash\n\nset -e\nENV_FILE="{project_name}/{env_file}"\n\nif hash edm 2>/dev/null; then\n    EDM_EXE=edm\nelse\n    EDM_EXE=~/{edm_root}/bin/edm\nfi\n\nif [ -f $ENV_FILE ] ; then\n    $EDM_EXE -q envs import --force {project_name} -f $ENV_FILE\nelse\n    $EDM_EXE -q envs create --force {project_name} --version 3.6\n    $EDM_EXE -q install psutil execnet -y -e {project_name}\nfi\n\n$EDM_EXE run -e {project_name} -- pip install automan\n\ncd {project_name}\nif [ -f "requirements.txt" ] ; then\n    $EDM_EXE run -e {project_name} -- pip install -r requirements.txt\nfi\n'
EDM_ROOT = '.edm'
ENV_FILE = 'bundled_env.json'
UPDATE = '#!/bin/bash\n\nset -e\nENV_FILE="{project_name}/{env_file}"\n\nif hash edm 2>/dev/null; then\n    EDM_EXE=edm\nelse\n    EDM_EXE=~/{edm_root}/bin/edm\nfi\n\nif [ -f $ENV_FILE ] ; then\n    $EDM_EXE -q envs import --force {project_name} -f $ENV_FILE\nfi\n\ncd {project_name}\nif [ -f "requirements.txt" ] ; then\n    $EDM_EXE run -e {project_name} -- pip install -r requirements.txt\nfi\n'