#include "midas.h"
#include "msystem.h"
#include "mrpc.h"
#include "mdsupport.h"
/*
 * NOTE(review): the original file carried two more #include directives at
 * this point whose header names were lost. Nothing in this file needs them;
 * restore them if the build turns out to depend on them.
 */
#include <stdio.h>  // Added for popen
#include <stdlib.h> // Added for malloc and free

/* Handle of the MIDAS event buffer this program reads from. */
INT hBufEvent;

enum {
    /* Largest event (header included) we are prepared to receive, in bytes. */
    PIPELINE_EVENT_BUF_SIZE = 4000,
    /* Read cache size handed to bm_set_cache_size(). */
    PIPELINE_READ_CACHE_SIZE = 100000,
    /* Number of payload integers printed by process_event(). */
    PIPELINE_DUMP_WORDS = 8,
    /*
     * Window of payload integers forwarded to the Python script:
     * PIPELINE_WORD_COUNT integers starting at index PIPELINE_FIRST_WORD.
     * NOTE(review): the offset of 4 integers presumably skips the bank
     * headers at the start of the payload -- confirm against the frontend.
     */
    PIPELINE_FIRST_WORD = 4,
    PIPELINE_WORD_COUNT = 8
};

/**
 * Print the header fields of an event followed by the first payload integers.
 *
 * Debugging aid; it is not called by main(). To use it on a received event,
 * pass the start of the receive buffer: process_event((EVENT_HEADER *)event_data).
 *
 * @param pheader  Pointer to the event header; the payload is read from the
 *                 bytes immediately following it.
 */
void process_event(EVENT_HEADER *pheader)
{
    const int *eventData = (const int *)((const char *)pheader + sizeof(EVENT_HEADER));
    /* data_size counts payload bytes only (MIDAS documents it as the event
     * size without the header), so no header size is subtracted here. */
    size_t numIntegers = pheader->data_size / sizeof(int);
    size_t shown = numIntegers < (size_t)PIPELINE_DUMP_WORDS
                       ? numIntegers
                       : (size_t)PIPELINE_DUMP_WORDS;
    size_t i;

    printf("Received event #%u\n", (unsigned int)pheader->serial_number);
    printf("Event ID: %d\n", pheader->event_id);
    printf("Data Size: %u bytes\n", (unsigned int)pheader->data_size);
    printf("Timestamp: %u\n", (unsigned int)pheader->time_stamp);
    printf("Trigger mask: %d\n", pheader->trigger_mask);

    // Print a marker to indicate the start of serialized data
    printf("EVENT_DATA_START\n");

    // Never read past the payload, even when the event is shorter than the dump window
    for (i = 0; i < shown; ++i) {
        printf("%d ", eventData[i]);
    }
    printf("\n");
}

/**
 * Write one event to the consumer as a marker line plus one line of integers.
 *
 * Emits "EVENT_DATA_START", then the payload integers in the window
 * [PIPELINE_FIRST_WORD, PIPELINE_FIRST_WORD + PIPELINE_WORD_COUNT), clipped to
 * what the event actually contains, then a newline, and flushes the stream.
 *
 * @param out         Stream connected to the consumer.
 * @param event       Start of the received event (header first).
 * @param event_size  Number of valid bytes at `event`, header included.
 * @return 0 on success, -1 if the stream reported a write error.
 */
static int forward_event(FILE *out, const void *event, INT event_size)
{
    const int *words = (const int *)((const char *)event + sizeof(EVENT_HEADER));
    size_t available = 0;
    size_t last = (size_t)PIPELINE_FIRST_WORD + (size_t)PIPELINE_WORD_COUNT;
    size_t i;

    if (event_size > 0 && (size_t)event_size > sizeof(EVENT_HEADER)) {
        available = ((size_t)event_size - sizeof(EVENT_HEADER)) / sizeof(int);
    }
    if (last > available) {
        last = available; /* short event: send only what was really received */
    }

    fprintf(out, "EVENT_DATA_START\n");
    for (i = (size_t)PIPELINE_FIRST_WORD; i < last; ++i) {
        fprintf(out, "%d ", words[i]);
    }
    fprintf(out, "\n");

    // Flush so the consumer sees each event immediately
    if (fflush(out) != 0 || ferror(out)) {
        return -1;
    }
    return 0;
}

/**
 * Connect to the MIDAS experiment, subscribe to all events on the event
 * buffer and stream a window of each event's payload to data_pipeline.py
 * through a pipe.
 *
 * The receive loop only ends when receiving or forwarding fails, so the
 * program always exits with status 1.
 */
int main(void)
{
    char host_name[HOST_NAME_LENGTH], expt_name[NAME_LENGTH];
    char buf_name[32] = EVENT_BUFFER_NAME;
    INT status, request_id;
    void *event_data;
    FILE *pipe;

    // Allocate memory for storing event data dynamically
    event_data = malloc(PIPELINE_EVENT_BUF_SIZE);
    if (event_data == NULL) {
        perror("malloc");
        return 1;
    }

    printf("1\n");
    /* Get if existing the pre-defined experiment */
    cm_get_environment(host_name, sizeof(host_name), expt_name, sizeof(expt_name));
    printf("host_name = %s\n", host_name);
    printf("expt_name = %s\n", expt_name);

    printf("2\n");
    /* connect to the experiment */
    status = cm_connect_experiment(host_name, expt_name, "data_pipeline", 0);
    if (status != CM_SUCCESS) {
        free(event_data);
        return 1;
    }

    printf("3\n");
    status = bm_open_buffer(buf_name, DEFAULT_BUFFER_SIZE, &hBufEvent);
    if (status != BM_SUCCESS && status != BM_CREATED) {
        cm_msg(MERROR, "data_pipeline", "Cannot open buffer \"%s\", bm_open_buffer() status %d", buf_name, status);
        cm_disconnect_experiment();
        free(event_data);
        return 1;
    }

    printf("4\n");
    /* set the buffer cache size if requested */
    bm_set_cache_size(hBufEvent, PIPELINE_READ_CACHE_SIZE, 0);

    printf("5\n");
    /* place a request for all events; NULL callback because events are
     * pulled with bm_receive_event() below */
    status = bm_request_event(hBufEvent, EVENTID_ALL, TRIGGER_ALL, GET_ALL, &request_id, NULL);
    printf("6\n");
    printf("status = %d\n", status);
    if (status != BM_SUCCESS) {
        cm_msg(MERROR, "data_pipeline", "bm_request_event() status %d", status);
        cm_disconnect_experiment();
        free(event_data);
        return 1;
    }

    // Open a pipe to a Python script for data transfer
    pipe = popen("python3 data_pipeline.py", "w");
    if (pipe == NULL) {
        perror("popen");
        cm_disconnect_experiment();
        free(event_data);
        return 1;
    }

    // Enter the event processing loop
    for (;;) {
        /* bm_receive_event() takes the buffer capacity in this variable and
         * overwrites it with the received size, so it must be reset for
         * every call; otherwise the usable capacity shrinks to the size of
         * the previous event. */
        INT event_size = PIPELINE_EVENT_BUF_SIZE;

        // Wait for new data indefinitely
        status = bm_receive_event(hBufEvent, event_data, &event_size, BM_WAIT);
        if (status != BM_SUCCESS) {
            printf("Error receiving event: %d\n", status);
            break; // Exit the loop if an error occurs
        }

        // Send the event data to the Python script via the pipe
        if (forward_event(pipe, event_data, event_size) != 0) {
            perror("write to data_pipeline.py");
            break;
        }
    }

    // Close the pipe
    pclose(pipe);

    // Free the dynamically allocated memory
    free(event_data);

    cm_disconnect_experiment();
    printf("7\n");

    return 1;
}