Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
AIG-public
Cobalt
Commits
4b30c9f4
Commit
4b30c9f4
authored
Aug 23, 2017
by
Paul Rich
Browse files
Merge branch 'develop' into 93-enh-ssd-request
parents
29a2f27e
dadabcb6
Changes
5
Hide whitespace changes
Inline
Side-by-side
CHANGES
View file @
4b30c9f4
= Changes from previous Cobalt Versions =
== Changes to 1.0.21 ==
* Fixed numerous locking issues in job startup that could result in a small chance
of failed job initialization.
* Fixed an issue on Cray systems where brief network disruptions on service nodes
can cause a loss of contact with a forker resulting in a loss of the running jobs.
* Added additional logging for ProxyErrors. The original trace/error message should
now be presented to the caller in the event of a ProtocolError.
* Fixed an issue in the cdbwriter where an error in writing job attributes could
cause a recurring error loop preventing database updates.
== Changes to 1.0.20 ==
* Fix for certain IO error messages that were ambiguous.
* Fix for an issue where changing attrs on a job from a filter script while
using qmove could cause attrs to become a string instead of a dictionary and
disrupt scheduling on a queue.
== Changes to 1.0.19 ==
* Hotfix for issue preventing rereservations for interactive jobs from being detected.
...
...
@@ -12,7 +28,7 @@
jobs and reservations to be used with tools like pdsh
== Changes to 1.0.17 ==
* Added queue and project information to envi
o
rnment in Cray jobs.
* Added queue and project information to envir
o
nment in Cray jobs.
== Changes to 1.0.15 ==
* Fix for an issue on Cray systems where Cobalt was not properly passing
...
...
src/lib/Components/DBWriter/cdbwriter.py
View file @
4b30c9f4
...
...
@@ -162,7 +162,7 @@ class MessageQueue(Component):
logging
.
debug
(
traceback
.
format_exc
())
self
.
msg_queue
.
pop
(
0
)
except
:
logger
.
error
(
"Error updating databse. Unable to add message.
"
)
logger
.
error
(
"Error updating databse. Unable to add message.
%s"
,
msg
)
logging
.
debug
(
traceback
.
format_exc
())
self
.
connected
=
False
#if we were clearing an overflow, here we go again.
...
...
@@ -872,7 +872,7 @@ class no_pk_dao(db2util.dao):
invalidFields
=
record
.
invalidFields
()
if
invalidFields
:
raise
adapterError
(
"Validation error prior to insert.
\n\n
Table: %s
\n\n
Field(s): %s
\n
"
%
(
record
.
fqtn
,
str
(
invalidFields
)))
raise
db2util
.
adapterError
(
"Validation error prior to insert.
\n\n
Table: %s
\n\n
Field(s): %s
\n
"
%
(
record
.
fqtn
,
str
(
invalidFields
)))
insertSQL
=
"insert into %s (%s) values (%s)"
%
(
self
.
table
.
fqName
,
...
...
src/lib/Components/system/CraySystem.py
View file @
4b30c9f4
"""Resource management for Cray ALPS based systems"""
import
logging
import
threading
import
thread
...
...
@@ -1255,57 +1254,58 @@ class CraySystem(BaseSystem):
Invokes a forker component to run a user script. In the event of a
fatal startup error, will release resource reservations.
Note:
Process Group startup and initialization holds the process group data lock.
'''
start_apg_timer
=
int
(
time
.
time
())
for
spec
in
specs
:
spec
[
'forker'
]
=
None
alps_res
=
self
.
alps_reservations
.
get
(
str
(
spec
[
'jobid'
]),
None
)
if
alps_res
is
not
None
:
spec
[
'alps_res_id'
]
=
alps_res
.
alps_res_id
new_pgroups
=
self
.
process_manager
.
init_groups
(
specs
)
for
pgroup
in
new_pgroups
:
_logger
.
info
(
'%s: process group %s created to track job status'
,
pgroup
.
label
,
pgroup
.
id
)
#check resource reservation, and attempt to start. If there's a
#failure here, set exit status in process group to a sentinel value.
try
:
started
=
self
.
process_manager
.
start_groups
([
pgroup
.
id
])
except
ComponentLookupError
:
_logger
.
error
(
"%s: failed to contact the %s component"
,
pgroup
.
label
,
pgroup
.
forker
)
#this should be reraised and the queue-manager handle it
#that would allow re-requesting the run instead of killing the
#job --PMR
except
xmlrpclib
.
Fault
:
_logger
.
error
(
"%s: a fault occurred while attempting to start "
"the process group using the %s component"
,
pgroup
.
label
,
pgroup
.
forker
)
pgroup
.
exit_status
=
255
self
.
reserve_resources_until
(
pgroup
.
location
,
None
,
pgroup
.
jobid
)
except
Exception
:
_logger
.
error
(
"%s: an unexpected exception occurred while "
"attempting to start the process group using the %s "
"component; releasing resources"
,
pgroup
.
label
,
pgroup
.
forker
,
exc_info
=
True
)
pgroup
.
exit_status
=
255
self
.
reserve_resources_until
(
pgroup
.
location
,
None
,
pgroup
.
jobid
)
else
:
if
started
is
not
None
and
started
!=
[]:
_logger
.
info
(
'%s: Process Group %s started successfully.'
,
pgroup
.
label
,
pgroup
.
id
)
else
:
_logger
.
error
(
'%s: Process Group startup failed. Aborting.'
,
pgroup
.
label
)
start_apg_timer
=
time
.
time
()
with
self
.
process_manager
.
process_groups_lock
:
for
spec
in
specs
:
spec
[
'forker'
]
=
None
alps_res
=
self
.
alps_reservations
.
get
(
str
(
spec
[
'jobid'
]),
None
)
if
alps_res
is
not
None
:
spec
[
'alps_res_id'
]
=
alps_res
.
alps_res_id
new_pgroups
=
self
.
process_manager
.
init_groups
(
specs
)
for
pgroup
in
new_pgroups
:
_logger
.
info
(
'%s: process group %s created to track job status'
,
pgroup
.
label
,
pgroup
.
id
)
#check resource reservation, and attempt to start. If there's a
#failure here, set exit status in process group to a sentinel value.
try
:
started
=
self
.
process_manager
.
start_groups
([
pgroup
.
id
])
except
ComponentLookupError
:
_logger
.
error
(
"%s: failed to contact the %s component"
,
pgroup
.
label
,
pgroup
.
forker
)
#this should be reraised and the queue-manager handle it
#that would allow re-requesting the run instead of killing the
#job --PMR
except
xmlrpclib
.
Fault
:
_logger
.
error
(
"%s: a fault occurred while attempting to start "
"the process group using the %s component"
,
pgroup
.
label
,
pgroup
.
forker
)
pgroup
.
exit_status
=
255
self
.
reserve_resources_until
(
pgroup
.
location
,
None
,
pgroup
.
jobid
)
end_apg_timer
=
int
(
time
.
time
())
self
.
logger
.
debug
(
"add_process_groups startup time: %s sec"
,
(
end_apg_timer
-
start_apg_timer
))
except
Exception
:
_logger
.
error
(
"%s: an unexpected exception occurred while "
"attempting to start the process group using the %s "
"component; releasing resources"
,
pgroup
.
label
,
pgroup
.
forker
,
exc_info
=
True
)
pgroup
.
exit_status
=
255
self
.
reserve_resources_until
(
pgroup
.
location
,
None
,
pgroup
.
jobid
)
else
:
if
started
is
not
None
and
started
!=
[]:
_logger
.
info
(
'%s: Process Group %s started successfully.'
,
pgroup
.
label
,
pgroup
.
id
)
else
:
_logger
.
error
(
'%s: Process Group startup failed. Aborting.'
,
pgroup
.
label
)
pgroup
.
exit_status
=
255
self
.
reserve_resources_until
(
pgroup
.
location
,
None
,
pgroup
.
jobid
)
end_apg_timer
=
time
.
time
()
self
.
logger
.
debug
(
"add_process_groups startup time: %s sec"
,
(
end_apg_timer
-
start_apg_timer
))
return
new_pgroups
@
exposed
...
...
@@ -1350,11 +1350,12 @@ class CraySystem(BaseSystem):
is called.
'''
completed_pgs
=
self
.
process_manager
.
update_groups
()
for
pgroup
in
completed_pgs
:
_logger
.
info
(
'%s: process group reported as completed with status %s'
,
pgroup
.
label
,
pgroup
.
exit_status
)
self
.
reserve_resources_until
(
pgroup
.
location
,
None
,
pgroup
.
jobid
)
with
self
.
process_manager
.
process_groups_lock
:
completed_pgs
=
self
.
process_manager
.
update_groups
()
for
pgroup
in
completed_pgs
:
_logger
.
info
(
'%s: process group reported as completed with status %s'
,
pgroup
.
label
,
pgroup
.
exit_status
)
self
.
reserve_resources_until
(
pgroup
.
location
,
None
,
pgroup
.
jobid
)
return
@
exposed
...
...
src/lib/Components/system/base_pg_manager.py
View file @
4b30c9f4
...
...
@@ -2,7 +2,6 @@
"""
import
logging
import
time
import
Queue
...
...
@@ -90,16 +89,17 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
'''
# modify the forker in specs to force the job to round-robin forkers
for
spec
in
specs
:
ordered_forkers
=
[
f
[
0
]
for
f
in
sorted
(
self
.
forker_taskcounts
.
items
(),
key
=
lambda
x
:
x
[
1
])]
if
len
(
ordered_forkers
)
<
0
:
raise
RuntimeError
(
"No forkers registered!"
)
else
:
spec
[
'forker'
]
=
ordered_forkers
[
0
]
#this is now a tuple
self
.
forker_taskcounts
[
spec
[
'forker'
]]
+=
1
_logger
.
info
(
"Job %s using forker %s"
,
spec
[
'jobid'
],
spec
[
'forker'
])
return
self
.
process_groups
.
q_add
(
specs
)
with
self
.
process_groups_lock
:
for
spec
in
specs
:
ordered_forkers
=
[
f
[
0
]
for
f
in
sorted
(
self
.
forker_taskcounts
.
items
(),
key
=
lambda
x
:
x
[
1
])]
if
len
(
ordered_forkers
)
<
0
:
raise
RuntimeError
(
"No forkers registered!"
)
else
:
spec
[
'forker'
]
=
ordered_forkers
[
0
]
#this is now a tuple
self
.
forker_taskcounts
[
spec
[
'forker'
]]
+=
1
_logger
.
info
(
"Job %s using forker %s"
,
spec
[
'jobid'
],
spec
[
'forker'
])
return
self
.
process_groups
.
q_add
(
specs
)
def
signal_groups
(
self
,
pgids
,
signame
=
"SIGINT"
):
'''Send signal with signame to a list of process groups.
...
...
@@ -133,16 +133,17 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
'''Start process groups. Return groups that succeeded startup.
'''
started
=
[]
for
pg_id
in
pgids
:
try
:
self
.
process_groups
[
pg_id
].
start
()
except
ProcessGroupStartupError
:
_logger
.
error
(
"%s: Unable to start process group."
,
self
.
process_groups
[
pg_id
].
label
)
else
:
started
.
append
(
pg_id
)
self
.
process_groups
[
pg_id
].
startup_timeout
=
0
with
self
.
process_groups_lock
:
started
=
[]
for
pg_id
in
pgids
:
try
:
self
.
process_groups
[
pg_id
].
start
()
except
ProcessGroupStartupError
:
_logger
.
error
(
"%s: Unable to start process group."
,
self
.
process_groups
[
pg_id
].
label
)
else
:
started
.
append
(
pg_id
)
self
.
process_groups
[
pg_id
].
startup_timeout
=
0
return
started
#make automatic get final status of process group
...
...
@@ -157,76 +158,80 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
completed
=
{}
orphaned
=
[]
completed_pgs
=
[]
now
=
int
(
time
.
time
())
for
forker
in
self
.
forkers
:
completed
[
forker
]
=
[]
try
:
child_data
=
ComponentProxy
(
forker
).
get_children
(
"process group"
,
None
)
except
ComponentLookupError
,
e
:
_logger
.
error
(
"failed to contact the %s component to obtain a list of children"
,
forker
)
except
:
_logger
.
error
(
"unexpected exception while getting a list of children from the %s component"
,
forker
,
exc_info
=
True
)
else
:
for
child
in
child_data
:
children
[(
forker
,
child
[
'id'
])]
=
child
#clean up orphaned process groups
for
pg
in
self
.
process_groups
.
values
():
if
now
<
pg
.
startup_timeout
:
#wait for startup timeout. We don't want any hasty kills
continue
pg_id
=
pg
.
id
child_uid
=
(
pg
.
forker
,
pg
.
head_pid
)
if
child_uid
not
in
children
:
if
pg
.
mode
==
'interactive'
:
#interactive job, there is no child job
if
pg
.
interactive_complete
:
completed_pgs
.
append
(
pg
)
#not really orphaned, but this causes the proper cleanup
#to occur
orphaned
.
append
(
pg_id
)
continue
orphaned
.
append
(
pg_id
)
_logger
.
warning
(
'%s: orphaned job exited with unknown status'
,
pg
.
jobid
)
pg
.
exit_status
=
1234567
completed_pgs
.
append
(
pg
)
else
:
children
[
child_uid
][
'found'
]
=
True
pg
.
update_data
(
children
[
child_uid
])
if
pg
.
exit_status
is
not
None
:
_logger
.
info
(
'%s: job exited with status %s'
,
pg
.
jobid
,
pg
.
exit_status
)
completed
[
pg
.
forker
].
append
(
children
[
child_uid
][
'id'
])
completed_pgs
.
append
(
pg
)
#check for children without process groups and clean
for
forker
,
child_id
in
children
.
keys
():
if
not
children
[(
forker
,
child_id
)].
has_key
(
'found'
):
completed
[
forker
].
append
(
child_id
)
#clear completed
for
forker
in
completed
:
if
not
completed
[
forker
]
==
[]:
# Hold for update. Process group addition also results in a forker call, so we need to lock that, too
# so we have a consistent view
with
self
.
process_groups_lock
:
now
=
int
(
time
.
time
())
for
forker
in
self
.
forkers
:
try
:
ComponentProxy
(
forker
).
cleanup_children
(
completed
[
forker
])
except
ComponentLookupError
:
_logger
.
error
(
"failed to contact the %s component to cleanup children"
,
forker
)
except
Exception
:
_logger
.
error
(
"unexpected exception while requesting that the %s component perform cleanup"
,
forker
,
exc_info
=
True
)
#Send any needed SIGKILLs for children that have been sent a SIGINT.
for
pg
in
self
.
process_groups
.
values
():
if
(
pg
.
sigkill_timeout
is
not
None
and
now
>=
pg
.
sigkill_timeout
and
pg
.
exit_status
is
None
):
pg
.
signal
(
'SIGKILL'
)
# clear out the orphaned groups. There is no backend data for these
# groups. CQM shouldn't get anything back for these beyond tracking is
# lost.
self
.
cleanup_groups
(
orphaned
)
#return the exited process groups so we can invoke cleanup
child_data
=
ComponentProxy
(
forker
).
get_children
(
"process group"
,
None
)
except
ComponentLookupError
,
e
:
_logger
.
error
(
"failed to contact the %s component to obtain a list of children"
,
forker
)
except
:
_logger
.
error
(
"unexpected exception while getting a list of children from the %s component"
,
forker
,
exc_info
=
True
)
else
:
completed
[
forker
]
=
[]
for
child
in
child_data
:
children
[(
forker
,
child
[
'id'
])]
=
child
#clean up orphaned process groups
for
pg
in
self
.
process_groups
.
values
():
if
pg
.
forker
in
completed
:
if
now
<
pg
.
startup_timeout
:
#wait for startup timeout. We don't want any hasty kills
continue
pg_id
=
pg
.
id
child_uid
=
(
pg
.
forker
,
pg
.
head_pid
)
if
child_uid
not
in
children
:
if
pg
.
mode
==
'interactive'
:
#interactive job, there is no child job
if
pg
.
interactive_complete
:
completed_pgs
.
append
(
pg
)
#not really orphaned, but this causes the proper cleanup
#to occur
orphaned
.
append
(
pg_id
)
continue
orphaned
.
append
(
pg_id
)
_logger
.
warning
(
'%s: orphaned job exited with unknown status'
,
pg
.
jobid
)
pg
.
exit_status
=
1234567
completed_pgs
.
append
(
pg
)
else
:
children
[
child_uid
][
'found'
]
=
True
pg
.
update_data
(
children
[
child_uid
])
if
pg
.
exit_status
is
not
None
:
_logger
.
info
(
'%s: job exited with status %s'
,
pg
.
jobid
,
pg
.
exit_status
)
completed
[
pg
.
forker
].
append
(
children
[
child_uid
][
'id'
])
completed_pgs
.
append
(
pg
)
#check for children without process groups and clean
for
forker
,
child_id
in
children
.
keys
():
if
not
children
[(
forker
,
child_id
)].
has_key
(
'found'
):
completed
[
forker
].
append
(
child_id
)
#clear completed
for
forker
in
completed
:
if
not
completed
[
forker
]
==
[]:
try
:
ComponentProxy
(
forker
).
cleanup_children
(
completed
[
forker
])
except
ComponentLookupError
:
_logger
.
error
(
"failed to contact the %s component to cleanup children"
,
forker
)
except
Exception
:
_logger
.
error
(
"unexpected exception while requesting that the %s component perform cleanup"
,
forker
,
exc_info
=
True
)
#Send any needed SIGKILLs for children that have been sent a SIGINT.
for
pg
in
self
.
process_groups
.
values
():
if
(
pg
.
sigkill_timeout
is
not
None
and
now
>=
pg
.
sigkill_timeout
and
pg
.
exit_status
is
None
):
pg
.
signal
(
'SIGKILL'
)
# clear out the orphaned groups. There is no backend data for these
# groups. CQM shouldn't get anything back for these beyond tracking is
# lost.
self
.
cleanup_groups
(
orphaned
)
#return the exited process groups so we can invoke cleanup
return
completed_pgs
...
...
@@ -234,13 +239,14 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
'''Clean up process group data from completed and logged process groups.
'''
cleaned_groups
=
[]
for
pg_id
in
pgids
:
pg
=
self
.
process_groups
[
pg_id
]
cleaned_groups
.
append
(
pg
)
self
.
forker_taskcounts
[
pg
.
forker
]
-=
1
del
self
.
process_groups
[
pg_id
]
_logger
.
info
(
'%s Process Group deleted'
,
pg
.
label
)
with
self
.
process_groups_lock
:
cleaned_groups
=
[]
for
pg_id
in
pgids
:
pg
=
self
.
process_groups
[
pg_id
]
cleaned_groups
.
append
(
pg
)
self
.
forker_taskcounts
[
pg
.
forker
]
-=
1
del
self
.
process_groups
[
pg_id
]
_logger
.
info
(
'%s Process Group deleted'
,
pg
.
label
)
return
cleaned_groups
def
select_ssh_host
(
self
):
...
...
src/lib/Proxy.py
View file @
4b30c9f4
...
...
@@ -70,7 +70,7 @@ class RetryMethod(_Method):
return
retval
except
xmlrpclib
.
ProtocolError
,
err
:
log
.
error
(
"Server failure: Protocol Error: %s %s"
%
\
(
err
.
errcode
,
err
.
errmsg
))
(
err
.
errcode
,
err
.
errmsg
)
,
exc_info
=
1
)
raise
xmlrpclib
.
Fault
(
20
,
"Server Failure"
)
except
xmlrpclib
.
Fault
as
fault
:
raise
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment