16#include "git-revision.h"
22#include <sys/resource.h>
62#ifndef DOXYGEN_SHOULD_SKIP_THIS
161extern unsigned _stklen = 60000U;
274 "\"exptab\" file not found and MIDAS_DIR or MIDAS_EXPTAB environment variable is not defined"},
333 f = fopen(
"mem.txt",
"w");
351 memset(adr, 0, size *
count);
369 f = fopen(
"mem.txt",
"w");
381static std::vector<std::string>
split(
const char* sep,
const std::string& s)
383 unsigned sep_len = strlen(sep);
384 std::vector<std::string> v;
385 std::string::size_type pos = 0;
387 std::string::size_type next = s.find(sep, pos);
388 if (next == std::string::npos) {
389 v.push_back(s.substr(pos));
392 v.push_back(s.substr(pos, next-pos));
398static std::string
join(
const char* sep,
const std::vector<std::string>& v)
402 for (
unsigned i=0;
i<v.size();
i++) {
416 return s[s.length()-1] ==
c;
420 assert(format != NULL);
422 va_start(ap, format);
424 size_t size = vsnprintf(
nullptr, 0, format, ap1) + 1;
425 char *buffer = (
char *)malloc(size);
431 vsnprintf(buffer, size, format, ap);
434 std::string s(buffer);
477 return msprintf(
"unlisted status code %d", code);
525 if (pos != std::string::npos) {
537 for (
size_t i = 0;
i < flist.size();
i++) {
538 const char *p = flist[
i].c_str();
539 if (strchr(p,
'_') == NULL && !(p[0] >=
'0' && p[0] <=
'9')) {
540 size_t pos = flist[
i].rfind(
'.');
541 if (pos != std::string::npos) {
542 flist[
i].resize(pos);
544 list->push_back(flist[
i]);
553void cm_msg_get_logfile(
const char *
fac, time_t t, std::string* filename, std::string* linkname, std::string* linktarget) {
563 *filename = std::string(
fac) +
".log";
578 std::string facility;
584 std::string message_format;
586 if (message_format.find(
'%') != std::string::npos) {
593 localtime_r(&t, &tms);
597 strftime(
de + 1,
sizeof(
de)-1, strchr(message_format.c_str(),
'%'), &tms);
601 std::string message_dir;
603 if (message_dir.empty()) {
605 if (message_dir.empty()) {
607 if (message_dir.empty()) {
621 *filename = message_dir + facility + message_format +
".log";
622 if (!message_format.empty()) {
624 *linkname = message_dir + facility +
".log";
626 *linktarget = facility + message_format +
".log";
685 fprintf(stderr,
"cm_msg_log: Message \"%s\" not written to midas.log because rpc_call(RPC_CM_MSG_LOG) failed with status %d\n",
message,
status);
689 fprintf(stderr,
"cm_msg_log: Message \"%s\" not written to midas.log, no connection to mserver\n",
message);
695 std::string filename, linkname, linktarget;
700 if (!linkname.empty()) {
708 unlink(linkname.c_str());
709 status = symlink(linktarget.c_str(), linkname.c_str());
712 "cm_msg_log: Error: Cannot symlink message log file \'%s' to \'%s\', symlink() errno: %d (%s)\n",
713 linktarget.c_str(), linkname.c_str(), errno, strerror(errno));
718 int fh = open(filename.c_str(), O_WRONLY | O_CREAT | O_APPEND |
O_LARGEFILE, 0644);
721 "cm_msg_log: Message \"%s\" not written to midas.log because open(%s) failed with errno %d (%s)\n",
722 message, filename.c_str(), errno, strerror(errno));
730 localtime_r(&
tv.tv_sec, &tms);
733 strftime(
str,
sizeof(
str),
"%H:%M:%S", &tms);
734 sprintf(
str + strlen(
str),
".%03d ", (
int) (
tv.tv_usec / 1000));
735 strftime(
str + strlen(
str),
sizeof(
str),
"%G/%m/%d", &tms);
746 ssize_t len = msg.length();
749 ssize_t wr =
write(fh, msg.c_str(), len);
752 fprintf(stderr,
"cm_msg_log: Message \"%s\" not written to \"%s\", write() error, errno %d (%s)\n",
message, filename.c_str(), errno, strerror(errno));
753 }
else if (wr != len) {
754 fprintf(stderr,
"cm_msg_log: Message \"%s\" not written to \"%s\", short write() wrote %d instead of %d bytes\n",
message, filename.c_str(), (
int)wr, (
int)len);
765static std::string
cm_msg_format(
INT message_type,
const char *filename,
INT line,
const char *routine,
const char *format, va_list *argptr)
768 const char* pc = filename + strlen(filename);
769 while (*pc !=
'\\' && *pc !=
'/' && pc != filename)
775 std::string type_str;
784 if (message_type &
MT_LOG)
796 if (
name.length() > 0)
804 message +=
msprintf(
"[%s:%d:%s,%s] ", pc, line, routine, type_str.c_str());
805 }
else if (message_type ==
MT_USER) {
810 char* buf = (
char*)malloc(bufsize);
813 for (
int i=0;
i<10;
i++) {
815 va_copy(ap, *argptr);
818 int n = vsnprintf(buf, bufsize-1, format, ap);
830 buf = (
char*)realloc(buf, bufsize);
844 if (message_type !=
MT_LOG) {
847 size_t len = strlen(send_message);
849 char event[event_length];
852 memcpy(event +
sizeof(
EVENT_HEADER), send_message, len + 1);
886 for (
i = 0;
i < 100;
i++) {
931INT cm_msg(
INT message_type,
const char *filename,
INT line,
const char *routine,
const char *format, ...)
938 va_start(argptr, format);
947 if (message_type !=
MT_LOG) {
990 const char *facility,
const char *routine,
const char *format, ...) {
1002 va_start(argptr, format);
1081static void add_message(
char **messages,
int *length,
int *allocated, time_t tstamp,
const char *new_message) {
1082 int new_message_length = strlen(new_message);
1083 int new_allocated = 1024 + 2 * ((*allocated) + new_message_length);
1089 if (*length + new_message_length + 100 > *allocated) {
1090 *messages = (
char *) realloc(*messages, new_allocated);
1091 assert(*messages != NULL);
1092 *allocated = new_allocated;
1096 if ((*messages)[(*length) - 1] !=
'\n') {
1097 (*messages)[*length] =
'\n';
1101 sprintf(buf,
"%ld ", tstamp);
1102 buf_length = strlen(buf);
1103 memcpy(&((*messages)[*length]), buf, buf_length);
1104 (*length) += buf_length;
1106 memcpy(&((*messages)[*length]), new_message, new_message_length);
1107 (*length) += new_message_length;
1108 (*messages)[*length] = 0;
1112static int cm_msg_retrieve1(
const char *filename, time_t t,
INT n_messages,
char **messages,
int *length,
int *allocated,
1113 int *num_messages) {
1117 struct stat stat_buf;
1118 time_t tstamp, tstamp_valid, tstamp_last;
1124 fh = open(filename, O_RDONLY |
O_TEXT, 0644);
1126 cm_msg(
MERROR,
"cm_msg_retrieve1",
"Cannot open log file \"%s\", errno %d (%s)", filename, errno,
1132 fstat(fh, &stat_buf);
1133 ssize_t size = stat_buf.st_size;
1136 ssize_t maxsize = 10 * 1024 * 1024;
1137 if (size > maxsize) {
1138 lseek(fh, -maxsize, SEEK_END);
1143 char *buffer = (
char *) malloc(size + 1);
1145 if (buffer == NULL) {
1146 cm_msg(
MERROR,
"cm_msg_retrieve1",
"Cannot malloc %d bytes to read log file \"%s\", errno %d (%s)", (
int) size,
1147 filename, errno, strerror(errno));
1152 ssize_t rd =
read(fh, buffer, size);
1155 cm_msg(
MERROR,
"cm_msg_retrieve1",
"Cannot read %d bytes from log file \"%s\", read() returned %d, errno %d (%s)",
1156 (
int) size, filename, (
int) rd, errno, strerror(errno));
1164 p = buffer + size - 1;
1165 tstamp_last = tstamp_valid = 0;
1168 while (*p ==
'\n' || *p ==
'\r')
1172 for (
n = 0; !stop && p > buffer;) {
1176 for (
i = 0; p != buffer && (*p !=
'\n' && *p !=
'\r');
i++)
1180 if (
i >= (
int)
sizeof(
str))
1181 i =
sizeof(
str) - 1;
1187 memcpy(
str, p + 1,
i);
1189 if (strchr(
str,
'\n'))
1190 *strchr(
str,
'\n') = 0;
1191 if (strchr(
str,
'\r'))
1192 *strchr(
str,
'\r') = 0;
1193 mstrlcat(
str,
"\n",
sizeof(
str));
1200 localtime_r(&now, &tms);
1202 if (
str[0] >=
'0' &&
str[0] <=
'9') {
1204 tms.tm_hour = atoi(
str);
1205 tms.tm_min = atoi(
str + 3);
1206 tms.tm_sec = atoi(
str + 6);
1207 tms.tm_year = atoi(
str + 13) - 1900;
1208 tms.tm_mon = atoi(
str + 18) - 1;
1209 tms.tm_mday = atoi(
str + 21);
1212 tms.tm_hour = atoi(
str + 11);
1213 tms.tm_min = atoi(
str + 14);
1214 tms.tm_sec = atoi(
str + 17);
1215 tms.tm_year = atoi(
str + 20) - 1900;
1216 for (
i = 0;
i < 12;
i++)
1220 tms.tm_mday = atoi(
str + 8);
1224 tstamp_valid = tstamp;
1227 if (n_messages == 0) {
1228 if (tstamp_valid < t)
1233 if (n_messages != 0) {
1234 if (tstamp_last > 0 && tstamp_valid < tstamp_last)
1238 if (t == 0 || tstamp == -1 ||
1239 (n_messages > 0 && tstamp <= t) ||
1240 (n_messages == 0 && tstamp >= t)) {
1247 while (*p ==
'\n' || *p ==
'\r')
1250 if (n_messages == 1)
1252 else if (n_messages > 1) {
1254 if (
n == n_messages)
1255 tstamp_last = tstamp_valid;
1258 if (
n == n_messages && tstamp_valid == 0)
1281 std::string filename, linkname;
1293 if (!linkname.empty()) {
1295 filename = linkname;
1299 cm_msg_retrieve1(filename.c_str(), t, n_message, messages, &length, &allocated, &
n);
1305 if (linkname.empty()) {
1313 while (
n < n_message) {
1314 filedate -= 3600 * 24;
1321 cm_msg_retrieve1(filename.c_str(), t, n_message -
n, messages, &length, &allocated, &
i);
1352 char *messages = NULL;
1353 int num_messages = 0;
1361 mstrlcpy(
message, messages, buf_size);
1362 int len = strlen(messages);
1398 if (seconds != NULL) {
1501 return GIT_REVISION;
1515 assert(path[0] != 0);
1538 assert(path_size !=
sizeof(
char *));
1542 mstrlcpy(path,
_path_name.c_str(), path_size);
1562 assert(path != NULL);
1608#ifdef LOCAL_ROUTINES
1635 if (getenv(
"MIDAS_DIR")) {
1640 if (getenv(
"MIDAS_EXPT_NAME")) {
1641 e.
name = getenv(
"MIDAS_EXPT_NAME");
1644 cm_msg(
MERROR,
"cm_read_exptab",
"Experiments that use MIDAS_DIR must also set MIDAS_EXPT_NAME to the name of the experiment! Using experiment name \"%s\"",
e.name.c_str());
1647 e.directory = getenv(
"MIDAS_DIR");
1656#if defined (OS_WINNT)
1658 if (getenv(
"SystemRoot"))
1659 str = getenv(
"SystemRoot");
1660 else if (getenv(
"windir"))
1661 str = getenv(
"windir");
1665 std::string alt_str =
str;
1666 str +=
"\\system32\\exptab";
1667 alt_str +=
"\\system\\exptab";
1668#elif defined (OS_UNIX)
1669 std::string
str =
"/etc/exptab";
1670 std::string alt_str =
"/exptab";
1672 std::strint
str =
"exptab";
1673 std::string alt_str =
"exptab";
1677 if (getenv(
"MIDAS_EXPTAB")) {
1678 str = getenv(
"MIDAS_EXPTAB");
1679 alt_str = getenv(
"MIDAS_EXPTAB");
1685 FILE* f = fopen(
str.c_str(),
"r");
1687 f = fopen(alt_str.c_str(),
"r");
1696 memset(buf, 0,
sizeof(buf));
1697 char*
str = fgets(buf,
sizeof(buf)-1, f);
1700 if (
str[0] == 0)
continue;
1701 if (
str[0] ==
'#')
continue;
1709 while (*
str && isspace(*
str))
1715 while (*p2 && !isspace(*p2))
1718 ssize_t len = p2-p1;
1725 e.
name = std::string(p1, len);
1733 while (*
str && isspace(*
str))
1739 while (*p2 && !isspace(*p2))
1749 e.directory = std::string(p1, len);
1757 while (*
str && isspace(*
str))
1763 while (*p2 && !isspace(*p2))
1770 e.user = std::string(p1, len);
1784 for (
unsigned j=0;
j<exptab->
exptab.size();
j++) {
1785 cm_msg(
MINFO,
"cm_read_exptab",
"entry %d, experiment \"%s\", directory \"%s\", user \"%s\"",
j, exptab->
exptab[
j].name.c_str(), exptab->
exptab[
j].directory.c_str(), exptab->
exptab[
j].user.c_str());
1846int cm_get_exptab(
const char *expname,
char *dir,
int dir_size,
char *user,
int user_size) {
1847 std::string sdir, suser;
1851 mstrlcpy(dir, sdir.c_str(), dir_size);
1853 mstrlcpy(user, suser.c_str(), user_size);
1889#ifdef LOCAL_ROUTINES
1910 char *client_name,
INT hw_type,
const char *password,
DWORD watchdog_timeout) {
1913 host_name, client_name, hw_type, password, watchdog_timeout);
1915#ifdef LOCAL_ROUTINES
1920 BOOL call_watchdog, allow;
1942 if (!allow && strcmp(password,
pwd) != 0) {
1955 sprintf(
str,
"System/Clients/%0d", pid);
1965 strcpy(
name, client_name);
1966 strcpy(orig_name, client_name);
1989 sprintf(
name,
"%s%d", client_name, idx);
1996 sprintf(
str,
"System/Clients/%0d/Name", pid);
2000 cm_msg(
MERROR,
"cm_set_client_info",
"cannot set client name, db_set_value(%s) status %d",
str,
status);
2005 strcpy(client_name,
name);
2012 sprintf(
str,
"System/Clients/%0d", pid);
2041 size =
sizeof(watchdog_timeout);
2042 sprintf(
str,
"/Programs/%s/Watchdog Timeout", orig_name);
2046 sprintf(
str,
"/Programs/%s", orig_name);
2156 if (
host_name && getenv(
"MIDAS_SERVER_HOST"))
2157 mstrlcpy(
host_name, getenv(
"MIDAS_SERVER_HOST"), host_name_size);
2159 if (
exp_name && getenv(
"MIDAS_EXPT_NAME"))
2160 mstrlcpy(
exp_name, getenv(
"MIDAS_EXPT_NAME"), exp_name_size);
2171 if (
host_name && getenv(
"MIDAS_SERVER_HOST"))
2172 *
host_name = getenv(
"MIDAS_SERVER_HOST");
2174 if (
exp_name && getenv(
"MIDAS_EXPT_NAME"))
2175 *
exp_name = getenv(
"MIDAS_EXPT_NAME");
2180#ifdef LOCAL_ROUTINES
2184 std::string exp_name1;
2194 std::string expdir, expuser;
2204 cm_msg(
MERROR,
"cm_set_experiment_local",
"Experiment \"%s\" directory \"%s\" does not exist", exp_name1.c_str(), expdir.c_str());
2219 cm_msg(
MERROR,
"cm_check_connect",
"cm_disconnect_experiment not called at end of program");
2314 const char *client_name,
void (*func)(
char *),
INT odb_size,
DWORD watchdog_timeout) {
2342 if (WSAStartup(MAKEWORD(1, 1), &WSAData) != 0)
2347 std::string default_exp_name1;
2348 if (default_exp_name)
2349 default_exp_name1 = default_exp_name;
2353 if (default_exp_name1.length() == 0) {
2372#ifdef LOCAL_ROUTINES
2381 INT semaphore_elog, semaphore_alarm, semaphore_history, semaphore_msg;
2386 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create alarm semaphore");
2391 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create elog semaphore");
2396 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create history semaphore");
2401 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create message semaphore");
2420 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot open database, db_open_database() status %d",
status);
2428 size =
sizeof(odb_timeout);
2431 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot get ODB /Experiment/ODB timeout, status %d",
status);
2434 if (odb_timeout > 0) {
2439 size =
sizeof(protect_odb);
2442 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot get ODB /Experiment/Protect ODB, status %d",
status);
2450 size =
sizeof(enable_core_dumps);
2453 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot get ODB /Experiment/Enable core dumps, status %d",
status);
2456 if (enable_core_dumps) {
2458 struct rlimit limit;
2459 limit.rlim_cur = RLIM_INFINITY;
2460 limit.rlim_max = RLIM_INFINITY;
2461 status = setrlimit(RLIMIT_CORE, &limit);
2463 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot setrlimit(RLIMIT_CORE, RLIM_INFINITY), errno %d (%s)", errno,
2467#warning setrlimit(RLIMIT_CORE) is not available
2476 "cannot get ODB /Experiment/Security/Enable non-localhost RPC, status %d",
status);
2479 std::string local_host_name;
2483 local_host_name =
"localhost";
2488 if (watchdog_timeout == 0)
2491 strcpy(client_name1, client_name);
2528 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot open message buffer, cm_msg_open_buffer() status %d",
status);
2536 std::string current_name;
2538 if (current_name.length() == 0 || current_name ==
"Default") {
2552 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot register RPC server, cm_register_server() status %d",
status);
2560 size =
sizeof(watchdog_timeout);
2561 sprintf(
str,
"/Programs/%s/Watchdog Timeout", client_name);
2567 std::string path =
"/Programs/" + std::string(client_name);
2570 prog[
"Start command"] == std::string(
""))
2577 cm_msg(
MLOG,
"cm_connect_experiment",
"Program %s on host %s started", xclient_name.c_str(), local_host_name.c_str());
2594#ifdef LOCAL_ROUTINES
2603 assert(exp_names != NULL);
2635 assert(exp_names != NULL);
2639 mstrlcpy(hname,
host_name,
sizeof(hname));
2640 s = strchr(hname,
':');
2643 port = strtoul(s + 1, NULL, 0);
2651 cm_msg(
MERROR,
"cm_list_experiments_remote",
"Cannot connect to \"%s\" port %d: %s", hname, port, errmsg.c_str());
2656 send(sock,
"I", 2, 0);
2669 exp_names->push_back(
str);
2677#ifdef LOCAL_ROUTINES
2697 if (expts.size() == 1) {
2699 }
else if (expts.size() > 1) {
2700 printf(
"Available experiments on local computer:\n");
2702 for (
unsigned i = 0;
i < expts.size();
i++) {
2703 printf(
"%d : %s\n",
i, expts[
i].c_str());
2707 printf(
"Select number from 0 to %d: ", ((
int)expts.size())-1);
2710 int isel = atoi(
str);
2713 if (isel >= (
int)expts.size())
2746 if (expts.size() > 1) {
2747 printf(
"Available experiments on server %s:\n",
host_name);
2749 for (
unsigned i = 0;
i < expts.size();
i++) {
2750 printf(
"%d : %s\n",
i, expts[
i].c_str());
2754 printf(
"Select number from 0 to %d: ", ((
int)expts.size())-1);
2757 int isel = atoi(
str);
2760 if (isel >= (
int)expts.size())
2815 length =
sizeof(
INT);
2870 printf(
"Waiting for transition to finish...\n");
2882 std::string local_host_name;
2885 local_host_name =
"localhost";
2893 cm_msg(
MLOG,
"cm_disconnect_experiment",
"Program %s on host %s stopped", client_name.c_str(), local_host_name.c_str());
2971#ifndef DOXYGEN_SHOULD_SKIP_THIS
3032 if (hKeyClient != NULL)
3039 if (hKeyClient != NULL)
3046#ifndef DOXYGEN_SHOULD_SKIP_THIS
3070 if (semaphore_alarm)
3074 if (semaphore_history)
3079 *semaphore_msg = -1;
3087#ifdef LOCAL_ROUTINES
3103 assert(pbuf != NULL);
3107 printf(
"lock_buffer_guard(%s) ctor without lock\n",
fBuf->
buffer_name);
3126 printf(
"lock_buffer_guard(invalid) dtor\n");
3128 assert(
fBuf != NULL);
3146 assert(
fBuf != NULL);
3157 assert(
fBuf != NULL);
3174 assert(
fBuf != NULL);
3262#ifdef LOCAL_ROUTINES
3265 std::vector<BUFFER*> mybuffers;
3272 for (
BUFFER* pbuf : mybuffers) {
3274 if (!pbuf || !pbuf->attached)
3343 *call_watchdog =
FALSE;
3364#ifdef LOCAL_ROUTINES
3373#ifndef DOXYGEN_SHOULD_SKIP_THIS
3396 str = (
char *) malloc(max_size);
3400 int size = max_size;
3405 if (strlen(
str) < 1)
3417 int new_size = last + 10;
3421 "Cannot resize the RPC hosts access control list, db_set_num_values(%d) status %d", new_size,
status);
3434 strcpy(buf,
"localhost");
3440 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot create the RPC hosts access control list, db_get_value() status %d",
3450 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot create \"Disable RPC hosts check\", db_get_value() status %d",
status);
3460 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot find the RPC hosts access control list, db_find_key() status %d",
3470 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot watch the RPC hosts access control list, db_watch() status %d",
status);
3505 size =
sizeof(
name);
3509 cm_msg(
MERROR,
"cm_register_server",
"cannot get client name, db_get_value() status %d",
status);
3513 mstrlcpy(
str,
"/Experiment/Security/RPC ports/",
sizeof(
str));
3516 size =
sizeof(port);
3520 cm_msg(
MERROR,
"cm_register_server",
"cannot get RPC port number, db_get_value(%s) status %d",
str,
status);
3528 cm_msg(
MERROR,
"cm_register_server",
"error, rpc_register_server(port=%d) status %d", port,
status);
3541 cm_msg(
MERROR,
"cm_register_server",
"error, db_find_key(\"Server Port\") status %d",
status);
3551 cm_msg(
MERROR,
"cm_register_server",
"error, db_set_data(\"Server Port\"=%d) status %d", port,
status);
3772 }
else if (
count > 1) {
3819 "Cannot set client run state, client hKey %d into /System/Clients is not valid, maybe this client was removed by a watchdog timeout",
3840#ifndef DOXYGEN_SHOULD_SKIP_THIS
3863 char tr_key_name[256];
3895 cm_msg(
MERROR,
"cm_register_deferred_transition",
"Cannot hotlink /Runinfo/Requested Transition");
3930 cm_msg(
MERROR,
"cm_check_deferred_transition",
"Cannot perform deferred transition: %s",
str);
3946#ifndef DOXYGEN_SHOULD_SKIP_THIS
3992 printf(
", wait for:");
4000static bool tr_compare(
const std::unique_ptr<TrClient>& arg1,
const std::unique_ptr<TrClient>& arg2) {
4001 return arg1->sequence_number < arg2->sequence_number;
4031 const char *buf =
"Success";
4035 sprintf(buf,
"status %d",
status);
4109 const char *args[100];
4111 char debug_arg[256];
4112 char start_arg[256];
4114 std::string mserver_hostname;
4120 const char *midassys = getenv(
"MIDASSYS");
4127 path +=
"mtransition";
4129 args[iarg++] = path.c_str();
4134 args[iarg++] =
"-h";
4135 args[iarg++] = mserver_hostname.c_str();
4142 args[iarg++] =
"-e";
4147 args[iarg++] =
"-d";
4149 sprintf(debug_arg,
"%d", debug_flag);
4150 args[iarg++] = debug_arg;
4154 args[iarg++] =
"STOP";
4156 args[iarg++] =
"PAUSE";
4158 args[iarg++] =
"RESUME";
4160 args[iarg++] =
"START";
4163 args[iarg++] = start_arg;
4166 args[iarg++] = NULL;
4169 for (iarg = 0; args[iarg] != NULL; iarg++) {
4170 printf(
"arg[%d] [%s]\n", iarg, args[iarg]);
4177 if (errstr != NULL) {
4178 sprintf(errstr,
"Cannot execute mtransition, ss_spawnv() returned %d",
status);
4193 int connect_timeout = 10000;
4194 int timeout = 120000;
4222 assert(wait_for_index >= 0);
4223 assert(wait_for_index < (
int)s->
clients.size());
4245 if (wait_for == NULL)
4252 printf(
"Client \"%s\" waits for client \"%s\"\n", tr_client->
client_name.c_str(), wait_for->
client_name.c_str());
4259 cm_msg(
MERROR,
"cm_transition_call",
"Client \"%s\" transition %d aborted while waiting for client \"%s\": \"/Runinfo/Transition in progress\" was cleared", tr_client->
client_name.c_str(), tr_client->
transition, wait_for->
client_name.c_str());
4275 printf(
"Connecting to client \"%s\" on host %s...\n", tr_client->
client_name.c_str(), tr_client->
host_name.c_str());
4277 cm_msg(
MINFO,
"cm_transition_call",
"cm_transition_call: Connecting to client \"%s\" on host %s...", tr_client->
client_name.c_str(), tr_client->
host_name.c_str());
4280 size =
sizeof(timeout);
4283 if (connect_timeout < 1000)
4284 connect_timeout = 1000;
4287 size =
sizeof(timeout);
4312 "cannot connect to client \"%s\" on host %s, port %d, status %d",
4336 printf(
"Connection established to client \"%s\" on host %s\n", tr_client->
client_name.c_str(), tr_client->
host_name.c_str());
4339 "cm_transition: Connection established to client \"%s\" on host %s",
4351 printf(
"Executing RPC transition client \"%s\" on host %s...\n",
4355 "cm_transition: Executing RPC transition client \"%s\" on host %s...",
4383 printf(
"RPC transition finished client \"%s\" on host \"%s\" in %d ms with status %d\n",
4387 "cm_transition: RPC transition finished client \"%s\" on host \"%s\" in %d ms with status %d",
4407 printf(
"hconn %d cm_transition_call(%s) finished init %d connect %d end %d rpc %d end %d xxx %d end %d\n",
4454 for (
size_t i = 0;
i <
n;
i++) {
4462 printf(
"Calling local transition callback\n");
4464 cm_msg(
MINFO,
"cm_transition_call_direct",
"cm_transition: Calling local transition callback");
4480 printf(
"Local transition callback finished, status %d\n",
int(tr_client->
status));
4482 cm_msg(
MINFO,
"cm_transition_call_direct",
"cm_transition: Local transition callback finished, status %d",
int(tr_client->
status));
4490 return tr_client->
status;
4552 char tr_key_name[256];
4562 errstr_size =
sizeof(xerrstr);
4578 mstrlcpy(errstr,
"Invalid transition request", errstr_size);
4589 sprintf(errstr,
"Start/Stop transition %d already in progress, please try again later\n",
i);
4590 mstrlcat(errstr,
"or set \"/Runinfo/Transition in progress\" manually to zero.\n", errstr_size);
4592 cm_msg(
MERROR,
"cm_transition",
"another transition is already in progress");
4645 cm_msg(
MERROR,
"cm_transition",
"Run start abort due to alarms: %s", alarms.c_str());
4646 mstrlcpy(errstr,
"Cannot start run due to alarms: ", errstr_size);
4647 mstrlcat(errstr, alarms.c_str(), errstr_size);
4658 HNDLE hkeyroot, hkey;
4675 size =
sizeof(program_info_required);
4678 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info required, status %d",
status);
4682 if (program_info_required) {
4687 cm_msg(
MERROR,
"cm_transition",
"Run start abort due to program \"%s\" not running",
key.
name);
4688 std::string serrstr =
msprintf(
"Run start abort due to program \"%s\" not running",
key.
name);
4689 mstrlcpy(errstr, serrstr.c_str(), errstr_size);
4703 mstrlcpy(errstr,
"Unknown error", errstr_size);
4705 if (debug_flag == 0) {
4732 if (debug_flag == 1)
4733 printf(
"Setting run number %d in ODB\n",
run_number);
4734 if (debug_flag == 2)
4739 cm_msg(
MERROR,
"cm_transition",
"cannot set Runinfo/Run number in database, status %d",
status);
4745 if (debug_flag == 1)
4746 printf(
"Clearing /Runinfo/Requested transition\n");
4747 if (debug_flag == 2)
4748 cm_msg(
MINFO,
"cm_transition",
"cm_transition: Clearing /Runinfo/Requested transition");
4756 cm_msg(
MERROR,
"cm_transition",
"cannot find System/Clients entry in database");
4758 mstrlcpy(errstr,
"Cannot find /System/Clients in ODB", errstr_size);
4767 mstrlcpy(errstr,
"Deferred transition already in progress", errstr_size);
4768 mstrlcat(errstr,
", to cancel, set \"/Runinfo/Requested transition\" to zero", errstr_size);
4775 sprintf(tr_key_name,
"Transition %s DEFERRED", trname.c_str());
4784 size =
sizeof(sequence_number);
4793 if (debug_flag == 1)
4794 printf(
"---- Transition %s deferred by client \"%s\" ----\n", trname.c_str(),
str);
4795 if (debug_flag == 2)
4796 cm_msg(
MINFO,
"cm_transition",
"cm_transition: ---- Transition %s deferred by client \"%s\" ----", trname.c_str(),
str);
4798 if (debug_flag == 1)
4799 printf(
"Setting /Runinfo/Requested transition\n");
4800 if (debug_flag == 2)
4801 cm_msg(
MINFO,
"cm_transition",
"cm_transition: Setting /Runinfo/Requested transition");
4812 sprintf(errstr,
"Transition %s deferred by client \"%s\"", trname.c_str(),
str);
4843 size =
sizeof(program_info_auto_start);
4846 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info auto start, status %d",
status);
4850 if (program_info_auto_start) {
4852 start_command[0] = 0;
4854 size =
sizeof(start_command);
4857 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info start command, status %d",
status);
4861 if (start_command[0]) {
4862 cm_msg(
MINFO,
"cm_transition",
"Auto Starting program \"%s\", command \"%s\"",
key.
name,
4897 size =
sizeof(
state);
4903 cm_msg(
MERROR,
"cm_transition",
"cannot get Runinfo/State in database");
4910 cm_msg(
MERROR,
"cm_transition",
"cannot set \"Runinfo/Stop Time binary\" in database");
4917 cm_msg(
MERROR,
"cm_transition",
"cannot set \"Runinfo/Stop Time\" in database");
4923 cm_msg(
MERROR,
"cm_transition",
"cannot find System/Clients entry in database");
4925 mstrlcpy(errstr,
"Cannot find /System/Clients in ODB", errstr_size);
4955 if (debug_flag == 1)
4956 printf(
"---- Transition %s started ----\n", trname.c_str());
4957 if (debug_flag == 2)
4958 cm_msg(
MINFO,
"cm_transition",
"cm_transition: ---- Transition %s started ----", trname.c_str());
4960 sprintf(tr_key_name,
"Transition %s", trname.c_str());
4964 for (
int i = 0,
status = 0;;
i++) {
4981 size =
sizeof(sequence_number);
4990 c->async_flag = async_flag;
4991 c->debug_flag = debug_flag;
4992 c->sequence_number = sequence_number;
4994 c->key_name = subkey.
name;
4998 size =
sizeof(client_name);
5000 c->client_name = client_name;
5013 size =
sizeof(port);
5025 if (cc->
port ==
c->port)
5031 s.
clients.push_back(std::unique_ptr<TrClient>(
c));
5034 cm_msg(
MERROR,
"cm_transition",
"transition %s: client \"%s\" is registered with sequence number %d more than once", trname.c_str(),
c->client_name.c_str(),
c->sequence_number);
5046 for (
size_t idx = 0; idx < s.
clients.size(); idx++) {
5047 if (s.
clients[idx]->sequence_number == 0) {
5052 for (
size_t i = idx - 1; ;
i--) {
5053 if (s.
clients[
i]->sequence_number < s.
clients[idx]->sequence_number) {
5054 if (s.
clients[
i]->sequence_number > 0) {
5055 s.
clients[idx]->wait_for_index.push_back(
i);
5065 for (
size_t idx = 0; idx < s.
clients.size(); idx++) {
5070 for (
size_t idx = 0; idx < s.
clients.size(); idx++) {
5071 printf(
"TrClient[%d]: ",
int(idx));
5079 for (
size_t idx = 0; idx < s.
clients.size(); idx++) {
5080 if (debug_flag == 1)
5081 printf(
"\n==== Found client \"%s\" with sequence number %d\n",
5082 s.
clients[idx]->client_name.c_str(), s.
clients[idx]->sequence_number);
5083 if (debug_flag == 2)
5085 "cm_transition: ==== Found client \"%s\" with sequence number %d",
5086 s.
clients[idx]->client_name.c_str(), s.
clients[idx]->sequence_number);
5090 assert(s.
clients[idx]->thread == NULL);
5093 if (s.
clients[idx]->port == 0) {
5103 cm_msg(
MERROR,
"cm_transition",
"transition %s aborted: client \"%s\" returned status %d", trname.c_str(),
5104 s.
clients[idx]->client_name.c_str(),
int(s.
clients[idx]->status));
5114 for (
size_t idx = 0; idx < s.
clients.size(); idx++) {
5117 s.
clients[idx]->thread->join();
5118 delete s.
clients[idx]->thread;
5119 s.
clients[idx]->thread = NULL;
5130 cm_msg(
MERROR,
"cm_transition",
"transition %s aborted: \"/Runinfo/Transition in progress\" was cleared", trname.c_str());
5133 mstrlcpy(errstr,
"Canceled", errstr_size);
5139 for (
size_t idx = 0; idx < s.
clients.size(); idx++)
5143 mstrlcpy(errstr, s.
clients[idx]->errorstr.c_str(), errstr_size);
5158 if (debug_flag == 1)
5159 printf(
"\n---- Transition %s finished ----\n", trname.c_str());
5160 if (debug_flag == 2)
5161 cm_msg(
MINFO,
"cm_transition",
"cm_transition: ---- Transition %s finished ----", trname.c_str());
5176 size =
sizeof(
state);
5179 cm_msg(
MERROR,
"cm_transition",
"cannot set Runinfo/State in database, db_set_value() status %d",
status);
5227 size =
sizeof(program_info_auto_stop);
5230 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info auto stop, status %d",
status);
5234 if (program_info_auto_stop) {
5248 mstrlcpy(errstr,
"Success", errstr_size);
5262 cm_msg(
MERROR,
"cm_transition",
"Could not start a run: cm_transition() status %d, message \'%s\'",
status,
5306 int sflag = async_flag &
TR_SYNC;
5311 cm_msg(
MERROR,
"cm_transition",
"previous transition did not finish yet");
5326 mstrlcpy(errstr,
"Invalid transition request", errstr_size);
5334 int size =
sizeof(
i);
5338 sprintf(errstr,
"Start/Stop transition %d already in progress, please try again later\n",
i);
5339 mstrlcat(errstr,
"or set \"/Runinfo/Transition in progress\" manually to zero.\n", errstr_size);
5341 cm_msg(
MERROR,
"cm_transition",
"another transition is already in progress");
5400#ifndef DOXYGEN_SHOULD_SKIP_THIS
5428 if (client_socket) {
5442 if (strchr(
str,
' '))
5443 *strchr(
str,
' ') = 0;
5461 printf(
"Received 2nd Ctrl-C, hard abort\n");
5464 printf(
"Received Ctrl-C, aborting...\n");
5524 std::string command;
5529 cm_msg(
MERROR,
"cm_exec_script",
"Script ODB \"%s\" of type TID_STRING, db_get_value_string() error %d",
5530 odb_path_to_script,
status);
5534 for (
int i = 0;;
i++) {
5546 cm_msg(
MERROR,
"cm_exec_script",
"Script ODB \"%s/%s\" should not be TID_KEY", odb_path_to_script,
5551 char *buf = (
char *) malloc(size);
5552 assert(buf != NULL);
5555 cm_msg(
MERROR,
"cm_exec_script",
"Script ODB \"%s/%s\" of type %d, db_get_data() error %d",
5569 cm_msg(
MERROR,
"cm_exec_script",
"Script ODB \"%s\" has invalid type %d, should be TID_STRING or TID_KEY",
5570 odb_path_to_script,
key.
type);
5576 if (command.length() > 0) {
5577 cm_msg(
MINFO,
"cm_exec_script",
"Executing script \"%s\" from ODB \"%s\"", command.c_str(), odb_path_to_script);
5598 static DWORD alarm_last_checked_sec = 0;
5602 static DWORD last_millitime = 0;
5603 DWORD tdiff_millitime = now_millitime - last_millitime;
5604 const DWORD kPeriod = 1000;
5605 if (last_millitime == 0) {
5606 last_millitime = now_millitime;
5607 tdiff_millitime = kPeriod;
5617 if (now_sec - alarm_last_checked_sec > 10) {
5619 alarm_last_checked_sec = now_sec;
5624 if (tdiff_millitime >= kPeriod) {
5626 if (tdiff_millitime > 60000)
5627 wrong_interval =
TRUE;
5631 bm_cleanup(
"cm_periodic_tasks", now_millitime, wrong_interval);
5632 db_cleanup(
"cm_periodic_tasks", now_millitime, wrong_interval);
5636 last_millitime = now_millitime;
5746 static int check_cm_execute = 1;
5747 static int enable_cm_execute = 0;
5752 if (check_cm_execute) {
5756 check_cm_execute = 0;
5761 size =
sizeof(enable_cm_execute);
5768 if (!enable_cm_execute) {
5770 mstrlcpy(buf, command,
sizeof(buf));
5771 cm_msg(
MERROR,
"cm_execute",
"cm_execute(%s...) is disabled by ODB \"/Experiment/Enable cm_execute\"", buf);
5776 strcpy(
str, command);
5782 fh = open(
str, O_RDONLY, 0644);
5785 n =
read(fh, result, bufsize - 1);
5786 result[
MAX(0,
n)] = 0;
5791 status = system(command);
5805#ifndef DOXYGEN_SHOULD_SKIP_THIS
5841 sprintf(
str,
"RPC/%d",
id);
5869 if (history_channel && (strlen(history_channel) > 0)) {
5871 p +=
"/Logger/History/";
5872 p += history_channel;
5873 p +=
"/History dir";
5934#ifdef LOCAL_ROUTINES
5944 bool badindex =
false;
5945 bool badclient =
false;
5955 if (pclient->
name[0] == 0)
5970 printf(
"bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, badindex %d, pid=%d\n",
5973 }
else if (badclient) {
5974 printf(
"bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, badclient %d\n",
5979 printf(
"bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, goodclient\n",
5986 if (badindex || badclient) {
5987 static int prevent_recursion = 1;
5989 if (prevent_recursion) {
5990 prevent_recursion = 0;
5998 cm_msg(
MERROR,
"bm_validate_client_index",
"Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...");
6007 fprintf(stderr,
"bm_validate_client_index: Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...\n");
6047#ifdef LOCAL_ROUTINES
6073 pbctmp = pheader->
client;
6090 pbclient = pheader->
client;
6094 if (pbclient->
pid) {
6097 "Client \'%s\' on buffer \'%s\' removed by %s because process pid %d does not exist", pbclient->
name,
6098 pheader->
name, who, pbclient->
pid);
6109 printf(
"buffer [%s] client [%-32s] times 0x%08x 0x%08x, diff 0x%08x %5d, timeout %d\n",
6121 cm_msg(
MINFO,
"bm_cleanup",
"Client \'%s\' on buffer \'%s\' removed by %s (idle %1.1lfs, timeout %1.0lfs)",
6122 pbclient->
name, pheader->
name, who,
6138 std::vector<BUFFER*> mybuffers;
6144 for (
BUFFER* pbuf : mybuffers) {
6147 if (pbuf->attached) {
6157 if (pclient->
pid == pid) {
6172#ifdef LOCAL_ROUTINES
6176 std::vector<BUFFER*> mybuffers;
6183 for (
BUFFER* pbuf : mybuffers) {
6186 if (pbuf->attached) {
6198 if (!wrong_interval)
6205#ifdef LOCAL_ROUTINES
6208 if (rp < 0 || rp > pheader->
size) {
6210 "error: buffer \"%s\" is corrupted: rp %d is invalid. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6223 "error: buffer \"%s\" is corrupted: rp %d plus event header point beyond the end of buffer by %d bytes. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6238static FILE* gRpLog = NULL;
6244 if (gRpLog == NULL) {
6245 gRpLog = fopen(
"rp.log",
"a");
6247 if (gRpLog && (total_size < 16)) {
6248 const char *pdata = (
const char *) (pheader + 1);
6249 const DWORD *pevent = (
const DWORD*) (pdata + rp);
6250 fprintf(gRpLog,
"%s: rp %d, total_size %d, at rp 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x\n", pheader->
name, rp, total_size,
6251 pevent[0], pevent[1], pevent[2], pevent[3], pevent[4], pevent[5]);
6257 assert(total_size > 0);
6261 if (rp >= pheader->
size) {
6262 rp -= pheader->
size;
6279 if (pevent->
data_size <= 0 || total_size <= 0 || total_size > pheader->
size) {
6281 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6295 if (rp < pheader->write_pointer) {
6298 remaining = pheader->
size - rp;
6304 if (total_size > remaining) {
6306 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, remaining %d, called from %s",
6327 const char *pdata = (
const char *) (pheader + 1);
6337 "buffer \"%s\" is corrupted: invalid read pointer %d. Size %d, write pointer %d", pheader->
name,
6344 "buffer \"%s\" is corrupted: invalid write pointer %d. Size %d, read pointer %d", pheader->
name,
6350 cm_msg(
MERROR,
"bm_validate_buffer",
"buffer \"%s\" is corrupted: read pointer %d is invalid", pheader->
name,
6359 cm_msg(
MERROR,
"bm_validate_buffer",
"buffer \"%s\" is corrupted: invalid rp %d, last good event at rp %d",
6360 pheader->
name, rp, rp0);
6364 int rp1 =
bm_next_rp(
"bm_validate_buffer_locked", pheader, pdata, rp);
6367 "buffer \"%s\" is corrupted: invalid event at rp %d, last good event at rp %d", pheader->
name, rp, rp0);
6386 get_all = (get_all || xget_all);
6390 int rp =
c->read_pointer;
6394 int rp1 =
bm_next_rp(
"bm_validate_buffer_locked", pheader, pdata, rp);
6397 "buffer \"%s\" is corrupted for client \"%s\" rp %d: invalid event at rp %d, last good event at rp %d",
6398 pheader->
name,
c->name,
c->read_pointer, rp, rp0);
6499 double buf_size = pheader->
size;
6503 double buf_fill = 0;
6504 double buf_cptr = 0;
6505 double buf_cused = 0;
6506 double buf_cused_pct = 0;
6508 if (client_index >= 0 && client_index <= pheader->max_client_index) {
6511 if (buf_wptr == buf_cptr) {
6513 }
else if (buf_wptr > buf_cptr) {
6514 buf_cused = buf_wptr - buf_cptr;
6516 buf_cused = (buf_size - buf_cptr) + buf_wptr;
6519 buf_cused_pct = buf_cused / buf_size * 100.0;
6528 if (buf_wptr == buf_rptr) {
6530 }
else if (buf_wptr > buf_rptr) {
6531 buf_fill = buf_wptr - buf_rptr;
6533 buf_fill = (buf_size - buf_rptr) + buf_wptr;
6536 double buf_fill_pct = buf_fill / buf_size * 100.0;
6586 sprintf(
str,
"writes_blocked_by/%s/count_write_wait", pheader->
client[
i].
name);
6589 sprintf(
str,
"writes_blocked_by/%s/time_write_wait", pheader->
client[
i].
name);
6615 if ((strlen(
buffer_name.c_str()) < 1) || (strlen(client_name.c_str()) < 1)) {
6618 cm_msg(
MERROR,
"bm_write_buffer_statistics_to_odb",
"Invalid empty buffer name \"%s\" or client name \"%s\"",
buffer_name.c_str(), client_name.c_str());
6635 size_t sbuffer_handle = buffer_handle;
6643 if (buffer_handle >=1 && sbuffer_handle <= nbuf) {
6649 if (sbuffer_handle > nbuf || buffer_handle <= 0) {
6651 cm_msg(
MERROR, who,
"invalid buffer handle %d: out of range [1..%d]", buffer_handle, (
int)nbuf);
6659 cm_msg(
MERROR, who,
"invalid buffer handle %d: empty slot", buffer_handle);
6667 cm_msg(
MERROR, who,
"invalid buffer handle %d: not attached", buffer_handle);
6743 int size =
sizeof(
INT);
6747 cm_msg(
MERROR,
"bm_open_buffer",
"Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6754#ifdef LOCAL_ROUTINES
6759 const int max_buffer_size = 2 * 1000 * 1024 * 1024;
6764 cm_msg(
MERROR,
"bm_open_buffer",
"cannot open buffer with zero name");
6781 std::string odb_path;
6782 odb_path +=
"/Experiment/Buffer sizes/";
6785 int size =
sizeof(
INT);
6788 if (buffer_size <= 0 || buffer_size > max_buffer_size) {
6790 "Cannot open buffer \"%s\", invalid buffer size %d in ODB \"%s\", maximum buffer size is %d",
6791 buffer_name, buffer_size, odb_path.c_str(), max_buffer_size);
6801 cm_msg(
MERROR,
"bm_open_buffer",
"Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6811 *buffer_handle =
i + 1;
6820 static std::mutex gNewBufferMutex;
6821 std::lock_guard<std::mutex> guard(gNewBufferMutex);
6831 *buffer_handle =
i + 1;
6904 pheader->
size = buffer_size;
6914 "Buffer \"%s\" is corrupted, mismatch of buffer name in shared memory \"%s\"",
buffer_name,
6925 cm_msg(
MERROR,
"bm_open_buffer",
"Buffer \"%s\" is corrupted, num_clients %d exceeds MAX_CLIENTS %d",
6936 cm_msg(
MERROR,
"bm_open_buffer",
"Buffer \"%s\" is corrupted, max_client_index %d exceeds MAX_CLIENTS %d",
6944 if (pheader->
size != buffer_size) {
6945 cm_msg(
MINFO,
"bm_open_buffer",
"Buffer \"%s\" requested size %d differs from existing size %d",
6948 buffer_size = pheader->
size;
6980 "buffer \'%s\' is corrupted, bm_validate_buffer() status %d, calling bm_reset_buffer()...",
buffer_name,
7019 mstrlcpy(pclient->
name, client_name.c_str(),
sizeof(pclient->
name));
7050 *buffer_handle =
i+1;
7055 *buffer_handle =
gBuffers.size() + 1;
7092 *buffer_handle =
i + 1;
7113#ifdef LOCAL_ROUTINES
7128 std::vector<EventRequest> request_list_copy =
_request_list;
7130 for (
size_t i = 0;
i < request_list_copy.size();
i++) {
7131 if (request_list_copy[
i].buffer_handle == buffer_handle) {
7258#ifdef LOCAL_ROUTINES
7266 for (
size_t i = nbuf;
i > 0;
i--) {
7292#ifdef LOCAL_ROUTINES
7304 std::vector<BUFFER*> mybuffers;
7310 for (
BUFFER* pbuf : mybuffers) {
7311 if (!pbuf || !pbuf->attached)
7330#ifdef LOCAL_ROUTINES
7349 for (
i = 0;
i < 20;
i++) {
7371#ifdef LOCAL_ROUTINES
7386#ifdef LOCAL_ROUTINES
7443 size =
sizeof(client_name);
7449 client_name[strlen(
name)] = 0;
7455 size =
sizeof(port);
7458 size =
sizeof(remote_host);
7468 int client_pid = atoi(
key.
name);
7470 cm_msg(
MERROR,
"cm_shutdown",
"Cannot connect to client \'%s\' on host \'%s\', port %d",
7471 client_name, remote_host, port);
7473 cm_msg(
MERROR,
"cm_shutdown",
"Killing and Deleting client \'%s\' pid %d", client_name,
7475 kill(client_pid, SIGKILL);
7479 cm_msg(
MERROR,
"cm_shutdown",
"Cannot delete client info for client \'%s\', pid %d, status %d",
7494 int client_pid = atoi(
key.
name);
7496 cm_msg(
MERROR,
"cm_shutdown",
"Client \'%s\' not responding to shutdown command", client_name);
7498 cm_msg(
MERROR,
"cm_shutdown",
"Killing and Deleting client \'%s\' pid %d", client_name,
7500 kill(client_pid, SIGKILL);
7504 "Cannot delete client info for client \'%s\', pid %d, status %d",
name, client_pid,
7519 return return_status;
7558 size =
sizeof(client_name);
7572 client_name[strlen(
name)] = 0;
7625#ifdef LOCAL_ROUTINES
7630 std::vector<BUFFER*> mybuffers;
7637 for (
BUFFER* pbuf : mybuffers) {
7640 if (pbuf->attached) {
7656 if (
j != pbuf->client_index && pbclient->
pid &&
7657 (client_name == NULL || client_name[0] == 0
7658 || strncmp(pbclient->
name, client_name, strlen(client_name)) == 0)) {
7672 "Client \'%s\' on \'%s\' removed by cm_cleanup (idle %1.1lfs, timeout %1.0lfs)",
7695 db_cleanup2(client_name, ignore_timeout, now,
"cm_cleanup");
7722 const char *s =
str;
7727 std::string envname;
7734 const char *
e = getenv(envname.c_str());
7755 printf(
"test_expand_env: [%s] -> [%s] expected [%s]",
7759 if (s != expected) {
7760 printf(
", MISMATCH!\n");
7769 printf(
"Test expand_end()\n");
7770 setenv(
"FOO",
"foo", 1);
7771 setenv(
"BAR",
"bar", 1);
7772 setenv(
"EMPTY",
"", 1);
7788 printf(
"test_expand_env: all tests passed!\n");
7790 printf(
"test_expand_env: test FAILED!\n");
7799#ifndef DOXYGEN_SHOULD_SKIP_THIS
7828#ifdef LOCAL_ROUTINES
7872#ifdef LOCAL_ROUTINES
7892 *n_bytes += pheader->
size;
7912#ifdef LOCAL_ROUTINES
7920 fprintf(stderr,
"bm_lock_buffer_read_cache: Error: Cannot lock read cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...\n", pbuf->
buffer_name);
7921 cm_msg(
MERROR,
"bm_lock_buffer_read_cache",
"Cannot lock read cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...", pbuf->
buffer_name);
7928 fprintf(stderr,
"bm_lock_buffer_read_cache: Error: Cannot lock read cache of buffer \"%s\", buffer was closed while we waited for the buffer_mutex\n", pbuf->
buffer_name);
7941 fprintf(stderr,
"bm_lock_buffer_write_cache: Error: Cannot lock write cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...\n", pbuf->
buffer_name);
7942 cm_msg(
MERROR,
"bm_lock_buffer_write_cache",
"Cannot lock write cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...", pbuf->
buffer_name);
7949 fprintf(stderr,
"bm_lock_buffer_write_cache: Error: Cannot lock write cache of buffer \"%s\", buffer was closed while we waited for the buffer_mutex\n", pbuf->
buffer_name);
7964 fprintf(stderr,
"bm_lock_buffer_mutex: Error: Cannot lock buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...\n", pbuf->
buffer_name);
7965 cm_msg(
MERROR,
"bm_lock_buffer_mutex",
"Cannot lock buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...", pbuf->
buffer_name);
7972 fprintf(stderr,
"bm_lock_buffer_mutex: Error: Cannot lock buffer \"%s\", buffer was closed while we waited for the buffer_mutex\n", pbuf->
buffer_name);
8005 fprintf(stderr,
"bm_lock_buffer: Lock buffer \"%s\" is taking longer than 1 second!\n", pbuf->
buffer_name);
8010 fprintf(stderr,
"bm_lock_buffer: Lock buffer \"%s\" is taking longer than 10 seconds, buffer semaphore is probably stuck, delete %s.SHM and try again!\n", pbuf->
buffer_name, pbuf->
buffer_name);
8021 fprintf(stderr,
"bm_lock_buffer: Error: Cannot lock buffer \"%s\", ss_semaphore_wait_for() status %d, aborting...\n", pbuf->
buffer_name,
status);
8059 printf(
"unlock [??????] unused1 ????? pid %d\n", getpid());
8097#ifdef LOCAL_ROUTINES
8157#ifdef LOCAL_ROUTINES
8176 if (write_size > 0) {
8185 if (write_size > max_write_size) {
8186 size_t new_write_size = max_write_size;
8187 cm_msg(
MERROR,
"bm_set_cache_size",
"requested write cache size %zu on buffer \"%s\" is too big: buffer size is %d, write cache size will be %zu bytes", write_size, pbuf->
buffer_name, pbuf->
buffer_header->
size, new_write_size);
8188 write_size = new_write_size;
8206 if (read_size > 0) {
8207 pbuf->
read_cache = (
char *) malloc(read_size);
8213 cm_msg(
MERROR,
"bm_set_cache_size",
"not enough memory to allocate read cache for buffer \"%s\", malloc(%zu) failed", pbuf->
buffer_name, read_size);
8242 if (write_size > 0) {
8249 cm_msg(
MERROR,
"bm_set_cache_size",
"not enough memory to allocate write cache for buffer \"%s\", malloc(%zu) failed", pbuf->
buffer_name, write_size);
8305 static std::mutex mutex;
8312 std::lock_guard<std::mutex> lock(mutex);
8322#ifndef DOXYGEN_SHOULD_SKIP_THIS
8374#ifdef LOCAL_ROUTINES
8390 if (func == NULL && pbuf->
callback) {
8392 cm_msg(
MERROR,
"bm_add_event_request",
"mixing callback/non callback requests not possible");
8399 cm_msg(
MERROR,
"bm_add_event_request",
"GET_RECENT request not possible if read cache is enabled");
8478 INT sampling_type,
HNDLE *request_id,
8481 assert(request_id != NULL);
8533#ifdef LOCAL_ROUTINES
8599 if (request_id < 0 ||
size_t(request_id) >=
_request_list.size()) {
8604 int buffer_handle =
_request_list[request_id].buffer_handle;
8620 pclient = pheader->
client;
8622 printf(
"buffer \'%s\', rptr: %d, wptr: %d, size: %d\n", pheader->
name, pheader->
read_pointer,
8625 if (pclient[
i].pid) {
8626 printf(
"pointers: client %d \'%s\', rptr %d\n",
i, pclient[
i].
name, pclient[
i].read_pointer);
8641 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8650 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, read pointer %d, size %d",
8661 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8670 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8679 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8695 if (pclient[
i].pid) {
8696 bm_validate_client_pointers(pheader, &pclient[
i]);
8732 assert(caller_name);
8744 printf(
"bm_update_read_pointer: [%s] rp %d, wp %d, size %d, min_rp %d, client [%s] rp %d\n",
8775 min_rp += pheader->
size;
8777 assert(min_rp >= 0);
8778 assert(min_rp < pheader->size);
8785 printf(
"bm_update_read_pointer: [%s] rp %d, wp %d, size %d, new_rp %d, moved\n",
8800 int have_get_all_requests = 0;
8807 if (!have_get_all_requests)
8817 if (free_space <= 0)
8818 free_space += pheader->
size;
8820 if (free_space >= pheader->
size * 0.5) {
8840 for (
size_t i = 0;
i <
n;
i++) {
8857 r.
dispatcher(buffer_handle,
i, pevent, (
void *) (pevent + 1));
8864#ifdef LOCAL_ROUTINES
8890 *ptotal_size = total_size;
8919 cm_msg(
MERROR,
"bm_peek_buffer_locked",
"event buffer \"%s\" is corrupted: client \"%s\" read pointer %d is invalid. buffer read pointer %d, write pointer %d, size %d", pheader->
name, pc->
name, pc->
read_pointer, pheader->
read_pointer, pheader->
write_pointer, pheader->
size);
8923 char *pdata = (
char *) (pheader + 1);
8929 if ((total_size <= 0) || (total_size > pheader->
size)) {
8930 cm_msg(
MERROR,
"bm_peek_buffer_locked",
"event buffer \"%s\" is corrupted: client \"%s\" read pointer %d points to invalid event: data_size %d, event_size %d, total_size %d. buffer size: %d, read_pointer: %d, write_pointer: %d", pheader->
name, pc->
name, pc->
read_pointer, pevent->
data_size,
event_size, total_size, pheader->
size, pheader->
read_pointer, pheader->
write_pointer);
8934 assert(total_size > 0);
8935 assert(total_size <= pheader->size);
8942 *ptotal_size = total_size;
8949 const char *pdata = (
const char *) (pheader + 1);
8951 if (rp + event_size <= pheader->size) {
8956 int size = pheader->
size - rp;
8957 memcpy(buf, pdata + rp, size);
8958 memcpy(buf + size, pdata,
event_size - size);
8964 const char *pdata = (
const char *) (pheader + 1);
8966 if (rp + event_size <= pheader->size) {
8968 vecptr->assign(pdata + rp, pdata + rp +
event_size);
8971 int size = pheader->
size - rp;
8972 vecptr->assign(pdata + rp, pdata + rp + size);
8973 vecptr->insert(vecptr->end(), pdata, pdata +
event_size - size);
8983 if (prequest->
valid) {
8993 is_requested =
TRUE;
8998 return is_requested;
9082 if (convert_flags) {
9104 char *pdata = (
char *) (pheader + 1);
9111 requested_space += 100;
9113 if (requested_space >= pheader->
size)
9117 DWORD time_end = time_start + timeout_msec;
9121 int blocking_client_index = -1;
9123 blocking_client_name[0] = 0;
9131 free += pheader->
size;
9135 if (requested_space < free) {
9163 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer is invalid",
9178 printf(
"bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, blocking event size %d/%d\n", pheader->
read_pointer, pheader->
write_pointer, free, pheader->
size, requested_space,
event_size, total_size);
9181 if (pevent->
data_size <= 0 || total_size <= 0 || total_size > pheader->
size) {
9183 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer points to an invalid event: data_size %d, event size %d, total_size %d",
9196 int blocking_client = -1;
9226 blocking_client =
i;
9235 if (blocking_client >= 0) {
9236 blocking_client_index = blocking_client;
9237 mstrlcpy(blocking_client_name, pheader->
client[blocking_client].
name,
sizeof(blocking_client_name));
9254 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer did not move as expected",
9286 int sleep_time_msec = 1000;
9288 if (timeout_msec ==
BM_WAIT) {
9295 if (now >= time_end) {
9300 sleep_time_msec = time_end - now;
9302 if (sleep_time_msec <= 0) {
9303 sleep_time_msec = 10;
9304 }
else if (sleep_time_msec > 1000) {
9305 sleep_time_msec = 1000;
9315 if (unlock_write_cache)
9364 if (unlock_write_cache) {
9373 if (!pbuf_guard.
relock()) {
9374 if (unlock_write_cache) {
9425 DWORD time_wait = time_start + timeout_msec;
9426 DWORD sleep_time = 1000;
9429 }
else if (timeout_msec ==
BM_WAIT) {
9432 if (sleep_time > (
DWORD)timeout_msec)
9433 sleep_time = timeout_msec;
9454 if (unlock_read_cache)
9461 }
else if (timeout_msec ==
BM_WAIT) {
9466 if (now >= time_wait) {
9469 sleep_time = time_wait - now;
9470 if (sleep_time > 1000)
9478 if (unlock_read_cache) {
9486 if (!pbuf_guard.
relock()) {
9487 if (unlock_read_cache) {
9517 char *pdata = (
char *) (pheader + 1);
9525 for (
int i=0;
i<sg_n;
i++) {
9527 memcpy(wptr, sg_ptr[
i], sg_len[
i]);
9555 for (;
i<sg_n;
i++) {
9556 if (
count + sg_len[
i] > size)
9558 memcpy(wptr, sg_ptr[
i], sg_len[
i]);
9567 size_t first = size -
count;
9568 size_t second = sg_len[
i] - first;
9569 assert(first + second == sg_len[
i]);
9570 assert(
count + first == size);
9574 memcpy(wptr, sg_ptr[
i], first);
9577 memcpy(wptr, sg_ptr[
i] + first, second);
9584 for (;
i<sg_n;
i++) {
9585 memcpy(wptr, sg_ptr[
i], sg_len[
i]);
9606 return prequest->
id;
9615 if (request_id >= 0) {
9619 sprintf(
str,
"B %s %d", pheader->
name, request_id);
9636 DWORD time_end = time_start + timeout_msec;
9638 int xtimeout_msec = timeout_msec;
9641 if (timeout_msec ==
BM_WAIT) {
9642 xtimeout_msec = 1000;
9646 if (xtimeout_msec > 1000) {
9647 xtimeout_msec = 1000;
9656 if (timeout_msec ==
BM_WAIT) {
9664 if (now >= time_end) {
9669 DWORD remain = time_end - now;
9671 if (remain < xtimeout_msec) {
9672 xtimeout_msec = remain;
9691 const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16);
9694 if (data_size == 0) {
9695 cm_msg(
MERROR,
"bm_send_event",
"invalid event data size zero");
9699 if (data_size > MAX_DATA_SIZE) {
9700 cm_msg(
MERROR,
"bm_send_event",
"invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
9718 const char* cptr =
event.data();
9719 size_t clen =
event.size();
9723int bm_send_event_vec(
int buffer_handle,
const std::vector<std::vector<char>>& event,
int timeout_msec)
9725 int sg_n =
event.size();
9726 const char* sg_ptr[sg_n];
9727 size_t sg_len[sg_n];
9728 for (
int i=0;
i<sg_n;
i++) {
9729 sg_ptr[
i] =
event[
i].data();
9730 sg_len[
i] =
event[
i].size();
9732 return bm_send_event_sg(buffer_handle, sg_n, sg_ptr, sg_len, timeout_msec);
9735#ifdef LOCAL_ROUTINES
9789int bm_send_event_sg(
int buffer_handle,
int sg_n,
const char*
const sg_ptr[],
const size_t sg_len[],
int timeout_msec)
9795 cm_msg(
MERROR,
"bm_send_event",
"invalid sg_n %d", sg_n);
9799 if (sg_ptr[0] == NULL) {
9800 cm_msg(
MERROR,
"bm_send_event",
"invalid sg_ptr[0] is NULL");
9805 cm_msg(
MERROR,
"bm_send_event",
"invalid sg_len[0] value %d is smaller than event header size %d", (
int)sg_len[0], (
int)
sizeof(
EVENT_HEADER));
9811 const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16);
9814 if (data_size == 0) {
9815 cm_msg(
MERROR,
"bm_send_event",
"invalid event data size zero");
9819 if (data_size > MAX_DATA_SIZE) {
9820 cm_msg(
MERROR,
"bm_send_event",
"invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
9827 for (
int i=0;
i<sg_n;
i++) {
9832 cm_msg(
MERROR,
"bm_send_event",
"data size mismatch: event data_size %d, event_size %d not same as sum of sg_len %d", (
int)data_size, (
int)
event_size, (
int)
count);
9838#ifdef LOCAL_ROUTINES
9887 cm_msg(
MERROR,
"bm_send_event",
"write cache size is bigger than buffer size");
9896 if (!too_big && pbuf->
write_cache_wp + total_size <= pbuf->write_cache_size) {
9901 for (
int i=0;
i<sg_n;
i++) {
9902 memcpy(wptr, sg_ptr[
i], sg_len[
i]);
9932 printf(
"bm_send_event: corrupted 111!\n");
9938 if (total_size >= (
size_t)pheader->
size) {
9940 cm_msg(
MERROR,
"bm_send_event",
"total event size (%d) larger than size (%d) of buffer \'%s\'", (
int)total_size, pheader->
size, pheader->
name);
9954 printf(
"bm_send_event: corrupted 222!\n");
9983 printf(
"bm_send_event: corrupted 333!\n");
10003 DWORD time_end = time_start + timeout_msec;
10004 DWORD time_bombout = time_end;
10006 if (timeout_msec < 10000)
10007 time_bombout = time_start + 10000;
10009 int xtimeout_msec = timeout_msec;
10012 if (timeout_msec ==
BM_WAIT) {
10013 xtimeout_msec = 1000;
10017 if (xtimeout_msec > 1000) {
10018 xtimeout_msec = 1000;
10027 if (timeout_msec ==
BM_WAIT) {
10029 if (now >= time_bombout) {
10041 if (now >= time_end) {
10046 DWORD remain = time_end - now;
10048 if (remain < (
DWORD)xtimeout_msec) {
10049 xtimeout_msec = remain;
10052 if (now >= time_bombout) {
10088#ifdef LOCAL_ROUTINES
10107 request_id[
i] = -1;
10117 if (ask_rp == ask_wp) {
10121 assert(ask_rp < ask_wp);
10123 size_t ask_free =
ALIGN8(ask_wp - ask_rp);
10125 if (ask_free == 0) {
10132 printf(
"bm_flush_cache: corrupted 111!\n");
10169 printf(
"bm_flush_cache: cache size %d, wp %d, rp %d, event data_size %d, event_size %d, total_size %d, free %d, written %d\n",
10182 assert(total_size <= (
size_t)pheader->
size);
10239#ifdef LOCAL_ROUTINES
10288#ifdef LOCAL_ROUTINES
10295 max_size = *buf_size;
10317 if (!pbuf_guard.
relock()) {
10358 if (convert_flags) {
10361 }
else if (bufptr) {
10365 }
else if (vecptr) {
10367 char* cptr = (
char*)pevent;
10388 if (!pbuf_guard.
relock())
10425 if (is_requested) {
10433 "buffer size %d is smaller than event size %d, event truncated. buffer \"%s\"", max_size,
10445 if (convert_flags) {
10451 }
else if (dispatch || bufptr) {
10457 }
else if (vecptr) {
10520 xbuf_size = *buf_size;
10521 }
else if (ppevent) {
10526 xbuf = pvec->data();
10527 xbuf_size = pvec->size();
10529 assert(!
"incorrect call to bm_receivent_event_rpc()");
10534 DWORD time_end = time_start + timeout_msec;
10536 int xtimeout_msec = timeout_msec;
10538 int zbuf_size = xbuf_size;
10541 if (timeout_msec ==
BM_WAIT) {
10542 xtimeout_msec = 1000;
10546 if (xtimeout_msec > 1000) {
10547 xtimeout_msec = 1000;
10551 zbuf_size = xbuf_size;
10558 if (timeout_msec ==
BM_WAIT) {
10566 if (now >= time_end) {
10571 DWORD remain = time_end - now;
10573 if (remain < (
DWORD)xtimeout_msec) {
10574 xtimeout_msec = remain;
10589 }
else if (ppevent) {
10595 assert(!
"incorrect call to bm_receivent_event_rpc()");
10604 *buf_size = zbuf_size;
10605 }
else if (ppevent) {
10609 pvec->resize(zbuf_size);
10611 assert(!
"incorrect call to bm_receivent_event_rpc()");
10621 std::vector<char> *pv;
10624 pv =
new std::vector<char>;
10632 DWORD time_end = time_start + timeout_msec;
10634 int xtimeout_msec = timeout_msec;
10637 if (timeout_msec ==
BM_WAIT) {
10638 xtimeout_msec = 1000;
10642 if (xtimeout_msec > 1000) {
10643 xtimeout_msec = 1000;
10649 printf(
"bm_receive_event_rpc_cxx: handle %d, timeout %d, status %d, size %zu, via RPC_BM_RECEIVE_EVENT_CXX\n", buffer_handle, xtimeout_msec,
status, pv->size());
10652 if (timeout_msec ==
BM_WAIT) {
10660 if (now >= time_end) {
10665 DWORD remain = time_end - now;
10667 if (remain < (
DWORD)xtimeout_msec) {
10668 xtimeout_msec = remain;
10683 }
else if (ppevent) {
10689 assert(!
"incorrect call to bm_receivent_event_rpc_cxx()");
10701 if (pv->size() > (
size_t)*buf_size) {
10703 memcpy(buf, pv->data(), *buf_size);
10705 *buf_size = pv->size();
10706 memcpy(buf, pv->data(), *buf_size);
10708 }
else if (ppevent) {
10709 if (*ppevent == NULL) {
10711 assert(*ppevent != NULL);
10712 memcpy(*ppevent, pv->data(), pv->size());
10714 *ppevent = (
EVENT_HEADER*)realloc(*ppevent, pv->size());
10715 assert(*ppevent != NULL);
10716 memcpy(*ppevent, pv->data(), pv->size());
10721 assert(!
"incorrect call to bm_receivent_event_rpc()");
10794#ifdef LOCAL_ROUTINES
10874#ifdef LOCAL_ROUTINES
10885 return bm_read_buffer(pbuf, buffer_handle, (
void **) ppevent, NULL, NULL, NULL, timeout_msec, convert_flags,
FALSE);
10952#ifdef LOCAL_ROUTINES
10963 return bm_read_buffer(pbuf, buffer_handle, NULL, NULL, NULL, pvec, timeout_msec, convert_flags,
FALSE);
10970#ifdef LOCAL_ROUTINES
11017#ifdef LOCAL_ROUTINES
11033#ifdef LOCAL_ROUTINES
11059 std::vector<BUFFER*> mybuffers;
11065 for (
size_t i = 0;
i < mybuffers.size();
i++) {
11094#ifdef LOCAL_ROUTINES
11110 std::vector<BUFFER*> mybuffers;
11117 for (
size_t idx = 0; idx < mybuffers.size(); idx++) {
11118 BUFFER* pbuf = mybuffers[idx];
11251 if (convert_flags) {
11286 std::vector<char>
vec;
11290 bool locked =
true;
11292 for (
size_t i = 0;
i <
n;
i++) {
11316 dispatched_something =
TRUE;
11325 cm_msg(
MERROR,
"bm_poll_event",
"received event was truncated, buffer size %d is too small, see messages and increase /Experiment/MAX_EVENT_SIZE in ODB", (
int)
vec.size());
11348 if (dispatched_something)
11383#ifdef LOCAL_ROUTINES
11385 std::vector<BUFFER*> mybuffers;
11392 for (
BUFFER* pbuf : mybuffers) {
11395 if (!pbuf->attached)
11409#ifndef DOXYGEN_SHOULD_SKIP_THIS
11411#define MAX_DEFRAG_EVENTS 10
11467 "Received new event with ID %d while old fragments were not completed",
11468 (pevent->event_id & 0x0FFF));
11478 "Not enough defragment buffers, please increase MAX_DEFRAG_EVENTS and recompile");
11485 "Received first event fragment with %d bytes instead of %d bytes, event ignored",
11498 cm_msg(
MERROR,
"bm_defragement_event",
"Not enough memory to allocate event defragment buffer");
11520 "Received fragment without first fragment (ID %d) Ser#:%d",
11531 "Received fragments with more data (%d) than event size (%d)",
11584 printf(
"index %d, client \"%s\", host \"%s\", port %d, socket %d, connected %d, timeout %d",
11715 *convert_flags = 0;
11743 unsigned short int lo, hi;
11746 lo = *((
short int *) (var) + 1);
11747 hi = *((
short int *) (var));
11753 *((
short int *) (var) + 1) = hi;
11754 *((
short int *) (var)) = lo;
11758 unsigned short int lo, hi;
11761 lo = *((
short int *) (var) + 1);
11762 hi = *((
short int *) (var));
11768 *((
short int *) (var) + 1) = hi;
11769 *((
short int *) (var)) = lo;
11774 unsigned short int i1, i2, i3, i4;
11777 i1 = *((
short int *) (var) + 3);
11778 i2 = *((
short int *) (var) + 2);
11779 i3 = *((
short int *) (var) + 1);
11780 i4 = *((
short int *) (var));
11786 *((
short int *) (var) + 3) = i4;
11787 *((
short int *) (var) + 2) = i3;
11788 *((
short int *) (var) + 1) = i2;
11789 *((
short int *) (var)) = i1;
11793 unsigned short int i1, i2, i3, i4;
11796 i1 = *((
short int *) (var) + 3);
11797 i2 = *((
short int *) (var) + 2);
11798 i3 = *((
short int *) (var) + 1);
11799 i4 = *((
short int *) (var));
11805 *((
short int *) (var) + 3) = i4;
11806 *((
short int *) (var) + 2) = i3;
11807 *((
short int *) (var) + 1) = i2;
11808 *((
short int *) (var)) = i1;
11870 if (single_size == 0)
11873 int n = total_size / single_size;
11875 for (
int i = 0;
i <
n;
i++) {
11876 char* p = (
char *)
data + (
i * single_size);
11899 return "<unknown>";
11906 return "<unknown>";
11960 for (
int i = 0; new_list[
i].
id != 0;
i++) {
11965 cm_msg(
MERROR,
"rpc_register_functions",
"registered RPC function with invalid ID %d", new_list[
i].
id);
11972 for (
int i = 0; new_list[
i].
id != 0;
i++) {
11981 for (
int i = 0; new_list[
i].
id != 0;
i++) {
11985 if (
e.dispatch == NULL) {
11998#ifndef DOXYGEN_SHOULD_SKIP_THIS
12086 char net_buffer[256];
12088 int n =
recv_tcp(sock, net_buffer,
sizeof(net_buffer), 0);
12103 struct timeval timeout;
12110 FD_SET(sock, &readfds);
12112 timeout.tv_sec = 0;
12113 timeout.tv_usec = 0;
12115 select(
FD_SETSIZE, &readfds, NULL, NULL, &timeout);
12117 if (FD_ISSET(sock, &readfds)) {
12118 n =
recv_tcp(sock, net_buffer,
sizeof(net_buffer), 0);
12132 }
while (FD_ISSET(sock, &readfds));
12168 bool debug =
false;
12172 cm_msg(
MERROR,
"rpc_client_connect",
"cm_connect_experiment/rpc_set_name not called");
12178 cm_msg(
MERROR,
"rpc_client_connect",
"invalid port %d", port);
12184 static std::mutex gHostnameMutex;
12190 printf(
"rpc_client_connect: host \"%s\", port %d, client \"%s\"\n",
host_name, port, client_name);
12193 printf(
"client connection %d: ", (
int)
i);
12206 bool hostname_locked =
false;
12211 if (
c &&
c->connected) {
12213 if (!hostname_locked) {
12214 gHostnameMutex.lock();
12215 hostname_locked =
true;
12218 if ((
c->host_name ==
host_name) && (
c->port == port)) {
12222 gHostnameMutex.unlock();
12223 hostname_locked =
false;
12224 std::lock_guard<std::mutex> cguard(
c->mutex);
12226 if (
c->connected) {
12231 *hConnection =
c->index;
12233 printf(
"already connected: ");
12249 if (hostname_locked) {
12250 gHostnameMutex.unlock();
12251 hostname_locked =
false;
12257 static int last_reused = 0;
12260 for (
int j = 1;
j < size;
j++) {
12261 int i = (last_reused +
j) % size;
12265 printf(
"last reused %d, reusing slot %d: ", last_reused, (
int)
i);
12284 printf(
"new connection appended to array: ");
12291 c->connected =
true;
12300 std::string errmsg;
12305 cm_msg(
MERROR,
"rpc_client_connect",
"cannot connect to \"%s\" port %d: %s",
host_name, port, errmsg.c_str());
12310 gHostnameMutex.lock();
12315 gHostnameMutex.unlock();
12317 c->client_name = client_name;
12322 setsockopt(
c->send_sock, IPPROTO_TCP, TCP_NODELAY, (
char *) &
i,
sizeof(
i));
12330 std::string cstr =
msprintf(
"%d %s %s %s", hw_type,
cm_get_version(), local_prog_name.c_str(), local_host_name.c_str());
12332 int size = cstr.length() + 1;
12333 i = send(
c->send_sock, cstr.c_str(), size, 0);
12334 if (
i < 0 ||
i != size) {
12335 cm_msg(
MERROR,
"rpc_client_connect",
"cannot send %d bytes, send() returned %d, errno %d (%s)", size,
i, errno, strerror(errno));
12340 bool restore_watchdog_timeout =
false;
12341 BOOL watchdog_call;
12342 DWORD watchdog_timeout;
12348 restore_watchdog_timeout =
true;
12357 if (restore_watchdog_timeout) {
12362 cm_msg(
MERROR,
"rpc_client_connect",
"timeout waiting for server reply");
12368 int remote_hw_type = 0;
12369 char remote_version[32];
12370 remote_version[0] = 0;
12371 sscanf(
str,
"%d %s", &remote_hw_type, remote_version);
12373 c->remote_hw_type = remote_hw_type;
12377 mstrlcpy(v1, remote_version,
sizeof(v1));
12378 if (strchr(v1,
'.'))
12379 if (strchr(strchr(v1,
'.') + 1,
'.'))
12380 *strchr(strchr(v1,
'.') + 1,
'.') = 0;
12383 if (strchr(
str,
'.'))
12384 if (strchr(strchr(
str,
'.') + 1,
'.'))
12385 *strchr(strchr(
str,
'.') + 1,
'.') = 0;
12387 if (strcmp(v1,
str) != 0) {
12388 cm_msg(
MERROR,
"rpc_client_connect",
"remote MIDAS version \'%s\' differs from local version \'%s\'", remote_version,
cm_get_version());
12391 c->connected =
true;
12393 *hConnection =
c->index;
12411 for (
i = 0;
i < MAX_RPC_CONNECTION;
i++)
12412 if (_client_connection[
i].send_sock != 0)
12413 printf(
"slot %d, checking client %s socket %d, connected %d\n",
i, _client_connection[
i].client_name, _client_connection[
i].send_sock, _client_connection[
i].connected);
12421 if (
c &&
c->connected) {
12422 std::lock_guard<std::mutex> cguard(
c->mutex);
12424 if (!
c->connected) {
12437 FD_SET(
c->send_sock, &readfds);
12439 struct timeval timeout;
12440 timeout.tv_sec = 0;
12441 timeout.tv_usec = 0;
12450 }
while (
status == -1 && errno == EINTR);
12453 if (!FD_ISSET(
c->send_sock, &readfds)) {
12460 status = recv(
c->send_sock, (
char *) buffer,
sizeof(buffer), MSG_PEEK);
12465 if (errno == EAGAIN) {
12472 "RPC client connection to \"%s\" on host \"%s\" is broken, recv() errno %d (%s)",
12473 c->client_name.c_str(),
12474 c->host_name.c_str(),
12475 errno, strerror(errno));
12478 }
else if (
status == 0) {
12483 cm_msg(
MINFO,
"rpc_client_check",
"RPC client connection to \"%s\" on host \"%s\" unexpectedly closed",
c->client_name.c_str(),
c->host_name.c_str());
12542 INT remote_hw_type, hw_type;
12543 char str[200], version[32], v1[32];
12545 struct timeval timeout;
12554 if (WSAStartup(MAKEWORD(1, 1), &WSAData) != 0)
12568 cm_msg(
MERROR,
"rpc_server_connect",
"cm_connect_experiment/rpc_set_name not called");
12580 bool listen_localhost =
false;
12582 if (strcmp(
host_name,
"localhost") == 0)
12583 listen_localhost =
true;
12585 int lsock1, lport1;
12586 int lsock2, lport2;
12587 int lsock3, lport3;
12589 std::string errmsg;
12594 cm_msg(
MERROR,
"rpc_server_connect",
"cannot create listener socket: %s", errmsg.c_str());
12601 cm_msg(
MERROR,
"rpc_server_connect",
"cannot create listener socket: %s", errmsg.c_str());
12608 cm_msg(
MERROR,
"rpc_server_connect",
"cannot create listener socket: %s", errmsg.c_str());
12614 s = strchr(
str,
':');
12617 port = strtoul(s + 1, NULL, 0);
12625 cm_msg(
MERROR,
"rpc_server_connect",
"cannot connect to mserver on host \"%s\" port %d: %s",
str, port, errmsg.c_str());
12631 sprintf(
str,
"C %d %d %d %s Default", lport1, lport2, lport3,
cm_get_version());
12635 send(sock,
str, strlen(
str) + 1, 0);
12639 cm_msg(
MERROR,
"rpc_server_connect",
"timeout on receive status from server");
12643 status = version[0] = 0;
12644 sscanf(
str,
"%d %s", &
status, version);
12652 strcpy(v1, version);
12653 if (strchr(v1,
'.'))
12654 if (strchr(strchr(v1,
'.') + 1,
'.'))
12655 *strchr(strchr(v1,
'.') + 1,
'.') = 0;
12658 if (strchr(
str,
'.'))
12659 if (strchr(strchr(
str,
'.') + 1,
'.'))
12660 *strchr(strchr(
str,
'.') + 1,
'.') = 0;
12662 if (strcmp(v1,
str) != 0) {
12663 cm_msg(
MERROR,
"rpc_server_connect",
"remote MIDAS version \'%s\' differs from local version \'%s\'", version,
12669 FD_SET(lsock1, &readfds);
12670 FD_SET(lsock2, &readfds);
12671 FD_SET(lsock3, &readfds);
12674 timeout.tv_usec = 0;
12685 if (!FD_ISSET(lsock1, &readfds)) {
12686 cm_msg(
MERROR,
"rpc_server_connect",
"mserver subprocess could not be started (check path)");
12698 cm_msg(
MERROR,
"rpc_server_connect",
"accept() failed");
12712 flag = 2 * 1024 * 1024;
12715 cm_msg(
MERROR,
"rpc_server_connect",
"cannot setsockopt(SOL_SOCKET, SO_SNDBUF), errno %d (%s)", errno, strerror(errno));
12720 sprintf(
str,
"%d %s", hw_type, local_prog_name.c_str());
12727 cm_msg(
MERROR,
"rpc_server_connect",
"timeout on receive remote computer info");
12731 sscanf(
str,
"%d", &remote_hw_type);
12748 if (
c &&
c->connected) {
12751 if (!
c->connected) {
12771 if (
c &&
c->connected) {
12788 if (!
c->connected) {
12857 static int rpc_server_disconnect_recursion_level = 0;
12859 if (rpc_server_disconnect_recursion_level)
12862 rpc_server_disconnect_recursion_level = 1;
12887 rpc_server_disconnect_recursion_level = 0;
12979 INT tmp_type, size;
12997 dummy = 0x12345678;
12998 p = (
unsigned char *) &dummy;
13001 else if (*p == 0x12)
13004 cm_msg(
MERROR,
"rpc_get_option",
"unknown byte order format");
13007 f = (float) 1.2345;
13009 memcpy(&dummy, &f,
sizeof(f));
13010 if ((dummy & 0xFF) == 0x19 &&
13011 ((dummy >> 8) & 0xFF) == 0x04 && ((dummy >> 16) & 0xFF) == 0x9E
13012 && ((dummy >> 24) & 0xFF) == 0x3F)
13014 else if ((dummy & 0xFF) == 0x9E &&
13015 ((dummy >> 8) & 0xFF) == 0x40 && ((dummy >> 16) & 0xFF) == 0x19
13016 && ((dummy >> 24) & 0xFF) == 0x04)
13019 cm_msg(
MERROR,
"rpc_get_option",
"unknown floating point format");
13021 d = (double) 1.2345;
13023 memcpy(&dummy, &
d,
sizeof(f));
13024 if ((dummy & 0xFF) == 0x8D &&
13025 ((dummy >> 8) & 0xFF) == 0x97 && ((dummy >> 16) & 0xFF) == 0x6E
13026 && ((dummy >> 24) & 0xFF) == 0x12)
13028 else if ((dummy & 0xFF) == 0x83 &&
13029 ((dummy >> 8) & 0xFF) == 0xC0 && ((dummy >> 16) & 0xFF) == 0xF3
13030 && ((dummy >> 24) & 0xFF) == 0x3F)
13032 else if ((dummy & 0xFF) == 0x13 &&
13033 ((dummy >> 8) & 0xFF) == 0x40 && ((dummy >> 16) & 0xFF) == 0x83
13034 && ((dummy >> 24) & 0xFF) == 0xC0)
13036 else if ((dummy & 0xFF) == 0x9E &&
13037 ((dummy >> 8) & 0xFF) == 0x40 && ((dummy >> 16) & 0xFF) == 0x18
13038 && ((dummy >> 24) & 0xFF) == 0x04)
13040 "MIDAS cannot handle VAX D FLOAT format. Please compile with the /g_float flag");
13042 cm_msg(
MERROR,
"rpc_get_option",
"unknown floating point format");
13066 else if (hConn == -2)
13083 setsockopt(
c->send_sock, IPPROTO_TCP, TCP_NODELAY, (
char *) &
value,
sizeof(
value));
13113 int timeout =
c->rpc_timeout;
13134 if (old_timeout_msec)
13138 if (old_timeout_msec)
13144 if (old_timeout_msec)
13145 *old_timeout_msec =
c->rpc_timeout;
13146 c->rpc_timeout = timeout_msec;
13149 if (old_timeout_msec)
13150 *old_timeout_msec = 0;
13158#ifndef DOXYGEN_SHOULD_SKIP_THIS
13311 va_start(argptr, format);
13312 vsprintf(
str, (
char *) format, argptr);
13325 switch (arg_type) {
13334 *((
int *) arg) = va_arg(*arg_ptr,
int);
13339 *((
INT *) arg) = va_arg(*arg_ptr,
INT);
13348 *((
float *) arg) = (float) va_arg(*arg_ptr,
double);
13352 *((
double *) arg) = va_arg(*arg_ptr,
double);
13356 *((
char **) arg) = va_arg(*arg_ptr,
char *);
13366 bool debug =
false;
13369 printf(
"encode rpc_id %d \"%s\"\n", rl.
id, rl.
name);
13374 printf(
"i=%d, tid %d, flags 0x%x, n %d\n",
i, tid, flags,
n);
13403 size_t buf_size =
sizeof(
NET_COMMAND) + 4 * 1024;
13404 char* buf = (
char *)malloc(buf_size);
13412 char* param_ptr = (*nc)->param;
13436 char* arg = args[
i];
13459 arg_size = 1 + strlen((
char *) *((
char **) arg));
13472 const char* arg_tmp = args[
i+1];
13476 arg_size = *((
INT *) *((
void **) arg_tmp));
13478 arg_size = *((
INT *) arg_tmp);
13480 *((
INT *) param_ptr) =
ALIGN8(arg_size);
13490 int param_size =
ALIGN8(arg_size);
13493 size_t param_offset = (
char *) param_ptr - (
char *)(*nc);
13495 if (param_offset + param_size + 16 > buf_size) {
13496 size_t new_size = param_offset + param_size + 1024;
13498 buf = (
char *) realloc(buf, new_size);
13500 buf_size = new_size;
13502 param_ptr = buf + param_offset;
13508 printf(
"encode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, memcpy pointer %d\n",
i, flags, tid, arg_type, arg_size, param_size, arg_size);
13510 memcpy(param_ptr, (
void *) *((
void **) arg), arg_size);
13513 printf(
"encode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, double->float\n",
i, flags, tid, arg_type, arg_size, param_size);
13516 *((
float *) param_ptr) = (float) *((
double *) arg);
13519 printf(
"encode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, memcpy %d\n",
i, flags, tid, arg_type, arg_size, param_size, arg_size);
13521 memcpy(param_ptr, arg, arg_size);
13524 param_ptr += param_size;
13531 printf(
"encode rpc_id %d \"%s\" buf_size %d, param_size %d\n", rl.
id, rl.
name, (
int)buf_size, (*nc)->header.param_size);
13539 bool debug =
false;
13542 printf(
"decode reply to rpc_id %d \"%s\" has %d bytes\n", rl.
id, rl.
name, (
int)buf_size);
13546 const char* param_ptr = buf;
13570 if (param_ptr == NULL) {
13571 cm_msg(
MERROR,
"rpc_call_decode",
"routine \"%s\": no data in RPC reply, needed to decode an RPC_OUT parameter. param_ptr is NULL", rl.
name);
13579 arg_size = strlen((
char *) (param_ptr)) + 1;
13582 arg_size = *((
INT *) param_ptr);
13590 int param_size =
ALIGN8(arg_size);
13593 if (*((
char **) arg)) {
13595 printf(
"decode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, memcpy %d\n",
i, flags, tid, arg_type, arg_size, param_size, arg_size);
13596 memcpy((
void *) *((
char **) arg), param_ptr, arg_size);
13599 param_ptr += param_size;
13611 bool debug =
false;
13620 printf(
"encode rpc_id %d \"%s\"\n", rl.
id, rl.
name);
13625 printf(
"param %2d, tid %2d, flags 0x%02x, n %3d\n",
i, tid, flags,
n);
13654 size_t buf_size =
sizeof(
NET_COMMAND) + 4 * 1024;
13655 char* buf = (
char *)malloc(buf_size);
13663 char* param_ptr = (*nc)->param;
13687 char* arg = args[
i];
13708 void* parg = (
void *) *((
void **) arg);
13712 std::string* s = (std::string*)parg;
13713 arg_size = 1 + s->length();
13714 parg = (
void*)s->c_str();
13718 arg_size = 1 + strlen((
char *) *((
char **) arg));
13732 std::vector<char>* pv = (std::vector<char>*)parg;
13733 arg_size = pv->size();
13734 parg = (
void*)pv->data();
13738 *((
INT *) param_ptr) = arg_size;
13741 const char* arg_tmp = args[
i+1];
13745 arg_size = *((
INT *) *((
void **) arg_tmp));
13747 arg_size = *((
INT *) arg_tmp);
13750 *((
INT *) param_ptr) =
ALIGN8(arg_size);
13762 int param_size =
ALIGN8(arg_size);
13765 size_t param_offset = (
char *) param_ptr - (
char *)(*nc);
13767 if (param_offset + param_size + 16 > buf_size) {
13768 size_t new_size = param_offset + param_size + 1024;
13770 buf = (
char *) realloc(buf, new_size);
13772 buf_size = new_size;
13774 param_ptr = buf + param_offset;
13780 printf(
"encode param %2d, flags 0x%02x, tid %2d, arg_type %2d, arg_size %3d, param_size %3d, memcpy pointer %3d\n",
i, flags, tid, arg_type, arg_size, param_size, arg_size);
13782 memcpy(param_ptr, parg, arg_size);
13785 printf(
"encode param %2d, flags 0x%02x, tid %2d, arg_type %2d, arg_size %3d, param_size %3d, double->float\n",
i, flags, tid, arg_type, arg_size, param_size);
13788 *((
float *) param_ptr) = (float) *((
double *) arg);
13791 printf(
"encode param %2d, flags 0x%02x, tid %2d, arg_type %2d, arg_size %3d, param_size %3d, memcpy %3d\n",
i, flags, tid, arg_type, arg_size, param_size, arg_size);
13793 memcpy(param_ptr, arg, arg_size);
13796 param_ptr += param_size;
13803 printf(
"encode rpc_id %d \"%s\" buf_size %d, param_size %d\n", rl.
id, rl.
name, (
int)buf_size, (*nc)->header.param_size);
13811 bool debug =
false;
13820 printf(
"decode reply to rpc_id %d \"%s\" has %d bytes\n", rl.
id, rl.
name, (
int)buf_size);
13824 const char* param_ptr = buf;
13843 char arg[
sizeof(double)+
sizeof(uint64_t)+
sizeof(
char*)];
13848 if (param_ptr == NULL) {
13849 cm_msg(
MERROR,
"rpc_call_decode_cxx",
"routine \"%s\": no data in RPC reply, needed to decode an RPC_OUT parameter. param_ptr is NULL", rl.
name);
13856 arg_size = strlen((
char *) (param_ptr)) + 1;
13859 printf(
"decode param %2d, flags 0x%02x, tid %2d, arg_type %2d, arg_size %3d, string [%s]\n",
i, flags, tid, arg_type, arg_size, (
char *) (param_ptr));
13863 arg_size = *((
INT *) param_ptr);
13871 int param_size =
ALIGN8(arg_size);
13874 void* parg = *(
char**) arg;
13878 printf(
"decode param %2d, flags 0x%02x, tid %2d, arg_type %2d, arg_size %3d, param_size %3d, assign %3d to std::string at %p, offset %zu, string [%s]\n",
i, flags, tid, arg_type, arg_size, param_size, arg_size, parg, param_ptr - buf, param_ptr);
13879 *(std::string*)parg = param_ptr;
13882 printf(
"decode param %2d, flags 0x%02x, tid %2d, arg_type %2d, arg_size %3d, param_size %3d, assign %3d to std::vector at %p, offset %zu\n",
i, flags, tid, arg_type, arg_size, param_size, arg_size, parg, param_ptr - buf);
13883 std::vector<char>* pvec = (std::vector<char>*)parg;
13885 pvec->insert(pvec->end(), param_ptr, param_ptr + arg_size);
13888 printf(
"decode param %2d, flags 0x%02x, tid %2d, arg_type %2d, arg_size %3d, param_size %3d, memcpy %3d to %p, offset %zu\n",
i, flags, tid, arg_type, arg_size, param_size, arg_size, parg, param_ptr - buf);
13889 memcpy(parg, param_ptr, arg_size);
13893 param_ptr += param_size;
13955 cm_msg(
MERROR,
"rpc_client_call",
"invalid rpc connection handle %d", hConn);
13966 routine_id &= ~RPC_NO_REPLY;
13976 bool rpc_cxx =
false;
13981 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" with invalid RPC ID %d",
c->client_name.c_str(),
c->host_name.c_str(), routine_id);
13986 const char *rpc_name = rpc_entry.
name;
13992 va_start(ap, routine_id);
14009 if (rpc_no_reply) {
14010 i =
send_tcp(
c->send_sock, (
char *) nc, send_size, 0);
14012 if (
i != send_size) {
14013 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": send_tcp() failed",
c->client_name.c_str(),
c->host_name.c_str(), rpc_name);
14033 i =
send_tcp(
c->send_sock, (
char *) nc, send_size, 0);
14034 if (
i != send_size) {
14035 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": send_tcp() failed",
c->client_name.c_str(),
c->host_name.c_str(), rpc_name);
14043 bool restore_watchdog_timeout =
false;
14044 BOOL watchdog_call;
14045 DWORD watchdog_timeout;
14050 if (
c->rpc_timeout >= (
int) watchdog_timeout) {
14051 restore_watchdog_timeout =
true;
14055 DWORD rpc_status = 0;
14056 DWORD buf_size = 0;
14062 if (restore_watchdog_timeout) {
14067 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": timeout waiting for reply",
c->client_name.c_str(),
c->host_name.c_str(), rpc_name);
14075 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": error, ss_recv_net_command() status %d",
c->client_name.c_str(),
c->host_name.c_str(), rpc_name,
status);
14085 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": error, unknown RPC, status %d",
c->client_name.c_str(),
c->host_name.c_str(), rpc_name, rpc_status);
14093 va_start(ap, routine_id);
14144 routine_id &= ~RPC_NO_REPLY;
14153 fprintf(stderr,
"rpc_call(routine_id=%d) failed, no connection to mserver.\n", routine_id);
14171 bool rpc_cxx =
false;
14177 cm_msg(
MERROR,
"rpc_call",
"invalid rpc ID (%d)", routine_id);
14181 const char* rpc_name = rpc_entry.
name;
14188 va_start(ap, routine_id);
14205 if (rpc_no_reply) {
14206 i =
send_tcp(send_sock, (
char *) nc, send_size, 0);
14208 if (
i != send_size) {
14210 cm_msg(
MERROR,
"rpc_call",
"rpc \"%s\" error: send_tcp() failed", rpc_name);
14221 i =
send_tcp(send_sock, (
char *) nc, send_size, 0);
14222 if (
i != send_size) {
14224 cm_msg(
MERROR,
"rpc_call",
"rpc \"%s\" error: send_tcp() failed", rpc_name);
14232 bool restore_watchdog_timeout =
false;
14233 BOOL watchdog_call;
14234 DWORD watchdog_timeout;
14244 if (rpc_timeout >= (
int) watchdog_timeout) {
14245 restore_watchdog_timeout =
true;
14250 DWORD rpc_status = 0;
14251 DWORD buf_size = 0;
14256 if (restore_watchdog_timeout) {
14267 cm_msg(
MERROR,
"rpc_call",
"routine \"%s\": timeout waiting for reply, program abort", rpc_name);
14275 cm_msg(
MERROR,
"rpc_call",
"routine \"%s\": error, ss_recv_net_command() status %d, program abort", rpc_name,
status);
14283 cm_msg(
MERROR,
"rpc_call",
"routine \"%s\": error, unknown RPC, status %d", rpc_name, rpc_status);
14291 va_start(ap, routine_id);
14354 return bm_send_event(buffer_handle, pevent, unused, async_flag);
14376 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid sg_n %d", sg_n);
14380 if (sg_ptr[0] == NULL) {
14381 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid sg_ptr[0] is NULL");
14386 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid sg_len[0] value %d is smaller than event header size %d", (
int)sg_len[0], (
int)
sizeof(
EVENT_HEADER));
14392 const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16);
14395 if (data_size == 0) {
14396 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid event data size zero");
14400 if (data_size > MAX_DATA_SIZE) {
14401 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
14409 for (
int i=0;
i<sg_n;
i++) {
14414 cm_msg(
MERROR,
"rpc_send_event_sg",
"data size mismatch: event data_size %d, event_size %d not same as sum of sg_len %d", (
int)data_size, (
int)
event_size, (
int)
count);
14440 assert(
sizeof(
DWORD) == 4);
14441 DWORD bh_buf = buffer_handle;
14446 cm_msg(
MERROR,
"rpc_send_event_sg",
"ss_write_tcp(buffer handle) failed, event socket is now closed");
14452 for (
int i=0;
i<sg_n;
i++) {
14456 cm_msg(
MERROR,
"rpc_send_event_sg",
"ss_write_tcp(event data) failed, event socket is now closed");
14463 if (
count < total_size) {
14464 char padding[8] = { 0,0,0,0,0,0,0,0 };
14465 size_t padlen = total_size -
count;
14466 assert(padlen < 8);
14470 cm_msg(
MERROR,
"rpc_send_event_sg",
"ss_write_tcp(padding) failed, event socket is now closed");
14536 for (
size_t i = 0;
i <
n;
i++) {
14559 cm_msg(
MERROR,
"rpc_transition_dispatch",
"no handler for transition %d with sequence number %d",
CINT(0),
CINT(4));
14562 cm_msg(
MERROR,
"rpc_transition_dispatch",
"received unrecognized command %d", idx);
14618void debug_dump(
unsigned char *p,
int size)
14623 for (
i = 0;
i < (size - 1) / 16 + 1;
i++) {
14624 printf(
"%p ", p +
i * 16);
14625 for (
j = 0;
j < 16;
j++)
14626 if (
i * 16 +
j < size)
14627 printf(
"%02X ", p[
i * 16 +
j]);
14632 for (
j = 0;
j < 16;
j++) {
14634 if (
i * 16 +
j < size)
14635 printf(
"%c", (
c >= 32 &&
c < 128) ? p[
i * 16 +
j] :
'.');
14674 char *buffer = NULL;
14696 int param_size = -1;
14705 if (param_size == -1) {
14725 char *p = (
char *) realloc(*pbuf, new_size);
14728 cm_msg(
MERROR,
"recv_net_command_realloc",
"cannot reallocate buffer from %d bytes to %d bytes", *pbufsize, new_size);
14734 *pbufsize = new_size;
14745 int size = write_ptr - read_ptr;
14748 memcpy(buffer + copied, net_buffer + read_ptr, size);
14750 read_ptr = write_ptr;
14754 write_ptr = recv(sock, net_buffer + misalign, sa->
net_buffer_size - 8, 0);
14757 }
while (write_ptr == -1 && errno == EINTR);
14759 write_ptr = recv(sock, net_buffer + misalign, sa->
net_buffer_size - 8, 0);
14763 if (write_ptr <= 0) {
14764 if (write_ptr == 0)
14765 cm_msg(
MERROR,
"recv_net_command_realloc",
"rpc connection from \'%s\' on \'%s\' unexpectedly closed", sa->
prog_name.c_str(), sa->
host_name.c_str());
14767 cm_msg(
MERROR,
"recv_net_command_realloc",
"recv() returned %d, errno: %d (%s)", write_ptr, errno, strerror(errno));
14775 read_ptr = misalign;
14776 write_ptr += misalign;
14778 misalign = write_ptr % 8;
14783 memcpy(buffer + copied, net_buffer + read_ptr, size);
14788 if (write_ptr - read_ptr < param_size)
14791 *remaining = write_ptr - read_ptr;
14798 return size + copied;
14862 char header_buf[header_size];
14873 int hrd =
recv_tcp2(sock, header_buf, header_size, 1);
14882 cm_msg(
MERROR,
"recv_event_server",
"recv_tcp2(header) returned %d", hrd);
14886 if (hrd < (
int) header_size) {
14887 int hrd1 =
recv_tcp2(sock, header_buf + hrd, header_size - hrd, 0);
14891 cm_msg(
MERROR,
"recv_event_server",
"recv_tcp2(more header) returned %d", hrd1);
14899 if (hrd != (
int) header_size) {
14900 cm_msg(
MERROR,
"recv_event_server",
"recv_tcp2(header) returned %d instead of %d", hrd, (
int) header_size);
14904 INT *pbh = (
INT *) header_buf;
14923 for (
int i=0;
i<5;
i++) {
14924 printf(
"recv_event_server: header[%d]: 0x%08x\n",
i, pbh[
i]);
14932 "received event header with invalid data_size %d: event_size %d, total_size %d", pevent->
data_size,
14940 int bufsize =
sizeof(
INT) + total_size;
14945 if (*pbuffer_size < bufsize) {
14946 int newsize = 1024 +
ALIGN8(bufsize);
14950 char *newbuf = (
char *) realloc(*pbuffer, newsize);
14951 if (newbuf == NULL) {
14952 cm_msg(
MERROR,
"recv_event_server",
"cannot realloc() event buffer from %d to %d bytes", *pbuffer_size,
14957 *pbuffer_size = newsize;
14962 memcpy(*pbuffer, header_buf, header_size);
14966 int to_read =
sizeof(
INT) + total_size - header_size;
14967 int rptr = header_size;
14970 int drd =
recv_tcp2(sock, (*pbuffer) + rptr, to_read, 0);
14974 cm_msg(
MERROR,
"recv_event_server",
"recv_tcp2(data) returned %d instead of %d", drd, to_read);
15055 std::string errmsg;
15060 cm_msg(
MERROR,
"rpc_register_server",
"cannot listen to tcp port %d: %s", port, errmsg.c_str());
15065#if defined(F_SETFD) && defined(FD_CLOEXEC)
15066 status = fcntl(lsock, F_SETFD, fcntl(lsock, F_GETFD) | FD_CLOEXEC);
15068 cm_msg(
MERROR,
"rpc_register_server",
"fcntl(F_SETFD, FD_CLOEXEC) failed, errno %d (%s)", errno, strerror(errno));
15125 char *in_param_ptr, *out_param_ptr, *last_param_ptr;
15128 INT param_size, max_size;
15129 void *prpc_param[20];
15130 char debug_line[1024], *return_buffer;
15131 int return_buffer_size;
15132 int return_buffer_tls;
15136 int initial_buffer_size = 1024;
15159 return_buffer_tls =
i;
15162 assert(return_buffer);
15172 if (convert_flags) {
15188 assert(xroutine_id == routine_id);
15192 in_param_ptr = nc_in->
param;
15195 out_param_ptr = nc_out->
param;
15197 sprintf(debug_line,
"%s(", rl.
name);
15207 param_size =
ALIGN8(1 + strlen((
char *) (in_param_ptr)));
15211 param_size = *((
INT *) in_param_ptr);
15214 param_size =
ALIGN8(param_size);
15222 prpc_param[
i] = in_param_ptr;
15225 if (convert_flags) {
15236 if (strlen(debug_line) +
str.length() + 2 <
sizeof(debug_line)) {
15237 strcat(debug_line,
"\"");
15238 strcat(debug_line,
str.c_str());
15239 strcat(debug_line,
"\"");
15241 strcat(debug_line,
"...");
15243 strcat(debug_line,
str.c_str());
15245 in_param_ptr += param_size;
15257 max_size = *((
INT *) in_param_ptr);
15261 max_size =
ALIGN8(max_size);
15263 *((
INT *) out_param_ptr) = max_size;
15269 param_size = max_size;
15275 if ((
POINTER_T) out_param_ptr - (
POINTER_T) nc_out + param_size > return_buffer_size) {
15278 "return parameters (%d) too large for network buffer (%d)",
15288 "rpc_execute: return parameters (%d) too large for network buffer (%d), new buffer size (%d)",
15289 (
int)((
POINTER_T) out_param_ptr - (
POINTER_T) nc_out + param_size), return_buffer_size, new_size);
15292 itls = return_buffer_tls;
15298 cm_msg(
MERROR,
"rpc_execute",
"Cannot allocate return buffer of size %d", new_size);
15304 assert(return_buffer);
15312 memcpy(out_param_ptr, prpc_param[
i], param_size);
15315 strcat(debug_line,
"-");
15317 prpc_param[
i] = out_param_ptr;
15318 out_param_ptr += param_size;
15322 strcat(debug_line,
", ");
15327 strcat(debug_line,
")");
15330 last_param_ptr = out_param_ptr;
15359 out_param_ptr = nc_out->
param;
15367 max_size = *((
INT *) out_param_ptr);
15372 const char* param_ptr = ((
char *) out_param_ptr) +
ALIGN8(
sizeof(
INT));
15374 param_size = strlen(param_ptr) + 1;
15375 param_size =
ALIGN8(param_size);
15378 memmove(out_param_ptr, out_param_ptr +
ALIGN8(
sizeof(
INT)), param_size);
15381 memmove(out_param_ptr + param_size,
15382 out_param_ptr + max_size +
ALIGN8(
sizeof(
INT)),
15388 max_size = *((
INT *) out_param_ptr);
15396 param_size = *((
INT *) prpc_param[
i + 1]);
15397 *((
INT *) out_param_ptr) = param_size;
15403 param_size =
ALIGN8(param_size);
15406 memmove(out_param_ptr + param_size,
15407 out_param_ptr + max_size,
15415 if (convert_flags) {
15425 out_param_ptr += param_size;
15436 if (convert_flags) {
15484 std::vector<char>*
pv = NULL;
15533 bool debug =
false;
15539 if (convert_flags) {
15554 assert(xroutine_id == routine_id);
15572 char* in_param_ptr = (
char*)nc_in->
param;
15575 printf(
"rpc_execute_cxx: routine_id %d, name \"%s\"\n", routine_id, rl.
name);
15582 std::vector<RPE> params;
15589 params.resize(nparams);
15591 size_t in_offset = 0;
15593 for (
int i = 0;
i < nparams;
i++) {
15594 in_param_size[
i] = 0;
15595 in_param_offset[
i] = 0;
15604 arg_size = 1 + strlen((
char *) (in_param_ptr));
15609 int arg_size_align8 = *((
INT *) in_param_ptr);
15616 arg_size = arg_size_align8;
15619 arg_size = *((
INT *) (((
char*)in_param_ptr) +
ALIGN8(arg_size_align8)));
15626 cm_msg(
MERROR,
"rpc_execute_cxx",
"RPC %d, param %d tid %d flags 0x%x size mismatch: header %d vs next param %d", routine_id,
i, tid, flags, arg_size_align8, arg_size);
15635 int param_size =
ALIGN8(arg_size);
15637 in_param_size[
i] = param_size;
15638 in_param_offset[
i] = in_offset;
15640 params[
i].offset = in_offset;
15641 params[
i].arg_size = arg_size;
15642 params[
i].param_size = param_size;
15645 if (convert_flags) {
15653 in_param_ptr += param_size;
15654 in_offset += param_size;
15661 params[
i].out_max_size = 0;
15668 params[
i].out_max_size_offset = in_offset;
15670 INT max_size = *((
INT *) in_param_ptr);
15675 params[
i].out_max_size = max_size;
15679 params[
i].out_max_size = rl.
param[
i].
n;
15685 params[
i].ps =
new std::string;
15687 *(params[
i].ps) = (
char*)nc_in->
param + in_param_offset[
i];
15690 prpc_param[
i] = (
void*) params[
i].ps;
15692 params[
i].pv =
new std::vector<char>;
15694 params[
i].pv->insert(params[
i].pv->end(), (
char*)nc_in->
param + in_param_offset[
i], (
char*)nc_in->
param + in_param_offset[
i] + params[
i].arg_size);
15697 prpc_param[
i] = (
void*) params[
i].pv;
15699 cm_msg(
MERROR,
"rpc_execute_cxx",
"RPC %d: param %d tid %d flags 0x%x, TID not compatible with flag RPC_CXX", routine_id,
i, tid, flags);
15704 params[
i].pv =
new std::vector<char>;
15705 params[
i].pv->insert(params[
i].pv->end(), (
char*)nc_in->
param + in_param_offset[
i], (
char*)nc_in->
param + in_param_offset[
i] + params[
i].arg_size);
15706 size_t want_size = params[
i].out_max_size;
15708 if (params[
i].pv->size() < want_size)
15709 params[
i].pv->resize(want_size);
15710 prpc_param[
i] = params[
i].pv->data();
15711 }
else if (flags &
RPC_IN) {
15712 prpc_param[
i] = (
char*)nc_in->
param + in_param_offset[
i];
15714 }
else if (flags &
RPC_OUT) {
15715 params[
i].pv =
new std::vector<char>;
15716 params[
i].pv->resize(params[
i].out_max_size);
15717 prpc_param[
i] = params[
i].pv->data();
15723 printf(
"rpc_execute_cxx: param %2d, tid %2d, flags 0x%04x, in %3zu+%-3zu+%-3zu, out max size %3zu at %3zu, ptr %p\n",
i, tid, flags, params[
i].
offset, params[
i].arg_size, params[
i].param_size, params[
i].out_max_size, params[
i].out_max_size_offset, prpc_param[
i]);
15727 printf(
"rpc_execute_cxx: nc_in size %d, in_offset %zu\n", nc_in->
header.
param_size, in_offset);
15732 ok &= in_param_offset[ 0] == 0; ok &= in_param_size[ 0] == 8;
15733 ok &= in_param_offset[ 1] == 0; ok &= in_param_size[ 1] == 0;
15734 ok &= in_param_offset[ 2] == 8; ok &= in_param_size[ 2] == 8;
15735 ok &= in_param_offset[ 3] == 16; ok &= in_param_size[ 3] == 16;
15736 ok &= in_param_offset[ 4] == 0; ok &= in_param_size[ 4] == 0;
15737 ok &= in_param_offset[ 5] == 32; ok &= in_param_size[ 5] == 8;
15738 ok &= in_param_offset[ 6] == 0; ok &= in_param_size[ 6] == 0;
15739 ok &= in_param_offset[ 7] == 40; ok &= in_param_size[ 7] == 8;
15740 ok &= in_param_offset[ 8] == 48; ok &= in_param_size[ 8] == 16;
15741 ok &= in_param_offset[ 9] == 64; ok &= in_param_size[ 9] == 8;
15742 ok &= in_param_offset[10] == 72; ok &= in_param_size[10] == 72;
15743 ok &= in_param_offset[11] == 0; ok &= in_param_size[11] == 0;
15744 ok &= in_param_offset[12] == 144; ok &= in_param_size[12] == 72;
15745 ok &= in_param_offset[13] == 224; ok &= in_param_size[13] == 40;
15746 ok &= in_param_offset[14] == 264; ok &= in_param_size[14] == 8;
15747 ok &= in_param_offset[15] == 280; ok &= in_param_size[15] == 16;
15748 ok &= in_param_offset[16] == 296; ok &= in_param_size[16] == 8;
15749 ok &= in_param_offset[17] == 0; ok &= in_param_size[17] == 0;
15750 ok &= in_param_offset[18] == 304; ok &= in_param_size[18] == 8;
15751 ok &= in_offset == 312;
15754 cm_msg(
MERROR,
"rpc_execute_cxx",
"RPC_TEST2 parameters encoding error!");
15760 printf(
"rpc_execute_cxx: calling dispatch()\n");
15772 printf(
"rpc_execute_cxx: dispatch() status %d\n",
status);
15797 std::vector<char> v_out;
15801 for (
int i = 0;
i < nparams;
i++) {
15808 size_t arg_size = 1 + params[
i].ps->length();
15809 size_t param_size =
ALIGN8(arg_size);
15812 printf(
"rpc_execute_cxx: param %2d, std::string arg_size %zu, param_size %zu, string [%s]\n",
i, arg_size, param_size, params[
i].ps->c_str());
15814 v_out.insert(v_out.end(), params[
i].ps->c_str(), params[
i].ps->c_str() + arg_size);
15815 v_out.resize(v_out.size() + param_size - arg_size);
15817 size_t arg_size = params[
i].pv->size();
15818 size_t param_size =
ALIGN8(arg_size);
15821 printf(
"rpc_execute_cxx: param %2d, std::vector arg_size %zu, param_size %zu\n",
i, arg_size, param_size);
15824 *((
INT *) buf) = arg_size;
15827 v_out.insert(v_out.end(), buf, buf +
ALIGN8(
sizeof(
INT)));
15828 v_out.insert(v_out.end(), params[
i].pv->data(), params[
i].pv->data() + arg_size);
15829 v_out.resize(v_out.size() + param_size - arg_size);
15831 cm_msg(
MERROR,
"rpc_execute_cxx",
"RPC %d: param %d tid %d flags 0x%x, TID not compatible with flag RPC_CXX", routine_id,
i, tid, flags);
15835 size_t convert_offset = 0;
15836 size_t convert_size = 0;
15839 size_t max_size = params[
i].out_max_size;
15840 char* param_ptr = (
char *) prpc_param[
i];
15842 size_t arg_size = 1 + strlen(param_ptr);
15843 if (arg_size > max_size) {
15844 param_ptr[max_size] = 0;
15845 size_t arg_size = 1 + strlen(param_ptr);
15846 assert(arg_size == max_size);
15848 size_t param_size =
ALIGN8(arg_size);
15851 printf(
"rpc_execute_cxx: param %2d, string max_size %zu, string_size %zu, param_size %zu\n",
i, max_size, arg_size, param_size);
15853 v_out.insert(v_out.end(), param_ptr, param_ptr + arg_size);
15854 v_out.resize(v_out.size() + param_size - arg_size);
15856 size_t max_size = params[
i].out_max_size;
15857 size_t arg_size = *((
INT *) prpc_param[
i + 1]);
15858 char* param_ptr = (
char*)prpc_param[
i];
15859 size_t param_size =
ALIGN8(arg_size);
15862 printf(
"rpc_execute_cxx: param %2d, array max_size %zu, param_size %zu\n",
i, max_size, param_size);
15865 *((
INT *) buf) = arg_size;
15868 v_out.insert(v_out.end(), buf, buf +
ALIGN8(
sizeof(
INT)));
15869 convert_offset = v_out.size();
15870 convert_size = arg_size;
15871 v_out.insert(v_out.end(), param_ptr, param_ptr + arg_size);
15872 v_out.resize(v_out.size() + param_size - arg_size);
15874 char* param_ptr = (
char*)prpc_param[
i];
15878 size_t param_size =
ALIGN8(arg_size);
15882 printf(
"rpc_execute_cxx: param %2d, tid %2d, arg_size %zu, param_size %zu, value %d\n",
i, tid, arg_size, param_size, *(
int*)param_ptr);
15884 printf(
"rpc_execute_cxx: param %2d, tid %2d, arg_size %zu, param_size %zu\n",
i, tid, arg_size, param_size);
15888 convert_offset = v_out.size();
15889 convert_size = arg_size;
15890 v_out.insert(v_out.end(), param_ptr, param_ptr + arg_size);
15891 v_out.resize(v_out.size() + param_size - arg_size);
15895 if (convert_flags) {
15913 if (convert_flags) {
15926 printf(
"rpc_execute_cxx: send_tcp() sent %d bytes\n",
status);
15959 printf(
"rpc_test_rpc_test2!\n");
15962 int int_inout = 456;
15964 char string_out[33];
15965 char string2_out[49];
15967 char string_inout[25];
15968 strcpy(string_inout,
"string_inout");
15972 struct_in.
type = 111;
15974 strcpy(struct_in.
name,
"name");
15980 struct_inout.
type = 111111;
15982 strcpy(struct_inout.
name,
"name_name");
15985 uint32_t dwordarray_inout[9];
15986 size_t dwordarray_inout_size =
sizeof(dwordarray_inout);
15988 for (
int i=0;
i<9;
i++) {
15989 dwordarray_inout[
i] =
i*10;
15994 for (
size_t i=0;
i<
sizeof(array_in);
i++) {
15995 array_in[
i] =
'a' +
i;
15998 char array_out[16];
15999 size_t array_out_size =
sizeof(array_out);
16001 for (
size_t i=0;
i<
sizeof(array_out);
i++) {
16002 array_out[
i] =
'Z';
16010 string_out,
sizeof(string_out),
16011 string2_out,
sizeof(string2_out),
16012 string_inout,
sizeof(string_inout),
16016 dwordarray_inout, &dwordarray_inout_size,
16017 array_in,
sizeof(array_in),
16018 array_out, &array_out_size
16022 printf(
"rpc_call(RPC_TEST2) status %d\n",
status);
16026 if (int_out != 789) {
16027 printf(
"int_out mismatch!\n");
16031 if (int_inout != 456*2) {
16032 printf(
"int_inout mismatch!\n");
16036 if (strcmp(string_out,
"string_out") != 0) {
16037 printf(
"string_out mismatch [%s] vs [%s]\n", string_out,
"string_out");
16041 if (strcmp(string2_out,
"second string_out") != 0) {
16042 printf(
"string2_out mismatch [%s] vs [%s]\n", string2_out,
"second string_out");
16046 if (strcmp(string_inout,
"return string_inout") != 0) {
16047 printf(
"string_inout mismatch [%s] vs [%s]\n", string_inout,
"return string_inout");
16057 pkey = &struct_out;
16060 printf(
"struct_out mismatch: type %d, num_values %d, name [%s], last_written %d\n", pkey->
type, pkey->
num_values, pkey->
name, pkey->
last_written);
16064 pkey = &struct_inout;
16067 printf(
"struct_inout mismatch: type %d, num_values %d, name [%s], last_written %d\n", pkey->
type, pkey->
num_values, pkey->
name, pkey->
last_written);
16071 if (dwordarray_inout_size != 4*5) {
16072 printf(
"dwordarray_inout_size mismatch %d vs %d\n", (
int)dwordarray_inout_size, 4*5);
16075 for (
size_t i=0;
i<dwordarray_inout_size/
sizeof(uint32_t);
i++) {
16076 if (dwordarray_inout[
i] !=
i*10+
i) {
16077 printf(
"dwordarray_inout[%d] data mismatch %d vs %zu\n", (
int)
i, dwordarray_inout[
i],
i*10+
i);
16088 if (array_out_size != 15) {
16089 printf(
"array_out_size mismatch %d vs %d\n", (
int)array_out_size, 15);
16092 if (strcmp(array_out,
"test test test") != 0) {
16093 printf(
"array_out data mismatch\n");
16121 printf(
"rpc_test_rpc_test2_cxx!\n");
16124 int int_inout = 456;
16126 char string_out[33];
16127 std::string string2_out;
16128 std::string string_inout =
"string_inout";
16132 struct_in.
type = 111;
16134 strcpy(struct_in.
name,
"name");
16140 struct_inout.
type = 111111;
16142 strcpy(struct_inout.
name,
"name_name");
16145 uint32_t dwordarray_inout[9];
16146 size_t dwordarray_inout_size =
sizeof(dwordarray_inout);
16148 for (
int i=0;
i<9;
i++) {
16149 dwordarray_inout[
i] =
i*10;
16152 std::vector<char> array_in;
16153 int array_in_size = 10;
16155 for (
int i=0;
i<array_in_size;
i++) {
16156 array_in.push_back(
'a' +
i);
16159 std::vector<char> array_out;
16160 size_t array_out_size = 16;
16167 string_out,
sizeof(string_out),
16173 dwordarray_inout, &dwordarray_inout_size,
16174 &array_in, array_in_size,
16175 &array_out, &array_out_size
16179 printf(
"rpc_call(RPC_TEST2_CXX) status %d\n",
status);
16183 if (int_out != 789) {
16184 printf(
"int_out mismatch!\n");
16188 if (int_inout != 456*2) {
16189 printf(
"int_inout mismatch!\n");
16193 if (strcmp(string_out,
"string_out") != 0) {
16194 printf(
"string_out mismatch [%s] vs [%s]\n", string_out,
"string_out");
16198 if (string2_out !=
"second string_out") {
16199 printf(
"string2_out mismatch [%s] vs [%s]\n", string2_out.c_str(),
"second string_out");
16203 if (string_inout !=
"return string_inout") {
16204 printf(
"string_inout mismatch [%s] vs [%s]\n", string_inout.c_str(),
"return string_inout");
16214 pkey = &struct_out;
16217 printf(
"struct_out mismatch: type %d, num_values %d, name [%s], last_written %d\n", pkey->
type, pkey->
num_values, pkey->
name, pkey->
last_written);
16221 pkey = &struct_inout;
16224 printf(
"struct_inout mismatch: type %d, num_values %d, name [%s], last_written %d\n", pkey->
type, pkey->
num_values, pkey->
name, pkey->
last_written);
16228 if (dwordarray_inout_size != 4*5) {
16229 printf(
"dwordarray_inout_size mismatch %d vs %d\n", (
int)dwordarray_inout_size, 4*5);
16232 for (
size_t i=0;
i<dwordarray_inout_size/
sizeof(uint32_t);
i++) {
16233 if (dwordarray_inout[
i] !=
i*10+
i) {
16234 printf(
"dwordarray_inout[%d] data mismatch %d vs %zu\n", (
int)
i, dwordarray_inout[
i],
i*10+
i);
16245 if (array_out_size != 15) {
16246 printf(
"array_out_size mismatch %d vs %d\n", (
int)array_out_size, 15);
16248 }
else if (array_out.size() != 15) {
16249 printf(
"array_out.size() mismatch %d vs %d\n", (
int)array_out.size(), 15);
16252 if (strcmp(array_out.data(),
"test test test") != 0) {
16253 printf(
"array_out data mismatch\n");
16281 printf(
"rpc_test_rpc_test3_cxx!\n");
16284 int int_inout = 456;
16286 char string_out[33];
16287 std::string string2_out;
16288 std::string string_inout =
"string_inout";
16292 struct_in.
type = 111;
16294 strcpy(struct_in.
name,
"name");
16300 struct_inout.
type = 111111;
16302 strcpy(struct_inout.
name,
"name_name");
16305 uint32_t dwordarray_inout[9];
16306 size_t dwordarray_inout_size =
sizeof(dwordarray_inout);
16308 for (
int i=0;
i<9;
i++) {
16309 dwordarray_inout[
i] =
i*10;
16312 std::vector<char> array_in;
16313 int array_in_size = 10;
16315 for (
int i=0;
i<array_in_size;
i++) {
16316 array_in.push_back(
'a' +
i);
16319 std::vector<char> array_out;
16326 string_out,
sizeof(string_out),
16332 dwordarray_inout, &dwordarray_inout_size,
16338 printf(
"rpc_call(RPC_TEST3_CXX) status %d\n",
status);
16342 if (int_out != 789) {
16343 printf(
"int_out mismatch!\n");
16347 if (int_inout != 456*2) {
16348 printf(
"int_inout mismatch!\n");
16352 if (strcmp(string_out,
"string_out") != 0) {
16353 printf(
"string_out mismatch [%s] vs [%s]\n", string_out,
"string_out");
16357 if (string2_out !=
"second string_out") {
16358 printf(
"string2_out mismatch [%s] vs [%s]\n", string2_out.c_str(),
"second string_out");
16362 if (string_inout !=
"return string_inout") {
16363 printf(
"string_inout mismatch [%s] vs [%s]\n", string_inout.c_str(),
"return string_inout");
16373 pkey = &struct_out;
16376 printf(
"struct_out mismatch: type %d, num_values %d, name [%s], last_written %d\n", pkey->
type, pkey->
num_values, pkey->
name, pkey->
last_written);
16380 pkey = &struct_inout;
16383 printf(
"struct_inout mismatch: type %d, num_values %d, name [%s], last_written %d\n", pkey->
type, pkey->
num_values, pkey->
name, pkey->
last_written);
16387 if (dwordarray_inout_size != 4*5) {
16388 printf(
"dwordarray_inout_size mismatch %d vs %d\n", (
int)dwordarray_inout_size, 4*5);
16391 for (
size_t i=0;
i<dwordarray_inout_size/
sizeof(uint32_t);
i++) {
16392 if (dwordarray_inout[
i] !=
i*10+
i) {
16393 printf(
"dwordarray_inout[%d] data mismatch %d vs %zu\n", (
int)
i, dwordarray_inout[
i],
i*10+
i);
16404 if (array_out.size() != 15) {
16405 printf(
"array_out.size() mismatch %d vs %d\n", (
int)array_out.size(), 15);
16408 if (strcmp(array_out.data(),
"test test test") != 0) {
16409 printf(
"array_out data mismatch\n");
16437 printf(
"rpc_test_rpc_test4_cxx!\n");
16440 int int_inout = 456;
16442 std::string string_in =
"test string";
16443 std::string string_out;
16444 std::string string_inout =
"string_inout";
16446 std::vector<char> array_in;
16447 int array_in_size = 10;
16449 for (
int i=0;
i<array_in_size;
i++) {
16450 array_in.push_back(
'a' +
i);
16453 std::vector<char> array_out;
16455 std::vector<char> array_inout;
16456 int array_inout_size = 6;
16458 for (
int i=0;
i<array_inout_size;
i++) {
16459 array_inout.push_back(
'0' +
i);
16475 printf(
"rpc_call(RPC_TEST4_CXX) status %d\n",
status);
16479 if (int_out != 789) {
16480 printf(
"int_out mismatch!\n");
16484 if (int_inout != 456*2) {
16485 printf(
"int_inout mismatch!\n");
16489 if (string_out !=
"return string_out") {
16490 printf(
"string_out mismatch [%s] vs [%s]\n", string_out.c_str(),
"return string_out");
16494 if (string_inout !=
"return string_inout") {
16495 printf(
"string_inout mismatch [%s] vs [%s]\n", string_inout.c_str(),
"return string_inout");
16499 if (array_out.size() != 15) {
16500 printf(
"array_out.size() mismatch %d vs %d\n", (
int)array_out.size(), 15);
16503 if (strcmp(array_out.data(),
"test test test") != 0) {
16504 printf(
"array_out data mismatch\n");
16509 if (array_inout.size() != 12) {
16510 printf(
"array_inout.size() mismatch %d vs %d\n", (
int)array_inout.size(), 12);
16513 for (
int i=0;
i<6;
i++) {
16514 if (array_inout[
i] !=
'0' +
i) {
16515 printf(
"array_inout data mismatch, index %d, value %d should be %d\n",
i, array_inout[
i], (
'0'+
i));
16519 for (
int i=6;
i<12;
i++) {
16520 if (array_inout[
i] != 2*(
'0' + (
i-6))) {
16521 printf(
"array_inout data mismatch, index %d, value %d should be %d\n",
i, array_inout[
i], 2*(
'0'+
i));
16640 if (strcmp(hostname,
"localhost") == 0)
16643 if (strcmp(hostname,
"localhost.localdomain") == 0)
16646 if (strcmp(hostname,
"localhost6") == 0)
16649 if (strcmp(hostname,
"ip6-localhost") == 0)
16657 if (h == hostname) {
16674 std::string hostname;
16686 static std::atomic_int max_report(10);
16687 if (max_report > 0) {
16689 if (max_report == 0) {
16690 cm_msg(
MERROR,
"rpc_socket_check_allowed_host",
"rejecting connection from unallowed host \'%s\', this message will no longer be reported", hostname.c_str());
16692 cm_msg(
MERROR,
"rpc_socket_check_allowed_host",
"rejecting connection from unallowed host \'%s\'. Add this host to \"/Experiment/Security/RPC hosts/Allowed hosts\"", hostname.c_str());
16726 INT port1, port2, port3;
16728 char net_buffer[256];
16729 struct linger ling;
16734 sock = accept(lsock, NULL, NULL);
16759 char command = (char) toupper(net_buffer[0]);
16773#ifdef LOCAL_ROUTINES
16776 for (
unsigned i=0;
i<exptab.
exptab.size();
i++) {
16778 const char*
str = exptab.
exptab[
i].name.c_str();
16779 send(sock,
str, strlen(
str) + 1, 0);
16781 send(sock,
"", 1, 0);
16792 port1 = port2 = version[0] = 0;
16799 port1 = strtoul(net_buffer + 2, &ptr, 0);
16800 port2 = strtoul(ptr, &ptr, 0);
16801 port3 = strtoul(ptr, &ptr, 0);
16803 while (*ptr ==
' ')
16807 for (; *ptr != 0 && *ptr !=
' ' &&
i < (int)
sizeof(version) - 1;)
16808 version[
i++] = *ptr++;
16811 assert(
i < (
int)
sizeof(version));
16815 for (; *ptr != 0 && *ptr !=
' ';)
16818 while (*ptr ==
' ')
16822 for (; *ptr != 0 && *ptr !=
' ' && *ptr !=
'\n' && *ptr !=
'\r' &&
i < (int)
sizeof(
experiment) - 1;)
16832 mstrlcpy(v1, version,
sizeof(v1));
16833 if (strchr(v1,
'.'))
16834 if (strchr(strchr(v1,
'.') + 1,
'.'))
16835 *strchr(strchr(v1,
'.') + 1,
'.') = 0;
16839 if (strchr(
str,
'.'))
16840 if (strchr(strchr(
str,
'.') + 1,
'.'))
16841 *strchr(strchr(
str,
'.') + 1,
'.') = 0;
16843 if (strcmp(v1,
str) != 0) {
16845 cm_msg(
MERROR,
"rpc_server_accept",
"received string: %s", net_buffer + 2);
16860#ifdef LOCAL_ROUTINES
16866 bool found =
false;
16872 for (idx = 0; idx < exptab.
exptab.size(); idx++) {
16885 send(sock,
"2", 2, 0);
16894 char host_port1_str[30], host_port2_str[30], host_port3_str[30];
16895 char debug_str[30];
16904 const char *argv[10];
16905 argv[0] = mserver_path;
16907 argv[2] = host_port1_str;
16908 argv[3] = host_port2_str;
16909 argv[4] = host_port3_str;
16910 argv[5] = debug_str;
16917 argv[0], argv[1], argv[2], argv[3], argv[4], argv[5], argv[6], argv[7], argv[8],
16926 send(sock,
str, strlen(
str) + 1, 0);
16932 send(sock,
str, strlen(
str) + 1, 0);
16939 cm_msg(
MERROR,
"rpc_server_accept",
"received unknown command '%c' code %d", command, command);
16949 setsockopt(sock, SOL_SOCKET, SO_LINGER, (
char *) &ling,
sizeof(ling));
16980 INT client_hw_type = 0, hw_type;
16981 std::string client_program;
16984 char net_buffer[256], *p;
16986 int sock = accept(lsock, NULL, NULL);
17002 client_program =
"(unknown)";
17005 i =
recv_string(sock, net_buffer,
sizeof(net_buffer), 10000);
17012 p = strtok(net_buffer,
" ");
17014 client_hw_type = atoi(p);
17015 p = strtok(NULL,
" ");
17019 p = strtok(NULL,
" ");
17022 client_program = p;
17023 p = strtok(NULL,
" ");
17027 p = strtok(NULL,
" ");
17048 status = send(sock,
str.c_str(),
str.length() + 1, 0);
17082 int recv_sock, send_sock, event_sock;
17084 std::string client_program;
17085 INT client_hw_type, hw_type;
17087 char net_buffer[256];
17095 std::string errmsg;
17129 flag = 2 * 1024 * 1024;
17130 status = setsockopt(event_sock, SOL_SOCKET, SO_RCVBUF, (
char *) &flag,
sizeof(
INT));
17132 cm_msg(
MERROR,
"rpc_server_callback",
"cannot setsockopt(SOL_SOCKET, SO_RCVBUF), errno %d (%s)", errno,
17137 cm_msg(
MERROR,
"rpc_server_callback",
"timeout on receive remote computer info");
17146 client_hw_type = strtoul(net_buffer, &p, 0);
17151 client_program = p;
17185 sprintf(
str,
"%d", hw_type);
17186 send(recv_sock,
str, strlen(
str) + 1, 0);
17255 if (n_received <= 0) {
17257 cm_msg(
MERROR,
"rpc_server_receive_rpc",
"recv_net_command() returned %d", n_received);
17264 memcpy(&nc_in, buf,
sizeof(nc_in));
17275 bool rpc_cxx =
false;
17280 cm_msg(
MERROR,
"rpc_server_receive_rpc",
"Unknown RPC routine_id %d", routine_id);
17290 cm_msg(
MERROR,
"rpc_server_receive_rpc",
"rpc_execute() returned %d, abort",
status);
17300 }
while (remaining);
17315 if (strchr(
str,
'.'))
17316 *strchr(
str,
'.') = 0;
17317 cm_msg(
MTALK,
"rpc_server_receive_rpc",
"Program \'%s\' on host \'%s\' aborted", sa->
prog_name.c_str(),
str);
17334 cm_msg(
MERROR,
"rpc_server_receive_rpc",
"mserver unexpected shutdown, status %d",
status);
17396 static char *xbuf = NULL;
17397 static int xbufsize = 0;
17398 static bool xbufempty =
true;
17401 if (sa == NULL && xbufempty)
17404 static bool recurse =
false;
17407 cm_msg(
MERROR,
"rpc_server_receive_event",
"internal error: called recursively");
17419 if (xbufempty && sa) {
17422 if (n_received < 0) {
17424 cm_msg(
MERROR,
"rpc_server_receive_event",
"recv_event_server_realloc() returned %d, abort", n_received);
17428 if (n_received == 0) {
17444 INT *pbh = (
INT *) xbuf;
17452 cm_msg(
MERROR,
"rpc_server_receive_event",
"bm_send_event() error %d (SS_ABORT), abort",
status);
17463 cm_msg(
MERROR,
"rpc_server_receive_event",
"bm_send_event() error %d, mserver dropped this event",
status);
17479 if (strchr(
str,
'.'))
17480 *strchr(
str,
'.') = 0;
17481 cm_msg(
MTALK,
"rpc_server_receive_event",
"Program \'%s\' on host \'%s\' aborted", sa->
prog_name.c_str(),
str);
17547 }
else if (timeout_msec ==
BM_WAIT) {
17592 struct linger ling;
17601 setsockopt(sa->
recv_sock, SOL_SOCKET, SO_LINGER, (
char *) &ling,
sizeof(ling));
17605 setsockopt(sa->
send_sock, SOL_SOCKET, SO_LINGER, (
char *) &ling,
sizeof(ling));
17610 setsockopt(sa->
event_sock, SOL_SOCKET, SO_LINGER, (
char *) &ling,
sizeof(ling));
17665 struct timeval timeout;
17692 if (convert_flags) {
17701 cm_msg(
MINFO,
"rpc_check_channels",
"client \"%s\" on host \"%s\" failed watchdog test after %d sec, send_tcp() returned %d",
17724 timeout.tv_sec = 1;
17725 timeout.tv_usec = 0;
17733 if (now > timeout_end_ms)
17746 if (!FD_ISSET(sa->
send_sock, &readfds) &&
17749 cm_msg(
MERROR,
"rpc_check_channels",
"client \"%s\" on host \"%s\" failed watchdog test after %d sec",
17765 if (FD_ISSET(sa->
send_sock, &readfds)) {
17768 cm_msg(
MERROR,
"rpc_check_channels",
"client \"%s\" on host \"%s\" failed watchdog test after %d sec, recv_tcp() returned %d",
17816#ifndef DOXYGEN_SHOULD_SKIP_THIS
17967 if (((
PTYPE) event & 0x07) != 0) {
17968 cm_msg(
MERROR,
"bk_create",
"Bank %s created with unaligned event pointer",
name);
17977 *pdata = pbk32a + 1;
17985 *pdata = pbk32 + 1;
17998#ifndef DOXYGEN_SHOULD_SKIP_THIS
18011 DWORD bklen, bktype, bksze;
18037 memmove(pdest, (
char *) psbkh32a,
ALIGN8(bksze) +
sizeof(
BANK32A));
18051 memmove(pdest, (
char *) psbkh32,
ALIGN8(bksze) +
sizeof(
BANK32));
18058 psbkh = ((
BANK *) psdata - 1);
18065 memmove(pdest, (
char *) psbkh,
ALIGN8(bksze) +
sizeof(
BANK));
18104 if (*((
DWORD *) pbk32a->
name) == dname) {
18106 remaining = ((
char *) event + ((
BANK_HEADER *) event)->data_size +
18114 memmove(pbk32a, (
char *) (pbk32a + 1) +
ALIGN8(pbk32a->
data_size), remaining);
18119 }
while ((
DWORD) ((
char *) pbk32a - (
char *) event) <
18126 if (*((
DWORD *) pbk32->
name) == dname) {
18128 remaining = ((
char *) event + ((
BANK_HEADER *) event)->data_size +
18136 memmove(pbk32, (
char *) (pbk32 + 1) +
ALIGN8(pbk32->
data_size), remaining);
18141 }
while ((
DWORD) ((
char *) pbk32 - (
char *) event) <
18150 remaining = ((
char *) event + ((
BANK_HEADER *) event)->data_size +
18158 memmove(pbk, (
char *) (pbk + 1) +
ALIGN8(pbk->
data_size), remaining);
18163 }
while ((
DWORD) ((
char *) pbk - (
char *) event) <
18187 pbk32a->
data_size = (
DWORD) ((
char *) pdata - (
char *) (pbk32a + 1));
18189 printf(
"Warning: TID_STRUCT bank %c%c%c%c has zero size\n", pbk32a->
name[0], pbk32a->
name[1], pbk32a->
name[2], pbk32a->
name[3]);
18194 pbk32->
data_size = (
DWORD) ((
char *) pdata - (
char *) (pbk32 + 1));
18196 printf(
"Warning: TID_STRUCT bank %c%c%c%c has zero size\n", pbk32->
name[0], pbk32->
name[1], pbk32->
name[2], pbk32->
name[3]);
18201 uint32_t size = (uint32_t) ((
char *) pdata - (
char *) (pbk + 1));
18202 if (size > 0xFFFF) {
18203 printf(
"Error: Bank size %d exceeds 16-bit limit of 65526, please use bk_init32() to create a 32-bit bank\n", size);
18208 printf(
"Warning: TID_STRUCT bank %c%c%c%c has zero size\n", pbk->
name[0], pbk->
name[1], pbk->
name[2], pbk->
name[3]);
18210 if (size > 0xFFFF) {
18211 printf(
"Error: Bank size %d exceeds 16-bit limit of 65526, please use bk_init32() to create a 32-bit bank\n", size);
18258 if (pmbk32a == NULL)
18262 if (pmbk32 == NULL)
18276 strncat(bklist, (
char *) pmbk32a->
name, 4);
18278 strncat(bklist, (
char *) pmbk32->
name, 4);
18280 strncat(bklist, (
char *) pmbk->
name, 4);
18302 while ((
DWORD) ((
char *) pbk32a - (
char *) event) <
18304 if (*((
DWORD *) pbk32a->
name) == dname) {
18305 *((
void **) pdata) = pbk32a + 1;
18315 while ((
DWORD) ((
char *) pbk32 - (
char *) event) <
18317 if (*((
DWORD *) pbk32->
name) == dname) {
18318 *((
void **) pdata) = pbk32 + 1;
18328 while ((
DWORD) ((
char *) pbk - (
char *) event) <
18331 *((
void **) pdata) = pbk + 1;
18342 *((
void **) pdata) = NULL;
18363 if (*((
DWORD *) pbk32a->
name) == dname) {
18364 *((
void **) pdata) = pbk32a + 1;
18370 *bktype = pbk32a->
type;
18379 if (*((
DWORD *) pbk32->
name) == dname) {
18380 *((
void **) pdata) = pbk32 + 1;
18386 *bktype = pbk32->
type;
18396 *((
void **) pdata) = pbk + 1;
18402 *bktype = pbk->
type;
18410 *((
void **) pdata) = NULL;
18454 *pbk = (
BANK *) ((
char *) (*pbk + 1) +
ALIGN8((*pbk)->data_size));
18456 *((
void **) pdata) = (*pbk) + 1;
18459 *pbk = *((
BANK **) pdata) = NULL;
18468#ifndef DOXYGEN_SHOULD_SKIP_THIS
18494 *pbk = (
BANK32 *) ((
char *) (*pbk + 1) +
ALIGN8((*pbk)->data_size));
18496 *((
void **) pdata) = (*pbk) + 1;
18527 if (*pbk32a == NULL)
18530 *pbk32a = (
BANK32A *) ((
char *) (*pbk32a + 1) +
ALIGN8((*pbk32a)->data_size));
18532 *((
void **) pdata) = (*pbk32a) + 1;
18572 if (pbh->
flags < 0x10000 && !force)
18579 pbk = (
BANK *) (pbh + 1);
18589 pdata = pbk32a + 1;
18606 pbk = (
BANK *) pbk32a;
18609 pbk = (
BANK *) pbk32;
18618 while ((
char *) pdata < (
char *) pbk) {
18620 pdata = (
void *) (((
WORD *) pdata) + 1);
18628 while ((
char *) pdata < (
char *) pbk) {
18630 pdata = (
void *) (((
DWORD *) pdata) + 1);
18637 while ((
char *) pdata < (
char *) pbk) {
18639 pdata = (
void *) (((
double *) pdata) + 1);
18659#ifndef DOXYGEN_SHOULD_SKIP_THIS
18684#define MAX_RING_BUFFER 100
18765 if (
rb[
i].buffer == NULL)
18776 assert(
rb[
i].buffer);
18860 for (
i = 0;
i <= millisec / 10;
i++) {
18873 rp >
rb[h].buffer) {
18927 unsigned char *new_wp;
18935 cm_msg(
MERROR,
"rb_increment_wp",
"event size of %d MB larger than max_event_size of %d MB",
18940 new_wp =
rb[h].
wp + size;
18946 assert(
rb[h].rp !=
rb[h].buffer);
18948 if (new_wp >
rb[h].ep)
19003 for (
i = 0;
i <= millisec / 10;
i++) {
19005 if (
rb[h].wp !=
rb[h].rp) {
19007 *p =
rb[handle - 1].
rp;
19058 unsigned char *new_rp;
19069 new_rp =
rb[h].
rp + size;
19073 if (new_rp >= ep &&
rb[h].wp < ep)
19076 rb[handle - 1].
rp = new_rp;
19115 if (
rb[h].wp >=
rb[h].rp)
19133 cm_msg(
MERROR,
"cm_write_event_to_odb",
"event %d ODB record size mismatch, db_set_record() status %d", pevent->
event_id,
status);
19140 char *pdata, *pdata0;
19149 HNDLE hKeyRoot, hKeyl, *hKeys;
19158 for (
n=0 ; ;
n++) {
19161 if (pbk32a == NULL)
19184 if (pbk32a == NULL)
19222 cm_msg(
MERROR,
"cm_write_event_to_odb",
"please define bank \"%s\" in BANK_LIST in frontend",
name);
19227 for (
i = 0;;
i++) {
19240 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot write bank \"%s\" to ODB, db_set_data1() status %d",
name,
status);
19243 hKeys[
n++] = hKeyl;
19254 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot create key for bank \"%s\" with tid %d in ODB, db_create_key() status %d",
name, bktype,
status);
19259 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot find key for bank \"%s\" in ODB, after db_create_key(), db_find_key() status %d",
name,
status);
19266 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot write bank \"%s\" to ODB, db_set_data1() status %d",
name,
status);
19268 hKeys[
n++] = hKeyRoot;
19280 cm_msg(
MERROR,
"cm_write_event_to_odb",
"event format %d is not supported (see midas.h definitions of FORMAT_xxx)", format);
std::atomic_bool connected
size_t out_max_size_offset
BUFFER * get_pbuf() const
bm_lock_buffer_guard(BUFFER *pbuf, bool do_not_lock=false)
bm_lock_buffer_guard & operator=(const bm_lock_buffer_guard &)=delete
bm_lock_buffer_guard(const bm_lock_buffer_guard &)=delete
void set_string_size(std::string s, int size)
static bool exists(const std::string &name)
INT transition(INT run_number, char *error)
INT al_get_alarms(std::string *presult)
void bk_init32a(void *event)
INT bk_close(void *event, void *pdata)
INT bk_iterate32a(const void *event, BANK32A **pbk32a, void *pdata)
static void copy_bk_name(char *dst, const char *src)
INT bk_swap(void *event, BOOL force)
BOOL bk_is32a(const void *event)
int bk_delete(void *event, const char *name)
BOOL bk_is32(const void *event)
INT bk_iterate32(const void *event, BANK32 **pbk, void *pdata)
INT bk_locate(const void *event, const char *name, void *pdata)
void bk_init(void *event)
INT bk_list(const void *event, char *bklist)
INT bk_copy(char *pevent, char *psrce, const char *bkname)
INT bk_iterate(const void *event, BANK **pbk, void *pdata)
void bk_init32(void *event)
void bk_create(void *event, const char *name, WORD type, void **pdata)
INT bk_find(const BANK_HEADER *pbkh, const char *name, DWORD *bklen, DWORD *bktype, void **pdata)
INT bk_size(const void *event)
static void bm_wakeup_producers_locked(const BUFFER_HEADER *pheader, const BUFFER_CLIENT *pc)
static INT bm_receive_event_rpc(INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
INT bm_open_buffer(const char *buffer_name, INT buffer_size, INT *buffer_handle)
static BOOL bm_validate_rp(const char *who, const BUFFER_HEADER *pheader, int rp)
INT bm_send_event(INT buffer_handle, const EVENT_HEADER *pevent, int unused, int timeout_msec)
static int bm_flush_cache_rpc(int buffer_handle, int timeout_msec)
static void bm_write_buffer_statistics_to_odb_copy(HNDLE hDB, const char *buffer_name, const char *client_name, int client_index, BUFFER_INFO *pbuf, BUFFER_HEADER *pheader)
static int bm_skip_event(BUFFER *pbuf)
static INT bm_flush_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
static INT bm_receive_event_rpc_cxx(INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
INT bm_write_statistics_to_odb(void)
#define MAX_DEFRAG_EVENTS
INT bm_delete_request(INT request_id)
INT bm_close_all_buffers(void)
INT bm_add_event_request(INT buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, EVENT_HANDLER *func, INT request_id)
static void bm_write_buffer_statistics_to_odb(HNDLE hDB, BUFFER *pbuf, BOOL force)
static int bm_incr_rp_no_check(const BUFFER_HEADER *pheader, int rp, int total_size)
INT bm_receive_event_vec(INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
static void bm_notify_reader_locked(BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, int old_write_pointer, int request_id)
static int bm_find_first_request_locked(BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
static BOOL bm_check_requests(const BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
static void bm_cleanup_buffer_locked(BUFFER *pbuf, const char *who, DWORD actual_time)
static BOOL bm_update_read_pointer_locked(const char *caller_name, BUFFER_HEADER *pheader)
static void bm_convert_event_header(EVENT_HEADER *pevent, int convert_flags)
INT bm_request_event(HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
static int bm_validate_client_index_locked(bm_lock_buffer_guard &pbuf_guard)
INT bm_set_cache_size(INT buffer_handle, size_t read_size, size_t write_size)
static int bm_validate_buffer_locked(const BUFFER *pbuf)
INT bm_receive_event(INT buffer_handle, void *destination, INT *buf_size, int timeout_msec)
static int bm_fill_read_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
INT bm_compose_event_threadsafe(EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD *serial)
INT bm_remove_event_request(INT buffer_handle, INT request_id)
static double _bm_mutex_timeout_sec
INT bm_close_buffer(INT buffer_handle)
int bm_send_event_sg(int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
INT bm_compose_event(EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD serial)
static void bm_write_to_buffer_locked(BUFFER_HEADER *pheader, int sg_n, const char *const sg_ptr[], const size_t sg_len[], size_t total_size)
static EVENT_DEFRAG_BUFFER defrag_buffer[MAX_DEFRAG_EVENTS]
static int bm_next_rp(const char *who, const BUFFER_HEADER *pheader, const char *pdata, int rp)
INT bm_match_event(short int event_id, short int trigger_mask, const EVENT_HEADER *pevent)
static void bm_read_from_buffer_locked(const BUFFER_HEADER *pheader, int rp, char *buf, int event_size)
static BOOL bm_peek_read_cache_locked(BUFFER *pbuf, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
static void bm_update_last_activity(DWORD millitime)
static void bm_incr_read_cache_locked(BUFFER *pbuf, int total_size)
static DWORD _bm_max_event_size
static void bm_clear_buffer_statistics(HNDLE hDB, BUFFER *pbuf)
INT bm_flush_cache(int buffer_handle, int timeout_msec)
static void bm_dispatch_event(int buffer_handle, EVENT_HEADER *pevent)
static void bm_validate_client_pointers_locked(const BUFFER_HEADER *pheader, BUFFER_CLIENT *pclient)
static INT bm_read_buffer(BUFFER *pbuf, INT buffer_handle, void **bufptr, void *buf, INT *buf_size, std::vector< char > *vecptr, int timeout_msec, int convert_flags, BOOL dispatch)
static int bm_wait_for_free_space_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec, int requested_space, bool unlock_write_cache)
INT bm_get_buffer_handle(const char *buffer_name, INT *buffer_handle)
int bm_send_event_vec(int buffer_handle, const std::vector< char > &event, int timeout_msec)
static int _bm_lock_timeout
static int bm_peek_buffer_locked(BUFFER *pbuf, BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
INT bm_receive_event_alloc(INT buffer_handle, EVENT_HEADER **ppevent, int timeout_msec)
static void bm_reset_buffer_locked(BUFFER *pbuf)
void bm_remove_client_locked(BUFFER_HEADER *pheader, int j)
static INT bm_push_buffer(BUFFER *pbuf, int buffer_handle)
static int bm_wait_for_more_events_locked(bm_lock_buffer_guard &pbuf_guard, BUFFER_CLIENT *pc, int timeout_msec, BOOL unlock_read_cache)
INT cm_set_path(const char *path)
INT cm_register_transition(INT transition, INT(*func)(INT, char *), INT sequence_number)
INT cm_shutdown(const char *name, BOOL bUnique)
static int cm_transition_call(TrState *s, int idx)
INT cm_disconnect_client(HNDLE hConn, BOOL bShutdown)
static void load_rpc_hosts(HNDLE hDB, HNDLE hKey, int index, void *info)
static std::atomic< std::thread * > _watchdog_thread
static int cm_transition_detach(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_yield(INT millisec)
INT cm_get_experiment_database(HNDLE *hDB, HNDLE *hKeyClient)
INT cm_list_experiments_remote(const char *host_name, STRING_LIST *exp_names)
INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout)
static void bm_defragment_event(HNDLE buffer_handle, HNDLE request_id, EVENT_HEADER *pevent, void *pdata, EVENT_HANDLER *dispatcher)
INT cm_connect_client(const char *client_name, HNDLE *hConn)
INT cm_connect_experiment(const char *host_name, const char *exp_name, const char *client_name, void(*func)(char *))
static BUFFER_CLIENT * bm_get_my_client_locked(bm_lock_buffer_guard &pbuf_guard)
INT cm_list_experiments_local(STRING_LIST *exp_names)
INT cm_start_watchdog_thread()
static INT bm_push_event(const char *buffer_name)
INT cm_get_experiment_semaphore(INT *semaphore_alarm, INT *semaphore_elog, INT *semaphore_history, INT *semaphore_msg)
static int tr_finish(HNDLE hDB, TrState *tr, int transition, int status, const char *errorstr)
INT cm_set_client_run_state(INT state)
static int bm_lock_buffer_read_cache(BUFFER *pbuf)
static void write_tr_client_to_odb(HNDLE hDB, const TrClient *tr_client)
INT cm_transition(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_stop_watchdog_thread()
INT cm_register_function(INT id, INT(*func)(INT, void **))
INT cm_connect_experiment1(const char *host_name, const char *default_exp_name, const char *client_name, void(*func)(char *), INT odb_size, DWORD watchdog_timeout)
INT cm_check_client(HNDLE hDB, HNDLE hKeyClient)
static int xbm_lock_buffer(BUFFER *pbuf)
static void xbm_unlock_buffer(BUFFER *pbuf)
INT cm_dispatch_ipc(const char *message, int message_size, int client_socket)
static INT cm_transition1(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_select_experiment_remote(const char *host_name, std::string *exp_name)
INT cm_register_server(void)
static BOOL _ctrlc_pressed
static void init_rpc_hosts(HNDLE hDB)
void cm_ack_ctrlc_pressed()
INT cm_execute(const char *command, char *result, INT bufsize)
INT cm_get_watchdog_info(HNDLE hDB, const char *client_name, DWORD *timeout, DWORD *last)
INT cm_cleanup(const char *client_name, BOOL ignore_timeout)
void cm_check_connect(void)
static BUFFER * bm_get_buffer(const char *who, INT buffer_handle, int *pstatus)
INT cm_select_experiment_local(std::string *exp_name)
std::string cm_expand_env(const char *str)
std::string cm_get_client_name()
static int bm_lock_buffer_mutex(BUFFER *pbuf)
int cm_exec_script(const char *odb_path_to_script)
INT EXPRT cm_get_path_string(std::string *path)
static void rpc_client_shutdown()
static std::atomic< bool > _watchdog_thread_is_running
static exptab_struct _exptab
INT cm_set_client_info(HNDLE hDB, HNDLE *hKeyClient, const char *host_name, char *client_name, INT hw_type, const char *password, DWORD watchdog_timeout)
INT cm_disconnect_experiment(void)
static void bm_cleanup(const char *who, DWORD actual_time, BOOL wrong_interval)
static INT bm_notify_client(const char *buffer_name, int s)
int cm_get_exptab(const char *expname, std::string *dir, std::string *user)
static void xcm_watchdog_thread()
static bool test_cm_expand_env1(const char *str, const char *expected)
INT cm_synchronize(DWORD *seconds)
std::string cm_get_exptab_filename()
std::string cm_get_path()
static DWORD _deferred_transition_mask
std::string cm_get_history_path(const char *history_channel)
void cm_test_expand_env()
INT cm_register_deferred_transition(INT transition, BOOL(*func)(INT, BOOL))
int cm_set_experiment_local(const char *exp_name)
static INT _requested_transition
INT cm_get_environment(char *host_name, int host_name_size, char *exp_name, int exp_name_size)
const char * cm_get_version()
INT cm_read_exptab(exptab_struct *exptab)
static int bm_lock_buffer_write_cache(BUFFER *pbuf)
INT cm_deregister_transition(INT transition)
INT cm_check_deferred_transition()
std::string cm_get_experiment_name()
INT cm_set_transition_sequence(INT transition, INT sequence_number)
static bool tr_compare(const std::unique_ptr< TrClient > &arg1, const std::unique_ptr< TrClient > &arg2)
INT cm_delete_client_info(HNDLE hDB, INT pid)
static std::atomic< bool > _watchdog_thread_run
const char * cm_get_revision()
INT cm_watchdog_thread(void *unused)
INT cm_set_experiment_database(HNDLE hDB, HNDLE hKeyClient)
BOOL cm_is_ctrlc_pressed()
static INT tr_main_thread(void *param)
void cm_ctrlc_handler(int sig)
INT cm_set_watchdog_params_local(BOOL call_watchdog, DWORD timeout)
INT cm_transition_cleanup()
INT cm_set_watchdog_params(BOOL call_watchdog, DWORD timeout)
static int cm_transition_call_direct(TrClient *tr_client)
INT cm_exist(const char *name, BOOL bUnique)
INT cm_set_experiment_semaphore(INT semaphore_alarm, INT semaphore_elog, INT semaphore_history, INT semaphore_msg)
static INT cm_transition2(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_set_experiment_name(const char *name)
#define CM_INVALID_TRANSITION
#define CM_DEFERRED_TRANSITION
#define CM_TRANSITION_IN_PROGRESS
#define CM_WRONG_PASSWORD
#define CM_TRANSITION_CANCELED
#define CM_VERSION_MISMATCH
#define BM_INVALID_MIXING
#define BM_INVALID_HANDLE
#define DB_INVALID_HANDLE
#define DB_NO_MORE_SUBKEYS
#define RPC_EXCEED_BUFFER
#define RPC_DOUBLE_DEFINED
#define RPC_NOT_REGISTERED
#define RPC_MUTEX_TIMEOUT
#define RPC_NO_CONNECTION
#define RPC_HNDLE_CONNECT
#define RPC_HNDLE_MSERVER
#define VALIGN(adr, align)
RPC_LIST * rpc_get_internal_list(INT flag)
#define MESSAGE_BUFFER_NAME
#define DRI_LITTLE_ENDIAN
#define MAX_STRING_LENGTH
#define MESSAGE_BUFFER_SIZE
void() EVENT_HANDLER(HNDLE buffer_handler, HNDLE request_id, EVENT_HEADER *event_header, void *event_data)
INT() RPC_HANDLER(INT index, void *prpc_param[])
std::string ss_gethostname()
INT ss_suspend(INT millisec, INT msg)
INT ss_get_struct_align()
INT ss_mutex_release(MUTEX_T *mutex)
INT ss_suspend_init_odb_port()
bool ss_event_socket_has_data()
time_t ss_mktime(struct tm *tms)
int ss_file_exist(const char *path)
INT ss_semaphore_create(const char *name, HNDLE *semaphore_handle)
INT recv_tcp2(int sock, char *net_buffer, int buffer_size, int timeout_ms)
INT ss_suspend_set_client_listener(int listen_socket)
INT ss_socket_get_peer_name(int sock, std::string *hostp, int *portp)
int ss_file_link_exist(const char *path)
INT ss_mutex_delete(MUTEX_T *mutex)
int ss_socket_wait(int sock, INT millisec)
INT ss_suspend_set_server_acceptions(RPC_SERVER_ACCEPTION_LIST *acceptions)
DWORD ss_settime(DWORD seconds)
char * ss_getpass(const char *prompt)
INT ss_suspend_set_client_connection(RPC_SERVER_CONNECTION *connection)
INT ss_mutex_create(MUTEX_T **mutex, BOOL recursive)
INT ss_shm_open(const char *name, INT size, void **adr, size_t *shm_size, HNDLE *handle, BOOL get_size)
INT recv_string(int sock, char *buffer, DWORD buffer_size, INT millisec)
INT ss_write_tcp(int sock, const char *buffer, size_t buffer_size)
int ss_dir_exist(const char *path)
bool ss_timed_mutex_wait_for_sec(std::timed_mutex &mutex, const char *mutex_name, double timeout_sec)
INT ss_semaphore_release(HNDLE semaphore_handle)
std::string ss_get_cmdline(void)
int ss_file_copy(const char *src, const char *dst, bool append)
INT recv_tcp(int sock, char *net_buffer, DWORD buffer_size, INT flags)
INT ss_resume(INT port, const char *message)
midas_thread_t ss_gettid(void)
INT ss_semaphore_delete(HNDLE semaphore_handle, INT destroy_flag)
INT ss_sleep(INT millisec)
INT ss_socket_connect_tcp(const char *hostname, int tcp_port, int *sockp, std::string *error_msg_p)
INT ss_semaphore_wait_for(HNDLE semaphore_handle, DWORD timeout_millisec)
INT ss_socket_listen_tcp(bool listen_localhost, int tcp_port, int *sockp, int *tcp_port_p, std::string *error_msg_p)
char * ss_crypt(const char *buf, const char *salt)
INT ss_spawnv(INT mode, const char *cmdname, const char *const argv[])
INT ss_suspend_get_buffer_port(midas_thread_t thread_id, INT *port)
INT ss_socket_close(int *sockp)
char * ss_gets(char *string, int size)
INT ss_recv_net_command(int sock, DWORD *routine_id, DWORD *param_size, char **param_ptr, int timeout_ms)
INT ss_shm_close(const char *name, void *adr, size_t shm_size, HNDLE handle, INT destroy_flag)
INT send_tcp(int sock, char *buffer, DWORD buffer_size, INT flags)
void * ss_ctrlc_handler(void(*func)(int))
BOOL ss_pid_exists(int pid)
INT ss_system(const char *command)
INT ss_mutex_wait_for(MUTEX_T *mutex, INT timeout)
INT ss_file_find(const char *path, const char *pattern, char **plist)
static int cm_msg_retrieve1(const char *filename, time_t t, INT n_messages, char **messages, int *length, int *allocated, int *num_messages)
INT cm_msg1(INT message_type, const char *filename, INT line, const char *facility, const char *routine, const char *format,...)
int cm_msg_early_init(void)
INT EXPRT cm_msg_facilities(STRING_LIST *list)
int cm_msg_open_buffer(void)
int cm_msg_close_buffer(void)
static std::mutex gMsgBufMutex
static void add_message(char **messages, int *length, int *allocated, time_t tstamp, const char *new_message)
INT cm_msg_register(EVENT_HANDLER *func)
INT cm_msg_log(INT message_type, const char *facility, const char *message)
INT cm_msg_flush_buffer()
static std::deque< msg_buffer_entry > gMsgBuf
static INT cm_msg_send_event(DWORD ts, INT message_type, const char *send_message)
std::string cm_get_error(INT code)
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
static std::string cm_msg_format(INT message_type, const char *filename, INT line, const char *routine, const char *format, va_list *argptr)
INT cm_msg_retrieve(INT n_message, char *message, INT buf_size)
INT cm_msg_retrieve2(const char *facility, time_t t, INT n_message, char **messages, int *num_messages)
void cm_msg_get_logfile(const char *fac, time_t t, std::string *filename, std::string *linkname, std::string *linktarget)
INT cm_set_msg_print(INT system_mask, INT user_mask, int(*func)(const char *))
struct rpc_server_acception_struct RPC_SERVER_ACCEPTION
BOOL equal_ustring(const char *str1, const char *str2)
INT db_flush_database(HNDLE hDB)
INT db_get_data_index(HNDLE hDB, HNDLE hKey, void *data, INT *buf_size, INT idx, DWORD type)
INT db_delete_key(HNDLE hDB, HNDLE hKey, BOOL follow_links)
INT db_check_client(HNDLE hDB, HNDLE hKeyClient)
INT db_get_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, void *data, INT *buf_size, DWORD type, BOOL create)
INT db_open_record(HNDLE hDB, HNDLE hKey, void *ptr, INT rec_size, WORD access_mode, void(*dispatcher)(INT, INT, void *), void *info)
INT db_open_database(const char *xdatabase_name, INT database_size, HNDLE *hDB, const char *client_name)
void db_cleanup(const char *who, DWORD actual_time, BOOL wrong_interval)
std::string strcomb1(const char **list)
INT db_get_data(HNDLE hDB, HNDLE hKey, void *data, INT *buf_size, DWORD type)
INT db_create_key(HNDLE hDB, HNDLE hKey, const char *key_name, DWORD type)
INT db_set_mode(HNDLE hDB, HNDLE hKey, WORD mode, BOOL recurse)
INT db_get_key(HNDLE hDB, HNDLE hKey, KEY *key)
INT EXPRT db_get_value_string(HNDLE hdb, HNDLE hKeyRoot, const char *key_name, int index, std::string *s, BOOL create, int create_string_length)
INT db_get_watchdog_info(HNDLE hDB, const char *client_name, DWORD *timeout, DWORD *last)
INT db_set_data_index(HNDLE hDB, HNDLE hKey, const void *data, INT data_size, INT idx, DWORD type)
INT db_close_all_records()
INT db_watch(HNDLE hDB, HNDLE hKey, void(*dispatcher)(INT, INT, INT, void *), void *info)
INT db_close_all_databases(void)
INT db_set_data(HNDLE hDB, HNDLE hKey, const void *data, INT buf_size, INT num_values, DWORD type)
INT db_delete(HNDLE hDB, HNDLE hKeyRoot, const char *odb_path)
INT db_sprintf(char *string, const void *data, INT data_size, INT idx, DWORD type)
INT db_update_last_activity(DWORD millitime)
INT db_set_data1(HNDLE hDB, HNDLE hKey, const void *data, INT buf_size, INT num_values, DWORD type)
INT db_set_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, const void *data, INT data_size, INT num_values, DWORD type)
INT db_find_key(HNDLE hDB, HNDLE hKey, const char *key_name, HNDLE *subhKey)
INT db_update_record_local(INT hDB, INT hKeyRoot, INT hKey, int index)
void db_set_watchdog_params(DWORD timeout)
INT db_update_record_mserver(INT hDB, INT hKeyRoot, INT hKey, int index, int client_socket)
void db_cleanup2(const char *client_name, int ignore_timeout, DWORD actual_time, const char *who)
int db_delete_client_info(HNDLE hDB, int pid)
static DATABASE * db_lock_database(HNDLE hDB, int *pstatus, const char *caller, bool check_attached=true)
INT db_set_client_name(HNDLE hDB, const char *client_name)
INT db_notify_clients_array(HNDLE hDB, HNDLE hKeys[], INT size)
INT db_set_record(HNDLE hDB, HNDLE hKey, void *data, INT buf_size, INT align)
INT db_enum_key(HNDLE hDB, HNDLE hKey, INT idx, HNDLE *subkey_handle)
static void db_unlock_database(DATABASE *pdb, const char *caller)
INT db_create_record(HNDLE hDB, HNDLE hKey, const char *orig_key_name, const char *init_str)
INT EXPRT db_set_value_string(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, const std::string *s)
INT db_set_lock_timeout(HNDLE hDB, int timeout_millisec)
INT db_set_num_values(HNDLE hDB, HNDLE hKey, INT num_values)
INT db_protect_database(HNDLE hDB)
int rb_get_rp(int handle, void **p, int millisec)
int rb_delete(int handle)
int rb_get_wp(int handle, void **p, int millisec)
int rb_increment_rp(int handle, int size)
static volatile int _rb_nonblocking
int rb_increment_wp(int handle, int size)
int rb_create(int size, int max_event_size, int *handle)
int rb_get_buffer_level(int handle, int *n_bytes)
static RING_BUFFER rb[MAX_RING_BUFFER]
INT rpc_add_allowed_host(const char *hostname)
void rpc_convert_data(void *data, INT tid, INT flags, INT total_size, INT convert_flags)
INT rpc_client_connect(const char *host_name, INT port, const char *client_name, HNDLE *hConnection)
#define RPC_BM_ADD_EVENT_REQUEST
INT rpc_register_server(int port, int *plsock, int *pport)
#define RPC_CM_CHECK_CLIENT
static int recv_event_server_realloc(INT idx, RPC_SERVER_ACCEPTION *psa, char **pbuffer, int *pbuffer_size)
INT rpc_get_opt_tcp_size()
INT rpc_client_disconnect(HNDLE hConn, BOOL bShutdown)
#define RPC_BM_SEND_EVENT
INT rpc_client_call(HNDLE hConn, DWORD routine_id,...)
INT rpc_register_functions(const RPC_LIST *new_list, RPC_HANDLER func)
static std::atomic_bool gAllowedHostsEnabled(false)
INT rpc_server_callback(struct callback_addr *pcallback)
static std::mutex _client_connections_mutex
INT rpc_set_timeout(HNDLE hConn, int timeout_msec, int *old_timeout_msec)
#define RPC_CM_SYNCHRONIZE
static std::mutex gAllowedHostsMutex
INT recv_tcp_check(int sock)
#define RPC_BM_GET_BUFFER_INFO
#define RPC_CM_SET_CLIENT_INFO
const char * rpc_get_mserver_path()
RPC_SERVER_ACCEPTION * rpc_get_mserver_acception()
void rpc_calc_convert_flags(INT hw_type, INT remote_hw_type, INT *convert_flags)
INT rpc_server_connect(const char *host_name, const char *exp_name)
std::string rpc_get_name()
int rpc_test_rpc_test2_cxx()
static RPC_SERVER_ACCEPTION * rpc_new_server_acception()
#define RPC_BM_REMOVE_EVENT_REQUEST
INT rpc_server_receive_rpc(RPC_SERVER_ACCEPTION *sa)
void rpc_debug_printf(const char *format,...)
const char * rpc_tid_name_old(INT id)
int cm_query_transition(int *transition, int *run_number, int *trans_time)
#define RPC_RC_TRANSITION
int rpc_test_rpc_test3_cxx()
void rpc_va_arg(va_list *arg_ptr, INT arg_type, void *arg)
static INT rpc_execute_old(INT sock, int xroutine_id, const RPC_LIST &rl, char *buffer, INT convert_flags)
INT rpc_server_loop(void)
INT rpc_clear_allowed_hosts()
std::string rpc_get_mserver_hostname(void)
INT rpc_deregister_functions()
bool rpc_is_connected(void)
INT rpc_set_mserver_path(const char *path)
static std::vector< RPC_LIST > rpc_list
#define RPC_BM_CLOSE_BUFFER
static TLS_POINTER * tls_buffer
#define RPC_BM_SET_CACHE_SIZE
static std::vector< RPC_CLIENT_CONNECTION * > _client_connections
static void rpc_call_encode(va_list &ap, const RPC_LIST &rl, NET_COMMAND **nc)
static RPC_CLIENT_CONNECTION * rpc_get_locked_client_connection(HNDLE hConn)
static std::vector< std::string > gAllowedHosts
INT rpc_call(DWORD routine_id,...)
static std::mutex rpc_list_mutex
const char * rpc_tid_name(INT id)
static void rpc_call_encode_cxx(va_list &ap, const RPC_LIST &rl, NET_COMMAND **nc)
INT rpc_register_client(const char *name, RPC_LIST *list)
static std::vector< RPC_SERVER_ACCEPTION * > _server_acceptions
static INT rpc_socket_check_allowed_host(int sock)
#define RPC_BM_CLOSE_ALL_BUFFERS
INT rpc_server_shutdown(void)
int rpc_flush_event_socket(int timeout_msec)
INT rpc_send_event(INT buffer_handle, const EVENT_HEADER *pevent, int unused, INT async_flag, INT mode)
static TR_FIFO _tr_fifo[10]
static std::string _mserver_path
INT rpc_get_timeout(HNDLE hConn)
INT rpc_server_disconnect()
INT rpc_set_debug(void(*func)(const char *), INT mode)
INT rpc_client_accept(int lsock)
void rpc_vax2ieee_float(float *var)
static int rpc_find_rpc(int routine_id, RPC_LIST *pentry, bool *prpc_cxx)
INT rpc_set_opt_tcp_size(INT tcp_size)
#define RPC_BM_RECEIVE_EVENT_CXX
#define RPC_BM_GET_BUFFER_LEVEL
int rpc_name_tid(const char *name)
INT rpc_register_listener(int port, RPC_HANDLER func, int *plsock, int *pport)
INT rpc_server_receive_event(int idx, RPC_SERVER_ACCEPTION *sa, int timeout_msec)
#define RPC_BM_OPEN_BUFFER
#define RPC_BM_EMPTY_BUFFERS
INT rpc_server_accept(int lsock)
static std::mutex _tr_fifo_mutex
bool rpc_is_mserver(void)
static RPC_SERVER_ACCEPTION * _mserver_acception
void rpc_ieee2vax_float(float *var)
#define RPC_CM_SET_WATCHDOG_PARAMS
INT rpc_send_event1(INT buffer_handle, const EVENT_HEADER *pevent)
#define RPC_BM_RECEIVE_EVENT
static bool _rpc_is_remote
static int rpc_call_decode(va_list &ap, const RPC_LIST &rl, const char *buf, size_t buf_size)
INT rpc_send_event_sg(INT buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[])
INT rpc_set_name(const char *name)
INT rpc_register_function(INT id, INT(*func)(INT, void **))
#define RPC_CM_MSG_RETRIEVE
#define RPC_BM_SKIP_EVENT
static INT rpc_execute_cxx(INT sock, int xroutine_id, const RPC_LIST &rl, char *buffer, INT convert_flags)
INT rpc_client_dispatch(int sock)
#define RPC_BM_INIT_BUFFER_COUNTERS
static RPC_SERVER_CONNECTION _server_connection
INT rpc_check_channels(void)
static INT rpc_transition_dispatch(INT idx, void *prpc_param[])
int rpc_test_rpc_test4_cxx()
static int handle_msg_odb(int n, const NET_COMMAND *nc)
#define RPC_BM_FLUSH_CACHE
void rpc_ieee2vax_double(double *var)
static int rpc_call_decode_cxx(va_list &ap, const RPC_LIST &rl, const char *buf, size_t buf_size)
void rpc_vax2ieee_double(double *var)
#define RPC_CM_GET_WATCHDOG_INFO
static int recv_net_command_realloc(RPC_SERVER_ACCEPTION *sa, char **pbuf, int *pbufsize, INT *remaining)
INT rpc_get_convert_flags(void)
INT rpc_check_allowed_host(const char *hostname)
void rpc_convert_single(void *data, INT tid, INT flags, INT convert_flags)
char exp_name[NAME_LENGTH]
BOOL debug
debug printouts
char host_name[HOST_NAME_LENGTH]
char expt_name[NAME_LENGTH]
char buffer_name[NAME_LENGTH]
static const int tid_size[]
static std::string join(const char *sep, const std::vector< std::string > &v)
static std::vector< TRANS_TABLE > _trans_table
static std::atomic_int _message_mask_system
static int disable_bind_rpc_to_localhost
static std::mutex gBuffersMutex
int(* MessagePrintCallback)(const char *)
std::string cm_transition_name(int transition)
static DBG_MEM_LOC * _mem_loc
static std::vector< BUFFER * > gBuffers
static void(* _debug_print)(const char *)
static std::mutex _trans_table_mutex
static std::string _experiment_name
static std::string _path_name
static std::string _client_name
static std::vector< EventRequest > _request_list
static int _rpc_connect_timeout
static const char * tid_name[]
static const ERROR_TABLE _error_table[]
static INT _watchdog_timeout
INT bm_get_buffer_info(INT buffer_handle, BUFFER_HEADER *buffer_header)
static MUTEX_T * _mutex_rpc
static EVENT_HANDLER * _msg_dispatch
void * dbg_calloc(unsigned int size, unsigned int count, char *file, int line)
static BOOL _rpc_registered
static std::atomic< MessagePrintCallback > _message_print
bool ends_with_char(const std::string &s, char c)
void dbg_free(void *adr, char *file, int line)
static std::atomic_int _message_mask_user
static std::mutex _request_list_mutex
INT bm_get_buffer_level(INT buffer_handle, INT *n_bytes)
static TRANS_TABLE _deferred_trans_table[]
static int _rpc_listen_socket
std::string msprintf(const char *format,...)
void * dbg_malloc(unsigned int size, char *file, int line)
static const char * tid_name_old[]
static std::vector< std::string > split(const char *sep, const std::string &s)
int cm_write_event_to_odb(HNDLE hDB, HNDLE hKey, const EVENT_HEADER *pevent, INT format)
INT bm_init_buffer_counters(INT buffer_handle)
#define DIR_SEPARATOR_STR
#define DEFAULT_WATCHDOG_TIMEOUT
#define DEFAULT_RPC_TIMEOUT
#define PROGRAM_INFO_STR(_name)
#define MIN_WRITE_CACHE_SIZE
#define MAX_WRITE_CACHE_EVENT_SIZE_DIV
#define DEFAULT_MAX_EVENT_SIZE
#define WATCHDOG_INTERVAL
#define MAX_EVENT_REQUESTS
#define BANK_FORMAT_64BIT_ALIGNED
#define BANK_FORMAT_32BIT
std::vector< std::string > STRING_LIST
#define MAX_WRITE_CACHE_SIZE_DIV
#define TRANSITION_ERROR_STRING_LENGTH
#define BANK_FORMAT_VERSION
#define message(type, str)
#define write(n, a, f, d)
static std::string remove(const std::string s, char c)
int gettimeofday(struct timeval *tp, void *tzp)
struct callback_addr callback
EVENT_REQUEST event_request[MAX_EVENT_REQUESTS]
BUFFER_CLIENT client[MAX_CLIENTS]
BUFFER_INFO(BUFFER *pbuf)
int client_count_write_wait[MAX_CLIENTS]
DWORD client_time_write_wait[MAX_CLIENTS]
std::timed_mutex buffer_mutex
std::timed_mutex read_cache_mutex
char client_name[NAME_LENGTH]
std::timed_mutex write_cache_mutex
int client_count_write_wait[MAX_CLIENTS]
BUFFER_HEADER * buffer_header
std::atomic< size_t > read_cache_size
std::atomic< size_t > write_cache_size
char buffer_name[NAME_LENGTH]
std::atomic_bool attached
DWORD client_time_write_wait[MAX_CLIENTS]
EVENT_HANDLER * dispatcher
NET_COMMAND_HEADER header
unsigned int max_event_size
RPC_PARAM param[MAX_RPC_PARAMS]
std::atomic< std::thread * > thread
std::atomic_bool finished
std::vector< int > wait_for_index
std::string waiting_for_client
std::vector< std::unique_ptr< TrClient > > clients
unsigned short host_port1
unsigned short host_port2
unsigned short host_port3
std::vector< exptab_entry > exptab
std::mutex event_sock_mutex
static double fac(double a)
static te_expr * list(state *s)