29#define ODB_UPDATE_TIME 1000
31#define DEFAULT_FE_TIMEOUT 10000
33#define MAX_N_THREADS 32
140 printf(
"Started run %d\n",
rn);
180 printf(
"Run stopped\n");
190 cm_msg(
MERROR,
"tr_stop",
"bm_flush_cache(BM_WAIT) error %d", err);
199 eq->stats.events_sent +=
eq->events_sent;
200 eq->stats.events_per_sec = 0;
201 eq->stats.kbytes_per_sec = 0;
276 assert(
info != NULL);
280 path +=
"/Equipment/";
286 int size =
sizeof(
eq->info);
289 cm_msg(
MINFO,
"eq_common_watcher",
"db_get_record(%s) status %d", path.c_str(),
status);
315 for (idx = 0;
equipment[idx].name[0]; idx++) {
320 printf(
"\nEvent ID 0 for %s not allowed\n",
equipment[idx].
name);
329#pragma GCC diagnostic push
330#pragma GCC diagnostic ignored "-Wformat-nonliteral"
341 if (strchr(eq_info->
buffer,
'%')) {
349 if (strchr(eq_info->
buffer,
'%'))
350 *strchr(eq_info->
buffer,
'%') = 0;
352#pragma GCC diagnostic pop
356 size_t namepos =
name.find(
"${HOSTNAME}");
359 if (namepos != std::string::npos) {
361 std::string before =
name.substr(0, namepos);
362 std::string after =
name.substr(namepos + 11);
366 size_t p = thishost.find(
'.');
367 thishost = thishost.substr(0, p == std::string::npos ? thishost.length() : p);
371 if (
name.length() >= 32) {
373 "Equipment name %s%s%s too long, trimming down to %d characters",
374 before.c_str(),thishost.c_str(),after.c_str(),
389 printf(
"ERROR: Cannot create equipment record \"%s\", db_set_record() status %d\n",
str,
status);
395 cm_msg(
MINFO,
"register_equipment",
"Correcting \"%s\", db_check_record() status %d",
str,
status);
398 printf(
"ERROR: Cannot check equipment record \"%s\", db_check_record() status %d\n",
str,
status);
406 printf(
"ERROR: Cannot find \"%s\", db_find_key() status %d",
str,
status);
414 printf(
"ERROR: Cannot check record \"%s\", db_check_record() status %d",
str,
status);
425 double prev_event_limit;
427 size =
sizeof(prev_enabled);
429 size =
sizeof(prev_hidden);
431 size =
sizeof(prev_event_limit);
435 printf(
"ERROR: Cannot set record \"%s\", db_set_record() status %d",
str,
status);
440 eq_info->
enabled = prev_enabled;
442 eq_info->
hidden = prev_hidden;
452 printf(
"ERROR: Cannot get record \"%s\", db_get_record() status %d",
str,
status);
462 printf(
"ERROR: Cannot hotlink \"%s\", db_watch() status %d",
str,
status);
480 mstrlcat(eq_info->
status,
"@",
sizeof(eq_info->
status));
488 cm_msg(
MERROR,
"register_equipment",
"Cannot update equipment Common, db_set_record() status %d",
status);
498 cm_msg(
MERROR,
"register_equipment",
"Equipment \"%s\" contains RO_STOPPED or RO_ALWAYS. This can lead to undesired side-effect and should be removed.",
equipment[idx].
name);
511 for (; bank_list->
name[0]; bank_list++) {
522 printf(
"Cannot check/create record \"%s\", status = %d\n",
str,
548 printf(
"Cannot create/check statistics record \'%s\', error %d\n",
str,
status);
554 printf(
"Cannot find statistics record \'%s\', error %d\n",
str,
status);
569 "Cannot change access mode for record \'%s\', error %d",
str,
status);
571 cm_msg(
MINFO,
"register_equipment",
"Recovered access mode for record \'%s\'",
str);
576 "Cannot open statistics record \'%s\', error %d",
str,
status);
589 "Not enough memory to allocate buffer for fragmented events");
604 n_events = (
int *) calloc(
sizeof(
int), idx);
620 for (idx = 0;
equipment[idx].name[0]; idx++) {
632 "Interrupt readout cannot be combined with polled readout");
648 "Equipment %s disabled in frontend",
660 "Equipment \"%s\" cannot be of type EQ_INTERRUPT and EQ_POLLED at the same time",
664 "Equipment \"%s\" cannot be of type EQ_INTERRUPT and EQ_MULTITHREAD at the same time",
677 printf(
"\nCalibrating");
702 if (
count > 2147483647.0) {
703 count = 2147483647.0;
707 }
while (
delta_time > eq_info->
period * 1.2 || delta_time < eq_info->period * 0.8);
721 "Multi-threaded readout cannot be combined with polled readout for equipment \'%s\'",
equipment[
i].
name);
729 "Defined more than one equipment with multi-threaded readout for equipment \'%s\'",
equipment[
i].
name);
743 "Equipment %s disabled in frontend",
756 "Equipment %s disabled in frontend",
769 cm_msg(
MERROR,
"initialize_equipment",
"Found slow control equipment \"%s\" with no device driver list, aborting",
788 sprintf(tmp,
"%d",
n++);
803 strcpy(
str,
"Hardware error");
805 strcpy(
str,
"ODB error");
807 strcpy(
str,
"Driver error");
809 strcpy(
str,
"Partially disabled");
811 strcpy(
str,
"Error");
838 if (!manual_trig_flag)
841 manual_trig_flag =
TRUE;
846 for (idx = 0;
equipment[idx].name[0]; idx++) {
856 cm_msg(
MINFO,
"initialize_equipment",
"Slow control equipment initialized");
868 for (idx = 0;
equipment[idx].name[0]; idx++)
873 sprintf(
str,
"/Equipment/%s/Common",
name);
877 mstrlcpy(
str, equipment_status,
sizeof(
str));
880 mstrlcpy(
str, status_class,
sizeof(
str));
931 "Event size %ld larger than maximum size %d for frag. ev.",
945 pd = (
unsigned char *) (pfragment + 1);
947 for (
i = 0;
i < 4;
i++) {
948 pd[
i] = (
unsigned char) (size & 0xFF);
954 pdata = (
char *) (pevent + 1);
956 for (
i = 0, sent = 0; sent < pevent->
data_size;
i++) {
969 memcpy(pfragment + 1, pdata, size);
1001 cm_msg(
MERROR,
"send_event",
"Event size %ld larger than maximum size %d",
1133 assert(
rbh[
i] == 0);
1202 *(
INT *) (pevent + 1) = source;
1217 "Event size %ld larger than maximum size %d",
1245 static unsigned int last_event_time = 0;
1246 static unsigned int last_error = 0;
1247 unsigned int last_serial = 0;
1249 unsigned int serial =
eq->events_collected;
1267 last_event_time > 0 &&
ss_millitime() > last_event_time + 5000) {
1268 if (
ss_time() - last_error > 30) {
1271 "Event collector: waiting for event serial %d since %1.1lf seconds, received already serial %d",
1282 if (pevent->data_size) {
1283 if (
eq->buffer_handle) {
1290 if (pevent->serial_number == 0)
1305 if (
eq->info.num_subevents)
1306 eq->events_sent +=
eq->subevent_number;
1310 eq->events_collected++;
1327 for (
int idx = 0;
equipment[idx].name[0]; idx++) {
1351 memset(
str,
' ', 159);
1355 msg = strchr(msg,
']') + 2;
1357 memcpy(
str, msg, strlen(msg));
1376 strcpy(
str,
"<local>");
1380 "================================================================================");
1386 "================================================================================");
1388 "Equipment Status Events Events/sec Rate[B/s] ODB->FE FE->ODB");
1390 "--------------------------------------------------------------------------------");
1398 ctime_r(&full_time, ctimebuf);
1399 mstrlcpy(
str, ctimebuf + 11,
sizeof(
str));
1423 else if (
equipment[
i].stats.events_sent > 1E6)
1430 else if (
equipment[
i].stats.events_per_sec > 1E3)
1437 else if (
equipment[
i].stats.kbytes_per_sec < 1E3)
1460 ctime_r(&full_time, ctimebuf);
1461 mstrlcpy(
str, ctimebuf + 11,
sizeof(
str));
1469 printf(
"%6.3lfM",
equipment[
i].stats.events_per_sec / 1E6);
1470 else if (
equipment[
i].stats.events_per_sec > 1E3)
1471 printf(
"%6.3lfk",
equipment[
i].stats.events_per_sec / 1E3);
1473 printf(
"%6.1lf ",
equipment[
i].stats.events_per_sec);
1483 static DWORD last_wheel = 0, wheel_index = 0;
1484 static char wheel_char[] = {
'-',
'\\',
'|',
'/'};
1489 ss_printf(79, 2,
"%c", wheel_char[wheel_index]);
1490 wheel_index = (wheel_index + 1) % 4;
1510 strcpy(
str,
"MIDAS");
1528 DWORD readout_start, sent, size;
1537 for (idx = 0;; idx++) {
1539 eq_info = &
eq->info;
1575 *(
INT *) (pevent + 1) = source;
1577 if (
eq->info.num_subevents) {
1578 eq->subevent_number = 0;
1580 *(
INT *) ((
char *) (pevent + 1) + pevent->
data_size) = source;
1583 size =
eq->readout((
char *) (pevent + 1), pevent->
data_size);
1588 "Event size %ld larger than maximum size %d",
1593 eq->subevent_number++;
1594 eq->serial_number++;
1601 if (source ==
FALSE) {
1608 }
while (source ==
FALSE);
1610 }
while (
eq->subevent_number <
eq->info.num_subevents && source);
1613 pevent->
data_size =
eq->readout((
char *) (pevent + 1), -1);
1616 pevent->
data_size =
eq->readout((
char *) (pevent + 1), 0);
1622 "Event size %ld larger than maximum size %d for frag. ev.",
1630 "Event size %ld larger than maximum size %d",
1639 eq->serial_number++;
1655 pd = (
unsigned char *) (pfragment + 1);
1657 for (
i = 0;
i < 4;
i++) {
1658 pd[
i] = (
unsigned char) (size & 0xFF);
1664 pdata = (
char *) (pevent + 1);
1666 for (
i = 0, sent = 0; sent < pevent->
data_size;
i++) {
1679 memcpy(pfragment + 1, pdata, size);
1720 if (
eq->info.num_subevents) {
1721 eq->events_sent +=
eq->subevent_number;
1722 events_sent +=
eq->subevent_number;
1752 INT idx, events_sent;
1757 for (idx = 0;; idx++) {
1759 eq_info = &
eq->info;
1790 DWORD last_time_network = 0, last_time_display = 0, last_time_flush = 0,
1791 readout_start, sent, size, last_time_rate = 0;
1797 INT opt_max = 0, opt_index = 0, opt_tcp_size = 128, opt_cnt = 0;
1816 for (idx = 0;; idx++) {
1818 eq_info = &
eq->info;
1850 if (eq_info->
period == 0)
1896 *(
INT *) (pevent + 1) = source;
1898 if (
eq->info.num_subevents) {
1899 eq->subevent_number = 0;
1901 *(
INT *) ((
char *) (pevent + 1) + pevent->
data_size) = source;
1904 size =
eq->readout((
char *) (pevent + 1), pevent->
data_size);
1909 "Event size %ld larger than maximum size %d",
1914 eq->subevent_number++;
1915 eq->serial_number++;
1922 if (source ==
FALSE) {
1929 }
while (source ==
FALSE);
1931 }
while (
eq->subevent_number <
eq->info.num_subevents && source);
1934 pevent->
data_size =
eq->readout((
char *) (pevent + 1), -1);
1937 pevent->
data_size =
eq->readout((
char *) (pevent + 1), 0);
1943 "Event size %ld larger than maximum size %d for frag. ev.",
1951 "Event size %ld larger than maximum size %d",
1960 eq->serial_number++;
1976 pd = (
unsigned char *) (pfragment + 1);
1978 for (
i = 0;
i < 4;
i++) {
1979 pd[
i] = (
unsigned char) (size & 0xFF);
1985 pdata = (
char *) (pevent + 1);
1987 for (
i = 0, sent = 0; sent < pevent->
data_size;
i++) {
2000 memcpy(pfragment + 1, pdata, size);
2041 if (
eq->info.num_subevents)
2042 eq->events_sent +=
eq->subevent_number;
2076 if ((
int) size == -1)
2111 size =
sizeof(flag);
2116 size =
sizeof(delay);
2122 force_update =
TRUE;
2177 force_update =
FALSE;
2181 eq->stats.events_sent +=
eq->events_sent;
2183 eq->events_sent = 0;
2192 eq->stats.events_per_sec = ((int)(
e * 100 + 0.5)) / 100.0;
2195 eq->stats.kbytes_per_sec = ((int)(
e * 1000 + 0.5)) / 1000.0;
2212 ss_printf(0, opt_index,
"%6d : %5.1lf %5.1lf", opt_tcp_size,
2214 if (++opt_cnt == 10) {
2218 opt_tcp_size = 1 << (opt_index + 7);
2220 if (1 << (opt_index + 7) > 0x8000) {
2222 opt_tcp_size = 1 << 7;
2275 for (
j = 0;
j <
i;
j++)
2291 cm_msg(
MERROR,
"scheduler",
"bm_flush_cache(BM_NO_WAIT) returned status %d", err);
2306 size =
sizeof(
state);
2309 cm_msg(
MERROR,
"scheduler",
"cannot get Runinfo/State in database");
2320 cm_msg(
MERROR,
"main",
"aborting on attempt to use invalid run number %d",
2376#define MFE_ERROR_SIZE 10
2391 cm_msg(
MERROR,
"mfe_set_error",
"Cannot create mutex\n");
2439int mfe(
char *ahost_name,
char *aexp_name,
BOOL adebug)
2455 signal(SIGPIPE, SIG_IGN);
2471 _argv = (
char **) malloc(
sizeof(
char *) * argc);
2472 for (
i = 0;
i < argc;
i++) {
2477 for (
i = 1;
i < argc;
i++) {
2478 if (argv[
i][0] ==
'-' && argv[
i][1] ==
'd')
2480 else if (argv[
i][0] ==
'-' && argv[
i][1] ==
'D')
2482 else if (argv[
i][0] ==
'-' && argv[
i][1] ==
'O')
2484 else if (argv[
i][1] ==
'v') {
2485 if (
i < argc - 1 && atoi(argv[
i + 1]) > 0)
2489 }
else if (argv[
i][0] ==
'-') {
2490 if (
i + 1 >= argc || argv[
i + 1][0] ==
'-')
2492 if (argv[
i][1] ==
'e')
2494 else if (argv[
i][1] ==
'h')
2496 else if (argv[
i][1] ==
'i')
2498 else if (argv[
i][1] ==
'-') {
2500 printf(
"usage: frontend [-h Hostname] [-e Experiment] [-d] [-D] [-O] [-v <n>] [-i <n>]\n");
2501 printf(
" [-d] Used to debug the frontend\n");
2502 printf(
" [-D] Become a daemon\n");
2503 printf(
" [-O] Become a daemon but keep stdout\n");
2504 printf(
" [-v <n>] Set verbosity level\n");
2505 printf(
" [-i <n>] Set frontend index (used for event building)\n");
2517 cm_msg(
MERROR,
"mainFE",
"Not enough mem space for event size");
2525 if (getenv(
"MIDAS_FRONTEND_INDEX"))
2534 printf(
"\nBecoming a daemon...\n");
2546 printf(
"Connect to experiment on host %s...\n",
host_name);
2548 printf(
"Connect to experiment %s...\n",
exp_name);
2550 printf(
"Connect to experiment...\n");
2595 printf(
"Failed to start local RPC server");
2627 printf(
"Init hardware...\n");
2650 int max_allowed_buffer_size = 1024 * 1024 * 1024;
2655 cm_msg(
MERROR,
"mfe_main",
"event_buffer_size %d MB exceeds maximum allowed size of %d MB\n",
2665 cm_msg(
MERROR,
"mfe_main",
"Error status %d received from initialize_equipment, aborting",
status);
2766int begin_of_run(
int runno,
char *errstr) {
return 0; };
2767int end_of_run(
int runno,
char *errstr) {
return 0; };
2768int pause_run(
int runno,
char *errstr) {
return 0; };
2769int resume_run(
int runno,
char *errstr) {
return 0; };
INT transition(INT run_number, char *error)
INT bm_open_buffer(const char *buffer_name, INT buffer_size, INT *buffer_handle)
INT bm_flush_cache(int buffer_handle, int timeout_msec)
INT cm_register_transition(INT transition, INT(*func)(INT, char *), INT sequence_number)
INT cm_shutdown(const char *name, BOOL bUnique)
INT cm_yield(INT millisec)
INT cm_get_experiment_database(HNDLE *hDB, HNDLE *hKeyClient)
INT cm_start_watchdog_thread()
INT cm_set_client_run_state(INT state)
INT cm_transition(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_register_function(INT id, INT(*func)(INT, void **))
INT cm_connect_experiment1(const char *host_name, const char *default_exp_name, const char *client_name, void(*func)(char *), INT odb_size, DWORD watchdog_timeout)
INT cm_cleanup(const char *client_name, BOOL ignore_timeout)
INT cm_set_client_info(HNDLE hDB, HNDLE *hKeyClient, const char *host_name, char *client_name, INT hw_type, const char *password, DWORD watchdog_timeout)
INT cm_disconnect_experiment(void)
INT cm_synchronize(DWORD *seconds)
INT cm_get_environment(char *host_name, int host_name_size, char *exp_name, int exp_name_size)
INT cm_check_deferred_transition()
BOOL cm_is_ctrlc_pressed()
INT cm_set_watchdog_params(BOOL call_watchdog, DWORD timeout)
#define DB_STRUCT_MISMATCH
#define DB_NO_MORE_SUBKEYS
#define CMD_INTERRUPT_ATTACH
#define CMD_INTERRUPT_DISABLE
#define CMD_INTERRUPT_ENABLE
#define FE_PARTIALLY_DISABLED
#define CMD_INTERRUPT_DETACH
std::string ss_gethostname()
INT ss_mutex_release(MUTEX_T *mutex)
INT ss_getchar(BOOL reset)
INT ss_mutex_create(MUTEX_T **mutex, BOOL recursive)
midas_thread_t ss_thread_create(INT(*thread_func)(void *), void *param)
INT ss_daemon_init(BOOL keep_stdout)
INT ss_sleep(INT millisec)
void ss_printf(INT x, INT y, const char *format,...)
INT ss_mutex_wait_for(MUTEX_T *mutex, INT timeout)
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
INT cm_set_msg_print(INT system_mask, INT user_mask, int(*func)(const char *))
BOOL equal_ustring(const char *str1, const char *str2)
INT db_send_changed_records()
INT db_get_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, void *data, INT *buf_size, DWORD type, BOOL create)
std::string strcomb1(const char **list)
INT db_get_record1(HNDLE hDB, HNDLE hKey, void *data, INT *buf_size, INT align, const char *rec_str)
INT db_create_key(HNDLE hDB, HNDLE hKey, const char *key_name, DWORD type)
INT db_check_record(HNDLE hDB, HNDLE hKey, const char *keyname, const char *rec_str, BOOL correct)
INT db_get_record(HNDLE hDB, HNDLE hKey, void *data, INT *buf_size, INT align)
INT db_set_mode(HNDLE hDB, HNDLE hKey, WORD mode, BOOL recurse)
INT db_watch(HNDLE hDB, HNDLE hKey, void(*dispatcher)(INT, INT, INT, void *), void *info)
INT db_set_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, const void *data, INT data_size, INT num_values, DWORD type)
INT db_find_key(HNDLE hDB, HNDLE hKey, const char *key_name, HNDLE *subhKey)
INT db_open_record1(HNDLE hDB, HNDLE hKey, void *ptr, INT rec_size, WORD access_mode, void(*dispatcher)(INT, INT, void *), void *info, const char *rec_str)
INT db_set_record(HNDLE hDB, HNDLE hKey, void *data, INT buf_size, INT align)
INT db_enum_key(HNDLE hDB, HNDLE hKey, INT idx, HNDLE *subkey_handle)
INT db_create_record(HNDLE hDB, HNDLE hKey, const char *orig_key_name, const char *init_str)
int rb_get_rp(int handle, void **p, int millisec)
int rb_get_wp(int handle, void **p, int millisec)
int rb_increment_rp(int handle, int size)
int rb_increment_wp(int handle, int size)
int rb_create(int size, int max_event_size, int *handle)
INT rpc_call(DWORD routine_id,...)
INT rpc_send_event(INT buffer_handle, const EVENT_HEADER *pevent, int unused, INT async_flag, INT mode)
INT rpc_set_opt_tcp_size(INT tcp_size)
#define RPC_BM_FLUSH_CACHE
#define DEFAULT_FE_TIMEOUT
void set_rate_period(int ms)
static INT tr_resume(INT rn, char *error)
static int flush_user_events(void)
char exp_name[NAME_LENGTH]
std::atomic< bool > _stop_all_threads(false)
void readout_enable(bool flag)
static int rbh[MAX_N_THREADS]
static std::atomic< bool > _readout_enabled_flag(false)
static void eq_common_watcher(INT hDB, INT, INT, void *info)
static int message_print(const char *msg)
static void send_all_periodic_events(INT transition)
INT manual_trigger(INT, void *prpc_param[])
static int send_event(INT idx, BOOL manual_trig)
static INT initialize_equipment(void)
void signal_readout_thread_active(int index, int flag)
int create_event_rb(int i)
int is_readout_thread_active()
void(* mfe_error_dispatcher)(const char *)
EQUIPMENT * multithread_eq
void mfe_get_args(int *argc, char ***argv)
BOOL debug
debug printouts
static int receive_trigger_event(EQUIPMENT *eq)
static INT check_user_events(void)
INT manual_trigger_event_id
static INT tr_pause(INT rn, char *error)
static INT frontend_index
void mfe_error(const char *error)
char full_frontend_name[256]
static INT check_polled_events(void)
static INT tr_start(INT rn, char *error)
static int _readout_thread(void *param)
static volatile int readout_thread_active[MAX_N_THREADS]
void mfe_set_error(void(*dispatcher)(const char *))
static void update_odb(const EVENT_HEADER *pevent, HNDLE hKey, INT format)
char mfe_error_str[MFE_ERROR_SIZE][256]
void mfe_error_check(void)
BOOL lockout_readout_thread
static INT register_equipment(void)
static INT tr_stop(INT rn, char *error)
int set_equipment_status(const char *name, const char *equipment_status, const char *status_class)
bool readout_enabled(void)
bool is_readout_thread_enabled()
void stop_readout_threads()
static void interrupt_routine(void)
char host_name[HOST_NAME_LENGTH]
const char * frontend_file_name
The frontend file name, don't change it.
BOOL equipment_common_overwrite
INT frontend_init(void)
Frontend initialization.
const char * frontend_name
The frontend name (client name) as seen by other MIDAS clients.
INT begin_of_run(__attribute__((unused)) INT rn, __attribute__((unused)) char *error)
INT frontend_exit()
Frontend exit.
INT poll_event(__attribute__((unused)) INT source, __attribute__((unused)) INT count, __attribute__((unused)) BOOL test)
INT interrupt_configure(__attribute__((unused)) INT cmd, __attribute__((unused)) INT source, __attribute__((unused)) PTYPE adr)
INT end_of_run(__attribute__((unused)) INT rn, __attribute__((unused)) char *error)
INT frontend_loop()
Frontend loop.
int cm_write_event_to_odb(HNDLE hDB, HNDLE hKey, const EVENT_HEADER *pevent, INT format)
#define EQUIPMENT_STATISTICS_STR
#define DEFAULT_BUFFER_SIZE
#define EQUIPMENT_COMMON_STR
#define TRANSITION_ERROR_STRING_LENGTH
#define driver(name, count)
#define equipment(name, id, type, source, readon, period, readout, cd, driver)
char frontend_file_name[256]
char frontend_host[NAME_LENGTH]
char frontend_name[NAME_LENGTH]
char status_color[NAME_LENGTH]
INT(* readout)(char *, INT)