16#include "git-revision.h"
22#include <sys/resource.h>
62#ifndef DOXYGEN_SHOULD_SKIP_THIS
161extern unsigned _stklen = 60000U;
274 "\"exptab\" file not found and MIDAS_DIR or MIDAS_EXPTAB environment variable is not defined"},
331 f =
fopen(
"mem.txt",
"w");
364 f =
fopen(
"mem.txt",
"w");
372static std::vector<std::string>
split(
const char*
sep,
const std::string& s)
375 std::vector<std::string> v;
376 std::string::size_type pos = 0;
378 std::string::size_type next = s.find(
sep, pos);
379 if (next == std::string::npos) {
380 v.push_back(s.substr(pos));
383 v.push_back(s.substr(pos, next-pos));
389static std::string
join(
const char*
sep,
const std::vector<std::string>& v)
393 for (
unsigned i=0;
i<v.size();
i++) {
407 return s[s.length()-1] ==
c;
412 va_start(
ap, format);
415 char *buffer = (
char *)
malloc(size);
420 std::string s(buffer);
463 return msprintf(
"unlisted status code %d", code);
511 if (pos != std::string::npos) {
523 for (
size_t i = 0;
i <
flist.size();
i++) {
524 const char *p =
flist[
i].c_str();
525 if (
strchr(p,
'_') ==
NULL && !(p[0] >=
'0' && p[0] <=
'9')) {
526 size_t pos =
flist[
i].rfind(
'.');
527 if (pos != std::string::npos) {
549 *filename = std::string(
fac) +
".log";
671 fprintf(
stderr,
"cm_msg_log: Message \"%s\" not written to midas.log because rpc_call(RPC_CM_MSG_LOG) failed with status %d\n",
message,
status);
675 fprintf(
stderr,
"cm_msg_log: Message \"%s\" not written to midas.log, no connection to mserver\n",
message);
698 "cm_msg_log: Error: Cannot symlink message log file \'%s' to \'%s\', symlink() errno: %d (%s)\n",
707 "cm_msg_log: Message \"%s\" not written to midas.log because open(%s) failed with errno %d (%s)\n",
739 }
else if (
wr != len) {
740 fprintf(
stderr,
"cm_msg_log: Message \"%s\" not written to \"%s\", short write() wrote %d instead of %d bytes\n",
message, filename.c_str(), (
int)
wr, (
int)len);
754 const char*
pc = filename +
strlen(filename);
755 while (*
pc !=
'\\' && *
pc !=
'/' &&
pc != filename)
770 if (message_type &
MT_LOG)
782 if (
name.length() > 0)
791 }
else if (message_type ==
MT_USER) {
799 for (
int i=0;
i<10;
i++) {
828 if (message_type !=
MT_LOG) {
870 for (
i = 0;
i < 100;
i++) {
915INT cm_msg(
INT message_type,
const char *filename,
INT line,
const char *routine,
const char *format, ...)
931 if (message_type !=
MT_LOG) {
974 const char *
facility,
const char *routine,
const char *format, ...) {
1081 (*messages)[*
length] =
'\n';
1092 (*messages)[*
length] = 0;
1110 cm_msg(
MERROR,
"cm_msg_retrieve1",
"Cannot open log file \"%s\", errno %d (%s)", filename,
errno,
1127 char *buffer = (
char *)
malloc(size + 1);
1129 if (buffer ==
NULL) {
1130 cm_msg(
MERROR,
"cm_msg_retrieve1",
"Cannot malloc %d bytes to read log file \"%s\", errno %d (%s)", (
int) size,
1139 cm_msg(
MERROR,
"cm_msg_retrieve1",
"Cannot read %d bytes from log file \"%s\", read() returned %d, errno %d (%s)",
1148 p = buffer + size - 1;
1152 while (*p ==
'\n' || *p ==
'\r')
1156 for (
n = 0; !
stop && p > buffer;) {
1160 for (
i = 0; p != buffer && (*p !=
'\n' && *p !=
'\r');
i++)
1164 if (
i >= (
int)
sizeof(
str))
1165 i =
sizeof(
str) - 1;
1186 if (
str[0] >=
'0' &&
str[0] <=
'9') {
1200 for (
i = 0;
i < 12;
i++)
1222 if (t == 0 ||
tstamp == -1 ||
1231 while (*p ==
'\n' || *p ==
'\r')
1499 assert(path[0] != 0);
1546 assert(path !=
NULL);
1592#ifdef LOCAL_ROUTINES
1619 if (
getenv(
"MIDAS_DIR")) {
1624 if (
getenv(
"MIDAS_EXPT_NAME")) {
1628 cm_msg(
MERROR,
"cm_read_exptab",
"Experiments that use MIDAS_DIR must also set MIDAS_EXPT_NAME to the name of the experiment! Using experiment name \"%s\"",
e.name.c_str());
1631 e.directory =
getenv(
"MIDAS_DIR");
1640#if defined (OS_WINNT)
1642 if (
getenv(
"SystemRoot"))
1644 else if (
getenv(
"windir"))
1650 str +=
"\\system32\\exptab";
1651 alt_str +=
"\\system\\exptab";
1652#elif defined (OS_UNIX)
1653 std::string
str =
"/etc/exptab";
1654 std::string
alt_str =
"/exptab";
1656 std::strint
str =
"exptab";
1657 std::string
alt_str =
"exptab";
1661 if (
getenv(
"MIDAS_EXPTAB")) {
1680 memset(buf, 0,
sizeof(buf));
1681 char*
str =
fgets(buf,
sizeof(buf)-1, f);
1684 if (
str[0] == 0)
continue;
1685 if (
str[0] ==
'#')
continue;
1709 e.
name = std::string(
p1, len);
1733 e.directory = std::string(
p1, len);
1754 e.user = std::string(
p1, len);
1768 for (
unsigned j=0;
j<exptab->
exptab.size();
j++) {
1769 cm_msg(
MINFO,
"cm_read_exptab",
"entry %d, experiment \"%s\", directory \"%s\", user \"%s\"",
j, exptab->
exptab[
j].name.c_str(), exptab->
exptab[
j].directory.c_str(), exptab->
exptab[
j].user.c_str());
1873#ifdef LOCAL_ROUTINES
1894 char *client_name,
INT hw_type,
const char *password,
DWORD watchdog_timeout) {
1899#ifdef LOCAL_ROUTINES
1949 strcpy(
name, client_name);
1980 sprintf(
str,
"System/Clients/%0d/Name", pid);
1984 cm_msg(
MERROR,
"cm_set_client_info",
"cannot set client name, db_set_value(%s) status %d",
str,
status);
1989 strcpy(client_name,
name);
2025 size =
sizeof(watchdog_timeout);
2164#ifdef LOCAL_ROUTINES
2188 cm_msg(
MERROR,
"cm_set_experiment_local",
"Experiment \"%s\" directory \"%s\" does not exist",
exp_name1.c_str(),
expdir.c_str());
2203 cm_msg(
MERROR,
"cm_check_connect",
"cm_disconnect_experiment not called at end of program");
2298 const char *client_name,
void (*func)(
char *),
INT odb_size,
DWORD watchdog_timeout) {
2356#ifdef LOCAL_ROUTINES
2370 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create alarm semaphore");
2375 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create elog semaphore");
2380 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create history semaphore");
2385 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot create message semaphore");
2404 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot open database, db_open_database() status %d",
status);
2415 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot get ODB /Experiment/ODB timeout, status %d",
status);
2426 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot get ODB /Experiment/Protect ODB, status %d",
status);
2437 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot get ODB /Experiment/Enable core dumps, status %d",
status);
2447 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot setrlimit(RLIMIT_CORE, RLIM_INFINITY), errno %d (%s)",
errno,
2451#warning setrlimit(RLIMIT_CORE) is not available
2460 "cannot get ODB /Experiment/Security/Enable non-localhost RPC, status %d",
status);
2472 if (watchdog_timeout == 0)
2512 cm_msg(
MERROR,
"cm_connect_experiment1",
"cannot open message buffer, cm_msg_open_buffer() status %d",
status);
2536 cm_msg(
MERROR,
"cm_connect_experiment",
"Cannot register RPC server, cm_register_server() status %d",
status);
2544 size =
sizeof(watchdog_timeout);
2545 sprintf(
str,
"/Programs/%s/Watchdog Timeout", client_name);
2551 std::string path =
"/Programs/" + std::string(client_name);
2554 prog[
"Start command"] == std::string(
""))
2578#ifdef LOCAL_ROUTINES
2635 cm_msg(
MERROR,
"cm_list_experiments_remote",
"Cannot connect to \"%s\" port %d: %s",
hname, port,
errmsg.c_str());
2640 send(sock,
"I", 2, 0);
2661#ifdef LOCAL_ROUTINES
2681 if (
expts.size() == 1) {
2683 }
else if (
expts.size() > 1) {
2684 printf(
"Available experiments on local computer:\n");
2686 for (
unsigned i = 0;
i <
expts.size();
i++) {
2691 printf(
"Select number from 0 to %d: ", ((
int)
expts.size())-1);
2730 if (
expts.size() > 1) {
2733 for (
unsigned i = 0;
i <
expts.size();
i++) {
2738 printf(
"Select number from 0 to %d: ", ((
int)
expts.size())-1);
2854 printf(
"Waiting for transition to finish...\n");
2877 cm_msg(
MLOG,
"cm_disconnect_experiment",
"Program %s on host %s stopped", client_name.c_str(),
local_host_name.c_str());
2955#ifndef DOXYGEN_SHOULD_SKIP_THIS
3030#ifndef DOXYGEN_SHOULD_SKIP_THIS
3071#ifdef LOCAL_ROUTINES
3110 printf(
"lock_buffer_guard(invalid) dtor\n");
3246#ifdef LOCAL_ROUTINES
3269 pclient->watchdog_timeout = timeout;
3340#ifdef LOCAL_ROUTINES
3349#ifndef DOXYGEN_SHOULD_SKIP_THIS
3397 "Cannot resize the RPC hosts access control list, db_set_num_values(%d) status %d",
new_size,
status);
3410 strcpy(buf,
"localhost");
3416 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot create the RPC hosts access control list, db_get_value() status %d",
3426 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot create \"Disable RPC hosts check\", db_get_value() status %d",
status);
3436 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot find the RPC hosts access control list, db_find_key() status %d",
3446 cm_msg(
MERROR,
"init_rpc_hosts",
"Cannot watch the RPC hosts access control list, db_watch() status %d",
status);
3481 size =
sizeof(
name);
3485 cm_msg(
MERROR,
"cm_register_server",
"cannot get client name, db_get_value() status %d",
status);
3492 size =
sizeof(port);
3496 cm_msg(
MERROR,
"cm_register_server",
"cannot get RPC port number, db_get_value(%s) status %d",
str,
status);
3504 cm_msg(
MERROR,
"cm_register_server",
"error, rpc_register_server(port=%d) status %d", port,
status);
3517 cm_msg(
MERROR,
"cm_register_server",
"error, db_find_key(\"Server Port\") status %d",
status);
3527 cm_msg(
MERROR,
"cm_register_server",
"error, db_set_data(\"Server Port\"=%d) status %d", port,
status);
3635 tt.sequence_number = sequence_number;
3748 }
else if (
count > 1) {
3795 "Cannot set client run state, client hKey %d into /System/Clients is not valid, maybe this client was removed by a watchdog timeout",
3816#ifndef DOXYGEN_SHOULD_SKIP_THIS
3871 cm_msg(
MERROR,
"cm_register_deferred_transition",
"Cannot hotlink /Runinfo/Requested Transition");
3906 cm_msg(
MERROR,
"cm_check_deferred_transition",
"Cannot perform deferred transition: %s",
str);
3922#ifndef DOXYGEN_SHOULD_SKIP_THIS
3977 return arg1->sequence_number <
arg2->sequence_number;
4007 const char *buf =
"Success";
4017 tr->end_time = end_time;
4019 tr->errorstr = errorstr;
4021 tr->errorstr =
"(null)";
4085 const char *
args[100];
4103 path +=
"mtransition";
4153 if (errstr !=
NULL) {
4154 sprintf(errstr,
"Cannot execute mtransition, ss_spawnv() returned %d",
status);
4169 int connect_timeout = 10000;
4170 int timeout = 120000;
4195 for (
size_t i = 0;
i <
tr_client->wait_for_index.size();
i++) {
4198 assert(wait_for_index >= 0);
4199 assert(wait_for_index < (
int)s->
clients.size());
4228 printf(
"Client \"%s\" waits for client \"%s\"\n",
tr_client->client_name.c_str(),
wait_for->client_name.c_str());
4235 cm_msg(
MERROR,
"cm_transition_call",
"Client \"%s\" transition %d aborted while waiting for client \"%s\": \"/Runinfo/Transition in progress\" was cleared",
tr_client->client_name.c_str(),
tr_client->transition,
wait_for->client_name.c_str());
4251 printf(
"Connecting to client \"%s\" on host %s...\n",
tr_client->client_name.c_str(),
tr_client->host_name.c_str());
4253 cm_msg(
MINFO,
"cm_transition_call",
"cm_transition_call: Connecting to client \"%s\" on host %s...",
tr_client->client_name.c_str(),
tr_client->host_name.c_str());
4256 size =
sizeof(timeout);
4259 if (connect_timeout < 1000)
4260 connect_timeout = 1000;
4263 size =
sizeof(timeout);
4273 tr_client->connect_timeout = connect_timeout;
4288 "cannot connect to client \"%s\" on host %s, port %d, status %d",
4312 printf(
"Connection established to client \"%s\" on host %s\n",
tr_client->client_name.c_str(),
tr_client->host_name.c_str());
4315 "cm_transition: Connection established to client \"%s\" on host %s",
4327 printf(
"Executing RPC transition client \"%s\" on host %s...\n",
4331 "cm_transition: Executing RPC transition client \"%s\" on host %s...",
4359 printf(
"RPC transition finished client \"%s\" on host \"%s\" in %d ms with status %d\n",
4363 "cm_transition: RPC transition finished client \"%s\" on host \"%s\" in %d ms with status %d",
4383 printf(
"hconn %d cm_transition_call(%s) finished init %d connect %d end %d rpc %d end %d xxx %d end %d\n",
4430 for (
size_t i = 0;
i <
n;
i++) {
4438 printf(
"Calling local transition callback\n");
4440 cm_msg(
MINFO,
"cm_transition_call_direct",
"cm_transition: Calling local transition callback");
4456 printf(
"Local transition callback finished, status %d\n",
int(
tr_client->status));
4458 cm_msg(
MINFO,
"cm_transition_call_direct",
"cm_transition: Local transition callback finished, status %d",
int(
tr_client->status));
4471 cm_msg(
MERROR,
"cm_transition_call_direct",
"no handler for transition %d with sequence number %d",
tr_client->transition,
tr_client->sequence_number);
4538 errstr_size =
sizeof(
xerrstr);
4554 mstrlcpy(errstr,
"Invalid transition request", errstr_size);
4565 sprintf(errstr,
"Start/Stop transition %d already in progress, please try again later\n",
i);
4566 mstrlcat(errstr,
"or set \"/Runinfo/Transition in progress\" manually to zero.\n", errstr_size);
4568 cm_msg(
MERROR,
"cm_transition",
"another transition is already in progress");
4627 cm_msg(
MERROR,
"cm_transition",
"Run start abort due to alarms: %s",
alarms.c_str());
4628 mstrlcpy(errstr,
"Cannot start run due to alarms: ", errstr_size);
4660 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info required, status %d",
status);
4669 cm_msg(
MERROR,
"cm_transition",
"Run start abort due to program \"%s\" not running",
key.
name);
4685 mstrlcpy(errstr,
"Unknown error", errstr_size);
4687 if (debug_flag == 0) {
4714 if (debug_flag == 1)
4716 if (debug_flag == 2)
4721 cm_msg(
MERROR,
"cm_transition",
"cannot set Runinfo/Run number in database, status %d",
status);
4727 if (debug_flag == 1)
4728 printf(
"Clearing /Runinfo/Requested transition\n");
4729 if (debug_flag == 2)
4730 cm_msg(
MINFO,
"cm_transition",
"cm_transition: Clearing /Runinfo/Requested transition");
4738 cm_msg(
MERROR,
"cm_transition",
"cannot find System/Clients entry in database");
4740 mstrlcpy(errstr,
"Cannot find /System/Clients in ODB", errstr_size);
4749 mstrlcpy(errstr,
"Deferred transition already in progress", errstr_size);
4750 mstrlcat(errstr,
", to cancel, set \"/Runinfo/Requested transition\" to zero", errstr_size);
4766 size =
sizeof(sequence_number);
4775 if (debug_flag == 1)
4776 printf(
"---- Transition %s deferred by client \"%s\" ----\n",
trname.c_str(),
str);
4777 if (debug_flag == 2)
4778 cm_msg(
MINFO,
"cm_transition",
"cm_transition: ---- Transition %s deferred by client \"%s\" ----",
trname.c_str(),
str);
4780 if (debug_flag == 1)
4781 printf(
"Setting /Runinfo/Requested transition\n");
4782 if (debug_flag == 2)
4783 cm_msg(
MINFO,
"cm_transition",
"cm_transition: Setting /Runinfo/Requested transition");
4794 sprintf(errstr,
"Transition %s deferred by client \"%s\"",
trname.c_str(),
str);
4828 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info auto start, status %d",
status);
4834 start_command[0] = 0;
4836 size =
sizeof(start_command);
4839 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info start command, status %d",
status);
4843 if (start_command[0]) {
4844 cm_msg(
MINFO,
"cm_transition",
"Auto Starting program \"%s\", command \"%s\"",
key.
name,
4879 size =
sizeof(
state);
4885 cm_msg(
MERROR,
"cm_transition",
"cannot get Runinfo/State in database");
4892 cm_msg(
MERROR,
"cm_transition",
"cannot set \"Runinfo/Stop Time binary\" in database");
4899 cm_msg(
MERROR,
"cm_transition",
"cannot set \"Runinfo/Stop Time\" in database");
4905 cm_msg(
MERROR,
"cm_transition",
"cannot find System/Clients entry in database");
4907 mstrlcpy(errstr,
"Cannot find /System/Clients in ODB", errstr_size);
4937 if (debug_flag == 1)
4938 printf(
"---- Transition %s started ----\n",
trname.c_str());
4939 if (debug_flag == 2)
4940 cm_msg(
MINFO,
"cm_transition",
"cm_transition: ---- Transition %s started ----",
trname.c_str());
4946 for (
int i = 0,
status = 0;;
i++) {
4963 size =
sizeof(sequence_number);
4972 c->async_flag = async_flag;
4973 c->debug_flag = debug_flag;
4974 c->sequence_number = sequence_number;
4980 size =
sizeof(client_name);
4982 c->client_name = client_name;
4995 size =
sizeof(port);
5005 if (
cc->client_name ==
c->client_name)
5006 if (
cc->host_name ==
c->host_name)
5007 if (
cc->port ==
c->port)
5008 if (
cc->sequence_number ==
c->sequence_number)
5013 s.
clients.push_back(std::unique_ptr<TrClient>(
c));
5016 cm_msg(
MERROR,
"cm_transition",
"transition %s: client \"%s\" is registered with sequence number %d more than once",
trname.c_str(),
c->client_name.c_str(),
c->sequence_number);
5034 for (
size_t i =
idx - 1; ;
i--) {
5036 if (s.
clients[
i]->sequence_number > 0) {
5062 if (debug_flag == 1)
5063 printf(
"\n==== Found client \"%s\" with sequence number %d\n",
5065 if (debug_flag == 2)
5067 "cm_transition: ==== Found client \"%s\" with sequence number %d",
5085 cm_msg(
MERROR,
"cm_transition",
"transition %s aborted: client \"%s\" returned status %d",
trname.c_str(),
5112 cm_msg(
MERROR,
"cm_transition",
"transition %s aborted: \"/Runinfo/Transition in progress\" was cleared",
trname.c_str());
5115 mstrlcpy(errstr,
"Canceled", errstr_size);
5140 if (debug_flag == 1)
5141 printf(
"\n---- Transition %s finished ----\n",
trname.c_str());
5142 if (debug_flag == 2)
5143 cm_msg(
MINFO,
"cm_transition",
"cm_transition: ---- Transition %s finished ----",
trname.c_str());
5158 size =
sizeof(
state);
5161 cm_msg(
MERROR,
"cm_transition",
"cannot set Runinfo/State in database, db_set_value() status %d",
status);
5212 cm_msg(
MERROR,
"cm_transition",
"Cannot get program info auto stop, status %d",
status);
5230 mstrlcpy(errstr,
"Success", errstr_size);
5244 cm_msg(
MERROR,
"cm_transition",
"Could not start a run: cm_transition() status %d, message \'%s\'",
status,
5293 cm_msg(
MERROR,
"cm_transition",
"previous transition did not finish yet");
5308 mstrlcpy(errstr,
"Invalid transition request", errstr_size);
5316 int size =
sizeof(
i);
5320 sprintf(errstr,
"Start/Stop transition %d already in progress, please try again later\n",
i);
5321 mstrlcat(errstr,
"or set \"/Runinfo/Transition in progress\" manually to zero.\n", errstr_size);
5323 cm_msg(
MERROR,
"cm_transition",
"another transition is already in progress");
5382#ifndef DOXYGEN_SHOULD_SKIP_THIS
5443 printf(
"Received 2nd Ctrl-C, hard abort\n");
5446 printf(
"Received Ctrl-C, aborting...\n");
5506 std::string command;
5511 cm_msg(
MERROR,
"cm_exec_script",
"Script ODB \"%s\" of type TID_STRING, db_get_value_string() error %d",
5516 for (
int i = 0;;
i++) {
5532 int size =
subkey.item_size;
5533 char *buf = (
char *)
malloc(size);
5534 assert(buf !=
NULL);
5537 cm_msg(
MERROR,
"cm_exec_script",
"Script ODB \"%s/%s\" of type %d, db_get_data() error %d",
5551 cm_msg(
MERROR,
"cm_exec_script",
"Script ODB \"%s\" has invalid type %d, should be TID_STRING or TID_KEY",
5558 if (command.length() > 0) {
5752 mstrlcpy(buf, command,
sizeof(buf));
5753 cm_msg(
MERROR,
"cm_execute",
"cm_execute(%s...) is disabled by ODB \"/Experiment/Enable cm_execute\"", buf);
5758 strcpy(
str, command);
5768 result[
MAX(0,
n)] = 0;
5787#ifndef DOXYGEN_SHOULD_SKIP_THIS
5853 p +=
"/Logger/History/";
5855 p +=
"/History dir";
5916#ifdef LOCAL_ROUTINES
5933 }
else if (
idx >
pbuf->buffer_header->max_client_index) {
5952 printf(
"bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, badindex %d, pid=%d\n",
5953 pbuf,
pbuf->buffer_header->name,
pbuf->client_index,
pbuf->buffer_header->max_client_index,
5956 printf(
"bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, badclient %d\n",
5957 pbuf,
pbuf->buffer_header->name,
pbuf->client_index,
pbuf->buffer_header->max_client_index,
5958 pbuf->buffer_header->client[
idx].name,
pbuf->buffer_header->client[
idx].pid,
5961 printf(
"bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, goodclient\n",
5962 pbuf,
pbuf->buffer_header->name,
pbuf->client_index,
pbuf->buffer_header->max_client_index,
5963 pbuf->buffer_header->client[
idx].name,
pbuf->buffer_header->client[
idx].pid,
5975 cm_msg(
MERROR,
"bm_validate_client_index",
"My client index %d in buffer \'%s\' is invalid, max_client_index %d, my pid %d",
idx,
pbuf->buffer_header->name,
pbuf->buffer_header->max_client_index,
ss_getpid());
5977 cm_msg(
MERROR,
"bm_validate_client_index",
"My client index %d in buffer \'%s\' is invalid: client name \'%s\', pid %d should be my pid %d",
idx,
pbuf->buffer_header->name,
pbuf->buffer_header->client[
idx].name,
pbuf->buffer_header->client[
idx].pid,
ss_getpid());
5980 cm_msg(
MERROR,
"bm_validate_client_index",
"Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...");
5984 fprintf(
stderr,
"bm_validate_client_index: My client index %d in buffer \'%s\' is invalid, max_client_index %d, my pid %d\n",
idx,
pbuf->buffer_header->name,
pbuf->buffer_header->max_client_index,
ss_getpid());
5986 fprintf(
stderr,
"bm_validate_client_index: My client index %d in buffer \'%s\' is invalid: client name \'%s\', pid %d should be my pid %d\n",
idx,
pbuf->buffer_header->name,
pbuf->buffer_header->client[
idx].name,
pbuf->buffer_header->client[
idx].pid,
ss_getpid());
5989 fprintf(
stderr,
"bm_validate_client_index: Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...\n");
6029#ifdef LOCAL_ROUTINES
6071 pheader =
pbuf->buffer_header;
6079 "Client \'%s\' on buffer \'%s\' removed by %s because process pid %d does not exist",
pbclient->name,
6091 printf(
"buffer [%s] client [%-32s] times 0x%08x 0x%08x, diff 0x%08x %5d, timeout %d\n",
6103 cm_msg(
MINFO,
"bm_cleanup",
"Client \'%s\' on buffer \'%s\' removed by %s (idle %1.1lfs, timeout %1.0lfs)",
6106 pbclient->watchdog_timeout / 1000.0);
6129 if (
pbuf->attached) {
6154#ifdef LOCAL_ROUTINES
6168 if (
pbuf->attached) {
6187#ifdef LOCAL_ROUTINES
6192 "error: buffer \"%s\" is corrupted: rp %d is invalid. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6205 "error: buffer \"%s\" is corrupted: rp %d plus event header point beyond the end of buffer by %d bytes. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6229 if (
gRpLog && (total_size < 16)) {
6230 const char *
pdata = (
const char *) (pheader + 1);
6232 fprintf(
gRpLog,
"%s: rp %d, total_size %d, at rp 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x\n", pheader->
name, rp, total_size,
6233 pevent[0], pevent[1], pevent[2], pevent[3], pevent[4], pevent[5]);
6239 assert(total_size > 0);
6243 if (rp >= pheader->
size) {
6244 rp -= pheader->
size;
6263 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6288 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, remaining %d, called from %s",
6309 const char *
pdata = (
const char *) (pheader + 1);
6319 "buffer \"%s\" is corrupted: invalid read pointer %d. Size %d, write pointer %d", pheader->
name,
6326 "buffer \"%s\" is corrupted: invalid write pointer %d. Size %d, read pointer %d", pheader->
name,
6332 cm_msg(
MERROR,
"bm_validate_buffer",
"buffer \"%s\" is corrupted: read pointer %d is invalid", pheader->
name,
6341 cm_msg(
MERROR,
"bm_validate_buffer",
"buffer \"%s\" is corrupted: invalid rp %d, last good event at rp %d",
6349 "buffer \"%s\" is corrupted: invalid event at rp %d, last good event at rp %d", pheader->
name, rp,
rp0);
6372 int rp =
c->read_pointer;
6379 "buffer \"%s\" is corrupted for client \"%s\" rp %d: invalid event at rp %d, last good event at rp %d",
6380 pheader->
name,
c->name,
c->read_pointer, rp,
rp0);
6413 sprintf(
str,
"/System/buffers/%s/Clients/%s/writes_blocked_by",
pbuf->buffer_name,
pbuf->client_name);
6488 double buf_size = pheader->
size;
6564 if (!
pbuf->client_count_write_wait[
i])
6596 if (
pbuf->count_lock ==
pbuf->last_count_lock) {
6602 std::string client_name =
pbuf->client_name;
6607 cm_msg(
MERROR,
"bm_write_buffer_statistics_to_odb",
"Invalid empty buffer name \"%s\" or client name \"%s\"",
buffer_name.c_str(), client_name.c_str());
6611 pbuf->last_count_lock =
pbuf->count_lock;
6615 int client_index =
pbuf->client_index;
6640 cm_msg(
MERROR,
who,
"invalid buffer handle %d: out of range [1..%d]", buffer_handle, (
int)
nbuf);
6648 cm_msg(
MERROR,
who,
"invalid buffer handle %d: empty slot", buffer_handle);
6654 if (!
pbuf->attached) {
6656 cm_msg(
MERROR,
who,
"invalid buffer handle %d: not attached", buffer_handle);
6732 int size =
sizeof(
INT);
6736 cm_msg(
MERROR,
"bm_open_buffer",
"Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6743#ifdef LOCAL_ROUTINES
6753 cm_msg(
MERROR,
"bm_open_buffer",
"cannot open buffer with zero name");
6770 std::string odb_path;
6771 odb_path +=
"/Experiment/Buffer sizes/";
6774 int size =
sizeof(
INT);
6779 "Cannot open buffer \"%s\", invalid buffer size %d in ODB \"%s\", maximum buffer size is %d",
6790 cm_msg(
MERROR,
"bm_open_buffer",
"Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6800 *buffer_handle =
i + 1;
6820 *buffer_handle =
i + 1;
6835 pbuf->client_time_write_wait[
i] = 0;
6851 mstrlcpy(
pbuf->client_name, client_name.c_str(),
sizeof(
pbuf->client_name));
6858 pbuf->attached =
true;
6893 pheader->
size = buffer_size;
6903 "Buffer \"%s\" is corrupted, mismatch of buffer name in shared memory \"%s\"",
buffer_name,
6914 cm_msg(
MERROR,
"bm_open_buffer",
"Buffer \"%s\" is corrupted, num_clients %d exceeds MAX_CLIENTS %d",
6925 cm_msg(
MERROR,
"bm_open_buffer",
"Buffer \"%s\" is corrupted, max_client_index %d exceeds MAX_CLIENTS %d",
6933 if (pheader->
size != buffer_size) {
6934 cm_msg(
MINFO,
"bm_open_buffer",
"Buffer \"%s\" requested size %d differs from existing size %d",
6937 buffer_size = pheader->
size;
6952 pheader =
pbuf->buffer_header;
6958 pbuf->attached =
true;
6960 pbuf->shm_handle = shm_handle;
6961 pbuf->shm_size = shm_size;
6969 "buffer \'%s\' is corrupted, bm_validate_buffer() status %d, calling bm_reset_buffer()...",
buffer_name,
7039 *buffer_handle =
i+1;
7044 *buffer_handle =
gBuffers.size() + 1;
7081 *buffer_handle =
i + 1;
7102#ifdef LOCAL_ROUTINES
7145 pbuf->read_cache_mutex.unlock();
7152 pbuf->write_cache_mutex.unlock();
7153 pbuf->read_cache_mutex.unlock();
7160 pbuf->attached =
false;
7187 if (
pbuf->read_cache_size > 0) {
7188 free(
pbuf->read_cache);
7190 pbuf->read_cache_size = 0;
7191 pbuf->read_cache_rp = 0;
7192 pbuf->read_cache_wp = 0;
7195 if (
pbuf->write_cache_size > 0) {
7196 free(
pbuf->write_cache);
7198 pbuf->write_cache_size = 0;
7199 pbuf->write_cache_rp = 0;
7200 pbuf->write_cache_wp = 0;
7220 pbuf->shm_handle = 0;
7226 pbuf->write_cache_mutex.unlock();
7227 pbuf->read_cache_mutex.unlock();
7247#ifdef LOCAL_ROUTINES
7255 for (
size_t i =
nbuf;
i > 0;
i--) {
7281#ifdef LOCAL_ROUTINES
7319#ifdef LOCAL_ROUTINES
7338 for (
i = 0;
i < 20;
i++) {
7360#ifdef LOCAL_ROUTINES
7375#ifdef LOCAL_ROUTINES
7432 size =
sizeof(client_name);
7444 size =
sizeof(port);
7459 cm_msg(
MERROR,
"cm_shutdown",
"Cannot connect to client \'%s\' on host \'%s\', port %d",
7462 cm_msg(
MERROR,
"cm_shutdown",
"Killing and Deleting client \'%s\' pid %d", client_name,
7468 cm_msg(
MERROR,
"cm_shutdown",
"Cannot delete client info for client \'%s\', pid %d, status %d",
7485 cm_msg(
MERROR,
"cm_shutdown",
"Client \'%s\' not responding to shutdown command", client_name);
7487 cm_msg(
MERROR,
"cm_shutdown",
"Killing and Deleting client \'%s\' pid %d", client_name,
7493 "Cannot delete client info for client \'%s\', pid %d, status %d",
name,
client_pid,
7547 size =
sizeof(client_name);
7614#ifdef LOCAL_ROUTINES
7629 if (
pbuf->attached) {
7646 (client_name ==
NULL || client_name[0] == 0
7661 "Client \'%s\' on \'%s\' removed by cm_cleanup (idle %1.1lfs, timeout %1.0lfs)",
7711 const char *s =
str;
7744 printf(
"test_expand_env: [%s] -> [%s] expected [%s]",
7758 printf(
"Test expand_end()\n");
7777 printf(
"test_expand_env: all tests passed!\n");
7779 printf(
"test_expand_env: test FAILED!\n");
7788#ifndef DOXYGEN_SHOULD_SKIP_THIS
7817#ifdef LOCAL_ROUTINES
7861#ifdef LOCAL_ROUTINES
7881 *n_bytes += pheader->
size;
7885 if (
pbuf->read_cache_size) {
7889 if (
pbuf->read_cache_wp >
pbuf->read_cache_rp)
7890 *n_bytes +=
pbuf->read_cache_wp -
pbuf->read_cache_rp;
7891 pbuf->read_cache_mutex.unlock();
7901#ifdef LOCAL_ROUTINES
7909 fprintf(
stderr,
"bm_lock_buffer_read_cache: Error: Cannot lock read cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...\n",
pbuf->buffer_name);
7910 cm_msg(
MERROR,
"bm_lock_buffer_read_cache",
"Cannot lock read cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...",
pbuf->buffer_name);
7915 if (!
pbuf->attached) {
7916 pbuf->read_cache_mutex.unlock();
7917 fprintf(
stderr,
"bm_lock_buffer_read_cache: Error: Cannot lock read cache of buffer \"%s\", buffer was closed while we waited for the buffer_mutex\n",
pbuf->buffer_name);
7930 fprintf(
stderr,
"bm_lock_buffer_write_cache: Error: Cannot lock write cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...\n",
pbuf->buffer_name);
7931 cm_msg(
MERROR,
"bm_lock_buffer_write_cache",
"Cannot lock write cache of buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...",
pbuf->buffer_name);
7936 if (!
pbuf->attached) {
7937 pbuf->write_cache_mutex.unlock();
7938 fprintf(
stderr,
"bm_lock_buffer_write_cache: Error: Cannot lock write cache of buffer \"%s\", buffer was closed while we waited for the buffer_mutex\n",
pbuf->buffer_name);
7953 fprintf(
stderr,
"bm_lock_buffer_mutex: Error: Cannot lock buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...\n",
pbuf->buffer_name);
7954 cm_msg(
MERROR,
"bm_lock_buffer_mutex",
"Cannot lock buffer \"%s\", ss_timed_mutex_wait_for_sec() timeout, aborting...",
pbuf->buffer_name);
7959 if (!
pbuf->attached) {
7960 pbuf->buffer_mutex.unlock();
7961 fprintf(
stderr,
"bm_lock_buffer_mutex: Error: Cannot lock buffer \"%s\", buffer was closed while we waited for the buffer_mutex\n",
pbuf->buffer_name);
7994 fprintf(
stderr,
"bm_lock_buffer: Lock buffer \"%s\" is taking longer than 1 second!\n",
pbuf->buffer_name);
7999 fprintf(
stderr,
"bm_lock_buffer: Lock buffer \"%s\" is taking longer than 10 seconds, buffer semaphore is probably stuck, delete %s.SHM and try again!\n",
pbuf->buffer_name,
pbuf->buffer_name);
8001 if (
pbuf->buffer_header) {
8003 fprintf(
stderr,
"bm_lock_buffer: Buffer \"%s\" client %d \"%s\" pid %d\n",
pbuf->buffer_name,
i,
pbuf->buffer_header->client[
i].name,
pbuf->buffer_header->client[
i].pid);
8010 fprintf(
stderr,
"bm_lock_buffer: Error: Cannot lock buffer \"%s\", ss_semaphore_wait_for() status %d, aborting...\n",
pbuf->buffer_name,
status);
8011 cm_msg(
MERROR,
"bm_lock_buffer",
"Cannot lock buffer \"%s\", ss_semaphore_wait_for() status %d, aborting...",
pbuf->buffer_name,
status);
8019 assert(!
pbuf->locked);
8024 if (
pbuf->buffer_header->client[x].unused1 != 0) {
8025 printf(
"lllock [%s] unused1 %d pid %d\n",
pbuf->buffer_name,
pbuf->buffer_header->client[x].unused1,
getpid());
8028 pbuf->buffer_header->client[x].unused1 =
getpid();
8042 if (
pbuf->attached) {
8043 if (
pbuf->buffer_header->client[x].unused1 !=
getpid()) {
8044 printf(
"unlock [%s] unused1 %d pid %d\n",
pbuf->buffer_header->name,
pbuf->buffer_header->client[x].unused1,
getpid());
8046 pbuf->buffer_header->client[x].unused1 = 0;
8048 printf(
"unlock [??????] unused1 ????? pid %d\n",
getpid());
8053 assert(
pbuf->locked);
8057 pbuf->buffer_mutex.unlock();
8086#ifdef LOCAL_ROUTINES
8100 pbuf->buffer_header->num_in_events = 0;
8101 pbuf->buffer_header->num_out_events = 0;
8146#ifdef LOCAL_ROUTINES
8176 cm_msg(
MERROR,
"bm_set_cache_size",
"requested write cache size %zu on buffer \"%s\" is too big: buffer size is %d, write cache size will be %zu bytes",
write_size,
pbuf->buffer_name,
pbuf->buffer_header->size,
new_write_size);
8180 pbuf->buffer_mutex.unlock();
8190 if (
pbuf->read_cache_size > 0) {
8191 free(
pbuf->read_cache);
8198 pbuf->read_cache_size = 0;
8199 pbuf->read_cache_rp = 0;
8200 pbuf->read_cache_wp = 0;
8201 pbuf->read_cache_mutex.unlock();
8202 cm_msg(
MERROR,
"bm_set_cache_size",
"not enough memory to allocate read cache for buffer \"%s\", malloc(%zu) failed",
pbuf->buffer_name,
read_size);
8208 pbuf->read_cache_rp = 0;
8209 pbuf->read_cache_wp = 0;
8211 pbuf->read_cache_mutex.unlock();
8221 if (
pbuf->write_cache_size &&
pbuf->write_cache_wp > 0) {
8222 cm_msg(
MERROR,
"bm_set_cache_size",
"buffer \"%s\" lost %zu bytes from the write cache",
pbuf->buffer_name,
pbuf->write_cache_wp);
8226 if (
pbuf->write_cache_size > 0) {
8227 free(
pbuf->write_cache);
8234 pbuf->write_cache_size = 0;
8235 pbuf->write_cache_rp = 0;
8236 pbuf->write_cache_wp = 0;
8237 pbuf->write_cache_mutex.unlock();
8238 cm_msg(
MERROR,
"bm_set_cache_size",
"not enough memory to allocate write cache for buffer \"%s\", malloc(%zu) failed",
pbuf->buffer_name,
write_size);
8244 pbuf->write_cache_rp = 0;
8245 pbuf->write_cache_wp = 0;
8247 pbuf->write_cache_mutex.unlock();
8294 static std::mutex mutex;
8301 std::lock_guard<std::mutex>
lock(mutex);
8311#ifndef DOXYGEN_SHOULD_SKIP_THIS
8363#ifdef LOCAL_ROUTINES
8379 if (func ==
NULL &&
pbuf->callback) {
8381 cm_msg(
MERROR,
"bm_add_event_request",
"mixing callback/non callback requests not possible");
8388 cm_msg(
MERROR,
"bm_add_event_request",
"GET_RECENT request not possible if read cache is enabled");
8398 if (!
pclient->event_request[
i].valid)
8407 pclient->event_request[
i].id = request_id;
8411 pclient->event_request[
i].sampling_type = sampling_type;
8426 if (
i + 1 >
pclient->max_request_index)
8427 pclient->max_request_index =
i + 1;
8467 INT sampling_type,
HNDLE *request_id,
8470 assert(request_id !=
NULL);
8522#ifdef LOCAL_ROUTINES
8544 if (
pclient->event_request[
i].valid &&
pclient->event_request[
i].id == request_id) {
8551 if (
pclient->event_request[
i].valid)
8554 pclient->max_request_index =
i + 1;
8559 for (
i = 0;
i <
pclient->max_request_index;
i++)
8588 if (request_id < 0 ||
size_t(request_id) >=
_request_list.size()) {
8593 int buffer_handle =
_request_list[request_id].buffer_handle;
8630 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8639 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, read pointer %d, size %d",
8648 if (
pclient->read_pointer < 0) {
8650 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8659 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8668 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8733 printf(
"bm_update_read_pointer: [%s] rp %d, wp %d, size %d, min_rp %d, client [%s] rp %d\n",
8774 printf(
"bm_update_read_pointer: [%s] rp %d, wp %d, size %d, new_rp %d, moved\n",
8791 for (
i = 0;
i <
pc->max_request_index;
i++)
8792 if (
pc->event_request[
i].valid)
8812 if (
pc->pid &&
pc->write_wait) {
8829 for (
size_t i = 0;
i <
n;
i++) {
8846 r.
dispatcher(buffer_handle,
i, pevent, (
void *) (pevent + 1));
8853#ifdef LOCAL_ROUTINES
8857 pbuf->read_cache_rp += total_size;
8859 if (
pbuf->read_cache_rp ==
pbuf->read_cache_wp) {
8860 pbuf->read_cache_rp = 0;
8861 pbuf->read_cache_wp = 0;
8867 if (
pbuf->read_cache_rp ==
pbuf->read_cache_wp)
8895 if (!
pc->read_wait) {
8902 if (
pc->read_wait) {
8907 if ((
pc->read_pointer < 0) || (
pc->read_pointer >= pheader->
size)) {
8908 cm_msg(
MERROR,
"bm_peek_buffer_locked",
"event buffer \"%s\" is corrupted: client \"%s\" read pointer %d is invalid. buffer read pointer %d, write pointer %d, size %d", pheader->
name,
pc->name,
pc->read_pointer, pheader->
read_pointer, pheader->
write_pointer, pheader->
size);
8912 char *
pdata = (
char *) (pheader + 1);
8918 if ((total_size <= 0) || (total_size > pheader->
size)) {
8919 cm_msg(
MERROR,
"bm_peek_buffer_locked",
"event buffer \"%s\" is corrupted: client \"%s\" read pointer %d points to invalid event: data_size %d, event_size %d, total_size %d. buffer size: %d, read_pointer: %d, write_pointer: %d", pheader->
name,
pc->name,
pc->read_pointer, pevent->
data_size,
event_size, total_size, pheader->
size, pheader->
read_pointer, pheader->
write_pointer);
8923 assert(total_size > 0);
8938 const char *
pdata = (
const char *) (pheader + 1);
8945 int size = pheader->
size - rp;
8953 const char *
pdata = (
const char *) (pheader + 1);
8960 int size = pheader->
size - rp;
8970 for (
i = 0;
i <
pc->max_request_index;
i++) {
9016 if (
pbuf->read_cache_rp ==
pbuf->read_cache_wp) {
9042 if (
pbuf->read_cache_wp + total_size >
pbuf->read_cache_size) {
9051 pbuf->read_cache_wp += total_size;
9071 if (convert_flags) {
9093 char *
pdata = (
char *) (pheader + 1);
9120 free += pheader->
size;
9130 if (
pbuf->wait_start_time != 0) {
9134 pbuf->wait_start_time = 0;
9152 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer is invalid",
9167 printf(
"bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, blocking event size %d/%d\n", pheader->
read_pointer, pheader->
write_pointer, free, pheader->
size,
requested_space,
event_size, total_size);
9172 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer points to an invalid event: data_size %d, event size %d, total_size %d",
9200 for (
j = 0;
j <
pc->max_request_index;
j++) {
9243 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer did not move as expected",
9263 if (
pbuf->wait_start_time == 0) {
9265 pbuf->count_write_wait++;
9305 pbuf->write_cache_mutex.unlock();
9364 pbuf->write_cache_mutex.unlock();
9406 if (!
pc->read_wait) {
9430 if (!
pc->read_wait) {
9444 pbuf->read_cache_mutex.unlock();
9477 pbuf->read_cache_mutex.unlock();
9496 if (
pc->read_wait) {
9506 char *
pdata = (
char *) (pheader + 1);
9592 for (
j = 0;
j <
pc->max_request_index;
j++) {
9604 if (request_id >= 0) {
9606 if (
pc->read_wait) {
9683 if (data_size == 0) {
9684 cm_msg(
MERROR,
"bm_send_event",
"invalid event data size zero");
9707 const char*
cptr =
event.data();
9708 size_t clen =
event.size();
9714 int sg_n =
event.size();
9724#ifdef LOCAL_ROUTINES
9789 cm_msg(
MERROR,
"bm_send_event",
"invalid sg_ptr[0] is NULL");
9803 if (data_size == 0) {
9804 cm_msg(
MERROR,
"bm_send_event",
"invalid event data size zero");
9821 cm_msg(
MERROR,
"bm_send_event",
"data size mismatch: event data_size %d, event_size %d not same as sum of sg_len %d", (
int)data_size, (
int)
event_size, (
int)
count);
9827#ifdef LOCAL_ROUTINES
9841 if (
pbuf->write_cache_size) {
9848 if (
pbuf->write_cache_size) {
9855 if (
pbuf->write_cache_wp > 0 && (
pbuf->write_cache_wp + total_size >
pbuf->write_cache_size ||
too_big)) {
9861 pbuf->write_cache_mutex.unlock();
9873 pbuf->write_cache_mutex.unlock();
9876 cm_msg(
MERROR,
"bm_send_event",
"write cache size is bigger than buffer size");
9881 assert(
pbuf->write_cache_wp == 0);
9895 pbuf->write_cache_wp += total_size;
9897 pbuf->write_cache_mutex.unlock();
9903 pbuf->write_cache_mutex.unlock();
9921 printf(
"bm_send_event: corrupted 111!\n");
9927 if (total_size >= (
size_t)pheader->
size) {
9929 cm_msg(
MERROR,
"bm_send_event",
"total event size (%d) larger than size (%d) of buffer \'%s\'", (
int)total_size, pheader->
size, pheader->
name);
9943 printf(
"bm_send_event: corrupted 222!\n");
9972 printf(
"bm_send_event: corrupted 333!\n");
9979 pbuf->count_sent += 1;
9980 pbuf->bytes_sent += total_size;
10062#ifdef LOCAL_ROUTINES
10081 request_id[
i] = -1;
10106 printf(
"bm_flush_cache: corrupted 111!\n");
10129 if (
pbuf->write_cache_wp == 0) {
10135 while (
pbuf->write_cache_rp <
pbuf->write_cache_wp) {
10143 printf(
"bm_flush_cache: cache size %d, wp %d, rp %d, event data_size %d, event_size %d, total_size %d, free %d, written %d\n",
10144 int(
pbuf->write_cache_size),
10145 int(
pbuf->write_cache_wp),
10146 int(
pbuf->write_cache_rp),
10156 assert(total_size <= (
size_t)pheader->
size);
10162 pbuf->count_sent += 1;
10163 pbuf->bytes_sent += total_size;
10182 pbuf->write_cache_rp += total_size;
10185 assert(
pbuf->write_cache_rp > 0);
10186 assert(
pbuf->write_cache_rp <=
pbuf->write_cache_size);
10187 assert(
pbuf->write_cache_rp <=
pbuf->write_cache_wp);
10191 assert(
pbuf->write_cache_wp ==
pbuf->write_cache_rp);
10192 pbuf->write_cache_wp = 0;
10193 pbuf->write_cache_rp = 0;
10213#ifdef LOCAL_ROUTINES
10224 if (
pbuf->write_cache_size == 0)
10233 if (
pbuf->write_cache_wp == 0) {
10234 pbuf->write_cache_mutex.unlock();
10253 pbuf->write_cache_mutex.unlock();
10262#ifdef LOCAL_ROUTINES
10280 if (
pbuf->read_cache_size > 0) {
10287 if (
pbuf->read_cache_wp == 0) {
10292 pbuf->read_cache_mutex.unlock();
10303 pbuf->read_cache_mutex.unlock();
10332 if (convert_flags) {
10341 char*
cptr = (
char*)pevent;
10345 pbuf->read_cache_mutex.unlock();
10355 pbuf->read_cache_mutex.unlock();
10407 "buffer size %d is smaller than event size %d, event truncated. buffer \"%s\"",
max_size,
10419 if (convert_flags) {
10423 pbuf->count_read++;
10425 }
else if (dispatch ||
bufptr) {
10429 pbuf->count_read++;
10433 pbuf->count_read++;
10503 assert(!
"incorrect call to bm_receivent_event_rpc()");
10569 assert(!
"incorrect call to bm_receivent_event_rpc()");
10585 assert(!
"incorrect call to bm_receivent_event_rpc()");
10655#ifdef LOCAL_ROUTINES
10735#ifdef LOCAL_ROUTINES
10813#ifdef LOCAL_ROUTINES
10831#ifdef LOCAL_ROUTINES
10836 if (
pbuf->read_cache_size > 0) {
10843 pbuf->read_cache_rp = 0;
10844 pbuf->read_cache_wp = 0;
10846 pbuf->read_cache_mutex.unlock();
10878#ifdef LOCAL_ROUTINES
10894#ifdef LOCAL_ROUTINES
10906 if (!
pbuf->callback)
10955#ifdef LOCAL_ROUTINES
10986 if (
pbuf->attached) {
11095 if (!
fbuf->callback)
11112 if (convert_flags) {
11147 std::vector<char>
vec;
11151 bool locked =
true;
11153 for (
size_t i = 0;
i <
n;
i++) {
11186 cm_msg(
MERROR,
"bm_poll_event",
"received event was truncated, buffer size %d is too small, see messages and increase /Experiment/MAX_EVENT_SIZE in ODB", (
int)
vec.size());
11244#ifdef LOCAL_ROUTINES
11256 if (!
pbuf->attached)
11270#ifndef DOXYGEN_SHOULD_SKIP_THIS
11272#define MAX_DEFRAG_EVENTS 10
11328 "Received new event with ID %d while old fragments were not completed",
11329 (pevent->event_id & 0x0FFF));
11339 "Not enough defragment buffers, please increase MAX_DEFRAG_EVENTS and recompile");
11346 "Received first event fragment with %d bytes instead of %d bytes, event ignored",
11359 cm_msg(
MERROR,
"bm_defragement_event",
"Not enough memory to allocate event defragment buffer");
11381 "Received fragment without first fragment (ID %d) Ser#:%d",
11392 "Received fragments with more data (%d) than event size (%d)",
11445 printf(
"index %d, client \"%s\", host \"%s\", port %d, socket %d, connected %d, timeout %d",
11584 *convert_flags = 0;
11612 unsigned short int lo,
hi;
11615 lo = *((
short int *) (
var) + 1);
11616 hi = *((
short int *) (
var));
11622 *((
short int *) (
var) + 1) =
hi;
11623 *((
short int *) (
var)) =
lo;
11627 unsigned short int lo,
hi;
11630 lo = *((
short int *) (
var) + 1);
11631 hi = *((
short int *) (
var));
11637 *((
short int *) (
var) + 1) =
hi;
11638 *((
short int *) (
var)) =
lo;
11643 unsigned short int i1,
i2,
i3,
i4;
11646 i1 = *((
short int *) (
var) + 3);
11647 i2 = *((
short int *) (
var) + 2);
11648 i3 = *((
short int *) (
var) + 1);
11649 i4 = *((
short int *) (
var));
11655 *((
short int *) (
var) + 3) =
i4;
11656 *((
short int *) (
var) + 2) =
i3;
11657 *((
short int *) (
var) + 1) =
i2;
11658 *((
short int *) (
var)) =
i1;
11662 unsigned short int i1,
i2,
i3,
i4;
11665 i1 = *((
short int *) (
var) + 3);
11666 i2 = *((
short int *) (
var) + 2);
11667 i3 = *((
short int *) (
var) + 1);
11668 i4 = *((
short int *) (
var));
11674 *((
short int *) (
var) + 3) =
i4;
11675 *((
short int *) (
var) + 2) =
i3;
11676 *((
short int *) (
var) + 1) =
i2;
11677 *((
short int *) (
var)) =
i1;
11744 for (
int i = 0;
i <
n;
i++) {
11768 return "<unknown>";
11775 return "<unknown>";
11834 cm_msg(
MERROR,
"rpc_register_functions",
"registered RPC function with invalid ID %d",
new_list[
i].
id);
11854 if (
e.dispatch ==
NULL) {
11867#ifndef DOXYGEN_SHOULD_SKIP_THIS
11955 char net_buffer[256];
11957 int n =
recv_tcp(sock, net_buffer,
sizeof(net_buffer), 0);
11981 timeout.tv_sec = 0;
11982 timeout.tv_usec = 0;
11987 n =
recv_tcp(sock, net_buffer,
sizeof(net_buffer), 0);
12037 bool debug =
false;
12041 cm_msg(
MERROR,
"rpc_client_connect",
"cm_connect_experiment/rpc_set_name not called");
12047 cm_msg(
MERROR,
"rpc_client_connect",
"invalid port %d", port);
12059 printf(
"rpc_client_connect: host \"%s\", port %d, client \"%s\"\n",
host_name, port, client_name);
12062 printf(
"client connection %d: ", (
int)
i);
12080 if (
c &&
c->connected) {
12087 if ((
c->host_name ==
host_name) && (
c->port == port)) {
12093 std::lock_guard<std::mutex>
cguard(
c->mutex);
12095 if (
c->connected) {
12102 printf(
"already connected: ");
12129 for (
int j = 1;
j < size;
j++) {
12153 printf(
"new connection appended to array: ");
12160 c->connected =
true;
12186 c->client_name = client_name;
12201 int size =
cstr.length() + 1;
12202 i =
send(
c->send_sock,
cstr.c_str(), size, 0);
12203 if (
i < 0 ||
i != size) {
12211 DWORD watchdog_timeout;
12231 cm_msg(
MERROR,
"rpc_client_connect",
"timeout waiting for server reply");
12237 int remote_hw_type = 0;
12242 c->remote_hw_type = remote_hw_type;
12260 c->connected =
true;
12290 if (
c &&
c->connected) {
12291 std::lock_guard<std::mutex>
cguard(
c->mutex);
12293 if (!
c->connected) {
12309 timeout.tv_sec = 0;
12310 timeout.tv_usec = 0;
12341 "RPC client connection to \"%s\" on host \"%s\" is broken, recv() errno %d (%s)",
12342 c->client_name.c_str(),
12343 c->host_name.c_str(),
12347 }
else if (
status == 0) {
12352 cm_msg(
MINFO,
"rpc_client_check",
"RPC client connection to \"%s\" on host \"%s\" unexpectedly closed",
c->client_name.c_str(),
c->host_name.c_str());
12412 char str[200], version[32],
v1[32];
12437 cm_msg(
MERROR,
"rpc_server_connect",
"cm_connect_experiment/rpc_set_name not called");
12463 cm_msg(
MERROR,
"rpc_server_connect",
"cannot create listener socket: %s",
errmsg.c_str());
12470 cm_msg(
MERROR,
"rpc_server_connect",
"cannot create listener socket: %s",
errmsg.c_str());
12477 cm_msg(
MERROR,
"rpc_server_connect",
"cannot create listener socket: %s",
errmsg.c_str());
12494 cm_msg(
MERROR,
"rpc_server_connect",
"cannot connect to mserver on host \"%s\" port %d: %s",
str, port,
errmsg.c_str());
12508 cm_msg(
MERROR,
"rpc_server_connect",
"timeout on receive status from server");
12512 status = version[0] = 0;
12521 strcpy(
v1, version);
12532 cm_msg(
MERROR,
"rpc_server_connect",
"remote MIDAS version \'%s\' differs from local version \'%s\'", version,
12543 timeout.tv_usec = 0;
12555 cm_msg(
MERROR,
"rpc_server_connect",
"mserver subprocess could not be started (check path)");
12567 cm_msg(
MERROR,
"rpc_server_connect",
"accept() failed");
12581 flag = 2 * 1024 * 1024;
12596 cm_msg(
MERROR,
"rpc_server_connect",
"timeout on receive remote computer info");
12617 if (
c &&
c->connected) {
12620 if (!
c->connected) {
12640 if (
c &&
c->connected) {
12657 if (!
c->connected) {
12866 dummy = 0x12345678;
12867 p = (
unsigned char *) &
dummy;
12870 else if (*p == 0x12)
12873 cm_msg(
MERROR,
"rpc_get_option",
"unknown byte order format");
12876 f = (
float) 1.2345;
12879 if ((
dummy & 0xFF) == 0x19 &&
12880 ((
dummy >> 8) & 0xFF) == 0x04 && ((
dummy >> 16) & 0xFF) == 0x9E
12881 && ((
dummy >> 24) & 0xFF) == 0x3F)
12883 else if ((
dummy & 0xFF) == 0x9E &&
12884 ((
dummy >> 8) & 0xFF) == 0x40 && ((
dummy >> 16) & 0xFF) == 0x19
12885 && ((
dummy >> 24) & 0xFF) == 0x04)
12888 cm_msg(
MERROR,
"rpc_get_option",
"unknown floating point format");
12893 if ((
dummy & 0xFF) == 0x8D &&
12894 ((
dummy >> 8) & 0xFF) == 0x97 && ((
dummy >> 16) & 0xFF) == 0x6E
12895 && ((
dummy >> 24) & 0xFF) == 0x12)
12897 else if ((
dummy & 0xFF) == 0x83 &&
12898 ((
dummy >> 8) & 0xFF) == 0xC0 && ((
dummy >> 16) & 0xFF) == 0xF3
12899 && ((
dummy >> 24) & 0xFF) == 0x3F)
12901 else if ((
dummy & 0xFF) == 0x13 &&
12902 ((
dummy >> 8) & 0xFF) == 0x40 && ((
dummy >> 16) & 0xFF) == 0x83
12903 && ((
dummy >> 24) & 0xFF) == 0xC0)
12905 else if ((
dummy & 0xFF) == 0x9E &&
12906 ((
dummy >> 8) & 0xFF) == 0x40 && ((
dummy >> 16) & 0xFF) == 0x18
12907 && ((
dummy >> 24) & 0xFF) == 0x04)
12909 "MIDAS cannot handle VAX D FLOAT format. Please compile with the /g_float flag");
12911 cm_msg(
MERROR,
"rpc_get_option",
"unknown floating point format");
12935 else if (
hConn == -2)
12982 int timeout =
c->rpc_timeout;
13027#ifndef DOXYGEN_SHOULD_SKIP_THIS
13180 va_start(
argptr, format);
13181 vsprintf(
str, (
char *) format,
argptr);
13233 bool debug =
false;
13236 printf(
"encode rpc_id %d \"%s\"\n",
rl.id,
rl.name);
13237 for (
int i=0;
rl.param[
i].tid != 0;
i++) {
13238 int tid =
rl.param[
i].tid;
13239 int flags =
rl.param[
i].flags;
13240 int n =
rl.param[
i].n;
13241 printf(
"i=%d, tid %d, flags 0x%x, n %d\n",
i, tid, flags,
n);
13247 for (
int i=0;
rl.param[
i].tid != 0;
i++) {
13248 int tid =
rl.param[
i].tid;
13249 int flags =
rl.param[
i].flags;
13270 size_t buf_size =
sizeof(
NET_COMMAND) + 4 * 1024;
13271 char* buf = (
char *)
malloc(buf_size);
13281 for (
int i=0;
rl.param[
i].tid != 0;
i++) {
13282 int tid =
rl.param[
i].tid;
13283 int flags =
rl.param[
i].flags;
13303 char* arg =
args[
i];
13375 printf(
"encode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, memcpy pointer %d\n",
i, flags, tid,
arg_type,
arg_size, param_size,
arg_size);
13380 printf(
"encode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, double->float\n",
i, flags, tid,
arg_type,
arg_size, param_size);
13386 printf(
"encode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, memcpy %d\n",
i, flags, tid,
arg_type,
arg_size, param_size,
arg_size);
13398 printf(
"encode rpc_id %d \"%s\" buf_size %d, param_size %d\n",
rl.id,
rl.name, (
int)buf_size, (*nc)->header.param_size);
13404 bool debug =
false;
13407 printf(
"decode reply to rpc_id %d \"%s\" has %d bytes\n",
rl.id,
rl.name, (
int)buf_size);
13413 for (
int i = 0;
rl.param[
i].tid != 0;
i++) {
13414 int tid =
rl.param[
i].tid;
13415 int flags =
rl.param[
i].flags;
13436 cm_msg(
MERROR,
"rpc_call_decode",
"routine \"%s\": no data in RPC reply, needed to decode an RPC_OUT parameter. param_ptr is NULL",
rl.name);
13440 tid =
rl.param[
i].tid;
13458 if (*((
char **) arg)) {
13460 printf(
"decode param %d, flags 0x%x, tid %d, arg_type %d, arg_size %d, param_size %d, memcpy %d\n",
i, flags, tid,
arg_type,
arg_size, param_size,
arg_size);
13527 if (
rpc_list[
i].
id == (
int) routine_id) {
13537 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" with invalid RPC ID %d",
c->client_name.c_str(),
c->host_name.c_str(), routine_id);
13546 va_start(
ap, routine_id);
13564 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": send_tcp() failed",
c->client_name.c_str(),
c->host_name.c_str(),
rpc_name);
13586 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": send_tcp() failed",
c->client_name.c_str(),
c->host_name.c_str(),
rpc_name);
13596 DWORD watchdog_timeout;
13601 if (
c->rpc_timeout >= (
int) watchdog_timeout) {
13607 DWORD buf_size = 0;
13618 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": timeout waiting for reply",
c->client_name.c_str(),
c->host_name.c_str(),
rpc_name);
13626 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": error, ss_recv_net_command() status %d",
c->client_name.c_str(),
c->host_name.c_str(),
rpc_name,
status);
13636 cm_msg(
MERROR,
"rpc_client_call",
"call to \"%s\" on \"%s\" RPC \"%s\": error, unknown RPC, status %d",
c->client_name.c_str(),
c->host_name.c_str(),
rpc_name,
rpc_status);
13644 va_start(
ap, routine_id);
13701 fprintf(
stderr,
"rpc_call(routine_id=%d) failed, no connection to mserver.\n", routine_id);
13725 if (
rpc_list[
i].
id == (
int) routine_id) {
13737 cm_msg(
MERROR,
"rpc_call",
"invalid rpc ID (%d)", routine_id);
13746 va_start(
ap, routine_id);
13789 DWORD watchdog_timeout;
13799 if (rpc_timeout >= (
int) watchdog_timeout) {
13806 DWORD buf_size = 0;
13822 cm_msg(
MERROR,
"rpc_call",
"routine \"%s\": timeout waiting for reply, program abort",
rpc_name);
13846 va_start(
ap, routine_id);
13906 return bm_send_event(buffer_handle, pevent, unused, async_flag);
13933 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid sg_ptr[0] is NULL");
13938 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid sg_len[0] value %d is smaller than event header size %d", (
int)
sg_len[0], (
int)
sizeof(
EVENT_HEADER));
13947 if (data_size == 0) {
13948 cm_msg(
MERROR,
"rpc_send_event_sg",
"invalid event data size zero");
13966 cm_msg(
MERROR,
"rpc_send_event_sg",
"data size mismatch: event data_size %d, event_size %d not same as sum of sg_len %d", (
int)data_size, (
int)
event_size, (
int)
count);
13992 assert(
sizeof(
DWORD) == 4);
13998 cm_msg(
MERROR,
"rpc_send_event_sg",
"ss_write_tcp(buffer handle) failed, event socket is now closed");
14008 cm_msg(
MERROR,
"rpc_send_event_sg",
"ss_write_tcp(event data) failed, event socket is now closed");
14015 if (
count < total_size) {
14016 char padding[8] = { 0,0,0,0,0,0,0,0 };
14022 cm_msg(
MERROR,
"rpc_send_event_sg",
"ss_write_tcp(padding) failed, event socket is now closed");
14088 for (
size_t i = 0;
i <
n;
i++) {
14093 if (
tt.transition ==
CINT(0) &&
tt.sequence_number ==
CINT(4)) {
14111 cm_msg(
MERROR,
"rpc_transition_dispatch",
"no handler for transition %d with sequence number %d",
CINT(0),
CINT(4));
14114 cm_msg(
MERROR,
"rpc_transition_dispatch",
"received unrecognized command %d",
idx);
14175 for (
i = 0;
i < (size - 1) / 16 + 1;
i++) {
14177 for (
j = 0;
j < 16;
j++)
14178 if (
i * 16 +
j < size)
14184 for (
j = 0;
j < 16;
j++) {
14186 if (
i * 16 +
j < size)
14187 printf(
"%c", (
c >= 32 &&
c < 128) ? p[
i * 16 +
j] :
'.');
14226 char *buffer =
NULL;
14250 int param_size = -1;
14259 if (param_size == -1) {
14299 int size = write_ptr - read_ptr;
14304 read_ptr = write_ptr;
14311 }
while (write_ptr == -1 &&
errno ==
EINTR);
14317 if (write_ptr <= 0) {
14318 if (write_ptr == 0)
14329 read_ptr = misalign;
14330 write_ptr += misalign;
14332 misalign = write_ptr % 8;
14342 if (write_ptr - read_ptr < param_size)
14410 int sock =
psa->event_sock;
14436 cm_msg(
MERROR,
"recv_event_server",
"recv_tcp2(header) returned %d",
hrd);
14445 cm_msg(
MERROR,
"recv_event_server",
"recv_tcp2(more header) returned %d",
hrd1);
14462 if (
psa->convert_flags) {
14477 for (
int i=0;
i<5;
i++) {
14478 printf(
"recv_event_server: header[%d]: 0x%08x\n",
i,
pbh[
i]);
14486 "received event header with invalid data_size %d: event_size %d, total_size %d", pevent->
data_size,
14614 cm_msg(
MERROR,
"rpc_register_server",
"cannot listen to tcp port %d: %s", port,
errmsg.c_str());
14619#if defined(F_SETFD) && defined(FD_CLOEXEC)
14716 assert(return_buffer);
14726 if (convert_flags) {
14758 cm_msg(
MERROR,
"rpc_execute",
"Invalid rpc ID (%d)", routine_id);
14771 for (
i = 0;
rl.param[
i].tid != 0;
i++) {
14772 tid =
rl.param[
i].tid;
14773 flags =
rl.param[
i].flags;
14786 param_size =
ALIGN8(param_size);
14797 if (convert_flags) {
14850 "return parameters (%d) too large for network buffer (%d)",
14860 "rpc_execute: return parameters (%d) too large for network buffer (%d), new buffer size (%d)",
14876 assert(return_buffer);
14893 if (
rl.param[
i + 1].tid)
14932 for (
i = 0;
rl.param[
i].tid != 0;
i++)
14934 tid =
rl.param[
i].tid;
14935 flags =
rl.param[
i].flags;
14947 param_size =
ALIGN8(param_size);
14975 param_size =
ALIGN8(param_size);
14987 if (convert_flags) {
15003 nc_out->header.param_size = param_size;
15008 if (convert_flags) {
15065 printf(
"rpc_test_rpc!\n");
15094 for (
int i=0;
i<10;
i++) {
15126 printf(
"int_out mismatch!\n");
15131 printf(
"int_inout mismatch!\n");
15141 printf(
"string2_out mismatch [%s] vs [%s]\n",
string2_out,
"second string_out");
15146 printf(
"string_inout mismatch [%s] vs [%s]\n",
string_inout,
"return string_inout");
15158 if (
pkey->type != 444 ||
pkey->num_values != 555 ||
strcmp(
pkey->name,
"out_name") ||
pkey->last_written != 666) {
15159 printf(
"struct_out mismatch: type %d, num_values %d, name [%s], last_written %d\n",
pkey->type,
pkey->num_values,
pkey->name,
pkey->last_written);
15165 if (
pkey->type != 444444 ||
pkey->num_values != 555555 ||
strcmp(
pkey->name,
"inout_name") ||
pkey->last_written != 666666) {
15166 printf(
"struct_inout mismatch: type %d, num_values %d, name [%s], last_written %d\n",
pkey->type,
pkey->num_values,
pkey->name,
pkey->last_written);
15281 if (
strcmp(hostname,
"localhost") == 0)
15284 if (
strcmp(hostname,
"localhost.localdomain") == 0)
15287 if (
strcmp(hostname,
"localhost6") == 0)
15290 if (
strcmp(hostname,
"ip6-localhost") == 0)
15298 if (h == hostname) {
15315 std::string hostname;
15331 cm_msg(
MERROR,
"rpc_socket_check_allowed_host",
"rejecting connection from unallowed host \'%s\', this message will no longer be reported", hostname.c_str());
15333 cm_msg(
MERROR,
"rpc_socket_check_allowed_host",
"rejecting connection from unallowed host \'%s\'. Add this host to \"/Experiment/Security/RPC hosts/Allowed hosts\"", hostname.c_str());
15369 char net_buffer[256];
15414#ifdef LOCAL_ROUTINES
15417 for (
unsigned i=0;
i<exptab.
exptab.size();
i++) {
15419 const char*
str = exptab.
exptab[
i].name.c_str();
15422 send(sock,
"", 1, 0);
15444 while (*ptr ==
' ')
15448 for (; *ptr != 0 && *ptr !=
' ' &&
i < (
int)
sizeof(version) - 1;)
15449 version[
i++] = *ptr++;
15452 assert(
i < (
int)
sizeof(version));
15456 for (; *ptr != 0 && *ptr !=
' ';)
15459 while (*ptr ==
' ')
15463 for (; *ptr != 0 && *ptr !=
' ' && *ptr !=
'\n' && *ptr !=
'\r' &&
i < (
int)
sizeof(
experiment) - 1;)
15486 cm_msg(
MERROR,
"rpc_server_accept",
"received string: %s", net_buffer + 2);
15501#ifdef LOCAL_ROUTINES
15507 bool found =
false;
15526 send(sock,
"2", 2, 0);
15545 const char *
argv[10];
15580 cm_msg(
MERROR,
"rpc_server_accept",
"received unknown command '%c' code %d", command, command);
15625 char net_buffer[256], *p;
15646 i =
recv_string(sock, net_buffer,
sizeof(net_buffer), 10000);
15653 p =
strtok(net_buffer,
" ");
15723 int recv_sock, send_sock, event_sock;
15728 char net_buffer[256];
15770 flag = 2 * 1024 * 1024;
15773 cm_msg(
MERROR,
"rpc_server_callback",
"cannot setsockopt(SOL_SOCKET, SO_RCVBUF), errno %d (%s)",
errno,
15778 cm_msg(
MERROR,
"rpc_server_callback",
"timeout on receive remote computer info");
15905 cm_msg(
MERROR,
"rpc_server_receive_rpc",
"rpc_execute() returned %d, abort",
status);
15932 cm_msg(
MTALK,
"rpc_server_receive_rpc",
"Program \'%s\' on host \'%s\' aborted", sa->
prog_name.c_str(),
str);
16018 cm_msg(
MERROR,
"rpc_server_receive_event",
"internal error: called recursively");
16035 cm_msg(
MERROR,
"rpc_server_receive_event",
"recv_event_server_realloc() returned %d, abort",
n_received);
16063 cm_msg(
MERROR,
"rpc_server_receive_event",
"bm_send_event() error %d (SS_ABORT), abort",
status);
16074 cm_msg(
MERROR,
"rpc_server_receive_event",
"bm_send_event() error %d, mserver dropped this event",
status);
16092 cm_msg(
MTALK,
"rpc_server_receive_event",
"Program \'%s\' on host \'%s\' aborted", sa->
prog_name.c_str(),
str);
16300 if (convert_flags) {
16309 cm_msg(
MINFO,
"rpc_check_channels",
"client \"%s\" on host \"%s\" failed watchdog test after %d sec, send_tcp() returned %d",
16345 cm_msg(
MINFO,
"rpc_check_channels",
"client \"%s\" on host \"%s\" failed watchdog test after %d sec",
16364 cm_msg(
MINFO,
"rpc_check_channels",
"client \"%s\" on host \"%s\" failed watchdog test after %d sec, recv_tcp() returned %d",
16412#ifndef DOXYGEN_SHOULD_SKIP_THIS
16563 if (((
PTYPE) event & 0x07) != 0) {
16564 cm_msg(
MERROR,
"bk_create",
"Bank %s created with unaligned event pointer",
name);
16580 pbk32->data_size = 0;
16588 pbk->data_size = 0;
16594#ifndef DOXYGEN_SHOULD_SKIP_THIS
16715 }
while ((
DWORD) ((
char *)
pbk32a - (
char *) event) <
16737 }
while ((
DWORD) ((
char *)
pbk32 - (
char *) event) <
16759 }
while ((
DWORD) ((
char *)
pbk - (
char *) event) <
16787 return pbk32a->data_size;
16792 printf(
"Warning: TID_STRUCT bank %c%c%c%c has zero size\n",
pbk32->name[0],
pbk32->name[1],
pbk32->name[2],
pbk32->name[3]);
16794 return pbk32->data_size;
16798 if (size > 0xFFFF) {
16799 printf(
"Error: Bank size %d exceeds 16-bit limit of 65526, please use bk_init32() to create a 32-bit bank\n", size);
16802 pbk->data_size = (
WORD) (size);
16804 printf(
"Warning: TID_STRUCT bank %c%c%c%c has zero size\n",
pbk->name[0],
pbk->name[1],
pbk->name[2],
pbk->name[3]);
16806 if (size > 0xFFFF) {
16807 printf(
"Error: Bank size %d exceeds 16-bit limit of 65526, please use bk_init32() to create a 32-bit bank\n", size);
16811 return pbk->data_size;
16898 while ((
DWORD) ((
char *)
pbk32a - (
char *) event) <
16903 return pbk32a->data_size;
16911 while ((
DWORD) ((
char *)
pbk32 - (
char *) event) <
16916 return pbk32->data_size;
16924 while ((
DWORD) ((
char *)
pbk - (
char *) event) <
16929 return pbk->data_size;
17052 *((
void **)
pdata) = (*pbk) + 1;
17059 return (*pbk)->data_size;
17064#ifndef DOXYGEN_SHOULD_SKIP_THIS
17092 *((
void **)
pdata) = (*pbk) + 1;
17100 return (*pbk)->data_size;
17128 *((
void **)
pdata) = (*pbk32a) + 1;
17136 return (*pbk32a)->data_size;
17168 if (
pbh->flags < 0x10000 && !
force)
17214 while ((
char *)
pdata < (
char *)
pbk) {
17224 while ((
char *)
pdata < (
char *)
pbk) {
17233 while ((
char *)
pdata < (
char *)
pbk) {
17255#ifndef DOXYGEN_SHOULD_SKIP_THIS
17280#define MAX_RING_BUFFER 100
17372 assert(
rb[
i].buffer);
17469 rp >
rb[h].buffer) {
17531 cm_msg(
MERROR,
"rb_increment_wp",
"event size of %d MB larger than max_event_size of %d MB",
17542 assert(
rb[h].rp !=
rb[h].buffer);
17601 if (
rb[h].wp !=
rb[h].rp) {
17603 *p =
rb[handle - 1].
rp;
17669 if (
new_rp >= ep &&
rb[h].wp < ep)
17711 if (
rb[h].wp >=
rb[h].rp)
17729 cm_msg(
MERROR,
"cm_write_event_to_odb",
"event %d ODB record size mismatch, db_set_record() status %d", pevent->
event_id,
status);
17754 for (
n=0 ; ;
n++) {
17810 cm_msg(
MERROR,
"cm_write_event_to_odb",
"please define bank \"%s\" in BANK_LIST in frontend",
name);
17815 for (
i = 0;;
i++) {
17828 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot write bank \"%s\" to ODB, db_set_data1() status %d",
name,
status);
17842 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot create key for bank \"%s\" with tid %d in ODB, db_create_key() status %d",
name,
bktype,
status);
17847 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot find key for bank \"%s\" in ODB, after db_create_key(), db_find_key() status %d",
name,
status);
17854 cm_msg(
MERROR,
"cm_write_event_to_odb",
"cannot write bank \"%s\" to ODB, db_set_data1() status %d",
name,
status);
17868 cm_msg(
MERROR,
"cm_write_event_to_odb",
"event format %d is not supported (see midas.h definitions of FORMAT_xxx)", format);
std::atomic_bool connected
BUFFER * get_pbuf() const
bm_lock_buffer_guard(BUFFER *pbuf, bool do_not_lock=false)
bm_lock_buffer_guard & operator=(const bm_lock_buffer_guard &)=delete
bm_lock_buffer_guard(const bm_lock_buffer_guard &)=delete
static bool exists(const std::string &name)
INT transition(INT run_number, char *error)
INT al_get_alarms(std::string *presult)
void bk_init32a(void *event)
INT bk_close(void *event, void *pdata)
INT bk_iterate32a(const void *event, BANK32A **pbk32a, void *pdata)
static void copy_bk_name(char *dst, const char *src)
INT bk_swap(void *event, BOOL force)
BOOL bk_is32a(const void *event)
int bk_delete(void *event, const char *name)
BOOL bk_is32(const void *event)
INT bk_iterate32(const void *event, BANK32 **pbk, void *pdata)
INT bk_locate(const void *event, const char *name, void *pdata)
void bk_init(void *event)
INT bk_list(const void *event, char *bklist)
INT bk_copy(char *pevent, char *psrce, const char *bkname)
INT bk_iterate(const void *event, BANK **pbk, void *pdata)
void bk_init32(void *event)
void bk_create(void *event, const char *name, WORD type, void **pdata)
INT bk_find(const BANK_HEADER *pbkh, const char *name, DWORD *bklen, DWORD *bktype, void **pdata)
INT bk_size(const void *event)
static void bm_wakeup_producers_locked(const BUFFER_HEADER *pheader, const BUFFER_CLIENT *pc)
static INT bm_receive_event_rpc(INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
INT bm_open_buffer(const char *buffer_name, INT buffer_size, INT *buffer_handle)
static BOOL bm_validate_rp(const char *who, const BUFFER_HEADER *pheader, int rp)
INT bm_send_event(INT buffer_handle, const EVENT_HEADER *pevent, int unused, int timeout_msec)
static int bm_flush_cache_rpc(int buffer_handle, int timeout_msec)
static void bm_write_buffer_statistics_to_odb_copy(HNDLE hDB, const char *buffer_name, const char *client_name, int client_index, BUFFER_INFO *pbuf, BUFFER_HEADER *pheader)
static int bm_skip_event(BUFFER *pbuf)
static INT bm_flush_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
INT bm_write_statistics_to_odb(void)
#define MAX_DEFRAG_EVENTS
INT bm_delete_request(INT request_id)
INT bm_close_all_buffers(void)
INT bm_add_event_request(INT buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, EVENT_HANDLER *func, INT request_id)
static void bm_write_buffer_statistics_to_odb(HNDLE hDB, BUFFER *pbuf, BOOL force)
static int bm_incr_rp_no_check(const BUFFER_HEADER *pheader, int rp, int total_size)
INT bm_receive_event_vec(INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
static void bm_notify_reader_locked(BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, int old_write_pointer, int request_id)
static int bm_find_first_request_locked(BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
static BOOL bm_check_requests(const BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
static void bm_cleanup_buffer_locked(BUFFER *pbuf, const char *who, DWORD actual_time)
static BOOL bm_update_read_pointer_locked(const char *caller_name, BUFFER_HEADER *pheader)
static void bm_convert_event_header(EVENT_HEADER *pevent, int convert_flags)
INT bm_request_event(HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
static int bm_validate_client_index_locked(bm_lock_buffer_guard &pbuf_guard)
INT bm_set_cache_size(INT buffer_handle, size_t read_size, size_t write_size)
static int bm_validate_buffer_locked(const BUFFER *pbuf)
INT bm_receive_event(INT buffer_handle, void *destination, INT *buf_size, int timeout_msec)
static int bm_fill_read_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
INT bm_compose_event_threadsafe(EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD *serial)
INT bm_remove_event_request(INT buffer_handle, INT request_id)
static double _bm_mutex_timeout_sec
INT bm_close_buffer(INT buffer_handle)
int bm_send_event_sg(int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
INT bm_compose_event(EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD serial)
static void bm_write_to_buffer_locked(BUFFER_HEADER *pheader, int sg_n, const char *const sg_ptr[], const size_t sg_len[], size_t total_size)
static EVENT_DEFRAG_BUFFER defrag_buffer[MAX_DEFRAG_EVENTS]
static int bm_next_rp(const char *who, const BUFFER_HEADER *pheader, const char *pdata, int rp)
INT bm_match_event(short int event_id, short int trigger_mask, const EVENT_HEADER *pevent)
static void bm_read_from_buffer_locked(const BUFFER_HEADER *pheader, int rp, char *buf, int event_size)
static BOOL bm_peek_read_cache_locked(BUFFER *pbuf, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
static void bm_update_last_activity(DWORD millitime)
static void bm_incr_read_cache_locked(BUFFER *pbuf, int total_size)
static DWORD _bm_max_event_size
static void bm_clear_buffer_statistics(HNDLE hDB, BUFFER *pbuf)
INT bm_flush_cache(int buffer_handle, int timeout_msec)
static void bm_dispatch_event(int buffer_handle, EVENT_HEADER *pevent)
static void bm_validate_client_pointers_locked(const BUFFER_HEADER *pheader, BUFFER_CLIENT *pclient)
static INT bm_read_buffer(BUFFER *pbuf, INT buffer_handle, void **bufptr, void *buf, INT *buf_size, std::vector< char > *vecptr, int timeout_msec, int convert_flags, BOOL dispatch)
static int bm_wait_for_free_space_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec, int requested_space, bool unlock_write_cache)
INT bm_get_buffer_handle(const char *buffer_name, INT *buffer_handle)
int bm_send_event_vec(int buffer_handle, const std::vector< char > &event, int timeout_msec)
static int _bm_lock_timeout
static int bm_peek_buffer_locked(BUFFER *pbuf, BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
INT bm_receive_event_alloc(INT buffer_handle, EVENT_HEADER **ppevent, int timeout_msec)
static void bm_reset_buffer_locked(BUFFER *pbuf)
void bm_remove_client_locked(BUFFER_HEADER *pheader, int j)
static INT bm_push_buffer(BUFFER *pbuf, int buffer_handle)
static int bm_wait_for_more_events_locked(bm_lock_buffer_guard &pbuf_guard, BUFFER_CLIENT *pc, int timeout_msec, BOOL unlock_read_cache)
INT cm_set_path(const char *path)
INT cm_register_transition(INT transition, INT(*func)(INT, char *), INT sequence_number)
INT cm_shutdown(const char *name, BOOL bUnique)
static int cm_transition_call(TrState *s, int idx)
INT cm_disconnect_client(HNDLE hConn, BOOL bShutdown)
static void load_rpc_hosts(HNDLE hDB, HNDLE hKey, int index, void *info)
static std::atomic< std::thread * > _watchdog_thread
static int cm_transition_detach(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_yield(INT millisec)
INT cm_get_experiment_database(HNDLE *hDB, HNDLE *hKeyClient)
INT cm_list_experiments_remote(const char *host_name, STRING_LIST *exp_names)
INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout)
static void bm_defragment_event(HNDLE buffer_handle, HNDLE request_id, EVENT_HEADER *pevent, void *pdata, EVENT_HANDLER *dispatcher)
INT cm_connect_client(const char *client_name, HNDLE *hConn)
INT cm_connect_experiment(const char *host_name, const char *exp_name, const char *client_name, void(*func)(char *))
static BUFFER_CLIENT * bm_get_my_client_locked(bm_lock_buffer_guard &pbuf_guard)
INT cm_list_experiments_local(STRING_LIST *exp_names)
INT cm_start_watchdog_thread()
static INT bm_push_event(const char *buffer_name)
INT cm_get_experiment_semaphore(INT *semaphore_alarm, INT *semaphore_elog, INT *semaphore_history, INT *semaphore_msg)
static int tr_finish(HNDLE hDB, TrState *tr, int transition, int status, const char *errorstr)
INT cm_set_client_run_state(INT state)
static int bm_lock_buffer_read_cache(BUFFER *pbuf)
static void write_tr_client_to_odb(HNDLE hDB, const TrClient *tr_client)
INT cm_transition(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_stop_watchdog_thread()
INT cm_register_function(INT id, INT(*func)(INT, void **))
INT cm_connect_experiment1(const char *host_name, const char *default_exp_name, const char *client_name, void(*func)(char *), INT odb_size, DWORD watchdog_timeout)
INT cm_check_client(HNDLE hDB, HNDLE hKeyClient)
static int xbm_lock_buffer(BUFFER *pbuf)
static void xbm_unlock_buffer(BUFFER *pbuf)
INT cm_dispatch_ipc(const char *message, int message_size, int client_socket)
static INT cm_transition1(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_select_experiment_remote(const char *host_name, std::string *exp_name)
INT cm_register_server(void)
static BOOL _ctrlc_pressed
static void init_rpc_hosts(HNDLE hDB)
void cm_ack_ctrlc_pressed()
INT cm_execute(const char *command, char *result, INT bufsize)
INT cm_get_watchdog_info(HNDLE hDB, const char *client_name, DWORD *timeout, DWORD *last)
INT cm_cleanup(const char *client_name, BOOL ignore_timeout)
void cm_check_connect(void)
static BUFFER * bm_get_buffer(const char *who, INT buffer_handle, int *pstatus)
INT cm_select_experiment_local(std::string *exp_name)
std::string cm_expand_env(const char *str)
std::string cm_get_client_name()
static int bm_lock_buffer_mutex(BUFFER *pbuf)
int cm_exec_script(const char *odb_path_to_script)
INT EXPRT cm_get_path_string(std::string *path)
static void rpc_client_shutdown()
static std::atomic< bool > _watchdog_thread_is_running
static exptab_struct _exptab
INT cm_set_client_info(HNDLE hDB, HNDLE *hKeyClient, const char *host_name, char *client_name, INT hw_type, const char *password, DWORD watchdog_timeout)
INT cm_disconnect_experiment(void)
static void bm_cleanup(const char *who, DWORD actual_time, BOOL wrong_interval)
static INT bm_notify_client(const char *buffer_name, int s)
int cm_get_exptab(const char *expname, std::string *dir, std::string *user)
static void xcm_watchdog_thread()
static bool test_cm_expand_env1(const char *str, const char *expected)
INT cm_synchronize(DWORD *seconds)
std::string cm_get_exptab_filename()
std::string cm_get_path()
static DWORD _deferred_transition_mask
std::string cm_get_history_path(const char *history_channel)
void cm_test_expand_env()
INT cm_register_deferred_transition(INT transition, BOOL(*func)(INT, BOOL))
int cm_set_experiment_local(const char *exp_name)
static INT _requested_transition
INT cm_get_environment(char *host_name, int host_name_size, char *exp_name, int exp_name_size)
const char * cm_get_version()
INT cm_read_exptab(exptab_struct *exptab)
static int bm_lock_buffer_write_cache(BUFFER *pbuf)
INT cm_deregister_transition(INT transition)
INT cm_check_deferred_transition()
std::string cm_get_experiment_name()
INT cm_set_transition_sequence(INT transition, INT sequence_number)
static bool tr_compare(const std::unique_ptr< TrClient > &arg1, const std::unique_ptr< TrClient > &arg2)
INT cm_delete_client_info(HNDLE hDB, INT pid)
static std::atomic< bool > _watchdog_thread_run
const char * cm_get_revision()
INT cm_watchdog_thread(void *unused)
INT cm_set_experiment_database(HNDLE hDB, HNDLE hKeyClient)
BOOL cm_is_ctrlc_pressed()
static INT tr_main_thread(void *param)
void cm_ctrlc_handler(int sig)
INT cm_set_watchdog_params_local(BOOL call_watchdog, DWORD timeout)
INT cm_transition_cleanup()
INT cm_set_watchdog_params(BOOL call_watchdog, DWORD timeout)
static int cm_transition_call_direct(TrClient *tr_client)
INT cm_exist(const char *name, BOOL bUnique)
INT cm_set_experiment_semaphore(INT semaphore_alarm, INT semaphore_elog, INT semaphore_history, INT semaphore_msg)
static INT cm_transition2(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_set_experiment_name(const char *name)
#define CM_INVALID_TRANSITION
#define CM_DEFERRED_TRANSITION
#define CM_TRANSITION_IN_PROGRESS
#define CM_WRONG_PASSWORD
#define CM_TRANSITION_CANCELED
#define CM_VERSION_MISMATCH
#define BM_INVALID_MIXING
#define BM_INVALID_HANDLE
#define DB_INVALID_HANDLE
#define DB_NO_MORE_SUBKEYS
#define RPC_EXCEED_BUFFER
#define RPC_DOUBLE_DEFINED
#define RPC_NOT_REGISTERED
#define RPC_MUTEX_TIMEOUT
#define RPC_NO_CONNECTION
#define RPC_HNDLE_CONNECT
#define RPC_HNDLE_MSERVER
#define VALIGN(adr, align)
RPC_LIST * rpc_get_internal_list(INT flag)
#define MESSAGE_BUFFER_NAME
#define DRI_LITTLE_ENDIAN
#define MAX_STRING_LENGTH
#define MESSAGE_BUFFER_SIZE
void() EVENT_HANDLER(HNDLE buffer_handler, HNDLE request_id, EVENT_HEADER *event_header, void *event_data)
INT() RPC_HANDLER(INT index, void *prpc_param[])
std::string ss_gethostname()
INT ss_suspend(INT millisec, INT msg)
INT ss_get_struct_align()
INT ss_mutex_release(MUTEX_T *mutex)
INT ss_suspend_init_odb_port()
bool ss_event_socket_has_data()
time_t ss_mktime(struct tm *tms)
int ss_file_exist(const char *path)
INT ss_semaphore_create(const char *name, HNDLE *semaphore_handle)
INT recv_tcp2(int sock, char *net_buffer, int buffer_size, int timeout_ms)
INT ss_suspend_set_client_listener(int listen_socket)
INT ss_socket_get_peer_name(int sock, std::string *hostp, int *portp)
int ss_file_link_exist(const char *path)
INT ss_mutex_delete(MUTEX_T *mutex)
int ss_socket_wait(int sock, INT millisec)
INT ss_suspend_set_server_acceptions(RPC_SERVER_ACCEPTION_LIST *acceptions)
DWORD ss_settime(DWORD seconds)
char * ss_getpass(const char *prompt)
INT ss_suspend_set_client_connection(RPC_SERVER_CONNECTION *connection)
INT ss_mutex_create(MUTEX_T **mutex, BOOL recursive)
INT ss_shm_open(const char *name, INT size, void **adr, size_t *shm_size, HNDLE *handle, BOOL get_size)
INT recv_string(int sock, char *buffer, DWORD buffer_size, INT millisec)
INT ss_write_tcp(int sock, const char *buffer, size_t buffer_size)
int ss_dir_exist(const char *path)
bool ss_timed_mutex_wait_for_sec(std::timed_mutex &mutex, const char *mutex_name, double timeout_sec)
INT ss_semaphore_release(HNDLE semaphore_handle)
int ss_file_copy(const char *src, const char *dst, bool append)
INT recv_tcp(int sock, char *net_buffer, DWORD buffer_size, INT flags)
INT ss_resume(INT port, const char *message)
midas_thread_t ss_gettid(void)
INT ss_semaphore_delete(HNDLE semaphore_handle, INT destroy_flag)
INT ss_sleep(INT millisec)
INT ss_socket_connect_tcp(const char *hostname, int tcp_port, int *sockp, std::string *error_msg_p)
INT ss_semaphore_wait_for(HNDLE semaphore_handle, DWORD timeout_millisec)
INT ss_socket_listen_tcp(bool listen_localhost, int tcp_port, int *sockp, int *tcp_port_p, std::string *error_msg_p)
char * ss_crypt(const char *buf, const char *salt)
INT ss_spawnv(INT mode, const char *cmdname, const char *const argv[])
INT ss_suspend_get_buffer_port(midas_thread_t thread_id, INT *port)
INT ss_socket_close(int *sockp)
char * ss_gets(char *string, int size)
INT ss_recv_net_command(int sock, DWORD *routine_id, DWORD *param_size, char **param_ptr, int timeout_ms)
INT ss_shm_close(const char *name, void *adr, size_t shm_size, HNDLE handle, INT destroy_flag)
INT send_tcp(int sock, char *buffer, DWORD buffer_size, INT flags)
void * ss_ctrlc_handler(void(*func)(int))
BOOL ss_pid_exists(int pid)
std::string ss_get_executable(void)
INT ss_system(const char *command)
INT ss_mutex_wait_for(MUTEX_T *mutex, INT timeout)
INT ss_file_find(const char *path, const char *pattern, char **plist)
static int cm_msg_retrieve1(const char *filename, time_t t, INT n_messages, char **messages, int *length, int *allocated, int *num_messages)
INT cm_msg1(INT message_type, const char *filename, INT line, const char *facility, const char *routine, const char *format,...)
int cm_msg_early_init(void)
INT EXPRT cm_msg_facilities(STRING_LIST *list)
int cm_msg_open_buffer(void)
int cm_msg_close_buffer(void)
static std::mutex gMsgBufMutex
static void add_message(char **messages, int *length, int *allocated, time_t tstamp, const char *new_message)
INT cm_msg_register(EVENT_HANDLER *func)
INT cm_msg_log(INT message_type, const char *facility, const char *message)
INT cm_msg_flush_buffer()
static std::deque< msg_buffer_entry > gMsgBuf
static INT cm_msg_send_event(DWORD ts, INT message_type, const char *send_message)
std::string cm_get_error(INT code)
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
static std::string cm_msg_format(INT message_type, const char *filename, INT line, const char *routine, const char *format, va_list *argptr)
INT cm_msg_retrieve(INT n_message, char *message, INT buf_size)
INT cm_msg_retrieve2(const char *facility, time_t t, INT n_message, char **messages, int *num_messages)
void cm_msg_get_logfile(const char *fac, time_t t, std::string *filename, std::string *linkname, std::string *linktarget)
INT cm_set_msg_print(INT system_mask, INT user_mask, int(*func)(const char *))
struct rpc_server_acception_struct RPC_SERVER_ACCEPTION
BOOL equal_ustring(const char *str1, const char *str2)
INT db_flush_database(HNDLE hDB)
INT db_get_data_index(HNDLE hDB, HNDLE hKey, void *data, INT *buf_size, INT idx, DWORD type)
INT db_delete_key(HNDLE hDB, HNDLE hKey, BOOL follow_links)
INT db_check_client(HNDLE hDB, HNDLE hKeyClient)
INT db_get_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, void *data, INT *buf_size, DWORD type, BOOL create)
INT db_open_record(HNDLE hDB, HNDLE hKey, void *ptr, INT rec_size, WORD access_mode, void(*dispatcher)(INT, INT, void *), void *info)
INT db_open_database(const char *xdatabase_name, INT database_size, HNDLE *hDB, const char *client_name)
void db_cleanup(const char *who, DWORD actual_time, BOOL wrong_interval)
INT db_lock_database(HNDLE hDB)
std::string strcomb1(const char **list)
INT db_get_data(HNDLE hDB, HNDLE hKey, void *data, INT *buf_size, DWORD type)
INT db_create_key(HNDLE hDB, HNDLE hKey, const char *key_name, DWORD type)
INT db_unlock_database(HNDLE hDB)
INT db_set_mode(HNDLE hDB, HNDLE hKey, WORD mode, BOOL recurse)
INT db_get_key(HNDLE hDB, HNDLE hKey, KEY *key)
INT EXPRT db_get_value_string(HNDLE hdb, HNDLE hKeyRoot, const char *key_name, int index, std::string *s, BOOL create, int create_string_length)
INT db_get_watchdog_info(HNDLE hDB, const char *client_name, DWORD *timeout, DWORD *last)
INT db_set_data_index(HNDLE hDB, HNDLE hKey, const void *data, INT data_size, INT idx, DWORD type)
INT db_close_all_records()
INT db_watch(HNDLE hDB, HNDLE hKey, void(*dispatcher)(INT, INT, INT, void *), void *info)
INT db_close_all_databases(void)
INT db_set_data(HNDLE hDB, HNDLE hKey, const void *data, INT buf_size, INT num_values, DWORD type)
INT db_sprintf(char *string, const void *data, INT data_size, INT idx, DWORD type)
INT db_update_last_activity(DWORD millitime)
INT db_set_data1(HNDLE hDB, HNDLE hKey, const void *data, INT buf_size, INT num_values, DWORD type)
INT db_set_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, const void *data, INT data_size, INT num_values, DWORD type)
INT db_find_key(HNDLE hDB, HNDLE hKey, const char *key_name, HNDLE *subhKey)
INT db_update_record_local(INT hDB, INT hKeyRoot, INT hKey, int index)
void db_set_watchdog_params(DWORD timeout)
INT db_update_record_mserver(INT hDB, INT hKeyRoot, INT hKey, int index, int client_socket)
void db_cleanup2(const char *client_name, int ignore_timeout, DWORD actual_time, const char *who)
int db_delete_client_info(HNDLE hDB, int pid)
INT db_set_client_name(HNDLE hDB, const char *client_name)
INT db_notify_clients_array(HNDLE hDB, HNDLE hKeys[], INT size)
INT db_set_record(HNDLE hDB, HNDLE hKey, void *data, INT buf_size, INT align)
INT db_enum_key(HNDLE hDB, HNDLE hKey, INT idx, HNDLE *subkey_handle)
INT db_create_record(HNDLE hDB, HNDLE hKey, const char *orig_key_name, const char *init_str)
INT EXPRT db_set_value_string(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, const std::string *s)
INT db_set_lock_timeout(HNDLE hDB, int timeout_millisec)
INT db_set_num_values(HNDLE hDB, HNDLE hKey, INT num_values)
INT db_protect_database(HNDLE hDB)
int rb_get_rp(int handle, void **p, int millisec)
int rb_delete(int handle)
int rb_get_wp(int handle, void **p, int millisec)
int rb_increment_rp(int handle, int size)
static volatile int _rb_nonblocking
int rb_increment_wp(int handle, int size)
int rb_create(int size, int max_event_size, int *handle)
int rb_get_buffer_level(int handle, int *n_bytes)
static RING_BUFFER rb[MAX_RING_BUFFER]
INT rpc_add_allowed_host(const char *hostname)
void rpc_convert_data(void *data, INT tid, INT flags, INT total_size, INT convert_flags)
INT rpc_client_connect(const char *host_name, INT port, const char *client_name, HNDLE *hConnection)
#define RPC_BM_ADD_EVENT_REQUEST
INT rpc_register_server(int port, int *plsock, int *pport)
#define RPC_CM_CHECK_CLIENT
static int recv_event_server_realloc(INT idx, RPC_SERVER_ACCEPTION *psa, char **pbuffer, int *pbuffer_size)
INT rpc_get_opt_tcp_size()
INT rpc_client_disconnect(HNDLE hConn, BOOL bShutdown)
#define RPC_BM_SEND_EVENT
INT rpc_client_call(HNDLE hConn, DWORD routine_id,...)
INT rpc_register_functions(const RPC_LIST *new_list, RPC_HANDLER func)
static std::atomic_bool gAllowedHostsEnabled(false)
INT rpc_server_callback(struct callback_addr *pcallback)
static std::mutex _client_connections_mutex
INT rpc_set_timeout(HNDLE hConn, int timeout_msec, int *old_timeout_msec)
#define RPC_CM_SYNCHRONIZE
static std::mutex gAllowedHostsMutex
INT recv_tcp_check(int sock)
INT rpc_server_receive_rpc(int idx, RPC_SERVER_ACCEPTION *sa)
#define RPC_BM_GET_BUFFER_INFO
#define RPC_CM_SET_CLIENT_INFO
const char * rpc_get_mserver_path()
static RPC_SERVER_ACCEPTION * rpc_get_server_acception(int idx)
RPC_SERVER_ACCEPTION * rpc_get_mserver_acception()
void rpc_calc_convert_flags(INT hw_type, INT remote_hw_type, INT *convert_flags)
INT rpc_server_connect(const char *host_name, const char *exp_name)
std::string rpc_get_name()
static RPC_SERVER_ACCEPTION * rpc_new_server_acception()
#define RPC_BM_REMOVE_EVENT_REQUEST
void rpc_debug_printf(const char *format,...)
const char * rpc_tid_name_old(INT id)
int cm_query_transition(int *transition, int *run_number, int *trans_time)
#define RPC_RC_TRANSITION
void rpc_va_arg(va_list *arg_ptr, INT arg_type, void *arg)
INT rpc_server_loop(void)
INT rpc_clear_allowed_hosts()
std::string rpc_get_mserver_hostname(void)
INT rpc_deregister_functions()
bool rpc_is_connected(void)
INT rpc_set_mserver_path(const char *path)
static std::vector< RPC_LIST > rpc_list
#define RPC_BM_CLOSE_BUFFER
static TLS_POINTER * tls_buffer
#define RPC_BM_SET_CACHE_SIZE
static std::vector< RPC_CLIENT_CONNECTION * > _client_connections
static void rpc_call_encode(va_list &ap, const RPC_LIST &rl, NET_COMMAND **nc)
static RPC_CLIENT_CONNECTION * rpc_get_locked_client_connection(HNDLE hConn)
static std::vector< std::string > gAllowedHosts
INT rpc_call(DWORD routine_id,...)
static std::mutex rpc_list_mutex
const char * rpc_tid_name(INT id)
INT rpc_register_client(const char *name, RPC_LIST *list)
static std::vector< RPC_SERVER_ACCEPTION * > _server_acceptions
static INT rpc_socket_check_allowed_host(int sock)
#define RPC_BM_CLOSE_ALL_BUFFERS
INT rpc_server_shutdown(void)
int rpc_flush_event_socket(int timeout_msec)
INT rpc_send_event(INT buffer_handle, const EVENT_HEADER *pevent, int unused, INT async_flag, INT mode)
static TR_FIFO _tr_fifo[10]
static std::string _mserver_path
INT rpc_get_timeout(HNDLE hConn)
INT rpc_server_disconnect()
INT rpc_set_debug(void(*func)(const char *), INT mode)
INT rpc_client_accept(int lsock)
void rpc_vax2ieee_float(float *var)
INT rpc_execute(INT sock, char *buffer, INT convert_flags)
INT rpc_set_opt_tcp_size(INT tcp_size)
#define RPC_BM_GET_BUFFER_LEVEL
int rpc_name_tid(const char *name)
INT rpc_register_listener(int port, RPC_HANDLER func, int *plsock, int *pport)
INT rpc_server_receive_event(int idx, RPC_SERVER_ACCEPTION *sa, int timeout_msec)
#define RPC_BM_OPEN_BUFFER
#define RPC_BM_EMPTY_BUFFERS
INT rpc_server_accept(int lsock)
static std::mutex _tr_fifo_mutex
bool rpc_is_mserver(void)
static RPC_SERVER_ACCEPTION * _mserver_acception
static int recv_net_command_realloc(INT idx, char **pbuf, int *pbufsize, INT *remaining)
void rpc_ieee2vax_float(float *var)
#define RPC_CM_SET_WATCHDOG_PARAMS
INT rpc_send_event1(INT buffer_handle, const EVENT_HEADER *pevent)
#define RPC_BM_RECEIVE_EVENT
static bool _rpc_is_remote
static int rpc_call_decode(va_list &ap, const RPC_LIST &rl, const char *buf, size_t buf_size)
INT rpc_send_event_sg(INT buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[])
INT rpc_set_name(const char *name)
INT rpc_register_function(INT id, INT(*func)(INT, void **))
#define RPC_CM_MSG_RETRIEVE
#define RPC_BM_SKIP_EVENT
INT rpc_client_dispatch(int sock)
#define RPC_BM_INIT_BUFFER_COUNTERS
static RPC_SERVER_CONNECTION _server_connection
INT rpc_check_channels(void)
static INT rpc_transition_dispatch(INT idx, void *prpc_param[])
static int handle_msg_odb(int n, const NET_COMMAND *nc)
#define RPC_BM_FLUSH_CACHE
void rpc_ieee2vax_double(double *var)
void rpc_vax2ieee_double(double *var)
#define RPC_CM_GET_WATCHDOG_INFO
INT rpc_get_convert_flags(void)
INT rpc_check_allowed_host(const char *hostname)
void rpc_convert_single(void *data, INT tid, INT flags, INT convert_flags)
char exp_name[NAME_LENGTH]
BOOL debug
debug printouts
char host_name[HOST_NAME_LENGTH]
char expt_name[NAME_LENGTH]
char buffer_name[NAME_LENGTH]
static const int tid_size[]
static std::string join(const char *sep, const std::vector< std::string > &v)
static std::vector< TRANS_TABLE > _trans_table
static std::atomic_int _message_mask_system
static int disable_bind_rpc_to_localhost
static std::mutex gBuffersMutex
int(* MessagePrintCallback)(const char *)
std::string cm_transition_name(int transition)
static DBG_MEM_LOC * _mem_loc
static std::vector< BUFFER * > gBuffers
static void(* _debug_print)(const char *)
static std::mutex _trans_table_mutex
static std::string _experiment_name
static std::string _path_name
static std::string _client_name
static std::vector< EventRequest > _request_list
static int _rpc_connect_timeout
static const char * tid_name[]
static const ERROR_TABLE _error_table[]
static INT _watchdog_timeout
INT bm_get_buffer_info(INT buffer_handle, BUFFER_HEADER *buffer_header)
static MUTEX_T * _mutex_rpc
static EVENT_HANDLER * _msg_dispatch
void * dbg_calloc(unsigned int size, unsigned int count, char *file, int line)
static BOOL _rpc_registered
static std::atomic< MessagePrintCallback > _message_print
bool ends_with_char(const std::string &s, char c)
void dbg_free(void *adr, char *file, int line)
static std::atomic_int _message_mask_user
static std::mutex _request_list_mutex
INT bm_get_buffer_level(INT buffer_handle, INT *n_bytes)
static TRANS_TABLE _deferred_trans_table[]
static int _rpc_listen_socket
std::string msprintf(const char *format,...)
void * dbg_malloc(unsigned int size, char *file, int line)
static const char * tid_name_old[]
static std::vector< std::string > split(const char *sep, const std::string &s)
int cm_write_event_to_odb(HNDLE hDB, HNDLE hKey, const EVENT_HEADER *pevent, INT format)
INT bm_init_buffer_counters(INT buffer_handle)
#define DIR_SEPARATOR_STR
#define DEFAULT_WATCHDOG_TIMEOUT
#define DEFAULT_RPC_TIMEOUT
#define PROGRAM_INFO_STR(_name)
#define MIN_WRITE_CACHE_SIZE
#define MAX_WRITE_CACHE_EVENT_SIZE_DIV
#define DEFAULT_MAX_EVENT_SIZE
#define WATCHDOG_INTERVAL
#define MAX_EVENT_REQUESTS
#define BANK_FORMAT_64BIT_ALIGNED
#define BANK_FORMAT_32BIT
std::vector< std::string > STRING_LIST
#define MAX_WRITE_CACHE_SIZE_DIV
#define TRANSITION_ERROR_STRING_LENGTH
#define BANK_FORMAT_VERSION
#define message(type, str)
#define write(n, a, f, d)
static std::string remove(const std::string s, char c)
int gettimeofday(struct timeval *tp, void *tzp)
struct callback_addr callback
TH1X EXPRT * h1_book(const char *name, const char *title, int bins, double min, double max)
BUFFER_CLIENT client[MAX_CLIENTS]
BUFFER_INFO(BUFFER *pbuf)
int client_count_write_wait[MAX_CLIENTS]
DWORD client_time_write_wait[MAX_CLIENTS]
int client_count_write_wait[MAX_CLIENTS]
char buffer_name[NAME_LENGTH]
EVENT_HANDLER * dispatcher
NET_COMMAND_HEADER header
unsigned int max_event_size
std::atomic< std::thread * > thread
std::atomic_bool finished
std::vector< int > wait_for_index
std::string waiting_for_client
std::vector< std::unique_ptr< TrClient > > clients
unsigned short host_port1
unsigned short host_port2
unsigned short host_port3
std::vector< exptab_entry > exptab
std::mutex event_sock_mutex
static double fac(double a)
static te_expr * list(state *s)