16#include "git-revision.h"
22#include <sys/resource.h>
62#ifndef DOXYGEN_SHOULD_SKIP_THIS
161extern unsigned _stklen = 60000U;
274 "\"exptab\" file not found and MIDAS_DIR or MIDAS_EXPTAB environment variable is not defined"},
333 f = fopen(
"mem.txt",
"w");
351 memset(adr, 0, size *
count);
369 f = fopen(
"mem.txt",
"w");
381static std::vector<std::string>
split(
const char* sep,
const std::string& s)
383 unsigned sep_len = strlen(sep);
384 std::vector<std::string> v;
385 std::string::size_type pos = 0;
387 std::string::size_type next = s.find(sep, pos);
388 if (next == std::string::npos) {
389 v.push_back(s.substr(pos));
392 v.push_back(s.substr(pos, next-pos));
398static std::string
join(
const char* sep,
const std::vector<std::string>& v)
402 for (
unsigned i=0;
i<v.size();
i++) {
416 return s[s.length()-1] ==
c;
421 va_start(ap, format);
423 size_t size = vsnprintf(
nullptr, 0, format, ap1) + 1;
424 char *buffer = (
char *)malloc(size);
430 vsnprintf(buffer, size, format, ap);
433 std::string s(buffer);
476 return msprintf(
"unlisted status code %d", code);
524 if (pos != std::string::npos) {
536 for (
size_t i = 0;
i < flist.size();
i++) {
537 const char *p = flist[
i].c_str();
538 if (strchr(p,
'_') == NULL && !(p[0] >=
'0' && p[0] <=
'9')) {
539 size_t pos = flist[
i].rfind(
'.');
540 if (pos != std::string::npos) {
541 flist[
i].resize(pos);
543 list->push_back(flist[
i]);
552void cm_msg_get_logfile(
const char *
fac, time_t t, std::string* filename, std::string* linkname, std::string* linktarget) {
562 *filename = std::string(
fac) +
".log";
577 std::string facility;
583 std::string message_format;
585 if (message_format.find(
'%') != std::string::npos) {
592 localtime_r(&t, &tms);
596 strftime(
de + 1,
sizeof(
de)-1, strchr(message_format.c_str(),
'%'), &tms);
600 std::string message_dir;
602 if (message_dir.empty()) {
604 if (message_dir.empty()) {
606 if (message_dir.empty()) {
620 *filename = message_dir + facility + message_format +
".log";
621 if (!message_format.empty()) {
623 *linkname = message_dir + facility +
".log";
625 *linktarget = facility + message_format +
".log";
684 fprintf(stderr,
"cm_msg_log: Message \"%s\" not written to midas.log because rpc_call(RPC_CM_MSG_LOG) failed with status %d\n",
message,
status);
688 fprintf(stderr,
"cm_msg_log: Message \"%s\" not written to midas.log, no connection to mserver\n",
message);
694 std::string filename, linkname, linktarget;
699 if (!linkname.empty()) {
707 unlink(linkname.c_str());
708 status = symlink(linktarget.c_str(), linkname.c_str());
711 "cm_msg_log: Error: Cannot symlink message log file \'%s' to \'%s\', symlink() errno: %d (%s)\n",
712 linktarget.c_str(), linkname.c_str(), errno, strerror(errno));
717 int fh = open(filename.c_str(), O_WRONLY | O_CREAT | O_APPEND |
O_LARGEFILE, 0644);
720 "cm_msg_log: Message \"%s\" not written to midas.log because open(%s) failed with errno %d (%s)\n",
721 message, filename.c_str(), errno, strerror(errno));
729 localtime_r(&
tv.tv_sec, &tms);
732 strftime(
str,
sizeof(
str),
"%H:%M:%S", &tms);
733 sprintf(
str + strlen(
str),
".%03d ", (
int) (
tv.tv_usec / 1000));
734 strftime(
str + strlen(
str),
sizeof(
str),
"%G/%m/%d", &tms);
745 ssize_t len = msg.length();
748 ssize_t wr =
write(fh, msg.c_str(), len);
751 fprintf(stderr,
"cm_msg_log: Message \"%s\" not written to \"%s\", write() error, errno %d (%s)\n",
message, filename.c_str(), errno, strerror(errno));
752 }
else if (wr != len) {
753 fprintf(stderr,
"cm_msg_log: Message \"%s\" not written to \"%s\", short write() wrote %d instead of %d bytes\n",
message, filename.c_str(), (
int)wr, (
int)len);
764static std::string
cm_msg_format(
INT message_type,
const char *filename,
INT line,
const char *routine,
const char *format, va_list *argptr)
767 const char* pc = filename + strlen(filename);
768 while (*pc !=
'\\' && *pc !=
'/' && pc != filename)
774 std::string type_str;
783 if (message_type &
MT_LOG)
795 if (
name.length() > 0)
803 message +=
msprintf(
"[%s:%d:%s,%s] ", pc, line, routine, type_str.c_str());
804 }
else if (message_type ==
MT_USER) {
809 char* buf = (
char*)malloc(bufsize);
812 for (
int i=0;
i<10;
i++) {
814 va_copy(ap, *argptr);
817 int n = vsnprintf(buf, bufsize-1, format, ap);
829 buf = (
char*)realloc(buf, bufsize);
843 if (message_type !=
MT_LOG) {
846 size_t len = strlen(send_message);
848 char event[event_length];
851 memcpy(event +
sizeof(
EVENT_HEADER), send_message, len + 1);
885 for (
i = 0;
i < 100;
i++) {
930INT cm_msg(
INT message_type,
const char *filename,
INT line,
const char *routine,
const char *format, ...)
937 va_start(argptr, format);
946 if (message_type !=
MT_LOG) {
989 const char *facility,
const char *routine,
const char *format, ...) {
1001 va_start(argptr, format);
1080static void add_message(
char **messages,
int *length,
int *allocated, time_t tstamp,
const char *new_message) {
1081 int new_message_length = strlen(new_message);
1082 int new_allocated = 1024 + 2 * ((*allocated) + new_message_length);
1088 if (*length + new_message_length + 100 > *allocated) {
1089 *messages = (
char *) realloc(*messages, new_allocated);
1090 assert(*messages != NULL);
1091 *allocated = new_allocated;
1095 if ((*messages)[(*length) - 1] !=
'\n') {
1096 (*messages)[*length] =
'\n';
1100 sprintf(buf,
"%ld ", tstamp);
1101 buf_length = strlen(buf);
1102 memcpy(&((*messages)[*length]), buf, buf_length);
1103 (*length) += buf_length;
1105 memcpy(&((*messages)[*length]), new_message, new_message_length);
1106 (*length) += new_message_length;
1107 (*messages)[*length] = 0;
1111static int cm_msg_retrieve1(
const char *filename, time_t t,
INT n_messages,
char **messages,
int *length,
int *allocated,
1112 int *num_messages) {
1116 struct stat stat_buf;
1117 time_t tstamp, tstamp_valid, tstamp_last;
1123 fh = open(filename, O_RDONLY |
O_TEXT, 0644);
1125 cm_msg(
MERROR,
"cm_msg_retrieve1",
"Cannot open log file \"%s\", errno %d (%s)", filename, errno,
1131 fstat(fh, &stat_buf);
1132 ssize_t size = stat_buf.st_size;
1135 ssize_t maxsize = 10 * 1024 * 1024;
1136 if (size > maxsize) {
1137 lseek(fh, -maxsize, SEEK_END);
1142 char *buffer = (
char *) malloc(size + 1);
1144 if (buffer == NULL) {
1145 cm_msg(
MERROR,
"cm_msg_retrieve1",
"Cannot malloc %d bytes to read log file \"%s\", errno %d (%s)", (
int) size,
1146 filename, errno, strerror(errno));
1151 ssize_t rd =
read(fh, buffer, size);
1154 cm_msg(
MERROR,
"cm_msg_retrieve1",
"Cannot read %d bytes from log file \"%s\", read() returned %d, errno %d (%s)",
1155 (
int) size, filename, (
int) rd, errno, strerror(errno));
1163 p = buffer + size - 1;
1164 tstamp_last = tstamp_valid = 0;
1167 while (*p ==
'\n' || *p ==
'\r')
1171 for (
n = 0; !stop && p > buffer;) {
1175 for (
i = 0; p != buffer && (*p !=
'\n' && *p !=
'\r');
i++)
1179 if (
i >= (
int)
sizeof(
str))
1180 i =
sizeof(
str) - 1;
1186 memcpy(
str, p + 1,
i);
1188 if (strchr(
str,
'\n'))
1189 *strchr(
str,
'\n') = 0;
1190 if (strchr(
str,
'\r'))
1191 *strchr(
str,
'\r') = 0;
1192 mstrlcat(
str,
"\n",
sizeof(
str));
1199 localtime_r(&now, &tms);
1201 if (
str[0] >=
'0' &&
str[0] <=
'9') {
1203 tms.tm_hour = atoi(
str);
1204 tms.tm_min = atoi(
str + 3);
1205 tms.tm_sec = atoi(
str + 6);
1206 tms.tm_year = atoi(
str + 13) - 1900;
1207 tms.tm_mon = atoi(
str + 18) - 1;
1208 tms.tm_mday = atoi(
str + 21);
1211 tms.tm_hour = atoi(
str + 11);
1212 tms.tm_min = atoi(
str + 14);
1213 tms.tm_sec = atoi(
str + 17);
1214 tms.tm_year = atoi(
str + 20) - 1900;
1215 for (
i = 0;
i < 12;
i++)
1219 tms.tm_mday = atoi(
str + 8);
1223 tstamp_valid = tstamp;
1226 if (n_messages == 0) {
1227 if (tstamp_valid < t)
1232 if (n_messages != 0) {
1233 if (tstamp_last > 0 && tstamp_valid < tstamp_last)
1237 if (t == 0 || tstamp == -1 ||
1238 (n_messages > 0 && tstamp <= t) ||
1239 (n_messages == 0 && tstamp >= t)) {
1246 while (*p ==
'\n' || *p ==
'\r')
1249 if (n_messages == 1)
1251 else if (n_messages > 1) {
1253 if (
n == n_messages)
1254 tstamp_last = tstamp_valid;
1257 if (
n == n_messages && tstamp_valid == 0)
1280 std::string filename, linkname;
1292 if (!linkname.empty()) {
1294 filename = linkname;
1298 cm_msg_retrieve1(filename.c_str(), t, n_message, messages, &length, &allocated, &
n);
1304 if (linkname.empty()) {
1312 while (
n < n_message) {
1313 filedate -= 3600 * 24;
1320 cm_msg_retrieve1(filename.c_str(), t, n_message -
n, messages, &length, &allocated, &
i);
1351 char *messages = NULL;
1352 int num_messages = 0;
1360 mstrlcpy(
message, messages, buf_size);
1361 int len = strlen(messages);
1397 if (seconds != NULL) {
1500 return GIT_REVISION;
1514 assert(path[0] != 0);
1537 assert(path_size !=
sizeof(
char *));
1541 mstrlcpy(path,
_path_name.c_str(), path_size);
1561 assert(path != NULL);
1607#ifdef LOCAL_ROUTINES
1634 if (getenv(
"MIDAS_DIR")) {
1639 if (getenv(
"MIDAS_EXPT_NAME")) {
1640 e.
name = getenv(
"MIDAS_EXPT_NAME");
1643 cm_msg(
MERROR,
"cm_read_exptab",
"Experiments that use MIDAS_DIR must also set MIDAS_EXPT_NAME to the name of the experiment! Using experiment name \"%s\"",
e.name.c_str());
1646 e.directory = getenv(
"MIDAS_DIR");
1655#if defined (OS_WINNT)
1657 if (getenv(
"SystemRoot"))
1658 str = getenv(
"SystemRoot");
1659 else if (getenv(
"windir"))
1660 str = getenv(
"windir");
1664 std::string alt_str =
str;
1665 str +=
"\\system32\\exptab";
1666 alt_str +=
"\\system\\exptab";
1667#elif defined (OS_UNIX)
1668 std::string
str =
"/etc/exptab";
1669 std::string alt_str =
"/exptab";
1671 std::strint
str =
"exptab";
1672 std::string alt_str =
"exptab";
1676 if (getenv(
"MIDAS_EXPTAB")) {
1677 str = getenv(
"MIDAS_EXPTAB");
1678 alt_str = getenv(
"MIDAS_EXPTAB");
1684 FILE* f = fopen(
str.c_str(),
"r");
1686 f = fopen(alt_str.c_str(),
"r");
1695 memset(buf, 0,
sizeof(buf));
1696 char*
str = fgets(buf,
sizeof(buf)-1, f);
1699 if (
str[0] == 0)
continue;
1700 if (
str[0] ==
'#')
continue;
1708 while (*
str && isspace(*
str))
1714 while (*p2 && !isspace(*p2))
1717 ssize_t len = p2-p1;
1724 e.
name = std::string(p1, len);
1732 while (*
str && isspace(*
str))
1738 while (*p2 && !isspace(*p2))
1748 e.directory = std::string(p1, len);
1756 while (*
str && isspace(*
str))
1762 while (*p2 && !isspace(*p2))
1769 e.user = std::string(p1, len);
1783 for (
unsigned j=0;
j<exptab->
exptab.size();
j++) {
1784 cm_msg(
MINFO,
"cm_read_exptab",
"entry %d, experiment \"%s\", directory \"%s\", user \"%s\"",
j, exptab->
exptab[
j].name.c_str(), exptab->
exptab[
j].directory.c_str(), exptab->
exptab[
j].user.c_str());
1845int cm_get_exptab(
const char *expname,
char *dir,
int dir_size,
char *user,
int user_size) {
1846 std::string sdir, suser;
1850 mstrlcpy(dir, sdir.c_str(), dir_size);
1852 mstrlcpy(user, suser.c_str(), user_size);
1888#ifdef LOCAL_ROUTINES
1909 char *client_name,
INT hw_type,
const char *password,
DWORD watchdog_timeout) {
1912 host_name, client_name, hw_type, password, watchdog_timeout);
1914#ifdef LOCAL_ROUTINES
1919 BOOL call_watchdog, allow;
1941 if (!allow && strcmp(password,
pwd) != 0) {
1954 sprintf(
str,
"System/Clients/%0d", pid);
1964 strcpy(
name, client_name);
1965 strcpy(orig_name, client_name);
1988 sprintf(
name,
"%s%d", client_name, idx);
1995 sprintf(
str,
"System/Clients/%0d/Name", pid);
1999 cm_msg(
MERROR,
"cm_set_client_info",
"cannot set client name, db_set_value(%s) status %d",
str,
status);
2004 strcpy(client_name,
name);
2011 sprintf(
str,
"System/Clients/%0d", pid);
2040 size =
sizeof(watchdog_timeout);
2041 sprintf(
str,
"/Programs/%s/Watchdog Timeout", orig_name);
2045 sprintf(
str,
"/Programs/%s", orig_name);
2155 if (
host_name && getenv(
"MIDAS_SERVER_HOST"))
2156 mstrlcpy(
host_name, getenv(
"MIDAS_SERVER_HOST"), host_name_size);
2158 if (
exp_name && getenv(
"MIDAS_EXPT_NAME"))
2159 mstrlcpy(
exp_name, getenv(
"MIDAS_EXPT_NAME"), exp_name_size);
2170 if (
host_name && getenv(
"MIDAS_SERVER_HOST"))
2171 *
host_name = getenv(
"MIDAS_SERVER_HOST");
2173 if (
exp_name && getenv(
"MIDAS_EXPT_NAME"))
2174 *
exp_name = getenv(
"MIDAS_EXPT_NAME");
2179#ifdef LOCAL_ROUTINES
2183 std::string exp_name1;
2193 std::string expdir, expuser;
2203 cm_msg(
MERROR,
"cm_set_experiment_local",
"Experiment \"%s\" directory \"%s\" does not exist", exp_name1.c_str(), expdir.c_str());
2218 cm_msg(
MERROR,
"cm_check_connect",
"cm_disconnect_experiment not called at end of program");
2313 const char *client_name,
void (*func)(
char *),
INT odb_size,
DWORD watchdog_timeout) {
2341 if (WSAStartup(MAKEWORD(1, 1), &WSAData) != 0)
2346 std::string default_exp_name1;
2347 if (default_exp_name)
2348 default_exp_name1 = default_exp_name;
2352 if (default_exp_name1.length() == 0) {
2371#ifdef LOCAL_ROUTINES
2380 INT semaphore_elog, semaphore_alarm, semaphore_history, semaphore_msg;
2385 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create alarm semaphore");
2390 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create elog semaphore");
2395 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create history semaphore");
2400 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create message semaphore");
2419 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot open database, db_open_database() status %d",
status);
2427 size =
sizeof(odb_timeout);
2430 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot get ODB /Experiment/ODB timeout, status %d",
status);
2433 if (odb_timeout > 0) {
2438 size =
sizeof(protect_odb);
2441 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot get ODB /Experiment/Protect ODB, status %d",
status);
2449 size =
sizeof(enable_core_dumps);
2452 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot get ODB /Experiment/Enable core dumps, status %d",
status);
2455 if (enable_core_dumps) {
2457 struct rlimit limit;
2458 limit.rlim_cur = RLIM_INFINITY;
2459 limit.rlim_max = RLIM_INFINITY;
2460 status = setrlimit(RLIMIT_CORE, &limit);
2462 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot setrlimit(RLIMIT_CORE, RLIM_INFINITY), errno %d (%s)", errno,
2466#warning setrlimit(RLIMIT_CORE) is not available
2475 "cannot get ODB /Experiment/Security/Enable non-localhost RPC, status %d",
status);
2478 std::string local_host_name;
2482 local_host_name =
"localhost";
2487 if (watchdog_timeout == 0)
2490 strcpy(client_name1, client_name);
2527 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot open message buffer, cm_msg_open_buffer() status %d",
status);
2535 std::string current_name;
2537 if (current_name.length() == 0 || current_name ==
"Default") {
2551 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot register RPC server, cm_register_server() status %d",
status);
2559 size =
sizeof(watchdog_timeout);
2560 sprintf(
str,
"/Programs/%s/Watchdog Timeout", client_name);
2566 std::string path =
"/Programs/" + std::string(client_name);
2569 prog[
"Start command"] == std::string(
""))
2576 cm_msg(
MLOG,
"cm_connect_experiment",
"Program %s on host %s started", xclient_name.c_str(), local_host_name.c_str());
2593#ifdef LOCAL_ROUTINES
2602 assert(exp_names != NULL);
2634 assert(exp_names != NULL);
2638 mstrlcpy(hname,
host_name,
sizeof(hname));
2639 s = strchr(hname,
':');
2642 port = strtoul(s + 1, NULL, 0);
2650 cm_msg(
MERROR,
"cm_list_experiments_remote",
"Cannot connect to \"%s\" port %d: %s", hname, port, errmsg.c_str());
2655 send(sock,
"I", 2, 0);
2668 exp_names->push_back(
str);
2676#ifdef LOCAL_ROUTINES
2696 if (expts.size() == 1) {
2698 }
else if (expts.size() > 1) {
2699 printf(
"Available experiments on local computer:\n");
2701 for (
unsigned i = 0;
i < expts.size();
i++) {
2702 printf(
"%d : %s\n",
i, expts[
i].c_str());
2706 printf(
"Select number from 0 to %d: ", ((
int)expts.size())-1);
2709 int isel = atoi(
str);
2712 if (isel >= (
int)expts.size())
2745 if (expts.size() > 1) {
2746 printf(
"Available experiments on server %s:\n",
host_name);
2748 for (
unsigned i = 0;
i < expts.size();
i++) {
2749 printf(
"%d : %s\n",
i, expts[
i].c_str());
2753 printf(
"Select number from 0 to %d: ", ((
int)expts.size())-1);
2756 int isel = atoi(
str);
2759 if (isel >= (
int)expts.size())
2814 length =
sizeof(
INT);
2869 printf(
"Waiting for transition to finish...\n");
2881 std::string local_host_name;
2884 local_host_name =
"localhost";
2892 cm_msg(
MLOG,
"cm_disconnect_experiment",
"Program %s on host %s stopped", client_name.c_str(), local_host_name.c_str());
2970#ifndef DOXYGEN_SHOULD_SKIP_THIS
3031 if (hKeyClient != NULL)
3038 if (hKeyClient != NULL)
3045#ifndef DOXYGEN_SHOULD_SKIP_THIS
3069 if (semaphore_alarm)
3073 if (semaphore_history)
3078 *semaphore_msg = -1;
3086#ifdef LOCAL_ROUTINES
3102 assert(pbuf != NULL);
3106 printf(
"lock_buffer_guard(%s) ctor without lock\n",
fBuf->
buffer_name);
3125 printf(
"lock_buffer_guard(invalid) dtor\n");
3127 assert(
fBuf != NULL);
3145 assert(
fBuf != NULL);
3156 assert(
fBuf != NULL);
3173 assert(
fBuf != NULL);
3261#ifdef LOCAL_ROUTINES
3264 std::vector<BUFFER*> mybuffers;
3271 for (
BUFFER* pbuf : mybuffers) {
3273 if (!pbuf || !pbuf->attached)
3342 *call_watchdog =
FALSE;
3363#ifdef LOCAL_ROUTINES
3372#ifndef DOXYGEN_SHOULD_SKIP_THIS
3395 str = (
char *) malloc(max_size);
3399 int size = max_size;
3404 if (strlen(
str) < 1)
3416 int new_size = last + 10;
3420 "Cannot resize the RPC hosts access control list, db_set_num_values(%d) status %d", new_size,
status);
3433 strcpy(buf,
"localhost");
3439 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot create the RPC hosts access control list, db_get_value() status %d",
3449 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot create \"Disable RPC hosts check\", db_get_value() status %d",
status);
3459 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot find the RPC hosts access control list, db_find_key() status %d",
3469 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot watch the RPC hosts access control list, db_watch() status %d",
status);
3504 size =
sizeof(
name);
3508 cm_msg(
MERROR,
"cm_register_server",
"cannot get client name, db_get_value() status %d",
status);
3512 mstrlcpy(
str,
"/Experiment/Security/RPC ports/",
sizeof(
str));
3515 size =
sizeof(port);
3519 cm_msg(
MERROR,
"cm_register_server",
"cannot get RPC port number, db_get_value(%s) status %d",
str,
status);
3527 cm_msg(
MERROR,
"cm_register_server",
"error, rpc_register_server(port=%d) status %d", port,
status);
3540 cm_msg(
MERROR,
"cm_register_server",
"error, db_find_key(\"Server Port\") status %d",
status);
3550 cm_msg(
MERROR,
"cm_register_server",
"error, db_set_data(\"Server Port\"=%d) status %d", port,
status);
3771 }
else if (
count > 1) {
3818 "Cannot set client run state, client hKey %d into /System/Clients is not valid, maybe this client was removed by a watchdog timeout",
3839#ifndef DOXYGEN_SHOULD_SKIP_THIS
3862 char tr_key_name[256];
3894 cm_msg(
MERROR,
"cm_register_deferred_transition",
"Cannot hotlink /Runinfo/Requested Transition");
3929 cm_msg(
MERROR,
"cm_check_deferred_transition",
"Cannot perform deferred transition: %s",
str);
3945#ifndef DOXYGEN_SHOULD_SKIP_THIS
3991 printf(
", wait for:");
3999static bool tr_compare(
const std::unique_ptr<TrClient>& arg1,
const std::unique_ptr<TrClient>& arg2) {
4000 return arg1->sequence_number < arg2->sequence_number;
4030 const char *buf =
"Success";
4034 sprintf(buf,
"status %d",
status);
4108 const char *args[100];
4110 char debug_arg[256];
4111 char start_arg[256];
4113 std::string mserver_hostname;
4119 const char *midassys = getenv(
"MIDASSYS");
4126 path +=
"mtransition";
4128 args[iarg++] = path.c_str();
4133 args[iarg++] =
"-h";
4134 args[iarg++] = mserver_hostname.c_str();
4141 args[iarg++] =
"-e";
4146 args[iarg++] =
"-d";
4148 sprintf(debug_arg,
"%d", debug_flag);
4149 args[iarg++] = debug_arg;
4153 args[iarg++] =
"STOP";
4155 args[iarg++] =
"PAUSE";
4157 args[iarg++] =
"RESUME";
4159 args[iarg++] =
"START";
4162 args[iarg++] = start_arg;
4165 args[iarg++] = NULL;
4168 for (iarg = 0; args[iarg] != NULL; iarg++) {
4169 printf(
"arg[%d] [%s]\n", iarg, args[iarg]);
4176 if (errstr != NULL) {
4177 sprintf(errstr,
"Cannot execute mtransition, ss_spawnv() returned %d",
status);
4192 int connect_timeout = 10000;
4193 int timeout = 120000;
4221 assert(wait_for_index >= 0);
4222 assert(wait_for_index < (
int)s->
clients.size());
4244 if (wait_for == NULL)
4251 printf(
"Client \"%s\" waits for client \"%s\"\n", tr_client->
client_name.c_str(), wait_for->
client_name.c_str());
4258 cm_msg(
MERROR,
"cm_transition_call",
"Client \"%s\" transition %d aborted while waiting for client \"%s\": \"/Runinfo/Transition in progress\" was cleared", tr_client->
client_name.c_str(), tr_client->
transition, wait_for->
client_name.c_str());
4274 printf(
"Connecting to client \"%s\" on host %s...\n", tr_client->
client_name.c_str(), tr_client->
host_name.c_str());
4276 cm_msg(
MINFO,
"cm_transition_call",
"cm_transition_call: Connecting to client \"%s\" on host %s...", tr_client->
client_name.c_str(), tr_client->
host_name.c_str());
4279 size =
sizeof(timeout);
4282 if (connect_timeout < 1000)
4283 connect_timeout = 1000;
4286 size =
sizeof(timeout);
4311 "cannot connect to client \"%s\" on host %s, port %d, status %d",
4335 printf(
"Connection established to client \"%s\" on host %s\n", tr_client->
client_name.c_str(), tr_client->
host_name.c_str());
4338 "cm_transition: Connection established to client \"%s\" on host %s",
4350 printf(
"Executing RPC transition client \"%s\" on host %s...\n",
4354 "cm_transition: Executing RPC transition client \"%s\" on host %s...",
4382 printf(
"RPC transition finished client \"%s\" on host \"%s\" in %d ms with status %d\n",
4386 "cm_transition: RPC transition finished client \"%s\" on host \"%s\" in %d ms with status %d",
4406 printf(
"hconn %d cm_transition_call(%s) finished init %d connect %d end %d rpc %d end %d xxx %d end %d\n",
4453 for (
size_t i = 0;
i <
n;
i++) {
4461 printf(
"Calling local transition callback\n");
4463 cm_msg(
MINFO,
"cm_transition_call_direct",
"cm_transition: Calling local transition callback");
4479 printf(
"Local transition callback finished, status %d\n",
int(tr_client->
status));
4481 cm_msg(
MINFO,
"cm_transition_call_direct",
"cm_transition: Local transition callback finished, status %d",
int(tr_client->
status));
4489 return tr_client->
status;
4551 char tr_key_name[256];
4561 errstr_size =
sizeof(xerrstr);
4577 mstrlcpy(errstr,
"Invalid transition request", errstr_size);
4588 sprintf(errstr,
"Start/Stop transition %d already in progress, please try again later\n",
i);
4589 mstrlcat(errstr,
"or set \"/Runinfo/Transition in progress\" manually to zero.\n", errstr_size);
4591 cm_msg(
MERROR,
"cm_transition",
"another transition is already in progress");
4644 cm_msg(
MERROR,
"cm_transition",
"Run start abort due to alarms: %s", alarms.c_str());
4645 mstrlcpy(errstr,
"Cannot start run due to alarms: ", errstr_size);
4646 mstrlcat(errstr, alarms.c_str(), errstr_size);
4657 HNDLE hkeyroot, hkey;
4674 size =
sizeof(program_info_required);
4677 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info required, status %d",
status);
4681 if (program_info_required) {
4686 cm_msg(
MERROR,
"cm_transition",
"Run start abort due to program \"%s\" not running",
key.
name);
4687 std::string serrstr =
msprintf(
"Run start abort due to program \"%s\" not running",
key.
name);
4688 mstrlcpy(errstr, serrstr.c_str(), errstr_size);
4702 mstrlcpy(errstr,
"Unknown error", errstr_size);
4704 if (debug_flag == 0) {
4731 if (debug_flag == 1)
4732 printf(
"Setting run number %d in ODB\n",
run_number);
4733 if (debug_flag == 2)
4738 cm_msg(
MERROR,
"cm_transition",
"cannot set Runinfo/Run number in database, status %d",
status);
4744 if (debug_flag == 1)
4745 printf(
"Clearing /Runinfo/Requested transition\n");
4746 if (debug_flag == 2)
4747 cm_msg(
MINFO,
"cm_transition",
"cm_transition: Clearing /Runinfo/Requested transition");
4755 cm_msg(
MERROR,
"cm_transition",
"cannot find System/Clients entry in database");
4757 mstrlcpy(errstr,
"Cannot find /System/Clients in ODB", errstr_size);
4766 mstrlcpy(errstr,
"Deferred transition already in progress", errstr_size);
4767 mstrlcat(errstr,
", to cancel, set \"/Runinfo/Requested transition\" to zero", errstr_size);
4774 sprintf(tr_key_name,
"Transition %s DEFERRED", trname.c_str());
4783 size =
sizeof(sequence_number);
4792 if (debug_flag == 1)
4793 printf(
"---- Transition %s deferred by client \"%s\" ----\n", trname.c_str(),
str);
4794 if (debug_flag == 2)
4795 cm_msg(
MINFO,
"cm_transition",
"cm_transition: ---- Transition %s deferred by client \"%s\" ----", trname.c_str(),
str);
4797 if (debug_flag == 1)
4798 printf(
"Setting /Runinfo/Requested transition\n");
4799 if (debug_flag == 2)
4800 cm_msg(
MINFO,
"cm_transition",
"cm_transition: Setting /Runinfo/Requested transition");
4811 sprintf(errstr,
"Transition %s deferred by client \"%s\"", trname.c_str(),
str);
4842 size =
sizeof(program_info_auto_start);
4845 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info auto start, status %d",
status);
4849 if (program_info_auto_start) {
4851 start_command[0] = 0;
4853 size =
sizeof(start_command);
4856 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info start command, status %d",
status);
4860 if (start_command[0]) {
4861 cm_msg(
MINFO,
"cm_transition",
"Auto Starting program \"%s\", command \"%s\"",
key.
name,
4896 size =
sizeof(
state);
4902 cm_msg(
MERROR,
"cm_transition",
"cannot get Runinfo/State in database");
4909 cm_msg(
MERROR,
"cm_transition",
"cannot set \"Runinfo/Stop Time binary\" in database");
4916 cm_msg(
MERROR,
"cm_transition",
"cannot set \"Runinfo/Stop Time\" in database");
4922 cm_msg(
MERROR,
"cm_transition",
"cannot find System/Clients entry in database");
4924 mstrlcpy(errstr,
"Cannot find /System/Clients in ODB", errstr_size);
4954 if (debug_flag == 1)
4955 printf(
"---- Transition %s started ----\n", trname.c_str());
4956 if (debug_flag == 2)
4957 cm_msg(
MINFO,
"cm_transition",
"cm_transition: ---- Transition %s started ----", trname.c_str());
4959 sprintf(tr_key_name,
"Transition %s", trname.c_str());
4963 for (
int i = 0,
status = 0;;
i++) {
4980 size =
sizeof(sequence_number);
4989 c->async_flag = async_flag;
4990 c->debug_flag = debug_flag;
4991 c->sequence_number = sequence_number;
4993 c->key_name = subkey.
name;
4997 size =
sizeof(client_name);
4999 c->client_name = client_name;
5012 size =
sizeof(port);
5024 if (cc->
port ==
c->port)
5030 s.
clients.push_back(std::unique_ptr<TrClient>(
c));
5033 cm_msg(
MERROR,
"cm_transition",
"transition %s: client \"%s\" is registered with sequence number %d more than once", trname.c_str(),
c->client_name.c_str(),
c->sequence_number);
5045 for (
size_t idx = 0; idx < s.
clients.size(); idx++) {
5046 if (s.
clients[idx]->sequence_number == 0) {
5051 for (
size_t i = idx - 1; ;
i--) {
5052 if (s.
clients[
i]->sequence_number < s.
clients[idx]->sequence_number) {
5053 if (s.
clients[
i]->sequence_number > 0) {
5054 s.
clients[idx]->wait_for_index.push_back(
i);
5064 for (
size_t idx = 0; idx < s.
clients.size(); idx++) {
5069 for (
size_t idx = 0; idx < s.
clients.size(); idx++) {
5070 printf(
"TrClient[%d]: ",
int(idx));
5078 for (
size_t idx = 0; idx < s.
clients.size(); idx++) {
5079 if (debug_flag == 1)
5080 printf(
"\n==== Found client \"%s\" with sequence number %d\n",
5081 s.
clients[idx]->client_name.c_str(), s.
clients[idx]->sequence_number);
5082 if (debug_flag == 2)
5084 "cm_transition: ==== Found client \"%s\" with sequence number %d",
5085 s.
clients[idx]->client_name.c_str(), s.
clients[idx]->sequence_number);
5089 assert(s.
clients[idx]->thread == NULL);
5092 if (s.
clients[idx]->port == 0) {
5102 cm_msg(
MERROR,
"cm_transition",
"transition %s aborted: client \"%s\" returned status %d", trname.c_str(),
5103 s.
clients[idx]->client_name.c_str(),
int(s.
clients[idx]->status));
5113 for (
size_t idx = 0; idx < s.
clients.size(); idx++) {
5116 s.
clients[idx]->thread->join();
5117 delete s.
clients[idx]->thread;
5118 s.
clients[idx]->thread = NULL;
5129 cm_msg(
MERROR,
"cm_transition",
"transition %s aborted: \"/Runinfo/Transition in progress\" was cleared", trname.c_str());
5132 mstrlcpy(errstr,
"Canceled", errstr_size);
5138 for (
size_t idx = 0; idx < s.
clients.size(); idx++)
5142 mstrlcpy(errstr, s.
clients[idx]->errorstr.c_str(), errstr_size);
5157 if (debug_flag == 1)
5158 printf(
"\n---- Transition %s finished ----\n", trname.c_str());
5159 if (debug_flag == 2)
5160 cm_msg(
MINFO,
"cm_transition",
"cm_transition: ---- Transition %s finished ----", trname.c_str());
5175 size =
sizeof(
state);
5178 cm_msg(
MERROR,
"cm_transition",
"cannot set Runinfo/State in database, db_set_value() status %d",
status);
5226 size =
sizeof(program_info_auto_stop);
5229 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info auto stop, status %d",
status);
5233 if (program_info_auto_stop) {
5247 mstrlcpy(errstr,
"Success", errstr_size);
5261 cm_msg(
MERROR,
"cm_transition",
"Could not start a run: cm_transition() status %d, message \'%s\'",
status,
5305 int sflag = async_flag &
TR_SYNC;
5310 cm_msg(
MERROR,
"cm_transition",
"previous transition did not finish yet");
5325 mstrlcpy(errstr,
"Invalid transition request", errstr_size);
5333 int size =
sizeof(
i);
5337 sprintf(errstr,
"Start/Stop transition %d already in progress, please try again later\n",
i);
5338 mstrlcat(errstr,
"or set \"/Runinfo/Transition in progress\" manually to zero.\n", errstr_size);
5340 cm_msg(
MERROR,
"cm_transition",
"another transition is already in progress");
5399#ifndef DOXYGEN_SHOULD_SKIP_THIS
5427 if (client_socket) {
5441 if (strchr(
str,
' '))
5442 *strchr(
str,
' ') = 0;
5460 printf(
"Received 2nd Ctrl-C, hard abort\n");
5463 printf(
"Received Ctrl-C, aborting...\n");
5523 std::string command;
5528 cm_msg(
MERROR,
"cm_exec_script",
"Script ODB \"%s\" of type TID_STRING, db_get_value_string() error %d",
5529 odb_path_to_script,
status);
5533 for (
int i = 0;;
i++) {
5545 cm_msg(
MERROR,
"cm_exec_script",
"Script ODB \"%s/%s\" should not be TID_KEY", odb_path_to_script,
5550 char *buf = (
char *) malloc(size);
5551 assert(buf != NULL);
5554 cm_msg(
MERROR,
"cm_exec_script",
"Script ODB \"%s/%s\" of type %d, db_get_data() error %d",
5568 cm_msg(
MERROR,
"cm_exec_script",
"Script ODB \"%s\" has invalid type %d, should be TID_STRING or TID_KEY",
5569 odb_path_to_script,
key.
type);
5575 if (command.length() > 0) {
5576 cm_msg(
MINFO,
"cm_exec_script",
"Executing script \"%s\" from ODB \"%s\"", command.c_str(), odb_path_to_script);
5597 static DWORD alarm_last_checked_sec = 0;
5601 static DWORD last_millitime = 0;
5602 DWORD tdiff_millitime = now_millitime - last_millitime;
5603 const DWORD kPeriod = 1000;
5604 if (last_millitime == 0) {
5605 last_millitime = now_millitime;
5606 tdiff_millitime = kPeriod;
5616 if (now_sec - alarm_last_checked_sec > 10) {
5618 alarm_last_checked_sec = now_sec;
5623 if (tdiff_millitime >= kPeriod) {
5625 if (tdiff_millitime > 60000)
5626 wrong_interval =
TRUE;
5630 bm_cleanup(
"cm_periodic_tasks", now_millitime, wrong_interval);
5631 db_cleanup(
"cm_periodic_tasks", now_millitime, wrong_interval);
5635 last_millitime = now_millitime;
5745 static int check_cm_execute = 1;
5746 static int enable_cm_execute = 0;
5751 if (check_cm_execute) {
5755 check_cm_execute = 0;
5760 size =
sizeof(enable_cm_execute);
5767 if (!enable_cm_execute) {
5769 mstrlcpy(buf, command,
sizeof(buf));
5770 cm_msg(
MERROR,
"cm_execute",
"cm_execute(%s...) is disabled by ODB \"/Experiment/Enable cm_execute\"", buf);
5775 strcpy(
str, command);
5781 fh = open(
str, O_RDONLY, 0644);
5784 n =
read(fh, result, bufsize - 1);
5785 result[
MAX(0,
n)] = 0;
5790 status = system(command);
5804#ifndef DOXYGEN_SHOULD_SKIP_THIS
5840 sprintf(
str,
"RPC/%d",
id);
5868 if (history_channel && (strlen(history_channel) > 0)) {
5870 p +=
"/Logger/History/";
5871 p += history_channel;
5872 p +=
"/History dir";
5933#ifdef LOCAL_ROUTINES
5943 bool badindex =
false;
5944 bool badclient =
false;
5954 if (pclient->
name[0] == 0)
5969 printf(
"bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, badindex %d, pid=%d\n",
5972 }
else if (badclient) {
5973 printf(
"bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, badclient %d\n",
5978 printf(
"bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, goodclient\n",
5985 if (badindex || badclient) {
5986 static int prevent_recursion = 1;
5988 if (prevent_recursion) {
5989 prevent_recursion = 0;
5997 cm_msg(
MERROR,
"bm_validate_client_index",
"Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...");
6006 fprintf(stderr,
"bm_validate_client_index: Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...\n");
6046#ifdef LOCAL_ROUTINES
6072 pbctmp = pheader->
client;
6089 pbclient = pheader->
client;
6093 if (pbclient->
pid) {
6096 "Client \'%s\' on buffer \'%s\' removed by %s because process pid %d does not exist", pbclient->
name,
6097 pheader->
name, who, pbclient->
pid);
6108 printf(
"buffer [%s] client [%-32s] times 0x%08x 0x%08x, diff 0x%08x %5d, timeout %d\n",
6120 cm_msg(
MINFO,
"bm_cleanup",
"Client \'%s\' on buffer \'%s\' removed by %s (idle %1.1lfs, timeout %1.0lfs)",
6121 pbclient->
name, pheader->
name, who,
6137 std::vector<BUFFER*> mybuffers;
6143 for (
BUFFER* pbuf : mybuffers) {
6146 if (pbuf->attached) {
6156 if (pclient->
pid == pid) {
6171#ifdef LOCAL_ROUTINES
6175 std::vector<BUFFER*> mybuffers;
6182 for (
BUFFER* pbuf : mybuffers) {
6185 if (pbuf->attached) {
6197 if (!wrong_interval)
6204#ifdef LOCAL_ROUTINES
6207 if (rp < 0 || rp > pheader->
size) {
6209 "error: buffer \"%s\" is corrupted: rp %d is invalid. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6222 "error: buffer \"%s\" is corrupted: rp %d plus event header point beyond the end of buffer by %d bytes. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6237static FILE* gRpLog = NULL;
6243 if (gRpLog == NULL) {
6244 gRpLog = fopen(
"rp.log",
"a");
6246 if (gRpLog && (total_size < 16)) {
6247 const char *pdata = (
const char *) (pheader + 1);
6248 const DWORD *pevent = (
const DWORD*) (pdata + rp);
6249 fprintf(gRpLog,
"%s: rp %d, total_size %d, at rp 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x\n", pheader->
name, rp, total_size,
6250 pevent[0], pevent[1], pevent[2], pevent[3], pevent[4], pevent[5]);
6256 assert(total_size > 0);
6260 if (rp >= pheader->
size) {
6261 rp -= pheader->
size;
6278 if (pevent->
data_size <= 0 || total_size <= 0 || total_size > pheader->
size) {
6280 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6294 if (rp < pheader->write_pointer) {
6297 remaining = pheader->
size - rp;
6303 if (total_size > remaining) {
6305 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, remaining %d, called from %s",
6326 const char *pdata = (
const char *) (pheader + 1);
6336 "buffer \"%s\" is corrupted: invalid read pointer %d. Size %d, write pointer %d", pheader->
name,
6343 "buffer \"%s\" is corrupted: invalid write pointer %d. Size %d, read pointer %d", pheader->
name,
6349 cm_msg(
MERROR,
"bm_validate_buffer",
"buffer \"%s\" is corrupted: read pointer %d is invalid", pheader->
name,
6358 cm_msg(
MERROR,
"bm_validate_buffer",
"buffer \"%s\" is corrupted: invalid rp %d, last good event at rp %d",
6359 pheader->
name, rp, rp0);
6363 int rp1 =
bm_next_rp(
"bm_validate_buffer_locked", pheader, pdata, rp);
6366 "buffer \"%s\" is corrupted: invalid event at rp %d, last good event at rp %d", pheader->
name, rp, rp0);
6385 get_all = (get_all || xget_all);
6389 int rp =
c->read_pointer;
6393 int rp1 =
bm_next_rp(
"bm_validate_buffer_locked", pheader, pdata, rp);
6396 "buffer \"%s\" is corrupted for client \"%s\" rp %d: invalid event at rp %d, last good event at rp %d",
6397 pheader->
name,
c->name,
c->read_pointer, rp, rp0);
6498 double buf_size = pheader->
size;
6502 double buf_fill = 0;
6503 double buf_cptr = 0;
6504 double buf_cused = 0;
6505 double buf_cused_pct = 0;
6507 if (client_index >= 0 && client_index <= pheader->max_client_index) {
6510 if (buf_wptr == buf_cptr) {
6512 }
else if (buf_wptr > buf_cptr) {
6513 buf_cused = buf_wptr - buf_cptr;
6515 buf_cused = (buf_size - buf_cptr) + buf_wptr;
6518 buf_cused_pct = buf_cused / buf_size * 100.0;
6527 if (buf_wptr == buf_rptr) {
6529 }
else if (buf_wptr > buf_rptr) {
6530 buf_fill = buf_wptr - buf_rptr;
6532 buf_fill = (buf_size - buf_rptr) + buf_wptr;
6535 double buf_fill_pct = buf_fill / buf_size * 100.0;
6585 sprintf(
str,
"writes_blocked_by/%s/count_write_wait", pheader->
client[
i].
name);
6588 sprintf(
str,
"writes_blocked_by/%s/time_write_wait", pheader->
client[
i].
name);
6614 if ((strlen(
buffer_name.c_str()) < 1) || (strlen(client_name.c_str()) < 1)) {
6617 cm_msg(
MERROR,
"bm_write_buffer_statistics_to_odb",
"Invalid empty buffer name \"%s\" or client name \"%s\"",
buffer_name.c_str(), client_name.c_str());
6634 size_t sbuffer_handle = buffer_handle;
6642 if (buffer_handle >=1 && sbuffer_handle <= nbuf) {
6648 if (sbuffer_handle > nbuf || buffer_handle <= 0) {
6650 cm_msg(
MERROR, who,
"invalid buffer handle %d: out of range [1..%d]", buffer_handle, (
int)nbuf);
6658 cm_msg(
MERROR, who,
"invalid buffer handle %d: empty slot", buffer_handle);
6666 cm_msg(
MERROR, who,
"invalid buffer handle %d: not attached", buffer_handle);
6742 int size =
sizeof(
INT);
6746 cm_msg(
MERROR,
"bm_open_buffer",
"Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6753#ifdef LOCAL_ROUTINES
6758 const int max_buffer_size = 2 * 1000 * 1024 * 1024;
6763 cm_msg(
MERROR,
"bm_open_buffer",
"cannot open buffer with zero name");
6780 std::string odb_path;
6781 odb_path +=
"/Experiment/Buffer sizes/";
6784 int size =
sizeof(
INT);
6787 if (buffer_size <= 0 || buffer_size > max_buffer_size) {
6789 "Cannot open buffer \"%s\", invalid buffer size %d in ODB \"%s\", maximum buffer size is %d",
6790 buffer_name, buffer_size, odb_path.c_str(), max_buffer_size);
6800 cm_msg(
MERROR,
"bm_open_buffer",
"Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6810 *buffer_handle =
i + 1;
6819 static std::mutex gNewBufferMutex;
6820 std::lock_guard<std::mutex> guard(gNewBufferMutex);
6830 *buffer_handle =
i + 1;
6903 pheader->
size = buffer_size;
6913 "Buffer \"%s\" is corrupted, mismatch of buffer name in shared memory \"%s\"",
buffer_name,
6924 cm_msg(
MERROR,
"bm_open_buffer",
"Buffer \"%s\" is corrupted, num_clients %d exceeds MAX_CLIENTS %d",
6935 cm_msg(
MERROR,
"bm_open_buffer",
"Buffer \"%s\" is corrupted, max_client_index %d exceeds MAX_CLIENTS %d",
6943 if (pheader->
size != buffer_size) {
6944 cm_msg(
MINFO,
"bm_open_buffer",
"Buffer \"%s\" requested size %d differs from existing size %d",
6947 buffer_size = pheader->
size;
6979 "buffer \'%s\' is corrupted, bm_validate_buffer() status %d, calling bm_reset_buffer()...",
buffer_name,
7018 mstrlcpy(pclient->
name, client_name.c_str(),
sizeof(pclient->
name));
7049 *buffer_handle =
i+1;
7054 *buffer_handle =
gBuffers.size() + 1;
7091 *buffer_handle =
i + 1;
7112#ifdef LOCAL_ROUTINES
7127 std::vector<EventRequest> request_list_copy =
_request_list;
7129 for (
size_t i = 0;
i < request_list_copy.size();
i++) {
7130 if (request_list_copy[
i].buffer_handle == buffer_handle) {
7257#ifdef LOCAL_ROUTINES
7265 for (
size_t i = nbuf;
i > 0;
i--) {
7291#ifdef LOCAL_ROUTINES
7303 std::vector<BUFFER*> mybuffers;
7309 for (
BUFFER* pbuf : mybuffers) {
7310 if (!pbuf || !pbuf->attached)
7329#ifdef LOCAL_ROUTINES
7348 for (
i = 0;
i < 20;
i++) {
7370#ifdef LOCAL_ROUTINES
7385#ifdef LOCAL_ROUTINES
7442 size =
sizeof(client_name);
7448 client_name[strlen(
name)] = 0;
7454 size =
sizeof(port);
7457 size =
sizeof(remote_host);
7467 int client_pid = atoi(
key.
name);
7469 cm_msg(
MERROR,
"cm_shutdown",
"Cannot connect to client \'%s\' on host \'%s\', port %d",
7470 client_name, remote_host, port);
7472 cm_msg(
MERROR,
"cm_shutdown",
"Killing and Deleting client \'%s\' pid %d", client_name,
7474 kill(client_pid, SIGKILL);
7478 cm_msg(
MERROR,
"cm_shutdown",
"Cannot delete client info for client \'%s\', pid %d, status %d",
7493 int client_pid = atoi(
key.
name);
7495 cm_msg(
MERROR,
"cm_shutdown",
"Client \'%s\' not responding to shutdown command", client_name);
7497 cm_msg(
MERROR,
"cm_shutdown",
"Killing and Deleting client \'%s\' pid %d", client_name,
7499 kill(client_pid, SIGKILL);
7503 "Cannot delete client info for client \'%s\', pid %d, status %d",
name, client_pid,
7518 return return_status;
7557 size =
sizeof(client_name);
7571 client_name[strlen(
name)] = 0;
7624#ifdef LOCAL_ROUTINES
7629 std::vector<BUFFER*> mybuffers;
7636 for (
BUFFER* pbuf : mybuffers) {
7639 if (pbuf->attached) {
7655 if (
j != pbuf->client_index && pbclient->
pid &&
7656 (client_name == NULL || client_name[0] == 0
7657 || strncmp(pbclient->
name, client_name, strlen(client_name)) == 0)) {
7671 "Client \'%s\' on \'%s\' removed by cm_cleanup (idle %1.1lfs, timeout %1.0lfs)",
7694 db_cleanup2(client_name, ignore_timeout, now,
"cm_cleanup");
7721 const char *s =
str;
7726 std::string envname;
7733 const char *
e = getenv(envname.c_str());
7754 printf(
"test_expand_env: [%s] -> [%s] expected [%s]",
7758 if (s != expected) {
7759 printf(
", MISMATCH!\n");
7768 printf(
"Test expand_end()\n");
7769 setenv(
"FOO",
"foo", 1);
7770 setenv(
"BAR",
"bar", 1);
7771 setenv(
"EMPTY",
"", 1);
7787 printf(
"test_expand_env: all tests passed!\n");
7789 printf(
"test_expand_env: test FAILED!\n");
7798#ifndef DOXYGEN_SHOULD_SKIP_THIS
7827#ifdef LOCAL_ROUTINES
7871#ifdef LOCAL_ROUTINES
7891 *n_bytes += pheader->
size;
7911#ifdef LOCAL_ROUTINES
7919 fprintf(stderr,
"bm_lock_buffer_read_cache: Error: Cannot lock read cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...\n", pbuf->
buffer_name);
7920 cm_msg(
MERROR,
"bm_lock_buffer_read_cache",
"Cannot lock read cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...", pbuf->
buffer_name);
7927 fprintf(stderr,
"bm_lock_buffer_read_cache: Error: Cannot lock read cache of buffer \"%s\", buffer was closed while we waited for the buffer_mutex\n", pbuf->
buffer_name);
7940 fprintf(stderr,
"bm_lock_buffer_write_cache: Error: Cannot lock write cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...\n", pbuf->
buffer_name);
7941 cm_msg(
MERROR,
"bm_lock_buffer_write_cache",
"Cannot lock write cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...", pbuf->
buffer_name);
7948 fprintf(stderr,
"bm_lock_buffer_write_cache: Error: Cannot lock write cache of buffer \"%s\", buffer was closed while we waited for the buffer_mutex\n", pbuf->
buffer_name);
7963 fprintf(stderr,
"bm_lock_buffer_mutex: Error: Cannot lock buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...\n", pbuf->
buffer_name);
7964 cm_msg(
MERROR,
"bm_lock_buffer_mutex",
"Cannot lock buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...", pbuf->
buffer_name);
7971 fprintf(stderr,
"bm_lock_buffer_mutex: Error: Cannot lock buffer \"%s\", buffer was closed while we waited for the buffer_mutex\n", pbuf->
buffer_name);
8004 fprintf(stderr,
"bm_lock_buffer: Lock buffer \"%s\" is taking longer than 1 second!\n", pbuf->
buffer_name);
8009 fprintf(stderr,
"bm_lock_buffer: Lock buffer \"%s\" is taking longer than 10 seconds, buffer semaphore is probably stuck, delete %s.SHM and try again!\n", pbuf->
buffer_name, pbuf->
buffer_name);
8020 fprintf(stderr,
"bm_lock_buffer: Error: Cannot lock buffer \"%s\", ss_semaphore_wait_for() status %d, aborting...\n", pbuf->
buffer_name,
status);
8058 printf(
"unlock [??????] unused1 ????? pid %d\n", getpid());
8096#ifdef LOCAL_ROUTINES
8156#ifdef LOCAL_ROUTINES
8175 if (write_size > 0) {
8184 if (write_size > max_write_size) {
8185 size_t new_write_size = max_write_size;
8186 cm_msg(
MERROR,
"bm_set_cache_size",
"requested write cache size %zu on buffer \"%s\" is too big: buffer size is %d, write cache size will be %zu bytes", write_size, pbuf->
buffer_name, pbuf->
buffer_header->
size, new_write_size);
8187 write_size = new_write_size;
8205 if (read_size > 0) {
8206 pbuf->
read_cache = (
char *) malloc(read_size);
8212 cm_msg(
MERROR,
"bm_set_cache_size",
"not enough memory to allocate read cache for buffer \"%s\", malloc(%zu) failed", pbuf->
buffer_name, read_size);
8241 if (write_size > 0) {
8248 cm_msg(
MERROR,
"bm_set_cache_size",
"not enough memory to allocate write cache for buffer \"%s\", malloc(%zu) failed", pbuf->
buffer_name, write_size);
8304 static std::mutex mutex;
8311 std::lock_guard<std::mutex> lock(mutex);
8321#ifndef DOXYGEN_SHOULD_SKIP_THIS
8373#ifdef LOCAL_ROUTINES
8389 if (func == NULL && pbuf->
callback) {
8391 cm_msg(
MERROR,
"bm_add_event_request",
"mixing callback/non callback requests not possible");
8398 cm_msg(
MERROR,
"bm_add_event_request",
"GET_RECENT request not possible if read cache is enabled");
8477 INT sampling_type,
HNDLE *request_id,
8480 assert(request_id != NULL);
8532#ifdef LOCAL_ROUTINES
8598 if (request_id < 0 ||
size_t(request_id) >=
_request_list.size()) {
8603 int buffer_handle =
_request_list[request_id].buffer_handle;
8619 pclient = pheader->
client;
8621 printf(
"buffer \'%s\', rptr: %d, wptr: %d, size: %d\n", pheader->
name, pheader->
read_pointer,
8624 if (pclient[
i].pid) {
8625 printf(
"pointers: client %d \'%s\', rptr %d\n",
i, pclient[
i].
name, pclient[
i].read_pointer);
8640 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8649 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, read pointer %d, size %d",
8660 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8669 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8678 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8694 if (pclient[
i].pid) {
8695 bm_validate_client_pointers(pheader, &pclient[
i]);
8731 assert(caller_name);
8743 printf(
"bm_update_read_pointer: [%s] rp %d, wp %d, size %d, min_rp %d, client [%s] rp %d\n",
8774 min_rp += pheader->
size;
8776 assert(min_rp >= 0);
8777 assert(min_rp < pheader->size);
8784 printf(
"bm_update_read_pointer: [%s] rp %d, wp %d, size %d, new_rp %d, moved\n",
8799 int have_get_all_requests = 0;
8806 if (!have_get_all_requests)
8816 if (free_space <= 0)
8817 free_space += pheader->
size;
8819 if (free_space >= pheader->
size * 0.5) {
8839 for (
size_t i = 0;
i <
n;
i++) {
8856 r.
dispatcher(buffer_handle,
i, pevent, (
void *) (pevent + 1));
8863#ifdef LOCAL_ROUTINES
8889 *ptotal_size = total_size;
8918 cm_msg(
MERROR,
"bm_peek_buffer_locked",
"event buffer \"%s\" is corrupted: client \"%s\" read pointer %d is invalid. buffer read pointer %d, write pointer %d, size %d", pheader->
name, pc->
name, pc->
read_pointer, pheader->
read_pointer, pheader->
write_pointer, pheader->
size);
8922 char *pdata = (
char *) (pheader + 1);
8928 if ((total_size <= 0) || (total_size > pheader->
size)) {
8929 cm_msg(
MERROR,
"bm_peek_buffer_locked",
"event buffer \"%s\" is corrupted: client \"%s\" read pointer %d points to invalid event: data_size %d, event_size %d, total_size %d. buffer size: %d, read_pointer: %d, write_pointer: %d", pheader->
name, pc->
name, pc->
read_pointer, pevent->
data_size,
event_size, total_size, pheader->
size, pheader->
read_pointer, pheader->
write_pointer);
8933 assert(total_size > 0);
8934 assert(total_size <= pheader->size);
8941 *ptotal_size = total_size;
8948 const char *pdata = (
const char *) (pheader + 1);
8950 if (rp + event_size <= pheader->size) {
8955 int size = pheader->
size - rp;
8956 memcpy(buf, pdata + rp, size);
8957 memcpy(buf + size, pdata,
event_size - size);
8963 const char *pdata = (
const char *) (pheader + 1);
8965 if (rp + event_size <= pheader->size) {
8967 vecptr->assign(pdata + rp, pdata + rp +
event_size);
8970 int size = pheader->
size - rp;
8971 vecptr->assign(pdata + rp, pdata + rp + size);
8972 vecptr->insert(vecptr->end(), pdata, pdata +
event_size - size);
8982 if (prequest->
valid) {
8992 is_requested =
TRUE;
8997 return is_requested;
9081 if (convert_flags) {
9103 char *pdata = (
char *) (pheader + 1);
9110 requested_space += 100;
9112 if (requested_space >= pheader->
size)
9116 DWORD time_end = time_start + timeout_msec;
9120 int blocking_client_index = -1;
9122 blocking_client_name[0] = 0;
9130 free += pheader->
size;
9134 if (requested_space < free) {
9162 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer is invalid",
9177 printf(
"bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, blocking event size %d/%d\n", pheader->
read_pointer, pheader->
write_pointer, free, pheader->
size, requested_space,
event_size, total_size);
9180 if (pevent->
data_size <= 0 || total_size <= 0 || total_size > pheader->
size) {
9182 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer points to an invalid event: data_size %d, event size %d, total_size %d",
9195 int blocking_client = -1;
9225 blocking_client =
i;
9234 if (blocking_client >= 0) {
9235 blocking_client_index = blocking_client;
9236 mstrlcpy(blocking_client_name, pheader->
client[blocking_client].
name,
sizeof(blocking_client_name));
9253 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer did not move as expected",
9285 int sleep_time_msec = 1000;
9287 if (timeout_msec ==
BM_WAIT) {
9294 if (now >= time_end) {
9299 sleep_time_msec = time_end - now;
9301 if (sleep_time_msec <= 0) {
9302 sleep_time_msec = 10;
9303 }
else if (sleep_time_msec > 1000) {
9304 sleep_time_msec = 1000;
9314 if (unlock_write_cache)
9363 if (unlock_write_cache) {
9372 if (!pbuf_guard.
relock()) {
9373 if (unlock_write_cache) {
9424 DWORD time_wait = time_start + timeout_msec;
9425 DWORD sleep_time = 1000;
9428 }
else if (timeout_msec ==
BM_WAIT) {
9431 if (sleep_time > (
DWORD)timeout_msec)
9432 sleep_time = timeout_msec;
9453 if (unlock_read_cache)
9460 }
else if (timeout_msec ==
BM_WAIT) {
9465 if (now >= time_wait) {
9468 sleep_time = time_wait - now;
9469 if (sleep_time > 1000)
9477 if (unlock_read_cache) {
9485 if (!pbuf_guard.
relock()) {
9486 if (unlock_read_cache) {
9516 char *pdata = (
char *) (pheader + 1);
9524 for (
int i=0;
i<sg_n;
i++) {
9526 memcpy(wptr, sg_ptr[
i], sg_len[
i]);
9554 for (;
i<sg_n;
i++) {
9555 if (
count + sg_len[
i] > size)
9557 memcpy(wptr, sg_ptr[
i], sg_len[
i]);
9566 size_t first = size -
count;
9567 size_t second = sg_len[
i] - first;
9568 assert(first + second == sg_len[
i]);
9569 assert(
count + first == size);
9573 memcpy(wptr, sg_ptr[
i], first);
9576 memcpy(wptr, sg_ptr[
i] + first, second);
9583 for (;
i<sg_n;
i++) {
9584 memcpy(wptr, sg_ptr[
i], sg_len[
i]);
9605 return prequest->
id;
9614 if (request_id >= 0) {
9618 sprintf(
str,
"B %s %d", pheader->
name, request_id);
9635 DWORD time_end = time_start + timeout_msec;
9637 int xtimeout_msec = timeout_msec;
9640 if (timeout_msec ==
BM_WAIT) {
9641 xtimeout_msec = 1000;
9645 if (xtimeout_msec > 1000) {
9646 xtimeout_msec = 1000;
9655 if (timeout_msec ==
BM_WAIT) {
9663 if (now >= time_end) {
9668 DWORD remain = time_end - now;
9670 if (remain < xtimeout_msec) {
9671 xtimeout_msec = remain;
9690 const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16);
9693 if (data_size == 0) {
9694 cm_msg(
MERROR,
"bm_send_event",
"invalid event data size zero");
9698 if (data_size > MAX_DATA_SIZE) {
9699 cm_msg(
MERROR,
"bm_send_event",
"invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
9717 const char* cptr =
event.data();
9718 size_t clen =
event.size();
9722int bm_send_event_vec(
int buffer_handle,
const std::vector<std::vector<char>>& event,
int timeout_msec)
9724 int sg_n =
event.size();
9725 const char* sg_ptr[sg_n];
9726 size_t sg_len[sg_n];
9727 for (
int i=0;
i<sg_n;
i++) {
9728 sg_ptr[
i] =
event[
i].data();
9729 sg_len[
i] =
event[
i].size();
9731 return bm_send_event_sg(buffer_handle, sg_n, sg_ptr, sg_len, timeout_msec);
9734#ifdef LOCAL_ROUTINES
9788int bm_send_event_sg(
int buffer_handle,
int sg_n,
const char*
const sg_ptr[],
const size_t sg_len[],
int timeout_msec)
9794 cm_msg(
MERROR,
"bm_send_event",
"invalid sg_n %d", sg_n);
9798 if (sg_ptr[0] == NULL) {
9799 cm_msg(
MERROR,
"bm_send_event",
"invalid sg_ptr[0] is NULL");
9804 cm_msg(
MERROR,
"bm_send_event",
"invalid sg_len[0] value %d is smaller than event header size %d", (
int)sg_len[0], (
int)
sizeof(
EVENT_HEADER));
9810 const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16);
9813 if (data_size == 0) {
9814 cm_msg(
MERROR,
"bm_send_event",
"invalid event data size zero");
9818 if (data_size > MAX_DATA_SIZE) {
9819 cm_msg(
MERROR,
"bm_send_event",
"invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
9826 for (
int i=0;
i<sg_n;
i++) {
9831 cm_msg(
MERROR,
"bm_send_event",
"data size mismatch: event data_size %d, event_size %d not same as sum of sg_len %d", (
int)data_size, (
int)
event_size, (
int)
count);
9837#ifdef LOCAL_ROUTINES
9886 cm_msg(
MERROR,
"bm_send_event",
"write cache size is bigger than buffer size");
9895 if (!too_big && pbuf->
write_cache_wp + total_size <= pbuf->write_cache_size) {
9900 for (
int i=0;
i<sg_n;
i++) {
9901 memcpy(wptr, sg_ptr[
i], sg_len[
i]);
9931 printf(
"bm_send_event: corrupted 111!\n");
9937 if (total_size >= (
size_t)pheader->
size) {
9939 cm_msg(
MERROR,
"bm_send_event",
"total event size (%d) larger than size (%d) of buffer \'%s\'", (
int)total_size, pheader->
size, pheader->
name);
9953 printf(
"bm_send_event: corrupted 222!\n");
9982 printf(
"bm_send_event: corrupted 333!\n");
10002 DWORD time_end = time_start + timeout_msec;
10003 DWORD time_bombout = time_end;
10005 if (timeout_msec < 10000)
10006 time_bombout = time_start + 10000;
10008 int xtimeout_msec = timeout_msec;
10011 if (timeout_msec ==
BM_WAIT) {
10012 xtimeout_msec = 1000;
10016 if (xtimeout_msec > 1000) {
10017 xtimeout_msec = 1000;
10026 if (timeout_msec ==
BM_WAIT) {
10028 if (now >= time_bombout) {
10040 if (now >= time_end) {
10045 DWORD remain = time_end - now;
10047 if (remain < (
DWORD)xtimeout_msec) {
10048 xtimeout_msec = remain;
10051 if (now >= time_bombout) {
10087#ifdef LOCAL_ROUTINES
10106 request_id[
i] = -1;
10116 if (ask_rp == ask_wp) {
10120 assert(ask_rp < ask_wp);
10122 size_t ask_free =
ALIGN8(ask_wp - ask_rp);
10124 if (ask_free == 0) {
10131 printf(
"bm_flush_cache: corrupted 111!\n");
10168 printf(
"bm_flush_cache: cache size %d, wp %d, rp %d, event data_size %d, event_size %d, total_size %d, free %d, written %d\n",
10181 assert(total_size <= (
size_t)pheader->
size);
10238#ifdef LOCAL_ROUTINES
10287#ifdef LOCAL_ROUTINES
10294 max_size = *buf_size;
10316 if (!pbuf_guard.
relock()) {
10357 if (convert_flags) {
10360 }
else if (bufptr) {
10364 }
else if (vecptr) {
10366 char* cptr = (
char*)pevent;
10387 if (!pbuf_guard.
relock())
10424 if (is_requested) {
10432 "buffer size %d is smaller than event size %d, event truncated. buffer \"%s\"", max_size,
10444 if (convert_flags) {
10450 }
else if (dispatch || bufptr) {
10456 }
else if (vecptr) {
10519 xbuf_size = *buf_size;
10520 }
else if (ppevent) {
10525 xbuf = pvec->data();
10526 xbuf_size = pvec->size();
10528 assert(!
"incorrect call to bm_receivent_event_rpc()");
10533 DWORD time_end = time_start + timeout_msec;
10535 int xtimeout_msec = timeout_msec;
10537 int zbuf_size = xbuf_size;
10540 if (timeout_msec ==
BM_WAIT) {
10541 xtimeout_msec = 1000;
10545 if (xtimeout_msec > 1000) {
10546 xtimeout_msec = 1000;
10550 zbuf_size = xbuf_size;
10557 if (timeout_msec ==
BM_WAIT) {
10565 if (now >= time_end) {
10570 DWORD remain = time_end - now;
10572 if (remain < (
DWORD)xtimeout_msec) {
10573 xtimeout_msec = remain;
10588 }
else if (ppevent) {
10594 assert(!
"incorrect call to bm_receivent_event_rpc()");
10603 *buf_size = zbuf_size;
10604 }
else if (ppevent) {
10608 pvec->resize(zbuf_size);
10610 assert(!
"incorrect call to bm_receivent_event_rpc()");
10680#ifdef LOCAL_ROUTINES
10760#ifdef LOCAL_ROUTINES
10771 return bm_read_buffer(pbuf, buffer_handle, (
void **) ppevent, NULL, NULL, NULL, timeout_msec, convert_flags,
FALSE);
10838#ifdef LOCAL_ROUTINES
10849 return bm_read_buffer(pbuf, buffer_handle, NULL, NULL, NULL, pvec, timeout_msec, convert_flags,
FALSE);
10856#ifdef LOCAL_ROUTINES
10903#ifdef LOCAL_ROUTINES
10919#ifdef LOCAL_ROUTINES
10945 std::vector<BUFFER*> mybuffers;
10951 for (
size_t i = 0;
i < mybuffers.size();
i++) {
10980#ifdef LOCAL_ROUTINES
10996 std::vector<BUFFER*> mybuffers;
11003 for (
size_t idx = 0; idx < mybuffers.size(); idx++) {
11004 BUFFER* pbuf = mybuffers[idx];
11137 if (convert_flags) {
11172 std::vector<char>
vec;
11176 bool locked =
true;
11178 for (
size_t i = 0;
i <
n;
i++) {
11202 dispatched_something =
TRUE;
11211 cm_msg(
MERROR,
"bm_poll_event",
"received event was truncated, buffer size %d is too small, see messages and increase /Experiment/MAX_EVENT_SIZE in ODB", (
int)
vec.size());
11234 if (dispatched_something)
11269#ifdef LOCAL_ROUTINES
11271 std::vector<BUFFER*> mybuffers;
11278 for (
BUFFER* pbuf : mybuffers) {
11281 if (!pbuf->attached)
11295#ifndef DOXYGEN_SHOULD_SKIP_THIS
11297#define MAX_DEFRAG_EVENTS 10
11353 "Received new event with ID %d while old fragments were not completed",
11354 (pevent->event_id & 0x0FFF));
11364 "Not enough defragment buffers, please increase MAX_DEFRAG_EVENTS and recompile");
11371 "Received first event fragment with %d bytes instead of %d bytes, event ignored",
11384 cm_msg(
MERROR,
"bm_defragement_event",
"Not enough memory to allocate event defragment buffer");
11406 "Received fragment without first fragment (ID %d) Ser#:%d",
11417 "Received fragments with more data (%d) than event size (%d)",
11470 printf(
"index %d, client \"%s\", host \"%s\", port %d, socket %d, connected %d, timeout %d",
11609 *convert_flags = 0;
11637 unsigned short int lo, hi;
11640 lo = *((
short int *) (var) + 1);
11641 hi = *((
short int *) (var));
11647 *((
short int *) (var) + 1) = hi;
11648 *((
short int *) (var)) = lo;
11652 unsigned short int lo, hi;
11655 lo = *((
short int *) (var) + 1);
11656 hi = *((
short int *) (var));
11662 *((
short int *) (var) + 1) = hi;
11663 *((
short int *) (var)) = lo;
11668 unsigned short int i1, i2, i3, i4;
11671 i1 = *((
short int *) (var) + 3);
11672 i2 = *((
short int *) (var) + 2);
11673 i3 = *((
short int *) (var) + 1);
11674 i4 = *((
short int *) (var));
11680 *((
short int *) (var) + 3) = i4;
11681 *((
short int *) (var) + 2) = i3;
11682 *((
short int *) (var) + 1) = i2;
11683 *((
short int *) (var)) = i1;
11687 unsigned short int i1, i2, i3, i4;
11690 i1 = *((
short int *) (var) + 3);
11691 i2 = *((
short int *) (var) + 2);
11692 i3 = *((
short int *) (var) + 1);
11693 i4 = *((
short int *) (var));
11699 *((
short int *) (var) + 3) = i4;
11700 *((
short int *) (var) + 2) = i3;
11701 *((
short int *) (var) + 1) = i2;
11702 *((
short int *) (var)) = i1;
11764 if (single_size == 0)
11767 int n = total_size / single_size;
11769 for (
int i = 0;
i <
n;
i++) {
11770 char* p = (
char *)
data + (
i * single_size);
11793 return "<unknown>";
11800 return "<unknown>";
11854 for (
int i = 0; new_list[
i].
id != 0;
i++) {
11859 cm_msg(
MERROR,
"rpc_register_functions",
"registered RPC function with invalid ID %d", new_list[
i].
id);
11866 for (
int i = 0; new_list[
i].
id != 0;
i++) {
11875 for (
int i = 0; new_list[
i].
id != 0;
i++) {
11879 if (
e.dispatch == NULL) {
11892#ifndef DOXYGEN_SHOULD_SKIP_THIS
11980 char net_buffer[256];
11982 int n =
recv_tcp(sock, net_buffer,
sizeof(net_buffer), 0);
11997 struct timeval timeout;
12004 FD_SET(sock, &readfds);
12006 timeout.tv_sec = 0;
12007 timeout.tv_usec = 0;
12009 select(
FD_SETSIZE, &readfds, NULL, NULL, &timeout);
12011 if (FD_ISSET(sock, &readfds)) {
12012 n =
recv_tcp(sock, net_buffer,
sizeof(net_buffer), 0);
12026 }
while (FD_ISSET(sock, &readfds));
12062 bool debug =
false;
12066 cm_msg(
MERROR,
"rpc_client_connect",
"cm_connect_experiment/rpc_set_name not called");
12072 cm_msg(
MERROR,
"rpc_client_connect",
"invalid port %d", port);
12078 static std::mutex gHostnameMutex;
12084 printf(
"rpc_client_connect: host \"%s\", port %d, client \"%s\"\n",
host_name, port, client_name);
12087 printf(
"client connection %d: ", (
int)
i);
12100 bool hostname_locked =
false;
12105 if (
c &&
c->connected) {
12107 if (!hostname_locked) {
12108 gHostnameMutex.lock();
12109 hostname_locked =
true;
12112 if ((
c->host_name ==
host_name) && (
c->port == port)) {
12116 gHostnameMutex.unlock();
12117 hostname_locked =
false;
12118 std::lock_guard<std::mutex> cguard(
c->mutex);
12120 if (
c->connected) {
12125 *hConnection =
c->index;
12127 printf(
"already connected: ");
12143 if (hostname_locked) {
12144 gHostnameMutex.unlock();
12145 hostname_locked =
false;
12151 static int last_reused = 0;
12154 for (
int j = 1;
j < size;
j++) {
12155 int i = (last_reused +
j) % size;
12159 printf(
"last reused %d, reusing slot %d: ", last_reused, (
int)
i);
12178 printf(
"new connection appended to array: ");
12185 c->connected =
true;
12194 std::string errmsg;
12199 cm_msg(
MERROR,
"rpc_client_connect",
"cannot connect to \"%s\" port %d: %s",
host_name, port, errmsg.c_str());
12204 gHostnameMutex.lock();
12209 gHostnameMutex.unlock();
12211 c->client_name = client_name;
12216 setsockopt(
c->send_sock, IPPROTO_TCP, TCP_NODELAY, (
char *) &
i,
sizeof(
i));
12224 std::string cstr =
msprintf(
"%d %s %s %s", hw_type,
cm_get_version(), local_prog_name.c_str(), local_host_name.c_str());
12226 int size = cstr.length() + 1;
12227 i = send(
c->send_sock, cstr.c_str(), size, 0);
12228 if (
i < 0 ||
i != size) {
12229 cm_msg(
MERROR,
"rpc_client_connect",
"cannot send %d bytes, send() returned %d, errno %d (%s)", size,
i, errno, strerror(errno));
12234 bool restore_watchdog_timeout =
false;
12235 BOOL watchdog_call;
12236 DWORD watchdog_timeout;
12242 restore_watchdog_timeout =
true;
12251 if (restore_watchdog_timeout) {
12256 cm_msg(
MERROR,
"rpc_client_connect",
"timeout waiting for server reply");
12262 int remote_hw_type = 0;
12263 char remote_version[32];
12264 remote_version[0] = 0;
12265 sscanf(
str,
"%d %s", &remote_hw_type, remote_version);
12267 c->remote_hw_type = remote_hw_type;
12271 mstrlcpy(v1, remote_version,
sizeof(v1));
12272 if (strchr(v1,
'.'))
12273 if (strchr(strchr(v1,
'.') + 1,
'.'))
12274 *strchr(strchr(v1,
'.') + 1,
'.') = 0;
12277 if (strchr(
str,
'.'))
12278 if (strchr(strchr(
str,
'.') + 1,
'.'))
12279 *strchr(strchr(
str,
'.') + 1,
'.') = 0;
12281 if (strcmp(v1,
str) != 0) {
12282 cm_msg(
MERROR,
"rpc_client_connect",
"remote MIDAS version \'%s\' differs from local version \'%s\'", remote_version,
cm_get_version());
12285 c->connected =
true;
12287 *hConnection =
c->index;
12305 for (
i = 0;
i < MAX_RPC_CONNECTION;
i++)
12306 if (_client_connection[
i].send_sock != 0)
12307 printf(
"slot %d, checking client %s socket %d, connected %d\n",
i, _client_connection[
i].client_name, _client_connection[
i].send_sock, _client_connection[
i].connected);
12315 if (
c &&
c->connected) {
12316 std::lock_guard<std::mutex> cguard(
c->mutex);
12318 if (!
c->connected) {
12331 FD_SET(
c->send_sock, &readfds);
12333 struct timeval timeout;
12334 timeout.tv_sec = 0;
12335 timeout.tv_usec = 0;
12344 }
while (
status == -1 && errno == EINTR);
12347 if (!FD_ISSET(
c->send_sock, &readfds)) {
12354 status = recv(
c->send_sock, (
char *) buffer,
sizeof(buffer), MSG_PEEK);
12359 if (errno == EAGAIN) {
12366 "RPC client connection to \"%s\" on host \"%s\" is broken, recv() errno %d (%s)",
12367 c->client_name.c_str(),
12368 c->host_name.c_str(),
12369 errno, strerror(errno));
12372 }
else if (
status == 0) {
12377 cm_msg(
MINFO,
"rpc_client_check",
"RPC client connection to \"%s\" on host \"%s\" unexpectedly closed",
c->client_name.c_str(),
c->host_name.c_str());
12436 INT remote_hw_type, hw_type;
12437 char str[200], version[32], v1[32];
12439 struct timeval timeout;
12448 if (WSAStartup(MAKEWORD(1, 1), &WSAData) != 0)
12462 cm_msg(
MERROR,
"rpc_server_connect",
"cm_connect_experiment/rpc_set_name not called");
12474 bool listen_localhost =
false;
12476 if (strcmp(
host_name,
"localhost") == 0)
12477 listen_localhost =
true;
12479 int lsock1, lport1;
12480 int lsock2, lport2;
12481 int lsock3, lport3;
12483 std::string errmsg;
12488 cm_msg(
MERROR,
"rpc_server_connect",
"cannot create listener socket: %s", errmsg.c_str());
12495 cm_msg(
MERROR,
"rpc_server_connect",
"cannot create listener socket: %s", errmsg.c_str());
12502 cm_msg(
MERROR,
"rpc_server_connect",
"cannot create listener socket: %s", errmsg.c_str());
12508 s = strchr(
str,
':');
12511 port = strtoul(s + 1, NULL, 0);
12519 cm_msg(
MERROR,
"rpc_server_connect",
"cannot connect to mserver on host \"%s\" port %d: %s",
str, port, errmsg.c_str());
12525 sprintf(
str,
"C %d %d %d %s Default", lport1, lport2, lport3,
cm_get_version());
12529 send(sock,
str, strlen(
str) + 1, 0);
12533 cm_msg(
MERROR,
"rpc_server_connect",
"timeout on receive status from server");
12537 status = version[0] = 0;
12538 sscanf(
str,
"%d %s", &
status, version);
12546 strcpy(v1, version);
12547 if (strchr(v1,
'.'))
12548 if (strchr(strchr(v1,
'.') + 1,
'.'))
12549 *strchr(strchr(v1,
'.') + 1,
'.') = 0;
12552 if (strchr(
str,
'.'))
12553 if (strchr(strchr(
str,
'.') + 1,
'.'))
12554 *strchr(strchr(
str,
'.') + 1,
'.') = 0;
12556 if (strcmp(v1,
str) != 0) {
12557 cm_msg(
MERROR,
"rpc_server_connect",
"remote MIDAS version \'%s\' differs from local version \'%s\'", version,
12563 FD_SET(lsock1, &readfds);
12564 FD_SET(lsock2, &readfds);
12565 FD_SET(lsock3, &readfds);
12568 timeout.tv_usec = 0;
12579 if (!FD_ISSET(lsock1, &readfds)) {
12580 cm_msg(
MERROR,
"rpc_server_connect",
"mserver subprocess could not be started (check path)");
12592 cm_msg(
MERROR,
"rpc_server_connect",
"accept() failed");
12606 flag = 2 * 1024 * 1024;
12609 cm_msg(
MERROR,
"rpc_server_connect",
"cannot setsockopt(SOL_SOCKET, SO_SNDBUF), errno %d (%s)", errno, strerror(errno));
12614 sprintf(
str,
"%d %s", hw_type, local_prog_name.c_str());
12621 cm_msg(
MERROR,
"rpc_server_connect",
"timeout on receive remote computer info");
12625 sscanf(
str,
"%d", &remote_hw_type);
12642 if (
c &&
c->connected) {
12645 if (!
c->connected) {
12665 if (
c &&
c->connected) {
12682 if (!
c->connected) {
12751 static int rpc_server_disconnect_recursion_level = 0;
12753 if (rpc_server_disconnect_recursion_level)
12756 rpc_server_disconnect_recursion_level = 1;
12781 rpc_server_disconnect_recursion_level = 0;
12873 INT tmp_type, size;
12891 dummy = 0x12345678;
12892 p = (
unsigned char *) &dummy;
12895 else if (*p == 0x12)
12898 cm_msg(
MERROR,
"rpc_get_option",
"unknown byte order format");
12901 f = (float) 1.2345;
12903 memcpy(&dummy, &f,
sizeof(f));
12904 if ((dummy & 0xFF) == 0x19 &&
12905 ((dummy >> 8) & 0xFF) == 0x04 && ((dummy >> 16) & 0xFF) == 0x9E
12906 && ((dummy >> 24) & 0xFF) == 0x3F)
12908 else if ((dummy & 0xFF) == 0x9E &&
12909 ((dummy >> 8) & 0xFF) == 0x40 && ((dummy >> 16) & 0xFF) == 0x19
12910 && ((dummy >> 24) & 0xFF) == 0x04)
12913 cm_msg(
MERROR,
"rpc_get_option",
"unknown floating point format");
12915 d = (double) 1.2345;
12917 memcpy(&dummy, &
d,
sizeof(f));
12918 if ((dummy & 0xFF) == 0x8D &&
12919 ((dummy >> 8) & 0xFF) == 0x97 && ((dummy >> 16) & 0xFF) == 0x6E
12920 && ((dummy >> 24) & 0xFF) == 0x12)
12922 else if ((dummy & 0xFF) == 0x83 &&
12923 ((dummy >> 8) & 0xFF) == 0xC0 && ((dummy >> 16) & 0xFF) == 0xF3
12924 && ((dummy >> 24) & 0xFF) == 0x3F)
12926 else if ((dummy & 0xFF) == 0x13 &&
12927 ((dummy >> 8) & 0xFF) == 0x40 && ((dummy >> 16) & 0xFF) == 0x83
12928 && ((dummy >> 24) & 0xFF) == 0xC0)
12930 else if ((dummy & 0xFF) == 0x9E &&
12931 ((dummy >> 8) & 0xFF) == 0x40 && ((dummy >> 16) & 0xFF) == 0x18
12932 && ((dummy >> 24) & 0xFF) == 0x04)
12934 "MIDAS cannot handle VAX D FLOAT format. Please compile with the /g_float flag");
12936 cm_msg(
MERROR,
"rpc_get_option",
"unknown floating point format");
12960 else if (hConn == -2)
12977 setsockopt(
c->send_sock, IPPROTO_TCP, TCP_NODELAY, (
char *) &
value,
sizeof(
value));
13007 int timeout =
c->rpc_timeout;
13028 if (old_timeout_msec)
13032 if (old_timeout_msec)
13038 if (old_timeout_msec)
13039 *old_timeout_msec =
c->rpc_timeout;
13040 c->rpc_timeout = timeout_msec;
13043 if (old_timeout_msec)
13044 *old_timeout_msec = 0;
13052#ifndef DOXYGEN_SHOULD_SKIP_THIS
13205 va_start(argptr, format);
13206 vsprintf(
str, (
char *) format, argptr);
13219 switch (arg_type) {
13228 *((
int *) arg) = va_arg(*arg_ptr,
int);
13233 *((
INT *) arg) = va_arg(*arg_ptr,
INT);
13242 *((
float *) arg) = (float) va_arg(*arg_ptr,
double);
13246 *((
double *) arg) = va_arg(*arg_ptr,
double);
13250 *((
char **) arg) = va_arg(*arg_ptr,
char *);
13258 bool debug =
false;
13261 printf(
"encode rpc_id %d \"%s\"\n", rl.
id, rl.
name);
13266 printf(
"i=%d, tid %d, flags 0x%x, n %d\n",
i, tid, flags,
n);
13295 size_t buf_size =
sizeof(
NET_COMMAND) + 4 * 1024;
13296 char* buf = (
char *)malloc(buf_size);
13304 char* param_ptr = (*nc)->param;
13328 char* arg = args[
i];
13351 arg_size = 1 + strlen((
char *) *((
char **) arg));
13364 const char* arg_tmp = args[
i+1];
13368 arg_size = *((
INT *) *((
void **) arg_tmp));
13370 arg_size = *((
INT *) arg_tmp);
13372 *((
INT *) param_ptr) =
ALIGN8(arg_size);
13382 int param_size =
ALIGN8(arg_size);
13385 size_t param_offset = (
char *) param_ptr - (
char *)(*nc);
13387 if (param_offset + param_size + 16 > buf_size) {
13388 size_t new_size = param_offset + param_size + 1024;
13390 buf = (
char *) realloc(buf, new_size);
13392 buf_size = new_size;
13394 param_ptr = buf + param_offset;
13400 printf(
"encode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, memcpy pointer %d\n",
i, flags, tid, arg_type, arg_size, param_size, arg_size);
13402 memcpy(param_ptr, (
void *) *((
void **) arg), arg_size);
13405 printf(
"encode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, double->float\n",
i, flags, tid, arg_type, arg_size, param_size);
13408 *((
float *) param_ptr) = (float) *((
double *) arg);
13411 printf(
"encode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, memcpy %d\n",
i, flags, tid, arg_type, arg_size, param_size, arg_size);
13413 memcpy(param_ptr, arg, arg_size);
13416 param_ptr += param_size;
13423 printf(
"encode rpc_id %d \"%s\" buf_size %d, param_size %d\n", rl.
id, rl.
name, (
int)buf_size, (*nc)->header.param_size);
13429 bool debug =
false;
13432 printf(
"decode reply to rpc_id %d \"%s\" has %d bytes\n", rl.
id, rl.
name, (
int)buf_size);
13436 const char* param_ptr = buf;
13460 if (param_ptr == NULL) {
13461 cm_msg(
MERROR,
"rpc_call_decode",
"routine \"%s\": no data in RPC reply, needed to decode an RPC_OUT parameter. param_ptr is NULL", rl.
name);
13469 arg_size = strlen((
char *) (param_ptr)) + 1;
13472 arg_size = *((
INT *) param_ptr);
13480 int param_size =
ALIGN8(arg_size);
13483 if (*((
char **) arg)) {
13485 printf(
"decode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, memcpy %d\n",
i, flags, tid, arg_type, arg_size, param_size, arg_size);
13486 memcpy((
void *) *((
char **) arg), param_ptr, arg_size);
13489 param_ptr += param_size;
13526 cm_msg(
MERROR,
"rpc_client_call",
"invalid rpc connection handle %d", hConn);
13537 routine_id &= ~RPC_NO_REPLY;
13546 int rpc_index = -1;
13547 const char *rpc_name = NULL;
13552 if (
rpc_list[
i].
id == (
int) routine_id) {
13561 if (rpc_index < 0) {
13562 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" with invalid RPC ID %d",
c->client_name.c_str(),
c->host_name.c_str(), routine_id);
13571 va_start(ap, routine_id);
13585 if (rpc_no_reply) {
13586 i =
send_tcp(
c->send_sock, (
char *) nc, send_size, 0);
13588 if (
i != send_size) {
13589 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": send_tcp() failed",
c->client_name.c_str(),
c->host_name.c_str(), rpc_name);
13609 i =
send_tcp(
c->send_sock, (
char *) nc, send_size, 0);
13610 if (
i != send_size) {
13611 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": send_tcp() failed",
c->client_name.c_str(),
c->host_name.c_str(), rpc_name);
13619 bool restore_watchdog_timeout =
false;
13620 BOOL watchdog_call;
13621 DWORD watchdog_timeout;
13626 if (
c->rpc_timeout >= (
int) watchdog_timeout) {
13627 restore_watchdog_timeout =
true;
13631 DWORD rpc_status = 0;
13632 DWORD buf_size = 0;
13638 if (restore_watchdog_timeout) {
13643 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": timeout waiting for reply",
c->client_name.c_str(),
c->host_name.c_str(), rpc_name);
13651 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": error, ss_recv_net_command() status %d",
c->client_name.c_str(),
c->host_name.c_str(), rpc_name,
status);
13661 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": error, unknown RPC, status %d",
c->client_name.c_str(),
c->host_name.c_str(), rpc_name, rpc_status);
13669 va_start(ap, routine_id);
13717 routine_id &= ~RPC_NO_REPLY;
13726 fprintf(stderr,
"rpc_call(routine_id=%d) failed, no connection to mserver.\n", routine_id);
13744 const char* rpc_name = NULL;
13750 if (
rpc_list[
i].
id == (
int) routine_id) {
13762 cm_msg(
MERROR,
"rpc_call",
"invalid rpc ID (%d)", routine_id);
13771 va_start(ap, routine_id);
13785 if (rpc_no_reply) {
13786 i =
send_tcp(send_sock, (
char *) nc, send_size, 0);
13788 if (
i != send_size) {
13790 cm_msg(
MERROR,
"rpc_call",
"rpc \"%s\" error: send_tcp() failed", rpc_name);
13801 i =
send_tcp(send_sock, (
char *) nc, send_size, 0);
13802 if (
i != send_size) {
13804 cm_msg(
MERROR,
"rpc_call",
"rpc \"%s\" error: send_tcp() failed", rpc_name);
13812 bool restore_watchdog_timeout =
false;
13813 BOOL watchdog_call;
13814 DWORD watchdog_timeout;
13824 if (rpc_timeout >= (
int) watchdog_timeout) {
13825 restore_watchdog_timeout =
true;
13830 DWORD rpc_status = 0;
13831 DWORD buf_size = 0;
13836 if (restore_watchdog_timeout) {
13847 cm_msg(
MERROR,
"rpc_call",
"routine \"%s\": timeout waiting for reply, program abort", rpc_name);
13855 cm_msg(
MERROR,
"rpc_call",
"routine \"%s\": error, ss_recv_net_command() status %d, program abort", rpc_name,
status);
13863 cm_msg(
MERROR,
"rpc_call",
"routine \"%s\": error, unknown RPC, status %d", rpc_name, rpc_status);
13871 va_start(ap, routine_id);
13931 return bm_send_event(buffer_handle, pevent, unused, async_flag);
13953 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid sg_n %d", sg_n);
13957 if (sg_ptr[0] == NULL) {
13958 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid sg_ptr[0] is NULL");
13963 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid sg_len[0] value %d is smaller than event header size %d", (
int)sg_len[0], (
int)
sizeof(
EVENT_HEADER));
13969 const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16);
13972 if (data_size == 0) {
13973 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid event data size zero");
13977 if (data_size > MAX_DATA_SIZE) {
13978 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
13986 for (
int i=0;
i<sg_n;
i++) {
13991 cm_msg(
MERROR,
"rpc_send_event_sg",
"data size mismatch: event data_size %d, event_size %d not same as sum of sg_len %d", (
int)data_size, (
int)
event_size, (
int)
count);
14017 assert(
sizeof(
DWORD) == 4);
14018 DWORD bh_buf = buffer_handle;
14023 cm_msg(
MERROR,
"rpc_send_event_sg",
"ss_write_tcp(buffer handle) failed, event socket is now closed");
14029 for (
int i=0;
i<sg_n;
i++) {
14033 cm_msg(
MERROR,
"rpc_send_event_sg",
"ss_write_tcp(event data) failed, event socket is now closed");
14040 if (
count < total_size) {
14041 char padding[8] = { 0,0,0,0,0,0,0,0 };
14042 size_t padlen = total_size -
count;
14043 assert(padlen < 8);
14047 cm_msg(
MERROR,
"rpc_send_event_sg",
"ss_write_tcp(padding) failed, event socket is now closed");
14113 for (
size_t i = 0;
i <
n;
i++) {
14136 cm_msg(
MERROR,
"rpc_transition_dispatch",
"no handler for transition %d with sequence number %d",
CINT(0),
CINT(4));
14139 cm_msg(
MERROR,
"rpc_transition_dispatch",
"received unrecognized command %d", idx);
14195void debug_dump(
unsigned char *p,
int size)
14200 for (
i = 0;
i < (size - 1) / 16 + 1;
i++) {
14201 printf(
"%p ", p +
i * 16);
14202 for (
j = 0;
j < 16;
j++)
14203 if (
i * 16 +
j < size)
14204 printf(
"%02X ", p[
i * 16 +
j]);
14209 for (
j = 0;
j < 16;
j++) {
14211 if (
i * 16 +
j < size)
14212 printf(
"%c", (
c >= 32 &&
c < 128) ? p[
i * 16 +
j] :
'.');
14251 char *buffer = NULL;
14275 int param_size = -1;
14284 if (param_size == -1) {
14304 char *p = (
char *) realloc(*pbuf, new_size);
14307 cm_msg(
MERROR,
"recv_net_command",
"cannot reallocate buffer from %d bytes to %d bytes", *pbufsize, new_size);
14313 *pbufsize = new_size;
14324 int size = write_ptr - read_ptr;
14327 memcpy(buffer + copied, net_buffer + read_ptr, size);
14329 read_ptr = write_ptr;
14333 write_ptr = recv(sock, net_buffer + misalign, sa->
net_buffer_size - 8, 0);
14336 }
while (write_ptr == -1 && errno == EINTR);
14338 write_ptr = recv(sock, net_buffer + misalign, sa->
net_buffer_size - 8, 0);
14342 if (write_ptr <= 0) {
14343 if (write_ptr == 0)
14346 cm_msg(
MERROR,
"recv_net_command",
"recv() returned %d, errno: %d (%s)", write_ptr, errno, strerror(errno));
14354 read_ptr = misalign;
14355 write_ptr += misalign;
14357 misalign = write_ptr % 8;
14362 memcpy(buffer + copied, net_buffer + read_ptr, size);
14367 if (write_ptr - read_ptr < param_size)
14370 *remaining = write_ptr - read_ptr;
14377 return size + copied;
14441 char header_buf[header_size];
14452 int hrd =
recv_tcp2(sock, header_buf, header_size, 1);
14461 cm_msg(
MERROR,
"recv_event_server",
"recv_tcp2(header) returned %d", hrd);
14465 if (hrd < (
int) header_size) {
14466 int hrd1 =
recv_tcp2(sock, header_buf + hrd, header_size - hrd, 0);
14470 cm_msg(
MERROR,
"recv_event_server",
"recv_tcp2(more header) returned %d", hrd1);
14478 if (hrd != (
int) header_size) {
14479 cm_msg(
MERROR,
"recv_event_server",
"recv_tcp2(header) returned %d instead of %d", hrd, (
int) header_size);
14483 INT *pbh = (
INT *) header_buf;
14502 for (
int i=0;
i<5;
i++) {
14503 printf(
"recv_event_server: header[%d]: 0x%08x\n",
i, pbh[
i]);
14511 "received event header with invalid data_size %d: event_size %d, total_size %d", pevent->
data_size,
14519 int bufsize =
sizeof(
INT) + total_size;
14524 if (*pbuffer_size < bufsize) {
14525 int newsize = 1024 +
ALIGN8(bufsize);
14529 char *newbuf = (
char *) realloc(*pbuffer, newsize);
14530 if (newbuf == NULL) {
14531 cm_msg(
MERROR,
"recv_event_server",
"cannot realloc() event buffer from %d to %d bytes", *pbuffer_size,
14536 *pbuffer_size = newsize;
14541 memcpy(*pbuffer, header_buf, header_size);
14545 int to_read =
sizeof(
INT) + total_size - header_size;
14546 int rptr = header_size;
14549 int drd =
recv_tcp2(sock, (*pbuffer) + rptr, to_read, 0);
14553 cm_msg(
MERROR,
"recv_event_server",
"recv_tcp2(data) returned %d instead of %d", drd, to_read);
14634 std::string errmsg;
14639 cm_msg(
MERROR,
"rpc_register_server",
"cannot listen to tcp port %d: %s", port, errmsg.c_str());
14644#if defined(F_SETFD) && defined(FD_CLOEXEC)
14645 status = fcntl(lsock, F_SETFD, fcntl(lsock, F_GETFD) | FD_CLOEXEC);
14647 cm_msg(
MERROR,
"rpc_register_server",
"fcntl(F_SETFD, FD_CLOEXEC) failed, errno %d (%s)", errno, strerror(errno));
14704 char *in_param_ptr, *out_param_ptr, *last_param_ptr;
14707 INT param_size, max_size;
14708 void *prpc_param[20];
14709 char debug_line[1024], *return_buffer;
14710 int return_buffer_size;
14711 int return_buffer_tls;
14715 int initial_buffer_size = 1024;
14738 return_buffer_tls =
i;
14741 assert(return_buffer);
14751 if (convert_flags) {
14783 cm_msg(
MERROR,
"rpc_execute",
"Invalid rpc ID (%d)", routine_id);
14789 in_param_ptr = nc_in->
param;
14792 out_param_ptr = nc_out->
param;
14794 sprintf(debug_line,
"%s(", rl.
name);
14804 param_size =
ALIGN8(1 + strlen((
char *) (in_param_ptr)));
14808 param_size = *((
INT *) in_param_ptr);
14811 param_size =
ALIGN8(param_size);
14819 prpc_param[
i] = in_param_ptr;
14822 if (convert_flags) {
14833 if (strlen(debug_line) +
str.length() + 2 <
sizeof(debug_line)) {
14834 strcat(debug_line,
"\"");
14835 strcat(debug_line,
str.c_str());
14836 strcat(debug_line,
"\"");
14838 strcat(debug_line,
"...");
14840 strcat(debug_line,
str.c_str());
14842 in_param_ptr += param_size;
14854 max_size = *((
INT *) in_param_ptr);
14858 max_size =
ALIGN8(max_size);
14860 *((
INT *) out_param_ptr) = max_size;
14866 param_size = max_size;
14872 if ((
POINTER_T) out_param_ptr - (
POINTER_T) nc_out + param_size > return_buffer_size) {
14875 "return parameters (%d) too large for network buffer (%d)",
14885 "rpc_execute: return parameters (%d) too large for network buffer (%d), new buffer size (%d)",
14886 (
int)((
POINTER_T) out_param_ptr - (
POINTER_T) nc_out + param_size), return_buffer_size, new_size);
14889 itls = return_buffer_tls;
14895 cm_msg(
MERROR,
"rpc_execute",
"Cannot allocate return buffer of size %d", new_size);
14901 assert(return_buffer);
14909 memcpy(out_param_ptr, prpc_param[
i], param_size);
14912 strcat(debug_line,
"-");
14914 prpc_param[
i] = out_param_ptr;
14915 out_param_ptr += param_size;
14919 strcat(debug_line,
", ");
14924 strcat(debug_line,
")");
14927 last_param_ptr = out_param_ptr;
14956 out_param_ptr = nc_out->
param;
14964 max_size = *((
INT *) out_param_ptr);
14969 const char* param_ptr = ((
char *) out_param_ptr) +
ALIGN8(
sizeof(
INT));
14971 param_size = strlen(param_ptr) + 1;
14972 param_size =
ALIGN8(param_size);
14975 memmove(out_param_ptr, out_param_ptr +
ALIGN8(
sizeof(
INT)), param_size);
14978 memmove(out_param_ptr + param_size,
14979 out_param_ptr + max_size +
ALIGN8(
sizeof(
INT)),
14985 max_size = *((
INT *) out_param_ptr);
14993 param_size = *((
INT *) prpc_param[
i + 1]);
14994 *((
INT *) out_param_ptr) = param_size;
15000 param_size =
ALIGN8(param_size);
15003 memmove(out_param_ptr + param_size,
15004 out_param_ptr + max_size,
15012 if (convert_flags) {
15022 out_param_ptr += param_size;
15033 if (convert_flags) {
15090 printf(
"rpc_test_rpc!\n");
15093 int int_inout = 456;
15095 char string_out[32];
15096 char string2_out[48];
15098 char string_inout[25];
15099 strcpy(string_inout,
"string_inout");
15103 struct_in.
type = 111;
15105 strcpy(struct_in.
name,
"name");
15111 struct_inout.
type = 111111;
15113 strcpy(struct_inout.
name,
"name_name");
15116 uint32_t dwordarray_inout[10];
15117 size_t dwordarray_inout_size =
sizeof(dwordarray_inout);
15119 for (
int i=0;
i<10;
i++) {
15120 dwordarray_inout[
i] =
i*10;
15125 for (
size_t i=0;
i<
sizeof(array_in);
i++) {
15126 array_in[
i] =
'a' +
i;
15129 char array_out[16];
15130 size_t array_out_size =
sizeof(array_out);
15137 string_out,
sizeof(string_out),
15138 string2_out,
sizeof(string2_out),
15139 string_inout,
sizeof(string_inout),
15143 dwordarray_inout, &dwordarray_inout_size,
15144 array_in,
sizeof(array_in),
15145 array_out, &array_out_size
15148 printf(
"rpc_call(RPC_TEST2) status %d\n",
status);
15150 if (int_out != 789) {
15151 printf(
"int_out mismatch!\n");
15155 if (int_inout != 456*2) {
15156 printf(
"int_inout mismatch!\n");
15160 if (strcmp(string_out,
"string_out") != 0) {
15161 printf(
"string_out mismatch [%s] vs [%s]\n", string_out,
"string_out");
15165 if (strcmp(string2_out,
"second string_out") != 0) {
15166 printf(
"string2_out mismatch [%s] vs [%s]\n", string2_out,
"second string_out");
15170 if (strcmp(string_inout,
"return string_inout") != 0) {
15171 printf(
"string_inout mismatch [%s] vs [%s]\n", string_inout,
"return string_inout");
15181 pkey = &struct_out;
15184 printf(
"struct_out mismatch: type %d, num_values %d, name [%s], last_written %d\n", pkey->
type, pkey->
num_values, pkey->
name, pkey->
last_written);
15188 pkey = &struct_inout;
15191 printf(
"struct_inout mismatch: type %d, num_values %d, name [%s], last_written %d\n", pkey->
type, pkey->
num_values, pkey->
name, pkey->
last_written);
15196 if (dwordarray_inout_size != 4*5) {
15197 printf(
"dwordarray_inout_size mismatch %d vs %d\n", (
int)dwordarray_inout_size, 4*5);
15201 for (
size_t i=0;
i<dwordarray_inout_size/
sizeof(uint32_t);
i++) {
15202 printf(
"dwordarray_inout[%d] is %d\n", (
int)
i, dwordarray_inout[
i]);
15207 {RPC_TEST_CXX,
"test_cxx",
15306 if (strcmp(hostname,
"localhost") == 0)
15309 if (strcmp(hostname,
"localhost.localdomain") == 0)
15312 if (strcmp(hostname,
"localhost6") == 0)
15315 if (strcmp(hostname,
"ip6-localhost") == 0)
15323 if (h == hostname) {
15340 std::string hostname;
15352 static std::atomic_int max_report(10);
15353 if (max_report > 0) {
15355 if (max_report == 0) {
15356 cm_msg(
MERROR,
"rpc_socket_check_allowed_host",
"rejecting connection from unallowed host \'%s\', this message will no longer be reported", hostname.c_str());
15358 cm_msg(
MERROR,
"rpc_socket_check_allowed_host",
"rejecting connection from unallowed host \'%s\'. Add this host to \"/Experiment/Security/RPC hosts/Allowed hosts\"", hostname.c_str());
15392 INT port1, port2, port3;
15394 char net_buffer[256];
15395 struct linger ling;
15400 sock = accept(lsock, NULL, NULL);
15425 char command = (char) toupper(net_buffer[0]);
15439#ifdef LOCAL_ROUTINES
15442 for (
unsigned i=0;
i<exptab.
exptab.size();
i++) {
15444 const char*
str = exptab.
exptab[
i].name.c_str();
15445 send(sock,
str, strlen(
str) + 1, 0);
15447 send(sock,
"", 1, 0);
15458 port1 = port2 = version[0] = 0;
15465 port1 = strtoul(net_buffer + 2, &ptr, 0);
15466 port2 = strtoul(ptr, &ptr, 0);
15467 port3 = strtoul(ptr, &ptr, 0);
15469 while (*ptr ==
' ')
15473 for (; *ptr != 0 && *ptr !=
' ' &&
i < (int)
sizeof(version) - 1;)
15474 version[
i++] = *ptr++;
15477 assert(
i < (
int)
sizeof(version));
15481 for (; *ptr != 0 && *ptr !=
' ';)
15484 while (*ptr ==
' ')
15488 for (; *ptr != 0 && *ptr !=
' ' && *ptr !=
'\n' && *ptr !=
'\r' &&
i < (int)
sizeof(
experiment) - 1;)
15498 mstrlcpy(v1, version,
sizeof(v1));
15499 if (strchr(v1,
'.'))
15500 if (strchr(strchr(v1,
'.') + 1,
'.'))
15501 *strchr(strchr(v1,
'.') + 1,
'.') = 0;
15505 if (strchr(
str,
'.'))
15506 if (strchr(strchr(
str,
'.') + 1,
'.'))
15507 *strchr(strchr(
str,
'.') + 1,
'.') = 0;
15509 if (strcmp(v1,
str) != 0) {
15511 cm_msg(
MERROR,
"rpc_server_accept",
"received string: %s", net_buffer + 2);
15526#ifdef LOCAL_ROUTINES
15532 bool found =
false;
15538 for (idx = 0; idx < exptab.
exptab.size(); idx++) {
15551 send(sock,
"2", 2, 0);
15560 char host_port1_str[30], host_port2_str[30], host_port3_str[30];
15561 char debug_str[30];
15570 const char *argv[10];
15571 argv[0] = mserver_path;
15573 argv[2] = host_port1_str;
15574 argv[3] = host_port2_str;
15575 argv[4] = host_port3_str;
15576 argv[5] = debug_str;
15583 argv[0], argv[1], argv[2], argv[3], argv[4], argv[5], argv[6], argv[7], argv[8],
15592 send(sock,
str, strlen(
str) + 1, 0);
15598 send(sock,
str, strlen(
str) + 1, 0);
15605 cm_msg(
MERROR,
"rpc_server_accept",
"received unknown command '%c' code %d", command, command);
15615 setsockopt(sock, SOL_SOCKET, SO_LINGER, (
char *) &ling,
sizeof(ling));
15646 INT client_hw_type = 0, hw_type;
15647 std::string client_program;
15650 char net_buffer[256], *p;
15652 int sock = accept(lsock, NULL, NULL);
15668 client_program =
"(unknown)";
15671 i =
recv_string(sock, net_buffer,
sizeof(net_buffer), 10000);
15678 p = strtok(net_buffer,
" ");
15680 client_hw_type = atoi(p);
15681 p = strtok(NULL,
" ");
15685 p = strtok(NULL,
" ");
15688 client_program = p;
15689 p = strtok(NULL,
" ");
15693 p = strtok(NULL,
" ");
15714 status = send(sock,
str.c_str(),
str.length() + 1, 0);
15748 int recv_sock, send_sock, event_sock;
15750 std::string client_program;
15751 INT client_hw_type, hw_type;
15753 char net_buffer[256];
15761 std::string errmsg;
15795 flag = 2 * 1024 * 1024;
15796 status = setsockopt(event_sock, SOL_SOCKET, SO_RCVBUF, (
char *) &flag,
sizeof(
INT));
15798 cm_msg(
MERROR,
"rpc_server_callback",
"cannot setsockopt(SOL_SOCKET, SO_RCVBUF), errno %d (%s)", errno,
15803 cm_msg(
MERROR,
"rpc_server_callback",
"timeout on receive remote computer info");
15812 client_hw_type = strtoul(net_buffer, &p, 0);
15817 client_program = p;
15851 sprintf(
str,
"%d", hw_type);
15852 send(recv_sock,
str, strlen(
str) + 1, 0);
15921 if (n_received <= 0) {
15923 cm_msg(
MERROR,
"rpc_server_receive_rpc",
"recv_net_command() returned %d, abort", n_received);
15930 cm_msg(
MERROR,
"rpc_server_receive_rpc",
"rpc_execute() returned %d, abort",
status);
15940 }
while (remaining);
15955 if (strchr(
str,
'.'))
15956 *strchr(
str,
'.') = 0;
15957 cm_msg(
MTALK,
"rpc_server_receive_rpc",
"Program \'%s\' on host \'%s\' aborted", sa->
prog_name.c_str(),
str);
16032 static char *xbuf = NULL;
16033 static int xbufsize = 0;
16034 static bool xbufempty =
true;
16037 if (sa == NULL && xbufempty)
16040 static bool recurse =
false;
16043 cm_msg(
MERROR,
"rpc_server_receive_event",
"internal error: called recursively");
16055 if (xbufempty && sa) {
16058 if (n_received < 0) {
16060 cm_msg(
MERROR,
"rpc_server_receive_event",
"recv_event_server_realloc() returned %d, abort", n_received);
16064 if (n_received == 0) {
16080 INT *pbh = (
INT *) xbuf;
16088 cm_msg(
MERROR,
"rpc_server_receive_event",
"bm_send_event() error %d (SS_ABORT), abort",
status);
16099 cm_msg(
MERROR,
"rpc_server_receive_event",
"bm_send_event() error %d, mserver dropped this event",
status);
16115 if (strchr(
str,
'.'))
16116 *strchr(
str,
'.') = 0;
16117 cm_msg(
MTALK,
"rpc_server_receive_event",
"Program \'%s\' on host \'%s\' aborted", sa->
prog_name.c_str(),
str);
16183 }
else if (timeout_msec ==
BM_WAIT) {
16228 struct linger ling;
16237 setsockopt(sa->
recv_sock, SOL_SOCKET, SO_LINGER, (
char *) &ling,
sizeof(ling));
16241 setsockopt(sa->
send_sock, SOL_SOCKET, SO_LINGER, (
char *) &ling,
sizeof(ling));
16246 setsockopt(sa->
event_sock, SOL_SOCKET, SO_LINGER, (
char *) &ling,
sizeof(ling));
16301 struct timeval timeout;
16328 if (convert_flags) {
16337 cm_msg(
MINFO,
"rpc_check_channels",
"client \"%s\" on host \"%s\" failed watchdog test after %d sec, send_tcp() returned %d",
16360 timeout.tv_sec = 1;
16361 timeout.tv_usec = 0;
16369 if (now > timeout_end_ms)
16382 if (!FD_ISSET(sa->
send_sock, &readfds) &&
16385 cm_msg(
MERROR,
"rpc_check_channels",
"client \"%s\" on host \"%s\" failed watchdog test after %d sec",
16401 if (FD_ISSET(sa->
send_sock, &readfds)) {
16404 cm_msg(
MERROR,
"rpc_check_channels",
"client \"%s\" on host \"%s\" failed watchdog test after %d sec, recv_tcp() returned %d",
16452#ifndef DOXYGEN_SHOULD_SKIP_THIS
16603 if (((
PTYPE) event & 0x07) != 0) {
16604 cm_msg(
MERROR,
"bk_create",
"Bank %s created with unaligned event pointer",
name);
16613 *pdata = pbk32a + 1;
16621 *pdata = pbk32 + 1;
16634#ifndef DOXYGEN_SHOULD_SKIP_THIS
16647 DWORD bklen, bktype, bksze;
16673 memmove(pdest, (
char *) psbkh32a,
ALIGN8(bksze) +
sizeof(
BANK32A));
16687 memmove(pdest, (
char *) psbkh32,
ALIGN8(bksze) +
sizeof(
BANK32));
16694 psbkh = ((
BANK *) psdata - 1);
16701 memmove(pdest, (
char *) psbkh,
ALIGN8(bksze) +
sizeof(
BANK));
16740 if (*((
DWORD *) pbk32a->
name) == dname) {
16742 remaining = ((
char *) event + ((
BANK_HEADER *) event)->data_size +
16750 memmove(pbk32a, (
char *) (pbk32a + 1) +
ALIGN8(pbk32a->
data_size), remaining);
16755 }
while ((
DWORD) ((
char *) pbk32a - (
char *) event) <
16762 if (*((
DWORD *) pbk32->
name) == dname) {
16764 remaining = ((
char *) event + ((
BANK_HEADER *) event)->data_size +
16772 memmove(pbk32, (
char *) (pbk32 + 1) +
ALIGN8(pbk32->
data_size), remaining);
16777 }
while ((
DWORD) ((
char *) pbk32 - (
char *) event) <
16786 remaining = ((
char *) event + ((
BANK_HEADER *) event)->data_size +
16794 memmove(pbk, (
char *) (pbk + 1) +
ALIGN8(pbk->
data_size), remaining);
16799 }
while ((
DWORD) ((
char *) pbk - (
char *) event) <
16823 pbk32a->
data_size = (
DWORD) ((
char *) pdata - (
char *) (pbk32a + 1));
16825 printf(
"Warning: TID_STRUCT bank %c%c%c%c has zero size\n", pbk32a->
name[0], pbk32a->
name[1], pbk32a->
name[2], pbk32a->
name[3]);
16830 pbk32->
data_size = (
DWORD) ((
char *) pdata - (
char *) (pbk32 + 1));
16832 printf(
"Warning: TID_STRUCT bank %c%c%c%c has zero size\n", pbk32->
name[0], pbk32->
name[1], pbk32->
name[2], pbk32->
name[3]);
16837 uint32_t size = (uint32_t) ((
char *) pdata - (
char *) (pbk + 1));
16838 if (size > 0xFFFF) {
16839 printf(
"Error: Bank size %d exceeds 16-bit limit of 65526, please use bk_init32() to create a 32-bit bank\n", size);
16844 printf(
"Warning: TID_STRUCT bank %c%c%c%c has zero size\n", pbk->
name[0], pbk->
name[1], pbk->
name[2], pbk->
name[3]);
16846 if (size > 0xFFFF) {
16847 printf(
"Error: Bank size %d exceeds 16-bit limit of 65526, please use bk_init32() to create a 32-bit bank\n", size);
16894 if (pmbk32a == NULL)
16898 if (pmbk32 == NULL)
16912 strncat(bklist, (
char *) pmbk32a->
name, 4);
16914 strncat(bklist, (
char *) pmbk32->
name, 4);
16916 strncat(bklist, (
char *) pmbk->
name, 4);
16938 while ((
DWORD) ((
char *) pbk32a - (
char *) event) <
16940 if (*((
DWORD *) pbk32a->
name) == dname) {
16941 *((
void **) pdata) = pbk32a + 1;
16951 while ((
DWORD) ((
char *) pbk32 - (
char *) event) <
16953 if (*((
DWORD *) pbk32->
name) == dname) {
16954 *((
void **) pdata) = pbk32 + 1;
16964 while ((
DWORD) ((
char *) pbk - (
char *) event) <
16967 *((
void **) pdata) = pbk + 1;
16978 *((
void **) pdata) = NULL;
16999 if (*((
DWORD *) pbk32a->
name) == dname) {
17000 *((
void **) pdata) = pbk32a + 1;
17006 *bktype = pbk32a->
type;
17015 if (*((
DWORD *) pbk32->
name) == dname) {
17016 *((
void **) pdata) = pbk32 + 1;
17022 *bktype = pbk32->
type;
17032 *((
void **) pdata) = pbk + 1;
17038 *bktype = pbk->
type;
17046 *((
void **) pdata) = NULL;
17090 *pbk = (
BANK *) ((
char *) (*pbk + 1) +
ALIGN8((*pbk)->data_size));
17092 *((
void **) pdata) = (*pbk) + 1;
17095 *pbk = *((
BANK **) pdata) = NULL;
17104#ifndef DOXYGEN_SHOULD_SKIP_THIS
17130 *pbk = (
BANK32 *) ((
char *) (*pbk + 1) +
ALIGN8((*pbk)->data_size));
17132 *((
void **) pdata) = (*pbk) + 1;
17163 if (*pbk32a == NULL)
17166 *pbk32a = (
BANK32A *) ((
char *) (*pbk32a + 1) +
ALIGN8((*pbk32a)->data_size));
17168 *((
void **) pdata) = (*pbk32a) + 1;
17208 if (pbh->
flags < 0x10000 && !force)
17215 pbk = (
BANK *) (pbh + 1);
17225 pdata = pbk32a + 1;
17242 pbk = (
BANK *) pbk32a;
17245 pbk = (
BANK *) pbk32;
17254 while ((
char *) pdata < (
char *) pbk) {
17256 pdata = (
void *) (((
WORD *) pdata) + 1);
17264 while ((
char *) pdata < (
char *) pbk) {
17266 pdata = (
void *) (((
DWORD *) pdata) + 1);
17273 while ((
char *) pdata < (
char *) pbk) {
17275 pdata = (
void *) (((
double *) pdata) + 1);
17295#ifndef DOXYGEN_SHOULD_SKIP_THIS
17320#define MAX_RING_BUFFER 100
17401 if (
rb[
i].buffer == NULL)
17412 assert(
rb[
i].buffer);
17496 for (
i = 0;
i <= millisec / 10;
i++) {
17509 rp >
rb[h].buffer) {
17563 unsigned char *new_wp;
17571 cm_msg(
MERROR,
"rb_increment_wp",
"event size of %d MB larger than max_event_size of %d MB",
17576 new_wp =
rb[h].
wp + size;
17582 assert(
rb[h].rp !=
rb[h].buffer);
17584 if (new_wp >
rb[h].ep)
17639 for (
i = 0;
i <= millisec / 10;
i++) {
17641 if (
rb[h].wp !=
rb[h].rp) {
17643 *p =
rb[handle - 1].
rp;
17694 unsigned char *new_rp;
17705 new_rp =
rb[h].
rp + size;
17709 if (new_rp >= ep &&
rb[h].wp < ep)
17712 rb[handle - 1].
rp = new_rp;
17751 if (
rb[h].wp >=
rb[h].rp)
17769 cm_msg(
MERROR,
"cm_write_event_to_odb",
"event %d ODB record size mismatch, db_set_record() status %d", pevent->
event_id,
status);
17776 char *pdata, *pdata0;
17785 HNDLE hKeyRoot, hKeyl, *hKeys;
17794 for (
n=0 ; ;
n++) {
17797 if (pbk32a == NULL)
17820 if (pbk32a == NULL)
17850 cm_msg(
MERROR,
"cm_write_event_to_odb",
"please define bank \"%s\" in BANK_LIST in frontend",
name);
17855 for (
i = 0;;
i++) {
17868 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot write bank \"%s\" to ODB, db_set_data1() status %d",
name,
status);
17871 hKeys[
n++] = hKeyl;
17882 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot create key for bank \"%s\" with tid %d in ODB, db_create_key() status %d",
name, bktype,
status);
17887 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot find key for bank \"%s\" in ODB, after db_create_key(), db_find_key() status %d",
name,
status);
17894 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot write bank \"%s\" to ODB, db_set_data1() status %d",
name,
status);
17896 hKeys[
n++] = hKeyRoot;
17908 cm_msg(
MERROR,
"cm_write_event_to_odb",
"event format %d is not supported (see midas.h definitions of FORMAT_xxx)", format);
std::atomic_bool connected
BUFFER * get_pbuf() const
bm_lock_buffer_guard(BUFFER *pbuf, bool do_not_lock=false)
bm_lock_buffer_guard & operator=(const bm_lock_buffer_guard &)=delete
bm_lock_buffer_guard(const bm_lock_buffer_guard &)=delete
void set_string_size(std::string s, int size)
static bool exists(const std::string &name)
INT transition(INT run_number, char *error)
INT al_get_alarms(std::string *presult)
void bk_init32a(void *event)
INT bk_close(void *event, void *pdata)
INT bk_iterate32a(const void *event, BANK32A **pbk32a, void *pdata)
static void copy_bk_name(char *dst, const char *src)
INT bk_swap(void *event, BOOL force)
BOOL bk_is32a(const void *event)
int bk_delete(void *event, const char *name)
BOOL bk_is32(const void *event)
INT bk_iterate32(const void *event, BANK32 **pbk, void *pdata)
INT bk_locate(const void *event, const char *name, void *pdata)
void bk_init(void *event)
INT bk_list(const void *event, char *bklist)
INT bk_copy(char *pevent, char *psrce, const char *bkname)
INT bk_iterate(const void *event, BANK **pbk, void *pdata)
void bk_init32(void *event)
void bk_create(void *event, const char *name, WORD type, void **pdata)
INT bk_find(const BANK_HEADER *pbkh, const char *name, DWORD *bklen, DWORD *bktype, void **pdata)
INT bk_size(const void *event)
static void bm_wakeup_producers_locked(const BUFFER_HEADER *pheader, const BUFFER_CLIENT *pc)
static INT bm_receive_event_rpc(INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
INT bm_open_buffer(const char *buffer_name, INT buffer_size, INT *buffer_handle)
static BOOL bm_validate_rp(const char *who, const BUFFER_HEADER *pheader, int rp)
INT bm_send_event(INT buffer_handle, const EVENT_HEADER *pevent, int unused, int timeout_msec)
static int bm_flush_cache_rpc(int buffer_handle, int timeout_msec)
static void bm_write_buffer_statistics_to_odb_copy(HNDLE hDB, const char *buffer_name, const char *client_name, int client_index, BUFFER_INFO *pbuf, BUFFER_HEADER *pheader)
static int bm_skip_event(BUFFER *pbuf)
static INT bm_flush_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
INT bm_write_statistics_to_odb(void)
#define MAX_DEFRAG_EVENTS
INT bm_delete_request(INT request_id)
INT bm_close_all_buffers(void)
INT bm_add_event_request(INT buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, EVENT_HANDLER *func, INT request_id)
static void bm_write_buffer_statistics_to_odb(HNDLE hDB, BUFFER *pbuf, BOOL force)
static int bm_incr_rp_no_check(const BUFFER_HEADER *pheader, int rp, int total_size)
INT bm_receive_event_vec(INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
static void bm_notify_reader_locked(BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, int old_write_pointer, int request_id)
static int bm_find_first_request_locked(BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
static BOOL bm_check_requests(const BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
static void bm_cleanup_buffer_locked(BUFFER *pbuf, const char *who, DWORD actual_time)
static BOOL bm_update_read_pointer_locked(const char *caller_name, BUFFER_HEADER *pheader)
static void bm_convert_event_header(EVENT_HEADER *pevent, int convert_flags)
INT bm_request_event(HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
static int bm_validate_client_index_locked(bm_lock_buffer_guard &pbuf_guard)
INT bm_set_cache_size(INT buffer_handle, size_t read_size, size_t write_size)
static int bm_validate_buffer_locked(const BUFFER *pbuf)
INT bm_receive_event(INT buffer_handle, void *destination, INT *buf_size, int timeout_msec)
static int bm_fill_read_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
INT bm_compose_event_threadsafe(EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD *serial)
INT bm_remove_event_request(INT buffer_handle, INT request_id)
static double _bm_mutex_timeout_sec
INT bm_close_buffer(INT buffer_handle)
int bm_send_event_sg(int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
INT bm_compose_event(EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD serial)
static void bm_write_to_buffer_locked(BUFFER_HEADER *pheader, int sg_n, const char *const sg_ptr[], const size_t sg_len[], size_t total_size)
static EVENT_DEFRAG_BUFFER defrag_buffer[MAX_DEFRAG_EVENTS]
static int bm_next_rp(const char *who, const BUFFER_HEADER *pheader, const char *pdata, int rp)
INT bm_match_event(short int event_id, short int trigger_mask, const EVENT_HEADER *pevent)
static void bm_read_from_buffer_locked(const BUFFER_HEADER *pheader, int rp, char *buf, int event_size)
static BOOL bm_peek_read_cache_locked(BUFFER *pbuf, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
static void bm_update_last_activity(DWORD millitime)
static void bm_incr_read_cache_locked(BUFFER *pbuf, int total_size)
static DWORD _bm_max_event_size
static void bm_clear_buffer_statistics(HNDLE hDB, BUFFER *pbuf)
INT bm_flush_cache(int buffer_handle, int timeout_msec)
static void bm_dispatch_event(int buffer_handle, EVENT_HEADER *pevent)
static void bm_validate_client_pointers_locked(const BUFFER_HEADER *pheader, BUFFER_CLIENT *pclient)
static INT bm_read_buffer(BUFFER *pbuf, INT buffer_handle, void **bufptr, void *buf, INT *buf_size, std::vector< char > *vecptr, int timeout_msec, int convert_flags, BOOL dispatch)
static int bm_wait_for_free_space_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec, int requested_space, bool unlock_write_cache)
INT bm_get_buffer_handle(const char *buffer_name, INT *buffer_handle)
int bm_send_event_vec(int buffer_handle, const std::vector< char > &event, int timeout_msec)
static int _bm_lock_timeout
static int bm_peek_buffer_locked(BUFFER *pbuf, BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
INT bm_receive_event_alloc(INT buffer_handle, EVENT_HEADER **ppevent, int timeout_msec)
static void bm_reset_buffer_locked(BUFFER *pbuf)
void bm_remove_client_locked(BUFFER_HEADER *pheader, int j)
static INT bm_push_buffer(BUFFER *pbuf, int buffer_handle)
static int bm_wait_for_more_events_locked(bm_lock_buffer_guard &pbuf_guard, BUFFER_CLIENT *pc, int timeout_msec, BOOL unlock_read_cache)
INT cm_set_path(const char *path)
INT cm_register_transition(INT transition, INT(*func)(INT, char *), INT sequence_number)
INT cm_shutdown(const char *name, BOOL bUnique)
static int cm_transition_call(TrState *s, int idx)
INT cm_disconnect_client(HNDLE hConn, BOOL bShutdown)
static void load_rpc_hosts(HNDLE hDB, HNDLE hKey, int index, void *info)
static std::atomic< std::thread * > _watchdog_thread
static int cm_transition_detach(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_yield(INT millisec)
INT cm_get_experiment_database(HNDLE *hDB, HNDLE *hKeyClient)
INT cm_list_experiments_remote(const char *host_name, STRING_LIST *exp_names)
INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout)
static void bm_defragment_event(HNDLE buffer_handle, HNDLE request_id, EVENT_HEADER *pevent, void *pdata, EVENT_HANDLER *dispatcher)
INT cm_connect_client(const char *client_name, HNDLE *hConn)
INT cm_connect_experiment(const char *host_name, const char *exp_name, const char *client_name, void(*func)(char *))
static BUFFER_CLIENT * bm_get_my_client_locked(bm_lock_buffer_guard &pbuf_guard)
INT cm_list_experiments_local(STRING_LIST *exp_names)
INT cm_start_watchdog_thread()
static INT bm_push_event(const char *buffer_name)
INT cm_get_experiment_semaphore(INT *semaphore_alarm, INT *semaphore_elog, INT *semaphore_history, INT *semaphore_msg)
static int tr_finish(HNDLE hDB, TrState *tr, int transition, int status, const char *errorstr)
INT cm_set_client_run_state(INT state)
static int bm_lock_buffer_read_cache(BUFFER *pbuf)
static void write_tr_client_to_odb(HNDLE hDB, const TrClient *tr_client)
INT cm_transition(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_stop_watchdog_thread()
INT cm_register_function(INT id, INT(*func)(INT, void **))
INT cm_connect_experiment1(const char *host_name, const char *default_exp_name, const char *client_name, void(*func)(char *), INT odb_size, DWORD watchdog_timeout)
INT cm_check_client(HNDLE hDB, HNDLE hKeyClient)
static int xbm_lock_buffer(BUFFER *pbuf)
static void xbm_unlock_buffer(BUFFER *pbuf)
INT cm_dispatch_ipc(const char *message, int message_size, int client_socket)
static INT cm_transition1(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_select_experiment_remote(const char *host_name, std::string *exp_name)
INT cm_register_server(void)
static BOOL _ctrlc_pressed
static void init_rpc_hosts(HNDLE hDB)
void cm_ack_ctrlc_pressed()
INT cm_execute(const char *command, char *result, INT bufsize)
INT cm_get_watchdog_info(HNDLE hDB, const char *client_name, DWORD *timeout, DWORD *last)
INT cm_cleanup(const char *client_name, BOOL ignore_timeout)
void cm_check_connect(void)
static BUFFER * bm_get_buffer(const char *who, INT buffer_handle, int *pstatus)
INT cm_select_experiment_local(std::string *exp_name)
std::string cm_expand_env(const char *str)
std::string cm_get_client_name()
static int bm_lock_buffer_mutex(BUFFER *pbuf)
int cm_exec_script(const char *odb_path_to_script)
INT EXPRT cm_get_path_string(std::string *path)
static void rpc_client_shutdown()
static std::atomic< bool > _watchdog_thread_is_running
static exptab_struct _exptab
INT cm_set_client_info(HNDLE hDB, HNDLE *hKeyClient, const char *host_name, char *client_name, INT hw_type, const char *password, DWORD watchdog_timeout)
INT cm_disconnect_experiment(void)
static void bm_cleanup(const char *who, DWORD actual_time, BOOL wrong_interval)
static INT bm_notify_client(const char *buffer_name, int s)
int cm_get_exptab(const char *expname, std::string *dir, std::string *user)
static void xcm_watchdog_thread()
static bool test_cm_expand_env1(const char *str, const char *expected)
INT cm_synchronize(DWORD *seconds)
std::string cm_get_exptab_filename()
std::string cm_get_path()
static DWORD _deferred_transition_mask
std::string cm_get_history_path(const char *history_channel)
void cm_test_expand_env()
INT cm_register_deferred_transition(INT transition, BOOL(*func)(INT, BOOL))
int cm_set_experiment_local(const char *exp_name)
static INT _requested_transition
INT cm_get_environment(char *host_name, int host_name_size, char *exp_name, int exp_name_size)
const char * cm_get_version()
INT cm_read_exptab(exptab_struct *exptab)
static int bm_lock_buffer_write_cache(BUFFER *pbuf)
INT cm_deregister_transition(INT transition)
INT cm_check_deferred_transition()
std::string cm_get_experiment_name()
INT cm_set_transition_sequence(INT transition, INT sequence_number)
static bool tr_compare(const std::unique_ptr< TrClient > &arg1, const std::unique_ptr< TrClient > &arg2)
INT cm_delete_client_info(HNDLE hDB, INT pid)
static std::atomic< bool > _watchdog_thread_run
const char * cm_get_revision()
INT cm_watchdog_thread(void *unused)
INT cm_set_experiment_database(HNDLE hDB, HNDLE hKeyClient)
BOOL cm_is_ctrlc_pressed()
static INT tr_main_thread(void *param)
void cm_ctrlc_handler(int sig)
INT cm_set_watchdog_params_local(BOOL call_watchdog, DWORD timeout)
INT cm_transition_cleanup()
INT cm_set_watchdog_params(BOOL call_watchdog, DWORD timeout)
static int cm_transition_call_direct(TrClient *tr_client)
INT cm_exist(const char *name, BOOL bUnique)
INT cm_set_experiment_semaphore(INT semaphore_alarm, INT semaphore_elog, INT semaphore_history, INT semaphore_msg)
static INT cm_transition2(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_set_experiment_name(const char *name)
#define CM_INVALID_TRANSITION
#define CM_DEFERRED_TRANSITION
#define CM_TRANSITION_IN_PROGRESS
#define CM_WRONG_PASSWORD
#define CM_TRANSITION_CANCELED
#define CM_VERSION_MISMATCH
#define BM_INVALID_MIXING
#define BM_INVALID_HANDLE
#define DB_INVALID_HANDLE
#define DB_NO_MORE_SUBKEYS
#define RPC_EXCEED_BUFFER
#define RPC_DOUBLE_DEFINED
#define RPC_NOT_REGISTERED
#define RPC_MUTEX_TIMEOUT
#define RPC_NO_CONNECTION
#define RPC_HNDLE_CONNECT
#define RPC_HNDLE_MSERVER
#define VALIGN(adr, align)
RPC_LIST * rpc_get_internal_list(INT flag)
#define MESSAGE_BUFFER_NAME
#define DRI_LITTLE_ENDIAN
#define MAX_STRING_LENGTH
#define MESSAGE_BUFFER_SIZE
void() EVENT_HANDLER(HNDLE buffer_handler, HNDLE request_id, EVENT_HEADER *event_header, void *event_data)
INT() RPC_HANDLER(INT index, void *prpc_param[])
std::string ss_gethostname()
INT ss_suspend(INT millisec, INT msg)
INT ss_get_struct_align()
INT ss_mutex_release(MUTEX_T *mutex)
INT ss_suspend_init_odb_port()
bool ss_event_socket_has_data()
time_t ss_mktime(struct tm *tms)
int ss_file_exist(const char *path)
INT ss_semaphore_create(const char *name, HNDLE *semaphore_handle)
INT recv_tcp2(int sock, char *net_buffer, int buffer_size, int timeout_ms)
INT ss_suspend_set_client_listener(int listen_socket)
INT ss_socket_get_peer_name(int sock, std::string *hostp, int *portp)
int ss_file_link_exist(const char *path)
INT ss_mutex_delete(MUTEX_T *mutex)
int ss_socket_wait(int sock, INT millisec)
INT ss_suspend_set_server_acceptions(RPC_SERVER_ACCEPTION_LIST *acceptions)
DWORD ss_settime(DWORD seconds)
char * ss_getpass(const char *prompt)
INT ss_suspend_set_client_connection(RPC_SERVER_CONNECTION *connection)
INT ss_mutex_create(MUTEX_T **mutex, BOOL recursive)
INT ss_shm_open(const char *name, INT size, void **adr, size_t *shm_size, HNDLE *handle, BOOL get_size)
INT recv_string(int sock, char *buffer, DWORD buffer_size, INT millisec)
INT ss_write_tcp(int sock, const char *buffer, size_t buffer_size)
int ss_dir_exist(const char *path)
bool ss_timed_mutex_wait_for_sec(std::timed_mutex &mutex, const char *mutex_name, double timeout_sec)
INT ss_semaphore_release(HNDLE semaphore_handle)
std::string ss_get_cmdline(void)
int ss_file_copy(const char *src, const char *dst, bool append)
INT recv_tcp(int sock, char *net_buffer, DWORD buffer_size, INT flags)
INT ss_resume(INT port, const char *message)
midas_thread_t ss_gettid(void)
INT ss_semaphore_delete(HNDLE semaphore_handle, INT destroy_flag)
INT ss_sleep(INT millisec)
INT ss_socket_connect_tcp(const char *hostname, int tcp_port, int *sockp, std::string *error_msg_p)
INT ss_semaphore_wait_for(HNDLE semaphore_handle, DWORD timeout_millisec)
INT ss_socket_listen_tcp(bool listen_localhost, int tcp_port, int *sockp, int *tcp_port_p, std::string *error_msg_p)
char * ss_crypt(const char *buf, const char *salt)
INT ss_spawnv(INT mode, const char *cmdname, const char *const argv[])
INT ss_suspend_get_buffer_port(midas_thread_t thread_id, INT *port)
INT ss_socket_close(int *sockp)
char * ss_gets(char *string, int size)
INT ss_recv_net_command(int sock, DWORD *routine_id, DWORD *param_size, char **param_ptr, int timeout_ms)
INT ss_shm_close(const char *name, void *adr, size_t shm_size, HNDLE handle, INT destroy_flag)
INT send_tcp(int sock, char *buffer, DWORD buffer_size, INT flags)
void * ss_ctrlc_handler(void(*func)(int))
BOOL ss_pid_exists(int pid)
INT ss_system(const char *command)
INT ss_mutex_wait_for(MUTEX_T *mutex, INT timeout)
INT ss_file_find(const char *path, const char *pattern, char **plist)
static int cm_msg_retrieve1(const char *filename, time_t t, INT n_messages, char **messages, int *length, int *allocated, int *num_messages)
INT cm_msg1(INT message_type, const char *filename, INT line, const char *facility, const char *routine, const char *format,...)
int cm_msg_early_init(void)
INT EXPRT cm_msg_facilities(STRING_LIST *list)
int cm_msg_open_buffer(void)
int cm_msg_close_buffer(void)
static std::mutex gMsgBufMutex
static void add_message(char **messages, int *length, int *allocated, time_t tstamp, const char *new_message)
INT cm_msg_register(EVENT_HANDLER *func)
INT cm_msg_log(INT message_type, const char *facility, const char *message)
INT cm_msg_flush_buffer()
static std::deque< msg_buffer_entry > gMsgBuf
static INT cm_msg_send_event(DWORD ts, INT message_type, const char *send_message)
std::string cm_get_error(INT code)
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
static std::string cm_msg_format(INT message_type, const char *filename, INT line, const char *routine, const char *format, va_list *argptr)
INT cm_msg_retrieve(INT n_message, char *message, INT buf_size)
INT cm_msg_retrieve2(const char *facility, time_t t, INT n_message, char **messages, int *num_messages)
void cm_msg_get_logfile(const char *fac, time_t t, std::string *filename, std::string *linkname, std::string *linktarget)
INT cm_set_msg_print(INT system_mask, INT user_mask, int(*func)(const char *))
struct rpc_server_acception_struct RPC_SERVER_ACCEPTION
BOOL equal_ustring(const char *str1, const char *str2)
INT db_flush_database(HNDLE hDB)
INT db_get_data_index(HNDLE hDB, HNDLE hKey, void *data, INT *buf_size, INT idx, DWORD type)
INT db_delete_key(HNDLE hDB, HNDLE hKey, BOOL follow_links)
INT db_check_client(HNDLE hDB, HNDLE hKeyClient)
INT db_get_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, void *data, INT *buf_size, DWORD type, BOOL create)
INT db_open_record(HNDLE hDB, HNDLE hKey, void *ptr, INT rec_size, WORD access_mode, void(*dispatcher)(INT, INT, void *), void *info)
INT db_open_database(const char *xdatabase_name, INT database_size, HNDLE *hDB, const char *client_name)
void db_cleanup(const char *who, DWORD actual_time, BOOL wrong_interval)
std::string strcomb1(const char **list)
INT db_get_data(HNDLE hDB, HNDLE hKey, void *data, INT *buf_size, DWORD type)
INT db_create_key(HNDLE hDB, HNDLE hKey, const char *key_name, DWORD type)
INT db_set_mode(HNDLE hDB, HNDLE hKey, WORD mode, BOOL recurse)
INT db_get_key(HNDLE hDB, HNDLE hKey, KEY *key)
INT EXPRT db_get_value_string(HNDLE hdb, HNDLE hKeyRoot, const char *key_name, int index, std::string *s, BOOL create, int create_string_length)
INT db_get_watchdog_info(HNDLE hDB, const char *client_name, DWORD *timeout, DWORD *last)
INT db_set_data_index(HNDLE hDB, HNDLE hKey, const void *data, INT data_size, INT idx, DWORD type)
INT db_close_all_records()
INT db_watch(HNDLE hDB, HNDLE hKey, void(*dispatcher)(INT, INT, INT, void *), void *info)
INT db_close_all_databases(void)
INT db_set_data(HNDLE hDB, HNDLE hKey, const void *data, INT buf_size, INT num_values, DWORD type)
INT db_delete(HNDLE hDB, HNDLE hKeyRoot, const char *odb_path)
INT db_sprintf(char *string, const void *data, INT data_size, INT idx, DWORD type)
INT db_update_last_activity(DWORD millitime)
INT db_set_data1(HNDLE hDB, HNDLE hKey, const void *data, INT buf_size, INT num_values, DWORD type)
INT db_set_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, const void *data, INT data_size, INT num_values, DWORD type)
INT db_find_key(HNDLE hDB, HNDLE hKey, const char *key_name, HNDLE *subhKey)
INT db_update_record_local(INT hDB, INT hKeyRoot, INT hKey, int index)
void db_set_watchdog_params(DWORD timeout)
INT db_update_record_mserver(INT hDB, INT hKeyRoot, INT hKey, int index, int client_socket)
void db_cleanup2(const char *client_name, int ignore_timeout, DWORD actual_time, const char *who)
int db_delete_client_info(HNDLE hDB, int pid)
static DATABASE * db_lock_database(HNDLE hDB, int *pstatus, const char *caller, bool check_attached=true)
INT db_set_client_name(HNDLE hDB, const char *client_name)
INT db_notify_clients_array(HNDLE hDB, HNDLE hKeys[], INT size)
INT db_set_record(HNDLE hDB, HNDLE hKey, void *data, INT buf_size, INT align)
INT db_enum_key(HNDLE hDB, HNDLE hKey, INT idx, HNDLE *subkey_handle)
static void db_unlock_database(DATABASE *pdb, const char *caller)
INT db_create_record(HNDLE hDB, HNDLE hKey, const char *orig_key_name, const char *init_str)
INT EXPRT db_set_value_string(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, const std::string *s)
INT db_set_lock_timeout(HNDLE hDB, int timeout_millisec)
INT db_set_num_values(HNDLE hDB, HNDLE hKey, INT num_values)
INT db_protect_database(HNDLE hDB)
int rb_get_rp(int handle, void **p, int millisec)
int rb_delete(int handle)
int rb_get_wp(int handle, void **p, int millisec)
int rb_increment_rp(int handle, int size)
static volatile int _rb_nonblocking
int rb_increment_wp(int handle, int size)
int rb_create(int size, int max_event_size, int *handle)
int rb_get_buffer_level(int handle, int *n_bytes)
static RING_BUFFER rb[MAX_RING_BUFFER]
INT rpc_add_allowed_host(const char *hostname)
void rpc_convert_data(void *data, INT tid, INT flags, INT total_size, INT convert_flags)
INT rpc_client_connect(const char *host_name, INT port, const char *client_name, HNDLE *hConnection)
#define RPC_BM_ADD_EVENT_REQUEST
INT rpc_register_server(int port, int *plsock, int *pport)
#define RPC_CM_CHECK_CLIENT
static int recv_event_server_realloc(INT idx, RPC_SERVER_ACCEPTION *psa, char **pbuffer, int *pbuffer_size)
INT rpc_get_opt_tcp_size()
INT rpc_client_disconnect(HNDLE hConn, BOOL bShutdown)
#define RPC_BM_SEND_EVENT
INT rpc_client_call(HNDLE hConn, DWORD routine_id,...)
INT rpc_register_functions(const RPC_LIST *new_list, RPC_HANDLER func)
static std::atomic_bool gAllowedHostsEnabled(false)
INT rpc_server_callback(struct callback_addr *pcallback)
static std::mutex _client_connections_mutex
INT rpc_set_timeout(HNDLE hConn, int timeout_msec, int *old_timeout_msec)
#define RPC_CM_SYNCHRONIZE
static std::mutex gAllowedHostsMutex
INT recv_tcp_check(int sock)
INT rpc_server_receive_rpc(int idx, RPC_SERVER_ACCEPTION *sa)
#define RPC_BM_GET_BUFFER_INFO
#define RPC_CM_SET_CLIENT_INFO
const char * rpc_get_mserver_path()
static RPC_SERVER_ACCEPTION * rpc_get_server_acception(int idx)
RPC_SERVER_ACCEPTION * rpc_get_mserver_acception()
void rpc_calc_convert_flags(INT hw_type, INT remote_hw_type, INT *convert_flags)
INT rpc_server_connect(const char *host_name, const char *exp_name)
std::string rpc_get_name()
static RPC_SERVER_ACCEPTION * rpc_new_server_acception()
#define RPC_BM_REMOVE_EVENT_REQUEST
void rpc_debug_printf(const char *format,...)
const char * rpc_tid_name_old(INT id)
int cm_query_transition(int *transition, int *run_number, int *trans_time)
#define RPC_RC_TRANSITION
void rpc_va_arg(va_list *arg_ptr, INT arg_type, void *arg)
INT rpc_server_loop(void)
INT rpc_clear_allowed_hosts()
std::string rpc_get_mserver_hostname(void)
INT rpc_deregister_functions()
bool rpc_is_connected(void)
INT rpc_set_mserver_path(const char *path)
static std::vector< RPC_LIST > rpc_list
#define RPC_BM_CLOSE_BUFFER
static TLS_POINTER * tls_buffer
#define RPC_BM_SET_CACHE_SIZE
static std::vector< RPC_CLIENT_CONNECTION * > _client_connections
static void rpc_call_encode(va_list &ap, const RPC_LIST &rl, NET_COMMAND **nc)
static RPC_CLIENT_CONNECTION * rpc_get_locked_client_connection(HNDLE hConn)
static std::vector< std::string > gAllowedHosts
INT rpc_call(DWORD routine_id,...)
static std::mutex rpc_list_mutex
const char * rpc_tid_name(INT id)
INT rpc_register_client(const char *name, RPC_LIST *list)
static std::vector< RPC_SERVER_ACCEPTION * > _server_acceptions
static INT rpc_socket_check_allowed_host(int sock)
#define RPC_BM_CLOSE_ALL_BUFFERS
INT rpc_server_shutdown(void)
int rpc_flush_event_socket(int timeout_msec)
INT rpc_send_event(INT buffer_handle, const EVENT_HEADER *pevent, int unused, INT async_flag, INT mode)
static TR_FIFO _tr_fifo[10]
static std::string _mserver_path
INT rpc_get_timeout(HNDLE hConn)
INT rpc_server_disconnect()
INT rpc_set_debug(void(*func)(const char *), INT mode)
INT rpc_client_accept(int lsock)
void rpc_vax2ieee_float(float *var)
INT rpc_execute(INT sock, char *buffer, INT convert_flags)
INT rpc_set_opt_tcp_size(INT tcp_size)
#define RPC_BM_GET_BUFFER_LEVEL
int rpc_name_tid(const char *name)
INT rpc_register_listener(int port, RPC_HANDLER func, int *plsock, int *pport)
INT rpc_server_receive_event(int idx, RPC_SERVER_ACCEPTION *sa, int timeout_msec)
#define RPC_BM_OPEN_BUFFER
#define RPC_BM_EMPTY_BUFFERS
INT rpc_server_accept(int lsock)
static std::mutex _tr_fifo_mutex
bool rpc_is_mserver(void)
static RPC_SERVER_ACCEPTION * _mserver_acception
static int recv_net_command_realloc(INT idx, char **pbuf, int *pbufsize, INT *remaining)
void rpc_ieee2vax_float(float *var)
#define RPC_CM_SET_WATCHDOG_PARAMS
INT rpc_send_event1(INT buffer_handle, const EVENT_HEADER *pevent)
#define RPC_BM_RECEIVE_EVENT
static bool _rpc_is_remote
static int rpc_call_decode(va_list &ap, const RPC_LIST &rl, const char *buf, size_t buf_size)
INT rpc_send_event_sg(INT buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[])
INT rpc_set_name(const char *name)
INT rpc_register_function(INT id, INT(*func)(INT, void **))
#define RPC_CM_MSG_RETRIEVE
#define RPC_BM_SKIP_EVENT
INT rpc_client_dispatch(int sock)
#define RPC_BM_INIT_BUFFER_COUNTERS
static RPC_SERVER_CONNECTION _server_connection
INT rpc_check_channels(void)
static INT rpc_transition_dispatch(INT idx, void *prpc_param[])
static int handle_msg_odb(int n, const NET_COMMAND *nc)
#define RPC_BM_FLUSH_CACHE
void rpc_ieee2vax_double(double *var)
void rpc_vax2ieee_double(double *var)
#define RPC_CM_GET_WATCHDOG_INFO
INT rpc_get_convert_flags(void)
INT rpc_check_allowed_host(const char *hostname)
void rpc_convert_single(void *data, INT tid, INT flags, INT convert_flags)
char exp_name[NAME_LENGTH]
BOOL debug
debug printouts
char host_name[HOST_NAME_LENGTH]
char expt_name[NAME_LENGTH]
char buffer_name[NAME_LENGTH]
static const int tid_size[]
static std::string join(const char *sep, const std::vector< std::string > &v)
static std::vector< TRANS_TABLE > _trans_table
static std::atomic_int _message_mask_system
static int disable_bind_rpc_to_localhost
static std::mutex gBuffersMutex
int(* MessagePrintCallback)(const char *)
std::string cm_transition_name(int transition)
static DBG_MEM_LOC * _mem_loc
static std::vector< BUFFER * > gBuffers
static void(* _debug_print)(const char *)
static std::mutex _trans_table_mutex
static std::string _experiment_name
static std::string _path_name
static std::string _client_name
static std::vector< EventRequest > _request_list
static int _rpc_connect_timeout
static const char * tid_name[]
static const ERROR_TABLE _error_table[]
static INT _watchdog_timeout
INT bm_get_buffer_info(INT buffer_handle, BUFFER_HEADER *buffer_header)
static MUTEX_T * _mutex_rpc
static EVENT_HANDLER * _msg_dispatch
void * dbg_calloc(unsigned int size, unsigned int count, char *file, int line)
static BOOL _rpc_registered
static std::atomic< MessagePrintCallback > _message_print
bool ends_with_char(const std::string &s, char c)
void dbg_free(void *adr, char *file, int line)
static std::atomic_int _message_mask_user
static std::mutex _request_list_mutex
INT bm_get_buffer_level(INT buffer_handle, INT *n_bytes)
static TRANS_TABLE _deferred_trans_table[]
static int _rpc_listen_socket
std::string msprintf(const char *format,...)
void * dbg_malloc(unsigned int size, char *file, int line)
static const char * tid_name_old[]
static std::vector< std::string > split(const char *sep, const std::string &s)
int cm_write_event_to_odb(HNDLE hDB, HNDLE hKey, const EVENT_HEADER *pevent, INT format)
INT bm_init_buffer_counters(INT buffer_handle)
#define DIR_SEPARATOR_STR
#define DEFAULT_WATCHDOG_TIMEOUT
#define DEFAULT_RPC_TIMEOUT
#define PROGRAM_INFO_STR(_name)
#define MIN_WRITE_CACHE_SIZE
#define MAX_WRITE_CACHE_EVENT_SIZE_DIV
#define DEFAULT_MAX_EVENT_SIZE
#define WATCHDOG_INTERVAL
#define MAX_EVENT_REQUESTS
#define BANK_FORMAT_64BIT_ALIGNED
#define BANK_FORMAT_32BIT
std::vector< std::string > STRING_LIST
#define MAX_WRITE_CACHE_SIZE_DIV
#define TRANSITION_ERROR_STRING_LENGTH
#define BANK_FORMAT_VERSION
#define message(type, str)
#define write(n, a, f, d)
static std::string remove(const std::string s, char c)
int gettimeofday(struct timeval *tp, void *tzp)
struct callback_addr callback
EVENT_REQUEST event_request[MAX_EVENT_REQUESTS]
BUFFER_CLIENT client[MAX_CLIENTS]
BUFFER_INFO(BUFFER *pbuf)
int client_count_write_wait[MAX_CLIENTS]
DWORD client_time_write_wait[MAX_CLIENTS]
std::timed_mutex buffer_mutex
std::timed_mutex read_cache_mutex
char client_name[NAME_LENGTH]
std::timed_mutex write_cache_mutex
int client_count_write_wait[MAX_CLIENTS]
BUFFER_HEADER * buffer_header
std::atomic< size_t > read_cache_size
std::atomic< size_t > write_cache_size
char buffer_name[NAME_LENGTH]
std::atomic_bool attached
DWORD client_time_write_wait[MAX_CLIENTS]
EVENT_HANDLER * dispatcher
NET_COMMAND_HEADER header
unsigned int max_event_size
RPC_PARAM param[MAX_RPC_PARAMS]
std::atomic< std::thread * > thread
std::atomic_bool finished
std::vector< int > wait_for_index
std::string waiting_for_client
std::vector< std::unique_ptr< TrClient > > clients
unsigned short host_port1
unsigned short host_port2
unsigned short host_port3
std::vector< exptab_entry > exptab
std::mutex event_sock_mutex
static double fac(double a)
static te_expr * list(state *s)