00001
00002
00003 import datetime
00004 import getopt
00005 import os
00006 import re
00007 import sys
00008
00009
00010
00011
00012
00013
00014
00015
00016
def usage():
    """Print the command-line help for the database updater to standard output.

    The text documents the SK_TSQL_* environment variables, the options
    (including the short forms -c, -d and -h that ParseCommand accepts),
    the available commands and the update-file format.
    """
    # Written as a parenthesised single-argument print so that the statement
    # behaves identically under Python 2 and Python 3.
    print("""Database Updater Utility

Note: Before invoking this utility the following database environmental variables must be set:-

  SK_TSQL_URL   This is a semi-colon separated list of URLs. Each URL takes the form:-
                    protocol://host[:port]/[database]
                where:
                    protocol - mysql
                    host     - host name or IP address of database server
                    port     - port number
                    database - name of database
                Example: mysql://myhost:3306/test

  SK_TSQL_USER  The account user name. If different names for different databases in the cascade
                then this can be a semi-colon separated list in the same order as SK_TSQL_URL.
                If the list is shorter than that list, then the first entry is used for the missing entries.

  SK_TSQL_PSWD  The account password here. As with SK_TSQL_USER it can be a semi-colon separated list
                with the first entry providing the default if the list is shorter than SK_TSQL_URL.

  Although the oaOfflineDatabase supports multi-database environmental variables this utility ALWAYS
  uses the first entry.

Invocation: database_updater.py [<options>] <command> [<arg>]

  <options> are:-

    -c, --convert_unsigned
                  Convert ANY integer data > 2147483647 to signed before
                  storing by subtracting 4294967296. Useful for unsigned channel IDs.
                  CAUTION: Don't use if table contains BIGINTs.
    -d, --debug   Print out all MySQL commands
    -h, --help    Prints this help.

  <command> [<arg>] is one of:-

    apply_global_update <update-file>
        Applies update using global sequence numbers (SEQNOs)
        - fails if not authorising DB i.e. does not have a GLOBALSEQNO table

    apply_local_update <update-file>
        Applies update using local sequence numbers (SEQNOs)

    drop_table <table-name>
        Removes table (main and VLD) and any entry in GLOBALSEQNO and LOCALSEQNO tables
        - asks confirmation if DB is authorising i.e. has a GLOBALSEQNO table

  <update-file> can contain any number of the following:-

    o Blank lines and comments (# in column 1)

    o SQL <sql-command>;
      Any arbitrary SQL command that ends with a ';'. Can span several lines.
      e.g. SQL drop table
               if exists DEMO_DB_TABLE;

    o BEGIN_TABLE <table-name> <start-date> <end-date> <aggregate-number> <creation-date> {<task>} {<key>=<value> ...}
      If not supplied a value of 0 is assumed for <task>.
      Valid <key>=<value> sets are:-
          SIMMASK=<value> where <value> is one of: 'Data', 'MC' or 'all'
          EPOCH=<value> where <value> is a small integer in range 0..100
            Note: If creating the VLD table, this forces it to have EPOCH and REALITY columns
                  Although it is not required, this should be used for all new entries.
      If using a <key>=<value> set the task value must also be supplied.
      Followed by one or more rows of data, one per line in the form <value>,<value>, ... <value>
      This utility supplies both SEQNO and ROW_COUNTER to the start of each row
      e.g. BEGIN_TABLE DEMO_DB_TABLE '2009-01-01 00:00:00' '2009-02-00 00:00:00' 0 '2009-04-07 18:00:00' 0 EPOCH=0
           -2006450160, 101, 201, 301, 1.01, 2.01, 3.01
           -2006450170, 102, 202, 302, 1.02, 2.02, 3.02

""")
00089
00090
00091
00092
class DatabaseInterface:
    """A simple interface to a MySQL database, driven through the ``mysql`` client.

    Makes no assumption about the tables that the database holds.

    Connection details are read from the SK_TSQL_URL, SK_TSQL_USER and
    SK_TSQL_PSWD environment variables; when any of these is a
    semi-colon separated list only the first entry is used.
    """

    def __init__(self, parent, testConnection=1):
        """Read the connection settings and optionally probe the connection.

        parent          -- owning object; only its ``debug`` attribute is read.
        testConnection  -- when true, run a trivial query and report whether
                           the connection works.
        """
        self.parent = parent
        self.debug = parent.debug
        self.access_string = ""
        self.SetAccessString()
        # Same as access_string but safe to show to the user.
        self.public_access_string = self._MaskPassword(self.access_string)
        # Retained so existing readers of this attribute keep working; Query()
        # now captures the client output through a pipe and no longer writes
        # to this file.
        self.results_file = '/tmp/mysql_results_%d.tmp' % os.getpid()
        self.results = []

        if testConnection:
            if self.IsOK():
                print('DatabaseInterface initialising with account info: ' + self.public_access_string)
            else:
                print('MySQL DBI connection failed with account info: ' + self.public_access_string)

    def IsOK(self):
        """Return True if able to execute queries."""
        if not self.access_string:
            return False
        return self.Query('select 1', False)

    def GetResults(self):
        """Return the output lines (stripped) produced by the last query."""
        return self.results

    def Query(self, sql, log_error=True):
        """Apply query and return True if successful.

        The combined stdout/stderr of the ``mysql`` client is stored, one
        stripped line per entry, and can be retrieved with GetResults().
        Unless log_error is False, a failure and its output are printed.
        """
        # Imported here so this class does not depend on edits to the
        # file-level import block.
        import subprocess

        self.results = []
        if self.debug:
            shown = 'mysql %s --execute="%s"' % (self.access_string, sql)
            # Never echo the clear-text password, even in debug mode.
            print('About to execute: ' + self._MaskPassword(shown))

        try:
            # An argument list (no shell) means quotes, '$' and backticks in
            # the SQL or the credentials are passed to mysql literally.
            client = subprocess.Popen(self._CommandArgs(sql),
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.STDOUT)
            output = client.communicate()[0]
            exit_code = client.returncode
        except (OSError, ValueError) as err:
            # OSError: the mysql client could not be started.
            # ValueError: the access string could not be split into arguments.
            output = str(err)
            exit_code = 127

        if not isinstance(output, str):
            # Python 3 returns bytes from the pipe.
            output = output.decode('utf-8', 'replace')

        log_results = bool(exit_code) and log_error
        if log_results:
            print('SQL query failed with error code %d' % exit_code)
        for line in output.splitlines():
            line = line.strip()
            if log_results:
                print(' SQL log:' + line)
            self.results.append(line)
        return exit_code == 0

    def SetAccessString(self):
        """Prepare self.access_string from the SK_TSQL_* environment.

        On success access_string holds the database name followed by the
        host, optional port, user and optional password options for the
        ``mysql`` client.  If a variable is missing or SK_TSQL_URL cannot be
        parsed a message is printed and access_string is left empty.
        """
        self.access_string = ""
        env_url = os.environ.get('SK_TSQL_URL')
        env_user = os.environ.get('SK_TSQL_USER')
        env_pswd = os.environ.get('SK_TSQL_PSWD')
        if not (env_url and env_user and env_pswd):
            print('Cannot connect to database. One or more of the environmental variables:-')
            print(' SK_TSQL_URL, SK_TSQL_USER or SK_TSQL_PSWD')
            print('is not defined.')
            return

        # The first "//host[:port]/database" found is the first URL in the list.
        mo = re.search(r'//(.*?)/(\w+)', env_url)
        if not mo:
            print('Cannot parse the environmental variables SK_TSQL_URL')
            return
        server, db = mo.groups()
        port_opt = ''
        mo = re.search(r'^(.*):(\d+)$', server)
        if mo:
            server = mo.group(1)
            port_opt = '--port=' + mo.group(2)

        # Only the first entry of a semi-colon separated list is used.
        user = env_user.split(';')[0]
        pswd = env_pswd.split(';')[0]
        # The two-character value backslash-zero selects "no password option".
        pswd_opt = '' if pswd == '\\0' else '--password=' + pswd
        self.access_string = '%s --host %s %s --user=%s %s' % (db, server, port_opt, user, pswd_opt)

    def TableExists(self, table_name):
        """Return True if table exists (i.e. ``describe <table_name>`` succeeds)."""
        return self.Query('describe ' + table_name, False)

    def _MaskPassword(self, text):
        """Return text with the value of any --password= option replaced by XXX."""
        return re.sub(r'--password=\S+', '--password=XXX', text)

    def _CommandArgs(self, sql):
        """Return the argument list that runs sql through the mysql client."""
        import shlex

        return ['mysql'] + shlex.split(self.access_string) + ['--execute=' + sql]
00191
00192
00193
00194
00195
class TableUpdate:
    """A table update for a single aggregate.

    An instance is created from a BEGIN_TABLE line of an update file, is
    given its data rows through AddRow() and is written to the database by
    Apply().  Parsing problems set ``self.failed``; they do not raise.
    """

    def __init__(self, parent, begin_line, is_global):
        """Parse a BEGIN_TABLE line.

        parent      -- owning updater; its ``debug``, ``dbi`` and
                       ``convert_unsigned`` attributes are used.
        begin_line  -- the BEGIN_TABLE line as read from the update file.
        is_global   -- True to request a global sequence number in Apply().
        """
        self.parent = parent
        self.debug = parent.debug
        self.dbi = parent.dbi
        self.is_global = is_global
        self.failed = False
        self.applied = False
        self.num_conv_unsign = 0
        self.rows = []
        self.table_name = ""
        self.start_date = ""
        self.end_date = ""
        self.detectormask = "1"
        self.simmask = "1"
        self.epoch = ""
        self.aggregate = ""
        self.task = "0"
        self.creation_date = ""
        self.seqno = ""

        # The caller recognises BEGIN_TABLE after leading whitespace, so strip
        # before applying the anchored pattern.
        mo = re.search(r"^BEGIN_TABLE\s+(\w+)\s+'(.*?)'\s+'(.*?)'\s+(\d+)\s+'(.*?)'(|\s+(.*))$",
                       begin_line.strip())
        if not mo:
            self._Fail("cannot parse line: " + begin_line)
            return
        (self.table_name, self.start_date, self.end_date, self.aggregate,
         self.creation_date, dummy, options) = mo.groups()

        if options and not self._ParseOptions(options):
            return

        if not self.parent.dbi.TableExists(self.table_name):
            self._Fail("table %s does not exist." % self.table_name)
            return

        # Dates are interpolated into SQL, so the whole value must match.
        for date in (self.start_date, self.end_date, self.creation_date):
            if not re.match(r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d$', date):
                self._Fail("bad date '%s' in %s." % (date, begin_line))
                return

    def _Fail(self, message):
        """Report why the update is being rejected and mark it as failed."""
        print("Failing update; " + message)
        self.failed = True

    def _ParseOptions(self, options):
        """Parse the optional task number and key=value pairs; return False on error."""
        # Keys and values are matched case-insensitively.
        options = options.lower()

        # An optional leading integer is the task number.
        mo = re.match(r"\s*(\d+)(|\s+(.*))$", options)
        if mo:
            (self.task, dummy, options) = mo.groups()

        while options:
            mo = re.search(r"^(\S+)=(\S+)(|\s+(.*))$", options)
            if not mo:
                self._Fail("cannot parse options: '%s'" % options)
                return False
            (key, value, dummy, options) = mo.groups()

            if key == "task":
                try:
                    # Kept as a string, like the task parsed from the line.
                    self.task = str(int(value))
                except ValueError:
                    self._Fail("bad TASK option value: '%s'" % value)
                    return False

            elif key == "simmask":
                masks = {"data": "1", "mc": "4", "all": "-1"}
                if value not in masks:
                    self._Fail("bad SIMMASK option value: '%s'" % value)
                    return False
                self.simmask = masks[value]

            elif key == "epoch":
                try:
                    epoch_number = int(value)
                except ValueError:
                    epoch_number = -1
                if not 0 <= epoch_number <= 100:
                    self._Fail("bad EPOCH option value: '%s'" % value)
                    return False
                self.epoch = value

            else:
                self._Fail("bad option key: '%s'" % key)
                return False
        return True

    def AddRow(self, row_line):
        """Add a row.

        When the parent's convert_unsigned flag is set, every comma separated
        field that is an integer above 2147483647 has 4294967296 subtracted;
        all other fields are stored unchanged.
        """
        if not self.parent.convert_unsigned:
            self.rows.append(row_line.strip())
            return

        converted = []
        for value in row_line.strip().split(','):
            try:
                int_value = int(value)
            except ValueError:
                # Not an integer field: keep it exactly as supplied.
                converted.append(value)
                continue
            if int_value > 2147483647:
                int_value -= 4294967296
                self.num_conv_unsign += 1
                value = str(int_value)
            converted.append(value)
        self.rows.append(",".join(converted))

    def Apply(self):
        """Apply update and return True if successful.

        The table must already exist but its VLD will be created if
        necessary.
        If it fails it attempts to remove any data already committed.
        """
        if not self.CanApply():
            return False

        if not self.dbi.TableExists(self.table_name + "VLD"):
            if not self._CreateVldTable():
                return False

        print(" Applying update to table %s for validity range '%s' - '%s' aggregate %s task %s..." %
              (self.table_name, self.start_date, self.end_date, self.aggregate, self.task))
        if self.num_conv_unsign:
            print(" (%d unsigned integer converted before applying update)" % self.num_conv_unsign)

        if not self._AllocateSeqno():
            return False

        # Clear out anything already stored under this sequence number.
        self.RemoveSeqno()

        if not (self._InsertRows() and self._InsertValidity()):
            print("Update has failed, attempting to remove any data.")
            self.RemoveSeqno()
            return False

        self.applied = True
        return True

    def _CreateVldTable(self):
        """Create the <table>VLD validity table; return True if the query succeeded."""
        print(" Creating table %sVLD" % self.table_name)
        sql = "CREATE TABLE %sVLD (" % self.table_name
        sql += "SEQNO integer not null primary key,"
        sql += "TIMESTART datetime not null,"
        sql += "TIMEEND datetime not null,"
        # EPOCH and REALITY columns are only created when an EPOCH was given.
        if self.epoch != "":
            sql += "EPOCH tinyint(4),"
            sql += "REALITY tinyint(4),"
        sql += "DETECTORMASK tinyint(4),"
        sql += "SIMMASK tinyint(4),"
        sql += "TASK integer,"
        sql += "AGGREGATENO integer,"
        sql += "CREATIONDATE datetime not null,"
        sql += "INSERTDATE datetime not null,"
        sql += "key TIMESTART (TIMESTART), "
        sql += "key TIMEEND (TIMEEND));"
        return self.dbi.Query(sql)

    def _AllocateSeqno(self):
        """Obtain self.seqno from allocate_seq_no.exe; return False (and fail) if none."""
        sys.stdout.write(" requesting sequence number ... ")
        sys.stdout.flush()
        require_global = 1 if self.is_global else -1
        cmd = 'allocate_seq_no.exe %s %d | tail -1' % (self.table_name, require_global)
        if self.debug:
            print("About to execute " + cmd)
        inp = os.popen(cmd, "r")
        try:
            line = inp.readline()
        finally:
            inp.close()
        # The first run of digits on the last output line is the number.
        mo = re.search(r'(\d+)', line)
        if mo:
            self.seqno = int(mo.group(1))
        if not self.seqno:
            print("failed to allocate a sequence number!")
            self.failed = True
            return False
        print("allocated sequence number %d" % self.seqno)
        return True

    def _InsertRows(self):
        """Insert the data rows in batches; return False as soon as a query fails."""
        NUM_ROWS_PER_QUERY = 100
        for start in range(0, len(self.rows), NUM_ROWS_PER_QUERY):
            batch = self.rows[start:start + NUM_ROWS_PER_QUERY]
            # SEQNO and a 1-based ROW_COUNTER are prepended to every row.
            values = ",".join("(%d, %d, %s)" % (self.seqno, start + offset + 1, row)
                              for offset, row in enumerate(batch))
            if not self.dbi.Query("INSERT INTO %s VALUES " % self.table_name + values):
                return False
        return True

    def _InsertValidity(self):
        """Insert the validity (VLD) record; return True if the query succeeded."""
        sql = "INSERT INTO %sVLD VALUES (" % self.table_name
        sql += "%d, '%s', '%s'," % (self.seqno, self.start_date, self.end_date)
        if self.epoch != "":
            # EPOCH followed by a REALITY value of 0.
            sql += "%s, 0," % self.epoch
        sql += "'%s', '%s', %s, %s, '%s', '%s')" % (
            self.detectormask, self.simmask, self.task, self.aggregate, self.creation_date,
            datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S"))
        return self.dbi.Query(sql)

    def CanApply(self):
        """Return True if no errors have been found processing the data,
        it has not already applied and it has some rows."""
        return bool(not self.failed and not self.applied and self.rows)

    def RemoveSeqno(self):
        """Attempt to remove sequence number from both the main and VLD tables."""
        self.dbi.Query("DELETE FROM %s WHERE SEQNO = %d" % (self.table_name, self.seqno))
        self.dbi.Query("DELETE FROM %sVLD WHERE SEQNO = %d" % (self.table_name, self.seqno))
00412
00413
00414
00415
00416
class DatabaseUpdater:
    """A database updater that applies ASCII update files.

    The constructor also invokes command execution.
    """

    def __init__(self):
        """Parse the command line, connect to the database and run the command."""
        self.convert_unsigned = False
        self.debug = 0
        self.command = ''
        self.args = ''
        self.ParseCommand()

        self.dbi = DatabaseInterface(self)
        if not self.dbi.IsOK():
            print("\nCannot connect to database, please check values of SK_TSQL_URL/USER/PSWD \nAborting.")
            usage()
            return

        if self.command == "apply_global_update":
            self.ExecuteApplyUpdate(True)
        elif self.command == "apply_local_update":
            self.ExecuteApplyUpdate(False)
        elif self.command == "drop_table":
            self.ExecuteDropTable()
        else:
            print("\nUnknown command %s.\n" % self.command)
            usage()
            return

    def ExecuteApplyUpdate(self, is_global):
        """Apply updates either local or global sequence numbers.

        The update file named by the first command argument is read line by
        line; processing stops at the first SQL command or table update that
        fails.
        """
        update_file = self.args[0]

        if not os.path.isfile(update_file):
            print("\n Cannot '%s'; cannot find update file '%s'" % (self.command, update_file))
            return

        if is_global and not self.IsAuthorising():
            print("\n Cannot 'apply_global_update'; connected database is not authorising i.e. does not have a GLOBALSEQNO table")
            return

        abort_message = "\n Aborting '%s'; update file '%s' contains a bad update." % (self.command, update_file)
        tu = None
        # The context manager closes the file on every exit path.
        with open(update_file) as file_update:
            for line in file_update:
                # Skip blank lines and comments.
                if re.search(r'^\s*$', line) or re.search(r'^\s*#', line):
                    continue

                mo = re.search(r'^\s*SQL\s+(.*)', line)
                if mo:
                    # ExecuteSQL may consume further lines from the file.
                    # NOTE(review): SQL is executed immediately, i.e. before a
                    # table update that is still collecting rows is applied.
                    if not self.ExecuteSQL(mo.group(1), file_update):
                        return
                    continue

                if tu is None or re.search(r'^\s*BEGIN_TABLE', line):
                    # Starting a new table update: apply the previous one first.
                    if tu is not None and not tu.Apply():
                        print(abort_message)
                        return
                    tu = TableUpdate(self, line, is_global)
                else:
                    tu.AddRow(line)

        if tu is None:
            print("\n Cannot '%s'; update file '%s' is empty." % (self.command, update_file))
            return
        if not tu.Apply():
            print(abort_message)
            return

    def ExecuteDropTable(self):
        """Removes table (main and VLD) and any entry in GLOBALSEQNO and LOCALSEQNO tables.

        Confirmation is requested first, and a second time when connected to
        an authorising database.
        """
        table_name = self.args[0]
        # Check the table that was actually named on the command line.
        if not self.dbi.TableExists(table_name):
            print('Database does not have table %s' % table_name)
            return

        # NOTE(review): assumes nothing is dropped unless confirmed; the
        # original nesting of the statements below could not be recovered.
        if not self._Confirm('\nAre you sure that you want to remove tables %s and %sVLD Ans:[y or n]'
                             % (table_name, table_name)):
            print("\nRequest not confirmed, table not deleted.")
            return
        if self.IsAuthorising():
            if not self._Confirm('\nAre you sure REALLY sure - you are connected to an authorising (Master) database Ans:[y or n]'):
                print("\nRequest not confirmed, table not deleted.")
                return

        for table in (table_name, table_name + 'VLD'):
            if self.dbi.TableExists(table):
                print("Dropping table " + table)
                if not self.dbi.Query('DROP TABLE %s' % table):
                    print("\nFailed to drop table %s." % table)
                    return

        for seqno_table in ('LOCALSEQNO', 'GLOBALSEQNO'):
            if self.dbi.TableExists(seqno_table):
                print("Removing entry from %s" % seqno_table)
                self.dbi.Query("DELETE FROM %s WHERE TABLENAME = '%s'" % (seqno_table, table_name))

    def _Confirm(self, prompt):
        """Show prompt and return True only if the answer starts with 'y' or 'Y'."""
        try:
            read_line = raw_input      # Python 2
        except NameError:
            read_line = input          # Python 3
        sys.stdout.write(prompt + ' ')
        sys.stdout.flush()
        try:
            answer = read_line()
        except EOFError:
            # No answer available: treat as "not confirmed".
            return False
        # Slicing avoids an IndexError when the answer is empty.
        return answer[:1] in ('y', 'Y')

    def ExecuteSQL(self, sql_start, file_update):
        """Assemble and execute SQL command. Return True if no errors when executed.

        sql_start    -- the text following the SQL keyword on its line.
        file_update  -- iterator over the remaining lines; lines are consumed
                        up to and including the one ending the command with ';'.
        """
        parts = [sql_start.strip()]
        if not parts[0].endswith(';'):
            for line in file_update:
                text = line.strip()
                if text:
                    parts.append(text)
                if text.endswith(';'):
                    break
        # Join with a space so words on adjacent lines do not run together.
        return self.dbi.Query(" ".join(parts))

    def IsAuthorising(self):
        """Return True if database is authorising i.e. has a GLOBALSEQNO table."""
        return self.dbi.TableExists('GLOBALSEQNO')

    def ParseCommand(self):
        """Parse sys.argv into the option flags, self.command and self.args.

        Prints the usage text and exits the process on --help (status 0) or
        on a usage error (status 2).
        """
        try:
            opts, args = getopt.getopt(sys.argv[1:], "cdh", ["convert_unsigned", "debug", "help"])
        except getopt.GetoptError as err:
            print(str(err))
            usage()
            sys.exit(2)

        for o, a in opts:
            if o in ("-c", "--convert_unsigned"):
                self.convert_unsigned = True
            elif o in ("-d", "--debug"):
                self.debug = 1
            elif o in ("-h", "--help"):
                usage()
                sys.exit()
            else:
                # getopt only returns declared options; raised explicitly so
                # the check is not stripped under -O.
                raise AssertionError("unhandled option: " + o)

        if not args:
            print("\nNo command supplied.\n")
            usage()
            sys.exit(2)
        self.command = args.pop(0)
        if not args:
            print("\nNo command args supplied.\n")
            usage()
            sys.exit(2)
        self.args = args
00581
00582
00583
def main():
    """Create the updater; its constructor parses sys.argv and runs the command."""
    return DatabaseUpdater()


if __name__ == "__main__":
    dbu = main()
00586