MIDAS
Loading...
Searching...
No Matches
Event Buffer Functions (bm_xxx)

Classes

struct  BUFFER_INFO
 
struct  EVENT_DEFRAG_BUFFER
 

Macros

#define MAX_DEFRAG_EVENTS   10
 

Functions

static int bm_validate_client_index_locked (bm_lock_buffer_guard &pbuf_guard)
 
INT bm_match_event (short int event_id, short int trigger_mask, const EVENT_HEADER *pevent)
 
void bm_remove_client_locked (BUFFER_HEADER *pheader, int j)
 
static void bm_cleanup_buffer_locked (BUFFER *pbuf, const char *who, DWORD actual_time)
 
static void bm_update_last_activity (DWORD millitime)
 
static BOOL bm_validate_rp (const char *who, const BUFFER_HEADER *pheader, int rp)
 
static int bm_incr_rp_no_check (const BUFFER_HEADER *pheader, int rp, int total_size)
 
static int bm_next_rp (const char *who, const BUFFER_HEADER *pheader, const char *pdata, int rp)
 
static int bm_validate_buffer_locked (const BUFFER *pbuf)
 
static void bm_reset_buffer_locked (BUFFER *pbuf)
 
static void bm_clear_buffer_statistics (HNDLE hDB, BUFFER *pbuf)
 
static void bm_write_buffer_statistics_to_odb_copy (HNDLE hDB, const char *buffer_name, const char *client_name, int client_index, BUFFER_INFO *pbuf, BUFFER_HEADER *pheader)
 
static void bm_write_buffer_statistics_to_odb (HNDLE hDB, BUFFER *pbuf, BOOL force)
 
INT bm_open_buffer (const char *buffer_name, INT buffer_size, INT *buffer_handle)
 
INT bm_get_buffer_handle (const char *buffer_name, INT *buffer_handle)
 
INT bm_close_buffer (INT buffer_handle)
 
INT bm_close_all_buffers (void)
 
INT bm_write_statistics_to_odb (void)
 
INT bm_set_cache_size (INT buffer_handle, size_t read_size, size_t write_size)
 
INT bm_compose_event (EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD serial)
 
INT bm_compose_event_threadsafe (EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD *serial)
 
INT bm_add_event_request (INT buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, EVENT_HANDLER *func, INT request_id)
 
INT bm_request_event (HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
 
INT bm_remove_event_request (INT buffer_handle, INT request_id)
 
INT bm_delete_request (INT request_id)
 
static void bm_validate_client_pointers_locked (const BUFFER_HEADER *pheader, BUFFER_CLIENT *pclient)
 
static BOOL bm_update_read_pointer_locked (const char *caller_name, BUFFER_HEADER *pheader)
 
static void bm_wakeup_producers_locked (const BUFFER_HEADER *pheader, const BUFFER_CLIENT *pc)
 
static void bm_dispatch_event (int buffer_handle, EVENT_HEADER *pevent)
 
static void bm_incr_read_cache_locked (BUFFER *pbuf, int total_size)
 
static BOOL bm_peek_read_cache_locked (BUFFER *pbuf, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
 
static int bm_peek_buffer_locked (BUFFER *pbuf, BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
 
static void bm_read_from_buffer_locked (const BUFFER_HEADER *pheader, int rp, char *buf, int event_size)
 
static void bm_read_from_buffer_locked (const BUFFER_HEADER *pheader, int rp, std::vector< char > *vecptr, int event_size)
 
static BOOL bm_check_requests (const BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
 
static int bm_wait_for_more_events_locked (bm_lock_buffer_guard &pbuf_guard, BUFFER_CLIENT *pc, int timeout_msec, BOOL unlock_read_cache)
 
static int bm_fill_read_cache_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
 
static void bm_convert_event_header (EVENT_HEADER *pevent, int convert_flags)
 
static int bm_wait_for_free_space_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec, int requested_space, bool unlock_write_cache)
 
static void bm_write_to_buffer_locked (BUFFER_HEADER *pheader, int sg_n, const char *const sg_ptr[], const size_t sg_len[], size_t total_size)
 
static int bm_find_first_request_locked (BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
 
static void bm_notify_reader_locked (BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, int old_write_pointer, int request_id)
 
INT bm_send_event (INT buffer_handle, const EVENT_HEADER *pevent, int unused, int timeout_msec)
 
int bm_send_event_vec (int buffer_handle, const std::vector< char > &event, int timeout_msec)
 
int bm_send_event_vec (int buffer_handle, const std::vector< std::vector< char > > &event, int timeout_msec)
 
static INT bm_flush_cache_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
 
int bm_send_event_sg (int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
 
static int bm_flush_cache_rpc (int buffer_handle, int timeout_msec)
 
INT bm_flush_cache (int buffer_handle, int timeout_msec)
 
static INT bm_read_buffer (BUFFER *pbuf, INT buffer_handle, void **bufptr, void *buf, INT *buf_size, std::vector< char > *vecptr, int timeout_msec, int convert_flags, BOOL dispatch)
 
static INT bm_receive_event_rpc (INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
 
INT bm_receive_event (INT buffer_handle, void *destination, INT *buf_size, int timeout_msec)
 
INT bm_receive_event_alloc (INT buffer_handle, EVENT_HEADER **ppevent, int timeout_msec)
 
INT bm_receive_event_vec (INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
 
static int bm_skip_event (BUFFER *pbuf)
 
INT bm_skip_event (INT buffer_handle)
 
static INT bm_push_buffer (BUFFER *pbuf, int buffer_handle)
 
INT bm_check_buffers ()
 
INT bm_poll_event ()
 
INT bm_empty_buffers ()
 

Variables

static DWORD _bm_max_event_size = 0
 
static int _bm_lock_timeout = 5 * 60 * 1000
 
static double _bm_mutex_timeout_sec = _bm_lock_timeout/1000 + 15.000
 
static EVENT_DEFRAG_BUFFER defrag_buffer [MAX_DEFRAG_EVENTS]
 

Detailed Description

dox dox


Macro Definition Documentation

◆ MAX_DEFRAG_EVENTS

#define MAX_DEFRAG_EVENTS   10

dox

Definition at line 11294 of file midas.cxx.

Function Documentation

◆ bm_add_event_request()

INT bm_add_event_request ( INT  buffer_handle,
short int  event_id,
short int  trigger_mask,
INT  sampling_type,
EVENT_HANDLER func,
INT  request_id 
)

dox

Definition at line 8336 of file midas.cxx.

8380{
8381 if (rpc_is_remote())
8382 return rpc_call(RPC_BM_ADD_EVENT_REQUEST, buffer_handle, event_id,
8383 trigger_mask, sampling_type, (INT) (POINTER_T) func, request_id);
8384
8385#ifdef LOCAL_ROUTINES
8386 {
8387 int status = 0;
8388
8389 BUFFER *pbuf = bm_get_buffer("bm_add_event_request", buffer_handle, &status);
8390
8391 if (!pbuf)
8392 return status;
8393
8394 /* lock buffer */
8396
8397 if (!pbuf_guard.is_locked())
8398 return pbuf_guard.get_status();
8399
8400 /* avoid callback/non callback requests */
8401 if (func == NULL && pbuf->callback) {
8402 pbuf_guard.unlock(); // unlock before cm_msg()
8403 cm_msg(MERROR, "bm_add_event_request", "mixing callback/non callback requests not possible");
8404 return BM_INVALID_MIXING;
8405 }
8406
8407 /* do not allow GET_RECENT with nonzero cache size */
8408 if (sampling_type == GET_RECENT && pbuf->read_cache_size > 0) {
8409 pbuf_guard.unlock(); // unlock before cm_msg()
8410 cm_msg(MERROR, "bm_add_event_request", "GET_RECENT request not possible if read cache is enabled");
8411 return BM_INVALID_PARAM;
8412 }
8413
8414 /* get a pointer to the proper client structure */
8416
8417 /* look for a empty request entry */
8418 int i;
8419 for (i = 0; i < MAX_EVENT_REQUESTS; i++)
8420 if (!pclient->event_request[i].valid)
8421 break;
8422
8423 if (i == MAX_EVENT_REQUESTS) {
8424 // implicit unlock
8425 return BM_NO_MEMORY;
8426 }
8427
8428 /* setup event_request structure */
8429 pclient->event_request[i].id = request_id;
8430 pclient->event_request[i].valid = TRUE;
8431 pclient->event_request[i].event_id = event_id;
8432 pclient->event_request[i].trigger_mask = trigger_mask;
8433 pclient->event_request[i].sampling_type = sampling_type;
8434
8435 pclient->all_flag = pclient->all_flag || (sampling_type & GET_ALL);
8436
8437 pbuf->get_all_flag = pclient->all_flag;
8438
8439 /* set callback flag in buffer structure */
8440 if (func != NULL)
8441 pbuf->callback = TRUE;
8442
8443 /*
8444 Save the index of the last request in the list so that later only the
8445 requests 0..max_request_index-1 have to be searched through.
8446 */
8447
8448 if (i + 1 > pclient->max_request_index)
8449 pclient->max_request_index = i + 1;
8450 }
8451#endif /* LOCAL_ROUTINES */
8452
8453 return BM_SUCCESS;
8454}
static BUFFER_CLIENT * bm_get_my_client_locked(bm_lock_buffer_guard &pbuf_guard)
Definition midas.cxx:6021
static BUFFER * bm_get_buffer(const char *who, INT buffer_handle, int *pstatus)
Definition midas.cxx:6644
#define BM_INVALID_PARAM
Definition midas.h:619
#define BM_NO_MEMORY
Definition midas.h:607
#define BM_INVALID_MIXING
Definition midas.h:621
#define BM_SUCCESS
Definition midas.h:605
#define GET_ALL
Definition midas.h:321
#define GET_RECENT
Definition midas.h:323
#define MERROR
Definition midas.h:559
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
Definition midas.cxx:929
#define RPC_BM_ADD_EVENT_REQUEST
Definition mrpc.h:43
bool rpc_is_remote(void)
Definition midas.cxx:12783
INT rpc_call(DWORD routine_id,...)
Definition midas.cxx:13685
INT i
Definition mdump.cxx:32
int INT
Definition midas.h:129
#define TRUE
Definition midas.h:182
#define MAX_EVENT_REQUESTS
Definition midas.h:275
#define POINTER_T
Definition midas.h:166
#define trigger_mask
#define event_id
DWORD status
Definition odbhist.cxx:39
TH1X EXPRT * h1_book(const char *name, const char *title, int bins, double min, double max)
Definition rmidas.h:24
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_check_buffers()

INT bm_check_buffers ( void  )

Check if any requested event is waiting in a buffer

Returns
TRUE More events are waiting
FALSE No more events are waiting

Definition at line 10976 of file midas.cxx.

10976 {
10977#ifdef LOCAL_ROUTINES
10978 {
10979 INT status = 0;
10980 BOOL bMore;
10981 DWORD start_time;
10982 //static DWORD last_time = 0;
10983
10984 /* if running as a server, buffer checking is done by client
10985 via ASYNC bm_receive_event */
10986 if (rpc_is_mserver()) {
10987 return FALSE;
10988 }
10989
10990 bMore = FALSE;
10991 start_time = ss_millitime();
10992
10993 std::vector<BUFFER*> mybuffers;
10994
10995 gBuffersMutex.lock();
10997 gBuffersMutex.unlock();
10998
10999 /* go through all buffers */
11000 for (size_t idx = 0; idx < mybuffers.size(); idx++) {
11002
11003 if (!pbuf || !pbuf->attached)
11004 continue;
11005
11006 //int count_loops = 0;
11007 while (1) {
11008 if (pbuf->attached) {
11009 /* one bm_push_event could cause a run stop and a buffer close, which
11010 * would crash the next call to bm_push_event(). So check for valid
11011 * buffer on each call */
11012
11013 /* this is what happens:
11014 * bm_push_buffer() may call a user callback function
11015 * user callback function may indirectly call bm_close() of this buffer,
11016 * i.e. if it stops the run,
11017 * bm_close() will set pbuf->attached to false, but will not delete pbuf or touch gBuffers
11018 * here we will see pbuf->attched is false and quit this loop
11019 */
11020
11021 status = bm_push_buffer(pbuf, idx + 1);
11022
11023 if (status == BM_CORRUPTED) {
11024 return status;
11025 }
11026
11027 //printf("bm_check_buffers: bm_push_buffer() returned %d, loop %d, time %d\n", status, count_loops, ss_millitime() - start_time);
11028
11029 if (status != BM_MORE_EVENTS) {
11030 //DWORD t = ss_millitime() - start_time;
11031 //printf("bm_check_buffers: index %d, period %d, elapsed %d, loop %d, no more events\n", idx, start_time - last_time, t, count_loops);
11032 break;
11033 }
11034
11035 // count_loops++;
11036 }
11037
11038 // NB: this code has a logic error: if 2 buffers always have data,
11039 // this timeout will cause us to exit reading the 1st buffer
11040 // after 1000 msec, then we read the 2nd buffer exactly once,
11041 // and exit the loop because the timeout is still active -
11042 // we did not reset "start_time" when we started reading
11043 // from the 2nd buffer. Result is that we always read all
11044 // the data in a loop from the 1st buffer, but read just
11045 // one event from the 2nd buffer, resulting in severe unfairness.
11046
11047 /* stop after one second */
11048 DWORD t = ss_millitime() - start_time;
11049 if (t > 1000) {
11050 //printf("bm_check_buffers: index %d, period %d, elapsed %d, loop %d, timeout.\n", idx, start_time - last_time, t, count_loops);
11051 bMore = TRUE;
11052 break;
11053 }
11054 }
11055 }
11056
11057 //last_time = start_time;
11058
11059 return bMore;
11060
11061 }
11062#else /* LOCAL_ROUTINES */
11063
11064 return FALSE;
11065
11066#endif
11067}
#define FALSE
Definition cfortran.h:309
static INT bm_push_buffer(BUFFER *pbuf, int buffer_handle)
Definition midas.cxx:10924
#define BM_MORE_EVENTS
Definition midas.h:620
#define BM_CORRUPTED
Definition midas.h:623
unsigned int DWORD
Definition mcstd.h:51
DWORD ss_millitime()
Definition system.cxx:3465
bool rpc_is_mserver(void)
Definition midas.cxx:12840
static std::mutex gBuffersMutex
Definition midas.cxx:195
static std::vector< BUFFER * > gBuffers
Definition midas.cxx:196
DWORD BOOL
Definition midas.h:105
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_check_requests()

static BOOL bm_check_requests ( const BUFFER_CLIENT pc,
const EVENT_HEADER pevent 
)
static

Definition at line 8988 of file midas.cxx.

8988 {
8989
8991 int i;
8992 for (i = 0; i < pc->max_request_index; i++) {
8993 const EVENT_REQUEST *prequest = pc->event_request + i;
8994 if (prequest->valid) {
8995 if (bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
8996 /* check if this is a recent event */
8997 if (prequest->sampling_type == GET_RECENT) {
8998 if (ss_time() - pevent->time_stamp > 1) {
8999 /* skip that event */
9000 continue;
9001 }
9002 }
9003
9005 break;
9006 }
9007 }
9008 }
9009 return is_requested;
9010}
INT bm_match_event(short int event_id, short int trigger_mask, const EVENT_HEADER *pevent)
Definition midas.cxx:6037
DWORD ss_time()
Definition system.cxx:3534
DWORD time_stamp
Definition midas.h:855
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_cleanup_buffer_locked()

static void bm_cleanup_buffer_locked ( BUFFER pbuf,
const char who,
DWORD  actual_time 
)
static

Check all clients on buffer, remove invalid clients

Definition at line 6088 of file midas.cxx.

6088 {
6089 BUFFER_HEADER *pheader;
6091 int j;
6092
6093 pheader = pbuf->buffer_header;
6094 pbclient = pheader->client;
6095
6096 /* now check other clients */
6097 for (j = 0; j < pheader->max_client_index; j++, pbclient++) {
6098 if (pbclient->pid) {
6099 if (!ss_pid_exists(pbclient->pid)) {
6100 cm_msg(MINFO, "bm_cleanup",
6101 "Client \'%s\' on buffer \'%s\' removed by %s because process pid %d does not exist", pbclient->name,
6102 pheader->name, who, pbclient->pid);
6103
6104 bm_remove_client_locked(pheader, j);
6105 continue;
6106 }
6107 }
6108
6109 /* If client process has no activity, clear its buffer entry. */
6110 if (pbclient->pid && pbclient->watchdog_timeout > 0) {
6111 DWORD tdiff = actual_time - pbclient->last_activity;
6112#if 0
6113 printf("buffer [%s] client [%-32s] times 0x%08x 0x%08x, diff 0x%08x %5d, timeout %d\n",
6114 pheader->name,
6115 pbclient->name,
6116 pbclient->last_activity,
6118 tdiff,
6119 tdiff,
6120 pbclient->watchdog_timeout);
6121#endif
6122 if (actual_time > pbclient->last_activity &&
6123 tdiff > pbclient->watchdog_timeout) {
6124
6125 cm_msg(MINFO, "bm_cleanup", "Client \'%s\' on buffer \'%s\' removed by %s (idle %1.1lfs, timeout %1.0lfs)",
6126 pbclient->name, pheader->name, who,
6127 tdiff / 1000.0,
6128 pbclient->watchdog_timeout / 1000.0);
6129
6130 bm_remove_client_locked(pheader, j);
6131 }
6132 }
6133 }
6134}
void bm_remove_client_locked(BUFFER_HEADER *pheader, int j)
Definition midas.cxx:6057
#define MINFO
Definition midas.h:560
BOOL ss_pid_exists(int pid)
Definition system.cxx:1442
DWORD actual_time
Definition mfe.cxx:37
INT j
Definition odbhist.cxx:40
char name[NAME_LENGTH]
Definition midas.h:958
INT max_client_index
Definition midas.h:960
BUFFER_CLIENT client[MAX_CLIENTS]
Definition midas.h:967
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_clear_buffer_statistics()

static void bm_clear_buffer_statistics ( HNDLE  hDB,
BUFFER pbuf 
)
static

Definition at line 6430 of file midas.cxx.

6430 {
6431 HNDLE hKey;
6432 int status;
6433
6434 char str[256 + 2 * NAME_LENGTH];
6435 sprintf(str, "/System/buffers/%s/Clients/%s/writes_blocked_by", pbuf->buffer_name, pbuf->client_name);
6436 //printf("delete [%s]\n", str);
6437 status = db_find_key(hDB, 0, str, &hKey);
6438 if (status == DB_SUCCESS) {
6440 }
6441}
#define DB_SUCCESS
Definition midas.h:631
INT db_delete_key(HNDLE hDB, HNDLE hKey, BOOL follow_links)
Definition odb.cxx:3861
INT db_find_key(HNDLE hDB, HNDLE hKey, const char *key_name, HNDLE *subhKey)
Definition odb.cxx:4084
HNDLE hKey
HNDLE hDB
main ODB handle
Definition mana.cxx:207
INT HNDLE
Definition midas.h:132
#define NAME_LENGTH
Definition midas.h:272
char str[256]
Definition odbhist.cxx:33
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_close_all_buffers()

INT bm_close_all_buffers ( void  )

Close all open buffers

Returns
BM_SUCCESS

Definition at line 7265 of file midas.cxx.

7265 {
7266 if (rpc_is_remote())
7268
7269#ifdef LOCAL_ROUTINES
7270 {
7272
7273 gBuffersMutex.lock();
7274 size_t nbuf = gBuffers.size();
7275 gBuffersMutex.unlock();
7276
7277 for (size_t i = nbuf; i > 0; i--) {
7279 }
7280
7281 gBuffersMutex.lock();
7282 for (size_t i=0; i< gBuffers.size(); i++) {
7283 BUFFER* pbuf = gBuffers[i];
7284 if (!pbuf)
7285 continue;
7286 delete pbuf;
7287 pbuf = NULL;
7288 gBuffers[i] = NULL;
7289 }
7290 gBuffersMutex.unlock();
7291 }
7292#endif /* LOCAL_ROUTINES */
7293
7294 return BM_SUCCESS;
7295}
INT bm_close_buffer(INT buffer_handle)
Definition midas.cxx:7118
int cm_msg_close_buffer(void)
Definition midas.cxx:499
#define RPC_BM_CLOSE_ALL_BUFFERS
Definition mrpc.h:38
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_close_buffer()

INT bm_close_buffer ( INT  buffer_handle)

Closes an event buffer previously opened with bm_open_buffer().

Parameters
buffer_handlebuffer handle
Returns
BM_SUCCESS, BM_INVALID_HANDLE

Definition at line 7118 of file midas.cxx.

7118 {
7119 //printf("bm_close_buffer: handle %d\n", buffer_handle);
7120
7121 if (rpc_is_remote())
7122 return rpc_call(RPC_BM_CLOSE_BUFFER, buffer_handle);
7123
7124#ifdef LOCAL_ROUTINES
7125 {
7126 int status = 0;
7127
7128 BUFFER *pbuf = bm_get_buffer(NULL, buffer_handle, &status);
7129
7130 if (!pbuf)
7131 return status;
7132
7133 //printf("bm_close_buffer: handle %d, name [%s]\n", buffer_handle, pheader->name);
7134
7135 int i;
7136
7137 { /* delete all requests for this buffer */
7138 _request_list_mutex.lock();
7139 std::vector<EventRequest> request_list_copy = _request_list;
7140 _request_list_mutex.unlock();
7141 for (size_t i = 0; i < request_list_copy.size(); i++) {
7142 if (request_list_copy[i].buffer_handle == buffer_handle) {
7144 }
7145 }
7146 }
7147
7148 HNDLE hDB;
7150
7151 if (hDB) {
7152 /* write statistics to odb */
7154 }
7155
7156 /* lock buffer in correct order */
7157
7159
7160 if (status != BM_SUCCESS) {
7161 return status;
7162 }
7163
7165
7166 if (status != BM_SUCCESS) {
7167 pbuf->read_cache_mutex.unlock();
7168 return status;
7169 }
7170
7172
7173 if (!pbuf_guard.is_locked()) {
7174 pbuf->write_cache_mutex.unlock();
7175 pbuf->read_cache_mutex.unlock();
7176 return pbuf_guard.get_status();
7177 }
7178
7179 BUFFER_HEADER *pheader = pbuf->buffer_header;
7180
7181 /* mark entry in _buffer as empty */
7182 pbuf->attached = false;
7183
7185
7186 if (pclient) {
7187 /* clear entry from client structure in buffer header */
7188 memset(pclient, 0, sizeof(BUFFER_CLIENT));
7189 }
7190
7191 /* calculate new max_client_index entry */
7192 for (i = MAX_CLIENTS - 1; i >= 0; i--)
7193 if (pheader->client[i].pid != 0)
7194 break;
7195 pheader->max_client_index = i + 1;
7196
7197 /* count new number of clients */
7198 int j = 0;
7199 for (i = MAX_CLIENTS - 1; i >= 0; i--)
7200 if (pheader->client[i].pid != 0)
7201 j++;
7202 pheader->num_clients = j;
7203
7204 int destroy_flag = (pheader->num_clients == 0);
7205
7206 // we hold the locks on the read cache and the write cache.
7207
7208 /* free cache */
7209 if (pbuf->read_cache_size > 0) {
7210 free(pbuf->read_cache);
7211 pbuf->read_cache = NULL;
7212 pbuf->read_cache_size = 0;
7213 pbuf->read_cache_rp = 0;
7214 pbuf->read_cache_wp = 0;
7215 }
7216
7217 if (pbuf->write_cache_size > 0) {
7218 free(pbuf->write_cache);
7219 pbuf->write_cache = NULL;
7220 pbuf->write_cache_size = 0;
7221 pbuf->write_cache_rp = 0;
7222 pbuf->write_cache_wp = 0;
7223 }
7224
7225 /* check if anyone is waiting and wake him up */
7226
7227 for (int i = 0; i < pheader->max_client_index; i++) {
7228 BUFFER_CLIENT *pclient = pheader->client + i;
7229 if (pclient->pid && (pclient->write_wait || pclient->read_wait))
7230 ss_resume(pclient->port, "B ");
7231 }
7232
7233 /* unmap shared memory, delete it if we are the last */
7234
7235 ss_shm_close(pbuf->buffer_name, pbuf->buffer_header, pbuf->shm_size, pbuf->shm_handle, destroy_flag);
7236
7237 /* after ss_shm_close() these are invalid: */
7238
7239 pheader = NULL;
7240 pbuf->buffer_header = NULL;
7241 pbuf->shm_size = 0;
7242 pbuf->shm_handle = 0;
7243
7244 /* unlock buffer in correct order */
7245
7246 pbuf_guard.unlock();
7247
7248 pbuf->write_cache_mutex.unlock();
7249 pbuf->read_cache_mutex.unlock();
7250
7251 /* delete semaphore */
7252
7254 }
7255#endif /* LOCAL_ROUTINES */
7256
7257 return BM_SUCCESS;
7258}
INT bm_delete_request(INT request_id)
Definition midas.cxx:8606
static void bm_write_buffer_statistics_to_odb(HNDLE hDB, BUFFER *pbuf, BOOL force)
Definition midas.cxx:6608
INT cm_get_experiment_database(HNDLE *hDB, HNDLE *hKeyClient)
Definition midas.cxx:3025
static int bm_lock_buffer_read_cache(BUFFER *pbuf)
Definition midas.cxx:7926
static int bm_lock_buffer_write_cache(BUFFER *pbuf)
Definition midas.cxx:7947
INT ss_resume(INT port, const char *message)
Definition system.cxx:4916
INT ss_semaphore_delete(HNDLE semaphore_handle, INT destroy_flag)
Definition system.cxx:2941
INT ss_shm_close(const char *name, void *adr, size_t shm_size, HNDLE handle, INT destroy_flag)
Definition system.cxx:757
#define RPC_BM_CLOSE_BUFFER
Definition mrpc.h:37
static std::vector< EventRequest > _request_list
Definition midas.cxx:220
static std::mutex _request_list_mutex
Definition midas.cxx:219
#define MAX_CLIENTS
Definition midas.h:274
INT num_clients
Definition midas.h:959
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_compose_event()

INT bm_compose_event ( EVENT_HEADER event_header,
short int  event_id,
short int  trigger_mask,
DWORD  data_size,
DWORD  serial 
)

Compose a Midas event header. An event header can usually be set-up manually or through this routine. If the data size of the event is not known when the header is composed, it can be set later with event_header->data_size = <...> Following structure is created at the beginning of an event

typedef struct {
short int event_id;
short int trigger_mask;
DWORD data_size;
char event[1000];
bm_compose_event((EVENT_HEADER *)event, 1, 0, 100, 1);
*(event+sizeof(EVENT_HEADER)) = <...>
INT bm_compose_event(EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD serial)
Definition midas.cxx:8303
#define serial_number
#define time_stamp
Parameters
event_headerpointer to the event header
event_idevent ID of the event
trigger_masktrigger mask of the event
data_sizesize of the data part of the event in bytes
serialserial number
Returns
BM_SUCCESS

Definition at line 8303 of file midas.cxx.

8304{
8305 event_header->event_id = event_id;
8306 event_header->trigger_mask = trigger_mask;
8307 event_header->data_size = data_size;
8308 event_header->time_stamp = ss_time();
8309 event_header->serial_number = serial;
8310
8311 return BM_SUCCESS;
8312}
INT serial
Definition minife.c:20
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_compose_event_threadsafe()

INT bm_compose_event_threadsafe ( EVENT_HEADER event_header,
short int  event_id,
short int  trigger_mask,
DWORD  data_size,
DWORD serial 
)

Definition at line 8314 of file midas.cxx.

8315{
8316 static std::mutex mutex;
8317
8318 event_header->event_id = event_id;
8319 event_header->trigger_mask = trigger_mask;
8320 event_header->data_size = data_size;
8321 event_header->time_stamp = ss_time();
8322 {
8323 std::lock_guard<std::mutex> lock(mutex);
8324 event_header->serial_number = *serial;
8325 *serial = *serial + 1;
8326 // implicit unlock
8327 }
8328
8329 return BM_SUCCESS;
8330}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_convert_event_header()

static void bm_convert_event_header ( EVENT_HEADER pevent,
int  convert_flags 
)
static

Definition at line 9091 of file midas.cxx.

9091 {
9092 /* now convert event header */
9093 if (convert_flags) {
9094 rpc_convert_single(&pevent->event_id, TID_INT16, RPC_OUTGOING, convert_flags);
9095 rpc_convert_single(&pevent->trigger_mask, TID_INT16, RPC_OUTGOING, convert_flags);
9096 rpc_convert_single(&pevent->serial_number, TID_UINT32, RPC_OUTGOING, convert_flags);
9097 rpc_convert_single(&pevent->time_stamp, TID_UINT32, RPC_OUTGOING, convert_flags);
9098 rpc_convert_single(&pevent->data_size, TID_UINT32, RPC_OUTGOING, convert_flags);
9099 }
9100}
#define TID_UINT32
Definition midas.h:337
#define TID_INT16
Definition midas.h:335
void rpc_convert_single(void *data, INT tid, INT flags, INT convert_flags)
Definition midas.cxx:11703
#define RPC_OUTGOING
Definition midas.h:1583
short int event_id
Definition midas.h:852
DWORD data_size
Definition midas.h:856
DWORD serial_number
Definition midas.h:854
short int trigger_mask
Definition midas.h:853
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_delete_request()

INT bm_delete_request ( INT  request_id)

Deletes an event request previously done with bm_request_event(). When an event request gets deleted, events of that requested type are not received any more. When a buffer is closed via bm_close_buffer(), all event requests from that buffer are deleted automatically

Parameters
request_idrequest identifier given by bm_request_event()
Returns
BM_SUCCESS, BM_INVALID_HANDLE

Definition at line 8606 of file midas.cxx.

8607{
8608 _request_list_mutex.lock();
8609
8610 if (request_id < 0 || size_t(request_id) >= _request_list.size()) {
8611 _request_list_mutex.unlock();
8612 return BM_INVALID_HANDLE;
8613 }
8614
8615 int buffer_handle = _request_list[request_id].buffer_handle;
8616
8617 _request_list[request_id].clear();
8618
8619 _request_list_mutex.unlock();
8620
8621 /* remove request entry from buffer */
8622 return bm_remove_event_request(buffer_handle, request_id);
8623}
INT bm_remove_event_request(INT buffer_handle, INT request_id)
Definition midas.cxx:8540
#define BM_INVALID_HANDLE
Definition midas.h:609
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_dispatch_event()

static void bm_dispatch_event ( int  buffer_handle,
EVENT_HEADER pevent 
)
static

Definition at line 8845 of file midas.cxx.

8846{
8847 _request_list_mutex.lock();
8848 bool locked = true;
8849 size_t n = _request_list.size();
8850 /* call dispatcher */
8851 for (size_t i = 0; i < n; i++) {
8852 if (!locked) {
8853 _request_list_mutex.lock();
8854 locked = true;
8855 }
8857 if (r.buffer_handle != buffer_handle)
8858 continue;
8859 if (!bm_match_event(r.event_id, r.trigger_mask, pevent))
8860 continue;
8861 /* must release the lock on the request list: user provided r.dispatcher() can add or remove event requests, and we will deadlock. K.O. */
8862 _request_list_mutex.unlock();
8863 locked = false;
8864 /* if event is fragmented, call defragmenter */
8865 if (((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG1)) || ((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG))) {
8866 bm_defragment_event(buffer_handle, i, pevent, (void *) (pevent + 1), r.dispatcher);
8867 } else {
8868 r.dispatcher(buffer_handle, i, pevent, (void *) (pevent + 1));
8869 }
8870 }
8871 if (locked)
8872 _request_list_mutex.unlock();
8873}
static void bm_defragment_event(HNDLE buffer_handle, HNDLE request_id, EVENT_HEADER *pevent, void *pdata, EVENT_HANDLER *dispatcher)
Definition midas.cxx:11306
DWORD n[4]
Definition mana.cxx:247
#define EVENTID_FRAG
Definition midas.h:907
#define EVENTID_FRAG1
Definition midas.h:906
short int event_id
Definition midas.cxx:206
INT buffer_handle
Definition midas.cxx:205
short int trigger_mask
Definition midas.cxx:207
EVENT_HANDLER * dispatcher
Definition midas.cxx:208
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_empty_buffers()

INT bm_empty_buffers ( void  )

Clears event buffer and cache. If an event buffer is large and a consumer is slow in analyzing events, events are usually received some time after they are produced. This effect is even more pronounced if a read cache is used (via bm_set_cache_size()). When changes to the hardware are made in the experiment, the consumer will then still analyze old events before any new event which reflects the hardware change. Users can be fooled by looking at histograms which reflect the hardware change many seconds after they have been made.

To overcome this potential problem, the analyzer can call bm_empty_buffers() just after the hardware change has been made which skips all old events contained in event buffers and read caches. Technically this is done by forwarding the read pointer of the client. No events are really deleted, they are still visible to other clients like the logger.

Note that the front-end also contains write buffers which can delay the delivery of events. The standard front-end framework mfe.c reduces this effect by flushing all buffers once every second.

Returns
BM_SUCCESS

Definition at line 11262 of file midas.cxx.

11262 {
11263 if (rpc_is_remote())
11265
11266#ifdef LOCAL_ROUTINES
11267 {
11268 std::vector<BUFFER*> mybuffers;
11269
11270 gBuffersMutex.lock();
11272 gBuffersMutex.unlock();
11273
11274 /* go through all buffers */
11275 for (BUFFER* pbuf : mybuffers) {
11276 if (!pbuf)
11277 continue;
11278 if (!pbuf->attached)
11279 continue;
11280
11281 int status = bm_skip_event(pbuf);
11282 if (status != BM_SUCCESS)
11283 return status;
11284 }
11285 }
11286#endif /* LOCAL_ROUTINES */
11287
11288 return BM_SUCCESS;
11289}
static int bm_skip_event(BUFFER *pbuf)
Definition midas.cxx:10855
#define RPC_BM_EMPTY_BUFFERS
Definition mrpc.h:49
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_fill_read_cache_locked()

static int bm_fill_read_cache_locked ( bm_lock_buffer_guard pbuf_guard,
int  timeout_msec 
)
static

Definition at line 9014 of file midas.cxx.

9015{
9016 BUFFER* pbuf = pbuf_guard.get_pbuf();
9017 BUFFER_HEADER* pheader = pbuf->buffer_header;
9020
9021 //printf("bm_fill_read_cache: [%s] timeout %d, size %d, rp %d, wp %d\n", pheader->name, timeout_msec, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
9022
9023 /* loop over all events in the buffer */
9024
9025 while (1) {
9026 EVENT_HEADER *pevent = NULL;
9027 int event_size = 3; // poison value
9028 int total_size = 3; // poison value
9029
9030 int status = bm_peek_buffer_locked(pbuf, pheader, pc, &pevent, &event_size, &total_size);
9031 if (status == BM_CORRUPTED) {
9032 return status;
9033 } else if (status != BM_SUCCESS) {
9034 /* event buffer is empty */
9035 if (timeout_msec == BM_NO_WAIT) {
9036 if (need_wakeup)
9038 if (pbuf->read_cache_rp == pbuf->read_cache_wp) {
9039 // read cache is empty
9040 return BM_ASYNC_RETURN;
9041 }
9042 return BM_SUCCESS;
9043 }
9044
9046
9047 if (status != BM_SUCCESS) {
9048 // we only come here with SS_ABORT & co
9049 return status;
9050 }
9051
9052 // make sure we wait for new event only once
9054 // go back to bm_peek_buffer_locked
9055 continue;
9056 }
9057
9058 /* loop over all requests: if this event matches a request,
9059 * copy it to the read cache */
9060
9062
9063 if (is_requested) {
9064 if (pbuf->read_cache_wp + total_size > pbuf->read_cache_size) {
9065 /* read cache is full */
9066 if (need_wakeup)
9068 return BM_SUCCESS;
9069 }
9070
9071 bm_read_from_buffer_locked(pheader, pc->read_pointer, pbuf->read_cache + pbuf->read_cache_wp, event_size);
9072
9073 pbuf->read_cache_wp += total_size;
9074
9075 /* update statistics */
9076 pheader->num_out_events++;
9077 pbuf->count_read++;
9078 pbuf->bytes_read += event_size;
9079 }
9080
9081 /* shift read pointer */
9082
9083 int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
9084 pc->read_pointer = new_read_pointer;
9085
9086 need_wakeup = TRUE;
9087 }
9088 /* NOT REACHED */
9089}
static void bm_wakeup_producers_locked(const BUFFER_HEADER *pheader, const BUFFER_CLIENT *pc)
Definition midas.cxx:8809
static int bm_incr_rp_no_check(const BUFFER_HEADER *pheader, int rp, int total_size)
Definition midas.cxx:6245
static BOOL bm_check_requests(const BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
Definition midas.cxx:8988
static void bm_read_from_buffer_locked(const BUFFER_HEADER *pheader, int rp, char *buf, int event_size)
Definition midas.cxx:8958
static int bm_peek_buffer_locked(BUFFER *pbuf, BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
Definition midas.cxx:8913
static int bm_wait_for_more_events_locked(bm_lock_buffer_guard &pbuf_guard, BUFFER_CLIENT *pc, int timeout_msec, BOOL unlock_read_cache)
Definition midas.cxx:9414
#define BM_ASYNC_RETURN
Definition midas.h:613
#define BM_NO_WAIT
Definition midas.h:366
int event_size
Definition msysmon.cxx:527
INT num_out_events
Definition midas.h:965
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_find_first_request_locked()

static int bm_find_first_request_locked ( BUFFER_CLIENT pc,
const EVENT_HEADER pevent 
)
static

Definition at line 9611 of file midas.cxx.

9611 {
9612 if (pc->pid) {
9613 int j;
9614 for (j = 0; j < pc->max_request_index; j++) {
9615 const EVENT_REQUEST *prequest = pc->event_request + j;
9616 if (prequest->valid && bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
9617 return prequest->id;
9618 }
9619 }
9620 }
9621
9622 return -1;
9623}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache()

INT bm_flush_cache ( int  buffer_handle,
int  timeout_msec 
)

Definition at line 10229 of file midas.cxx.

10230{
10231 if (rpc_is_remote()) {
10232 return bm_flush_cache_rpc(buffer_handle, timeout_msec);
10233 }
10234
10235#ifdef LOCAL_ROUTINES
10236 {
10237 INT status = 0;
10238
10239 //printf("bm_flush_cache!\n");
10240
10241 BUFFER *pbuf = bm_get_buffer("bm_flush_cache", buffer_handle, &status);
10242
10243 if (!pbuf)
10244 return status;
10245
10246 if (pbuf->write_cache_size == 0)
10247 return BM_SUCCESS;
10248
10250
10251 if (status != BM_SUCCESS)
10252 return status;
10253
10254 /* check if anything needs to be flushed */
10255 if (pbuf->write_cache_wp == 0) {
10256 pbuf->write_cache_mutex.unlock();
10257 return BM_SUCCESS;
10258 }
10259
10260 /* lock the buffer */
10262
10263 if (!pbuf_guard.is_locked())
10264 return pbuf_guard.get_status();
10265
10267
10268 /* unlock in correct order */
10269
10270 if (pbuf_guard.is_locked()) {
10271 // check if bm_wait_for_free_space() failed to relock the buffer
10272 pbuf_guard.unlock();
10273 }
10274
10275 pbuf->write_cache_mutex.unlock();
10276
10277 return status;
10278 }
10279#endif /* LOCAL_ROUTINES */
10280
10281 return BM_SUCCESS;
10282}
static int bm_flush_cache_rpc(int buffer_handle, int timeout_msec)
Definition midas.cxx:10009
static INT bm_flush_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
Definition midas.cxx:10085
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache_locked()

static INT bm_flush_cache_locked ( bm_lock_buffer_guard pbuf_guard,
int  timeout_msec 
)
static

Empty write cache. This function should be used if events in the write cache should be visible to the consumers immediately. It should be called at the end of each run, otherwise events could remain in the write cache and would then appear in the data of the next run.

Parameters
buffer_handleBuffer handle obtained via bm_open_buffer() or 0 to flush data in the mserver event socket
timeout_msecTimeout waiting for free space in the event buffer. If BM_WAIT, wait forever. If BM_NO_WAIT, the function returns immediately with a value of BM_ASYNC_RETURN without writing the cache.
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN Routine called with async_flag == BM_NO_WAIT and buffer has not enough space to receive cache
BM_NO_MEMORY Event is too large for network buffer or event buffer. One has to increase the event buffer size "/Experiment/Buffer sizes/SYSTEM" and/or /Experiment/MAX_EVENT_SIZE in ODB.

Definition at line 10085 of file midas.cxx.

10086{
10087 // NB we come here with write cache locked and buffer locked.
10088
10089 {
10090 INT status = 0;
10091
10092 //printf("bm_flush_cache_locked!\n");
10093
10094 BUFFER* pbuf = pbuf_guard.get_pbuf();
10095 BUFFER_HEADER* pheader = pbuf->buffer_header;
10096
10097 //printf("bm_flush_cache_locked: buffer %s, cache rp %zu, wp %zu, timeout %d msec\n", pbuf->buffer_name, pbuf->write_cache_rp, pbuf->write_cache_wp, timeout_msec);
10098
10099 int old_write_pointer = pheader->write_pointer;
10100
10101 int request_id[MAX_CLIENTS];
10102 for (int i = 0; i < pheader->max_client_index; i++) {
10103 request_id[i] = -1;
10104 }
10105
10106 size_t ask_rp = pbuf->write_cache_rp;
10107 size_t ask_wp = pbuf->write_cache_wp;
10108
10109 if (ask_wp == 0) { // nothing to do
10110 return BM_SUCCESS;
10111 }
10112
10113 if (ask_rp == ask_wp) { // nothing to do
10114 return BM_SUCCESS;
10115 }
10116
10117 assert(ask_rp < ask_wp);
10118
10119 size_t ask_free = ALIGN8(ask_wp - ask_rp);
10120
10121 if (ask_free == 0) { // nothing to do
10122 return BM_SUCCESS;
10123 }
10124
10125#if 0
10127 if (status != BM_SUCCESS) {
10128 printf("bm_flush_cache: corrupted 111!\n");
10129 abort();
10130 }
10131#endif
10132
10134
10135 if (status != BM_SUCCESS) {
10136 return status;
10137 }
10138
10139 // NB: ask_rp, ask_wp and ask_free are invalid after calling bm_wait_for_free_space():
10140 //
10141 // wait_for_free_space() will sleep with all locks released,
10142 // during this time, another thread may call bm_send_event() that will
10143 // add one or more events to the write cache and after wait_for_free_space()
10144 // returns, size of data in cache will be bigger than the amount
10145 // of free space we requested. so we need to keep track of how
10146 // much data we write to the buffer and ask for more data
10147 // if we run short. This is the reason for the big loop
10148 // around wait_for_free_space(). We ask for slightly too little free
10149 // space to make sure all this code is always used and does work. K.O.
10150
10151 if (pbuf->write_cache_wp == 0) {
10152 /* somebody emptied the cache while we were inside bm_wait_for_free_space */
10153 return BM_SUCCESS;
10154 }
10155
10156 //size_t written = 0;
10157 while (pbuf->write_cache_rp < pbuf->write_cache_wp) {
10158 /* loop over all events in cache */
10159
10160 const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pbuf->write_cache + pbuf->write_cache_rp);
10161 size_t event_size = (pevent->data_size + sizeof(EVENT_HEADER));
10162 size_t total_size = ALIGN8(event_size);
10163
10164#if 0
10165 printf("bm_flush_cache: cache size %d, wp %d, rp %d, event data_size %d, event_size %d, total_size %d, free %d, written %d\n",
10166 int(pbuf->write_cache_size),
10167 int(pbuf->write_cache_wp),
10168 int(pbuf->write_cache_rp),
10169 int(pevent->data_size),
10170 int(event_size),
10171 int(total_size),
10172 int(ask_free),
10173 int(written));
10174#endif
10175
10176 // check for crazy event size
10177 assert(total_size >= sizeof(EVENT_HEADER));
10178 assert(total_size <= (size_t)pheader->size);
10179
10180 bm_write_to_buffer_locked(pheader, 1, (char**)&pevent, &event_size, total_size);
10181
10182 /* update statistics */
10183 pheader->num_in_events++;
10184 pbuf->count_sent += 1;
10185 pbuf->bytes_sent += total_size;
10186
10187 /* see comment for the same code in bm_send_event().
10188 * We make sure the buffer is never 100% full */
10189 assert(pheader->write_pointer != pheader->read_pointer);
10190
10191 /* check if anybody has a request for this event */
10192 for (int i = 0; i < pheader->max_client_index; i++) {
10193 BUFFER_CLIENT *pc = pheader->client + i;
10194 int r = bm_find_first_request_locked(pc, pevent);
10195 if (r >= 0) {
10196 request_id[i] = r;
10197 }
10198 }
10199
10200 /* this loop does not loop forever because rp
10201 * is monotonously incremented here. write_cache_wp does
10202 * not change */
10203
10204 pbuf->write_cache_rp += total_size;
10205 //written += total_size;
10206
10207 assert(pbuf->write_cache_rp > 0);
10208 assert(pbuf->write_cache_rp <= pbuf->write_cache_size);
10209 assert(pbuf->write_cache_rp <= pbuf->write_cache_wp);
10210 }
10211
10212 /* the write cache is now empty */
10213 assert(pbuf->write_cache_wp == pbuf->write_cache_rp);
10214 pbuf->write_cache_wp = 0;
10215 pbuf->write_cache_rp = 0;
10216
10217 /* check which clients are waiting */
10218 for (int i = 0; i < pheader->max_client_index; i++) {
10219 BUFFER_CLIENT *pc = pheader->client + i;
10220 bm_notify_reader_locked(pheader, pc, old_write_pointer, request_id[i]);
10221 }
10222 }
10223
10224 return BM_SUCCESS;
10225}
static void bm_notify_reader_locked(BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, int old_write_pointer, int request_id)
Definition midas.cxx:9625
static int bm_find_first_request_locked(BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
Definition midas.cxx:9611
static int bm_validate_buffer_locked(const BUFFER *pbuf)
Definition midas.cxx:6329
static void bm_write_to_buffer_locked(BUFFER_HEADER *pheader, int sg_n, const char *const sg_ptr[], const size_t sg_len[], size_t total_size)
Definition midas.cxx:9526
static int bm_wait_for_free_space_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec, int requested_space, bool unlock_write_cache)
Definition midas.cxx:9102
#define ALIGN8(x)
Definition midas.h:522
INT num_in_events
Definition midas.h:964
INT write_pointer
Definition midas.h:963
INT read_pointer
Definition midas.h:962
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache_rpc()

static int bm_flush_cache_rpc ( int  buffer_handle,
int  timeout_msec 
)
static

Definition at line 10009 of file midas.cxx.

10010{
10011 //printf("bm_flush_cache_rpc: handle %d, timeout %d\n", buffer_handle, timeout_msec);
10012
10015
10017
10018 while (1) {
10019 if (timeout_msec == BM_WAIT) {
10020 xtimeout_msec = 1000;
10021 } else if (timeout_msec == BM_NO_WAIT) {
10023 } else {
10024 if (xtimeout_msec > 1000) {
10025 xtimeout_msec = 1000;
10026 }
10027 }
10028
10029 int status = rpc_call(RPC_BM_FLUSH_CACHE, buffer_handle, xtimeout_msec);
10030
10031 //printf("bm_flush_cache_rpc: handle %d, timeout %d, status %d\n", buffer_handle, xtimeout_msec, status);
10032
10033 if (status == BM_ASYNC_RETURN) {
10034 if (timeout_msec == BM_WAIT) {
10035 // BM_WAIT means wait forever
10036 continue;
10037 } else if (timeout_msec == BM_NO_WAIT) {
10038 // BM_NO_WAIT means do not wait
10039 return status;
10040 } else {
10042 if (now >= time_end) {
10043 // timeout, return BM_ASYNC_RETURN
10044 return status;
10045 }
10046
10048
10049 if (remain < (DWORD)xtimeout_msec) {
10051 }
10052
10053 // keep asking for event...
10054 continue;
10055 }
10056 } else if (status == BM_SUCCESS) {
10057 // success, return BM_SUCCESS
10058 return status;
10059 } else {
10060 // error
10061 return status;
10062 }
10063 }
10064}
#define BM_WAIT
Definition midas.h:365
#define RPC_BM_FLUSH_CACHE
Definition mrpc.h:46
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_get_buffer_handle()

INT bm_get_buffer_handle ( const char buffer_name,
INT buffer_handle 
)

If the buffer is already open, return its handle

Parameters
buffer_namebuffer name
Returns
BM_SUCCESS, BM_NOT_FOUND

Definition at line 7097 of file midas.cxx.

7098{
7099 gBuffersMutex.lock();
7100 for (size_t i = 0; i < gBuffers.size(); i++) {
7101 BUFFER* pbuf = gBuffers[i];
7102 if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
7103 *buffer_handle = i + 1;
7104 gBuffersMutex.unlock();
7105 return BM_SUCCESS;
7106 }
7107 }
7108 gBuffersMutex.unlock();
7109 return BM_NOT_FOUND;
7110}
#define BM_NOT_FOUND
Definition midas.h:612
BOOL equal_ustring(const char *str1, const char *str2)
Definition odb.cxx:3206
char buffer_name[NAME_LENGTH]
Definition mevb.cxx:45
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_incr_read_cache_locked()

static void bm_incr_read_cache_locked ( BUFFER pbuf,
int  total_size 
)
static

Definition at line 8877 of file midas.cxx.

8877 {
8878 /* increment read cache read pointer */
8879 pbuf->read_cache_rp += total_size;
8880
8881 if (pbuf->read_cache_rp == pbuf->read_cache_wp) {
8882 pbuf->read_cache_rp = 0;
8883 pbuf->read_cache_wp = 0;
8884 }
8885}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_incr_rp_no_check()

static int bm_incr_rp_no_check ( const BUFFER_HEADER pheader,
int  rp,
int  total_size 
)
static

Definition at line 6245 of file midas.cxx.

6246{
6247#if 0
6248 if (gRpLog == NULL) {
6249 gRpLog = fopen("rp.log", "a");
6250 }
6251 if (gRpLog && (total_size < 16)) {
6252 const char *pdata = (const char *) (pheader + 1);
6253 const DWORD *pevent = (const DWORD*) (pdata + rp);
6254 fprintf(gRpLog, "%s: rp %d, total_size %d, at rp 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x\n", pheader->name, rp, total_size,
6255 pevent[0], pevent[1], pevent[2], pevent[3], pevent[4], pevent[5]);
6256 }
6257#endif
6258
6259 // these checks are already done before we come here.
6260 // but we check again as last-ressort protection. K.O.
6261 assert(total_size > 0);
6262 assert(total_size >= (int)sizeof(EVENT_HEADER));
6263
6264 rp += total_size;
6265 if (rp >= pheader->size) {
6266 rp -= pheader->size;
6267 } else if ((rp + (int) sizeof(EVENT_HEADER)) > pheader->size) {
6268 // note: ">" here to match bm_write_to_buffer_locked() and bm_validate_rp().
6269 // if at the end of the buffer, the remaining free space is exactly
6270 // equal to the size of an event header, the event header
6271 // is written there, the pointer is wrapped and the event data
6272 // is written to the beginning of the buffer.
6273 rp = 0;
6274 }
6275 return rp;
6276}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_match_event()

INT bm_match_event ( short int  event_id,
short int  trigger_mask,
const EVENT_HEADER pevent 
)

Check if an event matches a given event request by the event id and trigger mask

Parameters
event_idEvent ID of request
trigger_maskTrigger mask of request
peventPointer to event to check
Returns
TRUE if event matches request

Definition at line 6037 of file midas.cxx.

6037 {
6038 // NB: cast everything to unsigned 16 bit to avoid bitwise comparison failure
6039 // because of mismatch in sign-extension between signed 16-bit event_id and
6040 // unsigned 16-bit constants. K.O.
6041
6042 if (((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG1)) || ((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG)))
6043 /* fragmented event */
6044 return (((uint16_t(event_id) == uint16_t(EVENTID_ALL)) || (uint16_t(event_id) == (uint16_t(pevent->event_id) & uint16_t(0x0FFF))))
6046
6047 return (((uint16_t(event_id) == uint16_t(EVENTID_ALL)) || (uint16_t(event_id) == uint16_t(pevent->event_id)))
6049}
#define TRIGGER_ALL
Definition midas.h:538
#define EVENTID_ALL
Definition midas.h:537
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_next_rp()

static int bm_next_rp ( const char who,
const BUFFER_HEADER pheader,
const char pdata,
int  rp 
)
static

Definition at line 6278 of file midas.cxx.

6278 {
6279 const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pdata + rp);
6280 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
6281 int total_size = ALIGN8(event_size);
6282
6283 if (pevent->data_size <= 0 || total_size <= 0 || total_size > pheader->size) {
6284 cm_msg(MERROR, "bm_next_rp",
6285 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6286 pheader->name,
6287 rp,
6288 pevent->data_size,
6289 event_size,
6290 total_size,
6291 pheader->read_pointer,
6292 pheader->write_pointer,
6293 pheader->size,
6294 who);
6295 return -1;
6296 }
6297
6298 int remaining = 0;
6299 if (rp < pheader->write_pointer) {
6300 remaining = pheader->write_pointer - rp;
6301 } else {
6302 remaining = pheader->size - rp;
6303 remaining += pheader->write_pointer;
6304 }
6305
6306 //printf("bm_next_rp: total_size %d, remaining %d, rp %d, wp %d, size %d\n", total_size, remaining, rp, pheader->write_pointer, pheader->size);
6307
6308 if (total_size > remaining) {
6309 cm_msg(MERROR, "bm_next_rp",
6310 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, remaining %d, called from %s",
6311 pheader->name,
6312 rp,
6313 pevent->data_size,
6314 event_size,
6315 total_size,
6316 pheader->read_pointer,
6317 pheader->write_pointer,
6318 pheader->size,
6319 remaining,
6320 who);
6321 return -1;
6322 }
6323
6324 rp = bm_incr_rp_no_check(pheader, rp, total_size);
6325
6326 return rp;
6327}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_notify_reader_locked()

static void bm_notify_reader_locked ( BUFFER_HEADER pheader,
BUFFER_CLIENT pc,
int  old_write_pointer,
int  request_id 
)
static

Definition at line 9625 of file midas.cxx.

9625 {
9626 if (request_id >= 0) {
9627 /* if that client has a request and is suspended, wake it up */
9628 if (pc->read_wait) {
9629 char str[80];
9630 sprintf(str, "B %s %d", pheader->name, request_id);
9631 ss_resume(pc->port, str);
9632 //printf("bm_notify_reader_locked: buffer [%s] client [%s] request_id %d, port %d, message [%s]\n", pheader->name, pc->name, request_id, pc->port, str);
9633 //printf("bm_notify_reader_locked: buffer [%s] client [%s] clear read_wait!\n", pheader->name, pc->name);
9634 pc->read_wait = FALSE;
9635 }
9636 }
9637}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_open_buffer()

INT bm_open_buffer ( const char buffer_name,
INT  buffer_size,
INT buffer_handle 
)

Open an event buffer. Two default buffers are created by the system. The "SYSTEM" buffer is used to exchange events and the "SYSMSG" buffer is used to exchange system messages. The names and sizes of the event buffers are defined in midas.h as EVENT_BUFFER_NAME and DEFAULT_BUFFER_SIZE. The following example opens the "SYSTEM" buffer, requests events with ID 1 and enters a main loop. Events are then received in process_event()

#include <stdio.h>
#include "midas.h"
void process_event(HNDLE hbuf, HNDLE request_id, EVENT_HEADER *pheader, void *pevent)
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
status = cm_connect_experiment("pc810", "Sample", "Simple Analyzer", NULL);
return 1;
do
{
status = cm_yield(1000);
} while (status != RPC_SHUTDOWN && status != SS_ABORT);
return 0;
}
INT bm_open_buffer(const char *buffer_name, INT buffer_size, INT *buffer_handle)
Definition midas.cxx:6739
INT bm_request_event(HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
Definition midas.cxx:8487
INT cm_yield(INT millisec)
Definition midas.cxx:5664
INT cm_connect_experiment(const char *host_name, const char *exp_name, const char *client_name, void(*func)(char *))
Definition midas.cxx:2292
INT cm_disconnect_experiment(void)
Definition midas.cxx:2860
#define CM_SUCCESS
Definition midas.h:582
#define SS_ABORT
Definition midas.h:677
#define RPC_SHUTDOWN
Definition midas.h:707
int main()
Definition hwtest.cxx:23
INT process_event(ANALYZE_REQUEST *par, EVENT_HEADER *pevent)
Definition mana.cxx:3081
#define DEFAULT_BUFFER_SIZE
Definition midas.h:255
#define EVENT_BUFFER_NAME
Definition midas.h:269
Parameters
buffer_nameName of buffer
buffer_sizeDefault size of buffer in bytes. Can by overwritten with ODB value
buffer_handleBuffer handle returned by function
Returns
BM_SUCCESS, BM_CREATED
BM_NO_SHM Shared memory cannot be created
BM_NO_SEMAPHORE Semaphore cannot be created
BM_NO_MEMORY Not enough memory to create buffer descriptor
BM_MEMSIZE_MISMATCH Buffer size conflicts with an existing buffer of different size
BM_INVALID_PARAM Invalid parameter

Definition at line 6739 of file midas.cxx.

6739 {
6740 INT status;
6741
6742 if (rpc_is_remote()) {
6743 status = rpc_call(RPC_BM_OPEN_BUFFER, buffer_name, buffer_size, buffer_handle);
6744
6745 HNDLE hDB;
6747 if (status != SUCCESS || hDB == 0) {
6748 cm_msg(MERROR, "bm_open_buffer", "cannot open buffer \'%s\' - not connected to ODB", buffer_name);
6749 return BM_NO_SHM;
6750 }
6751
6753
6754 int size = sizeof(INT);
6755 status = db_get_value(hDB, 0, "/Experiment/MAX_EVENT_SIZE", &_bm_max_event_size, &size, TID_UINT32, TRUE);
6756
6757 if (status != DB_SUCCESS) {
6758 cm_msg(MERROR, "bm_open_buffer", "Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6759 status);
6760 return status;
6761 }
6762
6763 return status;
6764 }
6765#ifdef LOCAL_ROUTINES
6766 {
6767 HNDLE shm_handle;
6768 size_t shm_size;
6769 HNDLE hDB;
6770 const int max_buffer_size = 2 * 1000 * 1024 * 1024; // limited by 32-bit integers in the buffer header
6771
6772 bm_cleanup("bm_open_buffer", ss_millitime(), FALSE);
6773
6774 if (!buffer_name || !buffer_name[0]) {
6775 cm_msg(MERROR, "bm_open_buffer", "cannot open buffer with zero name");
6776 return BM_INVALID_PARAM;
6777 }
6778
6779 if (strlen(buffer_name) >= NAME_LENGTH) {
6780 cm_msg(MERROR, "bm_open_buffer", "buffer name \"%s\" is longer than %d bytes", buffer_name, NAME_LENGTH);
6781 return BM_INVALID_PARAM;
6782 }
6783
6785
6786 if (status != SUCCESS || hDB == 0) {
6787 //cm_msg(MERROR, "bm_open_buffer", "cannot open buffer \'%s\' - not connected to ODB", buffer_name);
6788 return BM_NO_SHM;
6789 }
6790
6791 /* get buffer size from ODB, user parameter as default if not present in ODB */
6792 std::string odb_path;
6793 odb_path += "/Experiment/Buffer sizes/";
6794 odb_path += buffer_name;
6795
6796 int size = sizeof(INT);
6797 status = db_get_value(hDB, 0, odb_path.c_str(), &buffer_size, &size, TID_UINT32, TRUE);
6798
6800 cm_msg(MERROR, "bm_open_buffer",
6801 "Cannot open buffer \"%s\", invalid buffer size %d in ODB \"%s\", maximum buffer size is %d",
6802 buffer_name, buffer_size, odb_path.c_str(), max_buffer_size);
6803 return BM_INVALID_PARAM;
6804 }
6805
6807
6808 size = sizeof(INT);
6809 status = db_get_value(hDB, 0, "/Experiment/MAX_EVENT_SIZE", &_bm_max_event_size, &size, TID_UINT32, TRUE);
6810
6811 if (status != DB_SUCCESS) {
6812 cm_msg(MERROR, "bm_open_buffer", "Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6813 status);
6814 return status;
6815 }
6816
6817 /* check if buffer already is open */
6818 gBuffersMutex.lock();
6819 for (size_t i = 0; i < gBuffers.size(); i++) {
6820 BUFFER* pbuf = gBuffers[i];
6821 if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
6822 *buffer_handle = i + 1;
6823 gBuffersMutex.unlock();
6824 return BM_SUCCESS;
6825 }
6826 }
6827 gBuffersMutex.unlock();
6828
6829 // only one thread at a time should create new buffers
6830
6831 static std::mutex gNewBufferMutex;
6832 std::lock_guard<std::mutex> guard(gNewBufferMutex);
6833
6834 // if we had a race against another thread
6835 // and while we were waiting for gNewBufferMutex
6836 // the other thread created this buffer, we return it.
6837
6838 gBuffersMutex.lock();
6839 for (size_t i = 0; i < gBuffers.size(); i++) {
6840 BUFFER* pbuf = gBuffers[i];
6841 if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
6842 *buffer_handle = i + 1;
6843 gBuffersMutex.unlock();
6844 return BM_SUCCESS;
6845 }
6846 }
6847 gBuffersMutex.unlock();
6848
6849 /* allocate new BUFFER object */
6850
6851 BUFFER* pbuf = new BUFFER;
6852
6853 /* there is no constructor for BUFFER object, we have to zero the arrays manually */
6854
6855 for (int i=0; i<MAX_CLIENTS; i++) {
6857 pbuf->client_time_write_wait[i] = 0;
6858 }
6859
6860 /* create buffer semaphore */
6861
6862 status = ss_semaphore_create(buffer_name, &(pbuf->semaphore));
6863
6864 if (status != SS_CREATED && status != SS_SUCCESS) {
6865 *buffer_handle = 0;
6866 delete pbuf;
6867 return BM_NO_SEMAPHORE;
6868 }
6869
6870 std::string client_name = cm_get_client_name();
6871
6872 /* store client name */
6873 mstrlcpy(pbuf->client_name, client_name.c_str(), sizeof(pbuf->client_name));
6874
6875 /* store buffer name */
6876 mstrlcpy(pbuf->buffer_name, buffer_name, sizeof(pbuf->buffer_name));
6877
6878 /* lock buffer semaphore to avoid race with bm_open_buffer() in a different program */
6879
6880 pbuf->attached = true; // required by bm_lock_buffer()
6881
6883
6884 if (!pbuf_guard.is_locked()) {
6885 // cannot happen, no other thread can see this pbuf
6886 abort();
6887 return BM_NO_SEMAPHORE;
6888 }
6889
6890 /* open shared memory */
6891
6892 void *p = NULL;
6893 status = ss_shm_open(buffer_name, sizeof(BUFFER_HEADER) + buffer_size, &p, &shm_size, &shm_handle, FALSE);
6894
6895 if (status != SS_SUCCESS && status != SS_CREATED) {
6896 *buffer_handle = 0;
6897 pbuf_guard.unlock();
6898 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6899 delete pbuf;
6900 return BM_NO_SHM;
6901 }
6902
6903 pbuf->buffer_header = (BUFFER_HEADER *) p;
6904
6905 BUFFER_HEADER *pheader = pbuf->buffer_header;
6906
6907 bool shm_created = (status == SS_CREATED);
6908
6909 if (shm_created) {
6910 /* initialize newly created shared memory */
6911
6912 memset(pheader, 0, sizeof(BUFFER_HEADER) + buffer_size);
6913
6914 mstrlcpy(pheader->name, buffer_name, sizeof(pheader->name));
6915 pheader->size = buffer_size;
6916
6917 } else {
6918 /* validate existing shared memory */
6919
6920 if (!equal_ustring(pheader->name, buffer_name)) {
6921 // unlock before calling cm_msg(). if we are SYSMSG, we wil ldeadlock. K.O.
6922 pbuf_guard.unlock();
6923 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6924 cm_msg(MERROR, "bm_open_buffer",
6925 "Buffer \"%s\" is corrupted, mismatch of buffer name in shared memory \"%s\"", buffer_name,
6926 pheader->name);
6927 *buffer_handle = 0;
6928 delete pbuf;
6929 return BM_CORRUPTED;
6930 }
6931
6932 if ((pheader->num_clients < 0) || (pheader->num_clients > MAX_CLIENTS)) {
6933 // unlock before calling cm_msg(). if we are SYSMSG, we wil ldeadlock. K.O.
6934 pbuf_guard.unlock();
6935 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6936 cm_msg(MERROR, "bm_open_buffer", "Buffer \"%s\" is corrupted, num_clients %d exceeds MAX_CLIENTS %d",
6938 *buffer_handle = 0;
6939 delete pbuf;
6940 return BM_CORRUPTED;
6941 }
6942
6943 if ((pheader->max_client_index < 0) || (pheader->max_client_index > MAX_CLIENTS)) {
6944 // unlock before calling cm_msg(). if we are SYSMSG, we wil ldeadlock. K.O.
6945 pbuf_guard.unlock();
6946 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6947 cm_msg(MERROR, "bm_open_buffer", "Buffer \"%s\" is corrupted, max_client_index %d exceeds MAX_CLIENTS %d",
6949 *buffer_handle = 0;
6950 delete pbuf;
6951 return BM_CORRUPTED;
6952 }
6953
6954 /* check if buffer size is identical */
6955 if (pheader->size != buffer_size) {
6956 cm_msg(MINFO, "bm_open_buffer", "Buffer \"%s\" requested size %d differs from existing size %d",
6957 buffer_name, buffer_size, pheader->size);
6958
6959 buffer_size = pheader->size;
6960
6961 ss_shm_close(buffer_name, p, shm_size, shm_handle, FALSE);
6962
6963 status = ss_shm_open(buffer_name, sizeof(BUFFER_HEADER) + buffer_size, &p, &shm_size, &shm_handle, FALSE);
6964
6965 if (status != SS_SUCCESS) {
6966 *buffer_handle = 0;
6967 pbuf_guard.unlock();
6968 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6969 delete pbuf;
6970 return BM_NO_SHM;
6971 }
6972
6973 pbuf->buffer_header = (BUFFER_HEADER *) p;
6974 pheader = pbuf->buffer_header;
6975 }
6976 }
6977
6978 /* shared memory is good from here down */
6979
6980 pbuf->attached = true;
6981
6982 pbuf->shm_handle = shm_handle;
6983 pbuf->shm_size = shm_size;
6984 pbuf->callback = FALSE;
6985
6986 bm_cleanup_buffer_locked(pbuf, "bm_open_buffer", ss_millitime());
6987
6989 if (status != BM_SUCCESS) {
6990 cm_msg(MERROR, "bm_open_buffer",
6991 "buffer \'%s\' is corrupted, bm_validate_buffer() status %d, calling bm_reset_buffer()...", buffer_name,
6992 status);
6994 cm_msg(MINFO, "bm_open_buffer", "buffer \'%s\' was reset, all buffered events were lost", buffer_name);
6995 }
6996
6997 /* add our client BUFFER_HEADER */
6998
6999 int iclient = 0;
7000 for (; iclient < MAX_CLIENTS; iclient++)
7001 if (pheader->client[iclient].pid == 0)
7002 break;
7003
7004 if (iclient == MAX_CLIENTS) {
7005 *buffer_handle = 0;
7006 // unlock before calling cm_msg(). if we are SYSMSG, we wil ldeadlock. K.O.
7007 pbuf_guard.unlock();
7008 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
7009 delete pbuf;
7010 cm_msg(MERROR, "bm_open_buffer", "buffer \'%s\' maximum number of clients %d exceeded", buffer_name, MAX_CLIENTS);
7011 return BM_NO_SLOT;
7012 }
7013
7014 /* store slot index in _buffer structure */
7015 pbuf->client_index = iclient;
7016
7017 /*
7018 Save the index of the last client of that buffer so that later only
7019 the clients 0..max_client_index-1 have to be searched through.
7020 */
7021 pheader->num_clients++;
7022 if (iclient + 1 > pheader->max_client_index)
7023 pheader->max_client_index = iclient + 1;
7024
7025 /* setup buffer header and client structure */
7026 BUFFER_CLIENT *pclient = &pheader->client[iclient];
7027
7028 memset(pclient, 0, sizeof(BUFFER_CLIENT));
7029
7030 mstrlcpy(pclient->name, client_name.c_str(), sizeof(pclient->name));
7031
7032 pclient->pid = ss_getpid();
7033
7035
7036 pclient->read_pointer = pheader->write_pointer;
7037 pclient->last_activity = ss_millitime();
7038
7039 cm_get_watchdog_params(NULL, &pclient->watchdog_timeout);
7040
7041 pbuf_guard.unlock();
7042
7043 /* shared memory is not locked from here down, do not touch pheader and pbuf->buffer_header! */
7044
7045 pheader = NULL;
7046
7047 /* we are not holding any locks from here down, but other threads cannot see this pbuf yet */
7048
7051
7052 /* add pbuf to buffer list */
7053
7054 gBuffersMutex.lock();
7055
7056 bool added = false;
7057 for (size_t i=0; i<gBuffers.size(); i++) {
7058 if (gBuffers[i] == NULL) {
7059 gBuffers[i] = pbuf;
7060 added = true;
7061 *buffer_handle = i+1;
7062 break;
7063 }
7064 }
7065 if (!added) {
7066 *buffer_handle = gBuffers.size() + 1;
7067 gBuffers.push_back(pbuf);
7068 }
7069
7070 /* from here down we should not touch pbuf without locking it */
7071
7072 pbuf = NULL;
7073
7074 gBuffersMutex.unlock();
7075
7076 /* new buffer is now ready for use */
7077
7078 /* initialize buffer counters */
7079 bm_init_buffer_counters(*buffer_handle);
7080
7081 bm_cleanup("bm_open_buffer", ss_millitime(), FALSE);
7082
7083 if (shm_created)
7084 return BM_CREATED;
7085 }
7086#endif /* LOCAL_ROUTINES */
7087
7088 return BM_SUCCESS;
7089}
static void bm_cleanup_buffer_locked(BUFFER *pbuf, const char *who, DWORD actual_time)
Definition midas.cxx:6088
static DWORD _bm_max_event_size
Definition midas.cxx:5936
static void bm_clear_buffer_statistics(HNDLE hDB, BUFFER *pbuf)
Definition midas.cxx:6430
static void bm_reset_buffer_locked(BUFFER *pbuf)
Definition midas.cxx:6413
INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout)
Definition midas.cxx:3339
std::string cm_get_client_name()
Definition midas.cxx:2073
static void bm_cleanup(const char *who, DWORD actual_time, BOOL wrong_interval)
Definition midas.cxx:6174
#define BM_NO_SLOT
Definition midas.h:610
#define BM_NO_SHM
Definition midas.h:622
#define BM_CREATED
Definition midas.h:606
#define BM_NO_SEMAPHORE
Definition midas.h:611
#define SS_SUCCESS
Definition midas.h:663
#define SS_CREATED
Definition midas.h:664
#define SUCCESS
Definition mcstd.h:54
INT ss_semaphore_create(const char *name, HNDLE *semaphore_handle)
Definition system.cxx:2532
INT ss_getpid(void)
Definition system.cxx:1379
INT ss_shm_open(const char *name, INT size, void **adr, size_t *shm_size, HNDLE *handle, BOOL get_size)
Definition system.cxx:326
midas_thread_t ss_gettid(void)
Definition system.cxx:1591
INT ss_suspend_get_buffer_port(midas_thread_t thread_id, INT *port)
Definition system.cxx:4425
INT db_get_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, void *data, INT *buf_size, DWORD type, BOOL create)
Definition odb.cxx:5420
#define RPC_BM_OPEN_BUFFER
Definition mrpc.h:36
INT bm_init_buffer_counters(INT buffer_handle)
Definition midas.cxx:8085
#define DEFAULT_MAX_EVENT_SIZE
Definition midas.h:254
int client_count_write_wait[MAX_CLIENTS]
Definition midas.h:1022
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_peek_buffer_locked()

static int bm_peek_buffer_locked ( BUFFER pbuf,
BUFFER_HEADER pheader,
BUFFER_CLIENT pc,
EVENT_HEADER **  ppevent,
int pevent_size,
int ptotal_size 
)
static

Definition at line 8913 of file midas.cxx.

8914{
8915 if (pc->read_pointer == pheader->write_pointer) {
8916 /* no more events buffered for this client */
8917 if (!pc->read_wait) {
8918 //printf("bm_peek_buffer_locked: buffer [%s] client [%s], set read_wait!\n", pheader->name, pc->name);
8919 pc->read_wait = TRUE;
8920 }
8921 return BM_ASYNC_RETURN;
8922 }
8923
8924 if (pc->read_wait) {
8925 //printf("bm_peek_buffer_locked: buffer [%s] client [%s], clear read_wait!\n", pheader->name, pc->name);
8926 pc->read_wait = FALSE;
8927 }
8928
8929 if ((pc->read_pointer < 0) || (pc->read_pointer >= pheader->size)) {
8930 cm_msg(MERROR, "bm_peek_buffer_locked", "event buffer \"%s\" is corrupted: client \"%s\" read pointer %d is invalid. buffer read pointer %d, write pointer %d, size %d", pheader->name, pc->name, pc->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8931 return BM_CORRUPTED;
8932 }
8933
8934 char *pdata = (char *) (pheader + 1);
8935
8936 EVENT_HEADER *pevent = (EVENT_HEADER *) (pdata + pc->read_pointer);
8937 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
8938 int total_size = ALIGN8(event_size);
8939
8940 if ((total_size <= 0) || (total_size > pheader->size)) {
8941 cm_msg(MERROR, "bm_peek_buffer_locked", "event buffer \"%s\" is corrupted: client \"%s\" read pointer %d points to invalid event: data_size %d, event_size %d, total_size %d. buffer size: %d, read_pointer: %d, write_pointer: %d", pheader->name, pc->name, pc->read_pointer, pevent->data_size, event_size, total_size, pheader->size, pheader->read_pointer, pheader->write_pointer);
8942 return BM_CORRUPTED;
8943 }
8944
8945 assert(total_size > 0);
8946 assert(total_size <= pheader->size);
8947
8948 if (ppevent)
8949 *ppevent = pevent;
8950 if (pevent_size)
8952 if (ptotal_size)
8953 *ptotal_size = total_size;
8954
8955 return BM_SUCCESS;
8956}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_peek_read_cache_locked()

static BOOL bm_peek_read_cache_locked ( BUFFER pbuf,
EVENT_HEADER **  ppevent,
int pevent_size,
int ptotal_size 
)
static

Definition at line 8887 of file midas.cxx.

8888{
8889 if (pbuf->read_cache_rp == pbuf->read_cache_wp)
8890 return FALSE;
8891
8892 EVENT_HEADER *pevent = (EVENT_HEADER *) (pbuf->read_cache + pbuf->read_cache_rp);
8893 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
8894 int total_size = ALIGN8(event_size);
8895
8896 if (ppevent)
8897 *ppevent = pevent;
8898 if (pevent_size)
8900 if (ptotal_size)
8901 *ptotal_size = total_size;
8902
8903 return TRUE;
8904}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_poll_event()

INT bm_poll_event ( void  )

Definition at line 11148 of file midas.cxx.

11162{
11164
11165 //printf("bm_poll_event!\n");
11166
11167 DWORD start_time = ss_millitime();
11168
11169 std::vector<char> vec;
11170
11171 /* loop over all requests */
11172 _request_list_mutex.lock();
11173 bool locked = true;
11174 size_t n = _request_list.size();
11175 for (size_t i = 0; i < n; i++) {
11176 if (!locked) {
11177 _request_list_mutex.lock();
11178 locked = true;
11179 }
11180 /* continue if no dispatcher set (manual bm_receive_event) */
11181 if (_request_list[i].dispatcher == NULL)
11182 continue;
11183
11184 int buffer_handle = _request_list[i].buffer_handle;
11185
11186 /* must release the lock on the request list: user provided r.dispatcher() can add or remove event requests, and we will deadlock. K.O. */
11187 _request_list_mutex.unlock();
11188 locked = false;
11189
11190 do {
11191 /* receive event */
11192 int status = bm_receive_event_vec(buffer_handle, &vec, BM_NO_WAIT);
11193
11194 //printf("bm_poll_event: request_id %d, buffer_handle %d, bm_receive_event(BM_NO_WAIT) status %d, vec size %d, capacity %d\n", request_id, buffer_handle, status, (int)vec.size(), (int)vec.capacity());
11195
11196 /* call user function if successful */
11197 if (status == BM_SUCCESS) {
11198 bm_dispatch_event(buffer_handle, (EVENT_HEADER*)vec.data());
11200 }
11201
11202 /* break if no more events */
11203 if (status == BM_ASYNC_RETURN)
11204 break;
11205
11206 /* break if corrupted event buffer */
11207 if (status == BM_TRUNCATED) {
11208 cm_msg(MERROR, "bm_poll_event", "received event was truncated, buffer size %d is too small, see messages and increase /Experiment/MAX_EVENT_SIZE in ODB", (int)vec.size());
11209 }
11210
11211 /* break if corrupted event buffer */
11212 if (status == BM_CORRUPTED)
11213 return SS_ABORT;
11214
11215 /* break if server died */
11216 if (status == RPC_NET_ERROR) {
11217 return SS_ABORT;
11218 }
11219
11220 /* stop after one second */
11221 if (ss_millitime() - start_time > 1000) {
11222 break;
11223 }
11224
11225 } while (TRUE);
11226 }
11227
11228 if (locked)
11229 _request_list_mutex.unlock();
11230
11232 return BM_SUCCESS;
11233 else
11234 return BM_ASYNC_RETURN;
11235}
INT bm_receive_event_vec(INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
Definition midas.cxx:10831
static void bm_dispatch_event(int buffer_handle, EVENT_HEADER *pevent)
Definition midas.cxx:8845
#define BM_TRUNCATED
Definition midas.h:614
#define RPC_NET_ERROR
Definition midas.h:701
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_push_buffer()

static INT bm_push_buffer ( BUFFER pbuf,
int  buffer_handle 
)
static

Check a buffer if an event is available and call the dispatch function if found.

Parameters
buffer_name Name of buffer
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_TRUNCATED, BM_ASYNC_RETURN, RPC_NET_ERROR

Definition at line 10924 of file midas.cxx.

10924 {
10925 //printf("bm_push_buffer: buffer [%s], handle %d, callback %d\n", pbuf->buffer_header->name, buffer_handle, pbuf->callback);
10926
10927 /* return immediately if no callback routine is defined */
10928 if (!pbuf->callback)
10929 return BM_SUCCESS;
10930
10931 return bm_read_buffer(pbuf, buffer_handle, NULL, NULL, NULL, NULL, BM_NO_WAIT, 0, TRUE);
10932}
static INT bm_read_buffer(BUFFER *pbuf, INT buffer_handle, void **bufptr, void *buf, INT *buf_size, std::vector< char > *vecptr, int timeout_msec, int convert_flags, BOOL dispatch)
Definition midas.cxx:10286
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_read_buffer()

static INT bm_read_buffer ( BUFFER pbuf,
INT  buffer_handle,
void **  bufptr,
void buf,
INT buf_size,
std::vector< char > *  vecptr,
int  timeout_msec,
int  convert_flags,
BOOL  dispatch 
)
static

Definition at line 10286 of file midas.cxx.

10286 {
10288
10289 int max_size = 0;
10290 if (buf_size) {
10291 max_size = *buf_size;
10292 *buf_size = 0;
10293 }
10294
10295 //printf("bm_read_buffer: [%s] timeout %d, conv %d, ptr %p, buf %p, disp %d\n", pbuf->buffer_name, timeout_msec, convert_flags, bufptr, buf, dispatch);
10296
10297 bm_lock_buffer_guard pbuf_guard(pbuf, true); // buffer is not locked
10298
10299 // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
10300
10301 /* look if there is anything in the cache */
10302 if (pbuf->read_cache_size > 0) {
10303
10305
10306 if (status != BM_SUCCESS)
10307 return status;
10308
10309 if (pbuf->read_cache_wp == 0) {
10310
10311 // lock buffer for the first time
10312
10313 if (!pbuf_guard.relock()) {
10314 pbuf->read_cache_mutex.unlock();
10315 return pbuf_guard.get_status();
10316 }
10317
10319 if (status != BM_SUCCESS) {
10320 // unlock in correct order
10321 if (pbuf_guard.is_locked()) {
10322 // check if bm_wait_for_more_events() failed to relock the buffer
10323 pbuf_guard.unlock();
10324 }
10325 pbuf->read_cache_mutex.unlock();
10326 return status;
10327 }
10328
10329 // buffer remains locked here
10330 }
10331 EVENT_HEADER *pevent;
10332 int event_size;
10333 int total_size;
10334 if (bm_peek_read_cache_locked(pbuf, &pevent, &event_size, &total_size)) {
10335 if (pbuf_guard.is_locked()) {
10336 // do not need to keep the event buffer locked
10337 // when reading from the read cache
10338 pbuf_guard.unlock();
10339 }
10340 //printf("bm_read_buffer: [%s] async %d, conv %d, ptr %p, buf %p, disp %d, total_size %d, read from cache %d %d %d\n", pbuf->buffer_name, async_flag, convert_flags, bufptr, buf, dispatch, total_size, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
10342 if (buf) {
10343 if (event_size > max_size) {
10344 cm_msg(MERROR, "bm_read_buffer", "buffer size %d is smaller than event size %d, event truncated. buffer \"%s\"", max_size, event_size, pbuf->buffer_name);
10347 }
10348
10349 memcpy(buf, pevent, event_size);
10350
10351 if (buf_size) {
10352 *buf_size = event_size;
10353 }
10354 if (convert_flags) {
10355 bm_convert_event_header((EVENT_HEADER *) buf, convert_flags);
10356 }
10357 } else if (bufptr) {
10359 memcpy(*bufptr, pevent, event_size);
10361 } else if (vecptr) {
10362 vecptr->resize(0);
10363 char* cptr = (char*)pevent;
10364 vecptr->assign(cptr, cptr+event_size);
10365 }
10366 bm_incr_read_cache_locked(pbuf, total_size);
10367 pbuf->read_cache_mutex.unlock();
10368 if (dispatch) {
10369 // FIXME need to protect currently dispatched event against
10370 // another thread overwriting it by refilling the read cache
10371 bm_dispatch_event(buffer_handle, pevent);
10372 return BM_MORE_EVENTS;
10373 }
10374 // buffer is unlocked here
10375 return status;
10376 }
10377 pbuf->read_cache_mutex.unlock();
10378 }
10379
10380 /* we come here if the read cache is disabled */
10381 /* we come here if the next event is too big to fit into the read cache */
10382
10383 if (!pbuf_guard.is_locked()) {
10384 if (!pbuf_guard.relock())
10385 return pbuf_guard.get_status();
10386 }
10387
10389
10390 BUFFER_HEADER *pheader = pbuf->buffer_header;
10391
10393
10394 while (1) {
10395 /* loop over events in the event buffer */
10396
10398
10399 if (status != BM_SUCCESS) {
10400 // implicit unlock
10401 return status;
10402 }
10403
10404 /* check if event at current read pointer matches a request */
10405
10406 EVENT_HEADER *pevent;
10407 int event_size;
10408 int total_size;
10409
10410 status = bm_peek_buffer_locked(pbuf, pheader, pc, &pevent, &event_size, &total_size);
10411 if (status == BM_CORRUPTED) {
10412 // implicit unlock
10413 return status;
10414 } else if (status != BM_SUCCESS) {
10415 /* event buffer is empty */
10416 break;
10417 }
10418
10420
10421 if (is_requested) {
10422 //printf("bm_read_buffer: [%s] async %d, conv %d, ptr %p, buf %p, disp %d, total_size %d, read from buffer, cache %d %d %d\n", pheader->name, async_flag, convert_flags, bufptr, buf, dispatch, total_size, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
10423
10425
10426 if (buf) {
10427 if (event_size > max_size) {
10428 cm_msg(MERROR, "bm_read_buffer",
10429 "buffer size %d is smaller than event size %d, event truncated. buffer \"%s\"", max_size,
10430 event_size, pheader->name);
10433 }
10434
10435 bm_read_from_buffer_locked(pheader, pc->read_pointer, (char *) buf, event_size);
10436
10437 if (buf_size) {
10438 *buf_size = event_size;
10439 }
10440
10441 if (convert_flags) {
10442 bm_convert_event_header((EVENT_HEADER *) buf, convert_flags);
10443 }
10444
10445 pbuf->count_read++;
10446 pbuf->bytes_read += event_size;
10447 } else if (dispatch || bufptr) {
10448 assert(event_buffer == NULL); // make sure we only come here once
10450 bm_read_from_buffer_locked(pheader, pc->read_pointer, (char *) event_buffer, event_size);
10451 pbuf->count_read++;
10452 pbuf->bytes_read += event_size;
10453 } else if (vecptr) {
10454 bm_read_from_buffer_locked(pheader, pc->read_pointer, vecptr, event_size);
10455 pbuf->count_read++;
10456 pbuf->bytes_read += event_size;
10457 }
10458
10459 int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
10460 pc->read_pointer = new_read_pointer;
10461
10462 pheader->num_out_events++;
10463 /* exit loop over events */
10464 break;
10465 }
10466
10467 int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
10468 pc->read_pointer = new_read_pointer;
10469 pheader->num_out_events++;
10470 }
10471
10472 /*
10473 If read pointer has been changed, it may have freed up some space
10474 for waiting producers. So check if free space is now more than 50%
10475 of the buffer size and wake waiting producers.
10476 */
10477
10479
10480 pbuf_guard.unlock();
10481
10482 if (dispatch && event_buffer) {
10483 bm_dispatch_event(buffer_handle, event_buffer);
10484 free(event_buffer);
10486 return BM_MORE_EVENTS;
10487 }
10488
10489 if (bufptr && event_buffer) {
10493 }
10494
10495 if (event_buffer) {
10496 free(event_buffer);
10498 }
10499
10500 return status;
10501}
static void bm_convert_event_header(EVENT_HEADER *pevent, int convert_flags)
Definition midas.cxx:9091
static int bm_fill_read_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
Definition midas.cxx:9014
static BOOL bm_peek_read_cache_locked(BUFFER *pbuf, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
Definition midas.cxx:8887
static void bm_incr_read_cache_locked(BUFFER *pbuf, int total_size)
Definition midas.cxx:8877
void * event_buffer
Definition mfe.cxx:65
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_read_from_buffer_locked() [1/2]

static void bm_read_from_buffer_locked ( const BUFFER_HEADER pheader,
int  rp,
char buf,
int  event_size 
)
static

Definition at line 8958 of file midas.cxx.

8959{
8960 const char *pdata = (const char *) (pheader + 1);
8961
8962 if (rp + event_size <= pheader->size) {
8963 /* copy event to cache */
8964 memcpy(buf, pdata + rp, event_size);
8965 } else {
8966 /* event is splitted */
8967 int size = pheader->size - rp;
8968 memcpy(buf, pdata + rp, size);
8969 memcpy(buf + size, pdata, event_size - size);
8970 }
8971}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_read_from_buffer_locked() [2/2]

static void bm_read_from_buffer_locked ( const BUFFER_HEADER pheader,
int  rp,
std::vector< char > *  vecptr,
int  event_size 
)
static

Definition at line 8973 of file midas.cxx.

8974{
8975 const char *pdata = (const char *) (pheader + 1);
8976
8977 if (rp + event_size <= pheader->size) {
8978 /* copy event to cache */
8979 vecptr->assign(pdata + rp, pdata + rp + event_size);
8980 } else {
8981 /* event is splitted */
8982 int size = pheader->size - rp;
8983 vecptr->assign(pdata + rp, pdata + rp + size);
8984 vecptr->insert(vecptr->end(), pdata, pdata + event_size - size);
8985 }
8986}
Here is the call graph for this function:

◆ bm_receive_event()

INT bm_receive_event ( INT  buffer_handle,
void destination,
INT buf_size,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could, for example, contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return back to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
INT bm_receive_event(INT buffer_handle, void *destination, INT *buf_size, int timeout_msec)
Definition midas.cxx:10672
Parameters
buffer_handle buffer handle
destination destination address where the event is written to
buf_size size of destination buffer on input, size of event plus header on return.
timeout_msec Wait so many milliseconds for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_TRUNCATED The event is larger than the destination buffer and was therefore truncated
BM_ASYNC_RETURN No event available

Definition at line 10672 of file midas.cxx.

10672 {
10673 //printf("bm_receive_event: handle %d, async %d\n", buffer_handle, async_flag);
10674 if (rpc_is_remote()) {
10675 return bm_receive_event_rpc(buffer_handle, destination, buf_size, NULL, NULL, timeout_msec);
10676 }
10677#ifdef LOCAL_ROUTINES
10678 {
10680
10681 BUFFER *pbuf = bm_get_buffer("bm_receive_event", buffer_handle, &status);
10682
10683 if (!pbuf)
10684 return status;
10685
10686 int convert_flags = rpc_get_convert_flags();
10687
10688 status = bm_read_buffer(pbuf, buffer_handle, NULL, destination, buf_size, NULL, timeout_msec, convert_flags, FALSE);
10689 //printf("bm_receive_event: handle %d, async %d, status %d, size %d\n", buffer_handle, async_flag, status, *buf_size);
10690 return status;
10691 }
10692#else /* LOCAL_ROUTINES */
10693
10694 return BM_SUCCESS;
10695#endif
10696}
static INT bm_receive_event_rpc(INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
Definition midas.cxx:10505
INT rpc_get_convert_flags(void)
Definition midas.cxx:13052
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_alloc()

INT bm_receive_event_alloc ( INT  buffer_handle,
EVENT_HEADER **  ppevent,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could, for example, contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return back to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
Parameters
buffer_handle buffer handle
ppevent pointer to the received event pointer; the event pointer should be free()ed to avoid a memory leak
timeout_msec Wait so many milliseconds for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN No event available

Definition at line 10753 of file midas.cxx.

10753 {
10754 if (rpc_is_remote()) {
10755 return bm_receive_event_rpc(buffer_handle, NULL, NULL, ppevent, NULL, timeout_msec);
10756 }
10757#ifdef LOCAL_ROUTINES
10758 {
10760
10761 BUFFER *pbuf = bm_get_buffer("bm_receive_event_alloc", buffer_handle, &status);
10762
10763 if (!pbuf)
10764 return status;
10765
10766 int convert_flags = rpc_get_convert_flags();
10767
10768 return bm_read_buffer(pbuf, buffer_handle, (void **) ppevent, NULL, NULL, NULL, timeout_msec, convert_flags, FALSE);
10769 }
10770#else /* LOCAL_ROUTINES */
10771
10772 return BM_SUCCESS;
10773#endif
10774}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_rpc()

static INT bm_receive_event_rpc ( INT  buffer_handle,
void buf,
int buf_size,
EVENT_HEADER **  ppevent,
std::vector< char > *  pvec,
int  timeout_msec 
)
static

Definition at line 10505 of file midas.cxx.

10506{
10507 //printf("bm_receive_event_rpc: handle %d, buf %p, pevent %p, pvec %p, timeout %d, max_event_size %d\n", buffer_handle, buf, ppevent, pvec, timeout_msec, _bm_max_event_size);
10508
10509 assert(_bm_max_event_size > sizeof(EVENT_HEADER));
10510
10511 void *xbuf = NULL;
10512 int xbuf_size = 0;
10513
10514 if (buf) {
10515 xbuf = buf;
10516 xbuf_size = *buf_size;
10517 } else if (ppevent) {
10520 } else if (pvec) {
10521 pvec->resize(_bm_max_event_size);
10522 xbuf = pvec->data();
10523 xbuf_size = pvec->size();
10524 } else {
10525 assert(!"incorrect call to bm_receivent_event_rpc()");
10526 }
10527
10528 int status;
10531
10533
10534 int zbuf_size = xbuf_size;
10535
10536 while (1) {
10537 if (timeout_msec == BM_WAIT) {
10538 xtimeout_msec = 1000;
10539 } else if (timeout_msec == BM_NO_WAIT) {
10541 } else {
10542 if (xtimeout_msec > 1000) {
10543 xtimeout_msec = 1000;
10544 }
10545 }
10546
10548
10550
10551 //printf("bm_receive_event_rpc: handle %d, timeout %d, status %d, size %d in, %d out, via RPC_BM_RECEIVE_EVENT\n", buffer_handle, xtimeout_msec, status, xbuf_size, zbuf_size);
10552
10553 if (status == BM_ASYNC_RETURN) {
10554 if (timeout_msec == BM_WAIT) {
10555 // BM_WAIT means wait forever
10556 continue;
10557 } else if (timeout_msec == BM_NO_WAIT) {
10558 // BM_NO_WAIT means do not wait
10559 break;
10560 } else {
10562 if (now >= time_end) {
10563 // timeout, return BM_ASYNC_RETURN
10564 break;
10565 }
10566
10568
10569 if (remain < (DWORD)xtimeout_msec) {
10571 }
10572
10573 // keep asking for event...
10574 continue;
10575 }
10576 } else if (status == BM_SUCCESS) {
10577 // success, return BM_SUCCESS
10578 break;
10579 }
10580
10581 // RPC error
10582
10583 if (buf) {
10584 *buf_size = 0;
10585 } else if (ppevent) {
10586 free(*ppevent);
10587 *ppevent = NULL;
10588 } else if (pvec) {
10589 pvec->resize(0);
10590 } else {
10591 assert(!"incorrect call to bm_receivent_event_rpc()");
10592 }
10593
10594 return status;
10595 }
10596
10597 // status is BM_SUCCESS or BM_ASYNC_RETURN
10598
10599 if (buf) {
10600 *buf_size = zbuf_size;
10601 } else if (ppevent) {
10602 // nothing to do
10603 // ppevent = realloc(ppevent, xbuf_size); // shrink memory allocation
10604 } else if (pvec) {
10605 pvec->resize(zbuf_size);
10606 } else {
10607 assert(!"incorrect call to bm_receivent_event_rpc()");
10608 }
10609
10610 return status;
10611}
#define RPC_BM_RECEIVE_EVENT
Definition mrpc.h:47
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_vec()

INT bm_receive_event_vec ( INT  buffer_handle,
std::vector< char > *  pvec,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could, for example, contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return back to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
Parameters
buffer_handle buffer handle
pvec pointer to the vector that receives the event data (NOTE: the original text said "ppevent", which does not match this function's signature)
timeout_msec Wait so many milliseconds for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN No event available

Definition at line 10831 of file midas.cxx.

10831 {
10832 if (rpc_is_remote()) {
10833 return bm_receive_event_rpc(buffer_handle, NULL, NULL, NULL, pvec, timeout_msec);
10834 }
10835#ifdef LOCAL_ROUTINES
10836 {
10838
10839 BUFFER *pbuf = bm_get_buffer("bm_receive_event_vec", buffer_handle, &status);
10840
10841 if (!pbuf)
10842 return status;
10843
10844 int convert_flags = rpc_get_convert_flags();
10845
10846 return bm_read_buffer(pbuf, buffer_handle, NULL, NULL, NULL, pvec, timeout_msec, convert_flags, FALSE);
10847 }
10848#else /* LOCAL_ROUTINES */
10849 return BM_SUCCESS;
10850#endif
10851}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_remove_client_locked()

void bm_remove_client_locked ( BUFFER_HEADER pheader,
int  j 
)

Called to forcibly disconnect given client from a data buffer

Definition at line 6057 of file midas.cxx.

6057 {
6058 int k, nc;
6060
6061 /* clear entry from client structure in buffer header */
6062 memset(&(pheader->client[j]), 0, sizeof(BUFFER_CLIENT));
6063
6064 /* calculate new max_client_index entry */
6065 for (k = MAX_CLIENTS - 1; k >= 0; k--)
6066 if (pheader->client[k].pid != 0)
6067 break;
6068 pheader->max_client_index = k + 1;
6069
6070 /* count new number of clients */
6071 for (k = MAX_CLIENTS - 1, nc = 0; k >= 0; k--)
6072 if (pheader->client[k].pid != 0)
6073 nc++;
6074 pheader->num_clients = nc;
6075
6076 /* check if anyone is waiting and wake him up */
6077 pbctmp = pheader->client;
6078
6079 for (k = 0; k < pheader->max_client_index; k++, pbctmp++)
6080 if (pbctmp->pid && (pbctmp->write_wait || pbctmp->read_wait))
6081 ss_resume(pbctmp->port, "B ");
6082}
INT k
Definition odbhist.cxx:40
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_remove_event_request()

INT bm_remove_event_request ( INT  buffer_handle,
INT  request_id 
)

Delete a previously placed request for a specific event type in the client structure of the buffer referenced by buffer_handle.

Parameters
buffer_handle Handle to the buffer where the request should be placed
request_id Request id returned by bm_request_event
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_NOT_FOUND, RPC_NET_ERROR

Definition at line 8540 of file midas.cxx.

8540 {
8541 if (rpc_is_remote())
8542 return rpc_call(RPC_BM_REMOVE_EVENT_REQUEST, buffer_handle, request_id);
8543
8544#ifdef LOCAL_ROUTINES
8545 {
8546 int status = 0;
8547
8548 BUFFER *pbuf = bm_get_buffer("bm_remove_event_request", buffer_handle, &status);
8549
8550 if (!pbuf)
8551 return status;
8552
8553 /* lock buffer */
8555
8556 if (!pbuf_guard.is_locked())
8557 return pbuf_guard.get_status();
8558
8559 INT i, deleted;
8560
8561 /* get a pointer to the proper client structure */
8563
8564 /* check all requests and set to zero if matching */
8565 for (i = 0, deleted = 0; i < pclient->max_request_index; i++)
8566 if (pclient->event_request[i].valid && pclient->event_request[i].id == request_id) {
8567 memset(&pclient->event_request[i], 0, sizeof(EVENT_REQUEST));
8568 deleted++;
8569 }
8570
8571 /* calculate new max_request_index entry */
8572 for (i = MAX_EVENT_REQUESTS - 1; i >= 0; i--)
8573 if (pclient->event_request[i].valid)
8574 break;
8575
8576 pclient->max_request_index = i + 1;
8577
8578 /* calculate new all_flag */
8579 pclient->all_flag = FALSE;
8580
8581 for (i = 0; i < pclient->max_request_index; i++)
8582 if (pclient->event_request[i].valid && (pclient->event_request[i].sampling_type & GET_ALL)) {
8583 pclient->all_flag = TRUE;
8584 break;
8585 }
8586
8587 pbuf->get_all_flag = pclient->all_flag;
8588
8589 if (!deleted)
8590 return BM_NOT_FOUND;
8591 }
8592#endif /* LOCAL_ROUTINES */
8593
8594 return BM_SUCCESS;
8595}
#define RPC_BM_REMOVE_EVENT_REQUEST
Definition mrpc.h:44
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_request_event()

INT bm_request_event ( HNDLE  buffer_handle,
short int  event_id,
short int  trigger_mask,
INT  sampling_type,
HNDLE request_id,
EVENT_HANDLER func 
)

Place an event request based on certain characteristics. Multiple event requests can be placed for each buffer, which are later identified by their request ID. They can contain different callback routines. For an example, see bm_open_buffer() and bm_receive_event()

Parameters
buffer_handle buffer handle obtained via bm_open_buffer()
event_id event ID for requested events. Use EVENTID_ALL to receive events with any ID.
trigger_mask trigger mask for requested events. The requested events must have at least one bit in their trigger mask common with the requested trigger mask. Use TRIGGER_ALL to receive events with any trigger mask.
sampling_type specifies how many events to receive. A value of GET_ALL receives all events which match the specified event ID and trigger mask. If the events are consumed slower than produced, the producer is automatically slowed down. A value of GET_NONBLOCKING receives as many events as possible without slowing down the producer. GET_ALL is typically used by the logger, while GET_NONBLOCKING is typically used by analyzers.
request_id request ID returned by the function. This ID is passed to the callback routine and must be used in the bm_delete_request() routine.
func callback routine which gets called when an event of the specified type is received.
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_NO_MEMORY too many requests. The value MAX_EVENT_REQUESTS in midas.h should be increased.

Definition at line 8487 of file midas.cxx.

8491{
8492 assert(request_id != NULL);
8493
8494 EventRequest r;
8495 r.buffer_handle = buffer_handle;
8496 r.event_id = event_id;
8498 r.dispatcher = func;
8499
8500 {
8501 std::lock_guard<std::mutex> guard(_request_list_mutex);
8502
8503 bool found = false;
8504
8505 // find deleted entry
8506 for (size_t i = 0; i < _request_list.size(); i++) {
8507 if (_request_list[i].buffer_handle == 0) {
8508 _request_list[i] = r;
8509 *request_id = i;
8510 found = true;
8511 break;
8512 }
8513 }
8514
8515 if (!found) { // not found
8516 *request_id = _request_list.size();
8517 _request_list.push_back(r);
8518 }
8519
8520 // implicit unlock()
8521 }
8522
8523 /* add request in buffer structure */
8524 int status = bm_add_event_request(buffer_handle, event_id, trigger_mask, sampling_type, func, *request_id);
8525 if (status != BM_SUCCESS)
8526 return status;
8527
8528 return BM_SUCCESS;
8529}
INT bm_add_event_request(INT buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, EVENT_HANDLER *func, INT request_id)
Definition midas.cxx:8336
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_reset_buffer_locked()

static void bm_reset_buffer_locked ( BUFFER pbuf)
static

Definition at line 6413 of file midas.cxx.

6413 {
6414 BUFFER_HEADER *pheader = pbuf->buffer_header;
6415
6416 //printf("bm_reset_buffer: buffer \"%s\"\n", pheader->name);
6417
6418 pheader->read_pointer = 0;
6419 pheader->write_pointer = 0;
6420
6421 int i;
6422 for (i = 0; i < pheader->max_client_index; i++) {
6423 BUFFER_CLIENT *pc = pheader->client + i;
6424 if (pc->pid) {
6425 pc->read_pointer = 0;
6426 }
6427 }
6428}
INT read_pointer
Definition midas.h:940
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_send_event()

INT bm_send_event ( INT  buffer_handle,
const EVENT_HEADER pevent,
int  unused,
int  timeout_msec 
)

Definition at line 9700 of file midas.cxx.

9701{
9702 const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16); // event size computations are not 32-bit clean, limit event size to 2GB. K.O.
9703 const DWORD data_size = pevent->data_size; // 32-bit unsigned value
9704
9705 if (data_size == 0) {
9706 cm_msg(MERROR, "bm_send_event", "invalid event data size zero");
9707 return BM_INVALID_SIZE;
9708 }
9709
9710 if (data_size > MAX_DATA_SIZE) {
9711 cm_msg(MERROR, "bm_send_event", "invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
9712 return BM_INVALID_SIZE;
9713 }
9714
9715 const size_t event_size = sizeof(EVENT_HEADER) + data_size;
9716
9717 //printf("bm_send_event: pevent %p, data_size %d, event_size %d, buf_size %d\n", pevent, data_size, event_size, unused);
9718
9719 if (rpc_is_remote()) {
9720 //return bm_send_event_rpc(buffer_handle, pevent, event_size, timeout_msec);
9721 return rpc_send_event_sg(buffer_handle, 1, (char**)&pevent, &event_size);
9722 } else {
9723 return bm_send_event_sg(buffer_handle, 1, (char**)&pevent, &event_size, timeout_msec);
9724 }
9725}
int bm_send_event_sg(int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
Definition midas.cxx:9800
#define BM_INVALID_SIZE
Definition midas.h:624
INT rpc_send_event_sg(INT buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[])
Definition midas.cxx:13947
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_send_event_sg()

int bm_send_event_sg ( int  buffer_handle,
int  sg_n,
const char *const  sg_ptr[],
const size_t  sg_len[],
int  timeout_msec 
)

Sends an event to a buffer. This function checks if the buffer has enough space for the event, then copies the event to the buffer in shared memory. If clients have requests for the event, they are notified via a UDP packet.

char event[1000];
// create event with ID 1, trigger mask 0, size 100 bytes and serial number 1
bm_compose_event((EVENT_HEADER *) event, 1, 0, 100, 1);
// set first byte of event
*(event+sizeof(EVENT_HEADER)) = <...>
#include <stdio.h>
#include "midas.h"
{
char event[1000];
status = cm_connect_experiment("", "Sample", "Producer", NULL);
return 1;
// create event with ID 1, trigger mask 0, size 100 bytes and serial number 1
bm_compose_event((EVENT_HEADER *) event, 1, 0, 100, 1);
// set event data
for (i=0 ; i<100 ; i++)
*(event+sizeof(EVENT_HEADER)+i) = i;
// send event
bm_send_event(hbuf, event, 100+sizeof(EVENT_HEADER), BM_WAIT);
return 0;
}
INT bm_send_event(INT buffer_handle, const EVENT_HEADER *pevent, int unused, int timeout_msec)
Definition midas.cxx:9700
Parameters
buffer_handle — Buffer handle obtained via bm_open_buffer()
source — Address of event buffer
buf_size — Size of event including event header in bytes
timeout_msec — Timeout waiting for free space in the event buffer. If BM_WAIT, wait forever. If BM_NO_WAIT, the function returns immediately with a value of BM_ASYNC_RETURN without writing the event to the buffer
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_INVALID_PARAM
BM_ASYNC_RETURN Routine called with timeout_msec == BM_NO_WAIT and buffer has not enough space to receive event
BM_NO_MEMORY Event is too large for network buffer or event buffer. One has to increase the event buffer size "/Experiment/Buffer sizes/SYSTEM" and/or /Experiment/MAX_EVENT_SIZE in ODB.

Definition at line 9800 of file midas.cxx.

9801{
9802 if (rpc_is_remote())
9803 return rpc_send_event_sg(buffer_handle, sg_n, sg_ptr, sg_len);
9804
9805 if (sg_n < 1) {
9806 cm_msg(MERROR, "bm_send_event", "invalid sg_n %d", sg_n);
9807 return BM_INVALID_SIZE;
9808 }
9809
9810 if (sg_ptr[0] == NULL) {
9811 cm_msg(MERROR, "bm_send_event", "invalid sg_ptr[0] is NULL");
9812 return BM_INVALID_SIZE;
9813 }
9814
9815 if (sg_len[0] < sizeof(EVENT_HEADER)) {
9816 cm_msg(MERROR, "bm_send_event", "invalid sg_len[0] value %d is smaller than event header size %d", (int)sg_len[0], (int)sizeof(EVENT_HEADER));
9817 return BM_INVALID_SIZE;
9818 }
9819
9820 const EVENT_HEADER* pevent = (const EVENT_HEADER*)sg_ptr[0];
9821
9822 const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16); // event size computations are not 32-bit clean, limit event size to 2GB. K.O.
9823 const DWORD data_size = pevent->data_size; // 32-bit unsigned value
9824
9825 if (data_size == 0) {
9826 cm_msg(MERROR, "bm_send_event", "invalid event data size zero");
9827 return BM_INVALID_SIZE;
9828 }
9829
9830 if (data_size > MAX_DATA_SIZE) {
9831 cm_msg(MERROR, "bm_send_event", "invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
9832 return BM_INVALID_SIZE;
9833 }
9834
9835 const size_t event_size = sizeof(EVENT_HEADER) + data_size;
9836
9837 size_t count = 0;
9838 for (int i=0; i<sg_n; i++) {
9839 count += sg_len[i];
9840 }
9841
9842 if (count != event_size) {
9843 cm_msg(MERROR, "bm_send_event", "data size mismatch: event data_size %d, event_size %d not same as sum of sg_len %d", (int)data_size, (int)event_size, (int)count);
9844 return BM_INVALID_SIZE;
9845 }
9846
9847 //printf("bm_send_event_sg: pevent %p, event_id 0x%04x, serial 0x%08x, data_size %d, event_size %d, total_size %d\n", pevent, pevent->event_id, pevent->serial_number, (int)pevent->data_size, (int)event_size, (int)total_size);
9848
9849#ifdef LOCAL_ROUTINES
9850 {
9851 int status = 0;
9852 const size_t total_size = ALIGN8(event_size);
9853
9854 BUFFER *pbuf = bm_get_buffer("bm_send_event_sg", buffer_handle, &status);
9855
9856 if (!pbuf)
9857 return status;
9858
9859 /* round up total_size to next DWORD boundary */
9860 //int total_size = ALIGN8(event_size);
9861
9862 /* check if write cache is enabled */
9863 if (pbuf->write_cache_size) {
9865
9866 if (status != BM_SUCCESS)
9867 return status;
9868
9869 /* check if write cache is enabled */
9870 if (pbuf->write_cache_size) {
9871 size_t max_event_size = pbuf->write_cache_size/MAX_WRITE_CACHE_EVENT_SIZE_DIV;
9873
9874 //printf("bm_send_event: write %zu/%zu max %zu, cache size %zu, wp %zu\n", event_size, total_size, max_event_size, pbuf->write_cache_size.load(), pbuf->write_cache_wp);
9875
9876 /* if this event does not fit into the write cache, flush the write cache */
9877 if (pbuf->write_cache_wp > 0 && (pbuf->write_cache_wp + total_size > pbuf->write_cache_size || too_big)) {
9878 //printf("bm_send_event: write %zu/%zu but cache is full, size %zu, wp %zu\n", event_size, total_size, pbuf->write_cache_size.load(), pbuf->write_cache_wp);
9879
9881
9882 if (!pbuf_guard.is_locked()) {
9883 pbuf->write_cache_mutex.unlock();
9884 return pbuf_guard.get_status();
9885 }
9886
9888
9889 if (pbuf_guard.is_locked()) {
9890 // check if bm_wait_for_free_space() failed to relock the buffer
9891 pbuf_guard.unlock();
9892 }
9893
9894 if (status != BM_SUCCESS) {
9895 pbuf->write_cache_mutex.unlock();
9896 // bm_flush_cache() failed: timeout in bm_wait_for_free_space() or write cache size is bigger than buffer size or buffer was closed.
9897 if (status == BM_NO_MEMORY)
9898 cm_msg(MERROR, "bm_send_event", "write cache size is bigger than buffer size");
9899 return status;
9900 }
9901
9902 // write cache must be empty here
9903 assert(pbuf->write_cache_wp == 0);
9904 }
9905
9906 /* write this event into the write cache, if it is not too big and if it fits */
9907 if (!too_big && pbuf->write_cache_wp + total_size <= pbuf->write_cache_size) {
9908 //printf("bm_send_event: write %d/%d to cache size %d, wp %d\n", (int)event_size, (int)total_size, (int)pbuf->write_cache_size, (int)pbuf->write_cache_wp);
9909
9910 char* wptr = pbuf->write_cache + pbuf->write_cache_wp;
9911
9912 for (int i=0; i<sg_n; i++) {
9913 memcpy(wptr, sg_ptr[i], sg_len[i]);
9914 wptr += sg_len[i];
9915 }
9916
9917 pbuf->write_cache_wp += total_size;
9918
9919 pbuf->write_cache_mutex.unlock();
9920 return BM_SUCCESS;
9921 }
9922 }
9923
9924 /* event did not fit into the write cache, we flushed the write cache and we send it directly to shared memory */
9925 pbuf->write_cache_mutex.unlock();
9926 }
9927
9928 /* we come here only for events that are too big to fit into the cache */
9929
9930 /* lock the buffer */
9932
9933 if (!pbuf_guard.is_locked()) {
9934 return pbuf_guard.get_status();
9935 }
9936
9937 /* calculate some shorthands */
9938 BUFFER_HEADER *pheader = pbuf->buffer_header;
9939
9940#if 0
9942 if (status != BM_SUCCESS) {
9943 printf("bm_send_event: corrupted 111!\n");
9944 abort();
9945 }
9946#endif
9947
9948 /* check if buffer is large enough */
9949 if (total_size >= (size_t)pheader->size) {
9950 pbuf_guard.unlock(); // unlock before cm_msg()
9951 cm_msg(MERROR, "bm_send_event", "total event size (%d) larger than size (%d) of buffer \'%s\'", (int)total_size, pheader->size, pheader->name);
9952 return BM_NO_MEMORY;
9953 }
9954
9956
9957 if (status != BM_SUCCESS) {
9958 // implicit unlock
9959 return status;
9960 }
9961
9962#if 0
9964 if (status != BM_SUCCESS) {
9965 printf("bm_send_event: corrupted 222!\n");
9966 abort();
9967 }
9968#endif
9969
9970 int old_write_pointer = pheader->write_pointer;
9971
9972 bm_write_to_buffer_locked(pheader, sg_n, sg_ptr, sg_len, total_size);
9973
 9974 /* write pointer was incremented, but there should
 9975 * always be some free space in the buffer and the
 9976 * write pointer should never catch up to the read pointer:
 9977 * the rest of the code gets confused if this happens (buffer 100% full)
 9978 * because write_pointer == read_pointer can mean either
 9979 * 100% full or 100% empty. My solution: never fill
 9980 * the buffer to 100% */
9981 assert(pheader->write_pointer != pheader->read_pointer);
9982
9983 /* send wake up messages to all clients that want this event */
9984 int i;
9985 for (i = 0; i < pheader->max_client_index; i++) {
9986 BUFFER_CLIENT *pc = pheader->client + i;
9987 int request_id = bm_find_first_request_locked(pc, pevent);
9988 bm_notify_reader_locked(pheader, pc, old_write_pointer, request_id);
9989 }
9990
9991#if 0
9993 if (status != BM_SUCCESS) {
9994 printf("bm_send_event: corrupted 333!\n");
9995 abort();
9996 }
9997#endif
9998
9999 /* update statistics */
10000 pheader->num_in_events++;
10001 pbuf->count_sent += 1;
10002 pbuf->bytes_sent += total_size;
10003 }
10004#endif /* LOCAL_ROUTINES */
10005
10006 return BM_SUCCESS;
10007}
double count
Definition mdump.cxx:33
INT max_event_size
Definition mfed.cxx:30
#define MAX_WRITE_CACHE_EVENT_SIZE_DIV
Definition midas.h:259
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_send_event_vec() [1/2]

int bm_send_event_vec ( int  buffer_handle,
const std::vector< char > &  event,
int  timeout_msec 
)

Definition at line 9727 of file midas.cxx.

9728{
9729 const char* cptr = event.data();
9730 size_t clen = event.size();
9731 return bm_send_event_sg(buffer_handle, 1, &cptr, &clen, timeout_msec);
9732}
Here is the call graph for this function:

◆ bm_send_event_vec() [2/2]

int bm_send_event_vec ( int  buffer_handle,
const std::vector< std::vector< char > > &  event,
int  timeout_msec 
)

Definition at line 9734 of file midas.cxx.

9735{
9736 int sg_n = event.size();
9737 const char* sg_ptr[sg_n];
9738 size_t sg_len[sg_n];
9739 for (int i=0; i<sg_n; i++) {
9740 sg_ptr[i] = event[i].data();
9741 sg_len[i] = event[i].size();
9742 }
9743 return bm_send_event_sg(buffer_handle, sg_n, sg_ptr, sg_len, timeout_msec);
9744}
Here is the call graph for this function:

◆ bm_set_cache_size()

INT bm_set_cache_size ( INT  buffer_handle,
size_t  read_size,
size_t  write_size 
)

Modifies buffer cache size. Without a buffer cache, events are copied to/from the shared memory event by event.

To protect processes from accessing the shared memory simultaneously, semaphores are used. Since semaphore operations are CPU consuming (typically 50-100us) this can slow down the data transfer, especially for small events. By using a cache the number of semaphore operations is reduced dramatically. Instead of writing directly to the shared memory, the events are copied to a local cache buffer. When this buffer is full, it is copied to the shared memory in one operation. The same technique can be used when receiving events.

The drawback of this method is that the events have to be copied twice, once to the cache and once from the cache to the shared memory. Therefore it can happen that the usage of a cache even slows down data throughput in a given environment (computer type, OS type, event size). The cache size must therefore be optimized manually to maximize data throughput.

Parameters
buffer_handle — buffer handle obtained via bm_open_buffer()
read_size — cache size for reading events in bytes, zero for no cache
write_size — cache size for writing events in bytes, zero for no cache
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_NO_MEMORY, BM_INVALID_PARAM

Definition at line 8162 of file midas.cxx.

8164{
8165 if (rpc_is_remote())
8166 return rpc_call(RPC_BM_SET_CACHE_SIZE, buffer_handle, read_size, write_size);
8167
8168#ifdef LOCAL_ROUTINES
8169 {
8170 int status = 0;
8171
8172 BUFFER *pbuf = bm_get_buffer("bm_set_cache_size", buffer_handle, &status);
8173
8174 if (!pbuf)
8175 return status;
8176
8177 /* lock pbuf for local access. we do not lock buffer semaphore because we do not touch the shared memory */
8178
8180
8181 if (status != BM_SUCCESS)
8182 return status;
8183
8184 if (write_size < 0)
8185 write_size = 0;
8186
8187 if (write_size > 0) {
8189 cm_msg(MERROR, "bm_set_cache_size", "requested write cache size %zu on buffer \"%s\" too small, will use minimum size %d", write_size, pbuf->buffer_name, MIN_WRITE_CACHE_SIZE);
8191 }
8192 }
8193
8194 size_t max_write_size = pbuf->buffer_header->size/MAX_WRITE_CACHE_SIZE_DIV;
8195
8196 if (write_size > max_write_size) {
8198 cm_msg(MERROR, "bm_set_cache_size", "requested write cache size %zu on buffer \"%s\" is too big: buffer size is %d, write cache size will be %zu bytes", write_size, pbuf->buffer_name, pbuf->buffer_header->size, new_write_size);
8200 }
8201
8202 pbuf->buffer_mutex.unlock();
8203
8204 /* resize read cache */
8205
8207
8208 if (status != BM_SUCCESS) {
8209 return status;
8210 }
8211
8212 if (pbuf->read_cache_size > 0) {
8213 free(pbuf->read_cache);
8214 pbuf->read_cache = NULL;
8215 }
8216
8217 if (read_size > 0) {
8218 pbuf->read_cache = (char *) malloc(read_size);
8219 if (pbuf->read_cache == NULL) {
8220 pbuf->read_cache_size = 0;
8221 pbuf->read_cache_rp = 0;
8222 pbuf->read_cache_wp = 0;
8223 pbuf->read_cache_mutex.unlock();
8224 cm_msg(MERROR, "bm_set_cache_size", "not enough memory to allocate read cache for buffer \"%s\", malloc(%zu) failed", pbuf->buffer_name, read_size);
8225 return BM_NO_MEMORY;
8226 }
8227 }
8228
8229 pbuf->read_cache_size = read_size;
8230 pbuf->read_cache_rp = 0;
8231 pbuf->read_cache_wp = 0;
8232
8233 pbuf->read_cache_mutex.unlock();
8234
8235 /* resize the write cache */
8236
8238
8239 if (status != BM_SUCCESS)
8240 return status;
8241
8242 // FIXME: should flush the write cache!
8243 if (pbuf->write_cache_size && pbuf->write_cache_wp > 0) {
8244 cm_msg(MERROR, "bm_set_cache_size", "buffer \"%s\" lost %zu bytes from the write cache", pbuf->buffer_name, pbuf->write_cache_wp);
8245 }
8246
8247 /* manage write cache */
8248 if (pbuf->write_cache_size > 0) {
8249 free(pbuf->write_cache);
8250 pbuf->write_cache = NULL;
8251 }
8252
8253 if (write_size > 0) {
8254 pbuf->write_cache = (char *) M_MALLOC(write_size);
8255 if (pbuf->write_cache == NULL) {
8256 pbuf->write_cache_size = 0;
8257 pbuf->write_cache_rp = 0;
8258 pbuf->write_cache_wp = 0;
8259 pbuf->write_cache_mutex.unlock();
8260 cm_msg(MERROR, "bm_set_cache_size", "not enough memory to allocate write cache for buffer \"%s\", malloc(%zu) failed", pbuf->buffer_name, write_size);
8261 return BM_NO_MEMORY;
8262 }
8263 }
8264
8265 pbuf->write_cache_size = write_size;
8266 pbuf->write_cache_rp = 0;
8267 pbuf->write_cache_wp = 0;
8268
8269 pbuf->write_cache_mutex.unlock();
8270 }
8271#endif /* LOCAL_ROUTINES */
8272
8273 return BM_SUCCESS;
8274}
static int bm_lock_buffer_mutex(BUFFER *pbuf)
Definition midas.cxx:7968
#define RPC_BM_SET_CACHE_SIZE
Definition mrpc.h:42
#define M_MALLOC(x)
Definition midas.h:1550
#define MIN_WRITE_CACHE_SIZE
Definition midas.h:257
#define MAX_WRITE_CACHE_SIZE_DIV
Definition midas.h:258
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_skip_event() [1/2]

static int bm_skip_event ( BUFFER pbuf)
static

Definition at line 10855 of file midas.cxx.

10856{
10857 /* clear read cache */
10858 if (pbuf->read_cache_size > 0) {
10859
10861
10862 if (status != BM_SUCCESS)
10863 return status;
10864
10865 pbuf->read_cache_rp = 0;
10866 pbuf->read_cache_wp = 0;
10867
10868 pbuf->read_cache_mutex.unlock();
10869 }
10870
10872
10873 if (!pbuf_guard.is_locked())
10874 return pbuf_guard.get_status();
10875
10876 BUFFER_HEADER *pheader = pbuf->buffer_header;
10877
10878 /* forward read pointer to global write pointer */
10880 pclient->read_pointer = pheader->write_pointer;
10881
10882 return BM_SUCCESS;
10883}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_skip_event() [2/2]

INT bm_skip_event ( INT  buffer_handle)

Skip all events in current buffer.

Useful for single event displays to see the newest events

Parameters
buffer_handleHandle of the buffer. Must be obtained via bm_open_buffer.
Returns
BM_SUCCESS, BM_INVALID_HANDLE, RPC_NET_ERROR

Definition at line 10896 of file midas.cxx.

10896 {
10897 if (rpc_is_remote())
10898 return rpc_call(RPC_BM_SKIP_EVENT, buffer_handle);
10899
10900#ifdef LOCAL_ROUTINES
10901 {
10902 int status = 0;
10903
10904 BUFFER *pbuf = bm_get_buffer("bm_skip_event", buffer_handle, &status);
10905
10906 if (!pbuf)
10907 return status;
10908
10909 return bm_skip_event(pbuf);
10910 }
10911#endif
10912
10913 return BM_SUCCESS;
10914}
#define RPC_BM_SKIP_EVENT
Definition mrpc.h:50
Here is the call graph for this function:

◆ bm_update_last_activity()

static void bm_update_last_activity ( DWORD  millitime)
static

Update last activity time

Definition at line 6139 of file midas.cxx.

6139 {
6140 int pid = ss_getpid();
6141
6142 std::vector<BUFFER*> mybuffers;
6143
6144 gBuffersMutex.lock();
6146 gBuffersMutex.unlock();
6147
6148 for (BUFFER* pbuf : mybuffers) {
6149 if (!pbuf)
6150 continue;
6151 if (pbuf->attached) {
6152
6154
6155 if (!pbuf_guard.is_locked())
6156 continue;
6157
6158 BUFFER_HEADER *pheader = pbuf->buffer_header;
6159 for (int j = 0; j < pheader->max_client_index; j++) {
6160 BUFFER_CLIENT *pclient = pheader->client + j;
6161 if (pclient->pid == pid) {
6163 }
6164 }
6165 }
6166 }
6167}
DWORD last_activity
Definition midas.h:950
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_update_read_pointer_locked()

static BOOL bm_update_read_pointer_locked ( const char caller_name,
BUFFER_HEADER pheader 
)
static

Definition at line 8742 of file midas.cxx.

8742 {
8743 assert(caller_name);
8744
8745 /* calculate global read pointer as "minimum" of client read pointers */
8746 int min_rp = pheader->write_pointer;
8747
8748 int i;
8749 for (i = 0; i < pheader->max_client_index; i++) {
8750 BUFFER_CLIENT *pc = pheader->client + i;
8751 if (pc->pid) {
8753
8754#if 0
8755 printf("bm_update_read_pointer: [%s] rp %d, wp %d, size %d, min_rp %d, client [%s] rp %d\n",
8756 pheader->name,
8757 pheader->read_pointer,
8758 pheader->write_pointer,
8759 pheader->size,
8760 min_rp,
8761 pc->name,
8762 pc->read_pointer);
8763#endif
8764
8765 if (pheader->read_pointer <= pheader->write_pointer) {
8766 // normal pointers
8767 if (pc->read_pointer < min_rp)
8768 min_rp = pc->read_pointer;
8769 } else {
8770 // inverted pointers
8771 if (pc->read_pointer <= pheader->write_pointer) {
8772 // clients 3 and 4
8773 if (pc->read_pointer < min_rp)
8774 min_rp = pc->read_pointer;
8775 } else {
8776 // clients 1 and 2
8777 int xptr = pc->read_pointer - pheader->size;
8778 if (xptr < min_rp)
8779 min_rp = xptr;
8780 }
8781 }
8782 }
8783 }
8784
8785 if (min_rp < 0)
8786 min_rp += pheader->size;
8787
8788 assert(min_rp >= 0);
8789 assert(min_rp < pheader->size);
8790
8791 if (min_rp == pheader->read_pointer) {
8792 return FALSE;
8793 }
8794
8795#if 0
8796 printf("bm_update_read_pointer: [%s] rp %d, wp %d, size %d, new_rp %d, moved\n",
8797 pheader->name,
8798 pheader->read_pointer,
8799 pheader->write_pointer,
8800 pheader->size,
8801 min_rp);
8802#endif
8803
8804 pheader->read_pointer = min_rp;
8805
8806 return TRUE;
8807}
static void bm_validate_client_pointers_locked(const BUFFER_HEADER *pheader, BUFFER_CLIENT *pclient)
Definition midas.cxx:8644
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_buffer_locked()

static int bm_validate_buffer_locked ( const BUFFER pbuf)
static

Definition at line 6329 of file midas.cxx.

6329 {
6330 const BUFFER_HEADER *pheader = pbuf->buffer_header;
6331 const char *pdata = (const char *) (pheader + 1);
6332
6333 //printf("bm_validate_buffer: buffer \"%s\"\n", pheader->name);
6334
6335 //printf("size: %d, rp: %d, wp: %d\n", pheader->size, pheader->read_pointer, pheader->write_pointer);
6336
6337 //printf("clients: max: %d, num: %d, MAX_CLIENTS: %d\n", pheader->max_client_index, pheader->num_clients, MAX_CLIENTS);
6338
6339 if (pheader->read_pointer < 0 || pheader->read_pointer >= pheader->size) {
6340 cm_msg(MERROR, "bm_validate_buffer",
6341 "buffer \"%s\" is corrupted: invalid read pointer %d. Size %d, write pointer %d", pheader->name,
6342 pheader->read_pointer, pheader->size, pheader->write_pointer);
6343 return BM_CORRUPTED;
6344 }
6345
6346 if (pheader->write_pointer < 0 || pheader->write_pointer >= pheader->size) {
6347 cm_msg(MERROR, "bm_validate_buffer",
6348 "buffer \"%s\" is corrupted: invalid write pointer %d. Size %d, read pointer %d", pheader->name,
6349 pheader->write_pointer, pheader->size, pheader->read_pointer);
6350 return BM_CORRUPTED;
6351 }
6352
6353 if (!bm_validate_rp("bm_validate_buffer_locked", pheader, pheader->read_pointer)) {
6354 cm_msg(MERROR, "bm_validate_buffer", "buffer \"%s\" is corrupted: read pointer %d is invalid", pheader->name,
6355 pheader->read_pointer);
6356 return BM_CORRUPTED;
6357 }
6358
6359 int rp = pheader->read_pointer;
6360 int rp0 = -1;
6361 while (rp != pheader->write_pointer) {
6362 if (!bm_validate_rp("bm_validate_buffer_locked", pheader, rp)) {
6363 cm_msg(MERROR, "bm_validate_buffer", "buffer \"%s\" is corrupted: invalid rp %d, last good event at rp %d",
6364 pheader->name, rp, rp0);
6365 return BM_CORRUPTED;
6366 }
6367 //bm_print_event(pdata, rp);
6368 int rp1 = bm_next_rp("bm_validate_buffer_locked", pheader, pdata, rp);
6369 if (rp1 < 0) {
6370 cm_msg(MERROR, "bm_validate_buffer",
6371 "buffer \"%s\" is corrupted: invalid event at rp %d, last good event at rp %d", pheader->name, rp, rp0);
6372 return BM_CORRUPTED;
6373 }
6374 rp0 = rp;
6375 rp = rp1;
6376 }
6377
6378 int i;
6379 for (i = 0; i < MAX_CLIENTS; i++) {
6380 const BUFFER_CLIENT *c = &pheader->client[i];
6381 if (c->pid == 0)
6382 continue;
6383 BOOL get_all = FALSE;
6384 int j;
6385 for (j = 0; j < MAX_EVENT_REQUESTS; j++) {
6386 const EVENT_REQUEST *r = &c->event_request[j];
6387 if (!r->valid)
6388 continue;
6390 get_all = (get_all || xget_all);
6391 //printf("client slot %d: pid %d, name \"%s\", request %d: id %d, valid %d, sampling_type %d, get_all %d\n", i, c->pid, c->name, j, r->id, r->valid, r->sampling_type, xget_all);
6392 }
6393
6394 int rp = c->read_pointer;
6395 int rp0 = -1;
6396 while (rp != pheader->write_pointer) {
6397 //bm_print_event(pdata, rp);
6398 int rp1 = bm_next_rp("bm_validate_buffer_locked", pheader, pdata, rp);
6399 if (rp1 < 0) {
6400 cm_msg(MERROR, "bm_validate_buffer",
6401 "buffer \"%s\" is corrupted for client \"%s\" rp %d: invalid event at rp %d, last good event at rp %d",
6402 pheader->name, c->name, c->read_pointer, rp, rp0);
6403 return BM_CORRUPTED;
6404 }
6405 rp0 = rp;
6406 rp = rp1;
6407 }
6408 }
6409
6410 return BM_SUCCESS;
6411}
static BOOL bm_validate_rp(const char *who, const BUFFER_HEADER *pheader, int rp)
Definition midas.cxx:6211
static int bm_next_rp(const char *who, const BUFFER_HEADER *pheader, const char *pdata, int rp)
Definition midas.cxx:6278
INT sampling_type
Definition midas.h:931
BOOL valid
Definition midas.h:928
char c
Definition system.cxx:1312
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_client_index_locked()

static int bm_validate_client_index_locked ( bm_lock_buffer_guard pbuf_guard)
static

Definition at line 5944 of file midas.cxx.

5945{
5946 const BUFFER *pbuf = pbuf_guard.get_pbuf();
5947
5948 bool badindex = false;
5949 bool badclient = false;
5950
5951 int idx = pbuf->client_index;
5952
5953 if (idx < 0) {
5954 badindex = true;
5955 } else if (idx > pbuf->buffer_header->max_client_index) {
5956 badindex = true;
5957 } else {
5958 BUFFER_CLIENT *pclient = &pbuf->buffer_header->client[idx];
5959 if (pclient->name[0] == 0)
5960 badclient = true;
5961 else if (pclient->pid != ss_getpid())
5962 badclient = true;
5963
5964 //if (strcmp(pclient->name,"mdump")==0) {
5965 // for (int i=0; i<15; i++) {
5966 // printf("sleep %d\n", i);
5967 // ::sleep(1);
5968 // }
5969 //}
5970 }
5971
5972#if 0
5973 if (badindex) {
5974 printf("bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, badindex %d, pid=%d\n",
5975 pbuf, pbuf->buffer_header->name, pbuf->client_index, pbuf->buffer_header->max_client_index,
5976 badindex, ss_getpid());
5977 } else if (badclient) {
5978 printf("bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, badclient %d\n",
5979 pbuf, pbuf->buffer_header->name, pbuf->client_index, pbuf->buffer_header->max_client_index,
5980 pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid,
5981 ss_getpid(), badclient);
5982 } else {
5983 printf("bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, goodclient\n",
5984 pbuf, pbuf->buffer_header->name, pbuf->client_index, pbuf->buffer_header->max_client_index,
5985 pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid,
5986 ss_getpid());
5987 }
5988#endif
5989
5990 if (badindex || badclient) {
5991 static int prevent_recursion = 1;
5992
5993 if (prevent_recursion) {
5995
5996 if (badindex) {
5997 cm_msg(MERROR, "bm_validate_client_index", "My client index %d in buffer \'%s\' is invalid, max_client_index %d, my pid %d", idx, pbuf->buffer_header->name, pbuf->buffer_header->max_client_index, ss_getpid());
5998 } else {
5999 cm_msg(MERROR, "bm_validate_client_index", "My client index %d in buffer \'%s\' is invalid: client name \'%s\', pid %d should be my pid %d", idx, pbuf->buffer_header->name, pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid, ss_getpid());
6000 }
6001
6002 cm_msg(MERROR, "bm_validate_client_index", "Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...");
6003 }
6004
6005 if (badindex) {
6006 fprintf(stderr, "bm_validate_client_index: My client index %d in buffer \'%s\' is invalid, max_client_index %d, my pid %d\n", idx, pbuf->buffer_header->name, pbuf->buffer_header->max_client_index, ss_getpid());
6007 } else {
6008 fprintf(stderr, "bm_validate_client_index: My client index %d in buffer \'%s\' is invalid: client name \'%s\', pid %d should be my pid %d\n", idx, pbuf->buffer_header->name, pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid, ss_getpid());
6009 }
6010
6011 fprintf(stderr, "bm_validate_client_index: Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...\n");
6012
6013 pbuf_guard.unlock();
6014
6015 abort();
6016 }
6017
6018 return idx;
6019}
INT client_index
Definition midas.h:989
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_client_pointers_locked()

static void bm_validate_client_pointers_locked ( const BUFFER_HEADER pheader,
BUFFER_CLIENT pclient 
)
static

Definition at line 8644 of file midas.cxx.

8644 {
8645 assert(pheader->read_pointer >= 0 && pheader->read_pointer <= pheader->size);
8646 assert(pclient->read_pointer >= 0 && pclient->read_pointer <= pheader->size);
8647
8648 if (pheader->read_pointer <= pheader->write_pointer) {
8649
8650 if (pclient->read_pointer < pheader->read_pointer) {
8651 cm_msg(MINFO, "bm_validate_client_pointers",
8652 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8653 pclient->name,
8654 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8655
8656 pclient->read_pointer = pheader->read_pointer;
8657 }
8658
8659 if (pclient->read_pointer > pheader->write_pointer) {
8660 cm_msg(MINFO, "bm_validate_client_pointers",
8661 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, read pointer %d, size %d",
8662 pclient->name,
8663 pheader->name, pclient->read_pointer, pheader->write_pointer, pheader->read_pointer, pheader->size);
8664
8665 pclient->read_pointer = pheader->write_pointer;
8666 }
8667
8668 } else {
8669
8670 if (pclient->read_pointer < 0) {
8671 cm_msg(MINFO, "bm_validate_client_pointers",
8672 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8673 pclient->name,
8674 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8675
8676 pclient->read_pointer = pheader->read_pointer;
8677 }
8678
8679 if (pclient->read_pointer >= pheader->size) {
8680 cm_msg(MINFO, "bm_validate_client_pointers",
8681 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8682 pclient->name,
8683 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8684
8685 pclient->read_pointer = pheader->read_pointer;
8686 }
8687
8688 if (pclient->read_pointer > pheader->write_pointer && pclient->read_pointer < pheader->read_pointer) {
8689 cm_msg(MINFO, "bm_validate_client_pointers",
8690 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8691 pclient->name,
8692 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8693
8694 pclient->read_pointer = pheader->read_pointer;
8695 }
8696 }
8697}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_rp()

static BOOL bm_validate_rp ( const char who,
const BUFFER_HEADER pheader,
int  rp 
)
static

Definition at line 6211 of file midas.cxx.

6211 {
6212 if (rp < 0 || rp > pheader->size) {
6213 cm_msg(MERROR, "bm_validate_rp",
6214 "error: buffer \"%s\" is corrupted: rp %d is invalid. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6215 pheader->name,
6216 rp,
6217 pheader->read_pointer,
6218 pheader->write_pointer,
6219 pheader->size,
6220 who);
6221 return FALSE;
6222 }
6223
6224 if ((rp + (int) sizeof(EVENT_HEADER)) > pheader->size) {
6225 // note ">" here, has to match bm_incr_rp() and bm_write_to_buffer()
6226 cm_msg(MERROR, "bm_validate_rp",
6227 "error: buffer \"%s\" is corrupted: rp %d plus event header point beyond the end of buffer by %d bytes. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6228 pheader->name,
6229 rp,
6230 (int) (rp + sizeof(EVENT_HEADER) - pheader->size),
6231 pheader->read_pointer,
6232 pheader->write_pointer,
6233 pheader->size,
6234 who);
6235 return FALSE;
6236 }
6237
6238 return TRUE;
6239}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wait_for_free_space_locked()

static int bm_wait_for_free_space_locked ( bm_lock_buffer_guard & pbuf_guard,
int  timeout_msec,
int  requested_space,
bool  unlock_write_cache 
)
static
  • signal other clients in wait mode
  • validate client index: we could have been removed from the buffer

Definition at line 9102 of file midas.cxx.

9103{
9104 // return values:
9105 // BM_SUCCESS - have "requested_space" bytes free in the buffer
9106 // BM_CORRUPTED - shared memory is corrupted
9107 // BM_NO_MEMORY - asked for more than buffer size
9108 // BM_ASYNC_RETURN - timeout waiting for free space
9109 // BM_INVALID_HANDLE - buffer was closed (locks released) (via bm_clock_xxx())
9110 // SS_ABORT - we are told to shutdown (locks releases)
9111
9112 int status;
9113 BUFFER* pbuf = pbuf_guard.get_pbuf();
9114 BUFFER_HEADER *pheader = pbuf->buffer_header;
9115 char *pdata = (char *) (pheader + 1);
9116
9117 /* make sure the buffer never completely full:
9118 * read pointer and write pointer would coincide
9119 * and the code cannot tell if it means the
9120 * buffer is 100% full or 100% empty. It will explode
9121 * or lose events */
9122 requested_space += 100;
9123
9124 if (requested_space >= pheader->size)
9125 return BM_NO_MEMORY;
9126
9129
9130 //DWORD blocking_time = 0;
9131 //int blocking_loops = 0;
9132 int blocking_client_index = -1;
9134 blocking_client_name[0] = 0;
9135
9136 while (1) {
9137 while (1) {
9138 /* check if enough space in buffer */
9139
9140 int free = pheader->read_pointer - pheader->write_pointer;
9141 if (free <= 0)
9142 free += pheader->size;
9143
9144 //printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec);
9145
9146 if (requested_space < free) { /* note the '<' to avoid 100% filling */
9147 //if (blocking_loops) {
9148 // DWORD wait_time = ss_millitime() - blocking_time;
9149 // printf("blocking client \"%s\", time %d ms, loops %d\n", blocking_client_name, wait_time, blocking_loops);
9150 //}
9151
9152 if (pbuf->wait_start_time != 0) {
9154 DWORD wait_time = now - pbuf->wait_start_time;
9155 pbuf->time_write_wait += wait_time;
9156 pbuf->wait_start_time = 0;
9157 int iclient = pbuf->wait_client_index;
9158 //printf("bm_wait_for_free_space: wait ended: wait time %d ms, blocking client index %d\n", wait_time, iclient);
9159 if (iclient >= 0 && iclient < MAX_CLIENTS) {
9160 pbuf->client_count_write_wait[iclient] += 1;
9161 pbuf->client_time_write_wait[iclient] += wait_time;
9162 }
9163 }
9164
9165 //if (blocking_loops > 0) {
9166 // printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d, found space after %d waits\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec, blocking_loops);
9167 //}
9168
9169 return BM_SUCCESS;
9170 }
9171
9172 if (!bm_validate_rp("bm_wait_for_free_space_locked", pheader, pheader->read_pointer)) {
9173 cm_msg(MERROR, "bm_wait_for_free_space",
9174 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer is invalid",
9175 pheader->name,
9176 pheader->read_pointer,
9177 pheader->write_pointer,
9178 pheader->size,
9179 free,
9181 return BM_CORRUPTED;
9182 }
9183
9184 const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pdata + pheader->read_pointer);
9185 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
9186 int total_size = ALIGN8(event_size);
9187
9188#if 0
9189 printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, blocking event size %d/%d\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, event_size, total_size);
9190#endif
9191
9192 if (pevent->data_size <= 0 || total_size <= 0 || total_size > pheader->size) {
9193 cm_msg(MERROR, "bm_wait_for_free_space",
9194 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer points to an invalid event: data_size %d, event size %d, total_size %d",
9195 pheader->name,
9196 pheader->read_pointer,
9197 pheader->write_pointer,
9198 pheader->size,
9199 free,
9201 pevent->data_size,
9202 event_size,
9203 total_size);
9204 return BM_CORRUPTED;
9205 }
9206
9207 int blocking_client = -1;
9208
9209 int i;
9210 for (i = 0; i < pheader->max_client_index; i++) {
9211 BUFFER_CLIENT *pc = pheader->client + i;
9212 if (pc->pid) {
9213 if (pc->read_pointer == pheader->read_pointer) {
9214 /*
9215 First assume that the client with the "minimum" read pointer
9216 is not really blocking due to a GET_ALL request.
9217 */
9219 //int blocking_request_id = -1;
9220
9221 int j;
9222 for (j = 0; j < pc->max_request_index; j++) {
9223 const EVENT_REQUEST *prequest = pc->event_request + j;
9224 if (prequest->valid
9225 && bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
9226 if (prequest->sampling_type & GET_ALL) {
9227 blocking = TRUE;
9228 //blocking_request_id = prequest->id;
9229 break;
9230 }
9231 }
9232 }
9233
9234 //printf("client [%s] blocking %d, request %d\n", pc->name, blocking, blocking_request_id);
9235
9236 if (blocking) {
9238 break;
9239 }
9240
9241 pc->read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
9242 }
9243 }
9244 } /* client loop */
9245
9246 if (blocking_client >= 0) {
9249 //if (!blocking_time) {
9250 // blocking_time = ss_millitime();
9251 //}
9252
9253 //printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d, must wait for more space!\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec);
9254
9255 // from this "break" we go into timeout check and sleep/wait.
9256 break;
9257 }
9258
9259 /* no blocking clients. move the read pointer and again check for free space */
9260
9261 BOOL moved = bm_update_read_pointer_locked("bm_wait_for_free_space", pheader);
9262
9263 if (!moved) {
9264 cm_msg(MERROR, "bm_wait_for_free_space",
9265 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer did not move as expected",
9266 pheader->name,
9267 pheader->read_pointer,
9268 pheader->write_pointer,
9269 pheader->size,
9270 free,
9272 return BM_CORRUPTED;
9273 }
9274
9275 /* we freed one event, loop back to the check for free space */
9276 }
9277
9278 //blocking_loops++;
9279
9280 /* at least one client is blocking */
9281
9283 pc->write_wait = requested_space;
9284
9285 if (pbuf->wait_start_time == 0) {
9286 pbuf->wait_start_time = ss_millitime();
9287 pbuf->count_write_wait++;
9288 if (requested_space > pbuf->max_requested_space)
9289 pbuf->max_requested_space = requested_space;
9290 pbuf->wait_client_index = blocking_client_index;
9291 }
9292
9294
9295 //printf("bm_wait_for_free_space: start 0x%08x, now 0x%08x, end 0x%08x, timeout %d, wait %d\n", time_start, now, time_end, timeout_msec, time_end - now);
9296
9297 int sleep_time_msec = 1000;
9298
9299 if (timeout_msec == BM_WAIT) {
9300 // wait forever
9301 } else if (timeout_msec == BM_NO_WAIT) {
9302 // no wait
9303 return BM_ASYNC_RETURN;
9304 } else {
9305 // check timeout
9306 if (now >= time_end) {
9307 // timeout!
9308 return BM_ASYNC_RETURN;
9309 }
9310
9312
9313 if (sleep_time_msec <= 0) {
9314 sleep_time_msec = 10;
9315 } else if (sleep_time_msec > 1000) {
9316 sleep_time_msec = 1000;
9317 }
9318 }
9319
9321
9322 /* before waiting, unlock everything in the correct order */
9323
9324 pbuf_guard.unlock();
9325
9327 pbuf->write_cache_mutex.unlock();
9328
9329 //printf("bm_wait_for_free_space: blocking client \"%s\"\n", blocking_client_name);
9330
9331#ifdef DEBUG_MSG
9332 cm_msg(MDEBUG, "Send sleep: rp=%d, wp=%d, level=%1.1lf", pheader->read_pointer, pheader->write_pointer, 100 - 100.0 * size / pheader->size);
9333#endif
9334
9336 //int idx = bm_validate_client_index_locked(pbuf, FALSE);
9337 //if (idx >= 0)
9338 // pheader->client[idx].write_wait = requested_space;
9339
9340 //bm_cleanup("bm_wait_for_free_space", ss_millitime(), FALSE);
9341
9343
9344 /* we are told to shutdown */
9345 if (status == SS_ABORT) {
9346 // NB: buffer is locked!
9347 return SS_ABORT;
9348 }
9349
9350 /* make sure we do sleep in this loop:
9351 * if we are the mserver receiving data on the event
9352 * socket and the data buffer is full, ss_suspend() will
9353 * never sleep: it will detect data on the event channel,
9354 * call rpc_server_receive() (recursively, we already *are* in
9355 * rpc_server_receive()) and return without sleeping. Result
9356 * is a busy loop waiting for free space in data buffer */
9357
9358 /* update May 2021: ss_suspend(MSG_BM) no longer looks at
9359 * the event socket, and should sleep now, so this sleep below
9360 * maybe is not needed now. but for safety, I keep it. K.O. */
9361
9362 if (status != SS_TIMEOUT) {
9363 //printf("ss_suspend: status %d\n", status);
9364 ss_sleep(1);
9365 }
9366
9367 /* we may be stuck in this loop for an arbitrary long time,
9368 * depending on how other buffer clients read the accumulated data
9369 * so we should update all the timeouts & etc. K.O. */
9370
9372
9373 /* lock things again in the correct order */
9374
9375 if (unlock_write_cache) {
9377
9378 if (status != BM_SUCCESS) {
9379 // bail out with all locks released
9380 return status;
9381 }
9382 }
9383
9384 if (!pbuf_guard.relock()) {
9385 if (unlock_write_cache) {
9386 pbuf->write_cache_mutex.unlock();
9387 }
9388
9389 // bail out with all locks released
9390 return pbuf_guard.get_status();
9391 }
9392
9393 /* revalidate the client index: we could have been removed from the buffer while sleeping */
9395
9396 pc->write_wait = 0;
9397
9399 //idx = bm_validate_client_index_locked(pbuf, FALSE);
9400 //if (idx >= 0)
9401 // pheader->client[idx].write_wait = 0;
9402 //else {
9403 // cm_msg(MERROR, "bm_wait_for_free_space", "our client index is no longer valid, exiting...");
9404 // status = SS_ABORT;
9405 //}
9406
9407#ifdef DEBUG_MSG
9408 cm_msg(MDEBUG, "Send woke up: rp=%d, wp=%d, level=%1.1lf", pheader->read_pointer, pheader->write_pointer, 100 - 100.0 * size / pheader->size);
9409#endif
9410
9411 }
9412}
static BOOL bm_update_read_pointer_locked(const char *caller_name, BUFFER_HEADER *pheader)
Definition midas.cxx:8742
INT cm_periodic_tasks()
Definition midas.cxx:5601
#define SS_TIMEOUT
Definition midas.h:674
#define MDEBUG
Definition midas.h:561
#define MSG_BM
Definition msystem.h:302
INT ss_suspend(INT millisec, INT msg)
Definition system.cxx:4615
INT ss_sleep(INT millisec)
Definition system.cxx:3700
char name[NAME_LENGTH]
Definition midas.h:935
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wait_for_more_events_locked()

static int bm_wait_for_more_events_locked ( bm_lock_buffer_guard & pbuf_guard,
BUFFER_CLIENT * pc,
int  timeout_msec,
BOOL  unlock_read_cache 
)
static

Definition at line 9414 of file midas.cxx.

9415{
9416 BUFFER* pbuf = pbuf_guard.get_pbuf();
9417 BUFFER_HEADER* pheader = pbuf->buffer_header;
9418
9419 //printf("bm_wait_for_more_events_locked: [%s] timeout %d\n", pheader->name, timeout_msec);
9420
9421 if (pc->read_pointer != pheader->write_pointer) {
9422 // buffer has data
9423 return BM_SUCCESS;
9424 }
9425
9426 if (timeout_msec == BM_NO_WAIT) {
9427 /* event buffer is empty and we are told to not wait */
9428 if (!pc->read_wait) {
9429 //printf("bm_wait_for_more_events: buffer [%s] client [%s] set read_wait in BM_NO_WAIT!\n", pheader->name, pc->name);
9430 pc->read_wait = TRUE;
9431 }
9432 return BM_ASYNC_RETURN;
9433 }
9434
9437 DWORD sleep_time = 1000;
9438 if (timeout_msec == BM_NO_WAIT) {
9439 // default sleep time
9440 } else if (timeout_msec == BM_WAIT) {
9441 // default sleep time
9442 } else {
9445 }
9446
9447 //printf("time start 0x%08x, end 0x%08x, sleep %d\n", time_start, time_wait, sleep_time);
9448
9449 while (pc->read_pointer == pheader->write_pointer) {
9450 /* wait until there is data in the buffer (write pointer moves) */
9451
9452 if (!pc->read_wait) {
9453 //printf("bm_wait_for_more_events: buffer [%s] client [%s] set read_wait!\n", pheader->name, pc->name);
9454 pc->read_wait = TRUE;
9455 }
9456
9457 pc->last_activity = ss_millitime();
9458
9460
9461 // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
9462
9463 pbuf_guard.unlock();
9464
9466 pbuf->read_cache_mutex.unlock();
9467
9469
9470 if (timeout_msec == BM_NO_WAIT) {
9471 // return immediately
9472 } else if (timeout_msec == BM_WAIT) {
9473 // wait forever
9474 } else {
9476 //printf("check timeout: now 0x%08x, end 0x%08x, diff %d\n", now, time_wait, time_wait - now);
9477 if (now >= time_wait) {
9478 timeout_msec = BM_NO_WAIT; // cause immediate return
9479 } else {
9481 if (sleep_time > 1000)
9482 sleep_time = 1000;
9483 //printf("time start 0x%08x, now 0x%08x, end 0x%08x, sleep %d\n", time_start, now, time_wait, sleep_time);
9484 }
9485 }
9486
9487 // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
9488
9489 if (unlock_read_cache) {
9491 if (status != BM_SUCCESS) {
9492 // bail out with all locks released
9493 return status;
9494 }
9495 }
9496
9497 if (!pbuf_guard.relock()) {
9498 if (unlock_read_cache) {
9499 pbuf->read_cache_mutex.unlock();
9500 }
9501 // bail out with all locks released
9502 return pbuf_guard.get_status();
9503 }
9504
9505 /* need to revalidate our BUFFER_CLIENT after releasing the buffer lock
9506 * because we may have been removed from the buffer by bm_cleanup() & co
9507 * due to a timeout or whatever. */
9509
9510 /* return if TCP connection broken */
9511 if (status == SS_ABORT)
9512 return SS_ABORT;
9513
9514 if (timeout_msec == BM_NO_WAIT)
9515 return BM_ASYNC_RETURN;
9516 }
9517
9518 if (pc->read_wait) {
9519 //printf("bm_wait_for_more_events: buffer [%s] client [%s] clear read_wait!\n", pheader->name, pc->name);
9520 pc->read_wait = FALSE;
9521 }
9522
9523 return BM_SUCCESS;
9524}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wakeup_producers_locked()

static void bm_wakeup_producers_locked ( const BUFFER_HEADER * pheader,
const BUFFER_CLIENT * pc 
)
static

Definition at line 8809 of file midas.cxx.

8809 {
8810 int i;
8811 int have_get_all_requests = 0;
8812
8813 for (i = 0; i < pc->max_request_index; i++)
8814 if (pc->event_request[i].valid)
8815 have_get_all_requests |= (pc->event_request[i].sampling_type == GET_ALL);
8816
8817 /* only GET_ALL requests actually free space in the event buffer */
8819 return;
8820
8821 /*
8822 If read pointer has been changed, it may have freed up some space
8823 for waiting producers. So check if free space is now more than 50%
8824 of the buffer size and wake waiting producers.
8825 */
8826
8827 int free_space = pc->read_pointer - pheader->write_pointer;
8828 if (free_space <= 0)
8829 free_space += pheader->size;
8830
8831 if (free_space >= pheader->size * 0.5) {
8832 for (i = 0; i < pheader->max_client_index; i++) {
8833 const BUFFER_CLIENT *pc = pheader->client + i;
8834 if (pc->pid && pc->write_wait) {
8835 BOOL send_wakeup = (pc->write_wait < free_space);
8836 //printf("bm_wakeup_producers: buffer [%s] client [%s] write_wait %d, free_space %d, sending wakeup message %d\n", pheader->name, pc->name, pc->write_wait, free_space, send_wakeup);
8837 if (send_wakeup) {
8838 ss_resume(pc->port, "B ");
8839 }
8840 }
8841 }
8842 }
8843}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_buffer_statistics_to_odb()

static void bm_write_buffer_statistics_to_odb ( HNDLE  hDB,
BUFFER * pbuf,
BOOL  force 
)
static

Definition at line 6608 of file midas.cxx.

6609{
6610 //printf("bm_buffer_write_statistics_to_odb: buffer [%s] client [%s], lock count %d -> %d, force %d\n", pbuf->buffer_name, pbuf->client_name, pbuf->last_count_lock, pbuf->count_lock, force);
6611
6613
6614 if (!pbuf_guard.is_locked())
6615 return;
6616
6617 if (!force) {
6618 if (pbuf->count_lock == pbuf->last_count_lock) {
6619 return;
6620 }
6621 }
6622
6623 std::string buffer_name = pbuf->buffer_name;
6624 std::string client_name = pbuf->client_name;
6625
6626 if ((strlen(buffer_name.c_str()) < 1) || (strlen(client_name.c_str()) < 1)) {
6627 // do not call cm_msg() while holding buffer lock, if we are SYSMSG, we will deadlock. K.O.
6628 pbuf_guard.unlock(); // unlock before cm_msg()
6629 cm_msg(MERROR, "bm_write_buffer_statistics_to_odb", "Invalid empty buffer name \"%s\" or client name \"%s\"", buffer_name.c_str(), client_name.c_str());
6630 return;
6631 }
6632
6633 pbuf->last_count_lock = pbuf->count_lock;
6634
6636 BUFFER_HEADER xheader = *pbuf->buffer_header;
6637 int client_index = pbuf->client_index;
6638
6639 pbuf_guard.unlock();
6640
6641 bm_write_buffer_statistics_to_odb_copy(hDB, buffer_name.c_str(), client_name.c_str(), client_index, &xbuf, &xheader);
6642}
static void bm_write_buffer_statistics_to_odb_copy(HNDLE hDB, const char *buffer_name, const char *client_name, int client_index, BUFFER_INFO *pbuf, BUFFER_HEADER *pheader)
Definition midas.cxx:6486
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_buffer_statistics_to_odb_copy()

static void bm_write_buffer_statistics_to_odb_copy ( HNDLE  hDB,
const char * buffer_name,
const char * client_name,
int  client_index,
BUFFER_INFO * pbuf,
BUFFER_HEADER * pheader 
)
static

Definition at line 6486 of file midas.cxx.

6487{
6488 int status;
6489
6491
6492 HNDLE hKey;
6493 status = db_find_key(hDB, 0, "/System/Buffers", &hKey);
6494 if (status != DB_SUCCESS) {
6495 db_create_key(hDB, 0, "/System/Buffers", TID_KEY);
6496 status = db_find_key(hDB, 0, "/System/Buffers", &hKey);
6497 if (status != DB_SUCCESS)
6498 return;
6499 }
6500
6503 if (status != DB_SUCCESS) {
6506 if (status != DB_SUCCESS)
6507 return;
6508 }
6509
6510 double buf_size = pheader->size;
6511 double buf_rptr = pheader->read_pointer;
6512 double buf_wptr = pheader->write_pointer;
6513
6514 double buf_fill = 0;
6515 double buf_cptr = 0;
6516 double buf_cused = 0;
6517 double buf_cused_pct = 0;
6518
6519 if (client_index >= 0 && client_index <= pheader->max_client_index) {
6520 buf_cptr = pheader->client[client_index].read_pointer;
6521
6522 if (buf_wptr == buf_cptr) {
6523 buf_cused = 0;
6524 } else if (buf_wptr > buf_cptr) {
6526 } else {
6527 buf_cused = (buf_size - buf_cptr) + buf_wptr;
6528 }
6529
6530 buf_cused_pct = buf_cused / buf_size * 100.0;
6531
6532 // we cannot write buf_cused and buf_cused_pct into the buffer statistics
6533 // because some other GET_ALL client may have different buf_cused & etc,
6534 // so they must be written into the per-client statistics
6535 // and the web page should look at all the GET_ALL clients and used
6536 // the biggest buf_cused as the whole-buffer "bytes used" value.
6537 }
6538
6539 if (buf_wptr == buf_rptr) {
6540 buf_fill = 0;
6541 } else if (buf_wptr > buf_rptr) {
6543 } else {
6544 buf_fill = (buf_size - buf_rptr) + buf_wptr;
6545 }
6546
6547 double buf_fill_pct = buf_fill / buf_size * 100.0;
6548
6549 db_set_value(hDB, hKeyBuffer, "Size", &buf_size, sizeof(double), 1, TID_DOUBLE);
6550 db_set_value(hDB, hKeyBuffer, "Write pointer", &buf_wptr, sizeof(double), 1, TID_DOUBLE);
6551 db_set_value(hDB, hKeyBuffer, "Read pointer", &buf_rptr, sizeof(double), 1, TID_DOUBLE);
6552 db_set_value(hDB, hKeyBuffer, "Filled", &buf_fill, sizeof(double), 1, TID_DOUBLE);
6553 db_set_value(hDB, hKeyBuffer, "Filled pct", &buf_fill_pct, sizeof(double), 1, TID_DOUBLE);
6554
6555 status = db_find_key(hDB, hKeyBuffer, "Clients", &hKey);
6556 if (status != DB_SUCCESS) {
6557 db_create_key(hDB, hKeyBuffer, "Clients", TID_KEY);
6558 status = db_find_key(hDB, hKeyBuffer, "Clients", &hKey);
6559 if (status != DB_SUCCESS)
6560 return;
6561 }
6562
6564 status = db_find_key(hDB, hKey, client_name, &hKeyClient);
6565 if (status != DB_SUCCESS) {
6566 db_create_key(hDB, hKey, client_name, TID_KEY);
6567 status = db_find_key(hDB, hKey, client_name, &hKeyClient);
6568 if (status != DB_SUCCESS)
6569 return;
6570 }
6571
6572 db_set_value(hDB, hKeyClient, "count_lock", &pbuf->count_lock, sizeof(int), 1, TID_INT32);
6573 db_set_value(hDB, hKeyClient, "count_sent", &pbuf->count_sent, sizeof(int), 1, TID_INT32);
6574 db_set_value(hDB, hKeyClient, "bytes_sent", &pbuf->bytes_sent, sizeof(double), 1, TID_DOUBLE);
6575 db_set_value(hDB, hKeyClient, "count_write_wait", &pbuf->count_write_wait, sizeof(int), 1, TID_INT32);
6576 db_set_value(hDB, hKeyClient, "time_write_wait", &pbuf->time_write_wait, sizeof(DWORD), 1, TID_UINT32);
6577 db_set_value(hDB, hKeyClient, "max_bytes_write_wait", &pbuf->max_requested_space, sizeof(INT), 1, TID_INT32);
6578 db_set_value(hDB, hKeyClient, "count_read", &pbuf->count_read, sizeof(int), 1, TID_INT32);
6579 db_set_value(hDB, hKeyClient, "bytes_read", &pbuf->bytes_read, sizeof(double), 1, TID_DOUBLE);
6580 db_set_value(hDB, hKeyClient, "get_all_flag", &pbuf->get_all_flag, sizeof(BOOL), 1, TID_BOOL);
6581 db_set_value(hDB, hKeyClient, "read_pointer", &buf_cptr, sizeof(double), 1, TID_DOUBLE);
6582 db_set_value(hDB, hKeyClient, "bytes_used", &buf_cused, sizeof(double), 1, TID_DOUBLE);
6583 db_set_value(hDB, hKeyClient, "pct_used", &buf_cused_pct, sizeof(double), 1, TID_DOUBLE);
6584
6585 for (int i = 0; i < MAX_CLIENTS; i++) {
6586 if (!pbuf->client_count_write_wait[i])
6587 continue;
6588
6589 if (pheader->client[i].pid == 0)
6590 continue;
6591
6592 if (pheader->client[i].name[0] == 0)
6593 continue;
6594
6595 char str[100 + NAME_LENGTH];
6596
6597 sprintf(str, "writes_blocked_by/%s/count_write_wait", pheader->client[i].name);
6598 db_set_value(hDB, hKeyClient, str, &pbuf->client_count_write_wait[i], sizeof(int), 1, TID_INT32);
6599
6600 sprintf(str, "writes_blocked_by/%s/time_write_wait", pheader->client[i].name);
6601 db_set_value(hDB, hKeyClient, str, &pbuf->client_time_write_wait[i], sizeof(DWORD), 1, TID_UINT32);
6602 }
6603
6604 db_set_value(hDB, hKeyBuffer, "Last updated", &now, sizeof(DWORD), 1, TID_UINT32);
6605 db_set_value(hDB, hKeyClient, "last_updated", &now, sizeof(DWORD), 1, TID_UINT32);
6606}
#define TID_DOUBLE
Definition midas.h:343
#define TID_KEY
Definition midas.h:349
#define TID_BOOL
Definition midas.h:340
#define TID_INT32
Definition midas.h:339
INT db_create_key(HNDLE hDB, HNDLE hKey, const char *key_name, DWORD type)
Definition odb.cxx:3313
INT db_set_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, const void *data, INT data_size, INT num_values, DWORD type)
Definition odb.cxx:5266
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_statistics_to_odb()

INT bm_write_statistics_to_odb ( void  )

Write statistics of all open buffers to the ODB

Returns
BM_SUCCESS

Definition at line 7302 of file midas.cxx.

7302 {
7303#ifdef LOCAL_ROUTINES
7304 {
7305 int status;
7306 HNDLE hDB;
7307
7309
7310 if (status != CM_SUCCESS) {
7311 //printf("bm_write_statistics_to_odb: cannot get ODB handle!\n");
7312 return BM_SUCCESS;
7313 }
7314
7315 std::vector<BUFFER*> mybuffers;
7316
7317 gBuffersMutex.lock();
7319 gBuffersMutex.unlock();
7320
7321 for (BUFFER* pbuf : mybuffers) {
7322 if (!pbuf || !pbuf->attached)
7323 continue;
7325 }
7326 }
7327#endif /* LOCAL_ROUTINES */
7328
7329 return BM_SUCCESS;
7330}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_to_buffer_locked()

static void bm_write_to_buffer_locked ( BUFFER_HEADER * pheader,
int  sg_n,
const char *const  sg_ptr[],
const size_t  sg_len[],
size_t  total_size 
)
static

Definition at line 9526 of file midas.cxx.

9527{
9528 char *pdata = (char *) (pheader + 1);
9529
9530 //int old_write_pointer = pheader->write_pointer;
9531
9532 /* new event fits into the remaining space? */
9533 if ((size_t)pheader->write_pointer + total_size <= (size_t)pheader->size) {
9534 //memcpy(pdata + pheader->write_pointer, pevent, event_size);
9535 char* wptr = pdata + pheader->write_pointer;
9536 for (int i=0; i<sg_n; i++) {
9537 //printf("memcpy %p+%d\n", sg_ptr[i], (int)sg_len[i]);
9538 memcpy(wptr, sg_ptr[i], sg_len[i]);
9539 wptr += sg_len[i];
9540 }
9541 pheader->write_pointer = pheader->write_pointer + total_size;
9542 assert(pheader->write_pointer <= pheader->size);
9543 /* remaining space is smaller than size of an event header? */
9544 if ((pheader->write_pointer + (int) sizeof(EVENT_HEADER)) > pheader->size) {
9545 // note: ">" here to match "bm_incr_rp". If remaining space is exactly
9546 // equal to the event header size, we will write the next event header here,
9547 // then wrap the pointer and write the event data at the beginning of the buffer.
9548 //printf("bm_write_to_buffer_locked: truncate wp %d. buffer size %d, remaining %d, event header size %d, event size %d, total size %d\n", pheader->write_pointer, pheader->size, pheader->size-pheader->write_pointer, (int)sizeof(EVENT_HEADER), event_size, total_size);
9549 pheader->write_pointer = 0;
9550 }
9551 } else {
9552 /* split event */
9553 size_t size = pheader->size - pheader->write_pointer;
9554
9555 //printf("split: wp %d, size %d, avail %d\n", pheader->write_pointer, pheader->size, size);
9556
9557 //memcpy(pdata + pheader->write_pointer, pevent, size);
9558 //memcpy(pdata, ((const char *) pevent) + size, event_size - size);
9559
9560 char* wptr = pdata + pheader->write_pointer;
9561 size_t count = 0;
9562
9563 // copy first part
9564
9565 int i = 0;
9566 for (; i<sg_n; i++) {
9567 if (count + sg_len[i] > size)
9568 break;
9569 memcpy(wptr, sg_ptr[i], sg_len[i]);
9570 wptr += sg_len[i];
9571 count += sg_len[i];
9572 }
9573
9574 //printf("wptr %d, count %d\n", wptr-pdata, count);
9575
9576 // split segment
9577
9578 size_t first = size - count;
9579 size_t second = sg_len[i] - first;
9580 assert(first + second == sg_len[i]);
9581 assert(count + first == size);
9582
9583 //printf("first %d, second %d\n", first, second);
9584
9585 memcpy(wptr, sg_ptr[i], first);
9586 wptr = pdata + 0;
9587 count += first;
9589 wptr += second;
9590 count += second;
9591 i++;
9592
9593 // copy remaining
9594
9595 for (; i<sg_n; i++) {
9596 memcpy(wptr, sg_ptr[i], sg_len[i]);
9597 wptr += sg_len[i];
9598 count += sg_len[i];
9599 }
9600
9601 //printf("wptr %d, count %d\n", wptr-pdata, count);
9602
9603 //printf("bm_write_to_buffer_locked: wrap wp %d -> %d. buffer size %d, available %d, wrote %d, remaining %d, event size %d, total size %d\n", pheader->write_pointer, total_size-size, pheader->size, pheader->size-pheader->write_pointer, size, pheader->size - (pheader->write_pointer+size), event_size, total_size);
9604
9605 pheader->write_pointer = total_size - size;
9606 }
9607
9608 //printf("bm_write_to_buffer_locked: buf [%s] size %d, wrote %d/%d, wp %d -> %d\n", pheader->name, pheader->size, event_size, total_size, old_write_pointer, pheader->write_pointer);
9609}
Here is the call graph for this function:
Here is the caller graph for this function:

Variable Documentation

◆ _bm_lock_timeout

int _bm_lock_timeout = 5 * 60 * 1000
static

Definition at line 5941 of file midas.cxx.

◆ _bm_max_event_size

DWORD _bm_max_event_size = 0
static

Definition at line 5936 of file midas.cxx.

◆ _bm_mutex_timeout_sec

double _bm_mutex_timeout_sec = _bm_lock_timeout/1000 + 15.000
static

Definition at line 5942 of file midas.cxx.

◆ defrag_buffer

EVENT_DEFRAG_BUFFER defrag_buffer[MAX_DEFRAG_EVENTS]
static

Definition at line 11303 of file midas.cxx.