Commit 6498a0b7 authored by Michael Salim

Fixed some Python 2-to-3 compatibility issues; now running on Theta

parent 95d68fc6
......@@ -3,6 +3,10 @@ from django.conf import settings
from balsam import models
import logging
logger = logging.getLogger('console')
try:
input = raw_input
except NameError:
pass
class Command(BaseCommand):
help = 'Add BalsamJob to DB'
......@@ -45,7 +49,7 @@ class Command(BaseCommand):
if options['yes']:
answer = 'yes'
else:
answer = str(raw_input(' Enter "yes" to continue:'))
answer = str(input(' Enter "yes" to continue:'))
if answer == 'yes':
app = models.ApplicationDefinition.objects.get(name=options['application'])
if app is None:
......
......@@ -17,7 +17,7 @@ class Command(BaseCommand):
for app in apps:
logger.info('About to remove App pk = ' + str(app.pk) + ' \n' + str(app))
answer = raw_input(' Enter "yes" to continue: ')
answer = input(' Enter "yes" to continue: ')
if answer == 'yes':
app.delete()
logger.info('App deleted')
......
......@@ -3,6 +3,10 @@ from django.conf import settings
from balsam import models
import logging
logger = logging.getLogger('console')
try:
input = raw_input
except NameError:
pass
class Command(BaseCommand):
help = 'Remove BalsamJobs'
......@@ -17,7 +21,7 @@ class Command(BaseCommand):
for job in jobs:
logger.info('About to delete BalsamJob pk = ' + str(job.pk) + ' \n' + str(job))
answer = raw_input(' Enter "yes" to continue: ')
answer = input(' Enter "yes" to continue: ')
if answer == 'yes':
job.delete()
logger.info('BalsamJob deleted')
......
......@@ -91,7 +91,8 @@ class Command(BaseCommand):
# first loop over jobs in transition and remove entries that are complete
for pk in jobs_in_transition_by_id.keys():
# 2-->3 bug: have to cast keys from iterator to list
for pk in list(jobs_in_transition_by_id.keys()):
proc = jobs_in_transition_by_id[pk]
if not proc.is_alive():
# did subprocess exit cleanly with exitcode == 0
......
......@@ -73,7 +73,7 @@ def preprocess(job):
if os.path.exists(app.preprocess):
stdout = run_subprocess.run_subprocess(app.preprocess)
# write stdout to log file
f = open(os.path.join(job.working_directory,app.name+'.preprocess.log.pid' + str(os.getpid())),'w')
f = open(os.path.join(job.working_directory,app.name+'.preprocess.log.pid' + str(os.getpid())),'wb')
f.write(stdout)
f.close()
job.state = PREPROCESSED.name
......@@ -160,7 +160,7 @@ def postprocess(job):
if os.path.exists(app.postprocess):
stdout = run_subprocess.run_subprocess(app.postprocess)
# write stdout to log file
f = open(os.path.join(job.working_directory,app.name+'.postprocess.log.pid' + str(os.getpid())),'w')
f = open(os.path.join(job.working_directory,app.name+'.postprocess.log.pid' + str(os.getpid())),'wb')
f.write(stdout)
f.close()
job.state = POSTPROCESSED.name
......@@ -412,7 +412,7 @@ class BalsamJob(models.Model):
cmd = app.executable + ' '
if self.config_file != '':
stdout = run_subprocess.run_subprocess(app.config_script + ' ' + self.config_file)
cmd += stdout
cmd += stdout.decode('utf-8')
return cmd
@staticmethod
......
......@@ -50,8 +50,12 @@ def submit(job,cmd):
try:
output = run_subprocess.run_subprocess(command)
output = output.strip()
try:
scheduler_id = int(output)
except ValueError:
scheduler_id = int(output.split()[-1])
logger.debug('CobaltScheduler job (pk=' + str(job.pk) + ') submitted to scheduler as job ' + str(output))
job.scheduler_id = output
job.scheduler_id = scheduler_id
except run_subprocess.SubprocessNonzeroReturnCode as e:
raise exceptions.SubmitNonZeroReturnCode('CobaltScheduler submit command returned non-zero value. command = "' + command +'", exception: ' + str(e))
except run_subprocess.SubprocessFailed as e:
......@@ -140,6 +144,8 @@ class QStat:
logger.exception(' received exception while trying to run qstat: ' + str(sys.exc_info()[1]))
stdout,stderr = p.communicate()
stdout = stdout.decode('utf-8')
stderr = stderr.decode('utf-8')
logger.debug(' qstat ouput: \n' + stdout )
if p.returncode != 0:
logger.exception(' return code for qstat is non-zero. stdout = \n' + stdout + '\n stderr = \n' + stderr )
......
......@@ -113,16 +113,15 @@ class MessageInterface:
#logger.debug(str(ssl_options_dict))
# setup our connection parameters
self.parameters = pika.ConnectionParameters('localhost')
#self.parameters = pika.ConnectionParameters(
# host = self.host,
# port = self.port,
# virtual_host = self.virtual_host,
# credentials = self.credentials,
# socket_timeout = self.socket_timeout,
# ssl = True,
# ssl_options = ssl_options_dict,
# )
self.parameters = pika.ConnectionParameters(
host = self.host,
port = self.port,
virtual_host = self.virtual_host,
credentials = self.credentials,
socket_timeout = self.socket_timeout,
ssl = True,
ssl_options = ssl_options_dict,
)
def create_queue(self,name,routing_key):
# declare a random queue which this job will use to receive messages
......
#!/usr/bin/env bash
source argobalsam_env/bin/activate
source /gpfs/mira-home/msalim/argobalsam/env/bin/activate
export ARGOBALSAM_INSTALL_PATH=$(pwd)
export ARGOBALSAM_INSTALL_PATH=/gpfs/mira-home/msalim/argobalsam/src
export ARGOBALSAM_DATA_PATH=$ARGOBALSAM_INSTALL_PATH/data
export ARGOBALSAM_EXE_PATH=$ARGOBALSAM_INSTALL_PATH/exe
......@@ -50,10 +50,10 @@ BALSAM_SCHEDULER_HISTORY_EXE = '/usr/bin/'
BALSAM_SERVICE_PERIOD = 10 # seconds between service loop execution
BALSAM_MAX_QUEUED = 20 # the maximum number of jobs allowed on the local queue
BALSAM_SUBMIT_JOBS = True # submit jobs to queue, turn off when testing
BALSAM_DEFAULT_QUEUE = 'default' # default local queue name
BALSAM_DEFAULT_PROJECT = 'visualization' # default local project name
BALSAM_DEFAULT_QUEUE = 'debug-cache-quad' # default local queue name
BALSAM_DEFAULT_PROJECT = 'datascience' # default local project name
BALSAM_ALLOWED_EXECUTABLE_DIRECTORY = ALLOWED_EXE_PATH # path to allowed executables
BALSAM_SITE = 'cooley' # local balsam site name
BALSAM_SITE = 'theta' # local balsam site name
BALSAM_SCHEDULER_CLASS = 'CobaltScheduler' # local scheduler in use
BALSAM_SCHEDULER_SUBMIT_SCRIPT = os.path.join(BALSAM_ALLOWED_EXECUTABLE_DIRECTORY,'submit.sh')
BALSAM_SCHEDULER_USE_SUBMIT_SCRIPT = True
......@@ -81,7 +81,7 @@ GRIDFTP_GLOBUS_URL_COPY = os.path.join(GRIDFTP_BIN,'globus-url-copy'
GRIDFTP_PROXY_INFO = os.path.join(GRIDFTP_BIN,'grid-proxy-info')
GRIDFTP_PROXY_INIT = os.path.join(GRIDFTP_BIN,'grid-proxy-init')
GRIDFTP_PROTOCOL = 'gsiftp://'
GRIDFTP_SERVER = 'atlasgridftp02.hep.anl.gov'
GRIDFTP_SERVER = ''
#------------------------------
# RABBITMQ/PIKA CONFIG
......@@ -93,7 +93,7 @@ try:
RABBITMQ_SSL_KEY = os.environ['X509_USER_KEY']
RABBITMQ_SSL_CA_CERTS = os.environ['X509_CACERTS']
except KeyError as e:
logger.error('Environment variable undefined: ' + str(e))
#logger.error('Environment variable undefined: ' + str(e))
RABBITMQ_SSL_CERT = ''
RABBITMQ_SSL_KEY = ''
RABBITMQ_SSL_CA_CERTS = ''
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment