Commit 13d7915a authored by Michael Salim
Browse files

cooley workaround: qstat does not work on compute nodes!

parent c3cbf3c1
......@@ -65,9 +65,12 @@ def qstat(scheduler_id, attrs):
qstat_cmd = f"{exe} {scheduler_id}"
os.environ['QSTAT_HEADER'] = ':'.join(attrs)
p = subprocess.Popen(shlex.split(qstat_cmd),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
try:
p = subprocess.Popen(shlex.split(qstat_cmd),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
except OSError:
raise JobStatusFailed(f"could not execute {qstat_cmd}")
stdout, _ = p.communicate()
stdout = stdout.decode('utf-8')
......
......@@ -57,7 +57,7 @@ class Scheduler:
currently inside a scheduled job'''
environment = {}
for generic_name, specific_var in self.SCHEDULER_VARIABLES.items():
environment[generic_name] = os.environ.get([specific_var], None)
environment[generic_name] = os.environ.get(specific_var, None)
if environment['id']:
self.current_scheduler_id = environment['id']
......@@ -87,11 +87,16 @@ class Scheduler:
sched_id = self.current_scheduler_id
if sched_id is None:
return float("inf")
info = self.get_status(sched_id)
self.remaining_seconds = info['time_remaining_sec']
self.last_check_seconds = time.time()
logger.debug(f"{self.remaining_seconds} seconds remaining")
return self.remaining_seconds
try:
info = self.get_status(sched_id)
except JobStatusFailed:
logger.warning("Could not get remaining time from scheduler")
return float("inf")
else:
self.remaining_seconds = info['time_remaining_sec']
self.last_check_seconds = time.time()
logger.debug(f"{self.remaining_seconds} seconds remaining")
return self.remaining_seconds
def submit(self, job, cmd):
logger.info(f"Submit {job.cute_id} {cmd} ({self.__class__})")
......
......@@ -39,6 +39,11 @@ def delay(period=settings.BALSAM_SERVICE_PERIOD):
nexttime = now + tosleep + period
yield
def elapsed_time_minutes():
    """Generate the number of minutes elapsed since the first ``next()`` call.

    This is a generator function: calling it returns a generator object,
    not a number.  The reference time is captured when the generator is
    first advanced, and every subsequent ``next()`` yields the minutes
    (as a float) that have passed since then.  The generator never ends.

    Yields:
        float: minutes elapsed since the generator was first advanced.
    """
    # A monotonic clock is used so that system clock adjustments (NTP,
    # manual changes) can never make the elapsed time jump or go negative.
    start = time.monotonic()
    while True:
        yield (time.monotonic() - start) / 60.0
def sufficient_time(job):
    """Tell whether ``job`` fits in the time the scheduler reports as left.

    The job's ``wall_time_minutes`` is converted to seconds and compared
    against ``scheduler.remaining_time_seconds()``.

    Returns:
        bool: True when the requested wall time is strictly less than the
        remaining time.
    """
    required_seconds = 60 * job.wall_time_minutes
    available_seconds = scheduler.remaining_time_seconds()
    return required_seconds < available_seconds
......@@ -67,8 +72,12 @@ def create_new_runners(jobs, runner_group, worker_group):
def main(args, transition_pool, runner_group, job_source):
delay_timer = delay()
if args.time_limit_minutes > 0:
timeout = lambda : elapsed_time_minutes() >= args.time_limit_minutes
else:
timeout = lambda : scheduler.remaining_time_seconds() <= 0.0
while not scheduler.remaining_time_seconds() <= 0.0:
while not timeout():
logger.debug("\n************\nSERVICE LOOP\n************")
wait = True
......@@ -124,7 +133,7 @@ def get_args(inputcmd=None):
parser.add_argument('--nodes-per-worker', type=int, default=1)
parser.add_argument('--max-ranks-per-node', type=int, default=1,
help="For non-MPI jobs, how many to pack per worker")
parser.add_argument('--time-limit-minutes', type=int,
parser.add_argument('--time-limit-minutes', type=int, default=0,
help="Provide a walltime limit if not already imposed")
parser.add_argument('--daemon', action='store_true')
if inputcmd:
......
......@@ -75,7 +75,7 @@ class COOLEYMPICommand(DEFAULTMPICommand):
self.mpi = 'mpirun'
self.nproc = '-n'
self.ppn = '--ppn'
self.env = '-e'
self.env = '--env'
self.cpu_binding = None
self.threads_per_rank = None
self.threads_per_core = None
......
......@@ -76,7 +76,7 @@ class WorkerGroup:
data = open(self.workers_file).read()
splitter = ',' if ',' in data else None
node_ids = data.split(splitter)
self.workers_str = " ".join(worker_ids)
self.workers_str = " ".join(node_ids)
for id in node_ids:
self.workers.append(Worker(id, host_type='COOLEY', num_nodes=1,
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment