MIDAS
Loading...
Searching...
No Matches
history_schema.cxx
Go to the documentation of this file.
1/********************************************************************\
2
3 Name: history_schema.cxx
4 Created by: Konstantin Olchanski
5
6 Contents: Schema based MIDAS history. Available drivers:
7 FileHistory: storage of data in binary files (replacement for the traditional MIDAS history)
8 MysqlHistory: storage of data in MySQL database (replacement for the ODBC based SQL history)
9 PgsqlHistory: storage of data in PostgreSQL database
10 SqliteHistory: storage of data in SQLITE3 database (not suitable for production use)
11
12\********************************************************************/
13
14#undef NDEBUG // midas required assert() to be always enabled
15
16#include "midas.h"
17#include "msystem.h"
18#include "mstrlcpy.h"
19
20#include <math.h>
21
22#include <vector>
23#include <list>
24#include <string>
25#include <map>
26#include <algorithm>
27
28// make mysql/my_global.h happy - it redefines closesocket()
29#undef closesocket
30
31//
32// benchmarks
33//
34// /usr/bin/time ./linux/bin/mh2sql . /ladd/iris_data2/alpha/alphacpc09-elog-history/history/121019.hst
35// -rw-r--r-- 1 alpha users 161028048 Oct 19 2012 /ladd/iris_data2/alpha/alphacpc09-elog-history/history/121019.hst
36// flush 10000, sync=OFF -> 51.51user 1.51system 0:53.76elapsed 98%CPU
37// flush 1000000, sync=NORMAL -> 51.83user 2.09system 1:08.37elapsed 78%CPU (flush never activated)
38// flush 100000, sync=NORMAL -> 51.38user 1.94system 1:06.94elapsed 79%CPU
39// flush 10000, sync=NORMAL -> 51.37user 2.03system 1:31.63elapsed 58%CPU
40// flush 1000, sync=NORMAL -> 52.16user 2.70system 4:38.58elapsed 19%CPU
41
43// MIDAS includes //
45
46#include "midas.h"
47#include "history.h"
48
50// helper stuff //
52
53#define FREE(x) { if (x) free(x); (x) = NULL; }
54
55static char* skip_spaces(char* s)
56{
57 while (*s) {
58 if (!isspace(*s))
59 break;
60 s++;
61 }
62 return s;
63}
64
65static std::string TimeToString(time_t t)
66{
67 const char* sign = "";
68
69 if (t == 0)
70 return "0";
71
72 time_t tt = t;
73
74 if (t < 0) {
75 sign = "-";
76 tt = -t;
77 }
78
79 assert(tt > 0);
80
81 std::string v;
82 while (tt) {
83 char c = '0' + tt%10;
84 tt /= 10;
85 v = c + v;
86 }
87
88 v = sign + v;
89
90 //printf("time %.0f -> %s\n", (double)t, v.c_str());
91
92 return v;
93}
94
95static std::string SmallIntToString(int i)
96{
97 //int ii = i;
98
99 if (i == 0)
100 return "0";
101
102 assert(i > 0);
103
104 std::string v;
105 while (i) {
106 char c = '0' + i%10;
107 i /= 10;
108 v = c + v;
109 }
110
111 //printf("SmallIntToString: %d -> %s\n", ii, v.c_str());
112
113 return v;
114}
115
116static bool MatchEventName(const char* event_name, const char* var_event_name)
117{
118 // new-style event name: "equipment_name/variable_name:tag_name"
119 // old-style event name: "equipment_name:tag_name" ("variable_name" is missing)
121
122 //printf("looking for event_name [%s], try table [%s] event name [%s], new style [%d]\n", var_event_name, table_name, event_name, newStyleEventName);
123
124 if (strcasecmp(event_name, var_event_name) == 0) {
125 return true;
126 } else if (newStyleEventName) {
127 return false;
128 } else { // for old style names, need more parsing
129 bool match = false;
130
131 const char* s = event_name;
132 for (int j=0; s[j]; j++) {
133
134 if ((var_event_name[j]==0) && (s[j]=='/')) {
135 match = true;
136 break;
137 }
138
139 if ((var_event_name[j]==0) && (s[j]=='_')) {
140 match = true;
141 break;
142 }
143
144 if (var_event_name[j]==0) {
145 match = false;
146 break;
147 }
148
149 if (tolower(var_event_name[j]) != tolower(s[j])) { // does not work for UTF-8 Unicode
150 match = false;
151 break;
152 }
153 }
154
155 return match;
156 }
157}
158
159static bool MatchTagName(const char* tag_name, int n_data, const char* var_tag_name, const int var_tag_index)
160{
161 char alt_tag_name[1024]; // maybe this is an array without "Names"?
163
164 //printf(" looking for tag [%s] alt [%s], try column name [%s]\n", var_tag_name, alt_tag_name, tag_name);
165
166 if (strcasecmp(tag_name, var_tag_name) == 0)
167 if (var_tag_index >= 0 && var_tag_index < n_data)
168 return true;
169
170 if (strcasecmp(tag_name, alt_tag_name) == 0)
171 return true;
172
173 return false;
174}
175
176static void PrintTags(int ntags, const TAG tags[])
177{
178 for (int i=0; i<ntags; i++)
179 printf("tag %d: %s %s[%d]\n", i, rpc_tid_name(tags[i].type), tags[i].name, tags[i].n_data);
180}
181
182// convert MIDAS event name to something acceptable as an SQL identifier - table name, column name, etc
183
184static std::string MidasNameToSqlName(const char* s)
185{
186 std::string out;
187
188 for (int i=0; s[i]!=0; i++) {
189 char c = s[i];
190 if (isalpha(c) || isdigit(c))
191 out += tolower(c); // does not work for UTF-8 Unicode
192 else
193 out += '_';
194 }
195
196 return out;
197}
198
199// convert MIDAS event name to something acceptable as a file name
200
201static std::string MidasNameToFileName(const char* s)
202{
203 std::string out;
204
205 for (int i=0; s[i]!=0; i++) {
206 char c = s[i];
207 if (isalpha(c) || isdigit(c))
208 out += tolower(c); // does not work for UTF-8 Unicode
209 else
210 out += '_';
211 }
212
213 return out;
214}
215
216// compare event names
217
218static int event_name_cmp(const std::string& e1, const char* e2)
219{
220 return strcasecmp(e1.c_str(), e2);
221}
222
223// compare variable names
224
225static int var_name_cmp(const std::string& v1, const char* v2)
226{
227 return strcasecmp(v1.c_str(), v2);
228}
229
231// SQL data types //
233
234#ifdef HAVE_SQLITE
235static const char *sql_type_sqlite[TID_LAST] = {
236 "xxxINVALIDxxxNULL", // TID_NULL
237 "INTEGER", // TID_UINT8
238 "INTEGER", // TID_INT8
239 "TEXT", // TID_CHAR
240 "INTEGER", // TID_UINT16
241 "INTEGER", // TID_INT16
242 "INTEGER", // TID_UINT32
243 "INTEGER", // TID_INT32
244 "INTEGER", // TID_BOOL
245 "REAL", // TID_FLOAT
246 "REAL", // TID_DOUBLE
247 "INTEGER", // TID_BITFIELD
248 "TEXT", // TID_STRING
249 "xxxINVALIDxxxARRAY",
250 "xxxINVALIDxxxSTRUCT",
251 "xxxINVALIDxxxKEY",
252 "xxxINVALIDxxxLINK"
253};
254#endif
255
256#ifdef HAVE_PGSQL
257static const char *sql_type_pgsql[TID_LAST] = {
258 "xxxINVALIDxxxNULL", // TID_NULL
259 "smallint", // TID_BYTE
260 "smallint", // TID_SBYTE
261 "char(1)", // TID_CHAR
262 "integer", // TID_WORD
263 "smallint", // TID_SHORT
264 "bigint", // TID_DWORD
265 "integer", // TID_INT
266 "smallint", // TID_BOOL
267 "real", // TID_FLOAT
268 "double precision", // TID_DOUBLE
269 "bigint", // TID_BITFIELD
270 "text", // TID_STRING
271 "xxxINVALIDxxxARRAY",
272 "xxxINVALIDxxxSTRUCT",
273 "xxxINVALIDxxxKEY",
274 "xxxINVALIDxxxLINK"
275};
276#endif
277
278#ifdef HAVE_MYSQL
279static const char *sql_type_mysql[TID_LAST] = {
280 "xxxINVALIDxxxNULL", // TID_NULL
281 "tinyint unsigned", // TID_BYTE
282 "tinyint", // TID_SBYTE
283 "char", // TID_CHAR
284 "smallint unsigned", // TID_WORD
285 "smallint", // TID_SHORT
286 "integer unsigned", // TID_DWORD
287 "integer", // TID_INT
288 "tinyint", // TID_BOOL
289 "float", // TID_FLOAT
290 "double", // TID_DOUBLE
291 "integer unsigned", // TID_BITFIELD
292 "VARCHAR", // TID_STRING
293 "xxxINVALIDxxxARRAY",
294 "xxxINVALIDxxxSTRUCT",
295 "xxxINVALIDxxxKEY",
296 "xxxINVALIDxxxLINK"
297};
298#endif
299
300void DoctorPgsqlColumnType(std::string* col_type, const char* index_type)
301{
302 if (*col_type == index_type)
303 return;
304
305 if (*col_type == "bigint" && strcmp(index_type, "int8")==0) {
307 return;
308 }
309
310 if (*col_type == "integer" && strcmp(index_type, "int4")==0) {
312 return;
313 }
314
315 if (*col_type == "smallint" && strcmp(index_type, "int2")==0) {
317 return;
318 }
319
320 cm_msg(MERROR, "SqlHistory", "Cannot use this SQL database, incompatible column names: created column type [%s] is reported with column type [%s]", index_type, col_type->c_str());
322 abort();
323}
324
325void DoctorSqlColumnType(std::string* col_type, const char* index_type)
326{
327 if (*col_type == index_type)
328 return;
329
330 if (*col_type == "int(10) unsigned" && strcmp(index_type, "integer unsigned")==0) {
332 return;
333 }
334
335 if (*col_type == "int(11)" && strcmp(index_type, "integer")==0) {
337 return;
338 }
339
340 if (*col_type == "integer" && strcmp(index_type, "int(11)")==0) {
342 return;
343 }
344
345 // MYSQL 8.0.23
346
347 if (*col_type == "int" && strcmp(index_type, "integer")==0) {
349 return;
350 }
351
352 if (*col_type == "int unsigned" && strcmp(index_type, "integer unsigned")==0) {
354 return;
355 }
356
357 cm_msg(MERROR, "SqlHistory", "Cannot use this SQL database, incompatible column names: created column type [%s] is reported with column type [%s]", index_type, col_type->c_str());
359 abort();
360}
361
362#if 0
363static int sql2midasType_mysql(const char* name)
364{
365 for (int tid=0; tid<TID_LAST; tid++)
366 if (strcasecmp(name, sql_type_mysql[tid])==0)
367 return tid;
368 // FIXME!
369 printf("sql2midasType: Cannot convert SQL data type \'%s\' to a MIDAS data type!\n", name);
370 return 0;
371}
372#endif
373
374#if 0
375static int sql2midasType_sqlite(const char* name)
376{
377 if (strcmp(name, "INTEGER") == 0)
378 return TID_INT;
379 if (strcmp(name, "REAL") == 0)
380 return TID_DOUBLE;
381 if (strcmp(name, "TEXT") == 0)
382 return TID_STRING;
383 // FIXME!
384 printf("sql2midasType: Cannot convert SQL data type \'%s\' to a MIDAS data type!\n", name);
385 return 0;
386}
387#endif
388
390// Schema base classes //
392
394 std::string tag_name; // tag name from MIDAS
395 std::string tag_type; // tag type from MIDAS
396 std::string name; // entry name, same as tag_name except when read from SQL history when it could be the SQL column name
397 int type = 0; // MIDAS data type TID_xxx
398 int n_data = 0; // MIDAS array size
399 int n_bytes = 0; // n_data * size of MIDAS data type (only used by HsFileSchema?)
400 bool inactive = false; // inactive SQL column
401
403 {
404 type = 0;
405 n_data = 0;
406 n_bytes = 0;
407 }
408};
409
411{
412 // event schema definitions
413 std::string event_name;
416 std::vector<HsSchemaEntry> variables;
417 std::vector<int> offsets;
418 int n_bytes = 0;
419
420 // run time data used by hs_write_event()
425
426 // schema disabled by write error
427 bool disabled = true;
428
429 HsSchema() // ctor
430 {
431 time_from = 0;
432 time_to = 0;
433 n_bytes = 0;
434
437 write_max_size = 0;
438 write_min_size = 0;
439 }
440
441 virtual void print(bool print_tags = true) const;
442 virtual ~HsSchema(); // dtor
443 virtual int flush_buffers() = 0;
444 virtual int close() = 0;
445 virtual int write_event(const time_t t, const char* data, const int data_size) = 0;
446 virtual int match_event_var(const char* event_name, const char* var_name, const int var_index);
447 virtual int read_last_written(const time_t timestamp,
448 const int debug,
449 time_t* last_written) = 0;
450 virtual int read_data(const time_t start_time,
451 const time_t end_time,
452 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
453 const int debug,
454 std::vector<time_t>& last_time,
455 MidasHistoryBufferInterface* buffer[]) = 0;
456};
457
459{
460protected:
461 std::vector<HsSchema*> data;
462
463public:
464 ~HsSchemaVector() { // dtor
465 clear();
466 }
467
469 return data[index];
470 }
471
472 unsigned size() const {
473 return data.size();
474 }
475
476 void add(HsSchema* s);
477
478 void clear() {
479 for (unsigned i=0; i<data.size(); i++)
480 if (data[i]) {
481 delete data[i];
482 data[i] = NULL;
483 }
484 data.clear();
485 }
486
487 void print(bool print_tags = true) const {
488 for (unsigned i=0; i<data.size(); i++)
490 }
491
492 HsSchema* find_event(const char* event_name, const time_t timestamp, int debug = 0);
493};
494
496// Base class functions //
498
500{
501 // only report if undersize/oversize happens more than once -
502 // the first occurence is already reported by hs_write_event()
503 if (count_write_undersize > 1) {
504 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch count: %d, expected %d bytes, hs_write_event() called with as few as %d bytes", event_name.c_str(), count_write_undersize, n_bytes, write_min_size);
505 }
506
507 if (count_write_oversize > 1) {
508 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch count: %d, expected %d bytes, hs_write_event() called with as much as %d bytes", event_name.c_str(), count_write_oversize, n_bytes, write_max_size);
509 }
510};
511
513{
514 // schema list "data" is sorted by decreasing "time_from", newest schema first
515
516 //printf("add: %s..%s %s\n", TimeToString(s->time_from).c_str(), TimeToString(s->time_to).c_str(), s->event_name.c_str());
517
518 bool added = false;
519
520 for (auto it = data.begin(); it != data.end(); it++) {
521 if (event_name_cmp((*it)->event_name, s->event_name.c_str())==0) {
522 if (s->time_from == (*it)->time_from) {
523 // duplicate schema, keep the last one added (for file schema it is the newer file)
524 s->time_to = (*it)->time_to;
525 delete (*it);
526 (*it) = s;
527 return;
528 }
529 }
530
531 if (s->time_from > (*it)->time_from) {
532 data.insert(it, s);
533 added = true;
534 break;
535 }
536 }
537
538 if (!added) {
539 data.push_back(s);
540 }
541
542 //time_t oldest_time_from = data.back()->time_from;
543
544 time_t time_to = 0;
545
546 for (auto it = data.begin(); it != data.end(); it++) {
547 if (event_name_cmp((*it)->event_name, s->event_name.c_str())==0) {
548 (*it)->time_to = time_to;
549 time_to = (*it)->time_from;
550
551 //printf("vvv: %s..%s %s\n", TimeToString((*it)->time_from-oldest_time_from).c_str(), TimeToString((*it)->time_to-oldest_time_from).c_str(), (*it)->event_name.c_str());
552 }
553 }
554}
555
556HsSchema* HsSchemaVector::find_event(const char* event_name, time_t t, int debug)
557{
558 HsSchema* ss = NULL;
559
560 if (debug) {
561 printf("find_event: All schema for event %s: (total %d)\n", event_name, (int)data.size());
562 int found = 0;
563 for (unsigned i=0; i<data.size(); i++) {
564 HsSchema* s = data[i];
565 printf("find_event: schema %d name [%s]\n", i, s->event_name.c_str());
566 if (event_name)
567 if (event_name_cmp(s->event_name, event_name)!=0)
568 continue;
569 s->print();
570 found++;
571 }
572 printf("find_event: Found %d schemas for event %s\n", found, event_name);
573
574 //if (found == 0)
575 // abort();
576 }
577
578 for (unsigned i=0; i<data.size(); i++) {
579 HsSchema* s = data[i];
580
581 // wrong event
582 if (event_name)
583 if (event_name_cmp(s->event_name, event_name)!=0)
584 continue;
585
586 // schema is from after the time we are looking for
587 if (s->time_from > t)
588 continue;
589
590 if (!ss)
591 ss = s;
592
593 // remember the newest schema
594 if (s->time_from > ss->time_from)
595 ss = s;
596 }
597
598 // try to find
599 for (unsigned i=0; i<data.size(); i++) {
600 HsSchema* s = data[i];
601
602 // wrong event
603 if (event_name)
604 if (event_name_cmp(s->event_name, event_name)!=0)
605 continue;
606
607 // schema is from after the time we are looking for
608 if (s->time_from > t)
609 continue;
610
611 if (!ss)
612 ss = s;
613
614 // remember the newest schema
615 if (s->time_from > ss->time_from)
616 ss = s;
617 }
618
619 if (debug) {
620 if (ss) {
621 printf("find_event: for time %s, returning:\n", TimeToString(t).c_str());
622 ss->print();
623 } else {
624 printf("find_event: for time %s, nothing found:\n", TimeToString(t).c_str());
625 }
626 }
627
628 return ss;
629}
630
632// Sql interface class //
634
635class SqlBase
636{
637public:
641
642 SqlBase() { // ctor
643 fDebug = 0;
644 fIsConnected = false;
646 };
647
648 virtual ~SqlBase() { // dtor
649 // confirm that the destructor of the concrete class
650 // disconnected the database
651 assert(!fIsConnected);
652 fDebug = 0;
653 fIsConnected = false;
654 }
655
656 virtual int Connect(const char* path) = 0;
657 virtual int Disconnect() = 0;
658 virtual bool IsConnected() = 0;
659
660 virtual int ListTables(std::vector<std::string> *plist) = 0;
661 virtual int ListColumns(const char* table_name, std::vector<std::string> *plist) = 0;
662
663 // sql commands
664 virtual int Exec(const char* table_name, const char* sql) = 0;
665 virtual int ExecDisconnected(const char* table_name, const char* sql) = 0;
666
667 // queries
668 virtual int Prepare(const char* table_name, const char* sql) = 0;
669 virtual int Step() = 0;
670 virtual const char* GetText(int column) = 0;
671 virtual time_t GetTime(int column) = 0;
672 virtual double GetDouble(int column) = 0;
673 virtual int Finalize() = 0;
674
675 // transactions
676 virtual int OpenTransaction(const char* table_name) = 0;
677 virtual int CommitTransaction(const char* table_name) = 0;
678 virtual int RollbackTransaction(const char* table_name) = 0;
679
680 // data types
681 virtual const char* ColumnType(int midas_tid) = 0;
682 virtual bool TypesCompatible(int midas_tid, const char* sql_type) = 0;
683
684 // string quoting
685 virtual std::string QuoteString(const char* s) = 0; // quote text string
686 virtual std::string QuoteId(const char* s) = 0; // quote identifier, such as table or column name
687};
688
690// Schema concrete classes //
692
693struct HsSqlSchema : public HsSchema {
695 std::vector<std::string> disconnected_buffer;
696 std::string table_name;
697 std::vector<std::string> column_names;
698 std::vector<std::string> column_types;
699
700 HsSqlSchema() // ctor
701 {
702 sql = 0;
704 }
705
706 ~HsSqlSchema() // dtor
707 {
708 assert(get_transaction_count() == 0);
709 }
710
711 void print(bool print_tags = true) const;
715 int close_transaction();
717 int close() { return close_transaction(); }
718 int write_event(const time_t t, const char* data, const int data_size);
719 int match_event_var(const char* event_name, const char* var_name, const int var_index);
720 int read_last_written(const time_t timestamp,
721 const int debug,
722 time_t* last_written);
723 int read_data(const time_t start_time,
724 const time_t end_time,
725 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
726 const int debug,
727 std::vector<time_t>& last_time,
729
730private:
731 // Sqlite uses a transaction per table; MySQL uses a single transaction for all tables.
732 // But to support future "single transaction" DBs more easily (e.g. if user wants to
733 // log to both Postgres and MySQL in future), we keep track of the transaction count
734 // per SQL engine.
736 static std::map<SqlBase*, int> global_transaction_count;
737};
738
739std::map<SqlBase*, int> HsSqlSchema::global_transaction_count;
740
741struct HsFileSchema : public HsSchema {
742 std::string file_name;
743 int record_size = 0;
744 int data_offset = 0;
745 int last_size = 0;
746 int writer_fd = -1;
749
750 HsFileSchema() // ctor
751 {
752 // empty
753 }
754
756 {
757 close();
758 record_size = 0;
759 data_offset = 0;
760 last_size = 0;
761 writer_fd = -1;
762 if (record_buffer) {
763 free(record_buffer);
765 }
767 }
768
769 void print(bool print_tags = true) const;
770 int flush_buffers() { return HS_SUCCESS; };
771 int close();
772 int write_event(const time_t t, const char* data, const int data_size);
773 int read_last_written(const time_t timestamp,
774 const int debug,
775 time_t* last_written);
776 int read_data(const time_t start_time,
777 const time_t end_time,
778 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
779 const int debug,
780 std::vector<time_t>& last_time,
782};
783
785// Print functions //
787
789{
790 unsigned nv = this->variables.size();
791 printf("event [%s], time %s..%s, %d variables, %d bytes\n", this->event_name.c_str(), TimeToString(this->time_from).c_str(), TimeToString(this->time_to).c_str(), nv, n_bytes);
792 if (print_tags)
793 for (unsigned j=0; j<nv; j++)
794 printf(" %d: name [%s], type [%s] tid %d, n_data %d, n_bytes %d, offset %d\n", j, this->variables[j].name.c_str(), rpc_tid_name(this->variables[j].type), this->variables[j].type, this->variables[j].n_data, this->variables[j].n_bytes, this->offsets[j]);
795};
796
798{
799 unsigned nv = this->variables.size();
800 printf("event [%s], sql_table [%s], time %s..%s, %d variables, %d bytes\n", this->event_name.c_str(), this->table_name.c_str(), TimeToString(this->time_from).c_str(), TimeToString(this->time_to).c_str(), nv, n_bytes);
801 if (print_tags) {
802 for (unsigned j=0; j<nv; j++) {
803 printf(" %d: name [%s], type [%s] tid %d, n_data %d, n_bytes %d", j, this->variables[j].name.c_str(), rpc_tid_name(this->variables[j].type), this->variables[j].type, this->variables[j].n_data, this->variables[j].n_bytes);
804 printf(", sql_column [%s], sql_type [%s], offset %d", this->column_names[j].c_str(), this->column_types[j].c_str(), this->offsets[j]);
805 printf(", inactive %d", this->variables[j].inactive);
806 printf("\n");
807 }
808 }
809}
810
812{
813 unsigned nv = this->variables.size();
814 printf("event [%s], file_name [%s], time %s..%s, %d variables, %d bytes, dat_offset %d, record_size %d\n", this->event_name.c_str(), this->file_name.c_str(), TimeToString(this->time_from).c_str(), TimeToString(this->time_to).c_str(), nv, n_bytes, data_offset, record_size);
815 if (print_tags) {
816 for (unsigned j=0; j<nv; j++)
817 printf(" %d: name [%s], type [%s] tid %d, n_data %d, n_bytes %d, offset %d\n", j, this->variables[j].name.c_str(), rpc_tid_name(this->variables[j].type), this->variables[j].type, this->variables[j].n_data, this->variables[j].n_bytes, this->offsets[j]);
818 }
819}
820
822// File functions //
824
825#ifdef HAVE_MYSQL
826
828// MYSQL/MariaDB database access //
830
831//#warning !!!HAVE_MYSQL!!!
832
833//#include <my_global.h> // my_global.h removed MySQL 8.0, MariaDB 10.2. K.O.
834#include <mysql.h>
835
836class Mysql: public SqlBase
837{
838public:
839 std::string fConnectString;
840 MYSQL* fMysql;
841
842 // query results
845 int fNumFields;
846
847 // disconnected operation
848 unsigned fMaxDisconnected;
849 std::list<std::string> fDisconnectedBuffer;
853
854 Mysql(); // ctor
855 ~Mysql(); // dtor
856
857 int Connect(const char* path);
858 int Disconnect();
859 bool IsConnected();
860
861 int ConnectTable(const char* table_name);
862
863 int ListTables(std::vector<std::string> *plist);
864 int ListColumns(const char* table_name, std::vector<std::string> *plist);
865
866 int Exec(const char* table_name, const char* sql);
867 int ExecDisconnected(const char* table_name, const char* sql);
868
869 int Prepare(const char* table_name, const char* sql);
870 int Step();
871 const char* GetText(int column);
872 time_t GetTime(int column);
873 double GetDouble(int column);
874 int Finalize();
875
876 int OpenTransaction(const char* table_name);
877 int CommitTransaction(const char* table_name);
878 int RollbackTransaction(const char* table_name);
879
880 const char* ColumnType(int midas_tid);
881 bool TypesCompatible(int midas_tid, const char* sql_type);
882
883 std::string QuoteId(const char* s);
884 std::string QuoteString(const char* s);
885};
886
887Mysql::Mysql() // ctor
888{
889 fMysql = NULL;
890 fResult = NULL;
891 fRow = NULL;
892 fNumFields = 0;
893 fMaxDisconnected = 1000;
894 fNextReconnect = 0;
897 fTransactionPerTable = false;
898}
899
900Mysql::~Mysql() // dtor
901{
902 Disconnect();
903 fMysql = NULL;
904 fResult = NULL;
905 fRow = NULL;
906 fNumFields = 0;
907 if (fDisconnectedBuffer.size() > 0) {
908 cm_msg(MINFO, "Mysql::~Mysql", "Lost %d history entries accumulated while disconnected from the database", (int)fDisconnectedBuffer.size());
910 }
911}
912
913int Mysql::Connect(const char* connect_string)
914{
915 if (fIsConnected)
916 Disconnect();
917
918 fConnectString = connect_string;
919
920 if (fDebug) {
921 cm_msg(MINFO, "Mysql::Connect", "Connecting to Mysql database specified by \'%s\'", connect_string);
923 }
924
925 std::string host_name;
926 std::string user_name;
927 std::string user_password;
928 std::string db_name;
929 int tcp_port = 0;
930 std::string unix_socket;
931 std::string buffer;
932
933 FILE* fp = fopen(connect_string, "r");
934 if (!fp) {
935 cm_msg(MERROR, "Mysql::Connect", "Cannot read MYSQL connection parameters from \'%s\', fopen() error %d (%s)", connect_string, errno, strerror(errno));
936 return DB_FILE_ERROR;
937 }
938
939 while (1) {
940 char buf[256];
941 char* s = fgets(buf, sizeof(buf), fp);
942 if (!s)
943 break; // EOF
944
945 char*ss;
946 // kill trailing \n and \r
947 ss = strchr(s, '\n');
948 if (ss) *ss = 0;
949 ss = strchr(s, '\r');
950 if (ss) *ss = 0;
951
952 //printf("line [%s]\n", s);
953
954 if (strncasecmp(s, "server=", 7)==0)
955 host_name = skip_spaces(s + 7);
956 if (strncasecmp(s, "port=", 5)==0)
957 tcp_port = atoi(skip_spaces(s + 5));
958 if (strncasecmp(s, "database=", 9)==0)
959 db_name = skip_spaces(s + 9);
960 if (strncasecmp(s, "socket=", 7)==0)
961 unix_socket = skip_spaces(s + 7);
962 if (strncasecmp(s, "user=", 5)==0)
963 user_name = skip_spaces(s + 5);
964 if (strncasecmp(s, "password=", 9)==0)
965 user_password = skip_spaces(s + 9);
966 if (strncasecmp(s, "buffer=", 7)==0)
967 buffer = skip_spaces(s + 7);
968 }
969
970 fclose(fp);
971
972 int buffer_int = atoi(buffer.c_str());
973
974 if (buffer_int > 0 && buffer_int < 1000000)
976
977 if (fDebug)
978 printf("Mysql::Connect: connecting to server [%s] port %d, unix socket [%s], database [%s], user [%s], password [%s], buffer [%d]\n", host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(), user_password.c_str(), fMaxDisconnected);
979
980 if (!fMysql) {
982 if (!fMysql) {
983 return DB_FILE_ERROR;
984 }
985 }
986
988
989 if (mysql_real_connect(fMysql, host_name.c_str(), user_name.c_str(), user_password.c_str(), db_name.c_str(), tcp_port, unix_socket.c_str(), client_flag) == NULL) {
990 cm_msg(MERROR, "Mysql::Connect", "mysql_real_connect() to host [%s], port %d, unix socket [%s], database [%s], user [%s], password [%s]: error %d (%s)", host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", mysql_errno(fMysql), mysql_error(fMysql));
991 Disconnect();
992 return DB_FILE_ERROR;
993 }
994
995 int status;
996
997 // FIXME:
998 //my_bool reconnect = 0;
999 //mysql_options(&mysql, MYSQL_OPT_RECONNECT, &reconnect);
1000
1001 status = Exec("(notable)", "SET SESSION sql_mode='ANSI'");
1002 if (status != DB_SUCCESS) {
1003 cm_msg(MERROR, "Mysql::Connect", "Cannot set ANSI mode, nothing will work");
1004 Disconnect();
1005 return DB_FILE_ERROR;
1006 }
1007
1008 if (fDebug) {
1009 cm_msg(MINFO, "Mysql::Connect", "Connected to a MySQL database on host [%s], port %d, unix socket [%s], database [%s], user [%s], password [%s], buffer %d", host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", fMaxDisconnected);
1011 }
1012
1013 fIsConnected = true;
1014
1015 int count = 0;
1016 while (fDisconnectedBuffer.size() > 0) {
1017 status = Exec("(flush)", fDisconnectedBuffer.front().c_str());
1018 if (status != DB_SUCCESS) {
1019 return status;
1020 }
1021 fDisconnectedBuffer.pop_front();
1022 count++;
1023 }
1024
1025 if (count > 0) {
1026 cm_msg(MINFO, "Mysql::Connect", "Saved %d, lost %d history events accumulated while disconnected from the database", count, fDisconnectedLost);
1028 }
1029
1030 assert(fDisconnectedBuffer.size() == 0);
1032
1033 return DB_SUCCESS;
1034}
1035
1036int Mysql::Disconnect()
1037{
1038 if (fRow) {
1039 // FIXME: mysql_free_result(fResult);
1040 }
1041
1042 if (fResult)
1044
1045 if (fMysql)
1047
1048 fMysql = NULL;
1049 fResult = NULL;
1050 fRow = NULL;
1051
1052 fIsConnected = false;
1053 return DB_SUCCESS;
1054}
1055
1056bool Mysql::IsConnected()
1057{
1058 return fIsConnected;
1059}
1060
1061int Mysql::OpenTransaction(const char* table_name)
1062{
1063 return Exec(table_name, "START TRANSACTION");
1064 return DB_SUCCESS;
1065}
1066
1067int Mysql::CommitTransaction(const char* table_name)
1068{
1069 Exec(table_name, "COMMIT");
1070 return DB_SUCCESS;
1071}
1072
1073int Mysql::RollbackTransaction(const char* table_name)
1074{
1075 Exec(table_name, "ROLLBACK");
1076 return DB_SUCCESS;
1077}
1078
1079int Mysql::ListTables(std::vector<std::string> *plist)
1080{
1081 if (!fIsConnected)
1082 return DB_FILE_ERROR;
1083
1084 if (fDebug)
1085 printf("Mysql::ListTables!\n");
1086
1087 int status;
1088
1090
1091 if (fResult == NULL) {
1092 cm_msg(MERROR, "Mysql::ListTables", "mysql_list_tables() error %d (%s)", mysql_errno(fMysql), mysql_error(fMysql));
1093 return DB_FILE_ERROR;
1094 }
1095
1097
1098 while (1) {
1099 status = Step();
1100 if (status != DB_SUCCESS)
1101 break;
1102 std::string tn = GetText(0);
1103 plist->push_back(tn);
1104 };
1105
1106 status = Finalize();
1107
1108 return DB_SUCCESS;
1109}
1110
1111int Mysql::ListColumns(const char* table_name, std::vector<std::string> *plist)
1112{
1113 if (!fIsConnected)
1114 return DB_FILE_ERROR;
1115
1116 if (fDebug)
1117 printf("Mysql::ListColumns for table \'%s\'\n", table_name);
1118
1119 int status;
1120
1121 std::string cmd;
1122 cmd += "SHOW COLUMNS FROM ";
1123 cmd += QuoteId(table_name);
1124 cmd += ";";
1125
1126 status = Prepare(table_name, cmd.c_str());
1127 if (status != DB_SUCCESS)
1128 return status;
1129
1131
1132 while (1) {
1133 status = Step();
1134 if (status != DB_SUCCESS)
1135 break;
1136 std::string cn = GetText(0);
1137 std::string ct = GetText(1);
1138 plist->push_back(cn);
1139 plist->push_back(ct);
1140 //printf("cn [%s]\n", cn.c_str());
1141 //for (int i=0; i<fNumFields; i++)
1142 //printf(" field[%d]: [%s]\n", i, GetText(i));
1143 };
1144
1145 status = Finalize();
1146
1147 return DB_SUCCESS;
1148}
1149
1150int Mysql::Exec(const char* table_name, const char* sql)
1151{
1152 if (fDebug)
1153 printf("Mysql::Exec(%s, %s)\n", table_name, sql);
1154
1155 // FIXME: match Sqlite::Exec() return values:
1156 // return values:
1157 // DB_SUCCESS
1158 // DB_FILE_ERROR: not connected
1159 // DB_KEY_EXIST: "table already exists"
1160
1161 if (!fMysql)
1162 return DB_FILE_ERROR;
1163
1164 assert(fMysql);
1165 assert(fResult == NULL); // there should be no unfinalized queries
1166 assert(fRow == NULL);
1167
1168 if (mysql_query(fMysql, sql)) {
1169 if (mysql_errno(fMysql) == 1050) { // "Table already exists"
1170 return DB_KEY_EXIST;
1171 }
1172 if (mysql_errno(fMysql) == 1146) { // "Table does not exist"
1173 return DB_FILE_ERROR;
1174 }
1175 cm_msg(MERROR, "Mysql::Exec", "mysql_query(%s) error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1176 if (mysql_errno(fMysql) == 1060) // "Duplicate column name"
1177 return DB_KEY_EXIST;
1178 if (mysql_errno(fMysql) == 2006) { // "MySQL server has gone away"
1179 Disconnect();
1180 return ExecDisconnected(table_name, sql);
1181 }
1182 return DB_FILE_ERROR;
1183 }
1184
1185 return DB_SUCCESS;
1186}
1187
1188int Mysql::ExecDisconnected(const char* table_name, const char* sql)
1189{
1190 if (fDebug)
1191 printf("Mysql::ExecDisconnected(%s, %s)\n", table_name, sql);
1192
1194 fDisconnectedBuffer.push_back(sql);
1195 if (fDisconnectedBuffer.size() >= fMaxDisconnected) {
1196 cm_msg(MERROR, "Mysql::ExecDisconnected", "Error: Disconnected database buffer overflow, size %d, subsequent events are lost", (int)fDisconnectedBuffer.size());
1197 }
1198 } else {
1200 }
1201
1202 time_t now = time(NULL);
1203
1204 if (fNextReconnect == 0 || now >= fNextReconnect) {
1205 int status = Connect(fConnectString.c_str());
1206 if (status == DB_SUCCESS) {
1207 fNextReconnect = 0;
1209 } else {
1210 if (fNextReconnectDelaySec == 0) {
1212 } else if (fNextReconnectDelaySec < 10*60) {
1214 }
1215 if (fDebug) {
1216 cm_msg(MINFO, "Mysql::ExecDisconnected", "Next reconnect attempt in %d sec, history events buffered %d, lost %d", fNextReconnectDelaySec, (int)fDisconnectedBuffer.size(), fDisconnectedLost);
1218 }
1220 }
1221 }
1222
1223 return DB_SUCCESS;
1224}
1225
1226int Mysql::Prepare(const char* table_name, const char* sql)
1227{
1228 if (fDebug)
1229 printf("Mysql::Prepare(%s, %s)\n", table_name, sql);
1230
1231 if (!fMysql)
1232 return DB_FILE_ERROR;
1233
1234 assert(fMysql);
1235 assert(fResult == NULL); // there should be no unfinalized queries
1236 assert(fRow == NULL);
1237
1238 // if (mysql_query(fMysql, sql)) {
1239 // cm_msg(MERROR, "Mysql::Prepare", "mysql_query(%s) error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1240 // return DB_FILE_ERROR;
1241 //}
1242
1243 // Check if the connection to MySQL timed out; fix from B. Smith
1244 int status = mysql_query(fMysql, sql);
1245 if (status) {
1246 if (mysql_errno(fMysql) == 2006 || mysql_errno(fMysql) == 2013) {
1247 // "MySQL server has gone away" or "Lost connection to MySQL server during query"
1248 status = Connect(fConnectString.c_str());
1249 if (status == DB_SUCCESS) {
1250 // Retry after reconnecting
1251 status = mysql_query(fMysql, sql);
1252 } else {
1253 cm_msg(MERROR, "Mysql::Prepare", "mysql_query(%s) - MySQL server has gone away, and couldn't reconnect - %d", sql, status);
1254 return DB_FILE_ERROR;
1255 }
1256 }
1257 if (status) {
1258 cm_msg(MERROR, "Mysql::Prepare", "mysql_query(%s) error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1259 return DB_FILE_ERROR;
1260 }
1261 cm_msg(MINFO, "Mysql::Prepare", "Reconnected to MySQL after long inactivity.");
1262 }
1263
1265 //fResult = mysql_use_result(fMysql); // cannot use this because it blocks writing into table
1266
1267 if (!fResult) {
1268 cm_msg(MERROR, "Mysql::Prepare", "mysql_store_result(%s) returned NULL, error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1269 return DB_FILE_ERROR;
1270 }
1271
1273
1274 //printf("num fields %d\n", fNumFields);
1275
1276 return DB_SUCCESS;
1277}
1278
1279int Mysql::Step()
1280{
1281 if (/* DISABLES CODE */ (0) && fDebug)
1282 printf("Mysql::Step()\n");
1283
1284 assert(fMysql);
1285 assert(fResult);
1286
1288
1289 if (fRow)
1290 return DB_SUCCESS;
1291
1292 if (mysql_errno(fMysql) == 0)
1293 return DB_NO_MORE_SUBKEYS;
1294
1295 cm_msg(MERROR, "Mysql::Step", "mysql_fetch_row() error %d (%s)", mysql_errno(fMysql), mysql_error(fMysql));
1296
1297 return DB_FILE_ERROR;
1298}
1299
1300const char* Mysql::GetText(int column)
1301{
1302 assert(fMysql);
1303 assert(fResult);
1304 assert(fRow);
1305 assert(fNumFields > 0);
1306 assert(column >= 0);
1307 assert(column < fNumFields);
1308 if (fRow[column] == NULL)
1309 return "";
1310 return fRow[column];
1311}
1312
1313double Mysql::GetDouble(int column)
1314{
1315 return atof(GetText(column));
1316}
1317
1318time_t Mysql::GetTime(int column)
1319{
1320 return strtoul(GetText(column), NULL, 0);
1321}
1322
1323int Mysql::Finalize()
1324{
1325 assert(fMysql);
1326 assert(fResult);
1327
1329 fResult = NULL;
1330 fRow = NULL;
1331 fNumFields = 0;
1332
1333 return DB_SUCCESS;
1334}
1335
1336const char* Mysql::ColumnType(int midas_tid)
1337{
1338 assert(midas_tid>=0);
1339 assert(midas_tid<TID_LAST);
1340 return sql_type_mysql[midas_tid];
1341}
1342
1343bool Mysql::TypesCompatible(int midas_tid, const char* sql_type)
1344{
1345 if (/* DISABLES CODE */ (0))
1346 printf("compare types midas \'%s\'=\'%s\' and sql \'%s\'\n", rpc_tid_name(midas_tid), ColumnType(midas_tid), sql_type);
1347
1348 //if (sql2midasType_mysql(sql_type) == midas_tid)
1349 // return true;
1350
1351 if (strcasecmp(ColumnType(midas_tid), sql_type) == 0)
1352 return true;
1353
1354 // permit writing FLOAT into DOUBLE
1355 if (midas_tid==TID_FLOAT && strcmp(sql_type, "double")==0)
1356 return true;
1357
1358 // T2K quirk!
1359 // permit writing BYTE into signed tinyint
1360 if (midas_tid==TID_BYTE && strcmp(sql_type, "tinyint")==0)
1361 return true;
1362
1363 // T2K quirk!
1364 // permit writing WORD into signed tinyint
1365 if (midas_tid==TID_WORD && strcmp(sql_type, "tinyint")==0)
1366 return true;
1367
1368 // mysql quirk!
1369 //if (midas_tid==TID_DWORD && strcmp(sql_type, "int(10) unsigned")==0)
1370 // return true;
1371
1372 if (/* DISABLES CODE */ (0))
1373 printf("type mismatch!\n");
1374
1375 return false;
1376}
1377
1378std::string Mysql::QuoteId(const char* s)
1379{
1380 std::string q;
1381 q += "`";
1382 q += s;
1383 q += "`";
1384 return q;
1385}
1386
1387std::string Mysql::QuoteString(const char* s)
1388{
1389 std::string q;
1390 q += "\'";
1391 q += s;
1392#if 0
1393 while (int c = *s++) {
1394 if (c == '\'') {
1395 q += "\\'";
1396 } if (c == '"') {
1397 q += "\\\"";
1398 } else if (isprint(c)) {
1399 q += c;
1400 } else {
1401 char buf[256];
1402 sprintf(buf, "\\\\x%02x", c&0xFF);
1403 q += buf;
1404 }
1405 }
1406#endif
1407 q += "\'";
1408 return q;
1409}
1410
1411#endif // HAVE_MYSQL
1412
1413#ifdef HAVE_PGSQL
1414
1416// PostgreSQL database access //
1418
1419//#warning !!!HAVE_PGSQL!!!
1420
1421#include <libpq-fe.h>
1422
1423class Pgsql: public SqlBase
1424{
1425public:
1426 std::string fConnectString;
1427 int fDownsample;
1428 PGconn* fPgsql;
1429
1430 // query results
1432 int fNumFields;
1433 int fRow;
1434
1435 // disconnected operation
1436 unsigned fMaxDisconnected;
1437 std::list<std::string> fDisconnectedBuffer;
1441
1442 Pgsql(); // ctor
1443 ~Pgsql(); // dtor
1444
1445 int Connect(const char* path);
1446 int Disconnect();
1447 bool IsConnected();
1448
1449 int ConnectTable(const char* table_name);
1450
1451 int ListTables(std::vector<std::string> *plist);
1452 int ListColumns(const char* table_name, std::vector<std::string> *plist);
1453
1454 int Exec(const char* table_name, const char* sql);
1455 int ExecDisconnected(const char* table_name, const char* sql);
1456
1457 int Prepare(const char* table_name, const char* sql);
1458 std::string BuildDownsampleQuery(const time_t start_time, const time_t end_time, const int npoints, const char* table_name, const char* column_name);
1459 int Step();
1460 const char* GetText(int column);
1461 time_t GetTime(int column);
1462 double GetDouble(int column);
1463 int Finalize();
1464
1465 int OpenTransaction(const char* table_name);
1466 int CommitTransaction(const char* table_name);
1467 int RollbackTransaction(const char* table_name);
1468
1469 const char* ColumnType(int midas_tid);
1470 bool TypesCompatible(int midas_tid, const char* sql_type);
1471
1472 std::string QuoteId(const char* s);
1473 std::string QuoteString(const char* s);
1474};
1475
1476Pgsql::Pgsql() // ctor
1477{
1478 fPgsql = NULL;
1479 fDownsample = 0;
1480 fResult = NULL;
1481 fRow = -1;
1482 fNumFields = 0;
1483 fMaxDisconnected = 1000;
1484 fNextReconnect = 0;
1487 fTransactionPerTable = false;
1488}
1489
1490Pgsql::~Pgsql() // dtor
1491{
1492 Disconnect();
1493 if(fResult)
1495 fRow = -1;
1496 fNumFields = 0;
1497 if (fDisconnectedBuffer.size() > 0) {
1498 cm_msg(MINFO, "Pgsql::~Pgsql", "Lost %d history entries accumulated while disconnected from the database", (int)fDisconnectedBuffer.size());
1500 }
1501}
1502
1503int Pgsql::Connect(const char* connect_string)
1504{
1505 if (fIsConnected)
1506 Disconnect();
1507
1508 fConnectString = connect_string;
1509
1510 if (fDebug) {
1511 cm_msg(MINFO, "Pgsql::Connect", "Connecting to PostgreSQL database specified by \'%s\'", connect_string);
1513 }
1514
1515 std::string host_name;
1516 std::string user_name;
1517 std::string user_password;
1518 std::string db_name;
1519 std::string tcp_port;
1520 std::string unix_socket;
1521 std::string buffer;
1522
1523 FILE* fp = fopen(connect_string, "r");
1524 if (!fp) {
1525 cm_msg(MERROR, "Pgsql::Connect", "Cannot read PostgreSQL connection parameters from \'%s\', fopen() error %d (%s)", connect_string, errno, strerror(errno));
1526 return DB_FILE_ERROR;
1527 }
1528
1529 while (1) {
1530 char buf[256];
1531 char* s = fgets(buf, sizeof(buf), fp);
1532 if (!s)
1533 break; // EOF
1534
1535 char*ss;
1536 // kill trailing \n and \r
1537 ss = strchr(s, '\n');
1538 if (ss) *ss = 0;
1539 ss = strchr(s, '\r');
1540 if (ss) *ss = 0;
1541
1542 //printf("line [%s]\n", s);
1543
1544 if (strncasecmp(s, "server=", 7)==0)
1545 host_name = skip_spaces(s + 7);
1546 if (strncasecmp(s, "port=", 5)==0)
1547 tcp_port = skip_spaces(s + 5);
1548 if (strncasecmp(s, "database=", 9)==0)
1549 db_name = skip_spaces(s + 9);
1550 if (strncasecmp(s, "socket=", 7)==0)
1551 unix_socket = skip_spaces(s + 7);
1552 if (strncasecmp(s, "user=", 5)==0)
1553 user_name = skip_spaces(s + 5);
1554 if (strncasecmp(s, "password=", 9)==0)
1555 user_password = skip_spaces(s + 9);
1556 if (strncasecmp(s, "buffer=", 7)==0)
1557 buffer = skip_spaces(s + 7);
1558 }
1559
1560 fclose(fp);
1561
1562 int buffer_int = atoi(buffer.c_str());
1563
1564 if (buffer_int > 0 && buffer_int < 1000000)
1566
1567 if (fDebug)
1568 printf("Pgsql::Connect: connecting to server [%s] port %s, unix socket [%s], database [%s], user [%s], password [%s], buffer [%d]\n", host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(), user_password.c_str(), fMaxDisconnected);
1569
1570 fPgsql = PQsetdbLogin(host_name.c_str(), tcp_port.c_str(), NULL, NULL, db_name.c_str(), user_name.c_str(), user_password.c_str());
1571 if (PQstatus(fPgsql) != CONNECTION_OK) {
1572 std::string msg(PQerrorMessage(fPgsql));
1573 msg.erase(std::remove(msg.begin(), msg.end(), '\n'), msg.end());
1574 cm_msg(MERROR, "Pgsql::Connect", "PQsetdbLogin() to host [%s], port %s, unix socket [%s], database [%s], user [%s], password [%s]: error (%s)", host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", msg.c_str());
1575 Disconnect();
1576 return DB_FILE_ERROR;
1577 }
1578
1579 int status;
1580
1581 if (fDebug) {
1582 cm_msg(MINFO, "Pgsql::Connect", "Connected to a PostgreSQL database on host [%s], port %s, unix socket [%s], database [%s], user [%s], password [%s], buffer %d", host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(), "xxx", fMaxDisconnected);
1584 }
1585
1586 fIsConnected = true;
1587
1588 int count = 0;
1589 while (fDisconnectedBuffer.size() > 0) {
1590 status = Exec("(flush)", fDisconnectedBuffer.front().c_str());
1591 if (status != DB_SUCCESS) {
1592 return status;
1593 }
1594 fDisconnectedBuffer.pop_front();
1595 count++;
1596 }
1597
1598 if (count > 0) {
1599 cm_msg(MINFO, "Pgsql::Connect", "Saved %d, lost %d history events accumulated while disconnected from the database", count, fDisconnectedLost);
1601 }
1602
1603 assert(fDisconnectedBuffer.size() == 0);
1605
1606 if (fDownsample) {
1607 status = Prepare("pg_extensions", "select extname from pg_extension where extname = 'timescaledb';");
1608
1609 if (status != DB_SUCCESS || PQntuples(fResult) == 0) {
1610 cm_msg(MERROR, "Pgsql::Connect", "TimescaleDB extension not installed");
1611 return DB_FILE_ERROR;
1612 }
1613 Finalize();
1614
1615 status = Prepare("pg_extensions", "select extname from pg_extension where extname = 'timescaledb_toolkit';");
1616
1617 if (status != DB_SUCCESS || PQntuples(fResult) == 0) {
1618 cm_msg(MERROR, "Pgsql::Connect", "TimescaleDB_toolkit extension not installed");
1619 return DB_FILE_ERROR;
1620 }
1621 Finalize();
1622
1623 cm_msg(MINFO, "Pgsql::Connect", "TimescaleDB extensions found - downsampling enabled");
1624 }
1625
1626 return DB_SUCCESS;
1627}
1628
1629int Pgsql::Disconnect()
1630{
1631 if (fPgsql)
1633
1634 fPgsql = NULL;
1635 fRow = -1;
1636
1637 fIsConnected = false;
1638 return DB_SUCCESS;
1639}
1640
1641bool Pgsql::IsConnected()
1642{
1643 return fIsConnected;
1644}
1645
1646int Pgsql::OpenTransaction(const char* table_name)
1647{
1648 return Exec(table_name, "BEGIN TRANSACTION;");
1649}
1650
1651int Pgsql::CommitTransaction(const char* table_name)
1652{
1653 return Exec(table_name, "COMMIT;");
1654}
1655
1656int Pgsql::RollbackTransaction(const char* table_name)
1657{
1658 return Exec(table_name, "ROLLBACK;");
1659}
1660
1661int Pgsql::ListTables(std::vector<std::string> *plist)
1662{
1663 if (!fIsConnected)
1664 return DB_FILE_ERROR;
1665
1666 if (fDebug)
1667 printf("Pgsql::ListTables!\n");
1668
1669 int status = Prepare("pg_tables", "select tablename from pg_tables where schemaname = 'public';");
1670
1671 if (status != DB_SUCCESS) {
1672 cm_msg(MERROR, "Pgsql::ListTables", "error %s (%s)", PQresStatus(PQresultStatus(fResult)), PQresultErrorMessage(fResult));
1673 return DB_FILE_ERROR;
1674 }
1675
1676 while (1) {
1677 if (Step() != DB_SUCCESS)
1678 break;
1679 std::string tn = GetText(0);
1680 plist->push_back(tn);
1681 };
1682
1683 Finalize();
1684
1685 return DB_SUCCESS;
1686}
1687
1688int Pgsql::ListColumns(const char* table_name, std::vector<std::string> *plist)
1689{
1690 if (!fIsConnected)
1691 return DB_FILE_ERROR;
1692
1693 if (fDebug)
1694 printf("Pgsql::ListColumns for table \'%s\'\n", table_name);
1695
1696 std::string cmd;
1697 cmd += "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = ";
1698 cmd += QuoteString(table_name);
1699 cmd += ";";
1700
1701 int status = Prepare(table_name, cmd.c_str());
1702 if (status != DB_SUCCESS)
1703 return status;
1704
1706
1707 while (1) {
1708 if (Step() != DB_SUCCESS)
1709 break;
1710 std::string cn = GetText(0);
1711 std::string ct = GetText(1);
1712 plist->push_back(cn);
1713 plist->push_back(ct);
1714 };
1715
1716 Finalize();
1717
1718 return DB_SUCCESS;
1719}
1720
1721int Pgsql::Exec(const char* table_name, const char* sql)
1722{
1723 if (fDebug)
1724 printf("Pgsql::Exec(%s, %s)\n", table_name, sql);
1725
1726 if (!fPgsql)
1727 return DB_FILE_ERROR;
1728
1729 assert(fPgsql);
1730 assert(fRow == -1);
1731
1732 fResult = PQexec(fPgsql, sql);
1734 if(err != PGRES_TUPLES_OK) {
1735 if(err == PGRES_FATAL_ERROR) {
1736 // handle fatal error
1737 if(strstr(PQresultErrorMessage(fResult), "already exists"))
1738 return DB_KEY_EXIST;
1739 else return DB_FILE_ERROR;
1740 }
1741
1743 Disconnect();
1744 return ExecDisconnected(table_name, sql);
1745 }
1746 }
1747
1748 return DB_SUCCESS;
1749}
1750
1751int Pgsql::ExecDisconnected(const char* table_name, const char* sql)
1752{
1753 if (fDebug)
1754 printf("Pgsql::ExecDisconnected(%s, %s)\n", table_name, sql);
1755
1757 fDisconnectedBuffer.push_back(sql);
1758 if (fDisconnectedBuffer.size() >= fMaxDisconnected) {
1759 cm_msg(MERROR, "Pgsql::ExecDisconnected", "Error: Disconnected database buffer overflow, size %d, subsequent events are lost", (int)fDisconnectedBuffer.size());
1760 }
1761 } else {
1763 }
1764
1765 time_t now = time(NULL);
1766
1767 if (fNextReconnect == 0 || now >= fNextReconnect) {
1768 int status = Connect(fConnectString.c_str());
1769 if (status == DB_SUCCESS) {
1770 fNextReconnect = 0;
1772 } else {
1773 if (fNextReconnectDelaySec == 0) {
1775 } else if (fNextReconnectDelaySec < 10*60) {
1777 }
1778 if (fDebug) {
1779 cm_msg(MINFO, "Pgsql::ExecDisconnected", "Next reconnect attempt in %d sec, history events buffered %d, lost %d", fNextReconnectDelaySec, (int)fDisconnectedBuffer.size(), fDisconnectedLost);
1781 }
1783 }
1784 }
1785
1786 return DB_SUCCESS;
1787}
1788
1789int Pgsql::Prepare(const char* table_name, const char* sql)
1790{
1791 if (fDebug)
1792 printf("Pgsql::Prepare(%s, %s)\n", table_name, sql);
1793
1794 if (!fPgsql)
1795 return DB_FILE_ERROR;
1796
1797 assert(fPgsql);
1798 //assert(fResult==NULL);
1799 assert(fRow == -1);
1800
1801 fResult = PQexec(fPgsql, sql);
1802 if (PQstatus(fPgsql) == CONNECTION_BAD) {
1803 // lost connection to server
1804 int status = Connect(fConnectString.c_str());
1805 if (status == DB_SUCCESS) {
1806 // Retry after reconnecting
1807 fResult = PQexec(fPgsql, sql);
1808 } else {
1809 cm_msg(MERROR, "Pgsql::Prepare", "PQexec(%s) PostgreSQL server has gone away, and couldn't reconnect - %d", sql, status);
1810 return DB_FILE_ERROR;
1811 }
1812 if (status) {
1813 cm_msg(MERROR, "Pgsql::Prepare", "PQexec(%s) error %s", sql, PQresStatus(PQresultStatus(fResult)));
1814 return DB_FILE_ERROR;
1815 }
1816 cm_msg(MINFO, "Pgsql::Prepare", "Reconnected to PostgreSQL after long inactivity.");
1817 }
1818
1820
1821 return DB_SUCCESS;
1822}
1823
1824std::string Pgsql::BuildDownsampleQuery(const time_t start_time, const time_t end_time, const int npoints,
1825 const char* table_name, const char* column_name)
1826{
1827 std::string cmd;
1828 cmd += "SELECT extract(epoch from time::TIMESTAMPTZ) as _i_time, value ";
1829
1830 cmd += " FROM unnest(( SELECT lttb";
1831 cmd += "(_t_time, ";
1832 cmd += column_name;
1833 cmd += ", ";
1834 cmd += std::to_string(npoints);
1835 cmd += ") ";
1836 cmd += "FROM ";
1837 cmd += QuoteId(table_name);
1838 cmd += " WHERE _t_time BETWEEN ";
1839 cmd += "to_timestamp(";
1840 cmd += TimeToString(start_time);
1841 cmd += ") AND to_timestamp(";
1842 cmd += TimeToString(end_time);
1843 cmd += ") )) ORDER BY time;";
1844
1845 return cmd;
1846}
1847
1848int Pgsql::Step()
1849{
1850 assert(fPgsql);
1851 assert(fResult);
1852
1853 fRow++;
1854
1855 if (fRow == PQntuples(fResult))
1856 return DB_NO_MORE_SUBKEYS;
1857
1858 return DB_SUCCESS;
1859}
1860
1861const char* Pgsql::GetText(int column)
1862{
1863 assert(fPgsql);
1864 assert(fResult);
1865 assert(fNumFields > 0);
1866 assert(column >= 0);
1867 assert(column < fNumFields);
1868
1869 return PQgetvalue(fResult, fRow, column);
1870}
1871
1872double Pgsql::GetDouble(int column)
1873{
1874 return atof(GetText(column));
1875}
1876
1877time_t Pgsql::GetTime(int column)
1878{
1879 return strtoul(GetText(column), NULL, 0);
1880}
1881
1882int Pgsql::Finalize()
1883{
1884 assert(fPgsql);
1885 assert(fResult);
1886
1887 fRow = -1;
1888 fNumFields = 0;
1889
1890 return DB_SUCCESS;
1891}
1892
1893const char* Pgsql::ColumnType(int midas_tid)
1894{
1895 assert(midas_tid>=0);
1896 assert(midas_tid<TID_LAST);
1897 return sql_type_pgsql[midas_tid];
1898}
1899
1900bool Pgsql::TypesCompatible(int midas_tid, const char* sql_type)
1901{
1902 if (/* DISABLES CODE */ (0))
1903 printf("compare types midas \'%s\'=\'%s\' and sql \'%s\'\n", rpc_tid_name(midas_tid), ColumnType(midas_tid), sql_type);
1904
1905 //if (sql2midasType_mysql(sql_type) == midas_tid)
1906 // return true;
1907
1908 if (strcasecmp(ColumnType(midas_tid), sql_type) == 0)
1909 return true;
1910
1911 // permit writing FLOAT into DOUBLE
1912 if (midas_tid==TID_FLOAT && strcmp(sql_type, "double precision")==0)
1913 return true;
1914
1915 // T2K quirk!
1916 // permit writing BYTE into signed tinyint
1917 if (midas_tid==TID_BYTE && strcmp(sql_type, "integer")==0)
1918 return true;
1919
1920 // T2K quirk!
1921 // permit writing WORD into signed tinyint
1922 if (midas_tid==TID_WORD && strcmp(sql_type, "integer")==0)
1923 return true;
1924
1925 if (/* DISABLES CODE */ (0))
1926 printf("type mismatch!\n");
1927
1928 return false;
1929}
1930
1931std::string Pgsql::QuoteId(const char* s)
1932{
1933 std::string q;
1934 q += '"';
1935 q += s;
1936 q += '"';
1937 return q;
1938}
1939
1940std::string Pgsql::QuoteString(const char* s)
1941{
1942 std::string q;
1943 q += '\'';
1944 q += s;
1945 q += '\'';
1946 return q;
1947}
1948
1949#endif // HAVE_PGSQL
1950
1951#ifdef HAVE_SQLITE
1952
1954// SQLITE database access //
1956
1957#include <sqlite3.h>
1958
1959typedef std::map<std::string, sqlite3*> DbMap;
1960
1961class Sqlite: public SqlBase
1962{
1963public:
1964 std::string fPath;
1965
1966 DbMap fMap;
1967
1968 // temporary storage of query data
1971
1972 Sqlite(); // ctor
1973 ~Sqlite(); // dtor
1974
1975 int Connect(const char* path);
1976 int Disconnect();
1977 bool IsConnected();
1978
1979 int ConnectTable(const char* table_name);
1980 sqlite3* GetTable(const char* table_name);
1981
1982 int ListTables(std::vector<std::string> *plist);
1983 int ListColumns(const char* table_name, std::vector<std::string> *plist);
1984
1985 int Exec(const char* table_name, const char* sql);
1986 int ExecDisconnected(const char* table_name, const char* sql);
1987
1988 int Prepare(const char* table_name, const char* sql);
1989 int Step();
1990 const char* GetText(int column);
1991 time_t GetTime(int column);
1992 double GetDouble(int column);
1993 int Finalize();
1994
1995 int OpenTransaction(const char* table_name);
1996 int CommitTransaction(const char* table_name);
1997 int RollbackTransaction(const char* table_name);
1998
1999 const char* ColumnType(int midas_tid);
2000 bool TypesCompatible(int midas_tid, const char* sql_type);
2001
2002 std::string QuoteId(const char* s);
2003 std::string QuoteString(const char* s);
2004};
2005
2006std::string Sqlite::QuoteId(const char* s)
2007{
2008 std::string q;
2009 q += "\"";
2010 q += s;
2011 q += "\"";
2012 return q;
2013}
2014
2015std::string Sqlite::QuoteString(const char* s)
2016{
2017 std::string q;
2018 q += "\'";
2019 q += s;
2020 q += "\'";
2021 return q;
2022}
2023
2024const char* Sqlite::ColumnType(int midas_tid)
2025{
2026 assert(midas_tid>=0);
2027 assert(midas_tid<TID_LAST);
2028 return sql_type_sqlite[midas_tid];
2029}
2030
2031bool Sqlite::TypesCompatible(int midas_tid, const char* sql_type)
2032{
2033 if (0)
2034 printf("compare types midas \'%s\'=\'%s\' and sql \'%s\'\n", rpc_tid_name(midas_tid), ColumnType(midas_tid), sql_type);
2035
2036 //if (sql2midasType_sqlite(sql_type) == midas_tid)
2037 // return true;
2038
2039 if (strcasecmp(ColumnType(midas_tid), sql_type) == 0)
2040 return true;
2041
2042 // permit writing FLOAT into DOUBLE
2043 if (midas_tid==TID_FLOAT && strcasecmp(sql_type, "double")==0)
2044 return true;
2045
2046 return false;
2047}
2048
2049const char* Sqlite::GetText(int column)
2050{
2051 return (const char*)sqlite3_column_text(fTempStmt, column);
2052}
2053
2054time_t Sqlite::GetTime(int column)
2055{
2057}
2058
2059double Sqlite::GetDouble(int column)
2060{
2062}
2063
2064Sqlite::Sqlite() // ctor
2065{
2066 fIsConnected = false;
2067 fTempDB = NULL;
2068 fTempStmt = NULL;
2069 fDebug = 0;
2070}
2071
2072Sqlite::~Sqlite() // dtor
2073{
2074 Disconnect();
2075}
2076
2077const char* xsqlite3_errstr(sqlite3* db, int errcode)
2078{
2079 //return sqlite3_errstr(errcode);
2080 return sqlite3_errmsg(db);
2081}
2082
2083int Sqlite::ConnectTable(const char* table_name)
2084{
2085 std::string fname = fPath + "mh_" + table_name + ".sqlite3";
2086
2087 sqlite3* db = NULL;
2088
2089 int status = sqlite3_open(fname.c_str(), &db);
2090
2091 if (status != SQLITE_OK) {
2092 cm_msg(MERROR, "Sqlite::Connect", "Table %s: sqlite3_open(%s) error %d (%s)", table_name, fname.c_str(), status, xsqlite3_errstr(db, status));
2094 db = NULL;
2095 return DB_FILE_ERROR;
2096 }
2097
2098#if SQLITE_VERSION_NUMBER >= 3006020
2100 if (status != SQLITE_OK) {
2101 cm_msg(MERROR, "Sqlite::Connect", "Table %s: sqlite3_extended_result_codes(1) error %d (%s)", table_name, status, xsqlite3_errstr(db, status));
2102 }
2103#else
2104#warning Missing sqlite3_extended_result_codes()!
2105#endif
2106
2107 fMap[table_name] = db;
2108
2109 Exec(table_name, "PRAGMA journal_mode=persist;");
2110 Exec(table_name, "PRAGMA synchronous=normal;");
2111 //Exec(table_name, "PRAGMA synchronous=off;");
2112 Exec(table_name, "PRAGMA journal_size_limit=-1;");
2113
2114 if (0) {
2115 Exec(table_name, "PRAGMA legacy_file_format;");
2116 Exec(table_name, "PRAGMA synchronous;");
2117 Exec(table_name, "PRAGMA journal_mode;");
2118 Exec(table_name, "PRAGMA journal_size_limit;");
2119 }
2120
2121#ifdef SQLITE_LIMIT_COLUMN
2122 if (0) {
2124 printf("Sqlite::Connect: SQLITE_LIMIT_COLUMN=%d\n", max_columns);
2125 }
2126#endif
2127
2128 if (fDebug)
2129 cm_msg(MINFO, "Sqlite::Connect", "Table %s: connected to Sqlite file \'%s\'", table_name, fname.c_str());
2130
2131 return DB_SUCCESS;
2132}
2133
2134sqlite3* Sqlite::GetTable(const char* table_name)
2135{
2136 sqlite3* db = fMap[table_name];
2137
2138 if (db)
2139 return db;
2140
2141 int status = ConnectTable(table_name);
2142 if (status != DB_SUCCESS)
2143 return NULL;
2144
2145 return fMap[table_name];
2146}
2147
2148int Sqlite::Connect(const char* path)
2149{
2150 if (fIsConnected)
2151 Disconnect();
2152
2153 fPath = path;
2154
2155 // add trailing '/'
2156 if (fPath.length() > 0) {
2157 if (fPath[fPath.length()-1] != DIR_SEPARATOR)
2158 fPath += DIR_SEPARATOR_STR;
2159 }
2160
2161 if (fDebug)
2162 cm_msg(MINFO, "Sqlite::Connect", "Connected to Sqlite database in \'%s\'", fPath.c_str());
2163
2164 fIsConnected = true;
2165
2166 return DB_SUCCESS;
2167}
2168
2169int Sqlite::Disconnect()
2170{
2171 if (!fIsConnected)
2172 return DB_SUCCESS;
2173
2174 for (DbMap::iterator iter = fMap.begin(); iter != fMap.end(); ++iter) {
2175 const char* table_name = iter->first.c_str();
2176 sqlite3* db = iter->second;
2177 int status = sqlite3_close(db);
2178 if (status != SQLITE_OK) {
2179 cm_msg(MERROR, "Sqlite::Disconnect", "sqlite3_close(%s) error %d (%s)", table_name, status, xsqlite3_errstr(db, status));
2180 }
2181 }
2182
2183 fMap.clear();
2184
2185 fIsConnected = false;
2186
2187 return DB_SUCCESS;
2188}
2189
2190bool Sqlite::IsConnected()
2191{
2192 return fIsConnected;
2193}
2194
2195int Sqlite::OpenTransaction(const char* table_name)
2196{
2197 int status = Exec(table_name, "BEGIN TRANSACTION");
2198 return status;
2199}
2200
2201int Sqlite::CommitTransaction(const char* table_name)
2202{
2203 int status = Exec(table_name, "COMMIT TRANSACTION");
2204 return status;
2205}
2206
2207int Sqlite::RollbackTransaction(const char* table_name)
2208{
2209 int status = Exec(table_name, "ROLLBACK TRANSACTION");
2210 return status;
2211}
2212
2213int Sqlite::Prepare(const char* table_name, const char* sql)
2214{
2215 sqlite3* db = GetTable(table_name);
2216 if (!db)
2217 return DB_FILE_ERROR;
2218
2219 if (fDebug)
2220 printf("Sqlite::Prepare(%s, %s)\n", table_name, sql);
2221
2222 assert(fTempDB==NULL);
2223 fTempDB = db;
2224
2225#if SQLITE_VERSION_NUMBER >= 3006020
2226 int status = sqlite3_prepare_v2(db, sql, strlen(sql), &fTempStmt, NULL);
2227#else
2228#warning Missing sqlite3_prepare_v2()!
2229 int status = sqlite3_prepare(db, sql, strlen(sql), &fTempStmt, NULL);
2230#endif
2231
2232 if (status == SQLITE_OK)
2233 return DB_SUCCESS;
2234
2235 std::string sqlstring = sql;
2236 cm_msg(MERROR, "Sqlite::Prepare", "Table %s: sqlite3_prepare_v2(%s...) error %d (%s)", table_name, sqlstring.substr(0,60).c_str(), status, xsqlite3_errstr(db, status));
2237
2238 fTempDB = NULL;
2239
2240 return DB_FILE_ERROR;
2241}
2242
2243int Sqlite::Step()
2244{
2245 if (0 && fDebug)
2246 printf("Sqlite::Step()\n");
2247
2248 assert(fTempDB);
2249 assert(fTempStmt);
2250
2252
2253 if (status == SQLITE_DONE)
2254 return DB_NO_MORE_SUBKEYS;
2255
2256 if (status == SQLITE_ROW)
2257 return DB_SUCCESS;
2258
2259 cm_msg(MERROR, "Sqlite::Step", "sqlite3_step() error %d (%s)", status, xsqlite3_errstr(fTempDB, status));
2260
2261 return DB_FILE_ERROR;
2262}
2263
2264int Sqlite::Finalize()
2265{
2266 if (0 && fDebug)
2267 printf("Sqlite::Finalize()\n");
2268
2269 assert(fTempDB);
2270 assert(fTempStmt);
2271
2273
2274 if (status != SQLITE_OK) {
2275 cm_msg(MERROR, "Sqlite::Finalize", "sqlite3_finalize() error %d (%s)", status, xsqlite3_errstr(fTempDB, status));
2276
2277 fTempDB = NULL;
2278 fTempStmt = NULL; // FIXME: maybe a memory leak?
2279 return DB_FILE_ERROR;
2280 }
2281
2282 fTempDB = NULL;
2283 fTempStmt = NULL;
2284
2285 return DB_SUCCESS;
2286}
2287
2288int Sqlite::ListTables(std::vector<std::string> *plist)
2289{
2290 if (!fIsConnected)
2291 return DB_FILE_ERROR;
2292
2293 if (fDebug)
2294 printf("Sqlite::ListTables at path [%s]\n", fPath.c_str());
2295
2296 int status;
2297
2298 const char* cmd = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;";
2299
2300 DIR *dir = opendir(fPath.c_str());
2301 if (!dir) {
2302 cm_msg(MERROR, "Sqlite::ListTables", "Cannot opendir(%s), errno %d (%s)", fPath.c_str(), errno, strerror(errno));
2303 return HS_FILE_ERROR;
2304 }
2305
2306 while (1) {
2307 const struct dirent* de = readdir(dir);
2308 if (!de)
2309 break;
2310
2311 const char* dn = de->d_name;
2312
2313 //if (dn[0]!='m' || dn[1]!='h')
2314 //continue;
2315
2316 const char* s;
2317
2318 s = strstr(dn, "mh_");
2319 if (!s || s!=dn)
2320 continue;
2321
2322 s = strstr(dn, ".sqlite3");
2323 if (!s || s[8]!=0)
2324 continue;
2325
2326 char table_name[256];
2327 mstrlcpy(table_name, dn+3, sizeof(table_name));
2328 // FIXME: skip names like "xxx.sqlite3~" and "xxx.sqlite3-deleted"
2329 char* ss = strstr(table_name, ".sqlite3");
2330 if (!ss)
2331 continue;
2332 *ss = 0;
2333
2334 //printf("dn [%s] tn [%s]\n", dn, table_name);
2335
2336 status = Prepare(table_name, cmd);
2337 if (status != DB_SUCCESS)
2338 continue;
2339
2340 while (1) {
2341 status = Step();
2342 if (status != DB_SUCCESS)
2343 break;
2344
2345 const char* tn = GetText(0);
2346 //printf("table [%s]\n", tn);
2347 plist->push_back(tn);
2348 }
2349
2350 status = Finalize();
2351 }
2352
2353 closedir(dir);
2354 dir = NULL;
2355
2356 return DB_SUCCESS;
2357}
2358
2359int Sqlite::ListColumns(const char* table, std::vector<std::string> *plist)
2360{
2361 if (!fIsConnected)
2362 return DB_FILE_ERROR;
2363
2364 if (fDebug)
2365 printf("Sqlite::ListColumns for table \'%s\'\n", table);
2366
2367 std::string cmd;
2368 cmd = "PRAGMA table_info(";
2369 cmd += table;
2370 cmd += ");";
2371
2372 int status;
2373
2374 status = Prepare(table, cmd.c_str());
2375 if (status != DB_SUCCESS)
2376 return status;
2377
2378 while (1) {
2379 status = Step();
2380 if (status != DB_SUCCESS)
2381 break;
2382
2383 const char* colname = GetText(1);
2384 const char* coltype = GetText(2);
2385 //printf("column [%s] [%s]\n", colname, coltype);
2386 plist->push_back(colname); // column name
2387 plist->push_back(coltype); // column type
2388 }
2389
2390 status = Finalize();
2391
2392 return DB_SUCCESS;
2393}
2394
2395static int callback_debug = 0;
2396
2397static int callback(void *NotUsed, int argc, char **argv, char **azColName){
2398 if (callback_debug) {
2399 printf("history_sqlite::callback---->\n");
2400 for (int i=0; i<argc; i++){
2401 printf("history_sqlite::callback[%d] %s = %s\n", i, azColName[i], argv[i] ? argv[i] : "NULL");
2402 }
2403 }
2404 return 0;
2405}
2406
2407int Sqlite::Exec(const char* table_name, const char* sql)
2408{
2409 // return values:
2410 // DB_SUCCESS
2411 // DB_FILE_ERROR: not connected
2412 // DB_KEY_EXIST: "table already exists"
2413
2414 if (!fIsConnected)
2415 return DB_FILE_ERROR;
2416
2417 sqlite3* db = GetTable(table_name);
2418 if (!db)
2419 return DB_FILE_ERROR;
2420
2421 if (fDebug)
2422 printf("Sqlite::Exec(%s, %s)\n", table_name, sql);
2423
2424 int status;
2425
2426 callback_debug = fDebug;
2427 char* errmsg = NULL;
2428
2429 status = sqlite3_exec(db, sql, callback, 0, &errmsg);
2430 if (status != SQLITE_OK) {
2431 if (status == SQLITE_ERROR && strstr(errmsg, "duplicate column name"))
2432 return DB_KEY_EXIST;
2433 if (status == SQLITE_ERROR && strstr(errmsg, "already exists"))
2434 return DB_KEY_EXIST;
2435 std::string sqlstring = sql;
2436 cm_msg(MERROR, "Sqlite::Exec", "Table %s: sqlite3_exec(%s...) error %d (%s)", table_name, sqlstring.substr(0,60).c_str(), status, errmsg);
2438 return DB_FILE_ERROR;
2439 }
2440
2441 return DB_SUCCESS;
2442}
2443
2444int Sqlite::ExecDisconnected(const char* table_name, const char* sql)
2445{
2446 cm_msg(MERROR, "Sqlite::Exec", "sqlite driver does not support disconnected operations");
2447 return DB_FILE_ERROR;
2448}
2449
2450#endif // HAVE_SQLITE
2451
2453// Methods of HsFileSchema //
2455
2456int HsFileSchema::write_event(const time_t t, const char* data, const int data_size)
2457{
2458 HsFileSchema* s = this;
2459
2460 int status;
2461
2462 if (s->writer_fd < 0) {
2463 s->writer_fd = open(s->file_name.c_str(), O_RDWR);
2464 if (s->writer_fd < 0) {
2465 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', open() errno %d (%s)", s->file_name.c_str(), errno, strerror(errno));
2466 return HS_FILE_ERROR;
2467 }
2468
2469 int file_size = lseek(s->writer_fd, 0, SEEK_END);
2470
2471 int nrec = (file_size - s->data_offset)/s->record_size;
2472 if (nrec < 0)
2473 nrec = 0;
2474 int data_end = s->data_offset + nrec*s->record_size;
2475
2476 //printf("file_size %d, nrec %d, data_end %d\n", file_size, nrec, data_end);
2477
2478 if (data_end != file_size) {
2479 if (nrec > 0)
2480 cm_msg(MERROR, "FileHistory::write_event", "File \'%s\' may be truncated, data offset %d, record size %d, file size: %d, should be %d, truncating the file", s->file_name.c_str(), s->data_offset, s->record_size, file_size, data_end);
2481
2483 if (status < 0) {
2484 cm_msg(MERROR, "FileHistory::write_event", "Cannot seek \'%s\' to offset %d, lseek() errno %d (%s)", s->file_name.c_str(), data_end, errno, strerror(errno));
2485 return HS_FILE_ERROR;
2486 }
2488 if (status < 0) {
2489 cm_msg(MERROR, "FileHistory::write_event", "Cannot truncate \'%s\' to size %d, ftruncate() errno %d (%s)", s->file_name.c_str(), data_end, errno, strerror(errno));
2490 return HS_FILE_ERROR;
2491 }
2492 }
2493 }
2494
2495 int expected_size = s->record_size - 4;
2496
2497 // sanity check: record_size and n_bytes are computed from the byte counts in the file header
2498 assert(expected_size == s->n_bytes);
2499
2500 if (s->last_size == 0)
2502
2503 if (data_size != s->last_size) {
2504 cm_msg(MERROR, "FileHistory::write_event", "Event \'%s\' data size mismatch, expected %d bytes, got %d bytes, previously %d bytes", s->event_name.c_str(), expected_size, data_size, s->last_size);
2505 //printf("schema:\n");
2506 //s->print();
2507
2508 if (data_size < expected_size)
2509 return HS_FILE_ERROR;
2510
2511 // truncate for now
2512 // data_size = expected_size;
2513 s->last_size = data_size;
2514 }
2515
2516 int size = 4 + expected_size;
2517
2518 if (size != s->record_buffer_size) {
2519 s->record_buffer = (char*)realloc(s->record_buffer, size);
2520 assert(s->record_buffer != NULL);
2521 s->record_buffer_size = size;
2522 }
2523
2524 memcpy(s->record_buffer, &t, 4);
2526
2527 status = write(s->writer_fd, s->record_buffer, size);
2528 if (status != size) {
2529 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', write(%d) returned %d, errno %d (%s)", s->file_name.c_str(), size, status, errno, strerror(errno));
2530 return HS_FILE_ERROR;
2531 }
2532
2533#if 0
2534 status = write(s->writer_fd, &t, 4);
2535 if (status != 4) {
2536 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', write(timestamp) errno %d (%s)", s->file_name.c_str(), errno, strerror(errno));
2537 return HS_FILE_ERROR;
2538 }
2539
2541 if (status != expected_size) {
2542 cm_msg(MERROR, "FileHistory::write_event", "Cannot write to \'%s\', write(%d) errno %d (%s)", s->file_name.c_str(), data_size, errno, strerror(errno));
2543 return HS_FILE_ERROR;
2544 }
2545#endif
2546
2547 return HS_SUCCESS;
2548}
2549
2551{
2552 if (writer_fd >= 0) {
2554 writer_fd = -1;
2555 }
2556 return HS_SUCCESS;
2557}
2558
2559static int ReadRecord(const char* file_name, int fd, int offset, int recsize, int irec, char* rec)
2560{
2561 int status;
2562 int fpos = offset + irec*recsize;
2563
2564 status = ::lseek(fd, fpos, SEEK_SET);
2565 if (status == -1) {
2566 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', lseek(%d) errno %d (%s)", file_name, fpos, errno, strerror(errno));
2567 return -1;
2568 }
2569
2570 status = ::read(fd, rec, recsize);
2571 if (status == 0) {
2572 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', unexpected end of file on read()", file_name);
2573 return -1;
2574 }
2575 if (status == -1) {
2576 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', read() errno %d (%s)", file_name, errno, strerror(errno));
2577 return -1;
2578 }
2579 if (status != recsize) {
2580 cm_msg(MERROR, "FileHistory::ReadRecord", "Cannot read \'%s\', short read() returned %d instead of %d bytes", file_name, status, recsize);
2581 return -1;
2582 }
2583 return HS_SUCCESS;
2584}
2585
2586static int FindTime(const char* file_name, int fd, int offset, int recsize, int nrec, time_t timestamp, int* i1p, time_t* t1p, int* i2p, time_t* t2p, time_t* tstart, time_t* tend, int debug)
2587{
2588 //
2589 // purpose: find location time timestamp inside given file.
2590 // uses binary search
2591 // returns:
2592 // tstart, tend - time of first and last data in a file
2593 // i1p,t1p - data just before timestamp, used as "last_written"
2594 // i2p,t2p - data at timestamp or after timestamp, used as starting point to read data from file
2595 // assertions:
2596 // tstart <= t1p < t2p <= tend
2597 // i1p+1==i2p
2598 // t1p < timestamp <= t2p
2599 //
2600 // special cases:
2601 // 1) timestamp <= tstart - all data is in the future, return i1p==-1, t1p==-1, i2p==0, t2p==tstart
2602 // 2) tend < timestamp - all the data is in the past, return i1p = nrec-1, t1p = tend, i2p = nrec, t2p = 0;
2603 // 3) nrec == 1 only one record in this file and it is older than the timestamp (tstart == tend < timestamp)
2604 //
2605
2606 int status;
2607 char* buf = new char[recsize];
2608
2609 assert(nrec > 0);
2610
2611 int rec1 = 0;
2612 int rec2 = nrec-1;
2613
2615 if (status != HS_SUCCESS) {
2616 delete[] buf;
2617 return HS_FILE_ERROR;
2618 }
2619
2620 time_t t1 = *(DWORD*)buf;
2621
2622 *tstart = t1;
2623
2624 // timestamp is older than any data in this file
2625 if (timestamp <= t1) {
2626 *i1p = -1;
2627 *t1p = 0;
2628 *i2p = 0;
2629 *t2p = t1;
2630 *tend = 0;
2631 delete[] buf;
2632 return HS_SUCCESS;
2633 }
2634
2635 assert(t1 < timestamp);
2636
2637 if (nrec == 1) {
2638 *i1p = 0;
2639 *t1p = t1;
2640 *i2p = nrec; // == 1
2641 *t2p = 0;
2642 *tend = t1;
2643 delete[] buf;
2644 return HS_SUCCESS;
2645 }
2646
2648 if (status != HS_SUCCESS) {
2649 delete[] buf;
2650 return HS_FILE_ERROR;
2651 }
2652
2653 time_t t2 = *(DWORD*)buf;
2654
2655 *tend = t2;
2656
2657 // all the data is in the past
2658 if (t2 < timestamp) {
2659 *i1p = rec2;
2660 *t1p = t2;
2661 *i2p = nrec;
2662 *t2p = 0;
2663 delete[] buf;
2664 return HS_SUCCESS;
2665 }
2666
2667 assert(t1 < timestamp);
2668 assert(timestamp <= t2);
2669
2670 if (debug)
2671 printf("FindTime: rec %d..(x)..%d, time %s..(%s)..%s\n", rec1, rec2, TimeToString(t1).c_str(), TimeToString(timestamp).c_str(), TimeToString(t2).c_str());
2672
2673 // implement binary search
2674
2675 do {
2676 int rec = (rec1+rec2)/2;
2677
2678 assert(rec >= 0);
2679 assert(rec < nrec);
2680
2682 if (status != HS_SUCCESS) {
2683 delete[] buf;
2684 return HS_FILE_ERROR;
2685 }
2686
2687 time_t t = *(DWORD*)buf;
2688
2689 if (timestamp <= t) {
2690 if (debug)
2691 printf("FindTime: rec %d..(x)..%d..%d, time %s..(%s)..%s..%s\n", rec1, rec, rec2, TimeToString(t1).c_str(), TimeToString(timestamp).c_str(), TimeToString(t).c_str(), TimeToString(t2).c_str());
2692
2693 rec2 = rec;
2694 t2 = t;
2695 } else {
2696 if (debug)
2697 printf("FindTime: rec %d..%d..(x)..%d, time %s..%s..(%s)..%s\n", rec1, rec, rec2, TimeToString(t1).c_str(), TimeToString(t).c_str(), TimeToString(timestamp).c_str(), TimeToString(t2).c_str());
2698
2699 rec1 = rec;
2700 t1 = t;
2701 }
2702 } while (rec2 - rec1 > 1);
2703
2704 assert(rec1+1 == rec2);
2705 assert(t1 < timestamp);
2706 assert(timestamp <= t2);
2707
2708 if (debug)
2709 printf("FindTime: rec %d..(x)..%d, time %s..(%s)..%s, this is the result.\n", rec1, rec2, TimeToString(t1).c_str(), TimeToString(timestamp).c_str(), TimeToString(t2).c_str());
2710
2711 *i1p = rec1;
2712 *t1p = t1;
2713
2714 *i2p = rec2;
2715 *t2p = t2;
2716
2717 delete[] buf;
2718 return HS_SUCCESS;
2719}
2720
2722 const int debug,
2723 time_t* last_written)
2724{
2725 int status;
2726 HsFileSchema* s = this;
2727
2728 if (debug)
2729 printf("FileHistory::read_last_written: file %s, schema time %s..%s, timestamp %s\n", s->file_name.c_str(), TimeToString(s->time_from).c_str(), TimeToString(s->time_to).c_str(), TimeToString(timestamp).c_str());
2730
2731 int fd = open(s->file_name.c_str(), O_RDONLY);
2732 if (fd < 0) {
2733 cm_msg(MERROR, "FileHistory::read_last_written", "Cannot read \'%s\', open() errno %d (%s)", s->file_name.c_str(), errno, strerror(errno));
2734 return HS_FILE_ERROR;
2735 }
2736
2737 int file_size = ::lseek(fd, 0, SEEK_END);
2738
2739 int nrec = (file_size - s->data_offset)/s->record_size;
2740 if (nrec < 0)
2741 nrec = 0;
2742
2743 if (nrec < 1) {
2744 ::close(fd);
2745 if (last_written)
2746 *last_written = 0;
2747 return HS_SUCCESS;
2748 }
2749
2750 time_t lw = 0;
2751
2752 // read last record to check if desired time is inside or outside of the file
2753
2754 if (1) {
2755 char* buf = new char[s->record_size];
2756
2757 status = ReadRecord(s->file_name.c_str(), fd, s->data_offset, s->record_size, nrec - 1, buf);
2758 if (status != HS_SUCCESS) {
2759 delete[] buf;
2760 ::close(fd);
2761 return HS_FILE_ERROR;
2762 }
2763
2764 lw = *(DWORD*)buf;
2765
2766 delete[] buf;
2767 }
2768
2769 if (lw >= timestamp) {
2770 int irec = 0;
2771 time_t trec = 0;
2772 int iunused = 0;
2773 time_t tunused = 0;
2774 time_t tstart = 0; // not used
2775 time_t tend = 0; // not used
2776
2777 status = FindTime(s->file_name.c_str(), fd, s->data_offset, s->record_size, nrec, timestamp, &irec, &trec, &iunused, &tunused, &tstart, &tend, 0*debug);
2778 if (status != HS_SUCCESS) {
2779 ::close(fd);
2780 return HS_FILE_ERROR;
2781 }
2782
2783 assert(trec < timestamp);
2784
2785 lw = trec;
2786 }
2787
2788 if (last_written)
2789 *last_written = lw;
2790
2791 if (debug)
2792 printf("FileHistory::read_last_written: file %s, schema time %s..%s, timestamp %s, last_written %s\n", s->file_name.c_str(), TimeToString(s->time_from).c_str(), TimeToString(s->time_to).c_str(), TimeToString(timestamp).c_str(), TimeToString(lw).c_str());
2793
2794 assert(lw < timestamp);
2795
2796 ::close(fd);
2797
2798 return HS_SUCCESS;
2799}
2800
2801int HsFileSchema::read_data(const time_t start_time,
2802 const time_t end_time,
2803 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
2804 const int debug,
2805 std::vector<time_t>& last_time,
2807{
2808 HsFileSchema* s = this;
2809
2810 if (debug)
2811 printf("FileHistory::read_data: file %s, schema time %s..%s, read time %s..%s, %d vars\n", s->file_name.c_str(), TimeToString(s->time_from).c_str(), TimeToString(s->time_to).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), num_var);
2812
2813 //if (1) {
2814 // printf("Last time: ");
2815 // for (int i=0; i<num_var; i++) {
2816 // printf(" %s", TimeToString(last_time[i]).c_str());
2817 // }
2818 // printf("\n");
2819 //}
2820
2821 if (debug) {
2822 printf("FileHistory::read_data: file %s map", s->file_name.c_str());
2823 for (size_t i=0; i<var_schema_index.size(); i++) {
2824 printf(" %2d", var_schema_index[i]);
2825 }
2826 printf("\n");
2827 }
2828
2829 int fd = ::open(s->file_name.c_str(), O_RDONLY);
2830 if (fd < 0) {
2831 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', open() errno %d (%s)", s->file_name.c_str(), errno, strerror(errno));
2832 return HS_FILE_ERROR;
2833 }
2834
2835 off_t file_size = ::lseek(fd, 0, SEEK_END);
2836
2837 if (file_size == (off_t)-1) {
2838 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', fseek(SEEK_END) errno %d (%s)", s->file_name.c_str(), errno, strerror(errno));
2839 ::close(fd);
2840 return HS_FILE_ERROR;
2841 }
2842
2843 int nrec = (file_size - s->data_offset)/s->record_size;
2844 if (nrec < 0)
2845 nrec = 0;
2846
2847 if (nrec < 1) {
2848 ::close(fd);
2849 return HS_SUCCESS;
2850 }
2851
2852 int iunused = 0;
2853 time_t tunused = 0;
2854 int irec = 0;
2855 time_t trec = 0;
2856 time_t tstart = 0;
2857 time_t tend = 0;
2858
2859 int status = FindTime(s->file_name.c_str(), fd, s->data_offset, s->record_size, nrec, start_time, &iunused, &tunused, &irec, &trec, &tstart, &tend, 0*debug);
2860
2861 if (status != HS_SUCCESS) {
2862 ::close(fd);
2863 return HS_FILE_ERROR;
2864 }
2865
2866 if (debug) {
2867 printf("FindTime %d, nrec %d, (%d, %s) (%d, %s), tstart %s, tend %s, want %s\n", status, nrec, iunused, TimeToString(tunused).c_str(), irec, TimeToString(trec).c_str(), TimeToString(tstart).c_str(), TimeToString(tend).c_str(), TimeToString(start_time).c_str());
2868 }
2869
2870 if (irec < 0 || irec >= nrec) {
2871 // all data in this file is older than start_time
2872
2873 ::close(fd);
2874
2875 if (debug)
2876 printf("FileHistory::read: file %s, schema time %s..%s, read time %s..%s, file time %s..%s, data in this file is too old\n", s->file_name.c_str(), TimeToString(s->time_from).c_str(), TimeToString(s->time_to).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), TimeToString(tstart).c_str(), TimeToString(tend).c_str());
2877
2878 return HS_SUCCESS;
2879 }
2880
2882 // data starts before time declared in schema
2883
2884 ::close(fd);
2885
2886 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': timestamp of first data %s is before schema start time %s", s->file_name.c_str(), TimeToString(tstart).c_str(), TimeToString(s->time_from).c_str());
2887
2888 return HS_FILE_ERROR;
2889 }
2890
2891 if (tend && s->time_to && tend > s->time_to) {
2892 // data ends after time declared in schema (overlaps with next file)
2893
2894 ::close(fd);
2895
2896 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': timestamp of last data %s is after schema end time %s", s->file_name.c_str(), TimeToString(tend).c_str(), TimeToString(s->time_to).c_str());
2897
2898 return HS_FILE_ERROR;
2899 }
2900
2901 for (int i=0; i<num_var; i++) {
2902 int si = var_schema_index[i];
2903 if (si < 0)
2904 continue;
2905
2906 if (trec < last_time[i]) { // protect against duplicate and non-monotonous data
2907 ::close(fd);
2908
2909 cm_msg(MERROR, "FileHistory::read_data", "Internal history error at file \'%s\': variable %d data timestamp %s is before last timestamp %s", s->file_name.c_str(), i, TimeToString(trec).c_str(), TimeToString(last_time[i]).c_str());
2910
2911 return HS_FILE_ERROR;
2912 }
2913 }
2914
2915 int count = 0;
2916
2918
2919 off_t xpos = ::lseek(fd, fpos, SEEK_SET);
2920 if (xpos == (off_t)-1) {
2921 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', lseek(%zu) errno %d (%s)", s->file_name.c_str(), (size_t)fpos, errno, strerror(errno));
2922 ::close(fd);
2923 return HS_FILE_ERROR;
2924 }
2925
2926 char* buf = new char[s->record_size];
2927
2928 int prec = irec;
2929
2930 while (1) {
2931 status = ::read(fd, buf, s->record_size);
2932 if (status == 0) // EOF
2933 break;
2934 if (status == -1) {
2935 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', read() errno %d (%s)", s->file_name.c_str(), errno, strerror(errno));
2936 break;
2937 }
2938 if (status != s->record_size) {
2939 cm_msg(MERROR, "FileHistory::read_data", "Cannot read \'%s\', short read() returned %d instead of %d bytes", s->file_name.c_str(), status, s->record_size);
2940 break;
2941 }
2942
2943 prec++;
2944
2945 bool past_end_of_last_file = (s->time_to == 0) && (prec > nrec);
2946
2947 time_t t = *(DWORD*)buf;
2948
2949 if (debug > 1)
2950 printf("FileHistory::read: file %s, schema time %s..%s, read time %s..%s, row time %s\n", s->file_name.c_str(), TimeToString(s->time_from).c_str(), TimeToString(s->time_to).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), TimeToString(t).c_str());
2951
2952 if (t < trec) {
2953 delete[] buf;
2954 ::close(fd);
2955 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': record %d timestamp %s is before start time %s", s->file_name.c_str(), irec + count, TimeToString(t).c_str(), TimeToString(trec).c_str());
2956 return HS_FILE_ERROR;
2957 }
2958
2959 if (tend && (t > tend) && !past_end_of_last_file) {
2960 delete[] buf;
2961 ::close(fd);
2962 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': record %d timestamp %s is after last timestamp %s", s->file_name.c_str(), irec + count, TimeToString(t).c_str(), TimeToString(tend).c_str());
2963 return HS_FILE_ERROR;
2964 }
2965
2966 if (t > end_time)
2967 break;
2968
2969 char* data = buf + 4;
2970
2971 for (int i=0; i<num_var; i++) {
2972 int si = var_schema_index[i];
2973 if (si < 0)
2974 continue;
2975
2976 if (t < last_time[i]) { // protect against duplicate and non-monotonous data
2977 delete[] buf;
2978 ::close(fd);
2979
2980 cm_msg(MERROR, "FileHistory::read_data", "Bad history file \'%s\': record %d timestamp %s is before timestamp %s of variable %d", s->file_name.c_str(), irec + count, TimeToString(t).c_str(), TimeToString(last_time[i]).c_str(), i);
2981
2982 return HS_FILE_ERROR;
2983 }
2984
2985 double v = 0;
2986 void* ptr = data + s->offsets[si];
2987
2988 int ii = var_index[i];
2989 assert(ii >= 0);
2990 assert(ii < s->variables[si].n_data);
2991
2992 switch (s->variables[si].type) {
2993 default:
2994 // unknown data type
2995 v = 0;
2996 break;
2997 case TID_BYTE:
2998 v = ((unsigned char*)ptr)[ii];
2999 break;
3000 case TID_SBYTE:
3001 v = ((signed char *)ptr)[ii];
3002 break;
3003 case TID_CHAR:
3004 v = ((char*)ptr)[ii];
3005 break;
3006 case TID_WORD:
3007 v = ((unsigned short *)ptr)[ii];
3008 break;
3009 case TID_SHORT:
3010 v = ((signed short *)ptr)[ii];
3011 break;
3012 case TID_DWORD:
3013 v = ((unsigned int *)ptr)[ii];
3014 break;
3015 case TID_INT:
3016 v = ((int *)ptr)[ii];
3017 break;
3018 case TID_BOOL:
3019 v = ((unsigned int *)ptr)[ii];
3020 break;
3021 case TID_FLOAT:
3022 v = ((float*)ptr)[ii];
3023 break;
3024 case TID_DOUBLE:
3025 v = ((double*)ptr)[ii];
3026 break;
3027 }
3028
3029 buffer[i]->Add(t, v);
3030 last_time[i] = t;
3031 }
3032 count++;
3033 }
3034
3035 delete[] buf;
3036
3037 ::close(fd);
3038
3039 if (debug) {
3040 printf("FileHistory::read_data: file %s map", s->file_name.c_str());
3041 for (size_t i=0; i<var_schema_index.size(); i++) {
3042 printf(" %2d", var_schema_index[i]);
3043 }
3044 printf(" read %d rows\n", count);
3045 }
3046
3047 if (debug)
3048 printf("FileHistory::read: file %s, schema time %s..%s, read time %s..%s, %d vars, read %d rows\n", s->file_name.c_str(), TimeToString(s->time_from).c_str(), TimeToString(s->time_to).c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), num_var, count);
3049
3050 return HS_SUCCESS;
3051}
3052
3054// Implementation of the MidasHistoryInterface //
3056
3058{
3059protected:
3061 std::string fConnectString;
3062
3063 // writer data
3065 std::vector<HsSchema*> fEvents;
3066
3067 // reader data
3069
3070public:
3072 {
3073 fDebug = 0;
3074 }
3075
3077 {
3078 for (unsigned i=0; i<fEvents.size(); i++)
3079 if (fEvents[i]) {
3080 delete fEvents[i];
3081 fEvents[i] = NULL;
3082 }
3083 fEvents.clear();
3084 }
3085
3086 virtual int hs_set_debug(int debug)
3087 {
3088 int old = fDebug;
3089 fDebug = debug;
3090 return old;
3091 }
3092
3093 virtual int hs_connect(const char* connect_string) = 0;
3094 virtual int hs_disconnect() = 0;
3095
3096protected:
3098 // Schema functions //
3100
3101 // load existing schema
3102 virtual int read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp) = 0; // event_name: =NULL means read only event names, =event_name means load tag names for all matching events; timestamp: =0 means read all schema all the way to the beginning of time, =time means read schema in effect at this time and all newer schema
3103
3104 // return a new copy of a schema for writing into history. If schema for this event does not exist, it will be created
3105 virtual HsSchema* new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]) = 0;
3106
3107public:
3109 // Functions used by mlogger //
3111
3112 int hs_define_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]);
3113 int hs_write_event(const char* event_name, time_t timestamp, int buffer_size, const char* buffer);
3114 int hs_flush_buffers();
3115
3117 // Functions used by mhttpd //
3119
3120 int hs_clear_cache();
3121 int hs_get_events(time_t t, std::vector<std::string> *pevents);
3122 int hs_get_tags(const char* event_name, time_t t, std::vector<TAG> *ptags);
3123 int hs_get_last_written(time_t timestamp, int num_var, const char* const event_name[], const char* const var_name[], const int var_index[], time_t last_written[]);
3124 int hs_read_buffer(time_t start_time, time_t end_time,
3125 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3127 int hs_status[]);
3128
3129 /*------------------------------------------------------------------*/
3130
3132 {
3133 public:
3137
3139
3143 double **fDataBuffer;
3144
3146
3148 {
3149 fNumAdded = 0;
3150
3154
3155 fNumAlloc = 0;
3156 fNumEntries = NULL;
3157 fTimeBuffer = NULL;
3158 fDataBuffer = NULL;
3159
3160 fPrevTime = 0;
3161 }
3162
3163 ~ReadBuffer() // dtor
3164 {
3165 }
3166
3168 {
3169 if (wantalloc < fNumAlloc - 10)
3170 return;
3171
3172 int newalloc = fNumAlloc*2;
3173
3174 if (newalloc <= 1000)
3175 newalloc = wantalloc + 1000;
3176
3177 //printf("wantalloc %d, fNumEntries %d, fNumAlloc %d, newalloc %d\n", wantalloc, *fNumEntries, fNumAlloc, newalloc);
3178
3180 assert(*fTimeBuffer);
3181
3182 *fDataBuffer = (double*)realloc(*fDataBuffer, sizeof(double)*newalloc);
3183 assert(*fDataBuffer);
3184
3186 }
3187
3188 void Add(time_t t, double v)
3189 {
3190 if (t < fFirstTime)
3191 return;
3192 if (t > fLastTime)
3193 return;
3194
3195 fNumAdded++;
3196
3197 if ((fPrevTime==0) || (t >= fPrevTime + fInterval)) {
3198 int pos = *fNumEntries;
3199
3200 Realloc(pos + 1);
3201
3202 (*fTimeBuffer)[pos] = t;
3203 (*fDataBuffer)[pos] = v;
3204
3205 (*fNumEntries) = pos + 1;
3206
3207 fPrevTime = t;
3208 }
3209 }
3210
3211 void Finish()
3212 {
3213
3214 }
3215 };
3216
3217 /*------------------------------------------------------------------*/
3218
3219 int hs_read(time_t start_time, time_t end_time, time_t interval,
3220 int num_var,
3221 const char* const event_name[], const char* const var_name[], const int var_index[],
3222 int num_entries[],
3223 time_t* time_buffer[], double* data_buffer[],
3224 int st[]);
3225 /*------------------------------------------------------------------*/
3226
3227
3228 int hs_read_binned(time_t start_time, time_t end_time, int num_bins,
3229 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3230 int num_entries[],
3231 int* count_bins[], double* mean_bins[], double* rms_bins[], double* min_bins[], double* max_bins[],
3234 time_t last_time[], double last_value[],
3235 int st[]);
3236};
3237
3239{
3240 fNumEntries = 0;
3241
3245
3246 fSum0 = new double[num_bins];
3247 fSum1 = new double[num_bins];
3248 fSum2 = new double[num_bins];
3249
3250 for (int i=0; i<num_bins; i++) {
3251 fSum0[i] = 0;
3252 fSum1[i] = 0;
3253 fSum2[i] = 0;
3254 }
3255}
3256
3258{
3259 delete fSum0; fSum0 = NULL;
3260 delete fSum1; fSum1 = NULL;
3261 delete fSum2; fSum2 = NULL;
3262 // poison the pointers
3263 fCount = NULL;
3264 fMean = NULL;
3265 fRms = NULL;
3266 fMin = NULL;
3267 fMax = NULL;
3274}
3275
3277{
3278 for (int ibin = 0; ibin < fNumBins; ibin++) {
3279 if (fMin)
3280 fMin[ibin] = 0;
3281 if (fMax)
3282 fMax[ibin] = 0;
3283 if (fBinsFirstTime)
3284 fBinsFirstTime[ibin] = 0;
3285 if (fBinsFirstValue)
3286 fBinsFirstValue[ibin] = 0;
3287 if (fBinsLastTime)
3288 fBinsLastTime[ibin] = 0;
3289 if (fBinsLastValue)
3290 fBinsLastValue[ibin] = 0;
3291 }
3292 if (fLastTimePtr)
3293 *fLastTimePtr = 0;
3294 if (fLastValuePtr)
3295 *fLastValuePtr = 0;
3296}
3297
3299{
3300 if (t < fFirstTime)
3301 return;
3302 if (t > fLastTime)
3303 return;
3304
3305 fNumEntries++;
3306
3307 double a = (double)(t - fFirstTime);
3308 double b = (double)(fLastTime - fFirstTime);
3309 double fbin = fNumBins*a/b;
3310
3311 int ibin = (int)fbin;
3312
3313 if (ibin < 0)
3314 ibin = 0;
3315 else if (ibin >= fNumBins)
3316 ibin = fNumBins-1;
3317
3318 if (fSum0[ibin] == 0) {
3319 if (fMin)
3320 fMin[ibin] = v;
3321 if (fMax)
3322 fMax[ibin] = v;
3323 if (fBinsFirstTime)
3324 fBinsFirstTime[ibin] = t;
3325 if (fBinsFirstValue)
3326 fBinsFirstValue[ibin] = v;
3327 if (fBinsLastTime)
3328 fBinsLastTime[ibin] = t;
3329 if (fBinsLastValue)
3330 fBinsLastValue[ibin] = v;
3331 if (fLastTimePtr)
3332 *fLastTimePtr = t;
3333 if (fLastValuePtr)
3334 *fLastValuePtr = v;
3335 }
3336
3337 fSum0[ibin] += 1.0;
3338 fSum1[ibin] += v;
3339 fSum2[ibin] += v*v;
3340
3341 if (fMin)
3342 if (v < fMin[ibin])
3343 fMin[ibin] = v;
3344
3345 if (fMax)
3346 if (v > fMax[ibin])
3347 fMax[ibin] = v;
3348
3349 // NOTE: this assumes t and v are sorted by time.
3350 if (fBinsLastTime)
3351 fBinsLastTime[ibin] = t;
3352 if (fBinsLastValue)
3353 fBinsLastValue[ibin] = v;
3354
3355 if (fLastTimePtr)
3356 if (t > *fLastTimePtr) {
3357 *fLastTimePtr = t;
3358 if (fLastValuePtr)
3359 *fLastValuePtr = v;
3360 }
3361}
3362
3364{
3365 for (int i=0; i<fNumBins; i++) {
3366 double num = fSum0[i];
3367 double mean = 0;
3368 double variance = 0;
3369 if (num > 0) {
3370 mean = fSum1[i]/num;
3372 }
3373 double rms = 0;
3374 if (variance > 0)
3375 rms = sqrt(variance);
3376
3377 if (fCount)
3378 fCount[i] = (int)num;
3379
3380 if (fMean)
3381 fMean[i] = mean;
3382
3383 if (fRms)
3384 fRms[i] = rms;
3385
3386 if (num == 0) {
3387 if (fMin)
3388 fMin[i] = 0;
3389 if (fMax)
3390 fMax[i] = 0;
3391 }
3392 }
3393}
3394
3395int SchemaHistoryBase::hs_define_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[])
3396{
3397 if (fDebug) {
3398 printf("hs_define_event: event name [%s] with %d tags\n", event_name, ntags);
3399 if (fDebug > 1)
3400 PrintTags(ntags, tags);
3401 }
3402
3403 // delete all events with the same name
3404 for (unsigned int i=0; i<fEvents.size(); i++)
3405 if (fEvents[i])
3406 if (event_name_cmp(fEvents[i]->event_name, event_name)==0) {
3407 if (fDebug)
3408 printf("deleting exising event %s\n", event_name);
3409 fEvents[i]->close();
3410 delete fEvents[i];
3411 fEvents[i] = NULL;
3412 }
3413
3414 // check for wrong types etc
3415 for (int i=0; i<ntags; i++) {
3416 if (strlen(tags[i].name) < 1) {
3417 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' has empty name at index %d", event_name, i);
3418 return HS_FILE_ERROR;
3419 }
3420 if (tags[i].type <= 0 || tags[i].type >= TID_LAST) {
3421 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' tag \'%s\' at index %d has invalid type %d",
3422 event_name, tags[i].name, i, tags[i].type);
3423 return HS_FILE_ERROR;
3424 }
3425 if (tags[i].type == TID_STRING) {
3426 cm_msg(MERROR, "hs_define_event",
3427 "Error: History event \'%s\' tag \'%s\' at index %d has forbidden type TID_STRING", event_name,
3428 tags[i].name, i);
3429 return HS_FILE_ERROR;
3430 }
3431 if (tags[i].n_data <= 0) {
3432 cm_msg(MERROR, "hs_define_event", "Error: History event \'%s\' tag \'%s\' at index %d has invalid n_data %d",
3433 event_name, tags[i].name, i, tags[i].n_data);
3434 return HS_FILE_ERROR;
3435 }
3436 }
3437
3438 // check for duplicate names. Done by sorting, since this takes only O(N*log*N))
3439 std::vector<std::string> names;
3440 for (int i=0; i<ntags; i++) {
3441 std::string str(tags[i].name);
3442 std::transform(str.begin(), str.end(), str.begin(), ::toupper);
3443 names.push_back(str);
3444 }
3445 std::sort(names.begin(), names.end());
3446 for (int i=0; i<ntags-1; i++) {
3447 if (names[i] == names[i + 1]) {
3448 cm_msg(MERROR, "hs_define_event",
3449 "Error: History event \'%s\' has duplicate tag name \'%s\'", event_name,
3450 names[i].c_str());
3451 return HS_FILE_ERROR;
3452 }
3453 }
3454
3455 HsSchema* s = new_event(event_name, timestamp, ntags, tags);
3456 if (!s)
3457 return HS_FILE_ERROR;
3458
3459 s->disabled = false;
3460
3461 // keep only active variables
3462 std::vector<HsSchemaEntry> active_vars;
3463
3464 for (auto& var : s->variables) {
3465 if (!var.inactive) {
3466 active_vars.push_back(var);
3467 }
3468 }
3469
3471
3472 // find empty slot in events list
3473 for (unsigned int i=0; i<fEvents.size(); i++)
3474 if (!fEvents[i]) {
3475 fEvents[i] = s;
3476 s = NULL;
3477 break;
3478 }
3479
3480 // if no empty slots, add at the end
3481 if (s)
3482 fEvents.push_back(s);
3483
3484 return HS_SUCCESS;
3485}
3486
3487int SchemaHistoryBase::hs_write_event(const char* event_name, time_t timestamp, int buffer_size, const char* buffer)
3488{
3489 if (fDebug)
3490 printf("hs_write_event: write event \'%s\', time %d, size %d\n", event_name, (int)timestamp, buffer_size);
3491
3492 HsSchema *s = NULL;
3493
3494 // find this event
3495 for (size_t i=0; i<fEvents.size(); i++)
3496 if (fEvents[i] && (event_name_cmp(fEvents[i]->event_name, event_name)==0)) {
3497 s = fEvents[i];
3498 break;
3499 }
3500
3501 // not found
3502 if (!s)
3503 return HS_UNDEFINED_EVENT;
3504
3505 // deactivated because of error?
3506 if (s->disabled)
3507 return HS_FILE_ERROR;
3508
3509 if (s->n_bytes == 0) { // compute expected data size
3510 // NB: history data does not have any padding!
3511 for (unsigned i=0; i<s->variables.size(); i++) {
3512 s->n_bytes += s->variables[i].n_bytes;
3513 }
3514 }
3515
3516 int status;
3517
3518 if (buffer_size > s->n_bytes) { // too many bytes!
3519 if (s->count_write_oversize == 0) {
3520 // only report first occurence
3521 // count of all occurences is reported by HsSchema destructor
3522 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch: expected %d bytes, got %d bytes", s->event_name.c_str(), s->n_bytes, buffer_size);
3523 }
3525 if (buffer_size > s->write_max_size)
3526 s->write_max_size = buffer_size;
3527 status = s->write_event(timestamp, buffer, s->n_bytes);
3528 } else if (buffer_size < s->n_bytes) { // too few bytes
3529 if (s->count_write_undersize == 0) {
3530 // only report first occurence
3531 // count of all occurences is reported by HsSchema destructor
3532 cm_msg(MERROR, "hs_write_event", "Event \'%s\' data size mismatch: expected %d bytes, got %d bytes", s->event_name.c_str(), s->n_bytes, buffer_size);
3533 }
3535 if (s->write_min_size == 0)
3536 s->write_min_size = buffer_size;
3537 else if (buffer_size < s->write_min_size)
3538 s->write_min_size = buffer_size;
3539 char* tmp = (char*)malloc(s->n_bytes);
3540 memcpy(tmp, buffer, buffer_size);
3541 memset(tmp + buffer_size, 0, s->n_bytes - buffer_size);
3542 status = s->write_event(timestamp, tmp, s->n_bytes);
3543 free(tmp);
3544 } else {
3545 assert(buffer_size == s->n_bytes); // obviously
3546 status = s->write_event(timestamp, buffer, buffer_size);
3547 }
3548
3549 // if could not write event, deactivate it
3550 if (status != HS_SUCCESS) {
3551 s->disabled = true;
3552 cm_msg(MERROR, "hs_write_event", "Event \'%s\' disabled after write error %d", event_name, status);
3553 return HS_FILE_ERROR;
3554 }
3555
3556 return HS_SUCCESS;
3557}
3558
3560{
3561 int status = HS_SUCCESS;
3562
3563 if (fDebug)
3564 printf("hs_flush_buffers!\n");
3565
3566 for (unsigned int i=0; i<fEvents.size(); i++)
3567 if (fEvents[i]) {
3568 int xstatus = fEvents[i]->flush_buffers();
3569 if (xstatus != HS_SUCCESS)
3570 status = xstatus;
3571 }
3572
3573 return status;
3574}
3575
3577// Functions used by mhttpd //
3579
3581{
3582 if (fDebug)
3583 printf("SchemaHistoryBase::hs_clear_cache!\n");
3584
3586 fSchema.clear();
3587
3588 return HS_SUCCESS;
3589}
3590
3591int SchemaHistoryBase::hs_get_events(time_t t, std::vector<std::string> *pevents)
3592{
3593 if (fDebug)
3594 printf("hs_get_events, time %s\n", TimeToString(t).c_str());
3595
3596 int status = read_schema(&fSchema, NULL, t);
3597 if (status != HS_SUCCESS)
3598 return status;
3599
3600 if (fDebug) {
3601 printf("hs_get_events: available schema:\n");
3602 fSchema.print(false);
3603 }
3604
3605 assert(pevents);
3606
3607 for (unsigned i=0; i<fSchema.size(); i++) {
3608 HsSchema* s = fSchema[i];
3609 if (t && s->time_to && s->time_to < t)
3610 continue;
3611 bool dupe = false;
3612 for (unsigned j=0; j<pevents->size(); j++)
3613 if (event_name_cmp((*pevents)[j], s->event_name.c_str())==0) {
3614 dupe = true;
3615 break;
3616 }
3617
3618 if (!dupe)
3619 pevents->push_back(s->event_name);
3620 }
3621
3622 std::sort(pevents->begin(), pevents->end());
3623
3624 if (fDebug) {
3625 printf("hs_get_events: returning %d events\n", (int)pevents->size());
3626 for (unsigned i=0; i<pevents->size(); i++) {
3627 printf(" %d: [%s]\n", i, (*pevents)[i].c_str());
3628 }
3629 }
3630
3631 return HS_SUCCESS;
3632}
3633
3634int SchemaHistoryBase::hs_get_tags(const char* event_name, time_t t, std::vector<TAG> *ptags)
3635{
3636 if (fDebug)
3637 printf("hs_get_tags: event [%s], time %s\n", event_name, TimeToString(t).c_str());
3638
3639 assert(ptags);
3640
3641 int status = read_schema(&fSchema, event_name, t);
3642 if (status != HS_SUCCESS)
3643 return status;
3644
3645 bool found_event = false;
3646 for (unsigned i=0; i<fSchema.size(); i++) {
3647 HsSchema* s = fSchema[i];
3648 if (t && s->time_to && s->time_to < t)
3649 continue;
3650
3651 if (event_name_cmp(s->event_name, event_name) != 0)
3652 continue;
3653
3654 found_event = true;
3655
3656 for (unsigned i=0; i<s->variables.size(); i++) {
3657 const char* tagname = s->variables[i].name.c_str();
3658 //printf("event_name [%s], table_name [%s], column name [%s], tag name [%s]\n", event_name, tn.c_str(), cn.c_str(), tagname);
3659
3660 bool dupe = false;
3661
3662 for (unsigned k=0; k<ptags->size(); k++)
3663 if (strcasecmp((*ptags)[k].name, tagname) == 0) {
3664 dupe = true;
3665 break;
3666 }
3667
3668 if (!dupe) {
3669 TAG t;
3670 mstrlcpy(t.name, tagname, sizeof(t.name));
3671 t.type = s->variables[i].type;
3672 t.n_data = s->variables[i].n_data;
3673
3674 ptags->push_back(t);
3675 }
3676 }
3677 }
3678
3679 if (!found_event)
3680 return HS_UNDEFINED_EVENT;
3681
3682 if (fDebug) {
3683 printf("hs_get_tags: event [%s], returning %d tags\n", event_name, (int)ptags->size());
3684 for (unsigned i=0; i<ptags->size(); i++) {
3685 printf(" tag[%d]: %s[%d] type %d\n", i, (*ptags)[i].name, (*ptags)[i].n_data, (*ptags)[i].type);
3686 }
3687 }
3688
3689 return HS_SUCCESS;
3690}
3691
3692int SchemaHistoryBase::hs_get_last_written(time_t timestamp, int num_var, const char* const event_name[], const char* const var_name[], const int var_index[], time_t last_written[])
3693{
3694 if (fDebug) {
3695 printf("hs_get_last_written: timestamp %s, num_var %d\n", TimeToString(timestamp).c_str(), num_var);
3696 }
3697
3698 for (int j=0; j<num_var; j++) {
3699 last_written[j] = 0;
3700 }
3701
3702 for (int i=0; i<num_var; i++) {
3703 int status = read_schema(&fSchema, event_name[i], 0);
3704 if (status != HS_SUCCESS)
3705 return status;
3706 }
3707
3708 //fSchema.print(false);
3709
3710 for (int i=0; i<num_var; i++) {
3711 for (unsigned ss=0; ss<fSchema.size(); ss++) {
3712 HsSchema* s = fSchema[ss];
3713 // schema is too new
3714 if (s->time_from && s->time_from >= timestamp)
3715 continue;
3716 // this schema is newer than last_written and may contain newer data?
3717 if (s->time_from && s->time_from < last_written[i])
3718 continue;
3719 // schema for the variables we want?
3720 int sindex = s->match_event_var(event_name[i], var_name[i], var_index[i]);
3721 if (sindex < 0)
3722 continue;
3723
3724 time_t lw = 0;
3725
3726 int status = s->read_last_written(timestamp, fDebug, &lw);
3727
3728 if (status == HS_SUCCESS && lw != 0) {
3729 for (int j=0; j<num_var; j++) {
3730 int sj = s->match_event_var(event_name[j], var_name[j], var_index[j]);
3731 if (sj < 0)
3732 continue;
3733
3734 if (lw > last_written[j])
3735 last_written[j] = lw;
3736 }
3737 }
3738 }
3739 }
3740
3741 if (fDebug) {
3742 printf("hs_get_last_written: timestamp time %s, num_var %d, result:\n", TimeToString(timestamp).c_str(), num_var);
3743 for (int i=0; i<num_var; i++) {
3744 printf(" event [%s] tag [%s] index [%d] last_written %s\n", event_name[i], var_name[i], var_index[i], TimeToString(last_written[i]).c_str());
3745 }
3746 }
3747
3748 return HS_SUCCESS;
3749}
3750
3752 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3754 int hs_status[])
3755{
3756 if (fDebug)
3757 printf("hs_read_buffer: %d variables, start time %s, end time %s\n", num_var, TimeToString(start_time).c_str(), TimeToString(end_time).c_str());
3758
3759 for (int i=0; i<num_var; i++) {
3760 int status = read_schema(&fSchema, event_name[i], start_time);
3761 if (status != HS_SUCCESS)
3762 return status;
3763 }
3764
3765#if 0
3766 if (fDebug)
3767 fSchema.print(false);
3768#endif
3769
3770 for (int i=0; i<num_var; i++) {
3772 }
3773
3774 //for (unsigned ss=0; ss<fSchema.size(); ss++) {
3775 // HsSchema* s = fSchema[ss];
3776 // HsFileSchema* fs = dynamic_cast<HsFileSchema*>(s);
3777 // assert(fs != NULL);
3778 // printf("schema %d from %s to %s, filename %s\n", ss, TimeToString(fs->time_from).c_str(), TimeToString(fs->time_to).c_str(), fs->file_name.c_str());
3779 //}
3780
3781 // check that schema are sorted by time
3782
3783#if 0
3784 // check that schema list is sorted by time, descending time_from, newest schema first
3785 for (unsigned ss=0; ss<fSchema.size(); ss++) {
3786 if (fDebug) {
3787 //printf("Check schema %zu/%zu: prev from %s, this from %s to %s, compare %d %d %d\n", ss, fSchema.size(),
3788 // TimeToString(fSchema[ss-1]->time_from).c_str(),
3789 // TimeToString(fSchema[ss]->time_from).c_str(),
3790 // TimeToString(fSchema[ss]->time_to).c_str(),
3791 // fSchema[ss-1]->time_from >= fSchema[ss]->time_to,
3792 // fSchema[ss-1]->time_from > fSchema[ss]->time_from,
3793 // (fSchema[ss-1]->time_from >= fSchema[ss]->time_to) && (fSchema[ss-1]->time_from > fSchema[ss]->time_from));
3794 printf("Schema %zu/%zu: from %s to %s, name %s\n", ss, fSchema.size(),
3795 TimeToString(fSchema[ss]->time_from).c_str(),
3796 TimeToString(fSchema[ss]->time_to).c_str(),
3797 fSchema[ss]->event_name.c_str());
3798 }
3799
3800 if (ss > 0) {
3801 //if ((fSchema[ss-1]->time_from >= fSchema[ss]->time_to) && (fSchema[ss-1]->time_from > fSchema[ss]->time_from)) {
3802 if ((fSchema[ss-1]->time_from >= fSchema[ss]->time_from)) {
3803 // good
3804 } else {
3805 cm_msg(MERROR, "SchemaHistoryBase::hs_read_buffer", "History internal error, schema is not ordered by time. Please report this error to the midas forum.");
3806 return HS_FILE_ERROR;
3807 }
3808 }
3809 }
3810#endif
3811
3812 std::vector<HsSchema*> slist;
3813 std::vector<std::vector<int>> smap;
3814
3815 for (unsigned ss=0; ss<fSchema.size(); ss++) {
3816 HsSchema* s = fSchema[ss];
3817 // schema is too new?
3818 if (s->time_from && s->time_from > end_time)
3819 continue;
3820 // schema is too old
3821 if (s->time_to && s->time_to < start_time)
3822 continue;
3823
3824 std::vector<int> sm;
3825
3826 for (int i=0; i<num_var; i++) {
3827 // schema for the variables we want?
3828 int sindex = s->match_event_var(event_name[i], var_name[i], var_index[i]);
3829 if (sindex < 0)
3830 continue;
3831
3832 if (sm.empty()) {
3833 for (int i=0; i<num_var; i++) {
3834 sm.push_back(-1);
3835 }
3836 }
3837
3838 sm[i] = sindex;
3839 }
3840
3841 if (!sm.empty()) {
3842 slist.push_back(s);
3843 smap.push_back(sm);
3844 }
3845 }
3846
3847 if (0||fDebug) {
3848 printf("Found %d matching schema:\n", (int)slist.size());
3849
3850 for (size_t i=0; i<slist.size(); i++) {
3851 HsSchema* s = slist[i];
3852 s->print();
3853 for (int k=0; k<num_var; k++)
3854 printf(" tag %s[%d] sindex %d\n", var_name[k], var_index[k], smap[i][k]);
3855 }
3856 }
3857
3858 //for (size_t ss=0; ss<slist.size(); ss++) {
3859 // HsSchema* s = slist[ss];
3860 // HsFileSchema* fs = dynamic_cast<HsFileSchema*>(s);
3861 // assert(fs != NULL);
3862 // printf("schema %zu from %s to %s, filename %s", ss, TimeToString(fs->time_from).c_str(), TimeToString(fs->time_to).c_str(), fs->file_name.c_str());
3863 // printf(" smap ");
3864 // for (int k=0; k<num_var; k++)
3865 // printf(" %2d", smap[ss][k]);
3866 // printf("\n");
3867 //}
3868
3869 for (size_t ss=1; ss<slist.size(); ss++) {
3870 if (fDebug) {
3871 printf("Check schema %zu/%zu: prev from %s, this from %s to %s, compare %d\n", ss, slist.size(),
3872 TimeToString(slist[ss-1]->time_from).c_str(),
3873 TimeToString(slist[ss]->time_from).c_str(),
3874 TimeToString(slist[ss]->time_to).c_str(),
3875 slist[ss-1]->time_from >= slist[ss]->time_from);
3876 }
3877 if (slist[ss-1]->time_from >= slist[ss]->time_from) {
3878 // good
3879 } else {
3880 cm_msg(MERROR, "SchemaHistoryBase::hs_read_buffer", "History internal error, selected schema is not ordered by time. Please report this error to the midas forum.");
3881 return HS_FILE_ERROR;
3882 }
3883 }
3884
3885 std::vector<time_t> last_time;
3886
3887 for (int i=0; i<num_var; i++) {
3888 last_time.push_back(start_time);
3889 }
3890
3891 for (int i=slist.size()-1; i>=0; i--) {
3892 HsSchema* s = slist[i];
3893
3894 int status = s->read_data(start_time, end_time, num_var, smap[i], var_index, fDebug, last_time, buffer);
3895
3896 if (status == HS_SUCCESS) {
3897 for (int j=0; j<num_var; j++) {
3898 if (smap[i][j] >= 0)
3900 }
3901 }
3902 }
3903
3904 return HS_SUCCESS;
3905}
3906
3908 int num_var,
3909 const char* const event_name[], const char* const var_name[], const int var_index[],
3910 int num_entries[],
3911 time_t* time_buffer[], double* data_buffer[],
3912 int st[])
3913{
3914 int status;
3915
3916 ReadBuffer** buffer = new ReadBuffer*[num_var];
3918
3919 for (int i=0; i<num_var; i++) {
3920 buffer[i] = new ReadBuffer(start_time, end_time, interval);
3921 bi[i] = buffer[i];
3922
3923 // make sure outputs are initialized to something sane
3924 if (num_entries)
3925 num_entries[i] = 0;
3926 if (time_buffer)
3927 time_buffer[i] = NULL;
3928 if (data_buffer)
3929 data_buffer[i] = NULL;
3930 if (st)
3931 st[i] = 0;
3932
3933 if (num_entries)
3934 buffer[i]->fNumEntries = &num_entries[i];
3935 if (time_buffer)
3936 buffer[i]->fTimeBuffer = &time_buffer[i];
3937 if (data_buffer)
3938 buffer[i]->fDataBuffer = &data_buffer[i];
3939 }
3940
3941 status = hs_read_buffer(start_time, end_time,
3942 num_var, event_name, var_name, var_index,
3943 bi, st);
3944
3945 for (int i=0; i<num_var; i++) {
3946 buffer[i]->Finish();
3947 delete buffer[i];
3948 }
3949
3950 delete[] buffer;
3951 delete[] bi;
3952
3953 return status;
3954}
3955
3957 int num_var, const char* const event_name[], const char* const var_name[], const int var_index[],
3958 int num_entries[],
3959 int* count_bins[], double* mean_bins[], double* rms_bins[], double* min_bins[], double* max_bins[],
3962 time_t last_time[], double last_value[],
3963 int st[])
3964{
3965 int status;
3966
3969
3970 for (int i=0; i<num_var; i++) {
3971 buffer[i] = new MidasHistoryBinnedBuffer(start_time, end_time, num_bins);
3972 xbuffer[i] = buffer[i];
3973
3974 if (count_bins)
3975 buffer[i]->fCount = count_bins[i];
3976 if (mean_bins)
3977 buffer[i]->fMean = mean_bins[i];
3978 if (rms_bins)
3979 buffer[i]->fRms = rms_bins[i];
3980 if (min_bins)
3981 buffer[i]->fMin = min_bins[i];
3982 if (max_bins)
3983 buffer[i]->fMax = max_bins[i];
3984 if (bins_first_time)
3985 buffer[i]->fBinsFirstTime = bins_first_time[i];
3986 if (bins_first_value)
3987 buffer[i]->fBinsFirstValue = bins_first_value[i];
3988 if (bins_last_time)
3989 buffer[i]->fBinsLastTime = bins_last_time[i];
3990 if (bins_last_value)
3991 buffer[i]->fBinsLastValue = bins_last_value[i];
3992 if (last_time)
3993 buffer[i]->fLastTimePtr = &last_time[i];
3994 if (last_value)
3995 buffer[i]->fLastValuePtr = &last_value[i];
3996
3997 buffer[i]->Start();
3998 }
3999
4000 status = hs_read_buffer(start_time, end_time,
4001 num_var, event_name, var_name, var_index,
4002 xbuffer,
4003 st);
4004
4005 for (int i=0; i<num_var; i++) {
4006 buffer[i]->Finish();
4007 if (num_entries)
4008 num_entries[i] = buffer[i]->fNumEntries;
4009 if (0) {
4010 for (int j=0; j<num_bins; j++) {
4011 printf("var %d bin %d count %d, first %s last %s value first %f last %f\n", i, j, count_bins[i][j], TimeToString(bins_first_time[i][j]).c_str(), TimeToString(bins_last_time[i][j]).c_str(), bins_first_value[i][j], bins_last_value[i][j]);
4012 }
4013 }
4014 delete buffer[i];
4015 }
4016
4017 delete[] buffer;
4018 delete[] xbuffer;
4019
4020 return status;
4021}
4022
4024// SQL schema //
4026
4028{
4029 if (!sql->IsConnected()) {
4030 return HS_SUCCESS;
4031 }
4032
4033 int status = HS_SUCCESS;
4034 if (get_transaction_count() > 0) {
4037 }
4038 return status;
4039}
4040
4041int HsSchema::match_event_var(const char* event_name, const char* var_name, const int var_index)
4042{
4043 if (!MatchEventName(this->event_name.c_str(), event_name))
4044 return -1;
4045
4046 for (unsigned j=0; j<this->variables.size(); j++) {
4047 if (MatchTagName(this->variables[j].name.c_str(), this->variables[j].n_data, var_name, var_index)) {
4048 // Second clause in if() is case where MatchTagName used the "alternate tag name".
4049 // E.g. our variable name is "IM05[3]" (n_data=1), but we're looking for var_name="IM05" and var_index=3.
4050 if (var_index < this->variables[j].n_data || (this->variables[j].n_data == 1 && this->variables[j].name.find("[") != std::string::npos)) {
4051 return j;
4052 }
4053 }
4054 }
4055
4056 return -1;
4057}
4058
4059int HsSqlSchema::match_event_var(const char* event_name, const char* var_name, const int var_index)
4060{
4061 if (event_name_cmp(this->table_name, event_name)==0) {
4062 for (unsigned j=0; j<this->variables.size(); j++) {
4063 if (var_name_cmp(this->column_names[j], var_name)==0)
4064 return j;
4065 }
4066 }
4067
4068 return HsSchema::match_event_var(event_name, var_name, var_index);
4069}
4070
4071static HsSqlSchema* NewSqlSchema(HsSchemaVector* sv, const char* table_name, time_t t)
4072{
4073 time_t tt = 0;
4074 int j=-1;
4075 int jjx=-1; // remember oldest schema
4076 time_t ttx = 0;
4077 for (unsigned i=0; i<sv->size(); i++) {
4078 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
4079 if (s->table_name != table_name)
4080 continue;
4081
4082 if (s->time_from == t)
4083 return s;
4084
4085 // remember the last schema before time t
4086 if (s->time_from < t) {
4087 if (s->time_from > tt) {
4088 tt = s->time_from;
4089 j = i;
4090 }
4091 }
4092
4093 if (jjx < 0) {
4094 jjx = i;
4095 ttx = s->time_from;
4096 }
4097
4098 if (s->time_from < ttx) {
4099 jjx = i;
4100 ttx = s->time_from;
4101 }
4102
4103 //printf("table_name [%s], t=%s, i=%d, j=%d %s, tt=%s, dt is %d\n", table_name, TimeToString(t).c_str(), i, j, TimeToString(s->time_from).c_str(), TimeToString(tt).c_str(), (int)(s->time_from-t));
4104 }
4105
4106 //printf("NewSqlSchema: will copy schema j=%d, tt=%d at time %d\n", j, tt, t);
4107
4108 //printf("cloned schema at time %s: ", TimeToString(t).c_str());
4109 //(*sv)[j]->print(false);
4110
4111 //printf("schema before:\n");
4112 //sv->print(false);
4113
4114 if (j >= 0) {
4115 HsSqlSchema* s = new HsSqlSchema;
4116 *s = *(HsSqlSchema*)(*sv)[j]; // make a copy
4117 s->time_from = t;
4118 sv->add(s);
4119
4120 //printf("schema after:\n");
4121 //sv->print(false);
4122
4123 return s;
4124 }
4125
4126 if (jjx >= 0) {
4127 cm_msg(MERROR, "NewSqlSchema", "Error: Unexpected ordering of schema for table \'%s\', good luck!", table_name);
4128
4129 HsSqlSchema* s = new HsSqlSchema;
4130 *s = *(HsSqlSchema*)(*sv)[jjx]; // make a copy
4131 s->time_from = t;
4132 s->time_to = ttx;
4133 sv->add(s);
4134
4135 //printf("schema after:\n");
4136 //sv->print(false);
4137
4138 return s;
4139 }
4140
4141 cm_msg(MERROR, "NewSqlSchema", "Error: Cannot clone schema for table \'%s\', good luck!", table_name);
4142 return NULL;
4143}
4144
4145int HsSqlSchema::write_event(const time_t t, const char* data, const int data_size)
4146{
4147 HsSqlSchema* s = this;
4148
4149 std::string tags;
4150 std::string values;
4151
4152 for (unsigned i=0; i<s->variables.size(); i++) {
4153 if (s->variables[i].inactive)
4154 continue;
4155
4156 int type = s->variables[i].type;
4157 int n_data = s->variables[i].n_data;
4158 int offset = s->offsets[i];
4159 const char* column_name = s->column_names[i].c_str();
4160
4161 if (offset < 0)
4162 continue;
4163
4164 assert(n_data == 1);
4165 assert(strlen(column_name) > 0);
4166 assert(offset < data_size);
4167
4168 void* ptr = (void*)(data+offset);
4169
4170 tags += ", ";
4171 tags += sql->QuoteId(column_name);
4172
4173 values += ", ";
4174
4175 char buf[1024];
4176 int j=0;
4177
4178 switch (type) {
4179 default:
4180 sprintf(buf, "unknownType%d", type);
4181 break;
4182 case TID_BYTE:
4183 sprintf(buf, "%u",((unsigned char *)ptr)[j]);
4184 break;
4185 case TID_SBYTE:
4186 sprintf(buf, "%d",((signed char*)ptr)[j]);
4187 break;
4188 case TID_CHAR:
4189 // FIXME: quotes
4190 sprintf(buf, "\'%c\'",((char*)ptr)[j]);
4191 break;
4192 case TID_WORD:
4193 sprintf(buf, "%u",((unsigned short *)ptr)[j]);
4194 break;
4195 case TID_SHORT:
4196 sprintf(buf, "%d",((short *)ptr)[j]);
4197 break;
4198 case TID_DWORD:
4199 sprintf(buf, "%u",((unsigned int *)ptr)[j]);
4200 break;
4201 case TID_INT:
4202 sprintf(buf, "%d",((int *)ptr)[j]);
4203 break;
4204 case TID_BOOL:
4205 sprintf(buf, "%u",((unsigned int *)ptr)[j]);
4206 break;
4207 case TID_FLOAT:
4208 // FIXME: quotes
4209 sprintf(buf, "\'%.8g\'",((float*)ptr)[j]);
4210 break;
4211 case TID_DOUBLE:
4212 // FIXME: quotes
4213 sprintf(buf, "\'%.16g\'",((double*)ptr)[j]);
4214 break;
4215 }
4216
4217 values += buf;
4218 }
4219
4220 // 2001-02-16 20:38:40.1
4221 struct tm tms;
4222 localtime_r(&t, &tms); // somebody must call tzset() before this.
4223 char buf[1024];
4224 strftime(buf, sizeof(buf)-1, "%Y-%m-%d %H:%M:%S.0", &tms);
4225
4226 std::string cmd;
4227 cmd = "INSERT INTO ";
4228 cmd += sql->QuoteId(s->table_name.c_str());
4229 cmd += " (_t_time, _i_time";
4230 cmd += tags;
4231 cmd += ") VALUES (";
4232 cmd += sql->QuoteString(buf);
4233 cmd += ", ";
4234 cmd += sql->QuoteString(TimeToString(t).c_str());
4235 cmd += "";
4236 cmd += values;
4237 cmd += ");";
4238
4239 if (sql->IsConnected()) {
4240 if (s->get_transaction_count() == 0)
4241 sql->OpenTransaction(s->table_name.c_str());
4242
4244
4245 int status = sql->Exec(s->table_name.c_str(), cmd.c_str());
4246
4247 // mh2sql who does not call hs_flush_buffers()
4248 // so we should flush the transaction by hand
4249 // some SQL engines have limited transaction buffers... K.O.
4250 if (s->get_transaction_count() > 100000) {
4251 //printf("flush table %s\n", table_name);
4252 sql->CommitTransaction(s->table_name.c_str());
4254 }
4255
4256 if (status != DB_SUCCESS) {
4257 return status;
4258 }
4259 } else {
4260 int status = sql->ExecDisconnected(s->table_name.c_str(), cmd.c_str());
4261 if (status != DB_SUCCESS) {
4262 return status;
4263 }
4264 }
4265
4266 return HS_SUCCESS;
4267}
4268
4270 const int debug,
4271 time_t* last_written)
4272{
4273 if (debug)
4274 printf("SqlHistory::read_last_written: table [%s], timestamp %s\n", table_name.c_str(), TimeToString(timestamp).c_str());
4275
4276 std::string cmd;
4277 cmd += "SELECT _i_time FROM ";
4278 cmd += sql->QuoteId(table_name.c_str());
4279 cmd += " WHERE _i_time < ";
4280 cmd += TimeToString(timestamp);
4281 cmd += " ORDER BY _i_time DESC LIMIT 2;";
4282
4283 int status = sql->Prepare(table_name.c_str(), cmd.c_str());
4284
4285 if (status != DB_SUCCESS)
4286 return status;
4287
4288 time_t lw = 0;
4289
4290 /* Loop through the rows in the result-set */
4291
4292 while (1) {
4293 status = sql->Step();
4294 if (status != DB_SUCCESS)
4295 break;
4296
4297 time_t t = sql->GetTime(0);
4298
4299 if (t >= timestamp)
4300 continue;
4301
4302 if (t > lw)
4303 lw = t;
4304 }
4305
4306 sql->Finalize();
4307
4308 *last_written = lw;
4309
4310 if (debug)
4311 printf("SqlHistory::read_last_written: table [%s], timestamp %s, last_written %s\n", table_name.c_str(), TimeToString(timestamp).c_str(), TimeToString(lw).c_str());
4312
4313 return HS_SUCCESS;
4314}
4315
4316int HsSqlSchema::read_data(const time_t start_time,
4317 const time_t end_time,
4318 const int num_var, const std::vector<int>& var_schema_index, const int var_index[],
4319 const int debug,
4320 std::vector<time_t>& last_time,
4322{
4323 bool bad_last_time = false;
4324
4325 if (debug)
4326 printf("SqlHistory::read_data: table [%s], start %s, end %s\n", table_name.c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str());
4327
4328 std::string collist;
4329
4330 for (int i=0; i<num_var; i++) {
4331 int j = var_schema_index[i];
4332 if (j < 0)
4333 continue;
4334 if (collist.length() > 0)
4335 collist += ", ";
4337 }
4338
4339 std::string cmd;
4340 cmd += "SELECT _i_time, ";
4341 cmd += collist;
4342 cmd += " FROM ";
4343 cmd += sql->QuoteId(table_name.c_str());
4344 cmd += " WHERE _i_time>=";
4345 cmd += TimeToString(start_time);
4346 cmd += " and _i_time<=";
4347 cmd += TimeToString(end_time);
4348 cmd += " ORDER BY _i_time;";
4349
4350 int status = sql->Prepare(table_name.c_str(), cmd.c_str());
4351
4352 if (status != DB_SUCCESS)
4353 return HS_FILE_ERROR;
4354
4355 /* Loop through the rows in the result-set */
4356
4357 int count = 0;
4358
4359 while (1) {
4360 status = sql->Step();
4361 if (status != DB_SUCCESS)
4362 break;
4363
4364 count++;
4365
4366 time_t t = sql->GetTime(0);
4367
4368 if (t < start_time || t > end_time)
4369 continue;
4370
4371 int k = 0;
4372
4373 for (int i=0; i<num_var; i++) {
4374 int j = var_schema_index[i];
4375 if (j < 0)
4376 continue;
4377
4378 if (t < last_time[i]) { // protect against duplicate and non-monotonous data
4379 bad_last_time = true;
4380 } else {
4381 double v = sql->GetDouble(1+k);
4382
4383 //printf("Column %d, index %d, Row %d, time %d, value %f\n", k, colindex[k], count, t, v);
4384
4385 buffer[i]->Add(t, v);
4386 last_time[i] = t;
4387 }
4388
4389 k++;
4390 }
4391 }
4392
4393 sql->Finalize();
4394
4395 if (bad_last_time) {
4396 cm_msg(MERROR, "SqlHistory::read_data", "Detected duplicate or non-monotonous data in table \"%s\" for start time %s and end time %s", table_name.c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str());
4397 }
4398
4399 if (debug)
4400 printf("SqlHistory::read_data: table [%s], start %s, end %s, read %d rows\n", table_name.c_str(), TimeToString(start_time).c_str(), TimeToString(end_time).c_str(), count);
4401
4402 return HS_SUCCESS;
4403}
4404
4406 if (!sql || sql->fTransactionPerTable) {
4408 } else {
4410 }
4411}
4412
4414 if (!sql || sql->fTransactionPerTable) {
4416 } else {
4418 }
4419}
4420
4428
4430// SQL history functions //
4432
4433static int StartSqlTransaction(SqlBase* sql, const char* table_name, bool* have_transaction)
4434{
4435 if (*have_transaction)
4436 return HS_SUCCESS;
4437
4438 int status = sql->OpenTransaction(table_name);
4439 if (status != DB_SUCCESS)
4440 return HS_FILE_ERROR;
4441
4442 *have_transaction = true;
4443 return HS_SUCCESS;
4444}
4445
4446static int CreateSqlTable(SqlBase* sql, const char* table_name, bool* have_transaction, bool set_default_timestamp = false)
4447{
4448 int status;
4449
4450 status = StartSqlTransaction(sql, table_name, have_transaction);
4451 if (status != DB_SUCCESS)
4452 return HS_FILE_ERROR;
4453
4454 std::string cmd;
4455
4456 cmd = "CREATE TABLE ";
4457 cmd += sql->QuoteId(table_name);
4459 cmd += " (_t_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, _i_time INTEGER NOT NULL DEFAULT 0);";
4460 } else {
4461 cmd += " (_t_time TIMESTAMP NOT NULL, _i_time INTEGER NOT NULL);";
4462 }
4463
4464 status = sql->Exec(table_name, cmd.c_str());
4465
4466
4467 if (status == DB_KEY_EXIST) {
4468 cm_msg(MINFO, "CreateSqlTable", "Adding SQL table \"%s\", but it already exists", table_name);
4470 return status;
4471 }
4472
4473 if (status != DB_SUCCESS) {
4474 cm_msg(MINFO, "CreateSqlTable", "Adding SQL table \"%s\", error status %d", table_name, status);
4476 return HS_FILE_ERROR;
4477 }
4478
4479 cm_msg(MINFO, "CreateSqlTable", "Adding SQL table \"%s\"", table_name);
4481
4482 std::string i_index_name;
4483 i_index_name = table_name;
4484 i_index_name += "_i_time_index";
4485
4486 std::string t_index_name;
4487 t_index_name = table_name;
4488 t_index_name += "_t_time_index";
4489
4490 cmd = "CREATE INDEX ";
4491 cmd += sql->QuoteId(i_index_name.c_str());
4492 cmd += " ON ";
4493 cmd += sql->QuoteId(table_name);
4494 cmd += " (_i_time ASC);";
4495
4496 status = sql->Exec(table_name, cmd.c_str());
4497 if (status != DB_SUCCESS)
4498 return HS_FILE_ERROR;
4499
4500 cmd = "CREATE INDEX ";
4501 cmd += sql->QuoteId(t_index_name.c_str());
4502 cmd += " ON ";
4503 cmd += sql->QuoteId(table_name);
4504 cmd += " (_t_time);";
4505
4506 status = sql->Exec(table_name, cmd.c_str());
4507 if (status != DB_SUCCESS)
4508 return HS_FILE_ERROR;
4509
4510 return status;
4511}
4512
4513static int CreateSqlHyperTable(SqlBase* sql, const char* table_name, bool* have_transaction) {
4514 int status;
4515
4516 status = StartSqlTransaction(sql, table_name, have_transaction);
4517 if (status != DB_SUCCESS)
4518 return HS_FILE_ERROR;
4519
4520 std::string cmd;
4521
4522 cmd = "CREATE TABLE ";
4523 cmd += sql->QuoteId(table_name);
4524 cmd += " (_t_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, _i_time INTEGER NOT NULL DEFAULT 0);";
4525
4526 status = sql->Exec(table_name, cmd.c_str());
4527
4528 if (status == DB_KEY_EXIST) {
4529 cm_msg(MINFO, "CreateSqlHyperTable", "Adding SQL table \"%s\", but it already exists", table_name);
4531 return status;
4532 }
4533
4534 if (status != DB_SUCCESS) {
4535 cm_msg(MINFO, "CreateSqlHyperTable", "Adding SQL table \"%s\", error status %d", table_name, status);
4537 return HS_FILE_ERROR;
4538 }
4539
4540 cm_msg(MINFO, "CreateSqlHyperTable", "Adding SQL table \"%s\"", table_name);
4542
4543 cmd = "SELECT create_hypertable(";
4544 cmd += sql->QuoteString(table_name);
4545 cmd += ", '_t_time');";
4546
4547 // convert regular table to hypertable
4548 status = sql->Exec(table_name, cmd.c_str());
4549
4550 if (status != DB_SUCCESS) {
4551 cm_msg(MINFO, "CreateSqlHyperTable", "Converting SQL table to hypertable \"%s\", error status %d", table_name, status);
4553 return HS_FILE_ERROR;
4554 }
4555
4556 std::string i_index_name;
4557 i_index_name = table_name;
4558 i_index_name += "_i_time_index";
4559
4560 std::string t_index_name;
4561 t_index_name = table_name;
4562 t_index_name += "_t_time_index";
4563
4564 cmd = "CREATE INDEX ";
4565 cmd += sql->QuoteId(i_index_name.c_str());
4566 cmd += " ON ";
4567 cmd += sql->QuoteId(table_name);
4568 cmd += " (_i_time ASC);";
4569
4570 status = sql->Exec(table_name, cmd.c_str());
4571 if (status != DB_SUCCESS)
4572 return HS_FILE_ERROR;
4573
4574 cmd = "CREATE INDEX ";
4575 cmd += sql->QuoteId(t_index_name.c_str());
4576 cmd += " ON ";
4577 cmd += sql->QuoteId(table_name);
4578 cmd += " (_t_time);";
4579
4580 status = sql->Exec(table_name, cmd.c_str());
4581 if (status != DB_SUCCESS)
4582 return HS_FILE_ERROR;
4583
4584 return status;
4585}
4586
4587static int CreateSqlColumn(SqlBase* sql, const char* table_name, const char* column_name, const char* column_type, bool* have_transaction, int debug)
4588{
4589 if (debug)
4590 printf("CreateSqlColumn: table [%s], column [%s], type [%s]\n", table_name, column_name, column_type);
4591
4592 int status = StartSqlTransaction(sql, table_name, have_transaction);
4593 if (status != HS_SUCCESS)
4594 return status;
4595
4596 std::string cmd;
4597 cmd = "ALTER TABLE ";
4598 cmd += sql->QuoteId(table_name);
4599 cmd += " ADD COLUMN ";
4600 cmd += sql->QuoteId(column_name);
4601 cmd += " ";
4602 cmd += column_type;
4603 cmd += ";";
4604
4605 status = sql->Exec(table_name, cmd.c_str());
4606
4607 cm_msg(MINFO, "CreateSqlColumn", "Adding column \"%s\" to SQL table \"%s\", status %d", column_name, table_name, status);
4609
4610 return status;
4611}
4612
4614// SQL history base classes //
4616
4618{
4619public:
4621
4623 {
4624 fSql = NULL;
4626 }
4627
4628 virtual ~SqlHistoryBase() // dtor
4629 {
4630 hs_disconnect();
4631 if (fSql)
4632 delete fSql;
4633 fSql = NULL;
4634 }
4635
4637 {
4638 if (fSql)
4639 fSql->fDebug = debug;
4641 }
4642
4643 int hs_connect(const char* connect_string);
4644 int hs_disconnect();
4645 HsSchema* new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]);
4646 int read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp);
4647
4648protected:
4650 virtual int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name) = 0;
4651 virtual int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp) = 0;
4652 virtual int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction) = 0;
4653
4654 int update_schema(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable);
4655 int update_schema1(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable, bool* have_transaction);
4656};
4657
4659{
4660 if (fDebug)
4661 printf("hs_connect [%s]!\n", connect_string);
4662
4663 assert(fSql);
4664
4665 if (fSql->IsConnected())
4666 if (strcmp(fConnectString.c_str(), connect_string) == 0)
4667 return HS_SUCCESS;
4668
4669 hs_disconnect();
4670
4671 if (!connect_string || strlen(connect_string) < 1) {
4672 // FIXME: should use "logger dir" or some such default, that code should be in hs_get_history(), not here
4673 connect_string = ".";
4674 }
4675
4677
4678 if (fDebug)
4679 printf("hs_connect: connecting to SQL database \'%s\'\n", fConnectString.c_str());
4680
4681 int status = fSql->Connect(fConnectString.c_str());
4682 if (status != DB_SUCCESS)
4683 return status;
4684
4685 return HS_SUCCESS;
4686}
4687
4689{
4690 if (fDebug)
4691 printf("hs_disconnect!\n");
4692
4694
4695 fSql->Disconnect();
4696
4698
4699 return HS_SUCCESS;
4700}
4701
4702HsSchema* SqlHistoryBase::new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[])
4703{
4704 if (fDebug)
4705 printf("SqlHistory::new_event: event [%s], timestamp %s, ntags %d\n", event_name, TimeToString(timestamp).c_str(), ntags);
4706
4707 int status;
4708
4709 if (fWriterCurrentSchema.size() == 0) {
4711 if (status != HS_SUCCESS)
4712 return NULL;
4713 }
4714
4715 HsSqlSchema* s = (HsSqlSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
4716
4717 // schema does not exist, the SQL tables probably do not exist yet
4718
4719 if (!s) {
4720 status = create_table(&fWriterCurrentSchema, event_name, timestamp);
4721 if (status != HS_SUCCESS)
4722 return NULL;
4723
4724 s = (HsSqlSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
4725
4726 if (!s) {
4727 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
4728 fWriterCurrentSchema.find_event(event_name, timestamp, 1);
4729 return NULL;
4730 }
4731 }
4732
4733 assert(s != NULL);
4734
4736 if (status != HS_SUCCESS)
4737 return NULL;
4738
4739 s = (HsSqlSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
4740
4741 if (!s) {
4742 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot update schema database for event \'%s\', see previous messages", event_name);
4743 return NULL;
4744 }
4745
4746 if (0||fDebug) {
4747 printf("SqlHistory::new_event: schema for [%s] is %p\n", event_name, s);
4748 if (s)
4749 s->print();
4750 }
4751
4752 status = update_schema(s, timestamp, ntags, tags, true);
4753 if (status != HS_SUCCESS) {
4754 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
4755 return NULL;
4756 }
4757
4759 if (status != HS_SUCCESS)
4760 return NULL;
4761
4762 s = (HsSqlSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
4763
4764 if (!s) {
4765 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot update schema database for event \'%s\', see previous messages", event_name);
4766 return NULL;
4767 }
4768
4769 if (0||fDebug) {
4770 printf("SqlHistory::new_event: schema for [%s] is %p\n", event_name, s);
4771 if (s)
4772 s->print();
4773 }
4774
4775 // last call to UpdateMysqlSchema with "false" will check that new schema matches the new tags
4776
4777 status = update_schema(s, timestamp, ntags, tags, false);
4778 if (status != HS_SUCCESS) {
4779 cm_msg(MERROR, "SqlHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
4780 //fDebug = 1;
4781 //update_schema(s, timestamp, ntags, tags, false);
4782 //abort();
4783 return NULL;
4784 }
4785
4786 HsSqlSchema* e = new HsSqlSchema();
4787
4788 *e = *s; // make a copy of the schema
4789
4790 return e;
4791}
4792
4793int SqlHistoryBase::read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp)
4794{
4795 if (fDebug)
4796 printf("SqlHistory::read_schema: loading schema for event [%s] at time %s\n", event_name, TimeToString(timestamp).c_str());
4797
4798 int status;
4799
4800 if (fSchema.size() == 0) {
4802 if (status != HS_SUCCESS)
4803 return status;
4804 }
4805
4806 //sv->print(false);
4807
4808 if (event_name == NULL)
4809 return HS_SUCCESS;
4810
4811 for (unsigned i=0; i<sv->size(); i++) {
4812 HsSqlSchema* h = (HsSqlSchema*)(*sv)[i];
4813 // skip schema with already read column names
4814 if (h->variables.size() > 0)
4815 continue;
4816 // skip schema with different name
4817 if (!MatchEventName(h->event_name.c_str(), event_name))
4818 continue;
4819
4820 unsigned nn = sv->size();
4821
4822 status = read_column_names(sv, h->table_name.c_str(), h->event_name.c_str());
4823
4824 // if new schema was added, loop all over again
4825 if (sv->size() != nn)
4826 i=0;
4827 }
4828
4829 //sv->print(false);
4830
4831 return HS_SUCCESS;
4832}
4833
4834int SqlHistoryBase::update_schema(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable)
4835{
4836 int status;
4837 bool have_transaction = false;
4838
4839 status = update_schema1(s, timestamp, ntags, tags, write_enable, &have_transaction);
4840
4841 if (have_transaction) {
4842 int xstatus;
4843
4844 if (status == HS_SUCCESS)
4846 else
4848
4849 if (xstatus != DB_SUCCESS) {
4850 return HS_FILE_ERROR;
4851 }
4852 have_transaction = false;
4853 }
4854
4855 return status;
4856}
4857
4858int SqlHistoryBase::update_schema1(HsSqlSchema* s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable, bool* have_transaction)
4859{
4860 int status;
4861
4862 if (fDebug)
4863 printf("update_schema1\n");
4864
4865 // check that compare schema with tags[]
4866
4867 bool schema_ok = true;
4868
4869 int offset = 0;
4870 for (int i=0; i<ntags; i++) {
4871 for (unsigned int j=0; j<tags[i].n_data; j++) {
4872 int tagtype = tags[i].type;
4873 std::string tagname = tags[i].name;
4874 std::string maybe_colname = MidasNameToSqlName(tags[i].name);
4875
4876 if (tags[i].n_data > 1) {
4877 char s[256];
4878 sprintf(s, "[%d]", j);
4879 tagname += s;
4880
4881 sprintf(s, "_%d", j);
4882 maybe_colname += s;
4883 }
4884
4885 int count = 0;
4886
4887 for (unsigned j=0; j<s->variables.size(); j++) {
4888 // NB: inactive columns will be reactivated or recreated by the if(count==0) branch. K.O.
4889 if (s->variables[j].inactive)
4890 continue;
4891 if (tagname == s->variables[j].name) {
4892 if (s->sql->TypesCompatible(tagtype, s->column_types[j].c_str())) {
4893 if (count == 0) {
4894 s->offsets[j] = offset;
4896 }
4897 count++;
4898 if (count > 1) {
4899 cm_msg(MERROR, "SqlHistory::update_schema", "Duplicate SQL column \'%s\' type \'%s\' in table \"%s\" with MIDAS type \'%s\' history event \"%s\" tag \"%s\"", s->column_names[j].c_str(), s->column_types[j].c_str(), s->table_name.c_str(), rpc_tid_name(tagtype), s->event_name.c_str(), tagname.c_str());
4901 }
4902 } else {
4903 // column with incompatible type, mark it as unused
4904 schema_ok = false;
4905 if (fDebug)
4906 printf("Incompatible column!\n");
4907 if (write_enable) {
4908 cm_msg(MINFO, "SqlHistory::update_schema", "Deactivating SQL column \'%s\' type \'%s\' in table \"%s\" as incompatible with MIDAS type \'%s\' history event \"%s\" tag \"%s\"", s->column_names[j].c_str(), s->column_types[j].c_str(), s->table_name.c_str(), rpc_tid_name(tagtype), s->event_name.c_str(), tagname.c_str());
4910
4911 status = update_column(s->event_name.c_str(), s->table_name.c_str(), s->column_names[j].c_str(), s->column_types[j].c_str(), s->variables[j].tag_name.c_str(), s->variables[i].tag_type.c_str(), timestamp, false, have_transaction);
4912 if (status != HS_SUCCESS)
4913 return status;
4914 }
4915 }
4916 }
4917 }
4918
4919 if (count == 0) {
4920 // tag does not have a corresponding column
4921 schema_ok = false;
4922 if (fDebug)
4923 printf("No column for tag %s!\n", tagname.c_str());
4924
4925 bool found_column = false;
4926
4927 if (write_enable) {
4928 for (unsigned j=0; j<s->variables.size(); j++) {
4929 if (tagname == s->variables[j].tag_name) {
4930 bool typeok = s->sql->TypesCompatible(tagtype, s->column_types[j].c_str());
4931 if (typeok) {
4932 cm_msg(MINFO, "SqlHistory::update_schema", "Reactivating SQL column \'%s\' type \'%s\' in table \"%s\" for history event \"%s\" tag \"%s\"", s->column_names[j].c_str(), s->column_types[j].c_str(), s->table_name.c_str(), s->event_name.c_str(), tagname.c_str());
4934
4935 status = update_column(s->event_name.c_str(), s->table_name.c_str(), s->column_names[j].c_str(), s->column_types[j].c_str(), s->variables[j].tag_name.c_str(), s->variables[j].tag_type.c_str(), timestamp, true, have_transaction);
4936 if (status != HS_SUCCESS)
4937 return status;
4938
4939 if (count == 0) {
4940 s->offsets[j] = offset;
4942 }
4943 count++;
4944 found_column = true;
4945 if (count > 1) {
4946 cm_msg(MERROR, "SqlHistory::update_schema", "Duplicate SQL column \'%s\' type \'%s\' in table \"%s\" for history event \"%s\" tag \"%s\"", s->column_names[j].c_str(), s->column_types[j].c_str(), s->table_name.c_str(), s->event_name.c_str(), tagname.c_str());
4948 }
4949 }
4950 }
4951 }
4952 }
4953
4954 // create column
4955 if (!found_column && write_enable) {
4956 std::string col_name = maybe_colname;
4957 const char* col_type = s->sql->ColumnType(tagtype);
4958
4959 bool dupe = false;
4960 for (unsigned kk=0; kk<s->column_names.size(); kk++)
4961 if (s->column_names[kk] == col_name) {
4962 dupe = true;
4963 break;
4964 }
4965
4966 time_t now = time(NULL);
4967
4968 bool retry = false;
4969 for (int t=0; t<20; t++) {
4970
4971 // if duplicate column name, change it, try again
4972 if (dupe || retry) {
4974 col_name += "_";
4976 if (t > 0) {
4977 char s[256];
4978 sprintf(s, "_%d", t);
4979 col_name += s;
4980 }
4981 }
4982
4983 if (fDebug)
4984 printf("SqlHistory::update_schema: table [%s], add column [%s] type [%s] for tag [%s]\n", s->table_name.c_str(), col_name.c_str(), col_type, tagname.c_str());
4985
4987
4988 if (status == DB_KEY_EXIST) {
4989 if (fDebug)
4990 printf("SqlHistory::update_schema: table [%s], add column [%s] type [%s] for tag [%s] failed: duplicate column name\n", s->table_name.c_str(), col_name.c_str(), col_type, tagname.c_str());
4991 retry = true;
4992 continue;
4993 }
4994
4995 if (status != HS_SUCCESS)
4996 return status;
4997
4998 break;
4999 }
5000
5001 if (status != HS_SUCCESS)
5002 return status;
5003
5004 status = update_column(s->event_name.c_str(), s->table_name.c_str(), col_name.c_str(), col_type, tagname.c_str(), rpc_tid_name(tagtype), timestamp, true, have_transaction);
5005 if (status != HS_SUCCESS)
5006 return status;
5007 }
5008 }
5009
5010 if (count > 1) {
5011 // schema has duplicate tags
5012 schema_ok = false;
5013 cm_msg(MERROR, "SqlHistory::update_schema", "Duplicate tags or SQL columns for history event \"%s\" tag \"%s\"", s->event_name.c_str(), tagname.c_str());
5015 }
5016 }
5017 }
5018
5019 // mark as unused all columns not listed in tags
5020
5021 for (unsigned k=0; k<s->column_names.size(); k++)
5022 if (s->variables[k].name.length() > 0) {
5023 bool found = false;
5024
5025 for (int i=0; i<ntags; i++) {
5026 for (unsigned int j=0; j<tags[i].n_data; j++) {
5027 std::string tagname = tags[i].name;
5028
5029 if (tags[i].n_data > 1) {
5030 char s[256];
5031 sprintf(s, "[%d]", j);
5032 tagname += s;
5033 }
5034
5035 if (s->variables[k].name == tagname) {
5036 found = true;
5037 break;
5038 }
5039 }
5040
5041 if (found)
5042 break;
5043 }
5044
5045 if (!found) {
5046 // column not found in tags list
5047 schema_ok = false;
5048 if (fDebug)
5049 printf("Event [%s] Column [%s] tag [%s] not listed in tags list!\n", s->event_name.c_str(), s->column_names[k].c_str(), s->variables[k].name.c_str());
5050 if (write_enable) {
5051 cm_msg(MINFO, "SqlHistory::update_schema", "Deactivating SQL column \'%s\' type \'%s\' in table \"%s\" for history event \"%s\" not used for any tags", s->column_names[k].c_str(), s->column_types[k].c_str(), s->table_name.c_str(), s->event_name.c_str());
5053
5054 status = update_column(s->event_name.c_str(), s->table_name.c_str(), s->column_names[k].c_str(), s->column_types[k].c_str(), s->variables[k].tag_name.c_str(), s->variables[k].tag_type.c_str(), timestamp, false, have_transaction);
5055 if (status != HS_SUCCESS)
5056 return status;
5057 }
5058 }
5059 }
5060
5061 if (!write_enable)
5062 if (!schema_ok) {
5063 if (fDebug)
5064 printf("Return error!\n");
5065 return HS_FILE_ERROR;
5066 }
5067
5068 return HS_SUCCESS;
5069}
5070
5072// SQLITE functions //
5074
5075static int ReadSqliteTableNames(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug)
5076{
5077 if (debug)
5078 printf("ReadSqliteTableNames: table [%s]\n", table_name);
5079
5080 int status;
5081 std::string cmd;
5082
5083 // FIXME: quotes
5084 cmd = "SELECT event_name, _i_time FROM \'_event_name_";
5085 cmd += table_name;
5086 cmd += "\' WHERE table_name='";
5087 cmd += table_name;
5088 cmd += "';";
5089
5090 status = sql->Prepare(table_name, cmd.c_str());
5091
5092 if (status != DB_SUCCESS)
5093 return status;
5094
5095 while (1) {
5096 status = sql->Step();
5097
5098 if (status != DB_SUCCESS)
5099 break;
5100
5101 std::string xevent_name = sql->GetText(0);
5102 time_t xevent_time = sql->GetTime(1);
5103
5104 //printf("read event name [%s] time %s\n", xevent_name.c_str(), TimeToString(xevent_time).c_str());
5105
5106 HsSqlSchema* s = new HsSqlSchema;
5107 s->sql = sql;
5110 s->time_to = 0;
5111 s->table_name = table_name;
5112 sv->add(s);
5113 }
5114
5115 status = sql->Finalize();
5116
5117 return HS_SUCCESS;
5118}
5119
5120static int ReadSqliteTableSchema(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug)
5121{
5122 if (debug)
5123 printf("ReadSqliteTableSchema: table [%s]\n", table_name);
5124
5125 if (1) {
5126 // seed schema with table names
5127 HsSqlSchema* s = new HsSqlSchema;
5128 s->sql = sql;
5129 s->event_name = table_name;
5130 s->time_from = 0;
5131 s->time_to = 0;
5132 s->table_name = table_name;
5133 sv->add(s);
5134 }
5135
5136 return ReadSqliteTableNames(sql, sv, table_name, debug);
5137}
5138
5140// SQLITE history classes //
5142
5144{
5145public:
5146 SqliteHistory() { // ctor
5147#ifdef HAVE_SQLITE
5148 fSql = new Sqlite();
5149#endif
5150 }
5151
5153 int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name);
5154 int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp);
5155 int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction);
5156};
5157
5159{
5160 int status;
5161
5162 if (fDebug)
5163 printf("SqliteHistory::read_table_and_event_names!\n");
5164
5165 // loop over all tables
5166
5167 std::vector<std::string> tables;
5169 if (status != DB_SUCCESS)
5170 return status;
5171
5172 for (unsigned i=0; i<tables.size(); i++) {
5173 const char* table_name = tables[i].c_str();
5174
5175 const char* s;
5176 s = strstr(table_name, "_event_name_");
5177 if (s == table_name)
5178 continue;
5179 s = strstr(table_name, "_column_names_");
5180 if (s == table_name)
5181 continue;
5182
5183 status = ReadSqliteTableSchema(fSql, sv, table_name, fDebug);
5184 }
5185
5186 return HS_SUCCESS;
5187}
5188
5189int SqliteHistory::read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name)
5190{
5191 if (fDebug)
5192 printf("SqliteHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
5193
5194 // for all schema for table_name, prepopulate is with column names
5195
5196 std::vector<std::string> columns;
5197 fSql->ListColumns(table_name, &columns);
5198
5199 // first, populate column names
5200
5201 for (unsigned i=0; i<sv->size(); i++) {
5202 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5203
5204 if (s->table_name != table_name)
5205 continue;
5206
5207 // schema should be empty at this point
5208 //assert(s->variables.size() == 0);
5209
5210 for (unsigned j=0; j<columns.size(); j+=2) {
5211 const char* cn = columns[j+0].c_str();
5212 const char* ct = columns[j+1].c_str();
5213
5214 if (strcmp(cn, "_t_time") == 0)
5215 continue;
5216 if (strcmp(cn, "_i_time") == 0)
5217 continue;
5218
5219 bool found = false;
5220
5221 for (unsigned k=0; k<s->column_names.size(); k++) {
5222 if (s->column_names[k] == cn) {
5223 found = true;
5224 break;
5225 }
5226 }
5227
5228 //printf("column [%s] sql type [%s]\n", cn.c_str(), ct);
5229
5230 if (!found) {
5232 se.name = cn;
5233 se.type = 0;
5234 se.n_data = 1;
5235 se.n_bytes = 0;
5236 s->variables.push_back(se);
5237 s->column_names.push_back(cn);
5238 s->column_types.push_back(ct);
5239 s->offsets.push_back(-1);
5240 }
5241 }
5242 }
5243
5244 // then read column name information
5245
5246 std::string tn;
5247 tn += "_column_names_";
5248 tn += table_name;
5249
5250 std::string cmd;
5251 cmd = "SELECT column_name, tag_name, tag_type, _i_time FROM ";
5252 cmd += fSql->QuoteId(tn.c_str());
5253 cmd += " WHERE table_name=";
5254 cmd += fSql->QuoteString(table_name);
5255 cmd += " ORDER BY _i_time ASC;";
5256
5257 int status = fSql->Prepare(table_name, cmd.c_str());
5258
5259 if (status != DB_SUCCESS) {
5260 return status;
5261 }
5262
5263 while (1) {
5264 status = fSql->Step();
5265
5266 if (status != DB_SUCCESS)
5267 break;
5268
5269 // NOTE: SQL "SELECT ORDER BY _i_time ASC" returns data sorted by time
5270 // in this code we use the data from the last data row
5271 // so if multiple rows are present, the latest one is used
5272
5273 std::string col_name = fSql->GetText(0);
5274 std::string tag_name = fSql->GetText(1);
5275 std::string tag_type = fSql->GetText(2);
5277
5278 //printf("read table [%s] column [%s] tag name [%s] time %s\n", table_name, col_name.c_str(), tag_name.c_str(), TimeToString(xxx_time).c_str());
5279
5280 // make sure a schema exists at this time point
5281 NewSqlSchema(sv, table_name, schema_time);
5282
5283 // add this information to all schema
5284
5285 for (unsigned i=0; i<sv->size(); i++) {
5286 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5287 if (s->table_name != table_name)
5288 continue;
5289 if (s->time_from < schema_time)
5290 continue;
5291
5292 //printf("add column to schema %d\n", s->time_from);
5293
5294 for (unsigned j=0; j<s->column_names.size(); j++) {
5295 if (col_name != s->column_names[j])
5296 continue;
5297 s->variables[j].name = tag_name;
5298 s->variables[j].type = rpc_name_tid(tag_type.c_str());
5299 s->variables[j].n_data = 1;
5300 s->variables[j].n_bytes = rpc_tid_size(s->variables[j].type);
5301 }
5302 }
5303 }
5304
5305 status = fSql->Finalize();
5306
5307 return HS_SUCCESS;
5308}
5309
5310int SqliteHistory::create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp)
5311{
5312 if (fDebug)
5313 printf("SqliteHistory::create_table: event [%s], timestamp %s\n", event_name, TimeToString(timestamp).c_str());
5314
5315 int status;
5316 bool have_transaction = false;
5317 std::string table_name = MidasNameToSqlName(event_name);
5318
5319 // FIXME: what about duplicate table names?
5320 status = CreateSqlTable(fSql, table_name.c_str(), &have_transaction);
5321
5322 //if (status == DB_KEY_EXIST) {
5323 // return ReadSqliteTableSchema(fSql, sv, table_name.c_str(), fDebug);
5324 //}
5325
5326 if (status != HS_SUCCESS) {
5327 // FIXME: ???
5328 // FIXME: at least close or revert the transaction
5329 return status;
5330 }
5331
5332 std::string cmd;
5333
5334 std::string en;
5335 en += "_event_name_";
5336 en += table_name;
5337
5338 cmd = "CREATE TABLE ";
5339 cmd += fSql->QuoteId(en.c_str());
5340 cmd += " (table_name TEXT NOT NULL, event_name TEXT NOT NULL, _i_time INTEGER NOT NULL);";
5341
5342 status = fSql->Exec(table_name.c_str(), cmd.c_str());
5343
5344 cmd = "INSERT INTO ";
5345 cmd += fSql->QuoteId(en.c_str());
5346 cmd += " (table_name, event_name, _i_time) VALUES (";
5347 cmd += fSql->QuoteString(table_name.c_str());
5348 cmd += ", ";
5349 cmd += fSql->QuoteString(event_name);
5350 cmd += ", ";
5351 cmd += fSql->QuoteString(TimeToString(timestamp).c_str());
5352 cmd += ");";
5353
5354 status = fSql->Exec(table_name.c_str(), cmd.c_str());
5355
5356 std::string cn;
5357 cn += "_column_names_";
5358 cn += table_name;
5359
5360 cmd = "CREATE TABLE ";
5361 cmd += fSql->QuoteId(cn.c_str());
5362 cmd += " (table_name TEXT NOT NULL, column_name TEXT NOT NULL, tag_name TEXT NOT NULL, tag_type TEXT NOT NULL, column_type TEXT NOT NULL, _i_time INTEGER NOT NULL);";
5363
5364 status = fSql->Exec(table_name.c_str(), cmd.c_str());
5365
5366 status = fSql->CommitTransaction(table_name.c_str());
5367 if (status != DB_SUCCESS) {
5368 return HS_FILE_ERROR;
5369 }
5370
5371 return ReadSqliteTableSchema(fSql, sv, table_name.c_str(), fDebug);
5372}
5373
5374int SqliteHistory::update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction)
5375{
5376 if (fDebug)
5377 printf("SqliteHistory::update_column: event [%s], table [%s], column [%s], new name [%s], timestamp %s\n", event_name, table_name, column_name, tag_name, TimeToString(timestamp).c_str());
5378
5379 int status = StartSqlTransaction(fSql, table_name, have_transaction);
5380 if (status != HS_SUCCESS)
5381 return status;
5382
5383 // FIXME: quotes
5384 std::string cmd;
5385 cmd = "INSERT INTO \'_column_names_";
5386 cmd += table_name;
5387 cmd += "\' (table_name, column_name, tag_name, tag_type, column_type, _i_time) VALUES (\'";
5388 cmd += table_name;
5389 cmd += "\', \'";
5390 cmd += column_name;
5391 cmd += "\', \'";
5392 cmd += tag_name;
5393 cmd += "\', \'";
5394 cmd += tag_type;
5395 cmd += "\', \'";
5396 cmd += column_type;
5397 cmd += "\', \'";
5398 cmd += TimeToString(timestamp);
5399 cmd += "\');";
5400 status = fSql->Exec(table_name, cmd.c_str());
5401
5402 return status;
5403}
5404
5406// Mysql history classes //
5408
5410{
5411public:
5412 MysqlHistory() { // ctor
5413#ifdef HAVE_MYSQL
5414 fSql = new Mysql();
5415#endif
5416 }
5417
5419 int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name);
5420 int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp);
5421 int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction);
5422};
5423
5424static int ReadMysqlTableNames(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug, const char* must_have_event_name, const char* must_have_table_name)
5425{
5426 if (debug)
5427 printf("ReadMysqlTableNames: table [%s], must have event [%s] table [%s]\n", table_name, must_have_event_name, must_have_table_name);
5428
5429 int status;
5430 std::string cmd;
5431
5432 if (table_name) {
5433 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name='";
5434 cmd += table_name;
5435 cmd += "';";
5436 } else {
5437 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name!='';";
5438 table_name = "_history_index";
5439 }
5440
5441 status = sql->Prepare(table_name, cmd.c_str());
5442
5443 if (status != DB_SUCCESS)
5444 return status;
5445
5446 bool found_must_have_table = false;
5447 int count = 0;
5448
5449 while (1) {
5450 status = sql->Step();
5451
5452 if (status != DB_SUCCESS)
5453 break;
5454
5455 const char* xevent_name = sql->GetText(0);
5456 const char* xtable_name = sql->GetText(1);
5457 time_t xevent_time = sql->GetTime(2);
5458
5459 if (debug == 999) {
5460 printf("entry %d event name [%s] table name [%s] time %s\n", count, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
5461 }
5462
5464 assert(must_have_event_name != NULL);
5466 found_must_have_table = true;
5467 //printf("Found table [%s]: event name [%s] table name [%s] time %s\n", must_have_table_name, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
5468 } else {
5469 //printf("Found correct table [%s] with wrong event name [%s] expected [%s] time %s\n", must_have_table_name, xevent_name, must_have_event_name, TimeToString(xevent_time).c_str());
5470 }
5471 }
5472
5473 HsSqlSchema* s = new HsSqlSchema;
5474 s->sql = sql;
5477 s->time_to = 0;
5479 sv->add(s);
5480 count++;
5481 }
5482
5483 status = sql->Finalize();
5484
5486 cm_msg(MERROR, "ReadMysqlTableNames", "Error: Table [%s] for event [%s] missing from the history index\n", must_have_table_name, must_have_event_name);
5487 if (debug == 999)
5488 return HS_FILE_ERROR;
5489 // NB: recursion is broken by setting debug to 999.
5491 cm_msg(MERROR, "ReadMysqlTableNames", "Error: Cannot continue, nothing will work after this error\n");
5493 abort();
5494 return HS_FILE_ERROR;
5495 }
5496
5497 if (0) {
5498 // print accumulated schema
5499 printf("ReadMysqlTableNames: table_name [%s] event_name [%s] table_name [%s]\n", table_name, must_have_event_name, must_have_table_name);
5500 sv->print(false);
5501 }
5502
5503 return HS_SUCCESS;
5504}
5505
5506int MysqlHistory::read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name)
5507{
5508 if (fDebug)
5509 printf("MysqlHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
5510
5511 // for all schema for table_name, prepopulate is with column names
5512
5513 std::vector<std::string> columns;
5514 fSql->ListColumns(table_name, &columns);
5515
5516 // first, populate column names
5517
5518 for (unsigned i=0; i<sv->size(); i++) {
5519 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5520
5521 if (s->table_name != table_name)
5522 continue;
5523
5524 // schema should be empty at this point
5525 //assert(s->variables.size() == 0);
5526
5527 for (unsigned j=0; j<columns.size(); j+=2) {
5528 const char* cn = columns[j+0].c_str();
5529 const char* ct = columns[j+1].c_str();
5530
5531 if (strcmp(cn, "_t_time") == 0)
5532 continue;
5533 if (strcmp(cn, "_i_time") == 0)
5534 continue;
5535
5536 bool found = false;
5537
5538 for (unsigned k=0; k<s->column_names.size(); k++) {
5539 if (s->column_names[k] == cn) {
5540 found = true;
5541 break;
5542 }
5543 }
5544
5545 //printf("column [%s] sql type [%s]\n", cn.c_str(), ct);
5546
5547 if (!found) {
5549 se.tag_name = cn;
5550 se.tag_type = "";
5551 se.name = cn;
5552 se.type = 0;
5553 se.n_data = 1;
5554 se.n_bytes = 0;
5555 s->variables.push_back(se);
5556 s->column_names.push_back(cn);
5557 s->column_types.push_back(ct);
5558 s->offsets.push_back(-1);
5559 }
5560 }
5561 }
5562
5563 // then read column name information
5564
5565 std::string cmd;
5566 cmd = "SELECT column_name, column_type, tag_name, tag_type, itimestamp, active FROM _history_index WHERE event_name=";
5567 cmd += fSql->QuoteString(event_name);
5568 cmd += ";";
5569
5570 int status = fSql->Prepare(table_name, cmd.c_str());
5571
5572 if (status != DB_SUCCESS) {
5573 return status;
5574 }
5575
5576 while (1) {
5577 status = fSql->Step();
5578
5579 if (status != DB_SUCCESS)
5580 break;
5581
5582 const char* col_name = fSql->GetText(0);
5583 const char* col_type = fSql->GetText(1);
5584 const char* tag_name = fSql->GetText(2);
5585 const char* tag_type = fSql->GetText(3);
5587 const char* active = fSql->GetText(5);
5588 int iactive = atoi(active);
5589
5590 //printf("read table [%s] column [%s] type [%s] tag name [%s] type [%s] time %s active [%s] %d\n", table_name, col_name, col_type, tag_name, tag_type, TimeToString(schema_time).c_str(), active, iactive);
5591
5592 if (!col_name)
5593 continue;
5594 if (!tag_name)
5595 continue;
5596 if (strlen(col_name) < 1)
5597 continue;
5598
5599 // make sure a schema exists at this time point
5600 NewSqlSchema(sv, table_name, schema_time);
5601
5602 // add this information to all schema
5603
5604 for (unsigned i=0; i<sv->size(); i++) {
5605 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5606 if (s->table_name != table_name)
5607 continue;
5608 if (s->time_from < schema_time)
5609 continue;
5610
5611 int tid = rpc_name_tid(tag_type);
5612 int tid_size = rpc_tid_size(tid);
5613
5614 for (unsigned j=0; j<s->column_names.size(); j++) {
5615 if (col_name != s->column_names[j])
5616 continue;
5617
5618 s->variables[j].tag_name = tag_name;
5619 s->variables[j].tag_type = tag_type;
5620 if (!iactive) {
5621 s->variables[j].name = "";
5622 s->variables[j].inactive = true;
5623 } else {
5624 s->variables[j].name = tag_name;
5625 s->variables[j].inactive = false;
5626 }
5627 s->variables[j].type = tid;
5628 s->variables[j].n_data = 1;
5629 s->variables[j].n_bytes = tid_size;
5630
5631 // doctor column names in case MySQL returns different type
5632 // from the type used to create the column, but the types
5633 // are actually the same. K.O.
5635 }
5636 }
5637 }
5638
5639 status = fSql->Finalize();
5640
5641 return HS_SUCCESS;
5642}
5643
5644#if 0
5645static int ReadMysqlTableSchema(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug)
5646{
5647 if (debug)
5648 printf("ReadMysqlTableSchema: table [%s]\n", table_name);
5649
5650 if (1) {
5651 // seed schema with table names
5652 HsSqlSchema* s = new HsSqlSchema;
5653 s->sql = sql;
5654 s->event_name = table_name;
5655 s->time_from = 0;
5656 s->time_to = 0;
5657 s->table_name = table_name;
5658 sv->add(s);
5659 }
5660
5661 return ReadMysqlTableNames(sql, sv, table_name, debug, NULL, NULL);
5662}
5663#endif
5664
5666{
5667 int status;
5668
5669 if (fDebug)
5670 printf("MysqlHistory::read_table_and_event_names!\n");
5671
5672 // loop over all tables
5673
5674 std::vector<std::string> tables;
5676 if (status != DB_SUCCESS)
5677 return status;
5678
5679 for (unsigned i=0; i<tables.size(); i++) {
5680 const char* table_name = tables[i].c_str();
5681
5682 const char* s;
5683 s = strstr(table_name, "_history_index");
5684 if (s == table_name)
5685 continue;
5686
5687 if (1) {
5688 // seed schema with table names
5689 HsSqlSchema* s = new HsSqlSchema;
5690 s->sql = fSql;
5691 s->event_name = table_name;
5692 s->time_from = 0;
5693 s->time_to = 0;
5694 s->table_name = table_name;
5695 sv->add(s);
5696 }
5697 }
5698
5699 if (0) {
5700 // print accumulated schema
5701 printf("read_table_and_event_names:\n");
5702 sv->print(false);
5703 }
5704
5706
5707 return HS_SUCCESS;
5708}
5709
5710int MysqlHistory::create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp)
5711{
5712 if (fDebug)
5713 printf("MysqlHistory::create_table: event [%s], timestamp %s\n", event_name, TimeToString(timestamp).c_str());
5714
5715 int status;
5716 std::string table_name = MidasNameToSqlName(event_name);
5717
5718 // MySQL table name length limit is 64 bytes
5719 if (table_name.length() > 40) {
5720 table_name.resize(40);
5721 table_name += "_T";
5722 }
5723
5724 time_t now = time(NULL);
5725
5726 int max_attempts = 10;
5727 for (int i=0; i<max_attempts; i++) {
5728 status = fSql->OpenTransaction(table_name.c_str());
5729 if (status != DB_SUCCESS) {
5730 return HS_FILE_ERROR;
5731 }
5732
5733 bool have_transaction = true;
5734
5735 std::string xtable_name = table_name;
5736
5737 if (i>0) {
5738 xtable_name += "_";
5740 if (i>1) {
5741 xtable_name += "_";
5742 char buf[256];
5743 sprintf(buf, "%d", i);
5744 xtable_name += buf;
5745 }
5746 }
5747
5749
5750 //printf("event [%s] create table [%s] status %d\n", event_name, xtable_name.c_str(), status);
5751
5752 if (status == DB_KEY_EXIST) {
5753 // already exists, try with different name!
5754 fSql->RollbackTransaction(table_name.c_str());
5755 continue;
5756 }
5757
5758 if (status != HS_SUCCESS) {
5759 // MYSQL cannot roll back "create table", if we cannot create SQL tables, nothing will work. Give up now.
5760 cm_msg(MERROR, "MysqlHistory::create_table", "Could not create table [%s] for event [%s], timestamp %s, please fix the SQL database configuration and try again", table_name.c_str(), event_name, TimeToString(timestamp).c_str());
5761 abort();
5762
5763 // fatal error, give up!
5764 fSql->RollbackTransaction(table_name.c_str());
5765 break;
5766 }
5767
5768 for (int j=0; j<2; j++) {
5769 std::string cmd;
5770 cmd += "INSERT INTO _history_index (event_name, table_name, itimestamp, active) VALUES (";
5771 cmd += fSql->QuoteString(event_name);
5772 cmd += ", ";
5773 cmd += fSql->QuoteString(xtable_name.c_str());
5774 cmd += ", ";
5775 char buf[256];
5776 sprintf(buf, "%.0f", (double)timestamp);
5777 cmd += fSql->QuoteString(buf);
5778 cmd += ", ";
5779 cmd += fSql->QuoteString("1");
5780 cmd += ");";
5781
5782 int status = fSql->Exec(table_name.c_str(), cmd.c_str());
5783 if (status == DB_SUCCESS)
5784 break;
5785
5786 status = CreateSqlTable(fSql, "_history_index", &have_transaction);
5787 status = CreateSqlColumn(fSql, "_history_index", "event_name", "varchar(256) character set binary not null", &have_transaction, fDebug);
5788 status = CreateSqlColumn(fSql, "_history_index", "table_name", "varchar(256)", &have_transaction, fDebug);
5789 status = CreateSqlColumn(fSql, "_history_index", "tag_name", "varchar(256) character set binary", &have_transaction, fDebug);
5790 status = CreateSqlColumn(fSql, "_history_index", "tag_type", "varchar(256)", &have_transaction, fDebug);
5791 status = CreateSqlColumn(fSql, "_history_index", "column_name", "varchar(256)", &have_transaction, fDebug);
5792 status = CreateSqlColumn(fSql, "_history_index", "column_type", "varchar(256)", &have_transaction, fDebug);
5793 status = CreateSqlColumn(fSql, "_history_index", "itimestamp", "integer not null", &have_transaction, fDebug);
5794 status = CreateSqlColumn(fSql, "_history_index", "active", "boolean", &have_transaction, fDebug);
5795 }
5796
5797 status = fSql->CommitTransaction(table_name.c_str());
5798
5799 if (status != DB_SUCCESS) {
5800 return HS_FILE_ERROR;
5801 }
5802
5803 return ReadMysqlTableNames(fSql, sv, xtable_name.c_str(), fDebug, event_name, xtable_name.c_str());
5804 }
5805
5806 cm_msg(MERROR, "MysqlHistory::create_table", "Could not create table [%s] for event [%s], timestamp %s, after %d attempts", table_name.c_str(), event_name, TimeToString(timestamp).c_str(), max_attempts);
5807
5808 return HS_FILE_ERROR;
5809}
5810
5811int MysqlHistory::update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction)
5812{
5813 if (fDebug)
5814 printf("MysqlHistory::update_column: event [%s], table [%s], column [%s], type [%s] new name [%s], timestamp %s\n", event_name, table_name, column_name, column_type, tag_name, TimeToString(timestamp).c_str());
5815
5816 std::string cmd;
5817 cmd += "INSERT INTO _history_index (event_name, table_name, tag_name, tag_type, column_name, column_type, itimestamp, active) VALUES (";
5818 cmd += fSql->QuoteString(event_name);
5819 cmd += ", ";
5820 cmd += fSql->QuoteString(table_name);
5821 cmd += ", ";
5822 cmd += fSql->QuoteString(tag_name);
5823 cmd += ", ";
5824 cmd += fSql->QuoteString(tag_type);
5825 cmd += ", ";
5826 cmd += fSql->QuoteString(column_name);
5827 cmd += ", ";
5828 cmd += fSql->QuoteString(column_type);
5829 cmd += ", ";
5830 char buf[256];
5831 sprintf(buf, "%.0f", (double)timestamp);
5832 cmd += fSql->QuoteString(buf);
5833 cmd += ", ";
5834 if (active)
5835 cmd += fSql->QuoteString("1");
5836 else
5837 cmd += fSql->QuoteString("0");
5838 cmd += ");";
5839
5840 int status = fSql->Exec(table_name, cmd.c_str());
5841 if (status != DB_SUCCESS)
5842 return HS_FILE_ERROR;
5843
5844 return HS_SUCCESS;
5845}
5846
5848// PostgreSQL history classes //
5850
5851#ifdef HAVE_PGSQL
5852
5853class PgsqlHistory: public SqlHistoryBase
5854{
5855public:
5856 Pgsql *fPgsql = NULL;
5857public:
5858 PgsqlHistory() { // ctor
5859 fPgsql = new Pgsql();
5860 fSql = fPgsql;
5861 }
5862
5864 int read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name);
5865 int create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp);
5866 int update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction);
5867};
5868
5869static int ReadPgsqlTableNames(SqlBase* sql, HsSchemaVector *sv, const char* table_name, int debug, const char* must_have_event_name, const char* must_have_table_name)
5870{
5871 if (debug)
5872 printf("ReadPgsqlTableNames: table [%s], must have event [%s] table [%s]\n", table_name, must_have_event_name, must_have_table_name);
5873
5874 int status;
5875 std::string cmd;
5876
5877 if (table_name) {
5878 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name='";
5879 cmd += table_name;
5880 cmd += "';";
5881 } else {
5882 cmd = "SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name!='';";
5883 table_name = "_history_index";
5884 }
5885
5886 status = sql->Prepare(table_name, cmd.c_str());
5887
5888 if (status != DB_SUCCESS)
5889 return status;
5890
5891 bool found_must_have_table = false;
5892 int count = 0;
5893
5894 while (1) {
5895 status = sql->Step();
5896
5897 if (status != DB_SUCCESS)
5898 break;
5899
5900 const char* xevent_name = sql->GetText(0);
5901 const char* xtable_name = sql->GetText(1);
5902 time_t xevent_time = sql->GetTime(2);
5903
5904 if (debug == 999) {
5905 printf("entry %d event name [%s] table name [%s] time %s\n", count, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
5906 }
5907
5909 assert(must_have_event_name != NULL);
5911 found_must_have_table = true;
5912 //printf("Found table [%s]: event name [%s] table name [%s] time %s\n", must_have_table_name, xevent_name, xtable_name, TimeToString(xevent_time).c_str());
5913 } else {
5914 //printf("Found correct table [%s] with wrong event name [%s] expected [%s] time %s\n", must_have_table_name, xevent_name, must_have_event_name, TimeToString(xevent_time).c_str());
5915 }
5916 }
5917
5918 HsSqlSchema* s = new HsSqlSchema;
5919 s->sql = sql;
5922 s->time_to = 0;
5924 sv->add(s);
5925 count++;
5926 }
5927
5928 status = sql->Finalize();
5929
5931 cm_msg(MERROR, "ReadPgsqlTableNames", "Error: Table [%s] for event [%s] missing from the history index\n", must_have_table_name, must_have_event_name);
5932 if (debug == 999)
5933 return HS_FILE_ERROR;
5934 // NB: recursion is broken by setting debug to 999.
5936 cm_msg(MERROR, "ReadPgsqlTableNames", "Error: Cannot continue, nothing will work after this error\n");
5938 abort();
5939 return HS_FILE_ERROR;
5940 }
5941
5942 if (0) {
5943 // print accumulated schema
5944 printf("ReadPgsqlTableNames: table_name [%s] event_name [%s] table_name [%s]\n", table_name, must_have_event_name, must_have_table_name);
5945 sv->print(false);
5946 }
5947
5948 return HS_SUCCESS;
5949}
5950
5951int PgsqlHistory::read_column_names(HsSchemaVector *sv, const char* table_name, const char* event_name)
5952{
5953 if (fDebug)
5954 printf("PgsqlHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
5955
5956 // for all schema for table_name, prepopulate is with column names
5957
5958 std::vector<std::string> columns;
5959 fSql->ListColumns(table_name, &columns);
5960
5961 // first, populate column names
5962
5963 for (unsigned i=0; i<sv->size(); i++) {
5964 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
5965
5966 if (s->table_name != table_name)
5967 continue;
5968
5969 // schema should be empty at this point
5970 //assert(s->variables.size() == 0);
5971
5972 for (unsigned j=0; j<columns.size(); j+=2) {
5973 const char* cn = columns[j+0].c_str();
5974 const char* ct = columns[j+1].c_str();
5975
5976 if (strcmp(cn, "_t_time") == 0)
5977 continue;
5978 if (strcmp(cn, "_i_time") == 0)
5979 continue;
5980
5981 bool found = false;
5982
5983 for (unsigned k=0; k<s->column_names.size(); k++) {
5984 if (s->column_names[k] == cn) {
5985 found = true;
5986 break;
5987 }
5988 }
5989
5990 if (!found) {
5992 se.tag_name = cn;
5993 se.tag_type = "";
5994 se.name = cn;
5995 se.type = 0;
5996 se.n_data = 1;
5997 se.n_bytes = 0;
5998 s->variables.push_back(se);
5999 s->column_names.push_back(cn);
6000 s->column_types.push_back(ct);
6001 s->offsets.push_back(-1);
6002 }
6003 }
6004 }
6005
6006 // then read column name information
6007
6008 std::string cmd;
6009 cmd = "SELECT column_name, column_type, tag_name, tag_type, itimestamp, active FROM _history_index WHERE event_name=";
6010 cmd += fSql->QuoteString(event_name);
6011 cmd += ";";
6012
6013 int status = fSql->Prepare(table_name, cmd.c_str());
6014
6015 if (status != DB_SUCCESS) {
6016 return status;
6017 }
6018
6019 while (1) {
6020 status = fSql->Step();
6021
6022 if (status != DB_SUCCESS)
6023 break;
6024
6025 const char* col_name = fSql->GetText(0);
6026 const char* col_type = fSql->GetText(1);
6027 const char* tag_name = fSql->GetText(2);
6028 const char* tag_type = fSql->GetText(3);
6029 time_t schema_time = fSql->GetTime(4);
6030 const char* active = fSql->GetText(5);
6031 int iactive = atoi(active);
6032
6033 //printf("read table [%s] column [%s] type [%s] tag name [%s] type [%s] time %s active [%s] %d\n", table_name, col_name, col_type, tag_name, tag_type, TimeToString(schema_time).c_str(), active, iactive);
6034
6035 if (!col_name)
6036 continue;
6037 if (!tag_name)
6038 continue;
6039 if (strlen(col_name) < 1)
6040 continue;
6041
6042 // make sure a schema exists at this time point
6043 NewSqlSchema(sv, table_name, schema_time);
6044
6045 // add this information to all schema
6046 for (unsigned i=0; i<sv->size(); i++) {
6047 HsSqlSchema* s = (HsSqlSchema*)(*sv)[i];
6048 if (s->table_name != table_name)
6049 continue;
6050 if (s->time_from < schema_time)
6051 continue;
6052
6053 int tid = rpc_name_tid(tag_type);
6054 int tid_size = rpc_tid_size(tid);
6055
6056 for (unsigned j=0; j<s->column_names.size(); j++) {
6057 if (col_name != s->column_names[j])
6058 continue;
6059
6060 s->variables[j].tag_name = tag_name;
6061 s->variables[j].tag_type = tag_type;
6062 if (!iactive) {
6063 s->variables[j].name = "";
6064 s->variables[j].inactive = true;
6065 } else {
6066 s->variables[j].name = tag_name;
6067 s->variables[j].inactive = false;
6068 }
6069 s->variables[j].type = tid;
6070 s->variables[j].n_data = 1;
6071 s->variables[j].n_bytes = tid_size;
6072
6073 // doctor column names in case MySQL returns different type
6074 // from the type used to create the column, but the types
6075 // are actually the same. K.O.
6077 }
6078 }
6079 }
6080
6081 status = fSql->Finalize();
6082
6083 return HS_SUCCESS;
6084}
6085
6086int PgsqlHistory::read_table_and_event_names(HsSchemaVector *sv)
6087{
6088 int status;
6089
6090 if (fDebug)
6091 printf("PgsqlHistory::read_table_and_event_names!\n");
6092
6093 // loop over all tables
6094
6095 std::vector<std::string> tables;
6096 status = fSql->ListTables(&tables);
6097 if (status != DB_SUCCESS)
6098 return status;
6099
6100 for (unsigned i=0; i<tables.size(); i++) {
6101 const char* table_name = tables[i].c_str();
6102
6103 const char* s;
6104 s = strstr(table_name, "_history_index");
6105 if (s == table_name)
6106 continue;
6107
6108 if (1) {
6109 // seed schema with table names
6110 HsSqlSchema* s = new HsSqlSchema;
6111 s->sql = fSql;
6112 s->event_name = table_name;
6113 s->time_from = 0;
6114 s->time_to = 0;
6115 s->table_name = table_name;
6116 sv->add(s);
6117 }
6118 }
6119
6120 if (0) {
6121 // print accumulated schema
6122 printf("read_table_and_event_names:\n");
6123 sv->print(false);
6124 }
6125
6126 status = ReadPgsqlTableNames(fSql, sv, NULL, fDebug, NULL, NULL);
6127
6128 return HS_SUCCESS;
6129}
6130
6131int PgsqlHistory::create_table(HsSchemaVector* sv, const char* event_name, time_t timestamp)
6132{
6133 if (fDebug)
6134 printf("PgsqlHistory::create_table: event [%s], timestamp %s\n", event_name, TimeToString(timestamp).c_str());
6135
6136 int status;
6137 std::string table_name = MidasNameToSqlName(event_name);
6138
6139 // PostgreSQL table name length limit is 64 bytes
6140 if (table_name.length() > 40) {
6141 table_name.resize(40);
6142 table_name += "_T";
6143 }
6144
6145 time_t now = time(NULL);
6146
6147 int max_attempts = 10;
6148 for (int i=0; i<max_attempts; i++) {
6149 status = fSql->OpenTransaction(table_name.c_str());
6150 if (status != DB_SUCCESS) {
6151 return HS_FILE_ERROR;
6152 }
6153
6154 bool have_transaction = true;
6155
6156 std::string xtable_name = table_name;
6157
6158 if (i>0) {
6159 xtable_name += "_";
6161 if (i>1) {
6162 xtable_name += "_";
6163 char buf[256];
6164 sprintf(buf, "%d", i);
6165 xtable_name += buf;
6166 }
6167 }
6168
6169 if (fPgsql->fDownsample)
6171 else
6173
6174 //printf("event [%s] create table [%s] status %d\n", event_name, xtable_name.c_str(), status);
6175
6176 if (status == DB_KEY_EXIST) {
6177 // already exists, try with different name!
6178 fSql->RollbackTransaction(table_name.c_str());
6179 continue;
6180 }
6181
6182 if (status != HS_SUCCESS) {
6183 fSql->RollbackTransaction(table_name.c_str());
6184 continue;
6185 }
6186
6187 fSql->Exec(table_name.c_str(), "SAVEPOINT t0");
6188
6189 for (int j=0; j<2; j++) {
6190 std::string cmd;
6191 cmd += "INSERT INTO _history_index (event_name, table_name, itimestamp, active) VALUES (";
6192 cmd += fSql->QuoteString(event_name);
6193 cmd += ", ";
6194 cmd += fSql->QuoteString(xtable_name.c_str());
6195 cmd += ", ";
6196 char buf[256];
6197 sprintf(buf, "%.0f", (double)timestamp);
6198 cmd += buf;
6199 cmd += ", ";
6200 cmd += fSql->QuoteString("1");
6201 cmd += ");";
6202
6203 int status = fSql->Exec(table_name.c_str(), cmd.c_str());
6204 if (status == DB_SUCCESS)
6205 break;
6206
6207 // if INSERT failed _history_index does not exist then recover to savepoint t0
6208 // to prevent whole transition abort
6209 fSql->Exec(table_name.c_str(), "ROLLBACK TO SAVEPOINT t0");
6210
6211 status = CreateSqlTable(fSql, "_history_index", &have_transaction, true);
6212 status = CreateSqlColumn(fSql, "_history_index", "event_name", "text not null", &have_transaction, fDebug);
6213 status = CreateSqlColumn(fSql, "_history_index", "table_name", "text", &have_transaction, fDebug);
6214 status = CreateSqlColumn(fSql, "_history_index", "tag_name", "text", &have_transaction, fDebug);
6215 status = CreateSqlColumn(fSql, "_history_index", "tag_type", "text", &have_transaction, fDebug);
6216 status = CreateSqlColumn(fSql, "_history_index", "column_name", "text", &have_transaction, fDebug);
6217 status = CreateSqlColumn(fSql, "_history_index", "column_type", "text", &have_transaction, fDebug);
6218 status = CreateSqlColumn(fSql, "_history_index", "itimestamp", "integer not null", &have_transaction, fDebug);
6219 status = CreateSqlColumn(fSql, "_history_index", "active", "smallint", &have_transaction, fDebug);
6220
6221 status = fSql->CommitTransaction(table_name.c_str());
6222 }
6223
6224 if (status != DB_SUCCESS) {
6225 return HS_FILE_ERROR;
6226 }
6227
6228 return ReadPgsqlTableNames(fSql, sv, xtable_name.c_str(), fDebug, event_name, xtable_name.c_str());
6229 }
6230
6231 cm_msg(MERROR, "PgsqlHistory::create_table", "Could not create table [%s] for event [%s], timestamp %s, after %d attempts", table_name.c_str(), event_name, TimeToString(timestamp).c_str(), max_attempts);
6232
6233 return HS_FILE_ERROR;
6234}
6235
6236int PgsqlHistory::update_column(const char* event_name, const char* table_name, const char* column_name, const char* column_type, const char* tag_name, const char* tag_type, const time_t timestamp, bool active, bool* have_transaction)
6237{
6238 if (fDebug)
6239 printf("PgsqlHistory::update_column: event [%s], table [%s], column [%s], type [%s] new name [%s], timestamp %s\n", event_name, table_name, column_name, column_type, tag_name, TimeToString(timestamp).c_str());
6240
6241 std::string cmd;
6242 cmd += "INSERT INTO _history_index (event_name, table_name, tag_name, tag_type, column_name, column_type, itimestamp, active) VALUES (";
6243 cmd += fSql->QuoteString(event_name);
6244 cmd += ", ";
6245 cmd += fSql->QuoteString(table_name);
6246 cmd += ", ";
6247 cmd += fSql->QuoteString(tag_name);
6248 cmd += ", ";
6249 cmd += fSql->QuoteString(tag_type);
6250 cmd += ", ";
6251 cmd += fSql->QuoteString(column_name);
6252 cmd += ", ";
6253 cmd += fSql->QuoteString(column_type);
6254 cmd += ", ";
6255 char buf[256];
6256 sprintf(buf, "%.0f", (double)timestamp);
6257 cmd += buf;
6258 cmd += ", ";
6259 if (active)
6260 cmd += fSql->QuoteString("1");
6261 else
6262 cmd += fSql->QuoteString("0");
6263 cmd += ");";
6264
6265 int status = fSql->Exec(table_name, cmd.c_str());
6266 if (status != DB_SUCCESS)
6267 return HS_FILE_ERROR;
6268
6269 return HS_SUCCESS;
6270}
6271
6272#endif // HAVE_PGSQL
6273
6275// File history class //
6277
6278const time_t kDay = 24*60*60;
6279const time_t kMonth = 30*kDay;
6280
6281const double KiB = 1024;
6282const double MiB = KiB*KiB;
6283//const double GiB = KiB*MiB;
6284
6286{
6287protected:
6288 std::string fPath;
6290 std::vector<std::string> fSortedFiles;
6291 std::vector<bool> fSortedRead;
6294
6295public:
6296 FileHistory() // ctor
6297 {
6299 fConfMaxFileSize = 100*MiB;
6300
6301 fPathLastMtime = 0;
6302 }
6303
6304 int hs_connect(const char* connect_string);
6305 int hs_disconnect();
6306 int hs_clear_cache();
6307 int read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp);
6308 HsSchema* new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[]);
6309
6310protected:
6311 int create_file(const char* event_name, time_t timestamp, int ntags, const TAG tags[], std::string* filenamep);
6312 HsFileSchema* read_file_schema(const char* filename);
6313 int read_file_list(bool *pchanged);
6314 void clear_file_list();
6315};
6316
6318{
6319 if (fDebug)
6320 printf("hs_connect [%s]!\n", connect_string);
6321
6322 hs_disconnect();
6323
6326
6327 // add trailing '/'
6328 if (fPath.length() > 0) {
6329 if (fPath[fPath.length()-1] != DIR_SEPARATOR)
6331 }
6332
6333 return HS_SUCCESS;
6334}
6335
6337{
6338 if (fDebug)
6339 printf("FileHistory::hs_clear_cache!\n");
6340 fPathLastMtime = 0;
6342}
6343
6345{
6346 if (fDebug)
6347 printf("FileHistory::hs_disconnect!\n");
6348
6351
6352 return HS_SUCCESS;
6353}
6354
6356{
6357 fPathLastMtime = 0;
6358 fSortedFiles.clear();
6359 fSortedRead.clear();
6360}
6361
6363{
6364 int status;
6365 double start_time = ss_time_sec();
6366
6367 if (pchanged)
6368 *pchanged = false;
6369
6370 struct stat stat_buf;
6371 status = stat(fPath.c_str(), &stat_buf);
6372 if (status != 0) {
6373 cm_msg(MERROR, "FileHistory::read_file_list", "Cannot stat(%s), errno %d (%s)", fPath.c_str(), errno, strerror(errno));
6374 return HS_FILE_ERROR;
6375 }
6376
6377 //printf("dir [%s], mtime: %d %d last: %d %d, mtime %s", fPath.c_str(), stat_buf.st_mtimespec.tv_sec, stat_buf.st_mtimespec.tv_nsec, last_mtimespec.tv_sec, last_mtimespec.tv_nsec, ctime(&stat_buf.st_mtimespec.tv_sec));
6378
6379 if (stat_buf.st_mtime == fPathLastMtime) {
6380 if (fDebug)
6381 printf("FileHistory::read_file_list: history directory \"%s\" mtime %d did not change\n", fPath.c_str(), int(stat_buf.st_mtime));
6382 return HS_SUCCESS;
6383 }
6384
6385 fPathLastMtime = stat_buf.st_mtime;
6386
6387 if (fDebug)
6388 printf("FileHistory::read_file_list: reading list of history files in \"%s\"\n", fPath.c_str());
6389
6390 std::vector<std::string> flist;
6391
6392 ss_file_find(fPath.c_str(), "mhf_*.dat", &flist);
6393
6394 double ls_time = ss_time_sec();
6395 double ls_elapsed = ls_time - start_time;
6396 if (ls_elapsed > 5.000) {
6397 cm_msg(MINFO, "FileHistory::read_file_list", "\"ls -l\" of \"%s\" took %.1f sec", fPath.c_str(), ls_elapsed);
6399 }
6400
6401 // note: reverse iterator is used to sort filenames by time, newest first
6402 std::sort(flist.rbegin(), flist.rend());
6403
6404#if 0
6405 {
6406 printf("file names sorted by time:\n");
6407 for (unsigned i=0; i<flist.size(); i++) {
6408 printf("%d: %s\n", i, flist[i].c_str());
6409 }
6410 }
6411#endif
6412
6413 std::vector<bool> fread;
6414 fread.resize(flist.size()); // fill with "false"
6415
6416 // loop over the old list of files,
6417 // for files we already read, loop over new file
6418 // list and mark the same file as read. K.O.
6419 for (size_t j=0; j<fSortedFiles.size(); j++) {
6420 if (fSortedRead[j]) {
6421 for (size_t i=0; i<flist.size(); i++) {
6422 if (flist[i] == fSortedFiles[j]) {
6423 fread[i] = true;
6424 break;
6425 }
6426 }
6427 }
6428 }
6429
6432
6433 if (pchanged)
6434 *pchanged = true;
6435
6436 return HS_SUCCESS;
6437}
6438
6439int FileHistory::read_schema(HsSchemaVector* sv, const char* event_name, const time_t timestamp)
6440{
6441 if (fDebug)
6442 printf("FileHistory::read_schema: event [%s] at time %s\n", event_name, TimeToString(timestamp).c_str());
6443
6444 if (fSchema.size() == 0) {
6445 if (fDebug)
6446 printf("FileHistory::read_schema: schema is empty, do a full reload from disk\n");
6448 }
6449
6451 DWORD old_timeout = 0;
6454
6455 bool changed = false;
6456
6458
6459 if (status != HS_SUCCESS) {
6461 return status;
6462 }
6463
6464 if (!changed) {
6465 if ((*sv).find_event(event_name, timestamp)) {
6466 if (fDebug)
6467 printf("FileHistory::read_schema: event [%s] at time %s, no new history files, already have this schema\n", event_name, TimeToString(timestamp).c_str());
6469 return HS_SUCCESS;
6470 }
6471 }
6472
6473 double start_time = ss_time_sec();
6474
6475 int count_read = 0;
6476
6477 for (unsigned i=0; i<fSortedFiles.size(); i++) {
6478 std::string file_name = fPath + fSortedFiles[i];
6479 if (fSortedRead[i])
6480 continue;
6481 //bool dupe = false;
6482 //for (unsigned ss=0; ss<sv->size(); ss++) {
6483 // HsFileSchema* ssp = (HsFileSchema*)(*sv)[ss];
6484 // if (file_name == ssp->file_name) {
6485 // dupe = true;
6486 // break;
6487 // }
6488 //}
6489 //if (dupe)
6490 // continue;
6491 fSortedRead[i] = true;
6493 if (!s)
6494 continue;
6495 sv->add(s);
6496 count_read++;
6497
6498 if (event_name) {
6499 if (s->event_name == event_name) {
6500 //printf("file %s event_name %s time %s, age %f\n", file_name.c_str(), s->event_name.c_str(), TimeToString(s->time_from).c_str(), double(timestamp - s->time_from));
6501 if (s->time_from <= timestamp) {
6502 // this file is older than the time requested,
6503 // subsequent files will be even older,
6504 // we can stop reading here.
6505 break;
6506 }
6507 }
6508 }
6509 }
6510
6511 double end_time = ss_time_sec();
6512 double read_elapsed = end_time - start_time;
6513 if (read_elapsed > 5.000) {
6514 cm_msg(MINFO, "FileHistory::read_schema", "Loading schema for event \"%s\" timestamp %s, reading %d history files took %.1f sec", event_name, TimeToString(timestamp).c_str(), count_read, read_elapsed);
6516 }
6517
6519
6520 return HS_SUCCESS;
6521}
6522
6523HsSchema* FileHistory::new_event(const char* event_name, time_t timestamp, int ntags, const TAG tags[])
6524{
6525 if (fDebug)
6526 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d\n", event_name, TimeToString(timestamp).c_str(), ntags);
6527
6528 int status;
6529
6530 HsFileSchema* s = (HsFileSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
6531
6532 if (!s) {
6533 //printf("hs_define_event: no schema for event %s\n", event_name);
6534 status = read_schema(&fWriterCurrentSchema, event_name, timestamp);
6535 if (status != HS_SUCCESS)
6536 return NULL;
6537 s = (HsFileSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
6538 } else {
6539 //printf("hs_define_event: already have schema for event %s\n", s->event_name.c_str());
6540 }
6541
6542 bool xdebug = false;
6543
6544 if (s) { // is existing schema the same as new schema?
6545 bool same = true;
6546
6547 if (same)
6548 if (s->event_name != event_name) {
6549 if (xdebug)
6550 printf("AAA: [%s] [%s]!\n", s->event_name.c_str(), event_name);
6551 same = false;
6552 }
6553
6554 if (same)
6555 if (s->variables.size() != (unsigned)ntags) {
6556 if (xdebug)
6557 printf("BBB: event [%s]: ntags: %d -> %d!\n", event_name, (int)s->variables.size(), ntags);
6558 same = false;
6559 }
6560
6561 if (same)
6562 for (unsigned i=0; i<s->variables.size(); i++) {
6563 if (s->variables[i].name != tags[i].name) {
6564 if (xdebug)
6565 printf("CCC: event [%s] index %d: name [%s] -> [%s]!\n", event_name, i, s->variables[i].name.c_str(), tags[i].name);
6566 same = false;
6567 }
6568 if (s->variables[i].type != (int)tags[i].type) {
6569 if (xdebug)
6570 printf("DDD: event [%s] index %d: type %d -> %d!\n", event_name, i, s->variables[i].type, tags[i].type);
6571 same = false;
6572 }
6573 if (s->variables[i].n_data != (int)tags[i].n_data) {
6574 if (xdebug)
6575 printf("EEE: event [%s] index %d: n_data %d -> %d!\n", event_name, i, s->variables[i].n_data, tags[i].n_data);
6576 same = false;
6577 }
6578 if (!same)
6579 break;
6580 }
6581
6582 if (!same) {
6583 if (xdebug) {
6584 printf("*** Schema for event %s has changed!\n", event_name);
6585
6586 printf("*** Old schema for event [%s] time %s:\n", event_name, TimeToString(timestamp).c_str());
6587 s->print();
6588 printf("*** New tags:\n");
6589 PrintTags(ntags, tags);
6590 }
6591
6592 if (fDebug)
6593 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d: schema mismatch, starting a new file.\n", event_name, TimeToString(timestamp).c_str(), ntags);
6594
6595 s = NULL;
6596 }
6597 }
6598
6599 if (s) {
6600 // maybe this schema is too old - rotate files every so often
6601 time_t age = timestamp - s->time_from;
6602 //printf("*** age %s (%.1f months), timestamp %s, time_from %s, file %s\n", TimeToString(age).c_str(), (double)age/(double)kMonth, TimeToString(timestamp).c_str(), TimeToString(s->time_from).c_str(), s->file_name.c_str());
6603 if (age > fConfMaxFileAge) {
6604 if (fDebug)
6605 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d: schema is too old, age %.1f months, starting a new file.\n", event_name, TimeToString(timestamp).c_str(), ntags, (double)age/(double)kMonth);
6606
6607 // force creation of a new file
6608 s = NULL;
6609 }
6610 }
6611
6612 if (s) {
6613 // maybe this file is too big - rotate files to limit maximum size
6614 double size = ss_file_size(s->file_name.c_str());
6615 //printf("*** size %.0f, file %s\n", size, s->file_name.c_str());
6616 if (size > fConfMaxFileSize) {
6617 if (fDebug)
6618 printf("FileHistory::new_event: event [%s], timestamp %s, ntags %d: file too big, size %.1f MiBytes, starting a new file.\n", event_name, TimeToString(timestamp).c_str(), ntags, size/MiB);
6619
6620 // force creation of a new file
6621 s = NULL;
6622 }
6623 }
6624
6625 if (!s) {
6626 std::string filename;
6627
6628 status = create_file(event_name, timestamp, ntags, tags, &filename);
6629 if (status != HS_SUCCESS)
6630 return NULL;
6631
6632 HsFileSchema* ss = read_file_schema(filename.c_str());
6633 if (!ss) {
6634 cm_msg(MERROR, "FileHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
6635 return NULL;
6636 }
6637
6639
6640 s = (HsFileSchema*)fWriterCurrentSchema.find_event(event_name, timestamp);
6641
6642 if (!s) {
6643 cm_msg(MERROR, "FileHistory::new_event", "Error: Cannot create schema for event \'%s\', see previous messages", event_name);
6644 return NULL;
6645 }
6646
6647 if (xdebug) {
6648 printf("*** New schema for event [%s] time %s:\n", event_name, TimeToString(timestamp).c_str());
6649 s->print();
6650 }
6651 }
6652
6653 assert(s != NULL);
6654
6655#if 0
6656 {
6657 printf("schema for [%s] is %p\n", event_name, s);
6658 if (s)
6659 s->print();
6660 }
6661#endif
6662
6663 HsFileSchema* e = new HsFileSchema();
6664
6665 *e = *s; // make a copy of the schema
6666
6667 return e;
6668}
6669
6670int FileHistory::create_file(const char* event_name, time_t timestamp, int ntags, const TAG tags[], std::string* filenamep)
6671{
6672 if (fDebug)
6673 printf("FileHistory::create_file: event [%s]\n", event_name);
6674
6675 // NB: file names are constructed in such a way
6676 // that when sorted lexicographically ("ls -1 | sort")
6677 // they *also* become sorted by time
6678
6679 struct tm tm;
6680 localtime_r(&timestamp, &tm); // somebody must call tzset() before this.
6681
6682 char buf[256];
6683 strftime(buf, sizeof(buf), "%Y%m%d", &tm);
6684
6685 std::string filename;
6686 filename += fPath;
6687 filename += "mhf_";
6688 filename += TimeToString(timestamp);
6689 filename += "_";
6690 filename += buf;
6691 filename += "_";
6692 filename += MidasNameToFileName(event_name);
6693
6694 std::string try_filename = filename + ".dat";
6695
6696 FILE *fp = NULL;
6697 for (int itry=0; itry<10; itry++) {
6698 if (itry > 0) {
6699 char s[256];
6700 sprintf(s, "_%d", rand());
6701 try_filename = filename + s + ".dat";
6702 }
6703
6704 fp = fopen(try_filename.c_str(), "r");
6705 if (fp != NULL) {
6706 // this file already exists, try with a different name
6707 fclose(fp);
6708 continue;
6709 }
6710
6711 fp = fopen(try_filename.c_str(), "w");
6712 if (fp == NULL) {
6713 // somehow cannot create this file, try again
6714 cm_msg(MERROR, "FileHistory::create_file", "Error: Cannot create file \'%s\' for event \'%s\', fopen() errno %d (%s)", try_filename.c_str(), event_name, errno, strerror(errno));
6715 continue;
6716 }
6717
6718 // file opened
6719 break;
6720 }
6721
6722 if (fp == NULL) {
6723 // somehow cannot create any file, whine!
6724 cm_msg(MERROR, "FileHistory::create_file", "Error: Cannot create file \'%s\' for event \'%s\'", filename.c_str(), event_name);
6725 return HS_FILE_ERROR;
6726 }
6727
6728 std::string ss;
6729
6730 ss += "version: 2.0\n";
6731 ss += "event_name: ";
6732 ss += event_name;
6733 ss += "\n";
6734 ss += "time: ";
6735 ss += TimeToString(timestamp);
6736 ss += "\n";
6737
6738 int recsize = 0;
6739
6740 ss += "tag: /DWORD 1 4 /timestamp\n";
6741 recsize += 4;
6742
6743 bool padded = false;
6744 int offset = 0;
6745
6746 bool xdebug = false; // (strcmp(event_name, "u_Beam") == 0);
6747
6748 for (int i=0; i<ntags; i++) {
6749 int tsize = rpc_tid_size(tags[i].type);
6750 int n_bytes = tags[i].n_data*tsize;
6751 int xalign = (offset % tsize);
6752
6753 if (xdebug)
6754 printf("tag %d, tsize %d, n_bytes %d, xalign %d\n", i, tsize, n_bytes, xalign);
6755
6756#if 0
6757 // looks like history data does not do alignement and padding
6758 if (xalign != 0) {
6759 padded = true;
6760 int pad_bytes = tsize - xalign;
6761 assert(pad_bytes > 0);
6762
6763 ss += "tag: ";
6764 ss += "XPAD";
6765 ss += " ";
6766 ss += SmallIntToString(1);
6767 ss += " ";
6769 ss += " ";
6770 ss += "pad_";
6771 ss += SmallIntToString(i);
6772 ss += "\n";
6773
6774 offset += pad_bytes;
6775 recsize += pad_bytes;
6776
6777 assert((offset % tsize) == 0);
6778 fprintf(stderr, "FIXME: need to debug padding!\n");
6779 //abort();
6780 }
6781#endif
6782
6783 ss += "tag: ";
6784 ss += rpc_tid_name(tags[i].type);
6785 ss += " ";
6786 ss += SmallIntToString(tags[i].n_data);
6787 ss += " ";
6788 ss += SmallIntToString(n_bytes);
6789 ss += " ";
6790 ss += tags[i].name;
6791 ss += "\n";
6792
6793 recsize += n_bytes;
6794 offset += n_bytes;
6795 }
6796
6797 ss += "record_size: ";
6799 ss += "\n";
6800
6801 // reserve space for "data_offset: ..."
6802 int sslength = ss.length() + 127;
6803
6804 int block = 1024;
6805 int nb = (sslength + block - 1)/block;
6806 int data_offset = block * nb;
6807
6808 ss += "data_offset: ";
6809 ss += SmallIntToString(data_offset);
6810 ss += "\n";
6811
6812 fprintf(fp, "%s", ss.c_str());
6813
6814 fclose(fp);
6815
6816 if (1 && padded) {
6817 printf("Schema in file %s has padding:\n", try_filename.c_str());
6818 printf("%s", ss.c_str());
6819 }
6820
6821 if (filenamep)
6823
6824 return HS_SUCCESS;
6825}
6826
6828{
6829 if (fDebug)
6830 printf("FileHistory::read_file_schema: file %s\n", filename);
6831
6832 FILE* fp = fopen(filename, "r");
6833 if (!fp) {
6834 cm_msg(MERROR, "FileHistory::read_file_schema", "Cannot read \'%s\', fopen() errno %d (%s)", filename, errno, strerror(errno));
6835 return NULL;
6836 }
6837
6838 HsFileSchema* s = NULL;
6839
6840 // File format looks like this:
6841 // version: 2.0
6842 // event_name: u_Beam
6843 // time: 1023174012
6844 // tag: /DWORD 1 4 /timestamp
6845 // tag: FLOAT 1 4 B1
6846 // ...
6847 // tag: FLOAT 1 4 Ref Heater
6848 // record_size: 84
6849 // data_offset: 1024
6850
6851 int rd_recsize = 0;
6852 int offset = 0;
6853
6854 while (1) {
6855 char buf[1024];
6856 char* b = fgets(buf, sizeof(buf), fp);
6857
6858 //printf("read: %s\n", b);
6859
6860 if (!b) {
6861 break; // end of file
6862 }
6863
6864 char*bb;
6865
6866 bb = strchr(b, '\n');
6867 if (bb)
6868 *bb = 0;
6869
6870 bb = strchr(b, '\r');
6871 if (bb)
6872 *bb = 0;
6873
6874 bb = strstr(b, "version: 2.0");
6875 if (bb == b) {
6876 s = new HsFileSchema();
6877 assert(s);
6878
6879 s->file_name = filename;
6880 continue;
6881 }
6882
6883 if (!s) {
6884 // malformed history file
6885 break;
6886 }
6887
6888 bb = strstr(b, "event_name: ");
6889 if (bb == b) {
6890 s->event_name = bb + 12;
6891 continue;
6892 }
6893
6894 bb = strstr(b, "time: ");
6895 if (bb == b) {
6896 s->time_from = strtoul(bb + 6, NULL, 10);
6897 continue;
6898 }
6899
6900 // tag format is like this:
6901 //
6902 // tag: FLOAT 1 4 Ref Heater
6903 //
6904 // "FLOAT" is the MIDAS type, "/DWORD" is special tag for the timestamp
6905 // "1" is the number of array elements
6906 // "4" is the total tag size in bytes (n_data*tid_size)
6907 // "Ref Heater" is the tag name
6908
6909 bb = strstr(b, "tag: ");
6910 if (bb == b) {
6911 bb += 5; // now points to the tag MIDAS type
6912 const char* midas_type = bb;
6913 char* bbb = strchr(bb, ' ');
6914 if (bbb) {
6915 *bbb = 0;
6916 HsSchemaEntry t;
6917 if (midas_type[0] == '/') {
6918 t.type = 0;
6919 } else {
6921 if (t.type == 0) {
6922 cm_msg(MERROR, "FileHistory::read_file_schema", "Unknown MIDAS data type \'%s\' in history file \'%s\'", midas_type, filename);
6923 if (s)
6924 delete s;
6925 s = NULL;
6926 break;
6927 }
6928 }
6929 bbb++;
6930 while (*bbb == ' ')
6931 bbb++;
6932 if (*bbb) {
6933 t.n_data = strtoul(bbb, &bbb, 10);
6934 while (*bbb == ' ')
6935 bbb++;
6936 if (*bbb) {
6937 t.n_bytes = strtoul(bbb, &bbb, 10);
6938 while (*bbb == ' ')
6939 bbb++;
6940 t.name = bbb;
6941 }
6942 }
6943
6944 if (midas_type[0] != '/') {
6945 s->variables.push_back(t);
6946 s->offsets.push_back(offset);
6947 offset += t.n_bytes;
6948 }
6949
6950 rd_recsize += t.n_bytes;
6951 }
6952 continue;
6953 }
6954
6955 bb = strstr(b, "record_size: ");
6956 if (bb == b) {
6957 s->record_size = atoi(bb + 12);
6958 continue;
6959 }
6960
6961 bb = strstr(b, "data_offset: ");
6962 if (bb == b) {
6963 s->data_offset = atoi(bb + 12);
6964 // data offset is the last entry in the file
6965 break;
6966 }
6967 }
6968
6969 fclose(fp);
6970
6971 if (!s) {
6972 cm_msg(MERROR, "FileHistory::read_file_schema", "Malformed history schema in \'%s\', maybe it is not a history file", filename);
6973 return NULL;
6974 }
6975
6976 if (rd_recsize != s->record_size) {
6977 cm_msg(MERROR, "FileHistory::read_file_schema", "Record size mismatch in history schema from \'%s\', file says %d while total of all tags is %d", filename, s->record_size, rd_recsize);
6978 if (s)
6979 delete s;
6980 return NULL;
6981 }
6982
6983 if (!s) {
6984 cm_msg(MERROR, "FileHistory::read_file_schema", "Could not read history schema from \'%s\', maybe it is not a history file", filename);
6985 if (s)
6986 delete s;
6987 return NULL;
6988 }
6989
6990 if (fDebug > 1)
6991 s->print();
6992
6993 return s;
6994}
6995
6997// Factory constructors //
6999
7001{
7002#ifdef HAVE_SQLITE
7003 return new SqliteHistory();
7004#else
7005 cm_msg(MERROR, "MakeMidasHistorySqlite", "Error: Cannot initialize SQLITE history - this MIDAS was built without SQLITE support - HAVE_SQLITE is not defined");
7006 return NULL;
7007#endif
7008}
7009
7011{
7012#ifdef HAVE_MYSQL
7013 return new MysqlHistory();
7014#else
7015 cm_msg(MERROR, "MakeMidasHistoryMysql", "Error: Cannot initialize MySQL history - this MIDAS was built without MySQL support - HAVE_MYSQL is not defined");
7016 return NULL;
7017#endif
7018}
7019
7021{
7022#ifdef HAVE_PGSQL
7023 return new PgsqlHistory();
7024#else
7025 cm_msg(MERROR, "MakeMidasHistoryPgsql", "Error: Cannot initialize PgSQL history - this MIDAS was built without PostgreSQL support - HAVE_PGSQL is not defined");
7026 return NULL;
7027#endif
7028}
7029
7034
7035/* emacs
7036 * Local Variables:
7037 * tab-width: 8
7038 * c-basic-offset: 3
7039 * indent-tabs-mode: nil
7040 * End:
7041 */
#define FALSE
Definition cfortran.h:309
std::string fPath
HsFileSchema * read_file_schema(const char *filename)
std::vector< std::string > fSortedFiles
HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
std::vector< bool > fSortedRead
int hs_connect(const char *connect_string)
returns HS_SUCCESS
int hs_clear_cache()
clear internal cache, returns HS_SUCCESS
int create_file(const char *event_name, time_t timestamp, int ntags, const TAG tags[], std::string *filenamep)
int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)
int read_file_list(bool *pchanged)
int hs_disconnect()
disconnect from history, returns HS_SUCCESS
unsigned size() const
void print(bool print_tags=true) const
std::vector< HsSchema * > data
HsSchema * find_event(const char *event_name, const time_t timestamp, int debug=0)
void add(HsSchema *s)
HsSchema * operator[](int index) const
MidasHistoryBinnedBuffer(time_t first_time, time_t last_time, int num_bins)
void Add(time_t t, double v)
virtual void Add(time_t time, double value)=0
char type[NAME_LENGTH]
history channel name
Definition history.h:112
char name[NAME_LENGTH]
Definition history.h:111
int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)
int read_table_and_event_names(HsSchemaVector *sv)
int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)
int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)
void Add(time_t t, double v)
ReadBuffer(time_t first_time, time_t last_time, time_t interval)
HsSchemaVector fSchema
std::vector< HsSchema * > fEvents
int hs_get_tags(const char *event_name, time_t t, std::vector< TAG > *ptags)
get list of history variables for given event (use event names returned by hs_get_events()) that exis...
virtual int hs_connect(const char *connect_string)=0
returns HS_SUCCESS
HsSchemaVector fWriterCurrentSchema
int hs_write_event(const char *event_name, time_t timestamp, int buffer_size, const char *buffer)
see hs_write_event(), returns HS_SUCCESS or HS_FILE_ERROR
int hs_read_buffer(time_t start_time, time_t end_time, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], MidasHistoryBufferInterface *buffer[], int hs_status[])
returns HS_SUCCESS
int hs_define_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
see hs_define_event(), returns HS_SUCCESS or HS_FILE_ERROR
int hs_read_binned(time_t start_time, time_t end_time, int num_bins, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], int num_entries[], int *count_bins[], double *mean_bins[], double *rms_bins[], double *min_bins[], double *max_bins[], time_t *bins_first_time[], double *bins_first_value[], time_t *bins_last_time[], double *bins_last_value[], time_t last_time[], double last_value[], int st[])
returns HS_SUCCESS
virtual HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])=0
int hs_get_events(time_t t, std::vector< std::string > *pevents)
get list of events that exist(ed) at given time and later (value 0 means "return all events from begi...
int hs_get_last_written(time_t timestamp, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], time_t last_written[])
virtual int hs_set_debug(int debug)
set debug level, returns previous debug level
virtual int hs_disconnect()=0
disconnect from history, returns HS_SUCCESS
int hs_read(time_t start_time, time_t end_time, time_t interval, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], int num_entries[], time_t *time_buffer[], double *data_buffer[], int st[])
see hs_read(), returns HS_SUCCESS
int hs_clear_cache()
clear internal cache, returns HS_SUCCESS
int hs_flush_buffers()
flush buffered data to storage where it is visible to mhttpd
virtual int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)=0
virtual int ListColumns(const char *table_name, std::vector< std::string > *plist)=0
virtual int Finalize()=0
virtual int Connect(const char *path)=0
virtual int ListColumns(const char *table, std::vector< std::string > *plist)=0
virtual double GetDouble(int column)=0
virtual int RollbackTransaction(const char *table_name)=0
virtual bool IsConnected()=0
virtual int CommitTransaction(const char *table_name)=0
virtual ~SqlBase()
virtual int ListTables(std::vector< std::string > *plist)=0
virtual std::string QuoteId(const char *s)=0
virtual int Disconnect()=0
virtual bool TypesCompatible(int midas_tid, const char *sql_type)=0
virtual int Prepare(const char *table_name, const char *sql)=0
virtual int Exec(const char *sql)=0
virtual int Connect(const char *dsn=0)=0
virtual int Exec(const char *table_name, const char *sql)=0
virtual std::string QuoteString(const char *s)=0
bool fTransactionPerTable
virtual time_t GetTime(int column)=0
virtual int Step()=0
virtual const char * GetText(int column)=0
virtual int ExecDisconnected(const char *table_name, const char *sql)=0
virtual int OpenTransaction(const char *table_name)=0
virtual const char * ColumnType(int midas_tid)=0
int update_schema1(HsSqlSchema *s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable, bool *have_transaction)
int hs_disconnect()
disconnect from history, returns HS_SUCCESS
int hs_set_debug(int debug)
set debug level, returns previous debug level
int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)
virtual ~SqlHistoryBase()
virtual int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)=0
virtual int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)=0
virtual int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)=0
virtual int read_table_and_event_names(HsSchemaVector *sv)=0
int hs_connect(const char *connect_string)
returns HS_SUCCESS
HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
int update_schema(HsSqlSchema *s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable)
int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)
int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)
int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)
int read_table_and_event_names(HsSchemaVector *sv)
INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout)
Definition midas.cxx:3317
INT cm_set_watchdog_params(BOOL call_watchdog, DWORD timeout)
Definition midas.cxx:3283
#define DB_KEY_EXIST
Definition midas.h:641
#define DB_FILE_ERROR
Definition midas.h:647
#define DB_SUCCESS
Definition midas.h:631
#define DB_NO_MORE_SUBKEYS
Definition midas.h:646
#define HS_UNDEFINED_VAR
Definition midas.h:733
#define HS_SUCCESS
Definition midas.h:727
#define HS_FILE_ERROR
Definition midas.h:728
#define HS_UNDEFINED_EVENT
Definition midas.h:732
unsigned int DWORD
Definition mcstd.h:51
#define TID_DOUBLE
Definition midas.h:343
#define TID_SBYTE
Definition midas.h:329
#define TID_BOOL
Definition midas.h:340
#define TID_SHORT
Definition midas.h:334
#define TID_WORD
Definition midas.h:332
#define MINFO
Definition midas.h:560
#define TID_BYTE
Definition midas.h:327
#define TID_STRING
Definition midas.h:346
#define MERROR
Definition midas.h:559
#define TID_CHAR
Definition midas.h:331
#define TID_INT
Definition midas.h:338
#define TID_FLOAT
Definition midas.h:341
#define TID_LAST
Definition midas.h:354
#define TID_DWORD
Definition midas.h:336
double ss_file_size(const char *path)
Definition system.cxx:6972
double ss_time_sec()
Definition system.cxx:3467
INT ss_file_find(const char *path, const char *pattern, char **plist)
Definition system.cxx:6713
INT cm_msg_flush_buffer()
Definition midas.cxx:865
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
Definition midas.cxx:915
const char * rpc_tid_name(INT id)
Definition midas.cxx:11764
int rpc_name_tid(const char *name)
Definition midas.cxx:11778
INT rpc_tid_size(INT id)
Definition midas.cxx:11757
static std::string q(const char *s)
static const int tid_size[]
static const char * sql_type_mysql[]
static const char ** sql_type
static std::string MidasNameToSqlName(const char *s)
static int ReadRecord(const char *file_name, int fd, int offset, int recsize, int irec, char *rec)
const time_t kMonth
static int FindTime(const char *file_name, int fd, int offset, int recsize, int nrec, time_t timestamp, int *i1p, time_t *t1p, int *i2p, time_t *t2p, time_t *tstart, time_t *tend, int debug)
const double KiB
static int CreateSqlColumn(SqlBase *sql, const char *table_name, const char *column_name, const char *column_type, bool *have_transaction, int debug)
static int ReadSqliteTableNames(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug)
void DoctorPgsqlColumnType(std::string *col_type, const char *index_type)
static int CreateSqlHyperTable(SqlBase *sql, const char *table_name, bool *have_transaction)
void DoctorSqlColumnType(std::string *col_type, const char *index_type)
static bool MatchTagName(const char *tag_name, int n_data, const char *var_tag_name, const int var_tag_index)
const double MiB
static int var_name_cmp(const std::string &v1, const char *v2)
static std::string TimeToString(time_t t)
MidasHistoryInterface * MakeMidasHistorySqlite()
static void PrintTags(int ntags, const TAG tags[])
static char * skip_spaces(char *s)
static bool MatchEventName(const char *event_name, const char *var_event_name)
static HsSqlSchema * NewSqlSchema(HsSchemaVector *sv, const char *table_name, time_t t)
static int StartSqlTransaction(SqlBase *sql, const char *table_name, bool *have_transaction)
const time_t kDay
MidasHistoryInterface * MakeMidasHistoryMysql()
static int ReadSqliteTableSchema(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug)
static int event_name_cmp(const std::string &e1, const char *e2)
MidasHistoryInterface * MakeMidasHistoryPgsql()
MidasHistoryInterface * MakeMidasHistoryFile()
static int ReadMysqlTableNames(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug, const char *must_have_event_name, const char *must_have_table_name)
static std::string SmallIntToString(int i)
static int CreateSqlTable(SqlBase *sql, const char *table_name, bool *have_transaction, bool set_default_timestamp=false)
static std::string MidasNameToFileName(const char *s)
INT index
Definition mana.cxx:271
DWORD last_time
Definition mana.cxx:3070
void * data
Definition mana.cxx:268
BOOL debug
debug printouts
Definition mana.cxx:254
INT type
Definition mana.cxx:269
char host_name[HOST_NAME_LENGTH]
Definition mana.cxx:242
double count
Definition mdump.cxx:33
INT i
Definition mdump.cxx:32
static int offset
Definition mgd.cxx:1500
#define DIR_SEPARATOR
Definition midas.h:193
DWORD BOOL
Definition midas.h:105
#define DIR_SEPARATOR_STR
Definition midas.h:194
#define read(n, a, f)
#define write(n, a, f, d)
#define name(x)
Definition midas_macro.h:24
static FILE * fp
struct callback_addr callback
Definition mserver.cxx:22
MUTEX_T * tm
Definition odbedit.cxx:39
BOOL match(char *pat, char *str)
Definition odbedit.cxx:190
INT j
Definition odbhist.cxx:40
INT k
Definition odbhist.cxx:40
char str[256]
Definition odbhist.cxx:33
char file_name[256]
Definition odbhist.cxx:41
INT add
Definition odbhist.cxx:40
DWORD status
Definition odbhist.cxx:39
char var_name[256]
Definition odbhist.cxx:41
TH1X EXPRT * h1_book(const char *name, const char *title, int bins, double min, double max)
Definition rmidas.h:24
std::string file_name
int write_event(const time_t t, const char *data, const int data_size)
int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])
void print(bool print_tags=true) const
int read_last_written(const time_t timestamp, const int debug, time_t *last_written)
std::string tag_name
bool inactive
int type
std::string tag_type
HsSchemaEntry()
int n_data
std::string name
int n_bytes
virtual void print(bool print_tags=true) const
virtual ~HsSchema()
int count_write_undersize
std::vector< int > offsets
virtual int write_event(const time_t t, const char *data, const int data_size)=0
virtual int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])=0
virtual int read_last_written(const time_t timestamp, const int debug, time_t *last_written)=0
virtual int flush_buffers()=0
int count_write_oversize
std::string event_name
virtual int close()=0
std::vector< HsSchemaEntry > variables
virtual int match_event_var(const char *event_name, const char *var_name, const int var_index)
int get_transaction_count()
int read_last_written(const time_t timestamp, const int debug, time_t *last_written)
int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])
void print(bool print_tags=true) const
std::string table_name
void increment_transaction_count()
std::vector< std::string > column_types
std::vector< std::string > disconnected_buffer
static std::map< SqlBase *, int > global_transaction_count
int match_event_var(const char *event_name, const char *var_name, const int var_index)
int write_event(const time_t t, const char *data, const int data_size)
void reset_transaction_count()
std::vector< std::string > column_names
Definition midas.h:1234
DWORD type
Definition midas.h:1236
DWORD n_data
Definition midas.h:1237
char name[NAME_LENGTH]
Definition midas.h:1235
char c
Definition system.cxx:1310
@ DIR
Definition test_init.cxx:7
static double e(void)
Definition tinyexpr.c:136