Message ID: 2885     Entry time: 05 Nov 2024     Reply to this: 2886
Author: Jack Carlton 
Topic: Forum 
Subject: How to properly write a client that listens for events on a given buffer? 
If there's some template for writing a client to access event data, that would be 
very useful (and you can probably just ignore the context I gave below in that 
case).


Some context:

Quite a while ago, I wrote the attached "data pipeline" client whose job was to 
listen for events, copy their data, and pipe them to a python script. I believe I 
just stole bits and pieces from mdump.cxx to accomplish this. Later I wrote the 
attached wrapper class "MidasConnector.cpp" and a main.cpp to generalize
data_pipeline.cxx a bit. The code went through many iterations while I was hitting 
the problems below, so don't take the logic in the attached code as the exact code 
that caused them.

However, I'm unable to resolve a couple of issues:

1. If a timeout is set, everything will work until that timeout is reached. Then 
regardless of what kind of logic I tried to implement (retry receiving event, 
disconnect and reconnect client, etc.) the client would refuse to receive more data.

2. When I press Ctrl-C in main, it hangs; this is expected because it's stuck in a 
while loop. But because I can't set a timeout (see issue 1), I have to press Ctrl-C 
twice, which occasionally corrupted the ODB. I believe I was able to get around this 
with an impractical workaround involving ncurses.
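
The loop structure that points 1 and 2 are aiming for can be sketched without MIDAS at all: poll with a finite timeout, treat "timed out, no event" as a normal result rather than an error, and let a SIGINT handler set a flag that the loop checks between polls. This is only a sketch under assumptions. `RecvStatus`, `run_event_loop`, `handle_sigint` and `g_stop_requested` are hypothetical names, and the `receive` callback stands in for whatever wraps the real receive call and translates its status codes.

```cpp
#include <cassert>
#include <csignal>
#include <functional>

// Hypothetical status codes standing in for the library's "got an event",
// "timed out, nothing available" and "failed" results. These are NOT the
// real MIDAS constants; a wrapper would have to map the real ones onto them.
enum class RecvStatus { Event, Timeout, Error };

// Written by the signal handler, read by the loop. volatile sig_atomic_t is
// the type the C++ standard guarantees can be safely written from a handler.
static volatile std::sig_atomic_t g_stop_requested = 0;

extern "C" void handle_sigint(int) { g_stop_requested = 1; }

// Polls `receive` with a finite timeout until Ctrl-C is pressed or `receive`
// reports an error. Returns the number of events passed to `on_event`.
int run_event_loop(const std::function<RecvStatus(int timeout_ms)>& receive,
                   const std::function<void()>& on_event,
                   int timeout_ms = 100) {
    assert(receive && on_event);
    std::signal(SIGINT, handle_sigint);

    int handled = 0;
    while (!g_stop_requested) {
        switch (receive(timeout_ms)) {
            case RecvStatus::Event:
                on_event();
                ++handled;
                break;
            case RecvStatus::Timeout:
                break;  // nothing arrived in time; go around and poll again
            case RecvStatus::Error:
                return handled;  // real failure: leave the loop
        }
    }
    return handled;  // stop requested: caller can now disconnect cleanly
}
```

Because the handler only sets a flag, the loop returns after at most one timeout period, and the usual cleanup (closing the pipe, freeing the buffer, disconnecting from the experiment) runs on the normal path instead of needing a second Ctrl-C.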


Thanks,
Jack
Attachment 1: data_pipeline_(2).cxx  3 kB
#include "midas.h"
#include "msystem.h"
#include "mrpc.h"
#include "mdsupport.h"
#include <iostream>
#include <unistd.h>
#include <stdio.h>  // Added for popen
#include <stdlib.h> // Added for malloc and free

INT hBufEvent;

void process_event(EVENT_HEADER *pheader) {
    // serial_number, data_size and time_stamp are unsigned 32-bit fields
    printf("Received event #%u\n", pheader->serial_number);
    printf("Event ID: %d\n", pheader->event_id);
    printf("Data Size: %u bytes\n", pheader->data_size);
    printf("Timestamp: %u\n", pheader->time_stamp);
    printf("Trigger mask: %d\n", pheader->trigger_mask);

    // Print a marker to indicate the start of serialized data
    printf("EVENT_DATA_START\n");

    // Serialize and print the event data (at most the first 8 integers).
    // data_size already excludes the header, so it is not subtracted again.
    int* eventData = (int*)((char*)pheader + sizeof(EVENT_HEADER));
    int numIntegers = pheader->data_size / sizeof(int);
    for (int i = 0; i < 8 && i < numIntegers; ++i) {
        printf("%d ", eventData[i]);
    }
    printf("\n");

    // Process the event here
}

int main() {
    HNDLE hDB, hKey;
    char host_name[HOST_NAME_LENGTH], expt_name[NAME_LENGTH], str[80];
    char buf_name[32] = EVENT_BUFFER_NAME, rep_file[128];
    unsigned int status, start_time, stop_time;
    INT ch, request_id, size, get_flag, action, single, i;

    // Define the maximum event size you expect to receive
    INT max_event_size = 4000;

    // Allocate memory for storing event data dynamically
    void* event_data = malloc(max_event_size);

    printf("1\n");
    /* Get if existing the pre-defined experiment */
    cm_get_environment(host_name, sizeof(host_name), expt_name, sizeof(expt_name));
    // Print host_name
    printf("host_name = %s\n", host_name);

    // Print expt_name
    printf("expt_name = %s\n", expt_name);

    printf("2\n");
    /* connect to the experiment */
    status = cm_connect_experiment(host_name, expt_name, "data_pipeline", 0);
    if (status != CM_SUCCESS) {
        return 1;
    }
    printf("3\n");

    status = bm_open_buffer(buf_name, DEFAULT_BUFFER_SIZE, &hBufEvent);
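    if (status != BM_SUCCESS && status != BM_CREATED) {
        cm_msg(MERROR, "data_pipeline", "Cannot open buffer \"%s\", bm_open_buffer() status %d", buf_name, status);
        // Release resources and leave the experiment cleanly before exiting
        free(event_data);
        cm_disconnect_experiment();
        return 1;
    }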
    printf("4\n");
    /* set the buffer cache size if requested */
    bm_set_cache_size(hBufEvent, 100000, 0);
    printf("5\n");

    /* place a request for all event ids and all trigger masks */
    status = bm_request_event(hBufEvent, EVENTID_ALL, TRIGGER_ALL, GET_ALL, &request_id, NULL); // NULL callback: events are fetched with bm_receive_event()
    printf("6\n");
    printf("status = %d\n",status);
    // Open a pipe to a Python script for data transfer
    FILE* pipe = popen("python3 data_pipeline.py", "w");
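    if (pipe == NULL) {
        perror("popen");
        // Release resources and leave the experiment cleanly before exiting
        free(event_data);
        cm_disconnect_experiment();
        return 1;
    }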

    // Enter the event processing loop
    while (1) {
        // bm_receive_event() overwrites the size argument with the size of the
        // received event, so reset it to the buffer capacity before every call
        size = max_event_size;
        status = bm_receive_event(hBufEvent, event_data, &size, BM_WAIT); // Wait for new data indefinitely

        if (status == BM_SUCCESS) {
            //process_event((EVENT_HEADER*)event_data);
            // Send the event data to the Python script via the pipe
            fprintf(pipe, "EVENT_DATA_START\n");
            int* eventData = (int*)((char*)event_data + sizeof(EVENT_HEADER));
            int numIntegers = (size - sizeof(EVENT_HEADER)) / sizeof(int);
            // Forward payload integers 4..11, bounded by what was actually received
            for (int i = 4; i < 12 && i < numIntegers; ++i) {
                fprintf(pipe, "%d ", eventData[i]);
            }
            fprintf(pipe, "\n");
            fflush(pipe); // Flush the buffer to ensure data is sent immediately
        } else {
            printf("Error receiving event: %d\n", status);
            break; // Exit the loop if an error occurs
        }
    }

    // Close the pipe
    pclose(pipe);

    // Free the dynamically allocated memory
    free(event_data);

    cm_disconnect_experiment();

    printf("7\n");
    return 0;
}
Attachment 2: MidasConnector.cpp  3 kB
Attachment 3: main.cpp  3 kB