Skip to content
Snippets Groups Projects
Commit 61528e9e authored by Jeremy Auguste's avatar Jeremy Auguste
Browse files

Added batchoar script (WIP)

parent b44989bc
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
# coding: utf-8
import argparse
import yaml
import itertools
import subprocess
import logging
from collections import deque
from . import oargen
def argparser():
parser = argparse.ArgumentParser()
parser.add_argument('job_file',
help="YAML file which contains all the jobs to launch")
parser.add_argument('destination',
help="New directory to create used to store outputs")
parser.add_argument('-b', '--besteffort', action="store_true",
help="Launch job in besteffort mode")
parser.add_argument('-t', '--time', default="10",
help="Default maximum duration of the jobs (format: h[:m[:s]])")
parser.add_argument('-g', '--gpu', action="store_true",
help="If True, reserves only cores with GPUs")
parser.add_argument('-c', '--core', default=1, type=int,
help="Number of cores to reserve")
parser.add_argument('-H', '--host',
help="Name of the host (SQL LIKE syntax accepted)")
parser.add_argument('-i', '--interactive', action="store_true",
help="Launch job in interactive mode")
parser.add_argument('-C', '--checkpoint', type=int, metavar="SECONDS",
help="Enable checkpoint signals with the given delay (in seconds)")
parser.add_argument('-j', '--max-jobs', type=int, default=-1)
parser.add_argument('--print-commands', action="store_true",
help="Print each individual command")
parser.add_argument('-r', '--run', action="store_true",
help="Run the command")
parser.add_argument('-l', '--logger', default='INFO',
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
help="Logging level: DEBUG, INFO (default), WARNING, ERROR")
args = parser.parse_args()
numeric_level = getattr(logging, args.logger.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError("Invalid log level: {}".format(args.logger))
logging.basicConfig(level=numeric_level)
return args
def load_variables(yaml_variables):
variable_names = []
variables = []
for name, values in yaml_variables.items():
variable_names.append(name)
variables.append(values)
return variable_names, itertools.product(*variables)
def load_flags(yaml_flags, args):
flags = {}
for command_flag, oar_flag in yaml_flags.items():
if getattr(args, oar_flag):
flags[command_flag] = command_flag
else:
flags[command_flag] = ""
return flags
def load_job(job_name, job, var_names, var_tuples, constants, flags, precommand):
for name, value in constants.items():
job = job.replace(f"${name}$", str(value))
job_name = job_name.replace(f"${name}$", str(value))
for flag, str_flag in flags.items():
job = job.replace(f"@{flag}@", str_flag)
job_name = job_name.replace(f"@{flag}@", str_flag)
if precommand:
job = f"{precommand} {job}"
jobs = []
for var_tuple in var_tuples:
new_job = job
new_job_name = job_name
for var_name, var in zip(var_names, var_tuple):
new_job = new_job.replace(f"%{var_name}%", str(var))
new_job_name = new_job_name.replace(f"%{var_name}%", str(var))
jobs.append((new_job_name, new_job))
return jobs
def load_jobs(yaml_in, args):
yaml_jobs = yaml.load(yaml_in)
var_names, var_tuples = load_variables(yaml_jobs["variables"])
constants = yaml_jobs["constants"]
flags = load_flags(yaml_jobs["flags"], args)
try:
precommand = yaml_jobs["environment"]["precommand"]
except KeyError:
precommand = ""
jobs = []
for job_name, job in yaml_jobs["jobs"].items():
jobs.extend(load_job(job_name, job, var_names, var_tuples, constants, flags, precommand))
return jobs
def extract_job_id(cmd_output):
for line in cmd_output.splitlines():
if line.startswith("OAR_JOB_ID="):
return line.split('=')[1]
return None
def main():
args = argparser()
with open(args.job_file) as fin:
jobs = load_jobs(fin, args)
anteriors = deque()
for job_name, job in jobs:
if len(anteriors) < args.max_jobs:
anterior = None
else:
anterior = anteriors.pop()
oar_command = oargen.prepare_oarsub(args.gpu, args.host, args.core, args.time,
command=job, command_is_string=True,
name=job_name, besteffort=args.besteffort,
checkpoint=args.checkpoint, anterior=anterior)
cmd_output = oargen.run_oarsub(oar_command, print_cmd=args.print_commands,
fake_run=not args.run, return_output=True)
if cmd_output is not None:
job_id = extract_job_id(cmd_output)
if job_id is None:
logging.warning("Job '{}' seems to have failed to launch...".format(job_name))
else:
anteriors.appendleft(job_id)
if __name__ == '__main__':
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment