Line data Source code
1 : /********************************************************************\
2 :
3 : Name: tmfe.cxx
4 : Created by: Konstantin Olchanski - TRIUMF
5 :
6 : Contents: C++ MIDAS frontend
7 :
8 : \********************************************************************/
9 :
10 : #undef NDEBUG // midas required assert() to be always enabled
11 :
12 : #include <stdio.h>
13 : #include <stdarg.h>
14 : #include <assert.h>
15 : #include <signal.h> // signal()
16 : #include <sys/time.h> // gettimeofday()
17 :
18 : #include "tmfe.h"
19 :
20 : #include "midas.h"
21 : #include "msystem.h"
22 : #include "mrpc.h"
23 : #include "mstrlcpy.h"
24 :
25 : //////////////////////////////////////////////////////////////////////
26 : // error handling
27 : //////////////////////////////////////////////////////////////////////
28 :
29 0 : TMFeResult TMFeErrorMessage(const std::string& message)
30 : {
31 0 : return TMFeResult(0, message);
32 : }
33 :
34 0 : TMFeResult TMFeMidasError(const std::string& message, const char* midas_function_name, int midas_status)
35 : {
36 0 : return TMFeResult(midas_status, message + msprintf(", %s() status %d", midas_function_name, midas_status));
37 : }
38 :
39 : //////////////////////////////////////////////////////////////////////
40 : // TMFE singleton class
41 : //////////////////////////////////////////////////////////////////////
42 :
43 :
44 1 : TMFE::TMFE() // ctor
45 : {
46 1 : if (gfVerbose)
47 0 : printf("TMFE::ctor!\n");
48 1 : }
49 :
50 0 : TMFE::~TMFE() // dtor
51 : {
52 0 : if (gfVerbose)
53 0 : printf("TMFE::dtor!\n");
54 0 : assert(!"TMFE::~TMFE(): destruction of the TMFE singleton is not permitted!");
55 0 : }
56 :
57 1 : TMFE* TMFE::Instance()
58 : {
59 1 : if (!gfMFE)
60 1 : gfMFE = new TMFE();
61 :
62 1 : return gfMFE;
63 : }
64 :
65 1 : TMFeResult TMFE::Connect(const char* progname, const char* hostname, const char* exptname)
66 : {
67 1 : if (progname)
68 1 : fProgramName = progname;
69 :
70 1 : if (fProgramName.empty()) {
71 0 : return TMFeErrorMessage("TMFE::Connect: frontend program name is not set");
72 : }
73 :
74 1 : fHostname = ss_gethostname();
75 :
76 : int status;
77 :
78 1 : std::string env_hostname;
79 1 : std::string env_exptname;
80 :
81 : /* get default from environment */
82 1 : status = cm_get_environment(&env_hostname, &env_exptname);
83 :
84 1 : if (status != CM_SUCCESS) {
85 0 : return TMFeMidasError("Cannot connect to MIDAS", "cm_get_environment", status);
86 : }
87 :
88 1 : if (hostname && hostname[0]) {
89 0 : fMserverHostname = hostname;
90 : } else {
91 1 : fMserverHostname = env_hostname;
92 : }
93 :
94 1 : if (exptname && exptname[0]) {
95 0 : fExptname = exptname;
96 : } else {
97 1 : fExptname = env_exptname;
98 : }
99 :
100 1 : if (gfVerbose) {
101 0 : printf("TMFE::Connect: Program \"%s\" connecting to experiment \"%s\" on host \"%s\"\n", fProgramName.c_str(), fExptname.c_str(), fMserverHostname.c_str());
102 : }
103 :
104 1 : int watchdog = DEFAULT_WATCHDOG_TIMEOUT;
105 : //int watchdog = 60*1000;
106 :
107 1 : status = cm_connect_experiment1(fMserverHostname.c_str(), fExptname.c_str(), fProgramName.c_str(), NULL, DEFAULT_ODB_SIZE, watchdog);
108 :
109 1 : if (status == CM_UNDEF_EXP) {
110 0 : return TMFeMidasError(msprintf("Cannot connect to MIDAS, experiment \"%s\" is not defined", fExptname.c_str()), "cm_connect_experiment1", status);
111 1 : } else if (status != CM_SUCCESS) {
112 0 : return TMFeMidasError("Cannot connect to MIDAS", "cm_connect_experiment1", status);
113 : }
114 :
115 1 : status = cm_get_experiment_database(&fDB, NULL);
116 1 : if (status != CM_SUCCESS) {
117 0 : return TMFeMidasError("Cannot connect to MIDAS", "cm_get_experiment_database", status);
118 : }
119 :
120 1 : fOdbRoot = MakeMidasOdb(fDB);
121 :
122 1 : int run_state = 0;
123 :
124 1 : fOdbRoot->RI("Runinfo/state", &run_state);
125 1 : fOdbRoot->RI("Runinfo/run number", &fRunNumber);
126 :
127 1 : if (run_state == STATE_RUNNING) {
128 0 : fStateRunning = true;
129 1 : } else if (run_state == STATE_PAUSED) {
130 0 : fStateRunning = true;
131 : } else {
132 1 : fStateRunning = false;
133 : }
134 :
135 1 : RegisterRPCs();
136 :
137 1 : if (gfVerbose) {
138 0 : printf("TMFE::Connect: Program \"%s\" connected to experiment \"%s\" on host \"%s\"\n", fProgramName.c_str(), fExptname.c_str(), fMserverHostname.c_str());
139 : }
140 :
141 1 : return TMFeOk();
142 1 : }
143 :
144 0 : TMFeResult TMFE::SetWatchdogSec(int sec)
145 : {
146 0 : if (sec == 0) {
147 0 : cm_set_watchdog_params(false, 0);
148 : } else {
149 0 : cm_set_watchdog_params(true, sec*1000);
150 : }
151 0 : return TMFeOk();
152 : }
153 :
154 1 : TMFeResult TMFE::Disconnect()
155 : {
156 1 : if (gfVerbose)
157 0 : printf("TMFE::Disconnect: Disconnecting from experiment \"%s\" on host \"%s\"\n", fExptname.c_str(), fMserverHostname.c_str());
158 1 : StopRpcThread();
159 1 : cm_disconnect_experiment();
160 1 : if (gfVerbose)
161 0 : printf("TMFE::Disconnect: Disconnected from experiment \"%s\" on host \"%s\"\n", fExptname.c_str(), fMserverHostname.c_str());
162 1 : return TMFeOk();
163 : }
164 :
165 : /////////////////////////////////////////////////////////
166 : // event buffer functions
167 : /////////////////////////////////////////////////////////
168 :
169 0 : TMEventBuffer::TMEventBuffer(TMFE* mfe) // ctor
170 : {
171 0 : assert(mfe != NULL);
172 0 : fMfe = mfe;
173 0 : };
174 :
175 0 : TMEventBuffer::~TMEventBuffer() // dtor
176 : {
177 0 : CloseBuffer();
178 :
179 : // poison all pointers
180 0 : fMfe = NULL;
181 0 : };
182 :
183 0 : TMFeResult TMEventBuffer::OpenBuffer(const char* bufname, size_t bufsize)
184 : {
185 0 : if (fBufHandle) {
186 0 : return TMFeErrorMessage(msprintf("Event buffer \"%s\" is already open", fBufName.c_str()));
187 : }
188 :
189 0 : fBufName = bufname;
190 :
191 0 : if (bufsize == 0)
192 0 : bufsize = DEFAULT_BUFFER_SIZE;
193 :
194 0 : int status = bm_open_buffer(fBufName.c_str(), bufsize, &fBufHandle);
195 :
196 0 : if (status != BM_SUCCESS && status != BM_CREATED) {
197 0 : return TMFeMidasError(msprintf("Cannot open event buffer \"%s\"", fBufName.c_str()), "bm_open_buffer", status);
198 : }
199 :
200 0 : fBufSize = 0;
201 0 : fBufMaxEventSize = 0;
202 :
203 0 : uint32_t buf_size = 0;
204 0 : uint32_t max_event_size = 0;
205 :
206 0 : fMfe->fOdbRoot->RU32("Experiment/MAX_EVENT_SIZE", &max_event_size);
207 0 : fMfe->fOdbRoot->RU32((std::string("Experiment/Buffer Sizes/") + bufname).c_str(), &buf_size);
208 :
209 0 : if (buf_size > 0) {
210 : // limit event size to half the buffer size, so we can buffer two events
211 0 : uint32_t xmax_event_size = buf_size / 2;
212 : // add extra margin
213 0 : if (xmax_event_size > 1024)
214 0 : xmax_event_size -= 1024;
215 0 : if (max_event_size > xmax_event_size)
216 0 : max_event_size = xmax_event_size;
217 : }
218 :
219 0 : fBufSize = buf_size;
220 0 : fBufMaxEventSize = max_event_size;
221 :
222 0 : if (fBufSize == 0) {
223 0 : return TMFeErrorMessage(msprintf("Cannot get buffer size for event buffer \"%s\"", fBufName.c_str()));
224 : }
225 :
226 0 : if (fBufMaxEventSize == 0) {
227 0 : return TMFeErrorMessage(msprintf("Cannot get MAX_EVENT_SIZE for event buffer \"%s\"", fBufName.c_str()));
228 : }
229 :
230 0 : printf("TMEventBuffer::OpenBuffer: Buffer \"%s\" size %d, max event size %d\n", fBufName.c_str(), (int)fBufSize, (int)fBufMaxEventSize);
231 :
232 0 : return TMFeOk();
233 : }
234 :
235 0 : TMFeResult TMEventBuffer::CloseBuffer()
236 : {
237 0 : if (!fBufHandle)
238 0 : return TMFeOk();
239 :
240 0 : fBufRequests.clear(); // no need to cancel individual requests, they are gone after we close the buffer
241 :
242 0 : int status = bm_close_buffer(fBufHandle);
243 :
244 0 : if (status != BM_SUCCESS) {
245 0 : fBufHandle = 0;
246 0 : return TMFeMidasError(msprintf("Cannot close event buffer \"%s\"", fBufName.c_str()), "bm_close_buffer", status);
247 : }
248 :
249 0 : fBufHandle = 0;
250 0 : fBufSize = 0;
251 0 : fBufMaxEventSize = 0;
252 0 : fBufReadCacheSize = 0;
253 0 : fBufWriteCacheSize = 0;
254 :
255 0 : return TMFeOk();
256 : }
257 :
258 0 : TMFeResult TMEventBuffer::SetCacheSize(size_t read_cache_size, size_t write_cache_size)
259 : {
260 0 : int status = bm_set_cache_size(fBufHandle, read_cache_size, write_cache_size);
261 :
262 0 : if (status != BM_SUCCESS) {
263 0 : return TMFeMidasError(msprintf("Cannot set event buffer \"%s\" cache sizes: read %d, write %d", fBufName.c_str(), (int)read_cache_size, (int)write_cache_size), "bm_set_cache_size", status);
264 : }
265 :
266 0 : fBufReadCacheSize = read_cache_size;
267 0 : fBufWriteCacheSize = write_cache_size;
268 :
269 0 : return TMFeOk();
270 : }
271 :
272 0 : TMFeResult TMEventBuffer::AddRequest(int event_id, int trigger_mask, const char* sampling_type_string)
273 : {
274 0 : if (!fBufHandle) {
275 0 : return TMFeErrorMessage(msprintf("AddRequest: Error: Event buffer \"%s\" is not open", fBufName.c_str()));
276 : }
277 :
278 0 : int sampling_type = 0;
279 :
280 0 : if (strcmp(sampling_type_string, "GET_ALL")==0) {
281 0 : sampling_type = GET_ALL;
282 0 : } else if (strcmp(sampling_type_string, "GET_NONBLOCKING")==0) {
283 0 : sampling_type = GET_NONBLOCKING;
284 0 : } else if (strcmp(sampling_type_string, "GET_RECENT")==0) {
285 0 : sampling_type = GET_RECENT;
286 : } else {
287 0 : sampling_type = GET_ALL;
288 : }
289 :
290 0 : int request_id = 0;
291 :
292 0 : int status = bm_request_event(fBufHandle, event_id, trigger_mask, sampling_type, &request_id, NULL);
293 :
294 0 : if (status != BM_SUCCESS) {
295 0 : return TMFeMidasError(msprintf("Cannot make event request on buffer \"%s\"", fBufName.c_str()), "bm_request_event", status);
296 : }
297 :
298 0 : fBufRequests.push_back(request_id);
299 :
300 0 : return TMFeOk();
301 : }
302 :
303 0 : TMFeResult TMEventBuffer::ReceiveEvent(std::vector<char> *e, int timeout_msec)
304 : {
305 0 : if (!fBufHandle) {
306 0 : return TMFeErrorMessage(msprintf("ReceiveEvent: Error: Event buffer \"%s\" is not open", fBufName.c_str()));
307 : }
308 :
309 0 : assert(e != NULL);
310 :
311 0 : e->resize(0);
312 :
313 0 : int status = bm_receive_event_vec(fBufHandle, e, timeout_msec);
314 :
315 0 : if (status == BM_ASYNC_RETURN) {
316 0 : return TMFeOk();
317 : }
318 :
319 0 : if (status != BM_SUCCESS) {
320 0 : return TMFeMidasError(msprintf("Cannot receive event on buffer \"%s\"", fBufName.c_str()), "bm_receive_event", status);
321 : }
322 :
323 0 : return TMFeOk();
324 : }
325 :
326 0 : TMFeResult TMEventBuffer::SendEvent(const char *e)
327 : {
328 0 : const EVENT_HEADER *pevent = (const EVENT_HEADER*)e;
329 0 : const size_t event_size = sizeof(EVENT_HEADER) + pevent->data_size;
330 : //const size_t total_size = ALIGN8(event_size);
331 0 : return SendEvent(1, &e, &event_size);
332 : }
333 :
334 0 : TMFeResult TMEventBuffer::SendEvent(const std::vector<char>& e)
335 : {
336 0 : const EVENT_HEADER *pevent = (const EVENT_HEADER*)e.data();
337 0 : const size_t event_size = sizeof(EVENT_HEADER) + pevent->data_size;
338 : //const size_t total_size = ALIGN8(event_size);
339 0 : if (e.size() != event_size) {
340 0 : return TMFeErrorMessage(msprintf("Cannot send event, size mismatch: vector size %d, data_size %d, event_size %d", (int)e.size(), (int)pevent->data_size, (int)event_size).c_str());
341 : }
342 :
343 0 : return SendEvent(1, (char**)&pevent, &event_size);
344 : }
345 :
346 0 : TMFeResult TMEventBuffer::SendEvent(const std::vector<std::vector<char>>& e)
347 : {
348 0 : int sg_n = e.size();
349 0 : const char* sg_ptr[sg_n];
350 0 : size_t sg_len[sg_n];
351 0 : for (int i=0; i<sg_n; i++) {
352 0 : sg_ptr[i] = e[i].data();
353 0 : sg_len[i] = e[i].size();
354 : }
355 0 : return SendEvent(sg_n, sg_ptr, sg_len);
356 0 : }
357 :
358 0 : TMFeResult TMEventBuffer::SendEvent(int sg_n, const char* const sg_ptr[], const size_t sg_len[])
359 : {
360 0 : int status = bm_send_event_sg(fBufHandle, sg_n, sg_ptr, sg_len, BM_WAIT);
361 :
362 0 : if (status == BM_CORRUPTED) {
363 0 : fMfe->Msg(MERROR, "TMEventBuffer::SendEvent", "Cannot send event to buffer \"%s\": bm_send_event() returned %d, event buffer is corrupted, shutting down the frontend", fBufName.c_str(), status);
364 0 : fMfe->fShutdownRequested = true;
365 0 : return TMFeMidasError("Cannot send event, event buffer is corrupted, shutting down the frontend", "bm_send_event", status);
366 0 : } else if (status != BM_SUCCESS) {
367 0 : fMfe->Msg(MERROR, "TMEventBuffer::SendEvent", "Cannot send event to buffer \"%s\": bm_send_event() returned %d", fBufName.c_str(), status);
368 0 : return TMFeMidasError("Cannot send event", "bm_send_event", status);
369 : }
370 :
371 0 : return TMFeOk();
372 : }
373 :
374 0 : TMFeResult TMEventBuffer::FlushCache(bool wait)
375 : {
376 0 : if (!fBufHandle)
377 0 : return TMFeOk();
378 :
379 0 : int flag = BM_NO_WAIT;
380 0 : if (wait)
381 0 : flag = BM_WAIT;
382 :
383 : /* flush of event socket in no-wait mode does nothing */
384 0 : if (wait && rpc_is_remote()) {
385 0 : int status = bm_flush_cache(0, flag);
386 :
387 : //printf("bm_flush_cache(0,%d) status %d\n", flag, status);
388 :
389 0 : if (status == BM_SUCCESS) {
390 : // nothing
391 0 : } else if (status == BM_ASYNC_RETURN) {
392 : // nothing
393 : } else {
394 0 : return TMFeMidasError("Cannot flush mserver event socket", "bm_flush_cache", status);
395 : }
396 : }
397 :
398 0 : int status = bm_flush_cache(fBufHandle, flag);
399 :
400 : //printf("bm_flush_cache(%d,%d) status %d\n", fBufHandle, flag, status);
401 :
402 0 : if (status == BM_SUCCESS) {
403 : // nothing
404 0 : } else if (status == BM_ASYNC_RETURN) {
405 : // nothing
406 : } else {
407 0 : return TMFeMidasError(msprintf("Cannot flush event buffer \"%s\"", fBufName.c_str()).c_str(), "bm_flush_cache", status);
408 : }
409 :
410 0 : return TMFeOk();
411 : }
412 :
413 0 : TMFeResult TMFE::EventBufferOpen(TMEventBuffer** pbuf, const char* bufname, size_t bufsize)
414 : {
415 0 : assert(pbuf != NULL);
416 0 : assert(bufname != NULL);
417 :
418 0 : std::lock_guard<std::mutex> guard(fEventBuffersMutex);
419 :
420 0 : for (auto b : fEventBuffers) {
421 0 : if (!b)
422 0 : continue;
423 :
424 0 : if (b->fBufName == bufname) {
425 0 : *pbuf = b;
426 0 : if (bufsize != 0 && bufsize > b->fBufSize) {
427 0 : Msg(MERROR, "TMFE::EventBufferOpen", "Event buffer \"%s\" size %d is smaller than requested size %d", b->fBufName.c_str(), (int)b->fBufSize, (int)bufsize);
428 : }
429 0 : return TMFeOk();
430 : }
431 : }
432 :
433 0 : TMEventBuffer *b = new TMEventBuffer(this);
434 :
435 0 : fEventBuffers.push_back(b);
436 :
437 0 : *pbuf = b;
438 :
439 0 : TMFeResult r = b->OpenBuffer(bufname, bufsize);
440 :
441 0 : if (r.error_flag) {
442 0 : return r;
443 : }
444 :
445 0 : return TMFeOk();
446 0 : }
447 :
448 0 : TMFeResult TMFE::EventBufferFlushCacheAll(bool wait)
449 : {
450 0 : int flag = BM_NO_WAIT;
451 0 : if (wait)
452 0 : flag = BM_WAIT;
453 :
454 : /* flush of event socket in no-wait mode does nothing */
455 0 : if (wait && rpc_is_remote()) {
456 0 : int status = bm_flush_cache(0, flag);
457 :
458 : //printf("bm_flush_cache(0,%d) status %d\n", flag, status);
459 :
460 0 : if (status == BM_SUCCESS) {
461 : // nothing
462 0 : } else if (status == BM_ASYNC_RETURN) {
463 : // nothing
464 : } else {
465 0 : return TMFeMidasError("Cannot flush mserver event socket", "bm_flush_cache", status);
466 : }
467 : }
468 :
469 0 : std::lock_guard<std::mutex> guard(fEventBuffersMutex);
470 :
471 0 : for (auto b : fEventBuffers) {
472 0 : if (!b)
473 0 : continue;
474 :
475 0 : TMFeResult r = b->FlushCache(wait);
476 :
477 0 : if (r.error_flag)
478 0 : return r;
479 0 : }
480 :
481 0 : return TMFeOk();
482 0 : }
483 :
484 0 : TMFeResult TMFE::EventBufferCloseAll()
485 : {
486 0 : std::lock_guard<std::mutex> guard(fEventBuffersMutex);
487 :
488 0 : for (auto b : fEventBuffers) {
489 0 : if (!b)
490 0 : continue;
491 0 : TMFeResult r = b->CloseBuffer();
492 0 : if (r.error_flag)
493 0 : return r;
494 :
495 0 : delete b;
496 0 : }
497 :
498 0 : fEventBuffers.clear();
499 :
500 0 : return TMFeOk();
501 0 : }
502 :
503 : /////////////////////////////////////////////////////////
504 : // equipment functions
505 : /////////////////////////////////////////////////////////
506 :
507 0 : double TMFrontend::FePeriodicTasks()
508 : {
509 0 : double now = TMFE::GetTime();
510 :
511 0 : double next_periodic = now + 60;
512 :
513 0 : int n = fFeEquipments.size();
514 0 : for (int i=0; i<n; i++) {
515 0 : TMFeEquipment* eq = fFeEquipments[i];
516 0 : if (!eq)
517 0 : continue;
518 0 : if (!eq->fEqConfEnabled)
519 0 : continue;
520 0 : if (!eq->fEqConfEnablePeriodic)
521 0 : continue;
522 0 : double period = eq->fEqConfPeriodMilliSec/1000.0;
523 0 : if (period <= 0)
524 0 : continue;
525 0 : if (eq->fEqPeriodicNextCallTime == 0)
526 0 : eq->fEqPeriodicNextCallTime = now + 0.5; // we are off by 0.5 sec with updating of statistics
527 : //printf("periodic[%d] period %f, last call %f, next call %f (%f)\n", i, period, eq->fEqPeriodicLastCallTime, eq->fEqPeriodicNextCallTime, now - eq->fEqPeriodicNextCallTime);
528 0 : if (now >= eq->fEqPeriodicNextCallTime) {
529 0 : eq->fEqPeriodicNextCallTime += period;
530 :
531 0 : if (eq->fEqPeriodicNextCallTime < now) {
532 0 : if (TMFE::gfVerbose)
533 0 : printf("TMFE::EquipmentPeriodicTasks: periodic equipment \"%s\" skipped some beats!\n", eq->fEqName.c_str());
534 0 : fMfe->Msg(MERROR, "TMFE::EquipmentPeriodicTasks", "Equipment \"%s\" skipped some beats!", eq->fEqName.c_str());
535 0 : while (eq->fEqPeriodicNextCallTime < now) {
536 0 : eq->fEqPeriodicNextCallTime += period;
537 : }
538 : }
539 :
540 0 : if (fMfe->fStateRunning || !eq->fEqConfReadOnlyWhenRunning) {
541 0 : eq->fEqPeriodicLastCallTime = now;
542 : //printf("handler %d eq [%s] call HandlePeriodic()\n", i, h->fEq->fName.c_str());
543 0 : eq->HandlePeriodic();
544 : }
545 :
546 0 : now = TMFE::GetTime();
547 : }
548 :
549 0 : if (eq->fEqPeriodicNextCallTime < next_periodic)
550 0 : next_periodic = eq->fEqPeriodicNextCallTime;
551 : }
552 :
553 0 : now = TMFE::GetTime();
554 :
555 : // update statistics
556 0 : for (auto eq : fFeEquipments) {
557 0 : if (!eq)
558 0 : continue;
559 0 : if (!eq->fEqConfEnabled)
560 0 : continue;
561 0 : double next = eq->fEqStatNextWrite; // NOTE: this is not thread-safe, possible torn read of "double"
562 0 : if (now > next) {
563 0 : eq->EqWriteStatistics();
564 0 : next = eq->fEqStatNextWrite; // NOTE: this is not thread-safe, possible torn read of "double"
565 : }
566 0 : if (next < next_periodic)
567 0 : next_periodic = next;
568 : }
569 :
570 0 : now = TMFE::GetTime();
571 :
572 : // flush write cache
573 0 : if ((fFeFlushWriteCachePeriodSec > 0) && (now >= fFeFlushWriteCacheNextCallTime)) {
574 0 : fMfe->EventBufferFlushCacheAll(false);
575 0 : fFeFlushWriteCacheNextCallTime = now + fFeFlushWriteCachePeriodSec;
576 0 : if (fFeFlushWriteCacheNextCallTime < next_periodic)
577 0 : next_periodic = fFeFlushWriteCacheNextCallTime;
578 : }
579 :
580 0 : return next_periodic;
581 : }
582 :
583 0 : double TMFrontend::FePollTasks(double next_periodic_time)
584 : {
585 : //printf("poll %f next %f diff %f\n", TMFE::GetTime(), next_periodic_time, next_periodic_time - TMFE::GetTime());
586 :
587 0 : double poll_sleep_sec = 9999.0;
588 0 : while (!fMfe->fShutdownRequested) {
589 0 : bool poll_again = false;
590 : // NOTE: ok to use range-based for() loop, there will be a crash if HandlePoll() or HandlePollRead() modify fEquipments, so they should not do that. K.O.
591 0 : for (auto eq : fFeEquipments) {
592 0 : if (!eq)
593 0 : continue;
594 0 : if (!eq->fEqConfEnabled)
595 0 : continue;
596 0 : if (eq->fEqConfEnablePoll && !eq->fEqPollThreadRunning && !eq->fEqPollThreadStarting) {
597 0 : if (fMfe->fStateRunning || !eq->fEqConfReadOnlyWhenRunning) {
598 0 : if (eq->fEqConfPollSleepSec < poll_sleep_sec)
599 0 : poll_sleep_sec = eq->fEqConfPollSleepSec;
600 0 : bool poll = eq->HandlePoll();
601 0 : if (poll) {
602 0 : poll_again = true;
603 0 : eq->HandlePollRead();
604 : }
605 : }
606 : }
607 : }
608 0 : if (!poll_again)
609 0 : break;
610 :
611 0 : if (next_periodic_time) {
612 : // stop polling if we need to run periodic activity
613 0 : double now = TMFE::TMFE::GetTime();
614 0 : if (now >= next_periodic_time)
615 0 : break;
616 : }
617 : }
618 0 : return poll_sleep_sec;
619 : }
620 :
621 0 : void TMFeEquipment::EqPollThread()
622 : {
623 0 : if (TMFE::gfVerbose)
624 0 : printf("TMFeEquipment::EqPollThread: equipment \"%s\" poll thread started\n", fEqName.c_str());
625 :
626 0 : fEqPollThreadRunning = true;
627 :
628 0 : while (!fMfe->fShutdownRequested && !fEqPollThreadShutdownRequested) {
629 0 : if (fMfe->fStateRunning || !fEqConfReadOnlyWhenRunning) {
630 0 : bool poll = HandlePoll();
631 0 : if (poll) {
632 0 : HandlePollRead();
633 : } else {
634 0 : if (fEqConfPollSleepSec > 0) {
635 0 : TMFE::Sleep(fEqConfPollSleepSec);
636 : }
637 : }
638 0 : } else {
639 0 : TMFE::Sleep(0.1);
640 : }
641 : }
642 0 : if (TMFE::gfVerbose)
643 0 : printf("TMFeEquipment::EqPollThread: equipment \"%s\" poll thread stopped\n", fEqName.c_str());
644 :
645 0 : fEqPollThreadRunning = false;
646 0 : }
647 :
648 0 : void TMFeEquipment::EqStartPollThread()
649 : {
650 : // NOTE: this is thread safe
651 :
652 0 : std::lock_guard<std::mutex> guard(fEqMutex);
653 :
654 0 : if (fEqPollThreadRunning || fEqPollThreadStarting || fEqPollThread) {
655 0 : fMfe->Msg(MERROR, "TMFeEquipment::EqStartPollThread", "Equipment \"%s\": poll thread is already running", fEqName.c_str());
656 0 : return;
657 : }
658 :
659 0 : fEqPollThreadShutdownRequested = false;
660 0 : fEqPollThreadStarting = true;
661 :
662 0 : fEqPollThread = new std::thread(&TMFeEquipment::EqPollThread, this);
663 0 : }
664 :
665 0 : void TMFeEquipment::EqStopPollThread()
666 : {
667 : // NOTE: this is thread safe
668 0 : fEqPollThreadStarting = false;
669 0 : fEqPollThreadShutdownRequested = true;
670 0 : for (int i=0; i<100; i++) {
671 0 : if (!fEqPollThreadRunning) {
672 0 : std::lock_guard<std::mutex> guard(fEqMutex);
673 0 : if (fEqPollThread) {
674 0 : fEqPollThread->join();
675 0 : delete fEqPollThread;
676 0 : fEqPollThread = NULL;
677 : }
678 0 : return;
679 0 : }
680 0 : TMFE::Sleep(0.1);
681 : }
682 0 : if (fEqPollThreadRunning) {
683 0 : fMfe->Msg(MERROR, "TMFeEquipment::EqStopPollThread", "Equipment \"%s\": timeout waiting for shutdown of poll thread", fEqName.c_str());
684 : }
685 : }
686 :
687 0 : void TMFE::StopRun()
688 : {
689 : char str[TRANSITION_ERROR_STRING_LENGTH];
690 :
691 0 : int status = cm_transition(TR_STOP, 0, str, sizeof(str), TR_SYNC, FALSE);
692 0 : if (status != CM_SUCCESS) {
693 0 : Msg(MERROR, "TMFE::StopRun", "Cannot stop run, error: %s", str);
694 0 : fRunStopRequested = false;
695 0 : return;
696 : }
697 :
698 0 : fRunStopRequested = false;
699 :
700 0 : bool logger_auto_restart = false;
701 0 : fOdbRoot->RB("Logger/Auto restart", &logger_auto_restart);
702 :
703 0 : int logger_auto_restart_delay = 0;
704 0 : fOdbRoot->RI("Logger/Auto restart delay", &logger_auto_restart_delay);
705 :
706 0 : if (logger_auto_restart) {
707 0 : Msg(MINFO, "TMFE::StopRun", "Run will restart after %d seconds", logger_auto_restart_delay);
708 0 : fRunStartTime = GetTime() + logger_auto_restart_delay;
709 : } else {
710 0 : fRunStartTime = 0;
711 : }
712 : }
713 :
714 0 : void TMFE::StartRun()
715 : {
716 0 : fRunStartTime = 0;
717 :
718 : /* check if really stopped */
719 0 : int run_state = 0;
720 0 : fOdbRoot->RI("Runinfo/State", &run_state);
721 :
722 0 : if (run_state != STATE_STOPPED) {
723 0 : Msg(MERROR, "TMFE::StartRun", "Run start requested, but run is already in progress");
724 0 : return;
725 : }
726 :
727 0 : bool logger_auto_restart = false;
728 0 : fOdbRoot->RB("Logger/Auto restart", &logger_auto_restart);
729 :
730 0 : if (!logger_auto_restart) {
731 0 : Msg(MERROR, "TMFE::StartRun", "Run start requested, but logger/auto restart is off");
732 0 : return;
733 : }
734 :
735 0 : Msg(MTALK, "TMFE::StartRun", "Starting new run");
736 :
737 : char str[TRANSITION_ERROR_STRING_LENGTH];
738 :
739 0 : int status = cm_transition(TR_START, 0, str, sizeof(str), TR_SYNC, FALSE);
740 0 : if (status != CM_SUCCESS) {
741 0 : Msg(MERROR, "TMFE::StartRun", "Cannot restart run, error: %s", str);
742 : }
743 : }
744 :
745 0 : void TMFrontend::FePollMidas(double sleep_sec)
746 : {
747 0 : assert(sleep_sec >= 0);
748 0 : bool debug = false;
749 0 : double now = TMFE::GetTime();
750 0 : double sleep_start = now;
751 0 : double sleep_end = now + sleep_sec;
752 0 : int count_yield_loops = 0;
753 :
754 0 : while (!fMfe->fShutdownRequested) {
755 0 : double next_periodic_time = 0;
756 0 : double poll_sleep = 1.0;
757 :
758 0 : if (!fFePeriodicThreadRunning) {
759 0 : next_periodic_time = FePeriodicTasks();
760 0 : poll_sleep = FePollTasks(next_periodic_time);
761 : } else {
762 0 : poll_sleep = FePollTasks(TMFE::GetTime() + 0.100);
763 : }
764 :
765 0 : if (fMfe->fRunStopRequested) {
766 0 : fMfe->StopRun();
767 0 : continue;
768 : }
769 :
770 0 : now = TMFE::GetTime();
771 :
772 0 : if (fMfe->fRunStartTime && now >= fMfe->fRunStartTime) {
773 0 : fMfe->StartRun();
774 0 : continue;
775 : }
776 :
777 0 : double sleep_time = sleep_end - now;
778 :
779 0 : if (next_periodic_time > 0 && next_periodic_time < sleep_end) {
780 0 : sleep_time = next_periodic_time - now;
781 : }
782 :
783 0 : int s = 0;
784 0 : if (sleep_time > 0)
785 0 : s = 1 + sleep_time*1000.0;
786 :
787 0 : if (poll_sleep*1000.0 < s) {
788 0 : s = 0;
789 : }
790 :
791 0 : if (debug) {
792 0 : printf("now %.6f, sleep_end %.6f, next_periodic %.6f, sleep_time %.6f, cm_yield(%d), poll period %.6f\n", now, sleep_end, next_periodic_time, sleep_time, s, poll_sleep);
793 : }
794 :
795 0 : int status = cm_yield(s);
796 :
797 0 : if (status == RPC_SHUTDOWN || status == SS_ABORT) {
798 0 : fMfe->fShutdownRequested = true;
799 0 : if (TMFE::gfVerbose) {
800 0 : fprintf(stderr, "TMFE::PollMidas: cm_yield(%d) status %d, shutdown requested...\n", s, status);
801 : }
802 : }
803 :
804 0 : now = TMFE::GetTime();
805 0 : double sleep_more = sleep_end - now;
806 0 : if (sleep_more <= 0)
807 0 : break;
808 :
809 0 : count_yield_loops++;
810 :
811 0 : if (poll_sleep < sleep_more) {
812 0 : TMFE::Sleep(poll_sleep);
813 : }
814 : }
815 :
816 0 : if (debug) {
817 0 : printf("TMFE::PollMidas: sleep %.1f msec, actual %.1f msec, %d loops\n", sleep_sec * 1000.0, (now - sleep_start) * 1000.0, count_yield_loops);
818 : }
819 0 : }
820 :
821 0 : void TMFE::Yield(double sleep_sec)
822 : {
823 0 : double now = GetTime();
824 : //double sleep_start = now;
825 0 : double sleep_end = now + sleep_sec;
826 :
827 0 : while (!fShutdownRequested) {
828 0 : now = GetTime();
829 :
830 0 : double sleep_time = sleep_end - now;
831 0 : int s = 0;
832 0 : if (sleep_time > 0)
833 0 : s = 1 + sleep_time*1000.0;
834 :
835 : //printf("TMFE::Yield: now %f, sleep_end %f, s %d\n", now, sleep_end, s);
836 :
837 0 : int status = cm_yield(s);
838 :
839 0 : if (status == RPC_SHUTDOWN || status == SS_ABORT) {
840 0 : fShutdownRequested = true;
841 0 : fprintf(stderr, "TMFE::Yield: cm_yield(%d) status %d, shutdown requested...\n", s, status);
842 : }
843 :
844 0 : now = GetTime();
845 0 : if (now >= sleep_end)
846 0 : break;
847 : }
848 :
849 : //printf("TMFE::Yield: sleep_sec %.6f, actual %.6f sec\n", sleep_sec, now - sleep_start);
850 0 : }
851 :
852 0 : void TMFE::MidasPeriodicTasks()
853 : {
854 0 : cm_periodic_tasks();
855 0 : }
856 :
857 0 : void TMFE::RpcThread()
858 : {
859 0 : if (TMFE::gfVerbose)
860 0 : printf("TMFE::RpcThread: RPC thread started\n");
861 :
862 0 : int msec = 1000;
863 :
864 0 : fRpcThreadRunning = true;
865 0 : ss_suspend_set_rpc_thread(ss_gettid());
866 :
867 0 : while (!fShutdownRequested && !fRpcThreadShutdownRequested) {
868 :
869 0 : int status = cm_yield(msec);
870 :
871 0 : if (status == RPC_SHUTDOWN || status == SS_ABORT) {
872 0 : fShutdownRequested = true;
873 0 : if (TMFE::gfVerbose)
874 0 : printf("TMFE::RpcThread: cm_yield(%d) status %d, shutdown requested...\n", msec, status);
875 : }
876 : }
877 0 : ss_suspend_exit();
878 0 : if (TMFE::gfVerbose)
879 0 : printf("TMFE::RpcThread: RPC thread stopped\n");
880 0 : fRpcThreadRunning = false;
881 0 : }
882 :
883 0 : void TMFrontend::FePeriodicThread()
884 : {
885 0 : if (TMFE::gfVerbose)
886 0 : printf("TMFE::PeriodicThread: periodic thread started\n");
887 :
888 0 : fFePeriodicThreadRunning = true;
889 0 : while (!fMfe->fShutdownRequested && !fFePeriodicThreadShutdownRequested) {
890 0 : double next_periodic_time = FePeriodicTasks();
891 0 : double now = TMFE::GetTime();
892 0 : double sleep = next_periodic_time - now;
893 : //printf("TMFrontend::FePeriodicThread: now %.6f next %.6f, sleep %.6f\n", now, next_periodic_time, sleep);
894 0 : if (sleep >= 1.0)
895 0 : sleep = 1.0;
896 0 : TMFE::Sleep(sleep);
897 : }
898 0 : if (TMFE::gfVerbose)
899 0 : printf("TMFE::PeriodicThread: periodic thread stopped\n");
900 0 : fFePeriodicThreadRunning = false;
901 0 : }
902 :
903 0 : void TMFE::StartRpcThread()
904 : {
905 : // NOTE: this is thread safe
906 :
907 0 : std::lock_guard<std::mutex> guard(fMutex);
908 :
909 0 : if (fRpcThreadRunning || fRpcThreadStarting || fRpcThread) {
910 0 : if (gfVerbose)
911 0 : printf("TMFE::StartRpcThread: RPC thread already running\n");
912 0 : return;
913 : }
914 :
915 0 : fRpcThreadStarting = true;
916 0 : fRpcThread = new std::thread(&TMFE::RpcThread, this);
917 0 : }
918 :
919 0 : void TMFrontend::FeStartPeriodicThread()
920 : {
921 : // NOTE: this is thread safe
922 :
923 0 : std::lock_guard<std::mutex> guard(fFeMutex);
924 :
925 0 : if (fFePeriodicThreadRunning || fFePeriodicThreadStarting || fFePeriodicThread) {
926 0 : if (TMFE::gfVerbose)
927 0 : printf("TMFE::StartPeriodicThread: periodic thread already running\n");
928 0 : return;
929 : }
930 :
931 0 : fFePeriodicThreadStarting = true;
932 0 : fFePeriodicThread = new std::thread(&TMFrontend::FePeriodicThread, this);
933 0 : }
934 :
935 1 : void TMFE::StopRpcThread()
936 : {
937 : // NOTE: this is thread safe
938 :
939 1 : fRpcThreadStarting = false;
940 1 : fRpcThreadShutdownRequested = true;
941 :
942 1 : for (int i=0; i<60; i++) {
943 1 : if (!fRpcThreadRunning) {
944 1 : std::lock_guard<std::mutex> guard(fMutex);
945 1 : if (fRpcThread) {
946 0 : fRpcThread->join();
947 0 : delete fRpcThread;
948 0 : fRpcThread = NULL;
949 0 : if (gfVerbose)
950 0 : printf("TMFE::StopRpcThread: RPC thread stopped\n");
951 : }
952 1 : return;
953 1 : }
954 0 : if (i>5) {
955 0 : fprintf(stderr, "TMFE::StopRpcThread: waiting for RPC thread to stop\n");
956 : }
957 0 : ::sleep(1);
958 : }
959 :
960 0 : fprintf(stderr, "TMFE::StopRpcThread: timeout waiting for RPC thread to stop\n");
961 : }
962 :
963 0 : void TMFrontend::FeStopPeriodicThread()
964 : {
965 : // NOTE: this is thread safe
966 :
967 0 : fFePeriodicThreadStarting = false;
968 0 : fFePeriodicThreadShutdownRequested = true;
969 :
970 0 : for (int i=0; i<60; i++) {
971 0 : if (!fFePeriodicThreadRunning) {
972 0 : std::lock_guard<std::mutex> guard(fFeMutex);
973 0 : if (fFePeriodicThread) {
974 0 : fFePeriodicThread->join();
975 0 : delete fFePeriodicThread;
976 0 : fFePeriodicThread = NULL;
977 0 : if (TMFE::gfVerbose)
978 0 : printf("TMFE::StopPeriodicThread: periodic thread stopped\n");
979 : }
980 0 : return;
981 0 : }
982 0 : if (i>5) {
983 0 : fprintf(stderr, "TMFE::StopPeriodicThread: waiting for periodic thread to stop\n");
984 : }
985 0 : ::sleep(1);
986 : }
987 :
988 0 : fprintf(stderr, "TMFE::StopPeriodicThread: timeout waiting for periodic thread to stop\n");
989 : }
990 :
991 0 : void TMFE::Msg(int message_type, const char *filename, int line, const char *routine, const char *format, ...)
992 : {
993 : char message[1024];
994 : //printf("format [%s]\n", format);
995 : va_list ap;
996 0 : va_start(ap, format);
997 0 : vsnprintf(message, sizeof(message)-1, format, ap);
998 0 : va_end(ap);
999 : //printf("message [%s]\n", message);
1000 0 : cm_msg(message_type, filename, line, routine, "%s", message);
1001 0 : cm_msg_flush_buffer();
1002 0 : }
1003 :
1004 0 : void TMFE::Msg(int message_type, const char *filename, int line, const char *routine, const std::string& message)
1005 : {
1006 : //printf("message [%s]\n", message.c_str());
1007 0 : cm_msg(message_type, filename, line, routine, "%s", message.c_str());
1008 0 : cm_msg_flush_buffer();
1009 0 : }
1010 :
1011 0 : double TMFE::GetTime()
1012 : {
1013 : struct timeval tv;
1014 0 : gettimeofday(&tv, NULL);
1015 0 : return tv.tv_sec*1.0 + tv.tv_usec/1000000.0;
1016 : }
1017 :
1018 : #if 1
1019 0 : void TMFE::Sleep(double time_sec)
1020 : {
1021 0 : if (time_sec < 0) {
1022 0 : TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "TMFE::Sleep() called with negative sleep time: %f", time_sec);
1023 0 : return;
1024 : }
1025 :
1026 0 : if (time_sec == 0) {
1027 0 : TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "TMFE::Sleep() called with zero sleep time");
1028 0 : return;
1029 : }
1030 :
1031 0 : if (time_sec > 1.01) {
1032 : // break long sleep into short sleeps
1033 :
1034 0 : double t0 = TMFE::GetTime();
1035 0 : double tend = t0 + time_sec;
1036 :
1037 : while (1) {
1038 0 : double now = TMFE::GetTime();
1039 0 : if (now >= tend) {
1040 : //printf("t0 %f, tend %f, now %f, done!\n", t0, tend, now);
1041 0 : return;
1042 : }
1043 :
1044 0 : double tsleep = tend - now;
1045 :
1046 : //printf("t0 %f, tend %f, now %f, tsleep %f!\n", t0, tend, now, tsleep);
1047 :
1048 0 : if (tsleep > 1.0)
1049 0 : tsleep = 1.0;
1050 :
1051 0 : TMFE::Sleep(tsleep);
1052 0 : }
1053 :
1054 : return;
1055 : }
1056 :
1057 : int status;
1058 : struct timeval timeout;
1059 :
1060 0 : timeout.tv_sec = time_sec;
1061 0 : timeout.tv_usec = (time_sec-timeout.tv_sec)*1000000.0;
1062 :
1063 : while (1) {
1064 0 : status = select(0, NULL, NULL, NULL, &timeout);
1065 : #ifdef EINVAL
1066 0 : if (status < 0 && errno == EINVAL) {
1067 : // #warning HERE EINVAL!
1068 0 : TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "TMFE::Sleep() called with invalid sleep time: %f, tv_sec: %lld, tv_usec: %lld", time_sec, (long long int)timeout.tv_sec, (long long int)timeout.tv_usec);
1069 0 : return;
1070 : }
1071 : #endif
1072 : #ifdef EINTR
1073 0 : if (status < 0 && errno == EINTR) {
1074 : // #warning HERE EINTR!
1075 : // NOTE1: on linux, "timeout" is modified by the kernel to subtract time already slept, we do not need to adjust it while handling EINTR.
1076 : // NOTE2: on macos and other BSD-based systems, "timeout" value is not changed, and for accurate sleeping we should modify here, to account for time already slept, but we do not.
1077 : // NOTE3: see "man select" on Linux and Macos.
1078 : // NOTE4: MIDAS no longer uses SIGALRM to run cm_watchdog() and MIDAS applications do nto use signals.
1079 : // NOTE4: so in theory, we do not have to worry about EINTR interrupting our sleep. K.O. Dec-2024
1080 : // TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "TMFE::Sleep() EINTR, sleep time: %f, tv_sec: %lld, tv_usec: %lld", time_sec, (long long int)timeout.tv_sec, (long long int)timeout.tv_usec);
1081 0 : continue;
1082 : }
1083 : #endif
1084 0 : break;
1085 : }
1086 :
1087 0 : if (status < 0) {
1088 0 : TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "select() returned %d, errno %d (%s)", status, errno, strerror(errno));
1089 : }
1090 : }
1091 : #endif
1092 :
#if 0
// Disabled alternative implementation of TMFE::Sleep() based on nanosleep().
// It is compiled out by the surrounding "#if 0" and kept for reference only.
// The requested time is split into whole seconds and nanoseconds; the EINTR
// handling below is commented out, so an interrupted sleep is not resumed.
void TMFE::Sleep(double time)
{
struct timespec rqtp;
struct timespec rmtp;

rqtp.tv_sec = time;
rqtp.tv_nsec = (time-rqtp.tv_sec)*1000000000.0;

int status = nanosleep(&rqtp, &rmtp);

//#ifdef EINTR
//if (status < 0 && errno == EINTR) {
// return 0; // watchdog interrupt, try again
//}
//#endif

if (status < 0) {
TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "nanosleep() returned %d, errno %d (%s)", status, errno, strerror(errno));
}
}
#endif
1115 :
#if 0
// Disabled alternative implementation of TMFE::Sleep() based on
// clock_nanosleep(CLOCK_MONOTONIC). It is compiled out by the surrounding
// "#if 0" and kept for reference only. The EINTR handling is commented out.
// NOTE(review): POSIX clock_nanosleep() appears to return the error number
// directly instead of -1 with errno set, so the "status < 0" check below may
// never fire - confirm before re-enabling this code.
void TMFE::Sleep(double time)
{
struct timespec rqtp;
struct timespec rmtp;

rqtp.tv_sec = time;
rqtp.tv_nsec = (time-rqtp.tv_sec)*1000000000.0;

//int status = clock_nanosleep(CLOCK_REALTIME, 0, &rqtp, &rmtp);
int status = clock_nanosleep(CLOCK_MONOTONIC, 0, &rqtp, &rmtp);

//#ifdef EINTR
//if (status < 0 && errno == EINTR) {
// return 0; // watchdog interrupt, try again
//}
//#endif

if (status < 0) {
TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "nanosleep() returned %d, errno %d (%s)", status, errno, strerror(errno));
}
}
#endif
1139 :
1140 0 : std::string TMFE::GetThreadId() ///< return identification of this thread
1141 : {
1142 0 : return ss_tid_to_string(ss_gettid());
1143 : }
1144 :
1145 0 : static INT rpc_callback(INT index, void *prpc_param[])
1146 : {
1147 0 : const char* cmd = CSTRING(0);
1148 0 : const char* args = CSTRING(1);
1149 0 : char* return_buf = CSTRING(2);
1150 0 : int return_max_length = CINT(3);
1151 :
1152 0 : if (TMFE::gfVerbose)
1153 0 : printf("TMFE::rpc_callback: index %d, max_length %d, cmd [%s], args [%s]\n", index, return_max_length, cmd, args);
1154 :
1155 0 : TMFE* mfe = TMFE::Instance();
1156 :
1157 : // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleRpc() modifies fEquipments. K.O.
1158 0 : for (unsigned i=0; i<mfe->fRpcHandlers.size(); i++) {
1159 0 : TMFeRpcHandlerInterface* h = mfe->fRpcHandlers[i];
1160 0 : if (!h)
1161 0 : continue;
1162 0 : std::string result = "";
1163 0 : TMFeResult r = h->HandleRpc(cmd, args, result);
1164 0 : if (result.length() > 0) {
1165 : //printf("Handler reply [%s]\n", C(r));
1166 0 : mstrlcpy(return_buf, result.c_str(), return_max_length);
1167 0 : return RPC_SUCCESS;
1168 : }
1169 0 : }
1170 :
1171 0 : return_buf[0] = 0;
1172 0 : return RPC_SUCCESS;
1173 : }
1174 :
1175 0 : static INT binary_rpc_callback(INT index, void *prpc_param[])
1176 : {
1177 0 : const char* cmd = CSTRING(0);
1178 0 : const char* args = CSTRING(1);
1179 0 : char* return_buf = CSTRING(2);
1180 0 : size_t return_max_length = CINT(3);
1181 :
1182 0 : if (TMFE::gfVerbose)
1183 0 : printf("TMFE::binary_rpc_callback: index %d, max_length %zu, cmd [%s], args [%s]\n", index, return_max_length, cmd, args);
1184 :
1185 0 : TMFE* mfe = TMFE::Instance();
1186 :
1187 : // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleRpc() modifies fEquipments. K.O.
1188 0 : for (unsigned i=0; i<mfe->fRpcHandlers.size(); i++) {
1189 0 : TMFeRpcHandlerInterface* h = mfe->fRpcHandlers[i];
1190 0 : if (!h)
1191 0 : continue;
1192 0 : std::vector<char> result;
1193 0 : TMFeResult r = h->HandleBinaryRpc(cmd, args, result);
1194 0 : if (result.size() > 0) {
1195 0 : if (result.size() > return_max_length) {
1196 0 : TMFE::Instance()->Msg(MERROR, "TMFE::binary_rpc_callback", "RPC handler returned too much data, %zu bytes truncated to %zu bytes", result.size(), return_max_length);
1197 0 : result.resize(return_max_length);
1198 : }
1199 : //printf("Handler reply [%s]\n", C(r));
1200 0 : assert(result.size() <= return_max_length);
1201 0 : memcpy(return_buf, result.data(), result.size());
1202 0 : CINT(3) = result.size();
1203 0 : return RPC_SUCCESS;
1204 : }
1205 0 : }
1206 :
1207 0 : CINT(3) = 0;
1208 0 : return_buf[0] = 0;
1209 0 : return RPC_SUCCESS;
1210 : }
1211 :
1212 : class TMFrontendRpcHelper: public TMFeRpcHandlerInterface
1213 : {
1214 : public:
1215 : TMFrontend* fFe = NULL;
1216 :
1217 : public:
1218 0 : TMFrontendRpcHelper(TMFrontend* fe) // ctor
1219 0 : {
1220 0 : if (TMFE::gfVerbose)
1221 0 : printf("TMFrontendRpcHelper::ctor!\n");
1222 :
1223 0 : fFe = fe;
1224 0 : }
1225 :
1226 0 : virtual ~TMFrontendRpcHelper() // dtor
1227 0 : {
1228 0 : if (TMFE::gfVerbose)
1229 0 : printf("TMFrontendRpcHelper::dtor!\n");
1230 :
1231 : // poison pointers
1232 0 : fFe = NULL;
1233 0 : }
1234 :
1235 0 : TMFeResult HandleBeginRun(int run_number)
1236 : {
1237 0 : if (TMFE::gfVerbose)
1238 0 : printf("TMFrontendRpcHelper::HandleBeginRun!\n");
1239 :
1240 0 : for (unsigned i=0; i<fFe->fFeEquipments.size(); i++) {
1241 0 : TMFeEquipment* eq = fFe->fFeEquipments[i];
1242 0 : if (!eq)
1243 0 : continue;
1244 0 : if (!eq->fEqConfEnabled)
1245 0 : continue;
1246 0 : eq->EqZeroStatistics();
1247 0 : eq->EqWriteStatistics();
1248 : }
1249 0 : return TMFeOk();
1250 : }
1251 :
1252 0 : TMFeResult HandleEndRun(int run_number)
1253 : {
1254 0 : if (TMFE::gfVerbose)
1255 0 : printf("TMFrontendRpcHelper::HandleEndRun!\n");
1256 :
1257 0 : for (unsigned i=0; i<fFe->fFeEquipments.size(); i++) {
1258 0 : TMFeEquipment* eq = fFe->fFeEquipments[i];
1259 0 : if (!eq)
1260 0 : continue;
1261 0 : if (!eq->fEqConfEnabled)
1262 0 : continue;
1263 0 : eq->EqWriteStatistics();
1264 : }
1265 :
1266 0 : TMFeResult r = fFe->fMfe->EventBufferFlushCacheAll();
1267 :
1268 0 : if (r.error_flag)
1269 0 : return r;
1270 :
1271 0 : return TMFeOk();
1272 0 : }
1273 : };
1274 :
1275 0 : static INT tr_start(INT run_number, char *errstr)
1276 : {
1277 0 : if (TMFE::gfVerbose)
1278 0 : printf("TMFE::tr_start!\n");
1279 :
1280 0 : TMFE* mfe = TMFE::Instance();
1281 :
1282 0 : mfe->fRunNumber = run_number;
1283 0 : mfe->fStateRunning = true;
1284 :
1285 0 : TMFeResult result;
1286 :
1287 : // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleBeginRun() modifies fEquipments. K.O.
1288 0 : for (unsigned i=0; i<mfe->fRpcHandlers.size(); i++) {
1289 0 : TMFeRpcHandlerInterface* h = mfe->fRpcHandlers[i];
1290 0 : if (!h)
1291 0 : continue;
1292 0 : result = h->HandleBeginRun(run_number);
1293 0 : if (result.error_flag) {
1294 : // error handling in this function matches general transition error handling:
1295 : // on run start, the first user handler to return an error code
1296 : // will abort the transition. This leaves everything in an
1297 : // inconsistent state: frontends called before the abort
1298 : // think the run is running, which it does not. They should register
1299 : // a handler for the "start abort" transition. This transition calls
1300 : // all already started frontends so they can cleanup their state. K.O.
1301 : //
1302 0 : break;
1303 : }
1304 : }
1305 :
1306 0 : if (result.error_flag) {
1307 0 : mstrlcpy(errstr, result.error_message.c_str(), TRANSITION_ERROR_STRING_LENGTH);
1308 0 : return FE_ERR_DRIVER;
1309 : }
1310 :
1311 0 : return SUCCESS;
1312 0 : }
1313 :
1314 0 : static INT tr_stop(INT run_number, char *errstr)
1315 : {
1316 0 : if (TMFE::gfVerbose)
1317 0 : printf("TMFE::tr_stop!\n");
1318 :
1319 0 : TMFeResult result;
1320 :
1321 0 : TMFE* mfe = TMFE::Instance();
1322 :
1323 : // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleEndRun() modifies fEquipments. K.O.
1324 : // NOTE: we need to stop thing in reverse order, otherwise TMFrontend code
1325 : // does not work right - TMFrontend is registered first, and (correctly) runs
1326 : // first at begin of run (to clear statistics, etc). But at the end of run
1327 : // it needs to run last, to update the statistics, etc. after all the equipments
1328 : // have done their end of run things and are finished. K.O.
1329 0 : for (int i = (int)mfe->fRpcHandlers.size() - 1; i >= 0; i--) {
1330 0 : TMFeRpcHandlerInterface* h = mfe->fRpcHandlers[i];
1331 0 : if (!h)
1332 0 : continue;
1333 0 : TMFeResult xresult = h->HandleEndRun(run_number);
1334 0 : if (xresult.error_flag) {
1335 : // error handling in this function matches general transition error handling:
1336 : // the "run stop" transition is always sucessful, the run always stops.
1337 : // if some frontend returns an error, this error is remembered and is returned
1338 : // as the transition over all status. K.O.
1339 0 : result = xresult;
1340 : }
1341 0 : }
1342 :
1343 0 : mfe->fStateRunning = false;
1344 :
1345 0 : if (result.error_flag) {
1346 0 : mstrlcpy(errstr, result.error_message.c_str(), TRANSITION_ERROR_STRING_LENGTH);
1347 0 : return FE_ERR_DRIVER;
1348 : }
1349 :
1350 0 : return SUCCESS;
1351 0 : }
1352 :
1353 0 : static INT tr_pause(INT run_number, char *errstr)
1354 : {
1355 0 : cm_msg(MINFO, "tr_pause", "tr_pause");
1356 :
1357 0 : TMFeResult result;
1358 :
1359 0 : TMFE* mfe = TMFE::Instance();
1360 :
1361 : // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandlePauseRun() modifies fEquipments. K.O.
1362 : // NOTE: tr_pause runs in reverse order to match tr_stop. K.O.
1363 0 : for (int i = (int)mfe->fRpcHandlers.size() - 1; i >= 0; i--) {
1364 0 : TMFeRpcHandlerInterface* h = mfe->fRpcHandlers[i];
1365 0 : if (!h)
1366 0 : continue;
1367 0 : result = h->HandlePauseRun(run_number);
1368 0 : if (result.error_flag) {
1369 : // error handling in this function matches general transition error handling:
1370 : // logic is same as "start run"
1371 0 : break;
1372 : }
1373 : }
1374 :
1375 0 : if (result.error_flag) {
1376 0 : mstrlcpy(errstr, result.error_message.c_str(), TRANSITION_ERROR_STRING_LENGTH);
1377 0 : return FE_ERR_DRIVER;
1378 : }
1379 :
1380 0 : return SUCCESS;
1381 0 : }
1382 :
1383 0 : static INT tr_resume(INT run_number, char *errstr)
1384 : {
1385 0 : if (TMFE::gfVerbose)
1386 0 : printf("TMFE::tr_resume!\n");
1387 :
1388 0 : TMFeResult result;
1389 :
1390 0 : TMFE* mfe = TMFE::Instance();
1391 :
1392 0 : mfe->fRunNumber = run_number;
1393 0 : mfe->fStateRunning = true;
1394 :
1395 : // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleResumeRun() modifies fEquipments. K.O.
1396 0 : for (unsigned i=0; i<mfe->fRpcHandlers.size(); i++) {
1397 0 : TMFeRpcHandlerInterface* h = mfe->fRpcHandlers[i];
1398 0 : if (!h)
1399 0 : continue;
1400 0 : result = h->HandleResumeRun(run_number);
1401 0 : if (result.error_flag) {
1402 : // error handling in this function matches general transition error handling:
1403 : // logic is same as "start run"
1404 0 : break;
1405 : }
1406 : }
1407 :
1408 0 : if (result.error_flag) {
1409 0 : mstrlcpy(errstr, result.error_message.c_str(), TRANSITION_ERROR_STRING_LENGTH);
1410 0 : return FE_ERR_DRIVER;
1411 : }
1412 :
1413 0 : return SUCCESS;
1414 0 : }
1415 :
1416 0 : static INT tr_startabort(INT run_number, char *errstr)
1417 : {
1418 0 : if (TMFE::gfVerbose)
1419 0 : printf("TMFE::tr_startabort!\n");
1420 :
1421 0 : TMFeResult result;
1422 :
1423 0 : TMFE* mfe = TMFE::Instance();
1424 :
1425 : // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleStartAbortRun() modifies fEquipments. K.O.
1426 0 : for (unsigned i=0; i<mfe->fRpcHandlers.size(); i++) {
1427 0 : TMFeRpcHandlerInterface* h = mfe->fRpcHandlers[i];
1428 0 : if (!h)
1429 0 : continue;
1430 0 : result = h->HandleStartAbortRun(run_number);
1431 0 : if (result.error_flag) {
1432 : // error handling in this function matches general transition error handling:
1433 : // logic is same as "start run"
1434 0 : break;
1435 : }
1436 : }
1437 :
1438 0 : mfe->fStateRunning = false;
1439 :
1440 0 : if (result.error_flag) {
1441 0 : mstrlcpy(errstr, result.error_message.c_str(), TRANSITION_ERROR_STRING_LENGTH);
1442 0 : return FE_ERR_DRIVER;
1443 : }
1444 :
1445 0 : return SUCCESS;
1446 0 : }
1447 :
1448 0 : void TMFE::SetTransitionSequenceStart(int seqno)
1449 : {
1450 0 : cm_set_transition_sequence(TR_START, seqno);
1451 0 : }
1452 :
1453 0 : void TMFE::SetTransitionSequenceStop(int seqno)
1454 : {
1455 0 : cm_set_transition_sequence(TR_STOP, seqno);
1456 0 : }
1457 :
1458 0 : void TMFE::SetTransitionSequencePause(int seqno)
1459 : {
1460 0 : cm_set_transition_sequence(TR_PAUSE, seqno);
1461 0 : }
1462 :
1463 0 : void TMFE::SetTransitionSequenceResume(int seqno)
1464 : {
1465 0 : cm_set_transition_sequence(TR_RESUME, seqno);
1466 0 : }
1467 :
1468 0 : void TMFE::SetTransitionSequenceStartAbort(int seqno)
1469 : {
1470 0 : cm_set_transition_sequence(TR_STARTABORT, seqno);
1471 0 : }
1472 :
1473 0 : void TMFE::DeregisterTransitions()
1474 : {
1475 0 : cm_deregister_transition(TR_START);
1476 0 : cm_deregister_transition(TR_STOP);
1477 0 : cm_deregister_transition(TR_PAUSE);
1478 0 : cm_deregister_transition(TR_RESUME);
1479 0 : cm_deregister_transition(TR_STARTABORT);
1480 0 : }
1481 :
1482 0 : void TMFE::DeregisterTransitionStart()
1483 : {
1484 0 : cm_deregister_transition(TR_START);
1485 0 : }
1486 :
1487 0 : void TMFE::DeregisterTransitionStop()
1488 : {
1489 0 : cm_deregister_transition(TR_STOP);
1490 0 : }
1491 :
1492 0 : void TMFE::DeregisterTransitionPause()
1493 : {
1494 0 : cm_deregister_transition(TR_PAUSE);
1495 0 : }
1496 :
1497 0 : void TMFE::DeregisterTransitionResume()
1498 : {
1499 0 : cm_deregister_transition(TR_RESUME);
1500 0 : }
1501 :
1502 0 : void TMFE::DeregisterTransitionStartAbort()
1503 : {
1504 0 : cm_deregister_transition(TR_STARTABORT);
1505 0 : }
1506 :
1507 0 : void TMFE::RegisterTransitionStartAbort()
1508 : {
1509 0 : cm_register_transition(TR_STARTABORT, tr_startabort, 500);
1510 0 : }
1511 :
1512 1 : void TMFE::RegisterRPCs()
1513 : {
1514 1 : if (TMFE::gfVerbose)
1515 0 : printf("TMFE::RegisterRPCs!\n");
1516 :
1517 1 : cm_register_function(RPC_JRPC, rpc_callback);
1518 1 : cm_register_function(RPC_BRPC, binary_rpc_callback);
1519 1 : cm_register_transition(TR_START, tr_start, 500);
1520 1 : cm_register_transition(TR_STOP, tr_stop, 500);
1521 1 : cm_register_transition(TR_PAUSE, tr_pause, 500);
1522 1 : cm_register_transition(TR_RESUME, tr_resume, 500);
1523 1 : cm_register_transition(TR_STARTABORT, tr_startabort, 500);
1524 1 : }
1525 :
1526 0 : void TMFE::AddRpcHandler(TMFeRpcHandlerInterface* h)
1527 : {
1528 0 : fRpcHandlers.push_back(h);
1529 0 : }
1530 :
1531 0 : void TMFE::RemoveRpcHandler(TMFeRpcHandlerInterface* h)
1532 : {
1533 0 : for (unsigned i=0; i<fRpcHandlers.size(); i++) {
1534 0 : if (fRpcHandlers[i] == h) {
1535 0 : fRpcHandlers[i] = NULL;
1536 : }
1537 : }
1538 0 : }
1539 :
1540 0 : TMFrontend::TMFrontend() // ctor
1541 : {
1542 0 : fMfe = TMFE::Instance();
1543 0 : }
1544 :
1545 0 : TMFrontend::~TMFrontend() // dtor
1546 : {
1547 0 : if (fFeRpcHelper) {
1548 0 : fMfe->RemoveRpcHandler(fFeRpcHelper);
1549 0 : delete fFeRpcHelper;
1550 0 : fFeRpcHelper = NULL;
1551 : }
1552 : // poison all pointers
1553 0 : fMfe = NULL;
1554 0 : }
1555 :
1556 0 : TMFeResult TMFrontend::FeInitEquipments(const std::vector<std::string>& args)
1557 : {
1558 : // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleInit() modifies fEquipments. K.O.
1559 0 : for (unsigned i=0; i<fFeEquipments.size(); i++) {
1560 0 : if (!fFeEquipments[i])
1561 0 : continue;
1562 0 : if (!fFeEquipments[i]->fEqConfEnabled)
1563 0 : continue;
1564 0 : TMFeResult r = fFeEquipments[i]->EqInit(args);
1565 0 : if (r.error_flag)
1566 0 : return r;
1567 0 : }
1568 0 : return TMFeOk();
1569 : }
1570 :
1571 0 : void TMFrontend::FeStopEquipmentPollThreads()
1572 : {
1573 : // NOTE: should not use range-based for() loop, it uses an iterator and it not thread-safe. K.O.
1574 0 : for (unsigned i=0; i<fFeEquipments.size(); i++) {
1575 0 : if (!fFeEquipments[i])
1576 0 : continue;
1577 0 : fFeEquipments[i]->EqStopPollThread();
1578 : }
1579 0 : }
1580 :
1581 0 : void TMFrontend::FeDeleteEquipments()
1582 : {
1583 : // NOTE: this is thread-safe: we do not modify the fEquipments object. K.O.
1584 : // NOTE: this is not thread-safe, we will race against ourselves and do multiple delete of fEquipents[i]. K.O.
1585 :
1586 : // NOTE: should not use range-based for() loop, it uses an iterator and it not thread-safe. K.O.
1587 0 : for (unsigned i=0; i<fFeEquipments.size(); i++) {
1588 0 : if (!fFeEquipments[i])
1589 0 : continue;
1590 : //printf("delete equipment [%s]\n", fFeEquipments[i]->fEqName.c_str());
1591 0 : fMfe->RemoveRpcHandler(fFeEquipments[i]);
1592 0 : delete fFeEquipments[i];
1593 0 : fFeEquipments[i] = NULL;
1594 : }
1595 :
1596 0 : fMfe->EventBufferFlushCacheAll();
1597 0 : fMfe->EventBufferCloseAll();
1598 0 : }
1599 :
1600 0 : TMFeResult TMFrontend::FeAddEquipment(TMFeEquipment* eq)
1601 : {
1602 : // NOTE: not thread-safe, we modify the fEquipments object. K.O.
1603 :
1604 : // NOTE: should not use range-based for() loop, it uses an iterator and it not thread-safe. K.O.
1605 0 : for (unsigned i=0; i<fFeEquipments.size(); i++) {
1606 0 : if (!fFeEquipments[i])
1607 0 : continue;
1608 0 : if (fFeEquipments[i] == eq) {
1609 0 : fprintf(stderr, "TMFE::AddEquipment: Fatal error: Equipment \"%s\" is already registered, bye...\n", fFeEquipments[i]->fEqName.c_str());
1610 0 : fMfe->Disconnect();
1611 0 : exit(1);
1612 : //return TMFeErrorMessage(msprintf("TMFE::AddEquipment: Equipment \"%s\" is already registered", fFeEquipments[i]->fEqName.c_str()));
1613 : }
1614 0 : if (fFeEquipments[i]->fEqName == eq->fEqName) {
1615 0 : fprintf(stderr, "TMFE::AddEquipment: Fatal error: Duplicate equipment name \"%s\", bye...\n", eq->fEqName.c_str());
1616 0 : fMfe->Disconnect();
1617 0 : exit(1);
1618 : //return TMFeErrorMessage(std::string("TMFE::AddEquipment: Duplicate equipment name \"") + eq->fEqName + "\"");
1619 : }
1620 : }
1621 :
1622 0 : eq->fFe = this;
1623 :
1624 : // NOTE: fEquipments must be protected again multithreaded access here. K.O.
1625 0 : fFeEquipments.push_back(eq);
1626 :
1627 0 : return TMFeOk();
1628 : }
1629 :
1630 0 : TMFeResult TMFrontend::FeRemoveEquipment(TMFeEquipment* eq)
1631 : {
1632 : // NOTE: this is thread-safe, we do not modify the fEquipments object. K.O.
1633 :
1634 : // NOTE: should not use range-based for() loop, it uses an iterator and it not thread-safe. K.O.
1635 0 : for (unsigned i=0; i<fFeEquipments.size(); i++) {
1636 0 : if (!fFeEquipments[i])
1637 0 : continue;
1638 0 : if (fFeEquipments[i] == eq) {
1639 0 : fFeEquipments[i] = NULL;
1640 0 : return TMFeOk();
1641 : }
1642 : }
1643 :
1644 0 : return TMFeErrorMessage(msprintf("TMFE::RemoveEquipment: Cannot find equipment \"%s\"", eq->fEqName.c_str()));
1645 : }
1646 :
1647 0 : void TMFrontend::FeSetName(const char* program_name)
1648 : {
1649 0 : assert(program_name != NULL);
1650 0 : fMfe->fProgramName = program_name;
1651 0 : }
1652 :
1653 0 : TMFeEquipment::TMFeEquipment(const char* eqname, const char* eqfilename) // ctor
1654 : {
1655 0 : assert(eqname != NULL);
1656 0 : assert(eqfilename != NULL);
1657 :
1658 0 : if (TMFE::gfVerbose)
1659 0 : printf("TMFeEquipment::ctor: equipment name [%s] file [%s]\n", eqname, eqfilename);
1660 :
1661 0 : fMfe = TMFE::Instance();
1662 0 : fEqName = eqname;
1663 0 : fEqFilename = eqfilename;
1664 0 : fEqStatNextWrite = TMFE::GetTime();
1665 0 : }
1666 :
1667 0 : TMFeEquipment::~TMFeEquipment() // dtor
1668 : {
1669 0 : if (TMFE::gfVerbose)
1670 0 : printf("TMFeEquipment::dtor: equipment name [%s]\n", fEqName.c_str());
1671 :
1672 0 : EqStopPollThread();
1673 :
1674 : // free data and poison pointers
1675 0 : if (fOdbEq) {
1676 0 : delete fOdbEq;
1677 0 : fOdbEq = NULL;
1678 : }
1679 0 : if (fOdbEqCommon) {
1680 0 : delete fOdbEqCommon;
1681 0 : fOdbEqCommon = NULL;
1682 : }
1683 0 : if (fOdbEqSettings) {
1684 0 : delete fOdbEqSettings;
1685 0 : fOdbEqSettings = NULL;
1686 : }
1687 0 : if (fOdbEqVariables) {
1688 0 : delete fOdbEqVariables;
1689 0 : fOdbEqVariables = NULL;
1690 : }
1691 0 : if (fOdbEqStatistics) {
1692 0 : delete fOdbEqStatistics;
1693 0 : fOdbEqStatistics = NULL;
1694 : }
1695 0 : fMfe = NULL;
1696 0 : fFe = NULL;
1697 0 : fEqEventBuffer = NULL;
1698 0 : }
1699 :
1700 0 : TMFeResult TMFeEquipment::EqInit(const std::vector<std::string>& args)
1701 : {
1702 0 : TMFeResult r;
1703 :
1704 0 : r = EqPreInit();
1705 0 : if (r.error_flag)
1706 0 : return r;
1707 :
1708 0 : r = HandleInit(args);
1709 0 : if (r.error_flag)
1710 0 : return r;
1711 :
1712 0 : r = EqPostInit();
1713 0 : if (r.error_flag)
1714 0 : return r;
1715 :
1716 0 : return TMFeOk();
1717 0 : }
1718 :
1719 0 : TMFeResult TMFeEquipment::EqReadCommon()
1720 : {
1721 0 : if (TMFE::gfVerbose)
1722 0 : printf("TMFeEquipment::EqReadCommon: for [%s]\n", fEqName.c_str());
1723 :
1724 : // list of ODB Common entries always read
1725 :
1726 0 : fOdbEqCommon->RB("Enabled", &fEqConfEnabled, true);
1727 0 : fOdbEqCommon->RD("Event limit", &fEqConfEventLimit, true);
1728 :
1729 0 : if (fEqConfReadConfigFromOdb) {
1730 : // list of ODB Common entries read if we want to control equipment from ODB
1731 :
1732 0 : fOdbEqCommon->RU16("Event ID", &fEqConfEventID, true);
1733 0 : fOdbEqCommon->RU16("Trigger mask", &fEqConfTriggerMask, true);
1734 0 : fOdbEqCommon->RS("Buffer", &fEqConfBuffer, true, NAME_LENGTH);
1735 0 : fOdbEqCommon->RI("Type", &fEqConfType, true);
1736 0 : fOdbEqCommon->RI("Source", &fEqConfSource, true);
1737 0 : fOdbEqCommon->RS("Format", &fEqConfFormat, true, 8);
1738 0 : fOdbEqCommon->RI("Read on", &fEqConfReadOn, true);
1739 0 : fOdbEqCommon->RI("Period", &fEqConfPeriodMilliSec, true);
1740 0 : fOdbEqCommon->RU32("Num subevents", &fEqConfNumSubEvents, true);
1741 0 : fOdbEqCommon->RI("Log history", &fEqConfLogHistory, true);
1742 0 : fOdbEqCommon->RB("Hidden", &fEqConfHidden, true);
1743 0 : fOdbEqCommon->RI("Write cache size", &fEqConfWriteCacheSize, true);
1744 :
1745 : // decode data from ODB Common
1746 :
1747 0 : fEqConfReadOnlyWhenRunning = !(fEqConfReadOn & (RO_PAUSED|RO_STOPPED));
1748 0 : fEqConfWriteEventsToOdb = (fEqConfReadOn & RO_ODB);
1749 : }
1750 :
1751 : // list of ODB Common entries we read and write back to ODB, but do not actually use.
1752 :
1753 : //fOdbEqCommon->RS("Frontend host", &fEqConfFrontendHost, true, NAME_LENGTH);
1754 : //fOdbEqCommon->RS("Frontend name", &fEqConfFrontendName, true, NAME_LENGTH);
1755 : //fOdbEqCommon->RS("Frontend file name", &fEqConfFrontendFileName, true, 256);
1756 : //fOdbEqCommon->RS("Status", &fEqConfStatus, true, 256);
1757 : //fOdbEqCommon->RS("Status color", &fEqConfStatusColor, true, NAME_LENGTH);
1758 :
1759 0 : return TMFeOk();
1760 : }
1761 :
1762 0 : TMFeResult TMFeEquipment::EqWriteCommon(bool create)
1763 : {
1764 0 : if (TMFE::gfVerbose)
1765 0 : printf("TMFeEquipment::EqWriteCommon: for [%s]\n", fEqName.c_str());
1766 :
1767 : // encode data for ODB Common
1768 :
1769 0 : fEqConfReadOn = 0;
1770 0 : if (fEqConfReadOnlyWhenRunning)
1771 0 : fEqConfReadOn |= (RO_RUNNING);
1772 : else
1773 0 : fEqConfReadOn |= (RO_RUNNING|RO_PAUSED|RO_STOPPED);
1774 0 : if (fEqConfWriteEventsToOdb)
1775 0 : fEqConfReadOn |= RO_ODB;
1776 :
1777 : // write to ODB
1778 :
1779 0 : fOdbEqCommon->WU16("Event ID", fEqConfEventID);
1780 0 : fOdbEqCommon->WU16("Trigger mask", fEqConfTriggerMask);
1781 0 : fOdbEqCommon->WS("Buffer", fEqConfBuffer.c_str(), NAME_LENGTH);
1782 0 : fOdbEqCommon->WI("Type", fEqConfType);
1783 0 : fOdbEqCommon->WI("Source", fEqConfSource);
1784 0 : fOdbEqCommon->WS("Format", fEqConfFormat.c_str(), 8);
1785 0 : fOdbEqCommon->WB("Enabled", fEqConfEnabled);
1786 0 : fOdbEqCommon->WI("Read on", fEqConfReadOn);
1787 0 : fOdbEqCommon->WI("Period", fEqConfPeriodMilliSec);
1788 0 : fOdbEqCommon->WD("Event limit", fEqConfEventLimit);
1789 0 : fOdbEqCommon->WU32("Num subevents", fEqConfNumSubEvents);
1790 0 : fOdbEqCommon->WI("Log history", fEqConfLogHistory);
1791 0 : fOdbEqCommon->WS("Frontend host", fMfe->fHostname.c_str(), NAME_LENGTH);
1792 0 : fOdbEqCommon->WS("Frontend name", fMfe->fProgramName.c_str(), NAME_LENGTH);
1793 0 : fOdbEqCommon->WS("Frontend file name", fEqFilename.c_str(), 256);
1794 0 : if (create) {
1795 0 : fOdbEqCommon->WS("Status", "", 256);
1796 0 : fOdbEqCommon->WS("Status color", "", NAME_LENGTH);
1797 : }
1798 0 : fOdbEqCommon->WB("Hidden", fEqConfHidden);
1799 0 : fOdbEqCommon->WI("Write cache size", fEqConfWriteCacheSize);
1800 0 : return TMFeOk();
1801 : }
1802 :
1803 0 : TMFeResult TMFeEquipment::EqPreInit()
1804 : {
1805 0 : if (TMFE::gfVerbose)
1806 0 : printf("TMFeEquipment::PreInit: for [%s]\n", fEqName.c_str());
1807 :
1808 : //
1809 : // Apply frontend index
1810 : //
1811 :
1812 0 : if (fEqName.find("%") != std::string::npos) {
1813 0 : fEqName = msprintf(fEqName.c_str(), fFe->fFeIndex);
1814 : }
1815 :
1816 0 : if (fEqConfBuffer.find("%") != std::string::npos) {
1817 0 : fEqConfBuffer = msprintf(fEqConfBuffer.c_str(), fFe->fFeIndex);
1818 : }
1819 :
1820 : //
1821 : // create ODB /eq/name/common
1822 : //
1823 :
1824 0 : fOdbEq = fMfe->fOdbRoot->Chdir((std::string("Equipment/") + fEqName).c_str(), true);
1825 0 : fOdbEqCommon = fOdbEq->Chdir("Common", false);
1826 0 : if (!fOdbEqCommon) {
1827 0 : if (TMFE::gfVerbose)
1828 0 : printf("TMFeEquipment::PreInit: creating ODB common\n");
1829 0 : fOdbEqCommon = fOdbEq->Chdir("Common", true);
1830 0 : EqWriteCommon(true);
1831 : }
1832 0 : fOdbEqSettings = fOdbEq->Chdir("Settings", true);
1833 0 : fOdbEqVariables = fOdbEq->Chdir("Variables", true);
1834 0 : fOdbEqStatistics = fOdbEq->Chdir("Statistics", true);
1835 :
1836 0 : TMFeResult r = EqReadCommon();
1837 :
1838 0 : if (r.error_flag)
1839 0 : return r;
1840 :
1841 0 : if (rpc_is_remote()) {
1842 0 : EqSetStatus((fMfe->fProgramName + "@" + fMfe->fHostname).c_str(), "greenLight");
1843 : } else {
1844 0 : EqSetStatus(fMfe->fProgramName.c_str(), "greenLight");
1845 : }
1846 :
1847 0 : EqZeroStatistics();
1848 0 : EqWriteStatistics();
1849 :
1850 0 : return TMFeOk();
1851 0 : }
1852 :
1853 0 : TMFeResult TMFeEquipment::EqPostInit()
1854 : {
1855 0 : if (TMFE::gfVerbose)
1856 0 : printf("TMFeEquipment::EqPostInit: for [%s]\n", fEqName.c_str());
1857 :
1858 0 : if (!fEqConfEnabled) {
1859 0 : EqSetStatus("Disabled", "yellowLight");
1860 : }
1861 :
1862 : // open event buffer
1863 :
1864 0 : uint32_t odb_max_event_size = DEFAULT_MAX_EVENT_SIZE;
1865 0 : fMfe->fOdbRoot->RU32("Experiment/MAX_EVENT_SIZE", &odb_max_event_size, true);
1866 :
1867 0 : if (fEqConfMaxEventSize == 0) {
1868 0 : fEqConfMaxEventSize = odb_max_event_size;
1869 0 : } else if (fEqConfMaxEventSize > odb_max_event_size) {
1870 0 : fMfe->Msg(MERROR, "TMFeEquipment::EqPostInit", "Equipment \"%s\" requested event size %d is bigger than ODB MAX_EVENT_SIZE %d", fEqName.c_str(), (int)fEqConfMaxEventSize, odb_max_event_size);
1871 0 : fEqConfMaxEventSize = odb_max_event_size;
1872 : }
1873 :
1874 0 : if (!fEqConfBuffer.empty()) {
1875 0 : TMFeResult r = fMfe->EventBufferOpen(&fEqEventBuffer, fEqConfBuffer.c_str(), fEqConfBufferSize);
1876 :
1877 0 : if (r.error_flag)
1878 0 : return r;
1879 :
1880 0 : assert(fEqEventBuffer != NULL);
1881 :
1882 0 : if (fEqConfMaxEventSize > fEqEventBuffer->fBufMaxEventSize) {
1883 0 : fMfe->Msg(MERROR, "TMFeEquipment::EqPostInit", "Equipment \"%s\" requested event size %d is bigger than event buffer \"%s\" max event size %d", fEqName.c_str(), (int)fEqConfMaxEventSize, fEqEventBuffer->fBufName.c_str(), (int)fEqEventBuffer->fBufMaxEventSize);
1884 0 : fEqConfMaxEventSize = fEqEventBuffer->fBufMaxEventSize;
1885 : }
1886 :
1887 0 : if (fEqConfWriteCacheSize > 0) {
1888 0 : if (fEqEventBuffer->fBufWriteCacheSize == 0) {
1889 0 : r = fEqEventBuffer->SetCacheSize(0, fEqConfWriteCacheSize);
1890 :
1891 0 : if (r.error_flag)
1892 0 : return r;
1893 0 : } else if (fEqConfWriteCacheSize < (int)fEqEventBuffer->fBufWriteCacheSize) {
1894 0 : fMfe->Msg(MERROR, "TMFeEquipment::EqPostInit", "Equipment \"%s\" requested write cache size %d for buffer \"%s\" is smaller then already set write cache size %d, ignoring it", fEqName.c_str(), (int)fEqConfWriteCacheSize, fEqEventBuffer->fBufName.c_str(), (int)fEqEventBuffer->fBufWriteCacheSize);
1895 0 : } else if (fEqConfWriteCacheSize == (int)fEqEventBuffer->fBufWriteCacheSize) {
1896 : // do nothing
1897 : } else {
1898 0 : fMfe->Msg(MERROR, "TMFeEquipment::EqPostInit", "Equipment \"%s\" requested write cache size %d for buffer \"%s\" is different from already set write cache size %d", fEqName.c_str(), (int)fEqConfWriteCacheSize, fEqEventBuffer->fBufName.c_str(), (int)fEqEventBuffer->fBufWriteCacheSize);
1899 :
1900 0 : r = fEqEventBuffer->SetCacheSize(0, fEqConfWriteCacheSize);
1901 :
1902 0 : if (r.error_flag)
1903 0 : return r;
1904 : }
1905 : }
1906 0 : }
1907 :
1908 0 : if (TMFE::gfVerbose)
1909 0 : printf("TMFeEquipment::EqPostInit: Equipment \"%s\", max event size: %d\n", fEqName.c_str(), (int)fEqConfMaxEventSize);
1910 :
1911 : // update ODB common
1912 :
1913 0 : TMFeResult r = EqWriteCommon();
1914 :
1915 0 : if (r.error_flag)
1916 0 : return r;
1917 :
1918 0 : if (fEqConfEnabled && fEqConfEnableRpc) {
1919 0 : fMfe->AddRpcHandler(this);
1920 : }
1921 :
1922 0 : return TMFeOk();
1923 0 : };
1924 :
1925 0 : TMFeResult TMFeEquipment::EqZeroStatistics()
1926 : {
1927 0 : fEqMutex.lock();
1928 :
1929 0 : if (TMFE::gfVerbose)
1930 0 : printf("TMFeEquipment::EqZeroStatistics: zero statistics for [%s]\n", fEqName.c_str());
1931 :
1932 0 : double now = TMFE::GetTime();
1933 :
1934 0 : fEqStatEvents = 0;
1935 0 : fEqStatBytes = 0;
1936 0 : fEqStatEpS = 0;
1937 0 : fEqStatKBpS = 0;
1938 :
1939 0 : fEqStatLastTime = now;
1940 0 : fEqStatLastEvents = 0;
1941 0 : fEqStatLastBytes = 0;
1942 :
1943 0 : fEqStatNextWrite = now; // force immediate update
1944 :
1945 0 : fEqMutex.unlock();
1946 :
1947 0 : return TMFeOk();
1948 : }
1949 :
1950 0 : TMFeResult TMFeEquipment::EqWriteStatistics()
1951 : {
1952 0 : fEqMutex.lock();
1953 :
1954 0 : if (TMFE::gfVerbose)
1955 0 : printf("TMFeEquipment::EqWriteStatistics: write statistics for [%s]\n", fEqName.c_str());
1956 :
1957 0 : double now = TMFE::GetTime();
1958 0 : double elapsed = now - fEqStatLastTime;
1959 :
1960 0 : if (elapsed > 0.9 || fEqStatLastTime == 0) {
1961 0 : fEqStatEpS = (fEqStatEvents - fEqStatLastEvents) / elapsed;
1962 0 : fEqStatKBpS = (fEqStatBytes - fEqStatLastBytes) / elapsed / 1000.0;
1963 :
1964 0 : fEqStatLastTime = now;
1965 0 : fEqStatLastEvents = fEqStatEvents;
1966 0 : fEqStatLastBytes = fEqStatBytes;
1967 : }
1968 :
1969 : //printf("TMFeEquipment::EqWriteStatistics: write statistics for [%s], now %f, elapsed %f, sent %f, eps %f, kps %f\n", fEqName.c_str(), now, elapsed, fEqStatEvents, fEqStatEpS, fEqStatKBpS);
1970 :
1971 0 : fOdbEqStatistics->WD("Events sent", fEqStatEvents);
1972 0 : fOdbEqStatistics->WD("Events per sec.", fEqStatEpS);
1973 0 : fOdbEqStatistics->WD("kBytes per sec.", fEqStatKBpS);
1974 :
1975 0 : fEqStatLastWrite = now;
1976 :
1977 0 : if (fEqConfPeriodStatisticsSec > 0) {
1978 : // avoid creep of NextWrite: we start it at
1979 : // time of initialization, then increment it strictly
1980 : // by the period value, regardless of when it is actually
1981 : // written to ODB (actual period is longer than requested
1982 : // period because we only over-sleep, never under-sleep). K.O.
1983 0 : while (fEqStatNextWrite <= now) {
1984 0 : fEqStatNextWrite += fEqConfPeriodStatisticsSec;
1985 : }
1986 : } else {
1987 0 : fEqStatNextWrite = now;
1988 : }
1989 :
1990 0 : fEqMutex.unlock();
1991 :
1992 0 : return TMFeOk();
1993 : }
1994 :
1995 0 : TMFeResult TMFeEquipment::ComposeEvent(char* event, size_t size) const
1996 : {
1997 0 : EVENT_HEADER* pevent = (EVENT_HEADER*)event;
1998 0 : pevent->event_id = fEqConfEventID;
1999 0 : pevent->trigger_mask = fEqConfTriggerMask;
2000 0 : pevent->serial_number = fEqSerial;
2001 0 : pevent->time_stamp = TMFE::GetTime();
2002 0 : pevent->data_size = 0;
2003 0 : return TMFeOk();
2004 : }
2005 :
2006 0 : TMFeResult TMFeEquipment::EqSendEvent(const char* event, bool write_to_odb)
2007 : {
2008 0 : std::lock_guard<std::mutex> guard(fEqMutex);
2009 :
2010 0 : fEqSerial++;
2011 :
2012 0 : EVENT_HEADER* pevent = (EVENT_HEADER*)event;
2013 0 : pevent->data_size = BkSize(event);
2014 :
2015 0 : if (fEqEventBuffer != NULL) {
2016 0 : TMFeResult r = fEqEventBuffer->SendEvent(event);
2017 :
2018 0 : if (r.error_flag)
2019 0 : return r;
2020 0 : }
2021 :
2022 0 : fEqStatEvents += 1;
2023 0 : fEqStatBytes += sizeof(EVENT_HEADER) + pevent->data_size;
2024 :
2025 0 : if (fEqConfWriteEventsToOdb && write_to_odb) {
2026 0 : TMFeResult r = EqWriteEventToOdb_locked(event);
2027 0 : if (r.error_flag)
2028 0 : return r;
2029 0 : }
2030 :
2031 0 : if (fMfe->fStateRunning) {
2032 0 : if (fEqConfEventLimit > 0) {
2033 0 : if (fEqStatEvents >= fEqConfEventLimit) {
2034 0 : if (!fMfe->fRunStopRequested) {
2035 0 : fMfe->Msg(MINFO, "TMFeEquipment::EqSendEvent", "Equipment \"%s\" sent %.0f events out of %.0f requested, run will stop now", fEqName.c_str(), fEqStatEvents, fEqConfEventLimit);
2036 : }
2037 0 : fMfe->fRunStopRequested = true;
2038 : }
2039 : }
2040 : }
2041 :
2042 0 : return TMFeOk();
2043 0 : }
2044 :
2045 0 : TMFeResult TMFeEquipment::EqSendEvent(const std::vector<char>& event, bool write_to_odb)
2046 : {
2047 0 : std::lock_guard<std::mutex> guard(fEqMutex);
2048 :
2049 0 : fEqSerial++;
2050 :
2051 0 : if (fEqEventBuffer == NULL) {
2052 0 : return TMFeOk();
2053 : }
2054 :
2055 0 : TMFeResult r = fEqEventBuffer->SendEvent(event);
2056 :
2057 0 : if (r.error_flag)
2058 0 : return r;
2059 :
2060 0 : fEqStatEvents += 1;
2061 0 : fEqStatBytes += event.size();
2062 :
2063 0 : if (fEqConfWriteEventsToOdb && write_to_odb) {
2064 0 : TMFeResult r = EqWriteEventToOdb_locked(event.data());
2065 0 : if (r.error_flag)
2066 0 : return r;
2067 0 : }
2068 :
2069 0 : if (fMfe->fStateRunning) {
2070 0 : if (fEqConfEventLimit > 0) {
2071 0 : if (fEqStatEvents >= fEqConfEventLimit) {
2072 0 : if (!fMfe->fRunStopRequested) {
2073 0 : fMfe->Msg(MINFO, "TMFeEquipment::EqSendEvent", "Equipment \"%s\" sent %.0f events out of %.0f requested, run will stop now", fEqName.c_str(), fEqStatEvents, fEqConfEventLimit);
2074 : }
2075 0 : fMfe->fRunStopRequested = true;
2076 : }
2077 : }
2078 : }
2079 :
2080 0 : return TMFeOk();
2081 0 : }
2082 :
2083 0 : TMFeResult TMFeEquipment::EqSendEvent(const std::vector<std::vector<char>>& event, bool write_to_odb)
2084 : {
2085 0 : std::lock_guard<std::mutex> guard(fEqMutex);
2086 :
2087 0 : fEqSerial++;
2088 :
2089 0 : if (fEqEventBuffer == NULL) {
2090 0 : return TMFeOk();
2091 : }
2092 :
2093 0 : TMFeResult r = fEqEventBuffer->SendEvent(event);
2094 :
2095 0 : if (r.error_flag)
2096 0 : return r;
2097 :
2098 0 : fEqStatEvents += 1;
2099 0 : for (auto v: event) {
2100 0 : fEqStatBytes += v.size();
2101 0 : }
2102 :
2103 : //if (fEqConfWriteEventsToOdb && write_to_odb) {
2104 : // TMFeResult r = EqWriteEventToOdb_locked(event.data());
2105 : // if (r.error_flag)
2106 : // return r;
2107 : //}
2108 :
2109 0 : if (fMfe->fStateRunning) {
2110 0 : if (fEqConfEventLimit > 0) {
2111 0 : if (fEqStatEvents >= fEqConfEventLimit) {
2112 0 : if (!fMfe->fRunStopRequested) {
2113 0 : fMfe->Msg(MINFO, "TMFeEquipment::EqSendEvent", "Equipment \"%s\" sent %.0f events out of %.0f requested, run will stop now", fEqName.c_str(), fEqStatEvents, fEqConfEventLimit);
2114 : }
2115 0 : fMfe->fRunStopRequested = true;
2116 : }
2117 : }
2118 : }
2119 :
2120 0 : return TMFeOk();
2121 0 : }
2122 :
2123 0 : TMFeResult TMFeEquipment::EqSendEvent(int sg_n, const char* sg_ptr[], const size_t sg_len[], bool write_to_odb)
2124 : {
2125 0 : std::lock_guard<std::mutex> guard(fEqMutex);
2126 :
2127 0 : fEqSerial++;
2128 :
2129 0 : if (fEqEventBuffer == NULL) {
2130 0 : return TMFeOk();
2131 : }
2132 :
2133 0 : TMFeResult r = fEqEventBuffer->SendEvent(sg_n, sg_ptr, sg_len);
2134 :
2135 0 : if (r.error_flag)
2136 0 : return r;
2137 :
2138 0 : fEqStatEvents += 1;
2139 0 : for (int i=0; i<sg_n; i++) {
2140 0 : fEqStatBytes += sg_len[i];
2141 : }
2142 :
2143 : //if (fEqConfWriteEventsToOdb && write_to_odb) {
2144 : // TMFeResult r = EqWriteEventToOdb_locked(event.data());
2145 : // if (r.error_flag)
2146 : // return r;
2147 : //}
2148 :
2149 0 : if (fMfe->fStateRunning) {
2150 0 : if (fEqConfEventLimit > 0) {
2151 0 : if (fEqStatEvents >= fEqConfEventLimit) {
2152 0 : if (!fMfe->fRunStopRequested) {
2153 0 : fMfe->Msg(MINFO, "TMFeEquipment::EqSendEvent", "Equipment \"%s\" sent %.0f events out of %.0f requested, run will stop now", fEqName.c_str(), fEqStatEvents, fEqConfEventLimit);
2154 : }
2155 0 : fMfe->fRunStopRequested = true;
2156 : }
2157 : }
2158 : }
2159 :
2160 0 : return TMFeOk();
2161 0 : }
2162 :
2163 0 : TMFeResult TMFeEquipment::EqWriteEventToOdb(const char* event)
2164 : {
2165 0 : std::lock_guard<std::mutex> guard(fEqMutex);
2166 0 : return EqWriteEventToOdb_locked(event);
2167 0 : }
2168 :
2169 0 : TMFeResult TMFeEquipment::EqWriteEventToOdb_locked(const char* event)
2170 : {
2171 0 : std::string path = "";
2172 0 : path += "/Equipment/";
2173 0 : path += fEqName;
2174 0 : path += "/Variables";
2175 :
2176 0 : HNDLE hKeyVar = 0;
2177 :
2178 0 : int status = db_find_key(fMfe->fDB, 0, path.c_str(), &hKeyVar);
2179 0 : if (status != DB_SUCCESS) {
2180 0 : return TMFeMidasError(msprintf("Cannot find \"%s\" in ODB", path.c_str()), "db_find_key", status);
2181 : }
2182 :
2183 0 : status = cm_write_event_to_odb(fMfe->fDB, hKeyVar, (const EVENT_HEADER*) event, FORMAT_MIDAS);
2184 0 : if (status != SUCCESS) {
2185 0 : return TMFeMidasError("Cannot write event to ODB", "cm_write_event_to_odb", status);
2186 : }
2187 0 : return TMFeOk();
2188 0 : }
2189 :
2190 0 : int TMFeEquipment::BkSize(const char* event) const
2191 : {
2192 0 : return bk_size(event + sizeof(EVENT_HEADER));
2193 : }
2194 :
2195 0 : TMFeResult TMFeEquipment::BkInit(char* event, size_t size) const
2196 : {
2197 0 : bk_init32a(event + sizeof(EVENT_HEADER));
2198 0 : return TMFeOk();
2199 : }
2200 :
2201 0 : void* TMFeEquipment::BkOpen(char* event, const char* name, int tid) const
2202 : {
2203 : void* ptr;
2204 0 : bk_create(event + sizeof(EVENT_HEADER), name, tid, &ptr);
2205 0 : return ptr;
2206 : }
2207 :
2208 0 : TMFeResult TMFeEquipment::BkClose(char* event, void* ptr) const
2209 : {
2210 0 : bk_close(event + sizeof(EVENT_HEADER), ptr);
2211 0 : ((EVENT_HEADER*)event)->data_size = BkSize(event);
2212 0 : return TMFeOk();
2213 : }
2214 :
2215 0 : TMFeResult TMFeEquipment::EqSetStatus(char const* eq_status, char const* eq_color)
2216 : {
2217 0 : if (eq_status) {
2218 0 : fOdbEqCommon->WS("Status", eq_status, 256);
2219 : }
2220 :
2221 0 : if (eq_color) {
2222 0 : fOdbEqCommon->WS("Status color", eq_color, NAME_LENGTH);
2223 : }
2224 :
2225 0 : return TMFeOk();
2226 : }
2227 :
2228 0 : TMFeResult TMFE::TriggerAlarm(const char* name, const char* message, const char* aclass)
2229 : {
2230 0 : int status = al_trigger_alarm(name, message, aclass, message, AT_INTERNAL);
2231 :
2232 0 : if (status) {
2233 0 : return TMFeMidasError("Cannot trigger alarm", "al_trigger_alarm", status);
2234 : }
2235 :
2236 0 : return TMFeOk();
2237 : }
2238 :
2239 0 : TMFeResult TMFE::ResetAlarm(const char* name)
2240 : {
2241 0 : int status = al_reset_alarm(name);
2242 :
2243 0 : if (status) {
2244 0 : return TMFeMidasError("Cannot reset alarm", "al_reset_alarm", status);
2245 : }
2246 :
2247 0 : return TMFeOk();
2248 : }
2249 :
2250 0 : void TMFrontend::FeUsage(const char* argv0)
2251 : {
2252 0 : fprintf(stderr, "\n");
2253 0 : fprintf(stderr, "Usage: %s args... [-- equipment args...]\n", argv0);
2254 0 : fprintf(stderr, "\n");
2255 0 : fprintf(stderr, " --help -- print this help message\n");
2256 0 : fprintf(stderr, " -h -- print this help message\n");
2257 0 : fprintf(stderr, " -v -- report all activities\n");
2258 0 : fprintf(stderr, "\n");
2259 0 : fprintf(stderr, " -h hostname[:tcpport] -- connect to MIDAS mserver on given host and tcp port number\n");
2260 0 : fprintf(stderr, " -e exptname -- connect to given MIDAS experiment\n");
2261 0 : fprintf(stderr, "\n");
2262 0 : fprintf(stderr, " -D -- Become a daemon\n");
2263 0 : fprintf(stderr, " -O -- Become a daemon but keep stdout for saving in a log file: frontend -O >file.log 2>&1\n");
2264 0 : fprintf(stderr, "\n");
2265 0 : fprintf(stderr, " -i NNN -- Set frontend index number\n");
2266 0 : fprintf(stderr, "\n");
2267 :
2268 : // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleUsage() modifies fEquipments. K.O.
2269 0 : for (unsigned i=0; i<fFeEquipments.size(); i++) {
2270 0 : if (!fFeEquipments[i])
2271 0 : continue;
2272 0 : fprintf(stderr, "Usage of equipment \"%s\":\n", fFeEquipments[i]->fEqName.c_str());
2273 0 : fprintf(stderr, "\n");
2274 0 : fFeEquipments[i]->HandleUsage();
2275 0 : fprintf(stderr, "\n");
2276 : }
2277 0 : }
2278 :
2279 0 : int TMFrontend::FeMain(int argc, char* argv[])
2280 : {
2281 0 : std::vector<std::string> args;
2282 0 : for (int i=0; i<argc; i++) {
2283 0 : args.push_back(argv[i]);
2284 : }
2285 :
2286 0 : return FeMain(args);
2287 0 : }
2288 :
2289 0 : TMFeResult TMFrontend::FeInit(const std::vector<std::string> &args)
2290 : {
2291 0 : setbuf(stdout, NULL);
2292 0 : setbuf(stderr, NULL);
2293 :
2294 0 : signal(SIGPIPE, SIG_IGN);
2295 :
2296 0 : std::vector<std::string> eq_args;
2297 :
2298 0 : bool help = false;
2299 0 : std::string exptname;
2300 0 : std::string hostname;
2301 0 : bool daemon0 = false;
2302 0 : bool daemon1 = false;
2303 :
2304 0 : for (unsigned int i=1; i<args.size(); i++) { // loop over the commandline options
2305 : //printf("argv[%d] is %s\n", i, args[i].c_str());
2306 0 : if (args[i] == "--") {
2307 : // remaining arguments are passed to equipment Init()
2308 0 : for (unsigned j=i+1; j<args.size(); j++)
2309 0 : eq_args.push_back(args[j]);
2310 0 : break;
2311 0 : } else if (args[i] == "-v") {
2312 0 : TMFE::gfVerbose = true;
2313 0 : } else if (args[i] == "-D") {
2314 0 : daemon0 = true;
2315 0 : } else if (args[i] == "-O") {
2316 0 : daemon1 = true;
2317 0 : } else if (args[i] == "-h") {
2318 0 : i++;
2319 0 : if (i >= args.size()) { help = true; break; }
2320 0 : hostname = args[i];
2321 0 : } else if (args[i] == "-e") {
2322 0 : i++;
2323 0 : if (i >= args.size()) { help = true; break; }
2324 0 : exptname = args[i];
2325 0 : } else if (args[i] == "-i") {
2326 0 : i++;
2327 0 : if (i >= args.size()) { help = true; break; }
2328 0 : fFeIndex = atoi(args[i].c_str());
2329 0 : } else if (args[i] == "--help") {
2330 0 : help = true;
2331 0 : break;
2332 0 : } else if (args[i][0] == '-') {
2333 0 : help = true;
2334 0 : break;
2335 : } else {
2336 0 : help = true;
2337 0 : break;
2338 : }
2339 : }
2340 :
2341 : //
2342 : // daemonize...
2343 : //
2344 :
2345 0 : if (daemon0) {
2346 0 : printf("Becoming a daemon...\n");
2347 0 : ss_daemon_init(FALSE);
2348 0 : } else if (daemon1) {
2349 0 : printf("Becoming a daemon...\n");
2350 0 : ss_daemon_init(TRUE);
2351 : }
2352 :
2353 : //
2354 : // apply frontend index to indexed frontend
2355 : //
2356 :
2357 0 : if (fMfe->fProgramName.find("%") != std::string::npos) {
2358 0 : fMfe->fProgramName = msprintf(fMfe->fProgramName.c_str(), fFeIndex);
2359 : }
2360 :
2361 0 : TMFeResult r;
2362 :
2363 : // call arguments handler before calling the usage handlers. Otherwise,
2364 : // if the arguments handler creates new equipments,
2365 : // we will never see their Usage(). K.O.
2366 0 : r = HandleArguments(eq_args);
2367 :
2368 0 : if (r.error_flag) {
2369 0 : fprintf(stderr, "Fatal error: arguments handler error: %s, bye.\n", r.error_message.c_str());
2370 0 : fMfe->Disconnect();
2371 0 : exit(1);
2372 : }
2373 :
2374 0 : if (help) {
2375 0 : FeUsage(args[0].c_str());
2376 0 : HandleUsage();
2377 0 : fMfe->Disconnect();
2378 0 : exit(1);
2379 : }
2380 :
2381 0 : r = fMfe->Connect(NULL, hostname.c_str(), exptname.c_str());
2382 :
2383 0 : if (r.error_flag) {
2384 0 : fprintf(stderr, "Fatal error: cannot connect to MIDAS, error: %s, bye.\n", r.error_message.c_str());
2385 0 : fMfe->Disconnect();
2386 0 : exit(1);
2387 : }
2388 :
2389 0 : r = HandleFrontendInit(eq_args);
2390 :
2391 0 : if (r.error_flag) {
2392 0 : fprintf(stderr, "Fatal error: frontend init error: %s, bye.\n", r.error_message.c_str());
2393 0 : fMfe->Disconnect();
2394 0 : exit(1);
2395 : }
2396 :
2397 0 : fFeRpcHelper = new TMFrontendRpcHelper(this);
2398 0 : fMfe->AddRpcHandler(fFeRpcHelper);
2399 :
2400 : //mfe->SetWatchdogSec(0);
2401 : //mfe->SetTransitionSequenceStart(910);
2402 : //mfe->SetTransitionSequenceStop(90);
2403 : //mfe->DeregisterTransitionPause();
2404 : //mfe->DeregisterTransitionResume();
2405 : //mfe->RegisterTransitionStartAbort();
2406 :
2407 0 : r = FeInitEquipments(eq_args);
2408 :
2409 0 : if (r.error_flag) {
2410 0 : fprintf(stderr, "Cannot initialize equipments, error message: %s, bye.\n", r.error_message.c_str());
2411 0 : fMfe->Disconnect();
2412 0 : exit(1);
2413 : }
2414 :
2415 0 : r = HandleFrontendReady(eq_args);
2416 :
2417 0 : if (r.error_flag) {
2418 0 : fprintf(stderr, "Fatal error: frontend post-init error: %s, bye.\n", r.error_message.c_str());
2419 0 : fMfe->Disconnect();
2420 0 : exit(1);
2421 : }
2422 :
2423 0 : if (fMfe->fStateRunning) {
2424 0 : if (fFeIfRunningCallExit) {
2425 0 : fprintf(stderr, "Fatal error: Cannot start frontend, run is in progress!\n");
2426 0 : fMfe->Disconnect();
2427 0 : exit(1);
2428 0 : } else if (fFeIfRunningCallBeginRun) {
2429 : char errstr[TRANSITION_ERROR_STRING_LENGTH];
2430 0 : tr_start(fMfe->fRunNumber, errstr);
2431 : }
2432 : }
2433 :
2434 0 : return TMFeOk();
2435 0 : }
2436 :
2437 0 : void TMFrontend::FeMainLoop()
2438 : {
2439 0 : while (!fMfe->fShutdownRequested) {
2440 0 : FePollMidas(0.100);
2441 : }
2442 0 : }
2443 :
2444 0 : void TMFrontend::FeShutdown()
2445 : {
2446 0 : fMfe->StopRpcThread();
2447 0 : FeStopPeriodicThread();
2448 0 : FeStopEquipmentPollThreads();
2449 0 : HandleFrontendExit();
2450 0 : FeDeleteEquipments();
2451 0 : fMfe->Disconnect();
2452 0 : }
2453 :
2454 0 : int TMFrontend::FeMain(const std::vector<std::string> &args)
2455 : {
2456 0 : TMFeResult r = FeInit(args);
2457 :
2458 0 : if (r.error_flag) {
2459 0 : fprintf(stderr, "Fatal error: frontend init error: %s, bye.\n", r.error_message.c_str());
2460 0 : fMfe->Disconnect();
2461 0 : exit(1);
2462 : }
2463 :
2464 0 : FeMainLoop();
2465 0 : FeShutdown();
2466 :
2467 0 : return 0;
2468 0 : }
2469 :
2470 : // singleton instance
2471 : TMFE* TMFE::gfMFE = NULL;
2472 :
2473 : // static data members
2474 : bool TMFE::gfVerbose = false;
2475 :
2476 : /* emacs
2477 : * Local Variables:
2478 : * tab-width: 8
2479 : * c-basic-offset: 3
2480 : * indent-tabs-mode: nil
2481 : * End:
2482 : */
|