MIDAS
Loading...
Searching...
No Matches
feudp.cxx
Go to the documentation of this file.
1//
2// feudp.cxx
3//
4// Frontend for receiving and storing UDP packets as MIDAS data banks.
5//
6
7#include <stdio.h>
8#include <netdb.h> // getnameinfo()
9//#include <stdlib.h>
10#include <string.h> // memcpy()
11#include <errno.h> // errno
12//#include <unistd.h>
13//#include <time.h>
14
15#include <string>
16#include <vector>
17
18#include "midas.h"
19#include "mfe.h"
20#include "mstrlcpy.h"
21
22const char *frontend_name = "feudp"; /* fe MIDAS client name */
23const char *frontend_file_name = __FILE__; /* The frontend file name */
24
25 BOOL frontend_call_loop = TRUE; /* frontend_loop called periodically TRUE */
26 int display_period = 0; /* status page displayed with this freq[ms] */
27 int max_event_size = 1*1024*1024; /* max event size produced by this frontend */
28 int max_event_size_frag = 5 * 1024 * 1024; /* max for fragmented events */
29 int event_buffer_size = 8*1024*1024; /* buffer size to hold events */
30
31 int interrupt_configure(INT cmd, INT source, PTYPE adr);
33 int frontend_init();
34 int frontend_exit();
35 int begin_of_run(int run, char *err);
36 int end_of_run(int run, char *err);
37 int pause_run(int run, char *err);
38 int resume_run(int run, char *err);
39 int frontend_loop();
40 int read_event(char *pevent, INT off);
41
42#ifndef EQ_NAME
43#define EQ_NAME "UDP"
44#endif
45
46#ifndef EQ_EVID
47#define EQ_EVID 1
48#endif
49
51
53 { EQ_NAME, /* equipment name */
54 {EQ_EVID, 0, "SYSTEM", /* event ID, trigger mask, Evbuf */
55 EQ_MULTITHREAD, 0, "MIDAS", /* equipment type, EventSource, format */
56 TRUE, RO_ALWAYS, /* enabled?, WhenRead? */
57 50, 0, 0, 0, /* poll[ms], Evt Lim, SubEvtLim, LogHist */
58 "", "", "",}, read_event, /* readout routine */
59 },
60 {""}
61};
63
64#include <sys/time.h>
65
// Return the current time of day as seconds (with microsecond resolution),
// as reported by gettimeofday().
static double GetTimeSec()
{
   struct timeval tv;
   // Without this call tv would be read uninitialized.
   gettimeofday(&tv, NULL);
   return tv.tv_sec + 0.000001*tv.tv_usec;
}
72
73struct Source
74{
75 struct sockaddr addr;
76 char bank_name[5];
77 std::string host_name;
78};
79
80static std::vector<Source> gSrc;
81
82//static HNDLE hDB;
83static HNDLE hKeySet; // equipment settings
84
85static int gDataSocket;
86
87static int gUnknownPacketCount = 0;
88static bool gSkipUnknownPackets = false;
89
91{
92 int status;
93
94 int fd = socket(AF_INET, SOCK_DGRAM, 0);
95
96 if (fd < 0) {
97 cm_msg(MERROR, "open_udp_socket", "socket(AF_INET,SOCK_DGRAM) returned %d, errno %d (%s)", fd, errno, strerror(errno));
98 return -1;
99 }
100
101 int opt = 1;
102 status = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
103
104 if (status == -1) {
105 cm_msg(MERROR, "open_udp_socket", "setsockopt(SOL_SOCKET,SO_REUSEADDR) returned %d, errno %d (%s)", status, errno, strerror(errno));
106 return -1;
107 }
108
109 int bufsize = 8*1024*1024;
110 //int bufsize = 20*1024;
111
113
114 if (status == -1) {
115 cm_msg(MERROR, "open_udp_socket", "setsockopt(SOL_SOCKET,SO_RCVBUF) returned %d, errno %d (%s)", status, errno, strerror(errno));
116 return -1;
117 }
118
119 struct sockaddr_in local_addr;
120 memset(&local_addr, 0, sizeof(local_addr));
121
122 local_addr.sin_family = AF_INET;
123 local_addr.sin_port = htons(server_port);
124 local_addr.sin_addr.s_addr = INADDR_ANY;
125
126 status = bind(fd, (struct sockaddr *)&local_addr, sizeof(local_addr));
127
128 if (status == -1) {
129 cm_msg(MERROR, "open_udp_socket", "bind(port=%d) returned %d, errno %d (%s)", server_port, status, errno, strerror(errno));
130 return -1;
131 }
132
133 int xbufsize = 0;
134 unsigned size = sizeof(xbufsize);
135
137
138 //printf("status %d, xbufsize %d, size %d\n", status, xbufsize, size);
139
140 if (status == -1) {
141 cm_msg(MERROR, "open_udp_socket", "getsockopt(SOL_SOCKET,SO_RCVBUF) returned %d, errno %d (%s)", status, errno, strerror(errno));
142 return -1;
143 }
144
145 cm_msg(MINFO, "open_udp_socket", "UDP port %d socket receive buffer size is %d", server_port, xbufsize);
146
147 return fd;
148}
149
150bool addr_match(const Source* s, void *addr, int addr_len)
151{
152 int v = memcmp(&s->addr, addr, addr_len);
153#if 0
154 for (int i=0; i<addr_len; i++)
155 printf("%3d - 0x%02x 0x%02x\n", i, ((char*)&s->addr)[i], ((char*)addr)[i]);
156 printf("match %d, hostname [%s] bank [%s], status %d\n", addr_len, s->host_name.c_str(), s->bank_name, v);
157#endif
158 return v==0;
159}
160
161int wait_udp(int socket, int msec)
162{
163 int status;
165 struct timeval timeout;
166
167 FD_ZERO(&fdset);
169
170 timeout.tv_sec = msec/1000;
171 timeout.tv_usec = (msec%1000)*1000;
172
173 status = select(socket+1, &fdset, NULL, NULL, &timeout);
174
175#ifdef EINTR
176 if (status < 0 && errno == EINTR) {
177 return 0; // watchdog interrupt, try again
178 }
179#endif
180
181 if (status < 0) {
182 cm_msg(MERROR, "wait_udp", "select() returned %d, errno %d (%s)", status, errno, strerror(errno));
183 return -1;
184 }
185
186 if (status == 0) {
187 return 0; // timeout
188 }
189
190 if (FD_ISSET(socket, &fdset)) {
191 return 1; // have data
192 }
193
194 // timeout
195 return 0;
196}
197
199{
200 char host[NI_MAXHOST], service[NI_MAXSERV];
201
203
204 if (status != 0) {
205 cm_msg(MERROR, "read_udp", "getnameinfo() returned %d (%s), errno %d (%s)", status, gai_strerror(status), errno, strerror(errno));
206 return -1;
207 }
208
209 char bankname[NAME_LENGTH];
210 int size = sizeof(bankname);
211
213
214 if (status == DB_NO_KEY) {
215 cm_msg(MERROR, "read_udp", "UDP packet from unknown host \"%s\"", host);
216 cm_msg(MINFO, "read_udp", "Register this host by running following commands:");
217 cm_msg(MINFO, "read_udp", "odbedit -c \"create STRING /Equipment/%s/Settings/%s\"", EQ_NAME, host);
218 cm_msg(MINFO, "read_udp", "odbedit -c \"set /Equipment/%s/Settings/%s AAAA\", where AAAA is the MIDAS bank name for this host", EQ_NAME, host);
219 return -1;
220 } else if (status != DB_SUCCESS) {
221 cm_msg(MERROR, "read_udp", "db_get_value(\"/Equipment/%s/Settings/%s\") status %d", EQ_NAME, host, status);
222 return -1;
223 }
224
225 if (strlen(bankname) != 4) {
226 cm_msg(MERROR, "read_udp", "ODB \"/Equipment/%s/Settings/%s\" should be set to a 4 character MIDAS bank name", EQ_NAME, host);
227 cm_msg(MINFO, "read_udp", "Use this command:");
228 cm_msg(MINFO, "read_udp", "odbedit -c \"set /Equipment/%s/Settings/%s AAAA\", where AAAA is the MIDAS bank name for this host", EQ_NAME, host);
229 return -1;
230 }
231
232 cm_msg(MINFO, "read_udp", "UDP packets from host \"%s\" will be stored in bank \"%s\"", host, bankname);
233
234 src->host_name = host;
235 mstrlcpy(src->bank_name, bankname, 5);
236 memcpy(&src->addr, paddr, sizeof(src->addr));
237
238 return 0;
239}
240
241int read_udp(int socket, char* buf, int bufsize, char* pbankname)
242{
243 if (wait_udp(socket, 100) < 1)
244 return 0;
245
246#if 0
247 static int count = 0;
248 static double tt = 0;
249 double t = GetTimeSec();
250
251 double dt = (t-tt)*1e6;
252 count++;
253 if (dt > 1000) {
254 printf("read_udp: %5d %6.0f usec\n", count, dt);
255 count = 0;
256 }
257 tt = t;
258#endif
259
260 struct sockaddr addr;
261 socklen_t addr_len = sizeof(addr);
262 int rd = recvfrom(socket, buf, bufsize, 0, &addr, &addr_len);
263
264 if (rd < 0) {
265 cm_msg(MERROR, "read_udp", "recvfrom() returned %d, errno %d (%s)", rd, errno, strerror(errno));
266 return -1;
267 }
268
269 for (unsigned i=0; i<gSrc.size(); i++) {
270 if (addr_match(&gSrc[i], &addr, addr_len)) {
272 //printf("rd %d, bank [%s]\n", rd, pbankname);
273 return rd;
274 }
275 }
276
278 return -1;
279
280 Source sss;
281
283
284 if (status < 0) {
285
287
288 if (gUnknownPacketCount > 10) {
289 gSkipUnknownPackets = true;
290 cm_msg(MERROR, "read_udp", "further messages are now suppressed...");
291 return -1;
292 }
293
294 return -1;
295 }
296
297 gSrc.push_back(sss);
298
299 mstrlcpy(pbankname, sss.bank_name, 5);
300
301 return rd;
302}
303
304int interrupt_configure(INT cmd, INT source, PTYPE adr)
305{
306 return SUCCESS;
307}
308
310{
311 int status;
312
314 if (status != CM_SUCCESS) {
315 cm_msg(MERROR, "frontend_init", "Cannot connect to ODB, cm_get_experiment_database() returned %d", status);
316 return FE_ERR_ODB;
317 }
318
319 std::string path;
320 path += "/Equipment";
321 path += "/";
322 path += EQ_NAME;
323 path += "/Settings";
324
325 std::string path1 = path + "/udp_port";
326
327 int udp_port = 50005;
328 int size = sizeof(udp_port);
329 status = db_get_value(hDB, 0, path1.c_str(), &udp_port, &size, TID_INT, TRUE);
330
331 if (status != DB_SUCCESS) {
332 cm_msg(MERROR, "frontend_init", "Cannot find \"%s\", db_get_value() returned %d", path1.c_str(), status);
333 return FE_ERR_ODB;
334 }
335
336 status = db_find_key(hDB, 0, path.c_str(), &hKeySet);
337
338 if (status != DB_SUCCESS) {
339 cm_msg(MERROR, "frontend_init", "Cannot find \"%s\", db_find_key() returned %d", path.c_str(), status);
340 return FE_ERR_ODB;
341 }
342
344
345 if (gDataSocket < 0) {
346 printf("frontend_init: cannot open udp socket\n");
347 cm_msg(MERROR, "frontend_init", "Cannot open UDP socket for port %d", udp_port);
348 return FE_ERR_HW;
349 }
350
351 cm_msg(MINFO, "frontend_init", "Frontend equipment \"%s\" is ready, listening on UDP port %d", EQ_NAME, udp_port);
352 return SUCCESS;
353}
354
356{
357 ss_sleep(10);
358 return SUCCESS;
359}
360
361int begin_of_run(int run_number, char *error)
362{
364 gSkipUnknownPackets = false;
365 return SUCCESS;
366}
367
368int end_of_run(int run_number, char *error)
369{
370 return SUCCESS;
371}
372
373int pause_run(INT run_number, char *error)
374{
375 return SUCCESS;
376}
377
378int resume_run(INT run_number, char *error)
379{
380 return SUCCESS;
381}
382
384{
385 return SUCCESS;
386}
387
389{
390 //printf("poll_event: source %d, count %d, test %d\n", source, count, test);
391
392 if (test) {
393 for (int i=0; i<count; i++)
394 ss_sleep(10);
395 return 1;
396 }
397
398 return 1;
399}
400
401#define MAX_UDP_SIZE (0x10000)
402
403int read_event(char *pevent, int off)
404{
405 char buf[MAX_UDP_SIZE];
406 char bankname[5];
407
409 if (length <= 0)
410 return 0;
411
412 bk_init32(pevent);
413 char* pdata;
414 bk_create(pevent, bankname, TID_BYTE, (void**)&pdata);
415 memcpy(pdata, buf, length);
416 bk_close(pevent, pdata + length);
417 return bk_size(pevent);
418}
419
420/* emacs
421 * Local Variables:
422 * tab-width: 8
423 * c-basic-offset: 3
424 * indent-tabs-mode: nil
425 * End:
426 */
#define FALSE
Definition cfortran.h:309
BOOL frontend_call_loop
Definition feudp.cxx:25
const char * frontend_file_name
Definition feudp.cxx:23
int max_event_size_frag
Definition feudp.cxx:28
#define EQ_NAME
Definition feudp.cxx:43
int event_buffer_size
Definition feudp.cxx:29
int frontend_exit()
Frontend exit.
Definition feudp.cxx:383
int read_event(char *pevent, INT off)
Definition feudp.cxx:403
int end_of_run(int run, char *err)
End of Run.
Definition feudp.cxx:368
static HNDLE hKeySet
Definition feudp.cxx:83
bool addr_match(const Source *s, void *addr, int addr_len)
Definition feudp.cxx:150
static int gUnknownPacketCount
Definition feudp.cxx:87
int frontend_loop()
Frontend loop.
Definition feudp.cxx:355
int max_event_size
Definition feudp.cxx:27
BOOL equipment_common_overwrite
Definition feudp.cxx:50
int interrupt_configure(INT cmd, INT source, PTYPE adr)
Definition feudp.cxx:304
#define MAX_UDP_SIZE
Definition feudp.cxx:401
int open_udp_socket(int server_port)
Definition feudp.cxx:90
INT poll_event(INT source, INT count, BOOL test)
Definition feudp.cxx:388
EQUIPMENT equipment[]
Definition feudp.cxx:52
int read_udp(int socket, char *buf, int bufsize, char *pbankname)
Definition feudp.cxx:241
int find_source(Source *src, const sockaddr *paddr, int addr_len)
Definition feudp.cxx:198
const char * frontend_name
Definition feudp.cxx:22
static double GetTimeSec()
Definition feudp.cxx:66
static bool gSkipUnknownPackets
Definition feudp.cxx:88
#define EQ_EVID
Definition feudp.cxx:47
int wait_udp(int socket, int msec)
Definition feudp.cxx:161
int frontend_init()
Frontend initialization.
Definition feudp.cxx:309
static int gDataSocket
Definition feudp.cxx:85
int display_period
Definition feudp.cxx:26
static std::vector< Source > gSrc
Definition feudp.cxx:80
int begin_of_run(int run, char *err)
Begin of Run.
Definition feudp.cxx:361
INT bk_close(void *event, void *pdata)
Definition midas.cxx:16780
void bk_init32(void *event)
Definition midas.cxx:16469
void bk_create(void *event, const char *name, WORD type, void **pdata)
Definition midas.cxx:16561
INT bk_size(const void *event)
Definition midas.cxx:16495
INT cm_get_experiment_database(HNDLE *hDB, HNDLE *hKeyClient)
Definition midas.cxx:3011
#define CM_SUCCESS
Definition midas.h:582
#define DB_SUCCESS
Definition midas.h:631
#define DB_NO_KEY
Definition midas.h:642
#define FE_ERR_ODB
Definition midas.h:718
#define FE_ERR_HW
Definition midas.h:719
#define SUCCESS
Definition mcstd.h:54
#define MINFO
Definition midas.h:560
#define TID_BYTE
Definition midas.h:327
#define EQ_MULTITHREAD
Definition midas.h:417
#define TID_STRING
Definition midas.h:346
#define MERROR
Definition midas.h:559
#define TID_INT
Definition midas.h:338
#define RO_ALWAYS
Definition midas.h:436
INT ss_sleep(INT millisec)
Definition system.cxx:3628
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
Definition midas.cxx:915
INT db_get_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, void *data, INT *buf_size, DWORD type, BOOL create)
Definition odb.cxx:5415
INT db_find_key(HNDLE hDB, HNDLE hKey, const char *key_name, HNDLE *subhKey)
Definition odb.cxx:4079
INT run_number[2]
Definition mana.cxx:246
HNDLE hDB
main ODB handle
Definition mana.cxx:207
char addr[128]
Definition mcnaf.cxx:104
char bank_name[4]
Definition mdump.cxx:26
double count
Definition mdump.cxx:33
INT i
Definition mdump.cxx:32
INT HNDLE
Definition midas.h:132
DWORD BOOL
Definition midas.h:105
int INT
Definition midas.h:129
#define PTYPE
Definition midas.h:170
#define TRUE
Definition midas.h:182
#define NAME_LENGTH
Definition midas.h:272
#define resume_run
#define pause_run
program test
Definition miniana.f:6
int gettimeofday(struct timeval *tp, void *tzp)
timeval tv
Definition msysmon.cxx:1095
DWORD run
Definition odbhist.cxx:39
DWORD status
Definition odbhist.cxx:39
TH1X EXPRT * h1_book(const char *name, const char *title, int bins, double min, double max)
Definition rmidas.h:24
struct sockaddr addr
Definition feudp.cxx:75
std::string host_name
Definition feudp.cxx:77
char bank_name[5]
Definition feudp.cxx:76