Line data Source code
1 : /********************************************************************\
2 :
3 : Name: consume.c
4 : Created by: Stefan Ritt
5 :
6 : Contents: Buffer manager test program. Simple consumer connec-
7 : ting to a SYSTEM buffer and receiving some data.
8 :
9 : $Id$
10 :
11 : \********************************************************************/
12 :
13 : #include <stdio.h>
14 : #include <stdlib.h>
15 : #include "midas.h"
16 :
17 : #define MAX_EVENT_SIZE (10*1024*1024)
18 :
19 : INT all_flag;
20 : INT hBufEvent;
21 : INT event_byte_count;
22 : INT count_mismatches = 0;
23 : char event_buffer[MAX_EVENT_SIZE];
24 :
25 : /*----- Print any system message -----------------------------------*/
26 :
27 0 : void process_message(HNDLE hBuf, HNDLE id, EVENT_HEADER * pheader, void *message)
28 : {
29 : /* just print message text which comes after event header */
30 0 : printf("%s\n", (char *) message);
31 0 : }
32 :
33 : /*----- Print any run transition -----------------------------------*/
34 :
35 0 : INT transition(INT run_number, char *error)
36 : {
37 0 : printf("Transition, run #%d\n", run_number);
38 0 : return CM_SUCCESS;
39 : }
40 :
41 : /*----- process_event ----------------------------------------------*/
42 :
43 0 : void process_event(HNDLE hBuf, HNDLE request_id, EVENT_HEADER * pheader, void *pevent)
44 : {
45 : static int ser[10], jam = 0;
46 :
47 : int size, *pdata, id;
48 :
49 : /* accumulate received event size */
50 0 : size = pheader->data_size;
51 0 : id = pheader->event_id;
52 0 : if (id > 9)
53 0 : id = 9;
54 0 : event_byte_count += size + sizeof(EVENT_HEADER);
55 :
56 : /* check if first and last word inside event is equal
57 : to size to check that nothing was overwritten... */
58 :
59 0 : if (!jam) {
60 : /* only test once */
61 0 : pdata = (INT *) (pheader + 1);
62 0 : if (pdata[0] != size || pdata[size / 4 - 1] != size)
63 0 : cm_msg(MERROR, "process_event", "--> data jam <--");
64 0 : jam = 1;
65 : }
66 :
67 : /* if only some events are requested, sleep a little bit
68 : to simulate a random event consumer */
69 0 : if (!all_flag)
70 0 : ss_sleep(10);
71 :
72 : /* if all events are requested, now check the serial number
73 : if no events are missing */
74 0 : if (all_flag && (INT) pheader->serial_number != ser[id] + 1) {
75 0 : cm_msg(MERROR, "process_event",
76 : "Serial number mismatch: Ser: %d, OldSer: %d, ID: %d, size: %d",
77 0 : pheader->serial_number, ser[id], pheader->event_id, pheader->data_size);
78 0 : count_mismatches ++;
79 : }
80 :
81 0 : ser[id] = pheader->serial_number;
82 0 : }
83 :
84 : /*------------------------------------------------------------------*/
85 :
86 0 : int main()
87 : {
88 : INT start_time, stop_time;
89 : double rate;
90 : BUFFER_HEADER buffer_header;
91 : INT status, size, trans, run_number;
92 : char host_name[256], str[32];
93 : INT event_id, request_id;
94 : DWORD last_time;
95 : BOOL via_callback;
96 :
97 : /* get parameters */
98 :
99 0 : printf("ID of event to request: ");
100 0 : ss_gets(str, 32);
101 0 : event_id = atoi(str);
102 :
103 0 : printf("Host to connect: ");
104 0 : ss_gets(host_name, 256);
105 :
106 0 : printf("Get all events (0/1): ");
107 0 : ss_gets(str, 32);
108 0 : all_flag = atoi(str);
109 :
110 0 : printf("Receive via callback ([y]/n): ");
111 0 : ss_gets(str, 32);
112 0 : via_callback = str[0] != 'n';
113 :
114 : /* connect to experiment */
115 0 : status = cm_connect_experiment(host_name, "",
116 0 : all_flag ? "Power Consumer" : "Consumer", NULL);
117 0 : if (status != CM_SUCCESS)
118 0 : return 1;
119 :
120 : /* open the "system" buffer, 1M size */
121 0 : bm_open_buffer("SYSTEM", 2*MAX_EVENT_SIZE, &hBufEvent);
122 :
123 : /* set the buffer cache size */
124 0 : bm_set_cache_size(hBufEvent, 100000, 0);
125 :
126 : /* place a request for a specific event id */
127 0 : bm_request_event(hBufEvent, (WORD) event_id, TRIGGER_ALL,
128 0 : all_flag ? GET_ALL : GET_NONBLOCKING, &request_id,
129 : via_callback ? process_event : NULL);
130 :
131 : /* place a request for system messages */
132 0 : cm_msg_register(process_message);
133 :
134 : /* place a request for transition notification */
135 0 : cm_register_transition(TR_START, via_callback ? transition : NULL, 500);
136 :
137 0 : last_time = 0;
138 0 : start_time = 0;
139 0 : event_byte_count = 0;
140 :
141 : do {
142 0 : if (via_callback)
143 0 : status = cm_yield(1000);
144 : else {
145 : /* receive event "manually" and call receive_event */
146 0 : size = sizeof(event_buffer);
147 0 : status = bm_receive_event(hBufEvent, event_buffer, &size, BM_NO_WAIT);
148 0 : if (status == BM_SUCCESS)
149 0 : process_event(hBufEvent, request_id, (EVENT_HEADER *) event_buffer,
150 : (void *) (((EVENT_HEADER *) event_buffer) + 1));
151 :
152 : /* receive transitions "manually" */
153 0 : if (cm_query_transition(&trans, &run_number, NULL))
154 0 : transition(run_number, NULL);
155 :
156 : /* call yield once every 100 ms */
157 0 : if (ss_millitime() - last_time > 100) {
158 0 : last_time = ss_millitime();
159 0 : status = cm_yield(0);
160 : }
161 : }
162 :
163 : /* calculate rates each second */
164 0 : if (ss_millitime() - start_time > 1000) {
165 0 : stop_time = ss_millitime();
166 0 : rate = event_byte_count / 1024.0 / 1024.0 / ((stop_time - start_time) / 1000.0);
167 :
168 : /* get information about filling level of the buffer */
169 0 : bm_get_buffer_info(hBufEvent, &buffer_header);
170 0 : size = buffer_header.read_pointer - buffer_header.write_pointer;
171 0 : if (size <= 0)
172 0 : size += buffer_header.size;
173 0 : printf("Level: %5.1lf %%, ", 100 - 100.0 * size / buffer_header.size);
174 :
175 0 : printf("Rate: %1.2lf MB/sec, ser mismatches: %d\n", rate, count_mismatches);
176 0 : start_time = stop_time;
177 0 : event_byte_count = 0;
178 : }
179 :
180 0 : } while (status != RPC_SHUTDOWN && status != SS_ABORT);
181 :
182 0 : cm_disconnect_experiment();
183 :
184 0 : return 1;
185 : }
|