MIDAS
Loading...
Searching...
No Matches
Event Buffer Functions (bm_xxx)

Classes

struct  BUFFER_INFO
 
struct  EVENT_DEFRAG_BUFFER
 

Macros

#define MAX_DEFRAG_EVENTS   10
 

Functions

static int bm_validate_client_index_locked (bm_lock_buffer_guard &pbuf_guard)
 
INT bm_match_event (short int event_id, short int trigger_mask, const EVENT_HEADER *pevent)
 
void bm_remove_client_locked (BUFFER_HEADER *pheader, int j)
 
static void bm_cleanup_buffer_locked (BUFFER *pbuf, const char *who, DWORD actual_time)
 
static void bm_update_last_activity (DWORD millitime)
 
static BOOL bm_validate_rp (const char *who, const BUFFER_HEADER *pheader, int rp)
 
static int bm_incr_rp_no_check (const BUFFER_HEADER *pheader, int rp, int total_size)
 
static int bm_next_rp (const char *who, const BUFFER_HEADER *pheader, const char *pdata, int rp)
 
static int bm_validate_buffer_locked (const BUFFER *pbuf)
 
static void bm_reset_buffer_locked (BUFFER *pbuf)
 
static void bm_clear_buffer_statistics (HNDLE hDB, BUFFER *pbuf)
 
static void bm_write_buffer_statistics_to_odb_copy (HNDLE hDB, const char *buffer_name, const char *client_name, int client_index, BUFFER_INFO *pbuf, BUFFER_HEADER *pheader)
 
static void bm_write_buffer_statistics_to_odb (HNDLE hDB, BUFFER *pbuf, BOOL force)
 
INT bm_open_buffer (const char *buffer_name, INT buffer_size, INT *buffer_handle)
 
INT bm_get_buffer_handle (const char *buffer_name, INT *buffer_handle)
 
INT bm_close_buffer (INT buffer_handle)
 
INT bm_close_all_buffers (void)
 
INT bm_write_statistics_to_odb (void)
 
INT bm_set_cache_size (INT buffer_handle, size_t read_size, size_t write_size)
 
INT bm_compose_event (EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD serial)
 
INT bm_compose_event_threadsafe (EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD *serial)
 
INT bm_add_event_request (INT buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, EVENT_HANDLER *func, INT request_id)
 
INT bm_request_event (HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
 
INT bm_remove_event_request (INT buffer_handle, INT request_id)
 
INT bm_delete_request (INT request_id)
 
static void bm_validate_client_pointers_locked (const BUFFER_HEADER *pheader, BUFFER_CLIENT *pclient)
 
static BOOL bm_update_read_pointer_locked (const char *caller_name, BUFFER_HEADER *pheader)
 
static void bm_wakeup_producers_locked (const BUFFER_HEADER *pheader, const BUFFER_CLIENT *pc)
 
static void bm_dispatch_event (int buffer_handle, EVENT_HEADER *pevent)
 
static void bm_incr_read_cache_locked (BUFFER *pbuf, int total_size)
 
static BOOL bm_peek_read_cache_locked (BUFFER *pbuf, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
 
static int bm_peek_buffer_locked (BUFFER *pbuf, BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
 
static void bm_read_from_buffer_locked (const BUFFER_HEADER *pheader, int rp, char *buf, int event_size)
 
static void bm_read_from_buffer_locked (const BUFFER_HEADER *pheader, int rp, std::vector< char > *vecptr, int event_size)
 
static BOOL bm_check_requests (const BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
 
static int bm_wait_for_more_events_locked (bm_lock_buffer_guard &pbuf_guard, BUFFER_CLIENT *pc, int timeout_msec, BOOL unlock_read_cache)
 
static int bm_fill_read_cache_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
 
static void bm_convert_event_header (EVENT_HEADER *pevent, int convert_flags)
 
static int bm_wait_for_free_space_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec, int requested_space, bool unlock_write_cache)
 
static void bm_write_to_buffer_locked (BUFFER_HEADER *pheader, int sg_n, const char *const sg_ptr[], const size_t sg_len[], size_t total_size)
 
static int bm_find_first_request_locked (BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
 
static void bm_notify_reader_locked (BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, int old_write_pointer, int request_id)
 
INT bm_send_event (INT buffer_handle, const EVENT_HEADER *pevent, int unused, int timeout_msec)
 
int bm_send_event_vec (int buffer_handle, const std::vector< char > &event, int timeout_msec)
 
int bm_send_event_vec (int buffer_handle, const std::vector< std::vector< char > > &event, int timeout_msec)
 
static INT bm_flush_cache_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
 
int bm_send_event_sg (int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
 
static int bm_flush_cache_rpc (int buffer_handle, int timeout_msec)
 
INT bm_flush_cache (int buffer_handle, int timeout_msec)
 
static INT bm_read_buffer (BUFFER *pbuf, INT buffer_handle, void **bufptr, void *buf, INT *buf_size, std::vector< char > *vecptr, int timeout_msec, int convert_flags, BOOL dispatch)
 
static INT bm_receive_event_rpc (INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
 
INT bm_receive_event (INT buffer_handle, void *destination, INT *buf_size, int timeout_msec)
 
INT bm_receive_event_alloc (INT buffer_handle, EVENT_HEADER **ppevent, int timeout_msec)
 
INT bm_receive_event_vec (INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
 
static int bm_skip_event (BUFFER *pbuf)
 
INT bm_skip_event (INT buffer_handle)
 
static INT bm_push_buffer (BUFFER *pbuf, int buffer_handle)
 
INT bm_check_buffers ()
 
INT bm_poll_event ()
 
INT bm_empty_buffers ()
 

Variables

static DWORD _bm_max_event_size = 0
 
static int _bm_lock_timeout = 5 * 60 * 1000
 
static double _bm_mutex_timeout_sec = _bm_lock_timeout/1000 + 15.000
 
static EVENT_DEFRAG_BUFFER defrag_buffer [MAX_DEFRAG_EVENTS]
 

Detailed Description

dox dox


Macro Definition Documentation

◆ MAX_DEFRAG_EVENTS

#define MAX_DEFRAG_EVENTS   10

dox

Definition at line 11280 of file midas.cxx.

Function Documentation

◆ bm_add_event_request()

INT bm_add_event_request ( INT  buffer_handle,
short int  event_id,
short int  trigger_mask,
INT  sampling_type,
EVENT_HANDLER *func,
INT  request_id 
)

dox

Definition at line 8322 of file midas.cxx.

8366{
8367 if (rpc_is_remote())
8368 return rpc_call(RPC_BM_ADD_EVENT_REQUEST, buffer_handle, event_id,
8369 trigger_mask, sampling_type, (INT) (POINTER_T) func, request_id);
8370
8371#ifdef LOCAL_ROUTINES
8372 {
8373 int status = 0;
8374
8375 BUFFER *pbuf = bm_get_buffer("bm_add_event_request", buffer_handle, &status);
8376
8377 if (!pbuf)
8378 return status;
8379
8380 /* lock buffer */
8382
8383 if (!pbuf_guard.is_locked())
8384 return pbuf_guard.get_status();
8385
8386 /* avoid callback/non callback requests */
8387 if (func == NULL && pbuf->callback) {
8388 pbuf_guard.unlock(); // unlock before cm_msg()
8389 cm_msg(MERROR, "bm_add_event_request", "mixing callback/non callback requests not possible");
8390 return BM_INVALID_MIXING;
8391 }
8392
8393 /* do not allow GET_RECENT with nonzero cache size */
8394 if (sampling_type == GET_RECENT && pbuf->read_cache_size > 0) {
8395 pbuf_guard.unlock(); // unlock before cm_msg()
8396 cm_msg(MERROR, "bm_add_event_request", "GET_RECENT request not possible if read cache is enabled");
8397 return BM_INVALID_PARAM;
8398 }
8399
8400 /* get a pointer to the proper client structure */
8402
8403 /* look for a empty request entry */
8404 int i;
8405 for (i = 0; i < MAX_EVENT_REQUESTS; i++)
8406 if (!pclient->event_request[i].valid)
8407 break;
8408
8409 if (i == MAX_EVENT_REQUESTS) {
8410 // implicit unlock
8411 return BM_NO_MEMORY;
8412 }
8413
8414 /* setup event_request structure */
8415 pclient->event_request[i].id = request_id;
8416 pclient->event_request[i].valid = TRUE;
8417 pclient->event_request[i].event_id = event_id;
8418 pclient->event_request[i].trigger_mask = trigger_mask;
8419 pclient->event_request[i].sampling_type = sampling_type;
8420
8421 pclient->all_flag = pclient->all_flag || (sampling_type & GET_ALL);
8422
8423 pbuf->get_all_flag = pclient->all_flag;
8424
8425 /* set callback flag in buffer structure */
8426 if (func != NULL)
8427 pbuf->callback = TRUE;
8428
8429 /*
8430 Save the index of the last request in the list so that later only the
8431 requests 0..max_request_index-1 have to be searched through.
8432 */
8433
8434 if (i + 1 > pclient->max_request_index)
8435 pclient->max_request_index = i + 1;
8436 }
8437#endif /* LOCAL_ROUTINES */
8438
8439 return BM_SUCCESS;
8440}
static BUFFER_CLIENT * bm_get_my_client_locked(bm_lock_buffer_guard &pbuf_guard)
Definition midas.cxx:6007
static BUFFER * bm_get_buffer(const char *who, INT buffer_handle, int *pstatus)
Definition midas.cxx:6630
#define BM_INVALID_PARAM
Definition midas.h:619
#define BM_NO_MEMORY
Definition midas.h:607
#define BM_INVALID_MIXING
Definition midas.h:621
#define BM_SUCCESS
Definition midas.h:605
#define GET_ALL
Definition midas.h:321
#define GET_RECENT
Definition midas.h:323
#define MERROR
Definition midas.h:559
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
Definition midas.cxx:915
#define RPC_BM_ADD_EVENT_REQUEST
Definition mrpc.h:43
bool rpc_is_remote(void)
Definition midas.cxx:12769
INT rpc_call(DWORD routine_id,...)
Definition midas.cxx:13671
INT i
Definition mdump.cxx:32
int INT
Definition midas.h:129
#define TRUE
Definition midas.h:182
#define MAX_EVENT_REQUESTS
Definition midas.h:275
#define POINTER_T
Definition midas.h:166
#define trigger_mask
#define event_id
DWORD status
Definition odbhist.cxx:39
TH1X EXPRT * h1_book(const char *name, const char *title, int bins, double min, double max)
Definition rmidas.h:24
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_check_buffers()

INT bm_check_buffers ( void  )

Check if any requested event is waiting in a buffer

Returns
TRUE More events are waiting
FALSE No more events are waiting

Definition at line 10962 of file midas.cxx.

10962 {
10963#ifdef LOCAL_ROUTINES
10964 {
10965 INT status = 0;
10966 BOOL bMore;
10967 DWORD start_time;
10968 //static DWORD last_time = 0;
10969
10970 /* if running as a server, buffer checking is done by client
10971 via ASYNC bm_receive_event */
10972 if (rpc_is_mserver()) {
10973 return FALSE;
10974 }
10975
10976 bMore = FALSE;
10977 start_time = ss_millitime();
10978
10979 std::vector<BUFFER*> mybuffers;
10980
10981 gBuffersMutex.lock();
10983 gBuffersMutex.unlock();
10984
10985 /* go through all buffers */
10986 for (size_t idx = 0; idx < mybuffers.size(); idx++) {
10988
10989 if (!pbuf || !pbuf->attached)
10990 continue;
10991
10992 //int count_loops = 0;
10993 while (1) {
10994 if (pbuf->attached) {
10995 /* one bm_push_event could cause a run stop and a buffer close, which
10996 * would crash the next call to bm_push_event(). So check for valid
10997 * buffer on each call */
10998
10999 /* this is what happens:
11000 * bm_push_buffer() may call a user callback function
11001 * user callback function may indirectly call bm_close() of this buffer,
11002 * i.e. if it stops the run,
11003 * bm_close() will set pbuf->attached to false, but will not delete pbuf or touch gBuffers
11004 * here we will see pbuf->attched is false and quit this loop
11005 */
11006
11007 status = bm_push_buffer(pbuf, idx + 1);
11008
11009 if (status == BM_CORRUPTED) {
11010 return status;
11011 }
11012
11013 //printf("bm_check_buffers: bm_push_buffer() returned %d, loop %d, time %d\n", status, count_loops, ss_millitime() - start_time);
11014
11015 if (status != BM_MORE_EVENTS) {
11016 //DWORD t = ss_millitime() - start_time;
11017 //printf("bm_check_buffers: index %d, period %d, elapsed %d, loop %d, no more events\n", idx, start_time - last_time, t, count_loops);
11018 break;
11019 }
11020
11021 // count_loops++;
11022 }
11023
11024 // NB: this code has a logic error: if 2 buffers always have data,
11025 // this timeout will cause us to exit reading the 1st buffer
11026 // after 1000 msec, then we read the 2nd buffer exactly once,
11027 // and exit the loop because the timeout is still active -
11028 // we did not reset "start_time" when we started reading
11029 // from the 2nd buffer. Result is that we always read all
11030 // the data in a loop from the 1st buffer, but read just
11031 // one event from the 2nd buffer, resulting in severe unfairness.
11032
11033 /* stop after one second */
11034 DWORD t = ss_millitime() - start_time;
11035 if (t > 1000) {
11036 //printf("bm_check_buffers: index %d, period %d, elapsed %d, loop %d, timeout.\n", idx, start_time - last_time, t, count_loops);
11037 bMore = TRUE;
11038 break;
11039 }
11040 }
11041 }
11042
11043 //last_time = start_time;
11044
11045 return bMore;
11046
11047 }
11048#else /* LOCAL_ROUTINES */
11049
11050 return FALSE;
11051
11052#endif
11053}
#define FALSE
Definition cfortran.h:309
static INT bm_push_buffer(BUFFER *pbuf, int buffer_handle)
Definition midas.cxx:10910
#define BM_MORE_EVENTS
Definition midas.h:620
#define BM_CORRUPTED
Definition midas.h:623
unsigned int DWORD
Definition mcstd.h:51
DWORD ss_millitime()
Definition system.cxx:3393
bool rpc_is_mserver(void)
Definition midas.cxx:12826
static std::mutex gBuffersMutex
Definition midas.cxx:195
static std::vector< BUFFER * > gBuffers
Definition midas.cxx:196
DWORD BOOL
Definition midas.h:105
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_check_requests()

static BOOL bm_check_requests ( const BUFFER_CLIENT *pc,
const EVENT_HEADER *pevent 
)
static

Definition at line 8974 of file midas.cxx.

8974 {
8975
8977 int i;
8978 for (i = 0; i < pc->max_request_index; i++) {
8979 const EVENT_REQUEST *prequest = pc->event_request + i;
8980 if (prequest->valid) {
8981 if (bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
8982 /* check if this is a recent event */
8983 if (prequest->sampling_type == GET_RECENT) {
8984 if (ss_time() - pevent->time_stamp > 1) {
8985 /* skip that event */
8986 continue;
8987 }
8988 }
8989
8991 break;
8992 }
8993 }
8994 }
8995 return is_requested;
8996}
INT bm_match_event(short int event_id, short int trigger_mask, const EVENT_HEADER *pevent)
Definition midas.cxx:6023
DWORD ss_time()
Definition system.cxx:3462
DWORD time_stamp
Definition midas.h:855
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_cleanup_buffer_locked()

static void bm_cleanup_buffer_locked ( BUFFER *pbuf,
const char *who,
DWORD  actual_time 
)
static

Check all clients on buffer, remove invalid clients

Definition at line 6074 of file midas.cxx.

6074 {
6075 BUFFER_HEADER *pheader;
6077 int j;
6078
6079 pheader = pbuf->buffer_header;
6080 pbclient = pheader->client;
6081
6082 /* now check other clients */
6083 for (j = 0; j < pheader->max_client_index; j++, pbclient++) {
6084 if (pbclient->pid) {
6085 if (!ss_pid_exists(pbclient->pid)) {
6086 cm_msg(MINFO, "bm_cleanup",
6087 "Client \'%s\' on buffer \'%s\' removed by %s because process pid %d does not exist", pbclient->name,
6088 pheader->name, who, pbclient->pid);
6089
6090 bm_remove_client_locked(pheader, j);
6091 continue;
6092 }
6093 }
6094
6095 /* If client process has no activity, clear its buffer entry. */
6096 if (pbclient->pid && pbclient->watchdog_timeout > 0) {
6097 DWORD tdiff = actual_time - pbclient->last_activity;
6098#if 0
6099 printf("buffer [%s] client [%-32s] times 0x%08x 0x%08x, diff 0x%08x %5d, timeout %d\n",
6100 pheader->name,
6101 pbclient->name,
6102 pbclient->last_activity,
6104 tdiff,
6105 tdiff,
6106 pbclient->watchdog_timeout);
6107#endif
6108 if (actual_time > pbclient->last_activity &&
6109 tdiff > pbclient->watchdog_timeout) {
6110
6111 cm_msg(MINFO, "bm_cleanup", "Client \'%s\' on buffer \'%s\' removed by %s (idle %1.1lfs, timeout %1.0lfs)",
6112 pbclient->name, pheader->name, who,
6113 tdiff / 1000.0,
6114 pbclient->watchdog_timeout / 1000.0);
6115
6116 bm_remove_client_locked(pheader, j);
6117 }
6118 }
6119 }
6120}
void bm_remove_client_locked(BUFFER_HEADER *pheader, int j)
Definition midas.cxx:6043
#define MINFO
Definition midas.h:560
BOOL ss_pid_exists(int pid)
Definition system.cxx:1440
DWORD actual_time
Definition mfe.cxx:37
INT j
Definition odbhist.cxx:40
char name[NAME_LENGTH]
Definition midas.h:958
INT max_client_index
Definition midas.h:960
BUFFER_CLIENT client[MAX_CLIENTS]
Definition midas.h:967
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_clear_buffer_statistics()

static void bm_clear_buffer_statistics ( HNDLE  hDB,
BUFFER *pbuf 
)
static

Definition at line 6416 of file midas.cxx.

6416 {
6417 HNDLE hKey;
6418 int status;
6419
6420 char str[256 + 2 * NAME_LENGTH];
6421 sprintf(str, "/System/buffers/%s/Clients/%s/writes_blocked_by", pbuf->buffer_name, pbuf->client_name);
6422 //printf("delete [%s]\n", str);
6423 status = db_find_key(hDB, 0, str, &hKey);
6424 if (status == DB_SUCCESS) {
6426 }
6427}
#define DB_SUCCESS
Definition midas.h:631
INT db_delete_key(HNDLE hDB, HNDLE hKey, BOOL follow_links)
Definition odb.cxx:3856
INT db_find_key(HNDLE hDB, HNDLE hKey, const char *key_name, HNDLE *subhKey)
Definition odb.cxx:4079
HNDLE hKey
HNDLE hDB
main ODB handle
Definition mana.cxx:207
INT HNDLE
Definition midas.h:132
#define NAME_LENGTH
Definition midas.h:272
char str[256]
Definition odbhist.cxx:33
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_close_all_buffers()

INT bm_close_all_buffers ( void  )

Close all open buffers

Returns
BM_SUCCESS

Definition at line 7251 of file midas.cxx.

7251 {
7252 if (rpc_is_remote())
7254
7255#ifdef LOCAL_ROUTINES
7256 {
7258
7259 gBuffersMutex.lock();
7260 size_t nbuf = gBuffers.size();
7261 gBuffersMutex.unlock();
7262
7263 for (size_t i = nbuf; i > 0; i--) {
7265 }
7266
7267 gBuffersMutex.lock();
7268 for (size_t i=0; i< gBuffers.size(); i++) {
7269 BUFFER* pbuf = gBuffers[i];
7270 if (!pbuf)
7271 continue;
7272 delete pbuf;
7273 pbuf = NULL;
7274 gBuffers[i] = NULL;
7275 }
7276 gBuffersMutex.unlock();
7277 }
7278#endif /* LOCAL_ROUTINES */
7279
7280 return BM_SUCCESS;
7281}
INT bm_close_buffer(INT buffer_handle)
Definition midas.cxx:7104
int cm_msg_close_buffer(void)
Definition midas.cxx:487
#define RPC_BM_CLOSE_ALL_BUFFERS
Definition mrpc.h:38
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_close_buffer()

INT bm_close_buffer ( INT  buffer_handle)

Closes an event buffer previously opened with bm_open_buffer().

Parameters
buffer_handle — buffer handle
Returns
BM_SUCCESS, BM_INVALID_HANDLE

Definition at line 7104 of file midas.cxx.

7104 {
7105 //printf("bm_close_buffer: handle %d\n", buffer_handle);
7106
7107 if (rpc_is_remote())
7108 return rpc_call(RPC_BM_CLOSE_BUFFER, buffer_handle);
7109
7110#ifdef LOCAL_ROUTINES
7111 {
7112 int status = 0;
7113
7114 BUFFER *pbuf = bm_get_buffer(NULL, buffer_handle, &status);
7115
7116 if (!pbuf)
7117 return status;
7118
7119 //printf("bm_close_buffer: handle %d, name [%s]\n", buffer_handle, pheader->name);
7120
7121 int i;
7122
7123 { /* delete all requests for this buffer */
7124 _request_list_mutex.lock();
7125 std::vector<EventRequest> request_list_copy = _request_list;
7126 _request_list_mutex.unlock();
7127 for (size_t i = 0; i < request_list_copy.size(); i++) {
7128 if (request_list_copy[i].buffer_handle == buffer_handle) {
7130 }
7131 }
7132 }
7133
7134 HNDLE hDB;
7136
7137 if (hDB) {
7138 /* write statistics to odb */
7140 }
7141
7142 /* lock buffer in correct order */
7143
7145
7146 if (status != BM_SUCCESS) {
7147 return status;
7148 }
7149
7151
7152 if (status != BM_SUCCESS) {
7153 pbuf->read_cache_mutex.unlock();
7154 return status;
7155 }
7156
7158
7159 if (!pbuf_guard.is_locked()) {
7160 pbuf->write_cache_mutex.unlock();
7161 pbuf->read_cache_mutex.unlock();
7162 return pbuf_guard.get_status();
7163 }
7164
7165 BUFFER_HEADER *pheader = pbuf->buffer_header;
7166
7167 /* mark entry in _buffer as empty */
7168 pbuf->attached = false;
7169
7171
7172 if (pclient) {
7173 /* clear entry from client structure in buffer header */
7174 memset(pclient, 0, sizeof(BUFFER_CLIENT));
7175 }
7176
7177 /* calculate new max_client_index entry */
7178 for (i = MAX_CLIENTS - 1; i >= 0; i--)
7179 if (pheader->client[i].pid != 0)
7180 break;
7181 pheader->max_client_index = i + 1;
7182
7183 /* count new number of clients */
7184 int j = 0;
7185 for (i = MAX_CLIENTS - 1; i >= 0; i--)
7186 if (pheader->client[i].pid != 0)
7187 j++;
7188 pheader->num_clients = j;
7189
7190 int destroy_flag = (pheader->num_clients == 0);
7191
7192 // we hold the locks on the read cache and the write cache.
7193
7194 /* free cache */
7195 if (pbuf->read_cache_size > 0) {
7196 free(pbuf->read_cache);
7197 pbuf->read_cache = NULL;
7198 pbuf->read_cache_size = 0;
7199 pbuf->read_cache_rp = 0;
7200 pbuf->read_cache_wp = 0;
7201 }
7202
7203 if (pbuf->write_cache_size > 0) {
7204 free(pbuf->write_cache);
7205 pbuf->write_cache = NULL;
7206 pbuf->write_cache_size = 0;
7207 pbuf->write_cache_rp = 0;
7208 pbuf->write_cache_wp = 0;
7209 }
7210
7211 /* check if anyone is waiting and wake him up */
7212
7213 for (int i = 0; i < pheader->max_client_index; i++) {
7214 BUFFER_CLIENT *pclient = pheader->client + i;
7215 if (pclient->pid && (pclient->write_wait || pclient->read_wait))
7216 ss_resume(pclient->port, "B ");
7217 }
7218
7219 /* unmap shared memory, delete it if we are the last */
7220
7221 ss_shm_close(pbuf->buffer_name, pbuf->buffer_header, pbuf->shm_size, pbuf->shm_handle, destroy_flag);
7222
7223 /* after ss_shm_close() these are invalid: */
7224
7225 pheader = NULL;
7226 pbuf->buffer_header = NULL;
7227 pbuf->shm_size = 0;
7228 pbuf->shm_handle = 0;
7229
7230 /* unlock buffer in correct order */
7231
7232 pbuf_guard.unlock();
7233
7234 pbuf->write_cache_mutex.unlock();
7235 pbuf->read_cache_mutex.unlock();
7236
7237 /* delete semaphore */
7238
7240 }
7241#endif /* LOCAL_ROUTINES */
7242
7243 return BM_SUCCESS;
7244}
INT bm_delete_request(INT request_id)
Definition midas.cxx:8592
static void bm_write_buffer_statistics_to_odb(HNDLE hDB, BUFFER *pbuf, BOOL force)
Definition midas.cxx:6594
INT cm_get_experiment_database(HNDLE *hDB, HNDLE *hKeyClient)
Definition midas.cxx:3011
static int bm_lock_buffer_read_cache(BUFFER *pbuf)
Definition midas.cxx:7912
static int bm_lock_buffer_write_cache(BUFFER *pbuf)
Definition midas.cxx:7933
INT ss_resume(INT port, const char *message)
Definition system.cxx:4844
INT ss_semaphore_delete(HNDLE semaphore_handle, INT destroy_flag)
Definition system.cxx:2869
INT ss_shm_close(const char *name, void *adr, size_t shm_size, HNDLE handle, INT destroy_flag)
Definition system.cxx:755
#define RPC_BM_CLOSE_BUFFER
Definition mrpc.h:37
static std::vector< EventRequest > _request_list
Definition midas.cxx:220
static std::mutex _request_list_mutex
Definition midas.cxx:219
#define MAX_CLIENTS
Definition midas.h:274
INT num_clients
Definition midas.h:959
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_compose_event()

INT bm_compose_event ( EVENT_HEADER *event_header,
short int  event_id,
short int  trigger_mask,
DWORD  data_size,
DWORD  serial 
)

Compose a Midas event header. An event header can usually be set-up manually or through this routine. If the data size of the event is not known when the header is composed, it can be set later with event_header->data_size = <...> Following structure is created at the beginning of an event

typedef struct {
short int event_id;
short int trigger_mask;
DWORD data_size;
char event[1000];
bm_compose_event((EVENT_HEADER *)event, 1, 0, 100, 1);
*(event+sizeof(EVENT_HEADER)) = <...>
INT bm_compose_event(EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD serial)
Definition midas.cxx:8289
#define serial_number
#define time_stamp
Parameters
event_header — pointer to the event header
event_id — event ID of the event
trigger_mask — trigger mask of the event
data_size — size of the data part of the event in bytes
serial — serial number
Returns
BM_SUCCESS

Definition at line 8289 of file midas.cxx.

8290{
8291 event_header->event_id = event_id;
8292 event_header->trigger_mask = trigger_mask;
8293 event_header->data_size = data_size;
8294 event_header->time_stamp = ss_time();
8295 event_header->serial_number = serial;
8296
8297 return BM_SUCCESS;
8298}
INT serial
Definition minife.c:20
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_compose_event_threadsafe()

INT bm_compose_event_threadsafe ( EVENT_HEADER *event_header,
short int  event_id,
short int  trigger_mask,
DWORD  data_size,
DWORD *serial 
)

Definition at line 8300 of file midas.cxx.

8301{
8302 static std::mutex mutex;
8303
8304 event_header->event_id = event_id;
8305 event_header->trigger_mask = trigger_mask;
8306 event_header->data_size = data_size;
8307 event_header->time_stamp = ss_time();
8308 {
8309 std::lock_guard<std::mutex> lock(mutex);
8310 event_header->serial_number = *serial;
8311 *serial = *serial + 1;
8312 // implicit unlock
8313 }
8314
8315 return BM_SUCCESS;
8316}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_convert_event_header()

static void bm_convert_event_header ( EVENT_HEADER *pevent,
int  convert_flags 
)
static

Definition at line 9077 of file midas.cxx.

9077 {
9078 /* now convert event header */
9079 if (convert_flags) {
9080 rpc_convert_single(&pevent->event_id, TID_INT16, RPC_OUTGOING, convert_flags);
9081 rpc_convert_single(&pevent->trigger_mask, TID_INT16, RPC_OUTGOING, convert_flags);
9082 rpc_convert_single(&pevent->serial_number, TID_UINT32, RPC_OUTGOING, convert_flags);
9083 rpc_convert_single(&pevent->time_stamp, TID_UINT32, RPC_OUTGOING, convert_flags);
9084 rpc_convert_single(&pevent->data_size, TID_UINT32, RPC_OUTGOING, convert_flags);
9085 }
9086}
#define TID_UINT32
Definition midas.h:337
#define TID_INT16
Definition midas.h:335
void rpc_convert_single(void *data, INT tid, INT flags, INT convert_flags)
Definition midas.cxx:11689
#define RPC_OUTGOING
Definition midas.h:1583
short int event_id
Definition midas.h:852
DWORD data_size
Definition midas.h:856
DWORD serial_number
Definition midas.h:854
short int trigger_mask
Definition midas.h:853
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_delete_request()

INT bm_delete_request ( INT  request_id)

Deletes an event request previously done with bm_request_event(). When an event request gets deleted, events of that requested type are not received any more. When a buffer is closed via bm_close_buffer(), all event requests from that buffer are deleted automatically

Parameters
request_id — request identifier given by bm_request_event()
Returns
BM_SUCCESS, BM_INVALID_HANDLE

Definition at line 8592 of file midas.cxx.

8593{
8594 _request_list_mutex.lock();
8595
8596 if (request_id < 0 || size_t(request_id) >= _request_list.size()) {
8597 _request_list_mutex.unlock();
8598 return BM_INVALID_HANDLE;
8599 }
8600
8601 int buffer_handle = _request_list[request_id].buffer_handle;
8602
8603 _request_list[request_id].clear();
8604
8605 _request_list_mutex.unlock();
8606
8607 /* remove request entry from buffer */
8608 return bm_remove_event_request(buffer_handle, request_id);
8609}
INT bm_remove_event_request(INT buffer_handle, INT request_id)
Definition midas.cxx:8526
#define BM_INVALID_HANDLE
Definition midas.h:609
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_dispatch_event()

static void bm_dispatch_event ( int  buffer_handle,
EVENT_HEADER *pevent 
)
static

Definition at line 8831 of file midas.cxx.

8832{
8833 _request_list_mutex.lock();
8834 bool locked = true;
8835 size_t n = _request_list.size();
8836 /* call dispatcher */
8837 for (size_t i = 0; i < n; i++) {
8838 if (!locked) {
8839 _request_list_mutex.lock();
8840 locked = true;
8841 }
8843 if (r.buffer_handle != buffer_handle)
8844 continue;
8845 if (!bm_match_event(r.event_id, r.trigger_mask, pevent))
8846 continue;
8847 /* must release the lock on the request list: user provided r.dispatcher() can add or remove event requests, and we will deadlock. K.O. */
8848 _request_list_mutex.unlock();
8849 locked = false;
8850 /* if event is fragmented, call defragmenter */
8851 if (((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG1)) || ((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG))) {
8852 bm_defragment_event(buffer_handle, i, pevent, (void *) (pevent + 1), r.dispatcher);
8853 } else {
8854 r.dispatcher(buffer_handle, i, pevent, (void *) (pevent + 1));
8855 }
8856 }
8857 if (locked)
8858 _request_list_mutex.unlock();
8859}
static void bm_defragment_event(HNDLE buffer_handle, HNDLE request_id, EVENT_HEADER *pevent, void *pdata, EVENT_HANDLER *dispatcher)
Definition midas.cxx:11292
DWORD n[4]
Definition mana.cxx:247
#define EVENTID_FRAG
Definition midas.h:907
#define EVENTID_FRAG1
Definition midas.h:906
short int event_id
Definition midas.cxx:206
INT buffer_handle
Definition midas.cxx:205
short int trigger_mask
Definition midas.cxx:207
EVENT_HANDLER * dispatcher
Definition midas.cxx:208
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_empty_buffers()

INT bm_empty_buffers ( void  )

Clears event buffer and cache. If an event buffer is large and a consumer is slow in analyzing events, events are usually received some time after they are produced. This effect is even more pronounced if a read cache is used (via bm_set_cache_size()). When changes to the hardware are made in the experiment, the consumer will then still analyze old events before any new event which reflects the hardware change. Users can be fooled by looking at histograms which reflect the hardware change many seconds after they have been made.

To overcome this potential problem, the analyzer can call bm_empty_buffers() just after the hardware change has been made which skips all old events contained in event buffers and read caches. Technically this is done by forwarding the read pointer of the client. No events are really deleted, they are still visible to other clients like the logger.

Note that the front-end also contains write buffers which can delay the delivery of events. The standard front-end framework mfe.c reduces this effect by flushing all buffers once every second.

Returns
BM_SUCCESS

Definition at line 11248 of file midas.cxx.

11248 {
11249 if (rpc_is_remote())
11251
11252#ifdef LOCAL_ROUTINES
11253 {
11254 std::vector<BUFFER*> mybuffers;
11255
11256 gBuffersMutex.lock();
11258 gBuffersMutex.unlock();
11259
11260 /* go through all buffers */
11261 for (BUFFER* pbuf : mybuffers) {
11262 if (!pbuf)
11263 continue;
11264 if (!pbuf->attached)
11265 continue;
11266
11267 int status = bm_skip_event(pbuf);
11268 if (status != BM_SUCCESS)
11269 return status;
11270 }
11271 }
11272#endif /* LOCAL_ROUTINES */
11273
11274 return BM_SUCCESS;
11275}
static int bm_skip_event(BUFFER *pbuf)
Definition midas.cxx:10841
#define RPC_BM_EMPTY_BUFFERS
Definition mrpc.h:49
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_fill_read_cache_locked()

static int bm_fill_read_cache_locked ( bm_lock_buffer_guard pbuf_guard,
int  timeout_msec 
)
static

Definition at line 9000 of file midas.cxx.

9001{
9002 BUFFER* pbuf = pbuf_guard.get_pbuf();
9003 BUFFER_HEADER* pheader = pbuf->buffer_header;
9006
9007 //printf("bm_fill_read_cache: [%s] timeout %d, size %d, rp %d, wp %d\n", pheader->name, timeout_msec, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
9008
9009 /* loop over all events in the buffer */
9010
9011 while (1) {
9012 EVENT_HEADER *pevent = NULL;
9013 int event_size = 3; // poison value
9014 int total_size = 3; // poison value
9015
9016 int status = bm_peek_buffer_locked(pbuf, pheader, pc, &pevent, &event_size, &total_size);
9017 if (status == BM_CORRUPTED) {
9018 return status;
9019 } else if (status != BM_SUCCESS) {
9020 /* event buffer is empty */
9021 if (timeout_msec == BM_NO_WAIT) {
9022 if (need_wakeup)
9024 if (pbuf->read_cache_rp == pbuf->read_cache_wp) {
9025 // read cache is empty
9026 return BM_ASYNC_RETURN;
9027 }
9028 return BM_SUCCESS;
9029 }
9030
9032
9033 if (status != BM_SUCCESS) {
9034 // we only come here with SS_ABORT & co
9035 return status;
9036 }
9037
9038 // make sure we wait for new event only once
9040 // go back to bm_peek_buffer_locked
9041 continue;
9042 }
9043
9044 /* loop over all requests: if this event matches a request,
9045 * copy it to the read cache */
9046
9048
9049 if (is_requested) {
9050 if (pbuf->read_cache_wp + total_size > pbuf->read_cache_size) {
9051 /* read cache is full */
9052 if (need_wakeup)
9054 return BM_SUCCESS;
9055 }
9056
9057 bm_read_from_buffer_locked(pheader, pc->read_pointer, pbuf->read_cache + pbuf->read_cache_wp, event_size);
9058
9059 pbuf->read_cache_wp += total_size;
9060
9061 /* update statistics */
9062 pheader->num_out_events++;
9063 pbuf->count_read++;
9064 pbuf->bytes_read += event_size;
9065 }
9066
9067 /* shift read pointer */
9068
9069 int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
9070 pc->read_pointer = new_read_pointer;
9071
9072 need_wakeup = TRUE;
9073 }
9074 /* NOT REACHED */
9075}
static void bm_wakeup_producers_locked(const BUFFER_HEADER *pheader, const BUFFER_CLIENT *pc)
Definition midas.cxx:8795
static int bm_incr_rp_no_check(const BUFFER_HEADER *pheader, int rp, int total_size)
Definition midas.cxx:6231
static BOOL bm_check_requests(const BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
Definition midas.cxx:8974
static void bm_read_from_buffer_locked(const BUFFER_HEADER *pheader, int rp, char *buf, int event_size)
Definition midas.cxx:8944
static int bm_peek_buffer_locked(BUFFER *pbuf, BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
Definition midas.cxx:8899
static int bm_wait_for_more_events_locked(bm_lock_buffer_guard &pbuf_guard, BUFFER_CLIENT *pc, int timeout_msec, BOOL unlock_read_cache)
Definition midas.cxx:9400
#define BM_ASYNC_RETURN
Definition midas.h:613
#define BM_NO_WAIT
Definition midas.h:366
int event_size
Definition msysmon.cxx:527
INT num_out_events
Definition midas.h:965
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_find_first_request_locked()

static int bm_find_first_request_locked ( BUFFER_CLIENT pc,
const EVENT_HEADER pevent 
)
static

Definition at line 9597 of file midas.cxx.

9597 {
9598 if (pc->pid) {
9599 int j;
9600 for (j = 0; j < pc->max_request_index; j++) {
9601 const EVENT_REQUEST *prequest = pc->event_request + j;
9602 if (prequest->valid && bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
9603 return prequest->id;
9604 }
9605 }
9606 }
9607
9608 return -1;
9609}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache()

INT bm_flush_cache ( int  buffer_handle,
int  timeout_msec 
)

Definition at line 10215 of file midas.cxx.

10216{
10217 if (rpc_is_remote()) {
10218 return bm_flush_cache_rpc(buffer_handle, timeout_msec);
10219 }
10220
10221#ifdef LOCAL_ROUTINES
10222 {
10223 INT status = 0;
10224
10225 //printf("bm_flush_cache!\n");
10226
10227 BUFFER *pbuf = bm_get_buffer("bm_flush_cache", buffer_handle, &status);
10228
10229 if (!pbuf)
10230 return status;
10231
10232 if (pbuf->write_cache_size == 0)
10233 return BM_SUCCESS;
10234
10236
10237 if (status != BM_SUCCESS)
10238 return status;
10239
10240 /* check if anything needs to be flushed */
10241 if (pbuf->write_cache_wp == 0) {
10242 pbuf->write_cache_mutex.unlock();
10243 return BM_SUCCESS;
10244 }
10245
10246 /* lock the buffer */
10248
10249 if (!pbuf_guard.is_locked())
10250 return pbuf_guard.get_status();
10251
10253
10254 /* unlock in correct order */
10255
10256 if (pbuf_guard.is_locked()) {
10257 // check if bm_wait_for_free_space() failed to relock the buffer
10258 pbuf_guard.unlock();
10259 }
10260
10261 pbuf->write_cache_mutex.unlock();
10262
10263 return status;
10264 }
10265#endif /* LOCAL_ROUTINES */
10266
10267 return BM_SUCCESS;
10268}
static int bm_flush_cache_rpc(int buffer_handle, int timeout_msec)
Definition midas.cxx:9995
static INT bm_flush_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
Definition midas.cxx:10071
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache_locked()

static INT bm_flush_cache_locked ( bm_lock_buffer_guard pbuf_guard,
int  timeout_msec 
)
static

Empty write cache. This function should be used if events in the write cache should be visible to the consumers immediately. It should be called at the end of each run, otherwise events could be kept in the write buffer and will flow to the data of the next run.

Parameters
buffer_handleBuffer handle obtained via bm_open_buffer() or 0 to flush data in the mserver event socket
timeout_msecTimeout waiting for free space in the event buffer. If BM_WAIT, wait forever. If BM_NO_WAIT, the function returns immediately with a value of BM_ASYNC_RETURN without writing the cache.
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN Routine called with async_flag == BM_NO_WAIT and buffer has not enough space to receive cache
BM_NO_MEMORY Event is too large for network buffer or event buffer. One has to increase the event buffer size "/Experiment/Buffer sizes/SYSTEM" and/or /Experiment/MAX_EVENT_SIZE in ODB.

Definition at line 10071 of file midas.cxx.

10072{
10073 // NB we come here with write cache locked and buffer locked.
10074
10075 {
10076 INT status = 0;
10077
10078 //printf("bm_flush_cache_locked!\n");
10079
10080 BUFFER* pbuf = pbuf_guard.get_pbuf();
10081 BUFFER_HEADER* pheader = pbuf->buffer_header;
10082
10083 //printf("bm_flush_cache_locked: buffer %s, cache rp %zu, wp %zu, timeout %d msec\n", pbuf->buffer_name, pbuf->write_cache_rp, pbuf->write_cache_wp, timeout_msec);
10084
10085 int old_write_pointer = pheader->write_pointer;
10086
10087 int request_id[MAX_CLIENTS];
10088 for (int i = 0; i < pheader->max_client_index; i++) {
10089 request_id[i] = -1;
10090 }
10091
10092 size_t ask_rp = pbuf->write_cache_rp;
10093 size_t ask_wp = pbuf->write_cache_wp;
10094
10095 if (ask_wp == 0) { // nothing to do
10096 return BM_SUCCESS;
10097 }
10098
10099 if (ask_rp == ask_wp) { // nothing to do
10100 return BM_SUCCESS;
10101 }
10102
10103 assert(ask_rp < ask_wp);
10104
10105 size_t ask_free = ALIGN8(ask_wp - ask_rp);
10106
10107 if (ask_free == 0) { // nothing to do
10108 return BM_SUCCESS;
10109 }
10110
10111#if 0
10113 if (status != BM_SUCCESS) {
10114 printf("bm_flush_cache: corrupted 111!\n");
10115 abort();
10116 }
10117#endif
10118
10120
10121 if (status != BM_SUCCESS) {
10122 return status;
10123 }
10124
10125 // NB: ask_rp, ask_wp and ask_free are invalid after calling bm_wait_for_free_space():
10126 //
10127 // wait_for_free_space() will sleep with all locks released,
10128 // during this time, another thread may call bm_send_event() that will
10129 // add one or more events to the write cache and after wait_for_free_space()
10130 // returns, size of data in cache will be bigger than the amount
10131 // of free space we requested. so we need to keep track of how
10132 // much data we write to the buffer and ask for more data
10133 // if we run short. This is the reason for the big loop
10134 // around wait_for_free_space(). We ask for slightly too little free
10135 // space to make sure all this code is always used and does work. K.O.
10136
10137 if (pbuf->write_cache_wp == 0) {
10138 /* somebody emptied the cache while we were inside bm_wait_for_free_space */
10139 return BM_SUCCESS;
10140 }
10141
10142 //size_t written = 0;
10143 while (pbuf->write_cache_rp < pbuf->write_cache_wp) {
10144 /* loop over all events in cache */
10145
10146 const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pbuf->write_cache + pbuf->write_cache_rp);
10147 size_t event_size = (pevent->data_size + sizeof(EVENT_HEADER));
10148 size_t total_size = ALIGN8(event_size);
10149
10150#if 0
10151 printf("bm_flush_cache: cache size %d, wp %d, rp %d, event data_size %d, event_size %d, total_size %d, free %d, written %d\n",
10152 int(pbuf->write_cache_size),
10153 int(pbuf->write_cache_wp),
10154 int(pbuf->write_cache_rp),
10155 int(pevent->data_size),
10156 int(event_size),
10157 int(total_size),
10158 int(ask_free),
10159 int(written));
10160#endif
10161
10162 // check for crazy event size
10163 assert(total_size >= sizeof(EVENT_HEADER));
10164 assert(total_size <= (size_t)pheader->size);
10165
10166 bm_write_to_buffer_locked(pheader, 1, (char**)&pevent, &event_size, total_size);
10167
10168 /* update statistics */
10169 pheader->num_in_events++;
10170 pbuf->count_sent += 1;
10171 pbuf->bytes_sent += total_size;
10172
10173 /* see comment for the same code in bm_send_event().
10174 * We make sure the buffer is never 100% full */
10175 assert(pheader->write_pointer != pheader->read_pointer);
10176
10177 /* check if anybody has a request for this event */
10178 for (int i = 0; i < pheader->max_client_index; i++) {
10179 BUFFER_CLIENT *pc = pheader->client + i;
10180 int r = bm_find_first_request_locked(pc, pevent);
10181 if (r >= 0) {
10182 request_id[i] = r;
10183 }
10184 }
10185
10186 /* this loop does not loop forever because rp
10187 * is monotonously incremented here. write_cache_wp does
10188 * not change */
10189
10190 pbuf->write_cache_rp += total_size;
10191 //written += total_size;
10192
10193 assert(pbuf->write_cache_rp > 0);
10194 assert(pbuf->write_cache_rp <= pbuf->write_cache_size);
10195 assert(pbuf->write_cache_rp <= pbuf->write_cache_wp);
10196 }
10197
10198 /* the write cache is now empty */
10199 assert(pbuf->write_cache_wp == pbuf->write_cache_rp);
10200 pbuf->write_cache_wp = 0;
10201 pbuf->write_cache_rp = 0;
10202
10203 /* check which clients are waiting */
10204 for (int i = 0; i < pheader->max_client_index; i++) {
10205 BUFFER_CLIENT *pc = pheader->client + i;
10206 bm_notify_reader_locked(pheader, pc, old_write_pointer, request_id[i]);
10207 }
10208 }
10209
10210 return BM_SUCCESS;
10211}
static void bm_notify_reader_locked(BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, int old_write_pointer, int request_id)
Definition midas.cxx:9611
static int bm_find_first_request_locked(BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
Definition midas.cxx:9597
static int bm_validate_buffer_locked(const BUFFER *pbuf)
Definition midas.cxx:6315
static void bm_write_to_buffer_locked(BUFFER_HEADER *pheader, int sg_n, const char *const sg_ptr[], const size_t sg_len[], size_t total_size)
Definition midas.cxx:9512
static int bm_wait_for_free_space_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec, int requested_space, bool unlock_write_cache)
Definition midas.cxx:9088
#define ALIGN8(x)
Definition midas.h:522
INT num_in_events
Definition midas.h:964
INT write_pointer
Definition midas.h:963
INT read_pointer
Definition midas.h:962
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache_rpc()

static int bm_flush_cache_rpc ( int  buffer_handle,
int  timeout_msec 
)
static

Definition at line 9995 of file midas.cxx.

9996{
9997 //printf("bm_flush_cache_rpc: handle %d, timeout %d\n", buffer_handle, timeout_msec);
9998
10001
10003
10004 while (1) {
10005 if (timeout_msec == BM_WAIT) {
10006 xtimeout_msec = 1000;
10007 } else if (timeout_msec == BM_NO_WAIT) {
10009 } else {
10010 if (xtimeout_msec > 1000) {
10011 xtimeout_msec = 1000;
10012 }
10013 }
10014
10015 int status = rpc_call(RPC_BM_FLUSH_CACHE, buffer_handle, xtimeout_msec);
10016
10017 //printf("bm_flush_cache_rpc: handle %d, timeout %d, status %d\n", buffer_handle, xtimeout_msec, status);
10018
10019 if (status == BM_ASYNC_RETURN) {
10020 if (timeout_msec == BM_WAIT) {
10021 // BM_WAIT means wait forever
10022 continue;
10023 } else if (timeout_msec == BM_NO_WAIT) {
10024 // BM_NO_WAIT means do not wait
10025 return status;
10026 } else {
10028 if (now >= time_end) {
10029 // timeout, return BM_ASYNC_RETURN
10030 return status;
10031 }
10032
10034
10035 if (remain < (DWORD)xtimeout_msec) {
10037 }
10038
10039 // keep asking for event...
10040 continue;
10041 }
10042 } else if (status == BM_SUCCESS) {
10043 // success, return BM_SUCCESS
10044 return status;
10045 } else {
10046 // error
10047 return status;
10048 }
10049 }
10050}
#define BM_WAIT
Definition midas.h:365
#define RPC_BM_FLUSH_CACHE
Definition mrpc.h:46
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_get_buffer_handle()

INT bm_get_buffer_handle ( const char buffer_name,
INT buffer_handle 
)

If the buffer is already open, return its handle

Parameters
buffer_namebuffer name
Returns
BM_SUCCESS, BM_NOT_FOUND

Definition at line 7083 of file midas.cxx.

7084{
7085 gBuffersMutex.lock();
7086 for (size_t i = 0; i < gBuffers.size(); i++) {
7087 BUFFER* pbuf = gBuffers[i];
7088 if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
7089 *buffer_handle = i + 1;
7090 gBuffersMutex.unlock();
7091 return BM_SUCCESS;
7092 }
7093 }
7094 gBuffersMutex.unlock();
7095 return BM_NOT_FOUND;
7096}
#define BM_NOT_FOUND
Definition midas.h:612
BOOL equal_ustring(const char *str1, const char *str2)
Definition odb.cxx:3201
char buffer_name[NAME_LENGTH]
Definition mevb.cxx:45
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_incr_read_cache_locked()

static void bm_incr_read_cache_locked ( BUFFER pbuf,
int  total_size 
)
static

Definition at line 8863 of file midas.cxx.

8863 {
8864 /* increment read cache read pointer */
8865 pbuf->read_cache_rp += total_size;
8866
8867 if (pbuf->read_cache_rp == pbuf->read_cache_wp) {
8868 pbuf->read_cache_rp = 0;
8869 pbuf->read_cache_wp = 0;
8870 }
8871}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_incr_rp_no_check()

static int bm_incr_rp_no_check ( const BUFFER_HEADER pheader,
int  rp,
int  total_size 
)
static

Definition at line 6231 of file midas.cxx.

6232{
6233#if 0
6234 if (gRpLog == NULL) {
6235 gRpLog = fopen("rp.log", "a");
6236 }
6237 if (gRpLog && (total_size < 16)) {
6238 const char *pdata = (const char *) (pheader + 1);
6239 const DWORD *pevent = (const DWORD*) (pdata + rp);
6240 fprintf(gRpLog, "%s: rp %d, total_size %d, at rp 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x\n", pheader->name, rp, total_size,
6241 pevent[0], pevent[1], pevent[2], pevent[3], pevent[4], pevent[5]);
6242 }
6243#endif
6244
6245 // these checks are already done before we come here.
6246 // but we check again as last-ressort protection. K.O.
6247 assert(total_size > 0);
6248 assert(total_size >= (int)sizeof(EVENT_HEADER));
6249
6250 rp += total_size;
6251 if (rp >= pheader->size) {
6252 rp -= pheader->size;
6253 } else if ((rp + (int) sizeof(EVENT_HEADER)) > pheader->size) {
6254 // note: ">" here to match bm_write_to_buffer_locked() and bm_validate_rp().
6255 // if at the end of the buffer, the remaining free space is exactly
6256 // equal to the size of an event header, the event header
6257 // is written there, the pointer is wrapped and the event data
6258 // is written to the beginning of the buffer.
6259 rp = 0;
6260 }
6261 return rp;
6262}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_match_event()

INT bm_match_event ( short int  event_id,
short int  trigger_mask,
const EVENT_HEADER pevent 
)

Check if an event matches a given event request by the event id and trigger mask

Parameters
event_idEvent ID of request
trigger_maskTrigger mask of request
peventPointer to event to check
Returns
TRUE if event matches request

Definition at line 6023 of file midas.cxx.

6023 {
6024 // NB: cast everything to unsigned 16 bit to avoid bitwise comparison failure
6025 // because of mismatch in sign-extension between signed 16-bit event_id and
6026 // unsigned 16-bit constants. K.O.
6027
6028 if (((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG1)) || ((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG)))
6029 /* fragmented event */
6030 return (((uint16_t(event_id) == uint16_t(EVENTID_ALL)) || (uint16_t(event_id) == (uint16_t(pevent->event_id) & uint16_t(0x0FFF))))
6032
6033 return (((uint16_t(event_id) == uint16_t(EVENTID_ALL)) || (uint16_t(event_id) == uint16_t(pevent->event_id)))
6035}
#define TRIGGER_ALL
Definition midas.h:538
#define EVENTID_ALL
Definition midas.h:537
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_next_rp()

static int bm_next_rp ( const char who,
const BUFFER_HEADER pheader,
const char pdata,
int  rp 
)
static

Definition at line 6264 of file midas.cxx.

6264 {
6265 const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pdata + rp);
6266 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
6267 int total_size = ALIGN8(event_size);
6268
6269 if (pevent->data_size <= 0 || total_size <= 0 || total_size > pheader->size) {
6270 cm_msg(MERROR, "bm_next_rp",
6271 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6272 pheader->name,
6273 rp,
6274 pevent->data_size,
6275 event_size,
6276 total_size,
6277 pheader->read_pointer,
6278 pheader->write_pointer,
6279 pheader->size,
6280 who);
6281 return -1;
6282 }
6283
6284 int remaining = 0;
6285 if (rp < pheader->write_pointer) {
6286 remaining = pheader->write_pointer - rp;
6287 } else {
6288 remaining = pheader->size - rp;
6289 remaining += pheader->write_pointer;
6290 }
6291
6292 //printf("bm_next_rp: total_size %d, remaining %d, rp %d, wp %d, size %d\n", total_size, remaining, rp, pheader->write_pointer, pheader->size);
6293
6294 if (total_size > remaining) {
6295 cm_msg(MERROR, "bm_next_rp",
6296 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, remaining %d, called from %s",
6297 pheader->name,
6298 rp,
6299 pevent->data_size,
6300 event_size,
6301 total_size,
6302 pheader->read_pointer,
6303 pheader->write_pointer,
6304 pheader->size,
6305 remaining,
6306 who);
6307 return -1;
6308 }
6309
6310 rp = bm_incr_rp_no_check(pheader, rp, total_size);
6311
6312 return rp;
6313}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_notify_reader_locked()

static void bm_notify_reader_locked ( BUFFER_HEADER pheader,
BUFFER_CLIENT pc,
int  old_write_pointer,
int  request_id 
)
static

Definition at line 9611 of file midas.cxx.

9611 {
9612 if (request_id >= 0) {
9613 /* if that client has a request and is suspended, wake it up */
9614 if (pc->read_wait) {
9615 char str[80];
9616 sprintf(str, "B %s %d", pheader->name, request_id);
9617 ss_resume(pc->port, str);
9618 //printf("bm_notify_reader_locked: buffer [%s] client [%s] request_id %d, port %d, message [%s]\n", pheader->name, pc->name, request_id, pc->port, str);
9619 //printf("bm_notify_reader_locked: buffer [%s] client [%s] clear read_wait!\n", pheader->name, pc->name);
9620 pc->read_wait = FALSE;
9621 }
9622 }
9623}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_open_buffer()

INT bm_open_buffer ( const char buffer_name,
INT  buffer_size,
INT buffer_handle 
)

Open an event buffer. Two default buffers are created by the system. The "SYSTEM" buffer is used to exchange events and the "SYSMSG" buffer is used to exchange system messages. The name and size of the event buffers is defined in midas.h as EVENT_BUFFER_NAME and DEFAULT_BUFFER_SIZE. Following example opens the "SYSTEM" buffer, requests events with ID 1 and enters a main loop. Events are then received in process_event()

#include <stdio.h>
#include "midas.h"
void process_event(HNDLE hbuf, HNDLE request_id, EVENT_HEADER *pheader, void *pevent)
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
status = cm_connect_experiment("pc810", "Sample", "Simple Analyzer", NULL);
return 1;
do
{
status = cm_yield(1000);
} while (status != RPC_SHUTDOWN && status != SS_ABORT);
return 0;
}
INT bm_open_buffer(const char *buffer_name, INT buffer_size, INT *buffer_handle)
Definition midas.cxx:6725
INT bm_request_event(HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
Definition midas.cxx:8473
INT cm_yield(INT millisec)
Definition midas.cxx:5650
INT cm_connect_experiment(const char *host_name, const char *exp_name, const char *client_name, void(*func)(char *))
Definition midas.cxx:2278
INT cm_disconnect_experiment(void)
Definition midas.cxx:2846
#define CM_SUCCESS
Definition midas.h:582
#define SS_ABORT
Definition midas.h:677
#define RPC_SHUTDOWN
Definition midas.h:707
int main()
Definition hwtest.cxx:23
INT process_event(ANALYZE_REQUEST *par, EVENT_HEADER *pevent)
Definition mana.cxx:3081
#define DEFAULT_BUFFER_SIZE
Definition midas.h:255
#define EVENT_BUFFER_NAME
Definition midas.h:269
Parameters
buffer_nameName of buffer
buffer_sizeDefault size of buffer in bytes. Can by overwritten with ODB value
buffer_handleBuffer handle returned by function
Returns
BM_SUCCESS, BM_CREATED
BM_NO_SHM Shared memory cannot be created
BM_NO_SEMAPHORE Semaphore cannot be created
BM_NO_MEMORY Not enough memory to create buffer descriptor
BM_MEMSIZE_MISMATCH Buffer size conflicts with an existing buffer of different size
BM_INVALID_PARAM Invalid parameter

Definition at line 6725 of file midas.cxx.

6725 {
6726 INT status;
6727
6728 if (rpc_is_remote()) {
6729 status = rpc_call(RPC_BM_OPEN_BUFFER, buffer_name, buffer_size, buffer_handle);
6730
6731 HNDLE hDB;
6733 if (status != SUCCESS || hDB == 0) {
6734 cm_msg(MERROR, "bm_open_buffer", "cannot open buffer \'%s\' - not connected to ODB", buffer_name);
6735 return BM_NO_SHM;
6736 }
6737
6739
6740 int size = sizeof(INT);
6741 status = db_get_value(hDB, 0, "/Experiment/MAX_EVENT_SIZE", &_bm_max_event_size, &size, TID_UINT32, TRUE);
6742
6743 if (status != DB_SUCCESS) {
6744 cm_msg(MERROR, "bm_open_buffer", "Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6745 status);
6746 return status;
6747 }
6748
6749 return status;
6750 }
6751#ifdef LOCAL_ROUTINES
6752 {
6753 HNDLE shm_handle;
6754 size_t shm_size;
6755 HNDLE hDB;
6756 const int max_buffer_size = 2 * 1000 * 1024 * 1024; // limited by 32-bit integers in the buffer header
6757
6758 bm_cleanup("bm_open_buffer", ss_millitime(), FALSE);
6759
6760 if (!buffer_name || !buffer_name[0]) {
6761 cm_msg(MERROR, "bm_open_buffer", "cannot open buffer with zero name");
6762 return BM_INVALID_PARAM;
6763 }
6764
6765 if (strlen(buffer_name) >= NAME_LENGTH) {
6766 cm_msg(MERROR, "bm_open_buffer", "buffer name \"%s\" is longer than %d bytes", buffer_name, NAME_LENGTH);
6767 return BM_INVALID_PARAM;
6768 }
6769
6771
6772 if (status != SUCCESS || hDB == 0) {
6773 //cm_msg(MERROR, "bm_open_buffer", "cannot open buffer \'%s\' - not connected to ODB", buffer_name);
6774 return BM_NO_SHM;
6775 }
6776
6777 /* get buffer size from ODB, user parameter as default if not present in ODB */
6778 std::string odb_path;
6779 odb_path += "/Experiment/Buffer sizes/";
6780 odb_path += buffer_name;
6781
6782 int size = sizeof(INT);
6783 status = db_get_value(hDB, 0, odb_path.c_str(), &buffer_size, &size, TID_UINT32, TRUE);
6784
6786 cm_msg(MERROR, "bm_open_buffer",
6787 "Cannot open buffer \"%s\", invalid buffer size %d in ODB \"%s\", maximum buffer size is %d",
6788 buffer_name, buffer_size, odb_path.c_str(), max_buffer_size);
6789 return BM_INVALID_PARAM;
6790 }
6791
6793
6794 size = sizeof(INT);
6795 status = db_get_value(hDB, 0, "/Experiment/MAX_EVENT_SIZE", &_bm_max_event_size, &size, TID_UINT32, TRUE);
6796
6797 if (status != DB_SUCCESS) {
6798 cm_msg(MERROR, "bm_open_buffer", "Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6799 status);
6800 return status;
6801 }
6802
6803 /* check if buffer already is open */
6804 gBuffersMutex.lock();
6805 for (size_t i = 0; i < gBuffers.size(); i++) {
6806 BUFFER* pbuf = gBuffers[i];
6807 if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
6808 *buffer_handle = i + 1;
6809 gBuffersMutex.unlock();
6810 return BM_SUCCESS;
6811 }
6812 }
6813 gBuffersMutex.unlock();
6814
6815 // only one thread at a time should create new buffers
6816
6817 static std::mutex gNewBufferMutex;
6818 std::lock_guard<std::mutex> guard(gNewBufferMutex);
6819
6820 // if we had a race against another thread
6821 // and while we were waiting for gNewBufferMutex
6822 // the other thread created this buffer, we return it.
6823
6824 gBuffersMutex.lock();
6825 for (size_t i = 0; i < gBuffers.size(); i++) {
6826 BUFFER* pbuf = gBuffers[i];
6827 if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
6828 *buffer_handle = i + 1;
6829 gBuffersMutex.unlock();
6830 return BM_SUCCESS;
6831 }
6832 }
6833 gBuffersMutex.unlock();
6834
6835 /* allocate new BUFFER object */
6836
6837 BUFFER* pbuf = new BUFFER;
6838
6839 /* there is no constructor for BUFFER object, we have to zero the arrays manually */
6840
6841 for (int i=0; i<MAX_CLIENTS; i++) {
6843 pbuf->client_time_write_wait[i] = 0;
6844 }
6845
6846 /* create buffer semaphore */
6847
6848 status = ss_semaphore_create(buffer_name, &(pbuf->semaphore));
6849
6850 if (status != SS_CREATED && status != SS_SUCCESS) {
6851 *buffer_handle = 0;
6852 delete pbuf;
6853 return BM_NO_SEMAPHORE;
6854 }
6855
6856 std::string client_name = cm_get_client_name();
6857
6858 /* store client name */
6859 mstrlcpy(pbuf->client_name, client_name.c_str(), sizeof(pbuf->client_name));
6860
6861 /* store buffer name */
6862 mstrlcpy(pbuf->buffer_name, buffer_name, sizeof(pbuf->buffer_name));
6863
6864 /* lock buffer semaphore to avoid race with bm_open_buffer() in a different program */
6865
6866 pbuf->attached = true; // required by bm_lock_buffer()
6867
6869
6870 if (!pbuf_guard.is_locked()) {
6871 // cannot happen, no other thread can see this pbuf
6872 abort();
6873 return BM_NO_SEMAPHORE;
6874 }
6875
6876 /* open shared memory */
6877
6878 void *p = NULL;
6879 status = ss_shm_open(buffer_name, sizeof(BUFFER_HEADER) + buffer_size, &p, &shm_size, &shm_handle, FALSE);
6880
6881 if (status != SS_SUCCESS && status != SS_CREATED) {
6882 *buffer_handle = 0;
6883 pbuf_guard.unlock();
6884 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6885 delete pbuf;
6886 return BM_NO_SHM;
6887 }
6888
6889 pbuf->buffer_header = (BUFFER_HEADER *) p;
6890
6891 BUFFER_HEADER *pheader = pbuf->buffer_header;
6892
6893 bool shm_created = (status == SS_CREATED);
6894
6895 if (shm_created) {
6896 /* initialize newly created shared memory */
6897
6898 memset(pheader, 0, sizeof(BUFFER_HEADER) + buffer_size);
6899
6900 mstrlcpy(pheader->name, buffer_name, sizeof(pheader->name));
6901 pheader->size = buffer_size;
6902
6903 } else {
6904 /* validate existing shared memory */
6905
6906 if (!equal_ustring(pheader->name, buffer_name)) {
6907 // unlock before calling cm_msg(). if we are SYSMSG, we wil ldeadlock. K.O.
6908 pbuf_guard.unlock();
6909 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6910 cm_msg(MERROR, "bm_open_buffer",
6911 "Buffer \"%s\" is corrupted, mismatch of buffer name in shared memory \"%s\"", buffer_name,
6912 pheader->name);
6913 *buffer_handle = 0;
6914 delete pbuf;
6915 return BM_CORRUPTED;
6916 }
6917
6918 if ((pheader->num_clients < 0) || (pheader->num_clients > MAX_CLIENTS)) {
6919 // unlock before calling cm_msg(). if we are SYSMSG, we wil ldeadlock. K.O.
6920 pbuf_guard.unlock();
6921 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6922 cm_msg(MERROR, "bm_open_buffer", "Buffer \"%s\" is corrupted, num_clients %d exceeds MAX_CLIENTS %d",
6924 *buffer_handle = 0;
6925 delete pbuf;
6926 return BM_CORRUPTED;
6927 }
6928
6929 if ((pheader->max_client_index < 0) || (pheader->max_client_index > MAX_CLIENTS)) {
6930 // unlock before calling cm_msg(). if we are SYSMSG, we wil ldeadlock. K.O.
6931 pbuf_guard.unlock();
6932 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6933 cm_msg(MERROR, "bm_open_buffer", "Buffer \"%s\" is corrupted, max_client_index %d exceeds MAX_CLIENTS %d",
6935 *buffer_handle = 0;
6936 delete pbuf;
6937 return BM_CORRUPTED;
6938 }
6939
6940 /* check if buffer size is identical */
6941 if (pheader->size != buffer_size) {
6942 cm_msg(MINFO, "bm_open_buffer", "Buffer \"%s\" requested size %d differs from existing size %d",
6943 buffer_name, buffer_size, pheader->size);
6944
6945 buffer_size = pheader->size;
6946
6947 ss_shm_close(buffer_name, p, shm_size, shm_handle, FALSE);
6948
6949 status = ss_shm_open(buffer_name, sizeof(BUFFER_HEADER) + buffer_size, &p, &shm_size, &shm_handle, FALSE);
6950
6951 if (status != SS_SUCCESS) {
6952 *buffer_handle = 0;
6953 pbuf_guard.unlock();
6954 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6955 delete pbuf;
6956 return BM_NO_SHM;
6957 }
6958
6959 pbuf->buffer_header = (BUFFER_HEADER *) p;
6960 pheader = pbuf->buffer_header;
6961 }
6962 }
6963
6964 /* shared memory is good from here down */
6965
6966 pbuf->attached = true;
6967
6968 pbuf->shm_handle = shm_handle;
6969 pbuf->shm_size = shm_size;
6970 pbuf->callback = FALSE;
6971
6972 bm_cleanup_buffer_locked(pbuf, "bm_open_buffer", ss_millitime());
6973
6975 if (status != BM_SUCCESS) {
6976 cm_msg(MERROR, "bm_open_buffer",
6977 "buffer \'%s\' is corrupted, bm_validate_buffer() status %d, calling bm_reset_buffer()...", buffer_name,
6978 status);
6980 cm_msg(MINFO, "bm_open_buffer", "buffer \'%s\' was reset, all buffered events were lost", buffer_name);
6981 }
6982
6983 /* add our client BUFFER_HEADER */
6984
6985 int iclient = 0;
6986 for (; iclient < MAX_CLIENTS; iclient++)
6987 if (pheader->client[iclient].pid == 0)
6988 break;
6989
6990 if (iclient == MAX_CLIENTS) {
6991 *buffer_handle = 0;
6992 // unlock before calling cm_msg(). if we are SYSMSG, we wil ldeadlock. K.O.
6993 pbuf_guard.unlock();
6994 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6995 delete pbuf;
6996 cm_msg(MERROR, "bm_open_buffer", "buffer \'%s\' maximum number of clients %d exceeded", buffer_name, MAX_CLIENTS);
6997 return BM_NO_SLOT;
6998 }
6999
7000 /* store slot index in _buffer structure */
7001 pbuf->client_index = iclient;
7002
7003 /*
7004 Save the index of the last client of that buffer so that later only
7005 the clients 0..max_client_index-1 have to be searched through.
7006 */
7007 pheader->num_clients++;
7008 if (iclient + 1 > pheader->max_client_index)
7009 pheader->max_client_index = iclient + 1;
7010
7011 /* setup buffer header and client structure */
7012 BUFFER_CLIENT *pclient = &pheader->client[iclient];
7013
7014 memset(pclient, 0, sizeof(BUFFER_CLIENT));
7015
7016 mstrlcpy(pclient->name, client_name.c_str(), sizeof(pclient->name));
7017
7018 pclient->pid = ss_getpid();
7019
7021
7022 pclient->read_pointer = pheader->write_pointer;
7023 pclient->last_activity = ss_millitime();
7024
7025 cm_get_watchdog_params(NULL, &pclient->watchdog_timeout);
7026
7027 pbuf_guard.unlock();
7028
7029 /* shared memory is not locked from here down, do not touch pheader and pbuf->buffer_header! */
7030
7031 pheader = NULL;
7032
7033 /* we are not holding any locks from here down, but other threads cannot see this pbuf yet */
7034
7037
7038 /* add pbuf to buffer list */
7039
7040 gBuffersMutex.lock();
7041
7042 bool added = false;
7043 for (size_t i=0; i<gBuffers.size(); i++) {
7044 if (gBuffers[i] == NULL) {
7045 gBuffers[i] = pbuf;
7046 added = true;
7047 *buffer_handle = i+1;
7048 break;
7049 }
7050 }
7051 if (!added) {
7052 *buffer_handle = gBuffers.size() + 1;
7053 gBuffers.push_back(pbuf);
7054 }
7055
7056 /* from here down we should not touch pbuf without locking it */
7057
7058 pbuf = NULL;
7059
7060 gBuffersMutex.unlock();
7061
7062 /* new buffer is now ready for use */
7063
7064 /* initialize buffer counters */
7065 bm_init_buffer_counters(*buffer_handle);
7066
7067 bm_cleanup("bm_open_buffer", ss_millitime(), FALSE);
7068
7069 if (shm_created)
7070 return BM_CREATED;
7071 }
7072#endif /* LOCAL_ROUTINES */
7073
7074 return BM_SUCCESS;
7075}
static void bm_cleanup_buffer_locked(BUFFER *pbuf, const char *who, DWORD actual_time)
Definition midas.cxx:6074
static DWORD _bm_max_event_size
Definition midas.cxx:5922
static void bm_clear_buffer_statistics(HNDLE hDB, BUFFER *pbuf)
Definition midas.cxx:6416
static void bm_reset_buffer_locked(BUFFER *pbuf)
Definition midas.cxx:6399
INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout)
Definition midas.cxx:3325
std::string cm_get_client_name()
Definition midas.cxx:2059
static void bm_cleanup(const char *who, DWORD actual_time, BOOL wrong_interval)
Definition midas.cxx:6160
#define BM_NO_SLOT
Definition midas.h:610
#define BM_NO_SHM
Definition midas.h:622
#define BM_CREATED
Definition midas.h:606
#define BM_NO_SEMAPHORE
Definition midas.h:611
#define SS_SUCCESS
Definition midas.h:663
#define SS_CREATED
Definition midas.h:664
#define SUCCESS
Definition mcstd.h:54
INT ss_semaphore_create(const char *name, HNDLE *semaphore_handle)
Definition system.cxx:2460
INT ss_getpid(void)
Definition system.cxx:1377
INT ss_shm_open(const char *name, INT size, void **adr, size_t *shm_size, HNDLE *handle, BOOL get_size)
Definition system.cxx:324
midas_thread_t ss_gettid(void)
Definition system.cxx:1519
INT ss_suspend_get_buffer_port(midas_thread_t thread_id, INT *port)
Definition system.cxx:4353
INT db_get_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, void *data, INT *buf_size, DWORD type, BOOL create)
Definition odb.cxx:5415
#define RPC_BM_OPEN_BUFFER
Definition mrpc.h:36
INT bm_init_buffer_counters(INT buffer_handle)
Definition midas.cxx:8071
#define DEFAULT_MAX_EVENT_SIZE
Definition midas.h:254
int client_count_write_wait[MAX_CLIENTS]
Definition midas.h:1022
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_peek_buffer_locked()

static int bm_peek_buffer_locked ( BUFFER pbuf,
BUFFER_HEADER pheader,
BUFFER_CLIENT pc,
EVENT_HEADER **  ppevent,
int pevent_size,
int ptotal_size 
)
static

Definition at line 8899 of file midas.cxx.

8900{
8901 if (pc->read_pointer == pheader->write_pointer) {
8902 /* no more events buffered for this client */
8903 if (!pc->read_wait) {
8904 //printf("bm_peek_buffer_locked: buffer [%s] client [%s], set read_wait!\n", pheader->name, pc->name);
8905 pc->read_wait = TRUE;
8906 }
8907 return BM_ASYNC_RETURN;
8908 }
8909
8910 if (pc->read_wait) {
8911 //printf("bm_peek_buffer_locked: buffer [%s] client [%s], clear read_wait!\n", pheader->name, pc->name);
8912 pc->read_wait = FALSE;
8913 }
8914
8915 if ((pc->read_pointer < 0) || (pc->read_pointer >= pheader->size)) {
8916 cm_msg(MERROR, "bm_peek_buffer_locked", "event buffer \"%s\" is corrupted: client \"%s\" read pointer %d is invalid. buffer read pointer %d, write pointer %d, size %d", pheader->name, pc->name, pc->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8917 return BM_CORRUPTED;
8918 }
8919
8920 char *pdata = (char *) (pheader + 1);
8921
8922 EVENT_HEADER *pevent = (EVENT_HEADER *) (pdata + pc->read_pointer);
8923 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
8924 int total_size = ALIGN8(event_size);
8925
8926 if ((total_size <= 0) || (total_size > pheader->size)) {
8927 cm_msg(MERROR, "bm_peek_buffer_locked", "event buffer \"%s\" is corrupted: client \"%s\" read pointer %d points to invalid event: data_size %d, event_size %d, total_size %d. buffer size: %d, read_pointer: %d, write_pointer: %d", pheader->name, pc->name, pc->read_pointer, pevent->data_size, event_size, total_size, pheader->size, pheader->read_pointer, pheader->write_pointer);
8928 return BM_CORRUPTED;
8929 }
8930
8931 assert(total_size > 0);
8932 assert(total_size <= pheader->size);
8933
8934 if (ppevent)
8935 *ppevent = pevent;
8936 if (pevent_size)
8938 if (ptotal_size)
8939 *ptotal_size = total_size;
8940
8941 return BM_SUCCESS;
8942}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_peek_read_cache_locked()

static BOOL bm_peek_read_cache_locked ( BUFFER pbuf,
EVENT_HEADER **  ppevent,
int pevent_size,
int ptotal_size 
)
static

Definition at line 8873 of file midas.cxx.

8874{
8875 if (pbuf->read_cache_rp == pbuf->read_cache_wp)
8876 return FALSE;
8877
8878 EVENT_HEADER *pevent = (EVENT_HEADER *) (pbuf->read_cache + pbuf->read_cache_rp);
8879 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
8880 int total_size = ALIGN8(event_size);
8881
8882 if (ppevent)
8883 *ppevent = pevent;
8884 if (pevent_size)
8886 if (ptotal_size)
8887 *ptotal_size = total_size;
8888
8889 return TRUE;
8890}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_poll_event()

INT bm_poll_event ( void  )

Definition at line 11134 of file midas.cxx.

11148{
11150
11151 //printf("bm_poll_event!\n");
11152
11153 DWORD start_time = ss_millitime();
11154
11155 std::vector<char> vec;
11156
11157 /* loop over all requests */
11158 _request_list_mutex.lock();
11159 bool locked = true;
11160 size_t n = _request_list.size();
11161 for (size_t i = 0; i < n; i++) {
11162 if (!locked) {
11163 _request_list_mutex.lock();
11164 locked = true;
11165 }
11166 /* continue if no dispatcher set (manual bm_receive_event) */
11167 if (_request_list[i].dispatcher == NULL)
11168 continue;
11169
11170 int buffer_handle = _request_list[i].buffer_handle;
11171
11172 /* must release the lock on the request list: user provided r.dispatcher() can add or remove event requests, and we will deadlock. K.O. */
11173 _request_list_mutex.unlock();
11174 locked = false;
11175
11176 do {
11177 /* receive event */
11178 int status = bm_receive_event_vec(buffer_handle, &vec, BM_NO_WAIT);
11179
11180 //printf("bm_poll_event: request_id %d, buffer_handle %d, bm_receive_event(BM_NO_WAIT) status %d, vec size %d, capacity %d\n", request_id, buffer_handle, status, (int)vec.size(), (int)vec.capacity());
11181
11182 /* call user function if successful */
11183 if (status == BM_SUCCESS) {
11184 bm_dispatch_event(buffer_handle, (EVENT_HEADER*)vec.data());
11186 }
11187
11188 /* break if no more events */
11189 if (status == BM_ASYNC_RETURN)
11190 break;
11191
11192 /* break if corrupted event buffer */
11193 if (status == BM_TRUNCATED) {
11194 cm_msg(MERROR, "bm_poll_event", "received event was truncated, buffer size %d is too small, see messages and increase /Experiment/MAX_EVENT_SIZE in ODB", (int)vec.size());
11195 }
11196
11197 /* break if corrupted event buffer */
11198 if (status == BM_CORRUPTED)
11199 return SS_ABORT;
11200
11201 /* break if server died */
11202 if (status == RPC_NET_ERROR) {
11203 return SS_ABORT;
11204 }
11205
11206 /* stop after one second */
11207 if (ss_millitime() - start_time > 1000) {
11208 break;
11209 }
11210
11211 } while (TRUE);
11212 }
11213
11214 if (locked)
11215 _request_list_mutex.unlock();
11216
11218 return BM_SUCCESS;
11219 else
11220 return BM_ASYNC_RETURN;
11221}
INT bm_receive_event_vec(INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
Definition midas.cxx:10817
static void bm_dispatch_event(int buffer_handle, EVENT_HEADER *pevent)
Definition midas.cxx:8831
#define BM_TRUNCATED
Definition midas.h:614
#define RPC_NET_ERROR
Definition midas.h:701
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_push_buffer()

static INT bm_push_buffer ( BUFFER pbuf,
int  buffer_handle 
)
static

Check a buffer if an event is available and call the dispatch function if found.

Parameters
buffer_name Name of the buffer to check
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_TRUNCATED, BM_ASYNC_RETURN, RPC_NET_ERROR

Definition at line 10910 of file midas.cxx.

/* bm_push_buffer(): poll one buffer for a pending event and, when a dispatch
   callback is registered on pbuf, read and dispatch it without blocking.
   (Doxygen-rendered fragment of midas.cxx; the leading numbers are the
   original source line numbers.) */
10910 {
10911 //printf("bm_push_buffer: buffer [%s], handle %d, callback %d\n", pbuf->buffer_header->name, buffer_handle, pbuf->callback);
10912
10913 /* return immediately if no callback routine is defined */
10914 if (!pbuf->callback)
10915 return BM_SUCCESS;
10916
/* BM_NO_WAIT: do not block if the buffer is empty; dispatch=TRUE makes
   bm_read_buffer() invoke the registered callback for a matching event */
10917 return bm_read_buffer(pbuf, buffer_handle, NULL, NULL, NULL, NULL, BM_NO_WAIT, 0, TRUE);
10918}
static INT bm_read_buffer(BUFFER *pbuf, INT buffer_handle, void **bufptr, void *buf, INT *buf_size, std::vector< char > *vecptr, int timeout_msec, int convert_flags, BOOL dispatch)
Definition midas.cxx:10272
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_read_buffer()

static INT bm_read_buffer ( BUFFER pbuf,
INT  buffer_handle,
void **  bufptr,
void buf,
INT buf_size,
std::vector< char > *  vecptr,
int  timeout_msec,
int  convert_flags,
BOOL  dispatch 
)
static

Definition at line 10272 of file midas.cxx.

10272 {
10274
10275 int max_size = 0;
10276 if (buf_size) {
10277 max_size = *buf_size;
10278 *buf_size = 0;
10279 }
10280
10281 //printf("bm_read_buffer: [%s] timeout %d, conv %d, ptr %p, buf %p, disp %d\n", pbuf->buffer_name, timeout_msec, convert_flags, bufptr, buf, dispatch);
10282
10283 bm_lock_buffer_guard pbuf_guard(pbuf, true); // buffer is not locked
10284
10285 // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
10286
10287 /* look if there is anything in the cache */
10288 if (pbuf->read_cache_size > 0) {
10289
10291
10292 if (status != BM_SUCCESS)
10293 return status;
10294
10295 if (pbuf->read_cache_wp == 0) {
10296
10297 // lock buffer for the first time
10298
10299 if (!pbuf_guard.relock()) {
10300 pbuf->read_cache_mutex.unlock();
10301 return pbuf_guard.get_status();
10302 }
10303
10305 if (status != BM_SUCCESS) {
10306 // unlock in correct order
10307 if (pbuf_guard.is_locked()) {
10308 // check if bm_wait_for_more_events() failed to relock the buffer
10309 pbuf_guard.unlock();
10310 }
10311 pbuf->read_cache_mutex.unlock();
10312 return status;
10313 }
10314
10315 // buffer remains locked here
10316 }
10317 EVENT_HEADER *pevent;
10318 int event_size;
10319 int total_size;
10320 if (bm_peek_read_cache_locked(pbuf, &pevent, &event_size, &total_size)) {
10321 if (pbuf_guard.is_locked()) {
10322 // do not need to keep the event buffer locked
10323 // when reading from the read cache
10324 pbuf_guard.unlock();
10325 }
10326 //printf("bm_read_buffer: [%s] async %d, conv %d, ptr %p, buf %p, disp %d, total_size %d, read from cache %d %d %d\n", pbuf->buffer_name, async_flag, convert_flags, bufptr, buf, dispatch, total_size, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
10328 if (buf) {
10329 if (event_size > max_size) {
10330 cm_msg(MERROR, "bm_read_buffer", "buffer size %d is smaller than event size %d, event truncated. buffer \"%s\"", max_size, event_size, pbuf->buffer_name);
10333 }
10334
10335 memcpy(buf, pevent, event_size);
10336
10337 if (buf_size) {
10338 *buf_size = event_size;
10339 }
10340 if (convert_flags) {
10341 bm_convert_event_header((EVENT_HEADER *) buf, convert_flags);
10342 }
10343 } else if (bufptr) {
10345 memcpy(*bufptr, pevent, event_size);
10347 } else if (vecptr) {
10348 vecptr->resize(0);
10349 char* cptr = (char*)pevent;
10350 vecptr->assign(cptr, cptr+event_size);
10351 }
10352 bm_incr_read_cache_locked(pbuf, total_size);
10353 pbuf->read_cache_mutex.unlock();
10354 if (dispatch) {
10355 // FIXME need to protect currently dispatched event against
10356 // another thread overwriting it by refilling the read cache
10357 bm_dispatch_event(buffer_handle, pevent);
10358 return BM_MORE_EVENTS;
10359 }
10360 // buffer is unlocked here
10361 return status;
10362 }
10363 pbuf->read_cache_mutex.unlock();
10364 }
10365
10366 /* we come here if the read cache is disabled */
10367 /* we come here if the next event is too big to fit into the read cache */
10368
10369 if (!pbuf_guard.is_locked()) {
10370 if (!pbuf_guard.relock())
10371 return pbuf_guard.get_status();
10372 }
10373
10375
10376 BUFFER_HEADER *pheader = pbuf->buffer_header;
10377
10379
10380 while (1) {
10381 /* loop over events in the event buffer */
10382
10384
10385 if (status != BM_SUCCESS) {
10386 // implicit unlock
10387 return status;
10388 }
10389
10390 /* check if event at current read pointer matches a request */
10391
10392 EVENT_HEADER *pevent;
10393 int event_size;
10394 int total_size;
10395
10396 status = bm_peek_buffer_locked(pbuf, pheader, pc, &pevent, &event_size, &total_size);
10397 if (status == BM_CORRUPTED) {
10398 // implicit unlock
10399 return status;
10400 } else if (status != BM_SUCCESS) {
10401 /* event buffer is empty */
10402 break;
10403 }
10404
10406
10407 if (is_requested) {
10408 //printf("bm_read_buffer: [%s] async %d, conv %d, ptr %p, buf %p, disp %d, total_size %d, read from buffer, cache %d %d %d\n", pheader->name, async_flag, convert_flags, bufptr, buf, dispatch, total_size, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
10409
10411
10412 if (buf) {
10413 if (event_size > max_size) {
10414 cm_msg(MERROR, "bm_read_buffer",
10415 "buffer size %d is smaller than event size %d, event truncated. buffer \"%s\"", max_size,
10416 event_size, pheader->name);
10419 }
10420
10421 bm_read_from_buffer_locked(pheader, pc->read_pointer, (char *) buf, event_size);
10422
10423 if (buf_size) {
10424 *buf_size = event_size;
10425 }
10426
10427 if (convert_flags) {
10428 bm_convert_event_header((EVENT_HEADER *) buf, convert_flags);
10429 }
10430
10431 pbuf->count_read++;
10432 pbuf->bytes_read += event_size;
10433 } else if (dispatch || bufptr) {
10434 assert(event_buffer == NULL); // make sure we only come here once
10436 bm_read_from_buffer_locked(pheader, pc->read_pointer, (char *) event_buffer, event_size);
10437 pbuf->count_read++;
10438 pbuf->bytes_read += event_size;
10439 } else if (vecptr) {
10440 bm_read_from_buffer_locked(pheader, pc->read_pointer, vecptr, event_size);
10441 pbuf->count_read++;
10442 pbuf->bytes_read += event_size;
10443 }
10444
10445 int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
10446 pc->read_pointer = new_read_pointer;
10447
10448 pheader->num_out_events++;
10449 /* exit loop over events */
10450 break;
10451 }
10452
10453 int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
10454 pc->read_pointer = new_read_pointer;
10455 pheader->num_out_events++;
10456 }
10457
10458 /*
10459 If read pointer has been changed, it may have freed up some space
10460 for waiting producers. So check if free space is now more than 50%
10461 of the buffer size and wake waiting producers.
10462 */
10463
10465
10466 pbuf_guard.unlock();
10467
10468 if (dispatch && event_buffer) {
10469 bm_dispatch_event(buffer_handle, event_buffer);
10470 free(event_buffer);
10472 return BM_MORE_EVENTS;
10473 }
10474
10475 if (bufptr && event_buffer) {
10479 }
10480
10481 if (event_buffer) {
10482 free(event_buffer);
10484 }
10485
10486 return status;
10487}
static void bm_convert_event_header(EVENT_HEADER *pevent, int convert_flags)
Definition midas.cxx:9077
static int bm_fill_read_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
Definition midas.cxx:9000
static BOOL bm_peek_read_cache_locked(BUFFER *pbuf, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
Definition midas.cxx:8873
static void bm_incr_read_cache_locked(BUFFER *pbuf, int total_size)
Definition midas.cxx:8863
void * event_buffer
Definition mfe.cxx:65
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_read_from_buffer_locked() [1/2]

static void bm_read_from_buffer_locked ( const BUFFER_HEADER pheader,
int  rp,
char buf,
int  event_size 
)
static

Definition at line 8944 of file midas.cxx.

/* bm_read_from_buffer_locked() [char* overload]: copy one event of
   event_size bytes out of the shared-memory ring buffer, starting at read
   pointer rp, into caller-supplied buf. Handles the wrap-around case where
   the event straddles the end of the buffer. Caller holds the buffer lock
   (per the _locked naming convention used throughout this file). */
 8945{
/* event data region starts immediately after the buffer header */
 8946 const char *pdata = (const char *) (pheader + 1);
 8947
 8948 if (rp + event_size <= pheader->size) {
 8949 /* contiguous case: single copy */
 8950 memcpy(buf, pdata + rp, event_size);
 8951 } else {
 8952 /* event wraps around the end of the ring buffer: copy in two pieces */
 8953 int size = pheader->size - rp;
 8954 memcpy(buf, pdata + rp, size);
 8955 memcpy(buf + size, pdata, event_size - size);
 8956 }
 8957}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_read_from_buffer_locked() [2/2]

static void bm_read_from_buffer_locked ( const BUFFER_HEADER pheader,
int  rp,
std::vector< char > *  vecptr,
int  event_size 
)
static

Definition at line 8959 of file midas.cxx.

/* bm_read_from_buffer_locked() [std::vector overload]: same as the char*
   overload, but the destination vector is resized to exactly event_size by
   assign()/insert(), so the caller needs no pre-sized buffer. Caller holds
   the buffer lock. */
 8960{
/* event data region starts immediately after the buffer header */
 8961 const char *pdata = (const char *) (pheader + 1);
 8962
 8963 if (rp + event_size <= pheader->size) {
 8964 /* contiguous case: single assign replaces the vector contents */
 8965 vecptr->assign(pdata + rp, pdata + rp + event_size);
 8966 } else {
 8967 /* event wraps around the end of the ring buffer: assign the tail, then append the head */
 8968 int size = pheader->size - rp;
 8969 vecptr->assign(pdata + rp, pdata + rp + size);
 8970 vecptr->insert(vecptr->end(), pdata, pdata + event_size - size);
 8971 }
 8972}
Here is the call graph for this function:

◆ bm_receive_event()

INT bm_receive_event ( INT  buffer_handle,
void destination,
INT buf_size,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could for example contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
INT bm_receive_event(INT buffer_handle, void *destination, INT *buf_size, int timeout_msec)
Definition midas.cxx:10658
Parameters
buffer_handle buffer handle
destination destination address where the event is written to
buf_size size of destination buffer on input, size of event plus header on return.
timeout_msec Wait so many milliseconds for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_TRUNCATED The event is larger than the destination buffer and was therefore truncated
BM_ASYNC_RETURN No event available

Definition at line 10658 of file midas.cxx.

10658 {
10659 //printf("bm_receive_event: handle %d, async %d\n", buffer_handle, async_flag);
10660 if (rpc_is_remote()) {
10661 return bm_receive_event_rpc(buffer_handle, destination, buf_size, NULL, NULL, timeout_msec);
10662 }
10663#ifdef LOCAL_ROUTINES
10664 {
10666
10667 BUFFER *pbuf = bm_get_buffer("bm_receive_event", buffer_handle, &status);
10668
10669 if (!pbuf)
10670 return status;
10671
10672 int convert_flags = rpc_get_convert_flags();
10673
10674 status = bm_read_buffer(pbuf, buffer_handle, NULL, destination, buf_size, NULL, timeout_msec, convert_flags, FALSE);
10675 //printf("bm_receive_event: handle %d, async %d, status %d, size %d\n", buffer_handle, async_flag, status, *buf_size);
10676 return status;
10677 }
10678#else /* LOCAL_ROUTINES */
10679
10680 return BM_SUCCESS;
10681#endif
10682}
static INT bm_receive_event_rpc(INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
Definition midas.cxx:10491
INT rpc_get_convert_flags(void)
Definition midas.cxx:13038
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_alloc()

INT bm_receive_event_alloc ( INT  buffer_handle,
EVENT_HEADER **  ppevent,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could for example contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
Parameters
buffer_handlebuffer handle
ppeventpointer to the received event pointer, event pointer should be free()ed to avoid memory leak
timeout_msecWait so many millisecond for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN No event available

Definition at line 10739 of file midas.cxx.

10739 {
10740 if (rpc_is_remote()) {
10741 return bm_receive_event_rpc(buffer_handle, NULL, NULL, ppevent, NULL, timeout_msec);
10742 }
10743#ifdef LOCAL_ROUTINES
10744 {
10746
10747 BUFFER *pbuf = bm_get_buffer("bm_receive_event_alloc", buffer_handle, &status);
10748
10749 if (!pbuf)
10750 return status;
10751
10752 int convert_flags = rpc_get_convert_flags();
10753
10754 return bm_read_buffer(pbuf, buffer_handle, (void **) ppevent, NULL, NULL, NULL, timeout_msec, convert_flags, FALSE);
10755 }
10756#else /* LOCAL_ROUTINES */
10757
10758 return BM_SUCCESS;
10759#endif
10760}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_rpc()

static INT bm_receive_event_rpc ( INT  buffer_handle,
void buf,
int buf_size,
EVENT_HEADER **  ppevent,
std::vector< char > *  pvec,
int  timeout_msec 
)
static

Definition at line 10491 of file midas.cxx.

10492{
10493 //printf("bm_receive_event_rpc: handle %d, buf %p, pevent %p, pvec %p, timeout %d, max_event_size %d\n", buffer_handle, buf, ppevent, pvec, timeout_msec, _bm_max_event_size);
10494
10495 assert(_bm_max_event_size > sizeof(EVENT_HEADER));
10496
10497 void *xbuf = NULL;
10498 int xbuf_size = 0;
10499
10500 if (buf) {
10501 xbuf = buf;
10502 xbuf_size = *buf_size;
10503 } else if (ppevent) {
10506 } else if (pvec) {
10507 pvec->resize(_bm_max_event_size);
10508 xbuf = pvec->data();
10509 xbuf_size = pvec->size();
10510 } else {
10511 assert(!"incorrect call to bm_receivent_event_rpc()");
10512 }
10513
10514 int status;
10517
10519
10520 int zbuf_size = xbuf_size;
10521
10522 while (1) {
10523 if (timeout_msec == BM_WAIT) {
10524 xtimeout_msec = 1000;
10525 } else if (timeout_msec == BM_NO_WAIT) {
10527 } else {
10528 if (xtimeout_msec > 1000) {
10529 xtimeout_msec = 1000;
10530 }
10531 }
10532
10534
10536
10537 //printf("bm_receive_event_rpc: handle %d, timeout %d, status %d, size %d in, %d out, via RPC_BM_RECEIVE_EVENT\n", buffer_handle, xtimeout_msec, status, xbuf_size, zbuf_size);
10538
10539 if (status == BM_ASYNC_RETURN) {
10540 if (timeout_msec == BM_WAIT) {
10541 // BM_WAIT means wait forever
10542 continue;
10543 } else if (timeout_msec == BM_NO_WAIT) {
10544 // BM_NO_WAIT means do not wait
10545 break;
10546 } else {
10548 if (now >= time_end) {
10549 // timeout, return BM_ASYNC_RETURN
10550 break;
10551 }
10552
10554
10555 if (remain < (DWORD)xtimeout_msec) {
10557 }
10558
10559 // keep asking for event...
10560 continue;
10561 }
10562 } else if (status == BM_SUCCESS) {
10563 // success, return BM_SUCCESS
10564 break;
10565 }
10566
10567 // RPC error
10568
10569 if (buf) {
10570 *buf_size = 0;
10571 } else if (ppevent) {
10572 free(*ppevent);
10573 *ppevent = NULL;
10574 } else if (pvec) {
10575 pvec->resize(0);
10576 } else {
10577 assert(!"incorrect call to bm_receivent_event_rpc()");
10578 }
10579
10580 return status;
10581 }
10582
10583 // status is BM_SUCCESS or BM_ASYNC_RETURN
10584
10585 if (buf) {
10586 *buf_size = zbuf_size;
10587 } else if (ppevent) {
10588 // nothing to do
10589 // ppevent = realloc(ppevent, xbuf_size); // shrink memory allocation
10590 } else if (pvec) {
10591 pvec->resize(zbuf_size);
10592 } else {
10593 assert(!"incorrect call to bm_receivent_event_rpc()");
10594 }
10595
10596 return status;
10597}
#define RPC_BM_RECEIVE_EVENT
Definition mrpc.h:47
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_vec()

INT bm_receive_event_vec ( INT  buffer_handle,
std::vector< char > *  pvec,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could for example contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
Parameters
buffer_handle buffer handle
pvec pointer to a std::vector&lt;char&gt; that receives the event data (resized to the size of the received event)
timeout_msec Wait so many milliseconds for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN No event available

Definition at line 10817 of file midas.cxx.

10817 {
10818 if (rpc_is_remote()) {
10819 return bm_receive_event_rpc(buffer_handle, NULL, NULL, NULL, pvec, timeout_msec);
10820 }
10821#ifdef LOCAL_ROUTINES
10822 {
10824
10825 BUFFER *pbuf = bm_get_buffer("bm_receive_event_vec", buffer_handle, &status);
10826
10827 if (!pbuf)
10828 return status;
10829
10830 int convert_flags = rpc_get_convert_flags();
10831
10832 return bm_read_buffer(pbuf, buffer_handle, NULL, NULL, NULL, pvec, timeout_msec, convert_flags, FALSE);
10833 }
10834#else /* LOCAL_ROUTINES */
10835 return BM_SUCCESS;
10836#endif
10837}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_remove_client_locked()

void bm_remove_client_locked ( BUFFER_HEADER pheader,
int  j 
)

Called to forcibly disconnect given client from a data buffer

Definition at line 6043 of file midas.cxx.

6043 {
6044 int k, nc;
6046
6047 /* clear entry from client structure in buffer header */
6048 memset(&(pheader->client[j]), 0, sizeof(BUFFER_CLIENT));
6049
6050 /* calculate new max_client_index entry */
6051 for (k = MAX_CLIENTS - 1; k >= 0; k--)
6052 if (pheader->client[k].pid != 0)
6053 break;
6054 pheader->max_client_index = k + 1;
6055
6056 /* count new number of clients */
6057 for (k = MAX_CLIENTS - 1, nc = 0; k >= 0; k--)
6058 if (pheader->client[k].pid != 0)
6059 nc++;
6060 pheader->num_clients = nc;
6061
6062 /* check if anyone is waiting and wake him up */
6063 pbctmp = pheader->client;
6064
6065 for (k = 0; k < pheader->max_client_index; k++, pbctmp++)
6066 if (pbctmp->pid && (pbctmp->write_wait || pbctmp->read_wait))
6067 ss_resume(pbctmp->port, "B ");
6068}
INT k
Definition odbhist.cxx:40
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_remove_event_request()

INT bm_remove_event_request ( INT  buffer_handle,
INT  request_id 
)

Delete a previously placed request for a specific event type in the client structure of the buffer refereced by buffer_handle.

Parameters
buffer_handle Handle to the buffer in which the request was placed
request_id Request id returned by bm_request_event
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_NOT_FOUND, RPC_NET_ERROR

Definition at line 8526 of file midas.cxx.

8526 {
8527 if (rpc_is_remote())
8528 return rpc_call(RPC_BM_REMOVE_EVENT_REQUEST, buffer_handle, request_id);
8529
8530#ifdef LOCAL_ROUTINES
8531 {
8532 int status = 0;
8533
8534 BUFFER *pbuf = bm_get_buffer("bm_remove_event_request", buffer_handle, &status);
8535
8536 if (!pbuf)
8537 return status;
8538
8539 /* lock buffer */
8541
8542 if (!pbuf_guard.is_locked())
8543 return pbuf_guard.get_status();
8544
8545 INT i, deleted;
8546
8547 /* get a pointer to the proper client structure */
8549
8550 /* check all requests and set to zero if matching */
8551 for (i = 0, deleted = 0; i < pclient->max_request_index; i++)
8552 if (pclient->event_request[i].valid && pclient->event_request[i].id == request_id) {
8553 memset(&pclient->event_request[i], 0, sizeof(EVENT_REQUEST));
8554 deleted++;
8555 }
8556
8557 /* calculate new max_request_index entry */
8558 for (i = MAX_EVENT_REQUESTS - 1; i >= 0; i--)
8559 if (pclient->event_request[i].valid)
8560 break;
8561
8562 pclient->max_request_index = i + 1;
8563
8564 /* calculate new all_flag */
8565 pclient->all_flag = FALSE;
8566
8567 for (i = 0; i < pclient->max_request_index; i++)
8568 if (pclient->event_request[i].valid && (pclient->event_request[i].sampling_type & GET_ALL)) {
8569 pclient->all_flag = TRUE;
8570 break;
8571 }
8572
8573 pbuf->get_all_flag = pclient->all_flag;
8574
8575 if (!deleted)
8576 return BM_NOT_FOUND;
8577 }
8578#endif /* LOCAL_ROUTINES */
8579
8580 return BM_SUCCESS;
8581}
#define RPC_BM_REMOVE_EVENT_REQUEST
Definition mrpc.h:44
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_request_event()

INT bm_request_event ( HNDLE  buffer_handle,
short int  event_id,
short int  trigger_mask,
INT  sampling_type,
HNDLE request_id,
EVENT_HANDLER func 
)

dox Place an event request based on certain characteristics. Multiple event requests can be placed for each buffer, which are later identified by their request ID. They can contain different callback routines. Example see bm_open_buffer() and bm_receive_event()

Parameters
buffer_handle buffer handle obtained via bm_open_buffer()
event_id event ID for requested events. Use EVENTID_ALL to receive events with any ID.
trigger_mask trigger mask for requested events. The requested events must have at least one bit in their trigger mask in common with the requested trigger mask. Use TRIGGER_ALL to receive events with any trigger mask.
sampling_type specifies how many events to receive. A value of GET_ALL receives all events which match the specified event ID and trigger mask. If the events are consumed slower than produced, the producer is automatically slowed down. A value of GET_NONBLOCKING receives as many events as possible without slowing down the producer. GET_ALL is typically used by the logger, while GET_NONBLOCKING is typically used by analyzers.
request_id request ID returned by the function. This ID is passed to the callback routine and must be used in the bm_delete_request() routine.
func callback routine which gets called when an event of the specified type is received.
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_NO_MEMORY too many requests. The value MAX_EVENT_REQUESTS in midas.h should be increased.

Definition at line 8473 of file midas.cxx.

8477{
8478 assert(request_id != NULL);
8479
8480 EventRequest r;
8481 r.buffer_handle = buffer_handle;
8482 r.event_id = event_id;
8484 r.dispatcher = func;
8485
8486 {
8487 std::lock_guard<std::mutex> guard(_request_list_mutex);
8488
8489 bool found = false;
8490
8491 // find deleted entry
8492 for (size_t i = 0; i < _request_list.size(); i++) {
8493 if (_request_list[i].buffer_handle == 0) {
8494 _request_list[i] = r;
8495 *request_id = i;
8496 found = true;
8497 break;
8498 }
8499 }
8500
8501 if (!found) { // not found
8502 *request_id = _request_list.size();
8503 _request_list.push_back(r);
8504 }
8505
8506 // implicit unlock()
8507 }
8508
8509 /* add request in buffer structure */
8510 int status = bm_add_event_request(buffer_handle, event_id, trigger_mask, sampling_type, func, *request_id);
8511 if (status != BM_SUCCESS)
8512 return status;
8513
8514 return BM_SUCCESS;
8515}
INT bm_add_event_request(INT buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, EVENT_HANDLER *func, INT request_id)
Definition midas.cxx:8322
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_reset_buffer_locked()

static void bm_reset_buffer_locked ( BUFFER pbuf)
static

Definition at line 6399 of file midas.cxx.

6399 {
6400 BUFFER_HEADER *pheader = pbuf->buffer_header;
6401
6402 //printf("bm_reset_buffer: buffer \"%s\"\n", pheader->name);
6403
6404 pheader->read_pointer = 0;
6405 pheader->write_pointer = 0;
6406
6407 int i;
6408 for (i = 0; i < pheader->max_client_index; i++) {
6409 BUFFER_CLIENT *pc = pheader->client + i;
6410 if (pc->pid) {
6411 pc->read_pointer = 0;
6412 }
6413 }
6414}
INT read_pointer
Definition midas.h:940
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_send_event()

INT bm_send_event ( INT  buffer_handle,
const EVENT_HEADER pevent,
int  unused,
int  timeout_msec 
)

Definition at line 9686 of file midas.cxx.

9687{
9688 const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16); // event size computations are not 32-bit clean, limit event size to 2GB. K.O.
9689 const DWORD data_size = pevent->data_size; // 32-bit unsigned value
9690
9691 if (data_size == 0) {
9692 cm_msg(MERROR, "bm_send_event", "invalid event data size zero");
9693 return BM_INVALID_SIZE;
9694 }
9695
9696 if (data_size > MAX_DATA_SIZE) {
9697 cm_msg(MERROR, "bm_send_event", "invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
9698 return BM_INVALID_SIZE;
9699 }
9700
9701 const size_t event_size = sizeof(EVENT_HEADER) + data_size;
9702
9703 //printf("bm_send_event: pevent %p, data_size %d, event_size %d, buf_size %d\n", pevent, data_size, event_size, unused);
9704
9705 if (rpc_is_remote()) {
9706 //return bm_send_event_rpc(buffer_handle, pevent, event_size, timeout_msec);
9707 return rpc_send_event_sg(buffer_handle, 1, (char**)&pevent, &event_size);
9708 } else {
9709 return bm_send_event_sg(buffer_handle, 1, (char**)&pevent, &event_size, timeout_msec);
9710 }
9711}
int bm_send_event_sg(int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
Definition midas.cxx:9786
#define BM_INVALID_SIZE
Definition midas.h:624
INT rpc_send_event_sg(INT buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[])
Definition midas.cxx:13933
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_send_event_sg()

int bm_send_event_sg ( int  buffer_handle,
int  sg_n,
const char *const  sg_ptr[],
const size_t  sg_len[],
int  timeout_msec 
)

Sends an event to a buffer. This function checks if the buffer has enough space for the event, then copies the event to the buffer in shared memory. If clients have requests for the event, they are notified via a UDP packet.

char event[1000];
// create event with ID 1, trigger mask 0, size 100 bytes and serial number 1
bm_compose_event((EVENT_HEADER *) event, 1, 0, 100, 1);
// set first byte of event
*(event+sizeof(EVENT_HEADER)) = <...>
#include <stdio.h>
#include "midas.h"
{
char event[1000];
status = cm_connect_experiment("", "Sample", "Producer", NULL);
return 1;
// create event with ID 1, trigger mask 0, size 100 bytes and serial number 1
bm_compose_event((EVENT_HEADER *) event, 1, 0, 100, 1);
// set event data
for (i=0 ; i<100 ; i++)
*(event+sizeof(EVENT_HEADER)+i) = i;
// send event
bm_send_event(hbuf, event, 100+sizeof(EVENT_HEADER), BM_WAIT);
return 0;
}
INT bm_send_event(INT buffer_handle, const EVENT_HEADER *pevent, int unused, int timeout_msec)
Definition midas.cxx:9686
Parameters
buffer_handle — Buffer handle obtained via bm_open_buffer()
source — Address of event buffer
buf_size — Size of event including event header in bytes
timeout_msec — Timeout waiting for free space in the event buffer. If BM_WAIT, wait forever. If BM_NO_WAIT, the function returns immediately with a value of BM_ASYNC_RETURN without writing the event to the buffer
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_INVALID_PARAM
BM_ASYNC_RETURN Routine called with timeout_msec == BM_NO_WAIT and buffer has not enough space to receive event
BM_NO_MEMORY Event is too large for network buffer or event buffer. One has to increase the event buffer size "/Experiment/Buffer sizes/SYSTEM" and/or /Experiment/MAX_EVENT_SIZE in ODB.

Definition at line 9786 of file midas.cxx.

9787{
9788 if (rpc_is_remote())
9789 return rpc_send_event_sg(buffer_handle, sg_n, sg_ptr, sg_len);
9790
9791 if (sg_n < 1) {
9792 cm_msg(MERROR, "bm_send_event", "invalid sg_n %d", sg_n);
9793 return BM_INVALID_SIZE;
9794 }
9795
9796 if (sg_ptr[0] == NULL) {
9797 cm_msg(MERROR, "bm_send_event", "invalid sg_ptr[0] is NULL");
9798 return BM_INVALID_SIZE;
9799 }
9800
9801 if (sg_len[0] < sizeof(EVENT_HEADER)) {
9802 cm_msg(MERROR, "bm_send_event", "invalid sg_len[0] value %d is smaller than event header size %d", (int)sg_len[0], (int)sizeof(EVENT_HEADER));
9803 return BM_INVALID_SIZE;
9804 }
9805
9806 const EVENT_HEADER* pevent = (const EVENT_HEADER*)sg_ptr[0];
9807
9808 const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16); // event size computations are not 32-bit clean, limit event size to 2GB. K.O.
9809 const DWORD data_size = pevent->data_size; // 32-bit unsigned value
9810
9811 if (data_size == 0) {
9812 cm_msg(MERROR, "bm_send_event", "invalid event data size zero");
9813 return BM_INVALID_SIZE;
9814 }
9815
9816 if (data_size > MAX_DATA_SIZE) {
9817 cm_msg(MERROR, "bm_send_event", "invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
9818 return BM_INVALID_SIZE;
9819 }
9820
9821 const size_t event_size = sizeof(EVENT_HEADER) + data_size;
9822
9823 size_t count = 0;
9824 for (int i=0; i<sg_n; i++) {
9825 count += sg_len[i];
9826 }
9827
9828 if (count != event_size) {
9829 cm_msg(MERROR, "bm_send_event", "data size mismatch: event data_size %d, event_size %d not same as sum of sg_len %d", (int)data_size, (int)event_size, (int)count);
9830 return BM_INVALID_SIZE;
9831 }
9832
9833 //printf("bm_send_event_sg: pevent %p, event_id 0x%04x, serial 0x%08x, data_size %d, event_size %d, total_size %d\n", pevent, pevent->event_id, pevent->serial_number, (int)pevent->data_size, (int)event_size, (int)total_size);
9834
9835#ifdef LOCAL_ROUTINES
9836 {
9837 int status = 0;
9838 const size_t total_size = ALIGN8(event_size);
9839
9840 BUFFER *pbuf = bm_get_buffer("bm_send_event_sg", buffer_handle, &status);
9841
9842 if (!pbuf)
9843 return status;
9844
9845 /* round up total_size to next DWORD boundary */
9846 //int total_size = ALIGN8(event_size);
9847
9848 /* check if write cache is enabled */
9849 if (pbuf->write_cache_size) {
9851
9852 if (status != BM_SUCCESS)
9853 return status;
9854
9855 /* check if write cache is enabled */
9856 if (pbuf->write_cache_size) {
9857 size_t max_event_size = pbuf->write_cache_size/MAX_WRITE_CACHE_EVENT_SIZE_DIV;
9859
9860 //printf("bm_send_event: write %zu/%zu max %zu, cache size %zu, wp %zu\n", event_size, total_size, max_event_size, pbuf->write_cache_size.load(), pbuf->write_cache_wp);
9861
9862 /* if this event does not fit into the write cache, flush the write cache */
9863 if (pbuf->write_cache_wp > 0 && (pbuf->write_cache_wp + total_size > pbuf->write_cache_size || too_big)) {
9864 //printf("bm_send_event: write %zu/%zu but cache is full, size %zu, wp %zu\n", event_size, total_size, pbuf->write_cache_size.load(), pbuf->write_cache_wp);
9865
9867
9868 if (!pbuf_guard.is_locked()) {
9869 pbuf->write_cache_mutex.unlock();
9870 return pbuf_guard.get_status();
9871 }
9872
9874
9875 if (pbuf_guard.is_locked()) {
9876 // check if bm_wait_for_free_space() failed to relock the buffer
9877 pbuf_guard.unlock();
9878 }
9879
9880 if (status != BM_SUCCESS) {
9881 pbuf->write_cache_mutex.unlock();
9882 // bm_flush_cache() failed: timeout in bm_wait_for_free_space() or write cache size is bigger than buffer size or buffer was closed.
9883 if (status == BM_NO_MEMORY)
9884 cm_msg(MERROR, "bm_send_event", "write cache size is bigger than buffer size");
9885 return status;
9886 }
9887
9888 // write cache must be empty here
9889 assert(pbuf->write_cache_wp == 0);
9890 }
9891
9892 /* write this event into the write cache, if it is not too big and if it fits */
9893 if (!too_big && pbuf->write_cache_wp + total_size <= pbuf->write_cache_size) {
9894 //printf("bm_send_event: write %d/%d to cache size %d, wp %d\n", (int)event_size, (int)total_size, (int)pbuf->write_cache_size, (int)pbuf->write_cache_wp);
9895
9896 char* wptr = pbuf->write_cache + pbuf->write_cache_wp;
9897
9898 for (int i=0; i<sg_n; i++) {
9899 memcpy(wptr, sg_ptr[i], sg_len[i]);
9900 wptr += sg_len[i];
9901 }
9902
9903 pbuf->write_cache_wp += total_size;
9904
9905 pbuf->write_cache_mutex.unlock();
9906 return BM_SUCCESS;
9907 }
9908 }
9909
9910 /* event did not fit into the write cache, we flushed the write cache and we send it directly to shared memory */
9911 pbuf->write_cache_mutex.unlock();
9912 }
9913
9914 /* we come here only for events that are too big to fit into the cache */
9915
9916 /* lock the buffer */
9918
9919 if (!pbuf_guard.is_locked()) {
9920 return pbuf_guard.get_status();
9921 }
9922
9923 /* calculate some shorthands */
9924 BUFFER_HEADER *pheader = pbuf->buffer_header;
9925
9926#if 0
9928 if (status != BM_SUCCESS) {
9929 printf("bm_send_event: corrupted 111!\n");
9930 abort();
9931 }
9932#endif
9933
9934 /* check if buffer is large enough */
9935 if (total_size >= (size_t)pheader->size) {
9936 pbuf_guard.unlock(); // unlock before cm_msg()
9937 cm_msg(MERROR, "bm_send_event", "total event size (%d) larger than size (%d) of buffer \'%s\'", (int)total_size, pheader->size, pheader->name);
9938 return BM_NO_MEMORY;
9939 }
9940
9942
9943 if (status != BM_SUCCESS) {
9944 // implicit unlock
9945 return status;
9946 }
9947
9948#if 0
9950 if (status != BM_SUCCESS) {
9951 printf("bm_send_event: corrupted 222!\n");
9952 abort();
9953 }
9954#endif
9955
9956 int old_write_pointer = pheader->write_pointer;
9957
9958 bm_write_to_buffer_locked(pheader, sg_n, sg_ptr, sg_len, total_size);
9959
 9960 /* write pointer was incremented, but there should
 9961 * always be some free space in the buffer and the
 9962 * write pointer should never catch up to the read pointer:
 9963 * the rest of the code gets confused if this happens (buffer 100% full)
 9964 * as it is, write_pointer == read_pointer can mean either
 9965 * 100% full or 100% empty. My solution: never fill
 9966 * the buffer to 100% */
9967 assert(pheader->write_pointer != pheader->read_pointer);
9968
9969 /* send wake up messages to all clients that want this event */
9970 int i;
9971 for (i = 0; i < pheader->max_client_index; i++) {
9972 BUFFER_CLIENT *pc = pheader->client + i;
9973 int request_id = bm_find_first_request_locked(pc, pevent);
9974 bm_notify_reader_locked(pheader, pc, old_write_pointer, request_id);
9975 }
9976
9977#if 0
9979 if (status != BM_SUCCESS) {
9980 printf("bm_send_event: corrupted 333!\n");
9981 abort();
9982 }
9983#endif
9984
9985 /* update statistics */
9986 pheader->num_in_events++;
9987 pbuf->count_sent += 1;
9988 pbuf->bytes_sent += total_size;
9989 }
9990#endif /* LOCAL_ROUTINES */
9991
9992 return BM_SUCCESS;
9993}
double count
Definition mdump.cxx:33
INT max_event_size
Definition mfed.cxx:30
#define MAX_WRITE_CACHE_EVENT_SIZE_DIV
Definition midas.h:259
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_send_event_vec() [1/2]

int bm_send_event_vec ( int  buffer_handle,
const std::vector< char > &  event,
int  timeout_msec 
)

Definition at line 9713 of file midas.cxx.

9714{
9715 const char* cptr = event.data();
9716 size_t clen = event.size();
9717 return bm_send_event_sg(buffer_handle, 1, &cptr, &clen, timeout_msec);
9718}
Here is the call graph for this function:

◆ bm_send_event_vec() [2/2]

int bm_send_event_vec ( int  buffer_handle,
const std::vector< std::vector< char > > &  event,
int  timeout_msec 
)

Definition at line 9720 of file midas.cxx.

9721{
9722 int sg_n = event.size();
9723 const char* sg_ptr[sg_n];
9724 size_t sg_len[sg_n];
9725 for (int i=0; i<sg_n; i++) {
9726 sg_ptr[i] = event[i].data();
9727 sg_len[i] = event[i].size();
9728 }
9729 return bm_send_event_sg(buffer_handle, sg_n, sg_ptr, sg_len, timeout_msec);
9730}
Here is the call graph for this function:

◆ bm_set_cache_size()

INT bm_set_cache_size ( INT  buffer_handle,
size_t  read_size,
size_t  write_size 
)

Modifies buffer cache size. Without a buffer cache, events are copied to/from the shared memory event by event.

To protect processes from accessing the shared memory simultaneously, semaphores are used. Since semaphore operations are CPU consuming (typically 50-100us) this can slow down the data transfer, especially for small events. By using a cache, the number of semaphore operations is reduced dramatically. Instead of writing directly to the shared memory, the events are copied to a local cache buffer. When this buffer is full, it is copied to the shared memory in one operation. The same technique can be used when receiving events.

The drawback of this method is that the events have to be copied twice, once to the cache and once from the cache to the shared memory. Therefore it can happen that the usage of a cache even slows down data throughput in a given environment (computer type, OS type, event size). The cache size therefore has to be optimized manually to maximize data throughput.

Parameters
buffer_handle — buffer handle obtained via bm_open_buffer()
read_size — cache size for reading events in bytes, zero for no cache
write_size — cache size for writing events in bytes, zero for no cache
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_NO_MEMORY, BM_INVALID_PARAM

Definition at line 8148 of file midas.cxx.

8150{
8151 if (rpc_is_remote())
8152 return rpc_call(RPC_BM_SET_CACHE_SIZE, buffer_handle, read_size, write_size);
8153
8154#ifdef LOCAL_ROUTINES
8155 {
8156 int status = 0;
8157
8158 BUFFER *pbuf = bm_get_buffer("bm_set_cache_size", buffer_handle, &status);
8159
8160 if (!pbuf)
8161 return status;
8162
8163 /* lock pbuf for local access. we do not lock buffer semaphore because we do not touch the shared memory */
8164
8166
8167 if (status != BM_SUCCESS)
8168 return status;
8169
8170 if (write_size < 0)
8171 write_size = 0;
8172
8173 if (write_size > 0) {
8175 cm_msg(MERROR, "bm_set_cache_size", "requested write cache size %zu on buffer \"%s\" too small, will use minimum size %d", write_size, pbuf->buffer_name, MIN_WRITE_CACHE_SIZE);
8177 }
8178 }
8179
8180 size_t max_write_size = pbuf->buffer_header->size/MAX_WRITE_CACHE_SIZE_DIV;
8181
8182 if (write_size > max_write_size) {
8184 cm_msg(MERROR, "bm_set_cache_size", "requested write cache size %zu on buffer \"%s\" is too big: buffer size is %d, write cache size will be %zu bytes", write_size, pbuf->buffer_name, pbuf->buffer_header->size, new_write_size);
8186 }
8187
8188 pbuf->buffer_mutex.unlock();
8189
8190 /* resize read cache */
8191
8193
8194 if (status != BM_SUCCESS) {
8195 return status;
8196 }
8197
8198 if (pbuf->read_cache_size > 0) {
8199 free(pbuf->read_cache);
8200 pbuf->read_cache = NULL;
8201 }
8202
8203 if (read_size > 0) {
8204 pbuf->read_cache = (char *) malloc(read_size);
8205 if (pbuf->read_cache == NULL) {
8206 pbuf->read_cache_size = 0;
8207 pbuf->read_cache_rp = 0;
8208 pbuf->read_cache_wp = 0;
8209 pbuf->read_cache_mutex.unlock();
8210 cm_msg(MERROR, "bm_set_cache_size", "not enough memory to allocate read cache for buffer \"%s\", malloc(%zu) failed", pbuf->buffer_name, read_size);
8211 return BM_NO_MEMORY;
8212 }
8213 }
8214
8215 pbuf->read_cache_size = read_size;
8216 pbuf->read_cache_rp = 0;
8217 pbuf->read_cache_wp = 0;
8218
8219 pbuf->read_cache_mutex.unlock();
8220
8221 /* resize the write cache */
8222
8224
8225 if (status != BM_SUCCESS)
8226 return status;
8227
8228 // FIXME: should flush the write cache!
8229 if (pbuf->write_cache_size && pbuf->write_cache_wp > 0) {
8230 cm_msg(MERROR, "bm_set_cache_size", "buffer \"%s\" lost %zu bytes from the write cache", pbuf->buffer_name, pbuf->write_cache_wp);
8231 }
8232
8233 /* manage write cache */
8234 if (pbuf->write_cache_size > 0) {
8235 free(pbuf->write_cache);
8236 pbuf->write_cache = NULL;
8237 }
8238
8239 if (write_size > 0) {
8240 pbuf->write_cache = (char *) M_MALLOC(write_size);
8241 if (pbuf->write_cache == NULL) {
8242 pbuf->write_cache_size = 0;
8243 pbuf->write_cache_rp = 0;
8244 pbuf->write_cache_wp = 0;
8245 pbuf->write_cache_mutex.unlock();
8246 cm_msg(MERROR, "bm_set_cache_size", "not enough memory to allocate write cache for buffer \"%s\", malloc(%zu) failed", pbuf->buffer_name, write_size);
8247 return BM_NO_MEMORY;
8248 }
8249 }
8250
8251 pbuf->write_cache_size = write_size;
8252 pbuf->write_cache_rp = 0;
8253 pbuf->write_cache_wp = 0;
8254
8255 pbuf->write_cache_mutex.unlock();
8256 }
8257#endif /* LOCAL_ROUTINES */
8258
8259 return BM_SUCCESS;
8260}
static int bm_lock_buffer_mutex(BUFFER *pbuf)
Definition midas.cxx:7954
#define RPC_BM_SET_CACHE_SIZE
Definition mrpc.h:42
#define M_MALLOC(x)
Definition midas.h:1550
#define MIN_WRITE_CACHE_SIZE
Definition midas.h:257
#define MAX_WRITE_CACHE_SIZE_DIV
Definition midas.h:258
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_skip_event() [1/2]

static int bm_skip_event ( BUFFER pbuf)
static

Definition at line 10841 of file midas.cxx.

10842{
10843 /* clear read cache */
10844 if (pbuf->read_cache_size > 0) {
10845
10847
10848 if (status != BM_SUCCESS)
10849 return status;
10850
10851 pbuf->read_cache_rp = 0;
10852 pbuf->read_cache_wp = 0;
10853
10854 pbuf->read_cache_mutex.unlock();
10855 }
10856
10858
10859 if (!pbuf_guard.is_locked())
10860 return pbuf_guard.get_status();
10861
10862 BUFFER_HEADER *pheader = pbuf->buffer_header;
10863
10864 /* forward read pointer to global write pointer */
10866 pclient->read_pointer = pheader->write_pointer;
10867
10868 return BM_SUCCESS;
10869}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_skip_event() [2/2]

INT bm_skip_event ( INT  buffer_handle)

Skip all events in current buffer.

Useful for single event displays to see the newest events

Parameters
buffer_handle — Handle of the buffer. Must be obtained via bm_open_buffer.
Returns
BM_SUCCESS, BM_INVALID_HANDLE, RPC_NET_ERROR

Definition at line 10882 of file midas.cxx.

10882 {
10883 if (rpc_is_remote())
10884 return rpc_call(RPC_BM_SKIP_EVENT, buffer_handle);
10885
10886#ifdef LOCAL_ROUTINES
10887 {
10888 int status = 0;
10889
10890 BUFFER *pbuf = bm_get_buffer("bm_skip_event", buffer_handle, &status);
10891
10892 if (!pbuf)
10893 return status;
10894
10895 return bm_skip_event(pbuf);
10896 }
10897#endif
10898
10899 return BM_SUCCESS;
10900}
#define RPC_BM_SKIP_EVENT
Definition mrpc.h:50
Here is the call graph for this function:

◆ bm_update_last_activity()

static void bm_update_last_activity ( DWORD  millitime)
static

Update last activity time

Definition at line 6125 of file midas.cxx.

6125 {
6126 int pid = ss_getpid();
6127
6128 std::vector<BUFFER*> mybuffers;
6129
6130 gBuffersMutex.lock();
6132 gBuffersMutex.unlock();
6133
6134 for (BUFFER* pbuf : mybuffers) {
6135 if (!pbuf)
6136 continue;
6137 if (pbuf->attached) {
6138
6140
6141 if (!pbuf_guard.is_locked())
6142 continue;
6143
6144 BUFFER_HEADER *pheader = pbuf->buffer_header;
6145 for (int j = 0; j < pheader->max_client_index; j++) {
6146 BUFFER_CLIENT *pclient = pheader->client + j;
6147 if (pclient->pid == pid) {
6149 }
6150 }
6151 }
6152 }
6153}
DWORD last_activity
Definition midas.h:950
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_update_read_pointer_locked()

static BOOL bm_update_read_pointer_locked ( const char caller_name,
BUFFER_HEADER pheader 
)
static

Definition at line 8728 of file midas.cxx.

8728 {
8729 assert(caller_name);
8730
8731 /* calculate global read pointer as "minimum" of client read pointers */
8732 int min_rp = pheader->write_pointer;
8733
8734 int i;
8735 for (i = 0; i < pheader->max_client_index; i++) {
8736 BUFFER_CLIENT *pc = pheader->client + i;
8737 if (pc->pid) {
8739
8740#if 0
8741 printf("bm_update_read_pointer: [%s] rp %d, wp %d, size %d, min_rp %d, client [%s] rp %d\n",
8742 pheader->name,
8743 pheader->read_pointer,
8744 pheader->write_pointer,
8745 pheader->size,
8746 min_rp,
8747 pc->name,
8748 pc->read_pointer);
8749#endif
8750
8751 if (pheader->read_pointer <= pheader->write_pointer) {
8752 // normal pointers
8753 if (pc->read_pointer < min_rp)
8754 min_rp = pc->read_pointer;
8755 } else {
8756 // inverted pointers
8757 if (pc->read_pointer <= pheader->write_pointer) {
8758 // clients 3 and 4
8759 if (pc->read_pointer < min_rp)
8760 min_rp = pc->read_pointer;
8761 } else {
8762 // clients 1 and 2
8763 int xptr = pc->read_pointer - pheader->size;
8764 if (xptr < min_rp)
8765 min_rp = xptr;
8766 }
8767 }
8768 }
8769 }
8770
8771 if (min_rp < 0)
8772 min_rp += pheader->size;
8773
8774 assert(min_rp >= 0);
8775 assert(min_rp < pheader->size);
8776
8777 if (min_rp == pheader->read_pointer) {
8778 return FALSE;
8779 }
8780
8781#if 0
8782 printf("bm_update_read_pointer: [%s] rp %d, wp %d, size %d, new_rp %d, moved\n",
8783 pheader->name,
8784 pheader->read_pointer,
8785 pheader->write_pointer,
8786 pheader->size,
8787 min_rp);
8788#endif
8789
8790 pheader->read_pointer = min_rp;
8791
8792 return TRUE;
8793}
static void bm_validate_client_pointers_locked(const BUFFER_HEADER *pheader, BUFFER_CLIENT *pclient)
Definition midas.cxx:8630
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_buffer_locked()

static int bm_validate_buffer_locked ( const BUFFER pbuf)
static

Definition at line 6315 of file midas.cxx.

6315 {
6316 const BUFFER_HEADER *pheader = pbuf->buffer_header;
6317 const char *pdata = (const char *) (pheader + 1);
6318
6319 //printf("bm_validate_buffer: buffer \"%s\"\n", pheader->name);
6320
6321 //printf("size: %d, rp: %d, wp: %d\n", pheader->size, pheader->read_pointer, pheader->write_pointer);
6322
6323 //printf("clients: max: %d, num: %d, MAX_CLIENTS: %d\n", pheader->max_client_index, pheader->num_clients, MAX_CLIENTS);
6324
6325 if (pheader->read_pointer < 0 || pheader->read_pointer >= pheader->size) {
6326 cm_msg(MERROR, "bm_validate_buffer",
6327 "buffer \"%s\" is corrupted: invalid read pointer %d. Size %d, write pointer %d", pheader->name,
6328 pheader->read_pointer, pheader->size, pheader->write_pointer);
6329 return BM_CORRUPTED;
6330 }
6331
6332 if (pheader->write_pointer < 0 || pheader->write_pointer >= pheader->size) {
6333 cm_msg(MERROR, "bm_validate_buffer",
6334 "buffer \"%s\" is corrupted: invalid write pointer %d. Size %d, read pointer %d", pheader->name,
6335 pheader->write_pointer, pheader->size, pheader->read_pointer);
6336 return BM_CORRUPTED;
6337 }
6338
6339 if (!bm_validate_rp("bm_validate_buffer_locked", pheader, pheader->read_pointer)) {
6340 cm_msg(MERROR, "bm_validate_buffer", "buffer \"%s\" is corrupted: read pointer %d is invalid", pheader->name,
6341 pheader->read_pointer);
6342 return BM_CORRUPTED;
6343 }
6344
6345 int rp = pheader->read_pointer;
6346 int rp0 = -1;
6347 while (rp != pheader->write_pointer) {
6348 if (!bm_validate_rp("bm_validate_buffer_locked", pheader, rp)) {
6349 cm_msg(MERROR, "bm_validate_buffer", "buffer \"%s\" is corrupted: invalid rp %d, last good event at rp %d",
6350 pheader->name, rp, rp0);
6351 return BM_CORRUPTED;
6352 }
6353 //bm_print_event(pdata, rp);
6354 int rp1 = bm_next_rp("bm_validate_buffer_locked", pheader, pdata, rp);
6355 if (rp1 < 0) {
6356 cm_msg(MERROR, "bm_validate_buffer",
6357 "buffer \"%s\" is corrupted: invalid event at rp %d, last good event at rp %d", pheader->name, rp, rp0);
6358 return BM_CORRUPTED;
6359 }
6360 rp0 = rp;
6361 rp = rp1;
6362 }
6363
6364 int i;
6365 for (i = 0; i < MAX_CLIENTS; i++) {
6366 const BUFFER_CLIENT *c = &pheader->client[i];
6367 if (c->pid == 0)
6368 continue;
6369 BOOL get_all = FALSE;
6370 int j;
6371 for (j = 0; j < MAX_EVENT_REQUESTS; j++) {
6372 const EVENT_REQUEST *r = &c->event_request[j];
6373 if (!r->valid)
6374 continue;
6376 get_all = (get_all || xget_all);
6377 //printf("client slot %d: pid %d, name \"%s\", request %d: id %d, valid %d, sampling_type %d, get_all %d\n", i, c->pid, c->name, j, r->id, r->valid, r->sampling_type, xget_all);
6378 }
6379
6380 int rp = c->read_pointer;
6381 int rp0 = -1;
6382 while (rp != pheader->write_pointer) {
6383 //bm_print_event(pdata, rp);
6384 int rp1 = bm_next_rp("bm_validate_buffer_locked", pheader, pdata, rp);
6385 if (rp1 < 0) {
6386 cm_msg(MERROR, "bm_validate_buffer",
6387 "buffer \"%s\" is corrupted for client \"%s\" rp %d: invalid event at rp %d, last good event at rp %d",
6388 pheader->name, c->name, c->read_pointer, rp, rp0);
6389 return BM_CORRUPTED;
6390 }
6391 rp0 = rp;
6392 rp = rp1;
6393 }
6394 }
6395
6396 return BM_SUCCESS;
6397}
static BOOL bm_validate_rp(const char *who, const BUFFER_HEADER *pheader, int rp)
Definition midas.cxx:6197
static int bm_next_rp(const char *who, const BUFFER_HEADER *pheader, const char *pdata, int rp)
Definition midas.cxx:6264
INT sampling_type
Definition midas.h:931
BOOL valid
Definition midas.h:928
char c
Definition system.cxx:1310
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_client_index_locked()

static int bm_validate_client_index_locked ( bm_lock_buffer_guard pbuf_guard)
static

Definition at line 5930 of file midas.cxx.

5931{
5932 const BUFFER *pbuf = pbuf_guard.get_pbuf();
5933
5934 bool badindex = false;
5935 bool badclient = false;
5936
5937 int idx = pbuf->client_index;
5938
5939 if (idx < 0) {
5940 badindex = true;
5941 } else if (idx > pbuf->buffer_header->max_client_index) {
5942 badindex = true;
5943 } else {
5944 BUFFER_CLIENT *pclient = &pbuf->buffer_header->client[idx];
5945 if (pclient->name[0] == 0)
5946 badclient = true;
5947 else if (pclient->pid != ss_getpid())
5948 badclient = true;
5949
5950 //if (strcmp(pclient->name,"mdump")==0) {
5951 // for (int i=0; i<15; i++) {
5952 // printf("sleep %d\n", i);
5953 // ::sleep(1);
5954 // }
5955 //}
5956 }
5957
5958#if 0
5959 if (badindex) {
5960 printf("bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, badindex %d, pid=%d\n",
5961 pbuf, pbuf->buffer_header->name, pbuf->client_index, pbuf->buffer_header->max_client_index,
5962 badindex, ss_getpid());
5963 } else if (badclient) {
5964 printf("bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, badclient %d\n",
5965 pbuf, pbuf->buffer_header->name, pbuf->client_index, pbuf->buffer_header->max_client_index,
5966 pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid,
5967 ss_getpid(), badclient);
5968 } else {
5969 printf("bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, goodclient\n",
5970 pbuf, pbuf->buffer_header->name, pbuf->client_index, pbuf->buffer_header->max_client_index,
5971 pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid,
5972 ss_getpid());
5973 }
5974#endif
5975
5976 if (badindex || badclient) {
5977 static int prevent_recursion = 1;
5978
5979 if (prevent_recursion) {
5981
5982 if (badindex) {
5983 cm_msg(MERROR, "bm_validate_client_index", "My client index %d in buffer \'%s\' is invalid, max_client_index %d, my pid %d", idx, pbuf->buffer_header->name, pbuf->buffer_header->max_client_index, ss_getpid());
5984 } else {
5985 cm_msg(MERROR, "bm_validate_client_index", "My client index %d in buffer \'%s\' is invalid: client name \'%s\', pid %d should be my pid %d", idx, pbuf->buffer_header->name, pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid, ss_getpid());
5986 }
5987
5988 cm_msg(MERROR, "bm_validate_client_index", "Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...");
5989 }
5990
5991 if (badindex) {
5992 fprintf(stderr, "bm_validate_client_index: My client index %d in buffer \'%s\' is invalid, max_client_index %d, my pid %d\n", idx, pbuf->buffer_header->name, pbuf->buffer_header->max_client_index, ss_getpid());
5993 } else {
5994 fprintf(stderr, "bm_validate_client_index: My client index %d in buffer \'%s\' is invalid: client name \'%s\', pid %d should be my pid %d\n", idx, pbuf->buffer_header->name, pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid, ss_getpid());
5995 }
5996
5997 fprintf(stderr, "bm_validate_client_index: Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...\n");
5998
5999 pbuf_guard.unlock();
6000
6001 abort();
6002 }
6003
6004 return idx;
6005}
INT client_index
Definition midas.h:989
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_client_pointers_locked()

static void bm_validate_client_pointers_locked ( const BUFFER_HEADER pheader,
BUFFER_CLIENT pclient 
)
static

Definition at line 8630 of file midas.cxx.

8630 {
8631 assert(pheader->read_pointer >= 0 && pheader->read_pointer <= pheader->size);
8632 assert(pclient->read_pointer >= 0 && pclient->read_pointer <= pheader->size);
8633
8634 if (pheader->read_pointer <= pheader->write_pointer) {
8635
8636 if (pclient->read_pointer < pheader->read_pointer) {
8637 cm_msg(MINFO, "bm_validate_client_pointers",
8638 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8639 pclient->name,
8640 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8641
8642 pclient->read_pointer = pheader->read_pointer;
8643 }
8644
8645 if (pclient->read_pointer > pheader->write_pointer) {
8646 cm_msg(MINFO, "bm_validate_client_pointers",
8647 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, read pointer %d, size %d",
8648 pclient->name,
8649 pheader->name, pclient->read_pointer, pheader->write_pointer, pheader->read_pointer, pheader->size);
8650
8651 pclient->read_pointer = pheader->write_pointer;
8652 }
8653
8654 } else {
8655
8656 if (pclient->read_pointer < 0) {
8657 cm_msg(MINFO, "bm_validate_client_pointers",
8658 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8659 pclient->name,
8660 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8661
8662 pclient->read_pointer = pheader->read_pointer;
8663 }
8664
8665 if (pclient->read_pointer >= pheader->size) {
8666 cm_msg(MINFO, "bm_validate_client_pointers",
8667 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8668 pclient->name,
8669 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8670
8671 pclient->read_pointer = pheader->read_pointer;
8672 }
8673
8674 if (pclient->read_pointer > pheader->write_pointer && pclient->read_pointer < pheader->read_pointer) {
8675 cm_msg(MINFO, "bm_validate_client_pointers",
8676 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8677 pclient->name,
8678 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8679
8680 pclient->read_pointer = pheader->read_pointer;
8681 }
8682 }
8683}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_rp()

static BOOL bm_validate_rp ( const char who,
const BUFFER_HEADER pheader,
int  rp 
)
static

Definition at line 6197 of file midas.cxx.

6197 {
6198 if (rp < 0 || rp > pheader->size) {
6199 cm_msg(MERROR, "bm_validate_rp",
6200 "error: buffer \"%s\" is corrupted: rp %d is invalid. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6201 pheader->name,
6202 rp,
6203 pheader->read_pointer,
6204 pheader->write_pointer,
6205 pheader->size,
6206 who);
6207 return FALSE;
6208 }
6209
6210 if ((rp + (int) sizeof(EVENT_HEADER)) > pheader->size) {
6211 // note ">" here, has to match bm_incr_rp() and bm_write_to_buffer()
6212 cm_msg(MERROR, "bm_validate_rp",
6213 "error: buffer \"%s\" is corrupted: rp %d plus event header point beyond the end of buffer by %d bytes. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6214 pheader->name,
6215 rp,
6216 (int) (rp + sizeof(EVENT_HEADER) - pheader->size),
6217 pheader->read_pointer,
6218 pheader->write_pointer,
6219 pheader->size,
6220 who);
6221 return FALSE;
6222 }
6223
6224 return TRUE;
6225}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wait_for_free_space_locked()

static int bm_wait_for_free_space_locked ( bm_lock_buffer_guard &pbuf_guard,
int  timeout_msec,
int  requested_space,
bool  unlock_write_cache 
)
static
  • signal other clients wait mode
  • validate client index: we could have been removed from the buffer

Definition at line 9088 of file midas.cxx.

9089{
9090 // return values:
9091 // BM_SUCCESS - have "requested_space" bytes free in the buffer
9092 // BM_CORRUPTED - shared memory is corrupted
9093 // BM_NO_MEMORY - asked for more than buffer size
9094 // BM_ASYNC_RETURN - timeout waiting for free space
9095 // BM_INVALID_HANDLE - buffer was closed (locks released) (via bm_clock_xxx())
9096 // SS_ABORT - we are told to shutdown (locks releases)
9097
9098 int status;
9099 BUFFER* pbuf = pbuf_guard.get_pbuf();
9100 BUFFER_HEADER *pheader = pbuf->buffer_header;
9101 char *pdata = (char *) (pheader + 1);
9102
9103 /* make sure the buffer never completely full:
9104 * read pointer and write pointer would coincide
9105 * and the code cannot tell if it means the
9106 * buffer is 100% full or 100% empty. It will explode
9107 * or lose events */
9108 requested_space += 100;
9109
9110 if (requested_space >= pheader->size)
9111 return BM_NO_MEMORY;
9112
9115
9116 //DWORD blocking_time = 0;
9117 //int blocking_loops = 0;
9118 int blocking_client_index = -1;
9120 blocking_client_name[0] = 0;
9121
9122 while (1) {
9123 while (1) {
9124 /* check if enough space in buffer */
9125
9126 int free = pheader->read_pointer - pheader->write_pointer;
9127 if (free <= 0)
9128 free += pheader->size;
9129
9130 //printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec);
9131
9132 if (requested_space < free) { /* note the '<' to avoid 100% filling */
9133 //if (blocking_loops) {
9134 // DWORD wait_time = ss_millitime() - blocking_time;
9135 // printf("blocking client \"%s\", time %d ms, loops %d\n", blocking_client_name, wait_time, blocking_loops);
9136 //}
9137
9138 if (pbuf->wait_start_time != 0) {
9140 DWORD wait_time = now - pbuf->wait_start_time;
9141 pbuf->time_write_wait += wait_time;
9142 pbuf->wait_start_time = 0;
9143 int iclient = pbuf->wait_client_index;
9144 //printf("bm_wait_for_free_space: wait ended: wait time %d ms, blocking client index %d\n", wait_time, iclient);
9145 if (iclient >= 0 && iclient < MAX_CLIENTS) {
9146 pbuf->client_count_write_wait[iclient] += 1;
9147 pbuf->client_time_write_wait[iclient] += wait_time;
9148 }
9149 }
9150
9151 //if (blocking_loops > 0) {
9152 // printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d, found space after %d waits\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec, blocking_loops);
9153 //}
9154
9155 return BM_SUCCESS;
9156 }
9157
9158 if (!bm_validate_rp("bm_wait_for_free_space_locked", pheader, pheader->read_pointer)) {
9159 cm_msg(MERROR, "bm_wait_for_free_space",
9160 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer is invalid",
9161 pheader->name,
9162 pheader->read_pointer,
9163 pheader->write_pointer,
9164 pheader->size,
9165 free,
9167 return BM_CORRUPTED;
9168 }
9169
9170 const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pdata + pheader->read_pointer);
9171 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
9172 int total_size = ALIGN8(event_size);
9173
9174#if 0
9175 printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, blocking event size %d/%d\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, event_size, total_size);
9176#endif
9177
9178 if (pevent->data_size <= 0 || total_size <= 0 || total_size > pheader->size) {
9179 cm_msg(MERROR, "bm_wait_for_free_space",
9180 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer points to an invalid event: data_size %d, event size %d, total_size %d",
9181 pheader->name,
9182 pheader->read_pointer,
9183 pheader->write_pointer,
9184 pheader->size,
9185 free,
9187 pevent->data_size,
9188 event_size,
9189 total_size);
9190 return BM_CORRUPTED;
9191 }
9192
9193 int blocking_client = -1;
9194
9195 int i;
9196 for (i = 0; i < pheader->max_client_index; i++) {
9197 BUFFER_CLIENT *pc = pheader->client + i;
9198 if (pc->pid) {
9199 if (pc->read_pointer == pheader->read_pointer) {
9200 /*
9201 First assume that the client with the "minimum" read pointer
9202 is not really blocking due to a GET_ALL request.
9203 */
9205 //int blocking_request_id = -1;
9206
9207 int j;
9208 for (j = 0; j < pc->max_request_index; j++) {
9209 const EVENT_REQUEST *prequest = pc->event_request + j;
9210 if (prequest->valid
9211 && bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
9212 if (prequest->sampling_type & GET_ALL) {
9213 blocking = TRUE;
9214 //blocking_request_id = prequest->id;
9215 break;
9216 }
9217 }
9218 }
9219
9220 //printf("client [%s] blocking %d, request %d\n", pc->name, blocking, blocking_request_id);
9221
9222 if (blocking) {
9224 break;
9225 }
9226
9227 pc->read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
9228 }
9229 }
9230 } /* client loop */
9231
9232 if (blocking_client >= 0) {
9235 //if (!blocking_time) {
9236 // blocking_time = ss_millitime();
9237 //}
9238
9239 //printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d, must wait for more space!\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec);
9240
9241 // from this "break" we go into timeout check and sleep/wait.
9242 break;
9243 }
9244
9245 /* no blocking clients. move the read pointer and again check for free space */
9246
9247 BOOL moved = bm_update_read_pointer_locked("bm_wait_for_free_space", pheader);
9248
9249 if (!moved) {
9250 cm_msg(MERROR, "bm_wait_for_free_space",
9251 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer did not move as expected",
9252 pheader->name,
9253 pheader->read_pointer,
9254 pheader->write_pointer,
9255 pheader->size,
9256 free,
9258 return BM_CORRUPTED;
9259 }
9260
9261 /* we freed one event, loop back to the check for free space */
9262 }
9263
9264 //blocking_loops++;
9265
9266 /* at least one client is blocking */
9267
9269 pc->write_wait = requested_space;
9270
9271 if (pbuf->wait_start_time == 0) {
9272 pbuf->wait_start_time = ss_millitime();
9273 pbuf->count_write_wait++;
9274 if (requested_space > pbuf->max_requested_space)
9275 pbuf->max_requested_space = requested_space;
9276 pbuf->wait_client_index = blocking_client_index;
9277 }
9278
9280
9281 //printf("bm_wait_for_free_space: start 0x%08x, now 0x%08x, end 0x%08x, timeout %d, wait %d\n", time_start, now, time_end, timeout_msec, time_end - now);
9282
9283 int sleep_time_msec = 1000;
9284
9285 if (timeout_msec == BM_WAIT) {
9286 // wait forever
9287 } else if (timeout_msec == BM_NO_WAIT) {
9288 // no wait
9289 return BM_ASYNC_RETURN;
9290 } else {
9291 // check timeout
9292 if (now >= time_end) {
9293 // timeout!
9294 return BM_ASYNC_RETURN;
9295 }
9296
9298
9299 if (sleep_time_msec <= 0) {
9300 sleep_time_msec = 10;
9301 } else if (sleep_time_msec > 1000) {
9302 sleep_time_msec = 1000;
9303 }
9304 }
9305
9307
9308 /* before waiting, unlock everything in the correct order */
9309
9310 pbuf_guard.unlock();
9311
9313 pbuf->write_cache_mutex.unlock();
9314
9315 //printf("bm_wait_for_free_space: blocking client \"%s\"\n", blocking_client_name);
9316
9317#ifdef DEBUG_MSG
9318 cm_msg(MDEBUG, "Send sleep: rp=%d, wp=%d, level=%1.1lf", pheader->read_pointer, pheader->write_pointer, 100 - 100.0 * size / pheader->size);
9319#endif
9320
9322 //int idx = bm_validate_client_index_locked(pbuf, FALSE);
9323 //if (idx >= 0)
9324 // pheader->client[idx].write_wait = requested_space;
9325
9326 //bm_cleanup("bm_wait_for_free_space", ss_millitime(), FALSE);
9327
9329
9330 /* we are told to shutdown */
9331 if (status == SS_ABORT) {
9332 // NB: buffer is locked!
9333 return SS_ABORT;
9334 }
9335
9336 /* make sure we do sleep in this loop:
9337 * if we are the mserver receiving data on the event
9338 * socket and the data buffer is full, ss_suspend() will
9339 * never sleep: it will detect data on the event channel,
9340 * call rpc_server_receive() (recursively, we already *are* in
9341 * rpc_server_receive()) and return without sleeping. Result
9342 * is a busy loop waiting for free space in data buffer */
9343
9344 /* update May 2021: ss_suspend(MSG_BM) no longer looks at
9345 * the event socket, and should sleep now, so this sleep below
9346 * maybe is not needed now. but for safety, I keep it. K.O. */
9347
9348 if (status != SS_TIMEOUT) {
9349 //printf("ss_suspend: status %d\n", status);
9350 ss_sleep(1);
9351 }
9352
9353 /* we may be stuck in this loop for an arbitrary long time,
9354 * depending on how other buffer clients read the accumulated data
9355 * so we should update all the timeouts & etc. K.O. */
9356
9358
9359 /* lock things again in the correct order */
9360
9361 if (unlock_write_cache) {
9363
9364 if (status != BM_SUCCESS) {
9365 // bail out with all locks released
9366 return status;
9367 }
9368 }
9369
9370 if (!pbuf_guard.relock()) {
9371 if (unlock_write_cache) {
9372 pbuf->write_cache_mutex.unlock();
9373 }
9374
9375 // bail out with all locks released
9376 return pbuf_guard.get_status();
9377 }
9378
9379 /* revalidate the client index: we could have been removed from the buffer while sleeping */
9381
9382 pc->write_wait = 0;
9383
9385 //idx = bm_validate_client_index_locked(pbuf, FALSE);
9386 //if (idx >= 0)
9387 // pheader->client[idx].write_wait = 0;
9388 //else {
9389 // cm_msg(MERROR, "bm_wait_for_free_space", "our client index is no longer valid, exiting...");
9390 // status = SS_ABORT;
9391 //}
9392
9393#ifdef DEBUG_MSG
9394 cm_msg(MDEBUG, "Send woke up: rp=%d, wp=%d, level=%1.1lf", pheader->read_pointer, pheader->write_pointer, 100 - 100.0 * size / pheader->size);
9395#endif
9396
9397 }
9398}
static BOOL bm_update_read_pointer_locked(const char *caller_name, BUFFER_HEADER *pheader)
Definition midas.cxx:8728
INT cm_periodic_tasks()
Definition midas.cxx:5587
#define SS_TIMEOUT
Definition midas.h:674
#define MDEBUG
Definition midas.h:561
#define MSG_BM
Definition msystem.h:295
INT ss_suspend(INT millisec, INT msg)
Definition system.cxx:4543
INT ss_sleep(INT millisec)
Definition system.cxx:3628
char name[NAME_LENGTH]
Definition midas.h:935
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wait_for_more_events_locked()

static int bm_wait_for_more_events_locked ( bm_lock_buffer_guard &pbuf_guard,
BUFFER_CLIENT *pc,
int  timeout_msec,
BOOL  unlock_read_cache 
)
static

Definition at line 9400 of file midas.cxx.

9401{
9402 BUFFER* pbuf = pbuf_guard.get_pbuf();
9403 BUFFER_HEADER* pheader = pbuf->buffer_header;
9404
9405 //printf("bm_wait_for_more_events_locked: [%s] timeout %d\n", pheader->name, timeout_msec);
9406
9407 if (pc->read_pointer != pheader->write_pointer) {
9408 // buffer has data
9409 return BM_SUCCESS;
9410 }
9411
9412 if (timeout_msec == BM_NO_WAIT) {
9413 /* event buffer is empty and we are told to not wait */
9414 if (!pc->read_wait) {
9415 //printf("bm_wait_for_more_events: buffer [%s] client [%s] set read_wait in BM_NO_WAIT!\n", pheader->name, pc->name);
9416 pc->read_wait = TRUE;
9417 }
9418 return BM_ASYNC_RETURN;
9419 }
9420
9423 DWORD sleep_time = 1000;
9424 if (timeout_msec == BM_NO_WAIT) {
9425 // default sleep time
9426 } else if (timeout_msec == BM_WAIT) {
9427 // default sleep time
9428 } else {
9431 }
9432
9433 //printf("time start 0x%08x, end 0x%08x, sleep %d\n", time_start, time_wait, sleep_time);
9434
9435 while (pc->read_pointer == pheader->write_pointer) {
9436 /* wait until there is data in the buffer (write pointer moves) */
9437
9438 if (!pc->read_wait) {
9439 //printf("bm_wait_for_more_events: buffer [%s] client [%s] set read_wait!\n", pheader->name, pc->name);
9440 pc->read_wait = TRUE;
9441 }
9442
9443 pc->last_activity = ss_millitime();
9444
9446
9447 // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
9448
9449 pbuf_guard.unlock();
9450
9452 pbuf->read_cache_mutex.unlock();
9453
9455
9456 if (timeout_msec == BM_NO_WAIT) {
9457 // return immediately
9458 } else if (timeout_msec == BM_WAIT) {
9459 // wait forever
9460 } else {
9462 //printf("check timeout: now 0x%08x, end 0x%08x, diff %d\n", now, time_wait, time_wait - now);
9463 if (now >= time_wait) {
9464 timeout_msec = BM_NO_WAIT; // cause immediate return
9465 } else {
9467 if (sleep_time > 1000)
9468 sleep_time = 1000;
9469 //printf("time start 0x%08x, now 0x%08x, end 0x%08x, sleep %d\n", time_start, now, time_wait, sleep_time);
9470 }
9471 }
9472
9473 // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
9474
9475 if (unlock_read_cache) {
9477 if (status != BM_SUCCESS) {
9478 // bail out with all locks released
9479 return status;
9480 }
9481 }
9482
9483 if (!pbuf_guard.relock()) {
9484 if (unlock_read_cache) {
9485 pbuf->read_cache_mutex.unlock();
9486 }
9487 // bail out with all locks released
9488 return pbuf_guard.get_status();
9489 }
9490
9491 /* need to revalidate our BUFFER_CLIENT after releasing the buffer lock
9492 * because we may have been removed from the buffer by bm_cleanup() & co
9493 * due to a timeout or whatever. */
9495
9496 /* return if TCP connection broken */
9497 if (status == SS_ABORT)
9498 return SS_ABORT;
9499
9500 if (timeout_msec == BM_NO_WAIT)
9501 return BM_ASYNC_RETURN;
9502 }
9503
9504 if (pc->read_wait) {
9505 //printf("bm_wait_for_more_events: buffer [%s] client [%s] clear read_wait!\n", pheader->name, pc->name);
9506 pc->read_wait = FALSE;
9507 }
9508
9509 return BM_SUCCESS;
9510}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wakeup_producers_locked()

static void bm_wakeup_producers_locked ( const BUFFER_HEADER *pheader,
const BUFFER_CLIENT *pc 
)
static

Definition at line 8795 of file midas.cxx.

8795 {
8796 int i;
8797 int have_get_all_requests = 0;
8798
8799 for (i = 0; i < pc->max_request_index; i++)
8800 if (pc->event_request[i].valid)
8801 have_get_all_requests |= (pc->event_request[i].sampling_type == GET_ALL);
8802
8803 /* only GET_ALL requests actually free space in the event buffer */
8805 return;
8806
8807 /*
8808 If read pointer has been changed, it may have freed up some space
8809 for waiting producers. So check if free space is now more than 50%
8810 of the buffer size and wake waiting producers.
8811 */
8812
8813 int free_space = pc->read_pointer - pheader->write_pointer;
8814 if (free_space <= 0)
8815 free_space += pheader->size;
8816
8817 if (free_space >= pheader->size * 0.5) {
8818 for (i = 0; i < pheader->max_client_index; i++) {
8819 const BUFFER_CLIENT *pc = pheader->client + i;
8820 if (pc->pid && pc->write_wait) {
8821 BOOL send_wakeup = (pc->write_wait < free_space);
8822 //printf("bm_wakeup_producers: buffer [%s] client [%s] write_wait %d, free_space %d, sending wakeup message %d\n", pheader->name, pc->name, pc->write_wait, free_space, send_wakeup);
8823 if (send_wakeup) {
8824 ss_resume(pc->port, "B ");
8825 }
8826 }
8827 }
8828 }
8829}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_buffer_statistics_to_odb()

static void bm_write_buffer_statistics_to_odb ( HNDLE  hDB,
BUFFER *pbuf,
BOOL  force 
)
static

Definition at line 6594 of file midas.cxx.

6595{
6596 //printf("bm_buffer_write_statistics_to_odb: buffer [%s] client [%s], lock count %d -> %d, force %d\n", pbuf->buffer_name, pbuf->client_name, pbuf->last_count_lock, pbuf->count_lock, force);
6597
6599
6600 if (!pbuf_guard.is_locked())
6601 return;
6602
6603 if (!force) {
6604 if (pbuf->count_lock == pbuf->last_count_lock) {
6605 return;
6606 }
6607 }
6608
6609 std::string buffer_name = pbuf->buffer_name;
6610 std::string client_name = pbuf->client_name;
6611
6612 if ((strlen(buffer_name.c_str()) < 1) || (strlen(client_name.c_str()) < 1)) {
6613 // do not call cm_msg() while holding buffer lock, if we are SYSMSG, we will deadlock. K.O.
6614 pbuf_guard.unlock(); // unlock before cm_msg()
6615 cm_msg(MERROR, "bm_write_buffer_statistics_to_odb", "Invalid empty buffer name \"%s\" or client name \"%s\"", buffer_name.c_str(), client_name.c_str());
6616 return;
6617 }
6618
6619 pbuf->last_count_lock = pbuf->count_lock;
6620
6622 BUFFER_HEADER xheader = *pbuf->buffer_header;
6623 int client_index = pbuf->client_index;
6624
6625 pbuf_guard.unlock();
6626
6627 bm_write_buffer_statistics_to_odb_copy(hDB, buffer_name.c_str(), client_name.c_str(), client_index, &xbuf, &xheader);
6628}
static void bm_write_buffer_statistics_to_odb_copy(HNDLE hDB, const char *buffer_name, const char *client_name, int client_index, BUFFER_INFO *pbuf, BUFFER_HEADER *pheader)
Definition midas.cxx:6472
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_buffer_statistics_to_odb_copy()

static void bm_write_buffer_statistics_to_odb_copy ( HNDLE  hDB,
const char *buffer_name,
const char *client_name,
int  client_index,
BUFFER_INFO *pbuf,
BUFFER_HEADER *pheader 
)
static

Definition at line 6472 of file midas.cxx.

6473{
6474 int status;
6475
6477
6478 HNDLE hKey;
6479 status = db_find_key(hDB, 0, "/System/Buffers", &hKey);
6480 if (status != DB_SUCCESS) {
6481 db_create_key(hDB, 0, "/System/Buffers", TID_KEY);
6482 status = db_find_key(hDB, 0, "/System/Buffers", &hKey);
6483 if (status != DB_SUCCESS)
6484 return;
6485 }
6486
6489 if (status != DB_SUCCESS) {
6492 if (status != DB_SUCCESS)
6493 return;
6494 }
6495
6496 double buf_size = pheader->size;
6497 double buf_rptr = pheader->read_pointer;
6498 double buf_wptr = pheader->write_pointer;
6499
6500 double buf_fill = 0;
6501 double buf_cptr = 0;
6502 double buf_cused = 0;
6503 double buf_cused_pct = 0;
6504
6505 if (client_index >= 0 && client_index <= pheader->max_client_index) {
6506 buf_cptr = pheader->client[client_index].read_pointer;
6507
6508 if (buf_wptr == buf_cptr) {
6509 buf_cused = 0;
6510 } else if (buf_wptr > buf_cptr) {
6512 } else {
6513 buf_cused = (buf_size - buf_cptr) + buf_wptr;
6514 }
6515
6516 buf_cused_pct = buf_cused / buf_size * 100.0;
6517
6518 // we cannot write buf_cused and buf_cused_pct into the buffer statistics
6519 // because some other GET_ALL client may have different buf_cused & etc,
6520 // so they must be written into the per-client statistics
6521 // and the web page should look at all the GET_ALL clients and used
6522 // the biggest buf_cused as the whole-buffer "bytes used" value.
6523 }
6524
6525 if (buf_wptr == buf_rptr) {
6526 buf_fill = 0;
6527 } else if (buf_wptr > buf_rptr) {
6529 } else {
6530 buf_fill = (buf_size - buf_rptr) + buf_wptr;
6531 }
6532
6533 double buf_fill_pct = buf_fill / buf_size * 100.0;
6534
6535 db_set_value(hDB, hKeyBuffer, "Size", &buf_size, sizeof(double), 1, TID_DOUBLE);
6536 db_set_value(hDB, hKeyBuffer, "Write pointer", &buf_wptr, sizeof(double), 1, TID_DOUBLE);
6537 db_set_value(hDB, hKeyBuffer, "Read pointer", &buf_rptr, sizeof(double), 1, TID_DOUBLE);
6538 db_set_value(hDB, hKeyBuffer, "Filled", &buf_fill, sizeof(double), 1, TID_DOUBLE);
6539 db_set_value(hDB, hKeyBuffer, "Filled pct", &buf_fill_pct, sizeof(double), 1, TID_DOUBLE);
6540
6541 status = db_find_key(hDB, hKeyBuffer, "Clients", &hKey);
6542 if (status != DB_SUCCESS) {
6543 db_create_key(hDB, hKeyBuffer, "Clients", TID_KEY);
6544 status = db_find_key(hDB, hKeyBuffer, "Clients", &hKey);
6545 if (status != DB_SUCCESS)
6546 return;
6547 }
6548
6550 status = db_find_key(hDB, hKey, client_name, &hKeyClient);
6551 if (status != DB_SUCCESS) {
6552 db_create_key(hDB, hKey, client_name, TID_KEY);
6553 status = db_find_key(hDB, hKey, client_name, &hKeyClient);
6554 if (status != DB_SUCCESS)
6555 return;
6556 }
6557
6558 db_set_value(hDB, hKeyClient, "count_lock", &pbuf->count_lock, sizeof(int), 1, TID_INT32);
6559 db_set_value(hDB, hKeyClient, "count_sent", &pbuf->count_sent, sizeof(int), 1, TID_INT32);
6560 db_set_value(hDB, hKeyClient, "bytes_sent", &pbuf->bytes_sent, sizeof(double), 1, TID_DOUBLE);
6561 db_set_value(hDB, hKeyClient, "count_write_wait", &pbuf->count_write_wait, sizeof(int), 1, TID_INT32);
6562 db_set_value(hDB, hKeyClient, "time_write_wait", &pbuf->time_write_wait, sizeof(DWORD), 1, TID_UINT32);
6563 db_set_value(hDB, hKeyClient, "max_bytes_write_wait", &pbuf->max_requested_space, sizeof(INT), 1, TID_INT32);
6564 db_set_value(hDB, hKeyClient, "count_read", &pbuf->count_read, sizeof(int), 1, TID_INT32);
6565 db_set_value(hDB, hKeyClient, "bytes_read", &pbuf->bytes_read, sizeof(double), 1, TID_DOUBLE);
6566 db_set_value(hDB, hKeyClient, "get_all_flag", &pbuf->get_all_flag, sizeof(BOOL), 1, TID_BOOL);
6567 db_set_value(hDB, hKeyClient, "read_pointer", &buf_cptr, sizeof(double), 1, TID_DOUBLE);
6568 db_set_value(hDB, hKeyClient, "bytes_used", &buf_cused, sizeof(double), 1, TID_DOUBLE);
6569 db_set_value(hDB, hKeyClient, "pct_used", &buf_cused_pct, sizeof(double), 1, TID_DOUBLE);
6570
6571 for (int i = 0; i < MAX_CLIENTS; i++) {
6572 if (!pbuf->client_count_write_wait[i])
6573 continue;
6574
6575 if (pheader->client[i].pid == 0)
6576 continue;
6577
6578 if (pheader->client[i].name[0] == 0)
6579 continue;
6580
6581 char str[100 + NAME_LENGTH];
6582
6583 sprintf(str, "writes_blocked_by/%s/count_write_wait", pheader->client[i].name);
6584 db_set_value(hDB, hKeyClient, str, &pbuf->client_count_write_wait[i], sizeof(int), 1, TID_INT32);
6585
6586 sprintf(str, "writes_blocked_by/%s/time_write_wait", pheader->client[i].name);
6587 db_set_value(hDB, hKeyClient, str, &pbuf->client_time_write_wait[i], sizeof(DWORD), 1, TID_UINT32);
6588 }
6589
6590 db_set_value(hDB, hKeyBuffer, "Last updated", &now, sizeof(DWORD), 1, TID_UINT32);
6591 db_set_value(hDB, hKeyClient, "last_updated", &now, sizeof(DWORD), 1, TID_UINT32);
6592}
#define TID_DOUBLE
Definition midas.h:343
#define TID_KEY
Definition midas.h:349
#define TID_BOOL
Definition midas.h:340
#define TID_INT32
Definition midas.h:339
INT db_create_key(HNDLE hDB, HNDLE hKey, const char *key_name, DWORD type)
Definition odb.cxx:3308
INT db_set_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, const void *data, INT data_size, INT num_values, DWORD type)
Definition odb.cxx:5261
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_statistics_to_odb()

INT bm_write_statistics_to_odb ( void  )

Write buffer statistics of all open buffers to the ODB

Returns
BM_SUCCESS

Definition at line 7288 of file midas.cxx.

7288 {
7289#ifdef LOCAL_ROUTINES
7290 {
7291 int status;
7292 HNDLE hDB;
7293
7295
7296 if (status != CM_SUCCESS) {
7297 //printf("bm_write_statistics_to_odb: cannot get ODB handle!\n");
7298 return BM_SUCCESS;
7299 }
7300
7301 std::vector<BUFFER*> mybuffers;
7302
7303 gBuffersMutex.lock();
7305 gBuffersMutex.unlock();
7306
7307 for (BUFFER* pbuf : mybuffers) {
7308 if (!pbuf || !pbuf->attached)
7309 continue;
7311 }
7312 }
7313#endif /* LOCAL_ROUTINES */
7314
7315 return BM_SUCCESS;
7316}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_to_buffer_locked()

static void bm_write_to_buffer_locked ( BUFFER_HEADER *pheader,
int  sg_n,
const char *const  sg_ptr[],
const size_t  sg_len[],
size_t  total_size 
)
static

Definition at line 9512 of file midas.cxx.

9513{
9514 char *pdata = (char *) (pheader + 1);
9515
9516 //int old_write_pointer = pheader->write_pointer;
9517
9518 /* new event fits into the remaining space? */
9519 if ((size_t)pheader->write_pointer + total_size <= (size_t)pheader->size) {
9520 //memcpy(pdata + pheader->write_pointer, pevent, event_size);
9521 char* wptr = pdata + pheader->write_pointer;
9522 for (int i=0; i<sg_n; i++) {
9523 //printf("memcpy %p+%d\n", sg_ptr[i], (int)sg_len[i]);
9524 memcpy(wptr, sg_ptr[i], sg_len[i]);
9525 wptr += sg_len[i];
9526 }
9527 pheader->write_pointer = pheader->write_pointer + total_size;
9528 assert(pheader->write_pointer <= pheader->size);
9529 /* remaining space is smaller than size of an event header? */
9530 if ((pheader->write_pointer + (int) sizeof(EVENT_HEADER)) > pheader->size) {
9531 // note: ">" here to match "bm_incr_rp". If remaining space is exactly
9532 // equal to the event header size, we will write the next event header here,
9533 // then wrap the pointer and write the event data at the beginning of the buffer.
9534 //printf("bm_write_to_buffer_locked: truncate wp %d. buffer size %d, remaining %d, event header size %d, event size %d, total size %d\n", pheader->write_pointer, pheader->size, pheader->size-pheader->write_pointer, (int)sizeof(EVENT_HEADER), event_size, total_size);
9535 pheader->write_pointer = 0;
9536 }
9537 } else {
9538 /* split event */
9539 size_t size = pheader->size - pheader->write_pointer;
9540
9541 //printf("split: wp %d, size %d, avail %d\n", pheader->write_pointer, pheader->size, size);
9542
9543 //memcpy(pdata + pheader->write_pointer, pevent, size);
9544 //memcpy(pdata, ((const char *) pevent) + size, event_size - size);
9545
9546 char* wptr = pdata + pheader->write_pointer;
9547 size_t count = 0;
9548
9549 // copy first part
9550
9551 int i = 0;
9552 for (; i<sg_n; i++) {
9553 if (count + sg_len[i] > size)
9554 break;
9555 memcpy(wptr, sg_ptr[i], sg_len[i]);
9556 wptr += sg_len[i];
9557 count += sg_len[i];
9558 }
9559
9560 //printf("wptr %d, count %d\n", wptr-pdata, count);
9561
9562 // split segment
9563
9564 size_t first = size - count;
9565 size_t second = sg_len[i] - first;
9566 assert(first + second == sg_len[i]);
9567 assert(count + first == size);
9568
9569 //printf("first %d, second %d\n", first, second);
9570
9571 memcpy(wptr, sg_ptr[i], first);
9572 wptr = pdata + 0;
9573 count += first;
9575 wptr += second;
9576 count += second;
9577 i++;
9578
9579 // copy remaining
9580
9581 for (; i<sg_n; i++) {
9582 memcpy(wptr, sg_ptr[i], sg_len[i]);
9583 wptr += sg_len[i];
9584 count += sg_len[i];
9585 }
9586
9587 //printf("wptr %d, count %d\n", wptr-pdata, count);
9588
9589 //printf("bm_write_to_buffer_locked: wrap wp %d -> %d. buffer size %d, available %d, wrote %d, remaining %d, event size %d, total size %d\n", pheader->write_pointer, total_size-size, pheader->size, pheader->size-pheader->write_pointer, size, pheader->size - (pheader->write_pointer+size), event_size, total_size);
9590
9591 pheader->write_pointer = total_size - size;
9592 }
9593
9594 //printf("bm_write_to_buffer_locked: buf [%s] size %d, wrote %d/%d, wp %d -> %d\n", pheader->name, pheader->size, event_size, total_size, old_write_pointer, pheader->write_pointer);
9595}
Here is the call graph for this function:
Here is the caller graph for this function:

Variable Documentation

◆ _bm_lock_timeout

int _bm_lock_timeout = 5 * 60 * 1000
static

Definition at line 5927 of file midas.cxx.

◆ _bm_max_event_size

DWORD _bm_max_event_size = 0
static

Definition at line 5922 of file midas.cxx.

◆ _bm_mutex_timeout_sec

double _bm_mutex_timeout_sec = _bm_lock_timeout/1000 + 15.000
static

Definition at line 5928 of file midas.cxx.

◆ defrag_buffer

EVENT_DEFRAG_BUFFER defrag_buffer[MAX_DEFRAG_EVENTS]
static

Definition at line 11289 of file midas.cxx.