53#define FREE(x) { if (x) free(x); (x) = NULL; }
67 const char* sign =
"";
83 char c =
'0' + (char)(tt%10);
106 char c =
'0' + (char)(
i%10);
120 bool newStyleEventName = (strchr(var_event_name,
'/')!=NULL);
124 if (strcasecmp(event_name, var_event_name) == 0) {
126 }
else if (newStyleEventName) {
131 const char* s = event_name;
132 for (
int j=0; s[
j];
j++) {
134 if ((var_event_name[
j]==0) && (s[
j]==
'/')) {
139 if ((var_event_name[
j]==0) && (s[
j]==
'_')) {
144 if (var_event_name[
j]==0) {
149 if (tolower(var_event_name[
j]) != tolower(s[
j])) {
159static bool MatchTagName(
const char* tag_name,
int n_data,
const char* var_tag_name,
const int var_tag_index)
161 char alt_tag_name[1024];
162 sprintf(alt_tag_name,
"%s[%d]", var_tag_name, var_tag_index);
166 if (strcasecmp(tag_name, var_tag_name) == 0)
167 if (var_tag_index >= 0 && var_tag_index < n_data)
170 if (strcasecmp(tag_name, alt_tag_name) == 0)
178 for (
int i=0;
i<ntags;
i++)
188 for (
int i=0; s[
i]!=0;
i++) {
190 if (isalpha(
c) || isdigit(
c))
205 for (
int i=0; s[
i]!=0;
i++) {
207 if (isalpha(
c) || isdigit(
c))
220 return strcasecmp(e1.c_str(), e2);
227 return strcasecmp(v1.c_str(), v2);
235static const char *sql_type_sqlite[
TID_LAST] = {
249 "xxxINVALIDxxxARRAY",
250 "xxxINVALIDxxxSTRUCT",
257static const char *sql_type_pgsql[
TID_LAST] = {
271 "xxxINVALIDxxxARRAY",
272 "xxxINVALIDxxxSTRUCT",
293 "xxxINVALIDxxxARRAY",
294 "xxxINVALIDxxxSTRUCT",
302 if (*col_type == index_type)
305 if (*col_type ==
"bigint" && strcmp(index_type,
"int8")==0) {
306 *col_type = index_type;
310 if (*col_type ==
"integer" && strcmp(index_type,
"int4")==0) {
311 *col_type = index_type;
315 if (*col_type ==
"smallint" && strcmp(index_type,
"int2")==0) {
316 *col_type = index_type;
320 cm_msg(
MERROR,
"SqlHistory",
"Cannot use this SQL database, incompatible column names: created column type [%s] is reported with column type [%s]", index_type, col_type->c_str());
327 if (*col_type == index_type)
330 if (*col_type ==
"int(10) unsigned" && strcmp(index_type,
"integer unsigned")==0) {
331 *col_type = index_type;
335 if (*col_type ==
"int(11)" && strcmp(index_type,
"integer")==0) {
336 *col_type = index_type;
340 if (*col_type ==
"integer" && strcmp(index_type,
"int(11)")==0) {
341 *col_type = index_type;
347 if (*col_type ==
"int" && strcmp(index_type,
"integer")==0) {
348 *col_type = index_type;
352 if (*col_type ==
"int unsigned" && strcmp(index_type,
"integer unsigned")==0) {
353 *col_type = index_type;
357 cm_msg(
MERROR,
"SqlHistory",
"Cannot use this SQL database, incompatible column names: created column type [%s] is reported with column type [%s]", index_type, col_type->c_str());
363static int sql2midasType_mysql(
const char*
name)
365 for (
int tid=0; tid<
TID_LAST; tid++)
369 printf(
"sql2midasType: Cannot convert SQL data type \'%s\' to a MIDAS data type!\n",
name);
375static int sql2midasType_sqlite(
const char*
name)
377 if (strcmp(
name,
"INTEGER") == 0)
379 if (strcmp(
name,
"REAL") == 0)
381 if (strcmp(
name,
"TEXT") == 0)
384 printf(
"sql2midasType: Cannot convert SQL data type \'%s\' to a MIDAS data type!\n",
name);
431 virtual void print(
bool print_tags =
true)
const;
435 virtual int write_event(
const time_t t,
const char*
data,
const size_t data_size) = 0;
439 time_t* last_written) = 0;
441 const time_t end_time,
442 const int num_var,
const std::vector<int>& var_schema_index,
const int var_index[],
469 for (
size_t i=0;
i<
fData.size();
i++)
477 void print(
bool print_tags =
true)
const {
478 for (
size_t i=0;
i<
fData.size();
i++)
511 for (
auto it =
fData.begin(); it !=
fData.end(); it++) {
537 for (
auto it =
fData.begin(); it !=
fData.end(); it++) {
540 time_to = (*it)->fTimeFrom;
552 printf(
"find_event: All schema for event %s: (total %zu)\n", event_name,
fData.size());
554 for (
size_t i=0;
i<
fData.size();
i++) {
556 printf(
"find_event: schema %zu name [%s]\n",
i, s->
fEventName.c_str());
563 printf(
"find_event: Found %d schemas for event %s\n", found, event_name);
569 for (
size_t i=0;
i<
fData.size();
i++) {
590 for (
size_t i=0;
i<
fData.size();
i++) {
612 printf(
"find_event: for time %s, returning:\n",
TimeToString(t).c_str());
615 printf(
"find_event: for time %s, nothing found:\n",
TimeToString(t).c_str());
649 virtual int ListColumns(
const char* table_name, std::vector<std::string> *plist) = 0;
652 virtual int Exec(
const char* table_name,
const char* sql) = 0;
656 virtual int Prepare(
const char* table_name,
const char* sql) = 0;
674 virtual std::string
QuoteId(
const char* s) = 0;
704 void print(
bool print_tags =
true)
const;
711 int write_event(
const time_t t,
const char*
data,
const size_t data_size);
715 time_t* last_written);
717 const time_t end_time,
718 const int num_var,
const std::vector<int>& var_schema_index,
const int var_index[],
773 void print(
bool print_tags =
true)
const;
776 int write_event(
const time_t t,
const char*
data,
const size_t data_size);
779 time_t* last_written);
781 const time_t end_time,
782 const int num_var,
const std::vector<int>& var_schema_index,
const int var_index[],
797 for (
size_t j=0;
j<nv;
j++)
798 printf(
" %zu: name [%s], type [%s] tid %d, n_data %d, n_bytes %d, offset %d\n",
j, this->
fVariables[
j].name.c_str(),
rpc_tid_name(this->
fVariables[
j].
type), this->
fVariables[
j].type, this->
fVariables[
j].n_data, this->
fVariables[
j].n_bytes, this->
fOffsets[
j]);
804 printf(
"event [%s], sql_table [%s], time %s..%s, %zu variables, %zu bytes\n", this->
fEventName.c_str(), this->fTableName.c_str(),
TimeToString(this->
fTimeFrom).c_str(),
TimeToString(this->
fTimeTo).c_str(), nv,
fNumBytes);
806 for (
size_t j=0;
j<nv;
j++) {
818 printf(
"event [%s], file_name [%s], time %s..%s, %zu variables, %zu bytes, dat_offset %jd, record_size %zu\n", this->
fEventName.c_str(), this->fFileName.c_str(),
TimeToString(this->
fTimeFrom).c_str(),
TimeToString(this->
fTimeTo).c_str(), nv,
fNumBytes, (intmax_t)
fDataOffset,
fRecordSize);
820 for (
size_t j=0;
j<nv;
j++)
821 printf(
" %zu: name [%s], type [%s] tid %d, n_data %d, n_bytes %d, offset %d\n",
j, this->
fVariables[
j].name.c_str(),
rpc_tid_name(this->
fVariables[
j].
type), this->
fVariables[
j].type, this->
fVariables[
j].n_data, this->
fVariables[
j].n_bytes, this->
fOffsets[
j]);
843 std::string fConnectString;
844 MYSQL* fMysql = NULL;
847 MYSQL_RES* fResult = NULL;
848 MYSQL_ROW fRow = NULL;
852 size_t fMaxDisconnected = 0;
853 std::list<std::string> fDisconnectedBuffer;
854 time_t fNextReconnect = 0;
855 int fNextReconnectDelaySec = 0;
856 int fDisconnectedLost = 0;
865 int ConnectTable(
const char* table_name);
867 int ListTables(std::vector<std::string> *plist);
868 int ListColumns(
const char* table_name, std::vector<std::string> *plist);
870 int Exec(
const char* table_name,
const char* sql);
873 int Prepare(
const char* table_name,
const char* sql);
875 const char*
GetText(
int column);
887 std::string
QuoteId(
const char* s);
897 fMaxDisconnected = 1000;
899 fNextReconnectDelaySec = 0;
900 fDisconnectedLost = 0;
901 fTransactionPerTable =
false;
911 if (fDisconnectedBuffer.size() > 0) {
912 cm_msg(
MINFO,
"Mysql::~Mysql",
"Lost %zu history entries accumulated while disconnected from the database", fDisconnectedBuffer.size());
917int Mysql::Connect(
const char* connect_string)
922 fConnectString = connect_string;
925 cm_msg(
MINFO,
"Mysql::Connect",
"Connecting to Mysql database specified by \'%s\'", connect_string);
930 std::string user_name;
931 std::string user_password;
934 std::string unix_socket;
937 FILE*
fp = fopen(connect_string,
"r");
939 cm_msg(
MERROR,
"Mysql::Connect",
"Cannot read MYSQL connection parameters from \'%s\', fopen() error %d (%s)", connect_string, errno, strerror(errno));
945 char* s = fgets(buf,
sizeof(buf),
fp);
951 ss = strchr(s,
'\n');
953 ss = strchr(s,
'\r');
958 if (strncasecmp(s,
"server=", 7)==0)
960 if (strncasecmp(s,
"port=", 5)==0)
962 if (strncasecmp(s,
"database=", 9)==0)
964 if (strncasecmp(s,
"socket=", 7)==0)
966 if (strncasecmp(s,
"user=", 5)==0)
968 if (strncasecmp(s,
"password=", 9)==0)
970 if (strncasecmp(s,
"buffer=", 7)==0)
976 int buffer_int = atoi(buffer.c_str());
978 if (buffer_int > 0 && buffer_int < 1000000)
979 fMaxDisconnected = buffer_int;
982 printf(
"Mysql::Connect: connecting to server [%s] port %d, unix socket [%s], database [%s], user [%s], password [%s], buffer [%zu]\n",
host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(), user_password.c_str(), fMaxDisconnected);
985 fMysql = mysql_init(NULL);
991 int client_flag = 0|CLIENT_IGNORE_SIGPIPE;
993 if (mysql_real_connect(fMysql,
host_name.c_str(), user_name.c_str(), user_password.c_str(), db_name.c_str(), tcp_port, unix_socket.c_str(), client_flag) == NULL) {
994 cm_msg(
MERROR,
"Mysql::Connect",
"mysql_real_connect() to host [%s], port %d, unix socket [%s], database [%s], user [%s], password [%s]: error %d (%s)",
host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(),
"xxx", mysql_errno(fMysql), mysql_error(fMysql));
1005 status = Exec(
"(notable)",
"SET SESSION sql_mode='ANSI'");
1007 cm_msg(
MERROR,
"Mysql::Connect",
"Cannot set ANSI mode, nothing will work");
1013 cm_msg(
MINFO,
"Mysql::Connect",
"Connected to a MySQL database on host [%s], port %d, unix socket [%s], database [%s], user [%s], password [%s], buffer %zu",
host_name.c_str(), tcp_port, unix_socket.c_str(), db_name.c_str(), user_name.c_str(),
"xxx", fMaxDisconnected);
1017 fIsConnected =
true;
1020 while (fDisconnectedBuffer.size() > 0) {
1021 status = Exec(
"(flush)", fDisconnectedBuffer.front().c_str());
1025 fDisconnectedBuffer.pop_front();
1030 cm_msg(
MINFO,
"Mysql::Connect",
"Saved %d, lost %d history events accumulated while disconnected from the database",
count, fDisconnectedLost);
1034 assert(fDisconnectedBuffer.size() == 0);
1035 fDisconnectedLost = 0;
1040int Mysql::Disconnect()
1047 mysql_free_result(fResult);
1050 mysql_close(fMysql);
1056 fIsConnected =
false;
1060bool Mysql::IsConnected()
1062 return fIsConnected;
1065int Mysql::OpenTransaction(
const char* table_name)
1067 return Exec(table_name,
"START TRANSACTION");
1071int Mysql::CommitTransaction(
const char* table_name)
1073 Exec(table_name,
"COMMIT");
1077int Mysql::RollbackTransaction(
const char* table_name)
1079 Exec(table_name,
"ROLLBACK");
1083int Mysql::ListTables(std::vector<std::string> *plist)
1089 printf(
"Mysql::ListTables!\n");
1093 fResult = mysql_list_tables(fMysql, NULL);
1095 if (fResult == NULL) {
1096 cm_msg(
MERROR,
"Mysql::ListTables",
"mysql_list_tables() error %d (%s)", mysql_errno(fMysql), mysql_error(fMysql));
1100 fNumFields = mysql_num_fields(fResult);
1106 std::string tn = GetText(0);
1107 plist->push_back(tn);
1115int Mysql::ListColumns(
const char* table_name, std::vector<std::string> *plist)
1121 printf(
"Mysql::ListColumns for table \'%s\'\n", table_name);
1126 cmd +=
"SHOW COLUMNS FROM ";
1127 cmd += QuoteId(table_name);
1130 status = Prepare(table_name, cmd.c_str());
1134 fNumFields = mysql_num_fields(fResult);
1140 std::string cn = GetText(0);
1141 std::string ct = GetText(1);
1142 plist->push_back(cn);
1143 plist->push_back(ct);
1154int Mysql::Exec(
const char* table_name,
const char* sql)
1157 printf(
"Mysql::Exec(%s, %s)\n", table_name, sql);
1169 assert(fResult == NULL);
1170 assert(fRow == NULL);
1172 if (mysql_query(fMysql, sql)) {
1173 if (mysql_errno(fMysql) == 1050) {
1176 if (mysql_errno(fMysql) == 1146) {
1179 cm_msg(
MERROR,
"Mysql::Exec",
"mysql_query(%s) error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1180 if (mysql_errno(fMysql) == 1060)
1182 if (mysql_errno(fMysql) == 2006) {
1184 return ExecDisconnected(table_name, sql);
1192int Mysql::ExecDisconnected(
const char* table_name,
const char* sql)
1195 printf(
"Mysql::ExecDisconnected(%s, %s)\n", table_name, sql);
1197 if (fDisconnectedBuffer.size() < fMaxDisconnected) {
1198 fDisconnectedBuffer.push_back(sql);
1199 if (fDisconnectedBuffer.size() >= fMaxDisconnected) {
1200 cm_msg(
MERROR,
"Mysql::ExecDisconnected",
"Error: Disconnected database buffer overflow, size %zu, subsequent events are lost", fDisconnectedBuffer.size());
1203 fDisconnectedLost++;
1206 time_t now = time(NULL);
1208 if (fNextReconnect == 0 || now >= fNextReconnect) {
1209 int status = Connect(fConnectString.c_str());
1212 fNextReconnectDelaySec = 0;
1214 if (fNextReconnectDelaySec == 0) {
1215 fNextReconnectDelaySec = 5;
1216 }
else if (fNextReconnectDelaySec < 10*60) {
1217 fNextReconnectDelaySec *= 2;
1220 cm_msg(
MINFO,
"Mysql::ExecDisconnected",
"Next reconnect attempt in %d sec, history events buffered %zu, lost %d", fNextReconnectDelaySec, fDisconnectedBuffer.size(), fDisconnectedLost);
1223 fNextReconnect = now + fNextReconnectDelaySec;
1230int Mysql::Prepare(
const char* table_name,
const char* sql)
1233 printf(
"Mysql::Prepare(%s, %s)\n", table_name, sql);
1239 assert(fResult == NULL);
1240 assert(fRow == NULL);
1248 int status = mysql_query(fMysql, sql);
1250 if (mysql_errno(fMysql) == 2006 || mysql_errno(fMysql) == 2013) {
1252 status = Connect(fConnectString.c_str());
1255 status = mysql_query(fMysql, sql);
1257 cm_msg(
MERROR,
"Mysql::Prepare",
"mysql_query(%s) - MySQL server has gone away, and couldn't reconnect - %d", sql,
status);
1262 cm_msg(
MERROR,
"Mysql::Prepare",
"mysql_query(%s) error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1265 cm_msg(
MINFO,
"Mysql::Prepare",
"Reconnected to MySQL after long inactivity.");
1268 fResult = mysql_store_result(fMysql);
1272 cm_msg(
MERROR,
"Mysql::Prepare",
"mysql_store_result(%s) returned NULL, error %d (%s)", sql, mysql_errno(fMysql), mysql_error(fMysql));
1276 fNumFields = mysql_num_fields(fResult);
1286 printf(
"Mysql::Step()\n");
1291 fRow = mysql_fetch_row(fResult);
1296 if (mysql_errno(fMysql) == 0)
1299 cm_msg(
MERROR,
"Mysql::Step",
"mysql_fetch_row() error %d (%s)", mysql_errno(fMysql), mysql_error(fMysql));
1304const char* Mysql::GetText(
int column)
1309 assert(fNumFields > 0);
1310 assert(column >= 0);
1311 assert(column < fNumFields);
1312 if (fRow[column] == NULL)
1314 return fRow[column];
1317double Mysql::GetDouble(
int column)
1319 return atof(GetText(column));
1322time_t Mysql::GetTime(
int column)
1324 return strtoul(GetText(column), NULL, 0);
1327int Mysql::Finalize()
1332 mysql_free_result(fResult);
1340const char* Mysql::ColumnType(
int midas_tid)
1342 assert(midas_tid>=0);
1347bool Mysql::TypesCompatible(
int midas_tid,
const char*
sql_type)
1350 printf(
"compare types midas \'%s\'=\'%s\' and sql \'%s\'\n",
rpc_tid_name(midas_tid), ColumnType(midas_tid),
sql_type);
1355 if (strcasecmp(ColumnType(midas_tid),
sql_type) == 0)
1377 printf(
"type mismatch!\n");
1382std::string Mysql::QuoteId(
const char* s)
1391std::string Mysql::QuoteString(
const char* s)
1397 while (
int c = *s++) {
1402 }
else if (isprint(
c)) {
1406 sprintf(buf,
"\\\\x%02x",
c&0xFF);
1425#include <libpq-fe.h>
1430 std::string fConnectString;
1431 int fDownsample = 0;
1432 PGconn* fPgsql = NULL;
1435 PGresult *fResult = NULL;
1440 size_t fMaxDisconnected = 0;
1441 std::list<std::string> fDisconnectedBuffer;
1442 time_t fNextReconnect = 0;
1443 int fNextReconnectDelaySec = 0;
1444 int fDisconnectedLost = 0;
1449 int Connect(
const char* path);
1453 int ConnectTable(
const char* table_name);
1455 int ListTables(std::vector<std::string> *plist);
1456 int ListColumns(
const char* table_name, std::vector<std::string> *plist);
1458 int Exec(
const char* table_name,
const char* sql);
1461 int Prepare(
const char* table_name,
const char* sql);
1462 std::string BuildDownsampleQuery(
const time_t start_time,
const time_t end_time,
const int npoints,
const char* table_name,
const char* column_name);
1464 const char*
GetText(
int column);
1476 std::string
QuoteId(
const char* s);
1487 fMaxDisconnected = 1000;
1489 fNextReconnectDelaySec = 0;
1490 fDisconnectedLost = 0;
1491 fTransactionPerTable =
false;
1501 if (fDisconnectedBuffer.size() > 0) {
1502 cm_msg(
MINFO,
"Pgsql::~Pgsql",
"Lost %zu history entries accumulated while disconnected from the database", fDisconnectedBuffer.size());
1507int Pgsql::Connect(
const char* connect_string)
1512 fConnectString = connect_string;
1515 cm_msg(
MINFO,
"Pgsql::Connect",
"Connecting to PostgreSQL database specified by \'%s\'", connect_string);
1520 std::string user_name;
1521 std::string user_password;
1522 std::string db_name;
1523 std::string tcp_port;
1524 std::string unix_socket;
1527 FILE*
fp = fopen(connect_string,
"r");
1529 cm_msg(
MERROR,
"Pgsql::Connect",
"Cannot read PostgreSQL connection parameters from \'%s\', fopen() error %d (%s)", connect_string, errno, strerror(errno));
1535 char* s = fgets(buf,
sizeof(buf),
fp);
1541 ss = strchr(s,
'\n');
1543 ss = strchr(s,
'\r');
1548 if (strncasecmp(s,
"server=", 7)==0)
1550 if (strncasecmp(s,
"port=", 5)==0)
1552 if (strncasecmp(s,
"database=", 9)==0)
1554 if (strncasecmp(s,
"socket=", 7)==0)
1556 if (strncasecmp(s,
"user=", 5)==0)
1558 if (strncasecmp(s,
"password=", 9)==0)
1560 if (strncasecmp(s,
"buffer=", 7)==0)
1566 int buffer_int = atoi(buffer.c_str());
1568 if (buffer_int > 0 && buffer_int < 1000000)
1569 fMaxDisconnected = buffer_int;
1572 printf(
"Pgsql::Connect: connecting to server [%s] port %s, unix socket [%s], database [%s], user [%s], password [%s], buffer [%zu]\n",
host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(), user_password.c_str(), fMaxDisconnected);
1574 fPgsql = PQsetdbLogin(
host_name.c_str(), tcp_port.c_str(), NULL, NULL, db_name.c_str(), user_name.c_str(), user_password.c_str());
1575 if (PQstatus(fPgsql) != CONNECTION_OK) {
1576 std::string msg(PQerrorMessage(fPgsql));
1577 msg.erase(std::remove(msg.begin(), msg.end(),
'\n'), msg.end());
1578 cm_msg(
MERROR,
"Pgsql::Connect",
"PQsetdbLogin() to host [%s], port %s, unix socket [%s], database [%s], user [%s], password [%s]: error (%s)",
host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(),
"xxx", msg.c_str());
1586 cm_msg(
MINFO,
"Pgsql::Connect",
"Connected to a PostgreSQL database on host [%s], port %s, unix socket [%s], database [%s], user [%s], password [%s], buffer %zu",
host_name.c_str(), tcp_port.c_str(), unix_socket.c_str(), db_name.c_str(), user_name.c_str(),
"xxx", fMaxDisconnected);
1590 fIsConnected =
true;
1593 while (fDisconnectedBuffer.size() > 0) {
1594 status = Exec(
"(flush)", fDisconnectedBuffer.front().c_str());
1598 fDisconnectedBuffer.pop_front();
1603 cm_msg(
MINFO,
"Pgsql::Connect",
"Saved %d, lost %d history events accumulated while disconnected from the database",
count, fDisconnectedLost);
1607 assert(fDisconnectedBuffer.size() == 0);
1608 fDisconnectedLost = 0;
1611 status = Prepare(
"pg_extensions",
"select extname from pg_extension where extname = 'timescaledb';");
1614 cm_msg(
MERROR,
"Pgsql::Connect",
"TimescaleDB extension not installed");
1619 status = Prepare(
"pg_extensions",
"select extname from pg_extension where extname = 'timescaledb_toolkit';");
1622 cm_msg(
MERROR,
"Pgsql::Connect",
"TimescaleDB_toolkit extension not installed");
1627 cm_msg(
MINFO,
"Pgsql::Connect",
"TimescaleDB extensions found - downsampling enabled");
1633int Pgsql::Disconnect()
1641 fIsConnected =
false;
1645bool Pgsql::IsConnected()
1647 return fIsConnected;
1650int Pgsql::OpenTransaction(
const char* table_name)
1652 return Exec(table_name,
"BEGIN TRANSACTION;");
1655int Pgsql::CommitTransaction(
const char* table_name)
1657 return Exec(table_name,
"COMMIT;");
1660int Pgsql::RollbackTransaction(
const char* table_name)
1662 return Exec(table_name,
"ROLLBACK;");
1665int Pgsql::ListTables(std::vector<std::string> *plist)
1671 printf(
"Pgsql::ListTables!\n");
1673 int status = Prepare(
"pg_tables",
"select tablename from pg_tables where schemaname = 'public';");
1676 cm_msg(
MERROR,
"Pgsql::ListTables",
"error %s (%s)", PQresStatus(PQresultStatus(fResult)), PQresultErrorMessage(fResult));
1683 std::string tn = GetText(0);
1684 plist->push_back(tn);
1692int Pgsql::ListColumns(
const char* table_name, std::vector<std::string> *plist)
1698 printf(
"Pgsql::ListColumns for table \'%s\'\n", table_name);
1701 cmd +=
"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = ";
1702 cmd += QuoteString(table_name);
1705 int status = Prepare(table_name, cmd.c_str());
1709 fNumFields = PQnfields(fResult);
1714 std::string cn = GetText(0);
1715 std::string ct = GetText(1);
1716 plist->push_back(cn);
1717 plist->push_back(ct);
1725int Pgsql::Exec(
const char* table_name,
const char* sql)
1728 printf(
"Pgsql::Exec(%s, %s)\n", table_name, sql);
1736 fResult = PQexec(fPgsql, sql);
1737 ExecStatusType err = PQresultStatus(fResult);
1738 if(err != PGRES_TUPLES_OK) {
1739 if(err == PGRES_FATAL_ERROR) {
1741 if(strstr(PQresultErrorMessage(fResult),
"already exists"))
1746 if(PQstatus(fPgsql) == CONNECTION_BAD) {
1748 return ExecDisconnected(table_name, sql);
1755int Pgsql::ExecDisconnected(
const char* table_name,
const char* sql)
1758 printf(
"Pgsql::ExecDisconnected(%s, %s)\n", table_name, sql);
1760 if (fDisconnectedBuffer.size() < fMaxDisconnected) {
1761 fDisconnectedBuffer.push_back(sql);
1762 if (fDisconnectedBuffer.size() >= fMaxDisconnected) {
1763 cm_msg(
MERROR,
"Pgsql::ExecDisconnected",
"Error: Disconnected database buffer overflow, size %zu, subsequent events are lost", fDisconnectedBuffer.size());
1766 fDisconnectedLost++;
1769 time_t now = time(NULL);
1771 if (fNextReconnect == 0 || now >= fNextReconnect) {
1772 int status = Connect(fConnectString.c_str());
1775 fNextReconnectDelaySec = 0;
1777 if (fNextReconnectDelaySec == 0) {
1778 fNextReconnectDelaySec = 5;
1779 }
else if (fNextReconnectDelaySec < 10*60) {
1780 fNextReconnectDelaySec *= 2;
1783 cm_msg(
MINFO,
"Pgsql::ExecDisconnected",
"Next reconnect attempt in %d sec, history events buffered %zu, lost %d", fNextReconnectDelaySec, fDisconnectedBuffer.size(), fDisconnectedLost);
1786 fNextReconnect = now + fNextReconnectDelaySec;
1793int Pgsql::Prepare(
const char* table_name,
const char* sql)
1796 printf(
"Pgsql::Prepare(%s, %s)\n", table_name, sql);
1805 fResult = PQexec(fPgsql, sql);
1806 if (PQstatus(fPgsql) == CONNECTION_BAD) {
1808 int status = Connect(fConnectString.c_str());
1811 fResult = PQexec(fPgsql, sql);
1813 cm_msg(
MERROR,
"Pgsql::Prepare",
"PQexec(%s) PostgreSQL server has gone away, and couldn't reconnect - %d", sql,
status);
1817 cm_msg(
MERROR,
"Pgsql::Prepare",
"PQexec(%s) error %s", sql, PQresStatus(PQresultStatus(fResult)));
1820 cm_msg(
MINFO,
"Pgsql::Prepare",
"Reconnected to PostgreSQL after long inactivity.");
1823 fNumFields = PQnfields(fResult);
1828std::string Pgsql::BuildDownsampleQuery(
const time_t start_time,
const time_t end_time,
const int npoints,
1829 const char* table_name,
const char* column_name)
1832 cmd +=
"SELECT extract(epoch from time::TIMESTAMPTZ) as _i_time, value ";
1834 cmd +=
" FROM unnest(( SELECT lttb";
1835 cmd +=
"(_t_time, ";
1838 cmd += std::to_string(npoints);
1841 cmd += QuoteId(table_name);
1842 cmd +=
" WHERE _t_time BETWEEN ";
1843 cmd +=
"to_timestamp(";
1845 cmd +=
") AND to_timestamp(";
1847 cmd +=
") )) ORDER BY time;";
1859 if (fRow == PQntuples(fResult))
1865const char* Pgsql::GetText(
int column)
1869 assert(fNumFields > 0);
1870 assert(column >= 0);
1871 assert(column < fNumFields);
1873 return PQgetvalue(fResult, fRow, column);
1876double Pgsql::GetDouble(
int column)
1878 return atof(GetText(column));
1881time_t Pgsql::GetTime(
int column)
1883 return strtoul(GetText(column), NULL, 0);
1886int Pgsql::Finalize()
1897const char* Pgsql::ColumnType(
int midas_tid)
1899 assert(midas_tid>=0);
1901 return sql_type_pgsql[midas_tid];
1904bool Pgsql::TypesCompatible(
int midas_tid,
const char*
sql_type)
1907 printf(
"compare types midas \'%s\'=\'%s\' and sql \'%s\'\n",
rpc_tid_name(midas_tid), ColumnType(midas_tid),
sql_type);
1912 if (strcasecmp(ColumnType(midas_tid),
sql_type) == 0)
1930 printf(
"type mismatch!\n");
1935std::string Pgsql::QuoteId(
const char* s)
1944std::string Pgsql::QuoteString(
const char* s)
1963typedef std::map<std::string, sqlite3*> DbMap;
1974 sqlite3_stmt* fTempStmt;
1979 int Connect(
const char* path);
1983 int ConnectTable(
const char* table_name);
1984 sqlite3* GetTable(
const char* table_name);
1986 int ListTables(std::vector<std::string> *plist);
1987 int ListColumns(
const char* table_name, std::vector<std::string> *plist);
1989 int Exec(
const char* table_name,
const char* sql);
1992 int Prepare(
const char* table_name,
const char* sql);
1994 const char*
GetText(
int column);
2006 std::string
QuoteId(
const char* s);
2010std::string Sqlite::QuoteId(
const char* s)
2019std::string Sqlite::QuoteString(
const char* s)
2028const char* Sqlite::ColumnType(
int midas_tid)
2030 assert(midas_tid>=0);
2032 return sql_type_sqlite[midas_tid];
2035bool Sqlite::TypesCompatible(
int midas_tid,
const char*
sql_type)
2038 printf(
"compare types midas \'%s\'=\'%s\' and sql \'%s\'\n",
rpc_tid_name(midas_tid), ColumnType(midas_tid),
sql_type);
2043 if (strcasecmp(ColumnType(midas_tid),
sql_type) == 0)
2053const char* Sqlite::GetText(
int column)
2055 return (
const char*)sqlite3_column_text(fTempStmt, column);
2058time_t Sqlite::GetTime(
int column)
2060 return sqlite3_column_int64(fTempStmt, column);
2063double Sqlite::GetDouble(
int column)
2065 return sqlite3_column_double(fTempStmt, column);
2070 fIsConnected =
false;
2081const char* xsqlite3_errstr(sqlite3* db,
int errcode)
2084 return sqlite3_errmsg(db);
2087int Sqlite::ConnectTable(
const char* table_name)
2089 std::string fname = fPath +
"mh_" + table_name +
".sqlite3";
2093 int status = sqlite3_open(fname.c_str(), &db);
2095 if (
status != SQLITE_OK) {
2096 cm_msg(
MERROR,
"Sqlite::Connect",
"Table %s: sqlite3_open(%s) error %d (%s)", table_name, fname.c_str(),
status, xsqlite3_errstr(db,
status));
2102#if SQLITE_VERSION_NUMBER >= 3006020
2103 status = sqlite3_extended_result_codes(db, 1);
2104 if (
status != SQLITE_OK) {
2105 cm_msg(
MERROR,
"Sqlite::Connect",
"Table %s: sqlite3_extended_result_codes(1) error %d (%s)", table_name,
status, xsqlite3_errstr(db,
status));
2108#warning Missing sqlite3_extended_result_codes()!
2111 fMap[table_name] = db;
2113 Exec(table_name,
"PRAGMA journal_mode=persist;");
2114 Exec(table_name,
"PRAGMA synchronous=normal;");
2116 Exec(table_name,
"PRAGMA journal_size_limit=-1;");
2119 Exec(table_name,
"PRAGMA legacy_file_format;");
2120 Exec(table_name,
"PRAGMA synchronous;");
2121 Exec(table_name,
"PRAGMA journal_mode;");
2122 Exec(table_name,
"PRAGMA journal_size_limit;");
2125#ifdef SQLITE_LIMIT_COLUMN
2127 int max_columns = sqlite3_limit(db, SQLITE_LIMIT_COLUMN, -1);
2128 printf(
"Sqlite::Connect: SQLITE_LIMIT_COLUMN=%d\n", max_columns);
2133 cm_msg(
MINFO,
"Sqlite::Connect",
"Table %s: connected to Sqlite file \'%s\'", table_name, fname.c_str());
2138sqlite3* Sqlite::GetTable(
const char* table_name)
2140 sqlite3* db = fMap[table_name];
2145 int status = ConnectTable(table_name);
2149 return fMap[table_name];
2152int Sqlite::Connect(
const char* path)
2160 if (fPath.length() > 0) {
2166 cm_msg(
MINFO,
"Sqlite::Connect",
"Connected to Sqlite database in \'%s\'", fPath.c_str());
2168 fIsConnected =
true;
2173int Sqlite::Disconnect()
2178 for (DbMap::iterator iter = fMap.begin(); iter != fMap.end(); ++iter) {
2179 const char* table_name = iter->first.c_str();
2180 sqlite3* db = iter->second;
2181 int status = sqlite3_close(db);
2182 if (
status != SQLITE_OK) {
2183 cm_msg(
MERROR,
"Sqlite::Disconnect",
"sqlite3_close(%s) error %d (%s)", table_name,
status, xsqlite3_errstr(db,
status));
2189 fIsConnected =
false;
2194bool Sqlite::IsConnected()
2196 return fIsConnected;
2199int Sqlite::OpenTransaction(
const char* table_name)
2201 int status = Exec(table_name,
"BEGIN TRANSACTION");
2205int Sqlite::CommitTransaction(
const char* table_name)
2207 int status = Exec(table_name,
"COMMIT TRANSACTION");
2211int Sqlite::RollbackTransaction(
const char* table_name)
2213 int status = Exec(table_name,
"ROLLBACK TRANSACTION");
2217int Sqlite::Prepare(
const char* table_name,
const char* sql)
2219 sqlite3* db = GetTable(table_name);
2224 printf(
"Sqlite::Prepare(%s, %s)\n", table_name, sql);
2226 assert(fTempDB==NULL);
2229#if SQLITE_VERSION_NUMBER >= 3006020
2230 int status = sqlite3_prepare_v2(db, sql, strlen(sql), &fTempStmt, NULL);
2232#warning Missing sqlite3_prepare_v2()!
2233 int status = sqlite3_prepare(db, sql, strlen(sql), &fTempStmt, NULL);
2239 std::string sqlstring = sql;
2240 cm_msg(
MERROR,
"Sqlite::Prepare",
"Table %s: sqlite3_prepare_v2(%s...) error %d (%s)", table_name, sqlstring.substr(0,60).c_str(),
status, xsqlite3_errstr(db,
status));
2250 printf(
"Sqlite::Step()\n");
2255 int status = sqlite3_step(fTempStmt);
2257 if (
status == SQLITE_DONE)
2260 if (
status == SQLITE_ROW)
2268int Sqlite::Finalize()
2271 printf(
"Sqlite::Finalize()\n");
2276 int status = sqlite3_finalize(fTempStmt);
2278 if (
status != SQLITE_OK) {
2292int Sqlite::ListTables(std::vector<std::string> *plist)
2298 printf(
"Sqlite::ListTables at path [%s]\n", fPath.c_str());
2302 const char* cmd =
"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;";
2304 DIR *dir = opendir(fPath.c_str());
2306 cm_msg(
MERROR,
"Sqlite::ListTables",
"Cannot opendir(%s), errno %d (%s)", fPath.c_str(), errno, strerror(errno));
2311 const struct dirent*
de = readdir(dir);
2315 const char* dn =
de->d_name;
2322 s = strstr(dn,
"mh_");
2326 s = strstr(dn,
".sqlite3");
2330 char table_name[256];
2331 mstrlcpy(table_name, dn+3,
sizeof(table_name));
2333 char* ss = strstr(table_name,
".sqlite3");
2340 status = Prepare(table_name, cmd);
2349 const char* tn = GetText(0);
2351 plist->push_back(tn);
2363int Sqlite::ListColumns(
const char* table, std::vector<std::string> *plist)
2369 printf(
"Sqlite::ListColumns for table \'%s\'\n", table);
2372 cmd =
"PRAGMA table_info(";
2378 status = Prepare(table, cmd.c_str());
2387 const char* colname = GetText(1);
2388 const char* coltype = GetText(2);
2390 plist->push_back(colname);
2391 plist->push_back(coltype);
2399static int callback_debug = 0;
2401static int callback(
void *NotUsed,
int argc,
char **argv,
char **azColName){
2402 if (callback_debug) {
2403 printf(
"history_sqlite::callback---->\n");
2404 for (
int i=0;
i<argc;
i++){
2405 printf(
"history_sqlite::callback[%d] %s = %s\n",
i, azColName[
i], argv[
i] ? argv[
i] :
"NULL");
2411int Sqlite::Exec(
const char* table_name,
const char* sql)
2421 sqlite3* db = GetTable(table_name);
2426 printf(
"Sqlite::Exec(%s, %s)\n", table_name, sql);
2430 callback_debug = fDebug;
2431 char* errmsg = NULL;
2434 if (
status != SQLITE_OK) {
2435 if (
status == SQLITE_ERROR && strstr(errmsg,
"duplicate column name"))
2437 if (
status == SQLITE_ERROR && strstr(errmsg,
"already exists"))
2439 std::string sqlstring = sql;
2440 cm_msg(
MERROR,
"Sqlite::Exec",
"Table %s: sqlite3_exec(%s...) error %d (%s)", table_name, sqlstring.substr(0,60).c_str(),
status, errmsg);
2441 sqlite3_free(errmsg);
2448int Sqlite::ExecDisconnected(
const char* table_name,
const char* sql)
2450 cm_msg(
MERROR,
"Sqlite::Exec",
"sqlite driver does not support disconnected operations");
2469 cm_msg(
MERROR,
"FileHistory::write_event",
"Cannot write to \'%s\', open() errno %d (%s)", s->
fFileName.c_str(), errno, strerror(errno));
2483 off64_t file_size = ::lseek64(s->
fWriterFd, 0, SEEK_END);
2485 if (file_size < 0) {
2486 cm_msg(
MERROR,
"FileHistory::write_event",
"Cannot read file size of \'%s\', lseek64(SEEK_END) errno %d (%s)", s->
fFileName.c_str(), errno, strerror(errno));
2503 if (data_end != file_size) {
2505 cm_msg(
MERROR,
"FileHistory::write_event",
"File \'%s\' may be truncated, data offset %jd, record size %zu, file size: %jd, should be %jd, making it so", s->
fFileName.c_str(), (intmax_t)s->
fDataOffset, s->
fRecordSize, (intmax_t)file_size, (intmax_t)data_end);
2507 off64_t status64 = ::lseek64(s->
fWriterFd, data_end, SEEK_SET);
2510 cm_msg(
MERROR,
"FileHistory::write_event",
"Cannot seek \'%s\' to offset %jd, lseek64() errno %d (%s)", s->
fFileName.c_str(), (intmax_t)data_end, errno, strerror(errno));
2517 cm_msg(
MERROR,
"FileHistory::write_event",
"Cannot truncate \'%s\' to size %jd, ftruncate64() errno %d (%s)", s->
fFileName.c_str(), (intmax_t)data_end, errno, strerror(errno));
2535 cm_msg(
MERROR,
"FileHistory::write_event",
"Event \'%s\' data size mismatch, expected %zu bytes, got %zu bytes, previously %zu bytes", s->
fEventName.c_str(), expected_size, data_size, s->
fLastSize);
2539 if (data_size < expected_size)
2547 size_t size = 4 + expected_size;
2559 if ((
size_t)wr != size) {
2560 cm_msg(
MERROR,
"FileHistory::write_event",
"Cannot write to \'%s\', write(%zu) returned %zd, errno %d (%s)", s->
fFileName.c_str(), size, wr, errno, strerror(errno));
2569 cm_msg(
MERROR,
"FileHistory::write_event",
"Cannot write to \'%s\', write(timestamp) errno %d (%s)", s->
fFileName.c_str(), errno, strerror(errno));
2574 if (
status != expected_size) {
2575 cm_msg(
MERROR,
"FileHistory::write_event",
"Cannot write to \'%s\', write(%d) errno %d (%s)", s->
fFileName.c_str(), data_size, errno, strerror(errno));
2594 off64_t fpos =
offset + irec*recsize;
2596 off64_t status64 = ::lseek64(fd, fpos, SEEK_SET);
2599 cm_msg(
MERROR,
"FileHistory::ReadRecord",
"Cannot read \'%s\', lseek64(%jd) errno %d (%s)",
file_name, (intmax_t)fpos, errno, strerror(errno));
2603 ssize_t rd =
::read(fd, rec, recsize);
2606 cm_msg(
MERROR,
"FileHistory::ReadRecord",
"Cannot read \'%s\', read() errno %d (%s)",
file_name, errno, strerror(errno));
2611 cm_msg(
MERROR,
"FileHistory::ReadRecord",
"Cannot read \'%s\', unexpected end of file on read()",
file_name);
2615 if ((
size_t)rd != recsize) {
2616 cm_msg(
MERROR,
"FileHistory::ReadRecord",
"Cannot read \'%s\', short read() returned %zd instead of %zu bytes",
file_name, rd, recsize);
2623static int FindTime(
const char*
file_name,
int fd, off64_t
offset,
size_t recsize, off64_t nrec, time_t timestamp, off64_t* i1p, time_t* t1p, off64_t* i2p, time_t* t2p, time_t* tstart, time_t* tend,
int debug)
2644 char* buf =
new char[recsize];
2649 off64_t rec2 = nrec-1;
2657 time_t t1 = *(
DWORD*)buf;
2662 if (timestamp <= t1) {
2672 assert(t1 < timestamp);
2690 time_t t2 = *(
DWORD*)buf;
2695 if (t2 < timestamp) {
2704 assert(t1 < timestamp);
2705 assert(timestamp <= t2);
2708 printf(
"FindTime: rec %jd..(x)..%jd, time %s..(%s)..%s\n", (intmax_t)rec1, (intmax_t)rec2,
TimeToString(t1).c_str(),
TimeToString(timestamp).c_str(),
TimeToString(t2).c_str());
2713 off64_t rec = (rec1+rec2)/2;
2724 time_t t = *(
DWORD*)buf;
2726 if (timestamp <= t) {
2739 }
while (rec2 - rec1 > 1);
2741 assert(rec1+1 == rec2);
2742 assert(t1 < timestamp);
2743 assert(timestamp <= t2);
2746 printf(
"FindTime: rec %jd..(x)..%jd, time %s..(%s)..%s, this is the result.\n", (intmax_t)rec1, (intmax_t)rec2,
TimeToString(t1).c_str(),
TimeToString(timestamp).c_str(),
TimeToString(t2).c_str());
2760 time_t* last_written)
2768 int fd = open(s->
fFileName.c_str(), O_RDONLY);
2770 cm_msg(
MERROR,
"FileHistory::read_last_written",
"Cannot read \'%s\', open() errno %d (%s)", s->
fFileName.c_str(), errno, strerror(errno));
2774 off64_t file_size = ::lseek64(fd, 0, SEEK_END);
2776 if (file_size < 0) {
2777 cm_msg(
MERROR,
"FileHistory::read_last_written",
"Cannot read file size of \'%s\', lseek64(SEEK_END) errno %d (%s)", s->
fFileName.c_str(), errno, strerror(errno));
2822 if (lw >= timestamp) {
2825 off64_t iunused = 0;
2830 status =
FindTime(s->
fFileName.c_str(), fd, s->
fDataOffset, s->
fRecordSize, nrec, timestamp, &irec, &trec, &iunused, &tunused, &tstart, &tend, 0*
debug);
2836 assert(trec < timestamp);
2847 assert(lw < timestamp);
2855 const time_t end_time,
2856 const int num_var,
const std::vector<int>& var_schema_index,
const int var_index[],
2875 printf(
"FileHistory::read_data: file %s map", s->
fFileName.c_str());
2876 for (
size_t i=0;
i<var_schema_index.size();
i++) {
2877 printf(
" %2d", var_schema_index[
i]);
2882 int fd = ::open(s->
fFileName.c_str(), O_RDONLY);
2884 cm_msg(
MERROR,
"FileHistory::read_data",
"Cannot read \'%s\', open() errno %d (%s)", s->
fFileName.c_str(), errno, strerror(errno));
2888 off64_t file_size = ::lseek64(fd, 0, SEEK_END);
2890 if (file_size < 0) {
2891 cm_msg(
MERROR,
"FileHistory::read_data",
"Cannot read file size of \'%s\', lseek64(SEEK_END) errno %d (%s)", s->
fFileName.c_str(), errno, strerror(errno));
2914 off64_t iunused = 0;
2921 int istatus =
FindTime(s->
fFileName.c_str(), fd, s->
fDataOffset, s->
fRecordSize, nrec, start_time, &iunused, &tunused, &irec, &trec, &tstart, &tend, 0*
debug);
2929 printf(
"FindTime %d, nrec %jd, (%jd, %s) (%jd, %s), tstart %s, tend %s, want %s\n", istatus, (intmax_t)nrec, (intmax_t)iunused,
TimeToString(tunused).c_str(), (intmax_t)irec,
TimeToString(trec).c_str(),
TimeToString(tstart).c_str(),
TimeToString(tend).c_str(),
TimeToString(start_time).c_str());
2932 if (irec < 0 || irec >= nrec) {
2938 printf(
"FileHistory::read: file %s, schema time %s..%s, read time %s..%s, file time %s..%s, data in this file is too old\n", s->
fFileName.c_str(),
TimeToString(s->
fTimeFrom).c_str(),
TimeToString(s->
fTimeTo).c_str(),
TimeToString(start_time).c_str(),
TimeToString(end_time).c_str(),
TimeToString(tstart).c_str(),
TimeToString(tend).c_str());
2963 for (
int i=0;
i<num_var;
i++) {
2964 int si = var_schema_index[
i];
2981 off64_t xpos = ::lseek64(fd, fpos, SEEK_SET);
2986 cm_msg(
MERROR,
"FileHistory::read_data",
"Cannot read \'%s\', lseek64(%jd) errno %d (%s)", s->
fFileName.c_str(), (intmax_t)fpos, errno, strerror(errno));
2993 off64_t prec = irec;
2999 cm_msg(
MERROR,
"FileHistory::read_data",
"Cannot read \'%s\', read() errno %d (%s)", s->
fFileName.c_str(), errno, strerror(errno));
3015 bool past_end_of_last_file = (s->
fTimeTo == 0) && (prec > nrec);
3017 time_t t = *(
DWORD*)buf;
3029 if (tend && (t > tend) && !past_end_of_last_file) {
3039 char*
data = buf + 4;
3041 for (
int i=0;
i<num_var;
i++) {
3042 int si = var_schema_index[
i];
3058 int ii = var_index[
i];
3068 v = ((
unsigned char*)ptr)[ii];
3071 v = ((
signed char *)ptr)[ii];
3074 v = ((
char*)ptr)[ii];
3077 v = ((
unsigned short *)ptr)[ii];
3080 v = ((
signed short *)ptr)[ii];
3083 v = ((
unsigned int *)ptr)[ii];
3086 v = ((
int *)ptr)[ii];
3089 v = ((
unsigned int *)ptr)[ii];
3092 v = ((
float*)ptr)[ii];
3095 v = ((
double*)ptr)[ii];
3099 buffer[
i]->
Add(t, v);
3110 printf(
"FileHistory::read_data: file %s map", s->
fFileName.c_str());
3111 for (
size_t i=0;
i<var_schema_index.size();
i++) {
3112 printf(
" %2d", var_schema_index[
i]);
3114 printf(
" read %d rows\n",
count);
3118 printf(
"FileHistory::read: file %s, schema time %s..%s, read time %s..%s, %d vars, read %d rows\n", s->
fFileName.c_str(),
TimeToString(s->
fTimeFrom).c_str(),
TimeToString(s->
fTimeTo).c_str(),
TimeToString(start_time).c_str(),
TimeToString(end_time).c_str(), num_var,
count);
3187 int hs_define_event(
const char* event_name, time_t timestamp,
int ntags,
const TAG tags[]);
3188 int hs_write_event(
const char* event_name, time_t timestamp,
int buffer_size,
const char* buffer);
3196 int hs_get_events(time_t t, std::vector<std::string> *pevents);
3197 int hs_get_tags(
const char* event_name, time_t t, std::vector<TAG> *ptags);
3198 int hs_get_last_written(time_t timestamp,
int num_var,
const char*
const event_name[],
const char*
const var_name[],
const int var_index[], time_t last_written[]);
3200 int num_var,
const char*
const event_name[],
const char*
const var_name[],
const int var_index[],
3249 if (newalloc <= 1000)
3250 newalloc = wantalloc + 1000;
3277 (*fTimeBuffer)[pos] = t;
3278 (*fDataBuffer)[pos] = v;
3280 (*fNumEntries) = pos + 1;
3294 int hs_read(time_t start_time, time_t end_time, time_t interval,
3296 const char*
const event_name[],
const char*
const var_name[],
const int var_index[],
3298 time_t* time_buffer[],
double* data_buffer[],
3303 int hs_read_binned(time_t start_time, time_t end_time,
int num_bins,
3304 int num_var,
const char*
const event_name[],
const char*
const var_name[],
const int var_index[],
3306 int* count_bins[],
double* mean_bins[],
double* rms_bins[],
double* min_bins[],
double* max_bins[],
3307 time_t* bins_first_time[],
double* bins_first_value[],
3308 time_t* bins_last_time[],
double* bins_last_value[],
3309 time_t
last_time[],
double last_value[],
3321 fSum0 =
new double[num_bins];
3322 fSum1 =
new double[num_bins];
3323 fSum2 =
new double[num_bins];
3325 for (
int i=0;
i<num_bins;
i++) {
3353 for (
int ibin = 0; ibin <
fNumBins; ibin++) {
3386 int ibin = (int)fbin;
3393 if (
fSum0[ibin] == 0) {
3443 double variance = 0;
3446 variance =
fSum2[
i]/num-mean*mean;
3450 rms = sqrt(variance);
3473 printf(
"hs_define_event: event name [%s] with %d tags\n", event_name, ntags);
3483 printf(
"deleting exising event %s\n", event_name);
3490 for (
int i=0;
i<ntags;
i++) {
3491 if (strlen(tags[
i].
name) < 1) {
3492 cm_msg(
MERROR,
"hs_define_event",
"Error: History event \'%s\' has empty name at index %d", event_name,
i);
3495 if (tags[
i].
name[0] ==
' ') {
3496 cm_msg(
MERROR,
"hs_define_event",
"Error: History event \'%s\' has name \'%s\' starting with a blank", event_name, tags[
i].
name);
3500 cm_msg(
MERROR,
"hs_define_event",
"Error: History event \'%s\' tag \'%s\' at index %d has invalid type %d",
3506 "Error: History event \'%s\' tag \'%s\' at index %d has forbidden type TID_STRING", event_name,
3510 if (tags[
i].n_data <= 0) {
3511 cm_msg(
MERROR,
"hs_define_event",
"Error: History event \'%s\' tag \'%s\' at index %d has invalid n_data %d",
3512 event_name, tags[
i].
name,
i, tags[
i].n_data);
3518 std::vector<std::string> names;
3519 for (
int i=0;
i<ntags;
i++) {
3521 std::transform(
str.begin(),
str.end(),
str.begin(), ::toupper);
3522 names.push_back(
str);
3524 std::sort(names.begin(), names.end());
3525 for (
int i=0;
i<ntags-1;
i++) {
3526 if (names[
i] == names[
i + 1]) {
3528 "Error: History event \'%s\' has duplicate tag name \'%s\'", event_name,
3560 printf(
"hs_write_event: write event \'%s\', time %d, size %d\n", event_name, (
int)timestamp, xbuffer_size);
3562 assert(xbuffer_size > 0);
3564 size_t buffer_size = xbuffer_size;
3602 cm_msg(
MERROR,
"hs_write_event",
"Event \'%s\' data size mismatch: expected %zu bytes, got %zu bytes", s->
fEventName.c_str(), s->
fNumBytes, buffer_size);
3608 }
else if (buffer_size < s->fNumBytes) {
3612 cm_msg(
MERROR,
"hs_write_event",
"Event \'%s\' data size mismatch: expected %zu bytes, got %zu bytes", s->
fEventName.c_str(), s->
fNumBytes, buffer_size);
3617 else if (buffer_size < s->fWriteMinSize)
3619 char* tmp = (
char*)malloc(s->
fNumBytes);
3620 memcpy(tmp, buffer, buffer_size);
3621 memset(tmp + buffer_size, 0, s->
fNumBytes - buffer_size);
3632 cm_msg(
MERROR,
"hs_write_event",
"Event \'%s\' disabled after write error %d", event_name,
status);
3644 printf(
"hs_flush_buffers!\n");
3663 printf(
"SchemaHistoryBase::hs_clear_cache!\n");
3674 printf(
"hs_get_events, time %s\n",
TimeToString(t).c_str());
3681 printf(
"hs_get_events: available schema:\n");
3692 for (
size_t j=0;
j<pevents->size();
j++)
3702 std::sort(pevents->begin(), pevents->end());
3705 printf(
"hs_get_events: returning %zu events\n", pevents->size());
3706 for (
size_t i=0;
i<pevents->size();
i++) {
3707 printf(
" %zu: [%s]\n",
i, (*pevents)[
i].c_str());
3717 printf(
"hs_get_tags: event [%s], time %s\n", event_name,
TimeToString(t).c_str());
3725 bool found_event =
false;
3737 const char* tagname = s->
fVariables[
i].name.c_str();
3742 for (
size_t k=0;
k<ptags->size();
k++)
3743 if (strcasecmp((*ptags)[
k].
name, tagname) == 0) {
3750 mstrlcpy(t.
name, tagname,
sizeof(t.
name));
3754 ptags->push_back(t);
3763 printf(
"hs_get_tags: event [%s], returning %zu tags\n", event_name, ptags->size());
3764 for (
size_t i=0;
i<ptags->size();
i++) {
3765 printf(
" tag[%zu]: %s[%d] type %d\n",
i, (*ptags)[
i].
name, (*ptags)[
i].n_data, (*ptags)[
i].
type);
3775 printf(
"hs_get_last_written: timestamp %s, num_var %d\n",
TimeToString(timestamp).c_str(), num_var);
3778 for (
int j=0;
j<num_var;
j++) {
3779 last_written[
j] = 0;
3782 for (
int i=0;
i<num_var;
i++) {
3790 for (
int i=0;
i<num_var;
i++) {
3809 for (
int j=0;
j<num_var;
j++) {
3814 if (lw > last_written[
j])
3815 last_written[
j] = lw;
3822 printf(
"hs_get_last_written: timestamp time %s, num_var %d, result:\n",
TimeToString(timestamp).c_str(), num_var);
3823 for (
int i=0;
i<num_var;
i++) {
3824 printf(
" event [%s] tag [%s] index [%d] last_written %s\n", event_name[
i],
var_name[
i], var_index[
i],
TimeToString(last_written[
i]).c_str());
3832 int num_var,
const char*
const event_name[],
const char*
const var_name[],
const int var_index[],
3837 printf(
"hs_read_buffer: %d variables, start time %s, end time %s\n", num_var,
TimeToString(start_time).c_str(),
TimeToString(end_time).c_str());
3839 for (
int i=0;
i<num_var;
i++) {
3850 for (
int i=0;
i<num_var;
i++) {
3885 cm_msg(
MERROR,
"SchemaHistoryBase::hs_read_buffer",
"History internal error, schema is not ordered by time. Please report this error to the midas forum.");
3892 std::vector<HsSchema*> slist;
3893 std::vector<std::vector<int>> smap;
3904 std::vector<int> sm;
3906 for (
int i=0;
i<num_var;
i++) {
3913 for (
int i=0;
i<num_var;
i++) {
3928 printf(
"Found %zu matching schema:\n", slist.size());
3930 for (
size_t i=0;
i<slist.size();
i++) {
3933 for (
int k=0;
k<num_var;
k++)
3934 printf(
" tag %s[%d] sindex %d\n",
var_name[
k], var_index[
k], smap[
i][
k]);
3949 for (
size_t ss=1; ss<slist.size(); ss++) {
3951 printf(
"Check schema %zu/%zu: prev from %s, this from %s to %s, compare %d\n", ss, slist.size(),
3955 slist[ss-1]->fTimeFrom >= slist[ss]->fTimeFrom);
3957 if (slist[ss-1]->fTimeFrom >= slist[ss]->fTimeFrom) {
3960 cm_msg(
MERROR,
"SchemaHistoryBase::hs_read_buffer",
"History internal error, selected schema is not ordered by time. Please report this error to the midas forum.");
3967 for (
int i=0;
i<num_var;
i++) {
3971 if (slist.size() > 0) {
3972 for (
size_t i=slist.size()-1; ;
i--) {
3978 for (
int j=0;
j<num_var;
j++) {
3979 if (smap[
i][
j] >= 0)
3994 const char*
const event_name[],
const char*
const var_name[],
const int var_index[],
3996 time_t* time_buffer[],
double* data_buffer[],
4004 for (
int i=0;
i<num_var;
i++) {
4005 buffer[
i] =
new ReadBuffer(start_time, end_time, interval);
4012 time_buffer[
i] = NULL;
4014 data_buffer[
i] = NULL;
4027 num_var, event_name,
var_name, var_index,
4030 for (
int i=0;
i<num_var;
i++) {
4042 int num_var,
const char*
const event_name[],
const char*
const var_name[],
const int var_index[],
4044 int* count_bins[],
double* mean_bins[],
double* rms_bins[],
double* min_bins[],
double* max_bins[],
4045 time_t* bins_first_time[],
double* bins_first_value[],
4046 time_t* bins_last_time[],
double* bins_last_value[],
4047 time_t
last_time[],
double last_value[],
4055 for (
int i=0;
i<num_var;
i++) {
4057 xbuffer[
i] = buffer[
i];
4062 buffer[
i]->
fMean = mean_bins[
i];
4064 buffer[
i]->
fRms = rms_bins[
i];
4066 buffer[
i]->
fMin = min_bins[
i];
4068 buffer[
i]->
fMax = max_bins[
i];
4069 if (bins_first_time)
4071 if (bins_first_value)
4075 if (bins_last_value)
4086 num_var, event_name,
var_name, var_index,
4090 for (
int i=0;
i<num_var;
i++) {
4095 for (
int j=0;
j<num_bins;
j++) {
4096 printf(
"var %d bin %d count %d, first %s last %s value first %f last %f\n",
i,
j, count_bins[
i][
j],
TimeToString(bins_first_time[
i][
j]).c_str(),
TimeToString(bins_last_time[
i][
j]).c_str(), bins_first_value[
i][
j], bins_last_value[
i][
j]);
4162 for (
size_t i=0;
i<sv->
size();
i++) {
4213 cm_msg(
MERROR,
"NewSqlSchema",
"Error: Unexpected ordering of schema for table \'%s\', good luck!", table_name);
4227 cm_msg(
MERROR,
"NewSqlSchema",
"Error: Cannot clone schema for table \'%s\', good luck!", table_name);
4238 size_t count_active = 0;
4239 size_t count_inactive = 0;
4243 count_inactive += 1;
4251 if (count_inactive > 0) {
4272 assert(
j == count_active);
4308 cm_msg(
MERROR,
"HsSqlSchema::write_event",
"Internal error, unexpected inactive column %zu",
i);
4319 cm_msg(
MERROR,
"HsSqlSchema::write_event",
"Internal error, unexpected negative offset %d for column %zu",
offset,
i);
4324 assert(n_data == 1);
4325 assert(strlen(column_name) > 0);
4327 assert((
size_t)
offset < data_size);
4341 sprintf(buf,
"unknownType%d",
type);
4344 sprintf(buf,
"%u",((
unsigned char *)ptr)[
j]);
4347 sprintf(buf,
"%d",((
signed char*)ptr)[
j]);
4351 sprintf(buf,
"\'%c\'",((
char*)ptr)[
j]);
4354 sprintf(buf,
"%u",((
unsigned short *)ptr)[
j]);
4357 sprintf(buf,
"%d",((
short *)ptr)[
j]);
4360 sprintf(buf,
"%u",((
unsigned int *)ptr)[
j]);
4363 sprintf(buf,
"%d",((
int *)ptr)[
j]);
4366 sprintf(buf,
"%u",((
unsigned int *)ptr)[
j]);
4370 sprintf(buf,
"\'%.8g\'",((
float*)ptr)[
j]);
4374 sprintf(buf,
"\'%.16g\'",((
double*)ptr)[
j]);
4383 localtime_r(&t, &tms);
4385 strftime(buf,
sizeof(buf)-1,
"%Y-%m-%d %H:%M:%S.0", &tms);
4388 cmd =
"INSERT INTO ";
4390 cmd +=
" (_t_time, _i_time";
4392 cmd +=
") VALUES (";
4432 time_t* last_written)
4435 printf(
"SqlHistory::read_last_written: table [%s], timestamp %s\n",
fTableName.c_str(),
TimeToString(timestamp).c_str());
4438 cmd +=
"SELECT _i_time FROM ";
4440 cmd +=
" WHERE _i_time < ";
4442 cmd +=
" ORDER BY _i_time DESC LIMIT 2;";
4478 const time_t end_time,
4479 const int num_var,
const std::vector<int>& var_schema_index,
const int var_index[],
4484 bool bad_last_time =
false;
4489 std::string collist;
4491 for (
int i=0;
i<num_var;
i++) {
4492 int j = var_schema_index[
i];
4495 if (collist.length() > 0)
4501 cmd +=
"SELECT _i_time, ";
4505 cmd +=
" WHERE _i_time>=";
4507 cmd +=
" and _i_time<=";
4509 cmd +=
" ORDER BY _i_time;";
4529 if (t < start_time || t > end_time)
4534 for (
int i=0;
i<num_var;
i++) {
4535 int j = var_schema_index[
i];
4540 bad_last_time =
true;
4546 buffer[
i]->
Add(t, v);
4556 if (bad_last_time) {
4596 if (*have_transaction)
4603 *have_transaction =
true;
4607static int CreateSqlTable(
SqlBase* sql,
const char* table_name,
bool* have_transaction,
bool set_default_timestamp =
false)
4617 cmd =
"CREATE TABLE ";
4618 cmd += sql->
QuoteId(table_name);
4619 if (set_default_timestamp) {
4620 cmd +=
" (_t_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, _i_time INTEGER NOT NULL DEFAULT 0);";
4622 cmd +=
" (_t_time TIMESTAMP NOT NULL, _i_time INTEGER NOT NULL);";
4629 cm_msg(
MINFO,
"CreateSqlTable",
"Adding SQL table \"%s\", but it already exists", table_name);
4635 cm_msg(
MINFO,
"CreateSqlTable",
"Adding SQL table \"%s\", error status %d", table_name,
status);
4640 cm_msg(
MINFO,
"CreateSqlTable",
"Adding SQL table \"%s\"", table_name);
4643 std::string i_index_name;
4644 i_index_name = table_name;
4645 i_index_name +=
"_i_time_index";
4647 std::string t_index_name;
4648 t_index_name = table_name;
4649 t_index_name +=
"_t_time_index";
4651 cmd =
"CREATE INDEX ";
4652 cmd += sql->
QuoteId(i_index_name.c_str());
4654 cmd += sql->
QuoteId(table_name);
4655 cmd +=
" (_i_time ASC);";
4661 cmd =
"CREATE INDEX ";
4662 cmd += sql->
QuoteId(t_index_name.c_str());
4664 cmd += sql->
QuoteId(table_name);
4665 cmd +=
" (_t_time);";
4683 cmd =
"CREATE TABLE ";
4684 cmd += sql->
QuoteId(table_name);
4685 cmd +=
" (_t_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, _i_time INTEGER NOT NULL DEFAULT 0);";
4690 cm_msg(
MINFO,
"CreateSqlHyperTable",
"Adding SQL table \"%s\", but it already exists", table_name);
4696 cm_msg(
MINFO,
"CreateSqlHyperTable",
"Adding SQL table \"%s\", error status %d", table_name,
status);
4701 cm_msg(
MINFO,
"CreateSqlHyperTable",
"Adding SQL table \"%s\"", table_name);
4704 cmd =
"SELECT create_hypertable(";
4706 cmd +=
", '_t_time');";
4712 cm_msg(
MINFO,
"CreateSqlHyperTable",
"Converting SQL table to hypertable \"%s\", error status %d", table_name,
status);
4717 std::string i_index_name;
4718 i_index_name = table_name;
4719 i_index_name +=
"_i_time_index";
4721 std::string t_index_name;
4722 t_index_name = table_name;
4723 t_index_name +=
"_t_time_index";
4725 cmd =
"CREATE INDEX ";
4726 cmd += sql->
QuoteId(i_index_name.c_str());
4728 cmd += sql->
QuoteId(table_name);
4729 cmd +=
" (_i_time ASC);";
4735 cmd =
"CREATE INDEX ";
4736 cmd += sql->
QuoteId(t_index_name.c_str());
4738 cmd += sql->
QuoteId(table_name);
4739 cmd +=
" (_t_time);";
4751 printf(
"CreateSqlColumn: table [%s], column [%s], type [%s]\n", table_name, column_name, column_type);
4758 cmd =
"ALTER TABLE ";
4759 cmd += sql->
QuoteId(table_name);
4760 cmd +=
" ADD COLUMN ";
4761 cmd += sql->
QuoteId(column_name);
4768 cm_msg(
MINFO,
"CreateSqlColumn",
"Adding column \"%s\" to SQL table \"%s\", status %d", column_name, table_name,
status);
4815 virtual int update_column(
const char* event_name,
const char* table_name,
const char* column_name,
const char* column_type,
const char* tag_name,
const char* tag_type,
const time_t timestamp,
bool active,
bool* have_transaction) = 0;
4818 int update_schema1(
HsSqlSchema* s,
const time_t timestamp,
const int ntags,
const TAG tags[],
bool write_enable,
bool* have_transaction);
4824 printf(
"hs_connect [%s]!\n", connect_string);
4834 if (!connect_string || strlen(connect_string) < 1) {
4836 connect_string =
".";
4842 printf(
"hs_connect: connecting to SQL database \'%s\'\n",
fConnectString.c_str());
4854 printf(
"hs_disconnect!\n");
4868 printf(
"SqlHistory::new_event: event [%s], timestamp %s, ntags %d\n", event_name,
TimeToString(timestamp).c_str(), ntags);
4890 cm_msg(
MERROR,
"SqlHistory::new_event",
"Error: Cannot create schema for event \'%s\', see previous messages", event_name);
4905 cm_msg(
MERROR,
"SqlHistory::new_event",
"Error: Cannot update schema database for event \'%s\', see previous messages", event_name);
4910 printf(
"SqlHistory::new_event: schema for [%s] is %p\n", event_name, s);
4917 cm_msg(
MERROR,
"SqlHistory::new_event",
"Error: Cannot create schema for event \'%s\', see previous messages", event_name);
4928 cm_msg(
MERROR,
"SqlHistory::new_event",
"Error: Cannot update schema database for event \'%s\', see previous messages", event_name);
4933 printf(
"SqlHistory::new_event: schema for [%s] is %p\n", event_name, s);
4942 cm_msg(
MERROR,
"SqlHistory::new_event",
"Error: Cannot create schema for event \'%s\', see previous messages", event_name);
4959 printf(
"SqlHistory::read_schema: loading schema for event [%s] at time %s\n", event_name,
TimeToString(timestamp).c_str());
4963 if (sv->
size() == 0) {
4971 if (event_name == NULL)
4974 for (
size_t i=0;
i<sv->
size();
i++) {
4983 size_t nn = sv->
size();
4988 if (sv->
size() != nn)
5000 bool have_transaction =
false;
5004 if (have_transaction) {
5015 have_transaction =
false;
5026 printf(
"update_schema1\n");
5030 bool schema_ok =
true;
5033 for (
int i=0;
i<ntags;
i++) {
5034 for (
unsigned int j=0;
j<tags[
i].
n_data;
j++) {
5035 int tagtype = tags[
i].
type;
5036 std::string tagname = tags[
i].
name;
5039 if (tags[
i].n_data > 1) {
5041 sprintf(s,
"[%d]",
j);
5044 sprintf(s,
"_%d",
j);
5069 printf(
"Incompatible column!\n");
5086 printf(
"No column for tag %s!\n", tagname.c_str());
5088 bool found_column =
false;
5107 found_column =
true;
5118 if (!found_column && write_enable) {
5119 std::string col_name = maybe_colname;
5129 time_t now = time(NULL);
5132 for (
int t=0; t<20; t++) {
5135 if (dupe || retry) {
5136 col_name = maybe_colname;
5141 sprintf(s,
"_%d", t);
5147 printf(
"SqlHistory::update_schema: table [%s], add column [%s] type [%s] for tag [%s]\n", s->
fTableName.c_str(), col_name.c_str(), col_type, tagname.c_str());
5153 printf(
"SqlHistory::update_schema: table [%s], add column [%s] type [%s] for tag [%s] failed: duplicate column name\n", s->
fTableName.c_str(), col_name.c_str(), col_type, tagname.c_str());
5176 cm_msg(
MERROR,
"SqlHistory::update_schema",
"Duplicate tags or SQL columns for history event \"%s\" tag \"%s\"", s->
fEventName.c_str(), tagname.c_str());
5188 for (
int i=0;
i<ntags;
i++) {
5189 for (
unsigned int j=0;
j<tags[
i].
n_data;
j++) {
5190 std::string tagname = tags[
i].
name;
5192 if (tags[
i].n_data > 1) {
5194 sprintf(s,
"[%d]",
j);
5227 printf(
"Return error!\n");
5241 printf(
"ReadSqliteTableNames: table [%s]\n", table_name);
5247 cmd =
"SELECT event_name, _i_time FROM \'_event_name_";
5249 cmd +=
"\' WHERE table_name='";
5264 std::string xevent_name = sql->
GetText(0);
5265 time_t xevent_time = sql->
GetTime(1);
5286 printf(
"ReadSqliteTableSchema: table [%s]\n", table_name);
5311 fSql =
new Sqlite();
5318 int update_column(
const char* event_name,
const char* table_name,
const char* column_name,
const char* column_type,
const char* tag_name,
const char* tag_type,
const time_t timestamp,
bool active,
bool* have_transaction);
5326 printf(
"SqliteHistory::read_table_and_event_names!\n");
5330 std::vector<std::string> tables;
5335 for (
size_t i=0;
i<tables.size();
i++) {
5336 const char* table_name = tables[
i].c_str();
5339 s = strstr(table_name,
"_event_name_");
5340 if (s == table_name)
5342 s = strstr(table_name,
"_column_names_");
5343 if (s == table_name)
5355 printf(
"SqliteHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
5359 std::vector<std::string> columns;
5364 for (
size_t i=0;
i<sv->
size();
i++) {
5373 for (
size_t j=0;
j<columns.size();
j+=2) {
5374 const char* cn = columns[
j+0].c_str();
5375 const char* ct = columns[
j+1].c_str();
5377 if (strcmp(cn,
"_t_time") == 0)
5379 if (strcmp(cn,
"_i_time") == 0)
5411 tn +=
"_column_names_";
5415 cmd =
"SELECT column_name, tag_name, tag_type, _i_time FROM ";
5417 cmd +=
" WHERE table_name=";
5419 cmd +=
" ORDER BY _i_time ASC;";
5449 for (
size_t i=0;
i<sv->
size();
i++) {
5477 printf(
"SqliteHistory::create_table: event [%s], timestamp %s\n", event_name,
TimeToString(timestamp).c_str());
5480 bool have_transaction =
false;
5499 en +=
"_event_name_";
5502 cmd =
"CREATE TABLE ";
5504 cmd +=
" (table_name TEXT NOT NULL, event_name TEXT NOT NULL, _i_time INTEGER NOT NULL);";
5508 cmd =
"INSERT INTO ";
5510 cmd +=
" (table_name, event_name, _i_time) VALUES (";
5521 cn +=
"_column_names_";
5524 cmd =
"CREATE TABLE ";
5526 cmd +=
" (table_name TEXT NOT NULL, column_name TEXT NOT NULL, tag_name TEXT NOT NULL, tag_type TEXT NOT NULL, column_type TEXT NOT NULL, _i_time INTEGER NOT NULL);";
5538int SqliteHistory::update_column(
const char* event_name,
const char* table_name,
const char* column_name,
const char* column_type,
const char* tag_name,
const char* tag_type,
const time_t timestamp,
bool active,
bool* have_transaction)
5541 printf(
"SqliteHistory::update_column: event [%s], table [%s], column [%s], new name [%s], timestamp %s\n", event_name, table_name, column_name, tag_name,
TimeToString(timestamp).c_str());
5549 cmd =
"INSERT INTO \'_column_names_";
5551 cmd +=
"\' (table_name, column_name, tag_name, tag_type, column_type, _i_time) VALUES (\'";
5585 int update_column(
const char* event_name,
const char* table_name,
const char* column_name,
const char* column_type,
const char* tag_name,
const char* tag_type,
const time_t timestamp,
bool active,
bool* have_transaction);
5591 printf(
"ReadMysqlTableNames: table [%s], must have event [%s] table [%s]\n", table_name, must_have_event_name, must_have_table_name);
5597 cmd =
"SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name='";
5601 cmd =
"SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name!='';";
5602 table_name =
"_history_index";
5610 bool found_must_have_table =
false;
5619 const char* xevent_name = sql->
GetText(0);
5620 const char* xtable_name = sql->
GetText(1);
5621 time_t xevent_time = sql->
GetTime(2);
5624 printf(
"entry %d event name [%s] table name [%s] time %s\n",
count, xevent_name, xtable_name,
TimeToString(xevent_time).c_str());
5627 if (must_have_table_name && (strcmp(xtable_name, must_have_table_name) == 0)) {
5628 assert(must_have_event_name != NULL);
5630 found_must_have_table =
true;
5649 if (must_have_table_name && !found_must_have_table) {
5650 cm_msg(
MERROR,
"ReadMysqlTableNames",
"Error: Table [%s] for event [%s] missing from the history index\n", must_have_table_name, must_have_event_name);
5654 ReadMysqlTableNames(sql, sv, table_name, 999, must_have_event_name, must_have_table_name);
5655 cm_msg(
MERROR,
"ReadMysqlTableNames",
"Error: Cannot continue, nothing will work after this error\n");
5663 printf(
"ReadMysqlTableNames: table_name [%s] event_name [%s] table_name [%s]\n", table_name, must_have_event_name, must_have_table_name);
5673 printf(
"MysqlHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
5677 std::vector<std::string> columns;
5682 for (
size_t i=0;
i<sv->
size();
i++) {
5691 for (
size_t j=0;
j<columns.size();
j+=2) {
5692 const char* cn = columns[
j+0].c_str();
5693 const char* ct = columns[
j+1].c_str();
5695 if (strcmp(cn,
"_t_time") == 0)
5697 if (strcmp(cn,
"_i_time") == 0)
5731 cmd =
"SELECT column_name, column_type, tag_name, tag_type, itimestamp, active FROM _history_index WHERE event_name=";
5753 int iactive = atoi(active);
5761 if (strlen(col_name) < 1)
5769 for (
size_t i=0;
i<sv->
size();
i++) {
5813 printf(
"ReadMysqlTableSchema: table [%s]\n", table_name);
5835 printf(
"MysqlHistory::read_table_and_event_names!\n");
5839 std::vector<std::string> tables;
5844 for (
size_t i=0;
i<tables.size();
i++) {
5845 const char* table_name = tables[
i].c_str();
5848 s = strstr(table_name,
"_history_index");
5849 if (s == table_name)
5866 printf(
"read_table_and_event_names:\n");
5878 printf(
"MysqlHistory::create_table: event [%s], timestamp %s\n", event_name,
TimeToString(timestamp).c_str());
5884 if (table_name.length() > 40) {
5885 table_name.resize(40);
5889 time_t now = time(NULL);
5891 int max_attempts = 10;
5892 for (
int i=0;
i<max_attempts;
i++) {
5898 bool have_transaction =
true;
5900 std::string xtable_name = table_name;
5908 sprintf(buf,
"%d",
i);
5925 cm_msg(
MERROR,
"MysqlHistory::create_table",
"Could not create table [%s] for event [%s], timestamp %s, please fix the SQL database configuration and try again", table_name.c_str(), event_name,
TimeToString(timestamp).c_str());
5933 for (
int j=0;
j<2;
j++) {
5935 cmd +=
"INSERT INTO _history_index (event_name, table_name, itimestamp, active) VALUES (";
5941 sprintf(buf,
"%.0f", (
double)timestamp);
5971 cm_msg(
MERROR,
"MysqlHistory::create_table",
"Could not create table [%s] for event [%s], timestamp %s, after %d attempts", table_name.c_str(), event_name,
TimeToString(timestamp).c_str(), max_attempts);
5976int MysqlHistory::update_column(
const char* event_name,
const char* table_name,
const char* column_name,
const char* column_type,
const char* tag_name,
const char* tag_type,
const time_t timestamp,
bool active,
bool* have_transaction)
5979 printf(
"MysqlHistory::update_column: event [%s], table [%s], column [%s], type [%s] new name [%s], timestamp %s\n", event_name, table_name, column_name, column_type, tag_name,
TimeToString(timestamp).c_str());
5982 cmd +=
"INSERT INTO _history_index (event_name, table_name, tag_name, tag_type, column_name, column_type, itimestamp, active) VALUES (";
5996 sprintf(buf,
"%.0f", (
double)timestamp);
6021 Pgsql *fPgsql = NULL;
6024 fPgsql =
new Pgsql();
6031 int update_column(
const char* event_name,
const char* table_name,
const char* column_name,
const char* column_type,
const char* tag_name,
const char* tag_type,
const time_t timestamp,
bool active,
bool* have_transaction);
6034static int ReadPgsqlTableNames(
SqlBase* sql,
HsSchemaVector *sv,
const char* table_name,
int debug,
const char* must_have_event_name,
const char* must_have_table_name)
6037 printf(
"ReadPgsqlTableNames: table [%s], must have event [%s] table [%s]\n", table_name, must_have_event_name, must_have_table_name);
6043 cmd =
"SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name='";
6047 cmd =
"SELECT event_name, table_name, itimestamp FROM _history_index WHERE table_name!='';";
6048 table_name =
"_history_index";
6056 bool found_must_have_table =
false;
6065 const char* xevent_name = sql->
GetText(0);
6066 const char* xtable_name = sql->
GetText(1);
6067 time_t xevent_time = sql->
GetTime(2);
6070 printf(
"entry %d event name [%s] table name [%s] time %s\n",
count, xevent_name, xtable_name,
TimeToString(xevent_time).c_str());
6073 if (must_have_table_name && (strcmp(xtable_name, must_have_table_name) == 0)) {
6074 assert(must_have_event_name != NULL);
6076 found_must_have_table =
true;
6095 if (must_have_table_name && !found_must_have_table) {
6096 cm_msg(
MERROR,
"ReadPgsqlTableNames",
"Error: Table [%s] for event [%s] missing from the history index\n", must_have_table_name, must_have_event_name);
6100 ReadPgsqlTableNames(sql, sv, table_name, 999, must_have_event_name, must_have_table_name);
6101 cm_msg(
MERROR,
"ReadPgsqlTableNames",
"Error: Cannot continue, nothing will work after this error\n");
6109 printf(
"ReadPgsqlTableNames: table_name [%s] event_name [%s] table_name [%s]\n", table_name, must_have_event_name, must_have_table_name);
6116int PgsqlHistory::read_column_names(
HsSchemaVector *sv,
const char* table_name,
const char* event_name)
6119 printf(
"PgsqlHistory::read_column_names: table [%s], event [%s]\n", table_name, event_name);
6123 std::vector<std::string> columns;
6124 fSql->ListColumns(table_name, &columns);
6128 for (
size_t i=0;
i<sv->
size();
i++) {
6137 for (
size_t j=0;
j<columns.size();
j+=2) {
6138 const char* cn = columns[
j+0].c_str();
6139 const char* ct = columns[
j+1].c_str();
6141 if (strcmp(cn,
"_t_time") == 0)
6143 if (strcmp(cn,
"_i_time") == 0)
6175 cmd =
"SELECT column_name, column_type, tag_name, tag_type, itimestamp, active FROM _history_index WHERE event_name=";
6176 cmd += fSql->QuoteString(event_name);
6179 int status = fSql->Prepare(table_name, cmd.c_str());
6191 const char* col_name = fSql->GetText(0);
6192 const char* col_type = fSql->GetText(1);
6193 const char* tag_name = fSql->GetText(2);
6194 const char* tag_type = fSql->GetText(3);
6195 time_t schema_time = fSql->GetTime(4);
6196 const char* active = fSql->GetText(5);
6197 int iactive = atoi(active);
6205 if (strlen(col_name) < 1)
6212 for (
size_t i=0;
i<sv->
size();
i++) {
6247 status = fSql->Finalize();
6257 printf(
"PgsqlHistory::read_table_and_event_names!\n");
6261 std::vector<std::string> tables;
6262 status = fSql->ListTables(&tables);
6266 for (
size_t i=0;
i<tables.size();
i++) {
6267 const char* table_name = tables[
i].c_str();
6270 s = strstr(table_name,
"_history_index");
6271 if (s == table_name)
6288 printf(
"read_table_and_event_names:\n");
6292 status = ReadPgsqlTableNames(fSql, sv, NULL, fDebug, NULL, NULL);
6297int PgsqlHistory::create_table(
HsSchemaVector* sv,
const char* event_name, time_t timestamp)
6300 printf(
"PgsqlHistory::create_table: event [%s], timestamp %s\n", event_name,
TimeToString(timestamp).c_str());
6306 if (table_name.length() > 40) {
6307 table_name.resize(40);
6311 time_t now = time(NULL);
6313 int max_attempts = 10;
6314 for (
int i=0;
i<max_attempts;
i++) {
6315 status = fSql->OpenTransaction(table_name.c_str());
6320 bool have_transaction =
true;
6322 std::string xtable_name = table_name;
6330 sprintf(buf,
"%d",
i);
6335 if (fPgsql->fDownsample)
6344 fSql->RollbackTransaction(table_name.c_str());
6349 fSql->RollbackTransaction(table_name.c_str());
6353 fSql->Exec(table_name.c_str(),
"SAVEPOINT t0");
6355 for (
int j=0;
j<2;
j++) {
6357 cmd +=
"INSERT INTO _history_index (event_name, table_name, itimestamp, active) VALUES (";
6358 cmd += fSql->QuoteString(event_name);
6360 cmd += fSql->QuoteString(xtable_name.c_str());
6363 sprintf(buf,
"%.0f", (
double)timestamp);
6366 cmd += fSql->QuoteString(
"1");
6369 int status = fSql->Exec(table_name.c_str(), cmd.c_str());
6375 fSql->Exec(table_name.c_str(),
"ROLLBACK TO SAVEPOINT t0");
6378 status =
CreateSqlColumn(fSql,
"_history_index",
"event_name",
"text not null", &have_transaction, fDebug);
6384 status =
CreateSqlColumn(fSql,
"_history_index",
"itimestamp",
"integer not null", &have_transaction, fDebug);
6387 status = fSql->CommitTransaction(table_name.c_str());
6394 return ReadPgsqlTableNames(fSql, sv, xtable_name.c_str(), fDebug, event_name, xtable_name.c_str());
6397 cm_msg(
MERROR,
"PgsqlHistory::create_table",
"Could not create table [%s] for event [%s], timestamp %s, after %d attempts", table_name.c_str(), event_name,
TimeToString(timestamp).c_str(), max_attempts);
6402int PgsqlHistory::update_column(
const char* event_name,
const char* table_name,
const char* column_name,
const char* column_type,
const char* tag_name,
const char* tag_type,
const time_t timestamp,
bool active,
bool* have_transaction)
6405 printf(
"PgsqlHistory::update_column: event [%s], table [%s], column [%s], type [%s] new name [%s], timestamp %s\n", event_name, table_name, column_name, column_type, tag_name,
TimeToString(timestamp).c_str());
6408 cmd +=
"INSERT INTO _history_index (event_name, table_name, tag_name, tag_type, column_name, column_type, itimestamp, active) VALUES (";
6409 cmd += fSql->QuoteString(event_name);
6411 cmd += fSql->QuoteString(table_name);
6413 cmd += fSql->QuoteString(tag_name);
6415 cmd += fSql->QuoteString(tag_type);
6417 cmd += fSql->QuoteString(column_name);
6419 cmd += fSql->QuoteString(column_type);
6422 sprintf(buf,
"%.0f", (
double)timestamp);
6426 cmd += fSql->QuoteString(
"1");
6428 cmd += fSql->QuoteString(
"0");
6431 int status = fSql->Exec(table_name, cmd.c_str());
6474 int create_file(
const char* event_name, time_t timestamp,
const std::vector<HsSchemaEntry>& vars, std::string* filenamep);
6485 printf(
"hs_connect [%s]!\n", connect_string);
6490 fPath = connect_string;
6493 if (
fPath.length() > 0) {
6504 printf(
"FileHistory::hs_clear_cache!\n");
6512 printf(
"FileHistory::hs_disconnect!\n");
6535 struct stat stat_buf;
6538 cm_msg(
MERROR,
"FileHistory::read_file_list",
"Cannot stat(%s), errno %d (%s)",
fPath.c_str(), errno, strerror(errno));
6546 printf(
"FileHistory::read_file_list: history directory \"%s\" mtime %d did not change\n",
fPath.c_str(),
int(stat_buf.st_mtime));
6553 printf(
"FileHistory::read_file_list: reading list of history files in \"%s\"\n",
fPath.c_str());
6555 std::vector<std::string> flist;
6560 double ls_elapsed = ls_time - start_time;
6561 if (ls_elapsed > 5.000) {
6562 cm_msg(
MINFO,
"FileHistory::read_file_list",
"\"ls -l\" of \"%s\" took %.1f sec",
fPath.c_str(), ls_elapsed);
6567 std::sort(flist.rbegin(), flist.rend());
6571 printf(
"file names sorted by time:\n");
6572 for (
size_t i=0;
i<flist.size();
i++) {
6573 printf(
"%d: %s\n",
i, flist[
i].c_str());
6578 std::vector<bool> fread;
6579 fread.resize(flist.size());
6586 for (
size_t i=0;
i<flist.size();
i++) {
6607 printf(
"FileHistory::read_schema: event [%s] at time %s\n", event_name,
TimeToString(timestamp).c_str());
6609 if (sv->
size() == 0) {
6611 printf(
"FileHistory::read_schema: schema is empty, do a full reload from disk\n");
6616 DWORD old_timeout = 0;
6620 bool changed =
false;
6630 if ((*sv).find_event(event_name, timestamp)) {
6632 printf(
"FileHistory::read_schema: event [%s] at time %s, no new history files, already have this schema\n", event_name,
TimeToString(timestamp).c_str());
6677 double read_elapsed = end_time - start_time;
6678 if (read_elapsed > 5.000) {
6679 cm_msg(
MINFO,
"FileHistory::read_schema",
"Loading schema for event \"%s\" timestamp %s, reading %d history files took %.1f sec", event_name,
TimeToString(timestamp).c_str(), count_read, read_elapsed);
6690 for (
int i=0;
i<ntags;
i++) {
6695 e.tag_name = tags[
i].
name;
6702 variables.push_back(
e);
6709 printf(
"FileHistory::new_event: event [%s], timestamp %s, ntags %d\n", event_name,
TimeToString(timestamp).c_str(), ntags);
6725 bool xdebug =
false;
6733 printf(
"AAA: [%s] [%s]!\n", s->
fEventName.c_str(), event_name);
6741 printf(
"BBB: event [%s]: ntags: %zu -> %d!\n", event_name, s->
fVariables.size(), ntags);
6750 printf(
"CCC: event [%s] index %zu: name [%s] -> [%s]!\n", event_name,
i, s->
fVariables[
i].name.c_str(), tags[
i].
name);
6755 printf(
"DDD: event [%s] index %zu: type %d -> %d!\n", event_name,
i, s->
fVariables[
i].type, tags[
i].
type);
6760 printf(
"EEE: event [%s] index %zu: n_data %d -> %d!\n", event_name,
i, s->
fVariables[
i].n_data, tags[
i].
n_data);
6770 printf(
"*** Schema for event %s has changed!\n", event_name);
6772 printf(
"*** Old schema for event [%s] time %s:\n", event_name,
TimeToString(timestamp).c_str());
6774 printf(
"*** New tags:\n");
6779 printf(
"FileHistory::new_event: event [%s], timestamp %s, ntags %d: schema mismatch, starting a new file.\n", event_name,
TimeToString(timestamp).c_str(), ntags);
6791 printf(
"FileHistory::new_event: event [%s], timestamp %s, ntags %d: schema is too old, age %.1f months, starting a new file.\n", event_name,
TimeToString(timestamp).c_str(), ntags, (
double)age/(
double)
kMonth);
6804 printf(
"FileHistory::new_event: event [%s], timestamp %s, ntags %d: file too big, size %.1f MiBytes, max size %.1f MiBytes, starting a new file.\n", event_name,
TimeToString(timestamp).c_str(), ntags, size/
MiB,
fConfMaxFileSize/
MiB);
6812 std::string filename;
6814 std::vector<HsSchemaEntry> vars;
6824 cm_msg(
MERROR,
"FileHistory::new_event",
"Error: Cannot create schema for event \'%s\', see previous messages", event_name);
6833 cm_msg(
MERROR,
"FileHistory::new_event",
"Error: Cannot create schema for event \'%s\', see previous messages", event_name);
6838 printf(
"*** New schema for event [%s] time %s:\n", event_name,
TimeToString(timestamp).c_str());
6847 printf(
"schema for [%s] is %p\n", event_name, s);
6863 printf(
"FileHistory::create_file: event [%s]\n", event_name);
6870 localtime_r(×tamp, &
tm);
6873 strftime(buf,
sizeof(buf),
"%Y%m%d", &
tm);
6875 std::string filename;
6884 std::string try_filename = filename +
".dat";
6887 for (
int itry=0; itry<10; itry++) {
6890 sprintf(s,
"_%d", rand());
6891 try_filename = filename + s +
".dat";
6894 fp = fopen(try_filename.c_str(),
"r");
6901 fp = fopen(try_filename.c_str(),
"w");
6904 cm_msg(
MERROR,
"FileHistory::create_file",
"Error: Cannot create file \'%s\' for event \'%s\', fopen() errno %d (%s)", try_filename.c_str(), event_name, errno, strerror(errno));
6914 cm_msg(
MERROR,
"FileHistory::create_file",
"Error: Cannot create file \'%s\' for event \'%s\'", filename.c_str(), event_name);
6920 ss +=
"version: 2.0\n";
6921 ss +=
"event_name: ";
6930 ss +=
"tag: /DWORD 1 4 /timestamp\n";
6933 bool padded =
false;
6936 bool xdebug =
false;
6938 for (
size_t i=0;
i<vars.size();
i++) {
6940 int n_bytes = vars[
i].n_data*tsize;
6941 int xalign = (
offset % tsize);
6944 printf(
"tag %zu, tsize %d, n_bytes %d, xalign %d\n",
i, tsize, n_bytes, xalign);
6950 int pad_bytes = tsize - xalign;
6951 assert(pad_bytes > 0);
6965 recsize += pad_bytes;
6967 assert((
offset % tsize) == 0);
6968 fprintf(stderr,
"FIXME: need to debug padding!\n");
6987 ss +=
"record_size: ";
6992 int sslength = ss.length() + 127;
6995 int nb = (sslength + block - 1)/block;
6996 int data_offset = block * nb;
6998 ss +=
"data_offset: ";
7002 fprintf(
fp,
"%s", ss.c_str());
7007 printf(
"Schema in file %s has padding:\n", try_filename.c_str());
7008 printf(
"%s", ss.c_str());
7012 *filenamep = try_filename;
7020 printf(
"FileHistory::read_file_schema: file %s\n", filename);
7022 FILE*
fp = fopen(filename,
"r");
7024 cm_msg(
MERROR,
"FileHistory::read_file_schema",
"Cannot read \'%s\', fopen() errno %d (%s)", filename, errno, strerror(errno));
7041 size_t rd_recsize = 0;
7046 char* b = fgets(buf,
sizeof(buf),
fp);
7056 bb = strchr(b,
'\n');
7060 bb = strchr(b,
'\r');
7064 bb = strstr(b,
"version: 2.0");
7078 bb = strstr(b,
"event_name: ");
7084 bb = strstr(b,
"time: ");
7086 s->
fTimeFrom = strtoul(bb + 6, NULL, 10);
7099 bb = strstr(b,
"tag: ");
7102 const char* midas_type = bb;
7103 char* bbb = strchr(bb,
' ');
7107 if (midas_type[0] ==
'/') {
7112 cm_msg(
MERROR,
"FileHistory::read_file_schema",
"Unknown MIDAS data type \'%s\' in history file \'%s\'", midas_type, filename);
7123 t.
n_data = strtoul(bbb, &bbb, 10);
7127 t.
n_bytes = strtoul(bbb, &bbb, 10);
7134 if (midas_type[0] !=
'/') {
7145 bb = strstr(b,
"record_size: ");
7151 bb = strstr(b,
"data_offset: ");
7162 cm_msg(
MERROR,
"FileHistory::read_file_schema",
"Malformed history schema in \'%s\', maybe it is not a history file", filename);
7167 cm_msg(
MERROR,
"FileHistory::read_file_schema",
"Record size mismatch in history schema from \'%s\', file says %zu while total of all tags is %zu", filename, s->
fRecordSize, rd_recsize);
7174 cm_msg(
MERROR,
"FileHistory::read_file_schema",
"Could not read history schema from \'%s\', maybe it is not a history file", filename);
7207 std::string new_filename;
7226 *new_fs_copy = *new_fs;
7259 cm_msg(
MERROR,
"MakeMidasHistorySqlite",
"Error: Cannot initialize SQLITE history - this MIDAS was built without SQLITE support - HAVE_SQLITE is not defined");
7269 cm_msg(
MERROR,
"MakeMidasHistoryMysql",
"Error: Cannot initialize MySQL history - this MIDAS was built without MySQL support - HAVE_MYSQL is not defined");
7277 return new PgsqlHistory();
7279 cm_msg(
MERROR,
"MakeMidasHistoryPgsql",
"Error: Cannot initialize PgSQL history - this MIDAS was built without PostgreSQL support - HAVE_PGSQL is not defined");
HsSchema * maybe_reopen(const char *event_name, time_t timestamp, HsSchema *s)
HsFileSchema * read_file_schema(const char *filename)
std::vector< std::string > fSortedFiles
HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
std::vector< bool > fSortedRead
int hs_connect(const char *connect_string)
returns HS_SUCCESS
void tags_to_variables(int ntags, const TAG tags[], std::vector< HsSchemaEntry > &variables)
int create_file(const char *event_name, time_t timestamp, const std::vector< HsSchemaEntry > &vars, std::string *filenamep)
int hs_clear_cache()
clear internal cache, returns HS_SUCCESS
int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)
int read_file_list(bool *pchanged)
int hs_disconnect()
disconnect from history, returns HS_SUCCESS
void remove_inactive_columns()
int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])
void print(bool print_tags=true) const
int write_event(const time_t t, const char *data, const size_t data_size)
int read_last_written(const time_t timestamp, const int debug, time_t *last_written)
virtual void print(bool print_tags=true) const
std::vector< int > fOffsets
virtual void remove_inactive_columns()=0
std::vector< HsSchemaEntry > fVariables
virtual int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])=0
virtual int write_event(const time_t t, const char *data, const size_t data_size)=0
virtual int read_last_written(const time_t timestamp, const int debug, time_t *last_written)=0
virtual int flush_buffers()=0
virtual int match_event_var(const char *event_name, const char *var_name, const int var_index)
void print(bool print_tags=true) const
std::vector< HsSchema * > fData
HsSchema * find_event(const char *event_name, const time_t timestamp, int debug=0)
HsSchema * operator[](int index) const
int get_transaction_count()
int read_last_written(const time_t timestamp, const int debug, time_t *last_written)
int read_data(const time_t start_time, const time_t end_time, const int num_var, const std::vector< int > &var_schema_index, const int var_index[], const int debug, std::vector< time_t > &last_time, MidasHistoryBufferInterface *buffer[])
std::vector< std::string > fColumnNames
void print(bool print_tags=true) const
void remove_inactive_columns()
std::vector< std::string > fColumnTypes
int fTableTransactionCount
std::vector< bool > fColumnInactive
void increment_transaction_count()
int match_event_var(const char *event_name, const char *var_name, const int var_index)
static std::map< SqlBase *, int > gfTransactionCount
void reset_transaction_count()
int write_event(const time_t t, const char *data, const size_t data_size)
MidasHistoryBinnedBuffer(time_t first_time, time_t last_time, int num_bins)
~MidasHistoryBinnedBuffer()
void Add(time_t t, double v)
virtual void Add(time_t time, double value)=0
char type[NAME_LENGTH]
history channel name
int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)
int read_table_and_event_names(HsSchemaVector *sv)
int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)
int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)
void Add(time_t t, double v)
ReadBuffer(time_t first_time, time_t last_time, time_t interval)
void Realloc(int wantalloc)
int hs_get_tags(const char *event_name, time_t t, std::vector< TAG > *ptags)
get list of history variables for given event (use event names returned by hs_get_events()) that exis...
virtual int hs_connect(const char *connect_string)=0
returns HS_SUCCESS
virtual ~SchemaHistoryBase()
int hs_write_event(const char *event_name, time_t timestamp, int buffer_size, const char *buffer)
see hs_write_event(), returns HS_SUCCESS or HS_FILE_ERROR
int hs_read_buffer(time_t start_time, time_t end_time, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], MidasHistoryBufferInterface *buffer[], int hs_status[])
returns HS_SUCCESS
virtual HsSchema * maybe_reopen(const char *event_name, time_t timestamp, HsSchema *s)=0
int hs_define_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
see hs_define_event(), returns HS_SUCCESS or HS_FILE_ERROR
int hs_read_binned(time_t start_time, time_t end_time, int num_bins, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], int num_entries[], int *count_bins[], double *mean_bins[], double *rms_bins[], double *min_bins[], double *max_bins[], time_t *bins_first_time[], double *bins_first_value[], time_t *bins_last_time[], double *bins_last_value[], time_t last_time[], double last_value[], int st[])
returns HS_SUCCESS
HsSchemaVector fWriterSchema
virtual HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])=0
int hs_get_events(time_t t, std::vector< std::string > *pevents)
get list of events that exist(ed) at given time and later (value 0 means "return all events from begi...
std::vector< HsSchema * > fWriterEvents
int hs_get_last_written(time_t timestamp, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], time_t last_written[])
virtual int hs_set_debug(int debug)
set debug level, returns previous debug level
virtual int hs_disconnect()=0
disconnect from history, returns HS_SUCCESS
HsSchemaVector fReaderSchema
int hs_read(time_t start_time, time_t end_time, time_t interval, int num_var, const char *const event_name[], const char *const var_name[], const int var_index[], int num_entries[], time_t *time_buffer[], double *data_buffer[], int st[])
see hs_read(), returns HS_SUCCESS
int hs_clear_cache()
clear internal cache, returns HS_SUCCESS
std::string fConnectString
int hs_flush_buffers()
flush buffered data to storage where it is visible to mhttpd
virtual int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)=0
virtual int ListColumns(const char *table_name, std::vector< std::string > *plist)=0
virtual int Connect(const char *path)=0
virtual int ListColumns(const char *table, std::vector< std::string > *plist)=0
virtual double GetDouble(int column)=0
virtual int RollbackTransaction(const char *table_name)=0
virtual bool IsConnected()=0
virtual int CommitTransaction(const char *table_name)=0
virtual int ListTables(std::vector< std::string > *plist)=0
virtual std::string QuoteId(const char *s)=0
virtual int Disconnect()=0
virtual bool TypesCompatible(int midas_tid, const char *sql_type)=0
virtual int Prepare(const char *table_name, const char *sql)=0
virtual int Exec(const char *sql)=0
virtual int Connect(const char *dsn=0)=0
virtual int Exec(const char *table_name, const char *sql)=0
virtual std::string QuoteString(const char *s)=0
bool fTransactionPerTable
virtual time_t GetTime(int column)=0
virtual const char * GetText(int column)=0
virtual int ExecDisconnected(const char *table_name, const char *sql)=0
virtual int OpenTransaction(const char *table_name)=0
virtual const char * ColumnType(int midas_tid)=0
int update_schema1(HsSqlSchema *s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable, bool *have_transaction)
int hs_disconnect()
disconnect from history, returns HS_SUCCESS
int hs_set_debug(int debug)
set debug level, returns previous debug level
int read_schema(HsSchemaVector *sv, const char *event_name, const time_t timestamp)
HsSchema * maybe_reopen(const char *event_name, time_t timestamp, HsSchema *s)
virtual ~SqlHistoryBase()
virtual int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)=0
virtual int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)=0
virtual int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)=0
virtual int read_table_and_event_names(HsSchemaVector *sv)=0
int hs_connect(const char *connect_string)
returns HS_SUCCESS
HsSchema * new_event(const char *event_name, time_t timestamp, int ntags, const TAG tags[])
int update_schema(HsSqlSchema *s, const time_t timestamp, const int ntags, const TAG tags[], bool write_enable)
int update_column(const char *event_name, const char *table_name, const char *column_name, const char *column_type, const char *tag_name, const char *tag_type, const time_t timestamp, bool active, bool *have_transaction)
int read_column_names(HsSchemaVector *sv, const char *table_name, const char *event_name)
int create_table(HsSchemaVector *sv, const char *event_name, time_t timestamp)
int read_table_and_event_names(HsSchemaVector *sv)
INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout)
INT cm_set_watchdog_params(BOOL call_watchdog, DWORD timeout)
#define DB_NO_MORE_SUBKEYS
#define HS_UNDEFINED_EVENT
double ss_file_size(const char *path)
INT ss_file_find(const char *path, const char *pattern, char **plist)
INT cm_msg_flush_buffer()
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
const char * rpc_tid_name(INT id)
int rpc_name_tid(const char *name)
static std::string q(const char *s)
static const int tid_size[]
static const char * sql_type_mysql[]
static const char ** sql_type
static std::string MidasNameToSqlName(const char *s)
static int CreateSqlColumn(SqlBase *sql, const char *table_name, const char *column_name, const char *column_type, bool *have_transaction, int debug)
static int ReadSqliteTableNames(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug)
void DoctorPgsqlColumnType(std::string *col_type, const char *index_type)
static int ReadRecord(const char *file_name, int fd, off64_t offset, size_t recsize, off64_t irec, char *rec)
static int CreateSqlHyperTable(SqlBase *sql, const char *table_name, bool *have_transaction)
void DoctorSqlColumnType(std::string *col_type, const char *index_type)
static bool MatchTagName(const char *tag_name, int n_data, const char *var_tag_name, const int var_tag_index)
static int var_name_cmp(const std::string &v1, const char *v2)
static std::string TimeToString(time_t t)
MidasHistoryInterface * MakeMidasHistorySqlite()
static void PrintTags(int ntags, const TAG tags[])
static char * skip_spaces(char *s)
static bool MatchEventName(const char *event_name, const char *var_event_name)
static HsSqlSchema * NewSqlSchema(HsSchemaVector *sv, const char *table_name, time_t t)
static int StartSqlTransaction(SqlBase *sql, const char *table_name, bool *have_transaction)
MidasHistoryInterface * MakeMidasHistoryMysql()
static int ReadSqliteTableSchema(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug)
static int event_name_cmp(const std::string &e1, const char *e2)
MidasHistoryInterface * MakeMidasHistoryPgsql()
MidasHistoryInterface * MakeMidasHistoryFile()
static int ReadMysqlTableNames(SqlBase *sql, HsSchemaVector *sv, const char *table_name, int debug, const char *must_have_event_name, const char *must_have_table_name)
static std::string SmallIntToString(int i)
static int CreateSqlTable(SqlBase *sql, const char *table_name, bool *have_transaction, bool set_default_timestamp=false)
static std::string MidasNameToFileName(const char *s)
static int FindTime(const char *file_name, int fd, off64_t offset, size_t recsize, off64_t nrec, time_t timestamp, off64_t *i1p, time_t *t1p, off64_t *i2p, time_t *t2p, time_t *tstart, time_t *tend, int debug)
BOOL debug
debug printouts
char host_name[HOST_NAME_LENGTH]
#define DIR_SEPARATOR_STR
#define write(n, a, f, d)
struct callback_addr callback
BOOL match(char *pat, char *str)