MIDAS
Loading...
Searching...
No Matches
history_schema.cxx
Go to the documentation of this file.
1/********************************************************************\
2
3 Name: history_schema.cxx
4 Created by: Konstantin Olchanski
5
6 Contents: Schema based MIDAS history. Available drivers:
7 FileHistory: storage of data in binary files (replacement for the traditional MIDAS history)
8 MysqlHistory: storage of data in MySQL database (replacement for the ODBC based SQL history)
9 PgsqlHistory: storage of data in PostgreSQL database
10 SqliteHistory: storage of data in SQLITE3 database (not suitable for production use)
11
12\********************************************************************/
13
14#undef NDEBUG // midas required assert() to be always enabled
15
16#include "midas.h"
17#include "msystem.h"
18#include "mstrlcpy.h"
19
20#include <math.h>
21
22#include <vector>
23#include <list>
24#include <string>
25#include <map>
26#include <algorithm>
27
28// make mysql/my_global.h happy - it redefines closesocket()
29#undef closesocket
30
31//
32// benchmarks
33//
34// /usr/bin/time ./linux/bin/mh2sql . /ladd/iris_data2/alpha/alphacpc09-elog-history/history/121019.hst
35// -rw-r--r-- 1 alpha users 161028048 Oct 19 2012 /ladd/iris_data2/alpha/alphacpc09-elog-history/history/121019.hst
36// flush 10000, sync=OFF -> 51.51user 1.51system 0:53.76elapsed 98%CPU
37// flush 1000000, sync=NORMAL -> 51.83user 2.09system 1:08.37elapsed 78%CPU (flush never activated)
38// flush 100000, sync=NORMAL -> 51.38user 1.94system 1:06.94elapsed 79%CPU
39// flush 10000, sync=NORMAL -> 51.37user 2.03system 1:31.63elapsed 58%CPU
40// flush 1000, sync=NORMAL -> 52.16user 2.70system 4:38.58elapsed 19%CPU
41
43// MIDAS includes //
45
46#include "midas.h"
47#include "history.h"
48
50// helper stuff //
52
53#define FREE(x) { if (x) free(x); (x) = NULL; }
54
55static char* skip_spaces(char* s)
56{
57 while (*s) {
58 if (!isspace(*s))
59 break;
60 s++;
61 }
62 return s;
63}
64
65static std::string TimeToString(time_t t)
66{
67 const char* sign = "";
68
69 if (t == 0)
70 return "0";
71
72 time_t tt = t;
73
74 if (t < 0) {
75 sign = "-";
76 tt = -t;
77 }
78
79 assert(tt > 0);
80
81 std::string v;
82 while (tt) {
83 char c = '0' + (char)(tt%10);
84 tt /= 10;
85 v = c + v;
86 }
87
88 v = sign + v;
89
90 //printf("time %.0f -> %s\n", (double)t, v.c_str());
91
92 return v;
93}
94
95static std::string SmallIntToString(int i)
96{
97 //int ii = i;
98
99 if (i == 0)
100 return "0";
101
102 assert(i > 0);
103
104 std::string v;
105 while (i) {
106 char c = '0' + (char)(i%10);
107 i /= 10;
108 v = c + v;
109 }
110
111 //printf("SmallIntToString: %d -> %s\n", ii, v.c_str());
112
113 return v;
114}
115
116static bool MatchEventName(const char* event_name, const char* var_event_name)
117{
118 // new-style event name: "equipment_name/variable_name:tag_name"
119 // old-style event name: "equipment_name:tag_name" ("variable_name" is missing)
121
122 //printf("looking for event_name [%s], try table [%s] event name [%s], new style [%d]\n", var_event_name, table_name, event_name, newStyleEventName);
123
124 if (strcasecmp(event_name, var_event_name) == 0) {
125 return true;
126 } else if (newStyleEventName) {
127 return false;
128 } else { // for old style names, need more parsing
129 bool match = false;
130
131 const char* s = event_name;
132 for (int j=0; s[j]; j++) {
133
134 if ((var_event_name[j]==0) && (s[j]=='/')) {
135 match = true;
136 break;
137 }
138
139 if ((var_event_name[j]==0) && (s[j]=='_')) {
140 match = true;
141 break;
142 }
143
144 if (var_event_name[j]==0) {
145 match = false;
146 break;
147 }
148
149 if (tolower(var_event_name[j]) != tolower(s[j])) { // does not work for UTF-8 Unicode
150 match = false;
151 break;
152 }
153 }
154
155 return match;
156 }
157}
158
159static bool MatchTagName(const char* tag_name, int n_data, const char* var_tag_name, const int var_tag_index)
160{
161 char alt_tag_name[1024]; // maybe this is an array without "Names"?
163
164 //printf(" looking for tag [%s] alt [%s], try column name [%s]\n", var_tag_name, alt_tag_name, tag_name);
165
166 if (strcasecmp(tag_name, var_tag_name) == 0)
167 if (var_tag_index >= 0 && var_tag_index < n_data)
168 return true;
169
170 if (strcasecmp(tag_name, alt_tag_name) == 0)
171 return true;
172
173 return false;
174}
175
176static void PrintTags(int ntags, const TAG tags[])
177{
178 for (int i=0; i<ntags; i++)
179 printf("tag %d: %s %s[%d]\n", i, rpc_tid_name(tags[i].type), tags[i].name, tags[i].n_data);
180}
181
182// convert MIDAS event name to something acceptable as an SQL identifier - table name, column name, etc
183
184static std::string MidasNameToSqlName(const char* s)
185{
186 std::string out;
187
188 for (int i=0; s[i]!=0; i++) {
189 char c = s[i];
190 if (isalpha(c) || isdigit(c))
191 out += tolower(c); // does not work for UTF-8 Unicode
192 else
193 out += '_';
194 }
195
196 return out;
197}
198
199// convert MIDAS event name to something acceptable as a file name
200
201static std::string MidasNameToFileName(const char* s)
202{
203 std::string out;
204
205 for (int i=0; s[i]!=0; i++) {
206 char c = s[i];
207 if (isalpha(c) || isdigit(c))
208 out += tolower(c); // does not work for UTF-8 Unicode
209 else
210 out += '_';
211 }
212
213 return out;
214}
215
216// compare event names
217
218static int event_name_cmp(const std::string& e1, const char* e2)
219{
220 return strcasecmp(e1.c_str(), e2);
221}
222
223// compare variable names
224
225static int var_name_cmp(const std::string& v1, const char* v2)
226{
227 return strcasecmp(v1.c_str(), v2);
228}
229
231// SQL data types //
233
234#ifdef HAVE_SQLITE
235static const char *sql_type_sqlite[TID_LAST] = {
236 "xxxINVALIDxxxNULL", // TID_NULL
237 "INTEGER", // TID_UINT8
238 "INTEGER", // TID_INT8
239 "TEXT", // TID_CHAR
240 "INTEGER", // TID_UINT16
241 "INTEGER", // TID_INT16
242 "INTEGER", // TID_UINT32
243 "INTEGER", // TID_INT32
244 "INTEGER", // TID_BOOL
245 "REAL", // TID_FLOAT
246 "REAL", // TID_DOUBLE
247 "INTEGER", // TID_BITFIELD
248 "TEXT", // TID_STRING
249 "xxxINVALIDxxxARRAY",
250 "xxxINVALIDxxxSTRUCT",
251 "xxxINVALIDxxxKEY",
252 "xxxINVALIDxxxLINK"
253};
254#endif
255
256#ifdef HAVE_PGSQL
257static const char *sql_type_pgsql[TID_LAST] = {
258 "xxxINVALIDxxxNULL", // TID_NULL
259 "smallint", // TID_BYTE
260 "smallint", // TID_SBYTE
261 "char(1)", // TID_CHAR
262 "integer", // TID_WORD
263 "smallint", // TID_SHORT
264 "bigint", // TID_DWORD
265 "integer", // TID_INT
266 "smallint", // TID_BOOL
267 "real", // TID_FLOAT
268 "double precision", // TID_DOUBLE
269 "bigint", // TID_BITFIELD
270 "text", // TID_STRING
271 "xxxINVALIDxxxARRAY",
272 "xxxINVALIDxxxSTRUCT",
273 "xxxINVALIDxxxKEY",
274 "xxxINVALIDxxxLINK"
275};
276#endif
277
278#ifdef HAVE_MYSQL
279static const char *sql_type_mysql[TID_LAST] = {
280 "xxxINVALIDxxxNULL", // TID_NULL
281 "tinyint unsigned", // TID_BYTE
282 "tinyint", // TID_SBYTE
283 "char", // TID_CHAR
284 "smallint unsigned", // TID_WORD
285 "smallint", // TID_SHORT
286 "integer unsigned", // TID_DWORD
287 "integer", // TID_INT
288 "tinyint", // TID_BOOL
289 "float", // TID_FLOAT
290 "double", // TID_DOUBLE
291 "integer unsigned", // TID_BITFIELD
292 "VARCHAR", // TID_STRING
293 "xxxINVALIDxxxARRAY",
294 "xxxINVALIDxxxSTRUCT",
295 "xxxINVALIDxxxKEY",
296 "xxxINVALIDxxxLINK"
297};
298#endif
299
300void DoctorPgsqlColumnType(std::string* col_type, const char* index_type)
301{
302 if (*col_type == index_type)
303 return;
304
305 if (*col_type == "bigint" && strcmp(index_type, "int8")==0) {
307 return;
308 }
309
310 if (*col_type == "integer" && strcmp(index_type, "int4")==0) {
312 return;
313 }
314
315 if (*col_type == "smallint" && strcmp(index_type, "int2")==0) {
317 return;
318 }
319
320 cm_msg(MERROR, "SqlHistory", "Cannot use this SQL database, incompatible column names: created column type [%s] is reported with column type [%s]", index_type, col_type->c_str());
322 abort();
323}
324
325void DoctorSqlColumnType(std::string* col_type, const char* index_type)
326{
327 if (*col_type == index_type)
328 return;
329
330 if (*col_type == "int(10) unsigned" && strcmp(index_type, "integer unsigned")==0) {
332 return;
333 }
334
335 if (*col_type == "int(11)" && strcmp(index_type, "integer")==0) {
337 return;
338 }
339
340 if (*col_type == "integer" && strcmp(index_type, "int(11)")==0) {
342 return;
343 }
344
345 // MYSQL 8.0.23
346
347 if (*col_type == "int" && strcmp(index_type, "integer")==0) {
349 return;
350 }
351
352 if (*col_type == "int unsigned" && strcmp(index_type, "integer unsigned")==0) {
354 return;
355 }
356
357 cm_msg(MERROR, "SqlHistory", "Cannot use this SQL database, incompatible column names: created column type [%s] is reported with column type [%s]", index_type, col_type->c_str());
359 abort();
360}
361
362#if 0
363static int sql2midasType_mysql(const char* name)
364{
365 for (int tid=0; tid<TID_LAST; tid++)
366 if (strcasecmp(name, sql_type_mysql[tid])==0)
367 return tid;
368 // FIXME!
369 printf("sql2midasType: Cannot convert SQL data type \'%s\' to a MIDAS data type!\n", name);
370 return 0;
371}
372#endif
373
374#if 0
375static int sql2midasType_sqlite(const char* name)
376{
377 if (strcmp(name, "INTEGER") == 0)
378 return TID_INT;
379 if (strcmp(name, "REAL") == 0)
380 return TID_DOUBLE;
381 if (strcmp(name, "TEXT") == 0)
382 return TID_STRING;
383 // FIXME!
384 printf("sql2midasType: Cannot convert SQL data type \'%s\' to a MIDAS data type!\n", name);
385 return 0;
386}
387#endif
388
390// Schema base classes //
392
394 std::string tag_name; // tag name from MIDAS
395 std::string tag_type; // tag type from MIDAS
396 std::string name; // entry name, same as tag_name except when read from SQL history when it could be the SQL column name
397 int type = 0; // MIDAS data type TID_xxx
398 int n_data = 0; // MIDAS array size
399 int n_bytes = 0; // n_data * size of MIDAS data type (only used by HsFileSchema?)
400};
401
403{
404public:
405
406 // event schema definitions
407 std::string fEventName;
410 std::vector<HsSchemaEntry> fVariables;
411 std::vector<int> fOffsets;
412 size_t fNumBytes = 0;
413
414 // run time data used by hs_write_event()
417 size_t fWriteMaxSize = 0;
418 size_t fWriteMinSize = 0;
419
420 // schema disabled by write error
421 bool fDisabled = true;
422
423public:
424
425 HsSchema() // ctor
426 {
427 // empty
428 }
429
430 virtual void remove_inactive_columns() = 0; // used by SQL schemas
431 virtual void print(bool print_tags = true) const;
432 virtual ~HsSchema(); // dtor
433 virtual int flush_buffers() = 0;
434 virtual int close() = 0;
435 virtual int write_event(const time_t t, const char* data, const size_t data_size) = 0;
436 virtual int match_event_var(const char* event_name, const char* var_name, const int var_index);
437 virtual int read_last_written(const time_t timestamp,
438 const int debug,
439 time_t* last_written) = 0;
440 virtual int read_data(const time_t start_time,
441 const time_t end_time,
442 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
443 const int debug,
444 std::vector<time_t>& last_time,
445 MidasHistoryBufferInterface* buffer[]) = 0;
446};
447
449{
450protected:
451 std::vector<HsSchema*> fData;
452
453public:
454 ~HsSchemaVector() { // dtor
455 clear();
456 }
457
459 return fData[index];
460 }
461
462 size_t size() const {
463 return fData.size();
464 }
465
466 void add(HsSchema* s);
467
468 void clear() {
469 for (size_t i=0; i<fData.size(); i++)
470 if (fData[i]) {
471 delete fData[i];
472 fData[i] = NULL;
473 }
474 fData.clear();
475 }
476
477 void print(bool print_tags = true) const {
478 for (size_t i=0; i<fData.size(); i++)
480 }
481
482 HsSchema* find_event(const char* event_name, const time_t timestamp, int debug = 0);
483};
484
486// Base class functions //
488
490{
491 //printf("HsSchema::dtor %p!\n", this);
492 // only report if undersize/oversize happens more than once -
493 // the first occurence is already reported by hs_write_event()
494 if (fCountWriteUndersize > 1) {
495 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch count: %d, expected %zu bytes, hs_write_event() called with as few as %zu bytes", fEventName.c_str(), fCountWriteUndersize, fNumBytes, fWriteMinSize);
496 }
497
498 if (fCountWriteOversize > 1) {
499 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch count: %d, expected %zu bytes, hs_write_event() called with as many as %zu bytes", fEventName.c_str(), fCountWriteOversize, fNumBytes, fWriteMaxSize);
500 }
501};
502
504{
505 // schema list "data" is sorted by decreasing "fTimeFrom", newest schema first
506
507 //printf("add: %s..%s %s\n", TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), s->fEventName.c_str());
508
509 bool added = false;
510
511 for (auto it = fData.begin(); it != fData.end(); it++) {
512 if (event_name_cmp((*it)->fEventName, s->fEventName.c_str())==0) {
513 if (s->fTimeFrom == (*it)->fTimeFrom) {
514 // duplicate schema, keep the last one added (for file schema it is the newer file)
515 s->fTimeTo = (*it)->fTimeTo;
516 delete (*it);
517 (*it) = s;
518 return;
519 }
520 }
521
522 if (s->fTimeFrom > (*it)->fTimeFrom) {
523 fData.insert(it, s);
524 added = true;
525 break;
526 }
527 }
528
529 if (!added) {
530 fData.push_back(s);
531 }
532
533 //time_t oldest_time_from = fData.back()->fTimeFrom;
534
535 time_t time_to = 0;
536
537 for (auto it = fData.begin(); it != fData.end(); it++) {
538 if (event_name_cmp((*it)->fEventName, s->fEventName.c_str())==0) {
539 (*it)->fTimeTo = time_to;
540 time_to = (*it)->fTimeFrom;
541
542 //printf("vvv: %s..%s %s\n", TimeToString((*it)->fTimeFrom-oldest_time_from).c_str(), TimeToString((*it)->fTimeTo-oldest_time_from).c_str(), (*it)->fEventName.c_str());
543 }
544 }
545}
546
547HsSchema* HsSchemaVector::find_event(const char* event_name, time_t t, int debug)
548{
549 HsSchema* ss = NULL;
550
551 if (debug) {
552 printf("find_event: All schema for event %s: (total %zu)\n", event_name, fData.size());
553 int found = 0;
554 for (size_t i=0; i<fData.size(); i++) {
555 HsSchema* s = fData[i];
556 printf("find_event: schema %zu name [%s]\n", i, s->fEventName.c_str());
557 if (event_name)
558 if (event_name_cmp(s->fEventName, event_name)!=0)
559 continue;
560 s->print();
561 found++;
562 }
563 printf("find_event: Found %d schemas for event %s\n", found, event_name);
564
565 //if (found == 0)
566 // abort();
567 }
568
569 for (size_t i=0; i<fData.size(); i++) {
570 HsSchema* s = fData[i];
571
572 // wrong event
573 if (event_name)
574 if (event_name_cmp(s->fEventName, event_name)!=0)
575 continue;
576
577 // schema is from after the time we are looking for
578 if (s->fTimeFrom > t)
579 continue;
580
581 if (!ss)
582 ss = s;
583
584 // remember the newest schema
585 if (s->fTimeFrom > ss->fTimeFrom)
586 ss = s;
587 }
588
589 // try to find
590 for (size_t i=0; i<fData.size(); i++) {
591 HsSchema* s = fData[i];
592
593 // wrong event
594 if (event_name)
595 if (event_name_cmp(s->fEventName, event_name)!=0)
596 continue;
597
598 // schema is from after the time we are looking for
599 if (s->fTimeFrom > t)
600 continue;
601
602 if (!ss)
603 ss = s;
604
605 // remember the newest schema
606 if (s->fTimeFrom > ss->fTimeFrom)
607 ss = s;
608 }
609
610 if (debug) {
611 if (ss) {
612 printf("find_event: for time %s, returning:\n", TimeToString(t).c_str());
613 ss->print();
614 } else {
615 printf("find_event: for time %s, nothing found:\n", TimeToString(t).c_str());
616 }
617 }
618
619 return ss;
620}
621
623// Sql interface class //
625
626class SqlBase
627{
628public:
629 int fDebug = 0;
630 bool fIsConnected = false;
632
633 SqlBase() { // ctor
634 };
635
636 virtual ~SqlBase() { // dtor
637 // confirm that the destructor of the concrete class
638 // disconnected the database
639 assert(!fIsConnected);
640 fDebug = 0;
641 fIsConnected = false;
642 }
643
644 virtual int Connect(const char* path) = 0;
645 virtual int Disconnect() = 0;
646 virtual bool IsConnected() = 0;
647
648 virtual int ListTables(std::vector<std::string> *plist) = 0;
649 virtual int ListColumns(const char* table_name, std::vector<std::string> *plist) = 0;
650
651 // sql commands
652 virtual int Exec(const char* table_name, const char* sql) = 0;
653 virtual int ExecDisconnected(const char* table_name, const char* sql) = 0;
654
655 // queries
656 virtual int Prepare(const char* table_name, const char* sql) = 0;
657 virtual int Step() = 0;
658 virtual const char* GetText(int column) = 0;
659 virtual time_t GetTime(int column) = 0;
660 virtual double GetDouble(int column) = 0;
661 virtual int Finalize() = 0;
662
663 // transactions
664 virtual int OpenTransaction(const char* table_name) = 0;
665 virtual int CommitTransaction(const char* table_name) = 0;
666 virtual int RollbackTransaction(const char* table_name) = 0;
667
668 // data types
669 virtual const char* ColumnType(int midas_tid) = 0;
670 virtual bool TypesCompatible(int midas_tid, const char* sql_type) = 0;
671
672 // string quoting
673 virtual std::string QuoteString(const char* s) = 0; // quote text string
674 virtual std::string QuoteId(const char* s) = 0; // quote identifier, such as table or column name
675};
676
678// Schema concrete classes //
680
681class HsSqlSchema : public HsSchema
682{
683public:
684
686 std::string fTableName;
687 std::vector<std::string> fColumnNames;
688 std::vector<std::string> fColumnTypes;
689 std::vector<bool> fColumnInactive;
690
691public:
692
693 HsSqlSchema() // ctor
694 {
695 // empty
696 }
697
698 ~HsSqlSchema() // dtor
699 {
700 assert(get_transaction_count() == 0);
701 }
702
704 void print(bool print_tags = true) const;
708 int close_transaction();
710 int close() { return close_transaction(); }
711 int write_event(const time_t t, const char* data, const size_t data_size);
712 int match_event_var(const char* event_name, const char* var_name, const int var_index);
713 int read_last_written(const time_t timestamp,
714 const int debug,
715 time_t* last_written);
716 int read_data(const time_t start_time,
717 const time_t end_time,
718 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
719 const int debug,
720 std::vector<time_t>& last_time,
722
723private:
724 // Sqlite uses a transaction per table; MySQL uses a single transaction for all tables.
725 // But to support future "single transaction" DBs more easily (e.g. if user wants to
726 // log to both Postgres and MySQL in future), we keep track of the transaction count
727 // per SQL engine.
729 static std::map<SqlBase*, int> gfTransactionCount;
730};
731
732std::map<SqlBase*, int> HsSqlSchema::gfTransactionCount;
733
734class HsFileSchema : public HsSchema
735{
736public:
737
738 std::string fFileName;
739 size_t fRecordSize = 0;
741 size_t fLastSize = 0;
742 int fWriterFd = -1;
745
746public:
747 off64_t fFileSizeInitial = 0; // initial file size
748 off64_t fFileSize = 0; // file size including any new data we wrote
749
750public:
751
752 HsFileSchema() // ctor
753 {
754 // empty
755 }
756
758 {
759 //printf("HsFileSchema::dtor %p!\n", this);
760 close();
761 fRecordSize = 0;
762 fDataOffset = 0;
763 fLastSize = 0;
764 fWriterFd = -1;
765 if (fRecordBuffer) {
766 free(fRecordBuffer);
768 }
770 }
771
772 void remove_inactive_columns() { /* empty */ };
773 void print(bool print_tags = true) const;
774 int flush_buffers() { return HS_SUCCESS; };
775 int close();
776 int write_event(const time_t t, const char* data, const size_t data_size);
777 int read_last_written(const time_t timestamp,
778 const int debug,
779 time_t* last_written);
780 int read_data(const time_t start_time,
781 const time_t end_time,
782 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
783 const int debug,
784 std::vector<time_t>& last_time,
786};
787
789// Print functions //
791
793{
794 size_t nv = this->fVariables.size();
795 printf("event [%s], time %s..%s, %zu variables, %zu bytes\n", this->fEventName.c_str(), TimeToString(this->fTimeFrom).c_str(), TimeToString(this->fTimeTo).c_str(), nv, fNumBytes);
796 if (print_tags)
797 for (size_t j=0; j<nv; j++)
798 printf(" %zu: name [%s], type [%s] tid %d, n_data %d, n_bytes %d, offset %d\n", j, this->fVariables[j].name.c_str(), rpc_tid_name(this->fVariables[j].type), this->fVariables[j].type, this->fVariables[j].n_data, this->fVariables[j].n_bytes, this->fOffsets[j]);
799};
800
802{
803 size_t nv = this->fVariables.size();
804 printf("event [%s], sql_table [%s], time %s..%s, %zu variables, %zu bytes\n", this->fEventName.c_str(), this->fTableName.c_str(), TimeToString(this->fTimeFrom).c_str(), TimeToString(this->fTimeTo).c_str(), nv, fNumBytes);
805 if (print_tags) {
806 for (size_t j=0; j<nv; j++) {
807 printf(" %zu: name [%s], type [%s] tid %d, n_data %d, n_bytes %d", j, this->fVariables[j].name.c_str(), rpc_tid_name(this->fVariables[j].type), this->fVariables[j].type, this->fVariables[j].n_data, this->fVariables[j].n_bytes);
808 printf(", sql_column [%s], sql_type [%s], offset %d", this->fColumnNames[j].c_str(), this->fColumnTypes[j].c_str(), this->fOffsets[j]);
809 printf(", inactive %d", (int)this->fColumnInactive[j]);
810 printf("\n");
811 }
812 }
813}
814
816{
817 size_t nv = this->fVariables.size();
818 printf("event [%s], file_name [%s], time %s..%s, %zu variables, %zu bytes, dat_offset %jd, record_size %zu\n", this->fEventName.c_str(), this->fFileName.c_str(), TimeToString(this->fTimeFrom).c_str(), TimeToString(this->fTimeTo).c_str(), nv, fNumBytes, (intmax_t)fDataOffset, fRecordSize);
819 if (print_tags) {
820 for (size_t j=0; j<nv; j++)
821 printf(" %zu: name [%s], type [%s] tid %d, n_data %d, n_bytes %d, offset %d\n", j, this->fVariables[j].name.c_str(), rpc_tid_name(this->fVariables[j].type), this->fVariables[j].type, this->fVariables[j].n_data, this->fVariables[j].n_bytes, this->fOffsets[j]);
822 }
823}
824
826// File functions //
828
829#ifdef HAVE_MYSQL
830
832// MYSQL/MariaDB database access //
834
835//#warning !!!HAVE_MYSQL!!!
836
837//#include <my_global.h> // my_global.h removed MySQL 8.0, MariaDB 10.2. K.O.
838#include <mysql.h>
839
840class Mysql: public SqlBase
841{
842public:
843 std::string fConnectString;
844 MYSQL* fMysql = NULL;
845
846 // query results
849 int fNumFields = 0;
850
851 // disconnected operation
852 size_t fMaxDisconnected = 0;
853 std::list<std::string> fDisconnectedBuffer;
856 int fDisconnectedLost = 0;
857
858 Mysql(); // ctor
859 ~Mysql(); // dtor
860
861 int Connect(const char* path);
862 int Disconnect();
863 bool IsConnected();
864
865 int ConnectTable(const char* table_name);
866
867 int ListTables(std::vector<std::string> *plist);
868 int ListColumns(const char* table_name, std::vector<std::string> *plist);
869
870 int Exec(const char* table_name, const char* sql);
871 int ExecDisconnected(const char* table_name, const char* sql);
872
873 int Prepare(const char* table_name, const char* sql);
874 int Step();
875 const char* GetText(int column);
876 time_t GetTime(int column);
877 double GetDouble(int column);
878 int Finalize();
879
880 int OpenTransaction(const char* table_name);
881 int CommitTransaction(const char* table_name);
882 int RollbackTransaction(const char* table_name);
883
884 const char* ColumnType(int midas_tid);
885 bool TypesCompatible(int midas_tid, const char* sql_type);
886
887 std::string QuoteId(const char* s);
888 std::string QuoteString(const char* s);
889};
890
891Mysql::Mysql() // ctor
892{
893 fMysql = NULL;
894 fResult = NULL;
895 fRow = NULL;
896 fNumFields = 0;
897 fMaxDisconnected = 1000;
898 fNextReconnect = 0;
901 fTransactionPerTable = false;
902}
903
904Mysql::~Mysql() // dtor
905{
906 Disconnect();
907 fMysql = NULL;
908 fResult = NULL;
909 fRow = NULL;
910 fNumFields = 0;
911 if (fDisconnectedBuffer.size() > 0) {
912 cm_msg(MINFO, "Mysql::~Mysql", "Lost %zu history entries accumulated while disconnected from the database", fDisconnectedBuffer.size());
914 }
915}
916
917int Mysql::Connect(const char* connect_string)
918{
919 if (fIsConnected)
920 Disconnect();
921
922 fConnectString = connect_string;
923
924 if (fDebug) {
925 cm_msg(MINFO, "Mysql::Connect", "Connecting to Mysql database specified by \'%s\'", connect_string);
927 }
928
929 std::string host_name;
930 std::string user_name;
931 std::string user_password;
932 std::string db_name;
933 int tcp_port = 0;
934 std::string unix_socket;
935 std::string buffer;
936
937 FILE* fp = fopen(connect_string, "r");
938 if (!fp) {
939 cm_msg(MERROR, "Mysql::Connect", "Cannot read MYSQL connection parameters from \'%s\', fopen() error %d (%s)", connect_string, errno, strerror(errno));
940 return DB_FILE_ERROR;
941 }
942
943 while (1) {
944 char buf[256];
945 char* s = fgets(buf, sizeof(buf), fp);
946 if (!s)
947 break; // EOF
948
949 char*ss;
950 // kill trailing \n and \r
951 ss = strchr(s, '\n');
952 if (ss) *ss = 0;
953 ss = strchr(s, '\r');
954 if (ss) *ss = 0;
955
956 //printf("line [%s]\n", s);
957
958 if (strncasecmp(s, "server=", 7)==0)
959 host_name = skip_spaces(s + 7);
960 if (strncasecmp(s, "port=", 5)==0)
961 tcp_port = atoi(skip_spaces(s + 5));
962 if (strncasecmp(s, "database=", 9)==0)
963 db_name = skip_spaces(s + 9);
964 if (strncasecmp(s, "socket=", 7)==0)
965 unix_socket = skip_spaces(s + 7);
966 if (strncasecmp(s, "user=", 5)==0)
967 user_name = skip_spaces(s + 5);
968 if (strncasecmp(s, "password=", 9)==0)
969 user_password = skip_spaces(s + 9);
970 if (strncasecmp(s, "buffer=", 7)==0)
971 buffer = skip_spaces(s + 7);
972 }
973
974 fclose(fp);
975
976 int buffer_int = atoi(buffer.c_str());
977
978 if (buffer_int > 0 && buffer_int < 1000000)
980
981 if (fDebug)
982 printf("Mysql::Connect: connecting to server [%s] port %d, unix socket [%s], database [%s], user [%s], password [%s], buffer [%zu]\n", host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(), user_password.c_str(), fMaxDisconnected);
983
984 if (!fMysql) {
986 if (!fMysql) {
987 return DB_FILE_ERROR;
988 }
989 }
990
992
993 if (mysql_real_connect(fMysql, host_name.c_str(), user_name.c_str(), user_password.c_str(), db_name.c_str(), tcp_port, unix_socket.c_str(), client_flag) == NULL) {
994 cm_msg(MERROR, "Mysql::Connect", "mysql_real_connect() to host [%s], port %d, unix socket [%s], database [%s], user [%s], password [%s]: error %d (%s)", host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", mysql_errno(fMysql), mysql_error(fMysql));
995 Disconnect();
996 return DB_FILE_ERROR;
997 }
998
999 int status;
1000
1001 // FIXME:
1002 //my_bool reconnect = 0;
1003 //mysql_options(&mysql, MYSQL_OPT_RECONNECT, &reconnect);
1004
1005 status = Exec("(notable)", "SET SESSION sql_mode='ANSI'");
1006 if (status != DB_SUCCESS) {
1007 cm_msg(MERROR, "Mysql::Connect", "Cannot set ANSI mode, nothing will work");
1008 Disconnect();
1009 return DB_FILE_ERROR;
1010 }
1011
1012 if (fDebug) {
1013 cm_msg(MINFO, "Mysql::Connect", "Connected to a MySQL database on host [%s], port %d, unix socket [%s], database [%s], user [%s], password [%s], buffer %zu", host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", fMaxDisconnected);
1015 }
1016
1017 fIsConnected = true;
1018
1019 int count = 0;
1020 while (fDisconnectedBuffer.size() > 0) {
1021 status = Exec("(flush)", fDisconnectedBuffer.front().c_str());
1022 if (status != DB_SUCCESS) {
1023 return status;
1024 }
1025 fDisconnectedBuffer.pop_front();
1026 count++;
1027 }
1028
1029 if (count > 0) {
1030 cm_msg(MINFO, "Mysql::Connect", "Saved %d, lost %d history events accumulated while disconnected from the database", count, fDisconnectedLost);
1032 }
1033
1034 assert(fDisconnectedBuffer.size() == 0);
1036
1037 return DB_SUCCESS;
1038}
1039
1040int Mysql::Disconnect()
1041{
1042 if (fRow) {
1043 // FIXME: mysql_free_result(fResult);
1044 }
1045
1046 if (fResult)
1048
1049 if (fMysql)
1051
1052 fMysql = NULL;
1053 fResult = NULL;
1054 fRow = NULL;
1055
1056 fIsConnected = false;
1057 return DB_SUCCESS;
1058}
1059
1060bool Mysql::IsConnected()
1061{
1062 return fIsConnected;
1063}
1064
1065int Mysql::OpenTransaction(const char* table_name)
1066{
1067 return Exec(table_name, "START TRANSACTION");
1068 return DB_SUCCESS;
1069}
1070
1071int Mysql::CommitTransaction(const char* table_name)
1072{
1073 Exec(table_name, "COMMIT");
1074 return DB_SUCCESS;
1075}
1076
1077int Mysql::RollbackTransaction(const char* table_name)
1078{
1079 Exec(table_name, "ROLLBACK");
1080 return DB_SUCCESS;
1081}
1082
1083int Mysql::ListTables(std::vector<std::string> *plist)
1084{
1085 if (!fIsConnected)
1086 return DB_FILE_ERROR;
1087
1088 if (fDebug)
1089 printf("Mysql::ListTables!\n");
1090
1091 int status;
1092
1094
1095 if (fResult == NULL) {
1096 cm_msg(MERROR, "Mysql::ListTables", "mysql_list_tables() error %d (%s)", mysql_errno(fMysql), mysql_error(fMysql));
1097 return DB_FILE_ERROR;
1098 }
1099
1101
1102 while (1) {
1103 status = Step();
1104 if (status != DB_SUCCESS)
1105 break;
1106 std::string tn = GetText(0);
1107 plist->push_back(tn);
1108 };
1109
1110 status = Finalize();
1111
1112 return DB_SUCCESS;
1113}
1114
1115int Mysql::ListColumns(const char* table_name, std::vector<std::string> *plist)
1116{
1117 if (!fIsConnected)
1118 return DB_FILE_ERROR;
1119
1120 if (fDebug)
1121 printf("Mysql::ListColumns for table \'%s\'\n", table_name);
1122
1123 int status;
1124
1125 std::string cmd;
1126 cmd += "SHOW COLUMNS FROM ";
1127 cmd += QuoteId(table_name);
1128 cmd += ";";
1129
1130 status = Prepare(table_name, cmd.c_str());
1131 if (status != DB_SUCCESS)
1132 return status;
1133
1135
1136 while (1) {
1137 status = Step();
1138 if (status != DB_SUCCESS)
1139 break;
1140 std::string cn = GetText(0);
1141 std::string ct = GetText(1);
1142 plist->push_back(cn);
1143 plist->push_back(ct);
1144 //printf("cn [%s]\n", cn.c_str());
1145 //for (int i=0; i<fNumFields; i++)
1146 //printf(" field[%d]: [%s]\n", i, GetText(i));
1147 };
1148
1149 status = Finalize();
1150
1151 return DB_SUCCESS;
1152}
1153
1154int Mysql::Exec(const char* table_name, const char* sql)
1155{
1156 if (fDebug)
1157 printf("Mysql::Exec(%s, %s)\n", table_name, sql);
1158
1159 // FIXME: match Sqlite::Exec() return values:
1160 // return values:
1161 // DB_SUCCESS
1162 // DB_FILE_ERROR: not connected
1163 // DB_KEY_EXIST: "table already exists"
1164
1165 if (!fMysql)
1166 return DB_FILE_ERROR;
1167
1168 assert(fMysql);
1169 assert(fResult == NULL); // there should be no unfinalized queries
1170 assert(fRow == NULL);
1171
1172 if (mysql_query(fMysql, sql)) {
1173 if (mysql_errno(fMysql) == 1050) { // "Table already exists"
1174 return DB_KEY_EXIST;
1175 }
1176 if (mysql_errno(fMysql) == 1146) { // "Table does not exist"
1177 return DB_FILE_ERROR;
1178 }
1179 cm_msg(MERROR, "Mysql::Exec", "mysql_query(%s) error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1180 if (mysql_errno(fMysql) == 1060) // "Duplicate column name"
1181 return DB_KEY_EXIST;
1182 if (mysql_errno(fMysql) == 2006) { // "MySQL server has gone away"
1183 Disconnect();
1184 return ExecDisconnected(table_name, sql);
1185 }
1186 return DB_FILE_ERROR;
1187 }
1188
1189 return DB_SUCCESS;
1190}
1191
1192int Mysql::ExecDisconnected(const char* table_name, const char* sql)
1193{
1194 if (fDebug)
1195 printf("Mysql::ExecDisconnected(%s, %s)\n", table_name, sql);
1196
1198 fDisconnectedBuffer.push_back(sql);
1199 if (fDisconnectedBuffer.size() >= fMaxDisconnected) {
1200 cm_msg(MERROR, "Mysql::ExecDisconnected", "Error: Disconnected database buffer overflow, size %zu, subsequent events are lost", fDisconnectedBuffer.size());
1201 }
1202 } else {
1204 }
1205
1206 time_t now = time(NULL);
1207
1208 if (fNextReconnect == 0 || now >= fNextReconnect) {
1209 int status = Connect(fConnectString.c_str());
1210 if (status == DB_SUCCESS) {
1211 fNextReconnect = 0;
1213 } else {
1214 if (fNextReconnectDelaySec == 0) {
1216 } else if (fNextReconnectDelaySec < 10*60) {
1218 }
1219 if (fDebug) {
1220 cm_msg(MINFO, "Mysql::ExecDisconnected", "Next reconnect attempt in %d sec, history events buffered %zu, lost %d", fNextReconnectDelaySec, fDisconnectedBuffer.size(), fDisconnectedLost);
1222 }
1224 }
1225 }
1226
1227 return DB_SUCCESS;
1228}
1229
1230int Mysql::Prepare(const char* table_name, const char* sql)
1231{
1232 if (fDebug)
1233 printf("Mysql::Prepare(%s, %s)\n", table_name, sql);
1234
1235 if (!fMysql)
1236 return DB_FILE_ERROR;
1237
1238 assert(fMysql);
1239 assert(fResult == NULL); // there should be no unfinalized queries
1240 assert(fRow == NULL);
1241
1242 // if (mysql_query(fMysql, sql)) {
1243 // cm_msg(MERROR, "Mysql::Prepare", "mysql_query(%s) error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1244 // return DB_FILE_ERROR;
1245 //}
1246
1247 // Check if the connection to MySQL timed out; fix from B. Smith
1248 int status = mysql_query(fMysql, sql);
1249 if (status) {
1250 if (mysql_errno(fMysql) == 2006 || mysql_errno(fMysql) == 2013) {
1251 // "MySQL server has gone away" or "Lost connection to MySQL server during query"
1252 status = Connect(fConnectString.c_str());
1253 if (status == DB_SUCCESS) {
1254 // Retry after reconnecting
1256 } else {
1257 cm_msg(MERROR, "Mysql::Prepare", "mysql_query(%s) - MySQL server has gone away, and couldn't reconnect - %d", sql, status);
1258 return DB_FILE_ERROR;
1259 }
1260 }
1261 if (status) {
1262 cm_msg(MERROR, "Mysql::Prepare", "mysql_query(%s) error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1263 return DB_FILE_ERROR;
1264 }
1265 cm_msg(MINFO, "Mysql::Prepare", "Reconnected to MySQL after long inactivity.");
1266 }
1267
1269 //fResult = mysql_use_result(fMysql); // cannot use this because it blocks writing into table
1270
1271 if (!fResult) {
1272 cm_msg(MERROR, "Mysql::Prepare", "mysql_store_result(%s) returned NULL, error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1273 return DB_FILE_ERROR;
1274 }
1275
1277
1278 //printf("num fields %d\n", fNumFields);
1279
1280 return DB_SUCCESS;
1281}
1282
1283int Mysql::Step()
1284{
1285 if (/* DISABLES CODE */ (0) && fDebug)
1286 printf("Mysql::Step()\n");
1287
1288 assert(fMysql);
1289 assert(fResult);
1290
1292
1293 if (fRow)
1294 return DB_SUCCESS;
1295
1296 if (mysql_errno(fMysql) == 0)
1297 return DB_NO_MORE_SUBKEYS;
1298
1299 cm_msg(MERROR, "Mysql::Step", "mysql_fetch_row() error %d (%s)", mysql_errno(fMysql), mysql_error(fMysql));
1300
1301 return DB_FILE_ERROR;
1302}
1303
1304const char* Mysql::GetText(int column)
1305{
1306 assert(fMysql);
1307 assert(fResult);
1308 assert(fRow);
1309 assert(fNumFields > 0);
1310 assert(column >= 0);
1311 assert(column < fNumFields);
1312 if (fRow[column] == NULL)
1313 return "";
1314 return fRow[column];
1315}
1316
1317double Mysql::GetDouble(int column)
1318{
1319 return atof(GetText(column));
1320}
1321
1322time_t Mysql::GetTime(int column)
1323{
1324 return strtoul(GetText(column), NULL, 0);
1325}
1326
1327int Mysql::Finalize()
1328{
1329 assert(fMysql);
1330 assert(fResult);
1331
1333 fResult = NULL;
1334 fRow = NULL;
1335 fNumFields = 0;
1336
1337 return DB_SUCCESS;
1338}
1339
1340const char* Mysql::ColumnType(int midas_tid)
1341{
1342 assert(midas_tid>=0);
1343 assert(midas_tid<TID_LAST);
1344 return sql_type_mysql[midas_tid];
1345}
1346
1347bool Mysql::TypesCompatible(int midas_tid, const char* sql_type)
1348{
1349 if (/* DISABLES CODE */ (0))
1350 printf("compare types midas \'%s\'=\'%s\' and sql \'%s\'\n", rpc_tid_name(midas_tid), ColumnType(midas_tid), sql_type);
1351
1352 //if (sql2midasType_mysql(sql_type) == midas_tid)
1353 // return true;
1354
1355 if (strcasecmp(ColumnType(midas_tid), sql_type) == 0)
1356 return true;
1357
1358 // permit writing FLOAT into DOUBLE
1359 if (midas_tid==TID_FLOAT && strcmp(sql_type, "double")==0)
1360 return true;
1361
1362 // T2K quirk!
1363 // permit writing BYTE into signed tinyint
1364 if (midas_tid==TID_BYTE && strcmp(sql_type, "tinyint")==0)
1365 return true;
1366
1367 // T2K quirk!
1368 // permit writing WORD into signed tinyint
1369 if (midas_tid==TID_WORD && strcmp(sql_type, "tinyint")==0)
1370 return true;
1371
1372 // mysql quirk!
1373 //if (midas_tid==TID_DWORD && strcmp(sql_type, "int(10) unsigned")==0)
1374 // return true;
1375
1376 if (/* DISABLES CODE */ (0))
1377 printf("type mismatch!\n");
1378
1379 return false;
1380}
1381
1382std::string Mysql::QuoteId(const char* s)
1383{
1384 std::string q;
1385 q += "`";
1386 q += s;
1387 q += "`";
1388 return q;
1389}
1390
1391std::string Mysql::QuoteString(const char* s)
1392{
1393 std::string q;
1394 q += "\'";
1395 q += s;
1396#if 0
1397 while (int c = *s++) {
1398 if (c == '\'') {
1399 q += "\\'";
1400 } if (c == '"') {
1401 q += "\\\"";
1402 } else if (isprint(c)) {
1403 q += c;
1404 } else {
1405 char buf[256];
1406 sprintf(buf, "\\\\x%02x", c&0xFF);
1407 q += buf;
1408 }
1409 }
1410#endif
1411 q += "\'";
1412 return q;
1413}
1414
1415#endif // HAVE_MYSQL
1416
1417#ifdef HAVE_PGSQL
1418
1420// PostgreSQL database access //
1422
1423//#warning !!!HAVE_PGSQL!!!
1424
1425#include <libpq-fe.h>
1426
1427class Pgsql: public SqlBase
1428{
1429public:
1430 std::string fConnectString;
1431 int fDownsample = 0;
1432 PGconn* fPgsql = NULL;
1433
1434 // query results
1436 int fNumFields = 0;
1437 int fRow = 0;
1438
1439 // disconnected operation
1440 size_t fMaxDisconnected = 0;
1441 std::list<std::string> fDisconnectedBuffer;
1443 int fNextReconnectDelaySec = 0;
1444 int fDisconnectedLost = 0;
1445
1446 Pgsql(); // ctor
1447 ~Pgsql(); // dtor
1448
1449 int Connect(const char* path);
1450 int Disconnect();
1451 bool IsConnected();
1452
1453 int ConnectTable(const char* table_name);
1454
1455 int ListTables(std::vector<std::string> *plist);
1456 int ListColumns(const char* table_name, std::vector<std::string> *plist);
1457
1458 int Exec(const char* table_name, const char* sql);
1459 int ExecDisconnected(const char* table_name, const char* sql);
1460
1461 int Prepare(const char* table_name, const char* sql);
1462 std::string BuildDownsampleQuery(const time_t start_time, const time_t end_time, const int npoints, const char* table_name, const char* column_name);
1463 int Step();
1464 const char* GetText(int column);
1465 time_t GetTime(int column);
1466 double GetDouble(int column);
1467 int Finalize();
1468
1469 int OpenTransaction(const char* table_name);
1470 int CommitTransaction(const char* table_name);
1471 int RollbackTransaction(const char* table_name);
1472
1473 const char* ColumnType(int midas_tid);
1474 bool TypesCompatible(int midas_tid, const char* sql_type);
1475
1476 std::string QuoteId(const char* s);
1477 std::string QuoteString(const char* s);
1478};
1479
1480Pgsql::Pgsql() // ctor
1481{
1482 fPgsql = NULL;
1483 fDownsample = 0;
1484 fResult = NULL;
1485 fRow = -1;
1486 fNumFields = 0;
1487 fMaxDisconnected = 1000;
1488 fNextReconnect = 0;
1491 fTransactionPerTable = false;
1492}
1493
1494Pgsql::~Pgsql() // dtor
1495{
1496 Disconnect();
1497 if(fResult)
1499 fRow = -1;
1500 fNumFields = 0;
1501 if (fDisconnectedBuffer.size() > 0) {
1502 cm_msg(MINFO, "Pgsql::~Pgsql", "Lost %zu history entries accumulated while disconnected from the database", fDisconnectedBuffer.size());
1504 }
1505}
1506
1507int Pgsql::Connect(const char* connect_string)
1508{
1509 if (fIsConnected)
1510 Disconnect();
1511
1512 fConnectString = connect_string;
1513
1514 if (fDebug) {
1515 cm_msg(MINFO, "Pgsql::Connect", "Connecting to PostgreSQL database specified by \'%s\'", connect_string);
1517 }
1518
1519 std::string host_name;
1520 std::string user_name;
1521 std::string user_password;
1522 std::string db_name;
1523 std::string tcp_port;
1524 std::string unix_socket;
1525 std::string buffer;
1526
1527 FILE* fp = fopen(connect_string, "r");
1528 if (!fp) {
1529 cm_msg(MERROR, "Pgsql::Connect", "Cannot read PostgreSQL connection parameters from \'%s\', fopen() error %d (%s)", connect_string, errno, strerror(errno));
1530 return DB_FILE_ERROR;
1531 }
1532
1533 while (1) {
1534 char buf[256];
1535 char* s = fgets(buf, sizeof(buf), fp);
1536 if (!s)
1537 break; // EOF
1538
1539 char*ss;
1540 // kill trailing \n and \r
1541 ss = strchr(s, '\n');
1542 if (ss) *ss = 0;
1543 ss = strchr(s, '\r');
1544 if (ss) *ss = 0;
1545
1546 //printf("line [%s]\n", s);
1547
1548 if (strncasecmp(s, "server=", 7)==0)
1549 host_name = skip_spaces(s + 7);
1550 if (strncasecmp(s, "port=", 5)==0)
1551 tcp_port = skip_spaces(s + 5);
1552 if (strncasecmp(s, "database=", 9)==0)
1553 db_name = skip_spaces(s + 9);
1554 if (strncasecmp(s, "socket=", 7)==0)
1555 unix_socket = skip_spaces(s + 7);
1556 if (strncasecmp(s, "user=", 5)==0)
1557 user_name = skip_spaces(s + 5);
1558 if (strncasecmp(s, "password=", 9)==0)
1559 user_password = skip_spaces(s + 9);
1560 if (strncasecmp(s, "buffer=", 7)==0)
1561 buffer = skip_spaces(s + 7);
1562 }
1563
1564 fclose(fp);
1565
1566 int buffer_int = atoi(buffer.c_str());
1567
1568 if (buffer_int > 0 && buffer_int < 1000000)
1570
1571 if (fDebug)
1572 printf("Pgsql::Connect: connecting to server [%s] port %s, unix socket [%s], database [%s], user [%s], password [%s], buffer [%zu]\n", host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(), user_password.c_str(), fMaxDisconnected);
1573
1574 fPgsql = PQsetdbLogin(host_name.c_str(), tcp_port.c_str(), NULL, NULL, db_name.c_str(), user_name.c_str(), user_password.c_str());
1575 if (PQstatus(fPgsql) != CONNECTION_OK) {
1576 std::string msg(PQerrorMessage(fPgsql));
1577 msg.erase(std::remove(msg.begin(), msg.end(), '\n'), msg.end());
1578 cm_msg(MERROR, "Pgsql::Connect", "PQsetdbLogin() to host [%s], port %s, unix socket [%s], database [%s], user [%s], password [%s]: error (%s)", host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", msg.c_str());
1579 Disconnect();
1580 return DB_FILE_ERROR;
1581 }
1582
1583 int status;
1584
1585 if (fDebug) {
1586 cm_msg(MINFO, "Pgsql::Connect", "Connected to a PostgreSQL database on host [%s], port %s, unix socket [%s], database [%s], user [%s], password [%s], buffer %zu", host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", fMaxDisconnected);
1588 }
1589
1590 fIsConnected = true;
1591
1592 int count = 0;
1593 while (fDisconnectedBuffer.size() > 0) {
1594 status = Exec("(flush)", fDisconnectedBuffer.front().c_str());
1595 if (status != DB_SUCCESS) {
1596 return status;
1597 }
1598 fDisconnectedBuffer.pop_front();
1599 count++;
1600 }
1601
1602 if (count > 0) {
1603 cm_msg(MINFO, "Pgsql::Connect", "Saved %d, lost %d history events accumulated while disconnected from the database", count, fDisconnectedLost);
1605 }
1606
1607 assert(fDisconnectedBuffer.size() == 0);
1609
1610 if (fDownsample) {
1611 status = Prepare("pg_extensions", "select extname from pg_extension where extname = 'timescaledb';");
1612
1613 if (status != DB_SUCCESS || PQntuples(fResult) == 0) {
1614 cm_msg(MERROR, "Pgsql::Connect", "TimescaleDB extension not installed");
1615 return DB_FILE_ERROR;
1616 }
1617 Finalize();
1618
1619 status = Prepare("pg_extensions", "select extname from pg_extension where extname = 'timescaledb_toolkit';");
1620
1621 if (status != DB_SUCCESS || PQntuples(fResult) == 0) {
1622 cm_msg(MERROR, "Pgsql::Connect", "TimescaleDB_toolkit extension not installed");
1623 return DB_FILE_ERROR;
1624 }
1625 Finalize();
1626
1627 cm_msg(MINFO, "Pgsql::Connect", "TimescaleDB extensions found - downsampling enabled");
1628 }
1629
1630 return DB_SUCCESS;
1631}
1632
1633int Pgsql::Disconnect()
1634{
1635 if (fPgsql)
1637
1638 fPgsql = NULL;
1639 fRow = -1;
1640
1641 fIsConnected = false;
1642 return DB_SUCCESS;
1643}
1644
1645bool Pgsql::IsConnected()
1646{
1647 return fIsConnected;
1648}
1649
1650int Pgsql::OpenTransaction(const char* table_name)
1651{
1652 return Exec(table_name, "BEGIN TRANSACTION;");
1653}
1654
1655int Pgsql::CommitTransaction(const char* table_name)
1656{
1657 return Exec(table_name, "COMMIT;");
1658}
1659
1660int Pgsql::RollbackTransaction(const char* table_name)
1661{
1662 return Exec(table_name, "ROLLBACK;");
1663}
1664
1665int Pgsql::ListTables(std::vector<std::string> *plist)
1666{
1667 if (!fIsConnected)
1668 return DB_FILE_ERROR;
1669
1670 if (fDebug)
1671 printf("Pgsql::ListTables!\n");
1672
1673 int status = Prepare("pg_tables", "select tablename from pg_tables where schemaname = 'public';");
1674
1675 if (status != DB_SUCCESS) {
1676 cm_msg(MERROR, "Pgsql::ListTables", "error %s (%s)", PQresStatus(PQresultStatus(fResult)), PQresultErrorMessage(fResult));
1677 return DB_FILE_ERROR;
1678 }
1679
1680 while (1) {
1681 if (Step() != DB_SUCCESS)
1682 break;
1683 std::string tn = GetText(0);
1684 plist->push_back(tn);
1685 };
1686
1687 Finalize();
1688
1689 return DB_SUCCESS;
1690}
1691
1692int Pgsql::ListColumns(const char* table_name, std::vector<std::string> *plist)
1693{
1694 if (!fIsConnected)
1695 return DB_FILE_ERROR;
1696
1697 if (fDebug)
1698 printf("Pgsql::ListColumns for table \'%s\'\n", table_name);
1699
1700 std::string cmd;
1701 cmd += "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = ";
1702 cmd += QuoteString(table_name);
1703 cmd += ";";
1704
1705 int status = Prepare(table_name, cmd.c_str());
1706 if (status != DB_SUCCESS)
1707 return status;
1708
1710
1711 while (1) {
1712 if (Step() != DB_SUCCESS)
1713 break;
1714 std::string cn = GetText(0);
1715 std::string ct = GetText(1);
1716 plist->push_back(cn);
1717 plist->push_back(ct);
1718 };
1719
1720 Finalize();
1721
1722 return DB_SUCCESS;
1723}
1724
1725int Pgsql::Exec(const char* table_name, const char* sql)
1726{
1727 if (fDebug)
1728 printf("Pgsql::Exec(%s, %s)\n", table_name, sql);
1729
1730 if (!fPgsql)
1731 return DB_FILE_ERROR;
1732
1733 assert(fPgsql);
1734 assert(fRow == -1);
1735
1738 if(err != PGRES_TUPLES_OK) {
1739 if(err == PGRES_FATAL_ERROR) {
1740 // handle fatal error
1741 if(strstr(PQresultErrorMessage(fResult), "already exists"))
1742 return DB_KEY_EXIST;
1743 else return DB_FILE_ERROR;
1744 }
1745
1747 Disconnect();
1748 return ExecDisconnected(table_name, sql);
1749 }
1750 }
1751
1752 return DB_SUCCESS;
1753}
1754
1755int Pgsql::ExecDisconnected(const char* table_name, const char* sql)
1756{
1757 if (fDebug)
1758 printf("Pgsql::ExecDisconnected(%s, %s)\n", table_name, sql);
1759
1761 fDisconnectedBuffer.push_back(sql);
1762 if (fDisconnectedBuffer.size() >= fMaxDisconnected) {
1763 cm_msg(MERROR, "Pgsql::ExecDisconnected", "Error: Disconnected database buffer overflow, size %zu, subsequent events are lost", fDisconnectedBuffer.size());
1764 }
1765 } else {
1767 }
1768
1769 time_t now = time(NULL);
1770
1771 if (fNextReconnect == 0 || now >= fNextReconnect) {
1772 int status = Connect(fConnectString.c_str());
1773 if (status == DB_SUCCESS) {
1774 fNextReconnect = 0;
1776 } else {
1777 if (fNextReconnectDelaySec == 0) {
1779 } else if (fNextReconnectDelaySec < 10*60) {
1781 }
1782 if (fDebug) {
1783 cm_msg(MINFO, "Pgsql::ExecDisconnected", "Next reconnect attempt in %d sec, history events buffered %zu, lost %d", fNextReconnectDelaySec, fDisconnectedBuffer.size(), fDisconnectedLost);
1785 }
1787 }
1788 }
1789
1790 return DB_SUCCESS;
1791}
1792
1793int Pgsql::Prepare(const char* table_name, const char* sql)
1794{
1795 if (fDebug)
1796 printf("Pgsql::Prepare(%s, %s)\n", table_name, sql);
1797
1798 if (!fPgsql)
1799 return DB_FILE_ERROR;
1800
1801 assert(fPgsql);
1802 //assert(fResult==NULL);
1803 assert(fRow == -1);
1804
1806 if (PQstatus(fPgsql) == CONNECTION_BAD) {
1807 // lost connection to server
1808 int status = Connect(fConnectString.c_str());
1809 if (status == DB_SUCCESS) {
1810 // Retry after reconnecting
1812 } else {
1813 cm_msg(MERROR, "Pgsql::Prepare", "PQexec(%s) PostgreSQL server has gone away, and couldn't reconnect - %d", sql, status);
1814 return DB_FILE_ERROR;
1815 }
1816 if (status) {
1817 cm_msg(MERROR, "Pgsql::Prepare", "PQexec(%s) error %s", sql, PQresStatus(PQresultStatus(fResult)));
1818 return DB_FILE_ERROR;
1819 }
1820 cm_msg(MINFO, "Pgsql::Prepare", "Reconnected to PostgreSQL after long inactivity.");
1821 }
1822
1824
1825 return DB_SUCCESS;
1826}
1827
1828std::string Pgsql::BuildDownsampleQuery(const time_t start_time, const time_t end_time, const int npoints,
1829 const char* table_name, const char* column_name)
1830{
1831 std::string cmd;
1832 cmd += "SELECT extract(epoch from time::TIMESTAMPTZ) as _i_time, value ";
1833
1834 cmd += " FROM unnest(( SELECT lttb";
1835 cmd += "(_t_time, ";
1836 cmd += column_name;
1837 cmd += ", ";
1838 cmd += std::to_string(npoints);
1839 cmd += ") ";
1840 cmd += "FROM ";
1841 cmd += QuoteId(table_name);
1842 cmd += " WHERE _t_time BETWEEN ";
1843 cmd += "to_timestamp(";
1844 cmd += TimeToString(start_time);
1845 cmd += ") AND to_timestamp(";
1846 cmd += TimeToString(end_time);
1847 cmd += ") )) ORDER BY time;";
1848
1849 return cmd;
1850}
1851
1852int Pgsql::Step()
1853{
1854 assert(fPgsql);
1855 assert(fResult);
1856
1857 fRow++;
1858
1859 if (fRow == PQntuples(fResult))
1860 return DB_NO_MORE_SUBKEYS;
1861
1862 return DB_SUCCESS;
1863}
1864
1865const char* Pgsql::GetText(int column)
1866{
1867 assert(fPgsql);
1868 assert(fResult);
1869 assert(fNumFields > 0);
1870 assert(column >= 0);
1871 assert(column < fNumFields);
1872
1873 return PQgetvalue(fResult, fRow, column);
1874}
1875
1876double Pgsql::GetDouble(int column)
1877{
1878 return atof(GetText(column));
1879}
1880
1881time_t Pgsql::GetTime(int column)
1882{
1883 return strtoul(GetText(column), NULL, 0);
1884}
1885
1886int Pgsql::Finalize()
1887{
1888 assert(fPgsql);
1889 assert(fResult);
1890
1891 fRow = -1;
1892 fNumFields = 0;
1893
1894 return DB_SUCCESS;
1895}
1896
1897const char* Pgsql::ColumnType(int midas_tid)
1898{
1899 assert(midas_tid>=0);
1900 assert(midas_tid<TID_LAST);
1901 return sql_type_pgsql[midas_tid];
1902}
1903
1904bool Pgsql::TypesCompatible(int midas_tid, const char* sql_type)
1905{
1906 if (/* DISABLES CODE */ (0))
1907 printf("compare types midas \'%s\'=\'%s\' and sql \'%s\'\n", rpc_tid_name(midas_tid), ColumnType(midas_tid), sql_type);
1908
1909 //if (sql2midasType_mysql(sql_type) == midas_tid)
1910 // return true;
1911
1912 if (strcasecmp(ColumnType(midas_tid), sql_type) == 0)
1913 return true;
1914
1915 // permit writing FLOAT into DOUBLE
1916 if (midas_tid==TID_FLOAT && strcmp(sql_type, "double precision")==0)
1917 return true;
1918
1919 // T2K quirk!
1920 // permit writing BYTE into signed tinyint
1921 if (midas_tid==TID_BYTE && strcmp(sql_type, "integer")==0)
1922 return true;
1923
1924 // T2K quirk!
1925 // permit writing WORD into signed tinyint
1926 if (midas_tid==TID_WORD && strcmp(sql_type, "integer")==0)
1927 return true;
1928
1929 if (/* DISABLES CODE */ (0))
1930 printf("type mismatch!\n");
1931
1932 return false;
1933}
1934
1935std::string Pgsql::QuoteId(const char* s)
1936{
1937 std::string q;
1938 q += '"';
1939 q += s;
1940 q += '"';
1941 return q;
1942}
1943
1944std::string Pgsql::QuoteString(const char* s)
1945{
1946 std::string q;
1947 q += '\'';
1948 q += s;
1949 q += '\'';
1950 return q;
1951}
1952
1953#endif // HAVE_PGSQL
1954
1955#ifdef HAVE_SQLITE
1956
1958// SQLITE database access //
1960
1961#include <sqlite3.h>
1962
1963typedef std::map<std::string, sqlite3*> DbMap;
1964
1965class Sqlite: public SqlBase
1966{
1967public:
1968 std::string fPath;
1969
1970 DbMap fMap;
1971
1972 // temporary storage of query data
1975
1976 Sqlite(); // ctor
1977 ~Sqlite(); // dtor
1978
1979 int Connect(const char* path);
1980 int Disconnect();
1981 bool IsConnected();
1982
1983 int ConnectTable(const char* table_name);
1984 sqlite3* GetTable(const char* table_name);
1985
1986 int ListTables(std::vector<std::string> *plist);
1987 int ListColumns(const char* table_name, std::vector<std::string> *plist);
1988
1989 int Exec(const char* table_name, const char* sql);
1990 int ExecDisconnected(const char* table_name, const char* sql);
1991
1992 int Prepare(const char* table_name, const char* sql);
1993 int Step();
1994 const char* GetText(int column);
1995 time_t GetTime(int column);
1996 double GetDouble(int column);
1997 int Finalize();
1998
1999 int OpenTransaction(const char* table_name);
2000 int CommitTransaction(const char* table_name);
2001 int RollbackTransaction(const char* table_name);
2002
2003 const char* ColumnType(int midas_tid);
2004 bool TypesCompatible(int midas_tid, const char* sql_type);
2005
2006 std::string QuoteId(const char* s);
2007 std::string QuoteString(const char* s);
2008};
2009
2010std::string Sqlite::QuoteId(const char* s)
2011{
2012 std::string q;
2013 q += "\"";
2014 q += s;
2015 q += "\"";
2016 return q;
2017}
2018
2019std::string Sqlite::QuoteString(const char* s)
2020{
2021 std::string q;
2022 q += "\'";
2023 q += s;
2024 q += "\'";
2025 return q;
2026}
2027
2028const char* Sqlite::ColumnType(int midas_tid)
2029{
2030 assert(midas_tid>=0);
2031 assert(midas_tid<TID_LAST);
2032 return sql_type_sqlite[midas_tid];
2033}
2034
2035bool Sqlite::TypesCompatible(int midas_tid, const char* sql_type)
2036{
2037 if (0)
2038 printf("compare types midas \'%s\'=\'%s\' and sql \'%s\'\n", rpc_tid_name(midas_tid), ColumnType(midas_tid), sql_type);
2039
2040 //if (sql2midasType_sqlite(sql_type) == midas_tid)
2041 // return true;
2042
2043 if (strcasecmp(ColumnType(midas_tid), sql_type) == 0)
2044 return true;
2045
2046 // permit writing FLOAT into DOUBLE
2047 if (midas_tid==TID_FLOAT && strcasecmp(sql_type, "double")==0)
2048 return true;
2049
2050 return false;
2051}
2052
2053const char* Sqlite::GetText(int column)
2054{
2055 return (const char*)sqlite3_column_text(fTempStmt, column);
2056}
2057
2058time_t Sqlite::GetTime(int column)
2059{
2061}
2062
2063double Sqlite::GetDouble(int column)
2064{
2066}
2067
2068Sqlite::Sqlite() // ctor
2069{
2070 fIsConnected = false;
2071 fTempDB = NULL;
2072 fTempStmt = NULL;
2073 fDebug = 0;
2074}
2075
2076Sqlite::~Sqlite() // dtor
2077{
2078 Disconnect();
2079}
2080
2081const char* xsqlite3_errstr(sqlite3* db, int errcode)
2082{
2083 //return sqlite3_errstr(errcode);
2084 return sqlite3_errmsg(db);
2085}
2086
2087int Sqlite::ConnectTable(const char* table_name)
2088{
2089 std::string fname = fPath + "mh_" + table_name + ".sqlite3";
2090
2091 sqlite3* db = NULL;
2092
2093 int status = sqlite3_open(fname.c_str(), &db);
2094
2095 if (status != SQLITE_OK) {
2096 cm_msg(MERROR, "Sqlite::Connect", "Table %s: sqlite3_open(%s) error %d (%s)", table_name, fname.c_str(), status, xsqlite3_errstr(db, status));
2098 db = NULL;
2099 return DB_FILE_ERROR;
2100 }
2101
2102#if SQLITE_VERSION_NUMBER >= 3006020
2104 if (status != SQLITE_OK) {
2105 cm_msg(MERROR, "Sqlite::Connect", "Table %s: sqlite3_extended_result_codes(1) error %d (%s)", table_name, status, xsqlite3_errstr(db, status));
2106 }
2107#else
2108#warning Missing sqlite3_extended_result_codes()!
2109#endif
2110
2111 fMap[table_name] = db;
2112
2113 Exec(table_name, "PRAGMA journal_mode=persist;");
2114 Exec(table_name, "PRAGMA synchronous=normal;");
2115 //Exec(table_name, "PRAGMA synchronous=off;");
2116 Exec(table_name, "PRAGMA journal_size_limit=-1;");
2117
2118 if (0) {
2119 Exec(table_name, "PRAGMA legacy_file_format;");
2120 Exec(table_name, "PRAGMA synchronous;");
2121 Exec(table_name, "PRAGMA journal_mode;");
2122 Exec(table_name, "PRAGMA journal_size_limit;");
2123 }
2124
2125#ifdef SQLITE_LIMIT_COLUMN
2126 if (0) {
2128 printf("Sqlite::Connect: SQLITE_LIMIT_COLUMN=%d\n", max_columns);
2129 }
2130#endif
2131
2132 if (fDebug)
2133 cm_msg(MINFO, "Sqlite::Connect", "Table %s: connected to Sqlite file \'%s\'", table_name, fname.c_str());
2134
2135 return DB_SUCCESS;
2136}
2137
2138sqlite3* Sqlite::GetTable(const char* table_name)
2139{
2140 sqlite3* db = fMap[table_name];
2141
2142 if (db)
2143 return db;
2144
2145 int status = ConnectTable(table_name);
2146 if (status != DB_SUCCESS)
2147 return NULL;
2148
2149 return fMap[table_name];
2150}
2151
2152int Sqlite::Connect(const char* path)
2153{
2154 if (fIsConnected)
2155 Disconnect();
2156
2157 fPath = path;
2158
2159 // add trailing '/'
2160 if (fPath.length() > 0) {
2161 if (fPath[fPath.length()-1] != DIR_SEPARATOR)
2162 fPath += DIR_SEPARATOR_STR;
2163 }
2164
2165 if (fDebug)
2166 cm_msg(MINFO, "Sqlite::Connect", "Connected to Sqlite database in \'%s\'", fPath.c_str());
2167
2168 fIsConnected = true;
2169
2170 return DB_SUCCESS;
2171}
2172
2173int Sqlite::Disconnect()
2174{
2175 if (!fIsConnected)
2176 return DB_SUCCESS;
2177
2178 for (DbMap::iterator iter = fMap.begin(); iter != fMap.end(); ++iter) {
2179 const char* table_name = iter->first.c_str();
2180 sqlite3* db = iter->second;
2181 int status = sqlite3_close(db);
2182 if (status != SQLITE_OK) {
2183 cm_msg(MERROR, "Sqlite::Disconnect", "sqlite3_close(%s) error %d (%s)", table_name, status, xsqlite3_errstr(db, status));
2184 }
2185 }
2186
2187 fMap.clear();
2188
2189 fIsConnected = false;
2190
2191 return DB_SUCCESS;
2192}
2193
2194bool Sqlite::IsConnected()
2195{
2196 return fIsConnected;
2197}
2198
2199int Sqlite::OpenTransaction(const char* table_name)
2200{
2201 int status = Exec(table_name, "BEGIN TRANSACTION");
2202 return status;
2203}
2204
2205int Sqlite::CommitTransaction(const char* table_name)
2206{
2207 int status = Exec(table_name, "COMMIT TRANSACTION");
2208 return status;
2209}
2210
2211int Sqlite::RollbackTransaction(const char* table_name)
2212{
2213 int status = Exec(table_name, "ROLLBACK TRANSACTION");
2214 return status;
2215}
2216
2217int Sqlite::Prepare(const char* table_name, const char* sql)
2218{
2219 sqlite3* db = GetTable(table_name);
2220 if (!db)
2221 return DB_FILE_ERROR;
2222
2223 if (fDebug)
2224 printf("Sqlite::Prepare(%s, %s)\n", table_name, sql);
2225
2226 assert(fTempDB==NULL);
2227 fTempDB = db;
2228
2229#if SQLITE_VERSION_NUMBER >= 3006020
2231#else
2232#warning Missing sqlite3_prepare_v2()!
2234#endif
2235
2236 if (status == SQLITE_OK)
2237 return DB_SUCCESS;
2238
2239 std::string sqlstring = sql;
2240 cm_msg(MERROR, "Sqlite::Prepare", "Table %s: sqlite3_prepare_v2(%s...) error %d (%s)", table_name, sqlstring.substr(0,60).c_str(), status, xsqlite3_errstr(db, status));
2241
2242 fTempDB = NULL;
2243
2244 return DB_FILE_ERROR;
2245}
2246
2247int Sqlite::Step()
2248{
2249 if (0 && fDebug)
2250 printf("Sqlite::Step()\n");
2251
2252 assert(fTempDB);
2253 assert(fTempStmt);
2254
2256
2257 if (status == SQLITE_DONE)
2258 return DB_NO_MORE_SUBKEYS;
2259
2260 if (status == SQLITE_ROW)
2261 return DB_SUCCESS;
2262
2263 cm_msg(MERROR, "Sqlite::Step", "sqlite3_step() error %d (%s)", status, xsqlite3_errstr(fTempDB, status));
2264
2265 return DB_FILE_ERROR;
2266}
2267
2268int Sqlite::Finalize()
2269{
2270 if (0 && fDebug)
2271 printf("Sqlite::Finalize()\n");
2272
2273 assert(fTempDB);
2274 assert(fTempStmt);
2275
2277
2278 if (status != SQLITE_OK) {
2279 cm_msg(MERROR, "Sqlite::Finalize", "sqlite3_finalize() error %d (%s)", status, xsqlite3_errstr(fTempDB, status));
2280
2281 fTempDB = NULL;
2282 fTempStmt = NULL; // FIXME: maybe a memory leak?
2283 return DB_FILE_ERROR;
2284 }
2285
2286 fTempDB = NULL;
2287 fTempStmt = NULL;
2288
2289 return DB_SUCCESS;
2290}
2291
2292int Sqlite::ListTables(std::vector<std::string> *plist)
2293{
2294 if (!fIsConnected)
2295 return DB_FILE_ERROR;
2296
2297 if (fDebug)
2298 printf("Sqlite::ListTables at path [%s]\n", fPath.c_str());
2299
2300 int status;
2301
2302 const char* cmd = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;";
2303
2304 DIR *dir = opendir(fPath.c_str());
2305 if (!dir) {
2306 cm_msg(MERROR, "Sqlite::ListTables", "Cannot opendir(%s), errno %d (%s)", fPath.c_str(), errno, strerror(errno));
2307 return HS_FILE_ERROR;
2308 }
2309
2310 while (1) {
2311 const struct dirent* de = readdir(dir);
2312 if (!de)
2313 break;
2314
2315 const char* dn = de->d_name;
2316
2317 //if (dn[0]!='m' || dn[1]!='h')
2318 //continue;
2319
2320 const char* s;
2321
2322 s = strstr(dn, "mh_");
2323 if (!s || s!=dn)
2324 continue;
2325
2326 s = strstr(dn, ".sqlite3");
2327 if (!s || s[8]!=0)
2328 continue;
2329
2330 char table_name[256];
2331 mstrlcpy(table_name, dn+3, sizeof(table_name));
2332 // FIXME: skip names like "xxx.sqlite3~" and "xxx.sqlite3-deleted"
2333 char* ss = strstr(table_name, ".sqlite3");
2334 if (!ss)
2335 continue;
2336 *ss = 0;
2337
2338 //printf("dn [%s] tn [%s]\n", dn, table_name);
2339
2340 status = Prepare(table_name, cmd);
2341 if (status != DB_SUCCESS)
2342 continue;
2343
2344 while (1) {
2345 status = Step();
2346 if (status != DB_SUCCESS)
2347 break;
2348
2349 const char* tn = GetText(0);
2350 //printf("table [%s]\n", tn);
2351 plist->push_back(tn);
2352 }
2353
2354 status = Finalize();
2355 }
2356
2357 closedir(dir);
2358 dir = NULL;
2359
2360 return DB_SUCCESS;
2361}
2362
2363int Sqlite::ListColumns(const char* table, std::vector<std::string> *plist)
2364{
2365 if (!fIsConnected)
2366 return DB_FILE_ERROR;
2367
2368 if (fDebug)
2369 printf("Sqlite::ListColumns for table \'%s\'\n", table);
2370
2371 std::string cmd;
2372 cmd = "PRAGMA table_info(";
2373 cmd += table;
2374 cmd += ");";
2375
2376 int status;
2377
2378 status = Prepare(table, cmd.c_str());
2379 if (status != DB_SUCCESS)
2380 return status;
2381
2382 while (1) {
2383 status = Step();
2384 if (status != DB_SUCCESS)
2385 break;
2386
2387 const char* colname = GetText(1);
2388 const char* coltype = GetText(2);
2389 //printf("column [%s] [%s]\n", colname, coltype);
2390 plist->push_back(colname); // column name
2391 plist->push_back(coltype); // column type
2392 }
2393
2394 status = Finalize();
2395
2396 return DB_SUCCESS;
2397}
2398
2399static int callback_debug = 0;
2400
2401static int callback(void *NotUsed, int argc, char **argv, char **azColName){
2402 if (callback_debug) {
2403 printf("history_sqlite::callback---->\n");
2404 for (int i=0; i<argc; i++){
2405 printf("history_sqlite::callback[%d] %s = %s\n", i, azColName[i], argv[i] ? argv[i] : "NULL");
2406 }
2407 }
2408 return 0;
2409}
2410
2411int Sqlite::Exec(const char* table_name, const char* sql)
2412{
2413 // return values:
2414 // DB_SUCCESS
2415 // DB_FILE_ERROR: not connected
2416 // DB_KEY_EXIST: "table already exists"
2417
2418 if (!fIsConnected)
2419 return DB_FILE_ERROR;
2420
2421 sqlite3* db = GetTable(table_name);
2422 if (!db)
2423 return DB_FILE_ERROR;
2424
2425 if (fDebug)
2426 printf("Sqlite::Exec(%s, %s)\n", table_name, sql);
2427
2428 int status;
2429
2430 callback_debug = fDebug;
2431 char* errmsg = NULL;
2432
2434 if (status != SQLITE_OK) {
2435 if (status == SQLITE_ERROR && strstr(errmsg, "duplicate column name"))
2436 return DB_KEY_EXIST;
2437 if (status == SQLITE_ERROR && strstr(errmsg, "already exists"))
2438 return DB_KEY_EXIST;
2439 std::string sqlstring = sql;
2440 cm_msg(MERROR, "Sqlite::Exec", "Table %s: sqlite3_exec(%s...) error %d (%s)", table_name, sqlstring.substr(0,60).c_str(), status, errmsg);
2442 return DB_FILE_ERROR;
2443 }
2444
2445 return DB_SUCCESS;
2446}
2447
2448int Sqlite::ExecDisconnected(const char* table_name, const char* sql)
2449{
2450 cm_msg(MERROR, "Sqlite::Exec", "sqlite driver does not support disconnected operations");
2451 return DB_FILE_ERROR;
2452}
2453
2454#endif // HAVE_SQLITE
2455
2457// Methods of HsFileSchema //
2459
2460int HsFileSchema::write_event(const time_t t, const char* data, const size_t data_size)
2461{
2462 HsFileSchema* s = this;
2463
2464 assert(s->fVariables.size() == s->fOffsets.size());
2465
2466 if (s->fWriterFd < 0) {
2467 s->fWriterFd = open(s->fFileName.c_str(), O_RDWR);
2468 if (s->fWriterFd < 0) {
2469 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', open() errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2470 return HS_FILE_ERROR;
2471 }
2472
2473 //static_assert((sizeof(int)==3),"here!");
2474 //static_assert((sizeof(off64_t)==7),"here!");
2475 //static_assert((sizeof(size_t)==3),"here!");
2476 //static_assert((sizeof(intmax_t)==3),"here!");
2477
2478 //off64_t zzz = 1;
2479 //off64_t xxx = zzz<<60;
2480 //int yyy = xxx;
2481 //printf("%d", yyy);
2482
2483 off64_t file_size = ::lseek64(s->fWriterFd, 0, SEEK_END);
2484
2485 if (file_size < 0) {
2486 cm_msg(MERROR, "FileHistory::write_event", "Cannot read file size of \'%s\', lseek64(SEEK_END) errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2487 return HS_FILE_ERROR;
2488 }
2489
2490 off64_t nrec = 0;
2491
2492 if (file_size >= s->fDataOffset) {
2493 nrec = (file_size - s->fDataOffset)/s->fRecordSize;
2494 }
2495
2496 if (nrec < 0)
2497 nrec = 0;
2498
2500
2501 //printf("write_event: file_size %jd, nrec %jd, data_end %jd\n", (intmax_t)file_size, (intmax_t)nrec, (intmax_t)data_end);
2502
2503 if (data_end != file_size) {
2504 if (nrec > 0)
2505 cm_msg(MERROR, "FileHistory::write_event", "File \'%s\' may be truncated, data offset %jd, record size %zu, file size: %jd, should be %jd, making it so", s->fFileName.c_str(), (intmax_t)s->fDataOffset, s->fRecordSize, (intmax_t)file_size, (intmax_t)data_end);
2506
2508
2509 if (status64 < 0) {
2510 cm_msg(MERROR, "FileHistory::write_event", "Cannot seek \'%s\' to offset %jd, lseek64() errno %d (%s)", s->fFileName.c_str(), (intmax_t)data_end, errno, strerror(errno));
2511 return HS_FILE_ERROR;
2512 }
2513
2515
2516 if (status < 0) {
2517 cm_msg(MERROR, "FileHistory::write_event", "Cannot truncate \'%s\' to size %jd, ftruncate64() errno %d (%s)", s->fFileName.c_str(), (intmax_t)data_end, errno, strerror(errno));
2518 return HS_FILE_ERROR;
2519 }
2520 }
2521
2523 s->fFileSize = data_end;
2524 }
2525
2526 size_t expected_size = s->fRecordSize - 4;
2527
2528 // sanity check: record_size and n_bytes are computed from the byte counts in the file header
2529 assert(expected_size == s->fNumBytes);
2530
2531 if (s->fLastSize == 0)
2533
2534 if (data_size != s->fLastSize) {
2535 cm_msg(MERROR, "FileHistory::write_event", "Event \'%s\' data size mismatch, expected %zu bytes, got %zu bytes, previously %zu bytes", s->fEventName.c_str(), expected_size, data_size, s->fLastSize);
2536 //printf("schema:\n");
2537 //s->print();
2538
2539 if (data_size < expected_size)
2540 return HS_FILE_ERROR;
2541
2542 // truncate for now
2543 // data_size = expected_size;
2544 s->fLastSize = data_size;
2545 }
2546
2547 size_t size = 4 + expected_size;
2548
2549 if (size != s->fRecordBufferSize) {
2550 s->fRecordBuffer = (char*)realloc(s->fRecordBuffer, size);
2551 assert(s->fRecordBuffer != NULL);
2552 s->fRecordBufferSize = size;
2553 }
2554
2555 memcpy(s->fRecordBuffer, &t, 4);
2557
2558 ssize_t wr = write(s->fWriterFd, s->fRecordBuffer, size);
2559 if ((size_t)wr != size) {
2560 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', write(%zu) returned %zd, errno %d (%s)", s->fFileName.c_str(), size, wr, errno, strerror(errno));
2561 return HS_FILE_ERROR;
2562 }
2563
2564 s->fFileSize += size;
2565
2566#if 0
2567 status = write(s->fWriterFd, &t, 4);
2568 if (status != 4) {
2569 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', write(timestamp) errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2570 return HS_FILE_ERROR;
2571 }
2572
2574 if (status != expected_size) {
2575 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', write(%d) errno %d (%s)", s->fFileName.c_str(), data_size, errno, strerror(errno));
2576 return HS_FILE_ERROR;
2577 }
2578#endif
2579
2580 return HS_SUCCESS;
2581}
2582
2584{
2585 if (fWriterFd >= 0) {
2587 fWriterFd = -1;
2588 }
2589 return HS_SUCCESS;
2590}
2591
2592static int ReadRecord(const char* file_name, int fd, off64_t offset, size_t recsize, off64_t irec, char* rec)
2593{
2595
2597
2598 if (status64 < 0) {
2599 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', lseek64(%jd) errno %d (%s)", file_name, (intmax_t)fpos, errno, strerror(errno));
2600 return -1;
2601 }
2602
2603 ssize_t rd = ::read(fd, rec, recsize);
2604
2605 if (rd < 0) {
2606 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', read() errno %d (%s)", file_name, errno, strerror(errno));
2607 return -1;
2608 }
2609
2610 if (rd == 0) {
2611 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', unexpected end of file on read()", file_name);
2612 return -1;
2613 }
2614
2615 if ((size_t)rd != recsize) {
2616 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', short read() returned %zd instead of %zu bytes", file_name, rd, recsize);
2617 return -1;
2618 }
2619
2620 return HS_SUCCESS;
2621}
2622
2623static int FindTime(const char* file_name, int fd, off64_t offset, size_t recsize, off64_t nrec, time_t timestamp, off64_t* i1p, time_t* t1p, off64_t* i2p, time_t* t2p, time_t* tstart, time_t* tend, int debug)
2624{
2625 //
2626 // purpose: find location time timestamp inside given file.
2627 // uses binary search
2628 // returns:
2629 // tstart, tend - time of first and last data in a file
2630 // i1p,t1p - data just before timestamp, used as "last_written"
2631 // i2p,t2p - data at timestamp or after timestamp, used as starting point to read data from file
2632 // assertions:
2633 // tstart <= t1p < t2p <= tend
2634 // i1p+1==i2p
2635 // t1p < timestamp <= t2p
2636 //
2637 // special cases:
2638 // 1) timestamp <= tstart - all data is in the future, return i1p==-1, t1p==-1, i2p==0, t2p==tstart
2639 // 2) tend < timestamp - all the data is in the past, return i1p = nrec-1, t1p = tend, i2p = nrec, t2p = 0;
2640 // 3) nrec == 1 only one record in this file and it is older than the timestamp (tstart == tend < timestamp)
2641 //
2642
2643 int status;
2644 char* buf = new char[recsize];
2645
2646 assert(nrec > 0);
2647
2648 off64_t rec1 = 0;
2649 off64_t rec2 = nrec-1;
2650
2652 if (status != HS_SUCCESS) {
2653 delete[] buf;
2654 return HS_FILE_ERROR;
2655 }
2656
2657 time_t t1 = *(DWORD*)buf;
2658
2659 *tstart = t1;
2660
2661 // timestamp is older than any data in this file
2662 if (timestamp <= t1) {
2663 *i1p = -1;
2664 *t1p = 0;
2665 *i2p = 0;
2666 *t2p = t1;
2667 *tend = 0;
2668 delete[] buf;
2669 return HS_SUCCESS;
2670 }
2671
2672 assert(t1 < timestamp);
2673
2674 if (nrec == 1) {
2675 *i1p = 0;
2676 *t1p = t1;
2677 *i2p = nrec; // == 1
2678 *t2p = 0;
2679 *tend = t1;
2680 delete[] buf;
2681 return HS_SUCCESS;
2682 }
2683
2685 if (status != HS_SUCCESS) {
2686 delete[] buf;
2687 return HS_FILE_ERROR;
2688 }
2689
2690 time_t t2 = *(DWORD*)buf;
2691
2692 *tend = t2;
2693
2694 // all the data is in the past
2695 if (t2 < timestamp) {
2696 *i1p = rec2;
2697 *t1p = t2;
2698 *i2p = nrec;
2699 *t2p = 0;
2700 delete[] buf;
2701 return HS_SUCCESS;
2702 }
2703
2704 assert(t1 < timestamp);
2705 assert(timestamp <= t2);
2706
2707 if (debug)
2708 printf("FindTime: rec %jd..(x)..%jd, time %s..(%s)..%s\n", (intmax_t)rec1, (intmax_t)rec2, TimeToString(t1).c_str(), TimeToString(timestamp).c_str(), TimeToString(t2).c_str());
2709
2710 // implement binary search
2711
2712 do {
2713 off64_t rec = (rec1+rec2)/2;
2714
2715 assert(rec >= 0);
2716 assert(rec < nrec);
2717
2719 if (status != HS_SUCCESS) {
2720 delete[] buf;
2721 return HS_FILE_ERROR;
2722 }
2723
2724 time_t t = *(DWORD*)buf;
2725
2726 if (timestamp <= t) {
2727 if (debug)
2728 printf("FindTime: rec %jd..(x)..%jd..%jd, time %s..(%s)..%s..%s\n", (intmax_t)rec1, (intmax_t)rec, (intmax_t)rec2, TimeToString(t1).c_str(), TimeToString(timestamp).c_str(), TimeToString(t).c_str(), TimeToString(t2).c_str());
2729
2730 rec2 = rec;
2731 t2 = t;
2732 } else {
2733 if (debug)
2734 printf("FindTime: rec %jd..%jd..(x)..%jd, time %s..%s..(%s)..%s\n", (intmax_t)rec1, (intmax_t)rec, (intmax_t)rec2, TimeToString(t1).c_str(), TimeToString(t).c_str(), TimeToString(timestamp).c_str(), TimeToString(t2).c_str());
2735
2736 rec1 = rec;
2737 t1 = t;
2738 }
2739 } while (rec2 - rec1 > 1);
2740
2741 assert(rec1+1 == rec2);
2742 assert(t1 < timestamp);
2743 assert(timestamp <= t2);
2744
2745 if (debug)
2746 printf("FindTime: rec %jd..(x)..%jd, time %s..(%s)..%s, this is the result.\n", (intmax_t)rec1, (intmax_t)rec2, TimeToString(t1).c_str(), TimeToString(timestamp).c_str(), TimeToString(t2).c_str());
2747
2748 *i1p = rec1;
2749 *t1p = t1;
2750
2751 *i2p = rec2;
2752 *t2p = t2;
2753
2754 delete[] buf;
2755 return HS_SUCCESS;
2756}
2757
2759 const int debug,
2760 time_t* last_written)
2761{
2762 int status;
2763 HsFileSchema* s = this;
2764
2765 if (debug)
2766 printf("FileHistory::read_last_written: file %s, schema time %s..%s, timestamp %s\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(timestamp).c_str());
2767
2768 int fd = open(s->fFileName.c_str(), O_RDONLY);
2769 if (fd < 0) {
2770 cm_msg(MERROR, "FileHistory::read_last_written", "Cannot read \'%s\', open() errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2771 return HS_FILE_ERROR;
2772 }
2773
2774 off64_t file_size = ::lseek64(fd, 0, SEEK_END);
2775
2776 if (file_size < 0) {
2777 cm_msg(MERROR, "FileHistory::read_last_written", "Cannot read file size of \'%s\', lseek64(SEEK_END) errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2778 return HS_FILE_ERROR;
2779 }
2780
2782 // empty file
2783 ::close(fd);
2784 if (last_written)
2785 *last_written = 0;
2786 return HS_SUCCESS;
2787 }
2788
2789 off64_t nrec = (file_size - s->fDataOffset)/s->fRecordSize;
2790
2791 //printf("read_last_written: nrec %jd, file_size %jd, offset %jd, record size %zu\n", (intmax_t)nrec, (intmax_t)file_size, s->fDataOffset, s->fRecordSize);
2792
2793 if (nrec < 0)
2794 nrec = 0;
2795
2796 if (nrec < 1) {
2797 ::close(fd);
2798 if (last_written)
2799 *last_written = 0;
2800 return HS_SUCCESS;
2801 }
2802
2803 time_t lw = 0;
2804
2805 // read last record to check if desired time is inside or outside of the file
2806
2807 if (1) {
2808 char* buf = new char[s->fRecordSize];
2809
2810 status = ReadRecord(s->fFileName.c_str(), fd, s->fDataOffset, s->fRecordSize, nrec - 1, buf);
2811 if (status != HS_SUCCESS) {
2812 delete[] buf;
2813 ::close(fd);
2814 return HS_FILE_ERROR;
2815 }
2816
2817 lw = *(DWORD*)buf;
2818
2819 delete[] buf;
2820 }
2821
2822 if (lw >= timestamp) {
2823 off64_t irec = 0;
2824 time_t trec = 0;
2825 off64_t iunused = 0;
2826 time_t tunused = 0;
2827 time_t tstart = 0; // not used
2828 time_t tend = 0; // not used
2829
2830 status = FindTime(s->fFileName.c_str(), fd, s->fDataOffset, s->fRecordSize, nrec, timestamp, &irec, &trec, &iunused, &tunused, &tstart, &tend, 0*debug);
2831 if (status != HS_SUCCESS) {
2832 ::close(fd);
2833 return HS_FILE_ERROR;
2834 }
2835
2836 assert(trec < timestamp);
2837
2838 lw = trec;
2839 }
2840
2841 if (last_written)
2842 *last_written = lw;
2843
2844 if (debug)
2845 printf("FileHistory::read_last_written: file %s, schema time %s..%s, timestamp %s, last_written %s\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(timestamp).c_str(), TimeToString(lw).c_str());
2846
2847 assert(lw < timestamp);
2848
2849 ::close(fd);
2850
2851 return HS_SUCCESS;
2852}
2853
2854int HsFileSchema::read_data(const time_t start_time,
2855 const time_t end_time,
2856 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
2857 const int debug,
2858 std::vector<time_t>& last_time,
2860{
2861 HsFileSchema* s = this;
2862
2863 if (debug)
2864 printf("FileHistory::read_data: file %s, schema time %s..%s, read time %s..%s, %d vars\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), num_var);
2865
2866 //if (1) {
2867 // printf("Last time: ");
2868 // for (int i=0; i<num_var; i++) {
2869 // printf(" %s", TimeToString(last_time[i]).c_str());
2870 // }
2871 // printf("\n");
2872 //}
2873
2874 if (debug) {
2875 printf("FileHistory::read_data: file %s map", s->fFileName.c_str());
2876 for (size_t i=0; i<var_schema_index.size(); i++) {
2877 printf(" %2d", var_schema_index[i]);
2878 }
2879 printf("\n");
2880 }
2881
2882 int fd = ::open(s->fFileName.c_str(), O_RDONLY);
2883 if (fd < 0) {
2884 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', open() errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2885 return HS_FILE_ERROR;
2886 }
2887
2888 off64_t file_size = ::lseek64(fd, 0, SEEK_END);
2889
2890 if (file_size < 0) {
2891 cm_msg(MERROR, "FileHistory::read_data", "Cannot read file size of \'%s\', lseek64(SEEK_END) errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
2892 ::close(fd);
2893 return HS_FILE_ERROR;
2894 }
2895
2897 // empty file
2898 ::close(fd);
2899 return HS_SUCCESS;
2900 }
2901
2902 off64_t nrec = (file_size - s->fDataOffset)/s->fRecordSize;
2903
2904 //printf("read_data: nrec %jd, file_size %jd, offset %jd, record size %zu\n", (intmax_t)nrec, (intmax_t)file_size, s->fDataOffset, s->fRecordSize);
2905
2906 if (nrec < 0)
2907 nrec = 0;
2908
2909 if (nrec < 1) {
2910 ::close(fd);
2911 return HS_SUCCESS;
2912 }
2913
2914 off64_t iunused = 0;
2915 time_t tunused = 0;
2916 off64_t irec = 0;
2917 time_t trec = 0;
2918 time_t tstart = 0;
2919 time_t tend = 0;
2920
2921 int istatus = FindTime(s->fFileName.c_str(), fd, s->fDataOffset, s->fRecordSize, nrec, start_time, &iunused, &tunused, &irec, &trec, &tstart, &tend, 0*debug);
2922
2923 if (istatus != HS_SUCCESS) {
2924 ::close(fd);
2925 return HS_FILE_ERROR;
2926 }
2927
2928 if (debug) {
2929 printf("FindTime %d, nrec %jd, (%jd, %s) (%jd, %s), tstart %s, tend %s, want %s\n", istatus, (intmax_t)nrec, (intmax_t)iunused, TimeToString(tunused).c_str(), (intmax_t)irec, TimeToString(trec).c_str(), TimeToString(tstart).c_str(), TimeToString(tend).c_str(), TimeToString(start_time).c_str());
2930 }
2931
2932 if (irec < 0 || irec >= nrec) {
2933 // all data in this file is older than start_time
2934
2935 ::close(fd);
2936
2937 if (debug)
2938 printf("FileHistory::read: file %s, schema time %s..%s, read time %s..%s, file time %s..%s, data in this file is too old\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), TimeToString(tstart).c_str(), TimeToString(tend).c_str());
2939
2940 return HS_SUCCESS;
2941 }
2942
2944 // data starts before time declared in schema
2945
2946 ::close(fd);
2947
2948 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': timestamp of first data %s is before schema start time %s", s->fFileName.c_str(), TimeToString(tstart).c_str(), TimeToString(s->fTimeFrom).c_str());
2949
2950 return HS_FILE_ERROR;
2951 }
2952
2953 if (tend && s->fTimeTo && tend > s->fTimeTo) {
2954 // data ends after time declared in schema (overlaps with next file)
2955
2956 ::close(fd);
2957
2958 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': timestamp of last data %s is after schema end time %s", s->fFileName.c_str(), TimeToString(tend).c_str(), TimeToString(s->fTimeTo).c_str());
2959
2960 return HS_FILE_ERROR;
2961 }
2962
2963 for (int i=0; i<num_var; i++) {
2964 int si = var_schema_index[i];
2965 if (si < 0)
2966 continue;
2967
2968 if (trec < last_time[i]) { // protect against duplicate and non-monotonous data
2969 ::close(fd);
2970
2971 cm_msg(MERROR, "FileHistory::read_data", "Internal history error at file \'%s\': variable %d data timestamp %s is before last timestamp %s", s->fFileName.c_str(), i, TimeToString(trec).c_str(), TimeToString(last_time[i]).c_str());
2972
2973 return HS_FILE_ERROR;
2974 }
2975 }
2976
2977 int count = 0;
2978
2980
2982
2983 //printf("read_data: lseek64() returned %jd, irec %jd, offset %jd, record size %zu, fpos %jd\n", xpos, irec, s->fDataOffset, s->fRecordSize, fpos);
2984
2985 if (xpos < 0) {
2986 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', lseek64(%jd) errno %d (%s)", s->fFileName.c_str(), (intmax_t)fpos, errno, strerror(errno));
2987 ::close(fd);
2988 return HS_FILE_ERROR;
2989 }
2990
2991 char* buf = new char[s->fRecordSize];
2992
2993 off64_t prec = irec;
2994
2995 while (1) {
2996 ssize_t rd = ::read(fd, buf, s->fRecordSize);
2997
2998 if (rd < 0) {
2999 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', read() errno %d (%s)", s->fFileName.c_str(), errno, strerror(errno));
3000 break;
3001 }
3002
3003 if (rd == 0) {
3004 // EOF
3005 break;
3006 }
3007
3008 if ((size_t)rd != s->fRecordSize) {
3009 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', short read() returned %zd instead of %zu bytes", s->fFileName.c_str(), rd, s->fRecordSize);
3010 break;
3011 }
3012
3013 prec++;
3014
3015 bool past_end_of_last_file = (s->fTimeTo == 0) && (prec > nrec);
3016
3017 time_t t = *(DWORD*)buf;
3018
3019 if (debug > 1)
3020 printf("FileHistory::read: file %s, schema time %s..%s, read time %s..%s, row time %s\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), TimeToString(t).c_str());
3021
3022 if (t < trec) {
3023 delete[] buf;
3024 ::close(fd);
3025 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': record %jd timestamp %s is before start time %s", s->fFileName.c_str(), (intmax_t)(irec + count), TimeToString(t).c_str(), TimeToString(trec).c_str());
3026 return HS_FILE_ERROR;
3027 }
3028
3029 if (tend && (t > tend) && !past_end_of_last_file) {
3030 delete[] buf;
3031 ::close(fd);
3032 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': record %jd timestamp %s is after last timestamp %s", s->fFileName.c_str(), (intmax_t)(irec + count), TimeToString(t).c_str(), TimeToString(tend).c_str());
3033 return HS_FILE_ERROR;
3034 }
3035
3036 if (t > end_time)
3037 break;
3038
3039 char* data = buf + 4;
3040
3041 for (int i=0; i<num_var; i++) {
3042 int si = var_schema_index[i];
3043 if (si < 0)
3044 continue;
3045
3046 if (t < last_time[i]) { // protect against duplicate and non-monotonous data
3047 delete[] buf;
3048 ::close(fd);
3049
3050 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': record %jd timestamp %s is before timestamp %s of variable %d", s->fFileName.c_str(), (intmax_t)(irec + count), TimeToString(t).c_str(), TimeToString(last_time[i]).c_str(), i);
3051
3052 return HS_FILE_ERROR;
3053 }
3054
3055 double v = 0;
3056 void* ptr = data + s->fOffsets[si];
3057
3058 int ii = var_index[i];
3059 assert(ii >= 0);
3060 assert(ii < s->fVariables[si].n_data);
3061
3062 switch (s->fVariables[si].type) {
3063 default:
3064 // unknown data type
3065 v = 0;
3066 break;
3067 case TID_BYTE:
3068 v = ((unsigned char*)ptr)[ii];
3069 break;
3070 case TID_SBYTE:
3071 v = ((signed char *)ptr)[ii];
3072 break;
3073 case TID_CHAR:
3074 v = ((char*)ptr)[ii];
3075 break;
3076 case TID_WORD:
3077 v = ((unsigned short *)ptr)[ii];
3078 break;
3079 case TID_SHORT:
3080 v = ((signed short *)ptr)[ii];
3081 break;
3082 case TID_DWORD:
3083 v = ((unsigned int *)ptr)[ii];
3084 break;
3085 case TID_INT:
3086 v = ((int *)ptr)[ii];
3087 break;
3088 case TID_BOOL:
3089 v = ((unsigned int *)ptr)[ii];
3090 break;
3091 case TID_FLOAT:
3092 v = ((float*)ptr)[ii];
3093 break;
3094 case TID_DOUBLE:
3095 v = ((double*)ptr)[ii];
3096 break;
3097 }
3098
3099 buffer[i]->Add(t, v);
3100 last_time[i] = t;
3101 }
3102 count++;
3103 }
3104
3105 delete[] buf;
3106
3107 ::close(fd);
3108
3109 if (debug) {
3110 printf("FileHistory::read_data: file %s map", s->fFileName.c_str());
3111 for (size_t i=0; i<var_schema_index.size(); i++) {
3112 printf(" %2d", var_schema_index[i]);
3113 }
3114 printf(" read %d rows\n", count);
3115 }
3116
3117 if (debug)
3118 printf("FileHistory::read: file %s, schema time %s..%s, read time %s..%s, %d vars, read %d rows\n", s->fFileName.c_str(), TimeToString(s->fTimeFrom).c_str(), TimeToString(s->fTimeTo).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), num_var, count);
3119
3120 return HS_SUCCESS;
3121}
3122
3124// Implementation of the MidasHistoryInterface //
3126
3128{
3129protected:
3130 int fDebug = 0;
3131 std::string fConnectString;
3132
3133 // writer data
3135 std::vector<HsSchema*> fWriterEvents;
3136
3137 // reader data
3139
3140public:
3142 {
3143 // empty
3144 }
3145
3147 {
3148 for (size_t i=0; i<fWriterEvents.size(); i++) {
3149 //printf("fWriterEvents[%zu] is %p\n", i, fWriterEvents[i]);
3150 if (fWriterEvents[i]) {
3151 delete fWriterEvents[i];
3152 fWriterEvents[i] = NULL;
3153 }
3154 }
3155 fWriterEvents.clear();
3156 }
3157
3158 virtual int hs_set_debug(int debug)
3159 {
3160 int old = fDebug;
3161 fDebug = debug;
3162 return old;
3163 }
3164
3165 virtual int hs_connect(const char* connect_string) = 0;
3166 virtual int hs_disconnect() = 0;
3167
3168protected:
3170 // Schema functions //
3172
3173 // load existing schema
3174 virtual int read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp) = 0; // event_name: =NULL means read only event names, =event_name means load tag names for all matching events; timestamp: =0 means read all schema all the way to the beginning of time, =time means read schema in effect at this time and all newer schema
3175
3176 // return a new copy of a schema for writing into history. If schema for this event does not exist, it will be created
3177 virtual HsSchema* new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]) = 0;
3178
3179 // if output file is too big, close it, create a new output file
3180 virtual HsSchema* maybe_reopen(const char* event_name, time_t timestamp, HsSchema* s) = 0;
3181
3182public:
3184 // Functions used by mlogger //
3186
3187 int hs_define_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]);
3188 int hs_write_event(const char* event_name, time_t timestamp, int buffer_size, const char* buffer);
3189 int hs_flush_buffers();
3190
3192 // Functions used by mhttpd //
3194
3195 int hs_clear_cache();
3196 int hs_get_events(time_t t, std::vector<std::string> *pevents);
3197 int hs_get_tags(const char* event_name, time_t t, std::vector<TAG> *ptags);
3198 int hs_get_last_written(time_t timestamp, int num_var, const char* const event_name[], const char* const var_name[], const int var_index[], time_t last_written[]);
3199 int hs_read_buffer(time_t start_time, time_t end_time,
3200 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3202 int hs_status[]);
3203
3204 /*------------------------------------------------------------------*/
3205
3207 {
3208 public:
3212
3214
3218 double **fDataBuffer;
3219
3221
3223 {
3224 fNumAdded = 0;
3225
3229
3230 fNumAlloc = 0;
3231 fNumEntries = NULL;
3232 fTimeBuffer = NULL;
3233 fDataBuffer = NULL;
3234
3235 fPrevTime = 0;
3236 }
3237
3238 ~ReadBuffer() // dtor
3239 {
3240 }
3241
3243 {
3244 if (wantalloc < fNumAlloc - 10)
3245 return;
3246
3247 int newalloc = fNumAlloc*2;
3248
3249 if (newalloc <= 1000)
3250 newalloc = wantalloc + 1000;
3251
3252 //printf("wantalloc %d, fNumEntries %d, fNumAlloc %d, newalloc %d\n", wantalloc, *fNumEntries, fNumAlloc, newalloc);
3253
3255 assert(*fTimeBuffer);
3256
3257 *fDataBuffer = (double*)realloc(*fDataBuffer, sizeof(double)*newalloc);
3258 assert(*fDataBuffer);
3259
3261 }
3262
3263 void Add(time_t t, double v)
3264 {
3265 if (t < fFirstTime)
3266 return;
3267 if (t > fLastTime)
3268 return;
3269
3270 fNumAdded++;
3271
3272 if ((fPrevTime==0) || (t >= fPrevTime + fInterval)) {
3273 int pos = *fNumEntries;
3274
3275 Realloc(pos + 1);
3276
3277 (*fTimeBuffer)[pos] = t;
3278 (*fDataBuffer)[pos] = v;
3279
3280 (*fNumEntries) = pos + 1;
3281
3282 fPrevTime = t;
3283 }
3284 }
3285
3286 void Finish()
3287 {
3288
3289 }
3290 };
3291
3292 /*------------------------------------------------------------------*/
3293
3294 int hs_read(time_t start_time, time_t end_time, time_t interval,
3295 int num_var,
3296 const char* const event_name[], const char* const var_name[], const int var_index[],
3297 int num_entries[],
3298 time_t* time_buffer[], double* data_buffer[],
3299 int st[]);
3300 /*------------------------------------------------------------------*/
3301
3302
3303 int hs_read_binned(time_t start_time, time_t end_time, int num_bins,
3304 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3305 int num_entries[],
3306 int* count_bins[], double* mean_bins[], double* rms_bins[], double* min_bins[], double* max_bins[],
3309 time_t last_time[], double last_value[],
3310 int st[]);
3311};
3312
3314{
3315 fNumEntries = 0;
3316
3320
3321 fSum0 = new double[num_bins];
3322 fSum1 = new double[num_bins];
3323 fSum2 = new double[num_bins];
3324
3325 for (int i=0; i<num_bins; i++) {
3326 fSum0[i] = 0;
3327 fSum1[i] = 0;
3328 fSum2[i] = 0;
3329 }
3330}
3331
3333{
3334 delete fSum0; fSum0 = NULL;
3335 delete fSum1; fSum1 = NULL;
3336 delete fSum2; fSum2 = NULL;
3337 // poison the pointers
3338 fCount = NULL;
3339 fMean = NULL;
3340 fRms = NULL;
3341 fMin = NULL;
3342 fMax = NULL;
3349}
3350
3352{
3353 for (int ibin = 0; ibin < fNumBins; ibin++) {
3354 if (fMin)
3355 fMin[ibin] = 0;
3356 if (fMax)
3357 fMax[ibin] = 0;
3358 if (fBinsFirstTime)
3359 fBinsFirstTime[ibin] = 0;
3360 if (fBinsFirstValue)
3361 fBinsFirstValue[ibin] = 0;
3362 if (fBinsLastTime)
3363 fBinsLastTime[ibin] = 0;
3364 if (fBinsLastValue)
3365 fBinsLastValue[ibin] = 0;
3366 }
3367 if (fLastTimePtr)
3368 *fLastTimePtr = 0;
3369 if (fLastValuePtr)
3370 *fLastValuePtr = 0;
3371}
3372
3374{
3375 if (t < fFirstTime)
3376 return;
3377 if (t > fLastTime)
3378 return;
3379
3380 fNumEntries++;
3381
3382 double a = (double)(t - fFirstTime);
3383 double b = (double)(fLastTime - fFirstTime);
3384 double fbin = fNumBins*a/b;
3385
3386 int ibin = (int)fbin;
3387
3388 if (ibin < 0)
3389 ibin = 0;
3390 else if (ibin >= fNumBins)
3391 ibin = fNumBins-1;
3392
3393 if (fSum0[ibin] == 0) {
3394 if (fMin)
3395 fMin[ibin] = v;
3396 if (fMax)
3397 fMax[ibin] = v;
3398 if (fBinsFirstTime)
3399 fBinsFirstTime[ibin] = t;
3400 if (fBinsFirstValue)
3401 fBinsFirstValue[ibin] = v;
3402 if (fBinsLastTime)
3403 fBinsLastTime[ibin] = t;
3404 if (fBinsLastValue)
3405 fBinsLastValue[ibin] = v;
3406 if (fLastTimePtr)
3407 *fLastTimePtr = t;
3408 if (fLastValuePtr)
3409 *fLastValuePtr = v;
3410 }
3411
3412 fSum0[ibin] += 1.0;
3413 fSum1[ibin] += v;
3414 fSum2[ibin] += v*v;
3415
3416 if (fMin)
3417 if (v < fMin[ibin])
3418 fMin[ibin] = v;
3419
3420 if (fMax)
3421 if (v > fMax[ibin])
3422 fMax[ibin] = v;
3423
3424 // NOTE: this assumes t and v are sorted by time.
3425 if (fBinsLastTime)
3426 fBinsLastTime[ibin] = t;
3427 if (fBinsLastValue)
3428 fBinsLastValue[ibin] = v;
3429
3430 if (fLastTimePtr)
3431 if (t > *fLastTimePtr) {
3432 *fLastTimePtr = t;
3433 if (fLastValuePtr)
3434 *fLastValuePtr = v;
3435 }
3436}
3437
3439{
3440 for (int i=0; i<fNumBins; i++) {
3441 double num = fSum0[i];
3442 double mean = 0;
3443 double variance = 0;
3444 if (num > 0) {
3445 mean = fSum1[i]/num;
3447 }
3448 double rms = 0;
3449 if (variance > 0)
3450 rms = sqrt(variance);
3451
3452 if (fCount)
3453 fCount[i] = (int)num;
3454
3455 if (fMean)
3456 fMean[i] = mean;
3457
3458 if (fRms)
3459 fRms[i] = rms;
3460
3461 if (num == 0) {
3462 if (fMin)
3463 fMin[i] = 0;
3464 if (fMax)
3465 fMax[i] = 0;
3466 }
3467 }
3468}
3469
3470int SchemaHistoryBase::hs_define_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[])
3471{
3472 if (fDebug) {
3473 printf("hs_define_event: event name [%s] with %d tags\n", event_name, ntags);
3474 if (fDebug > 1)
3475 PrintTags(ntags, tags);
3476 }
3477
3478 // delete all events with the same name
3479 for (size_t i=0; i<fWriterEvents.size(); i++)
3480 if (fWriterEvents[i])
3481 if (event_name_cmp(fWriterEvents[i]->fEventName, event_name)==0) {
3482 if (fDebug)
3483 printf("deleting exising event %s\n", event_name);
3484 fWriterEvents[i]->close();
3485 delete fWriterEvents[i];
3486 fWriterEvents[i] = NULL;
3487 }
3488
3489 // check for wrong types etc
3490 for (int i=0; i<ntags; i++) {
3491 if (strlen(tags[i].name) < 1) {
3492 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' has empty name at index %d", event_name, i);
3493 return HS_FILE_ERROR;
3494 }
3495 if (tags[i].name[0] == ' ') {
3496 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' has name \'%s\' starting with a blank", event_name, tags[i].name);
3497 return HS_FILE_ERROR;
3498 }
3499 if (tags[i].type <= 0 || tags[i].type >= TID_LAST) {
3500 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' tag \'%s\' at index %d has invalid type %d",
3501 event_name, tags[i].name, i, tags[i].type);
3502 return HS_FILE_ERROR;
3503 }
3504 if (tags[i].type == TID_STRING) {
3505 cm_msg(MERROR, "hs_define_event",
3506 "Error: History event \'%s\' tag \'%s\' at index %d has forbidden type TID_STRING", event_name,
3507 tags[i].name, i);
3508 return HS_FILE_ERROR;
3509 }
3510 if (tags[i].n_data <= 0) {
3511 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' tag \'%s\' at index %d has invalid n_data %d",
3512 event_name, tags[i].name, i, tags[i].n_data);
3513 return HS_FILE_ERROR;
3514 }
3515 }
3516
3517 // check for duplicate names. Done by sorting, since this takes only O(N*log*N))
3518 std::vector<std::string> names;
3519 for (int i=0; i<ntags; i++) {
3520 std::string str(tags[i].name);
3521 std::transform(str.begin(), str.end(), str.begin(), ::toupper);
3522 names.push_back(str);
3523 }
3524 std::sort(names.begin(), names.end());
3525 for (int i=0; i<ntags-1; i++) {
3526 if (names[i] == names[i + 1]) {
3527 cm_msg(MERROR, "hs_define_event",
3528 "Error: History event \'%s\' has duplicate tag name \'%s\'", event_name,
3529 names[i].c_str());
3530 return HS_FILE_ERROR;
3531 }
3532 }
3533
3534 HsSchema* s = new_event(event_name, timestamp, ntags, tags);
3535 if (!s)
3536 return HS_FILE_ERROR;
3537
3538 s->fDisabled = false;
3539
3541
3542 // find empty slot in events list
3543 for (size_t i=0; i<fWriterEvents.size(); i++)
3544 if (!fWriterEvents[i]) {
3545 fWriterEvents[i] = s;
3546 s = NULL;
3547 break;
3548 }
3549
3550 // if no empty slots, add at the end
3551 if (s)
3552 fWriterEvents.push_back(s);
3553
3554 return HS_SUCCESS;
3555}
3556
3557int SchemaHistoryBase::hs_write_event(const char* event_name, time_t timestamp, int xbuffer_size, const char* buffer)
3558{
3559 if (fDebug)
3560 printf("hs_write_event: write event \'%s\', time %d, size %d\n", event_name, (int)timestamp, xbuffer_size);
3561
3562 assert(xbuffer_size > 0);
3563
3564 size_t buffer_size = xbuffer_size;
3565
3566 HsSchema *s = NULL;
3567
3568 // find this event
3569 for (size_t i=0; i<fWriterEvents.size(); i++)
3570 if (fWriterEvents[i] && (event_name_cmp(fWriterEvents[i]->fEventName, event_name)==0)) {
3571 s = fWriterEvents[i];
3572 break;
3573 }
3574
3575 // not found
3576 if (!s)
3577 return HS_UNDEFINED_EVENT;
3578
3579 // deactivated because of error?
3580 if (s->fDisabled) {
3581 //printf("HERE!\n");
3582 return HS_FILE_ERROR;
3583 }
3584
3585 s = maybe_reopen(event_name, timestamp, s);
3586
3587 assert(s != NULL);
3588
3589 if (s->fNumBytes == 0) { // compute expected data size
3590 // NB: history data does not have any padding!
3591 for (size_t i=0; i<s->fVariables.size(); i++) {
3592 s->fNumBytes += s->fVariables[i].n_bytes;
3593 }
3594 }
3595
3596 int status;
3597
3598 if (buffer_size > s->fNumBytes) { // too many bytes!
3599 if (s->fCountWriteOversize == 0) {
3600 // only report first occurence
3601 // count of all occurences is reported by HsSchema destructor
3602 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch: expected %zu bytes, got %zu bytes", s->fEventName.c_str(), s->fNumBytes, buffer_size);
3603 }
3605 if (buffer_size > s->fWriteMaxSize)
3606 s->fWriteMaxSize = buffer_size;
3607 status = s->write_event(timestamp, buffer, s->fNumBytes);
3608 } else if (buffer_size < s->fNumBytes) { // too few bytes
3609 if (s->fCountWriteUndersize == 0) {
3610 // only report first occurence
3611 // count of all occurences is reported by HsSchema destructor
3612 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch: expected %zu bytes, got %zu bytes", s->fEventName.c_str(), s->fNumBytes, buffer_size);
3613 }
3615 if (s->fWriteMinSize == 0)
3616 s->fWriteMinSize = buffer_size;
3617 else if (buffer_size < s->fWriteMinSize)
3618 s->fWriteMinSize = buffer_size;
3619 char* tmp = (char*)malloc(s->fNumBytes);
3620 memcpy(tmp, buffer, buffer_size);
3621 memset(tmp + buffer_size, 0, s->fNumBytes - buffer_size);
3622 status = s->write_event(timestamp, tmp, s->fNumBytes);
3623 free(tmp);
3624 } else {
3625 assert(buffer_size == s->fNumBytes); // obviously
3626 status = s->write_event(timestamp, buffer, buffer_size);
3627 }
3628
3629 // if could not write event, deactivate it
3630 if (status != HS_SUCCESS) {
3631 s->fDisabled = true;
3632 cm_msg(MERROR, "hs_write_event", "Event \'%s\' disabled after write error %d", event_name, status);
3633 return HS_FILE_ERROR;
3634 }
3635
3636 return HS_SUCCESS;
3637}
3638
3640{
3641 int status = HS_SUCCESS;
3642
3643 if (fDebug)
3644 printf("hs_flush_buffers!\n");
3645
3646 for (size_t i=0; i<fWriterEvents.size(); i++)
3647 if (fWriterEvents[i]) {
3648 int xstatus = fWriterEvents[i]->flush_buffers();
3649 if (xstatus != HS_SUCCESS)
3650 status = xstatus;
3651 }
3652
3653 return status;
3654}
3655
3657// Functions used by mhttpd //
3659
3661{
3662 if (fDebug)
3663 printf("SchemaHistoryBase::hs_clear_cache!\n");
3664
3667
3668 return HS_SUCCESS;
3669}
3670
3671int SchemaHistoryBase::hs_get_events(time_t t, std::vector<std::string> *pevents)
3672{
3673 if (fDebug)
3674 printf("hs_get_events, time %s\n", TimeToString(t).c_str());
3675
3677 if (status != HS_SUCCESS)
3678 return status;
3679
3680 if (fDebug) {
3681 printf("hs_get_events: available schema:\n");
3682 fReaderSchema.print(false);
3683 }
3684
3685 assert(pevents);
3686
3687 for (size_t i=0; i<fReaderSchema.size(); i++) {
3688 HsSchema* s = fReaderSchema[i];
3689 if (t && s->fTimeTo && s->fTimeTo < t)
3690 continue;
3691 bool dupe = false;
3692 for (size_t j=0; j<pevents->size(); j++)
3693 if (event_name_cmp((*pevents)[j], s->fEventName.c_str())==0) {
3694 dupe = true;
3695 break;
3696 }
3697
3698 if (!dupe)
3699 pevents->push_back(s->fEventName);
3700 }
3701
3702 std::sort(pevents->begin(), pevents->end());
3703
3704 if (fDebug) {
3705 printf("hs_get_events: returning %zu events\n", pevents->size());
3706 for (size_t i=0; i<pevents->size(); i++) {
3707 printf(" %zu: [%s]\n", i, (*pevents)[i].c_str());
3708 }
3709 }
3710
3711 return HS_SUCCESS;
3712}
3713
3714int SchemaHistoryBase::hs_get_tags(const char* event_name, time_t t, std::vector<TAG> *ptags)
3715{
3716 if (fDebug)
3717 printf("hs_get_tags: event [%s], time %s\n", event_name, TimeToString(t).c_str());
3718
3719 assert(ptags);
3720
3721 int status = read_schema(&fReaderSchema, event_name, t);
3722 if (status != HS_SUCCESS)
3723 return status;
3724
3725 bool found_event = false;
3726 for (size_t i=0; i<fReaderSchema.size(); i++) {
3727 HsSchema* s = fReaderSchema[i];
3728 if (t && s->fTimeTo && s->fTimeTo < t)
3729 continue;
3730
3731 if (event_name_cmp(s->fEventName, event_name) != 0)
3732 continue;
3733
3734 found_event = true;
3735
3736 for (size_t i=0; i<s->fVariables.size(); i++) {
3737 const char* tagname = s->fVariables[i].name.c_str();
3738 //printf("event_name [%s], table_name [%s], column name [%s], tag name [%s]\n", event_name, tn.c_str(), cn.c_str(), tagname);
3739
3740 bool dupe = false;
3741
3742 for (size_t k=0; k<ptags->size(); k++)
3743 if (strcasecmp((*ptags)[k].name, tagname) == 0) {
3744 dupe = true;
3745 break;
3746 }
3747
3748 if (!dupe) {
3749 TAG t;
3750 mstrlcpy(t.name, tagname, sizeof(t.name));
3751 t.type = s->fVariables[i].type;
3752 t.n_data = s->fVariables[i].n_data;
3753
3754 ptags->push_back(t);
3755 }
3756 }
3757 }
3758
3759 if (!found_event)
3760 return HS_UNDEFINED_EVENT;
3761
3762 if (fDebug) {
3763 printf("hs_get_tags: event [%s], returning %zu tags\n", event_name, ptags->size());
3764 for (size_t i=0; i<ptags->size(); i++) {
3765 printf(" tag[%zu]: %s[%d] type %d\n", i, (*ptags)[i].name, (*ptags)[i].n_data, (*ptags)[i].type);
3766 }
3767 }
3768
3769 return HS_SUCCESS;
3770}
3771
3772int SchemaHistoryBase::hs_get_last_written(time_t timestamp, int num_var, const char* const event_name[], const char* const var_name[], const int var_index[], time_t last_written[])
3773{
3774 if (fDebug) {
3775 printf("hs_get_last_written: timestamp %s, num_var %d\n", TimeToString(timestamp).c_str(), num_var);
3776 }
3777
3778 for (int j=0; j<num_var; j++) {
3779 last_written[j] = 0;
3780 }
3781
3782 for (int i=0; i<num_var; i++) {
3783 int status = read_schema(&fReaderSchema, event_name[i], 0);
3784 if (status != HS_SUCCESS)
3785 return status;
3786 }
3787
3788 //fReaderSchema.print(false);
3789
3790 for (int i=0; i<num_var; i++) {
3791 for (size_t ss=0; ss<fReaderSchema.size(); ss++) {
3793 // schema is too new
3794 if (s->fTimeFrom && s->fTimeFrom >= timestamp)
3795 continue;
3796 // this schema is newer than last_written and may contain newer data?
3797 if (s->fTimeFrom && s->fTimeFrom < last_written[i])
3798 continue;
3799 // schema for the variables we want?
3800 int sindex = s->match_event_var(event_name[i], var_name[i], var_index[i]);
3801 if (sindex < 0)
3802 continue;
3803
3804 time_t lw = 0;
3805
3806 int status = s->read_last_written(timestamp, fDebug, &lw);
3807
3808 if (status == HS_SUCCESS && lw != 0) {
3809 for (int j=0; j<num_var; j++) {
3810 int sj = s->match_event_var(event_name[j], var_name[j], var_index[j]);
3811 if (sj < 0)
3812 continue;
3813
3814 if (lw > last_written[j])
3815 last_written[j] = lw;
3816 }
3817 }
3818 }
3819 }
3820
3821 if (fDebug) {
3822 printf("hs_get_last_written: timestamp time %s, num_var %d, result:\n", TimeToString(timestamp).c_str(), num_var);
3823 for (int i=0; i<num_var; i++) {
3824 printf(" event [%s] tag [%s] index [%d] last_written %s\n", event_name[i], var_name[i], var_index[i], TimeToString(last_written[i]).c_str());
3825 }
3826 }
3827
3828 return HS_SUCCESS;
3829}
3830
3832 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3834 int hs_status[])
3835{
3836 if (fDebug)
3837 printf("hs_read_buffer: %d variables, start time %s, end time %s\n", num_var, TimeToString(start_time).c_str(), TimeToString(end_time).c_str());
3838
3839 for (int i=0; i<num_var; i++) {
3840 int status = read_schema(&fReaderSchema, event_name[i], start_time);
3841 if (status != HS_SUCCESS)
3842 return status;
3843 }
3844
3845#if 0
3846 if (fDebug)
3847 fReaderSchema.print(false);
3848#endif
3849
3850 for (int i=0; i<num_var; i++) {
3852 }
3853
3854 //for (size_t ss=0; ss<fReaderSchema.size(); ss++) {
3855 // HsSchema* s = fReaderSchema[ss];
3856 // HsFileSchema* fs = dynamic_cast<HsFileSchema*>(s);
3857 // assert(fs != NULL);
3858 // printf("schema %d from %s to %s, filename %s\n", ss, TimeToString(fs->fTimeFrom).c_str(), TimeToString(fs->fTimeTo).c_str(), fs->fFileName.c_str());
3859 //}
3860
3861 // check that schema are sorted by time
3862
3863#if 0
3864 // check that schema list is sorted by time, descending fTimeFrom, newest schema first
3865 for (size_t ss=0; ss<fReaderSchema.size(); ss++) {
3866 if (fDebug) {
3867 //printf("Check schema %zu/%zu: prev from %s, this from %s to %s, compare %d %d %d\n", ss, fReaderSchema.size(),
3868 // TimeToString(fReaderSchema[ss-1]->fTimeFrom).c_str(),
3869 // TimeToString(fReaderSchema[ss]->fTimeFrom).c_str(),
3870 // TimeToString(fReaderSchema[ss]->fTimeTo).c_str(),
3871 // fReaderSchema[ss-1]->fTimeFrom >= fReaderSchema[ss]->fTimeTo,
3872 // fReaderSchema[ss-1]->fTimeFrom > fReaderSchema[ss]->fTimeFrom,
3873 // (fReaderSchema[ss-1]->fTimeFrom >= fReaderSchema[ss]->fTimeTo) && (fReaderSchema[ss-1]->fTimeFrom > fReaderSchema[ss]->fTimeFrom));
3874 printf("Schema %zu/%zu: from %s to %s, name %s\n", ss, fReaderSchema.size(),
3875 TimeToString(fReaderSchema[ss]->fTimeFrom).c_str(),
3876 TimeToString(fReaderSchema[ss]->fTimeTo).c_str(),
3877 fReaderSchema[ss]->fEventName.c_str());
3878 }
3879
3880 if (ss > 0) {
3881 //if ((fReaderSchema[ss-1]->fTimeFrom >= fReaderSchema[ss]->fTimeTo) && (fReaderSchema[ss-1]->fTimeFrom > fReaderSchema[ss]->fTimeFrom)) {
3882 if ((fReaderSchema[ss-1]->fTimeFrom >= fReaderSchema[ss]->fTimeFrom)) {
3883 // good
3884 } else {
3885 cm_msg(MERROR, "SchemaHistoryBase::hs_read_buffer", "History internal error, schema is not ordered by time. Please report this error to the midas forum.");
3886 return HS_FILE_ERROR;
3887 }
3888 }
3889 }
3890#endif
3891
3892 std::vector<HsSchema*> slist;
3893 std::vector<std::vector<int>> smap;
3894
3895 for (size_t ss=0; ss<fReaderSchema.size(); ss++) {
3897 // schema is too new?
3898 if (s->fTimeFrom && s->fTimeFrom > end_time)
3899 continue;
3900 // schema is too old
3901 if (s->fTimeTo && s->fTimeTo < start_time)
3902 continue;
3903
3904 std::vector<int> sm;
3905
3906 for (int i=0; i<num_var; i++) {
3907 // schema for the variables we want?
3908 int sindex = s->match_event_var(event_name[i], var_name[i], var_index[i]);
3909 if (sindex < 0)
3910 continue;
3911
3912 if (sm.empty()) {
3913 for (int i=0; i<num_var; i++) {
3914 sm.push_back(-1);
3915 }
3916 }
3917
3918 sm[i] = sindex;
3919 }
3920
3921 if (!sm.empty()) {
3922 slist.push_back(s);
3923 smap.push_back(sm);
3924 }
3925 }
3926
3927 if (0||fDebug) {
3928 printf("Found %zu matching schema:\n", slist.size());
3929
3930 for (size_t i=0; i<slist.size(); i++) {
3931 HsSchema* s = slist[i];
3932 s->print();
3933 for (int k=0; k<num_var; k++)
3934 printf(" tag %s[%d] sindex %d\n", var_name[k], var_index[k], smap[i][k]);
3935 }
3936 }
3937
3938 //for (size_t ss=0; ss<slist.size(); ss++) {
3939 // HsSchema* s = slist[ss];
3940 // HsFileSchema* fs = dynamic_cast<HsFileSchema*>(s);
3941 // assert(fs != NULL);
3942 // printf("schema %zu from %s to %s, filename %s", ss, TimeToString(fs->fTimeFrom).c_str(), TimeToString(fs->fTimeTo).c_str(), fs->fFileName.c_str());
3943 // printf(" smap ");
3944 // for (int k=0; k<num_var; k++)
3945 // printf(" %2d", smap[ss][k]);
3946 // printf("\n");
3947 //}
3948
3949 for (size_t ss=1; ss<slist.size(); ss++) {
3950 if (fDebug) {
3951 printf("Check schema %zu/%zu: prev from %s, this from %s to %s, compare %d\n", ss, slist.size(),
3952 TimeToString(slist[ss-1]->fTimeFrom).c_str(),
3953 TimeToString(slist[ss]->fTimeFrom).c_str(),
3954 TimeToString(slist[ss]->fTimeTo).c_str(),
3955 slist[ss-1]->fTimeFrom >= slist[ss]->fTimeFrom);
3956 }
3957 if (slist[ss-1]->fTimeFrom >= slist[ss]->fTimeFrom) {
3958 // good
3959 } else {
3960 cm_msg(MERROR, "SchemaHistoryBase::hs_read_buffer", "History internal error, selected schema is not ordered by time. Please report this error to the midas forum.");
3961 return HS_FILE_ERROR;
3962 }
3963 }
3964
3965 std::vector<time_t> last_time;
3966
3967 for (int i=0; i<num_var; i++) {
3968 last_time.push_back(start_time);
3969 }
3970
3971 for (size_t i=slist.size()-1; ; i--) {
3972 HsSchema* s = slist[i];
3973
3974 int status = s->read_data(start_time, end_time, num_var, smap[i], var_index, fDebug, last_time, buffer);
3975
3976 if (status == HS_SUCCESS) {
3977 for (int j=0; j<num_var; j++) {
3978 if (smap[i][j] >= 0)
3980 }
3981 }
3982
3983 if (i==0)
3984 break;
3985 }
3986
3987 return HS_SUCCESS;
3988}
3989
3991 int num_var,
3992 const char* const event_name[], const char* const var_name[], const int var_index[],
3993 int num_entries[],
3994 time_t* time_buffer[], double* data_buffer[],
3995 int st[])
3996{
3997 int status;
3998
3999 ReadBuffer** buffer = new ReadBuffer*[num_var];
4001
4002 for (int i=0; i<num_var; i++) {
4003 buffer[i] = new ReadBuffer(start_time, end_time, interval);
4004 bi[i] = buffer[i];
4005
4006 // make sure outputs are initialized to something sane
4007 if (num_entries)
4008 num_entries[i] = 0;
4009 if (time_buffer)
4010 time_buffer[i] = NULL;
4011 if (data_buffer)
4012 data_buffer[i] = NULL;
4013 if (st)
4014 st[i] = 0;
4015
4016 if (num_entries)
4017 buffer[i]->fNumEntries = &num_entries[i];
4018 if (time_buffer)
4019 buffer[i]->fTimeBuffer = &time_buffer[i];
4020 if (data_buffer)
4021 buffer[i]->fDataBuffer = &data_buffer[i];
4022 }
4023
4024 status = hs_read_buffer(start_time, end_time,
4025 num_var, event_name, var_name, var_index,
4026 bi, st);
4027
4028 for (int i=0; i<num_var; i++) {
4029 buffer[i]->Finish();
4030 delete buffer[i];
4031 }
4032
4033 delete[] buffer;
4034 delete[] bi;
4035
4036 return status;
4037}
4038
4040 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
4041 int num_entries[],
4042 int* count_bins[], double* mean_bins[], double* rms_bins[], double* min_bins[], double* max_bins[],
4045 time_t last_time[], double last_value[],
4046 int st[])
4047{
4048 int status;
4049
4052
4053 for (int i=0; i<num_var; i++) {
4054 buffer[i] = new MidasHistoryBinnedBuffer(start_time, end_time, num_bins);
4055 xbuffer[i] = buffer[i];
4056
4057 if (count_bins)
4058 buffer[i]->fCount = count_bins[i];
4059 if (mean_bins)
4060 buffer[i]->fMean = mean_bins[i];
4061 if (rms_bins)
4062 buffer[i]->fRms = rms_bins[i];
4063 if (min_bins)
4064 buffer[i]->fMin = min_bins[i];
4065 if (max_bins)
4066 buffer[i]->fMax = max_bins[i];
4067 if (bins_first_time)
4068 buffer[i]->fBinsFirstTime = bins_first_time[i];
4069 if (bins_first_value)
4070 buffer[i]->fBinsFirstValue = bins_first_value[i];
4071 if (bins_last_time)
4072 buffer[i]->fBinsLastTime = bins_last_time[i];
4073 if (bins_last_value)
4074 buffer[i]->fBinsLastValue = bins_last_value[i];
4075 if (last_time)
4076 buffer[i]->fLastTimePtr = &last_time[i];
4077 if (last_value)
4078 buffer[i]->fLastValuePtr = &last_value[i];
4079
4080 buffer[i]->Start();
4081 }
4082
4083 status = hs_read_buffer(start_time, end_time,
4084 num_var, event_name, var_name, var_index,
4085 xbuffer,
4086 st);
4087
4088 for (int i=0; i<num_var; i++) {
4089 buffer[i]->Finish();
4090 if (num_entries)
4091 num_entries[i] = buffer[i]->fNumEntries;
4092 if (0) {
4093 for (int j=0; j<num_bins; j++) {
4094 printf("var %d bin %d count %d, first %s last %s value first %f last %f\n", i, j, count_bins[i][j], TimeToString(bins_first_time[i][j]).c_str(), TimeToString(bins_last_time[i][j]).c_str(), bins_first_value[i][j], bins_last_value[i][j]);
4095 }
4096 }
4097 delete buffer[i];
4098 }
4099
4100 delete[] buffer;
4101 delete[] xbuffer;
4102
4103 return status;
4104}
4105
4107// SQL schema //
4109
4111{
4112 if (!fSql->IsConnected()) {
4113 return HS_SUCCESS;
4114 }
4115
4116 int status = HS_SUCCESS;
4117 if (get_transaction_count() > 0) {
4120 }
4121 return status;
4122}
4123
4124int HsSchema::match_event_var(const char* event_name, const char* var_name, const int var_index)
4125{
4126 if (!MatchEventName(this->fEventName.c_str(), event_name))
4127 return -1;
4128
4129 for (size_t j=0; j<this->fVariables.size(); j++) {
4130 if (MatchTagName(this->fVariables[j].name.c_str(), this->fVariables[j].n_data, var_name, var_index)) {
4131 // Second clause in if() is case where MatchTagName used the "alternate tag name".
4132 // E.g. our variable name is "IM05[3]" (n_data=1), but we're looking for var_name="IM05" and var_index=3.
4133 if (var_index < this->fVariables[j].n_data || (this->fVariables[j].n_data == 1 && this->fVariables[j].name.find("[") != std::string::npos)) {
4134 return j;
4135 }
4136 }
4137 }
4138
4139 return -1;
4140}
4141
4142int HsSqlSchema::match_event_var(const char* event_name, const char* var_name, const int var_index)
4143{
4144 if (event_name_cmp(this->fTableName, event_name)==0) {
4145 for (size_t j=0; j<this->fVariables.size(); j++) {
4146 if (var_name_cmp(this->fColumnNames[j], var_name)==0)
4147 return j;
4148 }
4149 }
4150
4151 return HsSchema::match_event_var(event_name, var_name, var_index);
4152}
4153
4154static HsSqlSchema* NewSqlSchema(HsSchemaVector* sv, const char* table_name, time_t t)
4155{
4156 time_t tt = 0;
4157 int j=-1;
4158 int jjx=-1; // remember oldest schema
4159 time_t ttx = 0;
4160 for (size_t i=0; i<sv->size(); i++) {
4161 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
4162 if (s->fTableName != table_name)
4163 continue;
4164
4165 if (s->fTimeFrom == t) {
4166 return s;
4167 }
4168
4169 // remember the last schema before time t
4170 if (s->fTimeFrom < t) {
4171 if (s->fTimeFrom > tt) {
4172 tt = s->fTimeFrom;
4173 j = i;
4174 }
4175 }
4176
4177 if (jjx < 0) {
4178 jjx = i;
4179 ttx = s->fTimeFrom;
4180 }
4181
4182 if (s->fTimeFrom < ttx) {
4183 jjx = i;
4184 ttx = s->fTimeFrom;
4185 }
4186
4187 //printf("table_name [%s], t=%s, i=%d, j=%d %s, tt=%s, dt is %d\n", table_name, TimeToString(t).c_str(), i, j, TimeToString(s->fTimeFrom).c_str(), TimeToString(tt).c_str(), (int)(s->fTimeFrom-t));
4188 }
4189
4190 //printf("NewSqlSchema: will copy schema j=%d, tt=%d at time %d\n", j, tt, t);
4191
4192 //printf("cloned schema at time %s: ", TimeToString(t).c_str());
4193 //(*sv)[j]->print(false);
4194
4195 //printf("schema before:\n");
4196 //sv->print(false);
4197
4198 if (j >= 0) {
4199 HsSqlSchema* s = new HsSqlSchema;
4200 *s = *(HsSqlSchema*)(*sv)[j]; // make a copy
4201 s->fTimeFrom = t;
4202 sv->add(s);
4203
4204 //printf("schema after:\n");
4205 //sv->print(false);
4206
4207 return s;
4208 }
4209
4210 if (jjx >= 0) {
4211 cm_msg(MERROR, "NewSqlSchema", "Error: Unexpected ordering of schema for table \'%s\', good luck!", table_name);
4212
4213 HsSqlSchema* s = new HsSqlSchema;
4214 *s = *(HsSqlSchema*)(*sv)[jjx]; // make a copy
4215 s->fTimeFrom = t;
4216 s->fTimeTo = ttx;
4217 sv->add(s);
4218
4219 //printf("schema after:\n");
4220 //sv->print(false);
4221
4222 return s;
4223 }
4224
4225 cm_msg(MERROR, "NewSqlSchema", "Error: Cannot clone schema for table \'%s\', good luck!", table_name);
4226 return NULL;
4227}
4228
4230{
4231 assert(fVariables.size() == fColumnInactive.size());
4232 assert(fVariables.size() == fColumnNames.size());
4233 assert(fVariables.size() == fColumnTypes.size());
4234 assert(fVariables.size() == fOffsets.size());
4235
4236 size_t count_active = 0;
4237 size_t count_inactive = 0;
4238
4239 for (size_t i=0; i<fColumnInactive.size(); i++) {
4240 if (fColumnInactive[i])
4241 count_inactive += 1;
4242 else
4243 count_active += 1;
4244 }
4245
4246 //printf("remove_inactive_columns: enter! count_active: %zu, count_inactive: %zu\n", count_active, count_inactive);
4247 //print();
4248
4249 if (count_inactive > 0) {
4250 size_t j=0;
4251
4252 for (size_t i=0; i<fColumnInactive.size(); i++) {
4253 if (fColumnInactive[i]) {
4254 // skip this entry
4255 } else {
4256 if (j != i) {
4261 fOffsets[j] = fOffsets[i];
4262 }
4263 j++;
4264 }
4265 }
4266
4267 //print();
4268 //printf("%zu %zu\n", j, count_active);
4269
4270 assert(j == count_active);
4271
4272 //print();
4273
4274 fVariables.resize(count_active);
4276 fColumnNames.resize(count_active);
4277 fColumnTypes.resize(count_active);
4278 fOffsets.resize(count_active);
4279
4280 assert(fVariables.size() == fColumnInactive.size());
4281 assert(fVariables.size() == fColumnNames.size());
4282 assert(fVariables.size() == fColumnTypes.size());
4283 assert(fVariables.size() == fOffsets.size());
4284
4285 //printf("remove_inactice_columns: exit!\n");
4286 //print();
4287 }
4288}
4289
4290int HsSqlSchema::write_event(const time_t t, const char* data, const size_t data_size)
4291{
4292 HsSqlSchema* s = this;
4293
4294 assert(s->fVariables.size() == s->fColumnInactive.size());
4295 assert(s->fVariables.size() == s->fColumnNames.size());
4296 assert(s->fVariables.size() == s->fColumnTypes.size());
4297 assert(s->fVariables.size() == s->fOffsets.size());
4298
4299 std::string tags;
4300 std::string values;
4301
4302 for (size_t i=0; i<s->fVariables.size(); i++) {
4303 // NB: inactive columns should have been removed from the schema. K.O.
4304
4305 if (s->fColumnInactive[i]) {
4306 cm_msg(MERROR, "HsSqlSchema::write_event", "Internal error, unexpected inactive column %zu", i);
4308 return HS_FILE_ERROR;
4309 }
4310
4311 int type = s->fVariables[i].type;
4312 int n_data = s->fVariables[i].n_data;
4313 int offset = s->fOffsets[i];
4314 const char* column_name = s->fColumnNames[i].c_str();
4315
4316 if (offset < 0) {
4317 cm_msg(MERROR, "HsSqlSchema::write_event", "Internal error, unexpected negative offset %d for column %zu", offset, i);
4319 return HS_FILE_ERROR;
4320 }
4321
4322 assert(n_data == 1);
4323 assert(strlen(column_name) > 0);
4324 assert(offset >= 0);
4325 assert((size_t)offset < data_size);
4326
4327 void* ptr = (void*)(data+offset);
4328
4329 tags += ", ";
4330 tags += fSql->QuoteId(column_name);
4331
4332 values += ", ";
4333
4334 char buf[1024];
4335 int j=0;
4336
4337 switch (type) {
4338 default:
4339 sprintf(buf, "unknownType%d", type);
4340 break;
4341 case TID_BYTE:
4342 sprintf(buf, "%u",((unsigned char *)ptr)[j]);
4343 break;
4344 case TID_SBYTE:
4345 sprintf(buf, "%d",((signed char*)ptr)[j]);
4346 break;
4347 case TID_CHAR:
4348 // FIXME: quotes
4349 sprintf(buf, "\'%c\'",((char*)ptr)[j]);
4350 break;
4351 case TID_WORD:
4352 sprintf(buf, "%u",((unsigned short *)ptr)[j]);
4353 break;
4354 case TID_SHORT:
4355 sprintf(buf, "%d",((short *)ptr)[j]);
4356 break;
4357 case TID_DWORD:
4358 sprintf(buf, "%u",((unsigned int *)ptr)[j]);
4359 break;
4360 case TID_INT:
4361 sprintf(buf, "%d",((int *)ptr)[j]);
4362 break;
4363 case TID_BOOL:
4364 sprintf(buf, "%u",((unsigned int *)ptr)[j]);
4365 break;
4366 case TID_FLOAT:
4367 // FIXME: quotes
4368 sprintf(buf, "\'%.8g\'",((float*)ptr)[j]);
4369 break;
4370 case TID_DOUBLE:
4371 // FIXME: quotes
4372 sprintf(buf, "\'%.16g\'",((double*)ptr)[j]);
4373 break;
4374 }
4375
4376 values += buf;
4377 }
4378
4379 // 2001-02-16 20:38:40.1
4380 struct tm tms;
4381 localtime_r(&t, &tms); // somebody must call tzset() before this.
4382 char buf[1024];
4383 strftime(buf, sizeof(buf)-1, "%Y-%m-%d %H:%M:%S.0", &tms);
4384
4385 std::string cmd;
4386 cmd = "INSERT INTO ";
4387 cmd += fSql->QuoteId(s->fTableName.c_str());
4388 cmd += " (_t_time, _i_time";
4389 cmd += tags;
4390 cmd += ") VALUES (";
4391 cmd += fSql->QuoteString(buf);
4392 cmd += ", ";
4393 cmd += fSql->QuoteString(TimeToString(t).c_str());
4394 cmd += "";
4395 cmd += values;
4396 cmd += ");";
4397
4398 if (fSql->IsConnected()) {
4399 if (s->get_transaction_count() == 0)
4400 fSql->OpenTransaction(s->fTableName.c_str());
4401
4403
4404 int status = fSql->Exec(s->fTableName.c_str(), cmd.c_str());
4405
4406 // mh2sql who does not call hs_flush_buffers()
4407 // so we should flush the transaction by hand
4408 // some SQL engines have limited transaction buffers... K.O.
4409 if (s->get_transaction_count() > 100000) {
4410 //printf("flush table %s\n", table_name);
4411 fSql->CommitTransaction(s->fTableName.c_str());
4413 }
4414
4415 if (status != DB_SUCCESS) {
4416 return status;
4417 }
4418 } else {
4419 int status = fSql->ExecDisconnected(s->fTableName.c_str(), cmd.c_str());
4420 if (status != DB_SUCCESS) {
4421 return status;
4422 }
4423 }
4424
4425 return HS_SUCCESS;
4426}
4427
4429 const int debug,
4430 time_t* last_written)
4431{
4432 if (debug)
4433 printf("SqlHistory::read_last_written: table [%s], timestamp %s\n", fTableName.c_str(), TimeToString(timestamp).c_str());
4434
4435 std::string cmd;
4436 cmd += "SELECT _i_time FROM ";
4437 cmd += fSql->QuoteId(fTableName.c_str());
4438 cmd += " WHERE _i_time < ";
4439 cmd += TimeToString(timestamp);
4440 cmd += " ORDER BY _i_time DESC LIMIT 2;";
4441
4442 int status = fSql->Prepare(fTableName.c_str(), cmd.c_str());
4443
4444 if (status != DB_SUCCESS)
4445 return status;
4446
4447 time_t lw = 0;
4448
4449 /* Loop through the rows in the result-set */
4450
4451 while (1) {
4452 status = fSql->Step();
4453 if (status != DB_SUCCESS)
4454 break;
4455
4456 time_t t = fSql->GetTime(0);
4457
4458 if (t >= timestamp)
4459 continue;
4460
4461 if (t > lw)
4462 lw = t;
4463 }
4464
4465 fSql->Finalize();
4466
4467 *last_written = lw;
4468
4469 if (debug)
4470 printf("SqlHistory::read_last_written: table [%s], timestamp %s, last_written %s\n", fTableName.c_str(), TimeToString(timestamp).c_str(), TimeToString(lw).c_str());
4471
4472 return HS_SUCCESS;
4473}
4474
4475int HsSqlSchema::read_data(const time_t start_time,
4476 const time_t end_time,
4477 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
4478 const int debug,
4479 std::vector<time_t>& last_time,
4481{
4482 bool bad_last_time = false;
4483
4484 if (debug)
4485 printf("SqlHistory::read_data: table [%s], start %s, end %s\n", fTableName.c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str());
4486
4487 std::string collist;
4488
4489 for (int i=0; i<num_var; i++) {
4490 int j = var_schema_index[i];
4491 if (j < 0)
4492 continue;
4493 if (collist.length() > 0)
4494 collist += ", ";
4496 }
4497
4498 std::string cmd;
4499 cmd += "SELECT _i_time, ";
4500 cmd += collist;
4501 cmd += " FROM ";
4502 cmd += fSql->QuoteId(fTableName.c_str());
4503 cmd += " WHERE _i_time>=";
4504 cmd += TimeToString(start_time);
4505 cmd += " and _i_time<=";
4506 cmd += TimeToString(end_time);
4507 cmd += " ORDER BY _i_time;";
4508
4509 int status = fSql->Prepare(fTableName.c_str(), cmd.c_str());
4510
4511 if (status != DB_SUCCESS)
4512 return HS_FILE_ERROR;
4513
4514 /* Loop through the rows in the result-set */
4515
4516 int count = 0;
4517
4518 while (1) {
4519 status = fSql->Step();
4520 if (status != DB_SUCCESS)
4521 break;
4522
4523 count++;
4524
4525 time_t t = fSql->GetTime(0);
4526
4527 if (t < start_time || t > end_time)
4528 continue;
4529
4530 int k = 0;
4531
4532 for (int i=0; i<num_var; i++) {
4533 int j = var_schema_index[i];
4534 if (j < 0)
4535 continue;
4536
4537 if (t < last_time[i]) { // protect against duplicate and non-monotonous data
4538 bad_last_time = true;
4539 } else {
4540 double v = fSql->GetDouble(1+k);
4541
4542 //printf("Column %d, index %d, Row %d, time %d, value %f\n", k, colindex[k], count, t, v);
4543
4544 buffer[i]->Add(t, v);
4545 last_time[i] = t;
4546 }
4547
4548 k++;
4549 }
4550 }
4551
4552 fSql->Finalize();
4553
4554 if (bad_last_time) {
4555 cm_msg(MERROR, "SqlHistory::read_data", "Detected duplicate or non-monotonous data in table \"%s\" for start time %s and end time %s", fTableName.c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str());
4556 }
4557
4558 if (debug)
4559 printf("SqlHistory::read_data: table [%s], start %s, end %s, read %d rows\n", fTableName.c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), count);
4560
4561 return HS_SUCCESS;
4562}
4563
4565 if (!fSql || fSql->fTransactionPerTable) {
4567 } else {
4568 return gfTransactionCount[fSql];
4569 }
4570}
4571
4573 if (!fSql || fSql->fTransactionPerTable) {
4575 } else {
4577 }
4578}
4579
4587
4589// SQL history functions //
4591
4592static int StartSqlTransaction(SqlBase* sql, const char* table_name, bool* have_transaction)
4593{
4594 if (*have_transaction)
4595 return HS_SUCCESS;
4596
4597 int status = sql->OpenTransaction(table_name);
4598 if (status != DB_SUCCESS)
4599 return HS_FILE_ERROR;
4600
4601 *have_transaction = true;
4602 return HS_SUCCESS;
4603}
4604
4605static int CreateSqlTable(SqlBase* sql, const char* table_name, bool* have_transaction, bool set_default_timestamp = false)
4606{
4607 int status;
4608
4610 if (status != DB_SUCCESS)
4611 return HS_FILE_ERROR;
4612
4613 std::string cmd;
4614
4615 cmd = "CREATE TABLE ";
4616 cmd += sql->QuoteId(table_name);
4618 cmd += " (_t_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, _i_time INTEGER NOT NULL DEFAULT 0);";
4619 } else {
4620 cmd += " (_t_time TIMESTAMP NOT NULL, _i_time INTEGER NOT NULL);";
4621 }
4622
4623 status = sql->Exec(table_name, cmd.c_str());
4624
4625
4626 if (status == DB_KEY_EXIST) {
4627 cm_msg(MINFO, "CreateSqlTable", "Adding SQL table \"%s\", but it already exists", table_name);
4629 return status;
4630 }
4631
4632 if (status != DB_SUCCESS) {
4633 cm_msg(MINFO, "CreateSqlTable", "Adding SQL table \"%s\", error status %d", table_name, status);
4635 return HS_FILE_ERROR;
4636 }
4637
4638 cm_msg(MINFO, "CreateSqlTable", "Adding SQL table \"%s\"", table_name);
4640
4641 std::string i_index_name;
4642 i_index_name = table_name;
4643 i_index_name += "_i_time_index";
4644
4645 std::string t_index_name;
4646 t_index_name = table_name;
4647 t_index_name += "_t_time_index";
4648
4649 cmd = "CREATE INDEX ";
4650 cmd += sql->QuoteId(i_index_name.c_str());
4651 cmd += " ON ";
4652 cmd += sql->QuoteId(table_name);
4653 cmd += " (_i_time ASC);";
4654
4655 status = sql->Exec(table_name, cmd.c_str());
4656 if (status != DB_SUCCESS)
4657 return HS_FILE_ERROR;
4658
4659 cmd = "CREATE INDEX ";
4660 cmd += sql->QuoteId(t_index_name.c_str());
4661 cmd += " ON ";
4662 cmd += sql->QuoteId(table_name);
4663 cmd += " (_t_time);";
4664
4665 status = sql->Exec(table_name, cmd.c_str());
4666 if (status != DB_SUCCESS)
4667 return HS_FILE_ERROR;
4668
4669 return status;
4670}
4671
4672static int CreateSqlHyperTable(SqlBase* sql, const char* table_name, bool* have_transaction) {
4673 int status;
4674
4676 if (status != DB_SUCCESS)
4677 return HS_FILE_ERROR;
4678
4679 std::string cmd;
4680
4681 cmd = "CREATE TABLE ";
4682 cmd += sql->QuoteId(table_name);
4683 cmd += " (_t_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, _i_time INTEGER NOT NULL DEFAULT 0);";
4684
4685 status = sql->Exec(table_name, cmd.c_str());
4686
4687 if (status == DB_KEY_EXIST) {
4688 cm_msg(MINFO, "CreateSqlHyperTable", "Adding SQL table \"%s\", but it already exists", table_name);
4690 return status;
4691 }
4692
4693 if (status != DB_SUCCESS) {
4694 cm_msg(MINFO, "CreateSqlHyperTable", "Adding SQL table \"%s\", error status %d", table_name, status);
4696 return HS_FILE_ERROR;
4697 }
4698
4699 cm_msg(MINFO, "CreateSqlHyperTable", "Adding SQL table \"%s\"", table_name);
4701
4702 cmd = "SELECT create_hypertable(";
4703 cmd += sql->QuoteString(table_name);
4704 cmd += ", '_t_time');";
4705
4706 // convert regular table to hypertable
4707 status = sql->Exec(table_name, cmd.c_str());
4708
4709 if (status != DB_SUCCESS) {
4710 cm_msg(MINFO, "CreateSqlHyperTable", "Converting SQL table to hypertable \"%s\", error status %d", table_name, status);
4712 return HS_FILE_ERROR;
4713 }
4714
4715 std::string i_index_name;
4716 i_index_name = table_name;
4717 i_index_name += "_i_time_index";
4718
4719 std::string t_index_name;
4720 t_index_name = table_name;
4721 t_index_name += "_t_time_index";
4722
4723 cmd = "CREATE INDEX ";
4724 cmd += sql->QuoteId(i_index_name.c_str());
4725 cmd += " ON ";
4726 cmd += sql->QuoteId(table_name);
4727 cmd += " (_i_time ASC);";
4728
4729 status = sql->Exec(table_name, cmd.c_str());
4730 if (status != DB_SUCCESS)
4731 return HS_FILE_ERROR;
4732
4733 cmd = "CREATE INDEX ";
4734 cmd += sql->QuoteId(t_index_name.c_str());
4735 cmd += " ON ";
4736 cmd += sql->QuoteId(table_name);
4737 cmd += " (_t_time);";
4738
4739 status = sql->Exec(table_name, cmd.c_str());
4740 if (status != DB_SUCCESS)
4741 return HS_FILE_ERROR;
4742
4743 return status;
4744}
4745
4746static int CreateSqlColumn(SqlBase* sql, const char* table_name, const char* column_name, const char* column_type, bool* have_transaction, int debug)
4747{
4748 if (debug)
4749 printf("CreateSqlColumn: table [%s], column [%s], type [%s]\n", table_name, column_name, column_type);
4750
4751 int status = StartSqlTransaction(sql, table_name, have_transaction);
4752 if (status != HS_SUCCESS)
4753 return status;
4754
4755 std::string cmd;
4756 cmd = "ALTER TABLE ";
4757 cmd += sql->QuoteId(table_name);
4758 cmd += " ADD COLUMN ";
4759 cmd += sql->QuoteId(column_name);
4760 cmd += " ";
4761 cmd += column_type;
4762 cmd += ";";
4763
4764 status = sql->Exec(table_name, cmd.c_str());
4765
4766 cm_msg(MINFO, "CreateSqlColumn", "Adding column \"%s\" to SQL table \"%s\", status %d", column_name, table_name, status);
4768
4769 return status;
4770}
4771
4773// SQL history base classes //
4775
4777{
4778public:
4780
4782 {
4783 fSql = NULL;
4785 }
4786
4787 virtual ~SqlHistoryBase() // dtor
4788 {
4789 hs_disconnect();
4790 if (fSql)
4791 delete fSql;
4792 fSql = NULL;
4793 }
4794
4796 {
4797 if (fSql)
4798 fSql->fDebug = debug;
4800 }
4801
4802 int hs_connect(const char* connect_string);
4803 int hs_disconnect();
4804 HsSchema* new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]);
4805 int read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp);
4806 HsSchema* maybe_reopen(const char* event_name, time_t timestamp, HsSchema* s) { return s; };
4807
4808
4809protected:
4811 virtual int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name) = 0;
4812 virtual int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp) = 0;
4813 virtual int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction) = 0;
4814
4815 int update_schema(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable);
4816 int update_schema1(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable, bool* have_transaction);
4817};
4818
4820{
4821 if (fDebug)
4822 printf("hs_connect [%s]!\n", connect_string);
4823
4824 assert(fSql);
4825
4826 if (fSql->IsConnected())
4827 if (strcmp(fConnectString.c_str(), connect_string) == 0)
4828 return HS_SUCCESS;
4829
4830 hs_disconnect();
4831
4832 if (!connect_string || strlen(connect_string) < 1) {
4833 // FIXME: should use "logger dir" or some such default, that code should be in hs_get_history(), not here
4834 connect_string = ".";
4835 }
4836
4838
4839 if (fDebug)
4840 printf("hs_connect: connecting to SQL database \'%s\'\n", fConnectString.c_str());
4841
4842 int status = fSql->Connect(fConnectString.c_str());
4843 if (status != DB_SUCCESS)
4844 return status;
4845
4846 return HS_SUCCESS;
4847}
4848
4850{
4851 if (fDebug)
4852 printf("hs_disconnect!\n");
4853
4855
4856 fSql->Disconnect();
4857
4859
4860 return HS_SUCCESS;
4861}
4862
4863HsSchema* SqlHistoryBase::new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[])
4864{
4865 if (fDebug)
4866 printf("SqlHistory::new_event: event [%s], timestamp %s, ntags %d\n", event_name, TimeToString(timestamp).c_str(), ntags);
4867
4868 int status;
4869
4870 if (fWriterSchema.size() == 0) {
4872 if (status != HS_SUCCESS)
4873 return NULL;
4874 }
4875
4876 HsSqlSchema* s = (HsSqlSchema*)fWriterSchema.find_event(event_name, timestamp);
4877
4878 // schema does not exist, the SQL tables probably do not exist yet
4879
4880 if (!s) {
4881 status = create_table(&fWriterSchema, event_name, timestamp);
4882 if (status != HS_SUCCESS)
4883 return NULL;
4884
4885 s = (HsSqlSchema*)fWriterSchema.find_event(event_name, timestamp);
4886
4887 if (!s) {
4888 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
4889 fWriterSchema.find_event(event_name, timestamp, 1);
4890 return NULL;
4891 }
4892 }
4893
4894 assert(s != NULL);
4895
4896 status = read_column_names(&fWriterSchema, s->fTableName.c_str(), s->fEventName.c_str());
4897 if (status != HS_SUCCESS)
4898 return NULL;
4899
4900 s = (HsSqlSchema*)fWriterSchema.find_event(event_name, timestamp);
4901
4902 if (!s) {
4903 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot update schema database for event \'%s\', see previous messages", event_name);
4904 return NULL;
4905 }
4906
4907 if (0||fDebug) {
4908 printf("SqlHistory::new_event: schema for [%s] is %p\n", event_name, s);
4909 if (s)
4910 s->print();
4911 }
4912
4913 status = update_schema(s, timestamp, ntags, tags, true);
4914 if (status != HS_SUCCESS) {
4915 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
4916 return NULL;
4917 }
4918
4919 status = read_column_names(&fWriterSchema, s->fTableName.c_str(), s->fEventName.c_str());
4920 if (status != HS_SUCCESS)
4921 return NULL;
4922
4923 s = (HsSqlSchema*)fWriterSchema.find_event(event_name, timestamp);
4924
4925 if (!s) {
4926 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot update schema database for event \'%s\', see previous messages", event_name);
4927 return NULL;
4928 }
4929
4930 if (0||fDebug) {
4931 printf("SqlHistory::new_event: schema for [%s] is %p\n", event_name, s);
4932 if (s)
4933 s->print();
4934 }
4935
4936 // last call to UpdateMysqlSchema with "false" will check that new schema matches the new tags
4937
4938 status = update_schema(s, timestamp, ntags, tags, false);
4939 if (status != HS_SUCCESS) {
4940 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
4941 //fDebug = 1;
4942 //update_schema(s, timestamp, ntags, tags, false);
4943 //abort();
4944 return NULL;
4945 }
4946
4947 HsSqlSchema* e = new HsSqlSchema();
4948
4949 *e = *s; // make a copy of the schema
4950
4951 return e;
4952}
4953
4954int SqlHistoryBase::read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp)
4955{
4956 if (fDebug)
4957 printf("SqlHistory::read_schema: loading schema for event [%s] at time %s\n", event_name, TimeToString(timestamp).c_str());
4958
4959 int status;
4960
4961 if (sv->size() == 0) {
4963 if (status != HS_SUCCESS)
4964 return status;
4965 }
4966
4967 //sv->print(false);
4968
4969 if (event_name == NULL)
4970 return HS_SUCCESS;
4971
4972 for (size_t i=0; i<sv->size(); i++) {
4973 HsSqlSchema* h = (HsSqlSchema*)(*sv)[i];
4974 // skip schema with already read column names
4975 if (h->fVariables.size() > 0)
4976 continue;
4977 // skip schema with different name
4978 if (!MatchEventName(h->fEventName.c_str(), event_name))
4979 continue;
4980
4981 size_t nn = sv->size();
4982
4983 status = read_column_names(sv, h->fTableName.c_str(), h->fEventName.c_str());
4984
4985 // if new schema was added, loop all over again
4986 if (sv->size() != nn)
4987 i=0;
4988 }
4989
4990 //sv->print(false);
4991
4992 return HS_SUCCESS;
4993}
4994
4995int SqlHistoryBase::update_schema(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable)
4996{
4997 int status;
4998 bool have_transaction = false;
4999
5000 status = update_schema1(s, timestamp, ntags, tags, write_enable, &have_transaction);
5001
5002 if (have_transaction) {
5003 int xstatus;
5004
5005 if (status == HS_SUCCESS)
5007 else
5009
5010 if (xstatus != DB_SUCCESS) {
5011 return HS_FILE_ERROR;
5012 }
5013 have_transaction = false;
5014 }
5015
5016 return status;
5017}
5018
5019int SqlHistoryBase::update_schema1(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable, bool* have_transaction)
5020{
5021 int status;
5022
5023 if (fDebug)
5024 printf("update_schema1\n");
5025
5026 // check that compare schema with tags[]
5027
5028 bool schema_ok = true;
5029
5030 int offset = 0;
5031 for (int i=0; i<ntags; i++) {
5032 for (unsigned int j=0; j<tags[i].n_data; j++) {
5033 int tagtype = tags[i].type;
5034 std::string tagname = tags[i].name;
5035 std::string maybe_colname = MidasNameToSqlName(tags[i].name);
5036
5037 if (tags[i].n_data > 1) {
5038 char s[256];
5039 sprintf(s, "[%d]", j);
5040 tagname += s;
5041
5042 sprintf(s, "_%d", j);
5043 maybe_colname += s;
5044 }
5045
5046 int count = 0;
5047
5048 for (size_t j=0; j<s->fVariables.size(); j++) {
5049 // NB: inactive columns will be reactivated or recreated by the if(count==0) branch. K.O.
5050 if (s->fColumnInactive[j])
5051 continue;
5052 if (tagname == s->fVariables[j].name) {
5053 if (s->fSql->TypesCompatible(tagtype, s->fColumnTypes[j].c_str())) {
5054 if (count == 0) {
5055 s->fOffsets[j] = offset;
5057 }
5058 count++;
5059 if (count > 1) {
5060 cm_msg(MERROR, "SqlHistory::update_schema", "Duplicate SQL column \'%s\' type \'%s\' in table \"%s\" with MIDAS type \'%s\' history event \"%s\" tag \"%s\"", s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fTableName.c_str(), rpc_tid_name(tagtype), s->fEventName.c_str(), tagname.c_str());
5062 }
5063 } else {
5064 // column with incompatible type, mark it as unused
5065 schema_ok = false;
5066 if (fDebug)
5067 printf("Incompatible column!\n");
5068 if (write_enable) {
5069 cm_msg(MINFO, "SqlHistory::update_schema", "Deactivating SQL column \'%s\' type \'%s\' in table \"%s\" as incompatible with MIDAS type \'%s\' history event \"%s\" tag \"%s\"", s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fTableName.c_str(), rpc_tid_name(tagtype), s->fEventName.c_str(), tagname.c_str());
5071
5072 status = update_column(s->fEventName.c_str(), s->fTableName.c_str(), s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fVariables[j].tag_name.c_str(), s->fVariables[i].tag_type.c_str(), timestamp, false, have_transaction);
5073 if (status != HS_SUCCESS)
5074 return status;
5075 }
5076 }
5077 }
5078 }
5079
5080 if (count == 0) {
5081 // tag does not have a corresponding column
5082 schema_ok = false;
5083 if (fDebug)
5084 printf("No column for tag %s!\n", tagname.c_str());
5085
5086 bool found_column = false;
5087
5088 if (write_enable) {
5089 for (size_t j=0; j<s->fVariables.size(); j++) {
5090 if (tagname == s->fVariables[j].tag_name) {
5091 bool typeok = s->fSql->TypesCompatible(tagtype, s->fColumnTypes[j].c_str());
5092 if (typeok) {
5093 cm_msg(MINFO, "SqlHistory::update_schema", "Reactivating SQL column \'%s\' type \'%s\' in table \"%s\" for history event \"%s\" tag \"%s\"", s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fTableName.c_str(), s->fEventName.c_str(), tagname.c_str());
5095
5096 status = update_column(s->fEventName.c_str(), s->fTableName.c_str(), s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fVariables[j].tag_name.c_str(), s->fVariables[j].tag_type.c_str(), timestamp, true, have_transaction);
5097 if (status != HS_SUCCESS)
5098 return status;
5099
5100 if (count == 0) {
5101 s->fOffsets[j] = offset;
5103 }
5104 count++;
5105 found_column = true;
5106 if (count > 1) {
5107 cm_msg(MERROR, "SqlHistory::update_schema", "Duplicate SQL column \'%s\' type \'%s\' in table \"%s\" for history event \"%s\" tag \"%s\"", s->fColumnNames[j].c_str(), s->fColumnTypes[j].c_str(), s->fTableName.c_str(), s->fEventName.c_str(), tagname.c_str());
5109 }
5110 }
5111 }
5112 }
5113 }
5114
5115 // create column
5116 if (!found_column && write_enable) {
5117 std::string col_name = maybe_colname;
5118 const char* col_type = s->fSql->ColumnType(tagtype);
5119
5120 bool dupe = false;
5121 for (size_t kk=0; kk<s->fColumnNames.size(); kk++)
5122 if (s->fColumnNames[kk] == col_name) {
5123 dupe = true;
5124 break;
5125 }
5126
5127 time_t now = time(NULL);
5128
5129 bool retry = false;
5130 for (int t=0; t<20; t++) {
5131
5132 // if duplicate column name, change it, try again
5133 if (dupe || retry) {
5135 col_name += "_";
5137 if (t > 0) {
5138 char s[256];
5139 sprintf(s, "_%d", t);
5140 col_name += s;
5141 }
5142 }
5143
5144 if (fDebug)
5145 printf("SqlHistory::update_schema: table [%s], add column [%s] type [%s] for tag [%s]\n", s->fTableName.c_str(), col_name.c_str(), col_type, tagname.c_str());
5146
5148
5149 if (status == DB_KEY_EXIST) {
5150 if (fDebug)
5151 printf("SqlHistory::update_schema: table [%s], add column [%s] type [%s] for tag [%s] failed: duplicate column name\n", s->fTableName.c_str(), col_name.c_str(), col_type, tagname.c_str());
5152 retry = true;
5153 continue;
5154 }
5155
5156 if (status != HS_SUCCESS)
5157 return status;
5158
5159 break;
5160 }
5161
5162 if (status != HS_SUCCESS)
5163 return status;
5164
5165 status = update_column(s->fEventName.c_str(), s->fTableName.c_str(), col_name.c_str(), col_type, tagname.c_str(), rpc_tid_name(tagtype), timestamp, true, have_transaction);
5166 if (status != HS_SUCCESS)
5167 return status;
5168 }
5169 }
5170
5171 if (count > 1) {
5172 // schema has duplicate tags
5173 schema_ok = false;
5174 cm_msg(MERROR, "SqlHistory::update_schema", "Duplicate tags or SQL columns for history event \"%s\" tag \"%s\"", s->fEventName.c_str(), tagname.c_str());
5176 }
5177 }
5178 }
5179
5180 // mark as unused all columns not listed in tags
5181
5182 for (size_t k=0; k<s->fColumnNames.size(); k++)
5183 if (s->fVariables[k].name.length() > 0) {
5184 bool found = false;
5185
5186 for (int i=0; i<ntags; i++) {
5187 for (unsigned int j=0; j<tags[i].n_data; j++) {
5188 std::string tagname = tags[i].name;
5189
5190 if (tags[i].n_data > 1) {
5191 char s[256];
5192 sprintf(s, "[%d]", j);
5193 tagname += s;
5194 }
5195
5196 if (s->fVariables[k].name == tagname) {
5197 found = true;
5198 break;
5199 }
5200 }
5201
5202 if (found)
5203 break;
5204 }
5205
5206 if (!found) {
5207 // column not found in tags list
5208 schema_ok = false;
5209 if (fDebug)
5210 printf("Event [%s] Column [%s] tag [%s] not listed in tags list!\n", s->fEventName.c_str(), s->fColumnNames[k].c_str(), s->fVariables[k].name.c_str());
5211 if (write_enable) {
5212 cm_msg(MINFO, "SqlHistory::update_schema", "Deactivating SQL column \'%s\' type \'%s\' in table \"%s\" for history event \"%s\" not used for any tags", s->fColumnNames[k].c_str(), s->fColumnTypes[k].c_str(), s->fTableName.c_str(), s->fEventName.c_str());
5214
5215 status = update_column(s->fEventName.c_str(), s->fTableName.c_str(), s->fColumnNames[k].c_str(), s->fColumnTypes[k].c_str(), s->fVariables[k].tag_name.c_str(), s->fVariables[k].tag_type.c_str(), timestamp, false, have_transaction);
5216 if (status != HS_SUCCESS)
5217 return status;
5218 }
5219 }
5220 }
5221
5222 if (!write_enable)
5223 if (!schema_ok) {
5224 if (fDebug)
5225 printf("Return error!\n");
5226 return HS_FILE_ERROR;
5227 }
5228
5229 return HS_SUCCESS;
5230}
5231
5233// SQLITE functions //
5235
5236static int ReadSqliteTableNames(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug)
5237{
5238 if (debug)
5239 printf("ReadSqliteTableNames: table [%s]\n", table_name);
5240
5241 int status;
5242 std::string cmd;
5243
5244 // FIXME: quotes
5245 cmd = "SELECT event_name, _i_time FROM \'_event_name_";
5246 cmd += table_name;
5247 cmd += "\' WHERE table_name='";
5248 cmd += table_name;
5249 cmd += "';";
5250
5251 status = sql->Prepare(table_name, cmd.c_str());
5252
5253 if (status != DB_SUCCESS)
5254 return status;
5255
5256 while (1) {
5257 status = sql->Step();
5258
5259 if (status != DB_SUCCESS)
5260 break;
5261
5262 std::string xevent_name = sql->GetText(0);
5263 time_t xevent_time = sql->GetTime(1);
5264
5265 //printf("read event name [%s] time %s\n", xevent_name.c_str(), TimeToString(xevent_time).c_str());
5266
5267 HsSqlSchema* s = new HsSqlSchema;
5268 s->fSql = sql;
5271 s->fTimeTo = 0;
5272 s->fTableName = table_name;
5273 sv->add(s);
5274 }
5275
5276 status = sql->Finalize();
5277
5278 return HS_SUCCESS;
5279}
5280
5281static int ReadSqliteTableSchema(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug)
5282{
5283 if (debug)
5284 printf("ReadSqliteTableSchema: table [%s]\n", table_name);
5285
5286 if (1) {
5287 // seed schema with table names
5288 HsSqlSchema* s = new HsSqlSchema;
5289 s->fSql = sql;
5290 s->fEventName = table_name;
5291 s->fTimeFrom = 0;
5292 s->fTimeTo = 0;
5293 s->fTableName = table_name;
5294 sv->add(s);
5295 }
5296
5297 return ReadSqliteTableNames(sql, sv, table_name, debug);
5298}
5299
5301// SQLITE history classes //
5303
5305{
5306public:
5307 SqliteHistory() { // ctor
5308#ifdef HAVE_SQLITE
5309 fSql = new Sqlite();
5310#endif
5311 }
5312
5314 int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name);
5315 int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp);
5316 int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction);
5317};
5318
5320{
5321 int status;
5322
5323 if (fDebug)
5324 printf("SqliteHistory::read_table_and_event_names!\n");
5325
5326 // loop over all tables
5327
5328 std::vector<std::string> tables;
5330 if (status != DB_SUCCESS)
5331 return status;
5332
5333 for (size_t i=0; i<tables.size(); i++) {
5334 const char* table_name = tables[i].c_str();
5335
5336 const char* s;
5337 s = strstr(table_name, "_event_name_");
5338 if (s == table_name)
5339 continue;
5340 s = strstr(table_name, "_column_names_");
5341 if (s == table_name)
5342 continue;
5343
5344 status = ReadSqliteTableSchema(fSql, sv, table_name, fDebug);
5345 }
5346
5347 return HS_SUCCESS;
5348}
5349
5350int SqliteHistory::read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name)
5351{
5352 if (fDebug)
5353 printf("SqliteHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
5354
5355 // for all schema for table_name, prepopulate is with column names
5356
5357 std::vector<std::string> columns;
5358 fSql->ListColumns(table_name, &columns);
5359
5360 // first, populate column names
5361
5362 for (size_t i=0; i<sv->size(); i++) {
5363 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5364
5365 if (s->fTableName != table_name)
5366 continue;
5367
5368 // schema should be empty at this point
5369 //assert(s->fVariables.size() == 0);
5370
5371 for (size_t j=0; j<columns.size(); j+=2) {
5372 const char* cn = columns[j+0].c_str();
5373 const char* ct = columns[j+1].c_str();
5374
5375 if (strcmp(cn, "_t_time") == 0)
5376 continue;
5377 if (strcmp(cn, "_i_time") == 0)
5378 continue;
5379
5380 bool found = false;
5381
5382 for (size_t k=0; k<s->fColumnNames.size(); k++) {
5383 if (s->fColumnNames[k] == cn) {
5384 found = true;
5385 break;
5386 }
5387 }
5388
5389 //printf("column [%s] sql type [%s]\n", cn.c_str(), ct);
5390
5391 if (!found) {
5393 se.name = cn;
5394 se.type = 0;
5395 se.n_data = 1;
5396 se.n_bytes = 0;
5397 s->fVariables.push_back(se);
5398 s->fColumnNames.push_back(cn);
5399 s->fColumnTypes.push_back(ct);
5400 s->fColumnInactive.push_back(false);
5401 s->fOffsets.push_back(-1);
5402 }
5403 }
5404 }
5405
5406 // then read column name information
5407
5408 std::string tn;
5409 tn += "_column_names_";
5410 tn += table_name;
5411
5412 std::string cmd;
5413 cmd = "SELECT column_name, tag_name, tag_type, _i_time FROM ";
5414 cmd += fSql->QuoteId(tn.c_str());
5415 cmd += " WHERE table_name=";
5416 cmd += fSql->QuoteString(table_name);
5417 cmd += " ORDER BY _i_time ASC;";
5418
5419 int status = fSql->Prepare(table_name, cmd.c_str());
5420
5421 if (status != DB_SUCCESS) {
5422 return status;
5423 }
5424
5425 while (1) {
5426 status = fSql->Step();
5427
5428 if (status != DB_SUCCESS)
5429 break;
5430
5431 // NOTE: SQL "SELECT ORDER BY _i_time ASC" returns data sorted by time
5432 // in this code we use the data from the last data row
5433 // so if multiple rows are present, the latest one is used
5434
5435 std::string col_name = fSql->GetText(0);
5436 std::string tag_name = fSql->GetText(1);
5437 std::string tag_type = fSql->GetText(2);
5439
5440 //printf("read table [%s] column [%s] tag name [%s] time %s\n", table_name, col_name.c_str(), tag_name.c_str(), TimeToString(xxx_time).c_str());
5441
5442 // make sure a schema exists at this time point
5443 NewSqlSchema(sv, table_name, schema_time);
5444
5445 // add this information to all schema
5446
5447 for (size_t i=0; i<sv->size(); i++) {
5448 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5449 if (s->fTableName != table_name)
5450 continue;
5451 if (s->fTimeFrom < schema_time)
5452 continue;
5453
5454 //printf("add column to schema %d\n", s->fTimeFrom);
5455
5456 for (size_t j=0; j<s->fColumnNames.size(); j++) {
5457 if (col_name != s->fColumnNames[j])
5458 continue;
5459 s->fVariables[j].name = tag_name;
5460 s->fVariables[j].type = rpc_name_tid(tag_type.c_str());
5461 s->fVariables[j].n_data = 1;
5462 s->fVariables[j].n_bytes = rpc_tid_size(s->fVariables[j].type);
5463 }
5464 }
5465 }
5466
5467 status = fSql->Finalize();
5468
5469 return HS_SUCCESS;
5470}
5471
5472int SqliteHistory::create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp)
5473{
5474 if (fDebug)
5475 printf("SqliteHistory::create_table: event [%s], timestamp %s\n", event_name, TimeToString(timestamp).c_str());
5476
5477 int status;
5478 bool have_transaction = false;
5479 std::string table_name = MidasNameToSqlName(event_name);
5480
5481 // FIXME: what about duplicate table names?
5482 status = CreateSqlTable(fSql, table_name.c_str(), &have_transaction);
5483
5484 //if (status == DB_KEY_EXIST) {
5485 // return ReadSqliteTableSchema(fSql, sv, table_name.c_str(), fDebug);
5486 //}
5487
5488 if (status != HS_SUCCESS) {
5489 // FIXME: ???
5490 // FIXME: at least close or revert the transaction
5491 return status;
5492 }
5493
5494 std::string cmd;
5495
5496 std::string en;
5497 en += "_event_name_";
5498 en += table_name;
5499
5500 cmd = "CREATE TABLE ";
5501 cmd += fSql->QuoteId(en.c_str());
5502 cmd += " (table_name TEXT NOT NULL, event_name TEXT NOT NULL, _i_time INTEGER NOT NULL);";
5503
5504 status = fSql->Exec(table_name.c_str(), cmd.c_str());
5505
5506 cmd = "INSERT INTO ";
5507 cmd += fSql->QuoteId(en.c_str());
5508 cmd += " (table_name, event_name, _i_time) VALUES (";
5509 cmd += fSql->QuoteString(table_name.c_str());
5510 cmd += ", ";
5511 cmd += fSql->QuoteString(event_name);
5512 cmd += ", ";
5513 cmd += fSql->QuoteString(TimeToString(timestamp).c_str());
5514 cmd += ");";
5515
5516 status = fSql->Exec(table_name.c_str(), cmd.c_str());
5517
5518 std::string cn;
5519 cn += "_column_names_";
5520 cn += table_name;
5521
5522 cmd = "CREATE TABLE ";
5523 cmd += fSql->QuoteId(cn.c_str());
5524 cmd += " (table_name TEXT NOT NULL, column_name TEXT NOT NULL, tag_name TEXT NOT NULL, tag_type TEXT NOT NULL, column_type TEXT NOT NULL, _i_time INTEGER NOT NULL);";
5525
5526 status = fSql->Exec(table_name.c_str(), cmd.c_str());
5527
5528 status = fSql->CommitTransaction(table_name.c_str());
5529 if (status != DB_SUCCESS) {
5530 return HS_FILE_ERROR;
5531 }
5532
5533 return ReadSqliteTableSchema(fSql, sv, table_name.c_str(), fDebug);
5534}
5535
5536int SqliteHistory::update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction)
5537{
5538 if (fDebug)
5539 printf("SqliteHistory::update_column: event [%s], table [%s], column [%s], new name [%s], timestamp %s\n", event_name, table_name, column_name, tag_name, TimeToString(timestamp).c_str());
5540
5541 int status = StartSqlTransaction(fSql, table_name, have_transaction);
5542 if (status != HS_SUCCESS)
5543 return status;
5544
5545 // FIXME: quotes
5546 std::string cmd;
5547 cmd = "INSERT INTO \'_column_names_";
5548 cmd += table_name;
5549 cmd += "\' (table_name, column_name, tag_name, tag_type, column_type, _i_time) VALUES (\'";
5550 cmd += table_name;
5551 cmd += "\', \'";
5552 cmd += column_name;
5553 cmd += "\', \'";
5554 cmd += tag_name;
5555 cmd += "\', \'";
5556 cmd += tag_type;
5557 cmd += "\', \'";
5558 cmd += column_type;
5559 cmd += "\', \'";
5560 cmd += TimeToString(timestamp);
5561 cmd += "\');";
5562 status = fSql->Exec(table_name, cmd.c_str());
5563
5564 return status;
5565}
5566
5568// Mysql history classes //
5570
5572{
5573public:
5574 MysqlHistory() { // ctor
5575#ifdef HAVE_MYSQL
5576 fSql = new Mysql();
5577#endif
5578 }
5579
5581 int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name);
5582 int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp);
5583 int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction);
5584};
5585
5586static int ReadMysqlTableNames(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug, const char* must_have_event_name, const char* must_have_table_name)
5587{
5588 if (debug)
5589 printf("ReadMysqlTableNames: table [%s], must have event [%s] table [%s]\n", table_name, must_have_event_name, must_have_table_name);
5590
5591 int status;
5592 std::string cmd;
5593
5594 if (table_name) {
5595 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name='";
5596 cmd += table_name;
5597 cmd += "';";
5598 } else {
5599 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name!='';";
5600 table_name = "_history_index";
5601 }
5602
5603 status = sql->Prepare(table_name, cmd.c_str());
5604
5605 if (status != DB_SUCCESS)
5606 return status;
5607
5608 bool found_must_have_table = false;
5609 int count = 0;
5610
5611 while (1) {
5612 status = sql->Step();
5613
5614 if (status != DB_SUCCESS)
5615 break;
5616
5617 const char* xevent_name = sql->GetText(0);
5618 const char* xtable_name = sql->GetText(1);
5619 time_t xevent_time = sql->GetTime(2);
5620
5621 if (debug == 999) {
5622 printf("entry %d event name [%s] table name [%s] time %s\n", count, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
5623 }
5624
5626 assert(must_have_event_name != NULL);
5628 found_must_have_table = true;
5629 //printf("Found table [%s]: event name [%s] table name [%s] time %s\n", must_have_table_name, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
5630 } else {
5631 //printf("Found correct table [%s] with wrong event name [%s] expected [%s] time %s\n", must_have_table_name, xevent_name, must_have_event_name, TimeToString(xevent_time).c_str());
5632 }
5633 }
5634
5635 HsSqlSchema* s = new HsSqlSchema;
5636 s->fSql = sql;
5639 s->fTimeTo = 0;
5641 sv->add(s);
5642 count++;
5643 }
5644
5645 status = sql->Finalize();
5646
5648 cm_msg(MERROR, "ReadMysqlTableNames", "Error: Table [%s] for event [%s] missing from the history index\n", must_have_table_name, must_have_event_name);
5649 if (debug == 999)
5650 return HS_FILE_ERROR;
5651 // NB: recursion is broken by setting debug to 999.
5653 cm_msg(MERROR, "ReadMysqlTableNames", "Error: Cannot continue, nothing will work after this error\n");
5655 abort();
5656 return HS_FILE_ERROR;
5657 }
5658
5659 if (0) {
5660 // print accumulated schema
5661 printf("ReadMysqlTableNames: table_name [%s] event_name [%s] table_name [%s]\n", table_name, must_have_event_name, must_have_table_name);
5662 sv->print(false);
5663 }
5664
5665 return HS_SUCCESS;
5666}
5667
5668int MysqlHistory::read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name)
5669{
5670 if (fDebug)
5671 printf("MysqlHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
5672
5673 // for all schema for table_name, prepopulate is with column names
5674
5675 std::vector<std::string> columns;
5676 fSql->ListColumns(table_name, &columns);
5677
5678 // first, populate column names
5679
5680 for (size_t i=0; i<sv->size(); i++) {
5681 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5682
5683 if (s->fTableName != table_name)
5684 continue;
5685
5686 // schema should be empty at this point
5687 //assert(s->fVariables.size() == 0);
5688
5689 for (size_t j=0; j<columns.size(); j+=2) {
5690 const char* cn = columns[j+0].c_str();
5691 const char* ct = columns[j+1].c_str();
5692
5693 if (strcmp(cn, "_t_time") == 0)
5694 continue;
5695 if (strcmp(cn, "_i_time") == 0)
5696 continue;
5697
5698 bool found = false;
5699
5700 for (size_t k=0; k<s->fColumnNames.size(); k++) {
5701 if (s->fColumnNames[k] == cn) {
5702 found = true;
5703 break;
5704 }
5705 }
5706
5707 //printf("column [%s] sql type [%s]\n", cn.c_str(), ct);
5708
5709 if (!found) {
5711 se.tag_name = cn;
5712 se.tag_type = "";
5713 se.name = cn;
5714 se.type = 0;
5715 se.n_data = 1;
5716 se.n_bytes = 0;
5717 s->fVariables.push_back(se);
5718 s->fColumnNames.push_back(cn);
5719 s->fColumnTypes.push_back(ct);
5720 s->fColumnInactive.push_back(false);
5721 s->fOffsets.push_back(-1);
5722 }
5723 }
5724 }
5725
5726 // then read column name information
5727
5728 std::string cmd;
5729 cmd = "SELECT column_name, column_type, tag_name, tag_type, itimestamp, active FROM _history_index WHERE event_name=";
5730 cmd += fSql->QuoteString(event_name);
5731 cmd += ";";
5732
5733 int status = fSql->Prepare(table_name, cmd.c_str());
5734
5735 if (status != DB_SUCCESS) {
5736 return status;
5737 }
5738
5739 while (1) {
5740 status = fSql->Step();
5741
5742 if (status != DB_SUCCESS)
5743 break;
5744
5745 const char* col_name = fSql->GetText(0);
5746 const char* col_type = fSql->GetText(1);
5747 const char* tag_name = fSql->GetText(2);
5748 const char* tag_type = fSql->GetText(3);
5750 const char* active = fSql->GetText(5);
5751 int iactive = atoi(active);
5752
5753 //printf("read table [%s] column [%s] type [%s] tag name [%s] type [%s] time %s active [%s] %d\n", table_name, col_name, col_type, tag_name, tag_type, TimeToString(schema_time).c_str(), active, iactive);
5754
5755 if (!col_name)
5756 continue;
5757 if (!tag_name)
5758 continue;
5759 if (strlen(col_name) < 1)
5760 continue;
5761
5762 // make sure a schema exists at this time point
5763 NewSqlSchema(sv, table_name, schema_time);
5764
5765 // add this information to all schema
5766
5767 for (size_t i=0; i<sv->size(); i++) {
5768 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5769 if (s->fTableName != table_name)
5770 continue;
5771 if (s->fTimeFrom < schema_time)
5772 continue;
5773
5774 int tid = rpc_name_tid(tag_type);
5775 int tid_size = rpc_tid_size(tid);
5776
5777 for (size_t j=0; j<s->fColumnNames.size(); j++) {
5778 if (col_name != s->fColumnNames[j])
5779 continue;
5780
5781 s->fVariables[j].tag_name = tag_name;
5782 s->fVariables[j].tag_type = tag_type;
5783 if (!iactive) {
5784 s->fVariables[j].name = "";
5785 s->fColumnInactive[j] = true;
5786 } else {
5787 s->fVariables[j].name = tag_name;
5788 s->fColumnInactive[j] = false;
5789 }
5790 s->fVariables[j].type = tid;
5791 s->fVariables[j].n_data = 1;
5792 s->fVariables[j].n_bytes = tid_size;
5793
5794 // doctor column names in case MySQL returns different type
5795 // from the type used to create the column, but the types
5796 // are actually the same. K.O.
5798 }
5799 }
5800 }
5801
5802 status = fSql->Finalize();
5803
5804 return HS_SUCCESS;
5805}
5806
5807#if 0
5808static int ReadMysqlTableSchema(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug)
5809{
5810 if (debug)
5811 printf("ReadMysqlTableSchema: table [%s]\n", table_name);
5812
5813 if (1) {
5814 // seed schema with table names
5815 HsSqlSchema* s = new HsSqlSchema;
5816 s->fSql = sql;
5817 s->fEventName = table_name;
5818 s->fTimeFrom = 0;
5819 s->fTimeTo = 0;
5820 s->fTableName = table_name;
5821 sv->add(s);
5822 }
5823
5824 return ReadMysqlTableNames(sql, sv, table_name, debug, NULL, NULL);
5825}
5826#endif
5827
5829{
5830 int status;
5831
5832 if (fDebug)
5833 printf("MysqlHistory::read_table_and_event_names!\n");
5834
5835 // loop over all tables
5836
5837 std::vector<std::string> tables;
5839 if (status != DB_SUCCESS)
5840 return status;
5841
5842 for (size_t i=0; i<tables.size(); i++) {
5843 const char* table_name = tables[i].c_str();
5844
5845 const char* s;
5846 s = strstr(table_name, "_history_index");
5847 if (s == table_name)
5848 continue;
5849
5850 if (1) {
5851 // seed schema with table names
5852 HsSqlSchema* s = new HsSqlSchema;
5853 s->fSql = fSql;
5854 s->fEventName = table_name;
5855 s->fTimeFrom = 0;
5856 s->fTimeTo = 0;
5857 s->fTableName = table_name;
5858 sv->add(s);
5859 }
5860 }
5861
5862 if (0) {
5863 // print accumulated schema
5864 printf("read_table_and_event_names:\n");
5865 sv->print(false);
5866 }
5867
5869
5870 return HS_SUCCESS;
5871}
5872
5873int MysqlHistory::create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp)
5874{
5875 if (fDebug)
5876 printf("MysqlHistory::create_table: event [%s], timestamp %s\n", event_name, TimeToString(timestamp).c_str());
5877
5878 int status;
5879 std::string table_name = MidasNameToSqlName(event_name);
5880
5881 // MySQL table name length limit is 64 bytes
5882 if (table_name.length() > 40) {
5883 table_name.resize(40);
5884 table_name += "_T";
5885 }
5886
5887 time_t now = time(NULL);
5888
5889 int max_attempts = 10;
5890 for (int i=0; i<max_attempts; i++) {
5891 status = fSql->OpenTransaction(table_name.c_str());
5892 if (status != DB_SUCCESS) {
5893 return HS_FILE_ERROR;
5894 }
5895
5896 bool have_transaction = true;
5897
5898 std::string xtable_name = table_name;
5899
5900 if (i>0) {
5901 xtable_name += "_";
5903 if (i>1) {
5904 xtable_name += "_";
5905 char buf[256];
5906 sprintf(buf, "%d", i);
5907 xtable_name += buf;
5908 }
5909 }
5910
5912
5913 //printf("event [%s] create table [%s] status %d\n", event_name, xtable_name.c_str(), status);
5914
5915 if (status == DB_KEY_EXIST) {
5916 // already exists, try with different name!
5917 fSql->RollbackTransaction(table_name.c_str());
5918 continue;
5919 }
5920
5921 if (status != HS_SUCCESS) {
5922 // MYSQL cannot roll back "create table", if we cannot create SQL tables, nothing will work. Give up now.
5923 cm_msg(MERROR, "MysqlHistory::create_table", "Could not create table [%s] for event [%s], timestamp %s, please fix the SQL database configuration and try again", table_name.c_str(), event_name, TimeToString(timestamp).c_str());
5924 abort();
5925
5926 // fatal error, give up!
5927 fSql->RollbackTransaction(table_name.c_str());
5928 break;
5929 }
5930
5931 for (int j=0; j<2; j++) {
5932 std::string cmd;
5933 cmd += "INSERT INTO _history_index (event_name, table_name, itimestamp, active) VALUES (";
5934 cmd += fSql->QuoteString(event_name);
5935 cmd += ", ";
5936 cmd += fSql->QuoteString(xtable_name.c_str());
5937 cmd += ", ";
5938 char buf[256];
5939 sprintf(buf, "%.0f", (double)timestamp);
5940 cmd += fSql->QuoteString(buf);
5941 cmd += ", ";
5942 cmd += fSql->QuoteString("1");
5943 cmd += ");";
5944
5945 int status = fSql->Exec(table_name.c_str(), cmd.c_str());
5946 if (status == DB_SUCCESS)
5947 break;
5948
5949 status = CreateSqlTable(fSql, "_history_index", &have_transaction);
5950 status = CreateSqlColumn(fSql, "_history_index", "event_name", "varchar(256) character set binary not null", &have_transaction, fDebug);
5951 status = CreateSqlColumn(fSql, "_history_index", "table_name", "varchar(256)", &have_transaction, fDebug);
5952 status = CreateSqlColumn(fSql, "_history_index", "tag_name", "varchar(256) character set binary", &have_transaction, fDebug);
5953 status = CreateSqlColumn(fSql, "_history_index", "tag_type", "varchar(256)", &have_transaction, fDebug);
5954 status = CreateSqlColumn(fSql, "_history_index", "column_name", "varchar(256)", &have_transaction, fDebug);
5955 status = CreateSqlColumn(fSql, "_history_index", "column_type", "varchar(256)", &have_transaction, fDebug);
5956 status = CreateSqlColumn(fSql, "_history_index", "itimestamp", "integer not null", &have_transaction, fDebug);
5957 status = CreateSqlColumn(fSql, "_history_index", "active", "boolean", &have_transaction, fDebug);
5958 }
5959
5960 status = fSql->CommitTransaction(table_name.c_str());
5961
5962 if (status != DB_SUCCESS) {
5963 return HS_FILE_ERROR;
5964 }
5965
5966 return ReadMysqlTableNames(fSql, sv, xtable_name.c_str(), fDebug, event_name, xtable_name.c_str());
5967 }
5968
5969 cm_msg(MERROR, "MysqlHistory::create_table", "Could not create table [%s] for event [%s], timestamp %s, after %d attempts", table_name.c_str(), event_name, TimeToString(timestamp).c_str(), max_attempts);
5970
5971 return HS_FILE_ERROR;
5972}
5973
5974int MysqlHistory::update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction)
5975{
5976 if (fDebug)
5977 printf("MysqlHistory::update_column: event [%s], table [%s], column [%s], type [%s] new name [%s], timestamp %s\n", event_name, table_name, column_name, column_type, tag_name, TimeToString(timestamp).c_str());
5978
5979 std::string cmd;
5980 cmd += "INSERT INTO _history_index (event_name, table_name, tag_name, tag_type, column_name, column_type, itimestamp, active) VALUES (";
5981 cmd += fSql->QuoteString(event_name);
5982 cmd += ", ";
5983 cmd += fSql->QuoteString(table_name);
5984 cmd += ", ";
5985 cmd += fSql->QuoteString(tag_name);
5986 cmd += ", ";
5987 cmd += fSql->QuoteString(tag_type);
5988 cmd += ", ";
5989 cmd += fSql->QuoteString(column_name);
5990 cmd += ", ";
5991 cmd += fSql->QuoteString(column_type);
5992 cmd += ", ";
5993 char buf[256];
5994 sprintf(buf, "%.0f", (double)timestamp);
5995 cmd += fSql->QuoteString(buf);
5996 cmd += ", ";
5997 if (active)
5998 cmd += fSql->QuoteString("1");
5999 else
6000 cmd += fSql->QuoteString("0");
6001 cmd += ");";
6002
6003 int status = fSql->Exec(table_name, cmd.c_str());
6004 if (status != DB_SUCCESS)
6005 return HS_FILE_ERROR;
6006
6007 return HS_SUCCESS;
6008}
6009
6011// PostgreSQL history classes //
6013
6014#ifdef HAVE_PGSQL
6015
6016class PgsqlHistory: public SqlHistoryBase
6017{
6018public:
6019 Pgsql *fPgsql = NULL;
6020public:
6021 PgsqlHistory() { // ctor
6022 fPgsql = new Pgsql();
6023 fSql = fPgsql;
6024 }
6025
6027 int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name);
6028 int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp);
6029 int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction);
6030};
6031
6032static int ReadPgsqlTableNames(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug, const char* must_have_event_name, const char* must_have_table_name)
6033{
6034 if (debug)
6035 printf("ReadPgsqlTableNames: table [%s], must have event [%s] table [%s]\n", table_name, must_have_event_name, must_have_table_name);
6036
6037 int status;
6038 std::string cmd;
6039
6040 if (table_name) {
6041 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name='";
6042 cmd += table_name;
6043 cmd += "';";
6044 } else {
6045 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name!='';";
6046 table_name = "_history_index";
6047 }
6048
6049 status = sql->Prepare(table_name, cmd.c_str());
6050
6051 if (status != DB_SUCCESS)
6052 return status;
6053
6054 bool found_must_have_table = false;
6055 int count = 0;
6056
6057 while (1) {
6058 status = sql->Step();
6059
6060 if (status != DB_SUCCESS)
6061 break;
6062
6063 const char* xevent_name = sql->GetText(0);
6064 const char* xtable_name = sql->GetText(1);
6065 time_t xevent_time = sql->GetTime(2);
6066
6067 if (debug == 999) {
6068 printf("entry %d event name [%s] table name [%s] time %s\n", count, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
6069 }
6070
6072 assert(must_have_event_name != NULL);
6074 found_must_have_table = true;
6075 //printf("Found table [%s]: event name [%s] table name [%s] time %s\n", must_have_table_name, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
6076 } else {
6077 //printf("Found correct table [%s] with wrong event name [%s] expected [%s] time %s\n", must_have_table_name, xevent_name, must_have_event_name, TimeToString(xevent_time).c_str());
6078 }
6079 }
6080
6081 HsSqlSchema* s = new HsSqlSchema;
6082 s->fSql = sql;
6085 s->fTimeTo = 0;
6087 sv->add(s);
6088 count++;
6089 }
6090
6091 status = sql->Finalize();
6092
6094 cm_msg(MERROR, "ReadPgsqlTableNames", "Error: Table [%s] for event [%s] missing from the history index\n", must_have_table_name, must_have_event_name);
6095 if (debug == 999)
6096 return HS_FILE_ERROR;
6097 // NB: recursion is broken by setting debug to 999.
6099 cm_msg(MERROR, "ReadPgsqlTableNames", "Error: Cannot continue, nothing will work after this error\n");
6101 abort();
6102 return HS_FILE_ERROR;
6103 }
6104
6105 if (0) {
6106 // print accumulated schema
6107 printf("ReadPgsqlTableNames: table_name [%s] event_name [%s] table_name [%s]\n", table_name, must_have_event_name, must_have_table_name);
6108 sv->print(false);
6109 }
6110
6111 return HS_SUCCESS;
6112}
6113
6114int PgsqlHistory::read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name)
6115{
6116 if (fDebug)
6117 printf("PgsqlHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
6118
6119 // for all schema for table_name, prepopulate is with column names
6120
6121 std::vector<std::string> columns;
6122 fSql->ListColumns(table_name, &columns);
6123
6124 // first, populate column names
6125
6126 for (size_t i=0; i<sv->size(); i++) {
6127 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
6128
6129 if (s->fTableName != table_name)
6130 continue;
6131
6132 // schema should be empty at this point
6133 //assert(s->fVariables.size() == 0);
6134
6135 for (size_t j=0; j<columns.size(); j+=2) {
6136 const char* cn = columns[j+0].c_str();
6137 const char* ct = columns[j+1].c_str();
6138
6139 if (strcmp(cn, "_t_time") == 0)
6140 continue;
6141 if (strcmp(cn, "_i_time") == 0)
6142 continue;
6143
6144 bool found = false;
6145
6146 for (size_t k=0; k<s->fColumnNames.size(); k++) {
6147 if (s->fColumnNames[k] == cn) {
6148 found = true;
6149 break;
6150 }
6151 }
6152
6153 if (!found) {
6155 se.tag_name = cn;
6156 se.tag_type = "";
6157 se.name = cn;
6158 se.type = 0;
6159 se.n_data = 1;
6160 se.n_bytes = 0;
6161 s->fVariables.push_back(se);
6162 s->fColumnNames.push_back(cn);
6163 s->fColumnTypes.push_back(ct);
6164 s->fColumnInactive.push_back(false);
6165 s->fOffsets.push_back(-1);
6166 }
6167 }
6168 }
6169
6170 // then read column name information
6171
6172 std::string cmd;
6173 cmd = "SELECT column_name, column_type, tag_name, tag_type, itimestamp, active FROM _history_index WHERE event_name=";
6174 cmd += fSql->QuoteString(event_name);
6175 cmd += ";";
6176
6177 int status = fSql->Prepare(table_name, cmd.c_str());
6178
6179 if (status != DB_SUCCESS) {
6180 return status;
6181 }
6182
6183 while (1) {
6184 status = fSql->Step();
6185
6186 if (status != DB_SUCCESS)
6187 break;
6188
6189 const char* col_name = fSql->GetText(0);
6190 const char* col_type = fSql->GetText(1);
6191 const char* tag_name = fSql->GetText(2);
6192 const char* tag_type = fSql->GetText(3);
6193 time_t schema_time = fSql->GetTime(4);
6194 const char* active = fSql->GetText(5);
6195 int iactive = atoi(active);
6196
6197 //printf("read table [%s] column [%s] type [%s] tag name [%s] type [%s] time %s active [%s] %d\n", table_name, col_name, col_type, tag_name, tag_type, TimeToString(schema_time).c_str(), active, iactive);
6198
6199 if (!col_name)
6200 continue;
6201 if (!tag_name)
6202 continue;
6203 if (strlen(col_name) < 1)
6204 continue;
6205
6206 // make sure a schema exists at this time point
6207 NewSqlSchema(sv, table_name, schema_time);
6208
6209 // add this information to all schema
6210 for (size_t i=0; i<sv->size(); i++) {
6211 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
6212 if (s->fTableName != table_name)
6213 continue;
6214 if (s->fTimeFrom < schema_time)
6215 continue;
6216
6217 int tid = rpc_name_tid(tag_type);
6218 int tid_size = rpc_tid_size(tid);
6219
6220 for (size_t j=0; j<s->fColumnNames.size(); j++) {
6221 if (col_name != s->fColumnNames[j])
6222 continue;
6223
6224 s->fVariables[j].tag_name = tag_name;
6225 s->fVariables[j].tag_type = tag_type;
6226 if (!iactive) {
6227 s->fVariables[j].name = "";
6228 s->fColumnInactive[j] = true;
6229 } else {
6230 s->fVariables[j].name = tag_name;
6231 s->fColumnInactive[j] = false;
6232 }
6233 s->fVariables[j].type = tid;
6234 s->fVariables[j].n_data = 1;
6235 s->fVariables[j].n_bytes = tid_size;
6236
6237 // doctor column names in case MySQL returns different type
6238 // from the type used to create the column, but the types
6239 // are actually the same. K.O.
6241 }
6242 }
6243 }
6244
6245 status = fSql->Finalize();
6246
6247 return HS_SUCCESS;
6248}
6249
6250int PgsqlHistory::read_table_and_event_names(HsSchemaVector *sv)
6251{
6252 int status;
6253
6254 if (fDebug)
6255 printf("PgsqlHistory::read_table_and_event_names!\n");
6256
6257 // loop over all tables
6258
6259 std::vector<std::string> tables;
6260 status = fSql->ListTables(&tables);
6261 if (status != DB_SUCCESS)
6262 return status;
6263
6264 for (size_t i=0; i<tables.size(); i++) {
6265 const char* table_name = tables[i].c_str();
6266
6267 const char* s;
6268 s = strstr(table_name, "_history_index");
6269 if (s == table_name)
6270 continue;
6271
6272 if (1) {
6273 // seed schema with table names
6274 HsSqlSchema* s = new HsSqlSchema;
6275 s->fSql = fSql;
6276 s->fEventName = table_name;
6277 s->fTimeFrom = 0;
6278 s->fTimeTo = 0;
6279 s->fTableName = table_name;
6280 sv->add(s);
6281 }
6282 }
6283
6284 if (0) {
6285 // print accumulated schema
6286 printf("read_table_and_event_names:\n");
6287 sv->print(false);
6288 }
6289
6290 status = ReadPgsqlTableNames(fSql, sv, NULL, fDebug, NULL, NULL);
6291
6292 return HS_SUCCESS;
6293}
6294
6295int PgsqlHistory::create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp)
6296{
6297 if (fDebug)
6298 printf("PgsqlHistory::create_table: event [%s], timestamp %s\n", event_name, TimeToString(timestamp).c_str());
6299
6300 int status;
6301 std::string table_name = MidasNameToSqlName(event_name);
6302
6303 // PostgreSQL table name length limit is 64 bytes
6304 if (table_name.length() > 40) {
6305 table_name.resize(40);
6306 table_name += "_T";
6307 }
6308
6309 time_t now = time(NULL);
6310
6311 int max_attempts = 10;
6312 for (int i=0; i<max_attempts; i++) {
6313 status = fSql->OpenTransaction(table_name.c_str());
6314 if (status != DB_SUCCESS) {
6315 return HS_FILE_ERROR;
6316 }
6317
6318 bool have_transaction = true;
6319
6320 std::string xtable_name = table_name;
6321
6322 if (i>0) {
6323 xtable_name += "_";
6325 if (i>1) {
6326 xtable_name += "_";
6327 char buf[256];
6328 sprintf(buf, "%d", i);
6329 xtable_name += buf;
6330 }
6331 }
6332
6333 if (fPgsql->fDownsample)
6335 else
6337
6338 //printf("event [%s] create table [%s] status %d\n", event_name, xtable_name.c_str(), status);
6339
6340 if (status == DB_KEY_EXIST) {
6341 // already exists, try with different name!
6342 fSql->RollbackTransaction(table_name.c_str());
6343 continue;
6344 }
6345
6346 if (status != HS_SUCCESS) {
6347 fSql->RollbackTransaction(table_name.c_str());
6348 continue;
6349 }
6350
6351 fSql->Exec(table_name.c_str(), "SAVEPOINT t0");
6352
6353 for (int j=0; j<2; j++) {
6354 std::string cmd;
6355 cmd += "INSERT INTO _history_index (event_name, table_name, itimestamp, active) VALUES (";
6356 cmd += fSql->QuoteString(event_name);
6357 cmd += ", ";
6358 cmd += fSql->QuoteString(xtable_name.c_str());
6359 cmd += ", ";
6360 char buf[256];
6361 sprintf(buf, "%.0f", (double)timestamp);
6362 cmd += buf;
6363 cmd += ", ";
6364 cmd += fSql->QuoteString("1");
6365 cmd += ");";
6366
6367 int status = fSql->Exec(table_name.c_str(), cmd.c_str());
6368 if (status == DB_SUCCESS)
6369 break;
6370
6371 // if INSERT failed _history_index does not exist then recover to savepoint t0
6372 // to prevent whole transition abort
6373 fSql->Exec(table_name.c_str(), "ROLLBACK TO SAVEPOINT t0");
6374
6375 status = CreateSqlTable(fSql, "_history_index", &have_transaction, true);
6376 status = CreateSqlColumn(fSql, "_history_index", "event_name", "text not null", &have_transaction, fDebug);
6377 status = CreateSqlColumn(fSql, "_history_index", "table_name", "text", &have_transaction, fDebug);
6378 status = CreateSqlColumn(fSql, "_history_index", "tag_name", "text", &have_transaction, fDebug);
6379 status = CreateSqlColumn(fSql, "_history_index", "tag_type", "text", &have_transaction, fDebug);
6380 status = CreateSqlColumn(fSql, "_history_index", "column_name", "text", &have_transaction, fDebug);
6381 status = CreateSqlColumn(fSql, "_history_index", "column_type", "text", &have_transaction, fDebug);
6382 status = CreateSqlColumn(fSql, "_history_index", "itimestamp", "integer not null", &have_transaction, fDebug);
6383 status = CreateSqlColumn(fSql, "_history_index", "active", "smallint", &have_transaction, fDebug);
6384
6385 status = fSql->CommitTransaction(table_name.c_str());
6386 }
6387
6388 if (status != DB_SUCCESS) {
6389 return HS_FILE_ERROR;
6390 }
6391
6392 return ReadPgsqlTableNames(fSql, sv, xtable_name.c_str(), fDebug, event_name, xtable_name.c_str());
6393 }
6394
6395 cm_msg(MERROR, "PgsqlHistory::create_table", "Could not create table [%s] for event [%s], timestamp %s, after %d attempts", table_name.c_str(), event_name, TimeToString(timestamp).c_str(), max_attempts);
6396
6397 return HS_FILE_ERROR;
6398}
6399
6400int PgsqlHistory::update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction)
6401{
6402 if (fDebug)
6403 printf("PgsqlHistory::update_column: event [%s], table [%s], column [%s], type [%s] new name [%s], timestamp %s\n", event_name, table_name, column_name, column_type, tag_name, TimeToString(timestamp).c_str());
6404
6405 std::string cmd;
6406 cmd += "INSERT INTO _history_index (event_name, table_name, tag_name, tag_type, column_name, column_type, itimestamp, active) VALUES (";
6407 cmd += fSql->QuoteString(event_name);
6408 cmd += ", ";
6409 cmd += fSql->QuoteString(table_name);
6410 cmd += ", ";
6411 cmd += fSql->QuoteString(tag_name);
6412 cmd += ", ";
6413 cmd += fSql->QuoteString(tag_type);
6414 cmd += ", ";
6415 cmd += fSql->QuoteString(column_name);
6416 cmd += ", ";
6417 cmd += fSql->QuoteString(column_type);
6418 cmd += ", ";
6419 char buf[256];
6420 sprintf(buf, "%.0f", (double)timestamp);
6421 cmd += buf;
6422 cmd += ", ";
6423 if (active)
6424 cmd += fSql->QuoteString("1");
6425 else
6426 cmd += fSql->QuoteString("0");
6427 cmd += ");";
6428
6429 int status = fSql->Exec(table_name, cmd.c_str());
6430 if (status != DB_SUCCESS)
6431 return HS_FILE_ERROR;
6432
6433 return HS_SUCCESS;
6434}
6435
6436#endif // HAVE_PGSQL
6437
6439// File history class //
6441
6442const time_t kDay = 24*60*60;
6443const time_t kMonth = 30*kDay;
6444
6445const double KiB = 1024;
6446const double MiB = KiB*KiB;
6447//const double GiB = KiB*MiB;
6448
6450{
6451protected:
6452 std::string fPath;
6454 std::vector<std::string> fSortedFiles;
6455 std::vector<bool> fSortedRead;
6458
6459public:
6460 FileHistory() // ctor
6461 {
6462 // empty
6463 }
6464
6465 int hs_connect(const char* connect_string);
6466 int hs_disconnect();
6467 int hs_clear_cache();
6468 int read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp);
6469 HsSchema* new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]);
6470
6471protected:
6472 int create_file(const char* event_name, time_t timestamp, const std::vector<HsSchemaEntry>& vars, std::string* filenamep);
6473 HsFileSchema* read_file_schema(const char* filename);
6474 int read_file_list(bool *pchanged);
6475 void clear_file_list();
6476 void tags_to_variables(int ntags, const TAG tags[], std::vector<HsSchemaEntry>& variables);
6477 HsSchema* maybe_reopen(const char* event_name, time_t timestamp, HsSchema* s);
6478};
6479
6481{
6482 if (fDebug)
6483 printf("hs_connect [%s]!\n", connect_string);
6484
6485 hs_disconnect();
6486
6489
6490 // add trailing '/'
6491 if (fPath.length() > 0) {
6492 if (fPath[fPath.length()-1] != DIR_SEPARATOR)
6494 }
6495
6496 return HS_SUCCESS;
6497}
6498
6500{
6501 if (fDebug)
6502 printf("FileHistory::hs_clear_cache!\n");
6503 fPathLastMtime = 0;
6505}
6506
6508{
6509 if (fDebug)
6510 printf("FileHistory::hs_disconnect!\n");
6511
6514
6515 return HS_SUCCESS;
6516}
6517
6519{
6520 fPathLastMtime = 0;
6521 fSortedFiles.clear();
6522 fSortedRead.clear();
6523}
6524
6526{
6527 int status;
6528 double start_time = ss_time_sec();
6529
6530 if (pchanged)
6531 *pchanged = false;
6532
6533 struct stat stat_buf;
6534 status = stat(fPath.c_str(), &stat_buf);
6535 if (status != 0) {
6536 cm_msg(MERROR, "FileHistory::read_file_list", "Cannot stat(%s), errno %d (%s)", fPath.c_str(), errno, strerror(errno));
6537 return HS_FILE_ERROR;
6538 }
6539
6540 //printf("dir [%s], mtime: %d %d last: %d %d, mtime %s", fPath.c_str(), stat_buf.st_mtimespec.tv_sec, stat_buf.st_mtimespec.tv_nsec, last_mtimespec.tv_sec, last_mtimespec.tv_nsec, ctime(&stat_buf.st_mtimespec.tv_sec));
6541
6542 if (stat_buf.st_mtime == fPathLastMtime) {
6543 if (fDebug)
6544 printf("FileHistory::read_file_list: history directory \"%s\" mtime %d did not change\n", fPath.c_str(), int(stat_buf.st_mtime));
6545 return HS_SUCCESS;
6546 }
6547
6548 fPathLastMtime = stat_buf.st_mtime;
6549
6550 if (fDebug)
6551 printf("FileHistory::read_file_list: reading list of history files in \"%s\"\n", fPath.c_str());
6552
6553 std::vector<std::string> flist;
6554
6555 ss_file_find(fPath.c_str(), "mhf_*.dat", &flist);
6556
6557 double ls_time = ss_time_sec();
6558 double ls_elapsed = ls_time - start_time;
6559 if (ls_elapsed > 5.000) {
6560 cm_msg(MINFO, "FileHistory::read_file_list", "\"ls -l\" of \"%s\" took %.1f sec", fPath.c_str(), ls_elapsed);
6562 }
6563
6564 // note: reverse iterator is used to sort filenames by time, newest first
6565 std::sort(flist.rbegin(), flist.rend());
6566
6567#if 0
6568 {
6569 printf("file names sorted by time:\n");
6570 for (size_t i=0; i<flist.size(); i++) {
6571 printf("%d: %s\n", i, flist[i].c_str());
6572 }
6573 }
6574#endif
6575
6576 std::vector<bool> fread;
6577 fread.resize(flist.size()); // fill with "false"
6578
6579 // loop over the old list of files,
6580 // for files we already read, loop over new file
6581 // list and mark the same file as read. K.O.
6582 for (size_t j=0; j<fSortedFiles.size(); j++) {
6583 if (fSortedRead[j]) {
6584 for (size_t i=0; i<flist.size(); i++) {
6585 if (flist[i] == fSortedFiles[j]) {
6586 fread[i] = true;
6587 break;
6588 }
6589 }
6590 }
6591 }
6592
6595
6596 if (pchanged)
6597 *pchanged = true;
6598
6599 return HS_SUCCESS;
6600}
6601
6602int FileHistory::read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp)
6603{
6604 if (fDebug)
6605 printf("FileHistory::read_schema: event [%s] at time %s\n", event_name, TimeToString(timestamp).c_str());
6606
6607 if (sv->size() == 0) {
6608 if (fDebug)
6609 printf("FileHistory::read_schema: schema is empty, do a full reload from disk\n");
6611 }
6612
6614 DWORD old_timeout = 0;
6617
6618 bool changed = false;
6619
6621
6622 if (status != HS_SUCCESS) {
6624 return status;
6625 }
6626
6627 if (!changed) {
6628 if ((*sv).find_event(event_name, timestamp)) {
6629 if (fDebug)
6630 printf("FileHistory::read_schema: event [%s] at time %s, no new history files, already have this schema\n", event_name, TimeToString(timestamp).c_str());
6632 return HS_SUCCESS;
6633 }
6634 }
6635
6636 double start_time = ss_time_sec();
6637
6638 int count_read = 0;
6639
6640 for (size_t i=0; i<fSortedFiles.size(); i++) {
6641 std::string file_name = fPath + fSortedFiles[i];
6642 if (fSortedRead[i])
6643 continue;
6644 //bool dupe = false;
6645 //for (size_t ss=0; ss<sv->size(); ss++) {
6646 // HsFileSchema* ssp = (HsFileSchema*)(*sv)[ss];
6647 // if (file_name == ssp->fFileName) {
6648 // dupe = true;
6649 // break;
6650 // }
6651 //}
6652 //if (dupe)
6653 // continue;
6654 fSortedRead[i] = true;
6656 if (!s)
6657 continue;
6658 sv->add(s);
6659 count_read++;
6660
6661 if (event_name) {
6662 if (s->fEventName == event_name) {
6663 //printf("file %s event_name %s time %s, age %f\n", file_name.c_str(), s->fEventName.c_str(), TimeToString(s->fTimeFrom).c_str(), double(timestamp - s->fTimeFrom));
6664 if (s->fTimeFrom <= timestamp) {
6665 // this file is older than the time requested,
6666 // subsequent files will be even older,
6667 // we can stop reading here.
6668 break;
6669 }
6670 }
6671 }
6672 }
6673
6674 double end_time = ss_time_sec();
6675 double read_elapsed = end_time - start_time;
6676 if (read_elapsed > 5.000) {
6677 cm_msg(MINFO, "FileHistory::read_schema", "Loading schema for event \"%s\" timestamp %s, reading %d history files took %.1f sec", event_name, TimeToString(timestamp).c_str(), count_read, read_elapsed);
6679 }
6680
6682
6683 return HS_SUCCESS;
6684}
6685
6686void FileHistory::tags_to_variables(int ntags, const TAG tags[], std::vector<HsSchemaEntry>& variables)
6687{
6688 for (int i=0; i<ntags; i++) {
6690
6691 int tsize = rpc_tid_size(tags[i].type);
6692
6693 e.tag_name = tags[i].name;
6694 e.tag_type = rpc_tid_name(tags[i].type);
6695 e.name = tags[i].name;
6696 e.type = tags[i].type;
6697 e.n_data = tags[i].n_data;
6698 e.n_bytes = tags[i].n_data*tsize;
6699
6700 variables.push_back(e);
6701 }
6702}
6703
6704HsSchema* FileHistory::new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[])
6705{
6706 if (fDebug)
6707 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d\n", event_name, TimeToString(timestamp).c_str(), ntags);
6708
6709 int status;
6710
6711 HsFileSchema* s = (HsFileSchema*)fWriterSchema.find_event(event_name, timestamp);
6712
6713 if (!s) {
6714 //printf("hs_define_event: no schema for event %s\n", event_name);
6715 status = read_schema(&fWriterSchema, event_name, timestamp);
6716 if (status != HS_SUCCESS)
6717 return NULL;
6718 s = (HsFileSchema*)fWriterSchema.find_event(event_name, timestamp);
6719 } else {
6720 //printf("hs_define_event: already have schema for event %s\n", s->fEventName.c_str());
6721 }
6722
6723 bool xdebug = false;
6724
6725 if (s) { // is existing schema the same as new schema?
6726 bool same = true;
6727
6728 if (same) {
6729 if (s->fEventName != event_name) {
6730 if (xdebug)
6731 printf("AAA: [%s] [%s]!\n", s->fEventName.c_str(), event_name);
6732 same = false;
6733 }
6734 }
6735
6736 if (same) {
6737 if (s->fVariables.size() != (size_t)ntags) {
6738 if (xdebug)
6739 printf("BBB: event [%s]: ntags: %zu -> %d!\n", event_name, s->fVariables.size(), ntags);
6740 same = false;
6741 }
6742 }
6743
6744 if (same) {
6745 for (size_t i=0; i<s->fVariables.size(); i++) {
6746 if (s->fVariables[i].name != tags[i].name) {
6747 if (xdebug)
6748 printf("CCC: event [%s] index %zu: name [%s] -> [%s]!\n", event_name, i, s->fVariables[i].name.c_str(), tags[i].name);
6749 same = false;
6750 }
6751 if (s->fVariables[i].type != (int)tags[i].type) {
6752 if (xdebug)
6753 printf("DDD: event [%s] index %zu: type %d -> %d!\n", event_name, i, s->fVariables[i].type, tags[i].type);
6754 same = false;
6755 }
6756 if (s->fVariables[i].n_data != (int)tags[i].n_data) {
6757 if (xdebug)
6758 printf("EEE: event [%s] index %zu: n_data %d -> %d!\n", event_name, i, s->fVariables[i].n_data, tags[i].n_data);
6759 same = false;
6760 }
6761 if (!same)
6762 break;
6763 }
6764 }
6765
6766 if (!same) {
6767 if (xdebug) {
6768 printf("*** Schema for event %s has changed!\n", event_name);
6769
6770 printf("*** Old schema for event [%s] time %s:\n", event_name, TimeToString(timestamp).c_str());
6771 s->print();
6772 printf("*** New tags:\n");
6773 PrintTags(ntags, tags);
6774 }
6775
6776 if (fDebug)
6777 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d: schema mismatch, starting a new file.\n", event_name, TimeToString(timestamp).c_str(), ntags);
6778
6779 s = NULL;
6780 }
6781 }
6782
6783 if (s) {
6784 // maybe this schema is too old - rotate files every so often
6785 time_t age = timestamp - s->fTimeFrom;
6786 //printf("*** age %s (%.1f months), timestamp %s, time_from %s, file %s\n", TimeToString(age).c_str(), (double)age/(double)kMonth, TimeToString(timestamp).c_str(), TimeToString(s->fTimeFrom).c_str(), s->fFileName.c_str());
6787 if (age > fConfMaxFileAge) {
6788 if (fDebug)
6789 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d: schema is too old, age %.1f months, starting a new file.\n", event_name, TimeToString(timestamp).c_str(), ntags, (double)age/(double)kMonth);
6790
6791 // force creation of a new file
6792 s = NULL;
6793 }
6794 }
6795
6796 if (s) {
6797 // maybe this file is too big - rotate files to limit maximum size
6798 double size = ss_file_size(s->fFileName.c_str());
6799 //printf("*** size %.0f, file %s\n", size, s->fFileName.c_str());
6800 if (size > fConfMaxFileSize) {
6801 if (fDebug)
6802 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d: file too big, size %.1f MiBytes, max size %.1f MiBytes, starting a new file.\n", event_name, TimeToString(timestamp).c_str(), ntags, size/MiB, fConfMaxFileSize/MiB);
6803
6804 // force creation of a new file
6805 s = NULL;
6806 }
6807 }
6808
6809 if (!s) {
6810 std::string filename;
6811
6812 std::vector<HsSchemaEntry> vars;
6813
6814 tags_to_variables(ntags, tags, vars);
6815
6816 status = create_file(event_name, timestamp, vars, &filename);
6817 if (status != HS_SUCCESS)
6818 return NULL;
6819
6820 HsFileSchema* ss = read_file_schema(filename.c_str());
6821 if (!ss) {
6822 cm_msg(MERROR, "FileHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
6823 return NULL;
6824 }
6825
6827
6828 s = (HsFileSchema*)fWriterSchema.find_event(event_name, timestamp);
6829
6830 if (!s) {
6831 cm_msg(MERROR, "FileHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
6832 return NULL;
6833 }
6834
6835 if (xdebug) {
6836 printf("*** New schema for event [%s] time %s:\n", event_name, TimeToString(timestamp).c_str());
6837 s->print();
6838 }
6839 }
6840
6841 assert(s != NULL);
6842
6843#if 0
6844 {
6845 printf("schema for [%s] is %p\n", event_name, s);
6846 if (s)
6847 s->print();
6848 }
6849#endif
6850
6851 HsFileSchema* e = new HsFileSchema();
6852
6853 *e = *s; // make a copy of the schema
6854
6855 return e;
6856}
6857
6858int FileHistory::create_file(const char* event_name, time_t timestamp, const std::vector<HsSchemaEntry>& vars, std::string* filenamep)
6859{
6860 if (fDebug)
6861 printf("FileHistory::create_file: event [%s]\n", event_name);
6862
6863 // NB: file names are constructed in such a way
6864 // that when sorted lexicographically ("ls -1 | sort")
6865 // they *also* become sorted by time
6866
6867 struct tm tm;
6868 localtime_r(&timestamp, &tm); // somebody must call tzset() before this.
6869
6870 char buf[256];
6871 strftime(buf, sizeof(buf), "%Y%m%d", &tm);
6872
6873 std::string filename;
6874 filename += fPath;
6875 filename += "mhf_";
6876 filename += TimeToString(timestamp);
6877 filename += "_";
6878 filename += buf;
6879 filename += "_";
6880 filename += MidasNameToFileName(event_name);
6881
6882 std::string try_filename = filename + ".dat";
6883
6884 FILE *fp = NULL;
6885 for (int itry=0; itry<10; itry++) {
6886 if (itry > 0) {
6887 char s[256];
6888 sprintf(s, "_%d", rand());
6889 try_filename = filename + s + ".dat";
6890 }
6891
6892 fp = fopen(try_filename.c_str(), "r");
6893 if (fp != NULL) {
6894 // this file already exists, try with a different name
6895 fclose(fp);
6896 continue;
6897 }
6898
6899 fp = fopen(try_filename.c_str(), "w");
6900 if (fp == NULL) {
6901 // somehow cannot create this file, try again
6902 cm_msg(MERROR, "FileHistory::create_file", "Error: Cannot create file \'%s\' for event \'%s\', fopen() errno %d (%s)", try_filename.c_str(), event_name, errno, strerror(errno));
6903 continue;
6904 }
6905
6906 // file opened
6907 break;
6908 }
6909
6910 if (fp == NULL) {
6911 // somehow cannot create any file, whine!
6912 cm_msg(MERROR, "FileHistory::create_file", "Error: Cannot create file \'%s\' for event \'%s\'", filename.c_str(), event_name);
6913 return HS_FILE_ERROR;
6914 }
6915
6916 std::string ss;
6917
6918 ss += "version: 2.0\n";
6919 ss += "event_name: ";
6920 ss += event_name;
6921 ss += "\n";
6922 ss += "time: ";
6923 ss += TimeToString(timestamp);
6924 ss += "\n";
6925
6926 int recsize = 0;
6927
6928 ss += "tag: /DWORD 1 4 /timestamp\n";
6929 recsize += 4;
6930
6931 bool padded = false;
6932 int offset = 0;
6933
6934 bool xdebug = false; // (strcmp(event_name, "u_Beam") == 0);
6935
6936 for (size_t i=0; i<vars.size(); i++) {
6937 int tsize = rpc_tid_size(vars[i].type);
6938 int n_bytes = vars[i].n_data*tsize;
6939 int xalign = (offset % tsize);
6940
6941 if (xdebug)
6942 printf("tag %zu, tsize %d, n_bytes %d, xalign %d\n", i, tsize, n_bytes, xalign);
6943
6944#if 0
6945 // looks like history data does not do alignement and padding
6946 if (xalign != 0) {
6947 padded = true;
6948 int pad_bytes = tsize - xalign;
6949 assert(pad_bytes > 0);
6950
6951 ss += "tag: ";
6952 ss += "XPAD";
6953 ss += " ";
6954 ss += SmallIntToString(1);
6955 ss += " ";
6957 ss += " ";
6958 ss += "pad_";
6959 ss += SmallIntToString(i);
6960 ss += "\n";
6961
6962 offset += pad_bytes;
6963 recsize += pad_bytes;
6964
6965 assert((offset % tsize) == 0);
6966 fprintf(stderr, "FIXME: need to debug padding!\n");
6967 //abort();
6968 }
6969#endif
6970
6971 ss += "tag: ";
6972 ss += rpc_tid_name(vars[i].type);
6973 ss += " ";
6974 ss += SmallIntToString(vars[i].n_data);
6975 ss += " ";
6976 ss += SmallIntToString(n_bytes);
6977 ss += " ";
6978 ss += vars[i].name;
6979 ss += "\n";
6980
6981 recsize += n_bytes;
6982 offset += n_bytes;
6983 }
6984
6985 ss += "record_size: ";
6987 ss += "\n";
6988
6989 // reserve space for "data_offset: ..."
6990 int sslength = ss.length() + 127;
6991
6992 int block = 1024;
6993 int nb = (sslength + block - 1)/block;
6994 int data_offset = block * nb;
6995
6996 ss += "data_offset: ";
6998 ss += "\n";
6999
7000 fprintf(fp, "%s", ss.c_str());
7001
7002 fclose(fp);
7003
7004 if (1 && padded) {
7005 printf("Schema in file %s has padding:\n", try_filename.c_str());
7006 printf("%s", ss.c_str());
7007 }
7008
7009 if (filenamep)
7011
7012 return HS_SUCCESS;
7013}
7014
7016{
7017 if (fDebug)
7018 printf("FileHistory::read_file_schema: file %s\n", filename);
7019
7020 FILE* fp = fopen(filename, "r");
7021 if (!fp) {
7022 cm_msg(MERROR, "FileHistory::read_file_schema", "Cannot read \'%s\', fopen() errno %d (%s)", filename, errno, strerror(errno));
7023 return NULL;
7024 }
7025
7026 HsFileSchema* s = NULL;
7027
7028 // File format looks like this:
7029 // version: 2.0
7030 // event_name: u_Beam
7031 // time: 1023174012
7032 // tag: /DWORD 1 4 /timestamp
7033 // tag: FLOAT 1 4 B1
7034 // ...
7035 // tag: FLOAT 1 4 Ref Heater
7036 // record_size: 84
7037 // data_offset: 1024
7038
7039 size_t rd_recsize = 0;
7040 int offset = 0;
7041
7042 while (1) {
7043 char buf[1024];
7044 char* b = fgets(buf, sizeof(buf), fp);
7045
7046 //printf("read: %s\n", b);
7047
7048 if (!b) {
7049 break; // end of file
7050 }
7051
7052 char*bb;
7053
7054 bb = strchr(b, '\n');
7055 if (bb)
7056 *bb = 0;
7057
7058 bb = strchr(b, '\r');
7059 if (bb)
7060 *bb = 0;
7061
7062 bb = strstr(b, "version: 2.0");
7063 if (bb == b) {
7064 s = new HsFileSchema();
7065 assert(s);
7066
7067 s->fFileName = filename;
7068 continue;
7069 }
7070
7071 if (!s) {
7072 // malformed history file
7073 break;
7074 }
7075
7076 bb = strstr(b, "event_name: ");
7077 if (bb == b) {
7078 s->fEventName = bb + 12;
7079 continue;
7080 }
7081
7082 bb = strstr(b, "time: ");
7083 if (bb == b) {
7084 s->fTimeFrom = strtoul(bb + 6, NULL, 10);
7085 continue;
7086 }
7087
7088 // tag format is like this:
7089 //
7090 // tag: FLOAT 1 4 Ref Heater
7091 //
7092 // "FLOAT" is the MIDAS type, "/DWORD" is special tag for the timestamp
7093 // "1" is the number of array elements
7094 // "4" is the total tag size in bytes (n_data*tid_size)
7095 // "Ref Heater" is the tag name
7096
7097 bb = strstr(b, "tag: ");
7098 if (bb == b) {
7099 bb += 5; // now points to the tag MIDAS type
7100 const char* midas_type = bb;
7101 char* bbb = strchr(bb, ' ');
7102 if (bbb) {
7103 *bbb = 0;
7104 HsSchemaEntry t;
7105 if (midas_type[0] == '/') {
7106 t.type = 0;
7107 } else {
7109 if (t.type == 0) {
7110 cm_msg(MERROR, "FileHistory::read_file_schema", "Unknown MIDAS data type \'%s\' in history file \'%s\'", midas_type, filename);
7111 if (s)
7112 delete s;
7113 s = NULL;
7114 break;
7115 }
7116 }
7117 bbb++;
7118 while (*bbb == ' ')
7119 bbb++;
7120 if (*bbb) {
7121 t.n_data = strtoul(bbb, &bbb, 10);
7122 while (*bbb == ' ')
7123 bbb++;
7124 if (*bbb) {
7125 t.n_bytes = strtoul(bbb, &bbb, 10);
7126 while (*bbb == ' ')
7127 bbb++;
7128 t.name = bbb;
7129 }
7130 }
7131
7132 if (midas_type[0] != '/') {
7133 s->fVariables.push_back(t);
7134 s->fOffsets.push_back(offset);
7135 offset += t.n_bytes;
7136 }
7137
7138 rd_recsize += t.n_bytes;
7139 }
7140 continue;
7141 }
7142
7143 bb = strstr(b, "record_size: ");
7144 if (bb == b) {
7145 s->fRecordSize = atoi(bb + 12);
7146 continue;
7147 }
7148
7149 bb = strstr(b, "data_offset: ");
7150 if (bb == b) {
7151 s->fDataOffset = atoi(bb + 12);
7152 // data offset is the last entry in the file
7153 break;
7154 }
7155 }
7156
7157 fclose(fp);
7158
7159 if (!s) {
7160 cm_msg(MERROR, "FileHistory::read_file_schema", "Malformed history schema in \'%s\', maybe it is not a history file", filename);
7161 return NULL;
7162 }
7163
7164 if (rd_recsize != s->fRecordSize) {
7165 cm_msg(MERROR, "FileHistory::read_file_schema", "Record size mismatch in history schema from \'%s\', file says %zu while total of all tags is %zu", filename, s->fRecordSize, rd_recsize);
7166 if (s)
7167 delete s;
7168 return NULL;
7169 }
7170
7171 if (!s) {
7172 cm_msg(MERROR, "FileHistory::read_file_schema", "Could not read history schema from \'%s\', maybe it is not a history file", filename);
7173 if (s)
7174 delete s;
7175 return NULL;
7176 }
7177
7178 if (fDebug > 1)
7179 s->print();
7180
7181 return s;
7182}
7183
7184HsSchema* FileHistory::maybe_reopen(const char* event_name, time_t timestamp, HsSchema* s)
7185{
7186 HsFileSchema* fs = dynamic_cast<HsFileSchema*>(s);
7187
7188 assert(fs != NULL); // FileHistory::maybe_reopen() must be called only with file history schema.
7189
7190 if (fs->fFileSize <= fConfMaxFileSize) {
7191 // not big enough, let it grow
7192 return s;
7193 }
7194
7195 // must rotate the file
7196
7197 if (fDebug) {
7198 printf("FileHistory::maybe_reopen: reopen file \"%s\", size %jd, max size %jd\n", fs->fFileName.c_str(), fs->fFileSize, fConfMaxFileSize);
7199 }
7200
7201 std::string new_filename;
7202
7203 int status = create_file(event_name, timestamp, fs->fVariables, &new_filename);
7204
7205 if (status != HS_SUCCESS) {
7206 // cannot create new history file
7207 return s;
7208 }
7209
7211
7212 if (!new_fs) {
7213 // cannot open new history file
7214 return s;
7215 }
7216
7217 new_fs->fDisabled = false;
7218
7220 *new_fs_copy = *new_fs; // make a copy
7221
7222 //printf("replacing schema %p %p with %p %p\n", s, fs, new_fs, new_fs_copy);
7223
7224 for (size_t i=0; i<fWriterEvents.size(); i++) {
7225 if (s == fWriterEvents[i]) {
7226 delete fWriterEvents[i];
7228 s = NULL; // pointer to fEvents[i]
7229 fs = NULL; // pointer to fEvents[i]
7230 }
7231 }
7232
7233 assert(s == NULL); // the schema we are replacing must be in fWriterEvents.
7234
7235 fWriterSchema.add(new_fs_copy); // make sure new file is added to the list of files
7236
7237 assert(new_fs->fFileSize < fConfMaxFileSize); // check that we are not returning the original big file
7238
7239 //new_fs->print();
7240
7241 return new_fs;
7242}
7243
7245// Factory constructors //
7247
7249{
7250#ifdef HAVE_SQLITE
7251 return new SqliteHistory();
7252#else
7253 cm_msg(MERROR, "MakeMidasHistorySqlite", "Error: Cannot initialize SQLITE history - this MIDAS was built without SQLITE support - HAVE_SQLITE is not defined");
7254 return NULL;
7255#endif
7256}
7257
7259{
7260#ifdef HAVE_MYSQL
7261 return new MysqlHistory();
7262#else
7263 cm_msg(MERROR, "MakeMidasHistoryMysql", "Error: Cannot initialize MySQL history - this MIDAS was built without MySQL support - HAVE_MYSQL is not defined");
7264 return NULL;
7265#endif
7266}
7267
7269{
7270#ifdef HAVE_PGSQL
7271 return new PgsqlHistory();
7272#else
7273 cm_msg(MERROR, "MakeMidasHistoryPgsql", "Error: Cannot initialize PgSQL history - this MIDAS was built without PostgreSQL support - HAVE_PGSQL is not defined");
7274 return NULL;
7275#endif
7276}
7277
7282
7283/* emacs
7284 * Local Variables:
7285 * tab-width: 8
7286 * c-basic-offset: 3
7287 * indent-tabs-mode: nil
7288 * End:
7289 */
#define FALSE
Definition cfortran.h:309
HsSchema * maybe_reopen(const char *event_name, time_t timestamp, HsSchema *s)
std::string fPath
HsFileSchema * read_file_schema(const char *filename)
std::vector< std::string > fSortedFiles
HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
off64_t fConfMaxFileSize
std::vector< bool > fSortedRead
int hs_connect(const char *connect_string)
returns HS_SUCCESS
void tags_to_variables(int ntags, const TAG tags[], std::vector< HsSchemaEntry > &variables)
int create_file(const char *event_name, time_t timestamp, const std::vector< HsSchemaEntry > &vars, std::string *filenamep)
int hs_clear_cache()
clear internal cache, returns HS_SUCCESS
int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)
int read_file_list(bool *pchanged)
int hs_disconnect()
disconnect from history, returns HS_SUCCESS
void remove_inactive_columns()
off64_t fFileSizeInitial
std::string fFileName
int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])
void print(bool print_tags=true) const
int write_event(const time_t t, const char *data, const size_t data_size)
int read_last_written(const time_t timestamp, const int debug, time_t *last_written)
int fCountWriteUndersize
virtual void print(bool print_tags=true) const
std::vector< int > fOffsets
virtual ~HsSchema()
virtual void remove_inactive_columns()=0
std::vector< HsSchemaEntry > fVariables
virtual int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])=0
size_t fWriteMinSize
virtual int write_event(const time_t t, const char *data, const size_t data_size)=0
virtual int read_last_written(const time_t timestamp, const int debug, time_t *last_written)=0
virtual int flush_buffers()=0
virtual int close()=0
int fCountWriteOversize
size_t fWriteMaxSize
std::string fEventName
virtual int match_event_var(const char *event_name, const char *var_name, const int var_index)
void print(bool print_tags=true) const
std::vector< HsSchema * > fData
size_t size() const
HsSchema * find_event(const char *event_name, const time_t timestamp, int debug=0)
void add(HsSchema *s)
HsSchema * operator[](int index) const
int get_transaction_count()
int read_last_written(const time_t timestamp, const int debug, time_t *last_written)
int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])
std::vector< std::string > fColumnNames
void print(bool print_tags=true) const
void remove_inactive_columns()
std::vector< std::string > fColumnTypes
std::vector< bool > fColumnInactive
void increment_transaction_count()
int match_event_var(const char *event_name, const char *var_name, const int var_index)
std::string fTableName
static std::map< SqlBase *, int > gfTransactionCount
void reset_transaction_count()
int write_event(const time_t t, const char *data, const size_t data_size)
MidasHistoryBinnedBuffer(time_t first_time, time_t last_time, int num_bins)
void Add(time_t t, double v)
virtual void Add(time_t time, double value)=0
char type[NAME_LENGTH]
history channel name
Definition history.h:112
char name[NAME_LENGTH]
Definition history.h:111
int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)
int read_table_and_event_names(HsSchemaVector *sv)
int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)
int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)
void Add(time_t t, double v)
ReadBuffer(time_t first_time, time_t last_time, time_t interval)
int hs_get_tags(const char *event_name, time_t t, std::vector< TAG > *ptags)
get list of history variables for given event (use event names returned by hs_get_events()) that exis...
virtual int hs_connect(const char *connect_string)=0
returns HS_SUCCESS
int hs_write_event(const char *event_name, time_t timestamp, int buffer_size, const char *buffer)
see hs_write_event(), returns HS_SUCCESS or HS_FILE_ERROR
int hs_read_buffer(time_t start_time, time_t end_time, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], MidasHistoryBufferInterface *buffer[], int hs_status[])
returns HS_SUCCESS
virtual HsSchema * maybe_reopen(const char *event_name, time_t timestamp, HsSchema *s)=0
int hs_define_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
see hs_define_event(), returns HS_SUCCESS or HS_FILE_ERROR
int hs_read_binned(time_t start_time, time_t end_time, int num_bins, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], int num_entries[], int *count_bins[], double *mean_bins[], double *rms_bins[], double *min_bins[], double *max_bins[], time_t *bins_first_time[], double *bins_first_value[], time_t *bins_last_time[], double *bins_last_value[], time_t last_time[], double last_value[], int st[])
returns HS_SUCCESS
HsSchemaVector fWriterSchema
virtual HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])=0
int hs_get_events(time_t t, std::vector< std::string > *pevents)
get list of events that exist(ed) at given time and later (value 0 means "return all events from begi...
std::vector< HsSchema * > fWriterEvents
int hs_get_last_written(time_t timestamp, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], time_t last_written[])
virtual int hs_set_debug(int debug)
set debug level, returns previous debug level
virtual int hs_disconnect()=0
disconnect from history, returns HS_SUCCESS
HsSchemaVector fReaderSchema
int hs_read(time_t start_time, time_t end_time, time_t interval, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], int num_entries[], time_t *time_buffer[], double *data_buffer[], int st[])
see hs_read(), returns HS_SUCCESS
int hs_clear_cache()
clear internal cache, returns HS_SUCCESS
int hs_flush_buffers()
flush buffered data to storage where it is visible to mhttpd
virtual int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)=0
virtual int ListColumns(const char *table_name, std::vector< std::string > *plist)=0
virtual int Finalize()=0
virtual int Connect(const char *path)=0
virtual int ListColumns(const char *table, std::vector< std::string > *plist)=0
virtual double GetDouble(int column)=0
virtual int RollbackTransaction(const char *table_name)=0
virtual bool IsConnected()=0
virtual int CommitTransaction(const char *table_name)=0
virtual ~SqlBase()
virtual int ListTables(std::vector< std::string > *plist)=0
virtual std::string QuoteId(const char *s)=0
virtual int Disconnect()=0
virtual bool TypesCompatible(int midas_tid, const char *sql_type)=0
virtual int Prepare(const char *table_name, const char *sql)=0
virtual int Exec(const char *sql)=0
virtual int Connect(const char *dsn=0)=0
virtual int Exec(const char *table_name, const char *sql)=0
virtual std::string QuoteString(const char *s)=0
bool fTransactionPerTable
virtual time_t GetTime(int column)=0
virtual int Step()=0
virtual const char * GetText(int column)=0
virtual int ExecDisconnected(const char *table_name, const char *sql)=0
virtual int OpenTransaction(const char *table_name)=0
virtual const char * ColumnType(int midas_tid)=0
int update_schema1(HsSqlSchema *s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable, bool *have_transaction)
int hs_disconnect()
disconnect from history, returns HS_SUCCESS
int hs_set_debug(int debug)
set debug level, returns previous debug level
int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)
HsSchema * maybe_reopen(const char *event_name, time_t timestamp, HsSchema *s)
virtual ~SqlHistoryBase()
virtual int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)=0
virtual int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)=0
virtual int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)=0
virtual int read_table_and_event_names(HsSchemaVector *sv)=0
int hs_connect(const char *connect_string)
returns HS_SUCCESS
HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
int update_schema(HsSqlSchema *s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable)
int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)
int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)
int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)
int read_table_and_event_names(HsSchemaVector *sv)
INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout)
Definition midas.cxx:3325
INT cm_set_watchdog_params(BOOL call_watchdog, DWORD timeout)
Definition midas.cxx:3283
#define DB_KEY_EXIST
Definition midas.h:641
#define DB_FILE_ERROR
Definition midas.h:647
#define DB_SUCCESS
Definition midas.h:631
#define DB_NO_MORE_SUBKEYS
Definition midas.h:646
#define HS_UNDEFINED_VAR
Definition midas.h:733
#define HS_SUCCESS
Definition midas.h:727
#define HS_FILE_ERROR
Definition midas.h:728
#define HS_UNDEFINED_EVENT
Definition midas.h:732
unsigned int DWORD
Definition mcstd.h:51
#define TID_DOUBLE
Definition midas.h:343
#define TID_SBYTE
Definition midas.h:329
#define TID_BOOL
Definition midas.h:340
#define TID_SHORT
Definition midas.h:334
#define TID_WORD
Definition midas.h:332
#define MINFO
Definition midas.h:560
#define TID_BYTE
Definition midas.h:327
#define TID_STRING
Definition midas.h:346
#define MERROR
Definition midas.h:559
#define TID_CHAR
Definition midas.h:331
#define TID_INT
Definition midas.h:338
#define TID_FLOAT
Definition midas.h:341
#define TID_LAST
Definition midas.h:354
#define TID_DWORD
Definition midas.h:336
double ss_file_size(const char *path)
Definition system.cxx:6978
double ss_time_sec()
Definition system.cxx:3467
INT ss_file_find(const char *path, const char *pattern, char **plist)
Definition system.cxx:6719
INT cm_msg_flush_buffer()
Definition midas.cxx:865
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
Definition midas.cxx:915
const char * rpc_tid_name(INT id)
Definition midas.cxx:11772
int rpc_name_tid(const char *name)
Definition midas.cxx:11786
INT rpc_tid_size(INT id)
Definition midas.cxx:11765
static std::string q(const char *s)
static const int tid_size[]
static const char * sql_type_mysql[]
static const char ** sql_type
static std::string MidasNameToSqlName(const char *s)
const time_t kMonth
const double KiB
static int CreateSqlColumn(SqlBase *sql, const char *table_name, const char *column_name, const char *column_type, bool *have_transaction, int debug)
static int ReadSqliteTableNames(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug)
void DoctorPgsqlColumnType(std::string *col_type, const char *index_type)
static int ReadRecord(const char *file_name, int fd, off64_t offset, size_t recsize, off64_t irec, char *rec)
static int CreateSqlHyperTable(SqlBase *sql, const char *table_name, bool *have_transaction)
void DoctorSqlColumnType(std::string *col_type, const char *index_type)
static bool MatchTagName(const char *tag_name, int n_data, const char *var_tag_name, const int var_tag_index)
const double MiB
static int var_name_cmp(const std::string &v1, const char *v2)
static std::string TimeToString(time_t t)
MidasHistoryInterface * MakeMidasHistorySqlite()
static void PrintTags(int ntags, const TAG tags[])
static char * skip_spaces(char *s)
static bool MatchEventName(const char *event_name, const char *var_event_name)
static HsSqlSchema * NewSqlSchema(HsSchemaVector *sv, const char *table_name, time_t t)
static int StartSqlTransaction(SqlBase *sql, const char *table_name, bool *have_transaction)
const time_t kDay
MidasHistoryInterface * MakeMidasHistoryMysql()
static int ReadSqliteTableSchema(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug)
static int event_name_cmp(const std::string &e1, const char *e2)
MidasHistoryInterface * MakeMidasHistoryPgsql()
MidasHistoryInterface * MakeMidasHistoryFile()
static int ReadMysqlTableNames(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug, const char *must_have_event_name, const char *must_have_table_name)
static std::string SmallIntToString(int i)
static int CreateSqlTable(SqlBase *sql, const char *table_name, bool *have_transaction, bool set_default_timestamp=false)
static std::string MidasNameToFileName(const char *s)
static int FindTime(const char *file_name, int fd, off64_t offset, size_t recsize, off64_t nrec, time_t timestamp, off64_t *i1p, time_t *t1p, off64_t *i2p, time_t *t2p, time_t *tstart, time_t *tend, int debug)
INT index
Definition mana.cxx:271
DWORD last_time
Definition mana.cxx:3070
void * data
Definition mana.cxx:268
BOOL debug
debug printouts
Definition mana.cxx:254
INT type
Definition mana.cxx:269
char host_name[HOST_NAME_LENGTH]
Definition mana.cxx:242
double count
Definition mdump.cxx:33
INT i
Definition mdump.cxx:32
static int offset
Definition mgd.cxx:1500
#define DIR_SEPARATOR
Definition midas.h:193
DWORD BOOL
Definition midas.h:105
#define DIR_SEPARATOR_STR
Definition midas.h:194
#define read(n, a, f)
#define write(n, a, f, d)
#define name(x)
Definition midas_macro.h:24
static FILE * fp
struct callback_addr callback
Definition mserver.cxx:22
MUTEX_T * tm
Definition odbedit.cxx:39
BOOL match(char *pat, char *str)
Definition odbedit.cxx:190
INT j
Definition odbhist.cxx:40
INT k
Definition odbhist.cxx:40
char str[256]
Definition odbhist.cxx:33
char file_name[256]
Definition odbhist.cxx:41
INT add
Definition odbhist.cxx:40
DWORD status
Definition odbhist.cxx:39
char var_name[256]
Definition odbhist.cxx:41
TH1X EXPRT * h1_book(const char *name, const char *title, int bins, double min, double max)
Definition rmidas.h:24
std::string tag_name
int type
std::string tag_type
int n_data
std::string name
int n_bytes
Definition midas.h:1232
DWORD type
Definition midas.h:1234
DWORD n_data
Definition midas.h:1235
char name[NAME_LENGTH]
Definition midas.h:1233
char c
Definition system.cxx:1310
@ DIR
Definition test_init.cxx:7
static double e(void)
Definition tinyexpr.c:136