16#include "git-revision.h"
22#include <sys/resource.h>
62#ifndef DOXYGEN_SHOULD_SKIP_THIS
161extern unsigned _stklen = 60000U;
274 "\"exptab\" file not found and MIDAS_DIR or MIDAS_EXPTAB environment variable is not defined"},
333 f =
fopen(
"mem.txt",
"w");
369 f =
fopen(
"mem.txt",
"w");
381static std::vector<std::string>
split(
const char*
sep,
const std::string& s)
384 std::vector<std::string> v;
385 std::string::size_type pos = 0;
387 std::string::size_type next = s.find(
sep, pos);
388 if (next == std::string::npos) {
389 v.push_back(s.substr(pos));
392 v.push_back(s.substr(pos, next-pos));
398static std::string
join(
const char*
sep,
const std::vector<std::string>& v)
402 for (
unsigned i=0;
i<v.size();
i++) {
416 return s[s.length()-1] ==
c;
421 va_start(
ap, format);
424 char *buffer = (
char *)
malloc(size);
432 std::string s(buffer);
475 return msprintf(
"unlisted status code %d", code);
523 if (pos != std::string::npos) {
535 for (
size_t i = 0;
i <
flist.size();
i++) {
536 const char *p =
flist[
i].c_str();
537 if (
strchr(p,
'_') ==
NULL && !(p[0] >=
'0' && p[0] <=
'9')) {
538 size_t pos =
flist[
i].rfind(
'.');
539 if (pos != std::string::npos) {
561 *filename = std::string(
fac) +
".log";
683 fprintf(
stderr,
"cm_msg_log: Message \"%s\" not written to midas.log because rpc_call(RPC_CM_MSG_LOG) failed with status %d\n",
message,
status);
687 fprintf(
stderr,
"cm_msg_log: Message \"%s\" not written to midas.log, no connection to mserver\n",
message);
710 "cm_msg_log: Error: Cannot symlink message log file \'%s' to \'%s\', symlink() errno: %d (%s)\n",
719 "cm_msg_log: Message \"%s\" not written to midas.log because open(%s) failed with errno %d (%s)\n",
751 }
else if (
wr != len) {
752 fprintf(
stderr,
"cm_msg_log: Message \"%s\" not written to \"%s\", short write() wrote %d instead of %d bytes\n",
message, filename.c_str(), (
int)
wr, (
int)len);
766 const char*
pc = filename +
strlen(filename);
767 while (*
pc !=
'\\' && *
pc !=
'/' &&
pc != filename)
782 if (message_type &
MT_LOG)
794 if (
name.length() > 0)
803 }
else if (message_type ==
MT_USER) {
811 for (
int i=0;
i<10;
i++) {
842 if (message_type !=
MT_LOG) {
884 for (
i = 0;
i < 100;
i++) {
929INT cm_msg(
INT message_type,
const char *filename,
INT line,
const char *routine,
const char *format, ...)
945 if (message_type !=
MT_LOG) {
988 const char *
facility,
const char *routine,
const char *format, ...) {
1000 va_start(
argptr, format);
1095 (*messages)[*
length] =
'\n';
1106 (*messages)[*
length] = 0;
1124 cm_msg(
MERROR,
"cm_msg_retrieve1",
"Cannot open log file \"%s\", errno %d (%s)", filename,
errno,
1141 char *buffer = (
char *)
malloc(size + 1);
1143 if (buffer ==
NULL) {
1144 cm_msg(
MERROR,
"cm_msg_retrieve1",
"Cannot malloc %d bytes to read log file \"%s\", errno %d (%s)", (
int) size,
1153 cm_msg(
MERROR,
"cm_msg_retrieve1",
"Cannot read %d bytes from log file \"%s\", read() returned %d, errno %d (%s)",
1162 p = buffer + size - 1;
1166 while (*p ==
'\n' || *p ==
'\r')
1170 for (
n = 0; !
stop && p > buffer;) {
1174 for (
i = 0; p != buffer && (*p !=
'\n' && *p !=
'\r');
i++)
1178 if (
i >= (
int)
sizeof(
str))
1179 i =
sizeof(
str) - 1;
1200 if (
str[0] >=
'0' &&
str[0] <=
'9') {
1214 for (
i = 0;
i < 12;
i++)
1236 if (t == 0 ||
tstamp == -1 ||
1245 while (*p ==
'\n' || *p ==
'\r')
1513 assert(path[0] != 0);
1560 assert(path !=
NULL);
1606#ifdef LOCAL_ROUTINES
1633 if (
getenv(
"MIDAS_DIR")) {
1638 if (
getenv(
"MIDAS_EXPT_NAME")) {
1642 cm_msg(
MERROR,
"cm_read_exptab",
"Experiments that use MIDAS_DIR must also set MIDAS_EXPT_NAME to the name of the experiment! Using experiment name \"%s\"",
e.name.c_str());
1645 e.directory =
getenv(
"MIDAS_DIR");
1654#if defined (OS_WINNT)
1656 if (
getenv(
"SystemRoot"))
1658 else if (
getenv(
"windir"))
1664 str +=
"\\system32\\exptab";
1665 alt_str +=
"\\system\\exptab";
1666#elif defined (OS_UNIX)
1667 std::string
str =
"/etc/exptab";
1668 std::string
alt_str =
"/exptab";
1670 std::strint
str =
"exptab";
1671 std::string
alt_str =
"exptab";
1675 if (
getenv(
"MIDAS_EXPTAB")) {
1694 memset(buf, 0,
sizeof(buf));
1695 char*
str =
fgets(buf,
sizeof(buf)-1, f);
1698 if (
str[0] == 0)
continue;
1699 if (
str[0] ==
'#')
continue;
1723 e.
name = std::string(
p1, len);
1747 e.directory = std::string(
p1, len);
1768 e.user = std::string(
p1, len);
1782 for (
unsigned j=0;
j<exptab->
exptab.size();
j++) {
1783 cm_msg(
MINFO,
"cm_read_exptab",
"entry %d, experiment \"%s\", directory \"%s\", user \"%s\"",
j, exptab->
exptab[
j].name.c_str(), exptab->
exptab[
j].directory.c_str(), exptab->
exptab[
j].user.c_str());
1887#ifdef LOCAL_ROUTINES
1908 char *client_name,
INT hw_type,
const char *password,
DWORD watchdog_timeout) {
1913#ifdef LOCAL_ROUTINES
1963 strcpy(
name, client_name);
1994 sprintf(
str,
"System/Clients/%0d/Name", pid);
1998 cm_msg(
MERROR,
"cm_set_client_info",
"cannot set client name, db_set_value(%s) status %d",
str,
status);
2003 strcpy(client_name,
name);
2039 size =
sizeof(watchdog_timeout);
2178#ifdef LOCAL_ROUTINES
2202 cm_msg(
MERROR,
"cm_set_experiment_local",
"Experiment \"%s\" directory \"%s\" does not exist",
exp_name1.c_str(),
expdir.c_str());
2217 cm_msg(
MERROR,
"cm_check_connect",
"cm_disconnect_experiment not called at end of program");
2312 const char *client_name,
void (*func)(
char *),
INT odb_size,
DWORD watchdog_timeout) {
2370#ifdef LOCAL_ROUTINES
2384 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create alarm semaphore");
2389 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create elog semaphore");
2394 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create history semaphore");
2399 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create message semaphore");
2418 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot open database, db_open_database() status %d",
status);
2429 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot get ODB /Experiment/ODB timeout, status %d",
status);
2440 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot get ODB /Experiment/Protect ODB, status %d",
status);
2451 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot get ODB /Experiment/Enable core dumps, status %d",
status);
2461 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot setrlimit(RLIMIT_CORE, RLIM_INFINITY), errno %d (%s)",
errno,
2465#warning setrlimit(RLIMIT_CORE) is not available
2474 "cannot get ODB /Experiment/Security/Enable non-localhost RPC, status %d",
status);
2486 if (watchdog_timeout == 0)
2526 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot open message buffer, cm_msg_open_buffer() status %d",
status);
2550 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot register RPC server, cm_register_server() status %d",
status);
2558 size =
sizeof(watchdog_timeout);
2559 sprintf(
str,
"/Programs/%s/Watchdog Timeout", client_name);
2565 std::string path =
"/Programs/" + std::string(client_name);
2568 prog[
"Start command"] == std::string(
""))
2569 prog[
"Start command"].set_string_size(
cmdline, 256);
2592#ifdef LOCAL_ROUTINES
2649 cm_msg(
MERROR,
"cm_list_experiments_remote",
"Cannot connect to \"%s\" port %d: %s",
hname, port,
errmsg.c_str());
2654 send(sock,
"I", 2, 0);
2675#ifdef LOCAL_ROUTINES
2695 if (
expts.size() == 1) {
2697 }
else if (
expts.size() > 1) {
2698 printf(
"Available experiments on local computer:\n");
2700 for (
unsigned i = 0;
i <
expts.size();
i++) {
2705 printf(
"Select number from 0 to %d: ", ((
int)
expts.size())-1);
2744 if (
expts.size() > 1) {
2747 for (
unsigned i = 0;
i <
expts.size();
i++) {
2752 printf(
"Select number from 0 to %d: ", ((
int)
expts.size())-1);
2868 printf(
"Waiting for transition to finish...\n");
2891 cm_msg(
MLOG,
"cm_disconnect_experiment",
"Program %s on host %s stopped", client_name.c_str(),
local_host_name.c_str());
2969#ifndef DOXYGEN_SHOULD_SKIP_THIS
3044#ifndef DOXYGEN_SHOULD_SKIP_THIS
3085#ifdef LOCAL_ROUTINES
3124 printf(
"lock_buffer_guard(invalid) dtor\n");
3260#ifdef LOCAL_ROUTINES
3283 pclient->watchdog_timeout = timeout;
3362#ifdef LOCAL_ROUTINES
3371#ifndef DOXYGEN_SHOULD_SKIP_THIS
3419 "Cannot resize the RPC hosts access control list, db_set_num_values(%d) status %d",
new_size,
status);
3432 strcpy(buf,
"localhost");
3438 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot create the RPC hosts access control list, db_get_value() status %d",
3448 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot create \"Disable RPC hosts check\", db_get_value() status %d",
status);
3458 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot find the RPC hosts access control list, db_find_key() status %d",
3468 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot watch the RPC hosts access control list, db_watch() status %d",
status);
3503 size =
sizeof(
name);
3507 cm_msg(
MERROR,
"cm_register_server",
"cannot get client name, db_get_value() status %d",
status);
3514 size =
sizeof(port);
3518 cm_msg(
MERROR,
"cm_register_server",
"cannot get RPC port number, db_get_value(%s) status %d",
str,
status);
3526 cm_msg(
MERROR,
"cm_register_server",
"error, rpc_register_server(port=%d) status %d", port,
status);
3539 cm_msg(
MERROR,
"cm_register_server",
"error, db_find_key(\"Server Port\") status %d",
status);
3549 cm_msg(
MERROR,
"cm_register_server",
"error, db_set_data(\"Server Port\"=%d) status %d", port,
status);
3657 tt.sequence_number = sequence_number;
3770 }
else if (
count > 1) {
3817 "Cannot set client run state, client hKey %d into /System/Clients is not valid, maybe this client was removed by a watchdog timeout",
3838#ifndef DOXYGEN_SHOULD_SKIP_THIS
3893 cm_msg(
MERROR,
"cm_register_deferred_transition",
"Cannot hotlink /Runinfo/Requested Transition");
3928 cm_msg(
MERROR,
"cm_check_deferred_transition",
"Cannot perform deferred transition: %s",
str);
3944#ifndef DOXYGEN_SHOULD_SKIP_THIS
3999 return arg1->sequence_number <
arg2->sequence_number;
4029 const char *buf =
"Success";
4039 tr->end_time = end_time;
4041 tr->errorstr = errorstr;
4043 tr->errorstr =
"(null)";
4107 const char *
args[100];
4125 path +=
"mtransition";
4175 if (errstr !=
NULL) {
4176 sprintf(errstr,
"Cannot execute mtransition, ss_spawnv() returned %d",
status);
4191 int connect_timeout = 10000;
4192 int timeout = 120000;
4217 for (
size_t i = 0;
i <
tr_client->wait_for_index.size();
i++) {
4220 assert(wait_for_index >= 0);
4221 assert(wait_for_index < (
int)s->
clients.size());
4250 printf(
"Client \"%s\" waits for client \"%s\"\n",
tr_client->client_name.c_str(),
wait_for->client_name.c_str());
4257 cm_msg(
MERROR,
"cm_transition_call",
"Client \"%s\" transition %d aborted while waiting for client \"%s\": \"/Runinfo/Transition in progress\" was cleared",
tr_client->client_name.c_str(),
tr_client->transition,
wait_for->client_name.c_str());
4273 printf(
"Connecting to client \"%s\" on host %s...\n",
tr_client->client_name.c_str(),
tr_client->host_name.c_str());
4275 cm_msg(
MINFO,
"cm_transition_call",
"cm_transition_call: Connecting to client \"%s\" on host %s...",
tr_client->client_name.c_str(),
tr_client->host_name.c_str());
4278 size =
sizeof(timeout);
4281 if (connect_timeout < 1000)
4282 connect_timeout = 1000;
4285 size =
sizeof(timeout);
4295 tr_client->connect_timeout = connect_timeout;
4310 "cannot connect to client \"%s\" on host %s, port %d, status %d",
4334 printf(
"Connection established to client \"%s\" on host %s\n",
tr_client->client_name.c_str(),
tr_client->host_name.c_str());
4337 "cm_transition: Connection established to client \"%s\" on host %s",
4349 printf(
"Executing RPC transition client \"%s\" on host %s...\n",
4353 "cm_transition: Executing RPC transition client \"%s\" on host %s...",
4381 printf(
"RPC transition finished client \"%s\" on host \"%s\" in %d ms with status %d\n",
4385 "cm_transition: RPC transition finished client \"%s\" on host \"%s\" in %d ms with status %d",
4405 printf(
"hconn %d cm_transition_call(%s) finished init %d connect %d end %d rpc %d end %d xxx %d end %d\n",
4452 for (
size_t i = 0;
i <
n;
i++) {
4460 printf(
"Calling local transition callback\n");
4462 cm_msg(
MINFO,
"cm_transition_call_direct",
"cm_transition: Calling local transition callback");
4478 printf(
"Local transition callback finished, status %d\n",
int(
tr_client->status));
4480 cm_msg(
MINFO,
"cm_transition_call_direct",
"cm_transition: Local transition callback finished, status %d",
int(
tr_client->status));
4493 cm_msg(
MERROR,
"cm_transition_call_direct",
"no handler for transition %d with sequence number %d",
tr_client->transition,
tr_client->sequence_number);
4560 errstr_size =
sizeof(
xerrstr);
4576 mstrlcpy(errstr,
"Invalid transition request", errstr_size);
4587 sprintf(errstr,
"Start/Stop transition %d already in progress, please try again later\n",
i);
4588 mstrlcat(errstr,
"or set \"/Runinfo/Transition in progress\" manually to zero.\n", errstr_size);
4590 cm_msg(
MERROR,
"cm_transition",
"another transition is already in progress");
4649 cm_msg(
MERROR,
"cm_transition",
"Run start abort due to alarms: %s",
alarms.c_str());
4650 mstrlcpy(errstr,
"Cannot start run due to alarms: ", errstr_size);
4682 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info required, status %d",
status);
4691 cm_msg(
MERROR,
"cm_transition",
"Run start abort due to program \"%s\" not running",
key.
name);
4707 mstrlcpy(errstr,
"Unknown error", errstr_size);
4709 if (debug_flag == 0) {
4736 if (debug_flag == 1)
4738 if (debug_flag == 2)
4743 cm_msg(
MERROR,
"cm_transition",
"cannot set Runinfo/Run number in database, status %d",
status);
4749 if (debug_flag == 1)
4750 printf(
"Clearing /Runinfo/Requested transition\n");
4751 if (debug_flag == 2)
4752 cm_msg(
MINFO,
"cm_transition",
"cm_transition: Clearing /Runinfo/Requested transition");
4760 cm_msg(
MERROR,
"cm_transition",
"cannot find System/Clients entry in database");
4762 mstrlcpy(errstr,
"Cannot find /System/Clients in ODB", errstr_size);
4771 mstrlcpy(errstr,
"Deferred transition already in progress", errstr_size);
4772 mstrlcat(errstr,
", to cancel, set \"/Runinfo/Requested transition\" to zero", errstr_size);
4788 size =
sizeof(sequence_number);
4797 if (debug_flag == 1)
4798 printf(
"---- Transition %s deferred by client \"%s\" ----\n",
trname.c_str(),
str);
4799 if (debug_flag == 2)
4800 cm_msg(
MINFO,
"cm_transition",
"cm_transition: ---- Transition %s deferred by client \"%s\" ----",
trname.c_str(),
str);
4802 if (debug_flag == 1)
4803 printf(
"Setting /Runinfo/Requested transition\n");
4804 if (debug_flag == 2)
4805 cm_msg(
MINFO,
"cm_transition",
"cm_transition: Setting /Runinfo/Requested transition");
4816 sprintf(errstr,
"Transition %s deferred by client \"%s\"",
trname.c_str(),
str);
4850 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info auto start, status %d",
status);
4856 start_command[0] = 0;
4858 size =
sizeof(start_command);
4861 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info start command, status %d",
status);
4865 if (start_command[0]) {
4866 cm_msg(
MINFO,
"cm_transition",
"Auto Starting program \"%s\", command \"%s\"",
key.
name,
4901 size =
sizeof(
state);
4907 cm_msg(
MERROR,
"cm_transition",
"cannot get Runinfo/State in database");
4914 cm_msg(
MERROR,
"cm_transition",
"cannot set \"Runinfo/Stop Time binary\" in database");
4921 cm_msg(
MERROR,
"cm_transition",
"cannot set \"Runinfo/Stop Time\" in database");
4927 cm_msg(
MERROR,
"cm_transition",
"cannot find System/Clients entry in database");
4929 mstrlcpy(errstr,
"Cannot find /System/Clients in ODB", errstr_size);
4959 if (debug_flag == 1)
4960 printf(
"---- Transition %s started ----\n",
trname.c_str());
4961 if (debug_flag == 2)
4962 cm_msg(
MINFO,
"cm_transition",
"cm_transition: ---- Transition %s started ----",
trname.c_str());
4968 for (
int i = 0,
status = 0;;
i++) {
4985 size =
sizeof(sequence_number);
4994 c->async_flag = async_flag;
4995 c->debug_flag = debug_flag;
4996 c->sequence_number = sequence_number;
5002 size =
sizeof(client_name);
5004 c->client_name = client_name;
5017 size =
sizeof(port);
5027 if (
cc->client_name ==
c->client_name)
5028 if (
cc->host_name ==
c->host_name)
5029 if (
cc->port ==
c->port)
5030 if (
cc->sequence_number ==
c->sequence_number)
5035 s.
clients.push_back(std::unique_ptr<TrClient>(
c));
5038 cm_msg(
MERROR,
"cm_transition",
"transition %s: client \"%s\" is registered with sequence number %d more than once",
trname.c_str(),
c->client_name.c_str(),
c->sequence_number);
5056 for (
size_t i =
idx - 1; ;
i--) {
5058 if (s.
clients[
i]->sequence_number > 0) {
5084 if (debug_flag == 1)
5085 printf(
"\n==== Found client \"%s\" with sequence number %d\n",
5087 if (debug_flag == 2)
5089 "cm_transition: ==== Found client \"%s\" with sequence number %d",
5107 cm_msg(
MERROR,
"cm_transition",
"transition %s aborted: client \"%s\" returned status %d",
trname.c_str(),
5134 cm_msg(
MERROR,
"cm_transition",
"transition %s aborted: \"/Runinfo/Transition in progress\" was cleared",
trname.c_str());
5137 mstrlcpy(errstr,
"Canceled", errstr_size);
5162 if (debug_flag == 1)
5163 printf(
"\n---- Transition %s finished ----\n",
trname.c_str());
5164 if (debug_flag == 2)
5165 cm_msg(
MINFO,
"cm_transition",
"cm_transition: ---- Transition %s finished ----",
trname.c_str());
5180 size =
sizeof(
state);
5183 cm_msg(
MERROR,
"cm_transition",
"cannot set Runinfo/State in database, db_set_value() status %d",
status);
5234 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info auto stop, status %d",
status);
5252 mstrlcpy(errstr,
"Success", errstr_size);
5266 cm_msg(
MERROR,
"cm_transition",
"Could not start a run: cm_transition() status %d, message \'%s\'",
status,
5315 cm_msg(
MERROR,
"cm_transition",
"previous transition did not finish yet");
5330 mstrlcpy(errstr,
"Invalid transition request", errstr_size);
5338 int size =
sizeof(
i);
5342 sprintf(errstr,
"Start/Stop transition %d already in progress, please try again later\n",
i);
5343 mstrlcat(errstr,
"or set \"/Runinfo/Transition in progress\" manually to zero.\n", errstr_size);
5345 cm_msg(
MERROR,
"cm_transition",
"another transition is already in progress");
5404#ifndef DOXYGEN_SHOULD_SKIP_THIS
5465 printf(
"Received 2nd Ctrl-C, hard abort\n");
5468 printf(
"Received Ctrl-C, aborting...\n");
5528 std::string command;
5533 cm_msg(
MERROR,
"cm_exec_script",
"Script ODB \"%s\" of type TID_STRING, db_get_value_string() error %d",
5538 for (
int i = 0;;
i++) {
5554 int size =
subkey.item_size;
5555 char *buf = (
char *)
malloc(size);
5556 assert(buf !=
NULL);
5559 cm_msg(
MERROR,
"cm_exec_script",
"Script ODB \"%s/%s\" of type %d, db_get_data() error %d",
5573 cm_msg(
MERROR,
"cm_exec_script",
"Script ODB \"%s\" has invalid type %d, should be TID_STRING or TID_KEY",
5580 if (command.length() > 0) {
5774 mstrlcpy(buf, command,
sizeof(buf));
5775 cm_msg(
MERROR,
"cm_execute",
"cm_execute(%s...) is disabled by ODB \"/Experiment/Enable cm_execute\"", buf);
5780 strcpy(
str, command);
5790 result[
MAX(0,
n)] = 0;
5809#ifndef DOXYGEN_SHOULD_SKIP_THIS
5875 p +=
"/Logger/History/";
5877 p +=
"/History dir";
5938#ifdef LOCAL_ROUTINES
5955 }
else if (
idx >
pbuf->buffer_header->max_client_index) {
5974 printf(
"bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, badindex %d, pid=%d\n",
5975 pbuf,
pbuf->buffer_header->name,
pbuf->client_index,
pbuf->buffer_header->max_client_index,
5978 printf(
"bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, badclient %d\n",
5979 pbuf,
pbuf->buffer_header->name,
pbuf->client_index,
pbuf->buffer_header->max_client_index,
5980 pbuf->buffer_header->client[
idx].name,
pbuf->buffer_header->client[
idx].pid,
5983 printf(
"bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, goodclient\n",
5984 pbuf,
pbuf->buffer_header->name,
pbuf->client_index,
pbuf->buffer_header->max_client_index,
5985 pbuf->buffer_header->client[
idx].name,
pbuf->buffer_header->client[
idx].pid,
5997 cm_msg(
MERROR,
"bm_validate_client_index",
"My client index %d in buffer \'%s\' is invalid, max_client_index %d, my pid %d",
idx,
pbuf->buffer_header->name,
pbuf->buffer_header->max_client_index,
ss_getpid());
5999 cm_msg(
MERROR,
"bm_validate_client_index",
"My client index %d in buffer \'%s\' is invalid: client name \'%s\', pid %d should be my pid %d",
idx,
pbuf->buffer_header->name,
pbuf->buffer_header->client[
idx].name,
pbuf->buffer_header->client[
idx].pid,
ss_getpid());
6002 cm_msg(
MERROR,
"bm_validate_client_index",
"Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...");
6006 fprintf(
stderr,
"bm_validate_client_index: My client index %d in buffer \'%s\' is invalid, max_client_index %d, my pid %d\n",
idx,
pbuf->buffer_header->name,
pbuf->buffer_header->max_client_index,
ss_getpid());
6008 fprintf(
stderr,
"bm_validate_client_index: My client index %d in buffer \'%s\' is invalid: client name \'%s\', pid %d should be my pid %d\n",
idx,
pbuf->buffer_header->name,
pbuf->buffer_header->client[
idx].name,
pbuf->buffer_header->client[
idx].pid,
ss_getpid());
6011 fprintf(
stderr,
"bm_validate_client_index: Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...\n");
6051#ifdef LOCAL_ROUTINES
6093 pheader =
pbuf->buffer_header;
6101 "Client \'%s\' on buffer \'%s\' removed by %s because process pid %d does not exist",
pbclient->name,
6113 printf(
"buffer [%s] client [%-32s] times 0x%08x 0x%08x, diff 0x%08x %5d, timeout %d\n",
6125 cm_msg(
MINFO,
"bm_cleanup",
"Client \'%s\' on buffer \'%s\' removed by %s (idle %1.1lfs, timeout %1.0lfs)",
6128 pbclient->watchdog_timeout / 1000.0);
6151 if (
pbuf->attached) {
6176#ifdef LOCAL_ROUTINES
6190 if (
pbuf->attached) {
6209#ifdef LOCAL_ROUTINES
6214 "error: buffer \"%s\" is corrupted: rp %d is invalid. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6227 "error: buffer \"%s\" is corrupted: rp %d plus event header point beyond the end of buffer by %d bytes. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6251 if (
gRpLog && (total_size < 16)) {
6252 const char *
pdata = (
const char *) (pheader + 1);
6254 fprintf(
gRpLog,
"%s: rp %d, total_size %d, at rp 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x\n", pheader->
name, rp, total_size,
6255 pevent[0], pevent[1], pevent[2], pevent[3], pevent[4], pevent[5]);
6261 assert(total_size > 0);
6265 if (rp >= pheader->
size) {
6266 rp -= pheader->
size;
6285 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6310 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, remaining %d, called from %s",
6331 const char *
pdata = (
const char *) (pheader + 1);
6341 "buffer \"%s\" is corrupted: invalid read pointer %d. Size %d, write pointer %d", pheader->
name,
6348 "buffer \"%s\" is corrupted: invalid write pointer %d. Size %d, read pointer %d", pheader->
name,
6354 cm_msg(
MERROR,
"bm_validate_buffer",
"buffer \"%s\" is corrupted: read pointer %d is invalid", pheader->
name,
6363 cm_msg(
MERROR,
"bm_validate_buffer",
"buffer \"%s\" is corrupted: invalid rp %d, last good event at rp %d",
6371 "buffer \"%s\" is corrupted: invalid event at rp %d, last good event at rp %d", pheader->
name, rp,
rp0);
6394 int rp =
c->read_pointer;
6401 "buffer \"%s\" is corrupted for client \"%s\" rp %d: invalid event at rp %d, last good event at rp %d",
6402 pheader->
name,
c->name,
c->read_pointer, rp,
rp0);
6435 sprintf(
str,
"/System/buffers/%s/Clients/%s/writes_blocked_by",
pbuf->buffer_name,
pbuf->client_name);
6510 double buf_size = pheader->
size;
6586 if (!
pbuf->client_count_write_wait[
i])
6618 if (
pbuf->count_lock ==
pbuf->last_count_lock) {
6624 std::string client_name =
pbuf->client_name;
6629 cm_msg(
MERROR,
"bm_write_buffer_statistics_to_odb",
"Invalid empty buffer name \"%s\" or client name \"%s\"",
buffer_name.c_str(), client_name.c_str());
6633 pbuf->last_count_lock =
pbuf->count_lock;
6637 int client_index =
pbuf->client_index;
6662 cm_msg(
MERROR,
who,
"invalid buffer handle %d: out of range [1..%d]", buffer_handle, (
int)
nbuf);
6670 cm_msg(
MERROR,
who,
"invalid buffer handle %d: empty slot", buffer_handle);
6676 if (!
pbuf->attached) {
6678 cm_msg(
MERROR,
who,
"invalid buffer handle %d: not attached", buffer_handle);
6754 int size =
sizeof(
INT);
6758 cm_msg(
MERROR,
"bm_open_buffer",
"Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6765#ifdef LOCAL_ROUTINES
6775 cm_msg(
MERROR,
"bm_open_buffer",
"cannot open buffer with zero name");
6792 std::string odb_path;
6793 odb_path +=
"/Experiment/Buffer sizes/";
6796 int size =
sizeof(
INT);
6801 "Cannot open buffer \"%s\", invalid buffer size %d in ODB \"%s\", maximum buffer size is %d",
6812 cm_msg(
MERROR,
"bm_open_buffer",
"Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6822 *buffer_handle =
i + 1;
6842 *buffer_handle =
i + 1;
6857 pbuf->client_time_write_wait[
i] = 0;
6873 mstrlcpy(
pbuf->client_name, client_name.c_str(),
sizeof(
pbuf->client_name));
6880 pbuf->attached =
true;
6915 pheader->
size = buffer_size;
6925 "Buffer \"%s\" is corrupted, mismatch of buffer name in shared memory \"%s\"",
buffer_name,
6936 cm_msg(
MERROR,
"bm_open_buffer",
"Buffer \"%s\" is corrupted, num_clients %d exceeds MAX_CLIENTS %d",
6947 cm_msg(
MERROR,
"bm_open_buffer",
"Buffer \"%s\" is corrupted, max_client_index %d exceeds MAX_CLIENTS %d",
6955 if (pheader->
size != buffer_size) {
6956 cm_msg(
MINFO,
"bm_open_buffer",
"Buffer \"%s\" requested size %d differs from existing size %d",
6959 buffer_size = pheader->
size;
6974 pheader =
pbuf->buffer_header;
6980 pbuf->attached =
true;
6982 pbuf->shm_handle = shm_handle;
6983 pbuf->shm_size = shm_size;
6991 "buffer \'%s\' is corrupted, bm_validate_buffer() status %d, calling bm_reset_buffer()...",
buffer_name,
7061 *buffer_handle =
i+1;
7066 *buffer_handle =
gBuffers.size() + 1;
7103 *buffer_handle =
i + 1;
7124#ifdef LOCAL_ROUTINES
7167 pbuf->read_cache_mutex.unlock();
7174 pbuf->write_cache_mutex.unlock();
7175 pbuf->read_cache_mutex.unlock();
7182 pbuf->attached =
false;
7209 if (
pbuf->read_cache_size > 0) {
7210 free(
pbuf->read_cache);
7212 pbuf->read_cache_size = 0;
7213 pbuf->read_cache_rp = 0;
7214 pbuf->read_cache_wp = 0;
7217 if (
pbuf->write_cache_size > 0) {
7218 free(
pbuf->write_cache);
7220 pbuf->write_cache_size = 0;
7221 pbuf->write_cache_rp = 0;
7222 pbuf->write_cache_wp = 0;
7242 pbuf->shm_handle = 0;
7248 pbuf->write_cache_mutex.unlock();
7249 pbuf->read_cache_mutex.unlock();
7269#ifdef LOCAL_ROUTINES
7277 for (
size_t i =
nbuf;
i > 0;
i--) {
7303#ifdef LOCAL_ROUTINES
7341#ifdef LOCAL_ROUTINES
7360 for (
i = 0;
i < 20;
i++) {
7382#ifdef LOCAL_ROUTINES
7397#ifdef LOCAL_ROUTINES
7454 size =
sizeof(client_name);
7466 size =
sizeof(port);
7481 cm_msg(
MERROR,
"cm_shutdown",
"Cannot connect to client \'%s\' on host \'%s\', port %d",
7484 cm_msg(
MERROR,
"cm_shutdown",
"Killing and Deleting client \'%s\' pid %d", client_name,
7490 cm_msg(
MERROR,
"cm_shutdown",
"Cannot delete client info for client \'%s\', pid %d, status %d",
7507 cm_msg(
MERROR,
"cm_shutdown",
"Client \'%s\' not responding to shutdown command", client_name);
7509 cm_msg(
MERROR,
"cm_shutdown",
"Killing and Deleting client \'%s\' pid %d", client_name,
7515 "Cannot delete client info for client \'%s\', pid %d, status %d",
name,
client_pid,
7569 size =
sizeof(client_name);
7636#ifdef LOCAL_ROUTINES
7651 if (
pbuf->attached) {
7668 (client_name ==
NULL || client_name[0] == 0
7683 "Client \'%s\' on \'%s\' removed by cm_cleanup (idle %1.1lfs, timeout %1.0lfs)",
7733 const char *s =
str;
7766 printf(
"test_expand_env: [%s] -> [%s] expected [%s]",
7780 printf(
"Test expand_end()\n");
7799 printf(
"test_expand_env: all tests passed!\n");
7801 printf(
"test_expand_env: test FAILED!\n");
7810#ifndef DOXYGEN_SHOULD_SKIP_THIS
7839#ifdef LOCAL_ROUTINES
7883#ifdef LOCAL_ROUTINES
7903 *n_bytes += pheader->
size;
7907 if (
pbuf->read_cache_size) {
7911 if (
pbuf->read_cache_wp >
pbuf->read_cache_rp)
7912 *n_bytes +=
pbuf->read_cache_wp -
pbuf->read_cache_rp;
7913 pbuf->read_cache_mutex.unlock();
7923#ifdef LOCAL_ROUTINES
7931 fprintf(
stderr,
"bm_lock_buffer_read_cache: Error: Cannot lock read cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...\n",
pbuf->buffer_name);
7932 cm_msg(
MERROR,
"bm_lock_buffer_read_cache",
"Cannot lock read cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...",
pbuf->buffer_name);
7937 if (!
pbuf->attached) {
7938 pbuf->read_cache_mutex.unlock();
7939 fprintf(
stderr,
"bm_lock_buffer_read_cache: Error: Cannot lock read cache of buffer \"%s\", buffer was closed while we waited for the buffer_mutex\n",
pbuf->buffer_name);
7952 fprintf(
stderr,
"bm_lock_buffer_write_cache: Error: Cannot lock write cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...\n",
pbuf->buffer_name);
7953 cm_msg(
MERROR,
"bm_lock_buffer_write_cache",
"Cannot lock write cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...",
pbuf->buffer_name);
7958 if (!
pbuf->attached) {
7959 pbuf->write_cache_mutex.unlock();
7960 fprintf(
stderr,
"bm_lock_buffer_write_cache: Error: Cannot lock write cache of buffer \"%s\", buffer was closed while we waited for the buffer_mutex\n",
pbuf->buffer_name);
7975 fprintf(
stderr,
"bm_lock_buffer_mutex: Error: Cannot lock buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...\n",
pbuf->buffer_name);
7976 cm_msg(
MERROR,
"bm_lock_buffer_mutex",
"Cannot lock buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...",
pbuf->buffer_name);
7981 if (!
pbuf->attached) {
7982 pbuf->buffer_mutex.unlock();
7983 fprintf(
stderr,
"bm_lock_buffer_mutex: Error: Cannot lock buffer \"%s\", buffer was closed while we waited for the buffer_mutex\n",
pbuf->buffer_name);
8016 fprintf(
stderr,
"bm_lock_buffer: Lock buffer \"%s\" is taking longer than 1 second!\n",
pbuf->buffer_name);
8021 fprintf(
stderr,
"bm_lock_buffer: Lock buffer \"%s\" is taking longer than 10 seconds, buffer semaphore is probably stuck, delete %s.SHM and try again!\n",
pbuf->buffer_name,
pbuf->buffer_name);
8023 if (
pbuf->buffer_header) {
8025 fprintf(
stderr,
"bm_lock_buffer: Buffer \"%s\" client %d \"%s\" pid %d\n",
pbuf->buffer_name,
i,
pbuf->buffer_header->client[
i].name,
pbuf->buffer_header->client[
i].pid);
8032 fprintf(
stderr,
"bm_lock_buffer: Error: Cannot lock buffer \"%s\", ss_semaphore_wait_for() status %d, aborting...\n",
pbuf->buffer_name,
status);
8033 cm_msg(
MERROR,
"bm_lock_buffer",
"Cannot lock buffer \"%s\", ss_semaphore_wait_for() status %d, aborting...",
pbuf->buffer_name,
status);
8041 assert(!
pbuf->locked);
8046 if (
pbuf->buffer_header->client[x].unused1 != 0) {
8047 printf(
"lllock [%s] unused1 %d pid %d\n",
pbuf->buffer_name,
pbuf->buffer_header->client[x].unused1,
getpid());
8050 pbuf->buffer_header->client[x].unused1 =
getpid();
8064 if (
pbuf->attached) {
8065 if (
pbuf->buffer_header->client[x].unused1 !=
getpid()) {
8066 printf(
"unlock [%s] unused1 %d pid %d\n",
pbuf->buffer_header->name,
pbuf->buffer_header->client[x].unused1,
getpid());
8068 pbuf->buffer_header->client[x].unused1 = 0;
8070 printf(
"unlock [??????] unused1 ????? pid %d\n",
getpid());
8075 assert(
pbuf->locked);
8079 pbuf->buffer_mutex.unlock();
8108#ifdef LOCAL_ROUTINES
8122 pbuf->buffer_header->num_in_events = 0;
8123 pbuf->buffer_header->num_out_events = 0;
8168#ifdef LOCAL_ROUTINES
8198 cm_msg(
MERROR,
"bm_set_cache_size",
"requested write cache size %zu on buffer \"%s\" is too big: buffer size is %d, write cache size will be %zu bytes",
write_size,
pbuf->buffer_name,
pbuf->buffer_header->size,
new_write_size);
8202 pbuf->buffer_mutex.unlock();
8212 if (
pbuf->read_cache_size > 0) {
8213 free(
pbuf->read_cache);
8220 pbuf->read_cache_size = 0;
8221 pbuf->read_cache_rp = 0;
8222 pbuf->read_cache_wp = 0;
8223 pbuf->read_cache_mutex.unlock();
8224 cm_msg(
MERROR,
"bm_set_cache_size",
"not enough memory to allocate read cache for buffer \"%s\", malloc(%zu) failed",
pbuf->buffer_name,
read_size);
8230 pbuf->read_cache_rp = 0;
8231 pbuf->read_cache_wp = 0;
8233 pbuf->read_cache_mutex.unlock();
8243 if (
pbuf->write_cache_size &&
pbuf->write_cache_wp > 0) {
8244 cm_msg(
MERROR,
"bm_set_cache_size",
"buffer \"%s\" lost %zu bytes from the write cache",
pbuf->buffer_name,
pbuf->write_cache_wp);
8248 if (
pbuf->write_cache_size > 0) {
8249 free(
pbuf->write_cache);
8256 pbuf->write_cache_size = 0;
8257 pbuf->write_cache_rp = 0;
8258 pbuf->write_cache_wp = 0;
8259 pbuf->write_cache_mutex.unlock();
8260 cm_msg(
MERROR,
"bm_set_cache_size",
"not enough memory to allocate write cache for buffer \"%s\", malloc(%zu) failed",
pbuf->buffer_name,
write_size);
8266 pbuf->write_cache_rp = 0;
8267 pbuf->write_cache_wp = 0;
8269 pbuf->write_cache_mutex.unlock();
8316 static std::mutex mutex;
8323 std::lock_guard<std::mutex>
lock(mutex);
8333#ifndef DOXYGEN_SHOULD_SKIP_THIS
8385#ifdef LOCAL_ROUTINES
8401 if (func ==
NULL &&
pbuf->callback) {
8403 cm_msg(
MERROR,
"bm_add_event_request",
"mixing callback/non callback requests not possible");
8410 cm_msg(
MERROR,
"bm_add_event_request",
"GET_RECENT request not possible if read cache is enabled");
8420 if (!
pclient->event_request[
i].valid)
8429 pclient->event_request[
i].id = request_id;
8433 pclient->event_request[
i].sampling_type = sampling_type;
8448 if (
i + 1 >
pclient->max_request_index)
8449 pclient->max_request_index =
i + 1;
8489 INT sampling_type,
HNDLE *request_id,
8492 assert(request_id !=
NULL);
8544#ifdef LOCAL_ROUTINES
8566 if (
pclient->event_request[
i].valid &&
pclient->event_request[
i].id == request_id) {
8573 if (
pclient->event_request[
i].valid)
8576 pclient->max_request_index =
i + 1;
8581 for (
i = 0;
i <
pclient->max_request_index;
i++)
8610 if (request_id < 0 ||
size_t(request_id) >=
_request_list.size()) {
8615 int buffer_handle =
_request_list[request_id].buffer_handle;
8652 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8661 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, read pointer %d, size %d",
8670 if (
pclient->read_pointer < 0) {
8672 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8681 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8690 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8755 printf(
"bm_update_read_pointer: [%s] rp %d, wp %d, size %d, min_rp %d, client [%s] rp %d\n",
8796 printf(
"bm_update_read_pointer: [%s] rp %d, wp %d, size %d, new_rp %d, moved\n",
8813 for (
i = 0;
i <
pc->max_request_index;
i++)
8814 if (
pc->event_request[
i].valid)
8834 if (
pc->pid &&
pc->write_wait) {
8851 for (
size_t i = 0;
i <
n;
i++) {
8868 r.
dispatcher(buffer_handle,
i, pevent, (
void *) (pevent + 1));
8875#ifdef LOCAL_ROUTINES
8879 pbuf->read_cache_rp += total_size;
8881 if (
pbuf->read_cache_rp ==
pbuf->read_cache_wp) {
8882 pbuf->read_cache_rp = 0;
8883 pbuf->read_cache_wp = 0;
8889 if (
pbuf->read_cache_rp ==
pbuf->read_cache_wp)
8917 if (!
pc->read_wait) {
8924 if (
pc->read_wait) {
8929 if ((
pc->read_pointer < 0) || (
pc->read_pointer >= pheader->
size)) {
8930 cm_msg(
MERROR,
"bm_peek_buffer_locked",
"event buffer \"%s\" is corrupted: client \"%s\" read pointer %d is invalid. buffer read pointer %d, write pointer %d, size %d", pheader->
name,
pc->name,
pc->read_pointer, pheader->
read_pointer, pheader->
write_pointer, pheader->
size);
8934 char *
pdata = (
char *) (pheader + 1);
8940 if ((total_size <= 0) || (total_size > pheader->
size)) {
8941 cm_msg(
MERROR,
"bm_peek_buffer_locked",
"event buffer \"%s\" is corrupted: client \"%s\" read pointer %d points to invalid event: data_size %d, event_size %d, total_size %d. buffer size: %d, read_pointer: %d, write_pointer: %d", pheader->
name,
pc->name,
pc->read_pointer, pevent->
data_size,
event_size, total_size, pheader->
size, pheader->
read_pointer, pheader->
write_pointer);
8945 assert(total_size > 0);
8960 const char *
pdata = (
const char *) (pheader + 1);
8967 int size = pheader->
size - rp;
8975 const char *
pdata = (
const char *) (pheader + 1);
8982 int size = pheader->
size - rp;
8992 for (
i = 0;
i <
pc->max_request_index;
i++) {
9038 if (
pbuf->read_cache_rp ==
pbuf->read_cache_wp) {
9064 if (
pbuf->read_cache_wp + total_size >
pbuf->read_cache_size) {
9073 pbuf->read_cache_wp += total_size;
9093 if (convert_flags) {
9115 char *
pdata = (
char *) (pheader + 1);
9142 free += pheader->
size;
9152 if (
pbuf->wait_start_time != 0) {
9156 pbuf->wait_start_time = 0;
9174 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer is invalid",
9189 printf(
"bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, blocking event size %d/%d\n", pheader->
read_pointer, pheader->
write_pointer, free, pheader->
size,
requested_space,
event_size, total_size);
9194 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer points to an invalid event: data_size %d, event size %d, total_size %d",
9222 for (
j = 0;
j <
pc->max_request_index;
j++) {
9265 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer did not move as expected",
9285 if (
pbuf->wait_start_time == 0) {
9287 pbuf->count_write_wait++;
9327 pbuf->write_cache_mutex.unlock();
9386 pbuf->write_cache_mutex.unlock();
9428 if (!
pc->read_wait) {
9452 if (!
pc->read_wait) {
9466 pbuf->read_cache_mutex.unlock();
9499 pbuf->read_cache_mutex.unlock();
9518 if (
pc->read_wait) {
9528 char *
pdata = (
char *) (pheader + 1);
9614 for (
j = 0;
j <
pc->max_request_index;
j++) {
9626 if (request_id >= 0) {
9628 if (
pc->read_wait) {
9705 if (data_size == 0) {
9706 cm_msg(
MERROR,
"bm_send_event",
"invalid event data size zero");
9729 const char*
cptr =
event.data();
9730 size_t clen =
event.size();
9736 int sg_n =
event.size();
9746#ifdef LOCAL_ROUTINES
9811 cm_msg(
MERROR,
"bm_send_event",
"invalid sg_ptr[0] is NULL");
9825 if (data_size == 0) {
9826 cm_msg(
MERROR,
"bm_send_event",
"invalid event data size zero");
9843 cm_msg(
MERROR,
"bm_send_event",
"data size mismatch: event data_size %d, event_size %d not same as sum of sg_len %d", (
int)data_size, (
int)
event_size, (
int)
count);
9849#ifdef LOCAL_ROUTINES
9863 if (
pbuf->write_cache_size) {
9870 if (
pbuf->write_cache_size) {
9877 if (
pbuf->write_cache_wp > 0 && (
pbuf->write_cache_wp + total_size >
pbuf->write_cache_size ||
too_big)) {
9883 pbuf->write_cache_mutex.unlock();
9895 pbuf->write_cache_mutex.unlock();
9898 cm_msg(
MERROR,
"bm_send_event",
"write cache size is bigger than buffer size");
9903 assert(
pbuf->write_cache_wp == 0);
9917 pbuf->write_cache_wp += total_size;
9919 pbuf->write_cache_mutex.unlock();
9925 pbuf->write_cache_mutex.unlock();
9943 printf(
"bm_send_event: corrupted 111!\n");
9949 if (total_size >= (
size_t)pheader->
size) {
9951 cm_msg(
MERROR,
"bm_send_event",
"total event size (%d) larger than size (%d) of buffer \'%s\'", (
int)total_size, pheader->
size, pheader->
name);
9965 printf(
"bm_send_event: corrupted 222!\n");
9994 printf(
"bm_send_event: corrupted 333!\n");
10001 pbuf->count_sent += 1;
10002 pbuf->bytes_sent += total_size;
10084#ifdef LOCAL_ROUTINES
10103 request_id[
i] = -1;
10128 printf(
"bm_flush_cache: corrupted 111!\n");
10151 if (
pbuf->write_cache_wp == 0) {
10157 while (
pbuf->write_cache_rp <
pbuf->write_cache_wp) {
10165 printf(
"bm_flush_cache: cache size %d, wp %d, rp %d, event data_size %d, event_size %d, total_size %d, free %d, written %d\n",
10166 int(
pbuf->write_cache_size),
10167 int(
pbuf->write_cache_wp),
10168 int(
pbuf->write_cache_rp),
10178 assert(total_size <= (
size_t)pheader->
size);
10184 pbuf->count_sent += 1;
10185 pbuf->bytes_sent += total_size;
10204 pbuf->write_cache_rp += total_size;
10207 assert(
pbuf->write_cache_rp > 0);
10208 assert(
pbuf->write_cache_rp <=
pbuf->write_cache_size);
10209 assert(
pbuf->write_cache_rp <=
pbuf->write_cache_wp);
10213 assert(
pbuf->write_cache_wp ==
pbuf->write_cache_rp);
10214 pbuf->write_cache_wp = 0;
10215 pbuf->write_cache_rp = 0;
10235#ifdef LOCAL_ROUTINES
10246 if (
pbuf->write_cache_size == 0)
10255 if (
pbuf->write_cache_wp == 0) {
10256 pbuf->write_cache_mutex.unlock();
10275 pbuf->write_cache_mutex.unlock();
10284#ifdef LOCAL_ROUTINES
10302 if (
pbuf->read_cache_size > 0) {
10309 if (
pbuf->read_cache_wp == 0) {
10314 pbuf->read_cache_mutex.unlock();
10325 pbuf->read_cache_mutex.unlock();
10354 if (convert_flags) {
10363 char*
cptr = (
char*)pevent;
10367 pbuf->read_cache_mutex.unlock();
10377 pbuf->read_cache_mutex.unlock();
10429 "buffer size %d is smaller than event size %d, event truncated. buffer \"%s\"",
max_size,
10441 if (convert_flags) {
10445 pbuf->count_read++;
10447 }
else if (dispatch ||
bufptr) {
10451 pbuf->count_read++;
10455 pbuf->count_read++;
10525 assert(!
"incorrect call to bm_receivent_event_rpc()");
10591 assert(!
"incorrect call to bm_receivent_event_rpc()");
10607 assert(!
"incorrect call to bm_receivent_event_rpc()");
10677#ifdef LOCAL_ROUTINES
10757#ifdef LOCAL_ROUTINES
10835#ifdef LOCAL_ROUTINES
10853#ifdef LOCAL_ROUTINES
10858 if (
pbuf->read_cache_size > 0) {
10865 pbuf->read_cache_rp = 0;
10866 pbuf->read_cache_wp = 0;
10868 pbuf->read_cache_mutex.unlock();
10900#ifdef LOCAL_ROUTINES
10916#ifdef LOCAL_ROUTINES
10928 if (!
pbuf->callback)
10977#ifdef LOCAL_ROUTINES
11008 if (
pbuf->attached) {
11117 if (!
fbuf->callback)
11134 if (convert_flags) {
11169 std::vector<char>
vec;
11173 bool locked =
true;
11175 for (
size_t i = 0;
i <
n;
i++) {
11208 cm_msg(
MERROR,
"bm_poll_event",
"received event was truncated, buffer size %d is too small, see messages and increase /Experiment/MAX_EVENT_SIZE in ODB", (
int)
vec.size());
11266#ifdef LOCAL_ROUTINES
11278 if (!
pbuf->attached)
11292#ifndef DOXYGEN_SHOULD_SKIP_THIS
11294#define MAX_DEFRAG_EVENTS 10
11350 "Received new event with ID %d while old fragments were not completed",
11351 (pevent->event_id & 0x0FFF));
11361 "Not enough defragment buffers, please increase MAX_DEFRAG_EVENTS and recompile");
11368 "Received first event fragment with %d bytes instead of %d bytes, event ignored",
11381 cm_msg(
MERROR,
"bm_defragement_event",
"Not enough memory to allocate event defragment buffer");
11403 "Received fragment without first fragment (ID %d) Ser#:%d",
11414 "Received fragments with more data (%d) than event size (%d)",
11467 printf(
"index %d, client \"%s\", host \"%s\", port %d, socket %d, connected %d, timeout %d",
11606 *convert_flags = 0;
11634 unsigned short int lo,
hi;
11637 lo = *((
short int *) (
var) + 1);
11638 hi = *((
short int *) (
var));
11644 *((
short int *) (
var) + 1) =
hi;
11645 *((
short int *) (
var)) =
lo;
11649 unsigned short int lo,
hi;
11652 lo = *((
short int *) (
var) + 1);
11653 hi = *((
short int *) (
var));
11659 *((
short int *) (
var) + 1) =
hi;
11660 *((
short int *) (
var)) =
lo;
11665 unsigned short int i1,
i2,
i3,
i4;
11668 i1 = *((
short int *) (
var) + 3);
11669 i2 = *((
short int *) (
var) + 2);
11670 i3 = *((
short int *) (
var) + 1);
11671 i4 = *((
short int *) (
var));
11677 *((
short int *) (
var) + 3) =
i4;
11678 *((
short int *) (
var) + 2) =
i3;
11679 *((
short int *) (
var) + 1) =
i2;
11680 *((
short int *) (
var)) =
i1;
11684 unsigned short int i1,
i2,
i3,
i4;
11687 i1 = *((
short int *) (
var) + 3);
11688 i2 = *((
short int *) (
var) + 2);
11689 i3 = *((
short int *) (
var) + 1);
11690 i4 = *((
short int *) (
var));
11696 *((
short int *) (
var) + 3) =
i4;
11697 *((
short int *) (
var) + 2) =
i3;
11698 *((
short int *) (
var) + 1) =
i2;
11699 *((
short int *) (
var)) =
i1;
11766 for (
int i = 0;
i <
n;
i++) {
11790 return "<unknown>";
11797 return "<unknown>";
11856 cm_msg(
MERROR,
"rpc_register_functions",
"registered RPC function with invalid ID %d",
new_list[
i].
id);
11876 if (
e.dispatch ==
NULL) {
11889#ifndef DOXYGEN_SHOULD_SKIP_THIS
11977 char net_buffer[256];
11979 int n =
recv_tcp(sock, net_buffer,
sizeof(net_buffer), 0);
12003 timeout.tv_sec = 0;
12004 timeout.tv_usec = 0;
12009 n =
recv_tcp(sock, net_buffer,
sizeof(net_buffer), 0);
12059 bool debug =
false;
12063 cm_msg(
MERROR,
"rpc_client_connect",
"cm_connect_experiment/rpc_set_name not called");
12069 cm_msg(
MERROR,
"rpc_client_connect",
"invalid port %d", port);
12081 printf(
"rpc_client_connect: host \"%s\", port %d, client \"%s\"\n",
host_name, port, client_name);
12084 printf(
"client connection %d: ", (
int)
i);
12102 if (
c &&
c->connected) {
12109 if ((
c->host_name ==
host_name) && (
c->port == port)) {
12115 std::lock_guard<std::mutex>
cguard(
c->mutex);
12117 if (
c->connected) {
12124 printf(
"already connected: ");
12151 for (
int j = 1;
j < size;
j++) {
12175 printf(
"new connection appended to array: ");
12182 c->connected =
true;
12208 c->client_name = client_name;
12223 int size =
cstr.length() + 1;
12224 i =
send(
c->send_sock,
cstr.c_str(), size, 0);
12225 if (
i < 0 ||
i != size) {
12233 DWORD watchdog_timeout;
12253 cm_msg(
MERROR,
"rpc_client_connect",
"timeout waiting for server reply");
12259 int remote_hw_type = 0;
12264 c->remote_hw_type = remote_hw_type;
12282 c->connected =
true;
12312 if (
c &&
c->connected) {
12313 std::lock_guard<std::mutex>
cguard(
c->mutex);
12315 if (!
c->connected) {
12331 timeout.tv_sec = 0;
12332 timeout.tv_usec = 0;
12363 "RPC client connection to \"%s\" on host \"%s\" is broken, recv() errno %d (%s)",
12364 c->client_name.c_str(),
12365 c->host_name.c_str(),
12369 }
else if (
status == 0) {
12374 cm_msg(
MINFO,
"rpc_client_check",
"RPC client connection to \"%s\" on host \"%s\" unexpectedly closed",
c->client_name.c_str(),
c->host_name.c_str());
12434 char str[200], version[32],
v1[32];
12459 cm_msg(
MERROR,
"rpc_server_connect",
"cm_connect_experiment/rpc_set_name not called");
12485 cm_msg(
MERROR,
"rpc_server_connect",
"cannot create listener socket: %s",
errmsg.c_str());
12492 cm_msg(
MERROR,
"rpc_server_connect",
"cannot create listener socket: %s",
errmsg.c_str());
12499 cm_msg(
MERROR,
"rpc_server_connect",
"cannot create listener socket: %s",
errmsg.c_str());
12516 cm_msg(
MERROR,
"rpc_server_connect",
"cannot connect to mserver on host \"%s\" port %d: %s",
str, port,
errmsg.c_str());
12530 cm_msg(
MERROR,
"rpc_server_connect",
"timeout on receive status from server");
12534 status = version[0] = 0;
12543 strcpy(
v1, version);
12554 cm_msg(
MERROR,
"rpc_server_connect",
"remote MIDAS version \'%s\' differs from local version \'%s\'", version,
12565 timeout.tv_usec = 0;
12577 cm_msg(
MERROR,
"rpc_server_connect",
"mserver subprocess could not be started (check path)");
12589 cm_msg(
MERROR,
"rpc_server_connect",
"accept() failed");
12603 flag = 2 * 1024 * 1024;
12618 cm_msg(
MERROR,
"rpc_server_connect",
"timeout on receive remote computer info");
12639 if (
c &&
c->connected) {
12642 if (!
c->connected) {
12662 if (
c &&
c->connected) {
12679 if (!
c->connected) {
12888 dummy = 0x12345678;
12889 p = (
unsigned char *) &
dummy;
12892 else if (*p == 0x12)
12895 cm_msg(
MERROR,
"rpc_get_option",
"unknown byte order format");
12898 f = (
float) 1.2345;
12901 if ((
dummy & 0xFF) == 0x19 &&
12902 ((
dummy >> 8) & 0xFF) == 0x04 && ((
dummy >> 16) & 0xFF) == 0x9E
12903 && ((
dummy >> 24) & 0xFF) == 0x3F)
12905 else if ((
dummy & 0xFF) == 0x9E &&
12906 ((
dummy >> 8) & 0xFF) == 0x40 && ((
dummy >> 16) & 0xFF) == 0x19
12907 && ((
dummy >> 24) & 0xFF) == 0x04)
12910 cm_msg(
MERROR,
"rpc_get_option",
"unknown floating point format");
12915 if ((
dummy & 0xFF) == 0x8D &&
12916 ((
dummy >> 8) & 0xFF) == 0x97 && ((
dummy >> 16) & 0xFF) == 0x6E
12917 && ((
dummy >> 24) & 0xFF) == 0x12)
12919 else if ((
dummy & 0xFF) == 0x83 &&
12920 ((
dummy >> 8) & 0xFF) == 0xC0 && ((
dummy >> 16) & 0xFF) == 0xF3
12921 && ((
dummy >> 24) & 0xFF) == 0x3F)
12923 else if ((
dummy & 0xFF) == 0x13 &&
12924 ((
dummy >> 8) & 0xFF) == 0x40 && ((
dummy >> 16) & 0xFF) == 0x83
12925 && ((
dummy >> 24) & 0xFF) == 0xC0)
12927 else if ((
dummy & 0xFF) == 0x9E &&
12928 ((
dummy >> 8) & 0xFF) == 0x40 && ((
dummy >> 16) & 0xFF) == 0x18
12929 && ((
dummy >> 24) & 0xFF) == 0x04)
12931 "MIDAS cannot handle VAX D FLOAT format. Please compile with the /g_float flag");
12933 cm_msg(
MERROR,
"rpc_get_option",
"unknown floating point format");
12957 else if (
hConn == -2)
13004 int timeout =
c->rpc_timeout;
13049#ifndef DOXYGEN_SHOULD_SKIP_THIS
13202 va_start(
argptr, format);
13203 vsprintf(
str, (
char *) format,
argptr);
13255 bool debug =
false;
13258 printf(
"encode rpc_id %d \"%s\"\n",
rl.id,
rl.name);
13259 for (
int i=0;
rl.param[
i].tid != 0;
i++) {
13260 int tid =
rl.param[
i].tid;
13261 int flags =
rl.param[
i].flags;
13262 int n =
rl.param[
i].n;
13263 printf(
"i=%d, tid %d, flags 0x%x, n %d\n",
i, tid, flags,
n);
13269 for (
int i=0;
rl.param[
i].tid != 0;
i++) {
13270 int tid =
rl.param[
i].tid;
13271 int flags =
rl.param[
i].flags;
13292 size_t buf_size =
sizeof(
NET_COMMAND) + 4 * 1024;
13293 char* buf = (
char *)
malloc(buf_size);
13303 for (
int i=0;
rl.param[
i].tid != 0;
i++) {
13304 int tid =
rl.param[
i].tid;
13305 int flags =
rl.param[
i].flags;
13325 char* arg =
args[
i];
13397 printf(
"encode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, memcpy pointer %d\n",
i, flags, tid,
arg_type,
arg_size, param_size,
arg_size);
13402 printf(
"encode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, double->float\n",
i, flags, tid,
arg_type,
arg_size, param_size);
13408 printf(
"encode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, memcpy %d\n",
i, flags, tid,
arg_type,
arg_size, param_size,
arg_size);
13420 printf(
"encode rpc_id %d \"%s\" buf_size %d, param_size %d\n",
rl.id,
rl.name, (
int)buf_size, (*nc)->header.param_size);
13426 bool debug =
false;
13429 printf(
"decode reply to rpc_id %d \"%s\" has %d bytes\n",
rl.id,
rl.name, (
int)buf_size);
13435 for (
int i = 0;
rl.param[
i].tid != 0;
i++) {
13436 int tid =
rl.param[
i].tid;
13437 int flags =
rl.param[
i].flags;
13458 cm_msg(
MERROR,
"rpc_call_decode",
"routine \"%s\": no data in RPC reply, needed to decode an RPC_OUT parameter. param_ptr is NULL",
rl.name);
13462 tid =
rl.param[
i].tid;
13480 if (*((
char **) arg)) {
13482 printf(
"decode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, memcpy %d\n",
i, flags, tid,
arg_type,
arg_size, param_size,
arg_size);
13549 if (
rpc_list[
i].
id == (
int) routine_id) {
13559 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" with invalid RPC ID %d",
c->client_name.c_str(),
c->host_name.c_str(), routine_id);
13568 va_start(
ap, routine_id);
13586 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": send_tcp() failed",
c->client_name.c_str(),
c->host_name.c_str(),
rpc_name);
13608 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": send_tcp() failed",
c->client_name.c_str(),
c->host_name.c_str(),
rpc_name);
13618 DWORD watchdog_timeout;
13623 if (
c->rpc_timeout >= (
int) watchdog_timeout) {
13629 DWORD buf_size = 0;
13640 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": timeout waiting for reply",
c->client_name.c_str(),
c->host_name.c_str(),
rpc_name);
13648 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": error, ss_recv_net_command() status %d",
c->client_name.c_str(),
c->host_name.c_str(),
rpc_name,
status);
13658 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": error, unknown RPC, status %d",
c->client_name.c_str(),
c->host_name.c_str(),
rpc_name,
rpc_status);
13666 va_start(
ap, routine_id);
13723 fprintf(
stderr,
"rpc_call(routine_id=%d) failed, no connection to mserver.\n", routine_id);
13747 if (
rpc_list[
i].
id == (
int) routine_id) {
13759 cm_msg(
MERROR,
"rpc_call",
"invalid rpc ID (%d)", routine_id);
13768 va_start(
ap, routine_id);
13811 DWORD watchdog_timeout;
13821 if (rpc_timeout >= (
int) watchdog_timeout) {
13828 DWORD buf_size = 0;
13844 cm_msg(
MERROR,
"rpc_call",
"routine \"%s\": timeout waiting for reply, program abort",
rpc_name);
13868 va_start(
ap, routine_id);
13928 return bm_send_event(buffer_handle, pevent, unused, async_flag);
13955 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid sg_ptr[0] is NULL");
13960 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid sg_len[0] value %d is smaller than event header size %d", (
int)
sg_len[0], (
int)
sizeof(
EVENT_HEADER));
13969 if (data_size == 0) {
13970 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid event data size zero");
13988 cm_msg(
MERROR,
"rpc_send_event_sg",
"data size mismatch: event data_size %d, event_size %d not same as sum of sg_len %d", (
int)data_size, (
int)
event_size, (
int)
count);
14014 assert(
sizeof(
DWORD) == 4);
14020 cm_msg(
MERROR,
"rpc_send_event_sg",
"ss_write_tcp(buffer handle) failed, event socket is now closed");
14030 cm_msg(
MERROR,
"rpc_send_event_sg",
"ss_write_tcp(event data) failed, event socket is now closed");
14037 if (
count < total_size) {
14038 char padding[8] = { 0,0,0,0,0,0,0,0 };
14044 cm_msg(
MERROR,
"rpc_send_event_sg",
"ss_write_tcp(padding) failed, event socket is now closed");
14110 for (
size_t i = 0;
i <
n;
i++) {
14115 if (
tt.transition ==
CINT(0) &&
tt.sequence_number ==
CINT(4)) {
14133 cm_msg(
MERROR,
"rpc_transition_dispatch",
"no handler for transition %d with sequence number %d",
CINT(0),
CINT(4));
14136 cm_msg(
MERROR,
"rpc_transition_dispatch",
"received unrecognized command %d",
idx);
14197 for (
i = 0;
i < (size - 1) / 16 + 1;
i++) {
14199 for (
j = 0;
j < 16;
j++)
14200 if (
i * 16 +
j < size)
14206 for (
j = 0;
j < 16;
j++) {
14208 if (
i * 16 +
j < size)
14209 printf(
"%c", (
c >= 32 &&
c < 128) ? p[
i * 16 +
j] :
'.');
14248 char *buffer =
NULL;
14272 int param_size = -1;
14281 if (param_size == -1) {
14321 int size = write_ptr - read_ptr;
14326 read_ptr = write_ptr;
14333 }
while (write_ptr == -1 &&
errno ==
EINTR);
14339 if (write_ptr <= 0) {
14340 if (write_ptr == 0)
14351 read_ptr = misalign;
14352 write_ptr += misalign;
14354 misalign = write_ptr % 8;
14364 if (write_ptr - read_ptr < param_size)
14432 int sock =
psa->event_sock;
14458 cm_msg(
MERROR,
"recv_event_server",
"recv_tcp2(header) returned %d",
hrd);
14467 cm_msg(
MERROR,
"recv_event_server",
"recv_tcp2(more header) returned %d",
hrd1);
14484 if (
psa->convert_flags) {
14499 for (
int i=0;
i<5;
i++) {
14500 printf(
"recv_event_server: header[%d]: 0x%08x\n",
i,
pbh[
i]);
14508 "received event header with invalid data_size %d: event_size %d, total_size %d", pevent->
data_size,
14636 cm_msg(
MERROR,
"rpc_register_server",
"cannot listen to tcp port %d: %s", port,
errmsg.c_str());
14641#if defined(F_SETFD) && defined(FD_CLOEXEC)
14738 assert(return_buffer);
14748 if (convert_flags) {
14780 cm_msg(
MERROR,
"rpc_execute",
"Invalid rpc ID (%d)", routine_id);
14793 for (
i = 0;
rl.param[
i].tid != 0;
i++) {
14794 tid =
rl.param[
i].tid;
14795 flags =
rl.param[
i].flags;
14808 param_size =
ALIGN8(param_size);
14819 if (convert_flags) {
14872 "return parameters (%d) too large for network buffer (%d)",
14882 "rpc_execute: return parameters (%d) too large for network buffer (%d), new buffer size (%d)",
14898 assert(return_buffer);
14915 if (
rl.param[
i + 1].tid)
14954 for (
i = 0;
rl.param[
i].tid != 0;
i++)
14956 tid =
rl.param[
i].tid;
14957 flags =
rl.param[
i].flags;
14969 param_size =
ALIGN8(param_size);
14997 param_size =
ALIGN8(param_size);
15009 if (convert_flags) {
15025 nc_out->header.param_size = param_size;
15030 if (convert_flags) {
15087 printf(
"rpc_test_rpc!\n");
15116 for (
int i=0;
i<10;
i++) {
15148 printf(
"int_out mismatch!\n");
15153 printf(
"int_inout mismatch!\n");
15163 printf(
"string2_out mismatch [%s] vs [%s]\n",
string2_out,
"second string_out");
15168 printf(
"string_inout mismatch [%s] vs [%s]\n",
string_inout,
"return string_inout");
15180 if (
pkey->type != 444 ||
pkey->num_values != 555 ||
strcmp(
pkey->name,
"out_name") ||
pkey->last_written != 666) {
15181 printf(
"struct_out mismatch: type %d, num_values %d, name [%s], last_written %d\n",
pkey->type,
pkey->num_values,
pkey->name,
pkey->last_written);
15187 if (
pkey->type != 444444 ||
pkey->num_values != 555555 ||
strcmp(
pkey->name,
"inout_name") ||
pkey->last_written != 666666) {
15188 printf(
"struct_inout mismatch: type %d, num_values %d, name [%s], last_written %d\n",
pkey->type,
pkey->num_values,
pkey->name,
pkey->last_written);
15303 if (
strcmp(hostname,
"localhost") == 0)
15306 if (
strcmp(hostname,
"localhost.localdomain") == 0)
15309 if (
strcmp(hostname,
"localhost6") == 0)
15312 if (
strcmp(hostname,
"ip6-localhost") == 0)
15320 if (h == hostname) {
15337 std::string hostname;
15353 cm_msg(
MERROR,
"rpc_socket_check_allowed_host",
"rejecting connection from unallowed host \'%s\', this message will no longer be reported", hostname.c_str());
15355 cm_msg(
MERROR,
"rpc_socket_check_allowed_host",
"rejecting connection from unallowed host \'%s\'. Add this host to \"/Experiment/Security/RPC hosts/Allowed hosts\"", hostname.c_str());
15391 char net_buffer[256];
15436#ifdef LOCAL_ROUTINES
15439 for (
unsigned i=0;
i<exptab.
exptab.size();
i++) {
15441 const char*
str = exptab.
exptab[
i].name.c_str();
15444 send(sock,
"", 1, 0);
15466 while (*ptr ==
' ')
15470 for (; *ptr != 0 && *ptr !=
' ' &&
i < (
int)
sizeof(version) - 1;)
15471 version[
i++] = *ptr++;
15474 assert(
i < (
int)
sizeof(version));
15478 for (; *ptr != 0 && *ptr !=
' ';)
15481 while (*ptr ==
' ')
15485 for (; *ptr != 0 && *ptr !=
' ' && *ptr !=
'\n' && *ptr !=
'\r' &&
i < (
int)
sizeof(
experiment) - 1;)
15508 cm_msg(
MERROR,
"rpc_server_accept",
"received string: %s", net_buffer + 2);
15523#ifdef LOCAL_ROUTINES
15529 bool found =
false;
15548 send(sock,
"2", 2, 0);
15567 const char *
argv[10];
15602 cm_msg(
MERROR,
"rpc_server_accept",
"received unknown command '%c' code %d", command, command);
15647 char net_buffer[256], *p;
15668 i =
recv_string(sock, net_buffer,
sizeof(net_buffer), 10000);
15675 p =
strtok(net_buffer,
" ");
15745 int recv_sock, send_sock, event_sock;
15750 char net_buffer[256];
15792 flag = 2 * 1024 * 1024;
15795 cm_msg(
MERROR,
"rpc_server_callback",
"cannot setsockopt(SOL_SOCKET, SO_RCVBUF), errno %d (%s)",
errno,
15800 cm_msg(
MERROR,
"rpc_server_callback",
"timeout on receive remote computer info");
15927 cm_msg(
MERROR,
"rpc_server_receive_rpc",
"rpc_execute() returned %d, abort",
status);
15954 cm_msg(
MTALK,
"rpc_server_receive_rpc",
"Program \'%s\' on host \'%s\' aborted", sa->
prog_name.c_str(),
str);
16040 cm_msg(
MERROR,
"rpc_server_receive_event",
"internal error: called recursively");
16057 cm_msg(
MERROR,
"rpc_server_receive_event",
"recv_event_server_realloc() returned %d, abort",
n_received);
16085 cm_msg(
MERROR,
"rpc_server_receive_event",
"bm_send_event() error %d (SS_ABORT), abort",
status);
16096 cm_msg(
MERROR,
"rpc_server_receive_event",
"bm_send_event() error %d, mserver dropped this event",
status);
16114 cm_msg(
MTALK,
"rpc_server_receive_event",
"Program \'%s\' on host \'%s\' aborted", sa->
prog_name.c_str(),
str);
16322 if (convert_flags) {
16331 cm_msg(
MINFO,
"rpc_check_channels",
"client \"%s\" on host \"%s\" failed watchdog test after %d sec, send_tcp() returned %d",
16367 cm_msg(
MINFO,
"rpc_check_channels",
"client \"%s\" on host \"%s\" failed watchdog test after %d sec",
16386 cm_msg(
MINFO,
"rpc_check_channels",
"client \"%s\" on host \"%s\" failed watchdog test after %d sec, recv_tcp() returned %d",
16434#ifndef DOXYGEN_SHOULD_SKIP_THIS
16585 if (((
PTYPE) event & 0x07) != 0) {
16586 cm_msg(
MERROR,
"bk_create",
"Bank %s created with unaligned event pointer",
name);
16602 pbk32->data_size = 0;
16610 pbk->data_size = 0;
16616#ifndef DOXYGEN_SHOULD_SKIP_THIS
16737 }
while ((
DWORD) ((
char *)
pbk32a - (
char *) event) <
16759 }
while ((
DWORD) ((
char *)
pbk32 - (
char *) event) <
16781 }
while ((
DWORD) ((
char *)
pbk - (
char *) event) <
16809 return pbk32a->data_size;
16814 printf(
"Warning: TID_STRUCT bank %c%c%c%c has zero size\n",
pbk32->name[0],
pbk32->name[1],
pbk32->name[2],
pbk32->name[3]);
16816 return pbk32->data_size;
16820 if (size > 0xFFFF) {
16821 printf(
"Error: Bank size %d exceeds 16-bit limit of 65526, please use bk_init32() to create a 32-bit bank\n", size);
16824 pbk->data_size = (
WORD) (size);
16826 printf(
"Warning: TID_STRUCT bank %c%c%c%c has zero size\n",
pbk->name[0],
pbk->name[1],
pbk->name[2],
pbk->name[3]);
16828 if (size > 0xFFFF) {
16829 printf(
"Error: Bank size %d exceeds 16-bit limit of 65526, please use bk_init32() to create a 32-bit bank\n", size);
16833 return pbk->data_size;
16920 while ((
DWORD) ((
char *)
pbk32a - (
char *) event) <
16925 return pbk32a->data_size;
16933 while ((
DWORD) ((
char *)
pbk32 - (
char *) event) <
16938 return pbk32->data_size;
16946 while ((
DWORD) ((
char *)
pbk - (
char *) event) <
16951 return pbk->data_size;
17074 *((
void **)
pdata) = (*pbk) + 1;
17081 return (*pbk)->data_size;
17086#ifndef DOXYGEN_SHOULD_SKIP_THIS
17114 *((
void **)
pdata) = (*pbk) + 1;
17122 return (*pbk)->data_size;
17150 *((
void **)
pdata) = (*pbk32a) + 1;
17158 return (*pbk32a)->data_size;
17190 if (
pbh->flags < 0x10000 && !
force)
17236 while ((
char *)
pdata < (
char *)
pbk) {
17246 while ((
char *)
pdata < (
char *)
pbk) {
17255 while ((
char *)
pdata < (
char *)
pbk) {
17277#ifndef DOXYGEN_SHOULD_SKIP_THIS
17302#define MAX_RING_BUFFER 100
17394 assert(
rb[
i].buffer);
17491 rp >
rb[h].buffer) {
17553 cm_msg(
MERROR,
"rb_increment_wp",
"event size of %d MB larger than max_event_size of %d MB",
17564 assert(
rb[h].rp !=
rb[h].buffer);
17623 if (
rb[h].wp !=
rb[h].rp) {
17625 *p =
rb[handle - 1].
rp;
17691 if (
new_rp >= ep &&
rb[h].wp < ep)
17733 if (
rb[h].wp >=
rb[h].rp)
17751 cm_msg(
MERROR,
"cm_write_event_to_odb",
"event %d ODB record size mismatch, db_set_record() status %d", pevent->
event_id,
status);
17776 for (
n=0 ; ;
n++) {
17832 cm_msg(
MERROR,
"cm_write_event_to_odb",
"please define bank \"%s\" in BANK_LIST in frontend",
name);
17837 for (
i = 0;;
i++) {
17850 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot write bank \"%s\" to ODB, db_set_data1() status %d",
name,
status);
17864 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot create key for bank \"%s\" with tid %d in ODB, db_create_key() status %d",
name,
bktype,
status);
17869 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot find key for bank \"%s\" in ODB, after db_create_key(), db_find_key() status %d",
name,
status);
17876 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot write bank \"%s\" to ODB, db_set_data1() status %d",
name,
status);
17890 cm_msg(
MERROR,
"cm_write_event_to_odb",
"event format %d is not supported (see midas.h definitions of FORMAT_xxx)", format);
std::atomic_bool connected
BUFFER * get_pbuf() const
bm_lock_buffer_guard(BUFFER *pbuf, bool do_not_lock=false)
bm_lock_buffer_guard & operator=(const bm_lock_buffer_guard &)=delete
bm_lock_buffer_guard(const bm_lock_buffer_guard &)=delete
static bool exists(const std::string &name)
INT transition(INT run_number, char *error)
INT al_get_alarms(std::string *presult)
void bk_init32a(void *event)
INT bk_close(void *event, void *pdata)
INT bk_iterate32a(const void *event, BANK32A **pbk32a, void *pdata)
static void copy_bk_name(char *dst, const char *src)
INT bk_swap(void *event, BOOL force)
BOOL bk_is32a(const void *event)
int bk_delete(void *event, const char *name)
BOOL bk_is32(const void *event)
INT bk_iterate32(const void *event, BANK32 **pbk, void *pdata)
INT bk_locate(const void *event, const char *name, void *pdata)
void bk_init(void *event)
INT bk_list(const void *event, char *bklist)
INT bk_copy(char *pevent, char *psrce, const char *bkname)
INT bk_iterate(const void *event, BANK **pbk, void *pdata)
void bk_init32(void *event)
void bk_create(void *event, const char *name, WORD type, void **pdata)
INT bk_find(const BANK_HEADER *pbkh, const char *name, DWORD *bklen, DWORD *bktype, void **pdata)
INT bk_size(const void *event)
static void bm_wakeup_producers_locked(const BUFFER_HEADER *pheader, const BUFFER_CLIENT *pc)
static INT bm_receive_event_rpc(INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
INT bm_open_buffer(const char *buffer_name, INT buffer_size, INT *buffer_handle)
static BOOL bm_validate_rp(const char *who, const BUFFER_HEADER *pheader, int rp)
INT bm_send_event(INT buffer_handle, const EVENT_HEADER *pevent, int unused, int timeout_msec)
static int bm_flush_cache_rpc(int buffer_handle, int timeout_msec)
static void bm_write_buffer_statistics_to_odb_copy(HNDLE hDB, const char *buffer_name, const char *client_name, int client_index, BUFFER_INFO *pbuf, BUFFER_HEADER *pheader)
static int bm_skip_event(BUFFER *pbuf)
static INT bm_flush_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
INT bm_write_statistics_to_odb(void)
#define MAX_DEFRAG_EVENTS
INT bm_delete_request(INT request_id)
INT bm_close_all_buffers(void)
INT bm_add_event_request(INT buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, EVENT_HANDLER *func, INT request_id)
static void bm_write_buffer_statistics_to_odb(HNDLE hDB, BUFFER *pbuf, BOOL force)
static int bm_incr_rp_no_check(const BUFFER_HEADER *pheader, int rp, int total_size)
INT bm_receive_event_vec(INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
static void bm_notify_reader_locked(BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, int old_write_pointer, int request_id)
static int bm_find_first_request_locked(BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
static BOOL bm_check_requests(const BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
static void bm_cleanup_buffer_locked(BUFFER *pbuf, const char *who, DWORD actual_time)
static BOOL bm_update_read_pointer_locked(const char *caller_name, BUFFER_HEADER *pheader)
static void bm_convert_event_header(EVENT_HEADER *pevent, int convert_flags)
INT bm_request_event(HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
static int bm_validate_client_index_locked(bm_lock_buffer_guard &pbuf_guard)
INT bm_set_cache_size(INT buffer_handle, size_t read_size, size_t write_size)
static int bm_validate_buffer_locked(const BUFFER *pbuf)
INT bm_receive_event(INT buffer_handle, void *destination, INT *buf_size, int timeout_msec)
static int bm_fill_read_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
INT bm_compose_event_threadsafe(EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD *serial)
INT bm_remove_event_request(INT buffer_handle, INT request_id)
static double _bm_mutex_timeout_sec
INT bm_close_buffer(INT buffer_handle)
int bm_send_event_sg(int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
INT bm_compose_event(EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD serial)
static void bm_write_to_buffer_locked(BUFFER_HEADER *pheader, int sg_n, const char *const sg_ptr[], const size_t sg_len[], size_t total_size)
static EVENT_DEFRAG_BUFFER defrag_buffer[MAX_DEFRAG_EVENTS]
static int bm_next_rp(const char *who, const BUFFER_HEADER *pheader, const char *pdata, int rp)
INT bm_match_event(short int event_id, short int trigger_mask, const EVENT_HEADER *pevent)
static void bm_read_from_buffer_locked(const BUFFER_HEADER *pheader, int rp, char *buf, int event_size)
static BOOL bm_peek_read_cache_locked(BUFFER *pbuf, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
static void bm_update_last_activity(DWORD millitime)
static void bm_incr_read_cache_locked(BUFFER *pbuf, int total_size)
static DWORD _bm_max_event_size
static void bm_clear_buffer_statistics(HNDLE hDB, BUFFER *pbuf)
INT bm_flush_cache(int buffer_handle, int timeout_msec)
static void bm_dispatch_event(int buffer_handle, EVENT_HEADER *pevent)
static void bm_validate_client_pointers_locked(const BUFFER_HEADER *pheader, BUFFER_CLIENT *pclient)
static INT bm_read_buffer(BUFFER *pbuf, INT buffer_handle, void **bufptr, void *buf, INT *buf_size, std::vector< char > *vecptr, int timeout_msec, int convert_flags, BOOL dispatch)
static int bm_wait_for_free_space_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec, int requested_space, bool unlock_write_cache)
INT bm_get_buffer_handle(const char *buffer_name, INT *buffer_handle)
int bm_send_event_vec(int buffer_handle, const std::vector< char > &event, int timeout_msec)
static int _bm_lock_timeout
static int bm_peek_buffer_locked(BUFFER *pbuf, BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
INT bm_receive_event_alloc(INT buffer_handle, EVENT_HEADER **ppevent, int timeout_msec)
static void bm_reset_buffer_locked(BUFFER *pbuf)
void bm_remove_client_locked(BUFFER_HEADER *pheader, int j)
static INT bm_push_buffer(BUFFER *pbuf, int buffer_handle)
static int bm_wait_for_more_events_locked(bm_lock_buffer_guard &pbuf_guard, BUFFER_CLIENT *pc, int timeout_msec, BOOL unlock_read_cache)
INT cm_set_path(const char *path)
INT cm_register_transition(INT transition, INT(*func)(INT, char *), INT sequence_number)
INT cm_shutdown(const char *name, BOOL bUnique)
static int cm_transition_call(TrState *s, int idx)
INT cm_disconnect_client(HNDLE hConn, BOOL bShutdown)
static void load_rpc_hosts(HNDLE hDB, HNDLE hKey, int index, void *info)
static std::atomic< std::thread * > _watchdog_thread
static int cm_transition_detach(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_yield(INT millisec)
INT cm_get_experiment_database(HNDLE *hDB, HNDLE *hKeyClient)
INT cm_list_experiments_remote(const char *host_name, STRING_LIST *exp_names)
INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout)
static void bm_defragment_event(HNDLE buffer_handle, HNDLE request_id, EVENT_HEADER *pevent, void *pdata, EVENT_HANDLER *dispatcher)
INT cm_connect_client(const char *client_name, HNDLE *hConn)
INT cm_connect_experiment(const char *host_name, const char *exp_name, const char *client_name, void(*func)(char *))
static BUFFER_CLIENT * bm_get_my_client_locked(bm_lock_buffer_guard &pbuf_guard)
INT cm_list_experiments_local(STRING_LIST *exp_names)
INT cm_start_watchdog_thread()
static INT bm_push_event(const char *buffer_name)
INT cm_get_experiment_semaphore(INT *semaphore_alarm, INT *semaphore_elog, INT *semaphore_history, INT *semaphore_msg)
static int tr_finish(HNDLE hDB, TrState *tr, int transition, int status, const char *errorstr)
INT cm_set_client_run_state(INT state)
static int bm_lock_buffer_read_cache(BUFFER *pbuf)
static void write_tr_client_to_odb(HNDLE hDB, const TrClient *tr_client)
INT cm_transition(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_stop_watchdog_thread()
INT cm_register_function(INT id, INT(*func)(INT, void **))
INT cm_connect_experiment1(const char *host_name, const char *default_exp_name, const char *client_name, void(*func)(char *), INT odb_size, DWORD watchdog_timeout)
INT cm_check_client(HNDLE hDB, HNDLE hKeyClient)
static int xbm_lock_buffer(BUFFER *pbuf)
static void xbm_unlock_buffer(BUFFER *pbuf)
INT cm_dispatch_ipc(const char *message, int message_size, int client_socket)
static INT cm_transition1(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_select_experiment_remote(const char *host_name, std::string *exp_name)
INT cm_register_server(void)
static BOOL _ctrlc_pressed
static void init_rpc_hosts(HNDLE hDB)
void cm_ack_ctrlc_pressed()
INT cm_execute(const char *command, char *result, INT bufsize)
INT cm_get_watchdog_info(HNDLE hDB, const char *client_name, DWORD *timeout, DWORD *last)
INT cm_cleanup(const char *client_name, BOOL ignore_timeout)
void cm_check_connect(void)
static BUFFER * bm_get_buffer(const char *who, INT buffer_handle, int *pstatus)
INT cm_select_experiment_local(std::string *exp_name)
std::string cm_expand_env(const char *str)
std::string cm_get_client_name()
static int bm_lock_buffer_mutex(BUFFER *pbuf)
int cm_exec_script(const char *odb_path_to_script)
INT EXPRT cm_get_path_string(std::string *path)
static void rpc_client_shutdown()
static std::atomic< bool > _watchdog_thread_is_running
static exptab_struct _exptab
INT cm_set_client_info(HNDLE hDB, HNDLE *hKeyClient, const char *host_name, char *client_name, INT hw_type, const char *password, DWORD watchdog_timeout)
INT cm_disconnect_experiment(void)
static void bm_cleanup(const char *who, DWORD actual_time, BOOL wrong_interval)
static INT bm_notify_client(const char *buffer_name, int s)
int cm_get_exptab(const char *expname, std::string *dir, std::string *user)
static void xcm_watchdog_thread()
static bool test_cm_expand_env1(const char *str, const char *expected)
INT cm_synchronize(DWORD *seconds)
std::string cm_get_exptab_filename()
std::string cm_get_path()
static DWORD _deferred_transition_mask
std::string cm_get_history_path(const char *history_channel)
void cm_test_expand_env()
INT cm_register_deferred_transition(INT transition, BOOL(*func)(INT, BOOL))
int cm_set_experiment_local(const char *exp_name)
static INT _requested_transition
INT cm_get_environment(char *host_name, int host_name_size, char *exp_name, int exp_name_size)
const char * cm_get_version()
INT cm_read_exptab(exptab_struct *exptab)
static int bm_lock_buffer_write_cache(BUFFER *pbuf)
INT cm_deregister_transition(INT transition)
INT cm_check_deferred_transition()
std::string cm_get_experiment_name()
INT cm_set_transition_sequence(INT transition, INT sequence_number)
static bool tr_compare(const std::unique_ptr< TrClient > &arg1, const std::unique_ptr< TrClient > &arg2)
INT cm_delete_client_info(HNDLE hDB, INT pid)
static std::atomic< bool > _watchdog_thread_run
const char * cm_get_revision()
INT cm_watchdog_thread(void *unused)
INT cm_set_experiment_database(HNDLE hDB, HNDLE hKeyClient)
BOOL cm_is_ctrlc_pressed()
static INT tr_main_thread(void *param)
void cm_ctrlc_handler(int sig)
INT cm_set_watchdog_params_local(BOOL call_watchdog, DWORD timeout)
INT cm_transition_cleanup()
INT cm_set_watchdog_params(BOOL call_watchdog, DWORD timeout)
static int cm_transition_call_direct(TrClient *tr_client)
INT cm_exist(const char *name, BOOL bUnique)
INT cm_set_experiment_semaphore(INT semaphore_alarm, INT semaphore_elog, INT semaphore_history, INT semaphore_msg)
static INT cm_transition2(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_set_experiment_name(const char *name)
#define CM_INVALID_TRANSITION
#define CM_DEFERRED_TRANSITION
#define CM_TRANSITION_IN_PROGRESS
#define CM_WRONG_PASSWORD
#define CM_TRANSITION_CANCELED
#define CM_VERSION_MISMATCH
#define BM_INVALID_MIXING
#define BM_INVALID_HANDLE
#define DB_INVALID_HANDLE
#define DB_NO_MORE_SUBKEYS
#define RPC_EXCEED_BUFFER
#define RPC_DOUBLE_DEFINED
#define RPC_NOT_REGISTERED
#define RPC_MUTEX_TIMEOUT
#define RPC_NO_CONNECTION
#define RPC_HNDLE_CONNECT
#define RPC_HNDLE_MSERVER
#define VALIGN(adr, align)
RPC_LIST * rpc_get_internal_list(INT flag)
#define MESSAGE_BUFFER_NAME
#define DRI_LITTLE_ENDIAN
#define MAX_STRING_LENGTH
#define MESSAGE_BUFFER_SIZE
void() EVENT_HANDLER(HNDLE buffer_handler, HNDLE request_id, EVENT_HEADER *event_header, void *event_data)
INT() RPC_HANDLER(INT index, void *prpc_param[])
std::string ss_gethostname()
INT ss_suspend(INT millisec, INT msg)
INT ss_get_struct_align()
INT ss_mutex_release(MUTEX_T *mutex)
INT ss_suspend_init_odb_port()
bool ss_event_socket_has_data()
time_t ss_mktime(struct tm *tms)
int ss_file_exist(const char *path)
INT ss_semaphore_create(const char *name, HNDLE *semaphore_handle)
INT recv_tcp2(int sock, char *net_buffer, int buffer_size, int timeout_ms)
INT ss_suspend_set_client_listener(int listen_socket)
INT ss_socket_get_peer_name(int sock, std::string *hostp, int *portp)
int ss_file_link_exist(const char *path)
INT ss_mutex_delete(MUTEX_T *mutex)
int ss_socket_wait(int sock, INT millisec)
INT ss_suspend_set_server_acceptions(RPC_SERVER_ACCEPTION_LIST *acceptions)
DWORD ss_settime(DWORD seconds)
char * ss_getpass(const char *prompt)
INT ss_suspend_set_client_connection(RPC_SERVER_CONNECTION *connection)
INT ss_mutex_create(MUTEX_T **mutex, BOOL recursive)
INT ss_shm_open(const char *name, INT size, void **adr, size_t *shm_size, HNDLE *handle, BOOL get_size)
INT recv_string(int sock, char *buffer, DWORD buffer_size, INT millisec)
INT ss_write_tcp(int sock, const char *buffer, size_t buffer_size)
int ss_dir_exist(const char *path)
bool ss_timed_mutex_wait_for_sec(std::timed_mutex &mutex, const char *mutex_name, double timeout_sec)
INT ss_semaphore_release(HNDLE semaphore_handle)
std::string ss_get_cmdline(void)
int ss_file_copy(const char *src, const char *dst, bool append)
INT recv_tcp(int sock, char *net_buffer, DWORD buffer_size, INT flags)
INT ss_resume(INT port, const char *message)
midas_thread_t ss_gettid(void)
INT ss_semaphore_delete(HNDLE semaphore_handle, INT destroy_flag)
INT ss_sleep(INT millisec)
INT ss_socket_connect_tcp(const char *hostname, int tcp_port, int *sockp, std::string *error_msg_p)
INT ss_semaphore_wait_for(HNDLE semaphore_handle, DWORD timeout_millisec)
INT ss_socket_listen_tcp(bool listen_localhost, int tcp_port, int *sockp, int *tcp_port_p, std::string *error_msg_p)
char * ss_crypt(const char *buf, const char *salt)
INT ss_spawnv(INT mode, const char *cmdname, const char *const argv[])
INT ss_suspend_get_buffer_port(midas_thread_t thread_id, INT *port)
INT ss_socket_close(int *sockp)
char * ss_gets(char *string, int size)
INT ss_recv_net_command(int sock, DWORD *routine_id, DWORD *param_size, char **param_ptr, int timeout_ms)
INT ss_shm_close(const char *name, void *adr, size_t shm_size, HNDLE handle, INT destroy_flag)
INT send_tcp(int sock, char *buffer, DWORD buffer_size, INT flags)
void * ss_ctrlc_handler(void(*func)(int))
BOOL ss_pid_exists(int pid)
INT ss_system(const char *command)
INT ss_mutex_wait_for(MUTEX_T *mutex, INT timeout)
INT ss_file_find(const char *path, const char *pattern, char **plist)
static int cm_msg_retrieve1(const char *filename, time_t t, INT n_messages, char **messages, int *length, int *allocated, int *num_messages)
INT cm_msg1(INT message_type, const char *filename, INT line, const char *facility, const char *routine, const char *format,...)
int cm_msg_early_init(void)
INT EXPRT cm_msg_facilities(STRING_LIST *list)
int cm_msg_open_buffer(void)
int cm_msg_close_buffer(void)
static std::mutex gMsgBufMutex
static void add_message(char **messages, int *length, int *allocated, time_t tstamp, const char *new_message)
INT cm_msg_register(EVENT_HANDLER *func)
INT cm_msg_log(INT message_type, const char *facility, const char *message)
INT cm_msg_flush_buffer()
static std::deque< msg_buffer_entry > gMsgBuf
static INT cm_msg_send_event(DWORD ts, INT message_type, const char *send_message)
std::string cm_get_error(INT code)
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
static std::string cm_msg_format(INT message_type, const char *filename, INT line, const char *routine, const char *format, va_list *argptr)
INT cm_msg_retrieve(INT n_message, char *message, INT buf_size)
INT cm_msg_retrieve2(const char *facility, time_t t, INT n_message, char **messages, int *num_messages)
void cm_msg_get_logfile(const char *fac, time_t t, std::string *filename, std::string *linkname, std::string *linktarget)
INT cm_set_msg_print(INT system_mask, INT user_mask, int(*func)(const char *))
struct rpc_server_acception_struct RPC_SERVER_ACCEPTION
BOOL equal_ustring(const char *str1, const char *str2)
INT db_flush_database(HNDLE hDB)
INT db_get_data_index(HNDLE hDB, HNDLE hKey, void *data, INT *buf_size, INT idx, DWORD type)
INT db_delete_key(HNDLE hDB, HNDLE hKey, BOOL follow_links)
INT db_check_client(HNDLE hDB, HNDLE hKeyClient)
INT db_get_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, void *data, INT *buf_size, DWORD type, BOOL create)
INT db_open_record(HNDLE hDB, HNDLE hKey, void *ptr, INT rec_size, WORD access_mode, void(*dispatcher)(INT, INT, void *), void *info)
INT db_open_database(const char *xdatabase_name, INT database_size, HNDLE *hDB, const char *client_name)
void db_cleanup(const char *who, DWORD actual_time, BOOL wrong_interval)
INT db_lock_database(HNDLE hDB)
std::string strcomb1(const char **list)
INT db_get_data(HNDLE hDB, HNDLE hKey, void *data, INT *buf_size, DWORD type)
INT db_create_key(HNDLE hDB, HNDLE hKey, const char *key_name, DWORD type)
INT db_unlock_database(HNDLE hDB)
INT db_set_mode(HNDLE hDB, HNDLE hKey, WORD mode, BOOL recurse)
INT db_get_key(HNDLE hDB, HNDLE hKey, KEY *key)
INT EXPRT db_get_value_string(HNDLE hdb, HNDLE hKeyRoot, const char *key_name, int index, std::string *s, BOOL create, int create_string_length)
INT db_get_watchdog_info(HNDLE hDB, const char *client_name, DWORD *timeout, DWORD *last)
INT db_set_data_index(HNDLE hDB, HNDLE hKey, const void *data, INT data_size, INT idx, DWORD type)
INT db_close_all_records()
INT db_watch(HNDLE hDB, HNDLE hKey, void(*dispatcher)(INT, INT, INT, void *), void *info)
INT db_close_all_databases(void)
INT db_set_data(HNDLE hDB, HNDLE hKey, const void *data, INT buf_size, INT num_values, DWORD type)
INT db_sprintf(char *string, const void *data, INT data_size, INT idx, DWORD type)
INT db_update_last_activity(DWORD millitime)
INT db_set_data1(HNDLE hDB, HNDLE hKey, const void *data, INT buf_size, INT num_values, DWORD type)
INT db_set_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, const void *data, INT data_size, INT num_values, DWORD type)
INT db_find_key(HNDLE hDB, HNDLE hKey, const char *key_name, HNDLE *subhKey)
INT db_update_record_local(INT hDB, INT hKeyRoot, INT hKey, int index)
void db_set_watchdog_params(DWORD timeout)
INT db_update_record_mserver(INT hDB, INT hKeyRoot, INT hKey, int index, int client_socket)
void db_cleanup2(const char *client_name, int ignore_timeout, DWORD actual_time, const char *who)
int db_delete_client_info(HNDLE hDB, int pid)
INT db_set_client_name(HNDLE hDB, const char *client_name)
INT db_notify_clients_array(HNDLE hDB, HNDLE hKeys[], INT size)
INT db_set_record(HNDLE hDB, HNDLE hKey, void *data, INT buf_size, INT align)
INT db_enum_key(HNDLE hDB, HNDLE hKey, INT idx, HNDLE *subkey_handle)
INT db_create_record(HNDLE hDB, HNDLE hKey, const char *orig_key_name, const char *init_str)
INT EXPRT db_set_value_string(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, const std::string *s)
INT db_set_lock_timeout(HNDLE hDB, int timeout_millisec)
INT db_set_num_values(HNDLE hDB, HNDLE hKey, INT num_values)
INT db_protect_database(HNDLE hDB)
int rb_get_rp(int handle, void **p, int millisec)
int rb_delete(int handle)
int rb_get_wp(int handle, void **p, int millisec)
int rb_increment_rp(int handle, int size)
static volatile int _rb_nonblocking
int rb_increment_wp(int handle, int size)
int rb_create(int size, int max_event_size, int *handle)
int rb_get_buffer_level(int handle, int *n_bytes)
static RING_BUFFER rb[MAX_RING_BUFFER]
INT rpc_add_allowed_host(const char *hostname)
void rpc_convert_data(void *data, INT tid, INT flags, INT total_size, INT convert_flags)
INT rpc_client_connect(const char *host_name, INT port, const char *client_name, HNDLE *hConnection)
#define RPC_BM_ADD_EVENT_REQUEST
INT rpc_register_server(int port, int *plsock, int *pport)
#define RPC_CM_CHECK_CLIENT
static int recv_event_server_realloc(INT idx, RPC_SERVER_ACCEPTION *psa, char **pbuffer, int *pbuffer_size)
INT rpc_get_opt_tcp_size()
INT rpc_client_disconnect(HNDLE hConn, BOOL bShutdown)
#define RPC_BM_SEND_EVENT
INT rpc_client_call(HNDLE hConn, DWORD routine_id,...)
INT rpc_register_functions(const RPC_LIST *new_list, RPC_HANDLER func)
static std::atomic_bool gAllowedHostsEnabled(false)
INT rpc_server_callback(struct callback_addr *pcallback)
static std::mutex _client_connections_mutex
INT rpc_set_timeout(HNDLE hConn, int timeout_msec, int *old_timeout_msec)
#define RPC_CM_SYNCHRONIZE
static std::mutex gAllowedHostsMutex
INT recv_tcp_check(int sock)
INT rpc_server_receive_rpc(int idx, RPC_SERVER_ACCEPTION *sa)
#define RPC_BM_GET_BUFFER_INFO
#define RPC_CM_SET_CLIENT_INFO
const char * rpc_get_mserver_path()
static RPC_SERVER_ACCEPTION * rpc_get_server_acception(int idx)
RPC_SERVER_ACCEPTION * rpc_get_mserver_acception()
void rpc_calc_convert_flags(INT hw_type, INT remote_hw_type, INT *convert_flags)
INT rpc_server_connect(const char *host_name, const char *exp_name)
std::string rpc_get_name()
static RPC_SERVER_ACCEPTION * rpc_new_server_acception()
#define RPC_BM_REMOVE_EVENT_REQUEST
void rpc_debug_printf(const char *format,...)
const char * rpc_tid_name_old(INT id)
int cm_query_transition(int *transition, int *run_number, int *trans_time)
#define RPC_RC_TRANSITION
void rpc_va_arg(va_list *arg_ptr, INT arg_type, void *arg)
INT rpc_server_loop(void)
INT rpc_clear_allowed_hosts()
std::string rpc_get_mserver_hostname(void)
INT rpc_deregister_functions()
bool rpc_is_connected(void)
INT rpc_set_mserver_path(const char *path)
static std::vector< RPC_LIST > rpc_list
#define RPC_BM_CLOSE_BUFFER
static TLS_POINTER * tls_buffer
#define RPC_BM_SET_CACHE_SIZE
static std::vector< RPC_CLIENT_CONNECTION * > _client_connections
static void rpc_call_encode(va_list &ap, const RPC_LIST &rl, NET_COMMAND **nc)
static RPC_CLIENT_CONNECTION * rpc_get_locked_client_connection(HNDLE hConn)
static std::vector< std::string > gAllowedHosts
INT rpc_call(DWORD routine_id,...)
static std::mutex rpc_list_mutex
const char * rpc_tid_name(INT id)
INT rpc_register_client(const char *name, RPC_LIST *list)
static std::vector< RPC_SERVER_ACCEPTION * > _server_acceptions
static INT rpc_socket_check_allowed_host(int sock)
#define RPC_BM_CLOSE_ALL_BUFFERS
INT rpc_server_shutdown(void)
int rpc_flush_event_socket(int timeout_msec)
INT rpc_send_event(INT buffer_handle, const EVENT_HEADER *pevent, int unused, INT async_flag, INT mode)
static TR_FIFO _tr_fifo[10]
static std::string _mserver_path
INT rpc_get_timeout(HNDLE hConn)
INT rpc_server_disconnect()
INT rpc_set_debug(void(*func)(const char *), INT mode)
INT rpc_client_accept(int lsock)
void rpc_vax2ieee_float(float *var)
INT rpc_execute(INT sock, char *buffer, INT convert_flags)
INT rpc_set_opt_tcp_size(INT tcp_size)
#define RPC_BM_GET_BUFFER_LEVEL
int rpc_name_tid(const char *name)
INT rpc_register_listener(int port, RPC_HANDLER func, int *plsock, int *pport)
INT rpc_server_receive_event(int idx, RPC_SERVER_ACCEPTION *sa, int timeout_msec)
#define RPC_BM_OPEN_BUFFER
#define RPC_BM_EMPTY_BUFFERS
INT rpc_server_accept(int lsock)
static std::mutex _tr_fifo_mutex
bool rpc_is_mserver(void)
static RPC_SERVER_ACCEPTION * _mserver_acception
static int recv_net_command_realloc(INT idx, char **pbuf, int *pbufsize, INT *remaining)
void rpc_ieee2vax_float(float *var)
#define RPC_CM_SET_WATCHDOG_PARAMS
INT rpc_send_event1(INT buffer_handle, const EVENT_HEADER *pevent)
#define RPC_BM_RECEIVE_EVENT
static bool _rpc_is_remote
static int rpc_call_decode(va_list &ap, const RPC_LIST &rl, const char *buf, size_t buf_size)
INT rpc_send_event_sg(INT buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[])
INT rpc_set_name(const char *name)
INT rpc_register_function(INT id, INT(*func)(INT, void **))
#define RPC_CM_MSG_RETRIEVE
#define RPC_BM_SKIP_EVENT
INT rpc_client_dispatch(int sock)
#define RPC_BM_INIT_BUFFER_COUNTERS
static RPC_SERVER_CONNECTION _server_connection
INT rpc_check_channels(void)
static INT rpc_transition_dispatch(INT idx, void *prpc_param[])
static int handle_msg_odb(int n, const NET_COMMAND *nc)
#define RPC_BM_FLUSH_CACHE
void rpc_ieee2vax_double(double *var)
void rpc_vax2ieee_double(double *var)
#define RPC_CM_GET_WATCHDOG_INFO
INT rpc_get_convert_flags(void)
INT rpc_check_allowed_host(const char *hostname)
void rpc_convert_single(void *data, INT tid, INT flags, INT convert_flags)
char exp_name[NAME_LENGTH]
BOOL debug
debug printouts
char host_name[HOST_NAME_LENGTH]
char expt_name[NAME_LENGTH]
char buffer_name[NAME_LENGTH]
static const int tid_size[]
static std::string join(const char *sep, const std::vector< std::string > &v)
static std::vector< TRANS_TABLE > _trans_table
static std::atomic_int _message_mask_system
static int disable_bind_rpc_to_localhost
static std::mutex gBuffersMutex
int(* MessagePrintCallback)(const char *)
std::string cm_transition_name(int transition)
static DBG_MEM_LOC * _mem_loc
static std::vector< BUFFER * > gBuffers
static void(* _debug_print)(const char *)
static std::mutex _trans_table_mutex
static std::string _experiment_name
static std::string _path_name
static std::string _client_name
static std::vector< EventRequest > _request_list
static int _rpc_connect_timeout
static const char * tid_name[]
static const ERROR_TABLE _error_table[]
static INT _watchdog_timeout
INT bm_get_buffer_info(INT buffer_handle, BUFFER_HEADER *buffer_header)
static MUTEX_T * _mutex_rpc
static EVENT_HANDLER * _msg_dispatch
void * dbg_calloc(unsigned int size, unsigned int count, char *file, int line)
static BOOL _rpc_registered
static std::atomic< MessagePrintCallback > _message_print
bool ends_with_char(const std::string &s, char c)
void dbg_free(void *adr, char *file, int line)
static std::atomic_int _message_mask_user
static std::mutex _request_list_mutex
INT bm_get_buffer_level(INT buffer_handle, INT *n_bytes)
static TRANS_TABLE _deferred_trans_table[]
static int _rpc_listen_socket
std::string msprintf(const char *format,...)
void * dbg_malloc(unsigned int size, char *file, int line)
static const char * tid_name_old[]
static std::vector< std::string > split(const char *sep, const std::string &s)
int cm_write_event_to_odb(HNDLE hDB, HNDLE hKey, const EVENT_HEADER *pevent, INT format)
INT bm_init_buffer_counters(INT buffer_handle)
#define DIR_SEPARATOR_STR
#define DEFAULT_WATCHDOG_TIMEOUT
#define DEFAULT_RPC_TIMEOUT
#define PROGRAM_INFO_STR(_name)
#define MIN_WRITE_CACHE_SIZE
#define MAX_WRITE_CACHE_EVENT_SIZE_DIV
#define DEFAULT_MAX_EVENT_SIZE
#define WATCHDOG_INTERVAL
#define MAX_EVENT_REQUESTS
#define BANK_FORMAT_64BIT_ALIGNED
#define BANK_FORMAT_32BIT
std::vector< std::string > STRING_LIST
#define MAX_WRITE_CACHE_SIZE_DIV
#define TRANSITION_ERROR_STRING_LENGTH
#define BANK_FORMAT_VERSION
#define message(type, str)
#define write(n, a, f, d)
static std::string remove(const std::string s, char c)
int gettimeofday(struct timeval *tp, void *tzp)
struct callback_addr callback
TH1X EXPRT * h1_book(const char *name, const char *title, int bins, double min, double max)
BUFFER_CLIENT client[MAX_CLIENTS]
BUFFER_INFO(BUFFER *pbuf)
int client_count_write_wait[MAX_CLIENTS]
DWORD client_time_write_wait[MAX_CLIENTS]
int client_count_write_wait[MAX_CLIENTS]
char buffer_name[NAME_LENGTH]
EVENT_HANDLER * dispatcher
NET_COMMAND_HEADER header
unsigned int max_event_size
std::atomic< std::thread * > thread
std::atomic_bool finished
std::vector< int > wait_for_index
std::string waiting_for_client
std::vector< std::unique_ptr< TrClient > > clients
unsigned short host_port1
unsigned short host_port2
unsigned short host_port3
std::vector< exptab_entry > exptab
std::mutex event_sock_mutex
static double fac(double a)
static te_expr * list(state *s)