16#include "git-revision.h"
22#include <sys/resource.h>
62#ifndef DOXYGEN_SHOULD_SKIP_THIS
161extern unsigned _stklen = 60000U;
274 "\"exptab\" file not found and MIDAS_DIR or MIDAS_EXPTAB environment variable is not defined"},
331 f =
fopen(
"mem.txt",
"w");
364 f =
fopen(
"mem.txt",
"w");
372static std::vector<std::string>
split(
const char*
sep,
const std::string& s)
375 std::vector<std::string> v;
376 std::string::size_type pos = 0;
378 std::string::size_type next = s.find(
sep, pos);
379 if (next == std::string::npos) {
380 v.push_back(s.substr(pos));
383 v.push_back(s.substr(pos, next-pos));
389static std::string
join(
const char*
sep,
const std::vector<std::string>& v)
393 for (
unsigned i=0;
i<v.size();
i++) {
407 return s[s.length()-1] ==
c;
412 va_start(
ap, format);
415 char *buffer = (
char *)
malloc(size);
420 std::string s(buffer);
463 return msprintf(
"unlisted status code %d", code);
511 if (pos != std::string::npos) {
523 for (
size_t i = 0;
i <
flist.size();
i++) {
524 const char *p =
flist[
i].c_str();
525 if (
strchr(p,
'_') ==
NULL && !(p[0] >=
'0' && p[0] <=
'9')) {
526 size_t pos =
flist[
i].rfind(
'.');
527 if (pos != std::string::npos) {
549 *filename = std::string(
fac) +
".log";
671 fprintf(
stderr,
"cm_msg_log: Message \"%s\" not written to midas.log because rpc_call(RPC_CM_MSG_LOG) failed with status %d\n",
message,
status);
675 fprintf(
stderr,
"cm_msg_log: Message \"%s\" not written to midas.log, no connection to mserver\n",
message);
698 "cm_msg_log: Error: Cannot symlink message log file \'%s' to \'%s\', symlink() errno: %d (%s)\n",
707 "cm_msg_log: Message \"%s\" not written to midas.log because open(%s) failed with errno %d (%s)\n",
739 }
else if (
wr != len) {
740 fprintf(
stderr,
"cm_msg_log: Message \"%s\" not written to \"%s\", short write() wrote %d instead of %d bytes\n",
message, filename.c_str(), (
int)
wr, (
int)len);
754 const char*
pc = filename +
strlen(filename);
755 while (*
pc !=
'\\' && *
pc !=
'/' &&
pc != filename)
770 if (message_type &
MT_LOG)
782 if (
name.length() > 0)
791 }
else if (message_type ==
MT_USER) {
799 for (
int i=0;
i<10;
i++) {
828 if (message_type !=
MT_LOG) {
870 for (
i = 0;
i < 100;
i++) {
915INT cm_msg(
INT message_type,
const char *filename,
INT line,
const char *routine,
const char *format, ...)
931 if (message_type !=
MT_LOG) {
974 const char *
facility,
const char *routine,
const char *format, ...) {
1081 (*messages)[*
length] =
'\n';
1092 (*messages)[*
length] = 0;
1110 cm_msg(
MERROR,
"cm_msg_retrieve1",
"Cannot open log file \"%s\", errno %d (%s)", filename,
errno,
1127 char *buffer = (
char *)
malloc(size + 1);
1129 if (buffer ==
NULL) {
1130 cm_msg(
MERROR,
"cm_msg_retrieve1",
"Cannot malloc %d bytes to read log file \"%s\", errno %d (%s)", (
int) size,
1139 cm_msg(
MERROR,
"cm_msg_retrieve1",
"Cannot read %d bytes from log file \"%s\", read() returned %d, errno %d (%s)",
1148 p = buffer + size - 1;
1152 while (*p ==
'\n' || *p ==
'\r')
1156 for (
n = 0; !
stop && p > buffer;) {
1160 for (
i = 0; p != buffer && (*p !=
'\n' && *p !=
'\r');
i++)
1164 if (
i >= (
int)
sizeof(
str))
1165 i =
sizeof(
str) - 1;
1186 if (
str[0] >=
'0' &&
str[0] <=
'9') {
1200 for (
i = 0;
i < 12;
i++)
1222 if (t == 0 ||
tstamp == -1 ||
1231 while (*p ==
'\n' || *p ==
'\r')
1499 assert(path[0] != 0);
1546 assert(path !=
NULL);
1592#ifdef LOCAL_ROUTINES
1619 if (
getenv(
"MIDAS_DIR")) {
1624 if (
getenv(
"MIDAS_EXPT_NAME")) {
1628 cm_msg(
MERROR,
"cm_read_exptab",
"Experiments that use MIDAS_DIR must also set MIDAS_EXPT_NAME to the name of the experiment! Using experiment name \"%s\"",
e.name.c_str());
1631 e.directory =
getenv(
"MIDAS_DIR");
1640#if defined (OS_WINNT)
1642 if (
getenv(
"SystemRoot"))
1644 else if (
getenv(
"windir"))
1650 str +=
"\\system32\\exptab";
1651 alt_str +=
"\\system\\exptab";
1652#elif defined (OS_UNIX)
1653 std::string
str =
"/etc/exptab";
1654 std::string
alt_str =
"/exptab";
1656 std::strint
str =
"exptab";
1657 std::string
alt_str =
"exptab";
1661 if (
getenv(
"MIDAS_EXPTAB")) {
1680 memset(buf, 0,
sizeof(buf));
1681 char*
str =
fgets(buf,
sizeof(buf)-1, f);
1684 if (
str[0] == 0)
continue;
1685 if (
str[0] ==
'#')
continue;
1709 e.
name = std::string(
p1, len);
1733 e.directory = std::string(
p1, len);
1754 e.user = std::string(
p1, len);
1768 for (
unsigned j=0;
j<exptab->
exptab.size();
j++) {
1769 cm_msg(
MINFO,
"cm_read_exptab",
"entry %d, experiment \"%s\", directory \"%s\", user \"%s\"",
j, exptab->
exptab[
j].name.c_str(), exptab->
exptab[
j].directory.c_str(), exptab->
exptab[
j].user.c_str());
1873#ifdef LOCAL_ROUTINES
1894 char *client_name,
INT hw_type,
const char *password,
DWORD watchdog_timeout) {
1899#ifdef LOCAL_ROUTINES
1949 strcpy(
name, client_name);
1980 sprintf(
str,
"System/Clients/%0d/Name", pid);
1984 cm_msg(
MERROR,
"cm_set_client_info",
"cannot set client name, db_set_value(%s) status %d",
str,
status);
1989 strcpy(client_name,
name);
2025 size =
sizeof(watchdog_timeout);
2164#ifdef LOCAL_ROUTINES
2188 cm_msg(
MERROR,
"cm_set_experiment_local",
"Experiment \"%s\" directory \"%s\" does not exist",
exp_name1.c_str(),
expdir.c_str());
2203 cm_msg(
MERROR,
"cm_check_connect",
"cm_disconnect_experiment not called at end of program");
2298 const char *client_name,
void (*func)(
char *),
INT odb_size,
DWORD watchdog_timeout) {
2356#ifdef LOCAL_ROUTINES
2370 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create alarm semaphore");
2375 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create elog semaphore");
2380 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create history semaphore");
2385 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create message semaphore");
2404 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot open database, db_open_database() status %d",
status);
2415 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot get ODB /Experiment/ODB timeout, status %d",
status);
2426 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot get ODB /Experiment/Protect ODB, status %d",
status);
2437 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot get ODB /Experiment/Enable core dumps, status %d",
status);
2447 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot setrlimit(RLIMIT_CORE, RLIM_INFINITY), errno %d (%s)",
errno,
2451#warning setrlimit(RLIMIT_CORE) is not available
2460 "cannot get ODB /Experiment/Security/Enable non-localhost RPC, status %d",
status);
2472 if (watchdog_timeout == 0)
2512 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot open message buffer, cm_msg_open_buffer() status %d",
status);
2536 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot register RPC server, cm_register_server() status %d",
status);
2544 size =
sizeof(watchdog_timeout);
2545 sprintf(
str,
"/Programs/%s/Watchdog Timeout", client_name);
2551 std::string path =
"/Programs/" + std::string(client_name);
2554 prog[
"Start command"] == std::string(
""))
2578#ifdef LOCAL_ROUTINES
2635 cm_msg(
MERROR,
"cm_list_experiments_remote",
"Cannot connect to \"%s\" port %d: %s",
hname, port,
errmsg.c_str());
2640 send(sock,
"I", 2, 0);
2661#ifdef LOCAL_ROUTINES
2681 if (
expts.size() == 1) {
2683 }
else if (
expts.size() > 1) {
2684 printf(
"Available experiments on local computer:\n");
2686 for (
unsigned i = 0;
i <
expts.size();
i++) {
2691 printf(
"Select number from 0 to %d: ", ((
int)
expts.size())-1);
2730 if (
expts.size() > 1) {
2733 for (
unsigned i = 0;
i <
expts.size();
i++) {
2738 printf(
"Select number from 0 to %d: ", ((
int)
expts.size())-1);
2854 printf(
"Waiting for transition to finish...\n");
2877 cm_msg(
MLOG,
"cm_disconnect_experiment",
"Program %s on host %s stopped", client_name.c_str(),
local_host_name.c_str());
2955#ifndef DOXYGEN_SHOULD_SKIP_THIS
3030#ifndef DOXYGEN_SHOULD_SKIP_THIS
3071#ifdef LOCAL_ROUTINES
3110 printf(
"lock_buffer_guard(invalid) dtor\n");
3246#ifdef LOCAL_ROUTINES
3269 pclient->watchdog_timeout = timeout;
3348#ifdef LOCAL_ROUTINES
3357#ifndef DOXYGEN_SHOULD_SKIP_THIS
3405 "Cannot resize the RPC hosts access control list, db_set_num_values(%d) status %d",
new_size,
status);
3418 strcpy(buf,
"localhost");
3424 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot create the RPC hosts access control list, db_get_value() status %d",
3434 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot create \"Disable RPC hosts check\", db_get_value() status %d",
status);
3444 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot find the RPC hosts access control list, db_find_key() status %d",
3454 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot watch the RPC hosts access control list, db_watch() status %d",
status);
3489 size =
sizeof(
name);
3493 cm_msg(
MERROR,
"cm_register_server",
"cannot get client name, db_get_value() status %d",
status);
3500 size =
sizeof(port);
3504 cm_msg(
MERROR,
"cm_register_server",
"cannot get RPC port number, db_get_value(%s) status %d",
str,
status);
3512 cm_msg(
MERROR,
"cm_register_server",
"error, rpc_register_server(port=%d) status %d", port,
status);
3525 cm_msg(
MERROR,
"cm_register_server",
"error, db_find_key(\"Server Port\") status %d",
status);
3535 cm_msg(
MERROR,
"cm_register_server",
"error, db_set_data(\"Server Port\"=%d) status %d", port,
status);
3643 tt.sequence_number = sequence_number;
3756 }
else if (
count > 1) {
3803 "Cannot set client run state, client hKey %d into /System/Clients is not valid, maybe this client was removed by a watchdog timeout",
3824#ifndef DOXYGEN_SHOULD_SKIP_THIS
3879 cm_msg(
MERROR,
"cm_register_deferred_transition",
"Cannot hotlink /Runinfo/Requested Transition");
3914 cm_msg(
MERROR,
"cm_check_deferred_transition",
"Cannot perform deferred transition: %s",
str);
3930#ifndef DOXYGEN_SHOULD_SKIP_THIS
3985 return arg1->sequence_number <
arg2->sequence_number;
4015 const char *buf =
"Success";
4025 tr->end_time = end_time;
4027 tr->errorstr = errorstr;
4029 tr->errorstr =
"(null)";
4093 const char *
args[100];
4111 path +=
"mtransition";
4161 if (errstr !=
NULL) {
4162 sprintf(errstr,
"Cannot execute mtransition, ss_spawnv() returned %d",
status);
4177 int connect_timeout = 10000;
4178 int timeout = 120000;
4203 for (
size_t i = 0;
i <
tr_client->wait_for_index.size();
i++) {
4206 assert(wait_for_index >= 0);
4207 assert(wait_for_index < (
int)s->
clients.size());
4236 printf(
"Client \"%s\" waits for client \"%s\"\n",
tr_client->client_name.c_str(),
wait_for->client_name.c_str());
4243 cm_msg(
MERROR,
"cm_transition_call",
"Client \"%s\" transition %d aborted while waiting for client \"%s\": \"/Runinfo/Transition in progress\" was cleared",
tr_client->client_name.c_str(),
tr_client->transition,
wait_for->client_name.c_str());
4259 printf(
"Connecting to client \"%s\" on host %s...\n",
tr_client->client_name.c_str(),
tr_client->host_name.c_str());
4261 cm_msg(
MINFO,
"cm_transition_call",
"cm_transition_call: Connecting to client \"%s\" on host %s...",
tr_client->client_name.c_str(),
tr_client->host_name.c_str());
4264 size =
sizeof(timeout);
4267 if (connect_timeout < 1000)
4268 connect_timeout = 1000;
4271 size =
sizeof(timeout);
4281 tr_client->connect_timeout = connect_timeout;
4296 "cannot connect to client \"%s\" on host %s, port %d, status %d",
4320 printf(
"Connection established to client \"%s\" on host %s\n",
tr_client->client_name.c_str(),
tr_client->host_name.c_str());
4323 "cm_transition: Connection established to client \"%s\" on host %s",
4335 printf(
"Executing RPC transition client \"%s\" on host %s...\n",
4339 "cm_transition: Executing RPC transition client \"%s\" on host %s...",
4367 printf(
"RPC transition finished client \"%s\" on host \"%s\" in %d ms with status %d\n",
4371 "cm_transition: RPC transition finished client \"%s\" on host \"%s\" in %d ms with status %d",
4391 printf(
"hconn %d cm_transition_call(%s) finished init %d connect %d end %d rpc %d end %d xxx %d end %d\n",
4438 for (
size_t i = 0;
i <
n;
i++) {
4446 printf(
"Calling local transition callback\n");
4448 cm_msg(
MINFO,
"cm_transition_call_direct",
"cm_transition: Calling local transition callback");
4464 printf(
"Local transition callback finished, status %d\n",
int(
tr_client->status));
4466 cm_msg(
MINFO,
"cm_transition_call_direct",
"cm_transition: Local transition callback finished, status %d",
int(
tr_client->status));
4479 cm_msg(
MERROR,
"cm_transition_call_direct",
"no handler for transition %d with sequence number %d",
tr_client->transition,
tr_client->sequence_number);
4546 errstr_size =
sizeof(
xerrstr);
4562 mstrlcpy(errstr,
"Invalid transition request", errstr_size);
4573 sprintf(errstr,
"Start/Stop transition %d already in progress, please try again later\n",
i);
4574 mstrlcat(errstr,
"or set \"/Runinfo/Transition in progress\" manually to zero.\n", errstr_size);
4576 cm_msg(
MERROR,
"cm_transition",
"another transition is already in progress");
4635 cm_msg(
MERROR,
"cm_transition",
"Run start abort due to alarms: %s",
alarms.c_str());
4636 mstrlcpy(errstr,
"Cannot start run due to alarms: ", errstr_size);
4668 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info required, status %d",
status);
4677 cm_msg(
MERROR,
"cm_transition",
"Run start abort due to program \"%s\" not running",
key.
name);
4693 mstrlcpy(errstr,
"Unknown error", errstr_size);
4695 if (debug_flag == 0) {
4722 if (debug_flag == 1)
4724 if (debug_flag == 2)
4729 cm_msg(
MERROR,
"cm_transition",
"cannot set Runinfo/Run number in database, status %d",
status);
4735 if (debug_flag == 1)
4736 printf(
"Clearing /Runinfo/Requested transition\n");
4737 if (debug_flag == 2)
4738 cm_msg(
MINFO,
"cm_transition",
"cm_transition: Clearing /Runinfo/Requested transition");
4746 cm_msg(
MERROR,
"cm_transition",
"cannot find System/Clients entry in database");
4748 mstrlcpy(errstr,
"Cannot find /System/Clients in ODB", errstr_size);
4757 mstrlcpy(errstr,
"Deferred transition already in progress", errstr_size);
4758 mstrlcat(errstr,
", to cancel, set \"/Runinfo/Requested transition\" to zero", errstr_size);
4774 size =
sizeof(sequence_number);
4783 if (debug_flag == 1)
4784 printf(
"---- Transition %s deferred by client \"%s\" ----\n",
trname.c_str(),
str);
4785 if (debug_flag == 2)
4786 cm_msg(
MINFO,
"cm_transition",
"cm_transition: ---- Transition %s deferred by client \"%s\" ----",
trname.c_str(),
str);
4788 if (debug_flag == 1)
4789 printf(
"Setting /Runinfo/Requested transition\n");
4790 if (debug_flag == 2)
4791 cm_msg(
MINFO,
"cm_transition",
"cm_transition: Setting /Runinfo/Requested transition");
4802 sprintf(errstr,
"Transition %s deferred by client \"%s\"",
trname.c_str(),
str);
4836 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info auto start, status %d",
status);
4842 start_command[0] = 0;
4844 size =
sizeof(start_command);
4847 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info start command, status %d",
status);
4851 if (start_command[0]) {
4852 cm_msg(
MINFO,
"cm_transition",
"Auto Starting program \"%s\", command \"%s\"",
key.
name,
4887 size =
sizeof(
state);
4893 cm_msg(
MERROR,
"cm_transition",
"cannot get Runinfo/State in database");
4900 cm_msg(
MERROR,
"cm_transition",
"cannot set \"Runinfo/Stop Time binary\" in database");
4907 cm_msg(
MERROR,
"cm_transition",
"cannot set \"Runinfo/Stop Time\" in database");
4913 cm_msg(
MERROR,
"cm_transition",
"cannot find System/Clients entry in database");
4915 mstrlcpy(errstr,
"Cannot find /System/Clients in ODB", errstr_size);
4945 if (debug_flag == 1)
4946 printf(
"---- Transition %s started ----\n",
trname.c_str());
4947 if (debug_flag == 2)
4948 cm_msg(
MINFO,
"cm_transition",
"cm_transition: ---- Transition %s started ----",
trname.c_str());
4954 for (
int i = 0,
status = 0;;
i++) {
4971 size =
sizeof(sequence_number);
4980 c->async_flag = async_flag;
4981 c->debug_flag = debug_flag;
4982 c->sequence_number = sequence_number;
4988 size =
sizeof(client_name);
4990 c->client_name = client_name;
5003 size =
sizeof(port);
5013 if (
cc->client_name ==
c->client_name)
5014 if (
cc->host_name ==
c->host_name)
5015 if (
cc->port ==
c->port)
5016 if (
cc->sequence_number ==
c->sequence_number)
5021 s.
clients.push_back(std::unique_ptr<TrClient>(
c));
5024 cm_msg(
MERROR,
"cm_transition",
"transition %s: client \"%s\" is registered with sequence number %d more than once",
trname.c_str(),
c->client_name.c_str(),
c->sequence_number);
5042 for (
size_t i =
idx - 1; ;
i--) {
5044 if (s.
clients[
i]->sequence_number > 0) {
5070 if (debug_flag == 1)
5071 printf(
"\n==== Found client \"%s\" with sequence number %d\n",
5073 if (debug_flag == 2)
5075 "cm_transition: ==== Found client \"%s\" with sequence number %d",
5093 cm_msg(
MERROR,
"cm_transition",
"transition %s aborted: client \"%s\" returned status %d",
trname.c_str(),
5120 cm_msg(
MERROR,
"cm_transition",
"transition %s aborted: \"/Runinfo/Transition in progress\" was cleared",
trname.c_str());
5123 mstrlcpy(errstr,
"Canceled", errstr_size);
5148 if (debug_flag == 1)
5149 printf(
"\n---- Transition %s finished ----\n",
trname.c_str());
5150 if (debug_flag == 2)
5151 cm_msg(
MINFO,
"cm_transition",
"cm_transition: ---- Transition %s finished ----",
trname.c_str());
5166 size =
sizeof(
state);
5169 cm_msg(
MERROR,
"cm_transition",
"cannot set Runinfo/State in database, db_set_value() status %d",
status);
5220 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info auto stop, status %d",
status);
5238 mstrlcpy(errstr,
"Success", errstr_size);
5252 cm_msg(
MERROR,
"cm_transition",
"Could not start a run: cm_transition() status %d, message \'%s\'",
status,
5301 cm_msg(
MERROR,
"cm_transition",
"previous transition did not finish yet");
5316 mstrlcpy(errstr,
"Invalid transition request", errstr_size);
5324 int size =
sizeof(
i);
5328 sprintf(errstr,
"Start/Stop transition %d already in progress, please try again later\n",
i);
5329 mstrlcat(errstr,
"or set \"/Runinfo/Transition in progress\" manually to zero.\n", errstr_size);
5331 cm_msg(
MERROR,
"cm_transition",
"another transition is already in progress");
5390#ifndef DOXYGEN_SHOULD_SKIP_THIS
5451 printf(
"Received 2nd Ctrl-C, hard abort\n");
5454 printf(
"Received Ctrl-C, aborting...\n");
5514 std::string command;
5519 cm_msg(
MERROR,
"cm_exec_script",
"Script ODB \"%s\" of type TID_STRING, db_get_value_string() error %d",
5524 for (
int i = 0;;
i++) {
5540 int size =
subkey.item_size;
5541 char *buf = (
char *)
malloc(size);
5542 assert(buf !=
NULL);
5545 cm_msg(
MERROR,
"cm_exec_script",
"Script ODB \"%s/%s\" of type %d, db_get_data() error %d",
5559 cm_msg(
MERROR,
"cm_exec_script",
"Script ODB \"%s\" has invalid type %d, should be TID_STRING or TID_KEY",
5566 if (command.length() > 0) {
5760 mstrlcpy(buf, command,
sizeof(buf));
5761 cm_msg(
MERROR,
"cm_execute",
"cm_execute(%s...) is disabled by ODB \"/Experiment/Enable cm_execute\"", buf);
5766 strcpy(
str, command);
5776 result[
MAX(0,
n)] = 0;
5795#ifndef DOXYGEN_SHOULD_SKIP_THIS
5861 p +=
"/Logger/History/";
5863 p +=
"/History dir";
5924#ifdef LOCAL_ROUTINES
5941 }
else if (
idx >
pbuf->buffer_header->max_client_index) {
5960 printf(
"bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, badindex %d, pid=%d\n",
5961 pbuf,
pbuf->buffer_header->name,
pbuf->client_index,
pbuf->buffer_header->max_client_index,
5964 printf(
"bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, badclient %d\n",
5965 pbuf,
pbuf->buffer_header->name,
pbuf->client_index,
pbuf->buffer_header->max_client_index,
5966 pbuf->buffer_header->client[
idx].name,
pbuf->buffer_header->client[
idx].pid,
5969 printf(
"bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, goodclient\n",
5970 pbuf,
pbuf->buffer_header->name,
pbuf->client_index,
pbuf->buffer_header->max_client_index,
5971 pbuf->buffer_header->client[
idx].name,
pbuf->buffer_header->client[
idx].pid,
5983 cm_msg(
MERROR,
"bm_validate_client_index",
"My client index %d in buffer \'%s\' is invalid, max_client_index %d, my pid %d",
idx,
pbuf->buffer_header->name,
pbuf->buffer_header->max_client_index,
ss_getpid());
5985 cm_msg(
MERROR,
"bm_validate_client_index",
"My client index %d in buffer \'%s\' is invalid: client name \'%s\', pid %d should be my pid %d",
idx,
pbuf->buffer_header->name,
pbuf->buffer_header->client[
idx].name,
pbuf->buffer_header->client[
idx].pid,
ss_getpid());
5988 cm_msg(
MERROR,
"bm_validate_client_index",
"Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...");
5992 fprintf(
stderr,
"bm_validate_client_index: My client index %d in buffer \'%s\' is invalid, max_client_index %d, my pid %d\n",
idx,
pbuf->buffer_header->name,
pbuf->buffer_header->max_client_index,
ss_getpid());
5994 fprintf(
stderr,
"bm_validate_client_index: My client index %d in buffer \'%s\' is invalid: client name \'%s\', pid %d should be my pid %d\n",
idx,
pbuf->buffer_header->name,
pbuf->buffer_header->client[
idx].name,
pbuf->buffer_header->client[
idx].pid,
ss_getpid());
5997 fprintf(
stderr,
"bm_validate_client_index: Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...\n");
6037#ifdef LOCAL_ROUTINES
6079 pheader =
pbuf->buffer_header;
6087 "Client \'%s\' on buffer \'%s\' removed by %s because process pid %d does not exist",
pbclient->name,
6099 printf(
"buffer [%s] client [%-32s] times 0x%08x 0x%08x, diff 0x%08x %5d, timeout %d\n",
6111 cm_msg(
MINFO,
"bm_cleanup",
"Client \'%s\' on buffer \'%s\' removed by %s (idle %1.1lfs, timeout %1.0lfs)",
6114 pbclient->watchdog_timeout / 1000.0);
6137 if (
pbuf->attached) {
6162#ifdef LOCAL_ROUTINES
6176 if (
pbuf->attached) {
6195#ifdef LOCAL_ROUTINES
6200 "error: buffer \"%s\" is corrupted: rp %d is invalid. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6213 "error: buffer \"%s\" is corrupted: rp %d plus event header point beyond the end of buffer by %d bytes. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6237 if (
gRpLog && (total_size < 16)) {
6238 const char *
pdata = (
const char *) (pheader + 1);
6240 fprintf(
gRpLog,
"%s: rp %d, total_size %d, at rp 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x\n", pheader->
name, rp, total_size,
6241 pevent[0], pevent[1], pevent[2], pevent[3], pevent[4], pevent[5]);
6247 assert(total_size > 0);
6251 if (rp >= pheader->
size) {
6252 rp -= pheader->
size;
6271 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6296 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, remaining %d, called from %s",
6317 const char *
pdata = (
const char *) (pheader + 1);
6327 "buffer \"%s\" is corrupted: invalid read pointer %d. Size %d, write pointer %d", pheader->
name,
6334 "buffer \"%s\" is corrupted: invalid write pointer %d. Size %d, read pointer %d", pheader->
name,
6340 cm_msg(
MERROR,
"bm_validate_buffer",
"buffer \"%s\" is corrupted: read pointer %d is invalid", pheader->
name,
6349 cm_msg(
MERROR,
"bm_validate_buffer",
"buffer \"%s\" is corrupted: invalid rp %d, last good event at rp %d",
6357 "buffer \"%s\" is corrupted: invalid event at rp %d, last good event at rp %d", pheader->
name, rp,
rp0);
6380 int rp =
c->read_pointer;
6387 "buffer \"%s\" is corrupted for client \"%s\" rp %d: invalid event at rp %d, last good event at rp %d",
6388 pheader->
name,
c->name,
c->read_pointer, rp,
rp0);
6421 sprintf(
str,
"/System/buffers/%s/Clients/%s/writes_blocked_by",
pbuf->buffer_name,
pbuf->client_name);
6496 double buf_size = pheader->
size;
6572 if (!
pbuf->client_count_write_wait[
i])
6604 if (
pbuf->count_lock ==
pbuf->last_count_lock) {
6610 std::string client_name =
pbuf->client_name;
6615 cm_msg(
MERROR,
"bm_write_buffer_statistics_to_odb",
"Invalid empty buffer name \"%s\" or client name \"%s\"",
buffer_name.c_str(), client_name.c_str());
6619 pbuf->last_count_lock =
pbuf->count_lock;
6623 int client_index =
pbuf->client_index;
6648 cm_msg(
MERROR,
who,
"invalid buffer handle %d: out of range [1..%d]", buffer_handle, (
int)
nbuf);
6656 cm_msg(
MERROR,
who,
"invalid buffer handle %d: empty slot", buffer_handle);
6662 if (!
pbuf->attached) {
6664 cm_msg(
MERROR,
who,
"invalid buffer handle %d: not attached", buffer_handle);
6740 int size =
sizeof(
INT);
6744 cm_msg(
MERROR,
"bm_open_buffer",
"Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6751#ifdef LOCAL_ROUTINES
6761 cm_msg(
MERROR,
"bm_open_buffer",
"cannot open buffer with zero name");
6778 std::string odb_path;
6779 odb_path +=
"/Experiment/Buffer sizes/";
6782 int size =
sizeof(
INT);
6787 "Cannot open buffer \"%s\", invalid buffer size %d in ODB \"%s\", maximum buffer size is %d",
6798 cm_msg(
MERROR,
"bm_open_buffer",
"Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6808 *buffer_handle =
i + 1;
6828 *buffer_handle =
i + 1;
6843 pbuf->client_time_write_wait[
i] = 0;
6859 mstrlcpy(
pbuf->client_name, client_name.c_str(),
sizeof(
pbuf->client_name));
6866 pbuf->attached =
true;
6901 pheader->
size = buffer_size;
6911 "Buffer \"%s\" is corrupted, mismatch of buffer name in shared memory \"%s\"",
buffer_name,
6922 cm_msg(
MERROR,
"bm_open_buffer",
"Buffer \"%s\" is corrupted, num_clients %d exceeds MAX_CLIENTS %d",
6933 cm_msg(
MERROR,
"bm_open_buffer",
"Buffer \"%s\" is corrupted, max_client_index %d exceeds MAX_CLIENTS %d",
6941 if (pheader->
size != buffer_size) {
6942 cm_msg(
MINFO,
"bm_open_buffer",
"Buffer \"%s\" requested size %d differs from existing size %d",
6945 buffer_size = pheader->
size;
6960 pheader =
pbuf->buffer_header;
6966 pbuf->attached =
true;
6968 pbuf->shm_handle = shm_handle;
6969 pbuf->shm_size = shm_size;
6977 "buffer \'%s\' is corrupted, bm_validate_buffer() status %d, calling bm_reset_buffer()...",
buffer_name,
7047 *buffer_handle =
i+1;
7052 *buffer_handle =
gBuffers.size() + 1;
7089 *buffer_handle =
i + 1;
7110#ifdef LOCAL_ROUTINES
7153 pbuf->read_cache_mutex.unlock();
7160 pbuf->write_cache_mutex.unlock();
7161 pbuf->read_cache_mutex.unlock();
7168 pbuf->attached =
false;
7195 if (
pbuf->read_cache_size > 0) {
7196 free(
pbuf->read_cache);
7198 pbuf->read_cache_size = 0;
7199 pbuf->read_cache_rp = 0;
7200 pbuf->read_cache_wp = 0;
7203 if (
pbuf->write_cache_size > 0) {
7204 free(
pbuf->write_cache);
7206 pbuf->write_cache_size = 0;
7207 pbuf->write_cache_rp = 0;
7208 pbuf->write_cache_wp = 0;
7228 pbuf->shm_handle = 0;
7234 pbuf->write_cache_mutex.unlock();
7235 pbuf->read_cache_mutex.unlock();
7255#ifdef LOCAL_ROUTINES
7263 for (
size_t i =
nbuf;
i > 0;
i--) {
7289#ifdef LOCAL_ROUTINES
7327#ifdef LOCAL_ROUTINES
7346 for (
i = 0;
i < 20;
i++) {
7368#ifdef LOCAL_ROUTINES
7383#ifdef LOCAL_ROUTINES
7440 size =
sizeof(client_name);
7452 size =
sizeof(port);
7467 cm_msg(
MERROR,
"cm_shutdown",
"Cannot connect to client \'%s\' on host \'%s\', port %d",
7470 cm_msg(
MERROR,
"cm_shutdown",
"Killing and Deleting client \'%s\' pid %d", client_name,
7476 cm_msg(
MERROR,
"cm_shutdown",
"Cannot delete client info for client \'%s\', pid %d, status %d",
7493 cm_msg(
MERROR,
"cm_shutdown",
"Client \'%s\' not responding to shutdown command", client_name);
7495 cm_msg(
MERROR,
"cm_shutdown",
"Killing and Deleting client \'%s\' pid %d", client_name,
7501 "Cannot delete client info for client \'%s\', pid %d, status %d",
name,
client_pid,
7555 size =
sizeof(client_name);
7622#ifdef LOCAL_ROUTINES
7637 if (
pbuf->attached) {
7654 (client_name ==
NULL || client_name[0] == 0
7669 "Client \'%s\' on \'%s\' removed by cm_cleanup (idle %1.1lfs, timeout %1.0lfs)",
7719 const char *s =
str;
7752 printf(
"test_expand_env: [%s] -> [%s] expected [%s]",
7766 printf(
"Test expand_end()\n");
7785 printf(
"test_expand_env: all tests passed!\n");
7787 printf(
"test_expand_env: test FAILED!\n");
7796#ifndef DOXYGEN_SHOULD_SKIP_THIS
7825#ifdef LOCAL_ROUTINES
7869#ifdef LOCAL_ROUTINES
7889 *n_bytes += pheader->
size;
7893 if (
pbuf->read_cache_size) {
7897 if (
pbuf->read_cache_wp >
pbuf->read_cache_rp)
7898 *n_bytes +=
pbuf->read_cache_wp -
pbuf->read_cache_rp;
7899 pbuf->read_cache_mutex.unlock();
7909#ifdef LOCAL_ROUTINES
7917 fprintf(
stderr,
"bm_lock_buffer_read_cache: Error: Cannot lock read cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...\n",
pbuf->buffer_name);
7918 cm_msg(
MERROR,
"bm_lock_buffer_read_cache",
"Cannot lock read cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...",
pbuf->buffer_name);
7923 if (!
pbuf->attached) {
7924 pbuf->read_cache_mutex.unlock();
7925 fprintf(
stderr,
"bm_lock_buffer_read_cache: Error: Cannot lock read cache of buffer \"%s\", buffer was closed while we waited for the buffer_mutex\n",
pbuf->buffer_name);
7938 fprintf(
stderr,
"bm_lock_buffer_write_cache: Error: Cannot lock write cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...\n",
pbuf->buffer_name);
7939 cm_msg(
MERROR,
"bm_lock_buffer_write_cache",
"Cannot lock write cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...",
pbuf->buffer_name);
7944 if (!
pbuf->attached) {
7945 pbuf->write_cache_mutex.unlock();
7946 fprintf(
stderr,
"bm_lock_buffer_write_cache: Error: Cannot lock write cache of buffer \"%s\", buffer was closed while we waited for the buffer_mutex\n",
pbuf->buffer_name);
7961 fprintf(
stderr,
"bm_lock_buffer_mutex: Error: Cannot lock buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...\n",
pbuf->buffer_name);
7962 cm_msg(
MERROR,
"bm_lock_buffer_mutex",
"Cannot lock buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...",
pbuf->buffer_name);
7967 if (!
pbuf->attached) {
7968 pbuf->buffer_mutex.unlock();
7969 fprintf(
stderr,
"bm_lock_buffer_mutex: Error: Cannot lock buffer \"%s\", buffer was closed while we waited for the buffer_mutex\n",
pbuf->buffer_name);
8002 fprintf(
stderr,
"bm_lock_buffer: Lock buffer \"%s\" is taking longer than 1 second!\n",
pbuf->buffer_name);
8007 fprintf(
stderr,
"bm_lock_buffer: Lock buffer \"%s\" is taking longer than 10 seconds, buffer semaphore is probably stuck, delete %s.SHM and try again!\n",
pbuf->buffer_name,
pbuf->buffer_name);
8009 if (
pbuf->buffer_header) {
8011 fprintf(
stderr,
"bm_lock_buffer: Buffer \"%s\" client %d \"%s\" pid %d\n",
pbuf->buffer_name,
i,
pbuf->buffer_header->client[
i].name,
pbuf->buffer_header->client[
i].pid);
8018 fprintf(
stderr,
"bm_lock_buffer: Error: Cannot lock buffer \"%s\", ss_semaphore_wait_for() status %d, aborting...\n",
pbuf->buffer_name,
status);
8019 cm_msg(
MERROR,
"bm_lock_buffer",
"Cannot lock buffer \"%s\", ss_semaphore_wait_for() status %d, aborting...",
pbuf->buffer_name,
status);
8027 assert(!
pbuf->locked);
8032 if (
pbuf->buffer_header->client[x].unused1 != 0) {
8033 printf(
"lllock [%s] unused1 %d pid %d\n",
pbuf->buffer_name,
pbuf->buffer_header->client[x].unused1,
getpid());
8036 pbuf->buffer_header->client[x].unused1 =
getpid();
8050 if (
pbuf->attached) {
8051 if (
pbuf->buffer_header->client[x].unused1 !=
getpid()) {
8052 printf(
"unlock [%s] unused1 %d pid %d\n",
pbuf->buffer_header->name,
pbuf->buffer_header->client[x].unused1,
getpid());
8054 pbuf->buffer_header->client[x].unused1 = 0;
8056 printf(
"unlock [??????] unused1 ????? pid %d\n",
getpid());
8061 assert(
pbuf->locked);
8065 pbuf->buffer_mutex.unlock();
8094#ifdef LOCAL_ROUTINES
8108 pbuf->buffer_header->num_in_events = 0;
8109 pbuf->buffer_header->num_out_events = 0;
8154#ifdef LOCAL_ROUTINES
8184 cm_msg(
MERROR,
"bm_set_cache_size",
"requested write cache size %zu on buffer \"%s\" is too big: buffer size is %d, write cache size will be %zu bytes",
write_size,
pbuf->buffer_name,
pbuf->buffer_header->size,
new_write_size);
8188 pbuf->buffer_mutex.unlock();
8198 if (
pbuf->read_cache_size > 0) {
8199 free(
pbuf->read_cache);
8206 pbuf->read_cache_size = 0;
8207 pbuf->read_cache_rp = 0;
8208 pbuf->read_cache_wp = 0;
8209 pbuf->read_cache_mutex.unlock();
8210 cm_msg(
MERROR,
"bm_set_cache_size",
"not enough memory to allocate read cache for buffer \"%s\", malloc(%zu) failed",
pbuf->buffer_name,
read_size);
8216 pbuf->read_cache_rp = 0;
8217 pbuf->read_cache_wp = 0;
8219 pbuf->read_cache_mutex.unlock();
8229 if (
pbuf->write_cache_size &&
pbuf->write_cache_wp > 0) {
8230 cm_msg(
MERROR,
"bm_set_cache_size",
"buffer \"%s\" lost %zu bytes from the write cache",
pbuf->buffer_name,
pbuf->write_cache_wp);
8234 if (
pbuf->write_cache_size > 0) {
8235 free(
pbuf->write_cache);
8242 pbuf->write_cache_size = 0;
8243 pbuf->write_cache_rp = 0;
8244 pbuf->write_cache_wp = 0;
8245 pbuf->write_cache_mutex.unlock();
8246 cm_msg(
MERROR,
"bm_set_cache_size",
"not enough memory to allocate write cache for buffer \"%s\", malloc(%zu) failed",
pbuf->buffer_name,
write_size);
8252 pbuf->write_cache_rp = 0;
8253 pbuf->write_cache_wp = 0;
8255 pbuf->write_cache_mutex.unlock();
8302 static std::mutex mutex;
8309 std::lock_guard<std::mutex>
lock(mutex);
8319#ifndef DOXYGEN_SHOULD_SKIP_THIS
8371#ifdef LOCAL_ROUTINES
8387 if (func ==
NULL &&
pbuf->callback) {
8389 cm_msg(
MERROR,
"bm_add_event_request",
"mixing callback/non callback requests not possible");
8396 cm_msg(
MERROR,
"bm_add_event_request",
"GET_RECENT request not possible if read cache is enabled");
8406 if (!
pclient->event_request[
i].valid)
8415 pclient->event_request[
i].id = request_id;
8419 pclient->event_request[
i].sampling_type = sampling_type;
8434 if (
i + 1 >
pclient->max_request_index)
8435 pclient->max_request_index =
i + 1;
8475 INT sampling_type,
HNDLE *request_id,
8478 assert(request_id !=
NULL);
8530#ifdef LOCAL_ROUTINES
8552 if (
pclient->event_request[
i].valid &&
pclient->event_request[
i].id == request_id) {
8559 if (
pclient->event_request[
i].valid)
8562 pclient->max_request_index =
i + 1;
8567 for (
i = 0;
i <
pclient->max_request_index;
i++)
8596 if (request_id < 0 ||
size_t(request_id) >=
_request_list.size()) {
8601 int buffer_handle =
_request_list[request_id].buffer_handle;
8638 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8647 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, read pointer %d, size %d",
8656 if (
pclient->read_pointer < 0) {
8658 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8667 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8676 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8741 printf(
"bm_update_read_pointer: [%s] rp %d, wp %d, size %d, min_rp %d, client [%s] rp %d\n",
8782 printf(
"bm_update_read_pointer: [%s] rp %d, wp %d, size %d, new_rp %d, moved\n",
8799 for (
i = 0;
i <
pc->max_request_index;
i++)
8800 if (
pc->event_request[
i].valid)
8820 if (
pc->pid &&
pc->write_wait) {
8837 for (
size_t i = 0;
i <
n;
i++) {
8854 r.
dispatcher(buffer_handle,
i, pevent, (
void *) (pevent + 1));
8861#ifdef LOCAL_ROUTINES
8865 pbuf->read_cache_rp += total_size;
8867 if (
pbuf->read_cache_rp ==
pbuf->read_cache_wp) {
8868 pbuf->read_cache_rp = 0;
8869 pbuf->read_cache_wp = 0;
8875 if (
pbuf->read_cache_rp ==
pbuf->read_cache_wp)
8903 if (!
pc->read_wait) {
8910 if (
pc->read_wait) {
8915 if ((
pc->read_pointer < 0) || (
pc->read_pointer >= pheader->
size)) {
8916 cm_msg(
MERROR,
"bm_peek_buffer_locked",
"event buffer \"%s\" is corrupted: client \"%s\" read pointer %d is invalid. buffer read pointer %d, write pointer %d, size %d", pheader->
name,
pc->name,
pc->read_pointer, pheader->
read_pointer, pheader->
write_pointer, pheader->
size);
8920 char *
pdata = (
char *) (pheader + 1);
8926 if ((total_size <= 0) || (total_size > pheader->
size)) {
8927 cm_msg(
MERROR,
"bm_peek_buffer_locked",
"event buffer \"%s\" is corrupted: client \"%s\" read pointer %d points to invalid event: data_size %d, event_size %d, total_size %d. buffer size: %d, read_pointer: %d, write_pointer: %d", pheader->
name,
pc->name,
pc->read_pointer, pevent->
data_size,
event_size, total_size, pheader->
size, pheader->
read_pointer, pheader->
write_pointer);
8931 assert(total_size > 0);
8946 const char *
pdata = (
const char *) (pheader + 1);
8953 int size = pheader->
size - rp;
8961 const char *
pdata = (
const char *) (pheader + 1);
8968 int size = pheader->
size - rp;
8978 for (
i = 0;
i <
pc->max_request_index;
i++) {
9024 if (
pbuf->read_cache_rp ==
pbuf->read_cache_wp) {
9050 if (
pbuf->read_cache_wp + total_size >
pbuf->read_cache_size) {
9059 pbuf->read_cache_wp += total_size;
9079 if (convert_flags) {
9101 char *
pdata = (
char *) (pheader + 1);
9128 free += pheader->
size;
9138 if (
pbuf->wait_start_time != 0) {
9142 pbuf->wait_start_time = 0;
9160 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer is invalid",
9175 printf(
"bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, blocking event size %d/%d\n", pheader->
read_pointer, pheader->
write_pointer, free, pheader->
size,
requested_space,
event_size, total_size);
9180 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer points to an invalid event: data_size %d, event size %d, total_size %d",
9208 for (
j = 0;
j <
pc->max_request_index;
j++) {
9251 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer did not move as expected",
9271 if (
pbuf->wait_start_time == 0) {
9273 pbuf->count_write_wait++;
9313 pbuf->write_cache_mutex.unlock();
9372 pbuf->write_cache_mutex.unlock();
9414 if (!
pc->read_wait) {
9438 if (!
pc->read_wait) {
9452 pbuf->read_cache_mutex.unlock();
9485 pbuf->read_cache_mutex.unlock();
9504 if (
pc->read_wait) {
9514 char *
pdata = (
char *) (pheader + 1);
9600 for (
j = 0;
j <
pc->max_request_index;
j++) {
9612 if (request_id >= 0) {
9614 if (
pc->read_wait) {
9691 if (data_size == 0) {
9692 cm_msg(
MERROR,
"bm_send_event",
"invalid event data size zero");
9715 const char*
cptr =
event.data();
9716 size_t clen =
event.size();
9722 int sg_n =
event.size();
9732#ifdef LOCAL_ROUTINES
9797 cm_msg(
MERROR,
"bm_send_event",
"invalid sg_ptr[0] is NULL");
9811 if (data_size == 0) {
9812 cm_msg(
MERROR,
"bm_send_event",
"invalid event data size zero");
9829 cm_msg(
MERROR,
"bm_send_event",
"data size mismatch: event data_size %d, event_size %d not same as sum of sg_len %d", (
int)data_size, (
int)
event_size, (
int)
count);
9835#ifdef LOCAL_ROUTINES
9849 if (
pbuf->write_cache_size) {
9856 if (
pbuf->write_cache_size) {
9863 if (
pbuf->write_cache_wp > 0 && (
pbuf->write_cache_wp + total_size >
pbuf->write_cache_size ||
too_big)) {
9869 pbuf->write_cache_mutex.unlock();
9881 pbuf->write_cache_mutex.unlock();
9884 cm_msg(
MERROR,
"bm_send_event",
"write cache size is bigger than buffer size");
9889 assert(
pbuf->write_cache_wp == 0);
9903 pbuf->write_cache_wp += total_size;
9905 pbuf->write_cache_mutex.unlock();
9911 pbuf->write_cache_mutex.unlock();
9929 printf(
"bm_send_event: corrupted 111!\n");
9935 if (total_size >= (
size_t)pheader->
size) {
9937 cm_msg(
MERROR,
"bm_send_event",
"total event size (%d) larger than size (%d) of buffer \'%s\'", (
int)total_size, pheader->
size, pheader->
name);
9951 printf(
"bm_send_event: corrupted 222!\n");
9980 printf(
"bm_send_event: corrupted 333!\n");
9987 pbuf->count_sent += 1;
9988 pbuf->bytes_sent += total_size;
10070#ifdef LOCAL_ROUTINES
10089 request_id[
i] = -1;
10114 printf(
"bm_flush_cache: corrupted 111!\n");
10137 if (
pbuf->write_cache_wp == 0) {
10143 while (
pbuf->write_cache_rp <
pbuf->write_cache_wp) {
10151 printf(
"bm_flush_cache: cache size %d, wp %d, rp %d, event data_size %d, event_size %d, total_size %d, free %d, written %d\n",
10152 int(
pbuf->write_cache_size),
10153 int(
pbuf->write_cache_wp),
10154 int(
pbuf->write_cache_rp),
10164 assert(total_size <= (
size_t)pheader->
size);
10170 pbuf->count_sent += 1;
10171 pbuf->bytes_sent += total_size;
10190 pbuf->write_cache_rp += total_size;
10193 assert(
pbuf->write_cache_rp > 0);
10194 assert(
pbuf->write_cache_rp <=
pbuf->write_cache_size);
10195 assert(
pbuf->write_cache_rp <=
pbuf->write_cache_wp);
10199 assert(
pbuf->write_cache_wp ==
pbuf->write_cache_rp);
10200 pbuf->write_cache_wp = 0;
10201 pbuf->write_cache_rp = 0;
10221#ifdef LOCAL_ROUTINES
10232 if (
pbuf->write_cache_size == 0)
10241 if (
pbuf->write_cache_wp == 0) {
10242 pbuf->write_cache_mutex.unlock();
10261 pbuf->write_cache_mutex.unlock();
10270#ifdef LOCAL_ROUTINES
10288 if (
pbuf->read_cache_size > 0) {
10295 if (
pbuf->read_cache_wp == 0) {
10300 pbuf->read_cache_mutex.unlock();
10311 pbuf->read_cache_mutex.unlock();
10340 if (convert_flags) {
10349 char*
cptr = (
char*)pevent;
10353 pbuf->read_cache_mutex.unlock();
10363 pbuf->read_cache_mutex.unlock();
10415 "buffer size %d is smaller than event size %d, event truncated. buffer \"%s\"",
max_size,
10427 if (convert_flags) {
10431 pbuf->count_read++;
10433 }
else if (dispatch ||
bufptr) {
10437 pbuf->count_read++;
10441 pbuf->count_read++;
10511 assert(!
"incorrect call to bm_receivent_event_rpc()");
10577 assert(!
"incorrect call to bm_receivent_event_rpc()");
10593 assert(!
"incorrect call to bm_receivent_event_rpc()");
10663#ifdef LOCAL_ROUTINES
10743#ifdef LOCAL_ROUTINES
10821#ifdef LOCAL_ROUTINES
10839#ifdef LOCAL_ROUTINES
10844 if (
pbuf->read_cache_size > 0) {
10851 pbuf->read_cache_rp = 0;
10852 pbuf->read_cache_wp = 0;
10854 pbuf->read_cache_mutex.unlock();
10886#ifdef LOCAL_ROUTINES
10902#ifdef LOCAL_ROUTINES
10914 if (!
pbuf->callback)
10963#ifdef LOCAL_ROUTINES
10994 if (
pbuf->attached) {
11103 if (!
fbuf->callback)
11120 if (convert_flags) {
11155 std::vector<char>
vec;
11159 bool locked =
true;
11161 for (
size_t i = 0;
i <
n;
i++) {
11194 cm_msg(
MERROR,
"bm_poll_event",
"received event was truncated, buffer size %d is too small, see messages and increase /Experiment/MAX_EVENT_SIZE in ODB", (
int)
vec.size());
11252#ifdef LOCAL_ROUTINES
11264 if (!
pbuf->attached)
11278#ifndef DOXYGEN_SHOULD_SKIP_THIS
11280#define MAX_DEFRAG_EVENTS 10
11336 "Received new event with ID %d while old fragments were not completed",
11337 (pevent->event_id & 0x0FFF));
11347 "Not enough defragment buffers, please increase MAX_DEFRAG_EVENTS and recompile");
11354 "Received first event fragment with %d bytes instead of %d bytes, event ignored",
11367 cm_msg(
MERROR,
"bm_defragement_event",
"Not enough memory to allocate event defragment buffer");
11389 "Received fragment without first fragment (ID %d) Ser#:%d",
11400 "Received fragments with more data (%d) than event size (%d)",
11453 printf(
"index %d, client \"%s\", host \"%s\", port %d, socket %d, connected %d, timeout %d",
11592 *convert_flags = 0;
11620 unsigned short int lo,
hi;
11623 lo = *((
short int *) (
var) + 1);
11624 hi = *((
short int *) (
var));
11630 *((
short int *) (
var) + 1) =
hi;
11631 *((
short int *) (
var)) =
lo;
11635 unsigned short int lo,
hi;
11638 lo = *((
short int *) (
var) + 1);
11639 hi = *((
short int *) (
var));
11645 *((
short int *) (
var) + 1) =
hi;
11646 *((
short int *) (
var)) =
lo;
11651 unsigned short int i1,
i2,
i3,
i4;
11654 i1 = *((
short int *) (
var) + 3);
11655 i2 = *((
short int *) (
var) + 2);
11656 i3 = *((
short int *) (
var) + 1);
11657 i4 = *((
short int *) (
var));
11663 *((
short int *) (
var) + 3) =
i4;
11664 *((
short int *) (
var) + 2) =
i3;
11665 *((
short int *) (
var) + 1) =
i2;
11666 *((
short int *) (
var)) =
i1;
11670 unsigned short int i1,
i2,
i3,
i4;
11673 i1 = *((
short int *) (
var) + 3);
11674 i2 = *((
short int *) (
var) + 2);
11675 i3 = *((
short int *) (
var) + 1);
11676 i4 = *((
short int *) (
var));
11682 *((
short int *) (
var) + 3) =
i4;
11683 *((
short int *) (
var) + 2) =
i3;
11684 *((
short int *) (
var) + 1) =
i2;
11685 *((
short int *) (
var)) =
i1;
11752 for (
int i = 0;
i <
n;
i++) {
11776 return "<unknown>";
11783 return "<unknown>";
11842 cm_msg(
MERROR,
"rpc_register_functions",
"registered RPC function with invalid ID %d",
new_list[
i].
id);
11862 if (
e.dispatch ==
NULL) {
11875#ifndef DOXYGEN_SHOULD_SKIP_THIS
11963 char net_buffer[256];
11965 int n =
recv_tcp(sock, net_buffer,
sizeof(net_buffer), 0);
11989 timeout.tv_sec = 0;
11990 timeout.tv_usec = 0;
11995 n =
recv_tcp(sock, net_buffer,
sizeof(net_buffer), 0);
12045 bool debug =
false;
12049 cm_msg(
MERROR,
"rpc_client_connect",
"cm_connect_experiment/rpc_set_name not called");
12055 cm_msg(
MERROR,
"rpc_client_connect",
"invalid port %d", port);
12067 printf(
"rpc_client_connect: host \"%s\", port %d, client \"%s\"\n",
host_name, port, client_name);
12070 printf(
"client connection %d: ", (
int)
i);
12088 if (
c &&
c->connected) {
12095 if ((
c->host_name ==
host_name) && (
c->port == port)) {
12101 std::lock_guard<std::mutex>
cguard(
c->mutex);
12103 if (
c->connected) {
12110 printf(
"already connected: ");
12137 for (
int j = 1;
j < size;
j++) {
12161 printf(
"new connection appended to array: ");
12168 c->connected =
true;
12194 c->client_name = client_name;
12209 int size =
cstr.length() + 1;
12210 i =
send(
c->send_sock,
cstr.c_str(), size, 0);
12211 if (
i < 0 ||
i != size) {
12219 DWORD watchdog_timeout;
12239 cm_msg(
MERROR,
"rpc_client_connect",
"timeout waiting for server reply");
12245 int remote_hw_type = 0;
12250 c->remote_hw_type = remote_hw_type;
12268 c->connected =
true;
12298 if (
c &&
c->connected) {
12299 std::lock_guard<std::mutex>
cguard(
c->mutex);
12301 if (!
c->connected) {
12317 timeout.tv_sec = 0;
12318 timeout.tv_usec = 0;
12349 "RPC client connection to \"%s\" on host \"%s\" is broken, recv() errno %d (%s)",
12350 c->client_name.c_str(),
12351 c->host_name.c_str(),
12355 }
else if (
status == 0) {
12360 cm_msg(
MINFO,
"rpc_client_check",
"RPC client connection to \"%s\" on host \"%s\" unexpectedly closed",
c->client_name.c_str(),
c->host_name.c_str());
12420 char str[200], version[32],
v1[32];
12445 cm_msg(
MERROR,
"rpc_server_connect",
"cm_connect_experiment/rpc_set_name not called");
12471 cm_msg(
MERROR,
"rpc_server_connect",
"cannot create listener socket: %s",
errmsg.c_str());
12478 cm_msg(
MERROR,
"rpc_server_connect",
"cannot create listener socket: %s",
errmsg.c_str());
12485 cm_msg(
MERROR,
"rpc_server_connect",
"cannot create listener socket: %s",
errmsg.c_str());
12502 cm_msg(
MERROR,
"rpc_server_connect",
"cannot connect to mserver on host \"%s\" port %d: %s",
str, port,
errmsg.c_str());
12516 cm_msg(
MERROR,
"rpc_server_connect",
"timeout on receive status from server");
12520 status = version[0] = 0;
12529 strcpy(
v1, version);
12540 cm_msg(
MERROR,
"rpc_server_connect",
"remote MIDAS version \'%s\' differs from local version \'%s\'", version,
12551 timeout.tv_usec = 0;
12563 cm_msg(
MERROR,
"rpc_server_connect",
"mserver subprocess could not be started (check path)");
12575 cm_msg(
MERROR,
"rpc_server_connect",
"accept() failed");
12589 flag = 2 * 1024 * 1024;
12604 cm_msg(
MERROR,
"rpc_server_connect",
"timeout on receive remote computer info");
12625 if (
c &&
c->connected) {
12628 if (!
c->connected) {
12648 if (
c &&
c->connected) {
12665 if (!
c->connected) {
12874 dummy = 0x12345678;
12875 p = (
unsigned char *) &
dummy;
12878 else if (*p == 0x12)
12881 cm_msg(
MERROR,
"rpc_get_option",
"unknown byte order format");
12884 f = (
float) 1.2345;
12887 if ((
dummy & 0xFF) == 0x19 &&
12888 ((
dummy >> 8) & 0xFF) == 0x04 && ((
dummy >> 16) & 0xFF) == 0x9E
12889 && ((
dummy >> 24) & 0xFF) == 0x3F)
12891 else if ((
dummy & 0xFF) == 0x9E &&
12892 ((
dummy >> 8) & 0xFF) == 0x40 && ((
dummy >> 16) & 0xFF) == 0x19
12893 && ((
dummy >> 24) & 0xFF) == 0x04)
12896 cm_msg(
MERROR,
"rpc_get_option",
"unknown floating point format");
12901 if ((
dummy & 0xFF) == 0x8D &&
12902 ((
dummy >> 8) & 0xFF) == 0x97 && ((
dummy >> 16) & 0xFF) == 0x6E
12903 && ((
dummy >> 24) & 0xFF) == 0x12)
12905 else if ((
dummy & 0xFF) == 0x83 &&
12906 ((
dummy >> 8) & 0xFF) == 0xC0 && ((
dummy >> 16) & 0xFF) == 0xF3
12907 && ((
dummy >> 24) & 0xFF) == 0x3F)
12909 else if ((
dummy & 0xFF) == 0x13 &&
12910 ((
dummy >> 8) & 0xFF) == 0x40 && ((
dummy >> 16) & 0xFF) == 0x83
12911 && ((
dummy >> 24) & 0xFF) == 0xC0)
12913 else if ((
dummy & 0xFF) == 0x9E &&
12914 ((
dummy >> 8) & 0xFF) == 0x40 && ((
dummy >> 16) & 0xFF) == 0x18
12915 && ((
dummy >> 24) & 0xFF) == 0x04)
12917 "MIDAS cannot handle VAX D FLOAT format. Please compile with the /g_float flag");
12919 cm_msg(
MERROR,
"rpc_get_option",
"unknown floating point format");
12943 else if (
hConn == -2)
12990 int timeout =
c->rpc_timeout;
13035#ifndef DOXYGEN_SHOULD_SKIP_THIS
13188 va_start(
argptr, format);
13189 vsprintf(
str, (
char *) format,
argptr);
13241 bool debug =
false;
13244 printf(
"encode rpc_id %d \"%s\"\n",
rl.id,
rl.name);
13245 for (
int i=0;
rl.param[
i].tid != 0;
i++) {
13246 int tid =
rl.param[
i].tid;
13247 int flags =
rl.param[
i].flags;
13248 int n =
rl.param[
i].n;
13249 printf(
"i=%d, tid %d, flags 0x%x, n %d\n",
i, tid, flags,
n);
13255 for (
int i=0;
rl.param[
i].tid != 0;
i++) {
13256 int tid =
rl.param[
i].tid;
13257 int flags =
rl.param[
i].flags;
13278 size_t buf_size =
sizeof(
NET_COMMAND) + 4 * 1024;
13279 char* buf = (
char *)
malloc(buf_size);
13289 for (
int i=0;
rl.param[
i].tid != 0;
i++) {
13290 int tid =
rl.param[
i].tid;
13291 int flags =
rl.param[
i].flags;
13311 char* arg =
args[
i];
13383 printf(
"encode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, memcpy pointer %d\n",
i, flags, tid,
arg_type,
arg_size, param_size,
arg_size);
13388 printf(
"encode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, double->float\n",
i, flags, tid,
arg_type,
arg_size, param_size);
13394 printf(
"encode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, memcpy %d\n",
i, flags, tid,
arg_type,
arg_size, param_size,
arg_size);
13406 printf(
"encode rpc_id %d \"%s\" buf_size %d, param_size %d\n",
rl.id,
rl.name, (
int)buf_size, (*nc)->header.param_size);
13412 bool debug =
false;
13415 printf(
"decode reply to rpc_id %d \"%s\" has %d bytes\n",
rl.id,
rl.name, (
int)buf_size);
13421 for (
int i = 0;
rl.param[
i].tid != 0;
i++) {
13422 int tid =
rl.param[
i].tid;
13423 int flags =
rl.param[
i].flags;
13444 cm_msg(
MERROR,
"rpc_call_decode",
"routine \"%s\": no data in RPC reply, needed to decode an RPC_OUT parameter. param_ptr is NULL",
rl.name);
13448 tid =
rl.param[
i].tid;
13466 if (*((
char **) arg)) {
13468 printf(
"decode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, memcpy %d\n",
i, flags, tid,
arg_type,
arg_size, param_size,
arg_size);
13535 if (
rpc_list[
i].
id == (
int) routine_id) {
13545 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" with invalid RPC ID %d",
c->client_name.c_str(),
c->host_name.c_str(), routine_id);
13554 va_start(
ap, routine_id);
13572 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": send_tcp() failed",
c->client_name.c_str(),
c->host_name.c_str(),
rpc_name);
13594 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": send_tcp() failed",
c->client_name.c_str(),
c->host_name.c_str(),
rpc_name);
13604 DWORD watchdog_timeout;
13609 if (
c->rpc_timeout >= (
int) watchdog_timeout) {
13615 DWORD buf_size = 0;
13626 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": timeout waiting for reply",
c->client_name.c_str(),
c->host_name.c_str(),
rpc_name);
13634 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": error, ss_recv_net_command() status %d",
c->client_name.c_str(),
c->host_name.c_str(),
rpc_name,
status);
13644 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": error, unknown RPC, status %d",
c->client_name.c_str(),
c->host_name.c_str(),
rpc_name,
rpc_status);
13652 va_start(
ap, routine_id);
13709 fprintf(
stderr,
"rpc_call(routine_id=%d) failed, no connection to mserver.\n", routine_id);
13733 if (
rpc_list[
i].
id == (
int) routine_id) {
13745 cm_msg(
MERROR,
"rpc_call",
"invalid rpc ID (%d)", routine_id);
13754 va_start(
ap, routine_id);
13797 DWORD watchdog_timeout;
13807 if (rpc_timeout >= (
int) watchdog_timeout) {
13814 DWORD buf_size = 0;
13830 cm_msg(
MERROR,
"rpc_call",
"routine \"%s\": timeout waiting for reply, program abort",
rpc_name);
13854 va_start(
ap, routine_id);
13914 return bm_send_event(buffer_handle, pevent, unused, async_flag);
13941 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid sg_ptr[0] is NULL");
13946 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid sg_len[0] value %d is smaller than event header size %d", (
int)
sg_len[0], (
int)
sizeof(
EVENT_HEADER));
13955 if (data_size == 0) {
13956 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid event data size zero");
13974 cm_msg(
MERROR,
"rpc_send_event_sg",
"data size mismatch: event data_size %d, event_size %d not same as sum of sg_len %d", (
int)data_size, (
int)
event_size, (
int)
count);
14000 assert(
sizeof(
DWORD) == 4);
14006 cm_msg(
MERROR,
"rpc_send_event_sg",
"ss_write_tcp(buffer handle) failed, event socket is now closed");
14016 cm_msg(
MERROR,
"rpc_send_event_sg",
"ss_write_tcp(event data) failed, event socket is now closed");
14023 if (
count < total_size) {
14024 char padding[8] = { 0,0,0,0,0,0,0,0 };
14030 cm_msg(
MERROR,
"rpc_send_event_sg",
"ss_write_tcp(padding) failed, event socket is now closed");
14096 for (
size_t i = 0;
i <
n;
i++) {
14101 if (
tt.transition ==
CINT(0) &&
tt.sequence_number ==
CINT(4)) {
14119 cm_msg(
MERROR,
"rpc_transition_dispatch",
"no handler for transition %d with sequence number %d",
CINT(0),
CINT(4));
14122 cm_msg(
MERROR,
"rpc_transition_dispatch",
"received unrecognized command %d",
idx);
14183 for (
i = 0;
i < (size - 1) / 16 + 1;
i++) {
14185 for (
j = 0;
j < 16;
j++)
14186 if (
i * 16 +
j < size)
14192 for (
j = 0;
j < 16;
j++) {
14194 if (
i * 16 +
j < size)
14195 printf(
"%c", (
c >= 32 &&
c < 128) ? p[
i * 16 +
j] :
'.');
14234 char *buffer =
NULL;
14258 int param_size = -1;
14267 if (param_size == -1) {
14307 int size = write_ptr - read_ptr;
14312 read_ptr = write_ptr;
14319 }
while (write_ptr == -1 &&
errno ==
EINTR);
14325 if (write_ptr <= 0) {
14326 if (write_ptr == 0)
14337 read_ptr = misalign;
14338 write_ptr += misalign;
14340 misalign = write_ptr % 8;
14350 if (write_ptr - read_ptr < param_size)
14418 int sock =
psa->event_sock;
14444 cm_msg(
MERROR,
"recv_event_server",
"recv_tcp2(header) returned %d",
hrd);
14453 cm_msg(
MERROR,
"recv_event_server",
"recv_tcp2(more header) returned %d",
hrd1);
14470 if (
psa->convert_flags) {
14485 for (
int i=0;
i<5;
i++) {
14486 printf(
"recv_event_server: header[%d]: 0x%08x\n",
i,
pbh[
i]);
14494 "received event header with invalid data_size %d: event_size %d, total_size %d", pevent->
data_size,
14622 cm_msg(
MERROR,
"rpc_register_server",
"cannot listen to tcp port %d: %s", port,
errmsg.c_str());
14627#if defined(F_SETFD) && defined(FD_CLOEXEC)
14724 assert(return_buffer);
14734 if (convert_flags) {
14766 cm_msg(
MERROR,
"rpc_execute",
"Invalid rpc ID (%d)", routine_id);
14779 for (
i = 0;
rl.param[
i].tid != 0;
i++) {
14780 tid =
rl.param[
i].tid;
14781 flags =
rl.param[
i].flags;
14794 param_size =
ALIGN8(param_size);
14805 if (convert_flags) {
14858 "return parameters (%d) too large for network buffer (%d)",
14868 "rpc_execute: return parameters (%d) too large for network buffer (%d), new buffer size (%d)",
14884 assert(return_buffer);
14901 if (
rl.param[
i + 1].tid)
14940 for (
i = 0;
rl.param[
i].tid != 0;
i++)
14942 tid =
rl.param[
i].tid;
14943 flags =
rl.param[
i].flags;
14955 param_size =
ALIGN8(param_size);
14983 param_size =
ALIGN8(param_size);
14995 if (convert_flags) {
15011 nc_out->header.param_size = param_size;
15016 if (convert_flags) {
15073 printf(
"rpc_test_rpc!\n");
15102 for (
int i=0;
i<10;
i++) {
15134 printf(
"int_out mismatch!\n");
15139 printf(
"int_inout mismatch!\n");
15149 printf(
"string2_out mismatch [%s] vs [%s]\n",
string2_out,
"second string_out");
15154 printf(
"string_inout mismatch [%s] vs [%s]\n",
string_inout,
"return string_inout");
15166 if (
pkey->type != 444 ||
pkey->num_values != 555 ||
strcmp(
pkey->name,
"out_name") ||
pkey->last_written != 666) {
15167 printf(
"struct_out mismatch: type %d, num_values %d, name [%s], last_written %d\n",
pkey->type,
pkey->num_values,
pkey->name,
pkey->last_written);
15173 if (
pkey->type != 444444 ||
pkey->num_values != 555555 ||
strcmp(
pkey->name,
"inout_name") ||
pkey->last_written != 666666) {
15174 printf(
"struct_inout mismatch: type %d, num_values %d, name [%s], last_written %d\n",
pkey->type,
pkey->num_values,
pkey->name,
pkey->last_written);
15289 if (
strcmp(hostname,
"localhost") == 0)
15292 if (
strcmp(hostname,
"localhost.localdomain") == 0)
15295 if (
strcmp(hostname,
"localhost6") == 0)
15298 if (
strcmp(hostname,
"ip6-localhost") == 0)
15306 if (h == hostname) {
15323 std::string hostname;
15339 cm_msg(
MERROR,
"rpc_socket_check_allowed_host",
"rejecting connection from unallowed host \'%s\', this message will no longer be reported", hostname.c_str());
15341 cm_msg(
MERROR,
"rpc_socket_check_allowed_host",
"rejecting connection from unallowed host \'%s\'. Add this host to \"/Experiment/Security/RPC hosts/Allowed hosts\"", hostname.c_str());
15377 char net_buffer[256];
15422#ifdef LOCAL_ROUTINES
15425 for (
unsigned i=0;
i<exptab.
exptab.size();
i++) {
15427 const char*
str = exptab.
exptab[
i].name.c_str();
15430 send(sock,
"", 1, 0);
15452 while (*ptr ==
' ')
15456 for (; *ptr != 0 && *ptr !=
' ' &&
i < (
int)
sizeof(version) - 1;)
15457 version[
i++] = *ptr++;
15460 assert(
i < (
int)
sizeof(version));
15464 for (; *ptr != 0 && *ptr !=
' ';)
15467 while (*ptr ==
' ')
15471 for (; *ptr != 0 && *ptr !=
' ' && *ptr !=
'\n' && *ptr !=
'\r' &&
i < (
int)
sizeof(
experiment) - 1;)
15494 cm_msg(
MERROR,
"rpc_server_accept",
"received string: %s", net_buffer + 2);
15509#ifdef LOCAL_ROUTINES
15515 bool found =
false;
15534 send(sock,
"2", 2, 0);
15553 const char *
argv[10];
15588 cm_msg(
MERROR,
"rpc_server_accept",
"received unknown command '%c' code %d", command, command);
15633 char net_buffer[256], *p;
15654 i =
recv_string(sock, net_buffer,
sizeof(net_buffer), 10000);
15661 p =
strtok(net_buffer,
" ");
15731 int recv_sock, send_sock, event_sock;
15736 char net_buffer[256];
15778 flag = 2 * 1024 * 1024;
15781 cm_msg(
MERROR,
"rpc_server_callback",
"cannot setsockopt(SOL_SOCKET, SO_RCVBUF), errno %d (%s)",
errno,
15786 cm_msg(
MERROR,
"rpc_server_callback",
"timeout on receive remote computer info");
15913 cm_msg(
MERROR,
"rpc_server_receive_rpc",
"rpc_execute() returned %d, abort",
status);
15940 cm_msg(
MTALK,
"rpc_server_receive_rpc",
"Program \'%s\' on host \'%s\' aborted", sa->
prog_name.c_str(),
str);
16026 cm_msg(
MERROR,
"rpc_server_receive_event",
"internal error: called recursively");
16043 cm_msg(
MERROR,
"rpc_server_receive_event",
"recv_event_server_realloc() returned %d, abort",
n_received);
16071 cm_msg(
MERROR,
"rpc_server_receive_event",
"bm_send_event() error %d (SS_ABORT), abort",
status);
16082 cm_msg(
MERROR,
"rpc_server_receive_event",
"bm_send_event() error %d, mserver dropped this event",
status);
16100 cm_msg(
MTALK,
"rpc_server_receive_event",
"Program \'%s\' on host \'%s\' aborted", sa->
prog_name.c_str(),
str);
16308 if (convert_flags) {
16317 cm_msg(
MINFO,
"rpc_check_channels",
"client \"%s\" on host \"%s\" failed watchdog test after %d sec, send_tcp() returned %d",
16353 cm_msg(
MINFO,
"rpc_check_channels",
"client \"%s\" on host \"%s\" failed watchdog test after %d sec",
16372 cm_msg(
MINFO,
"rpc_check_channels",
"client \"%s\" on host \"%s\" failed watchdog test after %d sec, recv_tcp() returned %d",
16420#ifndef DOXYGEN_SHOULD_SKIP_THIS
16571 if (((
PTYPE) event & 0x07) != 0) {
16572 cm_msg(
MERROR,
"bk_create",
"Bank %s created with unaligned event pointer",
name);
16588 pbk32->data_size = 0;
16596 pbk->data_size = 0;
16602#ifndef DOXYGEN_SHOULD_SKIP_THIS
16723 }
while ((
DWORD) ((
char *)
pbk32a - (
char *) event) <
16745 }
while ((
DWORD) ((
char *)
pbk32 - (
char *) event) <
16767 }
while ((
DWORD) ((
char *)
pbk - (
char *) event) <
16795 return pbk32a->data_size;
16800 printf(
"Warning: TID_STRUCT bank %c%c%c%c has zero size\n",
pbk32->name[0],
pbk32->name[1],
pbk32->name[2],
pbk32->name[3]);
16802 return pbk32->data_size;
16806 if (size > 0xFFFF) {
16807 printf(
"Error: Bank size %d exceeds 16-bit limit of 65526, please use bk_init32() to create a 32-bit bank\n", size);
16810 pbk->data_size = (
WORD) (size);
16812 printf(
"Warning: TID_STRUCT bank %c%c%c%c has zero size\n",
pbk->name[0],
pbk->name[1],
pbk->name[2],
pbk->name[3]);
16814 if (size > 0xFFFF) {
16815 printf(
"Error: Bank size %d exceeds 16-bit limit of 65526, please use bk_init32() to create a 32-bit bank\n", size);
16819 return pbk->data_size;
16906 while ((
DWORD) ((
char *)
pbk32a - (
char *) event) <
16911 return pbk32a->data_size;
16919 while ((
DWORD) ((
char *)
pbk32 - (
char *) event) <
16924 return pbk32->data_size;
16932 while ((
DWORD) ((
char *)
pbk - (
char *) event) <
16937 return pbk->data_size;
17060 *((
void **)
pdata) = (*pbk) + 1;
17067 return (*pbk)->data_size;
17072#ifndef DOXYGEN_SHOULD_SKIP_THIS
17100 *((
void **)
pdata) = (*pbk) + 1;
17108 return (*pbk)->data_size;
17136 *((
void **)
pdata) = (*pbk32a) + 1;
17144 return (*pbk32a)->data_size;
17176 if (
pbh->flags < 0x10000 && !
force)
17222 while ((
char *)
pdata < (
char *)
pbk) {
17232 while ((
char *)
pdata < (
char *)
pbk) {
17241 while ((
char *)
pdata < (
char *)
pbk) {
17263#ifndef DOXYGEN_SHOULD_SKIP_THIS
17288#define MAX_RING_BUFFER 100
17380 assert(
rb[
i].buffer);
17477 rp >
rb[h].buffer) {
17539 cm_msg(
MERROR,
"rb_increment_wp",
"event size of %d MB larger than max_event_size of %d MB",
17550 assert(
rb[h].rp !=
rb[h].buffer);
17609 if (
rb[h].wp !=
rb[h].rp) {
17611 *p =
rb[handle - 1].
rp;
17677 if (
new_rp >= ep &&
rb[h].wp < ep)
17719 if (
rb[h].wp >=
rb[h].rp)
17737 cm_msg(
MERROR,
"cm_write_event_to_odb",
"event %d ODB record size mismatch, db_set_record() status %d", pevent->
event_id,
status);
17762 for (
n=0 ; ;
n++) {
17818 cm_msg(
MERROR,
"cm_write_event_to_odb",
"please define bank \"%s\" in BANK_LIST in frontend",
name);
17823 for (
i = 0;;
i++) {
17836 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot write bank \"%s\" to ODB, db_set_data1() status %d",
name,
status);
17850 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot create key for bank \"%s\" with tid %d in ODB, db_create_key() status %d",
name,
bktype,
status);
17855 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot find key for bank \"%s\" in ODB, after db_create_key(), db_find_key() status %d",
name,
status);
17862 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot write bank \"%s\" to ODB, db_set_data1() status %d",
name,
status);
17876 cm_msg(
MERROR,
"cm_write_event_to_odb",
"event format %d is not supported (see midas.h definitions of FORMAT_xxx)", format);
std::atomic_bool connected
BUFFER * get_pbuf() const
bm_lock_buffer_guard(BUFFER *pbuf, bool do_not_lock=false)
bm_lock_buffer_guard & operator=(const bm_lock_buffer_guard &)=delete
bm_lock_buffer_guard(const bm_lock_buffer_guard &)=delete
static bool exists(const std::string &name)
INT transition(INT run_number, char *error)
INT al_get_alarms(std::string *presult)
void bk_init32a(void *event)
INT bk_close(void *event, void *pdata)
INT bk_iterate32a(const void *event, BANK32A **pbk32a, void *pdata)
static void copy_bk_name(char *dst, const char *src)
INT bk_swap(void *event, BOOL force)
BOOL bk_is32a(const void *event)
int bk_delete(void *event, const char *name)
BOOL bk_is32(const void *event)
INT bk_iterate32(const void *event, BANK32 **pbk, void *pdata)
INT bk_locate(const void *event, const char *name, void *pdata)
void bk_init(void *event)
INT bk_list(const void *event, char *bklist)
INT bk_copy(char *pevent, char *psrce, const char *bkname)
INT bk_iterate(const void *event, BANK **pbk, void *pdata)
void bk_init32(void *event)
void bk_create(void *event, const char *name, WORD type, void **pdata)
INT bk_find(const BANK_HEADER *pbkh, const char *name, DWORD *bklen, DWORD *bktype, void **pdata)
INT bk_size(const void *event)
static void bm_wakeup_producers_locked(const BUFFER_HEADER *pheader, const BUFFER_CLIENT *pc)
static INT bm_receive_event_rpc(INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
INT bm_open_buffer(const char *buffer_name, INT buffer_size, INT *buffer_handle)
static BOOL bm_validate_rp(const char *who, const BUFFER_HEADER *pheader, int rp)
INT bm_send_event(INT buffer_handle, const EVENT_HEADER *pevent, int unused, int timeout_msec)
static int bm_flush_cache_rpc(int buffer_handle, int timeout_msec)
static void bm_write_buffer_statistics_to_odb_copy(HNDLE hDB, const char *buffer_name, const char *client_name, int client_index, BUFFER_INFO *pbuf, BUFFER_HEADER *pheader)
static int bm_skip_event(BUFFER *pbuf)
static INT bm_flush_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
INT bm_write_statistics_to_odb(void)
#define MAX_DEFRAG_EVENTS
INT bm_delete_request(INT request_id)
INT bm_close_all_buffers(void)
INT bm_add_event_request(INT buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, EVENT_HANDLER *func, INT request_id)
static void bm_write_buffer_statistics_to_odb(HNDLE hDB, BUFFER *pbuf, BOOL force)
static int bm_incr_rp_no_check(const BUFFER_HEADER *pheader, int rp, int total_size)
INT bm_receive_event_vec(INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
static void bm_notify_reader_locked(BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, int old_write_pointer, int request_id)
static int bm_find_first_request_locked(BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
static BOOL bm_check_requests(const BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
static void bm_cleanup_buffer_locked(BUFFER *pbuf, const char *who, DWORD actual_time)
static BOOL bm_update_read_pointer_locked(const char *caller_name, BUFFER_HEADER *pheader)
static void bm_convert_event_header(EVENT_HEADER *pevent, int convert_flags)
INT bm_request_event(HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
static int bm_validate_client_index_locked(bm_lock_buffer_guard &pbuf_guard)
INT bm_set_cache_size(INT buffer_handle, size_t read_size, size_t write_size)
static int bm_validate_buffer_locked(const BUFFER *pbuf)
INT bm_receive_event(INT buffer_handle, void *destination, INT *buf_size, int timeout_msec)
static int bm_fill_read_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
INT bm_compose_event_threadsafe(EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD *serial)
INT bm_remove_event_request(INT buffer_handle, INT request_id)
static double _bm_mutex_timeout_sec
INT bm_close_buffer(INT buffer_handle)
int bm_send_event_sg(int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
INT bm_compose_event(EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD serial)
static void bm_write_to_buffer_locked(BUFFER_HEADER *pheader, int sg_n, const char *const sg_ptr[], const size_t sg_len[], size_t total_size)
static EVENT_DEFRAG_BUFFER defrag_buffer[MAX_DEFRAG_EVENTS]
static int bm_next_rp(const char *who, const BUFFER_HEADER *pheader, const char *pdata, int rp)
INT bm_match_event(short int event_id, short int trigger_mask, const EVENT_HEADER *pevent)
static void bm_read_from_buffer_locked(const BUFFER_HEADER *pheader, int rp, char *buf, int event_size)
static BOOL bm_peek_read_cache_locked(BUFFER *pbuf, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
static void bm_update_last_activity(DWORD millitime)
static void bm_incr_read_cache_locked(BUFFER *pbuf, int total_size)
static DWORD _bm_max_event_size
static void bm_clear_buffer_statistics(HNDLE hDB, BUFFER *pbuf)
INT bm_flush_cache(int buffer_handle, int timeout_msec)
static void bm_dispatch_event(int buffer_handle, EVENT_HEADER *pevent)
static void bm_validate_client_pointers_locked(const BUFFER_HEADER *pheader, BUFFER_CLIENT *pclient)
static INT bm_read_buffer(BUFFER *pbuf, INT buffer_handle, void **bufptr, void *buf, INT *buf_size, std::vector< char > *vecptr, int timeout_msec, int convert_flags, BOOL dispatch)
static int bm_wait_for_free_space_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec, int requested_space, bool unlock_write_cache)
INT bm_get_buffer_handle(const char *buffer_name, INT *buffer_handle)
int bm_send_event_vec(int buffer_handle, const std::vector< char > &event, int timeout_msec)
static int _bm_lock_timeout
static int bm_peek_buffer_locked(BUFFER *pbuf, BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
INT bm_receive_event_alloc(INT buffer_handle, EVENT_HEADER **ppevent, int timeout_msec)
static void bm_reset_buffer_locked(BUFFER *pbuf)
void bm_remove_client_locked(BUFFER_HEADER *pheader, int j)
static INT bm_push_buffer(BUFFER *pbuf, int buffer_handle)
static int bm_wait_for_more_events_locked(bm_lock_buffer_guard &pbuf_guard, BUFFER_CLIENT *pc, int timeout_msec, BOOL unlock_read_cache)
INT cm_set_path(const char *path)
INT cm_register_transition(INT transition, INT(*func)(INT, char *), INT sequence_number)
INT cm_shutdown(const char *name, BOOL bUnique)
static int cm_transition_call(TrState *s, int idx)
INT cm_disconnect_client(HNDLE hConn, BOOL bShutdown)
static void load_rpc_hosts(HNDLE hDB, HNDLE hKey, int index, void *info)
static std::atomic< std::thread * > _watchdog_thread
static int cm_transition_detach(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_yield(INT millisec)
INT cm_get_experiment_database(HNDLE *hDB, HNDLE *hKeyClient)
INT cm_list_experiments_remote(const char *host_name, STRING_LIST *exp_names)
INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout)
static void bm_defragment_event(HNDLE buffer_handle, HNDLE request_id, EVENT_HEADER *pevent, void *pdata, EVENT_HANDLER *dispatcher)
INT cm_connect_client(const char *client_name, HNDLE *hConn)
INT cm_connect_experiment(const char *host_name, const char *exp_name, const char *client_name, void(*func)(char *))
static BUFFER_CLIENT * bm_get_my_client_locked(bm_lock_buffer_guard &pbuf_guard)
INT cm_list_experiments_local(STRING_LIST *exp_names)
INT cm_start_watchdog_thread()
static INT bm_push_event(const char *buffer_name)
INT cm_get_experiment_semaphore(INT *semaphore_alarm, INT *semaphore_elog, INT *semaphore_history, INT *semaphore_msg)
static int tr_finish(HNDLE hDB, TrState *tr, int transition, int status, const char *errorstr)
INT cm_set_client_run_state(INT state)
static int bm_lock_buffer_read_cache(BUFFER *pbuf)
static void write_tr_client_to_odb(HNDLE hDB, const TrClient *tr_client)
INT cm_transition(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_stop_watchdog_thread()
INT cm_register_function(INT id, INT(*func)(INT, void **))
INT cm_connect_experiment1(const char *host_name, const char *default_exp_name, const char *client_name, void(*func)(char *), INT odb_size, DWORD watchdog_timeout)
INT cm_check_client(HNDLE hDB, HNDLE hKeyClient)
static int xbm_lock_buffer(BUFFER *pbuf)
static void xbm_unlock_buffer(BUFFER *pbuf)
INT cm_dispatch_ipc(const char *message, int message_size, int client_socket)
static INT cm_transition1(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_select_experiment_remote(const char *host_name, std::string *exp_name)
INT cm_register_server(void)
static BOOL _ctrlc_pressed
static void init_rpc_hosts(HNDLE hDB)
void cm_ack_ctrlc_pressed()
INT cm_execute(const char *command, char *result, INT bufsize)
INT cm_get_watchdog_info(HNDLE hDB, const char *client_name, DWORD *timeout, DWORD *last)
INT cm_cleanup(const char *client_name, BOOL ignore_timeout)
void cm_check_connect(void)
static BUFFER * bm_get_buffer(const char *who, INT buffer_handle, int *pstatus)
INT cm_select_experiment_local(std::string *exp_name)
std::string cm_expand_env(const char *str)
std::string cm_get_client_name()
static int bm_lock_buffer_mutex(BUFFER *pbuf)
int cm_exec_script(const char *odb_path_to_script)
INT EXPRT cm_get_path_string(std::string *path)
static void rpc_client_shutdown()
static std::atomic< bool > _watchdog_thread_is_running
static exptab_struct _exptab
INT cm_set_client_info(HNDLE hDB, HNDLE *hKeyClient, const char *host_name, char *client_name, INT hw_type, const char *password, DWORD watchdog_timeout)
INT cm_disconnect_experiment(void)
static void bm_cleanup(const char *who, DWORD actual_time, BOOL wrong_interval)
static INT bm_notify_client(const char *buffer_name, int s)
int cm_get_exptab(const char *expname, std::string *dir, std::string *user)
static void xcm_watchdog_thread()
static bool test_cm_expand_env1(const char *str, const char *expected)
INT cm_synchronize(DWORD *seconds)
std::string cm_get_exptab_filename()
std::string cm_get_path()
static DWORD _deferred_transition_mask
std::string cm_get_history_path(const char *history_channel)
void cm_test_expand_env()
INT cm_register_deferred_transition(INT transition, BOOL(*func)(INT, BOOL))
int cm_set_experiment_local(const char *exp_name)
static INT _requested_transition
INT cm_get_environment(char *host_name, int host_name_size, char *exp_name, int exp_name_size)
const char * cm_get_version()
INT cm_read_exptab(exptab_struct *exptab)
static int bm_lock_buffer_write_cache(BUFFER *pbuf)
INT cm_deregister_transition(INT transition)
INT cm_check_deferred_transition()
std::string cm_get_experiment_name()
INT cm_set_transition_sequence(INT transition, INT sequence_number)
static bool tr_compare(const std::unique_ptr< TrClient > &arg1, const std::unique_ptr< TrClient > &arg2)
INT cm_delete_client_info(HNDLE hDB, INT pid)
static std::atomic< bool > _watchdog_thread_run
const char * cm_get_revision()
INT cm_watchdog_thread(void *unused)
INT cm_set_experiment_database(HNDLE hDB, HNDLE hKeyClient)
BOOL cm_is_ctrlc_pressed()
static INT tr_main_thread(void *param)
void cm_ctrlc_handler(int sig)
INT cm_set_watchdog_params_local(BOOL call_watchdog, DWORD timeout)
INT cm_transition_cleanup()
INT cm_set_watchdog_params(BOOL call_watchdog, DWORD timeout)
static int cm_transition_call_direct(TrClient *tr_client)
INT cm_exist(const char *name, BOOL bUnique)
INT cm_set_experiment_semaphore(INT semaphore_alarm, INT semaphore_elog, INT semaphore_history, INT semaphore_msg)
static INT cm_transition2(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_set_experiment_name(const char *name)
#define CM_INVALID_TRANSITION
#define CM_DEFERRED_TRANSITION
#define CM_TRANSITION_IN_PROGRESS
#define CM_WRONG_PASSWORD
#define CM_TRANSITION_CANCELED
#define CM_VERSION_MISMATCH
#define BM_INVALID_MIXING
#define BM_INVALID_HANDLE
#define DB_INVALID_HANDLE
#define DB_NO_MORE_SUBKEYS
#define RPC_EXCEED_BUFFER
#define RPC_DOUBLE_DEFINED
#define RPC_NOT_REGISTERED
#define RPC_MUTEX_TIMEOUT
#define RPC_NO_CONNECTION
#define RPC_HNDLE_CONNECT
#define RPC_HNDLE_MSERVER
#define VALIGN(adr, align)
RPC_LIST * rpc_get_internal_list(INT flag)
#define MESSAGE_BUFFER_NAME
#define DRI_LITTLE_ENDIAN
#define MAX_STRING_LENGTH
#define MESSAGE_BUFFER_SIZE
void() EVENT_HANDLER(HNDLE buffer_handler, HNDLE request_id, EVENT_HEADER *event_header, void *event_data)
INT() RPC_HANDLER(INT index, void *prpc_param[])
std::string ss_gethostname()
INT ss_suspend(INT millisec, INT msg)
INT ss_get_struct_align()
INT ss_mutex_release(MUTEX_T *mutex)
INT ss_suspend_init_odb_port()
bool ss_event_socket_has_data()
time_t ss_mktime(struct tm *tms)
int ss_file_exist(const char *path)
INT ss_semaphore_create(const char *name, HNDLE *semaphore_handle)
INT recv_tcp2(int sock, char *net_buffer, int buffer_size, int timeout_ms)
INT ss_suspend_set_client_listener(int listen_socket)
INT ss_socket_get_peer_name(int sock, std::string *hostp, int *portp)
int ss_file_link_exist(const char *path)
INT ss_mutex_delete(MUTEX_T *mutex)
int ss_socket_wait(int sock, INT millisec)
INT ss_suspend_set_server_acceptions(RPC_SERVER_ACCEPTION_LIST *acceptions)
DWORD ss_settime(DWORD seconds)
char * ss_getpass(const char *prompt)
INT ss_suspend_set_client_connection(RPC_SERVER_CONNECTION *connection)
INT ss_mutex_create(MUTEX_T **mutex, BOOL recursive)
INT ss_shm_open(const char *name, INT size, void **adr, size_t *shm_size, HNDLE *handle, BOOL get_size)
INT recv_string(int sock, char *buffer, DWORD buffer_size, INT millisec)
INT ss_write_tcp(int sock, const char *buffer, size_t buffer_size)
int ss_dir_exist(const char *path)
bool ss_timed_mutex_wait_for_sec(std::timed_mutex &mutex, const char *mutex_name, double timeout_sec)
INT ss_semaphore_release(HNDLE semaphore_handle)
int ss_file_copy(const char *src, const char *dst, bool append)
INT recv_tcp(int sock, char *net_buffer, DWORD buffer_size, INT flags)
INT ss_resume(INT port, const char *message)
midas_thread_t ss_gettid(void)
INT ss_semaphore_delete(HNDLE semaphore_handle, INT destroy_flag)
INT ss_sleep(INT millisec)
INT ss_socket_connect_tcp(const char *hostname, int tcp_port, int *sockp, std::string *error_msg_p)
INT ss_semaphore_wait_for(HNDLE semaphore_handle, DWORD timeout_millisec)
INT ss_socket_listen_tcp(bool listen_localhost, int tcp_port, int *sockp, int *tcp_port_p, std::string *error_msg_p)
char * ss_crypt(const char *buf, const char *salt)
INT ss_spawnv(INT mode, const char *cmdname, const char *const argv[])
INT ss_suspend_get_buffer_port(midas_thread_t thread_id, INT *port)
INT ss_socket_close(int *sockp)
char * ss_gets(char *string, int size)
INT ss_recv_net_command(int sock, DWORD *routine_id, DWORD *param_size, char **param_ptr, int timeout_ms)
INT ss_shm_close(const char *name, void *adr, size_t shm_size, HNDLE handle, INT destroy_flag)
INT send_tcp(int sock, char *buffer, DWORD buffer_size, INT flags)
void * ss_ctrlc_handler(void(*func)(int))
BOOL ss_pid_exists(int pid)
std::string ss_get_executable(void)
INT ss_system(const char *command)
INT ss_mutex_wait_for(MUTEX_T *mutex, INT timeout)
INT ss_file_find(const char *path, const char *pattern, char **plist)
static int cm_msg_retrieve1(const char *filename, time_t t, INT n_messages, char **messages, int *length, int *allocated, int *num_messages)
INT cm_msg1(INT message_type, const char *filename, INT line, const char *facility, const char *routine, const char *format,...)
int cm_msg_early_init(void)
INT EXPRT cm_msg_facilities(STRING_LIST *list)
int cm_msg_open_buffer(void)
int cm_msg_close_buffer(void)
static std::mutex gMsgBufMutex
static void add_message(char **messages, int *length, int *allocated, time_t tstamp, const char *new_message)
INT cm_msg_register(EVENT_HANDLER *func)
INT cm_msg_log(INT message_type, const char *facility, const char *message)
INT cm_msg_flush_buffer()
static std::deque< msg_buffer_entry > gMsgBuf
static INT cm_msg_send_event(DWORD ts, INT message_type, const char *send_message)
std::string cm_get_error(INT code)
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
static std::string cm_msg_format(INT message_type, const char *filename, INT line, const char *routine, const char *format, va_list *argptr)
INT cm_msg_retrieve(INT n_message, char *message, INT buf_size)
INT cm_msg_retrieve2(const char *facility, time_t t, INT n_message, char **messages, int *num_messages)
void cm_msg_get_logfile(const char *fac, time_t t, std::string *filename, std::string *linkname, std::string *linktarget)
INT cm_set_msg_print(INT system_mask, INT user_mask, int(*func)(const char *))
struct rpc_server_acception_struct RPC_SERVER_ACCEPTION
BOOL equal_ustring(const char *str1, const char *str2)
INT db_flush_database(HNDLE hDB)
INT db_get_data_index(HNDLE hDB, HNDLE hKey, void *data, INT *buf_size, INT idx, DWORD type)
INT db_delete_key(HNDLE hDB, HNDLE hKey, BOOL follow_links)
INT db_check_client(HNDLE hDB, HNDLE hKeyClient)
INT db_get_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, void *data, INT *buf_size, DWORD type, BOOL create)
INT db_open_record(HNDLE hDB, HNDLE hKey, void *ptr, INT rec_size, WORD access_mode, void(*dispatcher)(INT, INT, void *), void *info)
INT db_open_database(const char *xdatabase_name, INT database_size, HNDLE *hDB, const char *client_name)
void db_cleanup(const char *who, DWORD actual_time, BOOL wrong_interval)
INT db_lock_database(HNDLE hDB)
std::string strcomb1(const char **list)
INT db_get_data(HNDLE hDB, HNDLE hKey, void *data, INT *buf_size, DWORD type)
INT db_create_key(HNDLE hDB, HNDLE hKey, const char *key_name, DWORD type)
INT db_unlock_database(HNDLE hDB)
INT db_set_mode(HNDLE hDB, HNDLE hKey, WORD mode, BOOL recurse)
INT db_get_key(HNDLE hDB, HNDLE hKey, KEY *key)
INT EXPRT db_get_value_string(HNDLE hdb, HNDLE hKeyRoot, const char *key_name, int index, std::string *s, BOOL create, int create_string_length)
INT db_get_watchdog_info(HNDLE hDB, const char *client_name, DWORD *timeout, DWORD *last)
INT db_set_data_index(HNDLE hDB, HNDLE hKey, const void *data, INT data_size, INT idx, DWORD type)
INT db_close_all_records()
INT db_watch(HNDLE hDB, HNDLE hKey, void(*dispatcher)(INT, INT, INT, void *), void *info)
INT db_close_all_databases(void)
INT db_set_data(HNDLE hDB, HNDLE hKey, const void *data, INT buf_size, INT num_values, DWORD type)
INT db_sprintf(char *string, const void *data, INT data_size, INT idx, DWORD type)
INT db_update_last_activity(DWORD millitime)
INT db_set_data1(HNDLE hDB, HNDLE hKey, const void *data, INT buf_size, INT num_values, DWORD type)
INT db_set_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, const void *data, INT data_size, INT num_values, DWORD type)
INT db_find_key(HNDLE hDB, HNDLE hKey, const char *key_name, HNDLE *subhKey)
INT db_update_record_local(INT hDB, INT hKeyRoot, INT hKey, int index)
void db_set_watchdog_params(DWORD timeout)
INT db_update_record_mserver(INT hDB, INT hKeyRoot, INT hKey, int index, int client_socket)
void db_cleanup2(const char *client_name, int ignore_timeout, DWORD actual_time, const char *who)
int db_delete_client_info(HNDLE hDB, int pid)
INT db_set_client_name(HNDLE hDB, const char *client_name)
INT db_notify_clients_array(HNDLE hDB, HNDLE hKeys[], INT size)
INT db_set_record(HNDLE hDB, HNDLE hKey, void *data, INT buf_size, INT align)
INT db_enum_key(HNDLE hDB, HNDLE hKey, INT idx, HNDLE *subkey_handle)
INT db_create_record(HNDLE hDB, HNDLE hKey, const char *orig_key_name, const char *init_str)
INT EXPRT db_set_value_string(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, const std::string *s)
INT db_set_lock_timeout(HNDLE hDB, int timeout_millisec)
INT db_set_num_values(HNDLE hDB, HNDLE hKey, INT num_values)
INT db_protect_database(HNDLE hDB)
int rb_get_rp(int handle, void **p, int millisec)
int rb_delete(int handle)
int rb_get_wp(int handle, void **p, int millisec)
int rb_increment_rp(int handle, int size)
static volatile int _rb_nonblocking
int rb_increment_wp(int handle, int size)
int rb_create(int size, int max_event_size, int *handle)
int rb_get_buffer_level(int handle, int *n_bytes)
static RING_BUFFER rb[MAX_RING_BUFFER]
INT rpc_add_allowed_host(const char *hostname)
void rpc_convert_data(void *data, INT tid, INT flags, INT total_size, INT convert_flags)
INT rpc_client_connect(const char *host_name, INT port, const char *client_name, HNDLE *hConnection)
#define RPC_BM_ADD_EVENT_REQUEST
INT rpc_register_server(int port, int *plsock, int *pport)
#define RPC_CM_CHECK_CLIENT
static int recv_event_server_realloc(INT idx, RPC_SERVER_ACCEPTION *psa, char **pbuffer, int *pbuffer_size)
INT rpc_get_opt_tcp_size()
INT rpc_client_disconnect(HNDLE hConn, BOOL bShutdown)
#define RPC_BM_SEND_EVENT
INT rpc_client_call(HNDLE hConn, DWORD routine_id,...)
INT rpc_register_functions(const RPC_LIST *new_list, RPC_HANDLER func)
static std::atomic_bool gAllowedHostsEnabled(false)
INT rpc_server_callback(struct callback_addr *pcallback)
static std::mutex _client_connections_mutex
INT rpc_set_timeout(HNDLE hConn, int timeout_msec, int *old_timeout_msec)
#define RPC_CM_SYNCHRONIZE
static std::mutex gAllowedHostsMutex
INT recv_tcp_check(int sock)
INT rpc_server_receive_rpc(int idx, RPC_SERVER_ACCEPTION *sa)
#define RPC_BM_GET_BUFFER_INFO
#define RPC_CM_SET_CLIENT_INFO
const char * rpc_get_mserver_path()
static RPC_SERVER_ACCEPTION * rpc_get_server_acception(int idx)
RPC_SERVER_ACCEPTION * rpc_get_mserver_acception()
void rpc_calc_convert_flags(INT hw_type, INT remote_hw_type, INT *convert_flags)
INT rpc_server_connect(const char *host_name, const char *exp_name)
std::string rpc_get_name()
static RPC_SERVER_ACCEPTION * rpc_new_server_acception()
#define RPC_BM_REMOVE_EVENT_REQUEST
void rpc_debug_printf(const char *format,...)
const char * rpc_tid_name_old(INT id)
int cm_query_transition(int *transition, int *run_number, int *trans_time)
#define RPC_RC_TRANSITION
void rpc_va_arg(va_list *arg_ptr, INT arg_type, void *arg)
INT rpc_server_loop(void)
INT rpc_clear_allowed_hosts()
std::string rpc_get_mserver_hostname(void)
INT rpc_deregister_functions()
bool rpc_is_connected(void)
INT rpc_set_mserver_path(const char *path)
static std::vector< RPC_LIST > rpc_list
#define RPC_BM_CLOSE_BUFFER
static TLS_POINTER * tls_buffer
#define RPC_BM_SET_CACHE_SIZE
static std::vector< RPC_CLIENT_CONNECTION * > _client_connections
static void rpc_call_encode(va_list &ap, const RPC_LIST &rl, NET_COMMAND **nc)
static RPC_CLIENT_CONNECTION * rpc_get_locked_client_connection(HNDLE hConn)
static std::vector< std::string > gAllowedHosts
INT rpc_call(DWORD routine_id,...)
static std::mutex rpc_list_mutex
const char * rpc_tid_name(INT id)
INT rpc_register_client(const char *name, RPC_LIST *list)
static std::vector< RPC_SERVER_ACCEPTION * > _server_acceptions
static INT rpc_socket_check_allowed_host(int sock)
#define RPC_BM_CLOSE_ALL_BUFFERS
INT rpc_server_shutdown(void)
int rpc_flush_event_socket(int timeout_msec)
INT rpc_send_event(INT buffer_handle, const EVENT_HEADER *pevent, int unused, INT async_flag, INT mode)
static TR_FIFO _tr_fifo[10]
static std::string _mserver_path
INT rpc_get_timeout(HNDLE hConn)
INT rpc_server_disconnect()
INT rpc_set_debug(void(*func)(const char *), INT mode)
INT rpc_client_accept(int lsock)
void rpc_vax2ieee_float(float *var)
INT rpc_execute(INT sock, char *buffer, INT convert_flags)
INT rpc_set_opt_tcp_size(INT tcp_size)
#define RPC_BM_GET_BUFFER_LEVEL
int rpc_name_tid(const char *name)
INT rpc_register_listener(int port, RPC_HANDLER func, int *plsock, int *pport)
INT rpc_server_receive_event(int idx, RPC_SERVER_ACCEPTION *sa, int timeout_msec)
#define RPC_BM_OPEN_BUFFER
#define RPC_BM_EMPTY_BUFFERS
INT rpc_server_accept(int lsock)
static std::mutex _tr_fifo_mutex
bool rpc_is_mserver(void)
static RPC_SERVER_ACCEPTION * _mserver_acception
static int recv_net_command_realloc(INT idx, char **pbuf, int *pbufsize, INT *remaining)
void rpc_ieee2vax_float(float *var)
#define RPC_CM_SET_WATCHDOG_PARAMS
INT rpc_send_event1(INT buffer_handle, const EVENT_HEADER *pevent)
#define RPC_BM_RECEIVE_EVENT
static bool _rpc_is_remote
static int rpc_call_decode(va_list &ap, const RPC_LIST &rl, const char *buf, size_t buf_size)
INT rpc_send_event_sg(INT buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[])
INT rpc_set_name(const char *name)
INT rpc_register_function(INT id, INT(*func)(INT, void **))
#define RPC_CM_MSG_RETRIEVE
#define RPC_BM_SKIP_EVENT
INT rpc_client_dispatch(int sock)
#define RPC_BM_INIT_BUFFER_COUNTERS
static RPC_SERVER_CONNECTION _server_connection
INT rpc_check_channels(void)
static INT rpc_transition_dispatch(INT idx, void *prpc_param[])
static int handle_msg_odb(int n, const NET_COMMAND *nc)
#define RPC_BM_FLUSH_CACHE
void rpc_ieee2vax_double(double *var)
void rpc_vax2ieee_double(double *var)
#define RPC_CM_GET_WATCHDOG_INFO
INT rpc_get_convert_flags(void)
INT rpc_check_allowed_host(const char *hostname)
void rpc_convert_single(void *data, INT tid, INT flags, INT convert_flags)
char exp_name[NAME_LENGTH]
BOOL debug
debug printouts
char host_name[HOST_NAME_LENGTH]
char expt_name[NAME_LENGTH]
char buffer_name[NAME_LENGTH]
static const int tid_size[]
static std::string join(const char *sep, const std::vector< std::string > &v)
static std::vector< TRANS_TABLE > _trans_table
static std::atomic_int _message_mask_system
static int disable_bind_rpc_to_localhost
static std::mutex gBuffersMutex
int(* MessagePrintCallback)(const char *)
std::string cm_transition_name(int transition)
static DBG_MEM_LOC * _mem_loc
static std::vector< BUFFER * > gBuffers
static void(* _debug_print)(const char *)
static std::mutex _trans_table_mutex
static std::string _experiment_name
static std::string _path_name
static std::string _client_name
static std::vector< EventRequest > _request_list
static int _rpc_connect_timeout
static const char * tid_name[]
static const ERROR_TABLE _error_table[]
static INT _watchdog_timeout
INT bm_get_buffer_info(INT buffer_handle, BUFFER_HEADER *buffer_header)
static MUTEX_T * _mutex_rpc
static EVENT_HANDLER * _msg_dispatch
void * dbg_calloc(unsigned int size, unsigned int count, char *file, int line)
static BOOL _rpc_registered
static std::atomic< MessagePrintCallback > _message_print
bool ends_with_char(const std::string &s, char c)
void dbg_free(void *adr, char *file, int line)
static std::atomic_int _message_mask_user
static std::mutex _request_list_mutex
INT bm_get_buffer_level(INT buffer_handle, INT *n_bytes)
static TRANS_TABLE _deferred_trans_table[]
static int _rpc_listen_socket
std::string msprintf(const char *format,...)
void * dbg_malloc(unsigned int size, char *file, int line)
static const char * tid_name_old[]
static std::vector< std::string > split(const char *sep, const std::string &s)
int cm_write_event_to_odb(HNDLE hDB, HNDLE hKey, const EVENT_HEADER *pevent, INT format)
INT bm_init_buffer_counters(INT buffer_handle)
#define DIR_SEPARATOR_STR
#define DEFAULT_WATCHDOG_TIMEOUT
#define DEFAULT_RPC_TIMEOUT
#define PROGRAM_INFO_STR(_name)
#define MIN_WRITE_CACHE_SIZE
#define MAX_WRITE_CACHE_EVENT_SIZE_DIV
#define DEFAULT_MAX_EVENT_SIZE
#define WATCHDOG_INTERVAL
#define MAX_EVENT_REQUESTS
#define BANK_FORMAT_64BIT_ALIGNED
#define BANK_FORMAT_32BIT
std::vector< std::string > STRING_LIST
#define MAX_WRITE_CACHE_SIZE_DIV
#define TRANSITION_ERROR_STRING_LENGTH
#define BANK_FORMAT_VERSION
#define message(type, str)
#define write(n, a, f, d)
static std::string remove(const std::string s, char c)
int gettimeofday(struct timeval *tp, void *tzp)
struct callback_addr callback
TH1X EXPRT * h1_book(const char *name, const char *title, int bins, double min, double max)
BUFFER_CLIENT client[MAX_CLIENTS]
BUFFER_INFO(BUFFER *pbuf)
int client_count_write_wait[MAX_CLIENTS]
DWORD client_time_write_wait[MAX_CLIENTS]
int client_count_write_wait[MAX_CLIENTS]
char buffer_name[NAME_LENGTH]
EVENT_HANDLER * dispatcher
NET_COMMAND_HEADER header
unsigned int max_event_size
std::atomic< std::thread * > thread
std::atomic_bool finished
std::vector< int > wait_for_index
std::string waiting_for_client
std::vector< std::unique_ptr< TrClient > > clients
unsigned short host_port1
unsigned short host_port2
unsigned short host_port3
std::vector< exptab_entry > exptab
std::mutex event_sock_mutex
static double fac(double a)
static te_expr * list(state *s)