LCOV - code coverage report
Current view: top level - progs - feudp.cxx (source / functions) Coverage Total Hit
Test: coverage.info Lines: 0.0 % 167 0
Test Date: 2025-11-11 10:26:08 Functions: 0.0 % 16 0

            Line data    Source code
       1              : //
       2              : // feudp.cxx
       3              : //
       4              : // Frontend for receiving and storing UDP packets as MIDAS data banks.
       5              : //
       6              : 
       7              : #include <stdio.h>
       8              : #include <netdb.h> // getnameinfo()
       9              : //#include <stdlib.h>
      10              : #include <string.h> // memcpy()
      11              : #include <errno.h> // errno
      12              : //#include <unistd.h>
      13              : //#include <time.h>
      14              : 
      15              : #include <string>
      16              : #include <vector>
      17              : 
      18              : #include "midas.h"
      19              : #include "mfe.h"
      20              : #include "mstrlcpy.h"
      21              : 
      22              : const char *frontend_name = "feudp";                     /* fe MIDAS client name */
      23              : const char *frontend_file_name = __FILE__;               /* The frontend file name */
      24              : 
      25              :    BOOL frontend_call_loop = TRUE;       /* frontend_loop called periodically TRUE */
      26              :    int display_period = 0;               /* status page displayed with this freq[ms] */
      27              :    int max_event_size = 1*1024*1024;     /* max event size produced by this frontend */
      28              :    int max_event_size_frag = 5 * 1024 * 1024;     /* max for fragmented events */
      29              :    int event_buffer_size = 8*1024*1024;           /* buffer size to hold events */
      30              : 
      31              :   int interrupt_configure(INT cmd, INT source, PTYPE adr);
      32              :   INT poll_event(INT source, INT count, BOOL test);
      33              :   int frontend_init();
      34              :   int frontend_exit();
      35              :   int begin_of_run(int run, char *err);
      36              :   int end_of_run(int run, char *err);
      37              :   int pause_run(int run, char *err);
      38              :   int resume_run(int run, char *err);
      39              :   int frontend_loop();
      40              :   int read_event(char *pevent, INT off);
      41              : 
      42              : #ifndef EQ_NAME
      43              : #define EQ_NAME "UDP"
      44              : #endif
      45              : 
      46              : #ifndef EQ_EVID
      47              : #define EQ_EVID 1
      48              : #endif
      49              : 
      50              : BOOL equipment_common_overwrite = FALSE;
      51              : 
      52              : EQUIPMENT equipment[] = {
      53              :    { EQ_NAME,                         /* equipment name */
      54              :       {EQ_EVID, 0, "SYSTEM",          /* event ID, trigger mask, Evbuf */
      55              :        EQ_MULTITHREAD, 0, "MIDAS",    /* equipment type, EventSource, format */
      56              :        TRUE, RO_ALWAYS,               /* enabled?, WhenRead? */
      57              :        50, 0, 0, 0,                   /* poll[ms], Evt Lim, SubEvtLim, LogHist */
      58              :        "", "", "",}, read_event,      /* readout routine */
      59              :    },
      60              :    {""}
      61              : };
      62              : ////////////////////////////////////////////////////////////////////////////
      63              : 
      64              : #include <sys/time.h>
      65              : 
      66            0 : static double GetTimeSec()
      67              : {
      68              :    struct timeval tv;
      69            0 :    gettimeofday(&tv,NULL);
      70            0 :    return tv.tv_sec + 0.000001*tv.tv_usec;
      71              : }
      72              : 
      73              : struct Source
      74              : {
      75              :   struct sockaddr addr;
      76              :   char bank_name[5];
      77              :   std::string host_name;
      78              : };
      79              : 
      80              : static std::vector<Source> gSrc;
      81              : 
      82              : //static HNDLE hDB;
      83              : static HNDLE hKeySet; // equipment settings
      84              : 
      85              : static int gDataSocket;
      86              : 
      87              : static int gUnknownPacketCount = 0;
      88              : static bool gSkipUnknownPackets = false;
      89              : 
      90            0 : int open_udp_socket(int server_port)
      91              : {
      92              :    int status;
      93              :    
      94            0 :    int fd = socket(AF_INET, SOCK_DGRAM, 0);
      95              :    
      96            0 :    if (fd < 0) {
      97            0 :       cm_msg(MERROR, "open_udp_socket", "socket(AF_INET,SOCK_DGRAM) returned %d, errno %d (%s)", fd, errno, strerror(errno));
      98            0 :       return -1;
      99              :    }
     100              : 
     101            0 :    int opt = 1;
     102            0 :    status = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
     103              : 
     104            0 :    if (status == -1) {
     105            0 :       cm_msg(MERROR, "open_udp_socket", "setsockopt(SOL_SOCKET,SO_REUSEADDR) returned %d, errno %d (%s)", status, errno, strerror(errno));
     106            0 :       return -1;
     107              :    }
     108              : 
     109            0 :    int bufsize = 8*1024*1024;
     110              :    //int bufsize = 20*1024;
     111              : 
     112            0 :    status = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize));
     113              : 
     114            0 :    if (status == -1) {
     115            0 :       cm_msg(MERROR, "open_udp_socket", "setsockopt(SOL_SOCKET,SO_RCVBUF) returned %d, errno %d (%s)", status, errno, strerror(errno));
     116            0 :       return -1;
     117              :    }
     118              : 
     119              :    struct sockaddr_in local_addr;
     120            0 :    memset(&local_addr, 0, sizeof(local_addr));
     121              : 
     122            0 :    local_addr.sin_family = AF_INET;
     123            0 :    local_addr.sin_port = htons(server_port);
     124            0 :    local_addr.sin_addr.s_addr = INADDR_ANY;
     125              : 
     126            0 :    status = bind(fd, (struct sockaddr *)&local_addr, sizeof(local_addr));
     127              : 
     128            0 :    if (status == -1) {
     129            0 :       cm_msg(MERROR, "open_udp_socket", "bind(port=%d) returned %d, errno %d (%s)", server_port, status, errno, strerror(errno));
     130            0 :       return -1;
     131              :    }
     132              : 
     133            0 :    int xbufsize = 0;
     134            0 :    unsigned size = sizeof(xbufsize);
     135              : 
     136            0 :    status = getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &xbufsize, &size);
     137              : 
     138              :    //printf("status %d, xbufsize %d, size %d\n", status, xbufsize, size);
     139              : 
     140            0 :    if (status == -1) {
     141            0 :       cm_msg(MERROR, "open_udp_socket", "getsockopt(SOL_SOCKET,SO_RCVBUF) returned %d, errno %d (%s)", status, errno, strerror(errno));
     142            0 :       return -1;
     143              :    }
     144              : 
     145            0 :    cm_msg(MINFO, "open_udp_socket", "UDP port %d socket receive buffer size is %d", server_port, xbufsize);
     146              : 
     147            0 :    return fd;
     148              : }
     149              : 
     150            0 : bool addr_match(const Source* s, void *addr, int addr_len)
     151              : {
     152            0 :   int v = memcmp(&s->addr, addr, addr_len);
     153              : #if 0
     154              :   for (int i=0; i<addr_len; i++)
     155              :     printf("%3d - 0x%02x 0x%02x\n", i, ((char*)&s->addr)[i], ((char*)addr)[i]);
     156              :   printf("match %d, hostname [%s] bank [%s], status %d\n", addr_len, s->host_name.c_str(), s->bank_name, v);
     157              : #endif
     158            0 :   return v==0;
     159              : }
     160              : 
     161            0 : int wait_udp(int socket, int msec)
     162              : {
     163              :    int status;
     164              :    fd_set fdset;
     165              :    struct timeval timeout;
     166              : 
     167            0 :    FD_ZERO(&fdset);
     168            0 :    FD_SET(socket, &fdset);
     169              : 
     170            0 :    timeout.tv_sec = msec/1000;
     171            0 :    timeout.tv_usec = (msec%1000)*1000;
     172              : 
     173            0 :    status = select(socket+1, &fdset, NULL, NULL, &timeout);
     174              : 
     175              : #ifdef EINTR
     176            0 :    if (status < 0 && errno == EINTR) {
     177            0 :       return 0; // watchdog interrupt, try again
     178              :    }
     179              : #endif
     180              : 
     181            0 :    if (status < 0) {
     182            0 :       cm_msg(MERROR, "wait_udp", "select() returned %d, errno %d (%s)", status, errno, strerror(errno));
     183            0 :       return -1;
     184              :    }
     185              : 
     186            0 :    if (status == 0) {
     187            0 :       return 0; // timeout
     188              :    }
     189              : 
     190            0 :    if (FD_ISSET(socket, &fdset)) {
     191            0 :       return 1; // have data
     192              :    }
     193              : 
     194              :    // timeout
     195            0 :    return 0;
     196              : }
     197              : 
     198            0 : int find_source(Source* src, const sockaddr* paddr, int addr_len)
     199              : {
     200              :    char host[NI_MAXHOST], service[NI_MAXSERV];
     201              :       
     202            0 :    int status = getnameinfo(paddr, addr_len, host, NI_MAXHOST, service, NI_MAXSERV, NI_NUMERICSERV);
     203              :       
     204            0 :    if (status != 0) {
     205            0 :       cm_msg(MERROR, "read_udp", "getnameinfo() returned %d (%s), errno %d (%s)", status, gai_strerror(status), errno, strerror(errno));
     206            0 :       return -1;
     207              :    }
     208              : 
     209              :    char bankname[NAME_LENGTH];
     210            0 :    int size = sizeof(bankname);
     211              :       
     212            0 :    status = db_get_value(hDB, hKeySet, host, bankname, &size, TID_STRING, FALSE);
     213              :    
     214            0 :    if (status == DB_NO_KEY) {
     215            0 :       cm_msg(MERROR, "read_udp", "UDP packet from unknown host \"%s\"", host);
     216            0 :       cm_msg(MINFO, "read_udp", "Register this host by running following commands:");
     217            0 :       cm_msg(MINFO, "read_udp", "odbedit -c \"create STRING /Equipment/%s/Settings/%s\"", EQ_NAME, host);
     218            0 :       cm_msg(MINFO, "read_udp", "odbedit -c \"set /Equipment/%s/Settings/%s AAAA\", where AAAA is the MIDAS bank name for this host", EQ_NAME, host);
     219            0 :       return -1;
     220            0 :    } else if (status != DB_SUCCESS) {
     221            0 :       cm_msg(MERROR, "read_udp", "db_get_value(\"/Equipment/%s/Settings/%s\") status %d", EQ_NAME, host, status);
     222            0 :       return -1;
     223              :    }
     224              : 
     225            0 :    if (strlen(bankname) != 4) {
     226            0 :       cm_msg(MERROR, "read_udp", "ODB \"/Equipment/%s/Settings/%s\" should be set to a 4 character MIDAS bank name", EQ_NAME, host);
     227            0 :       cm_msg(MINFO, "read_udp", "Use this command:");
     228            0 :       cm_msg(MINFO, "read_udp", "odbedit -c \"set /Equipment/%s/Settings/%s AAAA\", where AAAA is the MIDAS bank name for this host", EQ_NAME, host);
     229            0 :       return -1;
     230              :    }
     231              :       
     232            0 :    cm_msg(MINFO, "read_udp", "UDP packets from host \"%s\" will be stored in bank \"%s\"", host, bankname);
     233              :       
     234            0 :    src->host_name = host;
     235            0 :    mstrlcpy(src->bank_name, bankname, 5);
     236            0 :    memcpy(&src->addr, paddr, sizeof(src->addr));
     237              :    
     238            0 :    return 0;
     239              : }
     240              : 
     241            0 : int read_udp(int socket, char* buf, int bufsize, char* pbankname)
     242              : {
     243            0 :    if (wait_udp(socket, 100) < 1)
     244            0 :       return 0;
     245              : 
     246              : #if 0
     247              :    static int count = 0;
     248              :    static double tt = 0;
     249              :    double t = GetTimeSec();
     250              : 
     251              :    double dt = (t-tt)*1e6;
     252              :    count++;
     253              :    if (dt > 1000) {
     254              :       printf("read_udp: %5d %6.0f usec\n", count, dt);
     255              :       count = 0;
     256              :    }
     257              :    tt = t;
     258              : #endif
     259              : 
     260              :    struct sockaddr addr;
     261            0 :    socklen_t addr_len = sizeof(addr);
     262            0 :    int rd = recvfrom(socket, buf, bufsize, 0, &addr, &addr_len);
     263              :    
     264            0 :    if (rd < 0) {
     265            0 :       cm_msg(MERROR, "read_udp", "recvfrom() returned %d, errno %d (%s)", rd, errno, strerror(errno));
     266            0 :       return -1;
     267              :    }
     268              : 
     269            0 :    for (unsigned i=0; i<gSrc.size(); i++) {
     270            0 :       if (addr_match(&gSrc[i], &addr, addr_len)) {
     271            0 :          mstrlcpy(pbankname, gSrc[i].bank_name, 5);
     272              :          //printf("rd %d, bank [%s]\n", rd, pbankname);
     273            0 :          return rd;
     274              :       }
     275              :    }
     276              : 
     277            0 :    if (gSkipUnknownPackets)
     278            0 :       return -1;
     279              : 
     280            0 :    Source sss;
     281              : 
     282            0 :    int status = find_source(&sss, &addr, addr_len);
     283              : 
     284            0 :    if (status < 0) {
     285              : 
     286            0 :       gUnknownPacketCount++;
     287              : 
     288            0 :       if (gUnknownPacketCount > 10) {
     289            0 :          gSkipUnknownPackets = true;
     290            0 :          cm_msg(MERROR, "read_udp", "further messages are now suppressed...");
     291            0 :          return -1;
     292              :       }
     293              : 
     294            0 :       return -1;
     295              :    }
     296              : 
     297            0 :    gSrc.push_back(sss);
     298              :          
     299            0 :    mstrlcpy(pbankname, sss.bank_name, 5);
     300              :          
     301            0 :    return rd;
     302            0 : }
     303              : 
     304            0 : int interrupt_configure(INT cmd, INT source, PTYPE adr)
     305              : {
     306            0 :    return SUCCESS;
     307              : }
     308              : 
     309            0 : int frontend_init()
     310              : {
     311              :    int status;
     312              : 
     313            0 :    status = cm_get_experiment_database(&hDB, NULL);
     314            0 :    if (status != CM_SUCCESS) {
     315            0 :       cm_msg(MERROR, "frontend_init", "Cannot connect to ODB, cm_get_experiment_database() returned %d", status);
     316            0 :       return FE_ERR_ODB;
     317              :    }
     318              : 
     319            0 :    std::string path;
     320            0 :    path += "/Equipment";
     321            0 :    path += "/";
     322            0 :    path += EQ_NAME;
     323            0 :    path += "/Settings";
     324              : 
     325            0 :    std::string path1 = path + "/udp_port";
     326              : 
     327            0 :    int udp_port = 50005;
     328            0 :    int size = sizeof(udp_port);
     329            0 :    status = db_get_value(hDB, 0, path1.c_str(), &udp_port, &size, TID_INT, TRUE);
     330              :    
     331            0 :    if (status != DB_SUCCESS) {
     332            0 :       cm_msg(MERROR, "frontend_init", "Cannot find \"%s\", db_get_value() returned %d", path1.c_str(), status);
     333            0 :       return FE_ERR_ODB;
     334              :    }
     335              :    
     336            0 :    status = db_find_key(hDB, 0, path.c_str(), &hKeySet);
     337              :    
     338            0 :    if (status != DB_SUCCESS) {
     339            0 :       cm_msg(MERROR, "frontend_init", "Cannot find \"%s\", db_find_key() returned %d", path.c_str(), status);
     340            0 :       return FE_ERR_ODB;
     341              :    }
     342              :    
     343            0 :    gDataSocket = open_udp_socket(udp_port);
     344              :    
     345            0 :    if (gDataSocket < 0) {
     346            0 :       printf("frontend_init: cannot open udp socket\n");
     347            0 :       cm_msg(MERROR, "frontend_init", "Cannot open UDP socket for port %d", udp_port);
     348            0 :       return FE_ERR_HW;
     349              :    }
     350              : 
     351            0 :    cm_msg(MINFO, "frontend_init", "Frontend equipment \"%s\" is ready, listening on UDP port %d", EQ_NAME, udp_port);
     352            0 :    return SUCCESS;
     353            0 : }
     354              : 
     355            0 : int frontend_loop()
     356              : {
     357            0 :    ss_sleep(10);
     358            0 :    return SUCCESS;
     359              : }
     360              : 
     361            0 : int begin_of_run(int run_number, char *error)
     362              : {
     363            0 :    gUnknownPacketCount = 0;
     364            0 :    gSkipUnknownPackets = false;
     365            0 :    return SUCCESS;
     366              : }
     367              : 
     368            0 : int end_of_run(int run_number, char *error)
     369              : {
     370            0 :    return SUCCESS;
     371              : }
     372              : 
     373            0 : int pause_run(INT run_number, char *error)
     374              : {
     375            0 :    return SUCCESS;
     376              : }
     377              : 
     378            0 : int resume_run(INT run_number, char *error)
     379              : {
     380            0 :    return SUCCESS;
     381              : }
     382              : 
     383            0 : int frontend_exit()
     384              : {
     385            0 :    return SUCCESS;
     386              : }
     387              : 
     388            0 : INT poll_event(INT source, INT count, BOOL test)
     389              : {
     390              :    //printf("poll_event: source %d, count %d, test %d\n", source, count, test);
     391              : 
     392            0 :    if (test) {
     393            0 :       for (int i=0; i<count; i++)
     394            0 :          ss_sleep(10);
     395            0 :       return 1;
     396              :    }
     397              : 
     398            0 :    return 1;
     399              : }
     400              : 
     401              : #define MAX_UDP_SIZE (0x10000)
     402              : 
     403            0 : int read_event(char *pevent, int off)
     404              : {
     405              :    char buf[MAX_UDP_SIZE];
     406              :    char bankname[5];
     407              :    
     408            0 :    int length = read_udp(gDataSocket, buf, MAX_UDP_SIZE, bankname);
     409            0 :    if (length <= 0)
     410            0 :       return 0;
     411              :    
     412            0 :    bk_init32(pevent);
     413              :    char* pdata;
     414            0 :    bk_create(pevent, bankname, TID_BYTE, (void**)&pdata);
     415            0 :    memcpy(pdata, buf, length);
     416            0 :    bk_close(pevent, pdata + length);
     417            0 :    return bk_size(pevent); 
     418              : }
     419              : 
     420              : /* emacs
     421              :  * Local Variables:
     422              :  * tab-width: 8
     423              :  * c-basic-offset: 3
     424              :  * indent-tabs-mode: nil
     425              :  * End:
     426              :  */
        

Generated by: LCOV version 2.0-1