MIDAS
Loading...
Searching...
No Matches
history_schema.cxx
Go to the documentation of this file.
1/********************************************************************\
2
3 Name: history_schema.cxx
4 Created by: Konstantin Olchanski
5
6 Contents: Schema based MIDAS history. Available drivers:
7 FileHistory: storage of data in binary files (replacement for the traditional MIDAS history)
8 MysqlHistory: storage of data in MySQL database (replacement for the ODBC based SQL history)
9 PgsqlHistory: storage of data in PostgreSQL database
10 SqliteHistory: storage of data in SQLITE3 database (not suitable for production use)
11
12\********************************************************************/
13
14#undef NDEBUG // midas required assert() to be always enabled
15
16#include "midas.h"
17#include "msystem.h"
18#include "mstrlcpy.h"
19
20#include <math.h>
21
22#include <vector>
23#include <list>
24#include <string>
25#include <map>
26#include <algorithm>
27
28// make mysql/my_global.h happy - it redefines closesocket()
29#undef closesocket
30
31//
32// benchmarks
33//
34// /usr/bin/time ./linux/bin/mh2sql . /ladd/iris_data2/alpha/alphacpc09-elog-history/history/121019.hst
35// -rw-r--r-- 1 alpha users 161028048 Oct 19 2012 /ladd/iris_data2/alpha/alphacpc09-elog-history/history/121019.hst
36// flush 10000, sync=OFF -> 51.51user 1.51system 0:53.76elapsed 98%CPU
37// flush 1000000, sync=NORMAL -> 51.83user 2.09system 1:08.37elapsed 78%CPU (flush never activated)
38// flush 100000, sync=NORMAL -> 51.38user 1.94system 1:06.94elapsed 79%CPU
39// flush 10000, sync=NORMAL -> 51.37user 2.03system 1:31.63elapsed 58%CPU
40// flush 1000, sync=NORMAL -> 52.16user 2.70system 4:38.58elapsed 19%CPU
41
43// MIDAS includes //
45
46#include "midas.h"
47#include "history.h"
48
50// helper stuff //
52
// Release a pointer with free() when it is non-NULL, then reset it to NULL.
// The argument is expanded more than once, so pass a plain lvalue without side effects.
53#define FREE(x) { if (x) free(x); (x) = NULL; }
54
// Return a pointer to the first character of `s` that isspace() does not
// classify as whitespace, or to the terminating NUL when there is none.
// `s` must be a valid NUL-terminated string; it is not modified.
static char* skip_spaces(char* s)
{
   // <ctype.h> classifiers require a value representable as unsigned char (or EOF);
   // a plain char can be negative for bytes >= 0x80, so convert before the call.
   while (*s && isspace((unsigned char)*s))
      s++;
   return s;
}
64
// Format a time_t as a decimal integer string: "0" for zero, a leading "-"
// for negative values.
//
// Uses the standard library conversion instead of a hand-written digit loop;
// this also avoids negating the most negative value, which overflows.
static std::string TimeToString(time_t t)
{
   return std::to_string((long long)t);
}
94
// Format a non-negative int as a decimal string.
// Zero yields "0"; a negative argument trips the assert().
static std::string SmallIntToString(int i)
{
   if (i == 0)
      return "0";

   assert(i > 0);

   // peel off the least significant digit and put it in front of the digits collected so far
   std::string digits;
   for (int rest = i; rest != 0; rest /= 10) {
      const char digit = '0' + rest%10;
      digits.insert(digits.begin(), digit);
   }

   return digits;
}
115
// Decide whether the stored event name `event_name` matches the requested
// `var_event_name`.
//
// - A case-insensitive exact match (strcasecmp) always succeeds.
// - Otherwise, if `newStyleEventName` is true, there is no match.
// - Otherwise `var_event_name` matches when it is a case-insensitive prefix of
//   `event_name` and the next character of `event_name` is '/' or '_'.
//
// NOTE(review): `newStyleEventName` is not declared in the lines visible here
// (the embedded listing numbers skip 120) - confirm against the full source.
116static bool MatchEventName(const char* event_name, const char* var_event_name)
117{
118 // new-style event name: "equipment_name/variable_name:tag_name"
119 // old-style event name: "equipment_name:tag_name" ("variable_name" is missing)
121
122 //printf("looking for event_name [%s], try table [%s] event name [%s], new style [%d]\n", var_event_name, table_name, event_name, newStyleEventName);
123
124 if (strcasecmp(event_name, var_event_name) == 0) {
125 return true;
126 } else if (newStyleEventName) {
127 return false;
128 } else { // for old style names, need more parsing
129 bool match = false;
130
131 const char* s = event_name;
// walk both names in step; `match` stays false if event_name ends first
132 for (int j=0; s[j]; j++) {
133
// requested name ended and the stored name continues with '/': prefix match
134 if ((var_event_name[j]==0) && (s[j]=='/')) {
135 match = true;
136 break;
137 }
138
// requested name ended and the stored name continues with '_': prefix match
139 if ((var_event_name[j]==0) && (s[j]=='_')) {
140 match = true;
141 break;
142 }
143
// requested name ended but the stored name continues with some other character
144 if (var_event_name[j]==0) {
145 match = false;
146 break;
147 }
148
149 if (tolower(var_event_name[j]) != tolower(s[j])) { // does not work for UTF-8 Unicode
150 match = false;
151 break;
152 }
153 }
154
155 return match;
156 }
157}
158
// Decide whether the stored tag `tag_name` (an array of `n_data` elements)
// matches the requested `var_tag_name` / `var_tag_index`.
//
// Returns true when the names are equal ignoring case and the index is within
// [0, n_data), or when `tag_name` equals `alt_tag_name` ignoring case.
//
// NOTE(review): no visible line writes `alt_tag_name` before it is compared
// (the embedded listing numbers skip 162) - confirm against the full source.
159static bool MatchTagName(const char* tag_name, int n_data, const char* var_tag_name, const int var_tag_index)
160{
161 char alt_tag_name[1024]; // maybe this is an array without "Names"?
163
164 //printf(" looking for tag [%s] alt [%s], try column name [%s]\n", var_tag_name, alt_tag_name, tag_name);
165
// name matches: accept only if the requested index is inside the array
166 if (strcasecmp(tag_name, var_tag_name) == 0)
167 if (var_tag_index >= 0 && var_tag_index < n_data)
168 return true;
169
170 if (strcasecmp(tag_name, alt_tag_name) == 0)
171 return true;
172
173 return false;
174}
175
176static void PrintTags(int ntags, const TAG tags[])
177{
178 for (int i=0; i<ntags; i++)
179 printf("tag %d: %s %s[%d]\n", i, rpc_tid_name(tags[i].type), tags[i].name, tags[i].n_data);
180}
181
// Convert a MIDAS event name to something acceptable as an SQL identifier -
// table name, column name, etc.
//
// Letters and digits are kept (letters lower-cased); every other byte is
// replaced by '_'. Classification is per byte, so it does not work for
// UTF-8 Unicode.
static std::string MidasNameToSqlName(const char* s)
{
   std::string out;

   for (const char* p = s; *p != 0; p++) {
      // <ctype.h> functions need a value representable as unsigned char;
      // a plain char can be negative for bytes >= 0x80
      const unsigned char c = (unsigned char)*p;
      if (isalnum(c))
         out += (char)tolower(c);
      else
         out += '_';
   }

   return out;
}
198
// Convert a MIDAS event name to something acceptable as a file name.
//
// Letters and digits are kept (letters lower-cased); every other byte is
// replaced by '_'. Classification is per byte, so it does not work for
// UTF-8 Unicode.
static std::string MidasNameToFileName(const char* s)
{
   std::string out;

   for (const char* p = s; *p != 0; p++) {
      // <ctype.h> functions need a value representable as unsigned char;
      // a plain char can be negative for bytes >= 0x80
      const unsigned char c = (unsigned char)*p;
      if (isalnum(c))
         out += (char)tolower(c);
      else
         out += '_';
   }

   return out;
}
215
// Compare two event names ignoring case.
// Returns the strcasecmp() result: zero when equal, otherwise the sign gives the ordering.
static int event_name_cmp(const std::string& e1, const char* e2)
{
   const char* lhs = e1.c_str();
   return strcasecmp(lhs, e2);
}
222
// Compare two variable names ignoring case.
// Returns the strcasecmp() result: zero when equal, otherwise the sign gives the ordering.
static int var_name_cmp(const std::string& v1, const char* v2)
{
   const char* lhs = v1.c_str();
   return strcasecmp(lhs, v2);
}
229
231// SQL data types //
233
234#ifdef HAVE_SQLITE
// SQLITE column type name for each MIDAS data type; per the entry comments the
// array position corresponds to the TID_xxx value. Entries starting with
// "xxxINVALIDxxx" are placeholders, not valid SQL type names.
235static const char *sql_type_sqlite[TID_LAST] = {
236 "xxxINVALIDxxxNULL", // TID_NULL
237 "INTEGER", // TID_UINT8
238 "INTEGER", // TID_INT8
239 "TEXT", // TID_CHAR
240 "INTEGER", // TID_UINT16
241 "INTEGER", // TID_INT16
242 "INTEGER", // TID_UINT32
243 "INTEGER", // TID_INT32
244 "INTEGER", // TID_BOOL
245 "REAL", // TID_FLOAT
246 "REAL", // TID_DOUBLE
247 "INTEGER", // TID_BITFIELD
248 "TEXT", // TID_STRING
249 "xxxINVALIDxxxARRAY",
250 "xxxINVALIDxxxSTRUCT",
251 "xxxINVALIDxxxKEY",
252 "xxxINVALIDxxxLINK"
253};
254#endif
255
256#ifdef HAVE_PGSQL
// PostgreSQL column type name for each MIDAS data type; per the entry comments
// the array position corresponds to the TID_xxx value. Entries starting with
// "xxxINVALIDxxx" are placeholders, not valid SQL type names.
257static const char *sql_type_pgsql[TID_LAST] = {
258 "xxxINVALIDxxxNULL", // TID_NULL
259 "smallint", // TID_BYTE
260 "smallint", // TID_SBYTE
261 "char(1)", // TID_CHAR
262 "integer", // TID_WORD
263 "smallint", // TID_SHORT
264 "bigint", // TID_DWORD
265 "integer", // TID_INT
266 "smallint", // TID_BOOL
267 "real", // TID_FLOAT
268 "double precision", // TID_DOUBLE
269 "bigint", // TID_BITFIELD
270 "text", // TID_STRING
271 "xxxINVALIDxxxARRAY",
272 "xxxINVALIDxxxSTRUCT",
273 "xxxINVALIDxxxKEY",
274 "xxxINVALIDxxxLINK"
275};
276#endif
277
278#ifdef HAVE_MYSQL
// MySQL column type name for each MIDAS data type; per the entry comments the
// array position corresponds to the TID_xxx value. Entries starting with
// "xxxINVALIDxxx" are placeholders, not valid SQL type names.
279static const char *sql_type_mysql[TID_LAST] = {
280 "xxxINVALIDxxxNULL", // TID_NULL
281 "tinyint unsigned", // TID_BYTE
282 "tinyint", // TID_SBYTE
283 "char", // TID_CHAR
284 "smallint unsigned", // TID_WORD
285 "smallint", // TID_SHORT
286 "integer unsigned", // TID_DWORD
287 "integer", // TID_INT
288 "tinyint", // TID_BOOL
289 "float", // TID_FLOAT
290 "double", // TID_DOUBLE
291 "integer unsigned", // TID_BITFIELD
292 "VARCHAR", // TID_STRING
293 "xxxINVALIDxxxARRAY",
294 "xxxINVALIDxxxSTRUCT",
295 "xxxINVALIDxxxKEY",
296 "xxxINVALIDxxxLINK"
297};
298#endif
299
// Check that the PostgreSQL column type `*col_type` is compatible with
// `index_type`.
//
// Returns normally when the two strings are equal or form one of the accepted
// pairs bigint/int8, integer/int4, smallint/int2. For any other combination an
// error is reported through cm_msg() and the program is terminated with abort().
//
// NOTE(review): the embedded listing numbers skip one line inside each accepted
// branch and before abort(); statements may be absent from this view (nothing
// visible here writes through `col_type`) - confirm against the full source.
300void DoctorPgsqlColumnType(std::string* col_type, const char* index_type)
301{
302 if (*col_type == index_type)
303 return;
304
305 if (*col_type == "bigint" && strcmp(index_type, "int8")==0) {
307 return;
308 }
309
310 if (*col_type == "integer" && strcmp(index_type, "int4")==0) {
312 return;
313 }
314
315 if (*col_type == "smallint" && strcmp(index_type, "int2")==0) {
317 return;
318 }
319
// no accepted pairing: this is treated as fatal
320 cm_msg(MERROR, "SqlHistory", "Cannot use this SQL database, incompatible column names: created column type [%s] is reported with column type [%s]", index_type, col_type->c_str());
322 abort();
323}
324
// Check that the SQL column type `*col_type` is compatible with `index_type`.
//
// Returns normally when the two strings are equal or form one of the accepted
// pairs listed below (different spellings of the integer types). For any other
// combination an error is reported through cm_msg() and the program is
// terminated with abort().
//
// NOTE(review): the embedded listing numbers skip one line inside each accepted
// branch and before abort(); statements may be absent from this view (nothing
// visible here writes through `col_type`) - confirm against the full source.
325void DoctorSqlColumnType(std::string* col_type, const char* index_type)
326{
327 if (*col_type == index_type)
328 return;
329
330 if (*col_type == "int(10) unsigned" && strcmp(index_type, "integer unsigned")==0) {
332 return;
333 }
334
335 if (*col_type == "int(11)" && strcmp(index_type, "integer")==0) {
337 return;
338 }
339
340 if (*col_type == "integer" && strcmp(index_type, "int(11)")==0) {
342 return;
343 }
344
345 // MYSQL 8.0.23
346
347 if (*col_type == "int" && strcmp(index_type, "integer")==0) {
349 return;
350 }
351
352 if (*col_type == "int unsigned" && strcmp(index_type, "integer unsigned")==0) {
354 return;
355 }
356
// no accepted pairing: this is treated as fatal
357 cm_msg(MERROR, "SqlHistory", "Cannot use this SQL database, incompatible column names: created column type [%s] is reported with column type [%s]", index_type, col_type->c_str());
359 abort();
360}
361
#if 0
// (compiled out) Map a MySQL column type name to the index of the matching
// entry of sql_type_mysql[], compared ignoring case.
// Prints a diagnostic and returns 0 when no entry matches.
static int sql2midasType_mysql(const char* name)
{
   int tid = 0;
   while (tid < TID_LAST) {
      if (strcasecmp(name, sql_type_mysql[tid]) == 0)
         return tid;
      tid++;
   }
   // FIXME!
   printf("sql2midasType: Cannot convert SQL data type \'%s\' to a MIDAS data type!\n", name);
   return 0;
}
#endif
373
#if 0
// (compiled out) Map an SQLITE column type name (exact, case-sensitive) to a
// MIDAS type: "INTEGER" -> TID_INT, "REAL" -> TID_DOUBLE, "TEXT" -> TID_STRING.
// Prints a diagnostic and returns 0 for any other name.
static int sql2midasType_sqlite(const char* name)
{
   static const struct {
      const char* sql_name;
      int tid;
   } known[] = {
      { "INTEGER", TID_INT },
      { "REAL", TID_DOUBLE },
      { "TEXT", TID_STRING },
   };

   for (unsigned k = 0; k < sizeof(known)/sizeof(known[0]); k++) {
      if (strcmp(name, known[k].sql_name) == 0)
         return known[k].tid;
   }

   // FIXME!
   printf("sql2midasType: Cannot convert SQL data type \'%s\' to a MIDAS data type!\n", name);
   return 0;
}
#endif
388
390// Schema base classes //
392
// Description of one history variable: names, MIDAS type and array size.
// NOTE(review): the line opening this declaration is not visible in this view;
// presumably this is the body of HsSchemaEntry, the element type of
// HsSchema::fVariables - confirm against the full source.
394 std::string tag_name; // tag name from MIDAS
395 std::string tag_type; // tag type from MIDAS
396 std::string name; // entry name, same as tag_name except when read from SQL history when it could be the SQL column name
397 int type = 0; // MIDAS data type TID_xxx
398 int n_data = 0; // MIDAS array size
399 int n_bytes = 0; // n_data * size of MIDAS data type (only used by HsFileSchema?)
400};
401
// Abstract base for one event schema: the event name, its list of variables
// with their offsets, and the storage operations (write, flush, close, read)
// that concrete schema classes implement.
// NOTE(review): the line opening this class and several member declarations
// are not visible in this view (the embedded listing numbers skip 402, 408-409
// and 415-418) - confirm against the full source.
403{
404public:
405
406 // event schema definitions
407 std::string fEventName;
410 std::vector<HsSchemaEntry> fVariables;
411 std::vector<int> fOffsets;
412 int fNumBytes = 0;
413
414 // run time data used by hs_write_event()
419
420 // schema disabled by write error
// note: a freshly constructed schema starts out with this flag set
421 bool fDisabled = true;
422
423public:
424
425 HsSchema() // ctor
426 {
427 // empty
428 }
429
// pure virtual members must be provided by every concrete schema class
430 virtual void remove_inactive_columns() = 0; // used by SQL schemas
431 virtual void print(bool print_tags = true) const;
432 virtual ~HsSchema(); // dtor
433 virtual int flush_buffers() = 0;
434 virtual int close() = 0;
435 virtual int write_event(const time_t t, const char* data, const int data_size) = 0;
436 virtual int match_event_var(const char* event_name, const char* var_name, const int var_index);
437 virtual int read_last_written(const time_t timestamp,
438 const int debug,
439 time_t* last_written) = 0;
440 virtual int read_data(const time_t start_time,
441 const time_t end_time,
442 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
443 const int debug,
444 std::vector<time_t>& last_time,
445 MidasHistoryBufferInterface* buffer[]) = 0;
446};
447
// Container of HsSchema pointers. The container deletes the schemas it holds:
// clear() deletes every non-NULL entry and the destructor calls clear().
// NOTE(review): the line opening this class, the header of the accessor that
// returns fData[index], and the body of the print() loop are not visible in
// this view (the embedded listing numbers skip 448, 458 and 479) - confirm
// against the full source.
449{
450protected:
451 std::vector<HsSchema*> fData;
452
453public:
454 ~HsSchemaVector() { // dtor
455 clear();
456 }
457
459 return fData[index];
460 }
461
// number of stored schema pointers
462 unsigned size() const {
463 return fData.size();
464 }
465
466 void add(HsSchema* s);
467
// delete every stored schema, then empty the vector
468 void clear() {
469 for (unsigned i=0; i<fData.size(); i++)
470 if (fData[i]) {
471 delete fData[i];
472 fData[i] = NULL;
473 }
474 fData.clear();
475 }
476
477 void print(bool print_tags = true) const {
478 for (unsigned i=0; i<fData.size(); i++)
480 }
481
482 HsSchema* find_event(const char* event_name, const time_t timestamp, int debug = 0);
483};
484
486// Base class functions //
488
// Reports, through cm_msg(), how often hs_write_event() was called with too few
// or too many bytes for this event, when that happened more than once.
// NOTE(review): the header line of this function is not visible in this view;
// presumably it is the HsSchema destructor - confirm against the full source.
490{
491 // only report if undersize/oversize happens more than once -
492 // the first occurrence is already reported by hs_write_event()
493 if (fCountWriteUndersize > 1) {
494 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch count: %d, expected %d bytes, hs_write_event() called with as few as %d bytes", fEventName.c_str(), fCountWriteUndersize, fNumBytes, fWriteMinSize);
495 }
496
497 if (fCountWriteOversize > 1) {
498 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch count: %d, expected %d bytes, hs_write_event() called with as much as %d bytes", fEventName.c_str(), fCountWriteOversize, fNumBytes, fWriteMaxSize);
499 }
500};
501
// Insert schema `s` into fData and recompute fTimeTo for all schemas of the
// same event.
//
// - If an entry with the same event name (ignoring case) and the same fTimeFrom
//   already exists, it is deleted and replaced by `s`, which inherits its fTimeTo.
// - Otherwise `s` is inserted in front of the first entry with an older
//   fTimeFrom, or appended at the end.
// - Afterwards each schema of this event gets fTimeTo set to the fTimeFrom of
//   the previous (newer) schema of the same event; the newest one gets 0.
//
// NOTE(review): the header line of this function is not visible in this view;
// presumably it is HsSchemaVector::add(HsSchema* s) - confirm against the full source.
503{
504 // schema list "data" is sorted by decreasing "fTimeFrom", newest schema first
505
506 //printf("add: %s..%s %s\n", TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), s->fEventName.c_str());
507
508 bool added = false;
509
510 for (auto it = fData.begin(); it != fData.end(); it++) {
511 if (event_name_cmp((*it)->fEventName, s->fEventName.c_str())==0) {
512 if (s->fTimeFrom == (*it)->fTimeFrom) {
513 // duplicate schema, keep the last one added (for file schema it is the newer file)
514 s->fTimeTo = (*it)->fTimeTo;
515 delete (*it);
516 (*it) = s;
517 return;
518 }
519 }
520
// first entry older than `s`: insert in front of it to keep the ordering
521 if (s->fTimeFrom > (*it)->fTimeFrom) {
522 fData.insert(it, s);
523 added = true;
524 break;
525 }
526 }
527
528 if (!added) {
529 fData.push_back(s);
530 }
531
532 //time_t oldest_time_from = fData.back()->fTimeFrom;
533
// walk newest to oldest, chaining each schema's fTimeTo to the fTimeFrom of the newer one
534 time_t time_to = 0;
535
536 for (auto it = fData.begin(); it != fData.end(); it++) {
537 if (event_name_cmp((*it)->fEventName, s->fEventName.c_str())==0) {
538 (*it)->fTimeTo = time_to;
539 time_to = (*it)->fTimeFrom;
540
541 //printf("vvv: %s..%s %s\n", TimeToString((*it)->fTimeFrom-oldest_time_from).c_str(), TimeToString((*it)->fTimeTo-oldest_time_from).c_str(), (*it)->fEventName.c_str());
542 }
543 }
544}
545
546HsSchema* HsSchemaVector::find_event(const char* event_name, time_t t, int debug)
547{
548 HsSchema* ss = NULL;
549
550 if (debug) {
551 printf("find_event: All schema for event %s: (total %d)\n", event_name, (int)fData.size());
552 int found = 0;
553 for (unsigned i=0; i<fData.size(); i++) {
554 HsSchema* s = fData[i];
555 printf("find_event: schema %d name [%s]\n", i, s->fEventName.c_str());
556 if (event_name)
557 if (event_name_cmp(s->fEventName, event_name)!=0)
558 continue;
559 s->print();
560 found++;
561 }
562 printf("find_event: Found %d schemas for event %s\n", found, event_name);
563
564 //if (found == 0)
565 // abort();
566 }
567
568 for (unsigned i=0; i<fData.size(); i++) {
569 HsSchema* s = fData[i];
570
571 // wrong event
572 if (event_name)
573 if (event_name_cmp(s->fEventName, event_name)!=0)
574 continue;
575
576 // schema is from after the time we are looking for
577 if (s->fTimeFrom > t)
578 continue;
579
580 if (!ss)
581 ss = s;
582
583 // remember the newest schema
584 if (s->fTimeFrom > ss->fTimeFrom)
585 ss = s;
586 }
587
588 // try to find
589 for (unsigned i=0; i<fData.size(); i++) {
590 HsSchema* s = fData[i];
591
592 // wrong event
593 if (event_name)
594 if (event_name_cmp(s->fEventName, event_name)!=0)
595 continue;
596
597 // schema is from after the time we are looking for
598 if (s->fTimeFrom > t)
599 continue;
600
601 if (!ss)
602 ss = s;
603
604 // remember the newest schema
605 if (s->fTimeFrom > ss->fTimeFrom)
606 ss = s;
607 }
608
609 if (debug) {
610 if (ss) {
611 printf("find_event: for time %s, returning:\n", TimeToString(t).c_str());
612 ss->print();
613 } else {
614 printf("find_event: for time %s, nothing found:\n", TimeToString(t).c_str());
615 }
616 }
617
618 return ss;
619}
620
622// Sql interface class //
624
// Abstract interface to an SQL database engine: connection management, table
// and column listing, statement execution, row-by-row queries, transactions,
// data type mapping and quoting. All operations are pure virtual.
// NOTE(review): the embedded listing numbers skip 630, so a member declaration
// may be absent from this view - confirm against the full source.
625class SqlBase
626{
627public:
628 int fDebug = 0;
629 bool fIsConnected = false;
631
632 SqlBase() { // ctor
633 };
634
635 virtual ~SqlBase() { // dtor
636 // confirm that the destructor of the concrete class
637 // disconnected the database
638 assert(!fIsConnected);
639 fDebug = 0;
640 fIsConnected = false;
641 }
642
643 virtual int Connect(const char* path) = 0;
644 virtual int Disconnect() = 0;
645 virtual bool IsConnected() = 0;
646
// both list functions return their results through *plist
647 virtual int ListTables(std::vector<std::string> *plist) = 0;
648 virtual int ListColumns(const char* table_name, std::vector<std::string> *plist) = 0;
649
650 // sql commands
651 virtual int Exec(const char* table_name, const char* sql) = 0;
652 virtual int ExecDisconnected(const char* table_name, const char* sql) = 0;
653
654 // queries
655 virtual int Prepare(const char* table_name, const char* sql) = 0;
656 virtual int Step() = 0;
657 virtual const char* GetText(int column) = 0;
658 virtual time_t GetTime(int column) = 0;
659 virtual double GetDouble(int column) = 0;
660 virtual int Finalize() = 0;
661
662 // transactions
663 virtual int OpenTransaction(const char* table_name) = 0;
664 virtual int CommitTransaction(const char* table_name) = 0;
665 virtual int RollbackTransaction(const char* table_name) = 0;
666
667 // data types
668 virtual const char* ColumnType(int midas_tid) = 0;
669 virtual bool TypesCompatible(int midas_tid, const char* sql_type) = 0;
670
671 // string quoting
672 virtual std::string QuoteString(const char* s) = 0; // quote text string
673 virtual std::string QuoteId(const char* s) = 0; // quote identifier, such as table or column name
674};
675
677// Schema concrete classes //
679
// Schema of an event stored in an SQL table: adds the table name and the
// per-variable column names, column types and "inactive" flags to HsSchema.
// NOTE(review): several declarations are not visible in this view (the embedded
// listing numbers skip 684, 702, 704-706, 708, 720 and 727), among them the end
// of the read_data() declaration and get_transaction_count() - confirm against
// the full source.
680class HsSqlSchema : public HsSchema
681{
682public:
683
685 std::string fTableName;
686 std::vector<std::string> fColumnNames;
687 std::vector<std::string> fColumnTypes;
688 std::vector<bool> fColumnInactive;
689
690public:
691
692 HsSqlSchema() // ctor
693 {
694 // empty
695 }
696
697 ~HsSqlSchema() // dtor
698 {
// a schema must not be destroyed while its transaction count is non-zero
699 assert(get_transaction_count() == 0);
700 }
701
703 void print(bool print_tags = true) const;
707 int close_transaction();
// closing an SQL schema means closing its transaction
709 int close() { return close_transaction(); }
710 int write_event(const time_t t, const char* data, const int data_size);
711 int match_event_var(const char* event_name, const char* var_name, const int var_index);
712 int read_last_written(const time_t timestamp,
713 const int debug,
714 time_t* last_written);
715 int read_data(const time_t start_time,
716 const time_t end_time,
717 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
718 const int debug,
719 std::vector<time_t>& last_time,
721
722private:
723 // Sqlite uses a transaction per table; MySQL uses a single transaction for all tables.
724 // But to support future "single transaction" DBs more easily (e.g. if user wants to
725 // log to both Postgres and MySQL in future), we keep track of the transaction count
726 // per SQL engine.
728 static std::map<SqlBase*, int> gfTransactionCount;
729};
730
// storage for the static per-engine transaction counter declared above
731std::map<SqlBase*, int> HsSqlSchema::gfTransactionCount;
732
// Schema of an event stored in a history file: adds the file name, record
// layout (record size, data offset) and writer state to HsSchema.
// NOTE(review): several lines are not visible in this view (the embedded
// listing numbers skip 742-743, 752, 761, 763 and 779), among them the
// declaration of fRecordBuffer, the destructor header and the end of the
// read_data() declaration - confirm against the full source.
733class HsFileSchema : public HsSchema
734{
735public:
736
737 std::string fFileName;
738 int fRecordSize = 0;
739 int fDataOffset = 0;
740 int fLastSize = 0;
741 int fWriterFd = -1;
744
745public:
746
747 HsFileSchema() // ctor
748 {
749 // empty
750 }
751
// NOTE(review): presumably the destructor body - close, reset the members, release the record buffer
753 {
754 close();
755 fRecordSize = 0;
756 fDataOffset = 0;
757 fLastSize = 0;
758 fWriterFd = -1;
759 if (fRecordBuffer) {
760 free(fRecordBuffer);
762 }
764 }
765
// the next two operations do nothing for file schemas
766 void remove_inactive_columns() { /* empty */ };
767 void print(bool print_tags = true) const;
768 int flush_buffers() { return HS_SUCCESS; };
769 int close();
770 int write_event(const time_t t, const char* data, const int data_size);
771 int read_last_written(const time_t timestamp,
772 const int debug,
773 time_t* last_written);
774 int read_data(const time_t start_time,
775 const time_t end_time,
776 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
777 const int debug,
778 std::vector<time_t>& last_time,
780};
781
783// Print functions //
785
// Print a one-line summary of the schema (event name, time range, number of
// variables, total bytes) to stdout and, when `print_tags` is set, one line
// per variable.
// NOTE(review): the header line of this function is not visible in this view;
// presumably it is HsSchema::print(bool print_tags) const - confirm against the full source.
787{
788 unsigned nv = this->fVariables.size();
789 printf("event [%s], time %s..%s, %d variables, %d bytes\n", this->fEventName.c_str(), TimeToString(this->fTimeFrom).c_str(), TimeToString(this->fTimeTo).c_str(), nv, fNumBytes);
790 if (print_tags)
791 for (unsigned j=0; j<nv; j++)
792 printf(" %d: name [%s], type [%s] tid %d, n_data %d, n_bytes %d, offset %d\n", j, this->fVariables[j].name.c_str(), rpc_tid_name(this->fVariables[j].type), this->fVariables[j].type, this->fVariables[j].n_data, this->fVariables[j].n_bytes, this->fOffsets[j]);
793};
794
// Print a one-line summary of the SQL schema (event name, table name, time
// range, number of variables, total bytes) to stdout and, when `print_tags` is
// set, one line per variable including its SQL column name, SQL type, offset
// and inactive flag.
// NOTE(review): the header line of this function is not visible in this view;
// presumably it is HsSqlSchema::print(bool print_tags) const - confirm against the full source.
796{
797 unsigned nv = this->fVariables.size();
798 printf("event [%s], sql_table [%s], time %s..%s, %d variables, %d bytes\n", this->fEventName.c_str(), this->fTableName.c_str(), TimeToString(this->fTimeFrom).c_str(), TimeToString(this->fTimeTo).c_str(), nv, fNumBytes);
799 if (print_tags) {
800 for (unsigned j=0; j<nv; j++) {
801 printf(" %d: name [%s], type [%s] tid %d, n_data %d, n_bytes %d", j, this->fVariables[j].name.c_str(), rpc_tid_name(this->fVariables[j].type), this->fVariables[j].type, this->fVariables[j].n_data, this->fVariables[j].n_bytes);
802 printf(", sql_column [%s], sql_type [%s], offset %d", this->fColumnNames[j].c_str(), this->fColumnTypes[j].c_str(), this->fOffsets[j]);
803 printf(", inactive %d", (int)this->fColumnInactive[j]);
804 printf("\n");
805 }
806 }
807}
808
// Print a one-line summary of the file schema (event name, file name, time
// range, number of variables, total bytes, data offset, record size) to stdout
// and, when `print_tags` is set, one line per variable.
// NOTE(review): the header line of this function is not visible in this view;
// presumably it is HsFileSchema::print(bool print_tags) const - confirm against the full source.
810{
811 unsigned nv = this->fVariables.size();
812 printf("event [%s], file_name [%s], time %s..%s, %d variables, %d bytes, dat_offset %d, record_size %d\n", this->fEventName.c_str(), this->fFileName.c_str(), TimeToString(this->fTimeFrom).c_str(), TimeToString(this->fTimeTo).c_str(), nv, fNumBytes, fDataOffset, fRecordSize);
813 if (print_tags) {
814 for (unsigned j=0; j<nv; j++)
815 printf(" %d: name [%s], type [%s] tid %d, n_data %d, n_bytes %d, offset %d\n", j, this->fVariables[j].name.c_str(), rpc_tid_name(this->fVariables[j].type), this->fVariables[j].type, this->fVariables[j].n_data, this->fVariables[j].n_bytes, this->fOffsets[j]);
816 }
817}
818
820// File functions //
822
823#ifdef HAVE_MYSQL
824
826// MYSQL/MariaDB database access //
828
829//#warning !!!HAVE_MYSQL!!!
830
831//#include <my_global.h> // my_global.h removed MySQL 8.0, MariaDB 10.2. K.O.
832#include <mysql.h>
833
// SqlBase implementation on top of the MySQL/MariaDB C client library.
// Besides the connection handle and query state it keeps a list of SQL
// statements (fDisconnectedBuffer) that accumulate while the database is
// unreachable.
// NOTE(review): some member declarations are not visible in this view (the
// embedded listing numbers skip 841-842 and 848-849); fResult, fRow,
// fNextReconnect and fNextReconnectDelaySec are used by the member functions
// but not declared here - confirm against the full source.
834class Mysql: public SqlBase
835{
836public:
837 std::string fConnectString;
838 MYSQL* fMysql = NULL;
839
840 // query results
843 int fNumFields = 0;
844
845 // disconnected operation
846 unsigned fMaxDisconnected = 0;
847 std::list<std::string> fDisconnectedBuffer;
850 int fDisconnectedLost = 0;
851
852 Mysql(); // ctor
853 ~Mysql(); // dtor
854
855 int Connect(const char* path);
856 int Disconnect();
857 bool IsConnected();
858
859 int ConnectTable(const char* table_name);
860
861 int ListTables(std::vector<std::string> *plist);
862 int ListColumns(const char* table_name, std::vector<std::string> *plist);
863
864 int Exec(const char* table_name, const char* sql);
865 int ExecDisconnected(const char* table_name, const char* sql);
866
867 int Prepare(const char* table_name, const char* sql);
868 int Step();
869 const char* GetText(int column);
870 time_t GetTime(int column);
871 double GetDouble(int column);
872 int Finalize();
873
874 int OpenTransaction(const char* table_name);
875 int CommitTransaction(const char* table_name);
876 int RollbackTransaction(const char* table_name);
877
878 const char* ColumnType(int midas_tid);
879 bool TypesCompatible(int midas_tid, const char* sql_type);
880
881 std::string QuoteId(const char* s);
882 std::string QuoteString(const char* s);
883};
884
// Construct a disconnected Mysql object: no connection handle, no pending
// query, and room for 1000 statements in the disconnected buffer.
// NOTE(review): the embedded listing numbers skip 893-894, so some
// initializations may be absent from this view - confirm against the full source.
885Mysql::Mysql() // ctor
886{
887 fMysql = NULL;
888 fResult = NULL;
889 fRow = NULL;
890 fNumFields = 0;
891 fMaxDisconnected = 1000;
892 fNextReconnect = 0;
895 fTransactionPerTable = false;
896}
897
// Disconnect from the database and reset the query state. Statements still
// waiting in fDisconnectedBuffer are not written; their number is reported
// through cm_msg().
// NOTE(review): the embedded listing numbers skip 907, so a statement inside
// the `if` may be absent from this view - confirm against the full source.
898Mysql::~Mysql() // dtor
899{
900 Disconnect();
901 fMysql = NULL;
902 fResult = NULL;
903 fRow = NULL;
904 fNumFields = 0;
905 if (fDisconnectedBuffer.size() > 0) {
906 cm_msg(MINFO, "Mysql::~Mysql", "Lost %d history entries accumulated while disconnected from the database", (int)fDisconnectedBuffer.size());
908 }
909}
910
// Connect to the MySQL server described by the parameter file `connect_string`.
//
// The file is read line by line; lines starting (ignoring case) with server=,
// port=, database=, socket=, user=, password= or buffer= set the corresponding
// connection parameter, any other line is ignored. After a successful
// connection the session is switched to ANSI sql_mode and the statements
// accumulated in fDisconnectedBuffer are executed in order.
//
// Returns DB_SUCCESS on success, DB_FILE_ERROR when the file cannot be opened,
// the connection fails or ANSI mode cannot be set, or the failing Exec()
// status while flushing the buffered statements.
//
// NOTE(review): several lines are not visible in this view (the embedded
// listing numbers skip 920, 973, 979, 985, 1008, 1025 and 1029), among them
// the statement governed by the `buffer_int` range check, the creation of
// fMysql and the declaration of `client_flag` - confirm against the full source.
911int Mysql::Connect(const char* connect_string)
912{
// an existing connection is dropped first
913 if (fIsConnected)
914 Disconnect();
915
// remembered so that later reconnect attempts can reuse it
916 fConnectString = connect_string;
917
918 if (fDebug) {
919 cm_msg(MINFO, "Mysql::Connect", "Connecting to Mysql database specified by \'%s\'", connect_string);
921 }
922
923 std::string host_name;
924 std::string user_name;
925 std::string user_password;
926 std::string db_name;
927 int tcp_port = 0;
928 std::string unix_socket;
929 std::string buffer;
930
931 FILE* fp = fopen(connect_string, "r");
932 if (!fp) {
933 cm_msg(MERROR, "Mysql::Connect", "Cannot read MYSQL connection parameters from \'%s\', fopen() error %d (%s)", connect_string, errno, strerror(errno));
934 return DB_FILE_ERROR;
935 }
936
// parse "key=value" lines; fgets() reads at most sizeof(buf)-1 characters per call
937 while (1) {
938 char buf[256];
939 char* s = fgets(buf, sizeof(buf), fp);
940 if (!s)
941 break; // EOF
942
943 char*ss;
944 // kill trailing \n and \r
945 ss = strchr(s, '\n');
946 if (ss) *ss = 0;
947 ss = strchr(s, '\r');
948 if (ss) *ss = 0;
949
950 //printf("line [%s]\n", s);
951
// the value is whatever follows the key, with leading whitespace skipped
952 if (strncasecmp(s, "server=", 7)==0)
953 host_name = skip_spaces(s + 7);
954 if (strncasecmp(s, "port=", 5)==0)
955 tcp_port = atoi(skip_spaces(s + 5));
956 if (strncasecmp(s, "database=", 9)==0)
957 db_name = skip_spaces(s + 9);
958 if (strncasecmp(s, "socket=", 7)==0)
959 unix_socket = skip_spaces(s + 7);
960 if (strncasecmp(s, "user=", 5)==0)
961 user_name = skip_spaces(s + 5);
962 if (strncasecmp(s, "password=", 9)==0)
963 user_password = skip_spaces(s + 9);
964 if (strncasecmp(s, "buffer=", 7)==0)
965 buffer = skip_spaces(s + 7);
966 }
967
968 fclose(fp);
969
970 int buffer_int = atoi(buffer.c_str());
971
972 if (buffer_int > 0 && buffer_int < 1000000)
974
// NOTE(review): this debug output prints the password in clear text, while the
// cm_msg() calls below print "xxx" in its place
975 if (fDebug)
976 printf("Mysql::Connect: connecting to server [%s] port %d, unix socket [%s], database [%s], user [%s], password [%s], buffer [%d]\n", host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(), user_password.c_str(), fMaxDisconnected);
977
978 if (!fMysql) {
980 if (!fMysql) {
981 return DB_FILE_ERROR;
982 }
983 }
984
986
987 if (mysql_real_connect(fMysql, host_name.c_str(), user_name.c_str(), user_password.c_str(), db_name.c_str(), tcp_port, unix_socket.c_str(), client_flag) == NULL) {
988 cm_msg(MERROR, "Mysql::Connect", "mysql_real_connect() to host [%s], port %d, unix socket [%s], database [%s], user [%s], password [%s]: error %d (%s)", host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", mysql_errno(fMysql), mysql_error(fMysql));
989 Disconnect();
990 return DB_FILE_ERROR;
991 }
992
993 int status;
994
995 // FIXME:
996 //my_bool reconnect = 0;
997 //mysql_options(&mysql, MYSQL_OPT_RECONNECT, &reconnect);
998
999 status = Exec("(notable)", "SET SESSION sql_mode='ANSI'");
1000 if (status != DB_SUCCESS) {
1001 cm_msg(MERROR, "Mysql::Connect", "Cannot set ANSI mode, nothing will work");
1002 Disconnect();
1003 return DB_FILE_ERROR;
1004 }
1005
1006 if (fDebug) {
1007 cm_msg(MINFO, "Mysql::Connect", "Connected to a MySQL database on host [%s], port %d, unix socket [%s], database [%s], user [%s], password [%s], buffer %d", host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", fMaxDisconnected);
1009 }
1010
1011 fIsConnected = true;
1012
// replay the statements buffered while disconnected, oldest first; a statement
// is removed from the buffer only after it was executed successfully
1013 int count = 0;
1014 while (fDisconnectedBuffer.size() > 0) {
1015 status = Exec("(flush)", fDisconnectedBuffer.front().c_str());
1016 if (status != DB_SUCCESS) {
1017 return status;
1018 }
1019 fDisconnectedBuffer.pop_front();
1020 count++;
1021 }
1022
1023 if (count > 0) {
1024 cm_msg(MINFO, "Mysql::Connect", "Saved %d, lost %d history events accumulated while disconnected from the database", count, fDisconnectedLost);
1026 }
1027
1028 assert(fDisconnectedBuffer.size() == 0);
1030
1031 return DB_SUCCESS;
1032}
1033
// Drop the connection state: fMysql, fResult and fRow are reset to NULL and
// fIsConnected to false. Always returns DB_SUCCESS.
// NOTE(review): the statements governed by `if (fResult)` and `if (fMysql)`
// are not visible in this view (the embedded listing numbers skip 1041 and
// 1044) - confirm against the full source.
1034int Mysql::Disconnect()
1035{
1036 if (fRow) {
1037 // FIXME: mysql_free_result(fResult);
1038 }
1039
1040 if (fResult)
1042
1043 if (fMysql)
1045
1046 fMysql = NULL;
1047 fResult = NULL;
1048 fRow = NULL;
1049
1050 fIsConnected = false;
1051 return DB_SUCCESS;
1052}
1053
1054bool Mysql::IsConnected()
1055{
1056 return fIsConnected;
1057}
1058
1059int Mysql::OpenTransaction(const char* table_name)
1060{
1061 return Exec(table_name, "START TRANSACTION");
1062 return DB_SUCCESS;
1063}
1064
1065int Mysql::CommitTransaction(const char* table_name)
1066{
1067 Exec(table_name, "COMMIT");
1068 return DB_SUCCESS;
1069}
1070
1071int Mysql::RollbackTransaction(const char* table_name)
1072{
1073 Exec(table_name, "ROLLBACK");
1074 return DB_SUCCESS;
1075}
1076
// Append the names of the tables in the database to *plist.
//
// Returns DB_FILE_ERROR when not connected or when no result set is available;
// otherwise DB_SUCCESS, regardless of the status returned by Finalize().
//
// NOTE(review): the statement that produces fResult is not visible in this
// view (the embedded listing numbers skip 1087 and 1094); judging from the
// error message it is presumably mysql_list_tables() - confirm against the
// full source.
1077int Mysql::ListTables(std::vector<std::string> *plist)
1078{
1079 if (!fIsConnected)
1080 return DB_FILE_ERROR;
1081
1082 if (fDebug)
1083 printf("Mysql::ListTables!\n");
1084
1085 int status;
1086
1088
1089 if (fResult == NULL) {
1090 cm_msg(MERROR, "Mysql::ListTables", "mysql_list_tables() error %d (%s)", mysql_errno(fMysql), mysql_error(fMysql));
1091 return DB_FILE_ERROR;
1092 }
1093
1095
// fetch rows until Step() reports anything other than DB_SUCCESS; column 0 is the table name
1096 while (1) {
1097 status = Step();
1098 if (status != DB_SUCCESS)
1099 break;
1100 std::string tn = GetText(0);
1101 plist->push_back(tn);
1102 };
1103
1104 status = Finalize();
1105
1106 return DB_SUCCESS;
1107}
1108
// Append the columns of table `table_name` to *plist using "SHOW COLUMNS FROM".
// For every column two strings are appended: the text of result column 0
// followed by the text of result column 1 (column name and column type, per
// the local variable names `cn` and `ct`).
//
// Returns DB_FILE_ERROR when not connected, the Prepare() status when the
// query fails; otherwise DB_SUCCESS, regardless of the status returned by
// Finalize().
//
// NOTE(review): the embedded listing numbers skip 1128, so a statement may be
// absent from this view - confirm against the full source.
1109int Mysql::ListColumns(const char* table_name, std::vector<std::string> *plist)
1110{
1111 if (!fIsConnected)
1112 return DB_FILE_ERROR;
1113
1114 if (fDebug)
1115 printf("Mysql::ListColumns for table \'%s\'\n", table_name);
1116
1117 int status;
1118
1119 std::string cmd;
1120 cmd += "SHOW COLUMNS FROM ";
1121 cmd += QuoteId(table_name);
1122 cmd += ";";
1123
1124 status = Prepare(table_name, cmd.c_str());
1125 if (status != DB_SUCCESS)
1126 return status;
1127
1129
// fetch rows until Step() reports anything other than DB_SUCCESS
1130 while (1) {
1131 status = Step();
1132 if (status != DB_SUCCESS)
1133 break;
1134 std::string cn = GetText(0);
1135 std::string ct = GetText(1);
1136 plist->push_back(cn);
1137 plist->push_back(ct);
1138 //printf("cn [%s]\n", cn.c_str());
1139 //for (int i=0; i<fNumFields; i++)
1140 //printf(" field[%d]: [%s]\n", i, GetText(i));
1141 };
1142
1143 status = Finalize();
1144
1145 return DB_SUCCESS;
1146}
1147
1148int Mysql::Exec(const char* table_name, const char* sql)
1149{
1150 if (fDebug)
1151 printf("Mysql::Exec(%s, %s)\n", table_name, sql);
1152
1153 // FIXME: match Sqlite::Exec() return values:
1154 // return values:
1155 // DB_SUCCESS
1156 // DB_FILE_ERROR: not connected
1157 // DB_KEY_EXIST: "table already exists"
1158
1159 if (!fMysql)
1160 return DB_FILE_ERROR;
1161
1162 assert(fMysql);
1163 assert(fResult == NULL); // there should be no unfinalized queries
1164 assert(fRow == NULL);
1165
1166 if (mysql_query(fMysql, sql)) {
1167 if (mysql_errno(fMysql) == 1050) { // "Table already exists"
1168 return DB_KEY_EXIST;
1169 }
1170 if (mysql_errno(fMysql) == 1146) { // "Table does not exist"
1171 return DB_FILE_ERROR;
1172 }
1173 cm_msg(MERROR, "Mysql::Exec", "mysql_query(%s) error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1174 if (mysql_errno(fMysql) == 1060) // "Duplicate column name"
1175 return DB_KEY_EXIST;
1176 if (mysql_errno(fMysql) == 2006) { // "MySQL server has gone away"
1177 Disconnect();
1178 return ExecDisconnected(table_name, sql);
1179 }
1180 return DB_FILE_ERROR;
1181 }
1182
1183 return DB_SUCCESS;
1184}
1185
// Handle an SQL statement while the database is unreachable: the statement is
// appended to fDisconnectedBuffer, and a reconnect through Connect() is
// attempted when no retry time is set or the retry time has been reached.
// Always returns DB_SUCCESS.
//
// NOTE(review): several lines are not visible in this view (the embedded
// listing numbers skip 1191, 1197, 1206, 1209, 1211, 1215 and 1217), among them
// the condition that opens the first if/else, the `else` branch and the
// reconnect delay updates - confirm against the full source.
1186int Mysql::ExecDisconnected(const char* table_name, const char* sql)
1187{
1188 if (fDebug)
1189 printf("Mysql::ExecDisconnected(%s, %s)\n", table_name, sql);
1190
1192 fDisconnectedBuffer.push_back(sql);
// warn once the buffer has reached its configured maximum
1193 if (fDisconnectedBuffer.size() >= fMaxDisconnected) {
1194 cm_msg(MERROR, "Mysql::ExecDisconnected", "Error: Disconnected database buffer overflow, size %d, subsequent events are lost", (int)fDisconnectedBuffer.size());
1195 }
1196 } else {
1198 }
1199
1200 time_t now = time(NULL);
1201
// try to reconnect, but not before the scheduled retry time
1202 if (fNextReconnect == 0 || now >= fNextReconnect) {
1203 int status = Connect(fConnectString.c_str());
1204 if (status == DB_SUCCESS) {
1205 fNextReconnect = 0;
1207 } else {
1208 if (fNextReconnectDelaySec == 0) {
1210 } else if (fNextReconnectDelaySec < 10*60) {
1212 }
1213 if (fDebug) {
1214 cm_msg(MINFO, "Mysql::ExecDisconnected", "Next reconnect attempt in %d sec, history events buffered %d, lost %d", fNextReconnectDelaySec, (int)fDisconnectedBuffer.size(), fDisconnectedLost);
1216 }
1218 }
1219 }
1220
1221 return DB_SUCCESS;
1222}
1223
// Run a query whose rows are to be read afterwards with Step().
//
// If mysql_query() fails with error 2006 or 2013 (connection lost), a reconnect
// through Connect() is attempted. Returns DB_FILE_ERROR when there is no
// connection handle, when the query or the reconnect fails, or when no result
// set is available; DB_SUCCESS otherwise.
//
// NOTE(review): several lines are not visible in this view (the embedded
// listing numbers skip 1249, 1262 and 1270), among them the retry of the query
// after a successful reconnect and the statement that assigns fResult -
// confirm against the full source.
1224int Mysql::Prepare(const char* table_name, const char* sql)
1225{
1226 if (fDebug)
1227 printf("Mysql::Prepare(%s, %s)\n", table_name, sql);
1228
1229 if (!fMysql)
1230 return DB_FILE_ERROR;
1231
1232 assert(fMysql);
1233 assert(fResult == NULL); // there should be no unfinalized queries
1234 assert(fRow == NULL);
1235
1236 // if (mysql_query(fMysql, sql)) {
1237 // cm_msg(MERROR, "Mysql::Prepare", "mysql_query(%s) error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1238 // return DB_FILE_ERROR;
1239 //}
1240
1241 // Check if the connection to MySQL timed out; fix from B. Smith
1242 int status = mysql_query(fMysql, sql);
1243 if (status) {
1244 if (mysql_errno(fMysql) == 2006 || mysql_errno(fMysql) == 2013) {
1245 // "MySQL server has gone away" or "Lost connection to MySQL server during query"
1246 status = Connect(fConnectString.c_str());
1247 if (status == DB_SUCCESS) {
1248 // Retry after reconnecting
1250 } else {
1251 cm_msg(MERROR, "Mysql::Prepare", "mysql_query(%s) - MySQL server has gone away, and couldn't reconnect - %d", sql, status);
1252 return DB_FILE_ERROR;
1253 }
1254 }
// still failing: either a different error or the failure persists after the reconnect
1255 if (status) {
1256 cm_msg(MERROR, "Mysql::Prepare", "mysql_query(%s) error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1257 return DB_FILE_ERROR;
1258 }
1259 cm_msg(MINFO, "Mysql::Prepare", "Reconnected to MySQL after long inactivity.");
1260 }
1261
1263 //fResult = mysql_use_result(fMysql); // cannot use this because it blocks writing into table
1264
1265 if (!fResult) {
1266 cm_msg(MERROR, "Mysql::Prepare", "mysql_store_result(%s) returned NULL, error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1267 return DB_FILE_ERROR;
1268 }
1269
1271
1272 //printf("num fields %d\n", fNumFields);
1273
1274 return DB_SUCCESS;
1275}
1276
// Advances to the next row of the result prepared by Prepare().
// Returns DB_SUCCESS when fRow is non-NULL, DB_NO_MORE_SUBKEYS when there is
// no row and mysql_errno() is 0, and DB_FILE_ERROR (after logging) otherwise.
// NOTE(review): this listing skips source line 1285; the statement that
// assigns fRow is not visible (the error text names mysql_fetch_row()).
1277int Mysql::Step()
1278{
1279 if (/* DISABLES CODE */ (0) && fDebug)
1280 printf("Mysql::Step()\n");
1281
1282 assert(fMysql);
1283 assert(fResult);
1284
// (line 1285 missing) presumably fetches the next row into fRow - TODO confirm
1286
1287 if (fRow)
1288 return DB_SUCCESS;
1289
// no row and no error code: treated as the normal end of the result set
1290 if (mysql_errno(fMysql) == 0)
1291 return DB_NO_MORE_SUBKEYS;
1292
1293 cm_msg(MERROR, "Mysql::Step", "mysql_fetch_row() error %d (%s)", mysql_errno(fMysql), mysql_error(fMysql));
1294
1295 return DB_FILE_ERROR;
1296}
1297
1298const char* Mysql::GetText(int column)
1299{
1300 assert(fMysql);
1301 assert(fResult);
1302 assert(fRow);
1303 assert(fNumFields > 0);
1304 assert(column >= 0);
1305 assert(column < fNumFields);
1306 if (fRow[column] == NULL)
1307 return "";
1308 return fRow[column];
1309}
1310
1311double Mysql::GetDouble(int column)
1312{
1313 return atof(GetText(column));
1314}
1315
1316time_t Mysql::GetTime(int column)
1317{
1318 return strtoul(GetText(column), NULL, 0);
1319}
1320
// Ends the query started by Prepare(): clears fResult, fRow and fNumFields
// so that the next Prepare()/Exec() passes its "no unfinalized query"
// assertions. Always returns DB_SUCCESS.
// NOTE(review): this listing skips source line 1326; the statement that
// releases the MySQL result set before fResult is cleared is not visible.
1321int Mysql::Finalize()
1322{
1323 assert(fMysql);
1324 assert(fResult);
1325
// (line 1326 missing) presumably frees fResult - TODO confirm
1327 fResult = NULL;
1328 fRow = NULL;
1329 fNumFields = 0;
1330
1331 return DB_SUCCESS;
1332}
1333
1334const char* Mysql::ColumnType(int midas_tid)
1335{
1336 assert(midas_tid>=0);
1337 assert(midas_tid<TID_LAST);
1338 return sql_type_mysql[midas_tid];
1339}
1340
1341bool Mysql::TypesCompatible(int midas_tid, const char* sql_type)
1342{
1343 if (/* DISABLES CODE */ (0))
1344 printf("compare types midas \'%s\'=\'%s\' and sql \'%s\'\n", rpc_tid_name(midas_tid), ColumnType(midas_tid), sql_type);
1345
1346 //if (sql2midasType_mysql(sql_type) == midas_tid)
1347 // return true;
1348
1349 if (strcasecmp(ColumnType(midas_tid), sql_type) == 0)
1350 return true;
1351
1352 // permit writing FLOAT into DOUBLE
1353 if (midas_tid==TID_FLOAT && strcmp(sql_type, "double")==0)
1354 return true;
1355
1356 // T2K quirk!
1357 // permit writing BYTE into signed tinyint
1358 if (midas_tid==TID_BYTE && strcmp(sql_type, "tinyint")==0)
1359 return true;
1360
1361 // T2K quirk!
1362 // permit writing WORD into signed tinyint
1363 if (midas_tid==TID_WORD && strcmp(sql_type, "tinyint")==0)
1364 return true;
1365
1366 // mysql quirk!
1367 //if (midas_tid==TID_DWORD && strcmp(sql_type, "int(10) unsigned")==0)
1368 // return true;
1369
1370 if (/* DISABLES CODE */ (0))
1371 printf("type mismatch!\n");
1372
1373 return false;
1374}
1375
1376std::string Mysql::QuoteId(const char* s)
1377{
1378 std::string q;
1379 q += "`";
1380 q += s;
1381 q += "`";
1382 return q;
1383}
1384
1385std::string Mysql::QuoteString(const char* s)
1386{
1387 std::string q;
1388 q += "\'";
1389 q += s;
1390#if 0
1391 while (int c = *s++) {
1392 if (c == '\'') {
1393 q += "\\'";
1394 } if (c == '"') {
1395 q += "\\\"";
1396 } else if (isprint(c)) {
1397 q += c;
1398 } else {
1399 char buf[256];
1400 sprintf(buf, "\\\\x%02x", c&0xFF);
1401 q += buf;
1402 }
1403 }
1404#endif
1405 q += "\'";
1406 return q;
1407}
1408
1409#endif // HAVE_MYSQL
1410
1411#ifdef HAVE_PGSQL
1412
1414// PostgreSQL database access //
1416
1417//#warning !!!HAVE_PGSQL!!!
1418
1419#include <libpq-fe.h>
1420
// PostgreSQL implementation of the SqlBase interface, built on libpq.
// NOTE(review): this listing skips source lines 1429 and 1436; the
// declarations of fResult and fNextReconnect, which the member functions
// use, are not visible here.
1421class Pgsql: public SqlBase
1422{
1423public:
// name of the file the connection parameters are read from (see Connect())
1424 std::string fConnectString;
// when non-zero, Connect() requires the TimescaleDB extensions
1425 int fDownsample = 0;
// libpq connection handle, NULL while disconnected
1426 PGconn* fPgsql = NULL;
1427
1428 // query results
// (line 1429 missing) presumably the PGresult pointer fResult - TODO confirm
1430 int fNumFields = 0;
// index of the current result row, -1 when no query is active
1431 int fRow = 0;
1432
1433 // disconnected operation
// maximum number of statements kept in fDisconnectedBuffer
1434 unsigned fMaxDisconnected = 0;
// statements queued while the server is unreachable
1435 std::list<std::string> fDisconnectedBuffer;
// (line 1436 missing) presumably the time of the next reconnect attempt, fNextReconnect - TODO confirm
1437 int fNextReconnectDelaySec = 0;
1438 int fDisconnectedLost = 0;
1439
1440 Pgsql(); // ctor
1441 ~Pgsql(); // dtor
1442
1443 int Connect(const char* path);
1444 int Disconnect();
1445 bool IsConnected();
1446
1447 int ConnectTable(const char* table_name);
1448
1449 int ListTables(std::vector<std::string> *plist);
1450 int ListColumns(const char* table_name, std::vector<std::string> *plist);
1451
1452 int Exec(const char* table_name, const char* sql);
1453 int ExecDisconnected(const char* table_name, const char* sql);
1454
1455 int Prepare(const char* table_name, const char* sql);
1456 std::string BuildDownsampleQuery(const time_t start_time, const time_t end_time, const int npoints, const char* table_name, const char* column_name);
1457 int Step();
1458 const char* GetText(int column);
1459 time_t GetTime(int column);
1460 double GetDouble(int column);
1461 int Finalize();
1462
1463 int OpenTransaction(const char* table_name);
1464 int CommitTransaction(const char* table_name);
1465 int RollbackTransaction(const char* table_name);
1466
1467 const char* ColumnType(int midas_tid);
1468 bool TypesCompatible(int midas_tid, const char* sql_type);
1469
1470 std::string QuoteId(const char* s);
1471 std::string QuoteString(const char* s);
1472};
1473
// Constructs a disconnected Pgsql object: no connection, no active query
// (fRow == -1), a disconnected-buffer limit of 1000 statements.
// NOTE(review): this listing skips source lines 1483 and 1484; two
// initializations are not visible here.
1474Pgsql::Pgsql() // ctor
1475{
1476 fPgsql = NULL;
1477 fDownsample = 0;
1478 fResult = NULL;
1479 fRow = -1;
1480 fNumFields = 0;
1481 fMaxDisconnected = 1000;
1482 fNextReconnect = 0;
// (lines 1483-1484 missing)
1485 fTransactionPerTable = false;
1486}
1487
// Disconnects from the server and reports how many buffered statements are
// discarded with the object.
// NOTE(review): this listing skips source lines 1492 and 1497. As shown,
// "if(fResult)" appears to govern "fRow = -1;", which is an artifact of the
// missing line - restore it from the original file.
1488Pgsql::~Pgsql() // dtor
1489{
1490 Disconnect();
1491 if(fResult)
// (line 1492 missing) presumably releases fResult - TODO confirm
1493 fRow = -1;
1494 fNumFields = 0;
1495 if (fDisconnectedBuffer.size() > 0) {
1496 cm_msg(MINFO, "Pgsql::~Pgsql", "Lost %d history entries accumulated while disconnected from the database", (int)fDisconnectedBuffer.size());
// (line 1497 missing)
1498 }
1499}
1500
// Connects to a PostgreSQL server.
// connect_string is the name of a text file holding "key=value" lines
// (server, port, database, socket, user, password, buffer). After a
// successful login the statements queued in fDisconnectedBuffer are executed
// and, if fDownsample is set, the TimescaleDB extensions are checked.
// Returns DB_SUCCESS, DB_FILE_ERROR (file unreadable, login failed,
// extension missing) or the status of a failed buffered Exec().
// NOTE(review): this listing skips source lines 1510, 1563, 1581, 1598 and
// 1602 - restore them from the original file.
1501int Pgsql::Connect(const char* connect_string)
1502{
1503 if (fIsConnected)
1504 Disconnect();
1505
// remembered so that ExecDisconnected()/Prepare() can reconnect later
1506 fConnectString = connect_string;
1507
1508 if (fDebug) {
1509 cm_msg(MINFO, "Pgsql::Connect", "Connecting to PostgreSQL database specified by \'%s\'", connect_string);
// (line 1510 missing)
1511 }
1512
1513 std::string host_name;
1514 std::string user_name;
1515 std::string user_password;
1516 std::string db_name;
1517 std::string tcp_port;
1518 std::string unix_socket;
1519 std::string buffer;
1520
1521 FILE* fp = fopen(connect_string, "r");
1522 if (!fp) {
1523 cm_msg(MERROR, "Pgsql::Connect", "Cannot read PostgreSQL connection parameters from \'%s\', fopen() error %d (%s)", connect_string, errno, strerror(errno));
1524 return DB_FILE_ERROR;
1525 }
1526
// read the parameter file line by line; lines longer than the buffer are split by fgets()
1527 while (1) {
1528 char buf[256];
1529 char* s = fgets(buf, sizeof(buf), fp);
1530 if (!s)
1531 break; // EOF
1532
1533 char*ss;
1534 // kill trailing \n and \r
1535 ss = strchr(s, '\n');
1536 if (ss) *ss = 0;
1537 ss = strchr(s, '\r');
1538 if (ss) *ss = 0;
1539
1540 //printf("line [%s]\n", s);
1541
// keys are matched case-insensitively at the start of the line
1542 if (strncasecmp(s, "server=", 7)==0)
1543 host_name = skip_spaces(s + 7);
1544 if (strncasecmp(s, "port=", 5)==0)
1545 tcp_port = skip_spaces(s + 5);
1546 if (strncasecmp(s, "database=", 9)==0)
1547 db_name = skip_spaces(s + 9);
1548 if (strncasecmp(s, "socket=", 7)==0)
1549 unix_socket = skip_spaces(s + 7);
1550 if (strncasecmp(s, "user=", 5)==0)
1551 user_name = skip_spaces(s + 5);
1552 if (strncasecmp(s, "password=", 9)==0)
1553 user_password = skip_spaces(s + 9);
1554 if (strncasecmp(s, "buffer=", 7)==0)
1555 buffer = skip_spaces(s + 7);
1556 }
1557
1558 fclose(fp);
1559
1560 int buffer_int = atoi(buffer.c_str());
1561
1562 if (buffer_int > 0 && buffer_int < 1000000)
// (line 1563 missing) presumably stores buffer_int as fMaxDisconnected - TODO confirm
1564
// NOTE(review): this debug printf prints user_password verbatim, whereas the cm_msg() calls below print "xxx"
1565 if (fDebug)
1566 printf("Pgsql::Connect: connecting to server [%s] port %s, unix socket [%s], database [%s], user [%s], password [%s], buffer [%d]\n", host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(), user_password.c_str(), fMaxDisconnected);
1567
// unix_socket is parsed and logged but is not passed to PQsetdbLogin()
1568 fPgsql = PQsetdbLogin(host_name.c_str(), tcp_port.c_str(), NULL, NULL, db_name.c_str(), user_name.c_str(), user_password.c_str());
1569 if (PQstatus(fPgsql) != CONNECTION_OK) {
1570 std::string msg(PQerrorMessage(fPgsql));
// strip newlines so the libpq message fits on one log line
1571 msg.erase(std::remove(msg.begin(), msg.end(), '\n'), msg.end());
1572 cm_msg(MERROR, "Pgsql::Connect", "PQsetdbLogin() to host [%s], port %s, unix socket [%s], database [%s], user [%s], password [%s]: error (%s)", host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", msg.c_str());
1573 Disconnect();
1574 return DB_FILE_ERROR;
1575 }
1576
1577 int status;
1578
1579 if (fDebug) {
1580 cm_msg(MINFO, "Pgsql::Connect", "Connected to a PostgreSQL database on host [%s], port %s, unix socket [%s], database [%s], user [%s], password [%s], buffer %d", host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", fMaxDisconnected);
// (line 1581 missing)
1582 }
1583
1584 fIsConnected = true;
1585
// replay statements queued while disconnected, oldest first; stop at the first failure
1586 int count = 0;
1587 while (fDisconnectedBuffer.size() > 0) {
1588 status = Exec("(flush)", fDisconnectedBuffer.front().c_str());
1589 if (status != DB_SUCCESS) {
1590 return status;
1591 }
1592 fDisconnectedBuffer.pop_front();
1593 count++;
1594 }
1595
1596 if (count > 0) {
1597 cm_msg(MINFO, "Pgsql::Connect", "Saved %d, lost %d history events accumulated while disconnected from the database", count, fDisconnectedLost);
// (line 1598 missing)
1599 }
1600
1601 assert(fDisconnectedBuffer.size() == 0);
// (line 1602 missing)
1603
// downsampling needs both the timescaledb and timescaledb_toolkit extensions
1604 if (fDownsample) {
1605 status = Prepare("pg_extensions", "select extname from pg_extension where extname = 'timescaledb';");
1606
1607 if (status != DB_SUCCESS || PQntuples(fResult) == 0) {
1608 cm_msg(MERROR, "Pgsql::Connect", "TimescaleDB extension not installed");
1609 return DB_FILE_ERROR;
1610 }
1611 Finalize();
1612
1613 status = Prepare("pg_extensions", "select extname from pg_extension where extname = 'timescaledb_toolkit';");
1614
1615 if (status != DB_SUCCESS || PQntuples(fResult) == 0) {
1616 cm_msg(MERROR, "Pgsql::Connect", "TimescaleDB_toolkit extension not installed");
1617 return DB_FILE_ERROR;
1618 }
1619 Finalize();
1620
1621 cm_msg(MINFO, "Pgsql::Connect", "TimescaleDB extensions found - downsampling enabled");
1622 }
1623
1624 return DB_SUCCESS;
1625}
1626
// Drops the connection: clears fPgsql, resets fRow to -1 and marks the
// object as not connected. Always returns DB_SUCCESS.
// NOTE(review): this listing skips source line 1630. As shown, "if (fPgsql)"
// appears to govern "fPgsql = NULL;", which is an artifact of the missing
// line - restore it from the original file.
1627int Pgsql::Disconnect()
1628{
1629 if (fPgsql)
// (line 1630 missing) presumably closes the libpq connection - TODO confirm
1631
1632 fPgsql = NULL;
1633 fRow = -1;
1634
1635 fIsConnected = false;
1636 return DB_SUCCESS;
1637}
1638
1639bool Pgsql::IsConnected()
1640{
1641 return fIsConnected;
1642}
1643
1644int Pgsql::OpenTransaction(const char* table_name)
1645{
1646 return Exec(table_name, "BEGIN TRANSACTION;");
1647}
1648
1649int Pgsql::CommitTransaction(const char* table_name)
1650{
1651 return Exec(table_name, "COMMIT;");
1652}
1653
1654int Pgsql::RollbackTransaction(const char* table_name)
1655{
1656 return Exec(table_name, "ROLLBACK;");
1657}
1658
1659int Pgsql::ListTables(std::vector<std::string> *plist)
1660{
1661 if (!fIsConnected)
1662 return DB_FILE_ERROR;
1663
1664 if (fDebug)
1665 printf("Pgsql::ListTables!\n");
1666
1667 int status = Prepare("pg_tables", "select tablename from pg_tables where schemaname = 'public';");
1668
1669 if (status != DB_SUCCESS) {
1670 cm_msg(MERROR, "Pgsql::ListTables", "error %s (%s)", PQresStatus(PQresultStatus(fResult)), PQresultErrorMessage(fResult));
1671 return DB_FILE_ERROR;
1672 }
1673
1674 while (1) {
1675 if (Step() != DB_SUCCESS)
1676 break;
1677 std::string tn = GetText(0);
1678 plist->push_back(tn);
1679 };
1680
1681 Finalize();
1682
1683 return DB_SUCCESS;
1684}
1685
// Appends (column name, data type) pairs for table_name to *plist, taken
// from information_schema.columns. Returns DB_FILE_ERROR when not
// connected, the Prepare() status on failure, DB_SUCCESS otherwise.
// NOTE(review): this listing skips source line 1703.
1686int Pgsql::ListColumns(const char* table_name, std::vector<std::string> *plist)
1687{
1688 if (!fIsConnected)
1689 return DB_FILE_ERROR;
1690
1691 if (fDebug)
1692 printf("Pgsql::ListColumns for table \'%s\'\n", table_name);
1693
1694 std::string cmd;
1695 cmd += "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = ";
1696 cmd += QuoteString(table_name);
1697 cmd += ";";
1698
1699 int status = Prepare(table_name, cmd.c_str());
1700 if (status != DB_SUCCESS)
1701 return status;
1702
// (line 1703 missing) presumably sets fNumFields, which GetText() asserts to be > 0 - TODO confirm
1704
// the output list alternates name, type, name, type, ...
1705 while (1) {
1706 if (Step() != DB_SUCCESS)
1707 break;
1708 std::string cn = GetText(0);
1709 std::string ct = GetText(1);
1710 plist->push_back(cn);
1711 plist->push_back(ct);
1712 };
1713
1714 Finalize();
1715
1716 return DB_SUCCESS;
1717}
1718
// Executes an SQL statement that returns no rows to the caller.
// Visible return values: DB_FILE_ERROR when there is no connection handle
// or on a fatal error, DB_KEY_EXIST when the fatal error text contains
// "already exists", the result of ExecDisconnected() after a Disconnect(),
// and DB_SUCCESS otherwise.
// NOTE(review): this listing skips source lines 1729-1731 and 1740; the
// statement execution, the declaration of "err" and the condition opening
// the block closed on line 1743 are not visible here.
1719int Pgsql::Exec(const char* table_name, const char* sql)
1720{
1721 if (fDebug)
1722 printf("Pgsql::Exec(%s, %s)\n", table_name, sql);
1723
1724 if (!fPgsql)
1725 return DB_FILE_ERROR;
1726
1727 assert(fPgsql);
// no query may be in progress (fRow is -1 between Finalize() and the next Step())
1728 assert(fRow == -1);
1729
// (lines 1729-1731 partly missing) presumably runs the statement and stores its result status in err - TODO confirm
1732 if(err != PGRES_TUPLES_OK) {
1733 if(err == PGRES_FATAL_ERROR) {
1734 // handle fatal error
// "already exists" in the server message is mapped to DB_KEY_EXIST
1735 if(strstr(PQresultErrorMessage(fResult), "already exists"))
1736 return DB_KEY_EXIST;
1737 else return DB_FILE_ERROR;
1738 }
1739
// (line 1740 missing) presumably a connection-status check - TODO confirm
1741 Disconnect();
1742 return ExecDisconnected(table_name, sql);
1743 }
1744 }
1745
1746 return DB_SUCCESS;
1747}
1748
// Handles an SQL statement while the PostgreSQL server is not reachable:
// the statement is queued in fDisconnectedBuffer and, once the time stored
// in fNextReconnect has been reached (or is 0), a reconnect is attempted via
// Connect(fConnectString). The only return visible here is DB_SUCCESS.
// NOTE(review): this listing skips source lines 1754, 1760, 1769, 1772,
// 1774, 1778 and 1780, so the "if" that matches the "} else {" below and the
// bodies of several branches are not visible - restore them from the
// original file.
1749int Pgsql::ExecDisconnected(const char* table_name, const char* sql)
1750{
1751 if (fDebug)
1752 printf("Pgsql::ExecDisconnected(%s, %s)\n", table_name, sql);
1753
// (line 1754 missing) presumably the buffer capacity check guarding the push - TODO confirm
1755 fDisconnectedBuffer.push_back(sql);
// report once the buffer has reached fMaxDisconnected entries
1756 if (fDisconnectedBuffer.size() >= fMaxDisconnected) {
1757 cm_msg(MERROR, "Pgsql::ExecDisconnected", "Error: Disconnected database buffer overflow, size %d, subsequent events are lost", (int)fDisconnectedBuffer.size());
1758 }
1759 } else {
// (line 1760 missing) presumably accounts for the dropped statement in fDisconnectedLost - TODO confirm
1761 }
1762
1763 time_t now = time(NULL);
1764
// try to reconnect immediately (fNextReconnect == 0) or when the scheduled time has arrived
1765 if (fNextReconnect == 0 || now >= fNextReconnect) {
1766 int status = Connect(fConnectString.c_str());
1767 if (status == DB_SUCCESS) {
1768 fNextReconnect = 0;
// (line 1769 missing)
1770 } else {
// (lines 1772 and 1774 missing) the delay update for each branch is not visible;
// the visible condition shows the delay is only changed while below 10 minutes
1771 if (fNextReconnectDelaySec == 0) {
1773 } else if (fNextReconnectDelaySec < 10*60) {
1775 }
1776 if (fDebug) {
1777 cm_msg(MINFO, "Pgsql::ExecDisconnected", "Next reconnect attempt in %d sec, history events buffered %d, lost %d", fNextReconnectDelaySec, (int)fDisconnectedBuffer.size(), fDisconnectedLost);
// (line 1778 missing)
1779 }
// (line 1780 missing) presumably schedules fNextReconnect - TODO confirm
1781 }
1782 }
1783
1784 return DB_SUCCESS;
1785}
1786
// Runs a query whose result rows will be read with Step()/GetText() and
// ended with Finalize(). When the connection status is CONNECTION_BAD it
// reconnects with Connect(fConnectString). Returns DB_FILE_ERROR when there
// is no connection handle or the reconnect fails, DB_SUCCESS otherwise.
// NOTE(review): this listing skips source lines 1799, 1805 and 1817; the
// query execution, its retry after reconnecting and the statement before
// the final return are not visible here.
1787int Pgsql::Prepare(const char* table_name, const char* sql)
1788{
1789 if (fDebug)
1790 printf("Pgsql::Prepare(%s, %s)\n", table_name, sql);
1791
1792 if (!fPgsql)
1793 return DB_FILE_ERROR;
1794
1795 assert(fPgsql);
1796 //assert(fResult==NULL);
// no query may be in progress
1797 assert(fRow == -1);
1798
// (line 1799 missing) presumably executes the query into fResult - TODO confirm
1800 if (PQstatus(fPgsql) == CONNECTION_BAD) {
1801 // lost connection to server
1802 int status = Connect(fConnectString.c_str());
1803 if (status == DB_SUCCESS) {
1804 // Retry after reconnecting
// (line 1805 missing) presumably re-executes the query - TODO confirm
1806 } else {
1807 cm_msg(MERROR, "Pgsql::Prepare", "PQexec(%s) PostgreSQL server has gone away, and couldn't reconnect - %d", sql, status);
1808 return DB_FILE_ERROR;
1809 }
// NOTE(review): as shown, status is the Connect() result here, not the status of the retried query
1810 if (status) {
1811 cm_msg(MERROR, "Pgsql::Prepare", "PQexec(%s) error %s", sql, PQresStatus(PQresultStatus(fResult)));
1812 return DB_FILE_ERROR;
1813 }
1814 cm_msg(MINFO, "Pgsql::Prepare", "Reconnected to PostgreSQL after long inactivity.");
1815 }
1816
// (line 1817 missing) presumably sets fNumFields, which GetText() asserts to be > 0 - TODO confirm
1818
1819 return DB_SUCCESS;
1820}
1821
1822std::string Pgsql::BuildDownsampleQuery(const time_t start_time, const time_t end_time, const int npoints,
1823 const char* table_name, const char* column_name)
1824{
1825 std::string cmd;
1826 cmd += "SELECT extract(epoch from time::TIMESTAMPTZ) as _i_time, value ";
1827
1828 cmd += " FROM unnest(( SELECT lttb";
1829 cmd += "(_t_time, ";
1830 cmd += column_name;
1831 cmd += ", ";
1832 cmd += std::to_string(npoints);
1833 cmd += ") ";
1834 cmd += "FROM ";
1835 cmd += QuoteId(table_name);
1836 cmd += " WHERE _t_time BETWEEN ";
1837 cmd += "to_timestamp(";
1838 cmd += TimeToString(start_time);
1839 cmd += ") AND to_timestamp(";
1840 cmd += TimeToString(end_time);
1841 cmd += ") )) ORDER BY time;";
1842
1843 return cmd;
1844}
1845
1846int Pgsql::Step()
1847{
1848 assert(fPgsql);
1849 assert(fResult);
1850
1851 fRow++;
1852
1853 if (fRow == PQntuples(fResult))
1854 return DB_NO_MORE_SUBKEYS;
1855
1856 return DB_SUCCESS;
1857}
1858
1859const char* Pgsql::GetText(int column)
1860{
1861 assert(fPgsql);
1862 assert(fResult);
1863 assert(fNumFields > 0);
1864 assert(column >= 0);
1865 assert(column < fNumFields);
1866
1867 return PQgetvalue(fResult, fRow, column);
1868}
1869
1870double Pgsql::GetDouble(int column)
1871{
1872 return atof(GetText(column));
1873}
1874
1875time_t Pgsql::GetTime(int column)
1876{
1877 return strtoul(GetText(column), NULL, 0);
1878}
1879
1880int Pgsql::Finalize()
1881{
1882 assert(fPgsql);
1883 assert(fResult);
1884
1885 fRow = -1;
1886 fNumFields = 0;
1887
1888 return DB_SUCCESS;
1889}
1890
1891const char* Pgsql::ColumnType(int midas_tid)
1892{
1893 assert(midas_tid>=0);
1894 assert(midas_tid<TID_LAST);
1895 return sql_type_pgsql[midas_tid];
1896}
1897
1898bool Pgsql::TypesCompatible(int midas_tid, const char* sql_type)
1899{
1900 if (/* DISABLES CODE */ (0))
1901 printf("compare types midas \'%s\'=\'%s\' and sql \'%s\'\n", rpc_tid_name(midas_tid), ColumnType(midas_tid), sql_type);
1902
1903 //if (sql2midasType_mysql(sql_type) == midas_tid)
1904 // return true;
1905
1906 if (strcasecmp(ColumnType(midas_tid), sql_type) == 0)
1907 return true;
1908
1909 // permit writing FLOAT into DOUBLE
1910 if (midas_tid==TID_FLOAT && strcmp(sql_type, "double precision")==0)
1911 return true;
1912
1913 // T2K quirk!
1914 // permit writing BYTE into signed tinyint
1915 if (midas_tid==TID_BYTE && strcmp(sql_type, "integer")==0)
1916 return true;
1917
1918 // T2K quirk!
1919 // permit writing WORD into signed tinyint
1920 if (midas_tid==TID_WORD && strcmp(sql_type, "integer")==0)
1921 return true;
1922
1923 if (/* DISABLES CODE */ (0))
1924 printf("type mismatch!\n");
1925
1926 return false;
1927}
1928
1929std::string Pgsql::QuoteId(const char* s)
1930{
1931 std::string q;
1932 q += '"';
1933 q += s;
1934 q += '"';
1935 return q;
1936}
1937
1938std::string Pgsql::QuoteString(const char* s)
1939{
1940 std::string q;
1941 q += '\'';
1942 q += s;
1943 q += '\'';
1944 return q;
1945}
1946
1947#endif // HAVE_PGSQL
1948
1949#ifdef HAVE_SQLITE
1950
1952// SQLITE database access //
1954
1955#include <sqlite3.h>
1956
// Maps a table name to the open SQLite database handle of its file.
1957typedef std::map<std::string, sqlite3*> DbMap;
1958
// SQLite implementation of the SqlBase interface. Each table lives in its
// own database file "<fPath>mh_<table>.sqlite3" (see ConnectTable()).
// NOTE(review): this listing skips source lines 1967 and 1968; the
// declarations of fTempDB and fTempStmt, which the constructor and the
// query functions use, are not visible here.
1959class Sqlite: public SqlBase
1960{
1961public:
// directory holding the database files, with trailing separator (see Connect())
1962 std::string fPath;
1963
// open database handles, one per table, filled on demand by GetTable()
1964 DbMap fMap;
1965
1966 // temporary storage of query data
// (lines 1967-1968 missing) presumably fTempDB and fTempStmt - TODO confirm
1969
1970 Sqlite(); // ctor
1971 ~Sqlite(); // dtor
1972
1973 int Connect(const char* path);
1974 int Disconnect();
1975 bool IsConnected();
1976
1977 int ConnectTable(const char* table_name);
1978 sqlite3* GetTable(const char* table_name);
1979
1980 int ListTables(std::vector<std::string> *plist);
1981 int ListColumns(const char* table_name, std::vector<std::string> *plist);
1982
1983 int Exec(const char* table_name, const char* sql);
1984 int ExecDisconnected(const char* table_name, const char* sql);
1985
1986 int Prepare(const char* table_name, const char* sql);
1987 int Step();
1988 const char* GetText(int column);
1989 time_t GetTime(int column);
1990 double GetDouble(int column);
1991 int Finalize();
1992
1993 int OpenTransaction(const char* table_name);
1994 int CommitTransaction(const char* table_name);
1995 int RollbackTransaction(const char* table_name);
1996
1997 const char* ColumnType(int midas_tid);
1998 bool TypesCompatible(int midas_tid, const char* sql_type);
1999
2000 std::string QuoteId(const char* s);
2001 std::string QuoteString(const char* s);
2002};
2003
2004std::string Sqlite::QuoteId(const char* s)
2005{
2006 std::string q;
2007 q += "\"";
2008 q += s;
2009 q += "\"";
2010 return q;
2011}
2012
2013std::string Sqlite::QuoteString(const char* s)
2014{
2015 std::string q;
2016 q += "\'";
2017 q += s;
2018 q += "\'";
2019 return q;
2020}
2021
2022const char* Sqlite::ColumnType(int midas_tid)
2023{
2024 assert(midas_tid>=0);
2025 assert(midas_tid<TID_LAST);
2026 return sql_type_sqlite[midas_tid];
2027}
2028
2029bool Sqlite::TypesCompatible(int midas_tid, const char* sql_type)
2030{
2031 if (0)
2032 printf("compare types midas \'%s\'=\'%s\' and sql \'%s\'\n", rpc_tid_name(midas_tid), ColumnType(midas_tid), sql_type);
2033
2034 //if (sql2midasType_sqlite(sql_type) == midas_tid)
2035 // return true;
2036
2037 if (strcasecmp(ColumnType(midas_tid), sql_type) == 0)
2038 return true;
2039
2040 // permit writing FLOAT into DOUBLE
2041 if (midas_tid==TID_FLOAT && strcasecmp(sql_type, "double")==0)
2042 return true;
2043
2044 return false;
2045}
2046
2047const char* Sqlite::GetText(int column)
2048{
2049 return (const char*)sqlite3_column_text(fTempStmt, column);
2050}
2051
// Returns the given column of the current row as a time_t.
// NOTE(review): this listing skips source line 2054, the only statement of
// the body - restore it from the original file.
2052time_t Sqlite::GetTime(int column)
2053{
2055}
2056
// Returns the given column of the current row as a double.
// NOTE(review): this listing skips source line 2059, the only statement of
// the body - restore it from the original file.
2057double Sqlite::GetDouble(int column)
2058{
2060}
2061
2062Sqlite::Sqlite() // ctor
2063{
2064 fIsConnected = false;
2065 fTempDB = NULL;
2066 fTempStmt = NULL;
2067 fDebug = 0;
2068}
2069
2070Sqlite::~Sqlite() // dtor
2071{
2072 Disconnect();
2073}
2074
2075const char* xsqlite3_errstr(sqlite3* db, int errcode)
2076{
2077 //return sqlite3_errstr(errcode);
2078 return sqlite3_errmsg(db);
2079}
2080
// Opens the database file "<fPath>mh_<table_name>.sqlite3", stores the
// handle in fMap under table_name and applies the journal/synchronous
// PRAGMAs. Returns DB_FILE_ERROR when sqlite3_open() fails, DB_SUCCESS
// otherwise.
// NOTE(review): this listing skips source lines 2091, 2097 and 2121 -
// restore them from the original file.
2081int Sqlite::ConnectTable(const char* table_name)
2082{
2083 std::string fname = fPath + "mh_" + table_name + ".sqlite3";
2084
2085 sqlite3* db = NULL;
2086
2087 int status = sqlite3_open(fname.c_str(), &db);
2088
2089 if (status != SQLITE_OK) {
2090 cm_msg(MERROR, "Sqlite::Connect", "Table %s: sqlite3_open(%s) error %d (%s)", table_name, fname.c_str(), status, xsqlite3_errstr(db, status));
// (line 2091 missing) presumably closes the partially opened handle - TODO confirm
2092 db = NULL;
2093 return DB_FILE_ERROR;
2094 }
2095
2096#if SQLITE_VERSION_NUMBER >= 3006020
// (line 2097 missing) the message below indicates a call to sqlite3_extended_result_codes() - TODO confirm
2098 if (status != SQLITE_OK) {
2099 cm_msg(MERROR, "Sqlite::Connect", "Table %s: sqlite3_extended_result_codes(1) error %d (%s)", table_name, status, xsqlite3_errstr(db, status));
2100 }
2101#else
2102#warning Missing sqlite3_extended_result_codes()!
2103#endif
2104
// register the handle first: the Exec() calls below look it up through GetTable()
2105 fMap[table_name] = db;
2106
2107 Exec(table_name, "PRAGMA journal_mode=persist;");
2108 Exec(table_name, "PRAGMA synchronous=normal;");
2109 //Exec(table_name, "PRAGMA synchronous=off;");
2110 Exec(table_name, "PRAGMA journal_size_limit=-1;");
2111
// disabled diagnostics: query the current PRAGMA settings
2112 if (0) {
2113 Exec(table_name, "PRAGMA legacy_file_format;");
2114 Exec(table_name, "PRAGMA synchronous;");
2115 Exec(table_name, "PRAGMA journal_mode;");
2116 Exec(table_name, "PRAGMA journal_size_limit;");
2117 }
2118
2119#ifdef SQLITE_LIMIT_COLUMN
2120 if (0) {
// (line 2121 missing) presumably reads the column limit into max_columns - TODO confirm
2122 printf("Sqlite::Connect: SQLITE_LIMIT_COLUMN=%d\n", max_columns);
2123 }
2124#endif
2125
2126 if (fDebug)
2127 cm_msg(MINFO, "Sqlite::Connect", "Table %s: connected to Sqlite file \'%s\'", table_name, fname.c_str());
2128
2129 return DB_SUCCESS;
2130}
2131
2132sqlite3* Sqlite::GetTable(const char* table_name)
2133{
2134 sqlite3* db = fMap[table_name];
2135
2136 if (db)
2137 return db;
2138
2139 int status = ConnectTable(table_name);
2140 if (status != DB_SUCCESS)
2141 return NULL;
2142
2143 return fMap[table_name];
2144}
2145
2146int Sqlite::Connect(const char* path)
2147{
2148 if (fIsConnected)
2149 Disconnect();
2150
2151 fPath = path;
2152
2153 // add trailing '/'
2154 if (fPath.length() > 0) {
2155 if (fPath[fPath.length()-1] != DIR_SEPARATOR)
2156 fPath += DIR_SEPARATOR_STR;
2157 }
2158
2159 if (fDebug)
2160 cm_msg(MINFO, "Sqlite::Connect", "Connected to Sqlite database in \'%s\'", fPath.c_str());
2161
2162 fIsConnected = true;
2163
2164 return DB_SUCCESS;
2165}
2166
2167int Sqlite::Disconnect()
2168{
2169 if (!fIsConnected)
2170 return DB_SUCCESS;
2171
2172 for (DbMap::iterator iter = fMap.begin(); iter != fMap.end(); ++iter) {
2173 const char* table_name = iter->first.c_str();
2174 sqlite3* db = iter->second;
2175 int status = sqlite3_close(db);
2176 if (status != SQLITE_OK) {
2177 cm_msg(MERROR, "Sqlite::Disconnect", "sqlite3_close(%s) error %d (%s)", table_name, status, xsqlite3_errstr(db, status));
2178 }
2179 }
2180
2181 fMap.clear();
2182
2183 fIsConnected = false;
2184
2185 return DB_SUCCESS;
2186}
2187
2188bool Sqlite::IsConnected()
2189{
2190 return fIsConnected;
2191}
2192
2193int Sqlite::OpenTransaction(const char* table_name)
2194{
2195 int status = Exec(table_name, "BEGIN TRANSACTION");
2196 return status;
2197}
2198
2199int Sqlite::CommitTransaction(const char* table_name)
2200{
2201 int status = Exec(table_name, "COMMIT TRANSACTION");
2202 return status;
2203}
2204
2205int Sqlite::RollbackTransaction(const char* table_name)
2206{
2207 int status = Exec(table_name, "ROLLBACK TRANSACTION");
2208 return status;
2209}
2210
// Prepares a query on the database of table_name; rows are then read with
// Step()/GetText() and the query is released with Finalize().
// Returns DB_FILE_ERROR when the table's database cannot be opened or the
// statement cannot be prepared, DB_SUCCESS otherwise.
// NOTE(review): this listing skips source lines 2224 and 2227; the calls
// that prepare the statement and declare "status" are not visible here.
2211int Sqlite::Prepare(const char* table_name, const char* sql)
2212{
2213 sqlite3* db = GetTable(table_name);
2214 if (!db)
2215 return DB_FILE_ERROR;
2216
2217 if (fDebug)
2218 printf("Sqlite::Prepare(%s, %s)\n", table_name, sql);
2219
// only one query may be in progress at a time
2220 assert(fTempDB==NULL);
2221 fTempDB = db;
2222
2223#if SQLITE_VERSION_NUMBER >= 3006020
// (line 2224 missing) the message below indicates a call to sqlite3_prepare_v2() - TODO confirm
2225#else
2226#warning Missing sqlite3_prepare_v2()!
// (line 2227 missing) presumably the fallback prepare call - TODO confirm
2228#endif
2229
2230 if (status == SQLITE_OK)
2231 return DB_SUCCESS;
2232
// log only the first 60 characters of the statement
2233 std::string sqlstring = sql;
2234 cm_msg(MERROR, "Sqlite::Prepare", "Table %s: sqlite3_prepare_v2(%s...) error %d (%s)", table_name, sqlstring.substr(0,60).c_str(), status, xsqlite3_errstr(db, status));
2235
2236 fTempDB = NULL;
2237
2238 return DB_FILE_ERROR;
2239}
2240
// Advances the prepared query to its next row.
// Returns DB_SUCCESS for SQLITE_ROW, DB_NO_MORE_SUBKEYS for SQLITE_DONE and
// DB_FILE_ERROR (after logging) for any other status.
// NOTE(review): this listing skips source line 2249; the call that produces
// "status" is not visible (the error text names sqlite3_step()).
2241int Sqlite::Step()
2242{
2243 if (0 && fDebug)
2244 printf("Sqlite::Step()\n");
2245
2246 assert(fTempDB);
2247 assert(fTempStmt);
2248
// (line 2249 missing) presumably steps fTempStmt and stores the result in status - TODO confirm
2250
2251 if (status == SQLITE_DONE)
2252 return DB_NO_MORE_SUBKEYS;
2253
2254 if (status == SQLITE_ROW)
2255 return DB_SUCCESS;
2256
2257 cm_msg(MERROR, "Sqlite::Step", "sqlite3_step() error %d (%s)", status, xsqlite3_errstr(fTempDB, status));
2258
2259 return DB_FILE_ERROR;
2260}
2261
// Ends the query started by Prepare() and clears fTempDB/fTempStmt on both
// the success and the error path. Returns DB_FILE_ERROR (after logging)
// when the status is not SQLITE_OK, DB_SUCCESS otherwise.
// NOTE(review): this listing skips source line 2270; the call that produces
// "status" is not visible (the error text names sqlite3_finalize()).
2262int Sqlite::Finalize()
2263{
2264 if (0 && fDebug)
2265 printf("Sqlite::Finalize()\n");
2266
2267 assert(fTempDB);
2268 assert(fTempStmt);
2269
// (line 2270 missing) presumably finalizes fTempStmt and stores the result in status - TODO confirm
2271
2272 if (status != SQLITE_OK) {
2273 cm_msg(MERROR, "Sqlite::Finalize", "sqlite3_finalize() error %d (%s)", status, xsqlite3_errstr(fTempDB, status));
2274
2275 fTempDB = NULL;
2276 fTempStmt = NULL; // FIXME: maybe a memory leak?
2277 return DB_FILE_ERROR;
2278 }
2279
2280 fTempDB = NULL;
2281 fTempStmt = NULL;
2282
2283 return DB_SUCCESS;
2284}
2285
2286int Sqlite::ListTables(std::vector<std::string> *plist)
2287{
2288 if (!fIsConnected)
2289 return DB_FILE_ERROR;
2290
2291 if (fDebug)
2292 printf("Sqlite::ListTables at path [%s]\n", fPath.c_str());
2293
2294 int status;
2295
2296 const char* cmd = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;";
2297
2298 DIR *dir = opendir(fPath.c_str());
2299 if (!dir) {
2300 cm_msg(MERROR, "Sqlite::ListTables", "Cannot opendir(%s), errno %d (%s)", fPath.c_str(), errno, strerror(errno));
2301 return HS_FILE_ERROR;
2302 }
2303
2304 while (1) {
2305 const struct dirent* de = readdir(dir);
2306 if (!de)
2307 break;
2308
2309 const char* dn = de->d_name;
2310
2311 //if (dn[0]!='m' || dn[1]!='h')
2312 //continue;
2313
2314 const char* s;
2315
2316 s = strstr(dn, "mh_");
2317 if (!s || s!=dn)
2318 continue;
2319
2320 s = strstr(dn, ".sqlite3");
2321 if (!s || s[8]!=0)
2322 continue;
2323
2324 char table_name[256];
2325 mstrlcpy(table_name, dn+3, sizeof(table_name));
2326 // FIXME: skip names like "xxx.sqlite3~" and "xxx.sqlite3-deleted"
2327 char* ss = strstr(table_name, ".sqlite3");
2328 if (!ss)
2329 continue;
2330 *ss = 0;
2331
2332 //printf("dn [%s] tn [%s]\n", dn, table_name);
2333
2334 status = Prepare(table_name, cmd);
2335 if (status != DB_SUCCESS)
2336 continue;
2337
2338 while (1) {
2339 status = Step();
2340 if (status != DB_SUCCESS)
2341 break;
2342
2343 const char* tn = GetText(0);
2344 //printf("table [%s]\n", tn);
2345 plist->push_back(tn);
2346 }
2347
2348 status = Finalize();
2349 }
2350
2351 closedir(dir);
2352 dir = NULL;
2353
2354 return DB_SUCCESS;
2355}
2356
2357int Sqlite::ListColumns(const char* table, std::vector<std::string> *plist)
2358{
2359 if (!fIsConnected)
2360 return DB_FILE_ERROR;
2361
2362 if (fDebug)
2363 printf("Sqlite::ListColumns for table \'%s\'\n", table);
2364
2365 std::string cmd;
2366 cmd = "PRAGMA table_info(";
2367 cmd += table;
2368 cmd += ");";
2369
2370 int status;
2371
2372 status = Prepare(table, cmd.c_str());
2373 if (status != DB_SUCCESS)
2374 return status;
2375
2376 while (1) {
2377 status = Step();
2378 if (status != DB_SUCCESS)
2379 break;
2380
2381 const char* colname = GetText(1);
2382 const char* coltype = GetText(2);
2383 //printf("column [%s] [%s]\n", colname, coltype);
2384 plist->push_back(colname); // column name
2385 plist->push_back(coltype); // column type
2386 }
2387
2388 status = Finalize();
2389
2390 return DB_SUCCESS;
2391}
2392
// When non-zero, callback() prints every row it receives.
static int callback_debug = 0;

// Row callback for sqlite3_exec(): optionally prints the row, one
// "name = value" line per column (NULL values are printed as "NULL").
// Always returns 0.
static int callback(void *NotUsed, int argc, char **argv, char **azColName){
   (void)NotUsed;

   if (!callback_debug)
      return 0;

   printf("history_sqlite::callback---->\n");
   for (int col = 0; col < argc; col++) {
      const char* value = argv[col] ? argv[col] : "NULL";
      printf("history_sqlite::callback[%d] %s = %s\n", col, azColName[col], value);
   }

   return 0;
}
2404
// Executes an SQL statement on the database of table_name.
// "duplicate column name" and "already exists" errors are mapped to
// DB_KEY_EXIST without logging; other errors are logged and return
// DB_FILE_ERROR.
// NOTE(review): this listing skips source lines 2427 and 2435; the call
// that produces "status"/"errmsg" and the statement after the cm_msg() in
// the error branch are not visible here.
2405int Sqlite::Exec(const char* table_name, const char* sql)
2406{
2407 // return values:
2408 // DB_SUCCESS
2409 // DB_FILE_ERROR: not connected
2410 // DB_KEY_EXIST: "table already exists"
2411
2412 if (!fIsConnected)
2413 return DB_FILE_ERROR;
2414
2415 sqlite3* db = GetTable(table_name);
2416 if (!db)
2417 return DB_FILE_ERROR;
2418
2419 if (fDebug)
2420 printf("Sqlite::Exec(%s, %s)\n", table_name, sql);
2421
2422 int status;
2423
// let the row callback print rows when debugging is enabled
2424 callback_debug = fDebug;
2425 char* errmsg = NULL;
2426
// (line 2427 missing) the message below indicates a call to sqlite3_exec() - TODO confirm
2428 if (status != SQLITE_OK) {
2429 if (status == SQLITE_ERROR && strstr(errmsg, "duplicate column name"))
2430 return DB_KEY_EXIST;
2431 if (status == SQLITE_ERROR && strstr(errmsg, "already exists"))
2432 return DB_KEY_EXIST;
// log only the first 60 characters of the statement
2433 std::string sqlstring = sql;
2434 cm_msg(MERROR, "Sqlite::Exec", "Table %s: sqlite3_exec(%s...) error %d (%s)", table_name, sqlstring.substr(0,60).c_str(), status, errmsg);
// (line 2435 missing) presumably releases errmsg - TODO confirm; the two DB_KEY_EXIST returns above do not pass through it
2436 return DB_FILE_ERROR;
2437 }
2438
2439 return DB_SUCCESS;
2440}
2441
2442int Sqlite::ExecDisconnected(const char* table_name, const char* sql)
2443{
2444 cm_msg(MERROR, "Sqlite::Exec", "sqlite driver does not support disconnected operations");
2445 return DB_FILE_ERROR;
2446}
2447
2448#endif // HAVE_SQLITE
2449
2451// Methods of HsFileSchema //
2453
// Append one record to this schema's history file. A record is a 4-byte timestamp
// followed by (fRecordSize - 4) bytes of event data.
//  t         - record timestamp; only its first 4 bytes are copied into the record
//  data      - event payload
//  data_size - payload size in bytes, compared against the size seen on earlier calls
// Returns HS_SUCCESS, or HS_FILE_ERROR after reporting the problem through cm_msg().
2454int HsFileSchema::write_event(const time_t t, const char* data, const int data_size)
2455{
2456 HsFileSchema* s = this;
2457
2458 assert(s->fVariables.size() == s->fOffsets.size());
2459
2460 int status;
2461
// file is not open for writing yet: open it and work out where the whole records end
2462 if (s->fWriterFd < 0) {
2463 s->fWriterFd = open(s->fFileName.c_str(), O_RDWR);
2464 if (s->fWriterFd < 0) {
2465 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', open() errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2466 return HS_FILE_ERROR;
2467 }
2468
// NOTE(review): the lseek() result is stored in an int and is not checked for -1
2469 int file_size = lseek(s->fWriterFd, 0, SEEK_END);
2470
// nrec = number of whole records after the header, data_end = offset just past the last whole record
2471 int nrec = (file_size - s->fDataOffset)/s->fRecordSize;
2472 if (nrec < 0)
2473 nrec = 0;
2474 int data_end = s->fDataOffset + nrec*s->fRecordSize;
2475
2476 //printf("file_size %d, nrec %d, data_end %d\n", file_size, nrec, data_end);
2477
// the file does not end on a record boundary (partial record, or header only)
2478 if (data_end != file_size) {
2479 if (nrec > 0)
2480 cm_msg(MERROR, "FileHistory::write_event", "File \'%s\' may be truncated, data offset %d, record size %d, file size: %d, should be %d, truncating the file", s->fFileName.c_str(), s->fDataOffset, s->fRecordSize, file_size, data_end);
2481
// NOTE(review): listing line 2482 is not visible; `status` is tested below without a visible
// assignment. The error message suggests an lseek() to data_end - confirm against the original file.
2483 if (status < 0) {
2484 cm_msg(MERROR, "FileHistory::write_event", "Cannot seek \'%s\' to offset %d, lseek() errno %d (%s)", s->fFileName.c_str(), data_end, errno, strerror(errno));
2485 return HS_FILE_ERROR;
2486 }
// NOTE(review): listing line 2487 is not visible; the error message suggests an ftruncate() to data_end - confirm.
2488 if (status < 0) {
2489 cm_msg(MERROR, "FileHistory::write_event", "Cannot truncate \'%s\' to size %d, ftruncate() errno %d (%s)", s->fFileName.c_str(), data_end, errno, strerror(errno));
2490 return HS_FILE_ERROR;
2491 }
2492 }
2493 }
2494
// payload size implied by the record size (record = 4-byte timestamp + payload)
2495 int expected_size = s->fRecordSize - 4;
2496
2497 // sanity check: record_size and n_bytes are computed from the byte counts in the file header
2498 assert(expected_size == s->fNumBytes);
2499
// NOTE(review): listing line 2501 (the body of this `if`) is not visible; as shown, the `if`
// would bind to the following statement. Presumably fLastSize is initialized here - confirm.
2500 if (s->fLastSize == 0)
2502
// size differs from the previous call: report it, reject short data, remember the new size
2503 if (data_size != s->fLastSize) {
2504 cm_msg(MERROR, "FileHistory::write_event", "Event \'%s\' data size mismatch, expected %d bytes, got %d bytes, previously %d bytes", s->fEventName.c_str(), expected_size, data_size, s->fLastSize);
2505 //printf("schema:\n");
2506 //s->print();
2507
2508 if (data_size < expected_size)
2509 return HS_FILE_ERROR;
2510
2511 // truncate for now
2512 // data_size = expected_size;
2513 s->fLastSize = data_size;
2514 }
2515
2516 int size = 4 + expected_size;
2517
// (re)size the staging buffer; an allocation failure trips the assert
2518 if (size != s->fRecordBufferSize) {
2519 s->fRecordBuffer = (char*)realloc(s->fRecordBuffer, size);
2520 assert(s->fRecordBuffer != NULL);
2521 s->fRecordBufferSize = size;
2522 }
2523
// timestamp goes first: the first 4 bytes of the time_t object are copied as they sit in memory
2524 memcpy(s->fRecordBuffer, &t, 4);
// NOTE(review): listing line 2525 is not visible; presumably the payload is copied behind the timestamp here - confirm.
2526
// write the whole record with one write() call; a short write is treated as an error
2527 status = write(s->fWriterFd, s->fRecordBuffer, size);
2528 if (status != size) {
2529 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', write(%d) returned %d, errno %d (%s)", s->fFileName.c_str(), size, status, errno, strerror(errno));
2530 return HS_FILE_ERROR;
2531 }
2532
// disabled variant that wrote the timestamp and the data with two separate write() calls
2533#if 0
2534 status = write(s->fWriterFd, &t, 4);
2535 if (status != 4) {
2536 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', write(timestamp) errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2537 return HS_FILE_ERROR;
2538 }
2539
2541 if (status != expected_size) {
2542 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', write(%d) errno %d (%s)", s->fFileName.c_str(), data_size, errno, strerror(errno));
2543 return HS_FILE_ERROR;
2544 }
2545#endif
2546
2547 return HS_SUCCESS;
2548}
2549
// NOTE(review): the signature line of this function (listing line 2550) is not visible here.
// Visible behavior: if a writer file descriptor is set, it is marked as not open (-1);
// the function always returns HS_SUCCESS.
2551{
2552 if (fWriterFd >= 0) {
// NOTE(review): listing line 2553 is not visible; presumably the descriptor is closed here before it is invalidated - confirm.
2554 fWriterFd = -1;
2555 }
2556 return HS_SUCCESS;
2557}
2558
2559static int ReadRecord(const char* file_name, int fd, int offset, int recsize, int irec, char* rec)
2560{
2561 int status;
2562 int fpos = offset + irec*recsize;
2563
2564 status = ::lseek(fd, fpos, SEEK_SET);
2565 if (status == -1) {
2566 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', lseek(%d) errno %d (%s)", file_name, fpos, errno, strerror(errno));
2567 return -1;
2568 }
2569
2570 status = ::read(fd, rec, recsize);
2571 if (status == 0) {
2572 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', unexpected end of file on read()", file_name);
2573 return -1;
2574 }
2575 if (status == -1) {
2576 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', read() errno %d (%s)", file_name, errno, strerror(errno));
2577 return -1;
2578 }
2579 if (status != recsize) {
2580 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', short read() returned %d instead of %d bytes", file_name, status, recsize);
2581 return -1;
2582 }
2583 return HS_SUCCESS;
2584}
2585
// Locate `timestamp` among the nrec time-ordered records of a history file.
// Each record starts with a 4-byte timestamp, read here through a DWORD pointer.
// Returns HS_SUCCESS, or HS_FILE_ERROR when a record could not be read. Requires nrec > 0 (asserted).
2586static int FindTime(const char* file_name, int fd, int offset, int recsize, int nrec, time_t timestamp, int* i1p, time_t* t1p, int* i2p, time_t* t2p, time_t* tstart, time_t* tend, int debug)
2587{
2588 //
2589 // purpose: find the location of timestamp inside the given file.
2590 // uses binary search
2591 // returns:
2592 // tstart, tend - time of first and last data in a file
2593 // i1p,t1p - data just before timestamp, used as "last_written"
2594 // i2p,t2p - data at timestamp or after timestamp, used as starting point to read data from file
2595 // assertions:
2596 // tstart <= t1p < t2p <= tend
2597 // i1p+1==i2p
2598 // t1p < timestamp <= t2p
2599 //
2600 // special cases:
2601 // 1) timestamp <= tstart - all data is in the future, return i1p==-1, t1p==0, i2p==0, t2p==tstart (tend is returned as 0, the last record is not read)
2602 // 2) tend < timestamp - all the data is in the past, return i1p = nrec-1, t1p = tend, i2p = nrec, t2p = 0;
2603 // 3) nrec == 1 only one record in this file and it is older than the timestamp (tstart == tend < timestamp)
2604 //
2605
2606 int status;
2607 char* buf = new char[recsize];
2608
2609 assert(nrec > 0);
2610
2611 int rec1 = 0;
2612 int rec2 = nrec-1;
2613
// NOTE(review): listing line 2614 is not visible; `status` is tested below without a visible
// assignment. Presumably the first record (rec1) is read into buf here - confirm.
2615 if (status != HS_SUCCESS) {
2616 delete[] buf;
2617 return HS_FILE_ERROR;
2618 }
2619
// the record timestamp is the leading 4 bytes of the record
2620 time_t t1 = *(DWORD*)buf;
2621
2622 *tstart = t1;
2623
2624 // timestamp is older than any data in this file
2625 if (timestamp <= t1) {
2626 *i1p = -1;
2627 *t1p = 0;
2628 *i2p = 0;
2629 *t2p = t1;
2630 *tend = 0;
2631 delete[] buf;
2632 return HS_SUCCESS;
2633 }
2634
2635 assert(t1 < timestamp);
2636
// single record, already known to be older than timestamp (special case 3)
2637 if (nrec == 1) {
2638 *i1p = 0;
2639 *t1p = t1;
2640 *i2p = nrec; // == 1
2641 *t2p = 0;
2642 *tend = t1;
2643 delete[] buf;
2644 return HS_SUCCESS;
2645 }
2646
// NOTE(review): listing line 2647 is not visible; presumably the last record (rec2) is read into buf here - confirm.
2648 if (status != HS_SUCCESS) {
2649 delete[] buf;
2650 return HS_FILE_ERROR;
2651 }
2652
2653 time_t t2 = *(DWORD*)buf;
2654
2655 *tend = t2;
2656
2657 // all the data is in the past
2658 if (t2 < timestamp) {
2659 *i1p = rec2;
2660 *t1p = t2;
2661 *i2p = nrec;
2662 *t2p = 0;
2663 delete[] buf;
2664 return HS_SUCCESS;
2665 }
2666
2667 assert(t1 < timestamp);
2668 assert(timestamp <= t2);
2669
2670 if (debug)
2671 printf("FindTime: rec %d..(x)..%d, time %s..(%s)..%s\n", rec1, rec2, TimeToString(t1).c_str(), TimeToString(timestamp).c_str(), TimeToString(t2).c_str());
2672
2673 // implement binary search
2674
// loop invariant: t1 < timestamp <= t2, with t1/t2 being the timestamps of records rec1/rec2
2675 do {
2676 int rec = (rec1+rec2)/2;
2677
2678 assert(rec >= 0);
2679 assert(rec < nrec);
2680
// NOTE(review): listing line 2681 is not visible; presumably record `rec` is read into buf here - confirm.
2682 if (status != HS_SUCCESS) {
2683 delete[] buf;
2684 return HS_FILE_ERROR;
2685 }
2686
2687 time_t t = *(DWORD*)buf;
2688
// middle record is at or after timestamp: it becomes the new upper bound, otherwise the new lower bound
2689 if (timestamp <= t) {
2690 if (debug)
2691 printf("FindTime: rec %d..(x)..%d..%d, time %s..(%s)..%s..%s\n", rec1, rec, rec2, TimeToString(t1).c_str(), TimeToString(timestamp).c_str(), TimeToString(t).c_str(), TimeToString(t2).c_str());
2692
2693 rec2 = rec;
2694 t2 = t;
2695 } else {
2696 if (debug)
2697 printf("FindTime: rec %d..%d..(x)..%d, time %s..%s..(%s)..%s\n", rec1, rec, rec2, TimeToString(t1).c_str(), TimeToString(t).c_str(), TimeToString(timestamp).c_str(), TimeToString(t2).c_str());
2698
2699 rec1 = rec;
2700 t1 = t;
2701 }
2702 } while (rec2 - rec1 > 1);
2703
2704 assert(rec1+1 == rec2);
2705 assert(t1 < timestamp);
2706 assert(timestamp <= t2);
2707
2708 if (debug)
2709 printf("FindTime: rec %d..(x)..%d, time %s..(%s)..%s, this is the result.\n", rec1, rec2, TimeToString(t1).c_str(), TimeToString(timestamp).c_str(), TimeToString(t2).c_str());
2710
2711 *i1p = rec1;
2712 *t1p = t1;
2713
2714 *i2p = rec2;
2715 *t2p = t2;
2716
2717 delete[] buf;
2718 return HS_SUCCESS;
2719}
2720
// NOTE(review): the first line of this function's signature (listing line 2721) is not visible here;
// the body uses a `timestamp` value that presumably is its first parameter.
// Visible behavior: report through *last_written the timestamp of the newest record in this
// schema's file that is older than `timestamp` (0 when the file holds no complete record).
// Returns HS_SUCCESS, or HS_FILE_ERROR when the file cannot be opened or read.
2722 const int debug,
2723 time_t* last_written)
2724{
2725 int status;
2726 HsFileSchema* s = this;
2727
2728 if (debug)
2729 printf("FileHistory::read_last_written: file %s, schema time %s..%s, timestamp %s\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(timestamp).c_str());
2730
2731 int fd = open(s->fFileName.c_str(), O_RDONLY);
2732 if (fd < 0) {
2733 cm_msg(MERROR, "FileHistory::read_last_written", "Cannot read \'%s\', open() errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2734 return HS_FILE_ERROR;
2735 }
2736
// NOTE(review): the lseek() result is stored in an int and not checked for -1
// (read_data() keeps it in off_t and checks it)
2737 int file_size = ::lseek(fd, 0, SEEK_END);
2738
2739 int nrec = (file_size - s->fDataOffset)/s->fRecordSize;
2740 if (nrec < 0)
2741 nrec = 0;
2742
// no complete record in the file: nothing was written
2743 if (nrec < 1) {
2744 ::close(fd);
2745 if (last_written)
2746 *last_written = 0;
2747 return HS_SUCCESS;
2748 }
2749
2750 time_t lw = 0;
2751
2752 // read last record to check if desired time is inside or outside of the file
2753
2754 if (1) {
2755 char* buf = new char[s->fRecordSize];
2756
2757 status = ReadRecord(s->fFileName.c_str(), fd, s->fDataOffset, s->fRecordSize, nrec - 1, buf);
2758 if (status != HS_SUCCESS) {
2759 delete[] buf;
2760 ::close(fd);
2761 return HS_FILE_ERROR;
2762 }
2763
// timestamp of the last record (leading 4 bytes of the record)
2764 lw = *(DWORD*)buf;
2765
2766 delete[] buf;
2767 }
2768
// last record is not older than timestamp: search the file for the record just before timestamp
2769 if (lw >= timestamp) {
2770 int irec = 0;
2771 time_t trec = 0;
2772 int iunused = 0;
2773 time_t tunused = 0;
2774 time_t tstart = 0; // not used
2775 time_t tend = 0; // not used
2776
// irec/trec receive FindTime's i1p/t1p outputs (the record before timestamp); debug output is forced off by 0*debug
2777 status = FindTime(s->fFileName.c_str(), fd, s->fDataOffset, s->fRecordSize, nrec, timestamp, &irec, &trec, &iunused, &tunused, &tstart, &tend, 0*debug);
2778 if (status != HS_SUCCESS) {
2779 ::close(fd);
2780 return HS_FILE_ERROR;
2781 }
2782
2783 assert(trec < timestamp);
2784
2785 lw = trec;
2786 }
2787
2788 if (last_written)
2789 *last_written = lw;
2790
2791 if (debug)
2792 printf("FileHistory::read_last_written: file %s, schema time %s..%s, timestamp %s, last_written %s\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(timestamp).c_str(), TimeToString(lw).c_str());
2793
2794 assert(lw < timestamp);
2795
2796 ::close(fd);
2797
2798 return HS_SUCCESS;
2799}
2800
// Read the records of this schema's file that fall between start_time and end_time and feed the
// requested variables, converted to double, into the per-variable buffers.
//  start_time, end_time - time range to read; reading stops at the first record with t > end_time
//  num_var              - number of requested variables
//  var_schema_index[i]  - index of variable i in this schema (fVariables/fOffsets); negative entries are skipped
//  var_index[i]         - array element of variable i to extract
//  debug                - non-zero prints progress, > 1 also prints every row
//  last_time[i]         - time of the last value delivered for variable i; checked and updated here
// Returns HS_SUCCESS, or HS_FILE_ERROR when the file cannot be opened or positioned or when its
// timestamps are inconsistent. NOTE: a read() failure inside the record loop is reported through
// cm_msg() but the function still returns HS_SUCCESS with the rows read so far.
2801int HsFileSchema::read_data(const time_t start_time,
2802 const time_t end_time,
2803 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
2804 const int debug,
2805 std::vector<time_t>& last_time,
// NOTE(review): listing line 2806 (the final parameter) is not visible here; the body calls
// buffer[i]->Add(t, v), so it presumably declares an array of per-variable buffer pointers named `buffer`.
2807{
2808 HsFileSchema* s = this;
2809
2810 if (debug)
2811 printf("FileHistory::read_data: file %s, schema time %s..%s, read time %s..%s, %d vars\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), num_var);
2812
2813 //if (1) {
2814 // printf("Last time: ");
2815 // for (int i=0; i<num_var; i++) {
2816 // printf(" %s", TimeToString(last_time[i]).c_str());
2817 // }
2818 // printf("\n");
2819 //}
2820
2821 if (debug) {
2822 printf("FileHistory::read_data: file %s map", s->fFileName.c_str());
2823 for (size_t i=0; i<var_schema_index.size(); i++) {
2824 printf(" %2d", var_schema_index[i]);
2825 }
2826 printf("\n");
2827 }
2828
2829 int fd = ::open(s->fFileName.c_str(), O_RDONLY);
2830 if (fd < 0) {
2831 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', open() errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2832 return HS_FILE_ERROR;
2833 }
2834
2835 off_t file_size = ::lseek(fd, 0, SEEK_END);
2836
2837 if (file_size == (off_t)-1) {
2838 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', fseek(SEEK_END) errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2839 ::close(fd);
2840 return HS_FILE_ERROR;
2841 }
2842
// number of whole records after the file header
2843 int nrec = (file_size - s->fDataOffset)/s->fRecordSize;
2844 if (nrec < 0)
2845 nrec = 0;
2846
// no data in the file is not an error
2847 if (nrec < 1) {
2848 ::close(fd);
2849 return HS_SUCCESS;
2850 }
2851
2852 int iunused = 0;
2853 time_t tunused = 0;
2854 int irec = 0;
2855 time_t trec = 0;
2856 time_t tstart = 0;
2857 time_t tend = 0;
2858
// irec/trec receive FindTime's i2p/t2p outputs: the first record at or after start_time
2859 int status = FindTime(s->fFileName.c_str(), fd, s->fDataOffset, s->fRecordSize, nrec, start_time, &iunused, &tunused, &irec, &trec, &tstart, &tend, 0*debug);
2860
2861 if (status != HS_SUCCESS) {
2862 ::close(fd);
2863 return HS_FILE_ERROR;
2864 }
2865
2866 if (debug) {
2867 printf("FindTime %d, nrec %d, (%d, %s) (%d, %s), tstart %s, tend %s, want %s\n", status, nrec, iunused, TimeToString(tunused).c_str(), irec, TimeToString(trec).c_str(), TimeToString(tstart).c_str(), TimeToString(tend).c_str(), TimeToString(start_time).c_str());
2868 }
2869
2870 if (irec < 0 || irec >= nrec) {
2871 // all data in this file is older than start_time
2872
2873 ::close(fd);
2874
2875 if (debug)
2876 printf("FileHistory::read: file %s, schema time %s..%s, read time %s..%s, file time %s..%s, data in this file is too old\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), TimeToString(tstart).c_str(), TimeToString(tend).c_str());
2877
2878 return HS_SUCCESS;
2879 }
2880
// NOTE(review): listing line 2881 (the condition opening this block) is not visible; judging by the
// comment and the error message it compares tstart against s->fTimeFrom - confirm.
2882 // data starts before time declared in schema
2883
2884 ::close(fd);
2885
2886 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': timestamp of first data %s is before schema start time %s", s->fFileName.c_str(), TimeToString(tstart).c_str(), TimeToString(s->fTimeFrom).c_str());
2887
2888 return HS_FILE_ERROR;
2889 }
2890
2891 if (tend && s->fTimeTo && tend > s->fTimeTo) {
2892 // data ends after time declared in schema (overlaps with next file)
2893
2894 ::close(fd);
2895
2896 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': timestamp of last data %s is after schema end time %s", s->fFileName.c_str(), TimeToString(tend).c_str(), TimeToString(s->fTimeTo).c_str());
2897
2898 return HS_FILE_ERROR;
2899 }
2900
// the first record to be read must not be older than what was already delivered for any requested variable
2901 for (int i=0; i<num_var; i++) {
2902 int si = var_schema_index[i];
2903 if (si < 0)
2904 continue;
2905
2906 if (trec < last_time[i]) { // protect against duplicate and non-monotonous data
2907 ::close(fd);
2908
2909 cm_msg(MERROR, "FileHistory::read_data", "Internal history error at file \'%s\': variable %d data timestamp %s is before last timestamp %s", s->fFileName.c_str(), i, TimeToString(trec).c_str(), TimeToString(last_time[i]).c_str());
2910
2911 return HS_FILE_ERROR;
2912 }
2913 }
2914
2915 int count = 0;
2916
// NOTE(review): listing line 2917 is not visible; `fpos` is used below without a visible declaration,
// presumably it is the file offset of record irec - confirm.
2918
2919 off_t xpos = ::lseek(fd, fpos, SEEK_SET);
2920 if (xpos == (off_t)-1) {
2921 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', lseek(%zu) errno %d (%s)", s->fFileName.c_str(), (size_t)fpos, errno, strerror(errno));
2922 ::close(fd);
2923 return HS_FILE_ERROR;
2924 }
2925
2926 char* buf = new char[s->fRecordSize];
2927
// prec counts the records read so far, starting from index irec
2928 int prec = irec;
2929
// read records sequentially until end of file, a read problem, or a record newer than end_time
2930 while (1) {
2931 status = ::read(fd, buf, s->fRecordSize);
2932 if (status == 0) // EOF
2933 break;
2934 if (status == -1) {
2935 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', read() errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2936 break;
2937 }
2938 if (status != s->fRecordSize) {
2939 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', short read() returned %d instead of %d bytes", s->fFileName.c_str(), status, s->fRecordSize);
2940 break;
2941 }
2942
2943 prec++;
2944
// true for records beyond the nrec counted at open time in a file whose schema has no end time;
// such records are exempt from the "after last timestamp" check below
2945 bool past_end_of_last_file = (s->fTimeTo == 0) && (prec > nrec);
2946
// record timestamp is the leading 4 bytes
2947 time_t t = *(DWORD*)buf;
2948
2949 if (debug > 1)
2950 printf("FileHistory::read: file %s, schema time %s..%s, read time %s..%s, row time %s\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), TimeToString(t).c_str());
2951
2952 if (t < trec) {
2953 delete[] buf;
2954 ::close(fd);
2955 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': record %d timestamp %s is before start time %s", s->fFileName.c_str(), irec + count, TimeToString(t).c_str(), TimeToString(trec).c_str());
2956 return HS_FILE_ERROR;
2957 }
2958
2959 if (tend && (t > tend) && !past_end_of_last_file) {
2960 delete[] buf;
2961 ::close(fd);
2962 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': record %d timestamp %s is after last timestamp %s", s->fFileName.c_str(), irec + count, TimeToString(t).c_str(), TimeToString(tend).c_str());
2963 return HS_FILE_ERROR;
2964 }
2965
2966 if (t > end_time)
2967 break;
2968
// event data follows the 4-byte timestamp
2969 char* data = buf + 4;
2970
2971 for (int i=0; i<num_var; i++) {
2972 int si = var_schema_index[i];
2973 if (si < 0)
2974 continue;
2975
2976 if (t < last_time[i]) { // protect against duplicate and non-monotonous data
2977 delete[] buf;
2978 ::close(fd);
2979
2980 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': record %d timestamp %s is before timestamp %s of variable %d", s->fFileName.c_str(), irec + count, TimeToString(t).c_str(), TimeToString(last_time[i]).c_str(), i);
2981
2982 return HS_FILE_ERROR;
2983 }
2984
// locate the variable inside the record and pick element ii of its array
2985 double v = 0;
2986 void* ptr = data + s->fOffsets[si];
2987
2988 int ii = var_index[i];
2989 assert(ii >= 0);
2990 assert(ii < s->fVariables[si].n_data);
2991
// convert the stored value to double according to its MIDAS type; unknown types yield 0
2992 switch (s->fVariables[si].type) {
2993 default:
2994 // unknown data type
2995 v = 0;
2996 break;
2997 case TID_BYTE:
2998 v = ((unsigned char*)ptr)[ii];
2999 break;
3000 case TID_SBYTE:
3001 v = ((signed char *)ptr)[ii];
3002 break;
3003 case TID_CHAR:
3004 v = ((char*)ptr)[ii];
3005 break;
3006 case TID_WORD:
3007 v = ((unsigned short *)ptr)[ii];
3008 break;
3009 case TID_SHORT:
3010 v = ((signed short *)ptr)[ii];
3011 break;
3012 case TID_DWORD:
3013 v = ((unsigned int *)ptr)[ii];
3014 break;
3015 case TID_INT:
3016 v = ((int *)ptr)[ii];
3017 break;
3018 case TID_BOOL:
3019 v = ((unsigned int *)ptr)[ii];
3020 break;
3021 case TID_FLOAT:
3022 v = ((float*)ptr)[ii];
3023 break;
3024 case TID_DOUBLE:
3025 v = ((double*)ptr)[ii];
3026 break;
3027 }
3028
3029 buffer[i]->Add(t, v);
3030 last_time[i] = t;
3031 }
3032 count++;
3033 }
3034
3035 delete[] buf;
3036
3037 ::close(fd);
3038
3039 if (debug) {
3040 printf("FileHistory::read_data: file %s map", s->fFileName.c_str());
3041 for (size_t i=0; i<var_schema_index.size(); i++) {
3042 printf(" %2d", var_schema_index[i]);
3043 }
3044 printf(" read %d rows\n", count);
3045 }
3046
3047 if (debug)
3048 printf("FileHistory::read: file %s, schema time %s..%s, read time %s..%s, %d vars, read %d rows\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), num_var, count);
3049
3050 return HS_SUCCESS;
3051}
3052
3054// Implementation of the MidasHistoryInterface //
3056
// NOTE(review): the class header (listing line 3057) and several member lines are not visible in this
// listing; the out-of-class definitions further down name this class SchemaHistoryBase.
// Visible content: the list of events being written (fEvents), the pure virtual hooks a storage
// driver has to provide (hs_connect, hs_disconnect, read_schema, new_event), the writer entry
// points, the reader entry points and the nested ReadBuffer helper.
3058{
3059protected:
3061 std::string fConnectString;
3062
3063 // writer data
3065 std::vector<HsSchema*> fEvents;
3066
3067 // reader data
3069
3070public:
// NOTE(review): the constructor's declaration line (3071) is not visible; the body only resets fDebug
3072 {
3073 fDebug = 0;
3074 }
3075
// NOTE(review): the destructor's declaration line (3076) is not visible; the body deletes every schema held in fEvents
3077 {
3078 for (unsigned i=0; i<fEvents.size(); i++)
3079 if (fEvents[i]) {
3080 delete fEvents[i];
3081 fEvents[i] = NULL;
3082 }
3083 fEvents.clear();
3084 }
3085
// set the debug level, return the previous one
3086 virtual int hs_set_debug(int debug)
3087 {
3088 int old = fDebug;
3089 fDebug = debug;
3090 return old;
3091 }
3092
3093 virtual int hs_connect(const char* connect_string) = 0;
3094 virtual int hs_disconnect() = 0;
3095
3096protected:
3098 // Schema functions //
3100
3101 // load existing schema
3102 virtual int read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp) = 0; // event_name: =NULL means read only event names, =event_name means load tag names for all matching events; timestamp: =0 means read all schema all the way to the beginning of time, =time means read schema in effect at this time and all newer schema
3103
3104 // return a new copy of a schema for writing into history. If schema for this event does not exist, it will be created
3105 virtual HsSchema* new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]) = 0;
3106
3107public:
3109 // Functions used by mlogger //
3111
3112 int hs_define_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]);
3113 int hs_write_event(const char* event_name, time_t timestamp, int buffer_size, const char* buffer);
3114 int hs_flush_buffers();
3115
3117 // Functions used by mhttpd //
3119
3120 int hs_clear_cache();
3121 int hs_get_events(time_t t, std::vector<std::string> *pevents);
3122 int hs_get_tags(const char* event_name, time_t t, std::vector<TAG> *ptags);
3123 int hs_get_last_written(time_t timestamp, int num_var, const char* const event_name[], const char* const var_name[], const int var_index[], time_t last_written[]);
3124 int hs_read_buffer(time_t start_time, time_t end_time,
3125 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3127 int hs_status[]);
3128
3129 /*------------------------------------------------------------------*/
3130
// NOTE(review): the declaration line of this nested class (3131) and most of its data member lines
// are not visible; Add() below names it through the "~ReadBuffer()" destructor.
// Visible behavior: collects (time, value) pairs into caller-visible arrays reached through the
// fNumEntries, fTimeBuffer and fDataBuffer pointers, dropping samples that are outside
// fFirstTime..fLastTime or closer than fInterval to the previously stored sample.
3132 {
3133 public:
3137
3139
3143 double **fDataBuffer;
3144
3146
// NOTE(review): the constructor's declaration line (3147) and lines 3151-3153 are not visible
3148 {
3149 fNumAdded = 0;
3150
3154
3155 fNumAlloc = 0;
3156 fNumEntries = NULL;
3157 fTimeBuffer = NULL;
3158 fDataBuffer = NULL;
3159
3160 fPrevTime = 0;
3161 }
3162
3163 ~ReadBuffer() // dtor
3164 {
3165 }
3166
// NOTE(review): the declaration line (3167) is not visible; Add() calls this as Realloc(pos + 1).
// Grows the output arrays when fewer than about 10 spare slots remain: the capacity is doubled,
// or set to wantalloc + 1000 while the doubled value is still <= 1000.
3168 {
3169 if (wantalloc < fNumAlloc - 10)
3170 return;
3171
3172 int newalloc = fNumAlloc*2;
3173
3174 if (newalloc <= 1000)
3175 newalloc = wantalloc + 1000;
3176
3177 //printf("wantalloc %d, fNumEntries %d, fNumAlloc %d, newalloc %d\n", wantalloc, *fNumEntries, fNumAlloc, newalloc);
3178
// NOTE(review): listing line 3179 is not visible; presumably *fTimeBuffer is reallocated here - confirm
3180 assert(*fTimeBuffer);
3181
3182 *fDataBuffer = (double*)realloc(*fDataBuffer, sizeof(double)*newalloc);
3183 assert(*fDataBuffer);
3184
// NOTE(review): listing line 3185 is not visible; presumably fNumAlloc is updated to newalloc here - confirm
3186 }
3187
// store one sample unless it is outside of the time window or too close in time to the previous stored sample
3188 void Add(time_t t, double v)
3189 {
3190 if (t < fFirstTime)
3191 return;
3192 if (t > fLastTime)
3193 return;
3194
// fNumAdded counts every in-window sample, including the ones thinned out below
3195 fNumAdded++;
3196
3197 if ((fPrevTime==0) || (t >= fPrevTime + fInterval)) {
3198 int pos = *fNumEntries;
3199
3200 Realloc(pos + 1);
3201
3202 (*fTimeBuffer)[pos] = t;
3203 (*fDataBuffer)[pos] = v;
3204
3205 (*fNumEntries) = pos + 1;
3206
3207 fPrevTime = t;
3208 }
3209 }
3210
// nothing to finalize for this buffer type
3211 void Finish()
3212 {
3213
3214 }
3215 };
3216
3217 /*------------------------------------------------------------------*/
3218
3219 int hs_read(time_t start_time, time_t end_time, time_t interval,
3220 int num_var,
3221 const char* const event_name[], const char* const var_name[], const int var_index[],
3222 int num_entries[],
3223 time_t* time_buffer[], double* data_buffer[],
3224 int st[]);
3225 /*------------------------------------------------------------------*/
3226
3227
3228 int hs_read_binned(time_t start_time, time_t end_time, int num_bins,
3229 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3230 int num_entries[],
3231 int* count_bins[], double* mean_bins[], double* rms_bins[], double* min_bins[], double* max_bins[],
3234 time_t last_time[], double last_value[],
3235 int st[]);
3236};
3237
// NOTE(review): the constructor's signature line (listing line 3238) and lines 3242-3244 are not
// visible here; the body uses a `num_bins` value that presumably is a constructor argument.
// Visible behavior: resets the entry counter and allocates three accumulator arrays of num_bins
// doubles (fSum0, fSum1, fSum2), all cleared to zero. Add() uses them as the per-bin count,
// sum of values and sum of squared values.
3239{
3240 fNumEntries = 0;
3241
3245
// allocated with new[] - must be released with delete[]
3246 fSum0 = new double[num_bins];
3247 fSum1 = new double[num_bins];
3248 fSum2 = new double[num_bins];
3249
3250 for (int i=0; i<num_bins; i++) {
3251 fSum0[i] = 0;
3252 fSum1[i] = 0;
3253 fSum2[i] = 0;
3254 }
3255}
3256
3258{
3259 delete fSum0; fSum0 = NULL;
3260 delete fSum1; fSum1 = NULL;
3261 delete fSum2; fSum2 = NULL;
3262 // poison the pointers
3263 fCount = NULL;
3264 fMean = NULL;
3265 fRms = NULL;
3266 fMin = NULL;
3267 fMax = NULL;
3274}
3275
// NOTE(review): the signature line of this method (listing line 3276) is not visible here.
// Visible behavior: zero every optional per-bin output array that is set (min, max, first/last
// time and value of each bin) and the optional overall "last time"/"last value" outputs.
// Null pointers are skipped.
3277{
3278 for (int ibin = 0; ibin < fNumBins; ibin++) {
3279 if (fMin)
3280 fMin[ibin] = 0;
3281 if (fMax)
3282 fMax[ibin] = 0;
3283 if (fBinsFirstTime)
3284 fBinsFirstTime[ibin] = 0;
3285 if (fBinsFirstValue)
3286 fBinsFirstValue[ibin] = 0;
3287 if (fBinsLastTime)
3288 fBinsLastTime[ibin] = 0;
3289 if (fBinsLastValue)
3290 fBinsLastValue[ibin] = 0;
3291 }
3292 if (fLastTimePtr)
3293 *fLastTimePtr = 0;
3294 if (fLastValuePtr)
3295 *fLastValuePtr = 0;
3296}
3297
3299{
3300 if (t < fFirstTime)
3301 return;
3302 if (t > fLastTime)
3303 return;
3304
3305 fNumEntries++;
3306
3307 double a = (double)(t - fFirstTime);
3308 double b = (double)(fLastTime - fFirstTime);
3309 double fbin = fNumBins*a/b;
3310
3311 int ibin = (int)fbin;
3312
3313 if (ibin < 0)
3314 ibin = 0;
3315 else if (ibin >= fNumBins)
3316 ibin = fNumBins-1;
3317
3318 if (fSum0[ibin] == 0) {
3319 if (fMin)
3320 fMin[ibin] = v;
3321 if (fMax)
3322 fMax[ibin] = v;
3323 if (fBinsFirstTime)
3324 fBinsFirstTime[ibin] = t;
3325 if (fBinsFirstValue)
3326 fBinsFirstValue[ibin] = v;
3327 if (fBinsLastTime)
3328 fBinsLastTime[ibin] = t;
3329 if (fBinsLastValue)
3330 fBinsLastValue[ibin] = v;
3331 if (fLastTimePtr)
3332 *fLastTimePtr = t;
3333 if (fLastValuePtr)
3334 *fLastValuePtr = v;
3335 }
3336
3337 fSum0[ibin] += 1.0;
3338 fSum1[ibin] += v;
3339 fSum2[ibin] += v*v;
3340
3341 if (fMin)
3342 if (v < fMin[ibin])
3343 fMin[ibin] = v;
3344
3345 if (fMax)
3346 if (v > fMax[ibin])
3347 fMax[ibin] = v;
3348
3349 // NOTE: this assumes t and v are sorted by time.
3350 if (fBinsLastTime)
3351 fBinsLastTime[ibin] = t;
3352 if (fBinsLastValue)
3353 fBinsLastValue[ibin] = v;
3354
3355 if (fLastTimePtr)
3356 if (t > *fLastTimePtr) {
3357 *fLastTimePtr = t;
3358 if (fLastValuePtr)
3359 *fLastValuePtr = v;
3360 }
3361}
3362
// NOTE(review): the signature line of this method (listing line 3363) is not visible here.
// Visible behavior: turn the per-bin accumulators into the optional output arrays: entry count,
// mean and rms of each bin; bins without entries get min and max forced to 0.
3364{
3365 for (int i=0; i<fNumBins; i++) {
3366 double num = fSum0[i];
3367 double mean = 0;
3368 double variance = 0;
3369 if (num > 0) {
3370 mean = fSum1[i]/num;
// NOTE(review): listing line 3371 is not visible; presumably `variance` is computed from
// fSum2, num and mean here - as shown it would always stay 0. Confirm against the original file.
3372 }
// rms stays 0 unless the variance is positive
3373 double rms = 0;
3374 if (variance > 0)
3375 rms = sqrt(variance);
3376
3377 if (fCount)
3378 fCount[i] = (int)num;
3379
3380 if (fMean)
3381 fMean[i] = mean;
3382
3383 if (fRms)
3384 fRms[i] = rms;
3385
3386 if (num == 0) {
3387 if (fMin)
3388 fMin[i] = 0;
3389 if (fMax)
3390 fMax[i] = 0;
3391 }
3392 }
3393}
3394
// Define (or redefine) a history event for writing.
//  event_name - name of the event
//  timestamp  - passed on to new_event()
//  ntags/tags - the variables of the event
// Any already defined event with the same name is closed and deleted first, then the tags are
// validated (non-empty name, no leading blank, valid type, not TID_STRING, n_data > 0, no
// case-insensitive duplicate names) and the schema obtained from new_event() is stored in fEvents.
// Returns HS_SUCCESS, or HS_FILE_ERROR when validation or new_event() fails.
// NOTE: the old event is removed before validation, so a failed redefinition leaves the event undefined.
3395int SchemaHistoryBase::hs_define_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[])
3396{
3397 if (fDebug) {
3398 printf("hs_define_event: event name [%s] with %d tags\n", event_name, ntags);
3399 if (fDebug > 1)
3400 PrintTags(ntags, tags);
3401 }
3402
3403 // delete all events with the same name
3404 for (unsigned int i=0; i<fEvents.size(); i++)
3405 if (fEvents[i])
3406 if (event_name_cmp(fEvents[i]->fEventName, event_name)==0) {
3407 if (fDebug)
3408 printf("deleting exising event %s\n", event_name);
3409 fEvents[i]->close();
3410 delete fEvents[i];
3411 fEvents[i] = NULL;
3412 }
3413
3414 // check for wrong types etc
3415 for (int i=0; i<ntags; i++) {
3416 if (strlen(tags[i].name) < 1) {
3417 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' has empty name at index %d", event_name, i);
3418 return HS_FILE_ERROR;
3419 }
3420 if (tags[i].name[0] == ' ') {
3421 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' has name \'%s\' starting with a blank", event_name, tags[i].name);
3422 return HS_FILE_ERROR;
3423 }
3424 if (tags[i].type <= 0 || tags[i].type >= TID_LAST) {
3425 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' tag \'%s\' at index %d has invalid type %d",
3426 event_name, tags[i].name, i, tags[i].type);
3427 return HS_FILE_ERROR;
3428 }
3429 if (tags[i].type == TID_STRING) {
3430 cm_msg(MERROR, "hs_define_event",
3431 "Error: History event \'%s\' tag \'%s\' at index %d has forbidden type TID_STRING", event_name,
3432 tags[i].name, i);
3433 return HS_FILE_ERROR;
3434 }
3435 if (tags[i].n_data <= 0) {
3436 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' tag \'%s\' at index %d has invalid n_data %d",
3437 event_name, tags[i].name, i, tags[i].n_data);
3438 return HS_FILE_ERROR;
3439 }
3440 }
3441
3442 // check for duplicate names. Done by sorting, since this takes only O(N*log(N)).
// names are upper-cased first, so the comparison is case-insensitive
3443 std::vector<std::string> names;
3444 for (int i=0; i<ntags; i++) {
3445 std::string str(tags[i].name);
3446 std::transform(str.begin(), str.end(), str.begin(), ::toupper);
3447 names.push_back(str);
3448 }
3449 std::sort(names.begin(), names.end());
3450 for (int i=0; i<ntags-1; i++) {
3451 if (names[i] == names[i + 1]) {
3452 cm_msg(MERROR, "hs_define_event",
3453 "Error: History event \'%s\' has duplicate tag name \'%s\'", event_name,
3454 names[i].c_str());
3455 return HS_FILE_ERROR;
3456 }
3457 }
3458
// the storage driver creates or loads the schema for this event
3459 HsSchema* s = new_event(event_name, timestamp, ntags, tags);
3460 if (!s)
3461 return HS_FILE_ERROR;
3462
3463 s->fDisabled = false;
3464
// NOTE(review): listing line 3465 is not visible here; a statement operating on `s` may be missing - confirm against the original file.
3466
3467 // find empty slot in events list
3468 for (unsigned int i=0; i<fEvents.size(); i++)
3469 if (!fEvents[i]) {
3470 fEvents[i] = s;
3471 s = NULL;
3472 break;
3473 }
3474
3475 // if no empty slots, add at the end
3476 if (s)
3477 fEvents.push_back(s);
3478
3479 return HS_SUCCESS;
3480}
3481
// Write one event to the history.
//  event_name  - name of an event previously registered in fEvents
//  timestamp   - time of the data
//  buffer_size - size of the supplied data in bytes
//  buffer      - the event data
// Oversized data is truncated to the schema size, undersized data is padded with zeros; both
// cases are reported through cm_msg() on their first occurrence only.
// Returns HS_SUCCESS, HS_UNDEFINED_EVENT when the event is not defined, or HS_FILE_ERROR when
// the event is disabled or the write fails (a failed write disables the event).
3482int SchemaHistoryBase::hs_write_event(const char* event_name, time_t timestamp, int buffer_size, const char* buffer)
3483{
3484 if (fDebug)
3485 printf("hs_write_event: write event \'%s\', time %d, size %d\n", event_name, (int)timestamp, buffer_size);
3486
3487 HsSchema *s = NULL;
3488
3489 // find this event
3490 for (size_t i=0; i<fEvents.size(); i++)
3491 if (fEvents[i] && (event_name_cmp(fEvents[i]->fEventName, event_name)==0)) {
3492 s = fEvents[i];
3493 break;
3494 }
3495
3496 // not found
3497 if (!s)
3498 return HS_UNDEFINED_EVENT;
3499
3500 // deactivated because of error?
3501 if (s->fDisabled)
3502 return HS_FILE_ERROR;
3503
3504 if (s->fNumBytes == 0) { // compute expected data size
3505 // NB: history data does not have any padding!
3506 for (unsigned i=0; i<s->fVariables.size(); i++) {
3507 s->fNumBytes += s->fVariables[i].n_bytes;
3508 }
3509 }
3510
3511 int status;
3512
3513 if (buffer_size > s->fNumBytes) { // too many bytes!
3514 if (s->fCountWriteOversize == 0) {
3515 // only report first occurrence
3516 // count of all occurrences is reported by HsSchema destructor
3517 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch: expected %d bytes, got %d bytes", s->fEventName.c_str(), s->fNumBytes, buffer_size);
3518 }
// NOTE(review): listing line 3519 is not visible; presumably the oversize counter is incremented here - confirm.
// remember the largest size seen, then write only the first fNumBytes bytes
3520 if (buffer_size > s->fWriteMaxSize)
3521 s->fWriteMaxSize = buffer_size;
3522 status = s->write_event(timestamp, buffer, s->fNumBytes);
3523 } else if (buffer_size < s->fNumBytes) { // too few bytes
3524 if (s->fCountWriteUndersize == 0) {
3525 // only report first occurrence
3526 // count of all occurrences is reported by HsSchema destructor
3527 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch: expected %d bytes, got %d bytes", s->fEventName.c_str(), s->fNumBytes, buffer_size);
3528 }
// NOTE(review): listing line 3529 is not visible; presumably the undersize counter is incremented here - confirm.
// remember the smallest size seen
3530 if (s->fWriteMinSize == 0)
3531 s->fWriteMinSize = buffer_size;
3532 else if (buffer_size < s->fWriteMinSize)
3533 s->fWriteMinSize = buffer_size;
// write a zero-padded copy of the data; NOTE(review): the malloc() result is used without a NULL check
3534 char* tmp = (char*)malloc(s->fNumBytes);
3535 memcpy(tmp, buffer, buffer_size);
3536 memset(tmp + buffer_size, 0, s->fNumBytes - buffer_size);
3537 status = s->write_event(timestamp, tmp, s->fNumBytes);
3538 free(tmp);
3539 } else {
3540 assert(buffer_size == s->fNumBytes); // obviously
3541 status = s->write_event(timestamp, buffer, buffer_size);
3542 }
3543
3544 // if could not write event, deactivate it
3545 if (status != HS_SUCCESS) {
3546 s->fDisabled = true;
3547 cm_msg(MERROR, "hs_write_event", "Event \'%s\' disabled after write error %d", event_name, status);
3548 return HS_FILE_ERROR;
3549 }
3550
3551 return HS_SUCCESS;
3552}
3553
// NOTE(review): the signature line of this function (listing line 3554) is not visible here.
// Visible behavior: call flush_buffers() on every event in fEvents. All events are flushed even
// after a failure; the returned status is that of the last event that failed, or HS_SUCCESS.
3555{
3556 int status = HS_SUCCESS;
3557
3558 if (fDebug)
3559 printf("hs_flush_buffers!\n");
3560
3561 for (unsigned int i=0; i<fEvents.size(); i++)
3562 if (fEvents[i]) {
3563 int xstatus = fEvents[i]->flush_buffers();
3564 if (xstatus != HS_SUCCESS)
3565 status = xstatus;
3566 }
3567
3568 return status;
3569}
3570
3572// Functions used by mhttpd //
3574
// NOTE(review): the signature line of this function (listing line 3575) is not visible here;
// the debug message names it SchemaHistoryBase::hs_clear_cache.
// Visible behavior: empty the reader's schema collection (fSchema) and return HS_SUCCESS.
3576{
3577 if (fDebug)
3578 printf("SchemaHistoryBase::hs_clear_cache!\n");
3579
// NOTE(review): listing line 3580 is not visible; another statement may be missing here - confirm against the original file.
3581 fSchema.clear();
3582
3583 return HS_SUCCESS;
3584}
3585
3586int SchemaHistoryBase::hs_get_events(time_t t, std::vector<std::string> *pevents)
3587{
3588 if (fDebug)
3589 printf("hs_get_events, time %s\n", TimeToString(t).c_str());
3590
3591 int status = read_schema(&fSchema, NULL, t);
3592 if (status != HS_SUCCESS)
3593 return status;
3594
3595 if (fDebug) {
3596 printf("hs_get_events: available schema:\n");
3597 fSchema.print(false);
3598 }
3599
3600 assert(pevents);
3601
3602 for (unsigned i=0; i<fSchema.size(); i++) {
3603 HsSchema* s = fSchema[i];
3604 if (t && s->fTimeTo && s->fTimeTo < t)
3605 continue;
3606 bool dupe = false;
3607 for (unsigned j=0; j<pevents->size(); j++)
3608 if (event_name_cmp((*pevents)[j], s->fEventName.c_str())==0) {
3609 dupe = true;
3610 break;
3611 }
3612
3613 if (!dupe)
3614 pevents->push_back(s->fEventName);
3615 }
3616
3617 std::sort(pevents->begin(), pevents->end());
3618
3619 if (fDebug) {
3620 printf("hs_get_events: returning %d events\n", (int)pevents->size());
3621 for (unsigned i=0; i<pevents->size(); i++) {
3622 printf(" %d: [%s]\n", i, (*pevents)[i].c_str());
3623 }
3624 }
3625
3626 return HS_SUCCESS;
3627}
3628
3629int SchemaHistoryBase::hs_get_tags(const char* event_name, time_t t, std::vector<TAG> *ptags)
3630{
3631 if (fDebug)
3632 printf("hs_get_tags: event [%s], time %s\n", event_name, TimeToString(t).c_str());
3633
3634 assert(ptags);
3635
3636 int status = read_schema(&fSchema, event_name, t);
3637 if (status != HS_SUCCESS)
3638 return status;
3639
3640 bool found_event = false;
3641 for (unsigned i=0; i<fSchema.size(); i++) {
3642 HsSchema* s = fSchema[i];
3643 if (t && s->fTimeTo && s->fTimeTo < t)
3644 continue;
3645
3646 if (event_name_cmp(s->fEventName, event_name) != 0)
3647 continue;
3648
3649 found_event = true;
3650
3651 for (unsigned i=0; i<s->fVariables.size(); i++) {
3652 const char* tagname = s->fVariables[i].name.c_str();
3653 //printf("event_name [%s], table_name [%s], column name [%s], tag name [%s]\n", event_name, tn.c_str(), cn.c_str(), tagname);
3654
3655 bool dupe = false;
3656
3657 for (unsigned k=0; k<ptags->size(); k++)
3658 if (strcasecmp((*ptags)[k].name, tagname) == 0) {
3659 dupe = true;
3660 break;
3661 }
3662
3663 if (!dupe) {
3664 TAG t;
3665 mstrlcpy(t.name, tagname, sizeof(t.name));
3666 t.type = s->fVariables[i].type;
3667 t.n_data = s->fVariables[i].n_data;
3668
3669 ptags->push_back(t);
3670 }
3671 }
3672 }
3673
3674 if (!found_event)
3675 return HS_UNDEFINED_EVENT;
3676
3677 if (fDebug) {
3678 printf("hs_get_tags: event [%s], returning %d tags\n", event_name, (int)ptags->size());
3679 for (unsigned i=0; i<ptags->size(); i++) {
3680 printf(" tag[%d]: %s[%d] type %d\n", i, (*ptags)[i].name, (*ptags)[i].n_data, (*ptags)[i].type);
3681 }
3682 }
3683
3684 return HS_SUCCESS;
3685}
3686
3687int SchemaHistoryBase::hs_get_last_written(time_t timestamp, int num_var, const char* const event_name[], const char* const var_name[], const int var_index[], time_t last_written[])
3688{
3689 if (fDebug) {
3690 printf("hs_get_last_written: timestamp %s, num_var %d\n", TimeToString(timestamp).c_str(), num_var);
3691 }
3692
3693 for (int j=0; j<num_var; j++) {
3694 last_written[j] = 0;
3695 }
3696
3697 for (int i=0; i<num_var; i++) {
3698 int status = read_schema(&fSchema, event_name[i], 0);
3699 if (status != HS_SUCCESS)
3700 return status;
3701 }
3702
3703 //fSchema.print(false);
3704
3705 for (int i=0; i<num_var; i++) {
3706 for (unsigned ss=0; ss<fSchema.size(); ss++) {
3707 HsSchema* s = fSchema[ss];
3708 // schema is too new
3709 if (s->fTimeFrom && s->fTimeFrom >= timestamp)
3710 continue;
3711 // this schema is newer than last_written and may contain newer data?
3712 if (s->fTimeFrom && s->fTimeFrom < last_written[i])
3713 continue;
3714 // schema for the variables we want?
3715 int sindex = s->match_event_var(event_name[i], var_name[i], var_index[i]);
3716 if (sindex < 0)
3717 continue;
3718
3719 time_t lw = 0;
3720
3721 int status = s->read_last_written(timestamp, fDebug, &lw);
3722
3723 if (status == HS_SUCCESS && lw != 0) {
3724 for (int j=0; j<num_var; j++) {
3725 int sj = s->match_event_var(event_name[j], var_name[j], var_index[j]);
3726 if (sj < 0)
3727 continue;
3728
3729 if (lw > last_written[j])
3730 last_written[j] = lw;
3731 }
3732 }
3733 }
3734 }
3735
3736 if (fDebug) {
3737 printf("hs_get_last_written: timestamp time %s, num_var %d, result:\n", TimeToString(timestamp).c_str(), num_var);
3738 for (int i=0; i<num_var; i++) {
3739 printf(" event [%s] tag [%s] index [%d] last_written %s\n", event_name[i], var_name[i], var_index[i], TimeToString(last_written[i]).c_str());
3740 }
3741 }
3742
3743 return HS_SUCCESS;
3744}
3745
3747 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3749 int hs_status[])
3750{
3751 if (fDebug)
3752 printf("hs_read_buffer: %d variables, start time %s, end time %s\n", num_var, TimeToString(start_time).c_str(), TimeToString(end_time).c_str());
3753
3754 for (int i=0; i<num_var; i++) {
3755 int status = read_schema(&fSchema, event_name[i], start_time);
3756 if (status != HS_SUCCESS)
3757 return status;
3758 }
3759
3760#if 0
3761 if (fDebug)
3762 fSchema.print(false);
3763#endif
3764
3765 for (int i=0; i<num_var; i++) {
3767 }
3768
3769 //for (unsigned ss=0; ss<fSchema.size(); ss++) {
3770 // HsSchema* s = fSchema[ss];
3771 // HsFileSchema* fs = dynamic_cast<HsFileSchema*>(s);
3772 // assert(fs != NULL);
3773 // printf("schema %d from %s to %s, filename %s\n", ss, TimeToString(fs->fTimeFrom).c_str(), TimeToString(fs->fTimeTo).c_str(), fs->fFileName.c_str());
3774 //}
3775
3776 // check that schema are sorted by time
3777
3778#if 0
3779 // check that schema list is sorted by time, descending fTimeFrom, newest schema first
3780 for (unsigned ss=0; ss<fSchema.size(); ss++) {
3781 if (fDebug) {
3782 //printf("Check schema %zu/%zu: prev from %s, this from %s to %s, compare %d %d %d\n", ss, fSchema.size(),
3783 // TimeToString(fSchema[ss-1]->fTimeFrom).c_str(),
3784 // TimeToString(fSchema[ss]->fTimeFrom).c_str(),
3785 // TimeToString(fSchema[ss]->fTimeTo).c_str(),
3786 // fSchema[ss-1]->fTimeFrom >= fSchema[ss]->fTimeTo,
3787 // fSchema[ss-1]->fTimeFrom > fSchema[ss]->fTimeFrom,
3788 // (fSchema[ss-1]->fTimeFrom >= fSchema[ss]->fTimeTo) && (fSchema[ss-1]->fTimeFrom > fSchema[ss]->fTimeFrom));
3789 printf("Schema %zu/%zu: from %s to %s, name %s\n", ss, fSchema.size(),
3790 TimeToString(fSchema[ss]->fTimeFrom).c_str(),
3791 TimeToString(fSchema[ss]->fTimeTo).c_str(),
3792 fSchema[ss]->fEventName.c_str());
3793 }
3794
3795 if (ss > 0) {
3796 //if ((fSchema[ss-1]->fTimeFrom >= fSchema[ss]->fTimeTo) && (fSchema[ss-1]->fTimeFrom > fSchema[ss]->fTimeFrom)) {
3797 if ((fSchema[ss-1]->fTimeFrom >= fSchema[ss]->fTimeFrom)) {
3798 // good
3799 } else {
3800 cm_msg(MERROR, "SchemaHistoryBase::hs_read_buffer", "History internal error, schema is not ordered by time. Please report this error to the midas forum.");
3801 return HS_FILE_ERROR;
3802 }
3803 }
3804 }
3805#endif
3806
3807 std::vector<HsSchema*> slist;
3808 std::vector<std::vector<int>> smap;
3809
3810 for (unsigned ss=0; ss<fSchema.size(); ss++) {
3811 HsSchema* s = fSchema[ss];
3812 // schema is too new?
3813 if (s->fTimeFrom && s->fTimeFrom > end_time)
3814 continue;
3815 // schema is too old
3816 if (s->fTimeTo && s->fTimeTo < start_time)
3817 continue;
3818
3819 std::vector<int> sm;
3820
3821 for (int i=0; i<num_var; i++) {
3822 // schema for the variables we want?
3823 int sindex = s->match_event_var(event_name[i], var_name[i], var_index[i]);
3824 if (sindex < 0)
3825 continue;
3826
3827 if (sm.empty()) {
3828 for (int i=0; i<num_var; i++) {
3829 sm.push_back(-1);
3830 }
3831 }
3832
3833 sm[i] = sindex;
3834 }
3835
3836 if (!sm.empty()) {
3837 slist.push_back(s);
3838 smap.push_back(sm);
3839 }
3840 }
3841
3842 if (0||fDebug) {
3843 printf("Found %d matching schema:\n", (int)slist.size());
3844
3845 for (size_t i=0; i<slist.size(); i++) {
3846 HsSchema* s = slist[i];
3847 s->print();
3848 for (int k=0; k<num_var; k++)
3849 printf(" tag %s[%d] sindex %d\n", var_name[k], var_index[k], smap[i][k]);
3850 }
3851 }
3852
3853 //for (size_t ss=0; ss<slist.size(); ss++) {
3854 // HsSchema* s = slist[ss];
3855 // HsFileSchema* fs = dynamic_cast<HsFileSchema*>(s);
3856 // assert(fs != NULL);
3857 // printf("schema %zu from %s to %s, filename %s", ss, TimeToString(fs->fTimeFrom).c_str(), TimeToString(fs->fTimeTo).c_str(), fs->fFileName.c_str());
3858 // printf(" smap ");
3859 // for (int k=0; k<num_var; k++)
3860 // printf(" %2d", smap[ss][k]);
3861 // printf("\n");
3862 //}
3863
3864 for (size_t ss=1; ss<slist.size(); ss++) {
3865 if (fDebug) {
3866 printf("Check schema %zu/%zu: prev from %s, this from %s to %s, compare %d\n", ss, slist.size(),
3867 TimeToString(slist[ss-1]->fTimeFrom).c_str(),
3868 TimeToString(slist[ss]->fTimeFrom).c_str(),
3869 TimeToString(slist[ss]->fTimeTo).c_str(),
3870 slist[ss-1]->fTimeFrom >= slist[ss]->fTimeFrom);
3871 }
3872 if (slist[ss-1]->fTimeFrom >= slist[ss]->fTimeFrom) {
3873 // good
3874 } else {
3875 cm_msg(MERROR, "SchemaHistoryBase::hs_read_buffer", "History internal error, selected schema is not ordered by time. Please report this error to the midas forum.");
3876 return HS_FILE_ERROR;
3877 }
3878 }
3879
3880 std::vector<time_t> last_time;
3881
3882 for (int i=0; i<num_var; i++) {
3883 last_time.push_back(start_time);
3884 }
3885
3886 for (int i=slist.size()-1; i>=0; i--) {
3887 HsSchema* s = slist[i];
3888
3889 int status = s->read_data(start_time, end_time, num_var, smap[i], var_index, fDebug, last_time, buffer);
3890
3891 if (status == HS_SUCCESS) {
3892 for (int j=0; j<num_var; j++) {
3893 if (smap[i][j] >= 0)
3895 }
3896 }
3897 }
3898
3899 return HS_SUCCESS;
3900}
3901
3903 int num_var,
3904 const char* const event_name[], const char* const var_name[], const int var_index[],
3905 int num_entries[],
3906 time_t* time_buffer[], double* data_buffer[],
3907 int st[])
3908{
3909 int status;
3910
3911 ReadBuffer** buffer = new ReadBuffer*[num_var];
3913
3914 for (int i=0; i<num_var; i++) {
3915 buffer[i] = new ReadBuffer(start_time, end_time, interval);
3916 bi[i] = buffer[i];
3917
3918 // make sure outputs are initialized to something sane
3919 if (num_entries)
3920 num_entries[i] = 0;
3921 if (time_buffer)
3922 time_buffer[i] = NULL;
3923 if (data_buffer)
3924 data_buffer[i] = NULL;
3925 if (st)
3926 st[i] = 0;
3927
3928 if (num_entries)
3929 buffer[i]->fNumEntries = &num_entries[i];
3930 if (time_buffer)
3931 buffer[i]->fTimeBuffer = &time_buffer[i];
3932 if (data_buffer)
3933 buffer[i]->fDataBuffer = &data_buffer[i];
3934 }
3935
3936 status = hs_read_buffer(start_time, end_time,
3937 num_var, event_name, var_name, var_index,
3938 bi, st);
3939
3940 for (int i=0; i<num_var; i++) {
3941 buffer[i]->Finish();
3942 delete buffer[i];
3943 }
3944
3945 delete[] buffer;
3946 delete[] bi;
3947
3948 return status;
3949}
3950
3952 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3953 int num_entries[],
3954 int* count_bins[], double* mean_bins[], double* rms_bins[], double* min_bins[], double* max_bins[],
3957 time_t last_time[], double last_value[],
3958 int st[])
3959{
3960 int status;
3961
3964
3965 for (int i=0; i<num_var; i++) {
3966 buffer[i] = new MidasHistoryBinnedBuffer(start_time, end_time, num_bins);
3967 xbuffer[i] = buffer[i];
3968
3969 if (count_bins)
3970 buffer[i]->fCount = count_bins[i];
3971 if (mean_bins)
3972 buffer[i]->fMean = mean_bins[i];
3973 if (rms_bins)
3974 buffer[i]->fRms = rms_bins[i];
3975 if (min_bins)
3976 buffer[i]->fMin = min_bins[i];
3977 if (max_bins)
3978 buffer[i]->fMax = max_bins[i];
3979 if (bins_first_time)
3980 buffer[i]->fBinsFirstTime = bins_first_time[i];
3981 if (bins_first_value)
3982 buffer[i]->fBinsFirstValue = bins_first_value[i];
3983 if (bins_last_time)
3984 buffer[i]->fBinsLastTime = bins_last_time[i];
3985 if (bins_last_value)
3986 buffer[i]->fBinsLastValue = bins_last_value[i];
3987 if (last_time)
3988 buffer[i]->fLastTimePtr = &last_time[i];
3989 if (last_value)
3990 buffer[i]->fLastValuePtr = &last_value[i];
3991
3992 buffer[i]->Start();
3993 }
3994
3995 status = hs_read_buffer(start_time, end_time,
3996 num_var, event_name, var_name, var_index,
3997 xbuffer,
3998 st);
3999
4000 for (int i=0; i<num_var; i++) {
4001 buffer[i]->Finish();
4002 if (num_entries)
4003 num_entries[i] = buffer[i]->fNumEntries;
4004 if (0) {
4005 for (int j=0; j<num_bins; j++) {
4006 printf("var %d bin %d count %d, first %s last %s value first %f last %f\n", i, j, count_bins[i][j], TimeToString(bins_first_time[i][j]).c_str(), TimeToString(bins_last_time[i][j]).c_str(), bins_first_value[i][j], bins_last_value[i][j]);
4007 }
4008 }
4009 delete buffer[i];
4010 }
4011
4012 delete[] buffer;
4013 delete[] xbuffer;
4014
4015 return status;
4016}
4017
4019// SQL schema //
4021
4023{
4024 if (!fSql->IsConnected()) {
4025 return HS_SUCCESS;
4026 }
4027
4028 int status = HS_SUCCESS;
4029 if (get_transaction_count() > 0) {
4032 }
4033 return status;
4034}
4035
4036int HsSchema::match_event_var(const char* event_name, const char* var_name, const int var_index)
4037{
4038 if (!MatchEventName(this->fEventName.c_str(), event_name))
4039 return -1;
4040
4041 for (unsigned j=0; j<this->fVariables.size(); j++) {
4042 if (MatchTagName(this->fVariables[j].name.c_str(), this->fVariables[j].n_data, var_name, var_index)) {
4043 // Second clause in if() is case where MatchTagName used the "alternate tag name".
4044 // E.g. our variable name is "IM05[3]" (n_data=1), but we're looking for var_name="IM05" and var_index=3.
4045 if (var_index < this->fVariables[j].n_data || (this->fVariables[j].n_data == 1 && this->fVariables[j].name.find("[") != std::string::npos)) {
4046 return j;
4047 }
4048 }
4049 }
4050
4051 return -1;
4052}
4053
4054int HsSqlSchema::match_event_var(const char* event_name, const char* var_name, const int var_index)
4055{
4056 if (event_name_cmp(this->fTableName, event_name)==0) {
4057 for (unsigned j=0; j<this->fVariables.size(); j++) {
4058 if (var_name_cmp(this->fColumnNames[j], var_name)==0)
4059 return j;
4060 }
4061 }
4062
4063 return HsSchema::match_event_var(event_name, var_name, var_index);
4064}
4065
4066static HsSqlSchema* NewSqlSchema(HsSchemaVector* sv, const char* table_name, time_t t)
4067{
4068 time_t tt = 0;
4069 int j=-1;
4070 int jjx=-1; // remember oldest schema
4071 time_t ttx = 0;
4072 for (unsigned i=0; i<sv->size(); i++) {
4073 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
4074 if (s->fTableName != table_name)
4075 continue;
4076
4077 if (s->fTimeFrom == t) {
4078 return s;
4079 }
4080
4081 // remember the last schema before time t
4082 if (s->fTimeFrom < t) {
4083 if (s->fTimeFrom > tt) {
4084 tt = s->fTimeFrom;
4085 j = i;
4086 }
4087 }
4088
4089 if (jjx < 0) {
4090 jjx = i;
4091 ttx = s->fTimeFrom;
4092 }
4093
4094 if (s->fTimeFrom < ttx) {
4095 jjx = i;
4096 ttx = s->fTimeFrom;
4097 }
4098
4099 //printf("table_name [%s], t=%s, i=%d, j=%d %s, tt=%s, dt is %d\n", table_name, TimeToString(t).c_str(), i, j, TimeToString(s->fTimeFrom).c_str(), TimeToString(tt).c_str(), (int)(s->fTimeFrom-t));
4100 }
4101
4102 //printf("NewSqlSchema: will copy schema j=%d, tt=%d at time %d\n", j, tt, t);
4103
4104 //printf("cloned schema at time %s: ", TimeToString(t).c_str());
4105 //(*sv)[j]->print(false);
4106
4107 //printf("schema before:\n");
4108 //sv->print(false);
4109
4110 if (j >= 0) {
4111 HsSqlSchema* s = new HsSqlSchema;
4112 *s = *(HsSqlSchema*)(*sv)[j]; // make a copy
4113 s->fTimeFrom = t;
4114 sv->add(s);
4115
4116 //printf("schema after:\n");
4117 //sv->print(false);
4118
4119 return s;
4120 }
4121
4122 if (jjx >= 0) {
4123 cm_msg(MERROR, "NewSqlSchema", "Error: Unexpected ordering of schema for table \'%s\', good luck!", table_name);
4124
4125 HsSqlSchema* s = new HsSqlSchema;
4126 *s = *(HsSqlSchema*)(*sv)[jjx]; // make a copy
4127 s->fTimeFrom = t;
4128 s->fTimeTo = ttx;
4129 sv->add(s);
4130
4131 //printf("schema after:\n");
4132 //sv->print(false);
4133
4134 return s;
4135 }
4136
4137 cm_msg(MERROR, "NewSqlSchema", "Error: Cannot clone schema for table \'%s\', good luck!", table_name);
4138 return NULL;
4139}
4140
4142{
4143 assert(fVariables.size() == fColumnInactive.size());
4144 assert(fVariables.size() == fColumnNames.size());
4145 assert(fVariables.size() == fColumnTypes.size());
4146 assert(fVariables.size() == fOffsets.size());
4147
4148 size_t count_active = 0;
4149 size_t count_inactive = 0;
4150
4151 for (size_t i=0; i<fColumnInactive.size(); i++) {
4152 if (fColumnInactive[i])
4153 count_inactive += 1;
4154 else
4155 count_active += 1;
4156 }
4157
4158 //printf("remove_inactive_columns: enter! count_active: %zu, count_inactive: %zu\n", count_active, count_inactive);
4159 //print();
4160
4161 if (count_inactive > 0) {
4162 size_t j=0;
4163
4164 for (size_t i=0; i<fColumnInactive.size(); i++) {
4165 if (fColumnInactive[i]) {
4166 // skip this entry
4167 } else {
4168 if (j != i) {
4173 fOffsets[j] = fOffsets[i];
4174 }
4175 j++;
4176 }
4177 }
4178
4179 //print();
4180 //printf("%zu %zu\n", j, count_active);
4181
4182 assert(j == count_active);
4183
4184 //print();
4185
4186 fVariables.resize(count_active);
4188 fColumnNames.resize(count_active);
4189 fColumnTypes.resize(count_active);
4190 fOffsets.resize(count_active);
4191
4192 assert(fVariables.size() == fColumnInactive.size());
4193 assert(fVariables.size() == fColumnNames.size());
4194 assert(fVariables.size() == fColumnTypes.size());
4195 assert(fVariables.size() == fOffsets.size());
4196
4197 //printf("remove_inactice_columns: exit!\n");
4198 //print();
4199 }
4200}
4201
// Write one event into the SQL table of this schema.
//
// Builds a single "INSERT INTO <table> (_t_time, _i_time, <columns>) VALUES (...)"
// statement from the raw event data and executes it through fSql.
//
// t         - event time; stored as formatted local time in _t_time and as
//             TimeToString(t) in _i_time
// data      - raw event data; the value of column i is read at data+fOffsets[i]
// data_size - size of "data"; used here only in the assert on each offset
//
// Returns HS_FILE_ERROR for an inactive column or a negative offset, the
// status of Exec()/ExecDisconnected() if it is not DB_SUCCESS, otherwise
// HS_SUCCESS.
//
// NOTE(review): this listing has gaps (lines 4219, 4230, 4313, 4323 are not
// visible); statements on those lines are not described here.
4202int HsSqlSchema::write_event(const time_t t, const char* data, const int data_size)
4203{
4204 HsSqlSchema* s = this;
4205
// the per-column arrays must all have the same length as fVariables
4206 assert(s->fVariables.size() == s->fColumnInactive.size());
4207 assert(s->fVariables.size() == s->fColumnNames.size());
4208 assert(s->fVariables.size() == s->fColumnTypes.size());
4209 assert(s->fVariables.size() == s->fOffsets.size());
4210
// "tags" collects ", <quoted column name>", "values" collects ", <value>";
// both are spliced into the INSERT statement below
4211 std::string tags;
4212 std::string values;
4213
4214 for (unsigned i=0; i<s->fVariables.size(); i++) {
4215 // NB: inactive columns should have been removed from the schema. K.O.
4216
4217 if (s->fColumnInactive[i]) {
4218 cm_msg(MERROR, "HsSqlSchema::write_event", "Internal error, unexpected inactive column %d", i);
4220 return HS_FILE_ERROR;
4221 }
4222
4223 int type = s->fVariables[i].type;
4224 int n_data = s->fVariables[i].n_data;
4225 int offset = s->fOffsets[i];
4226 const char* column_name = s->fColumnNames[i].c_str();
4227
4228 if (offset < 0) {
4229 cm_msg(MERROR, "HsSqlSchema::write_event", "Internal error, unexpected negative offset %d for column %d", offset, i);
4231 return HS_FILE_ERROR;
4232 }
4233
// every SQL column holds exactly one value
// NOTE(review): the offset assert only checks that the value starts inside
// "data", not that the whole value fits
4234 assert(n_data == 1);
4235 assert(strlen(column_name) > 0);
4236 assert(offset < data_size);
4237
4238 void* ptr = (void*)(data+offset);
4239
4240 tags += ", ";
4241 tags += fSql->QuoteId(column_name);
4242
4243 values += ", ";
4244
4245 char buf[1024];
// j is always 0: each case below reads the first element at "ptr"
4246 int j=0;
4247
// format the value as SQL text according to its MIDAS type id
4248 switch (type) {
4249 default:
4250 sprintf(buf, "unknownType%d", type);
4251 break;
4252 case TID_BYTE:
4253 sprintf(buf, "%u",((unsigned char *)ptr)[j]);
4254 break;
4255 case TID_SBYTE:
4256 sprintf(buf, "%d",((signed char*)ptr)[j]);
4257 break;
4258 case TID_CHAR:
4259 // FIXME: quotes
4260 sprintf(buf, "\'%c\'",((char*)ptr)[j]);
4261 break;
4262 case TID_WORD:
4263 sprintf(buf, "%u",((unsigned short *)ptr)[j]);
4264 break;
4265 case TID_SHORT:
4266 sprintf(buf, "%d",((short *)ptr)[j]);
4267 break;
4268 case TID_DWORD:
4269 sprintf(buf, "%u",((unsigned int *)ptr)[j]);
4270 break;
4271 case TID_INT:
4272 sprintf(buf, "%d",((int *)ptr)[j]);
4273 break;
// TID_BOOL is read as an unsigned int, same as TID_DWORD
4274 case TID_BOOL:
4275 sprintf(buf, "%u",((unsigned int *)ptr)[j]);
4276 break;
4277 case TID_FLOAT:
4278 // FIXME: quotes
4279 sprintf(buf, "\'%.8g\'",((float*)ptr)[j]);
4280 break;
4281 case TID_DOUBLE:
4282 // FIXME: quotes
4283 sprintf(buf, "\'%.16g\'",((double*)ptr)[j]);
4284 break;
4285 }
4286
4287 values += buf;
4288 }
4289
// value for _t_time: local time formatted like the example below
4290 // 2001-02-16 20:38:40.1
4291 struct tm tms;
4292 localtime_r(&t, &tms); // somebody must call tzset() before this.
4293 char buf[1024];
4294 strftime(buf, sizeof(buf)-1, "%Y-%m-%d %H:%M:%S.0", &tms);
4295
// "tags" and "values" each start with ", ", so they attach directly after
// the two fixed time columns and their values
4296 std::string cmd;
4297 cmd = "INSERT INTO ";
4298 cmd += fSql->QuoteId(s->fTableName.c_str());
4299 cmd += " (_t_time, _i_time";
4300 cmd += tags;
4301 cmd += ") VALUES (";
4302 cmd += fSql->QuoteString(buf);
4303 cmd += ", ";
4304 cmd += fSql->QuoteString(TimeToString(t).c_str());
4305 cmd += "";
4306 cmd += values;
4307 cmd += ");";
4308
4309 if (fSql->IsConnected()) {
// open a transaction if none is counted as open for this schema
4310 if (s->get_transaction_count() == 0)
4311 fSql->OpenTransaction(s->fTableName.c_str());
4312
4314
4315 int status = fSql->Exec(s->fTableName.c_str(), cmd.c_str());
4316
4317 // mh2sql who does not call hs_flush_buffers()
4318 // so we should flush the transaction by hand
4319 // some SQL engines have limited transaction buffers... K.O.
// the return value of CommitTransaction() is not checked here
4320 if (s->get_transaction_count() > 100000) {
4321 //printf("flush table %s\n", table_name);
4322 fSql->CommitTransaction(s->fTableName.c_str());
4324 }
4325
// the Exec() status is examined only after the optional commit above
4326 if (status != DB_SUCCESS) {
4327 return status;
4328 }
4329 } else {
// not connected: hand the statement to ExecDisconnected() instead
4330 int status = fSql->ExecDisconnected(s->fTableName.c_str(), cmd.c_str());
4331 if (status != DB_SUCCESS) {
4332 return status;
4333 }
4334 }
4335
4336 return HS_SUCCESS;
4337}
4338
4340 const int debug,
4341 time_t* last_written)
4342{
4343 if (debug)
4344 printf("SqlHistory::read_last_written: table [%s], timestamp %s\n", fTableName.c_str(), TimeToString(timestamp).c_str());
4345
4346 std::string cmd;
4347 cmd += "SELECT _i_time FROM ";
4348 cmd += fSql->QuoteId(fTableName.c_str());
4349 cmd += " WHERE _i_time < ";
4350 cmd += TimeToString(timestamp);
4351 cmd += " ORDER BY _i_time DESC LIMIT 2;";
4352
4353 int status = fSql->Prepare(fTableName.c_str(), cmd.c_str());
4354
4355 if (status != DB_SUCCESS)
4356 return status;
4357
4358 time_t lw = 0;
4359
4360 /* Loop through the rows in the result-set */
4361
4362 while (1) {
4363 status = fSql->Step();
4364 if (status != DB_SUCCESS)
4365 break;
4366
4367 time_t t = fSql->GetTime(0);
4368
4369 if (t >= timestamp)
4370 continue;
4371
4372 if (t > lw)
4373 lw = t;
4374 }
4375
4376 fSql->Finalize();
4377
4378 *last_written = lw;
4379
4380 if (debug)
4381 printf("SqlHistory::read_last_written: table [%s], timestamp %s, last_written %s\n", fTableName.c_str(), TimeToString(timestamp).c_str(), TimeToString(lw).c_str());
4382
4383 return HS_SUCCESS;
4384}
4385
// Read history data of this schema's SQL table between start_time and end_time.
//
// Issues "SELECT _i_time, <columns> FROM <table> WHERE _i_time>=start and
// _i_time<=end ORDER BY _i_time;" and passes every returned value to
// buffer[i]->Add(t, v) for the corresponding variable i.
//
// start_time, end_time - inclusive time range
// num_var              - number of requested variables
// var_schema_index     - per variable: index into this schema, or a negative
//                        value if the variable is not part of this schema
// var_index            - not used in the lines visible here
// debug                - when non-zero, progress is printed to stdout
// last_time            - per variable: time of the last value delivered so far;
//                        rows older than this are dropped, updated on each Add()
//
// Returns HS_FILE_ERROR if the query cannot be prepared, otherwise HS_SUCCESS.
//
// NOTE(review): this listing has gaps (lines 4391 and 4406 are not visible);
// "buffer" used below is presumably declared on the missing signature line.
4386int HsSqlSchema::read_data(const time_t start_time,
4387 const time_t end_time,
4388 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
4389 const int debug,
4390 std::vector<time_t>& last_time,
4392{
4393 bool bad_last_time = false;
4394
4395 if (debug)
4396 printf("SqlHistory::read_data: table [%s], start %s, end %s\n", fTableName.c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str());
4397
// comma separated list of the columns to select, one entry per variable
// that is present in this schema
4398 std::string collist;
4399
4400 for (int i=0; i<num_var; i++) {
4401 int j = var_schema_index[i];
4402 if (j < 0)
4403 continue;
4404 if (collist.length() > 0)
4405 collist += ", ";
4407 }
4408
4409 std::string cmd;
4410 cmd += "SELECT _i_time, ";
4411 cmd += collist;
4412 cmd += " FROM ";
4413 cmd += fSql->QuoteId(fTableName.c_str());
4414 cmd += " WHERE _i_time>=";
4415 cmd += TimeToString(start_time);
4416 cmd += " and _i_time<=";
4417 cmd += TimeToString(end_time);
4418 cmd += " ORDER BY _i_time;";
4419
4420 int status = fSql->Prepare(fTableName.c_str(), cmd.c_str());
4421
4422 if (status != DB_SUCCESS)
4423 return HS_FILE_ERROR;
4424
4425 /* Loop through the rows in the result-set */
4426
4427 int count = 0;
4428
// the loop ends on the first Step() status other than DB_SUCCESS; the reason
// is not examined further
4429 while (1) {
4430 status = fSql->Step();
4431 if (status != DB_SUCCESS)
4432 break;
4433
4434 count++;
4435
// result column 0 is _i_time
4436 time_t t = fSql->GetTime(0);
4437
4438 if (t < start_time || t > end_time)
4439 continue;
4440
// k counts the selected data columns in the same order as "collist" was
// built; data column k is result column 1+k
4441 int k = 0;
4442
4443 for (int i=0; i<num_var; i++) {
4444 int j = var_schema_index[i];
4445 if (j < 0)
4446 continue;
4447
4448 if (t < last_time[i]) { // protect against duplicate and non-monotonous data
4449 bad_last_time = true;
4450 } else {
4451 double v = fSql->GetDouble(1+k);
4452
4453 //printf("Column %d, index %d, Row %d, time %d, value %f\n", k, colindex[k], count, t, v);
4454
4455 buffer[i]->Add(t, v);
4456 last_time[i] = t;
4457 }
4458
4459 k++;
4460 }
4461 }
4462
4463 fSql->Finalize();
4464
// out-of-order rows are reported once per call, not once per row
4465 if (bad_last_time) {
4466 cm_msg(MERROR, "SqlHistory::read_data", "Detected duplicate or non-monotonous data in table \"%s\" for start time %s and end time %s", fTableName.c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str());
4467 }
4468
4469 if (debug)
4470 printf("SqlHistory::read_data: table [%s], start %s, end %s, read %d rows\n", fTableName.c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), count);
4471
4472 return HS_SUCCESS;
4473}
4474
4476 if (!fSql || fSql->fTransactionPerTable) {
4478 } else {
4479 return gfTransactionCount[fSql];
4480 }
4481}
4482
4484 if (!fSql || fSql->fTransactionPerTable) {
4486 } else {
4488 }
4489}
4490
4498
4500// SQL history functions //
4502
4503static int StartSqlTransaction(SqlBase* sql, const char* table_name, bool* have_transaction)
4504{
4505 if (*have_transaction)
4506 return HS_SUCCESS;
4507
4508 int status = sql->OpenTransaction(table_name);
4509 if (status != DB_SUCCESS)
4510 return HS_FILE_ERROR;
4511
4512 *have_transaction = true;
4513 return HS_SUCCESS;
4514}
4515
// Create the SQL table "table_name" with the two time columns _t_time and
// _i_time, then create one index on each of them.
//
// Returns:
//  - DB_KEY_EXIST (the Exec() status, passed through) if the table already exists
//  - HS_FILE_ERROR if an earlier step, the CREATE TABLE or a CREATE INDEX fails
//  - otherwise the status of the last Exec(), which is DB_SUCCESS at that point
//
// NOTE(review): this listing has gaps (lines 4520, 4528, 4539, 4545, 4550 are
// not visible). The statement that first assigns "status" and the condition
// selecting between the two column definitions are among the missing lines;
// presumably they involve have_transaction and set_default_timestamp - confirm
// against the real source.
4516static int CreateSqlTable(SqlBase* sql, const char* table_name, bool* have_transaction, bool set_default_timestamp = false)
4517{
4518 int status;
4519
4521 if (status != DB_SUCCESS)
4522 return HS_FILE_ERROR;
4523
4524 std::string cmd;
4525
4526 cmd = "CREATE TABLE ";
4527 cmd += sql->QuoteId(table_name);
// two variants of the column definitions: with and without DEFAULT values
4529 cmd += " (_t_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, _i_time INTEGER NOT NULL DEFAULT 0);";
4530 } else {
4531 cmd += " (_t_time TIMESTAMP NOT NULL, _i_time INTEGER NOT NULL);";
4532 }
4533
4534 status = sql->Exec(table_name, cmd.c_str());
4535
4536
// an already existing table is reported to the caller with the raw status
4537 if (status == DB_KEY_EXIST) {
4538 cm_msg(MINFO, "CreateSqlTable", "Adding SQL table \"%s\", but it already exists", table_name);
4540 return status;
4541 }
4542
4543 if (status != DB_SUCCESS) {
4544 cm_msg(MINFO, "CreateSqlTable", "Adding SQL table \"%s\", error status %d", table_name, status);
4546 return HS_FILE_ERROR;
4547 }
4548
4549 cm_msg(MINFO, "CreateSqlTable", "Adding SQL table \"%s\"", table_name);
4551
// index names are derived from the table name
4552 std::string i_index_name;
4553 i_index_name = table_name;
4554 i_index_name += "_i_time_index";
4555
4556 std::string t_index_name;
4557 t_index_name = table_name;
4558 t_index_name += "_t_time_index";
4559
// index on the integer time column
4560 cmd = "CREATE INDEX ";
4561 cmd += sql->QuoteId(i_index_name.c_str());
4562 cmd += " ON ";
4563 cmd += sql->QuoteId(table_name);
4564 cmd += " (_i_time ASC);";
4565
4566 status = sql->Exec(table_name, cmd.c_str());
4567 if (status != DB_SUCCESS)
4568 return HS_FILE_ERROR;
4569
// index on the timestamp column
4570 cmd = "CREATE INDEX ";
4571 cmd += sql->QuoteId(t_index_name.c_str());
4572 cmd += " ON ";
4573 cmd += sql->QuoteId(table_name);
4574 cmd += " (_t_time);";
4575
4576 status = sql->Exec(table_name, cmd.c_str());
4577 if (status != DB_SUCCESS)
4578 return HS_FILE_ERROR;
4579
4580 return status;
4581}
4582
// Create the SQL table "table_name" with the two time columns _t_time
// (TIMESTAMPTZ) and _i_time, convert it with "SELECT create_hypertable(...)"
// on the _t_time column, then create one index on each time column.
// (create_hypertable() is presumably the TimescaleDB function - TODO confirm.)
//
// Returns:
//  - DB_KEY_EXIST (the Exec() status, passed through) if the table already exists
//  - HS_FILE_ERROR if an earlier step, the CREATE TABLE, the hypertable
//    conversion or a CREATE INDEX fails
//  - otherwise the status of the last Exec(), which is DB_SUCCESS at that point
//
// NOTE(review): this listing has gaps (lines 4586, 4600, 4606, 4611, 4622 are
// not visible). The statement that first assigns "status" is among the missing
// lines; presumably it involves have_transaction - confirm against the real
// source.
4583static int CreateSqlHyperTable(SqlBase* sql, const char* table_name, bool* have_transaction) {
4584 int status;
4585
4587 if (status != DB_SUCCESS)
4588 return HS_FILE_ERROR;
4589
4590 std::string cmd;
4591
4592 cmd = "CREATE TABLE ";
4593 cmd += sql->QuoteId(table_name);
4594 cmd += " (_t_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, _i_time INTEGER NOT NULL DEFAULT 0);";
4595
4596 status = sql->Exec(table_name, cmd.c_str());
4597
// an already existing table is reported to the caller with the raw status
4598 if (status == DB_KEY_EXIST) {
4599 cm_msg(MINFO, "CreateSqlHyperTable", "Adding SQL table \"%s\", but it already exists", table_name);
4601 return status;
4602 }
4603
4604 if (status != DB_SUCCESS) {
4605 cm_msg(MINFO, "CreateSqlHyperTable", "Adding SQL table \"%s\", error status %d", table_name, status);
4607 return HS_FILE_ERROR;
4608 }
4609
4610 cm_msg(MINFO, "CreateSqlHyperTable", "Adding SQL table \"%s\"", table_name);
4612
// here the table name is passed as a quoted string value, not as an identifier
4613 cmd = "SELECT create_hypertable(";
4614 cmd += sql->QuoteString(table_name);
4615 cmd += ", '_t_time');";
4616
4617 // convert regular table to hypertable
4618 status = sql->Exec(table_name, cmd.c_str());
4619
4620 if (status != DB_SUCCESS) {
4621 cm_msg(MINFO, "CreateSqlHyperTable", "Converting SQL table to hypertable \"%s\", error status %d", table_name, status);
4623 return HS_FILE_ERROR;
4624 }
4625
// index names are derived from the table name
4626 std::string i_index_name;
4627 i_index_name = table_name;
4628 i_index_name += "_i_time_index";
4629
4630 std::string t_index_name;
4631 t_index_name = table_name;
4632 t_index_name += "_t_time_index";
4633
// index on the integer time column
4634 cmd = "CREATE INDEX ";
4635 cmd += sql->QuoteId(i_index_name.c_str());
4636 cmd += " ON ";
4637 cmd += sql->QuoteId(table_name);
4638 cmd += " (_i_time ASC);";
4639
4640 status = sql->Exec(table_name, cmd.c_str());
4641 if (status != DB_SUCCESS)
4642 return HS_FILE_ERROR;
4643
// index on the timestamp column
4644 cmd = "CREATE INDEX ";
4645 cmd += sql->QuoteId(t_index_name.c_str());
4646 cmd += " ON ";
4647 cmd += sql->QuoteId(table_name);
4648 cmd += " (_t_time);";
4649
4650 status = sql->Exec(table_name, cmd.c_str());
4651 if (status != DB_SUCCESS)
4652 return HS_FILE_ERROR;
4653
4654 return status;
4655}
4656
// Add one column to an existing SQL table with
// "ALTER TABLE <table> ADD COLUMN <column> <type>;".
//
// A transaction is opened through StartSqlTransaction() unless
// *have_transaction says one is already open. Table and column names are
// quoted with sql->QuoteId(); column_type is appended to the statement as is.
//
// Returns the StartSqlTransaction() status if that step fails, otherwise the
// status of sql->Exec(); that status is also logged with cm_msg() at MINFO
// level, whether or not the statement succeeded.
//
// NOTE(review): line 4678 of this listing is not visible.
4657static int CreateSqlColumn(SqlBase* sql, const char* table_name, const char* column_name, const char* column_type, bool* have_transaction, int debug)
4658{
4659 if (debug)
4660 printf("CreateSqlColumn: table [%s], column [%s], type [%s]\n", table_name, column_name, column_type);
4661
4662 int status = StartSqlTransaction(sql, table_name, have_transaction);
4663 if (status != HS_SUCCESS)
4664 return status;
4665
4666 std::string cmd;
4667 cmd = "ALTER TABLE ";
4668 cmd += sql->QuoteId(table_name);
4669 cmd += " ADD COLUMN ";
4670 cmd += sql->QuoteId(column_name);
4671 cmd += " ";
4672 cmd += column_type;
4673 cmd += ";";
4674
4675 status = sql->Exec(table_name, cmd.c_str());
4676
4677 cm_msg(MINFO, "CreateSqlColumn", "Adding column \"%s\" to SQL table \"%s\", status %d", column_name, table_name, status);
4679
4680 return status;
4681}
4682
4684// SQL history base classes //
4686
4688{
4689public:
4691
4693 {
4694 fSql = NULL;
4696 }
4697
4698 virtual ~SqlHistoryBase() // dtor
4699 {
4700 hs_disconnect();
4701 if (fSql)
4702 delete fSql;
4703 fSql = NULL;
4704 }
4705
4707 {
4708 if (fSql)
4709 fSql->fDebug = debug;
4711 }
4712
4713 int hs_connect(const char* connect_string);
4714 int hs_disconnect();
4715 HsSchema* new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]);
4716 int read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp);
4717
4718protected:
4720 virtual int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name) = 0;
4721 virtual int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp) = 0;
4722 virtual int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction) = 0;
4723
4724 int update_schema(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable);
4725 int update_schema1(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable, bool* have_transaction);
4726};
4727
4729{
4730 if (fDebug)
4731 printf("hs_connect [%s]!\n", connect_string);
4732
4733 assert(fSql);
4734
4735 if (fSql->IsConnected())
4736 if (strcmp(fConnectString.c_str(), connect_string) == 0)
4737 return HS_SUCCESS;
4738
4739 hs_disconnect();
4740
4741 if (!connect_string || strlen(connect_string) < 1) {
4742 // FIXME: should use "logger dir" or some such default, that code should be in hs_get_history(), not here
4743 connect_string = ".";
4744 }
4745
4747
4748 if (fDebug)
4749 printf("hs_connect: connecting to SQL database \'%s\'\n", fConnectString.c_str());
4750
4751 int status = fSql->Connect(fConnectString.c_str());
4752 if (status != DB_SUCCESS)
4753 return status;
4754
4755 return HS_SUCCESS;
4756}
4757
4759{
4760 if (fDebug)
4761 printf("hs_disconnect!\n");
4762
4764
4765 fSql->Disconnect();
4766
4768
4769 return HS_SUCCESS;
4770}
4771
// Prepare the SQL schema for writing event "event_name" with the given tags
// and return a schema object for it.
//
// Steps visible in this listing:
//  1. look up the event in fWriterCurrentSchema; if it is not found, call
//     create_table() and look it up again
//  2. update_schema() with write_enable=true
//  3. look up the event again and call update_schema() with write_enable=false
//  4. return a heap-allocated copy of the schema that was found
//
// Returns NULL if any step fails. The returned object is created with "new";
// NOTE(review): ownership presumably passes to the caller - confirm.
//
// NOTE(review): this listing has gaps (lines 4780, 4805, 4828 are not
// visible). Each gap is directly followed by a check of "status" against
// HS_SUCCESS, so each presumably assigns "status" (likely a schema reload) -
// confirm against the real source.
4772HsSchema* SqlHistoryBase::new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[])
4773{
4774 if (fDebug)
4775 printf("SqlHistory::new_event: event [%s], timestamp %s, ntags %d\n", event_name, TimeToString(timestamp).c_str(), ntags);
4776
4777 int status;
4778
// the writer's schema cache is still empty
4779 if (fWriterCurrentSchema.size() == 0) {
4781 if (status != HS_SUCCESS)
4782 return NULL;
4783 }
4784
4785 HsSqlSchema* s = (HsSqlSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
4786
4787 // schema does not exist, the SQL tables probably do not exist yet
4788
4789 if (!s) {
4790 status = create_table(&fWriterCurrentSchema, event_name, timestamp);
4791 if (status != HS_SUCCESS)
4792 return NULL;
4793
4794 s = (HsSqlSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
4795
4796 if (!s) {
4797 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
// same lookup once more with a third argument of 1; its result is discarded
// (presumably the argument enables debug output - TODO confirm)
4798 fWriterCurrentSchema.find_event(event_name, timestamp, 1);
4799 return NULL;
4800 }
4801 }
4802
4803 assert(s != NULL);
4804
4806 if (status != HS_SUCCESS)
4807 return NULL;
4808
// look the schema up again before using it
4809 s = (HsSqlSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
4810
4811 if (!s) {
4812 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot update schema database for event \'%s\', see previous messages", event_name);
4813 return NULL;
4814 }
4815
4816 if (0||fDebug) {
4817 printf("SqlHistory::new_event: schema for [%s] is %p\n", event_name, s);
4818 if (s)
4819 s->print();
4820 }
4821
// first pass: update_schema() with write_enable=true
4822 status = update_schema(s, timestamp, ntags, tags, true);
4823 if (status != HS_SUCCESS) {
4824 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
4825 return NULL;
4826 }
4827
4829 if (status != HS_SUCCESS)
4830 return NULL;
4831
// look the schema up again after the first update_schema() pass
4832 s = (HsSqlSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
4833
4834 if (!s) {
4835 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot update schema database for event \'%s\', see previous messages", event_name);
4836 return NULL;
4837 }
4838
4839 if (0||fDebug) {
4840 printf("SqlHistory::new_event: schema for [%s] is %p\n", event_name, s);
4841 if (s)
4842 s->print();
4843 }
4844
4845 // last call to update_schema() with write_enable "false" will check that new schema matches the new tags
4846
4847 status = update_schema(s, timestamp, ntags, tags, false);
4848 if (status != HS_SUCCESS) {
4849 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
4850 //fDebug = 1;
4851 //update_schema(s, timestamp, ntags, tags, false);
4852 //abort();
4853 return NULL;
4854 }
4855
// hand out a separate copy, not the entry held in fWriterCurrentSchema
4856 HsSqlSchema* e = new HsSqlSchema();
4857
4858 *e = *s; // make a copy of the schema
4859
4860 return e;
4861}
4862
// Load schema information into `sv`.
//
// When event_name is NULL the function returns HS_SUCCESS right after the
// initial guard. Otherwise it calls read_column_names() for every entry of
// `sv` whose event name matches event_name (per MatchEventName()) and which
// has no variables yet.
//
// Returns HS_SUCCESS, or the failing status of the initial guard.
//
// NOTE(review): the guard tests fSchema.size() although the rest of the
// function works on `sv`; the guarded statement that sets `status` is not
// visible in this listing - confirm this is intended.
4863int SqlHistoryBase::read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp)
4864{
4865 if (fDebug)
4866 printf("SqlHistory::read_schema: loading schema for event [%s] at time %s\n", event_name, TimeToString(timestamp).c_str());
4867
4868 int status;
4869
4870 if (fSchema.size() == 0) {
4872 if (status != HS_SUCCESS)
4873 return status;
4874 }
4875
4876 //sv->print(false);
4877
4878 if (event_name == NULL)
4879 return HS_SUCCESS;
4880
4881 for (unsigned i=0; i<sv->size(); i++) {
4882 HsSqlSchema* h = (HsSqlSchema*)(*sv)[i];
4883 // skip schema with already read column names
4884 if (h->fVariables.size() > 0)
4885 continue;
4886 // skip schema with different name
4887 if (!MatchEventName(h->fEventName.c_str(), event_name))
4888 continue;
4889
 // remember the size so that entries added by read_column_names() can be detected
4890 unsigned nn = sv->size();
4891
 // NOTE(review): the returned status is never checked
4892 status = read_column_names(sv, h->fTableName.c_str(), h->fEventName.c_str());
4893
4894 // if new schema was added, loop all over again
 // NOTE(review): the loop's i++ runs after this assignment, so the rescan resumes at index 1, not 0
4895 if (sv->size() != nn)
4896 i=0;
4897 }
4898
4899 //sv->print(false);
4900
4901 return HS_SUCCESS;
4902}
4903
// Transaction wrapper around update_schema1().
//
// Runs update_schema1() and, if that call reports through `have_transaction`
// that a transaction was opened, finishes the transaction. Which call is made
// in the success and in the failure branch is not visible in this listing
// (presumably commit and rollback respectively - TODO confirm).
//
// Returns the status of update_schema1(), or HS_FILE_ERROR when finishing the
// transaction does not yield DB_SUCCESS.
4904int SqlHistoryBase::update_schema(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable)
4905{
4906 int status;
4907 bool have_transaction = false;
4908
4909 status = update_schema1(s, timestamp, ntags, tags, write_enable, &have_transaction);
4910
4911 if (have_transaction) {
4912 int xstatus;
4913
 // the statements assigning xstatus in the two branches are missing from this listing
4914 if (status == HS_SUCCESS)
4916 else
4918
4919 if (xstatus != DB_SUCCESS) {
4920 return HS_FILE_ERROR;
4921 }
4922 have_transaction = false;
4923 }
4924
4925 return status;
4926}
4927
// Compare schema `s` against tags[] and, when write_enable is set, record the
// changes needed to make them agree.
//
// Pass 1 - for every tag element (array tags are expanded to "name[j]"):
//   * count the active columns whose variable name equals the tag name and
//     whose SQL type is compatible; the first match gets fOffsets[] set
//   * a column with the right name but an incompatible type marks the schema
//     as not ok and, with write_enable, is deactivated via update_column(..., false)
//   * if no column matched: with write_enable, a column whose stored tag_name
//     matches and whose type is compatible is reactivated via
//     update_column(..., true); otherwise a new column is created (up to 20
//     attempts with modified names) and registered via update_column(..., true)
//   * more than one match is reported as an error
// Pass 2 - every column with a non-empty variable name that matches no tag
// marks the schema as not ok and, with write_enable, is deactivated.
//
// Returns HS_SUCCESS, the failing status of a helper, or HS_FILE_ERROR when
// write_enable is false and any mismatch was found.
// `have_transaction` is passed through to update_column().
//
// NOTE(review): several lines are missing from this listing (for example the
// statement that follows `s->fOffsets[j] = offset;`, the statement that
// creates the new column and sets `status`, and whatever uses `now`), so the
// description above covers only what is visible.
4928int SqlHistoryBase::update_schema1(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable, bool* have_transaction)
4929{
4930 int status;
4931
4932 if (fDebug)
4933 printf("update_schema1\n");
4934
4935 // check that compare schema with tags[]
4936
4937 bool schema_ok = true;
4938
4939 int offset = 0;
4940 for (int i=0; i<ntags; i++) {
4941 for (unsigned int j=0; j<tags[i].n_data; j++) {
4942 int tagtype = tags[i].type;
4943 std::string tagname = tags[i].name;
4944 std::string maybe_colname = MidasNameToSqlName(tags[i].name);
4945
 // array tags: one entry per element, "name[j]" as tag name and "<sqlname>_j" as candidate column name
 // NOTE(review): the local buffer `s` shadows the schema parameter `s` inside this block
4946 if (tags[i].n_data > 1) {
4947 char s[256];
4948 sprintf(s, "[%d]", j);
4949 tagname += s;
4950
4951 sprintf(s, "_%d", j);
4952 maybe_colname += s;
4953 }
4954
4955 int count = 0;
4956
 // NOTE(review): this `j` shadows the array-element index `j` of the enclosing loop
4957 for (unsigned j=0; j<s->fVariables.size(); j++) {
4958 // NB: inactive columns will be reactivated or recreated by the if(count==0) branch. K.O.
4959 if (s->fColumnInactive[j])
4960 continue;
4961 if (tagname == s->fVariables[j].name) {
4962 if (s->fSql->TypesCompatible(tagtype, s->fColumnTypes[j].c_str())) {
4963 if (count == 0) {
4964 s->fOffsets[j] = offset;
4966 }
4967 count++;
4968 if (count > 1) {
4969 cm_msg(MERROR, "SqlHistory::update_schema", "Duplicate SQL column \'%s\' type \'%s\' in table \"%s\" with MIDAS type \'%s\' history event \"%s\" tag \"%s\"", s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fTableName.c_str(), rpc_tid_name(tagtype), s->fEventName.c_str(), tagname.c_str());
4971 }
4972 } else {
4973 // column with incompatible type, mark it as unused
4974 schema_ok = false;
4975 if (fDebug)
4976 printf("Incompatible column!\n");
4977 if (write_enable) {
4978 cm_msg(MINFO, "SqlHistory::update_schema", "Deactivating SQL column \'%s\' type \'%s\' in table \"%s\" as incompatible with MIDAS type \'%s\' history event \"%s\" tag \"%s\"", s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fTableName.c_str(), rpc_tid_name(tagtype), s->fEventName.c_str(), tagname.c_str());
4980
 // NOTE(review): tag_type is read from fVariables[i] (tag index) while every other argument uses the column index j - looks like a typo for fVariables[j]; confirm
4981 status = update_column(s->fEventName.c_str(), s->fTableName.c_str(), s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fVariables[j].tag_name.c_str(), s->fVariables[i].tag_type.c_str(), timestamp, false, have_transaction);
4982 if (status != HS_SUCCESS)
4983 return status;
4984 }
4985 }
4986 }
4987 }
4988
4989 if (count == 0) {
4990 // tag does not have a corresponding column
4991 schema_ok = false;
4992 if (fDebug)
4993 printf("No column for tag %s!\n", tagname.c_str());
4994
4995 bool found_column = false;
4996
 // try to reactivate an existing column: this search compares the stored tag_name and does not skip inactive columns
4997 if (write_enable) {
4998 for (unsigned j=0; j<s->fVariables.size(); j++) {
4999 if (tagname == s->fVariables[j].tag_name) {
5000 bool typeok = s->fSql->TypesCompatible(tagtype, s->fColumnTypes[j].c_str());
5001 if (typeok) {
5002 cm_msg(MINFO, "SqlHistory::update_schema", "Reactivating SQL column \'%s\' type \'%s\' in table \"%s\" for history event \"%s\" tag \"%s\"", s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fTableName.c_str(), s->fEventName.c_str(), tagname.c_str());
5004
5005 status = update_column(s->fEventName.c_str(), s->fTableName.c_str(), s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fVariables[j].tag_name.c_str(), s->fVariables[j].tag_type.c_str(), timestamp, true, have_transaction);
5006 if (status != HS_SUCCESS)
5007 return status;
5008
5009 if (count == 0) {
5010 s->fOffsets[j] = offset;
5012 }
5013 count++;
5014 found_column = true;
5015 if (count > 1) {
5016 cm_msg(MERROR, "SqlHistory::update_schema", "Duplicate SQL column \'%s\' type \'%s\' in table \"%s\" for history event \"%s\" tag \"%s\"", s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fTableName.c_str(), s->fEventName.c_str(), tagname.c_str());
5018 }
5019 }
5020 }
5021 }
5022 }
5023
5024 // create column
5025 if (!found_column && write_enable) {
5026 std::string col_name = maybe_colname;
5027 const char* col_type = s->fSql->ColumnType(tagtype);
5028
 // is the candidate column name already present in this schema?
5029 bool dupe = false;
5030 for (unsigned kk=0; kk<s->fColumnNames.size(); kk++)
5031 if (s->fColumnNames[kk] == col_name) {
5032 dupe = true;
5033 break;
5034 }
5035
 // `now` is not used in the visible lines; presumably consumed by a line missing from this listing
5036 time_t now = time(NULL);
5037
 // at most 20 attempts; an attempt is repeated when `status` is DB_KEY_EXIST
5038 bool retry = false;
5039 for (int t=0; t<20; t++) {
5040
5041 // if duplicate column name, change it, try again
5042 if (dupe || retry) {
5044 col_name += "_";
5046 if (t > 0) {
5047 char s[256];
5048 sprintf(s, "_%d", t);
5049 col_name += s;
5050 }
5051 }
5052
5053 if (fDebug)
5054 printf("SqlHistory::update_schema: table [%s], add column [%s] type [%s] for tag [%s]\n", s->fTableName.c_str(), col_name.c_str(), col_type, tagname.c_str());
5055
 // the statement that creates the column and sets `status` is not visible in this listing
5057
5058 if (status == DB_KEY_EXIST) {
5059 if (fDebug)
5060 printf("SqlHistory::update_schema: table [%s], add column [%s] type [%s] for tag [%s] failed: duplicate column name\n", s->fTableName.c_str(), col_name.c_str(), col_type, tagname.c_str());
5061 retry = true;
5062 continue;
5063 }
5064
5065 if (status != HS_SUCCESS)
5066 return status;
5067
5068 break;
5069 }
5070
 // reached with a non-success status only when every attempt ended in DB_KEY_EXIST
5071 if (status != HS_SUCCESS)
5072 return status;
5073
5074 status = update_column(s->fEventName.c_str(), s->fTableName.c_str(), col_name.c_str(), col_type, tagname.c_str(), rpc_tid_name(tagtype), timestamp, true, have_transaction);
5075 if (status != HS_SUCCESS)
5076 return status;
5077 }
5078 }
5079
5080 if (count > 1) {
5081 // schema has duplicate tags
5082 schema_ok = false;
5083 cm_msg(MERROR, "SqlHistory::update_schema", "Duplicate tags or SQL columns for history event \"%s\" tag \"%s\"", s->fEventName.c_str(), tagname.c_str());
5085 }
5086 }
5087 }
5088
5089 // mark as unused all columns not listed in tags
5090
5091 for (unsigned k=0; k<s->fColumnNames.size(); k++)
5092 if (s->fVariables[k].name.length() > 0) {
5093 bool found = false;
5094
5095 for (int i=0; i<ntags; i++) {
5096 for (unsigned int j=0; j<tags[i].n_data; j++) {
5097 std::string tagname = tags[i].name;
5098
5099 if (tags[i].n_data > 1) {
5100 char s[256];
5101 sprintf(s, "[%d]", j);
5102 tagname += s;
5103 }
5104
5105 if (s->fVariables[k].name == tagname) {
5106 found = true;
5107 break;
5108 }
5109 }
5110
5111 if (found)
5112 break;
5113 }
5114
5115 if (!found) {
5116 // column not found in tags list
5117 schema_ok = false;
5118 if (fDebug)
5119 printf("Event [%s] Column [%s] tag [%s] not listed in tags list!\n", s->fEventName.c_str(), s->fColumnNames[k].c_str(), s->fVariables[k].name.c_str());
5120 if (write_enable) {
5121 cm_msg(MINFO, "SqlHistory::update_schema", "Deactivating SQL column \'%s\' type \'%s\' in table \"%s\" for history event \"%s\" not used for any tags", s->fColumnNames[k].c_str(), s->fColumnTypes[k].c_str(), s->fTableName.c_str(), s->fEventName.c_str());
5123
5124 status = update_column(s->fEventName.c_str(), s->fTableName.c_str(), s->fColumnNames[k].c_str(), s->fColumnTypes[k].c_str(), s->fVariables[k].tag_name.c_str(), s->fVariables[k].tag_type.c_str(), timestamp, false, have_transaction);
5125 if (status != HS_SUCCESS)
5126 return status;
5127 }
5128 }
5129 }
5130
 // in check-only mode any mismatch found above is an error
5131 if (!write_enable)
5132 if (!schema_ok) {
5133 if (fDebug)
5134 printf("Return error!\n");
5135 return HS_FILE_ERROR;
5136 }
5137
5138 return HS_SUCCESS;
5139}
5140
5142// SQLITE functions //
5144
// Read the rows of the per-table index table "_event_name_<table_name>" and
// add one new HsSqlSchema to `sv` for every row returned.
//
// Returns the status of sql->Prepare() if it is not DB_SUCCESS, otherwise
// HS_SUCCESS. The loop ends on the first Step() status other than DB_SUCCESS,
// and the status of Finalize() is not checked.
//
// NOTE(review): two lines of the schema initialisation are missing from this
// listing; presumably they store xevent_name and xevent_time in the new
// schema - TODO confirm.
5145static int ReadSqliteTableNames(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug)
5146{
5147 if (debug)
5148 printf("ReadSqliteTableNames: table [%s]\n", table_name);
5149
5150 int status;
5151 std::string cmd;
5152
5153 // FIXME: quotes
 // NOTE(review): table_name is spliced into the statement between hand-written single quotes, without a quoting helper
5154 cmd = "SELECT event_name, _i_time FROM \'_event_name_";
5155 cmd += table_name;
5156 cmd += "\' WHERE table_name='";
5157 cmd += table_name;
5158 cmd += "';";
5159
5160 status = sql->Prepare(table_name, cmd.c_str());
5161
5162 if (status != DB_SUCCESS)
5163 return status;
5164
5165 while (1) {
5166 status = sql->Step();
5167
5168 if (status != DB_SUCCESS)
5169 break;
5170
 // result columns follow the SELECT list: 0 = event_name, 1 = _i_time
5171 std::string xevent_name = sql->GetText(0);
5172 time_t xevent_time = sql->GetTime(1);
5173
5174 //printf("read event name [%s] time %s\n", xevent_name.c_str(), TimeToString(xevent_time).c_str());
5175
5176 HsSqlSchema* s = new HsSqlSchema;
5177 s->fSql = sql;
5180 s->fTimeTo = 0;
5181 s->fTableName = table_name;
5182 sv->add(s);
5183 }
5184
5185 status = sql->Finalize();
5186
5187 return HS_SUCCESS;
5188}
5189
5190static int ReadSqliteTableSchema(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug)
5191{
5192 if (debug)
5193 printf("ReadSqliteTableSchema: table [%s]\n", table_name);
5194
5195 if (1) {
5196 // seed schema with table names
5197 HsSqlSchema* s = new HsSqlSchema;
5198 s->fSql = sql;
5199 s->fEventName = table_name;
5200 s->fTimeFrom = 0;
5201 s->fTimeTo = 0;
5202 s->fTableName = table_name;
5203 sv->add(s);
5204 }
5205
5206 return ReadSqliteTableNames(sql, sv, table_name, debug);
5207}
5208
5210// SQLITE history classes //
5212
5214{
5215public:
5216 SqliteHistory() { // ctor
5217#ifdef HAVE_SQLITE
5218 fSql = new Sqlite();
5219#endif
5220 }
5221
5223 int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name);
5224 int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp);
5225 int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction);
5226};
5227
5229{
5230 int status;
5231
5232 if (fDebug)
5233 printf("SqliteHistory::read_table_and_event_names!\n");
5234
5235 // loop over all tables
5236
5237 std::vector<std::string> tables;
5239 if (status != DB_SUCCESS)
5240 return status;
5241
5242 for (unsigned i=0; i<tables.size(); i++) {
5243 const char* table_name = tables[i].c_str();
5244
5245 const char* s;
5246 s = strstr(table_name, "_event_name_");
5247 if (s == table_name)
5248 continue;
5249 s = strstr(table_name, "_column_names_");
5250 if (s == table_name)
5251 continue;
5252
5253 status = ReadSqliteTableSchema(fSql, sv, table_name, fDebug);
5254 }
5255
5256 return HS_SUCCESS;
5257}
5258
// Fill in column and tag information for every schema in `sv` that belongs to
// SQL table `table_name`.
//
// Step 1: list the table's columns and append every column not yet known to
//         a schema (the time columns "_t_time" and "_i_time" are skipped).
// Step 2: read "_column_names_<table_name>" ordered by _i_time and copy tag
//         name and type into the matching columns of the schemas selected by
//         the time comparison in the loop below.
//
// Returns the status of Prepare() if it is not DB_SUCCESS, otherwise
// HS_SUCCESS. The results of ListColumns() and Finalize() are not checked.
// `event_name` is only used in the debug printout.
//
// NOTE(review): the declaration of `se` and the statement defining
// `schema_time` are missing from this listing (schema_time presumably comes
// from result column 3, _i_time - TODO confirm).
5259int SqliteHistory::read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name)
5260{
5261 if (fDebug)
5262 printf("SqliteHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
5263
5264 // for all schema for table_name, prepopulate is with column names
5265
5266 std::vector<std::string> columns;
5267 fSql->ListColumns(table_name, &columns);
5268
5269 // first, populate column names
5270
5271 for (unsigned i=0; i<sv->size(); i++) {
5272 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5273
5274 if (s->fTableName != table_name)
5275 continue;
5276
5277 // schema should be empty at this point
5278 //assert(s->fVariables.size() == 0);
5279
 // `columns` is consumed as (name, type) pairs
 // NOTE(review): columns[j+1] is read without checking that the list has an even number of entries
5280 for (unsigned j=0; j<columns.size(); j+=2) {
5281 const char* cn = columns[j+0].c_str();
5282 const char* ct = columns[j+1].c_str();
5283
5284 if (strcmp(cn, "_t_time") == 0)
5285 continue;
5286 if (strcmp(cn, "_i_time") == 0)
5287 continue;
5288
5289 bool found = false;
5290
5291 for (unsigned k=0; k<s->fColumnNames.size(); k++) {
5292 if (s->fColumnNames[k] == cn) {
5293 found = true;
5294 break;
5295 }
5296 }
5297
5298 //printf("column [%s] sql type [%s]\n", cn.c_str(), ct);
5299
 // new column: the per-column vectors are extended together so they stay index-aligned
5300 if (!found) {
5302 se.name = cn;
5303 se.type = 0;
5304 se.n_data = 1;
5305 se.n_bytes = 0;
5306 s->fVariables.push_back(se);
5307 s->fColumnNames.push_back(cn);
5308 s->fColumnTypes.push_back(ct);
5309 s->fColumnInactive.push_back(false);
5310 s->fOffsets.push_back(-1);
5311 }
5312 }
5313 }
5314
5315 // then read column name information
5316
5317 std::string tn;
5318 tn += "_column_names_";
5319 tn += table_name;
5320
5321 std::string cmd;
5322 cmd = "SELECT column_name, tag_name, tag_type, _i_time FROM ";
5323 cmd += fSql->QuoteId(tn.c_str());
5324 cmd += " WHERE table_name=";
5325 cmd += fSql->QuoteString(table_name);
5326 cmd += " ORDER BY _i_time ASC;";
5327
5328 int status = fSql->Prepare(table_name, cmd.c_str());
5329
5330 if (status != DB_SUCCESS) {
5331 return status;
5332 }
5333
5334 while (1) {
5335 status = fSql->Step();
5336
5337 if (status != DB_SUCCESS)
5338 break;
5339
5340 // NOTE: SQL "SELECT ORDER BY _i_time ASC" returns data sorted by time
5341 // in this code we use the data from the last data row
5342 // so if multiple rows are present, the latest one is used
5343
5344 std::string col_name = fSql->GetText(0);
5345 std::string tag_name = fSql->GetText(1);
5346 std::string tag_type = fSql->GetText(2);
5348
5349 //printf("read table [%s] column [%s] tag name [%s] time %s\n", table_name, col_name.c_str(), tag_name.c_str(), TimeToString(xxx_time).c_str());
5350
5351 // make sure a schema exists at this time point
5352 NewSqlSchema(sv, table_name, schema_time);
5353
5354 // add this information to all schema
5355
5356 for (unsigned i=0; i<sv->size(); i++) {
5357 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5358 if (s->fTableName != table_name)
5359 continue;
 // schemas whose fTimeFrom is earlier than this row's time are left untouched
5360 if (s->fTimeFrom < schema_time)
5361 continue;
5362
5363 //printf("add column to schema %d\n", s->fTimeFrom);
5364
5365 for (unsigned j=0; j<s->fColumnNames.size(); j++) {
5366 if (col_name != s->fColumnNames[j])
5367 continue;
5368 s->fVariables[j].name = tag_name;
5369 s->fVariables[j].type = rpc_name_tid(tag_type.c_str());
5370 s->fVariables[j].n_data = 1;
5371 s->fVariables[j].n_bytes = rpc_tid_size(s->fVariables[j].type);
5372 }
5373 }
5374 }
5375
5376 status = fSql->Finalize();
5377
5378 return HS_SUCCESS;
5379}
5380
5381int SqliteHistory::create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp)
5382{
5383 if (fDebug)
5384 printf("SqliteHistory::create_table: event [%s], timestamp %s\n", event_name, TimeToString(timestamp).c_str());
5385
5386 int status;
5387 bool have_transaction = false;
5388 std::string table_name = MidasNameToSqlName(event_name);
5389
5390 // FIXME: what about duplicate table names?
5391 status = CreateSqlTable(fSql, table_name.c_str(), &have_transaction);
5392
5393 //if (status == DB_KEY_EXIST) {
5394 // return ReadSqliteTableSchema(fSql, sv, table_name.c_str(), fDebug);
5395 //}
5396
5397 if (status != HS_SUCCESS) {
5398 // FIXME: ???
5399 // FIXME: at least close or revert the transaction
5400 return status;
5401 }
5402
5403 std::string cmd;
5404
5405 std::string en;
5406 en += "_event_name_";
5407 en += table_name;
5408
5409 cmd = "CREATE TABLE ";
5410 cmd += fSql->QuoteId(en.c_str());
5411 cmd += " (table_name TEXT NOT NULL, event_name TEXT NOT NULL, _i_time INTEGER NOT NULL);";
5412
5413 status = fSql->Exec(table_name.c_str(), cmd.c_str());
5414
5415 cmd = "INSERT INTO ";
5416 cmd += fSql->QuoteId(en.c_str());
5417 cmd += " (table_name, event_name, _i_time) VALUES (";
5418 cmd += fSql->QuoteString(table_name.c_str());
5419 cmd += ", ";
5420 cmd += fSql->QuoteString(event_name);
5421 cmd += ", ";
5422 cmd += fSql->QuoteString(TimeToString(timestamp).c_str());
5423 cmd += ");";
5424
5425 status = fSql->Exec(table_name.c_str(), cmd.c_str());
5426
5427 std::string cn;
5428 cn += "_column_names_";
5429 cn += table_name;
5430
5431 cmd = "CREATE TABLE ";
5432 cmd += fSql->QuoteId(cn.c_str());
5433 cmd += " (table_name TEXT NOT NULL, column_name TEXT NOT NULL, tag_name TEXT NOT NULL, tag_type TEXT NOT NULL, column_type TEXT NOT NULL, _i_time INTEGER NOT NULL);";
5434
5435 status = fSql->Exec(table_name.c_str(), cmd.c_str());
5436
5437 status = fSql->CommitTransaction(table_name.c_str());
5438 if (status != DB_SUCCESS) {
5439 return HS_FILE_ERROR;
5440 }
5441
5442 return ReadSqliteTableSchema(fSql, sv, table_name.c_str(), fDebug);
5443}
5444
5445int SqliteHistory::update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction)
5446{
5447 if (fDebug)
5448 printf("SqliteHistory::update_column: event [%s], table [%s], column [%s], new name [%s], timestamp %s\n", event_name, table_name, column_name, tag_name, TimeToString(timestamp).c_str());
5449
5450 int status = StartSqlTransaction(fSql, table_name, have_transaction);
5451 if (status != HS_SUCCESS)
5452 return status;
5453
5454 // FIXME: quotes
5455 std::string cmd;
5456 cmd = "INSERT INTO \'_column_names_";
5457 cmd += table_name;
5458 cmd += "\' (table_name, column_name, tag_name, tag_type, column_type, _i_time) VALUES (\'";
5459 cmd += table_name;
5460 cmd += "\', \'";
5461 cmd += column_name;
5462 cmd += "\', \'";
5463 cmd += tag_name;
5464 cmd += "\', \'";
5465 cmd += tag_type;
5466 cmd += "\', \'";
5467 cmd += column_type;
5468 cmd += "\', \'";
5469 cmd += TimeToString(timestamp);
5470 cmd += "\');";
5471 status = fSql->Exec(table_name, cmd.c_str());
5472
5473 return status;
5474}
5475
5477// Mysql history classes //
5479
5481{
5482public:
5483 MysqlHistory() { // ctor
5484#ifdef HAVE_MYSQL
5485 fSql = new Mysql();
5486#endif
5487 }
5488
5490 int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name);
5491 int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp);
5492 int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction);
5493};
5494
// Read (event name, table name, timestamp) rows from the "_history_index"
// table and add one new HsSqlSchema to `sv` for every row returned.
//
// With a non-NULL table_name only rows for that table are selected; with a
// NULL table_name all rows with a non-empty table_name are selected.
//
// Returns the status of Prepare() if it is not DB_SUCCESS, HS_FILE_ERROR from
// the "missing from the history index" branch, otherwise HS_SUCCESS.
//
// NOTE(review): many lines of this function are missing from this listing:
// the conditions guarding the must_have_* check inside the loop, most of the
// schema initialisation, and the condition plus two statements of the error
// branch after the loop. The comments below describe only what is visible.
5495static int ReadMysqlTableNames(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug, const char* must_have_event_name, const char* must_have_table_name)
5496{
5497 if (debug)
5498 printf("ReadMysqlTableNames: table [%s], must have event [%s] table [%s]\n", table_name, must_have_event_name, must_have_table_name);
5499
5500 int status;
5501 std::string cmd;
5502
 // NOTE(review): table_name is spliced into the statement between hand-written single quotes, without a quoting helper
5503 if (table_name) {
5504 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name='";
5505 cmd += table_name;
5506 cmd += "';";
5507 } else {
5508 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name!='';";
 // from here on table_name is only passed to Prepare() and to the disabled printout
5509 table_name = "_history_index";
5510 }
5511
5512 status = sql->Prepare(table_name, cmd.c_str());
5513
5514 if (status != DB_SUCCESS)
5515 return status;
5516
5517 bool found_must_have_table = false;
5518 int count = 0;
5519
5520 while (1) {
5521 status = sql->Step();
5522
5523 if (status != DB_SUCCESS)
5524 break;
5525
 // result columns follow the SELECT list: 0 = event_name, 1 = table_name, 2 = itimestamp
5526 const char* xevent_name = sql->GetText(0);
5527 const char* xtable_name = sql->GetText(1);
5528 time_t xevent_time = sql->GetTime(2);
5529
5530 if (debug == 999) {
5531 printf("entry %d event name [%s] table name [%s] time %s\n", count, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
5532 }
5533
 // the two `if` conditions opening this section are missing from this listing
5535 assert(must_have_event_name != NULL);
5537 found_must_have_table = true;
5538 //printf("Found table [%s]: event name [%s] table name [%s] time %s\n", must_have_table_name, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
5539 } else {
5540 //printf("Found correct table [%s] with wrong event name [%s] expected [%s] time %s\n", must_have_table_name, xevent_name, must_have_event_name, TimeToString(xevent_time).c_str());
5541 }
5542 }
5543
 // one schema per index row; part of its initialisation is missing from this listing
5544 HsSqlSchema* s = new HsSqlSchema;
5545 s->fSql = sql;
5548 s->fTimeTo = 0;
5550 sv->add(s);
5551 count++;
5552 }
5553
5554 status = sql->Finalize();
5555
 // the `if` condition opening this error branch is missing from this listing
5557 cm_msg(MERROR, "ReadMysqlTableNames", "Error: Table [%s] for event [%s] missing from the history index\n", must_have_table_name, must_have_event_name);
5558 if (debug == 999)
5559 return HS_FILE_ERROR;
5560 // NB: recursion is broken by setting debug to 999.
5562 cm_msg(MERROR, "ReadMysqlTableNames", "Error: Cannot continue, nothing will work after this error\n");
 // NOTE(review): abort() terminates the process, so the return below is not reached
5564 abort();
5565 return HS_FILE_ERROR;
5566 }
5567
5568 if (0) {
5569 // print accumulated schema
5570 printf("ReadMysqlTableNames: table_name [%s] event_name [%s] table_name [%s]\n", table_name, must_have_event_name, must_have_table_name);
5571 sv->print(false);
5572 }
5573
5574 return HS_SUCCESS;
5575}
5576
// Fill in column and tag information for every schema in `sv` that belongs to
// SQL table `table_name`.
//
// Step 1: list the table's columns and append every column not yet known to
//         a schema (the time columns "_t_time" and "_i_time" are skipped).
// Step 2: read the "_history_index" rows of `event_name` and copy tag name,
//         tag type and the active flag into the matching columns of the
//         schemas selected by the time comparison in the loop below. Inactive
//         columns get an empty variable name and fColumnInactive set.
//
// Returns the status of Prepare() if it is not DB_SUCCESS, otherwise
// HS_SUCCESS. The results of ListColumns() and Finalize() are not checked.
//
// NOTE(review): the declaration of `se`, the statement defining `schema_time`
// and the statement that follows the "doctor column names" comment are
// missing from this listing.
5577int MysqlHistory::read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name)
5578{
5579 if (fDebug)
5580 printf("MysqlHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
5581
5582 // for all schema for table_name, prepopulate is with column names
5583
5584 std::vector<std::string> columns;
5585 fSql->ListColumns(table_name, &columns);
5586
5587 // first, populate column names
5588
5589 for (unsigned i=0; i<sv->size(); i++) {
5590 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5591
5592 if (s->fTableName != table_name)
5593 continue;
5594
5595 // schema should be empty at this point
5596 //assert(s->fVariables.size() == 0);
5597
 // `columns` is consumed as (name, type) pairs
 // NOTE(review): columns[j+1] is read without checking that the list has an even number of entries
5598 for (unsigned j=0; j<columns.size(); j+=2) {
5599 const char* cn = columns[j+0].c_str();
5600 const char* ct = columns[j+1].c_str();
5601
5602 if (strcmp(cn, "_t_time") == 0)
5603 continue;
5604 if (strcmp(cn, "_i_time") == 0)
5605 continue;
5606
5607 bool found = false;
5608
5609 for (unsigned k=0; k<s->fColumnNames.size(); k++) {
5610 if (s->fColumnNames[k] == cn) {
5611 found = true;
5612 break;
5613 }
5614 }
5615
5616 //printf("column [%s] sql type [%s]\n", cn.c_str(), ct);
5617
 // new column: the per-column vectors are extended together so they stay index-aligned
5618 if (!found) {
5620 se.tag_name = cn;
5621 se.tag_type = "";
5622 se.name = cn;
5623 se.type = 0;
5624 se.n_data = 1;
5625 se.n_bytes = 0;
5626 s->fVariables.push_back(se);
5627 s->fColumnNames.push_back(cn);
5628 s->fColumnTypes.push_back(ct);
5629 s->fColumnInactive.push_back(false);
5630 s->fOffsets.push_back(-1);
5631 }
5632 }
5633 }
5634
5635 // then read column name information
5636
5637 std::string cmd;
5638 cmd = "SELECT column_name, column_type, tag_name, tag_type, itimestamp, active FROM _history_index WHERE event_name=";
5639 cmd += fSql->QuoteString(event_name);
5640 cmd += ";";
5641
5642 int status = fSql->Prepare(table_name, cmd.c_str());
5643
5644 if (status != DB_SUCCESS) {
5645 return status;
5646 }
5647
5648 while (1) {
5649 status = fSql->Step();
5650
5651 if (status != DB_SUCCESS)
5652 break;
5653
5654 const char* col_name = fSql->GetText(0);
5655 const char* col_type = fSql->GetText(1);
5656 const char* tag_name = fSql->GetText(2);
5657 const char* tag_type = fSql->GetText(3);
 // NOTE(review): col_name and tag_name are NULL-checked below, which suggests GetText() can return NULL, yet `active` is passed to atoi() (and tag_type later to rpc_name_tid()) without such a check - confirm
5659 const char* active = fSql->GetText(5);
5660 int iactive = atoi(active);
5661
5662 //printf("read table [%s] column [%s] type [%s] tag name [%s] type [%s] time %s active [%s] %d\n", table_name, col_name, col_type, tag_name, tag_type, TimeToString(schema_time).c_str(), active, iactive);
5663
 // skip index rows that do not describe a column
5664 if (!col_name)
5665 continue;
5666 if (!tag_name)
5667 continue;
5668 if (strlen(col_name) < 1)
5669 continue;
5670
5671 // make sure a schema exists at this time point
5672 NewSqlSchema(sv, table_name, schema_time);
5673
5674 // add this information to all schema
5675
5676 for (unsigned i=0; i<sv->size(); i++) {
5677 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5678 if (s->fTableName != table_name)
5679 continue;
 // schemas whose fTimeFrom is earlier than this row's time are left untouched
5680 if (s->fTimeFrom < schema_time)
5681 continue;
5682
5683 int tid = rpc_name_tid(tag_type);
5684 int tid_size = rpc_tid_size(tid);
5685
5686 for (unsigned j=0; j<s->fColumnNames.size(); j++) {
5687 if (col_name != s->fColumnNames[j])
5688 continue;
5689
5690 s->fVariables[j].tag_name = tag_name;
5691 s->fVariables[j].tag_type = tag_type;
 // an inactive column keeps tag_name/tag_type but loses its variable name
5692 if (!iactive) {
5693 s->fVariables[j].name = "";
5694 s->fColumnInactive[j] = true;
5695 } else {
5696 s->fVariables[j].name = tag_name;
5697 s->fColumnInactive[j] = false;
5698 }
5699 s->fVariables[j].type = tid;
5700 s->fVariables[j].n_data = 1;
5701 s->fVariables[j].n_bytes = tid_size;
5702
5703 // doctor column names in case MySQL returns different type
5704 // from the type used to create the column, but the types
5705 // are actually the same. K.O.
5707 }
5708 }
5709 }
5710
5711 status = fSql->Finalize();
5712
5713 return HS_SUCCESS;
5714}
5715
// Disabled code: everything between #if 0 and #endif is excluded from compilation.
5716#if 0
// Add a seed schema for `table_name` to `sv` (event name equal to the table
// name, both time limits zero), then load the table's index entries through
// ReadMysqlTableNames() and return its status.
5717static int ReadMysqlTableSchema(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug)
5718{
5719 if (debug)
5720 printf("ReadMysqlTableSchema: table [%s]\n", table_name);
5721
5722 if (1) {
5723 // seed schema with table names
5724 HsSqlSchema* s = new HsSqlSchema;
5725 s->fSql = sql;
5726 s->fEventName = table_name;
5727 s->fTimeFrom = 0;
5728 s->fTimeTo = 0;
5729 s->fTableName = table_name;
5730 sv->add(s);
5731 }
5732
5733 return ReadMysqlTableNames(sql, sv, table_name, debug, NULL, NULL);
5734}
5735#endif
5736
5738{
5739 int status;
5740
5741 if (fDebug)
5742 printf("MysqlHistory::read_table_and_event_names!\n");
5743
5744 // loop over all tables
5745
5746 std::vector<std::string> tables;
5748 if (status != DB_SUCCESS)
5749 return status;
5750
5751 for (unsigned i=0; i<tables.size(); i++) {
5752 const char* table_name = tables[i].c_str();
5753
5754 const char* s;
5755 s = strstr(table_name, "_history_index");
5756 if (s == table_name)
5757 continue;
5758
5759 if (1) {
5760 // seed schema with table names
5761 HsSqlSchema* s = new HsSqlSchema;
5762 s->fSql = fSql;
5763 s->fEventName = table_name;
5764 s->fTimeFrom = 0;
5765 s->fTimeTo = 0;
5766 s->fTableName = table_name;
5767 sv->add(s);
5768 }
5769 }
5770
5771 if (0) {
5772 // print accumulated schema
5773 printf("read_table_and_event_names:\n");
5774 sv->print(false);
5775 }
5776
5778
5779 return HS_SUCCESS;
5780}
5781
// Create the SQL table for history event `event_name` and register it in the
// "_history_index" table.
//
// The table name is derived from the event name via MidasNameToSqlName() and
// shortened when longer than 40 characters. Up to `max_attempts` attempts are
// made, each inside its own transaction; an attempt is repeated with a
// modified table name when `status` is DB_KEY_EXIST.
//
// Returns the status of ReadMysqlTableNames() for the new table on success,
// HS_FILE_ERROR when a transaction cannot be opened or committed or when all
// attempts are used up. Any other failure to create the table calls abort().
//
// NOTE(review): the statement that creates the table and sets `status`, and
// one statement that extends the retry name (presumably using `now`), are
// missing from this listing.
5782int MysqlHistory::create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp)
5783{
5784 if (fDebug)
5785 printf("MysqlHistory::create_table: event [%s], timestamp %s\n", event_name, TimeToString(timestamp).c_str());
5786
5787 int status;
5788 std::string table_name = MidasNameToSqlName(event_name);
5789
5790 // MySQL table name length limit is 64 bytes
 // truncation to 40 leaves room for the suffixes appended below
5791 if (table_name.length() > 40) {
5792 table_name.resize(40);
5793 table_name += "_T";
5794 }
5795
 // `now` is not used in the visible lines; presumably consumed by a line missing from this listing
5796 time_t now = time(NULL);
5797
5798 int max_attempts = 10;
5799 for (int i=0; i<max_attempts; i++) {
5800 status = fSql->OpenTransaction(table_name.c_str());
5801 if (status != DB_SUCCESS) {
5802 return HS_FILE_ERROR;
5803 }
5804
5805 bool have_transaction = true;
5806
5807 std::string xtable_name = table_name;
5808
 // retries use a modified name: a suffix from the second attempt on, plus the attempt number from the third
5809 if (i>0) {
5810 xtable_name += "_";
5812 if (i>1) {
5813 xtable_name += "_";
5814 char buf[256];
5815 sprintf(buf, "%d", i);
5816 xtable_name += buf;
5817 }
5818 }
5819
 // the statement that creates the table and sets `status` is not visible in this listing
5821
5822 //printf("event [%s] create table [%s] status %d\n", event_name, xtable_name.c_str(), status);
5823
5824 if (status == DB_KEY_EXIST) {
5825 // already exists, try with different name!
5826 fSql->RollbackTransaction(table_name.c_str());
5827 continue;
5828 }
5829
5830 if (status != HS_SUCCESS) {
5831 // MYSQL cannot roll back "create table", if we cannot create SQL tables, nothing will work. Give up now.
5832 cm_msg(MERROR, "MysqlHistory::create_table", "Could not create table [%s] for event [%s], timestamp %s, please fix the SQL database configuration and try again", table_name.c_str(), event_name, TimeToString(timestamp).c_str());
5833 abort();
5834
 // NOTE(review): not reached, abort() above terminates the process
5835 // fatal error, give up!
5836 fSql->RollbackTransaction(table_name.c_str());
5837 break;
5838 }
5839
 // register the table in the index; if the INSERT fails, create the index table and its columns and try once more
5840 for (int j=0; j<2; j++) {
5841 std::string cmd;
5842 cmd += "INSERT INTO _history_index (event_name, table_name, itimestamp, active) VALUES (";
5843 cmd += fSql->QuoteString(event_name);
5844 cmd += ", ";
5845 cmd += fSql->QuoteString(xtable_name.c_str());
5846 cmd += ", ";
5847 char buf[256];
5848 sprintf(buf, "%.0f", (double)timestamp);
5849 cmd += fSql->QuoteString(buf);
5850 cmd += ", ";
5851 cmd += fSql->QuoteString("1");
5852 cmd += ");";
5853
 // NOTE(review): this `status` shadows the function-level one, so a failure of the second INSERT is not seen after the loop
5854 int status = fSql->Exec(table_name.c_str(), cmd.c_str());
5855 if (status == DB_SUCCESS)
5856 break;
5857
 // NOTE(review): each result below is overwritten by the next call without being checked
5858 status = CreateSqlTable(fSql, "_history_index", &have_transaction);
5859 status = CreateSqlColumn(fSql, "_history_index", "event_name", "varchar(256) character set binary not null", &have_transaction, fDebug);
5860 status = CreateSqlColumn(fSql, "_history_index", "table_name", "varchar(256)", &have_transaction, fDebug);
5861 status = CreateSqlColumn(fSql, "_history_index", "tag_name", "varchar(256) character set binary", &have_transaction, fDebug);
5862 status = CreateSqlColumn(fSql, "_history_index", "tag_type", "varchar(256)", &have_transaction, fDebug);
5863 status = CreateSqlColumn(fSql, "_history_index", "column_name", "varchar(256)", &have_transaction, fDebug);
5864 status = CreateSqlColumn(fSql, "_history_index", "column_type", "varchar(256)", &have_transaction, fDebug);
5865 status = CreateSqlColumn(fSql, "_history_index", "itimestamp", "integer not null", &have_transaction, fDebug);
5866 status = CreateSqlColumn(fSql, "_history_index", "active", "boolean", &have_transaction, fDebug);
5867 }
5868
5869 status = fSql->CommitTransaction(table_name.c_str());
5870
5871 if (status != DB_SUCCESS) {
5872 return HS_FILE_ERROR;
5873 }
5874
 // reload the index for the new table and require that it lists this event
5875 return ReadMysqlTableNames(fSql, sv, xtable_name.c_str(), fDebug, event_name, xtable_name.c_str());
5876 }
5877
5878 cm_msg(MERROR, "MysqlHistory::create_table", "Could not create table [%s] for event [%s], timestamp %s, after %d attempts", table_name.c_str(), event_name, TimeToString(timestamp).c_str(), max_attempts);
5879
5880 return HS_FILE_ERROR;
5881}
5882
5883int MysqlHistory::update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction)
5884{
5885 if (fDebug)
5886 printf("MysqlHistory::update_column: event [%s], table [%s], column [%s], type [%s] new name [%s], timestamp %s\n", event_name, table_name, column_name, column_type, tag_name, TimeToString(timestamp).c_str());
5887
5888 std::string cmd;
5889 cmd += "INSERT INTO _history_index (event_name, table_name, tag_name, tag_type, column_name, column_type, itimestamp, active) VALUES (";
5890 cmd += fSql->QuoteString(event_name);
5891 cmd += ", ";
5892 cmd += fSql->QuoteString(table_name);
5893 cmd += ", ";
5894 cmd += fSql->QuoteString(tag_name);
5895 cmd += ", ";
5896 cmd += fSql->QuoteString(tag_type);
5897 cmd += ", ";
5898 cmd += fSql->QuoteString(column_name);
5899 cmd += ", ";
5900 cmd += fSql->QuoteString(column_type);
5901 cmd += ", ";
5902 char buf[256];
5903 sprintf(buf, "%.0f", (double)timestamp);
5904 cmd += fSql->QuoteString(buf);
5905 cmd += ", ";
5906 if (active)
5907 cmd += fSql->QuoteString("1");
5908 else
5909 cmd += fSql->QuoteString("0");
5910 cmd += ");";
5911
5912 int status = fSql->Exec(table_name, cmd.c_str());
5913 if (status != DB_SUCCESS)
5914 return HS_FILE_ERROR;
5915
5916 return HS_SUCCESS;
5917}
5918
5920// PostgreSQL history classes //
5922
5923#ifdef HAVE_PGSQL
5924
5925class PgsqlHistory: public SqlHistoryBase
5926{
5927public:
5928 Pgsql *fPgsql = NULL;
5929public:
5930 PgsqlHistory() { // ctor
5931 fPgsql = new Pgsql();
5932 fSql = fPgsql;
5933 }
5934
5936 int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name);
5937 int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp);
5938 int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction);
5939};
5940
5941static int ReadPgsqlTableNames(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug, const char* must_have_event_name, const char* must_have_table_name)
5942{
5943 if (debug)
5944 printf("ReadPgsqlTableNames: table [%s], must have event [%s] table [%s]\n", table_name, must_have_event_name, must_have_table_name);
5945
5946 int status;
5947 std::string cmd;
5948
5949 if (table_name) {
5950 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name='";
5951 cmd += table_name;
5952 cmd += "';";
5953 } else {
5954 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name!='';";
5955 table_name = "_history_index";
5956 }
5957
5958 status = sql->Prepare(table_name, cmd.c_str());
5959
5960 if (status != DB_SUCCESS)
5961 return status;
5962
5963 bool found_must_have_table = false;
5964 int count = 0;
5965
5966 while (1) {
5967 status = sql->Step();
5968
5969 if (status != DB_SUCCESS)
5970 break;
5971
5972 const char* xevent_name = sql->GetText(0);
5973 const char* xtable_name = sql->GetText(1);
5974 time_t xevent_time = sql->GetTime(2);
5975
5976 if (debug == 999) {
5977 printf("entry %d event name [%s] table name [%s] time %s\n", count, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
5978 }
5979
5981 assert(must_have_event_name != NULL);
5983 found_must_have_table = true;
5984 //printf("Found table [%s]: event name [%s] table name [%s] time %s\n", must_have_table_name, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
5985 } else {
5986 //printf("Found correct table [%s] with wrong event name [%s] expected [%s] time %s\n", must_have_table_name, xevent_name, must_have_event_name, TimeToString(xevent_time).c_str());
5987 }
5988 }
5989
5990 HsSqlSchema* s = new HsSqlSchema;
5991 s->fSql = sql;
5994 s->fTimeTo = 0;
5996 sv->add(s);
5997 count++;
5998 }
5999
6000 status = sql->Finalize();
6001
6003 cm_msg(MERROR, "ReadPgsqlTableNames", "Error: Table [%s] for event [%s] missing from the history index\n", must_have_table_name, must_have_event_name);
6004 if (debug == 999)
6005 return HS_FILE_ERROR;
6006 // NB: recursion is broken by setting debug to 999.
6008 cm_msg(MERROR, "ReadPgsqlTableNames", "Error: Cannot continue, nothing will work after this error\n");
6010 abort();
6011 return HS_FILE_ERROR;
6012 }
6013
6014 if (0) {
6015 // print accumulated schema
6016 printf("ReadPgsqlTableNames: table_name [%s] event_name [%s] table_name [%s]\n", table_name, must_have_event_name, must_have_table_name);
6017 sv->print(false);
6018 }
6019
6020 return HS_SUCCESS;
6021}
6022
6023int PgsqlHistory::read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name)
6024{
6025 if (fDebug)
6026 printf("PgsqlHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
6027
6028 // for all schema for table_name, prepopulate it with column names
6029
6030 std::vector<std::string> columns;
6031 fSql->ListColumns(table_name, &columns);
6032
6033 // first, populate column names
6034
6035 for (unsigned i=0; i<sv->size(); i++) {
6036 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
6037
6038 if (s->fTableName != table_name)
6039 continue;
6040
6041 // schema should be empty at this point
6042 //assert(s->fVariables.size() == 0);
6043
6044 for (unsigned j=0; j<columns.size(); j+=2) {
6045 const char* cn = columns[j+0].c_str();
6046 const char* ct = columns[j+1].c_str();
6047
6048 if (strcmp(cn, "_t_time") == 0)
6049 continue;
6050 if (strcmp(cn, "_i_time") == 0)
6051 continue;
6052
6053 bool found = false;
6054
6055 for (unsigned k=0; k<s->fColumnNames.size(); k++) {
6056 if (s->fColumnNames[k] == cn) {
6057 found = true;
6058 break;
6059 }
6060 }
6061
6062 if (!found) {
6064 se.tag_name = cn;
6065 se.tag_type = "";
6066 se.name = cn;
6067 se.type = 0;
6068 se.n_data = 1;
6069 se.n_bytes = 0;
6070 s->fVariables.push_back(se);
6071 s->fColumnNames.push_back(cn);
6072 s->fColumnTypes.push_back(ct);
6073 s->fColumnInactive.push_back(false);
6074 s->fOffsets.push_back(-1);
6075 }
6076 }
6077 }
6078
6079 // then read column name information
6080
6081 std::string cmd;
6082 cmd = "SELECT column_name, column_type, tag_name, tag_type, itimestamp, active FROM _history_index WHERE event_name=";
6083 cmd += fSql->QuoteString(event_name);
6084 cmd += ";";
6085
6086 int status = fSql->Prepare(table_name, cmd.c_str());
6087
6088 if (status != DB_SUCCESS) {
6089 return status;
6090 }
6091
6092 while (1) {
6093 status = fSql->Step();
6094
6095 if (status != DB_SUCCESS)
6096 break;
6097
6098 const char* col_name = fSql->GetText(0);
6099 const char* col_type = fSql->GetText(1);
6100 const char* tag_name = fSql->GetText(2);
6101 const char* tag_type = fSql->GetText(3);
6102 time_t schema_time = fSql->GetTime(4);
6103 const char* active = fSql->GetText(5);
6104 int iactive = atoi(active);
6105
6106 //printf("read table [%s] column [%s] type [%s] tag name [%s] type [%s] time %s active [%s] %d\n", table_name, col_name, col_type, tag_name, tag_type, TimeToString(schema_time).c_str(), active, iactive);
6107
6108 if (!col_name)
6109 continue;
6110 if (!tag_name)
6111 continue;
6112 if (strlen(col_name) < 1)
6113 continue;
6114
6115 // make sure a schema exists at this time point
6116 NewSqlSchema(sv, table_name, schema_time);
6117
6118 // add this information to all schema
6119 for (unsigned i=0; i<sv->size(); i++) {
6120 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
6121 if (s->fTableName != table_name)
6122 continue;
6123 if (s->fTimeFrom < schema_time)
6124 continue;
6125
6126 int tid = rpc_name_tid(tag_type);
6127 int tid_size = rpc_tid_size(tid);
6128
6129 for (unsigned j=0; j<s->fColumnNames.size(); j++) {
6130 if (col_name != s->fColumnNames[j])
6131 continue;
6132
6133 s->fVariables[j].tag_name = tag_name;
6134 s->fVariables[j].tag_type = tag_type;
6135 if (!iactive) {
6136 s->fVariables[j].name = "";
6137 s->fColumnInactive[j] = true;
6138 } else {
6139 s->fVariables[j].name = tag_name;
6140 s->fColumnInactive[j] = false;
6141 }
6142 s->fVariables[j].type = tid;
6143 s->fVariables[j].n_data = 1;
6144 s->fVariables[j].n_bytes = tid_size;
6145
6146 // doctor column names in case MySQL returns different type
6147 // from the type used to create the column, but the types
6148 // are actually the same. K.O.
6150 }
6151 }
6152 }
6153
6154 status = fSql->Finalize();
6155
6156 return HS_SUCCESS;
6157}
6158
6159int PgsqlHistory::read_table_and_event_names(HsSchemaVector *sv)
6160{
6161 int status;
6162
6163 if (fDebug)
6164 printf("PgsqlHistory::read_table_and_event_names!\n");
6165
6166 // loop over all tables
6167
6168 std::vector<std::string> tables;
6169 status = fSql->ListTables(&tables);
6170 if (status != DB_SUCCESS)
6171 return status;
6172
6173 for (unsigned i=0; i<tables.size(); i++) {
6174 const char* table_name = tables[i].c_str();
6175
6176 const char* s;
6177 s = strstr(table_name, "_history_index");
6178 if (s == table_name)
6179 continue;
6180
6181 if (1) {
6182 // seed schema with table names
6183 HsSqlSchema* s = new HsSqlSchema;
6184 s->fSql = fSql;
6185 s->fEventName = table_name;
6186 s->fTimeFrom = 0;
6187 s->fTimeTo = 0;
6188 s->fTableName = table_name;
6189 sv->add(s);
6190 }
6191 }
6192
6193 if (0) {
6194 // print accumulated schema
6195 printf("read_table_and_event_names:\n");
6196 sv->print(false);
6197 }
6198
6199 status = ReadPgsqlTableNames(fSql, sv, NULL, fDebug, NULL, NULL);
6200
6201 return HS_SUCCESS;
6202}
6203
6204int PgsqlHistory::create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp)
6205{
6206 if (fDebug)
6207 printf("PgsqlHistory::create_table: event [%s], timestamp %s\n", event_name, TimeToString(timestamp).c_str());
6208
6209 int status;
6210 std::string table_name = MidasNameToSqlName(event_name);
6211
6212 // PostgreSQL identifiers are limited to 63 bytes by default (NAMEDATALEN-1); truncate the name to 40 characters before suffixes are appended
6213 if (table_name.length() > 40) {
6214 table_name.resize(40);
6215 table_name += "_T";
6216 }
6217
6218 time_t now = time(NULL);
6219
6220 int max_attempts = 10;
6221 for (int i=0; i<max_attempts; i++) {
6222 status = fSql->OpenTransaction(table_name.c_str());
6223 if (status != DB_SUCCESS) {
6224 return HS_FILE_ERROR;
6225 }
6226
6227 bool have_transaction = true;
6228
6229 std::string xtable_name = table_name;
6230
6231 if (i>0) {
6232 xtable_name += "_";
6234 if (i>1) {
6235 xtable_name += "_";
6236 char buf[256];
6237 sprintf(buf, "%d", i);
6238 xtable_name += buf;
6239 }
6240 }
6241
6242 if (fPgsql->fDownsample)
6244 else
6246
6247 //printf("event [%s] create table [%s] status %d\n", event_name, xtable_name.c_str(), status);
6248
6249 if (status == DB_KEY_EXIST) {
6250 // already exists, try with different name!
6251 fSql->RollbackTransaction(table_name.c_str());
6252 continue;
6253 }
6254
6255 if (status != HS_SUCCESS) {
6256 fSql->RollbackTransaction(table_name.c_str());
6257 continue;
6258 }
6259
6260 fSql->Exec(table_name.c_str(), "SAVEPOINT t0");
6261
6262 for (int j=0; j<2; j++) {
6263 std::string cmd;
6264 cmd += "INSERT INTO _history_index (event_name, table_name, itimestamp, active) VALUES (";
6265 cmd += fSql->QuoteString(event_name);
6266 cmd += ", ";
6267 cmd += fSql->QuoteString(xtable_name.c_str());
6268 cmd += ", ";
6269 char buf[256];
6270 sprintf(buf, "%.0f", (double)timestamp);
6271 cmd += buf;
6272 cmd += ", ";
6273 cmd += fSql->QuoteString("1");
6274 cmd += ");";
6275
6276 int status = fSql->Exec(table_name.c_str(), cmd.c_str());
6277 if (status == DB_SUCCESS)
6278 break;
6279
6280 // if the INSERT failed because _history_index does not exist, roll back to savepoint t0
6281 // so that the whole transaction is not aborted
6282 fSql->Exec(table_name.c_str(), "ROLLBACK TO SAVEPOINT t0");
6283
6284 status = CreateSqlTable(fSql, "_history_index", &have_transaction, true);
6285 status = CreateSqlColumn(fSql, "_history_index", "event_name", "text not null", &have_transaction, fDebug);
6286 status = CreateSqlColumn(fSql, "_history_index", "table_name", "text", &have_transaction, fDebug);
6287 status = CreateSqlColumn(fSql, "_history_index", "tag_name", "text", &have_transaction, fDebug);
6288 status = CreateSqlColumn(fSql, "_history_index", "tag_type", "text", &have_transaction, fDebug);
6289 status = CreateSqlColumn(fSql, "_history_index", "column_name", "text", &have_transaction, fDebug);
6290 status = CreateSqlColumn(fSql, "_history_index", "column_type", "text", &have_transaction, fDebug);
6291 status = CreateSqlColumn(fSql, "_history_index", "itimestamp", "integer not null", &have_transaction, fDebug);
6292 status = CreateSqlColumn(fSql, "_history_index", "active", "smallint", &have_transaction, fDebug);
6293
6294 status = fSql->CommitTransaction(table_name.c_str());
6295 }
6296
6297 if (status != DB_SUCCESS) {
6298 return HS_FILE_ERROR;
6299 }
6300
6301 return ReadPgsqlTableNames(fSql, sv, xtable_name.c_str(), fDebug, event_name, xtable_name.c_str());
6302 }
6303
6304 cm_msg(MERROR, "PgsqlHistory::create_table", "Could not create table [%s] for event [%s], timestamp %s, after %d attempts", table_name.c_str(), event_name, TimeToString(timestamp).c_str(), max_attempts);
6305
6306 return HS_FILE_ERROR;
6307}
6308
6309int PgsqlHistory::update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction)
6310{
6311 if (fDebug)
6312 printf("PgsqlHistory::update_column: event [%s], table [%s], column [%s], type [%s] new name [%s], timestamp %s\n", event_name, table_name, column_name, column_type, tag_name, TimeToString(timestamp).c_str());
6313
6314 std::string cmd;
6315 cmd += "INSERT INTO _history_index (event_name, table_name, tag_name, tag_type, column_name, column_type, itimestamp, active) VALUES (";
6316 cmd += fSql->QuoteString(event_name);
6317 cmd += ", ";
6318 cmd += fSql->QuoteString(table_name);
6319 cmd += ", ";
6320 cmd += fSql->QuoteString(tag_name);
6321 cmd += ", ";
6322 cmd += fSql->QuoteString(tag_type);
6323 cmd += ", ";
6324 cmd += fSql->QuoteString(column_name);
6325 cmd += ", ";
6326 cmd += fSql->QuoteString(column_type);
6327 cmd += ", ";
6328 char buf[256];
6329 sprintf(buf, "%.0f", (double)timestamp);
6330 cmd += buf;
6331 cmd += ", ";
6332 if (active)
6333 cmd += fSql->QuoteString("1");
6334 else
6335 cmd += fSql->QuoteString("0");
6336 cmd += ");";
6337
6338 int status = fSql->Exec(table_name, cmd.c_str());
6339 if (status != DB_SUCCESS)
6340 return HS_FILE_ERROR;
6341
6342 return HS_SUCCESS;
6343}
6344
6345#endif // HAVE_PGSQL
6346
6348// File history class //
6350
// Time spans in time_t units: one day as 24*60*60 (presumably seconds,
// i.e. time_t counting seconds - confirm for the target platform) and a
// nominal 30-day month. kMonth is used to report the age of a history
// file in months.
6351const time_t kDay = 24*60*60;
6352const time_t kMonth = 30*kDay;
6353
// Binary size multipliers: KiB = 1024, MiB = 1024*1024. Kept as double;
// MiB is used for the maximum history file size and for printing file
// sizes in MiBytes.
6354const double KiB = 1024;
6355const double MiB = KiB*KiB;
6356//const double GiB = KiB*MiB;
6357
6359{
6360protected:
6361 std::string fPath;
6363 std::vector<std::string> fSortedFiles;
6364 std::vector<bool> fSortedRead;
6367
6368public:
6369 FileHistory() // ctor
6370 {
6372 fConfMaxFileSize = 100*MiB;
6373
6374 fPathLastMtime = 0;
6375 }
6376
6377 int hs_connect(const char* connect_string);
6378 int hs_disconnect();
6379 int hs_clear_cache();
6380 int read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp);
6381 HsSchema* new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]);
6382
6383protected:
6384 int create_file(const char* event_name, time_t timestamp, int ntags, const TAG tags[], std::string* filenamep);
6385 HsFileSchema* read_file_schema(const char* filename);
6386 int read_file_list(bool *pchanged);
6387 void clear_file_list();
6388};
6389
6391{
6392 if (fDebug)
6393 printf("hs_connect [%s]!\n", connect_string);
6394
6395 hs_disconnect();
6396
6399
6400 // add trailing '/'
6401 if (fPath.length() > 0) {
6402 if (fPath[fPath.length()-1] != DIR_SEPARATOR)
6404 }
6405
6406 return HS_SUCCESS;
6407}
6408
6410{
6411 if (fDebug)
6412 printf("FileHistory::hs_clear_cache!\n");
6413 fPathLastMtime = 0;
6415}
6416
6418{
6419 if (fDebug)
6420 printf("FileHistory::hs_disconnect!\n");
6421
6424
6425 return HS_SUCCESS;
6426}
6427
6429{
6430 fPathLastMtime = 0;
6431 fSortedFiles.clear();
6432 fSortedRead.clear();
6433}
6434
6436{
6437 int status;
6438 double start_time = ss_time_sec();
6439
6440 if (pchanged)
6441 *pchanged = false;
6442
6443 struct stat stat_buf;
6444 status = stat(fPath.c_str(), &stat_buf);
6445 if (status != 0) {
6446 cm_msg(MERROR, "FileHistory::read_file_list", "Cannot stat(%s), errno %d (%s)", fPath.c_str(), errno, strerror(errno));
6447 return HS_FILE_ERROR;
6448 }
6449
6450 //printf("dir [%s], mtime: %d %d last: %d %d, mtime %s", fPath.c_str(), stat_buf.st_mtimespec.tv_sec, stat_buf.st_mtimespec.tv_nsec, last_mtimespec.tv_sec, last_mtimespec.tv_nsec, ctime(&stat_buf.st_mtimespec.tv_sec));
6451
6452 if (stat_buf.st_mtime == fPathLastMtime) {
6453 if (fDebug)
6454 printf("FileHistory::read_file_list: history directory \"%s\" mtime %d did not change\n", fPath.c_str(), int(stat_buf.st_mtime));
6455 return HS_SUCCESS;
6456 }
6457
6458 fPathLastMtime = stat_buf.st_mtime;
6459
6460 if (fDebug)
6461 printf("FileHistory::read_file_list: reading list of history files in \"%s\"\n", fPath.c_str());
6462
6463 std::vector<std::string> flist;
6464
6465 ss_file_find(fPath.c_str(), "mhf_*.dat", &flist);
6466
6467 double ls_time = ss_time_sec();
6468 double ls_elapsed = ls_time - start_time;
6469 if (ls_elapsed > 5.000) {
6470 cm_msg(MINFO, "FileHistory::read_file_list", "\"ls -l\" of \"%s\" took %.1f sec", fPath.c_str(), ls_elapsed);
6472 }
6473
6474 // note: reverse iterator is used to sort filenames by time, newest first
6475 std::sort(flist.rbegin(), flist.rend());
6476
6477#if 0
6478 {
6479 printf("file names sorted by time:\n");
6480 for (unsigned i=0; i<flist.size(); i++) {
6481 printf("%d: %s\n", i, flist[i].c_str());
6482 }
6483 }
6484#endif
6485
6486 std::vector<bool> fread;
6487 fread.resize(flist.size()); // fill with "false"
6488
6489 // loop over the old list of files,
6490 // for files we already read, loop over new file
6491 // list and mark the same file as read. K.O.
6492 for (size_t j=0; j<fSortedFiles.size(); j++) {
6493 if (fSortedRead[j]) {
6494 for (size_t i=0; i<flist.size(); i++) {
6495 if (flist[i] == fSortedFiles[j]) {
6496 fread[i] = true;
6497 break;
6498 }
6499 }
6500 }
6501 }
6502
6505
6506 if (pchanged)
6507 *pchanged = true;
6508
6509 return HS_SUCCESS;
6510}
6511
6512int FileHistory::read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp)
6513{
6514 if (fDebug)
6515 printf("FileHistory::read_schema: event [%s] at time %s\n", event_name, TimeToString(timestamp).c_str());
6516
6517 if (fSchema.size() == 0) {
6518 if (fDebug)
6519 printf("FileHistory::read_schema: schema is empty, do a full reload from disk\n");
6521 }
6522
6524 DWORD old_timeout = 0;
6527
6528 bool changed = false;
6529
6531
6532 if (status != HS_SUCCESS) {
6534 return status;
6535 }
6536
6537 if (!changed) {
6538 if ((*sv).find_event(event_name, timestamp)) {
6539 if (fDebug)
6540 printf("FileHistory::read_schema: event [%s] at time %s, no new history files, already have this schema\n", event_name, TimeToString(timestamp).c_str());
6542 return HS_SUCCESS;
6543 }
6544 }
6545
6546 double start_time = ss_time_sec();
6547
6548 int count_read = 0;
6549
6550 for (unsigned i=0; i<fSortedFiles.size(); i++) {
6551 std::string file_name = fPath + fSortedFiles[i];
6552 if (fSortedRead[i])
6553 continue;
6554 //bool dupe = false;
6555 //for (unsigned ss=0; ss<sv->size(); ss++) {
6556 // HsFileSchema* ssp = (HsFileSchema*)(*sv)[ss];
6557 // if (file_name == ssp->fFileName) {
6558 // dupe = true;
6559 // break;
6560 // }
6561 //}
6562 //if (dupe)
6563 // continue;
6564 fSortedRead[i] = true;
6566 if (!s)
6567 continue;
6568 sv->add(s);
6569 count_read++;
6570
6571 if (event_name) {
6572 if (s->fEventName == event_name) {
6573 //printf("file %s event_name %s time %s, age %f\n", file_name.c_str(), s->fEventName.c_str(), TimeToString(s->fTimeFrom).c_str(), double(timestamp - s->fTimeFrom));
6574 if (s->fTimeFrom <= timestamp) {
6575 // this file is older than the time requested,
6576 // subsequent files will be even older,
6577 // we can stop reading here.
6578 break;
6579 }
6580 }
6581 }
6582 }
6583
6584 double end_time = ss_time_sec();
6585 double read_elapsed = end_time - start_time;
6586 if (read_elapsed > 5.000) {
6587 cm_msg(MINFO, "FileHistory::read_schema", "Loading schema for event \"%s\" timestamp %s, reading %d history files took %.1f sec", event_name, TimeToString(timestamp).c_str(), count_read, read_elapsed);
6589 }
6590
6592
6593 return HS_SUCCESS;
6594}
6595
6596HsSchema* FileHistory::new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[])
6597{
6598 if (fDebug)
6599 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d\n", event_name, TimeToString(timestamp).c_str(), ntags);
6600
6601 int status;
6602
6603 HsFileSchema* s = (HsFileSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
6604
6605 if (!s) {
6606 //printf("hs_define_event: no schema for event %s\n", event_name);
6607 status = read_schema(&fWriterCurrentSchema, event_name, timestamp);
6608 if (status != HS_SUCCESS)
6609 return NULL;
6610 s = (HsFileSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
6611 } else {
6612 //printf("hs_define_event: already have schema for event %s\n", s->fEventName.c_str());
6613 }
6614
6615 bool xdebug = false;
6616
6617 if (s) { // is existing schema the same as new schema?
6618 bool same = true;
6619
6620 if (same)
6621 if (s->fEventName != event_name) {
6622 if (xdebug)
6623 printf("AAA: [%s] [%s]!\n", s->fEventName.c_str(), event_name);
6624 same = false;
6625 }
6626
6627 if (same)
6628 if (s->fVariables.size() != (unsigned)ntags) {
6629 if (xdebug)
6630 printf("BBB: event [%s]: ntags: %d -> %d!\n", event_name, (int)s->fVariables.size(), ntags);
6631 same = false;
6632 }
6633
6634 if (same)
6635 for (unsigned i=0; i<s->fVariables.size(); i++) {
6636 if (s->fVariables[i].name != tags[i].name) {
6637 if (xdebug)
6638 printf("CCC: event [%s] index %d: name [%s] -> [%s]!\n", event_name, i, s->fVariables[i].name.c_str(), tags[i].name);
6639 same = false;
6640 }
6641 if (s->fVariables[i].type != (int)tags[i].type) {
6642 if (xdebug)
6643 printf("DDD: event [%s] index %d: type %d -> %d!\n", event_name, i, s->fVariables[i].type, tags[i].type);
6644 same = false;
6645 }
6646 if (s->fVariables[i].n_data != (int)tags[i].n_data) {
6647 if (xdebug)
6648 printf("EEE: event [%s] index %d: n_data %d -> %d!\n", event_name, i, s->fVariables[i].n_data, tags[i].n_data);
6649 same = false;
6650 }
6651 if (!same)
6652 break;
6653 }
6654
6655 if (!same) {
6656 if (xdebug) {
6657 printf("*** Schema for event %s has changed!\n", event_name);
6658
6659 printf("*** Old schema for event [%s] time %s:\n", event_name, TimeToString(timestamp).c_str());
6660 s->print();
6661 printf("*** New tags:\n");
6662 PrintTags(ntags, tags);
6663 }
6664
6665 if (fDebug)
6666 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d: schema mismatch, starting a new file.\n", event_name, TimeToString(timestamp).c_str(), ntags);
6667
6668 s = NULL;
6669 }
6670 }
6671
6672 if (s) {
6673 // maybe this schema is too old - rotate files every so often
6674 time_t age = timestamp - s->fTimeFrom;
6675 //printf("*** age %s (%.1f months), timestamp %s, time_from %s, file %s\n", TimeToString(age).c_str(), (double)age/(double)kMonth, TimeToString(timestamp).c_str(), TimeToString(s->fTimeFrom).c_str(), s->fFileName.c_str());
6676 if (age > fConfMaxFileAge) {
6677 if (fDebug)
6678 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d: schema is too old, age %.1f months, starting a new file.\n", event_name, TimeToString(timestamp).c_str(), ntags, (double)age/(double)kMonth);
6679
6680 // force creation of a new file
6681 s = NULL;
6682 }
6683 }
6684
6685 if (s) {
6686 // maybe this file is too big - rotate files to limit maximum size
6687 double size = ss_file_size(s->fFileName.c_str());
6688 //printf("*** size %.0f, file %s\n", size, s->fFileName.c_str());
6689 if (size > fConfMaxFileSize) {
6690 if (fDebug)
6691 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d: file too big, size %.1f MiBytes, starting a new file.\n", event_name, TimeToString(timestamp).c_str(), ntags, size/MiB);
6692
6693 // force creation of a new file
6694 s = NULL;
6695 }
6696 }
6697
6698 if (!s) {
6699 std::string filename;
6700
6701 status = create_file(event_name, timestamp, ntags, tags, &filename);
6702 if (status != HS_SUCCESS)
6703 return NULL;
6704
6705 HsFileSchema* ss = read_file_schema(filename.c_str());
6706 if (!ss) {
6707 cm_msg(MERROR, "FileHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
6708 return NULL;
6709 }
6710
6712
6713 s = (HsFileSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
6714
6715 if (!s) {
6716 cm_msg(MERROR, "FileHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
6717 return NULL;
6718 }
6719
6720 if (xdebug) {
6721 printf("*** New schema for event [%s] time %s:\n", event_name, TimeToString(timestamp).c_str());
6722 s->print();
6723 }
6724 }
6725
6726 assert(s != NULL);
6727
6728#if 0
6729 {
6730 printf("schema for [%s] is %p\n", event_name, s);
6731 if (s)
6732 s->print();
6733 }
6734#endif
6735
6736 HsFileSchema* e = new HsFileSchema();
6737
6738 *e = *s; // make a copy of the schema
6739
6740 return e;
6741}
6742
6743int FileHistory::create_file(const char* event_name, time_t timestamp, int ntags, const TAG tags[], std::string* filenamep)
6744{
6745 if (fDebug)
6746 printf("FileHistory::create_file: event [%s]\n", event_name);
6747
6748 // NB: file names are constructed in such a way
6749 // that when sorted lexicographically ("ls -1 | sort")
6750 // they *also* become sorted by time
6751
6752 struct tm tm;
6753 localtime_r(&timestamp, &tm); // somebody must call tzset() before this.
6754
6755 char buf[256];
6756 strftime(buf, sizeof(buf), "%Y%m%d", &tm);
6757
6758 std::string filename;
6759 filename += fPath;
6760 filename += "mhf_";
6761 filename += TimeToString(timestamp);
6762 filename += "_";
6763 filename += buf;
6764 filename += "_";
6765 filename += MidasNameToFileName(event_name);
6766
6767 std::string try_filename = filename + ".dat";
6768
6769 FILE *fp = NULL;
6770 for (int itry=0; itry<10; itry++) {
6771 if (itry > 0) {
6772 char s[256];
6773 sprintf(s, "_%d", rand());
6774 try_filename = filename + s + ".dat";
6775 }
6776
6777 fp = fopen(try_filename.c_str(), "r");
6778 if (fp != NULL) {
6779 // this file already exists, try with a different name
6780 fclose(fp);
6781 continue;
6782 }
6783
6784 fp = fopen(try_filename.c_str(), "w");
6785 if (fp == NULL) {
6786 // somehow cannot create this file, try again
6787 cm_msg(MERROR, "FileHistory::create_file", "Error: Cannot create file \'%s\' for event \'%s\', fopen() errno %d (%s)", try_filename.c_str(), event_name, errno, strerror(errno));
6788 continue;
6789 }
6790
6791 // file opened
6792 break;
6793 }
6794
6795 if (fp == NULL) {
6796 // somehow cannot create any file, whine!
6797 cm_msg(MERROR, "FileHistory::create_file", "Error: Cannot create file \'%s\' for event \'%s\'", filename.c_str(), event_name);
6798 return HS_FILE_ERROR;
6799 }
6800
6801 std::string ss;
6802
6803 ss += "version: 2.0\n";
6804 ss += "event_name: ";
6805 ss += event_name;
6806 ss += "\n";
6807 ss += "time: ";
6808 ss += TimeToString(timestamp);
6809 ss += "\n";
6810
6811 int recsize = 0;
6812
6813 ss += "tag: /DWORD 1 4 /timestamp\n";
6814 recsize += 4;
6815
6816 bool padded = false;
6817 int offset = 0;
6818
6819 bool xdebug = false; // (strcmp(event_name, "u_Beam") == 0);
6820
6821 for (int i=0; i<ntags; i++) {
6822 int tsize = rpc_tid_size(tags[i].type);
6823 int n_bytes = tags[i].n_data*tsize;
6824 int xalign = (offset % tsize);
6825
6826 if (xdebug)
6827 printf("tag %d, tsize %d, n_bytes %d, xalign %d\n", i, tsize, n_bytes, xalign);
6828
6829#if 0
6830 // looks like history data does not do alignement and padding
6831 if (xalign != 0) {
6832 padded = true;
6833 int pad_bytes = tsize - xalign;
6834 assert(pad_bytes > 0);
6835
6836 ss += "tag: ";
6837 ss += "XPAD";
6838 ss += " ";
6839 ss += SmallIntToString(1);
6840 ss += " ";
6842 ss += " ";
6843 ss += "pad_";
6844 ss += SmallIntToString(i);
6845 ss += "\n";
6846
6847 offset += pad_bytes;
6848 recsize += pad_bytes;
6849
6850 assert((offset % tsize) == 0);
6851 fprintf(stderr, "FIXME: need to debug padding!\n");
6852 //abort();
6853 }
6854#endif
6855
6856 ss += "tag: ";
6857 ss += rpc_tid_name(tags[i].type);
6858 ss += " ";
6859 ss += SmallIntToString(tags[i].n_data);
6860 ss += " ";
6861 ss += SmallIntToString(n_bytes);
6862 ss += " ";
6863 ss += tags[i].name;
6864 ss += "\n";
6865
6866 recsize += n_bytes;
6867 offset += n_bytes;
6868 }
6869
6870 ss += "record_size: ";
6872 ss += "\n";
6873
6874 // reserve space for "data_offset: ..."
6875 int sslength = ss.length() + 127;
6876
6877 int block = 1024;
6878 int nb = (sslength + block - 1)/block;
6879 int data_offset = block * nb;
6880
6881 ss += "data_offset: ";
6883 ss += "\n";
6884
6885 fprintf(fp, "%s", ss.c_str());
6886
6887 fclose(fp);
6888
6889 if (1 && padded) {
6890 printf("Schema in file %s has padding:\n", try_filename.c_str());
6891 printf("%s", ss.c_str());
6892 }
6893
6894 if (filenamep)
6896
6897 return HS_SUCCESS;
6898}
6899
6901{
6902 if (fDebug)
6903 printf("FileHistory::read_file_schema: file %s\n", filename);
6904
6905 FILE* fp = fopen(filename, "r");
6906 if (!fp) {
6907 cm_msg(MERROR, "FileHistory::read_file_schema", "Cannot read \'%s\', fopen() errno %d (%s)", filename, errno, strerror(errno));
6908 return NULL;
6909 }
6910
6911 HsFileSchema* s = NULL;
6912
6913 // File format looks like this:
6914 // version: 2.0
6915 // event_name: u_Beam
6916 // time: 1023174012
6917 // tag: /DWORD 1 4 /timestamp
6918 // tag: FLOAT 1 4 B1
6919 // ...
6920 // tag: FLOAT 1 4 Ref Heater
6921 // record_size: 84
6922 // data_offset: 1024
6923
6924 int rd_recsize = 0;
6925 int offset = 0;
6926
6927 while (1) {
6928 char buf[1024];
6929 char* b = fgets(buf, sizeof(buf), fp);
6930
6931 //printf("read: %s\n", b);
6932
6933 if (!b) {
6934 break; // end of file
6935 }
6936
6937 char*bb;
6938
6939 bb = strchr(b, '\n');
6940 if (bb)
6941 *bb = 0;
6942
6943 bb = strchr(b, '\r');
6944 if (bb)
6945 *bb = 0;
6946
6947 bb = strstr(b, "version: 2.0");
6948 if (bb == b) {
6949 s = new HsFileSchema();
6950 assert(s);
6951
6952 s->fFileName = filename;
6953 continue;
6954 }
6955
6956 if (!s) {
6957 // malformed history file
6958 break;
6959 }
6960
6961 bb = strstr(b, "event_name: ");
6962 if (bb == b) {
6963 s->fEventName = bb + 12;
6964 continue;
6965 }
6966
6967 bb = strstr(b, "time: ");
6968 if (bb == b) {
6969 s->fTimeFrom = strtoul(bb + 6, NULL, 10);
6970 continue;
6971 }
6972
6973 // tag format is like this:
6974 //
6975 // tag: FLOAT 1 4 Ref Heater
6976 //
6977 // "FLOAT" is the MIDAS type, "/DWORD" is special tag for the timestamp
6978 // "1" is the number of array elements
6979 // "4" is the total tag size in bytes (n_data*tid_size)
6980 // "Ref Heater" is the tag name
6981
6982 bb = strstr(b, "tag: ");
6983 if (bb == b) {
6984 bb += 5; // now points to the tag MIDAS type
6985 const char* midas_type = bb;
6986 char* bbb = strchr(bb, ' ');
6987 if (bbb) {
6988 *bbb = 0;
6989 HsSchemaEntry t;
6990 if (midas_type[0] == '/') {
6991 t.type = 0;
6992 } else {
6994 if (t.type == 0) {
6995 cm_msg(MERROR, "FileHistory::read_file_schema", "Unknown MIDAS data type \'%s\' in history file \'%s\'", midas_type, filename);
6996 if (s)
6997 delete s;
6998 s = NULL;
6999 break;
7000 }
7001 }
7002 bbb++;
7003 while (*bbb == ' ')
7004 bbb++;
7005 if (*bbb) {
7006 t.n_data = strtoul(bbb, &bbb, 10);
7007 while (*bbb == ' ')
7008 bbb++;
7009 if (*bbb) {
7010 t.n_bytes = strtoul(bbb, &bbb, 10);
7011 while (*bbb == ' ')
7012 bbb++;
7013 t.name = bbb;
7014 }
7015 }
7016
7017 if (midas_type[0] != '/') {
7018 s->fVariables.push_back(t);
7019 s->fOffsets.push_back(offset);
7020 offset += t.n_bytes;
7021 }
7022
7023 rd_recsize += t.n_bytes;
7024 }
7025 continue;
7026 }
7027
7028 bb = strstr(b, "record_size: ");
7029 if (bb == b) {
7030 s->fRecordSize = atoi(bb + 12);
7031 continue;
7032 }
7033
7034 bb = strstr(b, "data_offset: ");
7035 if (bb == b) {
7036 s->fDataOffset = atoi(bb + 12);
7037 // data offset is the last entry in the file
7038 break;
7039 }
7040 }
7041
7042 fclose(fp);
7043
7044 if (!s) {
7045 cm_msg(MERROR, "FileHistory::read_file_schema", "Malformed history schema in \'%s\', maybe it is not a history file", filename);
7046 return NULL;
7047 }
7048
7049 if (rd_recsize != s->fRecordSize) {
7050 cm_msg(MERROR, "FileHistory::read_file_schema", "Record size mismatch in history schema from \'%s\', file says %d while total of all tags is %d", filename, s->fRecordSize, rd_recsize);
7051 if (s)
7052 delete s;
7053 return NULL;
7054 }
7055
7056 if (!s) {
7057 cm_msg(MERROR, "FileHistory::read_file_schema", "Could not read history schema from \'%s\', maybe it is not a history file", filename);
7058 if (s)
7059 delete s;
7060 return NULL;
7061 }
7062
7063 if (fDebug > 1)
7064 s->print();
7065
7066 return s;
7067}
7068
7070// Factory constructors //
7072
7074{
7075#ifdef HAVE_SQLITE
7076 return new SqliteHistory();
7077#else
7078 cm_msg(MERROR, "MakeMidasHistorySqlite", "Error: Cannot initialize SQLITE history - this MIDAS was built without SQLITE support - HAVE_SQLITE is not defined");
7079 return NULL;
7080#endif
7081}
7082
7084{
7085#ifdef HAVE_MYSQL
7086 return new MysqlHistory();
7087#else
7088 cm_msg(MERROR, "MakeMidasHistoryMysql", "Error: Cannot initialize MySQL history - this MIDAS was built without MySQL support - HAVE_MYSQL is not defined");
7089 return NULL;
7090#endif
7091}
7092
7094{
7095#ifdef HAVE_PGSQL
7096 return new PgsqlHistory();
7097#else
7098 cm_msg(MERROR, "MakeMidasHistoryPgsql", "Error: Cannot initialize PgSQL history - this MIDAS was built without PostgreSQL support - HAVE_PGSQL is not defined");
7099 return NULL;
7100#endif
7101}
7102
7107
7108/* emacs
7109 * Local Variables:
7110 * tab-width: 8
7111 * c-basic-offset: 3
7112 * indent-tabs-mode: nil
7113 * End:
7114 */
#define FALSE
Definition cfortran.h:309
std::string fPath
HsFileSchema * read_file_schema(const char *filename)
std::vector< std::string > fSortedFiles
HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
std::vector< bool > fSortedRead
int hs_connect(const char *connect_string)
returns HS_SUCCESS
int hs_clear_cache()
clear internal cache, returns HS_SUCCESS
int create_file(const char *event_name, time_t timestamp, int ntags, const TAG tags[], std::string *filenamep)
int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)
int read_file_list(bool *pchanged)
int hs_disconnect()
disconnect from history, returns HS_SUCCESS
void remove_inactive_columns()
int write_event(const time_t t, const char *data, const int data_size)
std::string fFileName
int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])
void print(bool print_tags=true) const
int read_last_written(const time_t timestamp, const int debug, time_t *last_written)
int fCountWriteUndersize
virtual void print(bool print_tags=true) const
std::vector< int > fOffsets
virtual ~HsSchema()
virtual void remove_inactive_columns()=0
std::vector< HsSchemaEntry > fVariables
virtual int write_event(const time_t t, const char *data, const int data_size)=0
virtual int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])=0
virtual int read_last_written(const time_t timestamp, const int debug, time_t *last_written)=0
virtual int flush_buffers()=0
virtual int close()=0
int fCountWriteOversize
std::string fEventName
virtual int match_event_var(const char *event_name, const char *var_name, const int var_index)
unsigned size() const
void print(bool print_tags=true) const
std::vector< HsSchema * > fData
HsSchema * find_event(const char *event_name, const time_t timestamp, int debug=0)
void add(HsSchema *s)
HsSchema * operator[](int index) const
int get_transaction_count()
int read_last_written(const time_t timestamp, const int debug, time_t *last_written)
int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])
std::vector< std::string > fColumnNames
void print(bool print_tags=true) const
void remove_inactive_columns()
std::vector< std::string > fColumnTypes
std::vector< bool > fColumnInactive
void increment_transaction_count()
int match_event_var(const char *event_name, const char *var_name, const int var_index)
std::string fTableName
static std::map< SqlBase *, int > gfTransactionCount
int write_event(const time_t t, const char *data, const int data_size)
void reset_transaction_count()
MidasHistoryBinnedBuffer(time_t first_time, time_t last_time, int num_bins)
void Add(time_t t, double v)
virtual void Add(time_t time, double value)=0
char type[NAME_LENGTH]
history channel name
Definition history.h:112
char name[NAME_LENGTH]
Definition history.h:111
int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)
int read_table_and_event_names(HsSchemaVector *sv)
int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)
int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)
void Add(time_t t, double v)
ReadBuffer(time_t first_time, time_t last_time, time_t interval)
HsSchemaVector fSchema
std::vector< HsSchema * > fEvents
int hs_get_tags(const char *event_name, time_t t, std::vector< TAG > *ptags)
get list of history variables for given event (use event names returned by hs_get_events()) that exis...
virtual int hs_connect(const char *connect_string)=0
returns HS_SUCCESS
HsSchemaVector fWriterCurrentSchema
int hs_write_event(const char *event_name, time_t timestamp, int buffer_size, const char *buffer)
see hs_write_event(), returns HS_SUCCESS or HS_FILE_ERROR
int hs_read_buffer(time_t start_time, time_t end_time, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], MidasHistoryBufferInterface *buffer[], int hs_status[])
returns HS_SUCCESS
int hs_define_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
see hs_define_event(), returns HS_SUCCESS or HS_FILE_ERROR
int hs_read_binned(time_t start_time, time_t end_time, int num_bins, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], int num_entries[], int *count_bins[], double *mean_bins[], double *rms_bins[], double *min_bins[], double *max_bins[], time_t *bins_first_time[], double *bins_first_value[], time_t *bins_last_time[], double *bins_last_value[], time_t last_time[], double last_value[], int st[])
returns HS_SUCCESS
virtual HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])=0
int hs_get_events(time_t t, std::vector< std::string > *pevents)
get list of events that exist(ed) at given time and later (value 0 means "return all events from begi...
int hs_get_last_written(time_t timestamp, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], time_t last_written[])
virtual int hs_set_debug(int debug)
set debug level, returns previous debug level
virtual int hs_disconnect()=0
disconnect from history, returns HS_SUCCESS
int hs_read(time_t start_time, time_t end_time, time_t interval, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], int num_entries[], time_t *time_buffer[], double *data_buffer[], int st[])
see hs_read(), returns HS_SUCCESS
int hs_clear_cache()
clear internal cache, returns HS_SUCCESS
int hs_flush_buffers()
flush buffered data to storage where it is visible to mhttpd
virtual int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)=0
virtual int ListColumns(const char *table_name, std::vector< std::string > *plist)=0
virtual int Finalize()=0
virtual int Connect(const char *path)=0
virtual int ListColumns(const char *table, std::vector< std::string > *plist)=0
virtual double GetDouble(int column)=0
virtual int RollbackTransaction(const char *table_name)=0
virtual bool IsConnected()=0
virtual int CommitTransaction(const char *table_name)=0
virtual ~SqlBase()
virtual int ListTables(std::vector< std::string > *plist)=0
virtual std::string QuoteId(const char *s)=0
virtual int Disconnect()=0
virtual bool TypesCompatible(int midas_tid, const char *sql_type)=0
virtual int Prepare(const char *table_name, const char *sql)=0
virtual int Exec(const char *sql)=0
virtual int Connect(const char *dsn=0)=0
virtual int Exec(const char *table_name, const char *sql)=0
virtual std::string QuoteString(const char *s)=0
bool fTransactionPerTable
virtual time_t GetTime(int column)=0
virtual int Step()=0
virtual const char * GetText(int column)=0
virtual int ExecDisconnected(const char *table_name, const char *sql)=0
virtual int OpenTransaction(const char *table_name)=0
virtual const char * ColumnType(int midas_tid)=0
int update_schema1(HsSqlSchema *s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable, bool *have_transaction)
int hs_disconnect()
disconnect from history, returns HS_SUCCESS
int hs_set_debug(int debug)
set debug level, returns previous debug level
int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)
virtual ~SqlHistoryBase()
virtual int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)=0
virtual int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)=0
virtual int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)=0
virtual int read_table_and_event_names(HsSchemaVector *sv)=0
int hs_connect(const char *connect_string)
returns HS_SUCCESS
HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
int update_schema(HsSqlSchema *s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable)
int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)
int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)
int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)
int read_table_and_event_names(HsSchemaVector *sv)
INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout)
Definition midas.cxx:3325
INT cm_set_watchdog_params(BOOL call_watchdog, DWORD timeout)
Definition midas.cxx:3283
#define DB_KEY_EXIST
Definition midas.h:641
#define DB_FILE_ERROR
Definition midas.h:647
#define DB_SUCCESS
Definition midas.h:631
#define DB_NO_MORE_SUBKEYS
Definition midas.h:646
#define HS_UNDEFINED_VAR
Definition midas.h:733
#define HS_SUCCESS
Definition midas.h:727
#define HS_FILE_ERROR
Definition midas.h:728
#define HS_UNDEFINED_EVENT
Definition midas.h:732
unsigned int DWORD
Definition mcstd.h:51
#define TID_DOUBLE
Definition midas.h:343
#define TID_SBYTE
Definition midas.h:329
#define TID_BOOL
Definition midas.h:340
#define TID_SHORT
Definition midas.h:334
#define TID_WORD
Definition midas.h:332
#define MINFO
Definition midas.h:560
#define TID_BYTE
Definition midas.h:327
#define TID_STRING
Definition midas.h:346
#define MERROR
Definition midas.h:559
#define TID_CHAR
Definition midas.h:331
#define TID_INT
Definition midas.h:338
#define TID_FLOAT
Definition midas.h:341
#define TID_LAST
Definition midas.h:354
#define TID_DWORD
Definition midas.h:336
double ss_file_size(const char *path)
Definition system.cxx:6978
double ss_time_sec()
Definition system.cxx:3467
INT ss_file_find(const char *path, const char *pattern, char **plist)
Definition system.cxx:6719
INT cm_msg_flush_buffer()
Definition midas.cxx:865
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
Definition midas.cxx:915
const char * rpc_tid_name(INT id)
Definition midas.cxx:11772
int rpc_name_tid(const char *name)
Definition midas.cxx:11786
INT rpc_tid_size(INT id)
Definition midas.cxx:11765
static std::string q(const char *s)
static const int tid_size[]
static const char * sql_type_mysql[]
static const char ** sql_type
static std::string MidasNameToSqlName(const char *s)
static int ReadRecord(const char *file_name, int fd, int offset, int recsize, int irec, char *rec)
const time_t kMonth
static int FindTime(const char *file_name, int fd, int offset, int recsize, int nrec, time_t timestamp, int *i1p, time_t *t1p, int *i2p, time_t *t2p, time_t *tstart, time_t *tend, int debug)
const double KiB
static int CreateSqlColumn(SqlBase *sql, const char *table_name, const char *column_name, const char *column_type, bool *have_transaction, int debug)
static int ReadSqliteTableNames(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug)
void DoctorPgsqlColumnType(std::string *col_type, const char *index_type)
static int CreateSqlHyperTable(SqlBase *sql, const char *table_name, bool *have_transaction)
void DoctorSqlColumnType(std::string *col_type, const char *index_type)
static bool MatchTagName(const char *tag_name, int n_data, const char *var_tag_name, const int var_tag_index)
const double MiB
static int var_name_cmp(const std::string &v1, const char *v2)
static std::string TimeToString(time_t t)
MidasHistoryInterface * MakeMidasHistorySqlite()
static void PrintTags(int ntags, const TAG tags[])
static char * skip_spaces(char *s)
static bool MatchEventName(const char *event_name, const char *var_event_name)
static HsSqlSchema * NewSqlSchema(HsSchemaVector *sv, const char *table_name, time_t t)
static int StartSqlTransaction(SqlBase *sql, const char *table_name, bool *have_transaction)
const time_t kDay
MidasHistoryInterface * MakeMidasHistoryMysql()
static int ReadSqliteTableSchema(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug)
static int event_name_cmp(const std::string &e1, const char *e2)
MidasHistoryInterface * MakeMidasHistoryPgsql()
MidasHistoryInterface * MakeMidasHistoryFile()
static int ReadMysqlTableNames(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug, const char *must_have_event_name, const char *must_have_table_name)
static std::string SmallIntToString(int i)
static int CreateSqlTable(SqlBase *sql, const char *table_name, bool *have_transaction, bool set_default_timestamp=false)
static std::string MidasNameToFileName(const char *s)
INT index
Definition mana.cxx:271
DWORD last_time
Definition mana.cxx:3070
void * data
Definition mana.cxx:268
BOOL debug
debug printouts
Definition mana.cxx:254
INT type
Definition mana.cxx:269
char host_name[HOST_NAME_LENGTH]
Definition mana.cxx:242
double count
Definition mdump.cxx:33
INT i
Definition mdump.cxx:32
static int offset
Definition mgd.cxx:1500
#define DIR_SEPARATOR
Definition midas.h:193
DWORD BOOL
Definition midas.h:105
#define DIR_SEPARATOR_STR
Definition midas.h:194
#define read(n, a, f)
#define write(n, a, f, d)
#define name(x)
Definition midas_macro.h:24
static FILE * fp
struct callback_addr callback
Definition mserver.cxx:22
MUTEX_T * tm
Definition odbedit.cxx:39
BOOL match(char *pat, char *str)
Definition odbedit.cxx:190
INT j
Definition odbhist.cxx:40
INT k
Definition odbhist.cxx:40
char str[256]
Definition odbhist.cxx:33
char file_name[256]
Definition odbhist.cxx:41
INT add
Definition odbhist.cxx:40
DWORD status
Definition odbhist.cxx:39
char var_name[256]
Definition odbhist.cxx:41
TH1X EXPRT * h1_book(const char *name, const char *title, int bins, double min, double max)
Definition rmidas.h:24
std::string tag_name
int type
std::string tag_type
int n_data
std::string name
int n_bytes
Definition midas.h:1232
DWORD type
Definition midas.h:1234
DWORD n_data
Definition midas.h:1235
char name[NAME_LENGTH]
Definition midas.h:1233
char c
Definition system.cxx:1310
@ DIR
Definition test_init.cxx:7
static double e(void)
Definition tinyexpr.c:136