LCOV - code coverage report
Current view: top level - src - tmfe.cxx (source / functions) Coverage Total Hit
Test: coverage.info Lines: 4.6 % 1384 63
Test Date: 2025-11-11 10:26:08 Functions: 5.6 % 107 6

            Line data    Source code
       1              : /********************************************************************\
       2              : 
       3              :   Name:         tmfe.cxx
       4              :   Created by:   Konstantin Olchanski - TRIUMF
       5              : 
       6              :   Contents:     C++ MIDAS frontend
       7              : 
       8              : \********************************************************************/
       9              : 
      10              : #undef NDEBUG // midas required assert() to be always enabled
      11              : 
      12              : #include <stdio.h>
      13              : #include <stdarg.h>
      14              : #include <assert.h>
      15              : #include <signal.h> // signal()
      16              : #include <sys/time.h> // gettimeofday()
      17              : 
      18              : #include "tmfe.h"
      19              : 
      20              : #include "midas.h"
      21              : #include "msystem.h"
      22              : #include "mrpc.h"
      23              : #include "mstrlcpy.h"
      24              : 
      25              : //////////////////////////////////////////////////////////////////////
      26              : // error handling
      27              : //////////////////////////////////////////////////////////////////////
      28              : 
      29            0 : TMFeResult TMFeErrorMessage(const std::string& message)
      30              : {
      31            0 :    return TMFeResult(0, message);
      32              : }
      33              : 
      34            0 : TMFeResult TMFeMidasError(const std::string& message, const char* midas_function_name, int midas_status)
      35              : {
      36            0 :    return TMFeResult(midas_status, message + msprintf(", %s() status %d", midas_function_name, midas_status));
      37              : }
      38              : 
      39              : //////////////////////////////////////////////////////////////////////
      40              : // TMFE singleton class
      41              : //////////////////////////////////////////////////////////////////////
      42              : 
      43              : 
      44            1 : TMFE::TMFE() // ctor
      45              : {
      46            1 :    if (gfVerbose)
      47            0 :       printf("TMFE::ctor!\n");
      48            1 : }
      49              : 
      50            0 : TMFE::~TMFE() // dtor
      51              : {
      52            0 :    if (gfVerbose)
      53            0 :       printf("TMFE::dtor!\n");
      54            0 :    assert(!"TMFE::~TMFE(): destruction of the TMFE singleton is not permitted!");
      55            0 : }
      56              : 
      57            1 : TMFE* TMFE::Instance()
      58              : {
      59            1 :    if (!gfMFE)
      60            1 :       gfMFE = new TMFE();
      61              :    
      62            1 :    return gfMFE;
      63              : }
      64              : 
      65            1 : TMFeResult TMFE::Connect(const char* progname, const char* hostname, const char* exptname)
      66              : {
      67            1 :    if (progname)
      68            1 :       fProgramName = progname;
      69              : 
      70            1 :    if (fProgramName.empty()) {
      71            0 :       return TMFeErrorMessage("TMFE::Connect: frontend program name is not set");
      72              :    }
      73              : 
      74            1 :    fHostname = ss_gethostname();
      75              : 
      76              :    int status;
      77              :   
      78            1 :    std::string env_hostname;
      79            1 :    std::string env_exptname;
      80              :    
      81              :    /* get default from environment */
      82            1 :    status = cm_get_environment(&env_hostname, &env_exptname);
      83              : 
      84            1 :    if (status != CM_SUCCESS) {
      85            0 :       return TMFeMidasError("Cannot connect to MIDAS", "cm_get_environment", status);
      86              :    }
      87              : 
      88            1 :    if (hostname && hostname[0]) {
      89            0 :       fMserverHostname = hostname;
      90              :    } else {
      91            1 :       fMserverHostname = env_hostname;
      92              :    }
      93              :    
      94            1 :    if (exptname && exptname[0]) {
      95            0 :       fExptname = exptname;
      96              :    } else {
      97            1 :       fExptname = env_exptname;
      98              :    }
      99              : 
     100            1 :    if (gfVerbose) {
     101            0 :       printf("TMFE::Connect: Program \"%s\" connecting to experiment \"%s\" on host \"%s\"\n", fProgramName.c_str(), fExptname.c_str(), fMserverHostname.c_str());
     102              :    }
     103              :    
     104            1 :    int watchdog = DEFAULT_WATCHDOG_TIMEOUT;
     105              :    //int watchdog = 60*1000;
     106              :    
     107            1 :    status = cm_connect_experiment1(fMserverHostname.c_str(), fExptname.c_str(), fProgramName.c_str(), NULL, DEFAULT_ODB_SIZE, watchdog);
     108              :    
     109            1 :    if (status == CM_UNDEF_EXP) {
     110            0 :       return TMFeMidasError(msprintf("Cannot connect to MIDAS, experiment \"%s\" is not defined", fExptname.c_str()), "cm_connect_experiment1", status);
     111            1 :    } else if (status != CM_SUCCESS) {
     112            0 :       return TMFeMidasError("Cannot connect to MIDAS", "cm_connect_experiment1", status);
     113              :    }
     114              : 
     115            1 :    status = cm_get_experiment_database(&fDB, NULL);
     116            1 :    if (status != CM_SUCCESS) {
     117            0 :       return TMFeMidasError("Cannot connect to MIDAS", "cm_get_experiment_database", status);
     118              :    }
     119              : 
     120            1 :    fOdbRoot = MakeMidasOdb(fDB);
     121              : 
     122            1 :    int run_state = 0;
     123              : 
     124            1 :    fOdbRoot->RI("Runinfo/state", &run_state);
     125            1 :    fOdbRoot->RI("Runinfo/run number", &fRunNumber);
     126              : 
     127            1 :    if (run_state == STATE_RUNNING) {
     128            0 :       fStateRunning = true;
     129            1 :    } else if (run_state == STATE_PAUSED) {
     130            0 :       fStateRunning = true;
     131              :    } else {
     132            1 :       fStateRunning = false;
     133              :    }
     134              : 
     135            1 :    RegisterRPCs();
     136              : 
     137            1 :    if (gfVerbose) {
     138            0 :       printf("TMFE::Connect: Program \"%s\" connected to experiment \"%s\" on host \"%s\"\n", fProgramName.c_str(), fExptname.c_str(), fMserverHostname.c_str());
     139              :    }
     140              : 
     141            1 :    return TMFeOk();
     142            1 : }
     143              : 
     144            0 : TMFeResult TMFE::SetWatchdogSec(int sec)
     145              : {
     146            0 :    if (sec == 0) {
     147            0 :       cm_set_watchdog_params(false, 0);
     148              :    } else {
     149            0 :       cm_set_watchdog_params(true, sec*1000);
     150              :    }
     151            0 :    return TMFeOk();
     152              : }
     153              : 
     154            1 : TMFeResult TMFE::Disconnect()
     155              : {
     156            1 :    if (gfVerbose)
     157            0 :       printf("TMFE::Disconnect: Disconnecting from experiment \"%s\" on host \"%s\"\n", fExptname.c_str(), fMserverHostname.c_str());
     158            1 :    StopRpcThread();
     159            1 :    cm_disconnect_experiment();
     160            1 :    if (gfVerbose)
     161            0 :       printf("TMFE::Disconnect: Disconnected from experiment \"%s\" on host \"%s\"\n", fExptname.c_str(), fMserverHostname.c_str());
     162            1 :    return TMFeOk();
     163              : }
     164              : 
     165              : /////////////////////////////////////////////////////////
     166              : //            event buffer functions
     167              : /////////////////////////////////////////////////////////
     168              : 
     169            0 : TMEventBuffer::TMEventBuffer(TMFE* mfe) // ctor
     170              : {
     171            0 :    assert(mfe != NULL);
     172            0 :    fMfe = mfe;
     173            0 : };
     174              : 
     175            0 : TMEventBuffer::~TMEventBuffer() // dtor
     176              : {
     177            0 :    CloseBuffer();
     178              : 
     179              :    // poison all pointers
     180            0 :    fMfe = NULL;
     181            0 : };
     182              : 
     183            0 : TMFeResult TMEventBuffer::OpenBuffer(const char* bufname, size_t bufsize)
     184              : {
     185            0 :    if (fBufHandle) {
     186            0 :       return TMFeErrorMessage(msprintf("Event buffer \"%s\" is already open", fBufName.c_str()));
     187              :    }
     188              :    
     189            0 :    fBufName = bufname;
     190              : 
     191            0 :    if (bufsize == 0)
     192            0 :       bufsize = DEFAULT_BUFFER_SIZE;
     193              : 
     194            0 :    int status = bm_open_buffer(fBufName.c_str(), bufsize, &fBufHandle);
     195              : 
     196            0 :    if (status != BM_SUCCESS && status != BM_CREATED) {
     197            0 :       return TMFeMidasError(msprintf("Cannot open event buffer \"%s\"", fBufName.c_str()), "bm_open_buffer", status);
     198              :    }
     199              : 
     200            0 :    fBufSize = 0;
     201            0 :    fBufMaxEventSize = 0;
     202              : 
     203            0 :    uint32_t buf_size = 0;
     204            0 :    uint32_t max_event_size = 0;
     205              : 
     206            0 :    fMfe->fOdbRoot->RU32("Experiment/MAX_EVENT_SIZE", &max_event_size);
     207            0 :    fMfe->fOdbRoot->RU32((std::string("Experiment/Buffer Sizes/") + bufname).c_str(), &buf_size);
     208              : 
     209            0 :    if (buf_size > 0) {
     210              :       // limit event size to half the buffer size, so we can buffer two events
     211            0 :       uint32_t xmax_event_size = buf_size / 2;
     212              :       // add extra margin
     213            0 :       if (xmax_event_size > 1024)
     214            0 :          xmax_event_size -= 1024;
     215            0 :       if (max_event_size > xmax_event_size)
     216            0 :          max_event_size = xmax_event_size;
     217              :    }
     218              : 
     219            0 :    fBufSize = buf_size;
     220            0 :    fBufMaxEventSize = max_event_size;
     221              : 
     222            0 :    if (fBufSize == 0) {
     223            0 :       return TMFeErrorMessage(msprintf("Cannot get buffer size for event buffer \"%s\"", fBufName.c_str()));
     224              :    }
     225              : 
     226            0 :    if (fBufMaxEventSize == 0) {
     227            0 :       return TMFeErrorMessage(msprintf("Cannot get MAX_EVENT_SIZE for event buffer \"%s\"", fBufName.c_str()));
     228              :    }
     229              : 
     230            0 :    printf("TMEventBuffer::OpenBuffer: Buffer \"%s\" size %d, max event size %d\n", fBufName.c_str(), (int)fBufSize, (int)fBufMaxEventSize);
     231              : 
     232            0 :    return TMFeOk();
     233              : }
     234              : 
     235            0 : TMFeResult TMEventBuffer::CloseBuffer()
     236              : {
     237            0 :    if (!fBufHandle)
     238            0 :       return TMFeOk();
     239              :    
     240            0 :    fBufRequests.clear(); // no need to cancel individual requests, they are gone after we close the buffer
     241              :    
     242            0 :    int status = bm_close_buffer(fBufHandle);
     243              :    
     244            0 :    if (status != BM_SUCCESS) {
     245            0 :       fBufHandle = 0;
     246            0 :       return TMFeMidasError(msprintf("Cannot close event buffer \"%s\"", fBufName.c_str()), "bm_close_buffer", status);
     247              :    }
     248              :    
     249            0 :    fBufHandle = 0;
     250            0 :    fBufSize = 0;
     251            0 :    fBufMaxEventSize = 0;
     252            0 :    fBufReadCacheSize = 0;
     253            0 :    fBufWriteCacheSize = 0;
     254              :    
     255            0 :    return TMFeOk();
     256              : }
     257              : 
     258            0 : TMFeResult TMEventBuffer::SetCacheSize(size_t read_cache_size, size_t write_cache_size)
     259              : {
     260            0 :    int status = bm_set_cache_size(fBufHandle, read_cache_size, write_cache_size);
     261              :    
     262            0 :    if (status != BM_SUCCESS) {
     263            0 :       return TMFeMidasError(msprintf("Cannot set event buffer \"%s\" cache sizes: read %d, write %d", fBufName.c_str(), (int)read_cache_size, (int)write_cache_size), "bm_set_cache_size", status);
     264              :    }
     265              : 
     266            0 :    fBufReadCacheSize = read_cache_size;
     267            0 :    fBufWriteCacheSize = write_cache_size;
     268              :    
     269            0 :    return TMFeOk();
     270              : }
     271              : 
     272            0 : TMFeResult TMEventBuffer::AddRequest(int event_id, int trigger_mask, const char* sampling_type_string)
     273              : {
     274            0 :    if (!fBufHandle) {
     275            0 :       return TMFeErrorMessage(msprintf("AddRequest: Error: Event buffer \"%s\" is not open", fBufName.c_str()));
     276              :    }
     277              : 
     278            0 :    int sampling_type = 0;
     279              :    
     280            0 :    if (strcmp(sampling_type_string, "GET_ALL")==0) {
     281            0 :       sampling_type = GET_ALL;
     282            0 :    } else if (strcmp(sampling_type_string, "GET_NONBLOCKING")==0) {
     283            0 :       sampling_type = GET_NONBLOCKING;
     284            0 :    } else if (strcmp(sampling_type_string, "GET_RECENT")==0) {
     285            0 :       sampling_type = GET_RECENT;
     286              :    } else {
     287            0 :       sampling_type = GET_ALL;
     288              :    }
     289              :    
     290            0 :    int request_id = 0;
     291              :       
     292            0 :    int status = bm_request_event(fBufHandle, event_id, trigger_mask, sampling_type, &request_id, NULL);
     293              :    
     294            0 :    if (status != BM_SUCCESS) {
     295            0 :       return TMFeMidasError(msprintf("Cannot make event request on buffer \"%s\"", fBufName.c_str()), "bm_request_event", status);
     296              :    }
     297              :    
     298            0 :    fBufRequests.push_back(request_id);
     299              :    
     300            0 :    return TMFeOk();
     301              : }
     302              : 
     303            0 : TMFeResult TMEventBuffer::ReceiveEvent(std::vector<char> *e, int timeout_msec)
     304              : {
     305            0 :    if (!fBufHandle) {
     306            0 :       return TMFeErrorMessage(msprintf("ReceiveEvent: Error: Event buffer \"%s\" is not open", fBufName.c_str()));
     307              :    }
     308              : 
     309            0 :    assert(e != NULL);
     310              :    
     311            0 :    e->resize(0);
     312              :    
     313            0 :    int status = bm_receive_event_vec(fBufHandle, e, timeout_msec);
     314              :    
     315            0 :    if (status == BM_ASYNC_RETURN) {
     316            0 :       return TMFeOk();
     317              :    }
     318              :    
     319            0 :    if (status != BM_SUCCESS) {
     320            0 :       return TMFeMidasError(msprintf("Cannot receive event on buffer \"%s\"", fBufName.c_str()), "bm_receive_event", status);
     321              :    }
     322              :    
     323            0 :    return TMFeOk();
     324              : }
     325              : 
     326            0 : TMFeResult TMEventBuffer::SendEvent(const char *e)
     327              : {
     328            0 :    const EVENT_HEADER *pevent = (const EVENT_HEADER*)e;
     329            0 :    const size_t event_size = sizeof(EVENT_HEADER) + pevent->data_size;
     330              :    //const size_t total_size = ALIGN8(event_size);
     331            0 :    return SendEvent(1, &e, &event_size);
     332              : }
     333              : 
     334            0 : TMFeResult TMEventBuffer::SendEvent(const std::vector<char>& e)
     335              : {
     336            0 :    const EVENT_HEADER *pevent = (const EVENT_HEADER*)e.data();
     337            0 :    const size_t event_size = sizeof(EVENT_HEADER) + pevent->data_size;
     338              :    //const size_t total_size = ALIGN8(event_size);
     339            0 :    if (e.size() != event_size) {
     340            0 :       return TMFeErrorMessage(msprintf("Cannot send event, size mismatch: vector size %d, data_size %d, event_size %d", (int)e.size(), (int)pevent->data_size, (int)event_size).c_str());
     341              :    }
     342              : 
     343            0 :    return SendEvent(1, (char**)&pevent, &event_size);
     344              : }
     345              : 
     346            0 : TMFeResult TMEventBuffer::SendEvent(const std::vector<std::vector<char>>& e)
     347              : {
     348            0 :    int sg_n = e.size();
     349            0 :    const char* sg_ptr[sg_n];
     350            0 :    size_t sg_len[sg_n];
     351            0 :    for (int i=0; i<sg_n; i++) {
     352            0 :       sg_ptr[i] = e[i].data();
     353            0 :       sg_len[i] = e[i].size();
     354              :    }
     355            0 :    return SendEvent(sg_n, sg_ptr, sg_len);
     356            0 : }
     357              : 
     358            0 : TMFeResult TMEventBuffer::SendEvent(int sg_n, const char* const sg_ptr[], const size_t sg_len[])
     359              : {
     360            0 :    int status = bm_send_event_sg(fBufHandle, sg_n, sg_ptr, sg_len, BM_WAIT);
     361              : 
     362            0 :    if (status == BM_CORRUPTED) {
     363            0 :       fMfe->Msg(MERROR, "TMEventBuffer::SendEvent", "Cannot send event to buffer \"%s\": bm_send_event() returned %d, event buffer is corrupted, shutting down the frontend", fBufName.c_str(), status);
     364            0 :       fMfe->fShutdownRequested = true;
     365            0 :       return TMFeMidasError("Cannot send event, event buffer is corrupted, shutting down the frontend", "bm_send_event", status);
     366            0 :    } else if (status != BM_SUCCESS) {
     367            0 :       fMfe->Msg(MERROR, "TMEventBuffer::SendEvent", "Cannot send event to buffer \"%s\": bm_send_event() returned %d", fBufName.c_str(), status);
     368            0 :       return TMFeMidasError("Cannot send event", "bm_send_event", status);
     369              :    }
     370              : 
     371            0 :    return TMFeOk();
     372              : }
     373              : 
     374            0 : TMFeResult TMEventBuffer::FlushCache(bool wait)
     375              : {
     376            0 :    if (!fBufHandle)
     377            0 :       return TMFeOk();
     378              :    
     379            0 :    int flag = BM_NO_WAIT;
     380            0 :    if (wait)
     381            0 :       flag = BM_WAIT;
     382              :    
     383              :    /* flush of event socket in no-wait mode does nothing */
     384            0 :    if (wait && rpc_is_remote()) {
     385            0 :       int status = bm_flush_cache(0, flag);
     386              : 
     387              :       //printf("bm_flush_cache(0,%d) status %d\n", flag, status);
     388              : 
     389            0 :       if (status == BM_SUCCESS) {
     390              :          // nothing
     391            0 :       } else if (status == BM_ASYNC_RETURN) {
     392              :          // nothing
     393              :       } else {
     394            0 :          return TMFeMidasError("Cannot flush mserver event socket", "bm_flush_cache", status);
     395              :       }
     396              :    }
     397              : 
     398            0 :    int status = bm_flush_cache(fBufHandle, flag);
     399              : 
     400              :    //printf("bm_flush_cache(%d,%d) status %d\n", fBufHandle, flag, status);
     401              : 
     402            0 :    if (status == BM_SUCCESS) {
     403              :       // nothing
     404            0 :    } else if (status == BM_ASYNC_RETURN) {
     405              :       // nothing
     406              :    } else {
     407            0 :       return TMFeMidasError(msprintf("Cannot flush event buffer \"%s\"", fBufName.c_str()).c_str(), "bm_flush_cache", status);
     408              :    }
     409              : 
     410            0 :    return TMFeOk();
     411              : }
     412              : 
     413            0 : TMFeResult TMFE::EventBufferOpen(TMEventBuffer** pbuf, const char* bufname, size_t bufsize)
     414              : {
     415            0 :    assert(pbuf != NULL);
     416            0 :    assert(bufname != NULL);
     417              :    
     418            0 :    std::lock_guard<std::mutex> guard(fEventBuffersMutex);
     419              : 
     420            0 :    for (auto b : fEventBuffers) {
     421            0 :       if (!b)
     422            0 :          continue;
     423              : 
     424            0 :       if (b->fBufName == bufname) {
     425            0 :          *pbuf = b;
     426            0 :          if (bufsize != 0 && bufsize > b->fBufSize) {
     427            0 :             Msg(MERROR, "TMFE::EventBufferOpen", "Event buffer \"%s\" size %d is smaller than requested size %d", b->fBufName.c_str(), (int)b->fBufSize, (int)bufsize);
     428              :          }
     429            0 :          return TMFeOk();
     430              :       }
     431              :    }
     432              : 
     433            0 :    TMEventBuffer *b = new TMEventBuffer(this);
     434              : 
     435            0 :    fEventBuffers.push_back(b);
     436              : 
     437            0 :    *pbuf = b;
     438              : 
     439            0 :    TMFeResult r = b->OpenBuffer(bufname, bufsize);
     440              : 
     441            0 :    if (r.error_flag) {
     442            0 :       return r;
     443              :    }
     444              : 
     445            0 :    return TMFeOk();
     446            0 : }
     447              : 
     448            0 : TMFeResult TMFE::EventBufferFlushCacheAll(bool wait)
     449              : {
     450            0 :    int flag = BM_NO_WAIT;
     451            0 :    if (wait)
     452            0 :       flag = BM_WAIT;
     453              : 
     454              :    /* flush of event socket in no-wait mode does nothing */
     455            0 :    if (wait && rpc_is_remote()) {
     456            0 :       int status = bm_flush_cache(0, flag);
     457              : 
     458              :       //printf("bm_flush_cache(0,%d) status %d\n", flag, status);
     459              : 
     460            0 :       if (status == BM_SUCCESS) {
     461              :          // nothing
     462            0 :       } else if (status == BM_ASYNC_RETURN) {
     463              :          // nothing
     464              :       } else {
     465            0 :          return TMFeMidasError("Cannot flush mserver event socket", "bm_flush_cache", status);
     466              :       }
     467              :    }
     468              : 
     469            0 :    std::lock_guard<std::mutex> guard(fEventBuffersMutex);
     470              : 
     471            0 :    for (auto b : fEventBuffers) {
     472            0 :       if (!b)
     473            0 :          continue;
     474              : 
     475            0 :       TMFeResult r = b->FlushCache(wait);
     476              : 
     477            0 :       if (r.error_flag)
     478            0 :          return r;
     479            0 :    }
     480              : 
     481            0 :    return TMFeOk();
     482            0 : }
     483              : 
     484            0 : TMFeResult TMFE::EventBufferCloseAll()
     485              : {
     486            0 :    std::lock_guard<std::mutex> guard(fEventBuffersMutex);
     487              : 
     488            0 :    for (auto b : fEventBuffers) {
     489            0 :       if (!b)
     490            0 :          continue;
     491            0 :       TMFeResult r = b->CloseBuffer();
     492            0 :       if (r.error_flag)
     493            0 :          return r;
     494              : 
     495            0 :       delete b;
     496            0 :    }
     497              : 
     498            0 :    fEventBuffers.clear();
     499              : 
     500            0 :    return TMFeOk();
     501            0 : }
     502              : 
     503              : /////////////////////////////////////////////////////////
     504              : //            equipment functions
     505              : /////////////////////////////////////////////////////////
     506              : 
     507            0 : double TMFrontend::FePeriodicTasks()
     508              : {
     509            0 :    double now = TMFE::GetTime();
     510              : 
     511            0 :    double next_periodic = now + 60;
     512              : 
     513            0 :    int n = fFeEquipments.size();
     514            0 :    for (int i=0; i<n; i++) {
     515            0 :       TMFeEquipment* eq = fFeEquipments[i];
     516            0 :       if (!eq)
     517            0 :          continue;
     518            0 :       if (!eq->fEqConfEnabled)
     519            0 :          continue;
     520            0 :       if (!eq->fEqConfEnablePeriodic)
     521            0 :          continue;
     522            0 :       double period = eq->fEqConfPeriodMilliSec/1000.0;
     523            0 :       if (period <= 0)
     524            0 :          continue;
     525            0 :       if (eq->fEqPeriodicNextCallTime == 0)
     526            0 :          eq->fEqPeriodicNextCallTime = now + 0.5; // we are off by 0.5 sec with updating of statistics
     527              :       //printf("periodic[%d] period %f, last call %f, next call %f (%f)\n", i, period, eq->fEqPeriodicLastCallTime, eq->fEqPeriodicNextCallTime, now - eq->fEqPeriodicNextCallTime);
     528            0 :       if (now >= eq->fEqPeriodicNextCallTime) {
     529            0 :          eq->fEqPeriodicNextCallTime += period;
     530              :          
     531            0 :          if (eq->fEqPeriodicNextCallTime < now) {
     532            0 :             if (TMFE::gfVerbose)
     533            0 :                printf("TMFE::EquipmentPeriodicTasks: periodic equipment \"%s\" skipped some beats!\n", eq->fEqName.c_str());
     534            0 :             fMfe->Msg(MERROR, "TMFE::EquipmentPeriodicTasks", "Equipment \"%s\" skipped some beats!", eq->fEqName.c_str());
     535            0 :             while (eq->fEqPeriodicNextCallTime < now) {
     536            0 :                eq->fEqPeriodicNextCallTime += period;
     537              :             }
     538              :          }
     539              :          
     540            0 :          if (fMfe->fStateRunning || !eq->fEqConfReadOnlyWhenRunning) {
     541            0 :             eq->fEqPeriodicLastCallTime = now;
     542              :             //printf("handler %d eq [%s] call HandlePeriodic()\n", i, h->fEq->fName.c_str());                     
     543            0 :             eq->HandlePeriodic();
     544              :          }
     545              :          
     546            0 :          now = TMFE::GetTime();
     547              :       }
     548              :       
     549            0 :       if (eq->fEqPeriodicNextCallTime < next_periodic)
     550            0 :          next_periodic = eq->fEqPeriodicNextCallTime;
     551              :    }
     552              : 
     553            0 :    now = TMFE::GetTime();
     554              : 
     555              :    // update statistics
     556            0 :    for (auto eq : fFeEquipments) {
     557            0 :       if (!eq)
     558            0 :          continue;
     559            0 :       if (!eq->fEqConfEnabled)
     560            0 :          continue;
     561            0 :       double next = eq->fEqStatNextWrite; // NOTE: this is not thread-safe, possible torn read of "double"
     562            0 :       if (now > next) {
     563            0 :          eq->EqWriteStatistics();
     564            0 :          next = eq->fEqStatNextWrite; // NOTE: this is not thread-safe, possible torn read of "double"
     565              :       }
     566            0 :       if (next < next_periodic)
     567            0 :          next_periodic = next;
     568              :    }
     569              : 
     570            0 :    now = TMFE::GetTime();
     571              : 
     572              :    // flush write cache
     573            0 :    if ((fFeFlushWriteCachePeriodSec > 0) && (now >= fFeFlushWriteCacheNextCallTime)) {
     574            0 :       fMfe->EventBufferFlushCacheAll(false);
     575            0 :       fFeFlushWriteCacheNextCallTime = now + fFeFlushWriteCachePeriodSec;
     576            0 :       if (fFeFlushWriteCacheNextCallTime < next_periodic)
     577            0 :          next_periodic = fFeFlushWriteCacheNextCallTime;
     578              :    }
     579              : 
     580            0 :    return next_periodic;
     581              : }
     582              : 
     583            0 : double TMFrontend::FePollTasks(double next_periodic_time)
     584              : {
     585              :    //printf("poll %f next %f diff %f\n", TMFE::GetTime(), next_periodic_time, next_periodic_time - TMFE::GetTime());
     586              :    
     587            0 :    double poll_sleep_sec = 9999.0;
     588            0 :    while (!fMfe->fShutdownRequested) {
     589            0 :       bool poll_again = false;
     590              :       // NOTE: ok to use range-based for() loop, there will be a crash if HandlePoll() or HandlePollRead() modify fEquipments, so they should not do that. K.O.
     591            0 :       for (auto eq : fFeEquipments) {
     592            0 :          if (!eq)
     593            0 :             continue;
     594            0 :          if (!eq->fEqConfEnabled)
     595            0 :             continue;
     596            0 :          if (eq->fEqConfEnablePoll && !eq->fEqPollThreadRunning && !eq->fEqPollThreadStarting) {
     597            0 :             if (fMfe->fStateRunning || !eq->fEqConfReadOnlyWhenRunning) {
     598            0 :                if (eq->fEqConfPollSleepSec < poll_sleep_sec)
     599            0 :                   poll_sleep_sec = eq->fEqConfPollSleepSec;
     600            0 :                bool poll = eq->HandlePoll();
     601            0 :                if (poll) {
     602            0 :                   poll_again = true;
     603            0 :                   eq->HandlePollRead();
     604              :                }
     605              :             }
     606              :          }
     607              :       }
     608            0 :       if (!poll_again)
     609            0 :          break;
     610              : 
     611            0 :       if (next_periodic_time) {
     612              :          // stop polling if we need to run periodic activity
     613            0 :          double now = TMFE::TMFE::GetTime();
     614            0 :          if (now >= next_periodic_time)
     615            0 :             break;
     616              :       }
     617              :    }
     618            0 :    return poll_sleep_sec;
     619              : }
     620              : 
     621            0 : void TMFeEquipment::EqPollThread()
     622              : {
     623            0 :    if (TMFE::gfVerbose)
     624            0 :       printf("TMFeEquipment::EqPollThread: equipment \"%s\" poll thread started\n", fEqName.c_str());
     625              :    
     626            0 :    fEqPollThreadRunning = true;
     627              : 
     628            0 :    while (!fMfe->fShutdownRequested && !fEqPollThreadShutdownRequested) {
     629            0 :       if (fMfe->fStateRunning || !fEqConfReadOnlyWhenRunning) {
     630            0 :          bool poll = HandlePoll();
     631            0 :          if (poll) {
     632            0 :             HandlePollRead();
     633              :          } else {
     634            0 :             if (fEqConfPollSleepSec > 0) {
     635            0 :                TMFE::Sleep(fEqConfPollSleepSec);
     636              :             }
     637              :          }
     638            0 :       } else {
     639            0 :          TMFE::Sleep(0.1);
     640              :       }
     641              :    }
     642            0 :    if (TMFE::gfVerbose)
     643            0 :       printf("TMFeEquipment::EqPollThread: equipment \"%s\" poll thread stopped\n", fEqName.c_str());
     644              : 
     645            0 :    fEqPollThreadRunning = false;
     646            0 : }
     647              : 
     648            0 : void TMFeEquipment::EqStartPollThread()
     649              : {
     650              :    // NOTE: this is thread safe
     651              : 
     652            0 :    std::lock_guard<std::mutex> guard(fEqMutex);
     653              : 
     654            0 :    if (fEqPollThreadRunning || fEqPollThreadStarting || fEqPollThread) {
     655            0 :       fMfe->Msg(MERROR, "TMFeEquipment::EqStartPollThread", "Equipment \"%s\": poll thread is already running", fEqName.c_str());
     656            0 :       return;
     657              :    }
     658              : 
     659            0 :    fEqPollThreadShutdownRequested = false;
     660            0 :    fEqPollThreadStarting = true;
     661              : 
     662            0 :    fEqPollThread = new std::thread(&TMFeEquipment::EqPollThread, this);
     663            0 : }
     664              : 
     665            0 : void TMFeEquipment::EqStopPollThread()
     666              : {
     667              :    // NOTE: this is thread safe
     668            0 :    fEqPollThreadStarting = false;
     669            0 :    fEqPollThreadShutdownRequested = true;
     670            0 :    for (int i=0; i<100; i++) {
     671            0 :       if (!fEqPollThreadRunning) {
     672            0 :          std::lock_guard<std::mutex> guard(fEqMutex);
     673            0 :          if (fEqPollThread) {
     674            0 :             fEqPollThread->join();
     675            0 :             delete fEqPollThread;
     676            0 :             fEqPollThread = NULL;
     677              :          }
     678            0 :          return;
     679            0 :       }
     680            0 :       TMFE::Sleep(0.1);
     681              :    }
     682            0 :    if (fEqPollThreadRunning) {
     683            0 :       fMfe->Msg(MERROR, "TMFeEquipment::EqStopPollThread", "Equipment \"%s\": timeout waiting for shutdown of poll thread", fEqName.c_str());
     684              :    }
     685              : }
     686              : 
     687            0 : void TMFE::StopRun()
     688              : {
     689              :    char str[TRANSITION_ERROR_STRING_LENGTH];
     690              : 
     691            0 :    int status = cm_transition(TR_STOP, 0, str, sizeof(str), TR_SYNC, FALSE);
     692            0 :    if (status != CM_SUCCESS) {
     693            0 :       Msg(MERROR, "TMFE::StopRun", "Cannot stop run, error: %s", str);
     694            0 :       fRunStopRequested = false;
     695            0 :       return;
     696              :    }
     697              : 
     698            0 :    fRunStopRequested = false;
     699              : 
     700            0 :    bool logger_auto_restart = false;
     701            0 :    fOdbRoot->RB("Logger/Auto restart", &logger_auto_restart);
     702              : 
     703            0 :    int logger_auto_restart_delay = 0;
     704            0 :    fOdbRoot->RI("Logger/Auto restart delay", &logger_auto_restart_delay);
     705              : 
     706            0 :    if (logger_auto_restart) {
     707            0 :       Msg(MINFO, "TMFE::StopRun", "Run will restart after %d seconds", logger_auto_restart_delay);
     708            0 :       fRunStartTime = GetTime() + logger_auto_restart_delay;
     709              :    } else {
     710            0 :       fRunStartTime = 0;
     711              :    }
     712              : }
     713              : 
     714            0 : void TMFE::StartRun()
     715              : {
     716            0 :    fRunStartTime = 0;
     717              : 
     718              :    /* check if really stopped */
     719            0 :    int run_state = 0;
     720            0 :    fOdbRoot->RI("Runinfo/State", &run_state);
     721              : 
     722            0 :    if (run_state != STATE_STOPPED) {
     723            0 :       Msg(MERROR, "TMFE::StartRun", "Run start requested, but run is already in progress");
     724            0 :       return;
     725              :    }
     726              : 
     727            0 :    bool logger_auto_restart = false;
     728            0 :    fOdbRoot->RB("Logger/Auto restart", &logger_auto_restart);
     729              : 
     730            0 :    if (!logger_auto_restart) {
     731            0 :       Msg(MERROR, "TMFE::StartRun", "Run start requested, but logger/auto restart is off");
     732            0 :       return;
     733              :    }
     734              : 
     735            0 :    Msg(MTALK, "TMFE::StartRun", "Starting new run");
     736              : 
     737              :    char str[TRANSITION_ERROR_STRING_LENGTH];
     738              : 
     739            0 :    int status = cm_transition(TR_START, 0, str, sizeof(str), TR_SYNC, FALSE);
     740            0 :    if (status != CM_SUCCESS) {
     741            0 :       Msg(MERROR, "TMFE::StartRun", "Cannot restart run, error: %s", str);
     742              :    }
     743              : }
     744              : 
     745            0 : void TMFrontend::FePollMidas(double sleep_sec)
     746              : {
     747            0 :    assert(sleep_sec >= 0);
     748            0 :    bool debug = false;
     749            0 :    double now = TMFE::GetTime();
     750            0 :    double sleep_start = now;
     751            0 :    double sleep_end = now + sleep_sec;
     752            0 :    int count_yield_loops = 0;
     753              : 
     754            0 :    while (!fMfe->fShutdownRequested) {
     755            0 :       double next_periodic_time = 0;
     756            0 :       double poll_sleep = 1.0;
     757              : 
     758            0 :       if (!fFePeriodicThreadRunning) {
     759            0 :          next_periodic_time = FePeriodicTasks();
     760            0 :          poll_sleep = FePollTasks(next_periodic_time);
     761              :       } else {
     762            0 :          poll_sleep = FePollTasks(TMFE::GetTime() + 0.100);
     763              :       }
     764              : 
     765            0 :       if (fMfe->fRunStopRequested) {
     766            0 :          fMfe->StopRun();
     767            0 :          continue;
     768              :       }
     769              : 
     770            0 :       now = TMFE::GetTime();
     771              : 
     772            0 :       if (fMfe->fRunStartTime && now >= fMfe->fRunStartTime) {
     773            0 :          fMfe->StartRun();
     774            0 :          continue;
     775              :       }
     776              : 
     777            0 :       double sleep_time = sleep_end - now;
     778              : 
     779            0 :       if (next_periodic_time > 0 && next_periodic_time < sleep_end) {
     780            0 :          sleep_time = next_periodic_time - now;
     781              :       }
     782              : 
     783            0 :       int s = 0;
     784            0 :       if (sleep_time > 0)
     785            0 :          s = 1 + sleep_time*1000.0;
     786              : 
     787            0 :       if (poll_sleep*1000.0 < s) {
     788            0 :          s = 0;
     789              :       }
     790              : 
     791            0 :       if (debug) {
     792            0 :          printf("now %.6f, sleep_end %.6f, next_periodic %.6f, sleep_time %.6f, cm_yield(%d), poll period %.6f\n", now, sleep_end, next_periodic_time, sleep_time, s, poll_sleep);
     793              :       }
     794              : 
     795            0 :       int status = cm_yield(s);
     796              :       
     797            0 :       if (status == RPC_SHUTDOWN || status == SS_ABORT) {
     798            0 :          fMfe->fShutdownRequested = true;
     799            0 :          if (TMFE::gfVerbose) {
     800            0 :             fprintf(stderr, "TMFE::PollMidas: cm_yield(%d) status %d, shutdown requested...\n", s, status);
     801              :          }
     802              :       }
     803              : 
     804            0 :       now = TMFE::GetTime();
     805            0 :       double sleep_more = sleep_end - now;
     806            0 :       if (sleep_more <= 0)
     807            0 :          break;
     808              : 
     809            0 :       count_yield_loops++;
     810              : 
     811            0 :       if (poll_sleep < sleep_more) {
     812            0 :          TMFE::Sleep(poll_sleep);
     813              :       }
     814              :    }
     815              : 
     816            0 :    if (debug) {
     817            0 :       printf("TMFE::PollMidas: sleep %.1f msec, actual %.1f msec, %d loops\n", sleep_sec * 1000.0, (now - sleep_start) * 1000.0, count_yield_loops);
     818              :    }
     819            0 : }
     820              : 
     821            0 : void TMFE::Yield(double sleep_sec)
     822              : {
     823            0 :    double now = GetTime();
     824              :    //double sleep_start = now;
     825            0 :    double sleep_end = now + sleep_sec;
     826              : 
     827            0 :    while (!fShutdownRequested) {
     828            0 :       now = GetTime();
     829              : 
     830            0 :       double sleep_time = sleep_end - now;
     831            0 :       int s = 0;
     832            0 :       if (sleep_time > 0)
     833            0 :          s = 1 + sleep_time*1000.0;
     834              : 
     835              :       //printf("TMFE::Yield: now %f, sleep_end %f, s %d\n", now, sleep_end, s);
     836              :       
     837            0 :       int status = cm_yield(s);
     838              :       
     839            0 :       if (status == RPC_SHUTDOWN || status == SS_ABORT) {
     840            0 :          fShutdownRequested = true;
     841            0 :          fprintf(stderr, "TMFE::Yield: cm_yield(%d) status %d, shutdown requested...\n", s, status);
     842              :       }
     843              : 
     844            0 :       now = GetTime();
     845            0 :       if (now >= sleep_end)
     846            0 :          break;
     847              :    }
     848              : 
     849              :    //printf("TMFE::Yield: sleep_sec %.6f, actual %.6f sec\n", sleep_sec, now - sleep_start);
     850            0 : }
     851              : 
     852            0 : void TMFE::MidasPeriodicTasks()
     853              : {
     854            0 :    cm_periodic_tasks();
     855            0 : }
     856              : 
     857            0 : void TMFE::RpcThread()
     858              : {
     859            0 :    if (TMFE::gfVerbose)
     860            0 :       printf("TMFE::RpcThread: RPC thread started\n");
     861              : 
     862            0 :    int msec = 1000;
     863              : 
     864            0 :    fRpcThreadRunning = true;
     865            0 :    ss_suspend_set_rpc_thread(ss_gettid());
     866              : 
     867            0 :    while (!fShutdownRequested && !fRpcThreadShutdownRequested) {
     868              : 
     869            0 :       int status = cm_yield(msec);
     870              : 
     871            0 :       if (status == RPC_SHUTDOWN || status == SS_ABORT) {
     872            0 :          fShutdownRequested = true;
     873            0 :          if (TMFE::gfVerbose)
     874            0 :             printf("TMFE::RpcThread: cm_yield(%d) status %d, shutdown requested...\n", msec, status);
     875              :       }
     876              :    }
     877            0 :    ss_suspend_exit();
     878            0 :    if (TMFE::gfVerbose)
     879            0 :       printf("TMFE::RpcThread: RPC thread stopped\n");
     880            0 :    fRpcThreadRunning = false;
     881            0 : }
     882              : 
     883            0 : void TMFrontend::FePeriodicThread()
     884              : {
     885            0 :    if (TMFE::gfVerbose)
     886            0 :       printf("TMFE::PeriodicThread: periodic thread started\n");
     887              :    
     888            0 :    fFePeriodicThreadRunning = true;
     889            0 :    while (!fMfe->fShutdownRequested && !fFePeriodicThreadShutdownRequested) {
     890            0 :       double next_periodic_time = FePeriodicTasks();
     891            0 :       double now = TMFE::GetTime();
     892            0 :       double sleep = next_periodic_time - now;
     893              :       //printf("TMFrontend::FePeriodicThread: now %.6f next %.6f, sleep %.6f\n", now, next_periodic_time, sleep);
     894            0 :       if (sleep >= 1.0)
     895            0 :          sleep = 1.0;
     896            0 :       TMFE::Sleep(sleep);
     897              :    }
     898            0 :    if (TMFE::gfVerbose)
     899            0 :       printf("TMFE::PeriodicThread: periodic thread stopped\n");
     900            0 :    fFePeriodicThreadRunning = false;
     901            0 : }
     902              : 
     903            0 : void TMFE::StartRpcThread()
     904              : {
     905              :    // NOTE: this is thread safe
     906              : 
     907            0 :    std::lock_guard<std::mutex> guard(fMutex);
     908              :    
     909            0 :    if (fRpcThreadRunning || fRpcThreadStarting || fRpcThread) {
     910            0 :       if (gfVerbose)
     911            0 :          printf("TMFE::StartRpcThread: RPC thread already running\n");
     912            0 :       return;
     913              :    }
     914              :    
     915            0 :    fRpcThreadStarting = true;
     916            0 :    fRpcThread = new std::thread(&TMFE::RpcThread, this);
     917            0 : }
     918              : 
     919            0 : void TMFrontend::FeStartPeriodicThread()
     920              : {
     921              :    // NOTE: this is thread safe
     922              : 
     923            0 :    std::lock_guard<std::mutex> guard(fFeMutex);
     924              : 
     925            0 :    if (fFePeriodicThreadRunning || fFePeriodicThreadStarting || fFePeriodicThread) {
     926            0 :       if (TMFE::gfVerbose)
     927            0 :          printf("TMFE::StartPeriodicThread: periodic thread already running\n");
     928            0 :       return;
     929              :    }
     930              : 
     931            0 :    fFePeriodicThreadStarting = true;
     932            0 :    fFePeriodicThread = new std::thread(&TMFrontend::FePeriodicThread, this);
     933            0 : }
     934              : 
     935            1 : void TMFE::StopRpcThread()
     936              : {
     937              :    // NOTE: this is thread safe
     938              :    
     939            1 :    fRpcThreadStarting = false;
     940            1 :    fRpcThreadShutdownRequested = true;
     941              :    
     942            1 :    for (int i=0; i<60; i++) {
     943            1 :       if (!fRpcThreadRunning) {
     944            1 :          std::lock_guard<std::mutex> guard(fMutex);
     945            1 :          if (fRpcThread) {
     946            0 :             fRpcThread->join();
     947            0 :             delete fRpcThread;
     948            0 :             fRpcThread = NULL;
     949            0 :             if (gfVerbose)
     950            0 :                printf("TMFE::StopRpcThread: RPC thread stopped\n");
     951              :          }
     952            1 :          return;
     953            1 :       }
     954            0 :       if (i>5) {
     955            0 :          fprintf(stderr, "TMFE::StopRpcThread: waiting for RPC thread to stop\n");
     956              :       }
     957            0 :       ::sleep(1);
     958              :    }
     959              : 
     960            0 :    fprintf(stderr, "TMFE::StopRpcThread: timeout waiting for RPC thread to stop\n");
     961              : }
     962              : 
     963            0 : void TMFrontend::FeStopPeriodicThread()
     964              : {
     965              :    // NOTE: this is thread safe
     966              :    
     967            0 :    fFePeriodicThreadStarting = false;
     968            0 :    fFePeriodicThreadShutdownRequested = true;
     969              : 
     970            0 :    for (int i=0; i<60; i++) {
     971            0 :       if (!fFePeriodicThreadRunning) {
     972            0 :          std::lock_guard<std::mutex> guard(fFeMutex);
     973            0 :          if (fFePeriodicThread) {
     974            0 :             fFePeriodicThread->join();
     975            0 :             delete fFePeriodicThread;
     976            0 :             fFePeriodicThread = NULL;
     977            0 :             if (TMFE::gfVerbose)
     978            0 :                printf("TMFE::StopPeriodicThread: periodic thread stopped\n");
     979              :          }
     980            0 :          return;
     981            0 :       }
     982            0 :       if (i>5) {
     983            0 :          fprintf(stderr, "TMFE::StopPeriodicThread: waiting for periodic thread to stop\n");
     984              :       }
     985            0 :       ::sleep(1);
     986              :    }
     987              : 
     988            0 :    fprintf(stderr, "TMFE::StopPeriodicThread: timeout waiting for periodic thread to stop\n");
     989              : }
     990              : 
     991            0 : void TMFE::Msg(int message_type, const char *filename, int line, const char *routine, const char *format, ...)
     992              : {
     993              :    char message[1024];
     994              :    //printf("format [%s]\n", format);
     995              :    va_list ap;
     996            0 :    va_start(ap, format);
     997            0 :    vsnprintf(message, sizeof(message)-1, format, ap);
     998            0 :    va_end(ap);
     999              :    //printf("message [%s]\n", message);
    1000            0 :    cm_msg(message_type, filename, line, routine, "%s", message);
    1001            0 :    cm_msg_flush_buffer();
    1002            0 : }
    1003              : 
    1004            0 : void TMFE::Msg(int message_type, const char *filename, int line, const char *routine, const std::string& message)
    1005              : {
    1006              :    //printf("message [%s]\n", message.c_str());
    1007            0 :    cm_msg(message_type, filename, line, routine, "%s", message.c_str());
    1008            0 :    cm_msg_flush_buffer();
    1009            0 : }
    1010              : 
    1011            0 : double TMFE::GetTime()
    1012              : {
    1013              :    struct timeval tv;
    1014            0 :    gettimeofday(&tv, NULL);
    1015            0 :    return tv.tv_sec*1.0 + tv.tv_usec/1000000.0;
    1016              : }
    1017              : 
    1018              : #if 1
    1019            0 : void TMFE::Sleep(double time_sec)
    1020              : {
    1021            0 :    if (time_sec < 0) {
    1022            0 :       TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "TMFE::Sleep() called with negative sleep time: %f", time_sec);
    1023            0 :       return;
    1024              :    }
    1025              :    
    1026            0 :    if (time_sec == 0) {
    1027            0 :       TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "TMFE::Sleep() called with zero sleep time");
    1028            0 :       return;
    1029              :    }
    1030              : 
    1031            0 :    if (time_sec > 1.01) {
    1032              :       // break long sleep into short sleeps
    1033              : 
    1034            0 :       double t0 = TMFE::GetTime();
    1035            0 :       double tend = t0 + time_sec;
    1036              : 
    1037              :       while (1) {
    1038            0 :          double now = TMFE::GetTime();
    1039            0 :          if (now >= tend) {
    1040              :             //printf("t0 %f, tend %f, now %f, done!\n", t0, tend, now);
    1041            0 :             return;
    1042              :          }
    1043              : 
    1044            0 :          double tsleep = tend - now;
    1045              : 
    1046              :          //printf("t0 %f, tend %f, now %f, tsleep %f!\n", t0, tend, now, tsleep);
    1047              : 
    1048            0 :          if (tsleep > 1.0)
    1049            0 :             tsleep = 1.0;
    1050              : 
    1051            0 :          TMFE::Sleep(tsleep);
    1052            0 :       }
    1053              : 
    1054              :       return;
    1055              :    }
    1056              :    
    1057              :    int status;
    1058              :    struct timeval timeout;
    1059              :       
    1060            0 :    timeout.tv_sec = time_sec;
    1061            0 :    timeout.tv_usec = (time_sec-timeout.tv_sec)*1000000.0;
    1062              : 
    1063              :    while (1) {
    1064            0 :       status = select(0, NULL, NULL, NULL, &timeout);
    1065              : #ifdef EINVAL
    1066            0 :       if (status < 0 && errno == EINVAL) {
    1067              :          // #warning HERE EINVAL!
    1068            0 :          TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "TMFE::Sleep() called with invalid sleep time: %f, tv_sec: %lld, tv_usec: %lld", time_sec, (long long int)timeout.tv_sec, (long long int)timeout.tv_usec);
    1069            0 :          return;
    1070              :       }
    1071              : #endif
    1072              : #ifdef EINTR
    1073            0 :       if (status < 0 && errno == EINTR) {
    1074              :          // #warning HERE EINTR!
    1075              :          // NOTE1: on linux, "timeout" is modified by the kernel to subtract time already slept, we do not need to adjust it while handling EINTR.
    1076              :          // NOTE2: on macos and other BSD-based systems, "timeout" value is not changed, and for accurate sleeping we should modify here, to account for time already slept, but we do not.
    1077              :          // NOTE3: see "man select" on Linux and Macos.
    1078              :          // NOTE4: MIDAS no longer uses SIGALRM to run cm_watchdog() and MIDAS applications do nto use signals.
    1079              :          // NOTE4: so in theory, we do not have to worry about EINTR interrupting our sleep. K.O. Dec-2024
    1080              :          // TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "TMFE::Sleep() EINTR, sleep time: %f, tv_sec: %lld, tv_usec: %lld", time_sec, (long long int)timeout.tv_sec, (long long int)timeout.tv_usec);
    1081            0 :          continue;
    1082              :       }
    1083              : #endif
    1084            0 :       break;
    1085              :    }
    1086              :       
    1087            0 :    if (status < 0) {
    1088            0 :       TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "select() returned %d, errno %d (%s)", status, errno, strerror(errno));
    1089              :    }
    1090              : }
    1091              : #endif
    1092              : 
    1093              : #if 0
    1094              : void TMFE::Sleep(double time)
    1095              : {
    1096              :    struct timespec rqtp;
    1097              :    struct timespec rmtp;
    1098              :       
    1099              :    rqtp.tv_sec = time;
    1100              :    rqtp.tv_nsec = (time-rqtp.tv_sec)*1000000000.0;
    1101              : 
    1102              :    int status = nanosleep(&rqtp, &rmtp);
    1103              :    
    1104              :    //#ifdef EINTR
    1105              :    //if (status < 0 && errno == EINTR) {
    1106              :    //   return 0; // watchdog interrupt, try again
    1107              :    //}
    1108              :    //#endif
    1109              :       
    1110              :    if (status < 0) {
    1111              :       TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "nanosleep() returned %d, errno %d (%s)", status, errno, strerror(errno));
    1112              :    }
    1113              : }
    1114              : #endif
    1115              : 
    1116              : #if 0
    1117              : void TMFE::Sleep(double time)
    1118              : {
    1119              :    struct timespec rqtp;
    1120              :    struct timespec rmtp;
    1121              :       
    1122              :    rqtp.tv_sec = time;
    1123              :    rqtp.tv_nsec = (time-rqtp.tv_sec)*1000000000.0;
    1124              : 
    1125              :    //int status = clock_nanosleep(CLOCK_REALTIME, 0, &rqtp, &rmtp);
    1126              :    int status = clock_nanosleep(CLOCK_MONOTONIC, 0, &rqtp, &rmtp);
    1127              :    
    1128              :    //#ifdef EINTR
    1129              :    //if (status < 0 && errno == EINTR) {
    1130              :    //   return 0; // watchdog interrupt, try again
    1131              :    //}
    1132              :    //#endif
    1133              :       
    1134              :    if (status < 0) {
    1135              :       TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "nanosleep() returned %d, errno %d (%s)", status, errno, strerror(errno));
    1136              :    }
    1137              : }
    1138              : #endif
    1139              : 
    1140            0 : std::string TMFE::GetThreadId() ///< return identification of this thread
    1141              : {
    1142            0 :    return ss_tid_to_string(ss_gettid());
    1143              : }
    1144              : 
    1145            0 : static INT rpc_callback(INT index, void *prpc_param[])
    1146              : {
    1147            0 :    const char* cmd  = CSTRING(0);
    1148            0 :    const char* args = CSTRING(1);
    1149            0 :    char* return_buf = CSTRING(2);
    1150            0 :    int   return_max_length = CINT(3);
    1151              : 
    1152            0 :    if (TMFE::gfVerbose)
    1153            0 :       printf("TMFE::rpc_callback: index %d, max_length %d, cmd [%s], args [%s]\n", index, return_max_length, cmd, args);
    1154              : 
    1155            0 :    TMFE* mfe = TMFE::Instance();
    1156              : 
    1157              :    // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleRpc() modifies fEquipments. K.O.
    1158            0 :    for (unsigned i=0; i<mfe->fRpcHandlers.size(); i++) {
    1159            0 :       TMFeRpcHandlerInterface* h = mfe->fRpcHandlers[i];
    1160            0 :       if (!h)
    1161            0 :          continue;
    1162            0 :       std::string result = "";
    1163            0 :       TMFeResult r = h->HandleRpc(cmd, args, result);
    1164            0 :       if (result.length() > 0) {
    1165              :          //printf("Handler reply [%s]\n", C(r));
    1166            0 :          mstrlcpy(return_buf, result.c_str(), return_max_length);
    1167            0 :          return RPC_SUCCESS;
    1168              :       }
    1169            0 :    }
    1170              : 
    1171            0 :    return_buf[0] = 0;
    1172            0 :    return RPC_SUCCESS;
    1173              : }
    1174              : 
    1175            0 : static INT binary_rpc_callback(INT index, void *prpc_param[])
    1176              : {
    1177            0 :    const char* cmd  = CSTRING(0);
    1178            0 :    const char* args = CSTRING(1);
    1179            0 :    char* return_buf = CSTRING(2);
    1180            0 :    size_t return_max_length = CINT(3);
    1181              : 
    1182            0 :    if (TMFE::gfVerbose)
    1183            0 :       printf("TMFE::binary_rpc_callback: index %d, max_length %zu, cmd [%s], args [%s]\n", index, return_max_length, cmd, args);
    1184              : 
    1185            0 :    TMFE* mfe = TMFE::Instance();
    1186              : 
    1187              :    // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleRpc() modifies fEquipments. K.O.
    1188            0 :    for (unsigned i=0; i<mfe->fRpcHandlers.size(); i++) {
    1189            0 :       TMFeRpcHandlerInterface* h = mfe->fRpcHandlers[i];
    1190            0 :       if (!h)
    1191            0 :          continue;
    1192            0 :       std::vector<char> result;
    1193            0 :       TMFeResult r = h->HandleBinaryRpc(cmd, args, result);
    1194            0 :       if (result.size() > 0) {
    1195            0 :          if (result.size() > return_max_length) {
    1196            0 :             TMFE::Instance()->Msg(MERROR, "TMFE::binary_rpc_callback", "RPC handler returned too much data, %zu bytes truncated to %zu bytes", result.size(), return_max_length);
    1197            0 :             result.resize(return_max_length);
    1198              :          }
    1199              :          //printf("Handler reply [%s]\n", C(r));
    1200            0 :          assert(result.size() <= return_max_length);
    1201            0 :          memcpy(return_buf, result.data(), result.size());
    1202            0 :          CINT(3) = result.size();
    1203            0 :          return RPC_SUCCESS;
    1204              :       }
    1205            0 :    }
    1206              : 
    1207            0 :    CINT(3) = 0;
    1208            0 :    return_buf[0] = 0;
    1209            0 :    return RPC_SUCCESS;
    1210              : }
    1211              : 
    1212              : class TMFrontendRpcHelper: public TMFeRpcHandlerInterface
    1213              : {
    1214              : public:
    1215              :    TMFrontend* fFe = NULL;
    1216              : 
    1217              : public:
    1218            0 :    TMFrontendRpcHelper(TMFrontend* fe) // ctor
    1219            0 :    {
    1220            0 :       if (TMFE::gfVerbose)
    1221            0 :          printf("TMFrontendRpcHelper::ctor!\n");
    1222              : 
    1223            0 :       fFe = fe;
    1224            0 :    }
    1225              : 
    1226            0 :    virtual ~TMFrontendRpcHelper() // dtor
    1227            0 :    {
    1228            0 :       if (TMFE::gfVerbose)
    1229            0 :          printf("TMFrontendRpcHelper::dtor!\n");
    1230              : 
    1231              :       // poison pointers
    1232            0 :       fFe = NULL;
    1233            0 :    }
    1234              : 
    1235            0 :    TMFeResult HandleBeginRun(int run_number)
    1236              :    {
    1237            0 :       if (TMFE::gfVerbose)
    1238            0 :          printf("TMFrontendRpcHelper::HandleBeginRun!\n");
    1239              : 
    1240            0 :       for (unsigned i=0; i<fFe->fFeEquipments.size(); i++) {
    1241            0 :          TMFeEquipment* eq = fFe->fFeEquipments[i];
    1242            0 :          if (!eq)
    1243            0 :             continue;
    1244            0 :          if (!eq->fEqConfEnabled)
    1245            0 :             continue;
    1246            0 :          eq->EqZeroStatistics();
    1247            0 :          eq->EqWriteStatistics();
    1248              :       }
    1249            0 :       return TMFeOk();
    1250              :    }
    1251              : 
    1252            0 :    TMFeResult HandleEndRun(int run_number)
    1253              :    {
    1254            0 :       if (TMFE::gfVerbose)
    1255            0 :          printf("TMFrontendRpcHelper::HandleEndRun!\n");
    1256              : 
    1257            0 :       for (unsigned i=0; i<fFe->fFeEquipments.size(); i++) {
    1258            0 :          TMFeEquipment* eq = fFe->fFeEquipments[i];
    1259            0 :          if (!eq)
    1260            0 :             continue;
    1261            0 :          if (!eq->fEqConfEnabled)
    1262            0 :             continue;
    1263            0 :          eq->EqWriteStatistics();
    1264              :       }
    1265              : 
    1266            0 :       TMFeResult r = fFe->fMfe->EventBufferFlushCacheAll();
    1267              : 
    1268            0 :       if (r.error_flag)
    1269            0 :          return r;
    1270              : 
    1271            0 :       return TMFeOk();
    1272            0 :    }
    1273              : };
    1274              : 
    1275            0 : static INT tr_start(INT run_number, char *errstr)
    1276              : {
    1277            0 :    if (TMFE::gfVerbose)
    1278            0 :       printf("TMFE::tr_start!\n");
    1279              : 
    1280            0 :    TMFE* mfe = TMFE::Instance();
    1281              : 
    1282            0 :    mfe->fRunNumber = run_number;
    1283            0 :    mfe->fStateRunning = true;
    1284              :    
    1285            0 :    TMFeResult result;
    1286              : 
    1287              :    // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleBeginRun() modifies fEquipments. K.O.
    1288            0 :    for (unsigned i=0; i<mfe->fRpcHandlers.size(); i++) {
    1289            0 :       TMFeRpcHandlerInterface* h = mfe->fRpcHandlers[i];
    1290            0 :       if (!h)
    1291            0 :          continue;
    1292            0 :       result = h->HandleBeginRun(run_number);
    1293            0 :       if (result.error_flag) {
    1294              :          // error handling in this function matches general transition error handling:
    1295              :          // on run start, the first user handler to return an error code
    1296              :          // will abort the transition. This leaves everything in an
    1297              :          // inconsistent state: frontends called before the abort
    1298              :          // think the run is running, which it does not. They should register
    1299              :          // a handler for the "start abort" transition. This transition calls
    1300              :          // all already started frontends so they can cleanup their state. K.O.
    1301              :          // 
    1302            0 :          break;
    1303              :       }
    1304              :    }
    1305              : 
    1306            0 :    if (result.error_flag) {
    1307            0 :       mstrlcpy(errstr, result.error_message.c_str(), TRANSITION_ERROR_STRING_LENGTH);
    1308            0 :       return FE_ERR_DRIVER;
    1309              :    }
    1310              : 
    1311            0 :    return SUCCESS;
    1312            0 : }
    1313              : 
    1314            0 : static INT tr_stop(INT run_number, char *errstr)
    1315              : {
    1316            0 :    if (TMFE::gfVerbose)
    1317            0 :       printf("TMFE::tr_stop!\n");
    1318              : 
    1319            0 :    TMFeResult result;
    1320              : 
    1321            0 :    TMFE* mfe = TMFE::Instance();
    1322              : 
    1323              :    // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleEndRun() modifies fEquipments. K.O.
    1324              :    // NOTE: we need to stop thing in reverse order, otherwise TMFrontend code
    1325              :    // does not work right - TMFrontend is registered first, and (correctly) runs
    1326              :    // first at begin of run (to clear statistics, etc). But at the end of run
    1327              :    // it needs to run last, to update the statistics, etc. after all the equipments
    1328              :    // have done their end of run things and are finished. K.O.
    1329            0 :    for (int i = (int)mfe->fRpcHandlers.size() - 1; i >= 0; i--) {
    1330            0 :       TMFeRpcHandlerInterface* h = mfe->fRpcHandlers[i];
    1331            0 :       if (!h)
    1332            0 :          continue;
    1333            0 :       TMFeResult xresult = h->HandleEndRun(run_number);
    1334            0 :       if (xresult.error_flag) {
    1335              :          // error handling in this function matches general transition error handling:
    1336              :          // the "run stop" transition is always sucessful, the run always stops.
    1337              :          // if some frontend returns an error, this error is remembered and is returned
    1338              :          // as the transition over all status. K.O.
    1339            0 :          result = xresult;
    1340              :       }
    1341            0 :    }
    1342              : 
    1343            0 :    mfe->fStateRunning = false;
    1344              : 
    1345            0 :    if (result.error_flag) {
    1346            0 :       mstrlcpy(errstr, result.error_message.c_str(), TRANSITION_ERROR_STRING_LENGTH);
    1347            0 :       return FE_ERR_DRIVER;
    1348              :    }
    1349              : 
    1350            0 :    return SUCCESS;
    1351            0 : }
    1352              : 
    1353            0 : static INT tr_pause(INT run_number, char *errstr)
    1354              : {
    1355            0 :    cm_msg(MINFO, "tr_pause", "tr_pause");
    1356              : 
    1357            0 :    TMFeResult result;
    1358              : 
    1359            0 :    TMFE* mfe = TMFE::Instance();
    1360              : 
    1361              :    // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandlePauseRun() modifies fEquipments. K.O.
    1362              :    // NOTE: tr_pause runs in reverse order to match tr_stop. K.O.
    1363            0 :    for (int i = (int)mfe->fRpcHandlers.size() - 1; i >= 0; i--) {
    1364            0 :       TMFeRpcHandlerInterface* h = mfe->fRpcHandlers[i];
    1365            0 :       if (!h)
    1366            0 :          continue;
    1367            0 :       result = h->HandlePauseRun(run_number);
    1368            0 :       if (result.error_flag) {
    1369              :          // error handling in this function matches general transition error handling:
    1370              :          // logic is same as "start run"
    1371            0 :          break;
    1372              :       }
    1373              :    }
    1374              : 
    1375            0 :    if (result.error_flag) {
    1376            0 :       mstrlcpy(errstr, result.error_message.c_str(), TRANSITION_ERROR_STRING_LENGTH);
    1377            0 :       return FE_ERR_DRIVER;
    1378              :    }
    1379              : 
    1380            0 :    return SUCCESS;
    1381            0 : }
    1382              : 
    1383            0 : static INT tr_resume(INT run_number, char *errstr)
    1384              : {
    1385            0 :    if (TMFE::gfVerbose)
    1386            0 :       printf("TMFE::tr_resume!\n");
    1387              : 
    1388            0 :    TMFeResult result;
    1389              : 
    1390            0 :    TMFE* mfe = TMFE::Instance();
    1391              : 
    1392            0 :    mfe->fRunNumber = run_number;
    1393            0 :    mfe->fStateRunning = true;
    1394              : 
    1395              :    // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleResumeRun() modifies fEquipments. K.O.
    1396            0 :    for (unsigned i=0; i<mfe->fRpcHandlers.size(); i++) {
    1397            0 :       TMFeRpcHandlerInterface* h = mfe->fRpcHandlers[i];
    1398            0 :       if (!h)
    1399            0 :          continue;
    1400            0 :       result = h->HandleResumeRun(run_number);
    1401            0 :       if (result.error_flag) {
    1402              :          // error handling in this function matches general transition error handling:
    1403              :          // logic is same as "start run"
    1404            0 :          break;
    1405              :       }
    1406              :    }
    1407              : 
    1408            0 :    if (result.error_flag) {
    1409            0 :       mstrlcpy(errstr, result.error_message.c_str(), TRANSITION_ERROR_STRING_LENGTH);
    1410            0 :       return FE_ERR_DRIVER;
    1411              :    }
    1412              : 
    1413            0 :    return SUCCESS;
    1414            0 : }
    1415              : 
    1416            0 : static INT tr_startabort(INT run_number, char *errstr)
    1417              : {
    1418            0 :    if (TMFE::gfVerbose)
    1419            0 :       printf("TMFE::tr_startabort!\n");
    1420              : 
    1421            0 :    TMFeResult result;
    1422              : 
    1423            0 :    TMFE* mfe = TMFE::Instance();
    1424              : 
    1425              :    // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleStartAbortRun() modifies fEquipments. K.O.
    1426            0 :    for (unsigned i=0; i<mfe->fRpcHandlers.size(); i++) {
    1427            0 :       TMFeRpcHandlerInterface* h = mfe->fRpcHandlers[i];
    1428            0 :       if (!h)
    1429            0 :          continue;
    1430            0 :       result = h->HandleStartAbortRun(run_number);
    1431            0 :       if (result.error_flag) {
    1432              :          // error handling in this function matches general transition error handling:
    1433              :          // logic is same as "start run"
    1434            0 :          break;
    1435              :       }
    1436              :    }
    1437              : 
    1438            0 :    mfe->fStateRunning = false;
    1439              : 
    1440            0 :    if (result.error_flag) {
    1441            0 :       mstrlcpy(errstr, result.error_message.c_str(), TRANSITION_ERROR_STRING_LENGTH);
    1442            0 :       return FE_ERR_DRIVER;
    1443              :    }
    1444              : 
    1445            0 :    return SUCCESS;
    1446            0 : }
    1447              : 
    1448            0 : void TMFE::SetTransitionSequenceStart(int seqno)
    1449              : {
    1450            0 :    cm_set_transition_sequence(TR_START, seqno);
    1451            0 : }
    1452              : 
    1453            0 : void TMFE::SetTransitionSequenceStop(int seqno)
    1454              : {
    1455            0 :    cm_set_transition_sequence(TR_STOP, seqno);
    1456            0 : }
    1457              : 
    1458            0 : void TMFE::SetTransitionSequencePause(int seqno)
    1459              : {
    1460            0 :    cm_set_transition_sequence(TR_PAUSE, seqno);
    1461            0 : }
    1462              : 
    1463            0 : void TMFE::SetTransitionSequenceResume(int seqno)
    1464              : {
    1465            0 :    cm_set_transition_sequence(TR_RESUME, seqno);
    1466            0 : }
    1467              : 
    1468            0 : void TMFE::SetTransitionSequenceStartAbort(int seqno)
    1469              : {
    1470            0 :    cm_set_transition_sequence(TR_STARTABORT, seqno);
    1471            0 : }
    1472              : 
    1473            0 : void TMFE::DeregisterTransitions()
    1474              : {
    1475            0 :    cm_deregister_transition(TR_START);
    1476            0 :    cm_deregister_transition(TR_STOP);
    1477            0 :    cm_deregister_transition(TR_PAUSE);
    1478            0 :    cm_deregister_transition(TR_RESUME);
    1479            0 :    cm_deregister_transition(TR_STARTABORT);
    1480            0 : }
    1481              : 
    1482            0 : void TMFE::DeregisterTransitionStart()
    1483              : {
    1484            0 :    cm_deregister_transition(TR_START);
    1485            0 : }
    1486              : 
    1487            0 : void TMFE::DeregisterTransitionStop()
    1488              : {
    1489            0 :    cm_deregister_transition(TR_STOP);
    1490            0 : }
    1491              : 
    1492            0 : void TMFE::DeregisterTransitionPause()
    1493              : {
    1494            0 :    cm_deregister_transition(TR_PAUSE);
    1495            0 : }
    1496              : 
    1497            0 : void TMFE::DeregisterTransitionResume()
    1498              : {
    1499            0 :    cm_deregister_transition(TR_RESUME);
    1500            0 : }
    1501              : 
    1502            0 : void TMFE::DeregisterTransitionStartAbort()
    1503              : {
    1504            0 :    cm_deregister_transition(TR_STARTABORT);
    1505            0 : }
    1506              : 
    1507            0 : void TMFE::RegisterTransitionStartAbort()
    1508              : {
    1509            0 :    cm_register_transition(TR_STARTABORT, tr_startabort, 500);
    1510            0 : }
    1511              : 
    1512            1 : void TMFE::RegisterRPCs()
    1513              : {
    1514            1 :    if (TMFE::gfVerbose)
    1515            0 :       printf("TMFE::RegisterRPCs!\n");
    1516              :    
    1517            1 :    cm_register_function(RPC_JRPC, rpc_callback);
    1518            1 :    cm_register_function(RPC_BRPC, binary_rpc_callback);
    1519            1 :    cm_register_transition(TR_START, tr_start, 500);
    1520            1 :    cm_register_transition(TR_STOP, tr_stop, 500);
    1521            1 :    cm_register_transition(TR_PAUSE, tr_pause, 500);
    1522            1 :    cm_register_transition(TR_RESUME, tr_resume, 500);
    1523            1 :    cm_register_transition(TR_STARTABORT, tr_startabort, 500);
    1524            1 : }
    1525              : 
    1526            0 : void TMFE::AddRpcHandler(TMFeRpcHandlerInterface* h)
    1527              : {
    1528            0 :    fRpcHandlers.push_back(h);
    1529            0 : }
    1530              : 
    1531            0 : void TMFE::RemoveRpcHandler(TMFeRpcHandlerInterface* h)
    1532              : {
    1533            0 :    for (unsigned i=0; i<fRpcHandlers.size(); i++) {
    1534            0 :       if (fRpcHandlers[i] == h) {
    1535            0 :          fRpcHandlers[i] = NULL;
    1536              :       }
    1537              :    }
    1538            0 : }
    1539              : 
    1540            0 : TMFrontend::TMFrontend() // ctor
    1541              : {
    1542            0 :    fMfe = TMFE::Instance();
    1543            0 : }
    1544              : 
    1545            0 : TMFrontend::~TMFrontend() // dtor
    1546              : {
    1547            0 :    if (fFeRpcHelper) {
    1548            0 :       fMfe->RemoveRpcHandler(fFeRpcHelper);
    1549            0 :       delete fFeRpcHelper;
    1550            0 :       fFeRpcHelper = NULL;
    1551              :    }
    1552              :    // poison all pointers
    1553            0 :    fMfe = NULL;
    1554            0 : }
    1555              : 
    1556            0 : TMFeResult TMFrontend::FeInitEquipments(const std::vector<std::string>& args)
    1557              : {
    1558              :    // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleInit() modifies fEquipments. K.O.
    1559            0 :    for (unsigned i=0; i<fFeEquipments.size(); i++) {
    1560            0 :       if (!fFeEquipments[i])
    1561            0 :          continue;
    1562            0 :       if (!fFeEquipments[i]->fEqConfEnabled)
    1563            0 :          continue;
    1564            0 :       TMFeResult r = fFeEquipments[i]->EqInit(args);
    1565            0 :       if (r.error_flag)
    1566            0 :          return r;
    1567            0 :    }
    1568            0 :    return TMFeOk();
    1569              : }
    1570              : 
    1571            0 : void TMFrontend::FeStopEquipmentPollThreads()
    1572              : {
    1573              :    // NOTE: should not use range-based for() loop, it uses an iterator and it not thread-safe. K.O.
    1574            0 :    for (unsigned i=0; i<fFeEquipments.size(); i++) {
    1575            0 :       if (!fFeEquipments[i])
    1576            0 :          continue;
    1577            0 :       fFeEquipments[i]->EqStopPollThread();
    1578              :    }
    1579            0 : }
    1580              : 
    1581            0 : void TMFrontend::FeDeleteEquipments()
    1582              : {
    1583              :    // NOTE: this is thread-safe: we do not modify the fEquipments object. K.O.
    1584              :    // NOTE: this is not thread-safe, we will race against ourselves and do multiple delete of fEquipents[i]. K.O.
    1585              : 
    1586              :    // NOTE: should not use range-based for() loop, it uses an iterator and it not thread-safe. K.O.
    1587            0 :    for (unsigned i=0; i<fFeEquipments.size(); i++) {
    1588            0 :       if (!fFeEquipments[i])
    1589            0 :          continue;
    1590              :       //printf("delete equipment [%s]\n", fFeEquipments[i]->fEqName.c_str());
    1591            0 :       fMfe->RemoveRpcHandler(fFeEquipments[i]);
    1592            0 :       delete fFeEquipments[i];
    1593            0 :       fFeEquipments[i] = NULL;
    1594              :    }
    1595              : 
    1596            0 :    fMfe->EventBufferFlushCacheAll();
    1597            0 :    fMfe->EventBufferCloseAll();
    1598            0 : }
    1599              : 
    1600            0 : TMFeResult TMFrontend::FeAddEquipment(TMFeEquipment* eq)
    1601              : {
    1602              :    // NOTE: not thread-safe, we modify the fEquipments object. K.O.
    1603              :    
    1604              :    // NOTE: should not use range-based for() loop, it uses an iterator and it not thread-safe. K.O.
    1605            0 :    for (unsigned i=0; i<fFeEquipments.size(); i++) {
    1606            0 :       if (!fFeEquipments[i])
    1607            0 :          continue;
    1608            0 :       if (fFeEquipments[i] == eq) {
    1609            0 :          fprintf(stderr, "TMFE::AddEquipment: Fatal error: Equipment \"%s\" is already registered, bye...\n", fFeEquipments[i]->fEqName.c_str());
    1610            0 :          fMfe->Disconnect();
    1611            0 :          exit(1);
    1612              :          //return TMFeErrorMessage(msprintf("TMFE::AddEquipment: Equipment \"%s\" is already registered", fFeEquipments[i]->fEqName.c_str()));
    1613              :       }
    1614            0 :       if (fFeEquipments[i]->fEqName == eq->fEqName) {
    1615            0 :          fprintf(stderr, "TMFE::AddEquipment: Fatal error: Duplicate equipment name \"%s\", bye...\n", eq->fEqName.c_str());
    1616            0 :          fMfe->Disconnect();
    1617            0 :          exit(1);
    1618              :          //return TMFeErrorMessage(std::string("TMFE::AddEquipment: Duplicate equipment name \"") + eq->fEqName + "\"");
    1619              :       }
    1620              :    }
    1621              : 
    1622            0 :    eq->fFe = this;
    1623              : 
    1624              :    // NOTE: fEquipments must be protected again multithreaded access here. K.O.
    1625            0 :    fFeEquipments.push_back(eq);
    1626              : 
    1627            0 :    return TMFeOk();
    1628              : }
    1629              : 
    1630            0 : TMFeResult TMFrontend::FeRemoveEquipment(TMFeEquipment* eq)
    1631              : {
    1632              :    // NOTE: this is thread-safe, we do not modify the fEquipments object. K.O.
    1633              :    
    1634              :    // NOTE: should not use range-based for() loop, it uses an iterator and it not thread-safe. K.O.
    1635            0 :    for (unsigned i=0; i<fFeEquipments.size(); i++) {
    1636            0 :       if (!fFeEquipments[i])
    1637            0 :          continue;
    1638            0 :       if (fFeEquipments[i] == eq) {
    1639            0 :          fFeEquipments[i] = NULL;
    1640            0 :          return TMFeOk();
    1641              :       }
    1642              :    }
    1643              : 
    1644            0 :    return TMFeErrorMessage(msprintf("TMFE::RemoveEquipment: Cannot find equipment \"%s\"", eq->fEqName.c_str()));
    1645              : }
    1646              : 
    1647            0 : void TMFrontend::FeSetName(const char* program_name)
    1648              : {
    1649            0 :    assert(program_name != NULL);
    1650            0 :    fMfe->fProgramName = program_name;
    1651            0 : }
    1652              : 
    1653            0 : TMFeEquipment::TMFeEquipment(const char* eqname, const char* eqfilename) // ctor
    1654              : {
    1655            0 :    assert(eqname != NULL);
    1656            0 :    assert(eqfilename != NULL);
    1657              :    
    1658            0 :    if (TMFE::gfVerbose)
    1659            0 :       printf("TMFeEquipment::ctor: equipment name [%s] file [%s]\n", eqname, eqfilename);
    1660              : 
    1661            0 :    fMfe = TMFE::Instance();
    1662            0 :    fEqName = eqname;
    1663            0 :    fEqFilename = eqfilename;
    1664            0 :    fEqStatNextWrite = TMFE::GetTime();
    1665            0 : }
    1666              : 
    1667            0 : TMFeEquipment::~TMFeEquipment() // dtor
    1668              : {
    1669            0 :    if (TMFE::gfVerbose)
    1670            0 :       printf("TMFeEquipment::dtor: equipment name [%s]\n", fEqName.c_str());
    1671              : 
    1672            0 :    EqStopPollThread();
    1673              : 
    1674              :    // free data and poison pointers
    1675            0 :    if (fOdbEq) {
    1676            0 :       delete fOdbEq;
    1677            0 :       fOdbEq = NULL;
    1678              :    }
    1679            0 :    if (fOdbEqCommon) {
    1680            0 :       delete fOdbEqCommon;
    1681            0 :       fOdbEqCommon = NULL;
    1682              :    }
    1683            0 :    if (fOdbEqSettings) {
    1684            0 :       delete fOdbEqSettings;
    1685            0 :       fOdbEqSettings = NULL;
    1686              :    }
    1687            0 :    if (fOdbEqVariables) {
    1688            0 :       delete fOdbEqVariables;
    1689            0 :       fOdbEqVariables = NULL;
    1690              :    }
    1691            0 :    if (fOdbEqStatistics) {
    1692            0 :       delete fOdbEqStatistics;
    1693            0 :       fOdbEqStatistics = NULL;
    1694              :    }
    1695            0 :    fMfe = NULL;
    1696            0 :    fFe  = NULL;
    1697            0 :    fEqEventBuffer = NULL;
    1698            0 : }
    1699              : 
    1700            0 : TMFeResult TMFeEquipment::EqInit(const std::vector<std::string>& args)
    1701              : {
    1702            0 :    TMFeResult r;
    1703              : 
    1704            0 :    r = EqPreInit();
    1705            0 :    if (r.error_flag)
    1706            0 :       return r;
    1707              : 
    1708            0 :    r = HandleInit(args);
    1709            0 :    if (r.error_flag)
    1710            0 :       return r;
    1711              : 
    1712            0 :    r = EqPostInit();
    1713            0 :    if (r.error_flag)
    1714            0 :       return r;
    1715              : 
    1716            0 :    return TMFeOk();
    1717            0 : }
    1718              : 
    1719            0 : TMFeResult TMFeEquipment::EqReadCommon()
    1720              : {
    1721            0 :    if (TMFE::gfVerbose)
    1722            0 :       printf("TMFeEquipment::EqReadCommon: for [%s]\n", fEqName.c_str());
    1723              : 
    1724              :    // list of ODB Common entries always read
    1725              : 
    1726            0 :    fOdbEqCommon->RB("Enabled", &fEqConfEnabled, true);
    1727            0 :    fOdbEqCommon->RD("Event limit", &fEqConfEventLimit, true);
    1728              : 
    1729            0 :    if (fEqConfReadConfigFromOdb) {
    1730              :       // list of ODB Common entries read if we want to control equipment from ODB
    1731              : 
    1732            0 :       fOdbEqCommon->RU16("Event ID",       &fEqConfEventID,        true);
    1733            0 :       fOdbEqCommon->RU16("Trigger mask",   &fEqConfTriggerMask,    true);
    1734            0 :       fOdbEqCommon->RS("Buffer",           &fEqConfBuffer,         true, NAME_LENGTH);
    1735            0 :       fOdbEqCommon->RI("Type",             &fEqConfType,           true);
    1736            0 :       fOdbEqCommon->RI("Source",           &fEqConfSource,         true);
    1737            0 :       fOdbEqCommon->RS("Format",           &fEqConfFormat,         true, 8);
    1738            0 :       fOdbEqCommon->RI("Read on",          &fEqConfReadOn,         true);
    1739            0 :       fOdbEqCommon->RI("Period",           &fEqConfPeriodMilliSec, true);
    1740            0 :       fOdbEqCommon->RU32("Num subevents",  &fEqConfNumSubEvents,   true);
    1741            0 :       fOdbEqCommon->RI("Log history",      &fEqConfLogHistory,     true);
    1742            0 :       fOdbEqCommon->RB("Hidden",           &fEqConfHidden,         true);
    1743            0 :       fOdbEqCommon->RI("Write cache size", &fEqConfWriteCacheSize, true);
    1744              : 
    1745              :       // decode data from ODB Common
    1746              : 
    1747            0 :       fEqConfReadOnlyWhenRunning = !(fEqConfReadOn & (RO_PAUSED|RO_STOPPED));
    1748            0 :       fEqConfWriteEventsToOdb    = (fEqConfReadOn & RO_ODB);
    1749              :    }
    1750              : 
    1751              :    // list of ODB Common entries we read and write back to ODB, but do not actually use.
    1752              : 
    1753              :    //fOdbEqCommon->RS("Frontend host",       &fEqConfFrontendHost,     true, NAME_LENGTH);
    1754              :    //fOdbEqCommon->RS("Frontend name",       &fEqConfFrontendName,     true, NAME_LENGTH);
    1755              :    //fOdbEqCommon->RS("Frontend file name",  &fEqConfFrontendFileName, true, 256);
    1756              :    //fOdbEqCommon->RS("Status",              &fEqConfStatus,           true, 256);
    1757              :    //fOdbEqCommon->RS("Status color",        &fEqConfStatusColor,      true, NAME_LENGTH);
    1758              : 
    1759            0 :    return TMFeOk();
    1760              : }
    1761              : 
    1762            0 : TMFeResult TMFeEquipment::EqWriteCommon(bool create)
    1763              : {
    1764            0 :    if (TMFE::gfVerbose)
    1765            0 :       printf("TMFeEquipment::EqWriteCommon: for [%s]\n", fEqName.c_str());
    1766              : 
    1767              :    // encode data for ODB Common
    1768              : 
    1769            0 :    fEqConfReadOn = 0;
    1770            0 :    if (fEqConfReadOnlyWhenRunning)
    1771            0 :       fEqConfReadOn |= (RO_RUNNING);
    1772              :    else
    1773            0 :       fEqConfReadOn |= (RO_RUNNING|RO_PAUSED|RO_STOPPED);
    1774            0 :    if (fEqConfWriteEventsToOdb)
    1775            0 :       fEqConfReadOn |= RO_ODB;
    1776              : 
    1777              :    // write to ODB
    1778              :    
    1779            0 :    fOdbEqCommon->WU16("Event ID",          fEqConfEventID);
    1780            0 :    fOdbEqCommon->WU16("Trigger mask",      fEqConfTriggerMask);
    1781            0 :    fOdbEqCommon->WS("Buffer",              fEqConfBuffer.c_str(), NAME_LENGTH);
    1782            0 :    fOdbEqCommon->WI("Type",                fEqConfType);
    1783            0 :    fOdbEqCommon->WI("Source",              fEqConfSource);
    1784            0 :    fOdbEqCommon->WS("Format",              fEqConfFormat.c_str(), 8);
    1785            0 :    fOdbEqCommon->WB("Enabled",             fEqConfEnabled);
    1786            0 :    fOdbEqCommon->WI("Read on",             fEqConfReadOn);
    1787            0 :    fOdbEqCommon->WI("Period",              fEqConfPeriodMilliSec);
    1788            0 :    fOdbEqCommon->WD("Event limit",         fEqConfEventLimit);
    1789            0 :    fOdbEqCommon->WU32("Num subevents",     fEqConfNumSubEvents);
    1790            0 :    fOdbEqCommon->WI("Log history",         fEqConfLogHistory);
    1791            0 :    fOdbEqCommon->WS("Frontend host",       fMfe->fHostname.c_str(), NAME_LENGTH);
    1792            0 :    fOdbEqCommon->WS("Frontend name",       fMfe->fProgramName.c_str(), NAME_LENGTH);
    1793            0 :    fOdbEqCommon->WS("Frontend file name",  fEqFilename.c_str(), 256);
    1794            0 :    if (create) {
    1795            0 :       fOdbEqCommon->WS("Status",           "", 256);
    1796            0 :       fOdbEqCommon->WS("Status color",     "", NAME_LENGTH);
    1797              :    }
    1798            0 :    fOdbEqCommon->WB("Hidden",              fEqConfHidden);
    1799            0 :    fOdbEqCommon->WI("Write cache size",    fEqConfWriteCacheSize);
    1800            0 :    return TMFeOk();
    1801              : }
    1802              : 
    1803            0 : TMFeResult TMFeEquipment::EqPreInit()
    1804              : {
    1805            0 :    if (TMFE::gfVerbose)
    1806            0 :       printf("TMFeEquipment::PreInit: for [%s]\n", fEqName.c_str());
    1807              : 
    1808              :    //
    1809              :    // Apply frontend index
    1810              :    //
    1811              : 
    1812            0 :    if (fEqName.find("%") != std::string::npos) {
    1813            0 :       fEqName = msprintf(fEqName.c_str(), fFe->fFeIndex);
    1814              :    }
    1815              : 
    1816            0 :    if (fEqConfBuffer.find("%") != std::string::npos) {
    1817            0 :       fEqConfBuffer = msprintf(fEqConfBuffer.c_str(), fFe->fFeIndex);
    1818              :    }
    1819              : 
    1820              :    //
    1821              :    // create ODB /eq/name/common
    1822              :    //
    1823              : 
    1824            0 :    fOdbEq = fMfe->fOdbRoot->Chdir((std::string("Equipment/") + fEqName).c_str(), true);
    1825            0 :    fOdbEqCommon     = fOdbEq->Chdir("Common", false);
    1826            0 :    if (!fOdbEqCommon) {
    1827            0 :       if (TMFE::gfVerbose)
    1828            0 :          printf("TMFeEquipment::PreInit: creating ODB common\n");
    1829            0 :       fOdbEqCommon  = fOdbEq->Chdir("Common", true);
    1830            0 :       EqWriteCommon(true);
    1831              :    }
    1832            0 :    fOdbEqSettings   = fOdbEq->Chdir("Settings", true);
    1833            0 :    fOdbEqVariables  = fOdbEq->Chdir("Variables", true);
    1834            0 :    fOdbEqStatistics = fOdbEq->Chdir("Statistics", true);
    1835              : 
    1836            0 :    TMFeResult r = EqReadCommon();
    1837              : 
    1838            0 :    if (r.error_flag)
    1839            0 :       return r;
    1840              : 
    1841            0 :    if (rpc_is_remote()) {
    1842            0 :       EqSetStatus((fMfe->fProgramName + "@" + fMfe->fHostname).c_str(), "greenLight");
    1843              :    } else {
    1844            0 :       EqSetStatus(fMfe->fProgramName.c_str(), "greenLight");
    1845              :    }
    1846              : 
    1847            0 :    EqZeroStatistics();
    1848            0 :    EqWriteStatistics();
    1849              : 
    1850            0 :    return TMFeOk();
    1851            0 : }
    1852              : 
    1853            0 : TMFeResult TMFeEquipment::EqPostInit()
    1854              : {
    1855            0 :    if (TMFE::gfVerbose)
    1856            0 :       printf("TMFeEquipment::EqPostInit: for [%s]\n", fEqName.c_str());
    1857              : 
    1858            0 :    if (!fEqConfEnabled) {
    1859            0 :       EqSetStatus("Disabled", "yellowLight");
    1860              :    }
    1861              : 
    1862              :    // open event buffer
    1863              :    
    1864            0 :    uint32_t odb_max_event_size = DEFAULT_MAX_EVENT_SIZE;
    1865            0 :    fMfe->fOdbRoot->RU32("Experiment/MAX_EVENT_SIZE", &odb_max_event_size, true);
    1866              : 
    1867            0 :    if (fEqConfMaxEventSize == 0) {
    1868            0 :       fEqConfMaxEventSize = odb_max_event_size;
    1869            0 :    } else if (fEqConfMaxEventSize > odb_max_event_size) {
    1870            0 :       fMfe->Msg(MERROR, "TMFeEquipment::EqPostInit", "Equipment \"%s\" requested event size %d is bigger than ODB MAX_EVENT_SIZE %d", fEqName.c_str(), (int)fEqConfMaxEventSize, odb_max_event_size);
    1871            0 :       fEqConfMaxEventSize = odb_max_event_size;
    1872              :    }
    1873              : 
    1874            0 :    if (!fEqConfBuffer.empty()) {
    1875            0 :       TMFeResult r = fMfe->EventBufferOpen(&fEqEventBuffer, fEqConfBuffer.c_str(), fEqConfBufferSize);
    1876              : 
    1877            0 :       if (r.error_flag)
    1878            0 :          return r;
    1879              : 
    1880            0 :       assert(fEqEventBuffer != NULL);
    1881              : 
    1882            0 :       if (fEqConfMaxEventSize > fEqEventBuffer->fBufMaxEventSize) {
    1883            0 :          fMfe->Msg(MERROR, "TMFeEquipment::EqPostInit", "Equipment \"%s\" requested event size %d is bigger than event buffer \"%s\" max event size %d", fEqName.c_str(), (int)fEqConfMaxEventSize, fEqEventBuffer->fBufName.c_str(), (int)fEqEventBuffer->fBufMaxEventSize);
    1884            0 :          fEqConfMaxEventSize = fEqEventBuffer->fBufMaxEventSize;
    1885              :       }
    1886              : 
    1887            0 :       if (fEqConfWriteCacheSize > 0) {
    1888            0 :          if (fEqEventBuffer->fBufWriteCacheSize == 0) {
    1889            0 :             r = fEqEventBuffer->SetCacheSize(0, fEqConfWriteCacheSize);
    1890              : 
    1891            0 :             if (r.error_flag)
    1892            0 :                return r;
    1893            0 :          } else if (fEqConfWriteCacheSize < (int)fEqEventBuffer->fBufWriteCacheSize) {
    1894            0 :             fMfe->Msg(MERROR, "TMFeEquipment::EqPostInit", "Equipment \"%s\" requested write cache size %d for buffer \"%s\" is smaller then already set write cache size %d, ignoring it", fEqName.c_str(), (int)fEqConfWriteCacheSize, fEqEventBuffer->fBufName.c_str(), (int)fEqEventBuffer->fBufWriteCacheSize);
    1895            0 :          } else if (fEqConfWriteCacheSize == (int)fEqEventBuffer->fBufWriteCacheSize) {
    1896              :             // do nothing
    1897              :          } else {
    1898            0 :             fMfe->Msg(MERROR, "TMFeEquipment::EqPostInit", "Equipment \"%s\" requested write cache size %d for buffer \"%s\" is different from already set write cache size %d", fEqName.c_str(), (int)fEqConfWriteCacheSize, fEqEventBuffer->fBufName.c_str(), (int)fEqEventBuffer->fBufWriteCacheSize);
    1899              :          
    1900            0 :             r = fEqEventBuffer->SetCacheSize(0, fEqConfWriteCacheSize);
    1901              :             
    1902            0 :             if (r.error_flag)
    1903            0 :                return r;
    1904              :          }
    1905              :       }
    1906            0 :    }
    1907              : 
    1908            0 :    if (TMFE::gfVerbose)
    1909            0 :       printf("TMFeEquipment::EqPostInit: Equipment \"%s\", max event size: %d\n", fEqName.c_str(), (int)fEqConfMaxEventSize);
    1910              : 
    1911              :    // update ODB common
    1912              : 
    1913            0 :    TMFeResult r = EqWriteCommon();
    1914              : 
    1915            0 :    if (r.error_flag)
    1916            0 :       return r;
    1917              : 
    1918            0 :    if (fEqConfEnabled && fEqConfEnableRpc) {
    1919            0 :       fMfe->AddRpcHandler(this);
    1920              :    }
    1921              : 
    1922            0 :    return TMFeOk();
    1923            0 : };
    1924              : 
    1925            0 : TMFeResult TMFeEquipment::EqZeroStatistics()
    1926              : {
    1927            0 :    fEqMutex.lock();
    1928              : 
    1929            0 :    if (TMFE::gfVerbose)
    1930            0 :       printf("TMFeEquipment::EqZeroStatistics: zero statistics for [%s]\n", fEqName.c_str());
    1931              : 
    1932            0 :    double now = TMFE::GetTime();
    1933              :    
    1934            0 :    fEqStatEvents = 0;
    1935            0 :    fEqStatBytes = 0;
    1936            0 :    fEqStatEpS = 0;
    1937            0 :    fEqStatKBpS = 0;
    1938              :    
    1939            0 :    fEqStatLastTime = now;
    1940            0 :    fEqStatLastEvents = 0;
    1941            0 :    fEqStatLastBytes = 0;
    1942              : 
    1943            0 :    fEqStatNextWrite = now; // force immediate update
    1944              : 
    1945            0 :    fEqMutex.unlock();
    1946              : 
    1947            0 :    return TMFeOk();
    1948              : }
    1949              : 
    1950            0 : TMFeResult TMFeEquipment::EqWriteStatistics()
    1951              : {
    1952            0 :    fEqMutex.lock();
    1953              : 
    1954            0 :    if (TMFE::gfVerbose)
    1955            0 :       printf("TMFeEquipment::EqWriteStatistics: write statistics for [%s]\n", fEqName.c_str());
    1956              : 
    1957            0 :    double now = TMFE::GetTime();
    1958            0 :    double elapsed = now - fEqStatLastTime;
    1959              : 
    1960            0 :    if (elapsed > 0.9 || fEqStatLastTime == 0) {
    1961            0 :       fEqStatEpS = (fEqStatEvents - fEqStatLastEvents) / elapsed;
    1962            0 :       fEqStatKBpS = (fEqStatBytes - fEqStatLastBytes) / elapsed / 1000.0;
    1963              : 
    1964            0 :       fEqStatLastTime = now;
    1965            0 :       fEqStatLastEvents = fEqStatEvents;
    1966            0 :       fEqStatLastBytes = fEqStatBytes;
    1967              :    }
    1968              : 
    1969              :    //printf("TMFeEquipment::EqWriteStatistics: write statistics for [%s], now %f, elapsed %f, sent %f, eps %f, kps %f\n", fEqName.c_str(), now, elapsed, fEqStatEvents, fEqStatEpS, fEqStatKBpS);
    1970              : 
    1971            0 :    fOdbEqStatistics->WD("Events sent", fEqStatEvents);
    1972            0 :    fOdbEqStatistics->WD("Events per sec.", fEqStatEpS);
    1973            0 :    fOdbEqStatistics->WD("kBytes per sec.", fEqStatKBpS);
    1974              : 
    1975            0 :    fEqStatLastWrite = now;
    1976              : 
    1977            0 :    if (fEqConfPeriodStatisticsSec > 0) {
    1978              :       // avoid creep of NextWrite: we start it at
    1979              :       // time of initialization, then increment it strictly
    1980              :       // by the period value, regardless of when it is actually
    1981              :       // written to ODB (actual period is longer than requested
    1982              :       // period because we only over-sleep, never under-sleep). K.O.
    1983            0 :       while (fEqStatNextWrite <= now) {
    1984            0 :          fEqStatNextWrite += fEqConfPeriodStatisticsSec;
    1985              :       }
    1986              :    } else {
    1987            0 :       fEqStatNextWrite = now;
    1988              :    }
    1989              : 
    1990            0 :    fEqMutex.unlock();
    1991              :    
    1992            0 :    return TMFeOk();
    1993              : }
    1994              : 
    1995            0 : TMFeResult TMFeEquipment::ComposeEvent(char* event, size_t size) const
    1996              : {
    1997            0 :    EVENT_HEADER* pevent = (EVENT_HEADER*)event;
    1998            0 :    pevent->event_id = fEqConfEventID;
    1999            0 :    pevent->trigger_mask = fEqConfTriggerMask;
    2000            0 :    pevent->serial_number = fEqSerial;
    2001            0 :    pevent->time_stamp = TMFE::GetTime();
    2002            0 :    pevent->data_size = 0;
    2003            0 :    return TMFeOk();
    2004              : }
    2005              : 
    2006            0 : TMFeResult TMFeEquipment::EqSendEvent(const char* event, bool write_to_odb)
    2007              : {
    2008            0 :    std::lock_guard<std::mutex> guard(fEqMutex);
    2009              :    
    2010            0 :    fEqSerial++;
    2011              : 
    2012            0 :    EVENT_HEADER* pevent = (EVENT_HEADER*)event;
    2013            0 :    pevent->data_size = BkSize(event);
    2014              :    
    2015            0 :    if (fEqEventBuffer != NULL) {
    2016            0 :       TMFeResult r = fEqEventBuffer->SendEvent(event);
    2017              :   
    2018            0 :    if (r.error_flag)
    2019            0 :       return r;
    2020            0 :    }
    2021              : 
    2022            0 :    fEqStatEvents += 1;
    2023            0 :    fEqStatBytes  += sizeof(EVENT_HEADER) + pevent->data_size;
    2024              : 
    2025            0 :    if (fEqConfWriteEventsToOdb && write_to_odb) {
    2026            0 :       TMFeResult r = EqWriteEventToOdb_locked(event);
    2027            0 :       if (r.error_flag)
    2028            0 :          return r;
    2029            0 :    }
    2030              : 
    2031            0 :    if (fMfe->fStateRunning) {
    2032            0 :       if (fEqConfEventLimit > 0) {
    2033            0 :          if (fEqStatEvents >= fEqConfEventLimit) {
    2034            0 :             if (!fMfe->fRunStopRequested) {
    2035            0 :                fMfe->Msg(MINFO, "TMFeEquipment::EqSendEvent", "Equipment \"%s\" sent %.0f events out of %.0f requested, run will stop now", fEqName.c_str(), fEqStatEvents, fEqConfEventLimit);
    2036              :             }
    2037            0 :             fMfe->fRunStopRequested = true;
    2038              :          }
    2039              :       }
    2040              :    }
    2041              : 
    2042            0 :    return TMFeOk();
    2043            0 : }
    2044              : 
    2045            0 : TMFeResult TMFeEquipment::EqSendEvent(const std::vector<char>& event, bool write_to_odb)
    2046              : {
    2047            0 :    std::lock_guard<std::mutex> guard(fEqMutex);
    2048              :    
    2049            0 :    fEqSerial++;
    2050              : 
    2051            0 :    if (fEqEventBuffer == NULL) {
    2052            0 :       return TMFeOk();
    2053              :    }
    2054              : 
    2055            0 :    TMFeResult r = fEqEventBuffer->SendEvent(event);
    2056              : 
    2057            0 :    if (r.error_flag)
    2058            0 :       return r;
    2059              : 
    2060            0 :    fEqStatEvents += 1;
    2061            0 :    fEqStatBytes  += event.size();
    2062              : 
    2063            0 :    if (fEqConfWriteEventsToOdb && write_to_odb) {
    2064            0 :       TMFeResult r = EqWriteEventToOdb_locked(event.data());
    2065            0 :       if (r.error_flag)
    2066            0 :          return r;
    2067            0 :    }
    2068              : 
    2069            0 :    if (fMfe->fStateRunning) {
    2070            0 :       if (fEqConfEventLimit > 0) {
    2071            0 :          if (fEqStatEvents >= fEqConfEventLimit) {
    2072            0 :             if (!fMfe->fRunStopRequested) {
    2073            0 :                fMfe->Msg(MINFO, "TMFeEquipment::EqSendEvent", "Equipment \"%s\" sent %.0f events out of %.0f requested, run will stop now", fEqName.c_str(), fEqStatEvents, fEqConfEventLimit);
    2074              :             }
    2075            0 :             fMfe->fRunStopRequested = true;
    2076              :          }
    2077              :       }
    2078              :    }
    2079              : 
    2080            0 :    return TMFeOk();
    2081            0 : }
    2082              : 
    2083            0 : TMFeResult TMFeEquipment::EqSendEvent(const std::vector<std::vector<char>>& event, bool write_to_odb)
    2084              : {
    2085            0 :    std::lock_guard<std::mutex> guard(fEqMutex);
    2086              :    
    2087            0 :    fEqSerial++;
    2088              : 
    2089            0 :    if (fEqEventBuffer == NULL) {
    2090            0 :       return TMFeOk();
    2091              :    }
    2092              : 
    2093            0 :    TMFeResult r = fEqEventBuffer->SendEvent(event);
    2094              : 
    2095            0 :    if (r.error_flag)
    2096            0 :       return r;
    2097              : 
    2098            0 :    fEqStatEvents += 1;
    2099            0 :    for (auto v: event) {
    2100            0 :       fEqStatBytes += v.size();
    2101            0 :    }
    2102              : 
    2103              :    //if (fEqConfWriteEventsToOdb && write_to_odb) {
    2104              :    //   TMFeResult r = EqWriteEventToOdb_locked(event.data());
    2105              :    //   if (r.error_flag)
    2106              :    //      return r;
    2107              :    //}
    2108              : 
    2109            0 :    if (fMfe->fStateRunning) {
    2110            0 :       if (fEqConfEventLimit > 0) {
    2111            0 :          if (fEqStatEvents >= fEqConfEventLimit) {
    2112            0 :             if (!fMfe->fRunStopRequested) {
    2113            0 :                fMfe->Msg(MINFO, "TMFeEquipment::EqSendEvent", "Equipment \"%s\" sent %.0f events out of %.0f requested, run will stop now", fEqName.c_str(), fEqStatEvents, fEqConfEventLimit);
    2114              :             }
    2115            0 :             fMfe->fRunStopRequested = true;
    2116              :          }
    2117              :       }
    2118              :    }
    2119              : 
    2120            0 :    return TMFeOk();
    2121            0 : }
    2122              : 
    2123            0 : TMFeResult TMFeEquipment::EqSendEvent(int sg_n, const char* sg_ptr[], const size_t sg_len[], bool write_to_odb)
    2124              : {
    2125            0 :    std::lock_guard<std::mutex> guard(fEqMutex);
    2126              :    
    2127            0 :    fEqSerial++;
    2128              : 
    2129            0 :    if (fEqEventBuffer == NULL) {
    2130            0 :       return TMFeOk();
    2131              :    }
    2132              : 
    2133            0 :    TMFeResult r = fEqEventBuffer->SendEvent(sg_n, sg_ptr, sg_len);
    2134              : 
    2135            0 :    if (r.error_flag)
    2136            0 :       return r;
    2137              : 
    2138            0 :    fEqStatEvents += 1;
    2139            0 :    for (int i=0; i<sg_n; i++) {
    2140            0 :       fEqStatBytes += sg_len[i];
    2141              :    }
    2142              : 
    2143              :    //if (fEqConfWriteEventsToOdb && write_to_odb) {
    2144              :    //   TMFeResult r = EqWriteEventToOdb_locked(event.data());
    2145              :    //   if (r.error_flag)
    2146              :    //      return r;
    2147              :    //}
    2148              : 
    2149            0 :    if (fMfe->fStateRunning) {
    2150            0 :       if (fEqConfEventLimit > 0) {
    2151            0 :          if (fEqStatEvents >= fEqConfEventLimit) {
    2152            0 :             if (!fMfe->fRunStopRequested) {
    2153            0 :                fMfe->Msg(MINFO, "TMFeEquipment::EqSendEvent", "Equipment \"%s\" sent %.0f events out of %.0f requested, run will stop now", fEqName.c_str(), fEqStatEvents, fEqConfEventLimit);
    2154              :             }
    2155            0 :             fMfe->fRunStopRequested = true;
    2156              :          }
    2157              :       }
    2158              :    }
    2159              : 
    2160            0 :    return TMFeOk();
    2161            0 : }
    2162              : 
    2163            0 : TMFeResult TMFeEquipment::EqWriteEventToOdb(const char* event)
    2164              : {
    2165            0 :    std::lock_guard<std::mutex> guard(fEqMutex);
    2166            0 :    return EqWriteEventToOdb_locked(event);
    2167            0 : }
    2168              : 
    2169            0 : TMFeResult TMFeEquipment::EqWriteEventToOdb_locked(const char* event)
    2170              : {
    2171            0 :    std::string path = "";
    2172            0 :    path += "/Equipment/";
    2173            0 :    path += fEqName;
    2174            0 :    path += "/Variables";
    2175              : 
    2176            0 :    HNDLE hKeyVar = 0;
    2177              : 
    2178            0 :    int status = db_find_key(fMfe->fDB, 0, path.c_str(), &hKeyVar);
    2179            0 :    if (status != DB_SUCCESS) {
    2180            0 :       return TMFeMidasError(msprintf("Cannot find \"%s\" in ODB", path.c_str()), "db_find_key", status);
    2181              :    }
    2182              : 
    2183            0 :    status = cm_write_event_to_odb(fMfe->fDB, hKeyVar, (const EVENT_HEADER*) event, FORMAT_MIDAS);
    2184            0 :    if (status != SUCCESS) {
    2185            0 :       return TMFeMidasError("Cannot write event to ODB", "cm_write_event_to_odb", status);
    2186              :    }
    2187            0 :    return TMFeOk();
    2188            0 : }
    2189              : 
    2190            0 : int TMFeEquipment::BkSize(const char* event) const
    2191              : {
    2192            0 :    return bk_size(event + sizeof(EVENT_HEADER));
    2193              : }
    2194              : 
    2195            0 : TMFeResult TMFeEquipment::BkInit(char* event, size_t size) const
    2196              : {
    2197            0 :    bk_init32a(event + sizeof(EVENT_HEADER));
    2198            0 :    return TMFeOk();
    2199              : }
    2200              : 
    2201            0 : void* TMFeEquipment::BkOpen(char* event, const char* name, int tid) const
    2202              : {
    2203              :    void* ptr;
    2204            0 :    bk_create(event + sizeof(EVENT_HEADER), name, tid, &ptr);
    2205            0 :    return ptr;
    2206              : }
    2207              : 
    2208            0 : TMFeResult TMFeEquipment::BkClose(char* event, void* ptr) const
    2209              : {
    2210            0 :    bk_close(event + sizeof(EVENT_HEADER), ptr);
    2211            0 :    ((EVENT_HEADER*)event)->data_size = BkSize(event);
    2212            0 :    return TMFeOk();
    2213              : }
    2214              : 
    2215            0 : TMFeResult TMFeEquipment::EqSetStatus(char const* eq_status, char const* eq_color)
    2216              : {
    2217            0 :    if (eq_status) {
    2218            0 :       fOdbEqCommon->WS("Status", eq_status, 256);
    2219              :    }
    2220              : 
    2221            0 :    if (eq_color) {
    2222            0 :       fOdbEqCommon->WS("Status color", eq_color, NAME_LENGTH);
    2223              :    }
    2224              : 
    2225            0 :    return TMFeOk();
    2226              : }
    2227              : 
    2228            0 : TMFeResult TMFE::TriggerAlarm(const char* name, const char* message, const char* aclass)
    2229              : {
    2230            0 :    int status = al_trigger_alarm(name, message, aclass, message, AT_INTERNAL);
    2231              : 
    2232            0 :    if (status) {
    2233            0 :       return TMFeMidasError("Cannot trigger alarm", "al_trigger_alarm", status);
    2234              :    }
    2235              : 
    2236            0 :    return TMFeOk();
    2237              : }
    2238              : 
    2239            0 : TMFeResult TMFE::ResetAlarm(const char* name)
    2240              : {
    2241            0 :    int status = al_reset_alarm(name);
    2242              : 
    2243            0 :    if (status) {
    2244            0 :       return TMFeMidasError("Cannot reset alarm", "al_reset_alarm", status);
    2245              :    }
    2246              : 
    2247            0 :    return TMFeOk();
    2248              : }
    2249              : 
    2250            0 : void TMFrontend::FeUsage(const char* argv0)
    2251              : {
    2252            0 :    fprintf(stderr, "\n");
    2253            0 :    fprintf(stderr, "Usage: %s args... [-- equipment args...]\n", argv0);
    2254            0 :    fprintf(stderr, "\n");
    2255            0 :    fprintf(stderr, " --help -- print this help message\n");
    2256            0 :    fprintf(stderr, " -h -- print this help message\n");
    2257            0 :    fprintf(stderr, " -v -- report all activities\n");
    2258            0 :    fprintf(stderr, "\n");
    2259            0 :    fprintf(stderr, " -h hostname[:tcpport] -- connect to MIDAS mserver on given host and tcp port number\n");
    2260            0 :    fprintf(stderr, " -e exptname -- connect to given MIDAS experiment\n");
    2261            0 :    fprintf(stderr, "\n");
    2262            0 :    fprintf(stderr, " -D -- Become a daemon\n");
    2263            0 :    fprintf(stderr, " -O -- Become a daemon but keep stdout for saving in a log file: frontend -O >file.log 2>&1\n");
    2264            0 :    fprintf(stderr, "\n");
    2265            0 :    fprintf(stderr, " -i NNN -- Set frontend index number\n");
    2266            0 :    fprintf(stderr, "\n");
    2267              : 
    2268              :    // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleUsage() modifies fEquipments. K.O.
    2269            0 :    for (unsigned i=0; i<fFeEquipments.size(); i++) {
    2270            0 :       if (!fFeEquipments[i])
    2271            0 :          continue;
    2272            0 :       fprintf(stderr, "Usage of equipment \"%s\":\n", fFeEquipments[i]->fEqName.c_str());
    2273            0 :       fprintf(stderr, "\n");
    2274            0 :       fFeEquipments[i]->HandleUsage();
    2275            0 :       fprintf(stderr, "\n");
    2276              :    }
    2277            0 : }
    2278              : 
    2279            0 : int TMFrontend::FeMain(int argc, char* argv[])
    2280              : {
    2281            0 :    std::vector<std::string> args;
    2282            0 :    for (int i=0; i<argc; i++) {
    2283            0 :       args.push_back(argv[i]);
    2284              :    }
    2285              : 
    2286            0 :    return FeMain(args);
    2287            0 : }
    2288              : 
    2289            0 : TMFeResult TMFrontend::FeInit(const std::vector<std::string> &args)
    2290              : {
    2291            0 :    setbuf(stdout, NULL);
    2292            0 :    setbuf(stderr, NULL);
    2293              : 
    2294            0 :    signal(SIGPIPE, SIG_IGN);
    2295              : 
    2296            0 :    std::vector<std::string> eq_args;
    2297              : 
    2298            0 :    bool help = false;
    2299            0 :    std::string exptname;
    2300            0 :    std::string hostname;
    2301            0 :    bool daemon0 = false;
    2302            0 :    bool daemon1 = false;
    2303              : 
    2304            0 :    for (unsigned int i=1; i<args.size(); i++) { // loop over the commandline options
    2305              :       //printf("argv[%d] is %s\n", i, args[i].c_str());
    2306            0 :       if (args[i] == "--") {
    2307              :          // remaining arguments are passed to equipment Init()
    2308            0 :          for (unsigned j=i+1; j<args.size(); j++)
    2309            0 :             eq_args.push_back(args[j]);
    2310            0 :          break;
    2311            0 :       } else if (args[i] == "-v") {
    2312            0 :          TMFE::gfVerbose = true;
    2313            0 :       } else if (args[i] == "-D") {
    2314            0 :          daemon0 = true;
    2315            0 :       } else if (args[i] == "-O") {
    2316            0 :          daemon1 = true;
    2317            0 :       } else if (args[i] == "-h") {
    2318            0 :          i++;
    2319            0 :          if (i >= args.size()) { help = true; break; }
    2320            0 :          hostname = args[i];
    2321            0 :       } else if (args[i] == "-e") {
    2322            0 :          i++;
    2323            0 :          if (i >= args.size()) { help = true; break; }
    2324            0 :          exptname = args[i];
    2325            0 :       } else if (args[i] == "-i") {
    2326            0 :          i++;
    2327            0 :          if (i >= args.size()) { help = true; break; }
    2328            0 :          fFeIndex = atoi(args[i].c_str());
    2329            0 :       } else if (args[i] == "--help") {
    2330            0 :          help = true;
    2331            0 :          break;
    2332            0 :       } else if (args[i][0] == '-') {
    2333            0 :          help = true;
    2334            0 :          break;
    2335              :       } else {
    2336            0 :          help = true;
    2337            0 :          break;
    2338              :       }
    2339              :    }
    2340              : 
    2341              :    //
    2342              :    // daemonize...
    2343              :    //
    2344              : 
    2345            0 :    if (daemon0) {
    2346            0 :       printf("Becoming a daemon...\n");
    2347            0 :       ss_daemon_init(FALSE);
    2348            0 :    } else if (daemon1) {
    2349            0 :       printf("Becoming a daemon...\n");
    2350            0 :       ss_daemon_init(TRUE);
    2351              :    }
    2352              : 
    2353              :    //
    2354              :    // apply frontend index to indexed frontend
    2355              :    //
    2356              : 
    2357            0 :    if (fMfe->fProgramName.find("%") != std::string::npos) {
    2358            0 :       fMfe->fProgramName = msprintf(fMfe->fProgramName.c_str(), fFeIndex);
    2359              :    }
    2360              :       
    2361            0 :    TMFeResult r;
    2362              : 
    2363              :    // call arguments handler before calling the usage handlers. Otherwise,
    2364              :    // if the arguments handler creates new equipments,
    2365              :    // we will never see their Usage(). K.O.
    2366            0 :    r = HandleArguments(eq_args);
    2367              : 
    2368            0 :    if (r.error_flag) {
    2369            0 :       fprintf(stderr, "Fatal error: arguments handler error: %s, bye.\n", r.error_message.c_str());
    2370            0 :       fMfe->Disconnect();
    2371            0 :       exit(1);
    2372              :    }
    2373              : 
    2374            0 :    if (help) {
    2375            0 :       FeUsage(args[0].c_str());
    2376            0 :       HandleUsage();
    2377            0 :       fMfe->Disconnect();
    2378            0 :       exit(1);
    2379              :    }
    2380              : 
    2381            0 :    r = fMfe->Connect(NULL, hostname.c_str(), exptname.c_str());
    2382              : 
    2383            0 :    if (r.error_flag) {
    2384            0 :       fprintf(stderr, "Fatal error: cannot connect to MIDAS, error: %s, bye.\n", r.error_message.c_str());
    2385            0 :       fMfe->Disconnect();
    2386            0 :       exit(1);
    2387              :    }
    2388              : 
    2389            0 :    r = HandleFrontendInit(eq_args);
    2390              : 
    2391            0 :    if (r.error_flag) {
    2392            0 :       fprintf(stderr, "Fatal error: frontend init error: %s, bye.\n", r.error_message.c_str());
    2393            0 :       fMfe->Disconnect();
    2394            0 :       exit(1);
    2395              :    }
    2396              : 
    2397            0 :    fFeRpcHelper = new TMFrontendRpcHelper(this);
    2398            0 :    fMfe->AddRpcHandler(fFeRpcHelper);
    2399              : 
    2400              :    //mfe->SetWatchdogSec(0);
    2401              :    //mfe->SetTransitionSequenceStart(910);
    2402              :    //mfe->SetTransitionSequenceStop(90);
    2403              :    //mfe->DeregisterTransitionPause();
    2404              :    //mfe->DeregisterTransitionResume();
    2405              :    //mfe->RegisterTransitionStartAbort();
    2406              : 
    2407            0 :    r = FeInitEquipments(eq_args);
    2408              : 
    2409            0 :    if (r.error_flag) {
    2410            0 :       fprintf(stderr, "Cannot initialize equipments, error message: %s, bye.\n", r.error_message.c_str());
    2411            0 :       fMfe->Disconnect();
    2412            0 :       exit(1);
    2413              :    }
    2414              : 
    2415            0 :    r = HandleFrontendReady(eq_args);
    2416              : 
    2417            0 :    if (r.error_flag) {
    2418            0 :       fprintf(stderr, "Fatal error: frontend post-init error: %s, bye.\n", r.error_message.c_str());
    2419            0 :       fMfe->Disconnect();
    2420            0 :       exit(1);
    2421              :    }
    2422              : 
    2423            0 :    if (fMfe->fStateRunning) {
    2424            0 :       if (fFeIfRunningCallExit) {
    2425            0 :          fprintf(stderr, "Fatal error: Cannot start frontend, run is in progress!\n");
    2426            0 :          fMfe->Disconnect();
    2427            0 :          exit(1);
    2428            0 :       } else if (fFeIfRunningCallBeginRun) {
    2429              :          char errstr[TRANSITION_ERROR_STRING_LENGTH];
    2430            0 :          tr_start(fMfe->fRunNumber, errstr);
    2431              :       }
    2432              :    }
    2433              : 
    2434            0 :    return TMFeOk();
    2435            0 : }
    2436              : 
    2437            0 : void TMFrontend::FeMainLoop()
    2438              : {
    2439            0 :    while (!fMfe->fShutdownRequested) {
    2440            0 :       FePollMidas(0.100);
    2441              :    }
    2442            0 : }
    2443              :    
    2444            0 : void TMFrontend::FeShutdown()
    2445              : {
    2446            0 :    fMfe->StopRpcThread();
    2447            0 :    FeStopPeriodicThread();
    2448            0 :    FeStopEquipmentPollThreads();
    2449            0 :    HandleFrontendExit();
    2450            0 :    FeDeleteEquipments();
    2451            0 :    fMfe->Disconnect();
    2452            0 : }
    2453              : 
    2454            0 : int TMFrontend::FeMain(const std::vector<std::string> &args)
    2455              : {
    2456            0 :    TMFeResult r = FeInit(args);
    2457              : 
    2458            0 :    if (r.error_flag) {
    2459            0 :       fprintf(stderr, "Fatal error: frontend init error: %s, bye.\n", r.error_message.c_str());
    2460            0 :       fMfe->Disconnect();
    2461            0 :       exit(1);
    2462              :    }
    2463              : 
    2464            0 :    FeMainLoop();
    2465            0 :    FeShutdown();
    2466              : 
    2467            0 :    return 0;
    2468            0 : }
    2469              : 
    2470              : // singleton instance
    2471              : TMFE* TMFE::gfMFE = NULL;
    2472              : 
    2473              : // static data members
    2474              : bool TMFE::gfVerbose = false;
    2475              : 
    2476              : /* emacs
    2477              :  * Local Variables:
    2478              :  * tab-width: 8
    2479              :  * c-basic-offset: 3
    2480              :  * indent-tabs-mode: nil
    2481              :  * End:
    2482              :  */
        

Generated by: LCOV version 2.0-1