Commit 3b339d96 authored by Paul Rich's avatar Paul Rich
Browse files

Fixing spacing in cdbwriter.py

parent 76c466ca
"""Cobalt Component that writes messages generated by other
"""Cobalt Component that writes messages generated by other
components to a target database.
"""
__revision__ = '$Revision: 1 $'
import os
import sys
......@@ -17,12 +16,14 @@ except ImportError:
import db2util
import Cobalt.Logging, Cobalt.Util
import Cobalt.Logging
import Cobalt.Util
from Cobalt.Statistics import Statistics
from Cobalt.Components.DBWriter.cdbMessages import LogMessage, LogMessageDecoder, LogMessageEncoder
from Cobalt.Components.base import Component, exposed, automatic, query, locking
from Cobalt.Proxy import ComponentProxy
__revision__ = '$Revision: 1 $'
logger = logging.getLogger("Cobalt.Components.cdbwriter")
config = ConfigParser.ConfigParser()
......@@ -32,848 +33,845 @@ if not config.has_section('cdbwriter'):
sys.exit(1)
def get_cdbwriter_config(option, default):
    """Return *option* from the [cdbwriter] config section.

    Falls back to *default* when the option is absent.  (The diff
    residue had duplicated the whole body; deduplicated here.)
    """
    try:
        value = config.get('cdbwriter', option)
    except ConfigParser.NoOptionError:
        value = default
    return value
class MessageQueue(Component):
    """Cobalt component that queues log messages from other components
    and writes them to the target database, spilling the in-memory
    queue to an overflow file when it exceeds the configured maximum.

    NOTE(review): this class was reconstructed from interleaved diff
    residue (two copies of every member were merged mid-method in the
    scrape); confirm the result against repository history.
    """

    name = "cdbwriter"
    implementation = "cdbwriter"
    logger = logging.getLogger("Cobalt.Components.cdbwriter")

    # Required [cdbwriter] options; missing ones are fatal.
    _configfields = ['user', 'pwd', 'database', 'schema']
    _config = ConfigParser.ConfigParser()
    _config.read(Cobalt.CONFIG_FILES)
    if not config._sections.has_key('cdbwriter'):
        logger.error('"cdbwriter" section missing from config file.')
    config = _config._sections['cdbwriter']
    mfields = [field for field in _configfields if not config.has_key(field)]
    if mfields:
        logger.error("Missing option(s) in cobalt config file [cdbwriter] section: %s" % (" ".join(mfields)))
        sys.exit(1)

    def __init__(self, *args, **kwargs):
        Component.__init__(self, *args, **kwargs)
        self.sync_state = Cobalt.Util.FailureMode("Foreign Data Sync")
        self.connected = False
        self.msg_queue = []
        self.decoder = LogMessageDecoder()
        self.overflow = False
        self.overflow_filename = None
        self.overflow_file = None
        self.clearing_overflow = False
        # max_queued_msgs <= 0 (or no overflow file) means "unlimited queue".
        self.max_queued = int(get_cdbwriter_config('max_queued_msgs', '-1'))
        if self.max_queued <= 0:
            logger.info("message queue set to unlimited.")
            self.max_queued = None
        else:
            # NOTE(review): nesting reconstructed to mirror __setstate__'s
            # explicit else-branch; the scrape flattened indentation here.
            self.overflow_filename = get_cdbwriter_config('overflow_file', None)
            if self.overflow_filename == None:
                logger.warning("No file given to catch maximum messages. Setting queue size to unlimited.")
                self.max_queued = None

    def __getstate__(self):
        """Capture component state (message queue, overflow flag) for checkpointing."""
        state = {}
        state.update(Component.__getstate__(self))
        state.update({
                'cdbwriter_version': 1,
                'msg_queue': self.msg_queue,
                'overflow': self.overflow})
        return state

    def __setstate__(self, state):
        """Restore from a checkpoint; overflow settings are re-read from config."""
        Component.__setstate__(self, state)
        self.msg_queue = state['msg_queue']
        self.connected = False
        self.decoder = LogMessageDecoder()
        self.clearing_overflow = False
        self.overflow_filename = None
        self.overflow_file = None
        self.max_queued = int(get_cdbwriter_config('max_queued_msgs', '-1'))
        if self.max_queued <= 0:
            logger.info("message queue set to unlimited.")
            self.max_queued = None
        else:
            self.overflow_filename = get_cdbwriter_config('overflow_file', None)
        if self.max_queued and (self.overflow_filename == None):
            logger.warning("No file given to catch maximum messages. Setting queue size to unlimited.")
            self.max_queued = None
        # Only honor a checkpointed overflow flag if queue limiting is active.
        if state.has_key('overflow') and self.max_queued:
            self.overflow = state['overflow']
        else:
            self.overflow = False

    def init_database_connection(self):
        """(Re)connect to the target database using [cdbwriter] credentials."""
        user = get_cdbwriter_config('user', None)
        pwd = get_cdbwriter_config('pwd', None)
        database = get_cdbwriter_config('database', None)
        schema = get_cdbwriter_config('schema', None)
        try:
            self.database_writer = DatabaseWriter(database, user, pwd, schema)
        except:
            #make this a log statement
            logging.error("Unable to connect to %s as %s" % (database, user))
            self.connected = False
            logging.debug(traceback.format_exc())
        else:
            self.connected = True

    def iterate(self):
        """Go through the messages that are sitting on the queue and
        load them into the database."""
        #if we're not connected, try to reconnect to the database
        if not self.connected:
            logger.debug("Attempting reconnection.")
            self.init_database_connection()
        # Once connected, replay messages spilled to the overflow file
        # ahead of the in-memory queue so insertion order is preserved.
        if self.connected and self.overflow:
            self.clearing_overflow = True
            self.open_overflow('r')
            if self.overflow_file:
                overflow_queue = [self.decoder.decode(line)
                                  for line in self.overflow_file]
                overflow_queue.extend(self.msg_queue)
                self.msg_queue = overflow_queue
                self.close_overflow()
                self.del_overflow()
            self.overflow = False
        while self.msg_queue and self.connected:
            msg = self.msg_queue[0]
            try:
                self.database_writer.addMessage(msg)
            except db2util.adapterError:
                # Adapter errors are treated as non-retryable: drop the message.
                logger.error("Error updating databse. Unable to add message due to adapter error. Message dropped.")
                logging.debug(traceback.format_exc())
                self.msg_queue.pop(0)
            except:
                # Any other failure: assume the connection died; retry later.
                logger.error("Error updating databse. Unable to add message. %s", msg)
                logging.debug(traceback.format_exc())
                self.connected = False
                #if we were clearing an overflow, here we go again.
                if ((self.max_queued != None) and
                    (len(self.msg_queue) >= self.max_queued)):
                    self.overflow = True
                    self.open_overflow('a')
                    if self.overflow_file != None:
                        self.queue_to_overflow()
                        self.close_overflow()
                break
            else:
                #message added
                self.msg_queue.pop(0)
        self.clearing_overflow = False
    iterate = automatic(iterate)

    def add_message(self, msg):
        """Accept a JSON-encoded log message string and queue it for insertion."""
        #keep the queue from consuming all memory
        if ((self.max_queued != None) and
            (len(self.msg_queue) >= self.max_queued)and
            (not self.clearing_overflow)):
            self.overflow = True
            self.open_overflow('a')
            if self.overflow_file == None:
                logger.critical("MESSAGE DROPPED: %s", msg)
            else:
                self.queue_to_overflow()
                self.close_overflow()
        #and now queue as normal
        msgDict = None
        try:
            msgDict = self.decoder.decode(msg)
        except ValueError:
            logger.error("Bad message recieved. Failed to decode string %s" % msg)
            return
        except:
            # NOTE(review): falls through and appends msgDict (possibly None)
            # on unexpected decode failure; confirm against history.
            logging.debug(traceback.format_exc())
        self.msg_queue.append(msgDict)
    add_message = exposed(add_message)

    def save_me(self):
        """Periodic checkpoint of this component's state."""
        Component.save(self)
    save_me = automatic(save_me)

    def open_overflow(self, mode):
        """Open the overflow file in *mode*; on failure messages may be lost."""
        try:
            self.overflow_file = open(self.overflow_filename, mode)
        except:
            self.logger.critical("Unable to open overflow file! Information to database will be lost!")

    def close_overflow(self):
        """Close the overflow file if it is open during an overflow episode."""
        if self.overflow and self.overflow_file:
            self.overflow_file.close()
            self.overflow_file = None

    def del_overflow(self):
        """Remove the overflow file after it has been replayed."""
        os.remove(self.overflow_filename)

    def queue_to_overflow(self):
        """Write queued messages to the overflow file.

        Returns the number of messages left in the queue (None when the
        queue was empty on entry).
        """
        elements_written = 0
        if len(self.msg_queue) == 0:
            return
        # NOTE(review): reconstructed literally from the scrape -- a
        # persistent IOError would loop here, and the del below operates
        # on already-popped entries; confirm against history.
        while self.msg_queue:
            msg = self.msg_queue.pop(0)
            try:
                self.overflow_file.write(json.dumps(msg, cls=LogMessageEncoder)+'\n')
                elements_written += 1
            except IOError:
                logger.error('Could only partially empty queue, %d messages written' %
                        elements_written)
                self.msg_queue.insert(0, msg)
        if elements_written > 0:
            del self.msg_queue[0:elements_written-1] #empty the queue of what we have written
        return len(self.msg_queue)
#for storing the message queue to avoid memory problems:
def encodeLogMsg(logMsg):
    """Serialize a log message to a JSON string for overflow storage.

    (Diff residue had duplicated the return line; deduplicated here.)
    """
    return json.dumps(logMsg)
def decodeLogMsg(msgStr):
    """Deserialize a JSON string back into a log message structure.

    (Diff residue had duplicated the return line; deduplicated here.)
    """
    return json.loads(msgStr)
#Class for handling database output
class DatabaseWriter(object):
def __init__(self, dbName, username, password, schema):
self.db = db2util.db()
try:
self.db.connect(dbName, username, password)
except:
logger.error("Failed to open a connection to database %s as user %s" %(dbName, username))
raise
self.schema = schema
table_names = ['RESERVATION_DATA', 'RESERVATION_PARTS',
'RESERVATION_EVENTS', 'RESERVATION_USERS',
'RESERVATION_PROG', 'JOB_DATA', 'JOB_ATTR',
'JOB_DEPS', 'JOB_EVENTS','JOB_COBALT_STATES', 'JOB_PROG',
'JOB_RUN_USERS']
no_pk_tables = ['RESERVATION_PARTS', 'RESERVATION_USERS',
'JOB_ATTR', 'JOB_RUN_USERS']
#Handle tables, There is probably a better way to do this.
self.daos = {}
try:
for table_name in table_names:
logger.info("Accessing table: %s" % table_name)
if table_name in ['RESERVATION_EVENTS', 'JOB_EVENTS',
'JOB_COBALT_STATES']:
self.daos[table_name] = StateTableData(self.db, schema,
table_name)
elif table_name == 'RESERVATION_DATA':
self.daos[table_name] = ResDataData(self.db, schema,
table_name)
elif table_name == 'JOB_DATA':
self.daos[table_name] = JobDataData(self.db, schema,
table_name)
elif table_name == 'JOB_DEPS':
self.daos[table_name] = JobDepsData(self.db, schema,
table_name)
elif table_name == 'JOB_PROG':
self.daos[table_name] = JobProgData(self.db, schema,
table_name)
elif table_name in no_pk_tables:
self.daos[table_name] = no_pk_dao(self.db, schema,
table_name)
else:
self.daos[table_name] = db2util.dao(self.db, schema,
table_name)
except:
logger.error("Error accessing table %s!" % table_name)
self.db.close()
raise
#we opened with a schema, let's make that the default for now.
self.db.prepExec("set current schema %s" % schema)
def addMessage(self, logMsg):
logger.debug("Inserting Data message of type: %s.%s " % (logMsg.item_type, logMsg.state))
#print logMsg
if logMsg.item_type == 'reservation':
if logMsg.state == 'creating':
self.__addResMsg(logMsg)
else:
self.__modifyResMsg(logMsg)
#elif logMsg.item_type == 'partition':
# print "Not yet implemented."
elif logMsg.item_type == 'job_prog':
self.__addJobProgMsg(logMsg, logMsg.item)
elif logMsg.item_type == 'job_data':
self.__addJobDataMsg(logMsg)
#else something has gone screw-ball.
else:
raise RuntimeError("Support for %s type of message not implemented." % logMsg.item_type)
return
def __addResMsg(self, logMsg):
"""Unpack a Reservation Message when a Reservation is created."""
res_data_record = self.daos['RESERVATION_DATA'].table.getRecord({
'CYCLE': int(logMsg.item.cycle),
'CYCLEID': logMsg.item.cycle_id,
'DURATION': logMsg.item.duration,
'NAME':logMsg.item.name,
'QUEUE': logMsg.item.queue,
'RESID': logMsg.item.res_id,
'START': logMsg.item.start,
'PROJECT': logMsg.item.project,
'BLOCK_PASSTHROUGH': logMsg.item.block_passthrough
})
res_data_id = 1
res_data_id = self.daos['RESERVATION_DATA'].insert(res_data_record)
part_list = logMsg.item.partitions.split(':')
if part_list[0] != '':
for partition in part_list:
res_partitions_record = self.daos['RESERVATION_PARTS'].table.getRecord({
'RES_DATA_ID': res_data_id,
'NAME': partition
})
self.daos['RESERVATION_PARTS'].insert(res_partitions_record)