If there's some template for writing a client to access event data, that would be
very useful (and you can probably just ignore the context I gave below in that
case).
Some context:
Quite a while ago, I wrote the attached "data pipeline" client whose job was to
listen for events, copy their data, and pipe them to a Python script. I believe I
just stole bits and pieces from mdump.cxx to accomplish this. Later I wrote the
attached wrapper class "MidasConnector.cpp" and a main.cpp to generalize
data_pipeline.cxx a bit. The code went through many iterations while I was hitting the
problems below, so don't take the logic in the attached code as the exact code that
caused them.
However, I'm unable to resolve a couple of issues:
1. If a timeout is set, everything will work until that timeout is reached. Then
regardless of what kind of logic I tried to implement (retry receiving event,
disconnect and reconnect client, etc.) the client would refuse to receive more data.
2. When I Ctrl-C main, it hangs; this is expected because it's stuck in a while
loop. But because I can't set a timeout, I have to Ctrl-C twice, which would
occasionally corrupt the ODB; that was not ideal. I was able to get around this with
an impractical solution involving ncurses, I believe.
Thanks,
Jack |
#include "midas.h"
#include "msystem.h"
#include "mrpc.h"
#include "mdsupport.h"
#include <iostream>
#include <unistd.h>
#include <stdio.h> // Added for popen
#include <stdlib.h> // Added for malloc and free
// Handle of the event buffer; filled in by bm_open_buffer() in main() and
// used there for the cache-size, request and receive calls.
INT hBufEvent;
/**
 * Print a summary of one event to stdout.
 *
 * Writes the header fields (serial number, event ID, data size, time stamp,
 * trigger mask), then an "EVENT_DATA_START" marker line, then at most the
 * first eight payload words interpreted as int, followed by a newline.
 *
 * @param pheader Pointer to the event header; the payload is read from the
 *                bytes immediately following the header. A null pointer is
 *                ignored.
 */
void process_event(EVENT_HEADER *pheader) {
    if (pheader == NULL) {
        return;
    }

    printf("Received event #%d\n", pheader->serial_number);
    printf("Event ID: %d\n", pheader->event_id);
    printf("Data Size: %d bytes\n", pheader->data_size);
    printf("Timestamp: %d\n", pheader->time_stamp);
    printf("Trigger mask: %d\n", pheader->trigger_mask);

    // Print a marker to indicate the start of serialized data
    printf("EVENT_DATA_START\n");

    // The payload starts right after the header.
    const int* eventData =
        reinterpret_cast<const int*>(reinterpret_cast<const char*>(pheader) + sizeof(EVENT_HEADER));

    // Per the MIDAS EVENT_HEADER documentation, data_size counts payload bytes
    // only (header excluded), so it must not have the header size subtracted.
    const size_t numIntegers = pheader->data_size / sizeof(int);

    // Never read past the end of the payload: short events print fewer words.
    const size_t numToPrint = numIntegers < 8 ? numIntegers : 8;
    for (size_t i = 0; i < numToPrint; ++i) {
        printf("%d ", eventData[i]);
    }
    printf("\n");
    // Process the event here
}
int main() {
HNDLE hDB, hKey;
char host_name[HOST_NAME_LENGTH], expt_name[NAME_LENGTH], str[80];
char buf_name[32] = EVENT_BUFFER_NAME, rep_file[128];
unsigned int status, start_time, stop_time;
INT ch, request_id, size, get_flag, action, single, i;
// Define the maximum event size you expect to receive
INT max_event_size = 4000;
// Allocate memory for storing event data dynamically
void* event_data = malloc(max_event_size);
printf("1\n");
/* Get if existing the pre-defined experiment */
cm_get_environment(host_name, sizeof(host_name), expt_name, sizeof(expt_name));
// Print host_name
printf("host_name = %s\n", host_name);
// Print expt_name
printf("expt_name = %s\n", expt_name);
printf("2\n");
/* connect to the experiment */
status = cm_connect_experiment(host_name, expt_name, "data_pipeline", 0);
if (status != CM_SUCCESS) {
return 1;
}
printf("3\n");
status = bm_open_buffer(buf_name, DEFAULT_BUFFER_SIZE, &hBufEvent);
if (status != BM_SUCCESS && status != BM_CREATED) {
cm_msg(MERROR, "data_pipeline", "Cannot open buffer \"%s\", bm_open_buffer() status %d", buf_name, status);
return 1;
}
printf("4\n");
/* set the buffer cache size if requested */
bm_set_cache_size(hBufEvent, 100000, 0);
printf("5\n");
/* place a request for a specific event id */
status = bm_request_event(hBufEvent, EVENTID_ALL, TRIGGER_ALL, GET_ALL, &request_id, NULL); // Use NULL as the callback routine
printf("6\n");
printf("status = %d\n",status);
// Open a pipe to a Python script for data transfer
FILE* pipe = popen("python3 data_pipeline.py", "w");
if (pipe == NULL) {
perror("popen");
return 1;
}
// Enter the event processing loop
while (1) {
// Use the address of max_event_size in bm_receive_event
status = bm_receive_event(hBufEvent, event_data, &max_event_size, BM_WAIT); // Wait for new data indefinitely
if (status == BM_SUCCESS) {
//process_event((EVENT_HEADER*)((char*)event_data + sizeof(EVENT_HEADER)));
// Send the event data to the Python script via the pipe
fprintf(pipe, "EVENT_DATA_START\n");
int* eventData = (int*)((char*)event_data + sizeof(EVENT_HEADER));
int numIntegers = (max_event_size - sizeof(EVENT_HEADER)) / sizeof(int);
for (int i = 4; i < 12; ++i) {
fprintf(pipe, "%d ", eventData[i]);
}
fprintf(pipe, "\n");
fflush(pipe); // Flush the buffer to ensure data is sent immediately
} else {
printf("Error receiving event: %d\n", status);
break; // Exit the loop if an error occurs
}
}
// Close the pipe
pclose(pipe);
// Free the dynamically allocated memory
free(event_data);
cm_disconnect_experiment();
printf("7\n");
return 1;
}
|
#include "MidasConnector.h"
/**
 * Construct a connector with default event-request settings.
 *
 * Stores the client name, reads host and experiment names via
 * cm_get_environment(), and initialises the request parameters to
 * EVENTID_ALL / TRIGGER_ALL / GET_ALL on buffer EVENT_BUFFER_NAME with
 * DEFAULT_BUFFER_SIZE and a BM_WAIT receive timeout. No connection is made
 * here.
 *
 * @param clientName Name used when connecting to the experiment; truncated to
 *                   NAME_LENGTH - 1 characters. A null pointer is treated as
 *                   an empty name.
 */
MidasConnector::MidasConnector(const char* clientName) {
    // strncpy() leaves the destination unterminated when the source fills it,
    // so copy one byte less and terminate explicitly.
    strncpy(client_name_, clientName != NULL ? clientName : "", NAME_LENGTH - 1);
    client_name_[NAME_LENGTH - 1] = '\0';

    // Get host name and experiment name from environment
    cm_get_environment(host_name_, sizeof(host_name_), experiment_name_, sizeof(experiment_name_));

    // Defaults for the event request and receive calls
    event_id = EVENTID_ALL;
    trigger_mask = TRIGGER_ALL;
    sampling_type = GET_ALL;
    buffer_size = DEFAULT_BUFFER_SIZE;
    timeout_millis = BM_WAIT;

    // No buffer has been opened yet; give the handle a defined value so that
    // getEventBufferHandle() never reads an indeterminate member.
    hBufEvent = 0;

    strncpy(buffer_name, EVENT_BUFFER_NAME, sizeof(buffer_name) - 1);
    buffer_name[sizeof(buffer_name) - 1] = '\0';
}
// Getters for the private variables
short MidasConnector::getEventId() const {
return event_id;
}
short MidasConnector::getTriggerMask() const {
return trigger_mask;
}
int MidasConnector::getSamplingType() const {
return sampling_type;
}
int MidasConnector::getBufferSize() const {
return buffer_size;
}
const char* MidasConnector::getBufferName() const {
return buffer_name;
}
int MidasConnector::getTimeout() const {
return timeout_millis;
}
/// Returns the stored event buffer handle (set by OpenEventBuffer() or
/// setEventBufferHandle()).
HNDLE MidasConnector::getEventBufferHandle() const {
    return this->hBufEvent;
}
// Setters for the private variables
void MidasConnector::setEventId(short eventId) {
event_id = eventId;
}
void MidasConnector::setTriggerMask(short triggerMask) {
trigger_mask = triggerMask;
}
void MidasConnector::setSamplingType(int samplingType) {
sampling_type = samplingType;
}
void MidasConnector::setBufferSize(int bufferSize) {
buffer_size = bufferSize;
}
/**
 * Store the name of the buffer that OpenEventBuffer() will open.
 *
 * @param bufferName NUL-terminated buffer name; truncated to fit the internal
 *                   storage. A null pointer leaves the current name unchanged.
 */
void MidasConnector::setBufferName(const char* bufferName) {
    if (bufferName == NULL) {
        return;
    }
    // strncpy() does not terminate the destination when the source is at
    // least as long as the limit, so reserve the last byte for the NUL.
    strncpy(buffer_name, bufferName, sizeof(buffer_name) - 1);
    buffer_name[sizeof(buffer_name) - 1] = '\0';
}
void MidasConnector::setTimeout(int timeoutMillis) {
timeout_millis = timeoutMillis;
}
/// Replaces the stored event buffer handle with the given one.
void MidasConnector::setEventBufferHandle(HNDLE eventBufferHandle) {
    this->hBufEvent = eventBufferHandle;
}
/**
 * Connect to the experiment using the stored host, experiment and client
 * names.
 *
 * @return true when cm_connect_experiment() reports CM_SUCCESS, false for any
 *         other status.
 */
bool MidasConnector::ConnectToExperiment() {
    const int rc = cm_connect_experiment(host_name_, experiment_name_, client_name_, NULL);
    return rc == CM_SUCCESS;
}
/// Disconnects from the experiment via cm_disconnect_experiment(); the
/// status it returns is not inspected.
void MidasConnector::DisconnectFromExperiment() {
    static_cast<void>(cm_disconnect_experiment());
}
/**
 * Open the configured event buffer and remember its handle.
 *
 * Both BM_SUCCESS and BM_CREATED count as success; any other status is
 * reported through cm_msg() as an error.
 *
 * @return true if the buffer was opened, false otherwise.
 */
bool MidasConnector::OpenEventBuffer() {
    const int rc = bm_open_buffer(buffer_name, buffer_size, &hBufEvent);
    const bool opened = (rc == BM_SUCCESS) || (rc == BM_CREATED);
    if (!opened) {
        cm_msg(MERROR, client_name_, "Cannot open buffer \"%s\", bm_open_buffer() status %d", buffer_name, rc);
    }
    return opened;
}
bool MidasConnector::SetCacheSize(int cacheSize) {
bm_set_cache_size(hBufEvent, cacheSize, 0);
return true;
}
bool MidasConnector::RequestEvent() {
int request_id;
int status = bm_request_event(hBufEvent, event_id, trigger_mask, sampling_type, &request_id, NULL);
return status == BM_SUCCESS;
}
/**
 * Receive one event from the opened buffer using the stored timeout.
 *
 * @param eventBuffer  Destination memory for the event.
 * @param maxEventSize Passed by address to bm_receive_event().
 *                     NOTE(review): the MIDAS documentation describes this
 *                     argument as in/out (capacity on entry, received size on
 *                     return) — confirm; if so, callers must reset it to the
 *                     buffer capacity before every call.
 * @return true when bm_receive_event() reports BM_SUCCESS, false for every
 *         other status.
 */
bool MidasConnector::ReceiveEvent(void* eventBuffer, int& maxEventSize) {
    const int rc = bm_receive_event(hBufEvent, eventBuffer, &maxEventSize, timeout_millis);
    return rc == BM_SUCCESS;
}
|