Line data Source code
1 : /********************************************************************\
2 :
3 : Name: mfe.c
4 : Created by: Stefan Ritt
5 :
6 : Contents: The system part of the MIDAS frontend. Has to be
7 : linked with user code to form a complete frontend
8 :
9 : \********************************************************************/
10 :
11 : #undef NDEBUG// midas required assert() to be always enabled
12 :
13 : #include "mfe.h"
14 :
15 : #include "midas.h"
16 : #include "msystem.h"
17 : #include "mstrlcpy.h"
18 :
19 : #include <assert.h>
20 : #include <stdio.h>
21 :
22 :
23 : /*------------------------------------------------------------------*/
24 :
25 : /* globals */
26 :
27 : INT rpc_mode = 1; // 0 for RPC socket, 1 for event socket
28 :
29 : #define ODB_UPDATE_TIME 1000 /* 1 seconds for ODB update */
30 :
31 : #define DEFAULT_FE_TIMEOUT 10000 /* 10 seconds for watchdog timeout */
32 :
33 : #define MAX_N_THREADS 32 /* maximum number of readout threads */
34 :
35 : INT run_state = 0; /* STATE_RUNNING, STATE_STOPPED, STATE_PAUSED */
36 : INT run_number = 0;
37 : DWORD actual_time = 0; /* current time in seconds since 1970 */
38 : DWORD actual_millitime = 0; /* current time in milliseconds */
39 : DWORD rate_period = 0; /* period in ms for rate calculations */
40 :
41 : int gWriteCacheSize = 0; /* remember max write cache size to use in periodic flush buffer */
42 :
43 : char host_name[HOST_NAME_LENGTH];
44 : char exp_name[NAME_LENGTH];
45 : char full_frontend_name[256];
46 :
47 : INT max_bytes_per_sec = 0;
48 : INT optimize = 0; /* set this to one to opimize TCP buffer size */
49 : INT fe_stop = 0; /* stop switch for VxWorks */
50 : BOOL debug = 0; /* disable watchdog messages from server */
51 : DWORD auto_restart = 0; /* restart run after event limit reached stop */
52 : INT manual_trigger_event_id = 0; /* set from the manual_trigger callback */
53 : static INT frontend_index = -1; /* frontend index for event building */
54 : INT verbosity_level = 0; /* can be used by user code for debugging output */
55 : BOOL lockout_readout_thread = TRUE; /* manual triggers, periodic events and 1Hz flush cache lockout the readout thread */
56 : BOOL mfe_debug = FALSE;
57 :
58 : HNDLE hDB = 0;
59 : HNDLE hClient = 0;
60 :
61 : EQUIPMENT *interrupt_eq = NULL;
62 : EQUIPMENT *multithread_eq = NULL;
63 : EQUIPMENT *polled_eq = NULL;
64 : BOOL slowcont_eq = FALSE;
65 : void *event_buffer = NULL;
66 : void *frag_buffer = NULL;
67 :
68 : int *n_events = NULL;
69 :
70 : /* inter-thread communication */
71 : static int rbh[MAX_N_THREADS];
72 : std::atomic<bool> _stop_all_threads(false);
73 : static int _readout_thread(void *param);
74 : static volatile int readout_thread_active[MAX_N_THREADS];
75 :
76 : void mfe_error_check(void);
77 :
78 : static int send_event(INT idx, BOOL manual_trig);
79 : static int receive_trigger_event(EQUIPMENT *eq);
80 : static void send_all_periodic_events(INT transition);
81 : static void interrupt_routine(void);
82 : void readout_enable(bool flag);
83 : bool readout_enabled(void);
84 : void display(BOOL bInit);
85 : void rotate_wheel(void);
86 : BOOL logger_root(void);
87 : static INT check_polled_events(void);
88 : static INT check_user_events(void);
89 : static int flush_user_events(void);
90 :
91 : /*------------------------------------------------------------------*/
92 :
93 0 : void set_rate_period(int ms) {
94 0 : rate_period = ms;
95 0 : }
96 :
97 0 : int get_rate_period() {
98 0 : return rate_period;
99 : }
100 :
101 : /*-- transition callbacks ------------------------------------------*/
102 :
103 : /*-- start ---------------------------------------------------------*/
104 :
105 0 : static INT tr_start(INT rn, char *error) {
106 : INT i, status;
107 :
108 : /* disable interrupts or readout thread
109 : * if somehow it was not stopped from previous run */
110 0 : readout_enable(FALSE);
111 :
112 : /* flush all buffers with EQ_USER events */
113 0 : flush_user_events();
114 :
115 : /* reset serial numbers and statistics */
116 0 : for (i = 0; equipment[i].name[0]; i++) {
117 0 : equipment[i].serial_number = 0;
118 0 : equipment[i].subevent_number = 0;
119 0 : equipment[i].events_collected = 0;
120 0 : equipment[i].stats.events_sent = 0;
121 0 : equipment[i].stats.events_per_sec = 0;
122 0 : equipment[i].stats.kbytes_per_sec = 0;
123 0 : equipment[i].odb_in = equipment[i].odb_out = 0;
124 0 : n_events[i] = 0;
125 : }
126 0 : db_send_changed_records();
127 :
128 0 : status = begin_of_run(rn, error);
129 :
130 0 : if (status == CM_SUCCESS) {
131 0 : run_state = STATE_RUNNING;
132 0 : run_number = rn;
133 :
134 0 : send_all_periodic_events(TR_START);
135 :
136 0 : if (display_period && !mfe_debug) {
137 0 : ss_printf(14, 2, "Running ");
138 0 : ss_printf(36, 2, "%d", rn);
139 : } else
140 0 : printf("Started run %d\n", rn);
141 :
142 : /* enable interrupts or readout thread */
143 0 : readout_enable(TRUE);
144 : }
145 :
146 0 : cm_set_client_run_state(run_state);
147 :
148 0 : return status;
149 : }
150 :
151 : /*-- prestop -------------------------------------------------------*/
152 :
153 0 : static INT tr_stop(INT rn, char *error) {
154 : INT status, i;
155 : EQUIPMENT *eq;
156 :
157 : /* disable interrupts or readout thread */
158 0 : readout_enable(FALSE);
159 :
160 0 : status = end_of_run(rn, error);
161 :
162 : /* allow last event to be sent from frontend thread */
163 0 : ss_sleep(100);
164 :
165 : /* check if event(s) happened just before disabling the trigger */
166 0 : check_polled_events();
167 0 : check_user_events();
168 :
169 0 : if (status == CM_SUCCESS) {
170 : /* don't send events if already stopped */
171 0 : if (run_state != STATE_STOPPED)
172 0 : send_all_periodic_events(TR_STOP);
173 :
174 0 : run_state = STATE_STOPPED;
175 0 : run_number = rn;
176 :
177 0 : if (display_period && !mfe_debug)
178 0 : ss_printf(14, 2, "Stopped ");
179 : else
180 0 : printf("Run stopped\n");
181 : } else
182 0 : readout_enable(TRUE);
183 :
184 0 : for (i = 0; equipment[i].name[0]; i++) {
185 : /* flush remaining buffered events */
186 0 : rpc_flush_event();
187 0 : if (equipment[i].buffer_handle) {
188 0 : INT err = bm_flush_cache(equipment[i].buffer_handle, BM_WAIT);
189 0 : if (err != BM_SUCCESS) {
190 0 : cm_msg(MERROR, "tr_stop", "bm_flush_cache(BM_WAIT) error %d", err);
191 0 : return err;
192 : }
193 : }
194 : }
195 :
196 : /* update final statistics record in ODB */
197 0 : for (i = 0; equipment[i].name[0]; i++) {
198 0 : eq = &equipment[i];
199 0 : eq->stats.events_sent += eq->events_sent;
200 0 : eq->stats.events_per_sec = 0;
201 0 : eq->stats.kbytes_per_sec = 0;
202 0 : eq->bytes_sent = 0;
203 0 : eq->events_sent = 0;
204 0 : n_events[i] = 0;
205 : }
206 :
207 0 : db_send_changed_records();
208 0 : cm_set_client_run_state(run_state);
209 0 : return status;
210 : }
211 :
212 : /*-- pause ---------------------------------------------------------*/
213 :
214 0 : static INT tr_pause(INT rn, char *error) {
215 : INT status;
216 :
217 : /* disable interrupts or readout thread */
218 0 : readout_enable(FALSE);
219 :
220 0 : status = pause_run(rn, error);
221 :
222 0 : if (status == CM_SUCCESS) {
223 0 : run_state = STATE_PAUSED;
224 0 : run_number = rn;
225 :
226 0 : send_all_periodic_events(TR_PAUSE);
227 :
228 0 : if (display_period && !mfe_debug)
229 0 : ss_printf(14, 2, "Paused ");
230 : else
231 0 : printf("Paused\n");
232 : } else
233 0 : readout_enable(TRUE);
234 :
235 0 : cm_set_client_run_state(run_state);
236 0 : return status;
237 : }
238 :
239 : /*-- resume --------------------------------------------------------*/
240 :
241 0 : static INT tr_resume(INT rn, char *error) {
242 : INT status;
243 :
244 0 : status = resume_run(rn, error);
245 :
246 0 : if (status == CM_SUCCESS) {
247 0 : run_state = STATE_RUNNING;
248 0 : run_number = rn;
249 :
250 0 : send_all_periodic_events(TR_RESUME);
251 :
252 0 : if (display_period && !mfe_debug)
253 0 : ss_printf(14, 2, "Running ");
254 : else
255 0 : printf("Running\n");
256 :
257 : /* enable interrupts or readout thread */
258 0 : readout_enable(TRUE);
259 : }
260 :
261 0 : cm_set_client_run_state(run_state);
262 0 : return status;
263 : }
264 :
265 : /*------------------------------------------------------------------*/
266 :
267 0 : INT manual_trigger(INT, void *prpc_param[]) {
268 0 : manual_trigger_event_id = CWORD(0);
269 0 : return SUCCESS;
270 : }
271 :
272 : /*------------------------------------------------------------------*/
273 :
274 0 : static void eq_common_watcher(INT hDB, INT, INT, void *info) {
275 : int status;
276 0 : assert(info != NULL);
277 0 : EQUIPMENT *eq = (EQUIPMENT *) info;
278 : HNDLE hCommon;
279 0 : std::string path;
280 0 : path += "/Equipment/";
281 0 : path += eq->name;
282 0 : path += "/Common";
283 0 : status = db_find_key(hDB, 0, path.c_str(), &hCommon);
284 0 : if (status != DB_SUCCESS)
285 0 : return;
286 0 : int size = sizeof(eq->info);
287 0 : status = db_get_record1(hDB, hCommon, &eq->info, &size, 0, EQUIPMENT_COMMON_STR);
288 0 : if (status != DB_SUCCESS) {
289 0 : cm_msg(MINFO, "eq_common_watcher", "db_get_record(%s) status %d", path.c_str(), status);
290 0 : return;
291 : }
292 0 : }
293 :
294 : /*------------------------------------------------------------------*/
295 :
296 0 : static INT register_equipment(void) {
297 : INT idx, size, status;
298 : char str[256];
299 : EQUIPMENT_INFO *eq_info;
300 : EQUIPMENT_STATS *eq_stats;
301 : HNDLE hKey;
302 : BANK_LIST *bank_list;
303 : DWORD dummy;
304 :
305 : /* get current ODB run state */
306 0 : size = sizeof(run_state);
307 0 : run_state = STATE_STOPPED;
308 0 : db_get_value(hDB, 0, "/Runinfo/State", &run_state, &size, TID_INT, TRUE);
309 0 : size = sizeof(run_number);
310 0 : run_number = 1;
311 0 : status = db_get_value(hDB, 0, "/Runinfo/Run number", &run_number, &size, TID_INT, TRUE);
312 0 : assert(status == SUCCESS);
313 :
314 : /* scan EQUIPMENT table from user frontend */
315 0 : for (idx = 0; equipment[idx].name[0]; idx++) {
316 0 : eq_info = &equipment[idx].info;
317 0 : eq_stats = &equipment[idx].stats;
318 :
319 0 : if (eq_info->event_id == 0) {
320 0 : printf("\nEvent ID 0 for %s not allowed\n", equipment[idx].name);
321 0 : cm_disconnect_experiment();
322 0 : ss_sleep(5000);
323 0 : exit(0);
324 : }
325 :
326 : /* init status */
327 0 : equipment[idx].status = FE_SUCCESS;
328 :
329 : #pragma GCC diagnostic push
330 : #pragma GCC diagnostic ignored "-Wformat-nonliteral" // need for snprintf below
331 :
332 : /* check for % in equipment (needed for event building) */
333 0 : if (frontend_index != -1) {
334 : /* modify equipment name to <name>xx where xx is the frontend index */
335 0 : if (strchr(equipment[idx].name, '%')) {
336 0 : snprintf(str, sizeof(str), equipment[idx].name, frontend_index);
337 0 : mstrlcpy(equipment[idx].name, str, sizeof(equipment[idx].name));
338 : }
339 :
340 : /* modify event buffer name to <name>xx where xx is the frontend index */
341 0 : if (strchr(eq_info->buffer, '%')) {
342 0 : snprintf(str, sizeof(str), eq_info->buffer, frontend_index);
343 0 : mstrlcpy(eq_info->buffer, str, sizeof(eq_info->buffer));
344 : }
345 : } else {
346 : /* stip %.. */
347 0 : if (strchr(equipment[idx].name, '%'))
348 0 : *strchr(equipment[idx].name, '%') = 0;
349 0 : if (strchr(eq_info->buffer, '%'))
350 0 : *strchr(eq_info->buffer, '%') = 0;
351 : }
352 : #pragma GCC diagnostic pop
353 :
354 : /* check for '${HOSTNAME}' in equipment name, replace with env var (needed for sysmon) */
355 0 : std::string name = equipment[idx].name;
356 0 : size_t namepos = name.find("${HOSTNAME}");
357 :
358 : /* if there is a ${HOSTNAME} ... */
359 0 : if (namepos != std::string::npos) {
360 : // Grab text before and after
361 0 : std::string before = name.substr(0, namepos);
362 0 : std::string after = name.substr(namepos + 11);//11 = length of "${HOSTNAME}"
363 : // Get local_host_name (ODB entry not set yet)
364 0 : std::string thishost = ss_gethostname();
365 : // cut the hostname string at the first '.'
366 0 : size_t p = thishost.find('.');
367 0 : thishost = thishost.substr(0, p == std::string::npos ? thishost.length() : p);
368 0 : name = before;
369 0 : name += thishost;
370 0 : name += after;
371 0 : if (name.length() >= 32) {
372 0 : cm_msg(MERROR, "equipment name too long",
373 : "Equipment name %s%s%s too long, trimming down to %d characters",
374 : before.c_str(),thishost.c_str(),after.c_str(),
375 : 32);
376 : }
377 0 : mstrlcpy(equipment[idx].name, name.c_str(), 32);
378 0 : printf("\t became:%s\n", equipment[idx].name);
379 0 : }
380 :
381 0 : sprintf(str, "/Equipment/%s/Common", equipment[idx].name);
382 :
383 0 : status = db_check_record(hDB, 0, str, EQUIPMENT_COMMON_STR, FALSE);
384 0 : if (status == DB_NO_KEY) {
385 0 : db_create_record(hDB, 0, str, EQUIPMENT_COMMON_STR);
386 0 : db_find_key(hDB, 0, str, &hKey);
387 0 : status = db_set_record(hDB, hKey, eq_info, sizeof(EQUIPMENT_INFO), 0);
388 0 : if (status != DB_SUCCESS) {
389 0 : printf("ERROR: Cannot create equipment record \"%s\", db_set_record() status %d\n", str, status);
390 0 : cm_disconnect_experiment();
391 0 : ss_sleep(3000);
392 0 : return 0;
393 : }
394 0 : } else if (status == DB_STRUCT_MISMATCH) {
395 0 : cm_msg(MINFO, "register_equipment", "Correcting \"%s\", db_check_record() status %d", str, status);
396 0 : db_create_record(hDB, 0, str, EQUIPMENT_COMMON_STR);
397 0 : } else if (status != DB_SUCCESS) {
398 0 : printf("ERROR: Cannot check equipment record \"%s\", db_check_record() status %d\n", str, status);
399 0 : cm_disconnect_experiment();
400 0 : ss_sleep(3000);
401 0 : exit(0);
402 : }
403 :
404 0 : status = db_find_key(hDB, 0, str, &hKey);
405 0 : if (status != DB_SUCCESS) {
406 0 : printf("ERROR: Cannot find \"%s\", db_find_key() status %d", str, status);
407 0 : cm_disconnect_experiment();
408 0 : ss_sleep(3000);
409 0 : exit(0);
410 : }
411 :
412 0 : status = db_check_record(hDB, hKey, "", EQUIPMENT_COMMON_STR, TRUE);
413 0 : if (status != DB_SUCCESS) {
414 0 : printf("ERROR: Cannot check record \"%s\", db_check_record() status %d", str, status);
415 0 : cm_disconnect_experiment();
416 0 : ss_sleep(3000);
417 0 : exit(0);
418 : }
419 :
420 : /* set equipment Common from equipment[] list if flag is set in user frontend code */
421 0 : if (equipment_common_overwrite) {
422 : // do not overwrite "enabled" and "hidden" flags, these is always defined in the ODB
423 : BOOL prev_enabled;
424 : BOOL prev_hidden;
425 : double prev_event_limit;
426 : int size;
427 0 : size = sizeof(prev_enabled);
428 0 : db_get_value(hDB, hKey, "Enabled", &prev_enabled, &size, TID_BOOL, FALSE);
429 0 : size = sizeof(prev_hidden);
430 0 : db_get_value(hDB, hKey, "Hidden", &prev_hidden, &size, TID_BOOL, FALSE);
431 0 : size = sizeof(prev_event_limit);
432 0 : db_get_value(hDB, hKey, "Event limit", &prev_event_limit, &size, TID_DOUBLE, FALSE);
433 0 : status = db_set_record(hDB, hKey, eq_info, sizeof(EQUIPMENT_INFO), 0);
434 0 : if (status != DB_SUCCESS) {
435 0 : printf("ERROR: Cannot set record \"%s\", db_set_record() status %d", str, status);
436 0 : cm_disconnect_experiment();
437 0 : ss_sleep(3000);
438 0 : exit(0);
439 : }
440 0 : eq_info->enabled = prev_enabled;
441 0 : db_set_value(hDB, hKey, "Enabled", &prev_enabled, sizeof(prev_enabled), 1, TID_BOOL);
442 0 : eq_info->hidden = prev_hidden;
443 0 : db_set_value(hDB, hKey, "Hidden", &prev_hidden, sizeof(prev_hidden), 1, TID_BOOL);
444 0 : if ((eq_info->eq_type & EQ_SLOW) == 0) {
445 0 : eq_info->event_limit = prev_event_limit;
446 0 : db_set_value(hDB, hKey, "Event limit", &prev_event_limit, sizeof(prev_event_limit), 1, TID_DOUBLE);
447 : }
448 : } else {
449 0 : size = sizeof(EQUIPMENT_INFO);
450 0 : status = db_get_record(hDB, hKey, eq_info, &size, 0);
451 0 : if (status != DB_SUCCESS) {
452 0 : printf("ERROR: Cannot get record \"%s\", db_get_record() status %d", str, status);
453 0 : cm_disconnect_experiment();
454 0 : ss_sleep(3000);
455 0 : exit(0);
456 : }
457 : }
458 :
459 : /* open hot link to equipment info */
460 0 : status = db_watch(hDB, hKey, eq_common_watcher, &equipment[idx]);
461 0 : if (status != DB_SUCCESS) {
462 0 : printf("ERROR: Cannot hotlink \"%s\", db_watch() status %d", str, status);
463 0 : cm_disconnect_experiment();
464 0 : ss_sleep(3000);
465 0 : exit(0);
466 : }
467 :
468 0 : if (equal_ustring(eq_info->format, "FIXED"))
469 0 : equipment[idx].format = FORMAT_FIXED;
470 : else /* default format is MIDAS */
471 0 : equipment[idx].format = FORMAT_MIDAS;
472 :
473 0 : size = sizeof(str);
474 0 : status = db_get_value(hDB, hClient, "Host", str, &size, TID_STRING, FALSE);
475 0 : assert(status == DB_SUCCESS);
476 0 : mstrlcpy(eq_info->frontend_host, str, sizeof(eq_info->frontend_host));
477 0 : mstrlcpy(eq_info->frontend_name, full_frontend_name, sizeof(eq_info->frontend_name));
478 0 : mstrlcpy(eq_info->frontend_file_name, frontend_file_name, sizeof(eq_info->frontend_file_name));
479 0 : mstrlcpy(eq_info->status, full_frontend_name, sizeof(eq_info->status));
480 0 : mstrlcat(eq_info->status, "@", sizeof(eq_info->status));
481 0 : mstrlcat(eq_info->status, eq_info->frontend_host, sizeof(eq_info->status));
482 0 : mstrlcpy(eq_info->status_color, "greenLight", sizeof(eq_info->status_color));
483 :
484 : /* update variables in ODB */
485 0 : status = db_set_record(hDB, hKey, eq_info, sizeof(EQUIPMENT_INFO), 0);
486 :
487 0 : if (status != DB_SUCCESS) {
488 0 : cm_msg(MERROR, "register_equipment", "Cannot update equipment Common, db_set_record() status %d", status);
489 0 : return 0;
490 : }
491 :
492 : /* check for consistent common settings */
493 0 : if ((eq_info->read_on & RO_STOPPED) &&
494 0 : (eq_info->eq_type == EQ_POLLED ||
495 0 : eq_info->eq_type == EQ_INTERRUPT ||
496 0 : eq_info->eq_type == EQ_MULTITHREAD ||
497 0 : eq_info->eq_type == EQ_USER)) {
498 0 : cm_msg(MERROR, "register_equipment", "Equipment \"%s\" contains RO_STOPPED or RO_ALWAYS. This can lead to undesired side-effect and should be removed.", equipment[idx].name);
499 : }
500 :
501 : /*---- Create variables record ---------------------------------*/
502 :
503 0 : sprintf(str, "/Equipment/%s/Variables", equipment[idx].name);
504 0 : if (equipment[idx].event_descrip) {
505 0 : if (equipment[idx].format == FORMAT_FIXED)
506 0 : db_check_record(hDB, 0, str, (char *) equipment[idx].event_descrip, TRUE);
507 : else {
508 : /* create bank descriptions */
509 0 : bank_list = (BANK_LIST *) equipment[idx].event_descrip;
510 :
511 0 : for (; bank_list->name[0]; bank_list++) {
512 : /* mabye needed later...
513 : if (bank_list->output_flag == 0)
514 : continue;
515 : */
516 :
517 0 : if (bank_list->type == TID_STRUCT) {
518 0 : sprintf(str, "/Equipment/%s/Variables/%s", equipment[idx].name,
519 0 : bank_list->name);
520 0 : status = db_check_record(hDB, 0, str, strcomb1((const char **) bank_list->init_str).c_str(), TRUE);
521 0 : if (status != DB_SUCCESS) {
522 0 : printf("Cannot check/create record \"%s\", status = %d\n", str,
523 : status);
524 0 : ss_sleep(3000);
525 : }
526 : } else {
527 0 : sprintf(str, "/Equipment/%s/Variables/%s", equipment[idx].name,
528 0 : bank_list->name);
529 0 : dummy = 0;
530 0 : db_set_value(hDB, 0, str, &dummy, rpc_tid_size(bank_list->type), 1,
531 0 : bank_list->type);
532 : }
533 : }
534 : }
535 : } else
536 0 : db_create_key(hDB, 0, str, TID_KEY);
537 :
538 0 : sprintf(str, "/Equipment/%s/Variables", equipment[idx].name);
539 0 : db_find_key(hDB, 0, str, &hKey);
540 0 : equipment[idx].hkey_variables = hKey;
541 :
542 : /*---- Create and initialize statistics tree -------------------*/
543 :
544 0 : sprintf(str, "/Equipment/%s/Statistics", equipment[idx].name);
545 :
546 0 : status = db_check_record(hDB, 0, str, EQUIPMENT_STATISTICS_STR, TRUE);
547 0 : if (status != DB_SUCCESS) {
548 0 : printf("Cannot create/check statistics record \'%s\', error %d\n", str, status);
549 0 : ss_sleep(3000);
550 : }
551 :
552 0 : status = db_find_key(hDB, 0, str, &hKey);
553 0 : if (status != DB_SUCCESS) {
554 0 : printf("Cannot find statistics record \'%s\', error %d\n", str, status);
555 0 : ss_sleep(3000);
556 : }
557 :
558 0 : eq_stats->events_sent = 0;
559 0 : eq_stats->events_per_sec = 0;
560 0 : eq_stats->kbytes_per_sec = 0;
561 :
562 : /* open hot link to statistics tree */
563 0 : status = db_open_record1(hDB, hKey, eq_stats, sizeof(EQUIPMENT_STATS), MODE_WRITE, NULL, NULL, EQUIPMENT_STATISTICS_STR);
564 0 : if (status == DB_NO_ACCESS) {
565 : /* record is probably still in exclusive access by dead FE, so reset it */
566 0 : status = db_set_mode(hDB, hKey, MODE_READ | MODE_WRITE | MODE_DELETE, TRUE);
567 0 : if (status != DB_SUCCESS)
568 0 : cm_msg(MERROR, "register_equipment",
569 : "Cannot change access mode for record \'%s\', error %d", str, status);
570 : else
571 0 : cm_msg(MINFO, "register_equipment", "Recovered access mode for record \'%s\'", str);
572 0 : status = db_open_record1(hDB, hKey, eq_stats, sizeof(EQUIPMENT_STATS), MODE_WRITE, NULL, NULL, EQUIPMENT_STATISTICS_STR);
573 : }
574 0 : if (status != DB_SUCCESS) {
575 0 : cm_msg(MERROR, "register_equipment",
576 : "Cannot open statistics record \'%s\', error %d", str, status);
577 0 : ss_sleep(3000);
578 : }
579 :
580 : /*---- open event buffer ---------------------------------------*/
581 :
582 : /* check for fragmented event */
583 0 : if (eq_info->eq_type & EQ_FRAGMENTED) {
584 0 : if (frag_buffer == NULL)
585 0 : frag_buffer = malloc(max_event_size_frag);
586 :
587 0 : if (frag_buffer == NULL) {
588 0 : cm_msg(MERROR, "register_equipment",
589 : "Not enough memory to allocate buffer for fragmented events");
590 0 : return SS_NO_MEMORY;
591 : }
592 : }
593 :
594 0 : if (eq_info->buffer[0]) {
595 0 : status = bm_open_buffer(eq_info->buffer, DEFAULT_BUFFER_SIZE, &equipment[idx].buffer_handle);
596 0 : if (status != BM_SUCCESS && status != BM_CREATED) {
597 0 : cm_msg(MERROR, "register_equipment", "Cannot open event buffer \"%s\" size %d, bm_open_buffer() status %d", eq_info->buffer, DEFAULT_BUFFER_SIZE, status);
598 0 : return 0;
599 : }
600 : } else
601 0 : equipment[idx].buffer_handle = 0;
602 0 : }
603 :
604 0 : n_events = (int *) calloc(sizeof(int), idx);
605 :
606 0 : return SUCCESS;
607 : }
608 :
609 : /*------------------------------------------------------------------*/
610 :
611 0 : static INT initialize_equipment(void) {
612 : INT idx, i, j, k, n;
613 : double count;
614 : char str[256];
615 : DWORD start_time, delta_time;
616 : EQUIPMENT_INFO *eq_info;
617 0 : BOOL manual_trig_flag = FALSE;
618 :
619 : /* scan EQUIPMENT table from user frontend */
620 0 : for (idx = 0; equipment[idx].name[0]; idx++) {
621 0 : eq_info = &equipment[idx].info;
622 :
623 : /*---- initialize interrupt events -----------------------------*/
624 :
625 0 : if (eq_info->eq_type & EQ_INTERRUPT) {
626 : /* install interrupt for interrupt events */
627 :
628 0 : for (i = 0; equipment[i].name[0]; i++)
629 0 : if (equipment[i].info.eq_type & EQ_POLLED) {
630 0 : equipment[idx].status = FE_ERR_DISABLED;
631 0 : cm_msg(MINFO, "initialize_equipment",
632 : "Interrupt readout cannot be combined with polled readout");
633 : }
634 :
635 0 : if (equipment[idx].status != FE_ERR_DISABLED) {
636 0 : if (eq_info->enabled) {
637 0 : interrupt_eq = &equipment[idx];
638 :
639 : /* create ring buffer for inter-thread data transfer */
640 0 : create_event_rb(0);
641 :
642 : /* establish interrupt handler */
643 0 : interrupt_configure(CMD_INTERRUPT_ATTACH, idx,
644 : (POINTER_T) interrupt_routine);
645 : } else {
646 0 : equipment[idx].status = FE_ERR_DISABLED;
647 0 : cm_msg(MINFO, "initialize_equipment",
648 : "Equipment %s disabled in frontend",
649 0 : equipment[idx].name);
650 : }
651 : }
652 : }
653 :
654 : /*---- evaluate polling count ----------------------------------*/
655 :
656 0 : if (eq_info->eq_type & (EQ_POLLED | EQ_MULTITHREAD)) {
657 0 : if (eq_info->eq_type & EQ_INTERRUPT) {
658 0 : if (eq_info->eq_type & EQ_POLLED)
659 0 : cm_msg(MERROR, "register_equipment",
660 : "Equipment \"%s\" cannot be of type EQ_INTERRUPT and EQ_POLLED at the same time",
661 0 : equipment[idx].name);
662 : else
663 0 : cm_msg(MERROR, "register_equipment",
664 : "Equipment \"%s\" cannot be of type EQ_INTERRUPT and EQ_MULTITHREAD at the same time",
665 0 : equipment[idx].name);
666 0 : return 0;
667 : }
668 :
669 0 : if (polled_eq) {
670 0 : equipment[idx].status = FE_ERR_DISABLED;
671 0 : cm_msg(MINFO, "initialize_equipment",
672 0 : "Defined more than one polled equipment \'%s\' in frontend \'%s\'", equipment[idx].name, frontend_name);
673 : } else
674 0 : polled_eq = &equipment[idx];
675 :
676 0 : if (display_period)
677 0 : printf("\nCalibrating");
678 :
679 0 : count = 1;
680 : do {
681 0 : if (display_period)
682 0 : printf(".");
683 :
684 0 : start_time = ss_millitime();
685 :
686 0 : poll_event(equipment[idx].info.source, (INT) count, TRUE);
687 :
688 0 : delta_time = ss_millitime() - start_time;
689 :
690 0 : if (count == 1 && delta_time > eq_info->period * 1.2) {
691 0 : cm_msg(MERROR, "register_equipment", "Polling routine with count=1 takes %d ms", delta_time);
692 0 : ss_sleep(3000);
693 0 : break;
694 : }
695 :
696 0 : if (delta_time > 0)
697 0 : count = count * eq_info->period / delta_time;
698 : else
699 0 : count *= 100;
700 :
701 : // avoid overflows
702 0 : if (count > 2147483647.0) {
703 0 : count = 2147483647.0;
704 0 : break;
705 : }
706 :
707 0 : } while (delta_time > eq_info->period * 1.2 || delta_time < eq_info->period * 0.8);
708 :
709 0 : equipment[idx].poll_count = (INT) count;
710 : }
711 :
712 : /*---- initialize multithread events -------------------------*/
713 :
714 0 : if (eq_info->eq_type & EQ_MULTITHREAD) {
715 : /* install interrupt for interrupt events */
716 :
717 0 : for (i = 0; equipment[i].name[0]; i++)
718 0 : if (equipment[i].info.eq_type & EQ_POLLED) {
719 0 : equipment[idx].status = FE_ERR_DISABLED;
720 0 : cm_msg(MINFO, "initialize_equipment",
721 0 : "Multi-threaded readout cannot be combined with polled readout for equipment \'%s\'", equipment[i].name);
722 : }
723 :
724 0 : if (equipment[idx].status != FE_ERR_DISABLED) {
725 0 : if (eq_info->enabled) {
726 0 : if (multithread_eq) {
727 0 : equipment[idx].status = FE_ERR_DISABLED;
728 0 : cm_msg(MINFO, "initialize_equipment",
729 0 : "Defined more than one equipment with multi-threaded readout for equipment \'%s\'", equipment[i].name);
730 : } else {
731 0 : multithread_eq = &equipment[idx];
732 :
733 : /* create ring buffer for inter-thread data transfer */
734 0 : create_event_rb(0);
735 :
736 : /* create hardware reading thread */
737 0 : readout_enable(FALSE);
738 0 : ss_thread_create(_readout_thread, multithread_eq);
739 : }
740 : } else {
741 0 : equipment[idx].status = FE_ERR_DISABLED;
742 0 : cm_msg(MINFO, "initialize_equipment",
743 : "Equipment %s disabled in frontend",
744 0 : equipment[idx].name);
745 : }
746 : }
747 : }
748 :
749 : /*---- initialize user events -------------------------------*/
750 :
751 0 : if (eq_info->eq_type & EQ_USER) {
752 0 : if (equipment[idx].status != FE_ERR_DISABLED) {
753 0 : if (!eq_info->enabled) {
754 0 : equipment[idx].status = FE_ERR_DISABLED;
755 0 : cm_msg(MINFO, "initialize_equipment",
756 : "Equipment %s disabled in frontend",
757 0 : equipment[idx].name);
758 : }
759 : }
760 : }
761 :
762 : /*---- initialize slow control equipment ---------------------*/
763 :
764 0 : if (eq_info->eq_type & EQ_SLOW) {
765 :
766 0 : set_equipment_status(equipment[idx].name, "Initializing...", "yellowLight");
767 :
768 0 : if (equipment[idx].driver == nullptr) {
769 0 : cm_msg(MERROR, "initialize_equipment", "Found slow control equipment \"%s\" with no device driver list, aborting",
770 0 : equipment[idx].name);
771 0 : return FE_ERR_DRIVER;
772 : }
773 :
774 : /* resolve duplicate device names */
775 0 : for (i = 0; equipment[idx].driver[i].name[0]; i++) {
776 0 : equipment[idx].driver[i].pequipment_name = new std::string(equipment[idx].name);
777 :
778 0 : for (j = i + 1; equipment[idx].driver[j].name[0]; j++)
779 0 : if (equal_ustring(equipment[idx].driver[i].name,
780 0 : equipment[idx].driver[j].name)) {
781 0 : strcpy(str, equipment[idx].driver[i].name);
782 0 : for (k = 0, n = 0; equipment[idx].driver[k].name[0]; k++)
783 0 : if (equal_ustring(str, equipment[idx].driver[k].name)) {
784 : //sprintf(equipment[idx].driver[k].name, "%s_%d", str, n++);
785 0 : mstrlcpy(equipment[idx].driver[k].name, str, sizeof(equipment[idx].driver[k].name));
786 0 : mstrlcat(equipment[idx].driver[k].name, "_", sizeof(equipment[idx].driver[k].name));
787 : char tmp[256];
788 0 : sprintf(tmp, "%d", n++);
789 0 : mstrlcat(equipment[idx].driver[k].name, tmp, sizeof(equipment[idx].driver[k].name));
790 : }
791 :
792 0 : break;
793 : }
794 : }
795 :
796 : /* loop over equipment list and call class driver's init method */
797 0 : if (eq_info->enabled) {
798 0 : equipment[idx].status = equipment[idx].cd(CMD_INIT, &equipment[idx]);
799 :
800 0 : if (equipment[idx].status == FE_SUCCESS)
801 0 : strcpy(str, "Ok");
802 0 : else if (equipment[idx].status == FE_ERR_HW)
803 0 : strcpy(str, "Hardware error");
804 0 : else if (equipment[idx].status == FE_ERR_ODB)
805 0 : strcpy(str, "ODB error");
806 0 : else if (equipment[idx].status == FE_ERR_DRIVER)
807 0 : strcpy(str, "Driver error");
808 0 : else if (equipment[idx].status == FE_PARTIALLY_DISABLED)
809 0 : strcpy(str, "Partially disabled");
810 : else
811 0 : strcpy(str, "Error");
812 :
813 0 : if (equipment[idx].status == FE_SUCCESS)
814 0 : set_equipment_status(equipment[idx].name, str, "greenLight");
815 0 : else if (equipment[idx].status == FE_PARTIALLY_DISABLED) {
816 0 : set_equipment_status(equipment[idx].name, str, "yellowGreenLight");
817 0 : cm_msg(MINFO, "initialize_equipment", "Equipment %s partially disabled", equipment[idx].name);
818 : } else {
819 0 : set_equipment_status(equipment[idx].name, str, "redLight");
820 0 : cm_msg(MERROR, "initialize_equipment", "Equipment %s disabled because of %s", equipment[idx].name, str);
821 : }
822 :
823 : } else {
824 0 : equipment[idx].status = FE_ERR_DISABLED;
825 0 : set_equipment_status(equipment[idx].name, "Disabled", "yellowLight");
826 : }
827 :
828 : /* remember that we have slowcontrol equipment (needed later for scheduler) */
829 0 : slowcont_eq = TRUE;
830 :
831 : /* let user read error messages */
832 0 : if (equipment[idx].status != FE_SUCCESS && equipment[idx].status != FE_ERR_DISABLED)
833 0 : ss_sleep(3000);
834 : }
835 :
836 : /*---- register callback for manual triggered events -----------*/
837 0 : if (eq_info->eq_type & EQ_MANUAL_TRIG) {
838 0 : if (!manual_trig_flag)
839 0 : cm_register_function(RPC_MANUAL_TRIG, manual_trigger);
840 :
841 0 : manual_trig_flag = TRUE;
842 : }
843 : }
844 :
845 : /* start threads after all equipment has been initialized */
846 0 : for (idx = 0; equipment[idx].name[0]; idx++) {
847 0 : eq_info = &equipment[idx].info;
848 :
849 0 : if (eq_info->eq_type & EQ_SLOW) {
850 0 : if (equipment[idx].status == FE_SUCCESS || equipment[idx].status == FE_PARTIALLY_DISABLED)
851 0 : equipment[idx].cd(CMD_START, &equipment[idx]); /* start threads for this equipment */
852 : }
853 : }
854 :
855 0 : if (slowcont_eq)
856 0 : cm_msg(MINFO, "initialize_equipment", "Slow control equipment initialized");
857 :
858 0 : return SUCCESS;
859 : }
860 :
861 : /*------------------------------------------------------------------*/
862 :
863 0 : int set_equipment_status(const char *name, const char *equipment_status, const char *status_class) {
864 : int status, idx;
865 : char str[256];
866 : HNDLE hKey;
867 :
868 0 : for (idx = 0; equipment[idx].name[0]; idx++)
869 0 : if (equal_ustring(equipment[idx].name, name))
870 0 : break;
871 :
872 0 : if (equal_ustring(equipment[idx].name, name)) {
873 0 : sprintf(str, "/Equipment/%s/Common", name);
874 0 : db_find_key(hDB, 0, str, &hKey);
875 0 : assert(hKey);
876 :
877 0 : mstrlcpy(str, equipment_status, sizeof(str));
878 0 : status = db_set_value(hDB, hKey, "Status", str, 256, 1, TID_STRING);
879 0 : assert(status == DB_SUCCESS);
880 0 : mstrlcpy(str, status_class, sizeof(str));
881 0 : status = db_set_value(hDB, hKey, "Status color", str, 32, 1, TID_STRING);
882 0 : assert(status == DB_SUCCESS);
883 : }
884 :
885 0 : return SUCCESS;
886 : }
887 :
888 : /*------------------------------------------------------------------*/
889 :
890 0 : static void update_odb(const EVENT_HEADER *pevent, HNDLE hKey, INT format) {
891 0 : cm_write_event_to_odb(hDB, hKey, pevent, format);
892 0 : }
893 :
894 : /*------------------------------------------------------------------*/
895 :
896 0 : static int send_event(INT idx, BOOL manual_trig) {
897 : EQUIPMENT_INFO *eq_info;
898 : EVENT_HEADER *pevent, *pfragment;
899 : char *pdata;
900 : unsigned char *pd;
901 : INT i, status;
902 : DWORD sent, size;
903 :
904 0 : eq_info = &equipment[idx].info;
905 :
906 : /* check for fragmented event */
907 0 : if (eq_info->eq_type & EQ_FRAGMENTED)
908 0 : pevent = (EVENT_HEADER *) frag_buffer;
909 : else
910 0 : pevent = (EVENT_HEADER *) event_buffer;
911 :
912 : /* compose MIDAS event header */
913 0 : pevent->event_id = eq_info->event_id;
914 0 : pevent->trigger_mask = eq_info->trigger_mask;
915 0 : pevent->data_size = (INT) manual_trig;
916 0 : pevent->time_stamp = ss_time();
917 0 : pevent->serial_number = equipment[idx].serial_number++;
918 :
919 0 : equipment[idx].last_called = ss_millitime();
920 :
921 : /* call user readout routine */
922 0 : *((EQUIPMENT **) (pevent + 1)) = &equipment[idx];
923 0 : pevent->data_size = equipment[idx].readout((char *) (pevent + 1), 0);
924 :
925 : /* send event */
926 0 : if (pevent->data_size) {
927 0 : if (eq_info->eq_type & EQ_FRAGMENTED) {
928 : /* fragment event */
929 0 : if (pevent->data_size + sizeof(EVENT_HEADER) > (DWORD) max_event_size_frag) {
930 0 : cm_msg(MERROR, "send_event",
931 : "Event size %ld larger than maximum size %d for frag. ev.",
932 0 : (long) (pevent->data_size + sizeof(EVENT_HEADER)),
933 : max_event_size_frag);
934 0 : return SS_NO_MEMORY;
935 : }
936 :
937 : /* compose fragments */
938 0 : pfragment = (EVENT_HEADER *) event_buffer;
939 :
940 : /* compose MIDAS event header */
941 0 : memcpy(pfragment, pevent, sizeof(EVENT_HEADER));
942 0 : pfragment->event_id |= EVENTID_FRAG1;
943 :
944 : /* store total event size */
945 0 : pd = (unsigned char *) (pfragment + 1);
946 0 : size = pevent->data_size;
947 0 : for (i = 0; i < 4; i++) {
948 0 : pd[i] = (unsigned char) (size & 0xFF); /* little endian, please! */
949 0 : size >>= 8;
950 : }
951 :
952 0 : pfragment->data_size = sizeof(DWORD);
953 :
954 0 : pdata = (char *) (pevent + 1);
955 :
956 0 : for (i = 0, sent = 0; sent < pevent->data_size; i++) {
957 0 : if (i > 0) {
958 0 : pfragment = (EVENT_HEADER *) event_buffer;
959 :
960 : /* compose MIDAS event header */
961 0 : memcpy(pfragment, pevent, sizeof(EVENT_HEADER));
962 0 : pfragment->event_id |= EVENTID_FRAG;
963 :
964 : /* copy portion of event */
965 0 : size = pevent->data_size - sent;
966 0 : if (size > max_event_size - sizeof(EVENT_HEADER))
967 0 : size = max_event_size - sizeof(EVENT_HEADER);
968 :
969 0 : memcpy(pfragment + 1, pdata, size);
970 0 : pfragment->data_size = size;
971 0 : sent += size;
972 0 : pdata += size;
973 : }
974 :
975 : /* send event to buffer */
976 0 : if (equipment[idx].buffer_handle) {
977 0 : status = rpc_send_event(equipment[idx].buffer_handle, pfragment,
978 0 : pfragment->data_size + sizeof(EVENT_HEADER), BM_WAIT, rpc_mode);
979 0 : if (status != RPC_SUCCESS) {
980 0 : cm_msg(MERROR, "send_event", "rpc_send_event(BM_WAIT) error %d", status);
981 0 : return status;
982 : }
983 :
984 : /* flush events from buffer */
985 0 : rpc_flush_event();
986 : }
987 : }
988 :
989 0 : if (equipment[idx].buffer_handle) {
990 : /* flush buffer cache on server side */
991 0 : status = bm_flush_cache(equipment[idx].buffer_handle, BM_WAIT);
992 0 : if (status != BM_SUCCESS) {
993 0 : cm_msg(MERROR, "send_event", "bm_flush_cache(BM_WAIT) error %d", status);
994 0 : return status;
995 : }
996 : }
997 : } else {
998 : /* send un-fragmented event */
999 :
1000 0 : if (pevent->data_size + sizeof(EVENT_HEADER) > (DWORD) max_event_size) {
1001 0 : cm_msg(MERROR, "send_event", "Event size %ld larger than maximum size %d",
1002 0 : (long) (pevent->data_size + sizeof(EVENT_HEADER)), max_event_size);
1003 0 : return SS_NO_MEMORY;
1004 : }
1005 :
1006 : /* send event to buffer */
1007 0 : if (equipment[idx].buffer_handle) {
1008 0 : status = rpc_send_event(equipment[idx].buffer_handle, pevent,
1009 0 : pevent->data_size + sizeof(EVENT_HEADER), BM_WAIT, rpc_mode);
1010 0 : if (status != BM_SUCCESS) {
1011 0 : cm_msg(MERROR, "send_event", "bm_send_event(BM_WAIT) error %d", status);
1012 0 : return status;
1013 : }
1014 0 : rpc_flush_event();
1015 0 : status = bm_flush_cache(equipment[idx].buffer_handle, BM_WAIT);
1016 0 : if (status != BM_SUCCESS) {
1017 0 : cm_msg(MERROR, "send_event", "bm_flush_cache(BM_WAIT) error %d", status);
1018 0 : return status;
1019 : }
1020 : }
1021 :
1022 : /* send event to ODB if RO_ODB flag is set */
1023 0 : if (eq_info->read_on & RO_ODB) {
1024 0 : update_odb(pevent, equipment[idx].hkey_variables, equipment[idx].format);
1025 0 : equipment[idx].odb_out++;
1026 : }
1027 : }
1028 :
1029 0 : equipment[idx].bytes_sent += pevent->data_size + sizeof(EVENT_HEADER);
1030 0 : equipment[idx].events_sent++;
1031 : } else
1032 0 : equipment[idx].serial_number--;
1033 :
1034 0 : for (i = 0; equipment[i].name[0]; i++)
1035 0 : if (equipment[i].buffer_handle) {
1036 0 : status = bm_flush_cache(equipment[i].buffer_handle, BM_WAIT);
1037 0 : if (status != BM_SUCCESS) {
1038 0 : cm_msg(MERROR, "send_event", "bm_flush_cache(BM_WAIT) error %d", status);
1039 0 : return status;
1040 : }
1041 : }
1042 :
1043 0 : return CM_SUCCESS;
1044 : }
1045 :
1046 : /*------------------------------------------------------------------*/
1047 :
1048 0 : static void send_all_periodic_events(INT transition) {
1049 : EQUIPMENT_INFO *eq_info;
1050 : INT i;
1051 :
1052 0 : for (i = 0; equipment[i].name[0]; i++) {
1053 0 : eq_info = &equipment[i].info;
1054 :
1055 0 : if (!eq_info->enabled || equipment[i].status != FE_SUCCESS)
1056 0 : continue;
1057 :
1058 0 : if (transition == TR_START && (eq_info->read_on & RO_BOR) == 0)
1059 0 : continue;
1060 0 : if (transition == TR_STOP && (eq_info->read_on & RO_EOR) == 0)
1061 0 : continue;
1062 0 : if (transition == TR_PAUSE && (eq_info->read_on & RO_PAUSE) == 0)
1063 0 : continue;
1064 0 : if (transition == TR_RESUME && (eq_info->read_on & RO_RESUME) == 0)
1065 0 : continue;
1066 :
1067 0 : send_event(i, FALSE);
1068 : }
1069 0 : }
1070 :
1071 : /*------------------------------------------------------------------*/
1072 :
1073 : static std::atomic<bool> _readout_enabled_flag(false);
1074 :
1075 0 : bool readout_enabled() {
1076 0 : return _readout_enabled_flag;
1077 : }
1078 :
1079 0 : void readout_enable(bool flag) {
1080 0 : _readout_enabled_flag = flag;
1081 :
1082 0 : if (interrupt_eq) {
1083 0 : if (flag)
1084 0 : interrupt_configure(CMD_INTERRUPT_ENABLE, 0, 0);
1085 : else
1086 0 : interrupt_configure(CMD_INTERRUPT_DISABLE, 0, 0);
1087 : }
1088 0 : }
1089 :
1090 : /*------------------------------------------------------------------*/
1091 :
1092 0 : static void interrupt_routine(void) {
1093 : int status;
1094 : EVENT_HEADER *pevent;
1095 : void *p;
1096 :
1097 : /* get pointer for upcoming event.
1098 : This is a blocking call if no space available */
1099 0 : status = rb_get_wp(get_event_rbh(0), &p, 100000);
1100 :
1101 0 : if (status == DB_SUCCESS) {
1102 0 : pevent = (EVENT_HEADER *) p;
1103 :
1104 : /* compose MIDAS event header */
1105 0 : pevent->event_id = interrupt_eq->info.event_id;
1106 0 : pevent->trigger_mask = interrupt_eq->info.trigger_mask;
1107 0 : pevent->data_size = 0;
1108 0 : pevent->time_stamp = actual_time;
1109 0 : pevent->serial_number = interrupt_eq->serial_number++;
1110 :
1111 : /* call user readout routine */
1112 0 : pevent->data_size = interrupt_eq->readout((char *) (pevent + 1), 0);
1113 :
1114 : /* send event */
1115 0 : if (pevent->data_size) {
1116 :
1117 : /* put event into ring buffer */
1118 0 : rb_increment_wp(get_event_rbh(0), sizeof(EVENT_HEADER) + pevent->data_size);
1119 :
1120 : } else
1121 0 : interrupt_eq->serial_number--;
1122 : }
1123 0 : }
1124 :
1125 : /*------------------------------------------------------------------*/
1126 :
1127 : /* routines to be called from user code */
1128 :
1129 0 : int create_event_rb(int i) {
1130 : int status;
1131 :
1132 0 : assert(i < MAX_N_THREADS);
1133 0 : assert(rbh[i] == 0);
1134 0 : status = rb_create(event_buffer_size, max_event_size, &rbh[i]);
1135 0 : assert(status == DB_SUCCESS);
1136 0 : return rbh[i];
1137 : }
1138 :
1139 0 : int get_event_rbh(int i) {
1140 0 : return rbh[i];
1141 : }
1142 :
1143 0 : void stop_readout_threads() {
1144 0 : _stop_all_threads = true;
1145 0 : }
1146 :
1147 0 : bool is_readout_thread_enabled() {
1148 0 : return !_stop_all_threads;
1149 : }
1150 :
1151 0 : int is_readout_thread_active() {
1152 : int i;
1153 0 : for (i = 0; i < MAX_N_THREADS; i++)
1154 0 : if (readout_thread_active[i])
1155 0 : return TRUE;
1156 0 : return FALSE;
1157 : }
1158 :
1159 0 : void signal_readout_thread_active(int index, int flag) {
1160 0 : readout_thread_active[index] = flag;
1161 0 : }
1162 :
1163 : /*------------------------------------------------------------------*/
1164 :
1165 0 : static int _readout_thread(void *param) {
1166 : int status, source;
1167 : EVENT_HEADER *pevent;
1168 : void *p;
1169 :
1170 : /* indicate activity to framework */
1171 0 : signal_readout_thread_active(0, 1);
1172 :
1173 0 : p = param; /* avoid compiler warning */
1174 0 : while (!_stop_all_threads) {
1175 : /* obtain buffer space */
1176 :
1177 0 : status = rb_get_wp(get_event_rbh(0), &p, 0);
1178 0 : if (_stop_all_threads)
1179 0 : break;
1180 0 : if (status == DB_TIMEOUT) {
1181 : // printf("readout_thread: Ring buffer is full, waiting for space!\n");
1182 0 : ss_sleep(10);
1183 0 : continue;
1184 : }
1185 0 : if (status != DB_SUCCESS)
1186 0 : break;
1187 :
1188 0 : if (readout_enabled()) {
1189 :
1190 : /* check for new event */
1191 0 : source = poll_event(multithread_eq->info.source, multithread_eq->poll_count, FALSE);
1192 :
1193 0 : if (source > 0) {
1194 :
1195 0 : if (_stop_all_threads)
1196 0 : break;
1197 :
1198 0 : pevent = (EVENT_HEADER *) p;
1199 : /* put source at beginning of event, will be overwritten by
1200 : user readout code, just a special feature used by some
1201 : multi-source applications */
1202 0 : *(INT *) (pevent + 1) = source;
1203 :
1204 : /* compose MIDAS event header */
1205 0 : pevent->event_id = multithread_eq->info.event_id;
1206 0 : pevent->trigger_mask = multithread_eq->info.trigger_mask;
1207 0 : pevent->data_size = 0;
1208 0 : pevent->time_stamp = actual_time;
1209 0 : pevent->serial_number = multithread_eq->serial_number++;
1210 :
1211 : /* call user readout routine */
1212 0 : pevent->data_size = multithread_eq->readout((char *) (pevent + 1), 0);
1213 :
1214 : /* check event size */
1215 0 : if (pevent->data_size + sizeof(EVENT_HEADER) > (DWORD) max_event_size) {
1216 0 : cm_msg(MERROR, "readout_thread",
1217 : "Event size %ld larger than maximum size %d",
1218 0 : (long) (pevent->data_size + sizeof(EVENT_HEADER)),
1219 : max_event_size);
1220 0 : assert(FALSE);
1221 : }
1222 :
1223 0 : if (pevent->data_size > 0) {
1224 : /* put event into ring buffer */
1225 0 : rb_increment_wp(get_event_rbh(0), sizeof(EVENT_HEADER) + pevent->data_size);
1226 : } else
1227 0 : multithread_eq->serial_number--;
1228 : }
1229 :
1230 : } else // readout_enabled
1231 0 : ss_sleep(10);
1232 : }
1233 :
1234 0 : signal_readout_thread_active(0, 0);
1235 :
1236 0 : return 0;
1237 : }
1238 :
1239 : /*-- Receive event from readout thread or interrupt routine --------*/
1240 :
1241 0 : static int receive_trigger_event(EQUIPMENT *eq) {
1242 : int index, status;
1243 0 : EVENT_HEADER *prb = NULL, *pevent;
1244 : void *p;
1245 : static unsigned int last_event_time = 0;
1246 : static unsigned int last_error = 0;
1247 0 : unsigned int last_serial = 0;
1248 :
1249 0 : unsigned int serial = eq->events_collected;
1250 0 : if (serial == 0)
1251 0 : last_serial = 0; // BOR
1252 :
1253 : // search all ring buffers for next event
1254 0 : status = 0;
1255 0 : for (index = 0; get_event_rbh(index); index++) {
1256 0 : status = rb_get_rp(get_event_rbh(index), &p, 10);
1257 0 : prb = (EVENT_HEADER *) p;
1258 0 : if (status == DB_SUCCESS)
1259 0 : if (prb->serial_number > last_serial)
1260 0 : last_serial = prb->serial_number;
1261 0 : if (status == DB_SUCCESS && prb->serial_number == serial)
1262 0 : break;
1263 : }
1264 :
1265 0 : if (get_event_rbh(index) == 0) {
1266 0 : if (serial > 0 && last_serial > serial &&
1267 0 : last_event_time > 0 && ss_millitime() > last_event_time + 5000) {
1268 0 : if (ss_time() - last_error > 30) {
1269 0 : last_error = ss_time();
1270 0 : cm_msg(MERROR, "receive_trigger_event",
1271 : "Event collector: waiting for event serial %d since %1.1lf seconds, received already serial %d",
1272 0 : serial, (ss_millitime() - last_event_time) / 1000.0, last_serial);
1273 : }
1274 : }
1275 0 : return 0;
1276 : }
1277 :
1278 0 : last_event_time = ss_millitime();
1279 0 : pevent = prb;
1280 :
1281 : /* send event */
1282 0 : if (pevent->data_size) {
1283 0 : if (eq->buffer_handle) {
1284 :
1285 : /* save event in temporary buffer to push it to the ODB later */
1286 0 : if (eq->info.read_on & RO_ODB)
1287 0 : memcpy(event_buffer, pevent, pevent->data_size + sizeof(EVENT_HEADER));
1288 :
1289 : /* send first event to ODB if logger writes in root format */
1290 0 : if (pevent->serial_number == 0)
1291 0 : if (logger_root())
1292 0 : update_odb(pevent, eq->hkey_variables, eq->format);
1293 :
1294 0 : status = rpc_send_event(eq->buffer_handle, pevent,
1295 0 : pevent->data_size + sizeof(EVENT_HEADER),
1296 : BM_WAIT, rpc_mode);
1297 :
1298 0 : if (status != SUCCESS) {
1299 0 : cm_msg(MERROR, "receive_trigger_event", "rpc_send_event error %d", status);
1300 0 : return -1;
1301 : }
1302 :
1303 0 : eq->bytes_sent += pevent->data_size + sizeof(EVENT_HEADER);
1304 :
1305 0 : if (eq->info.num_subevents)
1306 0 : eq->events_sent += eq->subevent_number;
1307 : else
1308 0 : eq->events_sent++;
1309 :
1310 0 : eq->events_collected++;
1311 :
1312 0 : rotate_wheel();
1313 : }
1314 : }
1315 :
1316 0 : rb_increment_rp(get_event_rbh(index), sizeof(EVENT_HEADER) + prb->data_size);
1317 0 : return prb->data_size;
1318 : }
1319 :
1320 : /*------------------------------------------------------------------*/
1321 :
1322 0 : static int flush_user_events() {
1323 : int index, status;
1324 : EVENT_HEADER *pevent;
1325 : void *p;
1326 :
1327 0 : for (int idx = 0; equipment[idx].name[0]; idx++) {
1328 0 : EQUIPMENT *eq = &equipment[idx];
1329 :
1330 0 : if (eq->info.eq_type == EQ_USER) {
1331 0 : for (index = 0; get_event_rbh(index); index++) {
1332 : do {
1333 0 : status = rb_get_rp(get_event_rbh(index), &p, 10);
1334 0 : pevent = (EVENT_HEADER *) p;
1335 0 : if (status == DB_SUCCESS) {
1336 0 : rb_increment_rp(get_event_rbh(index), sizeof(EVENT_HEADER) + pevent->data_size);
1337 : }
1338 0 : } while (status == DB_SUCCESS);
1339 : }
1340 : }
1341 : }
1342 :
1343 0 : return FE_SUCCESS;
1344 : }
1345 :
1346 : /*------------------------------------------------------------------*/
1347 :
1348 0 : static int message_print(const char *msg) {
1349 : char str[160];
1350 :
1351 0 : memset(str, ' ', 159);
1352 0 : str[159] = 0;
1353 :
1354 0 : if (msg[0] == '[')
1355 0 : msg = strchr(msg, ']') + 2;
1356 :
1357 0 : memcpy(str, msg, strlen(msg));
1358 0 : ss_printf(0, 20, str);
1359 :
1360 0 : return 0;
1361 : }
1362 :
1363 : /*------------------------------------------------------------------*/
1364 :
1365 0 : void display(BOOL bInit) {
1366 : INT i, status;
1367 : time_t full_time;
1368 : char str[30];
1369 :
1370 0 : if (bInit) {
1371 0 : ss_clear_screen();
1372 :
1373 0 : if (host_name[0])
1374 0 : strcpy(str, host_name);
1375 : else
1376 0 : strcpy(str, "<local>");
1377 :
1378 0 : ss_printf(0, 0, "%s connected to %s. Press \"!\" to exit", full_frontend_name, str);
1379 0 : ss_printf(0, 1,
1380 : "================================================================================");
1381 0 : ss_printf(0, 2, "Run status: %s",
1382 0 : run_state == STATE_STOPPED ? "Stopped" : run_state == STATE_RUNNING ? "Running"
1383 : : "Paused");
1384 0 : ss_printf(25, 2, "Run number %d ", run_number);
1385 0 : ss_printf(0, 3,
1386 : "================================================================================");
1387 0 : ss_printf(0, 4,
1388 : "Equipment Status Events Events/sec Rate[B/s] ODB->FE FE->ODB");
1389 0 : ss_printf(0, 5,
1390 : "--------------------------------------------------------------------------------");
1391 0 : for (i = 0; equipment[i].name[0]; i++)
1392 0 : ss_printf(0, i + 6, "%s", equipment[i].name);
1393 : }
1394 :
1395 : /* display time */
1396 0 : time(&full_time);
1397 : char ctimebuf[32];
1398 0 : ctime_r(&full_time, ctimebuf);
1399 0 : mstrlcpy(str, ctimebuf + 11, sizeof(str));
1400 0 : str[8] = 0;
1401 0 : ss_printf(72, 0, "%s", str);
1402 :
1403 0 : for (i = 0; equipment[i].name[0]; i++) {
1404 0 : status = equipment[i].status;
1405 :
1406 0 : if ((status == 0 || status == FE_SUCCESS) && equipment[i].info.enabled)
1407 0 : ss_printf(14, i + 6, "OK ");
1408 0 : else if (!equipment[i].info.enabled)
1409 0 : ss_printf(14, i + 6, "Disabled ");
1410 0 : else if (status == FE_ERR_ODB)
1411 0 : ss_printf(14, i + 6, "ODB Error");
1412 0 : else if (status == FE_ERR_HW)
1413 0 : ss_printf(14, i + 6, "HW Error ");
1414 0 : else if (status == FE_ERR_DISABLED)
1415 0 : ss_printf(14, i + 6, "Disabled ");
1416 0 : else if (status == FE_ERR_DRIVER)
1417 0 : ss_printf(14, i + 6, "Driver err");
1418 : else
1419 0 : ss_printf(14, i + 6, "Unknown ");
1420 :
1421 0 : if (equipment[i].stats.events_sent > 1E9)
1422 0 : ss_printf(25, i + 6, "%1.3lfG ", equipment[i].stats.events_sent / 1E9);
1423 0 : else if (equipment[i].stats.events_sent > 1E6)
1424 0 : ss_printf(25, i + 6, "%1.3lfM ", equipment[i].stats.events_sent / 1E6);
1425 : else
1426 0 : ss_printf(25, i + 6, "%1.0lf ", equipment[i].stats.events_sent);
1427 :
1428 0 : if (equipment[i].stats.events_per_sec > 1E6)
1429 0 : ss_printf(36, i + 6, "%1.3lfM ", equipment[i].stats.events_per_sec / 1E6);
1430 0 : else if (equipment[i].stats.events_per_sec > 1E3)
1431 0 : ss_printf(36, i + 6, "%1.3lfk ", equipment[i].stats.events_per_sec / 1E3);
1432 : else
1433 0 : ss_printf(36, i + 6, "%1.1lf ", equipment[i].stats.events_per_sec);
1434 :
1435 0 : if (equipment[i].stats.kbytes_per_sec > 1E3)
1436 0 : ss_printf(47, i + 6, "%1.3lfM ", equipment[i].stats.kbytes_per_sec / 1E3);
1437 0 : else if (equipment[i].stats.kbytes_per_sec < 1E3)
1438 0 : ss_printf(47, i + 6, "%1.1lf ", equipment[i].stats.kbytes_per_sec * 1E3);
1439 : else
1440 0 : ss_printf(47, i + 6, "%1.3lfk ", equipment[i].stats.kbytes_per_sec);
1441 :
1442 0 : ss_printf(58, i + 6, "%ld ", equipment[i].odb_in);
1443 0 : ss_printf(69, i + 6, "%ld ", equipment[i].odb_out);
1444 : }
1445 :
1446 : /* go to next line */
1447 0 : ss_printf(0, i + 6, "");
1448 0 : }
1449 :
1450 : /*------------------------------------------------------------------*/
1451 :
1452 0 : void display_inline() {
1453 : INT i;
1454 : time_t full_time;
1455 : char str[30];
1456 :
1457 : /* display time */
1458 0 : time(&full_time);
1459 : char ctimebuf[32];
1460 0 : ctime_r(&full_time, ctimebuf);
1461 0 : mstrlcpy(str, ctimebuf + 11, sizeof(str));
1462 0 : str[8] = 0;
1463 0 : printf("%s ", str);
1464 :
1465 0 : for (i = 0; equipment[i].name[0]; i++) {
1466 0 : printf(" %s:", equipment[i].name);
1467 :
1468 0 : if (equipment[i].stats.events_per_sec > 1E6)
1469 0 : printf("%6.3lfM", equipment[i].stats.events_per_sec / 1E6);
1470 0 : else if (equipment[i].stats.events_per_sec > 1E3)
1471 0 : printf("%6.3lfk", equipment[i].stats.events_per_sec / 1E3);
1472 : else
1473 0 : printf("%6.1lf ", equipment[i].stats.events_per_sec);
1474 : }
1475 :
1476 : /* go to next line */
1477 0 : printf("\n");
1478 0 : }
1479 :
1480 : /*------------------------------------------------------------------*/
1481 :
1482 0 : void rotate_wheel(void) {
1483 : static DWORD last_wheel = 0, wheel_index = 0;
1484 : static char wheel_char[] = {'-', '\\', '|', '/'};
1485 :
1486 0 : if (display_period && !mfe_debug) {
1487 0 : if (ss_millitime() - last_wheel > 300) {
1488 0 : last_wheel = ss_millitime();
1489 0 : ss_printf(79, 2, "%c", wheel_char[wheel_index]);
1490 0 : wheel_index = (wheel_index + 1) % 4;
1491 : }
1492 : }
1493 0 : }
1494 :
1495 : /*------------------------------------------------------------------*/
1496 :
1497 0 : BOOL logger_root()
1498 : /* check if logger uses ROOT format */
1499 : {
1500 : int size, i, status;
1501 : char str[80];
1502 : HNDLE hKeyRoot, hKey;
1503 :
1504 0 : if (db_find_key(hDB, 0, "/Logger/Channels", &hKeyRoot) == DB_SUCCESS) {
1505 0 : for (i = 0;; i++) {
1506 0 : status = db_enum_key(hDB, hKeyRoot, i, &hKey);
1507 0 : if (status == DB_NO_MORE_SUBKEYS)
1508 0 : break;
1509 :
1510 0 : strcpy(str, "MIDAS");
1511 0 : size = sizeof(str);
1512 0 : db_get_value(hDB, hKey, "Settings/Format", str, &size, TID_STRING, TRUE);
1513 :
1514 0 : if (equal_ustring(str, "ROOT"))
1515 0 : return TRUE;
1516 : }
1517 : }
1518 :
1519 0 : return FALSE;
1520 : }
1521 :
1522 : /*------------------------------------------------------------------*/
1523 :
1524 0 : static INT check_polled_events(void) {
1525 : EQUIPMENT_INFO *eq_info;
1526 : EQUIPMENT *eq;
1527 : EVENT_HEADER *pevent, *pfragment;
1528 : DWORD readout_start, sent, size;
1529 : INT i, idx, source, events_sent, status;
1530 : char *pdata;
1531 : unsigned char *pd;
1532 :
1533 0 : events_sent = 0;
1534 0 : actual_millitime = ss_millitime();
1535 :
1536 : /*---- loop over equipment table -------------------------------*/
1537 0 : for (idx = 0;; idx++) {
1538 0 : eq = &equipment[idx];
1539 0 : eq_info = &eq->info;
1540 :
1541 : /* check if end of equipment list */
1542 0 : if (!eq->name[0])
1543 0 : break;
1544 :
1545 0 : if (!eq_info->enabled)
1546 0 : continue;
1547 :
1548 0 : if (eq->status != FE_SUCCESS)
1549 0 : continue;
1550 :
1551 0 : if ((eq_info->eq_type & EQ_POLLED) == 0)
1552 0 : continue;
1553 :
1554 : /*---- check polled events ----*/
1555 0 : readout_start = actual_millitime;
1556 0 : pevent = NULL;
1557 :
1558 0 : while ((source = poll_event(eq_info->source, eq->poll_count, FALSE)) > 0) {
1559 :
1560 0 : if (eq_info->eq_type & EQ_FRAGMENTED)
1561 0 : pevent = (EVENT_HEADER *) frag_buffer;
1562 : else
1563 0 : pevent = (EVENT_HEADER *) event_buffer;
1564 :
1565 : /* compose MIDAS event header */
1566 0 : pevent->event_id = eq_info->event_id;
1567 0 : pevent->trigger_mask = eq_info->trigger_mask;
1568 0 : pevent->data_size = 0;
1569 0 : pevent->time_stamp = actual_time;
1570 0 : pevent->serial_number = eq->serial_number;
1571 :
1572 : /* put source at beginning of event, will be overwritten by
1573 : user readout code, just a special feature used by some
1574 : multi-source applications */
1575 0 : *(INT *) (pevent + 1) = source;
1576 :
1577 0 : if (eq->info.num_subevents) {
1578 0 : eq->subevent_number = 0;
1579 : do {
1580 0 : *(INT *) ((char *) (pevent + 1) + pevent->data_size) = source;
1581 :
1582 : /* call user readout routine for subevent indicating offset */
1583 0 : size = eq->readout((char *) (pevent + 1), pevent->data_size);
1584 0 : pevent->data_size += size;
1585 0 : if (size > 0) {
1586 0 : if (pevent->data_size + sizeof(EVENT_HEADER) > (DWORD) max_event_size) {
1587 0 : cm_msg(MERROR, "check_polled_events",
1588 : "Event size %ld larger than maximum size %d",
1589 0 : (long) (pevent->data_size + sizeof(EVENT_HEADER)),
1590 : max_event_size);
1591 : }
1592 :
1593 0 : eq->subevent_number++;
1594 0 : eq->serial_number++;
1595 : }
1596 :
1597 : /* wait for next event */
1598 : do {
1599 0 : source = poll_event(eq_info->source, eq->poll_count, FALSE);
1600 :
1601 0 : if (source == FALSE) {
1602 0 : actual_millitime = ss_millitime();
1603 :
1604 : /* repeat no more than period */
1605 0 : if (actual_millitime - readout_start > (DWORD) eq_info->period)
1606 0 : break;
1607 : }
1608 0 : } while (source == FALSE);
1609 :
1610 0 : } while (eq->subevent_number < eq->info.num_subevents && source);
1611 :
1612 : /* notify readout routine about end of super-event */
1613 0 : pevent->data_size = eq->readout((char *) (pevent + 1), -1);
1614 : } else {
1615 : /* call user readout routine indicating event source */
1616 0 : pevent->data_size = eq->readout((char *) (pevent + 1), 0);
1617 :
1618 : /* check event size */
1619 0 : if (eq_info->eq_type & EQ_FRAGMENTED) {
1620 0 : if (pevent->data_size + sizeof(EVENT_HEADER) > (DWORD) max_event_size_frag) {
1621 0 : cm_msg(MERROR, "check_polled_events",
1622 : "Event size %ld larger than maximum size %d for frag. ev.",
1623 0 : (long) (pevent->data_size + sizeof(EVENT_HEADER)),
1624 : max_event_size_frag);
1625 0 : assert(FALSE);
1626 : }
1627 : } else {
1628 0 : if (pevent->data_size + sizeof(EVENT_HEADER) > (DWORD) max_event_size) {
1629 0 : cm_msg(MERROR, "check_polled_events",
1630 : "Event size %ld larger than maximum size %d",
1631 0 : (long) (pevent->data_size + sizeof(EVENT_HEADER)),
1632 : max_event_size);
1633 0 : assert(FALSE);
1634 : }
1635 : }
1636 :
1637 : /* increment serial number if event read out sucessfully */
1638 0 : if (pevent->data_size)
1639 0 : eq->serial_number++;
1640 : }
1641 :
1642 : /* send event */
1643 0 : if (pevent->data_size) {
1644 :
1645 : /* check for fragmented event */
1646 0 : if (eq_info->eq_type & EQ_FRAGMENTED) {
1647 : /* compose fragments */
1648 0 : pfragment = (EVENT_HEADER *) event_buffer;
1649 :
1650 : /* compose MIDAS event header */
1651 0 : memcpy(pfragment, pevent, sizeof(EVENT_HEADER));
1652 0 : pfragment->event_id |= EVENTID_FRAG1;
1653 :
1654 : /* store total event size */
1655 0 : pd = (unsigned char *) (pfragment + 1);
1656 0 : size = pevent->data_size;
1657 0 : for (i = 0; i < 4; i++) {
1658 0 : pd[i] = (unsigned char) (size & 0xFF); /* little endian, please! */
1659 0 : size >>= 8;
1660 : }
1661 :
1662 0 : pfragment->data_size = sizeof(DWORD);
1663 :
1664 0 : pdata = (char *) (pevent + 1);
1665 :
1666 0 : for (i = 0, sent = 0; sent < pevent->data_size; i++) {
1667 0 : if (i > 0) {
1668 0 : pfragment = (EVENT_HEADER *) event_buffer;
1669 :
1670 : /* compose MIDAS event header */
1671 0 : memcpy(pfragment, pevent, sizeof(EVENT_HEADER));
1672 0 : pfragment->event_id |= EVENTID_FRAG;
1673 :
1674 : /* copy portion of event */
1675 0 : size = pevent->data_size - sent;
1676 0 : if (size > max_event_size - sizeof(EVENT_HEADER))
1677 0 : size = max_event_size - sizeof(EVENT_HEADER);
1678 :
1679 0 : memcpy(pfragment + 1, pdata, size);
1680 0 : pfragment->data_size = size;
1681 0 : sent += size;
1682 0 : pdata += size;
1683 : }
1684 :
1685 : /* send event to buffer */
1686 0 : if (equipment[idx].buffer_handle) {
1687 0 : status = rpc_send_event(equipment[idx].buffer_handle, pfragment,
1688 0 : pfragment->data_size + sizeof(EVENT_HEADER), BM_WAIT, rpc_mode);
1689 0 : if (status != RPC_SUCCESS) {
1690 0 : cm_msg(MERROR, "check_polled_events", "rpc_send_event(BM_WAIT) error %d", status);
1691 0 : return status;
1692 : }
1693 :
1694 : /* flush events from buffer */
1695 0 : rpc_flush_event();
1696 : }
1697 : }
1698 :
1699 : } else { /*-------------------*/
1700 :
1701 : /* send unfragmented event */
1702 :
1703 : /* send first event to ODB if logger writes in root format */
1704 0 : if (pevent->serial_number == 0)
1705 0 : if (logger_root())
1706 0 : update_odb(pevent, eq->hkey_variables, eq->format);
1707 :
1708 0 : status = rpc_send_event(eq->buffer_handle, pevent,
1709 0 : pevent->data_size + sizeof(EVENT_HEADER),
1710 : BM_WAIT, rpc_mode);
1711 :
1712 0 : if (status != SUCCESS) {
1713 0 : cm_msg(MERROR, "check_polled_events", "rpc_send_event error %d", status);
1714 0 : break;
1715 : }
1716 : }
1717 :
1718 0 : eq->bytes_sent += pevent->data_size + sizeof(EVENT_HEADER);
1719 :
1720 0 : if (eq->info.num_subevents) {
1721 0 : eq->events_sent += eq->subevent_number;
1722 0 : events_sent += eq->subevent_number;
1723 : } else {
1724 0 : eq->events_sent++;
1725 0 : events_sent++;
1726 : }
1727 :
1728 0 : rotate_wheel();
1729 : }
1730 :
1731 0 : actual_millitime = ss_millitime();
1732 :
1733 : /* repeat no more than period */
1734 0 : if (actual_millitime - readout_start > (DWORD) eq_info->period)
1735 0 : break;
1736 :
1737 : /* quit if event limit is reached */
1738 0 : if (eq_info->event_limit > 0 && eq->stats.events_sent + eq->events_sent >= eq_info->event_limit)
1739 0 : break;
1740 : }
1741 : }
1742 :
1743 0 : return events_sent;
1744 : }
1745 :
1746 : /*------------------------------------------------------------------*/
1747 :
1748 0 : static INT check_user_events(void) {
1749 : EQUIPMENT_INFO *eq_info;
1750 : EQUIPMENT *eq;
1751 : DWORD size;
1752 : INT idx, events_sent;
1753 :
1754 0 : events_sent = 0;
1755 :
1756 : /*---- loop over equipment table -------------------------------*/
1757 0 : for (idx = 0;; idx++) {
1758 0 : eq = &equipment[idx];
1759 0 : eq_info = &eq->info;
1760 :
1761 : /* check if end of equipment list */
1762 0 : if (!eq->name[0])
1763 0 : break;
1764 :
1765 0 : if (!eq_info->enabled)
1766 0 : continue;
1767 :
1768 0 : if (eq->status != FE_SUCCESS)
1769 0 : continue;
1770 :
1771 0 : if ((eq_info->eq_type & (EQ_INTERRUPT | EQ_MULTITHREAD | EQ_USER)) == 0)
1772 0 : continue;
1773 :
1774 : do {
1775 0 : size = receive_trigger_event(eq);
1776 0 : if (size > 0)
1777 0 : events_sent++;
1778 0 : } while (size > 0);
1779 : }
1780 :
1781 0 : return events_sent;
1782 : }
1783 :
1784 : /*------------------------------------------------------------------*/
1785 :
1786 0 : static INT scheduler() {
1787 : EQUIPMENT_INFO *eq_info;
1788 : EQUIPMENT *eq;
1789 : EVENT_HEADER *pevent, *pfragment;
1790 0 : DWORD last_time_network = 0, last_time_display = 0, last_time_flush = 0,
1791 0 : readout_start, sent, size, last_time_rate = 0;
1792 0 : INT i, j, idx, status = 0, ch, source, state, old_flag;
1793 : char *pdata;
1794 : unsigned char *pd;
1795 0 : BOOL flag, force_update = FALSE;
1796 :
1797 0 : INT opt_max = 0, opt_index = 0, opt_tcp_size = 128, opt_cnt = 0;
1798 : INT err;
1799 :
1800 : #ifdef OS_VXWORKS
1801 : rpc_set_opt_tcp_size(1024);
1802 : #ifdef PPCxxx
1803 : rpc_set_opt_tcp_size(NET_TCP_SIZE);
1804 : #endif
1805 : #endif
1806 :
1807 : /*----------------- MAIN equipment loop ------------------------------*/
1808 :
1809 0 : last_time_rate = ss_millitime();
1810 :
1811 : do {
1812 0 : actual_millitime = ss_millitime();
1813 0 : actual_time = ss_time();
1814 :
1815 : /*---- loop over equipment table -------------------------------*/
1816 0 : for (idx = 0;; idx++) {
1817 0 : eq = &equipment[idx];
1818 0 : eq_info = &eq->info;
1819 :
1820 : /* check if end of equipment list */
1821 0 : if (!eq->name[0])
1822 0 : break;
1823 :
1824 0 : if (!eq_info->enabled)
1825 0 : continue;
1826 :
1827 0 : if (eq->status != FE_SUCCESS && eq->status != FE_PARTIALLY_DISABLED)
1828 0 : continue;
1829 :
1830 : /*---- call idle routine for slow control equipment ----*/
1831 0 : if ((eq_info->eq_type & EQ_SLOW) && (eq->status == FE_SUCCESS || eq->status == FE_PARTIALLY_DISABLED)) {
1832 0 : if (eq_info->history > 0) {
1833 0 : if (actual_millitime - eq->last_idle >= (DWORD) eq_info->history) {
1834 0 : eq->cd(CMD_IDLE, eq);
1835 0 : eq->last_idle = actual_millitime;
1836 : }
1837 : } else
1838 0 : eq->cd(CMD_IDLE, eq);
1839 : }
1840 :
1841 0 : if (run_state == STATE_STOPPED && (eq_info->read_on & RO_STOPPED) == 0)
1842 0 : continue;
1843 0 : if (run_state == STATE_PAUSED && (eq_info->read_on & RO_PAUSED) == 0)
1844 0 : continue;
1845 0 : if (run_state == STATE_RUNNING && (eq_info->read_on & RO_RUNNING) == 0)
1846 0 : continue;
1847 :
1848 : /*---- check periodic events ----*/
1849 0 : if ((eq_info->eq_type & EQ_PERIODIC) || (eq_info->eq_type & EQ_SLOW)) {
1850 0 : if (eq_info->period == 0)
1851 0 : continue;
1852 :
1853 : /* check if period over */
1854 0 : if (actual_millitime - eq->last_called >= (DWORD) eq_info->period) {
1855 : /* disable interrupts or readout thread during this event */
1856 0 : old_flag = readout_enabled();
1857 0 : if (old_flag && lockout_readout_thread)
1858 0 : readout_enable(FALSE);
1859 :
1860 : /* readout and send event */
1861 0 : status = send_event(idx, FALSE);
1862 :
1863 0 : if (status != CM_SUCCESS) {
1864 0 : cm_msg(MERROR, "scheduler", "send_event error %d", status);
1865 0 : goto net_error;
1866 : }
1867 :
1868 : /* re-enable the interrupt or readout thread after readout */
1869 0 : if (old_flag)
1870 0 : readout_enable(TRUE);
1871 : }
1872 : }
1873 :
1874 : /*---- check polled events ----*/
1875 0 : if (eq_info->eq_type & EQ_POLLED) {
1876 0 : readout_start = actual_millitime;
1877 0 : pevent = NULL;
1878 :
1879 0 : while ((source = poll_event(eq_info->source, eq->poll_count, FALSE)) > 0) {
1880 :
1881 0 : if (eq_info->eq_type & EQ_FRAGMENTED)
1882 0 : pevent = (EVENT_HEADER *) frag_buffer;
1883 : else
1884 0 : pevent = (EVENT_HEADER *) event_buffer;
1885 :
1886 : /* compose MIDAS event header */
1887 0 : pevent->event_id = eq_info->event_id;
1888 0 : pevent->trigger_mask = eq_info->trigger_mask;
1889 0 : pevent->data_size = 0;
1890 0 : pevent->time_stamp = actual_time;
1891 0 : pevent->serial_number = eq->serial_number;
1892 :
1893 : /* put source at beginning of event, will be overwritten by
1894 : user readout code, just a special feature used by some
1895 : multi-source applications */
1896 0 : *(INT *) (pevent + 1) = source;
1897 :
1898 0 : if (eq->info.num_subevents) {
1899 0 : eq->subevent_number = 0;
1900 : do {
1901 0 : *(INT *) ((char *) (pevent + 1) + pevent->data_size) = source;
1902 :
1903 : /* call user readout routine for subevent indicating offset */
1904 0 : size = eq->readout((char *) (pevent + 1), pevent->data_size);
1905 0 : pevent->data_size += size;
1906 0 : if (size > 0) {
1907 0 : if (pevent->data_size + sizeof(EVENT_HEADER) > (DWORD) max_event_size) {
1908 0 : cm_msg(MERROR, "scheduler",
1909 : "Event size %ld larger than maximum size %d",
1910 0 : (long) (pevent->data_size + sizeof(EVENT_HEADER)),
1911 : max_event_size);
1912 : }
1913 :
1914 0 : eq->subevent_number++;
1915 0 : eq->serial_number++;
1916 : }
1917 :
1918 : /* wait for next event */
1919 : do {
1920 0 : source = poll_event(eq_info->source, eq->poll_count, FALSE);
1921 :
1922 0 : if (source == FALSE) {
1923 0 : actual_millitime = ss_millitime();
1924 :
1925 : /* repeat no more than period */
1926 0 : if (actual_millitime - readout_start > (DWORD) eq_info->period)
1927 0 : break;
1928 : }
1929 0 : } while (source == FALSE);
1930 :
1931 0 : } while (eq->subevent_number < eq->info.num_subevents && source);
1932 :
1933 : /* notify readout routine about end of super-event */
1934 0 : pevent->data_size = eq->readout((char *) (pevent + 1), -1);
1935 : } else {
1936 : /* call user readout routine indicating event source */
1937 0 : pevent->data_size = eq->readout((char *) (pevent + 1), 0);
1938 :
1939 : /* check event size */
1940 0 : if (eq_info->eq_type & EQ_FRAGMENTED) {
1941 0 : if (pevent->data_size + sizeof(EVENT_HEADER) > (DWORD) max_event_size_frag) {
1942 0 : cm_msg(MERROR, "send_event",
1943 : "Event size %ld larger than maximum size %d for frag. ev.",
1944 0 : (long) (pevent->data_size + sizeof(EVENT_HEADER)),
1945 : max_event_size_frag);
1946 0 : pevent->data_size = 0;
1947 : }
1948 : } else {
1949 0 : if (pevent->data_size + sizeof(EVENT_HEADER) > (DWORD) max_event_size) {
1950 0 : cm_msg(MERROR, "scheduler",
1951 : "Event size %ld larger than maximum size %d",
1952 0 : (long) (pevent->data_size + sizeof(EVENT_HEADER)),
1953 : max_event_size);
1954 0 : pevent->data_size = 0;
1955 : }
1956 : }
1957 :
1958 : /* increment serial number if event read out sucessfully */
1959 0 : if (pevent->data_size)
1960 0 : eq->serial_number++;
1961 : }
1962 :
1963 : /* send event */
1964 0 : if (pevent->data_size) {
1965 :
1966 : /* check for fragmented event */
1967 0 : if (eq_info->eq_type & EQ_FRAGMENTED) {
1968 : /* compose fragments */
1969 0 : pfragment = (EVENT_HEADER *) event_buffer;
1970 :
1971 : /* compose MIDAS event header */
1972 0 : memcpy(pfragment, pevent, sizeof(EVENT_HEADER));
1973 0 : pfragment->event_id |= EVENTID_FRAG1;
1974 :
1975 : /* store total event size */
1976 0 : pd = (unsigned char *) (pfragment + 1);
1977 0 : size = pevent->data_size;
1978 0 : for (i = 0; i < 4; i++) {
1979 0 : pd[i] = (unsigned char) (size & 0xFF); /* little endian, please! */
1980 0 : size >>= 8;
1981 : }
1982 :
1983 0 : pfragment->data_size = sizeof(DWORD);
1984 :
1985 0 : pdata = (char *) (pevent + 1);
1986 :
1987 0 : for (i = 0, sent = 0; sent < pevent->data_size; i++) {
1988 0 : if (i > 0) {
1989 0 : pfragment = (EVENT_HEADER *) event_buffer;
1990 :
1991 : /* compose MIDAS event header */
1992 0 : memcpy(pfragment, pevent, sizeof(EVENT_HEADER));
1993 0 : pfragment->event_id |= EVENTID_FRAG;
1994 :
1995 : /* copy portion of event */
1996 0 : size = pevent->data_size - sent;
1997 0 : if (size > max_event_size - sizeof(EVENT_HEADER))
1998 0 : size = max_event_size - sizeof(EVENT_HEADER);
1999 :
2000 0 : memcpy(pfragment + 1, pdata, size);
2001 0 : pfragment->data_size = size;
2002 0 : sent += size;
2003 0 : pdata += size;
2004 : }
2005 :
2006 : /* send event to buffer */
2007 0 : if (equipment[idx].buffer_handle) {
2008 0 : status = rpc_send_event(equipment[idx].buffer_handle, pfragment,
2009 0 : pfragment->data_size + sizeof(EVENT_HEADER), BM_WAIT, rpc_mode);
2010 0 : if (status != RPC_SUCCESS) {
2011 0 : cm_msg(MERROR, "scheduler", "rpc_send_event(BM_WAIT) error %d", status);
2012 0 : return status;
2013 : }
2014 :
2015 : /* flush events from buffer */
2016 0 : rpc_flush_event();
2017 : }
2018 : }
2019 :
2020 : } else { /*-------------------*/
2021 :
2022 : /* send unfragmented event */
2023 :
2024 : /* send first event to ODB if logger writes in root format */
2025 0 : if (pevent->serial_number == 0)
2026 0 : if (logger_root())
2027 0 : update_odb(pevent, eq->hkey_variables, eq->format);
2028 :
2029 0 : status = rpc_send_event(eq->buffer_handle, pevent,
2030 0 : pevent->data_size + sizeof(EVENT_HEADER),
2031 : BM_WAIT, rpc_mode);
2032 :
2033 0 : if (status != SUCCESS) {
2034 0 : cm_msg(MERROR, "scheduler", "rpc_send_event error %d", status);
2035 0 : goto net_error;
2036 : }
2037 : }
2038 :
2039 0 : eq->bytes_sent += pevent->data_size + sizeof(EVENT_HEADER);
2040 :
2041 0 : if (eq->info.num_subevents)
2042 0 : eq->events_sent += eq->subevent_number;
2043 : else
2044 0 : eq->events_sent++;
2045 :
2046 0 : rotate_wheel();
2047 : }
2048 :
2049 0 : actual_millitime = ss_millitime();
2050 :
2051 : /* send event to ODB */
2052 0 : if (pevent->data_size && (eq_info->read_on & RO_ODB)) {
2053 0 : if (actual_millitime - eq->last_called > ODB_UPDATE_TIME) {
2054 0 : eq->last_called = actual_millitime;
2055 0 : update_odb(pevent, eq->hkey_variables, eq->format);
2056 0 : eq->odb_out++;
2057 : }
2058 : }
2059 :
2060 : /* repeat no more than period */
2061 0 : if (actual_millitime - readout_start > (DWORD) eq_info->period)
2062 0 : break;
2063 :
2064 : /* quit if event limit is reached */
2065 0 : if (eq_info->event_limit > 0 && eq->stats.events_sent + eq->events_sent >= eq_info->event_limit)
2066 0 : break;
2067 : }
2068 : }
2069 :
2070 : /*---- send interrupt events ----*/
2071 0 : if (eq_info->eq_type & (EQ_INTERRUPT | EQ_MULTITHREAD | EQ_USER)) {
2072 0 : readout_start = actual_millitime;
2073 :
2074 : do {
2075 0 : size = receive_trigger_event(eq);
2076 0 : if ((int) size == -1)
2077 0 : goto net_error;
2078 :
2079 0 : actual_millitime = ss_millitime();
2080 :
2081 : /* repeat no more than period */
2082 0 : if (actual_millitime - readout_start > (DWORD) eq_info->period)
2083 0 : break;
2084 :
2085 : /* quit if event limit is reached */
2086 0 : if (eq_info->event_limit > 0 && eq->stats.events_sent + eq->events_sent >= eq_info->event_limit)
2087 0 : break;
2088 :
2089 0 : } while (size > 0);
2090 :
2091 : /* send event to ODB */
2092 0 : pevent = (EVENT_HEADER *) event_buffer;
2093 0 : if (size > 0 && pevent->data_size && ((eq_info->read_on & RO_ODB) || eq_info->history)) {
2094 0 : if (actual_millitime - eq->last_called > ODB_UPDATE_TIME && pevent != NULL) {
2095 0 : eq->last_called = actual_millitime;
2096 0 : update_odb(pevent, eq->hkey_variables, eq->format);
2097 0 : eq->odb_out++;
2098 : }
2099 : }
2100 : }
2101 :
2102 : /*---- check if event limit is reached ----*/
2103 0 : if (eq_info->eq_type != EQ_SLOW && eq_info->event_limit > 0 && eq->stats.events_sent + eq->events_sent >= eq_info->event_limit && run_state == STATE_RUNNING) {
2104 : /* stop run */
2105 : char str[TRANSITION_ERROR_STRING_LENGTH];
2106 0 : if (cm_transition(TR_STOP, 0, str, sizeof(str), TR_SYNC, FALSE) != CM_SUCCESS)
2107 0 : cm_msg(MERROR, "scheduler", "cannot stop run: %s", str);
2108 :
2109 : /* check if auto-restart, main loop will take care of it */
2110 0 : flag = FALSE;
2111 0 : size = sizeof(flag);
2112 0 : db_get_value(hDB, 0, "/Logger/Auto restart", &flag, (INT *) &size, TID_BOOL, TRUE);
2113 :
2114 0 : if (flag) {
2115 0 : UINT32 delay = 20;
2116 0 : size = sizeof(delay);
2117 0 : db_get_value(hDB, 0, "/Logger/Auto restart delay", &delay, (INT *) &size, TID_UINT32, TRUE);
2118 0 : auto_restart = ss_time() + delay;
2119 : }
2120 :
2121 : /* update event display correctly */
2122 0 : force_update = TRUE;
2123 : }
2124 0 : }
2125 :
2126 : /*---- check for error messages periodically -------------------*/
2127 0 : mfe_error_check();
2128 :
2129 : /*---- call frontend_loop periodically -------------------------*/
2130 0 : if (frontend_call_loop) {
2131 0 : status = frontend_loop();
2132 0 : if (status == RPC_SHUTDOWN || status == SS_ABORT) {
2133 0 : status = RPC_SHUTDOWN;
2134 0 : break;
2135 : }
2136 : }
2137 :
2138 : /*---- check for deferred transitions --------------------------*/
2139 0 : cm_check_deferred_transition();
2140 :
2141 : /*---- check for manual triggered events -----------------------*/
2142 0 : if (manual_trigger_event_id) {
2143 0 : old_flag = readout_enabled();
2144 0 : if (old_flag && lockout_readout_thread)
2145 0 : readout_enable(FALSE);
2146 :
2147 : /* readout and send event */
2148 0 : status = BM_INVALID_PARAM;
2149 0 : for (i = 0; equipment[i].name[0]; i++)
2150 0 : if (equipment[i].info.event_id == manual_trigger_event_id) {
2151 0 : status = send_event(i, TRUE);
2152 0 : break;
2153 : }
2154 :
2155 0 : manual_trigger_event_id = 0;
2156 :
2157 0 : if (status != CM_SUCCESS) {
2158 0 : cm_msg(MERROR, "scheduler", "send_event error %d", status);
2159 0 : goto net_error;
2160 : }
2161 :
2162 : /* re-enable the interrupt after periodic */
2163 0 : if (old_flag)
2164 0 : readout_enable(TRUE);
2165 : }
2166 :
2167 0 : int overflow = 0;
2168 :
2169 0 : for (i = 0; equipment[i].name[0]; i++) {
2170 0 : if (equipment[i].bytes_sent > 0xDFFFFFFF)
2171 0 : overflow = (int) equipment[i].bytes_sent;
2172 : }
2173 :
2174 : /*---- calculate rates and update status page periodically -----*/
2175 0 : if (force_update || overflow || (display_period && actual_millitime - last_time_display > (DWORD) display_period)
2176 0 : || (!display_period && actual_millitime - last_time_display > 3000)) {
2177 0 : force_update = FALSE;
2178 :
2179 0 : for (i = 0; equipment[i].name[0]; i++) {
2180 0 : eq = &equipment[i];
2181 0 : eq->stats.events_sent += eq->events_sent;
2182 0 : n_events[i] += (int) eq->events_sent;
2183 0 : eq->events_sent = 0;
2184 : }
2185 :
2186 : /* calculate rates after requested period */
2187 0 : if (overflow || (actual_millitime - last_time_rate > (DWORD) get_rate_period())) {
2188 0 : max_bytes_per_sec = 0;
2189 0 : for (i = 0; equipment[i].name[0]; i++) {
2190 0 : eq = &equipment[i];
2191 0 : double e = n_events[i] / ((actual_millitime - last_time_rate) / 1000.0);
2192 0 : eq->stats.events_per_sec = ((int)(e * 100 + 0.5)) / 100.0;
2193 :
2194 0 : e = eq->bytes_sent / 1000.0 / ((actual_millitime - last_time_rate) / 1000.0);
2195 0 : eq->stats.kbytes_per_sec = ((int)(e * 1000 + 0.5)) / 1000.0;
2196 :
2197 0 : if ((INT) eq->bytes_sent > max_bytes_per_sec)
2198 0 : max_bytes_per_sec = (INT) eq->bytes_sent;
2199 :
2200 0 : eq->bytes_sent = 0;
2201 0 : n_events[i] = 0;
2202 : }
2203 :
2204 0 : max_bytes_per_sec = (INT) ((double) max_bytes_per_sec / ((actual_millitime - last_time_rate) / 1000.0));
2205 :
2206 0 : last_time_rate = actual_millitime;
2207 : }
2208 :
2209 : /* tcp buffer size evaluation */
2210 0 : if (optimize) {
2211 0 : opt_max = MAX(opt_max, (INT) max_bytes_per_sec);
2212 0 : ss_printf(0, opt_index, "%6d : %5.1lf %5.1lf", opt_tcp_size,
2213 : opt_max / 1000.0, max_bytes_per_sec / 1000.0);
2214 0 : if (++opt_cnt == 10) {
2215 0 : opt_cnt = 0;
2216 0 : opt_max = 0;
2217 0 : opt_index++;
2218 0 : opt_tcp_size = 1 << (opt_index + 7);
2219 0 : rpc_set_opt_tcp_size(opt_tcp_size);
2220 0 : if (1 << (opt_index + 7) > 0x8000) {
2221 0 : opt_index = 0;
2222 0 : opt_tcp_size = 1 << 7;
2223 0 : rpc_set_opt_tcp_size(opt_tcp_size);
2224 : }
2225 : }
2226 : }
2227 :
2228 : /* propagate changes in equipment to ODB */
2229 0 : db_send_changed_records();
2230 :
2231 0 : if (display_period && !mfe_debug) {
2232 0 : display(FALSE);
2233 :
2234 : /* check keyboard */
2235 0 : ch = 0;
2236 0 : status = 0;
2237 0 : while (ss_kbhit()) {
2238 0 : ch = ss_getchar(0);
2239 0 : if (ch == -1)
2240 0 : ch = getchar();
2241 :
2242 0 : if (ch == '!')
2243 0 : status = RPC_SHUTDOWN;
2244 : }
2245 :
2246 0 : if (ch > 0)
2247 0 : display(TRUE);
2248 0 : if (status == RPC_SHUTDOWN)
2249 0 : break;
2250 : }
2251 :
2252 0 : if (display_period && mfe_debug) {
2253 0 : display_inline();
2254 : }
2255 :
2256 0 : last_time_display = actual_millitime;
2257 : }
2258 :
2259 : /*---- check to flush cache ------------------------------------*/
2260 0 : if (actual_millitime - last_time_flush > 1000) {
2261 0 : last_time_flush = actual_millitime;
2262 :
2263 : /* if cache on server is not filled in one second at current
2264 : data rate, flush it now to make events available to consumers */
2265 :
2266 0 : if (max_bytes_per_sec < gWriteCacheSize) {
2267 0 : old_flag = readout_enabled();
2268 0 : if (old_flag && lockout_readout_thread)
2269 0 : readout_enable(FALSE);
2270 :
2271 0 : for (i = 0; equipment[i].name[0]; i++) {
2272 0 : if (equipment[i].buffer_handle) {
2273 : /* if the same buffer is open multiple times, only flush it once */
2274 0 : BOOL buffer_done = FALSE;
2275 0 : for (j = 0; j < i; j++)
2276 0 : if (equipment[i].buffer_handle == equipment[j].buffer_handle) {
2277 0 : buffer_done = TRUE;
2278 0 : break;
2279 : }
2280 :
2281 : //printf("mfe: eq %d, buffer %d, done %d\n", i, equipment[i].buffer_handle, buffer_done);
2282 :
2283 0 : if (!buffer_done) {
2284 0 : rpc_flush_event();
2285 0 : if (rpc_is_remote()) {
2286 0 : err = rpc_call(RPC_BM_FLUSH_CACHE | RPC_NO_REPLY, equipment[i].buffer_handle, BM_NO_WAIT);
2287 : } else {
2288 0 : err = bm_flush_cache(equipment[i].buffer_handle, BM_NO_WAIT);
2289 : }
2290 0 : if ((err != BM_SUCCESS) && (err != BM_ASYNC_RETURN)) {
2291 0 : cm_msg(MERROR, "scheduler", "bm_flush_cache(BM_NO_WAIT) returned status %d", err);
2292 0 : return err;
2293 : }
2294 : }
2295 : }
2296 : }
2297 :
2298 0 : if (old_flag)
2299 0 : readout_enable(TRUE);
2300 : }
2301 : }
2302 :
2303 : /*---- check for auto restart --------------------------------*/
2304 0 : if (auto_restart > 0 && ss_time() > auto_restart) {
2305 : /* check if really stopped */
2306 0 : size = sizeof(state);
2307 0 : status = db_get_value(hDB, 0, "Runinfo/State", &state, (INT *) &size, TID_INT, TRUE);
2308 0 : if (status != DB_SUCCESS)
2309 0 : cm_msg(MERROR, "scheduler", "cannot get Runinfo/State in database");
2310 :
2311 0 : if (state == STATE_STOPPED) {
2312 0 : auto_restart = 0;
2313 0 : size = sizeof(run_number);
2314 : status =
2315 0 : db_get_value(hDB, 0, "/Runinfo/Run number", &run_number, (INT *) &size, TID_INT,
2316 : TRUE);
2317 0 : assert(status == SUCCESS);
2318 :
2319 0 : if (run_number <= 0) {
2320 0 : cm_msg(MERROR, "main", "aborting on attempt to use invalid run number %d",
2321 : run_number);
2322 0 : abort();
2323 : }
2324 :
2325 0 : cm_msg(MTALK, "main", "starting new run");
2326 0 : status = cm_transition(TR_START, run_number + 1, NULL, 0, TR_SYNC, FALSE);
2327 0 : if (status != CM_SUCCESS)
2328 0 : cm_msg(MERROR, "main", "cannot restart run");
2329 : }
2330 : }
2331 :
2332 : /*---- check network messages ----------------------------------*/
2333 0 : if ((run_state == STATE_RUNNING && interrupt_eq == NULL) || slowcont_eq || frontend_call_loop) {
2334 :
2335 : /* yield 10 ms if no polled equipment present */
2336 0 : if (polled_eq == NULL && !slowcont_eq && !frontend_call_loop)
2337 0 : status = cm_yield(10);
2338 : else {
2339 : /* only call yield once every 10ms when running */
2340 0 : if (actual_millitime - last_time_network > 10) {
2341 0 : status = cm_yield(0);
2342 0 : last_time_network = actual_millitime;
2343 : } else
2344 0 : status = RPC_SUCCESS;
2345 : }
2346 : } else
2347 : /* when run is stopped or interrupts used,
2348 : call yield with 100ms timeout */
2349 0 : status = cm_yield(100);
2350 :
2351 : /* exit for VxWorks */
2352 0 : if (fe_stop)
2353 0 : status = RPC_SHUTDOWN;
2354 :
2355 : /* exit if CTRL-C pressed */
2356 0 : if (cm_is_ctrlc_pressed())
2357 0 : status = RPC_SHUTDOWN;
2358 :
2359 0 : } while (status != RPC_SHUTDOWN && status != SS_ABORT);
2360 :
2361 0 : net_error:
2362 :
2363 0 : return status;
2364 : }
2365 :
2366 : /*------------------------------------------------------------------*/
2367 :
2368 0 : INT get_frontend_index() {
2369 0 : return frontend_index;
2370 : }
2371 :
2372 : /*------------------------------------------------------------------*/
2373 :
2374 : void (*mfe_error_dispatcher)(const char *) = NULL;
2375 :
2376 : #define MFE_ERROR_SIZE 10
2377 : char mfe_error_str[MFE_ERROR_SIZE][256];
2378 : int mfe_error_r, mfe_error_w;
2379 : MUTEX_T *mfe_mutex = NULL;
2380 :
2381 0 : void mfe_set_error(void (*dispatcher)(const char *)) {
2382 : int status;
2383 :
2384 0 : mfe_error_dispatcher = dispatcher;
2385 0 : mfe_error_r = mfe_error_w = 0;
2386 0 : memset(mfe_error_str, 0, sizeof(mfe_error_str));
2387 :
2388 0 : if (mfe_mutex == NULL) {
2389 0 : status = ss_mutex_create(&mfe_mutex, FALSE);
2390 0 : if (status != SS_SUCCESS && status != SS_CREATED)
2391 0 : cm_msg(MERROR, "mfe_set_error", "Cannot create mutex\n");
2392 : }
2393 0 : }
2394 :
2395 0 : void mfe_error(const char *error)
2396 : /* central error dispatcher routine which can be called by any device
2397 : or class driver */
2398 : {
2399 0 : if (mfe_mutex == NULL) {
2400 0 : int status = ss_mutex_create(&mfe_mutex, FALSE);
2401 0 : if (status != SS_SUCCESS && status != SS_CREATED) {
2402 0 : cm_msg(MERROR, "mfe_error", "Cannot create mutex\n");
2403 0 : return;
2404 : }
2405 : }
2406 :
2407 : /* put error into FIFO */
2408 0 : ss_mutex_wait_for(mfe_mutex, 1000);
2409 0 : mstrlcpy(mfe_error_str[mfe_error_w], error, 256);
2410 0 : mfe_error_w = (mfe_error_w + 1) % MFE_ERROR_SIZE;
2411 0 : ss_mutex_release(mfe_mutex);
2412 : }
2413 :
2414 0 : void mfe_error_check(void) {
2415 0 : if (mfe_mutex != NULL) {
2416 0 : ss_mutex_wait_for(mfe_mutex, 1000);
2417 0 : if (mfe_error_w != mfe_error_r) {
2418 0 : if (mfe_error_dispatcher != NULL)
2419 0 : mfe_error_dispatcher(mfe_error_str[mfe_error_r]);
2420 0 : mfe_error_r = (mfe_error_r + 1) % MFE_ERROR_SIZE;
2421 : }
2422 0 : ss_mutex_release(mfe_mutex);
2423 : }
2424 0 : }
2425 :
2426 : /*------------------------------------------------------------------*/
2427 :
2428 : static int _argc = 0;
2429 : static char **_argv = NULL;
2430 :
2431 0 : void mfe_get_args(int *argc, char ***argv) {
2432 0 : *argc = _argc;
2433 0 : *argv = _argv;
2434 0 : }
2435 :
2436 : /*------------------------------------------------------------------*/
2437 :
2438 : #ifdef OS_VXWORKS
2439 : int mfe(char *ahost_name, char *aexp_name, BOOL adebug)
2440 : #else
2441 0 : int main(int argc, char *argv[])
2442 : #endif
2443 : {
2444 : INT status, i, j;
2445 : INT daemon_flag;
2446 :
2447 0 : host_name[0] = 0;
2448 0 : exp_name[0] = 0;
2449 0 : daemon_flag = 0;
2450 :
2451 0 : setbuf(stdout, 0);
2452 0 : setbuf(stderr, 0);
2453 :
2454 : #ifdef SIGPIPE
2455 0 : signal(SIGPIPE, SIG_IGN);
2456 : #endif
2457 :
2458 : #ifdef OS_VXWORKS
2459 : if (ahost_name)
2460 : strcpy(host_name, ahost_name);
2461 : if (aexp_name)
2462 : strcpy(exp_name, aexp_name);
2463 : mfe_debug = adebug;
2464 : #else
2465 :
2466 : /* get default from environment */
2467 0 : cm_get_environment(host_name, sizeof(host_name), exp_name, sizeof(exp_name));
2468 :
2469 : /* store arguments for user use */
2470 0 : _argc = argc;
2471 0 : _argv = (char **) malloc(sizeof(char *) * argc);
2472 0 : for (i = 0; i < argc; i++) {
2473 0 : _argv[i] = argv[i];
2474 : }
2475 :
2476 : /* parse command line parameters */
2477 0 : for (i = 1; i < argc; i++) {
2478 0 : if (argv[i][0] == '-' && argv[i][1] == 'd')
2479 0 : mfe_debug = TRUE;
2480 0 : else if (argv[i][0] == '-' && argv[i][1] == 'D')
2481 0 : daemon_flag = 1;
2482 0 : else if (argv[i][0] == '-' && argv[i][1] == 'O')
2483 0 : daemon_flag = 2;
2484 0 : else if (argv[i][1] == 'v') {
2485 0 : if (i < argc - 1 && atoi(argv[i + 1]) > 0)
2486 0 : verbosity_level = atoi(argv[++i]);
2487 : else
2488 0 : verbosity_level = 1;
2489 0 : } else if (argv[i][0] == '-') {
2490 0 : if (i + 1 >= argc || argv[i + 1][0] == '-')
2491 0 : goto usage;
2492 0 : if (argv[i][1] == 'e')
2493 0 : strcpy(exp_name, argv[++i]);
2494 0 : else if (argv[i][1] == 'h')
2495 0 : strcpy(host_name, argv[++i]);
2496 0 : else if (argv[i][1] == 'i')
2497 0 : frontend_index = atoi(argv[++i]);
2498 0 : else if (argv[i][1] == '-') {
2499 0 : usage:
2500 0 : printf("usage: frontend [-h Hostname] [-e Experiment] [-d] [-D] [-O] [-v <n>] [-i <n>]\n");
2501 0 : printf(" [-d] Used to debug the frontend\n");
2502 0 : printf(" [-D] Become a daemon\n");
2503 0 : printf(" [-O] Become a daemon but keep stdout\n");
2504 0 : printf(" [-v <n>] Set verbosity level\n");
2505 0 : printf(" [-i <n>] Set frontend index (used for event building)\n");
2506 0 : return 0;
2507 : }
2508 : }
2509 : }
2510 : #endif
2511 :
2512 : #ifdef OS_VXWORKS
2513 : /* override event_buffer_size in case of VxWorks
2514 : take remaining free memory and use 20% of it for rb_ */
2515 : event_buffer_size = 2 * 10 * (max_event_size + sizeof(EVENT_HEADER) + sizeof(INT));
2516 : if (event_buffer_size > memFindMax()) {
2517 : cm_msg(MERROR, "mainFE", "Not enough mem space for event size");
2518 : return 0;
2519 : }
2520 : /* takes overall 20% of the available memory resource for rb_() */
2521 : event_buffer_size = 0.2 * memFindMax();
2522 : #endif
2523 :
2524 : /* retrieve frontend index from environment if defined */
2525 0 : if (getenv("MIDAS_FRONTEND_INDEX"))
2526 0 : frontend_index = atoi(getenv("MIDAS_FRONTEND_INDEX"));
2527 :
2528 : /* add frontend index to frontend name if present */
2529 0 : strcpy(full_frontend_name, frontend_name);
2530 0 : if (frontend_index >= 0)
2531 0 : sprintf(full_frontend_name + strlen(full_frontend_name), "%02d", frontend_index);
2532 :
2533 0 : if (daemon_flag) {
2534 0 : printf("\nBecoming a daemon...\n");
2535 0 : ss_daemon_init(daemon_flag == 2);
2536 : }
2537 :
2538 : /* set default rate period */
2539 0 : set_rate_period(1000);
2540 :
2541 : /* now connect to server */
2542 0 : if (host_name[0]) {
2543 0 : if (exp_name[0])
2544 0 : printf("Connect to experiment %s on host %s...\n", exp_name, host_name);
2545 : else
2546 0 : printf("Connect to experiment on host %s...\n", host_name);
2547 0 : } else if (exp_name[0])
2548 0 : printf("Connect to experiment %s...\n", exp_name);
2549 : else
2550 0 : printf("Connect to experiment...\n");
2551 :
2552 0 : status = cm_connect_experiment1(host_name, exp_name, full_frontend_name,
2553 : NULL, DEFAULT_ODB_SIZE, DEFAULT_FE_TIMEOUT);
2554 0 : if (status != CM_SUCCESS) {
2555 0 : cm_msg(MERROR, "mfe_main", "Cannot connect to experiment \'%s\' on host \'%s\', status %d", exp_name, host_name,
2556 : status);
2557 : /* let user read message before window might close */
2558 0 : ss_sleep(5000);
2559 0 : return 1;
2560 : }
2561 :
2562 0 : printf("OK\n");
2563 :
2564 : /* allocate buffer space */
2565 0 : event_buffer = malloc(max_event_size);
2566 0 : if (event_buffer == NULL) {
2567 0 : cm_msg(MERROR, "mfe_main", "mfe: Cannot allocate event buffer of max_event_size %d\n", max_event_size);
2568 0 : return 1;
2569 : }
2570 :
2571 0 : cm_msg(MINFO, "mfe_main", "Frontend \"%s\" started", full_frontend_name);
2572 :
2573 : /* remove any dead frontend */
2574 0 : cm_cleanup(full_frontend_name, FALSE);
2575 :
2576 : /* shutdown previous frontend */
2577 0 : status = cm_shutdown(full_frontend_name, TRUE);
2578 0 : if (status == CM_SUCCESS) {
2579 0 : cm_msg(MINFO, "mfe_main", "Previous frontend \"%s\" stopped", full_frontend_name);
2580 :
2581 : /* let user read message */
2582 0 : ss_sleep(3000);
2583 :
2584 : // set full name again, because previously we could have a "1" added to our name
2585 0 : cm_get_experiment_database(&hDB, &hClient);
2586 0 : cm_set_client_info(hDB, &hClient, host_name, full_frontend_name, rpc_get_hw_type(),
2587 : nullptr, DEFAULT_FE_TIMEOUT);
2588 : }
2589 :
2590 : /* register transition callbacks */
2591 0 : if (cm_register_transition(TR_START, tr_start, 500) != CM_SUCCESS ||
2592 0 : cm_register_transition(TR_STOP, tr_stop, 500) != CM_SUCCESS ||
2593 0 : cm_register_transition(TR_PAUSE, tr_pause, 500) != CM_SUCCESS ||
2594 0 : cm_register_transition(TR_RESUME, tr_resume, 500) != CM_SUCCESS) {
2595 0 : printf("Failed to start local RPC server");
2596 0 : cm_disconnect_experiment();
2597 :
2598 : /* let user read message before window might close */
2599 0 : ss_sleep(5000);
2600 0 : return 1;
2601 : }
2602 0 : cm_set_client_run_state(run_state);
2603 :
2604 0 : cm_get_experiment_database(&hDB, &hClient);
2605 : /* set time from server */
2606 : #ifdef OS_VXWORKS
2607 : cm_synchronize(NULL);
2608 : #endif
2609 :
2610 : /* turn off watchdog if in debug mode */
2611 0 : if (mfe_debug)
2612 0 : cm_set_watchdog_params(FALSE, 0);
2613 :
2614 0 : cm_start_watchdog_thread();
2615 :
2616 : /* reqister equipment in ODB */
2617 0 : if (register_equipment() != SUCCESS) {
2618 0 : printf("\n");
2619 0 : cm_disconnect_experiment();
2620 :
2621 : /* let user read message before window might close */
2622 0 : ss_sleep(5000);
2623 0 : return 1;
2624 : }
2625 :
2626 : /* call user init function */
2627 0 : printf("Init hardware...\n");
2628 0 : if (frontend_init() != SUCCESS) {
2629 0 : printf("\n");
2630 0 : cm_disconnect_experiment();
2631 0 : return 1;
2632 : }
2633 :
2634 : /* inform user of settings */
2635 0 : printf("Frontend name : %s\n", full_frontend_name);
2636 0 : printf("Event buffer size : %d\n", event_buffer_size);
2637 0 : printf("User max event size : %d\n", max_event_size);
2638 0 : if (max_event_size_frag > 0)
2639 0 : printf("User max frag. size : %d\n", max_event_size_frag);
2640 0 : printf("# of events per buffer : %d\n\n", event_buffer_size / max_event_size);
2641 :
2642 : /* check event and buffer sizes */
2643 0 : if (event_buffer_size < 2 * max_event_size) {
2644 0 : cm_msg(MERROR, "mfe_main", "event_buffer_size %d too small for max. event size %d\n", event_buffer_size,
2645 : max_event_size);
2646 0 : ss_sleep(5000);
2647 0 : return 1;
2648 : }
2649 :
2650 0 : int max_allowed_buffer_size = 1024 * 1024 * 1024; // 1 GB
2651 :
2652 : // Check buffer size. If this value is too large, the end-of-run
2653 : // might take quite long to drain a full buffer
2654 0 : if (event_buffer_size > max_allowed_buffer_size) {
2655 0 : cm_msg(MERROR, "mfe_main", "event_buffer_size %d MB exceeds maximum allowed size of %d MB\n",
2656 : event_buffer_size / 1024 / 1024, max_allowed_buffer_size / 1024 / 1024);
2657 0 : ss_sleep(5000);
2658 0 : return 1;
2659 : }
2660 :
2661 : /* initialize all equipment */
2662 0 : status = initialize_equipment();
2663 0 : if (status != SUCCESS) {
2664 0 : printf("\n");
2665 0 : cm_msg(MERROR, "mfe_main", "Error status %d received from initialize_equipment, aborting", status);
2666 0 : cm_disconnect_experiment();
2667 :
2668 : /* let user read message before window might close */
2669 0 : ss_sleep(5000);
2670 0 : return 1;
2671 : }
2672 :
2673 0 : printf("OK\n");
2674 :
2675 : /* switch on interrupts or readout thread if running */
2676 0 : if (run_state == STATE_RUNNING)
2677 0 : readout_enable(TRUE);
2678 :
2679 0 : if (!mfe_debug) {
2680 : /* initialize ss_getchar */
2681 0 : ss_getchar(0);
2682 :
2683 : /* initialize screen display */
2684 0 : if (display_period) {
2685 0 : ss_sleep(500);
2686 0 : display(TRUE);
2687 : }
2688 : }
2689 :
2690 : /* set own message print function */
2691 0 : if (display_period && !mfe_debug)
2692 0 : cm_set_msg_print(MT_ALL, MT_ALL, message_print);
2693 :
2694 : /* call main scheduler loop */
2695 0 : status = scheduler();
2696 :
2697 : /* reset terminal */
2698 0 : ss_getchar(TRUE);
2699 :
2700 0 : if (display_period && !mfe_debug) {
2701 0 : ss_clear_screen();
2702 0 : ss_printf(0, 0, "");
2703 : }
2704 :
2705 : /* stop readout thread */
2706 0 : stop_readout_threads();
2707 0 : rb_set_nonblocking();
2708 0 : while (is_readout_thread_active()) {
2709 0 : flush_user_events();
2710 0 : ss_sleep(100);
2711 : }
2712 :
2713 : /* switch off interrupts and detach */
2714 0 : if (interrupt_eq) {
2715 0 : interrupt_configure(CMD_INTERRUPT_DISABLE, 0, 0);
2716 0 : interrupt_configure(CMD_INTERRUPT_DETACH, 0, 0);
2717 : }
2718 :
2719 : /* call user exit function */
2720 0 : frontend_exit();
2721 :
2722 : /* close slow control drivers */
2723 0 : for (i = 0; equipment[i].name[0]; i++)
2724 0 : if ((equipment[i].info.eq_type & EQ_SLOW) && (equipment[i].status == FE_SUCCESS || equipment[i].status == FE_PARTIALLY_DISABLED)) {
2725 :
2726 0 : for (j = 0; equipment[i].driver[j].name[0]; j++)
2727 0 : if (equipment[i].driver[j].flags & DF_MULTITHREAD)
2728 0 : break;
2729 :
2730 : /* stop all threads if multithreaded */
2731 0 : if (equipment[i].driver[j].name[0] && (equipment[i].status == FE_SUCCESS || equipment[i].status == FE_PARTIALLY_DISABLED))
2732 0 : equipment[i].cd(CMD_STOP, &equipment[i]); /* stop all threads */
2733 : }
2734 0 : for (i = 0; equipment[i].name[0]; i++)
2735 0 : if ((equipment[i].info.eq_type & EQ_SLOW) && (equipment[i].status == FE_SUCCESS || equipment[i].status == FE_PARTIALLY_DISABLED))
2736 0 : equipment[i].cd(CMD_EXIT, &equipment[i]); /* close physical connections */
2737 :
2738 0 : free(n_events);
2739 :
2740 0 : if (cm_is_ctrlc_pressed()) {
2741 0 : cm_msg(MINFO, "mfe_main", "Frontend \"%s\" stopped via Ctrl-C", full_frontend_name);
2742 0 : } else if (status == RPC_SHUTDOWN) {
2743 0 : cm_msg(MINFO, "mfe_main", "Frontend \"%s\" stopped via network command", full_frontend_name);
2744 0 : } else if (status != RPC_SHUTDOWN) {
2745 0 : cm_msg(MINFO, "mfe_main", "Frontend \"%s\" stopped because of network error", full_frontend_name);
2746 : } else
2747 0 : cm_msg(MINFO, "mfe_main", "Frontend \"%s\" stopped", full_frontend_name);
2748 :
2749 : /* close network connection to server */
2750 0 : cm_disconnect_experiment();
2751 :
2752 0 : return 0;
2753 : }
2754 :
2755 : #ifdef LINK_TEST
2756 : char *frontend_name;
2757 : char *frontend_file_name;
2758 : BOOL frontend_call_loop;
2759 : int event_buffer_size;
2760 : int max_event_size;
2761 : int max_event_size_frag;
2762 : int display_period;
2763 : EQUIPMENT equipment[1];
2764 : int frontend_init() { return 0; };
2765 : int frontend_exit() { return 0; };
2766 : int begin_of_run(int runno, char *errstr) { return 0; };
2767 : int end_of_run(int runno, char *errstr) { return 0; };
2768 : int pause_run(int runno, char *errstr) { return 0; };
2769 : int resume_run(int runno, char *errstr) { return 0; };
2770 : int interrupt_configure(INT cmd, INT source, POINTER_T adr) { return 0; };
2771 : int frontend_loop() { return 0; };
2772 : int poll_event(INT source, INT count, BOOL test) { return 0; };
2773 : #endif
2774 : /* emacs
2775 : * Local Variables:
2776 : * tab-width: 8
2777 : * c-basic-offset: 3
2778 : * indent-tabs-mode: nil
2779 : * End:
2780 : */
|