MIDAS
Loading...
Searching...
No Matches
history_schema.cxx
Go to the documentation of this file.
1/********************************************************************\
2
3 Name: history_schema.cxx
4 Created by: Konstantin Olchanski
5
6 Contents: Schema based MIDAS history. Available drivers:
7 FileHistory: storage of data in binary files (replacement for the traditional MIDAS history)
8 MysqlHistory: storage of data in MySQL database (replacement for the ODBC based SQL history)
9 PgsqlHistory: storage of data in PostgreSQL database
10 SqliteHistory: storage of data in SQLITE3 database (not suitable for production use)
11
12\********************************************************************/
13
14#undef NDEBUG // midas required assert() to be always enabled
15
16#include "midas.h"
17#include "msystem.h"
18#include "mstrlcpy.h"
19
20#include <math.h>
21
22#include <vector>
23#include <list>
24#include <string>
25#include <map>
26#include <algorithm>
27
28// make mysql/my_global.h happy - it redefines closesocket()
29#undef closesocket
30
31//
32// benchmarks
33//
34// /usr/bin/time ./linux/bin/mh2sql . /ladd/iris_data2/alpha/alphacpc09-elog-history/history/121019.hst
35// -rw-r--r-- 1 alpha users 161028048 Oct 19 2012 /ladd/iris_data2/alpha/alphacpc09-elog-history/history/121019.hst
36// flush 10000, sync=OFF -> 51.51user 1.51system 0:53.76elapsed 98%CPU
37// flush 1000000, sync=NORMAL -> 51.83user 2.09system 1:08.37elapsed 78%CPU (flush never activated)
38// flush 100000, sync=NORMAL -> 51.38user 1.94system 1:06.94elapsed 79%CPU
39// flush 10000, sync=NORMAL -> 51.37user 2.03system 1:31.63elapsed 58%CPU
40// flush 1000, sync=NORMAL -> 52.16user 2.70system 4:38.58elapsed 19%CPU
41
43// MIDAS includes //
45
46#include "midas.h"
47#include "history.h"
48
50// helper stuff //
52
53#define FREE(x) { if (x) free(x); (x) = NULL; }
54
/**
 * Skip leading whitespace in a NUL-terminated string.
 *
 * @param s  string to scan; must not be NULL
 * @return pointer to the first character of s for which isspace() is false,
 *         or to the terminating NUL if s is empty or all whitespace
 */
static char* skip_spaces(char* s)
{
   // <ctype.h> functions are only defined for EOF and values representable
   // as unsigned char; a plain (possibly signed) char holding a byte >= 0x80
   // must be converted first.
   while (*s != 0 && isspace((unsigned char)*s))
      s++;
   return s;
}
64
/**
 * Format a time_t as a decimal integer string.
 *
 * Zero yields "0" and negative values are prefixed with '-'.
 * The conversion goes through long long and std::to_string(), so the
 * most negative value is formatted correctly instead of overflowing on
 * negation.
 *
 * @param t  time value to format
 * @return decimal text representation of t
 */
static std::string TimeToString(time_t t)
{
   return std::to_string(static_cast<long long>(t));
}
94
/**
 * Format a non-negative int as a decimal string.
 *
 * @param i  value to format; negative values trip the assert
 * @return decimal text representation of i ("0" for zero)
 */
static std::string SmallIntToString(int i)
{
   // only zero and positive values are accepted
   assert(i >= 0);

   return std::to_string(i);
}
115
116static bool MatchEventName(const char* event_name, const char* var_event_name)
117{
118 // new-style event name: "equipment_name/variable_name:tag_name"
119 // old-style event name: "equipment_name:tag_name" ("variable_name" is missing)
121
122 //printf("looking for event_name [%s], try table [%s] event name [%s], new style [%d]\n", var_event_name, table_name, event_name, newStyleEventName);
123
124 if (strcasecmp(event_name, var_event_name) == 0) {
125 return true;
126 } else if (newStyleEventName) {
127 return false;
128 } else { // for old style names, need more parsing
129 bool match = false;
130
131 const char* s = event_name;
132 for (int j=0; s[j]; j++) {
133
134 if ((var_event_name[j]==0) && (s[j]=='/')) {
135 match = true;
136 break;
137 }
138
139 if ((var_event_name[j]==0) && (s[j]=='_')) {
140 match = true;
141 break;
142 }
143
144 if (var_event_name[j]==0) {
145 match = false;
146 break;
147 }
148
149 if (tolower(var_event_name[j]) != tolower(s[j])) { // does not work for UTF-8 Unicode
150 match = false;
151 break;
152 }
153 }
154
155 return match;
156 }
157}
158
159static bool MatchTagName(const char* tag_name, int n_data, const char* var_tag_name, const int var_tag_index)
160{
161 char alt_tag_name[1024]; // maybe this is an array without "Names"?
163
164 //printf(" looking for tag [%s] alt [%s], try column name [%s]\n", var_tag_name, alt_tag_name, tag_name);
165
166 if (strcasecmp(tag_name, var_tag_name) == 0)
167 if (var_tag_index >= 0 && var_tag_index < n_data)
168 return true;
169
170 if (strcasecmp(tag_name, alt_tag_name) == 0)
171 return true;
172
173 return false;
174}
175
176static void PrintTags(int ntags, const TAG tags[])
177{
178 for (int i=0; i<ntags; i++)
179 printf("tag %d: %s %s[%d]\n", i, rpc_tid_name(tags[i].type), tags[i].name, tags[i].n_data);
180}
181
182// convert MIDAS event name to something acceptable as an SQL identifier - table name, column name, etc
183
/**
 * Convert a MIDAS name into text usable as an SQL identifier.
 *
 * Every alphanumeric byte (as classified by <ctype.h>) is lower-cased and
 * every other byte is replaced by '_'. The mapping is byte-wise, so it
 * does not handle UTF-8 Unicode.
 *
 * @param s  NUL-terminated MIDAS name; must not be NULL
 * @return the converted name, same length in bytes as s
 */
static std::string MidasNameToSqlName(const char* s)
{
   std::string out;

   for (const char* p = s; *p != 0; p++) {
      // <ctype.h> functions are undefined for negative arguments other than
      // EOF; go through unsigned char so bytes >= 0x80 are safe
      const unsigned char c = static_cast<unsigned char>(*p);
      if (isalnum(c))
         out += static_cast<char>(tolower(c));
      else
         out += '_';
   }

   return out;
}
198
199// convert MIDAS event name to something acceptable as a file name
200
/**
 * Convert a MIDAS name into text usable as a file name.
 *
 * Every alphanumeric byte (as classified by <ctype.h>) is lower-cased and
 * every other byte is replaced by '_'. The mapping is byte-wise, so it
 * does not handle UTF-8 Unicode.
 *
 * @param s  NUL-terminated MIDAS name; must not be NULL
 * @return the converted name, same length in bytes as s
 */
static std::string MidasNameToFileName(const char* s)
{
   std::string out;

   for (const char* p = s; *p != 0; p++) {
      // <ctype.h> functions are undefined for negative arguments other than
      // EOF; go through unsigned char so bytes >= 0x80 are safe
      const unsigned char c = static_cast<unsigned char>(*p);
      if (isalnum(c))
         out += static_cast<char>(tolower(c));
      else
         out += '_';
   }

   return out;
}
215
216// compare event names
217
/**
 * Case-insensitive comparison of two event names.
 *
 * @param e1  first event name
 * @param e2  second event name, NUL-terminated
 * @return the strcasecmp() result: zero when the names are equal ignoring
 *         case, otherwise negative or positive according to their order
 */
static int event_name_cmp(const std::string& e1, const char* e2)
{
   const char* lhs = e1.c_str();
   const int order = strcasecmp(lhs, e2);
   return order;
}
222
223// compare variable names
224
/**
 * Case-insensitive comparison of two variable names.
 *
 * @param v1  first variable name
 * @param v2  second variable name, NUL-terminated
 * @return the strcasecmp() result: zero when the names are equal ignoring
 *         case, otherwise negative or positive according to their order
 */
static int var_name_cmp(const std::string& v1, const char* v2)
{
   const char* lhs = v1.c_str();
   const int order = strcasecmp(lhs, v2);
   return order;
}
229
231// SQL data types //
233
234#ifdef HAVE_SQLITE
// SQLite column type for each MIDAS data type, one entry per TID_xxx value
// in the order given by the per-entry comments. Entries spelled
// "xxxINVALIDxxx..." are placeholders for types that have no SQL column type.
// NOTE(review): 17 initializers are listed; if TID_LAST is larger than 17 the
// remaining entries are null pointers - confirm they are never looked up.
static const char *sql_type_sqlite[TID_LAST] = {
   "xxxINVALIDxxxNULL", // TID_NULL
   "INTEGER",           // TID_UINT8
   "INTEGER",           // TID_INT8
   "TEXT",              // TID_CHAR
   "INTEGER",           // TID_UINT16
   "INTEGER",           // TID_INT16
   "INTEGER",           // TID_UINT32
   "INTEGER",           // TID_INT32
   "INTEGER",           // TID_BOOL
   "REAL",              // TID_FLOAT
   "REAL",              // TID_DOUBLE
   "INTEGER",           // TID_BITFIELD
   "TEXT",              // TID_STRING
   "xxxINVALIDxxxARRAY",  // placeholder, not a usable SQL type
   "xxxINVALIDxxxSTRUCT", // placeholder, not a usable SQL type
   "xxxINVALIDxxxKEY",    // placeholder, not a usable SQL type
   "xxxINVALIDxxxLINK"    // placeholder, not a usable SQL type
};
254#endif
255
256#ifdef HAVE_PGSQL
// PostgreSQL column type for each MIDAS data type, one entry per TID_xxx
// value in the order given by the per-entry comments (legacy TID names in
// parentheses). Unsigned MIDAS types map to the next wider signed type in
// this table. Entries spelled "xxxINVALIDxxx..." are placeholders for types
// that have no SQL column type.
// NOTE(review): 17 initializers are listed; if TID_LAST is larger than 17 the
// remaining entries are null pointers - confirm they are never looked up.
static const char *sql_type_pgsql[TID_LAST] = {
   "xxxINVALIDxxxNULL", // TID_NULL
   "smallint",          // TID_UINT8 (TID_BYTE)
   "smallint",          // TID_INT8 (TID_SBYTE)
   "char(1)",           // TID_CHAR
   "integer",           // TID_UINT16 (TID_WORD)
   "smallint",          // TID_INT16 (TID_SHORT)
   "bigint",            // TID_UINT32 (TID_DWORD)
   "integer",           // TID_INT32 (TID_INT)
   "smallint",          // TID_BOOL
   "real",              // TID_FLOAT
   "double precision",  // TID_DOUBLE
   "bigint",            // TID_BITFIELD
   "text",              // TID_STRING
   "xxxINVALIDxxxARRAY",  // placeholder, not a usable SQL type
   "xxxINVALIDxxxSTRUCT", // placeholder, not a usable SQL type
   "xxxINVALIDxxxKEY",    // placeholder, not a usable SQL type
   "xxxINVALIDxxxLINK"    // placeholder, not a usable SQL type
};
276#endif
277
278#ifdef HAVE_MYSQL
// MySQL column type for each MIDAS data type, one entry per TID_xxx value in
// the order given by the per-entry comments (legacy TID names in
// parentheses). Entries spelled "xxxINVALIDxxx..." are placeholders for
// types that have no SQL column type.
// NOTE(review): 17 initializers are listed; if TID_LAST is larger than 17 the
// remaining entries are null pointers - confirm they are never looked up.
static const char *sql_type_mysql[TID_LAST] = {
   "xxxINVALIDxxxNULL", // TID_NULL
   "tinyint unsigned",  // TID_UINT8 (TID_BYTE)
   "tinyint",           // TID_INT8 (TID_SBYTE)
   "char",              // TID_CHAR
   "smallint unsigned", // TID_UINT16 (TID_WORD)
   "smallint",          // TID_INT16 (TID_SHORT)
   "integer unsigned",  // TID_UINT32 (TID_DWORD)
   "integer",           // TID_INT32 (TID_INT)
   "tinyint",           // TID_BOOL
   "float",             // TID_FLOAT
   "double",            // TID_DOUBLE
   "integer unsigned",  // TID_BITFIELD
   "VARCHAR",           // TID_STRING
   "xxxINVALIDxxxARRAY",  // placeholder, not a usable SQL type
   "xxxINVALIDxxxSTRUCT", // placeholder, not a usable SQL type
   "xxxINVALIDxxxKEY",    // placeholder, not a usable SQL type
   "xxxINVALIDxxxLINK"    // placeholder, not a usable SQL type
};
298#endif
299
300void DoctorPgsqlColumnType(std::string* col_type, const char* index_type)
301{
302 if (*col_type == index_type)
303 return;
304
305 if (*col_type == "bigint" && strcmp(index_type, "int8")==0) {
307 return;
308 }
309
310 if (*col_type == "integer" && strcmp(index_type, "int4")==0) {
312 return;
313 }
314
315 if (*col_type == "smallint" && strcmp(index_type, "int2")==0) {
317 return;
318 }
319
320 cm_msg(MERROR, "SqlHistory", "Cannot use this SQL database, incompatible column names: created column type [%s] is reported with column type [%s]", index_type, col_type->c_str());
322 abort();
323}
324
325void DoctorSqlColumnType(std::string* col_type, const char* index_type)
326{
327 if (*col_type == index_type)
328 return;
329
330 if (*col_type == "int(10) unsigned" && strcmp(index_type, "integer unsigned")==0) {
332 return;
333 }
334
335 if (*col_type == "int(11)" && strcmp(index_type, "integer")==0) {
337 return;
338 }
339
340 if (*col_type == "integer" && strcmp(index_type, "int(11)")==0) {
342 return;
343 }
344
345 // MYSQL 8.0.23
346
347 if (*col_type == "int" && strcmp(index_type, "integer")==0) {
349 return;
350 }
351
352 if (*col_type == "int unsigned" && strcmp(index_type, "integer unsigned")==0) {
354 return;
355 }
356
357 cm_msg(MERROR, "SqlHistory", "Cannot use this SQL database, incompatible column names: created column type [%s] is reported with column type [%s]", index_type, col_type->c_str());
359 abort();
360}
361
362#if 0
363static int sql2midasType_mysql(const char* name)
364{
365 for (int tid=0; tid<TID_LAST; tid++)
366 if (strcasecmp(name, sql_type_mysql[tid])==0)
367 return tid;
368 // FIXME!
369 printf("sql2midasType: Cannot convert SQL data type \'%s\' to a MIDAS data type!\n", name);
370 return 0;
371}
372#endif
373
374#if 0
375static int sql2midasType_sqlite(const char* name)
376{
377 if (strcmp(name, "INTEGER") == 0)
378 return TID_INT;
379 if (strcmp(name, "REAL") == 0)
380 return TID_DOUBLE;
381 if (strcmp(name, "TEXT") == 0)
382 return TID_STRING;
383 // FIXME!
384 printf("sql2midasType: Cannot convert SQL data type \'%s\' to a MIDAS data type!\n", name);
385 return 0;
386}
387#endif
388
390// Schema base classes //
392
394 std::string tag_name; // tag name from MIDAS
395 std::string tag_type; // tag type from MIDAS
396 std::string name; // entry name, same as tag_name except when read from SQL history when it could be the SQL column name
397 int type = 0; // MIDAS data type TID_xxx
398 int n_data = 0; // MIDAS array size
399 int n_bytes = 0; // n_data * size of MIDAS data type (only used by HsFileSchema?)
400};
401
403{
404public:
405
406 // event schema definitions
407 std::string fEventName;
410 std::vector<HsSchemaEntry> fVariables;
411 std::vector<int> fOffsets;
412 size_t fNumBytes = 0;
413
414 // run time data used by hs_write_event()
417 size_t fWriteMaxSize = 0;
418 size_t fWriteMinSize = 0;
419
420 // schema disabled by write error
421 bool fDisabled = true;
422
423public:
424
425 HsSchema() // ctor
426 {
427 // empty
428 }
429
430 virtual void remove_inactive_columns() = 0; // used by SQL schemas
431 virtual void print(bool print_tags = true) const;
432 virtual ~HsSchema(); // dtor
433 virtual int flush_buffers() = 0;
434 virtual int close() = 0;
435 virtual int write_event(const time_t t, const char* data, const size_t data_size) = 0;
436 virtual int match_event_var(const char* event_name, const char* var_name, const int var_index);
437 virtual int read_last_written(const time_t timestamp,
438 const int debug,
439 time_t* last_written) = 0;
440 virtual int read_data(const time_t start_time,
441 const time_t end_time,
442 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
443 const int debug,
444 std::vector<time_t>& last_time,
445 MidasHistoryBufferInterface* buffer[]) = 0;
446};
447
449{
450protected:
451 std::vector<HsSchema*> fData;
452
453public:
454 ~HsSchemaVector() { // dtor
455 clear();
456 }
457
459 return fData[index];
460 }
461
462 size_t size() const {
463 return fData.size();
464 }
465
466 void add(HsSchema* s);
467
468 void clear() {
469 for (size_t i=0; i<fData.size(); i++)
470 if (fData[i]) {
471 delete fData[i];
472 fData[i] = NULL;
473 }
474 fData.clear();
475 }
476
477 void print(bool print_tags = true) const {
478 for (size_t i=0; i<fData.size(); i++)
480 }
481
482 HsSchema* find_event(const char* event_name, const time_t timestamp, int debug = 0);
483};
484
486// Base class functions //
488
490{
491 //printf("HsSchema::dtor %p!\n", this);
492 // only report if undersize/oversize happens more than once -
493 // the first occurence is already reported by hs_write_event()
494 if (fCountWriteUndersize > 1) {
495 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch count: %d, expected %zu bytes, hs_write_event() called with as few as %zu bytes", fEventName.c_str(), fCountWriteUndersize, fNumBytes, fWriteMinSize);
496 }
497
498 if (fCountWriteOversize > 1) {
499 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch count: %d, expected %zu bytes, hs_write_event() called with as many as %zu bytes", fEventName.c_str(), fCountWriteOversize, fNumBytes, fWriteMaxSize);
500 }
501};
502
504{
505 // schema list "data" is sorted by decreasing "fTimeFrom", newest schema first
506
507 //printf("add: %s..%s %s\n", TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), s->fEventName.c_str());
508
509 bool added = false;
510
511 for (auto it = fData.begin(); it != fData.end(); it++) {
512 if (event_name_cmp((*it)->fEventName, s->fEventName.c_str())==0) {
513 if (s->fTimeFrom == (*it)->fTimeFrom) {
514 // duplicate schema, keep the last one added (for file schema it is the newer file)
515 s->fTimeTo = (*it)->fTimeTo;
516 delete (*it);
517 (*it) = s;
518 return;
519 }
520 }
521
522 if (s->fTimeFrom > (*it)->fTimeFrom) {
523 fData.insert(it, s);
524 added = true;
525 break;
526 }
527 }
528
529 if (!added) {
530 fData.push_back(s);
531 }
532
533 //time_t oldest_time_from = fData.back()->fTimeFrom;
534
535 time_t time_to = 0;
536
537 for (auto it = fData.begin(); it != fData.end(); it++) {
538 if (event_name_cmp((*it)->fEventName, s->fEventName.c_str())==0) {
539 (*it)->fTimeTo = time_to;
540 time_to = (*it)->fTimeFrom;
541
542 //printf("vvv: %s..%s %s\n", TimeToString((*it)->fTimeFrom-oldest_time_from).c_str(), TimeToString((*it)->fTimeTo-oldest_time_from).c_str(), (*it)->fEventName.c_str());
543 }
544 }
545}
546
547HsSchema* HsSchemaVector::find_event(const char* event_name, time_t t, int debug)
548{
549 HsSchema* ss = NULL;
550
551 if (debug) {
552 printf("find_event: All schema for event %s: (total %zu)\n", event_name, fData.size());
553 int found = 0;
554 for (size_t i=0; i<fData.size(); i++) {
555 HsSchema* s = fData[i];
556 printf("find_event: schema %zu name [%s]\n", i, s->fEventName.c_str());
557 if (event_name)
558 if (event_name_cmp(s->fEventName, event_name)!=0)
559 continue;
560 s->print();
561 found++;
562 }
563 printf("find_event: Found %d schemas for event %s\n", found, event_name);
564
565 //if (found == 0)
566 // abort();
567 }
568
569 for (size_t i=0; i<fData.size(); i++) {
570 HsSchema* s = fData[i];
571
572 // wrong event
573 if (event_name)
574 if (event_name_cmp(s->fEventName, event_name)!=0)
575 continue;
576
577 // schema is from after the time we are looking for
578 if (s->fTimeFrom > t)
579 continue;
580
581 if (!ss)
582 ss = s;
583
584 // remember the newest schema
585 if (s->fTimeFrom > ss->fTimeFrom)
586 ss = s;
587 }
588
589 // try to find
590 for (size_t i=0; i<fData.size(); i++) {
591 HsSchema* s = fData[i];
592
593 // wrong event
594 if (event_name)
595 if (event_name_cmp(s->fEventName, event_name)!=0)
596 continue;
597
598 // schema is from after the time we are looking for
599 if (s->fTimeFrom > t)
600 continue;
601
602 if (!ss)
603 ss = s;
604
605 // remember the newest schema
606 if (s->fTimeFrom > ss->fTimeFrom)
607 ss = s;
608 }
609
610 if (debug) {
611 if (ss) {
612 printf("find_event: for time %s, returning:\n", TimeToString(t).c_str());
613 ss->print();
614 } else {
615 printf("find_event: for time %s, nothing found:\n", TimeToString(t).c_str());
616 }
617 }
618
619 return ss;
620}
621
623// Sql interface class //
625
626class SqlBase
627{
628public:
629 int fDebug = 0;
630 bool fIsConnected = false;
632
633 SqlBase() { // ctor
634 };
635
636 virtual ~SqlBase() { // dtor
637 // confirm that the destructor of the concrete class
638 // disconnected the database
639 assert(!fIsConnected);
640 fDebug = 0;
641 fIsConnected = false;
642 }
643
644 virtual int Connect(const char* path) = 0;
645 virtual int Disconnect() = 0;
646 virtual bool IsConnected() = 0;
647
648 virtual int ListTables(std::vector<std::string> *plist) = 0;
649 virtual int ListColumns(const char* table_name, std::vector<std::string> *plist) = 0;
650
651 // sql commands
652 virtual int Exec(const char* table_name, const char* sql) = 0;
653 virtual int ExecDisconnected(const char* table_name, const char* sql) = 0;
654
655 // queries
656 virtual int Prepare(const char* table_name, const char* sql) = 0;
657 virtual int Step() = 0;
658 virtual const char* GetText(int column) = 0;
659 virtual time_t GetTime(int column) = 0;
660 virtual double GetDouble(int column) = 0;
661 virtual int Finalize() = 0;
662
663 // transactions
664 virtual int OpenTransaction(const char* table_name) = 0;
665 virtual int CommitTransaction(const char* table_name) = 0;
666 virtual int RollbackTransaction(const char* table_name) = 0;
667
668 // data types
669 virtual const char* ColumnType(int midas_tid) = 0;
670 virtual bool TypesCompatible(int midas_tid, const char* sql_type) = 0;
671
672 // string quoting
673 virtual std::string QuoteString(const char* s) = 0; // quote text string
674 virtual std::string QuoteId(const char* s) = 0; // quote identifier, such as table or column name
675};
676
678// Schema concrete classes //
680
681class HsSqlSchema : public HsSchema
682{
683public:
684
686 std::string fTableName;
687 std::vector<std::string> fColumnNames;
688 std::vector<std::string> fColumnTypes;
689 std::vector<bool> fColumnInactive;
690
691public:
692
693 HsSqlSchema() // ctor
694 {
695 // empty
696 }
697
698 ~HsSqlSchema() // dtor
699 {
700 assert(get_transaction_count() == 0);
701 }
702
704 void print(bool print_tags = true) const;
708 int close_transaction();
710 int close() { return close_transaction(); }
711 int write_event(const time_t t, const char* data, const size_t data_size);
712 int match_event_var(const char* event_name, const char* var_name, const int var_index);
713 int read_last_written(const time_t timestamp,
714 const int debug,
715 time_t* last_written);
716 int read_data(const time_t start_time,
717 const time_t end_time,
718 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
719 const int debug,
720 std::vector<time_t>& last_time,
722
723private:
724 // Sqlite uses a transaction per table; MySQL uses a single transaction for all tables.
725 // But to support future "single transaction" DBs more easily (e.g. if user wants to
726 // log to both Postgres and MySQL in future), we keep track of the transaction count
727 // per SQL engine.
729 static std::map<SqlBase*, int> gfTransactionCount;
730};
731
732std::map<SqlBase*, int> HsSqlSchema::gfTransactionCount;
733
734class HsFileSchema : public HsSchema
735{
736public:
737
738 std::string fFileName;
739 size_t fRecordSize = 0;
741 size_t fLastSize = 0;
742 int fWriterFd = -1;
745
746public:
747 off64_t fFileSizeInitial = 0; // initial file size
748 off64_t fFileSize = 0; // file size including any new data we wrote
749
750public:
751
752 HsFileSchema() // ctor
753 {
754 // empty
755 }
756
758 {
759 //printf("HsFileSchema::dtor %p!\n", this);
760 close();
761 fRecordSize = 0;
762 fDataOffset = 0;
763 fLastSize = 0;
764 fWriterFd = -1;
765 if (fRecordBuffer) {
766 free(fRecordBuffer);
768 }
770 }
771
772 void remove_inactive_columns() { /* empty */ };
773 void print(bool print_tags = true) const;
774 int flush_buffers() { return HS_SUCCESS; };
775 int close();
776 int write_event(const time_t t, const char* data, const size_t data_size);
777 int read_last_written(const time_t timestamp,
778 const int debug,
779 time_t* last_written);
780 int read_data(const time_t start_time,
781 const time_t end_time,
782 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
783 const int debug,
784 std::vector<time_t>& last_time,
786};
787
789// Print functions //
791
793{
794 size_t nv = this->fVariables.size();
795 printf("event [%s], time %s..%s, %zu variables, %zu bytes\n", this->fEventName.c_str(), TimeToString(this->fTimeFrom).c_str(), TimeToString(this->fTimeTo).c_str(), nv, fNumBytes);
796 if (print_tags)
797 for (size_t j=0; j<nv; j++)
798 printf(" %zu: name [%s], type [%s] tid %d, n_data %d, n_bytes %d, offset %d\n", j, this->fVariables[j].name.c_str(), rpc_tid_name(this->fVariables[j].type), this->fVariables[j].type, this->fVariables[j].n_data, this->fVariables[j].n_bytes, this->fOffsets[j]);
799};
800
802{
803 size_t nv = this->fVariables.size();
804 printf("event [%s], sql_table [%s], time %s..%s, %zu variables, %zu bytes\n", this->fEventName.c_str(), this->fTableName.c_str(), TimeToString(this->fTimeFrom).c_str(), TimeToString(this->fTimeTo).c_str(), nv, fNumBytes);
805 if (print_tags) {
806 for (size_t j=0; j<nv; j++) {
807 printf(" %zu: name [%s], type [%s] tid %d, n_data %d, n_bytes %d", j, this->fVariables[j].name.c_str(), rpc_tid_name(this->fVariables[j].type), this->fVariables[j].type, this->fVariables[j].n_data, this->fVariables[j].n_bytes);
808 printf(", sql_column [%s], sql_type [%s], offset %d", this->fColumnNames[j].c_str(), this->fColumnTypes[j].c_str(), this->fOffsets[j]);
809 printf(", inactive %d", (int)this->fColumnInactive[j]);
810 printf("\n");
811 }
812 }
813}
814
816{
817 size_t nv = this->fVariables.size();
818 printf("event [%s], file_name [%s], time %s..%s, %zu variables, %zu bytes, dat_offset %jd, record_size %zu\n", this->fEventName.c_str(), this->fFileName.c_str(), TimeToString(this->fTimeFrom).c_str(), TimeToString(this->fTimeTo).c_str(), nv, fNumBytes, (intmax_t)fDataOffset, fRecordSize);
819 if (print_tags) {
820 for (size_t j=0; j<nv; j++)
821 printf(" %zu: name [%s], type [%s] tid %d, n_data %d, n_bytes %d, offset %d\n", j, this->fVariables[j].name.c_str(), rpc_tid_name(this->fVariables[j].type), this->fVariables[j].type, this->fVariables[j].n_data, this->fVariables[j].n_bytes, this->fOffsets[j]);
822 }
823}
824
826// File functions //
828
829#ifdef HAVE_MYSQL
830
832// MYSQL/MariaDB database access //
834
835//#warning !!!HAVE_MYSQL!!!
836
837//#include <my_global.h> // my_global.h removed MySQL 8.0, MariaDB 10.2. K.O.
838#include <mysql.h>
839
840class Mysql: public SqlBase
841{
842public:
843 std::string fConnectString;
844 MYSQL* fMysql = NULL;
845
846 // query results
849 int fNumFields = 0;
850
851 // disconnected operation
852 size_t fMaxDisconnected = 0;
853 std::list<std::string> fDisconnectedBuffer;
856 int fDisconnectedLost = 0;
857
858 Mysql(); // ctor
859 ~Mysql(); // dtor
860
861 int Connect(const char* path);
862 int Disconnect();
863 bool IsConnected();
864
865 int ConnectTable(const char* table_name);
866
867 int ListTables(std::vector<std::string> *plist);
868 int ListColumns(const char* table_name, std::vector<std::string> *plist);
869
870 int Exec(const char* table_name, const char* sql);
871 int ExecDisconnected(const char* table_name, const char* sql);
872
873 int Prepare(const char* table_name, const char* sql);
874 int Step();
875 const char* GetText(int column);
876 time_t GetTime(int column);
877 double GetDouble(int column);
878 int Finalize();
879
880 int OpenTransaction(const char* table_name);
881 int CommitTransaction(const char* table_name);
882 int RollbackTransaction(const char* table_name);
883
884 const char* ColumnType(int midas_tid);
885 bool TypesCompatible(int midas_tid, const char* sql_type);
886
887 std::string QuoteId(const char* s);
888 std::string QuoteString(const char* s);
889};
890
891Mysql::Mysql() // ctor
892{
893 fMysql = NULL;
894 fResult = NULL;
895 fRow = NULL;
896 fNumFields = 0;
897 fMaxDisconnected = 1000;
898 fNextReconnect = 0;
901 fTransactionPerTable = false;
902}
903
904Mysql::~Mysql() // dtor
905{
906 Disconnect();
907 fMysql = NULL;
908 fResult = NULL;
909 fRow = NULL;
910 fNumFields = 0;
911 if (fDisconnectedBuffer.size() > 0) {
912 cm_msg(MINFO, "Mysql::~Mysql", "Lost %zu history entries accumulated while disconnected from the database", fDisconnectedBuffer.size());
914 }
915}
916
917int Mysql::Connect(const char* connect_string)
918{
919 if (fIsConnected)
920 Disconnect();
921
922 fConnectString = connect_string;
923
924 if (fDebug) {
925 cm_msg(MINFO, "Mysql::Connect", "Connecting to Mysql database specified by \'%s\'", connect_string);
927 }
928
929 std::string host_name;
930 std::string user_name;
931 std::string user_password;
932 std::string db_name;
933 int tcp_port = 0;
934 std::string unix_socket;
935 std::string buffer;
936
937 FILE* fp = fopen(connect_string, "r");
938 if (!fp) {
939 cm_msg(MERROR, "Mysql::Connect", "Cannot read MYSQL connection parameters from \'%s\', fopen() error %d (%s)", connect_string, errno, strerror(errno));
940 return DB_FILE_ERROR;
941 }
942
943 while (1) {
944 char buf[256];
945 char* s = fgets(buf, sizeof(buf), fp);
946 if (!s)
947 break; // EOF
948
949 char*ss;
950 // kill trailing \n and \r
951 ss = strchr(s, '\n');
952 if (ss) *ss = 0;
953 ss = strchr(s, '\r');
954 if (ss) *ss = 0;
955
956 //printf("line [%s]\n", s);
957
958 if (strncasecmp(s, "server=", 7)==0)
959 host_name = skip_spaces(s + 7);
960 if (strncasecmp(s, "port=", 5)==0)
961 tcp_port = atoi(skip_spaces(s + 5));
962 if (strncasecmp(s, "database=", 9)==0)
963 db_name = skip_spaces(s + 9);
964 if (strncasecmp(s, "socket=", 7)==0)
965 unix_socket = skip_spaces(s + 7);
966 if (strncasecmp(s, "user=", 5)==0)
967 user_name = skip_spaces(s + 5);
968 if (strncasecmp(s, "password=", 9)==0)
969 user_password = skip_spaces(s + 9);
970 if (strncasecmp(s, "buffer=", 7)==0)
971 buffer = skip_spaces(s + 7);
972 }
973
974 fclose(fp);
975
976 int buffer_int = atoi(buffer.c_str());
977
978 if (buffer_int > 0 && buffer_int < 1000000)
980
981 if (fDebug)
982 printf("Mysql::Connect: connecting to server [%s] port %d, unix socket [%s], database [%s], user [%s], password [%s], buffer [%zu]\n", host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(), user_password.c_str(), fMaxDisconnected);
983
984 if (!fMysql) {
986 if (!fMysql) {
987 return DB_FILE_ERROR;
988 }
989 }
990
992
993 if (mysql_real_connect(fMysql, host_name.c_str(), user_name.c_str(), user_password.c_str(), db_name.c_str(), tcp_port, unix_socket.c_str(), client_flag) == NULL) {
994 cm_msg(MERROR, "Mysql::Connect", "mysql_real_connect() to host [%s], port %d, unix socket [%s], database [%s], user [%s], password [%s]: error %d (%s)", host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", mysql_errno(fMysql), mysql_error(fMysql));
995 Disconnect();
996 return DB_FILE_ERROR;
997 }
998
999 int status;
1000
1001 // FIXME:
1002 //my_bool reconnect = 0;
1003 //mysql_options(&mysql, MYSQL_OPT_RECONNECT, &reconnect);
1004
1005 status = Exec("(notable)", "SET SESSION sql_mode='ANSI'");
1006 if (status != DB_SUCCESS) {
1007 cm_msg(MERROR, "Mysql::Connect", "Cannot set ANSI mode, nothing will work");
1008 Disconnect();
1009 return DB_FILE_ERROR;
1010 }
1011
1012 if (fDebug) {
1013 cm_msg(MINFO, "Mysql::Connect", "Connected to a MySQL database on host [%s], port %d, unix socket [%s], database [%s], user [%s], password [%s], buffer %zu", host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", fMaxDisconnected);
1015 }
1016
1017 fIsConnected = true;
1018
1019 int count = 0;
1020 while (fDisconnectedBuffer.size() > 0) {
1021 status = Exec("(flush)", fDisconnectedBuffer.front().c_str());
1022 if (status != DB_SUCCESS) {
1023 return status;
1024 }
1025 fDisconnectedBuffer.pop_front();
1026 count++;
1027 }
1028
1029 if (count > 0) {
1030 cm_msg(MINFO, "Mysql::Connect", "Saved %d, lost %d history events accumulated while disconnected from the database", count, fDisconnectedLost);
1032 }
1033
1034 assert(fDisconnectedBuffer.size() == 0);
1036
1037 return DB_SUCCESS;
1038}
1039
1040int Mysql::Disconnect()
1041{
1042 if (fRow) {
1043 // FIXME: mysql_free_result(fResult);
1044 }
1045
1046 if (fResult)
1048
1049 if (fMysql)
1051
1052 fMysql = NULL;
1053 fResult = NULL;
1054 fRow = NULL;
1055
1056 fIsConnected = false;
1057 return DB_SUCCESS;
1058}
1059
1060bool Mysql::IsConnected()
1061{
1062 return fIsConnected;
1063}
1064
1065int Mysql::OpenTransaction(const char* table_name)
1066{
1067 return Exec(table_name, "START TRANSACTION");
1068 return DB_SUCCESS;
1069}
1070
1071int Mysql::CommitTransaction(const char* table_name)
1072{
1073 Exec(table_name, "COMMIT");
1074 return DB_SUCCESS;
1075}
1076
1077int Mysql::RollbackTransaction(const char* table_name)
1078{
1079 Exec(table_name, "ROLLBACK");
1080 return DB_SUCCESS;
1081}
1082
1083int Mysql::ListTables(std::vector<std::string> *plist)
1084{
1085 if (!fIsConnected)
1086 return DB_FILE_ERROR;
1087
1088 if (fDebug)
1089 printf("Mysql::ListTables!\n");
1090
1091 int status;
1092
1094
1095 if (fResult == NULL) {
1096 cm_msg(MERROR, "Mysql::ListTables", "mysql_list_tables() error %d (%s)", mysql_errno(fMysql), mysql_error(fMysql));
1097 return DB_FILE_ERROR;
1098 }
1099
1101
1102 while (1) {
1103 status = Step();
1104 if (status != DB_SUCCESS)
1105 break;
1106 std::string tn = GetText(0);
1107 plist->push_back(tn);
1108 };
1109
1110 status = Finalize();
1111
1112 return DB_SUCCESS;
1113}
1114
1115int Mysql::ListColumns(const char* table_name, std::vector<std::string> *plist)
1116{
1117 if (!fIsConnected)
1118 return DB_FILE_ERROR;
1119
1120 if (fDebug)
1121 printf("Mysql::ListColumns for table \'%s\'\n", table_name);
1122
1123 int status;
1124
1125 std::string cmd;
1126 cmd += "SHOW COLUMNS FROM ";
1127 cmd += QuoteId(table_name);
1128 cmd += ";";
1129
1130 status = Prepare(table_name, cmd.c_str());
1131 if (status != DB_SUCCESS)
1132 return status;
1133
1135
1136 while (1) {
1137 status = Step();
1138 if (status != DB_SUCCESS)
1139 break;
1140 std::string cn = GetText(0);
1141 std::string ct = GetText(1);
1142 plist->push_back(cn);
1143 plist->push_back(ct);
1144 //printf("cn [%s]\n", cn.c_str());
1145 //for (int i=0; i<fNumFields; i++)
1146 //printf(" field[%d]: [%s]\n", i, GetText(i));
1147 };
1148
1149 status = Finalize();
1150
1151 return DB_SUCCESS;
1152}
1153
1154int Mysql::Exec(const char* table_name, const char* sql)
1155{
1156 if (fDebug)
1157 printf("Mysql::Exec(%s, %s)\n", table_name, sql);
1158
1159 // FIXME: match Sqlite::Exec() return values:
1160 // return values:
1161 // DB_SUCCESS
1162 // DB_FILE_ERROR: not connected
1163 // DB_KEY_EXIST: "table already exists"
1164
1165 if (!fMysql)
1166 return DB_FILE_ERROR;
1167
1168 assert(fMysql);
1169 assert(fResult == NULL); // there should be no unfinalized queries
1170 assert(fRow == NULL);
1171
1172 if (mysql_query(fMysql, sql)) {
1173 if (mysql_errno(fMysql) == 1050) { // "Table already exists"
1174 return DB_KEY_EXIST;
1175 }
1176 if (mysql_errno(fMysql) == 1146) { // "Table does not exist"
1177 return DB_FILE_ERROR;
1178 }
1179 cm_msg(MERROR, "Mysql::Exec", "mysql_query(%s) error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1180 if (mysql_errno(fMysql) == 1060) // "Duplicate column name"
1181 return DB_KEY_EXIST;
1182 if (mysql_errno(fMysql) == 2006) { // "MySQL server has gone away"
1183 Disconnect();
1184 return ExecDisconnected(table_name, sql);
1185 }
1186 return DB_FILE_ERROR;
1187 }
1188
1189 return DB_SUCCESS;
1190}
1191
// Handle an SQL statement while the MySQL connection is down.
// The visible code appends `sql` to fDisconnectedBuffer, reports an error once the buffer
// holds fMaxDisconnected entries, and calls Connect(fConnectString) when fNextReconnect is
// zero or has been reached. Always returns DB_SUCCESS.
// NOTE(review): several statements of this function are not visible in this listing
// (e.g. the condition opening the block that is closed by "} else {" below, and the
// bodies of the reconnect-delay branches) - confirm against the complete source.
1192int Mysql::ExecDisconnected(const char* table_name, const char* sql)
1193{
1194 if (fDebug)
1195 printf("Mysql::ExecDisconnected(%s, %s)\n", table_name, sql);
1196
// NOTE(review): the opening "if (...) {" for this block is not visible here.
1198 fDisconnectedBuffer.push_back(sql);
1199 if (fDisconnectedBuffer.size() >= fMaxDisconnected) {
1200 cm_msg(MERROR, "Mysql::ExecDisconnected", "Error: Disconnected database buffer overflow, size %zu, subsequent events are lost", fDisconnectedBuffer.size());
1201 }
1202 } else {
// NOTE(review): body of this else branch is not visible here.
1204 }
1205
1206 time_t now = time(NULL);
1207
// try to reconnect immediately (fNextReconnect == 0) or when the scheduled time has come
1208 if (fNextReconnect == 0 || now >= fNextReconnect) {
1209 int status = Connect(fConnectString.c_str());
1210 if (status == DB_SUCCESS) {
1211 fNextReconnect = 0;
1213 } else {
// NOTE(review): the statements that adjust fNextReconnectDelaySec are not visible here.
1214 if (fNextReconnectDelaySec == 0) {
1216 } else if (fNextReconnectDelaySec < 10*60) {
1218 }
1219 if (fDebug) {
1220 cm_msg(MINFO, "Mysql::ExecDisconnected", "Next reconnect attempt in %d sec, history events buffered %zu, lost %d", fNextReconnectDelaySec, fDisconnectedBuffer.size(), fDisconnectedLost);
1222 }
1224 }
1225 }
1226
1227 return DB_SUCCESS;
1228}
1229
// Run a query whose rows will be read with Step()/GetText() and released with Finalize().
// Returns DB_FILE_ERROR when not connected, when the query fails, or when no result set is
// available; DB_SUCCESS otherwise. No other query may be pending (fResult and fRow must be NULL).
// On MySQL errors 2006/2013 the visible code reconnects through Connect(fConnectString).
// NOTE(review): the retry of the query after a successful reconnect, the statement that
// assigns fResult and the statement that sets fNumFields are not visible in this listing.
1230int Mysql::Prepare(const char* table_name, const char* sql)
1231{
1232 if (fDebug)
1233 printf("Mysql::Prepare(%s, %s)\n", table_name, sql);
1234
1235 if (!fMysql)
1236 return DB_FILE_ERROR;
1237
1238 assert(fMysql);
1239 assert(fResult == NULL); // there should be no unfinalized queries
1240 assert(fRow == NULL);
1241
1242 // if (mysql_query(fMysql, sql)) {
1243 // cm_msg(MERROR, "Mysql::Prepare", "mysql_query(%s) error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1244 // return DB_FILE_ERROR;
1245 //}
1246
1247 // Check if the connection to MySQL timed out; fix from B. Smith
1248 int status = mysql_query(fMysql, sql);
1249 if (status) {
1250 if (mysql_errno(fMysql) == 2006 || mysql_errno(fMysql) == 2013) {
1251 // "MySQL server has gone away" or "Lost connection to MySQL server during query"
1252 status = Connect(fConnectString.c_str());
1253 if (status == DB_SUCCESS) {
1254 // Retry after reconnecting
// NOTE(review): the retry statement itself is not visible here.
1256 } else {
1257 cm_msg(MERROR, "Mysql::Prepare", "mysql_query(%s) - MySQL server has gone away, and couldn't reconnect - %d", sql, status);
1258 return DB_FILE_ERROR;
1259 }
1260 }
// still failing: either the error was not a lost connection, or the value assigned after reconnecting is non-zero
1261 if (status) {
1262 cm_msg(MERROR, "Mysql::Prepare", "mysql_query(%s) error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1263 return DB_FILE_ERROR;
1264 }
1265 cm_msg(MINFO, "Mysql::Prepare", "Reconnected to MySQL after long inactivity.");
1266 }
1267
// NOTE(review): the statement assigning fResult is not visible here (the error text below names mysql_store_result).
1269 //fResult = mysql_use_result(fMysql); // cannot use this because it blocks writing into table
1270
1271 if (!fResult) {
1272 cm_msg(MERROR, "Mysql::Prepare", "mysql_store_result(%s) returned NULL, error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1273 return DB_FILE_ERROR;
1274 }
1275
1277
1278 //printf("num fields %d\n", fNumFields);
1279
1280 return DB_SUCCESS;
1281}
1282
// Advance to the next row of the pending query.
// Returns DB_SUCCESS when fRow is set, DB_NO_MORE_SUBKEYS when fRow is NULL and
// mysql_errno() reports no error, DB_FILE_ERROR (after logging) otherwise.
// NOTE(review): the statement that updates fRow is not visible in this listing
// (the error text names mysql_fetch_row()).
1283int Mysql::Step()
1284{
1285 if (/* DISABLES CODE */ (0) && fDebug)
1286 printf("Mysql::Step()\n");
1287
1288 assert(fMysql);
1289 assert(fResult);
1290
1292
1293 if (fRow)
1294 return DB_SUCCESS;
1295
// no row and no error: end of the result set
1296 if (mysql_errno(fMysql) == 0)
1297 return DB_NO_MORE_SUBKEYS;
1298
1299 cm_msg(MERROR, "Mysql::Step", "mysql_fetch_row() error %d (%s)", mysql_errno(fMysql), mysql_error(fMysql));
1300
1301 return DB_FILE_ERROR;
1302}
1303
1304const char* Mysql::GetText(int column)
1305{
1306 assert(fMysql);
1307 assert(fResult);
1308 assert(fRow);
1309 assert(fNumFields > 0);
1310 assert(column >= 0);
1311 assert(column < fNumFields);
1312 if (fRow[column] == NULL)
1313 return "";
1314 return fRow[column];
1315}
1316
1317double Mysql::GetDouble(int column)
1318{
1319 return atof(GetText(column));
1320}
1321
1322time_t Mysql::GetTime(int column)
1323{
1324 return strtoul(GetText(column), NULL, 0);
1325}
1326
// Finish the pending query: resets fResult, fRow and fNumFields so that the next
// Exec()/Prepare() passes its "no unfinalized queries" assertions. Returns DB_SUCCESS.
// NOTE(review): one statement before "fResult = NULL" is not visible in this listing -
// presumably the call that releases the result set; confirm against the complete source.
1327int Mysql::Finalize()
1328{
1329 assert(fMysql);
1330 assert(fResult);
1331
1333 fResult = NULL;
1334 fRow = NULL;
1335 fNumFields = 0;
1336
1337 return DB_SUCCESS;
1338}
1339
1340const char* Mysql::ColumnType(int midas_tid)
1341{
1342 assert(midas_tid>=0);
1343 assert(midas_tid<TID_LAST);
1344 return sql_type_mysql[midas_tid];
1345}
1346
1347bool Mysql::TypesCompatible(int midas_tid, const char* sql_type)
1348{
1349 if (/* DISABLES CODE */ (0))
1350 printf("compare types midas \'%s\'=\'%s\' and sql \'%s\'\n", rpc_tid_name(midas_tid), ColumnType(midas_tid), sql_type);
1351
1352 //if (sql2midasType_mysql(sql_type) == midas_tid)
1353 // return true;
1354
1355 if (strcasecmp(ColumnType(midas_tid), sql_type) == 0)
1356 return true;
1357
1358 // permit writing FLOAT into DOUBLE
1359 if (midas_tid==TID_FLOAT && strcmp(sql_type, "double")==0)
1360 return true;
1361
1362 // T2K quirk!
1363 // permit writing BYTE into signed tinyint
1364 if (midas_tid==TID_BYTE && strcmp(sql_type, "tinyint")==0)
1365 return true;
1366
1367 // T2K quirk!
1368 // permit writing WORD into signed tinyint
1369 if (midas_tid==TID_WORD && strcmp(sql_type, "tinyint")==0)
1370 return true;
1371
1372 // mysql quirk!
1373 //if (midas_tid==TID_DWORD && strcmp(sql_type, "int(10) unsigned")==0)
1374 // return true;
1375
1376 if (/* DISABLES CODE */ (0))
1377 printf("type mismatch!\n");
1378
1379 return false;
1380}
1381
1382std::string Mysql::QuoteId(const char* s)
1383{
1384 std::string q;
1385 q += "`";
1386 q += s;
1387 q += "`";
1388 return q;
1389}
1390
1391std::string Mysql::QuoteString(const char* s)
1392{
1393 std::string q;
1394 q += "\'";
1395 q += s;
1396#if 0
1397 while (int c = *s++) {
1398 if (c == '\'') {
1399 q += "\\'";
1400 } if (c == '"') {
1401 q += "\\\"";
1402 } else if (isprint(c)) {
1403 q += c;
1404 } else {
1405 char buf[256];
1406 sprintf(buf, "\\\\x%02x", c&0xFF);
1407 q += buf;
1408 }
1409 }
1410#endif
1411 q += "\'";
1412 return q;
1413}
1414
1415#endif // HAVE_MYSQL
1416
1417#ifdef HAVE_PGSQL
1418
1420// PostgreSQL database access //
1422
1423//#warning !!!HAVE_PGSQL!!!
1424
1425#include <libpq-fe.h>
1426
// PostgreSQL implementation of the SqlBase interface.
// NOTE(review): some member declarations are not visible in this listing - the methods
// below in this file use fResult and fNextReconnect, which are not declared in the
// lines shown here; confirm against the complete source.
1427class Pgsql: public SqlBase
1428{
1429public:
// connection settings and handle
1430 std::string fConnectString;
1431 int fDownsample = 0;
1432 PGconn* fPgsql = NULL;
1433
1434 // query results
1436 int fNumFields = 0;
1437 int fRow = 0;
1438
1439 // disconnected operation
1440 size_t fMaxDisconnected = 0;
1441 std::list<std::string> fDisconnectedBuffer;
1443 int fNextReconnectDelaySec = 0;
1444 int fDisconnectedLost = 0;
1445
1446 Pgsql(); // ctor
1447 ~Pgsql(); // dtor
1448
1449 int Connect(const char* path);
1450 int Disconnect();
1451 bool IsConnected();
1452
1453 int ConnectTable(const char* table_name);
1454
1455 int ListTables(std::vector<std::string> *plist);
1456 int ListColumns(const char* table_name, std::vector<std::string> *plist);
1457
1458 int Exec(const char* table_name, const char* sql);
1459 int ExecDisconnected(const char* table_name, const char* sql);
1460
// Prepare() runs a query, Step() moves to the next row, Get*() read the current row, Finalize() ends the query
1461 int Prepare(const char* table_name, const char* sql);
1462 std::string BuildDownsampleQuery(const time_t start_time, const time_t end_time, const int npoints, const char* table_name, const char* column_name);
1463 int Step();
1464 const char* GetText(int column);
1465 time_t GetTime(int column);
1466 double GetDouble(int column);
1467 int Finalize();
1468
1469 int OpenTransaction(const char* table_name);
1470 int CommitTransaction(const char* table_name);
1471 int RollbackTransaction(const char* table_name);
1472
1473 const char* ColumnType(int midas_tid);
1474 bool TypesCompatible(int midas_tid, const char* sql_type);
1475
1476 std::string QuoteId(const char* s);
1477 std::string QuoteString(const char* s);
1478};
1479
// Constructor: starts disconnected, with no pending query (fRow == -1) and room for
// 1000 statements in the disconnected-operation buffer.
// NOTE(review): two statements between "fNextReconnect = 0" and "fTransactionPerTable = false"
// are not visible in this listing.
1480Pgsql::Pgsql() // ctor
1481{
1482 fPgsql = NULL;
1483 fDownsample = 0;
1484 fResult = NULL;
1485 fRow = -1;
1486 fNumFields = 0;
1487 fMaxDisconnected = 1000;
1488 fNextReconnect = 0;
1491 fTransactionPerTable = false;
1492}
1493
// Destructor: disconnects, resets the query state and reports statements that were still
// buffered for a database that never came back.
// NOTE(review): the statement controlled by "if(fResult)" and one statement after the
// cm_msg() call are not visible in this listing; as shown, the "if" would appear to guard
// "fRow = -1", which is most likely not what the complete source does.
1494Pgsql::~Pgsql() // dtor
1495{
1496 Disconnect();
1497 if(fResult)
1499 fRow = -1;
1500 fNumFields = 0;
1501 if (fDisconnectedBuffer.size() > 0) {
1502 cm_msg(MINFO, "Pgsql::~Pgsql", "Lost %zu history entries accumulated while disconnected from the database", fDisconnectedBuffer.size());
1504 }
1505}
1506
// Connect to PostgreSQL using the parameter file named by `connect_string`.
// The file is read line by line; lines starting (case-insensitively) with server=, port=,
// database=, socket=, user=, password= and buffer= are recognised.
// After a successful login every statement held in fDisconnectedBuffer is replayed through
// Exec(); when fDownsample is set, the timescaledb and timescaledb_toolkit extensions must
// be present.
// Returns DB_SUCCESS, DB_FILE_ERROR (file unreadable, login failed, extension missing) or
// the status of the first buffered statement that failed.
// NOTE(review): a few statements are not visible in this listing (after the MINFO messages
// and after the "buffer_int" range check).
1507int Pgsql::Connect(const char* connect_string)
1508{
1509 if (fIsConnected)
1510 Disconnect();
1511
// remembered for the reconnect attempts made by ExecDisconnected() and Prepare()
1512 fConnectString = connect_string;
1513
1514 if (fDebug) {
1515 cm_msg(MINFO, "Pgsql::Connect", "Connecting to PostgreSQL database specified by \'%s\'", connect_string);
1517 }
1518
1519 std::string host_name;
1520 std::string user_name;
1521 std::string user_password;
1522 std::string db_name;
1523 std::string tcp_port;
1524 std::string unix_socket;
1525 std::string buffer;
1526
1527 FILE* fp = fopen(connect_string, "r");
1528 if (!fp) {
1529 cm_msg(MERROR, "Pgsql::Connect", "Cannot read PostgreSQL connection parameters from \'%s\', fopen() error %d (%s)", connect_string, errno, strerror(errno));
1530 return DB_FILE_ERROR;
1531 }
1532
1533 while (1) {
1534 char buf[256];
1535 char* s = fgets(buf, sizeof(buf), fp);
1536 if (!s)
1537 break; // EOF
1538
1539 char*ss;
1540 // kill trailing \n and \r
1541 ss = strchr(s, '\n');
1542 if (ss) *ss = 0;
1543 ss = strchr(s, '\r');
1544 if (ss) *ss = 0;
1545
1546 //printf("line [%s]\n", s);
1547
1548 if (strncasecmp(s, "server=", 7)==0)
1549 host_name = skip_spaces(s + 7);
1550 if (strncasecmp(s, "port=", 5)==0)
1551 tcp_port = skip_spaces(s + 5);
1552 if (strncasecmp(s, "database=", 9)==0)
1553 db_name = skip_spaces(s + 9);
1554 if (strncasecmp(s, "socket=", 7)==0)
1555 unix_socket = skip_spaces(s + 7);
1556 if (strncasecmp(s, "user=", 5)==0)
1557 user_name = skip_spaces(s + 5);
1558 if (strncasecmp(s, "password=", 9)==0)
1559 user_password = skip_spaces(s + 9);
1560 if (strncasecmp(s, "buffer=", 7)==0)
1561 buffer = skip_spaces(s + 7);
1562 }
1563
1564 fclose(fp);
1565
1566 int buffer_int = atoi(buffer.c_str());
1567
// NOTE(review): the statement controlled by this range check is not visible here.
1568 if (buffer_int > 0 && buffer_int < 1000000)
1570
// NOTE(review): this debug printf shows the password in clear text, while the cm_msg() calls below print "xxx" instead.
1571 if (fDebug)
1572 printf("Pgsql::Connect: connecting to server [%s] port %s, unix socket [%s], database [%s], user [%s], password [%s], buffer [%zu]\n", host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(), user_password.c_str(), fMaxDisconnected);
1573
// unix_socket is parsed and logged, but it is not among the arguments passed to PQsetdbLogin()
1574 fPgsql = PQsetdbLogin(host_name.c_str(), tcp_port.c_str(), NULL, NULL, db_name.c_str(), user_name.c_str(), user_password.c_str());
1575 if (PQstatus(fPgsql) != CONNECTION_OK) {
1576 std::string msg(PQerrorMessage(fPgsql));
1577 msg.erase(std::remove(msg.begin(), msg.end(), '\n'), msg.end());
1578 cm_msg(MERROR, "Pgsql::Connect", "PQsetdbLogin() to host [%s], port %s, unix socket [%s], database [%s], user [%s], password [%s]: error (%s)", host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", msg.c_str());
1579 Disconnect();
1580 return DB_FILE_ERROR;
1581 }
1582
1583 int status;
1584
1585 if (fDebug) {
1586 cm_msg(MINFO, "Pgsql::Connect", "Connected to a PostgreSQL database on host [%s], port %s, unix socket [%s], database [%s], user [%s], password [%s], buffer %zu", host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", fMaxDisconnected);
1588 }
1589
1590 fIsConnected = true;
1591
// replay the statements buffered while disconnected, oldest first;
// on the first failure the function returns with fIsConnected still set
1592 int count = 0;
1593 while (fDisconnectedBuffer.size() > 0) {
1594 status = Exec("(flush)", fDisconnectedBuffer.front().c_str());
1595 if (status != DB_SUCCESS) {
1596 return status;
1597 }
1598 fDisconnectedBuffer.pop_front();
1599 count++;
1600 }
1601
1602 if (count > 0) {
1603 cm_msg(MINFO, "Pgsql::Connect", "Saved %d, lost %d history events accumulated while disconnected from the database", count, fDisconnectedLost);
1605 }
1606
1607 assert(fDisconnectedBuffer.size() == 0);
1609
// downsampling needs both TimescaleDB extensions; each check is one catalog query
1610 if (fDownsample) {
1611 status = Prepare("pg_extensions", "select extname from pg_extension where extname = 'timescaledb';");
1612
1613 if (status != DB_SUCCESS || PQntuples(fResult) == 0) {
1614 cm_msg(MERROR, "Pgsql::Connect", "TimescaleDB extension not installed");
1615 return DB_FILE_ERROR;
1616 }
1617 Finalize();
1618
1619 status = Prepare("pg_extensions", "select extname from pg_extension where extname = 'timescaledb_toolkit';");
1620
1621 if (status != DB_SUCCESS || PQntuples(fResult) == 0) {
1622 cm_msg(MERROR, "Pgsql::Connect", "TimescaleDB_toolkit extension not installed");
1623 return DB_FILE_ERROR;
1624 }
1625 Finalize();
1626
1627 cm_msg(MINFO, "Pgsql::Connect", "TimescaleDB extensions found - downsampling enabled");
1628 }
1629
1630 return DB_SUCCESS;
1631}
1632
// Drop the connection: clears the connection handle, resets the row cursor and marks the
// object as disconnected. Returns DB_SUCCESS.
// NOTE(review): the statement controlled by "if (fPgsql)" is not visible in this listing -
// presumably the call that closes the connection; confirm against the complete source.
1633int Pgsql::Disconnect()
1634{
1635 if (fPgsql)
1637
1638 fPgsql = NULL;
1639 fRow = -1;
1640
1641 fIsConnected = false;
1642 return DB_SUCCESS;
1643}
1644
1645bool Pgsql::IsConnected()
1646{
1647 return fIsConnected;
1648}
1649
1650int Pgsql::OpenTransaction(const char* table_name)
1651{
1652 return Exec(table_name, "BEGIN TRANSACTION;");
1653}
1654
1655int Pgsql::CommitTransaction(const char* table_name)
1656{
1657 return Exec(table_name, "COMMIT;");
1658}
1659
1660int Pgsql::RollbackTransaction(const char* table_name)
1661{
1662 return Exec(table_name, "ROLLBACK;");
1663}
1664
1665int Pgsql::ListTables(std::vector<std::string> *plist)
1666{
1667 if (!fIsConnected)
1668 return DB_FILE_ERROR;
1669
1670 if (fDebug)
1671 printf("Pgsql::ListTables!\n");
1672
1673 int status = Prepare("pg_tables", "select tablename from pg_tables where schemaname = 'public';");
1674
1675 if (status != DB_SUCCESS) {
1676 cm_msg(MERROR, "Pgsql::ListTables", "error %s (%s)", PQresStatus(PQresultStatus(fResult)), PQresultErrorMessage(fResult));
1677 return DB_FILE_ERROR;
1678 }
1679
1680 while (1) {
1681 if (Step() != DB_SUCCESS)
1682 break;
1683 std::string tn = GetText(0);
1684 plist->push_back(tn);
1685 };
1686
1687 Finalize();
1688
1689 return DB_SUCCESS;
1690}
1691
// Append the columns of table `table_name` to `plist` as consecutive pairs of
// column name and data type, taken from information_schema.columns.
// Returns DB_FILE_ERROR when not connected, the Prepare() status when the query fails,
// DB_SUCCESS otherwise.
// NOTE(review): one statement between the Prepare() status check and the row loop is not
// visible in this listing.
1692int Pgsql::ListColumns(const char* table_name, std::vector<std::string> *plist)
1693{
1694 if (!fIsConnected)
1695 return DB_FILE_ERROR;
1696
1697 if (fDebug)
1698 printf("Pgsql::ListColumns for table \'%s\'\n", table_name);
1699
1700 std::string cmd;
1701 cmd += "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = ";
1702 cmd += QuoteString(table_name);
1703 cmd += ";";
1704
1705 int status = Prepare(table_name, cmd.c_str());
1706 if (status != DB_SUCCESS)
1707 return status;
1708
1710
// each row contributes two entries: name first, then type
1711 while (1) {
1712 if (Step() != DB_SUCCESS)
1713 break;
1714 std::string cn = GetText(0);
1715 std::string ct = GetText(1);
1716 plist->push_back(cn);
1717 plist->push_back(ct);
1718 };
1719
1720 Finalize();
1721
1722 return DB_SUCCESS;
1723}
1724
// Execute an SQL statement that returns no rows to the caller.
// Visible return values: DB_FILE_ERROR when there is no connection handle or on a fatal
// error, DB_KEY_EXIST when the fatal error text contains "already exists", the result of
// ExecDisconnected() after dropping the connection, DB_SUCCESS otherwise.
// No query may be in progress (fRow must be -1).
// NOTE(review): the statements that run the query and define `err`, and the condition
// opening the block that calls Disconnect(), are not visible in this listing.
1725int Pgsql::Exec(const char* table_name, const char* sql)
1726{
1727 if (fDebug)
1728 printf("Pgsql::Exec(%s, %s)\n", table_name, sql);
1729
1730 if (!fPgsql)
1731 return DB_FILE_ERROR;
1732
1733 assert(fPgsql);
1734 assert(fRow == -1);
1735
1738 if(err != PGRES_TUPLES_OK) {
1739 if(err == PGRES_FATAL_ERROR) {
1740 // handle fatal error
1741 if(strstr(PQresultErrorMessage(fResult), "already exists"))
1742 return DB_KEY_EXIST;
1743 else return DB_FILE_ERROR;
1744 }
1745
// NOTE(review): the "if (...) {" that opens this block is not visible here.
1747 Disconnect();
1748 return ExecDisconnected(table_name, sql);
1749 }
1750 }
1751
1752 return DB_SUCCESS;
1753}
1754
// Handle an SQL statement while the PostgreSQL connection is down.
// The visible code appends `sql` to fDisconnectedBuffer, reports an error once the buffer
// holds fMaxDisconnected entries, and calls Connect(fConnectString) when fNextReconnect is
// zero or has been reached. Always returns DB_SUCCESS.
// NOTE(review): several statements of this function are not visible in this listing
// (e.g. the condition opening the block that is closed by "} else {" below, and the
// bodies of the reconnect-delay branches) - confirm against the complete source.
1755int Pgsql::ExecDisconnected(const char* table_name, const char* sql)
1756{
1757 if (fDebug)
1758 printf("Pgsql::ExecDisconnected(%s, %s)\n", table_name, sql);
1759
// NOTE(review): the opening "if (...) {" for this block is not visible here.
1761 fDisconnectedBuffer.push_back(sql);
1762 if (fDisconnectedBuffer.size() >= fMaxDisconnected) {
1763 cm_msg(MERROR, "Pgsql::ExecDisconnected", "Error: Disconnected database buffer overflow, size %zu, subsequent events are lost", fDisconnectedBuffer.size());
1764 }
1765 } else {
// NOTE(review): body of this else branch is not visible here.
1767 }
1768
1769 time_t now = time(NULL);
1770
// try to reconnect immediately (fNextReconnect == 0) or when the scheduled time has come
1771 if (fNextReconnect == 0 || now >= fNextReconnect) {
1772 int status = Connect(fConnectString.c_str());
1773 if (status == DB_SUCCESS) {
1774 fNextReconnect = 0;
1776 } else {
// NOTE(review): the statements that adjust fNextReconnectDelaySec are not visible here.
1777 if (fNextReconnectDelaySec == 0) {
1779 } else if (fNextReconnectDelaySec < 10*60) {
1781 }
1782 if (fDebug) {
1783 cm_msg(MINFO, "Pgsql::ExecDisconnected", "Next reconnect attempt in %d sec, history events buffered %zu, lost %d", fNextReconnectDelaySec, fDisconnectedBuffer.size(), fDisconnectedLost);
1785 }
1787 }
1788 }
1789
1790 return DB_SUCCESS;
1791}
1792
// Run a query whose rows will be read with Step()/GetText() and ended with Finalize().
// Returns DB_FILE_ERROR when there is no connection handle or when a lost connection
// cannot be re-established; DB_SUCCESS otherwise. No query may be in progress (fRow == -1).
// NOTE(review): the statement that runs the query, the retry after reconnecting and one
// statement before the final return are not visible in this listing.
1793int Pgsql::Prepare(const char* table_name, const char* sql)
1794{
1795 if (fDebug)
1796 printf("Pgsql::Prepare(%s, %s)\n", table_name, sql);
1797
1798 if (!fPgsql)
1799 return DB_FILE_ERROR;
1800
1801 assert(fPgsql);
1802 //assert(fResult==NULL);
1803 assert(fRow == -1);
1804
1806 if (PQstatus(fPgsql) == CONNECTION_BAD) {
1807 // lost connection to server
1808 int status = Connect(fConnectString.c_str());
1809 if (status == DB_SUCCESS) {
1810 // Retry after reconnecting
// NOTE(review): the retry statement itself is not visible here.
1812 } else {
1813 cm_msg(MERROR, "Pgsql::Prepare", "PQexec(%s) PostgreSQL server has gone away, and couldn't reconnect - %d", sql, status);
1814 return DB_FILE_ERROR;
1815 }
1816 if (status) {
1817 cm_msg(MERROR, "Pgsql::Prepare", "PQexec(%s) error %s", sql, PQresStatus(PQresultStatus(fResult)));
1818 return DB_FILE_ERROR;
1819 }
1820 cm_msg(MINFO, "Pgsql::Prepare", "Reconnected to PostgreSQL after long inactivity.");
1821 }
1822
1824
1825 return DB_SUCCESS;
1826}
1827
1828std::string Pgsql::BuildDownsampleQuery(const time_t start_time, const time_t end_time, const int npoints,
1829 const char* table_name, const char* column_name)
1830{
1831 std::string cmd;
1832 cmd += "SELECT extract(epoch from time::TIMESTAMPTZ) as _i_time, value ";
1833
1834 cmd += " FROM unnest(( SELECT lttb";
1835 cmd += "(_t_time, ";
1836 cmd += column_name;
1837 cmd += ", ";
1838 cmd += std::to_string(npoints);
1839 cmd += ") ";
1840 cmd += "FROM ";
1841 cmd += QuoteId(table_name);
1842 cmd += " WHERE _t_time BETWEEN ";
1843 cmd += "to_timestamp(";
1844 cmd += TimeToString(start_time);
1845 cmd += ") AND to_timestamp(";
1846 cmd += TimeToString(end_time);
1847 cmd += ") )) ORDER BY time;";
1848
1849 return cmd;
1850}
1851
1852int Pgsql::Step()
1853{
1854 assert(fPgsql);
1855 assert(fResult);
1856
1857 fRow++;
1858
1859 if (fRow == PQntuples(fResult))
1860 return DB_NO_MORE_SUBKEYS;
1861
1862 return DB_SUCCESS;
1863}
1864
1865const char* Pgsql::GetText(int column)
1866{
1867 assert(fPgsql);
1868 assert(fResult);
1869 assert(fNumFields > 0);
1870 assert(column >= 0);
1871 assert(column < fNumFields);
1872
1873 return PQgetvalue(fResult, fRow, column);
1874}
1875
1876double Pgsql::GetDouble(int column)
1877{
1878 return atof(GetText(column));
1879}
1880
1881time_t Pgsql::GetTime(int column)
1882{
1883 return strtoul(GetText(column), NULL, 0);
1884}
1885
1886int Pgsql::Finalize()
1887{
1888 assert(fPgsql);
1889 assert(fResult);
1890
1891 fRow = -1;
1892 fNumFields = 0;
1893
1894 return DB_SUCCESS;
1895}
1896
1897const char* Pgsql::ColumnType(int midas_tid)
1898{
1899 assert(midas_tid>=0);
1900 assert(midas_tid<TID_LAST);
1901 return sql_type_pgsql[midas_tid];
1902}
1903
1904bool Pgsql::TypesCompatible(int midas_tid, const char* sql_type)
1905{
1906 if (/* DISABLES CODE */ (0))
1907 printf("compare types midas \'%s\'=\'%s\' and sql \'%s\'\n", rpc_tid_name(midas_tid), ColumnType(midas_tid), sql_type);
1908
1909 //if (sql2midasType_mysql(sql_type) == midas_tid)
1910 // return true;
1911
1912 if (strcasecmp(ColumnType(midas_tid), sql_type) == 0)
1913 return true;
1914
1915 // permit writing FLOAT into DOUBLE
1916 if (midas_tid==TID_FLOAT && strcmp(sql_type, "double precision")==0)
1917 return true;
1918
1919 // T2K quirk!
1920 // permit writing BYTE into signed tinyint
1921 if (midas_tid==TID_BYTE && strcmp(sql_type, "integer")==0)
1922 return true;
1923
1924 // T2K quirk!
1925 // permit writing WORD into signed tinyint
1926 if (midas_tid==TID_WORD && strcmp(sql_type, "integer")==0)
1927 return true;
1928
1929 if (/* DISABLES CODE */ (0))
1930 printf("type mismatch!\n");
1931
1932 return false;
1933}
1934
1935std::string Pgsql::QuoteId(const char* s)
1936{
1937 std::string q;
1938 q += '"';
1939 q += s;
1940 q += '"';
1941 return q;
1942}
1943
1944std::string Pgsql::QuoteString(const char* s)
1945{
1946 std::string q;
1947 q += '\'';
1948 q += s;
1949 q += '\'';
1950 return q;
1951}
1952
1953#endif // HAVE_PGSQL
1954
1955#ifdef HAVE_SQLITE
1956
1958// SQLITE database access //
1960
1961#include <sqlite3.h>
1962
1963typedef std::map<std::string, sqlite3*> DbMap;
1964
// SQLite implementation of the SqlBase interface.
// fMap holds one sqlite3 handle per table name (see ConnectTable()/GetTable() below).
// NOTE(review): the member declarations under "temporary storage of query data" are not
// visible in this listing - the methods below use fTempDB and fTempStmt; confirm against
// the complete source.
1965class Sqlite: public SqlBase
1966{
1967public:
// directory prefix used to build the per-table file names
1968 std::string fPath;
1969
// open database handles, keyed by table name
1970 DbMap fMap;
1971
1972 // temporary storage of query data
1975
1976 Sqlite(); // ctor
1977 ~Sqlite(); // dtor
1978
1979 int Connect(const char* path);
1980 int Disconnect();
1981 bool IsConnected();
1982
1983 int ConnectTable(const char* table_name);
1984 sqlite3* GetTable(const char* table_name);
1985
1986 int ListTables(std::vector<std::string> *plist);
1987 int ListColumns(const char* table_name, std::vector<std::string> *plist);
1988
1989 int Exec(const char* table_name, const char* sql);
1990 int ExecDisconnected(const char* table_name, const char* sql);
1991
// Prepare() starts a query, Step() moves to the next row, Get*() read the current row, Finalize() ends the query
1992 int Prepare(const char* table_name, const char* sql);
1993 int Step();
1994 const char* GetText(int column);
1995 time_t GetTime(int column);
1996 double GetDouble(int column);
1997 int Finalize();
1998
1999 int OpenTransaction(const char* table_name);
2000 int CommitTransaction(const char* table_name);
2001 int RollbackTransaction(const char* table_name);
2002
2003 const char* ColumnType(int midas_tid);
2004 bool TypesCompatible(int midas_tid, const char* sql_type);
2005
2006 std::string QuoteId(const char* s);
2007 std::string QuoteString(const char* s);
2008};
2009
2010std::string Sqlite::QuoteId(const char* s)
2011{
2012 std::string q;
2013 q += "\"";
2014 q += s;
2015 q += "\"";
2016 return q;
2017}
2018
2019std::string Sqlite::QuoteString(const char* s)
2020{
2021 std::string q;
2022 q += "\'";
2023 q += s;
2024 q += "\'";
2025 return q;
2026}
2027
2028const char* Sqlite::ColumnType(int midas_tid)
2029{
2030 assert(midas_tid>=0);
2031 assert(midas_tid<TID_LAST);
2032 return sql_type_sqlite[midas_tid];
2033}
2034
2035bool Sqlite::TypesCompatible(int midas_tid, const char* sql_type)
2036{
2037 if (0)
2038 printf("compare types midas \'%s\'=\'%s\' and sql \'%s\'\n", rpc_tid_name(midas_tid), ColumnType(midas_tid), sql_type);
2039
2040 //if (sql2midasType_sqlite(sql_type) == midas_tid)
2041 // return true;
2042
2043 if (strcasecmp(ColumnType(midas_tid), sql_type) == 0)
2044 return true;
2045
2046 // permit writing FLOAT into DOUBLE
2047 if (midas_tid==TID_FLOAT && strcasecmp(sql_type, "double")==0)
2048 return true;
2049
2050 return false;
2051}
2052
2053const char* Sqlite::GetText(int column)
2054{
2055 return (const char*)sqlite3_column_text(fTempStmt, column);
2056}
2057
// Value of column `column` in the current row as a time_t.
// NOTE(review): the statement forming the body of this function is not visible in this listing.
2058time_t Sqlite::GetTime(int column)
2059{
2061}
2062
// Value of column `column` in the current row as a double.
// NOTE(review): the statement forming the body of this function is not visible in this listing.
2063double Sqlite::GetDouble(int column)
2064{
2066}
2067
2068Sqlite::Sqlite() // ctor
2069{
2070 fIsConnected = false;
2071 fTempDB = NULL;
2072 fTempStmt = NULL;
2073 fDebug = 0;
2074}
2075
2076Sqlite::~Sqlite() // dtor
2077{
2078 Disconnect();
2079}
2080
2081const char* xsqlite3_errstr(sqlite3* db, int errcode)
2082{
2083 //return sqlite3_errstr(errcode);
2084 return sqlite3_errmsg(db);
2085}
2086
// Open the database file of table `table_name` (fPath + "mh_" + table_name + ".sqlite3"),
// register the handle in fMap and apply the journal/synchronous PRAGMA settings.
// Returns DB_FILE_ERROR when sqlite3_open() fails, DB_SUCCESS otherwise.
// NOTE(review): a few statements are not visible in this listing (after the
// sqlite3_open() error message, the call whose status is checked inside the
// SQLITE_VERSION_NUMBER block, and the definition of max_columns).
2087int Sqlite::ConnectTable(const char* table_name)
2088{
2089 std::string fname = fPath + "mh_" + table_name + ".sqlite3";
2090
2091 sqlite3* db = NULL;
2092
2093 int status = sqlite3_open(fname.c_str(), &db);
2094
2095 if (status != SQLITE_OK) {
2096 cm_msg(MERROR, "Sqlite::Connect", "Table %s: sqlite3_open(%s) error %d (%s)", table_name, fname.c_str(), status, xsqlite3_errstr(db, status));
2098 db = NULL;
2099 return DB_FILE_ERROR;
2100 }
2101
2102#if SQLITE_VERSION_NUMBER >= 3006020
// a failure here is only logged, the connection is still used
2104 if (status != SQLITE_OK) {
2105 cm_msg(MERROR, "Sqlite::Connect", "Table %s: sqlite3_extended_result_codes(1) error %d (%s)", table_name, status, xsqlite3_errstr(db, status));
2106 }
2107#else
2108#warning Missing sqlite3_extended_result_codes()!
2109#endif
2110
// the handle must be in fMap before Exec() is called: Exec() looks it up through GetTable()
2111 fMap[table_name] = db;
2112
2113 Exec(table_name, "PRAGMA journal_mode=persist;");
2114 Exec(table_name, "PRAGMA synchronous=normal;");
2115 //Exec(table_name, "PRAGMA synchronous=off;");
2116 Exec(table_name, "PRAGMA journal_size_limit=-1;");
2117
// disabled: queries of the current PRAGMA values
2118 if (0) {
2119 Exec(table_name, "PRAGMA legacy_file_format;");
2120 Exec(table_name, "PRAGMA synchronous;");
2121 Exec(table_name, "PRAGMA journal_mode;");
2122 Exec(table_name, "PRAGMA journal_size_limit;");
2123 }
2124
2125#ifdef SQLITE_LIMIT_COLUMN
2126 if (0) {
2128 printf("Sqlite::Connect: SQLITE_LIMIT_COLUMN=%d\n", max_columns);
2129 }
2130#endif
2131
2132 if (fDebug)
2133 cm_msg(MINFO, "Sqlite::Connect", "Table %s: connected to Sqlite file \'%s\'", table_name, fname.c_str());
2134
2135 return DB_SUCCESS;
2136}
2137
2138sqlite3* Sqlite::GetTable(const char* table_name)
2139{
2140 sqlite3* db = fMap[table_name];
2141
2142 if (db)
2143 return db;
2144
2145 int status = ConnectTable(table_name);
2146 if (status != DB_SUCCESS)
2147 return NULL;
2148
2149 return fMap[table_name];
2150}
2151
2152int Sqlite::Connect(const char* path)
2153{
2154 if (fIsConnected)
2155 Disconnect();
2156
2157 fPath = path;
2158
2159 // add trailing '/'
2160 if (fPath.length() > 0) {
2161 if (fPath[fPath.length()-1] != DIR_SEPARATOR)
2162 fPath += DIR_SEPARATOR_STR;
2163 }
2164
2165 if (fDebug)
2166 cm_msg(MINFO, "Sqlite::Connect", "Connected to Sqlite database in \'%s\'", fPath.c_str());
2167
2168 fIsConnected = true;
2169
2170 return DB_SUCCESS;
2171}
2172
2173int Sqlite::Disconnect()
2174{
2175 if (!fIsConnected)
2176 return DB_SUCCESS;
2177
2178 for (DbMap::iterator iter = fMap.begin(); iter != fMap.end(); ++iter) {
2179 const char* table_name = iter->first.c_str();
2180 sqlite3* db = iter->second;
2181 int status = sqlite3_close(db);
2182 if (status != SQLITE_OK) {
2183 cm_msg(MERROR, "Sqlite::Disconnect", "sqlite3_close(%s) error %d (%s)", table_name, status, xsqlite3_errstr(db, status));
2184 }
2185 }
2186
2187 fMap.clear();
2188
2189 fIsConnected = false;
2190
2191 return DB_SUCCESS;
2192}
2193
2194bool Sqlite::IsConnected()
2195{
2196 return fIsConnected;
2197}
2198
2199int Sqlite::OpenTransaction(const char* table_name)
2200{
2201 int status = Exec(table_name, "BEGIN TRANSACTION");
2202 return status;
2203}
2204
2205int Sqlite::CommitTransaction(const char* table_name)
2206{
2207 int status = Exec(table_name, "COMMIT TRANSACTION");
2208 return status;
2209}
2210
2211int Sqlite::RollbackTransaction(const char* table_name)
2212{
2213 int status = Exec(table_name, "ROLLBACK TRANSACTION");
2214 return status;
2215}
2216
// Start a query on the database of `table_name`; rows are then read with Step()/Get*()
// and the query is ended with Finalize(). Only one query may be pending (fTempDB must be NULL).
// Returns DB_FILE_ERROR when the table's database cannot be opened or the statement
// cannot be prepared (logged, with the SQL text cut to 60 characters); DB_SUCCESS otherwise.
// NOTE(review): the statements that define `status` in both branches of the
// SQLITE_VERSION_NUMBER conditional are not visible in this listing.
2217int Sqlite::Prepare(const char* table_name, const char* sql)
2218{
2219 sqlite3* db = GetTable(table_name);
2220 if (!db)
2221 return DB_FILE_ERROR;
2222
2223 if (fDebug)
2224 printf("Sqlite::Prepare(%s, %s)\n", table_name, sql);
2225
2226 assert(fTempDB==NULL);
2227 fTempDB = db;
2228
2229#if SQLITE_VERSION_NUMBER >= 3006020
2231#else
2232#warning Missing sqlite3_prepare_v2()!
2234#endif
2235
2236 if (status == SQLITE_OK)
2237 return DB_SUCCESS;
2238
2239 std::string sqlstring = sql;
2240 cm_msg(MERROR, "Sqlite::Prepare", "Table %s: sqlite3_prepare_v2(%s...) error %d (%s)", table_name, sqlstring.substr(0,60).c_str(), status, xsqlite3_errstr(db, status));
2241
// failed: no query is pending
2242 fTempDB = NULL;
2243
2244 return DB_FILE_ERROR;
2245}
2246
// Advance the pending query to its next row.
// Returns DB_SUCCESS for SQLITE_ROW, DB_NO_MORE_SUBKEYS for SQLITE_DONE and
// DB_FILE_ERROR (after logging) for any other status.
// NOTE(review): the statement that defines `status` is not visible in this listing
// (the error text names sqlite3_step()).
2247int Sqlite::Step()
2248{
2249 if (0 && fDebug)
2250 printf("Sqlite::Step()\n");
2251
2252 assert(fTempDB);
2253 assert(fTempStmt);
2254
2256
2257 if (status == SQLITE_DONE)
2258 return DB_NO_MORE_SUBKEYS;
2259
2260 if (status == SQLITE_ROW)
2261 return DB_SUCCESS;
2262
2263 cm_msg(MERROR, "Sqlite::Step", "sqlite3_step() error %d (%s)", status, xsqlite3_errstr(fTempDB, status));
2264
2265 return DB_FILE_ERROR;
2266}
2267
// End the pending query. fTempDB and fTempStmt are reset on both paths, so a new
// Prepare() is possible afterwards.
// Returns DB_FILE_ERROR (after logging) when the status is not SQLITE_OK, DB_SUCCESS otherwise.
// NOTE(review): the statement that defines `status` is not visible in this listing
// (the error text names sqlite3_finalize()).
2268int Sqlite::Finalize()
2269{
2270 if (0 && fDebug)
2271 printf("Sqlite::Finalize()\n");
2272
2273 assert(fTempDB);
2274 assert(fTempStmt);
2275
2277
2278 if (status != SQLITE_OK) {
2279 cm_msg(MERROR, "Sqlite::Finalize", "sqlite3_finalize() error %d (%s)", status, xsqlite3_errstr(fTempDB, status));
2280
2281 fTempDB = NULL;
2282 fTempStmt = NULL; // FIXME: maybe a memory leak?
2283 return DB_FILE_ERROR;
2284 }
2285
2286 fTempDB = NULL;
2287 fTempStmt = NULL;
2288
2289 return DB_SUCCESS;
2290}
2291
2292int Sqlite::ListTables(std::vector<std::string> *plist)
2293{
2294 if (!fIsConnected)
2295 return DB_FILE_ERROR;
2296
2297 if (fDebug)
2298 printf("Sqlite::ListTables at path [%s]\n", fPath.c_str());
2299
2300 int status;
2301
2302 const char* cmd = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;";
2303
2304 DIR *dir = opendir(fPath.c_str());
2305 if (!dir) {
2306 cm_msg(MERROR, "Sqlite::ListTables", "Cannot opendir(%s), errno %d (%s)", fPath.c_str(), errno, strerror(errno));
2307 return HS_FILE_ERROR;
2308 }
2309
2310 while (1) {
2311 const struct dirent* de = readdir(dir);
2312 if (!de)
2313 break;
2314
2315 const char* dn = de->d_name;
2316
2317 //if (dn[0]!='m' || dn[1]!='h')
2318 //continue;
2319
2320 const char* s;
2321
2322 s = strstr(dn, "mh_");
2323 if (!s || s!=dn)
2324 continue;
2325
2326 s = strstr(dn, ".sqlite3");
2327 if (!s || s[8]!=0)
2328 continue;
2329
2330 char table_name[256];
2331 mstrlcpy(table_name, dn+3, sizeof(table_name));
2332 // FIXME: skip names like "xxx.sqlite3~" and "xxx.sqlite3-deleted"
2333 char* ss = strstr(table_name, ".sqlite3");
2334 if (!ss)
2335 continue;
2336 *ss = 0;
2337
2338 //printf("dn [%s] tn [%s]\n", dn, table_name);
2339
2340 status = Prepare(table_name, cmd);
2341 if (status != DB_SUCCESS)
2342 continue;
2343
2344 while (1) {
2345 status = Step();
2346 if (status != DB_SUCCESS)
2347 break;
2348
2349 const char* tn = GetText(0);
2350 //printf("table [%s]\n", tn);
2351 plist->push_back(tn);
2352 }
2353
2354 status = Finalize();
2355 }
2356
2357 closedir(dir);
2358 dir = NULL;
2359
2360 return DB_SUCCESS;
2361}
2362
2363int Sqlite::ListColumns(const char* table, std::vector<std::string> *plist)
2364{
2365 if (!fIsConnected)
2366 return DB_FILE_ERROR;
2367
2368 if (fDebug)
2369 printf("Sqlite::ListColumns for table \'%s\'\n", table);
2370
2371 std::string cmd;
2372 cmd = "PRAGMA table_info(";
2373 cmd += table;
2374 cmd += ");";
2375
2376 int status;
2377
2378 status = Prepare(table, cmd.c_str());
2379 if (status != DB_SUCCESS)
2380 return status;
2381
2382 while (1) {
2383 status = Step();
2384 if (status != DB_SUCCESS)
2385 break;
2386
2387 const char* colname = GetText(1);
2388 const char* coltype = GetText(2);
2389 //printf("column [%s] [%s]\n", colname, coltype);
2390 plist->push_back(colname); // column name
2391 plist->push_back(coltype); // column type
2392 }
2393
2394 status = Finalize();
2395
2396 return DB_SUCCESS;
2397}
2398
// set from fDebug by Sqlite::Exec() before each sqlite3 call that uses callback()
static int callback_debug = 0;

// Row callback: prints every column of the row when callback_debug is set,
// does nothing otherwise. Always returns 0.
static int callback(void *NotUsed, int argc, char **argv, char **azColName)
{
   if (!callback_debug)
      return 0;

   printf("history_sqlite::callback---->\n");
   for (int col = 0; col < argc; col++) {
      const char* value = argv[col] ? argv[col] : "NULL";
      printf("history_sqlite::callback[%d] %s = %s\n", col, azColName[col], value);
   }

   return 0;
}
2410
// Execute an SQL statement on the database of `table_name`.
// Rows produced by the statement are only printed (through callback()) when fDebug is set.
// Errors other than the two "already exists" cases are logged with the SQL text cut to
// 60 characters.
// NOTE(review): the statement that runs the query and defines `status`, and one statement
// after the cm_msg() call, are not visible in this listing.
2411int Sqlite::Exec(const char* table_name, const char* sql)
2412{
2413 // return values:
2414 // DB_SUCCESS
2415 // DB_FILE_ERROR: not connected
2416 // DB_KEY_EXIST: "table already exists"
2417
2418 if (!fIsConnected)
2419 return DB_FILE_ERROR;
2420
2421 sqlite3* db = GetTable(table_name);
2422 if (!db)
2423 return DB_FILE_ERROR;
2424
2425 if (fDebug)
2426 printf("Sqlite::Exec(%s, %s)\n", table_name, sql);
2427
2428 int status;
2429
// callback() is a plain function and reads the debug flag from this file-scope variable
2430 callback_debug = fDebug;
2431 char* errmsg = NULL;
2432
2434 if (status != SQLITE_OK) {
// "duplicate column name" and "already exists" are reported as DB_KEY_EXIST without logging
2435 if (status == SQLITE_ERROR && strstr(errmsg, "duplicate column name"))
2436 return DB_KEY_EXIST;
2437 if (status == SQLITE_ERROR && strstr(errmsg, "already exists"))
2438 return DB_KEY_EXIST;
2439 std::string sqlstring = sql;
2440 cm_msg(MERROR, "Sqlite::Exec", "Table %s: sqlite3_exec(%s...) error %d (%s)", table_name, sqlstring.substr(0,60).c_str(), status, errmsg);
2442 return DB_FILE_ERROR;
2443 }
2444
2445 return DB_SUCCESS;
2446}
2447
2448int Sqlite::ExecDisconnected(const char* table_name, const char* sql)
2449{
2450 cm_msg(MERROR, "Sqlite::Exec", "sqlite driver does not support disconnected operations");
2451 return DB_FILE_ERROR;
2452}
2453
2454#endif // HAVE_SQLITE
2455
2457// Methods of HsFileSchema //
2459
// Append one record to this schema's history file: a 4-byte timestamp followed by the event payload.
// On the first call (fWriterFd < 0) the file is opened O_RDWR and its size is compared with a whole
// number of records past fDataOffset before anything is written.
//   t         - event time; only the first 4 bytes of the time_t object are copied into the record
//   data      - event payload
//   data_size - payload size in bytes, compared against the previously seen size (fLastSize)
// Returns HS_SUCCESS, or HS_FILE_ERROR when open/seek/truncate/write fails or when data_size changed
// to a value smaller than the expected payload size (fRecordSize - 4).
2460int HsFileSchema::write_event(const time_t t, const char* data, const size_t data_size)
2461{
2462 HsFileSchema* s = this;
2463
2464 assert(s->fVariables.size() == s->fOffsets.size());
2465
   // lazy open: the writer descriptor is created on the first write
2466 if (s->fWriterFd < 0) {
2467 s->fWriterFd = open(s->fFileName.c_str(), O_RDWR);
2468 if (s->fWriterFd < 0) {
2469 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', open() errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2470 return HS_FILE_ERROR;
2471 }
2472
2473 //static_assert((sizeof(int)==3),"here!");
2474 //static_assert((sizeof(off64_t)==7),"here!");
2475 //static_assert((sizeof(size_t)==3),"here!");
2476 //static_assert((sizeof(intmax_t)==3),"here!");
2477
2478 //off64_t zzz = 1;
2479 //off64_t xxx = zzz<<60;
2480 //int yyy = xxx;
2481 //printf("%d", yyy);
2482
2483 off64_t file_size = ::lseek64(s->fWriterFd, 0, SEEK_END);
2484
2485 if (file_size < 0) {
2486 cm_msg(MERROR, "FileHistory::write_event", "Cannot read file size of \'%s\', lseek64(SEEK_END) errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2487 return HS_FILE_ERROR;
2488 }
2489
   // number of complete records currently stored after the header
2490 off64_t nrec = 0;
2491
2492 if (file_size >= s->fDataOffset) {
2493 nrec = (file_size - s->fDataOffset)/s->fRecordSize;
2494 }
2495
2496 if (nrec < 0)
2497 nrec = 0;
2498
   // NOTE(review): the definition of data_end (listing line 2499) is absent from this listing;
   // presumably fDataOffset + nrec*fRecordSize - confirm against the original source.
2500
2501 //printf("write_event: file_size %jd, nrec %jd, data_end %jd\n", (intmax_t)file_size, (intmax_t)nrec, (intmax_t)data_end);
2502
   // the file does not end on a record boundary: complain (only if it holds data) and repair
2503 if (data_end != file_size) {
2504 if (nrec > 0)
2505 cm_msg(MERROR, "FileHistory::write_event", "File \'%s\' may be truncated, data offset %jd, record size %zu, file size: %jd, should be %jd, making it so", s->fFileName.c_str(), (intmax_t)s->fDataOffset, s->fRecordSize, (intmax_t)file_size, (intmax_t)data_end);
2506
   // NOTE(review): the statement that sets status64 (listing line 2507) is absent; the error
   // message below indicates an lseek64() to data_end.
2508
2509 if (status64 < 0) {
2510 cm_msg(MERROR, "FileHistory::write_event", "Cannot seek \'%s\' to offset %jd, lseek64() errno %d (%s)", s->fFileName.c_str(), (intmax_t)data_end, errno, strerror(errno));
2511 return HS_FILE_ERROR;
2512 }
2513
   // NOTE(review): the statement that sets status (listing line 2514) is absent; the error
   // message below indicates an ftruncate64() to data_end.
2515
2516 if (status < 0) {
2517 cm_msg(MERROR, "FileHistory::write_event", "Cannot truncate \'%s\' to size %jd, ftruncate64() errno %d (%s)", s->fFileName.c_str(), (intmax_t)data_end, errno, strerror(errno));
2518 return HS_FILE_ERROR;
2519 }
2520 }
2521
2523 s->fFileSize = data_end;
2524 }
2525
   // payload size implied by the record size: the record is a 4-byte timestamp plus the data
2526 size_t expected_size = s->fRecordSize - 4;
2527
2528 // sanity check: record_size and n_bytes are computed from the byte counts in the file header
2529 assert(expected_size == s->fNumBytes);
2530
   // NOTE(review): the statement controlled by this if (listing line 2532) is absent from this
   // listing; as shown, the if would govern the size check that follows.
2531 if (s->fLastSize == 0)
2533
   // report a change of payload size once per change: fLastSize remembers the last reported size
2534 if (data_size != s->fLastSize) {
2535 cm_msg(MERROR, "FileHistory::write_event", "Event \'%s\' data size mismatch, expected %zu bytes, got %zu bytes, previously %zu bytes", s->fEventName.c_str(), expected_size, data_size, s->fLastSize);
2536 //printf("schema:\n");
2537 //s->print();
2538
   // too little data cannot fill a record; too much data falls through and is written
   // as expected_size bytes (see size below)
2539 if (data_size < expected_size)
2540 return HS_FILE_ERROR;
2541
2542 // truncate for now
2543 // data_size = expected_size;
2544 s->fLastSize = data_size;
2545 }
2546
2547 size_t size = 4 + expected_size;
2548
   // the staging buffer is resized only when the record size changes; allocation failure aborts via assert
2549 if (size != s->fRecordBufferSize) {
2550 s->fRecordBuffer = (char*)realloc(s->fRecordBuffer, size);
2551 assert(s->fRecordBuffer != NULL);
2552 s->fRecordBufferSize = size;
2553 }
2554
   // copies the first 4 bytes of the time_t object, whatever sizeof(time_t) is
2555 memcpy(s->fRecordBuffer, &t, 4);
   // NOTE(review): listing line 2556 is absent; presumably it copies the payload behind the
   // timestamp - confirm against the original source.
2557
   // timestamp and payload go out in a single write(); anything but a full write is an error
2558 ssize_t wr = write(s->fWriterFd, s->fRecordBuffer, size);
2559 if ((size_t)wr != size) {
2560 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', write(%zu) returned %zd, errno %d (%s)", s->fFileName.c_str(), size, wr, errno, strerror(errno));
2561 return HS_FILE_ERROR;
2562 }
2563
2564 s->fFileSize += size;
2565
   // disabled older variant that wrote the timestamp and the payload with two separate write() calls
2566#if 0
2567 status = write(s->fWriterFd, &t, 4);
2568 if (status != 4) {
2569 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', write(timestamp) errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2570 return HS_FILE_ERROR;
2571 }
2572
2574 if (status != expected_size) {
2575 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', write(%d) errno %d (%s)", s->fFileName.c_str(), data_size, errno, strerror(errno));
2576 return HS_FILE_ERROR;
2577 }
2578#endif
2579
2580 return HS_SUCCESS;
2581}
2582
2584{
   // NOTE(review): the header of this definition (listing line 2583) is absent from this listing.
   // Acts only when a writer descriptor is open; afterwards fWriterFd is -1. Always returns HS_SUCCESS.
2585 if (fWriterFd >= 0) {
   // NOTE(review): listing line 2586 is absent; presumably it closes fWriterFd - confirm.
2587 fWriterFd = -1;
2588 }
2589 return HS_SUCCESS;
2590}
2591
// Read exactly one record of recsize bytes from fd into rec.
//   file_name - used only in error messages
//   offset, recsize, irec - record geometry; NOTE(review): their use is in listing lines that are
//                           absent here, presumably position = offset + irec*recsize - confirm
//   rec       - destination buffer, must hold at least recsize bytes
// Returns HS_SUCCESS after a full read; returns -1 (not an HS_xxx code) on seek failure, read error,
// end of file or short read. Callers in this file compare the result against HS_SUCCESS.
2592static int ReadRecord(const char* file_name, int fd, off64_t offset, size_t recsize, off64_t irec, char* rec)
2593{
   // NOTE(review): the definitions of fpos and status64 (listing lines 2594 and 2596) are absent;
   // the message below indicates an lseek64() to fpos.
2595
2597
2598 if (status64 < 0) {
2599 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', lseek64(%jd) errno %d (%s)", file_name, (intmax_t)fpos, errno, strerror(errno));
2600 return -1;
2601 }
2602
2603 ssize_t rd = ::read(fd, rec, recsize);
2604
2605 if (rd < 0) {
2606 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', read() errno %d (%s)", file_name, errno, strerror(errno));
2607 return -1;
2608 }
2609
   // a record was requested at a position the caller believes exists, so EOF is an error here
2610 if (rd == 0) {
2611 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', unexpected end of file on read()", file_name);
2612 return -1;
2613 }
2614
   // partial records are rejected, the read is not retried
2615 if ((size_t)rd != recsize) {
2616 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', short read() returned %zd instead of %zu bytes", file_name, rd, recsize);
2617 return -1;
2618 }
2619
2620 return HS_SUCCESS;
2621}
2622
// Locate timestamp among the nrec fixed-size records of a history file by binary search.
// The first 4 bytes of each record are interpreted as its time (read through a DWORD pointer).
// Requires nrec > 0 (asserted). Returns HS_SUCCESS, or HS_FILE_ERROR when a record read fails.
2623static int FindTime(const char* file_name, int fd, off64_t offset, size_t recsize, off64_t nrec, time_t timestamp, off64_t* i1p, time_t* t1p, off64_t* i2p, time_t* t2p, time_t* tstart, time_t* tend, int debug)
2624{
2625 //
2626 // purpose: find the location of timestamp inside the given file.
2627 // uses binary search
2628 // returns:
2629 // tstart, tend - time of first and last data in a file
2630 // i1p,t1p - data just before timestamp, used as "last_written"
2631 // i2p,t2p - data at timestamp or after timestamp, used as starting point to read data from file
2632 // assertions (general case, timestamp lies inside the file):
2633 // tstart <= t1p < t2p <= tend
2634 // i1p+1==i2p
2635 // t1p < timestamp <= t2p
2636 //
2637 // special cases:
2638 // 1) timestamp <= tstart - all data is in the future, return i1p==-1, t1p==0, i2p==0, t2p==tstart, tend==0 (the last record is not read)
2639 // 2) tend < timestamp - all the data is in the past, return i1p = nrec-1, t1p = tend, i2p = nrec, t2p = 0;
2640 // 3) nrec == 1 only one record in this file and it is older than the timestamp (tstart == tend < timestamp), return i1p = 0, t1p = tstart, i2p = 1, t2p = 0
2641 //
2642
2643 int status;
   // scratch buffer for one record; released on every return path below
2644 char* buf = new char[recsize];
2645
2646 assert(nrec > 0);
2647
2648 off64_t rec1 = 0;
2649 off64_t rec2 = nrec-1;
2650
   // NOTE(review): the statement that sets status (listing line 2651) is absent from this listing;
   // presumably it reads record rec1 into buf - confirm.
2652 if (status != HS_SUCCESS) {
2653 delete[] buf;
2654 return HS_FILE_ERROR;
2655 }
2656
2657 time_t t1 = *(DWORD*)buf;
2658
2659 *tstart = t1;
2660
2661 // timestamp is older than any data in this file
2662 if (timestamp <= t1) {
2663 *i1p = -1;
2664 *t1p = 0;
2665 *i2p = 0;
2666 *t2p = t1;
2667 *tend = 0;
2668 delete[] buf;
2669 return HS_SUCCESS;
2670 }
2671
2672 assert(t1 < timestamp);
2673
   // single record, already known to be older than timestamp
2674 if (nrec == 1) {
2675 *i1p = 0;
2676 *t1p = t1;
2677 *i2p = nrec; // == 1
2678 *t2p = 0;
2679 *tend = t1;
2680 delete[] buf;
2681 return HS_SUCCESS;
2682 }
2683
   // NOTE(review): the statement that sets status (listing line 2684) is absent from this listing;
   // presumably it reads record rec2 into buf - confirm.
2685 if (status != HS_SUCCESS) {
2686 delete[] buf;
2687 return HS_FILE_ERROR;
2688 }
2689
2690 time_t t2 = *(DWORD*)buf;
2691
2692 *tend = t2;
2693
2694 // all the data is in the past
2695 if (t2 < timestamp) {
2696 *i1p = rec2;
2697 *t1p = t2;
2698 *i2p = nrec;
2699 *t2p = 0;
2700 delete[] buf;
2701 return HS_SUCCESS;
2702 }
2703
   // from here on the search keeps t1 < timestamp <= t2 while narrowing rec1..rec2
2704 assert(t1 < timestamp);
2705 assert(timestamp <= t2);
2706
2707 if (debug)
2708 printf("FindTime: rec %jd..(x)..%jd, time %s..(%s)..%s\n", (intmax_t)rec1, (intmax_t)rec2, TimeToString(t1).c_str(), TimeToString(timestamp).c_str(), TimeToString(t2).c_str());
2709
2710 // implement binary search
2711
2712 do {
2713 off64_t rec = (rec1+rec2)/2;
2714
2715 assert(rec >= 0);
2716 assert(rec < nrec);
2717
   // NOTE(review): the statement that sets status (listing line 2718) is absent from this listing;
   // presumably it reads record rec into buf - confirm.
2719 if (status != HS_SUCCESS) {
2720 delete[] buf;
2721 return HS_FILE_ERROR;
2722 }
2723
2724 time_t t = *(DWORD*)buf;
2725
   // the midpoint replaces the upper bound when it is at or after timestamp, else the lower bound
2726 if (timestamp <= t) {
2727 if (debug)
2728 printf("FindTime: rec %jd..(x)..%jd..%jd, time %s..(%s)..%s..%s\n", (intmax_t)rec1, (intmax_t)rec, (intmax_t)rec2, TimeToString(t1).c_str(), TimeToString(timestamp).c_str(), TimeToString(t).c_str(), TimeToString(t2).c_str());
2729
2730 rec2 = rec;
2731 t2 = t;
2732 } else {
2733 if (debug)
2734 printf("FindTime: rec %jd..%jd..(x)..%jd, time %s..%s..(%s)..%s\n", (intmax_t)rec1, (intmax_t)rec, (intmax_t)rec2, TimeToString(t1).c_str(), TimeToString(t).c_str(), TimeToString(timestamp).c_str(), TimeToString(t2).c_str());
2735
2736 rec1 = rec;
2737 t1 = t;
2738 }
2739 } while (rec2 - rec1 > 1);
2740
2741 assert(rec1+1 == rec2);
2742 assert(t1 < timestamp);
2743 assert(timestamp <= t2);
2744
2745 if (debug)
2746 printf("FindTime: rec %jd..(x)..%jd, time %s..(%s)..%s, this is the result.\n", (intmax_t)rec1, (intmax_t)rec2, TimeToString(t1).c_str(), TimeToString(timestamp).c_str(), TimeToString(t2).c_str());
2747
2748 *i1p = rec1;
2749 *t1p = t1;
2750
2751 *i2p = rec2;
2752 *t2p = t2;
2753
2754 delete[] buf;
2755 return HS_SUCCESS;
2756}
2757
2759 const int debug,
2760 time_t* last_written)
2761{
   // NOTE(review): the first line of this definition (listing line 2758) is absent from this listing;
   // the log strings below identify it as FileHistory::read_last_written.
   // Stores in *last_written (when the pointer is not NULL) the time of the newest record in this
   // schema's file that is older than timestamp, or 0 when the file holds no complete record.
   // Returns HS_SUCCESS, or HS_FILE_ERROR when the file cannot be opened, sized or read.
2762 int status;
2763 HsFileSchema* s = this;
2764
2765 if (debug)
2766 printf("FileHistory::read_last_written: file %s, schema time %s..%s, timestamp %s\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(timestamp).c_str());
2767
2768 int fd = open(s->fFileName.c_str(), O_RDONLY);
2769 if (fd < 0) {
2770 cm_msg(MERROR, "FileHistory::read_last_written", "Cannot read \'%s\', open() errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2771 return HS_FILE_ERROR;
2772 }
2773
2774 off64_t file_size = ::lseek64(fd, 0, SEEK_END);
2775
   // NOTE(review): this error path returns without closing fd, unlike every other return below - looks like a descriptor leak.
2776 if (file_size < 0) {
2777 cm_msg(MERROR, "FileHistory::read_last_written", "Cannot read file size of \'%s\', lseek64(SEEK_END) errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2778 return HS_FILE_ERROR;
2779 }
2780
   // NOTE(review): the condition opening this block (listing line 2781) is absent from this listing.
2782 // empty file
2783 ::close(fd);
2784 if (last_written)
2785 *last_written = 0;
2786 return HS_SUCCESS;
2787 }
2788
2789 off64_t nrec = (file_size - s->fDataOffset)/s->fRecordSize;
2790
2791 //printf("read_last_written: nrec %jd, file_size %jd, offset %jd, record size %zu\n", (intmax_t)nrec, (intmax_t)file_size, s->fDataOffset, s->fRecordSize);
2792
2793 if (nrec < 0)
2794 nrec = 0;
2795
   // no complete record: report "never written"
2796 if (nrec < 1) {
2797 ::close(fd);
2798 if (last_written)
2799 *last_written = 0;
2800 return HS_SUCCESS;
2801 }
2802
2803 time_t lw = 0;
2804
2805 // read last record to check if desired time is inside or outside of the file
2806
2807 if (1) {
2808 char* buf = new char[s->fRecordSize];
2809
2810 status = ReadRecord(s->fFileName.c_str(), fd, s->fDataOffset, s->fRecordSize, nrec - 1, buf);
2811 if (status != HS_SUCCESS) {
2812 delete[] buf;
2813 ::close(fd);
2814 return HS_FILE_ERROR;
2815 }
2816
   // record time is the first 4 bytes of the record
2817 lw = *(DWORD*)buf;
2818
2819 delete[] buf;
2820 }
2821
   // the newest record is not older than timestamp: search for the last record before timestamp
2822 if (lw >= timestamp) {
2823 off64_t irec = 0;
2824 time_t trec = 0;
2825 off64_t iunused = 0;
2826 time_t tunused = 0;
2827 time_t tstart = 0; // not used
2828 time_t tend = 0; // not used
2829
   // 0*debug keeps FindTime's own debug output switched off
2830 status = FindTime(s->fFileName.c_str(), fd, s->fDataOffset, s->fRecordSize, nrec, timestamp, &irec, &trec, &iunused, &tunused, &tstart, &tend, 0*debug);
2831 if (status != HS_SUCCESS) {
2832 ::close(fd);
2833 return HS_FILE_ERROR;
2834 }
2835
2836 assert(trec < timestamp);
2837
2838 lw = trec;
2839 }
2840
2841 if (last_written)
2842 *last_written = lw;
2843
2844 if (debug)
2845 printf("FileHistory::read_last_written: file %s, schema time %s..%s, timestamp %s, last_written %s\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(timestamp).c_str(), TimeToString(lw).c_str());
2846
2847 assert(lw < timestamp);
2848
2849 ::close(fd);
2850
2851 return HS_SUCCESS;
2852}
2853
// Read the records of this schema's file with times from start_time up to end_time and feed the
// requested variables, converted to double, into the per-variable buffers.
//   num_var          - number of requested variables
//   var_schema_index - per variable: index into fVariables/fOffsets, negative means "not in this schema"
//   var_index        - per variable: array element to extract
//   last_time        - per variable: time of the last delivered sample; updated here and used to
//                      reject data that goes backwards in time
// NOTE(review): the last parameter (listing line 2859) is absent from this listing; the body uses it
// as buffer[i]->Add(t, v).
// Returns HS_SUCCESS (also when the file holds nothing in range), or HS_FILE_ERROR on I/O failure
// before the read loop or when timestamps are inconsistent.
2854int HsFileSchema::read_data(const time_t start_time,
2855 const time_t end_time,
2856 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
2857 const int debug,
2858 std::vector<time_t>& last_time,
2860{
2861 HsFileSchema* s = this;
2862
2863 if (debug)
2864 printf("FileHistory::read_data: file %s, schema time %s..%s, read time %s..%s, %d vars\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), num_var);
2865
2866 //if (1) {
2867 // printf("Last time: ");
2868 // for (int i=0; i<num_var; i++) {
2869 // printf(" %s", TimeToString(last_time[i]).c_str());
2870 // }
2871 // printf("\n");
2872 //}
2873
2874 if (debug) {
2875 printf("FileHistory::read_data: file %s map", s->fFileName.c_str());
2876 for (size_t i=0; i<var_schema_index.size(); i++) {
2877 printf(" %2d", var_schema_index[i]);
2878 }
2879 printf("\n");
2880 }
2881
2882 int fd = ::open(s->fFileName.c_str(), O_RDONLY);
2883 if (fd < 0) {
2884 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', open() errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2885 return HS_FILE_ERROR;
2886 }
2887
2888 off64_t file_size = ::lseek64(fd, 0, SEEK_END);
2889
2890 if (file_size < 0) {
2891 cm_msg(MERROR, "FileHistory::read_data", "Cannot read file size of \'%s\', lseek64(SEEK_END) errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2892 ::close(fd);
2893 return HS_FILE_ERROR;
2894 }
2895
   // NOTE(review): the condition opening this block (listing line 2896) is absent from this listing.
2897 // empty file
2898 ::close(fd);
2899 return HS_SUCCESS;
2900 }
2901
2902 off64_t nrec = (file_size - s->fDataOffset)/s->fRecordSize;
2903
2904 //printf("read_data: nrec %jd, file_size %jd, offset %jd, record size %zu\n", (intmax_t)nrec, (intmax_t)file_size, s->fDataOffset, s->fRecordSize);
2905
2906 if (nrec < 0)
2907 nrec = 0;
2908
2909 if (nrec < 1) {
2910 ::close(fd);
2911 return HS_SUCCESS;
2912 }
2913
2914 off64_t iunused = 0;
2915 time_t tunused = 0;
2916 off64_t irec = 0;
2917 time_t trec = 0;
2918 time_t tstart = 0;
2919 time_t tend = 0;
2920
   // irec/trec receive the first record at or after start_time; 0*debug silences FindTime
2921 int istatus = FindTime(s->fFileName.c_str(), fd, s->fDataOffset, s->fRecordSize, nrec, start_time, &iunused, &tunused, &irec, &trec, &tstart, &tend, 0*debug);
2922
2923 if (istatus != HS_SUCCESS) {
2924 ::close(fd);
2925 return HS_FILE_ERROR;
2926 }
2927
2928 if (debug) {
2929 printf("FindTime %d, nrec %jd, (%jd, %s) (%jd, %s), tstart %s, tend %s, want %s\n", istatus, (intmax_t)nrec, (intmax_t)iunused, TimeToString(tunused).c_str(), (intmax_t)irec, TimeToString(trec).c_str(), TimeToString(tstart).c_str(), TimeToString(tend).c_str(), TimeToString(start_time).c_str());
2930 }
2931
2932 if (irec < 0 || irec >= nrec) {
2933 // all data in this file is older than start_time
2934
2935 ::close(fd);
2936
2937 if (debug)
2938 printf("FileHistory::read: file %s, schema time %s..%s, read time %s..%s, file time %s..%s, data in this file is too old\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), TimeToString(tstart).c_str(), TimeToString(tend).c_str());
2939
2940 return HS_SUCCESS;
2941 }
2942
   // NOTE(review): the condition opening this block (listing line 2943) is absent from this listing.
2944 // data starts before time declared in schema
2945
2946 ::close(fd);
2947
2948 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': timestamp of first data %s is before schema start time %s", s->fFileName.c_str(), TimeToString(tstart).c_str(), TimeToString(s->fTimeFrom).c_str());
2949
2950 return HS_FILE_ERROR;
2951 }
2952
   // tend is 0 when FindTime did not read the last record; fTimeTo of 0 disables this check
2953 if (tend && s->fTimeTo && tend > s->fTimeTo) {
2954 // data ends after time declared in schema (overlaps with next file)
2955
2956 ::close(fd);
2957
2958 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': timestamp of last data %s is after schema end time %s", s->fFileName.c_str(), TimeToString(tend).c_str(), TimeToString(s->fTimeTo).c_str());
2959
2960 return HS_FILE_ERROR;
2961 }
2962
   // the first record to be read must not be older than what was already delivered for any requested variable
2963 for (int i=0; i<num_var; i++) {
2964 int si = var_schema_index[i];
2965 if (si < 0)
2966 continue;
2967
2968 if (trec < last_time[i]) { // protect against duplicate and non-monotonous data
2969 ::close(fd);
2970
2971 cm_msg(MERROR, "FileHistory::read_data", "Internal history error at file \'%s\': variable %d data timestamp %s is before last timestamp %s", s->fFileName.c_str(), i, TimeToString(trec).c_str(), TimeToString(last_time[i]).c_str());
2972
2973 return HS_FILE_ERROR;
2974 }
2975 }
2976
2977 int count = 0;
2978
   // NOTE(review): the definitions of fpos and xpos (listing lines 2979 and 2981) are absent from
   // this listing; the message below indicates an lseek64() to fpos.
2980
2982
2983 //printf("read_data: lseek64() returned %jd, irec %jd, offset %jd, record size %zu, fpos %jd\n", xpos, irec, s->fDataOffset, s->fRecordSize, fpos);
2984
2985 if (xpos < 0) {
2986 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', lseek64(%jd) errno %d (%s)", s->fFileName.c_str(), (intmax_t)fpos, errno, strerror(errno));
2987 ::close(fd);
2988 return HS_FILE_ERROR;
2989 }
2990
2991 char* buf = new char[s->fRecordSize];
2992
   // prec counts records consumed so far, starting from irec
2993 off64_t prec = irec;
2994
   // sequential read until EOF, a read problem, or a record newer than end_time
2995 while (1) {
2996 ssize_t rd = ::read(fd, buf, s->fRecordSize);
2997
   // NOTE(review): read errors and short reads only end the loop; the function still returns HS_SUCCESS.
2998 if (rd < 0) {
2999 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', read() errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
3000 break;
3001 }
3002
3003 if (rd == 0) {
3004 // EOF
3005 break;
3006 }
3007
3008 if ((size_t)rd != s->fRecordSize) {
3009 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', short read() returned %zd instead of %zu bytes", s->fFileName.c_str(), rd, s->fRecordSize);
3010 break;
3011 }
3012
3013 prec++;
3014
   // true when the schema has no end time and more records were read than the file held when it was sized above
3015 bool past_end_of_last_file = (s->fTimeTo == 0) && (prec > nrec);
3016
3017 time_t t = *(DWORD*)buf;
3018
3019 if (debug > 1)
3020 printf("FileHistory::read: file %s, schema time %s..%s, read time %s..%s, row time %s\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), TimeToString(t).c_str());
3021
3022 if (t < trec) {
3023 delete[] buf;
3024 ::close(fd);
3025 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': record %jd timestamp %s is before start time %s", s->fFileName.c_str(), (intmax_t)(irec + count), TimeToString(t).c_str(), TimeToString(trec).c_str());
3026 return HS_FILE_ERROR;
3027 }
3028
3029 if (tend && (t > tend) && !past_end_of_last_file) {
3030 delete[] buf;
3031 ::close(fd);
3032 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': record %jd timestamp %s is after last timestamp %s", s->fFileName.c_str(), (intmax_t)(irec + count), TimeToString(t).c_str(), TimeToString(tend).c_str());
3033 return HS_FILE_ERROR;
3034 }
3035
3036 if (t > end_time)
3037 break;
3038
   // payload starts after the 4-byte timestamp
3039 char* data = buf + 4;
3040
3041 for (int i=0; i<num_var; i++) {
3042 int si = var_schema_index[i];
3043 if (si < 0)
3044 continue;
3045
3046 if (t < last_time[i]) { // protect against duplicate and non-monotonous data
3047 delete[] buf;
3048 ::close(fd);
3049
3050 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': record %jd timestamp %s is before timestamp %s of variable %d", s->fFileName.c_str(), (intmax_t)(irec + count), TimeToString(t).c_str(), TimeToString(last_time[i]).c_str(), i);
3051
3052 return HS_FILE_ERROR;
3053 }
3054
3055 double v = 0;
3056 void* ptr = data + s->fOffsets[si];
3057
3058 int ii = var_index[i];
3059 assert(ii >= 0);
3060 assert(ii < s->fVariables[si].n_data);
3061
   // element ii of the variable is converted to double according to its type; unknown types yield 0
3062 switch (s->fVariables[si].type) {
3063 default:
3064 // unknown data type
3065 v = 0;
3066 break;
3067 case TID_BYTE:
3068 v = ((unsigned char*)ptr)[ii];
3069 break;
3070 case TID_SBYTE:
3071 v = ((signed char *)ptr)[ii];
3072 break;
3073 case TID_CHAR:
3074 v = ((char*)ptr)[ii];
3075 break;
3076 case TID_WORD:
3077 v = ((unsigned short *)ptr)[ii];
3078 break;
3079 case TID_SHORT:
3080 v = ((signed short *)ptr)[ii];
3081 break;
3082 case TID_DWORD:
3083 v = ((unsigned int *)ptr)[ii];
3084 break;
3085 case TID_INT:
3086 v = ((int *)ptr)[ii];
3087 break;
   // booleans are read as unsigned int elements
3088 case TID_BOOL:
3089 v = ((unsigned int *)ptr)[ii];
3090 break;
3091 case TID_FLOAT:
3092 v = ((float*)ptr)[ii];
3093 break;
3094 case TID_DOUBLE:
3095 v = ((double*)ptr)[ii];
3096 break;
3097 }
3098
3099 buffer[i]->Add(t, v);
3100 last_time[i] = t;
3101 }
3102 count++;
3103 }
3104
3105 delete[] buf;
3106
3107 ::close(fd);
3108
3109 if (debug) {
3110 printf("FileHistory::read_data: file %s map", s->fFileName.c_str());
3111 for (size_t i=0; i<var_schema_index.size(); i++) {
3112 printf(" %2d", var_schema_index[i]);
3113 }
3114 printf(" read %d rows\n", count);
3115 }
3116
3117 if (debug)
3118 printf("FileHistory::read: file %s, schema time %s..%s, read time %s..%s, %d vars, read %d rows\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), num_var, count);
3119
3120 return HS_SUCCESS;
3121}
3122
3124// Implementation of the MidasHistoryInterface //
3126
3128{
3129protected:
3130 int fDebug = 0;
3131 std::string fConnectString;
3132
3133 // writer data
3135 std::vector<HsSchema*> fWriterEvents;
3136
3137 // reader data
3139
3140public:
3142 {
3143 // empty
3144 }
3145
3147 {
   // NOTE(review): the header of this member (listing line 3146) is absent from this listing.
   // Deletes every non-NULL entry of fWriterEvents, then empties the vector.
3148 for (size_t i=0; i<fWriterEvents.size(); i++) {
3149 //printf("fWriterEvents[%zu] is %p\n", i, fWriterEvents[i]);
3150 if (fWriterEvents[i]) {
3151 delete fWriterEvents[i];
3152 fWriterEvents[i] = NULL;
3153 }
3154 }
3155 fWriterEvents.clear();
3156 }
3157
3158 virtual int hs_set_debug(int debug)
3159 {
3160 int old = fDebug;
3161 fDebug = debug;
3162 return old;
3163 }
3164
3165 virtual int hs_connect(const char* connect_string) = 0;
3166 virtual int hs_disconnect() = 0;
3167
3168protected:
3170 // Schema functions //
3172
3173 // load existing schema
3174 virtual int read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp) = 0; // event_name: =NULL means read only event names, =event_name means load tag names for all matching events; timestamp: =0 means read all schema all the way to the beginning of time, =time means read schema in effect at this time and all newer schema
3175
3176 // return a new copy of a schema for writing into history. If schema for this event does not exist, it will be created
3177 virtual HsSchema* new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]) = 0;
3178
3179 // if output file is too big, close it, create a new output file
3180 virtual HsSchema* maybe_reopen(const char* event_name, time_t timestamp, HsSchema* s) = 0;
3181
3182public:
3184 // Functions used by mlogger //
3186
3187 int hs_define_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]);
3188 int hs_write_event(const char* event_name, time_t timestamp, int buffer_size, const char* buffer);
3189 int hs_flush_buffers();
3190
3192 // Functions used by mhttpd //
3194
3195 int hs_clear_cache();
3196 int hs_get_events(time_t t, std::vector<std::string> *pevents);
3197 int hs_get_tags(const char* event_name, time_t t, std::vector<TAG> *ptags);
3198 int hs_get_last_written(time_t timestamp, int num_var, const char* const event_name[], const char* const var_name[], const int var_index[], time_t last_written[]);
3199 int hs_read_buffer(time_t start_time, time_t end_time,
3200 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3202 int hs_status[]);
3203
3204 /*------------------------------------------------------------------*/
3205
3207 {
3208 public:
3212
3214
3218 double **fDataBuffer;
3219
3221
3223 {
   // NOTE(review): the header of this member and listing lines 3226-3228 are absent from this listing.
   // Visible part: counters start at zero and the three output pointers start as NULL.
3224 fNumAdded = 0;
3225
3229
3230 fNumAlloc = 0;
3231 fNumEntries = NULL;
3232 fTimeBuffer = NULL;
3233 fDataBuffer = NULL;
3234
   // 0 means "no sample stored yet" (see the fPrevTime==0 test in Add)
3235 fPrevTime = 0;
3236 }
3237
   // Releases nothing: the arrays reached through fTimeBuffer and fDataBuffer are not freed here.
3238 ~ReadBuffer() // dtor
3239 {
3240 }
3241
3243 {
   // NOTE(review): the header of this member (listing line 3242) is absent from this listing; the
   // body uses a value named wantalloc, and Add calls Realloc(pos + 1).
   // Nothing is done while wantalloc stays more than 10 entries below the current allocation.
3244 if (wantalloc < fNumAlloc - 10)
3245 return;
3246
   // grow by doubling; small allocations jump to wantalloc + 1000 instead
3247 int newalloc = fNumAlloc*2;
3248
3249 if (newalloc <= 1000)
3250 newalloc = wantalloc + 1000;
3251
3252 //printf("wantalloc %d, fNumEntries %d, fNumAlloc %d, newalloc %d\n", wantalloc, *fNumEntries, fNumAlloc, newalloc);
3253
   // NOTE(review): listing line 3254 is absent; presumably it reallocates *fTimeBuffer - confirm.
3255 assert(*fTimeBuffer);
3256
3257 *fDataBuffer = (double*)realloc(*fDataBuffer, sizeof(double)*newalloc);
3258 assert(*fDataBuffer);
3259
   // NOTE(review): listing line 3260 is absent; presumably it records newalloc in fNumAlloc - confirm.
3261 }
3262
3263 void Add(time_t t, double v)
3264 {
3265 if (t < fFirstTime)
3266 return;
3267 if (t > fLastTime)
3268 return;
3269
3270 fNumAdded++;
3271
3272 if ((fPrevTime==0) || (t >= fPrevTime + fInterval)) {
3273 int pos = *fNumEntries;
3274
3275 Realloc(pos + 1);
3276
3277 (*fTimeBuffer)[pos] = t;
3278 (*fDataBuffer)[pos] = v;
3279
3280 (*fNumEntries) = pos + 1;
3281
3282 fPrevTime = t;
3283 }
3284 }
3285
   // Intentionally a no-op in the visible code: nothing is computed or released at the end of a read.
3286 void Finish()
3287 {
3288
3289 }
3290 };
3291
3292 /*------------------------------------------------------------------*/
3293
3294 int hs_read(time_t start_time, time_t end_time, time_t interval,
3295 int num_var,
3296 const char* const event_name[], const char* const var_name[], const int var_index[],
3297 int num_entries[],
3298 time_t* time_buffer[], double* data_buffer[],
3299 int st[]);
3300 /*------------------------------------------------------------------*/
3301
3302
3303 int hs_read_binned(time_t start_time, time_t end_time, int num_bins,
3304 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3305 int num_entries[],
3306 int* count_bins[], double* mean_bins[], double* rms_bins[], double* min_bins[], double* max_bins[],
3309 time_t last_time[], double last_value[],
3310 int st[]);
3311};
3312
3314{
   // NOTE(review): the header of this definition (listing line 3313) and listing lines 3317-3319 are
   // absent from this listing; num_bins is presumably a constructor argument - confirm.
3315 fNumEntries = 0;
3316
3320
   // three accumulator arrays with one element per bin, allocated with array new
3321 fSum0 = new double[num_bins];
3322 fSum1 = new double[num_bins];
3323 fSum2 = new double[num_bins];
3324
3325 for (int i=0; i<num_bins; i++) {
3326 fSum0[i] = 0;
3327 fSum1[i] = 0;
3328 fSum2[i] = 0;
3329 }
3330}
3331
3333{
   // NOTE(review): the header of this definition (listing line 3332) and listing lines 3343-3348 are
   // absent from this listing.
   // NOTE(review): the only visible allocation of fSum0/fSum1/fSum2 in this file section uses
   // new double[...]; releasing them with scalar delete instead of delete[] is undefined behaviour
   // if these are the same members - confirm and switch to delete[].
3334 delete fSum0; fSum0 = NULL;
3335 delete fSum1; fSum1 = NULL;
3336 delete fSum2; fSum2 = NULL;
3337 // poison the pointers
   // the output arrays are only forgotten here, not freed
3338 fCount = NULL;
3339 fMean = NULL;
3340 fRms = NULL;
3341 fMin = NULL;
3342 fMax = NULL;
3349}
3350
3352{
   // NOTE(review): the header of this definition (listing line 3351) is absent from this listing.
   // Zeroes every per-bin output array that was supplied (non-NULL) and the two "last sample"
   // outputs. The fSum0/fSum1/fSum2 accumulators are not touched here.
3353 for (int ibin = 0; ibin < fNumBins; ibin++) {
3354 if (fMin)
3355 fMin[ibin] = 0;
3356 if (fMax)
3357 fMax[ibin] = 0;
3358 if (fBinsFirstTime)
3359 fBinsFirstTime[ibin] = 0;
3360 if (fBinsFirstValue)
3361 fBinsFirstValue[ibin] = 0;
3362 if (fBinsLastTime)
3363 fBinsLastTime[ibin] = 0;
3364 if (fBinsLastValue)
3365 fBinsLastValue[ibin] = 0;
3366 }
3367 if (fLastTimePtr)
3368 *fLastTimePtr = 0;
3369 if (fLastValuePtr)
3370 *fLastValuePtr = 0;
3371}
3372
3374{
   // NOTE(review): the header of this definition (listing line 3373) is absent from this listing;
   // the body uses a time t and a value v.
   // Accumulates one sample into the bin that covers t. Samples outside fFirstTime..fLastTime are ignored.
3375 if (t < fFirstTime)
3376 return;
3377 if (t > fLastTime)
3378 return;
3379
3380 fNumEntries++;
3381
   // linear mapping of t onto the bin range
   // NOTE(review): when fLastTime == fFirstTime, b is 0 and (for the only t that passes the guards)
   // a is 0 as well, so fbin is 0/0 and the conversion to int below is not well defined.
3382 double a = (double)(t - fFirstTime);
3383 double b = (double)(fLastTime - fFirstTime);
3384 double fbin = fNumBins*a/b;
3385
3386 int ibin = (int)fbin;
3387
   // clamp: t == fLastTime maps to fNumBins and is folded into the last bin
3388 if (ibin < 0)
3389 ibin = 0;
3390 else if (ibin >= fNumBins)
3391 ibin = fNumBins-1;
3392
   // fSum0 counts the samples of a bin, so 0 marks the first sample for this bin
3393 if (fSum0[ibin] == 0) {
3394 if (fMin)
3395 fMin[ibin] = v;
3396 if (fMax)
3397 fMax[ibin] = v;
3398 if (fBinsFirstTime)
3399 fBinsFirstTime[ibin] = t;
3400 if (fBinsFirstValue)
3401 fBinsFirstValue[ibin] = v;
3402 if (fBinsLastTime)
3403 fBinsLastTime[ibin] = t;
3404 if (fBinsLastValue)
3405 fBinsLastValue[ibin] = v;
3406 if (fLastTimePtr)
3407 *fLastTimePtr = t;
3408 if (fLastValuePtr)
3409 *fLastValuePtr = v;
3410 }
3411
   // running count, sum and sum of squares for this bin
3412 fSum0[ibin] += 1.0;
3413 fSum1[ibin] += v;
3414 fSum2[ibin] += v*v;
3415
3416 if (fMin)
3417 if (v < fMin[ibin])
3418 fMin[ibin] = v;
3419
3420 if (fMax)
3421 if (v > fMax[ibin])
3422 fMax[ibin] = v;
3423
3424 // NOTE: this assumes t and v are sorted by time.
3425 if (fBinsLastTime)
3426 fBinsLastTime[ibin] = t;
3427 if (fBinsLastValue)
3428 fBinsLastValue[ibin] = v;
3429
   // the overall "last sample" only moves forward in time; the value is updated together with the time
3430 if (fLastTimePtr)
3431 if (t > *fLastTimePtr) {
3432 *fLastTimePtr = t;
3433 if (fLastValuePtr)
3434 *fLastValuePtr = v;
3435 }
3436}
3437
3439{
   // NOTE(review): the header of this definition (listing line 3438) is absent from this listing.
   // Turns the per-bin accumulators into the output arrays that were supplied (count, mean, rms)
   // and zeroes min/max of bins that received no sample.
3440 for (int i=0; i<fNumBins; i++) {
3441 double num = fSum0[i];
3442 double mean = 0;
3443 double variance = 0;
3444 if (num > 0) {
3445 mean = fSum1[i]/num;
   // NOTE(review): listing line 3446 is absent; presumably it computes variance from fSum2 - confirm.
   // As shown here variance stays 0.
3447 }
3448 double rms = 0;
   // the square root is taken only for a positive variance; otherwise rms stays 0
3449 if (variance > 0)
3450 rms = sqrt(variance);
3451
3452 if (fCount)
3453 fCount[i] = (int)num;
3454
3455 if (fMean)
3456 fMean[i] = mean;
3457
3458 if (fRms)
3459 fRms[i] = rms;
3460
3461 if (num == 0) {
3462 if (fMin)
3463 fMin[i] = 0;
3464 if (fMax)
3465 fMax[i] = 0;
3466 }
3467 }
3468}
3469
// Register (or re-register) an event for writing.
// Existing writer entries with the same name are closed and deleted first, then the tag list is
// validated, then a schema is obtained from new_event() and stored in fWriterEvents.
//   event_name - name of the history event
//   timestamp  - passed through to new_event()
//   ntags,tags - description of the event's variables
// Returns HS_SUCCESS, or HS_FILE_ERROR when a tag is invalid, two tag names are equal ignoring
// case, or new_event() returns NULL.
// Note: because deletion happens before validation, a failed call leaves the event undefined.
3470int SchemaHistoryBase::hs_define_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[])
3471{
3472 if (fDebug) {
3473 printf("hs_define_event: event name [%s] with %d tags\n", event_name, ntags);
3474 if (fDebug > 1)
3475 PrintTags(ntags, tags);
3476 }
3477
3478 // delete all events with the same name
3479 for (size_t i=0; i<fWriterEvents.size(); i++)
3480 if (fWriterEvents[i])
3481 if (event_name_cmp(fWriterEvents[i]->fEventName, event_name)==0) {
3482 if (fDebug)
3483 printf("deleting exising event %s\n", event_name);
3484 fWriterEvents[i]->close();
3485 delete fWriterEvents[i];
3486 fWriterEvents[i] = NULL;
3487 }
3488
3489 // check for wrong types etc
3490 for (int i=0; i<ntags; i++) {
3491 if (strlen(tags[i].name) < 1) {
3492 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' has empty name at index %d", event_name, i);
3493 return HS_FILE_ERROR;
3494 }
3495 if (tags[i].name[0] == ' ') {
3496 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' has name \'%s\' starting with a blank", event_name, tags[i].name);
3497 return HS_FILE_ERROR;
3498 }
3499 if (tags[i].type <= 0 || tags[i].type >= TID_LAST) {
3500 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' tag \'%s\' at index %d has invalid type %d",
3501 event_name, tags[i].name, i, tags[i].type);
3502 return HS_FILE_ERROR;
3503 }
3504 if (tags[i].type == TID_STRING) {
3505 cm_msg(MERROR, "hs_define_event",
3506 "Error: History event \'%s\' tag \'%s\' at index %d has forbidden type TID_STRING", event_name,
3507 tags[i].name, i);
3508 return HS_FILE_ERROR;
3509 }
3510 if (tags[i].n_data <= 0) {
3511 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' tag \'%s\' at index %d has invalid n_data %d",
3512 event_name, tags[i].name, i, tags[i].n_data);
3513 return HS_FILE_ERROR;
3514 }
3515 }
3516
3517 // check for duplicate names. Done by sorting, since this takes only O(N*log(N))
   // names are upper-cased first, so the comparison ignores case and the error message shows the upper-cased name
3518 std::vector<std::string> names;
3519 for (int i=0; i<ntags; i++) {
3520 std::string str(tags[i].name);
3521 std::transform(str.begin(), str.end(), str.begin(), ::toupper);
3522 names.push_back(str);
3523 }
3524 std::sort(names.begin(), names.end());
   // after sorting, equal names are adjacent
3525 for (int i=0; i<ntags-1; i++) {
3526 if (names[i] == names[i + 1]) {
3527 cm_msg(MERROR, "hs_define_event",
3528 "Error: History event \'%s\' has duplicate tag name \'%s\'", event_name,
3529 names[i].c_str());
3530 return HS_FILE_ERROR;
3531 }
3532 }
3533
3534 HsSchema* s = new_event(event_name, timestamp, ntags, tags);
3535 if (!s)
3536 return HS_FILE_ERROR;
3537
3538 s->fDisabled = false;
3539
   // NOTE(review): listing line 3540 is absent from this listing.
3541
3542 // find empty slot in events list
3543 for (size_t i=0; i<fWriterEvents.size(); i++)
3544 if (!fWriterEvents[i]) {
3545 fWriterEvents[i] = s;
3546 s = NULL;
3547 break;
3548 }
3549
3550 // if no empty slots, add at the end
3551 if (s)
3552 fWriterEvents.push_back(s);
3553
3554 return HS_SUCCESS;
3555}
3556
// Write one event to the history.
// The event must have been registered in fWriterEvents beforehand. The payload is always written
// with the schema's expected size (fNumBytes): longer input is cut, shorter input is zero-padded.
//   event_name   - name of a previously defined event
//   timestamp    - event time
//   xbuffer_size - payload size in bytes, must be > 0 (asserted)
//   buffer       - payload
// Returns HS_SUCCESS, HS_UNDEFINED_EVENT when the event is unknown, or HS_FILE_ERROR when the event
// is disabled or the write fails (a failed write disables the event).
3557int SchemaHistoryBase::hs_write_event(const char* event_name, time_t timestamp, int xbuffer_size, const char* buffer)
3558{
3559 if (fDebug)
3560 printf("hs_write_event: write event \'%s\', time %d, size %d\n", event_name, (int)timestamp, xbuffer_size);
3561
3562 assert(xbuffer_size > 0);
3563
3564 size_t buffer_size = xbuffer_size;
3565
3566 HsSchema *s = NULL;
3567
3568 // find this event
3569 for (size_t i=0; i<fWriterEvents.size(); i++)
3570 if (fWriterEvents[i] && (event_name_cmp(fWriterEvents[i]->fEventName, event_name)==0)) {
3571 s = fWriterEvents[i];
3572 break;
3573 }
3574
3575 // not found
3576 if (!s)
3577 return HS_UNDEFINED_EVENT;
3578
3579 // deactivated because of error?
3580 if (s->fDisabled) {
3581 //printf("HERE!\n");
3582 return HS_FILE_ERROR;
3583 }
3584
   // the schema pointer may be replaced here; a NULL result is treated as a programming error
3585 s = maybe_reopen(event_name, timestamp, s);
3586
3587 assert(s != NULL);
3588
3589 if (s->fNumBytes == 0) { // compute expected data size
3590 // NB: history data does not have any padding!
3591 for (size_t i=0; i<s->fVariables.size(); i++) {
3592 s->fNumBytes += s->fVariables[i].n_bytes;
3593 }
3594 }
3595
3596 int status;
3597
3598 if (buffer_size > s->fNumBytes) { // too many bytes!
3599 if (s->fCountWriteOversize == 0) {
3600 // only report first occurence
3601 // count of all occurences is reported by HsSchema destructor
3602 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch: expected %zu bytes, got %zu bytes", s->fEventName.c_str(), s->fNumBytes, buffer_size);
3603 }
   // NOTE(review): listing line 3604 is absent; presumably it increments fCountWriteOversize - confirm.
3605 if (buffer_size > s->fWriteMaxSize)
3606 s->fWriteMaxSize = buffer_size;
   // only the first fNumBytes bytes are written
3607 status = s->write_event(timestamp, buffer, s->fNumBytes);
3608 } else if (buffer_size < s->fNumBytes) { // too few bytes
3609 if (s->fCountWriteUndersize == 0) {
3610 // only report first occurence
3611 // count of all occurences is reported by HsSchema destructor
3612 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch: expected %zu bytes, got %zu bytes", s->fEventName.c_str(), s->fNumBytes, buffer_size);
3613 }
   // NOTE(review): listing line 3614 is absent; presumably it increments fCountWriteUndersize - confirm.
3615 if (s->fWriteMinSize == 0)
3616 s->fWriteMinSize = buffer_size;
3617 else if (buffer_size < s->fWriteMinSize)
3618 s->fWriteMinSize = buffer_size;
   // short data is copied into a temporary buffer and padded with zero bytes up to fNumBytes
   // NOTE(review): the malloc() result is used without a NULL check.
3619 char* tmp = (char*)malloc(s->fNumBytes);
3620 memcpy(tmp, buffer, buffer_size);
3621 memset(tmp + buffer_size, 0, s->fNumBytes - buffer_size);
3622 status = s->write_event(timestamp, tmp, s->fNumBytes);
3623 free(tmp);
3624 } else {
3625 assert(buffer_size == s->fNumBytes); // obviously
3626 status = s->write_event(timestamp, buffer, buffer_size);
3627 }
3628
3629 // if could not write event, deactivate it
3630 if (status != HS_SUCCESS) {
3631 s->fDisabled = true;
3632 cm_msg(MERROR, "hs_write_event", "Event \'%s\' disabled after write error %d", event_name, status);
3633 return HS_FILE_ERROR;
3634 }
3635
3636 return HS_SUCCESS;
3637}
3638
3640{
3641 int status = HS_SUCCESS;
3642
3643 if (fDebug)
3644 printf("hs_flush_buffers!\n");
3645
3646 for (size_t i=0; i<fWriterEvents.size(); i++)
3647 if (fWriterEvents[i]) {
3648 int xstatus = fWriterEvents[i]->flush_buffers();
3649 if (xstatus != HS_SUCCESS)
3650 status = xstatus;
3651 }
3652
3653 return status;
3654}
3655
3657// Functions used by mhttpd //
3659
3661{
3662 if (fDebug)
3663 printf("SchemaHistoryBase::hs_clear_cache!\n");
3664
3667
3668 return HS_SUCCESS;
3669}
3670
// Append the names of all events found in the reader schema to *pevents.
//
// A schema is skipped when `t` is non-zero and the schema's fTimeTo is
// non-zero and earlier than `t`.  Names already present in *pevents (as
// judged by event_name_cmp()) are not added twice.  On success the whole
// vector, including any entries the caller passed in, is sorted.
//
// Returns HS_SUCCESS, or the non-success `status` checked near the top.
// NOTE(review): the statement that declares and assigns `status` is not
// visible in this listing, so its origin cannot be documented from here.
3671int SchemaHistoryBase::hs_get_events(time_t t, std::vector<std::string> *pevents)
3672{
3673 if (fDebug)
3674 printf("hs_get_events, time %s\n", TimeToString(t).c_str());
3675
3677 if (status != HS_SUCCESS)
3678 return status;
3679
3680 if (fDebug) {
3681 printf("hs_get_events: available schema:\n");
3682 fReaderSchema.print(false);
3683 }
3684
3685 assert(pevents);
3686
3687 for (size_t i=0; i<fReaderSchema.size(); i++) {
3688 HsSchema* s = fReaderSchema[i];
// schema ended before the requested time: ignore it
3689 if (t && s->fTimeTo && s->fTimeTo < t)
3690 continue;
// linear search of the output for an equivalent event name
3691 bool dupe = false;
3692 for (size_t j=0; j<pevents->size(); j++)
3693 if (event_name_cmp((*pevents)[j], s->fEventName.c_str())==0) {
3694 dupe = true;
3695 break;
3696 }
3697
3698 if (!dupe)
3699 pevents->push_back(s->fEventName);
3700 }
3701
// sorts everything in *pevents, not only the names added above
3702 std::sort(pevents->begin(), pevents->end());
3703
3704 if (fDebug) {
3705 printf("hs_get_events: returning %zu events\n", pevents->size());
3706 for (size_t i=0; i<pevents->size(); i++) {
3707 printf(" %zu: [%s]\n", i, (*pevents)[i].c_str());
3708 }
3709 }
3710
3711 return HS_SUCCESS;
3712}
3713
3714int SchemaHistoryBase::hs_get_tags(const char* event_name, time_t t, std::vector<TAG> *ptags)
3715{
3716 if (fDebug)
3717 printf("hs_get_tags: event [%s], time %s\n", event_name, TimeToString(t).c_str());
3718
3719 assert(ptags);
3720
3721 int status = read_schema(&fReaderSchema, event_name, t);
3722 if (status != HS_SUCCESS)
3723 return status;
3724
3725 bool found_event = false;
3726 for (size_t i=0; i<fReaderSchema.size(); i++) {
3727 HsSchema* s = fReaderSchema[i];
3728 if (t && s->fTimeTo && s->fTimeTo < t)
3729 continue;
3730
3731 if (event_name_cmp(s->fEventName, event_name) != 0)
3732 continue;
3733
3734 found_event = true;
3735
3736 for (size_t i=0; i<s->fVariables.size(); i++) {
3737 const char* tagname = s->fVariables[i].name.c_str();
3738 //printf("event_name [%s], table_name [%s], column name [%s], tag name [%s]\n", event_name, tn.c_str(), cn.c_str(), tagname);
3739
3740 bool dupe = false;
3741
3742 for (size_t k=0; k<ptags->size(); k++)
3743 if (strcasecmp((*ptags)[k].name, tagname) == 0) {
3744 dupe = true;
3745 break;
3746 }
3747
3748 if (!dupe) {
3749 TAG t;
3750 mstrlcpy(t.name, tagname, sizeof(t.name));
3751 t.type = s->fVariables[i].type;
3752 t.n_data = s->fVariables[i].n_data;
3753
3754 ptags->push_back(t);
3755 }
3756 }
3757 }
3758
3759 if (!found_event)
3760 return HS_UNDEFINED_EVENT;
3761
3762 if (fDebug) {
3763 printf("hs_get_tags: event [%s], returning %zu tags\n", event_name, ptags->size());
3764 for (size_t i=0; i<ptags->size(); i++) {
3765 printf(" tag[%zu]: %s[%d] type %d\n", i, (*ptags)[i].name, (*ptags)[i].n_data, (*ptags)[i].type);
3766 }
3767 }
3768
3769 return HS_SUCCESS;
3770}
3771
// For each of the num_var requested variables, report in last_written[]
// the most recent time before `timestamp` at which a matching schema has
// data, as reported by HsSchema::read_last_written().
//
// All last_written[] entries are first reset to 0 and stay 0 when nothing
// is found.  read_schema() is called for every requested event name; the
// first failing status is returned immediately.  Otherwise HS_SUCCESS is
// returned.
//
// NOTE(review): the statement that declares `s` inside the schema loop is
// not visible in this listing; it presumably is fReaderSchema[ss] - confirm.
3772int SchemaHistoryBase::hs_get_last_written(time_t timestamp, int num_var, const char* const event_name[], const char* const var_name[], const int var_index[], time_t last_written[])
3773{
3774 if (fDebug) {
3775 printf("hs_get_last_written: timestamp %s, num_var %d\n", TimeToString(timestamp).c_str(), num_var);
3776 }
3777
3778 for (int j=0; j<num_var; j++) {
3779 last_written[j] = 0;
3780 }
3781
// load schema for every requested event, time 0 is passed as the timestamp
3782 for (int i=0; i<num_var; i++) {
3783 int status = read_schema(&fReaderSchema, event_name[i], 0);
3784 if (status != HS_SUCCESS)
3785 return status;
3786 }
3787
3788 //fReaderSchema.print(false);
3789
3790 for (int i=0; i<num_var; i++) {
3791 for (size_t ss=0; ss<fReaderSchema.size(); ss++) {
3793 // schema is too new: it starts at or after the requested timestamp
3794 if (s->fTimeFrom && s->fTimeFrom >= timestamp)
3795 continue;
3796 // schema starts before the last_written already found for this variable, skip it
3797 if (s->fTimeFrom && s->fTimeFrom < last_written[i])
3798 continue;
3799 // schema for the variables we want?
3800 int sindex = s->match_event_var(event_name[i], var_name[i], var_index[i]);
3801 if (sindex < 0)
3802 continue;
3803
3804 time_t lw = 0;
3805
3806 int status = s->read_last_written(timestamp, fDebug, &lw);
3807
// one lookup is shared: every requested variable that this schema
// matches gets its last_written raised to lw
3808 if (status == HS_SUCCESS && lw != 0) {
3809 for (int j=0; j<num_var; j++) {
3810 int sj = s->match_event_var(event_name[j], var_name[j], var_index[j]);
3811 if (sj < 0)
3812 continue;
3813
3814 if (lw > last_written[j])
3815 last_written[j] = lw;
3816 }
3817 }
3818 }
3819 }
3820
3821 if (fDebug) {
3822 printf("hs_get_last_written: timestamp time %s, num_var %d, result:\n", TimeToString(timestamp).c_str(), num_var);
3823 for (int i=0; i<num_var; i++) {
3824 printf(" event [%s] tag [%s] index [%d] last_written %s\n", event_name[i], var_name[i], var_index[i], TimeToString(last_written[i]).c_str());
3825 }
3826 }
3827
3828 return HS_SUCCESS;
3829}
3830
3832 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3834 int hs_status[])
3835{
3836 if (fDebug)
3837 printf("hs_read_buffer: %d variables, start time %s, end time %s\n", num_var, TimeToString(start_time).c_str(), TimeToString(end_time).c_str());
3838
3839 for (int i=0; i<num_var; i++) {
3840 int status = read_schema(&fReaderSchema, event_name[i], start_time);
3841 if (status != HS_SUCCESS)
3842 return status;
3843 }
3844
3845#if 0
3846 if (fDebug)
3847 fReaderSchema.print(false);
3848#endif
3849
3850 for (int i=0; i<num_var; i++) {
3852 }
3853
3854 //for (size_t ss=0; ss<fReaderSchema.size(); ss++) {
3855 // HsSchema* s = fReaderSchema[ss];
3856 // HsFileSchema* fs = dynamic_cast<HsFileSchema*>(s);
3857 // assert(fs != NULL);
3858 // printf("schema %d from %s to %s, filename %s\n", ss, TimeToString(fs->fTimeFrom).c_str(), TimeToString(fs->fTimeTo).c_str(), fs->fFileName.c_str());
3859 //}
3860
3861 // check that schema are sorted by time
3862
3863#if 0
3864 // check that schema list is sorted by time, descending fTimeFrom, newest schema first
3865 for (size_t ss=0; ss<fReaderSchema.size(); ss++) {
3866 if (fDebug) {
3867 //printf("Check schema %zu/%zu: prev from %s, this from %s to %s, compare %d %d %d\n", ss, fReaderSchema.size(),
3868 // TimeToString(fReaderSchema[ss-1]->fTimeFrom).c_str(),
3869 // TimeToString(fReaderSchema[ss]->fTimeFrom).c_str(),
3870 // TimeToString(fReaderSchema[ss]->fTimeTo).c_str(),
3871 // fReaderSchema[ss-1]->fTimeFrom >= fReaderSchema[ss]->fTimeTo,
3872 // fReaderSchema[ss-1]->fTimeFrom > fReaderSchema[ss]->fTimeFrom,
3873 // (fReaderSchema[ss-1]->fTimeFrom >= fReaderSchema[ss]->fTimeTo) && (fReaderSchema[ss-1]->fTimeFrom > fReaderSchema[ss]->fTimeFrom));
3874 printf("Schema %zu/%zu: from %s to %s, name %s\n", ss, fReaderSchema.size(),
3875 TimeToString(fReaderSchema[ss]->fTimeFrom).c_str(),
3876 TimeToString(fReaderSchema[ss]->fTimeTo).c_str(),
3877 fReaderSchema[ss]->fEventName.c_str());
3878 }
3879
3880 if (ss > 0) {
3881 //if ((fReaderSchema[ss-1]->fTimeFrom >= fReaderSchema[ss]->fTimeTo) && (fReaderSchema[ss-1]->fTimeFrom > fReaderSchema[ss]->fTimeFrom)) {
3882 if ((fReaderSchema[ss-1]->fTimeFrom >= fReaderSchema[ss]->fTimeFrom)) {
3883 // good
3884 } else {
3885 cm_msg(MERROR, "SchemaHistoryBase::hs_read_buffer", "History internal error, schema is not ordered by time. Please report this error to the midas forum.");
3886 return HS_FILE_ERROR;
3887 }
3888 }
3889 }
3890#endif
3891
3892 std::vector<HsSchema*> slist;
3893 std::vector<std::vector<int>> smap;
3894
3895 for (size_t ss=0; ss<fReaderSchema.size(); ss++) {
3897 // schema is too new?
3898 if (s->fTimeFrom && s->fTimeFrom > end_time)
3899 continue;
3900 // schema is too old
3901 if (s->fTimeTo && s->fTimeTo < start_time)
3902 continue;
3903
3904 std::vector<int> sm;
3905
3906 for (int i=0; i<num_var; i++) {
3907 // schema for the variables we want?
3908 int sindex = s->match_event_var(event_name[i], var_name[i], var_index[i]);
3909 if (sindex < 0)
3910 continue;
3911
3912 if (sm.empty()) {
3913 for (int i=0; i<num_var; i++) {
3914 sm.push_back(-1);
3915 }
3916 }
3917
3918 sm[i] = sindex;
3919 }
3920
3921 if (!sm.empty()) {
3922 slist.push_back(s);
3923 smap.push_back(sm);
3924 }
3925 }
3926
3927 if (0||fDebug) {
3928 printf("Found %zu matching schema:\n", slist.size());
3929
3930 for (size_t i=0; i<slist.size(); i++) {
3931 HsSchema* s = slist[i];
3932 s->print();
3933 for (int k=0; k<num_var; k++)
3934 printf(" tag %s[%d] sindex %d\n", var_name[k], var_index[k], smap[i][k]);
3935 }
3936 }
3937
3938 //for (size_t ss=0; ss<slist.size(); ss++) {
3939 // HsSchema* s = slist[ss];
3940 // HsFileSchema* fs = dynamic_cast<HsFileSchema*>(s);
3941 // assert(fs != NULL);
3942 // printf("schema %zu from %s to %s, filename %s", ss, TimeToString(fs->fTimeFrom).c_str(), TimeToString(fs->fTimeTo).c_str(), fs->fFileName.c_str());
3943 // printf(" smap ");
3944 // for (int k=0; k<num_var; k++)
3945 // printf(" %2d", smap[ss][k]);
3946 // printf("\n");
3947 //}
3948
3949 for (size_t ss=1; ss<slist.size(); ss++) {
3950 if (fDebug) {
3951 printf("Check schema %zu/%zu: prev from %s, this from %s to %s, compare %d\n", ss, slist.size(),
3952 TimeToString(slist[ss-1]->fTimeFrom).c_str(),
3953 TimeToString(slist[ss]->fTimeFrom).c_str(),
3954 TimeToString(slist[ss]->fTimeTo).c_str(),
3955 slist[ss-1]->fTimeFrom >= slist[ss]->fTimeFrom);
3956 }
3957 if (slist[ss-1]->fTimeFrom >= slist[ss]->fTimeFrom) {
3958 // good
3959 } else {
3960 cm_msg(MERROR, "SchemaHistoryBase::hs_read_buffer", "History internal error, selected schema is not ordered by time. Please report this error to the midas forum.");
3961 return HS_FILE_ERROR;
3962 }
3963 }
3964
3965 std::vector<time_t> last_time;
3966
3967 for (int i=0; i<num_var; i++) {
3968 last_time.push_back(start_time);
3969 }
3970
3971 if (slist.size() > 0) {
3972 for (size_t i=slist.size()-1; ; i--) {
3973 HsSchema* s = slist[i];
3974
3975 int status = s->read_data(start_time, end_time, num_var, smap[i], var_index, fDebug, last_time, buffer);
3976
3977 if (status == HS_SUCCESS) {
3978 for (int j=0; j<num_var; j++) {
3979 if (smap[i][j] >= 0)
3981 }
3982 }
3983
3984 if (i==0)
3985 break;
3986 }
3987 }
3988
3989 return HS_SUCCESS;
3990}
3991
3993 int num_var,
3994 const char* const event_name[], const char* const var_name[], const int var_index[],
3995 int num_entries[],
3996 time_t* time_buffer[], double* data_buffer[],
3997 int st[])
3998{
3999 int status;
4000
4001 ReadBuffer** buffer = new ReadBuffer*[num_var];
4003
4004 for (int i=0; i<num_var; i++) {
4005 buffer[i] = new ReadBuffer(start_time, end_time, interval);
4006 bi[i] = buffer[i];
4007
4008 // make sure outputs are initialized to something sane
4009 if (num_entries)
4010 num_entries[i] = 0;
4011 if (time_buffer)
4012 time_buffer[i] = NULL;
4013 if (data_buffer)
4014 data_buffer[i] = NULL;
4015 if (st)
4016 st[i] = 0;
4017
4018 if (num_entries)
4019 buffer[i]->fNumEntries = &num_entries[i];
4020 if (time_buffer)
4021 buffer[i]->fTimeBuffer = &time_buffer[i];
4022 if (data_buffer)
4023 buffer[i]->fDataBuffer = &data_buffer[i];
4024 }
4025
4026 status = hs_read_buffer(start_time, end_time,
4027 num_var, event_name, var_name, var_index,
4028 bi, st);
4029
4030 for (int i=0; i<num_var; i++) {
4031 buffer[i]->Finish();
4032 delete buffer[i];
4033 }
4034
4035 delete[] buffer;
4036 delete[] bi;
4037
4038 return status;
4039}
4040
4042 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
4043 int num_entries[],
4044 int* count_bins[], double* mean_bins[], double* rms_bins[], double* min_bins[], double* max_bins[],
4047 time_t last_time[], double last_value[],
4048 int st[])
4049{
4050 int status;
4051
4054
4055 for (int i=0; i<num_var; i++) {
4056 buffer[i] = new MidasHistoryBinnedBuffer(start_time, end_time, num_bins);
4057 xbuffer[i] = buffer[i];
4058
4059 if (count_bins)
4060 buffer[i]->fCount = count_bins[i];
4061 if (mean_bins)
4062 buffer[i]->fMean = mean_bins[i];
4063 if (rms_bins)
4064 buffer[i]->fRms = rms_bins[i];
4065 if (min_bins)
4066 buffer[i]->fMin = min_bins[i];
4067 if (max_bins)
4068 buffer[i]->fMax = max_bins[i];
4069 if (bins_first_time)
4070 buffer[i]->fBinsFirstTime = bins_first_time[i];
4071 if (bins_first_value)
4072 buffer[i]->fBinsFirstValue = bins_first_value[i];
4073 if (bins_last_time)
4074 buffer[i]->fBinsLastTime = bins_last_time[i];
4075 if (bins_last_value)
4076 buffer[i]->fBinsLastValue = bins_last_value[i];
4077 if (last_time)
4078 buffer[i]->fLastTimePtr = &last_time[i];
4079 if (last_value)
4080 buffer[i]->fLastValuePtr = &last_value[i];
4081
4082 buffer[i]->Start();
4083 }
4084
4085 status = hs_read_buffer(start_time, end_time,
4086 num_var, event_name, var_name, var_index,
4087 xbuffer,
4088 st);
4089
4090 for (int i=0; i<num_var; i++) {
4091 buffer[i]->Finish();
4092 if (num_entries)
4093 num_entries[i] = buffer[i]->fNumEntries;
4094 if (0) {
4095 for (int j=0; j<num_bins; j++) {
4096 printf("var %d bin %d count %d, first %s last %s value first %f last %f\n", i, j, count_bins[i][j], TimeToString(bins_first_time[i][j]).c_str(), TimeToString(bins_last_time[i][j]).c_str(), bins_first_value[i][j], bins_last_value[i][j]);
4097 }
4098 }
4099 delete buffer[i];
4100 }
4101
4102 delete[] buffer;
4103 delete[] xbuffer;
4104
4105 return status;
4106}
4107
4109// SQL schema //
4111
4113{
4114 if (!fSql->IsConnected()) {
4115 return HS_SUCCESS;
4116 }
4117
4118 int status = HS_SUCCESS;
4119 if (get_transaction_count() > 0) {
4122 }
4123 return status;
4124}
4125
4126int HsSchema::match_event_var(const char* event_name, const char* var_name, const int var_index)
4127{
4128 if (!MatchEventName(this->fEventName.c_str(), event_name))
4129 return -1;
4130
4131 for (size_t j=0; j<this->fVariables.size(); j++) {
4132 if (MatchTagName(this->fVariables[j].name.c_str(), this->fVariables[j].n_data, var_name, var_index)) {
4133 // Second clause in if() is case where MatchTagName used the "alternate tag name".
4134 // E.g. our variable name is "IM05[3]" (n_data=1), but we're looking for var_name="IM05" and var_index=3.
4135 if (var_index < this->fVariables[j].n_data || (this->fVariables[j].n_data == 1 && this->fVariables[j].name.find("[") != std::string::npos)) {
4136 return j;
4137 }
4138 }
4139 }
4140
4141 return -1;
4142}
4143
4144int HsSqlSchema::match_event_var(const char* event_name, const char* var_name, const int var_index)
4145{
4146 if (event_name_cmp(this->fTableName, event_name)==0) {
4147 for (size_t j=0; j<this->fVariables.size(); j++) {
4148 if (var_name_cmp(this->fColumnNames[j], var_name)==0)
4149 return j;
4150 }
4151 }
4152
4153 return HsSchema::match_event_var(event_name, var_name, var_index);
4154}
4155
4156static HsSqlSchema* NewSqlSchema(HsSchemaVector* sv, const char* table_name, time_t t)
4157{
4158 time_t tt = 0;
4159 int j=-1;
4160 int jjx=-1; // remember oldest schema
4161 time_t ttx = 0;
4162 for (size_t i=0; i<sv->size(); i++) {
4163 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
4164 if (s->fTableName != table_name)
4165 continue;
4166
4167 if (s->fTimeFrom == t) {
4168 return s;
4169 }
4170
4171 // remember the last schema before time t
4172 if (s->fTimeFrom < t) {
4173 if (s->fTimeFrom > tt) {
4174 tt = s->fTimeFrom;
4175 j = i;
4176 }
4177 }
4178
4179 if (jjx < 0) {
4180 jjx = i;
4181 ttx = s->fTimeFrom;
4182 }
4183
4184 if (s->fTimeFrom < ttx) {
4185 jjx = i;
4186 ttx = s->fTimeFrom;
4187 }
4188
4189 //printf("table_name [%s], t=%s, i=%d, j=%d %s, tt=%s, dt is %d\n", table_name, TimeToString(t).c_str(), i, j, TimeToString(s->fTimeFrom).c_str(), TimeToString(tt).c_str(), (int)(s->fTimeFrom-t));
4190 }
4191
4192 //printf("NewSqlSchema: will copy schema j=%d, tt=%d at time %d\n", j, tt, t);
4193
4194 //printf("cloned schema at time %s: ", TimeToString(t).c_str());
4195 //(*sv)[j]->print(false);
4196
4197 //printf("schema before:\n");
4198 //sv->print(false);
4199
4200 if (j >= 0) {
4201 HsSqlSchema* s = new HsSqlSchema;
4202 *s = *(HsSqlSchema*)(*sv)[j]; // make a copy
4203 s->fTimeFrom = t;
4204 sv->add(s);
4205
4206 //printf("schema after:\n");
4207 //sv->print(false);
4208
4209 return s;
4210 }
4211
4212 if (jjx >= 0) {
4213 cm_msg(MERROR, "NewSqlSchema", "Error: Unexpected ordering of schema for table \'%s\', good luck!", table_name);
4214
4215 HsSqlSchema* s = new HsSqlSchema;
4216 *s = *(HsSqlSchema*)(*sv)[jjx]; // make a copy
4217 s->fTimeFrom = t;
4218 s->fTimeTo = ttx;
4219 sv->add(s);
4220
4221 //printf("schema after:\n");
4222 //sv->print(false);
4223
4224 return s;
4225 }
4226
4227 cm_msg(MERROR, "NewSqlSchema", "Error: Cannot clone schema for table \'%s\', good luck!", table_name);
4228 return NULL;
4229}
4230
4232{
4233 assert(fVariables.size() == fColumnInactive.size());
4234 assert(fVariables.size() == fColumnNames.size());
4235 assert(fVariables.size() == fColumnTypes.size());
4236 assert(fVariables.size() == fOffsets.size());
4237
4238 size_t count_active = 0;
4239 size_t count_inactive = 0;
4240
4241 for (size_t i=0; i<fColumnInactive.size(); i++) {
4242 if (fColumnInactive[i])
4243 count_inactive += 1;
4244 else
4245 count_active += 1;
4246 }
4247
4248 //printf("remove_inactive_columns: enter! count_active: %zu, count_inactive: %zu\n", count_active, count_inactive);
4249 //print();
4250
4251 if (count_inactive > 0) {
4252 size_t j=0;
4253
4254 for (size_t i=0; i<fColumnInactive.size(); i++) {
4255 if (fColumnInactive[i]) {
4256 // skip this entry
4257 } else {
4258 if (j != i) {
4263 fOffsets[j] = fOffsets[i];
4264 }
4265 j++;
4266 }
4267 }
4268
4269 //print();
4270 //printf("%zu %zu\n", j, count_active);
4271
4272 assert(j == count_active);
4273
4274 //print();
4275
4276 fVariables.resize(count_active);
4278 fColumnNames.resize(count_active);
4279 fColumnTypes.resize(count_active);
4280 fOffsets.resize(count_active);
4281
4282 assert(fVariables.size() == fColumnInactive.size());
4283 assert(fVariables.size() == fColumnNames.size());
4284 assert(fVariables.size() == fColumnTypes.size());
4285 assert(fVariables.size() == fOffsets.size());
4286
4287 //printf("remove_inactice_columns: exit!\n");
4288 //print();
4289 }
4290}
4291
// Write one event to the SQL table of this schema as a single
// "INSERT INTO <table> (_t_time, _i_time, <columns...>) VALUES (...)".
//
// t         - event time; stored as a local-time string in _t_time and via
//             TimeToString(t) in _i_time
// data      - event payload; the value of column i is read at
//             data + fOffsets[i]
// data_size - size of `data`; every column offset is asserted to be below it
//
// Every column is asserted to have n_data == 1 and a non-empty column name.
// When fSql reports it is connected, the statement is run with Exec()
// (opening a transaction first if the transaction count is 0); otherwise
// it is handed to ExecDisconnected().
//
// Returns HS_SUCCESS, HS_FILE_ERROR for an inactive column or a negative
// offset, or the non-success status of Exec()/ExecDisconnected().
//
// NOTE(review): a few statements of this function are not visible in this
// listing (gaps in the embedded numbering); only the visible ones are
// described here.
4292int HsSqlSchema::write_event(const time_t t, const char* data, const size_t data_size)
4293{
4294 HsSqlSchema* s = this;
4295
// the per-column arrays must all be as long as fVariables
4296 assert(s->fVariables.size() == s->fColumnInactive.size());
4297 assert(s->fVariables.size() == s->fColumnNames.size());
4298 assert(s->fVariables.size() == s->fColumnTypes.size());
4299 assert(s->fVariables.size() == s->fOffsets.size());
4300
// both lists are built with a leading ", " per entry so they can be
// appended directly after the two fixed time columns
4301 std::string tags;
4302 std::string values;
4303
4304 for (size_t i=0; i<s->fVariables.size(); i++) {
4305 // NB: inactive columns should have been removed from the schema. K.O.
4306
4307 if (s->fColumnInactive[i]) {
4308 cm_msg(MERROR, "HsSqlSchema::write_event", "Internal error, unexpected inactive column %zu", i);
4310 return HS_FILE_ERROR;
4311 }
4312
4313 int type = s->fVariables[i].type;
4314 int n_data = s->fVariables[i].n_data;
4315 int offset = s->fOffsets[i];
4316 const char* column_name = s->fColumnNames[i].c_str();
4317
4318 if (offset < 0) {
4319 cm_msg(MERROR, "HsSqlSchema::write_event", "Internal error, unexpected negative offset %d for column %zu", offset, i);
4321 return HS_FILE_ERROR;
4322 }
4323
4324 assert(n_data == 1);
4325 assert(strlen(column_name) > 0);
4326 assert(offset >= 0);
4327 assert((size_t)offset < data_size);
4328
4329 void* ptr = (void*)(data+offset);
4330
4331 tags += ", ";
4332 tags += fSql->QuoteId(column_name);
4333
4334 values += ", ";
4335
4336 char buf[1024];
// j is always 0: only the first element at ptr is formatted
4337 int j=0;
4338
// format the value as SQL text according to its MIDAS type id;
// TID_CHAR, TID_FLOAT and TID_DOUBLE are wrapped in single quotes,
// an unrecognized type is written as the text "unknownType<N>"
4339 switch (type) {
4340 default:
4341 sprintf(buf, "unknownType%d", type);
4342 break;
4343 case TID_BYTE:
4344 sprintf(buf, "%u",((unsigned char *)ptr)[j]);
4345 break;
4346 case TID_SBYTE:
4347 sprintf(buf, "%d",((signed char*)ptr)[j]);
4348 break;
4349 case TID_CHAR:
4350 // FIXME: quotes
4351 sprintf(buf, "\'%c\'",((char*)ptr)[j]);
4352 break;
4353 case TID_WORD:
4354 sprintf(buf, "%u",((unsigned short *)ptr)[j]);
4355 break;
4356 case TID_SHORT:
4357 sprintf(buf, "%d",((short *)ptr)[j]);
4358 break;
4359 case TID_DWORD:
4360 sprintf(buf, "%u",((unsigned int *)ptr)[j]);
4361 break;
4362 case TID_INT:
4363 sprintf(buf, "%d",((int *)ptr)[j]);
4364 break;
4365 case TID_BOOL:
4366 sprintf(buf, "%u",((unsigned int *)ptr)[j]);
4367 break;
4368 case TID_FLOAT:
4369 // FIXME: quotes
4370 sprintf(buf, "\'%.8g\'",((float*)ptr)[j]);
4371 break;
4372 case TID_DOUBLE:
4373 // FIXME: quotes
4374 sprintf(buf, "\'%.16g\'",((double*)ptr)[j]);
4375 break;
4376 }
4377
4378 values += buf;
4379 }
4380
4381 // 2001-02-16 20:38:40.1
4382 struct tm tms;
4383 localtime_r(&t, &tms); // somebody must call tzset() before this.
4384 char buf[1024];
4385 strftime(buf, sizeof(buf)-1, "%Y-%m-%d %H:%M:%S.0", &tms);
4386
4387 std::string cmd;
4388 cmd = "INSERT INTO ";
4389 cmd += fSql->QuoteId(s->fTableName.c_str());
4390 cmd += " (_t_time, _i_time";
4391 cmd += tags;
4392 cmd += ") VALUES (";
4393 cmd += fSql->QuoteString(buf);
4394 cmd += ", ";
4395 cmd += fSql->QuoteString(TimeToString(t).c_str());
4396 cmd += "";
4397 cmd += values;
4398 cmd += ");";
4399
4400 if (fSql->IsConnected()) {
4401 if (s->get_transaction_count() == 0)
4402 fSql->OpenTransaction(s->fTableName.c_str());
4403
4405
4406 int status = fSql->Exec(s->fTableName.c_str(), cmd.c_str());
4407
4408 // mh2sql does not call hs_flush_buffers(),
4409 // so we should flush the transaction by hand
4410 // some SQL engines have limited transaction buffers... K.O.
// note: the commit below runs before the Exec() status is checked
4411 if (s->get_transaction_count() > 100000) {
4412 //printf("flush table %s\n", table_name);
4413 fSql->CommitTransaction(s->fTableName.c_str());
4415 }
4416
4417 if (status != DB_SUCCESS) {
4418 return status;
4419 }
4420 } else {
4421 int status = fSql->ExecDisconnected(s->fTableName.c_str(), cmd.c_str());
4422 if (status != DB_SUCCESS) {
4423 return status;
4424 }
4425 }
4426
4427 return HS_SUCCESS;
4428}
4429
4431 const int debug,
4432 time_t* last_written)
4433{
4434 if (debug)
4435 printf("SqlHistory::read_last_written: table [%s], timestamp %s\n", fTableName.c_str(), TimeToString(timestamp).c_str());
4436
4437 std::string cmd;
4438 cmd += "SELECT _i_time FROM ";
4439 cmd += fSql->QuoteId(fTableName.c_str());
4440 cmd += " WHERE _i_time < ";
4441 cmd += TimeToString(timestamp);
4442 cmd += " ORDER BY _i_time DESC LIMIT 2;";
4443
4444 int status = fSql->Prepare(fTableName.c_str(), cmd.c_str());
4445
4446 if (status != DB_SUCCESS)
4447 return status;
4448
4449 time_t lw = 0;
4450
4451 /* Loop through the rows in the result-set */
4452
4453 while (1) {
4454 status = fSql->Step();
4455 if (status != DB_SUCCESS)
4456 break;
4457
4458 time_t t = fSql->GetTime(0);
4459
4460 if (t >= timestamp)
4461 continue;
4462
4463 if (t > lw)
4464 lw = t;
4465 }
4466
4467 fSql->Finalize();
4468
4469 *last_written = lw;
4470
4471 if (debug)
4472 printf("SqlHistory::read_last_written: table [%s], timestamp %s, last_written %s\n", fTableName.c_str(), TimeToString(timestamp).c_str(), TimeToString(lw).c_str());
4473
4474 return HS_SUCCESS;
4475}
4476
4477int HsSqlSchema::read_data(const time_t start_time,
4478 const time_t end_time,
4479 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
4480 const int debug,
4481 std::vector<time_t>& last_time,
4483{
4484 bool bad_last_time = false;
4485
4486 if (debug)
4487 printf("SqlHistory::read_data: table [%s], start %s, end %s\n", fTableName.c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str());
4488
4489 std::string collist;
4490
4491 for (int i=0; i<num_var; i++) {
4492 int j = var_schema_index[i];
4493 if (j < 0)
4494 continue;
4495 if (collist.length() > 0)
4496 collist += ", ";
4498 }
4499
4500 std::string cmd;
4501 cmd += "SELECT _i_time, ";
4502 cmd += collist;
4503 cmd += " FROM ";
4504 cmd += fSql->QuoteId(fTableName.c_str());
4505 cmd += " WHERE _i_time>=";
4506 cmd += TimeToString(start_time);
4507 cmd += " and _i_time<=";
4508 cmd += TimeToString(end_time);
4509 cmd += " ORDER BY _i_time;";
4510
4511 int status = fSql->Prepare(fTableName.c_str(), cmd.c_str());
4512
4513 if (status != DB_SUCCESS)
4514 return HS_FILE_ERROR;
4515
4516 /* Loop through the rows in the result-set */
4517
4518 int count = 0;
4519
4520 while (1) {
4521 status = fSql->Step();
4522 if (status != DB_SUCCESS)
4523 break;
4524
4525 count++;
4526
4527 time_t t = fSql->GetTime(0);
4528
4529 if (t < start_time || t > end_time)
4530 continue;
4531
4532 int k = 0;
4533
4534 for (int i=0; i<num_var; i++) {
4535 int j = var_schema_index[i];
4536 if (j < 0)
4537 continue;
4538
4539 if (t < last_time[i]) { // protect against duplicate and non-monotonous data
4540 bad_last_time = true;
4541 } else {
4542 double v = fSql->GetDouble(1+k);
4543
4544 //printf("Column %d, index %d, Row %d, time %d, value %f\n", k, colindex[k], count, t, v);
4545
4546 buffer[i]->Add(t, v);
4547 last_time[i] = t;
4548 }
4549
4550 k++;
4551 }
4552 }
4553
4554 fSql->Finalize();
4555
4556 if (bad_last_time) {
4557 cm_msg(MERROR, "SqlHistory::read_data", "Detected duplicate or non-monotonous data in table \"%s\" for start time %s and end time %s", fTableName.c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str());
4558 }
4559
4560 if (debug)
4561 printf("SqlHistory::read_data: table [%s], start %s, end %s, read %d rows\n", fTableName.c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), count);
4562
4563 return HS_SUCCESS;
4564}
4565
4567 if (!fSql || fSql->fTransactionPerTable) {
4569 } else {
4570 return gfTransactionCount[fSql];
4571 }
4572}
4573
4575 if (!fSql || fSql->fTransactionPerTable) {
4577 } else {
4579 }
4580}
4581
4589
4591// SQL history functions //
4593
4594static int StartSqlTransaction(SqlBase* sql, const char* table_name, bool* have_transaction)
4595{
4596 if (*have_transaction)
4597 return HS_SUCCESS;
4598
4599 int status = sql->OpenTransaction(table_name);
4600 if (status != DB_SUCCESS)
4601 return HS_FILE_ERROR;
4602
4603 *have_transaction = true;
4604 return HS_SUCCESS;
4605}
4606
// Create SQL table `table_name` with the two time columns _t_time and
// _i_time, then create one index on each of them, named
// "<table>_i_time_index" and "<table>_t_time_index".
//
// Two CREATE TABLE column definitions are visible: one with
// "DEFAULT CURRENT_TIMESTAMP" / "DEFAULT 0" and one without defaults.
// NOTE(review): the condition selecting between them is not visible in this
// listing; presumably it is `set_default_timestamp` - confirm.  The
// statement that first assigns `status` is not visible either.
//
// Returns the DB_KEY_EXIST status unchanged when Exec() reports it for the
// CREATE TABLE, HS_FILE_ERROR on any other failure, and otherwise the
// status of the last CREATE INDEX (which is DB_SUCCESS at that point).
4607static int CreateSqlTable(SqlBase* sql, const char* table_name, bool* have_transaction, bool set_default_timestamp = false)
4608{
4609 int status;
4610
4612 if (status != DB_SUCCESS)
4613 return HS_FILE_ERROR;
4614
4615 std::string cmd;
4616
4617 cmd = "CREATE TABLE ";
4618 cmd += sql->QuoteId(table_name);
4620 cmd += " (_t_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, _i_time INTEGER NOT NULL DEFAULT 0);";
4621 } else {
4622 cmd += " (_t_time TIMESTAMP NOT NULL, _i_time INTEGER NOT NULL);";
4623 }
4624
4625 status = sql->Exec(table_name, cmd.c_str());
4626
4627
// an already existing table is reported to the caller as is, no indexes are created
4628 if (status == DB_KEY_EXIST) {
4629 cm_msg(MINFO, "CreateSqlTable", "Adding SQL table \"%s\", but it already exists", table_name);
4631 return status;
4632 }
4633
4634 if (status != DB_SUCCESS) {
4635 cm_msg(MINFO, "CreateSqlTable", "Adding SQL table \"%s\", error status %d", table_name, status);
4637 return HS_FILE_ERROR;
4638 }
4639
4640 cm_msg(MINFO, "CreateSqlTable", "Adding SQL table \"%s\"", table_name);
4642
4643 std::string i_index_name;
4644 i_index_name = table_name;
4645 i_index_name += "_i_time_index";
4646
4647 std::string t_index_name;
4648 t_index_name = table_name;
4649 t_index_name += "_t_time_index";
4650
// index on the integer time column, ascending
4651 cmd = "CREATE INDEX ";
4652 cmd += sql->QuoteId(i_index_name.c_str());
4653 cmd += " ON ";
4654 cmd += sql->QuoteId(table_name);
4655 cmd += " (_i_time ASC);";
4656
4657 status = sql->Exec(table_name, cmd.c_str());
4658 if (status != DB_SUCCESS)
4659 return HS_FILE_ERROR;
4660
// index on the timestamp column
4661 cmd = "CREATE INDEX ";
4662 cmd += sql->QuoteId(t_index_name.c_str());
4663 cmd += " ON ";
4664 cmd += sql->QuoteId(table_name);
4665 cmd += " (_t_time);";
4666
4667 status = sql->Exec(table_name, cmd.c_str());
4668 if (status != DB_SUCCESS)
4669 return HS_FILE_ERROR;
4670
4671 return status;
4672}
4673
// Create SQL table `table_name` with a TIMESTAMPTZ _t_time column and an
// INTEGER _i_time column (both with defaults), convert it with
// "SELECT create_hypertable(<table>, '_t_time')", then create one index on
// each time column, named "<table>_i_time_index" and "<table>_t_time_index".
//
// Returns the DB_KEY_EXIST status unchanged when Exec() reports it for the
// CREATE TABLE, HS_FILE_ERROR on any other failure, and otherwise the
// status of the last CREATE INDEX (which is DB_SUCCESS at that point).
//
// NOTE(review): the statement that first assigns `status` is not visible in
// this listing, so what is checked right after the declaration cannot be
// documented from here.
4674static int CreateSqlHyperTable(SqlBase* sql, const char* table_name, bool* have_transaction) {
4675 int status;
4676
4678 if (status != DB_SUCCESS)
4679 return HS_FILE_ERROR;
4680
4681 std::string cmd;
4682
4683 cmd = "CREATE TABLE ";
4684 cmd += sql->QuoteId(table_name);
4685 cmd += " (_t_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, _i_time INTEGER NOT NULL DEFAULT 0);";
4686
4687 status = sql->Exec(table_name, cmd.c_str());
4688
// an already existing table is reported to the caller as is;
// it is neither converted nor indexed here
4689 if (status == DB_KEY_EXIST) {
4690 cm_msg(MINFO, "CreateSqlHyperTable", "Adding SQL table \"%s\", but it already exists", table_name);
4692 return status;
4693 }
4694
4695 if (status != DB_SUCCESS) {
4696 cm_msg(MINFO, "CreateSqlHyperTable", "Adding SQL table \"%s\", error status %d", table_name, status);
4698 return HS_FILE_ERROR;
4699 }
4700
4701 cm_msg(MINFO, "CreateSqlHyperTable", "Adding SQL table \"%s\"", table_name);
4703
// the table name is passed as a quoted string literal here, not as an identifier
4704 cmd = "SELECT create_hypertable(";
4705 cmd += sql->QuoteString(table_name);
4706 cmd += ", '_t_time');";
4707
4708 // convert regular table to hypertable
4709 status = sql->Exec(table_name, cmd.c_str());
4710
4711 if (status != DB_SUCCESS) {
4712 cm_msg(MINFO, "CreateSqlHyperTable", "Converting SQL table to hypertable \"%s\", error status %d", table_name, status);
4714 return HS_FILE_ERROR;
4715 }
4716
4717 std::string i_index_name;
4718 i_index_name = table_name;
4719 i_index_name += "_i_time_index";
4720
4721 std::string t_index_name;
4722 t_index_name = table_name;
4723 t_index_name += "_t_time_index";
4724
// index on the integer time column, ascending
4725 cmd = "CREATE INDEX ";
4726 cmd += sql->QuoteId(i_index_name.c_str());
4727 cmd += " ON ";
4728 cmd += sql->QuoteId(table_name);
4729 cmd += " (_i_time ASC);";
4730
4731 status = sql->Exec(table_name, cmd.c_str());
4732 if (status != DB_SUCCESS)
4733 return HS_FILE_ERROR;
4734
// index on the timestamp column
4735 cmd = "CREATE INDEX ";
4736 cmd += sql->QuoteId(t_index_name.c_str());
4737 cmd += " ON ";
4738 cmd += sql->QuoteId(table_name);
4739 cmd += " (_t_time);";
4740
4741 status = sql->Exec(table_name, cmd.c_str());
4742 if (status != DB_SUCCESS)
4743 return HS_FILE_ERROR;
4744
4745 return status;
4746}
4747
// Add column `column_name` of SQL type `column_type` to table `table_name`
// with "ALTER TABLE ... ADD COLUMN ...".
//
// A transaction is opened through StartSqlTransaction() when
// *have_transaction is not yet set; a failure there is returned as is.
// The table and column names go through QuoteId(); `column_type` is
// appended to the statement verbatim.
//
// Returns the status of the ALTER TABLE Exec() call unchanged, after
// logging it with cm_msg().
4748static int CreateSqlColumn(SqlBase* sql, const char* table_name, const char* column_name, const char* column_type, bool* have_transaction, int debug)
4749{
4750 if (debug)
4751 printf("CreateSqlColumn: table [%s], column [%s], type [%s]\n", table_name, column_name, column_type);
4752
4753 int status = StartSqlTransaction(sql, table_name, have_transaction);
4754 if (status != HS_SUCCESS)
4755 return status;
4756
4757 std::string cmd;
4758 cmd = "ALTER TABLE ";
4759 cmd += sql->QuoteId(table_name);
4760 cmd += " ADD COLUMN ";
4761 cmd += sql->QuoteId(column_name);
4762 cmd += " ";
4763 cmd += column_type;
4764 cmd += ";";
4765
4766 status = sql->Exec(table_name, cmd.c_str());
4767
// logged at MINFO level whether or not the statement succeeded
4768 cm_msg(MINFO, "CreateSqlColumn", "Adding column \"%s\" to SQL table \"%s\", status %d", column_name, table_name, status);
4770
4771 return status;
4772}
4773
4775// SQL history base classes //
4777
4779{
4780public:
4782
4784 {
4785 fSql = NULL;
4787 }
4788
4789 virtual ~SqlHistoryBase() // dtor
4790 {
4791 hs_disconnect();
4792 if (fSql)
4793 delete fSql;
4794 fSql = NULL;
4795 }
4796
4798 {
4799 if (fSql)
4800 fSql->fDebug = debug;
4802 }
4803
4804 int hs_connect(const char* connect_string);
4805 int hs_disconnect();
4806 HsSchema* new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]);
4807 int read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp);
4808 HsSchema* maybe_reopen(const char* event_name, time_t timestamp, HsSchema* s) { return s; };
4809
4810
4811protected:
4813 virtual int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name) = 0;
4814 virtual int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp) = 0;
4815 virtual int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction) = 0;
4816
4817 int update_schema(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable);
4818 int update_schema1(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable, bool* have_transaction);
4819};
4820
4822{
4823 if (fDebug)
4824 printf("hs_connect [%s]!\n", connect_string);
4825
4826 assert(fSql);
4827
4828 if (fSql->IsConnected())
4829 if (strcmp(fConnectString.c_str(), connect_string) == 0)
4830 return HS_SUCCESS;
4831
4832 hs_disconnect();
4833
4834 if (!connect_string || strlen(connect_string) < 1) {
4835 // FIXME: should use "logger dir" or some such default, that code should be in hs_get_history(), not here
4836 connect_string = ".";
4837 }
4838
4840
4841 if (fDebug)
4842 printf("hs_connect: connecting to SQL database \'%s\'\n", fConnectString.c_str());
4843
4844 int status = fSql->Connect(fConnectString.c_str());
4845 if (status != DB_SUCCESS)
4846 return status;
4847
4848 return HS_SUCCESS;
4849}
4850
4852{
4853 if (fDebug)
4854 printf("hs_disconnect!\n");
4855
4857
4858 fSql->Disconnect();
4859
4861
4862 return HS_SUCCESS;
4863}
4864
// Prepare the writer-side schema for one history event and return a private copy of it.
//
// Steps visible here:
//   1. look the event up in fWriterSchema; if absent, create_table() and look it up again
//   2. read_column_names(), then update_schema(write_enable=true) to bring the
//      SQL columns in line with tags[]
//   3. read_column_names() again, then update_schema(write_enable=false) as a
//      read-only verification that the stored schema now matches tags[]
//   4. copy the schema into a HsSqlSchema allocated with new and return it
//
// Returns NULL on any failure. No delete of the returned object is visible in
// this function, so the caller presumably owns it - confirm against callers.
// NOTE(review): listing line 4873 (the statement that sets status when
// fWriterSchema is empty) is absent from this extract.
4865HsSchema* SqlHistoryBase::new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[])
4866{
4867 if (fDebug)
4868 printf("SqlHistory::new_event: event [%s], timestamp %s, ntags %d\n", event_name, TimeToString(timestamp).c_str(), ntags);
4869
4870 int status;
4871
4872 if (fWriterSchema.size() == 0) {
4874 if (status != HS_SUCCESS)
4875 return NULL;
4876 }
4877
4878 HsSqlSchema* s = (HsSqlSchema*)fWriterSchema.find_event(event_name, timestamp);
4879
4880 // schema does not exist, the SQL tables probably do not exist yet
4881
4882 if (!s) {
4883 status = create_table(&fWriterSchema, event_name, timestamp);
4884 if (status != HS_SUCCESS)
4885 return NULL;
4886
4887 s = (HsSqlSchema*)fWriterSchema.find_event(event_name, timestamp);
4888
4889 if (!s) {
4890 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
// NOTE(review): result is discarded; the extra argument 1 presumably turns on
// diagnostic output inside find_event() - confirm
4891 fWriterSchema.find_event(event_name, timestamp, 1);
4892 return NULL;
4893 }
4894 }
4895
4896 assert(s != NULL);
4897
4898 status = read_column_names(&fWriterSchema, s->fTableName.c_str(), s->fEventName.c_str());
4899 if (status != HS_SUCCESS)
4900 return NULL;
4901
// look the schema up again after read_column_names() was given fWriterSchema
4902 s = (HsSqlSchema*)fWriterSchema.find_event(event_name, timestamp);
4903
4904 if (!s) {
4905 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot update schema database for event \'%s\', see previous messages", event_name);
4906 return NULL;
4907 }
4908
4909 if (0||fDebug) {
4910 printf("SqlHistory::new_event: schema for [%s] is %p\n", event_name, s);
4911 if (s)
4912 s->print();
4913 }
4914
// first pass: write_enable=true, columns may be created, reactivated or deactivated
4915 status = update_schema(s, timestamp, ntags, tags, true);
4916 if (status != HS_SUCCESS) {
4917 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
4918 return NULL;
4919 }
4920
4921 status = read_column_names(&fWriterSchema, s->fTableName.c_str(), s->fEventName.c_str());
4922 if (status != HS_SUCCESS)
4923 return NULL;
4924
4925 s = (HsSqlSchema*)fWriterSchema.find_event(event_name, timestamp);
4926
4927 if (!s) {
4928 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot update schema database for event \'%s\', see previous messages", event_name);
4929 return NULL;
4930 }
4931
4932 if (0||fDebug) {
4933 printf("SqlHistory::new_event: schema for [%s] is %p\n", event_name, s);
4934 if (s)
4935 s->print();
4936 }
4937
4938 // last call to update_schema() with write_enable "false" checks that the new schema matches the new tags
4939
4940 status = update_schema(s, timestamp, ntags, tags, false);
4941 if (status != HS_SUCCESS) {
4942 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
4943 //fDebug = 1;
4944 //update_schema(s, timestamp, ntags, tags, false);
4945 //abort();
4946 return NULL;
4947 }
4948
4949 HsSqlSchema* e = new HsSqlSchema();
4950
4951 *e = *s; // make a copy of the schema
4952
4953 return e;
4954}
4955
// Load schema information for event_name into sv.
//
// If event_name is NULL the function returns HS_SUCCESS without reading any
// column names. Otherwise every schema in sv that has no variables yet and
// whose event name matches (MatchEventName) gets its columns loaded through
// read_column_names().
//
// Returns HS_SUCCESS, or the status checked after the (missing) statement
// that runs when sv is empty.
// NOTE(review): listing line 4964 is absent from this extract.
4956int SqlHistoryBase::read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp)
4957{
4958 if (fDebug)
4959 printf("SqlHistory::read_schema: loading schema for event [%s] at time %s\n", event_name, TimeToString(timestamp).c_str());
4960
4961 int status;
4962
4963 if (sv->size() == 0) {
4965 if (status != HS_SUCCESS)
4966 return status;
4967 }
4968
4969 //sv->print(false);
4970
4971 if (event_name == NULL)
4972 return HS_SUCCESS;
4973
4974 for (size_t i=0; i<sv->size(); i++) {
4975 HsSqlSchema* h = (HsSqlSchema*)(*sv)[i];
4976 // skip schema with already read column names
4977 if (h->fVariables.size() > 0)
4978 continue;
4979 // skip schema with different name
4980 if (!MatchEventName(h->fEventName.c_str(), event_name))
4981 continue;
4982
4983 size_t nn = sv->size();
4984
// NOTE(review): this status is never examined; a failed read is silently ignored
4985 status = read_column_names(sv, h->fTableName.c_str(), h->fEventName.c_str());
4986
4987 // if new schema was added, loop all over again
// NOTE(review): the for-loop's i++ runs after this assignment, so the scan
// resumes at index 1, not 0 - confirm that skipping element 0 is intended
4988 if (sv->size() != nn)
4989 i=0;
4990 }
4991
4992 //sv->print(false);
4993
4994 return HS_SUCCESS;
4995}
4996
// Transaction wrapper around update_schema1().
//
// update_schema1() reports through have_transaction whether a transaction was
// opened. If one was, it is closed here: one of two (missing) statements sets
// xstatus depending on whether update_schema1() returned HS_SUCCESS.
//
// Returns HS_FILE_ERROR if closing the transaction did not yield DB_SUCCESS,
// otherwise the status of update_schema1().
// NOTE(review): listing lines 5008 and 5010 are absent from this extract;
// presumably commit on success and rollback on failure - confirm.
4997int SqlHistoryBase::update_schema(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable)
4998{
4999 int status;
5000 bool have_transaction = false;
5001
5002 status = update_schema1(s, timestamp, ntags, tags, write_enable, &have_transaction);
5003
5004 if (have_transaction) {
5005 int xstatus;
5006
5007 if (status == HS_SUCCESS)
5009 else
5011
5012 if (xstatus != DB_SUCCESS) {
5013 return HS_FILE_ERROR;
5014 }
5015 have_transaction = false;
5016 }
5017
5018 return status;
5019}
5020
// Compare the SQL schema s against the MIDAS tags[] and, when write_enable is
// set, modify the SQL side until both agree.
//
// For every tag element (arrays are expanded to "name[j]" / column "name_j"):
//   - an active column with the same tag name and a compatible SQL type is a match
//   - an active column with the same tag name but an incompatible type is
//     deactivated (write_enable only)
//   - if nothing matched, a column recorded under this tag_name with a compatible
//     type is reactivated, otherwise a new column is created, retrying with a
//     modified name on DB_KEY_EXIST (write_enable only)
// Afterwards every column whose variable name is not in tags[] is deactivated
// (write_enable only).
//
// have_transaction is handed to update_column() (and, presumably, to the
// column-creation call on the missing line 5149); the caller closes the transaction.
//
// Returns HS_SUCCESS, the failing status of a helper, or HS_FILE_ERROR when
// write_enable is false and any mismatch was found.
//
// Fix: the "Deactivating SQL column" call passed s->fVariables[i].tag_type,
// indexing the column list with the tag index i. Every other call here uses the
// column index for both tag_name and tag_type; it now uses [j].
//
// NOTE(review): listing lines 5058, 5063, 5072, 5096, 5104, 5110, 5136, 5138,
// 5148, 5149, 5177 and 5215 are absent from this extract and are not reconstructed.
5021int SqlHistoryBase::update_schema1(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable, bool* have_transaction)
5022{
5023 int status;
5024
5025 if (fDebug)
5026 printf("update_schema1\n");
5027
5028 // compare schema with tags[]
5029
5030 bool schema_ok = true;
5031
5032 int offset = 0;
5033 for (int i=0; i<ntags; i++) {
5034 for (unsigned int j=0; j<tags[i].n_data; j++) {
5035 int tagtype = tags[i].type;
5036 std::string tagname = tags[i].name;
5037 std::string maybe_colname = MidasNameToSqlName(tags[i].name);
5038
5039 if (tags[i].n_data > 1) {
// NB: this local buffer shadows the schema parameter s inside this block
5040 char s[256];
5041 sprintf(s, "[%d]", j);
5042 tagname += s;
5043
5044 sprintf(s, "_%d", j);
5045 maybe_colname += s;
5046 }
5047
5048 int count = 0;
5049
// NB: from here on j is the column index; it shadows the array-element index j above
5050 for (size_t j=0; j<s->fVariables.size(); j++) {
5051 // NB: inactive columns will be reactivated or recreated by the if(count==0) branch. K.O.
5052 if (s->fColumnInactive[j])
5053 continue;
5054 if (tagname == s->fVariables[j].name) {
5055 if (s->fSql->TypesCompatible(tagtype, s->fColumnTypes[j].c_str())) {
5056 if (count == 0) {
5057 s->fOffsets[j] = offset;
5059 }
5060 count++;
5061 if (count > 1) {
5062 cm_msg(MERROR, "SqlHistory::update_schema", "Duplicate SQL column \'%s\' type \'%s\' in table \"%s\" with MIDAS type \'%s\' history event \"%s\" tag \"%s\"", s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fTableName.c_str(), rpc_tid_name(tagtype), s->fEventName.c_str(), tagname.c_str());
5064 }
5065 } else {
5066 // column with incompatible type, mark it as unused
5067 schema_ok = false;
5068 if (fDebug)
5069 printf("Incompatible column!\n");
5070 if (write_enable) {
5071 cm_msg(MINFO, "SqlHistory::update_schema", "Deactivating SQL column \'%s\' type \'%s\' in table \"%s\" as incompatible with MIDAS type \'%s\' history event \"%s\" tag \"%s\"", s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fTableName.c_str(), rpc_tid_name(tagtype), s->fEventName.c_str(), tagname.c_str());
5073
// tag_name and tag_type both come from column j (was fVariables[i] for tag_type)
5074 status = update_column(s->fEventName.c_str(), s->fTableName.c_str(), s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fVariables[j].tag_name.c_str(), s->fVariables[j].tag_type.c_str(), timestamp, false, have_transaction);
5075 if (status != HS_SUCCESS)
5076 return status;
5077 }
5078 }
5079 }
5080 }
5081
5082 if (count == 0) {
5083 // tag does not have a corresponding column
5084 schema_ok = false;
5085 if (fDebug)
5086 printf("No column for tag %s!\n", tagname.c_str());
5087
5088 bool found_column = false;
5089
5090 if (write_enable) {
// look for a column recorded under this tag_name (active or not) that can be reactivated
5091 for (size_t j=0; j<s->fVariables.size(); j++) {
5092 if (tagname == s->fVariables[j].tag_name) {
5093 bool typeok = s->fSql->TypesCompatible(tagtype, s->fColumnTypes[j].c_str());
5094 if (typeok) {
5095 cm_msg(MINFO, "SqlHistory::update_schema", "Reactivating SQL column \'%s\' type \'%s\' in table \"%s\" for history event \"%s\" tag \"%s\"", s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fTableName.c_str(), s->fEventName.c_str(), tagname.c_str());
5097
5098 status = update_column(s->fEventName.c_str(), s->fTableName.c_str(), s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fVariables[j].tag_name.c_str(), s->fVariables[j].tag_type.c_str(), timestamp, true, have_transaction);
5099 if (status != HS_SUCCESS)
5100 return status;
5101
5102 if (count == 0) {
5103 s->fOffsets[j] = offset;
5105 }
5106 count++;
5107 found_column = true;
5108 if (count > 1) {
5109 cm_msg(MERROR, "SqlHistory::update_schema", "Duplicate SQL column \'%s\' type \'%s\' in table \"%s\" for history event \"%s\" tag \"%s\"", s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fTableName.c_str(), s->fEventName.c_str(), tagname.c_str());
5111 }
5112 }
5113 }
5114 }
5115 }
5116
5117 // create column
5118 if (!found_column && write_enable) {
5119 std::string col_name = maybe_colname;
5120 const char* col_type = s->fSql->ColumnType(tagtype);
5121
5122 bool dupe = false;
5123 for (size_t kk=0; kk<s->fColumnNames.size(); kk++)
5124 if (s->fColumnNames[kk] == col_name) {
5125 dupe = true;
5126 break;
5127 }
5128
5129 time_t now = time(NULL);
5130
// at most 20 attempts to find a column name the database accepts
5131 bool retry = false;
5132 for (int t=0; t<20; t++) {
5133
5134 // if duplicate column name, change it, try again
5135 if (dupe || retry) {
5137 col_name += "_";
5139 if (t > 0) {
5140 char s[256];
5141 sprintf(s, "_%d", t);
5142 col_name += s;
5143 }
5144 }
5145
5146 if (fDebug)
5147 printf("SqlHistory::update_schema: table [%s], add column [%s] type [%s] for tag [%s]\n", s->fTableName.c_str(), col_name.c_str(), col_type, tagname.c_str());
5150
5151 if (status == DB_KEY_EXIST) {
5152 if (fDebug)
5153 printf("SqlHistory::update_schema: table [%s], add column [%s] type [%s] for tag [%s] failed: duplicate column name\n", s->fTableName.c_str(), col_name.c_str(), col_type, tagname.c_str());
5154 retry = true;
5155 continue;
5156 }
5157
5158 if (status != HS_SUCCESS)
5159 return status;
5160
5161 break;
5162 }
5163
// reached with DB_KEY_EXIST when all 20 attempts collided
5164 if (status != HS_SUCCESS)
5165 return status;
5166
5167 status = update_column(s->fEventName.c_str(), s->fTableName.c_str(), col_name.c_str(), col_type, tagname.c_str(), rpc_tid_name(tagtype), timestamp, true, have_transaction);
5168 if (status != HS_SUCCESS)
5169 return status;
5170 }
5171 }
5172
5173 if (count > 1) {
5174 // schema has duplicate tags
5175 schema_ok = false;
5176 cm_msg(MERROR, "SqlHistory::update_schema", "Duplicate tags or SQL columns for history event \"%s\" tag \"%s\"", s->fEventName.c_str(), tagname.c_str());
5178 }
5179 }
5180 }
5181
5182 // mark as unused all columns not listed in tags
5183
5184 for (size_t k=0; k<s->fColumnNames.size(); k++)
// columns with an empty variable name are skipped
5185 if (s->fVariables[k].name.length() > 0) {
5186 bool found = false;
5187
5188 for (int i=0; i<ntags; i++) {
5189 for (unsigned int j=0; j<tags[i].n_data; j++) {
5190 std::string tagname = tags[i].name;
5191
5192 if (tags[i].n_data > 1) {
5193 char s[256];
5194 sprintf(s, "[%d]", j);
5195 tagname += s;
5196 }
5197
5198 if (s->fVariables[k].name == tagname) {
5199 found = true;
5200 break;
5201 }
5202 }
5203
5204 if (found)
5205 break;
5206 }
5207
5208 if (!found) {
5209 // column not found in tags list
5210 schema_ok = false;
5211 if (fDebug)
5212 printf("Event [%s] Column [%s] tag [%s] not listed in tags list!\n", s->fEventName.c_str(), s->fColumnNames[k].c_str(), s->fVariables[k].name.c_str());
5213 if (write_enable) {
5214 cm_msg(MINFO, "SqlHistory::update_schema", "Deactivating SQL column \'%s\' type \'%s\' in table \"%s\" for history event \"%s\" not used for any tags", s->fColumnNames[k].c_str(), s->fColumnTypes[k].c_str(), s->fTableName.c_str(), s->fEventName.c_str());
5216
5217 status = update_column(s->fEventName.c_str(), s->fTableName.c_str(), s->fColumnNames[k].c_str(), s->fColumnTypes[k].c_str(), s->fVariables[k].tag_name.c_str(), s->fVariables[k].tag_type.c_str(), timestamp, false, have_transaction);
5218 if (status != HS_SUCCESS)
5219 return status;
5220 }
5221 }
5222 }
5223
// in verification mode (write_enable false) any mismatch found above is an error
5224 if (!write_enable)
5225 if (!schema_ok) {
5226 if (fDebug)
5227 printf("Return error!\n");
5228 return HS_FILE_ERROR;
5229 }
5230
5231 return HS_SUCCESS;
5232}
5233
5235// SQLITE functions //
5237
// Read the (event_name, _i_time) rows stored for table_name in the companion
// table "_event_name_<table_name>" and add one HsSqlSchema to sv per row.
//
// Returns the sql->Prepare() status if the query cannot be prepared, otherwise
// HS_SUCCESS. The loop stops at the first sql->Step() status that is not
// DB_SUCCESS; that status and the sql->Finalize() status are not reported.
// NOTE(review): listing lines 5271-5272 (where xevent_name / xevent_time are
// presumably stored into the new schema) are absent from this extract.
5238static int ReadSqliteTableNames(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug)
5239{
5240 if (debug)
5241 printf("ReadSqliteTableNames: table [%s]\n", table_name);
5242
5243 int status;
5244 std::string cmd;
5245
5246 // FIXME: quotes
// table_name is pasted into the statement between hand-written single quotes,
// bypassing sql->QuoteId()/QuoteString()
5247 cmd = "SELECT event_name, _i_time FROM \'_event_name_";
5248 cmd += table_name;
5249 cmd += "\' WHERE table_name='";
5250 cmd += table_name;
5251 cmd += "';";
5252
5253 status = sql->Prepare(table_name, cmd.c_str());
5254
5255 if (status != DB_SUCCESS)
5256 return status;
5257
5258 while (1) {
5259 status = sql->Step();
5260
5261 if (status != DB_SUCCESS)
5262 break;
5263
5264 std::string xevent_name = sql->GetText(0);
5265 time_t xevent_time = sql->GetTime(1);
5266
5267 //printf("read event name [%s] time %s\n", xevent_name.c_str(), TimeToString(xevent_time).c_str());
5268
// the schema object is handed to sv->add(); it is not deleted here
5269 HsSqlSchema* s = new HsSqlSchema;
5270 s->fSql = sql;
5273 s->fTimeTo = 0;
5274 s->fTableName = table_name;
5275 sv->add(s);
5276 }
5277
5278 status = sql->Finalize();
5279
5280 return HS_SUCCESS;
5281}
5282
5283static int ReadSqliteTableSchema(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug)
5284{
5285 if (debug)
5286 printf("ReadSqliteTableSchema: table [%s]\n", table_name);
5287
5288 if (1) {
5289 // seed schema with table names
5290 HsSqlSchema* s = new HsSqlSchema;
5291 s->fSql = sql;
5292 s->fEventName = table_name;
5293 s->fTimeFrom = 0;
5294 s->fTimeTo = 0;
5295 s->fTableName = table_name;
5296 sv->add(s);
5297 }
5298
5299 return ReadSqliteTableNames(sql, sv, table_name, debug);
5300}
5301
5303// SQLITE history classes //
5305
5307{
5308public:
5309 SqliteHistory() { // ctor
5310#ifdef HAVE_SQLITE
5311 fSql = new Sqlite();
5312#endif
5313 }
5314
5316 int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name);
5317 int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp);
5318 int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction);
5319};
5320
5322{
5323 int status;
5324
5325 if (fDebug)
5326 printf("SqliteHistory::read_table_and_event_names!\n");
5327
5328 // loop over all tables
5329
5330 std::vector<std::string> tables;
5332 if (status != DB_SUCCESS)
5333 return status;
5334
5335 for (size_t i=0; i<tables.size(); i++) {
5336 const char* table_name = tables[i].c_str();
5337
5338 const char* s;
5339 s = strstr(table_name, "_event_name_");
5340 if (s == table_name)
5341 continue;
5342 s = strstr(table_name, "_column_names_");
5343 if (s == table_name)
5344 continue;
5345
5346 status = ReadSqliteTableSchema(fSql, sv, table_name, fDebug);
5347 }
5348
5349 return HS_SUCCESS;
5350}
5351
// Fill the schemas in sv that belong to table_name with column information.
//
// Phase 1: list the SQL columns of table_name and append every column not yet
//          known to each matching schema ("_t_time" and "_i_time" are skipped).
// Phase 2: read the rows of "_column_names_<table_name>" in ascending _i_time
//          order and copy tag name and type onto the matching columns of every
//          schema of this table whose fTimeFrom is not earlier than the row's time.
//
// Returns the fSql->Prepare() status if the query cannot be prepared,
// otherwise HS_SUCCESS.
// NOTE(review): listing lines 5394 (declaration of se) and 5440 (definition of
// schema_time) are absent from this extract.
5352int SqliteHistory::read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name)
5353{
5354 if (fDebug)
5355 printf("SqliteHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
5356
5357 // for all schema for table_name, prepopulate them with column names
5358
// NOTE(review): the return value of ListColumns() is not checked
5359 std::vector<std::string> columns;
5360 fSql->ListColumns(table_name, &columns);
5361
5362 // first, populate column names
5363
5364 for (size_t i=0; i<sv->size(); i++) {
5365 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5366
5367 if (s->fTableName != table_name)
5368 continue;
5369
5370 // schema should be empty at this point
5371 //assert(s->fVariables.size() == 0);
5372
// columns is consumed as (name, type) pairs
// NOTE(review): an odd number of entries would make columns[j+1] read past the end
5373 for (size_t j=0; j<columns.size(); j+=2) {
5374 const char* cn = columns[j+0].c_str();
5375 const char* ct = columns[j+1].c_str();
5376
5377 if (strcmp(cn, "_t_time") == 0)
5378 continue;
5379 if (strcmp(cn, "_i_time") == 0)
5380 continue;
5381
5382 bool found = false;
5383
5384 for (size_t k=0; k<s->fColumnNames.size(); k++) {
5385 if (s->fColumnNames[k] == cn) {
5386 found = true;
5387 break;
5388 }
5389 }
5390
5391 //printf("column [%s] sql type [%s]\n", cn.c_str(), ct);
5392
// new column: the five per-column vectors are extended together
5393 if (!found) {
5395 se.name = cn;
5396 se.type = 0;
5397 se.n_data = 1;
5398 se.n_bytes = 0;
5399 s->fVariables.push_back(se);
5400 s->fColumnNames.push_back(cn);
5401 s->fColumnTypes.push_back(ct);
5402 s->fColumnInactive.push_back(false);
5403 s->fOffsets.push_back(-1);
5404 }
5405 }
5406 }
5407
5408 // then read column name information
5409
5410 std::string tn;
5411 tn += "_column_names_";
5412 tn += table_name;
5413
5414 std::string cmd;
5415 cmd = "SELECT column_name, tag_name, tag_type, _i_time FROM ";
5416 cmd += fSql->QuoteId(tn.c_str());
5417 cmd += " WHERE table_name=";
5418 cmd += fSql->QuoteString(table_name);
5419 cmd += " ORDER BY _i_time ASC;";
5420
5421 int status = fSql->Prepare(table_name, cmd.c_str());
5422
5423 if (status != DB_SUCCESS) {
5424 return status;
5425 }
5426
5427 while (1) {
5428 status = fSql->Step();
5429
5430 if (status != DB_SUCCESS)
5431 break;
5432
5433 // NOTE: SQL "SELECT ORDER BY _i_time ASC" returns data sorted by time
5434 // in this code we use the data from the last data row
5435 // so if multiple rows are present, the latest one is used
5436
5437 std::string col_name = fSql->GetText(0);
5438 std::string tag_name = fSql->GetText(1);
5439 std::string tag_type = fSql->GetText(2);
5441
5442 //printf("read table [%s] column [%s] tag name [%s] time %s\n", table_name, col_name.c_str(), tag_name.c_str(), TimeToString(xxx_time).c_str());
5443
5444 // make sure a schema exists at this time point
5445 NewSqlSchema(sv, table_name, schema_time);
5446
5447 // add this information to all schema
5448
5449 for (size_t i=0; i<sv->size(); i++) {
5450 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5451 if (s->fTableName != table_name)
5452 continue;
5453 if (s->fTimeFrom < schema_time)
5454 continue;
5455
5456 //printf("add column to schema %d\n", s->fTimeFrom);
5457
5458 for (size_t j=0; j<s->fColumnNames.size(); j++) {
5459 if (col_name != s->fColumnNames[j])
5460 continue;
5461 s->fVariables[j].name = tag_name;
5462 s->fVariables[j].type = rpc_name_tid(tag_type.c_str());
5463 s->fVariables[j].n_data = 1;
5464 s->fVariables[j].n_bytes = rpc_tid_size(s->fVariables[j].type);
5465 }
5466 }
5467 }
5468
// the Finalize() status is stored but not reported to the caller
5469 status = fSql->Finalize();
5470
5471 return HS_SUCCESS;
5472}
5473
5474int SqliteHistory::create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp)
5475{
5476 if (fDebug)
5477 printf("SqliteHistory::create_table: event [%s], timestamp %s\n", event_name, TimeToString(timestamp).c_str());
5478
5479 int status;
5480 bool have_transaction = false;
5481 std::string table_name = MidasNameToSqlName(event_name);
5482
5483 // FIXME: what about duplicate table names?
5484 status = CreateSqlTable(fSql, table_name.c_str(), &have_transaction);
5485
5486 //if (status == DB_KEY_EXIST) {
5487 // return ReadSqliteTableSchema(fSql, sv, table_name.c_str(), fDebug);
5488 //}
5489
5490 if (status != HS_SUCCESS) {
5491 // FIXME: ???
5492 // FIXME: at least close or revert the transaction
5493 return status;
5494 }
5495
5496 std::string cmd;
5497
5498 std::string en;
5499 en += "_event_name_";
5500 en += table_name;
5501
5502 cmd = "CREATE TABLE ";
5503 cmd += fSql->QuoteId(en.c_str());
5504 cmd += " (table_name TEXT NOT NULL, event_name TEXT NOT NULL, _i_time INTEGER NOT NULL);";
5505
5506 status = fSql->Exec(table_name.c_str(), cmd.c_str());
5507
5508 cmd = "INSERT INTO ";
5509 cmd += fSql->QuoteId(en.c_str());
5510 cmd += " (table_name, event_name, _i_time) VALUES (";
5511 cmd += fSql->QuoteString(table_name.c_str());
5512 cmd += ", ";
5513 cmd += fSql->QuoteString(event_name);
5514 cmd += ", ";
5515 cmd += fSql->QuoteString(TimeToString(timestamp).c_str());
5516 cmd += ");";
5517
5518 status = fSql->Exec(table_name.c_str(), cmd.c_str());
5519
5520 std::string cn;
5521 cn += "_column_names_";
5522 cn += table_name;
5523
5524 cmd = "CREATE TABLE ";
5525 cmd += fSql->QuoteId(cn.c_str());
5526 cmd += " (table_name TEXT NOT NULL, column_name TEXT NOT NULL, tag_name TEXT NOT NULL, tag_type TEXT NOT NULL, column_type TEXT NOT NULL, _i_time INTEGER NOT NULL);";
5527
5528 status = fSql->Exec(table_name.c_str(), cmd.c_str());
5529
5530 status = fSql->CommitTransaction(table_name.c_str());
5531 if (status != DB_SUCCESS) {
5532 return HS_FILE_ERROR;
5533 }
5534
5535 return ReadSqliteTableSchema(fSql, sv, table_name.c_str(), fDebug);
5536}
5537
5538int SqliteHistory::update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction)
5539{
5540 if (fDebug)
5541 printf("SqliteHistory::update_column: event [%s], table [%s], column [%s], new name [%s], timestamp %s\n", event_name, table_name, column_name, tag_name, TimeToString(timestamp).c_str());
5542
5543 int status = StartSqlTransaction(fSql, table_name, have_transaction);
5544 if (status != HS_SUCCESS)
5545 return status;
5546
5547 // FIXME: quotes
5548 std::string cmd;
5549 cmd = "INSERT INTO \'_column_names_";
5550 cmd += table_name;
5551 cmd += "\' (table_name, column_name, tag_name, tag_type, column_type, _i_time) VALUES (\'";
5552 cmd += table_name;
5553 cmd += "\', \'";
5554 cmd += column_name;
5555 cmd += "\', \'";
5556 cmd += tag_name;
5557 cmd += "\', \'";
5558 cmd += tag_type;
5559 cmd += "\', \'";
5560 cmd += column_type;
5561 cmd += "\', \'";
5562 cmd += TimeToString(timestamp);
5563 cmd += "\');";
5564 status = fSql->Exec(table_name, cmd.c_str());
5565
5566 return status;
5567}
5568
5570// Mysql history classes //
5572
5574{
5575public:
5576 MysqlHistory() { // ctor
5577#ifdef HAVE_MYSQL
5578 fSql = new Mysql();
5579#endif
5580 }
5581
5583 int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name);
5584 int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp);
5585 int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction);
5586};
5587
// Read (event_name, table_name, itimestamp) rows from the "_history_index"
// table and add one HsSqlSchema to sv per row.
//
// table_name            - restrict the query to this table; NULL selects every
//                         row with a non-empty table_name
// must_have_event_name,
// must_have_table_name  - used by the consistency check whose conditions are on
//                         missing listing lines; the visible part logs an error
//                         and calls abort() when the expected index entry is missing
// debug                 - the value 999 additionally prints every row and makes the
//                         consistency failure return HS_FILE_ERROR before abort()
//
// Returns the sql->Prepare() status if the query cannot be prepared, otherwise
// HS_SUCCESS (or HS_FILE_ERROR as described above).
// NOTE(review): listing lines 5627, 5629, 5639, 5640, 5642, 5649, 5654 and 5656
// are absent from this extract, so the brace structure below looks unbalanced.
5588static int ReadMysqlTableNames(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug, const char* must_have_event_name, const char* must_have_table_name)
5589{
// NOTE(review): table_name and the must_have_* arguments may be NULL (see the
// if (table_name) test below); passing NULL to %s is undefined behavior
5590 if (debug)
5591 printf("ReadMysqlTableNames: table [%s], must have event [%s] table [%s]\n", table_name, must_have_event_name, must_have_table_name);
5592
5593 int status;
5594 std::string cmd;
5595
5596 if (table_name) {
// table_name is pasted between hand-written single quotes, not via sql->QuoteString()
5597 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name='";
5598 cmd += table_name;
5599 cmd += "';";
5600 } else {
5601 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name!='';";
// from here on table_name only names the table passed to sql->Prepare()
5602 table_name = "_history_index";
5603 }
5604
5605 status = sql->Prepare(table_name, cmd.c_str());
5606
5607 if (status != DB_SUCCESS)
5608 return status;
5609
5610 bool found_must_have_table = false;
5611 int count = 0;
5612
5613 while (1) {
5614 status = sql->Step();
5615
5616 if (status != DB_SUCCESS)
5617 break;
5618
5619 const char* xevent_name = sql->GetText(0);
5620 const char* xtable_name = sql->GetText(1);
5621 time_t xevent_time = sql->GetTime(2);
5622
5623 if (debug == 999) {
5624 printf("entry %d event name [%s] table name [%s] time %s\n", count, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
5625 }
5626
5628 assert(must_have_event_name != NULL);
5630 found_must_have_table = true;
5631 //printf("Found table [%s]: event name [%s] table name [%s] time %s\n", must_have_table_name, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
5632 } else {
5633 //printf("Found correct table [%s] with wrong event name [%s] expected [%s] time %s\n", must_have_table_name, xevent_name, must_have_event_name, TimeToString(xevent_time).c_str());
5634 }
5635 }
5636
// the schema object is handed to sv->add(); it is not deleted here
5637 HsSqlSchema* s = new HsSqlSchema;
5638 s->fSql = sql;
5641 s->fTimeTo = 0;
5643 sv->add(s);
5644 count++;
5645 }
5646
5647 status = sql->Finalize();
5648
5650 cm_msg(MERROR, "ReadMysqlTableNames", "Error: Table [%s] for event [%s] missing from the history index\n", must_have_table_name, must_have_event_name);
5651 if (debug == 999)
5652 return HS_FILE_ERROR;
5653 // NB: recursion is broken by setting debug to 999.
5655 cm_msg(MERROR, "ReadMysqlTableNames", "Error: Cannot continue, nothing will work after this error\n");
5657 abort();
5658 return HS_FILE_ERROR;
5659 }
5660
5661 if (0) {
5662 // print accumulated schema
5663 printf("ReadMysqlTableNames: table_name [%s] event_name [%s] table_name [%s]\n", table_name, must_have_event_name, must_have_table_name);
5664 sv->print(false);
5665 }
5666
5667 return HS_SUCCESS;
5668}
5669
// Fill the schemas in sv that belong to table_name with column information.
//
// Phase 1: list the SQL columns of table_name and append every column not yet
//          known to each matching schema ("_t_time" and "_i_time" are skipped).
// Phase 2: read the "_history_index" rows for event_name and copy tag name, tag
//          type and the active flag onto the matching columns of every schema of
//          this table whose fTimeFrom is not earlier than the row's time.
//          Inactive columns get an empty variable name and fColumnInactive set.
//
// Returns the fSql->Prepare() status if the query cannot be prepared,
// otherwise HS_SUCCESS.
// NOTE(review): listing lines 5712 (declaration of se), 5751 (definition of
// schema_time) and 5799 are absent from this extract.
5670int MysqlHistory::read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name)
5671{
5672 if (fDebug)
5673 printf("MysqlHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
5674
5675 // for all schema for table_name, prepopulate them with column names
5676
// NOTE(review): the return value of ListColumns() is not checked
5677 std::vector<std::string> columns;
5678 fSql->ListColumns(table_name, &columns);
5679
5680 // first, populate column names
5681
5682 for (size_t i=0; i<sv->size(); i++) {
5683 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5684
5685 if (s->fTableName != table_name)
5686 continue;
5687
5688 // schema should be empty at this point
5689 //assert(s->fVariables.size() == 0);
5690
// columns is consumed as (name, type) pairs
// NOTE(review): an odd number of entries would make columns[j+1] read past the end
5691 for (size_t j=0; j<columns.size(); j+=2) {
5692 const char* cn = columns[j+0].c_str();
5693 const char* ct = columns[j+1].c_str();
5694
5695 if (strcmp(cn, "_t_time") == 0)
5696 continue;
5697 if (strcmp(cn, "_i_time") == 0)
5698 continue;
5699
5700 bool found = false;
5701
5702 for (size_t k=0; k<s->fColumnNames.size(); k++) {
5703 if (s->fColumnNames[k] == cn) {
5704 found = true;
5705 break;
5706 }
5707 }
5708
5709 //printf("column [%s] sql type [%s]\n", cn.c_str(), ct);
5710
// new column: the five per-column vectors are extended together
5711 if (!found) {
5713 se.tag_name = cn;
5714 se.tag_type = "";
5715 se.name = cn;
5716 se.type = 0;
5717 se.n_data = 1;
5718 se.n_bytes = 0;
5719 s->fVariables.push_back(se);
5720 s->fColumnNames.push_back(cn);
5721 s->fColumnTypes.push_back(ct);
5722 s->fColumnInactive.push_back(false);
5723 s->fOffsets.push_back(-1);
5724 }
5725 }
5726 }
5727
5728 // then read column name information
5729
5730 std::string cmd;
5731 cmd = "SELECT column_name, column_type, tag_name, tag_type, itimestamp, active FROM _history_index WHERE event_name=";
5732 cmd += fSql->QuoteString(event_name);
5733 cmd += ";";
5734
5735 int status = fSql->Prepare(table_name, cmd.c_str());
5736
5737 if (status != DB_SUCCESS) {
5738 return status;
5739 }
5740
5741 while (1) {
5742 status = fSql->Step();
5743
5744 if (status != DB_SUCCESS)
5745 break;
5746
5747 const char* col_name = fSql->GetText(0);
5748 const char* col_type = fSql->GetText(1);
5749 const char* tag_name = fSql->GetText(2);
5750 const char* tag_type = fSql->GetText(3);
// NOTE(review): col_name and tag_name are NULL-checked below, which suggests
// GetText() can return NULL; active is passed to atoi() before any such check,
// and tag_type is never checked before rpc_name_tid() - confirm
5752 const char* active = fSql->GetText(5);
5753 int iactive = atoi(active);
5754
5755 //printf("read table [%s] column [%s] type [%s] tag name [%s] type [%s] time %s active [%s] %d\n", table_name, col_name, col_type, tag_name, tag_type, TimeToString(schema_time).c_str(), active, iactive);
5756
// rows without a column name or tag name are skipped
5757 if (!col_name)
5758 continue;
5759 if (!tag_name)
5760 continue;
5761 if (strlen(col_name) < 1)
5762 continue;
5763
5764 // make sure a schema exists at this time point
5765 NewSqlSchema(sv, table_name, schema_time);
5766
5767 // add this information to all schema
5768
5769 for (size_t i=0; i<sv->size(); i++) {
5770 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5771 if (s->fTableName != table_name)
5772 continue;
5773 if (s->fTimeFrom < schema_time)
5774 continue;
5775
5776 int tid = rpc_name_tid(tag_type);
5777 int tid_size = rpc_tid_size(tid);
5778
5779 for (size_t j=0; j<s->fColumnNames.size(); j++) {
5780 if (col_name != s->fColumnNames[j])
5781 continue;
5782
5783 s->fVariables[j].tag_name = tag_name;
5784 s->fVariables[j].tag_type = tag_type;
// an inactive column keeps its tag_name but loses its variable name
5785 if (!iactive) {
5786 s->fVariables[j].name = "";
5787 s->fColumnInactive[j] = true;
5788 } else {
5789 s->fVariables[j].name = tag_name;
5790 s->fColumnInactive[j] = false;
5791 }
5792 s->fVariables[j].type = tid;
5793 s->fVariables[j].n_data = 1;
5794 s->fVariables[j].n_bytes = tid_size;
5795
5796 // doctor column names in case MySQL returns different type
5797 // from the type used to create the column, but the types
5798 // are actually the same. K.O.
5800 }
5801 }
5802 }
5803
// the Finalize() status is stored but not reported to the caller
5804 status = fSql->Finalize();
5805
5806 return HS_SUCCESS;
5807}
5808
// Disabled code: everything between #if 0 and #endif is excluded from compilation.
// The function would seed sv with a placeholder schema for table_name (event
// name set to the table name, time range 0..0) and then call
// ReadMysqlTableNames() without the must_have_* consistency arguments.
5809#if 0
5810static int ReadMysqlTableSchema(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug)
5811{
5812 if (debug)
5813 printf("ReadMysqlTableSchema: table [%s]\n", table_name);
5814
5815 if (1) {
5816 // seed schema with table names
5817 HsSqlSchema* s = new HsSqlSchema;
5818 s->fSql = sql;
5819 s->fEventName = table_name;
5820 s->fTimeFrom = 0;
5821 s->fTimeTo = 0;
5822 s->fTableName = table_name;
5823 sv->add(s);
5824 }
5825
5826 return ReadMysqlTableNames(sql, sv, table_name, debug, NULL, NULL);
5827}
5828#endif
5829
5831{
5832 int status;
5833
5834 if (fDebug)
5835 printf("MysqlHistory::read_table_and_event_names!\n");
5836
5837 // loop over all tables
5838
5839 std::vector<std::string> tables;
5841 if (status != DB_SUCCESS)
5842 return status;
5843
5844 for (size_t i=0; i<tables.size(); i++) {
5845 const char* table_name = tables[i].c_str();
5846
5847 const char* s;
5848 s = strstr(table_name, "_history_index");
5849 if (s == table_name)
5850 continue;
5851
5852 if (1) {
5853 // seed schema with table names
5854 HsSqlSchema* s = new HsSqlSchema;
5855 s->fSql = fSql;
5856 s->fEventName = table_name;
5857 s->fTimeFrom = 0;
5858 s->fTimeTo = 0;
5859 s->fTableName = table_name;
5860 sv->add(s);
5861 }
5862 }
5863
5864 if (0) {
5865 // print accumulated schema
5866 printf("read_table_and_event_names:\n");
5867 sv->print(false);
5868 }
5869
5871
5872 return HS_SUCCESS;
5873}
5874
// Create a new SQL table for `event_name` and register it in "_history_index".
//
// The table name is MidasNameToSqlName(event_name), cut to 40 characters plus
// "_T" when longer. Up to `max_attempts` candidate names are tried: a
// DB_KEY_EXIST status makes the loop roll back and retry with a modified name.
// Any other non-success status from the table creation logs an error and calls
// abort().
//
// After the table is created, one row (event_name, table_name, itimestamp,
// active=1) is inserted into "_history_index"; if the first INSERT fails, the
// index table and its columns are created and the INSERT is retried once.
//
// Returns HS_FILE_ERROR if the transaction cannot be opened or committed, or if
// all attempts are exhausted; otherwise returns the result of
// ReadMysqlTableNames() for the new table.
5875int MysqlHistory::create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp)
5876{
5877 if (fDebug)
5878 printf("MysqlHistory::create_table: event [%s], timestamp %s\n", event_name, TimeToString(timestamp).c_str());
5879
5880 int status;
5881 std::string table_name = MidasNameToSqlName(event_name);
5882
5883 // MySQL table name length limit is 64 bytes
// the base name is capped at 40+2 characters, leaving room for the suffixes
// appended below on retries
5884 if (table_name.length() > 40) {
5885 table_name.resize(40);
5886 table_name += "_T";
5887 }
5888
// NOTE(review): `now` is not used on any line visible in this extract;
// presumably it is used on the missing listing line 5904 -- confirm.
5889 time_t now = time(NULL);
5890
5891 int max_attempts = 10;
5892 for (int i=0; i<max_attempts; i++) {
// the transaction is always opened/rolled back/committed under the base
// table_name, not under the per-attempt xtable_name
5893 status = fSql->OpenTransaction(table_name.c_str());
5894 if (status != DB_SUCCESS) {
5895 return HS_FILE_ERROR;
5896 }
5897
5898 bool have_transaction = true;
5899
5900 std::string xtable_name = table_name;
5901
// first attempt uses the plain name; later attempts append "_" plus extra
// text, and from the third attempt on also "_<attempt number>"
5902 if (i>0) {
5903 xtable_name += "_";
// NOTE(review): listing line 5904 is missing here (presumably appends a
// time-derived suffix to xtable_name) -- confirm.
5905 if (i>1) {
5906 xtable_name += "_";
5907 char buf[256];
5908 sprintf(buf, "%d", i);
5909 xtable_name += buf;
5910 }
5911 }
5912
// NOTE(review): listing line 5913 is missing here. `status` is examined right
// below, so the missing statement presumably creates table xtable_name and
// stores the result in `status` -- confirm.
5914
5915 //printf("event [%s] create table [%s] status %d\n", event_name, xtable_name.c_str(), status);
5916
5917 if (status == DB_KEY_EXIST) {
5918 // already exists, try with different name!
5919 fSql->RollbackTransaction(table_name.c_str());
5920 continue;
5921 }
5922
5923 if (status != HS_SUCCESS) {
5924 // MYSQL cannot roll back "create table", if we cannot create SQL tables, nothing will work. Give up now.
// note: the message reports the base table_name, not the xtable_name that
// was actually attempted
5925 cm_msg(MERROR, "MysqlHistory::create_table", "Could not create table [%s] for event [%s], timestamp %s, please fix the SQL database configuration and try again", table_name.c_str(), event_name, TimeToString(timestamp).c_str());
5926 abort();
5927
// the two statements below follow abort() and are therefore never reached
5928 // fatal error, give up!
5929 fSql->RollbackTransaction(table_name.c_str());
5930 break;
5931 }
5932
// register the new table in the index: at most two INSERT attempts, with the
// index table and its columns created between the first and the second
5933 for (int j=0; j<2; j++) {
5934 std::string cmd;
5935 cmd += "INSERT INTO _history_index (event_name, table_name, itimestamp, active) VALUES (";
5936 cmd += fSql->QuoteString(event_name);
5937 cmd += ", ";
5938 cmd += fSql->QuoteString(xtable_name.c_str());
5939 cmd += ", ";
5940 char buf[256];
5941 sprintf(buf, "%.0f", (double)timestamp);
5942 cmd += fSql->QuoteString(buf);
5943 cmd += ", ";
5944 cmd += fSql->QuoteString("1");
5945 cmd += ");";
5946
// note: this `status` shadows the outer one; the results of the CreateSql*()
// calls below are each overwritten by the next call and never examined
5947 int status = fSql->Exec(table_name.c_str(), cmd.c_str());
5948 if (status == DB_SUCCESS)
5949 break;
5950
5951 status = CreateSqlTable(fSql, "_history_index", &have_transaction);
5952 status = CreateSqlColumn(fSql, "_history_index", "event_name", "varchar(256) character set binary not null", &have_transaction, fDebug);
5953 status = CreateSqlColumn(fSql, "_history_index", "table_name", "varchar(256)", &have_transaction, fDebug);
5954 status = CreateSqlColumn(fSql, "_history_index", "tag_name", "varchar(256) character set binary", &have_transaction, fDebug);
5955 status = CreateSqlColumn(fSql, "_history_index", "tag_type", "varchar(256)", &have_transaction, fDebug);
5956 status = CreateSqlColumn(fSql, "_history_index", "column_name", "varchar(256)", &have_transaction, fDebug);
5957 status = CreateSqlColumn(fSql, "_history_index", "column_type", "varchar(256)", &have_transaction, fDebug);
5958 status = CreateSqlColumn(fSql, "_history_index", "itimestamp", "integer not null", &have_transaction, fDebug);
5959 status = CreateSqlColumn(fSql, "_history_index", "active", "boolean", &have_transaction, fDebug);
5960 }
5961
// the commit happens whether or not either INSERT above succeeded
5962 status = fSql->CommitTransaction(table_name.c_str());
5963
5964 if (status != DB_SUCCESS) {
5965 return HS_FILE_ERROR;
5966 }
5967
// re-read the index for the new table, requiring that the row just inserted
// (event_name, xtable_name) is present
5968 return ReadMysqlTableNames(fSql, sv, xtable_name.c_str(), fDebug, event_name, xtable_name.c_str());
5969 }
5970
// reached only when every attempt ended in DB_KEY_EXIST
5971 cm_msg(MERROR, "MysqlHistory::create_table", "Could not create table [%s] for event [%s], timestamp %s, after %d attempts", table_name.c_str(), event_name, TimeToString(timestamp).c_str(), max_attempts);
5972
5973 return HS_FILE_ERROR;
5974}
5975
5976int MysqlHistory::update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction)
5977{
5978 if (fDebug)
5979 printf("MysqlHistory::update_column: event [%s], table [%s], column [%s], type [%s] new name [%s], timestamp %s\n", event_name, table_name, column_name, column_type, tag_name, TimeToString(timestamp).c_str());
5980
5981 std::string cmd;
5982 cmd += "INSERT INTO _history_index (event_name, table_name, tag_name, tag_type, column_name, column_type, itimestamp, active) VALUES (";
5983 cmd += fSql->QuoteString(event_name);
5984 cmd += ", ";
5985 cmd += fSql->QuoteString(table_name);
5986 cmd += ", ";
5987 cmd += fSql->QuoteString(tag_name);
5988 cmd += ", ";
5989 cmd += fSql->QuoteString(tag_type);
5990 cmd += ", ";
5991 cmd += fSql->QuoteString(column_name);
5992 cmd += ", ";
5993 cmd += fSql->QuoteString(column_type);
5994 cmd += ", ";
5995 char buf[256];
5996 sprintf(buf, "%.0f", (double)timestamp);
5997 cmd += fSql->QuoteString(buf);
5998 cmd += ", ";
5999 if (active)
6000 cmd += fSql->QuoteString("1");
6001 else
6002 cmd += fSql->QuoteString("0");
6003 cmd += ");";
6004
6005 int status = fSql->Exec(table_name, cmd.c_str());
6006 if (status != DB_SUCCESS)
6007 return HS_FILE_ERROR;
6008
6009 return HS_SUCCESS;
6010}
6011
6013// PostgreSQL history classes //
6015
6016#ifdef HAVE_PGSQL
6017
// History driver class for PostgreSQL, derived from SqlHistoryBase.
//
// The constructor allocates a Pgsql object and stores it both in fPgsql (typed
// pointer, used where PostgreSQL-specific members are needed) and in fSql.
// No destructor is declared in this class; NOTE(review): confirm where the
// Pgsql object is released.
6018class PgsqlHistory: public SqlHistoryBase
6019{
6020public:
6021 Pgsql *fPgsql = NULL;
6022public:
6023 PgsqlHistory() { // ctor
6024 fPgsql = new Pgsql();
6025 fSql = fPgsql;
6026 }
6027
// NOTE(review): listing line 6028 is missing here. This file defines
// PgsqlHistory::read_table_and_event_names(HsSchemaVector*) outside the class,
// so its declaration is presumably the missing line -- confirm.
6029 int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name);
6030 int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp);
6031 int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction);
6032};
6033
// Read (event_name, table_name, itimestamp) rows from "_history_index" and add
// one HsSqlSchema per row to `sv`.
//
// table_name            - restrict the query to this table; when NULL, all rows
//                         with a non-empty table_name are read
// debug                 - enables the debug printout; the value 999 additionally
//                         prints every row and changes the error path below
// must_have_event_name,
// must_have_table_name  - used by the (partly missing) check that a given
//                         table/event pair is present in the index
//
// Returns the sql->Prepare() status if it is not DB_SUCCESS, HS_FILE_ERROR from
// the "missing from the history index" path, otherwise HS_SUCCESS.
//
// NOTE(review): several listing lines are missing from this extract (6073,
// 6075, 6085, 6086, 6088, 6095, 6100, 6102); they are marked below. The brace
// structure of the visible text is therefore incomplete.
6034static int ReadPgsqlTableNames(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug, const char* must_have_event_name, const char* must_have_table_name)
6035{
// NOTE(review): this function is called with NULL for table_name and both
// must_have_* arguments (see PgsqlHistory::read_table_and_event_names); with
// debug enabled those NULLs are passed to "%s" here -- confirm this is
// acceptable on all supported platforms.
6036 if (debug)
6037 printf("ReadPgsqlTableNames: table [%s], must have event [%s] table [%s]\n", table_name, must_have_event_name, must_have_table_name);
6038
6039 int status;
6040 std::string cmd;
6041
// NOTE(review): table_name is concatenated into the SQL text without going
// through QuoteString(); confirm callers only pass sanitized table names.
6042 if (table_name) {
6043 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name='";
6044 cmd += table_name;
6045 cmd += "';";
6046 } else {
6047 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name!='';";
// from here on table_name only serves as the first argument of Prepare()
6048 table_name = "_history_index";
6049 }
6050
6051 status = sql->Prepare(table_name, cmd.c_str());
6052
6053 if (status != DB_SUCCESS)
6054 return status;
6055
6056 bool found_must_have_table = false;
6057 int count = 0;
6058
// iterate over result rows until Step() stops returning DB_SUCCESS
6059 while (1) {
6060 status = sql->Step();
6061
6062 if (status != DB_SUCCESS)
6063 break;
6064
6065 const char* xevent_name = sql->GetText(0);
6066 const char* xtable_name = sql->GetText(1);
6067 time_t xevent_time = sql->GetTime(2);
6068
6069 if (debug == 999) {
6070 printf("entry %d event name [%s] table name [%s] time %s\n", count, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
6071 }
6072
// NOTE(review): listing lines 6073 and 6075 are missing around the assert
// below; they presumably hold the conditions comparing this row against
// must_have_table_name / must_have_event_name -- confirm.
6074 assert(must_have_event_name != NULL);
6076 found_must_have_table = true;
6077 //printf("Found table [%s]: event name [%s] table name [%s] time %s\n", must_have_table_name, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
6078 } else {
6079 //printf("Found correct table [%s] with wrong event name [%s] expected [%s] time %s\n", must_have_table_name, xevent_name, must_have_event_name, TimeToString(xevent_time).c_str());
6080 }
6081 }
6082
// one schema object per index row
6083 HsSqlSchema* s = new HsSqlSchema;
6084 s->fSql = sql;
// NOTE(review): listing lines 6085, 6086 and 6088 are missing here; they
// presumably fill the remaining schema fields from xevent_name, xtable_name
// and xevent_time -- confirm.
6087 s->fTimeTo = 0;
6089 sv->add(s);
6090 count++;
6091 }
6092
// the Finalize() status is stored but not examined
6093 status = sql->Finalize();
6094
// NOTE(review): listing line 6095 is missing here; it presumably opens the
// "required table was not found" condition that the closing brace at listing
// line 6105 belongs to -- confirm.
6096 cm_msg(MERROR, "ReadPgsqlTableNames", "Error: Table [%s] for event [%s] missing from the history index\n", must_have_table_name, must_have_event_name);
6097 if (debug == 999)
6098 return HS_FILE_ERROR;
6099 // NB: recursion is broken by setting debug to 999.
// NOTE(review): listing lines 6100 and 6102 are missing here; given the comment
// above, one of them is presumably a recursive call with debug set to 999 --
// confirm.
6101 cm_msg(MERROR, "ReadPgsqlTableNames", "Error: Cannot continue, nothing will work after this error\n");
6103 abort();
// not reached: follows abort()
6104 return HS_FILE_ERROR;
6105 }
6106
// debugging aid, disabled
6107 if (0) {
6108 // print accumulated schema
6109 printf("ReadPgsqlTableNames: table_name [%s] event_name [%s] table_name [%s]\n", table_name, must_have_event_name, must_have_table_name);
6110 sv->print(false);
6111 }
6112
6113 return HS_SUCCESS;
6114}
6115
// Fill in column and tag information for every schema in `sv` that belongs to
// `table_name`.
//
// Step 1: the column list of the SQL table is read with fSql->ListColumns();
//         each column not yet known to a schema is appended to it with
//         placeholder values (type 0, n_bytes 0, offset -1).
// Step 2: the rows of "_history_index" for `event_name` are read and, for each
//         row, the matching column of every schema whose fTimeFrom is not
//         earlier than the row's timestamp is updated with tag name, tag type,
//         type id, size and active flag.
//
// Returns the fSql->Prepare() status if it is not DB_SUCCESS, otherwise
// HS_SUCCESS. The ListColumns() and Finalize() results are not examined.
6116int PgsqlHistory::read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name)
6117{
6118 if (fDebug)
6119 printf("PgsqlHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
6120
6121 // for all schema for table_name, prepopulate it with column names
6122
6123 std::vector<std::string> columns;
6124 fSql->ListColumns(table_name, &columns);
6125
6126 // first, populate column names
6127
6128 for (size_t i=0; i<sv->size(); i++) {
6129 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
6130
6131 if (s->fTableName != table_name)
6132 continue;
6133
6134 // schema should be empty at this point
6135 //assert(s->fVariables.size() == 0);
6136
// `columns` is consumed as consecutive (name, type) pairs.
// NOTE(review): columns[j+1] is read without checking that the vector has an
// even number of entries -- confirm ListColumns() guarantees this.
6137 for (size_t j=0; j<columns.size(); j+=2) {
6138 const char* cn = columns[j+0].c_str();
6139 const char* ct = columns[j+1].c_str();
6140
// the two time columns are not history variables
6141 if (strcmp(cn, "_t_time") == 0)
6142 continue;
6143 if (strcmp(cn, "_i_time") == 0)
6144 continue;
6145
6146 bool found = false;
6147
6148 for (size_t k=0; k<s->fColumnNames.size(); k++) {
6149 if (s->fColumnNames[k] == cn) {
6150 found = true;
6151 break;
6152 }
6153 }
6154
// unknown column: add it with placeholder tag information; the real values
// are filled in from the history index in the second step
6155 if (!found) {
// NOTE(review): listing line 6156 is missing here; it presumably declares the
// local `se` (the element type of s->fVariables) -- confirm.
6157 se.tag_name = cn;
6158 se.tag_type = "";
6159 se.name = cn;
6160 se.type = 0;
6161 se.n_data = 1;
6162 se.n_bytes = 0;
6163 s->fVariables.push_back(se);
6164 s->fColumnNames.push_back(cn);
6165 s->fColumnTypes.push_back(ct);
6166 s->fColumnInactive.push_back(false);
6167 s->fOffsets.push_back(-1);
6168 }
6169 }
6170 }
6171
6172 // then read column name information
6173
6174 std::string cmd;
6175 cmd = "SELECT column_name, column_type, tag_name, tag_type, itimestamp, active FROM _history_index WHERE event_name=";
6176 cmd += fSql->QuoteString(event_name);
6177 cmd += ";";
6178
6179 int status = fSql->Prepare(table_name, cmd.c_str());
6180
6181 if (status != DB_SUCCESS) {
6182 return status;
6183 }
6184
// iterate over index rows until Step() stops returning DB_SUCCESS
6185 while (1) {
6186 status = fSql->Step();
6187
6188 if (status != DB_SUCCESS)
6189 break;
6190
6191 const char* col_name = fSql->GetText(0);
6192 const char* col_type = fSql->GetText(1);
6193 const char* tag_name = fSql->GetText(2);
6194 const char* tag_type = fSql->GetText(3);
6195 time_t schema_time = fSql->GetTime(4);
6196 const char* active = fSql->GetText(5);
// NOTE(review): `active` is handed to atoi() (and `tag_type` to rpc_name_tid()
// further down) without a NULL check, while col_name and tag_name are checked
// for NULL below -- confirm GetText() cannot return NULL for these columns.
6197 int iactive = atoi(active);
6198
6199 //printf("read table [%s] column [%s] type [%s] tag name [%s] type [%s] time %s active [%s] %d\n", table_name, col_name, col_type, tag_name, tag_type, TimeToString(schema_time).c_str(), active, iactive);
6200
// rows without a column name or tag name carry no column information; skip
6201 if (!col_name)
6202 continue;
6203 if (!tag_name)
6204 continue;
6205 if (strlen(col_name) < 1)
6206 continue;
6207
6208 // make sure a schema exists at this time point
6209 NewSqlSchema(sv, table_name, schema_time);
6210
6211 // add this information to all schema
6212 for (size_t i=0; i<sv->size(); i++) {
6213 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
6214 if (s->fTableName != table_name)
6215 continue;
// schemas that start before this row's timestamp are left untouched
6216 if (s->fTimeFrom < schema_time)
6217 continue;
6218
6219 int tid = rpc_name_tid(tag_type);
6220 int tid_size = rpc_tid_size(tid);
6221
6222 for (size_t j=0; j<s->fColumnNames.size(); j++) {
6223 if (col_name != s->fColumnNames[j])
6224 continue;
6225
6226 s->fVariables[j].tag_name = tag_name;
6227 s->fVariables[j].tag_type = tag_type;
// an inactive column keeps its tag name but gets an empty variable name
6228 if (!iactive) {
6229 s->fVariables[j].name = "";
6230 s->fColumnInactive[j] = true;
6231 } else {
6232 s->fVariables[j].name = tag_name;
6233 s->fColumnInactive[j] = false;
6234 }
6235 s->fVariables[j].type = tid;
6236 s->fVariables[j].n_data = 1;
6237 s->fVariables[j].n_bytes = tid_size;
6238
6239 // doctor column names in case MySQL returns different type
6240 // from the type used to create the column, but the types
6241 // are actually the same. K.O.
// NOTE(review): listing line 6242 is missing here; given the comment above and
// the otherwise unused `col_type`, it presumably overwrites the stored column
// type with col_type -- confirm.
6243 }
6244 }
6245 }
6246
// the Finalize() status is stored but not examined
6247 status = fSql->Finalize();
6248
6249 return HS_SUCCESS;
6250}
6251
6252int PgsqlHistory::read_table_and_event_names(HsSchemaVector *sv)
6253{
6254 int status;
6255
6256 if (fDebug)
6257 printf("PgsqlHistory::read_table_and_event_names!\n");
6258
6259 // loop over all tables
6260
6261 std::vector<std::string> tables;
6262 status = fSql->ListTables(&tables);
6263 if (status != DB_SUCCESS)
6264 return status;
6265
6266 for (size_t i=0; i<tables.size(); i++) {
6267 const char* table_name = tables[i].c_str();
6268
6269 const char* s;
6270 s = strstr(table_name, "_history_index");
6271 if (s == table_name)
6272 continue;
6273
6274 if (1) {
6275 // seed schema with table names
6276 HsSqlSchema* s = new HsSqlSchema;
6277 s->fSql = fSql;
6278 s->fEventName = table_name;
6279 s->fTimeFrom = 0;
6280 s->fTimeTo = 0;
6281 s->fTableName = table_name;
6282 sv->add(s);
6283 }
6284 }
6285
6286 if (0) {
6287 // print accumulated schema
6288 printf("read_table_and_event_names:\n");
6289 sv->print(false);
6290 }
6291
6292 status = ReadPgsqlTableNames(fSql, sv, NULL, fDebug, NULL, NULL);
6293
6294 return HS_SUCCESS;
6295}
6296
// Create a new SQL table for `event_name` and register it in "_history_index".
//
// The table name is MidasNameToSqlName(event_name), cut to 40 characters plus
// "_T" when longer. Up to `max_attempts` candidate names are tried; any
// non-success status from the table creation rolls the transaction back and
// retries with a modified name.
//
// After the table is created, a savepoint "t0" is set and one row
// (event_name, table_name, itimestamp, active=1) is inserted into
// "_history_index"; if the INSERT fails, the transaction is rolled back to the
// savepoint, the index table and its columns are created, the transaction is
// committed and the INSERT is retried once.
//
// Returns HS_FILE_ERROR if the transaction cannot be opened or all attempts are
// exhausted; otherwise returns the result of ReadPgsqlTableNames() for the new
// table.
6297int PgsqlHistory::create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp)
6298{
6299 if (fDebug)
6300 printf("PgsqlHistory::create_table: event [%s], timestamp %s\n", event_name, TimeToString(timestamp).c_str());
6301
6302 int status;
6303 std::string table_name = MidasNameToSqlName(event_name);
6304
6305 // PostgreSQL table name length limit is 64 bytes
// the base name is capped at 40+2 characters, leaving room for the suffixes
// appended below on retries
6306 if (table_name.length() > 40) {
6307 table_name.resize(40);
6308 table_name += "_T";
6309 }
6310
// NOTE(review): `now` is not used on any line visible in this extract;
// presumably it is used on the missing listing line 6326 -- confirm.
6311 time_t now = time(NULL);
6312
6313 int max_attempts = 10;
6314 for (int i=0; i<max_attempts; i++) {
// the transaction is always opened/rolled back/committed under the base
// table_name, not under the per-attempt xtable_name
6315 status = fSql->OpenTransaction(table_name.c_str());
6316 if (status != DB_SUCCESS) {
6317 return HS_FILE_ERROR;
6318 }
6319
6320 bool have_transaction = true;
6321
6322 std::string xtable_name = table_name;
6323
// first attempt uses the plain name; later attempts append "_" plus extra
// text, and from the third attempt on also "_<attempt number>"
6324 if (i>0) {
6325 xtable_name += "_";
// NOTE(review): listing line 6326 is missing here (presumably appends a
// time-derived suffix to xtable_name) -- confirm.
6327 if (i>1) {
6328 xtable_name += "_";
6329 char buf[256];
6330 sprintf(buf, "%d", i);
6331 xtable_name += buf;
6332 }
6333 }
6334
// NOTE(review): listing lines 6336 and 6338 (the two branch bodies of this
// if/else) are missing. `status` is examined right below, so both branches
// presumably create table xtable_name -- in a different way when
// fPgsql->fDownsample is set -- and store the result in `status`; confirm.
6335 if (fPgsql->fDownsample)
6337 else
6339
6340 //printf("event [%s] create table [%s] status %d\n", event_name, xtable_name.c_str(), status);
6341
6342 if (status == DB_KEY_EXIST) {
6343 // already exists, try with different name!
6344 fSql->RollbackTransaction(table_name.c_str());
6345 continue;
6346 }
6347
// unlike the MySQL variant, any other failure is also retried with the next
// candidate name instead of aborting
6348 if (status != HS_SUCCESS) {
6349 fSql->RollbackTransaction(table_name.c_str());
6350 continue;
6351 }
6352
// savepoint to return to if the INSERT below fails; the Exec() result is not
// examined
6353 fSql->Exec(table_name.c_str(), "SAVEPOINT t0");
6354
6355 for (int j=0; j<2; j++) {
6356 std::string cmd;
6357 cmd += "INSERT INTO _history_index (event_name, table_name, itimestamp, active) VALUES (";
6358 cmd += fSql->QuoteString(event_name);
6359 cmd += ", ";
6360 cmd += fSql->QuoteString(xtable_name.c_str());
6361 cmd += ", ";
6362 char buf[256];
6363 sprintf(buf, "%.0f", (double)timestamp);
// the timestamp is inserted as an unquoted number
6364 cmd += buf;
6365 cmd += ", ";
6366 cmd += fSql->QuoteString("1");
6367 cmd += ");";
6368
// note: this `status` shadows the outer one; the results of the CreateSql*()
// calls and of CommitTransaction() below go into the inner variable only
6369 int status = fSql->Exec(table_name.c_str(), cmd.c_str());
6370 if (status == DB_SUCCESS)
6371 break;
6372
6373 // if INSERT failed _history_index does not exist then recover to savepoint t0
6374 // to prevent whole transaction abort
6375 fSql->Exec(table_name.c_str(), "ROLLBACK TO SAVEPOINT t0");
6376
6377 status = CreateSqlTable(fSql, "_history_index", &have_transaction, true);
6378 status = CreateSqlColumn(fSql, "_history_index", "event_name", "text not null", &have_transaction, fDebug);
6379 status = CreateSqlColumn(fSql, "_history_index", "table_name", "text", &have_transaction, fDebug);
6380 status = CreateSqlColumn(fSql, "_history_index", "tag_name", "text", &have_transaction, fDebug);
6381 status = CreateSqlColumn(fSql, "_history_index", "tag_type", "text", &have_transaction, fDebug);
6382 status = CreateSqlColumn(fSql, "_history_index", "column_name", "text", &have_transaction, fDebug);
6383 status = CreateSqlColumn(fSql, "_history_index", "column_type", "text", &have_transaction, fDebug);
6384 status = CreateSqlColumn(fSql, "_history_index", "itimestamp", "integer not null", &have_transaction, fDebug);
6385 status = CreateSqlColumn(fSql, "_history_index", "active", "smallint", &have_transaction, fDebug);
6386
6387 status = fSql->CommitTransaction(table_name.c_str());
6388 }
6389
// NOTE(review): two things to confirm here.
//  - The only CommitTransaction() call visible in this function is inside the
//    loop above, after the point where a successful INSERT leaves the loop via
//    `break`; on that path no commit is issued by the visible code.
//  - The `status` tested below is the outer variable, i.e. the result of the
//    table creation, which is already known to be HS_SUCCESS at this point;
//    the INSERT and commit results live in the shadowing inner variable.
6390 if (status != DB_SUCCESS) {
6391 return HS_FILE_ERROR;
6392 }
6393
// re-read the index for the new table, requiring that the row just inserted
// (event_name, xtable_name) is present
6394 return ReadPgsqlTableNames(fSql, sv, xtable_name.c_str(), fDebug, event_name, xtable_name.c_str());
6395 }
6396
// reached only when every attempt failed to create a table
6397 cm_msg(MERROR, "PgsqlHistory::create_table", "Could not create table [%s] for event [%s], timestamp %s, after %d attempts", table_name.c_str(), event_name, TimeToString(timestamp).c_str(), max_attempts);
6398
6399 return HS_FILE_ERROR;
6400}
6401
6402int PgsqlHistory::update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction)
6403{
6404 if (fDebug)
6405 printf("PgsqlHistory::update_column: event [%s], table [%s], column [%s], type [%s] new name [%s], timestamp %s\n", event_name, table_name, column_name, column_type, tag_name, TimeToString(timestamp).c_str());
6406
6407 std::string cmd;
6408 cmd += "INSERT INTO _history_index (event_name, table_name, tag_name, tag_type, column_name, column_type, itimestamp, active) VALUES (";
6409 cmd += fSql->QuoteString(event_name);
6410 cmd += ", ";
6411 cmd += fSql->QuoteString(table_name);
6412 cmd += ", ";
6413 cmd += fSql->QuoteString(tag_name);
6414 cmd += ", ";
6415 cmd += fSql->QuoteString(tag_type);
6416 cmd += ", ";
6417 cmd += fSql->QuoteString(column_name);
6418 cmd += ", ";
6419 cmd += fSql->QuoteString(column_type);
6420 cmd += ", ";
6421 char buf[256];
6422 sprintf(buf, "%.0f", (double)timestamp);
6423 cmd += buf;
6424 cmd += ", ";
6425 if (active)
6426 cmd += fSql->QuoteString("1");
6427 else
6428 cmd += fSql->QuoteString("0");
6429 cmd += ");";
6430
6431 int status = fSql->Exec(table_name, cmd.c_str());
6432 if (status != DB_SUCCESS)
6433 return HS_FILE_ERROR;
6434
6435 return HS_SUCCESS;
6436}
6437
6438#endif // HAVE_PGSQL
6439
6441// File history class //
6443
// Time spans in seconds: one day, and a "month" defined as 30 days.
6444const time_t kDay = 24*60*60;
6445const time_t kMonth = 30*kDay;
6446
// Binary size units as doubles: KiB is 1024 and MiB is 1024*1024.
6447const double KiB = 1024;
6448const double MiB = KiB*KiB;
6449//const double GiB = KiB*MiB;
6450
// NOTE(review): the class header (listing line 6451) is missing from this
// extract; the constructor below shows the class is named FileHistory. Its base
// class, if any, is not visible here -- confirm.
//
// File-based history driver: keeps the history directory path and a sorted list
// of history file names together with a per-file "already read" flag.
6452{
6453protected:
6454 std::string fPath;
// NOTE(review): listing line 6455 is missing here; the member functions in this
// file use fPathLastMtime, which is presumably declared on it -- confirm.
// fSortedFiles and fSortedRead are parallel vectors: fSortedRead[i] tells
// whether fSortedFiles[i] has already been read.
6456 std::vector<std::string> fSortedFiles;
6457 std::vector<bool> fSortedRead;
// NOTE(review): listing lines 6458 and 6459 are missing here; new_event() uses
// fConfMaxFileAge and fConfMaxFileSize, which are presumably declared on them
// -- confirm.
6460
6461public:
6462 FileHistory() // ctor
6463 {
6464 // empty
6465 }
6466
6467 int hs_connect(const char* connect_string);
6468 int hs_disconnect();
6469 int hs_clear_cache();
6470 int read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp);
6471 HsSchema* new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]);
6472
6473protected:
6474 int create_file(const char* event_name, time_t timestamp, const std::vector<HsSchemaEntry>& vars, std::string* filenamep);
6475 HsFileSchema* read_file_schema(const char* filename);
6476 int read_file_list(bool *pchanged);
6477 void clear_file_list();
6478 void tags_to_variables(int ntags, const TAG tags[], std::vector<HsSchemaEntry>& variables);
6479 HsSchema* maybe_reopen(const char* event_name, time_t timestamp, HsSchema* s);
6480};
6481
// NOTE(review): the signature line (listing line 6482) is missing from this
// extract. The debug message and the use of `connect_string` suggest this is
// FileHistory::hs_connect(const char* connect_string) -- confirm.
//
// Drops any previous connection via hs_disconnect(), then prepares fPath.
// Returns HS_SUCCESS.
6483{
6484 if (fDebug)
6485 printf("hs_connect [%s]!\n", connect_string);
6486
6487 hs_disconnect();
6488
// NOTE(review): listing lines 6489 and 6490 are missing here; presumably they
// set fPath from connect_string -- confirm.
6491
6492 // add trailing '/'
6493 if (fPath.length() > 0) {
6494 if (fPath[fPath.length()-1] != DIR_SEPARATOR)
// NOTE(review): listing line 6495 (the body of the `if` above) is missing;
// presumably it appends the directory separator to fPath -- confirm.
6496 }
6497
6498 return HS_SUCCESS;
6499}
6500
// NOTE(review): the signature line (listing line 6501) is missing from this
// extract; the debug message identifies this as FileHistory::hs_clear_cache().
//
// Resets fPathLastMtime to 0, so the next read_file_list() call no longer sees
// the history directory as unchanged.
6502{
6503 if (fDebug)
6504 printf("FileHistory::hs_clear_cache!\n");
6505 fPathLastMtime = 0;
// NOTE(review): listing line 6506 is missing here; the class declares this
// function as returning int, so it is presumably the return statement --
// confirm.
6507}
6508
// NOTE(review): the signature line (listing line 6509) is missing from this
// extract; the debug message identifies this as FileHistory::hs_disconnect().
//
// Returns HS_SUCCESS.
6510{
6511 if (fDebug)
6512 printf("FileHistory::hs_disconnect!\n");
6513
// NOTE(review): listing lines 6514 and 6515 are missing here, so the actual
// disconnect work is not visible in this extract -- confirm what is released.
6516
6517 return HS_SUCCESS;
6518}
6519
// NOTE(review): the signature line (listing line 6520) is missing from this
// extract. The body matches the `void clear_file_list()` member declared in
// class FileHistory -- confirm.
//
// Forgets the cached directory listing: resets the remembered directory mtime
// and empties the file list together with its parallel "already read" flags.
6521{
6522 fPathLastMtime = 0;
6523 fSortedFiles.clear();
6524 fSortedRead.clear();
6525}
6526
// NOTE(review): the signature line (listing line 6527) is missing from this
// extract. The messages below and the use of `pchanged` identify this as
// FileHistory::read_file_list(bool *pchanged).
//
// Refreshes the list of history files ("mhf_*.dat") found in fPath.
//
// The directory is only re-read when its mtime differs from fPathLastMtime.
// When it is re-read, the new file names are sorted newest first and the
// "already read" flags of files that were in the old list are carried over.
//
// pchanged - optional; set to false on entry and to true when the directory
//            was re-read
//
// Returns HS_FILE_ERROR if stat() on fPath fails, otherwise HS_SUCCESS.
6528{
6529 int status;
6530 double start_time = ss_time_sec();
6531
6532 if (pchanged)
6533 *pchanged = false;
6534
6535 struct stat stat_buf;
6536 status = stat(fPath.c_str(), &stat_buf);
6537 if (status != 0) {
6538 cm_msg(MERROR, "FileHistory::read_file_list", "Cannot stat(%s), errno %d (%s)", fPath.c_str(), errno, strerror(errno));
6539 return HS_FILE_ERROR;
6540 }
6541
6542 //printf("dir [%s], mtime: %d %d last: %d %d, mtime %s", fPath.c_str(), stat_buf.st_mtimespec.tv_sec, stat_buf.st_mtimespec.tv_nsec, last_mtimespec.tv_sec, last_mtimespec.tv_nsec, ctime(&stat_buf.st_mtimespec.tv_sec));
6543
// unchanged directory mtime: keep the cached list, *pchanged stays false
6544 if (stat_buf.st_mtime == fPathLastMtime) {
6545 if (fDebug)
6546 printf("FileHistory::read_file_list: history directory \"%s\" mtime %d did not change\n", fPath.c_str(), int(stat_buf.st_mtime));
6547 return HS_SUCCESS;
6548 }
6549
6550 fPathLastMtime = stat_buf.st_mtime;
6551
6552 if (fDebug)
6553 printf("FileHistory::read_file_list: reading list of history files in \"%s\"\n", fPath.c_str());
6554
6555 std::vector<std::string> flist;
6556
// the result of ss_file_find() is not examined
6557 ss_file_find(fPath.c_str(), "mhf_*.dat", &flist);
6558
// report directory listings that take longer than 5 seconds
6559 double ls_time = ss_time_sec();
6560 double ls_elapsed = ls_time - start_time;
6561 if (ls_elapsed > 5.000) {
6562 cm_msg(MINFO, "FileHistory::read_file_list", "\"ls -l\" of \"%s\" took %.1f sec", fPath.c_str(), ls_elapsed);
// NOTE(review): listing line 6563 is missing here -- confirm what follows the
// message.
6564 }
6565
6566 // note: reverse iterator is used to sort filenames by time, newest first
6567 std::sort(flist.rbegin(), flist.rend());
6568
6569#if 0
6570 {
6571 printf("file names sorted by time:\n");
6572 for (size_t i=0; i<flist.size(); i++) {
6573 printf("%d: %s\n", i, flist[i].c_str());
6574 }
6575 }
6576#endif
6577
// note: this local vector is named `fread`, the same name as the C library
// function; inside this scope the name refers to the vector
6578 std::vector<bool> fread;
6579 fread.resize(flist.size()); // fill with "false"
6580
6581 // loop over the old list of files,
6582 // for files we already read, loop over new file
6583 // list and mark the same file as read. K.O.
6584 for (size_t j=0; j<fSortedFiles.size(); j++) {
6585 if (fSortedRead[j]) {
6586 for (size_t i=0; i<flist.size(); i++) {
6587 if (flist[i] == fSortedFiles[j]) {
6588 fread[i] = true;
6589 break;
6590 }
6591 }
6592 }
6593 }
6594
// NOTE(review): listing lines 6595 and 6596 are missing here; presumably they
// store flist and fread into fSortedFiles and fSortedRead -- confirm.
6597
6598 if (pchanged)
6599 *pchanged = true;
6600
6601 return HS_SUCCESS;
6602}
6603
// Load file schemas from the history directory into `sv` until the schema for
// `event_name` at `timestamp` is covered.
//
// Files in fSortedFiles that have not been read yet are read in list order and
// their schemas added to `sv`. Reading stops early once a file for the
// requested event with fTimeFrom <= timestamp has been added. When the file
// list did not change and `sv` already has a schema for the event at that
// time, nothing is read.
//
// Returns `status` if it is not HS_SUCCESS after the (missing) file-list
// refresh, otherwise HS_SUCCESS.
//
// NOTE(review): listing lines 6612, 6615, 6617, 6618, 6622, 6625, 6633, 6657,
// 6680 and 6683 are missing from this extract; they are marked below.
6604int FileHistory::read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp)
6605{
// NOTE(review): event_name is printed with "%s" here and in the messages below,
// while the `if (event_name)` test further down suggests it may be NULL --
// confirm.
6606 if (fDebug)
6607 printf("FileHistory::read_schema: event [%s] at time %s\n", event_name, TimeToString(timestamp).c_str());
6608
6609 if (sv->size() == 0) {
6610 if (fDebug)
6611 printf("FileHistory::read_schema: schema is empty, do a full reload from disk\n");
// NOTE(review): listing line 6612 is missing here; given the message above it
// presumably discards the cached file list -- confirm.
6613 }
6614
// NOTE(review): listing lines 6615, 6617 and 6618 are missing around the next
// statement; `old_timeout` is not used on any visible line -- confirm its
// purpose (and where `status` is declared).
6616 DWORD old_timeout = 0;
6619
6620 bool changed = false;
6621
// NOTE(review): listing line 6622 is missing here; `status` and `changed` are
// examined below, so it presumably calls read_file_list(&changed) -- confirm.
6623
6624 if (status != HS_SUCCESS) {
// NOTE(review): listing line 6625 is missing here.
6626 return status;
6627 }
6628
// nothing new on disk and the requested schema is already loaded: done
6629 if (!changed) {
6630 if ((*sv).find_event(event_name, timestamp)) {
6631 if (fDebug)
6632 printf("FileHistory::read_schema: event [%s] at time %s, no new history files, already have this schema\n", event_name, TimeToString(timestamp).c_str());
// NOTE(review): listing line 6633 is missing here.
6634 return HS_SUCCESS;
6635 }
6636 }
6637
6638 double start_time = ss_time_sec();
6639
6640 int count_read = 0;
6641
6642 for (size_t i=0; i<fSortedFiles.size(); i++) {
6643 std::string file_name = fPath + fSortedFiles[i];
6644 if (fSortedRead[i])
6645 continue;
6646 //bool dupe = false;
6647 //for (size_t ss=0; ss<sv->size(); ss++) {
6648 // HsFileSchema* ssp = (HsFileSchema*)(*sv)[ss];
6649 // if (file_name == ssp->fFileName) {
6650 // dupe = true;
6651 // break;
6652 // }
6653 //}
6654 //if (dupe)
6655 // continue;
// the file is marked as read before the read is attempted, so a file that
// fails to yield a schema is not retried
6656 fSortedRead[i] = true;
// NOTE(review): listing line 6657 is missing here; it presumably reads the
// schema of file_name into `s` (read_file_schema) -- confirm.
6658 if (!s)
6659 continue;
6660 sv->add(s);
6661 count_read++;
6662
6663 if (event_name) {
6664 if (s->fEventName == event_name) {
6665 //printf("file %s event_name %s time %s, age %f\n", file_name.c_str(), s->fEventName.c_str(), TimeToString(s->fTimeFrom).c_str(), double(timestamp - s->fTimeFrom));
6666 if (s->fTimeFrom <= timestamp) {
6667 // this file is older than the time requested,
6668 // subsequent files will be even older,
6669 // we can stop reading here.
6670 break;
6671 }
6672 }
6673 }
6674 }
6675
// report schema loads that take longer than 5 seconds
6676 double end_time = ss_time_sec();
6677 double read_elapsed = end_time - start_time;
6678 if (read_elapsed > 5.000) {
6679 cm_msg(MINFO, "FileHistory::read_schema", "Loading schema for event \"%s\" timestamp %s, reading %d history files took %.1f sec", event_name, TimeToString(timestamp).c_str(), count_read, read_elapsed);
// NOTE(review): listing line 6680 is missing here.
6681 }
6682
// NOTE(review): listing line 6683 is missing here.
6684
6685 return HS_SUCCESS;
6686}
6687
// Convert an array of `ntags` TAG descriptors into schema entries appended to
// `variables`.
//
// For each tag, tag_name and name are both set to the tag name, tag_type is the
// name of the tag's type id, and n_bytes is n_data times the size of that type.
// Existing contents of `variables` are kept; entries are only appended.
6688void FileHistory::tags_to_variables(int ntags, const TAG tags[], std::vector<HsSchemaEntry>& variables)
6689{
6690 for (int i=0; i<ntags; i++) {
// NOTE(review): listing line 6691 is missing here; it presumably declares the
// local `e` (an HsSchemaEntry, the element type of `variables`) -- confirm.
6692
6693 int tsize = rpc_tid_size(tags[i].type);
6694
6695 e.tag_name = tags[i].name;
6696 e.tag_type = rpc_tid_name(tags[i].type);
6697 e.name = tags[i].name;
6698 e.type = tags[i].type;
6699 e.n_data = tags[i].n_data;
6700 e.n_bytes = tags[i].n_data*tsize;
6701
6702 variables.push_back(e);
6703 }
6704}
6705
// Return a schema object for writing event `event_name` with the given tags,
// starting a new history file when needed.
//
// The existing writer schema for the event (looked up in fWriterSchema, loading
// from disk if not found) is reused only if all of the following hold:
//   - event name, number of tags, and each tag's name, type and n_data match;
//   - its age (timestamp - fTimeFrom) does not exceed fConfMaxFileAge;
//   - its file size does not exceed fConfMaxFileSize.
// Otherwise a new file is created with create_file() and its schema is read
// back with read_file_schema().
//
// Returns a newly allocated HsFileSchema that is a copy of the selected schema,
// or NULL if the schema cannot be read or the new file cannot be created.
// NOTE(review): the copy is allocated with `new` and not released here;
// presumably the caller takes ownership -- confirm.
6706HsSchema* FileHistory::new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[])
6707{
6708 if (fDebug)
6709 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d\n", event_name, TimeToString(timestamp).c_str(), ntags);
6710
6711 int status;
6712
6713 HsFileSchema* s = (HsFileSchema*)fWriterSchema.find_event(event_name, timestamp);
6714
// not cached yet: load schemas from disk and look again (may still be NULL)
6715 if (!s) {
6716 //printf("hs_define_event: no schema for event %s\n", event_name);
6717 status = read_schema(&fWriterSchema, event_name, timestamp);
6718 if (status != HS_SUCCESS)
6719 return NULL;
6720 s = (HsFileSchema*)fWriterSchema.find_event(event_name, timestamp);
6721 } else {
6722 //printf("hs_define_event: already have schema for event %s\n", s->fEventName.c_str());
6723 }
6724
// compile-time switch for the verbose mismatch diagnostics below
6725 bool xdebug = false;
6726
6727 if (s) { // is existing schema the same as new schema?
6728 bool same = true;
6729
6730 if (same) {
6731 if (s->fEventName != event_name) {
6732 if (xdebug)
6733 printf("AAA: [%s] [%s]!\n", s->fEventName.c_str(), event_name);
6734 same = false;
6735 }
6736 }
6737
6738 if (same) {
6739 if (s->fVariables.size() != (size_t)ntags) {
6740 if (xdebug)
6741 printf("BBB: event [%s]: ntags: %zu -> %d!\n", event_name, s->fVariables.size(), ntags);
6742 same = false;
6743 }
6744 }
6745
// this loop only runs when the tag count matched above, so indexing tags[]
// with the schema's variable index stays within ntags
6746 if (same) {
6747 for (size_t i=0; i<s->fVariables.size(); i++) {
6748 if (s->fVariables[i].name != tags[i].name) {
6749 if (xdebug)
6750 printf("CCC: event [%s] index %zu: name [%s] -> [%s]!\n", event_name, i, s->fVariables[i].name.c_str(), tags[i].name);
6751 same = false;
6752 }
6753 if (s->fVariables[i].type != (int)tags[i].type) {
6754 if (xdebug)
6755 printf("DDD: event [%s] index %zu: type %d -> %d!\n", event_name, i, s->fVariables[i].type, tags[i].type);
6756 same = false;
6757 }
6758 if (s->fVariables[i].n_data != (int)tags[i].n_data) {
6759 if (xdebug)
6760 printf("EEE: event [%s] index %zu: n_data %d -> %d!\n", event_name, i, s->fVariables[i].n_data, tags[i].n_data);
6761 same = false;
6762 }
6763 if (!same)
6764 break;
6765 }
6766 }
6767
6768 if (!same) {
6769 if (xdebug) {
6770 printf("*** Schema for event %s has changed!\n", event_name);
6771
6772 printf("*** Old schema for event [%s] time %s:\n", event_name, TimeToString(timestamp).c_str());
6773 s->print();
6774 printf("*** New tags:\n");
6775 PrintTags(ntags, tags);
6776 }
6777
6778 if (fDebug)
6779 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d: schema mismatch, starting a new file.\n", event_name, TimeToString(timestamp).c_str(), ntags);
6780
// dropping the pointer forces creation of a new file below
6781 s = NULL;
6782 }
6783 }
6784
6785 if (s) {
6786 // maybe this schema is too old - rotate files every so often
6787 time_t age = timestamp - s->fTimeFrom;
6788 //printf("*** age %s (%.1f months), timestamp %s, time_from %s, file %s\n", TimeToString(age).c_str(), (double)age/(double)kMonth, TimeToString(timestamp).c_str(), TimeToString(s->fTimeFrom).c_str(), s->fFileName.c_str());
6789 if (age > fConfMaxFileAge) {
6790 if (fDebug)
6791 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d: schema is too old, age %.1f months, starting a new file.\n", event_name, TimeToString(timestamp).c_str(), ntags, (double)age/(double)kMonth);
6792
6793 // force creation of a new file
6794 s = NULL;
6795 }
6796 }
6797
6798 if (s) {
6799 // maybe this file is too big - rotate files to limit maximum size
6800 double size = ss_file_size(s->fFileName.c_str());
6801 //printf("*** size %.0f, file %s\n", size, s->fFileName.c_str());
6802 if (size > fConfMaxFileSize) {
6803 if (fDebug)
6804 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d: file too big, size %.1f MiBytes, max size %.1f MiBytes, starting a new file.\n", event_name, TimeToString(timestamp).c_str(), ntags, size/MiB, fConfMaxFileSize/MiB);
6805
6806 // force creation of a new file
6807 s = NULL;
6808 }
6809 }
6810
// no reusable schema: create a new history file and read its schema back
6811 if (!s) {
6812 std::string filename;
6813
6814 std::vector<HsSchemaEntry> vars;
6815
6816 tags_to_variables(ntags, tags, vars);
6817
6818 status = create_file(event_name, timestamp, vars, &filename);
6819 if (status != HS_SUCCESS)
6820 return NULL;
6821
6822 HsFileSchema* ss = read_file_schema(filename.c_str());
6823 if (!ss) {
6824 cm_msg(MERROR, "FileHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
6825 return NULL;
6826 }
6827
// NOTE(review): listing line 6828 is missing here; `ss` is not used on any
// other visible line and the lookup below expects to find the new schema, so
// the missing statement presumably adds `ss` to fWriterSchema -- confirm.
6829
6830 s = (HsFileSchema*)fWriterSchema.find_event(event_name, timestamp);
6831
6832 if (!s) {
6833 cm_msg(MERROR, "FileHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
6834 return NULL;
6835 }
6836
6837 if (xdebug) {
6838 printf("*** New schema for event [%s] time %s:\n", event_name, TimeToString(timestamp).c_str());
6839 s->print();
6840 }
6841 }
6842
6843 assert(s != NULL);
6844
6845#if 0
6846 {
6847 printf("schema for [%s] is %p\n", event_name, s);
6848 if (s)
6849 s->print();
6850 }
6851#endif
6852
// the caller gets its own copy; the schema held in fWriterSchema is not handed
// out
6853 HsFileSchema* e = new HsFileSchema();
6854
6855 *e = *s; // make a copy of the schema
6856
6857 return e;
6858}
6859
// Create a new history file for `event_name` and write its text header.
//
// The file name is
//   fPath + "mhf_" + TimeToString(timestamp) + "_" + YYYYMMDD + "_" +
//   MidasNameToFileName(event_name) + ".dat"
// with "_<rand()>" inserted before ".dat" on retries. Up to 10 names are tried;
// a name is skipped if the file can already be opened for reading or cannot be
// opened for writing.
//
// The header lists version, event name, time, one "tag:" line for the
// timestamp and one per entry of `vars`, followed by the "record_size:" and
// "data_offset:" lines. Only the header text is written by this function.
//
// filenamep - optional output argument (see the note at the end of the body)
//
// Returns HS_FILE_ERROR if no file could be created, otherwise HS_SUCCESS.
6860int FileHistory::create_file(const char* event_name, time_t timestamp, const std::vector<HsSchemaEntry>& vars, std::string* filenamep)
6861{
6862 if (fDebug)
6863 printf("FileHistory::create_file: event [%s]\n", event_name);
6864
6865 // NB: file names are constructed in such a way
6866 // that when sorted lexicographically ("ls -1 | sort")
6867 // they *also* become sorted by time
6868
6869 struct tm tm;
6870 localtime_r(&timestamp, &tm); // somebody must call tzset() before this.
6871
6872 char buf[256];
6873 strftime(buf, sizeof(buf), "%Y%m%d", &tm);
6874
6875 std::string filename;
6876 filename += fPath;
6877 filename += "mhf_";
6878 filename += TimeToString(timestamp);
6879 filename += "_";
6880 filename += buf;
6881 filename += "_";
6882 filename += MidasNameToFileName(event_name);
6883
6884 std::string try_filename = filename + ".dat";
6885
6886 FILE *fp = NULL;
6887 for (int itry=0; itry<10; itry++) {
// first try uses the plain name, retries add a random numeric suffix
6888 if (itry > 0) {
6889 char s[256];
6890 sprintf(s, "_%d", rand());
6891 try_filename = filename + s + ".dat";
6892 }
6893
// existence test: if the file opens for reading, the name is taken.
// NOTE(review): the check and the creation below are two separate fopen()
// calls, so another writer could create the file in between -- confirm whether
// that can happen in practice.
6894 fp = fopen(try_filename.c_str(), "r");
6895 if (fp != NULL) {
6896 // this file already exists, try with a different name
6897 fclose(fp);
// note: fp still holds the closed handle at this point; it is reassigned on
// the next iteration, but not if this was the last one
6898 continue;
6899 }
6900
6901 fp = fopen(try_filename.c_str(), "w");
6902 if (fp == NULL) {
6903 // somehow cannot create this file, try again
6904 cm_msg(MERROR, "FileHistory::create_file", "Error: Cannot create file \'%s\' for event \'%s\', fopen() errno %d (%s)", try_filename.c_str(), event_name, errno, strerror(errno));
6905 continue;
6906 }
6907
6908 // file opened
6909 break;
6910 }
6911
6912 if (fp == NULL) {
6913 // somehow cannot create any file, whine!
6914 cm_msg(MERROR, "FileHistory::create_file", "Error: Cannot create file \'%s\' for event \'%s\'", filename.c_str(), event_name);
6915 return HS_FILE_ERROR;
6916 }
6917
// build the complete text header in memory, then write it in one go
6918 std::string ss;
6919
6920 ss += "version: 2.0\n";
6921 ss += "event_name: ";
6922 ss += event_name;
6923 ss += "\n";
6924 ss += "time: ";
6925 ss += TimeToString(timestamp);
6926 ss += "\n";
6927
6928 int recsize = 0;
6929
// every record starts with a 4-byte timestamp
6930 ss += "tag: /DWORD 1 4 /timestamp\n";
6931 recsize += 4;
6932
// `padded` is only set inside the disabled "#if 0" section below, so it stays
// false in the compiled code
6933 bool padded = false;
6934 int offset = 0;
6935
6936 bool xdebug = false; // (strcmp(event_name, "u_Beam") == 0);
6937
6938 for (size_t i=0; i<vars.size(); i++) {
6939 int tsize = rpc_tid_size(vars[i].type);
6940 int n_bytes = vars[i].n_data*tsize;
// NOTE(review): the modulo below divides by tsize; confirm rpc_tid_size()
// cannot return 0 for any type that reaches this function.
6941 int xalign = (offset % tsize);
6942
6943 if (xdebug)
6944 printf("tag %zu, tsize %d, n_bytes %d, xalign %d\n", i, tsize, n_bytes, xalign);
6945
6946#if 0
6947 // looks like history data does not do alignment and padding
6948 if (xalign != 0) {
6949 padded = true;
6950 int pad_bytes = tsize - xalign;
6951 assert(pad_bytes > 0);
6952
6953 ss += "tag: ";
6954 ss += "XPAD";
6955 ss += " ";
6956 ss += SmallIntToString(1);
6957 ss += " ";
// NOTE(review): listing line 6958 is missing here (inside the disabled section).
6959 ss += " ";
6960 ss += "pad_";
6961 ss += SmallIntToString(i);
6962 ss += "\n";
6963
6964 offset += pad_bytes;
6965 recsize += pad_bytes;
6966
6967 assert((offset % tsize) == 0);
6968 fprintf(stderr, "FIXME: need to debug padding!\n");
6969 //abort();
6970 }
6971#endif
6972
// one header line per variable: "tag: <type name> <n_data> <n_bytes> <name>"
6973 ss += "tag: ";
6974 ss += rpc_tid_name(vars[i].type);
6975 ss += " ";
6976 ss += SmallIntToString(vars[i].n_data);
6977 ss += " ";
6978 ss += SmallIntToString(n_bytes);
6979 ss += " ";
6980 ss += vars[i].name;
6981 ss += "\n";
6982
6983 recsize += n_bytes;
6984 offset += n_bytes;
6985 }
6986
6987 ss += "record_size: ";
// NOTE(review): listing line 6988 is missing here; it presumably appends the
// value of recsize -- confirm.
6989 ss += "\n";
6990
6991 // reserve space for "data_offset: ..."
6992 int sslength = ss.length() + 127;
6993
// data_offset is the header length (plus the reserve above) rounded up to a
// whole number of 1024-byte blocks
6994 int block = 1024;
6995 int nb = (sslength + block - 1)/block;
6996 int data_offset = block * nb;
6997
6998 ss += "data_offset: ";
// NOTE(review): listing line 6999 is missing here; it presumably appends the
// value of data_offset -- confirm.
7000 ss += "\n";
7001
// NOTE(review): the results of fprintf() and fclose() are not examined, so a
// failed header write is not reported to the caller.
7002 fprintf(fp, "%s", ss.c_str());
7003
7004 fclose(fp);
7005
7006 if (1 && padded) {
7007 printf("Schema in file %s has padding:\n", try_filename.c_str());
7008 printf("%s", ss.c_str());
7009 }
7010
7011 if (filenamep)
// NOTE(review): listing line 7012 (the body of the `if` above) is missing;
// presumably it stores try_filename into *filenamep -- confirm.
7013
7014 return HS_SUCCESS;
7015}
7016
7018{
     // Body of FileHistory::read_file_schema (name taken from the cm_msg() routine
     // strings below). Reads the text header of the history file `filename` and
     // builds a newly allocated HsFileSchema from it.
     //
     // Returns the new schema on success. Returns NULL (after logging via cm_msg)
     // when the file cannot be opened, when no "version: 2.0" line precedes the
     // other header lines, when a tag names an unknown MIDAS type, or when the sum
     // of the tag sizes differs from the "record_size:" value.
     //
     // NOTE(review): the signature line (7017) is missing from this listing;
     // the body uses a `filename` parameter as a C string - confirm the exact
     // declaration against the class definition.
7019 if (fDebug)
7020 printf("FileHistory::read_file_schema: file %s\n", filename);
7021
7022 FILE* fp = fopen(filename, "r");
7023 if (!fp) {
7024 cm_msg(MERROR, "FileHistory::read_file_schema", "Cannot read \'%s\', fopen() errno %d (%s)", filename, errno, strerror(errno));
7025 return NULL;
7026 }
7027
     // stays NULL until a "version: 2.0" line has been seen
7028 HsFileSchema* s = NULL;
7029
7030 // File format looks like this:
7031 // version: 2.0
7032 // event_name: u_Beam
7033 // time: 1023174012
7034 // tag: /DWORD 1 4 /timestamp
7035 // tag: FLOAT 1 4 B1
7036 // ...
7037 // tag: FLOAT 1 4 Ref Heater
7038 // record_size: 84
7039 // data_offset: 1024
7040
     // rd_recsize: running total of the byte sizes of all tags, timestamp included
     // offset: running byte offset of the next non-timestamp variable
7041 size_t rd_recsize = 0;
7042 int offset = 0;
7043
7044 while (1) {
     // one header line per iteration; fgets() returns at most sizeof(buf)-1
     // characters, so a longer line would arrive in several pieces
7045 char buf[1024];
7046 char* b = fgets(buf, sizeof(buf), fp);
7047
7048 //printf("read: %s\n", b);
7049
7050 if (!b) {
7051 break; // end of file
7052 }
7053
7054 char*bb;
7055
     // strip the line terminator (both "\n" and "\r\n" endings)
7056 bb = strchr(b, '\n');
7057 if (bb)
7058 *bb = 0;
7059
7060 bb = strchr(b, '\r');
7061 if (bb)
7062 *bb = 0;
7063
     // strstr() followed by "bb == b" is a prefix test: the keyword only
     // counts when it is found at the very start of the line
7064 bb = strstr(b, "version: 2.0");
7065 if (bb == b) {
7066 s = new HsFileSchema();
7067 assert(s);
7068
7069 s->fFileName = filename;
7070 continue;
7071 }
7072
     // any other line before the version line means this is not a schema header
7073 if (!s) {
7074 // malformed history file
7075 break;
7076 }
7077
7078 bb = strstr(b, "event_name: ");
7079 if (bb == b) {
     // 12 == strlen("event_name: "), the rest of the line is the event name
7080 s->fEventName = bb + 12;
7081 continue;
7082 }
7083
7084 bb = strstr(b, "time: ");
7085 if (bb == b) {
     // 6 == strlen("time: "), the value is parsed as a decimal number
7086 s->fTimeFrom = strtoul(bb + 6, NULL, 10);
7087 continue;
7088 }
7089
7090 // tag format is like this:
7091 //
7092 // tag: FLOAT 1 4 Ref Heater
7093 //
7094 // "FLOAT" is the MIDAS type, "/DWORD" is special tag for the timestamp
7095 // "1" is the number of array elements
7096 // "4" is the total tag size in bytes (n_data*tid_size)
7097 // "Ref Heater" is the tag name
7098
7099 bb = strstr(b, "tag: ");
7100 if (bb == b) {
7101 bb += 5; // now points to the tag MIDAS type
7102 const char* midas_type = bb;
7103 char* bbb = strchr(bb, ' ');
     // a tag line without a space after the type is silently ignored
7104 if (bbb) {
     // terminate the type name in place so midas_type is a C string
7105 *bbb = 0;
7106 HsSchemaEntry t;
7107 if (midas_type[0] == '/') {
     // special tag (the timestamp): type id 0
7108 t.type = 0;
7109 } else {
     // NOTE(review): source line 7110 is missing from this listing;
     // presumably it sets t.type from the midas_type name (the error
     // below treats t.type == 0 as "unknown type") - confirm.
7111 if (t.type == 0) {
7112 cm_msg(MERROR, "FileHistory::read_file_schema", "Unknown MIDAS data type \'%s\' in history file \'%s\'", midas_type, filename);
     // s is always non-NULL here (checked above), the guard is redundant;
     // with s reset to NULL the "Malformed history schema" error after
     // the loop is logged as well
7113 if (s)
7114 delete s;
7115 s = NULL;
7116 break;
7117 }
7118 }
     // step past the terminator written above, then skip extra spaces
7119 bbb++;
7120 while (*bbb == ' ')
7121 bbb++;
     // NOTE(review): if the count or size field is absent, t.n_data and
     // t.n_bytes keep whatever HsSchemaEntry's default construction gives
     // them - confirm that HsSchemaEntry initializes these members.
7122 if (*bbb) {
7123 t.n_data = strtoul(bbb, &bbb, 10);
7124 while (*bbb == ' ')
7125 bbb++;
7126 if (*bbb) {
7127 t.n_bytes = strtoul(bbb, &bbb, 10);
7128 while (*bbb == ' ')
7129 bbb++;
     // everything left on the line is the tag name (may contain spaces)
7130 t.name = bbb;
7131 }
7132 }
7133
     // special "/" tags are not stored as variables and do not advance the
     // variable offset, but they do count towards the record size below
7134 if (midas_type[0] != '/') {
7135 s->fVariables.push_back(t);
7136 s->fOffsets.push_back(offset);
7137 offset += t.n_bytes;
7138 }
7139
7140 rd_recsize += t.n_bytes;
7141 }
7142 continue;
7143 }
7144
7145 bb = strstr(b, "record_size: ");
7146 if (bb == b) {
     // the prefix is 13 characters long, so bb + 12 points at its trailing
     // space; atoi() skips leading whitespace, so the value still parses
7147 s->fRecordSize = atoi(bb + 12);
7148 continue;
7149 }
7150
7151 bb = strstr(b, "data_offset: ");
7152 if (bb == b) {
     // same 13-character prefix / bb + 12 pattern as record_size above
7153 s->fDataOffset = atoi(bb + 12);
7154 // data offset is the last entry in the file
7155 break;
7156 }
7157 }
7158
7159 fclose(fp);
7160
     // s is NULL if no version line was seen or an unknown tag type was hit
7161 if (!s) {
7162 cm_msg(MERROR, "FileHistory::read_file_schema", "Malformed history schema in \'%s\', maybe it is not a history file", filename);
7163 return NULL;
7164 }
7165
     // consistency check: the declared record size must equal the sum of all tag sizes
7166 if (rd_recsize != s->fRecordSize) {
7167 cm_msg(MERROR, "FileHistory::read_file_schema", "Record size mismatch in history schema from \'%s\', file says %zu while total of all tags is %zu", filename, s->fRecordSize, rd_recsize);
7168 if (s)
7169 delete s;
7170 return NULL;
7171 }
7172
     // NOTE(review): unreachable - s was already checked for NULL above and
     // has not been modified since
7173 if (!s) {
7174 cm_msg(MERROR, "FileHistory::read_file_schema", "Could not read history schema from \'%s\', maybe it is not a history file", filename);
7175 if (s)
7176 delete s;
7177 return NULL;
7178 }
7179
7180 if (fDebug > 1)
7181 s->print();
7182
     // the caller receives the heap-allocated schema created above
7183 return s;
7184}
7185
// FileHistory::maybe_reopen: rotate the history file behind schema `s` once its
// recorded size (fFileSize) exceeds fConfMaxFileSize.
//
// event_name, timestamp - passed on to create_file() for the new file.
// s                     - schema currently used for writing; must be an
//                         HsFileSchema (asserted) and must be present in
//                         fWriterEvents (asserted after the replacement loop).
//
// Returns `s` unchanged when the file is still within the limit, or when the
// new file cannot be created or its schema cannot be obtained. Otherwise the
// old schema object is deleted, a copy of the new schema is added to
// fWriterSchema, and the new schema is returned.
//
// NOTE(review): source lines 7216, 7225 and 7233 are missing from this listing
// (see the notes at the gaps below); the description of the success path
// depends on them - confirm against the full source.
7186HsSchema* FileHistory::maybe_reopen(const char* event_name, time_t timestamp, HsSchema* s)
7187{
7188 HsFileSchema* fs = dynamic_cast<HsFileSchema*>(s);
7189
7190 assert(fs != NULL); // FileHistory::maybe_reopen() must be called only with file history schema.
7191
7192 if (fs->fFileSize <= fConfMaxFileSize) {
7193 // not big enough, let it grow
7194 return s;
7195 }
7196
7197 // must rotate the file
7198
7199 if (fDebug) {
     // the two branches differ only in the printf length modifier used for
     // the size values (%lld under OS_DARWIN, %jd elsewhere)
7200#ifdef OS_DARWIN
7201 printf("FileHistory::maybe_reopen: reopen file \"%s\", size %lld, max size %lld\n", fs->fFileName.c_str(), fs->fFileSize, fConfMaxFileSize);
7202#else
7203 printf("FileHistory::maybe_reopen: reopen file \"%s\", size %jd, max size %jd\n", fs->fFileName.c_str(), fs->fFileSize, fConfMaxFileSize);
7204#endif
7205 }
7206
7207 std::string new_filename;
7208
     // the new file gets the same variable list as the file being replaced
7209 int status = create_file(event_name, timestamp, fs->fVariables, &new_filename);
7210
7211 if (status != HS_SUCCESS) {
7212 // cannot create new history file
     // fall back to the old schema, i.e. keep writing to the oversized file
7213 return s;
7214 }
7215
     // NOTE(review): source line 7216 is missing from this listing; presumably
     // it obtains new_fs by reading the schema of new_filename - confirm.
7217
7218 if (!new_fs) {
7219 // cannot open new history file
7220 return s;
7221 }
7222
7223 new_fs->fDisabled = false;
7224
     // NOTE(review): source line 7225 is missing from this listing; presumably
     // it allocates new_fs_copy, which the next line fills in - confirm.
7226 *new_fs_copy = *new_fs; // make a copy
7227
7228 //printf("replacing schema %p %p with %p %p\n", s, fs, new_fs, new_fs_copy);
7229
     // find the old schema in the list of writer events and dispose of it
7230 for (size_t i=0; i<fWriterEvents.size(); i++) {
7231 if (s == fWriterEvents[i]) {
7232 delete fWriterEvents[i];
     // NOTE(review): source line 7233 is missing from this listing; presumably
     // it stores the new schema into fWriterEvents[i] - confirm.
7234 s = NULL; // was pointing to the just-deleted fWriterEvents[i]
7235 fs = NULL; // same object as s, also dangling after the delete
7236 }
7237 }
7238
7239 assert(s == NULL); // the schema we are replacing must be in fWriterEvents.
7240
7241 fWriterSchema.add(new_fs_copy); // make sure new file is added to the list of files
7242
7243 assert(new_fs->fFileSize < fConfMaxFileSize); // check that we are not returning the original big file
7244
7245 //new_fs->print();
7246
7247 return new_fs;
7248}
7249
7251// Factory constructors //
7253
7255{
     // Factory body for the SQLITE history driver (name per the cm_msg() routine
     // string below). Returns a new SqliteHistory when built with HAVE_SQLITE;
     // otherwise logs an error and returns NULL.
     // NOTE(review): the signature line (7254) is missing from this listing.
7256#ifdef HAVE_SQLITE
7257 return new SqliteHistory();
7258#else
7259 cm_msg(MERROR, "MakeMidasHistorySqlite", "Error: Cannot initialize SQLITE history - this MIDAS was built without SQLITE support - HAVE_SQLITE is not defined");
7260 return NULL;
7261#endif
7262}
7263
7265{
     // Factory body for the MySQL history driver (name per the cm_msg() routine
     // string below). Returns a new MysqlHistory when built with HAVE_MYSQL;
     // otherwise logs an error and returns NULL.
     // NOTE(review): the signature line (7264) is missing from this listing.
7266#ifdef HAVE_MYSQL
7267 return new MysqlHistory();
7268#else
7269 cm_msg(MERROR, "MakeMidasHistoryMysql", "Error: Cannot initialize MySQL history - this MIDAS was built without MySQL support - HAVE_MYSQL is not defined");
7270 return NULL;
7271#endif
7272}
7273
7275{
     // Factory body for the PostgreSQL history driver (name per the cm_msg()
     // routine string below). Returns a new PgsqlHistory when built with
     // HAVE_PGSQL; otherwise logs an error and returns NULL.
     // NOTE(review): the signature line (7274) is missing from this listing.
7276#ifdef HAVE_PGSQL
7277 return new PgsqlHistory();
7278#else
7279 cm_msg(MERROR, "MakeMidasHistoryPgsql", "Error: Cannot initialize PgSQL history - this MIDAS was built without PostgreSQL support - HAVE_PGSQL is not defined");
7280 return NULL;
7281#endif
7282}
7283
7288
7289/* emacs
7290 * Local Variables:
7291 * tab-width: 8
7292 * c-basic-offset: 3
7293 * indent-tabs-mode: nil
7294 * End:
7295 */
#define FALSE
Definition cfortran.h:309
HsSchema * maybe_reopen(const char *event_name, time_t timestamp, HsSchema *s)
std::string fPath
HsFileSchema * read_file_schema(const char *filename)
std::vector< std::string > fSortedFiles
HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
off64_t fConfMaxFileSize
std::vector< bool > fSortedRead
int hs_connect(const char *connect_string)
returns HS_SUCCESS
void tags_to_variables(int ntags, const TAG tags[], std::vector< HsSchemaEntry > &variables)
int create_file(const char *event_name, time_t timestamp, const std::vector< HsSchemaEntry > &vars, std::string *filenamep)
int hs_clear_cache()
clear internal cache, returns HS_SUCCESS
int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)
int read_file_list(bool *pchanged)
int hs_disconnect()
disconnect from history, returns HS_SUCCESS
void remove_inactive_columns()
off64_t fFileSizeInitial
std::string fFileName
int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])
void print(bool print_tags=true) const
int write_event(const time_t t, const char *data, const size_t data_size)
int read_last_written(const time_t timestamp, const int debug, time_t *last_written)
int fCountWriteUndersize
virtual void print(bool print_tags=true) const
std::vector< int > fOffsets
virtual ~HsSchema()
virtual void remove_inactive_columns()=0
std::vector< HsSchemaEntry > fVariables
virtual int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])=0
size_t fWriteMinSize
virtual int write_event(const time_t t, const char *data, const size_t data_size)=0
virtual int read_last_written(const time_t timestamp, const int debug, time_t *last_written)=0
virtual int flush_buffers()=0
virtual int close()=0
int fCountWriteOversize
size_t fWriteMaxSize
std::string fEventName
virtual int match_event_var(const char *event_name, const char *var_name, const int var_index)
void print(bool print_tags=true) const
std::vector< HsSchema * > fData
size_t size() const
HsSchema * find_event(const char *event_name, const time_t timestamp, int debug=0)
void add(HsSchema *s)
HsSchema * operator[](int index) const
int get_transaction_count()
int read_last_written(const time_t timestamp, const int debug, time_t *last_written)
int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])
std::vector< std::string > fColumnNames
void print(bool print_tags=true) const
void remove_inactive_columns()
std::vector< std::string > fColumnTypes
std::vector< bool > fColumnInactive
void increment_transaction_count()
int match_event_var(const char *event_name, const char *var_name, const int var_index)
std::string fTableName
static std::map< SqlBase *, int > gfTransactionCount
void reset_transaction_count()
int write_event(const time_t t, const char *data, const size_t data_size)
MidasHistoryBinnedBuffer(time_t first_time, time_t last_time, int num_bins)
void Add(time_t t, double v)
virtual void Add(time_t time, double value)=0
char type[NAME_LENGTH]
history channel name
Definition history.h:112
char name[NAME_LENGTH]
Definition history.h:111
int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)
int read_table_and_event_names(HsSchemaVector *sv)
int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)
int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)
void Add(time_t t, double v)
ReadBuffer(time_t first_time, time_t last_time, time_t interval)
int hs_get_tags(const char *event_name, time_t t, std::vector< TAG > *ptags)
get list of history variables for given event (use event names returned by hs_get_events()) that exis...
virtual int hs_connect(const char *connect_string)=0
returns HS_SUCCESS
int hs_write_event(const char *event_name, time_t timestamp, int buffer_size, const char *buffer)
see hs_write_event(), returns HS_SUCCESS or HS_FILE_ERROR
int hs_read_buffer(time_t start_time, time_t end_time, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], MidasHistoryBufferInterface *buffer[], int hs_status[])
returns HS_SUCCESS
virtual HsSchema * maybe_reopen(const char *event_name, time_t timestamp, HsSchema *s)=0
int hs_define_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
see hs_define_event(), returns HS_SUCCESS or HS_FILE_ERROR
int hs_read_binned(time_t start_time, time_t end_time, int num_bins, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], int num_entries[], int *count_bins[], double *mean_bins[], double *rms_bins[], double *min_bins[], double *max_bins[], time_t *bins_first_time[], double *bins_first_value[], time_t *bins_last_time[], double *bins_last_value[], time_t last_time[], double last_value[], int st[])
returns HS_SUCCESS
HsSchemaVector fWriterSchema
virtual HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])=0
int hs_get_events(time_t t, std::vector< std::string > *pevents)
get list of events that exist(ed) at given time and later (value 0 means "return all events from begi...
std::vector< HsSchema * > fWriterEvents
int hs_get_last_written(time_t timestamp, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], time_t last_written[])
virtual int hs_set_debug(int debug)
set debug level, returns previous debug level
virtual int hs_disconnect()=0
disconnect from history, returns HS_SUCCESS
HsSchemaVector fReaderSchema
int hs_read(time_t start_time, time_t end_time, time_t interval, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], int num_entries[], time_t *time_buffer[], double *data_buffer[], int st[])
see hs_read(), returns HS_SUCCESS
int hs_clear_cache()
clear internal cache, returns HS_SUCCESS
int hs_flush_buffers()
flush buffered data to storage where it is visible to mhttpd
virtual int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)=0
virtual int ListColumns(const char *table_name, std::vector< std::string > *plist)=0
virtual int Finalize()=0
virtual int Connect(const char *path)=0
virtual int ListColumns(const char *table, std::vector< std::string > *plist)=0
virtual double GetDouble(int column)=0
virtual int RollbackTransaction(const char *table_name)=0
virtual bool IsConnected()=0
virtual int CommitTransaction(const char *table_name)=0
virtual ~SqlBase()
virtual int ListTables(std::vector< std::string > *plist)=0
virtual std::string QuoteId(const char *s)=0
virtual int Disconnect()=0
virtual bool TypesCompatible(int midas_tid, const char *sql_type)=0
virtual int Prepare(const char *table_name, const char *sql)=0
virtual int Exec(const char *sql)=0
virtual int Connect(const char *dsn=0)=0
virtual int Exec(const char *table_name, const char *sql)=0
virtual std::string QuoteString(const char *s)=0
bool fTransactionPerTable
virtual time_t GetTime(int column)=0
virtual int Step()=0
virtual const char * GetText(int column)=0
virtual int ExecDisconnected(const char *table_name, const char *sql)=0
virtual int OpenTransaction(const char *table_name)=0
virtual const char * ColumnType(int midas_tid)=0
int update_schema1(HsSqlSchema *s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable, bool *have_transaction)
int hs_disconnect()
disconnect from history, returns HS_SUCCESS
int hs_set_debug(int debug)
set debug level, returns previous debug level
int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)
HsSchema * maybe_reopen(const char *event_name, time_t timestamp, HsSchema *s)
virtual ~SqlHistoryBase()
virtual int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)=0
virtual int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)=0
virtual int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)=0
virtual int read_table_and_event_names(HsSchemaVector *sv)=0
int hs_connect(const char *connect_string)
returns HS_SUCCESS
HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
int update_schema(HsSqlSchema *s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable)
int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)
int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)
int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)
int read_table_and_event_names(HsSchemaVector *sv)
INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout)
Definition midas.cxx:3339
INT cm_set_watchdog_params(BOOL call_watchdog, DWORD timeout)
Definition midas.cxx:3297
#define DB_KEY_EXIST
Definition midas.h:641
#define DB_FILE_ERROR
Definition midas.h:647
#define DB_SUCCESS
Definition midas.h:631
#define DB_NO_MORE_SUBKEYS
Definition midas.h:646
#define HS_UNDEFINED_VAR
Definition midas.h:733
#define HS_SUCCESS
Definition midas.h:727
#define HS_FILE_ERROR
Definition midas.h:728
#define HS_UNDEFINED_EVENT
Definition midas.h:732
unsigned int DWORD
Definition mcstd.h:51
#define TID_DOUBLE
Definition midas.h:343
#define TID_SBYTE
Definition midas.h:329
#define TID_BOOL
Definition midas.h:340
#define TID_SHORT
Definition midas.h:334
#define TID_WORD
Definition midas.h:332
#define MINFO
Definition midas.h:560
#define TID_BYTE
Definition midas.h:327
#define TID_STRING
Definition midas.h:346
#define MERROR
Definition midas.h:559
#define TID_CHAR
Definition midas.h:331
#define TID_INT
Definition midas.h:338
#define TID_FLOAT
Definition midas.h:341
#define TID_LAST
Definition midas.h:354
#define TID_DWORD
Definition midas.h:336
double ss_file_size(const char *path)
Definition system.cxx:7050
double ss_time_sec()
Definition system.cxx:3539
INT ss_file_find(const char *path, const char *pattern, char **plist)
Definition system.cxx:6791
INT cm_msg_flush_buffer()
Definition midas.cxx:879
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
Definition midas.cxx:929
const char * rpc_tid_name(INT id)
Definition midas.cxx:11786
int rpc_name_tid(const char *name)
Definition midas.cxx:11800
INT rpc_tid_size(INT id)
Definition midas.cxx:11779
static std::string q(const char *s)
static const int tid_size[]
static const char * sql_type_mysql[]
static const char ** sql_type
static std::string MidasNameToSqlName(const char *s)
const time_t kMonth
const double KiB
static int CreateSqlColumn(SqlBase *sql, const char *table_name, const char *column_name, const char *column_type, bool *have_transaction, int debug)
static int ReadSqliteTableNames(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug)
void DoctorPgsqlColumnType(std::string *col_type, const char *index_type)
static int ReadRecord(const char *file_name, int fd, off64_t offset, size_t recsize, off64_t irec, char *rec)
static int CreateSqlHyperTable(SqlBase *sql, const char *table_name, bool *have_transaction)
void DoctorSqlColumnType(std::string *col_type, const char *index_type)
static bool MatchTagName(const char *tag_name, int n_data, const char *var_tag_name, const int var_tag_index)
const double MiB
static int var_name_cmp(const std::string &v1, const char *v2)
static std::string TimeToString(time_t t)
MidasHistoryInterface * MakeMidasHistorySqlite()
static void PrintTags(int ntags, const TAG tags[])
static char * skip_spaces(char *s)
static bool MatchEventName(const char *event_name, const char *var_event_name)
static HsSqlSchema * NewSqlSchema(HsSchemaVector *sv, const char *table_name, time_t t)
static int StartSqlTransaction(SqlBase *sql, const char *table_name, bool *have_transaction)
const time_t kDay
MidasHistoryInterface * MakeMidasHistoryMysql()
static int ReadSqliteTableSchema(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug)
static int event_name_cmp(const std::string &e1, const char *e2)
MidasHistoryInterface * MakeMidasHistoryPgsql()
MidasHistoryInterface * MakeMidasHistoryFile()
static int ReadMysqlTableNames(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug, const char *must_have_event_name, const char *must_have_table_name)
static std::string SmallIntToString(int i)
static int CreateSqlTable(SqlBase *sql, const char *table_name, bool *have_transaction, bool set_default_timestamp=false)
static std::string MidasNameToFileName(const char *s)
static int FindTime(const char *file_name, int fd, off64_t offset, size_t recsize, off64_t nrec, time_t timestamp, off64_t *i1p, time_t *t1p, off64_t *i2p, time_t *t2p, time_t *tstart, time_t *tend, int debug)
INT index
Definition mana.cxx:271
DWORD last_time
Definition mana.cxx:3070
void * data
Definition mana.cxx:268
BOOL debug
debug printouts
Definition mana.cxx:254
INT type
Definition mana.cxx:269
char host_name[HOST_NAME_LENGTH]
Definition mana.cxx:242
double count
Definition mdump.cxx:33
INT i
Definition mdump.cxx:32
static int offset
Definition mgd.cxx:1500
#define DIR_SEPARATOR
Definition midas.h:193
DWORD BOOL
Definition midas.h:105
#define DIR_SEPARATOR_STR
Definition midas.h:194
#define read(n, a, f)
#define write(n, a, f, d)
#define name(x)
Definition midas_macro.h:24
static FILE * fp
struct callback_addr callback
Definition mserver.cxx:22
MUTEX_T * tm
Definition odbedit.cxx:39
BOOL match(char *pat, char *str)
Definition odbedit.cxx:190
INT j
Definition odbhist.cxx:40
INT k
Definition odbhist.cxx:40
char str[256]
Definition odbhist.cxx:33
char file_name[256]
Definition odbhist.cxx:41
INT add
Definition odbhist.cxx:40
DWORD status
Definition odbhist.cxx:39
char var_name[256]
Definition odbhist.cxx:41
TH1X EXPRT * h1_book(const char *name, const char *title, int bins, double min, double max)
Definition rmidas.h:24
std::string tag_name
int type
std::string tag_type
int n_data
std::string name
int n_bytes
Definition midas.h:1232
DWORD type
Definition midas.h:1234
DWORD n_data
Definition midas.h:1235
char name[NAME_LENGTH]
Definition midas.h:1233
char c
Definition system.cxx:1312
@ DIR
Definition test_init.cxx:7
static double e(void)
Definition tinyexpr.c:136