MIDAS
Loading...
Searching...
No Matches
Event Buffer Functions (bm_xxx)

Classes

struct  BUFFER_INFO
 
struct  EVENT_DEFRAG_BUFFER
 

Macros

#define MAX_DEFRAG_EVENTS   10
 

Functions

static int bm_validate_client_index_locked (bm_lock_buffer_guard &pbuf_guard)
 
INT bm_match_event (short int event_id, short int trigger_mask, const EVENT_HEADER *pevent)
 
void bm_remove_client_locked (BUFFER_HEADER *pheader, int j)
 
static void bm_cleanup_buffer_locked (BUFFER *pbuf, const char *who, DWORD actual_time)
 
static void bm_update_last_activity (DWORD millitime)
 
static BOOL bm_validate_rp (const char *who, const BUFFER_HEADER *pheader, int rp)
 
static int bm_incr_rp_no_check (const BUFFER_HEADER *pheader, int rp, int total_size)
 
static int bm_next_rp (const char *who, const BUFFER_HEADER *pheader, const char *pdata, int rp)
 
static int bm_validate_buffer_locked (const BUFFER *pbuf)
 
static void bm_reset_buffer_locked (BUFFER *pbuf)
 
static void bm_clear_buffer_statistics (HNDLE hDB, BUFFER *pbuf)
 
static void bm_write_buffer_statistics_to_odb_copy (HNDLE hDB, const char *buffer_name, const char *client_name, int client_index, BUFFER_INFO *pbuf, BUFFER_HEADER *pheader)
 
static void bm_write_buffer_statistics_to_odb (HNDLE hDB, BUFFER *pbuf, BOOL force)
 
INT bm_open_buffer (const char *buffer_name, INT buffer_size, INT *buffer_handle)
 
INT bm_get_buffer_handle (const char *buffer_name, INT *buffer_handle)
 
INT bm_close_buffer (INT buffer_handle)
 
INT bm_close_all_buffers (void)
 
INT bm_write_statistics_to_odb (void)
 
INT bm_set_cache_size (INT buffer_handle, size_t read_size, size_t write_size)
 
INT bm_compose_event (EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD serial)
 
INT bm_compose_event_threadsafe (EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD *serial)
 
INT bm_add_event_request (INT buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, EVENT_HANDLER *func, INT request_id)
 
INT bm_request_event (HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
 
INT bm_remove_event_request (INT buffer_handle, INT request_id)
 
INT bm_delete_request (INT request_id)
 
static void bm_validate_client_pointers_locked (const BUFFER_HEADER *pheader, BUFFER_CLIENT *pclient)
 
static BOOL bm_update_read_pointer_locked (const char *caller_name, BUFFER_HEADER *pheader)
 
static void bm_wakeup_producers_locked (const BUFFER_HEADER *pheader, const BUFFER_CLIENT *pc)
 
static void bm_dispatch_event (int buffer_handle, EVENT_HEADER *pevent)
 
static void bm_incr_read_cache_locked (BUFFER *pbuf, int total_size)
 
static BOOL bm_peek_read_cache_locked (BUFFER *pbuf, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
 
static int bm_peek_buffer_locked (BUFFER *pbuf, BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
 
static void bm_read_from_buffer_locked (const BUFFER_HEADER *pheader, int rp, char *buf, int event_size)
 
static void bm_read_from_buffer_locked (const BUFFER_HEADER *pheader, int rp, std::vector< char > *vecptr, int event_size)
 
static BOOL bm_check_requests (const BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
 
static int bm_wait_for_more_events_locked (bm_lock_buffer_guard &pbuf_guard, BUFFER_CLIENT *pc, int timeout_msec, BOOL unlock_read_cache)
 
static int bm_fill_read_cache_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
 
static void bm_convert_event_header (EVENT_HEADER *pevent, int convert_flags)
 
static int bm_wait_for_free_space_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec, int requested_space, bool unlock_write_cache)
 
static void bm_write_to_buffer_locked (BUFFER_HEADER *pheader, int sg_n, const char *const sg_ptr[], const size_t sg_len[], size_t total_size)
 
static int bm_find_first_request_locked (BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
 
static void bm_notify_reader_locked (BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, int old_write_pointer, int request_id)
 
INT bm_send_event (INT buffer_handle, const EVENT_HEADER *pevent, int unused, int timeout_msec)
 
int bm_send_event_vec (int buffer_handle, const std::vector< char > &event, int timeout_msec)
 
int bm_send_event_vec (int buffer_handle, const std::vector< std::vector< char > > &event, int timeout_msec)
 
static INT bm_flush_cache_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
 
int bm_send_event_sg (int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
 
static int bm_flush_cache_rpc (int buffer_handle, int timeout_msec)
 
INT bm_flush_cache (int buffer_handle, int timeout_msec)
 
static INT bm_read_buffer (BUFFER *pbuf, INT buffer_handle, void **bufptr, void *buf, INT *buf_size, std::vector< char > *vecptr, int timeout_msec, int convert_flags, BOOL dispatch)
 
static INT bm_receive_event_rpc (INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
 
INT bm_receive_event (INT buffer_handle, void *destination, INT *buf_size, int timeout_msec)
 
INT bm_receive_event_alloc (INT buffer_handle, EVENT_HEADER **ppevent, int timeout_msec)
 
INT bm_receive_event_vec (INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
 
static int bm_skip_event (BUFFER *pbuf)
 
INT bm_skip_event (INT buffer_handle)
 
static INT bm_push_buffer (BUFFER *pbuf, int buffer_handle)
 
INT bm_check_buffers ()
 
INT bm_poll_event ()
 
INT bm_empty_buffers ()
 

Variables

static DWORD _bm_max_event_size = 0
 
static int _bm_lock_timeout = 5 * 60 * 1000
 
static double _bm_mutex_timeout_sec = _bm_lock_timeout/1000 + 15.000
 
static EVENT_DEFRAG_BUFFER defrag_buffer [MAX_DEFRAG_EVENTS]
 

Detailed Description

dox dox


Macro Definition Documentation

◆ MAX_DEFRAG_EVENTS

#define MAX_DEFRAG_EVENTS   10

dox

Definition at line 11272 of file midas.cxx.

Function Documentation

◆ bm_add_event_request()

INT bm_add_event_request ( INT  buffer_handle,
short int  event_id,
short int  trigger_mask,
INT  sampling_type,
EVENT_HANDLER func,
INT  request_id 
)

dox

Definition at line 8314 of file midas.cxx.

8358{
8359 if (rpc_is_remote())
8360 return rpc_call(RPC_BM_ADD_EVENT_REQUEST, buffer_handle, event_id,
8361 trigger_mask, sampling_type, (INT) (POINTER_T) func, request_id);
8362
8363#ifdef LOCAL_ROUTINES
8364 {
8365 int status = 0;
8366
8367 BUFFER *pbuf = bm_get_buffer("bm_add_event_request", buffer_handle, &status);
8368
8369 if (!pbuf)
8370 return status;
8371
8372 /* lock buffer */
8374
8375 if (!pbuf_guard.is_locked())
8376 return pbuf_guard.get_status();
8377
8378 /* avoid callback/non callback requests */
8379 if (func == NULL && pbuf->callback) {
8380 pbuf_guard.unlock(); // unlock before cm_msg()
8381 cm_msg(MERROR, "bm_add_event_request", "mixing callback/non callback requests not possible");
8382 return BM_INVALID_MIXING;
8383 }
8384
8385 /* do not allow GET_RECENT with nonzero cache size */
8386 if (sampling_type == GET_RECENT && pbuf->read_cache_size > 0) {
8387 pbuf_guard.unlock(); // unlock before cm_msg()
8388 cm_msg(MERROR, "bm_add_event_request", "GET_RECENT request not possible if read cache is enabled");
8389 return BM_INVALID_PARAM;
8390 }
8391
8392 /* get a pointer to the proper client structure */
8394
 8395 /* look for an empty request entry */
8396 int i;
8397 for (i = 0; i < MAX_EVENT_REQUESTS; i++)
8398 if (!pclient->event_request[i].valid)
8399 break;
8400
8401 if (i == MAX_EVENT_REQUESTS) {
8402 // implicit unlock
8403 return BM_NO_MEMORY;
8404 }
8405
8406 /* setup event_request structure */
8407 pclient->event_request[i].id = request_id;
8408 pclient->event_request[i].valid = TRUE;
8409 pclient->event_request[i].event_id = event_id;
8410 pclient->event_request[i].trigger_mask = trigger_mask;
8411 pclient->event_request[i].sampling_type = sampling_type;
8412
8413 pclient->all_flag = pclient->all_flag || (sampling_type & GET_ALL);
8414
8415 pbuf->get_all_flag = pclient->all_flag;
8416
8417 /* set callback flag in buffer structure */
8418 if (func != NULL)
8419 pbuf->callback = TRUE;
8420
8421 /*
8422 Save the index of the last request in the list so that later only the
8423 requests 0..max_request_index-1 have to be searched through.
8424 */
8425
8426 if (i + 1 > pclient->max_request_index)
8427 pclient->max_request_index = i + 1;
8428 }
8429#endif /* LOCAL_ROUTINES */
8430
8431 return BM_SUCCESS;
8432}
static BUFFER_CLIENT * bm_get_my_client_locked(bm_lock_buffer_guard &pbuf_guard)
Definition midas.cxx:5999
static BUFFER * bm_get_buffer(const char *who, INT buffer_handle, int *pstatus)
Definition midas.cxx:6622
#define BM_INVALID_PARAM
Definition midas.h:619
#define BM_NO_MEMORY
Definition midas.h:607
#define BM_INVALID_MIXING
Definition midas.h:621
#define BM_SUCCESS
Definition midas.h:605
#define GET_ALL
Definition midas.h:321
#define GET_RECENT
Definition midas.h:323
#define MERROR
Definition midas.h:559
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
Definition midas.cxx:915
#define RPC_BM_ADD_EVENT_REQUEST
Definition mrpc.h:43
bool rpc_is_remote(void)
Definition midas.cxx:12761
INT rpc_call(DWORD routine_id,...)
Definition midas.cxx:13663
INT i
Definition mdump.cxx:32
int INT
Definition midas.h:129
#define TRUE
Definition midas.h:182
#define MAX_EVENT_REQUESTS
Definition midas.h:275
#define POINTER_T
Definition midas.h:166
#define trigger_mask
#define event_id
DWORD status
Definition odbhist.cxx:39
TH1X EXPRT * h1_book(const char *name, const char *title, int bins, double min, double max)
Definition rmidas.h:24
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_check_buffers()

INT bm_check_buffers ( void  )

Check if any requested event is waiting in a buffer

Returns
TRUE More events are waiting
FALSE No more events are waiting

Definition at line 10954 of file midas.cxx.

10954 {
10955#ifdef LOCAL_ROUTINES
10956 {
10957 INT status = 0;
10958 BOOL bMore;
10959 DWORD start_time;
10960 //static DWORD last_time = 0;
10961
10962 /* if running as a server, buffer checking is done by client
10963 via ASYNC bm_receive_event */
10964 if (rpc_is_mserver()) {
10965 return FALSE;
10966 }
10967
10968 bMore = FALSE;
10969 start_time = ss_millitime();
10970
10971 std::vector<BUFFER*> mybuffers;
10972
10973 gBuffersMutex.lock();
10975 gBuffersMutex.unlock();
10976
10977 /* go through all buffers */
10978 for (size_t idx = 0; idx < mybuffers.size(); idx++) {
10980
10981 if (!pbuf || !pbuf->attached)
10982 continue;
10983
10984 //int count_loops = 0;
10985 while (1) {
10986 if (pbuf->attached) {
10987 /* one bm_push_event could cause a run stop and a buffer close, which
10988 * would crash the next call to bm_push_event(). So check for valid
10989 * buffer on each call */
10990
10991 /* this is what happens:
10992 * bm_push_buffer() may call a user callback function
10993 * user callback function may indirectly call bm_close() of this buffer,
10994 * i.e. if it stops the run,
10995 * bm_close() will set pbuf->attached to false, but will not delete pbuf or touch gBuffers
 10996 * here we will see pbuf->attached is false and quit this loop
10997 */
10998
10999 status = bm_push_buffer(pbuf, idx + 1);
11000
11001 if (status == BM_CORRUPTED) {
11002 return status;
11003 }
11004
11005 //printf("bm_check_buffers: bm_push_buffer() returned %d, loop %d, time %d\n", status, count_loops, ss_millitime() - start_time);
11006
11007 if (status != BM_MORE_EVENTS) {
11008 //DWORD t = ss_millitime() - start_time;
11009 //printf("bm_check_buffers: index %d, period %d, elapsed %d, loop %d, no more events\n", idx, start_time - last_time, t, count_loops);
11010 break;
11011 }
11012
11013 // count_loops++;
11014 }
11015
11016 // NB: this code has a logic error: if 2 buffers always have data,
11017 // this timeout will cause us to exit reading the 1st buffer
11018 // after 1000 msec, then we read the 2nd buffer exactly once,
11019 // and exit the loop because the timeout is still active -
11020 // we did not reset "start_time" when we started reading
11021 // from the 2nd buffer. Result is that we always read all
11022 // the data in a loop from the 1st buffer, but read just
11023 // one event from the 2nd buffer, resulting in severe unfairness.
11024
11025 /* stop after one second */
11026 DWORD t = ss_millitime() - start_time;
11027 if (t > 1000) {
11028 //printf("bm_check_buffers: index %d, period %d, elapsed %d, loop %d, timeout.\n", idx, start_time - last_time, t, count_loops);
11029 bMore = TRUE;
11030 break;
11031 }
11032 }
11033 }
11034
11035 //last_time = start_time;
11036
11037 return bMore;
11038
11039 }
11040#else /* LOCAL_ROUTINES */
11041
11042 return FALSE;
11043
11044#endif
11045}
#define FALSE
Definition cfortran.h:309
static INT bm_push_buffer(BUFFER *pbuf, int buffer_handle)
Definition midas.cxx:10902
#define BM_MORE_EVENTS
Definition midas.h:620
#define BM_CORRUPTED
Definition midas.h:623
unsigned int DWORD
Definition mcstd.h:51
DWORD ss_millitime()
Definition system.cxx:3393
bool rpc_is_mserver(void)
Definition midas.cxx:12818
static std::mutex gBuffersMutex
Definition midas.cxx:195
static std::vector< BUFFER * > gBuffers
Definition midas.cxx:196
DWORD BOOL
Definition midas.h:105
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_check_requests()

static BOOL bm_check_requests ( const BUFFER_CLIENT pc,
const EVENT_HEADER pevent 
)
static

Definition at line 8966 of file midas.cxx.

8966 {
8967
8969 int i;
8970 for (i = 0; i < pc->max_request_index; i++) {
8971 const EVENT_REQUEST *prequest = pc->event_request + i;
8972 if (prequest->valid) {
8973 if (bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
8974 /* check if this is a recent event */
8975 if (prequest->sampling_type == GET_RECENT) {
8976 if (ss_time() - pevent->time_stamp > 1) {
8977 /* skip that event */
8978 continue;
8979 }
8980 }
8981
8983 break;
8984 }
8985 }
8986 }
8987 return is_requested;
8988}
INT bm_match_event(short int event_id, short int trigger_mask, const EVENT_HEADER *pevent)
Definition midas.cxx:6015
DWORD ss_time()
Definition system.cxx:3462
DWORD time_stamp
Definition midas.h:855
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_cleanup_buffer_locked()

static void bm_cleanup_buffer_locked ( BUFFER pbuf,
const char who,
DWORD  actual_time 
)
static

Check all clients on buffer, remove invalid clients

Definition at line 6066 of file midas.cxx.

6066 {
6067 BUFFER_HEADER *pheader;
6069 int j;
6070
6071 pheader = pbuf->buffer_header;
6072 pbclient = pheader->client;
6073
6074 /* now check other clients */
6075 for (j = 0; j < pheader->max_client_index; j++, pbclient++) {
6076 if (pbclient->pid) {
6077 if (!ss_pid_exists(pbclient->pid)) {
6078 cm_msg(MINFO, "bm_cleanup",
6079 "Client \'%s\' on buffer \'%s\' removed by %s because process pid %d does not exist", pbclient->name,
6080 pheader->name, who, pbclient->pid);
6081
6082 bm_remove_client_locked(pheader, j);
6083 continue;
6084 }
6085 }
6086
6087 /* If client process has no activity, clear its buffer entry. */
6088 if (pbclient->pid && pbclient->watchdog_timeout > 0) {
6089 DWORD tdiff = actual_time - pbclient->last_activity;
6090#if 0
6091 printf("buffer [%s] client [%-32s] times 0x%08x 0x%08x, diff 0x%08x %5d, timeout %d\n",
6092 pheader->name,
6093 pbclient->name,
6094 pbclient->last_activity,
6096 tdiff,
6097 tdiff,
6098 pbclient->watchdog_timeout);
6099#endif
6100 if (actual_time > pbclient->last_activity &&
6101 tdiff > pbclient->watchdog_timeout) {
6102
6103 cm_msg(MINFO, "bm_cleanup", "Client \'%s\' on buffer \'%s\' removed by %s (idle %1.1lfs, timeout %1.0lfs)",
6104 pbclient->name, pheader->name, who,
6105 tdiff / 1000.0,
6106 pbclient->watchdog_timeout / 1000.0);
6107
6108 bm_remove_client_locked(pheader, j);
6109 }
6110 }
6111 }
6112}
void bm_remove_client_locked(BUFFER_HEADER *pheader, int j)
Definition midas.cxx:6035
#define MINFO
Definition midas.h:560
BOOL ss_pid_exists(int pid)
Definition system.cxx:1440
DWORD actual_time
Definition mfe.cxx:37
INT j
Definition odbhist.cxx:40
char name[NAME_LENGTH]
Definition midas.h:958
INT max_client_index
Definition midas.h:960
BUFFER_CLIENT client[MAX_CLIENTS]
Definition midas.h:967
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_clear_buffer_statistics()

static void bm_clear_buffer_statistics ( HNDLE  hDB,
BUFFER pbuf 
)
static

Definition at line 6408 of file midas.cxx.

6408 {
6409 HNDLE hKey;
6410 int status;
6411
6412 char str[256 + 2 * NAME_LENGTH];
6413 sprintf(str, "/System/buffers/%s/Clients/%s/writes_blocked_by", pbuf->buffer_name, pbuf->client_name);
6414 //printf("delete [%s]\n", str);
6415 status = db_find_key(hDB, 0, str, &hKey);
6416 if (status == DB_SUCCESS) {
6418 }
6419}
#define DB_SUCCESS
Definition midas.h:631
INT db_delete_key(HNDLE hDB, HNDLE hKey, BOOL follow_links)
Definition odb.cxx:3856
INT db_find_key(HNDLE hDB, HNDLE hKey, const char *key_name, HNDLE *subhKey)
Definition odb.cxx:4079
HNDLE hKey
HNDLE hDB
main ODB handle
Definition mana.cxx:207
INT HNDLE
Definition midas.h:132
#define NAME_LENGTH
Definition midas.h:272
char str[256]
Definition odbhist.cxx:33
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_close_all_buffers()

INT bm_close_all_buffers ( void  )

Close all open buffers

Returns
BM_SUCCESS

Definition at line 7243 of file midas.cxx.

7243 {
7244 if (rpc_is_remote())
7246
7247#ifdef LOCAL_ROUTINES
7248 {
7250
7251 gBuffersMutex.lock();
7252 size_t nbuf = gBuffers.size();
7253 gBuffersMutex.unlock();
7254
7255 for (size_t i = nbuf; i > 0; i--) {
7257 }
7258
7259 gBuffersMutex.lock();
7260 for (size_t i=0; i< gBuffers.size(); i++) {
7261 BUFFER* pbuf = gBuffers[i];
7262 if (!pbuf)
7263 continue;
7264 delete pbuf;
7265 pbuf = NULL;
7266 gBuffers[i] = NULL;
7267 }
7268 gBuffersMutex.unlock();
7269 }
7270#endif /* LOCAL_ROUTINES */
7271
7272 return BM_SUCCESS;
7273}
INT bm_close_buffer(INT buffer_handle)
Definition midas.cxx:7096
int cm_msg_close_buffer(void)
Definition midas.cxx:487
#define RPC_BM_CLOSE_ALL_BUFFERS
Definition mrpc.h:38
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_close_buffer()

INT bm_close_buffer ( INT  buffer_handle)

Closes an event buffer previously opened with bm_open_buffer().

Parameters
buffer_handlebuffer handle
Returns
BM_SUCCESS, BM_INVALID_HANDLE

Definition at line 7096 of file midas.cxx.

7096 {
7097 //printf("bm_close_buffer: handle %d\n", buffer_handle);
7098
7099 if (rpc_is_remote())
7100 return rpc_call(RPC_BM_CLOSE_BUFFER, buffer_handle);
7101
7102#ifdef LOCAL_ROUTINES
7103 {
7104 int status = 0;
7105
7106 BUFFER *pbuf = bm_get_buffer(NULL, buffer_handle, &status);
7107
7108 if (!pbuf)
7109 return status;
7110
7111 //printf("bm_close_buffer: handle %d, name [%s]\n", buffer_handle, pheader->name);
7112
7113 int i;
7114
7115 { /* delete all requests for this buffer */
7116 _request_list_mutex.lock();
7117 std::vector<EventRequest> request_list_copy = _request_list;
7118 _request_list_mutex.unlock();
7119 for (size_t i = 0; i < request_list_copy.size(); i++) {
7120 if (request_list_copy[i].buffer_handle == buffer_handle) {
7122 }
7123 }
7124 }
7125
7126 HNDLE hDB;
7128
7129 if (hDB) {
7130 /* write statistics to odb */
7132 }
7133
7134 /* lock buffer in correct order */
7135
7137
7138 if (status != BM_SUCCESS) {
7139 return status;
7140 }
7141
7143
7144 if (status != BM_SUCCESS) {
7145 pbuf->read_cache_mutex.unlock();
7146 return status;
7147 }
7148
7150
7151 if (!pbuf_guard.is_locked()) {
7152 pbuf->write_cache_mutex.unlock();
7153 pbuf->read_cache_mutex.unlock();
7154 return pbuf_guard.get_status();
7155 }
7156
7157 BUFFER_HEADER *pheader = pbuf->buffer_header;
7158
7159 /* mark entry in _buffer as empty */
7160 pbuf->attached = false;
7161
7163
7164 if (pclient) {
7165 /* clear entry from client structure in buffer header */
7166 memset(pclient, 0, sizeof(BUFFER_CLIENT));
7167 }
7168
7169 /* calculate new max_client_index entry */
7170 for (i = MAX_CLIENTS - 1; i >= 0; i--)
7171 if (pheader->client[i].pid != 0)
7172 break;
7173 pheader->max_client_index = i + 1;
7174
7175 /* count new number of clients */
7176 int j = 0;
7177 for (i = MAX_CLIENTS - 1; i >= 0; i--)
7178 if (pheader->client[i].pid != 0)
7179 j++;
7180 pheader->num_clients = j;
7181
7182 int destroy_flag = (pheader->num_clients == 0);
7183
7184 // we hold the locks on the read cache and the write cache.
7185
7186 /* free cache */
7187 if (pbuf->read_cache_size > 0) {
7188 free(pbuf->read_cache);
7189 pbuf->read_cache = NULL;
7190 pbuf->read_cache_size = 0;
7191 pbuf->read_cache_rp = 0;
7192 pbuf->read_cache_wp = 0;
7193 }
7194
7195 if (pbuf->write_cache_size > 0) {
7196 free(pbuf->write_cache);
7197 pbuf->write_cache = NULL;
7198 pbuf->write_cache_size = 0;
7199 pbuf->write_cache_rp = 0;
7200 pbuf->write_cache_wp = 0;
7201 }
7202
7203 /* check if anyone is waiting and wake him up */
7204
7205 for (int i = 0; i < pheader->max_client_index; i++) {
7206 BUFFER_CLIENT *pclient = pheader->client + i;
7207 if (pclient->pid && (pclient->write_wait || pclient->read_wait))
7208 ss_resume(pclient->port, "B ");
7209 }
7210
7211 /* unmap shared memory, delete it if we are the last */
7212
7213 ss_shm_close(pbuf->buffer_name, pbuf->buffer_header, pbuf->shm_size, pbuf->shm_handle, destroy_flag);
7214
7215 /* after ss_shm_close() these are invalid: */
7216
7217 pheader = NULL;
7218 pbuf->buffer_header = NULL;
7219 pbuf->shm_size = 0;
7220 pbuf->shm_handle = 0;
7221
7222 /* unlock buffer in correct order */
7223
7224 pbuf_guard.unlock();
7225
7226 pbuf->write_cache_mutex.unlock();
7227 pbuf->read_cache_mutex.unlock();
7228
7229 /* delete semaphore */
7230
7232 }
7233#endif /* LOCAL_ROUTINES */
7234
7235 return BM_SUCCESS;
7236}
INT bm_delete_request(INT request_id)
Definition midas.cxx:8584
static void bm_write_buffer_statistics_to_odb(HNDLE hDB, BUFFER *pbuf, BOOL force)
Definition midas.cxx:6586
INT cm_get_experiment_database(HNDLE *hDB, HNDLE *hKeyClient)
Definition midas.cxx:3011
static int bm_lock_buffer_read_cache(BUFFER *pbuf)
Definition midas.cxx:7904
static int bm_lock_buffer_write_cache(BUFFER *pbuf)
Definition midas.cxx:7925
INT ss_resume(INT port, const char *message)
Definition system.cxx:4844
INT ss_semaphore_delete(HNDLE semaphore_handle, INT destroy_flag)
Definition system.cxx:2869
INT ss_shm_close(const char *name, void *adr, size_t shm_size, HNDLE handle, INT destroy_flag)
Definition system.cxx:755
#define RPC_BM_CLOSE_BUFFER
Definition mrpc.h:37
static std::vector< EventRequest > _request_list
Definition midas.cxx:220
static std::mutex _request_list_mutex
Definition midas.cxx:219
#define MAX_CLIENTS
Definition midas.h:274
INT num_clients
Definition midas.h:959
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_compose_event()

INT bm_compose_event ( EVENT_HEADER event_header,
short int  event_id,
short int  trigger_mask,
DWORD  data_size,
DWORD  serial 
)

Compose a Midas event header. An event header can usually be set up manually or through this routine. If the data size of the event is not known when the header is composed, it can be set later with event_header->data_size = <...> Following structure is created at the beginning of an event

typedef struct {
short int event_id;
short int trigger_mask;
DWORD data_size;
char event[1000];
bm_compose_event((EVENT_HEADER *)event, 1, 0, 100, 1);
*(event+sizeof(EVENT_HEADER)) = <...>
INT bm_compose_event(EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD serial)
Definition midas.cxx:8281
#define serial_number
#define time_stamp
Parameters
event_headerpointer to the event header
event_idevent ID of the event
trigger_masktrigger mask of the event
data_sizesize of the data part of the event in bytes
serialserial number
Returns
BM_SUCCESS

Definition at line 8281 of file midas.cxx.

8282{
8283 event_header->event_id = event_id;
8284 event_header->trigger_mask = trigger_mask;
8285 event_header->data_size = data_size;
8286 event_header->time_stamp = ss_time();
8287 event_header->serial_number = serial;
8288
8289 return BM_SUCCESS;
8290}
INT serial
Definition minife.c:20
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_compose_event_threadsafe()

INT bm_compose_event_threadsafe ( EVENT_HEADER event_header,
short int  event_id,
short int  trigger_mask,
DWORD  data_size,
DWORD serial 
)

Definition at line 8292 of file midas.cxx.

8293{
8294 static std::mutex mutex;
8295
8296 event_header->event_id = event_id;
8297 event_header->trigger_mask = trigger_mask;
8298 event_header->data_size = data_size;
8299 event_header->time_stamp = ss_time();
8300 {
8301 std::lock_guard<std::mutex> lock(mutex);
8302 event_header->serial_number = *serial;
8303 *serial = *serial + 1;
8304 // implicit unlock
8305 }
8306
8307 return BM_SUCCESS;
8308}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_convert_event_header()

static void bm_convert_event_header ( EVENT_HEADER pevent,
int  convert_flags 
)
static

Definition at line 9069 of file midas.cxx.

9069 {
9070 /* now convert event header */
9071 if (convert_flags) {
9072 rpc_convert_single(&pevent->event_id, TID_INT16, RPC_OUTGOING, convert_flags);
9073 rpc_convert_single(&pevent->trigger_mask, TID_INT16, RPC_OUTGOING, convert_flags);
9074 rpc_convert_single(&pevent->serial_number, TID_UINT32, RPC_OUTGOING, convert_flags);
9075 rpc_convert_single(&pevent->time_stamp, TID_UINT32, RPC_OUTGOING, convert_flags);
9076 rpc_convert_single(&pevent->data_size, TID_UINT32, RPC_OUTGOING, convert_flags);
9077 }
9078}
#define TID_UINT32
Definition midas.h:337
#define TID_INT16
Definition midas.h:335
void rpc_convert_single(void *data, INT tid, INT flags, INT convert_flags)
Definition midas.cxx:11681
#define RPC_OUTGOING
Definition midas.h:1585
short int event_id
Definition midas.h:852
DWORD data_size
Definition midas.h:856
DWORD serial_number
Definition midas.h:854
short int trigger_mask
Definition midas.h:853
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_delete_request()

INT bm_delete_request ( INT  request_id)

Deletes an event request previously done with bm_request_event(). When an event request gets deleted, events of that requested type are not received any more. When a buffer is closed via bm_close_buffer(), all event requests from that buffer are deleted automatically

Parameters
request_idrequest identifier given by bm_request_event()
Returns
BM_SUCCESS, BM_INVALID_HANDLE

Definition at line 8584 of file midas.cxx.

8585{
8586 _request_list_mutex.lock();
8587
8588 if (request_id < 0 || size_t(request_id) >= _request_list.size()) {
8589 _request_list_mutex.unlock();
8590 return BM_INVALID_HANDLE;
8591 }
8592
8593 int buffer_handle = _request_list[request_id].buffer_handle;
8594
8595 _request_list[request_id].clear();
8596
8597 _request_list_mutex.unlock();
8598
8599 /* remove request entry from buffer */
8600 return bm_remove_event_request(buffer_handle, request_id);
8601}
INT bm_remove_event_request(INT buffer_handle, INT request_id)
Definition midas.cxx:8518
#define BM_INVALID_HANDLE
Definition midas.h:609
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_dispatch_event()

static void bm_dispatch_event ( int  buffer_handle,
EVENT_HEADER pevent 
)
static

Definition at line 8823 of file midas.cxx.

8824{
8825 _request_list_mutex.lock();
8826 bool locked = true;
8827 size_t n = _request_list.size();
8828 /* call dispatcher */
8829 for (size_t i = 0; i < n; i++) {
8830 if (!locked) {
8831 _request_list_mutex.lock();
8832 locked = true;
8833 }
8835 if (r.buffer_handle != buffer_handle)
8836 continue;
8837 if (!bm_match_event(r.event_id, r.trigger_mask, pevent))
8838 continue;
8839 /* must release the lock on the request list: user provided r.dispatcher() can add or remove event requests, and we will deadlock. K.O. */
8840 _request_list_mutex.unlock();
8841 locked = false;
8842 /* if event is fragmented, call defragmenter */
8843 if (((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG1)) || ((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG))) {
8844 bm_defragment_event(buffer_handle, i, pevent, (void *) (pevent + 1), r.dispatcher);
8845 } else {
8846 r.dispatcher(buffer_handle, i, pevent, (void *) (pevent + 1));
8847 }
8848 }
8849 if (locked)
8850 _request_list_mutex.unlock();
8851}
static void bm_defragment_event(HNDLE buffer_handle, HNDLE request_id, EVENT_HEADER *pevent, void *pdata, EVENT_HANDLER *dispatcher)
Definition midas.cxx:11284
DWORD n[4]
Definition mana.cxx:247
#define EVENTID_FRAG
Definition midas.h:907
#define EVENTID_FRAG1
Definition midas.h:906
short int event_id
Definition midas.cxx:206
INT buffer_handle
Definition midas.cxx:205
short int trigger_mask
Definition midas.cxx:207
EVENT_HANDLER * dispatcher
Definition midas.cxx:208
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_empty_buffers()

INT bm_empty_buffers ( void  )

Clears event buffer and cache. If an event buffer is large and a consumer is slow in analyzing events, events are usually received some time after they are produced. This effect is even more pronounced if a read cache is used (via bm_set_cache_size()). When changes to the hardware are made in the experiment, the consumer will then still analyze old events before any new event which reflects the hardware change. Users can be fooled by looking at histograms which reflect the hardware change many seconds after they have been made.

To overcome this potential problem, the analyzer can call bm_empty_buffers() just after the hardware change has been made which skips all old events contained in event buffers and read caches. Technically this is done by forwarding the read pointer of the client. No events are really deleted, they are still visible to other clients like the logger.

Note that the front-end also contains write buffers which can delay the delivery of events. The standard front-end framework mfe.c reduces this effect by flushing all buffers once every second.

Returns
BM_SUCCESS

Definition at line 11240 of file midas.cxx.

11240 {
11241 if (rpc_is_remote())
11243
11244#ifdef LOCAL_ROUTINES
11245 {
11246 std::vector<BUFFER*> mybuffers;
11247
11248 gBuffersMutex.lock();
11250 gBuffersMutex.unlock();
11251
11252 /* go through all buffers */
11253 for (BUFFER* pbuf : mybuffers) {
11254 if (!pbuf)
11255 continue;
11256 if (!pbuf->attached)
11257 continue;
11258
11259 int status = bm_skip_event(pbuf);
11260 if (status != BM_SUCCESS)
11261 return status;
11262 }
11263 }
11264#endif /* LOCAL_ROUTINES */
11265
11266 return BM_SUCCESS;
11267}
static int bm_skip_event(BUFFER *pbuf)
Definition midas.cxx:10833
#define RPC_BM_EMPTY_BUFFERS
Definition mrpc.h:49
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_fill_read_cache_locked()

static int bm_fill_read_cache_locked ( bm_lock_buffer_guard pbuf_guard,
int  timeout_msec 
)
static

Definition at line 8992 of file midas.cxx.

8993{
8994 BUFFER* pbuf = pbuf_guard.get_pbuf();
8995 BUFFER_HEADER* pheader = pbuf->buffer_header;
8998
8999 //printf("bm_fill_read_cache: [%s] timeout %d, size %d, rp %d, wp %d\n", pheader->name, timeout_msec, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
9000
9001 /* loop over all events in the buffer */
9002
9003 while (1) {
9004 EVENT_HEADER *pevent = NULL;
9005 int event_size = 3; // poison value
9006 int total_size = 3; // poison value
9007
9008 int status = bm_peek_buffer_locked(pbuf, pheader, pc, &pevent, &event_size, &total_size);
9009 if (status == BM_CORRUPTED) {
9010 return status;
9011 } else if (status != BM_SUCCESS) {
9012 /* event buffer is empty */
9013 if (timeout_msec == BM_NO_WAIT) {
9014 if (need_wakeup)
9016 if (pbuf->read_cache_rp == pbuf->read_cache_wp) {
9017 // read cache is empty
9018 return BM_ASYNC_RETURN;
9019 }
9020 return BM_SUCCESS;
9021 }
9022
9024
9025 if (status != BM_SUCCESS) {
9026 // we only come here with SS_ABORT & co
9027 return status;
9028 }
9029
9030 // make sure we wait for new event only once
9032 // go back to bm_peek_buffer_locked
9033 continue;
9034 }
9035
9036 /* loop over all requests: if this event matches a request,
9037 * copy it to the read cache */
9038
9040
9041 if (is_requested) {
9042 if (pbuf->read_cache_wp + total_size > pbuf->read_cache_size) {
9043 /* read cache is full */
9044 if (need_wakeup)
9046 return BM_SUCCESS;
9047 }
9048
9049 bm_read_from_buffer_locked(pheader, pc->read_pointer, pbuf->read_cache + pbuf->read_cache_wp, event_size);
9050
9051 pbuf->read_cache_wp += total_size;
9052
9053 /* update statistics */
9054 pheader->num_out_events++;
9055 pbuf->count_read++;
9056 pbuf->bytes_read += event_size;
9057 }
9058
9059 /* shift read pointer */
9060
9061 int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
9062 pc->read_pointer = new_read_pointer;
9063
9064 need_wakeup = TRUE;
9065 }
9066 /* NOT REACHED */
9067}
static void bm_wakeup_producers_locked(const BUFFER_HEADER *pheader, const BUFFER_CLIENT *pc)
Definition midas.cxx:8787
static int bm_incr_rp_no_check(const BUFFER_HEADER *pheader, int rp, int total_size)
Definition midas.cxx:6223
static BOOL bm_check_requests(const BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
Definition midas.cxx:8966
static void bm_read_from_buffer_locked(const BUFFER_HEADER *pheader, int rp, char *buf, int event_size)
Definition midas.cxx:8936
static int bm_peek_buffer_locked(BUFFER *pbuf, BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
Definition midas.cxx:8891
static int bm_wait_for_more_events_locked(bm_lock_buffer_guard &pbuf_guard, BUFFER_CLIENT *pc, int timeout_msec, BOOL unlock_read_cache)
Definition midas.cxx:9392
#define BM_ASYNC_RETURN
Definition midas.h:613
#define BM_NO_WAIT
Definition midas.h:366
int event_size
Definition msysmon.cxx:527
INT num_out_events
Definition midas.h:965
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_find_first_request_locked()

static int bm_find_first_request_locked ( BUFFER_CLIENT pc,
const EVENT_HEADER pevent 
)
static

Definition at line 9589 of file midas.cxx.

9589 {
9590 if (pc->pid) {
9591 int j;
9592 for (j = 0; j < pc->max_request_index; j++) {
9593 const EVENT_REQUEST *prequest = pc->event_request + j;
9594 if (prequest->valid && bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
9595 return prequest->id;
9596 }
9597 }
9598 }
9599
9600 return -1;
9601}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache()

INT bm_flush_cache ( int  buffer_handle,
int  timeout_msec 
)

Definition at line 10207 of file midas.cxx.

10208{
10209 if (rpc_is_remote()) {
10210 return bm_flush_cache_rpc(buffer_handle, timeout_msec);
10211 }
10212
10213#ifdef LOCAL_ROUTINES
10214 {
10215 INT status = 0;
10216
10217 //printf("bm_flush_cache!\n");
10218
10219 BUFFER *pbuf = bm_get_buffer("bm_flush_cache", buffer_handle, &status);
10220
10221 if (!pbuf)
10222 return status;
10223
10224 if (pbuf->write_cache_size == 0)
10225 return BM_SUCCESS;
10226
10228
10229 if (status != BM_SUCCESS)
10230 return status;
10231
10232 /* check if anything needs to be flushed */
10233 if (pbuf->write_cache_wp == 0) {
10234 pbuf->write_cache_mutex.unlock();
10235 return BM_SUCCESS;
10236 }
10237
10238 /* lock the buffer */
10240
10241 if (!pbuf_guard.is_locked())
10242 return pbuf_guard.get_status();
10243
10245
10246 /* unlock in correct order */
10247
10248 if (pbuf_guard.is_locked()) {
10249 // check if bm_wait_for_free_space() failed to relock the buffer
10250 pbuf_guard.unlock();
10251 }
10252
10253 pbuf->write_cache_mutex.unlock();
10254
10255 return status;
10256 }
10257#endif /* LOCAL_ROUTINES */
10258
10259 return BM_SUCCESS;
10260}
static int bm_flush_cache_rpc(int buffer_handle, int timeout_msec)
Definition midas.cxx:9987
static INT bm_flush_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
Definition midas.cxx:10063
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache_locked()

static INT bm_flush_cache_locked ( bm_lock_buffer_guard pbuf_guard,
int  timeout_msec 
)
static

Empty write cache. This function should be used if events in the write cache should be visible to the consumers immediately. It should be called at the end of each run, otherwise events could be kept in the write buffer and will flow to the data of the next run.

Parameters
buffer_handle — Buffer handle obtained via bm_open_buffer() or 0 to flush data in the mserver event socket
timeout_msec — Timeout waiting for free space in the event buffer. If BM_WAIT, wait forever. If BM_NO_WAIT, the function returns immediately with a value of BM_ASYNC_RETURN without writing the cache.
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN Routine called with async_flag == BM_NO_WAIT and buffer has not enough space to receive cache
BM_NO_MEMORY Event is too large for network buffer or event buffer. One has to increase the event buffer size "/Experiment/Buffer sizes/SYSTEM" and/or /Experiment/MAX_EVENT_SIZE in ODB.

Definition at line 10063 of file midas.cxx.

10064{
10065 // NB we come here with write cache locked and buffer locked.
10066
10067 {
10068 INT status = 0;
10069
10070 //printf("bm_flush_cache_locked!\n");
10071
10072 BUFFER* pbuf = pbuf_guard.get_pbuf();
10073 BUFFER_HEADER* pheader = pbuf->buffer_header;
10074
10075 //printf("bm_flush_cache_locked: buffer %s, cache rp %zu, wp %zu, timeout %d msec\n", pbuf->buffer_name, pbuf->write_cache_rp, pbuf->write_cache_wp, timeout_msec);
10076
10077 int old_write_pointer = pheader->write_pointer;
10078
10079 int request_id[MAX_CLIENTS];
10080 for (int i = 0; i < pheader->max_client_index; i++) {
10081 request_id[i] = -1;
10082 }
10083
10084 size_t ask_rp = pbuf->write_cache_rp;
10085 size_t ask_wp = pbuf->write_cache_wp;
10086
10087 if (ask_wp == 0) { // nothing to do
10088 return BM_SUCCESS;
10089 }
10090
10091 if (ask_rp == ask_wp) { // nothing to do
10092 return BM_SUCCESS;
10093 }
10094
10095 assert(ask_rp < ask_wp);
10096
10097 size_t ask_free = ALIGN8(ask_wp - ask_rp);
10098
10099 if (ask_free == 0) { // nothing to do
10100 return BM_SUCCESS;
10101 }
10102
10103#if 0
10105 if (status != BM_SUCCESS) {
10106 printf("bm_flush_cache: corrupted 111!\n");
10107 abort();
10108 }
10109#endif
10110
10112
10113 if (status != BM_SUCCESS) {
10114 return status;
10115 }
10116
10117 // NB: ask_rp, ask_wp and ask_free are invalid after calling bm_wait_for_free_space():
10118 //
10119 // wait_for_free_space() will sleep with all locks released,
10120 // during this time, another thread may call bm_send_event() that will
10121 // add one or more events to the write cache and after wait_for_free_space()
10122 // returns, size of data in cache will be bigger than the amount
10123 // of free space we requested. so we need to keep track of how
10124 // much data we write to the buffer and ask for more data
10125 // if we run short. This is the reason for the big loop
10126 // around wait_for_free_space(). We ask for slightly too little free
10127 // space to make sure all this code is always used and does work. K.O.
10128
10129 if (pbuf->write_cache_wp == 0) {
10130 /* somebody emptied the cache while we were inside bm_wait_for_free_space */
10131 return BM_SUCCESS;
10132 }
10133
10134 //size_t written = 0;
10135 while (pbuf->write_cache_rp < pbuf->write_cache_wp) {
10136 /* loop over all events in cache */
10137
10138 const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pbuf->write_cache + pbuf->write_cache_rp);
10139 size_t event_size = (pevent->data_size + sizeof(EVENT_HEADER));
10140 size_t total_size = ALIGN8(event_size);
10141
10142#if 0
10143 printf("bm_flush_cache: cache size %d, wp %d, rp %d, event data_size %d, event_size %d, total_size %d, free %d, written %d\n",
10144 int(pbuf->write_cache_size),
10145 int(pbuf->write_cache_wp),
10146 int(pbuf->write_cache_rp),
10147 int(pevent->data_size),
10148 int(event_size),
10149 int(total_size),
10150 int(ask_free),
10151 int(written));
10152#endif
10153
10154 // check for crazy event size
10155 assert(total_size >= sizeof(EVENT_HEADER));
10156 assert(total_size <= (size_t)pheader->size);
10157
10158 bm_write_to_buffer_locked(pheader, 1, (char**)&pevent, &event_size, total_size);
10159
10160 /* update statistics */
10161 pheader->num_in_events++;
10162 pbuf->count_sent += 1;
10163 pbuf->bytes_sent += total_size;
10164
10165 /* see comment for the same code in bm_send_event().
10166 * We make sure the buffer is never 100% full */
10167 assert(pheader->write_pointer != pheader->read_pointer);
10168
10169 /* check if anybody has a request for this event */
10170 for (int i = 0; i < pheader->max_client_index; i++) {
10171 BUFFER_CLIENT *pc = pheader->client + i;
10172 int r = bm_find_first_request_locked(pc, pevent);
10173 if (r >= 0) {
10174 request_id[i] = r;
10175 }
10176 }
10177
10178 /* this loop does not loop forever because rp
10179 * is monotonously incremented here. write_cache_wp does
10180 * not change */
10181
10182 pbuf->write_cache_rp += total_size;
10183 //written += total_size;
10184
10185 assert(pbuf->write_cache_rp > 0);
10186 assert(pbuf->write_cache_rp <= pbuf->write_cache_size);
10187 assert(pbuf->write_cache_rp <= pbuf->write_cache_wp);
10188 }
10189
10190 /* the write cache is now empty */
10191 assert(pbuf->write_cache_wp == pbuf->write_cache_rp);
10192 pbuf->write_cache_wp = 0;
10193 pbuf->write_cache_rp = 0;
10194
10195 /* check which clients are waiting */
10196 for (int i = 0; i < pheader->max_client_index; i++) {
10197 BUFFER_CLIENT *pc = pheader->client + i;
10198 bm_notify_reader_locked(pheader, pc, old_write_pointer, request_id[i]);
10199 }
10200 }
10201
10202 return BM_SUCCESS;
10203}
static void bm_notify_reader_locked(BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, int old_write_pointer, int request_id)
Definition midas.cxx:9603
static int bm_find_first_request_locked(BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
Definition midas.cxx:9589
static int bm_validate_buffer_locked(const BUFFER *pbuf)
Definition midas.cxx:6307
static void bm_write_to_buffer_locked(BUFFER_HEADER *pheader, int sg_n, const char *const sg_ptr[], const size_t sg_len[], size_t total_size)
Definition midas.cxx:9504
static int bm_wait_for_free_space_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec, int requested_space, bool unlock_write_cache)
Definition midas.cxx:9080
#define ALIGN8(x)
Definition midas.h:522
INT num_in_events
Definition midas.h:964
INT write_pointer
Definition midas.h:963
INT read_pointer
Definition midas.h:962
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache_rpc()

static int bm_flush_cache_rpc ( int  buffer_handle,
int  timeout_msec 
)
static

Definition at line 9987 of file midas.cxx.

9988{
9989 //printf("bm_flush_cache_rpc: handle %d, timeout %d\n", buffer_handle, timeout_msec);
9990
9993
9995
9996 while (1) {
9997 if (timeout_msec == BM_WAIT) {
9998 xtimeout_msec = 1000;
9999 } else if (timeout_msec == BM_NO_WAIT) {
10001 } else {
10002 if (xtimeout_msec > 1000) {
10003 xtimeout_msec = 1000;
10004 }
10005 }
10006
10007 int status = rpc_call(RPC_BM_FLUSH_CACHE, buffer_handle, xtimeout_msec);
10008
10009 //printf("bm_flush_cache_rpc: handle %d, timeout %d, status %d\n", buffer_handle, xtimeout_msec, status);
10010
10011 if (status == BM_ASYNC_RETURN) {
10012 if (timeout_msec == BM_WAIT) {
10013 // BM_WAIT means wait forever
10014 continue;
10015 } else if (timeout_msec == BM_NO_WAIT) {
10016 // BM_NO_WAIT means do not wait
10017 return status;
10018 } else {
10020 if (now >= time_end) {
10021 // timeout, return BM_ASYNC_RETURN
10022 return status;
10023 }
10024
10026
10027 if (remain < (DWORD)xtimeout_msec) {
10029 }
10030
10031 // keep asking for event...
10032 continue;
10033 }
10034 } else if (status == BM_SUCCESS) {
10035 // success, return BM_SUCCESS
10036 return status;
10037 } else {
10038 // error
10039 return status;
10040 }
10041 }
10042}
#define BM_WAIT
Definition midas.h:365
#define RPC_BM_FLUSH_CACHE
Definition mrpc.h:46
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_get_buffer_handle()

INT bm_get_buffer_handle ( const char buffer_name,
INT buffer_handle 
)

If buffer is already open, return its handle

Parameters
buffer_name — buffer name
Returns
BM_SUCCESS, BM_NOT_FOUND

Definition at line 7075 of file midas.cxx.

7076{
7077 gBuffersMutex.lock();
7078 for (size_t i = 0; i < gBuffers.size(); i++) {
7079 BUFFER* pbuf = gBuffers[i];
7080 if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
7081 *buffer_handle = i + 1;
7082 gBuffersMutex.unlock();
7083 return BM_SUCCESS;
7084 }
7085 }
7086 gBuffersMutex.unlock();
7087 return BM_NOT_FOUND;
7088}
#define BM_NOT_FOUND
Definition midas.h:612
BOOL equal_ustring(const char *str1, const char *str2)
Definition odb.cxx:3201
char buffer_name[NAME_LENGTH]
Definition mevb.cxx:45
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_incr_read_cache_locked()

static void bm_incr_read_cache_locked ( BUFFER pbuf,
int  total_size 
)
static

Definition at line 8855 of file midas.cxx.

8855 {
8856 /* increment read cache read pointer */
8857 pbuf->read_cache_rp += total_size;
8858
8859 if (pbuf->read_cache_rp == pbuf->read_cache_wp) {
8860 pbuf->read_cache_rp = 0;
8861 pbuf->read_cache_wp = 0;
8862 }
8863}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_incr_rp_no_check()

static int bm_incr_rp_no_check ( const BUFFER_HEADER pheader,
int  rp,
int  total_size 
)
static

Definition at line 6223 of file midas.cxx.

6224{
6225#if 0
6226 if (gRpLog == NULL) {
6227 gRpLog = fopen("rp.log", "a");
6228 }
6229 if (gRpLog && (total_size < 16)) {
6230 const char *pdata = (const char *) (pheader + 1);
6231 const DWORD *pevent = (const DWORD*) (pdata + rp);
6232 fprintf(gRpLog, "%s: rp %d, total_size %d, at rp 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x\n", pheader->name, rp, total_size,
6233 pevent[0], pevent[1], pevent[2], pevent[3], pevent[4], pevent[5]);
6234 }
6235#endif
6236
6237 // these checks are already done before we come here.
6238 // but we check again as last-ressort protection. K.O.
6239 assert(total_size > 0);
6240 assert(total_size >= (int)sizeof(EVENT_HEADER));
6241
6242 rp += total_size;
6243 if (rp >= pheader->size) {
6244 rp -= pheader->size;
6245 } else if ((rp + (int) sizeof(EVENT_HEADER)) > pheader->size) {
6246 // note: ">" here to match bm_write_to_buffer_locked() and bm_validate_rp().
6247 // if at the end of the buffer, the remaining free space is exactly
6248 // equal to the size of an event header, the event header
6249 // is written there, the pointer is wrapped and the event data
6250 // is written to the beginning of the buffer.
6251 rp = 0;
6252 }
6253 return rp;
6254}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_match_event()

INT bm_match_event ( short int  event_id,
short int  trigger_mask,
const EVENT_HEADER pevent 
)

Check if an event matches a given event request by the event id and trigger mask

Parameters
event_id — Event ID of request
trigger_mask — Trigger mask of request
pevent — Pointer to event to check
Returns
TRUE if event matches request

Definition at line 6015 of file midas.cxx.

6015 {
6016 // NB: cast everything to unsigned 16 bit to avoid bitwise comparison failure
6017 // because of mismatch in sign-extension between signed 16-bit event_id and
6018 // unsigned 16-bit constants. K.O.
6019
6020 if (((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG1)) || ((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG)))
6021 /* fragmented event */
6022 return (((uint16_t(event_id) == uint16_t(EVENTID_ALL)) || (uint16_t(event_id) == (uint16_t(pevent->event_id) & uint16_t(0x0FFF))))
6024
6025 return (((uint16_t(event_id) == uint16_t(EVENTID_ALL)) || (uint16_t(event_id) == uint16_t(pevent->event_id)))
6027}
#define TRIGGER_ALL
Definition midas.h:538
#define EVENTID_ALL
Definition midas.h:537
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_next_rp()

static int bm_next_rp ( const char who,
const BUFFER_HEADER pheader,
const char pdata,
int  rp 
)
static

Definition at line 6256 of file midas.cxx.

6256 {
6257 const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pdata + rp);
6258 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
6259 int total_size = ALIGN8(event_size);
6260
6261 if (pevent->data_size <= 0 || total_size <= 0 || total_size > pheader->size) {
6262 cm_msg(MERROR, "bm_next_rp",
6263 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6264 pheader->name,
6265 rp,
6266 pevent->data_size,
6267 event_size,
6268 total_size,
6269 pheader->read_pointer,
6270 pheader->write_pointer,
6271 pheader->size,
6272 who);
6273 return -1;
6274 }
6275
6276 int remaining = 0;
6277 if (rp < pheader->write_pointer) {
6278 remaining = pheader->write_pointer - rp;
6279 } else {
6280 remaining = pheader->size - rp;
6281 remaining += pheader->write_pointer;
6282 }
6283
6284 //printf("bm_next_rp: total_size %d, remaining %d, rp %d, wp %d, size %d\n", total_size, remaining, rp, pheader->write_pointer, pheader->size);
6285
6286 if (total_size > remaining) {
6287 cm_msg(MERROR, "bm_next_rp",
6288 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, remaining %d, called from %s",
6289 pheader->name,
6290 rp,
6291 pevent->data_size,
6292 event_size,
6293 total_size,
6294 pheader->read_pointer,
6295 pheader->write_pointer,
6296 pheader->size,
6297 remaining,
6298 who);
6299 return -1;
6300 }
6301
6302 rp = bm_incr_rp_no_check(pheader, rp, total_size);
6303
6304 return rp;
6305}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_notify_reader_locked()

static void bm_notify_reader_locked ( BUFFER_HEADER pheader,
BUFFER_CLIENT pc,
int  old_write_pointer,
int  request_id 
)
static

Definition at line 9603 of file midas.cxx.

9603 {
9604 if (request_id >= 0) {
9605 /* if that client has a request and is suspended, wake it up */
9606 if (pc->read_wait) {
9607 char str[80];
9608 sprintf(str, "B %s %d", pheader->name, request_id);
9609 ss_resume(pc->port, str);
9610 //printf("bm_notify_reader_locked: buffer [%s] client [%s] request_id %d, port %d, message [%s]\n", pheader->name, pc->name, request_id, pc->port, str);
9611 //printf("bm_notify_reader_locked: buffer [%s] client [%s] clear read_wait!\n", pheader->name, pc->name);
9612 pc->read_wait = FALSE;
9613 }
9614 }
9615}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_open_buffer()

INT bm_open_buffer ( const char buffer_name,
INT  buffer_size,
INT buffer_handle 
)

Open an event buffer. Two default buffers are created by the system. The "SYSTEM" buffer is used to exchange events and the "SYSMSG" buffer is used to exchange system messages. The names and sizes of the event buffers are defined in midas.h as EVENT_BUFFER_NAME and DEFAULT_BUFFER_SIZE. The following example opens the "SYSTEM" buffer, requests events with ID 1 and enters a main loop. Events are then received in process_event()

#include <stdio.h>
#include "midas.h"
void process_event(HNDLE hbuf, HNDLE request_id, EVENT_HEADER *pheader, void *pevent)
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
status = cm_connect_experiment("pc810", "Sample", "Simple Analyzer", NULL);
return 1;
do
{
status = cm_yield(1000);
} while (status != RPC_SHUTDOWN && status != SS_ABORT);
return 0;
}
INT bm_open_buffer(const char *buffer_name, INT buffer_size, INT *buffer_handle)
Definition midas.cxx:6717
INT bm_request_event(HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
Definition midas.cxx:8465
INT cm_yield(INT millisec)
Definition midas.cxx:5642
INT cm_connect_experiment(const char *host_name, const char *exp_name, const char *client_name, void(*func)(char *))
Definition midas.cxx:2278
INT cm_disconnect_experiment(void)
Definition midas.cxx:2846
#define CM_SUCCESS
Definition midas.h:582
#define SS_ABORT
Definition midas.h:677
#define RPC_SHUTDOWN
Definition midas.h:707
int main()
Definition hwtest.cxx:23
INT process_event(ANALYZE_REQUEST *par, EVENT_HEADER *pevent)
Definition mana.cxx:3081
#define DEFAULT_BUFFER_SIZE
Definition midas.h:255
#define EVENT_BUFFER_NAME
Definition midas.h:269
Parameters
buffer_name — Name of buffer
buffer_size — Default size of buffer in bytes. Can be overwritten with ODB value
buffer_handle — Buffer handle returned by function
Returns
BM_SUCCESS, BM_CREATED
BM_NO_SHM Shared memory cannot be created
BM_NO_SEMAPHORE Semaphore cannot be created
BM_NO_MEMORY Not enough memory to create buffer descriptor
BM_MEMSIZE_MISMATCH Buffer size conflicts with an existing buffer of different size
BM_INVALID_PARAM Invalid parameter

Definition at line 6717 of file midas.cxx.

6717 {
6718 INT status;
6719
6720 if (rpc_is_remote()) {
6721 status = rpc_call(RPC_BM_OPEN_BUFFER, buffer_name, buffer_size, buffer_handle);
6722
6723 HNDLE hDB;
6725 if (status != SUCCESS || hDB == 0) {
6726 cm_msg(MERROR, "bm_open_buffer", "cannot open buffer \'%s\' - not connected to ODB", buffer_name);
6727 return BM_NO_SHM;
6728 }
6729
6731
6732 int size = sizeof(INT);
6733 status = db_get_value(hDB, 0, "/Experiment/MAX_EVENT_SIZE", &_bm_max_event_size, &size, TID_UINT32, TRUE);
6734
6735 if (status != DB_SUCCESS) {
6736 cm_msg(MERROR, "bm_open_buffer", "Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6737 status);
6738 return status;
6739 }
6740
6741 return status;
6742 }
6743#ifdef LOCAL_ROUTINES
6744 {
6745 HNDLE shm_handle;
6746 size_t shm_size;
6747 HNDLE hDB;
6748 const int max_buffer_size = 2 * 1000 * 1024 * 1024; // limited by 32-bit integers in the buffer header
6749
6750 bm_cleanup("bm_open_buffer", ss_millitime(), FALSE);
6751
6752 if (!buffer_name || !buffer_name[0]) {
6753 cm_msg(MERROR, "bm_open_buffer", "cannot open buffer with zero name");
6754 return BM_INVALID_PARAM;
6755 }
6756
6757 if (strlen(buffer_name) >= NAME_LENGTH) {
6758 cm_msg(MERROR, "bm_open_buffer", "buffer name \"%s\" is longer than %d bytes", buffer_name, NAME_LENGTH);
6759 return BM_INVALID_PARAM;
6760 }
6761
6763
6764 if (status != SUCCESS || hDB == 0) {
6765 //cm_msg(MERROR, "bm_open_buffer", "cannot open buffer \'%s\' - not connected to ODB", buffer_name);
6766 return BM_NO_SHM;
6767 }
6768
6769 /* get buffer size from ODB, user parameter as default if not present in ODB */
6770 std::string odb_path;
6771 odb_path += "/Experiment/Buffer sizes/";
6772 odb_path += buffer_name;
6773
6774 int size = sizeof(INT);
6775 status = db_get_value(hDB, 0, odb_path.c_str(), &buffer_size, &size, TID_UINT32, TRUE);
6776
6778 cm_msg(MERROR, "bm_open_buffer",
6779 "Cannot open buffer \"%s\", invalid buffer size %d in ODB \"%s\", maximum buffer size is %d",
6780 buffer_name, buffer_size, odb_path.c_str(), max_buffer_size);
6781 return BM_INVALID_PARAM;
6782 }
6783
6785
6786 size = sizeof(INT);
6787 status = db_get_value(hDB, 0, "/Experiment/MAX_EVENT_SIZE", &_bm_max_event_size, &size, TID_UINT32, TRUE);
6788
6789 if (status != DB_SUCCESS) {
6790 cm_msg(MERROR, "bm_open_buffer", "Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6791 status);
6792 return status;
6793 }
6794
6795 /* check if buffer already is open */
6796 gBuffersMutex.lock();
6797 for (size_t i = 0; i < gBuffers.size(); i++) {
6798 BUFFER* pbuf = gBuffers[i];
6799 if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
6800 *buffer_handle = i + 1;
6801 gBuffersMutex.unlock();
6802 return BM_SUCCESS;
6803 }
6804 }
6805 gBuffersMutex.unlock();
6806
6807 // only one thread at a time should create new buffers
6808
6809 static std::mutex gNewBufferMutex;
6810 std::lock_guard<std::mutex> guard(gNewBufferMutex);
6811
6812 // if we had a race against another thread
6813 // and while we were waiting for gNewBufferMutex
6814 // the other thread created this buffer, we return it.
6815
6816 gBuffersMutex.lock();
6817 for (size_t i = 0; i < gBuffers.size(); i++) {
6818 BUFFER* pbuf = gBuffers[i];
6819 if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
6820 *buffer_handle = i + 1;
6821 gBuffersMutex.unlock();
6822 return BM_SUCCESS;
6823 }
6824 }
6825 gBuffersMutex.unlock();
6826
6827 /* allocate new BUFFER object */
6828
6829 BUFFER* pbuf = new BUFFER;
6830
6831 /* there is no constructor for BUFFER object, we have to zero the arrays manually */
6832
6833 for (int i=0; i<MAX_CLIENTS; i++) {
6835 pbuf->client_time_write_wait[i] = 0;
6836 }
6837
6838 /* create buffer semaphore */
6839
6840 status = ss_semaphore_create(buffer_name, &(pbuf->semaphore));
6841
6842 if (status != SS_CREATED && status != SS_SUCCESS) {
6843 *buffer_handle = 0;
6844 delete pbuf;
6845 return BM_NO_SEMAPHORE;
6846 }
6847
6848 std::string client_name = cm_get_client_name();
6849
6850 /* store client name */
6851 mstrlcpy(pbuf->client_name, client_name.c_str(), sizeof(pbuf->client_name));
6852
6853 /* store buffer name */
6854 mstrlcpy(pbuf->buffer_name, buffer_name, sizeof(pbuf->buffer_name));
6855
6856 /* lock buffer semaphore to avoid race with bm_open_buffer() in a different program */
6857
6858 pbuf->attached = true; // required by bm_lock_buffer()
6859
6861
6862 if (!pbuf_guard.is_locked()) {
6863 // cannot happen, no other thread can see this pbuf
6864 abort();
6865 return BM_NO_SEMAPHORE;
6866 }
6867
6868 /* open shared memory */
6869
6870 void *p = NULL;
6871 status = ss_shm_open(buffer_name, sizeof(BUFFER_HEADER) + buffer_size, &p, &shm_size, &shm_handle, FALSE);
6872
6873 if (status != SS_SUCCESS && status != SS_CREATED) {
6874 *buffer_handle = 0;
6875 pbuf_guard.unlock();
6876 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6877 delete pbuf;
6878 return BM_NO_SHM;
6879 }
6880
6881 pbuf->buffer_header = (BUFFER_HEADER *) p;
6882
6883 BUFFER_HEADER *pheader = pbuf->buffer_header;
6884
6885 bool shm_created = (status == SS_CREATED);
6886
6887 if (shm_created) {
6888 /* initialize newly created shared memory */
6889
6890 memset(pheader, 0, sizeof(BUFFER_HEADER) + buffer_size);
6891
6892 mstrlcpy(pheader->name, buffer_name, sizeof(pheader->name));
6893 pheader->size = buffer_size;
6894
6895 } else {
6896 /* validate existing shared memory */
6897
6898 if (!equal_ustring(pheader->name, buffer_name)) {
6899 // unlock before calling cm_msg(). if we are SYSMSG, we wil ldeadlock. K.O.
6900 pbuf_guard.unlock();
6901 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6902 cm_msg(MERROR, "bm_open_buffer",
6903 "Buffer \"%s\" is corrupted, mismatch of buffer name in shared memory \"%s\"", buffer_name,
6904 pheader->name);
6905 *buffer_handle = 0;
6906 delete pbuf;
6907 return BM_CORRUPTED;
6908 }
6909
6910 if ((pheader->num_clients < 0) || (pheader->num_clients > MAX_CLIENTS)) {
6911 // unlock before calling cm_msg(). if we are SYSMSG, we wil ldeadlock. K.O.
6912 pbuf_guard.unlock();
6913 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6914 cm_msg(MERROR, "bm_open_buffer", "Buffer \"%s\" is corrupted, num_clients %d exceeds MAX_CLIENTS %d",
6916 *buffer_handle = 0;
6917 delete pbuf;
6918 return BM_CORRUPTED;
6919 }
6920
6921 if ((pheader->max_client_index < 0) || (pheader->max_client_index > MAX_CLIENTS)) {
6922 // unlock before calling cm_msg(). if we are SYSMSG, we wil ldeadlock. K.O.
6923 pbuf_guard.unlock();
6924 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6925 cm_msg(MERROR, "bm_open_buffer", "Buffer \"%s\" is corrupted, max_client_index %d exceeds MAX_CLIENTS %d",
6927 *buffer_handle = 0;
6928 delete pbuf;
6929 return BM_CORRUPTED;
6930 }
6931
6932 /* check if buffer size is identical */
6933 if (pheader->size != buffer_size) {
6934 cm_msg(MINFO, "bm_open_buffer", "Buffer \"%s\" requested size %d differs from existing size %d",
6935 buffer_name, buffer_size, pheader->size);
6936
6937 buffer_size = pheader->size;
6938
6939 ss_shm_close(buffer_name, p, shm_size, shm_handle, FALSE);
6940
6941 status = ss_shm_open(buffer_name, sizeof(BUFFER_HEADER) + buffer_size, &p, &shm_size, &shm_handle, FALSE);
6942
6943 if (status != SS_SUCCESS) {
6944 *buffer_handle = 0;
6945 pbuf_guard.unlock();
6946 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6947 delete pbuf;
6948 return BM_NO_SHM;
6949 }
6950
6951 pbuf->buffer_header = (BUFFER_HEADER *) p;
6952 pheader = pbuf->buffer_header;
6953 }
6954 }
6955
6956 /* shared memory is good from here down */
6957
6958 pbuf->attached = true;
6959
6960 pbuf->shm_handle = shm_handle;
6961 pbuf->shm_size = shm_size;
6962 pbuf->callback = FALSE;
6963
6964 bm_cleanup_buffer_locked(pbuf, "bm_open_buffer", ss_millitime());
6965
6967 if (status != BM_SUCCESS) {
6968 cm_msg(MERROR, "bm_open_buffer",
6969 "buffer \'%s\' is corrupted, bm_validate_buffer() status %d, calling bm_reset_buffer()...", buffer_name,
6970 status);
6972 cm_msg(MINFO, "bm_open_buffer", "buffer \'%s\' was reset, all buffered events were lost", buffer_name);
6973 }
6974
6975 /* add our client BUFFER_HEADER */
6976
6977 int iclient = 0;
6978 for (; iclient < MAX_CLIENTS; iclient++)
6979 if (pheader->client[iclient].pid == 0)
6980 break;
6981
6982 if (iclient == MAX_CLIENTS) {
6983 *buffer_handle = 0;
6984 // unlock before calling cm_msg(). if we are SYSMSG, we wil ldeadlock. K.O.
6985 pbuf_guard.unlock();
6986 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6987 delete pbuf;
6988 cm_msg(MERROR, "bm_open_buffer", "buffer \'%s\' maximum number of clients %d exceeded", buffer_name, MAX_CLIENTS);
6989 return BM_NO_SLOT;
6990 }
6991
6992 /* store slot index in _buffer structure */
6993 pbuf->client_index = iclient;
6994
6995 /*
6996 Save the index of the last client of that buffer so that later only
6997 the clients 0..max_client_index-1 have to be searched through.
6998 */
6999 pheader->num_clients++;
7000 if (iclient + 1 > pheader->max_client_index)
7001 pheader->max_client_index = iclient + 1;
7002
7003 /* setup buffer header and client structure */
7004 BUFFER_CLIENT *pclient = &pheader->client[iclient];
7005
7006 memset(pclient, 0, sizeof(BUFFER_CLIENT));
7007
7008 mstrlcpy(pclient->name, client_name.c_str(), sizeof(pclient->name));
7009
7010 pclient->pid = ss_getpid();
7011
7013
7014 pclient->read_pointer = pheader->write_pointer;
7015 pclient->last_activity = ss_millitime();
7016
7017 cm_get_watchdog_params(NULL, &pclient->watchdog_timeout);
7018
7019 pbuf_guard.unlock();
7020
7021 /* shared memory is not locked from here down, do not touch pheader and pbuf->buffer_header! */
7022
7023 pheader = NULL;
7024
7025 /* we are not holding any locks from here down, but other threads cannot see this pbuf yet */
7026
7029
7030 /* add pbuf to buffer list */
7031
7032 gBuffersMutex.lock();
7033
7034 bool added = false;
7035 for (size_t i=0; i<gBuffers.size(); i++) {
7036 if (gBuffers[i] == NULL) {
7037 gBuffers[i] = pbuf;
7038 added = true;
7039 *buffer_handle = i+1;
7040 break;
7041 }
7042 }
7043 if (!added) {
7044 *buffer_handle = gBuffers.size() + 1;
7045 gBuffers.push_back(pbuf);
7046 }
7047
7048 /* from here down we should not touch pbuf without locking it */
7049
7050 pbuf = NULL;
7051
7052 gBuffersMutex.unlock();
7053
7054 /* new buffer is now ready for use */
7055
7056 /* initialize buffer counters */
7057 bm_init_buffer_counters(*buffer_handle);
7058
7059 bm_cleanup("bm_open_buffer", ss_millitime(), FALSE);
7060
7061 if (shm_created)
7062 return BM_CREATED;
7063 }
7064#endif /* LOCAL_ROUTINES */
7065
7066 return BM_SUCCESS;
7067}
static void bm_cleanup_buffer_locked(BUFFER *pbuf, const char *who, DWORD actual_time)
Definition midas.cxx:6066
static DWORD _bm_max_event_size
Definition midas.cxx:5914
static void bm_clear_buffer_statistics(HNDLE hDB, BUFFER *pbuf)
Definition midas.cxx:6408
static void bm_reset_buffer_locked(BUFFER *pbuf)
Definition midas.cxx:6391
INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout)
Definition midas.cxx:3317
std::string cm_get_client_name()
Definition midas.cxx:2059
static void bm_cleanup(const char *who, DWORD actual_time, BOOL wrong_interval)
Definition midas.cxx:6152
#define BM_NO_SLOT
Definition midas.h:610
#define BM_NO_SHM
Definition midas.h:622
#define BM_CREATED
Definition midas.h:606
#define BM_NO_SEMAPHORE
Definition midas.h:611
#define SS_SUCCESS
Definition midas.h:663
#define SS_CREATED
Definition midas.h:664
#define SUCCESS
Definition mcstd.h:54
INT ss_semaphore_create(const char *name, HNDLE *semaphore_handle)
Definition system.cxx:2460
INT ss_getpid(void)
Definition system.cxx:1377
INT ss_shm_open(const char *name, INT size, void **adr, size_t *shm_size, HNDLE *handle, BOOL get_size)
Definition system.cxx:324
midas_thread_t ss_gettid(void)
Definition system.cxx:1519
INT ss_suspend_get_buffer_port(midas_thread_t thread_id, INT *port)
Definition system.cxx:4353
INT db_get_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, void *data, INT *buf_size, DWORD type, BOOL create)
Definition odb.cxx:5415
#define RPC_BM_OPEN_BUFFER
Definition mrpc.h:36
INT bm_init_buffer_counters(INT buffer_handle)
Definition midas.cxx:8063
#define DEFAULT_MAX_EVENT_SIZE
Definition midas.h:254
int client_count_write_wait[MAX_CLIENTS]
Definition midas.h:1022
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_peek_buffer_locked()

static int bm_peek_buffer_locked ( BUFFER pbuf,
BUFFER_HEADER pheader,
BUFFER_CLIENT pc,
EVENT_HEADER **  ppevent,
int pevent_size,
int ptotal_size 
)
static

Definition at line 8891 of file midas.cxx.

8892{
8893 if (pc->read_pointer == pheader->write_pointer) {
8894 /* no more events buffered for this client */
8895 if (!pc->read_wait) {
8896 //printf("bm_peek_buffer_locked: buffer [%s] client [%s], set read_wait!\n", pheader->name, pc->name);
8897 pc->read_wait = TRUE;
8898 }
8899 return BM_ASYNC_RETURN;
8900 }
8901
8902 if (pc->read_wait) {
8903 //printf("bm_peek_buffer_locked: buffer [%s] client [%s], clear read_wait!\n", pheader->name, pc->name);
8904 pc->read_wait = FALSE;
8905 }
8906
8907 if ((pc->read_pointer < 0) || (pc->read_pointer >= pheader->size)) {
8908 cm_msg(MERROR, "bm_peek_buffer_locked", "event buffer \"%s\" is corrupted: client \"%s\" read pointer %d is invalid. buffer read pointer %d, write pointer %d, size %d", pheader->name, pc->name, pc->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8909 return BM_CORRUPTED;
8910 }
8911
8912 char *pdata = (char *) (pheader + 1);
8913
8914 EVENT_HEADER *pevent = (EVENT_HEADER *) (pdata + pc->read_pointer);
8915 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
8916 int total_size = ALIGN8(event_size);
8917
8918 if ((total_size <= 0) || (total_size > pheader->size)) {
8919 cm_msg(MERROR, "bm_peek_buffer_locked", "event buffer \"%s\" is corrupted: client \"%s\" read pointer %d points to invalid event: data_size %d, event_size %d, total_size %d. buffer size: %d, read_pointer: %d, write_pointer: %d", pheader->name, pc->name, pc->read_pointer, pevent->data_size, event_size, total_size, pheader->size, pheader->read_pointer, pheader->write_pointer);
8920 return BM_CORRUPTED;
8921 }
8922
8923 assert(total_size > 0);
8924 assert(total_size <= pheader->size);
8925
8926 if (ppevent)
8927 *ppevent = pevent;
8928 if (pevent_size)
8930 if (ptotal_size)
8931 *ptotal_size = total_size;
8932
8933 return BM_SUCCESS;
8934}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_peek_read_cache_locked()

static BOOL bm_peek_read_cache_locked ( BUFFER pbuf,
EVENT_HEADER **  ppevent,
int pevent_size,
int ptotal_size 
)
static

Definition at line 8865 of file midas.cxx.

8866{
8867 if (pbuf->read_cache_rp == pbuf->read_cache_wp)
8868 return FALSE;
8869
8870 EVENT_HEADER *pevent = (EVENT_HEADER *) (pbuf->read_cache + pbuf->read_cache_rp);
8871 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
8872 int total_size = ALIGN8(event_size);
8873
8874 if (ppevent)
8875 *ppevent = pevent;
8876 if (pevent_size)
8878 if (ptotal_size)
8879 *ptotal_size = total_size;
8880
8881 return TRUE;
8882}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_poll_event()

INT bm_poll_event ( void  )

Definition at line 11126 of file midas.cxx.

11140{
11142
11143 //printf("bm_poll_event!\n");
11144
11145 DWORD start_time = ss_millitime();
11146
11147 std::vector<char> vec;
11148
11149 /* loop over all requests */
11150 _request_list_mutex.lock();
11151 bool locked = true;
11152 size_t n = _request_list.size();
11153 for (size_t i = 0; i < n; i++) {
11154 if (!locked) {
11155 _request_list_mutex.lock();
11156 locked = true;
11157 }
11158 /* continue if no dispatcher set (manual bm_receive_event) */
11159 if (_request_list[i].dispatcher == NULL)
11160 continue;
11161
11162 int buffer_handle = _request_list[i].buffer_handle;
11163
11164 /* must release the lock on the request list: user provided r.dispatcher() can add or remove event requests, and we will deadlock. K.O. */
11165 _request_list_mutex.unlock();
11166 locked = false;
11167
11168 do {
11169 /* receive event */
11170 int status = bm_receive_event_vec(buffer_handle, &vec, BM_NO_WAIT);
11171
11172 //printf("bm_poll_event: request_id %d, buffer_handle %d, bm_receive_event(BM_NO_WAIT) status %d, vec size %d, capacity %d\n", request_id, buffer_handle, status, (int)vec.size(), (int)vec.capacity());
11173
11174 /* call user function if successful */
11175 if (status == BM_SUCCESS) {
11176 bm_dispatch_event(buffer_handle, (EVENT_HEADER*)vec.data());
11178 }
11179
11180 /* break if no more events */
11181 if (status == BM_ASYNC_RETURN)
11182 break;
11183
11184 /* break if corrupted event buffer */
11185 if (status == BM_TRUNCATED) {
11186 cm_msg(MERROR, "bm_poll_event", "received event was truncated, buffer size %d is too small, see messages and increase /Experiment/MAX_EVENT_SIZE in ODB", (int)vec.size());
11187 }
11188
11189 /* break if corrupted event buffer */
11190 if (status == BM_CORRUPTED)
11191 return SS_ABORT;
11192
11193 /* break if server died */
11194 if (status == RPC_NET_ERROR) {
11195 return SS_ABORT;
11196 }
11197
11198 /* stop after one second */
11199 if (ss_millitime() - start_time > 1000) {
11200 break;
11201 }
11202
11203 } while (TRUE);
11204 }
11205
11206 if (locked)
11207 _request_list_mutex.unlock();
11208
11210 return BM_SUCCESS;
11211 else
11212 return BM_ASYNC_RETURN;
11213}
INT bm_receive_event_vec(INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
Definition midas.cxx:10809
static void bm_dispatch_event(int buffer_handle, EVENT_HEADER *pevent)
Definition midas.cxx:8823
#define BM_TRUNCATED
Definition midas.h:614
#define RPC_NET_ERROR
Definition midas.h:701
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_push_buffer()

static INT bm_push_buffer ( BUFFER pbuf,
int  buffer_handle 
)
static

Check a buffer if an event is available and call the dispatch function if found.

Parameters
buffer_name — Name of buffer
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_TRUNCATED, BM_ASYNC_RETURN, RPC_NET_ERROR

Definition at line 10902 of file midas.cxx.

10902 {
10903 //printf("bm_push_buffer: buffer [%s], handle %d, callback %d\n", pbuf->buffer_header->name, buffer_handle, pbuf->callback);
10904
10905 /* return immediately if no callback routine is defined */
10906 if (!pbuf->callback)
10907 return BM_SUCCESS;
10908
10909 return bm_read_buffer(pbuf, buffer_handle, NULL, NULL, NULL, NULL, BM_NO_WAIT, 0, TRUE);
10910}
static INT bm_read_buffer(BUFFER *pbuf, INT buffer_handle, void **bufptr, void *buf, INT *buf_size, std::vector< char > *vecptr, int timeout_msec, int convert_flags, BOOL dispatch)
Definition midas.cxx:10264
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_read_buffer()

static INT bm_read_buffer ( BUFFER pbuf,
INT  buffer_handle,
void **  bufptr,
void buf,
INT buf_size,
std::vector< char > *  vecptr,
int  timeout_msec,
int  convert_flags,
BOOL  dispatch 
)
static

Definition at line 10264 of file midas.cxx.

10264 {
10266
10267 int max_size = 0;
10268 if (buf_size) {
10269 max_size = *buf_size;
10270 *buf_size = 0;
10271 }
10272
10273 //printf("bm_read_buffer: [%s] timeout %d, conv %d, ptr %p, buf %p, disp %d\n", pbuf->buffer_name, timeout_msec, convert_flags, bufptr, buf, dispatch);
10274
10275 bm_lock_buffer_guard pbuf_guard(pbuf, true); // buffer is not locked
10276
10277 // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
10278
10279 /* look if there is anything in the cache */
10280 if (pbuf->read_cache_size > 0) {
10281
10283
10284 if (status != BM_SUCCESS)
10285 return status;
10286
10287 if (pbuf->read_cache_wp == 0) {
10288
10289 // lock buffer for the first time
10290
10291 if (!pbuf_guard.relock()) {
10292 pbuf->read_cache_mutex.unlock();
10293 return pbuf_guard.get_status();
10294 }
10295
10297 if (status != BM_SUCCESS) {
10298 // unlock in correct order
10299 if (pbuf_guard.is_locked()) {
10300 // check if bm_wait_for_more_events() failed to relock the buffer
10301 pbuf_guard.unlock();
10302 }
10303 pbuf->read_cache_mutex.unlock();
10304 return status;
10305 }
10306
10307 // buffer remains locked here
10308 }
10309 EVENT_HEADER *pevent;
10310 int event_size;
10311 int total_size;
10312 if (bm_peek_read_cache_locked(pbuf, &pevent, &event_size, &total_size)) {
10313 if (pbuf_guard.is_locked()) {
10314 // do not need to keep the event buffer locked
10315 // when reading from the read cache
10316 pbuf_guard.unlock();
10317 }
10318 //printf("bm_read_buffer: [%s] async %d, conv %d, ptr %p, buf %p, disp %d, total_size %d, read from cache %d %d %d\n", pbuf->buffer_name, async_flag, convert_flags, bufptr, buf, dispatch, total_size, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
10320 if (buf) {
10321 if (event_size > max_size) {
10322 cm_msg(MERROR, "bm_read_buffer", "buffer size %d is smaller than event size %d, event truncated. buffer \"%s\"", max_size, event_size, pbuf->buffer_name);
10325 }
10326
10327 memcpy(buf, pevent, event_size);
10328
10329 if (buf_size) {
10330 *buf_size = event_size;
10331 }
10332 if (convert_flags) {
10333 bm_convert_event_header((EVENT_HEADER *) buf, convert_flags);
10334 }
10335 } else if (bufptr) {
10337 memcpy(*bufptr, pevent, event_size);
10339 } else if (vecptr) {
10340 vecptr->resize(0);
10341 char* cptr = (char*)pevent;
10342 vecptr->assign(cptr, cptr+event_size);
10343 }
10344 bm_incr_read_cache_locked(pbuf, total_size);
10345 pbuf->read_cache_mutex.unlock();
10346 if (dispatch) {
10347 // FIXME need to protect currently dispatched event against
10348 // another thread overwriting it by refilling the read cache
10349 bm_dispatch_event(buffer_handle, pevent);
10350 return BM_MORE_EVENTS;
10351 }
10352 // buffer is unlocked here
10353 return status;
10354 }
10355 pbuf->read_cache_mutex.unlock();
10356 }
10357
10358 /* we come here if the read cache is disabled */
10359 /* we come here if the next event is too big to fit into the read cache */
10360
10361 if (!pbuf_guard.is_locked()) {
10362 if (!pbuf_guard.relock())
10363 return pbuf_guard.get_status();
10364 }
10365
10367
10368 BUFFER_HEADER *pheader = pbuf->buffer_header;
10369
10371
10372 while (1) {
10373 /* loop over events in the event buffer */
10374
10376
10377 if (status != BM_SUCCESS) {
10378 // implicit unlock
10379 return status;
10380 }
10381
10382 /* check if event at current read pointer matches a request */
10383
10384 EVENT_HEADER *pevent;
10385 int event_size;
10386 int total_size;
10387
10388 status = bm_peek_buffer_locked(pbuf, pheader, pc, &pevent, &event_size, &total_size);
10389 if (status == BM_CORRUPTED) {
10390 // implicit unlock
10391 return status;
10392 } else if (status != BM_SUCCESS) {
10393 /* event buffer is empty */
10394 break;
10395 }
10396
10398
10399 if (is_requested) {
10400 //printf("bm_read_buffer: [%s] async %d, conv %d, ptr %p, buf %p, disp %d, total_size %d, read from buffer, cache %d %d %d\n", pheader->name, async_flag, convert_flags, bufptr, buf, dispatch, total_size, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
10401
10403
10404 if (buf) {
10405 if (event_size > max_size) {
10406 cm_msg(MERROR, "bm_read_buffer",
10407 "buffer size %d is smaller than event size %d, event truncated. buffer \"%s\"", max_size,
10408 event_size, pheader->name);
10411 }
10412
10413 bm_read_from_buffer_locked(pheader, pc->read_pointer, (char *) buf, event_size);
10414
10415 if (buf_size) {
10416 *buf_size = event_size;
10417 }
10418
10419 if (convert_flags) {
10420 bm_convert_event_header((EVENT_HEADER *) buf, convert_flags);
10421 }
10422
10423 pbuf->count_read++;
10424 pbuf->bytes_read += event_size;
10425 } else if (dispatch || bufptr) {
10426 assert(event_buffer == NULL); // make sure we only come here once
10428 bm_read_from_buffer_locked(pheader, pc->read_pointer, (char *) event_buffer, event_size);
10429 pbuf->count_read++;
10430 pbuf->bytes_read += event_size;
10431 } else if (vecptr) {
10432 bm_read_from_buffer_locked(pheader, pc->read_pointer, vecptr, event_size);
10433 pbuf->count_read++;
10434 pbuf->bytes_read += event_size;
10435 }
10436
10437 int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
10438 pc->read_pointer = new_read_pointer;
10439
10440 pheader->num_out_events++;
10441 /* exit loop over events */
10442 break;
10443 }
10444
10445 int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
10446 pc->read_pointer = new_read_pointer;
10447 pheader->num_out_events++;
10448 }
10449
10450 /*
10451 If read pointer has been changed, it may have freed up some space
10452 for waiting producers. So check if free space is now more than 50%
10453 of the buffer size and wake waiting producers.
10454 */
10455
10457
10458 pbuf_guard.unlock();
10459
10460 if (dispatch && event_buffer) {
10461 bm_dispatch_event(buffer_handle, event_buffer);
10462 free(event_buffer);
10464 return BM_MORE_EVENTS;
10465 }
10466
10467 if (bufptr && event_buffer) {
10471 }
10472
10473 if (event_buffer) {
10474 free(event_buffer);
10476 }
10477
10478 return status;
10479}
static void bm_convert_event_header(EVENT_HEADER *pevent, int convert_flags)
Definition midas.cxx:9069
static int bm_fill_read_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
Definition midas.cxx:8992
static BOOL bm_peek_read_cache_locked(BUFFER *pbuf, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
Definition midas.cxx:8865
static void bm_incr_read_cache_locked(BUFFER *pbuf, int total_size)
Definition midas.cxx:8855
void * event_buffer
Definition mfe.cxx:65
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_read_from_buffer_locked() [1/2]

static void bm_read_from_buffer_locked ( const BUFFER_HEADER pheader,
int  rp,
char buf,
int  event_size 
)
static

Definition at line 8936 of file midas.cxx.

8937{
8938 const char *pdata = (const char *) (pheader + 1);
8939
8940 if (rp + event_size <= pheader->size) {
8941 /* copy event to cache */
8942 memcpy(buf, pdata + rp, event_size);
8943 } else {
 8944 /* event is split */
8945 int size = pheader->size - rp;
8946 memcpy(buf, pdata + rp, size);
8947 memcpy(buf + size, pdata, event_size - size);
8948 }
8949}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_read_from_buffer_locked() [2/2]

static void bm_read_from_buffer_locked ( const BUFFER_HEADER pheader,
int  rp,
std::vector< char > *  vecptr,
int  event_size 
)
static

Definition at line 8951 of file midas.cxx.

8952{
8953 const char *pdata = (const char *) (pheader + 1);
8954
8955 if (rp + event_size <= pheader->size) {
8956 /* copy event to cache */
8957 vecptr->assign(pdata + rp, pdata + rp + event_size);
8958 } else {
 8959 /* event is split */
8960 int size = pheader->size - rp;
8961 vecptr->assign(pdata + rp, pdata + rp + size);
8962 vecptr->insert(vecptr->end(), pdata, pdata + event_size - size);
8963 }
8964}
Here is the call graph for this function:

◆ bm_receive_event()

INT bm_receive_event ( INT  buffer_handle,
void destination,
INT buf_size,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could for example contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return back to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
INT bm_receive_event(INT buffer_handle, void *destination, INT *buf_size, int timeout_msec)
Definition midas.cxx:10650
Parameters
buffer_handle — buffer handle
destination — destination address where the event is written to
buf_size — size of destination buffer on input, size of event plus header on return.
timeout_msec — wait this many milliseconds for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_TRUNCATED The event is larger than the destination buffer and was therefore truncated
BM_ASYNC_RETURN No event available

Definition at line 10650 of file midas.cxx.

10650 {
10651 //printf("bm_receive_event: handle %d, async %d\n", buffer_handle, async_flag);
10652 if (rpc_is_remote()) {
10653 return bm_receive_event_rpc(buffer_handle, destination, buf_size, NULL, NULL, timeout_msec);
10654 }
10655#ifdef LOCAL_ROUTINES
10656 {
10658
10659 BUFFER *pbuf = bm_get_buffer("bm_receive_event", buffer_handle, &status);
10660
10661 if (!pbuf)
10662 return status;
10663
10664 int convert_flags = rpc_get_convert_flags();
10665
10666 status = bm_read_buffer(pbuf, buffer_handle, NULL, destination, buf_size, NULL, timeout_msec, convert_flags, FALSE);
10667 //printf("bm_receive_event: handle %d, async %d, status %d, size %d\n", buffer_handle, async_flag, status, *buf_size);
10668 return status;
10669 }
10670#else /* LOCAL_ROUTINES */
10671
10672 return BM_SUCCESS;
10673#endif
10674}
static INT bm_receive_event_rpc(INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
Definition midas.cxx:10483
INT rpc_get_convert_flags(void)
Definition midas.cxx:13030
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_alloc()

INT bm_receive_event_alloc ( INT  buffer_handle,
EVENT_HEADER **  ppevent,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could for example contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return back to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
Parameters
buffer_handle — buffer handle
ppevent — pointer to the received event pointer; the event pointer should be free()ed to avoid a memory leak
timeout_msec — wait this many milliseconds for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN No event available

Definition at line 10731 of file midas.cxx.

10731 {
10732 if (rpc_is_remote()) {
10733 return bm_receive_event_rpc(buffer_handle, NULL, NULL, ppevent, NULL, timeout_msec);
10734 }
10735#ifdef LOCAL_ROUTINES
10736 {
10738
10739 BUFFER *pbuf = bm_get_buffer("bm_receive_event_alloc", buffer_handle, &status);
10740
10741 if (!pbuf)
10742 return status;
10743
10744 int convert_flags = rpc_get_convert_flags();
10745
10746 return bm_read_buffer(pbuf, buffer_handle, (void **) ppevent, NULL, NULL, NULL, timeout_msec, convert_flags, FALSE);
10747 }
10748#else /* LOCAL_ROUTINES */
10749
10750 return BM_SUCCESS;
10751#endif
10752}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_rpc()

static INT bm_receive_event_rpc ( INT  buffer_handle,
void buf,
int buf_size,
EVENT_HEADER **  ppevent,
std::vector< char > *  pvec,
int  timeout_msec 
)
static

Definition at line 10483 of file midas.cxx.

10484{
10485 //printf("bm_receive_event_rpc: handle %d, buf %p, pevent %p, pvec %p, timeout %d, max_event_size %d\n", buffer_handle, buf, ppevent, pvec, timeout_msec, _bm_max_event_size);
10486
10487 assert(_bm_max_event_size > sizeof(EVENT_HEADER));
10488
10489 void *xbuf = NULL;
10490 int xbuf_size = 0;
10491
10492 if (buf) {
10493 xbuf = buf;
10494 xbuf_size = *buf_size;
10495 } else if (ppevent) {
10498 } else if (pvec) {
10499 pvec->resize(_bm_max_event_size);
10500 xbuf = pvec->data();
10501 xbuf_size = pvec->size();
10502 } else {
10503 assert(!"incorrect call to bm_receivent_event_rpc()");
10504 }
10505
10506 int status;
10509
10511
10512 int zbuf_size = xbuf_size;
10513
10514 while (1) {
10515 if (timeout_msec == BM_WAIT) {
10516 xtimeout_msec = 1000;
10517 } else if (timeout_msec == BM_NO_WAIT) {
10519 } else {
10520 if (xtimeout_msec > 1000) {
10521 xtimeout_msec = 1000;
10522 }
10523 }
10524
10526
10528
10529 //printf("bm_receive_event_rpc: handle %d, timeout %d, status %d, size %d in, %d out, via RPC_BM_RECEIVE_EVENT\n", buffer_handle, xtimeout_msec, status, xbuf_size, zbuf_size);
10530
10531 if (status == BM_ASYNC_RETURN) {
10532 if (timeout_msec == BM_WAIT) {
10533 // BM_WAIT means wait forever
10534 continue;
10535 } else if (timeout_msec == BM_NO_WAIT) {
10536 // BM_NO_WAIT means do not wait
10537 break;
10538 } else {
10540 if (now >= time_end) {
10541 // timeout, return BM_ASYNC_RETURN
10542 break;
10543 }
10544
10546
10547 if (remain < (DWORD)xtimeout_msec) {
10549 }
10550
10551 // keep asking for event...
10552 continue;
10553 }
10554 } else if (status == BM_SUCCESS) {
10555 // success, return BM_SUCCESS
10556 break;
10557 }
10558
10559 // RPC error
10560
10561 if (buf) {
10562 *buf_size = 0;
10563 } else if (ppevent) {
10564 free(*ppevent);
10565 *ppevent = NULL;
10566 } else if (pvec) {
10567 pvec->resize(0);
10568 } else {
10569 assert(!"incorrect call to bm_receivent_event_rpc()");
10570 }
10571
10572 return status;
10573 }
10574
10575 // status is BM_SUCCESS or BM_ASYNC_RETURN
10576
10577 if (buf) {
10578 *buf_size = zbuf_size;
10579 } else if (ppevent) {
10580 // nothing to do
10581 // ppevent = realloc(ppevent, xbuf_size); // shrink memory allocation
10582 } else if (pvec) {
10583 pvec->resize(zbuf_size);
10584 } else {
10585 assert(!"incorrect call to bm_receivent_event_rpc()");
10586 }
10587
10588 return status;
10589}
#define RPC_BM_RECEIVE_EVENT
Definition mrpc.h:47
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_vec()

INT bm_receive_event_vec ( INT  buffer_handle,
std::vector< char > *  pvec,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could for example contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return back to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
Parameters
buffer_handle — buffer handle
pvec — pointer to a std::vector<char> that receives the event data
timeout_msec — wait this many milliseconds for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN No event available

Definition at line 10809 of file midas.cxx.

10809 {
10810 if (rpc_is_remote()) {
10811 return bm_receive_event_rpc(buffer_handle, NULL, NULL, NULL, pvec, timeout_msec);
10812 }
10813#ifdef LOCAL_ROUTINES
10814 {
10816
10817 BUFFER *pbuf = bm_get_buffer("bm_receive_event_vec", buffer_handle, &status);
10818
10819 if (!pbuf)
10820 return status;
10821
10822 int convert_flags = rpc_get_convert_flags();
10823
10824 return bm_read_buffer(pbuf, buffer_handle, NULL, NULL, NULL, pvec, timeout_msec, convert_flags, FALSE);
10825 }
10826#else /* LOCAL_ROUTINES */
10827 return BM_SUCCESS;
10828#endif
10829}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_remove_client_locked()

void bm_remove_client_locked ( BUFFER_HEADER pheader,
int  j 
)

Called to forcibly disconnect given client from a data buffer

Definition at line 6035 of file midas.cxx.

6035 {
6036 int k, nc;
6038
6039 /* clear entry from client structure in buffer header */
6040 memset(&(pheader->client[j]), 0, sizeof(BUFFER_CLIENT));
6041
6042 /* calculate new max_client_index entry */
6043 for (k = MAX_CLIENTS - 1; k >= 0; k--)
6044 if (pheader->client[k].pid != 0)
6045 break;
6046 pheader->max_client_index = k + 1;
6047
6048 /* count new number of clients */
6049 for (k = MAX_CLIENTS - 1, nc = 0; k >= 0; k--)
6050 if (pheader->client[k].pid != 0)
6051 nc++;
6052 pheader->num_clients = nc;
6053
6054 /* check if anyone is waiting and wake him up */
6055 pbctmp = pheader->client;
6056
6057 for (k = 0; k < pheader->max_client_index; k++, pbctmp++)
6058 if (pbctmp->pid && (pbctmp->write_wait || pbctmp->read_wait))
6059 ss_resume(pbctmp->port, "B ");
6060}
INT k
Definition odbhist.cxx:40
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_remove_event_request()

INT bm_remove_event_request ( INT  buffer_handle,
INT  request_id 
)

Delete a previously placed request for a specific event type in the client structure of the buffer referenced by buffer_handle.

Parameters
buffer_handle — Handle to the buffer where the request was placed
request_id — Request id returned by bm_request_event
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_NOT_FOUND, RPC_NET_ERROR

Definition at line 8518 of file midas.cxx.

8518 {
8519 if (rpc_is_remote())
8520 return rpc_call(RPC_BM_REMOVE_EVENT_REQUEST, buffer_handle, request_id);
8521
8522#ifdef LOCAL_ROUTINES
8523 {
8524 int status = 0;
8525
8526 BUFFER *pbuf = bm_get_buffer("bm_remove_event_request", buffer_handle, &status);
8527
8528 if (!pbuf)
8529 return status;
8530
8531 /* lock buffer */
8533
8534 if (!pbuf_guard.is_locked())
8535 return pbuf_guard.get_status();
8536
8537 INT i, deleted;
8538
8539 /* get a pointer to the proper client structure */
8541
8542 /* check all requests and set to zero if matching */
8543 for (i = 0, deleted = 0; i < pclient->max_request_index; i++)
8544 if (pclient->event_request[i].valid && pclient->event_request[i].id == request_id) {
8545 memset(&pclient->event_request[i], 0, sizeof(EVENT_REQUEST));
8546 deleted++;
8547 }
8548
8549 /* calculate new max_request_index entry */
8550 for (i = MAX_EVENT_REQUESTS - 1; i >= 0; i--)
8551 if (pclient->event_request[i].valid)
8552 break;
8553
8554 pclient->max_request_index = i + 1;
8555
8556 /* calculate new all_flag */
8557 pclient->all_flag = FALSE;
8558
8559 for (i = 0; i < pclient->max_request_index; i++)
8560 if (pclient->event_request[i].valid && (pclient->event_request[i].sampling_type & GET_ALL)) {
8561 pclient->all_flag = TRUE;
8562 break;
8563 }
8564
8565 pbuf->get_all_flag = pclient->all_flag;
8566
8567 if (!deleted)
8568 return BM_NOT_FOUND;
8569 }
8570#endif /* LOCAL_ROUTINES */
8571
8572 return BM_SUCCESS;
8573}
#define RPC_BM_REMOVE_EVENT_REQUEST
Definition mrpc.h:44
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_request_event()

INT bm_request_event ( HNDLE  buffer_handle,
short int  event_id,
short int  trigger_mask,
INT  sampling_type,
HNDLE request_id,
EVENT_HANDLER func 
)

Place an event request based on certain characteristics. Multiple event requests can be placed for each buffer, which are later identified by their request ID. They can contain different callback routines. For an example, see bm_open_buffer() and bm_receive_event()

Parameters
buffer_handle — buffer handle obtained via bm_open_buffer()
event_id — event ID for requested events. Use EVENTID_ALL to receive events with any ID.
trigger_mask — trigger mask for requested events. The requested events must have at least one bit in their trigger mask common with the requested trigger mask. Use TRIGGER_ALL to receive events with any trigger mask.
sampling_type — specifies how many events to receive. A value of GET_ALL receives all events which match the specified event ID and trigger mask. If the events are consumed slower than produced, the producer is automatically slowed down. A value of GET_NONBLOCKING receives as many events as possible without slowing down the producer. GET_ALL is typically used by the logger, while GET_NONBLOCKING is typically used by analyzers.
request_id — request ID returned by the function. This ID is passed to the callback routine and must be used in the bm_delete_request() routine.
func — callback routine which gets called when an event of the specified type is received.
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_NO_MEMORY too many requests. The value MAX_EVENT_REQUESTS in midas.h should be increased.

Definition at line 8465 of file midas.cxx.

8469{
8470 assert(request_id != NULL);
8471
8472 EventRequest r;
8473 r.buffer_handle = buffer_handle;
8474 r.event_id = event_id;
8476 r.dispatcher = func;
8477
8478 {
8479 std::lock_guard<std::mutex> guard(_request_list_mutex);
8480
8481 bool found = false;
8482
8483 // find deleted entry
8484 for (size_t i = 0; i < _request_list.size(); i++) {
8485 if (_request_list[i].buffer_handle == 0) {
8486 _request_list[i] = r;
8487 *request_id = i;
8488 found = true;
8489 break;
8490 }
8491 }
8492
8493 if (!found) { // not found
8494 *request_id = _request_list.size();
8495 _request_list.push_back(r);
8496 }
8497
8498 // implicit unlock()
8499 }
8500
8501 /* add request in buffer structure */
8502 int status = bm_add_event_request(buffer_handle, event_id, trigger_mask, sampling_type, func, *request_id);
8503 if (status != BM_SUCCESS)
8504 return status;
8505
8506 return BM_SUCCESS;
8507}
INT bm_add_event_request(INT buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, EVENT_HANDLER *func, INT request_id)
Definition midas.cxx:8314
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_reset_buffer_locked()

static void bm_reset_buffer_locked ( BUFFER pbuf)
static

Definition at line 6391 of file midas.cxx.

6391 {
6392 BUFFER_HEADER *pheader = pbuf->buffer_header;
6393
6394 //printf("bm_reset_buffer: buffer \"%s\"\n", pheader->name);
6395
6396 pheader->read_pointer = 0;
6397 pheader->write_pointer = 0;
6398
6399 int i;
6400 for (i = 0; i < pheader->max_client_index; i++) {
6401 BUFFER_CLIENT *pc = pheader->client + i;
6402 if (pc->pid) {
6403 pc->read_pointer = 0;
6404 }
6405 }
6406}
INT read_pointer
Definition midas.h:940
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_send_event()

INT bm_send_event ( INT  buffer_handle,
const EVENT_HEADER pevent,
int  unused,
int  timeout_msec 
)

Definition at line 9678 of file midas.cxx.

9679{
9680 const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16); // event size computations are not 32-bit clean, limit event size to 2GB. K.O.
9681 const DWORD data_size = pevent->data_size; // 32-bit unsigned value
9682
9683 if (data_size == 0) {
9684 cm_msg(MERROR, "bm_send_event", "invalid event data size zero");
9685 return BM_INVALID_SIZE;
9686 }
9687
9688 if (data_size > MAX_DATA_SIZE) {
9689 cm_msg(MERROR, "bm_send_event", "invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
9690 return BM_INVALID_SIZE;
9691 }
9692
9693 const size_t event_size = sizeof(EVENT_HEADER) + data_size;
9694
9695 //printf("bm_send_event: pevent %p, data_size %d, event_size %d, buf_size %d\n", pevent, data_size, event_size, unused);
9696
9697 if (rpc_is_remote()) {
9698 //return bm_send_event_rpc(buffer_handle, pevent, event_size, timeout_msec);
9699 return rpc_send_event_sg(buffer_handle, 1, (char**)&pevent, &event_size);
9700 } else {
9701 return bm_send_event_sg(buffer_handle, 1, (char**)&pevent, &event_size, timeout_msec);
9702 }
9703}
int bm_send_event_sg(int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
Definition midas.cxx:9778
#define BM_INVALID_SIZE
Definition midas.h:624
INT rpc_send_event_sg(INT buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[])
Definition midas.cxx:13925
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_send_event_sg()

int bm_send_event_sg ( int  buffer_handle,
int  sg_n,
const char *const  sg_ptr[],
const size_t  sg_len[],
int  timeout_msec 
)

Sends an event to a buffer. This function checks if the buffer has enough space for the event, then copies the event to the buffer in shared memory. If clients have requests for the event, they are notified via a UDP packet.

char event[1000];
// create event with ID 1, trigger mask 0, size 100 bytes and serial number 1
bm_compose_event((EVENT_HEADER *) event, 1, 0, 100, 1);
// set first byte of event
*(event+sizeof(EVENT_HEADER)) = <...>
#include <stdio.h>
#include "midas.h"
{
char event[1000];
status = cm_connect_experiment("", "Sample", "Producer", NULL);
return 1;
// create event with ID 1, trigger mask 0, size 100 bytes and serial number 1
bm_compose_event((EVENT_HEADER *) event, 1, 0, 100, 1);
// set event data
for (i=0 ; i<100 ; i++)
*(event+sizeof(EVENT_HEADER)+i) = i;
// send event
bm_send_event(hbuf, event, 100+sizeof(EVENT_HEADER), BM_WAIT);
return 0;
}
INT bm_send_event(INT buffer_handle, const EVENT_HEADER *pevent, int unused, int timeout_msec)
Definition midas.cxx:9678
Parameters
buffer_handleBuffer handle obtained via bm_open_buffer()
sourceAddress of event buffer
buf_sizeSize of event including event header in bytes
timeout_msecTimeout waiting for free space in the event buffer. If BM_WAIT, wait forever. If BM_NO_WAIT, the function returns immediately with a value of BM_ASYNC_RETURN without writing the event to the buffer
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_INVALID_PARAM
BM_ASYNC_RETURN Routine called with timeout_msec == BM_NO_WAIT and buffer has not enough space to receive event
BM_NO_MEMORY Event is too large for network buffer or event buffer. One has to increase the event buffer size "/Experiment/Buffer sizes/SYSTEM" and/or /Experiment/MAX_EVENT_SIZE in ODB.

Definition at line 9778 of file midas.cxx.

9779{
9780 if (rpc_is_remote())
9781 return rpc_send_event_sg(buffer_handle, sg_n, sg_ptr, sg_len);
9782
9783 if (sg_n < 1) {
9784 cm_msg(MERROR, "bm_send_event", "invalid sg_n %d", sg_n);
9785 return BM_INVALID_SIZE;
9786 }
9787
9788 if (sg_ptr[0] == NULL) {
9789 cm_msg(MERROR, "bm_send_event", "invalid sg_ptr[0] is NULL");
9790 return BM_INVALID_SIZE;
9791 }
9792
9793 if (sg_len[0] < sizeof(EVENT_HEADER)) {
9794 cm_msg(MERROR, "bm_send_event", "invalid sg_len[0] value %d is smaller than event header size %d", (int)sg_len[0], (int)sizeof(EVENT_HEADER));
9795 return BM_INVALID_SIZE;
9796 }
9797
9798 const EVENT_HEADER* pevent = (const EVENT_HEADER*)sg_ptr[0];
9799
9800 const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16); // event size computations are not 32-bit clean, limit event size to 2GB. K.O.
9801 const DWORD data_size = pevent->data_size; // 32-bit unsigned value
9802
9803 if (data_size == 0) {
9804 cm_msg(MERROR, "bm_send_event", "invalid event data size zero");
9805 return BM_INVALID_SIZE;
9806 }
9807
9808 if (data_size > MAX_DATA_SIZE) {
9809 cm_msg(MERROR, "bm_send_event", "invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
9810 return BM_INVALID_SIZE;
9811 }
9812
9813 const size_t event_size = sizeof(EVENT_HEADER) + data_size;
9814
9815 size_t count = 0;
9816 for (int i=0; i<sg_n; i++) {
9817 count += sg_len[i];
9818 }
9819
9820 if (count != event_size) {
9821 cm_msg(MERROR, "bm_send_event", "data size mismatch: event data_size %d, event_size %d not same as sum of sg_len %d", (int)data_size, (int)event_size, (int)count);
9822 return BM_INVALID_SIZE;
9823 }
9824
9825 //printf("bm_send_event_sg: pevent %p, event_id 0x%04x, serial 0x%08x, data_size %d, event_size %d, total_size %d\n", pevent, pevent->event_id, pevent->serial_number, (int)pevent->data_size, (int)event_size, (int)total_size);
9826
9827#ifdef LOCAL_ROUTINES
9828 {
9829 int status = 0;
9830 const size_t total_size = ALIGN8(event_size);
9831
9832 BUFFER *pbuf = bm_get_buffer("bm_send_event_sg", buffer_handle, &status);
9833
9834 if (!pbuf)
9835 return status;
9836
9837 /* round up total_size to next DWORD boundary */
9838 //int total_size = ALIGN8(event_size);
9839
9840 /* check if write cache is enabled */
9841 if (pbuf->write_cache_size) {
9843
9844 if (status != BM_SUCCESS)
9845 return status;
9846
9847 /* check if write cache is enabled */
9848 if (pbuf->write_cache_size) {
9849 size_t max_event_size = pbuf->write_cache_size/MAX_WRITE_CACHE_EVENT_SIZE_DIV;
9851
9852 //printf("bm_send_event: write %zu/%zu max %zu, cache size %zu, wp %zu\n", event_size, total_size, max_event_size, pbuf->write_cache_size.load(), pbuf->write_cache_wp);
9853
9854 /* if this event does not fit into the write cache, flush the write cache */
9855 if (pbuf->write_cache_wp > 0 && (pbuf->write_cache_wp + total_size > pbuf->write_cache_size || too_big)) {
9856 //printf("bm_send_event: write %zu/%zu but cache is full, size %zu, wp %zu\n", event_size, total_size, pbuf->write_cache_size.load(), pbuf->write_cache_wp);
9857
9859
9860 if (!pbuf_guard.is_locked()) {
9861 pbuf->write_cache_mutex.unlock();
9862 return pbuf_guard.get_status();
9863 }
9864
9866
9867 if (pbuf_guard.is_locked()) {
9868 // check if bm_wait_for_free_space() failed to relock the buffer
9869 pbuf_guard.unlock();
9870 }
9871
9872 if (status != BM_SUCCESS) {
9873 pbuf->write_cache_mutex.unlock();
9874 // bm_flush_cache() failed: timeout in bm_wait_for_free_space() or write cache size is bigger than buffer size or buffer was closed.
9875 if (status == BM_NO_MEMORY)
9876 cm_msg(MERROR, "bm_send_event", "write cache size is bigger than buffer size");
9877 return status;
9878 }
9879
9880 // write cache must be empty here
9881 assert(pbuf->write_cache_wp == 0);
9882 }
9883
9884 /* write this event into the write cache, if it is not too big and if it fits */
9885 if (!too_big && pbuf->write_cache_wp + total_size <= pbuf->write_cache_size) {
9886 //printf("bm_send_event: write %d/%d to cache size %d, wp %d\n", (int)event_size, (int)total_size, (int)pbuf->write_cache_size, (int)pbuf->write_cache_wp);
9887
9888 char* wptr = pbuf->write_cache + pbuf->write_cache_wp;
9889
9890 for (int i=0; i<sg_n; i++) {
9891 memcpy(wptr, sg_ptr[i], sg_len[i]);
9892 wptr += sg_len[i];
9893 }
9894
9895 pbuf->write_cache_wp += total_size;
9896
9897 pbuf->write_cache_mutex.unlock();
9898 return BM_SUCCESS;
9899 }
9900 }
9901
9902 /* event did not fit into the write cache, we flushed the write cache and we send it directly to shared memory */
9903 pbuf->write_cache_mutex.unlock();
9904 }
9905
9906 /* we come here only for events that are too big to fit into the cache */
9907
9908 /* lock the buffer */
9910
9911 if (!pbuf_guard.is_locked()) {
9912 return pbuf_guard.get_status();
9913 }
9914
9915 /* calculate some shorthands */
9916 BUFFER_HEADER *pheader = pbuf->buffer_header;
9917
9918#if 0
9920 if (status != BM_SUCCESS) {
9921 printf("bm_send_event: corrupted 111!\n");
9922 abort();
9923 }
9924#endif
9925
9926 /* check if buffer is large enough */
9927 if (total_size >= (size_t)pheader->size) {
9928 pbuf_guard.unlock(); // unlock before cm_msg()
9929 cm_msg(MERROR, "bm_send_event", "total event size (%d) larger than size (%d) of buffer \'%s\'", (int)total_size, pheader->size, pheader->name);
9930 return BM_NO_MEMORY;
9931 }
9932
9934
9935 if (status != BM_SUCCESS) {
9936 // implicit unlock
9937 return status;
9938 }
9939
9940#if 0
9942 if (status != BM_SUCCESS) {
9943 printf("bm_send_event: corrupted 222!\n");
9944 abort();
9945 }
9946#endif
9947
9948 int old_write_pointer = pheader->write_pointer;
9949
9950 bm_write_to_buffer_locked(pheader, sg_n, sg_ptr, sg_len, total_size);
9951
9952 /* write pointer was incremented, but there should
9953 * always be some free space in the buffer and the
9954 * write pointer should never cacth up to the read pointer:
9955 * the rest of the code gets confused this happens (buffer 100% full)
9956 * as it is write_pointer == read_pointer can be either
9957 * 100% full or 100% empty. My solution: never fill
9958 * the buffer to 100% */
9959 assert(pheader->write_pointer != pheader->read_pointer);
9960
9961 /* send wake up messages to all clients that want this event */
9962 int i;
9963 for (i = 0; i < pheader->max_client_index; i++) {
9964 BUFFER_CLIENT *pc = pheader->client + i;
9965 int request_id = bm_find_first_request_locked(pc, pevent);
9966 bm_notify_reader_locked(pheader, pc, old_write_pointer, request_id);
9967 }
9968
9969#if 0
9971 if (status != BM_SUCCESS) {
9972 printf("bm_send_event: corrupted 333!\n");
9973 abort();
9974 }
9975#endif
9976
9977 /* update statistics */
9978 pheader->num_in_events++;
9979 pbuf->count_sent += 1;
9980 pbuf->bytes_sent += total_size;
9981 }
9982#endif /* LOCAL_ROUTINES */
9983
9984 return BM_SUCCESS;
9985}
double count
Definition mdump.cxx:33
INT max_event_size
Definition mfed.cxx:30
#define MAX_WRITE_CACHE_EVENT_SIZE_DIV
Definition midas.h:259
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_send_event_vec() [1/2]

int bm_send_event_vec ( int  buffer_handle,
const std::vector< char > &  event,
int  timeout_msec 
)

Definition at line 9705 of file midas.cxx.

9706{
9707 const char* cptr = event.data();
9708 size_t clen = event.size();
9709 return bm_send_event_sg(buffer_handle, 1, &cptr, &clen, timeout_msec);
9710}
Here is the call graph for this function:

◆ bm_send_event_vec() [2/2]

int bm_send_event_vec ( int  buffer_handle,
const std::vector< std::vector< char > > &  event,
int  timeout_msec 
)

Definition at line 9712 of file midas.cxx.

9713{
9714 int sg_n = event.size();
9715 const char* sg_ptr[sg_n];
9716 size_t sg_len[sg_n];
9717 for (int i=0; i<sg_n; i++) {
9718 sg_ptr[i] = event[i].data();
9719 sg_len[i] = event[i].size();
9720 }
9721 return bm_send_event_sg(buffer_handle, sg_n, sg_ptr, sg_len, timeout_msec);
9722}
Here is the call graph for this function:

◆ bm_set_cache_size()

INT bm_set_cache_size ( INT  buffer_handle,
size_t  read_size,
size_t  write_size 
)

Modifies buffer cache size. Without a buffer cache, events are copied to/from the shared memory event by event.

To protect processes from accessing the shared memory simultaneously, semaphores are used. Since semaphore operations are CPU consuming (typically 50-100us) this can slow down the data transfer especially for small events. By using a cache the number of semaphore operations is reduced dramatically. Instead of writing directly to the shared memory, the events are copied to a local cache buffer. When this buffer is full, it is copied to the shared memory in one operation. The same technique can be used when receiving events.

The drawback of this method is that the events have to be copied twice, once to the cache and once from the cache to the shared memory. Therefore it can happen that the usage of a cache even slows down data throughput in a given environment (computer type, OS type, event size). The cache size therefore has to be optimized manually to maximize data throughput.

Parameters
buffer_handlebuffer handle obtained via bm_open_buffer()
read_sizecache size for reading events in bytes, zero for no cache
write_sizecache size for writing events in bytes, zero for no cache
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_NO_MEMORY, BM_INVALID_PARAM

Definition at line 8140 of file midas.cxx.

8142{
8143 if (rpc_is_remote())
8144 return rpc_call(RPC_BM_SET_CACHE_SIZE, buffer_handle, read_size, write_size);
8145
8146#ifdef LOCAL_ROUTINES
8147 {
8148 int status = 0;
8149
8150 BUFFER *pbuf = bm_get_buffer("bm_set_cache_size", buffer_handle, &status);
8151
8152 if (!pbuf)
8153 return status;
8154
8155 /* lock pbuf for local access. we do not lock buffer semaphore because we do not touch the shared memory */
8156
8158
8159 if (status != BM_SUCCESS)
8160 return status;
8161
8162 if (write_size < 0)
8163 write_size = 0;
8164
8165 if (write_size > 0) {
8167 cm_msg(MERROR, "bm_set_cache_size", "requested write cache size %zu on buffer \"%s\" too small, will use minimum size %d", write_size, pbuf->buffer_name, MIN_WRITE_CACHE_SIZE);
8169 }
8170 }
8171
8172 size_t max_write_size = pbuf->buffer_header->size/MAX_WRITE_CACHE_SIZE_DIV;
8173
8174 if (write_size > max_write_size) {
8176 cm_msg(MERROR, "bm_set_cache_size", "requested write cache size %zu on buffer \"%s\" is too big: buffer size is %d, write cache size will be %zu bytes", write_size, pbuf->buffer_name, pbuf->buffer_header->size, new_write_size);
8178 }
8179
8180 pbuf->buffer_mutex.unlock();
8181
8182 /* resize read cache */
8183
8185
8186 if (status != BM_SUCCESS) {
8187 return status;
8188 }
8189
8190 if (pbuf->read_cache_size > 0) {
8191 free(pbuf->read_cache);
8192 pbuf->read_cache = NULL;
8193 }
8194
8195 if (read_size > 0) {
8196 pbuf->read_cache = (char *) malloc(read_size);
8197 if (pbuf->read_cache == NULL) {
8198 pbuf->read_cache_size = 0;
8199 pbuf->read_cache_rp = 0;
8200 pbuf->read_cache_wp = 0;
8201 pbuf->read_cache_mutex.unlock();
8202 cm_msg(MERROR, "bm_set_cache_size", "not enough memory to allocate read cache for buffer \"%s\", malloc(%zu) failed", pbuf->buffer_name, read_size);
8203 return BM_NO_MEMORY;
8204 }
8205 }
8206
8207 pbuf->read_cache_size = read_size;
8208 pbuf->read_cache_rp = 0;
8209 pbuf->read_cache_wp = 0;
8210
8211 pbuf->read_cache_mutex.unlock();
8212
8213 /* resize the write cache */
8214
8216
8217 if (status != BM_SUCCESS)
8218 return status;
8219
8220 // FIXME: should flush the write cache!
8221 if (pbuf->write_cache_size && pbuf->write_cache_wp > 0) {
8222 cm_msg(MERROR, "bm_set_cache_size", "buffer \"%s\" lost %zu bytes from the write cache", pbuf->buffer_name, pbuf->write_cache_wp);
8223 }
8224
8225 /* manage write cache */
8226 if (pbuf->write_cache_size > 0) {
8227 free(pbuf->write_cache);
8228 pbuf->write_cache = NULL;
8229 }
8230
8231 if (write_size > 0) {
8232 pbuf->write_cache = (char *) M_MALLOC(write_size);
8233 if (pbuf->write_cache == NULL) {
8234 pbuf->write_cache_size = 0;
8235 pbuf->write_cache_rp = 0;
8236 pbuf->write_cache_wp = 0;
8237 pbuf->write_cache_mutex.unlock();
8238 cm_msg(MERROR, "bm_set_cache_size", "not enough memory to allocate write cache for buffer \"%s\", malloc(%zu) failed", pbuf->buffer_name, write_size);
8239 return BM_NO_MEMORY;
8240 }
8241 }
8242
8243 pbuf->write_cache_size = write_size;
8244 pbuf->write_cache_rp = 0;
8245 pbuf->write_cache_wp = 0;
8246
8247 pbuf->write_cache_mutex.unlock();
8248 }
8249#endif /* LOCAL_ROUTINES */
8250
8251 return BM_SUCCESS;
8252}
static int bm_lock_buffer_mutex(BUFFER *pbuf)
Definition midas.cxx:7946
#define RPC_BM_SET_CACHE_SIZE
Definition mrpc.h:42
#define M_MALLOC(x)
Definition midas.h:1552
#define MIN_WRITE_CACHE_SIZE
Definition midas.h:257
#define MAX_WRITE_CACHE_SIZE_DIV
Definition midas.h:258
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_skip_event() [1/2]

static int bm_skip_event ( BUFFER pbuf)
static

Definition at line 10833 of file midas.cxx.

10834{
10835 /* clear read cache */
10836 if (pbuf->read_cache_size > 0) {
10837
10839
10840 if (status != BM_SUCCESS)
10841 return status;
10842
10843 pbuf->read_cache_rp = 0;
10844 pbuf->read_cache_wp = 0;
10845
10846 pbuf->read_cache_mutex.unlock();
10847 }
10848
10850
10851 if (!pbuf_guard.is_locked())
10852 return pbuf_guard.get_status();
10853
10854 BUFFER_HEADER *pheader = pbuf->buffer_header;
10855
10856 /* forward read pointer to global write pointer */
10858 pclient->read_pointer = pheader->write_pointer;
10859
10860 return BM_SUCCESS;
10861}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_skip_event() [2/2]

INT bm_skip_event ( INT  buffer_handle)

Skip all events in current buffer.

Useful for single event displays to see the newest events

Parameters
buffer_handleHandle of the buffer. Must be obtained via bm_open_buffer.
Returns
BM_SUCCESS, BM_INVALID_HANDLE, RPC_NET_ERROR

Definition at line 10874 of file midas.cxx.

10874 {
10875 if (rpc_is_remote())
10876 return rpc_call(RPC_BM_SKIP_EVENT, buffer_handle);
10877
10878#ifdef LOCAL_ROUTINES
10879 {
10880 int status = 0;
10881
10882 BUFFER *pbuf = bm_get_buffer("bm_skip_event", buffer_handle, &status);
10883
10884 if (!pbuf)
10885 return status;
10886
10887 return bm_skip_event(pbuf);
10888 }
10889#endif
10890
10891 return BM_SUCCESS;
10892}
#define RPC_BM_SKIP_EVENT
Definition mrpc.h:50
Here is the call graph for this function:

◆ bm_update_last_activity()

static void bm_update_last_activity ( DWORD  millitime)
static

Update last activity time

Definition at line 6117 of file midas.cxx.

6117 {
6118 int pid = ss_getpid();
6119
6120 std::vector<BUFFER*> mybuffers;
6121
6122 gBuffersMutex.lock();
6124 gBuffersMutex.unlock();
6125
6126 for (BUFFER* pbuf : mybuffers) {
6127 if (!pbuf)
6128 continue;
6129 if (pbuf->attached) {
6130
6132
6133 if (!pbuf_guard.is_locked())
6134 continue;
6135
6136 BUFFER_HEADER *pheader = pbuf->buffer_header;
6137 for (int j = 0; j < pheader->max_client_index; j++) {
6138 BUFFER_CLIENT *pclient = pheader->client + j;
6139 if (pclient->pid == pid) {
6141 }
6142 }
6143 }
6144 }
6145}
DWORD last_activity
Definition midas.h:950
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_update_read_pointer_locked()

static BOOL bm_update_read_pointer_locked ( const char caller_name,
BUFFER_HEADER pheader 
)
static

Definition at line 8720 of file midas.cxx.

8720 {
8721 assert(caller_name);
8722
8723 /* calculate global read pointer as "minimum" of client read pointers */
8724 int min_rp = pheader->write_pointer;
8725
8726 int i;
8727 for (i = 0; i < pheader->max_client_index; i++) {
8728 BUFFER_CLIENT *pc = pheader->client + i;
8729 if (pc->pid) {
8731
8732#if 0
8733 printf("bm_update_read_pointer: [%s] rp %d, wp %d, size %d, min_rp %d, client [%s] rp %d\n",
8734 pheader->name,
8735 pheader->read_pointer,
8736 pheader->write_pointer,
8737 pheader->size,
8738 min_rp,
8739 pc->name,
8740 pc->read_pointer);
8741#endif
8742
8743 if (pheader->read_pointer <= pheader->write_pointer) {
8744 // normal pointers
8745 if (pc->read_pointer < min_rp)
8746 min_rp = pc->read_pointer;
8747 } else {
8748 // inverted pointers
8749 if (pc->read_pointer <= pheader->write_pointer) {
8750 // clients 3 and 4
8751 if (pc->read_pointer < min_rp)
8752 min_rp = pc->read_pointer;
8753 } else {
8754 // clients 1 and 2
8755 int xptr = pc->read_pointer - pheader->size;
8756 if (xptr < min_rp)
8757 min_rp = xptr;
8758 }
8759 }
8760 }
8761 }
8762
8763 if (min_rp < 0)
8764 min_rp += pheader->size;
8765
8766 assert(min_rp >= 0);
8767 assert(min_rp < pheader->size);
8768
8769 if (min_rp == pheader->read_pointer) {
8770 return FALSE;
8771 }
8772
8773#if 0
8774 printf("bm_update_read_pointer: [%s] rp %d, wp %d, size %d, new_rp %d, moved\n",
8775 pheader->name,
8776 pheader->read_pointer,
8777 pheader->write_pointer,
8778 pheader->size,
8779 min_rp);
8780#endif
8781
8782 pheader->read_pointer = min_rp;
8783
8784 return TRUE;
8785}
static void bm_validate_client_pointers_locked(const BUFFER_HEADER *pheader, BUFFER_CLIENT *pclient)
Definition midas.cxx:8622
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_buffer_locked()

static int bm_validate_buffer_locked ( const BUFFER pbuf)
static

Definition at line 6307 of file midas.cxx.

6307 {
6308 const BUFFER_HEADER *pheader = pbuf->buffer_header;
6309 const char *pdata = (const char *) (pheader + 1);
6310
6311 //printf("bm_validate_buffer: buffer \"%s\"\n", pheader->name);
6312
6313 //printf("size: %d, rp: %d, wp: %d\n", pheader->size, pheader->read_pointer, pheader->write_pointer);
6314
6315 //printf("clients: max: %d, num: %d, MAX_CLIENTS: %d\n", pheader->max_client_index, pheader->num_clients, MAX_CLIENTS);
6316
6317 if (pheader->read_pointer < 0 || pheader->read_pointer >= pheader->size) {
6318 cm_msg(MERROR, "bm_validate_buffer",
6319 "buffer \"%s\" is corrupted: invalid read pointer %d. Size %d, write pointer %d", pheader->name,
6320 pheader->read_pointer, pheader->size, pheader->write_pointer);
6321 return BM_CORRUPTED;
6322 }
6323
6324 if (pheader->write_pointer < 0 || pheader->write_pointer >= pheader->size) {
6325 cm_msg(MERROR, "bm_validate_buffer",
6326 "buffer \"%s\" is corrupted: invalid write pointer %d. Size %d, read pointer %d", pheader->name,
6327 pheader->write_pointer, pheader->size, pheader->read_pointer);
6328 return BM_CORRUPTED;
6329 }
6330
6331 if (!bm_validate_rp("bm_validate_buffer_locked", pheader, pheader->read_pointer)) {
6332 cm_msg(MERROR, "bm_validate_buffer", "buffer \"%s\" is corrupted: read pointer %d is invalid", pheader->name,
6333 pheader->read_pointer);
6334 return BM_CORRUPTED;
6335 }
6336
6337 int rp = pheader->read_pointer;
6338 int rp0 = -1;
6339 while (rp != pheader->write_pointer) {
6340 if (!bm_validate_rp("bm_validate_buffer_locked", pheader, rp)) {
6341 cm_msg(MERROR, "bm_validate_buffer", "buffer \"%s\" is corrupted: invalid rp %d, last good event at rp %d",
6342 pheader->name, rp, rp0);
6343 return BM_CORRUPTED;
6344 }
6345 //bm_print_event(pdata, rp);
6346 int rp1 = bm_next_rp("bm_validate_buffer_locked", pheader, pdata, rp);
6347 if (rp1 < 0) {
6348 cm_msg(MERROR, "bm_validate_buffer",
6349 "buffer \"%s\" is corrupted: invalid event at rp %d, last good event at rp %d", pheader->name, rp, rp0);
6350 return BM_CORRUPTED;
6351 }
6352 rp0 = rp;
6353 rp = rp1;
6354 }
6355
6356 int i;
6357 for (i = 0; i < MAX_CLIENTS; i++) {
6358 const BUFFER_CLIENT *c = &pheader->client[i];
6359 if (c->pid == 0)
6360 continue;
6361 BOOL get_all = FALSE;
6362 int j;
6363 for (j = 0; j < MAX_EVENT_REQUESTS; j++) {
6364 const EVENT_REQUEST *r = &c->event_request[j];
6365 if (!r->valid)
6366 continue;
6368 get_all = (get_all || xget_all);
6369 //printf("client slot %d: pid %d, name \"%s\", request %d: id %d, valid %d, sampling_type %d, get_all %d\n", i, c->pid, c->name, j, r->id, r->valid, r->sampling_type, xget_all);
6370 }
6371
6372 int rp = c->read_pointer;
6373 int rp0 = -1;
6374 while (rp != pheader->write_pointer) {
6375 //bm_print_event(pdata, rp);
6376 int rp1 = bm_next_rp("bm_validate_buffer_locked", pheader, pdata, rp);
6377 if (rp1 < 0) {
6378 cm_msg(MERROR, "bm_validate_buffer",
6379 "buffer \"%s\" is corrupted for client \"%s\" rp %d: invalid event at rp %d, last good event at rp %d",
6380 pheader->name, c->name, c->read_pointer, rp, rp0);
6381 return BM_CORRUPTED;
6382 }
6383 rp0 = rp;
6384 rp = rp1;
6385 }
6386 }
6387
6388 return BM_SUCCESS;
6389}
static BOOL bm_validate_rp(const char *who, const BUFFER_HEADER *pheader, int rp)
Definition midas.cxx:6189
static int bm_next_rp(const char *who, const BUFFER_HEADER *pheader, const char *pdata, int rp)
Definition midas.cxx:6256
INT sampling_type
Definition midas.h:931
BOOL valid
Definition midas.h:928
char c
Definition system.cxx:1310
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_client_index_locked()

static int bm_validate_client_index_locked ( bm_lock_buffer_guard pbuf_guard)
static

Definition at line 5922 of file midas.cxx.

5923{
5924 const BUFFER *pbuf = pbuf_guard.get_pbuf();
5925
5926 bool badindex = false;
5927 bool badclient = false;
5928
5929 int idx = pbuf->client_index;
5930
5931 if (idx < 0) {
5932 badindex = true;
5933 } else if (idx > pbuf->buffer_header->max_client_index) {
5934 badindex = true;
5935 } else {
5936 BUFFER_CLIENT *pclient = &pbuf->buffer_header->client[idx];
5937 if (pclient->name[0] == 0)
5938 badclient = true;
5939 else if (pclient->pid != ss_getpid())
5940 badclient = true;
5941
5942 //if (strcmp(pclient->name,"mdump")==0) {
5943 // for (int i=0; i<15; i++) {
5944 // printf("sleep %d\n", i);
5945 // ::sleep(1);
5946 // }
5947 //}
5948 }
5949
5950#if 0
5951 if (badindex) {
5952 printf("bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, badindex %d, pid=%d\n",
5953 pbuf, pbuf->buffer_header->name, pbuf->client_index, pbuf->buffer_header->max_client_index,
5954 badindex, ss_getpid());
5955 } else if (badclient) {
5956 printf("bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, badclient %d\n",
5957 pbuf, pbuf->buffer_header->name, pbuf->client_index, pbuf->buffer_header->max_client_index,
5958 pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid,
5959 ss_getpid(), badclient);
5960 } else {
5961 printf("bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, goodclient\n",
5962 pbuf, pbuf->buffer_header->name, pbuf->client_index, pbuf->buffer_header->max_client_index,
5963 pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid,
5964 ss_getpid());
5965 }
5966#endif
5967
5968 if (badindex || badclient) {
5969 static int prevent_recursion = 1;
5970
5971 if (prevent_recursion) {
5973
5974 if (badindex) {
5975 cm_msg(MERROR, "bm_validate_client_index", "My client index %d in buffer \'%s\' is invalid, max_client_index %d, my pid %d", idx, pbuf->buffer_header->name, pbuf->buffer_header->max_client_index, ss_getpid());
5976 } else {
5977 cm_msg(MERROR, "bm_validate_client_index", "My client index %d in buffer \'%s\' is invalid: client name \'%s\', pid %d should be my pid %d", idx, pbuf->buffer_header->name, pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid, ss_getpid());
5978 }
5979
5980 cm_msg(MERROR, "bm_validate_client_index", "Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...");
5981 }
5982
5983 if (badindex) {
5984 fprintf(stderr, "bm_validate_client_index: My client index %d in buffer \'%s\' is invalid, max_client_index %d, my pid %d\n", idx, pbuf->buffer_header->name, pbuf->buffer_header->max_client_index, ss_getpid());
5985 } else {
5986 fprintf(stderr, "bm_validate_client_index: My client index %d in buffer \'%s\' is invalid: client name \'%s\', pid %d should be my pid %d\n", idx, pbuf->buffer_header->name, pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid, ss_getpid());
5987 }
5988
5989 fprintf(stderr, "bm_validate_client_index: Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...\n");
5990
5991 pbuf_guard.unlock();
5992
5993 abort();
5994 }
5995
5996 return idx;
5997}
INT client_index
Definition midas.h:989
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_client_pointers_locked()

static void bm_validate_client_pointers_locked ( const BUFFER_HEADER pheader,
BUFFER_CLIENT pclient 
)
static

Definition at line 8622 of file midas.cxx.

8622 {
8623 assert(pheader->read_pointer >= 0 && pheader->read_pointer <= pheader->size);
8624 assert(pclient->read_pointer >= 0 && pclient->read_pointer <= pheader->size);
8625
8626 if (pheader->read_pointer <= pheader->write_pointer) {
8627
8628 if (pclient->read_pointer < pheader->read_pointer) {
8629 cm_msg(MINFO, "bm_validate_client_pointers",
8630 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8631 pclient->name,
8632 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8633
8634 pclient->read_pointer = pheader->read_pointer;
8635 }
8636
8637 if (pclient->read_pointer > pheader->write_pointer) {
8638 cm_msg(MINFO, "bm_validate_client_pointers",
8639 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, read pointer %d, size %d",
8640 pclient->name,
8641 pheader->name, pclient->read_pointer, pheader->write_pointer, pheader->read_pointer, pheader->size);
8642
8643 pclient->read_pointer = pheader->write_pointer;
8644 }
8645
8646 } else {
8647
8648 if (pclient->read_pointer < 0) {
8649 cm_msg(MINFO, "bm_validate_client_pointers",
8650 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8651 pclient->name,
8652 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8653
8654 pclient->read_pointer = pheader->read_pointer;
8655 }
8656
8657 if (pclient->read_pointer >= pheader->size) {
8658 cm_msg(MINFO, "bm_validate_client_pointers",
8659 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8660 pclient->name,
8661 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8662
8663 pclient->read_pointer = pheader->read_pointer;
8664 }
8665
8666 if (pclient->read_pointer > pheader->write_pointer && pclient->read_pointer < pheader->read_pointer) {
8667 cm_msg(MINFO, "bm_validate_client_pointers",
8668 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8669 pclient->name,
8670 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8671
8672 pclient->read_pointer = pheader->read_pointer;
8673 }
8674 }
8675}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_rp()

static BOOL bm_validate_rp ( const char who,
const BUFFER_HEADER pheader,
int  rp 
)
static

Definition at line 6189 of file midas.cxx.

6189 {
6190 if (rp < 0 || rp > pheader->size) {
6191 cm_msg(MERROR, "bm_validate_rp",
6192 "error: buffer \"%s\" is corrupted: rp %d is invalid. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6193 pheader->name,
6194 rp,
6195 pheader->read_pointer,
6196 pheader->write_pointer,
6197 pheader->size,
6198 who);
6199 return FALSE;
6200 }
6201
6202 if ((rp + (int) sizeof(EVENT_HEADER)) > pheader->size) {
6203 // note ">" here, has to match bm_incr_rp() and bm_write_to_buffer()
6204 cm_msg(MERROR, "bm_validate_rp",
6205 "error: buffer \"%s\" is corrupted: rp %d plus event header point beyond the end of buffer by %d bytes. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6206 pheader->name,
6207 rp,
6208 (int) (rp + sizeof(EVENT_HEADER) - pheader->size),
6209 pheader->read_pointer,
6210 pheader->write_pointer,
6211 pheader->size,
6212 who);
6213 return FALSE;
6214 }
6215
6216 return TRUE;
6217}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wait_for_free_space_locked()

static int bm_wait_for_free_space_locked ( bm_lock_buffer_guard & pbuf_guard,
int  timeout_msec,
int  requested_space,
bool  unlock_write_cache 
)
static
  • signal other clients in wait mode
  • validate client index: we could have been removed from the buffer

Definition at line 9080 of file midas.cxx.

9081{
9082 // return values:
9083 // BM_SUCCESS - have "requested_space" bytes free in the buffer
9084 // BM_CORRUPTED - shared memory is corrupted
9085 // BM_NO_MEMORY - asked for more than buffer size
9086 // BM_ASYNC_RETURN - timeout waiting for free space
9087 // BM_INVALID_HANDLE - buffer was closed (locks released) (via bm_clock_xxx())
9088 // SS_ABORT - we are told to shutdown (locks releases)
9089
9090 int status;
9091 BUFFER* pbuf = pbuf_guard.get_pbuf();
9092 BUFFER_HEADER *pheader = pbuf->buffer_header;
9093 char *pdata = (char *) (pheader + 1);
9094
9095 /* make sure the buffer never completely full:
9096 * read pointer and write pointer would coincide
9097 * and the code cannot tell if it means the
9098 * buffer is 100% full or 100% empty. It will explode
9099 * or lose events */
9100 requested_space += 100;
9101
9102 if (requested_space >= pheader->size)
9103 return BM_NO_MEMORY;
9104
9107
9108 //DWORD blocking_time = 0;
9109 //int blocking_loops = 0;
9110 int blocking_client_index = -1;
9112 blocking_client_name[0] = 0;
9113
9114 while (1) {
9115 while (1) {
9116 /* check if enough space in buffer */
9117
9118 int free = pheader->read_pointer - pheader->write_pointer;
9119 if (free <= 0)
9120 free += pheader->size;
9121
9122 //printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec);
9123
9124 if (requested_space < free) { /* note the '<' to avoid 100% filling */
9125 //if (blocking_loops) {
9126 // DWORD wait_time = ss_millitime() - blocking_time;
9127 // printf("blocking client \"%s\", time %d ms, loops %d\n", blocking_client_name, wait_time, blocking_loops);
9128 //}
9129
9130 if (pbuf->wait_start_time != 0) {
9132 DWORD wait_time = now - pbuf->wait_start_time;
9133 pbuf->time_write_wait += wait_time;
9134 pbuf->wait_start_time = 0;
9135 int iclient = pbuf->wait_client_index;
9136 //printf("bm_wait_for_free_space: wait ended: wait time %d ms, blocking client index %d\n", wait_time, iclient);
9137 if (iclient >= 0 && iclient < MAX_CLIENTS) {
9138 pbuf->client_count_write_wait[iclient] += 1;
9139 pbuf->client_time_write_wait[iclient] += wait_time;
9140 }
9141 }
9142
9143 //if (blocking_loops > 0) {
9144 // printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d, found space after %d waits\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec, blocking_loops);
9145 //}
9146
9147 return BM_SUCCESS;
9148 }
9149
9150 if (!bm_validate_rp("bm_wait_for_free_space_locked", pheader, pheader->read_pointer)) {
9151 cm_msg(MERROR, "bm_wait_for_free_space",
9152 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer is invalid",
9153 pheader->name,
9154 pheader->read_pointer,
9155 pheader->write_pointer,
9156 pheader->size,
9157 free,
9159 return BM_CORRUPTED;
9160 }
9161
9162 const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pdata + pheader->read_pointer);
9163 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
9164 int total_size = ALIGN8(event_size);
9165
9166#if 0
9167 printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, blocking event size %d/%d\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, event_size, total_size);
9168#endif
9169
9170 if (pevent->data_size <= 0 || total_size <= 0 || total_size > pheader->size) {
9171 cm_msg(MERROR, "bm_wait_for_free_space",
9172 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer points to an invalid event: data_size %d, event size %d, total_size %d",
9173 pheader->name,
9174 pheader->read_pointer,
9175 pheader->write_pointer,
9176 pheader->size,
9177 free,
9179 pevent->data_size,
9180 event_size,
9181 total_size);
9182 return BM_CORRUPTED;
9183 }
9184
9185 int blocking_client = -1;
9186
9187 int i;
9188 for (i = 0; i < pheader->max_client_index; i++) {
9189 BUFFER_CLIENT *pc = pheader->client + i;
9190 if (pc->pid) {
9191 if (pc->read_pointer == pheader->read_pointer) {
9192 /*
9193 First assume that the client with the "minimum" read pointer
9194 is not really blocking due to a GET_ALL request.
9195 */
9197 //int blocking_request_id = -1;
9198
9199 int j;
9200 for (j = 0; j < pc->max_request_index; j++) {
9201 const EVENT_REQUEST *prequest = pc->event_request + j;
9202 if (prequest->valid
9203 && bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
9204 if (prequest->sampling_type & GET_ALL) {
9205 blocking = TRUE;
9206 //blocking_request_id = prequest->id;
9207 break;
9208 }
9209 }
9210 }
9211
9212 //printf("client [%s] blocking %d, request %d\n", pc->name, blocking, blocking_request_id);
9213
9214 if (blocking) {
9216 break;
9217 }
9218
9219 pc->read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
9220 }
9221 }
9222 } /* client loop */
9223
9224 if (blocking_client >= 0) {
9227 //if (!blocking_time) {
9228 // blocking_time = ss_millitime();
9229 //}
9230
9231 //printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d, must wait for more space!\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec);
9232
9233 // from this "break" we go into timeout check and sleep/wait.
9234 break;
9235 }
9236
9237 /* no blocking clients. move the read pointer and again check for free space */
9238
9239 BOOL moved = bm_update_read_pointer_locked("bm_wait_for_free_space", pheader);
9240
9241 if (!moved) {
9242 cm_msg(MERROR, "bm_wait_for_free_space",
9243 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer did not move as expected",
9244 pheader->name,
9245 pheader->read_pointer,
9246 pheader->write_pointer,
9247 pheader->size,
9248 free,
9250 return BM_CORRUPTED;
9251 }
9252
9253 /* we freed one event, loop back to the check for free space */
9254 }
9255
9256 //blocking_loops++;
9257
9258 /* at least one client is blocking */
9259
9261 pc->write_wait = requested_space;
9262
9263 if (pbuf->wait_start_time == 0) {
9264 pbuf->wait_start_time = ss_millitime();
9265 pbuf->count_write_wait++;
9266 if (requested_space > pbuf->max_requested_space)
9267 pbuf->max_requested_space = requested_space;
9268 pbuf->wait_client_index = blocking_client_index;
9269 }
9270
9272
9273 //printf("bm_wait_for_free_space: start 0x%08x, now 0x%08x, end 0x%08x, timeout %d, wait %d\n", time_start, now, time_end, timeout_msec, time_end - now);
9274
9275 int sleep_time_msec = 1000;
9276
9277 if (timeout_msec == BM_WAIT) {
9278 // wait forever
9279 } else if (timeout_msec == BM_NO_WAIT) {
9280 // no wait
9281 return BM_ASYNC_RETURN;
9282 } else {
9283 // check timeout
9284 if (now >= time_end) {
9285 // timeout!
9286 return BM_ASYNC_RETURN;
9287 }
9288
9290
9291 if (sleep_time_msec <= 0) {
9292 sleep_time_msec = 10;
9293 } else if (sleep_time_msec > 1000) {
9294 sleep_time_msec = 1000;
9295 }
9296 }
9297
9299
9300 /* before waiting, unlock everything in the correct order */
9301
9302 pbuf_guard.unlock();
9303
9305 pbuf->write_cache_mutex.unlock();
9306
9307 //printf("bm_wait_for_free_space: blocking client \"%s\"\n", blocking_client_name);
9308
9309#ifdef DEBUG_MSG
9310 cm_msg(MDEBUG, "Send sleep: rp=%d, wp=%d, level=%1.1lf", pheader->read_pointer, pheader->write_pointer, 100 - 100.0 * size / pheader->size);
9311#endif
9312
9314 //int idx = bm_validate_client_index_locked(pbuf, FALSE);
9315 //if (idx >= 0)
9316 // pheader->client[idx].write_wait = requested_space;
9317
9318 //bm_cleanup("bm_wait_for_free_space", ss_millitime(), FALSE);
9319
9321
9322 /* we are told to shutdown */
9323 if (status == SS_ABORT) {
9324 // NB: buffer is locked!
9325 return SS_ABORT;
9326 }
9327
9328 /* make sure we do sleep in this loop:
9329 * if we are the mserver receiving data on the event
9330 * socket and the data buffer is full, ss_suspend() will
9331 * never sleep: it will detect data on the event channel,
9332 * call rpc_server_receive() (recursively, we already *are* in
9333 * rpc_server_receive()) and return without sleeping. Result
9334 * is a busy loop waiting for free space in data buffer */
9335
9336 /* update May 2021: ss_suspend(MSG_BM) no longer looks at
9337 * the event socket, and should sleep now, so this sleep below
9338 * maybe is not needed now. but for safety, I keep it. K.O. */
9339
9340 if (status != SS_TIMEOUT) {
9341 //printf("ss_suspend: status %d\n", status);
9342 ss_sleep(1);
9343 }
9344
9345 /* we may be stuck in this loop for an arbitrary long time,
9346 * depending on how other buffer clients read the accumulated data
9347 * so we should update all the timeouts & etc. K.O. */
9348
9350
9351 /* lock things again in the correct order */
9352
9353 if (unlock_write_cache) {
9355
9356 if (status != BM_SUCCESS) {
9357 // bail out with all locks released
9358 return status;
9359 }
9360 }
9361
9362 if (!pbuf_guard.relock()) {
9363 if (unlock_write_cache) {
9364 pbuf->write_cache_mutex.unlock();
9365 }
9366
9367 // bail out with all locks released
9368 return pbuf_guard.get_status();
9369 }
9370
9371 /* revalidate the client index: we could have been removed from the buffer while sleeping */
9373
9374 pc->write_wait = 0;
9375
9377 //idx = bm_validate_client_index_locked(pbuf, FALSE);
9378 //if (idx >= 0)
9379 // pheader->client[idx].write_wait = 0;
9380 //else {
9381 // cm_msg(MERROR, "bm_wait_for_free_space", "our client index is no longer valid, exiting...");
9382 // status = SS_ABORT;
9383 //}
9384
9385#ifdef DEBUG_MSG
9386 cm_msg(MDEBUG, "Send woke up: rp=%d, wp=%d, level=%1.1lf", pheader->read_pointer, pheader->write_pointer, 100 - 100.0 * size / pheader->size);
9387#endif
9388
9389 }
9390}
static BOOL bm_update_read_pointer_locked(const char *caller_name, BUFFER_HEADER *pheader)
Definition midas.cxx:8720
INT cm_periodic_tasks()
Definition midas.cxx:5579
#define SS_TIMEOUT
Definition midas.h:674
#define MDEBUG
Definition midas.h:561
#define MSG_BM
Definition msystem.h:295
INT ss_suspend(INT millisec, INT msg)
Definition system.cxx:4543
INT ss_sleep(INT millisec)
Definition system.cxx:3628
char name[NAME_LENGTH]
Definition midas.h:935
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wait_for_more_events_locked()

static int bm_wait_for_more_events_locked ( bm_lock_buffer_guard & pbuf_guard,
BUFFER_CLIENT * pc,
int  timeout_msec,
BOOL  unlock_read_cache 
)
static

Definition at line 9392 of file midas.cxx.

9393{
9394 BUFFER* pbuf = pbuf_guard.get_pbuf();
9395 BUFFER_HEADER* pheader = pbuf->buffer_header;
9396
9397 //printf("bm_wait_for_more_events_locked: [%s] timeout %d\n", pheader->name, timeout_msec);
9398
9399 if (pc->read_pointer != pheader->write_pointer) {
9400 // buffer has data
9401 return BM_SUCCESS;
9402 }
9403
9404 if (timeout_msec == BM_NO_WAIT) {
9405 /* event buffer is empty and we are told to not wait */
9406 if (!pc->read_wait) {
9407 //printf("bm_wait_for_more_events: buffer [%s] client [%s] set read_wait in BM_NO_WAIT!\n", pheader->name, pc->name);
9408 pc->read_wait = TRUE;
9409 }
9410 return BM_ASYNC_RETURN;
9411 }
9412
9415 DWORD sleep_time = 1000;
9416 if (timeout_msec == BM_NO_WAIT) {
9417 // default sleep time
9418 } else if (timeout_msec == BM_WAIT) {
9419 // default sleep time
9420 } else {
9423 }
9424
9425 //printf("time start 0x%08x, end 0x%08x, sleep %d\n", time_start, time_wait, sleep_time);
9426
9427 while (pc->read_pointer == pheader->write_pointer) {
9428 /* wait until there is data in the buffer (write pointer moves) */
9429
9430 if (!pc->read_wait) {
9431 //printf("bm_wait_for_more_events: buffer [%s] client [%s] set read_wait!\n", pheader->name, pc->name);
9432 pc->read_wait = TRUE;
9433 }
9434
9435 pc->last_activity = ss_millitime();
9436
9438
9439 // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
9440
9441 pbuf_guard.unlock();
9442
9444 pbuf->read_cache_mutex.unlock();
9445
9447
9448 if (timeout_msec == BM_NO_WAIT) {
9449 // return immediately
9450 } else if (timeout_msec == BM_WAIT) {
9451 // wait forever
9452 } else {
9454 //printf("check timeout: now 0x%08x, end 0x%08x, diff %d\n", now, time_wait, time_wait - now);
9455 if (now >= time_wait) {
9456 timeout_msec = BM_NO_WAIT; // cause immediate return
9457 } else {
9459 if (sleep_time > 1000)
9460 sleep_time = 1000;
9461 //printf("time start 0x%08x, now 0x%08x, end 0x%08x, sleep %d\n", time_start, now, time_wait, sleep_time);
9462 }
9463 }
9464
9465 // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
9466
9467 if (unlock_read_cache) {
9469 if (status != BM_SUCCESS) {
9470 // bail out with all locks released
9471 return status;
9472 }
9473 }
9474
9475 if (!pbuf_guard.relock()) {
9476 if (unlock_read_cache) {
9477 pbuf->read_cache_mutex.unlock();
9478 }
9479 // bail out with all locks released
9480 return pbuf_guard.get_status();
9481 }
9482
9483 /* need to revalidate our BUFFER_CLIENT after releasing the buffer lock
9484 * because we may have been removed from the buffer by bm_cleanup() & co
9485 * due to a timeout or whatever. */
9487
9488 /* return if TCP connection broken */
9489 if (status == SS_ABORT)
9490 return SS_ABORT;
9491
9492 if (timeout_msec == BM_NO_WAIT)
9493 return BM_ASYNC_RETURN;
9494 }
9495
9496 if (pc->read_wait) {
9497 //printf("bm_wait_for_more_events: buffer [%s] client [%s] clear read_wait!\n", pheader->name, pc->name);
9498 pc->read_wait = FALSE;
9499 }
9500
9501 return BM_SUCCESS;
9502}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wakeup_producers_locked()

static void bm_wakeup_producers_locked ( const BUFFER_HEADER * pheader,
const BUFFER_CLIENT * pc 
)
static

Definition at line 8787 of file midas.cxx.

8787 {
8788 int i;
8789 int have_get_all_requests = 0;
8790
8791 for (i = 0; i < pc->max_request_index; i++)
8792 if (pc->event_request[i].valid)
8793 have_get_all_requests |= (pc->event_request[i].sampling_type == GET_ALL);
8794
8795 /* only GET_ALL requests actually free space in the event buffer */
8797 return;
8798
8799 /*
8800 If read pointer has been changed, it may have freed up some space
8801 for waiting producers. So check if free space is now more than 50%
8802 of the buffer size and wake waiting producers.
8803 */
8804
8805 int free_space = pc->read_pointer - pheader->write_pointer;
8806 if (free_space <= 0)
8807 free_space += pheader->size;
8808
8809 if (free_space >= pheader->size * 0.5) {
8810 for (i = 0; i < pheader->max_client_index; i++) {
8811 const BUFFER_CLIENT *pc = pheader->client + i;
8812 if (pc->pid && pc->write_wait) {
8813 BOOL send_wakeup = (pc->write_wait < free_space);
8814 //printf("bm_wakeup_producers: buffer [%s] client [%s] write_wait %d, free_space %d, sending wakeup message %d\n", pheader->name, pc->name, pc->write_wait, free_space, send_wakeup);
8815 if (send_wakeup) {
8816 ss_resume(pc->port, "B ");
8817 }
8818 }
8819 }
8820 }
8821}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_buffer_statistics_to_odb()

static void bm_write_buffer_statistics_to_odb ( HNDLE  hDB,
BUFFER * pbuf,
BOOL  force 
)
static

Definition at line 6586 of file midas.cxx.

6587{
6588 //printf("bm_buffer_write_statistics_to_odb: buffer [%s] client [%s], lock count %d -> %d, force %d\n", pbuf->buffer_name, pbuf->client_name, pbuf->last_count_lock, pbuf->count_lock, force);
6589
6591
6592 if (!pbuf_guard.is_locked())
6593 return;
6594
6595 if (!force) {
6596 if (pbuf->count_lock == pbuf->last_count_lock) {
6597 return;
6598 }
6599 }
6600
6601 std::string buffer_name = pbuf->buffer_name;
6602 std::string client_name = pbuf->client_name;
6603
6604 if ((strlen(buffer_name.c_str()) < 1) || (strlen(client_name.c_str()) < 1)) {
6605 // do not call cm_msg() while holding buffer lock, if we are SYSMSG, we will deadlock. K.O.
6606 pbuf_guard.unlock(); // unlock before cm_msg()
6607 cm_msg(MERROR, "bm_write_buffer_statistics_to_odb", "Invalid empty buffer name \"%s\" or client name \"%s\"", buffer_name.c_str(), client_name.c_str());
6608 return;
6609 }
6610
6611 pbuf->last_count_lock = pbuf->count_lock;
6612
6614 BUFFER_HEADER xheader = *pbuf->buffer_header;
6615 int client_index = pbuf->client_index;
6616
6617 pbuf_guard.unlock();
6618
6619 bm_write_buffer_statistics_to_odb_copy(hDB, buffer_name.c_str(), client_name.c_str(), client_index, &xbuf, &xheader);
6620}
static void bm_write_buffer_statistics_to_odb_copy(HNDLE hDB, const char *buffer_name, const char *client_name, int client_index, BUFFER_INFO *pbuf, BUFFER_HEADER *pheader)
Definition midas.cxx:6464
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_buffer_statistics_to_odb_copy()

static void bm_write_buffer_statistics_to_odb_copy ( HNDLE  hDB,
const char * buffer_name,
const char * client_name,
int  client_index,
BUFFER_INFO * pbuf,
BUFFER_HEADER * pheader 
)
static

Definition at line 6464 of file midas.cxx.

6465{
6466 int status;
6467
6469
6470 HNDLE hKey;
6471 status = db_find_key(hDB, 0, "/System/Buffers", &hKey);
6472 if (status != DB_SUCCESS) {
6473 db_create_key(hDB, 0, "/System/Buffers", TID_KEY);
6474 status = db_find_key(hDB, 0, "/System/Buffers", &hKey);
6475 if (status != DB_SUCCESS)
6476 return;
6477 }
6478
6481 if (status != DB_SUCCESS) {
6484 if (status != DB_SUCCESS)
6485 return;
6486 }
6487
6488 double buf_size = pheader->size;
6489 double buf_rptr = pheader->read_pointer;
6490 double buf_wptr = pheader->write_pointer;
6491
6492 double buf_fill = 0;
6493 double buf_cptr = 0;
6494 double buf_cused = 0;
6495 double buf_cused_pct = 0;
6496
6497 if (client_index >= 0 && client_index <= pheader->max_client_index) {
6498 buf_cptr = pheader->client[client_index].read_pointer;
6499
6500 if (buf_wptr == buf_cptr) {
6501 buf_cused = 0;
6502 } else if (buf_wptr > buf_cptr) {
6504 } else {
6505 buf_cused = (buf_size - buf_cptr) + buf_wptr;
6506 }
6507
6508 buf_cused_pct = buf_cused / buf_size * 100.0;
6509
6510 // we cannot write buf_cused and buf_cused_pct into the buffer statistics
6511 // because some other GET_ALL client may have different buf_cused & etc,
6512 // so they must be written into the per-client statistics
6513 // and the web page should look at all the GET_ALL clients and used
6514 // the biggest buf_cused as the whole-buffer "bytes used" value.
6515 }
6516
6517 if (buf_wptr == buf_rptr) {
6518 buf_fill = 0;
6519 } else if (buf_wptr > buf_rptr) {
6521 } else {
6522 buf_fill = (buf_size - buf_rptr) + buf_wptr;
6523 }
6524
6525 double buf_fill_pct = buf_fill / buf_size * 100.0;
6526
6527 db_set_value(hDB, hKeyBuffer, "Size", &buf_size, sizeof(double), 1, TID_DOUBLE);
6528 db_set_value(hDB, hKeyBuffer, "Write pointer", &buf_wptr, sizeof(double), 1, TID_DOUBLE);
6529 db_set_value(hDB, hKeyBuffer, "Read pointer", &buf_rptr, sizeof(double), 1, TID_DOUBLE);
6530 db_set_value(hDB, hKeyBuffer, "Filled", &buf_fill, sizeof(double), 1, TID_DOUBLE);
6531 db_set_value(hDB, hKeyBuffer, "Filled pct", &buf_fill_pct, sizeof(double), 1, TID_DOUBLE);
6532
6533 status = db_find_key(hDB, hKeyBuffer, "Clients", &hKey);
6534 if (status != DB_SUCCESS) {
6535 db_create_key(hDB, hKeyBuffer, "Clients", TID_KEY);
6536 status = db_find_key(hDB, hKeyBuffer, "Clients", &hKey);
6537 if (status != DB_SUCCESS)
6538 return;
6539 }
6540
6542 status = db_find_key(hDB, hKey, client_name, &hKeyClient);
6543 if (status != DB_SUCCESS) {
6544 db_create_key(hDB, hKey, client_name, TID_KEY);
6545 status = db_find_key(hDB, hKey, client_name, &hKeyClient);
6546 if (status != DB_SUCCESS)
6547 return;
6548 }
6549
6550 db_set_value(hDB, hKeyClient, "count_lock", &pbuf->count_lock, sizeof(int), 1, TID_INT32);
6551 db_set_value(hDB, hKeyClient, "count_sent", &pbuf->count_sent, sizeof(int), 1, TID_INT32);
6552 db_set_value(hDB, hKeyClient, "bytes_sent", &pbuf->bytes_sent, sizeof(double), 1, TID_DOUBLE);
6553 db_set_value(hDB, hKeyClient, "count_write_wait", &pbuf->count_write_wait, sizeof(int), 1, TID_INT32);
6554 db_set_value(hDB, hKeyClient, "time_write_wait", &pbuf->time_write_wait, sizeof(DWORD), 1, TID_UINT32);
6555 db_set_value(hDB, hKeyClient, "max_bytes_write_wait", &pbuf->max_requested_space, sizeof(INT), 1, TID_INT32);
6556 db_set_value(hDB, hKeyClient, "count_read", &pbuf->count_read, sizeof(int), 1, TID_INT32);
6557 db_set_value(hDB, hKeyClient, "bytes_read", &pbuf->bytes_read, sizeof(double), 1, TID_DOUBLE);
6558 db_set_value(hDB, hKeyClient, "get_all_flag", &pbuf->get_all_flag, sizeof(BOOL), 1, TID_BOOL);
6559 db_set_value(hDB, hKeyClient, "read_pointer", &buf_cptr, sizeof(double), 1, TID_DOUBLE);
6560 db_set_value(hDB, hKeyClient, "bytes_used", &buf_cused, sizeof(double), 1, TID_DOUBLE);
6561 db_set_value(hDB, hKeyClient, "pct_used", &buf_cused_pct, sizeof(double), 1, TID_DOUBLE);
6562
6563 for (int i = 0; i < MAX_CLIENTS; i++) {
6564 if (!pbuf->client_count_write_wait[i])
6565 continue;
6566
6567 if (pheader->client[i].pid == 0)
6568 continue;
6569
6570 if (pheader->client[i].name[0] == 0)
6571 continue;
6572
6573 char str[100 + NAME_LENGTH];
6574
6575 sprintf(str, "writes_blocked_by/%s/count_write_wait", pheader->client[i].name);
6576 db_set_value(hDB, hKeyClient, str, &pbuf->client_count_write_wait[i], sizeof(int), 1, TID_INT32);
6577
6578 sprintf(str, "writes_blocked_by/%s/time_write_wait", pheader->client[i].name);
6579 db_set_value(hDB, hKeyClient, str, &pbuf->client_time_write_wait[i], sizeof(DWORD), 1, TID_UINT32);
6580 }
6581
6582 db_set_value(hDB, hKeyBuffer, "Last updated", &now, sizeof(DWORD), 1, TID_UINT32);
6583 db_set_value(hDB, hKeyClient, "last_updated", &now, sizeof(DWORD), 1, TID_UINT32);
6584}
#define TID_DOUBLE
Definition midas.h:343
#define TID_KEY
Definition midas.h:349
#define TID_BOOL
Definition midas.h:340
#define TID_INT32
Definition midas.h:339
INT db_create_key(HNDLE hDB, HNDLE hKey, const char *key_name, DWORD type)
Definition odb.cxx:3308
INT db_set_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, const void *data, INT data_size, INT num_values, DWORD type)
Definition odb.cxx:5261
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_statistics_to_odb()

INT bm_write_statistics_to_odb ( void  )

Write buffer statistics for all open buffers to the ODB

Returns
BM_SUCCESS

Definition at line 7280 of file midas.cxx.

7280 {
7281#ifdef LOCAL_ROUTINES
7282 {
7283 int status;
7284 HNDLE hDB;
7285
7287
7288 if (status != CM_SUCCESS) {
7289 //printf("bm_write_statistics_to_odb: cannot get ODB handle!\n");
7290 return BM_SUCCESS;
7291 }
7292
7293 std::vector<BUFFER*> mybuffers;
7294
7295 gBuffersMutex.lock();
7297 gBuffersMutex.unlock();
7298
7299 for (BUFFER* pbuf : mybuffers) {
7300 if (!pbuf || !pbuf->attached)
7301 continue;
7303 }
7304 }
7305#endif /* LOCAL_ROUTINES */
7306
7307 return BM_SUCCESS;
7308}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_to_buffer_locked()

static void bm_write_to_buffer_locked ( BUFFER_HEADER * pheader,
int  sg_n,
const char *const  sg_ptr[],
const size_t  sg_len[],
size_t  total_size 
)
static

Definition at line 9504 of file midas.cxx.

9505{
9506 char *pdata = (char *) (pheader + 1);
9507
9508 //int old_write_pointer = pheader->write_pointer;
9509
9510 /* new event fits into the remaining space? */
9511 if ((size_t)pheader->write_pointer + total_size <= (size_t)pheader->size) {
9512 //memcpy(pdata + pheader->write_pointer, pevent, event_size);
9513 char* wptr = pdata + pheader->write_pointer;
9514 for (int i=0; i<sg_n; i++) {
9515 //printf("memcpy %p+%d\n", sg_ptr[i], (int)sg_len[i]);
9516 memcpy(wptr, sg_ptr[i], sg_len[i]);
9517 wptr += sg_len[i];
9518 }
9519 pheader->write_pointer = pheader->write_pointer + total_size;
9520 assert(pheader->write_pointer <= pheader->size);
9521 /* remaining space is smaller than size of an event header? */
9522 if ((pheader->write_pointer + (int) sizeof(EVENT_HEADER)) > pheader->size) {
9523 // note: ">" here to match "bm_incr_rp". If remaining space is exactly
9524 // equal to the event header size, we will write the next event header here,
9525 // then wrap the pointer and write the event data at the beginning of the buffer.
9526 //printf("bm_write_to_buffer_locked: truncate wp %d. buffer size %d, remaining %d, event header size %d, event size %d, total size %d\n", pheader->write_pointer, pheader->size, pheader->size-pheader->write_pointer, (int)sizeof(EVENT_HEADER), event_size, total_size);
9527 pheader->write_pointer = 0;
9528 }
9529 } else {
9530 /* split event */
9531 size_t size = pheader->size - pheader->write_pointer;
9532
9533 //printf("split: wp %d, size %d, avail %d\n", pheader->write_pointer, pheader->size, size);
9534
9535 //memcpy(pdata + pheader->write_pointer, pevent, size);
9536 //memcpy(pdata, ((const char *) pevent) + size, event_size - size);
9537
9538 char* wptr = pdata + pheader->write_pointer;
9539 size_t count = 0;
9540
9541 // copy first part
9542
9543 int i = 0;
9544 for (; i<sg_n; i++) {
9545 if (count + sg_len[i] > size)
9546 break;
9547 memcpy(wptr, sg_ptr[i], sg_len[i]);
9548 wptr += sg_len[i];
9549 count += sg_len[i];
9550 }
9551
9552 //printf("wptr %d, count %d\n", wptr-pdata, count);
9553
9554 // split segment
9555
9556 size_t first = size - count;
9557 size_t second = sg_len[i] - first;
9558 assert(first + second == sg_len[i]);
9559 assert(count + first == size);
9560
9561 //printf("first %d, second %d\n", first, second);
9562
9563 memcpy(wptr, sg_ptr[i], first);
9564 wptr = pdata + 0;
9565 count += first;
9567 wptr += second;
9568 count += second;
9569 i++;
9570
9571 // copy remaining
9572
9573 for (; i<sg_n; i++) {
9574 memcpy(wptr, sg_ptr[i], sg_len[i]);
9575 wptr += sg_len[i];
9576 count += sg_len[i];
9577 }
9578
9579 //printf("wptr %d, count %d\n", wptr-pdata, count);
9580
9581 //printf("bm_write_to_buffer_locked: wrap wp %d -> %d. buffer size %d, available %d, wrote %d, remaining %d, event size %d, total size %d\n", pheader->write_pointer, total_size-size, pheader->size, pheader->size-pheader->write_pointer, size, pheader->size - (pheader->write_pointer+size), event_size, total_size);
9582
9583 pheader->write_pointer = total_size - size;
9584 }
9585
9586 //printf("bm_write_to_buffer_locked: buf [%s] size %d, wrote %d/%d, wp %d -> %d\n", pheader->name, pheader->size, event_size, total_size, old_write_pointer, pheader->write_pointer);
9587}
Here is the call graph for this function:
Here is the caller graph for this function:

Variable Documentation

◆ _bm_lock_timeout

int _bm_lock_timeout = 5 * 60 * 1000
static

Definition at line 5919 of file midas.cxx.

◆ _bm_max_event_size

DWORD _bm_max_event_size = 0
static

Definition at line 5914 of file midas.cxx.

◆ _bm_mutex_timeout_sec

double _bm_mutex_timeout_sec = _bm_lock_timeout/1000 + 15.000
static

Definition at line 5920 of file midas.cxx.

◆ defrag_buffer

EVENT_DEFRAG_BUFFER defrag_buffer[MAX_DEFRAG_EVENTS]
static

Definition at line 11281 of file midas.cxx.