LCOV - code coverage report
Current view: top level - examples/eventbuilder - mevb.cxx (source / functions) Coverage Total Hit
Test: coverage.info Lines: 0.0 % 644 0
Test Date: 2025-11-11 10:26:08 Functions: 0.0 % 16 0

            Line data    Source code
       1              : /********************************************************************\
       2              : 
       3              :    Name:         mevb.c
       4              :    Created by:   Pierre-Andre Amaudruz
       5              : 
       6              :    Contents:     Main Event builder task.
       7              : 
       8              :    $Id$
       9              : 
      10              : \********************************************************************/
      11              : 
      12              : /**dox***************************************************************/
      13              : /* @file mevb.c
      14              : The Event builder main file 
      15              : */
      16              : 
      17              : #include <stdio.h>
      18              : #include "midas.h"
      19              : #include "mevb.h"
      20              : #include "msystem.h"
      21              : #include "mdsupport.h"
      22              : #include "mfe.h"
      23              : 
      24              : #define SERVER_CACHE_SIZE  100000       /* event cache before buffer */
      25              : 
      26              : #define ODB_UPDATE_TIME      1000       /* 1 seconds for ODB update */
      27              : 
      28              : #define DEFAULT_FE_TIMEOUT  60000       /* 60 seconds for watchdog timeout */
      29              : 
      30              : //RP#define TIMEOUT_ABORT          10       /* seconds waiting for data before aborting run */
      31              : //#define TIMEOUT_ABORT          120       /* seconds waiting for data before aborting run */
      32              : #define TIMEOUT_ABORT          300       /* seconds waiting for data before aborting run */
      33              : 
      34              : EBUILDER_SETTINGS ebset;
      35              : EBUILDER_CHANNEL ebch[MAX_CHANNELS];
      36              : 
      37              : INT run_state;                  /* STATE_RUNNING, STATE_STOPPED, STATE_PAUSED */
      38              : INT run_number;
      39              : DWORD last_time;
      40              : DWORD actual_time;              /* current time in seconds since 1970 */
      41              : DWORD actual_millitime;         /* current time in milliseconds */
      42              : 
      43              : char host_name[HOST_NAME_LENGTH];
      44              : char expt_name[NAME_LENGTH];
      45              : char buffer_name[NAME_LENGTH];
      46              : INT nfragment;
      47              : char *dest_event;
      48              : HNDLE hDB, hKey, hStatKey, hSubkey, hEqKey, hESetKey;
      49              : BOOL debug = FALSE, debug1 = FALSE;
      50              : 
      51              : BOOL wheel = FALSE;
      52              : char bars[] = "|\\-/";
      53              : int i_bar;
      54              : BOOL abort_requested = FALSE, stop_requested = TRUE;
      55              : DWORD stop_time = 0, request_stop_time = 0;
      56              : 
      57              : INT(*meb_fragment_add) (char *, char *, INT *);
      58              : INT handFlush(void);
      59              : INT source_booking(void);
      60              : INT source_unbooking(void);
      61              : INT close_buffers(void);
      62              : INT source_scan(INT fmt, EQUIPMENT_INFO * eq_info);
      63              : INT eb_mfragment_add(char *pdest, char *psrce, INT * size);
      64              : INT eb_yfragment_add(char *pdest, char *psrce, INT * size);
      65              : 
      66              : INT eb_begin_of_run(INT, char *, char *);
      67              : INT eb_end_of_run(INT, char *);
      68              : INT eb_user(INT, BOOL mismatch, EBUILDER_CHANNEL *, EVENT_HEADER *, void *, INT *);
      69              : INT load_fragment(void);
      70              : INT scan_fragment(void);
      71              : //char *frontend_name;
      72              : //char *frontend_file_name;
      73              : BOOL frontend_call_loop;
      74              : 
      75              : extern INT max_event_size;
      76              : extern INT max_event_size_frag;
      77              : extern INT event_buffer_size;
      78              : extern INT display_period;
      79              : extern INT ebuilder_init(void);
      80              : extern INT ebuilder_exit(void);
      81              : extern INT ebuilder_loop(void);
      82              : 
      83              : extern EQUIPMENT equipment[];
      84              : extern INT md_event_swap(INT fmt, void * pevt);
      85              : 
      86              : #define EQUIPMENT_STATISTICS_STR "\
      87              : Events sent = DOUBLE : 0\n\
      88              : Events per sec. = DOUBLE : 0\n\
      89              : kBytes per sec. = DOUBLE : 0\n\
      90              : "
      91              : static int waiting_for_stop = FALSE;
      92              : 
      93              : /********************************************************************/
      94            0 : INT register_equipment(void)
      95              : {
      96            0 :    INT index, size, status;
      97            0 :    char str[256];
      98            0 :    EQUIPMENT_INFO *eq_info;
      99            0 :    EQUIPMENT_STATS *eq_stats;
     100            0 :    HNDLE hKey;
     101              : 
     102              :    /* get current ODB run state */
     103            0 :    size = sizeof(run_state);
     104            0 :    run_state = STATE_STOPPED;
     105            0 :    db_get_value(hDB, 0, "/Runinfo/State", &run_state, &size, TID_INT, TRUE);
     106            0 :    size = sizeof(run_number);
     107            0 :    run_number = 1;
     108            0 :    status = db_get_value(hDB, 0, "/Runinfo/Run number", &run_number, &size, TID_INT, TRUE);
     109            0 :    assert(status == SUCCESS);
     110              : 
     111              :    /* scan EQUIPMENT table from mevb.C */
     112            0 :    for (index = 0; equipment[index].name[0]; index++) {
     113            0 :       eq_info = &equipment[index].info;
     114            0 :       eq_stats = &equipment[index].stats;
     115              : 
     116            0 :       if (eq_info->event_id == 0) {
     117            0 :          printf("\nEvent ID 0 for %s not allowed\n", equipment[index].name);
     118            0 :          cm_disconnect_experiment();
     119            0 :          ss_sleep(5000);
     120            0 :          exit(0);
     121              :       }
     122              : 
     123              :       /* init status */
     124            0 :       equipment[index].status = EB_SUCCESS;
     125              : 
     126            0 :       sprintf(str, "/Equipment/%s/Common", equipment[index].name);
     127              : 
     128              :       /* get last event limit from ODB */
     129            0 :       if (eq_info->eq_type != EQ_SLOW) {
     130            0 :          db_find_key(hDB, 0, str, &hKey);
     131            0 :          size = sizeof(double);
     132            0 :          if (hKey)
     133            0 :             db_get_value(hDB, hKey, "Event limit", &eq_info->event_limit, &size, TID_DOUBLE, TRUE);
     134              :       }
     135              : 
     136              :       /* Create common subtree */
     137            0 :       status = db_check_record(hDB, 0, str, EQUIPMENT_COMMON_STR, TRUE);
     138            0 :       if (status != DB_SUCCESS) {
     139            0 :          printf("Cannot check equipment record, status = %d\n", status);
     140            0 :          ss_sleep(3000);
     141              :       }
     142            0 :       db_find_key(hDB, 0, str, &hKey);
     143              : 
     144            0 :       if (equal_ustring(eq_info->format, "FIXED"))
     145            0 :          equipment[index].format = FORMAT_FIXED;
     146              :       else                      /* default format is MIDAS */
     147            0 :          equipment[index].format = FORMAT_MIDAS;
     148              : 
     149            0 :       gethostname(eq_info->frontend_host, sizeof(eq_info->frontend_host));
     150            0 :       strcpy(eq_info->frontend_name, frontend_name);
     151            0 :       strcpy(eq_info->frontend_file_name, frontend_file_name);
     152              : 
     153              :       /* set record from equipment[] table in frontend.c */
     154            0 :       db_set_record(hDB, hKey, eq_info, sizeof(EQUIPMENT_INFO), 0);
     155              : 
     156              :       /* get record once at the start equipment info */
     157            0 :       size = sizeof(EQUIPMENT_INFO);
     158            0 :       db_get_record(hDB, hKey, eq_info, &size, 0);
     159              : 
     160              :     /*---- Create just the key , leave it empty ---------------------------------*/
     161            0 :       sprintf(str, "/Equipment/%s/Variables", equipment[index].name);
     162            0 :       db_create_key(hDB, 0, str, TID_KEY);
     163            0 :       db_find_key(hDB, 0, str, &hKey);
     164            0 :       equipment[index].hkey_variables = hKey;
     165              : 
     166              :     /*---- Create and initialize statistics tree -------------------*/
     167            0 :       sprintf(str, "/Equipment/%s/Statistics", equipment[index].name);
     168              : 
     169            0 :       status = db_check_record(hDB, 0, str, EQUIPMENT_STATISTICS_STR, TRUE);
     170            0 :       if (status != DB_SUCCESS) {
     171            0 :          printf("Cannot create/check statistics record, error %d\n", status);
     172            0 :          ss_sleep(3000);
     173              :       }
     174              : 
     175            0 :       status = db_find_key(hDB, 0, str, &hKey);
     176            0 :       if (status != DB_SUCCESS) {
     177            0 :          printf("Cannot find statistics record, error %d\n", status);
     178            0 :          ss_sleep(3000);
     179              :       }
     180              : 
     181            0 :       eq_stats->events_sent = 0;
     182            0 :       eq_stats->events_per_sec = 0;
     183            0 :       eq_stats->kbytes_per_sec = 0;
     184              : 
     185              :       /* open hot link to statistics tree */
     186            0 :       status = db_open_record(hDB, hKey, eq_stats, sizeof(EQUIPMENT_STATS), MODE_WRITE, NULL, NULL);
     187            0 :       if (status == DB_NO_ACCESS) {
     188              :          /* record is probably still in exclusive access by dead FE, so reset it */
     189            0 :          status = db_set_mode(hDB, hKey, MODE_READ | MODE_WRITE | MODE_DELETE, TRUE);
     190            0 :          if (status != DB_SUCCESS)
     191            0 :             cm_msg(MERROR, "register_equipment", "Cannot change access mode for record \'%s\', error %d", str, status);
     192              :          else
     193            0 :             cm_msg(MINFO, "register_equipment", "Recovered access mode for record \'%s\'", str);
     194            0 :          status = db_open_record(hDB, hKey, eq_stats, sizeof(EQUIPMENT_STATS), MODE_WRITE, NULL, NULL);
     195              :       }
     196            0 :       if (status != DB_SUCCESS) {
     197            0 :          cm_msg(MERROR, "register_equipment", "Cannot open statistics record, error %d. Probably other FE is using it", status);
     198            0 :          ss_sleep(3000);
     199              :       }
     200              : 
     201              :     /*---- open event buffer ---------------------------------------*/
     202            0 :       if (eq_info->buffer[0]) {
     203            0 :          status = bm_open_buffer(eq_info->buffer, DEFAULT_BUFFER_SIZE, &equipment[index].buffer_handle);
     204            0 :          if (status != BM_SUCCESS && status != BM_CREATED) {
     205            0 :             cm_msg(MERROR, "register_equipment",
     206              :                    "Cannot open event buffer. Try to reduce EVENT_BUFFER_SIZE in midas.h \
     207              :           and rebuild the system.");
     208            0 :             return 0;
     209              :          }
     210              : 
     211            0 :          if (1)
     212              :            {
     213            0 :              int level = 0;
     214            0 :              bm_get_buffer_level(equipment[index].buffer_handle, &level);
     215            0 :              printf("Buffer %s, level %d, info: \n", eq_info->buffer, level);
     216              :            }
     217              : 
     218              :          /* set the default buffer cache size */
     219            0 :          bm_set_cache_size(equipment[index].buffer_handle, 0, SERVER_CACHE_SIZE);
     220              :       } else {
     221            0 :          cm_msg(MERROR, "register_equipment", "Destination buffer must be present");
     222            0 :          ss_sleep(3000);
     223            0 :          exit(0);
     224              :       }
     225              :    }
     226              :    return SUCCESS;
     227              : }
     228              : 
     229              : /********************************************************************/
     230            0 : INT load_fragment(void)
     231              : {
     232            0 :    INT i, size, type;
     233            0 :    HNDLE hEqKey, hSubkey;
     234            0 :    EQUIPMENT_INFO *eq_info;
     235            0 :    KEY key;
     236            0 :    char buffer[NAME_LENGTH];
     237            0 :    char format[8];
     238              : 
     239              :    /* Get equipment pointer, only one eqp for now */
     240            0 :    eq_info = &equipment[0].info;
     241              : 
     242              :    /* Scan Equipment/Common listing */
     243            0 :    if (db_find_key(hDB, 0, "Equipment", &hEqKey) != DB_SUCCESS) {
     244            0 :       cm_msg(MINFO, "load_fragment", "Equipment listing not found");
     245            0 :       return EB_ERROR;
     246              :    }
     247              : 
     248              :    /* Scan the Equipment list for fragment info collection */
     249            0 :    for (i = 0, nfragment = 0;; i++) {
     250            0 :       db_enum_key(hDB, hEqKey, i, &hSubkey);
     251            0 :       if (!hSubkey)
     252              :          break;
     253            0 :       db_get_key(hDB, hSubkey, &key);
     254            0 :       if (key.type == TID_KEY) {
     255              :          /* Equipment name */
     256            0 :          if (debug)
     257            0 :             printf("Equipment name:%s\n", key.name);
     258              :          /* Check if equipment is EQ_EB */
     259            0 :          size = sizeof(INT);
     260            0 :          db_get_value(hDB, hSubkey, "common/type", &type, &size, TID_INT, 0);
     261            0 :          size = sizeof(buffer);
     262            0 :          db_get_value(hDB, hSubkey, "common/Buffer", buffer, &size, TID_STRING, 0);
     263            0 :          size = sizeof(format);
     264            0 :          db_get_value(hDB, hSubkey, "common/Format", format, &size, TID_STRING, 0);
     265              :          /* Check if equipment match EB requirements */
     266              :          //      if (debug)
     267              :          //  printf("Equipment name: %s, Type: 0x%x, Buffer: %s, Format: %s\n", 
     268              :          //       key.name, type, buffer, format);
     269            0 :          if ((type & EQ_EB)
     270            0 :              && (strncmp(buffer, buffer_name, strlen(buffer_name)) == 0)
     271            0 :              && (strncmp(format, eq_info->format, strlen(format)) == 0)) {
     272              :             /* match=> fill internal eb structure */
     273            0 :             strcpy(ebch[nfragment].format, format);
     274            0 :             strcpy(ebch[nfragment].buffer, buffer);
     275            0 :             size = sizeof(WORD);
     276            0 :             db_get_value(hDB, hSubkey, "common/Trigger Mask", &ebch[nfragment].trigger_mask, &size, TID_WORD,
     277              :                          0);
     278            0 :             size = sizeof(WORD);
     279            0 :             db_get_value(hDB, hSubkey, "common/Event ID", &ebch[nfragment].event_id, &size, TID_WORD, 0);
     280            0 :             printf("Fragment %d: Equipment name %s,  evID %d, buffer %s\n",nfragment,key.name, ebch[nfragment].event_id, buffer);
     281              :             //      printf("Fragment %d: Equipment name %s,  evID %d, buffer %s\n",key.name, ebch[nfragment].event_id, buffer);
     282            0 :             nfragment++;
     283              :          }
     284              :       }
     285              :    }
     286              : 
     287            0 :    if (nfragment > 1)
     288            0 :       printf("Found %d fragments for event building\n", nfragment);
     289              :    else
     290            0 :       printf("Found one fragment for event building\n");
     291              : 
     292              :    /* Point to the Ebuilder settings */
     293              :    /* Set fragment_add function based on the format */
     294            0 :    if (equipment[0].format == FORMAT_MIDAS)
     295            0 :       meb_fragment_add = eb_mfragment_add;
     296              :    else {
     297            0 :       cm_msg(MERROR, "load_fragment", "Unknown data format \"%s\"", format);
     298            0 :       return EB_ERROR;
     299              :    }
     300              : 
     301              :    /* allocate destination event buffer */
     302            0 :    dest_event = (char *) malloc(nfragment * (max_event_size + sizeof(EVENT_HEADER)));
     303            0 :    memset(dest_event, 0, nfragment * (max_event_size + sizeof(EVENT_HEADER)));
     304            0 :    if (dest_event == NULL) {
     305              :       cm_msg(MERROR, "load_fragment", "%s: Not enough memory for event buffer", frontend_name);
     306              :       return EB_ERROR;
     307              :    }
     308              :    return EB_SUCCESS;
     309              : }
     310              : 
     311              : /********************************************************************/
     312            0 : INT scan_fragment(void)
     313              : {
     314            0 :    INT fragn, status;
     315            0 :    EQUIPMENT *eq;
     316            0 :    EQUIPMENT_INFO *eq_info;
     317            0 :    INT ch;
     318              : 
     319              :    /* Get equipment pointer, only one eqp for now */
     320            0 :    eq_info = &equipment[0].info;
     321            0 :    status = 0;
     322            0 :    eq = NULL;
     323              : 
     324              :    /* Main event loop */
     325            0 :    do {
     326            0 :       switch (run_state) {
     327            0 :       case STATE_STOPPED:
     328            0 :       case STATE_PAUSED:
     329              :          /* skip the source scan and yield */
     330            0 :          status = cm_yield(500);
     331            0 :          if (wheel) {
     332            0 :             printf("...%c Snoring\r", bars[i_bar++ % 4]);
     333            0 :             fflush(stdout);
     334              :          }
     335              :          break;
     336            0 :       case STATE_RUNNING:
     337            0 :          status = source_scan(equipment[0].format, eq_info);
     338            0 :          switch (status) {
     339            0 :          case BM_ASYNC_RETURN: // No event found for now, Check for timeout 
     340              : 
     341            0 :            if (1) {
     342              :              /* advanced checking for timeouts */
     343              : 
     344            0 :              time_t now = time(NULL);
     345            0 :              int empty = 1;
     346            0 :              int badfrag = -1;
     347              : 
     348              :              /* check if we recieved any data */
     349            0 :              for (fragn = 0; fragn < nfragment; fragn++)
     350            0 :                if (ebset.received[fragn])
     351            0 :                  empty = 0;
     352              : 
     353              :              /* only look for timeout if there is received data from any fragment */
     354            0 :              if (!empty)
     355            0 :                for (fragn = 0; fragn < nfragment; fragn++) {
     356              :                  //if (debug)
     357              :                  //  printf("frag %d, timeout %d, threshold %d, received %d, time %d\n", fragn, ebch[fragn].timeout, TIMEOUT, ebset.received[fragn], now - ebch[fragn].time);
     358            0 :                  if (ebch[fragn].time && ebch[fragn].timeout)
     359            0 :                    if (now - ebch[fragn].time > TIMEOUT_ABORT) {
     360              :                      //cm_msg(MERROR, "scan_fragment", "frag %d, timeout %d, %d sec", fragn, ebch[fragn].timeout, now - ebch[fragn].time);
     361            0 :                      badfrag = fragn;
     362              :                    }
     363              :                }
     364              : 
     365            0 :              if (badfrag >= 0) {
     366              :                //status = SS_ABORT;
     367            0 :                if (!waiting_for_stop && !stop_requested) {
     368            0 :                  cm_msg(MERROR, "scan_fragment", "timeout waiting for fragment %d, restarting run", badfrag);
     369            0 :                  cm_msg(MINFO, "scan_fragment", "spawning mtransition");
     370            0 :                  ss_system("mtransition STOP IF \"/Logger/Auto restart\" DELAY \"/Logger/Auto restart delay\" START &");
     371            0 :                  waiting_for_stop = TRUE;
     372              :                }
     373              :                break;
     374              :              }
     375              :            }
     376              : 
     377            0 :             for (fragn = 0; fragn < nfragment; fragn++) {
     378            0 :                if (ebch[fragn].timeout > TIMEOUT) {     /* Timeout */
     379            0 :                   if (stop_requested) { /* Stop */
     380            0 :                      if (debug)
     381            0 :                         printf("Timeout waiting for fragment %d while stopping the run\n", fragn);
     382            0 :                      status = close_buffers();
     383            0 :                      break;
     384              :                   } else {
     385              :                      /* No stop requested  but timeout, allow a yield to not
     386              :                         eat all the CPU */
     387            0 :                      status = cm_yield(10);
     388            0 :                      if (wheel) {
     389            0 :                         printf("...%c Timing on %1.0lf\r", bars[i_bar++ % 4], eq->stats.events_sent);
     390            0 :                         fflush(stdout);
     391              :                      }
     392              : 
     393              :                   }
     394              :                }
     395              :                //else { /* No timeout loop back */
     396              :             }                   // for loop over all fragments
     397              :             break;
     398            0 :          case EB_ERROR:
     399            0 :          case EB_USER_ERROR:
     400            0 :             abort_requested = TRUE;
     401            0 :             if (status == EB_USER_ERROR)
     402            0 :                cm_msg(MTALK, "scan_fragment", "%s: Error signaled by user code - stopping run...",
     403              :                       frontend_name);
     404              :             else
     405            0 :                cm_msg(MTALK, "EBuilder", "%s: Event mismatch - Stopping run...", frontend_name);
     406              : 
     407            0 :             if (debug)
     408            0 :                printf("Stop requested on Error %d\n", status);
     409            0 :             close_buffers();
     410              : 
     411              : #if 0
     412              :             if (!waiting_for_stop && !stop_requested) {
     413              :               cm_msg(MINFO, "scan_fragment", "spawning mtransition");
     414              :               //ss_system("mtransition STOP IF \"/Logger/Auto restart\" DELAY \"/Logger/Auto restart delay\" START &");
     415              :               ss_system("mtransition STOP &");
     416              :               waiting_for_stop = TRUE;
     417              :             }
     418              : #endif
     419            0 :             for (ch=0; ch<5; ch++) {
     420            0 :                if (cm_transition(TR_STOP, 0, NULL, 0, TR_SYNC, 0) == CM_SUCCESS)
     421              :                   break;
     422            0 :                cm_msg(MERROR, "scan_fragment", "%s: Stop Transition request failed, trying again!", frontend_name);
     423              :             }
     424              :             return status;
     425              :             break;
     426              :          case EB_SUCCESS:
     427              :          case EB_SKIP:
     428              :             //   Normal path if event has been assembled
     429              :             //   No yield in this case.
     430              :             break;
     431            0 :          default:
     432            0 :             cm_msg(MERROR, "scan_fragment", "unexpected return %d", status);
     433            0 :             status = SS_ABORT;
     434              :          }                      // switch scan_source
     435              :          break;
     436              :       }
     437              :       /* EB job done, update statistics if its time */
     438              :       /* Check if it's time to do statistics job */
     439            0 :       if ((actual_millitime = ss_millitime()) - last_time > 1000) {
     440              :          /* Force event to appear at the destination if Ebuilder is remote */
     441            0 :          rpc_flush_event();
     442              :          /* Force event ot appear at the destination if Ebuilder is local */
     443            0 :          bm_flush_cache(equipment[0].buffer_handle, BM_NO_WAIT);
     444              : 
     445            0 :          status = cm_yield(10);
     446              : 
     447            0 :          eq = &equipment[0];
     448            0 :          eq->stats.events_sent += eq->events_sent;
     449            0 :          eq->stats.events_per_sec = eq->events_sent / ((actual_millitime - last_time) / 1000.0);
     450            0 :          eq->stats.kbytes_per_sec = eq->bytes_sent / 1024.0 / ((actual_millitime - last_time) / 1000.0);
     451            0 :          eq->bytes_sent = 0;
     452            0 :          eq->events_sent = 0;
     453              :          /* update destination statistics */
     454            0 :          db_send_changed_records();
     455              :          /* Keep track of last ODB update */
     456            0 :          last_time = ss_millitime();
     457              :       }
     458              : 
     459            0 :       ch = 0;
     460            0 :       if (ss_kbhit()) {
     461            0 :          ch = ss_getchar(0);
     462            0 :          if (ch == -1)
     463            0 :             ch = getchar();
     464            0 :          if ((char) ch == '!')
     465              :             break;
     466              :       }
     467            0 :    } while (status != RPC_SHUTDOWN && status != SS_ABORT);
     468              : 
     469              :    return status;
     470              : }
     471              : 
     472              : /********************************************************************/
     473            0 : INT eb_mfragment_add(char *pdest, char *psrce, INT * size)
     474              : {
     475            0 :    BANK_HEADER *psbh, *pdbh;
     476            0 :    char *psdata, *pddata;
     477            0 :    INT bksize;
     478              : 
     479              :    /* Condition for new EVENT the data_size should be ZERO */
     480            0 :    *size = ((EVENT_HEADER *) pdest)->data_size;
     481              : 
     482              :    /* destination pointer */
     483            0 :    pddata = pdest + *size + sizeof(EVENT_HEADER);
     484              : 
     485            0 :    if (*size) {
     486              :       /* NOT the first fragment */
     487              : 
     488              :       /* Swap event source if necessary */
     489            0 :       psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
     490            0 :       bk_swap(psbh, FALSE);
     491              : 
     492              :       /* source pointer */
     493            0 :       psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
     494            0 :       psdata = (char *) (psbh + 1);
     495              : 
     496              :       /* copy all banks without the bank header */
     497            0 :       bksize = psbh->data_size;
     498              : 
     499              :       /* copy */
     500            0 :       memcpy(pddata, psdata, bksize);
     501              : 
     502              :       /* update event size */
     503            0 :       ((EVENT_HEADER *) pdest)->data_size += bksize;
     504              : 
     505              :       /* update bank size */
     506            0 :       pdbh = (BANK_HEADER *) (((EVENT_HEADER *) pdest) + 1);
     507            0 :       pdbh->data_size += bksize;
     508              : 
     509            0 :       *size = ((EVENT_HEADER *) pdest)->data_size;
     510              :    } else {
     511              :       /* First event without the event header but with the 
     512              :          bank header as the size is zero */
     513            0 :       *size = ((EVENT_HEADER *) psrce)->data_size;
     514              : 
     515              :       /* Swap event if necessary */
     516            0 :       psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
     517            0 :       bk_swap(psbh, FALSE);
     518              : 
     519              :       /* copy first fragment */
     520            0 :       memcpy(pddata, psbh, *size);
     521              : 
     522              :       /* update destination event size */
     523            0 :       ((EVENT_HEADER *) pdest)->data_size = *size;
     524              :    }
     525            0 :    return CM_SUCCESS;
     526              : }
     527              : 
     528              : /*--------------------------------------------------------------------*/
     529            0 : INT eb_yfragment_add(char *pdest, char *psrce, INT * size)
     530              : {
     531              :    /* pdest : EVENT_HEADER pointer
     532              :       psrce : EVENT_HEADER pointer
     533              :       Keep pbkh for later incrementation
     534              :     */
     535            0 :    char *psdata, *pddata;
     536            0 :    DWORD *pslrl, *pdlrl;
     537            0 :    INT i4frgsize, i1frgsize;
     538              : 
     539              :    /* Condition for new EVENT the data_size should be ZERO */
     540            0 :    *size = ((EVENT_HEADER *) pdest)->data_size;
     541              : 
     542              :    /* destination pointer skip the header as it has been already
     543              :       composed and the usere may have modified it on purpose (Midas Control) */
     544            0 :    pddata = pdest + *size + sizeof(EVENT_HEADER);
     545              : 
     546              :    /* the Midas header is present for logger */
     547            0 :    if (*size) {                 /* already filled with a fragment */
     548              : 
     549              :       /* source pointer: number of DWORD (lrl included) */
     550            0 :       pslrl = (DWORD *) (((EVENT_HEADER *) psrce) + 1);
     551              : 
     552              :       /* Swap event if necessary */
     553            0 :       md_event_swap(FORMAT_MIDAS, pslrl);
     554              : 
     555              :       /* copy done in bytes, do not include LRL */
     556            0 :       psdata = (char *) (pslrl + 1);
     557              : 
     558              :       /* copy size in I*4 (lrl included, remove it) */
     559            0 :       i4frgsize = (*pslrl);
     560            0 :       i1frgsize = 4 * i4frgsize;
     561              : 
     562              :       /* append fragment */
     563            0 :       memcpy(pddata, psdata, i1frgsize);
     564              : 
     565              :       /* update Midas header event size */
     566            0 :       ((EVENT_HEADER *) pdest)->data_size += i1frgsize;
     567              : 
     568              :       /* update LRL size (I*4) */
     569            0 :       pdlrl = (DWORD *) (((EVENT_HEADER *) pdest) + 1);
     570            0 :       *pdlrl += i4frgsize;
     571              : 
     572              :       /* Return event size in bytes */
     573            0 :       *size = ((EVENT_HEADER *) pdest)->data_size;
     574              :    } else {                     /* new destination event */
     575              :       /* The composed event has already the MIDAS header.
     576              :          which may have been modified by the user in ebuser.c
     577              :          Will be stripped by the logger (YBOS).
     578              :          Copy the first full event ( no EVID suppression )
     579              :          First event (without the event header) */
     580              : 
     581              :       /* source pointer */
     582            0 :       pslrl = (DWORD *) (((EVENT_HEADER *) psrce) + 1);
     583              : 
     584              :       /* Swap event if necessary */
     585            0 :       md_event_swap(FORMAT_MIDAS, pslrl);
     586              : 
     587              :       /* size in byte from the source midas header */
     588            0 :       *size = ((EVENT_HEADER *) psrce)->data_size;
     589              : 
     590              :       /* copy first fragment */
     591            0 :       memcpy(pddata, (char *) pslrl, *size);
     592              : 
     593              :       /* update destination Midas header event size */
     594            0 :       ((EVENT_HEADER *) pdest)->data_size += *size;
     595              : 
     596              :    }
     597            0 :    return CM_SUCCESS;
     598              : }
     599              : 
     600              : /*--------------------------------------------------------------------*/
     601            0 : INT tr_start(INT rn, char *error)
     602              : {
     603            0 :    EBUILDER(ebuilder_str);
     604            0 :    INT status, size, i;
     605            0 :    char str[128];
     606            0 :    KEY key;
     607            0 :    HNDLE hKey, hEqkey, hEqFRkey;
     608            0 :    EQUIPMENT_INFO *eq_info;
     609              : 
     610            0 :    if (debug)
     611            0 :      printf("tr_start: run %d\n", rn);
     612              : 
     613            0 :    eq_info = &equipment[0].info;
     614              : 
     615              :    /* Get update eq_info from ODB */
     616            0 :    sprintf(str, "/Equipment/%s/Common", equipment[0].name);
     617            0 :    status = db_find_key(hDB, 0, str, &hKey);
     618            0 :    size = sizeof(EQUIPMENT_INFO);
     619            0 :    db_get_record(hDB, hKey, eq_info, &size, 0);
     620              : 
     621            0 :    ebset.nfragment = nfragment;
     622              : 
     623              :    /* reset serial numbers */
     624            0 :    for (i = 0; equipment[i].name[0]; i++) {
     625            0 :       equipment[i].serial_number = 1;
     626            0 :       equipment[i].subevent_number = 0;
     627            0 :       equipment[i].stats.events_sent = 0;
     628            0 :       equipment[i].odb_in = equipment[i].odb_out = 0;
     629              :    }
     630              : 
     631              :    /* Get / Set Settings */
     632            0 :    sprintf(str, "/Equipment/%s/Settings", equipment[0].name);
     633            0 :    if (db_find_key(hDB, 0, str, &hEqkey) != DB_SUCCESS) {
     634            0 :       status = db_create_record(hDB, 0, str, strcomb1(ebuilder_str).c_str());
     635              :    }
     636              : 
     637              :    /* Keep Key on Ebuilder/Settings */
     638            0 :    sprintf(str, "/Equipment/%s/Settings", equipment[0].name);
     639            0 :    if (db_find_key(hDB, 0, str, &hEqkey) != DB_SUCCESS) {
     640            0 :       cm_msg(MINFO, "tr_start", "/Equipment/%s/Settings not found", equipment[0].name);
     641              :    }
     642              : 
     643              :    /* Update or Create User_field */
     644            0 :    size = sizeof(ebset.user_field);
     645            0 :    status = db_get_value(hDB, hEqkey, "User Field", ebset.user_field, &size, TID_STRING, TRUE);
     646              : 
     647              :    /* Update or Create User_Build */
     648            0 :    size = sizeof(ebset.user_build);
     649            0 :    status = db_get_value(hDB, hEqkey, "User Build", &ebset.user_build, &size, TID_BOOL, TRUE);
     650              : 
     651              :    /* update ODB */
     652            0 :    size = sizeof(INT);
     653            0 :    status = db_set_value(hDB, hEqkey, "Number of Fragment", &ebset.nfragment, size, 1, TID_INT);
     654              : 
     655              :    /* Create or update the fragment request list */
     656            0 :    status = db_find_key(hDB, hEqkey, "Fragment Required", &hEqFRkey);
     657            0 :    status = db_get_key(hDB, hEqFRkey, &key);
     658            0 :    assert(status == DB_SUCCESS);
     659              : 
     660            0 :    if (key.num_values != ebset.nfragment) {
     661            0 :       cm_msg(MINFO, "tr_start", "Number of Fragment mismatch ODB:%d - CUR:%d", key.num_values, ebset.nfragment);
     662            0 :       free(ebset.preqfrag);
     663            0 :       size = ebset.nfragment * sizeof(BOOL);
     664            0 :       ebset.preqfrag = (BOOL*)malloc(size);
     665            0 :       for (i = 0; i < ebset.nfragment; i++)
     666            0 :          ebset.preqfrag[i] = TRUE;
     667            0 :       status =
     668            0 :          db_set_value(hDB, hEqkey, "Fragment Required", ebset.preqfrag, size, ebset.nfragment, TID_BOOL);
     669              :    } else {                     // Take from ODBedit
     670            0 :       size = key.total_size;
     671            0 :       free(ebset.preqfrag);
     672            0 :       ebset.preqfrag = (BOOL*)malloc(size);
     673            0 :       status = db_get_data(hDB, hEqFRkey, ebset.preqfrag, &size, TID_BOOL);
     674              :    }
     675              :    /* Cleanup fragment flags */
     676            0 :    free(ebset.received);
     677            0 :    ebset.received = (BOOL*)malloc(size);
     678            0 :    for (i = 0; i < ebset.nfragment; i++)
     679            0 :       ebset.received[i] = FALSE;
     680              : 
     681              :    /* Check if at least one fragment is requested */
     682            0 :    for (i = 0; i < ebset.nfragment; i++)
     683            0 :       if (ebset.preqfrag[i])
     684              :          break;
     685              : 
     686            0 :    if (i == ebset.nfragment) {
     687            0 :       cm_msg(MERROR, "tr_start", "Run start aborted because no fragment required");
     688            0 :       return 0;
     689              :    }
     690              : 
     691              :    /* Call BOR user function */
     692            0 :    status = eb_begin_of_run(run_number, ebset.user_field, error);
     693            0 :    if (status != EB_SUCCESS) {
     694            0 :       cm_msg(MERROR, "tr_start", "run start aborted due to eb_begin_of_run (%d)", status);
     695            0 :       return status;
     696              :    }
     697              : 
     698              :    /* Book all fragment */
     699            0 :    status = source_booking();
     700            0 :    if (status != SUCCESS)
     701              :       return status;
     702              : 
     703            0 :    if (!eq_info->enabled) {
     704            0 :       cm_msg(MINFO, "tr_start", "Event Builder disabled");
     705            0 :       return CM_SUCCESS;
     706              :    }
     707              : 
     708              :    /* local run state */
     709            0 :    run_state = STATE_RUNNING;
     710            0 :    run_number = rn;
     711            0 :    stop_requested = FALSE;
     712            0 :    abort_requested = FALSE;
     713            0 :    printf("%s-Starting New Run: %d\n", frontend_name, rn);
     714              : 
     715            0 :    if (1) {
     716            0 :      int fragn;
     717            0 :      time_t now = time(NULL);
     718              :      
     719              :      // reset timeouts
     720            0 :      for (fragn = 0; fragn < nfragment; fragn++) {
     721            0 :        ebch[fragn].time = now;
     722            0 :        ebch[fragn].timeout = 0;
     723              :      }
     724              :    }
     725              : 
     726              :    /* Reset global trigger mask */
     727              :    return CM_SUCCESS;
     728              : }
     729              : 
     730              : /*--------------------------------------------------------------------*/
     731            0 : INT tr_resume(INT rn, char *error)
     732              : {
     733            0 :   int fragn;
     734            0 :   time_t now = time(NULL);
     735              : 
     736            0 :    printf("\n%s-Resume Run: %d detected\n", frontend_name, rn);
     737              : 
     738              :    // reset timeouts
     739            0 :    for (fragn = 0; fragn < nfragment; fragn++) {
     740            0 :      ebch[fragn].time = now;
     741            0 :      ebch[fragn].timeout = 0;
     742              :    }
     743              : 
     744            0 :    run_state = STATE_RUNNING;
     745            0 :    return CM_SUCCESS;
     746              : }
     747              : 
     748              : /*--------------------------------------------------------------------*/
     749            0 : INT tr_pause(INT rn, char *error)
     750              : {
     751            0 :    printf("\n%s-Pause Run: %d detected\n", frontend_name, rn);
     752              : 
     753            0 :    run_state = STATE_PAUSED;
     754            0 :    return CM_SUCCESS;
     755              : }
     756              : 
     757              : /*--------------------------------------------------------------------*/
     758            0 : INT tr_stop(INT rn, char *error)
     759              : {
     760            0 :    waiting_for_stop = FALSE;
     761              : 
     762            0 :    if (debug)
     763            0 :      printf("tr_stop: run %d\n", rn);
     764              : 
     765              :    /* local stop */
     766            0 :    stop_requested = TRUE;
     767              : 
     768              :    /* local stop time */
     769            0 :    request_stop_time = ss_millitime();
     770            0 :    return CM_SUCCESS;
     771              : }
     772              : 
     773              : /*--------------------------------------------------------------------*/
     774            0 : void free_event_buffer(INT nfrag)
     775              : {
     776            0 :    INT i;
     777            0 :    for (i = 0; i < nfrag; i++) {
     778            0 :       if (ebch[i].pfragment) {
     779            0 :          free(ebch[i].pfragment);
     780            0 :          ebch[i].pfragment = NULL;
     781              :       }
     782              :    }
     783            0 : }
     784              : 
     785              : /*--------------------------------------------------------------------*/
     786            0 : INT handFlush()
     787              : {
     788            0 :    int i, size, status;
     789            0 :    char strout[256];
     790              : 
     791              :    /* Do Hand flush until better way to  garantee the input buffer to be empty */
     792            0 :    if (debug)
     793            0 :       printf("Hand flushing system buffer... \n");
     794            0 :    for (i = 0; i < nfragment; i++) {
     795            0 :       do { 
     796            0 :          status = 0;
     797            0 :          if (ebset.preqfrag[i]) {
     798            0 :             size = max_event_size;
     799            0 :             status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, BM_NO_WAIT);
     800            0 :             if (debug1) {
     801            0 :                sprintf(strout,
     802              :                        "booking:Hand flush bm_receive_event[%d] hndle:%d stat:%d  Last Ser:%d",
     803            0 :                        i, ebch[i].hBuf, status, ((EVENT_HEADER *) ebch[i].pfragment)->serial_number);
     804            0 :                printf("%s\n", strout);
     805              :             }
     806              :          }
     807            0 :       } while (status == BM_SUCCESS);
     808              :    }
     809              : 
     810              :    /* Empty source buffer */
     811            0 :    status = bm_empty_buffers();
     812            0 :    if (status != BM_SUCCESS)
     813            0 :       cm_msg(MERROR, "handFlush", "bm_empty_buffers failure [%d]", status);
     814            0 :    run_state = STATE_STOPPED;
     815            0 :    return status;
     816              : }
     817              : 
     818              : 
     819              : /*--------------------------------------------------------------------*/
     820            0 : INT source_booking()
     821              : {
     822            0 :    INT j, i, status, status1, status2;
     823              : 
     824            0 :    if (debug)
     825            0 :       printf("Entering booking\n");
     826              : 
     827              :    status1 = status2 = 0;
     828              : 
     829              :    /* Book all the source channels */
     830            0 :    for (i = 0; i < nfragment; i++) {
     831              :       /* Book only the requested event mask */
     832            0 :       if (ebset.preqfrag[i]) {
     833              :          /* Connect channel to source buffer */
     834            0 :          status1 = bm_open_buffer(ebch[i].buffer, DEFAULT_BUFFER_SIZE, &(ebch[i].hBuf));
     835              : 
     836            0 :          if (debug)
     837            0 :             printf("bm_open_buffer frag:%d buf:%s handle:%d stat:%d\n",
     838            0 :                    i, ebch[i].buffer, ebch[i].hBuf, status1);
     839              : 
     840            0 :          if (1)
     841              :            {
     842            0 :              int level = 0;
     843            0 :              bm_get_buffer_level(ebch[i].hBuf, &level);
     844            0 :              printf("Buffer %s, level %d, info: \n", ebch[i].buffer, level);
     845              :            }
     846              : 
     847              : 
     848              :          /* Register for specified channel event ID and Trigger mask */
     849            0 :          status2 =
     850            0 :              bm_request_event(ebch[i].hBuf, ebch[i].event_id,
     851              :                               TRIGGER_ALL, GET_ALL, &ebch[i].req_id, NULL);
     852            0 :          if (debug)
     853            0 :             printf("bm_request_event frag:%d id:%d msk:%d req_id:%d stat:%d\n",
     854            0 :                    i, ebch[i].event_id, ebch[i].trigger_mask, ebch[i].req_id, status2);
     855            0 :          if (((status1 != BM_SUCCESS) && (status1 != BM_CREATED)) ||
     856            0 :              ((status2 != BM_SUCCESS) && (status2 != BM_CREATED))) {
     857            0 :             cm_msg(MERROR, "source_booking",
     858              :                    "Open buffer/event request failure [%d %d %d]", i, status1, status2);
     859            0 :             return BM_CONFLICT;
     860              :          }
     861              : 
     862              :          /* allocate local source event buffer */
     863            0 :          if (ebch[i].pfragment)
     864            0 :             free(ebch[i].pfragment);
     865            0 :          ebch[i].pfragment = (char *) malloc(max_event_size + sizeof(EVENT_HEADER));
     866            0 :          if (debug)
     867            0 :             printf("malloc pevent frag:%d pevent:%p\n", i, ebch[i].pfragment);
     868            0 :          if (ebch[i].pfragment == NULL) {
     869            0 :             free_event_buffer(nfragment);
     870            0 :             cm_msg(MERROR, "source_booking", "Can't allocate space for buffer");
     871            0 :             return BM_NO_MEMORY;
     872              :          }
     873              :       }
     874              :    }
     875              : 
     876              :    /* Empty source buffer */
     877            0 :    status = bm_empty_buffers();
     878            0 :    if (status != BM_SUCCESS) {
     879            0 :       cm_msg(MERROR, "source_booking", "bm_empty_buffers failure [%d]", status);
     880            0 :       return status;
     881              :    }
     882              : 
     883            0 :    if (debug) {
     884            0 :       printf("bm_empty_buffers stat:%d\n", status);
     885            0 :       for (j = 0; j < ebset.nfragment; j++) {
     886            0 :          printf(" buff:%s", ebch[j].buffer);
     887            0 :          printf(" ser#:%d", ebch[j].serial);
     888            0 :          printf(" hbuf:%2d", ebch[j].hBuf);
     889            0 :          printf(" rqid:%2d", ebch[j].req_id);
     890            0 :          printf(" opst:%d", status1);
     891            0 :          printf(" rqst:%d", status2);
     892            0 :          printf(" evid:%2d", ebch[j].event_id);
     893            0 :          printf(" tmsk:0x%4.4x\n", ebch[j].trigger_mask);
     894              :       }
     895              :    }
     896              : 
     897              :    return SUCCESS;
     898              : }
     899              : 
     900              : /*--------------------------------------------------------------------*/
     901            0 : INT source_unbooking()
     902              : {
     903            0 :    INT i, status;
     904              : 
     905              :    /* unbook all source channels */
     906            0 :    for (i = 0; i < nfragment; i++) {
     907              : 
     908              :    /* Skip unbooking if already done */
     909            0 :       if (ebch[i].pfragment != NULL) {
     910            0 :          bm_empty_buffers();
     911              : 
     912              :          /* Remove event ID registration */
     913            0 :          status = bm_delete_request(ebch[i].req_id);
     914            0 :          if (debug)
     915            0 :             printf("unbook: bm_delete_req[%d] req_id:%d stat:%d\n", i, ebch[i].req_id, status);
     916              : 
     917              :          /* Close source buffer */
     918            0 :          status = bm_close_buffer(ebch[i].hBuf);
     919            0 :          if (debug)
     920            0 :             printf("unbook: bm_close_buffer[%d] hndle:%d stat:%d\n", i, ebch[i].hBuf, status);
     921            0 :          if (status != BM_SUCCESS) {
     922            0 :             cm_msg(MERROR, "source_unbooking", "Close buffer[%d] stat: %d", i, status);
     923            0 :             return status;
     924              :          }
     925              :       }
     926              :    }
     927              : 
     928              :    /* release local event buffer memory */
     929            0 :    free_event_buffer(nfragment);
     930              : 
     931            0 :    return EB_SUCCESS;
     932              : }
     933              : 
     934              : /*--------------------------------------------------------------------*/
     935            0 : INT close_buffers(void)
     936              : {
     937            0 :    INT status;
     938            0 :    char error[256];
     939            0 :    EQUIPMENT *eq;
     940              : 
     941            0 :    eq = &equipment[0];
     942              : 
     943              :    /* Flush local destination cache */
     944            0 :    bm_flush_cache(equipment[0].buffer_handle, BM_WAIT);
     945              :    /* Call user function */
     946            0 :    eb_end_of_run(run_number, error);
     947              :    /* Cleanup buffers */
     948            0 :    handFlush();
     949              :    /* Detach all source from midas */
     950            0 :    status = source_unbooking();
     951              : 
     952              :    /* Compose message */
     953            0 :    stop_time = ss_millitime() - request_stop_time;
     954            0 :    sprintf(error, "Run %d Stop after %1.0lf + %d events sent DT:%d[ms]",
     955              :            run_number, eq->stats.events_sent, eq->events_sent, stop_time);
     956            0 :    cm_msg(MINFO, "close_buffers", "%s", error);
     957              : 
     958            0 :    run_state = STATE_STOPPED;
     959            0 :    abort_requested = FALSE;
     960            0 :    return status;
     961              : }
     962              : 
     963              : /********************************************************************/
     964              : /**
     965              : Scan all the fragment source once per call.
     966              : 
     967              : -# This will retrieve the full midas event not swapped (except the
     968              : MIDAS_HEADER) for each fragment if possible. The fragment will
     969              : be stored in the channel event pointer.
     970              : -# if after a full nfrag path some frag are still not collected, it
     971              : returns with the frag# missing for timeout check.
     972              : -# If ALL fragments are present it will check the midas serial#
     973              : for a full match across all the fragments.
     974              : -# If the serial check fails it returns with "event mismatch"
     975              : and will abort the event builder but not stop the run for now.
     976              : -# If the serial check is passed, it will call the user_build function
     977              : where the destination event is going to be composed.
     978              : 
     979              : @param fmt Fragment format type 
     980              : @param eq_info Equipment pointer
     981              : @return   EB_NO_MORE_EVENT, EB_COMPOSE_TIMEOUT
     982              : if different then SUCCESS (bm_compose, rpc_sent error)
     983              : */
     984            0 : INT source_scan(INT fmt, EQUIPMENT_INFO * eq_info)
     985              : {
     986            0 :    static DWORD serial;
     987            0 :    INT i, status, size;
     988            0 :    INT act_size;
     989            0 :    BOOL found, event_mismatch;
     990            0 :    BANK_HEADER *psbh;
     991              : 
     992            0 :    status = 0;
     993              : 
     994              :    /* Scan all channels at least once */
     995            0 :    for (i = 0; i < nfragment; i++) {
     996              :       /* Check if current channel needs to be received */
     997            0 :       if (ebset.preqfrag[i] && !ebset.received[i]) {
     998              :          /* Get fragment and store it in ebch[i].pfragment */
     999            0 :          size = max_event_size;
    1000            0 :          status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, BM_NO_WAIT);
    1001              :          //printf("call bm_receive_event from %s, serial %d, status %d\n", ebch[i].buffer, serial, status);
    1002            0 :          switch (status) {
    1003            0 :          case BM_SUCCESS:      /* event received */
    1004              :             /* Mask event */
    1005            0 :             ebset.received[i] = TRUE;
    1006              :             /* Keep local serial */
    1007            0 :             ebch[i].serial = ((EVENT_HEADER *) ebch[i].pfragment)->serial_number;
    1008              :             /* clear timeout */
    1009            0 :             ebch[i].timeout = 0;
    1010            0 :             ebch[i].time = time(NULL);
    1011              : 
    1012              :             /* Swap event depending on data format */
    1013            0 :             switch (fmt) {
    1014            0 :             case FORMAT_MIDAS:
    1015            0 :                psbh = (BANK_HEADER *) (((EVENT_HEADER *) ebch[i].pfragment) + 1);
    1016            0 :                bk_swap(psbh, FALSE);
    1017              :                break;
    1018              :             }
    1019              : 
    1020            0 :             if (debug1) {
    1021            0 :                printf("SUCC: ch:%d ser:%d rec:%d sz:%d\n", i, ebch[i].serial, ebset.received[i], size);
    1022              :             }
    1023              :             break;
    1024            0 :          case BM_ASYNC_RETURN: /* timeout */
    1025            0 :             ebch[i].timeout++;
    1026            0 :             if (debug1) {
    1027            0 :               printf("ASYNC: ch:%d ser:%d rec:%d sz:%d, timeout:%d\n", i, ebch[i].serial, ebset.received[i], size, ebch[i].timeout);
    1028              :             }
    1029              :             //return status;
    1030              :             break;
    1031            0 :          default:              /* Error */
    1032            0 :             cm_msg(MERROR, "source_scan", "bm_receive_event error %d", status);
    1033            0 :             return status;
    1034              :             break;
    1035              :          }
    1036              :       }                         /* next channel */
    1037              :    }
    1038              : 
    1039              :    /* Check if all fragments have been received */
    1040            0 :    for (i = 0; i < nfragment; i++) {
    1041            0 :       if (ebset.preqfrag[i] && !ebset.received[i])
    1042              :          break;
    1043              :    }
    1044            0 :    if (i == nfragment) {
    1045              :       /* Check if serial matches */
    1046            0 :       found = event_mismatch = FALSE;
    1047            0 :       serial = 0;
    1048              :       /* Check Serial, mark first serial */
    1049            0 :       for (i = 0; i < nfragment; i++) {
    1050            0 :          if (ebset.preqfrag[i] && ebset.received[i] && !found) {
    1051            0 :             serial = ebch[i].serial;
    1052            0 :             found = TRUE;
    1053              :          } else {
    1054            0 :             if (ebset.preqfrag[i] && ebset.received[i] && (serial != ebch[i].serial)) {
    1055              :                /* Event mismatch */
    1056            0 :                event_mismatch = TRUE;
    1057              :             }
    1058              :          }
    1059              :       }
    1060              : 
    1061              :       /* internal action in case of event mismatch */
    1062            0 :       if (event_mismatch && debug) {
    1063            0 :          char str[256];
    1064            0 :          char strsub[128];
    1065            0 :          strcpy(str, "event mismatch: ");
    1066            0 :          for (i = 0; i < nfragment; i++) {
    1067            0 :             sprintf(strsub, "Ser[%d]:%d ", i, ebch[i].serial);
    1068            0 :             strcat(str, strsub);
    1069              :          }
    1070            0 :          printf("event serial mismatch %s\n", str);
    1071              :       }
    1072              : 
    1073              :       /* In any case reset destination buffer */
    1074            0 :       memset(dest_event, 0, sizeof(EVENT_HEADER));
    1075            0 :       act_size = 0;
    1076              : 
    1077              :       /* Fill reserved header space of destination event with
    1078              :          final header information */
    1079            0 :       bm_compose_event((EVENT_HEADER *) dest_event, eq_info->event_id, eq_info->trigger_mask,
    1080              :                        act_size, serial);
    1081              : 
    1082              :       /* Pass fragments to user with mismatch flag, for final check before assembly */
    1083            0 :       status =
    1084            0 :           eb_user(nfragment, event_mismatch, ebch, (EVENT_HEADER *) dest_event,
    1085            0 :                   (void *) ((EVENT_HEADER *) dest_event + 1), &act_size);
    1086            0 :       if (status != EB_SUCCESS) {
    1087            0 :          if (status == EB_SKIP) {
    1088              :             /* Reset mask and timeouts as if event has been successfully send out */
    1089            0 :             for (i = 0; i < nfragment; i++) {
    1090            0 :                ebch[i].timeout = 0;
    1091            0 :                ebset.received[i] = FALSE;
    1092              :             }
    1093              :          }
    1094            0 :          return status;         // Event mark as EB_SKIP or EB_ABORT by user
    1095              :       }
    1096              : 
    1097              :       /* Allow bypass of fragment assembly if user did it on its own */
    1098            0 :       if (!ebset.user_build) {
    1099            0 :          for (i = 0; i < nfragment; i++) {
    1100            0 :             if (ebset.preqfrag[i]) {
    1101            0 :                status = meb_fragment_add(dest_event, ebch[i].pfragment, &act_size);
    1102            0 :                if (status != EB_SUCCESS) {
    1103            0 :                   cm_msg(MERROR, "source_scan",
    1104              :                          "compose fragment:%d current size:%d (%d)", i, act_size, status);
    1105            0 :                   return EB_ERROR;
    1106              :                }
    1107              :             }
    1108              :          }
    1109              :       }
    1110              : 
    1111              :       /* Overall event to be sent */
    1112            0 :       act_size = ((EVENT_HEADER *) dest_event)->data_size + sizeof(EVENT_HEADER);
    1113              : 
    1114              :       /* Send event and wait for completion */
    1115            0 :       status = rpc_send_event(equipment[0].buffer_handle, (EVENT_HEADER *) dest_event, act_size, BM_WAIT, 0);
    1116            0 :       if (status != BM_SUCCESS) {
    1117            0 :          if (debug)
    1118            0 :             printf("rpc_send_event returned error %d, event_size %d\n", status, act_size);
    1119            0 :          cm_msg(MERROR, "source_scan", "%s: rpc_send_event returned error %d", frontend_name, status);
    1120            0 :          return EB_ERROR;
    1121              :       }
    1122              : 
    1123              :       /* Keep track of the total byte count */
    1124            0 :       equipment[0].bytes_sent += act_size;
    1125              : 
    1126              :       /* update destination event count */
    1127            0 :       equipment[0].events_sent++;
    1128              : 
    1129              :       /* Reset mask and timeouts as the event has been successfully sent */
    1130            0 :       for (i = 0; i < nfragment; i++) {
    1131            0 :          ebch[i].timeout = 0;
    1132            0 :          ebset.received[i] = FALSE;
    1133              :       }
    1134              :    }                            // all fragment recieved for this event
    1135              : 
    1136              :    return status;
    1137              : }
    1138              : 
    1139              : /*--------------------------------------------------------------------*/
    1140            0 : int main(int argc, char **argv)
    1141              : {
    1142            0 :    INT status, size, rstate;
    1143            0 :    int i;
    1144            0 :    BOOL daemon = FALSE;
    1145            0 :    HNDLE hEqkey;
    1146            0 :    EBUILDER(ebuilder_str);
    1147            0 :    char str[128];
    1148            0 :    int auto_restart = 0;
    1149            0 :    int restart_count = 0;
    1150              : 
    1151              :    /* init structure */
    1152            0 :    memset(&ebch[0], 0, sizeof(ebch));
    1153              : 
    1154              :    /* set default */
    1155            0 :    cm_get_environment(host_name, sizeof(host_name), expt_name, sizeof(expt_name));
    1156              : 
    1157              :    /* set default buffer name */
    1158            0 :    strcpy(buffer_name, "SYSTEM");
    1159              : 
    1160              :    /* get parameters */
    1161            0 :    for (i = 1; i < argc; i++) {
    1162            0 :       if (argv[i][0] == '-' && argv[i][1] == 'd')
    1163            0 :          debug = TRUE;
    1164            0 :       else if (argv[i][0] == '-' && argv[i][1] == 'D')
    1165              :          daemon = TRUE;
    1166            0 :       else if (argv[i][0] == '-' && argv[i][1] == 'w')
    1167            0 :          wheel = TRUE;
    1168            0 :       else if (argv[i][0] == '-') {
    1169            0 :          if (i + 1 >= argc || argv[i + 1][0] == '-')
    1170            0 :             goto usage;
    1171            0 :          if (strncmp(argv[i], "-e", 2) == 0)
    1172            0 :             strcpy(expt_name, argv[++i]);
    1173            0 :          else if (strncmp(argv[i], "-h", 2) == 0)
    1174            0 :             strcpy(host_name, argv[++i]);
    1175            0 :          else if (strncmp(argv[i], "-b", 2) == 0)
    1176            0 :             strcpy(buffer_name, argv[++i]);
    1177              :       } else {
    1178            0 :        usage:
    1179            0 :          printf("usage: mevb [-h <Hostname>] [-e <Experiment>] [-b <buffername>] [-d] [-w] [-D]\n");
    1180            0 :          printf("  [-h <Hostname>]    Host where midas experiment is running on\n");
    1181            0 :          printf("  [-e <Experiment>]  Midas experiment if more than one exists\n");
    1182            0 :          printf("  [-b <buffername>]  Specify evnet buffer name, use \"SYSTEM\" by default\n");
    1183            0 :          printf("  [-d]               Print debugging output\n");
    1184            0 :          printf("  [-w]               Show wheel\n");
    1185            0 :          printf("  [-D]               Start as a daemon\n");
    1186            0 :          return 0;
    1187              :       }
    1188              :    }
    1189              : 
    1190            0 :    printf("MIDAS example event builder. Press \"!\" to exit.\n");
    1191              : 
    1192            0 :    if (daemon) {
    1193            0 :       printf("Becoming a daemon...\n");
    1194            0 :       ss_daemon_init(FALSE);
    1195              :    }
    1196              : 
    1197              :    /* Connect to experiment */
    1198            0 :    status = cm_connect_experiment(host_name, expt_name, frontend_name, NULL);
    1199            0 :    if (status != CM_SUCCESS) {
    1200            0 :       ss_sleep(5000);
    1201            0 :       goto exit;
    1202              :    }
    1203              : 
    1204            0 :    if (debug)
    1205            0 :       cm_set_watchdog_params(TRUE, 0);
    1206              : 
    1207              :    /* Connect to ODB */
    1208            0 :    status = cm_get_experiment_database(&hDB, &hKey);
    1209            0 :    if (status != EB_SUCCESS) {
    1210            0 :       ss_sleep(5000);
    1211            0 :       goto exit;
    1212              :    }
    1213              : 
    1214              :    /* check if Ebuilder is already running */
    1215            0 :    status = cm_exist(frontend_name, FALSE);
    1216            0 :    if (status == CM_SUCCESS) {
    1217            0 :       cm_msg(MERROR, "main", "%s running already!.", frontend_name);
    1218            0 :       cm_disconnect_experiment();
    1219            0 :       goto exit;
    1220              :    }
    1221              : 
    1222              :    /* Check if run in progess if so abort */
    1223            0 :    size = sizeof(rstate);
    1224            0 :    db_get_value(hDB, 0, "/Runinfo/State", &rstate, &size, TID_INT, FALSE);
    1225            0 :    if (rstate != STATE_STOPPED) {
    1226            0 :       cm_msg(MERROR, "main", "Run in Progress, EBuilder aborted!.");
    1227            0 :       cm_disconnect_experiment();
    1228            0 :       goto exit;
    1229              :    }
    1230              : 
    1231            0 :    if (ebuilder_init() != SUCCESS) {
    1232            0 :       cm_disconnect_experiment();
    1233              :       /* let user read message before window might close */
    1234            0 :       ss_sleep(5000);
    1235            0 :       goto exit;
    1236              :    }
    1237              : 
    1238              :    /* Register single equipment */
    1239            0 :    status = register_equipment();
    1240            0 :    if (status != EB_SUCCESS) {
    1241            0 :       ss_sleep(5000);
    1242            0 :       goto exit;
    1243              :    }
    1244              : 
    1245              :    /* Load Fragment info */
    1246            0 :    status = load_fragment();
    1247            0 :    if (status != EB_SUCCESS) {
    1248            0 :       ss_sleep(5000);
    1249            0 :       goto exit;
    1250              :    }
    1251              : 
    1252              :    /* Register transition for reset counters */
    1253            0 :    if (cm_register_transition(TR_START, tr_start, 400) != CM_SUCCESS)
    1254              :       return status;
    1255            0 :    if (cm_register_transition(TR_RESUME, tr_resume, 400) != CM_SUCCESS)
    1256              :       return status;
    1257            0 :    if (cm_register_transition(TR_PAUSE, tr_pause, 600) != CM_SUCCESS)
    1258            0 :       goto exit;
    1259            0 :    if (cm_register_transition(TR_STOP, tr_stop, 600) != CM_SUCCESS)
    1260            0 :       goto exit;
    1261              : 
    1262            0 :  restart:
    1263              : 
    1264              :    /* Set Initial EB/Settings */
    1265            0 :    sprintf(str, "/Equipment/%s/Settings", equipment[0].name);
    1266            0 :    if (db_find_key(hDB, 0, str, &hEqkey) != DB_SUCCESS) {
    1267            0 :       status = db_create_record(hDB, 0, str, strcomb1(ebuilder_str).c_str());
    1268              :    }
    1269              : 
    1270            0 :    if (auto_restart && restart_count > 0)
    1271              :      {
    1272            0 :        int run_number = 0;
    1273            0 :        int size = sizeof(run_number);
    1274            0 :        status = db_get_value(hDB, 0, "Runinfo/Run number", &run_number, &size, TID_INT, TRUE);
    1275            0 :        assert(status == SUCCESS);
    1276              : 
    1277            0 :        cm_msg(MINFO, frontend_name, "Restart the run!");
    1278              : 
    1279            0 :        cm_transition(TR_START, run_number+1, NULL, 0, TR_SYNC, 0);
    1280              :      }
    1281              :      
    1282              :    /* initialize ss_getchar */
    1283            0 :    ss_getchar(0);
    1284              : 
    1285              :    /* Scan fragments... will stay in */
    1286            0 :    status = scan_fragment();
    1287            0 :    printf("%s-Out of scan_fragment\n", frontend_name);
    1288              : 
    1289              :    /* Detach all source from midas */
    1290            0 :    printf("%s-Unbooking\n", frontend_name);
    1291            0 :    source_unbooking();
    1292              : 
    1293            0 :    ebuilder_exit();
    1294              : 
    1295            0 :    auto_restart = 0;
    1296            0 :    db_get_value(hDB, 0, "/Logger/Auto restart", &auto_restart, &size, TID_BOOL, FALSE);
    1297              : 
    1298            0 :    cm_msg(MINFO, frontend_name, "evb exit status %d, auto_restart %d", status, auto_restart);
    1299              : 
    1300            0 :    if (status == EB_USER_ERROR)
    1301              :      {
    1302            0 :        restart_count ++;
    1303            0 :        goto restart;
    1304              :      }
    1305              : 
    1306              :    /* reset terminal */
    1307            0 :    ss_getchar(TRUE);
    1308              : 
    1309            0 :  exit:
    1310              :    /* Free local memory */
    1311            0 :    free_event_buffer(ebset.nfragment);
    1312              : 
    1313              :    /* Clean disconnect from midas */
    1314            0 :    cm_disconnect_experiment();
    1315              :    return 0;
    1316              : }
        

Generated by: LCOV version 2.0-1