Line data Source code
1 : //
2 : // feudp.cxx
3 : //
4 : // Frontend for receiving and storing UDP packets as MIDAS data banks.
5 : //
6 :
#include <stdio.h>
#include <netdb.h> // getnameinfo()
//#include <stdlib.h>
#include <string.h> // memcpy()
#include <errno.h> // errno
#include <unistd.h> // close()
//#include <time.h>

#include <string>
#include <vector>

#include "midas.h"
#include "mfe.h"
#include "mstrlcpy.h"
21 :
22 : const char *frontend_name = "feudp"; /* fe MIDAS client name */
23 : const char *frontend_file_name = __FILE__; /* The frontend file name */
24 :
25 : BOOL frontend_call_loop = TRUE; /* frontend_loop called periodically TRUE */
26 : int display_period = 0; /* status page displayed with this freq[ms] */
27 : int max_event_size = 1*1024*1024; /* max event size produced by this frontend */
28 : int max_event_size_frag = 5 * 1024 * 1024; /* max for fragmented events */
29 : int event_buffer_size = 8*1024*1024; /* buffer size to hold events */
30 :
31 : int interrupt_configure(INT cmd, INT source, PTYPE adr);
32 : INT poll_event(INT source, INT count, BOOL test);
33 : int frontend_init();
34 : int frontend_exit();
35 : int begin_of_run(int run, char *err);
36 : int end_of_run(int run, char *err);
37 : int pause_run(int run, char *err);
38 : int resume_run(int run, char *err);
39 : int frontend_loop();
40 : int read_event(char *pevent, INT off);
41 :
42 : #ifndef EQ_NAME
43 : #define EQ_NAME "UDP"
44 : #endif
45 :
46 : #ifndef EQ_EVID
47 : #define EQ_EVID 1
48 : #endif
49 :
50 : BOOL equipment_common_overwrite = FALSE;
51 :
52 : EQUIPMENT equipment[] = {
53 : { EQ_NAME, /* equipment name */
54 : {EQ_EVID, 0, "SYSTEM", /* event ID, trigger mask, Evbuf */
55 : EQ_MULTITHREAD, 0, "MIDAS", /* equipment type, EventSource, format */
56 : TRUE, RO_ALWAYS, /* enabled?, WhenRead? */
57 : 50, 0, 0, 0, /* poll[ms], Evt Lim, SubEvtLim, LogHist */
58 : "", "", "",}, read_event, /* readout routine */
59 : },
60 : {""}
61 : };
62 : ////////////////////////////////////////////////////////////////////////////
63 :
64 : #include <sys/time.h>
65 :
// Return the current time from gettimeofday() as seconds,
// with the microsecond field folded in as the fractional part.
static double GetTimeSec()
{
   struct timeval now;
   gettimeofday(&now, NULL);
   const double fraction = 0.000001*now.tv_usec;
   return now.tv_sec + fraction;
}
72 :
// One known packet sender: the address its packets arrive from,
// the bank name its data is stored under, and its resolved host name.
struct Source
{
   struct sockaddr addr;         // sender address as filled in by find_source()
   char            bank_name[5]; // 4 character bank name plus terminating NUL
   std::string     host_name;    // host name reported by getnameinfo()
};
79 :
80 : static std::vector<Source> gSrc;
81 :
82 : //static HNDLE hDB;
83 : static HNDLE hKeySet; // equipment settings
84 :
85 : static int gDataSocket;
86 :
87 : static int gUnknownPacketCount = 0;
88 : static bool gSkipUnknownPackets = false;
89 :
90 0 : int open_udp_socket(int server_port)
91 : {
92 : int status;
93 :
94 0 : int fd = socket(AF_INET, SOCK_DGRAM, 0);
95 :
96 0 : if (fd < 0) {
97 0 : cm_msg(MERROR, "open_udp_socket", "socket(AF_INET,SOCK_DGRAM) returned %d, errno %d (%s)", fd, errno, strerror(errno));
98 0 : return -1;
99 : }
100 :
101 0 : int opt = 1;
102 0 : status = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
103 :
104 0 : if (status == -1) {
105 0 : cm_msg(MERROR, "open_udp_socket", "setsockopt(SOL_SOCKET,SO_REUSEADDR) returned %d, errno %d (%s)", status, errno, strerror(errno));
106 0 : return -1;
107 : }
108 :
109 0 : int bufsize = 8*1024*1024;
110 : //int bufsize = 20*1024;
111 :
112 0 : status = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize));
113 :
114 0 : if (status == -1) {
115 0 : cm_msg(MERROR, "open_udp_socket", "setsockopt(SOL_SOCKET,SO_RCVBUF) returned %d, errno %d (%s)", status, errno, strerror(errno));
116 0 : return -1;
117 : }
118 :
119 : struct sockaddr_in local_addr;
120 0 : memset(&local_addr, 0, sizeof(local_addr));
121 :
122 0 : local_addr.sin_family = AF_INET;
123 0 : local_addr.sin_port = htons(server_port);
124 0 : local_addr.sin_addr.s_addr = INADDR_ANY;
125 :
126 0 : status = bind(fd, (struct sockaddr *)&local_addr, sizeof(local_addr));
127 :
128 0 : if (status == -1) {
129 0 : cm_msg(MERROR, "open_udp_socket", "bind(port=%d) returned %d, errno %d (%s)", server_port, status, errno, strerror(errno));
130 0 : return -1;
131 : }
132 :
133 0 : int xbufsize = 0;
134 0 : unsigned size = sizeof(xbufsize);
135 :
136 0 : status = getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &xbufsize, &size);
137 :
138 : //printf("status %d, xbufsize %d, size %d\n", status, xbufsize, size);
139 :
140 0 : if (status == -1) {
141 0 : cm_msg(MERROR, "open_udp_socket", "getsockopt(SOL_SOCKET,SO_RCVBUF) returned %d, errno %d (%s)", status, errno, strerror(errno));
142 0 : return -1;
143 : }
144 :
145 0 : cm_msg(MINFO, "open_udp_socket", "UDP port %d socket receive buffer size is %d", server_port, xbufsize);
146 :
147 0 : return fd;
148 : }
149 :
150 0 : bool addr_match(const Source* s, void *addr, int addr_len)
151 : {
152 0 : int v = memcmp(&s->addr, addr, addr_len);
153 : #if 0
154 : for (int i=0; i<addr_len; i++)
155 : printf("%3d - 0x%02x 0x%02x\n", i, ((char*)&s->addr)[i], ((char*)addr)[i]);
156 : printf("match %d, hostname [%s] bank [%s], status %d\n", addr_len, s->host_name.c_str(), s->bank_name, v);
157 : #endif
158 0 : return v==0;
159 : }
160 :
161 0 : int wait_udp(int socket, int msec)
162 : {
163 : int status;
164 : fd_set fdset;
165 : struct timeval timeout;
166 :
167 0 : FD_ZERO(&fdset);
168 0 : FD_SET(socket, &fdset);
169 :
170 0 : timeout.tv_sec = msec/1000;
171 0 : timeout.tv_usec = (msec%1000)*1000;
172 :
173 0 : status = select(socket+1, &fdset, NULL, NULL, &timeout);
174 :
175 : #ifdef EINTR
176 0 : if (status < 0 && errno == EINTR) {
177 0 : return 0; // watchdog interrupt, try again
178 : }
179 : #endif
180 :
181 0 : if (status < 0) {
182 0 : cm_msg(MERROR, "wait_udp", "select() returned %d, errno %d (%s)", status, errno, strerror(errno));
183 0 : return -1;
184 : }
185 :
186 0 : if (status == 0) {
187 0 : return 0; // timeout
188 : }
189 :
190 0 : if (FD_ISSET(socket, &fdset)) {
191 0 : return 1; // have data
192 : }
193 :
194 : // timeout
195 0 : return 0;
196 : }
197 :
198 0 : int find_source(Source* src, const sockaddr* paddr, int addr_len)
199 : {
200 : char host[NI_MAXHOST], service[NI_MAXSERV];
201 :
202 0 : int status = getnameinfo(paddr, addr_len, host, NI_MAXHOST, service, NI_MAXSERV, NI_NUMERICSERV);
203 :
204 0 : if (status != 0) {
205 0 : cm_msg(MERROR, "read_udp", "getnameinfo() returned %d (%s), errno %d (%s)", status, gai_strerror(status), errno, strerror(errno));
206 0 : return -1;
207 : }
208 :
209 : char bankname[NAME_LENGTH];
210 0 : int size = sizeof(bankname);
211 :
212 0 : status = db_get_value(hDB, hKeySet, host, bankname, &size, TID_STRING, FALSE);
213 :
214 0 : if (status == DB_NO_KEY) {
215 0 : cm_msg(MERROR, "read_udp", "UDP packet from unknown host \"%s\"", host);
216 0 : cm_msg(MINFO, "read_udp", "Register this host by running following commands:");
217 0 : cm_msg(MINFO, "read_udp", "odbedit -c \"create STRING /Equipment/%s/Settings/%s\"", EQ_NAME, host);
218 0 : cm_msg(MINFO, "read_udp", "odbedit -c \"set /Equipment/%s/Settings/%s AAAA\", where AAAA is the MIDAS bank name for this host", EQ_NAME, host);
219 0 : return -1;
220 0 : } else if (status != DB_SUCCESS) {
221 0 : cm_msg(MERROR, "read_udp", "db_get_value(\"/Equipment/%s/Settings/%s\") status %d", EQ_NAME, host, status);
222 0 : return -1;
223 : }
224 :
225 0 : if (strlen(bankname) != 4) {
226 0 : cm_msg(MERROR, "read_udp", "ODB \"/Equipment/%s/Settings/%s\" should be set to a 4 character MIDAS bank name", EQ_NAME, host);
227 0 : cm_msg(MINFO, "read_udp", "Use this command:");
228 0 : cm_msg(MINFO, "read_udp", "odbedit -c \"set /Equipment/%s/Settings/%s AAAA\", where AAAA is the MIDAS bank name for this host", EQ_NAME, host);
229 0 : return -1;
230 : }
231 :
232 0 : cm_msg(MINFO, "read_udp", "UDP packets from host \"%s\" will be stored in bank \"%s\"", host, bankname);
233 :
234 0 : src->host_name = host;
235 0 : mstrlcpy(src->bank_name, bankname, 5);
236 0 : memcpy(&src->addr, paddr, sizeof(src->addr));
237 :
238 0 : return 0;
239 : }
240 :
241 0 : int read_udp(int socket, char* buf, int bufsize, char* pbankname)
242 : {
243 0 : if (wait_udp(socket, 100) < 1)
244 0 : return 0;
245 :
246 : #if 0
247 : static int count = 0;
248 : static double tt = 0;
249 : double t = GetTimeSec();
250 :
251 : double dt = (t-tt)*1e6;
252 : count++;
253 : if (dt > 1000) {
254 : printf("read_udp: %5d %6.0f usec\n", count, dt);
255 : count = 0;
256 : }
257 : tt = t;
258 : #endif
259 :
260 : struct sockaddr addr;
261 0 : socklen_t addr_len = sizeof(addr);
262 0 : int rd = recvfrom(socket, buf, bufsize, 0, &addr, &addr_len);
263 :
264 0 : if (rd < 0) {
265 0 : cm_msg(MERROR, "read_udp", "recvfrom() returned %d, errno %d (%s)", rd, errno, strerror(errno));
266 0 : return -1;
267 : }
268 :
269 0 : for (unsigned i=0; i<gSrc.size(); i++) {
270 0 : if (addr_match(&gSrc[i], &addr, addr_len)) {
271 0 : mstrlcpy(pbankname, gSrc[i].bank_name, 5);
272 : //printf("rd %d, bank [%s]\n", rd, pbankname);
273 0 : return rd;
274 : }
275 : }
276 :
277 0 : if (gSkipUnknownPackets)
278 0 : return -1;
279 :
280 0 : Source sss;
281 :
282 0 : int status = find_source(&sss, &addr, addr_len);
283 :
284 0 : if (status < 0) {
285 :
286 0 : gUnknownPacketCount++;
287 :
288 0 : if (gUnknownPacketCount > 10) {
289 0 : gSkipUnknownPackets = true;
290 0 : cm_msg(MERROR, "read_udp", "further messages are now suppressed...");
291 0 : return -1;
292 : }
293 :
294 0 : return -1;
295 : }
296 :
297 0 : gSrc.push_back(sss);
298 :
299 0 : mstrlcpy(pbankname, sss.bank_name, 5);
300 :
301 0 : return rd;
302 0 : }
303 :
304 0 : int interrupt_configure(INT cmd, INT source, PTYPE adr)
305 : {
306 0 : return SUCCESS;
307 : }
308 :
309 0 : int frontend_init()
310 : {
311 : int status;
312 :
313 0 : status = cm_get_experiment_database(&hDB, NULL);
314 0 : if (status != CM_SUCCESS) {
315 0 : cm_msg(MERROR, "frontend_init", "Cannot connect to ODB, cm_get_experiment_database() returned %d", status);
316 0 : return FE_ERR_ODB;
317 : }
318 :
319 0 : std::string path;
320 0 : path += "/Equipment";
321 0 : path += "/";
322 0 : path += EQ_NAME;
323 0 : path += "/Settings";
324 :
325 0 : std::string path1 = path + "/udp_port";
326 :
327 0 : int udp_port = 50005;
328 0 : int size = sizeof(udp_port);
329 0 : status = db_get_value(hDB, 0, path1.c_str(), &udp_port, &size, TID_INT, TRUE);
330 :
331 0 : if (status != DB_SUCCESS) {
332 0 : cm_msg(MERROR, "frontend_init", "Cannot find \"%s\", db_get_value() returned %d", path1.c_str(), status);
333 0 : return FE_ERR_ODB;
334 : }
335 :
336 0 : status = db_find_key(hDB, 0, path.c_str(), &hKeySet);
337 :
338 0 : if (status != DB_SUCCESS) {
339 0 : cm_msg(MERROR, "frontend_init", "Cannot find \"%s\", db_find_key() returned %d", path.c_str(), status);
340 0 : return FE_ERR_ODB;
341 : }
342 :
343 0 : gDataSocket = open_udp_socket(udp_port);
344 :
345 0 : if (gDataSocket < 0) {
346 0 : printf("frontend_init: cannot open udp socket\n");
347 0 : cm_msg(MERROR, "frontend_init", "Cannot open UDP socket for port %d", udp_port);
348 0 : return FE_ERR_HW;
349 : }
350 :
351 0 : cm_msg(MINFO, "frontend_init", "Frontend equipment \"%s\" is ready, listening on UDP port %d", EQ_NAME, udp_port);
352 0 : return SUCCESS;
353 0 : }
354 :
355 0 : int frontend_loop()
356 : {
357 0 : ss_sleep(10);
358 0 : return SUCCESS;
359 : }
360 :
361 0 : int begin_of_run(int run_number, char *error)
362 : {
363 0 : gUnknownPacketCount = 0;
364 0 : gSkipUnknownPackets = false;
365 0 : return SUCCESS;
366 : }
367 :
368 0 : int end_of_run(int run_number, char *error)
369 : {
370 0 : return SUCCESS;
371 : }
372 :
373 0 : int pause_run(INT run_number, char *error)
374 : {
375 0 : return SUCCESS;
376 : }
377 :
378 0 : int resume_run(INT run_number, char *error)
379 : {
380 0 : return SUCCESS;
381 : }
382 :
383 0 : int frontend_exit()
384 : {
385 0 : return SUCCESS;
386 : }
387 :
388 0 : INT poll_event(INT source, INT count, BOOL test)
389 : {
390 : //printf("poll_event: source %d, count %d, test %d\n", source, count, test);
391 :
392 0 : if (test) {
393 0 : for (int i=0; i<count; i++)
394 0 : ss_sleep(10);
395 0 : return 1;
396 : }
397 :
398 0 : return 1;
399 : }
400 :
401 : #define MAX_UDP_SIZE (0x10000)
402 :
403 0 : int read_event(char *pevent, int off)
404 : {
405 : char buf[MAX_UDP_SIZE];
406 : char bankname[5];
407 :
408 0 : int length = read_udp(gDataSocket, buf, MAX_UDP_SIZE, bankname);
409 0 : if (length <= 0)
410 0 : return 0;
411 :
412 0 : bk_init32(pevent);
413 : char* pdata;
414 0 : bk_create(pevent, bankname, TID_BYTE, (void**)&pdata);
415 0 : memcpy(pdata, buf, length);
416 0 : bk_close(pevent, pdata + length);
417 0 : return bk_size(pevent);
418 : }
419 :
420 : /* emacs
421 : * Local Variables:
422 : * tab-width: 8
423 : * c-basic-offset: 3
424 : * indent-tabs-mode: nil
425 : * End:
426 : */
|