LCOV - code coverage report
Current view: top level - examples/lowlevel - consume.cxx (source / functions) Coverage Total Hit
Test: coverage.info Lines: 0.0 % 75 0
Test Date: 2025-11-11 10:26:08 Functions: 0.0 % 4 0

            Line data    Source code
       1              : /********************************************************************\
       2              : 
       3              :   Name:         consume.cxx
       4              :   Created by:   Stefan Ritt
       5              : 
       6              :   Contents:     Buffer manager test program. Simple consumer connecting
       7              :                 to a SYSTEM buffer and receiving some data.
       8              : 
       9              :   $Id$
      10              : 
      11              : \********************************************************************/
      12              : 
      13              : #include <stdio.h>
      14              : #include <stdlib.h>
      15              : #include "midas.h"
      16              : 
      17              : #define MAX_EVENT_SIZE (10*1024*1024)   /* 10 MiB: size of event_buffer; main() passes twice this value to bm_open_buffer() */
      18              : 
      19              : INT all_flag;              /* answer to the Get all events (0/1) prompt; nonzero selects GET_ALL and enables the serial number check */
      20              : INT hBufEvent;             /* buffer handle filled in by bm_open_buffer() in main() */
      21              : INT event_byte_count;      /* bytes (header + data) counted by process_event() since the last rate printout */
      22              : INT count_mismatches = 0;  /* number of serial number mismatches reported by process_event() */
      23              : char event_buffer[MAX_EVENT_SIZE];   /* receive buffer used when main() polls events with bm_receive_event() */
      24              : 
      25              : /*----- Print any system message (callback registered in main() via cm_msg_register) -----*/
      26              : 
      27            0 : void process_message(HNDLE hBuf, HNDLE id, EVENT_HEADER * pheader, void *message)
      28              : {
      29              :    /* just print the message text, which comes after the event header; hBuf, id and pheader are not used */
      30            0 :    printf("%s\n", (char *) message);
      31            0 : }
      32              : 
      33              : /*----- Print any run transition: prints the run number and returns CM_SUCCESS; error is not used -----*/
      34              : 
      35            0 : INT transition(INT run_number, char *error)   /* registered for TR_START in callback mode, called directly by main() in polling mode */
      36              : {
      37            0 :    printf("Transition, run #%d\n", run_number);
      38            0 :    return CM_SUCCESS;
      39              : }
      40              : 
      41              : /*----- process_event: count received bytes, sanity-check the first event, track serial numbers per event id -----*/
      42              : 
      43            0 : void process_event(HNDLE hBuf, HNDLE request_id, EVENT_HEADER * pheader, void *pevent)
      44              : {
      45              :    static int ser[10], jam = 0;   /* ser[]: last serial number seen per id slot; jam: set once the data check has run */
      46              : 
      47              :    int size, *pdata, id;
      48              : 
      49              :    /* fetch data size and id of this event and add header + data size to the byte counter */
      50            0 :    size = pheader->data_size;
      51            0 :    id = pheader->event_id;
      52            0 :    if (id > 9)   /* ser[] has 10 slots, so all ids above 9 share slot 9 */
      53            0 :       id = 9;    /* NOTE(review): a negative id is not clamped and would index before ser[] - confirm event_id cannot be negative */
      54            0 :    event_byte_count += size + sizeof(EVENT_HEADER);
      55              : 
      56              :    /* check that the first and the last word of the event data are both equal
      57              :       to the data size, to verify that nothing was overwritten */
      58              : 
      59            0 :    if (!jam) {
      60              :       /* only test once: jam is set after the first event whether or not the check passed */
      61            0 :       pdata = (INT *) (pheader + 1);   /* data is taken from right behind the header; pevent is not used */
      62            0 :       if (pdata[0] != size || pdata[size / 4 - 1] != size)   /* NOTE(review): for size < 4 the index size / 4 - 1 is -1 */
      63            0 :          cm_msg(MERROR, "process_event", "--> data jam <--");
      64            0 :       jam = 1;
      65              :    }
      66              : 
      67              :    /* if only some events are requested, sleep a little bit
      68              :       to simulate a random event consumer (ss_sleep argument is presumably in milliseconds) */
      69            0 :    if (!all_flag)
      70            0 :       ss_sleep(10);
      71              : 
      72              :    /* if all events are requested, check that the serial number follows the
      73              :       previous one for this id slot (ser[] starts at 0), i.e. that no event is missing */
      74            0 :    if (all_flag && (INT) pheader->serial_number != ser[id] + 1) {
      75            0 :       cm_msg(MERROR, "process_event",
      76              :              "Serial number mismatch: Ser: %d, OldSer: %d, ID: %d, size: %d",
      77            0 :              pheader->serial_number, ser[id], pheader->event_id, pheader->data_size);
      78            0 :       count_mismatches ++;
      79              :    }
      80              : 
      81            0 :    ser[id] = pheader->serial_number;   /* remember the serial number for the next comparison */
      82            0 : }
      83              : 
      84              : /*----- main: prompt for parameters, connect, then receive events and print rate and buffer level -----*/
      85              : 
      86            0 : int main()
      87              : {
      88              :    INT start_time, stop_time;
      89              :    double rate;
      90              :    BUFFER_HEADER buffer_header;
      91              :    INT status, size, trans, run_number;
      92              :    char host_name[256], str[32];
      93              :    INT event_id, request_id;
      94              :    DWORD last_time;
      95              :    BOOL via_callback;
      96              : 
      97              :    /* prompt for the run parameters */
      98              : 
      99            0 :    printf("ID of event to request: ");
     100            0 :    ss_gets(str, 32);
     101            0 :    event_id = atoi(str);
     102              : 
     103            0 :    printf("Host to connect: ");
     104            0 :    ss_gets(host_name, 256);
     105              : 
     106            0 :    printf("Get all events (0/1): ");
     107            0 :    ss_gets(str, 32);
     108            0 :    all_flag = atoi(str);
     109              : 
     110            0 :    printf("Receive via callback ([y]/n): ");
     111            0 :    ss_gets(str, 32);
     112            0 :    via_callback = str[0] != 'n';   /* anything but a leading n (including empty input) selects callback mode */
     113              : 
     114              :    /* connect to the experiment on the given host; exit with code 1 if that fails */
     115            0 :    status = cm_connect_experiment(host_name, "",
     116            0 :                                   all_flag ? "Power Consumer" : "Consumer", NULL);
     117            0 :    if (status != CM_SUCCESS)
     118            0 :       return 1;
     119              : 
     120              :    /* open the "SYSTEM" buffer, passing 2*MAX_EVENT_SIZE (20 MiB) as size */
     121            0 :    bm_open_buffer("SYSTEM", 2*MAX_EVENT_SIZE, &hBufEvent);   /* NOTE(review): return status is not checked */
     122              : 
     123              :    /* set the buffer cache size (100000 and 0 - presumably read and write cache sizes, confirm against bm_set_cache_size) */
     124            0 :    bm_set_cache_size(hBufEvent, 100000, 0);
     125              : 
     126              :    /* place a request for a specific event id; process_event is passed as callback only in callback mode */
     127            0 :    bm_request_event(hBufEvent, (WORD) event_id, TRIGGER_ALL,
     128            0 :                     all_flag ? GET_ALL : GET_NONBLOCKING, &request_id,
     129              :                     via_callback ? process_event : NULL);
     130              : 
     131              :    /* place a request for system messages, handled by process_message */
     132            0 :    cm_msg_register(process_message);
     133              : 
     134              :    /* place a request for TR_START transition notification; no callback is passed in polling mode */
     135            0 :    cm_register_transition(TR_START, via_callback ? transition : NULL, 500);
     136              : 
     137            0 :    last_time = 0;
     138            0 :    start_time = 0;   /* NOTE(review): the first interval is measured from 0, not from program start, so the first printed rate is not meaningful */
     139            0 :    event_byte_count = 0;
     140              : 
     141              :    do {
     142            0 :       if (via_callback)   /* callback mode: the registered callbacks are presumably invoked from inside cm_yield */
     143            0 :          status = cm_yield(1000);
     144              :       else {
     145              :          /* receive event "manually" and call process_event */
     146            0 :          size = sizeof(event_buffer);
     147            0 :          status = bm_receive_event(hBufEvent, event_buffer, &size, BM_NO_WAIT);
     148            0 :          if (status == BM_SUCCESS)
     149            0 :             process_event(hBufEvent, request_id, (EVENT_HEADER *) event_buffer,
     150              :                           (void *) (((EVENT_HEADER *) event_buffer) + 1));
     151              : 
     152              :          /* receive transitions "manually" (trans is not examined, any reported transition is printed) */
     153            0 :          if (cm_query_transition(&trans, &run_number, NULL))
     154            0 :             transition(run_number, NULL);
     155              : 
     156              :          /* call yield once every 100 ms */
     157            0 :          if (ss_millitime() - last_time > 100) {
     158            0 :             last_time = ss_millitime();
     159            0 :             status = cm_yield(0);
     160              :          }
     161              :       }
     162              : 
     163              :       /* calculate and print the data rate about once per second */
     164            0 :       if (ss_millitime() - start_time > 1000) {
     165            0 :          stop_time = ss_millitime();
     166            0 :          rate = event_byte_count / 1024.0 / 1024.0 / ((stop_time - start_time) / 1000.0);
     167              : 
     168              :          /* get information about filling level of the buffer */
     169            0 :          bm_get_buffer_info(hBufEvent, &buffer_header);
     170            0 :          size = buffer_header.read_pointer - buffer_header.write_pointer;   /* distance from write pointer forward to read pointer */
     171            0 :          if (size <= 0)
     172            0 :             size += buffer_header.size;   /* wrap around the end of the buffer */
     173            0 :          printf("Level: %5.1lf %%, ", 100 - 100.0 * size / buffer_header.size);
     174              : 
     175            0 :          printf("Rate: %1.2lf MB/sec, ser mismatches: %d\n", rate, count_mismatches);
     176            0 :          start_time = stop_time;
     177            0 :          event_byte_count = 0;   /* start a new measurement interval */
     178              :       }
     179              : 
     180            0 :    } while (status != RPC_SHUTDOWN && status != SS_ABORT);   /* loop until the last status seen is a shutdown or an abort */
     181              : 
     182            0 :    cm_disconnect_experiment();
     183              : 
     184            0 :    return 1;   /* NOTE(review): returns 1 on normal shutdown too, the same code as the connect failure above */
     185              : }
        

Generated by: LCOV version 2.0-1