Line data Source code
1 : /********************************************************************\
2 :
3 : Name: produce.c
4 : Created by: Stefan Ritt
5 :
6 : Contents: Buffer manager test program. Simple producer connec-
7 : ting to a SYSTEM buffer and sending some data.
8 :
9 : $Id$
10 :
11 : \********************************************************************/
12 :
13 : #include <stdio.h>
14 : #include <string.h>
15 : #include "midas.h"
16 : #include "msystem.h"
17 :
18 : #define MAX_EVENT_SIZE (10*1024*1024)
19 :
20 : /*------------------------------------------------------------------*/
21 :
22 : #ifdef OS_VXWORKS
23 : produce()
24 : #else
25 0 : int main()
26 : #endif
27 : {
28 : INT hBuf, status, i;
29 : char *event, str[256];
30 : INT *pdata;
31 : INT start, stop;
32 : double count, rate;
33 0 : int id, size, event_size, act_size, variable_size, rpc_mode, flush = 0;
34 : char host_name[256];
35 : BUFFER_HEADER buffer_header;
36 :
37 0 : setbuf(stdout, NULL);
38 0 : setbuf(stderr, NULL);
39 :
40 : /* get parameters */
41 :
42 0 : printf("ID of event to produce: ");
43 0 : ss_gets(str, 256);
44 0 : id = atoi(str);
45 :
46 0 : printf("Host to connect: ");
47 0 : ss_gets(host_name, 256);
48 :
49 0 : printf("Event size: ");
50 0 : ss_gets(str, 256);
51 0 : event_size = atoi(str);
52 0 : if (event_size < 0) {
53 0 : variable_size = 1;
54 0 : event_size = -event_size;
55 : } else
56 0 : variable_size = 0;
57 :
58 0 : printf("RPC mode ([0]/1): ");
59 0 : ss_gets(str, 32);
60 0 : rpc_mode = atoi(str);
61 :
62 : /* connect to experiment */
63 0 : status = cm_connect_experiment(host_name, "", "Producer", NULL);
64 0 : if (status != CM_SUCCESS)
65 0 : return 1;
66 :
67 : /* open the event buffer with default size */
68 0 : bm_open_buffer(EVENT_BUFFER_NAME, 2*MAX_EVENT_SIZE, &hBuf);
69 :
70 : /* set the buffer write cache size */
71 0 : bm_set_cache_size(hBuf, 0, 100000);
72 :
73 : /* allocate event buffer */
74 0 : event = (char *) malloc(event_size + sizeof(EVENT_HEADER));
75 0 : memset(event, 0, event_size + sizeof(EVENT_HEADER));
76 0 : if (event == NULL) {
77 0 : printf("Not enough memory for event buffer\n");
78 0 : goto error;
79 : }
80 0 : pdata = (INT *) (event + sizeof(EVENT_HEADER));
81 :
82 : do {
83 0 : start = ss_millitime();
84 0 : count = 0;
85 :
86 : do {
87 0 : for (i = 0; i < 10; i++) {
88 0 : if (variable_size)
89 0 : act_size = (rand() % (event_size - 10)) + 10;
90 : else
91 0 : act_size = event_size;
92 :
93 : /* place the event size in the first and last word of
94 : the event to check later if data has been overwritten
95 : accidentally */
96 0 : pdata[0] = act_size;
97 0 : pdata[act_size / 4 - 1] = act_size;
98 :
99 : /* compose an event header with serial number */
100 0 : bm_compose_event((EVENT_HEADER *) event, (short) id, 1,
101 0 : act_size, ((EVENT_HEADER *) (event))->serial_number + 1);
102 :
103 0 : if (act_size < 0)
104 0 : printf("Error: act_size = %d, size = %d\n", act_size, event_size);
105 :
106 : /* now send event */
107 0 : status = rpc_send_event(hBuf, (const EVENT_HEADER *)event, act_size + sizeof(EVENT_HEADER), BM_WAIT, rpc_mode);
108 :
109 0 : if (status != BM_SUCCESS) {
110 0 : printf("rpc_send_event returned error %d, event_size %d\n",
111 : status, act_size);
112 0 : goto error;
113 : }
114 :
115 0 : count += act_size + sizeof(EVENT_HEADER);
116 : }
117 :
118 : /* repeat this loop for 1s */
119 0 : } while (ss_millitime() - start < 1000);
120 :
121 : /* Now calculate the rate */
122 0 : stop = ss_millitime();
123 0 : if (stop != start)
124 0 : rate = count / 1024.0 / 1024.0 / (stop / 1000.0 - start / 1000.0);
125 : else
126 0 : rate = 0;
127 :
128 : /* get information about filling level of the buffer */
129 0 : bm_get_buffer_info(hBuf, &buffer_header);
130 0 : size = buffer_header.read_pointer - buffer_header.write_pointer;
131 0 : if (size <= 0)
132 0 : size += buffer_header.size;
133 0 : printf("Level: %5.1lf %%, ", 100 - 100.0 * size / buffer_header.size);
134 :
135 0 : printf("Rate: %1.2lf MB/sec\n", rate);
136 :
137 : /* flush buffers every 10 seconds */
138 0 : if ((flush++) % 10 == 0) {
139 0 : rpc_flush_event();
140 0 : bm_flush_cache(hBuf, BM_WAIT);
141 0 : printf("flush\n");
142 : }
143 :
144 0 : status = cm_yield(0);
145 :
146 0 : } while (status != RPC_SHUTDOWN && status != SS_ABORT);
147 :
148 0 : error:
149 :
150 0 : cm_disconnect_experiment();
151 0 : return 1;
152 : }
|