Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
AIG-public
Cobalt
Commits
03e7c0d9
Commit
03e7c0d9
authored
Nov 07, 2016
by
Paul Rich
Browse files
Merge branch 'cray-merge-tests' into 'develop'
Test updates for Cray Port See merge request !11
parents
d7151590
eb6d217b
Changes
6
Expand all
Hide whitespace changes
Inline
Side-by-side
testsuite/TestCobalt/TestComponents/test_cqm.py
View file @
03e7c0d9
...
...
@@ -40,6 +40,8 @@ import types
import
unittest
import
xmlrpclib
from
mock
import
patch
import
Cobalt.Components.cqm
from
Cobalt.Components.base
import
Component
,
exposed
,
automatic
,
query
from
Cobalt.Components.cqm
import
QueueManager
,
Signal_Map
...
...
@@ -3594,3 +3596,45 @@ class TestCQMIntegration (CQMIntegrationTestBase):
del
self
.
taskman
CQMIntegrationTestBase
.
teardown
(
self
)
class
TestJobStatuses
(
object
):
states
=
{
'Ready'
:
(
'queued'
,
'Q'
),
'Preempted'
:
(
'preempted'
,
'P'
),
'Job_Prologue'
:
(
'starting'
,
'R'
),
'Job_Prologue_Retry'
:
(
'starting'
,
'R'
),
'Resource_Prologue'
:(
'starting'
,
'R'
),
'Resource_Prologue_Retry'
:
(
'starting'
,
'R'
),
'Run_Retry'
:
(
'starting'
,
'R'
),
'Running'
:
(
'running'
,
'R'
),
'Kill_Retry'
:
(
'killing'
,
'K'
),
'Killing'
:
(
'killing'
,
'K'
),
'Preempt_Retry'
:
(
'preempting'
,
'P'
),
'Preempting'
:
(
'preempting'
,
'P'
),
'Preempt_Finalize_Retry'
:
(
'preempting'
,
'P'
),
'Preempt_Epilogue'
:
(
'preempting'
,
'P'
),
'Job_Prologue_Retry_Release'
:
(
'exiting'
,
'E'
),
'Resource_Prologue_Retry_Release'
:
(
'exiting'
,
'E'
),
'Finalize_Retry'
:
(
'exiting'
,
'E'
),
'Resource_Epilogue'
:
(
'exiting'
,
'E'
),
'Resource_Epilogue_Retry'
:
(
'exiting'
,
'E'
),
'Job_Epilogue'
:
(
'exiting'
,
'E'
),
'Job_Epilogue_Retry'
:
(
'exiting'
,
'E'
),
'Terminal'
:
(
'done'
,
'E'
)
}
def
setup
(
self
):
spec
=
{}
self
.
job
=
Cobalt
.
Components
.
cqm
.
Job
(
spec
)
def
teardown
(
self
):
del
self
.
job
def
test_display_states
(
self
):
for
state
,
printable_status
in
self
.
states
.
items
():
with
patch
.
object
(
Cobalt
.
Components
.
cqm
.
Job
,
'_sm_state'
,
state
):
assert
self
.
job
.
state
==
printable_status
[
0
],
(
"Bad State %s: Expected %s, got %s"
%
(
state
,
printable_status
[
0
],
self
.
job
.
state
))
assert
self
.
job
.
short_state
==
printable_status
[
1
],
(
"Bad State %s: Expected %s, got %s"
%
(
state
,
printable_status
[
1
],
self
.
job
.
short_state
))
testsuite/TestCobalt/TestComponents/test_cray.py
0 → 100644
View file @
03e7c0d9
This diff is collapsed.
Click to expand it.
testsuite/TestCobalt/TestComponents/test_process_manager.py
0 → 100644
View file @
03e7c0d9
'''Process Manager for cluster/cray systems tests'''
import
time
import
logging
import
sys
from
mock
import
Mock
,
MagicMock
,
patch
import
Cobalt.Proxy
from
Cobalt.Components.system.base_pg_manager
import
ProcessGroupManager
default_child_data
=
[{
'id'
:
1
}]
def
fake_forker
(
*
args
,
**
kwargs
):
print
args
print
kwargs
raise
RuntimeError
(
'boom'
)
#return 1
class
InspectMock
(
MagicMock
):
'''allow us to inspect what is going on within a proxy call'''
def
__getattr__
(
self
,
attr
):
if
attr
==
'get_children'
:
return
MagicMock
(
return_value
=
[{
'id'
:
1
}])
elif
attr
==
'fork'
:
return
MagicMock
(
return_value
=
1
)
return
super
(
InspectMock
,
self
).
__getattr__
(
attr
)
class
TestProcessManager
(
object
):
'''tests for the base project manager'''
def
setup
(
self
):
'''common setup for process group tests'''
self
.
base_spec
=
{
'args'
:[
'arg1'
,
'arg2'
],
'user'
:
'frodo'
,
'jobid'
:
1
,
'executable'
:
'job.exe'
,
'size'
:
2
,
'cwd'
:
'/home/frodo'
,
'location'
:
'loc1'
}
self
.
process_manager
=
ProcessGroupManager
()
self
.
process_manager
.
forkers
=
[
'forker1'
]
self
.
process_manager
.
forker_taskcounts
=
{
'forker1'
:
0
}
def
teardown
(
self
):
'''common teardown for process group tests'''
del
self
.
base_spec
del
self
.
process_manager
def
test_process_manager_init_groups_single
(
self
):
'''ProcessGroupManager.init_groups: create a process group and add to process manager'''
specs
=
[
self
.
base_spec
]
self
.
process_manager
.
init_groups
(
specs
)
assert
self
.
process_manager
.
process_groups
.
get
(
1
,
None
)
is
not
None
,
"process group not created"
assert
self
.
process_manager
.
process_groups
[
1
].
forker
==
'forker1'
,
"forker not set"
@
patch
.
object
(
Cobalt
.
Proxy
.
DeferredProxyMethod
,
'__call__'
,
return_value
=
1
)
def
test_process_manager_start_groups_single
(
self
,
*
args
,
**
kwargs
):
'''ProcessGroupManager.start_groups: start up a single process group'''
self
.
base_spec
[
'startup_timeout'
]
=
120
self
.
process_manager
.
init_groups
([
self
.
base_spec
])
started
=
self
.
process_manager
.
start_groups
([
1
])
assert
len
(
started
)
==
1
,
"started %s groups, should have started 1"
%
len
(
started
)
assert
sorted
(
started
)
==
[
1
],
"wrong groups started."
assert
self
.
process_manager
.
process_groups
[
1
].
startup_timeout
==
0
,
(
"startup_timeout not reset"
)
@
patch
(
'Cobalt.Proxy.DeferredProxy'
,
side_effect
=
InspectMock
)
def
test_process_manager_update_groups_timeout
(
self
,
*
args
,
**
kwargs
):
'''ProcessGroupManager.update_groups: startup timeout respected.'''
now
=
int
(
time
.
time
())
pgroups
=
self
.
process_manager
.
process_groups
self
.
process_manager
.
init_groups
([
self
.
base_spec
])
pgroups
[
1
].
startup_timeout
=
120
+
now
self
.
process_manager
.
update_groups
()
pgroups
=
self
.
process_manager
.
process_groups
assert
len
(
pgroups
)
==
1
,
"%s groups, should have 1"
%
len
(
pgroups
)
assert
sorted
(
pgroups
.
keys
())
==
[
1
],
"wrong groups."
assert
pgroups
[
1
].
startup_timeout
==
now
+
120
,
(
"bad startup timeout: %s"
%
pgroups
[
1
].
startup_timeout
)
@
patch
(
'Cobalt.Proxy.DeferredProxy'
,
side_effect
=
InspectMock
)
def
test_process_manager_update_groups_timeout_exceeded
(
self
,
*
args
,
**
kwargs
):
'''ProcessGroupManager.update_groups: startup timeout exceeded.'''
now
=
int
(
time
.
time
())
pgroups
=
self
.
process_manager
.
process_groups
self
.
process_manager
.
init_groups
([
self
.
base_spec
])
pgroups
[
1
].
startup_timeout
=
now
-
120
self
.
process_manager
.
update_groups
()
pgroups
=
self
.
process_manager
.
process_groups
assert
len
(
pgroups
)
==
0
,
"%s groups, should have 0"
%
len
(
pgroups
)
assert
sorted
(
pgroups
.
keys
())
==
[],
"groups should be empty"
testsuite/TestCobalt/TestComponents/test_processgroups.py
0 → 100644
View file @
03e7c0d9
'''Tests for base ProcessGroup class and actions'''
from
nose.tools
import
raises
from
mock
import
Mock
,
MagicMock
,
patch
import
Cobalt.Exceptions
from
Cobalt.DataTypes.ProcessGroup
import
ProcessGroup
from
Cobalt.Proxy
import
ComponentProxy
mock_proxy
=
MagicMock
()
class
TestProcessGroup
(
object
):
'''Group together process group tests, and apply common setup'''
def
setup
(
self
):
'''common setup for process group tests'''
self
.
base_spec
=
{
'args'
:[
'arg1'
,
'arg2'
],
'user'
:
'frodo'
,
'jobid'
:
1
,
'executable'
:
'job.exe'
,
'size'
:
2
,
'cwd'
:
'/home/frodo'
,
'location'
:
'loc1'
}
def
teardown
(
self
):
'''common teardown for process group tests'''
del
self
.
base_spec
def
test_process_group_init
(
self
):
'''ProcessGroup.__init__: basic initialization'''
pgroup
=
ProcessGroup
(
self
.
base_spec
)
assert
pgroup
is
not
None
,
"process group creation failed"
@
raises
(
Cobalt
.
Exceptions
.
DataCreationError
)
def
test_process_group_init_missing_fields
(
self
):
'''ProcessGroup.__init__: exception on bad spec'''
pgroup
=
ProcessGroup
({})
assert
False
,
"Should raise exception"
@
patch
.
object
(
Cobalt
.
Proxy
.
DeferredProxyMethod
,
'__call__'
,
return_value
=
1
)
def
test_process_group_start_base
(
self
,
proxy
):
'''basic process group startup'''
pgroup
=
ProcessGroup
(
self
.
base_spec
)
data
=
pgroup
.
prefork
()
pgroup
.
start
()
proxy
.
assert_called_with
([
pgroup
.
executable
]
+
pgroup
.
args
,
pgroup
.
tag
,
"Job %s/%s/%s"
%
(
pgroup
.
jobid
,
pgroup
.
user
,
pgroup
.
id
),
pgroup
.
env
,
data
,
pgroup
.
runid
)
testsuite/TestCobalt/TestComponents/test_system.py
0 → 100644
View file @
03e7c0d9
"""Tests for general system component classes.
"""
from
nose.tools
import
raises
from
testsuite.TestCobalt.Utilities.assert_functions
import
assert_match
,
assert_not_match
from
Cobalt.Components.system.resource
import
Resource
from
Cobalt.Components.system.ClusterNode
import
ClusterNode
import
Cobalt.Exceptions
import
time
def
is_match
(
a
,
b
):
return
a
is
b
class
TestSystemResource
(
object
):
def
setup
(
self
):
default_spec
=
{
'name'
:
'res1'
,
'attributes'
:
{
'mem'
:
64
,
'cpupn'
:
16
,
'pubnet'
:
True
},
}
self
.
resource_list
=
[]
self
.
resource_list
.
append
(
Resource
(
default_spec
))
self
.
resource_list
[
0
].
managed
=
True
def
teardown
(
self
):
del
self
.
resource_list
def
test_resource_init_no_attrs
(
self
):
#initialize a resource with no attributes set, need to get {}.
resource
=
Resource
({
'name'
:
'foo'
})
assert_match
(
resource
.
name
,
'foo'
,
"Resource not initialized"
)
assert_match
(
resource
.
attributes
,
{},
"Improper attributes set"
)
assert_match
(
resource
.
status
,
'idle'
,
"Default resource not idle"
)
assert
not
resource
.
managed
,
"Default resource shouldn't be managed"
def
test_resource_init_with_attrs
(
self
):
#expected default initialization
attrs
=
{
'bar'
:
1
,
'baz'
:
'hi'
}
resource
=
Resource
({
'name'
:
'foo'
,
'attributes'
:
attrs
})
assert_match
(
resource
.
name
,
'foo'
,
"Resource not initialized"
)
assert_match
(
resource
.
attributes
,
attrs
,
"Imporper attributes set."
)
assert_match
(
resource
.
status
,
'idle'
,
"Default resource not idle"
)
assert
not
resource
.
managed
,
"Default resource shouldn't be managed"
@
raises
(
Cobalt
.
Exceptions
.
InvalidStatusError
)
def
test_resource_set_bad_state
(
self
):
#Make sure the resource state constraint is obeyed.
self
.
resource_list
[
0
].
status
=
'badstatus'
assert
False
,
"Exception not raised"
def
test_resource_state_reserved
(
self
):
#if resource_until is set, the resource is reserved.
assert
not
self
.
resource_list
[
0
].
reserved
,
"Resource erroniously reserved."
self
.
resource_list
[
0
].
reserved_until
=
time
.
time
()
+
600
assert
self
.
resource_list
[
0
].
reserved
,
"Resource should be reserved."
def
test_resource_reservation_idle
(
self
):
#Can we make a reservation
until
=
time
.
time
()
+
600
assert
self
.
resource_list
[
0
].
reserve
(
until
,
'foo'
,
1
),
"reservation failed."
assert_match
(
self
.
resource_list
[
0
].
reserved_until
,
until
,
"reserved_until not set."
)
assert_match
(
self
.
resource_list
[
0
].
reserved_by
,
'foo'
,
"reserved_by not set."
)
assert_match
(
self
.
resource_list
[
0
].
reserved_jobid
,
1
,
"reserved_jobid not set."
)
assert_match
(
self
.
resource_list
[
0
].
status
,
'allocated'
,
"resource not allocated"
)
def
test_resource_reservation_reserved_bad_user
(
self
):
#Fence the reservation manipulation to user
now
=
time
.
time
()
until
=
now
+
600
user
=
'foo'
jobid
=
1
res
=
self
.
resource_list
[
0
]
assert
res
.
reserve
(
now
,
user
,
jobid
),
"failed initial reservation"
try
:
res
.
reserve
(
until
,
"bar"
,
1
)
except
Cobalt
.
Exceptions
.
ResourceReservationFailure
:
pass
else
:
assert
False
,
"ResourceReservationFailure not raised"
assert_match
(
res
.
reserved_until
,
now
,
"reserved until modified."
)
assert_match
(
res
.
reserved_by
,
user
,
"reserved user modified."
)
assert_match
(
res
.
reserved_jobid
,
jobid
,
"reserved jobid modified."
)
assert_match
(
res
.
status
,
"allocated"
,
"improper status"
)
def
test_resource_reservation_reserved_bad_job
(
self
):
#make sure a bad jobid but right user doesn't re-reserve resource.
now
=
time
.
time
()
until
=
now
+
600
user
=
'foo'
jobid
=
1
res
=
self
.
resource_list
[
0
]
assert
res
.
reserve
(
now
,
user
,
jobid
),
"failed initial reservation"
try
:
res
.
reserve
(
until
,
user
,
2
)
except
Cobalt
.
Exceptions
.
ResourceReservationFailure
:
pass
else
:
assert
False
,
"ResourceReservationFailure not raised"
assert_match
(
res
.
reserved_until
,
now
,
"reserved until modified."
)
assert_match
(
res
.
reserved_by
,
user
,
"reserved user modified."
)
assert_match
(
res
.
reserved_jobid
,
jobid
,
"reserved jobid modified."
)
assert_match
(
res
.
status
,
"allocated"
,
"improper status"
)
def
test_resource_release
(
self
):
#release resource reservation
res
=
self
.
resource_list
[
0
]
until
=
time
.
time
()
+
600
user
=
'foo'
jobid
=
1
res
.
reserve
(
until
,
user
,
jobid
)
assert
res
.
reserved
,
"reservation failed"
assert
res
.
release
(
user
,
jobid
),
"release failed"
assert
not
res
.
reserved
,
"still reserved"
assert_match
(
res
.
reserved_until
,
None
,
"reserved until not unset"
,
is_match
)
assert_match
(
res
.
reserved_by
,
None
,
"reserved by not unset"
,
is_match
)
assert_match
(
res
.
reserved_jobid
,
None
,
"reserved jobid not unset."
,
is_match
)
assert_match
(
res
.
status
,
"cleanup-pending"
,
"improper status"
)
def
test_resource_release_force
(
self
):
#force-release reservation, because admin commands are a thing.
res
=
self
.
resource_list
[
0
]
until
=
time
.
time
()
+
600
user
=
'foo'
jobid
=
1
res
.
reserve
(
until
,
user
,
jobid
)
assert
res
.
reserved
,
"reservation failed"
assert
res
.
release
(
force
=
True
),
"release failed"
assert
not
res
.
reserved
,
"release failed"
assert_match
(
res
.
reserved_until
,
None
,
"reserved until not unset"
,
is_match
)
assert_match
(
res
.
reserved_by
,
None
,
"reserved by not unset"
,
is_match
)
assert_match
(
res
.
reserved_jobid
,
None
,
"reserved jobid not unset."
,
is_match
)
assert_match
(
res
.
status
,
"cleanup-pending"
,
"improper status"
)
def
test_resource_release_unreserved
(
self
):
#you can't release something that hasn't been reserved yet.
#However, it would show up as being released anyway.
res
=
self
.
resource_list
[
0
]
assert
res
.
release
(
force
=
True
),
"must not show as reserved."
def
test_resource_release_bad_user
(
self
):
#keep a user from releasing some one else's reservation
#need both bad user and bad joid
res
=
self
.
resource_list
[
0
]
until
=
time
.
time
()
+
600
user
=
'foo'
jobid
=
1
assert
res
.
reserve
(
until
,
user
,
jobid
),
"failed to reserve"
assert
res
.
reserved
,
"reservation failed"
assert
res
.
release
(
'bar'
,
jobid
),
"release succeeded"
assert_match
(
res
.
reserved_until
,
None
,
'reserve until unset'
)
assert_match
(
res
.
reserved_by
,
None
,
'reserve by unset'
)
assert_match
(
res
.
reserved_jobid
,
None
,
'reserve jobid unset'
)
assert_match
(
res
.
status
,
'cleanup-pending'
,
'improper status'
)
def
test_resource_release_bad_jobid
(
self
):
#Release if user good but jobid bad
res
=
self
.
resource_list
[
0
]
until
=
time
.
time
()
+
600
user
=
'foo'
jobid
=
1
assert
res
.
reserve
(
until
,
user
,
jobid
),
"failed to reserve"
assert
res
.
reserved
,
"reservation failed"
assert
res
.
release
(
user
,
2
),
"release succeeded"
assert_match
(
res
.
reserved_until
,
None
,
'reserve until unset'
)
assert_match
(
res
.
reserved_by
,
None
,
'reserve by unset'
)
assert_match
(
res
.
reserved_jobid
,
None
,
'reserve jobid unset'
)
assert_match
(
res
.
status
,
'cleanup-pending'
,
'improper status'
)
def
test_resource_release_both_bad
(
self
):
#Do not release if user and job are bad
res
=
self
.
resource_list
[
0
]
until
=
time
.
time
()
+
600
user
=
'foo'
jobid
=
1
assert
res
.
reserve
(
until
,
user
,
jobid
),
"failed to reserve"
assert
res
.
reserved
,
"reservation failed"
assert
not
res
.
release
(
'bar'
,
2
),
"release succeeded"
assert_match
(
res
.
reserved_until
,
until
,
'reserve until unset'
)
assert_match
(
res
.
reserved_by
,
user
,
'reserve by unset'
)
assert_match
(
res
.
reserved_jobid
,
jobid
,
'reserve jobid unset'
)
assert_not_match
(
res
.
status
,
'allocated'
,
'improper status'
)
@
raises
(
Cobalt
.
Exceptions
.
UnmanagedResourceError
)
def
test_unmanaged_reserve
(
self
):
#can't do anything to unmanaged resources
self
.
resource_list
[
0
].
managed
=
False
self
.
resource_list
[
0
].
reserve
(
time
.
time
())
@
raises
(
Cobalt
.
Exceptions
.
UnmanagedResourceError
)
def
test_unmanaged_release
(
self
):
#if we're not managing it, we can't release it.
self
.
resource_list
[
0
].
managed
=
False
self
.
resource_list
[
0
].
release
()
class
TestClusterNode
(
object
):
def
setup
(
self
):
'''Set up common test parameters. Run as a part of every test.'''
self
.
now
=
time
.
time
()
def
teardown
(
self
):
'''Clean up any default parameters that need it. Run every test.'''
pass
def
setup_base_node
(
self
):
'''Generate a bare-bones test node.'''
spec
=
{
'name'
:
'node1'
}
self
.
nodelist
=
[
ClusterNode
(
spec
)]
self
.
testnode
=
self
.
nodelist
[
0
]
self
.
testnode
.
schedulable
=
True
def
test_init
(
self
):
#test basic initialization
spec
=
{
'name'
:
'node1'
,
'attributes'
:
{
'ncpu'
:
1
,
'mem'
:
4
},
'queues'
:
[
'foo'
,
'bar'
],
'backfill_epsilon'
:
600
,
}
node
=
ClusterNode
(
spec
)
assert_match
(
node
.
name
,
'node1'
,
"wrong name"
)
assert_match
(
node
.
attributes
,
{
'ncpu'
:
1
,
'mem'
:
4
},
"Attributes not set."
)
assert
node
.
schedulable
,
"defaulted to not schedulable"
assert_match
(
node
.
drain_until
,
None
,
"drain_until should not be set"
,
is_match
)
assert_match
(
node
.
drain_jobid
,
None
,
"drain_jobid should not be set"
,
is_match
)
assert_match
(
node
.
queues
,
[
'foo'
,
'bar'
],
"queues not set"
)
assert_match
(
node
.
backfill_epsilon
,
600
,
"backfill_epsilon not set"
)
def
test_init_defaults
(
self
):
#test that defaults are being properly set
spec
=
{
'name'
:
'node1'
}
node
=
ClusterNode
(
spec
)
assert_match
(
node
.
attributes
,
{},
'bad default attributes'
)
assert_match
(
node
.
queues
,
[
'default'
],
'bad default queues'
)
assert_match
(
node
.
backfill_epsilon
,
120
,
'bad default backfill_epsilon'
)
def
test_set_drain
(
self
):
#Set all draining parameters correctly.
self
.
setup_base_node
()
self
.
testnode
.
set_drain
(
self
.
now
+
500
,
1234
)
assert
self
.
testnode
.
draining
,
"Node not reporting that it is draining"
assert_match
(
self
.
testnode
.
drain_jobid
,
1234
,
"Draining jobid set incorrectly."
)
assert_match
(
self
.
testnode
.
drain_until
,
int
(
self
.
now
+
500
),
"Drain until set incorrectly."
)
def
test_clear_drain
(
self
):
#make sure draining gets cleared correctly
self
.
setup_base_node
()
self
.
testnode
.
set_drain
(
self
.
now
+
500
,
1234
)
self
.
testnode
.
clear_drain
()
assert
not
self
.
testnode
.
draining
,
"Node still draining."
assert_match
(
self
.
testnode
.
drain_jobid
,
None
,
"Draining jobid still set."
,
is_match
)
assert_match
(
self
.
testnode
.
drain_until
,
None
,
"Drain until still set."
,
is_match
)
def
test_read_only_attrs
(
self
):
#These are read-only attributes. Make sure they stay that way.
self
.
setup_base_node
()
testattrs
=
[
'drain_until'
,
'drain_jobid'
,
'draining'
]
for
attr
in
testattrs
:
try
:
setattr
(
self
.
testnode
,
attr
,
'foo'
)
except
AttributeError
:
pass
else
:
assert
False
,
(
"Read only attribute %s did not raise exception"
%
(
attr
))
@
raises
(
ValueError
)
def
test_no_negative_backfill_epsilon
(
self
):
#ensure ValueError raised for backfill_epsilon
self
.
setup_base_node
()
self
.
testnode
.
backfill_epsilon
=
-
1
@
raises
(
Cobalt
.
Exceptions
.
UnschedulableNodeError
)
def
test_no_drain_down
(
self
):
#don't drain down hardware
self
.
setup_base_node
()
self
.
testnode
.
status
=
'down'
self
.
testnode
.
set_drain
(
self
.
now
,
1234
)
@
raises
(
Cobalt
.
Exceptions
.
UnschedulableNodeError
)
def
test_no_drain_unschedulable
(
self
):
#don't drain on unscheduled hardware
self
.
setup_base_node
()
self
.
testnode
.
schedulable
=
False
self
.
testnode
.
set_drain
(
self
.
now
,
1234
)
testsuite/TestCobalt/Utilities/assert_functions.py
0 → 100644
View file @
03e7c0d9
"""Helper and convenience functions for common assert cases"""
def
assert_match
(
test
,
expect
,
desc
,
comp
=
None
):
'''Assert that we have a match, log failure in convenient way otherwise.
comp should correspond to a boolean __eq__ function, if None, will
use default == comparitor.
'''
if
comp
is
None
:
comp
=
lambda
a
,
b
:
a
==
b
assert
comp
(
test
,
expect
),
"FAILED MATCH: %s: Expected: %s; Got %s"
%
(
desc
,
str
(
expect
),
str
(
test
))
def
assert_not_match
(
test
,
expect
,
desc
,
comp
=
None
):
'''Assert non-matching test and expected result. Default comparitor
corresponds to __ne__ function. By default will use !=.
'''
if
comp
is
None
:
comp
=
lambda
a
,
b
:
a
!=
b
assert
comp
(
test
,
expect
),
"FAILED NON-MATCH: %s: Got %s for both"
%
(
desc
,
str
(
test
))
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment