MIDAS
Loading...
Searching...
No Matches
history_schema.cxx
Go to the documentation of this file.
1/********************************************************************\
2
3 Name: history_schema.cxx
4 Created by: Konstantin Olchanski
5
6 Contents: Schema based MIDAS history. Available drivers:
7 FileHistory: storage of data in binary files (replacement for the traditional MIDAS history)
8 MysqlHistory: storage of data in MySQL database (replacement for the ODBC based SQL history)
9 PgsqlHistory: storage of data in PostgreSQL database
10 SqliteHistory: storage of data in SQLITE3 database (not suitable for production use)
11
12\********************************************************************/
13
14#undef NDEBUG // midas required assert() to be always enabled
15
16#include "midas.h"
17#include "msystem.h"
18#include "mstrlcpy.h"
19
20#include <math.h>
21
22#include <vector>
23#include <list>
24#include <string>
25#include <map>
26#include <algorithm>
27
28// make mysql/my_global.h happy - it redefines closesocket()
29#undef closesocket
30
31//
32// benchmarks
33//
34// /usr/bin/time ./linux/bin/mh2sql . /ladd/iris_data2/alpha/alphacpc09-elog-history/history/121019.hst
35// -rw-r--r-- 1 alpha users 161028048 Oct 19 2012 /ladd/iris_data2/alpha/alphacpc09-elog-history/history/121019.hst
36// flush 10000, sync=OFF -> 51.51user 1.51system 0:53.76elapsed 98%CPU
37// flush 1000000, sync=NORMAL -> 51.83user 2.09system 1:08.37elapsed 78%CPU (flush never activated)
38// flush 100000, sync=NORMAL -> 51.38user 1.94system 1:06.94elapsed 79%CPU
39// flush 10000, sync=NORMAL -> 51.37user 2.03system 1:31.63elapsed 58%CPU
40// flush 1000, sync=NORMAL -> 52.16user 2.70system 4:38.58elapsed 19%CPU
41
43// MIDAS includes //
45
46#include "midas.h"
47#include "history.h"
48
50// helper stuff //
52
53#define FREE(x) { if (x) free(x); (x) = NULL; }
54
55static char* skip_spaces(char* s)
56{
57 while (*s) {
58 if (!isspace(*s))
59 break;
60 s++;
61 }
62 return s;
63}
64
65static std::string TimeToString(time_t t)
66{
67 const char* sign = "";
68
69 if (t == 0)
70 return "0";
71
72 time_t tt = t;
73
74 if (t < 0) {
75 sign = "-";
76 tt = -t;
77 }
78
79 assert(tt > 0);
80
81 std::string v;
82 while (tt) {
83 char c = '0' + tt%10;
84 tt /= 10;
85 v = c + v;
86 }
87
88 v = sign + v;
89
90 //printf("time %.0f -> %s\n", (double)t, v.c_str());
91
92 return v;
93}
94
95static std::string SmallIntToString(int i)
96{
97 //int ii = i;
98
99 if (i == 0)
100 return "0";
101
102 assert(i > 0);
103
104 std::string v;
105 while (i) {
106 char c = '0' + i%10;
107 i /= 10;
108 v = c + v;
109 }
110
111 //printf("SmallIntToString: %d -> %s\n", ii, v.c_str());
112
113 return v;
114}
115
116static bool MatchEventName(const char* event_name, const char* var_event_name)
117{
118 // new-style event name: "equipment_name/variable_name:tag_name"
119 // old-style event name: "equipment_name:tag_name" ("variable_name" is missing)
121
122 //printf("looking for event_name [%s], try table [%s] event name [%s], new style [%d]\n", var_event_name, table_name, event_name, newStyleEventName);
123
124 if (strcasecmp(event_name, var_event_name) == 0) {
125 return true;
126 } else if (newStyleEventName) {
127 return false;
128 } else { // for old style names, need more parsing
129 bool match = false;
130
131 const char* s = event_name;
132 for (int j=0; s[j]; j++) {
133
134 if ((var_event_name[j]==0) && (s[j]=='/')) {
135 match = true;
136 break;
137 }
138
139 if ((var_event_name[j]==0) && (s[j]=='_')) {
140 match = true;
141 break;
142 }
143
144 if (var_event_name[j]==0) {
145 match = false;
146 break;
147 }
148
149 if (tolower(var_event_name[j]) != tolower(s[j])) { // does not work for UTF-8 Unicode
150 match = false;
151 break;
152 }
153 }
154
155 return match;
156 }
157}
158
159static bool MatchTagName(const char* tag_name, int n_data, const char* var_tag_name, const int var_tag_index)
160{
161 char alt_tag_name[1024]; // maybe this is an array without "Names"?
163
164 //printf(" looking for tag [%s] alt [%s], try column name [%s]\n", var_tag_name, alt_tag_name, tag_name);
165
166 if (strcasecmp(tag_name, var_tag_name) == 0)
167 if (var_tag_index >= 0 && var_tag_index < n_data)
168 return true;
169
170 if (strcasecmp(tag_name, alt_tag_name) == 0)
171 return true;
172
173 return false;
174}
175
176static void PrintTags(int ntags, const TAG tags[])
177{
178 for (int i=0; i<ntags; i++)
179 printf("tag %d: %s %s[%d]\n", i, rpc_tid_name(tags[i].type), tags[i].name, tags[i].n_data);
180}
181
182// convert MIDAS event name to something acceptable as an SQL identifier - table name, column name, etc
183
184static std::string MidasNameToSqlName(const char* s)
185{
186 std::string out;
187
188 for (int i=0; s[i]!=0; i++) {
189 char c = s[i];
190 if (isalpha(c) || isdigit(c))
191 out += tolower(c); // does not work for UTF-8 Unicode
192 else
193 out += '_';
194 }
195
196 return out;
197}
198
199// convert MIDAS event name to something acceptable as a file name
200
201static std::string MidasNameToFileName(const char* s)
202{
203 std::string out;
204
205 for (int i=0; s[i]!=0; i++) {
206 char c = s[i];
207 if (isalpha(c) || isdigit(c))
208 out += tolower(c); // does not work for UTF-8 Unicode
209 else
210 out += '_';
211 }
212
213 return out;
214}
215
216// compare event names
217
218static int event_name_cmp(const std::string& e1, const char* e2)
219{
220 return strcasecmp(e1.c_str(), e2);
221}
222
223// compare variable names
224
225static int var_name_cmp(const std::string& v1, const char* v2)
226{
227 return strcasecmp(v1.c_str(), v2);
228}
229
231// SQL data types //
233
234#ifdef HAVE_SQLITE
235static const char *sql_type_sqlite[TID_LAST] = {
236 "xxxINVALIDxxxNULL", // TID_NULL
237 "INTEGER", // TID_UINT8
238 "INTEGER", // TID_INT8
239 "TEXT", // TID_CHAR
240 "INTEGER", // TID_UINT16
241 "INTEGER", // TID_INT16
242 "INTEGER", // TID_UINT32
243 "INTEGER", // TID_INT32
244 "INTEGER", // TID_BOOL
245 "REAL", // TID_FLOAT
246 "REAL", // TID_DOUBLE
247 "INTEGER", // TID_BITFIELD
248 "TEXT", // TID_STRING
249 "xxxINVALIDxxxARRAY",
250 "xxxINVALIDxxxSTRUCT",
251 "xxxINVALIDxxxKEY",
252 "xxxINVALIDxxxLINK"
253};
254#endif
255
256#ifdef HAVE_PGSQL
257static const char *sql_type_pgsql[TID_LAST] = {
258 "xxxINVALIDxxxNULL", // TID_NULL
259 "smallint", // TID_BYTE
260 "smallint", // TID_SBYTE
261 "char(1)", // TID_CHAR
262 "integer", // TID_WORD
263 "smallint", // TID_SHORT
264 "bigint", // TID_DWORD
265 "integer", // TID_INT
266 "smallint", // TID_BOOL
267 "real", // TID_FLOAT
268 "double precision", // TID_DOUBLE
269 "bigint", // TID_BITFIELD
270 "text", // TID_STRING
271 "xxxINVALIDxxxARRAY",
272 "xxxINVALIDxxxSTRUCT",
273 "xxxINVALIDxxxKEY",
274 "xxxINVALIDxxxLINK"
275};
276#endif
277
278#ifdef HAVE_MYSQL
279static const char *sql_type_mysql[TID_LAST] = {
280 "xxxINVALIDxxxNULL", // TID_NULL
281 "tinyint unsigned", // TID_BYTE
282 "tinyint", // TID_SBYTE
283 "char", // TID_CHAR
284 "smallint unsigned", // TID_WORD
285 "smallint", // TID_SHORT
286 "integer unsigned", // TID_DWORD
287 "integer", // TID_INT
288 "tinyint", // TID_BOOL
289 "float", // TID_FLOAT
290 "double", // TID_DOUBLE
291 "integer unsigned", // TID_BITFIELD
292 "VARCHAR", // TID_STRING
293 "xxxINVALIDxxxARRAY",
294 "xxxINVALIDxxxSTRUCT",
295 "xxxINVALIDxxxKEY",
296 "xxxINVALIDxxxLINK"
297};
298#endif
299
300void DoctorPgsqlColumnType(std::string* col_type, const char* index_type)
301{
302 if (*col_type == index_type)
303 return;
304
305 if (*col_type == "bigint" && strcmp(index_type, "int8")==0) {
307 return;
308 }
309
310 if (*col_type == "integer" && strcmp(index_type, "int4")==0) {
312 return;
313 }
314
315 if (*col_type == "smallint" && strcmp(index_type, "int2")==0) {
317 return;
318 }
319
320 cm_msg(MERROR, "SqlHistory", "Cannot use this SQL database, incompatible column names: created column type [%s] is reported with column type [%s]", index_type, col_type->c_str());
322 abort();
323}
324
325void DoctorSqlColumnType(std::string* col_type, const char* index_type)
326{
327 if (*col_type == index_type)
328 return;
329
330 if (*col_type == "int(10) unsigned" && strcmp(index_type, "integer unsigned")==0) {
332 return;
333 }
334
335 if (*col_type == "int(11)" && strcmp(index_type, "integer")==0) {
337 return;
338 }
339
340 if (*col_type == "integer" && strcmp(index_type, "int(11)")==0) {
342 return;
343 }
344
345 // MYSQL 8.0.23
346
347 if (*col_type == "int" && strcmp(index_type, "integer")==0) {
349 return;
350 }
351
352 if (*col_type == "int unsigned" && strcmp(index_type, "integer unsigned")==0) {
354 return;
355 }
356
357 cm_msg(MERROR, "SqlHistory", "Cannot use this SQL database, incompatible column names: created column type [%s] is reported with column type [%s]", index_type, col_type->c_str());
359 abort();
360}
361
362#if 0
363static int sql2midasType_mysql(const char* name)
364{
365 for (int tid=0; tid<TID_LAST; tid++)
366 if (strcasecmp(name, sql_type_mysql[tid])==0)
367 return tid;
368 // FIXME!
369 printf("sql2midasType: Cannot convert SQL data type \'%s\' to a MIDAS data type!\n", name);
370 return 0;
371}
372#endif
373
374#if 0
375static int sql2midasType_sqlite(const char* name)
376{
377 if (strcmp(name, "INTEGER") == 0)
378 return TID_INT;
379 if (strcmp(name, "REAL") == 0)
380 return TID_DOUBLE;
381 if (strcmp(name, "TEXT") == 0)
382 return TID_STRING;
383 // FIXME!
384 printf("sql2midasType: Cannot convert SQL data type \'%s\' to a MIDAS data type!\n", name);
385 return 0;
386}
387#endif
388
390// Schema base classes //
392
394 std::string tag_name; // tag name from MIDAS
395 std::string tag_type; // tag type from MIDAS
396 std::string name; // entry name, same as tag_name except when read from SQL history when it could be the SQL column name
397 int type = 0; // MIDAS data type TID_xxx
398 int n_data = 0; // MIDAS array size
399 int n_bytes = 0; // n_data * size of MIDAS data type (only used by HsFileSchema?)
400};
401
403{
404public:
405
406 // event schema definitions
407 std::string fEventName;
410 std::vector<HsSchemaEntry> fVariables;
411 std::vector<int> fOffsets;
412 int fNumBytes = 0;
413
414 // run time data used by hs_write_event()
419
420 // schema disabled by write error
421 bool fDisabled = true;
422
423public:
424
425 HsSchema() // ctor
426 {
427 // empty
428 }
429
430 virtual void remove_inactive_columns() = 0; // used by SQL schemas
431 virtual void print(bool print_tags = true) const;
432 virtual ~HsSchema(); // dtor
433 virtual int flush_buffers() = 0;
434 virtual int close() = 0;
435 virtual int write_event(const time_t t, const char* data, const int data_size) = 0;
436 virtual int match_event_var(const char* event_name, const char* var_name, const int var_index);
437 virtual int read_last_written(const time_t timestamp,
438 const int debug,
439 time_t* last_written) = 0;
440 virtual int read_data(const time_t start_time,
441 const time_t end_time,
442 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
443 const int debug,
444 std::vector<time_t>& last_time,
445 MidasHistoryBufferInterface* buffer[]) = 0;
446};
447
449{
450protected:
451 std::vector<HsSchema*> fData;
452
453public:
454 ~HsSchemaVector() { // dtor
455 clear();
456 }
457
459 return fData[index];
460 }
461
462 unsigned size() const {
463 return fData.size();
464 }
465
466 void add(HsSchema* s);
467
468 void clear() {
469 for (unsigned i=0; i<fData.size(); i++)
470 if (fData[i]) {
471 delete fData[i];
472 fData[i] = NULL;
473 }
474 fData.clear();
475 }
476
477 void print(bool print_tags = true) const {
478 for (unsigned i=0; i<fData.size(); i++)
480 }
481
482 HsSchema* find_event(const char* event_name, const time_t timestamp, int debug = 0);
483};
484
486// Base class functions //
488
490{
491 // only report if undersize/oversize happens more than once -
492 // the first occurence is already reported by hs_write_event()
493 if (fCountWriteUndersize > 1) {
494 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch count: %d, expected %d bytes, hs_write_event() called with as few as %d bytes", fEventName.c_str(), fCountWriteUndersize, fNumBytes, fWriteMinSize);
495 }
496
497 if (fCountWriteOversize > 1) {
498 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch count: %d, expected %d bytes, hs_write_event() called with as much as %d bytes", fEventName.c_str(), fCountWriteOversize, fNumBytes, fWriteMaxSize);
499 }
500};
501
503{
504 // schema list "data" is sorted by decreasing "fTimeFrom", newest schema first
505
506 //printf("add: %s..%s %s\n", TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), s->fEventName.c_str());
507
508 bool added = false;
509
510 for (auto it = fData.begin(); it != fData.end(); it++) {
511 if (event_name_cmp((*it)->fEventName, s->fEventName.c_str())==0) {
512 if (s->fTimeFrom == (*it)->fTimeFrom) {
513 // duplicate schema, keep the last one added (for file schema it is the newer file)
514 s->fTimeTo = (*it)->fTimeTo;
515 delete (*it);
516 (*it) = s;
517 return;
518 }
519 }
520
521 if (s->fTimeFrom > (*it)->fTimeFrom) {
522 fData.insert(it, s);
523 added = true;
524 break;
525 }
526 }
527
528 if (!added) {
529 fData.push_back(s);
530 }
531
532 //time_t oldest_time_from = fData.back()->fTimeFrom;
533
534 time_t time_to = 0;
535
536 for (auto it = fData.begin(); it != fData.end(); it++) {
537 if (event_name_cmp((*it)->fEventName, s->fEventName.c_str())==0) {
538 (*it)->fTimeTo = time_to;
539 time_to = (*it)->fTimeFrom;
540
541 //printf("vvv: %s..%s %s\n", TimeToString((*it)->fTimeFrom-oldest_time_from).c_str(), TimeToString((*it)->fTimeTo-oldest_time_from).c_str(), (*it)->fEventName.c_str());
542 }
543 }
544}
545
546HsSchema* HsSchemaVector::find_event(const char* event_name, time_t t, int debug)
547{
548 HsSchema* ss = NULL;
549
550 if (debug) {
551 printf("find_event: All schema for event %s: (total %d)\n", event_name, (int)fData.size());
552 int found = 0;
553 for (unsigned i=0; i<fData.size(); i++) {
554 HsSchema* s = fData[i];
555 printf("find_event: schema %d name [%s]\n", i, s->fEventName.c_str());
556 if (event_name)
557 if (event_name_cmp(s->fEventName, event_name)!=0)
558 continue;
559 s->print();
560 found++;
561 }
562 printf("find_event: Found %d schemas for event %s\n", found, event_name);
563
564 //if (found == 0)
565 // abort();
566 }
567
568 for (unsigned i=0; i<fData.size(); i++) {
569 HsSchema* s = fData[i];
570
571 // wrong event
572 if (event_name)
573 if (event_name_cmp(s->fEventName, event_name)!=0)
574 continue;
575
576 // schema is from after the time we are looking for
577 if (s->fTimeFrom > t)
578 continue;
579
580 if (!ss)
581 ss = s;
582
583 // remember the newest schema
584 if (s->fTimeFrom > ss->fTimeFrom)
585 ss = s;
586 }
587
588 // try to find
589 for (unsigned i=0; i<fData.size(); i++) {
590 HsSchema* s = fData[i];
591
592 // wrong event
593 if (event_name)
594 if (event_name_cmp(s->fEventName, event_name)!=0)
595 continue;
596
597 // schema is from after the time we are looking for
598 if (s->fTimeFrom > t)
599 continue;
600
601 if (!ss)
602 ss = s;
603
604 // remember the newest schema
605 if (s->fTimeFrom > ss->fTimeFrom)
606 ss = s;
607 }
608
609 if (debug) {
610 if (ss) {
611 printf("find_event: for time %s, returning:\n", TimeToString(t).c_str());
612 ss->print();
613 } else {
614 printf("find_event: for time %s, nothing found:\n", TimeToString(t).c_str());
615 }
616 }
617
618 return ss;
619}
620
622// Sql interface class //
624
625class SqlBase
626{
627public:
628 int fDebug = 0;
629 bool fIsConnected = false;
631
632 SqlBase() { // ctor
633 };
634
635 virtual ~SqlBase() { // dtor
636 // confirm that the destructor of the concrete class
637 // disconnected the database
638 assert(!fIsConnected);
639 fDebug = 0;
640 fIsConnected = false;
641 }
642
643 virtual int Connect(const char* path) = 0;
644 virtual int Disconnect() = 0;
645 virtual bool IsConnected() = 0;
646
647 virtual int ListTables(std::vector<std::string> *plist) = 0;
648 virtual int ListColumns(const char* table_name, std::vector<std::string> *plist) = 0;
649
650 // sql commands
651 virtual int Exec(const char* table_name, const char* sql) = 0;
652 virtual int ExecDisconnected(const char* table_name, const char* sql) = 0;
653
654 // queries
655 virtual int Prepare(const char* table_name, const char* sql) = 0;
656 virtual int Step() = 0;
657 virtual const char* GetText(int column) = 0;
658 virtual time_t GetTime(int column) = 0;
659 virtual double GetDouble(int column) = 0;
660 virtual int Finalize() = 0;
661
662 // transactions
663 virtual int OpenTransaction(const char* table_name) = 0;
664 virtual int CommitTransaction(const char* table_name) = 0;
665 virtual int RollbackTransaction(const char* table_name) = 0;
666
667 // data types
668 virtual const char* ColumnType(int midas_tid) = 0;
669 virtual bool TypesCompatible(int midas_tid, const char* sql_type) = 0;
670
671 // string quoting
672 virtual std::string QuoteString(const char* s) = 0; // quote text string
673 virtual std::string QuoteId(const char* s) = 0; // quote identifier, such as table or column name
674};
675
677// Schema concrete classes //
679
680class HsSqlSchema : public HsSchema
681{
682public:
683
685 std::string fTableName;
686 std::vector<std::string> fColumnNames;
687 std::vector<std::string> fColumnTypes;
688 std::vector<bool> fColumnInactive;
689
690public:
691
692 HsSqlSchema() // ctor
693 {
694 // empty
695 }
696
697 ~HsSqlSchema() // dtor
698 {
699 assert(get_transaction_count() == 0);
700 }
701
703 void print(bool print_tags = true) const;
707 int close_transaction();
709 int close() { return close_transaction(); }
710 int write_event(const time_t t, const char* data, const int data_size);
711 int match_event_var(const char* event_name, const char* var_name, const int var_index);
712 int read_last_written(const time_t timestamp,
713 const int debug,
714 time_t* last_written);
715 int read_data(const time_t start_time,
716 const time_t end_time,
717 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
718 const int debug,
719 std::vector<time_t>& last_time,
721
722private:
723 // Sqlite uses a transaction per table; MySQL uses a single transaction for all tables.
724 // But to support future "single transaction" DBs more easily (e.g. if user wants to
725 // log to both Postgres and MySQL in future), we keep track of the transaction count
726 // per SQL engine.
728 static std::map<SqlBase*, int> gfTransactionCount;
729};
730
731std::map<SqlBase*, int> HsSqlSchema::gfTransactionCount;
732
733class HsFileSchema : public HsSchema
734{
735public:
736
737 std::string fFileName;
738 int fRecordSize = 0;
739 int fDataOffset = 0;
740 int fLastSize = 0;
741 int fWriterFd = -1;
744
745public:
746
747 HsFileSchema() // ctor
748 {
749 // empty
750 }
751
753 {
754 close();
755 fRecordSize = 0;
756 fDataOffset = 0;
757 fLastSize = 0;
758 fWriterFd = -1;
759 if (fRecordBuffer) {
760 free(fRecordBuffer);
762 }
764 }
765
766 void remove_inactive_columns() { /* empty */ };
767 void print(bool print_tags = true) const;
768 int flush_buffers() { return HS_SUCCESS; };
769 int close();
770 int write_event(const time_t t, const char* data, const int data_size);
771 int read_last_written(const time_t timestamp,
772 const int debug,
773 time_t* last_written);
774 int read_data(const time_t start_time,
775 const time_t end_time,
776 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
777 const int debug,
778 std::vector<time_t>& last_time,
780};
781
783// Print functions //
785
787{
788 unsigned nv = this->fVariables.size();
789 printf("event [%s], time %s..%s, %d variables, %d bytes\n", this->fEventName.c_str(), TimeToString(this->fTimeFrom).c_str(), TimeToString(this->fTimeTo).c_str(), nv, fNumBytes);
790 if (print_tags)
791 for (unsigned j=0; j<nv; j++)
792 printf(" %d: name [%s], type [%s] tid %d, n_data %d, n_bytes %d, offset %d\n", j, this->fVariables[j].name.c_str(), rpc_tid_name(this->fVariables[j].type), this->fVariables[j].type, this->fVariables[j].n_data, this->fVariables[j].n_bytes, this->fOffsets[j]);
793};
794
796{
797 unsigned nv = this->fVariables.size();
798 printf("event [%s], sql_table [%s], time %s..%s, %d variables, %d bytes\n", this->fEventName.c_str(), this->fTableName.c_str(), TimeToString(this->fTimeFrom).c_str(), TimeToString(this->fTimeTo).c_str(), nv, fNumBytes);
799 if (print_tags) {
800 for (unsigned j=0; j<nv; j++) {
801 printf(" %d: name [%s], type [%s] tid %d, n_data %d, n_bytes %d", j, this->fVariables[j].name.c_str(), rpc_tid_name(this->fVariables[j].type), this->fVariables[j].type, this->fVariables[j].n_data, this->fVariables[j].n_bytes);
802 printf(", sql_column [%s], sql_type [%s], offset %d", this->fColumnNames[j].c_str(), this->fColumnTypes[j].c_str(), this->fOffsets[j]);
803 printf(", inactive %d", (int)this->fColumnInactive[j]);
804 printf("\n");
805 }
806 }
807}
808
810{
811 unsigned nv = this->fVariables.size();
812 printf("event [%s], file_name [%s], time %s..%s, %d variables, %d bytes, dat_offset %d, record_size %d\n", this->fEventName.c_str(), this->fFileName.c_str(), TimeToString(this->fTimeFrom).c_str(), TimeToString(this->fTimeTo).c_str(), nv, fNumBytes, fDataOffset, fRecordSize);
813 if (print_tags) {
814 for (unsigned j=0; j<nv; j++)
815 printf(" %d: name [%s], type [%s] tid %d, n_data %d, n_bytes %d, offset %d\n", j, this->fVariables[j].name.c_str(), rpc_tid_name(this->fVariables[j].type), this->fVariables[j].type, this->fVariables[j].n_data, this->fVariables[j].n_bytes, this->fOffsets[j]);
816 }
817}
818
820// File functions //
822
823#ifdef HAVE_MYSQL
824
826// MYSQL/MariaDB database access //
828
829//#warning !!!HAVE_MYSQL!!!
830
831//#include <my_global.h> // my_global.h removed MySQL 8.0, MariaDB 10.2. K.O.
832#include <mysql.h>
833
834class Mysql: public SqlBase
835{
836public:
837 std::string fConnectString;
838 MYSQL* fMysql = NULL;
839
840 // query results
843 int fNumFields = 0;
844
845 // disconnected operation
846 unsigned fMaxDisconnected = 0;
847 std::list<std::string> fDisconnectedBuffer;
850 int fDisconnectedLost = 0;
851
852 Mysql(); // ctor
853 ~Mysql(); // dtor
854
855 int Connect(const char* path);
856 int Disconnect();
857 bool IsConnected();
858
859 int ConnectTable(const char* table_name);
860
861 int ListTables(std::vector<std::string> *plist);
862 int ListColumns(const char* table_name, std::vector<std::string> *plist);
863
864 int Exec(const char* table_name, const char* sql);
865 int ExecDisconnected(const char* table_name, const char* sql);
866
867 int Prepare(const char* table_name, const char* sql);
868 int Step();
869 const char* GetText(int column);
870 time_t GetTime(int column);
871 double GetDouble(int column);
872 int Finalize();
873
874 int OpenTransaction(const char* table_name);
875 int CommitTransaction(const char* table_name);
876 int RollbackTransaction(const char* table_name);
877
878 const char* ColumnType(int midas_tid);
879 bool TypesCompatible(int midas_tid, const char* sql_type);
880
881 std::string QuoteId(const char* s);
882 std::string QuoteString(const char* s);
883};
884
885Mysql::Mysql() // ctor
886{
887 fMysql = NULL;
888 fResult = NULL;
889 fRow = NULL;
890 fNumFields = 0;
891 fMaxDisconnected = 1000;
892 fNextReconnect = 0;
895 fTransactionPerTable = false;
896}
897
898Mysql::~Mysql() // dtor
899{
900 Disconnect();
901 fMysql = NULL;
902 fResult = NULL;
903 fRow = NULL;
904 fNumFields = 0;
905 if (fDisconnectedBuffer.size() > 0) {
906 cm_msg(MINFO, "Mysql::~Mysql", "Lost %d history entries accumulated while disconnected from the database", (int)fDisconnectedBuffer.size());
908 }
909}
910
911int Mysql::Connect(const char* connect_string)
912{
913 if (fIsConnected)
914 Disconnect();
915
916 fConnectString = connect_string;
917
918 if (fDebug) {
919 cm_msg(MINFO, "Mysql::Connect", "Connecting to Mysql database specified by \'%s\'", connect_string);
921 }
922
923 std::string host_name;
924 std::string user_name;
925 std::string user_password;
926 std::string db_name;
927 int tcp_port = 0;
928 std::string unix_socket;
929 std::string buffer;
930
931 FILE* fp = fopen(connect_string, "r");
932 if (!fp) {
933 cm_msg(MERROR, "Mysql::Connect", "Cannot read MYSQL connection parameters from \'%s\', fopen() error %d (%s)", connect_string, errno, strerror(errno));
934 return DB_FILE_ERROR;
935 }
936
937 while (1) {
938 char buf[256];
939 char* s = fgets(buf, sizeof(buf), fp);
940 if (!s)
941 break; // EOF
942
943 char*ss;
944 // kill trailing \n and \r
945 ss = strchr(s, '\n');
946 if (ss) *ss = 0;
947 ss = strchr(s, '\r');
948 if (ss) *ss = 0;
949
950 //printf("line [%s]\n", s);
951
952 if (strncasecmp(s, "server=", 7)==0)
953 host_name = skip_spaces(s + 7);
954 if (strncasecmp(s, "port=", 5)==0)
955 tcp_port = atoi(skip_spaces(s + 5));
956 if (strncasecmp(s, "database=", 9)==0)
957 db_name = skip_spaces(s + 9);
958 if (strncasecmp(s, "socket=", 7)==0)
959 unix_socket = skip_spaces(s + 7);
960 if (strncasecmp(s, "user=", 5)==0)
961 user_name = skip_spaces(s + 5);
962 if (strncasecmp(s, "password=", 9)==0)
963 user_password = skip_spaces(s + 9);
964 if (strncasecmp(s, "buffer=", 7)==0)
965 buffer = skip_spaces(s + 7);
966 }
967
968 fclose(fp);
969
970 int buffer_int = atoi(buffer.c_str());
971
972 if (buffer_int > 0 && buffer_int < 1000000)
974
975 if (fDebug)
976 printf("Mysql::Connect: connecting to server [%s] port %d, unix socket [%s], database [%s], user [%s], password [%s], buffer [%d]\n", host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(), user_password.c_str(), fMaxDisconnected);
977
978 if (!fMysql) {
980 if (!fMysql) {
981 return DB_FILE_ERROR;
982 }
983 }
984
986
987 if (mysql_real_connect(fMysql, host_name.c_str(), user_name.c_str(), user_password.c_str(), db_name.c_str(), tcp_port, unix_socket.c_str(), client_flag) == NULL) {
988 cm_msg(MERROR, "Mysql::Connect", "mysql_real_connect() to host [%s], port %d, unix socket [%s], database [%s], user [%s], password [%s]: error %d (%s)", host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", mysql_errno(fMysql), mysql_error(fMysql));
989 Disconnect();
990 return DB_FILE_ERROR;
991 }
992
993 int status;
994
995 // FIXME:
996 //my_bool reconnect = 0;
997 //mysql_options(&mysql, MYSQL_OPT_RECONNECT, &reconnect);
998
999 status = Exec("(notable)", "SET SESSION sql_mode='ANSI'");
1000 if (status != DB_SUCCESS) {
1001 cm_msg(MERROR, "Mysql::Connect", "Cannot set ANSI mode, nothing will work");
1002 Disconnect();
1003 return DB_FILE_ERROR;
1004 }
1005
1006 if (fDebug) {
1007 cm_msg(MINFO, "Mysql::Connect", "Connected to a MySQL database on host [%s], port %d, unix socket [%s], database [%s], user [%s], password [%s], buffer %d", host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", fMaxDisconnected);
1009 }
1010
1011 fIsConnected = true;
1012
1013 int count = 0;
1014 while (fDisconnectedBuffer.size() > 0) {
1015 status = Exec("(flush)", fDisconnectedBuffer.front().c_str());
1016 if (status != DB_SUCCESS) {
1017 return status;
1018 }
1019 fDisconnectedBuffer.pop_front();
1020 count++;
1021 }
1022
1023 if (count > 0) {
1024 cm_msg(MINFO, "Mysql::Connect", "Saved %d, lost %d history events accumulated while disconnected from the database", count, fDisconnectedLost);
1026 }
1027
1028 assert(fDisconnectedBuffer.size() == 0);
1030
1031 return DB_SUCCESS;
1032}
1033
1034int Mysql::Disconnect()
1035{
1036 if (fRow) {
1037 // FIXME: mysql_free_result(fResult);
1038 }
1039
1040 if (fResult)
1042
1043 if (fMysql)
1045
1046 fMysql = NULL;
1047 fResult = NULL;
1048 fRow = NULL;
1049
1050 fIsConnected = false;
1051 return DB_SUCCESS;
1052}
1053
1054bool Mysql::IsConnected()
1055{
1056 return fIsConnected;
1057}
1058
1059int Mysql::OpenTransaction(const char* table_name)
1060{
1061 return Exec(table_name, "START TRANSACTION");
1062 return DB_SUCCESS;
1063}
1064
1065int Mysql::CommitTransaction(const char* table_name)
1066{
1067 Exec(table_name, "COMMIT");
1068 return DB_SUCCESS;
1069}
1070
1071int Mysql::RollbackTransaction(const char* table_name)
1072{
1073 Exec(table_name, "ROLLBACK");
1074 return DB_SUCCESS;
1075}
1076
1077int Mysql::ListTables(std::vector<std::string> *plist)
1078{
1079 if (!fIsConnected)
1080 return DB_FILE_ERROR;
1081
1082 if (fDebug)
1083 printf("Mysql::ListTables!\n");
1084
1085 int status;
1086
1088
1089 if (fResult == NULL) {
1090 cm_msg(MERROR, "Mysql::ListTables", "mysql_list_tables() error %d (%s)", mysql_errno(fMysql), mysql_error(fMysql));
1091 return DB_FILE_ERROR;
1092 }
1093
1095
1096 while (1) {
1097 status = Step();
1098 if (status != DB_SUCCESS)
1099 break;
1100 std::string tn = GetText(0);
1101 plist->push_back(tn);
1102 };
1103
1104 status = Finalize();
1105
1106 return DB_SUCCESS;
1107}
1108
1109int Mysql::ListColumns(const char* table_name, std::vector<std::string> *plist)
1110{
1111 if (!fIsConnected)
1112 return DB_FILE_ERROR;
1113
1114 if (fDebug)
1115 printf("Mysql::ListColumns for table \'%s\'\n", table_name);
1116
1117 int status;
1118
1119 std::string cmd;
1120 cmd += "SHOW COLUMNS FROM ";
1121 cmd += QuoteId(table_name);
1122 cmd += ";";
1123
1124 status = Prepare(table_name, cmd.c_str());
1125 if (status != DB_SUCCESS)
1126 return status;
1127
1129
1130 while (1) {
1131 status = Step();
1132 if (status != DB_SUCCESS)
1133 break;
1134 std::string cn = GetText(0);
1135 std::string ct = GetText(1);
1136 plist->push_back(cn);
1137 plist->push_back(ct);
1138 //printf("cn [%s]\n", cn.c_str());
1139 //for (int i=0; i<fNumFields; i++)
1140 //printf(" field[%d]: [%s]\n", i, GetText(i));
1141 };
1142
1143 status = Finalize();
1144
1145 return DB_SUCCESS;
1146}
1147
1148int Mysql::Exec(const char* table_name, const char* sql)
1149{
1150 if (fDebug)
1151 printf("Mysql::Exec(%s, %s)\n", table_name, sql);
1152
1153 // FIXME: match Sqlite::Exec() return values:
1154 // return values:
1155 // DB_SUCCESS
1156 // DB_FILE_ERROR: not connected
1157 // DB_KEY_EXIST: "table already exists"
1158
1159 if (!fMysql)
1160 return DB_FILE_ERROR;
1161
1162 assert(fMysql);
1163 assert(fResult == NULL); // there should be no unfinalized queries
1164 assert(fRow == NULL);
1165
1166 if (mysql_query(fMysql, sql)) {
1167 if (mysql_errno(fMysql) == 1050) { // "Table already exists"
1168 return DB_KEY_EXIST;
1169 }
1170 if (mysql_errno(fMysql) == 1146) { // "Table does not exist"
1171 return DB_FILE_ERROR;
1172 }
1173 cm_msg(MERROR, "Mysql::Exec", "mysql_query(%s) error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1174 if (mysql_errno(fMysql) == 1060) // "Duplicate column name"
1175 return DB_KEY_EXIST;
1176 if (mysql_errno(fMysql) == 2006) { // "MySQL server has gone away"
1177 Disconnect();
1178 return ExecDisconnected(table_name, sql);
1179 }
1180 return DB_FILE_ERROR;
1181 }
1182
1183 return DB_SUCCESS;
1184}
1185
1186int Mysql::ExecDisconnected(const char* table_name, const char* sql)
1187{
1188 if (fDebug)
1189 printf("Mysql::ExecDisconnected(%s, %s)\n", table_name, sql);
1190
1192 fDisconnectedBuffer.push_back(sql);
1193 if (fDisconnectedBuffer.size() >= fMaxDisconnected) {
1194 cm_msg(MERROR, "Mysql::ExecDisconnected", "Error: Disconnected database buffer overflow, size %d, subsequent events are lost", (int)fDisconnectedBuffer.size());
1195 }
1196 } else {
1198 }
1199
1200 time_t now = time(NULL);
1201
1202 if (fNextReconnect == 0 || now >= fNextReconnect) {
1203 int status = Connect(fConnectString.c_str());
1204 if (status == DB_SUCCESS) {
1205 fNextReconnect = 0;
1207 } else {
1208 if (fNextReconnectDelaySec == 0) {
1210 } else if (fNextReconnectDelaySec < 10*60) {
1212 }
1213 if (fDebug) {
1214 cm_msg(MINFO, "Mysql::ExecDisconnected", "Next reconnect attempt in %d sec, history events buffered %d, lost %d", fNextReconnectDelaySec, (int)fDisconnectedBuffer.size(), fDisconnectedLost);
1216 }
1218 }
1219 }
1220
1221 return DB_SUCCESS;
1222}
1223
1224int Mysql::Prepare(const char* table_name, const char* sql)
1225{
1226 if (fDebug)
1227 printf("Mysql::Prepare(%s, %s)\n", table_name, sql);
1228
1229 if (!fMysql)
1230 return DB_FILE_ERROR;
1231
1232 assert(fMysql);
1233 assert(fResult == NULL); // there should be no unfinalized queries
1234 assert(fRow == NULL);
1235
1236 // if (mysql_query(fMysql, sql)) {
1237 // cm_msg(MERROR, "Mysql::Prepare", "mysql_query(%s) error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1238 // return DB_FILE_ERROR;
1239 //}
1240
1241 // Check if the connection to MySQL timed out; fix from B. Smith
1242 int status = mysql_query(fMysql, sql);
1243 if (status) {
1244 if (mysql_errno(fMysql) == 2006 || mysql_errno(fMysql) == 2013) {
1245 // "MySQL server has gone away" or "Lost connection to MySQL server during query"
1246 status = Connect(fConnectString.c_str());
1247 if (status == DB_SUCCESS) {
1248 // Retry after reconnecting
1250 } else {
1251 cm_msg(MERROR, "Mysql::Prepare", "mysql_query(%s) - MySQL server has gone away, and couldn't reconnect - %d", sql, status);
1252 return DB_FILE_ERROR;
1253 }
1254 }
1255 if (status) {
1256 cm_msg(MERROR, "Mysql::Prepare", "mysql_query(%s) error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1257 return DB_FILE_ERROR;
1258 }
1259 cm_msg(MINFO, "Mysql::Prepare", "Reconnected to MySQL after long inactivity.");
1260 }
1261
1263 //fResult = mysql_use_result(fMysql); // cannot use this because it blocks writing into table
1264
1265 if (!fResult) {
1266 cm_msg(MERROR, "Mysql::Prepare", "mysql_store_result(%s) returned NULL, error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1267 return DB_FILE_ERROR;
1268 }
1269
1271
1272 //printf("num fields %d\n", fNumFields);
1273
1274 return DB_SUCCESS;
1275}
1276
1277int Mysql::Step()
1278{
1279 if (/* DISABLES CODE */ (0) && fDebug)
1280 printf("Mysql::Step()\n");
1281
1282 assert(fMysql);
1283 assert(fResult);
1284
1286
1287 if (fRow)
1288 return DB_SUCCESS;
1289
1290 if (mysql_errno(fMysql) == 0)
1291 return DB_NO_MORE_SUBKEYS;
1292
1293 cm_msg(MERROR, "Mysql::Step", "mysql_fetch_row() error %d (%s)", mysql_errno(fMysql), mysql_error(fMysql));
1294
1295 return DB_FILE_ERROR;
1296}
1297
1298const char* Mysql::GetText(int column)
1299{
1300 assert(fMysql);
1301 assert(fResult);
1302 assert(fRow);
1303 assert(fNumFields > 0);
1304 assert(column >= 0);
1305 assert(column < fNumFields);
1306 if (fRow[column] == NULL)
1307 return "";
1308 return fRow[column];
1309}
1310
1311double Mysql::GetDouble(int column)
1312{
1313 return atof(GetText(column));
1314}
1315
1316time_t Mysql::GetTime(int column)
1317{
1318 return strtoul(GetText(column), NULL, 0);
1319}
1320
1321int Mysql::Finalize()
1322{
1323 assert(fMysql);
1324 assert(fResult);
1325
1327 fResult = NULL;
1328 fRow = NULL;
1329 fNumFields = 0;
1330
1331 return DB_SUCCESS;
1332}
1333
1334const char* Mysql::ColumnType(int midas_tid)
1335{
1336 assert(midas_tid>=0);
1337 assert(midas_tid<TID_LAST);
1338 return sql_type_mysql[midas_tid];
1339}
1340
1341bool Mysql::TypesCompatible(int midas_tid, const char* sql_type)
1342{
1343 if (/* DISABLES CODE */ (0))
1344 printf("compare types midas \'%s\'=\'%s\' and sql \'%s\'\n", rpc_tid_name(midas_tid), ColumnType(midas_tid), sql_type);
1345
1346 //if (sql2midasType_mysql(sql_type) == midas_tid)
1347 // return true;
1348
1349 if (strcasecmp(ColumnType(midas_tid), sql_type) == 0)
1350 return true;
1351
1352 // permit writing FLOAT into DOUBLE
1353 if (midas_tid==TID_FLOAT && strcmp(sql_type, "double")==0)
1354 return true;
1355
1356 // T2K quirk!
1357 // permit writing BYTE into signed tinyint
1358 if (midas_tid==TID_BYTE && strcmp(sql_type, "tinyint")==0)
1359 return true;
1360
1361 // T2K quirk!
1362 // permit writing WORD into signed tinyint
1363 if (midas_tid==TID_WORD && strcmp(sql_type, "tinyint")==0)
1364 return true;
1365
1366 // mysql quirk!
1367 //if (midas_tid==TID_DWORD && strcmp(sql_type, "int(10) unsigned")==0)
1368 // return true;
1369
1370 if (/* DISABLES CODE */ (0))
1371 printf("type mismatch!\n");
1372
1373 return false;
1374}
1375
1376std::string Mysql::QuoteId(const char* s)
1377{
1378 std::string q;
1379 q += "`";
1380 q += s;
1381 q += "`";
1382 return q;
1383}
1384
1385std::string Mysql::QuoteString(const char* s)
1386{
1387 std::string q;
1388 q += "\'";
1389 q += s;
1390#if 0
1391 while (int c = *s++) {
1392 if (c == '\'') {
1393 q += "\\'";
1394 } if (c == '"') {
1395 q += "\\\"";
1396 } else if (isprint(c)) {
1397 q += c;
1398 } else {
1399 char buf[256];
1400 sprintf(buf, "\\\\x%02x", c&0xFF);
1401 q += buf;
1402 }
1403 }
1404#endif
1405 q += "\'";
1406 return q;
1407}
1408
1409#endif // HAVE_MYSQL
1410
1411#ifdef HAVE_PGSQL
1412
1414// PostgreSQL database access //
1416
1417//#warning !!!HAVE_PGSQL!!!
1418
1419#include <libpq-fe.h>
1420
1421class Pgsql: public SqlBase
1422{
1423public:
1424 std::string fConnectString;
1425 int fDownsample = 0;
1426 PGconn* fPgsql = NULL;
1427
1428 // query results
1430 int fNumFields = 0;
1431 int fRow = 0;
1432
1433 // disconnected operation
1434 unsigned fMaxDisconnected = 0;
1435 std::list<std::string> fDisconnectedBuffer;
1437 int fNextReconnectDelaySec = 0;
1438 int fDisconnectedLost = 0;
1439
1440 Pgsql(); // ctor
1441 ~Pgsql(); // dtor
1442
1443 int Connect(const char* path);
1444 int Disconnect();
1445 bool IsConnected();
1446
1447 int ConnectTable(const char* table_name);
1448
1449 int ListTables(std::vector<std::string> *plist);
1450 int ListColumns(const char* table_name, std::vector<std::string> *plist);
1451
1452 int Exec(const char* table_name, const char* sql);
1453 int ExecDisconnected(const char* table_name, const char* sql);
1454
1455 int Prepare(const char* table_name, const char* sql);
1456 std::string BuildDownsampleQuery(const time_t start_time, const time_t end_time, const int npoints, const char* table_name, const char* column_name);
1457 int Step();
1458 const char* GetText(int column);
1459 time_t GetTime(int column);
1460 double GetDouble(int column);
1461 int Finalize();
1462
1463 int OpenTransaction(const char* table_name);
1464 int CommitTransaction(const char* table_name);
1465 int RollbackTransaction(const char* table_name);
1466
1467 const char* ColumnType(int midas_tid);
1468 bool TypesCompatible(int midas_tid, const char* sql_type);
1469
1470 std::string QuoteId(const char* s);
1471 std::string QuoteString(const char* s);
1472};
1473
1474Pgsql::Pgsql() // ctor
1475{
1476 fPgsql = NULL;
1477 fDownsample = 0;
1478 fResult = NULL;
1479 fRow = -1;
1480 fNumFields = 0;
1481 fMaxDisconnected = 1000;
1482 fNextReconnect = 0;
1485 fTransactionPerTable = false;
1486}
1487
1488Pgsql::~Pgsql() // dtor
1489{
1490 Disconnect();
1491 if(fResult)
1493 fRow = -1;
1494 fNumFields = 0;
1495 if (fDisconnectedBuffer.size() > 0) {
1496 cm_msg(MINFO, "Pgsql::~Pgsql", "Lost %d history entries accumulated while disconnected from the database", (int)fDisconnectedBuffer.size());
1498 }
1499}
1500
1501int Pgsql::Connect(const char* connect_string)
1502{
1503 if (fIsConnected)
1504 Disconnect();
1505
1506 fConnectString = connect_string;
1507
1508 if (fDebug) {
1509 cm_msg(MINFO, "Pgsql::Connect", "Connecting to PostgreSQL database specified by \'%s\'", connect_string);
1511 }
1512
1513 std::string host_name;
1514 std::string user_name;
1515 std::string user_password;
1516 std::string db_name;
1517 std::string tcp_port;
1518 std::string unix_socket;
1519 std::string buffer;
1520
1521 FILE* fp = fopen(connect_string, "r");
1522 if (!fp) {
1523 cm_msg(MERROR, "Pgsql::Connect", "Cannot read PostgreSQL connection parameters from \'%s\', fopen() error %d (%s)", connect_string, errno, strerror(errno));
1524 return DB_FILE_ERROR;
1525 }
1526
1527 while (1) {
1528 char buf[256];
1529 char* s = fgets(buf, sizeof(buf), fp);
1530 if (!s)
1531 break; // EOF
1532
1533 char*ss;
1534 // kill trailing \n and \r
1535 ss = strchr(s, '\n');
1536 if (ss) *ss = 0;
1537 ss = strchr(s, '\r');
1538 if (ss) *ss = 0;
1539
1540 //printf("line [%s]\n", s);
1541
1542 if (strncasecmp(s, "server=", 7)==0)
1543 host_name = skip_spaces(s + 7);
1544 if (strncasecmp(s, "port=", 5)==0)
1545 tcp_port = skip_spaces(s + 5);
1546 if (strncasecmp(s, "database=", 9)==0)
1547 db_name = skip_spaces(s + 9);
1548 if (strncasecmp(s, "socket=", 7)==0)
1549 unix_socket = skip_spaces(s + 7);
1550 if (strncasecmp(s, "user=", 5)==0)
1551 user_name = skip_spaces(s + 5);
1552 if (strncasecmp(s, "password=", 9)==0)
1553 user_password = skip_spaces(s + 9);
1554 if (strncasecmp(s, "buffer=", 7)==0)
1555 buffer = skip_spaces(s + 7);
1556 }
1557
1558 fclose(fp);
1559
1560 int buffer_int = atoi(buffer.c_str());
1561
1562 if (buffer_int > 0 && buffer_int < 1000000)
1564
1565 if (fDebug)
1566 printf("Pgsql::Connect: connecting to server [%s] port %s, unix socket [%s], database [%s], user [%s], password [%s], buffer [%d]\n", host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(), user_password.c_str(), fMaxDisconnected);
1567
1568 fPgsql = PQsetdbLogin(host_name.c_str(), tcp_port.c_str(), NULL, NULL, db_name.c_str(), user_name.c_str(), user_password.c_str());
1569 if (PQstatus(fPgsql) != CONNECTION_OK) {
1570 std::string msg(PQerrorMessage(fPgsql));
1571 msg.erase(std::remove(msg.begin(), msg.end(), '\n'), msg.end());
1572 cm_msg(MERROR, "Pgsql::Connect", "PQsetdbLogin() to host [%s], port %s, unix socket [%s], database [%s], user [%s], password [%s]: error (%s)", host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", msg.c_str());
1573 Disconnect();
1574 return DB_FILE_ERROR;
1575 }
1576
1577 int status;
1578
1579 if (fDebug) {
1580 cm_msg(MINFO, "Pgsql::Connect", "Connected to a PostgreSQL database on host [%s], port %s, unix socket [%s], database [%s], user [%s], password [%s], buffer %d", host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", fMaxDisconnected);
1582 }
1583
1584 fIsConnected = true;
1585
1586 int count = 0;
1587 while (fDisconnectedBuffer.size() > 0) {
1588 status = Exec("(flush)", fDisconnectedBuffer.front().c_str());
1589 if (status != DB_SUCCESS) {
1590 return status;
1591 }
1592 fDisconnectedBuffer.pop_front();
1593 count++;
1594 }
1595
1596 if (count > 0) {
1597 cm_msg(MINFO, "Pgsql::Connect", "Saved %d, lost %d history events accumulated while disconnected from the database", count, fDisconnectedLost);
1599 }
1600
1601 assert(fDisconnectedBuffer.size() == 0);
1603
1604 if (fDownsample) {
1605 status = Prepare("pg_extensions", "select extname from pg_extension where extname = 'timescaledb';");
1606
1607 if (status != DB_SUCCESS || PQntuples(fResult) == 0) {
1608 cm_msg(MERROR, "Pgsql::Connect", "TimescaleDB extension not installed");
1609 return DB_FILE_ERROR;
1610 }
1611 Finalize();
1612
1613 status = Prepare("pg_extensions", "select extname from pg_extension where extname = 'timescaledb_toolkit';");
1614
1615 if (status != DB_SUCCESS || PQntuples(fResult) == 0) {
1616 cm_msg(MERROR, "Pgsql::Connect", "TimescaleDB_toolkit extension not installed");
1617 return DB_FILE_ERROR;
1618 }
1619 Finalize();
1620
1621 cm_msg(MINFO, "Pgsql::Connect", "TimescaleDB extensions found - downsampling enabled");
1622 }
1623
1624 return DB_SUCCESS;
1625}
1626
1627int Pgsql::Disconnect()
1628{
1629 if (fPgsql)
1631
1632 fPgsql = NULL;
1633 fRow = -1;
1634
1635 fIsConnected = false;
1636 return DB_SUCCESS;
1637}
1638
1639bool Pgsql::IsConnected()
1640{
1641 return fIsConnected;
1642}
1643
1644int Pgsql::OpenTransaction(const char* table_name)
1645{
1646 return Exec(table_name, "BEGIN TRANSACTION;");
1647}
1648
1649int Pgsql::CommitTransaction(const char* table_name)
1650{
1651 return Exec(table_name, "COMMIT;");
1652}
1653
1654int Pgsql::RollbackTransaction(const char* table_name)
1655{
1656 return Exec(table_name, "ROLLBACK;");
1657}
1658
1659int Pgsql::ListTables(std::vector<std::string> *plist)
1660{
1661 if (!fIsConnected)
1662 return DB_FILE_ERROR;
1663
1664 if (fDebug)
1665 printf("Pgsql::ListTables!\n");
1666
1667 int status = Prepare("pg_tables", "select tablename from pg_tables where schemaname = 'public';");
1668
1669 if (status != DB_SUCCESS) {
1670 cm_msg(MERROR, "Pgsql::ListTables", "error %s (%s)", PQresStatus(PQresultStatus(fResult)), PQresultErrorMessage(fResult));
1671 return DB_FILE_ERROR;
1672 }
1673
1674 while (1) {
1675 if (Step() != DB_SUCCESS)
1676 break;
1677 std::string tn = GetText(0);
1678 plist->push_back(tn);
1679 };
1680
1681 Finalize();
1682
1683 return DB_SUCCESS;
1684}
1685
1686int Pgsql::ListColumns(const char* table_name, std::vector<std::string> *plist)
1687{
1688 if (!fIsConnected)
1689 return DB_FILE_ERROR;
1690
1691 if (fDebug)
1692 printf("Pgsql::ListColumns for table \'%s\'\n", table_name);
1693
1694 std::string cmd;
1695 cmd += "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = ";
1696 cmd += QuoteString(table_name);
1697 cmd += ";";
1698
1699 int status = Prepare(table_name, cmd.c_str());
1700 if (status != DB_SUCCESS)
1701 return status;
1702
1704
1705 while (1) {
1706 if (Step() != DB_SUCCESS)
1707 break;
1708 std::string cn = GetText(0);
1709 std::string ct = GetText(1);
1710 plist->push_back(cn);
1711 plist->push_back(ct);
1712 };
1713
1714 Finalize();
1715
1716 return DB_SUCCESS;
1717}
1718
1719int Pgsql::Exec(const char* table_name, const char* sql)
1720{
1721 if (fDebug)
1722 printf("Pgsql::Exec(%s, %s)\n", table_name, sql);
1723
1724 if (!fPgsql)
1725 return DB_FILE_ERROR;
1726
1727 assert(fPgsql);
1728 assert(fRow == -1);
1729
1732 if(err != PGRES_TUPLES_OK) {
1733 if(err == PGRES_FATAL_ERROR) {
1734 // handle fatal error
1735 if(strstr(PQresultErrorMessage(fResult), "already exists"))
1736 return DB_KEY_EXIST;
1737 else return DB_FILE_ERROR;
1738 }
1739
1741 Disconnect();
1742 return ExecDisconnected(table_name, sql);
1743 }
1744 }
1745
1746 return DB_SUCCESS;
1747}
1748
1749int Pgsql::ExecDisconnected(const char* table_name, const char* sql)
1750{
1751 if (fDebug)
1752 printf("Pgsql::ExecDisconnected(%s, %s)\n", table_name, sql);
1753
1755 fDisconnectedBuffer.push_back(sql);
1756 if (fDisconnectedBuffer.size() >= fMaxDisconnected) {
1757 cm_msg(MERROR, "Pgsql::ExecDisconnected", "Error: Disconnected database buffer overflow, size %d, subsequent events are lost", (int)fDisconnectedBuffer.size());
1758 }
1759 } else {
1761 }
1762
1763 time_t now = time(NULL);
1764
1765 if (fNextReconnect == 0 || now >= fNextReconnect) {
1766 int status = Connect(fConnectString.c_str());
1767 if (status == DB_SUCCESS) {
1768 fNextReconnect = 0;
1770 } else {
1771 if (fNextReconnectDelaySec == 0) {
1773 } else if (fNextReconnectDelaySec < 10*60) {
1775 }
1776 if (fDebug) {
1777 cm_msg(MINFO, "Pgsql::ExecDisconnected", "Next reconnect attempt in %d sec, history events buffered %d, lost %d", fNextReconnectDelaySec, (int)fDisconnectedBuffer.size(), fDisconnectedLost);
1779 }
1781 }
1782 }
1783
1784 return DB_SUCCESS;
1785}
1786
1787int Pgsql::Prepare(const char* table_name, const char* sql)
1788{
1789 if (fDebug)
1790 printf("Pgsql::Prepare(%s, %s)\n", table_name, sql);
1791
1792 if (!fPgsql)
1793 return DB_FILE_ERROR;
1794
1795 assert(fPgsql);
1796 //assert(fResult==NULL);
1797 assert(fRow == -1);
1798
1800 if (PQstatus(fPgsql) == CONNECTION_BAD) {
1801 // lost connection to server
1802 int status = Connect(fConnectString.c_str());
1803 if (status == DB_SUCCESS) {
1804 // Retry after reconnecting
1806 } else {
1807 cm_msg(MERROR, "Pgsql::Prepare", "PQexec(%s) PostgreSQL server has gone away, and couldn't reconnect - %d", sql, status);
1808 return DB_FILE_ERROR;
1809 }
1810 if (status) {
1811 cm_msg(MERROR, "Pgsql::Prepare", "PQexec(%s) error %s", sql, PQresStatus(PQresultStatus(fResult)));
1812 return DB_FILE_ERROR;
1813 }
1814 cm_msg(MINFO, "Pgsql::Prepare", "Reconnected to PostgreSQL after long inactivity.");
1815 }
1816
1818
1819 return DB_SUCCESS;
1820}
1821
1822std::string Pgsql::BuildDownsampleQuery(const time_t start_time, const time_t end_time, const int npoints,
1823 const char* table_name, const char* column_name)
1824{
1825 std::string cmd;
1826 cmd += "SELECT extract(epoch from time::TIMESTAMPTZ) as _i_time, value ";
1827
1828 cmd += " FROM unnest(( SELECT lttb";
1829 cmd += "(_t_time, ";
1830 cmd += column_name;
1831 cmd += ", ";
1832 cmd += std::to_string(npoints);
1833 cmd += ") ";
1834 cmd += "FROM ";
1835 cmd += QuoteId(table_name);
1836 cmd += " WHERE _t_time BETWEEN ";
1837 cmd += "to_timestamp(";
1838 cmd += TimeToString(start_time);
1839 cmd += ") AND to_timestamp(";
1840 cmd += TimeToString(end_time);
1841 cmd += ") )) ORDER BY time;";
1842
1843 return cmd;
1844}
1845
1846int Pgsql::Step()
1847{
1848 assert(fPgsql);
1849 assert(fResult);
1850
1851 fRow++;
1852
1853 if (fRow == PQntuples(fResult))
1854 return DB_NO_MORE_SUBKEYS;
1855
1856 return DB_SUCCESS;
1857}
1858
1859const char* Pgsql::GetText(int column)
1860{
1861 assert(fPgsql);
1862 assert(fResult);
1863 assert(fNumFields > 0);
1864 assert(column >= 0);
1865 assert(column < fNumFields);
1866
1867 return PQgetvalue(fResult, fRow, column);
1868}
1869
1870double Pgsql::GetDouble(int column)
1871{
1872 return atof(GetText(column));
1873}
1874
1875time_t Pgsql::GetTime(int column)
1876{
1877 return strtoul(GetText(column), NULL, 0);
1878}
1879
1880int Pgsql::Finalize()
1881{
1882 assert(fPgsql);
1883 assert(fResult);
1884
1885 fRow = -1;
1886 fNumFields = 0;
1887
1888 return DB_SUCCESS;
1889}
1890
1891const char* Pgsql::ColumnType(int midas_tid)
1892{
1893 assert(midas_tid>=0);
1894 assert(midas_tid<TID_LAST);
1895 return sql_type_pgsql[midas_tid];
1896}
1897
1898bool Pgsql::TypesCompatible(int midas_tid, const char* sql_type)
1899{
1900 if (/* DISABLES CODE */ (0))
1901 printf("compare types midas \'%s\'=\'%s\' and sql \'%s\'\n", rpc_tid_name(midas_tid), ColumnType(midas_tid), sql_type);
1902
1903 //if (sql2midasType_mysql(sql_type) == midas_tid)
1904 // return true;
1905
1906 if (strcasecmp(ColumnType(midas_tid), sql_type) == 0)
1907 return true;
1908
1909 // permit writing FLOAT into DOUBLE
1910 if (midas_tid==TID_FLOAT && strcmp(sql_type, "double precision")==0)
1911 return true;
1912
1913 // T2K quirk!
1914 // permit writing BYTE into signed tinyint
1915 if (midas_tid==TID_BYTE && strcmp(sql_type, "integer")==0)
1916 return true;
1917
1918 // T2K quirk!
1919 // permit writing WORD into signed tinyint
1920 if (midas_tid==TID_WORD && strcmp(sql_type, "integer")==0)
1921 return true;
1922
1923 if (/* DISABLES CODE */ (0))
1924 printf("type mismatch!\n");
1925
1926 return false;
1927}
1928
1929std::string Pgsql::QuoteId(const char* s)
1930{
1931 std::string q;
1932 q += '"';
1933 q += s;
1934 q += '"';
1935 return q;
1936}
1937
1938std::string Pgsql::QuoteString(const char* s)
1939{
1940 std::string q;
1941 q += '\'';
1942 q += s;
1943 q += '\'';
1944 return q;
1945}
1946
1947#endif // HAVE_PGSQL
1948
1949#ifdef HAVE_SQLITE
1950
1952// SQLITE database access //
1954
1955#include <sqlite3.h>
1956
1957typedef std::map<std::string, sqlite3*> DbMap;
1958
1959class Sqlite: public SqlBase
1960{
1961public:
1962 std::string fPath;
1963
1964 DbMap fMap;
1965
1966 // temporary storage of query data
1969
1970 Sqlite(); // ctor
1971 ~Sqlite(); // dtor
1972
1973 int Connect(const char* path);
1974 int Disconnect();
1975 bool IsConnected();
1976
1977 int ConnectTable(const char* table_name);
1978 sqlite3* GetTable(const char* table_name);
1979
1980 int ListTables(std::vector<std::string> *plist);
1981 int ListColumns(const char* table_name, std::vector<std::string> *plist);
1982
1983 int Exec(const char* table_name, const char* sql);
1984 int ExecDisconnected(const char* table_name, const char* sql);
1985
1986 int Prepare(const char* table_name, const char* sql);
1987 int Step();
1988 const char* GetText(int column);
1989 time_t GetTime(int column);
1990 double GetDouble(int column);
1991 int Finalize();
1992
1993 int OpenTransaction(const char* table_name);
1994 int CommitTransaction(const char* table_name);
1995 int RollbackTransaction(const char* table_name);
1996
1997 const char* ColumnType(int midas_tid);
1998 bool TypesCompatible(int midas_tid, const char* sql_type);
1999
2000 std::string QuoteId(const char* s);
2001 std::string QuoteString(const char* s);
2002};
2003
2004std::string Sqlite::QuoteId(const char* s)
2005{
2006 std::string q;
2007 q += "\"";
2008 q += s;
2009 q += "\"";
2010 return q;
2011}
2012
2013std::string Sqlite::QuoteString(const char* s)
2014{
2015 std::string q;
2016 q += "\'";
2017 q += s;
2018 q += "\'";
2019 return q;
2020}
2021
2022const char* Sqlite::ColumnType(int midas_tid)
2023{
2024 assert(midas_tid>=0);
2025 assert(midas_tid<TID_LAST);
2026 return sql_type_sqlite[midas_tid];
2027}
2028
2029bool Sqlite::TypesCompatible(int midas_tid, const char* sql_type)
2030{
2031 if (0)
2032 printf("compare types midas \'%s\'=\'%s\' and sql \'%s\'\n", rpc_tid_name(midas_tid), ColumnType(midas_tid), sql_type);
2033
2034 //if (sql2midasType_sqlite(sql_type) == midas_tid)
2035 // return true;
2036
2037 if (strcasecmp(ColumnType(midas_tid), sql_type) == 0)
2038 return true;
2039
2040 // permit writing FLOAT into DOUBLE
2041 if (midas_tid==TID_FLOAT && strcasecmp(sql_type, "double")==0)
2042 return true;
2043
2044 return false;
2045}
2046
2047const char* Sqlite::GetText(int column)
2048{
2049 return (const char*)sqlite3_column_text(fTempStmt, column);
2050}
2051
2052time_t Sqlite::GetTime(int column)
2053{
2055}
2056
2057double Sqlite::GetDouble(int column)
2058{
2060}
2061
2062Sqlite::Sqlite() // ctor
2063{
2064 fIsConnected = false;
2065 fTempDB = NULL;
2066 fTempStmt = NULL;
2067 fDebug = 0;
2068}
2069
2070Sqlite::~Sqlite() // dtor
2071{
2072 Disconnect();
2073}
2074
2075const char* xsqlite3_errstr(sqlite3* db, int errcode)
2076{
2077 //return sqlite3_errstr(errcode);
2078 return sqlite3_errmsg(db);
2079}
2080
2081int Sqlite::ConnectTable(const char* table_name)
2082{
2083 std::string fname = fPath + "mh_" + table_name + ".sqlite3";
2084
2085 sqlite3* db = NULL;
2086
2087 int status = sqlite3_open(fname.c_str(), &db);
2088
2089 if (status != SQLITE_OK) {
2090 cm_msg(MERROR, "Sqlite::Connect", "Table %s: sqlite3_open(%s) error %d (%s)", table_name, fname.c_str(), status, xsqlite3_errstr(db, status));
2092 db = NULL;
2093 return DB_FILE_ERROR;
2094 }
2095
2096#if SQLITE_VERSION_NUMBER >= 3006020
2098 if (status != SQLITE_OK) {
2099 cm_msg(MERROR, "Sqlite::Connect", "Table %s: sqlite3_extended_result_codes(1) error %d (%s)", table_name, status, xsqlite3_errstr(db, status));
2100 }
2101#else
2102#warning Missing sqlite3_extended_result_codes()!
2103#endif
2104
2105 fMap[table_name] = db;
2106
2107 Exec(table_name, "PRAGMA journal_mode=persist;");
2108 Exec(table_name, "PRAGMA synchronous=normal;");
2109 //Exec(table_name, "PRAGMA synchronous=off;");
2110 Exec(table_name, "PRAGMA journal_size_limit=-1;");
2111
2112 if (0) {
2113 Exec(table_name, "PRAGMA legacy_file_format;");
2114 Exec(table_name, "PRAGMA synchronous;");
2115 Exec(table_name, "PRAGMA journal_mode;");
2116 Exec(table_name, "PRAGMA journal_size_limit;");
2117 }
2118
2119#ifdef SQLITE_LIMIT_COLUMN
2120 if (0) {
2122 printf("Sqlite::Connect: SQLITE_LIMIT_COLUMN=%d\n", max_columns);
2123 }
2124#endif
2125
2126 if (fDebug)
2127 cm_msg(MINFO, "Sqlite::Connect", "Table %s: connected to Sqlite file \'%s\'", table_name, fname.c_str());
2128
2129 return DB_SUCCESS;
2130}
2131
2132sqlite3* Sqlite::GetTable(const char* table_name)
2133{
2134 sqlite3* db = fMap[table_name];
2135
2136 if (db)
2137 return db;
2138
2139 int status = ConnectTable(table_name);
2140 if (status != DB_SUCCESS)
2141 return NULL;
2142
2143 return fMap[table_name];
2144}
2145
2146int Sqlite::Connect(const char* path)
2147{
2148 if (fIsConnected)
2149 Disconnect();
2150
2151 fPath = path;
2152
2153 // add trailing '/'
2154 if (fPath.length() > 0) {
2155 if (fPath[fPath.length()-1] != DIR_SEPARATOR)
2156 fPath += DIR_SEPARATOR_STR;
2157 }
2158
2159 if (fDebug)
2160 cm_msg(MINFO, "Sqlite::Connect", "Connected to Sqlite database in \'%s\'", fPath.c_str());
2161
2162 fIsConnected = true;
2163
2164 return DB_SUCCESS;
2165}
2166
2167int Sqlite::Disconnect()
2168{
2169 if (!fIsConnected)
2170 return DB_SUCCESS;
2171
2172 for (DbMap::iterator iter = fMap.begin(); iter != fMap.end(); ++iter) {
2173 const char* table_name = iter->first.c_str();
2174 sqlite3* db = iter->second;
2175 int status = sqlite3_close(db);
2176 if (status != SQLITE_OK) {
2177 cm_msg(MERROR, "Sqlite::Disconnect", "sqlite3_close(%s) error %d (%s)", table_name, status, xsqlite3_errstr(db, status));
2178 }
2179 }
2180
2181 fMap.clear();
2182
2183 fIsConnected = false;
2184
2185 return DB_SUCCESS;
2186}
2187
2188bool Sqlite::IsConnected()
2189{
2190 return fIsConnected;
2191}
2192
2193int Sqlite::OpenTransaction(const char* table_name)
2194{
2195 int status = Exec(table_name, "BEGIN TRANSACTION");
2196 return status;
2197}
2198
2199int Sqlite::CommitTransaction(const char* table_name)
2200{
2201 int status = Exec(table_name, "COMMIT TRANSACTION");
2202 return status;
2203}
2204
2205int Sqlite::RollbackTransaction(const char* table_name)
2206{
2207 int status = Exec(table_name, "ROLLBACK TRANSACTION");
2208 return status;
2209}
2210
2211int Sqlite::Prepare(const char* table_name, const char* sql)
2212{
2213 sqlite3* db = GetTable(table_name);
2214 if (!db)
2215 return DB_FILE_ERROR;
2216
2217 if (fDebug)
2218 printf("Sqlite::Prepare(%s, %s)\n", table_name, sql);
2219
2220 assert(fTempDB==NULL);
2221 fTempDB = db;
2222
2223#if SQLITE_VERSION_NUMBER >= 3006020
2225#else
2226#warning Missing sqlite3_prepare_v2()!
2228#endif
2229
2230 if (status == SQLITE_OK)
2231 return DB_SUCCESS;
2232
2233 std::string sqlstring = sql;
2234 cm_msg(MERROR, "Sqlite::Prepare", "Table %s: sqlite3_prepare_v2(%s...) error %d (%s)", table_name, sqlstring.substr(0,60).c_str(), status, xsqlite3_errstr(db, status));
2235
2236 fTempDB = NULL;
2237
2238 return DB_FILE_ERROR;
2239}
2240
2241int Sqlite::Step()
2242{
2243 if (0 && fDebug)
2244 printf("Sqlite::Step()\n");
2245
2246 assert(fTempDB);
2247 assert(fTempStmt);
2248
2250
2251 if (status == SQLITE_DONE)
2252 return DB_NO_MORE_SUBKEYS;
2253
2254 if (status == SQLITE_ROW)
2255 return DB_SUCCESS;
2256
2257 cm_msg(MERROR, "Sqlite::Step", "sqlite3_step() error %d (%s)", status, xsqlite3_errstr(fTempDB, status));
2258
2259 return DB_FILE_ERROR;
2260}
2261
2262int Sqlite::Finalize()
2263{
2264 if (0 && fDebug)
2265 printf("Sqlite::Finalize()\n");
2266
2267 assert(fTempDB);
2268 assert(fTempStmt);
2269
2271
2272 if (status != SQLITE_OK) {
2273 cm_msg(MERROR, "Sqlite::Finalize", "sqlite3_finalize() error %d (%s)", status, xsqlite3_errstr(fTempDB, status));
2274
2275 fTempDB = NULL;
2276 fTempStmt = NULL; // FIXME: maybe a memory leak?
2277 return DB_FILE_ERROR;
2278 }
2279
2280 fTempDB = NULL;
2281 fTempStmt = NULL;
2282
2283 return DB_SUCCESS;
2284}
2285
2286int Sqlite::ListTables(std::vector<std::string> *plist)
2287{
2288 if (!fIsConnected)
2289 return DB_FILE_ERROR;
2290
2291 if (fDebug)
2292 printf("Sqlite::ListTables at path [%s]\n", fPath.c_str());
2293
2294 int status;
2295
2296 const char* cmd = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;";
2297
2298 DIR *dir = opendir(fPath.c_str());
2299 if (!dir) {
2300 cm_msg(MERROR, "Sqlite::ListTables", "Cannot opendir(%s), errno %d (%s)", fPath.c_str(), errno, strerror(errno));
2301 return HS_FILE_ERROR;
2302 }
2303
2304 while (1) {
2305 const struct dirent* de = readdir(dir);
2306 if (!de)
2307 break;
2308
2309 const char* dn = de->d_name;
2310
2311 //if (dn[0]!='m' || dn[1]!='h')
2312 //continue;
2313
2314 const char* s;
2315
2316 s = strstr(dn, "mh_");
2317 if (!s || s!=dn)
2318 continue;
2319
2320 s = strstr(dn, ".sqlite3");
2321 if (!s || s[8]!=0)
2322 continue;
2323
2324 char table_name[256];
2325 mstrlcpy(table_name, dn+3, sizeof(table_name));
2326 // FIXME: skip names like "xxx.sqlite3~" and "xxx.sqlite3-deleted"
2327 char* ss = strstr(table_name, ".sqlite3");
2328 if (!ss)
2329 continue;
2330 *ss = 0;
2331
2332 //printf("dn [%s] tn [%s]\n", dn, table_name);
2333
2334 status = Prepare(table_name, cmd);
2335 if (status != DB_SUCCESS)
2336 continue;
2337
2338 while (1) {
2339 status = Step();
2340 if (status != DB_SUCCESS)
2341 break;
2342
2343 const char* tn = GetText(0);
2344 //printf("table [%s]\n", tn);
2345 plist->push_back(tn);
2346 }
2347
2348 status = Finalize();
2349 }
2350
2351 closedir(dir);
2352 dir = NULL;
2353
2354 return DB_SUCCESS;
2355}
2356
2357int Sqlite::ListColumns(const char* table, std::vector<std::string> *plist)
2358{
2359 if (!fIsConnected)
2360 return DB_FILE_ERROR;
2361
2362 if (fDebug)
2363 printf("Sqlite::ListColumns for table \'%s\'\n", table);
2364
2365 std::string cmd;
2366 cmd = "PRAGMA table_info(";
2367 cmd += table;
2368 cmd += ");";
2369
2370 int status;
2371
2372 status = Prepare(table, cmd.c_str());
2373 if (status != DB_SUCCESS)
2374 return status;
2375
2376 while (1) {
2377 status = Step();
2378 if (status != DB_SUCCESS)
2379 break;
2380
2381 const char* colname = GetText(1);
2382 const char* coltype = GetText(2);
2383 //printf("column [%s] [%s]\n", colname, coltype);
2384 plist->push_back(colname); // column name
2385 plist->push_back(coltype); // column type
2386 }
2387
2388 status = Finalize();
2389
2390 return DB_SUCCESS;
2391}
2392
2393static int callback_debug = 0;
2394
2395static int callback(void *NotUsed, int argc, char **argv, char **azColName){
2396 if (callback_debug) {
2397 printf("history_sqlite::callback---->\n");
2398 for (int i=0; i<argc; i++){
2399 printf("history_sqlite::callback[%d] %s = %s\n", i, azColName[i], argv[i] ? argv[i] : "NULL");
2400 }
2401 }
2402 return 0;
2403}
2404
2405int Sqlite::Exec(const char* table_name, const char* sql)
2406{
2407 // return values:
2408 // DB_SUCCESS
2409 // DB_FILE_ERROR: not connected
2410 // DB_KEY_EXIST: "table already exists"
2411
2412 if (!fIsConnected)
2413 return DB_FILE_ERROR;
2414
2415 sqlite3* db = GetTable(table_name);
2416 if (!db)
2417 return DB_FILE_ERROR;
2418
2419 if (fDebug)
2420 printf("Sqlite::Exec(%s, %s)\n", table_name, sql);
2421
2422 int status;
2423
2424 callback_debug = fDebug;
2425 char* errmsg = NULL;
2426
2428 if (status != SQLITE_OK) {
2429 if (status == SQLITE_ERROR && strstr(errmsg, "duplicate column name"))
2430 return DB_KEY_EXIST;
2431 if (status == SQLITE_ERROR && strstr(errmsg, "already exists"))
2432 return DB_KEY_EXIST;
2433 std::string sqlstring = sql;
2434 cm_msg(MERROR, "Sqlite::Exec", "Table %s: sqlite3_exec(%s...) error %d (%s)", table_name, sqlstring.substr(0,60).c_str(), status, errmsg);
2436 return DB_FILE_ERROR;
2437 }
2438
2439 return DB_SUCCESS;
2440}
2441
2442int Sqlite::ExecDisconnected(const char* table_name, const char* sql)
2443{
2444 cm_msg(MERROR, "Sqlite::Exec", "sqlite driver does not support disconnected operations");
2445 return DB_FILE_ERROR;
2446}
2447
2448#endif // HAVE_SQLITE
2449
2451// Methods of HsFileSchema //
2453
2454int HsFileSchema::write_event(const time_t t, const char* data, const int data_size)
2455{
2456 HsFileSchema* s = this;
2457
2458 assert(s->fVariables.size() == s->fOffsets.size());
2459
2460 int status;
2461
2462 if (s->fWriterFd < 0) {
2463 s->fWriterFd = open(s->fFileName.c_str(), O_RDWR);
2464 if (s->fWriterFd < 0) {
2465 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', open() errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2466 return HS_FILE_ERROR;
2467 }
2468
2469 int file_size = lseek(s->fWriterFd, 0, SEEK_END);
2470
2471 int nrec = (file_size - s->fDataOffset)/s->fRecordSize;
2472 if (nrec < 0)
2473 nrec = 0;
2474 int data_end = s->fDataOffset + nrec*s->fRecordSize;
2475
2476 //printf("file_size %d, nrec %d, data_end %d\n", file_size, nrec, data_end);
2477
2478 if (data_end != file_size) {
2479 if (nrec > 0)
2480 cm_msg(MERROR, "FileHistory::write_event", "File \'%s\' may be truncated, data offset %d, record size %d, file size: %d, should be %d, truncating the file", s->fFileName.c_str(), s->fDataOffset, s->fRecordSize, file_size, data_end);
2481
2483 if (status < 0) {
2484 cm_msg(MERROR, "FileHistory::write_event", "Cannot seek \'%s\' to offset %d, lseek() errno %d (%s)", s->fFileName.c_str(), data_end, errno, strerror(errno));
2485 return HS_FILE_ERROR;
2486 }
2488 if (status < 0) {
2489 cm_msg(MERROR, "FileHistory::write_event", "Cannot truncate \'%s\' to size %d, ftruncate() errno %d (%s)", s->fFileName.c_str(), data_end, errno, strerror(errno));
2490 return HS_FILE_ERROR;
2491 }
2492 }
2493 }
2494
2495 int expected_size = s->fRecordSize - 4;
2496
2497 // sanity check: record_size and n_bytes are computed from the byte counts in the file header
2498 assert(expected_size == s->fNumBytes);
2499
2500 if (s->fLastSize == 0)
2502
2503 if (data_size != s->fLastSize) {
2504 cm_msg(MERROR, "FileHistory::write_event", "Event \'%s\' data size mismatch, expected %d bytes, got %d bytes, previously %d bytes", s->fEventName.c_str(), expected_size, data_size, s->fLastSize);
2505 //printf("schema:\n");
2506 //s->print();
2507
2508 if (data_size < expected_size)
2509 return HS_FILE_ERROR;
2510
2511 // truncate for now
2512 // data_size = expected_size;
2513 s->fLastSize = data_size;
2514 }
2515
2516 int size = 4 + expected_size;
2517
2518 if (size != s->fRecordBufferSize) {
2519 s->fRecordBuffer = (char*)realloc(s->fRecordBuffer, size);
2520 assert(s->fRecordBuffer != NULL);
2521 s->fRecordBufferSize = size;
2522 }
2523
2524 memcpy(s->fRecordBuffer, &t, 4);
2526
2527 status = write(s->fWriterFd, s->fRecordBuffer, size);
2528 if (status != size) {
2529 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', write(%d) returned %d, errno %d (%s)", s->fFileName.c_str(), size, status, errno, strerror(errno));
2530 return HS_FILE_ERROR;
2531 }
2532
2533#if 0
2534 status = write(s->fWriterFd, &t, 4);
2535 if (status != 4) {
2536 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', write(timestamp) errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2537 return HS_FILE_ERROR;
2538 }
2539
2541 if (status != expected_size) {
2542 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', write(%d) errno %d (%s)", s->fFileName.c_str(), data_size, errno, strerror(errno));
2543 return HS_FILE_ERROR;
2544 }
2545#endif
2546
2547 return HS_SUCCESS;
2548}
2549
2551{
2552 if (fWriterFd >= 0) {
2554 fWriterFd = -1;
2555 }
2556 return HS_SUCCESS;
2557}
2558
2559static int ReadRecord(const char* file_name, int fd, int offset, int recsize, int irec, char* rec)
2560{
2561 int status;
2562 int fpos = offset + irec*recsize;
2563
2564 status = ::lseek(fd, fpos, SEEK_SET);
2565 if (status == -1) {
2566 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', lseek(%d) errno %d (%s)", file_name, fpos, errno, strerror(errno));
2567 return -1;
2568 }
2569
2570 status = ::read(fd, rec, recsize);
2571 if (status == 0) {
2572 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', unexpected end of file on read()", file_name);
2573 return -1;
2574 }
2575 if (status == -1) {
2576 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', read() errno %d (%s)", file_name, errno, strerror(errno));
2577 return -1;
2578 }
2579 if (status != recsize) {
2580 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', short read() returned %d instead of %d bytes", file_name, status, recsize);
2581 return -1;
2582 }
2583 return HS_SUCCESS;
2584}
2585
2586static int FindTime(const char* file_name, int fd, int offset, int recsize, int nrec, time_t timestamp, int* i1p, time_t* t1p, int* i2p, time_t* t2p, time_t* tstart, time_t* tend, int debug)
2587{
2588 //
2589 // purpose: find location time timestamp inside given file.
2590 // uses binary search
2591 // returns:
2592 // tstart, tend - time of first and last data in a file
2593 // i1p,t1p - data just before timestamp, used as "last_written"
2594 // i2p,t2p - data at timestamp or after timestamp, used as starting point to read data from file
2595 // assertions:
2596 // tstart <= t1p < t2p <= tend
2597 // i1p+1==i2p
2598 // t1p < timestamp <= t2p
2599 //
2600 // special cases:
2601 // 1) timestamp <= tstart - all data is in the future, return i1p==-1, t1p==-1, i2p==0, t2p==tstart
2602 // 2) tend < timestamp - all the data is in the past, return i1p = nrec-1, t1p = tend, i2p = nrec, t2p = 0;
2603 // 3) nrec == 1 only one record in this file and it is older than the timestamp (tstart == tend < timestamp)
2604 //
2605
2606 int status;
2607 char* buf = new char[recsize];
2608
2609 assert(nrec > 0);
2610
2611 int rec1 = 0;
2612 int rec2 = nrec-1;
2613
2615 if (status != HS_SUCCESS) {
2616 delete[] buf;
2617 return HS_FILE_ERROR;
2618 }
2619
2620 time_t t1 = *(DWORD*)buf;
2621
2622 *tstart = t1;
2623
2624 // timestamp is older than any data in this file
2625 if (timestamp <= t1) {
2626 *i1p = -1;
2627 *t1p = 0;
2628 *i2p = 0;
2629 *t2p = t1;
2630 *tend = 0;
2631 delete[] buf;
2632 return HS_SUCCESS;
2633 }
2634
2635 assert(t1 < timestamp);
2636
2637 if (nrec == 1) {
2638 *i1p = 0;
2639 *t1p = t1;
2640 *i2p = nrec; // == 1
2641 *t2p = 0;
2642 *tend = t1;
2643 delete[] buf;
2644 return HS_SUCCESS;
2645 }
2646
2648 if (status != HS_SUCCESS) {
2649 delete[] buf;
2650 return HS_FILE_ERROR;
2651 }
2652
2653 time_t t2 = *(DWORD*)buf;
2654
2655 *tend = t2;
2656
2657 // all the data is in the past
2658 if (t2 < timestamp) {
2659 *i1p = rec2;
2660 *t1p = t2;
2661 *i2p = nrec;
2662 *t2p = 0;
2663 delete[] buf;
2664 return HS_SUCCESS;
2665 }
2666
2667 assert(t1 < timestamp);
2668 assert(timestamp <= t2);
2669
2670 if (debug)
2671 printf("FindTime: rec %d..(x)..%d, time %s..(%s)..%s\n", rec1, rec2, TimeToString(t1).c_str(), TimeToString(timestamp).c_str(), TimeToString(t2).c_str());
2672
2673 // implement binary search
2674
2675 do {
2676 int rec = (rec1+rec2)/2;
2677
2678 assert(rec >= 0);
2679 assert(rec < nrec);
2680
2682 if (status != HS_SUCCESS) {
2683 delete[] buf;
2684 return HS_FILE_ERROR;
2685 }
2686
2687 time_t t = *(DWORD*)buf;
2688
2689 if (timestamp <= t) {
2690 if (debug)
2691 printf("FindTime: rec %d..(x)..%d..%d, time %s..(%s)..%s..%s\n", rec1, rec, rec2, TimeToString(t1).c_str(), TimeToString(timestamp).c_str(), TimeToString(t).c_str(), TimeToString(t2).c_str());
2692
2693 rec2 = rec;
2694 t2 = t;
2695 } else {
2696 if (debug)
2697 printf("FindTime: rec %d..%d..(x)..%d, time %s..%s..(%s)..%s\n", rec1, rec, rec2, TimeToString(t1).c_str(), TimeToString(t).c_str(), TimeToString(timestamp).c_str(), TimeToString(t2).c_str());
2698
2699 rec1 = rec;
2700 t1 = t;
2701 }
2702 } while (rec2 - rec1 > 1);
2703
2704 assert(rec1+1 == rec2);
2705 assert(t1 < timestamp);
2706 assert(timestamp <= t2);
2707
2708 if (debug)
2709 printf("FindTime: rec %d..(x)..%d, time %s..(%s)..%s, this is the result.\n", rec1, rec2, TimeToString(t1).c_str(), TimeToString(timestamp).c_str(), TimeToString(t2).c_str());
2710
2711 *i1p = rec1;
2712 *t1p = t1;
2713
2714 *i2p = rec2;
2715 *t2p = t2;
2716
2717 delete[] buf;
2718 return HS_SUCCESS;
2719}
2720
2722 const int debug,
2723 time_t* last_written)
2724{
2725 int status;
2726 HsFileSchema* s = this;
2727
2728 if (debug)
2729 printf("FileHistory::read_last_written: file %s, schema time %s..%s, timestamp %s\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(timestamp).c_str());
2730
2731 int fd = open(s->fFileName.c_str(), O_RDONLY);
2732 if (fd < 0) {
2733 cm_msg(MERROR, "FileHistory::read_last_written", "Cannot read \'%s\', open() errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2734 return HS_FILE_ERROR;
2735 }
2736
2737 int file_size = ::lseek(fd, 0, SEEK_END);
2738
2739 int nrec = (file_size - s->fDataOffset)/s->fRecordSize;
2740 if (nrec < 0)
2741 nrec = 0;
2742
2743 if (nrec < 1) {
2744 ::close(fd);
2745 if (last_written)
2746 *last_written = 0;
2747 return HS_SUCCESS;
2748 }
2749
2750 time_t lw = 0;
2751
2752 // read last record to check if desired time is inside or outside of the file
2753
2754 if (1) {
2755 char* buf = new char[s->fRecordSize];
2756
2757 status = ReadRecord(s->fFileName.c_str(), fd, s->fDataOffset, s->fRecordSize, nrec - 1, buf);
2758 if (status != HS_SUCCESS) {
2759 delete[] buf;
2760 ::close(fd);
2761 return HS_FILE_ERROR;
2762 }
2763
2764 lw = *(DWORD*)buf;
2765
2766 delete[] buf;
2767 }
2768
2769 if (lw >= timestamp) {
2770 int irec = 0;
2771 time_t trec = 0;
2772 int iunused = 0;
2773 time_t tunused = 0;
2774 time_t tstart = 0; // not used
2775 time_t tend = 0; // not used
2776
2777 status = FindTime(s->fFileName.c_str(), fd, s->fDataOffset, s->fRecordSize, nrec, timestamp, &irec, &trec, &iunused, &tunused, &tstart, &tend, 0*debug);
2778 if (status != HS_SUCCESS) {
2779 ::close(fd);
2780 return HS_FILE_ERROR;
2781 }
2782
2783 assert(trec < timestamp);
2784
2785 lw = trec;
2786 }
2787
2788 if (last_written)
2789 *last_written = lw;
2790
2791 if (debug)
2792 printf("FileHistory::read_last_written: file %s, schema time %s..%s, timestamp %s, last_written %s\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(timestamp).c_str(), TimeToString(lw).c_str());
2793
2794 assert(lw < timestamp);
2795
2796 ::close(fd);
2797
2798 return HS_SUCCESS;
2799}
2800
2801int HsFileSchema::read_data(const time_t start_time,
2802 const time_t end_time,
2803 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
2804 const int debug,
2805 std::vector<time_t>& last_time,
2807{
2808 HsFileSchema* s = this;
2809
2810 if (debug)
2811 printf("FileHistory::read_data: file %s, schema time %s..%s, read time %s..%s, %d vars\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), num_var);
2812
2813 //if (1) {
2814 // printf("Last time: ");
2815 // for (int i=0; i<num_var; i++) {
2816 // printf(" %s", TimeToString(last_time[i]).c_str());
2817 // }
2818 // printf("\n");
2819 //}
2820
2821 if (debug) {
2822 printf("FileHistory::read_data: file %s map", s->fFileName.c_str());
2823 for (size_t i=0; i<var_schema_index.size(); i++) {
2824 printf(" %2d", var_schema_index[i]);
2825 }
2826 printf("\n");
2827 }
2828
2829 int fd = ::open(s->fFileName.c_str(), O_RDONLY);
2830 if (fd < 0) {
2831 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', open() errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2832 return HS_FILE_ERROR;
2833 }
2834
2835 off_t file_size = ::lseek(fd, 0, SEEK_END);
2836
2837 if (file_size == (off_t)-1) {
2838 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', fseek(SEEK_END) errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2839 ::close(fd);
2840 return HS_FILE_ERROR;
2841 }
2842
2843 int nrec = (file_size - s->fDataOffset)/s->fRecordSize;
2844 if (nrec < 0)
2845 nrec = 0;
2846
2847 if (nrec < 1) {
2848 ::close(fd);
2849 return HS_SUCCESS;
2850 }
2851
2852 int iunused = 0;
2853 time_t tunused = 0;
2854 int irec = 0;
2855 time_t trec = 0;
2856 time_t tstart = 0;
2857 time_t tend = 0;
2858
2859 int status = FindTime(s->fFileName.c_str(), fd, s->fDataOffset, s->fRecordSize, nrec, start_time, &iunused, &tunused, &irec, &trec, &tstart, &tend, 0*debug);
2860
2861 if (status != HS_SUCCESS) {
2862 ::close(fd);
2863 return HS_FILE_ERROR;
2864 }
2865
2866 if (debug) {
2867 printf("FindTime %d, nrec %d, (%d, %s) (%d, %s), tstart %s, tend %s, want %s\n", status, nrec, iunused, TimeToString(tunused).c_str(), irec, TimeToString(trec).c_str(), TimeToString(tstart).c_str(), TimeToString(tend).c_str(), TimeToString(start_time).c_str());
2868 }
2869
2870 if (irec < 0 || irec >= nrec) {
2871 // all data in this file is older than start_time
2872
2873 ::close(fd);
2874
2875 if (debug)
2876 printf("FileHistory::read: file %s, schema time %s..%s, read time %s..%s, file time %s..%s, data in this file is too old\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), TimeToString(tstart).c_str(), TimeToString(tend).c_str());
2877
2878 return HS_SUCCESS;
2879 }
2880
2882 // data starts before time declared in schema
2883
2884 ::close(fd);
2885
2886 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': timestamp of first data %s is before schema start time %s", s->fFileName.c_str(), TimeToString(tstart).c_str(), TimeToString(s->fTimeFrom).c_str());
2887
2888 return HS_FILE_ERROR;
2889 }
2890
2891 if (tend && s->fTimeTo && tend > s->fTimeTo) {
2892 // data ends after time declared in schema (overlaps with next file)
2893
2894 ::close(fd);
2895
2896 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': timestamp of last data %s is after schema end time %s", s->fFileName.c_str(), TimeToString(tend).c_str(), TimeToString(s->fTimeTo).c_str());
2897
2898 return HS_FILE_ERROR;
2899 }
2900
2901 for (int i=0; i<num_var; i++) {
2902 int si = var_schema_index[i];
2903 if (si < 0)
2904 continue;
2905
2906 if (trec < last_time[i]) { // protect against duplicate and non-monotonous data
2907 ::close(fd);
2908
2909 cm_msg(MERROR, "FileHistory::read_data", "Internal history error at file \'%s\': variable %d data timestamp %s is before last timestamp %s", s->fFileName.c_str(), i, TimeToString(trec).c_str(), TimeToString(last_time[i]).c_str());
2910
2911 return HS_FILE_ERROR;
2912 }
2913 }
2914
2915 int count = 0;
2916
2918
2919 off_t xpos = ::lseek(fd, fpos, SEEK_SET);
2920 if (xpos == (off_t)-1) {
2921 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', lseek(%zu) errno %d (%s)", s->fFileName.c_str(), (size_t)fpos, errno, strerror(errno));
2922 ::close(fd);
2923 return HS_FILE_ERROR;
2924 }
2925
2926 char* buf = new char[s->fRecordSize];
2927
2928 int prec = irec;
2929
2930 while (1) {
2931 status = ::read(fd, buf, s->fRecordSize);
2932 if (status == 0) // EOF
2933 break;
2934 if (status == -1) {
2935 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', read() errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2936 break;
2937 }
2938 if (status != s->fRecordSize) {
2939 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', short read() returned %d instead of %d bytes", s->fFileName.c_str(), status, s->fRecordSize);
2940 break;
2941 }
2942
2943 prec++;
2944
2945 bool past_end_of_last_file = (s->fTimeTo == 0) && (prec > nrec);
2946
2947 time_t t = *(DWORD*)buf;
2948
2949 if (debug > 1)
2950 printf("FileHistory::read: file %s, schema time %s..%s, read time %s..%s, row time %s\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), TimeToString(t).c_str());
2951
2952 if (t < trec) {
2953 delete[] buf;
2954 ::close(fd);
2955 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': record %d timestamp %s is before start time %s", s->fFileName.c_str(), irec + count, TimeToString(t).c_str(), TimeToString(trec).c_str());
2956 return HS_FILE_ERROR;
2957 }
2958
2959 if (tend && (t > tend) && !past_end_of_last_file) {
2960 delete[] buf;
2961 ::close(fd);
2962 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': record %d timestamp %s is after last timestamp %s", s->fFileName.c_str(), irec + count, TimeToString(t).c_str(), TimeToString(tend).c_str());
2963 return HS_FILE_ERROR;
2964 }
2965
2966 if (t > end_time)
2967 break;
2968
2969 char* data = buf + 4;
2970
2971 for (int i=0; i<num_var; i++) {
2972 int si = var_schema_index[i];
2973 if (si < 0)
2974 continue;
2975
2976 if (t < last_time[i]) { // protect against duplicate and non-monotonous data
2977 delete[] buf;
2978 ::close(fd);
2979
2980 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': record %d timestamp %s is before timestamp %s of variable %d", s->fFileName.c_str(), irec + count, TimeToString(t).c_str(), TimeToString(last_time[i]).c_str(), i);
2981
2982 return HS_FILE_ERROR;
2983 }
2984
2985 double v = 0;
2986 void* ptr = data + s->fOffsets[si];
2987
2988 int ii = var_index[i];
2989 assert(ii >= 0);
2990 assert(ii < s->fVariables[si].n_data);
2991
2992 switch (s->fVariables[si].type) {
2993 default:
2994 // unknown data type
2995 v = 0;
2996 break;
2997 case TID_BYTE:
2998 v = ((unsigned char*)ptr)[ii];
2999 break;
3000 case TID_SBYTE:
3001 v = ((signed char *)ptr)[ii];
3002 break;
3003 case TID_CHAR:
3004 v = ((char*)ptr)[ii];
3005 break;
3006 case TID_WORD:
3007 v = ((unsigned short *)ptr)[ii];
3008 break;
3009 case TID_SHORT:
3010 v = ((signed short *)ptr)[ii];
3011 break;
3012 case TID_DWORD:
3013 v = ((unsigned int *)ptr)[ii];
3014 break;
3015 case TID_INT:
3016 v = ((int *)ptr)[ii];
3017 break;
3018 case TID_BOOL:
3019 v = ((unsigned int *)ptr)[ii];
3020 break;
3021 case TID_FLOAT:
3022 v = ((float*)ptr)[ii];
3023 break;
3024 case TID_DOUBLE:
3025 v = ((double*)ptr)[ii];
3026 break;
3027 }
3028
3029 buffer[i]->Add(t, v);
3030 last_time[i] = t;
3031 }
3032 count++;
3033 }
3034
3035 delete[] buf;
3036
3037 ::close(fd);
3038
3039 if (debug) {
3040 printf("FileHistory::read_data: file %s map", s->fFileName.c_str());
3041 for (size_t i=0; i<var_schema_index.size(); i++) {
3042 printf(" %2d", var_schema_index[i]);
3043 }
3044 printf(" read %d rows\n", count);
3045 }
3046
3047 if (debug)
3048 printf("FileHistory::read: file %s, schema time %s..%s, read time %s..%s, %d vars, read %d rows\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), num_var, count);
3049
3050 return HS_SUCCESS;
3051}
3052
3054// Implementation of the MidasHistoryInterface //
3056
3058{
3059protected:
3061 std::string fConnectString;
3062
3063 // writer data
3065 std::vector<HsSchema*> fEvents;
3066
3067 // reader data
3069
3070public:
3072 {
3073 fDebug = 0;
3074 }
3075
3077 {
3078 for (unsigned i=0; i<fEvents.size(); i++)
3079 if (fEvents[i]) {
3080 delete fEvents[i];
3081 fEvents[i] = NULL;
3082 }
3083 fEvents.clear();
3084 }
3085
3086 virtual int hs_set_debug(int debug)
3087 {
3088 int old = fDebug;
3089 fDebug = debug;
3090 return old;
3091 }
3092
3093 virtual int hs_connect(const char* connect_string) = 0;
3094 virtual int hs_disconnect() = 0;
3095
3096protected:
3098 // Schema functions //
3100
3101 // load existing schema
3102 virtual int read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp) = 0; // event_name: =NULL means read only event names, =event_name means load tag names for all matching events; timestamp: =0 means read all schema all the way to the beginning of time, =time means read schema in effect at this time and all newer schema
3103
3104 // return a new copy of a schema for writing into history. If schema for this event does not exist, it will be created
3105 virtual HsSchema* new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]) = 0;
3106
3107public:
3109 // Functions used by mlogger //
3111
3112 int hs_define_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]);
3113 int hs_write_event(const char* event_name, time_t timestamp, int buffer_size, const char* buffer);
3114 int hs_flush_buffers();
3115
3117 // Functions used by mhttpd //
3119
3120 int hs_clear_cache();
3121 int hs_get_events(time_t t, std::vector<std::string> *pevents);
3122 int hs_get_tags(const char* event_name, time_t t, std::vector<TAG> *ptags);
3123 int hs_get_last_written(time_t timestamp, int num_var, const char* const event_name[], const char* const var_name[], const int var_index[], time_t last_written[]);
3124 int hs_read_buffer(time_t start_time, time_t end_time,
3125 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3127 int hs_status[]);
3128
3129 /*------------------------------------------------------------------*/
3130
3132 {
3133 public:
3137
3139
3143 double **fDataBuffer;
3144
3146
3148 {
3149 fNumAdded = 0;
3150
3154
3155 fNumAlloc = 0;
3156 fNumEntries = NULL;
3157 fTimeBuffer = NULL;
3158 fDataBuffer = NULL;
3159
3160 fPrevTime = 0;
3161 }
3162
3163 ~ReadBuffer() // dtor
3164 {
3165 }
3166
3168 {
3169 if (wantalloc < fNumAlloc - 10)
3170 return;
3171
3172 int newalloc = fNumAlloc*2;
3173
3174 if (newalloc <= 1000)
3175 newalloc = wantalloc + 1000;
3176
3177 //printf("wantalloc %d, fNumEntries %d, fNumAlloc %d, newalloc %d\n", wantalloc, *fNumEntries, fNumAlloc, newalloc);
3178
3180 assert(*fTimeBuffer);
3181
3182 *fDataBuffer = (double*)realloc(*fDataBuffer, sizeof(double)*newalloc);
3183 assert(*fDataBuffer);
3184
3186 }
3187
3188 void Add(time_t t, double v)
3189 {
3190 if (t < fFirstTime)
3191 return;
3192 if (t > fLastTime)
3193 return;
3194
3195 fNumAdded++;
3196
3197 if ((fPrevTime==0) || (t >= fPrevTime + fInterval)) {
3198 int pos = *fNumEntries;
3199
3200 Realloc(pos + 1);
3201
3202 (*fTimeBuffer)[pos] = t;
3203 (*fDataBuffer)[pos] = v;
3204
3205 (*fNumEntries) = pos + 1;
3206
3207 fPrevTime = t;
3208 }
3209 }
3210
3211 void Finish()
3212 {
3213
3214 }
3215 };
3216
3217 /*------------------------------------------------------------------*/
3218
3219 int hs_read(time_t start_time, time_t end_time, time_t interval,
3220 int num_var,
3221 const char* const event_name[], const char* const var_name[], const int var_index[],
3222 int num_entries[],
3223 time_t* time_buffer[], double* data_buffer[],
3224 int st[]);
3225 /*------------------------------------------------------------------*/
3226
3227
3228 int hs_read_binned(time_t start_time, time_t end_time, int num_bins,
3229 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3230 int num_entries[],
3231 int* count_bins[], double* mean_bins[], double* rms_bins[], double* min_bins[], double* max_bins[],
3234 time_t last_time[], double last_value[],
3235 int st[]);
3236};
3237
3239{
3240 fNumEntries = 0;
3241
3245
3246 fSum0 = new double[num_bins];
3247 fSum1 = new double[num_bins];
3248 fSum2 = new double[num_bins];
3249
3250 for (int i=0; i<num_bins; i++) {
3251 fSum0[i] = 0;
3252 fSum1[i] = 0;
3253 fSum2[i] = 0;
3254 }
3255}
3256
3258{
3259 delete fSum0; fSum0 = NULL;
3260 delete fSum1; fSum1 = NULL;
3261 delete fSum2; fSum2 = NULL;
3262 // poison the pointers
3263 fCount = NULL;
3264 fMean = NULL;
3265 fRms = NULL;
3266 fMin = NULL;
3267 fMax = NULL;
3274}
3275
3277{
3278 for (int ibin = 0; ibin < fNumBins; ibin++) {
3279 if (fMin)
3280 fMin[ibin] = 0;
3281 if (fMax)
3282 fMax[ibin] = 0;
3283 if (fBinsFirstTime)
3284 fBinsFirstTime[ibin] = 0;
3285 if (fBinsFirstValue)
3286 fBinsFirstValue[ibin] = 0;
3287 if (fBinsLastTime)
3288 fBinsLastTime[ibin] = 0;
3289 if (fBinsLastValue)
3290 fBinsLastValue[ibin] = 0;
3291 }
3292 if (fLastTimePtr)
3293 *fLastTimePtr = 0;
3294 if (fLastValuePtr)
3295 *fLastValuePtr = 0;
3296}
3297
3299{
3300 if (t < fFirstTime)
3301 return;
3302 if (t > fLastTime)
3303 return;
3304
3305 fNumEntries++;
3306
3307 double a = (double)(t - fFirstTime);
3308 double b = (double)(fLastTime - fFirstTime);
3309 double fbin = fNumBins*a/b;
3310
3311 int ibin = (int)fbin;
3312
3313 if (ibin < 0)
3314 ibin = 0;
3315 else if (ibin >= fNumBins)
3316 ibin = fNumBins-1;
3317
3318 if (fSum0[ibin] == 0) {
3319 if (fMin)
3320 fMin[ibin] = v;
3321 if (fMax)
3322 fMax[ibin] = v;
3323 if (fBinsFirstTime)
3324 fBinsFirstTime[ibin] = t;
3325 if (fBinsFirstValue)
3326 fBinsFirstValue[ibin] = v;
3327 if (fBinsLastTime)
3328 fBinsLastTime[ibin] = t;
3329 if (fBinsLastValue)
3330 fBinsLastValue[ibin] = v;
3331 if (fLastTimePtr)
3332 *fLastTimePtr = t;
3333 if (fLastValuePtr)
3334 *fLastValuePtr = v;
3335 }
3336
3337 fSum0[ibin] += 1.0;
3338 fSum1[ibin] += v;
3339 fSum2[ibin] += v*v;
3340
3341 if (fMin)
3342 if (v < fMin[ibin])
3343 fMin[ibin] = v;
3344
3345 if (fMax)
3346 if (v > fMax[ibin])
3347 fMax[ibin] = v;
3348
3349 // NOTE: this assumes t and v are sorted by time.
3350 if (fBinsLastTime)
3351 fBinsLastTime[ibin] = t;
3352 if (fBinsLastValue)
3353 fBinsLastValue[ibin] = v;
3354
3355 if (fLastTimePtr)
3356 if (t > *fLastTimePtr) {
3357 *fLastTimePtr = t;
3358 if (fLastValuePtr)
3359 *fLastValuePtr = v;
3360 }
3361}
3362
3364{
3365 for (int i=0; i<fNumBins; i++) {
3366 double num = fSum0[i];
3367 double mean = 0;
3368 double variance = 0;
3369 if (num > 0) {
3370 mean = fSum1[i]/num;
3372 }
3373 double rms = 0;
3374 if (variance > 0)
3375 rms = sqrt(variance);
3376
3377 if (fCount)
3378 fCount[i] = (int)num;
3379
3380 if (fMean)
3381 fMean[i] = mean;
3382
3383 if (fRms)
3384 fRms[i] = rms;
3385
3386 if (num == 0) {
3387 if (fMin)
3388 fMin[i] = 0;
3389 if (fMax)
3390 fMax[i] = 0;
3391 }
3392 }
3393}
3394
3395int SchemaHistoryBase::hs_define_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[])
3396{
3397 if (fDebug) {
3398 printf("hs_define_event: event name [%s] with %d tags\n", event_name, ntags);
3399 if (fDebug > 1)
3400 PrintTags(ntags, tags);
3401 }
3402
3403 // delete all events with the same name
3404 for (unsigned int i=0; i<fEvents.size(); i++)
3405 if (fEvents[i])
3406 if (event_name_cmp(fEvents[i]->fEventName, event_name)==0) {
3407 if (fDebug)
3408 printf("deleting exising event %s\n", event_name);
3409 fEvents[i]->close();
3410 delete fEvents[i];
3411 fEvents[i] = NULL;
3412 }
3413
3414 // check for wrong types etc
3415 for (int i=0; i<ntags; i++) {
3416 if (strlen(tags[i].name) < 1) {
3417 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' has empty name at index %d", event_name, i);
3418 return HS_FILE_ERROR;
3419 }
3420 if (tags[i].type <= 0 || tags[i].type >= TID_LAST) {
3421 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' tag \'%s\' at index %d has invalid type %d",
3422 event_name, tags[i].name, i, tags[i].type);
3423 return HS_FILE_ERROR;
3424 }
3425 if (tags[i].type == TID_STRING) {
3426 cm_msg(MERROR, "hs_define_event",
3427 "Error: History event \'%s\' tag \'%s\' at index %d has forbidden type TID_STRING", event_name,
3428 tags[i].name, i);
3429 return HS_FILE_ERROR;
3430 }
3431 if (tags[i].n_data <= 0) {
3432 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' tag \'%s\' at index %d has invalid n_data %d",
3433 event_name, tags[i].name, i, tags[i].n_data);
3434 return HS_FILE_ERROR;
3435 }
3436 }
3437
3438 // check for duplicate names. Done by sorting, since this takes only O(N*log*N))
3439 std::vector<std::string> names;
3440 for (int i=0; i<ntags; i++) {
3441 std::string str(tags[i].name);
3442 std::transform(str.begin(), str.end(), str.begin(), ::toupper);
3443 names.push_back(str);
3444 }
3445 std::sort(names.begin(), names.end());
3446 for (int i=0; i<ntags-1; i++) {
3447 if (names[i] == names[i + 1]) {
3448 cm_msg(MERROR, "hs_define_event",
3449 "Error: History event \'%s\' has duplicate tag name \'%s\'", event_name,
3450 names[i].c_str());
3451 return HS_FILE_ERROR;
3452 }
3453 }
3454
3455 HsSchema* s = new_event(event_name, timestamp, ntags, tags);
3456 if (!s)
3457 return HS_FILE_ERROR;
3458
3459 s->fDisabled = false;
3460
3462
3463 // find empty slot in events list
3464 for (unsigned int i=0; i<fEvents.size(); i++)
3465 if (!fEvents[i]) {
3466 fEvents[i] = s;
3467 s = NULL;
3468 break;
3469 }
3470
3471 // if no empty slots, add at the end
3472 if (s)
3473 fEvents.push_back(s);
3474
3475 return HS_SUCCESS;
3476}
3477
3478int SchemaHistoryBase::hs_write_event(const char* event_name, time_t timestamp, int buffer_size, const char* buffer)
3479{
3480 if (fDebug)
3481 printf("hs_write_event: write event \'%s\', time %d, size %d\n", event_name, (int)timestamp, buffer_size);
3482
3483 HsSchema *s = NULL;
3484
3485 // find this event
3486 for (size_t i=0; i<fEvents.size(); i++)
3487 if (fEvents[i] && (event_name_cmp(fEvents[i]->fEventName, event_name)==0)) {
3488 s = fEvents[i];
3489 break;
3490 }
3491
3492 // not found
3493 if (!s)
3494 return HS_UNDEFINED_EVENT;
3495
3496 // deactivated because of error?
3497 if (s->fDisabled)
3498 return HS_FILE_ERROR;
3499
3500 if (s->fNumBytes == 0) { // compute expected data size
3501 // NB: history data does not have any padding!
3502 for (unsigned i=0; i<s->fVariables.size(); i++) {
3503 s->fNumBytes += s->fVariables[i].n_bytes;
3504 }
3505 }
3506
3507 int status;
3508
3509 if (buffer_size > s->fNumBytes) { // too many bytes!
3510 if (s->fCountWriteOversize == 0) {
3511 // only report first occurence
3512 // count of all occurences is reported by HsSchema destructor
3513 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch: expected %d bytes, got %d bytes", s->fEventName.c_str(), s->fNumBytes, buffer_size);
3514 }
3516 if (buffer_size > s->fWriteMaxSize)
3517 s->fWriteMaxSize = buffer_size;
3518 status = s->write_event(timestamp, buffer, s->fNumBytes);
3519 } else if (buffer_size < s->fNumBytes) { // too few bytes
3520 if (s->fCountWriteUndersize == 0) {
3521 // only report first occurence
3522 // count of all occurences is reported by HsSchema destructor
3523 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch: expected %d bytes, got %d bytes", s->fEventName.c_str(), s->fNumBytes, buffer_size);
3524 }
3526 if (s->fWriteMinSize == 0)
3527 s->fWriteMinSize = buffer_size;
3528 else if (buffer_size < s->fWriteMinSize)
3529 s->fWriteMinSize = buffer_size;
3530 char* tmp = (char*)malloc(s->fNumBytes);
3531 memcpy(tmp, buffer, buffer_size);
3532 memset(tmp + buffer_size, 0, s->fNumBytes - buffer_size);
3533 status = s->write_event(timestamp, tmp, s->fNumBytes);
3534 free(tmp);
3535 } else {
3536 assert(buffer_size == s->fNumBytes); // obviously
3537 status = s->write_event(timestamp, buffer, buffer_size);
3538 }
3539
3540 // if could not write event, deactivate it
3541 if (status != HS_SUCCESS) {
3542 s->fDisabled = true;
3543 cm_msg(MERROR, "hs_write_event", "Event \'%s\' disabled after write error %d", event_name, status);
3544 return HS_FILE_ERROR;
3545 }
3546
3547 return HS_SUCCESS;
3548}
3549
3551{
3552 int status = HS_SUCCESS;
3553
3554 if (fDebug)
3555 printf("hs_flush_buffers!\n");
3556
3557 for (unsigned int i=0; i<fEvents.size(); i++)
3558 if (fEvents[i]) {
3559 int xstatus = fEvents[i]->flush_buffers();
3560 if (xstatus != HS_SUCCESS)
3561 status = xstatus;
3562 }
3563
3564 return status;
3565}
3566
3568// Functions used by mhttpd //
3570
3572{
3573 if (fDebug)
3574 printf("SchemaHistoryBase::hs_clear_cache!\n");
3575
3577 fSchema.clear();
3578
3579 return HS_SUCCESS;
3580}
3581
3582int SchemaHistoryBase::hs_get_events(time_t t, std::vector<std::string> *pevents)
3583{
3584 if (fDebug)
3585 printf("hs_get_events, time %s\n", TimeToString(t).c_str());
3586
3587 int status = read_schema(&fSchema, NULL, t);
3588 if (status != HS_SUCCESS)
3589 return status;
3590
3591 if (fDebug) {
3592 printf("hs_get_events: available schema:\n");
3593 fSchema.print(false);
3594 }
3595
3596 assert(pevents);
3597
3598 for (unsigned i=0; i<fSchema.size(); i++) {
3599 HsSchema* s = fSchema[i];
3600 if (t && s->fTimeTo && s->fTimeTo < t)
3601 continue;
3602 bool dupe = false;
3603 for (unsigned j=0; j<pevents->size(); j++)
3604 if (event_name_cmp((*pevents)[j], s->fEventName.c_str())==0) {
3605 dupe = true;
3606 break;
3607 }
3608
3609 if (!dupe)
3610 pevents->push_back(s->fEventName);
3611 }
3612
3613 std::sort(pevents->begin(), pevents->end());
3614
3615 if (fDebug) {
3616 printf("hs_get_events: returning %d events\n", (int)pevents->size());
3617 for (unsigned i=0; i<pevents->size(); i++) {
3618 printf(" %d: [%s]\n", i, (*pevents)[i].c_str());
3619 }
3620 }
3621
3622 return HS_SUCCESS;
3623}
3624
3625int SchemaHistoryBase::hs_get_tags(const char* event_name, time_t t, std::vector<TAG> *ptags)
3626{
3627 if (fDebug)
3628 printf("hs_get_tags: event [%s], time %s\n", event_name, TimeToString(t).c_str());
3629
3630 assert(ptags);
3631
3632 int status = read_schema(&fSchema, event_name, t);
3633 if (status != HS_SUCCESS)
3634 return status;
3635
3636 bool found_event = false;
3637 for (unsigned i=0; i<fSchema.size(); i++) {
3638 HsSchema* s = fSchema[i];
3639 if (t && s->fTimeTo && s->fTimeTo < t)
3640 continue;
3641
3642 if (event_name_cmp(s->fEventName, event_name) != 0)
3643 continue;
3644
3645 found_event = true;
3646
3647 for (unsigned i=0; i<s->fVariables.size(); i++) {
3648 const char* tagname = s->fVariables[i].name.c_str();
3649 //printf("event_name [%s], table_name [%s], column name [%s], tag name [%s]\n", event_name, tn.c_str(), cn.c_str(), tagname);
3650
3651 bool dupe = false;
3652
3653 for (unsigned k=0; k<ptags->size(); k++)
3654 if (strcasecmp((*ptags)[k].name, tagname) == 0) {
3655 dupe = true;
3656 break;
3657 }
3658
3659 if (!dupe) {
3660 TAG t;
3661 mstrlcpy(t.name, tagname, sizeof(t.name));
3662 t.type = s->fVariables[i].type;
3663 t.n_data = s->fVariables[i].n_data;
3664
3665 ptags->push_back(t);
3666 }
3667 }
3668 }
3669
3670 if (!found_event)
3671 return HS_UNDEFINED_EVENT;
3672
3673 if (fDebug) {
3674 printf("hs_get_tags: event [%s], returning %d tags\n", event_name, (int)ptags->size());
3675 for (unsigned i=0; i<ptags->size(); i++) {
3676 printf(" tag[%d]: %s[%d] type %d\n", i, (*ptags)[i].name, (*ptags)[i].n_data, (*ptags)[i].type);
3677 }
3678 }
3679
3680 return HS_SUCCESS;
3681}
3682
3683int SchemaHistoryBase::hs_get_last_written(time_t timestamp, int num_var, const char* const event_name[], const char* const var_name[], const int var_index[], time_t last_written[])
3684{
3685 if (fDebug) {
3686 printf("hs_get_last_written: timestamp %s, num_var %d\n", TimeToString(timestamp).c_str(), num_var);
3687 }
3688
3689 for (int j=0; j<num_var; j++) {
3690 last_written[j] = 0;
3691 }
3692
3693 for (int i=0; i<num_var; i++) {
3694 int status = read_schema(&fSchema, event_name[i], 0);
3695 if (status != HS_SUCCESS)
3696 return status;
3697 }
3698
3699 //fSchema.print(false);
3700
3701 for (int i=0; i<num_var; i++) {
3702 for (unsigned ss=0; ss<fSchema.size(); ss++) {
3703 HsSchema* s = fSchema[ss];
3704 // schema is too new
3705 if (s->fTimeFrom && s->fTimeFrom >= timestamp)
3706 continue;
3707 // this schema is newer than last_written and may contain newer data?
3708 if (s->fTimeFrom && s->fTimeFrom < last_written[i])
3709 continue;
3710 // schema for the variables we want?
3711 int sindex = s->match_event_var(event_name[i], var_name[i], var_index[i]);
3712 if (sindex < 0)
3713 continue;
3714
3715 time_t lw = 0;
3716
3717 int status = s->read_last_written(timestamp, fDebug, &lw);
3718
3719 if (status == HS_SUCCESS && lw != 0) {
3720 for (int j=0; j<num_var; j++) {
3721 int sj = s->match_event_var(event_name[j], var_name[j], var_index[j]);
3722 if (sj < 0)
3723 continue;
3724
3725 if (lw > last_written[j])
3726 last_written[j] = lw;
3727 }
3728 }
3729 }
3730 }
3731
3732 if (fDebug) {
3733 printf("hs_get_last_written: timestamp time %s, num_var %d, result:\n", TimeToString(timestamp).c_str(), num_var);
3734 for (int i=0; i<num_var; i++) {
3735 printf(" event [%s] tag [%s] index [%d] last_written %s\n", event_name[i], var_name[i], var_index[i], TimeToString(last_written[i]).c_str());
3736 }
3737 }
3738
3739 return HS_SUCCESS;
3740}
3741
3743 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3745 int hs_status[])
3746{
3747 if (fDebug)
3748 printf("hs_read_buffer: %d variables, start time %s, end time %s\n", num_var, TimeToString(start_time).c_str(), TimeToString(end_time).c_str());
3749
3750 for (int i=0; i<num_var; i++) {
3751 int status = read_schema(&fSchema, event_name[i], start_time);
3752 if (status != HS_SUCCESS)
3753 return status;
3754 }
3755
3756#if 0
3757 if (fDebug)
3758 fSchema.print(false);
3759#endif
3760
3761 for (int i=0; i<num_var; i++) {
3763 }
3764
3765 //for (unsigned ss=0; ss<fSchema.size(); ss++) {
3766 // HsSchema* s = fSchema[ss];
3767 // HsFileSchema* fs = dynamic_cast<HsFileSchema*>(s);
3768 // assert(fs != NULL);
3769 // printf("schema %d from %s to %s, filename %s\n", ss, TimeToString(fs->fTimeFrom).c_str(), TimeToString(fs->fTimeTo).c_str(), fs->fFileName.c_str());
3770 //}
3771
3772 // check that schema are sorted by time
3773
3774#if 0
3775 // check that schema list is sorted by time, descending fTimeFrom, newest schema first
3776 for (unsigned ss=0; ss<fSchema.size(); ss++) {
3777 if (fDebug) {
3778 //printf("Check schema %zu/%zu: prev from %s, this from %s to %s, compare %d %d %d\n", ss, fSchema.size(),
3779 // TimeToString(fSchema[ss-1]->fTimeFrom).c_str(),
3780 // TimeToString(fSchema[ss]->fTimeFrom).c_str(),
3781 // TimeToString(fSchema[ss]->fTimeTo).c_str(),
3782 // fSchema[ss-1]->fTimeFrom >= fSchema[ss]->fTimeTo,
3783 // fSchema[ss-1]->fTimeFrom > fSchema[ss]->fTimeFrom,
3784 // (fSchema[ss-1]->fTimeFrom >= fSchema[ss]->fTimeTo) && (fSchema[ss-1]->fTimeFrom > fSchema[ss]->fTimeFrom));
3785 printf("Schema %zu/%zu: from %s to %s, name %s\n", ss, fSchema.size(),
3786 TimeToString(fSchema[ss]->fTimeFrom).c_str(),
3787 TimeToString(fSchema[ss]->fTimeTo).c_str(),
3788 fSchema[ss]->fEventName.c_str());
3789 }
3790
3791 if (ss > 0) {
3792 //if ((fSchema[ss-1]->fTimeFrom >= fSchema[ss]->fTimeTo) && (fSchema[ss-1]->fTimeFrom > fSchema[ss]->fTimeFrom)) {
3793 if ((fSchema[ss-1]->fTimeFrom >= fSchema[ss]->fTimeFrom)) {
3794 // good
3795 } else {
3796 cm_msg(MERROR, "SchemaHistoryBase::hs_read_buffer", "History internal error, schema is not ordered by time. Please report this error to the midas forum.");
3797 return HS_FILE_ERROR;
3798 }
3799 }
3800 }
3801#endif
3802
3803 std::vector<HsSchema*> slist;
3804 std::vector<std::vector<int>> smap;
3805
3806 for (unsigned ss=0; ss<fSchema.size(); ss++) {
3807 HsSchema* s = fSchema[ss];
3808 // schema is too new?
3809 if (s->fTimeFrom && s->fTimeFrom > end_time)
3810 continue;
3811 // schema is too old
3812 if (s->fTimeTo && s->fTimeTo < start_time)
3813 continue;
3814
3815 std::vector<int> sm;
3816
3817 for (int i=0; i<num_var; i++) {
3818 // schema for the variables we want?
3819 int sindex = s->match_event_var(event_name[i], var_name[i], var_index[i]);
3820 if (sindex < 0)
3821 continue;
3822
3823 if (sm.empty()) {
3824 for (int i=0; i<num_var; i++) {
3825 sm.push_back(-1);
3826 }
3827 }
3828
3829 sm[i] = sindex;
3830 }
3831
3832 if (!sm.empty()) {
3833 slist.push_back(s);
3834 smap.push_back(sm);
3835 }
3836 }
3837
3838 if (0||fDebug) {
3839 printf("Found %d matching schema:\n", (int)slist.size());
3840
3841 for (size_t i=0; i<slist.size(); i++) {
3842 HsSchema* s = slist[i];
3843 s->print();
3844 for (int k=0; k<num_var; k++)
3845 printf(" tag %s[%d] sindex %d\n", var_name[k], var_index[k], smap[i][k]);
3846 }
3847 }
3848
3849 //for (size_t ss=0; ss<slist.size(); ss++) {
3850 // HsSchema* s = slist[ss];
3851 // HsFileSchema* fs = dynamic_cast<HsFileSchema*>(s);
3852 // assert(fs != NULL);
3853 // printf("schema %zu from %s to %s, filename %s", ss, TimeToString(fs->fTimeFrom).c_str(), TimeToString(fs->fTimeTo).c_str(), fs->fFileName.c_str());
3854 // printf(" smap ");
3855 // for (int k=0; k<num_var; k++)
3856 // printf(" %2d", smap[ss][k]);
3857 // printf("\n");
3858 //}
3859
3860 for (size_t ss=1; ss<slist.size(); ss++) {
3861 if (fDebug) {
3862 printf("Check schema %zu/%zu: prev from %s, this from %s to %s, compare %d\n", ss, slist.size(),
3863 TimeToString(slist[ss-1]->fTimeFrom).c_str(),
3864 TimeToString(slist[ss]->fTimeFrom).c_str(),
3865 TimeToString(slist[ss]->fTimeTo).c_str(),
3866 slist[ss-1]->fTimeFrom >= slist[ss]->fTimeFrom);
3867 }
3868 if (slist[ss-1]->fTimeFrom >= slist[ss]->fTimeFrom) {
3869 // good
3870 } else {
3871 cm_msg(MERROR, "SchemaHistoryBase::hs_read_buffer", "History internal error, selected schema is not ordered by time. Please report this error to the midas forum.");
3872 return HS_FILE_ERROR;
3873 }
3874 }
3875
3876 std::vector<time_t> last_time;
3877
3878 for (int i=0; i<num_var; i++) {
3879 last_time.push_back(start_time);
3880 }
3881
3882 for (int i=slist.size()-1; i>=0; i--) {
3883 HsSchema* s = slist[i];
3884
3885 int status = s->read_data(start_time, end_time, num_var, smap[i], var_index, fDebug, last_time, buffer);
3886
3887 if (status == HS_SUCCESS) {
3888 for (int j=0; j<num_var; j++) {
3889 if (smap[i][j] >= 0)
3891 }
3892 }
3893 }
3894
3895 return HS_SUCCESS;
3896}
3897
3899 int num_var,
3900 const char* const event_name[], const char* const var_name[], const int var_index[],
3901 int num_entries[],
3902 time_t* time_buffer[], double* data_buffer[],
3903 int st[])
3904{
3905 int status;
3906
3907 ReadBuffer** buffer = new ReadBuffer*[num_var];
3909
3910 for (int i=0; i<num_var; i++) {
3911 buffer[i] = new ReadBuffer(start_time, end_time, interval);
3912 bi[i] = buffer[i];
3913
3914 // make sure outputs are initialized to something sane
3915 if (num_entries)
3916 num_entries[i] = 0;
3917 if (time_buffer)
3918 time_buffer[i] = NULL;
3919 if (data_buffer)
3920 data_buffer[i] = NULL;
3921 if (st)
3922 st[i] = 0;
3923
3924 if (num_entries)
3925 buffer[i]->fNumEntries = &num_entries[i];
3926 if (time_buffer)
3927 buffer[i]->fTimeBuffer = &time_buffer[i];
3928 if (data_buffer)
3929 buffer[i]->fDataBuffer = &data_buffer[i];
3930 }
3931
3932 status = hs_read_buffer(start_time, end_time,
3933 num_var, event_name, var_name, var_index,
3934 bi, st);
3935
3936 for (int i=0; i<num_var; i++) {
3937 buffer[i]->Finish();
3938 delete buffer[i];
3939 }
3940
3941 delete[] buffer;
3942 delete[] bi;
3943
3944 return status;
3945}
3946
3948 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3949 int num_entries[],
3950 int* count_bins[], double* mean_bins[], double* rms_bins[], double* min_bins[], double* max_bins[],
3953 time_t last_time[], double last_value[],
3954 int st[])
3955{
3956 int status;
3957
3960
3961 for (int i=0; i<num_var; i++) {
3962 buffer[i] = new MidasHistoryBinnedBuffer(start_time, end_time, num_bins);
3963 xbuffer[i] = buffer[i];
3964
3965 if (count_bins)
3966 buffer[i]->fCount = count_bins[i];
3967 if (mean_bins)
3968 buffer[i]->fMean = mean_bins[i];
3969 if (rms_bins)
3970 buffer[i]->fRms = rms_bins[i];
3971 if (min_bins)
3972 buffer[i]->fMin = min_bins[i];
3973 if (max_bins)
3974 buffer[i]->fMax = max_bins[i];
3975 if (bins_first_time)
3976 buffer[i]->fBinsFirstTime = bins_first_time[i];
3977 if (bins_first_value)
3978 buffer[i]->fBinsFirstValue = bins_first_value[i];
3979 if (bins_last_time)
3980 buffer[i]->fBinsLastTime = bins_last_time[i];
3981 if (bins_last_value)
3982 buffer[i]->fBinsLastValue = bins_last_value[i];
3983 if (last_time)
3984 buffer[i]->fLastTimePtr = &last_time[i];
3985 if (last_value)
3986 buffer[i]->fLastValuePtr = &last_value[i];
3987
3988 buffer[i]->Start();
3989 }
3990
3991 status = hs_read_buffer(start_time, end_time,
3992 num_var, event_name, var_name, var_index,
3993 xbuffer,
3994 st);
3995
3996 for (int i=0; i<num_var; i++) {
3997 buffer[i]->Finish();
3998 if (num_entries)
3999 num_entries[i] = buffer[i]->fNumEntries;
4000 if (0) {
4001 for (int j=0; j<num_bins; j++) {
4002 printf("var %d bin %d count %d, first %s last %s value first %f last %f\n", i, j, count_bins[i][j], TimeToString(bins_first_time[i][j]).c_str(), TimeToString(bins_last_time[i][j]).c_str(), bins_first_value[i][j], bins_last_value[i][j]);
4003 }
4004 }
4005 delete buffer[i];
4006 }
4007
4008 delete[] buffer;
4009 delete[] xbuffer;
4010
4011 return status;
4012}
4013
4015// SQL schema //
4017
4019{
4020 if (!fSql->IsConnected()) {
4021 return HS_SUCCESS;
4022 }
4023
4024 int status = HS_SUCCESS;
4025 if (get_transaction_count() > 0) {
4028 }
4029 return status;
4030}
4031
4032int HsSchema::match_event_var(const char* event_name, const char* var_name, const int var_index)
4033{
4034 if (!MatchEventName(this->fEventName.c_str(), event_name))
4035 return -1;
4036
4037 for (unsigned j=0; j<this->fVariables.size(); j++) {
4038 if (MatchTagName(this->fVariables[j].name.c_str(), this->fVariables[j].n_data, var_name, var_index)) {
4039 // Second clause in if() is case where MatchTagName used the "alternate tag name".
4040 // E.g. our variable name is "IM05[3]" (n_data=1), but we're looking for var_name="IM05" and var_index=3.
4041 if (var_index < this->fVariables[j].n_data || (this->fVariables[j].n_data == 1 && this->fVariables[j].name.find("[") != std::string::npos)) {
4042 return j;
4043 }
4044 }
4045 }
4046
4047 return -1;
4048}
4049
4050int HsSqlSchema::match_event_var(const char* event_name, const char* var_name, const int var_index)
4051{
4052 if (event_name_cmp(this->fTableName, event_name)==0) {
4053 for (unsigned j=0; j<this->fVariables.size(); j++) {
4054 if (var_name_cmp(this->fColumnNames[j], var_name)==0)
4055 return j;
4056 }
4057 }
4058
4059 return HsSchema::match_event_var(event_name, var_name, var_index);
4060}
4061
4062static HsSqlSchema* NewSqlSchema(HsSchemaVector* sv, const char* table_name, time_t t)
4063{
4064 time_t tt = 0;
4065 int j=-1;
4066 int jjx=-1; // remember oldest schema
4067 time_t ttx = 0;
4068 for (unsigned i=0; i<sv->size(); i++) {
4069 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
4070 if (s->fTableName != table_name)
4071 continue;
4072
4073 if (s->fTimeFrom == t) {
4074 return s;
4075 }
4076
4077 // remember the last schema before time t
4078 if (s->fTimeFrom < t) {
4079 if (s->fTimeFrom > tt) {
4080 tt = s->fTimeFrom;
4081 j = i;
4082 }
4083 }
4084
4085 if (jjx < 0) {
4086 jjx = i;
4087 ttx = s->fTimeFrom;
4088 }
4089
4090 if (s->fTimeFrom < ttx) {
4091 jjx = i;
4092 ttx = s->fTimeFrom;
4093 }
4094
4095 //printf("table_name [%s], t=%s, i=%d, j=%d %s, tt=%s, dt is %d\n", table_name, TimeToString(t).c_str(), i, j, TimeToString(s->fTimeFrom).c_str(), TimeToString(tt).c_str(), (int)(s->fTimeFrom-t));
4096 }
4097
4098 //printf("NewSqlSchema: will copy schema j=%d, tt=%d at time %d\n", j, tt, t);
4099
4100 //printf("cloned schema at time %s: ", TimeToString(t).c_str());
4101 //(*sv)[j]->print(false);
4102
4103 //printf("schema before:\n");
4104 //sv->print(false);
4105
4106 if (j >= 0) {
4107 HsSqlSchema* s = new HsSqlSchema;
4108 *s = *(HsSqlSchema*)(*sv)[j]; // make a copy
4109 s->fTimeFrom = t;
4110 sv->add(s);
4111
4112 //printf("schema after:\n");
4113 //sv->print(false);
4114
4115 return s;
4116 }
4117
4118 if (jjx >= 0) {
4119 cm_msg(MERROR, "NewSqlSchema", "Error: Unexpected ordering of schema for table \'%s\', good luck!", table_name);
4120
4121 HsSqlSchema* s = new HsSqlSchema;
4122 *s = *(HsSqlSchema*)(*sv)[jjx]; // make a copy
4123 s->fTimeFrom = t;
4124 s->fTimeTo = ttx;
4125 sv->add(s);
4126
4127 //printf("schema after:\n");
4128 //sv->print(false);
4129
4130 return s;
4131 }
4132
4133 cm_msg(MERROR, "NewSqlSchema", "Error: Cannot clone schema for table \'%s\', good luck!", table_name);
4134 return NULL;
4135}
4136
4138{
4139 assert(fVariables.size() == fColumnInactive.size());
4140 assert(fVariables.size() == fColumnNames.size());
4141 assert(fVariables.size() == fColumnTypes.size());
4142 assert(fVariables.size() == fOffsets.size());
4143
4144 size_t count_active = 0;
4145 size_t count_inactive = 0;
4146
4147 for (size_t i=0; i<fColumnInactive.size(); i++) {
4148 if (fColumnInactive[i])
4149 count_inactive += 1;
4150 else
4151 count_active += 1;
4152 }
4153
4154 //printf("remove_inactive_columns: enter! count_active: %zu, count_inactive: %zu\n", count_active, count_inactive);
4155 //print();
4156
4157 if (count_inactive > 0) {
4158 size_t j=0;
4159
4160 for (size_t i=0; i<fColumnInactive.size(); i++) {
4161 if (fColumnInactive[i]) {
4162 // skip this entry
4163 } else {
4164 if (j != i) {
4169 fOffsets[j] = fOffsets[i];
4170 }
4171 j++;
4172 }
4173 }
4174
4175 //print();
4176 //printf("%zu %zu\n", j, count_active);
4177
4178 assert(j == count_active);
4179
4180 //print();
4181
4182 fVariables.resize(count_active);
4184 fColumnNames.resize(count_active);
4185 fColumnTypes.resize(count_active);
4186 fOffsets.resize(count_active);
4187
4188 assert(fVariables.size() == fColumnInactive.size());
4189 assert(fVariables.size() == fColumnNames.size());
4190 assert(fVariables.size() == fColumnTypes.size());
4191 assert(fVariables.size() == fOffsets.size());
4192
4193 //printf("remove_inactice_columns: exit!\n");
4194 //print();
4195 }
4196}
4197
4198int HsSqlSchema::write_event(const time_t t, const char* data, const int data_size)
4199{
4200 HsSqlSchema* s = this;
4201
4202 assert(s->fVariables.size() == s->fColumnInactive.size());
4203 assert(s->fVariables.size() == s->fColumnNames.size());
4204 assert(s->fVariables.size() == s->fColumnTypes.size());
4205 assert(s->fVariables.size() == s->fOffsets.size());
4206
4207 std::string tags;
4208 std::string values;
4209
4210 for (unsigned i=0; i<s->fVariables.size(); i++) {
4211 // NB: inactive columns should have been removed from the schema. K.O.
4212
4213 if (s->fColumnInactive[i]) {
4214 cm_msg(MERROR, "HsSqlSchema::write_event", "Internal error, unexpected inactive column %d", i);
4216 return HS_FILE_ERROR;
4217 }
4218
4219 int type = s->fVariables[i].type;
4220 int n_data = s->fVariables[i].n_data;
4221 int offset = s->fOffsets[i];
4222 const char* column_name = s->fColumnNames[i].c_str();
4223
4224 if (offset < 0) {
4225 cm_msg(MERROR, "HsSqlSchema::write_event", "Internal error, unexpected negative offset %d for column %d", offset, i);
4227 return HS_FILE_ERROR;
4228 }
4229
4230 assert(n_data == 1);
4231 assert(strlen(column_name) > 0);
4232 assert(offset < data_size);
4233
4234 void* ptr = (void*)(data+offset);
4235
4236 tags += ", ";
4237 tags += fSql->QuoteId(column_name);
4238
4239 values += ", ";
4240
4241 char buf[1024];
4242 int j=0;
4243
4244 switch (type) {
4245 default:
4246 sprintf(buf, "unknownType%d", type);
4247 break;
4248 case TID_BYTE:
4249 sprintf(buf, "%u",((unsigned char *)ptr)[j]);
4250 break;
4251 case TID_SBYTE:
4252 sprintf(buf, "%d",((signed char*)ptr)[j]);
4253 break;
4254 case TID_CHAR:
4255 // FIXME: quotes
4256 sprintf(buf, "\'%c\'",((char*)ptr)[j]);
4257 break;
4258 case TID_WORD:
4259 sprintf(buf, "%u",((unsigned short *)ptr)[j]);
4260 break;
4261 case TID_SHORT:
4262 sprintf(buf, "%d",((short *)ptr)[j]);
4263 break;
4264 case TID_DWORD:
4265 sprintf(buf, "%u",((unsigned int *)ptr)[j]);
4266 break;
4267 case TID_INT:
4268 sprintf(buf, "%d",((int *)ptr)[j]);
4269 break;
4270 case TID_BOOL:
4271 sprintf(buf, "%u",((unsigned int *)ptr)[j]);
4272 break;
4273 case TID_FLOAT:
4274 // FIXME: quotes
4275 sprintf(buf, "\'%.8g\'",((float*)ptr)[j]);
4276 break;
4277 case TID_DOUBLE:
4278 // FIXME: quotes
4279 sprintf(buf, "\'%.16g\'",((double*)ptr)[j]);
4280 break;
4281 }
4282
4283 values += buf;
4284 }
4285
4286 // 2001-02-16 20:38:40.1
4287 struct tm tms;
4288 localtime_r(&t, &tms); // somebody must call tzset() before this.
4289 char buf[1024];
4290 strftime(buf, sizeof(buf)-1, "%Y-%m-%d %H:%M:%S.0", &tms);
4291
4292 std::string cmd;
4293 cmd = "INSERT INTO ";
4294 cmd += fSql->QuoteId(s->fTableName.c_str());
4295 cmd += " (_t_time, _i_time";
4296 cmd += tags;
4297 cmd += ") VALUES (";
4298 cmd += fSql->QuoteString(buf);
4299 cmd += ", ";
4300 cmd += fSql->QuoteString(TimeToString(t).c_str());
4301 cmd += "";
4302 cmd += values;
4303 cmd += ");";
4304
4305 if (fSql->IsConnected()) {
4306 if (s->get_transaction_count() == 0)
4307 fSql->OpenTransaction(s->fTableName.c_str());
4308
4310
4311 int status = fSql->Exec(s->fTableName.c_str(), cmd.c_str());
4312
4313 // mh2sql who does not call hs_flush_buffers()
4314 // so we should flush the transaction by hand
4315 // some SQL engines have limited transaction buffers... K.O.
4316 if (s->get_transaction_count() > 100000) {
4317 //printf("flush table %s\n", table_name);
4318 fSql->CommitTransaction(s->fTableName.c_str());
4320 }
4321
4322 if (status != DB_SUCCESS) {
4323 return status;
4324 }
4325 } else {
4326 int status = fSql->ExecDisconnected(s->fTableName.c_str(), cmd.c_str());
4327 if (status != DB_SUCCESS) {
4328 return status;
4329 }
4330 }
4331
4332 return HS_SUCCESS;
4333}
4334
4336 const int debug,
4337 time_t* last_written)
4338{
4339 if (debug)
4340 printf("SqlHistory::read_last_written: table [%s], timestamp %s\n", fTableName.c_str(), TimeToString(timestamp).c_str());
4341
4342 std::string cmd;
4343 cmd += "SELECT _i_time FROM ";
4344 cmd += fSql->QuoteId(fTableName.c_str());
4345 cmd += " WHERE _i_time < ";
4346 cmd += TimeToString(timestamp);
4347 cmd += " ORDER BY _i_time DESC LIMIT 2;";
4348
4349 int status = fSql->Prepare(fTableName.c_str(), cmd.c_str());
4350
4351 if (status != DB_SUCCESS)
4352 return status;
4353
4354 time_t lw = 0;
4355
4356 /* Loop through the rows in the result-set */
4357
4358 while (1) {
4359 status = fSql->Step();
4360 if (status != DB_SUCCESS)
4361 break;
4362
4363 time_t t = fSql->GetTime(0);
4364
4365 if (t >= timestamp)
4366 continue;
4367
4368 if (t > lw)
4369 lw = t;
4370 }
4371
4372 fSql->Finalize();
4373
4374 *last_written = lw;
4375
4376 if (debug)
4377 printf("SqlHistory::read_last_written: table [%s], timestamp %s, last_written %s\n", fTableName.c_str(), TimeToString(timestamp).c_str(), TimeToString(lw).c_str());
4378
4379 return HS_SUCCESS;
4380}
4381
4382int HsSqlSchema::read_data(const time_t start_time,
4383 const time_t end_time,
4384 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
4385 const int debug,
4386 std::vector<time_t>& last_time,
4388{
4389 bool bad_last_time = false;
4390
4391 if (debug)
4392 printf("SqlHistory::read_data: table [%s], start %s, end %s\n", fTableName.c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str());
4393
4394 std::string collist;
4395
4396 for (int i=0; i<num_var; i++) {
4397 int j = var_schema_index[i];
4398 if (j < 0)
4399 continue;
4400 if (collist.length() > 0)
4401 collist += ", ";
4403 }
4404
4405 std::string cmd;
4406 cmd += "SELECT _i_time, ";
4407 cmd += collist;
4408 cmd += " FROM ";
4409 cmd += fSql->QuoteId(fTableName.c_str());
4410 cmd += " WHERE _i_time>=";
4411 cmd += TimeToString(start_time);
4412 cmd += " and _i_time<=";
4413 cmd += TimeToString(end_time);
4414 cmd += " ORDER BY _i_time;";
4415
4416 int status = fSql->Prepare(fTableName.c_str(), cmd.c_str());
4417
4418 if (status != DB_SUCCESS)
4419 return HS_FILE_ERROR;
4420
4421 /* Loop through the rows in the result-set */
4422
4423 int count = 0;
4424
4425 while (1) {
4426 status = fSql->Step();
4427 if (status != DB_SUCCESS)
4428 break;
4429
4430 count++;
4431
4432 time_t t = fSql->GetTime(0);
4433
4434 if (t < start_time || t > end_time)
4435 continue;
4436
4437 int k = 0;
4438
4439 for (int i=0; i<num_var; i++) {
4440 int j = var_schema_index[i];
4441 if (j < 0)
4442 continue;
4443
4444 if (t < last_time[i]) { // protect against duplicate and non-monotonous data
4445 bad_last_time = true;
4446 } else {
4447 double v = fSql->GetDouble(1+k);
4448
4449 //printf("Column %d, index %d, Row %d, time %d, value %f\n", k, colindex[k], count, t, v);
4450
4451 buffer[i]->Add(t, v);
4452 last_time[i] = t;
4453 }
4454
4455 k++;
4456 }
4457 }
4458
4459 fSql->Finalize();
4460
4461 if (bad_last_time) {
4462 cm_msg(MERROR, "SqlHistory::read_data", "Detected duplicate or non-monotonous data in table \"%s\" for start time %s and end time %s", fTableName.c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str());
4463 }
4464
4465 if (debug)
4466 printf("SqlHistory::read_data: table [%s], start %s, end %s, read %d rows\n", fTableName.c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), count);
4467
4468 return HS_SUCCESS;
4469}
4470
4472 if (!fSql || fSql->fTransactionPerTable) {
4474 } else {
4475 return gfTransactionCount[fSql];
4476 }
4477}
4478
4480 if (!fSql || fSql->fTransactionPerTable) {
4482 } else {
4484 }
4485}
4486
4494
4496// SQL history functions //
4498
4499static int StartSqlTransaction(SqlBase* sql, const char* table_name, bool* have_transaction)
4500{
4501 if (*have_transaction)
4502 return HS_SUCCESS;
4503
4504 int status = sql->OpenTransaction(table_name);
4505 if (status != DB_SUCCESS)
4506 return HS_FILE_ERROR;
4507
4508 *have_transaction = true;
4509 return HS_SUCCESS;
4510}
4511
4512static int CreateSqlTable(SqlBase* sql, const char* table_name, bool* have_transaction, bool set_default_timestamp = false)
4513{
4514 int status;
4515
4517 if (status != DB_SUCCESS)
4518 return HS_FILE_ERROR;
4519
4520 std::string cmd;
4521
4522 cmd = "CREATE TABLE ";
4523 cmd += sql->QuoteId(table_name);
4525 cmd += " (_t_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, _i_time INTEGER NOT NULL DEFAULT 0);";
4526 } else {
4527 cmd += " (_t_time TIMESTAMP NOT NULL, _i_time INTEGER NOT NULL);";
4528 }
4529
4530 status = sql->Exec(table_name, cmd.c_str());
4531
4532
4533 if (status == DB_KEY_EXIST) {
4534 cm_msg(MINFO, "CreateSqlTable", "Adding SQL table \"%s\", but it already exists", table_name);
4536 return status;
4537 }
4538
4539 if (status != DB_SUCCESS) {
4540 cm_msg(MINFO, "CreateSqlTable", "Adding SQL table \"%s\", error status %d", table_name, status);
4542 return HS_FILE_ERROR;
4543 }
4544
4545 cm_msg(MINFO, "CreateSqlTable", "Adding SQL table \"%s\"", table_name);
4547
4548 std::string i_index_name;
4549 i_index_name = table_name;
4550 i_index_name += "_i_time_index";
4551
4552 std::string t_index_name;
4553 t_index_name = table_name;
4554 t_index_name += "_t_time_index";
4555
4556 cmd = "CREATE INDEX ";
4557 cmd += sql->QuoteId(i_index_name.c_str());
4558 cmd += " ON ";
4559 cmd += sql->QuoteId(table_name);
4560 cmd += " (_i_time ASC);";
4561
4562 status = sql->Exec(table_name, cmd.c_str());
4563 if (status != DB_SUCCESS)
4564 return HS_FILE_ERROR;
4565
4566 cmd = "CREATE INDEX ";
4567 cmd += sql->QuoteId(t_index_name.c_str());
4568 cmd += " ON ";
4569 cmd += sql->QuoteId(table_name);
4570 cmd += " (_t_time);";
4571
4572 status = sql->Exec(table_name, cmd.c_str());
4573 if (status != DB_SUCCESS)
4574 return HS_FILE_ERROR;
4575
4576 return status;
4577}
4578
4579static int CreateSqlHyperTable(SqlBase* sql, const char* table_name, bool* have_transaction) {
4580 int status;
4581
4583 if (status != DB_SUCCESS)
4584 return HS_FILE_ERROR;
4585
4586 std::string cmd;
4587
4588 cmd = "CREATE TABLE ";
4589 cmd += sql->QuoteId(table_name);
4590 cmd += " (_t_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, _i_time INTEGER NOT NULL DEFAULT 0);";
4591
4592 status = sql->Exec(table_name, cmd.c_str());
4593
4594 if (status == DB_KEY_EXIST) {
4595 cm_msg(MINFO, "CreateSqlHyperTable", "Adding SQL table \"%s\", but it already exists", table_name);
4597 return status;
4598 }
4599
4600 if (status != DB_SUCCESS) {
4601 cm_msg(MINFO, "CreateSqlHyperTable", "Adding SQL table \"%s\", error status %d", table_name, status);
4603 return HS_FILE_ERROR;
4604 }
4605
4606 cm_msg(MINFO, "CreateSqlHyperTable", "Adding SQL table \"%s\"", table_name);
4608
4609 cmd = "SELECT create_hypertable(";
4610 cmd += sql->QuoteString(table_name);
4611 cmd += ", '_t_time');";
4612
4613 // convert regular table to hypertable
4614 status = sql->Exec(table_name, cmd.c_str());
4615
4616 if (status != DB_SUCCESS) {
4617 cm_msg(MINFO, "CreateSqlHyperTable", "Converting SQL table to hypertable \"%s\", error status %d", table_name, status);
4619 return HS_FILE_ERROR;
4620 }
4621
4622 std::string i_index_name;
4623 i_index_name = table_name;
4624 i_index_name += "_i_time_index";
4625
4626 std::string t_index_name;
4627 t_index_name = table_name;
4628 t_index_name += "_t_time_index";
4629
4630 cmd = "CREATE INDEX ";
4631 cmd += sql->QuoteId(i_index_name.c_str());
4632 cmd += " ON ";
4633 cmd += sql->QuoteId(table_name);
4634 cmd += " (_i_time ASC);";
4635
4636 status = sql->Exec(table_name, cmd.c_str());
4637 if (status != DB_SUCCESS)
4638 return HS_FILE_ERROR;
4639
4640 cmd = "CREATE INDEX ";
4641 cmd += sql->QuoteId(t_index_name.c_str());
4642 cmd += " ON ";
4643 cmd += sql->QuoteId(table_name);
4644 cmd += " (_t_time);";
4645
4646 status = sql->Exec(table_name, cmd.c_str());
4647 if (status != DB_SUCCESS)
4648 return HS_FILE_ERROR;
4649
4650 return status;
4651}
4652
4653static int CreateSqlColumn(SqlBase* sql, const char* table_name, const char* column_name, const char* column_type, bool* have_transaction, int debug)
4654{
4655 if (debug)
4656 printf("CreateSqlColumn: table [%s], column [%s], type [%s]\n", table_name, column_name, column_type);
4657
4658 int status = StartSqlTransaction(sql, table_name, have_transaction);
4659 if (status != HS_SUCCESS)
4660 return status;
4661
4662 std::string cmd;
4663 cmd = "ALTER TABLE ";
4664 cmd += sql->QuoteId(table_name);
4665 cmd += " ADD COLUMN ";
4666 cmd += sql->QuoteId(column_name);
4667 cmd += " ";
4668 cmd += column_type;
4669 cmd += ";";
4670
4671 status = sql->Exec(table_name, cmd.c_str());
4672
4673 cm_msg(MINFO, "CreateSqlColumn", "Adding column \"%s\" to SQL table \"%s\", status %d", column_name, table_name, status);
4675
4676 return status;
4677}
4678
4680// SQL history base classes //
4682
4684{
4685public:
4687
4689 {
4690 fSql = NULL;
4692 }
4693
4694 virtual ~SqlHistoryBase() // dtor
4695 {
4696 hs_disconnect();
4697 if (fSql)
4698 delete fSql;
4699 fSql = NULL;
4700 }
4701
4703 {
4704 if (fSql)
4705 fSql->fDebug = debug;
4707 }
4708
4709 int hs_connect(const char* connect_string);
4710 int hs_disconnect();
4711 HsSchema* new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]);
4712 int read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp);
4713
4714protected:
4716 virtual int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name) = 0;
4717 virtual int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp) = 0;
4718 virtual int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction) = 0;
4719
4720 int update_schema(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable);
4721 int update_schema1(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable, bool* have_transaction);
4722};
4723
4725{
4726 if (fDebug)
4727 printf("hs_connect [%s]!\n", connect_string);
4728
4729 assert(fSql);
4730
4731 if (fSql->IsConnected())
4732 if (strcmp(fConnectString.c_str(), connect_string) == 0)
4733 return HS_SUCCESS;
4734
4735 hs_disconnect();
4736
4737 if (!connect_string || strlen(connect_string) < 1) {
4738 // FIXME: should use "logger dir" or some such default, that code should be in hs_get_history(), not here
4739 connect_string = ".";
4740 }
4741
4743
4744 if (fDebug)
4745 printf("hs_connect: connecting to SQL database \'%s\'\n", fConnectString.c_str());
4746
4747 int status = fSql->Connect(fConnectString.c_str());
4748 if (status != DB_SUCCESS)
4749 return status;
4750
4751 return HS_SUCCESS;
4752}
4753
4755{
4756 if (fDebug)
4757 printf("hs_disconnect!\n");
4758
4760
4761 fSql->Disconnect();
4762
4764
4765 return HS_SUCCESS;
4766}
4767
4768HsSchema* SqlHistoryBase::new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[])
4769{
4770 if (fDebug)
4771 printf("SqlHistory::new_event: event [%s], timestamp %s, ntags %d\n", event_name, TimeToString(timestamp).c_str(), ntags);
4772
4773 int status;
4774
4775 if (fWriterCurrentSchema.size() == 0) {
4777 if (status != HS_SUCCESS)
4778 return NULL;
4779 }
4780
4781 HsSqlSchema* s = (HsSqlSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
4782
4783 // schema does not exist, the SQL tables probably do not exist yet
4784
4785 if (!s) {
4786 status = create_table(&fWriterCurrentSchema, event_name, timestamp);
4787 if (status != HS_SUCCESS)
4788 return NULL;
4789
4790 s = (HsSqlSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
4791
4792 if (!s) {
4793 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
4794 fWriterCurrentSchema.find_event(event_name, timestamp, 1);
4795 return NULL;
4796 }
4797 }
4798
4799 assert(s != NULL);
4800
4802 if (status != HS_SUCCESS)
4803 return NULL;
4804
4805 s = (HsSqlSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
4806
4807 if (!s) {
4808 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot update schema database for event \'%s\', see previous messages", event_name);
4809 return NULL;
4810 }
4811
4812 if (0||fDebug) {
4813 printf("SqlHistory::new_event: schema for [%s] is %p\n", event_name, s);
4814 if (s)
4815 s->print();
4816 }
4817
4818 status = update_schema(s, timestamp, ntags, tags, true);
4819 if (status != HS_SUCCESS) {
4820 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
4821 return NULL;
4822 }
4823
4825 if (status != HS_SUCCESS)
4826 return NULL;
4827
4828 s = (HsSqlSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
4829
4830 if (!s) {
4831 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot update schema database for event \'%s\', see previous messages", event_name);
4832 return NULL;
4833 }
4834
4835 if (0||fDebug) {
4836 printf("SqlHistory::new_event: schema for [%s] is %p\n", event_name, s);
4837 if (s)
4838 s->print();
4839 }
4840
4841 // last call to UpdateMysqlSchema with "false" will check that new schema matches the new tags
4842
4843 status = update_schema(s, timestamp, ntags, tags, false);
4844 if (status != HS_SUCCESS) {
4845 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
4846 //fDebug = 1;
4847 //update_schema(s, timestamp, ntags, tags, false);
4848 //abort();
4849 return NULL;
4850 }
4851
4852 HsSqlSchema* e = new HsSqlSchema();
4853
4854 *e = *s; // make a copy of the schema
4855
4856 return e;
4857}
4858
4859int SqlHistoryBase::read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp)
4860{
4861 if (fDebug)
4862 printf("SqlHistory::read_schema: loading schema for event [%s] at time %s\n", event_name, TimeToString(timestamp).c_str());
4863
4864 int status;
4865
4866 if (fSchema.size() == 0) {
4868 if (status != HS_SUCCESS)
4869 return status;
4870 }
4871
4872 //sv->print(false);
4873
4874 if (event_name == NULL)
4875 return HS_SUCCESS;
4876
4877 for (unsigned i=0; i<sv->size(); i++) {
4878 HsSqlSchema* h = (HsSqlSchema*)(*sv)[i];
4879 // skip schema with already read column names
4880 if (h->fVariables.size() > 0)
4881 continue;
4882 // skip schema with different name
4883 if (!MatchEventName(h->fEventName.c_str(), event_name))
4884 continue;
4885
4886 unsigned nn = sv->size();
4887
4888 status = read_column_names(sv, h->fTableName.c_str(), h->fEventName.c_str());
4889
4890 // if new schema was added, loop all over again
4891 if (sv->size() != nn)
4892 i=0;
4893 }
4894
4895 //sv->print(false);
4896
4897 return HS_SUCCESS;
4898}
4899
4900int SqlHistoryBase::update_schema(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable)
4901{
4902 int status;
4903 bool have_transaction = false;
4904
4905 status = update_schema1(s, timestamp, ntags, tags, write_enable, &have_transaction);
4906
4907 if (have_transaction) {
4908 int xstatus;
4909
4910 if (status == HS_SUCCESS)
4912 else
4914
4915 if (xstatus != DB_SUCCESS) {
4916 return HS_FILE_ERROR;
4917 }
4918 have_transaction = false;
4919 }
4920
4921 return status;
4922}
4923
4924int SqlHistoryBase::update_schema1(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable, bool* have_transaction)
4925{
4926 int status;
4927
4928 if (fDebug)
4929 printf("update_schema1\n");
4930
4931 // check that compare schema with tags[]
4932
4933 bool schema_ok = true;
4934
4935 int offset = 0;
4936 for (int i=0; i<ntags; i++) {
4937 for (unsigned int j=0; j<tags[i].n_data; j++) {
4938 int tagtype = tags[i].type;
4939 std::string tagname = tags[i].name;
4940 std::string maybe_colname = MidasNameToSqlName(tags[i].name);
4941
4942 if (tags[i].n_data > 1) {
4943 char s[256];
4944 sprintf(s, "[%d]", j);
4945 tagname += s;
4946
4947 sprintf(s, "_%d", j);
4948 maybe_colname += s;
4949 }
4950
4951 int count = 0;
4952
4953 for (unsigned j=0; j<s->fVariables.size(); j++) {
4954 // NB: inactive columns will be reactivated or recreated by the if(count==0) branch. K.O.
4955 if (s->fColumnInactive[j])
4956 continue;
4957 if (tagname == s->fVariables[j].name) {
4958 if (s->fSql->TypesCompatible(tagtype, s->fColumnTypes[j].c_str())) {
4959 if (count == 0) {
4960 s->fOffsets[j] = offset;
4962 }
4963 count++;
4964 if (count > 1) {
4965 cm_msg(MERROR, "SqlHistory::update_schema", "Duplicate SQL column \'%s\' type \'%s\' in table \"%s\" with MIDAS type \'%s\' history event \"%s\" tag \"%s\"", s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fTableName.c_str(), rpc_tid_name(tagtype), s->fEventName.c_str(), tagname.c_str());
4967 }
4968 } else {
4969 // column with incompatible type, mark it as unused
4970 schema_ok = false;
4971 if (fDebug)
4972 printf("Incompatible column!\n");
4973 if (write_enable) {
4974 cm_msg(MINFO, "SqlHistory::update_schema", "Deactivating SQL column \'%s\' type \'%s\' in table \"%s\" as incompatible with MIDAS type \'%s\' history event \"%s\" tag \"%s\"", s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fTableName.c_str(), rpc_tid_name(tagtype), s->fEventName.c_str(), tagname.c_str());
4976
4977 status = update_column(s->fEventName.c_str(), s->fTableName.c_str(), s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fVariables[j].tag_name.c_str(), s->fVariables[i].tag_type.c_str(), timestamp, false, have_transaction);
4978 if (status != HS_SUCCESS)
4979 return status;
4980 }
4981 }
4982 }
4983 }
4984
4985 if (count == 0) {
4986 // tag does not have a corresponding column
4987 schema_ok = false;
4988 if (fDebug)
4989 printf("No column for tag %s!\n", tagname.c_str());
4990
4991 bool found_column = false;
4992
4993 if (write_enable) {
4994 for (unsigned j=0; j<s->fVariables.size(); j++) {
4995 if (tagname == s->fVariables[j].tag_name) {
4996 bool typeok = s->fSql->TypesCompatible(tagtype, s->fColumnTypes[j].c_str());
4997 if (typeok) {
4998 cm_msg(MINFO, "SqlHistory::update_schema", "Reactivating SQL column \'%s\' type \'%s\' in table \"%s\" for history event \"%s\" tag \"%s\"", s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fTableName.c_str(), s->fEventName.c_str(), tagname.c_str());
5000
5001 status = update_column(s->fEventName.c_str(), s->fTableName.c_str(), s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fVariables[j].tag_name.c_str(), s->fVariables[j].tag_type.c_str(), timestamp, true, have_transaction);
5002 if (status != HS_SUCCESS)
5003 return status;
5004
5005 if (count == 0) {
5006 s->fOffsets[j] = offset;
5008 }
5009 count++;
5010 found_column = true;
5011 if (count > 1) {
5012 cm_msg(MERROR, "SqlHistory::update_schema", "Duplicate SQL column \'%s\' type \'%s\' in table \"%s\" for history event \"%s\" tag \"%s\"", s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fTableName.c_str(), s->fEventName.c_str(), tagname.c_str());
5014 }
5015 }
5016 }
5017 }
5018 }
5019
5020 // create column
5021 if (!found_column && write_enable) {
5022 std::string col_name = maybe_colname;
5023 const char* col_type = s->fSql->ColumnType(tagtype);
5024
5025 bool dupe = false;
5026 for (unsigned kk=0; kk<s->fColumnNames.size(); kk++)
5027 if (s->fColumnNames[kk] == col_name) {
5028 dupe = true;
5029 break;
5030 }
5031
5032 time_t now = time(NULL);
5033
5034 bool retry = false;
5035 for (int t=0; t<20; t++) {
5036
5037 // if duplicate column name, change it, try again
5038 if (dupe || retry) {
5040 col_name += "_";
5042 if (t > 0) {
5043 char s[256];
5044 sprintf(s, "_%d", t);
5045 col_name += s;
5046 }
5047 }
5048
5049 if (fDebug)
5050 printf("SqlHistory::update_schema: table [%s], add column [%s] type [%s] for tag [%s]\n", s->fTableName.c_str(), col_name.c_str(), col_type, tagname.c_str());
5051
5053
5054 if (status == DB_KEY_EXIST) {
5055 if (fDebug)
5056 printf("SqlHistory::update_schema: table [%s], add column [%s] type [%s] for tag [%s] failed: duplicate column name\n", s->fTableName.c_str(), col_name.c_str(), col_type, tagname.c_str());
5057 retry = true;
5058 continue;
5059 }
5060
5061 if (status != HS_SUCCESS)
5062 return status;
5063
5064 break;
5065 }
5066
5067 if (status != HS_SUCCESS)
5068 return status;
5069
5070 status = update_column(s->fEventName.c_str(), s->fTableName.c_str(), col_name.c_str(), col_type, tagname.c_str(), rpc_tid_name(tagtype), timestamp, true, have_transaction);
5071 if (status != HS_SUCCESS)
5072 return status;
5073 }
5074 }
5075
5076 if (count > 1) {
5077 // schema has duplicate tags
5078 schema_ok = false;
5079 cm_msg(MERROR, "SqlHistory::update_schema", "Duplicate tags or SQL columns for history event \"%s\" tag \"%s\"", s->fEventName.c_str(), tagname.c_str());
5081 }
5082 }
5083 }
5084
5085 // mark as unused all columns not listed in tags
5086
5087 for (unsigned k=0; k<s->fColumnNames.size(); k++)
5088 if (s->fVariables[k].name.length() > 0) {
5089 bool found = false;
5090
5091 for (int i=0; i<ntags; i++) {
5092 for (unsigned int j=0; j<tags[i].n_data; j++) {
5093 std::string tagname = tags[i].name;
5094
5095 if (tags[i].n_data > 1) {
5096 char s[256];
5097 sprintf(s, "[%d]", j);
5098 tagname += s;
5099 }
5100
5101 if (s->fVariables[k].name == tagname) {
5102 found = true;
5103 break;
5104 }
5105 }
5106
5107 if (found)
5108 break;
5109 }
5110
5111 if (!found) {
5112 // column not found in tags list
5113 schema_ok = false;
5114 if (fDebug)
5115 printf("Event [%s] Column [%s] tag [%s] not listed in tags list!\n", s->fEventName.c_str(), s->fColumnNames[k].c_str(), s->fVariables[k].name.c_str());
5116 if (write_enable) {
5117 cm_msg(MINFO, "SqlHistory::update_schema", "Deactivating SQL column \'%s\' type \'%s\' in table \"%s\" for history event \"%s\" not used for any tags", s->fColumnNames[k].c_str(), s->fColumnTypes[k].c_str(), s->fTableName.c_str(), s->fEventName.c_str());
5119
5120 status = update_column(s->fEventName.c_str(), s->fTableName.c_str(), s->fColumnNames[k].c_str(), s->fColumnTypes[k].c_str(), s->fVariables[k].tag_name.c_str(), s->fVariables[k].tag_type.c_str(), timestamp, false, have_transaction);
5121 if (status != HS_SUCCESS)
5122 return status;
5123 }
5124 }
5125 }
5126
5127 if (!write_enable)
5128 if (!schema_ok) {
5129 if (fDebug)
5130 printf("Return error!\n");
5131 return HS_FILE_ERROR;
5132 }
5133
5134 return HS_SUCCESS;
5135}
5136
5138// SQLITE functions //
5140
5141static int ReadSqliteTableNames(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug)
5142{
5143 if (debug)
5144 printf("ReadSqliteTableNames: table [%s]\n", table_name);
5145
5146 int status;
5147 std::string cmd;
5148
5149 // FIXME: quotes
5150 cmd = "SELECT event_name, _i_time FROM \'_event_name_";
5151 cmd += table_name;
5152 cmd += "\' WHERE table_name='";
5153 cmd += table_name;
5154 cmd += "';";
5155
5156 status = sql->Prepare(table_name, cmd.c_str());
5157
5158 if (status != DB_SUCCESS)
5159 return status;
5160
5161 while (1) {
5162 status = sql->Step();
5163
5164 if (status != DB_SUCCESS)
5165 break;
5166
5167 std::string xevent_name = sql->GetText(0);
5168 time_t xevent_time = sql->GetTime(1);
5169
5170 //printf("read event name [%s] time %s\n", xevent_name.c_str(), TimeToString(xevent_time).c_str());
5171
5172 HsSqlSchema* s = new HsSqlSchema;
5173 s->fSql = sql;
5176 s->fTimeTo = 0;
5177 s->fTableName = table_name;
5178 sv->add(s);
5179 }
5180
5181 status = sql->Finalize();
5182
5183 return HS_SUCCESS;
5184}
5185
5186static int ReadSqliteTableSchema(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug)
5187{
5188 if (debug)
5189 printf("ReadSqliteTableSchema: table [%s]\n", table_name);
5190
5191 if (1) {
5192 // seed schema with table names
5193 HsSqlSchema* s = new HsSqlSchema;
5194 s->fSql = sql;
5195 s->fEventName = table_name;
5196 s->fTimeFrom = 0;
5197 s->fTimeTo = 0;
5198 s->fTableName = table_name;
5199 sv->add(s);
5200 }
5201
5202 return ReadSqliteTableNames(sql, sv, table_name, debug);
5203}
5204
5206// SQLITE history classes //
5208
5210{
5211public:
5212 SqliteHistory() { // ctor
5213#ifdef HAVE_SQLITE
5214 fSql = new Sqlite();
5215#endif
5216 }
5217
5219 int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name);
5220 int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp);
5221 int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction);
5222};
5223
5225{
5226 int status;
5227
5228 if (fDebug)
5229 printf("SqliteHistory::read_table_and_event_names!\n");
5230
5231 // loop over all tables
5232
5233 std::vector<std::string> tables;
5235 if (status != DB_SUCCESS)
5236 return status;
5237
5238 for (unsigned i=0; i<tables.size(); i++) {
5239 const char* table_name = tables[i].c_str();
5240
5241 const char* s;
5242 s = strstr(table_name, "_event_name_");
5243 if (s == table_name)
5244 continue;
5245 s = strstr(table_name, "_column_names_");
5246 if (s == table_name)
5247 continue;
5248
5249 status = ReadSqliteTableSchema(fSql, sv, table_name, fDebug);
5250 }
5251
5252 return HS_SUCCESS;
5253}
5254
5255int SqliteHistory::read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name)
5256{
5257 if (fDebug)
5258 printf("SqliteHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
5259
5260 // for all schema for table_name, prepopulate is with column names
5261
5262 std::vector<std::string> columns;
5263 fSql->ListColumns(table_name, &columns);
5264
5265 // first, populate column names
5266
5267 for (unsigned i=0; i<sv->size(); i++) {
5268 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5269
5270 if (s->fTableName != table_name)
5271 continue;
5272
5273 // schema should be empty at this point
5274 //assert(s->fVariables.size() == 0);
5275
5276 for (unsigned j=0; j<columns.size(); j+=2) {
5277 const char* cn = columns[j+0].c_str();
5278 const char* ct = columns[j+1].c_str();
5279
5280 if (strcmp(cn, "_t_time") == 0)
5281 continue;
5282 if (strcmp(cn, "_i_time") == 0)
5283 continue;
5284
5285 bool found = false;
5286
5287 for (unsigned k=0; k<s->fColumnNames.size(); k++) {
5288 if (s->fColumnNames[k] == cn) {
5289 found = true;
5290 break;
5291 }
5292 }
5293
5294 //printf("column [%s] sql type [%s]\n", cn.c_str(), ct);
5295
5296 if (!found) {
5298 se.name = cn;
5299 se.type = 0;
5300 se.n_data = 1;
5301 se.n_bytes = 0;
5302 s->fVariables.push_back(se);
5303 s->fColumnNames.push_back(cn);
5304 s->fColumnTypes.push_back(ct);
5305 s->fColumnInactive.push_back(false);
5306 s->fOffsets.push_back(-1);
5307 }
5308 }
5309 }
5310
5311 // then read column name information
5312
5313 std::string tn;
5314 tn += "_column_names_";
5315 tn += table_name;
5316
5317 std::string cmd;
5318 cmd = "SELECT column_name, tag_name, tag_type, _i_time FROM ";
5319 cmd += fSql->QuoteId(tn.c_str());
5320 cmd += " WHERE table_name=";
5321 cmd += fSql->QuoteString(table_name);
5322 cmd += " ORDER BY _i_time ASC;";
5323
5324 int status = fSql->Prepare(table_name, cmd.c_str());
5325
5326 if (status != DB_SUCCESS) {
5327 return status;
5328 }
5329
5330 while (1) {
5331 status = fSql->Step();
5332
5333 if (status != DB_SUCCESS)
5334 break;
5335
5336 // NOTE: SQL "SELECT ORDER BY _i_time ASC" returns data sorted by time
5337 // in this code we use the data from the last data row
5338 // so if multiple rows are present, the latest one is used
5339
5340 std::string col_name = fSql->GetText(0);
5341 std::string tag_name = fSql->GetText(1);
5342 std::string tag_type = fSql->GetText(2);
5344
5345 //printf("read table [%s] column [%s] tag name [%s] time %s\n", table_name, col_name.c_str(), tag_name.c_str(), TimeToString(xxx_time).c_str());
5346
5347 // make sure a schema exists at this time point
5348 NewSqlSchema(sv, table_name, schema_time);
5349
5350 // add this information to all schema
5351
5352 for (unsigned i=0; i<sv->size(); i++) {
5353 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5354 if (s->fTableName != table_name)
5355 continue;
5356 if (s->fTimeFrom < schema_time)
5357 continue;
5358
5359 //printf("add column to schema %d\n", s->fTimeFrom);
5360
5361 for (unsigned j=0; j<s->fColumnNames.size(); j++) {
5362 if (col_name != s->fColumnNames[j])
5363 continue;
5364 s->fVariables[j].name = tag_name;
5365 s->fVariables[j].type = rpc_name_tid(tag_type.c_str());
5366 s->fVariables[j].n_data = 1;
5367 s->fVariables[j].n_bytes = rpc_tid_size(s->fVariables[j].type);
5368 }
5369 }
5370 }
5371
5372 status = fSql->Finalize();
5373
5374 return HS_SUCCESS;
5375}
5376
5377int SqliteHistory::create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp)
5378{
5379 if (fDebug)
5380 printf("SqliteHistory::create_table: event [%s], timestamp %s\n", event_name, TimeToString(timestamp).c_str());
5381
5382 int status;
5383 bool have_transaction = false;
5384 std::string table_name = MidasNameToSqlName(event_name);
5385
5386 // FIXME: what about duplicate table names?
5387 status = CreateSqlTable(fSql, table_name.c_str(), &have_transaction);
5388
5389 //if (status == DB_KEY_EXIST) {
5390 // return ReadSqliteTableSchema(fSql, sv, table_name.c_str(), fDebug);
5391 //}
5392
5393 if (status != HS_SUCCESS) {
5394 // FIXME: ???
5395 // FIXME: at least close or revert the transaction
5396 return status;
5397 }
5398
5399 std::string cmd;
5400
5401 std::string en;
5402 en += "_event_name_";
5403 en += table_name;
5404
5405 cmd = "CREATE TABLE ";
5406 cmd += fSql->QuoteId(en.c_str());
5407 cmd += " (table_name TEXT NOT NULL, event_name TEXT NOT NULL, _i_time INTEGER NOT NULL);";
5408
5409 status = fSql->Exec(table_name.c_str(), cmd.c_str());
5410
5411 cmd = "INSERT INTO ";
5412 cmd += fSql->QuoteId(en.c_str());
5413 cmd += " (table_name, event_name, _i_time) VALUES (";
5414 cmd += fSql->QuoteString(table_name.c_str());
5415 cmd += ", ";
5416 cmd += fSql->QuoteString(event_name);
5417 cmd += ", ";
5418 cmd += fSql->QuoteString(TimeToString(timestamp).c_str());
5419 cmd += ");";
5420
5421 status = fSql->Exec(table_name.c_str(), cmd.c_str());
5422
5423 std::string cn;
5424 cn += "_column_names_";
5425 cn += table_name;
5426
5427 cmd = "CREATE TABLE ";
5428 cmd += fSql->QuoteId(cn.c_str());
5429 cmd += " (table_name TEXT NOT NULL, column_name TEXT NOT NULL, tag_name TEXT NOT NULL, tag_type TEXT NOT NULL, column_type TEXT NOT NULL, _i_time INTEGER NOT NULL);";
5430
5431 status = fSql->Exec(table_name.c_str(), cmd.c_str());
5432
5433 status = fSql->CommitTransaction(table_name.c_str());
5434 if (status != DB_SUCCESS) {
5435 return HS_FILE_ERROR;
5436 }
5437
5438 return ReadSqliteTableSchema(fSql, sv, table_name.c_str(), fDebug);
5439}
5440
5441int SqliteHistory::update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction)
5442{
5443 if (fDebug)
5444 printf("SqliteHistory::update_column: event [%s], table [%s], column [%s], new name [%s], timestamp %s\n", event_name, table_name, column_name, tag_name, TimeToString(timestamp).c_str());
5445
5446 int status = StartSqlTransaction(fSql, table_name, have_transaction);
5447 if (status != HS_SUCCESS)
5448 return status;
5449
5450 // FIXME: quotes
5451 std::string cmd;
5452 cmd = "INSERT INTO \'_column_names_";
5453 cmd += table_name;
5454 cmd += "\' (table_name, column_name, tag_name, tag_type, column_type, _i_time) VALUES (\'";
5455 cmd += table_name;
5456 cmd += "\', \'";
5457 cmd += column_name;
5458 cmd += "\', \'";
5459 cmd += tag_name;
5460 cmd += "\', \'";
5461 cmd += tag_type;
5462 cmd += "\', \'";
5463 cmd += column_type;
5464 cmd += "\', \'";
5465 cmd += TimeToString(timestamp);
5466 cmd += "\');";
5467 status = fSql->Exec(table_name, cmd.c_str());
5468
5469 return status;
5470}
5471
5473// Mysql history classes //
5475
5477{
5478public:
5479 MysqlHistory() { // ctor
5480#ifdef HAVE_MYSQL
5481 fSql = new Mysql();
5482#endif
5483 }
5484
5486 int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name);
5487 int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp);
5488 int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction);
5489};
5490
5491static int ReadMysqlTableNames(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug, const char* must_have_event_name, const char* must_have_table_name)
5492{
5493 if (debug)
5494 printf("ReadMysqlTableNames: table [%s], must have event [%s] table [%s]\n", table_name, must_have_event_name, must_have_table_name);
5495
5496 int status;
5497 std::string cmd;
5498
5499 if (table_name) {
5500 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name='";
5501 cmd += table_name;
5502 cmd += "';";
5503 } else {
5504 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name!='';";
5505 table_name = "_history_index";
5506 }
5507
5508 status = sql->Prepare(table_name, cmd.c_str());
5509
5510 if (status != DB_SUCCESS)
5511 return status;
5512
5513 bool found_must_have_table = false;
5514 int count = 0;
5515
5516 while (1) {
5517 status = sql->Step();
5518
5519 if (status != DB_SUCCESS)
5520 break;
5521
5522 const char* xevent_name = sql->GetText(0);
5523 const char* xtable_name = sql->GetText(1);
5524 time_t xevent_time = sql->GetTime(2);
5525
5526 if (debug == 999) {
5527 printf("entry %d event name [%s] table name [%s] time %s\n", count, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
5528 }
5529
5531 assert(must_have_event_name != NULL);
5533 found_must_have_table = true;
5534 //printf("Found table [%s]: event name [%s] table name [%s] time %s\n", must_have_table_name, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
5535 } else {
5536 //printf("Found correct table [%s] with wrong event name [%s] expected [%s] time %s\n", must_have_table_name, xevent_name, must_have_event_name, TimeToString(xevent_time).c_str());
5537 }
5538 }
5539
5540 HsSqlSchema* s = new HsSqlSchema;
5541 s->fSql = sql;
5544 s->fTimeTo = 0;
5546 sv->add(s);
5547 count++;
5548 }
5549
5550 status = sql->Finalize();
5551
5553 cm_msg(MERROR, "ReadMysqlTableNames", "Error: Table [%s] for event [%s] missing from the history index\n", must_have_table_name, must_have_event_name);
5554 if (debug == 999)
5555 return HS_FILE_ERROR;
5556 // NB: recursion is broken by setting debug to 999.
5558 cm_msg(MERROR, "ReadMysqlTableNames", "Error: Cannot continue, nothing will work after this error\n");
5560 abort();
5561 return HS_FILE_ERROR;
5562 }
5563
5564 if (0) {
5565 // print accumulated schema
5566 printf("ReadMysqlTableNames: table_name [%s] event_name [%s] table_name [%s]\n", table_name, must_have_event_name, must_have_table_name);
5567 sv->print(false);
5568 }
5569
5570 return HS_SUCCESS;
5571}
5572
5573int MysqlHistory::read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name)
5574{
5575 if (fDebug)
5576 printf("MysqlHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
5577
5578 // for all schema for table_name, prepopulate is with column names
5579
5580 std::vector<std::string> columns;
5581 fSql->ListColumns(table_name, &columns);
5582
5583 // first, populate column names
5584
5585 for (unsigned i=0; i<sv->size(); i++) {
5586 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5587
5588 if (s->fTableName != table_name)
5589 continue;
5590
5591 // schema should be empty at this point
5592 //assert(s->fVariables.size() == 0);
5593
5594 for (unsigned j=0; j<columns.size(); j+=2) {
5595 const char* cn = columns[j+0].c_str();
5596 const char* ct = columns[j+1].c_str();
5597
5598 if (strcmp(cn, "_t_time") == 0)
5599 continue;
5600 if (strcmp(cn, "_i_time") == 0)
5601 continue;
5602
5603 bool found = false;
5604
5605 for (unsigned k=0; k<s->fColumnNames.size(); k++) {
5606 if (s->fColumnNames[k] == cn) {
5607 found = true;
5608 break;
5609 }
5610 }
5611
5612 //printf("column [%s] sql type [%s]\n", cn.c_str(), ct);
5613
5614 if (!found) {
5616 se.tag_name = cn;
5617 se.tag_type = "";
5618 se.name = cn;
5619 se.type = 0;
5620 se.n_data = 1;
5621 se.n_bytes = 0;
5622 s->fVariables.push_back(se);
5623 s->fColumnNames.push_back(cn);
5624 s->fColumnTypes.push_back(ct);
5625 s->fColumnInactive.push_back(false);
5626 s->fOffsets.push_back(-1);
5627 }
5628 }
5629 }
5630
5631 // then read column name information
5632
5633 std::string cmd;
5634 cmd = "SELECT column_name, column_type, tag_name, tag_type, itimestamp, active FROM _history_index WHERE event_name=";
5635 cmd += fSql->QuoteString(event_name);
5636 cmd += ";";
5637
5638 int status = fSql->Prepare(table_name, cmd.c_str());
5639
5640 if (status != DB_SUCCESS) {
5641 return status;
5642 }
5643
5644 while (1) {
5645 status = fSql->Step();
5646
5647 if (status != DB_SUCCESS)
5648 break;
5649
5650 const char* col_name = fSql->GetText(0);
5651 const char* col_type = fSql->GetText(1);
5652 const char* tag_name = fSql->GetText(2);
5653 const char* tag_type = fSql->GetText(3);
5655 const char* active = fSql->GetText(5);
5656 int iactive = atoi(active);
5657
5658 //printf("read table [%s] column [%s] type [%s] tag name [%s] type [%s] time %s active [%s] %d\n", table_name, col_name, col_type, tag_name, tag_type, TimeToString(schema_time).c_str(), active, iactive);
5659
5660 if (!col_name)
5661 continue;
5662 if (!tag_name)
5663 continue;
5664 if (strlen(col_name) < 1)
5665 continue;
5666
5667 // make sure a schema exists at this time point
5668 NewSqlSchema(sv, table_name, schema_time);
5669
5670 // add this information to all schema
5671
5672 for (unsigned i=0; i<sv->size(); i++) {
5673 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5674 if (s->fTableName != table_name)
5675 continue;
5676 if (s->fTimeFrom < schema_time)
5677 continue;
5678
5679 int tid = rpc_name_tid(tag_type);
5680 int tid_size = rpc_tid_size(tid);
5681
5682 for (unsigned j=0; j<s->fColumnNames.size(); j++) {
5683 if (col_name != s->fColumnNames[j])
5684 continue;
5685
5686 s->fVariables[j].tag_name = tag_name;
5687 s->fVariables[j].tag_type = tag_type;
5688 if (!iactive) {
5689 s->fVariables[j].name = "";
5690 s->fColumnInactive[j] = true;
5691 } else {
5692 s->fVariables[j].name = tag_name;
5693 s->fColumnInactive[j] = false;
5694 }
5695 s->fVariables[j].type = tid;
5696 s->fVariables[j].n_data = 1;
5697 s->fVariables[j].n_bytes = tid_size;
5698
5699 // doctor column names in case MySQL returns different type
5700 // from the type used to create the column, but the types
5701 // are actually the same. K.O.
5703 }
5704 }
5705 }
5706
5707 status = fSql->Finalize();
5708
5709 return HS_SUCCESS;
5710}
5711
5712#if 0
5713static int ReadMysqlTableSchema(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug)
5714{
5715 if (debug)
5716 printf("ReadMysqlTableSchema: table [%s]\n", table_name);
5717
5718 if (1) {
5719 // seed schema with table names
5720 HsSqlSchema* s = new HsSqlSchema;
5721 s->fSql = sql;
5722 s->fEventName = table_name;
5723 s->fTimeFrom = 0;
5724 s->fTimeTo = 0;
5725 s->fTableName = table_name;
5726 sv->add(s);
5727 }
5728
5729 return ReadMysqlTableNames(sql, sv, table_name, debug, NULL, NULL);
5730}
5731#endif
5732
5734{
5735 int status;
5736
5737 if (fDebug)
5738 printf("MysqlHistory::read_table_and_event_names!\n");
5739
5740 // loop over all tables
5741
5742 std::vector<std::string> tables;
5744 if (status != DB_SUCCESS)
5745 return status;
5746
5747 for (unsigned i=0; i<tables.size(); i++) {
5748 const char* table_name = tables[i].c_str();
5749
5750 const char* s;
5751 s = strstr(table_name, "_history_index");
5752 if (s == table_name)
5753 continue;
5754
5755 if (1) {
5756 // seed schema with table names
5757 HsSqlSchema* s = new HsSqlSchema;
5758 s->fSql = fSql;
5759 s->fEventName = table_name;
5760 s->fTimeFrom = 0;
5761 s->fTimeTo = 0;
5762 s->fTableName = table_name;
5763 sv->add(s);
5764 }
5765 }
5766
5767 if (0) {
5768 // print accumulated schema
5769 printf("read_table_and_event_names:\n");
5770 sv->print(false);
5771 }
5772
5774
5775 return HS_SUCCESS;
5776}
5777
5778int MysqlHistory::create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp)
5779{
5780 if (fDebug)
5781 printf("MysqlHistory::create_table: event [%s], timestamp %s\n", event_name, TimeToString(timestamp).c_str());
5782
5783 int status;
5784 std::string table_name = MidasNameToSqlName(event_name);
5785
5786 // MySQL table name length limit is 64 bytes
5787 if (table_name.length() > 40) {
5788 table_name.resize(40);
5789 table_name += "_T";
5790 }
5791
5792 time_t now = time(NULL);
5793
5794 int max_attempts = 10;
5795 for (int i=0; i<max_attempts; i++) {
5796 status = fSql->OpenTransaction(table_name.c_str());
5797 if (status != DB_SUCCESS) {
5798 return HS_FILE_ERROR;
5799 }
5800
5801 bool have_transaction = true;
5802
5803 std::string xtable_name = table_name;
5804
5805 if (i>0) {
5806 xtable_name += "_";
5808 if (i>1) {
5809 xtable_name += "_";
5810 char buf[256];
5811 sprintf(buf, "%d", i);
5812 xtable_name += buf;
5813 }
5814 }
5815
5817
5818 //printf("event [%s] create table [%s] status %d\n", event_name, xtable_name.c_str(), status);
5819
5820 if (status == DB_KEY_EXIST) {
5821 // already exists, try with different name!
5822 fSql->RollbackTransaction(table_name.c_str());
5823 continue;
5824 }
5825
5826 if (status != HS_SUCCESS) {
5827 // MYSQL cannot roll back "create table", if we cannot create SQL tables, nothing will work. Give up now.
5828 cm_msg(MERROR, "MysqlHistory::create_table", "Could not create table [%s] for event [%s], timestamp %s, please fix the SQL database configuration and try again", table_name.c_str(), event_name, TimeToString(timestamp).c_str());
5829 abort();
5830
5831 // fatal error, give up!
5832 fSql->RollbackTransaction(table_name.c_str());
5833 break;
5834 }
5835
5836 for (int j=0; j<2; j++) {
5837 std::string cmd;
5838 cmd += "INSERT INTO _history_index (event_name, table_name, itimestamp, active) VALUES (";
5839 cmd += fSql->QuoteString(event_name);
5840 cmd += ", ";
5841 cmd += fSql->QuoteString(xtable_name.c_str());
5842 cmd += ", ";
5843 char buf[256];
5844 sprintf(buf, "%.0f", (double)timestamp);
5845 cmd += fSql->QuoteString(buf);
5846 cmd += ", ";
5847 cmd += fSql->QuoteString("1");
5848 cmd += ");";
5849
5850 int status = fSql->Exec(table_name.c_str(), cmd.c_str());
5851 if (status == DB_SUCCESS)
5852 break;
5853
5854 status = CreateSqlTable(fSql, "_history_index", &have_transaction);
5855 status = CreateSqlColumn(fSql, "_history_index", "event_name", "varchar(256) character set binary not null", &have_transaction, fDebug);
5856 status = CreateSqlColumn(fSql, "_history_index", "table_name", "varchar(256)", &have_transaction, fDebug);
5857 status = CreateSqlColumn(fSql, "_history_index", "tag_name", "varchar(256) character set binary", &have_transaction, fDebug);
5858 status = CreateSqlColumn(fSql, "_history_index", "tag_type", "varchar(256)", &have_transaction, fDebug);
5859 status = CreateSqlColumn(fSql, "_history_index", "column_name", "varchar(256)", &have_transaction, fDebug);
5860 status = CreateSqlColumn(fSql, "_history_index", "column_type", "varchar(256)", &have_transaction, fDebug);
5861 status = CreateSqlColumn(fSql, "_history_index", "itimestamp", "integer not null", &have_transaction, fDebug);
5862 status = CreateSqlColumn(fSql, "_history_index", "active", "boolean", &have_transaction, fDebug);
5863 }
5864
5865 status = fSql->CommitTransaction(table_name.c_str());
5866
5867 if (status != DB_SUCCESS) {
5868 return HS_FILE_ERROR;
5869 }
5870
5871 return ReadMysqlTableNames(fSql, sv, xtable_name.c_str(), fDebug, event_name, xtable_name.c_str());
5872 }
5873
5874 cm_msg(MERROR, "MysqlHistory::create_table", "Could not create table [%s] for event [%s], timestamp %s, after %d attempts", table_name.c_str(), event_name, TimeToString(timestamp).c_str(), max_attempts);
5875
5876 return HS_FILE_ERROR;
5877}
5878
5879int MysqlHistory::update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction)
5880{
5881 if (fDebug)
5882 printf("MysqlHistory::update_column: event [%s], table [%s], column [%s], type [%s] new name [%s], timestamp %s\n", event_name, table_name, column_name, column_type, tag_name, TimeToString(timestamp).c_str());
5883
5884 std::string cmd;
5885 cmd += "INSERT INTO _history_index (event_name, table_name, tag_name, tag_type, column_name, column_type, itimestamp, active) VALUES (";
5886 cmd += fSql->QuoteString(event_name);
5887 cmd += ", ";
5888 cmd += fSql->QuoteString(table_name);
5889 cmd += ", ";
5890 cmd += fSql->QuoteString(tag_name);
5891 cmd += ", ";
5892 cmd += fSql->QuoteString(tag_type);
5893 cmd += ", ";
5894 cmd += fSql->QuoteString(column_name);
5895 cmd += ", ";
5896 cmd += fSql->QuoteString(column_type);
5897 cmd += ", ";
5898 char buf[256];
5899 sprintf(buf, "%.0f", (double)timestamp);
5900 cmd += fSql->QuoteString(buf);
5901 cmd += ", ";
5902 if (active)
5903 cmd += fSql->QuoteString("1");
5904 else
5905 cmd += fSql->QuoteString("0");
5906 cmd += ");";
5907
5908 int status = fSql->Exec(table_name, cmd.c_str());
5909 if (status != DB_SUCCESS)
5910 return HS_FILE_ERROR;
5911
5912 return HS_SUCCESS;
5913}
5914
5916// PostgreSQL history classes //
5918
5919#ifdef HAVE_PGSQL
5920
5921class PgsqlHistory: public SqlHistoryBase
5922{
5923public:
5924 Pgsql *fPgsql = NULL;
5925public:
5926 PgsqlHistory() { // ctor
5927 fPgsql = new Pgsql();
5928 fSql = fPgsql;
5929 }
5930
5932 int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name);
5933 int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp);
5934 int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction);
5935};
5936
5937static int ReadPgsqlTableNames(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug, const char* must_have_event_name, const char* must_have_table_name)
5938{
5939 if (debug)
5940 printf("ReadPgsqlTableNames: table [%s], must have event [%s] table [%s]\n", table_name, must_have_event_name, must_have_table_name);
5941
5942 int status;
5943 std::string cmd;
5944
5945 if (table_name) {
5946 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name='";
5947 cmd += table_name;
5948 cmd += "';";
5949 } else {
5950 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name!='';";
5951 table_name = "_history_index";
5952 }
5953
5954 status = sql->Prepare(table_name, cmd.c_str());
5955
5956 if (status != DB_SUCCESS)
5957 return status;
5958
5959 bool found_must_have_table = false;
5960 int count = 0;
5961
5962 while (1) {
5963 status = sql->Step();
5964
5965 if (status != DB_SUCCESS)
5966 break;
5967
5968 const char* xevent_name = sql->GetText(0);
5969 const char* xtable_name = sql->GetText(1);
5970 time_t xevent_time = sql->GetTime(2);
5971
5972 if (debug == 999) {
5973 printf("entry %d event name [%s] table name [%s] time %s\n", count, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
5974 }
5975
5977 assert(must_have_event_name != NULL);
5979 found_must_have_table = true;
5980 //printf("Found table [%s]: event name [%s] table name [%s] time %s\n", must_have_table_name, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
5981 } else {
5982 //printf("Found correct table [%s] with wrong event name [%s] expected [%s] time %s\n", must_have_table_name, xevent_name, must_have_event_name, TimeToString(xevent_time).c_str());
5983 }
5984 }
5985
5986 HsSqlSchema* s = new HsSqlSchema;
5987 s->fSql = sql;
5990 s->fTimeTo = 0;
5992 sv->add(s);
5993 count++;
5994 }
5995
5996 status = sql->Finalize();
5997
5999 cm_msg(MERROR, "ReadPgsqlTableNames", "Error: Table [%s] for event [%s] missing from the history index\n", must_have_table_name, must_have_event_name);
6000 if (debug == 999)
6001 return HS_FILE_ERROR;
6002 // NB: recursion is broken by setting debug to 999.
6004 cm_msg(MERROR, "ReadPgsqlTableNames", "Error: Cannot continue, nothing will work after this error\n");
6006 abort();
6007 return HS_FILE_ERROR;
6008 }
6009
6010 if (0) {
6011 // print accumulated schema
6012 printf("ReadPgsqlTableNames: table_name [%s] event_name [%s] table_name [%s]\n", table_name, must_have_event_name, must_have_table_name);
6013 sv->print(false);
6014 }
6015
6016 return HS_SUCCESS;
6017}
6018
6019int PgsqlHistory::read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name)
6020{
6021 if (fDebug)
6022 printf("PgsqlHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
6023
6024 // for all schema for table_name, prepopulate is with column names
6025
6026 std::vector<std::string> columns;
6027 fSql->ListColumns(table_name, &columns);
6028
6029 // first, populate column names
6030
6031 for (unsigned i=0; i<sv->size(); i++) {
6032 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
6033
6034 if (s->fTableName != table_name)
6035 continue;
6036
6037 // schema should be empty at this point
6038 //assert(s->fVariables.size() == 0);
6039
6040 for (unsigned j=0; j<columns.size(); j+=2) {
6041 const char* cn = columns[j+0].c_str();
6042 const char* ct = columns[j+1].c_str();
6043
6044 if (strcmp(cn, "_t_time") == 0)
6045 continue;
6046 if (strcmp(cn, "_i_time") == 0)
6047 continue;
6048
6049 bool found = false;
6050
6051 for (unsigned k=0; k<s->fColumnNames.size(); k++) {
6052 if (s->fColumnNames[k] == cn) {
6053 found = true;
6054 break;
6055 }
6056 }
6057
6058 if (!found) {
6060 se.tag_name = cn;
6061 se.tag_type = "";
6062 se.name = cn;
6063 se.type = 0;
6064 se.n_data = 1;
6065 se.n_bytes = 0;
6066 s->fVariables.push_back(se);
6067 s->fColumnNames.push_back(cn);
6068 s->fColumnTypes.push_back(ct);
6069 s->fColumnInactive.push_back(false);
6070 s->fOffsets.push_back(-1);
6071 }
6072 }
6073 }
6074
6075 // then read column name information
6076
6077 std::string cmd;
6078 cmd = "SELECT column_name, column_type, tag_name, tag_type, itimestamp, active FROM _history_index WHERE event_name=";
6079 cmd += fSql->QuoteString(event_name);
6080 cmd += ";";
6081
6082 int status = fSql->Prepare(table_name, cmd.c_str());
6083
6084 if (status != DB_SUCCESS) {
6085 return status;
6086 }
6087
6088 while (1) {
6089 status = fSql->Step();
6090
6091 if (status != DB_SUCCESS)
6092 break;
6093
6094 const char* col_name = fSql->GetText(0);
6095 const char* col_type = fSql->GetText(1);
6096 const char* tag_name = fSql->GetText(2);
6097 const char* tag_type = fSql->GetText(3);
6098 time_t schema_time = fSql->GetTime(4);
6099 const char* active = fSql->GetText(5);
6100 int iactive = atoi(active);
6101
6102 //printf("read table [%s] column [%s] type [%s] tag name [%s] type [%s] time %s active [%s] %d\n", table_name, col_name, col_type, tag_name, tag_type, TimeToString(schema_time).c_str(), active, iactive);
6103
6104 if (!col_name)
6105 continue;
6106 if (!tag_name)
6107 continue;
6108 if (strlen(col_name) < 1)
6109 continue;
6110
6111 // make sure a schema exists at this time point
6112 NewSqlSchema(sv, table_name, schema_time);
6113
6114 // add this information to all schema
6115 for (unsigned i=0; i<sv->size(); i++) {
6116 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
6117 if (s->fTableName != table_name)
6118 continue;
6119 if (s->fTimeFrom < schema_time)
6120 continue;
6121
6122 int tid = rpc_name_tid(tag_type);
6123 int tid_size = rpc_tid_size(tid);
6124
6125 for (unsigned j=0; j<s->fColumnNames.size(); j++) {
6126 if (col_name != s->fColumnNames[j])
6127 continue;
6128
6129 s->fVariables[j].tag_name = tag_name;
6130 s->fVariables[j].tag_type = tag_type;
6131 if (!iactive) {
6132 s->fVariables[j].name = "";
6133 s->fColumnInactive[j] = true;
6134 } else {
6135 s->fVariables[j].name = tag_name;
6136 s->fColumnInactive[j] = false;
6137 }
6138 s->fVariables[j].type = tid;
6139 s->fVariables[j].n_data = 1;
6140 s->fVariables[j].n_bytes = tid_size;
6141
6142 // doctor column names in case MySQL returns different type
6143 // from the type used to create the column, but the types
6144 // are actually the same. K.O.
6146 }
6147 }
6148 }
6149
6150 status = fSql->Finalize();
6151
6152 return HS_SUCCESS;
6153}
6154
6155int PgsqlHistory::read_table_and_event_names(HsSchemaVector *sv)
6156{
6157 int status;
6158
6159 if (fDebug)
6160 printf("PgsqlHistory::read_table_and_event_names!\n");
6161
6162 // loop over all tables
6163
6164 std::vector<std::string> tables;
6165 status = fSql->ListTables(&tables);
6166 if (status != DB_SUCCESS)
6167 return status;
6168
6169 for (unsigned i=0; i<tables.size(); i++) {
6170 const char* table_name = tables[i].c_str();
6171
6172 const char* s;
6173 s = strstr(table_name, "_history_index");
6174 if (s == table_name)
6175 continue;
6176
6177 if (1) {
6178 // seed schema with table names
6179 HsSqlSchema* s = new HsSqlSchema;
6180 s->fSql = fSql;
6181 s->fEventName = table_name;
6182 s->fTimeFrom = 0;
6183 s->fTimeTo = 0;
6184 s->fTableName = table_name;
6185 sv->add(s);
6186 }
6187 }
6188
6189 if (0) {
6190 // print accumulated schema
6191 printf("read_table_and_event_names:\n");
6192 sv->print(false);
6193 }
6194
6195 status = ReadPgsqlTableNames(fSql, sv, NULL, fDebug, NULL, NULL);
6196
6197 return HS_SUCCESS;
6198}
6199
6200int PgsqlHistory::create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp)
6201{
6202 if (fDebug)
6203 printf("PgsqlHistory::create_table: event [%s], timestamp %s\n", event_name, TimeToString(timestamp).c_str());
6204
6205 int status;
6206 std::string table_name = MidasNameToSqlName(event_name);
6207
6208 // PostgreSQL table name length limit is 64 bytes
6209 if (table_name.length() > 40) {
6210 table_name.resize(40);
6211 table_name += "_T";
6212 }
6213
6214 time_t now = time(NULL);
6215
6216 int max_attempts = 10;
6217 for (int i=0; i<max_attempts; i++) {
6218 status = fSql->OpenTransaction(table_name.c_str());
6219 if (status != DB_SUCCESS) {
6220 return HS_FILE_ERROR;
6221 }
6222
6223 bool have_transaction = true;
6224
6225 std::string xtable_name = table_name;
6226
6227 if (i>0) {
6228 xtable_name += "_";
6230 if (i>1) {
6231 xtable_name += "_";
6232 char buf[256];
6233 sprintf(buf, "%d", i);
6234 xtable_name += buf;
6235 }
6236 }
6237
6238 if (fPgsql->fDownsample)
6240 else
6242
6243 //printf("event [%s] create table [%s] status %d\n", event_name, xtable_name.c_str(), status);
6244
6245 if (status == DB_KEY_EXIST) {
6246 // already exists, try with different name!
6247 fSql->RollbackTransaction(table_name.c_str());
6248 continue;
6249 }
6250
6251 if (status != HS_SUCCESS) {
6252 fSql->RollbackTransaction(table_name.c_str());
6253 continue;
6254 }
6255
6256 fSql->Exec(table_name.c_str(), "SAVEPOINT t0");
6257
6258 for (int j=0; j<2; j++) {
6259 std::string cmd;
6260 cmd += "INSERT INTO _history_index (event_name, table_name, itimestamp, active) VALUES (";
6261 cmd += fSql->QuoteString(event_name);
6262 cmd += ", ";
6263 cmd += fSql->QuoteString(xtable_name.c_str());
6264 cmd += ", ";
6265 char buf[256];
6266 sprintf(buf, "%.0f", (double)timestamp);
6267 cmd += buf;
6268 cmd += ", ";
6269 cmd += fSql->QuoteString("1");
6270 cmd += ");";
6271
6272 int status = fSql->Exec(table_name.c_str(), cmd.c_str());
6273 if (status == DB_SUCCESS)
6274 break;
6275
6276 // if INSERT failed _history_index does not exist then recover to savepoint t0
6277 // to prevent whole transition abort
6278 fSql->Exec(table_name.c_str(), "ROLLBACK TO SAVEPOINT t0");
6279
6280 status = CreateSqlTable(fSql, "_history_index", &have_transaction, true);
6281 status = CreateSqlColumn(fSql, "_history_index", "event_name", "text not null", &have_transaction, fDebug);
6282 status = CreateSqlColumn(fSql, "_history_index", "table_name", "text", &have_transaction, fDebug);
6283 status = CreateSqlColumn(fSql, "_history_index", "tag_name", "text", &have_transaction, fDebug);
6284 status = CreateSqlColumn(fSql, "_history_index", "tag_type", "text", &have_transaction, fDebug);
6285 status = CreateSqlColumn(fSql, "_history_index", "column_name", "text", &have_transaction, fDebug);
6286 status = CreateSqlColumn(fSql, "_history_index", "column_type", "text", &have_transaction, fDebug);
6287 status = CreateSqlColumn(fSql, "_history_index", "itimestamp", "integer not null", &have_transaction, fDebug);
6288 status = CreateSqlColumn(fSql, "_history_index", "active", "smallint", &have_transaction, fDebug);
6289
6290 status = fSql->CommitTransaction(table_name.c_str());
6291 }
6292
6293 if (status != DB_SUCCESS) {
6294 return HS_FILE_ERROR;
6295 }
6296
6297 return ReadPgsqlTableNames(fSql, sv, xtable_name.c_str(), fDebug, event_name, xtable_name.c_str());
6298 }
6299
6300 cm_msg(MERROR, "PgsqlHistory::create_table", "Could not create table [%s] for event [%s], timestamp %s, after %d attempts", table_name.c_str(), event_name, TimeToString(timestamp).c_str(), max_attempts);
6301
6302 return HS_FILE_ERROR;
6303}
6304
6305int PgsqlHistory::update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction)
6306{
6307 if (fDebug)
6308 printf("PgsqlHistory::update_column: event [%s], table [%s], column [%s], type [%s] new name [%s], timestamp %s\n", event_name, table_name, column_name, column_type, tag_name, TimeToString(timestamp).c_str());
6309
6310 std::string cmd;
6311 cmd += "INSERT INTO _history_index (event_name, table_name, tag_name, tag_type, column_name, column_type, itimestamp, active) VALUES (";
6312 cmd += fSql->QuoteString(event_name);
6313 cmd += ", ";
6314 cmd += fSql->QuoteString(table_name);
6315 cmd += ", ";
6316 cmd += fSql->QuoteString(tag_name);
6317 cmd += ", ";
6318 cmd += fSql->QuoteString(tag_type);
6319 cmd += ", ";
6320 cmd += fSql->QuoteString(column_name);
6321 cmd += ", ";
6322 cmd += fSql->QuoteString(column_type);
6323 cmd += ", ";
6324 char buf[256];
6325 sprintf(buf, "%.0f", (double)timestamp);
6326 cmd += buf;
6327 cmd += ", ";
6328 if (active)
6329 cmd += fSql->QuoteString("1");
6330 else
6331 cmd += fSql->QuoteString("0");
6332 cmd += ");";
6333
6334 int status = fSql->Exec(table_name, cmd.c_str());
6335 if (status != DB_SUCCESS)
6336 return HS_FILE_ERROR;
6337
6338 return HS_SUCCESS;
6339}
6340
6341#endif // HAVE_PGSQL
6342
6344// File history class //
6346
6347const time_t kDay = 24*60*60;
6348const time_t kMonth = 30*kDay;
6349
6350const double KiB = 1024;
6351const double MiB = KiB*KiB;
6352//const double GiB = KiB*MiB;
6353
6355{
6356protected:
6357 std::string fPath;
6359 std::vector<std::string> fSortedFiles;
6360 std::vector<bool> fSortedRead;
6363
6364public:
6365 FileHistory() // ctor
6366 {
6368 fConfMaxFileSize = 100*MiB;
6369
6370 fPathLastMtime = 0;
6371 }
6372
6373 int hs_connect(const char* connect_string);
6374 int hs_disconnect();
6375 int hs_clear_cache();
6376 int read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp);
6377 HsSchema* new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]);
6378
6379protected:
6380 int create_file(const char* event_name, time_t timestamp, int ntags, const TAG tags[], std::string* filenamep);
6381 HsFileSchema* read_file_schema(const char* filename);
6382 int read_file_list(bool *pchanged);
6383 void clear_file_list();
6384};
6385
6387{
6388 if (fDebug)
6389 printf("hs_connect [%s]!\n", connect_string);
6390
6391 hs_disconnect();
6392
6395
6396 // add trailing '/'
6397 if (fPath.length() > 0) {
6398 if (fPath[fPath.length()-1] != DIR_SEPARATOR)
6400 }
6401
6402 return HS_SUCCESS;
6403}
6404
6406{
6407 if (fDebug)
6408 printf("FileHistory::hs_clear_cache!\n");
6409 fPathLastMtime = 0;
6411}
6412
6414{
6415 if (fDebug)
6416 printf("FileHistory::hs_disconnect!\n");
6417
6420
6421 return HS_SUCCESS;
6422}
6423
6425{
6426 fPathLastMtime = 0;
6427 fSortedFiles.clear();
6428 fSortedRead.clear();
6429}
6430
6432{
6433 int status;
6434 double start_time = ss_time_sec();
6435
6436 if (pchanged)
6437 *pchanged = false;
6438
6439 struct stat stat_buf;
6440 status = stat(fPath.c_str(), &stat_buf);
6441 if (status != 0) {
6442 cm_msg(MERROR, "FileHistory::read_file_list", "Cannot stat(%s), errno %d (%s)", fPath.c_str(), errno, strerror(errno));
6443 return HS_FILE_ERROR;
6444 }
6445
6446 //printf("dir [%s], mtime: %d %d last: %d %d, mtime %s", fPath.c_str(), stat_buf.st_mtimespec.tv_sec, stat_buf.st_mtimespec.tv_nsec, last_mtimespec.tv_sec, last_mtimespec.tv_nsec, ctime(&stat_buf.st_mtimespec.tv_sec));
6447
6448 if (stat_buf.st_mtime == fPathLastMtime) {
6449 if (fDebug)
6450 printf("FileHistory::read_file_list: history directory \"%s\" mtime %d did not change\n", fPath.c_str(), int(stat_buf.st_mtime));
6451 return HS_SUCCESS;
6452 }
6453
6454 fPathLastMtime = stat_buf.st_mtime;
6455
6456 if (fDebug)
6457 printf("FileHistory::read_file_list: reading list of history files in \"%s\"\n", fPath.c_str());
6458
6459 std::vector<std::string> flist;
6460
6461 ss_file_find(fPath.c_str(), "mhf_*.dat", &flist);
6462
6463 double ls_time = ss_time_sec();
6464 double ls_elapsed = ls_time - start_time;
6465 if (ls_elapsed > 5.000) {
6466 cm_msg(MINFO, "FileHistory::read_file_list", "\"ls -l\" of \"%s\" took %.1f sec", fPath.c_str(), ls_elapsed);
6468 }
6469
6470 // note: reverse iterator is used to sort filenames by time, newest first
6471 std::sort(flist.rbegin(), flist.rend());
6472
6473#if 0
6474 {
6475 printf("file names sorted by time:\n");
6476 for (unsigned i=0; i<flist.size(); i++) {
6477 printf("%d: %s\n", i, flist[i].c_str());
6478 }
6479 }
6480#endif
6481
6482 std::vector<bool> fread;
6483 fread.resize(flist.size()); // fill with "false"
6484
6485 // loop over the old list of files,
6486 // for files we already read, loop over new file
6487 // list and mark the same file as read. K.O.
6488 for (size_t j=0; j<fSortedFiles.size(); j++) {
6489 if (fSortedRead[j]) {
6490 for (size_t i=0; i<flist.size(); i++) {
6491 if (flist[i] == fSortedFiles[j]) {
6492 fread[i] = true;
6493 break;
6494 }
6495 }
6496 }
6497 }
6498
6501
6502 if (pchanged)
6503 *pchanged = true;
6504
6505 return HS_SUCCESS;
6506}
6507
6508int FileHistory::read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp)
6509{
6510 if (fDebug)
6511 printf("FileHistory::read_schema: event [%s] at time %s\n", event_name, TimeToString(timestamp).c_str());
6512
6513 if (fSchema.size() == 0) {
6514 if (fDebug)
6515 printf("FileHistory::read_schema: schema is empty, do a full reload from disk\n");
6517 }
6518
6520 DWORD old_timeout = 0;
6523
6524 bool changed = false;
6525
6527
6528 if (status != HS_SUCCESS) {
6530 return status;
6531 }
6532
6533 if (!changed) {
6534 if ((*sv).find_event(event_name, timestamp)) {
6535 if (fDebug)
6536 printf("FileHistory::read_schema: event [%s] at time %s, no new history files, already have this schema\n", event_name, TimeToString(timestamp).c_str());
6538 return HS_SUCCESS;
6539 }
6540 }
6541
6542 double start_time = ss_time_sec();
6543
6544 int count_read = 0;
6545
6546 for (unsigned i=0; i<fSortedFiles.size(); i++) {
6547 std::string file_name = fPath + fSortedFiles[i];
6548 if (fSortedRead[i])
6549 continue;
6550 //bool dupe = false;
6551 //for (unsigned ss=0; ss<sv->size(); ss++) {
6552 // HsFileSchema* ssp = (HsFileSchema*)(*sv)[ss];
6553 // if (file_name == ssp->fFileName) {
6554 // dupe = true;
6555 // break;
6556 // }
6557 //}
6558 //if (dupe)
6559 // continue;
6560 fSortedRead[i] = true;
6562 if (!s)
6563 continue;
6564 sv->add(s);
6565 count_read++;
6566
6567 if (event_name) {
6568 if (s->fEventName == event_name) {
6569 //printf("file %s event_name %s time %s, age %f\n", file_name.c_str(), s->fEventName.c_str(), TimeToString(s->fTimeFrom).c_str(), double(timestamp - s->fTimeFrom));
6570 if (s->fTimeFrom <= timestamp) {
6571 // this file is older than the time requested,
6572 // subsequent files will be even older,
6573 // we can stop reading here.
6574 break;
6575 }
6576 }
6577 }
6578 }
6579
6580 double end_time = ss_time_sec();
6581 double read_elapsed = end_time - start_time;
6582 if (read_elapsed > 5.000) {
6583 cm_msg(MINFO, "FileHistory::read_schema", "Loading schema for event \"%s\" timestamp %s, reading %d history files took %.1f sec", event_name, TimeToString(timestamp).c_str(), count_read, read_elapsed);
6585 }
6586
6588
6589 return HS_SUCCESS;
6590}
6591
6592HsSchema* FileHistory::new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[])
6593{
6594 if (fDebug)
6595 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d\n", event_name, TimeToString(timestamp).c_str(), ntags);
6596
6597 int status;
6598
6599 HsFileSchema* s = (HsFileSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
6600
6601 if (!s) {
6602 //printf("hs_define_event: no schema for event %s\n", event_name);
6603 status = read_schema(&fWriterCurrentSchema, event_name, timestamp);
6604 if (status != HS_SUCCESS)
6605 return NULL;
6606 s = (HsFileSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
6607 } else {
6608 //printf("hs_define_event: already have schema for event %s\n", s->fEventName.c_str());
6609 }
6610
6611 bool xdebug = false;
6612
6613 if (s) { // is existing schema the same as new schema?
6614 bool same = true;
6615
6616 if (same)
6617 if (s->fEventName != event_name) {
6618 if (xdebug)
6619 printf("AAA: [%s] [%s]!\n", s->fEventName.c_str(), event_name);
6620 same = false;
6621 }
6622
6623 if (same)
6624 if (s->fVariables.size() != (unsigned)ntags) {
6625 if (xdebug)
6626 printf("BBB: event [%s]: ntags: %d -> %d!\n", event_name, (int)s->fVariables.size(), ntags);
6627 same = false;
6628 }
6629
6630 if (same)
6631 for (unsigned i=0; i<s->fVariables.size(); i++) {
6632 if (s->fVariables[i].name != tags[i].name) {
6633 if (xdebug)
6634 printf("CCC: event [%s] index %d: name [%s] -> [%s]!\n", event_name, i, s->fVariables[i].name.c_str(), tags[i].name);
6635 same = false;
6636 }
6637 if (s->fVariables[i].type != (int)tags[i].type) {
6638 if (xdebug)
6639 printf("DDD: event [%s] index %d: type %d -> %d!\n", event_name, i, s->fVariables[i].type, tags[i].type);
6640 same = false;
6641 }
6642 if (s->fVariables[i].n_data != (int)tags[i].n_data) {
6643 if (xdebug)
6644 printf("EEE: event [%s] index %d: n_data %d -> %d!\n", event_name, i, s->fVariables[i].n_data, tags[i].n_data);
6645 same = false;
6646 }
6647 if (!same)
6648 break;
6649 }
6650
6651 if (!same) {
6652 if (xdebug) {
6653 printf("*** Schema for event %s has changed!\n", event_name);
6654
6655 printf("*** Old schema for event [%s] time %s:\n", event_name, TimeToString(timestamp).c_str());
6656 s->print();
6657 printf("*** New tags:\n");
6658 PrintTags(ntags, tags);
6659 }
6660
6661 if (fDebug)
6662 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d: schema mismatch, starting a new file.\n", event_name, TimeToString(timestamp).c_str(), ntags);
6663
6664 s = NULL;
6665 }
6666 }
6667
6668 if (s) {
6669 // maybe this schema is too old - rotate files every so often
6670 time_t age = timestamp - s->fTimeFrom;
6671 //printf("*** age %s (%.1f months), timestamp %s, time_from %s, file %s\n", TimeToString(age).c_str(), (double)age/(double)kMonth, TimeToString(timestamp).c_str(), TimeToString(s->fTimeFrom).c_str(), s->fFileName.c_str());
6672 if (age > fConfMaxFileAge) {
6673 if (fDebug)
6674 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d: schema is too old, age %.1f months, starting a new file.\n", event_name, TimeToString(timestamp).c_str(), ntags, (double)age/(double)kMonth);
6675
6676 // force creation of a new file
6677 s = NULL;
6678 }
6679 }
6680
6681 if (s) {
6682 // maybe this file is too big - rotate files to limit maximum size
6683 double size = ss_file_size(s->fFileName.c_str());
6684 //printf("*** size %.0f, file %s\n", size, s->fFileName.c_str());
6685 if (size > fConfMaxFileSize) {
6686 if (fDebug)
6687 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d: file too big, size %.1f MiBytes, starting a new file.\n", event_name, TimeToString(timestamp).c_str(), ntags, size/MiB);
6688
6689 // force creation of a new file
6690 s = NULL;
6691 }
6692 }
6693
6694 if (!s) {
6695 std::string filename;
6696
6697 status = create_file(event_name, timestamp, ntags, tags, &filename);
6698 if (status != HS_SUCCESS)
6699 return NULL;
6700
6701 HsFileSchema* ss = read_file_schema(filename.c_str());
6702 if (!ss) {
6703 cm_msg(MERROR, "FileHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
6704 return NULL;
6705 }
6706
6708
6709 s = (HsFileSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
6710
6711 if (!s) {
6712 cm_msg(MERROR, "FileHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
6713 return NULL;
6714 }
6715
6716 if (xdebug) {
6717 printf("*** New schema for event [%s] time %s:\n", event_name, TimeToString(timestamp).c_str());
6718 s->print();
6719 }
6720 }
6721
6722 assert(s != NULL);
6723
6724#if 0
6725 {
6726 printf("schema for [%s] is %p\n", event_name, s);
6727 if (s)
6728 s->print();
6729 }
6730#endif
6731
6732 HsFileSchema* e = new HsFileSchema();
6733
6734 *e = *s; // make a copy of the schema
6735
6736 return e;
6737}
6738
6739int FileHistory::create_file(const char* event_name, time_t timestamp, int ntags, const TAG tags[], std::string* filenamep)
6740{
6741 if (fDebug)
6742 printf("FileHistory::create_file: event [%s]\n", event_name);
6743
6744 // NB: file names are constructed in such a way
6745 // that when sorted lexicographically ("ls -1 | sort")
6746 // they *also* become sorted by time
6747
6748 struct tm tm;
6749 localtime_r(&timestamp, &tm); // somebody must call tzset() before this.
6750
6751 char buf[256];
6752 strftime(buf, sizeof(buf), "%Y%m%d", &tm);
6753
6754 std::string filename;
6755 filename += fPath;
6756 filename += "mhf_";
6757 filename += TimeToString(timestamp);
6758 filename += "_";
6759 filename += buf;
6760 filename += "_";
6761 filename += MidasNameToFileName(event_name);
6762
6763 std::string try_filename = filename + ".dat";
6764
6765 FILE *fp = NULL;
6766 for (int itry=0; itry<10; itry++) {
6767 if (itry > 0) {
6768 char s[256];
6769 sprintf(s, "_%d", rand());
6770 try_filename = filename + s + ".dat";
6771 }
6772
6773 fp = fopen(try_filename.c_str(), "r");
6774 if (fp != NULL) {
6775 // this file already exists, try with a different name
6776 fclose(fp);
6777 continue;
6778 }
6779
6780 fp = fopen(try_filename.c_str(), "w");
6781 if (fp == NULL) {
6782 // somehow cannot create this file, try again
6783 cm_msg(MERROR, "FileHistory::create_file", "Error: Cannot create file \'%s\' for event \'%s\', fopen() errno %d (%s)", try_filename.c_str(), event_name, errno, strerror(errno));
6784 continue;
6785 }
6786
6787 // file opened
6788 break;
6789 }
6790
6791 if (fp == NULL) {
6792 // somehow cannot create any file, whine!
6793 cm_msg(MERROR, "FileHistory::create_file", "Error: Cannot create file \'%s\' for event \'%s\'", filename.c_str(), event_name);
6794 return HS_FILE_ERROR;
6795 }
6796
6797 std::string ss;
6798
6799 ss += "version: 2.0\n";
6800 ss += "event_name: ";
6801 ss += event_name;
6802 ss += "\n";
6803 ss += "time: ";
6804 ss += TimeToString(timestamp);
6805 ss += "\n";
6806
6807 int recsize = 0;
6808
6809 ss += "tag: /DWORD 1 4 /timestamp\n";
6810 recsize += 4;
6811
6812 bool padded = false;
6813 int offset = 0;
6814
6815 bool xdebug = false; // (strcmp(event_name, "u_Beam") == 0);
6816
6817 for (int i=0; i<ntags; i++) {
6818 int tsize = rpc_tid_size(tags[i].type);
6819 int n_bytes = tags[i].n_data*tsize;
6820 int xalign = (offset % tsize);
6821
6822 if (xdebug)
6823 printf("tag %d, tsize %d, n_bytes %d, xalign %d\n", i, tsize, n_bytes, xalign);
6824
6825#if 0
6826 // looks like history data does not do alignement and padding
6827 if (xalign != 0) {
6828 padded = true;
6829 int pad_bytes = tsize - xalign;
6830 assert(pad_bytes > 0);
6831
6832 ss += "tag: ";
6833 ss += "XPAD";
6834 ss += " ";
6835 ss += SmallIntToString(1);
6836 ss += " ";
6838 ss += " ";
6839 ss += "pad_";
6840 ss += SmallIntToString(i);
6841 ss += "\n";
6842
6843 offset += pad_bytes;
6844 recsize += pad_bytes;
6845
6846 assert((offset % tsize) == 0);
6847 fprintf(stderr, "FIXME: need to debug padding!\n");
6848 //abort();
6849 }
6850#endif
6851
6852 ss += "tag: ";
6853 ss += rpc_tid_name(tags[i].type);
6854 ss += " ";
6855 ss += SmallIntToString(tags[i].n_data);
6856 ss += " ";
6857 ss += SmallIntToString(n_bytes);
6858 ss += " ";
6859 ss += tags[i].name;
6860 ss += "\n";
6861
6862 recsize += n_bytes;
6863 offset += n_bytes;
6864 }
6865
6866 ss += "record_size: ";
6868 ss += "\n";
6869
6870 // reserve space for "data_offset: ..."
6871 int sslength = ss.length() + 127;
6872
6873 int block = 1024;
6874 int nb = (sslength + block - 1)/block;
6875 int data_offset = block * nb;
6876
6877 ss += "data_offset: ";
6879 ss += "\n";
6880
6881 fprintf(fp, "%s", ss.c_str());
6882
6883 fclose(fp);
6884
6885 if (1 && padded) {
6886 printf("Schema in file %s has padding:\n", try_filename.c_str());
6887 printf("%s", ss.c_str());
6888 }
6889
6890 if (filenamep)
6892
6893 return HS_SUCCESS;
6894}
6895
6897{
6898 if (fDebug)
6899 printf("FileHistory::read_file_schema: file %s\n", filename);
6900
6901 FILE* fp = fopen(filename, "r");
6902 if (!fp) {
6903 cm_msg(MERROR, "FileHistory::read_file_schema", "Cannot read \'%s\', fopen() errno %d (%s)", filename, errno, strerror(errno));
6904 return NULL;
6905 }
6906
6907 HsFileSchema* s = NULL;
6908
6909 // File format looks like this:
6910 // version: 2.0
6911 // event_name: u_Beam
6912 // time: 1023174012
6913 // tag: /DWORD 1 4 /timestamp
6914 // tag: FLOAT 1 4 B1
6915 // ...
6916 // tag: FLOAT 1 4 Ref Heater
6917 // record_size: 84
6918 // data_offset: 1024
6919
6920 int rd_recsize = 0;
6921 int offset = 0;
6922
6923 while (1) {
6924 char buf[1024];
6925 char* b = fgets(buf, sizeof(buf), fp);
6926
6927 //printf("read: %s\n", b);
6928
6929 if (!b) {
6930 break; // end of file
6931 }
6932
6933 char*bb;
6934
6935 bb = strchr(b, '\n');
6936 if (bb)
6937 *bb = 0;
6938
6939 bb = strchr(b, '\r');
6940 if (bb)
6941 *bb = 0;
6942
6943 bb = strstr(b, "version: 2.0");
6944 if (bb == b) {
6945 s = new HsFileSchema();
6946 assert(s);
6947
6948 s->fFileName = filename;
6949 continue;
6950 }
6951
6952 if (!s) {
6953 // malformed history file
6954 break;
6955 }
6956
6957 bb = strstr(b, "event_name: ");
6958 if (bb == b) {
6959 s->fEventName = bb + 12;
6960 continue;
6961 }
6962
6963 bb = strstr(b, "time: ");
6964 if (bb == b) {
6965 s->fTimeFrom = strtoul(bb + 6, NULL, 10);
6966 continue;
6967 }
6968
6969 // tag format is like this:
6970 //
6971 // tag: FLOAT 1 4 Ref Heater
6972 //
6973 // "FLOAT" is the MIDAS type, "/DWORD" is special tag for the timestamp
6974 // "1" is the number of array elements
6975 // "4" is the total tag size in bytes (n_data*tid_size)
6976 // "Ref Heater" is the tag name
6977
6978 bb = strstr(b, "tag: ");
6979 if (bb == b) {
6980 bb += 5; // now points to the tag MIDAS type
6981 const char* midas_type = bb;
6982 char* bbb = strchr(bb, ' ');
6983 if (bbb) {
6984 *bbb = 0;
6985 HsSchemaEntry t;
6986 if (midas_type[0] == '/') {
6987 t.type = 0;
6988 } else {
6990 if (t.type == 0) {
6991 cm_msg(MERROR, "FileHistory::read_file_schema", "Unknown MIDAS data type \'%s\' in history file \'%s\'", midas_type, filename);
6992 if (s)
6993 delete s;
6994 s = NULL;
6995 break;
6996 }
6997 }
6998 bbb++;
6999 while (*bbb == ' ')
7000 bbb++;
7001 if (*bbb) {
7002 t.n_data = strtoul(bbb, &bbb, 10);
7003 while (*bbb == ' ')
7004 bbb++;
7005 if (*bbb) {
7006 t.n_bytes = strtoul(bbb, &bbb, 10);
7007 while (*bbb == ' ')
7008 bbb++;
7009 t.name = bbb;
7010 }
7011 }
7012
7013 if (midas_type[0] != '/') {
7014 s->fVariables.push_back(t);
7015 s->fOffsets.push_back(offset);
7016 offset += t.n_bytes;
7017 }
7018
7019 rd_recsize += t.n_bytes;
7020 }
7021 continue;
7022 }
7023
7024 bb = strstr(b, "record_size: ");
7025 if (bb == b) {
7026 s->fRecordSize = atoi(bb + 12);
7027 continue;
7028 }
7029
7030 bb = strstr(b, "data_offset: ");
7031 if (bb == b) {
7032 s->fDataOffset = atoi(bb + 12);
7033 // data offset is the last entry in the file
7034 break;
7035 }
7036 }
7037
7038 fclose(fp);
7039
7040 if (!s) {
7041 cm_msg(MERROR, "FileHistory::read_file_schema", "Malformed history schema in \'%s\', maybe it is not a history file", filename);
7042 return NULL;
7043 }
7044
7045 if (rd_recsize != s->fRecordSize) {
7046 cm_msg(MERROR, "FileHistory::read_file_schema", "Record size mismatch in history schema from \'%s\', file says %d while total of all tags is %d", filename, s->fRecordSize, rd_recsize);
7047 if (s)
7048 delete s;
7049 return NULL;
7050 }
7051
7052 if (!s) {
7053 cm_msg(MERROR, "FileHistory::read_file_schema", "Could not read history schema from \'%s\', maybe it is not a history file", filename);
7054 if (s)
7055 delete s;
7056 return NULL;
7057 }
7058
7059 if (fDebug > 1)
7060 s->print();
7061
7062 return s;
7063}
7064
7066// Factory constructors //
7068
7070{
7071#ifdef HAVE_SQLITE
7072 return new SqliteHistory();
7073#else
7074 cm_msg(MERROR, "MakeMidasHistorySqlite", "Error: Cannot initialize SQLITE history - this MIDAS was built without SQLITE support - HAVE_SQLITE is not defined");
7075 return NULL;
7076#endif
7077}
7078
7080{
7081#ifdef HAVE_MYSQL
7082 return new MysqlHistory();
7083#else
7084 cm_msg(MERROR, "MakeMidasHistoryMysql", "Error: Cannot initialize MySQL history - this MIDAS was built without MySQL support - HAVE_MYSQL is not defined");
7085 return NULL;
7086#endif
7087}
7088
7090{
7091#ifdef HAVE_PGSQL
7092 return new PgsqlHistory();
7093#else
7094 cm_msg(MERROR, "MakeMidasHistoryPgsql", "Error: Cannot initialize PgSQL history - this MIDAS was built without PostgreSQL support - HAVE_PGSQL is not defined");
7095 return NULL;
7096#endif
7097}
7098
7103
7104/* emacs
7105 * Local Variables:
7106 * tab-width: 8
7107 * c-basic-offset: 3
7108 * indent-tabs-mode: nil
7109 * End:
7110 */
#define FALSE
Definition cfortran.h:309
std::string fPath
HsFileSchema * read_file_schema(const char *filename)
std::vector< std::string > fSortedFiles
HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
std::vector< bool > fSortedRead
int hs_connect(const char *connect_string)
returns HS_SUCCESS
int hs_clear_cache()
clear internal cache, returns HS_SUCCESS
int create_file(const char *event_name, time_t timestamp, int ntags, const TAG tags[], std::string *filenamep)
int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)
int read_file_list(bool *pchanged)
int hs_disconnect()
disconnect from history, returns HS_SUCCESS
void remove_inactive_columns()
int write_event(const time_t t, const char *data, const int data_size)
std::string fFileName
int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])
void print(bool print_tags=true) const
int read_last_written(const time_t timestamp, const int debug, time_t *last_written)
int fCountWriteUndersize
virtual void print(bool print_tags=true) const
std::vector< int > fOffsets
virtual ~HsSchema()
virtual void remove_inactive_columns()=0
std::vector< HsSchemaEntry > fVariables
virtual int write_event(const time_t t, const char *data, const int data_size)=0
virtual int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])=0
virtual int read_last_written(const time_t timestamp, const int debug, time_t *last_written)=0
virtual int flush_buffers()=0
virtual int close()=0
int fCountWriteOversize
std::string fEventName
virtual int match_event_var(const char *event_name, const char *var_name, const int var_index)
unsigned size() const
void print(bool print_tags=true) const
std::vector< HsSchema * > fData
HsSchema * find_event(const char *event_name, const time_t timestamp, int debug=0)
void add(HsSchema *s)
HsSchema * operator[](int index) const
int get_transaction_count()
int read_last_written(const time_t timestamp, const int debug, time_t *last_written)
int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])
std::vector< std::string > fColumnNames
void print(bool print_tags=true) const
void remove_inactive_columns()
std::vector< std::string > fColumnTypes
std::vector< bool > fColumnInactive
void increment_transaction_count()
int match_event_var(const char *event_name, const char *var_name, const int var_index)
std::string fTableName
static std::map< SqlBase *, int > gfTransactionCount
int write_event(const time_t t, const char *data, const int data_size)
void reset_transaction_count()
MidasHistoryBinnedBuffer(time_t first_time, time_t last_time, int num_bins)
void Add(time_t t, double v)
virtual void Add(time_t time, double value)=0
char type[NAME_LENGTH]
history channel name
Definition history.h:112
char name[NAME_LENGTH]
Definition history.h:111
int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)
int read_table_and_event_names(HsSchemaVector *sv)
int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)
int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)
void Add(time_t t, double v)
ReadBuffer(time_t first_time, time_t last_time, time_t interval)
HsSchemaVector fSchema
std::vector< HsSchema * > fEvents
int hs_get_tags(const char *event_name, time_t t, std::vector< TAG > *ptags)
get list of history variables for given event (use event names returned by hs_get_events()) that exis...
virtual int hs_connect(const char *connect_string)=0
returns HS_SUCCESS
HsSchemaVector fWriterCurrentSchema
int hs_write_event(const char *event_name, time_t timestamp, int buffer_size, const char *buffer)
see hs_write_event(), returns HS_SUCCESS or HS_FILE_ERROR
int hs_read_buffer(time_t start_time, time_t end_time, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], MidasHistoryBufferInterface *buffer[], int hs_status[])
returns HS_SUCCESS
int hs_define_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
see hs_define_event(), returns HS_SUCCESS or HS_FILE_ERROR
int hs_read_binned(time_t start_time, time_t end_time, int num_bins, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], int num_entries[], int *count_bins[], double *mean_bins[], double *rms_bins[], double *min_bins[], double *max_bins[], time_t *bins_first_time[], double *bins_first_value[], time_t *bins_last_time[], double *bins_last_value[], time_t last_time[], double last_value[], int st[])
returns HS_SUCCESS
virtual HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])=0
int hs_get_events(time_t t, std::vector< std::string > *pevents)
get list of events that exist(ed) at given time and later (value 0 means "return all events from begi...
int hs_get_last_written(time_t timestamp, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], time_t last_written[])
virtual int hs_set_debug(int debug)
set debug level, returns previous debug level
virtual int hs_disconnect()=0
disconnect from history, returns HS_SUCCESS
int hs_read(time_t start_time, time_t end_time, time_t interval, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], int num_entries[], time_t *time_buffer[], double *data_buffer[], int st[])
see hs_read(), returns HS_SUCCESS
int hs_clear_cache()
clear internal cache, returns HS_SUCCESS
int hs_flush_buffers()
flush buffered data to storage where it is visible to mhttpd
virtual int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)=0
virtual int ListColumns(const char *table_name, std::vector< std::string > *plist)=0
virtual int Finalize()=0
virtual int Connect(const char *path)=0
virtual int ListColumns(const char *table, std::vector< std::string > *plist)=0
virtual double GetDouble(int column)=0
virtual int RollbackTransaction(const char *table_name)=0
virtual bool IsConnected()=0
virtual int CommitTransaction(const char *table_name)=0
virtual ~SqlBase()
virtual int ListTables(std::vector< std::string > *plist)=0
virtual std::string QuoteId(const char *s)=0
virtual int Disconnect()=0
virtual bool TypesCompatible(int midas_tid, const char *sql_type)=0
virtual int Prepare(const char *table_name, const char *sql)=0
virtual int Exec(const char *sql)=0
virtual int Connect(const char *dsn=0)=0
virtual int Exec(const char *table_name, const char *sql)=0
virtual std::string QuoteString(const char *s)=0
bool fTransactionPerTable
virtual time_t GetTime(int column)=0
virtual int Step()=0
virtual const char * GetText(int column)=0
virtual int ExecDisconnected(const char *table_name, const char *sql)=0
virtual int OpenTransaction(const char *table_name)=0
virtual const char * ColumnType(int midas_tid)=0
int update_schema1(HsSqlSchema *s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable, bool *have_transaction)
int hs_disconnect()
disconnect from history, returns HS_SUCCESS
int hs_set_debug(int debug)
set debug level, returns previous debug level
int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)
virtual ~SqlHistoryBase()
virtual int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)=0
virtual int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)=0
virtual int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)=0
virtual int read_table_and_event_names(HsSchemaVector *sv)=0
int hs_connect(const char *connect_string)
returns HS_SUCCESS
HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
int update_schema(HsSqlSchema *s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable)
int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)
int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)
int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)
int read_table_and_event_names(HsSchemaVector *sv)
INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout)
Definition midas.cxx:3317
INT cm_set_watchdog_params(BOOL call_watchdog, DWORD timeout)
Definition midas.cxx:3283
#define DB_KEY_EXIST
Definition midas.h:641
#define DB_FILE_ERROR
Definition midas.h:647
#define DB_SUCCESS
Definition midas.h:631
#define DB_NO_MORE_SUBKEYS
Definition midas.h:646
#define HS_UNDEFINED_VAR
Definition midas.h:733
#define HS_SUCCESS
Definition midas.h:727
#define HS_FILE_ERROR
Definition midas.h:728
#define HS_UNDEFINED_EVENT
Definition midas.h:732
unsigned int DWORD
Definition mcstd.h:51
#define TID_DOUBLE
Definition midas.h:343
#define TID_SBYTE
Definition midas.h:329
#define TID_BOOL
Definition midas.h:340
#define TID_SHORT
Definition midas.h:334
#define TID_WORD
Definition midas.h:332
#define MINFO
Definition midas.h:560
#define TID_BYTE
Definition midas.h:327
#define TID_STRING
Definition midas.h:346
#define MERROR
Definition midas.h:559
#define TID_CHAR
Definition midas.h:331
#define TID_INT
Definition midas.h:338
#define TID_FLOAT
Definition midas.h:341
#define TID_LAST
Definition midas.h:354
#define TID_DWORD
Definition midas.h:336
double ss_file_size(const char *path)
Definition system.cxx:6972
double ss_time_sec()
Definition system.cxx:3467
INT ss_file_find(const char *path, const char *pattern, char **plist)
Definition system.cxx:6713
INT cm_msg_flush_buffer()
Definition midas.cxx:865
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
Definition midas.cxx:915
const char * rpc_tid_name(INT id)
Definition midas.cxx:11764
int rpc_name_tid(const char *name)
Definition midas.cxx:11778
INT rpc_tid_size(INT id)
Definition midas.cxx:11757
static std::string q(const char *s)
static const int tid_size[]
static const char * sql_type_mysql[]
static const char ** sql_type
static std::string MidasNameToSqlName(const char *s)
static int ReadRecord(const char *file_name, int fd, int offset, int recsize, int irec, char *rec)
const time_t kMonth
static int FindTime(const char *file_name, int fd, int offset, int recsize, int nrec, time_t timestamp, int *i1p, time_t *t1p, int *i2p, time_t *t2p, time_t *tstart, time_t *tend, int debug)
const double KiB
static int CreateSqlColumn(SqlBase *sql, const char *table_name, const char *column_name, const char *column_type, bool *have_transaction, int debug)
static int ReadSqliteTableNames(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug)
void DoctorPgsqlColumnType(std::string *col_type, const char *index_type)
static int CreateSqlHyperTable(SqlBase *sql, const char *table_name, bool *have_transaction)
void DoctorSqlColumnType(std::string *col_type, const char *index_type)
static bool MatchTagName(const char *tag_name, int n_data, const char *var_tag_name, const int var_tag_index)
const double MiB
static int var_name_cmp(const std::string &v1, const char *v2)
static std::string TimeToString(time_t t)
MidasHistoryInterface * MakeMidasHistorySqlite()
static void PrintTags(int ntags, const TAG tags[])
static char * skip_spaces(char *s)
static bool MatchEventName(const char *event_name, const char *var_event_name)
static HsSqlSchema * NewSqlSchema(HsSchemaVector *sv, const char *table_name, time_t t)
static int StartSqlTransaction(SqlBase *sql, const char *table_name, bool *have_transaction)
const time_t kDay
MidasHistoryInterface * MakeMidasHistoryMysql()
static int ReadSqliteTableSchema(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug)
static int event_name_cmp(const std::string &e1, const char *e2)
MidasHistoryInterface * MakeMidasHistoryPgsql()
MidasHistoryInterface * MakeMidasHistoryFile()
static int ReadMysqlTableNames(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug, const char *must_have_event_name, const char *must_have_table_name)
static std::string SmallIntToString(int i)
static int CreateSqlTable(SqlBase *sql, const char *table_name, bool *have_transaction, bool set_default_timestamp=false)
static std::string MidasNameToFileName(const char *s)
INT index
Definition mana.cxx:271
DWORD last_time
Definition mana.cxx:3070
void * data
Definition mana.cxx:268
BOOL debug
debug printouts
Definition mana.cxx:254
INT type
Definition mana.cxx:269
char host_name[HOST_NAME_LENGTH]
Definition mana.cxx:242
double count
Definition mdump.cxx:33
INT i
Definition mdump.cxx:32
static int offset
Definition mgd.cxx:1500
#define DIR_SEPARATOR
Definition midas.h:193
DWORD BOOL
Definition midas.h:105
#define DIR_SEPARATOR_STR
Definition midas.h:194
#define read(n, a, f)
#define write(n, a, f, d)
#define name(x)
Definition midas_macro.h:24
static FILE * fp
struct callback_addr callback
Definition mserver.cxx:22
MUTEX_T * tm
Definition odbedit.cxx:39
BOOL match(char *pat, char *str)
Definition odbedit.cxx:190
INT j
Definition odbhist.cxx:40
INT k
Definition odbhist.cxx:40
char str[256]
Definition odbhist.cxx:33
char file_name[256]
Definition odbhist.cxx:41
INT add
Definition odbhist.cxx:40
DWORD status
Definition odbhist.cxx:39
char var_name[256]
Definition odbhist.cxx:41
TH1X EXPRT * h1_book(const char *name, const char *title, int bins, double min, double max)
Definition rmidas.h:24
std::string tag_name
int type
std::string tag_type
int n_data
std::string name
int n_bytes
Definition midas.h:1234
DWORD type
Definition midas.h:1236
DWORD n_data
Definition midas.h:1237
char name[NAME_LENGTH]
Definition midas.h:1235
char c
Definition system.cxx:1310
@ DIR
Definition test_init.cxx:7
static double e(void)
Definition tinyexpr.c:136