#!/usr/bin/env python
# coding: utf-8
"""Batch-launch a set of OAR jobs described in a YAML file.

The YAML file provides:
  * ``variables``  -- mapping of variable name -> list of values; the cartesian
    product of all variables is expanded, one job per combination
    (``%name%`` placeholders in job strings).
  * ``constants``  -- mapping substituted verbatim (``$name$`` placeholders).
  * ``flags``      -- mapping of command flag -> argparse attribute name; the
    flag text is substituted (``@flag@`` placeholders) only when the
    corresponding command-line switch was given.
  * ``jobs``       -- mapping of job name template -> command template.
  * ``environment.precommand`` (optional) -- string prepended to every command.

Jobs are submitted through :mod:`oargen`; when ``--max-jobs`` is positive, at
most that many jobs run concurrently, each new job being chained (``anterior``)
after a previously submitted one.
"""

import argparse
import itertools
import logging
import subprocess  # noqa: F401 -- kept for backward compatibility
from collections import deque

import yaml

from . import oargen


def argparser():
    """Parse command-line arguments and configure the root logger.

    Returns:
        argparse.Namespace: the parsed arguments.

    Raises:
        ValueError: if ``--logger`` names an unknown logging level.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('job_file',
                        help="YAML file which contains all the jobs to launch")
    parser.add_argument('destination',
                        help="New directory to create used to store outputs")
    parser.add_argument('-b', '--besteffort', action="store_true",
                        help="Launch job in besteffort mode")
    parser.add_argument('-t', '--time', default="10",
                        help="Default maximum duration of the jobs "
                             "(format: h[:m[:s]])")
    parser.add_argument('-g', '--gpu', action="store_true",
                        help="If True, reserves only cores with GPUs")
    parser.add_argument('-c', '--core', default=1, type=int,
                        help="Number of cores to reserve")
    parser.add_argument('-H', '--host',
                        help="Name of the host (SQL LIKE syntax accepted)")
    parser.add_argument('-i', '--interactive', action="store_true",
                        help="Launch job in interactive mode")
    parser.add_argument('-C', '--checkpoint', type=int, metavar="SECONDS",
                        help="Enable checkpoint signals with the given delay "
                             "(in seconds)")
    parser.add_argument('-j', '--max-jobs', type=int, default=-1)
    parser.add_argument('--print-commands', action="store_true",
                        help="Print each individual command")
    parser.add_argument('-r', '--run', action="store_true",
                        help="Run the command")
    parser.add_argument('-l', '--logger', default='INFO',
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR',
                                 'CRITICAL'],
                        help="Logging level: DEBUG, INFO (default), "
                             "WARNING, ERROR")
    args = parser.parse_args()

    numeric_level = getattr(logging, args.logger.upper(), None)
    if not isinstance(numeric_level, int):
        # NOTE: original source had a line break inside this string literal,
        # which is a syntax error; message is now on one line.
        raise ValueError("Invalid log level: {}".format(args.logger))
    logging.basicConfig(level=numeric_level)

    return args


def load_variables(yaml_variables):
    """Expand the ``variables`` section into names and value combinations.

    Args:
        yaml_variables: mapping of variable name -> iterable of values.

    Returns:
        tuple: ``(variable_names, combinations)`` where ``combinations`` is a
        *list* of tuples (the cartesian product of all value lists), aligned
        with ``variable_names``.
    """
    variable_names = []
    variables = []
    for name, values in yaml_variables.items():
        variable_names.append(name)
        variables.append(values)

    # BUGFIX: materialize the product.  itertools.product returns a one-shot
    # iterator; load_jobs() re-uses the result for every job template, so with
    # a bare iterator every job after the first saw an empty expansion.
    return variable_names, list(itertools.product(*variables))


def load_flags(yaml_flags, args):
    """Resolve the ``flags`` section against the parsed command-line options.

    Args:
        yaml_flags: mapping of command flag text -> argparse attribute name.
        args: parsed :class:`argparse.Namespace`.

    Returns:
        dict: command flag -> flag text when the option was set, else ``""``.
    """
    flags = {}
    for command_flag, oar_flag in yaml_flags.items():
        if getattr(args, oar_flag):
            flags[command_flag] = command_flag
        else:
            flags[command_flag] = ""
    return flags


def load_job(job_name, job, var_names, var_tuples, constants, flags,
             precommand):
    """Instantiate one job template into concrete (name, command) pairs.

    Substitution order: ``$constant$`` placeholders, then ``@flag@``
    placeholders, then one ``%variable%`` expansion per tuple in
    ``var_tuples``.

    Args:
        job_name: job name template.
        job: command template.
        var_names: list of variable names.
        var_tuples: iterable of value tuples aligned with ``var_names``
            (must be re-iterable if the caller reuses it).
        constants: mapping substituted into name and command.
        flags: mapping as produced by :func:`load_flags`.
        precommand: string prepended to the command when non-empty.

    Returns:
        list[tuple[str, str]]: one ``(name, command)`` pair per combination.
    """
    for name, value in constants.items():
        job = job.replace(f"${name}$", str(value))
        job_name = job_name.replace(f"${name}$", str(value))
    for flag, str_flag in flags.items():
        job = job.replace(f"@{flag}@", str_flag)
        job_name = job_name.replace(f"@{flag}@", str_flag)

    if precommand:
        job = f"{precommand} {job}"

    jobs = []
    for var_tuple in var_tuples:
        new_job = job
        new_job_name = job_name
        for var_name, var in zip(var_names, var_tuple):
            new_job = new_job.replace(f"%{var_name}%", str(var))
            new_job_name = new_job_name.replace(f"%{var_name}%", str(var))
        jobs.append((new_job_name, new_job))
    return jobs


def load_jobs(yaml_in, args):
    """Read the YAML job description and expand every job template.

    Args:
        yaml_in: open file object containing the YAML description.
        args: parsed :class:`argparse.Namespace` (consulted for flags).

    Returns:
        list[tuple[str, str]]: all expanded ``(job_name, command)`` pairs.
    """
    # BUGFIX: yaml.load() without an explicit Loader is a TypeError on
    # PyYAML >= 6.0 and unsafe on earlier versions; the job file is plain
    # data, so safe_load is the correct call.
    yaml_jobs = yaml.safe_load(yaml_in)

    var_names, var_tuples = load_variables(yaml_jobs["variables"])
    constants = yaml_jobs["constants"]
    flags = load_flags(yaml_jobs["flags"], args)
    try:
        precommand = yaml_jobs["environment"]["precommand"]
    except KeyError:
        precommand = ""

    jobs = []
    for job_name, job in yaml_jobs["jobs"].items():
        jobs.extend(load_job(job_name, job, var_names, var_tuples,
                             constants, flags, precommand))

    return jobs


def extract_job_id(cmd_output):
    """Extract the job id from oarsub's output.

    Args:
        cmd_output: full textual output of the ``oarsub`` command.

    Returns:
        str or None: the value after ``OAR_JOB_ID=``, or ``None`` when no
        such line is present (submission failure).
    """
    for line in cmd_output.splitlines():
        if line.startswith("OAR_JOB_ID="):
            # maxsplit=1 keeps the full id even if it ever contains '='.
            return line.split('=', 1)[1]
    return None


def main():
    args = argparser()

    with open(args.job_file) as fin:
        jobs = load_jobs(fin, args)

    # Ids of already-submitted jobs, newest on the left.  When --max-jobs is
    # reached, the new job is chained after the oldest one (popped from the
    # right) so that at most max_jobs jobs run concurrently.
    anteriors = deque()
    for job_name, job in jobs:
        # BUGFIX: the default --max-jobs of -1 means "unlimited"; the original
        # test `len(anteriors) < args.max_jobs` was always False for -1 and
        # popped from an empty deque (IndexError) on the first job.
        if args.max_jobs < 0 or len(anteriors) < args.max_jobs:
            anterior = None
        else:
            anterior = anteriors.pop()

        oar_command = oargen.prepare_oarsub(args.gpu, args.host, args.core,
                                            args.time, command=job,
                                            command_is_string=True,
                                            name=job_name,
                                            besteffort=args.besteffort,
                                            checkpoint=args.checkpoint,
                                            anterior=anterior)

        cmd_output = oargen.run_oarsub(oar_command,
                                       print_cmd=args.print_commands,
                                       fake_run=not args.run,
                                       return_output=True)
        if cmd_output is not None:
            job_id = extract_job_id(cmd_output)
            if job_id is None:
                logging.warning(
                    "Job '{}' seems to have failed to launch...".format(
                        job_name))
            else:
                anteriors.appendleft(job_id)


if __name__ == '__main__':
    main()