MIDAS
Loading...
Searching...
No Matches
Event Buffer Functions (bm_xxx)

Classes

struct  BUFFER_INFO
 
struct  EVENT_DEFRAG_BUFFER
 

Macros

#define MAX_DEFRAG_EVENTS   10
 

Functions

static int bm_validate_client_index_locked (bm_lock_buffer_guard &pbuf_guard)
 
INT bm_match_event (short int event_id, short int trigger_mask, const EVENT_HEADER *pevent)
 
void bm_remove_client_locked (BUFFER_HEADER *pheader, int j)
 
static void bm_cleanup_buffer_locked (BUFFER *pbuf, const char *who, DWORD actual_time)
 
static void bm_update_last_activity (DWORD millitime)
 
static BOOL bm_validate_rp (const char *who, const BUFFER_HEADER *pheader, int rp)
 
static int bm_incr_rp_no_check (const BUFFER_HEADER *pheader, int rp, int total_size)
 
static int bm_next_rp (const char *who, const BUFFER_HEADER *pheader, const char *pdata, int rp)
 
static int bm_validate_buffer_locked (const BUFFER *pbuf)
 
static void bm_reset_buffer_locked (BUFFER *pbuf)
 
static void bm_clear_buffer_statistics (HNDLE hDB, BUFFER *pbuf)
 
static void bm_write_buffer_statistics_to_odb_copy (HNDLE hDB, const char *buffer_name, const char *client_name, int client_index, BUFFER_INFO *pbuf, BUFFER_HEADER *pheader)
 
static void bm_write_buffer_statistics_to_odb (HNDLE hDB, BUFFER *pbuf, BOOL force)
 
INT bm_open_buffer (const char *buffer_name, INT buffer_size, INT *buffer_handle)
 
INT bm_get_buffer_handle (const char *buffer_name, INT *buffer_handle)
 
INT bm_close_buffer (INT buffer_handle)
 
INT bm_close_all_buffers (void)
 
INT bm_write_statistics_to_odb (void)
 
INT bm_set_cache_size (INT buffer_handle, size_t read_size, size_t write_size)
 
INT bm_compose_event (EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD serial)
 
INT bm_compose_event_threadsafe (EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD *serial)
 
INT bm_add_event_request (INT buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, EVENT_HANDLER *func, INT request_id)
 
INT bm_request_event (HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
 
INT bm_remove_event_request (INT buffer_handle, INT request_id)
 
INT bm_delete_request (INT request_id)
 
static void bm_validate_client_pointers_locked (const BUFFER_HEADER *pheader, BUFFER_CLIENT *pclient)
 
static BOOL bm_update_read_pointer_locked (const char *caller_name, BUFFER_HEADER *pheader)
 
static void bm_wakeup_producers_locked (const BUFFER_HEADER *pheader, const BUFFER_CLIENT *pc)
 
static void bm_dispatch_event (int buffer_handle, EVENT_HEADER *pevent)
 
static void bm_incr_read_cache_locked (BUFFER *pbuf, int total_size)
 
static BOOL bm_peek_read_cache_locked (BUFFER *pbuf, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
 
static int bm_peek_buffer_locked (BUFFER *pbuf, BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
 
static void bm_read_from_buffer_locked (const BUFFER_HEADER *pheader, int rp, char *buf, int event_size)
 
static void bm_read_from_buffer_locked (const BUFFER_HEADER *pheader, int rp, std::vector< char > *vecptr, int event_size)
 
static BOOL bm_check_requests (const BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
 
static int bm_wait_for_more_events_locked (bm_lock_buffer_guard &pbuf_guard, BUFFER_CLIENT *pc, int timeout_msec, BOOL unlock_read_cache)
 
static int bm_fill_read_cache_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
 
static void bm_convert_event_header (EVENT_HEADER *pevent, int convert_flags)
 
static int bm_wait_for_free_space_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec, int requested_space, bool unlock_write_cache)
 
static void bm_write_to_buffer_locked (BUFFER_HEADER *pheader, int sg_n, const char *const sg_ptr[], const size_t sg_len[], size_t total_size)
 
static int bm_find_first_request_locked (BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
 
static void bm_notify_reader_locked (BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, int old_write_pointer, int request_id)
 
INT bm_send_event (INT buffer_handle, const EVENT_HEADER *pevent, int unused, int timeout_msec)
 
int bm_send_event_vec (int buffer_handle, const std::vector< char > &event, int timeout_msec)
 
int bm_send_event_vec (int buffer_handle, const std::vector< std::vector< char > > &event, int timeout_msec)
 
static INT bm_flush_cache_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
 
int bm_send_event_sg (int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
 
static int bm_flush_cache_rpc (int buffer_handle, int timeout_msec)
 
INT bm_flush_cache (int buffer_handle, int timeout_msec)
 
static INT bm_read_buffer (BUFFER *pbuf, INT buffer_handle, void **bufptr, void *buf, INT *buf_size, std::vector< char > *vecptr, int timeout_msec, int convert_flags, BOOL dispatch)
 
static INT bm_receive_event_rpc (INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
 
static INT bm_receive_event_rpc_cxx (INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
 
INT bm_receive_event (INT buffer_handle, void *destination, INT *buf_size, int timeout_msec)
 
INT bm_receive_event_alloc (INT buffer_handle, EVENT_HEADER **ppevent, int timeout_msec)
 
INT bm_receive_event_vec (INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
 
static int bm_skip_event (BUFFER *pbuf)
 
INT bm_skip_event (INT buffer_handle)
 
static INT bm_push_buffer (BUFFER *pbuf, int buffer_handle)
 
INT bm_check_buffers ()
 
INT bm_poll_event ()
 
INT bm_empty_buffers ()
 

Variables

static DWORD _bm_max_event_size = 0
 
static int _bm_lock_timeout = 5 * 60 * 1000
 
static double _bm_mutex_timeout_sec = _bm_lock_timeout/1000 + 15.000
 
static EVENT_DEFRAG_BUFFER defrag_buffer [MAX_DEFRAG_EVENTS]
 

Detailed Description

dox dox


Macro Definition Documentation

◆ MAX_DEFRAG_EVENTS

#define MAX_DEFRAG_EVENTS   10

dox

Definition at line 11411 of file midas.cxx.

Function Documentation

◆ bm_add_event_request()

INT bm_add_event_request ( INT  buffer_handle,
short int  event_id,
short int  trigger_mask,
INT  sampling_type,
EVENT_HANDLER *  func,
INT  request_id 
)

dox

Definition at line 8325 of file midas.cxx.

8369{
8370 if (rpc_is_remote())
8371 return rpc_call(RPC_BM_ADD_EVENT_REQUEST, buffer_handle, event_id,
8372 trigger_mask, sampling_type, (INT) (POINTER_T) func, request_id);
8373
8374#ifdef LOCAL_ROUTINES
8375 {
8376 int status = 0;
8377
8378 BUFFER *pbuf = bm_get_buffer("bm_add_event_request", buffer_handle, &status);
8379
8380 if (!pbuf)
8381 return status;
8382
8383 /* lock buffer */
8384 bm_lock_buffer_guard pbuf_guard(pbuf);
8385
8386 if (!pbuf_guard.is_locked())
8387 return pbuf_guard.get_status();
8388
8389 /* avoid callback/non callback requests */
8390 if (func == NULL && pbuf->callback) {
8391 pbuf_guard.unlock(); // unlock before cm_msg()
8392 cm_msg(MERROR, "bm_add_event_request", "mixing callback/non callback requests not possible");
8393 return BM_INVALID_MIXING;
8394 }
8395
8396 /* do not allow GET_RECENT with nonzero cache size */
8397 if (sampling_type == GET_RECENT && pbuf->read_cache_size > 0) {
8398 pbuf_guard.unlock(); // unlock before cm_msg()
8399 cm_msg(MERROR, "bm_add_event_request", "GET_RECENT request not possible if read cache is enabled");
8400 return BM_INVALID_PARAM;
8401 }
8402
8403 /* get a pointer to the proper client structure */
8404 BUFFER_CLIENT *pclient = bm_get_my_client_locked(pbuf_guard);
8405
8406 /* look for a empty request entry */
8407 int i;
8408 for (i = 0; i < MAX_EVENT_REQUESTS; i++)
8409 if (!pclient->event_request[i].valid)
8410 break;
8411
8412 if (i == MAX_EVENT_REQUESTS) {
8413 // implicit unlock
8414 return BM_NO_MEMORY;
8415 }
8416
8417 /* setup event_request structure */
8418 pclient->event_request[i].id = request_id;
8419 pclient->event_request[i].valid = TRUE;
8420 pclient->event_request[i].event_id = event_id;
8422 pclient->event_request[i].sampling_type = sampling_type;
8423
8424 pclient->all_flag = pclient->all_flag || (sampling_type & GET_ALL);
8425
8426 pbuf->get_all_flag = pclient->all_flag;
8427
8428 /* set callback flag in buffer structure */
8429 if (func != NULL)
8430 pbuf->callback = TRUE;
8431
8432 /*
8433 Save the index of the last request in the list so that later only the
8434 requests 0..max_request_index-1 have to be searched through.
8435 */
8436
8437 if (i + 1 > pclient->max_request_index)
8438 pclient->max_request_index = i + 1;
8439 }
8440#endif /* LOCAL_ROUTINES */
8441
8442 return BM_SUCCESS;
8443}
static BUFFER_CLIENT * bm_get_my_client_locked(bm_lock_buffer_guard &pbuf_guard)
Definition midas.cxx:6017
static BUFFER * bm_get_buffer(const char *who, INT buffer_handle, int *pstatus)
Definition midas.cxx:6633
#define BM_INVALID_PARAM
Definition midas.h:619
#define BM_NO_MEMORY
Definition midas.h:607
#define BM_INVALID_MIXING
Definition midas.h:621
#define BM_SUCCESS
Definition midas.h:605
#define GET_ALL
Definition midas.h:321
#define GET_RECENT
Definition midas.h:323
#define MERROR
Definition midas.h:559
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
Definition midas.cxx:931
#define RPC_BM_ADD_EVENT_REQUEST
Definition mrpc.h:43
bool rpc_is_remote(void)
Definition midas.cxx:12892
INT rpc_call(DWORD routine_id,...)
Definition midas.cxx:14115
INT i
Definition mdump.cxx:32
int INT
Definition midas.h:129
#define TRUE
Definition midas.h:182
#define MAX_EVENT_REQUESTS
Definition midas.h:275
#define POINTER_T
Definition midas.h:166
#define trigger_mask
#define event_id
DWORD status
Definition odbhist.cxx:39
BOOL all_flag
Definition midas.h:950
EVENT_REQUEST event_request[MAX_EVENT_REQUESTS]
Definition midas.h:954
INT max_request_index
Definition midas.h:942
BOOL get_all_flag
Definition midas.h:1009
BOOL callback
Definition midas.h:1007
std::atomic< size_t > read_cache_size
Definition midas.h:995
short int event_id
Definition midas.h:930
short int trigger_mask
Definition midas.h:931
INT sampling_type
Definition midas.h:932
BOOL valid
Definition midas.h:929
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_check_buffers()

INT bm_check_buffers ( void  )

Check if any requested event is waiting in a buffer

Returns
TRUE More events are waiting
FALSE No more events are waiting

Definition at line 11093 of file midas.cxx.

11093 {
11094#ifdef LOCAL_ROUTINES
11095 {
11096 INT status = 0;
11097 BOOL bMore;
11098 DWORD start_time;
11099 //static DWORD last_time = 0;
11100
11101 /* if running as a server, buffer checking is done by client
11102 via ASYNC bm_receive_event */
11103 if (rpc_is_mserver()) {
11104 return FALSE;
11105 }
11106
11107 bMore = FALSE;
11108 start_time = ss_millitime();
11109
11110 std::vector<BUFFER*> mybuffers;
11111
11112 gBuffersMutex.lock();
11113 mybuffers = gBuffers;
11114 gBuffersMutex.unlock();
11115
11116 /* go through all buffers */
11117 for (size_t idx = 0; idx < mybuffers.size(); idx++) {
11118 BUFFER* pbuf = mybuffers[idx];
11119
11120 if (!pbuf || !pbuf->attached)
11121 continue;
11122
11123 //int count_loops = 0;
11124 while (1) {
11125 if (pbuf->attached) {
11126 /* one bm_push_event could cause a run stop and a buffer close, which
11127 * would crash the next call to bm_push_event(). So check for valid
11128 * buffer on each call */
11129
11130 /* this is what happens:
11131 * bm_push_buffer() may call a user callback function
11132 * user callback function may indirectly call bm_close() of this buffer,
11133 * i.e. if it stops the run,
11134 * bm_close() will set pbuf->attached to false, but will not delete pbuf or touch gBuffers
11135 * here we will see pbuf->attched is false and quit this loop
11136 */
11137
11138 status = bm_push_buffer(pbuf, idx + 1);
11139
11140 if (status == BM_CORRUPTED) {
11141 return status;
11142 }
11143
11144 //printf("bm_check_buffers: bm_push_buffer() returned %d, loop %d, time %d\n", status, count_loops, ss_millitime() - start_time);
11145
11146 if (status != BM_MORE_EVENTS) {
11147 //DWORD t = ss_millitime() - start_time;
11148 //printf("bm_check_buffers: index %d, period %d, elapsed %d, loop %d, no more events\n", idx, start_time - last_time, t, count_loops);
11149 break;
11150 }
11151
11152 // count_loops++;
11153 }
11154
11155 // NB: this code has a logic error: if 2 buffers always have data,
11156 // this timeout will cause us to exit reading the 1st buffer
11157 // after 1000 msec, then we read the 2nd buffer exactly once,
11158 // and exit the loop because the timeout is still active -
11159 // we did not reset "start_time" when we started reading
11160 // from the 2nd buffer. Result is that we always read all
11161 // the data in a loop from the 1st buffer, but read just
11162 // one event from the 2nd buffer, resulting in severe unfairness.
11163
11164 /* stop after one second */
11165 DWORD t = ss_millitime() - start_time;
11166 if (t > 1000) {
11167 //printf("bm_check_buffers: index %d, period %d, elapsed %d, loop %d, timeout.\n", idx, start_time - last_time, t, count_loops);
11168 bMore = TRUE;
11169 break;
11170 }
11171 }
11172 }
11173
11174 //last_time = start_time;
11175
11176 return bMore;
11177
11178 }
11179#else /* LOCAL_ROUTINES */
11180
11181 return FALSE;
11182
11183#endif
11184}
#define FALSE
Definition cfortran.h:309
static INT bm_push_buffer(BUFFER *pbuf, int buffer_handle)
Definition midas.cxx:11041
#define BM_MORE_EVENTS
Definition midas.h:620
#define BM_CORRUPTED
Definition midas.h:623
unsigned int DWORD
Definition mcstd.h:51
DWORD ss_millitime()
Definition system.cxx:3465
bool rpc_is_mserver(void)
Definition midas.cxx:12949
static std::mutex gBuffersMutex
Definition midas.cxx:195
static std::vector< BUFFER * > gBuffers
Definition midas.cxx:196
DWORD BOOL
Definition midas.h:105
std::atomic_bool attached
Definition midas.h:988
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_check_requests()

static BOOL bm_check_requests ( const BUFFER_CLIENT *  pc,
const EVENT_HEADER *  pevent 
)
static

Definition at line 8977 of file midas.cxx.

8977 {
8978
8979 BOOL is_requested = FALSE;
8980 int i;
8981 for (i = 0; i < pc->max_request_index; i++) {
8982 const EVENT_REQUEST *prequest = pc->event_request + i;
8983 if (prequest->valid) {
8984 if (bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
8985 /* check if this is a recent event */
8986 if (prequest->sampling_type == GET_RECENT) {
8987 if (ss_time() - pevent->time_stamp > 1) {
8988 /* skip that event */
8989 continue;
8990 }
8991 }
8992
8993 is_requested = TRUE;
8994 break;
8995 }
8996 }
8997 }
8998 return is_requested;
8999}
INT bm_match_event(short int event_id, short int trigger_mask, const EVENT_HEADER *pevent)
Definition midas.cxx:6033
DWORD ss_time()
Definition system.cxx:3534
DWORD time_stamp
Definition midas.h:856
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_cleanup_buffer_locked()

static void bm_cleanup_buffer_locked ( BUFFER *  pbuf,
const char *  who,
DWORD  actual_time 
)
static

Check all clients on buffer, remove invalid clients

Definition at line 6084 of file midas.cxx.

6084 {
6085 BUFFER_HEADER *pheader;
6086 BUFFER_CLIENT *pbclient;
6087 int j;
6088
6089 pheader = pbuf->buffer_header;
6090 pbclient = pheader->client;
6091
6092 /* now check other clients */
6093 for (j = 0; j < pheader->max_client_index; j++, pbclient++) {
6094 if (pbclient->pid) {
6095 if (!ss_pid_exists(pbclient->pid)) {
6096 cm_msg(MINFO, "bm_cleanup",
6097 "Client \'%s\' on buffer \'%s\' removed by %s because process pid %d does not exist", pbclient->name,
6098 pheader->name, who, pbclient->pid);
6099
6100 bm_remove_client_locked(pheader, j);
6101 continue;
6102 }
6103 }
6104
6105 /* If client process has no activity, clear its buffer entry. */
6106 if (pbclient->pid && pbclient->watchdog_timeout > 0) {
6107 DWORD tdiff = actual_time - pbclient->last_activity;
6108#if 0
6109 printf("buffer [%s] client [%-32s] times 0x%08x 0x%08x, diff 0x%08x %5d, timeout %d\n",
6110 pheader->name,
6111 pbclient->name,
6112 pbclient->last_activity,
6114 tdiff,
6115 tdiff,
6116 pbclient->watchdog_timeout);
6117#endif
6118 if (actual_time > pbclient->last_activity &&
6119 tdiff > pbclient->watchdog_timeout) {
6120
6121 cm_msg(MINFO, "bm_cleanup", "Client \'%s\' on buffer \'%s\' removed by %s (idle %1.1lfs, timeout %1.0lfs)",
6122 pbclient->name, pheader->name, who,
6123 tdiff / 1000.0,
6124 pbclient->watchdog_timeout / 1000.0);
6125
6126 bm_remove_client_locked(pheader, j);
6127 }
6128 }
6129 }
6130}
void bm_remove_client_locked(BUFFER_HEADER *pheader, int j)
Definition midas.cxx:6053
#define MINFO
Definition midas.h:560
BOOL ss_pid_exists(int pid)
Definition system.cxx:1442
DWORD actual_time
Definition mfe.cxx:37
INT j
Definition odbhist.cxx:40
DWORD watchdog_timeout
Definition midas.h:952
DWORD last_activity
Definition midas.h:951
char name[NAME_LENGTH]
Definition midas.h:936
char name[NAME_LENGTH]
Definition midas.h:959
INT max_client_index
Definition midas.h:961
BUFFER_CLIENT client[MAX_CLIENTS]
Definition midas.h:968
BUFFER_HEADER * buffer_header
Definition midas.h:993
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_clear_buffer_statistics()

static void bm_clear_buffer_statistics ( HNDLE  hDB,
BUFFER *  pbuf 
)
static

Definition at line 6426 of file midas.cxx.

6426 {
6427 std::string str = msprintf("/System/buffers/%s/Clients/%s/writes_blocked_by", pbuf->buffer_name, pbuf->client_name);
6428 //printf("delete [%s]\n", str);
6429 db_delete(hDB, 0, str.c_str());
6430}
INT db_delete(HNDLE hDB, HNDLE hKeyRoot, const char *odb_path)
Definition odb.cxx:3999
HNDLE hDB
main ODB handle
Definition mana.cxx:207
std::string msprintf(const char *format,...)
Definition midas.cxx:419
char str[256]
Definition odbhist.cxx:33
char client_name[NAME_LENGTH]
Definition midas.h:991
char buffer_name[NAME_LENGTH]
Definition midas.h:992
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_close_all_buffers()

INT bm_close_all_buffers ( void  )

Close all open buffers

Returns
BM_SUCCESS

Definition at line 7254 of file midas.cxx.

7254 {
7255 if (rpc_is_remote())
7257
7258#ifdef LOCAL_ROUTINES
7259 {
7261
7262 gBuffersMutex.lock();
7263 size_t nbuf = gBuffers.size();
7264 gBuffersMutex.unlock();
7265
7266 for (size_t i = nbuf; i > 0; i--) {
7268 }
7269
7270 gBuffersMutex.lock();
7271 for (size_t i=0; i< gBuffers.size(); i++) {
7272 BUFFER* pbuf = gBuffers[i];
7273 if (!pbuf)
7274 continue;
7275 delete pbuf;
7276 pbuf = NULL;
7277 gBuffers[i] = NULL;
7278 }
7279 gBuffersMutex.unlock();
7280 }
7281#endif /* LOCAL_ROUTINES */
7282
7283 return BM_SUCCESS;
7284}
INT bm_close_buffer(INT buffer_handle)
Definition midas.cxx:7107
int cm_msg_close_buffer(void)
Definition midas.cxx:501
#define RPC_BM_CLOSE_ALL_BUFFERS
Definition mrpc.h:38
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_close_buffer()

INT bm_close_buffer ( INT  buffer_handle)

Closes an event buffer previously opened with bm_open_buffer().

Parameters
buffer_handlebuffer handle
Returns
BM_SUCCESS, BM_INVALID_HANDLE

Definition at line 7107 of file midas.cxx.

7107 {
7108 //printf("bm_close_buffer: handle %d\n", buffer_handle);
7109
7110 if (rpc_is_remote())
7111 return rpc_call(RPC_BM_CLOSE_BUFFER, buffer_handle);
7112
7113#ifdef LOCAL_ROUTINES
7114 {
7115 int status = 0;
7116
7117 BUFFER *pbuf = bm_get_buffer(NULL, buffer_handle, &status);
7118
7119 if (!pbuf)
7120 return status;
7121
7122 //printf("bm_close_buffer: handle %d, name [%s]\n", buffer_handle, pheader->name);
7123
7124 int i;
7125
7126 { /* delete all requests for this buffer */
7127 _request_list_mutex.lock();
7128 std::vector<EventRequest> request_list_copy = _request_list;
7129 _request_list_mutex.unlock();
7130 for (size_t i = 0; i < request_list_copy.size(); i++) {
7131 if (request_list_copy[i].buffer_handle == buffer_handle) {
7133 }
7134 }
7135 }
7136
7137 HNDLE hDB;
7139
7140 if (hDB) {
7141 /* write statistics to odb */
7143 }
7144
7145 /* lock buffer in correct order */
7146
7148
7149 if (status != BM_SUCCESS) {
7150 return status;
7151 }
7152
7154
7155 if (status != BM_SUCCESS) {
7156 pbuf->read_cache_mutex.unlock();
7157 return status;
7158 }
7159
7160 bm_lock_buffer_guard pbuf_guard(pbuf);
7161
7162 if (!pbuf_guard.is_locked()) {
7163 pbuf->write_cache_mutex.unlock();
7164 pbuf->read_cache_mutex.unlock();
7165 return pbuf_guard.get_status();
7166 }
7167
7168 BUFFER_HEADER *pheader = pbuf->buffer_header;
7169
7170 /* mark entry in _buffer as empty */
7171 pbuf->attached = false;
7172
7173 BUFFER_CLIENT* pclient = bm_get_my_client_locked(pbuf_guard);
7174
7175 if (pclient) {
7176 /* clear entry from client structure in buffer header */
7177 memset(pclient, 0, sizeof(BUFFER_CLIENT));
7178 }
7179
7180 /* calculate new max_client_index entry */
7181 for (i = MAX_CLIENTS - 1; i >= 0; i--)
7182 if (pheader->client[i].pid != 0)
7183 break;
7184 pheader->max_client_index = i + 1;
7185
7186 /* count new number of clients */
7187 int j = 0;
7188 for (i = MAX_CLIENTS - 1; i >= 0; i--)
7189 if (pheader->client[i].pid != 0)
7190 j++;
7191 pheader->num_clients = j;
7192
7193 int destroy_flag = (pheader->num_clients == 0);
7194
7195 // we hold the locks on the read cache and the write cache.
7196
7197 /* free cache */
7198 if (pbuf->read_cache_size > 0) {
7199 free(pbuf->read_cache);
7200 pbuf->read_cache = NULL;
7201 pbuf->read_cache_size = 0;
7202 pbuf->read_cache_rp = 0;
7203 pbuf->read_cache_wp = 0;
7204 }
7205
7206 if (pbuf->write_cache_size > 0) {
7207 free(pbuf->write_cache);
7208 pbuf->write_cache = NULL;
7209 pbuf->write_cache_size = 0;
7210 pbuf->write_cache_rp = 0;
7211 pbuf->write_cache_wp = 0;
7212 }
7213
7214 /* check if anyone is waiting and wake him up */
7215
7216 for (int i = 0; i < pheader->max_client_index; i++) {
7217 BUFFER_CLIENT *pclient = pheader->client + i;
7218 if (pclient->pid && (pclient->write_wait || pclient->read_wait))
7219 ss_resume(pclient->port, "B ");
7220 }
7221
7222 /* unmap shared memory, delete it if we are the last */
7223
7224 ss_shm_close(pbuf->buffer_name, pbuf->buffer_header, pbuf->shm_size, pbuf->shm_handle, destroy_flag);
7225
7226 /* after ss_shm_close() these are invalid: */
7227
7228 pheader = NULL;
7229 pbuf->buffer_header = NULL;
7230 pbuf->shm_size = 0;
7231 pbuf->shm_handle = 0;
7232
7233 /* unlock buffer in correct order */
7234
7235 pbuf_guard.unlock();
7236
7237 pbuf->write_cache_mutex.unlock();
7238 pbuf->read_cache_mutex.unlock();
7239
7240 /* delete semaphore */
7241
7242 ss_semaphore_delete(pbuf->semaphore, destroy_flag);
7243 }
7244#endif /* LOCAL_ROUTINES */
7245
7246 return BM_SUCCESS;
7247}
INT bm_delete_request(INT request_id)
Definition midas.cxx:8595
static void bm_write_buffer_statistics_to_odb(HNDLE hDB, BUFFER *pbuf, BOOL force)
Definition midas.cxx:6597
INT cm_get_experiment_database(HNDLE *hDB, HNDLE *hKeyClient)
Definition midas.cxx:3027
static int bm_lock_buffer_read_cache(BUFFER *pbuf)
Definition midas.cxx:7915
static int bm_lock_buffer_write_cache(BUFFER *pbuf)
Definition midas.cxx:7936
INT ss_resume(INT port, const char *message)
Definition system.cxx:4916
INT ss_semaphore_delete(HNDLE semaphore_handle, INT destroy_flag)
Definition system.cxx:2941
INT ss_shm_close(const char *name, void *adr, size_t shm_size, HNDLE handle, INT destroy_flag)
Definition system.cxx:757
#define RPC_BM_CLOSE_BUFFER
Definition mrpc.h:37
static std::vector< EventRequest > _request_list
Definition midas.cxx:220
static std::mutex _request_list_mutex
Definition midas.cxx:219
#define MAX_CLIENTS
Definition midas.h:274
INT HNDLE
Definition midas.h:132
INT write_wait
Definition midas.h:948
BOOL read_wait
Definition midas.h:947
INT num_clients
Definition midas.h:960
HNDLE semaphore
Definition midas.h:1004
size_t read_cache_rp
Definition midas.h:997
std::timed_mutex read_cache_mutex
Definition midas.h:994
std::timed_mutex write_cache_mutex
Definition midas.h:999
size_t shm_size
Definition midas.h:1006
char * read_cache
Definition midas.h:996
size_t write_cache_rp
Definition midas.h:1002
size_t write_cache_wp
Definition midas.h:1003
char * write_cache
Definition midas.h:1001
size_t read_cache_wp
Definition midas.h:998
std::atomic< size_t > write_cache_size
Definition midas.h:1000
INT shm_handle
Definition midas.h:1005
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_compose_event()

INT bm_compose_event ( EVENT_HEADER *  event_header,
short int  event_id,
short int  trigger_mask,
DWORD  data_size,
DWORD  serial 
)

Compose a Midas event header. An event header can usually be set up manually or through this routine. If the data size of the event is not known when the header is composed, it can be set later with event_header->data_size = <...> The following structure is created at the beginning of an event

typedef struct {
short int event_id;
short int trigger_mask;
DWORD data_size;
char event[1000];
bm_compose_event((EVENT_HEADER *)event, 1, 0, 100, 1);
*(event+sizeof(EVENT_HEADER)) = <...>
INT bm_compose_event(EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD serial)
Definition midas.cxx:8292
#define serial_number
#define time_stamp
Parameters
event_headerpointer to the event header
event_idevent ID of the event
trigger_masktrigger mask of the event
data_sizesize of the data part of the event in bytes
serialserial number
Returns
BM_SUCCESS

Definition at line 8292 of file midas.cxx.

8293{
8294 event_header->event_id = event_id;
8295 event_header->trigger_mask = trigger_mask;
8296 event_header->data_size = data_size;
8297 event_header->time_stamp = ss_time();
8298 event_header->serial_number = serial;
8299
8300 return BM_SUCCESS;
8301}
INT serial
Definition minife.c:20
short int event_id
Definition midas.h:853
DWORD data_size
Definition midas.h:857
DWORD serial_number
Definition midas.h:855
short int trigger_mask
Definition midas.h:854
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_compose_event_threadsafe()

INT bm_compose_event_threadsafe ( EVENT_HEADER *  event_header,
short int  event_id,
short int  trigger_mask,
DWORD  data_size,
DWORD *  serial 
)

Definition at line 8303 of file midas.cxx.

8304{
8305 static std::mutex mutex;
8306
8307 event_header->event_id = event_id;
8308 event_header->trigger_mask = trigger_mask;
8309 event_header->data_size = data_size;
8310 event_header->time_stamp = ss_time();
8311 {
8312 std::lock_guard<std::mutex> lock(mutex);
8313 event_header->serial_number = *serial;
8314 *serial = *serial + 1;
8315 // implicit unlock
8316 }
8317
8318 return BM_SUCCESS;
8319}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_convert_event_header()

static void bm_convert_event_header ( EVENT_HEADER *  pevent,
int  convert_flags 
)
static

Definition at line 9080 of file midas.cxx.

9080 {
9081 /* now convert event header */
9082 if (convert_flags) {
9083 rpc_convert_single(&pevent->event_id, TID_INT16, RPC_OUTGOING, convert_flags);
9084 rpc_convert_single(&pevent->trigger_mask, TID_INT16, RPC_OUTGOING, convert_flags);
9085 rpc_convert_single(&pevent->serial_number, TID_UINT32, RPC_OUTGOING, convert_flags);
9086 rpc_convert_single(&pevent->time_stamp, TID_UINT32, RPC_OUTGOING, convert_flags);
9087 rpc_convert_single(&pevent->data_size, TID_UINT32, RPC_OUTGOING, convert_flags);
9088 }
9089}
#define TID_UINT32
Definition midas.h:337
#define TID_INT16
Definition midas.h:335
void rpc_convert_single(void *data, INT tid, INT flags, INT convert_flags)
Definition midas.cxx:11812
#define RPC_OUTGOING
Definition midas.h:1584
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_delete_request()

INT bm_delete_request ( INT  request_id)

Deletes an event request previously done with bm_request_event(). When an event request gets deleted, events of that requested type are not received any more. When a buffer is closed via bm_close_buffer(), all event requests from that buffer are deleted automatically

Parameters
request_idrequest identifier given by bm_request_event()
Returns
BM_SUCCESS, BM_INVALID_HANDLE

Definition at line 8595 of file midas.cxx.

8596{
8597 _request_list_mutex.lock();
8598
8599 if (request_id < 0 || size_t(request_id) >= _request_list.size()) {
8600 _request_list_mutex.unlock();
8601 return BM_INVALID_HANDLE;
8602 }
8603
8604 int buffer_handle = _request_list[request_id].buffer_handle;
8605
8606 _request_list[request_id].clear();
8607
8608 _request_list_mutex.unlock();
8609
8610 /* remove request entry from buffer */
8611 return bm_remove_event_request(buffer_handle, request_id);
8612}
INT bm_remove_event_request(INT buffer_handle, INT request_id)
Definition midas.cxx:8529
#define BM_INVALID_HANDLE
Definition midas.h:609
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_dispatch_event()

static void bm_dispatch_event ( int  buffer_handle,
EVENT_HEADER *  pevent 
)
static

Definition at line 8834 of file midas.cxx.

8835{
8836 _request_list_mutex.lock();
8837 bool locked = true;
8838 size_t n = _request_list.size();
8839 /* call dispatcher */
8840 for (size_t i = 0; i < n; i++) {
8841 if (!locked) {
8842 _request_list_mutex.lock();
8843 locked = true;
8844 }
8846 if (r.buffer_handle != buffer_handle)
8847 continue;
8848 if (!bm_match_event(r.event_id, r.trigger_mask, pevent))
8849 continue;
8850 /* must release the lock on the request list: user provided r.dispatcher() can add or remove event requests, and we will deadlock. K.O. */
8851 _request_list_mutex.unlock();
8852 locked = false;
8853 /* if event is fragmented, call defragmenter */
8854 if (((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG1)) || ((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG))) {
8855 bm_defragment_event(buffer_handle, i, pevent, (void *) (pevent + 1), r.dispatcher);
8856 } else {
8857 r.dispatcher(buffer_handle, i, pevent, (void *) (pevent + 1));
8858 }
8859 }
8860 if (locked)
8861 _request_list_mutex.unlock();
8862}
static void bm_defragment_event(HNDLE buffer_handle, HNDLE request_id, EVENT_HEADER *pevent, void *pdata, EVENT_HANDLER *dispatcher)
Definition midas.cxx:11423
DWORD n[4]
Definition mana.cxx:247
#define EVENTID_FRAG
Definition midas.h:908
#define EVENTID_FRAG1
Definition midas.h:907
short int event_id
Definition midas.cxx:206
INT buffer_handle
Definition midas.cxx:205
short int trigger_mask
Definition midas.cxx:207
EVENT_HANDLER * dispatcher
Definition midas.cxx:208
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_empty_buffers()

INT bm_empty_buffers ( void  )

Clears event buffer and cache. If an event buffer is large and a consumer is slow in analyzing events, events are usually received some time after they are produced. This effect is even more pronounced if a read cache is used (via bm_set_cache_size()). When changes to the hardware are made in the experiment, the consumer will then still analyze old events before any new event which reflects the hardware change. Users can be fooled by looking at histograms which reflect the hardware change many seconds after they have been made.

To overcome this potential problem, the analyzer can call bm_empty_buffers() just after the hardware change has been made which skips all old events contained in event buffers and read caches. Technically this is done by forwarding the read pointer of the client. No events are really deleted, they are still visible to other clients like the logger.

Note that the front-end also contains write buffers which can delay the delivery of events. The standard front-end framework mfe.c reduces this effect by flushing all buffers once every second.

Returns
BM_SUCCESS

Definition at line 11379 of file midas.cxx.

11379 {
11380 if (rpc_is_remote())
11382
11383#ifdef LOCAL_ROUTINES
11384 {
11385 std::vector<BUFFER*> mybuffers;
11386
11387 gBuffersMutex.lock();
11388 mybuffers = gBuffers;
11389 gBuffersMutex.unlock();
11390
11391 /* go through all buffers */
11392 for (BUFFER* pbuf : mybuffers) {
11393 if (!pbuf)
11394 continue;
11395 if (!pbuf->attached)
11396 continue;
11397
11398 int status = bm_skip_event(pbuf);
11399 if (status != BM_SUCCESS)
11400 return status;
11401 }
11402 }
11403#endif /* LOCAL_ROUTINES */
11404
11405 return BM_SUCCESS;
11406}
static int bm_skip_event(BUFFER *pbuf)
Definition midas.cxx:10972
#define RPC_BM_EMPTY_BUFFERS
Definition mrpc.h:49
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_fill_read_cache_locked()

static int bm_fill_read_cache_locked ( bm_lock_buffer_guard pbuf_guard,
int  timeout_msec 
)
static

Definition at line 9003 of file midas.cxx.

9004{
9005 BUFFER* pbuf = pbuf_guard.get_pbuf();
9006 BUFFER_HEADER* pheader = pbuf->buffer_header;
9007 BUFFER_CLIENT *pc = bm_get_my_client_locked(pbuf_guard);
9008 BOOL need_wakeup = FALSE;
9009
9010 //printf("bm_fill_read_cache: [%s] timeout %d, size %d, rp %d, wp %d\n", pheader->name, timeout_msec, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
9011
9012 /* loop over all events in the buffer */
9013
9014 while (1) {
9015 EVENT_HEADER *pevent = NULL;
9016 int event_size = 3; // poison value
9017 int total_size = 3; // poison value
9018
9019 int status = bm_peek_buffer_locked(pbuf, pheader, pc, &pevent, &event_size, &total_size);
9020 if (status == BM_CORRUPTED) {
9021 return status;
9022 } else if (status != BM_SUCCESS) {
9023 /* event buffer is empty */
9024 if (timeout_msec == BM_NO_WAIT) {
9025 if (need_wakeup)
9026 bm_wakeup_producers_locked(pheader, pc);
9027 if (pbuf->read_cache_rp == pbuf->read_cache_wp) {
9028 // read cache is empty
9029 return BM_ASYNC_RETURN;
9030 }
9031 return BM_SUCCESS;
9032 }
9033
9034 int status = bm_wait_for_more_events_locked(pbuf_guard, pc, timeout_msec, TRUE);
9035
9036 if (status != BM_SUCCESS) {
9037 // we only come here with SS_ABORT & co
9038 return status;
9039 }
9040
9041 // make sure we wait for new event only once
9042 timeout_msec = BM_NO_WAIT;
9043 // go back to bm_peek_buffer_locked
9044 continue;
9045 }
9046
9047 /* loop over all requests: if this event matches a request,
9048 * copy it to the read cache */
9049
9050 BOOL is_requested = bm_check_requests(pc, pevent);
9051
9052 if (is_requested) {
9053 if (pbuf->read_cache_wp + total_size > pbuf->read_cache_size) {
9054 /* read cache is full */
9055 if (need_wakeup)
9056 bm_wakeup_producers_locked(pheader, pc);
9057 return BM_SUCCESS;
9058 }
9059
9061
9062 pbuf->read_cache_wp += total_size;
9063
9064 /* update statistics */
9065 pheader->num_out_events++;
9066 pbuf->count_read++;
9067 pbuf->bytes_read += event_size;
9068 }
9069
9070 /* shift read pointer */
9071
9072 int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
9073 pc->read_pointer = new_read_pointer;
9074
9075 need_wakeup = TRUE;
9076 }
9077 /* NOT REACHED */
9078}
BUFFER * get_pbuf() const
Definition midas.cxx:3197
static void bm_wakeup_producers_locked(const BUFFER_HEADER *pheader, const BUFFER_CLIENT *pc)
Definition midas.cxx:8798
static int bm_incr_rp_no_check(const BUFFER_HEADER *pheader, int rp, int total_size)
Definition midas.cxx:6241
static BOOL bm_check_requests(const BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
Definition midas.cxx:8977
static void bm_read_from_buffer_locked(const BUFFER_HEADER *pheader, int rp, char *buf, int event_size)
Definition midas.cxx:8947
static int bm_peek_buffer_locked(BUFFER *pbuf, BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
Definition midas.cxx:8902
static int bm_wait_for_more_events_locked(bm_lock_buffer_guard &pbuf_guard, BUFFER_CLIENT *pc, int timeout_msec, BOOL unlock_read_cache)
Definition midas.cxx:9403
#define BM_ASYNC_RETURN
Definition midas.h:613
#define BM_NO_WAIT
Definition midas.h:366
int event_size
Definition msysmon.cxx:527
INT read_pointer
Definition midas.h:941
INT num_out_events
Definition midas.h:966
double bytes_read
Definition midas.h:1022
int count_read
Definition midas.h:1021
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_find_first_request_locked()

static int bm_find_first_request_locked ( BUFFER_CLIENT pc,
const EVENT_HEADER pevent 
)
static

Definition at line 9600 of file midas.cxx.

9600 {
9601 if (pc->pid) {
9602 int j;
9603 for (j = 0; j < pc->max_request_index; j++) {
9604 const EVENT_REQUEST *prequest = pc->event_request + j;
9605 if (prequest->valid && bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
9606 return prequest->id;
9607 }
9608 }
9609 }
9610
9611 return -1;
9612}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache()

INT bm_flush_cache ( int  buffer_handle,
int  timeout_msec 
)

Definition at line 10233 of file midas.cxx.

10234{
10235 if (rpc_is_remote()) {
10236 return bm_flush_cache_rpc(buffer_handle, timeout_msec);
10237 }
10238
10239#ifdef LOCAL_ROUTINES
10240 {
10241 INT status = 0;
10242
10243 //printf("bm_flush_cache!\n");
10244
10245 BUFFER *pbuf = bm_get_buffer("bm_flush_cache", buffer_handle, &status);
10246
10247 if (!pbuf)
10248 return status;
10249
10250 if (pbuf->write_cache_size == 0)
10251 return BM_SUCCESS;
10252
10254
10255 if (status != BM_SUCCESS)
10256 return status;
10257
10258 /* check if anything needs to be flushed */
10259 if (pbuf->write_cache_wp == 0) {
10260 pbuf->write_cache_mutex.unlock();
10261 return BM_SUCCESS;
10262 }
10263
10264 /* lock the buffer */
10265 bm_lock_buffer_guard pbuf_guard(pbuf);
10266
10267 if (!pbuf_guard.is_locked())
10268 return pbuf_guard.get_status();
10269
10270 status = bm_flush_cache_locked(pbuf_guard, timeout_msec);
10271
10272 /* unlock in correct order */
10273
10274 if (pbuf_guard.is_locked()) {
10275 // check if bm_wait_for_free_space() failed to relock the buffer
10276 pbuf_guard.unlock();
10277 }
10278
10279 pbuf->write_cache_mutex.unlock();
10280
10281 return status;
10282 }
10283#endif /* LOCAL_ROUTINES */
10284
10285 return BM_SUCCESS;
10286}
static int bm_flush_cache_rpc(int buffer_handle, int timeout_msec)
Definition midas.cxx:9998
static INT bm_flush_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
Definition midas.cxx:10089
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache_locked()

static INT bm_flush_cache_locked ( bm_lock_buffer_guard pbuf_guard,
int  timeout_msec 
)
static

Empty write cache. This function should be used if events in the write cache should be visible to the consumers immediately. It should be called at the end of each run, otherwise events could be kept in the write buffer and will flow to the data of the next run.

Parameters
buffer_handle — Buffer handle obtained via bm_open_buffer() or 0 to flush data in the mserver event socket
timeout_msec — Timeout waiting for free space in the event buffer. If BM_WAIT, wait forever. If BM_NO_WAIT, the function returns immediately with a value of BM_ASYNC_RETURN without writing the cache.
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN Routine called with async_flag == BM_NO_WAIT and the buffer does not have enough space to receive the cache
BM_NO_MEMORY Event is too large for network buffer or event buffer. One has to increase the event buffer size "/Experiment/Buffer sizes/SYSTEM" and/or /Experiment/MAX_EVENT_SIZE in ODB.

Definition at line 10089 of file midas.cxx.

10090{
10091 // NB we come here with write cache locked and buffer locked.
10092
10093 {
10094 INT status = 0;
10095
10096 //printf("bm_flush_cache_locked!\n");
10097
10098 BUFFER* pbuf = pbuf_guard.get_pbuf();
10099 BUFFER_HEADER* pheader = pbuf->buffer_header;
10100
10101 //printf("bm_flush_cache_locked: buffer %s, cache rp %zu, wp %zu, timeout %d msec\n", pbuf->buffer_name, pbuf->write_cache_rp, pbuf->write_cache_wp, timeout_msec);
10102
10103 int old_write_pointer = pheader->write_pointer;
10104
10105 int request_id[MAX_CLIENTS];
10106 for (int i = 0; i < pheader->max_client_index; i++) {
10107 request_id[i] = -1;
10108 }
10109
10110 size_t ask_rp = pbuf->write_cache_rp;
10111 size_t ask_wp = pbuf->write_cache_wp;
10112
10113 if (ask_wp == 0) { // nothing to do
10114 return BM_SUCCESS;
10115 }
10116
10117 if (ask_rp == ask_wp) { // nothing to do
10118 return BM_SUCCESS;
10119 }
10120
10121 assert(ask_rp < ask_wp);
10122
10123 size_t ask_free = ALIGN8(ask_wp - ask_rp);
10124
10125 if (ask_free == 0) { // nothing to do
10126 return BM_SUCCESS;
10127 }
10128
10129#if 0
10131 if (status != BM_SUCCESS) {
10132 printf("bm_flush_cache: corrupted 111!\n");
10133 abort();
10134 }
10135#endif
10136
10137 status = bm_wait_for_free_space_locked(pbuf_guard, timeout_msec, ask_free, true);
10138
10139 if (status != BM_SUCCESS) {
10140 return status;
10141 }
10142
10143 // NB: ask_rp, ask_wp and ask_free are invalid after calling bm_wait_for_free_space():
10144 //
10145 // wait_for_free_space() will sleep with all locks released,
10146 // during this time, another thread may call bm_send_event() that will
10147 // add one or more events to the write cache and after wait_for_free_space()
10148 // returns, size of data in cache will be bigger than the amount
10149 // of free space we requested. so we need to keep track of how
10150 // much data we write to the buffer and ask for more data
10151 // if we run short. This is the reason for the big loop
10152 // around wait_for_free_space(). We ask for slightly too little free
10153 // space to make sure all this code is always used and does work. K.O.
10154
10155 if (pbuf->write_cache_wp == 0) {
10156 /* somebody emptied the cache while we were inside bm_wait_for_free_space */
10157 return BM_SUCCESS;
10158 }
10159
10160 //size_t written = 0;
10161 while (pbuf->write_cache_rp < pbuf->write_cache_wp) {
10162 /* loop over all events in cache */
10163
10164 const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pbuf->write_cache + pbuf->write_cache_rp);
10165 size_t event_size = (pevent->data_size + sizeof(EVENT_HEADER));
10166 size_t total_size = ALIGN8(event_size);
10167
10168#if 0
10169 printf("bm_flush_cache: cache size %d, wp %d, rp %d, event data_size %d, event_size %d, total_size %d, free %d, written %d\n",
10170 int(pbuf->write_cache_size),
10171 int(pbuf->write_cache_wp),
10172 int(pbuf->write_cache_rp),
10173 int(pevent->data_size),
10174 int(event_size),
10175 int(total_size),
10176 int(ask_free),
10177 int(written));
10178#endif
10179
10180 // check for crazy event size
10181 assert(total_size >= sizeof(EVENT_HEADER));
10182 assert(total_size <= (size_t)pheader->size);
10183
10184 bm_write_to_buffer_locked(pheader, 1, (char**)&pevent, &event_size, total_size);
10185
10186 /* update statistics */
10187 pheader->num_in_events++;
10188 pbuf->count_sent += 1;
10189 pbuf->bytes_sent += total_size;
10190
10191 /* see comment for the same code in bm_send_event().
10192 * We make sure the buffer is never 100% full */
10193 assert(pheader->write_pointer != pheader->read_pointer);
10194
10195 /* check if anybody has a request for this event */
10196 for (int i = 0; i < pheader->max_client_index; i++) {
10197 BUFFER_CLIENT *pc = pheader->client + i;
10198 int r = bm_find_first_request_locked(pc, pevent);
10199 if (r >= 0) {
10200 request_id[i] = r;
10201 }
10202 }
10203
10204 /* this loop does not loop forever because rp
10205 * is monotonously incremented here. write_cache_wp does
10206 * not change */
10207
10208 pbuf->write_cache_rp += total_size;
10209 //written += total_size;
10210
10211 assert(pbuf->write_cache_rp > 0);
10212 assert(pbuf->write_cache_rp <= pbuf->write_cache_size);
10213 assert(pbuf->write_cache_rp <= pbuf->write_cache_wp);
10214 }
10215
10216 /* the write cache is now empty */
10217 assert(pbuf->write_cache_wp == pbuf->write_cache_rp);
10218 pbuf->write_cache_wp = 0;
10219 pbuf->write_cache_rp = 0;
10220
10221 /* check which clients are waiting */
10222 for (int i = 0; i < pheader->max_client_index; i++) {
10223 BUFFER_CLIENT *pc = pheader->client + i;
10224 bm_notify_reader_locked(pheader, pc, old_write_pointer, request_id[i]);
10225 }
10226 }
10227
10228 return BM_SUCCESS;
10229}
static void bm_notify_reader_locked(BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, int old_write_pointer, int request_id)
Definition midas.cxx:9614
static int bm_find_first_request_locked(BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
Definition midas.cxx:9600
static int bm_validate_buffer_locked(const BUFFER *pbuf)
Definition midas.cxx:6325
static void bm_write_to_buffer_locked(BUFFER_HEADER *pheader, int sg_n, const char *const sg_ptr[], const size_t sg_len[], size_t total_size)
Definition midas.cxx:9515
static int bm_wait_for_free_space_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec, int requested_space, bool unlock_write_cache)
Definition midas.cxx:9091
#define ALIGN8(x)
Definition midas.h:522
INT num_in_events
Definition midas.h:965
INT write_pointer
Definition midas.h:964
INT read_pointer
Definition midas.h:963
int count_sent
Definition midas.h:1013
double bytes_sent
Definition midas.h:1014
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache_rpc()

static int bm_flush_cache_rpc ( int  buffer_handle,
int  timeout_msec 
)
static

Definition at line 9998 of file midas.cxx.

9999{
10000 //printf("bm_flush_cache_rpc: handle %d, timeout %d\n", buffer_handle, timeout_msec);
10001
10002 DWORD time_start = ss_millitime();
10003 DWORD time_end = time_start + timeout_msec;
10004 DWORD time_bombout = time_end;
10005
10006 if (timeout_msec < 10000)
10007 time_bombout = time_start + 10000; // 10 seconds
10008
10009 int xtimeout_msec = timeout_msec;
10010
10011 while (1) {
10012 if (timeout_msec == BM_WAIT) {
10013 xtimeout_msec = 1000;
10014 } else if (timeout_msec == BM_NO_WAIT) {
10015 xtimeout_msec = BM_NO_WAIT;
10016 } else {
10017 if (xtimeout_msec > 1000) {
10018 xtimeout_msec = 1000;
10019 }
10020 }
10021
10022 int status = rpc_call(RPC_BM_FLUSH_CACHE, buffer_handle, xtimeout_msec);
10023
10024 //printf("bm_flush_cache_rpc: handle %d, timeout %d, status %d\n", buffer_handle, xtimeout_msec, status);
10025
10026 if (status == BM_ASYNC_RETURN) {
10027 if (timeout_msec == BM_WAIT) {
10028 DWORD now = ss_millitime();
10029 if (now >= time_bombout) {
10030 // timeout
10031 return BM_TIMEOUT;
10032 }
10033
10034 // BM_WAIT means wait forever
10035 continue;
10036 } else if (timeout_msec == BM_NO_WAIT) {
10037 // BM_NO_WAIT means do not wait
10038 return status;
10039 } else {
10040 DWORD now = ss_millitime();
10041 if (now >= time_end) {
10042 // timeout, return BM_ASYNC_RETURN
10043 return status;
10044 }
10045
10046 DWORD remain = time_end - now;
10047
10048 if (remain < (DWORD)xtimeout_msec) {
10049 xtimeout_msec = remain;
10050 }
10051
10052 if (now >= time_bombout) {
10053 // timeout
10054 return BM_TIMEOUT;
10055 }
10056
10057 // keep asking for event...
10058 continue;
10059 }
10060 } else if (status == BM_SUCCESS) {
10061 // success, return BM_SUCCESS
10062 return status;
10063 } else {
10064 // error
10065 return status;
10066 }
10067 }
10068}
#define BM_TIMEOUT
Definition midas.h:625
#define BM_WAIT
Definition midas.h:365
#define RPC_BM_FLUSH_CACHE
Definition mrpc.h:46
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_get_buffer_handle()

INT bm_get_buffer_handle ( const char *  buffer_name,
INT buffer_handle 
)

If the buffer is already open, return its handle

Parameters
buffer_name — buffer name
Returns
BM_SUCCESS, BM_NOT_FOUND

Definition at line 7086 of file midas.cxx.

7087{
7088 gBuffersMutex.lock();
7089 for (size_t i = 0; i < gBuffers.size(); i++) {
7090 BUFFER* pbuf = gBuffers[i];
7091 if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
7092 *buffer_handle = i + 1;
7093 gBuffersMutex.unlock();
7094 return BM_SUCCESS;
7095 }
7096 }
7097 gBuffersMutex.unlock();
7098 return BM_NOT_FOUND;
7099}
#define BM_NOT_FOUND
Definition midas.h:612
BOOL equal_ustring(const char *str1, const char *str2)
Definition odb.cxx:3285
char buffer_name[NAME_LENGTH]
Definition mevb.cxx:45
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_incr_read_cache_locked()

static void bm_incr_read_cache_locked ( BUFFER pbuf,
int  total_size 
)
static

Definition at line 8866 of file midas.cxx.

8866 {
8867 /* increment read cache read pointer */
8868 pbuf->read_cache_rp += total_size;
8869
8870 if (pbuf->read_cache_rp == pbuf->read_cache_wp) {
8871 pbuf->read_cache_rp = 0;
8872 pbuf->read_cache_wp = 0;
8873 }
8874}
Here is the caller graph for this function:

◆ bm_incr_rp_no_check()

static int bm_incr_rp_no_check ( const BUFFER_HEADER pheader,
int  rp,
int  total_size 
)
static

Definition at line 6241 of file midas.cxx.

6242{
6243#if 0
6244 if (gRpLog == NULL) {
6245 gRpLog = fopen("rp.log", "a");
6246 }
6247 if (gRpLog && (total_size < 16)) {
6248 const char *pdata = (const char *) (pheader + 1);
6249 const DWORD *pevent = (const DWORD*) (pdata + rp);
6250 fprintf(gRpLog, "%s: rp %d, total_size %d, at rp 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x\n", pheader->name, rp, total_size,
6251 pevent[0], pevent[1], pevent[2], pevent[3], pevent[4], pevent[5]);
6252 }
6253#endif
6254
6255 // these checks are already done before we come here.
6256 // but we check again as last-ressort protection. K.O.
6257 assert(total_size > 0);
6258 assert(total_size >= (int)sizeof(EVENT_HEADER));
6259
6260 rp += total_size;
6261 if (rp >= pheader->size) {
6262 rp -= pheader->size;
6263 } else if ((rp + (int) sizeof(EVENT_HEADER)) > pheader->size) {
6264 // note: ">" here to match bm_write_to_buffer_locked() and bm_validate_rp().
6265 // if at the end of the buffer, the remaining free space is exactly
6266 // equal to the size of an event header, the event header
6267 // is written there, the pointer is wrapped and the event data
6268 // is written to the beginning of the buffer.
6269 rp = 0;
6270 }
6271 return rp;
6272}
Here is the caller graph for this function:

◆ bm_match_event()

INT bm_match_event ( short int  event_id,
short int  trigger_mask,
const EVENT_HEADER pevent 
)

Check if an event matches a given event request by the event id and trigger mask

Parameters
event_id — Event ID of request
trigger_mask — Trigger mask of request
pevent — Pointer to event to check
Returns
TRUE if event matches request

Definition at line 6033 of file midas.cxx.

6033 {
6034 // NB: cast everything to unsigned 16 bit to avoid bitwise comparison failure
6035 // because of mismatch in sign-extension between signed 16-bit event_id and
6036 // unsigned 16-bit constants. K.O.
6037
6038 if (((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG1)) || ((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG)))
6039 /* fragmented event */
6040 return (((uint16_t(event_id) == uint16_t(EVENTID_ALL)) || (uint16_t(event_id) == (uint16_t(pevent->event_id) & uint16_t(0x0FFF))))
6041 && ((uint16_t(trigger_mask) == uint16_t(TRIGGER_ALL)) || ((uint16_t(trigger_mask) & uint16_t(pevent->trigger_mask)))));
6042
6043 return (((uint16_t(event_id) == uint16_t(EVENTID_ALL)) || (uint16_t(event_id) == uint16_t(pevent->event_id)))
6044 && ((uint16_t(trigger_mask) == uint16_t(TRIGGER_ALL)) || ((uint16_t(trigger_mask) & uint16_t(pevent->trigger_mask)))));
6045}
#define TRIGGER_ALL
Definition midas.h:538
#define EVENTID_ALL
Definition midas.h:537
Here is the caller graph for this function:

◆ bm_next_rp()

static int bm_next_rp ( const char *  who,
const BUFFER_HEADER pheader,
const char *  pdata,
int  rp 
)
static

Definition at line 6274 of file midas.cxx.

6274 {
6275 const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pdata + rp);
6276 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
6277 int total_size = ALIGN8(event_size);
6278
6279 if (pevent->data_size <= 0 || total_size <= 0 || total_size > pheader->size) {
6280 cm_msg(MERROR, "bm_next_rp",
6281 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6282 pheader->name,
6283 rp,
6284 pevent->data_size,
6285 event_size,
6286 total_size,
6287 pheader->read_pointer,
6288 pheader->write_pointer,
6289 pheader->size,
6290 who);
6291 return -1;
6292 }
6293
6294 int remaining = 0;
6295 if (rp < pheader->write_pointer) {
6296 remaining = pheader->write_pointer - rp;
6297 } else {
6298 remaining = pheader->size - rp;
6299 remaining += pheader->write_pointer;
6300 }
6301
6302 //printf("bm_next_rp: total_size %d, remaining %d, rp %d, wp %d, size %d\n", total_size, remaining, rp, pheader->write_pointer, pheader->size);
6303
6304 if (total_size > remaining) {
6305 cm_msg(MERROR, "bm_next_rp",
6306 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, remaining %d, called from %s",
6307 pheader->name,
6308 rp,
6309 pevent->data_size,
6310 event_size,
6311 total_size,
6312 pheader->read_pointer,
6313 pheader->write_pointer,
6314 pheader->size,
6315 remaining,
6316 who);
6317 return -1;
6318 }
6319
6320 rp = bm_incr_rp_no_check(pheader, rp, total_size);
6321
6322 return rp;
6323}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_notify_reader_locked()

static void bm_notify_reader_locked ( BUFFER_HEADER pheader,
BUFFER_CLIENT pc,
int  old_write_pointer,
int  request_id 
)
static

Definition at line 9614 of file midas.cxx.

9614 {
9615 if (request_id >= 0) {
9616 /* if that client has a request and is suspended, wake it up */
9617 if (pc->read_wait) {
9618 char str[80];
9619 sprintf(str, "B %s %d", pheader->name, request_id);
9620 ss_resume(pc->port, str);
9621 //printf("bm_notify_reader_locked: buffer [%s] client [%s] request_id %d, port %d, message [%s]\n", pheader->name, pc->name, request_id, pc->port, str);
9622 //printf("bm_notify_reader_locked: buffer [%s] client [%s] clear read_wait!\n", pheader->name, pc->name);
9623 pc->read_wait = FALSE;
9624 }
9625 }
9626}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_open_buffer()

INT bm_open_buffer ( const char *  buffer_name,
INT  buffer_size,
INT buffer_handle 
)

Open an event buffer. Two default buffers are created by the system. The "SYSTEM" buffer is used to exchange events and the "SYSMSG" buffer is used to exchange system messages. The name and size of the event buffers are defined in midas.h as EVENT_BUFFER_NAME and DEFAULT_BUFFER_SIZE. The following example opens the "SYSTEM" buffer, requests events with ID 1 and enters a main loop. Events are then received in process_event()

#include <stdio.h>
#include "midas.h"
void process_event(HNDLE hbuf, HNDLE request_id, EVENT_HEADER *pheader, void *pevent)
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
HNDLE hbuf;
status = cm_connect_experiment("pc810", "Sample", "Simple Analyzer", NULL);
return 1;
do
{
status = cm_yield(1000);
} while (status != RPC_SHUTDOWN && status != SS_ABORT);
return 0;
}
INT bm_open_buffer(const char *buffer_name, INT buffer_size, INT *buffer_handle)
Definition midas.cxx:6728
INT bm_request_event(HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
Definition midas.cxx:8476
INT cm_yield(INT millisec)
Definition midas.cxx:5660
INT cm_connect_experiment(const char *host_name, const char *exp_name, const char *client_name, void(*func)(char *))
Definition midas.cxx:2294
INT cm_disconnect_experiment(void)
Definition midas.cxx:2862
#define CM_SUCCESS
Definition midas.h:582
#define SS_ABORT
Definition midas.h:678
#define RPC_SHUTDOWN
Definition midas.h:708
int main()
Definition hwtest.cxx:23
INT process_event(ANALYZE_REQUEST *par, EVENT_HEADER *pevent)
Definition mana.cxx:3081
#define DEFAULT_BUFFER_SIZE
Definition midas.h:255
#define EVENT_BUFFER_NAME
Definition midas.h:269
Parameters
buffer_name — Name of buffer
buffer_size — Default size of buffer in bytes. Can be overwritten with ODB value
buffer_handle — Buffer handle returned by function
Returns
BM_SUCCESS, BM_CREATED
BM_NO_SHM Shared memory cannot be created
BM_NO_SEMAPHORE Semaphore cannot be created
BM_NO_MEMORY Not enough memory to create buffer descriptor
BM_MEMSIZE_MISMATCH Buffer size conflicts with an existing buffer of different size
BM_INVALID_PARAM Invalid parameter

Definition at line 6728 of file midas.cxx.

6728 {
6729 INT status;
6730
6731 if (rpc_is_remote()) {
6732 status = rpc_call(RPC_BM_OPEN_BUFFER, buffer_name, buffer_size, buffer_handle);
6733
6734 HNDLE hDB;
6736 if (status != SUCCESS || hDB == 0) {
6737 cm_msg(MERROR, "bm_open_buffer", "cannot open buffer \'%s\' - not connected to ODB", buffer_name);
6738 return BM_NO_SHM;
6739 }
6740
6742
6743 int size = sizeof(INT);
6744 status = db_get_value(hDB, 0, "/Experiment/MAX_EVENT_SIZE", &_bm_max_event_size, &size, TID_UINT32, TRUE);
6745
6746 if (status != DB_SUCCESS) {
6747 cm_msg(MERROR, "bm_open_buffer", "Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6748 status);
6749 return status;
6750 }
6751
6752 return status;
6753 }
6754#ifdef LOCAL_ROUTINES
6755 {
6756 HNDLE shm_handle;
6757 size_t shm_size;
6758 HNDLE hDB;
6759 const int max_buffer_size = 2 * 1000 * 1024 * 1024; // limited by 32-bit integers in the buffer header
6760
6761 bm_cleanup("bm_open_buffer", ss_millitime(), FALSE);
6762
6763 if (!buffer_name || !buffer_name[0]) {
6764 cm_msg(MERROR, "bm_open_buffer", "cannot open buffer with zero name");
6765 return BM_INVALID_PARAM;
6766 }
6767
6768 if (strlen(buffer_name) >= NAME_LENGTH) {
6769 cm_msg(MERROR, "bm_open_buffer", "buffer name \"%s\" is longer than %d bytes", buffer_name, NAME_LENGTH);
6770 return BM_INVALID_PARAM;
6771 }
6772
6774
6775 if (status != SUCCESS || hDB == 0) {
6776 //cm_msg(MERROR, "bm_open_buffer", "cannot open buffer \'%s\' - not connected to ODB", buffer_name);
6777 return BM_NO_SHM;
6778 }
6779
6780 /* get buffer size from ODB, user parameter as default if not present in ODB */
6781 std::string odb_path;
6782 odb_path += "/Experiment/Buffer sizes/";
6783 odb_path += buffer_name;
6784
6785 int size = sizeof(INT);
6786 status = db_get_value(hDB, 0, odb_path.c_str(), &buffer_size, &size, TID_UINT32, TRUE);
6787
6788 if (buffer_size <= 0 || buffer_size > max_buffer_size) {
6789 cm_msg(MERROR, "bm_open_buffer",
6790 "Cannot open buffer \"%s\", invalid buffer size %d in ODB \"%s\", maximum buffer size is %d",
6791 buffer_name, buffer_size, odb_path.c_str(), max_buffer_size);
6792 return BM_INVALID_PARAM;
6793 }
6794
6796
6797 size = sizeof(INT);
6798 status = db_get_value(hDB, 0, "/Experiment/MAX_EVENT_SIZE", &_bm_max_event_size, &size, TID_UINT32, TRUE);
6799
6800 if (status != DB_SUCCESS) {
6801 cm_msg(MERROR, "bm_open_buffer", "Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6802 status);
6803 return status;
6804 }
6805
6806 /* check if buffer already is open */
6807 gBuffersMutex.lock();
6808 for (size_t i = 0; i < gBuffers.size(); i++) {
6809 BUFFER* pbuf = gBuffers[i];
6810 if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
6811 *buffer_handle = i + 1;
6812 gBuffersMutex.unlock();
6813 return BM_SUCCESS;
6814 }
6815 }
6816 gBuffersMutex.unlock();
6817
6818 // only one thread at a time should create new buffers
6819
6820 static std::mutex gNewBufferMutex;
6821 std::lock_guard<std::mutex> guard(gNewBufferMutex);
6822
6823 // if we had a race against another thread
6824 // and while we were waiting for gNewBufferMutex
6825 // the other thread created this buffer, we return it.
6826
6827 gBuffersMutex.lock();
6828 for (size_t i = 0; i < gBuffers.size(); i++) {
6829 BUFFER* pbuf = gBuffers[i];
6830 if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
6831 *buffer_handle = i + 1;
6832 gBuffersMutex.unlock();
6833 return BM_SUCCESS;
6834 }
6835 }
6836 gBuffersMutex.unlock();
6837
6838 /* allocate new BUFFER object */
6839
6840 BUFFER* pbuf = new BUFFER;
6841
6842 /* there is no constructor for BUFFER object, we have to zero the arrays manually */
6843
6844 for (int i=0; i<MAX_CLIENTS; i++) {
6845 pbuf->client_count_write_wait[i] = 0;
6846 pbuf->client_time_write_wait[i] = 0;
6847 }
6848
6849 /* create buffer semaphore */
6850
6852
6853 if (status != SS_CREATED && status != SS_SUCCESS) {
6854 *buffer_handle = 0;
6855 delete pbuf;
6856 return BM_NO_SEMAPHORE;
6857 }
6858
6859 std::string client_name = cm_get_client_name();
6860
6861 /* store client name */
6862 mstrlcpy(pbuf->client_name, client_name.c_str(), sizeof(pbuf->client_name));
6863
6864 /* store buffer name */
6865 mstrlcpy(pbuf->buffer_name, buffer_name, sizeof(pbuf->buffer_name));
6866
6867 /* lock buffer semaphore to avoid race with bm_open_buffer() in a different program */
6868
6869 pbuf->attached = true; // required by bm_lock_buffer()
6870
6871 bm_lock_buffer_guard pbuf_guard(pbuf);
6872
6873 if (!pbuf_guard.is_locked()) {
6874 // cannot happen, no other thread can see this pbuf
6875 abort();
6876 return BM_NO_SEMAPHORE;
6877 }
6878
6879 /* open shared memory */
6880
6881 void *p = NULL;
6882 status = ss_shm_open(buffer_name, sizeof(BUFFER_HEADER) + buffer_size, &p, &shm_size, &shm_handle, FALSE);
6883
6884 if (status != SS_SUCCESS && status != SS_CREATED) {
6885 *buffer_handle = 0;
6886 pbuf_guard.unlock();
6887 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6888 delete pbuf;
6889 return BM_NO_SHM;
6890 }
6891
6892 pbuf->buffer_header = (BUFFER_HEADER *) p;
6893
6894 BUFFER_HEADER *pheader = pbuf->buffer_header;
6895
6896 bool shm_created = (status == SS_CREATED);
6897
6898 if (shm_created) {
6899 /* initialize newly created shared memory */
6900
6901 memset(pheader, 0, sizeof(BUFFER_HEADER) + buffer_size);
6902
6903 mstrlcpy(pheader->name, buffer_name, sizeof(pheader->name));
6904 pheader->size = buffer_size;
6905
6906 } else {
6907 /* validate existing shared memory */
6908
6909 if (!equal_ustring(pheader->name, buffer_name)) {
6910 // unlock before calling cm_msg(). if we are SYSMSG, we will deadlock. K.O.
6911 pbuf_guard.unlock();
6912 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6913 cm_msg(MERROR, "bm_open_buffer",
6914 "Buffer \"%s\" is corrupted, mismatch of buffer name in shared memory \"%s\"", buffer_name,
6915 pheader->name);
6916 *buffer_handle = 0;
6917 delete pbuf;
6918 return BM_CORRUPTED;
6919 }
6920
6921 if ((pheader->num_clients < 0) || (pheader->num_clients > MAX_CLIENTS)) {
 6922 // unlock before calling cm_msg(). if we are SYSMSG, we will deadlock. K.O.
6923 pbuf_guard.unlock();
6924 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6925 cm_msg(MERROR, "bm_open_buffer", "Buffer \"%s\" is corrupted, num_clients %d exceeds MAX_CLIENTS %d",
6927 *buffer_handle = 0;
6928 delete pbuf;
6929 return BM_CORRUPTED;
6930 }
6931
6932 if ((pheader->max_client_index < 0) || (pheader->max_client_index > MAX_CLIENTS)) {
 6933 // unlock before calling cm_msg(). if we are SYSMSG, we will deadlock. K.O.
6934 pbuf_guard.unlock();
6935 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6936 cm_msg(MERROR, "bm_open_buffer", "Buffer \"%s\" is corrupted, max_client_index %d exceeds MAX_CLIENTS %d",
6938 *buffer_handle = 0;
6939 delete pbuf;
6940 return BM_CORRUPTED;
6941 }
6942
6943 /* check if buffer size is identical */
6944 if (pheader->size != buffer_size) {
6945 cm_msg(MINFO, "bm_open_buffer", "Buffer \"%s\" requested size %d differs from existing size %d",
6946 buffer_name, buffer_size, pheader->size);
6947
6948 buffer_size = pheader->size;
6949
6950 ss_shm_close(buffer_name, p, shm_size, shm_handle, FALSE);
6951
6952 status = ss_shm_open(buffer_name, sizeof(BUFFER_HEADER) + buffer_size, &p, &shm_size, &shm_handle, FALSE);
6953
6954 if (status != SS_SUCCESS) {
6955 *buffer_handle = 0;
6956 pbuf_guard.unlock();
6957 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6958 delete pbuf;
6959 return BM_NO_SHM;
6960 }
6961
6962 pbuf->buffer_header = (BUFFER_HEADER *) p;
6963 pheader = pbuf->buffer_header;
6964 }
6965 }
6966
6967 /* shared memory is good from here down */
6968
6969 pbuf->attached = true;
6970
6971 pbuf->shm_handle = shm_handle;
6972 pbuf->shm_size = shm_size;
6973 pbuf->callback = FALSE;
6974
6975 bm_cleanup_buffer_locked(pbuf, "bm_open_buffer", ss_millitime());
6976
6978 if (status != BM_SUCCESS) {
6979 cm_msg(MERROR, "bm_open_buffer",
6980 "buffer \'%s\' is corrupted, bm_validate_buffer() status %d, calling bm_reset_buffer()...", buffer_name,
6981 status);
6983 cm_msg(MINFO, "bm_open_buffer", "buffer \'%s\' was reset, all buffered events were lost", buffer_name);
6984 }
6985
6986 /* add our client BUFFER_HEADER */
6987
6988 int iclient = 0;
6989 for (; iclient < MAX_CLIENTS; iclient++)
6990 if (pheader->client[iclient].pid == 0)
6991 break;
6992
6993 if (iclient == MAX_CLIENTS) {
6994 *buffer_handle = 0;
 6995 // unlock before calling cm_msg(). if we are SYSMSG, we will deadlock. K.O.
6996 pbuf_guard.unlock();
6997 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6998 delete pbuf;
6999 cm_msg(MERROR, "bm_open_buffer", "buffer \'%s\' maximum number of clients %d exceeded", buffer_name, MAX_CLIENTS);
7000 return BM_NO_SLOT;
7001 }
7002
7003 /* store slot index in _buffer structure */
7004 pbuf->client_index = iclient;
7005
7006 /*
7007 Save the index of the last client of that buffer so that later only
7008 the clients 0..max_client_index-1 have to be searched through.
7009 */
7010 pheader->num_clients++;
7011 if (iclient + 1 > pheader->max_client_index)
7012 pheader->max_client_index = iclient + 1;
7013
7014 /* setup buffer header and client structure */
7015 BUFFER_CLIENT *pclient = &pheader->client[iclient];
7016
7017 memset(pclient, 0, sizeof(BUFFER_CLIENT));
7018
7019 mstrlcpy(pclient->name, client_name.c_str(), sizeof(pclient->name));
7020
7021 pclient->pid = ss_getpid();
7022
7024
7025 pclient->read_pointer = pheader->write_pointer;
7026 pclient->last_activity = ss_millitime();
7027
7029
7030 pbuf_guard.unlock();
7031
7032 /* shared memory is not locked from here down, do not touch pheader and pbuf->buffer_header! */
7033
7034 pheader = NULL;
7035
7036 /* we are not holding any locks from here down, but other threads cannot see this pbuf yet */
7037
7040
7041 /* add pbuf to buffer list */
7042
7043 gBuffersMutex.lock();
7044
7045 bool added = false;
7046 for (size_t i=0; i<gBuffers.size(); i++) {
7047 if (gBuffers[i] == NULL) {
7048 gBuffers[i] = pbuf;
7049 added = true;
7050 *buffer_handle = i+1;
7051 break;
7052 }
7053 }
7054 if (!added) {
7055 *buffer_handle = gBuffers.size() + 1;
7056 gBuffers.push_back(pbuf);
7057 }
7058
7059 /* from here down we should not touch pbuf without locking it */
7060
7061 pbuf = NULL;
7062
7063 gBuffersMutex.unlock();
7064
7065 /* new buffer is now ready for use */
7066
7067 /* initialize buffer counters */
7068 bm_init_buffer_counters(*buffer_handle);
7069
7070 bm_cleanup("bm_open_buffer", ss_millitime(), FALSE);
7071
7072 if (shm_created)
7073 return BM_CREATED;
7074 }
7075#endif /* LOCAL_ROUTINES */
7076
7077 return BM_SUCCESS;
7078}
static void bm_cleanup_buffer_locked(BUFFER *pbuf, const char *who, DWORD actual_time)
Definition midas.cxx:6084
static DWORD _bm_max_event_size
Definition midas.cxx:5932
static void bm_clear_buffer_statistics(HNDLE hDB, BUFFER *pbuf)
Definition midas.cxx:6426
static void bm_reset_buffer_locked(BUFFER *pbuf)
Definition midas.cxx:6409
INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout)
Definition midas.cxx:3341
std::string cm_get_client_name()
Definition midas.cxx:2075
static void bm_cleanup(const char *who, DWORD actual_time, BOOL wrong_interval)
Definition midas.cxx:6170
#define BM_NO_SLOT
Definition midas.h:610
#define BM_NO_SHM
Definition midas.h:622
#define BM_CREATED
Definition midas.h:606
#define BM_NO_SEMAPHORE
Definition midas.h:611
#define DB_SUCCESS
Definition midas.h:632
#define SS_SUCCESS
Definition midas.h:664
#define SS_CREATED
Definition midas.h:665
#define SUCCESS
Definition mcstd.h:54
INT ss_semaphore_create(const char *name, HNDLE *semaphore_handle)
Definition system.cxx:2532
INT ss_getpid(void)
Definition system.cxx:1379
INT ss_shm_open(const char *name, INT size, void **adr, size_t *shm_size, HNDLE *handle, BOOL get_size)
Definition system.cxx:326
midas_thread_t ss_gettid(void)
Definition system.cxx:1591
INT ss_suspend_get_buffer_port(midas_thread_t thread_id, INT *port)
Definition system.cxx:4425
INT db_get_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, void *data, INT *buf_size, DWORD type, BOOL create)
Definition odb.cxx:5185
#define RPC_BM_OPEN_BUFFER
Definition mrpc.h:36
INT bm_init_buffer_counters(INT buffer_handle)
Definition midas.cxx:8074
#define DEFAULT_MAX_EVENT_SIZE
Definition midas.h:254
#define NAME_LENGTH
Definition midas.h:272
INT client_index
Definition midas.h:990
int client_count_write_wait[MAX_CLIENTS]
Definition midas.h:1023
DWORD client_time_write_wait[MAX_CLIENTS]
Definition midas.h:1024
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_peek_buffer_locked()

static int bm_peek_buffer_locked ( BUFFER pbuf,
BUFFER_HEADER pheader,
BUFFER_CLIENT pc,
EVENT_HEADER **  ppevent,
int *  pevent_size,
int *  ptotal_size 
)
static

Definition at line 8902 of file midas.cxx.

8903{
8904 if (pc->read_pointer == pheader->write_pointer) {
8905 /* no more events buffered for this client */
8906 if (!pc->read_wait) {
8907 //printf("bm_peek_buffer_locked: buffer [%s] client [%s], set read_wait!\n", pheader->name, pc->name);
8908 pc->read_wait = TRUE;
8909 }
8910 return BM_ASYNC_RETURN;
8911 }
8912
8913 if (pc->read_wait) {
8914 //printf("bm_peek_buffer_locked: buffer [%s] client [%s], clear read_wait!\n", pheader->name, pc->name);
8915 pc->read_wait = FALSE;
8916 }
8917
8918 if ((pc->read_pointer < 0) || (pc->read_pointer >= pheader->size)) {
8919 cm_msg(MERROR, "bm_peek_buffer_locked", "event buffer \"%s\" is corrupted: client \"%s\" read pointer %d is invalid. buffer read pointer %d, write pointer %d, size %d", pheader->name, pc->name, pc->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8920 return BM_CORRUPTED;
8921 }
8922
8923 char *pdata = (char *) (pheader + 1);
8924
8925 EVENT_HEADER *pevent = (EVENT_HEADER *) (pdata + pc->read_pointer);
8926 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
8927 int total_size = ALIGN8(event_size);
8928
8929 if ((total_size <= 0) || (total_size > pheader->size)) {
8930 cm_msg(MERROR, "bm_peek_buffer_locked", "event buffer \"%s\" is corrupted: client \"%s\" read pointer %d points to invalid event: data_size %d, event_size %d, total_size %d. buffer size: %d, read_pointer: %d, write_pointer: %d", pheader->name, pc->name, pc->read_pointer, pevent->data_size, event_size, total_size, pheader->size, pheader->read_pointer, pheader->write_pointer);
8931 return BM_CORRUPTED;
8932 }
8933
8934 assert(total_size > 0);
8935 assert(total_size <= pheader->size);
8936
8937 if (ppevent)
8938 *ppevent = pevent;
8939 if (pevent_size)
8940 *pevent_size = event_size;
8941 if (ptotal_size)
8942 *ptotal_size = total_size;
8943
8944 return BM_SUCCESS;
8945}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_peek_read_cache_locked()

static BOOL bm_peek_read_cache_locked ( BUFFER pbuf,
EVENT_HEADER **  ppevent,
int *  pevent_size,
int *  ptotal_size 
)
static

Definition at line 8876 of file midas.cxx.

8877{
8878 if (pbuf->read_cache_rp == pbuf->read_cache_wp)
8879 return FALSE;
8880
8881 EVENT_HEADER *pevent = (EVENT_HEADER *) (pbuf->read_cache + pbuf->read_cache_rp);
8882 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
8883 int total_size = ALIGN8(event_size);
8884
8885 if (ppevent)
8886 *ppevent = pevent;
8887 if (pevent_size)
8888 *pevent_size = event_size;
8889 if (ptotal_size)
8890 *ptotal_size = total_size;
8891
8892 return TRUE;
8893}
Here is the caller graph for this function:

◆ bm_poll_event()

INT bm_poll_event ( void  )

Definition at line 11265 of file midas.cxx.

11279{
11280 BOOL dispatched_something = FALSE;
11281
11282 //printf("bm_poll_event!\n");
11283
11284 DWORD start_time = ss_millitime();
11285
11286 std::vector<char> vec;
11287
11288 /* loop over all requests */
11289 _request_list_mutex.lock();
11290 bool locked = true;
11291 size_t n = _request_list.size();
11292 for (size_t i = 0; i < n; i++) {
11293 if (!locked) {
11294 _request_list_mutex.lock();
11295 locked = true;
11296 }
11297 /* continue if no dispatcher set (manual bm_receive_event) */
11298 if (_request_list[i].dispatcher == NULL)
11299 continue;
11300
11301 int buffer_handle = _request_list[i].buffer_handle;
11302
11303 /* must release the lock on the request list: user provided r.dispatcher() can add or remove event requests, and we will deadlock. K.O. */
11304 _request_list_mutex.unlock();
11305 locked = false;
11306
11307 do {
11308 /* receive event */
11309 int status = bm_receive_event_vec(buffer_handle, &vec, BM_NO_WAIT);
11310
11311 //printf("bm_poll_event: request_id %d, buffer_handle %d, bm_receive_event(BM_NO_WAIT) status %d, vec size %d, capacity %d\n", request_id, buffer_handle, status, (int)vec.size(), (int)vec.capacity());
11312
11313 /* call user function if successful */
11314 if (status == BM_SUCCESS) {
11315 bm_dispatch_event(buffer_handle, (EVENT_HEADER*)vec.data());
11316 dispatched_something = TRUE;
11317 }
11318
11319 /* break if no more events */
11320 if (status == BM_ASYNC_RETURN)
11321 break;
11322
11323 /* break if corrupted event buffer */
11324 if (status == BM_TRUNCATED) {
11325 cm_msg(MERROR, "bm_poll_event", "received event was truncated, buffer size %d is too small, see messages and increase /Experiment/MAX_EVENT_SIZE in ODB", (int)vec.size());
11326 }
11327
11328 /* break if corrupted event buffer */
11329 if (status == BM_CORRUPTED)
11330 return SS_ABORT;
11331
11332 /* break if server died */
11333 if (status == RPC_NET_ERROR) {
11334 return SS_ABORT;
11335 }
11336
11337 /* stop after one second */
11338 if (ss_millitime() - start_time > 1000) {
11339 break;
11340 }
11341
11342 } while (TRUE);
11343 }
11344
11345 if (locked)
11346 _request_list_mutex.unlock();
11347
11348 if (dispatched_something)
11349 return BM_SUCCESS;
11350 else
11351 return BM_ASYNC_RETURN;
11352}
INT bm_receive_event_vec(INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
Definition midas.cxx:10948
static void bm_dispatch_event(int buffer_handle, EVENT_HEADER *pevent)
Definition midas.cxx:8834
#define BM_TRUNCATED
Definition midas.h:614
#define RPC_NET_ERROR
Definition midas.h:702
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_push_buffer()

static INT bm_push_buffer ( BUFFER pbuf,
int  buffer_handle 
)
static

Check a buffer if an event is available and call the dispatch function if found.

Parameters
buffer_name  Name of buffer
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_TRUNCATED, BM_ASYNC_RETURN, RPC_NET_ERROR

Definition at line 11041 of file midas.cxx.

11041 {
11042 //printf("bm_push_buffer: buffer [%s], handle %d, callback %d\n", pbuf->buffer_header->name, buffer_handle, pbuf->callback);
11043
11044 /* return immediately if no callback routine is defined */
11045 if (!pbuf->callback)
11046 return BM_SUCCESS;
11047
11048 return bm_read_buffer(pbuf, buffer_handle, NULL, NULL, NULL, NULL, BM_NO_WAIT, 0, TRUE);
11049}
static INT bm_read_buffer(BUFFER *pbuf, INT buffer_handle, void **bufptr, void *buf, INT *buf_size, std::vector< char > *vecptr, int timeout_msec, int convert_flags, BOOL dispatch)
Definition midas.cxx:10290
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_read_buffer()

static INT bm_read_buffer ( BUFFER pbuf,
INT  buffer_handle,
void **  bufptr,
void *  buf,
INT buf_size,
std::vector< char > *  vecptr,
int  timeout_msec,
int  convert_flags,
BOOL  dispatch 
)
static

Definition at line 10290 of file midas.cxx.

10290 {
10292
10293 int max_size = 0;
10294 if (buf_size) {
10295 max_size = *buf_size;
10296 *buf_size = 0;
10297 }
10298
10299 //printf("bm_read_buffer: [%s] timeout %d, conv %d, ptr %p, buf %p, disp %d\n", pbuf->buffer_name, timeout_msec, convert_flags, bufptr, buf, dispatch);
10300
10301 bm_lock_buffer_guard pbuf_guard(pbuf, true); // buffer is not locked
10302
10303 // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
10304
10305 /* look if there is anything in the cache */
10306 if (pbuf->read_cache_size > 0) {
10307
10309
10310 if (status != BM_SUCCESS)
10311 return status;
10312
10313 if (pbuf->read_cache_wp == 0) {
10314
10315 // lock buffer for the first time
10316
10317 if (!pbuf_guard.relock()) {
10318 pbuf->read_cache_mutex.unlock();
10319 return pbuf_guard.get_status();
10320 }
10321
10322 status = bm_fill_read_cache_locked(pbuf_guard, timeout_msec);
10323 if (status != BM_SUCCESS) {
10324 // unlock in correct order
10325 if (pbuf_guard.is_locked()) {
10326 // check if bm_wait_for_more_events() failed to relock the buffer
10327 pbuf_guard.unlock();
10328 }
10329 pbuf->read_cache_mutex.unlock();
10330 return status;
10331 }
10332
10333 // buffer remains locked here
10334 }
10335 EVENT_HEADER *pevent;
10336 int event_size;
10337 int total_size;
10338 if (bm_peek_read_cache_locked(pbuf, &pevent, &event_size, &total_size)) {
10339 if (pbuf_guard.is_locked()) {
10340 // do not need to keep the event buffer locked
10341 // when reading from the read cache
10342 pbuf_guard.unlock();
10343 }
10344 //printf("bm_read_buffer: [%s] async %d, conv %d, ptr %p, buf %p, disp %d, total_size %d, read from cache %d %d %d\n", pbuf->buffer_name, async_flag, convert_flags, bufptr, buf, dispatch, total_size, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
10346 if (buf) {
10347 if (event_size > max_size) {
10348 cm_msg(MERROR, "bm_read_buffer", "buffer size %d is smaller than event size %d, event truncated. buffer \"%s\"", max_size, event_size, pbuf->buffer_name);
10349 event_size = max_size;
10351 }
10352
10353 memcpy(buf, pevent, event_size);
10354
10355 if (buf_size) {
10356 *buf_size = event_size;
10357 }
10358 if (convert_flags) {
10359 bm_convert_event_header((EVENT_HEADER *) buf, convert_flags);
10360 }
10361 } else if (bufptr) {
10362 *bufptr = malloc(event_size);
10363 memcpy(*bufptr, pevent, event_size);
10365 } else if (vecptr) {
10366 vecptr->resize(0);
10367 char* cptr = (char*)pevent;
10368 vecptr->assign(cptr, cptr+event_size);
10369 }
10370 bm_incr_read_cache_locked(pbuf, total_size);
10371 pbuf->read_cache_mutex.unlock();
10372 if (dispatch) {
10373 // FIXME need to protect currently dispatched event against
10374 // another thread overwriting it by refilling the read cache
10375 bm_dispatch_event(buffer_handle, pevent);
10376 return BM_MORE_EVENTS;
10377 }
10378 // buffer is unlocked here
10379 return status;
10380 }
10381 pbuf->read_cache_mutex.unlock();
10382 }
10383
10384 /* we come here if the read cache is disabled */
10385 /* we come here if the next event is too big to fit into the read cache */
10386
10387 if (!pbuf_guard.is_locked()) {
10388 if (!pbuf_guard.relock())
10389 return pbuf_guard.get_status();
10390 }
10391
10392 EVENT_HEADER *event_buffer = NULL;
10393
10394 BUFFER_HEADER *pheader = pbuf->buffer_header;
10395
10396 BUFFER_CLIENT *pc = bm_get_my_client_locked(pbuf_guard);
10397
10398 while (1) {
10399 /* loop over events in the event buffer */
10400
10401 status = bm_wait_for_more_events_locked(pbuf_guard, pc, timeout_msec, FALSE);
10402
10403 if (status != BM_SUCCESS) {
10404 // implicit unlock
10405 return status;
10406 }
10407
10408 /* check if event at current read pointer matches a request */
10409
10410 EVENT_HEADER *pevent;
10411 int event_size;
10412 int total_size;
10413
10414 status = bm_peek_buffer_locked(pbuf, pheader, pc, &pevent, &event_size, &total_size);
10415 if (status == BM_CORRUPTED) {
10416 // implicit unlock
10417 return status;
10418 } else if (status != BM_SUCCESS) {
10419 /* event buffer is empty */
10420 break;
10421 }
10422
10423 BOOL is_requested = bm_check_requests(pc, pevent);
10424
10425 if (is_requested) {
10426 //printf("bm_read_buffer: [%s] async %d, conv %d, ptr %p, buf %p, disp %d, total_size %d, read from buffer, cache %d %d %d\n", pheader->name, async_flag, convert_flags, bufptr, buf, dispatch, total_size, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
10427
10429
10430 if (buf) {
10431 if (event_size > max_size) {
10432 cm_msg(MERROR, "bm_read_buffer",
10433 "buffer size %d is smaller than event size %d, event truncated. buffer \"%s\"", max_size,
10434 event_size, pheader->name);
10435 event_size = max_size;
10437 }
10438
10439 bm_read_from_buffer_locked(pheader, pc->read_pointer, (char *) buf, event_size);
10440
10441 if (buf_size) {
10442 *buf_size = event_size;
10443 }
10444
10445 if (convert_flags) {
10446 bm_convert_event_header((EVENT_HEADER *) buf, convert_flags);
10447 }
10448
10449 pbuf->count_read++;
10450 pbuf->bytes_read += event_size;
10451 } else if (dispatch || bufptr) {
10452 assert(event_buffer == NULL); // make sure we only come here once
10453 event_buffer = (EVENT_HEADER *) malloc(event_size);
10455 pbuf->count_read++;
10456 pbuf->bytes_read += event_size;
10457 } else if (vecptr) {
10458 bm_read_from_buffer_locked(pheader, pc->read_pointer, vecptr, event_size);
10459 pbuf->count_read++;
10460 pbuf->bytes_read += event_size;
10461 }
10462
10463 int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
10464 pc->read_pointer = new_read_pointer;
10465
10466 pheader->num_out_events++;
10467 /* exit loop over events */
10468 break;
10469 }
10470
10471 int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
10472 pc->read_pointer = new_read_pointer;
10473 pheader->num_out_events++;
10474 }
10475
10476 /*
10477 If read pointer has been changed, it may have freed up some space
10478 for waiting producers. So check if free space is now more than 50%
10479 of the buffer size and wake waiting producers.
10480 */
10481
10482 bm_wakeup_producers_locked(pheader, pc);
10483
10484 pbuf_guard.unlock();
10485
10486 if (dispatch && event_buffer) {
10487 bm_dispatch_event(buffer_handle, event_buffer);
10488 free(event_buffer);
10489 event_buffer = NULL;
10490 return BM_MORE_EVENTS;
10491 }
10492
10493 if (bufptr && event_buffer) {
10494 *bufptr = event_buffer;
10495 event_buffer = NULL;
10497 }
10498
10499 if (event_buffer) {
10500 free(event_buffer);
10501 event_buffer = NULL;
10502 }
10503
10504 return status;
10505}
static void bm_convert_event_header(EVENT_HEADER *pevent, int convert_flags)
Definition midas.cxx:9080
static int bm_fill_read_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
Definition midas.cxx:9003
static BOOL bm_peek_read_cache_locked(BUFFER *pbuf, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
Definition midas.cxx:8876
static void bm_incr_read_cache_locked(BUFFER *pbuf, int total_size)
Definition midas.cxx:8866
void * event_buffer
Definition mfe.cxx:65
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_read_from_buffer_locked() [1/2]

static void bm_read_from_buffer_locked ( const BUFFER_HEADER pheader,
int  rp,
char *  buf,
int  event_size 
)
static

Definition at line 8947 of file midas.cxx.

8948{
8949 const char *pdata = (const char *) (pheader + 1);
8950
8951 if (rp + event_size <= pheader->size) {
8952 /* copy event to cache */
8953 memcpy(buf, pdata + rp, event_size);
8954 } else {
 8955 /* event is split */
8956 int size = pheader->size - rp;
8957 memcpy(buf, pdata + rp, size);
8958 memcpy(buf + size, pdata, event_size - size);
8959 }
8960}
Here is the caller graph for this function:

◆ bm_read_from_buffer_locked() [2/2]

static void bm_read_from_buffer_locked ( const BUFFER_HEADER pheader,
int  rp,
std::vector< char > *  vecptr,
int  event_size 
)
static

Definition at line 8962 of file midas.cxx.

8963{
8964 const char *pdata = (const char *) (pheader + 1);
8965
8966 if (rp + event_size <= pheader->size) {
8967 /* copy event to cache */
8968 vecptr->assign(pdata + rp, pdata + rp + event_size);
8969 } else {
 8970 /* event is split */
8971 int size = pheader->size - rp;
8972 vecptr->assign(pdata + rp, pdata + rp + size);
8973 vecptr->insert(vecptr->end(), pdata, pdata + event_size - size);
8974 }
8975}

◆ bm_receive_event()

INT bm_receive_event ( INT  buffer_handle,
void *  destination,
INT buf_size,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could, for example, contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return back to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
HNDLE hbuf;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
bm_request_event(hbuf, 1, TRIGGER_ALL, GET_ALL, request_id, NULL);
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
INT bm_receive_event(INT buffer_handle, void *destination, INT *buf_size, int timeout_msec)
Definition midas.cxx:10789
Parameters
buffer_handle  buffer handle
destination  destination address where event is written to
buf_size  size of destination buffer on input, size of event plus header on return.
timeout_msec  Wait so many milliseconds for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_TRUNCATED The event is larger than the destination buffer and was therefore truncated
BM_ASYNC_RETURN No event available

Definition at line 10789 of file midas.cxx.

10789 {
10790 //printf("bm_receive_event: handle %d, async %d\n", buffer_handle, async_flag);
10791 if (rpc_is_remote()) {
10792 return bm_receive_event_rpc(buffer_handle, destination, buf_size, NULL, NULL, timeout_msec);
10793 }
10794#ifdef LOCAL_ROUTINES
10795 {
10797
10798 BUFFER *pbuf = bm_get_buffer("bm_receive_event", buffer_handle, &status);
10799
10800 if (!pbuf)
10801 return status;
10802
10803 int convert_flags = rpc_get_convert_flags();
10804
10805 status = bm_read_buffer(pbuf, buffer_handle, NULL, destination, buf_size, NULL, timeout_msec, convert_flags, FALSE);
10806 //printf("bm_receive_event: handle %d, async %d, status %d, size %d\n", buffer_handle, async_flag, status, *buf_size);
10807 return status;
10808 }
10809#else /* LOCAL_ROUTINES */
10810
10811 return BM_SUCCESS;
10812#endif
10813}
static INT bm_receive_event_rpc(INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
Definition midas.cxx:10509
INT rpc_get_convert_flags(void)
Definition midas.cxx:13161
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_alloc()

INT bm_receive_event_alloc ( INT  buffer_handle,
EVENT_HEADER **  ppevent,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could, for example, contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return back to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
HNDLE hbuf;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
bm_request_event(hbuf, 1, TRIGGER_ALL, GET_ALL, request_id, NULL);
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
Parameters
buffer_handle  buffer handle
ppevent  pointer to the received event pointer, event pointer should be free()ed to avoid memory leak
timeout_msec  Wait so many milliseconds for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN No event available

Definition at line 10870 of file midas.cxx.

10870 {
10871 if (rpc_is_remote()) {
10872 return bm_receive_event_rpc(buffer_handle, NULL, NULL, ppevent, NULL, timeout_msec);
10873 }
10874#ifdef LOCAL_ROUTINES
10875 {
10877
10878 BUFFER *pbuf = bm_get_buffer("bm_receive_event_alloc", buffer_handle, &status);
10879
10880 if (!pbuf)
10881 return status;
10882
10883 int convert_flags = rpc_get_convert_flags();
10884
10885 return bm_read_buffer(pbuf, buffer_handle, (void **) ppevent, NULL, NULL, NULL, timeout_msec, convert_flags, FALSE);
10886 }
10887#else /* LOCAL_ROUTINES */
10888
10889 return BM_SUCCESS;
10890#endif
10891}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_rpc()

static INT bm_receive_event_rpc ( INT  buffer_handle,
void *  buf,
int *  buf_size,
EVENT_HEADER **  ppevent,
std::vector< char > *  pvec,
int  timeout_msec 
)
static

Definition at line 10509 of file midas.cxx.

10510{
10511 //printf("bm_receive_event_rpc: handle %d, buf %p, pevent %p, pvec %p, timeout %d, max_event_size %d\n", buffer_handle, buf, ppevent, pvec, timeout_msec, _bm_max_event_size);
10512
10513 assert(_bm_max_event_size > sizeof(EVENT_HEADER));
10514
10515 void *xbuf = NULL;
10516 int xbuf_size = 0;
10517
10518 if (buf) {
10519 xbuf = buf;
10520 xbuf_size = *buf_size;
10521 } else if (ppevent) {
10522 *ppevent = (EVENT_HEADER*)malloc(_bm_max_event_size);
10523 xbuf_size = _bm_max_event_size;
10524 } else if (pvec) {
10525 pvec->resize(_bm_max_event_size);
10526 xbuf = pvec->data();
10527 xbuf_size = pvec->size();
10528 } else {
10529 assert(!"incorrect call to bm_receivent_event_rpc()");
10530 }
10531
10532 int status;
10533 DWORD time_start = ss_millitime();
10534 DWORD time_end = time_start + timeout_msec;
10535
10536 int xtimeout_msec = timeout_msec;
10537
10538 int zbuf_size = xbuf_size;
10539
10540 while (1) {
10541 if (timeout_msec == BM_WAIT) {
10542 xtimeout_msec = 1000;
10543 } else if (timeout_msec == BM_NO_WAIT) {
10544 xtimeout_msec = BM_NO_WAIT;
10545 } else {
10546 if (xtimeout_msec > 1000) {
10547 xtimeout_msec = 1000;
10548 }
10549 }
10550
10551 zbuf_size = xbuf_size;
10552
10553 status = rpc_call(RPC_BM_RECEIVE_EVENT, buffer_handle, xbuf, &zbuf_size, xtimeout_msec);
10554
10555 //printf("bm_receive_event_rpc: handle %d, timeout %d, status %d, size %d in, %d out, via RPC_BM_RECEIVE_EVENT\n", buffer_handle, xtimeout_msec, status, xbuf_size, zbuf_size);
10556
10557 if (status == BM_ASYNC_RETURN) {
10558 if (timeout_msec == BM_WAIT) {
10559 // BM_WAIT means wait forever
10560 continue;
10561 } else if (timeout_msec == BM_NO_WAIT) {
10562 // BM_NO_WAIT means do not wait
10563 break;
10564 } else {
10565 DWORD now = ss_millitime();
10566 if (now >= time_end) {
10567 // timeout, return BM_ASYNC_RETURN
10568 break;
10569 }
10570
10571 DWORD remain = time_end - now;
10572
10573 if (remain < (DWORD)xtimeout_msec) {
10574 xtimeout_msec = remain;
10575 }
10576
10577 // keep asking for event...
10578 continue;
10579 }
10580 } else if (status == BM_SUCCESS) {
10581 // success, return BM_SUCCESS
10582 break;
10583 }
10584
10585 // RPC error
10586
10587 if (buf) {
10588 *buf_size = 0;
10589 } else if (ppevent) {
10590 free(*ppevent);
10591 *ppevent = NULL;
10592 } else if (pvec) {
10593 pvec->resize(0);
10594 } else {
10595 assert(!"incorrect call to bm_receivent_event_rpc()");
10596 }
10597
10598 return status;
10599 }
10600
10601 // status is BM_SUCCESS or BM_ASYNC_RETURN
10602
10603 if (buf) {
10604 *buf_size = zbuf_size;
10605 } else if (ppevent) {
10606 // nothing to do
10607 // ppevent = realloc(ppevent, xbuf_size); // shrink memory allocation
10608 } else if (pvec) {
10609 pvec->resize(zbuf_size);
10610 } else {
10611 assert(!"incorrect call to bm_receivent_event_rpc()");
10612 }
10613
10614 return status;
10615}
#define RPC_BM_RECEIVE_EVENT
Definition mrpc.h:47
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_rpc_cxx()

static INT bm_receive_event_rpc_cxx ( INT  buffer_handle,
void *  buf,
int *  buf_size,
EVENT_HEADER **  ppevent,
std::vector< char > *  pvec,
int  timeout_msec 
)
static

Definition at line 10617 of file midas.cxx.

10618{
10619 //printf("bm_receive_event_rpc_cxx: handle %d, buf %p, pevent %p, pvec %p, timeout %d, max_event_size %d\n", buffer_handle, buf, ppevent, pvec, timeout_msec, _bm_max_event_size);
10620
10621 std::vector<char> *pv;
10622
10623 if (pvec == NULL)
10624 pv = new std::vector<char>;
10625 else
10626 pv = pvec;
10627
10628 pv->clear();
10629
10630 int status;
10631 DWORD time_start = ss_millitime();
10632 DWORD time_end = time_start + timeout_msec;
10633
10634 int xtimeout_msec = timeout_msec;
10635
10636 while (1) {
10637 if (timeout_msec == BM_WAIT) {
10638 xtimeout_msec = 1000;
10639 } else if (timeout_msec == BM_NO_WAIT) {
10640 xtimeout_msec = BM_NO_WAIT;
10641 } else {
10642 if (xtimeout_msec > 1000) {
10643 xtimeout_msec = 1000;
10644 }
10645 }
10646
10647 status = rpc_call(RPC_BM_RECEIVE_EVENT_CXX, buffer_handle, pv, xtimeout_msec);
10648
10649 printf("bm_receive_event_rpc_cxx: handle %d, timeout %d, status %d, size %zu, via RPC_BM_RECEIVE_EVENT_CXX\n", buffer_handle, xtimeout_msec, status, pv->size());
10650
10651 if (status == BM_ASYNC_RETURN) {
10652 if (timeout_msec == BM_WAIT) {
10653 // BM_WAIT means wait forever
10654 continue;
10655 } else if (timeout_msec == BM_NO_WAIT) {
10656 // BM_NO_WAIT means do not wait
10657 break;
10658 } else {
10659 DWORD now = ss_millitime();
10660 if (now >= time_end) {
10661 // timeout, return BM_ASYNC_RETURN
10662 break;
10663 }
10664
10665 DWORD remain = time_end - now;
10666
10667 if (remain < (DWORD)xtimeout_msec) {
10668 xtimeout_msec = remain;
10669 }
10670
10671 // keep asking for event...
10672 continue;
10673 }
10674 } else if (status == BM_SUCCESS) {
10675 // success, return BM_SUCCESS
10676 break;
10677 }
10678
10679 // RPC error
10680
10681 if (buf) {
10682 *buf_size = 0;
10683 } else if (ppevent) {
10684 free(*ppevent);
10685 *ppevent = NULL;
10686 } else if (pvec) {
10687 pvec->clear();
10688 } else {
10689 assert(!"incorrect call to bm_receivent_event_rpc_cxx()");
10690 }
10691
10692 if (pvec == NULL)
10693 delete pv;
10694
10695 return status;
10696 }
10697
10698 // status is BM_SUCCESS or BM_ASYNC_RETURN
10699
10700 if (buf) {
10701 if (pv->size() > (size_t)*buf_size) {
10703 memcpy(buf, pv->data(), *buf_size);
10704 } else {
10705 *buf_size = pv->size();
10706 memcpy(buf, pv->data(), *buf_size);
10707 }
10708 } else if (ppevent) {
10709 if (*ppevent == NULL) {
10710 *ppevent = (EVENT_HEADER*)malloc(pv->size());
10711 assert(*ppevent != NULL);
10712 memcpy(*ppevent, pv->data(), pv->size());
10713 } else {
10714 *ppevent = (EVENT_HEADER*)realloc(*ppevent, pv->size()); // shrink memory allocation
10715 assert(*ppevent != NULL);
10716 memcpy(*ppevent, pv->data(), pv->size());
10717 }
10718 } else if (pvec) {
10719 // nothing to do
10720 } else {
10721 assert(!"incorrect call to bm_receivent_event_rpc()");
10722 }
10723
10724 if (!pvec)
10725 delete pv;
10726
10727 return status;
10728}
#define RPC_BM_RECEIVE_EVENT_CXX
Definition mrpc.h:51
Here is the call graph for this function:

◆ bm_receive_event_vec()

INT bm_receive_event_vec ( INT  buffer_handle,
std::vector< char > *  pvec,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could, for example, contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
HNDLE hbuf;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
bm_request_event(hbuf, 1, TRIGGER_ALL, GET_ALL, request_id, NULL);
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
Parameters
buffer_handle buffer handle
ppevent pointer to the received event pointer; the event pointer should be free()ed to avoid a memory leak
timeout_msec Wait so many milliseconds for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN No event available

Definition at line 10948 of file midas.cxx.

10948 {
10949 if (rpc_is_remote()) {
10950 return bm_receive_event_rpc(buffer_handle, NULL, NULL, NULL, pvec, timeout_msec);
10951 }
10952#ifdef LOCAL_ROUTINES
10953 {
10955
10956 BUFFER *pbuf = bm_get_buffer("bm_receive_event_vec", buffer_handle, &status);
10957
10958 if (!pbuf)
10959 return status;
10960
10961 int convert_flags = rpc_get_convert_flags();
10962
10963 return bm_read_buffer(pbuf, buffer_handle, NULL, NULL, NULL, pvec, timeout_msec, convert_flags, FALSE);
10964 }
10965#else /* LOCAL_ROUTINES */
10966 return BM_SUCCESS;
10967#endif
10968}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_remove_client_locked()

void bm_remove_client_locked ( BUFFER_HEADER pheader,
int  j 
)

Called to forcibly disconnect given client from a data buffer

Definition at line 6053 of file midas.cxx.

6053 {
6054 int k, nc;
6055 BUFFER_CLIENT *pbctmp;
6056
6057 /* clear entry from client structure in buffer header */
6058 memset(&(pheader->client[j]), 0, sizeof(BUFFER_CLIENT));
6059
6060 /* calculate new max_client_index entry */
6061 for (k = MAX_CLIENTS - 1; k >= 0; k--)
6062 if (pheader->client[k].pid != 0)
6063 break;
6064 pheader->max_client_index = k + 1;
6065
6066 /* count new number of clients */
6067 for (k = MAX_CLIENTS - 1, nc = 0; k >= 0; k--)
6068 if (pheader->client[k].pid != 0)
6069 nc++;
6070 pheader->num_clients = nc;
6071
6072 /* check if anyone is waiting and wake him up */
6073 pbctmp = pheader->client;
6074
6075 for (k = 0; k < pheader->max_client_index; k++, pbctmp++)
6076 if (pbctmp->pid && (pbctmp->write_wait || pbctmp->read_wait))
6077 ss_resume(pbctmp->port, "B ");
6078}
INT k
Definition odbhist.cxx:40
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_remove_event_request()

INT bm_remove_event_request ( INT  buffer_handle,
INT  request_id 
)

Delete a previously placed request for a specific event type in the client structure of the buffer referenced by buffer_handle.

Parameters
buffer_handle Handle to the buffer in which the request should be placed
request_idRequest id returned by bm_request_event
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_NOT_FOUND, RPC_NET_ERROR

Definition at line 8529 of file midas.cxx.

8529 {
8530 if (rpc_is_remote())
8531 return rpc_call(RPC_BM_REMOVE_EVENT_REQUEST, buffer_handle, request_id);
8532
8533#ifdef LOCAL_ROUTINES
8534 {
8535 int status = 0;
8536
8537 BUFFER *pbuf = bm_get_buffer("bm_remove_event_request", buffer_handle, &status);
8538
8539 if (!pbuf)
8540 return status;
8541
8542 /* lock buffer */
8543 bm_lock_buffer_guard pbuf_guard(pbuf);
8544
8545 if (!pbuf_guard.is_locked())
8546 return pbuf_guard.get_status();
8547
8548 INT i, deleted;
8549
8550 /* get a pointer to the proper client structure */
8551 BUFFER_CLIENT *pclient = bm_get_my_client_locked(pbuf_guard);
8552
8553 /* check all requests and set to zero if matching */
8554 for (i = 0, deleted = 0; i < pclient->max_request_index; i++)
8555 if (pclient->event_request[i].valid && pclient->event_request[i].id == request_id) {
8556 memset(&pclient->event_request[i], 0, sizeof(EVENT_REQUEST));
8557 deleted++;
8558 }
8559
8560 /* calculate new max_request_index entry */
8561 for (i = MAX_EVENT_REQUESTS - 1; i >= 0; i--)
8562 if (pclient->event_request[i].valid)
8563 break;
8564
8565 pclient->max_request_index = i + 1;
8566
8567 /* calculate new all_flag */
8568 pclient->all_flag = FALSE;
8569
8570 for (i = 0; i < pclient->max_request_index; i++)
8571 if (pclient->event_request[i].valid && (pclient->event_request[i].sampling_type & GET_ALL)) {
8572 pclient->all_flag = TRUE;
8573 break;
8574 }
8575
8576 pbuf->get_all_flag = pclient->all_flag;
8577
8578 if (!deleted)
8579 return BM_NOT_FOUND;
8580 }
8581#endif /* LOCAL_ROUTINES */
8582
8583 return BM_SUCCESS;
8584}
#define RPC_BM_REMOVE_EVENT_REQUEST
Definition mrpc.h:44
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_request_event()

INT bm_request_event ( HNDLE  buffer_handle,
short int  event_id,
short int  trigger_mask,
INT  sampling_type,
HNDLE request_id,
EVENT_HANDLER func 
)

Place an event request based on certain characteristics. Multiple event requests can be placed for each buffer, which are later identified by their request ID. They can contain different callback routines. For an example see bm_open_buffer() and bm_receive_event()

Parameters
buffer_handle buffer handle obtained via bm_open_buffer()
event_id event ID for requested events. Use EVENTID_ALL to receive events with any ID.
trigger_mask trigger mask for requested events. The requested events must have at least one bit in their trigger mask common with the requested trigger mask. Use TRIGGER_ALL to receive events with any trigger mask.
sampling_type specifies how many events to receive. A value of GET_ALL receives all events which match the specified event ID and trigger mask. If the events are consumed slower than produced, the producer is automatically slowed down. A value of GET_NONBLOCKING receives as many events as possible without slowing down the producer. GET_ALL is typically used by the logger, while GET_NONBLOCKING is typically used by analyzers.
request_id request ID returned by the function. This ID is passed to the callback routine and must be used in the bm_delete_request() routine.
func callback routine which gets called when an event of the specified type is received.
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_NO_MEMORY too many requests. The value MAX_EVENT_REQUESTS in midas.h should be increased.

Definition at line 8476 of file midas.cxx.

8480{
8481 assert(request_id != NULL);
8482
8483 EventRequest r;
8484 r.buffer_handle = buffer_handle;
8485 r.event_id = event_id;
8487 r.dispatcher = func;
8488
8489 {
8490 std::lock_guard<std::mutex> guard(_request_list_mutex);
8491
8492 bool found = false;
8493
8494 // find deleted entry
8495 for (size_t i = 0; i < _request_list.size(); i++) {
8496 if (_request_list[i].buffer_handle == 0) {
8497 _request_list[i] = r;
8498 *request_id = i;
8499 found = true;
8500 break;
8501 }
8502 }
8503
8504 if (!found) { // not found
8505 *request_id = _request_list.size();
8506 _request_list.push_back(r);
8507 }
8508
8509 // implicit unlock()
8510 }
8511
8512 /* add request in buffer structure */
8513 int status = bm_add_event_request(buffer_handle, event_id, trigger_mask, sampling_type, func, *request_id);
8514 if (status != BM_SUCCESS)
8515 return status;
8516
8517 return BM_SUCCESS;
8518}
INT bm_add_event_request(INT buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, EVENT_HANDLER *func, INT request_id)
Definition midas.cxx:8325
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_reset_buffer_locked()

static void bm_reset_buffer_locked ( BUFFER pbuf)
static

Definition at line 6409 of file midas.cxx.

6409 {
6410 BUFFER_HEADER *pheader = pbuf->buffer_header;
6411
6412 //printf("bm_reset_buffer: buffer \"%s\"\n", pheader->name);
6413
6414 pheader->read_pointer = 0;
6415 pheader->write_pointer = 0;
6416
6417 int i;
6418 for (i = 0; i < pheader->max_client_index; i++) {
6419 BUFFER_CLIENT *pc = pheader->client + i;
6420 if (pc->pid) {
6421 pc->read_pointer = 0;
6422 }
6423 }
6424}
Here is the caller graph for this function:

◆ bm_send_event()

INT bm_send_event ( INT  buffer_handle,
const EVENT_HEADER pevent,
int  unused,
int  timeout_msec 
)

Definition at line 9689 of file midas.cxx.

9690{
9691 const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16); // event size computations are not 32-bit clean, limit event size to 2GB. K.O.
9692 const DWORD data_size = pevent->data_size; // 32-bit unsigned value
9693
9694 if (data_size == 0) {
9695 cm_msg(MERROR, "bm_send_event", "invalid event data size zero");
9696 return BM_INVALID_SIZE;
9697 }
9698
9699 if (data_size > MAX_DATA_SIZE) {
9700 cm_msg(MERROR, "bm_send_event", "invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
9701 return BM_INVALID_SIZE;
9702 }
9703
9704 const size_t event_size = sizeof(EVENT_HEADER) + data_size;
9705
9706 //printf("bm_send_event: pevent %p, data_size %d, event_size %d, buf_size %d\n", pevent, data_size, event_size, unused);
9707
9708 if (rpc_is_remote()) {
9709 //return bm_send_event_rpc(buffer_handle, pevent, event_size, timeout_msec);
9710 return rpc_send_event_sg(buffer_handle, 1, (char**)&pevent, &event_size);
9711 } else {
9712 return bm_send_event_sg(buffer_handle, 1, (char**)&pevent, &event_size, timeout_msec);
9713 }
9714}
int bm_send_event_sg(int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
Definition midas.cxx:9789
#define BM_INVALID_SIZE
Definition midas.h:624
INT rpc_send_event_sg(INT buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[])
Definition midas.cxx:14373
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_send_event_sg()

int bm_send_event_sg ( int  buffer_handle,
int  sg_n,
const char *const  sg_ptr[],
const size_t  sg_len[],
int  timeout_msec 
)

Sends an event to a buffer. This function checks if the buffer has enough space for the event, then copies the event to the buffer in shared memory. If clients have requests for the event, they are notified via a UDP packet.

char event[1000];
// create event with ID 1, trigger mask 0, size 100 bytes and serial number 1
bm_compose_event((EVENT_HEADER *) event, 1, 0, 100, 1);
// set first byte of event
*(event+sizeof(EVENT_HEADER)) = <...>
#include <stdio.h>
#include "midas.h"
{
HNDLE hbuf;
char event[1000];
status = cm_connect_experiment("", "Sample", "Producer", NULL);
return 1;
// create event with ID 1, trigger mask 0, size 100 bytes and serial number 1
bm_compose_event((EVENT_HEADER *) event, 1, 0, 100, 1);
// set event data
for (i=0 ; i<100 ; i++)
*(event+sizeof(EVENT_HEADER)+i) = i;
// send event
bm_send_event(hbuf, event, 100+sizeof(EVENT_HEADER), BM_WAIT);
return 0;
}
INT bm_send_event(INT buffer_handle, const EVENT_HEADER *pevent, int unused, int timeout_msec)
Definition midas.cxx:9689
Parameters
buffer_handle Buffer handle obtained via bm_open_buffer()
source Address of event buffer
buf_size Size of event including event header in bytes
timeout_msec Timeout waiting for free space in the event buffer. If BM_WAIT, wait forever. If BM_NO_WAIT, the function returns immediately with a value of BM_ASYNC_RETURN without writing the event to the buffer
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_INVALID_PARAM
BM_ASYNC_RETURN Routine called with timeout_msec == BM_NO_WAIT and buffer has not enough space to receive event
BM_NO_MEMORY Event is too large for network buffer or event buffer. One has to increase the event buffer size "/Experiment/Buffer sizes/SYSTEM" and/or /Experiment/MAX_EVENT_SIZE in ODB.

Definition at line 9789 of file midas.cxx.

9790{
9791 if (rpc_is_remote())
9792 return rpc_send_event_sg(buffer_handle, sg_n, sg_ptr, sg_len);
9793
9794 if (sg_n < 1) {
9795 cm_msg(MERROR, "bm_send_event", "invalid sg_n %d", sg_n);
9796 return BM_INVALID_SIZE;
9797 }
9798
9799 if (sg_ptr[0] == NULL) {
9800 cm_msg(MERROR, "bm_send_event", "invalid sg_ptr[0] is NULL");
9801 return BM_INVALID_SIZE;
9802 }
9803
9804 if (sg_len[0] < sizeof(EVENT_HEADER)) {
9805 cm_msg(MERROR, "bm_send_event", "invalid sg_len[0] value %d is smaller than event header size %d", (int)sg_len[0], (int)sizeof(EVENT_HEADER));
9806 return BM_INVALID_SIZE;
9807 }
9808
9809 const EVENT_HEADER* pevent = (const EVENT_HEADER*)sg_ptr[0];
9810
9811 const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16); // event size computations are not 32-bit clean, limit event size to 2GB. K.O.
9812 const DWORD data_size = pevent->data_size; // 32-bit unsigned value
9813
9814 if (data_size == 0) {
9815 cm_msg(MERROR, "bm_send_event", "invalid event data size zero");
9816 return BM_INVALID_SIZE;
9817 }
9818
9819 if (data_size > MAX_DATA_SIZE) {
9820 cm_msg(MERROR, "bm_send_event", "invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
9821 return BM_INVALID_SIZE;
9822 }
9823
9824 const size_t event_size = sizeof(EVENT_HEADER) + data_size;
9825
9826 size_t count = 0;
9827 for (int i=0; i<sg_n; i++) {
9828 count += sg_len[i];
9829 }
9830
9831 if (count != event_size) {
9832 cm_msg(MERROR, "bm_send_event", "data size mismatch: event data_size %d, event_size %d not same as sum of sg_len %d", (int)data_size, (int)event_size, (int)count);
9833 return BM_INVALID_SIZE;
9834 }
9835
9836 //printf("bm_send_event_sg: pevent %p, event_id 0x%04x, serial 0x%08x, data_size %d, event_size %d, total_size %d\n", pevent, pevent->event_id, pevent->serial_number, (int)pevent->data_size, (int)event_size, (int)total_size);
9837
9838#ifdef LOCAL_ROUTINES
9839 {
9840 int status = 0;
9841 const size_t total_size = ALIGN8(event_size);
9842
9843 BUFFER *pbuf = bm_get_buffer("bm_send_event_sg", buffer_handle, &status);
9844
9845 if (!pbuf)
9846 return status;
9847
9848 /* round up total_size to next DWORD boundary */
9849 //int total_size = ALIGN8(event_size);
9850
9851 /* check if write cache is enabled */
9852 if (pbuf->write_cache_size) {
9854
9855 if (status != BM_SUCCESS)
9856 return status;
9857
9858 /* check if write cache is enabled */
9859 if (pbuf->write_cache_size) {
9861 bool too_big = event_size > max_event_size;
9862
9863 //printf("bm_send_event: write %zu/%zu max %zu, cache size %zu, wp %zu\n", event_size, total_size, max_event_size, pbuf->write_cache_size.load(), pbuf->write_cache_wp);
9864
9865 /* if this event does not fit into the write cache, flush the write cache */
9866 if (pbuf->write_cache_wp > 0 && (pbuf->write_cache_wp + total_size > pbuf->write_cache_size || too_big)) {
9867 //printf("bm_send_event: write %zu/%zu but cache is full, size %zu, wp %zu\n", event_size, total_size, pbuf->write_cache_size.load(), pbuf->write_cache_wp);
9868
9869 bm_lock_buffer_guard pbuf_guard(pbuf);
9870
9871 if (!pbuf_guard.is_locked()) {
9872 pbuf->write_cache_mutex.unlock();
9873 return pbuf_guard.get_status();
9874 }
9875
9876 int status = bm_flush_cache_locked(pbuf_guard, timeout_msec);
9877
9878 if (pbuf_guard.is_locked()) {
9879 // check if bm_wait_for_free_space() failed to relock the buffer
9880 pbuf_guard.unlock();
9881 }
9882
9883 if (status != BM_SUCCESS) {
9884 pbuf->write_cache_mutex.unlock();
9885 // bm_flush_cache() failed: timeout in bm_wait_for_free_space() or write cache size is bigger than buffer size or buffer was closed.
9886 if (status == BM_NO_MEMORY)
9887 cm_msg(MERROR, "bm_send_event", "write cache size is bigger than buffer size");
9888 return status;
9889 }
9890
9891 // write cache must be empty here
9892 assert(pbuf->write_cache_wp == 0);
9893 }
9894
9895 /* write this event into the write cache, if it is not too big and if it fits */
9896 if (!too_big && pbuf->write_cache_wp + total_size <= pbuf->write_cache_size) {
9897 //printf("bm_send_event: write %d/%d to cache size %d, wp %d\n", (int)event_size, (int)total_size, (int)pbuf->write_cache_size, (int)pbuf->write_cache_wp);
9898
9899 char* wptr = pbuf->write_cache + pbuf->write_cache_wp;
9900
9901 for (int i=0; i<sg_n; i++) {
9902 memcpy(wptr, sg_ptr[i], sg_len[i]);
9903 wptr += sg_len[i];
9904 }
9905
9906 pbuf->write_cache_wp += total_size;
9907
9908 pbuf->write_cache_mutex.unlock();
9909 return BM_SUCCESS;
9910 }
9911 }
9912
9913 /* event did not fit into the write cache, we flushed the write cache and we send it directly to shared memory */
9914 pbuf->write_cache_mutex.unlock();
9915 }
9916
9917 /* we come here only for events that are too big to fit into the cache */
9918
9919 /* lock the buffer */
9920 bm_lock_buffer_guard pbuf_guard(pbuf);
9921
9922 if (!pbuf_guard.is_locked()) {
9923 return pbuf_guard.get_status();
9924 }
9925
9926 /* calculate some shorthands */
9927 BUFFER_HEADER *pheader = pbuf->buffer_header;
9928
9929#if 0
9931 if (status != BM_SUCCESS) {
9932 printf("bm_send_event: corrupted 111!\n");
9933 abort();
9934 }
9935#endif
9936
9937 /* check if buffer is large enough */
9938 if (total_size >= (size_t)pheader->size) {
9939 pbuf_guard.unlock(); // unlock before cm_msg()
9940 cm_msg(MERROR, "bm_send_event", "total event size (%d) larger than size (%d) of buffer \'%s\'", (int)total_size, pheader->size, pheader->name);
9941 return BM_NO_MEMORY;
9942 }
9943
9944 status = bm_wait_for_free_space_locked(pbuf_guard, timeout_msec, total_size, false);
9945
9946 if (status != BM_SUCCESS) {
9947 // implicit unlock
9948 return status;
9949 }
9950
9951#if 0
9953 if (status != BM_SUCCESS) {
9954 printf("bm_send_event: corrupted 222!\n");
9955 abort();
9956 }
9957#endif
9958
9959 int old_write_pointer = pheader->write_pointer;
9960
9961 bm_write_to_buffer_locked(pheader, sg_n, sg_ptr, sg_len, total_size);
9962
9963 /* write pointer was incremented, but there should
9964 * always be some free space in the buffer and the
9965 * write pointer should never cacth up to the read pointer:
9966 * the rest of the code gets confused this happens (buffer 100% full)
9967 * as it is write_pointer == read_pointer can be either
9968 * 100% full or 100% empty. My solution: never fill
9969 * the buffer to 100% */
9970 assert(pheader->write_pointer != pheader->read_pointer);
9971
9972 /* send wake up messages to all clients that want this event */
9973 int i;
9974 for (i = 0; i < pheader->max_client_index; i++) {
9975 BUFFER_CLIENT *pc = pheader->client + i;
9976 int request_id = bm_find_first_request_locked(pc, pevent);
9977 bm_notify_reader_locked(pheader, pc, old_write_pointer, request_id);
9978 }
9979
9980#if 0
9982 if (status != BM_SUCCESS) {
9983 printf("bm_send_event: corrupted 333!\n");
9984 abort();
9985 }
9986#endif
9987
9988 /* update statistics */
9989 pheader->num_in_events++;
9990 pbuf->count_sent += 1;
9991 pbuf->bytes_sent += total_size;
9992 }
9993#endif /* LOCAL_ROUTINES */
9994
9995 return BM_SUCCESS;
9996}
double count
Definition mdump.cxx:33
INT max_event_size
Definition mfed.cxx:30
#define MAX_WRITE_CACHE_EVENT_SIZE_DIV
Definition midas.h:259
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_send_event_vec() [1/2]

int bm_send_event_vec ( int  buffer_handle,
const std::vector< char > &  event,
int  timeout_msec 
)

Definition at line 9716 of file midas.cxx.

9717{
9718 const char* cptr = event.data();
9719 size_t clen = event.size();
9720 return bm_send_event_sg(buffer_handle, 1, &cptr, &clen, timeout_msec);
9721}
Here is the call graph for this function:

◆ bm_send_event_vec() [2/2]

int bm_send_event_vec ( int  buffer_handle,
const std::vector< std::vector< char > > &  event,
int  timeout_msec 
)

Definition at line 9723 of file midas.cxx.

9724{
9725 int sg_n = event.size();
9726 const char* sg_ptr[sg_n];
9727 size_t sg_len[sg_n];
9728 for (int i=0; i<sg_n; i++) {
9729 sg_ptr[i] = event[i].data();
9730 sg_len[i] = event[i].size();
9731 }
9732 return bm_send_event_sg(buffer_handle, sg_n, sg_ptr, sg_len, timeout_msec);
9733}
Here is the call graph for this function:

◆ bm_set_cache_size()

INT bm_set_cache_size ( INT  buffer_handle,
size_t  read_size,
size_t  write_size 
)

Modifies buffer cache size. Without a buffer cache, events are copied to/from the shared memory event by event.

To protect processes from accessing the shared memory simultaneously, semaphores are used. Since semaphore operations are CPU consuming (typically 50-100us) this can slow down the data transfer, especially for small events. By using a cache, the number of semaphore operations is reduced dramatically. Instead of writing directly to the shared memory, the events are copied to a local cache buffer. When this buffer is full, it is copied to the shared memory in one operation. The same technique can be used when receiving events.

The drawback of this method is that the events have to be copied twice, once to the cache and once from the cache to the shared memory. Therefore it can happen that the usage of a cache even slows down data throughput in a given environment (computer type, OS type, event size). The cache size therefore has to be optimized manually to maximize data throughput.

Parameters
buffer_handle buffer handle obtained via bm_open_buffer()
read_size cache size for reading events in bytes, zero for no cache
write_size cache size for writing events in bytes, zero for no cache
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_NO_MEMORY, BM_INVALID_PARAM

Definition at line 8151 of file midas.cxx.

8153{
8154 if (rpc_is_remote())
8155 return rpc_call(RPC_BM_SET_CACHE_SIZE, buffer_handle, read_size, write_size);
8156
8157#ifdef LOCAL_ROUTINES
8158 {
8159 int status = 0;
8160
8161 BUFFER *pbuf = bm_get_buffer("bm_set_cache_size", buffer_handle, &status);
8162
8163 if (!pbuf)
8164 return status;
8165
8166 /* lock pbuf for local access. we do not lock buffer semaphore because we do not touch the shared memory */
8167
8169
8170 if (status != BM_SUCCESS)
8171 return status;
8172
8173 if (write_size < 0)
8174 write_size = 0;
8175
8176 if (write_size > 0) {
8177 if (write_size < MIN_WRITE_CACHE_SIZE) {
8178 cm_msg(MERROR, "bm_set_cache_size", "requested write cache size %zu on buffer \"%s\" too small, will use minimum size %d", write_size, pbuf->buffer_name, MIN_WRITE_CACHE_SIZE);
8179 write_size = MIN_WRITE_CACHE_SIZE;
8180 }
8181 }
8182
8183 size_t max_write_size = pbuf->buffer_header->size/MAX_WRITE_CACHE_SIZE_DIV;
8184
8185 if (write_size > max_write_size) {
8186 size_t new_write_size = max_write_size;
8187 cm_msg(MERROR, "bm_set_cache_size", "requested write cache size %zu on buffer \"%s\" is too big: buffer size is %d, write cache size will be %zu bytes", write_size, pbuf->buffer_name, pbuf->buffer_header->size, new_write_size);
8188 write_size = new_write_size;
8189 }
8190
8191 pbuf->buffer_mutex.unlock();
8192
8193 /* resize read cache */
8194
8196
8197 if (status != BM_SUCCESS) {
8198 return status;
8199 }
8200
8201 if (pbuf->read_cache_size > 0) {
8202 free(pbuf->read_cache);
8203 pbuf->read_cache = NULL;
8204 }
8205
8206 if (read_size > 0) {
8207 pbuf->read_cache = (char *) malloc(read_size);
8208 if (pbuf->read_cache == NULL) {
8209 pbuf->read_cache_size = 0;
8210 pbuf->read_cache_rp = 0;
8211 pbuf->read_cache_wp = 0;
8212 pbuf->read_cache_mutex.unlock();
8213 cm_msg(MERROR, "bm_set_cache_size", "not enough memory to allocate read cache for buffer \"%s\", malloc(%zu) failed", pbuf->buffer_name, read_size);
8214 return BM_NO_MEMORY;
8215 }
8216 }
8217
8218 pbuf->read_cache_size = read_size;
8219 pbuf->read_cache_rp = 0;
8220 pbuf->read_cache_wp = 0;
8221
8222 pbuf->read_cache_mutex.unlock();
8223
8224 /* resize the write cache */
8225
8227
8228 if (status != BM_SUCCESS)
8229 return status;
8230
8231 // FIXME: should flush the write cache!
8232 if (pbuf->write_cache_size && pbuf->write_cache_wp > 0) {
8233 cm_msg(MERROR, "bm_set_cache_size", "buffer \"%s\" lost %zu bytes from the write cache", pbuf->buffer_name, pbuf->write_cache_wp);
8234 }
8235
8236 /* manage write cache */
8237 if (pbuf->write_cache_size > 0) {
8238 free(pbuf->write_cache);
8239 pbuf->write_cache = NULL;
8240 }
8241
8242 if (write_size > 0) {
8243 pbuf->write_cache = (char *) M_MALLOC(write_size);
8244 if (pbuf->write_cache == NULL) {
8245 pbuf->write_cache_size = 0;
8246 pbuf->write_cache_rp = 0;
8247 pbuf->write_cache_wp = 0;
8248 pbuf->write_cache_mutex.unlock();
8249 cm_msg(MERROR, "bm_set_cache_size", "not enough memory to allocate write cache for buffer \"%s\", malloc(%zu) failed", pbuf->buffer_name, write_size);
8250 return BM_NO_MEMORY;
8251 }
8252 }
8253
8254 pbuf->write_cache_size = write_size;
8255 pbuf->write_cache_rp = 0;
8256 pbuf->write_cache_wp = 0;
8257
8258 pbuf->write_cache_mutex.unlock();
8259 }
8260#endif /* LOCAL_ROUTINES */
8261
8262 return BM_SUCCESS;
8263}
static int bm_lock_buffer_mutex(BUFFER *pbuf)
Definition midas.cxx:7957
#define RPC_BM_SET_CACHE_SIZE
Definition mrpc.h:42
#define M_MALLOC(x)
Definition midas.h:1551
#define MIN_WRITE_CACHE_SIZE
Definition midas.h:257
#define MAX_WRITE_CACHE_SIZE_DIV
Definition midas.h:258
std::timed_mutex buffer_mutex
Definition midas.h:989
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_skip_event() [1/2]

static int bm_skip_event ( BUFFER pbuf)
static

Definition at line 10972 of file midas.cxx.

10973{
10974 /* clear read cache */
10975 if (pbuf->read_cache_size > 0) {
10976
10978
10979 if (status != BM_SUCCESS)
10980 return status;
10981
10982 pbuf->read_cache_rp = 0;
10983 pbuf->read_cache_wp = 0;
10984
10985 pbuf->read_cache_mutex.unlock();
10986 }
10987
10988 bm_lock_buffer_guard pbuf_guard(pbuf);
10989
10990 if (!pbuf_guard.is_locked())
10991 return pbuf_guard.get_status();
10992
10993 BUFFER_HEADER *pheader = pbuf->buffer_header;
10994
10995 /* forward read pointer to global write pointer */
10996 BUFFER_CLIENT *pclient = bm_get_my_client_locked(pbuf_guard);
10997 pclient->read_pointer = pheader->write_pointer;
10998
10999 return BM_SUCCESS;
11000}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_skip_event() [2/2]

INT bm_skip_event ( INT  buffer_handle)

Skip all events in current buffer.

Useful for single event displays to see the newest events

Parameters
buffer_handle Handle of the buffer. Must be obtained via bm_open_buffer.
Returns
BM_SUCCESS, BM_INVALID_HANDLE, RPC_NET_ERROR

Definition at line 11013 of file midas.cxx.

11013 {
11014 if (rpc_is_remote())
11015 return rpc_call(RPC_BM_SKIP_EVENT, buffer_handle);
11016
11017#ifdef LOCAL_ROUTINES
11018 {
11019 int status = 0;
11020
11021 BUFFER *pbuf = bm_get_buffer("bm_skip_event", buffer_handle, &status);
11022
11023 if (!pbuf)
11024 return status;
11025
11026 return bm_skip_event(pbuf);
11027 }
11028#endif
11029
11030 return BM_SUCCESS;
11031}
#define RPC_BM_SKIP_EVENT
Definition mrpc.h:50
Here is the call graph for this function:

◆ bm_update_last_activity()

static void bm_update_last_activity ( DWORD  millitime)
static

Update last activity time

Definition at line 6135 of file midas.cxx.

6135 {
6136 int pid = ss_getpid();
6137
6138 std::vector<BUFFER*> mybuffers;
6139
6140 gBuffersMutex.lock();
6141 mybuffers = gBuffers;
6142 gBuffersMutex.unlock();
6143
6144 for (BUFFER* pbuf : mybuffers) {
6145 if (!pbuf)
6146 continue;
6147 if (pbuf->attached) {
6148
6149 bm_lock_buffer_guard pbuf_guard(pbuf);
6150
6151 if (!pbuf_guard.is_locked())
6152 continue;
6153
6154 BUFFER_HEADER *pheader = pbuf->buffer_header;
6155 for (int j = 0; j < pheader->max_client_index; j++) {
6156 BUFFER_CLIENT *pclient = pheader->client + j;
6157 if (pclient->pid == pid) {
6158 pclient->last_activity = millitime;
6159 }
6160 }
6161 }
6162 }
6163}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_update_read_pointer_locked()

static BOOL bm_update_read_pointer_locked ( const char *  caller_name,
BUFFER_HEADER pheader 
)
static

Definition at line 8731 of file midas.cxx.

8731 {
8732 assert(caller_name);
8733
8734 /* calculate global read pointer as "minimum" of client read pointers */
8735 int min_rp = pheader->write_pointer;
8736
8737 int i;
8738 for (i = 0; i < pheader->max_client_index; i++) {
8739 BUFFER_CLIENT *pc = pheader->client + i;
8740 if (pc->pid) {
8742
8743#if 0
8744 printf("bm_update_read_pointer: [%s] rp %d, wp %d, size %d, min_rp %d, client [%s] rp %d\n",
8745 pheader->name,
8746 pheader->read_pointer,
8747 pheader->write_pointer,
8748 pheader->size,
8749 min_rp,
8750 pc->name,
8751 pc->read_pointer);
8752#endif
8753
8754 if (pheader->read_pointer <= pheader->write_pointer) {
8755 // normal pointers
8756 if (pc->read_pointer < min_rp)
8757 min_rp = pc->read_pointer;
8758 } else {
8759 // inverted pointers
8760 if (pc->read_pointer <= pheader->write_pointer) {
8761 // clients 3 and 4
8762 if (pc->read_pointer < min_rp)
8763 min_rp = pc->read_pointer;
8764 } else {
8765 // clients 1 and 2
8766 int xptr = pc->read_pointer - pheader->size;
8767 if (xptr < min_rp)
8768 min_rp = xptr;
8769 }
8770 }
8771 }
8772 }
8773
8774 if (min_rp < 0)
8775 min_rp += pheader->size;
8776
8777 assert(min_rp >= 0);
8778 assert(min_rp < pheader->size);
8779
8780 if (min_rp == pheader->read_pointer) {
8781 return FALSE;
8782 }
8783
8784#if 0
8785 printf("bm_update_read_pointer: [%s] rp %d, wp %d, size %d, new_rp %d, moved\n",
8786 pheader->name,
8787 pheader->read_pointer,
8788 pheader->write_pointer,
8789 pheader->size,
8790 min_rp);
8791#endif
8792
8793 pheader->read_pointer = min_rp;
8794
8795 return TRUE;
8796}
static void bm_validate_client_pointers_locked(const BUFFER_HEADER *pheader, BUFFER_CLIENT *pclient)
Definition midas.cxx:8633
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_buffer_locked()

static int bm_validate_buffer_locked ( const BUFFER pbuf)
static

Definition at line 6325 of file midas.cxx.

6325 {
6326 const BUFFER_HEADER *pheader = pbuf->buffer_header;
6327 const char *pdata = (const char *) (pheader + 1);
6328
6329 //printf("bm_validate_buffer: buffer \"%s\"\n", pheader->name);
6330
6331 //printf("size: %d, rp: %d, wp: %d\n", pheader->size, pheader->read_pointer, pheader->write_pointer);
6332
6333 //printf("clients: max: %d, num: %d, MAX_CLIENTS: %d\n", pheader->max_client_index, pheader->num_clients, MAX_CLIENTS);
6334
6335 if (pheader->read_pointer < 0 || pheader->read_pointer >= pheader->size) {
6336 cm_msg(MERROR, "bm_validate_buffer",
6337 "buffer \"%s\" is corrupted: invalid read pointer %d. Size %d, write pointer %d", pheader->name,
6338 pheader->read_pointer, pheader->size, pheader->write_pointer);
6339 return BM_CORRUPTED;
6340 }
6341
6342 if (pheader->write_pointer < 0 || pheader->write_pointer >= pheader->size) {
6343 cm_msg(MERROR, "bm_validate_buffer",
6344 "buffer \"%s\" is corrupted: invalid write pointer %d. Size %d, read pointer %d", pheader->name,
6345 pheader->write_pointer, pheader->size, pheader->read_pointer);
6346 return BM_CORRUPTED;
6347 }
6348
6349 if (!bm_validate_rp("bm_validate_buffer_locked", pheader, pheader->read_pointer)) {
6350 cm_msg(MERROR, "bm_validate_buffer", "buffer \"%s\" is corrupted: read pointer %d is invalid", pheader->name,
6351 pheader->read_pointer);
6352 return BM_CORRUPTED;
6353 }
6354
6355 int rp = pheader->read_pointer;
6356 int rp0 = -1;
6357 while (rp != pheader->write_pointer) {
6358 if (!bm_validate_rp("bm_validate_buffer_locked", pheader, rp)) {
6359 cm_msg(MERROR, "bm_validate_buffer", "buffer \"%s\" is corrupted: invalid rp %d, last good event at rp %d",
6360 pheader->name, rp, rp0);
6361 return BM_CORRUPTED;
6362 }
6363 //bm_print_event(pdata, rp);
6364 int rp1 = bm_next_rp("bm_validate_buffer_locked", pheader, pdata, rp);
6365 if (rp1 < 0) {
6366 cm_msg(MERROR, "bm_validate_buffer",
6367 "buffer \"%s\" is corrupted: invalid event at rp %d, last good event at rp %d", pheader->name, rp, rp0);
6368 return BM_CORRUPTED;
6369 }
6370 rp0 = rp;
6371 rp = rp1;
6372 }
6373
6374 int i;
6375 for (i = 0; i < MAX_CLIENTS; i++) {
6376 const BUFFER_CLIENT *c = &pheader->client[i];
6377 if (c->pid == 0)
6378 continue;
6379 BOOL get_all = FALSE;
6380 int j;
6381 for (j = 0; j < MAX_EVENT_REQUESTS; j++) {
6382 const EVENT_REQUEST *r = &c->event_request[j];
6383 if (!r->valid)
6384 continue;
6385 BOOL xget_all = r->sampling_type == GET_ALL;
6386 get_all = (get_all || xget_all);
6387 //printf("client slot %d: pid %d, name \"%s\", request %d: id %d, valid %d, sampling_type %d, get_all %d\n", i, c->pid, c->name, j, r->id, r->valid, r->sampling_type, xget_all);
6388 }
6389
6390 int rp = c->read_pointer;
6391 int rp0 = -1;
6392 while (rp != pheader->write_pointer) {
6393 //bm_print_event(pdata, rp);
6394 int rp1 = bm_next_rp("bm_validate_buffer_locked", pheader, pdata, rp);
6395 if (rp1 < 0) {
6396 cm_msg(MERROR, "bm_validate_buffer",
6397 "buffer \"%s\" is corrupted for client \"%s\" rp %d: invalid event at rp %d, last good event at rp %d",
6398 pheader->name, c->name, c->read_pointer, rp, rp0);
6399 return BM_CORRUPTED;
6400 }
6401 rp0 = rp;
6402 rp = rp1;
6403 }
6404 }
6405
6406 return BM_SUCCESS;
6407}
static BOOL bm_validate_rp(const char *who, const BUFFER_HEADER *pheader, int rp)
Definition midas.cxx:6207
static int bm_next_rp(const char *who, const BUFFER_HEADER *pheader, const char *pdata, int rp)
Definition midas.cxx:6274
char c
Definition system.cxx:1312
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_client_index_locked()

static int bm_validate_client_index_locked ( bm_lock_buffer_guard pbuf_guard)
static

Definition at line 5940 of file midas.cxx.

5941{
5942 const BUFFER *pbuf = pbuf_guard.get_pbuf();
5943
5944 bool badindex = false;
5945 bool badclient = false;
5946
5947 int idx = pbuf->client_index;
5948
5949 if (idx < 0) {
5950 badindex = true;
5951 } else if (idx > pbuf->buffer_header->max_client_index) {
5952 badindex = true;
5953 } else {
5954 BUFFER_CLIENT *pclient = &pbuf->buffer_header->client[idx];
5955 if (pclient->name[0] == 0)
5956 badclient = true;
5957 else if (pclient->pid != ss_getpid())
5958 badclient = true;
5959
5960 //if (strcmp(pclient->name,"mdump")==0) {
5961 // for (int i=0; i<15; i++) {
5962 // printf("sleep %d\n", i);
5963 // ::sleep(1);
5964 // }
5965 //}
5966 }
5967
5968#if 0
5969 if (badindex) {
5970 printf("bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, badindex %d, pid=%d\n",
5971 pbuf, pbuf->buffer_header->name, pbuf->client_index, pbuf->buffer_header->max_client_index,
5972 badindex, ss_getpid());
5973 } else if (badclient) {
5974 printf("bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, badclient %d\n",
5975 pbuf, pbuf->buffer_header->name, pbuf->client_index, pbuf->buffer_header->max_client_index,
5976 pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid,
5977 ss_getpid(), badclient);
5978 } else {
5979 printf("bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, goodclient\n",
5980 pbuf, pbuf->buffer_header->name, pbuf->client_index, pbuf->buffer_header->max_client_index,
5981 pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid,
5982 ss_getpid());
5983 }
5984#endif
5985
5986 if (badindex || badclient) {
5987 static int prevent_recursion = 1;
5988
5989 if (prevent_recursion) {
5990 prevent_recursion = 0;
5991
5992 if (badindex) {
5993 cm_msg(MERROR, "bm_validate_client_index", "My client index %d in buffer \'%s\' is invalid, max_client_index %d, my pid %d", idx, pbuf->buffer_header->name, pbuf->buffer_header->max_client_index, ss_getpid());
5994 } else {
5995 cm_msg(MERROR, "bm_validate_client_index", "My client index %d in buffer \'%s\' is invalid: client name \'%s\', pid %d should be my pid %d", idx, pbuf->buffer_header->name, pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid, ss_getpid());
5996 }
5997
5998 cm_msg(MERROR, "bm_validate_client_index", "Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...");
5999 }
6000
6001 if (badindex) {
6002 fprintf(stderr, "bm_validate_client_index: My client index %d in buffer \'%s\' is invalid, max_client_index %d, my pid %d\n", idx, pbuf->buffer_header->name, pbuf->buffer_header->max_client_index, ss_getpid());
6003 } else {
6004 fprintf(stderr, "bm_validate_client_index: My client index %d in buffer \'%s\' is invalid: client name \'%s\', pid %d should be my pid %d\n", idx, pbuf->buffer_header->name, pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid, ss_getpid());
6005 }
6006
6007 fprintf(stderr, "bm_validate_client_index: Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...\n");
6008
6009 pbuf_guard.unlock();
6010
6011 abort();
6012 }
6013
6014 return idx;
6015}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_client_pointers_locked()

static void bm_validate_client_pointers_locked ( const BUFFER_HEADER pheader,
BUFFER_CLIENT pclient 
)
static

Definition at line 8633 of file midas.cxx.

8633 {
8634 assert(pheader->read_pointer >= 0 && pheader->read_pointer <= pheader->size);
8635 assert(pclient->read_pointer >= 0 && pclient->read_pointer <= pheader->size);
8636
8637 if (pheader->read_pointer <= pheader->write_pointer) {
8638
8639 if (pclient->read_pointer < pheader->read_pointer) {
8640 cm_msg(MINFO, "bm_validate_client_pointers",
8641 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8642 pclient->name,
8643 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8644
8645 pclient->read_pointer = pheader->read_pointer;
8646 }
8647
8648 if (pclient->read_pointer > pheader->write_pointer) {
8649 cm_msg(MINFO, "bm_validate_client_pointers",
8650 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, read pointer %d, size %d",
8651 pclient->name,
8652 pheader->name, pclient->read_pointer, pheader->write_pointer, pheader->read_pointer, pheader->size);
8653
8654 pclient->read_pointer = pheader->write_pointer;
8655 }
8656
8657 } else {
8658
8659 if (pclient->read_pointer < 0) {
8660 cm_msg(MINFO, "bm_validate_client_pointers",
8661 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8662 pclient->name,
8663 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8664
8665 pclient->read_pointer = pheader->read_pointer;
8666 }
8667
8668 if (pclient->read_pointer >= pheader->size) {
8669 cm_msg(MINFO, "bm_validate_client_pointers",
8670 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8671 pclient->name,
8672 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8673
8674 pclient->read_pointer = pheader->read_pointer;
8675 }
8676
8677 if (pclient->read_pointer > pheader->write_pointer && pclient->read_pointer < pheader->read_pointer) {
8678 cm_msg(MINFO, "bm_validate_client_pointers",
8679 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8680 pclient->name,
8681 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8682
8683 pclient->read_pointer = pheader->read_pointer;
8684 }
8685 }
8686}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_rp()

static BOOL bm_validate_rp ( const char *  who,
const BUFFER_HEADER pheader,
int  rp 
)
static

Definition at line 6207 of file midas.cxx.

6207 {
6208 if (rp < 0 || rp > pheader->size) {
6209 cm_msg(MERROR, "bm_validate_rp",
6210 "error: buffer \"%s\" is corrupted: rp %d is invalid. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6211 pheader->name,
6212 rp,
6213 pheader->read_pointer,
6214 pheader->write_pointer,
6215 pheader->size,
6216 who);
6217 return FALSE;
6218 }
6219
6220 if ((rp + (int) sizeof(EVENT_HEADER)) > pheader->size) {
6221 // note ">" here, has to match bm_incr_rp() and bm_write_to_buffer()
6222 cm_msg(MERROR, "bm_validate_rp",
6223 "error: buffer \"%s\" is corrupted: rp %d plus event header point beyond the end of buffer by %d bytes. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6224 pheader->name,
6225 rp,
6226 (int) (rp + sizeof(EVENT_HEADER) - pheader->size),
6227 pheader->read_pointer,
6228 pheader->write_pointer,
6229 pheader->size,
6230 who);
6231 return FALSE;
6232 }
6233
6234 return TRUE;
6235}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wait_for_free_space_locked()

static int bm_wait_for_free_space_locked ( bm_lock_buffer_guard pbuf_guard,
int  timeout_msec,
int  requested_space,
bool  unlock_write_cache 
)
static
  • signal other clients wait mode
  • validate client index: we could have been removed from the buffer

Definition at line 9091 of file midas.cxx.

9092{
9093 // return values:
9094 // BM_SUCCESS - have "requested_space" bytes free in the buffer
9095 // BM_CORRUPTED - shared memory is corrupted
9096 // BM_NO_MEMORY - asked for more than buffer size
9097 // BM_ASYNC_RETURN - timeout waiting for free space
9098 // BM_INVALID_HANDLE - buffer was closed (locks released) (via bm_clock_xxx())
9099 // SS_ABORT - we are told to shutdown (locks releases)
9100
9101 int status;
9102 BUFFER* pbuf = pbuf_guard.get_pbuf();
9103 BUFFER_HEADER *pheader = pbuf->buffer_header;
9104 char *pdata = (char *) (pheader + 1);
9105
9106 /* make sure the buffer never completely full:
9107 * read pointer and write pointer would coincide
9108 * and the code cannot tell if it means the
9109 * buffer is 100% full or 100% empty. It will explode
9110 * or lose events */
9111 requested_space += 100;
9112
9113 if (requested_space >= pheader->size)
9114 return BM_NO_MEMORY;
9115
9116 DWORD time_start = ss_millitime();
9117 DWORD time_end = time_start + timeout_msec;
9118
9119 //DWORD blocking_time = 0;
9120 //int blocking_loops = 0;
9121 int blocking_client_index = -1;
9122 char blocking_client_name[NAME_LENGTH];
9123 blocking_client_name[0] = 0;
9124
9125 while (1) {
9126 while (1) {
9127 /* check if enough space in buffer */
9128
9129 int free = pheader->read_pointer - pheader->write_pointer;
9130 if (free <= 0)
9131 free += pheader->size;
9132
9133 //printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec);
9134
9135 if (requested_space < free) { /* note the '<' to avoid 100% filling */
9136 //if (blocking_loops) {
9137 // DWORD wait_time = ss_millitime() - blocking_time;
9138 // printf("blocking client \"%s\", time %d ms, loops %d\n", blocking_client_name, wait_time, blocking_loops);
9139 //}
9140
9141 if (pbuf->wait_start_time != 0) {
9142 DWORD now = ss_millitime();
9143 DWORD wait_time = now - pbuf->wait_start_time;
9144 pbuf->time_write_wait += wait_time;
9145 pbuf->wait_start_time = 0;
9146 int iclient = pbuf->wait_client_index;
9147 //printf("bm_wait_for_free_space: wait ended: wait time %d ms, blocking client index %d\n", wait_time, iclient);
9148 if (iclient >= 0 && iclient < MAX_CLIENTS) {
9149 pbuf->client_count_write_wait[iclient] += 1;
9150 pbuf->client_time_write_wait[iclient] += wait_time;
9151 }
9152 }
9153
9154 //if (blocking_loops > 0) {
9155 // printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d, found space after %d waits\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec, blocking_loops);
9156 //}
9157
9158 return BM_SUCCESS;
9159 }
9160
9161 if (!bm_validate_rp("bm_wait_for_free_space_locked", pheader, pheader->read_pointer)) {
9162 cm_msg(MERROR, "bm_wait_for_free_space",
9163 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer is invalid",
9164 pheader->name,
9165 pheader->read_pointer,
9166 pheader->write_pointer,
9167 pheader->size,
9168 free,
9169 requested_space);
9170 return BM_CORRUPTED;
9171 }
9172
9173 const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pdata + pheader->read_pointer);
9174 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
9175 int total_size = ALIGN8(event_size);
9176
9177#if 0
9178 printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, blocking event size %d/%d\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, event_size, total_size);
9179#endif
9180
9181 if (pevent->data_size <= 0 || total_size <= 0 || total_size > pheader->size) {
9182 cm_msg(MERROR, "bm_wait_for_free_space",
9183 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer points to an invalid event: data_size %d, event size %d, total_size %d",
9184 pheader->name,
9185 pheader->read_pointer,
9186 pheader->write_pointer,
9187 pheader->size,
9188 free,
9189 requested_space,
9190 pevent->data_size,
9191 event_size,
9192 total_size);
9193 return BM_CORRUPTED;
9194 }
9195
9196 int blocking_client = -1;
9197
9198 int i;
9199 for (i = 0; i < pheader->max_client_index; i++) {
9200 BUFFER_CLIENT *pc = pheader->client + i;
9201 if (pc->pid) {
9202 if (pc->read_pointer == pheader->read_pointer) {
9203 /*
9204 First assume that the client with the "minimum" read pointer
9205 is not really blocking due to a GET_ALL request.
9206 */
9207 BOOL blocking = FALSE;
9208 //int blocking_request_id = -1;
9209
9210 int j;
9211 for (j = 0; j < pc->max_request_index; j++) {
9212 const EVENT_REQUEST *prequest = pc->event_request + j;
9213 if (prequest->valid
9214 && bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
9215 if (prequest->sampling_type & GET_ALL) {
9216 blocking = TRUE;
9217 //blocking_request_id = prequest->id;
9218 break;
9219 }
9220 }
9221 }
9222
9223 //printf("client [%s] blocking %d, request %d\n", pc->name, blocking, blocking_request_id);
9224
9225 if (blocking) {
9226 blocking_client = i;
9227 break;
9228 }
9229
9230 pc->read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
9231 }
9232 }
9233 } /* client loop */
9234
9235 if (blocking_client >= 0) {
9236 blocking_client_index = blocking_client;
9237 mstrlcpy(blocking_client_name, pheader->client[blocking_client].name, sizeof(blocking_client_name));
9238 //if (!blocking_time) {
9239 // blocking_time = ss_millitime();
9240 //}
9241
9242 //printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d, must wait for more space!\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec);
9243
9244 // from this "break" we go into timeout check and sleep/wait.
9245 break;
9246 }
9247
9248 /* no blocking clients. move the read pointer and again check for free space */
9249
9250 BOOL moved = bm_update_read_pointer_locked("bm_wait_for_free_space", pheader);
9251
9252 if (!moved) {
9253 cm_msg(MERROR, "bm_wait_for_free_space",
9254 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer did not move as expected",
9255 pheader->name,
9256 pheader->read_pointer,
9257 pheader->write_pointer,
9258 pheader->size,
9259 free,
9260 requested_space);
9261 return BM_CORRUPTED;
9262 }
9263
9264 /* we freed one event, loop back to the check for free space */
9265 }
9266
9267 //blocking_loops++;
9268
9269 /* at least one client is blocking */
9270
9271 BUFFER_CLIENT *pc = bm_get_my_client_locked(pbuf_guard);
9272 pc->write_wait = requested_space;
9273
9274 if (pbuf->wait_start_time == 0) {
9275 pbuf->wait_start_time = ss_millitime();
9276 pbuf->count_write_wait++;
9277 if (requested_space > pbuf->max_requested_space)
9278 pbuf->max_requested_space = requested_space;
9279 pbuf->wait_client_index = blocking_client_index;
9280 }
9281
9282 DWORD now = ss_millitime();
9283
9284 //printf("bm_wait_for_free_space: start 0x%08x, now 0x%08x, end 0x%08x, timeout %d, wait %d\n", time_start, now, time_end, timeout_msec, time_end - now);
9285
9286 int sleep_time_msec = 1000;
9287
9288 if (timeout_msec == BM_WAIT) {
9289 // wait forever
9290 } else if (timeout_msec == BM_NO_WAIT) {
9291 // no wait
9292 return BM_ASYNC_RETURN;
9293 } else {
9294 // check timeout
9295 if (now >= time_end) {
9296 // timeout!
9297 return BM_ASYNC_RETURN;
9298 }
9299
9300 sleep_time_msec = time_end - now;
9301
9302 if (sleep_time_msec <= 0) {
9303 sleep_time_msec = 10;
9304 } else if (sleep_time_msec > 1000) {
9305 sleep_time_msec = 1000;
9306 }
9307 }
9308
9310
9311 /* before waiting, unlock everything in the correct order */
9312
9313 pbuf_guard.unlock();
9314
9315 if (unlock_write_cache)
9316 pbuf->write_cache_mutex.unlock();
9317
9318 //printf("bm_wait_for_free_space: blocking client \"%s\"\n", blocking_client_name);
9319
9320#ifdef DEBUG_MSG
9321 cm_msg(MDEBUG, "Send sleep: rp=%d, wp=%d, level=%1.1lf", pheader->read_pointer, pheader->write_pointer, 100 - 100.0 * size / pheader->size);
9322#endif
9323
9325 //int idx = bm_validate_client_index_locked(pbuf, FALSE);
9326 //if (idx >= 0)
9327 // pheader->client[idx].write_wait = requested_space;
9328
9329 //bm_cleanup("bm_wait_for_free_space", ss_millitime(), FALSE);
9330
9331 status = ss_suspend(sleep_time_msec, MSG_BM);
9332
9333 /* we are told to shutdown */
9334 if (status == SS_ABORT) {
9335 // NB: buffer is locked!
9336 return SS_ABORT;
9337 }
9338
9339 /* make sure we do sleep in this loop:
9340 * if we are the mserver receiving data on the event
9341 * socket and the data buffer is full, ss_suspend() will
9342 * never sleep: it will detect data on the event channel,
9343 * call rpc_server_receive() (recursively, we already *are* in
9344 * rpc_server_receive()) and return without sleeping. Result
9345 * is a busy loop waiting for free space in data buffer */
9346
9347 /* update May 2021: ss_suspend(MSG_BM) no longer looks at
9348 * the event socket, and should sleep now, so this sleep below
9349 * maybe is not needed now. but for safety, I keep it. K.O. */
9350
9351 if (status != SS_TIMEOUT) {
9352 //printf("ss_suspend: status %d\n", status);
9353 ss_sleep(1);
9354 }
9355
9356 /* we may be stuck in this loop for an arbitrary long time,
9357 * depending on how other buffer clients read the accumulated data
9358 * so we should update all the timeouts & etc. K.O. */
9359
9361
9362 /* lock things again in the correct order */
9363
9364 if (unlock_write_cache) {
9366
9367 if (status != BM_SUCCESS) {
9368 // bail out with all locks released
9369 return status;
9370 }
9371 }
9372
9373 if (!pbuf_guard.relock()) {
9374 if (unlock_write_cache) {
9375 pbuf->write_cache_mutex.unlock();
9376 }
9377
9378 // bail out with all locks released
9379 return pbuf_guard.get_status();
9380 }
9381
9382 /* revalidate the client index: we could have been removed from the buffer while sleeping */
9383 pc = bm_get_my_client_locked(pbuf_guard);
9384
9385 pc->write_wait = 0;
9386
9388 //idx = bm_validate_client_index_locked(pbuf, FALSE);
9389 //if (idx >= 0)
9390 // pheader->client[idx].write_wait = 0;
9391 //else {
9392 // cm_msg(MERROR, "bm_wait_for_free_space", "our client index is no longer valid, exiting...");
9393 // status = SS_ABORT;
9394 //}
9395
9396#ifdef DEBUG_MSG
9397 cm_msg(MDEBUG, "Send woke up: rp=%d, wp=%d, level=%1.1lf", pheader->read_pointer, pheader->write_pointer, 100 - 100.0 * size / pheader->size);
9398#endif
9399
9400 }
9401}
int get_status() const
Definition midas.cxx:3192
static BOOL bm_update_read_pointer_locked(const char *caller_name, BUFFER_HEADER *pheader)
Definition midas.cxx:8731
INT cm_periodic_tasks()
Definition midas.cxx:5597
#define SS_TIMEOUT
Definition midas.h:675
#define MDEBUG
Definition midas.h:561
#define MSG_BM
Definition msystem.h:302
INT ss_suspend(INT millisec, INT msg)
Definition system.cxx:4615
INT ss_sleep(INT millisec)
Definition system.cxx:3700
int max_requested_space
Definition midas.h:1020
int count_write_wait
Definition midas.h:1015
int wait_client_index
Definition midas.h:1019
DWORD time_write_wait
Definition midas.h:1016
DWORD wait_start_time
Definition midas.h:1018
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wait_for_more_events_locked()

static int bm_wait_for_more_events_locked ( bm_lock_buffer_guard pbuf_guard,
BUFFER_CLIENT pc,
int  timeout_msec,
BOOL  unlock_read_cache 
)
static

Definition at line 9403 of file midas.cxx.

9404{
9405 BUFFER* pbuf = pbuf_guard.get_pbuf();
9406 BUFFER_HEADER* pheader = pbuf->buffer_header;
9407
9408 //printf("bm_wait_for_more_events_locked: [%s] timeout %d\n", pheader->name, timeout_msec);
9409
9410 if (pc->read_pointer != pheader->write_pointer) {
9411 // buffer has data
9412 return BM_SUCCESS;
9413 }
9414
9415 if (timeout_msec == BM_NO_WAIT) {
9416 /* event buffer is empty and we are told to not wait */
9417 if (!pc->read_wait) {
9418 //printf("bm_wait_for_more_events: buffer [%s] client [%s] set read_wait in BM_NO_WAIT!\n", pheader->name, pc->name);
9419 pc->read_wait = TRUE;
9420 }
9421 return BM_ASYNC_RETURN;
9422 }
9423
9424 DWORD time_start = ss_millitime();
9425 DWORD time_wait = time_start + timeout_msec;
9426 DWORD sleep_time = 1000;
9427 if (timeout_msec == BM_NO_WAIT) {
9428 // default sleep time
9429 } else if (timeout_msec == BM_WAIT) {
9430 // default sleep time
9431 } else {
9432 if (sleep_time > (DWORD)timeout_msec)
9433 sleep_time = timeout_msec;
9434 }
9435
9436 //printf("time start 0x%08x, end 0x%08x, sleep %d\n", time_start, time_wait, sleep_time);
9437
9438 while (pc->read_pointer == pheader->write_pointer) {
9439 /* wait until there is data in the buffer (write pointer moves) */
9440
9441 if (!pc->read_wait) {
9442 //printf("bm_wait_for_more_events: buffer [%s] client [%s] set read_wait!\n", pheader->name, pc->name);
9443 pc->read_wait = TRUE;
9444 }
9445
9447
9449
9450 // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
9451
9452 pbuf_guard.unlock();
9453
9454 if (unlock_read_cache)
9455 pbuf->read_cache_mutex.unlock();
9456
9457 int status = ss_suspend(sleep_time, MSG_BM);
9458
9459 if (timeout_msec == BM_NO_WAIT) {
9460 // return immediately
9461 } else if (timeout_msec == BM_WAIT) {
9462 // wait forever
9463 } else {
9464 DWORD now = ss_millitime();
9465 //printf("check timeout: now 0x%08x, end 0x%08x, diff %d\n", now, time_wait, time_wait - now);
9466 if (now >= time_wait) {
9467 timeout_msec = BM_NO_WAIT; // cause immediate return
9468 } else {
9469 sleep_time = time_wait - now;
9470 if (sleep_time > 1000)
9471 sleep_time = 1000;
9472 //printf("time start 0x%08x, now 0x%08x, end 0x%08x, sleep %d\n", time_start, now, time_wait, sleep_time);
9473 }
9474 }
9475
9476 // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
9477
9478 if (unlock_read_cache) {
9480 if (status != BM_SUCCESS) {
9481 // bail out with all locks released
9482 return status;
9483 }
9484 }
9485
9486 if (!pbuf_guard.relock()) {
9487 if (unlock_read_cache) {
9488 pbuf->read_cache_mutex.unlock();
9489 }
9490 // bail out with all locks released
9491 return pbuf_guard.get_status();
9492 }
9493
9494 /* need to revalidate our BUFFER_CLIENT after releasing the buffer lock
9495 * because we may have been removed from the buffer by bm_cleanup() & co
9496 * due to a timeout or whatever. */
9497 pc = bm_get_my_client_locked(pbuf_guard);
9498
9499 /* return if TCP connection broken */
9500 if (status == SS_ABORT)
9501 return SS_ABORT;
9502
9503 if (timeout_msec == BM_NO_WAIT)
9504 return BM_ASYNC_RETURN;
9505 }
9506
9507 if (pc->read_wait) {
9508 //printf("bm_wait_for_more_events: buffer [%s] client [%s] clear read_wait!\n", pheader->name, pc->name);
9509 pc->read_wait = FALSE;
9510 }
9511
9512 return BM_SUCCESS;
9513}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wakeup_producers_locked()

static void bm_wakeup_producers_locked ( const BUFFER_HEADER pheader,
const BUFFER_CLIENT pc 
)
static

Definition at line 8798 of file midas.cxx.

8798 {
8799 int i;
8800 int have_get_all_requests = 0;
8801
8802 for (i = 0; i < pc->max_request_index; i++)
8803 if (pc->event_request[i].valid)
8804 have_get_all_requests |= (pc->event_request[i].sampling_type == GET_ALL);
8805
8806 /* only GET_ALL requests actually free space in the event buffer */
8807 if (!have_get_all_requests)
8808 return;
8809
8810 /*
8811 If read pointer has been changed, it may have freed up some space
8812 for waiting producers. So check if free space is now more than 50%
8813 of the buffer size and wake waiting producers.
8814 */
8815
8816 int free_space = pc->read_pointer - pheader->write_pointer;
8817 if (free_space <= 0)
8818 free_space += pheader->size;
8819
8820 if (free_space >= pheader->size * 0.5) {
8821 for (i = 0; i < pheader->max_client_index; i++) {
8822 const BUFFER_CLIENT *pc = pheader->client + i;
8823 if (pc->pid && pc->write_wait) {
8824 BOOL send_wakeup = (pc->write_wait < free_space);
8825 //printf("bm_wakeup_producers: buffer [%s] client [%s] write_wait %d, free_space %d, sending wakeup message %d\n", pheader->name, pc->name, pc->write_wait, free_space, send_wakeup);
8826 if (send_wakeup) {
8827 ss_resume(pc->port, "B ");
8828 }
8829 }
8830 }
8831 }
8832}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_buffer_statistics_to_odb()

static void bm_write_buffer_statistics_to_odb ( HNDLE  hDB,
BUFFER pbuf,
BOOL  force 
)
static

Definition at line 6597 of file midas.cxx.

6598{
6599 //printf("bm_buffer_write_statistics_to_odb: buffer [%s] client [%s], lock count %d -> %d, force %d\n", pbuf->buffer_name, pbuf->client_name, pbuf->last_count_lock, pbuf->count_lock, force);
6600
6601 bm_lock_buffer_guard pbuf_guard(pbuf);
6602
6603 if (!pbuf_guard.is_locked())
6604 return;
6605
6606 if (!force) {
6607 if (pbuf->count_lock == pbuf->last_count_lock) {
6608 return;
6609 }
6610 }
6611
6612 std::string buffer_name = pbuf->buffer_name;
6613 std::string client_name = pbuf->client_name;
6614
6615 if ((strlen(buffer_name.c_str()) < 1) || (strlen(client_name.c_str()) < 1)) {
6616 // do not call cm_msg() while holding buffer lock, if we are SYSMSG, we will deadlock. K.O.
6617 pbuf_guard.unlock(); // unlock before cm_msg()
6618 cm_msg(MERROR, "bm_write_buffer_statistics_to_odb", "Invalid empty buffer name \"%s\" or client name \"%s\"", buffer_name.c_str(), client_name.c_str());
6619 return;
6620 }
6621
6622 pbuf->last_count_lock = pbuf->count_lock;
6623
6624 BUFFER_INFO xbuf(pbuf);
6625 BUFFER_HEADER xheader = *pbuf->buffer_header;
6626 int client_index = pbuf->client_index;
6627
6628 pbuf_guard.unlock();
6629
6630 bm_write_buffer_statistics_to_odb_copy(hDB, buffer_name.c_str(), client_name.c_str(), client_index, &xbuf, &xheader);
6631}
static void bm_write_buffer_statistics_to_odb_copy(HNDLE hDB, const char *buffer_name, const char *client_name, int client_index, BUFFER_INFO *pbuf, BUFFER_HEADER *pheader)
Definition midas.cxx:6475
int last_count_lock
Definition midas.h:1017
int count_lock
Definition midas.h:1012
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_buffer_statistics_to_odb_copy()

static void bm_write_buffer_statistics_to_odb_copy ( HNDLE  hDB,
const char *  buffer_name,
const char *  client_name,
int  client_index,
BUFFER_INFO *  pbuf,
BUFFER_HEADER *  pheader 
)
static

Definition at line 6475 of file midas.cxx.

6476{
6477 int status;
6478
6479 DWORD now = ss_millitime();
6480
6481 HNDLE hKey;
6482 status = db_find_key(hDB, 0, "/System/Buffers", &hKey);
6483 if (status != DB_SUCCESS) {
6484 db_create_key(hDB, 0, "/System/Buffers", TID_KEY);
6485 status = db_find_key(hDB, 0, "/System/Buffers", &hKey);
6486 if (status != DB_SUCCESS)
6487 return;
6488 }
6489
6490 HNDLE hKeyBuffer;
6491 status = db_find_key(hDB, hKey, buffer_name, &hKeyBuffer);
6492 if (status != DB_SUCCESS) {
6493 db_create_key(hDB, hKey, buffer_name, TID_KEY);
6494 status = db_find_key(hDB, hKey, buffer_name, &hKeyBuffer);
6495 if (status != DB_SUCCESS)
6496 return;
6497 }
6498
6499 double buf_size = pheader->size;
6500 double buf_rptr = pheader->read_pointer;
6501 double buf_wptr = pheader->write_pointer;
6502
6503 double buf_fill = 0;
6504 double buf_cptr = 0;
6505 double buf_cused = 0;
6506 double buf_cused_pct = 0;
6507
6508 if (client_index >= 0 && client_index <= pheader->max_client_index) {
6509 buf_cptr = pheader->client[client_index].read_pointer;
6510
6511 if (buf_wptr == buf_cptr) {
6512 buf_cused = 0;
6513 } else if (buf_wptr > buf_cptr) {
6514 buf_cused = buf_wptr - buf_cptr;
6515 } else {
6516 buf_cused = (buf_size - buf_cptr) + buf_wptr;
6517 }
6518
6519 buf_cused_pct = buf_cused / buf_size * 100.0;
6520
6521 // we cannot write buf_cused and buf_cused_pct into the buffer statistics
6522 // because some other GET_ALL client may have different buf_cused & etc,
6523 // so they must be written into the per-client statistics
6524 // and the web page should look at all the GET_ALL clients and used
6525 // the biggest buf_cused as the whole-buffer "bytes used" value.
6526 }
6527
6528 if (buf_wptr == buf_rptr) {
6529 buf_fill = 0;
6530 } else if (buf_wptr > buf_rptr) {
6531 buf_fill = buf_wptr - buf_rptr;
6532 } else {
6533 buf_fill = (buf_size - buf_rptr) + buf_wptr;
6534 }
6535
6536 double buf_fill_pct = buf_fill / buf_size * 100.0;
6537
6538 db_set_value(hDB, hKeyBuffer, "Size", &buf_size, sizeof(double), 1, TID_DOUBLE);
6539 db_set_value(hDB, hKeyBuffer, "Write pointer", &buf_wptr, sizeof(double), 1, TID_DOUBLE);
6540 db_set_value(hDB, hKeyBuffer, "Read pointer", &buf_rptr, sizeof(double), 1, TID_DOUBLE);
6541 db_set_value(hDB, hKeyBuffer, "Filled", &buf_fill, sizeof(double), 1, TID_DOUBLE);
6542 db_set_value(hDB, hKeyBuffer, "Filled pct", &buf_fill_pct, sizeof(double), 1, TID_DOUBLE);
6543
6544 status = db_find_key(hDB, hKeyBuffer, "Clients", &hKey);
6545 if (status != DB_SUCCESS) {
6546 db_create_key(hDB, hKeyBuffer, "Clients", TID_KEY);
6547 status = db_find_key(hDB, hKeyBuffer, "Clients", &hKey);
6548 if (status != DB_SUCCESS)
6549 return;
6550 }
6551
6552 HNDLE hKeyClient;
6553 status = db_find_key(hDB, hKey, client_name, &hKeyClient);
6554 if (status != DB_SUCCESS) {
6555 db_create_key(hDB, hKey, client_name, TID_KEY);
6556 status = db_find_key(hDB, hKey, client_name, &hKeyClient);
6557 if (status != DB_SUCCESS)
6558 return;
6559 }
6560
6561 db_set_value(hDB, hKeyClient, "count_lock", &pbuf->count_lock, sizeof(int), 1, TID_INT32);
6562 db_set_value(hDB, hKeyClient, "count_sent", &pbuf->count_sent, sizeof(int), 1, TID_INT32);
6563 db_set_value(hDB, hKeyClient, "bytes_sent", &pbuf->bytes_sent, sizeof(double), 1, TID_DOUBLE);
6564 db_set_value(hDB, hKeyClient, "count_write_wait", &pbuf->count_write_wait, sizeof(int), 1, TID_INT32);
6565 db_set_value(hDB, hKeyClient, "time_write_wait", &pbuf->time_write_wait, sizeof(DWORD), 1, TID_UINT32);
6566 db_set_value(hDB, hKeyClient, "max_bytes_write_wait", &pbuf->max_requested_space, sizeof(INT), 1, TID_INT32);
6567 db_set_value(hDB, hKeyClient, "count_read", &pbuf->count_read, sizeof(int), 1, TID_INT32);
6568 db_set_value(hDB, hKeyClient, "bytes_read", &pbuf->bytes_read, sizeof(double), 1, TID_DOUBLE);
6569 db_set_value(hDB, hKeyClient, "get_all_flag", &pbuf->get_all_flag, sizeof(BOOL), 1, TID_BOOL);
6570 db_set_value(hDB, hKeyClient, "read_pointer", &buf_cptr, sizeof(double), 1, TID_DOUBLE);
6571 db_set_value(hDB, hKeyClient, "bytes_used", &buf_cused, sizeof(double), 1, TID_DOUBLE);
6572 db_set_value(hDB, hKeyClient, "pct_used", &buf_cused_pct, sizeof(double), 1, TID_DOUBLE);
6573
6574 for (int i = 0; i < MAX_CLIENTS; i++) {
6575 if (!pbuf->client_count_write_wait[i])
6576 continue;
6577
6578 if (pheader->client[i].pid == 0)
6579 continue;
6580
6581 if (pheader->client[i].name[0] == 0)
6582 continue;
6583
6584 char str[100 + NAME_LENGTH];
6585
6586 sprintf(str, "writes_blocked_by/%s/count_write_wait", pheader->client[i].name);
6587 db_set_value(hDB, hKeyClient, str, &pbuf->client_count_write_wait[i], sizeof(int), 1, TID_INT32);
6588
6589 sprintf(str, "writes_blocked_by/%s/time_write_wait", pheader->client[i].name);
6590 db_set_value(hDB, hKeyClient, str, &pbuf->client_time_write_wait[i], sizeof(DWORD), 1, TID_UINT32);
6591 }
6592
6593 db_set_value(hDB, hKeyBuffer, "Last updated", &now, sizeof(DWORD), 1, TID_UINT32);
6594 db_set_value(hDB, hKeyClient, "last_updated", &now, sizeof(DWORD), 1, TID_UINT32);
6595}
#define TID_DOUBLE
Definition midas.h:343
#define TID_KEY
Definition midas.h:349
#define TID_BOOL
Definition midas.h:340
#define TID_INT32
Definition midas.h:339
INT db_create_key(HNDLE hDB, HNDLE hKey, const char *key_name, DWORD type)
Definition odb.cxx:3392
INT db_set_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, const void *data, INT data_size, INT num_values, DWORD type)
Definition odb.cxx:5028
INT db_find_key(HNDLE hDB, HNDLE hKey, const char *key_name, HNDLE *subhKey)
Definition odb.cxx:4256
HNDLE hKey
int count_sent
Definition midas.cxx:6438
BOOL get_all_flag
Definition midas.cxx:6434
int count_lock
Definition midas.cxx:6437
int count_write_wait
Definition midas.cxx:6440
double bytes_read
Definition midas.cxx:6447
int client_count_write_wait[MAX_CLIENTS]
Definition midas.cxx:6448
DWORD time_write_wait
Definition midas.cxx:6441
int count_read
Definition midas.cxx:6446
double bytes_sent
Definition midas.cxx:6439
DWORD client_time_write_wait[MAX_CLIENTS]
Definition midas.cxx:6449
int max_requested_space
Definition midas.cxx:6445
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_statistics_to_odb()

INT bm_write_statistics_to_odb ( void  )

Write buffer statistics of all open buffers to the ODB (/System/Buffers)

Returns
BM_SUCCESS

Definition at line 7291 of file midas.cxx.

7291 {
7292#ifdef LOCAL_ROUTINES
7293 {
7294 int status;
7295 HNDLE hDB;
7296
7297 status = cm_get_experiment_database(&hDB, NULL);
7298
7299 if (status != CM_SUCCESS) {
7300 //printf("bm_write_statistics_to_odb: cannot get ODB handle!\n");
7301 return BM_SUCCESS;
7302 }
7303
7304 std::vector<BUFFER*> mybuffers;
7305
7306 gBuffersMutex.lock();
7307 mybuffers = gBuffers;
7308 gBuffersMutex.unlock();
7309
7310 for (BUFFER* pbuf : mybuffers) {
7311 if (!pbuf || !pbuf->attached)
7312 continue;
7313 bm_write_buffer_statistics_to_odb(hDB, pbuf, TRUE);
7314 }
7315 }
7316#endif /* LOCAL_ROUTINES */
7317
7318 return BM_SUCCESS;
7319}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_to_buffer_locked()

static void bm_write_to_buffer_locked ( BUFFER_HEADER *  pheader,
int  sg_n,
const char *const  sg_ptr[],
const size_t  sg_len[],
size_t  total_size 
)
static

Definition at line 9515 of file midas.cxx.

9516{
9517 char *pdata = (char *) (pheader + 1);
9518
9519 //int old_write_pointer = pheader->write_pointer;
9520
9521 /* new event fits into the remaining space? */
9522 if ((size_t)pheader->write_pointer + total_size <= (size_t)pheader->size) {
9523 //memcpy(pdata + pheader->write_pointer, pevent, event_size);
9524 char* wptr = pdata + pheader->write_pointer;
9525 for (int i=0; i<sg_n; i++) {
9526 //printf("memcpy %p+%d\n", sg_ptr[i], (int)sg_len[i]);
9527 memcpy(wptr, sg_ptr[i], sg_len[i]);
9528 wptr += sg_len[i];
9529 }
9530 pheader->write_pointer = pheader->write_pointer + total_size;
9531 assert(pheader->write_pointer <= pheader->size);
9532 /* remaining space is smaller than size of an event header? */
9533 if ((pheader->write_pointer + (int) sizeof(EVENT_HEADER)) > pheader->size) {
9534 // note: ">" here to match "bm_incr_rp". If remaining space is exactly
9535 // equal to the event header size, we will write the next event header here,
9536 // then wrap the pointer and write the event data at the beginning of the buffer.
9537 //printf("bm_write_to_buffer_locked: truncate wp %d. buffer size %d, remaining %d, event header size %d, event size %d, total size %d\n", pheader->write_pointer, pheader->size, pheader->size-pheader->write_pointer, (int)sizeof(EVENT_HEADER), event_size, total_size);
9538 pheader->write_pointer = 0;
9539 }
9540 } else {
9541 /* split event */
9542 size_t size = pheader->size - pheader->write_pointer;
9543
9544 //printf("split: wp %d, size %d, avail %d\n", pheader->write_pointer, pheader->size, size);
9545
9546 //memcpy(pdata + pheader->write_pointer, pevent, size);
9547 //memcpy(pdata, ((const char *) pevent) + size, event_size - size);
9548
9549 char* wptr = pdata + pheader->write_pointer;
9550 size_t count = 0;
9551
9552 // copy first part
9553
9554 int i = 0;
9555 for (; i<sg_n; i++) {
9556 if (count + sg_len[i] > size)
9557 break;
9558 memcpy(wptr, sg_ptr[i], sg_len[i]);
9559 wptr += sg_len[i];
9560 count += sg_len[i];
9561 }
9562
9563 //printf("wptr %d, count %d\n", wptr-pdata, count);
9564
9565 // split segment
9566
9567 size_t first = size - count;
9568 size_t second = sg_len[i] - first;
9569 assert(first + second == sg_len[i]);
9570 assert(count + first == size);
9571
9572 //printf("first %d, second %d\n", first, second);
9573
9574 memcpy(wptr, sg_ptr[i], first);
9575 wptr = pdata + 0;
9576 count += first;
9577 memcpy(wptr, sg_ptr[i] + first, second);
9578 wptr += second;
9579 count += second;
9580 i++;
9581
9582 // copy remaining
9583
9584 for (; i<sg_n; i++) {
9585 memcpy(wptr, sg_ptr[i], sg_len[i]);
9586 wptr += sg_len[i];
9587 count += sg_len[i];
9588 }
9589
9590 //printf("wptr %d, count %d\n", wptr-pdata, count);
9591
9592 //printf("bm_write_to_buffer_locked: wrap wp %d -> %d. buffer size %d, available %d, wrote %d, remaining %d, event size %d, total size %d\n", pheader->write_pointer, total_size-size, pheader->size, pheader->size-pheader->write_pointer, size, pheader->size - (pheader->write_pointer+size), event_size, total_size);
9593
9594 pheader->write_pointer = total_size - size;
9595 }
9596
9597 //printf("bm_write_to_buffer_locked: buf [%s] size %d, wrote %d/%d, wp %d -> %d\n", pheader->name, pheader->size, event_size, total_size, old_write_pointer, pheader->write_pointer);
9598}
Here is the caller graph for this function:

Variable Documentation

◆ _bm_lock_timeout

int _bm_lock_timeout = 5 * 60 * 1000
static

Definition at line 5937 of file midas.cxx.

◆ _bm_max_event_size

DWORD _bm_max_event_size = 0
static

Definition at line 5932 of file midas.cxx.

◆ _bm_mutex_timeout_sec

double _bm_mutex_timeout_sec = _bm_lock_timeout/1000 + 15.000
static

Definition at line 5938 of file midas.cxx.

◆ defrag_buffer

EVENT_DEFRAG_BUFFER defrag_buffer[MAX_DEFRAG_EVENTS]
static

Definition at line 11420 of file midas.cxx.