Line data Source code
1 : /********************************************************************\
2 :
3 : Name: mevb.c
4 : Created by: Pierre-Andre Amaudruz
5 :
6 : Contents: Main Event builder task.
7 :
8 : $Id$
9 :
10 : \********************************************************************/
11 :
12 : /**dox***************************************************************/
13 : /* @file mevb.c
14 : The Event builder main file
15 : */
16 :
17 : #include <stdio.h>
18 : #include "midas.h"
19 : #include "mevb.h"
20 : #include "msystem.h"
21 : #include "mdsupport.h"
22 : #include "mfe.h"
23 :
24 : #define SERVER_CACHE_SIZE 100000 /* event cache before buffer */
25 :
26 : #define ODB_UPDATE_TIME 1000 /* 1 seconds for ODB update */
27 :
28 : #define DEFAULT_FE_TIMEOUT 60000 /* 60 seconds for watchdog timeout */
29 :
30 : //RP#define TIMEOUT_ABORT 10 /* seconds waiting for data before aborting run */
31 : //#define TIMEOUT_ABORT 120 /* seconds waiting for data before aborting run */
32 : #define TIMEOUT_ABORT 300 /* seconds waiting for data before aborting run */
33 :
34 : EBUILDER_SETTINGS ebset;
35 : EBUILDER_CHANNEL ebch[MAX_CHANNELS];
36 :
37 : INT run_state; /* STATE_RUNNING, STATE_STOPPED, STATE_PAUSED */
38 : INT run_number;
39 : DWORD last_time;
40 : DWORD actual_time; /* current time in seconds since 1970 */
41 : DWORD actual_millitime; /* current time in milliseconds */
42 :
43 : char host_name[HOST_NAME_LENGTH];
44 : char expt_name[NAME_LENGTH];
45 : char buffer_name[NAME_LENGTH];
46 : INT nfragment;
47 : char *dest_event;
48 : HNDLE hDB, hKey, hStatKey, hSubkey, hEqKey, hESetKey;
49 : BOOL debug = FALSE, debug1 = FALSE;
50 :
51 : BOOL wheel = FALSE;
52 : char bars[] = "|\\-/";
53 : int i_bar;
54 : BOOL abort_requested = FALSE, stop_requested = TRUE;
55 : DWORD stop_time = 0, request_stop_time = 0;
56 :
57 : INT(*meb_fragment_add) (char *, char *, INT *);
58 : INT handFlush(void);
59 : INT source_booking(void);
60 : INT source_unbooking(void);
61 : INT close_buffers(void);
62 : INT source_scan(INT fmt, EQUIPMENT_INFO * eq_info);
63 : INT eb_mfragment_add(char *pdest, char *psrce, INT * size);
64 : INT eb_yfragment_add(char *pdest, char *psrce, INT * size);
65 :
66 : INT eb_begin_of_run(INT, char *, char *);
67 : INT eb_end_of_run(INT, char *);
68 : INT eb_user(INT, BOOL mismatch, EBUILDER_CHANNEL *, EVENT_HEADER *, void *, INT *);
69 : INT load_fragment(void);
70 : INT scan_fragment(void);
71 : //char *frontend_name;
72 : //char *frontend_file_name;
73 : BOOL frontend_call_loop;
74 :
75 : extern INT max_event_size;
76 : extern INT max_event_size_frag;
77 : extern INT event_buffer_size;
78 : extern INT display_period;
79 : extern INT ebuilder_init(void);
80 : extern INT ebuilder_exit(void);
81 : extern INT ebuilder_loop(void);
82 :
83 : extern EQUIPMENT equipment[];
84 : extern INT md_event_swap(INT fmt, void * pevt);
85 :
86 : #define EQUIPMENT_STATISTICS_STR "\
87 : Events sent = DOUBLE : 0\n\
88 : Events per sec. = DOUBLE : 0\n\
89 : kBytes per sec. = DOUBLE : 0\n\
90 : "
91 : static int waiting_for_stop = FALSE;
92 :
93 : /********************************************************************/
94 0 : INT register_equipment(void)
95 : {
96 0 : INT index, size, status;
97 0 : char str[256];
98 0 : EQUIPMENT_INFO *eq_info;
99 0 : EQUIPMENT_STATS *eq_stats;
100 0 : HNDLE hKey;
101 :
102 : /* get current ODB run state */
103 0 : size = sizeof(run_state);
104 0 : run_state = STATE_STOPPED;
105 0 : db_get_value(hDB, 0, "/Runinfo/State", &run_state, &size, TID_INT, TRUE);
106 0 : size = sizeof(run_number);
107 0 : run_number = 1;
108 0 : status = db_get_value(hDB, 0, "/Runinfo/Run number", &run_number, &size, TID_INT, TRUE);
109 0 : assert(status == SUCCESS);
110 :
111 : /* scan EQUIPMENT table from mevb.C */
112 0 : for (index = 0; equipment[index].name[0]; index++) {
113 0 : eq_info = &equipment[index].info;
114 0 : eq_stats = &equipment[index].stats;
115 :
116 0 : if (eq_info->event_id == 0) {
117 0 : printf("\nEvent ID 0 for %s not allowed\n", equipment[index].name);
118 0 : cm_disconnect_experiment();
119 0 : ss_sleep(5000);
120 0 : exit(0);
121 : }
122 :
123 : /* init status */
124 0 : equipment[index].status = EB_SUCCESS;
125 :
126 0 : sprintf(str, "/Equipment/%s/Common", equipment[index].name);
127 :
128 : /* get last event limit from ODB */
129 0 : if (eq_info->eq_type != EQ_SLOW) {
130 0 : db_find_key(hDB, 0, str, &hKey);
131 0 : size = sizeof(double);
132 0 : if (hKey)
133 0 : db_get_value(hDB, hKey, "Event limit", &eq_info->event_limit, &size, TID_DOUBLE, TRUE);
134 : }
135 :
136 : /* Create common subtree */
137 0 : status = db_check_record(hDB, 0, str, EQUIPMENT_COMMON_STR, TRUE);
138 0 : if (status != DB_SUCCESS) {
139 0 : printf("Cannot check equipment record, status = %d\n", status);
140 0 : ss_sleep(3000);
141 : }
142 0 : db_find_key(hDB, 0, str, &hKey);
143 :
144 0 : if (equal_ustring(eq_info->format, "FIXED"))
145 0 : equipment[index].format = FORMAT_FIXED;
146 : else /* default format is MIDAS */
147 0 : equipment[index].format = FORMAT_MIDAS;
148 :
149 0 : gethostname(eq_info->frontend_host, sizeof(eq_info->frontend_host));
150 0 : strcpy(eq_info->frontend_name, frontend_name);
151 0 : strcpy(eq_info->frontend_file_name, frontend_file_name);
152 :
153 : /* set record from equipment[] table in frontend.c */
154 0 : db_set_record(hDB, hKey, eq_info, sizeof(EQUIPMENT_INFO), 0);
155 :
156 : /* get record once at the start equipment info */
157 0 : size = sizeof(EQUIPMENT_INFO);
158 0 : db_get_record(hDB, hKey, eq_info, &size, 0);
159 :
160 : /*---- Create just the key , leave it empty ---------------------------------*/
161 0 : sprintf(str, "/Equipment/%s/Variables", equipment[index].name);
162 0 : db_create_key(hDB, 0, str, TID_KEY);
163 0 : db_find_key(hDB, 0, str, &hKey);
164 0 : equipment[index].hkey_variables = hKey;
165 :
166 : /*---- Create and initialize statistics tree -------------------*/
167 0 : sprintf(str, "/Equipment/%s/Statistics", equipment[index].name);
168 :
169 0 : status = db_check_record(hDB, 0, str, EQUIPMENT_STATISTICS_STR, TRUE);
170 0 : if (status != DB_SUCCESS) {
171 0 : printf("Cannot create/check statistics record, error %d\n", status);
172 0 : ss_sleep(3000);
173 : }
174 :
175 0 : status = db_find_key(hDB, 0, str, &hKey);
176 0 : if (status != DB_SUCCESS) {
177 0 : printf("Cannot find statistics record, error %d\n", status);
178 0 : ss_sleep(3000);
179 : }
180 :
181 0 : eq_stats->events_sent = 0;
182 0 : eq_stats->events_per_sec = 0;
183 0 : eq_stats->kbytes_per_sec = 0;
184 :
185 : /* open hot link to statistics tree */
186 0 : status = db_open_record(hDB, hKey, eq_stats, sizeof(EQUIPMENT_STATS), MODE_WRITE, NULL, NULL);
187 0 : if (status == DB_NO_ACCESS) {
188 : /* record is probably still in exclusive access by dead FE, so reset it */
189 0 : status = db_set_mode(hDB, hKey, MODE_READ | MODE_WRITE | MODE_DELETE, TRUE);
190 0 : if (status != DB_SUCCESS)
191 0 : cm_msg(MERROR, "register_equipment", "Cannot change access mode for record \'%s\', error %d", str, status);
192 : else
193 0 : cm_msg(MINFO, "register_equipment", "Recovered access mode for record \'%s\'", str);
194 0 : status = db_open_record(hDB, hKey, eq_stats, sizeof(EQUIPMENT_STATS), MODE_WRITE, NULL, NULL);
195 : }
196 0 : if (status != DB_SUCCESS) {
197 0 : cm_msg(MERROR, "register_equipment", "Cannot open statistics record, error %d. Probably other FE is using it", status);
198 0 : ss_sleep(3000);
199 : }
200 :
201 : /*---- open event buffer ---------------------------------------*/
202 0 : if (eq_info->buffer[0]) {
203 0 : status = bm_open_buffer(eq_info->buffer, DEFAULT_BUFFER_SIZE, &equipment[index].buffer_handle);
204 0 : if (status != BM_SUCCESS && status != BM_CREATED) {
205 0 : cm_msg(MERROR, "register_equipment",
206 : "Cannot open event buffer. Try to reduce EVENT_BUFFER_SIZE in midas.h \
207 : and rebuild the system.");
208 0 : return 0;
209 : }
210 :
211 0 : if (1)
212 : {
213 0 : int level = 0;
214 0 : bm_get_buffer_level(equipment[index].buffer_handle, &level);
215 0 : printf("Buffer %s, level %d, info: \n", eq_info->buffer, level);
216 : }
217 :
218 : /* set the default buffer cache size */
219 0 : bm_set_cache_size(equipment[index].buffer_handle, 0, SERVER_CACHE_SIZE);
220 : } else {
221 0 : cm_msg(MERROR, "register_equipment", "Destination buffer must be present");
222 0 : ss_sleep(3000);
223 0 : exit(0);
224 : }
225 : }
226 : return SUCCESS;
227 : }
228 :
229 : /********************************************************************/
230 0 : INT load_fragment(void)
231 : {
232 0 : INT i, size, type;
233 0 : HNDLE hEqKey, hSubkey;
234 0 : EQUIPMENT_INFO *eq_info;
235 0 : KEY key;
236 0 : char buffer[NAME_LENGTH];
237 0 : char format[8];
238 :
239 : /* Get equipment pointer, only one eqp for now */
240 0 : eq_info = &equipment[0].info;
241 :
242 : /* Scan Equipment/Common listing */
243 0 : if (db_find_key(hDB, 0, "Equipment", &hEqKey) != DB_SUCCESS) {
244 0 : cm_msg(MINFO, "load_fragment", "Equipment listing not found");
245 0 : return EB_ERROR;
246 : }
247 :
248 : /* Scan the Equipment list for fragment info collection */
249 0 : for (i = 0, nfragment = 0;; i++) {
250 0 : db_enum_key(hDB, hEqKey, i, &hSubkey);
251 0 : if (!hSubkey)
252 : break;
253 0 : db_get_key(hDB, hSubkey, &key);
254 0 : if (key.type == TID_KEY) {
255 : /* Equipment name */
256 0 : if (debug)
257 0 : printf("Equipment name:%s\n", key.name);
258 : /* Check if equipment is EQ_EB */
259 0 : size = sizeof(INT);
260 0 : db_get_value(hDB, hSubkey, "common/type", &type, &size, TID_INT, 0);
261 0 : size = sizeof(buffer);
262 0 : db_get_value(hDB, hSubkey, "common/Buffer", buffer, &size, TID_STRING, 0);
263 0 : size = sizeof(format);
264 0 : db_get_value(hDB, hSubkey, "common/Format", format, &size, TID_STRING, 0);
265 : /* Check if equipment match EB requirements */
266 : // if (debug)
267 : // printf("Equipment name: %s, Type: 0x%x, Buffer: %s, Format: %s\n",
268 : // key.name, type, buffer, format);
269 0 : if ((type & EQ_EB)
270 0 : && (strncmp(buffer, buffer_name, strlen(buffer_name)) == 0)
271 0 : && (strncmp(format, eq_info->format, strlen(format)) == 0)) {
272 : /* match=> fill internal eb structure */
273 0 : strcpy(ebch[nfragment].format, format);
274 0 : strcpy(ebch[nfragment].buffer, buffer);
275 0 : size = sizeof(WORD);
276 0 : db_get_value(hDB, hSubkey, "common/Trigger Mask", &ebch[nfragment].trigger_mask, &size, TID_WORD,
277 : 0);
278 0 : size = sizeof(WORD);
279 0 : db_get_value(hDB, hSubkey, "common/Event ID", &ebch[nfragment].event_id, &size, TID_WORD, 0);
280 0 : printf("Fragment %d: Equipment name %s, evID %d, buffer %s\n",nfragment,key.name, ebch[nfragment].event_id, buffer);
281 : // printf("Fragment %d: Equipment name %s, evID %d, buffer %s\n",key.name, ebch[nfragment].event_id, buffer);
282 0 : nfragment++;
283 : }
284 : }
285 : }
286 :
287 0 : if (nfragment > 1)
288 0 : printf("Found %d fragments for event building\n", nfragment);
289 : else
290 0 : printf("Found one fragment for event building\n");
291 :
292 : /* Point to the Ebuilder settings */
293 : /* Set fragment_add function based on the format */
294 0 : if (equipment[0].format == FORMAT_MIDAS)
295 0 : meb_fragment_add = eb_mfragment_add;
296 : else {
297 0 : cm_msg(MERROR, "load_fragment", "Unknown data format \"%s\"", format);
298 0 : return EB_ERROR;
299 : }
300 :
301 : /* allocate destination event buffer */
302 0 : dest_event = (char *) malloc(nfragment * (max_event_size + sizeof(EVENT_HEADER)));
303 0 : memset(dest_event, 0, nfragment * (max_event_size + sizeof(EVENT_HEADER)));
304 0 : if (dest_event == NULL) {
305 : cm_msg(MERROR, "load_fragment", "%s: Not enough memory for event buffer", frontend_name);
306 : return EB_ERROR;
307 : }
308 : return EB_SUCCESS;
309 : }
310 :
311 : /********************************************************************/
312 0 : INT scan_fragment(void)
313 : {
314 0 : INT fragn, status;
315 0 : EQUIPMENT *eq;
316 0 : EQUIPMENT_INFO *eq_info;
317 0 : INT ch;
318 :
319 : /* Get equipment pointer, only one eqp for now */
320 0 : eq_info = &equipment[0].info;
321 0 : status = 0;
322 0 : eq = NULL;
323 :
324 : /* Main event loop */
325 0 : do {
326 0 : switch (run_state) {
327 0 : case STATE_STOPPED:
328 0 : case STATE_PAUSED:
329 : /* skip the source scan and yield */
330 0 : status = cm_yield(500);
331 0 : if (wheel) {
332 0 : printf("...%c Snoring\r", bars[i_bar++ % 4]);
333 0 : fflush(stdout);
334 : }
335 : break;
336 0 : case STATE_RUNNING:
337 0 : status = source_scan(equipment[0].format, eq_info);
338 0 : switch (status) {
339 0 : case BM_ASYNC_RETURN: // No event found for now, Check for timeout
340 :
341 0 : if (1) {
342 : /* advanced checking for timeouts */
343 :
344 0 : time_t now = time(NULL);
345 0 : int empty = 1;
346 0 : int badfrag = -1;
347 :
348 : /* check if we recieved any data */
349 0 : for (fragn = 0; fragn < nfragment; fragn++)
350 0 : if (ebset.received[fragn])
351 0 : empty = 0;
352 :
353 : /* only look for timeout if there is received data from any fragment */
354 0 : if (!empty)
355 0 : for (fragn = 0; fragn < nfragment; fragn++) {
356 : //if (debug)
357 : // printf("frag %d, timeout %d, threshold %d, received %d, time %d\n", fragn, ebch[fragn].timeout, TIMEOUT, ebset.received[fragn], now - ebch[fragn].time);
358 0 : if (ebch[fragn].time && ebch[fragn].timeout)
359 0 : if (now - ebch[fragn].time > TIMEOUT_ABORT) {
360 : //cm_msg(MERROR, "scan_fragment", "frag %d, timeout %d, %d sec", fragn, ebch[fragn].timeout, now - ebch[fragn].time);
361 0 : badfrag = fragn;
362 : }
363 : }
364 :
365 0 : if (badfrag >= 0) {
366 : //status = SS_ABORT;
367 0 : if (!waiting_for_stop && !stop_requested) {
368 0 : cm_msg(MERROR, "scan_fragment", "timeout waiting for fragment %d, restarting run", badfrag);
369 0 : cm_msg(MINFO, "scan_fragment", "spawning mtransition");
370 0 : ss_system("mtransition STOP IF \"/Logger/Auto restart\" DELAY \"/Logger/Auto restart delay\" START &");
371 0 : waiting_for_stop = TRUE;
372 : }
373 : break;
374 : }
375 : }
376 :
377 0 : for (fragn = 0; fragn < nfragment; fragn++) {
378 0 : if (ebch[fragn].timeout > TIMEOUT) { /* Timeout */
379 0 : if (stop_requested) { /* Stop */
380 0 : if (debug)
381 0 : printf("Timeout waiting for fragment %d while stopping the run\n", fragn);
382 0 : status = close_buffers();
383 0 : break;
384 : } else {
385 : /* No stop requested but timeout, allow a yield to not
386 : eat all the CPU */
387 0 : status = cm_yield(10);
388 0 : if (wheel) {
389 0 : printf("...%c Timing on %1.0lf\r", bars[i_bar++ % 4], eq->stats.events_sent);
390 0 : fflush(stdout);
391 : }
392 :
393 : }
394 : }
395 : //else { /* No timeout loop back */
396 : } // for loop over all fragments
397 : break;
398 0 : case EB_ERROR:
399 0 : case EB_USER_ERROR:
400 0 : abort_requested = TRUE;
401 0 : if (status == EB_USER_ERROR)
402 0 : cm_msg(MTALK, "scan_fragment", "%s: Error signaled by user code - stopping run...",
403 : frontend_name);
404 : else
405 0 : cm_msg(MTALK, "EBuilder", "%s: Event mismatch - Stopping run...", frontend_name);
406 :
407 0 : if (debug)
408 0 : printf("Stop requested on Error %d\n", status);
409 0 : close_buffers();
410 :
411 : #if 0
412 : if (!waiting_for_stop && !stop_requested) {
413 : cm_msg(MINFO, "scan_fragment", "spawning mtransition");
414 : //ss_system("mtransition STOP IF \"/Logger/Auto restart\" DELAY \"/Logger/Auto restart delay\" START &");
415 : ss_system("mtransition STOP &");
416 : waiting_for_stop = TRUE;
417 : }
418 : #endif
419 0 : for (ch=0; ch<5; ch++) {
420 0 : if (cm_transition(TR_STOP, 0, NULL, 0, TR_SYNC, 0) == CM_SUCCESS)
421 : break;
422 0 : cm_msg(MERROR, "scan_fragment", "%s: Stop Transition request failed, trying again!", frontend_name);
423 : }
424 : return status;
425 : break;
426 : case EB_SUCCESS:
427 : case EB_SKIP:
428 : // Normal path if event has been assembled
429 : // No yield in this case.
430 : break;
431 0 : default:
432 0 : cm_msg(MERROR, "scan_fragment", "unexpected return %d", status);
433 0 : status = SS_ABORT;
434 : } // switch scan_source
435 : break;
436 : }
437 : /* EB job done, update statistics if its time */
438 : /* Check if it's time to do statistics job */
439 0 : if ((actual_millitime = ss_millitime()) - last_time > 1000) {
440 : /* Force event to appear at the destination if Ebuilder is remote */
441 0 : rpc_flush_event();
442 : /* Force event ot appear at the destination if Ebuilder is local */
443 0 : bm_flush_cache(equipment[0].buffer_handle, BM_NO_WAIT);
444 :
445 0 : status = cm_yield(10);
446 :
447 0 : eq = &equipment[0];
448 0 : eq->stats.events_sent += eq->events_sent;
449 0 : eq->stats.events_per_sec = eq->events_sent / ((actual_millitime - last_time) / 1000.0);
450 0 : eq->stats.kbytes_per_sec = eq->bytes_sent / 1024.0 / ((actual_millitime - last_time) / 1000.0);
451 0 : eq->bytes_sent = 0;
452 0 : eq->events_sent = 0;
453 : /* update destination statistics */
454 0 : db_send_changed_records();
455 : /* Keep track of last ODB update */
456 0 : last_time = ss_millitime();
457 : }
458 :
459 0 : ch = 0;
460 0 : if (ss_kbhit()) {
461 0 : ch = ss_getchar(0);
462 0 : if (ch == -1)
463 0 : ch = getchar();
464 0 : if ((char) ch == '!')
465 : break;
466 : }
467 0 : } while (status != RPC_SHUTDOWN && status != SS_ABORT);
468 :
469 : return status;
470 : }
471 :
472 : /********************************************************************/
473 0 : INT eb_mfragment_add(char *pdest, char *psrce, INT * size)
474 : {
475 0 : BANK_HEADER *psbh, *pdbh;
476 0 : char *psdata, *pddata;
477 0 : INT bksize;
478 :
479 : /* Condition for new EVENT the data_size should be ZERO */
480 0 : *size = ((EVENT_HEADER *) pdest)->data_size;
481 :
482 : /* destination pointer */
483 0 : pddata = pdest + *size + sizeof(EVENT_HEADER);
484 :
485 0 : if (*size) {
486 : /* NOT the first fragment */
487 :
488 : /* Swap event source if necessary */
489 0 : psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
490 0 : bk_swap(psbh, FALSE);
491 :
492 : /* source pointer */
493 0 : psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
494 0 : psdata = (char *) (psbh + 1);
495 :
496 : /* copy all banks without the bank header */
497 0 : bksize = psbh->data_size;
498 :
499 : /* copy */
500 0 : memcpy(pddata, psdata, bksize);
501 :
502 : /* update event size */
503 0 : ((EVENT_HEADER *) pdest)->data_size += bksize;
504 :
505 : /* update bank size */
506 0 : pdbh = (BANK_HEADER *) (((EVENT_HEADER *) pdest) + 1);
507 0 : pdbh->data_size += bksize;
508 :
509 0 : *size = ((EVENT_HEADER *) pdest)->data_size;
510 : } else {
511 : /* First event without the event header but with the
512 : bank header as the size is zero */
513 0 : *size = ((EVENT_HEADER *) psrce)->data_size;
514 :
515 : /* Swap event if necessary */
516 0 : psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
517 0 : bk_swap(psbh, FALSE);
518 :
519 : /* copy first fragment */
520 0 : memcpy(pddata, psbh, *size);
521 :
522 : /* update destination event size */
523 0 : ((EVENT_HEADER *) pdest)->data_size = *size;
524 : }
525 0 : return CM_SUCCESS;
526 : }
527 :
528 : /*--------------------------------------------------------------------*/
529 0 : INT eb_yfragment_add(char *pdest, char *psrce, INT * size)
530 : {
531 : /* pdest : EVENT_HEADER pointer
532 : psrce : EVENT_HEADER pointer
533 : Keep pbkh for later incrementation
534 : */
535 0 : char *psdata, *pddata;
536 0 : DWORD *pslrl, *pdlrl;
537 0 : INT i4frgsize, i1frgsize;
538 :
539 : /* Condition for new EVENT the data_size should be ZERO */
540 0 : *size = ((EVENT_HEADER *) pdest)->data_size;
541 :
542 : /* destination pointer skip the header as it has been already
543 : composed and the usere may have modified it on purpose (Midas Control) */
544 0 : pddata = pdest + *size + sizeof(EVENT_HEADER);
545 :
546 : /* the Midas header is present for logger */
547 0 : if (*size) { /* already filled with a fragment */
548 :
549 : /* source pointer: number of DWORD (lrl included) */
550 0 : pslrl = (DWORD *) (((EVENT_HEADER *) psrce) + 1);
551 :
552 : /* Swap event if necessary */
553 0 : md_event_swap(FORMAT_MIDAS, pslrl);
554 :
555 : /* copy done in bytes, do not include LRL */
556 0 : psdata = (char *) (pslrl + 1);
557 :
558 : /* copy size in I*4 (lrl included, remove it) */
559 0 : i4frgsize = (*pslrl);
560 0 : i1frgsize = 4 * i4frgsize;
561 :
562 : /* append fragment */
563 0 : memcpy(pddata, psdata, i1frgsize);
564 :
565 : /* update Midas header event size */
566 0 : ((EVENT_HEADER *) pdest)->data_size += i1frgsize;
567 :
568 : /* update LRL size (I*4) */
569 0 : pdlrl = (DWORD *) (((EVENT_HEADER *) pdest) + 1);
570 0 : *pdlrl += i4frgsize;
571 :
572 : /* Return event size in bytes */
573 0 : *size = ((EVENT_HEADER *) pdest)->data_size;
574 : } else { /* new destination event */
575 : /* The composed event has already the MIDAS header.
576 : which may have been modified by the user in ebuser.c
577 : Will be stripped by the logger (YBOS).
578 : Copy the first full event ( no EVID suppression )
579 : First event (without the event header) */
580 :
581 : /* source pointer */
582 0 : pslrl = (DWORD *) (((EVENT_HEADER *) psrce) + 1);
583 :
584 : /* Swap event if necessary */
585 0 : md_event_swap(FORMAT_MIDAS, pslrl);
586 :
587 : /* size in byte from the source midas header */
588 0 : *size = ((EVENT_HEADER *) psrce)->data_size;
589 :
590 : /* copy first fragment */
591 0 : memcpy(pddata, (char *) pslrl, *size);
592 :
593 : /* update destination Midas header event size */
594 0 : ((EVENT_HEADER *) pdest)->data_size += *size;
595 :
596 : }
597 0 : return CM_SUCCESS;
598 : }
599 :
600 : /*--------------------------------------------------------------------*/
601 0 : INT tr_start(INT rn, char *error)
602 : {
603 0 : EBUILDER(ebuilder_str);
604 0 : INT status, size, i;
605 0 : char str[128];
606 0 : KEY key;
607 0 : HNDLE hKey, hEqkey, hEqFRkey;
608 0 : EQUIPMENT_INFO *eq_info;
609 :
610 0 : if (debug)
611 0 : printf("tr_start: run %d\n", rn);
612 :
613 0 : eq_info = &equipment[0].info;
614 :
615 : /* Get update eq_info from ODB */
616 0 : sprintf(str, "/Equipment/%s/Common", equipment[0].name);
617 0 : status = db_find_key(hDB, 0, str, &hKey);
618 0 : size = sizeof(EQUIPMENT_INFO);
619 0 : db_get_record(hDB, hKey, eq_info, &size, 0);
620 :
621 0 : ebset.nfragment = nfragment;
622 :
623 : /* reset serial numbers */
624 0 : for (i = 0; equipment[i].name[0]; i++) {
625 0 : equipment[i].serial_number = 1;
626 0 : equipment[i].subevent_number = 0;
627 0 : equipment[i].stats.events_sent = 0;
628 0 : equipment[i].odb_in = equipment[i].odb_out = 0;
629 : }
630 :
631 : /* Get / Set Settings */
632 0 : sprintf(str, "/Equipment/%s/Settings", equipment[0].name);
633 0 : if (db_find_key(hDB, 0, str, &hEqkey) != DB_SUCCESS) {
634 0 : status = db_create_record(hDB, 0, str, strcomb1(ebuilder_str).c_str());
635 : }
636 :
637 : /* Keep Key on Ebuilder/Settings */
638 0 : sprintf(str, "/Equipment/%s/Settings", equipment[0].name);
639 0 : if (db_find_key(hDB, 0, str, &hEqkey) != DB_SUCCESS) {
640 0 : cm_msg(MINFO, "tr_start", "/Equipment/%s/Settings not found", equipment[0].name);
641 : }
642 :
643 : /* Update or Create User_field */
644 0 : size = sizeof(ebset.user_field);
645 0 : status = db_get_value(hDB, hEqkey, "User Field", ebset.user_field, &size, TID_STRING, TRUE);
646 :
647 : /* Update or Create User_Build */
648 0 : size = sizeof(ebset.user_build);
649 0 : status = db_get_value(hDB, hEqkey, "User Build", &ebset.user_build, &size, TID_BOOL, TRUE);
650 :
651 : /* update ODB */
652 0 : size = sizeof(INT);
653 0 : status = db_set_value(hDB, hEqkey, "Number of Fragment", &ebset.nfragment, size, 1, TID_INT);
654 :
655 : /* Create or update the fragment request list */
656 0 : status = db_find_key(hDB, hEqkey, "Fragment Required", &hEqFRkey);
657 0 : status = db_get_key(hDB, hEqFRkey, &key);
658 0 : assert(status == DB_SUCCESS);
659 :
660 0 : if (key.num_values != ebset.nfragment) {
661 0 : cm_msg(MINFO, "tr_start", "Number of Fragment mismatch ODB:%d - CUR:%d", key.num_values, ebset.nfragment);
662 0 : free(ebset.preqfrag);
663 0 : size = ebset.nfragment * sizeof(BOOL);
664 0 : ebset.preqfrag = (BOOL*)malloc(size);
665 0 : for (i = 0; i < ebset.nfragment; i++)
666 0 : ebset.preqfrag[i] = TRUE;
667 0 : status =
668 0 : db_set_value(hDB, hEqkey, "Fragment Required", ebset.preqfrag, size, ebset.nfragment, TID_BOOL);
669 : } else { // Take from ODBedit
670 0 : size = key.total_size;
671 0 : free(ebset.preqfrag);
672 0 : ebset.preqfrag = (BOOL*)malloc(size);
673 0 : status = db_get_data(hDB, hEqFRkey, ebset.preqfrag, &size, TID_BOOL);
674 : }
675 : /* Cleanup fragment flags */
676 0 : free(ebset.received);
677 0 : ebset.received = (BOOL*)malloc(size);
678 0 : for (i = 0; i < ebset.nfragment; i++)
679 0 : ebset.received[i] = FALSE;
680 :
681 : /* Check if at least one fragment is requested */
682 0 : for (i = 0; i < ebset.nfragment; i++)
683 0 : if (ebset.preqfrag[i])
684 : break;
685 :
686 0 : if (i == ebset.nfragment) {
687 0 : cm_msg(MERROR, "tr_start", "Run start aborted because no fragment required");
688 0 : return 0;
689 : }
690 :
691 : /* Call BOR user function */
692 0 : status = eb_begin_of_run(run_number, ebset.user_field, error);
693 0 : if (status != EB_SUCCESS) {
694 0 : cm_msg(MERROR, "tr_start", "run start aborted due to eb_begin_of_run (%d)", status);
695 0 : return status;
696 : }
697 :
698 : /* Book all fragment */
699 0 : status = source_booking();
700 0 : if (status != SUCCESS)
701 : return status;
702 :
703 0 : if (!eq_info->enabled) {
704 0 : cm_msg(MINFO, "tr_start", "Event Builder disabled");
705 0 : return CM_SUCCESS;
706 : }
707 :
708 : /* local run state */
709 0 : run_state = STATE_RUNNING;
710 0 : run_number = rn;
711 0 : stop_requested = FALSE;
712 0 : abort_requested = FALSE;
713 0 : printf("%s-Starting New Run: %d\n", frontend_name, rn);
714 :
715 0 : if (1) {
716 0 : int fragn;
717 0 : time_t now = time(NULL);
718 :
719 : // reset timeouts
720 0 : for (fragn = 0; fragn < nfragment; fragn++) {
721 0 : ebch[fragn].time = now;
722 0 : ebch[fragn].timeout = 0;
723 : }
724 : }
725 :
726 : /* Reset global trigger mask */
727 : return CM_SUCCESS;
728 : }
729 :
730 : /*--------------------------------------------------------------------*/
731 0 : INT tr_resume(INT rn, char *error)
732 : {
733 0 : int fragn;
734 0 : time_t now = time(NULL);
735 :
736 0 : printf("\n%s-Resume Run: %d detected\n", frontend_name, rn);
737 :
738 : // reset timeouts
739 0 : for (fragn = 0; fragn < nfragment; fragn++) {
740 0 : ebch[fragn].time = now;
741 0 : ebch[fragn].timeout = 0;
742 : }
743 :
744 0 : run_state = STATE_RUNNING;
745 0 : return CM_SUCCESS;
746 : }
747 :
748 : /*--------------------------------------------------------------------*/
749 0 : INT tr_pause(INT rn, char *error)
750 : {
751 0 : printf("\n%s-Pause Run: %d detected\n", frontend_name, rn);
752 :
753 0 : run_state = STATE_PAUSED;
754 0 : return CM_SUCCESS;
755 : }
756 :
757 : /*--------------------------------------------------------------------*/
758 0 : INT tr_stop(INT rn, char *error)
759 : {
760 0 : waiting_for_stop = FALSE;
761 :
762 0 : if (debug)
763 0 : printf("tr_stop: run %d\n", rn);
764 :
765 : /* local stop */
766 0 : stop_requested = TRUE;
767 :
768 : /* local stop time */
769 0 : request_stop_time = ss_millitime();
770 0 : return CM_SUCCESS;
771 : }
772 :
773 : /*--------------------------------------------------------------------*/
774 0 : void free_event_buffer(INT nfrag)
775 : {
776 0 : INT i;
777 0 : for (i = 0; i < nfrag; i++) {
778 0 : if (ebch[i].pfragment) {
779 0 : free(ebch[i].pfragment);
780 0 : ebch[i].pfragment = NULL;
781 : }
782 : }
783 0 : }
784 :
785 : /*--------------------------------------------------------------------*/
786 0 : INT handFlush()
787 : {
788 0 : int i, size, status;
789 0 : char strout[256];
790 :
791 : /* Do Hand flush until better way to garantee the input buffer to be empty */
792 0 : if (debug)
793 0 : printf("Hand flushing system buffer... \n");
794 0 : for (i = 0; i < nfragment; i++) {
795 0 : do {
796 0 : status = 0;
797 0 : if (ebset.preqfrag[i]) {
798 0 : size = max_event_size;
799 0 : status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, BM_NO_WAIT);
800 0 : if (debug1) {
801 0 : sprintf(strout,
802 : "booking:Hand flush bm_receive_event[%d] hndle:%d stat:%d Last Ser:%d",
803 0 : i, ebch[i].hBuf, status, ((EVENT_HEADER *) ebch[i].pfragment)->serial_number);
804 0 : printf("%s\n", strout);
805 : }
806 : }
807 0 : } while (status == BM_SUCCESS);
808 : }
809 :
810 : /* Empty source buffer */
811 0 : status = bm_empty_buffers();
812 0 : if (status != BM_SUCCESS)
813 0 : cm_msg(MERROR, "handFlush", "bm_empty_buffers failure [%d]", status);
814 0 : run_state = STATE_STOPPED;
815 0 : return status;
816 : }
817 :
818 :
819 : /*--------------------------------------------------------------------*/
820 0 : INT source_booking()
821 : {
822 0 : INT j, i, status, status1, status2;
823 :
824 0 : if (debug)
825 0 : printf("Entering booking\n");
826 :
827 : status1 = status2 = 0;
828 :
829 : /* Book all the source channels */
830 0 : for (i = 0; i < nfragment; i++) {
831 : /* Book only the requested event mask */
832 0 : if (ebset.preqfrag[i]) {
833 : /* Connect channel to source buffer */
834 0 : status1 = bm_open_buffer(ebch[i].buffer, DEFAULT_BUFFER_SIZE, &(ebch[i].hBuf));
835 :
836 0 : if (debug)
837 0 : printf("bm_open_buffer frag:%d buf:%s handle:%d stat:%d\n",
838 0 : i, ebch[i].buffer, ebch[i].hBuf, status1);
839 :
840 0 : if (1)
841 : {
842 0 : int level = 0;
843 0 : bm_get_buffer_level(ebch[i].hBuf, &level);
844 0 : printf("Buffer %s, level %d, info: \n", ebch[i].buffer, level);
845 : }
846 :
847 :
848 : /* Register for specified channel event ID and Trigger mask */
849 0 : status2 =
850 0 : bm_request_event(ebch[i].hBuf, ebch[i].event_id,
851 : TRIGGER_ALL, GET_ALL, &ebch[i].req_id, NULL);
852 0 : if (debug)
853 0 : printf("bm_request_event frag:%d id:%d msk:%d req_id:%d stat:%d\n",
854 0 : i, ebch[i].event_id, ebch[i].trigger_mask, ebch[i].req_id, status2);
855 0 : if (((status1 != BM_SUCCESS) && (status1 != BM_CREATED)) ||
856 0 : ((status2 != BM_SUCCESS) && (status2 != BM_CREATED))) {
857 0 : cm_msg(MERROR, "source_booking",
858 : "Open buffer/event request failure [%d %d %d]", i, status1, status2);
859 0 : return BM_CONFLICT;
860 : }
861 :
862 : /* allocate local source event buffer */
863 0 : if (ebch[i].pfragment)
864 0 : free(ebch[i].pfragment);
865 0 : ebch[i].pfragment = (char *) malloc(max_event_size + sizeof(EVENT_HEADER));
866 0 : if (debug)
867 0 : printf("malloc pevent frag:%d pevent:%p\n", i, ebch[i].pfragment);
868 0 : if (ebch[i].pfragment == NULL) {
869 0 : free_event_buffer(nfragment);
870 0 : cm_msg(MERROR, "source_booking", "Can't allocate space for buffer");
871 0 : return BM_NO_MEMORY;
872 : }
873 : }
874 : }
875 :
876 : /* Empty source buffer */
877 0 : status = bm_empty_buffers();
878 0 : if (status != BM_SUCCESS) {
879 0 : cm_msg(MERROR, "source_booking", "bm_empty_buffers failure [%d]", status);
880 0 : return status;
881 : }
882 :
883 0 : if (debug) {
884 0 : printf("bm_empty_buffers stat:%d\n", status);
885 0 : for (j = 0; j < ebset.nfragment; j++) {
886 0 : printf(" buff:%s", ebch[j].buffer);
887 0 : printf(" ser#:%d", ebch[j].serial);
888 0 : printf(" hbuf:%2d", ebch[j].hBuf);
889 0 : printf(" rqid:%2d", ebch[j].req_id);
890 0 : printf(" opst:%d", status1);
891 0 : printf(" rqst:%d", status2);
892 0 : printf(" evid:%2d", ebch[j].event_id);
893 0 : printf(" tmsk:0x%4.4x\n", ebch[j].trigger_mask);
894 : }
895 : }
896 :
897 : return SUCCESS;
898 : }
899 :
900 : /*--------------------------------------------------------------------*/
901 0 : INT source_unbooking()
902 : {
903 0 : INT i, status;
904 :
905 : /* unbook all source channels */
906 0 : for (i = 0; i < nfragment; i++) {
907 :
908 : /* Skip unbooking if already done */
909 0 : if (ebch[i].pfragment != NULL) {
910 0 : bm_empty_buffers();
911 :
912 : /* Remove event ID registration */
913 0 : status = bm_delete_request(ebch[i].req_id);
914 0 : if (debug)
915 0 : printf("unbook: bm_delete_req[%d] req_id:%d stat:%d\n", i, ebch[i].req_id, status);
916 :
917 : /* Close source buffer */
918 0 : status = bm_close_buffer(ebch[i].hBuf);
919 0 : if (debug)
920 0 : printf("unbook: bm_close_buffer[%d] hndle:%d stat:%d\n", i, ebch[i].hBuf, status);
921 0 : if (status != BM_SUCCESS) {
922 0 : cm_msg(MERROR, "source_unbooking", "Close buffer[%d] stat: %d", i, status);
923 0 : return status;
924 : }
925 : }
926 : }
927 :
928 : /* release local event buffer memory */
929 0 : free_event_buffer(nfragment);
930 :
931 0 : return EB_SUCCESS;
932 : }
933 :
934 : /*--------------------------------------------------------------------*/
935 0 : INT close_buffers(void)
936 : {
937 0 : INT status;
938 0 : char error[256];
939 0 : EQUIPMENT *eq;
940 :
941 0 : eq = &equipment[0];
942 :
943 : /* Flush local destination cache */
944 0 : bm_flush_cache(equipment[0].buffer_handle, BM_WAIT);
945 : /* Call user function */
946 0 : eb_end_of_run(run_number, error);
947 : /* Cleanup buffers */
948 0 : handFlush();
949 : /* Detach all source from midas */
950 0 : status = source_unbooking();
951 :
952 : /* Compose message */
953 0 : stop_time = ss_millitime() - request_stop_time;
954 0 : sprintf(error, "Run %d Stop after %1.0lf + %d events sent DT:%d[ms]",
955 : run_number, eq->stats.events_sent, eq->events_sent, stop_time);
956 0 : cm_msg(MINFO, "close_buffers", "%s", error);
957 :
958 0 : run_state = STATE_STOPPED;
959 0 : abort_requested = FALSE;
960 0 : return status;
961 : }
962 :
963 : /********************************************************************/
964 : /**
965 : Scan all the fragment source once per call.
966 :
967 : -# This will retrieve the full midas event not swapped (except the
968 : MIDAS_HEADER) for each fragment if possible. The fragment will
969 : be stored in the channel event pointer.
970 : -# if after a full nfrag path some frag are still not cellected, it
971 : returns with the frag# missing for timeout check.
972 : -# If ALL fragments are present it will check the midas serial#
973 : for a full match across all the fragments.
974 : -# If the serial check fails it returns with "event mismatch"
975 : and will abort the event builder but not stop the run for now.
976 : -# If the serial check is passed, it will call the user_build function
977 : where the destination event is going to be composed.
978 :
979 : @param fmt Fragment format type
980 : @param eq_info Equipement pointer
981 : @return EB_NO_MORE_EVENT, EB_COMPOSE_TIMEOUT
982 : if different then SUCCESS (bm_compose, rpc_sent error)
983 : */
984 0 : INT source_scan(INT fmt, EQUIPMENT_INFO * eq_info)
985 : {
986 0 : static DWORD serial;
987 0 : INT i, status, size;
988 0 : INT act_size;
989 0 : BOOL found, event_mismatch;
990 0 : BANK_HEADER *psbh;
991 :
992 0 : status = 0;
993 :
994 : /* Scan all channels at least once */
995 0 : for (i = 0; i < nfragment; i++) {
996 : /* Check if current channel needs to be received */
997 0 : if (ebset.preqfrag[i] && !ebset.received[i]) {
998 : /* Get fragment and store it in ebch[i].pfragment */
999 0 : size = max_event_size;
1000 0 : status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, BM_NO_WAIT);
1001 : //printf("call bm_receive_event from %s, serial %d, status %d\n", ebch[i].buffer, serial, status);
1002 0 : switch (status) {
1003 0 : case BM_SUCCESS: /* event received */
1004 : /* Mask event */
1005 0 : ebset.received[i] = TRUE;
1006 : /* Keep local serial */
1007 0 : ebch[i].serial = ((EVENT_HEADER *) ebch[i].pfragment)->serial_number;
1008 : /* clear timeout */
1009 0 : ebch[i].timeout = 0;
1010 0 : ebch[i].time = time(NULL);
1011 :
1012 : /* Swap event depending on data format */
1013 0 : switch (fmt) {
1014 0 : case FORMAT_MIDAS:
1015 0 : psbh = (BANK_HEADER *) (((EVENT_HEADER *) ebch[i].pfragment) + 1);
1016 0 : bk_swap(psbh, FALSE);
1017 : break;
1018 : }
1019 :
1020 0 : if (debug1) {
1021 0 : printf("SUCC: ch:%d ser:%d rec:%d sz:%d\n", i, ebch[i].serial, ebset.received[i], size);
1022 : }
1023 : break;
1024 0 : case BM_ASYNC_RETURN: /* timeout */
1025 0 : ebch[i].timeout++;
1026 0 : if (debug1) {
1027 0 : printf("ASYNC: ch:%d ser:%d rec:%d sz:%d, timeout:%d\n", i, ebch[i].serial, ebset.received[i], size, ebch[i].timeout);
1028 : }
1029 : //return status;
1030 : break;
1031 0 : default: /* Error */
1032 0 : cm_msg(MERROR, "source_scan", "bm_receive_event error %d", status);
1033 0 : return status;
1034 : break;
1035 : }
1036 : } /* next channel */
1037 : }
1038 :
1039 : /* Check if all fragments have been received */
1040 0 : for (i = 0; i < nfragment; i++) {
1041 0 : if (ebset.preqfrag[i] && !ebset.received[i])
1042 : break;
1043 : }
1044 0 : if (i == nfragment) {
1045 : /* Check if serial matches */
1046 0 : found = event_mismatch = FALSE;
1047 0 : serial = 0;
1048 : /* Check Serial, mark first serial */
1049 0 : for (i = 0; i < nfragment; i++) {
1050 0 : if (ebset.preqfrag[i] && ebset.received[i] && !found) {
1051 0 : serial = ebch[i].serial;
1052 0 : found = TRUE;
1053 : } else {
1054 0 : if (ebset.preqfrag[i] && ebset.received[i] && (serial != ebch[i].serial)) {
1055 : /* Event mismatch */
1056 0 : event_mismatch = TRUE;
1057 : }
1058 : }
1059 : }
1060 :
1061 : /* internal action in case of event mismatch */
1062 0 : if (event_mismatch && debug) {
1063 0 : char str[256];
1064 0 : char strsub[128];
1065 0 : strcpy(str, "event mismatch: ");
1066 0 : for (i = 0; i < nfragment; i++) {
1067 0 : sprintf(strsub, "Ser[%d]:%d ", i, ebch[i].serial);
1068 0 : strcat(str, strsub);
1069 : }
1070 0 : printf("event serial mismatch %s\n", str);
1071 : }
1072 :
1073 : /* In any case reset destination buffer */
1074 0 : memset(dest_event, 0, sizeof(EVENT_HEADER));
1075 0 : act_size = 0;
1076 :
1077 : /* Fill reserved header space of destination event with
1078 : final header information */
1079 0 : bm_compose_event((EVENT_HEADER *) dest_event, eq_info->event_id, eq_info->trigger_mask,
1080 : act_size, serial);
1081 :
1082 : /* Pass fragments to user with mismatch flag, for final check before assembly */
1083 0 : status =
1084 0 : eb_user(nfragment, event_mismatch, ebch, (EVENT_HEADER *) dest_event,
1085 0 : (void *) ((EVENT_HEADER *) dest_event + 1), &act_size);
1086 0 : if (status != EB_SUCCESS) {
1087 0 : if (status == EB_SKIP) {
1088 : /* Reset mask and timeouts as if event has been successfully send out */
1089 0 : for (i = 0; i < nfragment; i++) {
1090 0 : ebch[i].timeout = 0;
1091 0 : ebset.received[i] = FALSE;
1092 : }
1093 : }
1094 0 : return status; // Event mark as EB_SKIP or EB_ABORT by user
1095 : }
1096 :
1097 : /* Allow bypass of fragment assembly if user did it on its own */
1098 0 : if (!ebset.user_build) {
1099 0 : for (i = 0; i < nfragment; i++) {
1100 0 : if (ebset.preqfrag[i]) {
1101 0 : status = meb_fragment_add(dest_event, ebch[i].pfragment, &act_size);
1102 0 : if (status != EB_SUCCESS) {
1103 0 : cm_msg(MERROR, "source_scan",
1104 : "compose fragment:%d current size:%d (%d)", i, act_size, status);
1105 0 : return EB_ERROR;
1106 : }
1107 : }
1108 : }
1109 : }
1110 :
1111 : /* Overall event to be sent */
1112 0 : act_size = ((EVENT_HEADER *) dest_event)->data_size + sizeof(EVENT_HEADER);
1113 :
1114 : /* Send event and wait for completion */
1115 0 : status = rpc_send_event(equipment[0].buffer_handle, (EVENT_HEADER *) dest_event, act_size, BM_WAIT, 0);
1116 0 : if (status != BM_SUCCESS) {
1117 0 : if (debug)
1118 0 : printf("rpc_send_event returned error %d, event_size %d\n", status, act_size);
1119 0 : cm_msg(MERROR, "source_scan", "%s: rpc_send_event returned error %d", frontend_name, status);
1120 0 : return EB_ERROR;
1121 : }
1122 :
1123 : /* Keep track of the total byte count */
1124 0 : equipment[0].bytes_sent += act_size;
1125 :
1126 : /* update destination event count */
1127 0 : equipment[0].events_sent++;
1128 :
1129 : /* Reset mask and timeouts as even thave been succesfully send */
1130 0 : for (i = 0; i < nfragment; i++) {
1131 0 : ebch[i].timeout = 0;
1132 0 : ebset.received[i] = FALSE;
1133 : }
1134 : } // all fragment recieved for this event
1135 :
1136 : return status;
1137 : }
1138 :
1139 : /*--------------------------------------------------------------------*/
1140 0 : int main(int argc, char **argv)
1141 : {
1142 0 : INT status, size, rstate;
1143 0 : int i;
1144 0 : BOOL daemon = FALSE;
1145 0 : HNDLE hEqkey;
1146 0 : EBUILDER(ebuilder_str);
1147 0 : char str[128];
1148 0 : int auto_restart = 0;
1149 0 : int restart_count = 0;
1150 :
1151 : /* init structure */
1152 0 : memset(&ebch[0], 0, sizeof(ebch));
1153 :
1154 : /* set default */
1155 0 : cm_get_environment(host_name, sizeof(host_name), expt_name, sizeof(expt_name));
1156 :
1157 : /* set default buffer name */
1158 0 : strcpy(buffer_name, "SYSTEM");
1159 :
1160 : /* get parameters */
1161 0 : for (i = 1; i < argc; i++) {
1162 0 : if (argv[i][0] == '-' && argv[i][1] == 'd')
1163 0 : debug = TRUE;
1164 0 : else if (argv[i][0] == '-' && argv[i][1] == 'D')
1165 : daemon = TRUE;
1166 0 : else if (argv[i][0] == '-' && argv[i][1] == 'w')
1167 0 : wheel = TRUE;
1168 0 : else if (argv[i][0] == '-') {
1169 0 : if (i + 1 >= argc || argv[i + 1][0] == '-')
1170 0 : goto usage;
1171 0 : if (strncmp(argv[i], "-e", 2) == 0)
1172 0 : strcpy(expt_name, argv[++i]);
1173 0 : else if (strncmp(argv[i], "-h", 2) == 0)
1174 0 : strcpy(host_name, argv[++i]);
1175 0 : else if (strncmp(argv[i], "-b", 2) == 0)
1176 0 : strcpy(buffer_name, argv[++i]);
1177 : } else {
1178 0 : usage:
1179 0 : printf("usage: mevb [-h <Hostname>] [-e <Experiment>] [-b <buffername>] [-d] [-w] [-D]\n");
1180 0 : printf(" [-h <Hostname>] Host where midas experiment is running on\n");
1181 0 : printf(" [-e <Experiment>] Midas experiment if more than one exists\n");
1182 0 : printf(" [-b <buffername>] Specify evnet buffer name, use \"SYSTEM\" by default\n");
1183 0 : printf(" [-d] Print debugging output\n");
1184 0 : printf(" [-w] Show wheel\n");
1185 0 : printf(" [-D] Start as a daemon\n");
1186 0 : return 0;
1187 : }
1188 : }
1189 :
1190 0 : printf("MIDAS example event builder. Press \"!\" to exit.\n");
1191 :
1192 0 : if (daemon) {
1193 0 : printf("Becoming a daemon...\n");
1194 0 : ss_daemon_init(FALSE);
1195 : }
1196 :
1197 : /* Connect to experiment */
1198 0 : status = cm_connect_experiment(host_name, expt_name, frontend_name, NULL);
1199 0 : if (status != CM_SUCCESS) {
1200 0 : ss_sleep(5000);
1201 0 : goto exit;
1202 : }
1203 :
1204 0 : if (debug)
1205 0 : cm_set_watchdog_params(TRUE, 0);
1206 :
1207 : /* Connect to ODB */
1208 0 : status = cm_get_experiment_database(&hDB, &hKey);
1209 0 : if (status != EB_SUCCESS) {
1210 0 : ss_sleep(5000);
1211 0 : goto exit;
1212 : }
1213 :
1214 : /* check if Ebuilder is already running */
1215 0 : status = cm_exist(frontend_name, FALSE);
1216 0 : if (status == CM_SUCCESS) {
1217 0 : cm_msg(MERROR, "main", "%s running already!.", frontend_name);
1218 0 : cm_disconnect_experiment();
1219 0 : goto exit;
1220 : }
1221 :
1222 : /* Check if run in progess if so abort */
1223 0 : size = sizeof(rstate);
1224 0 : db_get_value(hDB, 0, "/Runinfo/State", &rstate, &size, TID_INT, FALSE);
1225 0 : if (rstate != STATE_STOPPED) {
1226 0 : cm_msg(MERROR, "main", "Run in Progress, EBuilder aborted!.");
1227 0 : cm_disconnect_experiment();
1228 0 : goto exit;
1229 : }
1230 :
1231 0 : if (ebuilder_init() != SUCCESS) {
1232 0 : cm_disconnect_experiment();
1233 : /* let user read message before window might close */
1234 0 : ss_sleep(5000);
1235 0 : goto exit;
1236 : }
1237 :
1238 : /* Register single equipment */
1239 0 : status = register_equipment();
1240 0 : if (status != EB_SUCCESS) {
1241 0 : ss_sleep(5000);
1242 0 : goto exit;
1243 : }
1244 :
1245 : /* Load Fragment info */
1246 0 : status = load_fragment();
1247 0 : if (status != EB_SUCCESS) {
1248 0 : ss_sleep(5000);
1249 0 : goto exit;
1250 : }
1251 :
1252 : /* Register transition for reset counters */
1253 0 : if (cm_register_transition(TR_START, tr_start, 400) != CM_SUCCESS)
1254 : return status;
1255 0 : if (cm_register_transition(TR_RESUME, tr_resume, 400) != CM_SUCCESS)
1256 : return status;
1257 0 : if (cm_register_transition(TR_PAUSE, tr_pause, 600) != CM_SUCCESS)
1258 0 : goto exit;
1259 0 : if (cm_register_transition(TR_STOP, tr_stop, 600) != CM_SUCCESS)
1260 0 : goto exit;
1261 :
1262 0 : restart:
1263 :
1264 : /* Set Initial EB/Settings */
1265 0 : sprintf(str, "/Equipment/%s/Settings", equipment[0].name);
1266 0 : if (db_find_key(hDB, 0, str, &hEqkey) != DB_SUCCESS) {
1267 0 : status = db_create_record(hDB, 0, str, strcomb1(ebuilder_str).c_str());
1268 : }
1269 :
1270 0 : if (auto_restart && restart_count > 0)
1271 : {
1272 0 : int run_number = 0;
1273 0 : int size = sizeof(run_number);
1274 0 : status = db_get_value(hDB, 0, "Runinfo/Run number", &run_number, &size, TID_INT, TRUE);
1275 0 : assert(status == SUCCESS);
1276 :
1277 0 : cm_msg(MINFO, frontend_name, "Restart the run!");
1278 :
1279 0 : cm_transition(TR_START, run_number+1, NULL, 0, TR_SYNC, 0);
1280 : }
1281 :
1282 : /* initialize ss_getchar */
1283 0 : ss_getchar(0);
1284 :
1285 : /* Scan fragments... will stay in */
1286 0 : status = scan_fragment();
1287 0 : printf("%s-Out of scan_fragment\n", frontend_name);
1288 :
1289 : /* Detach all source from midas */
1290 0 : printf("%s-Unbooking\n", frontend_name);
1291 0 : source_unbooking();
1292 :
1293 0 : ebuilder_exit();
1294 :
1295 0 : auto_restart = 0;
1296 0 : db_get_value(hDB, 0, "/Logger/Auto restart", &auto_restart, &size, TID_BOOL, FALSE);
1297 :
1298 0 : cm_msg(MINFO, frontend_name, "evb exit status %d, auto_restart %d", status, auto_restart);
1299 :
1300 0 : if (status == EB_USER_ERROR)
1301 : {
1302 0 : restart_count ++;
1303 0 : goto restart;
1304 : }
1305 :
1306 : /* reset terminal */
1307 0 : ss_getchar(TRUE);
1308 :
1309 0 : exit:
1310 : /* Free local memory */
1311 0 : free_event_buffer(ebset.nfragment);
1312 :
1313 : /* Clean disconnect from midas */
1314 0 : cm_disconnect_experiment();
1315 : return 0;
1316 : }
|