Commit 24e007ad authored by Swann Perarnau's avatar Swann Perarnau
Browse files

Merge remote-tracking branch 'grenoble/master' into import-grenoble

parents 972269c4 0aa1bb73
......@@ -19,8 +19,6 @@ You will need the `yggdrasil-c` repository as well for some of the features.
# Requirements
A connector:
* taktuk `sudo aptitude install taktuk`
* Flux: see `https://github.com/flux-framework/flux-core`
......
import json
#constants
ISIPC = "ISIPC"
GETSPWE = "get spawn errors"
GETSPWN = "get spawn"
STOPMPI = "stop mpi bridge"
......@@ -95,18 +96,27 @@ heartBeat= 30
MAXQ = 500
#states
BOOSTRAP = "bootstrap"
INIT = "init"
INITd = "initialized"
RUNING = "running"
SICK = "sick"
DONE = "done"
ERROR = "error"
TIMEOUT = "timeout"
CANCELED = "canceled"
TERMINAISON_PENDING = "terminaison pending"
IDLE = 1
INIT = 2
RUNNING = 4
DONE = 40
ERROR = 50
TIMEOUT = 60
CANCELED = 70
#Authorized transitions
IDLE_INIT = IDLE+INIT
IDDLE_CANCEL = IDLE+CANCELED
INIT_RUNNING = INIT+RUNNING
INIT_ERROR = INIT+ERROR
INIT_CANCELED = INIT+CANCELED
RUNNING_DONE = RUNNING+DONE
RUNNING_ERROR = RUNNING+ERROR
RUNNING_TIMEOUT = RUNNING+TIMEOUT
RUNNING_CANCELED = RUNNING+CANCELED
TERMINAISON_PENDING = "terminaison_pending"
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
......@@ -127,5 +137,9 @@ def blue(str) :
def green(str) :
return bold(bcolors.OKGREEN+str+bcolors.ENDC)
def orange(str) :
return bold(bcolors.WARNING+str+bcolors.ENDC)
def error(str) :
return bold(bcolors.FAIL+str+bcolors.ENDC)
......@@ -3,3 +3,4 @@ from .encoder import MPIDecoder
from .network import Network
from .framework import FrameworkControler
from .main import runner
from .main import argless_runner
......@@ -161,10 +161,10 @@ class Erebor:
False)
else :
logger.warning("need to buffer the message for later")
if mpi_dnode not in self.mpi_wait_route_msg :
self.mpi_wait_route_msg[mpi_dnode] = list()
if "{}@{}".format(mpi_dnode, dnetowkrId) not in self.mpi_wait_route_msg :
self.mpi_wait_route_msg["{}@{}".format(mpi_dnode, dnetowkrId)] = list()
# buffer the message for later
self.mpi_wait_route_msg[mpi_dnode].append([
self.mpi_wait_route_msg["{}@{}".format(mpi_dnode, dnetowkrId)].append([
mpi_dnode, dnetowkrId, mpi_snode,
snetworkId, m])
else :
......@@ -194,13 +194,12 @@ class Erebor:
snetworkId, m):
gateway_network = None
neighbour_gateway = None
# if we have a gateway to send the message
logger.debug("if we have a gateway to send the message");
if dnetowkrId in self.routing_table.keys() :
neighbour_gateway = self.routing_table[dnetowkrId]
gateway_network = self.networks.get(neighbour_gateway.networkId)
# If we do not know the route, send to the master he'll know what to
# do.
else :
logger.debug("If we do not know the route, send to the master he'll know what to do.")
# otherwise take the default route
logger.debug("take master network to send the message")
gateway_network = self.master_network
......@@ -211,8 +210,7 @@ class Erebor:
consts.FROM:snode+"@"+snetworkId,
consts.DEST:dnode+"@"+dnetowkrId,
consts.DATA:m})
# if we are on the same network, send directly to the dnode
# otherwise, send to the gateway rank
logger.debug("if we are on the same network, send directly to the dnode")
if neighbour_gateway.networkId == dnetowkrId :
logger.debug(
" TRANS-NETWORK for {}@{}, gateway {}@{}".format(
......@@ -225,6 +223,7 @@ class Erebor:
message,
False)
else :
logger.debug("otherwise, send to the gateway rank")
logger.debug(
" TRANS-NETWORK for {}@{}, gateway {}@{}".format(
dnode, dnetowkrId,
......@@ -353,11 +352,18 @@ class Erebor:
final_cllbck,
final_cllbck_value)
logger.info("FROM {} EXECUTE ACTION {}".format(local_from,decoded_message[consts.ACTION]))
logger.info(("FROM {} EXECUTE ACTION {} ON NETWORK {}"
"snode {} snetid {} dnode {} dnetid {}"
).format(
local_from,
decoded_message[consts.ACTION],
network.ID,
snode, snetid, dnode, dnetid))
# make it spawn nodes
if decoded_message[consts.ACTION] == consts.SPAWN :
if int(decoded_message[consts.CALLBACK]) > -1 :
logger.debug("callback asked "+network.ID)
logger.debug("spawn nodes {}".format(decoded_message[consts.DATA]))
network.connector.spawn_nodes(
decoded_message[consts.DEST],
decoded_message[consts.DATA],
......@@ -489,6 +495,7 @@ class Erebor:
elif decoded_message[consts.ACTION] == consts.MPIDEPL :
on_network = self.networks.get(decoded_message[consts.NETWORK])
port = decoded_message[consts.PORT]
is_ipc = decoded_message[consts.ISIPC]
# init the routing table for this group
self.mpi_routing_table[on_network] = dict()
# TODO handle the fact that maybe a children is already launched on
......@@ -503,7 +510,7 @@ class Erebor:
# Will be called when a replication is done.
def after_children_startup(child) :
network = self.networks.get(child.networkId)
self.start_MPI_bridge_on(child, network.ID, port)
self.start_MPI_bridge_on(child, network.ID, port, is_ipc)
self.mpi_launching_to_wait[network] -= 1
# when everyone has bootstrap
if self.mpi_launching_to_wait[network] == 0 :
......@@ -518,7 +525,7 @@ class Erebor:
after_children_startup)
self.launch_erebor_on(str(spawn.rank), on_network)
else :
self.start_mpi_bridhe(on_network.ID, port)
self.start_mpi_bridhe(on_network.ID, port, is_ipc)
# in case of an one element group, still propagate the
# correct deployed information
if len(spawned) == 1 :
......@@ -528,11 +535,14 @@ class Erebor:
elif decoded_message[consts.ACTION] == consts.MPIREG :
rank = decoded_message[consts.RANK]
self.mpi_routing_table[network][rank] = local_from
if rank in self.mpi_wait_route_msg :
for message in self.mpi_wait_route_msg[rank] :
logger.debug("Registering {}@{}".format(rank, network.ID))
if "{}@{}".format(rank, network.ID) in self.mpi_wait_route_msg :
logger.debug(" Take wait messages ")
for message in self.mpi_wait_route_msg["{}@{}".format(rank, network.ID)] :
logger.debug("Send a queued message")
self.send_mpi_trans_network_message_to(message[0],
message[1], message[2], message[3], message[4])
del self.mpi_wait_route_msg[rank]
del self.mpi_wait_route_msg["{}@{}".format(rank, network.ID)]
# clean all registered mpi ranks associated with this network.
elif decoded_message[consts.ACTION] == consts.MPICLN :
on_network = self.networks.get(decoded_message[consts.NETWORK])
......@@ -624,41 +634,6 @@ class Erebor:
self.propagate_dead_nodes(snode, snetid, error_nodes)
# Is called when a trans network message arrive here
# Need to chose if the message has to be treated here or forwarded
def trans_network_callback(self, message, rank_from) :
source = message[consts.FROM]
destination = message[consts.DEST]
logger.info(
"a trans network message arrived "+source+" -> "+destination)
package = message[consts.DATA]
# source and destination are on the form number@networkId
snode = source.split("@")[0]
snetwork = source.split("@")[1]
dnode = destination.split("@")[0]
dnetwork = destination.split("@")[1]
# look if I am the right destination Erebor.
# -> destination need to match on of my network identifier
network = self.networks.get(dnetwork)
# If I have a match, it means the package belong to one of my networks
if network != None :
logger.debug("local network routing {}".format(dnode))
# check my rank to see is i'm equivalent to dnode
if network.state == consts.RUNING :
dnode_list = self.helper.build_list("["+dnode.replace("/",",")+"]")
logger.debug(" node list {}".format(dnode_list))
# If I am the destination
if network.rank in dnode_list :
self.interpret_trans_network(package, network,
rank_from,snode,snetwork, dnode, dnetwork)
else :
logger.debug("not destination, forwarding")
self.send_trans_network_message_to(
dnode,
dnetwork,
snode,
snetwork,
package)
# Is called when a trans network message arrive here
# Need to chose if the message has to be treated here or forwarded
def mpi_trans_network_callback(self, message, rank_from) :
......@@ -680,7 +655,7 @@ class Erebor:
if network != None :
logger.debug("local network routing {}".format(mpi_dnode))
# check my rank to see is i'm equivalent to mpi_dnode
if network.state == consts.RUNING :
if network.state == consts.RUNNING :
mpi_hook = self.mpi_bridges[network]
# If I am the destination
if mpi_hook.is_destination_for(mpi_dnode) :
......@@ -716,19 +691,20 @@ class Erebor:
logger.info(
"a trans network message arrived "+source+" -> "+destination)
package = message[consts.DATA]
# source and destination are on the form number@networkId
logger.debug("source and destination are on the form number@networkId")
snode = source.split("@")[0]
snetwork = source.split("@")[1]
dnode = destination.split("@")[0]
dnetwork = destination.split("@")[1]
# look if I am the right destination Erebor.
# -> destination need to match on of my network identifier
logger.debug(" look if I am the right destination Erebor.")
logger.debug(" -> destination need to match on of my network identifier")
network = self.networks.get(dnetwork)
# If I have a match, it means the package belong to one of my networks
logger.debug("dnetwork {}".format(dnetwork))
logger.debug("If I have a match, it means the package belong to one of my networks")
if network != None :
logger.debug("local network routing {}".format(dnode))
# check my rank to see is i'm equivalent to dnode
if network.state == consts.RUNING :
logger.debug("local network routing {} {}".format(dnode, network.ID))
logger.debug("check my rank to see is i'm equivalent to dnode")
if network.state == consts.RUNNING :
dnode_list = self.helper.build_list("["+dnode.replace("/",",")+"]")
logger.debug(" node list {}".format(dnode_list))
# If I am the destination
......@@ -834,7 +810,8 @@ class Erebor:
elif order[consts.ORDER] == consts.STRTMPI :
networkId = order[consts.NETWORK]
port = order[consts.PORT]
self.start_mpi_bridhe(networkId, port)
is_ipc = order[consts.ISIPC]
self.start_mpi_bridhe(networkId, port, is_ipc)
# If i'm asked to stop the MPI bridge
elif order[consts.ORDER] == consts.STOPMPI :
networkId = order[consts.NETWORK]
......@@ -966,7 +943,7 @@ class Erebor:
else :
self.connect_to_master_network(networkId)
# update state
self.state = consts.RUNING
self.state = consts.RUNNING
# Connect to an existing network, for whom I am a slave
# There can only be one of those
......@@ -1026,7 +1003,7 @@ class Erebor:
network.connector.register_on_dead_bridge(self.dead_bridge_callback)
# Start a new network, for whom I am the master
def start_mpi_bridhe(self, networkId, port) :
def start_mpi_bridhe(self, networkId, port, is_ipc) :
network = self.networks.get(networkId)
if network != None :
self.mpi_bridges[network] = MPIHook(self.add_to_queue,
......@@ -1036,7 +1013,8 @@ class Erebor:
network,
self.debug_list,
self.loglv,
self.logf)
self.logf,
is_ipc)
self.mpi_bridges[network].start()
# Start a new network, for whom I am the master
......@@ -1109,14 +1087,15 @@ class Erebor:
# Ask the Erebor children to start a MPIBridge on the given networkId
# -> only work with direct children
def start_MPI_bridge_on(self, children, networkId, port) :
def start_MPI_bridge_on(self, children, networkId, port, is_ipc) :
network = self.networks.get(children.networkId)
if network != None :
order = json.dumps({
consts.TYPE:consts.ORDER,
consts.ORDER:consts.STRTMPI,
consts.NETWORK:networkId,
consts.PORT:port
consts.PORT:port,
consts.ISIPC:is_ipc
});
network.connector.send_message_to(children.rank, children.target, order, False)
......
......@@ -332,13 +332,14 @@ class FrameworkControler:
# port : the port fort zmq on the hook
# callback : the function who will be executed at the end of te
# operation
def start_mpi_session(self, dnetid, snode, snetid, port, callback):
def start_mpi_session(self, dnetid, snode, snetid, port, callback, is_ipc=consts.TRUE):
id_callback = self.erebor.register_execute_callback(callback)
message = json.dumps({
consts.ACTION:consts.MPIDEPL,
consts.PORT:port,
consts.CALLBACK:str(id_callback),
consts.NETWORK:dnetid})
consts.NETWORK:dnetid,
consts.ISIPC:is_ipc})
self.erebor.send_trans_network_message_to("0",
dnetid,
snode,
......
......@@ -7,9 +7,23 @@ import yggdrasil
from yggdrasil.erebor import Erebor
from yggdrasil.erebor.network import Network
from ..log import configure_logger
import sys, traceback, threading, time
logger = logging.getLogger('yggdrasil')
def argless_runner(Controler):
erebor = Erebor(True,
"erebor,network,isengard,wrapper,unix_socket,bridge,mpi",
"CRITICAL", "", "", "")
def signal_handler(a, b):
erebor.terminate()
signal.signal(signal.SIGINT, signal_handler)
controler = Controler(erebor, "root", "", "")
# Start the test sample after the root network bootstrap
erebor.on_network_init("root", controler.start)
controler.bootstrap()
erebor.process_messages()
erebor.terminate()
def runner(argv, Controler=None):
print("start")
......@@ -140,6 +154,7 @@ def runner(argv, Controler=None):
hlist = ",".join(f.readlines())
hlist = hlist.replace("\n", "")
hlist = hlist.replace("\r", "")
f.close()
# Remove unecessary /
if epath != "" and not epath.endswith("/"):
epath = epath+"/"
......@@ -164,6 +179,7 @@ def runner(argv, Controler=None):
else:
erebor.bootstrap(ID)
erebor.process_messages()
erebor.terminate()
except:
logger.exception("Unexpected error:")
erebor.terminate()
......
......@@ -11,7 +11,7 @@ logger = logging.getLogger('yggdrasil')
class MPIHook(Thread) :
def __init__(self, add_to_queue, trans_hook, mpi_trans_hook, port, network,
debug_list, loglv, logf) :
debug_list, loglv, logf, is_ipc) :
Thread.__init__(self)
self.debug_list = debug_list
self.decoder = MPIDecoder()
......@@ -28,36 +28,46 @@ class MPIHook(Thread) :
self.daemon = True
self.send_trans_network_message_to = trans_hook
self.send_mpi_trans_network_message_to = mpi_trans_hook
self.is_ipc = is_ipc == consts.TRUE
def run(self) :
# A context per thread
context = zmq.Context()
rep_socket = context.socket(zmq.PULL)
rep_socket.bind("ipc:///tmp/{}".format(self.port))
logger.debug(consts.bold("mpi -> reply socket ready to receive"))
while self.poll:
if self.is_ipc :
rep_socket.bind("ipc:///tmp/{}".format(self.port))
else :
rep_socket.bind("tcp://*:{}".format(self.port))
logger.debug(consts.bold("mpi -> reply socket ready to receive on {}").format(self.port))
poller = zmq.Poller()
poller.register(rep_socket)
while self.poll: # TODO do actual polling pls
try :
logger.debug(consts.bold("wait to receive"))
rcv_message = rep_socket.recv()
logger.debug(consts.bold("received"))
logger.debug(consts.bold("received {}".format(rcv_message)))
decoded_message = self.unpack(rcv_message)
logger.debug(consts.bold("unpacked {}".format(decoded_message)))
# a new process is registering to me
if decoded_message[consts.TYPE] == consts.REG :
logger.debug(consts.bold("mpi -> new registration to do later"))
self.add_to_queue(Event(self.new_registration, decoded_message))
# a process wants to send a message to another one
if decoded_message[consts.TYPE] == consts.MPISEND :
logger.debug(consts.bold("mpi -> send a message later"))
self.add_to_queue(Event(self.send_message_to_distant_mpi_rank,
decoded_message))
if decoded_message[consts.TYPE] == consts.MESSAGE :
logger.debug(consts.bold("mpi -> send a ctrl message later"))
self.add_to_queue(Event(self.send_message_to_distant_erebor,
decoded_message))
# unlock the state of request socket
logger.debug(consts.bold("mpi -> done loop"))
events = dict(poller.poll(10000))
if events[rep_socket] == zmq.POLLIN :
rcv_message = rep_socket.recv()
logger.debug(consts.bold("received"))
logger.debug(consts.bold("received {}".format(rcv_message)))
decoded_message = self.unpack(rcv_message)
logger.debug(consts.bold("unpacked {}".format(decoded_message)))
# a new process is registering to me
if decoded_message[consts.TYPE] == consts.REG :
logger.debug(consts.bold("mpi -> new registration to do later"))
self.add_to_queue(Event(self.new_registration, decoded_message))
# a process wants to send a message to another one
if decoded_message[consts.TYPE] == consts.MPISEND :
logger.debug(consts.bold("mpi -> send a message later"))
self.add_to_queue(Event(self.send_message_to_distant_mpi_rank,
decoded_message))
if decoded_message[consts.TYPE] == consts.MESSAGE :
logger.debug(consts.bold("mpi -> send a ctrl message later"))
self.add_to_queue(Event(self.send_message_to_distant_erebor,
decoded_message))
# unlock the state of request socket
logger.debug(consts.bold("mpi -> done loop"))
else :
logger.debug(consts.bold("no messages"))
except Exception as e:
logger.warning(consts.bold("{}".format(e)))
......@@ -89,7 +99,10 @@ class MPIHook(Thread) :
rank = decoded_message[consts.RANK]
logger.info(consts.bold("-> new registration {}".format(rank)))
socket = self.context.socket(zmq.PUSH)
socket.connect("ipc:///tmp/{}".format(decoded_message[consts.PORT]))
if self.is_ipc :
socket.connect("ipc:///tmp/{}".format(decoded_message[consts.PORT]))
else :
socket.connect("tcp://127.0.0.1:{}".format(decoded_message[consts.PORT]))
self.subscriber[rank] = socket
# advertise the root node
message = json.dumps({consts.ACTION:consts.MPIREG,consts.RANK:rank})
......
......@@ -132,7 +132,7 @@ class Network():
# -> to the registered callbacks
# -> to my network master (if it exsit: propagate = true)
def notify_bootstrap(self, propagate):
self.state = consts.RUNING
self.state = consts.RUNNING
data = json.dumps({
consts.FROM: self.rank,
consts.TYPE: consts.INFOS,
......
......@@ -25,6 +25,7 @@ class RunningCommand:
self.callback_object = callback_object
def test_and_dispatch(self, txt_utf8, txt_base_64):
logger.info(txt_utf8);
if self.stdout_regex.match(txt_utf8) is not None:
self.append_stdout(txt_utf8, txt_base_64)
return 0
......@@ -41,22 +42,22 @@ class RunningCommand:
return -1
def get_stdout_regex(self):
regex = "^.*{}-{}: {} \(.*\): output >.*".format(self.name, self.dest,
self.command)
logger.debug("regex -> ^.*{}-{}: {} \(.*\):"
" output >.*".format(self.name, self.dest, self.command))
regex = "^.*{}-{}: {} \(.*\): output >.*".format(self.name,
self.dest, re.escape(self.command))
logger.debug("regex -> ^.*{}-{}: {} \(.*\):" " output >.*".format(self.name,
self.dest, re.escape(self.command)))
return re.compile(regex)
def get_stderr_regex(self):
regex = "^.*{}-{}: {} \(.*\): error >.*".format(self.name, self.dest,
self.command)
regex = "^.*{}-{}: {} \(.*\): error >.*".format(self.name,
self.dest, re.escape(self.command))
logger.debug("regex -> ^.*{}-{}: {} \(.*\):"
" error >.*".format(self.name, self.dest, self.command))
return re.compile(regex)
def get_final_regex(self):
regex = "^.*{}-{}: {} \(.*\): status >.*".format(self.name, self.dest,
self.command)
regex = "^.*{}-{}: {} \(.*\): status >.*".format(self.name,
self.dest, re.escape(self.command))
logger.debug("regex -> ^.*{}-{}: {} \(.*\):"
" status >.*".format(self.name, self.dest, self.command))
return re.compile(regex)
......
......@@ -194,7 +194,7 @@ class Isengard:
self.flog_time(t)
def send_to_taktuk(self, data):
#logger.debug("command to taktuk {}".format(data))
logger.debug("command to taktuk {}".format(data))
if self.isRoot :
self.socket.send_message(data)
else :
......@@ -228,39 +228,44 @@ class Isengard:
self.spawned_lock.acquire()
arguments = txt.split("connector:")[1].split(";")
logger.warning(consts.bold("connector error {}".format(arguments)))
if "Possible precedence issue with control " not in arguments[2] :
if ("TERM" not in arguments[2]
and "Warning" not in arguments[2]
and "Possible precedence issue with control " not in arguments[2] ):
sp_node, index = self.get_wait_spawned(arguments[PEER])
if index > -1 :
del self.wait_spawn[index]
self.dead_node(sp_node)
self.propagate_state_info(sp_node)
else :
sp_node_key = arguments[PEER]+"-"+arguments[PEER_POS]
sp_node = self.spawned[sp_node_key]
nodes = [sp_node.name]
ranks = [str(sp_node.rank)]
childrens = self.get_children_by_position(sp_node, arguments[PEER_POS])
for children in childrens :
self.dead_node(children)
nodes.append(children.name)
nodes.append(str(children.rank))
self.dead_node(sp_node)
self.fire_node_dead(arguments[PEER], nodes, ranks)
# Advertise running commandes that nodes are deads
toremove = []
dispactch= False
for i in range(0, len(self.running_commands)) :
running_command = self.running_commands[i]
ret = running_command.node_deads(nodes, ranks)
if ret > -1:
dispactch = True
if ret == 2:
toremove.append(i)
logger.info("dead command propagation")
break
if dispactch:
for i in toremove:
del self.running_commands[i]
try:
sp_node_key = arguments[PEER]+"-"+arguments[PEER_POS]
sp_node = self.spawned[sp_node_key]
nodes = [sp_node.name]
ranks = [str(sp_node.rank)]
childrens = self.get_children_by_position(sp_node, arguments[PEER_POS])
for children in childrens :
self.dead_node(children)
nodes.append(children.name)
nodes.append(str(children.rank))
self.dead_node(sp_node)
self.fire_node_dead(arguments[PEER], nodes, ranks)
# Advertise running commandes that nodes are deads
toremove = []
dispactch= False
for i in range(0, len(self.running_commands)) :
running_command = self.running_commands[i]
ret = running_command.node_deads(nodes, ranks)
if ret > -1:
dispactch = True
if ret == 2:
toremove.append(i)
logger.info("dead command propagation")
break
if dispactch:
for i in toremove:
del self.running_commands[i]
except :
pass
self.spawned_lock.release()
def get_children_by_position(self, father, position) :
......@@ -286,9 +291,9 @@ class Isengard:
return list(self.error_nodes_s)