cli_commands.py 13.7 KB
Newer Older
1
import getpass
2
import os
3
from importlib.util import find_spec
4
import subprocess
5
import signal
6
7
import sys

8
import django
9

10
11
12
13
14
15
16
17
18
19
20
21
22
def ls_procs(keywords):
    """Return ``ps aux`` lines of the current user's python processes.

    Args:
        keywords: a single keyword string, or a sequence of keyword strings.
            Each keyword becomes one ``grep`` stage of the pipeline, so a
            returned line has passed every keyword filter.

    Returns:
        list of str: raw ``ps aux`` output lines that contain ``'python'``
        and whose first column equals ``getpass.getuser()``.
    """
    import shlex  # local import: only this helper builds a shell pipeline

    if isinstance(keywords, str):
        keywords = [keywords]

    username = getpass.getuser()

    # Quote each keyword so that shell metacharacters inside it reach grep
    # as pattern text instead of being interpreted by the shell.
    searchcmd = 'ps aux | grep '
    searchcmd += ' | grep '.join(shlex.quote(k) for k in keywords)
    grep = subprocess.Popen(searchcmd, shell=True, stdout=subprocess.PIPE)
    stdout, _ = grep.communicate()
    # Process command lines are arbitrary bytes; never fail on decoding.
    text = stdout.decode('utf-8', errors='replace')

    # The 'python' test runs first, so blank lines never reach split()[0].
    return [
        line for line in text.split('\n')
        if 'python' in line and line.split()[0] == username
    ]
Michael Salim's avatar
Michael Salim committed
23
24
25
26
27
28
29
30

def cmd_confirmation(message=''):
    """Ask a yes/no question on stdin and return the answer.

    The prompt is repeated until the reply is ``y`` or ``n`` (any case).

    Args:
        message: text shown in front of the ``[y/n]`` prompt.

    Returns:
        bool: True for ``y``, False for ``n``. Also False when stdin is
        exhausted (``EOFError``), because no answer can ever arrive.

    ``KeyboardInterrupt`` is not caught, so Ctrl-C aborts the command.
    """
    while True:
        try:
            answer = input(f"{message} [y/n]: ").lower()
        except EOFError:
            # Re-prompting on a closed stdin would spin forever; treat the
            # missing answer as "no".
            return False
        if answer in ('y', 'n'):
            return answer == 'y'
31
32

def newapp(args):
    """Register a new application definition in the Balsam database.

    Args:
        args: parsed command-line namespace. Reads ``name``,
            ``description`` (sequence of words, joined with spaces),
            ``executable``, ``preprocess``, ``postprocess`` and ``env``
            (sequence, joined with ``:``).

    Raises:
        RuntimeError: if an application named ``args.name`` already exists,
            or if any whitespace-separated token of a non-empty command is
            not an existing path.
    """
    os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
    django.setup()
    from balsam.service import models
    AppDef = models.ApplicationDefinition

    def py_app_path(path):
        """Prefix a ``*.py`` command with the running interpreter.

        The script path is made absolute; any other command (or an empty
        value) is returned unchanged.
        """
        if not path:
            return path
        tokens = path.split()
        if not tokens or not tokens[0].endswith('.py'):
            return path
        script, script_args = tokens[0], tokens[1:]
        # join() avoids the trailing space a no-argument command would get
        # from plain concatenation.
        return ' '.join([sys.executable, os.path.abspath(script)] + script_args)

    if AppDef.objects.filter(name=args.name).exists():
        raise RuntimeError(f"An application named {args.name} exists")

    for command in (args.executable, args.preprocess, args.postprocess):
        if not command:
            # Field left empty (or None): nothing to validate.
            continue
        paths = command.split()
        if not all(os.path.exists(p) for p in paths):
            raise RuntimeError(f"{paths} not found")

    app = AppDef()
    app.name = args.name
    app.description = ' '.join(args.description)
    app.executable = py_app_path(args.executable)
    app.default_preprocess = py_app_path(args.preprocess)
    app.default_postprocess = py_app_path(args.postprocess)
    app.environ_vars = ":".join(args.env)
    app.save()
    print(app)
    print("Added app to database")

72
73

def newjob(args):
    """Build a BalsamJob from command-line arguments and save it.

    Args:
        args: parsed command-line namespace. Sequence-valued fields
            (``description``, ``allowed_site``, ``args``, ``input_files``,
            ``stage_out_files``) are joined with spaces and ``env`` with
            ``:``. When ``allowed_site`` is empty it is replaced, on
            ``args`` itself, by ``[settings.BALSAM_SITE]``. Unless
            ``args.yes`` is set, the user is asked to confirm before saving.

    Returns:
        The saved job, or None if the user declined the confirmation.

    Raises:
        RuntimeError: if ``args.application`` is not a registered
            application name.
    """
    os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
    django.setup()
    from django.conf import settings
    from balsam.service import models
    Job = models.BalsamJob
    AppDef = models.ApplicationDefinition
    BALSAM_SITE = settings.BALSAM_SITE

    if not args.allowed_site:
        args.allowed_site = [BALSAM_SITE]

    if not AppDef.objects.filter(name=args.application).exists():
        raise RuntimeError(f"App {args.application} not registered in local DB")

    job = Job()
    job.name = args.name
    job.description = ' '.join(args.description)
    job.workflow = args.workflow
    job.allowed_work_sites = ' '.join(args.allowed_site)

    job.wall_time_minutes = args.wall_minutes
    job.num_nodes = args.num_nodes
    job.ranks_per_node = args.ranks_per_node
    job.threads_per_rank = args.threads_per_rank
    job.threads_per_core = args.threads_per_core

    job.application = args.application
    job.application_args = ' '.join(args.args)
    job.preprocess = args.preprocessor
    job.postprocess = args.postprocessor
    job.post_error_handler = args.post_handle_error
    job.post_timeout_handler = args.post_handle_timeout
    job.auto_timeout_retry = not args.disable_auto_timeout_retry
    job.input_files = ' '.join(args.input_files)

    job.stage_in_url = args.url_in
    job.stage_out_url = args.url_out
    job.stage_out_files = ' '.join(args.stage_out_files)
    job.environ_vars = ":".join(args.env)

    print(job)
    if not args.yes:
        if not cmd_confirmation('Confirm adding job to DB'):
            print("Add job aborted")
            return None
    job.save()
    # Report success before returning; placed after the return this line
    # would never run.
    print("Added job to database")
    return job


def match_uniq_job(s):
    """Find the single job whose ID or name matches ``s``.

    Lookup order:
      1. case-insensitive substring match on ``job_id``;
      2. substring match on ``name``;
      3. exact match on ``name``, tried only when step 2 is ambiguous.

    Args:
        s: text to look for in job IDs and names.

    Returns:
        A queryset containing exactly one job (callers use ``.first()``).

    Raises:
        ValueError: if the match is ambiguous or nothing matches.
    """
    os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
    django.setup()
    from balsam.service import models
    Job = models.BalsamJob

    # Each count() is a database query, so run it once per queryset.
    by_id = Job.objects.filter(job_id__icontains=s)
    num_ids = by_id.count()
    if num_ids > 1:
        raise ValueError(f"More than one ID matched {s}")
    if num_ids == 1:
        return by_id

    by_name = Job.objects.filter(name__contains=s)
    num_names = by_name.count()
    if num_names > 1:
        # Ambiguous substring: accept it only if exactly one job carries
        # this exact name.
        by_name = Job.objects.filter(name=s)
        if by_name.count() != 1:
            # Several jobs matched the substring, so "no match" would be
            # the wrong diagnosis even when no name equals s exactly.
            raise ValueError(f"More than one Job name matches {s}")
        return by_name
    if num_names == 1:
        return by_name

    raise ValueError(f"No job in local DB matched {s}")
143
144

def newdep(args):
    """Add a parent --> child dependency between two existing jobs.

    ``args.parent`` and ``args.child`` are each resolved with
    ``match_uniq_job``; the results are passed to ``dag.add_dependency``
    and the new link is printed.
    """
    os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
    django.setup()
    from django.conf import settings
    from balsam.service import models
    from balsam.launcher import dag

    # Resolve the parent first, then the child, before touching the DAG.
    parent_match = match_uniq_job(args.parent)
    child_match = match_uniq_job(args.child)
    dag.add_dependency(parent_match, child_match)

    parent_label = parent_match.first().cute_id
    child_label = child_match.first().cute_id
    print(f"Created link {parent_label} --> {child_label}")
157
158

def ls(args):
    """Dispatch the ``ls`` command to the matching listing routine.

    ``args.objects`` selects the listing by prefix: ``job*`` calls
    ``ls_jobs``, ``app*`` calls ``ls_apps``, and ``work*`` / ``wf*`` calls
    ``ls_wf``, all from ``balsam.scripts.ls_commands``.

    Args:
        args: parsed command-line namespace. Reads ``objects``, ``name``,
            ``history``, ``verbose``, ``state``, ``id``, ``tree``, ``wf``.

    Raises:
        RuntimeError: if ``args.objects`` matches none of the prefixes.
    """
    os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
    django.setup()
    import balsam.scripts.ls_commands as lscmd

    objects = args.objects
    name = args.name
    history = args.history
    verbose = args.verbose
    state = args.state
    obj_id = args.id  # not named `id`, to avoid shadowing the builtin
    tree = args.tree
    wf = args.wf

    if objects.startswith('job'):
        lscmd.ls_jobs(name, history, obj_id, verbose, tree, wf, state)
    elif objects.startswith('app'):
        lscmd.ls_apps(name, obj_id, verbose)
    elif objects.startswith(('work', 'wf')):
        lscmd.ls_wf(name, verbose, tree, wf)
    else:
        # Fail loudly instead of silently listing nothing.
        raise RuntimeError(f"Cannot list unknown object type {objects}")
183
184

def modify(args):
    """Change one attribute of a single job or application.

    The target is the unique object whose primary key contains ``args.id``.
    ``args.value`` is converted to the type of the attribute's current
    value. A ``state`` change goes through ``update_state`` and is refused
    for jobs already in state ``USER_KILLED``.

    Args:
        args: parsed command-line namespace. Reads ``obj_type`` (``'jobs'``
            or ``'apps'``), ``id``, ``attr`` and ``value``.

    Raises:
        RuntimeError: on an unknown ``obj_type``, or when zero or several
            objects match ``args.id``.
        ValueError: if the value cannot be read as a boolean for a boolean
            attribute.
    """
    os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
    django.setup()
    from balsam.service import models

    def parse_bool(text):
        """Read a command-line word as a boolean."""
        if isinstance(text, bool):
            return text
        word = str(text).strip().lower()
        if word in ('true', 't', 'yes', 'y', '1'):
            return True
        if word in ('false', 'f', 'no', 'n', '0', ''):
            return False
        raise ValueError(f"cannot interpret {text!r} as a boolean")

    if args.obj_type == 'jobs':
        cls = models.BalsamJob
    elif args.obj_type == 'apps':
        cls = models.ApplicationDefinition
    else:
        raise RuntimeError(f"unknown object type {args.obj_type}")

    matches = cls.objects.filter(pk__contains=args.id)
    num_matches = matches.count()
    if num_matches == 0:
        raise RuntimeError(f"no matching {args.obj_type}")
    elif num_matches > 1:
        raise RuntimeError(f"more than one matching {args.obj_type}")
    item = matches.first()

    current = getattr(item, args.attr)
    if isinstance(current, bool):
        # bool("False") is True, so booleans need real parsing.
        new_value = parse_bool(args.value)
    elif current is None:
        # No current value to infer a type from; store the text as given.
        # NOTE(review): confirm this suits nullable non-text fields.
        new_value = args.value
    else:
        new_value = type(current)(args.value)

    if args.attr == 'state':
        if item.state == 'USER_KILLED':
            print("Cannot mutate state of a killed job")
            return
        item.update_state(new_value, 'User mutated state from command line')
    else:
        setattr(item, args.attr, new_value)
        item.save()
    print(f'{args.obj_type[:-1]} {args.attr} changed to:  {new_value}')

215
216

def rm(args):
    """Delete jobs or applications from the database.

    The selection is made by the first of these that is set: ``args.all``
    (everything), ``args.name`` (case-insensitive substring of the name,
    may match several objects), or ``args.id`` (substring of the primary
    key, must match exactly one). Unless ``args.force`` is set, the user
    must confirm the deletion.

    Args:
        args: parsed command-line namespace. Reads ``objects`` (prefix
            ``job`` or ``app``), ``name``, ``id``, ``all`` and ``force``.

    Raises:
        RuntimeError: on an unknown object type, when no selection
            criterion is given, or when an ID matches zero or several
            objects.
    """
    os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
    django.setup()
    from balsam.service import models

    objects_name = args.objects
    name = args.name
    objid = args.id
    deleteall = args.all
    force = args.force

    # Are we removing jobs or apps?
    if objects_name.startswith('job'):
        cls = models.BalsamJob
    elif objects_name.startswith('app'):
        cls = models.ApplicationDefinition
    else:
        raise RuntimeError(f"Cannot remove unknown object type {objects_name}")
    objects = cls.objects.all()

    # Filter: all objects, by name-match (multiple), or by ID (unique)?
    if deleteall:
        deletion_objs = objects
        message = f"ALL {objects_name}"
    elif name:
        deletion_objs = objects.filter(name__icontains=name)
        if not deletion_objs.exists():
            print(f"No {objects_name} matching query")
            return
        message = f"{len(deletion_objs)} {objects_name} matching name {name}"
    elif objid:
        deletion_objs = objects.filter(pk__icontains=objid)
        num_matches = deletion_objs.count()
        if num_matches > 1:
            raise RuntimeError(f"Multiple {objects_name} match ID")
        elif num_matches == 0:
            raise RuntimeError(f"No {objects_name} match ID")
        message = f"{objects_name[:-1]} with ID matching {objid}"
    else:
        # Without this branch the code below would hit unbound names.
        raise RuntimeError(
            f"No {objects_name} selected: give all, a name, or an id")

    # User confirmation
    if not force:
        if not cmd_confirmation(f"PERMANENTLY remove {message}?"):
            print("Delete aborted")
            return

    # Actually delete things here
    for obj in deletion_objs:
        # Build the report before delete() so the ID is read from a live
        # object.
        msg = f"Deleted {objects_name[:-1]} {obj.cute_id}"
        obj.delete()
        print(msg)
Michael Salim's avatar
Michael Salim committed
266

267
268

def qsub(args):
    """Save a job that runs a direct command rather than a registered app.

    The job is filled from ``args`` (``name``, ``wall_minutes``, ``nodes``,
    ``ranks_per_node``, ``threads_per_rank``, ``threads_per_core``, ``env``,
    ``command``). Every application, pre/post-processing and staging field
    is left empty or False. The job is printed, then saved.
    """
    os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
    django.setup()
    from django.conf import settings
    from balsam.service import models
    from balsam.launcher import dag

    field_values = {
        'name': args.name or "default",
        'description': 'Added by balsam qsub',
        'workflow': 'qsub',
        'allowed_work_sites': settings.BALSAM_SITE,
        # resources requested on the command line
        'wall_time_minutes': args.wall_minutes,
        'num_nodes': args.nodes,
        'ranks_per_node': args.ranks_per_node,
        'threads_per_rank': args.threads_per_rank,
        'threads_per_core': args.threads_per_core,
        'environ_vars': ":".join(args.env),
        # no registered application, hooks or staging for a direct command
        'application': '',
        'application_args': '',
        'preprocess': '',
        'postprocess': '',
        'post_error_handler': False,
        'post_timeout_handler': False,
        'auto_timeout_retry': False,
        'input_files': '',
        'stage_in_url': '',
        'stage_out_url': '',
        'stage_out_files': '',
        'direct_command': ' '.join(args.command),
    }

    new_job = models.BalsamJob()
    for field, value in field_values.items():
        setattr(new_job, field, value)

    print(new_job)
    new_job.save()
    print("Added to database")
306
307

def kill(args):
    """Kill the single job whose ID starts with ``args.id``.

    The user is asked to confirm; on confirmation the job is passed to
    ``dag.kill`` with ``recursive=args.recursive``.

    Args:
        args: parsed command-line namespace. Reads ``id`` and ``recursive``.

    Raises:
        RuntimeError: if more than one job matches the ID prefix.
    """
    os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
    django.setup()
    from balsam.service import models
    from balsam.launcher import dag
    Job = models.BalsamJob

    job_id = args.id

    matches = Job.objects.filter(job_id__startswith=job_id)
    num_matches = matches.count()
    if num_matches > 1:
        raise RuntimeError(f"More than one job matches {job_id}")
    if num_matches == 0:
        print(f"No jobs match the given ID {job_id}")
        # Nothing to kill; continuing would dereference None below.
        return

    job = matches.first()

    if cmd_confirmation(f'Really kill job {job.name} {job.cute_id} ??'):
        dag.kill(job, recursive=args.recursive)
        print("Job killed")

329
330

def mkchild(args):
    """Create a new job as a child of the currently running job.

    The job is created by ``newjob(args)`` and linked under
    ``dag.current_job`` with ``dag.add_dependency``.

    Args:
        args: parsed command-line namespace, passed on to ``newjob``.

    Raises:
        RuntimeError: if ``dag.current_job`` is not set.
    """
    os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
    django.setup()
    from balsam.launcher import dag

    if not dag.current_job:
        raise RuntimeError("mkchild requires that BALSAM_JOB_ID is in the environment")
    child_job = newjob(args)
    if child_job is None:
        # newjob returns None when the user declines the confirmation
        # prompt; there is no job to link.
        return
    dag.add_dependency(dag.current_job, child_job)
    print(f"Created link {dag.current_job.cute_id} --> {child_job.cute_id}")
341
342

def launcher(args):
    """Run the ``balsam.launcher.launcher`` module file in a child process.

    The child is started with the current interpreter and receives
    everything after the first two entries of ``sys.argv``. With
    ``args.daemon`` set, this process exits with status 0 right after the
    spawn; otherwise it waits for the child to finish.
    """
    detach = args.daemon
    script = find_spec("balsam.launcher.launcher").origin
    forwarded = sys.argv[2:]
    command = [sys.executable, script, *forwarded]
    print("Starting Balsam launcher")
    child = subprocess.Popen(command)

    if not detach:
        child.wait()
        return
    sys.exit(0)

355
356

def service(args):
    """Placeholder for the metascheduler service command.

    Configures Django, then only prints a notice; ``args`` is not used.
    """
    os.environ.update(DJANGO_SETTINGS_MODULE='balsam.django_config.settings')
    django.setup()
    from django.conf import settings

    notice = "dummy -- invoking balsam metascheduler service"
    print(notice)
362

363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
def dbserver(args):
    """Start or stop the Balsam database server daemon.

    With ``args.stop`` set, the current user's ``db_daemon`` process is
    looked up with ``ls_procs`` and sent ``SIGUSR1``. Otherwise the
    ``balsam.django_config.db_daemon`` module file is launched with the
    current interpreter, with ``args.path`` as its argument when given.

    Args:
        args: parsed command-line namespace. Reads ``stop`` and ``path``.

    Raises:
        RuntimeError: when stopping and more than one ``db_daemon`` process
            is found, since it is unclear which one to signal.
    """
    if args.stop:
        # Column 1 of a `ps aux` line is the PID.
        server_pids = [int(line.split()[1]) for line in ls_procs('db_daemon')]
        if not server_pids:
            print(f"No db_daemon processes running under {getpass.getuser()}")
            return
        if len(server_pids) > 1:
            # An explicit check rather than `assert`, which is stripped
            # under `python -O`.
            raise RuntimeError(
                f"Expected one db_daemon process, found PIDs {server_pids}")
        pid = server_pids[0]
        print(f"Stopping db_daemon {pid}")
        os.kill(pid, signal.SIGUSR1)
        return

    # The daemon script is only needed when starting the server.
    fname = find_spec("balsam.django_config.db_daemon").origin
    cmd = [sys.executable, fname]
    if args.path:
        cmd.append(args.path)
    p = subprocess.Popen(cmd)
    print(f"Starting Balsam DB server daemon (PID: {p.pid})")

def init(args):
    """Create a Balsam database directory and run the init script on it.

    The directory ``args.path`` (``~`` expanded) is created if missing.
    ``args.db_type`` is recorded through ``ServerInfo``, and the
    ``balsam.scripts.init`` module file is run in a child process with
    ``BALSAM_DB_PATH`` set to the directory. The child is waited for.

    Exits with status 1 if the path exists but is not a directory, or if
    the directory cannot be created.

    Args:
        args: parsed command-line namespace. Reads ``path`` and ``db_type``.
    """
    from balsam.django_config.serverinfo import ServerInfo
    path = os.path.expanduser(args.path)
    if os.path.exists(path):
        if not os.path.isdir(path):
            print(f"{path} is not a directory")
            sys.exit(1)
    else:
        try:
            os.mkdir(path, mode=0o755)
        except OSError as exc:
            # Catch only filesystem errors, and say why creation failed.
            print(f"Failed to create directory {path}: {exc}")
            sys.exit(1)

    db_type = args.db_type
    serverinfo = ServerInfo(path)
    serverinfo.update({'db_type': db_type})

    fname = find_spec("balsam.scripts.init").origin
    # Pass the path through the environment and use an argument list, so
    # spaces or shell metacharacters in the path cannot break the command.
    child_env = dict(os.environ, BALSAM_DB_PATH=path)
    p = subprocess.Popen([sys.executable, fname], env=child_env)
    p.wait()


407
def make_dummies(args):
    """Save ``args.num`` placeholder jobs named ``dummy0``, ``dummy1``, ...

    Each job has workflow ``'dummy'``, direct command ``'echo hello'``, one
    node, one rank and one thread. Every application, hook and staging
    field is empty or False.
    """
    os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
    django.setup()
    from django.conf import settings
    from balsam.service import models
    Job = models.BalsamJob

    for index in range(args.num):
        dummy = Job()
        field_values = {
            'name': f'dummy{index}',
            'description': 'Added by balsam make_dummies',
            'workflow': 'dummy',
            'allowed_work_sites': settings.BALSAM_SITE,
            # minimal resource request
            'wall_time_minutes': 0,
            'num_nodes': 1,
            'ranks_per_node': 1,
            'threads_per_rank': 1,
            'threads_per_core': 1,
            'environ_vars': "",
            # no application, hooks or staging
            'application': '',
            'application_args': '',
            'preprocess': '',
            'postprocess': '',
            'post_error_handler': False,
            'post_timeout_handler': False,
            'auto_timeout_retry': False,
            'input_files': '',
            'stage_in_url': '',
            'stage_out_url': '',
            'stage_out_files': '',
            'direct_command': 'echo hello',
        }
        for field, value in field_values.items():
            setattr(dummy, field, value)
        dummy.save()

    print(f"Added {args.num} dummy jobs to the DB")