MIDAS
Loading...
Searching...
No Matches
Event Buffer Functions (bm_xxx)

Classes

struct  BUFFER_INFO
 
struct  EVENT_DEFRAG_BUFFER
 

Macros

#define MAX_DEFRAG_EVENTS   10
 

Functions

static int bm_validate_client_index_locked (bm_lock_buffer_guard &pbuf_guard)
 
INT bm_match_event (short int event_id, short int trigger_mask, const EVENT_HEADER *pevent)
 
void bm_remove_client_locked (BUFFER_HEADER *pheader, int j)
 
static void bm_cleanup_buffer_locked (BUFFER *pbuf, const char *who, DWORD actual_time)
 
static void bm_update_last_activity (DWORD millitime)
 
static BOOL bm_validate_rp (const char *who, const BUFFER_HEADER *pheader, int rp)
 
static int bm_incr_rp_no_check (const BUFFER_HEADER *pheader, int rp, int total_size)
 
static int bm_next_rp (const char *who, const BUFFER_HEADER *pheader, const char *pdata, int rp)
 
static int bm_validate_buffer_locked (const BUFFER *pbuf)
 
static void bm_reset_buffer_locked (BUFFER *pbuf)
 
static void bm_clear_buffer_statistics (HNDLE hDB, BUFFER *pbuf)
 
static void bm_write_buffer_statistics_to_odb_copy (HNDLE hDB, const char *buffer_name, const char *client_name, int client_index, BUFFER_INFO *pbuf, BUFFER_HEADER *pheader)
 
static void bm_write_buffer_statistics_to_odb (HNDLE hDB, BUFFER *pbuf, BOOL force)
 
INT bm_open_buffer (const char *buffer_name, INT buffer_size, INT *buffer_handle)
 
INT bm_get_buffer_handle (const char *buffer_name, INT *buffer_handle)
 
INT bm_close_buffer (INT buffer_handle)
 
INT bm_close_all_buffers (void)
 
INT bm_write_statistics_to_odb (void)
 
INT bm_set_cache_size (INT buffer_handle, size_t read_size, size_t write_size)
 
INT bm_compose_event (EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD serial)
 
INT bm_compose_event_threadsafe (EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD *serial)
 
INT bm_add_event_request (INT buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, EVENT_HANDLER *func, INT request_id)
 
INT bm_request_event (HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
 
INT bm_remove_event_request (INT buffer_handle, INT request_id)
 
INT bm_delete_request (INT request_id)
 
static void bm_validate_client_pointers_locked (const BUFFER_HEADER *pheader, BUFFER_CLIENT *pclient)
 
static BOOL bm_update_read_pointer_locked (const char *caller_name, BUFFER_HEADER *pheader)
 
static void bm_wakeup_producers_locked (const BUFFER_HEADER *pheader, const BUFFER_CLIENT *pc)
 
static void bm_dispatch_event (int buffer_handle, EVENT_HEADER *pevent)
 
static void bm_incr_read_cache_locked (BUFFER *pbuf, int total_size)
 
static BOOL bm_peek_read_cache_locked (BUFFER *pbuf, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
 
static int bm_peek_buffer_locked (BUFFER *pbuf, BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
 
static void bm_read_from_buffer_locked (const BUFFER_HEADER *pheader, int rp, char *buf, int event_size)
 
static void bm_read_from_buffer_locked (const BUFFER_HEADER *pheader, int rp, std::vector< char > *vecptr, int event_size)
 
static BOOL bm_check_requests (const BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
 
static int bm_wait_for_more_events_locked (bm_lock_buffer_guard &pbuf_guard, BUFFER_CLIENT *pc, int timeout_msec, BOOL unlock_read_cache)
 
static int bm_fill_read_cache_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
 
static void bm_convert_event_header (EVENT_HEADER *pevent, int convert_flags)
 
static int bm_wait_for_free_space_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec, int requested_space, bool unlock_write_cache)
 
static void bm_write_to_buffer_locked (BUFFER_HEADER *pheader, int sg_n, const char *const sg_ptr[], const size_t sg_len[], size_t total_size)
 
static int bm_find_first_request_locked (BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
 
static void bm_notify_reader_locked (BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, int old_write_pointer, int request_id)
 
INT bm_send_event (INT buffer_handle, const EVENT_HEADER *pevent, int unused, int timeout_msec)
 
int bm_send_event_vec (int buffer_handle, const std::vector< char > &event, int timeout_msec)
 
int bm_send_event_vec (int buffer_handle, const std::vector< std::vector< char > > &event, int timeout_msec)
 
static INT bm_flush_cache_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
 
int bm_send_event_sg (int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
 
static int bm_flush_cache_rpc (int buffer_handle, int timeout_msec)
 
INT bm_flush_cache (int buffer_handle, int timeout_msec)
 
static INT bm_read_buffer (BUFFER *pbuf, INT buffer_handle, void **bufptr, void *buf, INT *buf_size, std::vector< char > *vecptr, int timeout_msec, int convert_flags, BOOL dispatch)
 
static INT bm_receive_event_rpc (INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
 
INT bm_receive_event (INT buffer_handle, void *destination, INT *buf_size, int timeout_msec)
 
INT bm_receive_event_alloc (INT buffer_handle, EVENT_HEADER **ppevent, int timeout_msec)
 
INT bm_receive_event_vec (INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
 
static int bm_skip_event (BUFFER *pbuf)
 
INT bm_skip_event (INT buffer_handle)
 
static INT bm_push_buffer (BUFFER *pbuf, int buffer_handle)
 
INT bm_check_buffers ()
 
INT bm_poll_event ()
 
INT bm_empty_buffers ()
 

Variables

static DWORD _bm_max_event_size = 0
 
static int _bm_lock_timeout = 5 * 60 * 1000
 
static double _bm_mutex_timeout_sec = _bm_lock_timeout/1000 + 15.000
 
static EVENT_DEFRAG_BUFFER defrag_buffer [MAX_DEFRAG_EVENTS]
 

Detailed Description

dox dox


Macro Definition Documentation

◆ MAX_DEFRAG_EVENTS

#define MAX_DEFRAG_EVENTS   10

dox

Definition at line 11297 of file midas.cxx.

Function Documentation

◆ bm_add_event_request()

INT bm_add_event_request ( INT  buffer_handle,
short int  event_id,
short int  trigger_mask,
INT  sampling_type,
EVENT_HANDLER func,
INT  request_id 
)

dox

Definition at line 8324 of file midas.cxx.

8368{
8369 if (rpc_is_remote())
8370 return rpc_call(RPC_BM_ADD_EVENT_REQUEST, buffer_handle, event_id,
8371 trigger_mask, sampling_type, (INT) (POINTER_T) func, request_id);
8372
8373#ifdef LOCAL_ROUTINES
8374 {
8375 int status = 0;
8376
8377 BUFFER *pbuf = bm_get_buffer("bm_add_event_request", buffer_handle, &status);
8378
8379 if (!pbuf)
8380 return status;
8381
8382 /* lock buffer */
8383 bm_lock_buffer_guard pbuf_guard(pbuf);
8384
8385 if (!pbuf_guard.is_locked())
8386 return pbuf_guard.get_status();
8387
8388 /* avoid callback/non callback requests */
8389 if (func == NULL && pbuf->callback) {
8390 pbuf_guard.unlock(); // unlock before cm_msg()
8391 cm_msg(MERROR, "bm_add_event_request", "mixing callback/non callback requests not possible");
8392 return BM_INVALID_MIXING;
8393 }
8394
8395 /* do not allow GET_RECENT with nonzero cache size */
8396 if (sampling_type == GET_RECENT && pbuf->read_cache_size > 0) {
8397 pbuf_guard.unlock(); // unlock before cm_msg()
8398 cm_msg(MERROR, "bm_add_event_request", "GET_RECENT request not possible if read cache is enabled");
8399 return BM_INVALID_PARAM;
8400 }
8401
8402 /* get a pointer to the proper client structure */
8403 BUFFER_CLIENT *pclient = bm_get_my_client_locked(pbuf_guard);
8404
8405 /* look for a empty request entry */
8406 int i;
8407 for (i = 0; i < MAX_EVENT_REQUESTS; i++)
8408 if (!pclient->event_request[i].valid)
8409 break;
8410
8411 if (i == MAX_EVENT_REQUESTS) {
8412 // implicit unlock
8413 return BM_NO_MEMORY;
8414 }
8415
8416 /* setup event_request structure */
8417 pclient->event_request[i].id = request_id;
8418 pclient->event_request[i].valid = TRUE;
8419 pclient->event_request[i].event_id = event_id;
8421 pclient->event_request[i].sampling_type = sampling_type;
8422
8423 pclient->all_flag = pclient->all_flag || (sampling_type & GET_ALL);
8424
8425 pbuf->get_all_flag = pclient->all_flag;
8426
8427 /* set callback flag in buffer structure */
8428 if (func != NULL)
8429 pbuf->callback = TRUE;
8430
8431 /*
8432 Save the index of the last request in the list so that later only the
8433 requests 0..max_request_index-1 have to be searched through.
8434 */
8435
8436 if (i + 1 > pclient->max_request_index)
8437 pclient->max_request_index = i + 1;
8438 }
8439#endif /* LOCAL_ROUTINES */
8440
8441 return BM_SUCCESS;
8442}
static BUFFER_CLIENT * bm_get_my_client_locked(bm_lock_buffer_guard &pbuf_guard)
Definition midas.cxx:6016
static BUFFER * bm_get_buffer(const char *who, INT buffer_handle, int *pstatus)
Definition midas.cxx:6632
#define BM_INVALID_PARAM
Definition midas.h:619
#define BM_NO_MEMORY
Definition midas.h:607
#define BM_INVALID_MIXING
Definition midas.h:621
#define BM_SUCCESS
Definition midas.h:605
#define GET_ALL
Definition midas.h:321
#define GET_RECENT
Definition midas.h:323
#define MERROR
Definition midas.h:559
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
Definition midas.cxx:930
#define RPC_BM_ADD_EVENT_REQUEST
Definition mrpc.h:43
bool rpc_is_remote(void)
Definition midas.cxx:12786
INT rpc_call(DWORD routine_id,...)
Definition midas.cxx:13688
INT i
Definition mdump.cxx:32
int INT
Definition midas.h:129
#define TRUE
Definition midas.h:182
#define MAX_EVENT_REQUESTS
Definition midas.h:275
#define POINTER_T
Definition midas.h:166
#define trigger_mask
#define event_id
DWORD status
Definition odbhist.cxx:39
BOOL all_flag
Definition midas.h:950
EVENT_REQUEST event_request[MAX_EVENT_REQUESTS]
Definition midas.h:954
INT max_request_index
Definition midas.h:942
BOOL get_all_flag
Definition midas.h:1009
BOOL callback
Definition midas.h:1007
std::atomic< size_t > read_cache_size
Definition midas.h:995
short int event_id
Definition midas.h:930
short int trigger_mask
Definition midas.h:931
INT sampling_type
Definition midas.h:932
BOOL valid
Definition midas.h:929
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_check_buffers()

INT bm_check_buffers ( void  )

Check if any requested event is waiting in a buffer

Returns
TRUE More events are waiting
FALSE No more events are waiting

Definition at line 10979 of file midas.cxx.

10979 {
10980#ifdef LOCAL_ROUTINES
10981 {
10982 INT status = 0;
10983 BOOL bMore;
10984 DWORD start_time;
10985 //static DWORD last_time = 0;
10986
10987 /* if running as a server, buffer checking is done by client
10988 via ASYNC bm_receive_event */
10989 if (rpc_is_mserver()) {
10990 return FALSE;
10991 }
10992
10993 bMore = FALSE;
10994 start_time = ss_millitime();
10995
10996 std::vector<BUFFER*> mybuffers;
10997
10998 gBuffersMutex.lock();
10999 mybuffers = gBuffers;
11000 gBuffersMutex.unlock();
11001
11002 /* go through all buffers */
11003 for (size_t idx = 0; idx < mybuffers.size(); idx++) {
11004 BUFFER* pbuf = mybuffers[idx];
11005
11006 if (!pbuf || !pbuf->attached)
11007 continue;
11008
11009 //int count_loops = 0;
11010 while (1) {
11011 if (pbuf->attached) {
11012 /* one bm_push_event could cause a run stop and a buffer close, which
11013 * would crash the next call to bm_push_event(). So check for valid
11014 * buffer on each call */
11015
11016 /* this is what happens:
11017 * bm_push_buffer() may call a user callback function
11018 * user callback function may indirectly call bm_close() of this buffer,
11019 * i.e. if it stops the run,
11020 * bm_close() will set pbuf->attached to false, but will not delete pbuf or touch gBuffers
11021 * here we will see pbuf->attched is false and quit this loop
11022 */
11023
11024 status = bm_push_buffer(pbuf, idx + 1);
11025
11026 if (status == BM_CORRUPTED) {
11027 return status;
11028 }
11029
11030 //printf("bm_check_buffers: bm_push_buffer() returned %d, loop %d, time %d\n", status, count_loops, ss_millitime() - start_time);
11031
11032 if (status != BM_MORE_EVENTS) {
11033 //DWORD t = ss_millitime() - start_time;
11034 //printf("bm_check_buffers: index %d, period %d, elapsed %d, loop %d, no more events\n", idx, start_time - last_time, t, count_loops);
11035 break;
11036 }
11037
11038 // count_loops++;
11039 }
11040
11041 // NB: this code has a logic error: if 2 buffers always have data,
11042 // this timeout will cause us to exit reading the 1st buffer
11043 // after 1000 msec, then we read the 2nd buffer exactly once,
11044 // and exit the loop because the timeout is still active -
11045 // we did not reset "start_time" when we started reading
11046 // from the 2nd buffer. Result is that we always read all
11047 // the data in a loop from the 1st buffer, but read just
11048 // one event from the 2nd buffer, resulting in severe unfairness.
11049
11050 /* stop after one second */
11051 DWORD t = ss_millitime() - start_time;
11052 if (t > 1000) {
11053 //printf("bm_check_buffers: index %d, period %d, elapsed %d, loop %d, timeout.\n", idx, start_time - last_time, t, count_loops);
11054 bMore = TRUE;
11055 break;
11056 }
11057 }
11058 }
11059
11060 //last_time = start_time;
11061
11062 return bMore;
11063
11064 }
11065#else /* LOCAL_ROUTINES */
11066
11067 return FALSE;
11068
11069#endif
11070}
#define FALSE
Definition cfortran.h:309
static INT bm_push_buffer(BUFFER *pbuf, int buffer_handle)
Definition midas.cxx:10927
#define BM_MORE_EVENTS
Definition midas.h:620
#define BM_CORRUPTED
Definition midas.h:623
unsigned int DWORD
Definition mcstd.h:51
DWORD ss_millitime()
Definition system.cxx:3465
bool rpc_is_mserver(void)
Definition midas.cxx:12843
static std::mutex gBuffersMutex
Definition midas.cxx:195
static std::vector< BUFFER * > gBuffers
Definition midas.cxx:196
DWORD BOOL
Definition midas.h:105
std::atomic_bool attached
Definition midas.h:988
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_check_requests()

static BOOL bm_check_requests ( const BUFFER_CLIENT pc,
const EVENT_HEADER pevent 
)
static

Definition at line 8976 of file midas.cxx.

8976 {
8977
8978 BOOL is_requested = FALSE;
8979 int i;
8980 for (i = 0; i < pc->max_request_index; i++) {
8981 const EVENT_REQUEST *prequest = pc->event_request + i;
8982 if (prequest->valid) {
8983 if (bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
8984 /* check if this is a recent event */
8985 if (prequest->sampling_type == GET_RECENT) {
8986 if (ss_time() - pevent->time_stamp > 1) {
8987 /* skip that event */
8988 continue;
8989 }
8990 }
8991
8992 is_requested = TRUE;
8993 break;
8994 }
8995 }
8996 }
8997 return is_requested;
8998}
INT bm_match_event(short int event_id, short int trigger_mask, const EVENT_HEADER *pevent)
Definition midas.cxx:6032
DWORD ss_time()
Definition system.cxx:3534
DWORD time_stamp
Definition midas.h:856
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_cleanup_buffer_locked()

static void bm_cleanup_buffer_locked ( BUFFER pbuf,
const char *  who,
DWORD  actual_time 
)
static

Check all clients on buffer, remove invalid clients

Definition at line 6083 of file midas.cxx.

6083 {
6084 BUFFER_HEADER *pheader;
6085 BUFFER_CLIENT *pbclient;
6086 int j;
6087
6088 pheader = pbuf->buffer_header;
6089 pbclient = pheader->client;
6090
6091 /* now check other clients */
6092 for (j = 0; j < pheader->max_client_index; j++, pbclient++) {
6093 if (pbclient->pid) {
6094 if (!ss_pid_exists(pbclient->pid)) {
6095 cm_msg(MINFO, "bm_cleanup",
6096 "Client \'%s\' on buffer \'%s\' removed by %s because process pid %d does not exist", pbclient->name,
6097 pheader->name, who, pbclient->pid);
6098
6099 bm_remove_client_locked(pheader, j);
6100 continue;
6101 }
6102 }
6103
6104 /* If client process has no activity, clear its buffer entry. */
6105 if (pbclient->pid && pbclient->watchdog_timeout > 0) {
6106 DWORD tdiff = actual_time - pbclient->last_activity;
6107#if 0
6108 printf("buffer [%s] client [%-32s] times 0x%08x 0x%08x, diff 0x%08x %5d, timeout %d\n",
6109 pheader->name,
6110 pbclient->name,
6111 pbclient->last_activity,
6113 tdiff,
6114 tdiff,
6115 pbclient->watchdog_timeout);
6116#endif
6117 if (actual_time > pbclient->last_activity &&
6118 tdiff > pbclient->watchdog_timeout) {
6119
6120 cm_msg(MINFO, "bm_cleanup", "Client \'%s\' on buffer \'%s\' removed by %s (idle %1.1lfs, timeout %1.0lfs)",
6121 pbclient->name, pheader->name, who,
6122 tdiff / 1000.0,
6123 pbclient->watchdog_timeout / 1000.0);
6124
6125 bm_remove_client_locked(pheader, j);
6126 }
6127 }
6128 }
6129}
void bm_remove_client_locked(BUFFER_HEADER *pheader, int j)
Definition midas.cxx:6052
#define MINFO
Definition midas.h:560
BOOL ss_pid_exists(int pid)
Definition system.cxx:1442
DWORD actual_time
Definition mfe.cxx:37
INT j
Definition odbhist.cxx:40
DWORD watchdog_timeout
Definition midas.h:952
DWORD last_activity
Definition midas.h:951
char name[NAME_LENGTH]
Definition midas.h:936
char name[NAME_LENGTH]
Definition midas.h:959
INT max_client_index
Definition midas.h:961
BUFFER_CLIENT client[MAX_CLIENTS]
Definition midas.h:968
BUFFER_HEADER * buffer_header
Definition midas.h:993
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_clear_buffer_statistics()

static void bm_clear_buffer_statistics ( HNDLE  hDB,
BUFFER pbuf 
)
static

Definition at line 6425 of file midas.cxx.

6425 {
6426 std::string str = msprintf("/System/buffers/%s/Clients/%s/writes_blocked_by", pbuf->buffer_name, pbuf->client_name);
6427 //printf("delete [%s]\n", str);
6428 db_delete(hDB, 0, str.c_str());
6429}
INT db_delete(HNDLE hDB, HNDLE hKeyRoot, const char *odb_path)
Definition odb.cxx:3999
HNDLE hDB
main ODB handle
Definition mana.cxx:207
std::string msprintf(const char *format,...)
Definition midas.cxx:419
char str[256]
Definition odbhist.cxx:33
char client_name[NAME_LENGTH]
Definition midas.h:991
char buffer_name[NAME_LENGTH]
Definition midas.h:992
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_close_all_buffers()

INT bm_close_all_buffers ( void  )

Close all open buffers

Returns
BM_SUCCESS

Definition at line 7253 of file midas.cxx.

7253 {
7254 if (rpc_is_remote())
7256
7257#ifdef LOCAL_ROUTINES
7258 {
7260
7261 gBuffersMutex.lock();
7262 size_t nbuf = gBuffers.size();
7263 gBuffersMutex.unlock();
7264
7265 for (size_t i = nbuf; i > 0; i--) {
7267 }
7268
7269 gBuffersMutex.lock();
7270 for (size_t i=0; i< gBuffers.size(); i++) {
7271 BUFFER* pbuf = gBuffers[i];
7272 if (!pbuf)
7273 continue;
7274 delete pbuf;
7275 pbuf = NULL;
7276 gBuffers[i] = NULL;
7277 }
7278 gBuffersMutex.unlock();
7279 }
7280#endif /* LOCAL_ROUTINES */
7281
7282 return BM_SUCCESS;
7283}
INT bm_close_buffer(INT buffer_handle)
Definition midas.cxx:7106
int cm_msg_close_buffer(void)
Definition midas.cxx:500
#define RPC_BM_CLOSE_ALL_BUFFERS
Definition mrpc.h:38
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_close_buffer()

INT bm_close_buffer ( INT  buffer_handle)

Closes an event buffer previously opened with bm_open_buffer().

Parameters
buffer_handle — buffer handle
Returns
BM_SUCCESS, BM_INVALID_HANDLE

Definition at line 7106 of file midas.cxx.

7106 {
7107 //printf("bm_close_buffer: handle %d\n", buffer_handle);
7108
7109 if (rpc_is_remote())
7110 return rpc_call(RPC_BM_CLOSE_BUFFER, buffer_handle);
7111
7112#ifdef LOCAL_ROUTINES
7113 {
7114 int status = 0;
7115
7116 BUFFER *pbuf = bm_get_buffer(NULL, buffer_handle, &status);
7117
7118 if (!pbuf)
7119 return status;
7120
7121 //printf("bm_close_buffer: handle %d, name [%s]\n", buffer_handle, pheader->name);
7122
7123 int i;
7124
7125 { /* delete all requests for this buffer */
7126 _request_list_mutex.lock();
7127 std::vector<EventRequest> request_list_copy = _request_list;
7128 _request_list_mutex.unlock();
7129 for (size_t i = 0; i < request_list_copy.size(); i++) {
7130 if (request_list_copy[i].buffer_handle == buffer_handle) {
7132 }
7133 }
7134 }
7135
7136 HNDLE hDB;
7138
7139 if (hDB) {
7140 /* write statistics to odb */
7142 }
7143
7144 /* lock buffer in correct order */
7145
7147
7148 if (status != BM_SUCCESS) {
7149 return status;
7150 }
7151
7153
7154 if (status != BM_SUCCESS) {
7155 pbuf->read_cache_mutex.unlock();
7156 return status;
7157 }
7158
7159 bm_lock_buffer_guard pbuf_guard(pbuf);
7160
7161 if (!pbuf_guard.is_locked()) {
7162 pbuf->write_cache_mutex.unlock();
7163 pbuf->read_cache_mutex.unlock();
7164 return pbuf_guard.get_status();
7165 }
7166
7167 BUFFER_HEADER *pheader = pbuf->buffer_header;
7168
7169 /* mark entry in _buffer as empty */
7170 pbuf->attached = false;
7171
7172 BUFFER_CLIENT* pclient = bm_get_my_client_locked(pbuf_guard);
7173
7174 if (pclient) {
7175 /* clear entry from client structure in buffer header */
7176 memset(pclient, 0, sizeof(BUFFER_CLIENT));
7177 }
7178
7179 /* calculate new max_client_index entry */
7180 for (i = MAX_CLIENTS - 1; i >= 0; i--)
7181 if (pheader->client[i].pid != 0)
7182 break;
7183 pheader->max_client_index = i + 1;
7184
7185 /* count new number of clients */
7186 int j = 0;
7187 for (i = MAX_CLIENTS - 1; i >= 0; i--)
7188 if (pheader->client[i].pid != 0)
7189 j++;
7190 pheader->num_clients = j;
7191
7192 int destroy_flag = (pheader->num_clients == 0);
7193
7194 // we hold the locks on the read cache and the write cache.
7195
7196 /* free cache */
7197 if (pbuf->read_cache_size > 0) {
7198 free(pbuf->read_cache);
7199 pbuf->read_cache = NULL;
7200 pbuf->read_cache_size = 0;
7201 pbuf->read_cache_rp = 0;
7202 pbuf->read_cache_wp = 0;
7203 }
7204
7205 if (pbuf->write_cache_size > 0) {
7206 free(pbuf->write_cache);
7207 pbuf->write_cache = NULL;
7208 pbuf->write_cache_size = 0;
7209 pbuf->write_cache_rp = 0;
7210 pbuf->write_cache_wp = 0;
7211 }
7212
7213 /* check if anyone is waiting and wake him up */
7214
7215 for (int i = 0; i < pheader->max_client_index; i++) {
7216 BUFFER_CLIENT *pclient = pheader->client + i;
7217 if (pclient->pid && (pclient->write_wait || pclient->read_wait))
7218 ss_resume(pclient->port, "B ");
7219 }
7220
7221 /* unmap shared memory, delete it if we are the last */
7222
7223 ss_shm_close(pbuf->buffer_name, pbuf->buffer_header, pbuf->shm_size, pbuf->shm_handle, destroy_flag);
7224
7225 /* after ss_shm_close() these are invalid: */
7226
7227 pheader = NULL;
7228 pbuf->buffer_header = NULL;
7229 pbuf->shm_size = 0;
7230 pbuf->shm_handle = 0;
7231
7232 /* unlock buffer in correct order */
7233
7234 pbuf_guard.unlock();
7235
7236 pbuf->write_cache_mutex.unlock();
7237 pbuf->read_cache_mutex.unlock();
7238
7239 /* delete semaphore */
7240
7241 ss_semaphore_delete(pbuf->semaphore, destroy_flag);
7242 }
7243#endif /* LOCAL_ROUTINES */
7244
7245 return BM_SUCCESS;
7246}
INT bm_delete_request(INT request_id)
Definition midas.cxx:8594
static void bm_write_buffer_statistics_to_odb(HNDLE hDB, BUFFER *pbuf, BOOL force)
Definition midas.cxx:6596
INT cm_get_experiment_database(HNDLE *hDB, HNDLE *hKeyClient)
Definition midas.cxx:3026
static int bm_lock_buffer_read_cache(BUFFER *pbuf)
Definition midas.cxx:7914
static int bm_lock_buffer_write_cache(BUFFER *pbuf)
Definition midas.cxx:7935
INT ss_resume(INT port, const char *message)
Definition system.cxx:4916
INT ss_semaphore_delete(HNDLE semaphore_handle, INT destroy_flag)
Definition system.cxx:2941
INT ss_shm_close(const char *name, void *adr, size_t shm_size, HNDLE handle, INT destroy_flag)
Definition system.cxx:757
#define RPC_BM_CLOSE_BUFFER
Definition mrpc.h:37
static std::vector< EventRequest > _request_list
Definition midas.cxx:220
static std::mutex _request_list_mutex
Definition midas.cxx:219
#define MAX_CLIENTS
Definition midas.h:274
INT HNDLE
Definition midas.h:132
INT write_wait
Definition midas.h:948
BOOL read_wait
Definition midas.h:947
INT num_clients
Definition midas.h:960
HNDLE semaphore
Definition midas.h:1004
size_t read_cache_rp
Definition midas.h:997
std::timed_mutex read_cache_mutex
Definition midas.h:994
std::timed_mutex write_cache_mutex
Definition midas.h:999
size_t shm_size
Definition midas.h:1006
char * read_cache
Definition midas.h:996
size_t write_cache_rp
Definition midas.h:1002
size_t write_cache_wp
Definition midas.h:1003
char * write_cache
Definition midas.h:1001
size_t read_cache_wp
Definition midas.h:998
std::atomic< size_t > write_cache_size
Definition midas.h:1000
INT shm_handle
Definition midas.h:1005
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_compose_event()

INT bm_compose_event ( EVENT_HEADER event_header,
short int  event_id,
short int  trigger_mask,
DWORD  data_size,
DWORD  serial 
)

Compose a Midas event header. An event header can usually be set-up manually or through this routine. If the data size of the event is not known when the header is composed, it can be set later with event_header->data_size = <...> Following structure is created at the beginning of an event

typedef struct {
short int event_id;
short int trigger_mask;
DWORD data_size;
char event[1000];
bm_compose_event((EVENT_HEADER *)event, 1, 0, 100, 1);
*(event+sizeof(EVENT_HEADER)) = <...>
INT bm_compose_event(EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD serial)
Definition midas.cxx:8291
#define serial_number
#define time_stamp
Parameters
event_header — pointer to the event header
event_id — event ID of the event
trigger_mask — trigger mask of the event
data_size — size of the data part of the event in bytes
serial — serial number
Returns
BM_SUCCESS

Definition at line 8291 of file midas.cxx.

8292{
8293 event_header->event_id = event_id;
8294 event_header->trigger_mask = trigger_mask;
8295 event_header->data_size = data_size;
8296 event_header->time_stamp = ss_time();
8297 event_header->serial_number = serial;
8298
8299 return BM_SUCCESS;
8300}
INT serial
Definition minife.c:20
short int event_id
Definition midas.h:853
DWORD data_size
Definition midas.h:857
DWORD serial_number
Definition midas.h:855
short int trigger_mask
Definition midas.h:854
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_compose_event_threadsafe()

INT bm_compose_event_threadsafe ( EVENT_HEADER event_header,
short int  event_id,
short int  trigger_mask,
DWORD  data_size,
DWORD serial 
)

Definition at line 8302 of file midas.cxx.

8303{
8304 static std::mutex mutex;
8305
8306 event_header->event_id = event_id;
8307 event_header->trigger_mask = trigger_mask;
8308 event_header->data_size = data_size;
8309 event_header->time_stamp = ss_time();
8310 {
8311 std::lock_guard<std::mutex> lock(mutex);
8312 event_header->serial_number = *serial;
8313 *serial = *serial + 1;
8314 // implicit unlock
8315 }
8316
8317 return BM_SUCCESS;
8318}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_convert_event_header()

static void bm_convert_event_header ( EVENT_HEADER pevent,
int  convert_flags 
)
static

Definition at line 9079 of file midas.cxx.

9079 {
9080 /* now convert event header */
9081 if (convert_flags) {
9082 rpc_convert_single(&pevent->event_id, TID_INT16, RPC_OUTGOING, convert_flags);
9083 rpc_convert_single(&pevent->trigger_mask, TID_INT16, RPC_OUTGOING, convert_flags);
9084 rpc_convert_single(&pevent->serial_number, TID_UINT32, RPC_OUTGOING, convert_flags);
9085 rpc_convert_single(&pevent->time_stamp, TID_UINT32, RPC_OUTGOING, convert_flags);
9086 rpc_convert_single(&pevent->data_size, TID_UINT32, RPC_OUTGOING, convert_flags);
9087 }
9088}
#define TID_UINT32
Definition midas.h:337
#define TID_INT16
Definition midas.h:335
void rpc_convert_single(void *data, INT tid, INT flags, INT convert_flags)
Definition midas.cxx:11706
#define RPC_OUTGOING
Definition midas.h:1584
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_delete_request()

INT bm_delete_request ( INT  request_id)

Deletes an event request previously done with bm_request_event(). When an event request gets deleted, events of that requested type are not received any more. When a buffer is closed via bm_close_buffer(), all event requests from that buffer are deleted automatically

Parameters
request_id — request identifier given by bm_request_event()
Returns
BM_SUCCESS, BM_INVALID_HANDLE

Definition at line 8594 of file midas.cxx.

8595{
8596 _request_list_mutex.lock();
8597
8598 if (request_id < 0 || size_t(request_id) >= _request_list.size()) {
8599 _request_list_mutex.unlock();
8600 return BM_INVALID_HANDLE;
8601 }
8602
8603 int buffer_handle = _request_list[request_id].buffer_handle;
8604
8605 _request_list[request_id].clear();
8606
8607 _request_list_mutex.unlock();
8608
8609 /* remove request entry from buffer */
8610 return bm_remove_event_request(buffer_handle, request_id);
8611}
INT bm_remove_event_request(INT buffer_handle, INT request_id)
Definition midas.cxx:8528
#define BM_INVALID_HANDLE
Definition midas.h:609
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_dispatch_event()

static void bm_dispatch_event ( int  buffer_handle,
EVENT_HEADER pevent 
)
static

Definition at line 8833 of file midas.cxx.

8834{
8835 _request_list_mutex.lock();
8836 bool locked = true;
8837 size_t n = _request_list.size();
8838 /* call dispatcher */
8839 for (size_t i = 0; i < n; i++) {
8840 if (!locked) {
8841 _request_list_mutex.lock();
8842 locked = true;
8843 }
8845 if (r.buffer_handle != buffer_handle)
8846 continue;
8847 if (!bm_match_event(r.event_id, r.trigger_mask, pevent))
8848 continue;
8849 /* must release the lock on the request list: user provided r.dispatcher() can add or remove event requests, and we will deadlock. K.O. */
8850 _request_list_mutex.unlock();
8851 locked = false;
8852 /* if event is fragmented, call defragmenter */
8853 if (((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG1)) || ((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG))) {
8854 bm_defragment_event(buffer_handle, i, pevent, (void *) (pevent + 1), r.dispatcher);
8855 } else {
8856 r.dispatcher(buffer_handle, i, pevent, (void *) (pevent + 1));
8857 }
8858 }
8859 if (locked)
8860 _request_list_mutex.unlock();
8861}
static void bm_defragment_event(HNDLE buffer_handle, HNDLE request_id, EVENT_HEADER *pevent, void *pdata, EVENT_HANDLER *dispatcher)
Definition midas.cxx:11309
DWORD n[4]
Definition mana.cxx:247
#define EVENTID_FRAG
Definition midas.h:908
#define EVENTID_FRAG1
Definition midas.h:907
short int event_id
Definition midas.cxx:206
INT buffer_handle
Definition midas.cxx:205
short int trigger_mask
Definition midas.cxx:207
EVENT_HANDLER * dispatcher
Definition midas.cxx:208
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_empty_buffers()

INT bm_empty_buffers ( void  )

Clears event buffer and cache. If an event buffer is large and a consumer is slow in analyzing events, events are usually received some time after they are produced. This effect is even more pronounced if a read cache is used (via bm_set_cache_size()). When changes to the hardware are made in the experiment, the consumer will then still analyze old events before any new event which reflects the hardware change. Users can be fooled by looking at histograms which reflect the hardware change many seconds after they have been made.

To overcome this potential problem, the analyzer can call bm_empty_buffers() just after the hardware change has been made which skips all old events contained in event buffers and read caches. Technically this is done by forwarding the read pointer of the client. No events are really deleted, they are still visible to other clients like the logger.

Note that the front-end also contains write buffers which can delay the delivery of events. The standard front-end framework mfe.c reduces this effect by flushing all buffers once every second.

Returns
BM_SUCCESS

Definition at line 11265 of file midas.cxx.

11265 {
11266 if (rpc_is_remote())
11268
11269#ifdef LOCAL_ROUTINES
11270 {
11271 std::vector<BUFFER*> mybuffers;
11272
11273 gBuffersMutex.lock();
11274 mybuffers = gBuffers;
11275 gBuffersMutex.unlock();
11276
11277 /* go through all buffers */
11278 for (BUFFER* pbuf : mybuffers) {
11279 if (!pbuf)
11280 continue;
11281 if (!pbuf->attached)
11282 continue;
11283
11284 int status = bm_skip_event(pbuf);
11285 if (status != BM_SUCCESS)
11286 return status;
11287 }
11288 }
11289#endif /* LOCAL_ROUTINES */
11290
11291 return BM_SUCCESS;
11292}
static int bm_skip_event(BUFFER *pbuf)
Definition midas.cxx:10858
#define RPC_BM_EMPTY_BUFFERS
Definition mrpc.h:49
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_fill_read_cache_locked()

static int bm_fill_read_cache_locked ( bm_lock_buffer_guard pbuf_guard,
int  timeout_msec 
)
static

Definition at line 9002 of file midas.cxx.

9003{
9004 BUFFER* pbuf = pbuf_guard.get_pbuf();
9005 BUFFER_HEADER* pheader = pbuf->buffer_header;
9006 BUFFER_CLIENT *pc = bm_get_my_client_locked(pbuf_guard);
9007 BOOL need_wakeup = FALSE;
9008
9009 //printf("bm_fill_read_cache: [%s] timeout %d, size %d, rp %d, wp %d\n", pheader->name, timeout_msec, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
9010
9011 /* loop over all events in the buffer */
9012
9013 while (1) {
9014 EVENT_HEADER *pevent = NULL;
9015 int event_size = 3; // poison value
9016 int total_size = 3; // poison value
9017
9018 int status = bm_peek_buffer_locked(pbuf, pheader, pc, &pevent, &event_size, &total_size);
9019 if (status == BM_CORRUPTED) {
9020 return status;
9021 } else if (status != BM_SUCCESS) {
9022 /* event buffer is empty */
9023 if (timeout_msec == BM_NO_WAIT) {
9024 if (need_wakeup)
9025 bm_wakeup_producers_locked(pheader, pc);
9026 if (pbuf->read_cache_rp == pbuf->read_cache_wp) {
9027 // read cache is empty
9028 return BM_ASYNC_RETURN;
9029 }
9030 return BM_SUCCESS;
9031 }
9032
9033 int status = bm_wait_for_more_events_locked(pbuf_guard, pc, timeout_msec, TRUE);
9034
9035 if (status != BM_SUCCESS) {
9036 // we only come here with SS_ABORT & co
9037 return status;
9038 }
9039
9040 // make sure we wait for new event only once
9041 timeout_msec = BM_NO_WAIT;
9042 // go back to bm_peek_buffer_locked
9043 continue;
9044 }
9045
9046 /* loop over all requests: if this event matches a request,
9047 * copy it to the read cache */
9048
9049 BOOL is_requested = bm_check_requests(pc, pevent);
9050
9051 if (is_requested) {
9052 if (pbuf->read_cache_wp + total_size > pbuf->read_cache_size) {
9053 /* read cache is full */
9054 if (need_wakeup)
9055 bm_wakeup_producers_locked(pheader, pc);
9056 return BM_SUCCESS;
9057 }
9058
9060
9061 pbuf->read_cache_wp += total_size;
9062
9063 /* update statistics */
9064 pheader->num_out_events++;
9065 pbuf->count_read++;
9066 pbuf->bytes_read += event_size;
9067 }
9068
9069 /* shift read pointer */
9070
9071 int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
9072 pc->read_pointer = new_read_pointer;
9073
9074 need_wakeup = TRUE;
9075 }
9076 /* NOT REACHED */
9077}
BUFFER * get_pbuf() const
Definition midas.cxx:3196
static void bm_wakeup_producers_locked(const BUFFER_HEADER *pheader, const BUFFER_CLIENT *pc)
Definition midas.cxx:8797
static int bm_incr_rp_no_check(const BUFFER_HEADER *pheader, int rp, int total_size)
Definition midas.cxx:6240
static BOOL bm_check_requests(const BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
Definition midas.cxx:8976
static void bm_read_from_buffer_locked(const BUFFER_HEADER *pheader, int rp, char *buf, int event_size)
Definition midas.cxx:8946
static int bm_peek_buffer_locked(BUFFER *pbuf, BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
Definition midas.cxx:8901
static int bm_wait_for_more_events_locked(bm_lock_buffer_guard &pbuf_guard, BUFFER_CLIENT *pc, int timeout_msec, BOOL unlock_read_cache)
Definition midas.cxx:9402
#define BM_ASYNC_RETURN
Definition midas.h:613
#define BM_NO_WAIT
Definition midas.h:366
int event_size
Definition msysmon.cxx:527
INT read_pointer
Definition midas.h:941
INT num_out_events
Definition midas.h:966
double bytes_read
Definition midas.h:1022
int count_read
Definition midas.h:1021
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_find_first_request_locked()

static int bm_find_first_request_locked ( BUFFER_CLIENT pc,
const EVENT_HEADER pevent 
)
static

Definition at line 9599 of file midas.cxx.

9599 {
9600 if (pc->pid) {
9601 int j;
9602 for (j = 0; j < pc->max_request_index; j++) {
9603 const EVENT_REQUEST *prequest = pc->event_request + j;
9604 if (prequest->valid && bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
9605 return prequest->id;
9606 }
9607 }
9608 }
9609
9610 return -1;
9611}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache()

INT bm_flush_cache ( int  buffer_handle,
int  timeout_msec 
)

Definition at line 10232 of file midas.cxx.

10233{
10234 if (rpc_is_remote()) {
10235 return bm_flush_cache_rpc(buffer_handle, timeout_msec);
10236 }
10237
10238#ifdef LOCAL_ROUTINES
10239 {
10240 INT status = 0;
10241
10242 //printf("bm_flush_cache!\n");
10243
10244 BUFFER *pbuf = bm_get_buffer("bm_flush_cache", buffer_handle, &status);
10245
10246 if (!pbuf)
10247 return status;
10248
10249 if (pbuf->write_cache_size == 0)
10250 return BM_SUCCESS;
10251
10253
10254 if (status != BM_SUCCESS)
10255 return status;
10256
10257 /* check if anything needs to be flushed */
10258 if (pbuf->write_cache_wp == 0) {
10259 pbuf->write_cache_mutex.unlock();
10260 return BM_SUCCESS;
10261 }
10262
10263 /* lock the buffer */
10264 bm_lock_buffer_guard pbuf_guard(pbuf);
10265
10266 if (!pbuf_guard.is_locked())
10267 return pbuf_guard.get_status();
10268
10269 status = bm_flush_cache_locked(pbuf_guard, timeout_msec);
10270
10271 /* unlock in correct order */
10272
10273 if (pbuf_guard.is_locked()) {
10274 // check if bm_wait_for_free_space() failed to relock the buffer
10275 pbuf_guard.unlock();
10276 }
10277
10278 pbuf->write_cache_mutex.unlock();
10279
10280 return status;
10281 }
10282#endif /* LOCAL_ROUTINES */
10283
10284 return BM_SUCCESS;
10285}
static int bm_flush_cache_rpc(int buffer_handle, int timeout_msec)
Definition midas.cxx:9997
static INT bm_flush_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
Definition midas.cxx:10088
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache_locked()

static INT bm_flush_cache_locked ( bm_lock_buffer_guard pbuf_guard,
int  timeout_msec 
)
static

Empty write cache. This function should be used if events in the write cache should be visible to the consumers immediately. It should be called at the end of each run, otherwise events could be kept in the write buffer and will flow to the data of the next run.

Parameters
buffer_handleBuffer handle obtained via bm_open_buffer() or 0 to flush data in the mserver event socket
timeout_msecTimeout waiting for free space in the event buffer. If BM_WAIT, wait forever. If BM_NO_WAIT, the function returns immediately with a value of BM_ASYNC_RETURN without writing the cache.
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN Routine called with async_flag == BM_NO_WAIT and buffer has not enough space to receive cache
BM_NO_MEMORY Event is too large for network buffer or event buffer. One has to increase the event buffer size "/Experiment/Buffer sizes/SYSTEM" and/or /Experiment/MAX_EVENT_SIZE in ODB.

Definition at line 10088 of file midas.cxx.

10089{
10090 // NB we come here with write cache locked and buffer locked.
10091
10092 {
10093 INT status = 0;
10094
10095 //printf("bm_flush_cache_locked!\n");
10096
10097 BUFFER* pbuf = pbuf_guard.get_pbuf();
10098 BUFFER_HEADER* pheader = pbuf->buffer_header;
10099
10100 //printf("bm_flush_cache_locked: buffer %s, cache rp %zu, wp %zu, timeout %d msec\n", pbuf->buffer_name, pbuf->write_cache_rp, pbuf->write_cache_wp, timeout_msec);
10101
10102 int old_write_pointer = pheader->write_pointer;
10103
10104 int request_id[MAX_CLIENTS];
10105 for (int i = 0; i < pheader->max_client_index; i++) {
10106 request_id[i] = -1;
10107 }
10108
10109 size_t ask_rp = pbuf->write_cache_rp;
10110 size_t ask_wp = pbuf->write_cache_wp;
10111
10112 if (ask_wp == 0) { // nothing to do
10113 return BM_SUCCESS;
10114 }
10115
10116 if (ask_rp == ask_wp) { // nothing to do
10117 return BM_SUCCESS;
10118 }
10119
10120 assert(ask_rp < ask_wp);
10121
10122 size_t ask_free = ALIGN8(ask_wp - ask_rp);
10123
10124 if (ask_free == 0) { // nothing to do
10125 return BM_SUCCESS;
10126 }
10127
10128#if 0
10130 if (status != BM_SUCCESS) {
10131 printf("bm_flush_cache: corrupted 111!\n");
10132 abort();
10133 }
10134#endif
10135
10136 status = bm_wait_for_free_space_locked(pbuf_guard, timeout_msec, ask_free, true);
10137
10138 if (status != BM_SUCCESS) {
10139 return status;
10140 }
10141
10142 // NB: ask_rp, ask_wp and ask_free are invalid after calling bm_wait_for_free_space():
10143 //
10144 // wait_for_free_space() will sleep with all locks released,
10145 // during this time, another thread may call bm_send_event() that will
10146 // add one or more events to the write cache and after wait_for_free_space()
10147 // returns, size of data in cache will be bigger than the amount
10148 // of free space we requested. so we need to keep track of how
10149 // much data we write to the buffer and ask for more data
10150 // if we run short. This is the reason for the big loop
10151 // around wait_for_free_space(). We ask for slightly too little free
10152 // space to make sure all this code is always used and does work. K.O.
10153
10154 if (pbuf->write_cache_wp == 0) {
10155 /* somebody emptied the cache while we were inside bm_wait_for_free_space */
10156 return BM_SUCCESS;
10157 }
10158
10159 //size_t written = 0;
10160 while (pbuf->write_cache_rp < pbuf->write_cache_wp) {
10161 /* loop over all events in cache */
10162
10163 const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pbuf->write_cache + pbuf->write_cache_rp);
10164 size_t event_size = (pevent->data_size + sizeof(EVENT_HEADER));
10165 size_t total_size = ALIGN8(event_size);
10166
10167#if 0
10168 printf("bm_flush_cache: cache size %d, wp %d, rp %d, event data_size %d, event_size %d, total_size %d, free %d, written %d\n",
10169 int(pbuf->write_cache_size),
10170 int(pbuf->write_cache_wp),
10171 int(pbuf->write_cache_rp),
10172 int(pevent->data_size),
10173 int(event_size),
10174 int(total_size),
10175 int(ask_free),
10176 int(written));
10177#endif
10178
10179 // check for crazy event size
10180 assert(total_size >= sizeof(EVENT_HEADER));
10181 assert(total_size <= (size_t)pheader->size);
10182
10183 bm_write_to_buffer_locked(pheader, 1, (char**)&pevent, &event_size, total_size);
10184
10185 /* update statistics */
10186 pheader->num_in_events++;
10187 pbuf->count_sent += 1;
10188 pbuf->bytes_sent += total_size;
10189
10190 /* see comment for the same code in bm_send_event().
10191 * We make sure the buffer is never 100% full */
10192 assert(pheader->write_pointer != pheader->read_pointer);
10193
10194 /* check if anybody has a request for this event */
10195 for (int i = 0; i < pheader->max_client_index; i++) {
10196 BUFFER_CLIENT *pc = pheader->client + i;
10197 int r = bm_find_first_request_locked(pc, pevent);
10198 if (r >= 0) {
10199 request_id[i] = r;
10200 }
10201 }
10202
10203 /* this loop does not loop forever because rp
10204 * is monotonously incremented here. write_cache_wp does
10205 * not change */
10206
10207 pbuf->write_cache_rp += total_size;
10208 //written += total_size;
10209
10210 assert(pbuf->write_cache_rp > 0);
10211 assert(pbuf->write_cache_rp <= pbuf->write_cache_size);
10212 assert(pbuf->write_cache_rp <= pbuf->write_cache_wp);
10213 }
10214
10215 /* the write cache is now empty */
10216 assert(pbuf->write_cache_wp == pbuf->write_cache_rp);
10217 pbuf->write_cache_wp = 0;
10218 pbuf->write_cache_rp = 0;
10219
10220 /* check which clients are waiting */
10221 for (int i = 0; i < pheader->max_client_index; i++) {
10222 BUFFER_CLIENT *pc = pheader->client + i;
10223 bm_notify_reader_locked(pheader, pc, old_write_pointer, request_id[i]);
10224 }
10225 }
10226
10227 return BM_SUCCESS;
10228}
static void bm_notify_reader_locked(BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, int old_write_pointer, int request_id)
Definition midas.cxx:9613
static int bm_find_first_request_locked(BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
Definition midas.cxx:9599
static int bm_validate_buffer_locked(const BUFFER *pbuf)
Definition midas.cxx:6324
static void bm_write_to_buffer_locked(BUFFER_HEADER *pheader, int sg_n, const char *const sg_ptr[], const size_t sg_len[], size_t total_size)
Definition midas.cxx:9514
static int bm_wait_for_free_space_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec, int requested_space, bool unlock_write_cache)
Definition midas.cxx:9090
#define ALIGN8(x)
Definition midas.h:522
INT num_in_events
Definition midas.h:965
INT write_pointer
Definition midas.h:964
INT read_pointer
Definition midas.h:963
int count_sent
Definition midas.h:1013
double bytes_sent
Definition midas.h:1014
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache_rpc()

static int bm_flush_cache_rpc ( int  buffer_handle,
int  timeout_msec 
)
static

Definition at line 9997 of file midas.cxx.

9998{
9999 //printf("bm_flush_cache_rpc: handle %d, timeout %d\n", buffer_handle, timeout_msec);
10000
10001 DWORD time_start = ss_millitime();
10002 DWORD time_end = time_start + timeout_msec;
10003 DWORD time_bombout = time_end;
10004
10005 if (timeout_msec < 10000)
10006 time_bombout = time_start + 10000; // 10 seconds
10007
10008 int xtimeout_msec = timeout_msec;
10009
10010 while (1) {
10011 if (timeout_msec == BM_WAIT) {
10012 xtimeout_msec = 1000;
10013 } else if (timeout_msec == BM_NO_WAIT) {
10014 xtimeout_msec = BM_NO_WAIT;
10015 } else {
10016 if (xtimeout_msec > 1000) {
10017 xtimeout_msec = 1000;
10018 }
10019 }
10020
10021 int status = rpc_call(RPC_BM_FLUSH_CACHE, buffer_handle, xtimeout_msec);
10022
10023 //printf("bm_flush_cache_rpc: handle %d, timeout %d, status %d\n", buffer_handle, xtimeout_msec, status);
10024
10025 if (status == BM_ASYNC_RETURN) {
10026 if (timeout_msec == BM_WAIT) {
10027 DWORD now = ss_millitime();
10028 if (now >= time_bombout) {
10029 // timeout
10030 return BM_TIMEOUT;
10031 }
10032
10033 // BM_WAIT means wait forever
10034 continue;
10035 } else if (timeout_msec == BM_NO_WAIT) {
10036 // BM_NO_WAIT means do not wait
10037 return status;
10038 } else {
10039 DWORD now = ss_millitime();
10040 if (now >= time_end) {
10041 // timeout, return BM_ASYNC_RETURN
10042 return status;
10043 }
10044
10045 DWORD remain = time_end - now;
10046
10047 if (remain < (DWORD)xtimeout_msec) {
10048 xtimeout_msec = remain;
10049 }
10050
10051 if (now >= time_bombout) {
10052 // timeout
10053 return BM_TIMEOUT;
10054 }
10055
10056 // keep asking for event...
10057 continue;
10058 }
10059 } else if (status == BM_SUCCESS) {
10060 // success, return BM_SUCCESS
10061 return status;
10062 } else {
10063 // error
10064 return status;
10065 }
10066 }
10067}
#define BM_TIMEOUT
Definition midas.h:625
#define BM_WAIT
Definition midas.h:365
#define RPC_BM_FLUSH_CACHE
Definition mrpc.h:46
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_get_buffer_handle()

INT bm_get_buffer_handle ( const char *  buffer_name,
INT buffer_handle 
)

If the buffer is already open, return its handle

Parameters
buffer_namebuffer name
Returns
BM_SUCCESS, BM_NOT_FOUND

Definition at line 7085 of file midas.cxx.

7086{
7087 gBuffersMutex.lock();
7088 for (size_t i = 0; i < gBuffers.size(); i++) {
7089 BUFFER* pbuf = gBuffers[i];
7090 if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
7091 *buffer_handle = i + 1;
7092 gBuffersMutex.unlock();
7093 return BM_SUCCESS;
7094 }
7095 }
7096 gBuffersMutex.unlock();
7097 return BM_NOT_FOUND;
7098}
#define BM_NOT_FOUND
Definition midas.h:612
BOOL equal_ustring(const char *str1, const char *str2)
Definition odb.cxx:3285
char buffer_name[NAME_LENGTH]
Definition mevb.cxx:45
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_incr_read_cache_locked()

static void bm_incr_read_cache_locked ( BUFFER pbuf,
int  total_size 
)
static

Definition at line 8865 of file midas.cxx.

8865 {
8866 /* increment read cache read pointer */
8867 pbuf->read_cache_rp += total_size;
8868
8869 if (pbuf->read_cache_rp == pbuf->read_cache_wp) {
8870 pbuf->read_cache_rp = 0;
8871 pbuf->read_cache_wp = 0;
8872 }
8873}
Here is the caller graph for this function:

◆ bm_incr_rp_no_check()

static int bm_incr_rp_no_check ( const BUFFER_HEADER pheader,
int  rp,
int  total_size 
)
static

Definition at line 6240 of file midas.cxx.

6241{
6242#if 0
6243 if (gRpLog == NULL) {
6244 gRpLog = fopen("rp.log", "a");
6245 }
6246 if (gRpLog && (total_size < 16)) {
6247 const char *pdata = (const char *) (pheader + 1);
6248 const DWORD *pevent = (const DWORD*) (pdata + rp);
6249 fprintf(gRpLog, "%s: rp %d, total_size %d, at rp 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x\n", pheader->name, rp, total_size,
6250 pevent[0], pevent[1], pevent[2], pevent[3], pevent[4], pevent[5]);
6251 }
6252#endif
6253
6254 // these checks are already done before we come here.
 6256 // but we check again as last-resort protection. K.O.
6256 assert(total_size > 0);
6257 assert(total_size >= (int)sizeof(EVENT_HEADER));
6258
6259 rp += total_size;
6260 if (rp >= pheader->size) {
6261 rp -= pheader->size;
6262 } else if ((rp + (int) sizeof(EVENT_HEADER)) > pheader->size) {
6263 // note: ">" here to match bm_write_to_buffer_locked() and bm_validate_rp().
6264 // if at the end of the buffer, the remaining free space is exactly
6265 // equal to the size of an event header, the event header
6266 // is written there, the pointer is wrapped and the event data
6267 // is written to the beginning of the buffer.
6268 rp = 0;
6269 }
6270 return rp;
6271}
Here is the caller graph for this function:

◆ bm_match_event()

INT bm_match_event ( short int  event_id,
short int  trigger_mask,
const EVENT_HEADER pevent 
)

Check if an event matches a given event request by the event id and trigger mask

Parameters
event_idEvent ID of request
trigger_maskTrigger mask of request
peventPointer to event to check
Returns
TRUE if event matches request

Definition at line 6032 of file midas.cxx.

6032 {
6033 // NB: cast everything to unsigned 16 bit to avoid bitwise comparison failure
6034 // because of mismatch in sign-extension between signed 16-bit event_id and
6035 // unsigned 16-bit constants. K.O.
6036
6037 if (((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG1)) || ((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG)))
6038 /* fragmented event */
6039 return (((uint16_t(event_id) == uint16_t(EVENTID_ALL)) || (uint16_t(event_id) == (uint16_t(pevent->event_id) & uint16_t(0x0FFF))))
6040 && ((uint16_t(trigger_mask) == uint16_t(TRIGGER_ALL)) || ((uint16_t(trigger_mask) & uint16_t(pevent->trigger_mask)))));
6041
6042 return (((uint16_t(event_id) == uint16_t(EVENTID_ALL)) || (uint16_t(event_id) == uint16_t(pevent->event_id)))
6043 && ((uint16_t(trigger_mask) == uint16_t(TRIGGER_ALL)) || ((uint16_t(trigger_mask) & uint16_t(pevent->trigger_mask)))));
6044}
#define TRIGGER_ALL
Definition midas.h:538
#define EVENTID_ALL
Definition midas.h:537
Here is the caller graph for this function:

◆ bm_next_rp()

static int bm_next_rp ( const char *  who,
const BUFFER_HEADER pheader,
const char *  pdata,
int  rp 
)
static

Definition at line 6273 of file midas.cxx.

6273 {
6274 const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pdata + rp);
6275 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
6276 int total_size = ALIGN8(event_size);
6277
6278 if (pevent->data_size <= 0 || total_size <= 0 || total_size > pheader->size) {
6279 cm_msg(MERROR, "bm_next_rp",
6280 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6281 pheader->name,
6282 rp,
6283 pevent->data_size,
6284 event_size,
6285 total_size,
6286 pheader->read_pointer,
6287 pheader->write_pointer,
6288 pheader->size,
6289 who);
6290 return -1;
6291 }
6292
6293 int remaining = 0;
6294 if (rp < pheader->write_pointer) {
6295 remaining = pheader->write_pointer - rp;
6296 } else {
6297 remaining = pheader->size - rp;
6298 remaining += pheader->write_pointer;
6299 }
6300
6301 //printf("bm_next_rp: total_size %d, remaining %d, rp %d, wp %d, size %d\n", total_size, remaining, rp, pheader->write_pointer, pheader->size);
6302
6303 if (total_size > remaining) {
6304 cm_msg(MERROR, "bm_next_rp",
6305 "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, remaining %d, called from %s",
6306 pheader->name,
6307 rp,
6308 pevent->data_size,
6309 event_size,
6310 total_size,
6311 pheader->read_pointer,
6312 pheader->write_pointer,
6313 pheader->size,
6314 remaining,
6315 who);
6316 return -1;
6317 }
6318
6319 rp = bm_incr_rp_no_check(pheader, rp, total_size);
6320
6321 return rp;
6322}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_notify_reader_locked()

static void bm_notify_reader_locked ( BUFFER_HEADER pheader,
BUFFER_CLIENT pc,
int  old_write_pointer,
int  request_id 
)
static

Definition at line 9613 of file midas.cxx.

9613 {
9614 if (request_id >= 0) {
9615 /* if that client has a request and is suspended, wake it up */
9616 if (pc->read_wait) {
9617 char str[80];
9618 sprintf(str, "B %s %d", pheader->name, request_id);
9619 ss_resume(pc->port, str);
9620 //printf("bm_notify_reader_locked: buffer [%s] client [%s] request_id %d, port %d, message [%s]\n", pheader->name, pc->name, request_id, pc->port, str);
9621 //printf("bm_notify_reader_locked: buffer [%s] client [%s] clear read_wait!\n", pheader->name, pc->name);
9622 pc->read_wait = FALSE;
9623 }
9624 }
9625}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_open_buffer()

INT bm_open_buffer ( const char *  buffer_name,
INT  buffer_size,
INT buffer_handle 
)

Open an event buffer. Two default buffers are created by the system. The "SYSTEM" buffer is used to exchange events and the "SYSMSG" buffer is used to exchange system messages. The name and size of the event buffers is defined in midas.h as EVENT_BUFFER_NAME and DEFAULT_BUFFER_SIZE. Following example opens the "SYSTEM" buffer, requests events with ID 1 and enters a main loop. Events are then received in process_event()

#include <stdio.h>
#include "midas.h"
void process_event(HNDLE hbuf, HNDLE request_id, EVENT_HEADER *pheader, void *pevent)
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
HNDLE hbuf;
status = cm_connect_experiment("pc810", "Sample", "Simple Analyzer", NULL);
return 1;
do
{
status = cm_yield(1000);
} while (status != RPC_SHUTDOWN && status != SS_ABORT);
return 0;
}
INT bm_open_buffer(const char *buffer_name, INT buffer_size, INT *buffer_handle)
Definition midas.cxx:6727
INT bm_request_event(HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
Definition midas.cxx:8475
INT cm_yield(INT millisec)
Definition midas.cxx:5659
INT cm_connect_experiment(const char *host_name, const char *exp_name, const char *client_name, void(*func)(char *))
Definition midas.cxx:2293
INT cm_disconnect_experiment(void)
Definition midas.cxx:2861
#define CM_SUCCESS
Definition midas.h:582
#define SS_ABORT
Definition midas.h:678
#define RPC_SHUTDOWN
Definition midas.h:708
int main()
Definition hwtest.cxx:23
INT process_event(ANALYZE_REQUEST *par, EVENT_HEADER *pevent)
Definition mana.cxx:3081
#define DEFAULT_BUFFER_SIZE
Definition midas.h:255
#define EVENT_BUFFER_NAME
Definition midas.h:269
Parameters
buffer_nameName of buffer
buffer_sizeDefault size of buffer in bytes. Can by overwritten with ODB value
buffer_handleBuffer handle returned by function
Returns
BM_SUCCESS, BM_CREATED
BM_NO_SHM Shared memory cannot be created
BM_NO_SEMAPHORE Semaphore cannot be created
BM_NO_MEMORY Not enough memory to create buffer descriptor
BM_MEMSIZE_MISMATCH Buffer size conflicts with an existing buffer of different size
BM_INVALID_PARAM Invalid parameter

Definition at line 6727 of file midas.cxx.

6727 {
6728 INT status;
6729
6730 if (rpc_is_remote()) {
6731 status = rpc_call(RPC_BM_OPEN_BUFFER, buffer_name, buffer_size, buffer_handle);
6732
6733 HNDLE hDB;
6735 if (status != SUCCESS || hDB == 0) {
6736 cm_msg(MERROR, "bm_open_buffer", "cannot open buffer \'%s\' - not connected to ODB", buffer_name);
6737 return BM_NO_SHM;
6738 }
6739
6741
6742 int size = sizeof(INT);
6743 status = db_get_value(hDB, 0, "/Experiment/MAX_EVENT_SIZE", &_bm_max_event_size, &size, TID_UINT32, TRUE);
6744
6745 if (status != DB_SUCCESS) {
6746 cm_msg(MERROR, "bm_open_buffer", "Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6747 status);
6748 return status;
6749 }
6750
6751 return status;
6752 }
6753#ifdef LOCAL_ROUTINES
6754 {
6755 HNDLE shm_handle;
6756 size_t shm_size;
6757 HNDLE hDB;
6758 const int max_buffer_size = 2 * 1000 * 1024 * 1024; // limited by 32-bit integers in the buffer header
6759
6760 bm_cleanup("bm_open_buffer", ss_millitime(), FALSE);
6761
6762 if (!buffer_name || !buffer_name[0]) {
6763 cm_msg(MERROR, "bm_open_buffer", "cannot open buffer with zero name");
6764 return BM_INVALID_PARAM;
6765 }
6766
6767 if (strlen(buffer_name) >= NAME_LENGTH) {
6768 cm_msg(MERROR, "bm_open_buffer", "buffer name \"%s\" is longer than %d bytes", buffer_name, NAME_LENGTH);
6769 return BM_INVALID_PARAM;
6770 }
6771
6773
6774 if (status != SUCCESS || hDB == 0) {
6775 //cm_msg(MERROR, "bm_open_buffer", "cannot open buffer \'%s\' - not connected to ODB", buffer_name);
6776 return BM_NO_SHM;
6777 }
6778
6779 /* get buffer size from ODB, user parameter as default if not present in ODB */
6780 std::string odb_path;
6781 odb_path += "/Experiment/Buffer sizes/";
6782 odb_path += buffer_name;
6783
6784 int size = sizeof(INT);
6785 status = db_get_value(hDB, 0, odb_path.c_str(), &buffer_size, &size, TID_UINT32, TRUE);
6786
6787 if (buffer_size <= 0 || buffer_size > max_buffer_size) {
6788 cm_msg(MERROR, "bm_open_buffer",
6789 "Cannot open buffer \"%s\", invalid buffer size %d in ODB \"%s\", maximum buffer size is %d",
6790 buffer_name, buffer_size, odb_path.c_str(), max_buffer_size);
6791 return BM_INVALID_PARAM;
6792 }
6793
6795
6796 size = sizeof(INT);
6797 status = db_get_value(hDB, 0, "/Experiment/MAX_EVENT_SIZE", &_bm_max_event_size, &size, TID_UINT32, TRUE);
6798
6799 if (status != DB_SUCCESS) {
6800 cm_msg(MERROR, "bm_open_buffer", "Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6801 status);
6802 return status;
6803 }
6804
6805 /* check if buffer already is open */
6806 gBuffersMutex.lock();
6807 for (size_t i = 0; i < gBuffers.size(); i++) {
6808 BUFFER* pbuf = gBuffers[i];
6809 if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
6810 *buffer_handle = i + 1;
6811 gBuffersMutex.unlock();
6812 return BM_SUCCESS;
6813 }
6814 }
6815 gBuffersMutex.unlock();
6816
6817 // only one thread at a time should create new buffers
6818
6819 static std::mutex gNewBufferMutex;
6820 std::lock_guard<std::mutex> guard(gNewBufferMutex);
6821
6822 // if we had a race against another thread
6823 // and while we were waiting for gNewBufferMutex
6824 // the other thread created this buffer, we return it.
6825
6826 gBuffersMutex.lock();
6827 for (size_t i = 0; i < gBuffers.size(); i++) {
6828 BUFFER* pbuf = gBuffers[i];
6829 if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
6830 *buffer_handle = i + 1;
6831 gBuffersMutex.unlock();
6832 return BM_SUCCESS;
6833 }
6834 }
6835 gBuffersMutex.unlock();
6836
6837 /* allocate new BUFFER object */
6838
6839 BUFFER* pbuf = new BUFFER;
6840
6841 /* there is no constructor for BUFFER object, we have to zero the arrays manually */
6842
6843 for (int i=0; i<MAX_CLIENTS; i++) {
6844 pbuf->client_count_write_wait[i] = 0;
6845 pbuf->client_time_write_wait[i] = 0;
6846 }
6847
6848 /* create buffer semaphore */
6849
6851
6852 if (status != SS_CREATED && status != SS_SUCCESS) {
6853 *buffer_handle = 0;
6854 delete pbuf;
6855 return BM_NO_SEMAPHORE;
6856 }
6857
6858 std::string client_name = cm_get_client_name();
6859
6860 /* store client name */
6861 mstrlcpy(pbuf->client_name, client_name.c_str(), sizeof(pbuf->client_name));
6862
6863 /* store buffer name */
6864 mstrlcpy(pbuf->buffer_name, buffer_name, sizeof(pbuf->buffer_name));
6865
6866 /* lock buffer semaphore to avoid race with bm_open_buffer() in a different program */
6867
6868 pbuf->attached = true; // required by bm_lock_buffer()
6869
6870 bm_lock_buffer_guard pbuf_guard(pbuf);
6871
6872 if (!pbuf_guard.is_locked()) {
6873 // cannot happen, no other thread can see this pbuf
6874 abort();
6875 return BM_NO_SEMAPHORE;
6876 }
6877
6878 /* open shared memory */
6879
6880 void *p = NULL;
6881 status = ss_shm_open(buffer_name, sizeof(BUFFER_HEADER) + buffer_size, &p, &shm_size, &shm_handle, FALSE);
6882
6883 if (status != SS_SUCCESS && status != SS_CREATED) {
6884 *buffer_handle = 0;
6885 pbuf_guard.unlock();
6886 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6887 delete pbuf;
6888 return BM_NO_SHM;
6889 }
6890
6891 pbuf->buffer_header = (BUFFER_HEADER *) p;
6892
6893 BUFFER_HEADER *pheader = pbuf->buffer_header;
6894
6895 bool shm_created = (status == SS_CREATED);
6896
6897 if (shm_created) {
6898 /* initialize newly created shared memory */
6899
6900 memset(pheader, 0, sizeof(BUFFER_HEADER) + buffer_size);
6901
6902 mstrlcpy(pheader->name, buffer_name, sizeof(pheader->name));
6903 pheader->size = buffer_size;
6904
6905 } else {
6906 /* validate existing shared memory */
6907
6908 if (!equal_ustring(pheader->name, buffer_name)) {
 6909 // unlock before calling cm_msg(). if we are SYSMSG, we will deadlock. K.O.
6910 pbuf_guard.unlock();
6911 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6912 cm_msg(MERROR, "bm_open_buffer",
6913 "Buffer \"%s\" is corrupted, mismatch of buffer name in shared memory \"%s\"", buffer_name,
6914 pheader->name);
6915 *buffer_handle = 0;
6916 delete pbuf;
6917 return BM_CORRUPTED;
6918 }
6919
6920 if ((pheader->num_clients < 0) || (pheader->num_clients > MAX_CLIENTS)) {
 6921 // unlock before calling cm_msg(). if we are SYSMSG, we will deadlock. K.O.
6922 pbuf_guard.unlock();
6923 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6924 cm_msg(MERROR, "bm_open_buffer", "Buffer \"%s\" is corrupted, num_clients %d exceeds MAX_CLIENTS %d",
6926 *buffer_handle = 0;
6927 delete pbuf;
6928 return BM_CORRUPTED;
6929 }
6930
6931 if ((pheader->max_client_index < 0) || (pheader->max_client_index > MAX_CLIENTS)) {
 6932 // unlock before calling cm_msg(). if we are SYSMSG, we will deadlock. K.O.
6933 pbuf_guard.unlock();
6934 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6935 cm_msg(MERROR, "bm_open_buffer", "Buffer \"%s\" is corrupted, max_client_index %d exceeds MAX_CLIENTS %d",
6937 *buffer_handle = 0;
6938 delete pbuf;
6939 return BM_CORRUPTED;
6940 }
6941
6942 /* check if buffer size is identical */
6943 if (pheader->size != buffer_size) {
6944 cm_msg(MINFO, "bm_open_buffer", "Buffer \"%s\" requested size %d differs from existing size %d",
6945 buffer_name, buffer_size, pheader->size);
6946
6947 buffer_size = pheader->size;
6948
6949 ss_shm_close(buffer_name, p, shm_size, shm_handle, FALSE);
6950
6951 status = ss_shm_open(buffer_name, sizeof(BUFFER_HEADER) + buffer_size, &p, &shm_size, &shm_handle, FALSE);
6952
6953 if (status != SS_SUCCESS) {
6954 *buffer_handle = 0;
6955 pbuf_guard.unlock();
6956 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6957 delete pbuf;
6958 return BM_NO_SHM;
6959 }
6960
6961 pbuf->buffer_header = (BUFFER_HEADER *) p;
6962 pheader = pbuf->buffer_header;
6963 }
6964 }
6965
6966 /* shared memory is good from here down */
6967
6968 pbuf->attached = true;
6969
6970 pbuf->shm_handle = shm_handle;
6971 pbuf->shm_size = shm_size;
6972 pbuf->callback = FALSE;
6973
6974 bm_cleanup_buffer_locked(pbuf, "bm_open_buffer", ss_millitime());
6975
6977 if (status != BM_SUCCESS) {
6978 cm_msg(MERROR, "bm_open_buffer",
6979 "buffer \'%s\' is corrupted, bm_validate_buffer() status %d, calling bm_reset_buffer()...", buffer_name,
6980 status);
6982 cm_msg(MINFO, "bm_open_buffer", "buffer \'%s\' was reset, all buffered events were lost", buffer_name);
6983 }
6984
6985 /* add our client BUFFER_HEADER */
6986
6987 int iclient = 0;
6988 for (; iclient < MAX_CLIENTS; iclient++)
6989 if (pheader->client[iclient].pid == 0)
6990 break;
6991
6992 if (iclient == MAX_CLIENTS) {
6993 *buffer_handle = 0;
 6994 // unlock before calling cm_msg(). if we are SYSMSG, we will deadlock. K.O.
6995 pbuf_guard.unlock();
6996 pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6997 delete pbuf;
6998 cm_msg(MERROR, "bm_open_buffer", "buffer \'%s\' maximum number of clients %d exceeded", buffer_name, MAX_CLIENTS);
6999 return BM_NO_SLOT;
7000 }
7001
7002 /* store slot index in _buffer structure */
7003 pbuf->client_index = iclient;
7004
7005 /*
7006 Save the index of the last client of that buffer so that later only
7007 the clients 0..max_client_index-1 have to be searched through.
7008 */
7009 pheader->num_clients++;
7010 if (iclient + 1 > pheader->max_client_index)
7011 pheader->max_client_index = iclient + 1;
7012
7013 /* setup buffer header and client structure */
7014 BUFFER_CLIENT *pclient = &pheader->client[iclient];
7015
7016 memset(pclient, 0, sizeof(BUFFER_CLIENT));
7017
7018 mstrlcpy(pclient->name, client_name.c_str(), sizeof(pclient->name));
7019
7020 pclient->pid = ss_getpid();
7021
7023
7024 pclient->read_pointer = pheader->write_pointer;
7025 pclient->last_activity = ss_millitime();
7026
7028
7029 pbuf_guard.unlock();
7030
7031 /* shared memory is not locked from here down, do not touch pheader and pbuf->buffer_header! */
7032
7033 pheader = NULL;
7034
7035 /* we are not holding any locks from here down, but other threads cannot see this pbuf yet */
7036
7039
7040 /* add pbuf to buffer list */
7041
7042 gBuffersMutex.lock();
7043
7044 bool added = false;
7045 for (size_t i=0; i<gBuffers.size(); i++) {
7046 if (gBuffers[i] == NULL) {
7047 gBuffers[i] = pbuf;
7048 added = true;
7049 *buffer_handle = i+1;
7050 break;
7051 }
7052 }
7053 if (!added) {
7054 *buffer_handle = gBuffers.size() + 1;
7055 gBuffers.push_back(pbuf);
7056 }
7057
7058 /* from here down we should not touch pbuf without locking it */
7059
7060 pbuf = NULL;
7061
7062 gBuffersMutex.unlock();
7063
7064 /* new buffer is now ready for use */
7065
7066 /* initialize buffer counters */
7067 bm_init_buffer_counters(*buffer_handle);
7068
7069 bm_cleanup("bm_open_buffer", ss_millitime(), FALSE);
7070
7071 if (shm_created)
7072 return BM_CREATED;
7073 }
7074#endif /* LOCAL_ROUTINES */
7075
7076 return BM_SUCCESS;
7077}
static void bm_cleanup_buffer_locked(BUFFER *pbuf, const char *who, DWORD actual_time)
Definition midas.cxx:6083
static DWORD _bm_max_event_size
Definition midas.cxx:5931
static void bm_clear_buffer_statistics(HNDLE hDB, BUFFER *pbuf)
Definition midas.cxx:6425
static void bm_reset_buffer_locked(BUFFER *pbuf)
Definition midas.cxx:6408
INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout)
Definition midas.cxx:3340
std::string cm_get_client_name()
Definition midas.cxx:2074
static void bm_cleanup(const char *who, DWORD actual_time, BOOL wrong_interval)
Definition midas.cxx:6169
#define BM_NO_SLOT
Definition midas.h:610
#define BM_NO_SHM
Definition midas.h:622
#define BM_CREATED
Definition midas.h:606
#define BM_NO_SEMAPHORE
Definition midas.h:611
#define DB_SUCCESS
Definition midas.h:632
#define SS_SUCCESS
Definition midas.h:664
#define SS_CREATED
Definition midas.h:665
#define SUCCESS
Definition mcstd.h:54
INT ss_semaphore_create(const char *name, HNDLE *semaphore_handle)
Definition system.cxx:2532
INT ss_getpid(void)
Definition system.cxx:1379
INT ss_shm_open(const char *name, INT size, void **adr, size_t *shm_size, HNDLE *handle, BOOL get_size)
Definition system.cxx:326
midas_thread_t ss_gettid(void)
Definition system.cxx:1591
INT ss_suspend_get_buffer_port(midas_thread_t thread_id, INT *port)
Definition system.cxx:4425
INT db_get_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, void *data, INT *buf_size, DWORD type, BOOL create)
Definition odb.cxx:5185
#define RPC_BM_OPEN_BUFFER
Definition mrpc.h:36
INT bm_init_buffer_counters(INT buffer_handle)
Definition midas.cxx:8073
#define DEFAULT_MAX_EVENT_SIZE
Definition midas.h:254
#define NAME_LENGTH
Definition midas.h:272
INT client_index
Definition midas.h:990
int client_count_write_wait[MAX_CLIENTS]
Definition midas.h:1023
DWORD client_time_write_wait[MAX_CLIENTS]
Definition midas.h:1024
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_peek_buffer_locked()

static int bm_peek_buffer_locked ( BUFFER *  pbuf,
BUFFER_HEADER *  pheader,
BUFFER_CLIENT *  pc,
EVENT_HEADER **  ppevent,
int *  pevent_size,
int *  ptotal_size 
)
static

Definition at line 8901 of file midas.cxx.

8902{
8903 if (pc->read_pointer == pheader->write_pointer) {
8904 /* no more events buffered for this client */
8905 if (!pc->read_wait) {
8906 //printf("bm_peek_buffer_locked: buffer [%s] client [%s], set read_wait!\n", pheader->name, pc->name);
8907 pc->read_wait = TRUE;
8908 }
8909 return BM_ASYNC_RETURN;
8910 }
8911
8912 if (pc->read_wait) {
8913 //printf("bm_peek_buffer_locked: buffer [%s] client [%s], clear read_wait!\n", pheader->name, pc->name);
8914 pc->read_wait = FALSE;
8915 }
8916
8917 if ((pc->read_pointer < 0) || (pc->read_pointer >= pheader->size)) {
8918 cm_msg(MERROR, "bm_peek_buffer_locked", "event buffer \"%s\" is corrupted: client \"%s\" read pointer %d is invalid. buffer read pointer %d, write pointer %d, size %d", pheader->name, pc->name, pc->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8919 return BM_CORRUPTED;
8920 }
8921
8922 char *pdata = (char *) (pheader + 1);
8923
8924 EVENT_HEADER *pevent = (EVENT_HEADER *) (pdata + pc->read_pointer);
8925 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
8926 int total_size = ALIGN8(event_size);
8927
8928 if ((total_size <= 0) || (total_size > pheader->size)) {
8929 cm_msg(MERROR, "bm_peek_buffer_locked", "event buffer \"%s\" is corrupted: client \"%s\" read pointer %d points to invalid event: data_size %d, event_size %d, total_size %d. buffer size: %d, read_pointer: %d, write_pointer: %d", pheader->name, pc->name, pc->read_pointer, pevent->data_size, event_size, total_size, pheader->size, pheader->read_pointer, pheader->write_pointer);
8930 return BM_CORRUPTED;
8931 }
8932
8933 assert(total_size > 0);
8934 assert(total_size <= pheader->size);
8935
8936 if (ppevent)
8937 *ppevent = pevent;
8938 if (pevent_size)
8939 *pevent_size = event_size;
8940 if (ptotal_size)
8941 *ptotal_size = total_size;
8942
8943 return BM_SUCCESS;
8944}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_peek_read_cache_locked()

static BOOL bm_peek_read_cache_locked ( BUFFER pbuf,
EVENT_HEADER **  ppevent,
int *  pevent_size,
int *  ptotal_size 
)
static

Definition at line 8875 of file midas.cxx.

8876{
8877 if (pbuf->read_cache_rp == pbuf->read_cache_wp)
8878 return FALSE;
8879
8880 EVENT_HEADER *pevent = (EVENT_HEADER *) (pbuf->read_cache + pbuf->read_cache_rp);
8881 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
8882 int total_size = ALIGN8(event_size);
8883
8884 if (ppevent)
8885 *ppevent = pevent;
8886 if (pevent_size)
8887 *pevent_size = event_size;
8888 if (ptotal_size)
8889 *ptotal_size = total_size;
8890
8891 return TRUE;
8892}
Here is the caller graph for this function:

◆ bm_poll_event()

INT bm_poll_event ( void  )

Definition at line 11151 of file midas.cxx.

11165{
11166 BOOL dispatched_something = FALSE;
11167
11168 //printf("bm_poll_event!\n");
11169
11170 DWORD start_time = ss_millitime();
11171
11172 std::vector<char> vec;
11173
11174 /* loop over all requests */
11175 _request_list_mutex.lock();
11176 bool locked = true;
11177 size_t n = _request_list.size();
11178 for (size_t i = 0; i < n; i++) {
11179 if (!locked) {
11180 _request_list_mutex.lock();
11181 locked = true;
11182 }
11183 /* continue if no dispatcher set (manual bm_receive_event) */
11184 if (_request_list[i].dispatcher == NULL)
11185 continue;
11186
11187 int buffer_handle = _request_list[i].buffer_handle;
11188
11189 /* must release the lock on the request list: user provided r.dispatcher() can add or remove event requests, and we will deadlock. K.O. */
11190 _request_list_mutex.unlock();
11191 locked = false;
11192
11193 do {
11194 /* receive event */
11195 int status = bm_receive_event_vec(buffer_handle, &vec, BM_NO_WAIT);
11196
11197 //printf("bm_poll_event: request_id %d, buffer_handle %d, bm_receive_event(BM_NO_WAIT) status %d, vec size %d, capacity %d\n", request_id, buffer_handle, status, (int)vec.size(), (int)vec.capacity());
11198
11199 /* call user function if successful */
11200 if (status == BM_SUCCESS) {
11201 bm_dispatch_event(buffer_handle, (EVENT_HEADER*)vec.data());
11202 dispatched_something = TRUE;
11203 }
11204
11205 /* break if no more events */
11206 if (status == BM_ASYNC_RETURN)
11207 break;
11208
11209 /* break if corrupted event buffer */
11210 if (status == BM_TRUNCATED) {
11211 cm_msg(MERROR, "bm_poll_event", "received event was truncated, buffer size %d is too small, see messages and increase /Experiment/MAX_EVENT_SIZE in ODB", (int)vec.size());
11212 }
11213
11214 /* break if corrupted event buffer */
11215 if (status == BM_CORRUPTED)
11216 return SS_ABORT;
11217
11218 /* break if server died */
11219 if (status == RPC_NET_ERROR) {
11220 return SS_ABORT;
11221 }
11222
11223 /* stop after one second */
11224 if (ss_millitime() - start_time > 1000) {
11225 break;
11226 }
11227
11228 } while (TRUE);
11229 }
11230
11231 if (locked)
11232 _request_list_mutex.unlock();
11233
11234 if (dispatched_something)
11235 return BM_SUCCESS;
11236 else
11237 return BM_ASYNC_RETURN;
11238}
INT bm_receive_event_vec(INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
Definition midas.cxx:10834
static void bm_dispatch_event(int buffer_handle, EVENT_HEADER *pevent)
Definition midas.cxx:8833
#define BM_TRUNCATED
Definition midas.h:614
#define RPC_NET_ERROR
Definition midas.h:702
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_push_buffer()

static INT bm_push_buffer ( BUFFER pbuf,
int  buffer_handle 
)
static

Check a buffer if an event is available and call the dispatch function if found.

Parameters
buffer_name  Name of buffer
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_TRUNCATED, BM_ASYNC_RETURN, RPC_NET_ERROR

Definition at line 10927 of file midas.cxx.

10927 {
10928 //printf("bm_push_buffer: buffer [%s], handle %d, callback %d\n", pbuf->buffer_header->name, buffer_handle, pbuf->callback);
10929
10930 /* return immediately if no callback routine is defined */
10931 if (!pbuf->callback)
10932 return BM_SUCCESS;
10933
10934 return bm_read_buffer(pbuf, buffer_handle, NULL, NULL, NULL, NULL, BM_NO_WAIT, 0, TRUE);
10935}
static INT bm_read_buffer(BUFFER *pbuf, INT buffer_handle, void **bufptr, void *buf, INT *buf_size, std::vector< char > *vecptr, int timeout_msec, int convert_flags, BOOL dispatch)
Definition midas.cxx:10289
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_read_buffer()

static INT bm_read_buffer ( BUFFER pbuf,
INT  buffer_handle,
void **  bufptr,
void *  buf,
INT buf_size,
std::vector< char > *  vecptr,
int  timeout_msec,
int  convert_flags,
BOOL  dispatch 
)
static

Definition at line 10289 of file midas.cxx.

10289 {
10291
10292 int max_size = 0;
10293 if (buf_size) {
10294 max_size = *buf_size;
10295 *buf_size = 0;
10296 }
10297
10298 //printf("bm_read_buffer: [%s] timeout %d, conv %d, ptr %p, buf %p, disp %d\n", pbuf->buffer_name, timeout_msec, convert_flags, bufptr, buf, dispatch);
10299
10300 bm_lock_buffer_guard pbuf_guard(pbuf, true); // buffer is not locked
10301
10302 // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
10303
10304 /* look if there is anything in the cache */
10305 if (pbuf->read_cache_size > 0) {
10306
10308
10309 if (status != BM_SUCCESS)
10310 return status;
10311
10312 if (pbuf->read_cache_wp == 0) {
10313
10314 // lock buffer for the first time
10315
10316 if (!pbuf_guard.relock()) {
10317 pbuf->read_cache_mutex.unlock();
10318 return pbuf_guard.get_status();
10319 }
10320
10321 status = bm_fill_read_cache_locked(pbuf_guard, timeout_msec);
10322 if (status != BM_SUCCESS) {
10323 // unlock in correct order
10324 if (pbuf_guard.is_locked()) {
10325 // check if bm_wait_for_more_events() failed to relock the buffer
10326 pbuf_guard.unlock();
10327 }
10328 pbuf->read_cache_mutex.unlock();
10329 return status;
10330 }
10331
10332 // buffer remains locked here
10333 }
10334 EVENT_HEADER *pevent;
10335 int event_size;
10336 int total_size;
10337 if (bm_peek_read_cache_locked(pbuf, &pevent, &event_size, &total_size)) {
10338 if (pbuf_guard.is_locked()) {
10339 // do not need to keep the event buffer locked
10340 // when reading from the read cache
10341 pbuf_guard.unlock();
10342 }
10343 //printf("bm_read_buffer: [%s] async %d, conv %d, ptr %p, buf %p, disp %d, total_size %d, read from cache %d %d %d\n", pbuf->buffer_name, async_flag, convert_flags, bufptr, buf, dispatch, total_size, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
10345 if (buf) {
10346 if (event_size > max_size) {
10347 cm_msg(MERROR, "bm_read_buffer", "buffer size %d is smaller than event size %d, event truncated. buffer \"%s\"", max_size, event_size, pbuf->buffer_name);
10348 event_size = max_size;
10350 }
10351
10352 memcpy(buf, pevent, event_size);
10353
10354 if (buf_size) {
10355 *buf_size = event_size;
10356 }
10357 if (convert_flags) {
10358 bm_convert_event_header((EVENT_HEADER *) buf, convert_flags);
10359 }
10360 } else if (bufptr) {
10361 *bufptr = malloc(event_size);
10362 memcpy(*bufptr, pevent, event_size);
10364 } else if (vecptr) {
10365 vecptr->resize(0);
10366 char* cptr = (char*)pevent;
10367 vecptr->assign(cptr, cptr+event_size);
10368 }
10369 bm_incr_read_cache_locked(pbuf, total_size);
10370 pbuf->read_cache_mutex.unlock();
10371 if (dispatch) {
10372 // FIXME need to protect currently dispatched event against
10373 // another thread overwriting it by refilling the read cache
10374 bm_dispatch_event(buffer_handle, pevent);
10375 return BM_MORE_EVENTS;
10376 }
10377 // buffer is unlocked here
10378 return status;
10379 }
10380 pbuf->read_cache_mutex.unlock();
10381 }
10382
10383 /* we come here if the read cache is disabled */
10384 /* we come here if the next event is too big to fit into the read cache */
10385
10386 if (!pbuf_guard.is_locked()) {
10387 if (!pbuf_guard.relock())
10388 return pbuf_guard.get_status();
10389 }
10390
10391 EVENT_HEADER *event_buffer = NULL;
10392
10393 BUFFER_HEADER *pheader = pbuf->buffer_header;
10394
10395 BUFFER_CLIENT *pc = bm_get_my_client_locked(pbuf_guard);
10396
10397 while (1) {
10398 /* loop over events in the event buffer */
10399
10400 status = bm_wait_for_more_events_locked(pbuf_guard, pc, timeout_msec, FALSE);
10401
10402 if (status != BM_SUCCESS) {
10403 // implicit unlock
10404 return status;
10405 }
10406
10407 /* check if event at current read pointer matches a request */
10408
10409 EVENT_HEADER *pevent;
10410 int event_size;
10411 int total_size;
10412
10413 status = bm_peek_buffer_locked(pbuf, pheader, pc, &pevent, &event_size, &total_size);
10414 if (status == BM_CORRUPTED) {
10415 // implicit unlock
10416 return status;
10417 } else if (status != BM_SUCCESS) {
10418 /* event buffer is empty */
10419 break;
10420 }
10421
10422 BOOL is_requested = bm_check_requests(pc, pevent);
10423
10424 if (is_requested) {
10425 //printf("bm_read_buffer: [%s] async %d, conv %d, ptr %p, buf %p, disp %d, total_size %d, read from buffer, cache %d %d %d\n", pheader->name, async_flag, convert_flags, bufptr, buf, dispatch, total_size, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
10426
10428
10429 if (buf) {
10430 if (event_size > max_size) {
10431 cm_msg(MERROR, "bm_read_buffer",
10432 "buffer size %d is smaller than event size %d, event truncated. buffer \"%s\"", max_size,
10433 event_size, pheader->name);
10434 event_size = max_size;
10436 }
10437
10438 bm_read_from_buffer_locked(pheader, pc->read_pointer, (char *) buf, event_size);
10439
10440 if (buf_size) {
10441 *buf_size = event_size;
10442 }
10443
10444 if (convert_flags) {
10445 bm_convert_event_header((EVENT_HEADER *) buf, convert_flags);
10446 }
10447
10448 pbuf->count_read++;
10449 pbuf->bytes_read += event_size;
10450 } else if (dispatch || bufptr) {
10451 assert(event_buffer == NULL); // make sure we only come here once
10452 event_buffer = (EVENT_HEADER *) malloc(event_size);
10454 pbuf->count_read++;
10455 pbuf->bytes_read += event_size;
10456 } else if (vecptr) {
10457 bm_read_from_buffer_locked(pheader, pc->read_pointer, vecptr, event_size);
10458 pbuf->count_read++;
10459 pbuf->bytes_read += event_size;
10460 }
10461
10462 int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
10463 pc->read_pointer = new_read_pointer;
10464
10465 pheader->num_out_events++;
10466 /* exit loop over events */
10467 break;
10468 }
10469
10470 int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
10471 pc->read_pointer = new_read_pointer;
10472 pheader->num_out_events++;
10473 }
10474
10475 /*
10476 If read pointer has been changed, it may have freed up some space
10477 for waiting producers. So check if free space is now more than 50%
10478 of the buffer size and wake waiting producers.
10479 */
10480
10481 bm_wakeup_producers_locked(pheader, pc);
10482
10483 pbuf_guard.unlock();
10484
10485 if (dispatch && event_buffer) {
10486 bm_dispatch_event(buffer_handle, event_buffer);
10487 free(event_buffer);
10488 event_buffer = NULL;
10489 return BM_MORE_EVENTS;
10490 }
10491
10492 if (bufptr && event_buffer) {
10493 *bufptr = event_buffer;
10494 event_buffer = NULL;
10496 }
10497
10498 if (event_buffer) {
10499 free(event_buffer);
10500 event_buffer = NULL;
10501 }
10502
10503 return status;
10504}
static void bm_convert_event_header(EVENT_HEADER *pevent, int convert_flags)
Definition midas.cxx:9079
static int bm_fill_read_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
Definition midas.cxx:9002
static BOOL bm_peek_read_cache_locked(BUFFER *pbuf, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
Definition midas.cxx:8875
static void bm_incr_read_cache_locked(BUFFER *pbuf, int total_size)
Definition midas.cxx:8865
void * event_buffer
Definition mfe.cxx:65
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_read_from_buffer_locked() [1/2]

static void bm_read_from_buffer_locked ( const BUFFER_HEADER pheader,
int  rp,
char *  buf,
int  event_size 
)
static

Definition at line 8946 of file midas.cxx.

8947{
8948 const char *pdata = (const char *) (pheader + 1);
8949
8950 if (rp + event_size <= pheader->size) {
8951 /* copy event to cache */
8952 memcpy(buf, pdata + rp, event_size);
8953 } else {
8954 /* event is splitted */
8955 int size = pheader->size - rp;
8956 memcpy(buf, pdata + rp, size);
8957 memcpy(buf + size, pdata, event_size - size);
8958 }
8959}
Here is the caller graph for this function:

◆ bm_read_from_buffer_locked() [2/2]

static void bm_read_from_buffer_locked ( const BUFFER_HEADER pheader,
int  rp,
std::vector< char > *  vecptr,
int  event_size 
)
static

Definition at line 8961 of file midas.cxx.

8962{
8963 const char *pdata = (const char *) (pheader + 1);
8964
8965 if (rp + event_size <= pheader->size) {
8966 /* copy event to cache */
8967 vecptr->assign(pdata + rp, pdata + rp + event_size);
8968 } else {
8969 /* event is splitted */
8970 int size = pheader->size - rp;
8971 vecptr->assign(pdata + rp, pdata + rp + size);
8972 vecptr->insert(vecptr->end(), pdata, pdata + event_size - size);
8973 }
8974}

◆ bm_receive_event()

INT bm_receive_event ( INT  buffer_handle,
void *  destination,
INT buf_size,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could for example contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return back to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
HNDLE hbuf;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
bm_request_event(hbuf, 1, TRIGGER_ALL, GET_ALL, request_id, NULL);
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
INT bm_receive_event(INT buffer_handle, void *destination, INT *buf_size, int timeout_msec)
Definition midas.cxx:10675
Parameters
buffer_handlebuffer handle
destinationdestination address where event is written to
buf_sizesize of destination buffer on input, size of event plus header on return.
timeout_msecWait so many millisecond for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_TRUNCATED The event is larger than the destination buffer and was therefore truncated
BM_ASYNC_RETURN No event available

Definition at line 10675 of file midas.cxx.

10675 {
10676 //printf("bm_receive_event: handle %d, async %d\n", buffer_handle, async_flag);
10677 if (rpc_is_remote()) {
10678 return bm_receive_event_rpc(buffer_handle, destination, buf_size, NULL, NULL, timeout_msec);
10679 }
10680#ifdef LOCAL_ROUTINES
10681 {
10683
10684 BUFFER *pbuf = bm_get_buffer("bm_receive_event", buffer_handle, &status);
10685
10686 if (!pbuf)
10687 return status;
10688
10689 int convert_flags = rpc_get_convert_flags();
10690
10691 status = bm_read_buffer(pbuf, buffer_handle, NULL, destination, buf_size, NULL, timeout_msec, convert_flags, FALSE);
10692 //printf("bm_receive_event: handle %d, async %d, status %d, size %d\n", buffer_handle, async_flag, status, *buf_size);
10693 return status;
10694 }
10695#else /* LOCAL_ROUTINES */
10696
10697 return BM_SUCCESS;
10698#endif
10699}
static INT bm_receive_event_rpc(INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
Definition midas.cxx:10508
INT rpc_get_convert_flags(void)
Definition midas.cxx:13055
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_alloc()

INT bm_receive_event_alloc ( INT  buffer_handle,
EVENT_HEADER **  ppevent,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could for example contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return back to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
HNDLE hbuf;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
bm_request_event(hbuf, 1, TRIGGER_ALL, GET_ALL, request_id, NULL);
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
Parameters
buffer_handlebuffer handle
ppeventpointer to the received event pointer, event pointer should be free()ed to avoid memory leak
timeout_msecWait so many millisecond for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN No event available

Definition at line 10756 of file midas.cxx.

10756 {
10757 if (rpc_is_remote()) {
10758 return bm_receive_event_rpc(buffer_handle, NULL, NULL, ppevent, NULL, timeout_msec);
10759 }
10760#ifdef LOCAL_ROUTINES
10761 {
10763
10764 BUFFER *pbuf = bm_get_buffer("bm_receive_event_alloc", buffer_handle, &status);
10765
10766 if (!pbuf)
10767 return status;
10768
10769 int convert_flags = rpc_get_convert_flags();
10770
10771 return bm_read_buffer(pbuf, buffer_handle, (void **) ppevent, NULL, NULL, NULL, timeout_msec, convert_flags, FALSE);
10772 }
10773#else /* LOCAL_ROUTINES */
10774
10775 return BM_SUCCESS;
10776#endif
10777}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_rpc()

static INT bm_receive_event_rpc ( INT  buffer_handle,
void *  buf,
int *  buf_size,
EVENT_HEADER **  ppevent,
std::vector< char > *  pvec,
int  timeout_msec 
)
static

Definition at line 10508 of file midas.cxx.

10509{
10510 //printf("bm_receive_event_rpc: handle %d, buf %p, pevent %p, pvec %p, timeout %d, max_event_size %d\n", buffer_handle, buf, ppevent, pvec, timeout_msec, _bm_max_event_size);
10511
10512 assert(_bm_max_event_size > sizeof(EVENT_HEADER));
10513
10514 void *xbuf = NULL;
10515 int xbuf_size = 0;
10516
10517 if (buf) {
10518 xbuf = buf;
10519 xbuf_size = *buf_size;
10520 } else if (ppevent) {
10521 *ppevent = (EVENT_HEADER*)malloc(_bm_max_event_size);
10522 xbuf_size = _bm_max_event_size;
10523 } else if (pvec) {
10524 pvec->resize(_bm_max_event_size);
10525 xbuf = pvec->data();
10526 xbuf_size = pvec->size();
10527 } else {
10528 assert(!"incorrect call to bm_receivent_event_rpc()");
10529 }
10530
10531 int status;
10532 DWORD time_start = ss_millitime();
10533 DWORD time_end = time_start + timeout_msec;
10534
10535 int xtimeout_msec = timeout_msec;
10536
10537 int zbuf_size = xbuf_size;
10538
10539 while (1) {
10540 if (timeout_msec == BM_WAIT) {
10541 xtimeout_msec = 1000;
10542 } else if (timeout_msec == BM_NO_WAIT) {
10543 xtimeout_msec = BM_NO_WAIT;
10544 } else {
10545 if (xtimeout_msec > 1000) {
10546 xtimeout_msec = 1000;
10547 }
10548 }
10549
10550 zbuf_size = xbuf_size;
10551
10552 status = rpc_call(RPC_BM_RECEIVE_EVENT, buffer_handle, xbuf, &zbuf_size, xtimeout_msec);
10553
10554 //printf("bm_receive_event_rpc: handle %d, timeout %d, status %d, size %d in, %d out, via RPC_BM_RECEIVE_EVENT\n", buffer_handle, xtimeout_msec, status, xbuf_size, zbuf_size);
10555
10556 if (status == BM_ASYNC_RETURN) {
10557 if (timeout_msec == BM_WAIT) {
10558 // BM_WAIT means wait forever
10559 continue;
10560 } else if (timeout_msec == BM_NO_WAIT) {
10561 // BM_NO_WAIT means do not wait
10562 break;
10563 } else {
10564 DWORD now = ss_millitime();
10565 if (now >= time_end) {
10566 // timeout, return BM_ASYNC_RETURN
10567 break;
10568 }
10569
10570 DWORD remain = time_end - now;
10571
10572 if (remain < (DWORD)xtimeout_msec) {
10573 xtimeout_msec = remain;
10574 }
10575
10576 // keep asking for event...
10577 continue;
10578 }
10579 } else if (status == BM_SUCCESS) {
10580 // success, return BM_SUCCESS
10581 break;
10582 }
10583
10584 // RPC error
10585
10586 if (buf) {
10587 *buf_size = 0;
10588 } else if (ppevent) {
10589 free(*ppevent);
10590 *ppevent = NULL;
10591 } else if (pvec) {
10592 pvec->resize(0);
10593 } else {
10594 assert(!"incorrect call to bm_receivent_event_rpc()");
10595 }
10596
10597 return status;
10598 }
10599
10600 // status is BM_SUCCESS or BM_ASYNC_RETURN
10601
10602 if (buf) {
10603 *buf_size = zbuf_size;
10604 } else if (ppevent) {
10605 // nothing to do
10606 // ppevent = realloc(ppevent, xbuf_size); // shrink memory allocation
10607 } else if (pvec) {
10608 pvec->resize(zbuf_size);
10609 } else {
10610 assert(!"incorrect call to bm_receivent_event_rpc()");
10611 }
10612
10613 return status;
10614}
#define RPC_BM_RECEIVE_EVENT
Definition mrpc.h:47
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_vec()

INT bm_receive_event_vec ( INT  buffer_handle,
std::vector< char > *  pvec,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could for example contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return back to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
HNDLE hbuf;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
bm_request_event(hbuf, 1, TRIGGER_ALL, GET_ALL, request_id, NULL);
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
Parameters
buffer_handle buffer handle
pvec pointer to a std::vector<char> that receives the event data; the vector is resized to the size of the received event
timeout_msec Wait so many milliseconds for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN No event available

Definition at line 10834 of file midas.cxx.

10834 {
10835 if (rpc_is_remote()) {
10836 return bm_receive_event_rpc(buffer_handle, NULL, NULL, NULL, pvec, timeout_msec);
10837 }
10838#ifdef LOCAL_ROUTINES
10839 {
10841
10842 BUFFER *pbuf = bm_get_buffer("bm_receive_event_vec", buffer_handle, &status);
10843
10844 if (!pbuf)
10845 return status;
10846
10847 int convert_flags = rpc_get_convert_flags();
10848
10849 return bm_read_buffer(pbuf, buffer_handle, NULL, NULL, NULL, pvec, timeout_msec, convert_flags, FALSE);
10850 }
10851#else /* LOCAL_ROUTINES */
10852 return BM_SUCCESS;
10853#endif
10854}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_remove_client_locked()

void bm_remove_client_locked ( BUFFER_HEADER pheader,
int  j 
)

Called to forcibly disconnect given client from a data buffer

Definition at line 6052 of file midas.cxx.

6052 {
6053 int k, nc;
6054 BUFFER_CLIENT *pbctmp;
6055
6056 /* clear entry from client structure in buffer header */
6057 memset(&(pheader->client[j]), 0, sizeof(BUFFER_CLIENT));
6058
6059 /* calculate new max_client_index entry */
6060 for (k = MAX_CLIENTS - 1; k >= 0; k--)
6061 if (pheader->client[k].pid != 0)
6062 break;
6063 pheader->max_client_index = k + 1;
6064
6065 /* count new number of clients */
6066 for (k = MAX_CLIENTS - 1, nc = 0; k >= 0; k--)
6067 if (pheader->client[k].pid != 0)
6068 nc++;
6069 pheader->num_clients = nc;
6070
6071 /* check if anyone is waiting and wake him up */
6072 pbctmp = pheader->client;
6073
6074 for (k = 0; k < pheader->max_client_index; k++, pbctmp++)
6075 if (pbctmp->pid && (pbctmp->write_wait || pbctmp->read_wait))
6076 ss_resume(pbctmp->port, "B ");
6077}
INT k
Definition odbhist.cxx:40
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_remove_event_request()

INT bm_remove_event_request ( INT  buffer_handle,
INT  request_id 
)

Delete a previously placed request for a specific event type in the client structure of the buffer referenced by buffer_handle.

Parameters
buffer_handle Handle to the buffer in which the request should be placed
request_id Request id returned by bm_request_event
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_NOT_FOUND, RPC_NET_ERROR

Definition at line 8528 of file midas.cxx.

8528 {
8529 if (rpc_is_remote())
8530 return rpc_call(RPC_BM_REMOVE_EVENT_REQUEST, buffer_handle, request_id);
8531
8532#ifdef LOCAL_ROUTINES
8533 {
8534 int status = 0;
8535
8536 BUFFER *pbuf = bm_get_buffer("bm_remove_event_request", buffer_handle, &status);
8537
8538 if (!pbuf)
8539 return status;
8540
8541 /* lock buffer */
8542 bm_lock_buffer_guard pbuf_guard(pbuf);
8543
8544 if (!pbuf_guard.is_locked())
8545 return pbuf_guard.get_status();
8546
8547 INT i, deleted;
8548
8549 /* get a pointer to the proper client structure */
8550 BUFFER_CLIENT *pclient = bm_get_my_client_locked(pbuf_guard);
8551
8552 /* check all requests and set to zero if matching */
8553 for (i = 0, deleted = 0; i < pclient->max_request_index; i++)
8554 if (pclient->event_request[i].valid && pclient->event_request[i].id == request_id) {
8555 memset(&pclient->event_request[i], 0, sizeof(EVENT_REQUEST));
8556 deleted++;
8557 }
8558
8559 /* calculate new max_request_index entry */
8560 for (i = MAX_EVENT_REQUESTS - 1; i >= 0; i--)
8561 if (pclient->event_request[i].valid)
8562 break;
8563
8564 pclient->max_request_index = i + 1;
8565
8566 /* calculate new all_flag */
8567 pclient->all_flag = FALSE;
8568
8569 for (i = 0; i < pclient->max_request_index; i++)
8570 if (pclient->event_request[i].valid && (pclient->event_request[i].sampling_type & GET_ALL)) {
8571 pclient->all_flag = TRUE;
8572 break;
8573 }
8574
8575 pbuf->get_all_flag = pclient->all_flag;
8576
8577 if (!deleted)
8578 return BM_NOT_FOUND;
8579 }
8580#endif /* LOCAL_ROUTINES */
8581
8582 return BM_SUCCESS;
8583}
#define RPC_BM_REMOVE_EVENT_REQUEST
Definition mrpc.h:44
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_request_event()

INT bm_request_event ( HNDLE  buffer_handle,
short int  event_id,
short int  trigger_mask,
INT  sampling_type,
HNDLE request_id,
EVENT_HANDLER func 
)

Place an event request based on certain characteristics. Multiple event requests can be placed for each buffer, which are later identified by their request ID. They can contain different callback routines. For an example, see bm_open_buffer() and bm_receive_event().

Parameters
buffer_handle buffer handle obtained via bm_open_buffer()
event_id event ID for requested events. Use EVENTID_ALL to receive events with any ID.
trigger_mask trigger mask for requested events. The requested events must have at least one bit in its trigger mask common with the requested trigger mask. Use TRIGGER_ALL to receive events with any trigger mask.
sampling_type specifies how many events to receive. A value of GET_ALL receives all events which match the specified event ID and trigger mask. If the events are consumed slower than produced, the producer is automatically slowed down. A value of GET_NONBLOCKING receives as many events as possible without slowing down the producer. GET_ALL is typically used by the logger, while GET_NONBLOCKING is typically used by analyzers.
request_id request ID returned by the function. This ID is passed to the callback routine and must be used in the bm_delete_request() routine.
func callback routine which gets called when an event of the specified type is received.
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_NO_MEMORY too many requests. The value MAX_EVENT_REQUESTS in midas.h should be increased.

Definition at line 8475 of file midas.cxx.

8479{
8480 assert(request_id != NULL);
8481
8482 EventRequest r;
8483 r.buffer_handle = buffer_handle;
8484 r.event_id = event_id;
8486 r.dispatcher = func;
8487
8488 {
8489 std::lock_guard<std::mutex> guard(_request_list_mutex);
8490
8491 bool found = false;
8492
8493 // find deleted entry
8494 for (size_t i = 0; i < _request_list.size(); i++) {
8495 if (_request_list[i].buffer_handle == 0) {
8496 _request_list[i] = r;
8497 *request_id = i;
8498 found = true;
8499 break;
8500 }
8501 }
8502
8503 if (!found) { // not found
8504 *request_id = _request_list.size();
8505 _request_list.push_back(r);
8506 }
8507
8508 // implicit unlock()
8509 }
8510
8511 /* add request in buffer structure */
8512 int status = bm_add_event_request(buffer_handle, event_id, trigger_mask, sampling_type, func, *request_id);
8513 if (status != BM_SUCCESS)
8514 return status;
8515
8516 return BM_SUCCESS;
8517}
INT bm_add_event_request(INT buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, EVENT_HANDLER *func, INT request_id)
Definition midas.cxx:8324
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_reset_buffer_locked()

static void bm_reset_buffer_locked ( BUFFER pbuf)
static

Definition at line 6408 of file midas.cxx.

6408 {
6409 BUFFER_HEADER *pheader = pbuf->buffer_header;
6410
6411 //printf("bm_reset_buffer: buffer \"%s\"\n", pheader->name);
6412
6413 pheader->read_pointer = 0;
6414 pheader->write_pointer = 0;
6415
6416 int i;
6417 for (i = 0; i < pheader->max_client_index; i++) {
6418 BUFFER_CLIENT *pc = pheader->client + i;
6419 if (pc->pid) {
6420 pc->read_pointer = 0;
6421 }
6422 }
6423}
Here is the caller graph for this function:

◆ bm_send_event()

INT bm_send_event ( INT  buffer_handle,
const EVENT_HEADER pevent,
int  unused,
int  timeout_msec 
)

Definition at line 9688 of file midas.cxx.

9689{
9690 const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16); // event size computations are not 32-bit clean, limit event size to 2GB. K.O.
9691 const DWORD data_size = pevent->data_size; // 32-bit unsigned value
9692
9693 if (data_size == 0) {
9694 cm_msg(MERROR, "bm_send_event", "invalid event data size zero");
9695 return BM_INVALID_SIZE;
9696 }
9697
9698 if (data_size > MAX_DATA_SIZE) {
9699 cm_msg(MERROR, "bm_send_event", "invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
9700 return BM_INVALID_SIZE;
9701 }
9702
9703 const size_t event_size = sizeof(EVENT_HEADER) + data_size;
9704
9705 //printf("bm_send_event: pevent %p, data_size %d, event_size %d, buf_size %d\n", pevent, data_size, event_size, unused);
9706
9707 if (rpc_is_remote()) {
9708 //return bm_send_event_rpc(buffer_handle, pevent, event_size, timeout_msec);
9709 return rpc_send_event_sg(buffer_handle, 1, (char**)&pevent, &event_size);
9710 } else {
9711 return bm_send_event_sg(buffer_handle, 1, (char**)&pevent, &event_size, timeout_msec);
9712 }
9713}
int bm_send_event_sg(int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
Definition midas.cxx:9788
#define BM_INVALID_SIZE
Definition midas.h:624
INT rpc_send_event_sg(INT buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[])
Definition midas.cxx:13950
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_send_event_sg()

int bm_send_event_sg ( int  buffer_handle,
int  sg_n,
const char *const  sg_ptr[],
const size_t  sg_len[],
int  timeout_msec 
)

Sends an event to a buffer. This function checks if the buffer has enough space for the event, then copies the event to the buffer in shared memory. If clients have requests for the event, they are notified via a UDP packet.

char event[1000];
// create event with ID 1, trigger mask 0, size 100 bytes and serial number 1
bm_compose_event((EVENT_HEADER *) event, 1, 0, 100, 1);
// set first byte of event
*(event+sizeof(EVENT_HEADER)) = <...>
#include <stdio.h>
#include "midas.h"
{
HNDLE hbuf;
char event[1000];
status = cm_connect_experiment("", "Sample", "Producer", NULL);
return 1;
// create event with ID 1, trigger mask 0, size 100 bytes and serial number 1
bm_compose_event((EVENT_HEADER *) event, 1, 0, 100, 1);
// set event data
for (i=0 ; i<100 ; i++)
*(event+sizeof(EVENT_HEADER)+i) = i;
// send event
bm_send_event(hbuf, event, 100+sizeof(EVENT_HEADER), BM_WAIT);
return 0;
}
INT bm_send_event(INT buffer_handle, const EVENT_HEADER *pevent, int unused, int timeout_msec)
Definition midas.cxx:9688
Parameters
buffer_handle Buffer handle obtained via bm_open_buffer()
sg_n Number of scatter-gather segments making up the event
sg_ptr Array of segment addresses; the first segment starts with the event header
sg_len Array of segment sizes in bytes; the sizes must sum to the event size including the event header
timeout_msec Timeout waiting for free space in the event buffer. If BM_WAIT, wait forever. If BM_NO_WAIT, the function returns immediately with a value of BM_ASYNC_RETURN without writing the event to the buffer
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_INVALID_PARAM
BM_ASYNC_RETURN Routine called with timeout_msec == BM_NO_WAIT and buffer has not enough space to receive event
BM_NO_MEMORY Event is too large for network buffer or event buffer. One has to increase the event buffer size "/Experiment/Buffer sizes/SYSTEM" and/or /Experiment/MAX_EVENT_SIZE in ODB.

Definition at line 9788 of file midas.cxx.

9789{
9790 if (rpc_is_remote())
9791 return rpc_send_event_sg(buffer_handle, sg_n, sg_ptr, sg_len);
9792
9793 if (sg_n < 1) {
9794 cm_msg(MERROR, "bm_send_event", "invalid sg_n %d", sg_n);
9795 return BM_INVALID_SIZE;
9796 }
9797
9798 if (sg_ptr[0] == NULL) {
9799 cm_msg(MERROR, "bm_send_event", "invalid sg_ptr[0] is NULL");
9800 return BM_INVALID_SIZE;
9801 }
9802
9803 if (sg_len[0] < sizeof(EVENT_HEADER)) {
9804 cm_msg(MERROR, "bm_send_event", "invalid sg_len[0] value %d is smaller than event header size %d", (int)sg_len[0], (int)sizeof(EVENT_HEADER));
9805 return BM_INVALID_SIZE;
9806 }
9807
9808 const EVENT_HEADER* pevent = (const EVENT_HEADER*)sg_ptr[0];
9809
9810 const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16); // event size computations are not 32-bit clean, limit event size to 2GB. K.O.
9811 const DWORD data_size = pevent->data_size; // 32-bit unsigned value
9812
9813 if (data_size == 0) {
9814 cm_msg(MERROR, "bm_send_event", "invalid event data size zero");
9815 return BM_INVALID_SIZE;
9816 }
9817
9818 if (data_size > MAX_DATA_SIZE) {
9819 cm_msg(MERROR, "bm_send_event", "invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
9820 return BM_INVALID_SIZE;
9821 }
9822
9823 const size_t event_size = sizeof(EVENT_HEADER) + data_size;
9824
9825 size_t count = 0;
9826 for (int i=0; i<sg_n; i++) {
9827 count += sg_len[i];
9828 }
9829
9830 if (count != event_size) {
9831 cm_msg(MERROR, "bm_send_event", "data size mismatch: event data_size %d, event_size %d not same as sum of sg_len %d", (int)data_size, (int)event_size, (int)count);
9832 return BM_INVALID_SIZE;
9833 }
9834
9835 //printf("bm_send_event_sg: pevent %p, event_id 0x%04x, serial 0x%08x, data_size %d, event_size %d, total_size %d\n", pevent, pevent->event_id, pevent->serial_number, (int)pevent->data_size, (int)event_size, (int)total_size);
9836
9837#ifdef LOCAL_ROUTINES
9838 {
9839 int status = 0;
9840 const size_t total_size = ALIGN8(event_size);
9841
9842 BUFFER *pbuf = bm_get_buffer("bm_send_event_sg", buffer_handle, &status);
9843
9844 if (!pbuf)
9845 return status;
9846
9847 /* round up total_size to next DWORD boundary */
9848 //int total_size = ALIGN8(event_size);
9849
9850 /* check if write cache is enabled */
9851 if (pbuf->write_cache_size) {
9853
9854 if (status != BM_SUCCESS)
9855 return status;
9856
9857 /* check if write cache is enabled */
9858 if (pbuf->write_cache_size) {
9860 bool too_big = event_size > max_event_size;
9861
9862 //printf("bm_send_event: write %zu/%zu max %zu, cache size %zu, wp %zu\n", event_size, total_size, max_event_size, pbuf->write_cache_size.load(), pbuf->write_cache_wp);
9863
9864 /* if this event does not fit into the write cache, flush the write cache */
9865 if (pbuf->write_cache_wp > 0 && (pbuf->write_cache_wp + total_size > pbuf->write_cache_size || too_big)) {
9866 //printf("bm_send_event: write %zu/%zu but cache is full, size %zu, wp %zu\n", event_size, total_size, pbuf->write_cache_size.load(), pbuf->write_cache_wp);
9867
9868 bm_lock_buffer_guard pbuf_guard(pbuf);
9869
9870 if (!pbuf_guard.is_locked()) {
9871 pbuf->write_cache_mutex.unlock();
9872 return pbuf_guard.get_status();
9873 }
9874
9875 int status = bm_flush_cache_locked(pbuf_guard, timeout_msec);
9876
9877 if (pbuf_guard.is_locked()) {
9878 // check if bm_wait_for_free_space() failed to relock the buffer
9879 pbuf_guard.unlock();
9880 }
9881
9882 if (status != BM_SUCCESS) {
9883 pbuf->write_cache_mutex.unlock();
9884 // bm_flush_cache() failed: timeout in bm_wait_for_free_space() or write cache size is bigger than buffer size or buffer was closed.
9885 if (status == BM_NO_MEMORY)
9886 cm_msg(MERROR, "bm_send_event", "write cache size is bigger than buffer size");
9887 return status;
9888 }
9889
9890 // write cache must be empty here
9891 assert(pbuf->write_cache_wp == 0);
9892 }
9893
9894 /* write this event into the write cache, if it is not too big and if it fits */
9895 if (!too_big && pbuf->write_cache_wp + total_size <= pbuf->write_cache_size) {
9896 //printf("bm_send_event: write %d/%d to cache size %d, wp %d\n", (int)event_size, (int)total_size, (int)pbuf->write_cache_size, (int)pbuf->write_cache_wp);
9897
9898 char* wptr = pbuf->write_cache + pbuf->write_cache_wp;
9899
9900 for (int i=0; i<sg_n; i++) {
9901 memcpy(wptr, sg_ptr[i], sg_len[i]);
9902 wptr += sg_len[i];
9903 }
9904
9905 pbuf->write_cache_wp += total_size;
9906
9907 pbuf->write_cache_mutex.unlock();
9908 return BM_SUCCESS;
9909 }
9910 }
9911
9912 /* event did not fit into the write cache, we flushed the write cache and we send it directly to shared memory */
9913 pbuf->write_cache_mutex.unlock();
9914 }
9915
9916 /* we come here only for events that are too big to fit into the cache */
9917
9918 /* lock the buffer */
9919 bm_lock_buffer_guard pbuf_guard(pbuf);
9920
9921 if (!pbuf_guard.is_locked()) {
9922 return pbuf_guard.get_status();
9923 }
9924
9925 /* calculate some shorthands */
9926 BUFFER_HEADER *pheader = pbuf->buffer_header;
9927
9928#if 0
9930 if (status != BM_SUCCESS) {
9931 printf("bm_send_event: corrupted 111!\n");
9932 abort();
9933 }
9934#endif
9935
9936 /* check if buffer is large enough */
9937 if (total_size >= (size_t)pheader->size) {
9938 pbuf_guard.unlock(); // unlock before cm_msg()
9939 cm_msg(MERROR, "bm_send_event", "total event size (%d) larger than size (%d) of buffer \'%s\'", (int)total_size, pheader->size, pheader->name);
9940 return BM_NO_MEMORY;
9941 }
9942
9943 status = bm_wait_for_free_space_locked(pbuf_guard, timeout_msec, total_size, false);
9944
9945 if (status != BM_SUCCESS) {
9946 // implicit unlock
9947 return status;
9948 }
9949
9950#if 0
9952 if (status != BM_SUCCESS) {
9953 printf("bm_send_event: corrupted 222!\n");
9954 abort();
9955 }
9956#endif
9957
9958 int old_write_pointer = pheader->write_pointer;
9959
9960 bm_write_to_buffer_locked(pheader, sg_n, sg_ptr, sg_len, total_size);
9961
9962 /* write pointer was incremented, but there should
9963 * always be some free space in the buffer and the
9964 * write pointer should never cacth up to the read pointer:
9965 * the rest of the code gets confused this happens (buffer 100% full)
9966 * as it is write_pointer == read_pointer can be either
9967 * 100% full or 100% empty. My solution: never fill
9968 * the buffer to 100% */
9969 assert(pheader->write_pointer != pheader->read_pointer);
9970
9971 /* send wake up messages to all clients that want this event */
9972 int i;
9973 for (i = 0; i < pheader->max_client_index; i++) {
9974 BUFFER_CLIENT *pc = pheader->client + i;
9975 int request_id = bm_find_first_request_locked(pc, pevent);
9976 bm_notify_reader_locked(pheader, pc, old_write_pointer, request_id);
9977 }
9978
9979#if 0
9981 if (status != BM_SUCCESS) {
9982 printf("bm_send_event: corrupted 333!\n");
9983 abort();
9984 }
9985#endif
9986
9987 /* update statistics */
9988 pheader->num_in_events++;
9989 pbuf->count_sent += 1;
9990 pbuf->bytes_sent += total_size;
9991 }
9992#endif /* LOCAL_ROUTINES */
9993
9994 return BM_SUCCESS;
9995}
double count
Definition mdump.cxx:33
INT max_event_size
Definition mfed.cxx:30
#define MAX_WRITE_CACHE_EVENT_SIZE_DIV
Definition midas.h:259
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_send_event_vec() [1/2]

int bm_send_event_vec ( int  buffer_handle,
const std::vector< char > &  event,
int  timeout_msec 
)

Definition at line 9715 of file midas.cxx.

9716{
9717 const char* cptr = event.data();
9718 size_t clen = event.size();
9719 return bm_send_event_sg(buffer_handle, 1, &cptr, &clen, timeout_msec);
9720}
Here is the call graph for this function:

◆ bm_send_event_vec() [2/2]

int bm_send_event_vec ( int  buffer_handle,
const std::vector< std::vector< char > > &  event,
int  timeout_msec 
)

Definition at line 9722 of file midas.cxx.

9723{
9724 int sg_n = event.size();
9725 const char* sg_ptr[sg_n];
9726 size_t sg_len[sg_n];
9727 for (int i=0; i<sg_n; i++) {
9728 sg_ptr[i] = event[i].data();
9729 sg_len[i] = event[i].size();
9730 }
9731 return bm_send_event_sg(buffer_handle, sg_n, sg_ptr, sg_len, timeout_msec);
9732}
Here is the call graph for this function:

◆ bm_set_cache_size()

INT bm_set_cache_size ( INT  buffer_handle,
size_t  read_size,
size_t  write_size 
)

Modifies buffer cache size. Without a buffer cache, events are copied to/from the shared memory event by event.

To protect processes from accessing the shared memory simultaneously, semaphores are used. Since semaphore operations are CPU consuming (typically 50-100us) this can slow down the data transfer especially for small events. By using a cache the number of semaphore operations is reduced dramatically. Instead of writing directly to the shared memory, the events are copied to a local cache buffer. When this buffer is full, it is copied to the shared memory in one operation. The same technique can be used when receiving events.

The drawback of this method is that the events have to be copied twice, once to the cache and once from the cache to the shared memory. Therefore it can happen that the usage of a cache even slows down data throughput in a given environment (computer type, OS type, event size). The cache size therefore has to be optimized manually to maximize data throughput.

Parameters
buffer_handle buffer handle obtained via bm_open_buffer()
read_size cache size for reading events in bytes, zero for no cache
write_size cache size for writing events in bytes, zero for no cache
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_NO_MEMORY, BM_INVALID_PARAM

Definition at line 8150 of file midas.cxx.

8152{
8153 if (rpc_is_remote())
8154 return rpc_call(RPC_BM_SET_CACHE_SIZE, buffer_handle, read_size, write_size);
8155
8156#ifdef LOCAL_ROUTINES
8157 {
8158 int status = 0;
8159
8160 BUFFER *pbuf = bm_get_buffer("bm_set_cache_size", buffer_handle, &status);
8161
8162 if (!pbuf)
8163 return status;
8164
8165 /* lock pbuf for local access. we do not lock buffer semaphore because we do not touch the shared memory */
8166
8168
8169 if (status != BM_SUCCESS)
8170 return status;
8171
8172 if (write_size < 0)
8173 write_size = 0;
8174
8175 if (write_size > 0) {
8176 if (write_size < MIN_WRITE_CACHE_SIZE) {
8177 cm_msg(MERROR, "bm_set_cache_size", "requested write cache size %zu on buffer \"%s\" too small, will use minimum size %d", write_size, pbuf->buffer_name, MIN_WRITE_CACHE_SIZE);
8178 write_size = MIN_WRITE_CACHE_SIZE;
8179 }
8180 }
8181
8182 size_t max_write_size = pbuf->buffer_header->size/MAX_WRITE_CACHE_SIZE_DIV;
8183
8184 if (write_size > max_write_size) {
8185 size_t new_write_size = max_write_size;
8186 cm_msg(MERROR, "bm_set_cache_size", "requested write cache size %zu on buffer \"%s\" is too big: buffer size is %d, write cache size will be %zu bytes", write_size, pbuf->buffer_name, pbuf->buffer_header->size, new_write_size);
8187 write_size = new_write_size;
8188 }
8189
8190 pbuf->buffer_mutex.unlock();
8191
8192 /* resize read cache */
8193
8195
8196 if (status != BM_SUCCESS) {
8197 return status;
8198 }
8199
8200 if (pbuf->read_cache_size > 0) {
8201 free(pbuf->read_cache);
8202 pbuf->read_cache = NULL;
8203 }
8204
8205 if (read_size > 0) {
8206 pbuf->read_cache = (char *) malloc(read_size);
8207 if (pbuf->read_cache == NULL) {
8208 pbuf->read_cache_size = 0;
8209 pbuf->read_cache_rp = 0;
8210 pbuf->read_cache_wp = 0;
8211 pbuf->read_cache_mutex.unlock();
8212 cm_msg(MERROR, "bm_set_cache_size", "not enough memory to allocate read cache for buffer \"%s\", malloc(%zu) failed", pbuf->buffer_name, read_size);
8213 return BM_NO_MEMORY;
8214 }
8215 }
8216
8217 pbuf->read_cache_size = read_size;
8218 pbuf->read_cache_rp = 0;
8219 pbuf->read_cache_wp = 0;
8220
8221 pbuf->read_cache_mutex.unlock();
8222
8223 /* resize the write cache */
8224
8226
8227 if (status != BM_SUCCESS)
8228 return status;
8229
8230 // FIXME: should flush the write cache!
8231 if (pbuf->write_cache_size && pbuf->write_cache_wp > 0) {
8232 cm_msg(MERROR, "bm_set_cache_size", "buffer \"%s\" lost %zu bytes from the write cache", pbuf->buffer_name, pbuf->write_cache_wp);
8233 }
8234
8235 /* manage write cache */
8236 if (pbuf->write_cache_size > 0) {
8237 free(pbuf->write_cache);
8238 pbuf->write_cache = NULL;
8239 }
8240
8241 if (write_size > 0) {
8242 pbuf->write_cache = (char *) M_MALLOC(write_size);
8243 if (pbuf->write_cache == NULL) {
8244 pbuf->write_cache_size = 0;
8245 pbuf->write_cache_rp = 0;
8246 pbuf->write_cache_wp = 0;
8247 pbuf->write_cache_mutex.unlock();
8248 cm_msg(MERROR, "bm_set_cache_size", "not enough memory to allocate write cache for buffer \"%s\", malloc(%zu) failed", pbuf->buffer_name, write_size);
8249 return BM_NO_MEMORY;
8250 }
8251 }
8252
8253 pbuf->write_cache_size = write_size;
8254 pbuf->write_cache_rp = 0;
8255 pbuf->write_cache_wp = 0;
8256
8257 pbuf->write_cache_mutex.unlock();
8258 }
8259#endif /* LOCAL_ROUTINES */
8260
8261 return BM_SUCCESS;
8262}
static int bm_lock_buffer_mutex(BUFFER *pbuf)
Definition midas.cxx:7956
#define RPC_BM_SET_CACHE_SIZE
Definition mrpc.h:42
#define M_MALLOC(x)
Definition midas.h:1551
#define MIN_WRITE_CACHE_SIZE
Definition midas.h:257
#define MAX_WRITE_CACHE_SIZE_DIV
Definition midas.h:258
std::timed_mutex buffer_mutex
Definition midas.h:989
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_skip_event() [1/2]

static int bm_skip_event ( BUFFER pbuf)
static

Definition at line 10858 of file midas.cxx.

10859{
10860 /* clear read cache */
10861 if (pbuf->read_cache_size > 0) {
10862
10864
10865 if (status != BM_SUCCESS)
10866 return status;
10867
10868 pbuf->read_cache_rp = 0;
10869 pbuf->read_cache_wp = 0;
10870
10871 pbuf->read_cache_mutex.unlock();
10872 }
10873
10874 bm_lock_buffer_guard pbuf_guard(pbuf);
10875
10876 if (!pbuf_guard.is_locked())
10877 return pbuf_guard.get_status();
10878
10879 BUFFER_HEADER *pheader = pbuf->buffer_header;
10880
10881 /* forward read pointer to global write pointer */
10882 BUFFER_CLIENT *pclient = bm_get_my_client_locked(pbuf_guard);
10883 pclient->read_pointer = pheader->write_pointer;
10884
10885 return BM_SUCCESS;
10886}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_skip_event() [2/2]

INT bm_skip_event ( INT  buffer_handle)

Skip all events in current buffer.

Useful for single event displays to see the newest events

Parameters
buffer_handle Handle of the buffer. Must be obtained via bm_open_buffer.
Returns
BM_SUCCESS, BM_INVALID_HANDLE, RPC_NET_ERROR

Definition at line 10899 of file midas.cxx.

10899 {
10900 if (rpc_is_remote())
10901 return rpc_call(RPC_BM_SKIP_EVENT, buffer_handle);
10902
10903#ifdef LOCAL_ROUTINES
10904 {
10905 int status = 0;
10906
10907 BUFFER *pbuf = bm_get_buffer("bm_skip_event", buffer_handle, &status);
10908
10909 if (!pbuf)
10910 return status;
10911
10912 return bm_skip_event(pbuf);
10913 }
10914#endif
10915
10916 return BM_SUCCESS;
10917}
#define RPC_BM_SKIP_EVENT
Definition mrpc.h:50
Here is the call graph for this function:

◆ bm_update_last_activity()

static void bm_update_last_activity ( DWORD  millitime)
static

Update last activity time

Definition at line 6134 of file midas.cxx.

6134 {
6135 int pid = ss_getpid();
6136
6137 std::vector<BUFFER*> mybuffers;
6138
6139 gBuffersMutex.lock();
6140 mybuffers = gBuffers;
6141 gBuffersMutex.unlock();
6142
6143 for (BUFFER* pbuf : mybuffers) {
6144 if (!pbuf)
6145 continue;
6146 if (pbuf->attached) {
6147
6148 bm_lock_buffer_guard pbuf_guard(pbuf);
6149
6150 if (!pbuf_guard.is_locked())
6151 continue;
6152
6153 BUFFER_HEADER *pheader = pbuf->buffer_header;
6154 for (int j = 0; j < pheader->max_client_index; j++) {
6155 BUFFER_CLIENT *pclient = pheader->client + j;
6156 if (pclient->pid == pid) {
6157 pclient->last_activity = millitime;
6158 }
6159 }
6160 }
6161 }
6162}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_update_read_pointer_locked()

static BOOL bm_update_read_pointer_locked ( const char *  caller_name,
BUFFER_HEADER pheader 
)
static

Definition at line 8730 of file midas.cxx.

8730 {
8731 assert(caller_name);
8732
8733 /* calculate global read pointer as "minimum" of client read pointers */
8734 int min_rp = pheader->write_pointer;
8735
8736 int i;
8737 for (i = 0; i < pheader->max_client_index; i++) {
8738 BUFFER_CLIENT *pc = pheader->client + i;
8739 if (pc->pid) {
8741
8742#if 0
8743 printf("bm_update_read_pointer: [%s] rp %d, wp %d, size %d, min_rp %d, client [%s] rp %d\n",
8744 pheader->name,
8745 pheader->read_pointer,
8746 pheader->write_pointer,
8747 pheader->size,
8748 min_rp,
8749 pc->name,
8750 pc->read_pointer);
8751#endif
8752
8753 if (pheader->read_pointer <= pheader->write_pointer) {
8754 // normal pointers
8755 if (pc->read_pointer < min_rp)
8756 min_rp = pc->read_pointer;
8757 } else {
8758 // inverted pointers
8759 if (pc->read_pointer <= pheader->write_pointer) {
8760 // clients 3 and 4
8761 if (pc->read_pointer < min_rp)
8762 min_rp = pc->read_pointer;
8763 } else {
8764 // clients 1 and 2
8765 int xptr = pc->read_pointer - pheader->size;
8766 if (xptr < min_rp)
8767 min_rp = xptr;
8768 }
8769 }
8770 }
8771 }
8772
8773 if (min_rp < 0)
8774 min_rp += pheader->size;
8775
8776 assert(min_rp >= 0);
8777 assert(min_rp < pheader->size);
8778
8779 if (min_rp == pheader->read_pointer) {
8780 return FALSE;
8781 }
8782
8783#if 0
8784 printf("bm_update_read_pointer: [%s] rp %d, wp %d, size %d, new_rp %d, moved\n",
8785 pheader->name,
8786 pheader->read_pointer,
8787 pheader->write_pointer,
8788 pheader->size,
8789 min_rp);
8790#endif
8791
8792 pheader->read_pointer = min_rp;
8793
8794 return TRUE;
8795}
static void bm_validate_client_pointers_locked(const BUFFER_HEADER *pheader, BUFFER_CLIENT *pclient)
Definition midas.cxx:8632
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_buffer_locked()

static int bm_validate_buffer_locked ( const BUFFER pbuf)
static

Definition at line 6324 of file midas.cxx.

6324 {
6325 const BUFFER_HEADER *pheader = pbuf->buffer_header;
6326 const char *pdata = (const char *) (pheader + 1);
6327
6328 //printf("bm_validate_buffer: buffer \"%s\"\n", pheader->name);
6329
6330 //printf("size: %d, rp: %d, wp: %d\n", pheader->size, pheader->read_pointer, pheader->write_pointer);
6331
6332 //printf("clients: max: %d, num: %d, MAX_CLIENTS: %d\n", pheader->max_client_index, pheader->num_clients, MAX_CLIENTS);
6333
6334 if (pheader->read_pointer < 0 || pheader->read_pointer >= pheader->size) {
6335 cm_msg(MERROR, "bm_validate_buffer",
6336 "buffer \"%s\" is corrupted: invalid read pointer %d. Size %d, write pointer %d", pheader->name,
6337 pheader->read_pointer, pheader->size, pheader->write_pointer);
6338 return BM_CORRUPTED;
6339 }
6340
6341 if (pheader->write_pointer < 0 || pheader->write_pointer >= pheader->size) {
6342 cm_msg(MERROR, "bm_validate_buffer",
6343 "buffer \"%s\" is corrupted: invalid write pointer %d. Size %d, read pointer %d", pheader->name,
6344 pheader->write_pointer, pheader->size, pheader->read_pointer);
6345 return BM_CORRUPTED;
6346 }
6347
6348 if (!bm_validate_rp("bm_validate_buffer_locked", pheader, pheader->read_pointer)) {
6349 cm_msg(MERROR, "bm_validate_buffer", "buffer \"%s\" is corrupted: read pointer %d is invalid", pheader->name,
6350 pheader->read_pointer);
6351 return BM_CORRUPTED;
6352 }
6353
6354 int rp = pheader->read_pointer;
6355 int rp0 = -1;
6356 while (rp != pheader->write_pointer) {
6357 if (!bm_validate_rp("bm_validate_buffer_locked", pheader, rp)) {
6358 cm_msg(MERROR, "bm_validate_buffer", "buffer \"%s\" is corrupted: invalid rp %d, last good event at rp %d",
6359 pheader->name, rp, rp0);
6360 return BM_CORRUPTED;
6361 }
6362 //bm_print_event(pdata, rp);
6363 int rp1 = bm_next_rp("bm_validate_buffer_locked", pheader, pdata, rp);
6364 if (rp1 < 0) {
6365 cm_msg(MERROR, "bm_validate_buffer",
6366 "buffer \"%s\" is corrupted: invalid event at rp %d, last good event at rp %d", pheader->name, rp, rp0);
6367 return BM_CORRUPTED;
6368 }
6369 rp0 = rp;
6370 rp = rp1;
6371 }
6372
6373 int i;
6374 for (i = 0; i < MAX_CLIENTS; i++) {
6375 const BUFFER_CLIENT *c = &pheader->client[i];
6376 if (c->pid == 0)
6377 continue;
6378 BOOL get_all = FALSE;
6379 int j;
6380 for (j = 0; j < MAX_EVENT_REQUESTS; j++) {
6381 const EVENT_REQUEST *r = &c->event_request[j];
6382 if (!r->valid)
6383 continue;
6384 BOOL xget_all = r->sampling_type == GET_ALL;
6385 get_all = (get_all || xget_all);
6386 //printf("client slot %d: pid %d, name \"%s\", request %d: id %d, valid %d, sampling_type %d, get_all %d\n", i, c->pid, c->name, j, r->id, r->valid, r->sampling_type, xget_all);
6387 }
6388
6389 int rp = c->read_pointer;
6390 int rp0 = -1;
6391 while (rp != pheader->write_pointer) {
6392 //bm_print_event(pdata, rp);
6393 int rp1 = bm_next_rp("bm_validate_buffer_locked", pheader, pdata, rp);
6394 if (rp1 < 0) {
6395 cm_msg(MERROR, "bm_validate_buffer",
6396 "buffer \"%s\" is corrupted for client \"%s\" rp %d: invalid event at rp %d, last good event at rp %d",
6397 pheader->name, c->name, c->read_pointer, rp, rp0);
6398 return BM_CORRUPTED;
6399 }
6400 rp0 = rp;
6401 rp = rp1;
6402 }
6403 }
6404
6405 return BM_SUCCESS;
6406}
static BOOL bm_validate_rp(const char *who, const BUFFER_HEADER *pheader, int rp)
Definition midas.cxx:6206
static int bm_next_rp(const char *who, const BUFFER_HEADER *pheader, const char *pdata, int rp)
Definition midas.cxx:6273
char c
Definition system.cxx:1312
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_client_index_locked()

static int bm_validate_client_index_locked ( bm_lock_buffer_guard pbuf_guard)
static

Definition at line 5939 of file midas.cxx.

5940{
 // Validate that pbuf->client_index still refers to our own slot in the shared
 // buffer header. Returns the validated index on success. If the slot index is
 // out of range, the slot name is empty, or the slot's pid is not our pid
 // (e.g. we were removed from the buffer by a timeout), the function logs the
 // problem, unlocks the buffer and abort()s the process.
5941 const BUFFER *pbuf = pbuf_guard.get_pbuf();
5942
5943 bool badindex = false;
5944 bool badclient = false;
5945
5946 int idx = pbuf->client_index;
5947
5948 if (idx < 0) {
5949 badindex = true;
5950 } else if (idx > pbuf->buffer_header->max_client_index) {
5951 badindex = true;
5952 } else {
 // index is in range: check that the slot actually belongs to us
5953 BUFFER_CLIENT *pclient = &pbuf->buffer_header->client[idx];
5954 if (pclient->name[0] == 0)
5955 badclient = true;
5956 else if (pclient->pid != ss_getpid())
5957 badclient = true;
5958
5959 //if (strcmp(pclient->name,"mdump")==0) {
5960 // for (int i=0; i<15; i++) {
5961 // printf("sleep %d\n", i);
5962 // ::sleep(1);
5963 // }
5964 //}
5965 }
5966
5967#if 0
5968 if (badindex) {
5969 printf("bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, badindex %d, pid=%d\n",
5970 pbuf, pbuf->buffer_header->name, pbuf->client_index, pbuf->buffer_header->max_client_index,
5971 badindex, ss_getpid());
5972 } else if (badclient) {
5973 printf("bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, badclient %d\n",
5974 pbuf, pbuf->buffer_header->name, pbuf->client_index, pbuf->buffer_header->max_client_index,
5975 pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid,
5976 ss_getpid(), badclient);
5977 } else {
5978 printf("bm_validate_client_index: pbuf=%p, buf_name \"%s\", client_index=%d, max_client_index=%d, client_name=\'%s\', client_pid=%d, pid=%d, goodclient\n",
5979 pbuf, pbuf->buffer_header->name, pbuf->client_index, pbuf->buffer_header->max_client_index,
5980 pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid,
5981 ss_getpid());
5982 }
5983#endif
5984
5985 if (badindex || badclient) {
 // static flag ensures the cm_msg() report is emitted only once;
 // presumably this guards against recursion, since cm_msg() itself may
 // write into an event buffer — TODO confirm
5986 static int prevent_recursion = 1;
5987
5988 if (prevent_recursion) {
5989 prevent_recursion = 0;
5990
5991 if (badindex) {
5992 cm_msg(MERROR, "bm_validate_client_index", "My client index %d in buffer \'%s\' is invalid, max_client_index %d, my pid %d", idx, pbuf->buffer_header->name, pbuf->buffer_header->max_client_index, ss_getpid());
5993 } else {
5994 cm_msg(MERROR, "bm_validate_client_index", "My client index %d in buffer \'%s\' is invalid: client name \'%s\', pid %d should be my pid %d", idx, pbuf->buffer_header->name, pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid, ss_getpid());
5995 }
5996
5997 cm_msg(MERROR, "bm_validate_client_index", "Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...");
5998 }
5999
 // always also report to stderr (cm_msg() output may not be visible here)
6000 if (badindex) {
6001 fprintf(stderr, "bm_validate_client_index: My client index %d in buffer \'%s\' is invalid, max_client_index %d, my pid %d\n", idx, pbuf->buffer_header->name, pbuf->buffer_header->max_client_index, ss_getpid());
6002 } else {
6003 fprintf(stderr, "bm_validate_client_index: My client index %d in buffer \'%s\' is invalid: client name \'%s\', pid %d should be my pid %d\n", idx, pbuf->buffer_header->name, pbuf->buffer_header->client[idx].name, pbuf->buffer_header->client[idx].pid, ss_getpid());
6004 }
6005
6006 fprintf(stderr, "bm_validate_client_index: Maybe this client was removed by a timeout. See midas.log. Cannot continue, aborting...\n");
6007
 // release the buffer lock before aborting so other processes are not
 // left blocked on it
6008 pbuf_guard.unlock();
6009
6010 abort();
6011 }
6012
6013 return idx;
6014}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_client_pointers_locked()

static void bm_validate_client_pointers_locked ( const BUFFER_HEADER pheader,
BUFFER_CLIENT pclient 
)
static

Definition at line 8632 of file midas.cxx.

8632 {
 // Clamp a client's read pointer back into the valid data region of the
 // buffer, logging each correction via cm_msg(MINFO, ...). Two cases,
 // depending on whether the occupied region wraps around the end of the
 // buffer (read_pointer > write_pointer).
8633 assert(pheader->read_pointer >= 0 && pheader->read_pointer <= pheader->size);
8634 assert(pclient->read_pointer >= 0 && pclient->read_pointer <= pheader->size);
8635
8636 if (pheader->read_pointer <= pheader->write_pointer) {
 // non-wrapped case: valid client pointers lie in
 // [pheader->read_pointer, pheader->write_pointer]
8637
8638 if (pclient->read_pointer < pheader->read_pointer) {
8639 cm_msg(MINFO, "bm_validate_client_pointers",
8640 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8641 pclient->name,
8642 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8643
8644 pclient->read_pointer = pheader->read_pointer;
8645 }
8646
8647 if (pclient->read_pointer > pheader->write_pointer) {
8648 cm_msg(MINFO, "bm_validate_client_pointers",
8649 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, read pointer %d, size %d",
8650 pclient->name,
8651 pheader->name, pclient->read_pointer, pheader->write_pointer, pheader->read_pointer, pheader->size);
8652
8653 pclient->read_pointer = pheader->write_pointer;
8654 }
8655
8656 } else {
 // wrapped case: valid data occupies [read_pointer, size) plus
 // [0, write_pointer]; anything in the gap (write_pointer, read_pointer)
 // or outside [0, size) is invalid
8657
8658 if (pclient->read_pointer < 0) {
8659 cm_msg(MINFO, "bm_validate_client_pointers",
8660 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8661 pclient->name,
8662 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8663
8664 pclient->read_pointer = pheader->read_pointer;
8665 }
8666
8667 if (pclient->read_pointer >= pheader->size) {
8668 cm_msg(MINFO, "bm_validate_client_pointers",
8669 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8670 pclient->name,
8671 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8672
8673 pclient->read_pointer = pheader->read_pointer;
8674 }
8675
8676 if (pclient->read_pointer > pheader->write_pointer && pclient->read_pointer < pheader->read_pointer) {
8677 cm_msg(MINFO, "bm_validate_client_pointers",
8678 "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8679 pclient->name,
8680 pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8681
8682 pclient->read_pointer = pheader->read_pointer;
8683 }
8684 }
8685}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_rp()

static BOOL bm_validate_rp ( const char *  who,
const BUFFER_HEADER pheader,
int  rp 
)
static

Definition at line 6206 of file midas.cxx.

6206 {
 // Check that 'rp' is a plausible event read pointer for this buffer:
 // it must lie within [0, size] and leave room for a full EVENT_HEADER
 // before the end of the buffer. On failure, logs a corruption message
 // (with 'who' identifying the caller) and returns FALSE.
6207 if (rp < 0 || rp > pheader->size) {
6208 cm_msg(MERROR, "bm_validate_rp",
6209 "error: buffer \"%s\" is corrupted: rp %d is invalid. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6210 pheader->name,
6211 rp,
6212 pheader->read_pointer,
6213 pheader->write_pointer,
6214 pheader->size,
6215 who);
6216 return FALSE;
6217 }
6218
6219 if ((rp + (int) sizeof(EVENT_HEADER)) > pheader->size) {
6220 // note ">" here, has to match bm_incr_rp() and bm_write_to_buffer()
6221 cm_msg(MERROR, "bm_validate_rp",
6222 "error: buffer \"%s\" is corrupted: rp %d plus event header point beyond the end of buffer by %d bytes. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6223 pheader->name,
6224 rp,
6225 (int) (rp + sizeof(EVENT_HEADER) - pheader->size),
6226 pheader->read_pointer,
6227 pheader->write_pointer,
6228 pheader->size,
6229 who);
6230 return FALSE;
6231 }
6232
6233 return TRUE;
6234}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wait_for_free_space_locked()

static int bm_wait_for_free_space_locked ( bm_lock_buffer_guard pbuf_guard,
int  timeout_msec,
int  requested_space,
bool  unlock_write_cache 
)
static
  • signal other clients that we are in wait mode
  • validate the client index: we could have been removed from the buffer

Definition at line 9090 of file midas.cxx.

9091{
 // Block (up to timeout_msec) until at least 'requested_space' bytes are
 // free in the buffer, freeing non-blocking events along the way. Called
 // with the buffer locked via pbuf_guard; may temporarily drop (and retake)
 // the buffer lock and, if unlock_write_cache is set, the write cache mutex.
 //
 // NOTE(review): this Doxygen listing elides several original source lines
 // (e.g. 9308, 9323, 9359, 9364-9365, 9386) — presumably the periodic-task
 // call and the write-cache relock; consult midas.cxx for the full code.
9092 // return values:
9093 // BM_SUCCESS - have "requested_space" bytes free in the buffer
9094 // BM_CORRUPTED - shared memory is corrupted
9095 // BM_NO_MEMORY - asked for more than buffer size
9096 // BM_ASYNC_RETURN - timeout waiting for free space
9097 // BM_INVALID_HANDLE - buffer was closed (locks released) (via bm_clock_xxx())
9098 // SS_ABORT - we are told to shutdown (locks releases)
9099
9100 int status;
9101 BUFFER* pbuf = pbuf_guard.get_pbuf();
9102 BUFFER_HEADER *pheader = pbuf->buffer_header;
9103 char *pdata = (char *) (pheader + 1);
9104
9105 /* make sure the buffer never completely full:
9106 * read pointer and write pointer would coincide
9107 * and the code cannot tell if it means the
9108 * buffer is 100% full or 100% empty. It will explode
9109 * or lose events */
9110 requested_space += 100;
9111
9112 if (requested_space >= pheader->size)
9113 return BM_NO_MEMORY;
9114
9115 DWORD time_start = ss_millitime();
9116 DWORD time_end = time_start + timeout_msec;
9117
9118 //DWORD blocking_time = 0;
9119 //int blocking_loops = 0;
9120 int blocking_client_index = -1;
9121 char blocking_client_name[NAME_LENGTH];
9122 blocking_client_name[0] = 0;
9123
 // outer loop: one iteration per sleep/wakeup cycle
9124 while (1) {
 // inner loop: check free space, and while no GET_ALL client blocks the
 // oldest event, advance the buffer read pointer to free space
9125 while (1) {
9126 /* check if enough space in buffer */
9127
9128 int free = pheader->read_pointer - pheader->write_pointer;
9129 if (free <= 0)
9130 free += pheader->size;
9131
9132 //printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec);
9133
9134 if (requested_space < free) { /* note the '<' to avoid 100% filling */
9135 //if (blocking_loops) {
9136 // DWORD wait_time = ss_millitime() - blocking_time;
9137 // printf("blocking client \"%s\", time %d ms, loops %d\n", blocking_client_name, wait_time, blocking_loops);
9138 //}
9139
 // if we had been waiting, close out the wait-time statistics
9140 if (pbuf->wait_start_time != 0) {
9141 DWORD now = ss_millitime();
9142 DWORD wait_time = now - pbuf->wait_start_time;
9143 pbuf->time_write_wait += wait_time;
9144 pbuf->wait_start_time = 0;
9145 int iclient = pbuf->wait_client_index;
9146 //printf("bm_wait_for_free_space: wait ended: wait time %d ms, blocking client index %d\n", wait_time, iclient);
9147 if (iclient >= 0 && iclient < MAX_CLIENTS) {
9148 pbuf->client_count_write_wait[iclient] += 1;
9149 pbuf->client_time_write_wait[iclient] += wait_time;
9150 }
9151 }
9152
9153 //if (blocking_loops > 0) {
9154 // printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d, found space after %d waits\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec, blocking_loops);
9155 //}
9156
9157 return BM_SUCCESS;
9158 }
9159
9160 if (!bm_validate_rp("bm_wait_for_free_space_locked", pheader, pheader->read_pointer)) {
9161 cm_msg(MERROR, "bm_wait_for_free_space",
9162 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer is invalid",
9163 pheader->name,
9164 pheader->read_pointer,
9165 pheader->write_pointer,
9166 pheader->size,
9167 free,
9168 requested_space);
9169 return BM_CORRUPTED;
9170 }
9171
 // examine the oldest event in the buffer (at the buffer read pointer)
9172 const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pdata + pheader->read_pointer);
9173 int event_size = pevent->data_size + sizeof(EVENT_HEADER);
9174 int total_size = ALIGN8(event_size);
9175
9176#if 0
9177 printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, blocking event size %d/%d\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, event_size, total_size);
9178#endif
9179
9180 if (pevent->data_size <= 0 || total_size <= 0 || total_size > pheader->size) {
9181 cm_msg(MERROR, "bm_wait_for_free_space",
9182 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer points to an invalid event: data_size %d, event size %d, total_size %d",
9183 pheader->name,
9184 pheader->read_pointer,
9185 pheader->write_pointer,
9186 pheader->size,
9187 free,
9188 requested_space,
9189 pevent->data_size,
9190 event_size,
9191 total_size);
9192 return BM_CORRUPTED;
9193 }
9194
9195 int blocking_client = -1;
9196
 // find a client whose read pointer sits on the oldest event AND who has
 // a matching GET_ALL request — only such a client prevents freeing it
9197 int i;
9198 for (i = 0; i < pheader->max_client_index; i++) {
9199 BUFFER_CLIENT *pc = pheader->client + i;
9200 if (pc->pid) {
9201 if (pc->read_pointer == pheader->read_pointer) {
9202 /*
9203 First assume that the client with the "minimum" read pointer
9204 is not really blocking due to a GET_ALL request.
9205 */
9206 BOOL blocking = FALSE;
9207 //int blocking_request_id = -1;
9208
9209 int j;
9210 for (j = 0; j < pc->max_request_index; j++) {
9211 const EVENT_REQUEST *prequest = pc->event_request + j;
9212 if (prequest->valid
9213 && bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
9214 if (prequest->sampling_type & GET_ALL) {
9215 blocking = TRUE;
9216 //blocking_request_id = prequest->id;
9217 break;
9218 }
9219 }
9220 }
9221
9222 //printf("client [%s] blocking %d, request %d\n", pc->name, blocking, blocking_request_id);
9223
9224 if (blocking) {
9225 blocking_client = i;
9226 break;
9227 }
9228
 // not blocking: skip this client past the oldest event
9229 pc->read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
9230 }
9231 }
9232 } /* client loop */
9233
9234 if (blocking_client >= 0) {
9235 blocking_client_index = blocking_client;
9236 mstrlcpy(blocking_client_name, pheader->client[blocking_client].name, sizeof(blocking_client_name));
9237 //if (!blocking_time) {
9238 // blocking_time = ss_millitime();
9239 //}
9240
9241 //printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d, must wait for more space!\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec);
9242
9243 // from this "break" we go into timeout check and sleep/wait.
9244 break;
9245 }
9246
9247 /* no blocking clients. move the read pointer and again check for free space */
9248
9249 BOOL moved = bm_update_read_pointer_locked("bm_wait_for_free_space", pheader);
9250
9251 if (!moved) {
9252 cm_msg(MERROR, "bm_wait_for_free_space",
9253 "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer did not move as expected",
9254 pheader->name,
9255 pheader->read_pointer,
9256 pheader->write_pointer,
9257 pheader->size,
9258 free,
9259 requested_space);
9260 return BM_CORRUPTED;
9261 }
9262
9263 /* we freed one event, loop back to the check for free space */
9264 }
9265
9266 //blocking_loops++;
9267
9268 /* at least one client is blocking */
9269
 // record how much space we are waiting for, for bm_wakeup_producers
9270 BUFFER_CLIENT *pc = bm_get_my_client_locked(pbuf_guard);
9271 pc->write_wait = requested_space;
9272
 // start (or continue) the wait-time statistics for this wait episode
9273 if (pbuf->wait_start_time == 0) {
9274 pbuf->wait_start_time = ss_millitime();
9275 pbuf->count_write_wait++;
9276 if (requested_space > pbuf->max_requested_space)
9277 pbuf->max_requested_space = requested_space;
9278 pbuf->wait_client_index = blocking_client_index;
9279 }
9280
9281 DWORD now = ss_millitime();
9282
9283 //printf("bm_wait_for_free_space: start 0x%08x, now 0x%08x, end 0x%08x, timeout %d, wait %d\n", time_start, now, time_end, timeout_msec, time_end - now);
9284
9285 int sleep_time_msec = 1000;
9286
9287 if (timeout_msec == BM_WAIT) {
9288 // wait forever
9289 } else if (timeout_msec == BM_NO_WAIT) {
9290 // no wait
9291 return BM_ASYNC_RETURN;
9292 } else {
9293 // check timeout
9294 if (now >= time_end) {
9295 // timeout!
9296 return BM_ASYNC_RETURN;
9297 }
9298
9299 sleep_time_msec = time_end - now;
9300
9301 if (sleep_time_msec <= 0) {
9302 sleep_time_msec = 10;
9303 } else if (sleep_time_msec > 1000) {
9304 sleep_time_msec = 1000;
9305 }
9306 }
9307
9309
9310 /* before waiting, unlock everything in the correct order */
9311
9312 pbuf_guard.unlock();
9313
9314 if (unlock_write_cache)
9315 pbuf->write_cache_mutex.unlock();
9316
9317 //printf("bm_wait_for_free_space: blocking client \"%s\"\n", blocking_client_name);
9318
9319#ifdef DEBUG_MSG
9320 cm_msg(MDEBUG, "Send sleep: rp=%d, wp=%d, level=%1.1lf", pheader->read_pointer, pheader->write_pointer, 100 - 100.0 * size / pheader->size);
9321#endif
9322
9324 //int idx = bm_validate_client_index_locked(pbuf, FALSE);
9325 //if (idx >= 0)
9326 // pheader->client[idx].write_wait = requested_space;
9327
9328 //bm_cleanup("bm_wait_for_free_space", ss_millitime(), FALSE);
9329
9330 status = ss_suspend(sleep_time_msec, MSG_BM);
9331
9332 /* we are told to shutdown */
9333 if (status == SS_ABORT) {
9334 // NB: buffer is locked!
9335 return SS_ABORT;
9336 }
9337
9338 /* make sure we do sleep in this loop:
9339 * if we are the mserver receiving data on the event
9340 * socket and the data buffer is full, ss_suspend() will
9341 * never sleep: it will detect data on the event channel,
9342 * call rpc_server_receive() (recursively, we already *are* in
9343 * rpc_server_receive()) and return without sleeping. Result
9344 * is a busy loop waiting for free space in data buffer */
9345
9346 /* update May 2021: ss_suspend(MSG_BM) no longer looks at
9347 * the event socket, and should sleep now, so this sleep below
9348 * maybe is not needed now. but for safety, I keep it. K.O. */
9349
9350 if (status != SS_TIMEOUT) {
9351 //printf("ss_suspend: status %d\n", status);
9352 ss_sleep(1);
9353 }
9354
9355 /* we may be stuck in this loop for an arbitrary long time,
9356 * depending on how other buffer clients read the accumulated data
9357 * so we should update all the timeouts & etc. K.O. */
9358
9360
9361 /* lock things again in the correct order */
9362
9363 if (unlock_write_cache) {
 // NOTE(review): the write-cache relock statement (original line ~9364)
 // is elided from this listing; 'status' below reflects its result
9365
9366 if (status != BM_SUCCESS) {
9367 // bail out with all locks released
9368 return status;
9369 }
9370 }
9371
9372 if (!pbuf_guard.relock()) {
9373 if (unlock_write_cache) {
9374 pbuf->write_cache_mutex.unlock();
9375 }
9376
9377 // bail out with all locks released
9378 return pbuf_guard.get_status();
9379 }
9380
9381 /* revalidate the client index: we could have been removed from the buffer while sleeping */
9382 pc = bm_get_my_client_locked(pbuf_guard);
9383
9384 pc->write_wait = 0;
9385
9387 //idx = bm_validate_client_index_locked(pbuf, FALSE);
9388 //if (idx >= 0)
9389 // pheader->client[idx].write_wait = 0;
9390 //else {
9391 // cm_msg(MERROR, "bm_wait_for_free_space", "our client index is no longer valid, exiting...");
9392 // status = SS_ABORT;
9393 //}
9394
9395#ifdef DEBUG_MSG
9396 cm_msg(MDEBUG, "Send woke up: rp=%d, wp=%d, level=%1.1lf", pheader->read_pointer, pheader->write_pointer, 100 - 100.0 * size / pheader->size);
9397#endif
9398
9399 }
9400}
int get_status() const
Definition midas.cxx:3191
static BOOL bm_update_read_pointer_locked(const char *caller_name, BUFFER_HEADER *pheader)
Definition midas.cxx:8730
INT cm_periodic_tasks()
Definition midas.cxx:5596
#define SS_TIMEOUT
Definition midas.h:675
#define MDEBUG
Definition midas.h:561
#define MSG_BM
Definition msystem.h:302
INT ss_suspend(INT millisec, INT msg)
Definition system.cxx:4615
INT ss_sleep(INT millisec)
Definition system.cxx:3700
int max_requested_space
Definition midas.h:1020
int count_write_wait
Definition midas.h:1015
int wait_client_index
Definition midas.h:1019
DWORD time_write_wait
Definition midas.h:1016
DWORD wait_start_time
Definition midas.h:1018
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wait_for_more_events_locked()

static int bm_wait_for_more_events_locked ( bm_lock_buffer_guard pbuf_guard,
BUFFER_CLIENT pc,
int  timeout_msec,
BOOL  unlock_read_cache 
)
static

Definition at line 9402 of file midas.cxx.

9403{
 // Wait until the buffer has unread data for client 'pc' (its read pointer
 // differs from the buffer write pointer) or until the timeout expires.
 // Returns BM_SUCCESS when data is available, BM_ASYNC_RETURN on
 // timeout/no-wait, SS_ABORT on shutdown, or a lock-failure status.
 // May drop and retake the buffer lock (and, if unlock_read_cache is set,
 // the read cache mutex) while sleeping.
 //
 // NOTE(review): this Doxygen listing elides a few original source lines
 // (e.g. 9445-9447 and the read-cache relock around 9478); consult
 // midas.cxx for the full code.
9404 BUFFER* pbuf = pbuf_guard.get_pbuf();
9405 BUFFER_HEADER* pheader = pbuf->buffer_header;
9406
9407 //printf("bm_wait_for_more_events_locked: [%s] timeout %d\n", pheader->name, timeout_msec);
9408
9409 if (pc->read_pointer != pheader->write_pointer) {
9410 // buffer has data
9411 return BM_SUCCESS;
9412 }
9413
9414 if (timeout_msec == BM_NO_WAIT) {
9415 /* event buffer is empty and we are told to not wait */
9416 if (!pc->read_wait) {
9417 //printf("bm_wait_for_more_events: buffer [%s] client [%s] set read_wait in BM_NO_WAIT!\n", pheader->name, pc->name);
9418 pc->read_wait = TRUE;
9419 }
9420 return BM_ASYNC_RETURN;
9421 }
9422
9423 DWORD time_start = ss_millitime();
9424 DWORD time_wait = time_start + timeout_msec;
9425 DWORD sleep_time = 1000;
9426 if (timeout_msec == BM_NO_WAIT) {
9427 // default sleep time
9428 } else if (timeout_msec == BM_WAIT) {
9429 // default sleep time
9430 } else {
 // finite timeout: never sleep past the deadline
9431 if (sleep_time > (DWORD)timeout_msec)
9432 sleep_time = timeout_msec;
9433 }
9434
9435 //printf("time start 0x%08x, end 0x%08x, sleep %d\n", time_start, time_wait, sleep_time);
9436
9437 while (pc->read_pointer == pheader->write_pointer) {
9438 /* wait until there is data in the buffer (write pointer moves) */
9439
9440 if (!pc->read_wait) {
9441 //printf("bm_wait_for_more_events: buffer [%s] client [%s] set read_wait!\n", pheader->name, pc->name);
9442 pc->read_wait = TRUE;
9443 }
9444
9446
9448
9449 // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
9450
9451 pbuf_guard.unlock();
9452
9453 if (unlock_read_cache)
9454 pbuf->read_cache_mutex.unlock();
9455
9456 int status = ss_suspend(sleep_time, MSG_BM);
9457
 // recompute the remaining sleep time / detect expiry of a finite timeout
9458 if (timeout_msec == BM_NO_WAIT) {
9459 // return immediately
9460 } else if (timeout_msec == BM_WAIT) {
9461 // wait forever
9462 } else {
9463 DWORD now = ss_millitime();
9464 //printf("check timeout: now 0x%08x, end 0x%08x, diff %d\n", now, time_wait, time_wait - now);
9465 if (now >= time_wait) {
9466 timeout_msec = BM_NO_WAIT; // cause immediate return
9467 } else {
9468 sleep_time = time_wait - now;
9469 if (sleep_time > 1000)
9470 sleep_time = 1000;
9471 //printf("time start 0x%08x, now 0x%08x, end 0x%08x, sleep %d\n", time_start, now, time_wait, sleep_time);
9472 }
9473 }
9474
9475 // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
9476
9477 if (unlock_read_cache) {
 // NOTE(review): the read-cache relock statement (original line ~9478)
 // is elided from this listing; 'status' below reflects its result
9479 if (status != BM_SUCCESS) {
9480 // bail out with all locks released
9481 return status;
9482 }
9483 }
9484
9485 if (!pbuf_guard.relock()) {
9486 if (unlock_read_cache) {
9487 pbuf->read_cache_mutex.unlock();
9488 }
9489 // bail out with all locks released
9490 return pbuf_guard.get_status();
9491 }
9492
9493 /* need to revalidate our BUFFER_CLIENT after releasing the buffer lock
9494 * because we may have been removed from the buffer by bm_cleanup() & co
9495 * due to a timeout or whatever. */
9496 pc = bm_get_my_client_locked(pbuf_guard);
9497
9498 /* return if TCP connection broken */
9499 if (status == SS_ABORT)
9500 return SS_ABORT;
9501
9502 if (timeout_msec == BM_NO_WAIT)
9503 return BM_ASYNC_RETURN;
9504 }
9505
9506 if (pc->read_wait) {
9507 //printf("bm_wait_for_more_events: buffer [%s] client [%s] clear read_wait!\n", pheader->name, pc->name);
9508 pc->read_wait = FALSE;
9509 }
9510
9511 return BM_SUCCESS;
9512}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wakeup_producers_locked()

static void bm_wakeup_producers_locked ( const BUFFER_HEADER pheader,
const BUFFER_CLIENT pc 
)
static

Definition at line 8797 of file midas.cxx.

8797 {
 // After a reader (client 'pc') advances its read pointer, wake any
 // producers that are waiting for free buffer space. Only clients with
 // GET_ALL requests hold events in the buffer, so only their progress
 // can free space; other clients never block producers.
8798 int i;
8799 int have_get_all_requests = 0;
8800
8801 for (i = 0; i < pc->max_request_index; i++)
8802 if (pc->event_request[i].valid)
8803 have_get_all_requests |= (pc->event_request[i].sampling_type == GET_ALL);
8804
8805 /* only GET_ALL requests actually free space in the event buffer */
8806 if (!have_get_all_requests)
8807 return;
8808
8809 /*
8810 If read pointer has been changed, it may have freed up some space
8811 for waiting producers. So check if free space is now more than 50%
8812 of the buffer size and wake waiting producers.
8813 */
8814
8815 int free_space = pc->read_pointer - pheader->write_pointer;
8816 if (free_space <= 0)
8817 free_space += pheader->size;
8818
8819 if (free_space >= pheader->size * 0.5) {
 // NB: the loop variable 'pc' shadows the function parameter from here on
8820 for (i = 0; i < pheader->max_client_index; i++) {
8821 const BUFFER_CLIENT *pc = pheader->client + i;
8822 if (pc->pid && pc->write_wait) {
 // wake only producers that asked for less space than is now free
8823 BOOL send_wakeup = (pc->write_wait < free_space);
8824 //printf("bm_wakeup_producers: buffer [%s] client [%s] write_wait %d, free_space %d, sending wakeup message %d\n", pheader->name, pc->name, pc->write_wait, free_space, send_wakeup);
8825 if (send_wakeup) {
8826 ss_resume(pc->port, "B ");
8827 }
8828 }
8829 }
8830 }
8831}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_buffer_statistics_to_odb()

static void bm_write_buffer_statistics_to_odb ( HNDLE  hDB,
BUFFER pbuf,
BOOL  force 
)
static

Definition at line 6596 of file midas.cxx.

6597{
 // Snapshot this buffer's statistics under the buffer lock, then write them
 // to the ODB *after* unlocking, via bm_write_buffer_statistics_to_odb_copy().
 // Unless 'force' is set, skips the update when the lock count has not
 // changed since the last write (nothing new happened on this buffer).
6598 //printf("bm_buffer_write_statistics_to_odb: buffer [%s] client [%s], lock count %d -> %d, force %d\n", pbuf->buffer_name, pbuf->client_name, pbuf->last_count_lock, pbuf->count_lock, force);
6599
6600 bm_lock_buffer_guard pbuf_guard(pbuf);
6601
6602 if (!pbuf_guard.is_locked())
6603 return;
6604
6605 if (!force) {
6606 if (pbuf->count_lock == pbuf->last_count_lock) {
6607 return;
6608 }
6609 }
6610
6611 std::string buffer_name = pbuf->buffer_name;
6612 std::string client_name = pbuf->client_name;
6613
6614 if ((strlen(buffer_name.c_str()) < 1) || (strlen(client_name.c_str()) < 1)) {
6615 // do not call cm_msg() while holding buffer lock, if we are SYSMSG, we will deadlock. K.O.
6616 pbuf_guard.unlock(); // unlock before cm_msg()
6617 cm_msg(MERROR, "bm_write_buffer_statistics_to_odb", "Invalid empty buffer name \"%s\" or client name \"%s\"", buffer_name.c_str(), client_name.c_str());
6618 return;
6619 }
6620
6621 pbuf->last_count_lock = pbuf->count_lock;
6622
 // copy everything the ODB write needs while still holding the lock
6623 BUFFER_INFO xbuf(pbuf);
6624 BUFFER_HEADER xheader = *pbuf->buffer_header;
6625 int client_index = pbuf->client_index;
6626
6627 pbuf_guard.unlock();
6628
 // safe to touch the ODB now: we operate on private copies, no buffer lock held
6629 bm_write_buffer_statistics_to_odb_copy(hDB, buffer_name.c_str(), client_name.c_str(), client_index, &xbuf, &xheader);
6630}
static void bm_write_buffer_statistics_to_odb_copy(HNDLE hDB, const char *buffer_name, const char *client_name, int client_index, BUFFER_INFO *pbuf, BUFFER_HEADER *pheader)
Definition midas.cxx:6474
int last_count_lock
Definition midas.h:1017
int count_lock
Definition midas.h:1012
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_buffer_statistics_to_odb_copy()

static void bm_write_buffer_statistics_to_odb_copy ( HNDLE  hDB,
const char *  buffer_name,
const char *  client_name,
int  client_index,
BUFFER_INFO pbuf,
BUFFER_HEADER pheader 
)
static

Definition at line 6474 of file midas.cxx.

6475{
 // Write a snapshot of buffer and per-client statistics into the ODB under
 // /System/Buffers/<buffer_name> and .../Clients/<client_name>. Operates on
 // private copies (pbuf is a BUFFER_INFO snapshot, pheader a copied header),
 // so no buffer lock is required here.
6476 int status;
6477
6478 DWORD now = ss_millitime();
6479
 // create /System/Buffers if it does not exist yet
6480 HNDLE hKey;
6481 status = db_find_key(hDB, 0, "/System/Buffers", &hKey);
6482 if (status != DB_SUCCESS) {
6483 db_create_key(hDB, 0, "/System/Buffers", TID_KEY);
6484 status = db_find_key(hDB, 0, "/System/Buffers", &hKey);
6485 if (status != DB_SUCCESS)
6486 return;
6487 }
6488
 // create the per-buffer subkey if needed
 // NOTE(review): the db_create_key() call (original line 6492) is elided
 // from this listing
6489 HNDLE hKeyBuffer;
6490 status = db_find_key(hDB, hKey, buffer_name, &hKeyBuffer);
6491 if (status != DB_SUCCESS) {
6493 status = db_find_key(hDB, hKey, buffer_name, &hKeyBuffer);
6494 if (status != DB_SUCCESS)
6495 return;
6496 }
6497
 // pointers and sizes are stored as doubles in the ODB
6498 double buf_size = pheader->size;
6499 double buf_rptr = pheader->read_pointer;
6500 double buf_wptr = pheader->write_pointer;
6501
6502 double buf_fill = 0;
6503 double buf_cptr = 0;
6504 double buf_cused = 0;
6505 double buf_cused_pct = 0;
6506
6507 if (client_index >= 0 && client_index <= pheader->max_client_index) {
6508 buf_cptr = pheader->client[client_index].read_pointer;
6509
 // bytes of unread data for this client, accounting for wrap-around
6510 if (buf_wptr == buf_cptr) {
6511 buf_cused = 0;
6512 } else if (buf_wptr > buf_cptr) {
6513 buf_cused = buf_wptr - buf_cptr;
6514 } else {
6515 buf_cused = (buf_size - buf_cptr) + buf_wptr;
6516 }
6517
6518 buf_cused_pct = buf_cused / buf_size * 100.0;
6519
6520 // we cannot write buf_cused and buf_cused_pct into the buffer statistics
6521 // because some other GET_ALL client may have different buf_cused & etc,
6522 // so they must be written into the per-client statistics
6523 // and the web page should look at all the GET_ALL clients and used
6524 // the biggest buf_cused as the whole-buffer "bytes used" value.
6525 }
6526
 // whole-buffer fill level, accounting for wrap-around
6527 if (buf_wptr == buf_rptr) {
6528 buf_fill = 0;
6529 } else if (buf_wptr > buf_rptr) {
6530 buf_fill = buf_wptr - buf_rptr;
6531 } else {
6532 buf_fill = (buf_size - buf_rptr) + buf_wptr;
6533 }
6534
6535 double buf_fill_pct = buf_fill / buf_size * 100.0;
6536
6537 db_set_value(hDB, hKeyBuffer, "Size", &buf_size, sizeof(double), 1, TID_DOUBLE);
6538 db_set_value(hDB, hKeyBuffer, "Write pointer", &buf_wptr, sizeof(double), 1, TID_DOUBLE);
6539 db_set_value(hDB, hKeyBuffer, "Read pointer", &buf_rptr, sizeof(double), 1, TID_DOUBLE);
6540 db_set_value(hDB, hKeyBuffer, "Filled", &buf_fill, sizeof(double), 1, TID_DOUBLE);
6541 db_set_value(hDB, hKeyBuffer, "Filled pct", &buf_fill_pct, sizeof(double), 1, TID_DOUBLE);
6542
 // create the "Clients" subtree if needed
6543 status = db_find_key(hDB, hKeyBuffer, "Clients", &hKey);
6544 if (status != DB_SUCCESS) {
6545 db_create_key(hDB, hKeyBuffer, "Clients", TID_KEY);
6546 status = db_find_key(hDB, hKeyBuffer, "Clients", &hKey);
6547 if (status != DB_SUCCESS)
6548 return;
6549 }
6550
6551 HNDLE hKeyClient;
6552 status = db_find_key(hDB, hKey, client_name, &hKeyClient);
6553 if (status != DB_SUCCESS) {
6554 db_create_key(hDB, hKey, client_name, TID_KEY);
6555 status = db_find_key(hDB, hKey, client_name, &hKeyClient);
6556 if (status != DB_SUCCESS)
6557 return;
6558 }
6559
 // per-client counters from the BUFFER_INFO snapshot
6560 db_set_value(hDB, hKeyClient, "count_lock", &pbuf->count_lock, sizeof(int), 1, TID_INT32);
6561 db_set_value(hDB, hKeyClient, "count_sent", &pbuf->count_sent, sizeof(int), 1, TID_INT32);
6562 db_set_value(hDB, hKeyClient, "bytes_sent", &pbuf->bytes_sent, sizeof(double), 1, TID_DOUBLE);
6563 db_set_value(hDB, hKeyClient, "count_write_wait", &pbuf->count_write_wait, sizeof(int), 1, TID_INT32);
6564 db_set_value(hDB, hKeyClient, "time_write_wait", &pbuf->time_write_wait, sizeof(DWORD), 1, TID_UINT32);
6565 db_set_value(hDB, hKeyClient, "max_bytes_write_wait", &pbuf->max_requested_space, sizeof(INT), 1, TID_INT32);
6566 db_set_value(hDB, hKeyClient, "count_read", &pbuf->count_read, sizeof(int), 1, TID_INT32);
6567 db_set_value(hDB, hKeyClient, "bytes_read", &pbuf->bytes_read, sizeof(double), 1, TID_DOUBLE);
6568 db_set_value(hDB, hKeyClient, "get_all_flag", &pbuf->get_all_flag, sizeof(BOOL), 1, TID_BOOL);
6569 db_set_value(hDB, hKeyClient, "read_pointer", &buf_cptr, sizeof(double), 1, TID_DOUBLE);
6570 db_set_value(hDB, hKeyClient, "bytes_used", &buf_cused, sizeof(double), 1, TID_DOUBLE);
6571 db_set_value(hDB, hKeyClient, "pct_used", &buf_cused_pct, sizeof(double), 1, TID_DOUBLE);
6572
 // record which other clients this client has been blocked by, and for how long
6573 for (int i = 0; i < MAX_CLIENTS; i++) {
6574 if (!pbuf->client_count_write_wait[i])
6575 continue;
6576
6577 if (pheader->client[i].pid == 0)
6578 continue;
6579
6580 if (pheader->client[i].name[0] == 0)
6581 continue;
6582
6583 char str[100 + NAME_LENGTH];
6584
6585 sprintf(str, "writes_blocked_by/%s/count_write_wait", pheader->client[i].name);
6586 db_set_value(hDB, hKeyClient, str, &pbuf->client_count_write_wait[i], sizeof(int), 1, TID_INT32);
6587
6588 sprintf(str, "writes_blocked_by/%s/time_write_wait", pheader->client[i].name);
6589 db_set_value(hDB, hKeyClient, str, &pbuf->client_time_write_wait[i], sizeof(DWORD), 1, TID_UINT32);
6590 }
6591
6592 db_set_value(hDB, hKeyBuffer, "Last updated", &now, sizeof(DWORD), 1, TID_UINT32);
6593 db_set_value(hDB, hKeyClient, "last_updated", &now, sizeof(DWORD), 1, TID_UINT32);
6594}
#define TID_DOUBLE
Definition midas.h:343
#define TID_KEY
Definition midas.h:349
#define TID_BOOL
Definition midas.h:340
#define TID_INT32
Definition midas.h:339
INT db_create_key(HNDLE hDB, HNDLE hKey, const char *key_name, DWORD type)
Definition odb.cxx:3392
INT db_set_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, const void *data, INT data_size, INT num_values, DWORD type)
Definition odb.cxx:5028
INT db_find_key(HNDLE hDB, HNDLE hKey, const char *key_name, HNDLE *subhKey)
Definition odb.cxx:4256
HNDLE hKey
int count_sent
Definition midas.cxx:6437
BOOL get_all_flag
Definition midas.cxx:6433
int count_lock
Definition midas.cxx:6436
int count_write_wait
Definition midas.cxx:6439
double bytes_read
Definition midas.cxx:6446
int client_count_write_wait[MAX_CLIENTS]
Definition midas.cxx:6447
DWORD time_write_wait
Definition midas.cxx:6440
int count_read
Definition midas.cxx:6445
double bytes_sent
Definition midas.cxx:6438
DWORD client_time_write_wait[MAX_CLIENTS]
Definition midas.cxx:6448
int max_requested_space
Definition midas.cxx:6444
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_statistics_to_odb()

INT bm_write_statistics_to_odb ( void  )

Write statistics of all open buffers to the ODB

Returns
BM_SUCCESS

Definition at line 7290 of file midas.cxx.

7290 {
7291#ifdef LOCAL_ROUTINES
7292 {
7293 int status;
7294 HNDLE hDB;
7295
7296 status = cm_get_experiment_database(&hDB, NULL); // restored: this line was dropped in HTML extraction; without it, status is read uninitialized
7297
7298 if (status != CM_SUCCESS) {
7299 //printf("bm_write_statistics_to_odb: cannot get ODB handle!\n");
7300 return BM_SUCCESS;
7301 }
7302
7303 std::vector<BUFFER*> mybuffers;
7304
7305 gBuffersMutex.lock();
7306 mybuffers = gBuffers;
7307 gBuffersMutex.unlock();
7308
7309 for (BUFFER* pbuf : mybuffers) {
7310 if (!pbuf || !pbuf->attached)
7311 continue;
7312 bm_write_buffer_statistics_to_odb(hDB, pbuf, TRUE); // restored: this call was dropped in HTML extraction, leaving the loop body empty
7313 }
7314 }
7315#endif /* LOCAL_ROUTINES */
7316
7317 return BM_SUCCESS;
7318}
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_to_buffer_locked()

static void bm_write_to_buffer_locked ( BUFFER_HEADER *  pheader,
int  sg_n,
const char *const  sg_ptr[],
const size_t  sg_len[],
size_t  total_size 
)
static

Definition at line 9514 of file midas.cxx.

// NOTE(review): copies one event, given as a scatter-gather list of sg_n
// segments (sg_ptr[]/sg_len[], total_size bytes in all), into the shared-memory
// ring buffer whose data region follows pheader. If the event does not fit
// before the end of the buffer, exactly one segment is split and the remainder
// wraps to the start. Caller must hold the buffer lock and is presumed to have
// verified that total_size bytes are free — confirm against callers.
9515{
 9516 char *pdata = (char *) (pheader + 1);
 9517
 9518 //int old_write_pointer = pheader->write_pointer;
 9519
 9520 /* new event fits into the remaining space? */
 9521 if ((size_t)pheader->write_pointer + total_size <= (size_t)pheader->size) {
 9522 //memcpy(pdata + pheader->write_pointer, pevent, event_size);
// contiguous case: copy each segment in order at the write pointer
 9523 char* wptr = pdata + pheader->write_pointer;
 9524 for (int i=0; i<sg_n; i++) {
 9525 //printf("memcpy %p+%d\n", sg_ptr[i], (int)sg_len[i]);
 9526 memcpy(wptr, sg_ptr[i], sg_len[i]);
 9527 wptr += sg_len[i];
 9528 }
 9529 pheader->write_pointer = pheader->write_pointer + total_size;
 9530 assert(pheader->write_pointer <= pheader->size);
 9531 /* remaining space is smaller than size of an event header? */
 9532 if ((pheader->write_pointer + (int) sizeof(EVENT_HEADER)) > pheader->size) {
 9533 // note: ">" here to match "bm_incr_rp". If remaining space is exactly
 9534 // equal to the event header size, we will write the next event header here,
 9535 // then wrap the pointer and write the event data at the beginning of the buffer.
 9536 //printf("bm_write_to_buffer_locked: truncate wp %d. buffer size %d, remaining %d, event header size %d, event size %d, total size %d\n", pheader->write_pointer, pheader->size, pheader->size-pheader->write_pointer, (int)sizeof(EVENT_HEADER), event_size, total_size);
 9537 pheader->write_pointer = 0;
 9538 }
 9539 } else {
 9540 /* split event */
// wrap case: "size" bytes fit before the end of the buffer, the rest
// (total_size - size bytes) goes to the beginning of the data region
 9541 size_t size = pheader->size - pheader->write_pointer;
 9542
 9543 //printf("split: wp %d, size %d, avail %d\n", pheader->write_pointer, pheader->size, size);
 9544
 9545 //memcpy(pdata + pheader->write_pointer, pevent, size);
 9546 //memcpy(pdata, ((const char *) pevent) + size, event_size - size);
 9547
 9548 char* wptr = pdata + pheader->write_pointer;
 9549 size_t count = 0;
 9550
 9551 // copy first part
 9552
// copy whole segments that fit entirely before the end of the buffer;
// loop exits (without copying) at the first segment that would cross the end
 9553 int i = 0;
 9554 for (; i<sg_n; i++) {
 9555 if (count + sg_len[i] > size)
 9556 break;
 9557 memcpy(wptr, sg_ptr[i], sg_len[i]);
 9558 wptr += sg_len[i];
 9559 count += sg_len[i];
 9560 }
 9561
 9562 //printf("wptr %d, count %d\n", wptr-pdata, count);
 9563
 9564 // split segment
 9565
// segment i straddles the wrap point: "first" bytes fill the buffer up to
// its end, "second" bytes continue at the start of the data region
 9566 size_t first = size - count;
 9567 size_t second = sg_len[i] - first;
 9568 assert(first + second == sg_len[i]);
 9569 assert(count + first == size);
 9570
 9571 //printf("first %d, second %d\n", first, second);
 9572
 9573 memcpy(wptr, sg_ptr[i], first);
 9574 wptr = pdata + 0;
 9575 count += first;
 9576 memcpy(wptr, sg_ptr[i] + first, second);
 9577 wptr += second;
 9578 count += second;
 9579 i++;
 9580
 9581 // copy remaining
 9582
// any segments after the split one land contiguously after the wrapped part
 9583 for (; i<sg_n; i++) {
 9584 memcpy(wptr, sg_ptr[i], sg_len[i]);
 9585 wptr += sg_len[i];
 9586 count += sg_len[i];
 9587 }
 9588
 9589 //printf("wptr %d, count %d\n", wptr-pdata, count);
 9590
 9591 //printf("bm_write_to_buffer_locked: wrap wp %d -> %d. buffer size %d, available %d, wrote %d, remaining %d, event size %d, total size %d\n", pheader->write_pointer, total_size-size, pheader->size, pheader->size-pheader->write_pointer, size, pheader->size - (pheader->write_pointer+size), event_size, total_size);
 9592
// new write pointer = number of bytes that wrapped past the end
 9593 pheader->write_pointer = total_size - size;
 9594 }
 9595
 9596 //printf("bm_write_to_buffer_locked: buf [%s] size %d, wrote %d/%d, wp %d -> %d\n", pheader->name, pheader->size, event_size, total_size, old_write_pointer, pheader->write_pointer);
 9597}
Here is the caller graph for this function:

Variable Documentation

◆ _bm_lock_timeout

int _bm_lock_timeout = 5 * 60 * 1000
static

Definition at line 5936 of file midas.cxx.

◆ _bm_max_event_size

DWORD _bm_max_event_size = 0
static

Definition at line 5931 of file midas.cxx.

◆ _bm_mutex_timeout_sec

double _bm_mutex_timeout_sec = _bm_lock_timeout/1000 + 15.000
static

Definition at line 5937 of file midas.cxx.

◆ defrag_buffer

EVENT_DEFRAG_BUFFER defrag_buffer[MAX_DEFRAG_EVENTS]
static

Definition at line 11306 of file midas.cxx.