MIDAS
Loading...
Searching...
No Matches
tmfe.cxx
Go to the documentation of this file.
1/********************************************************************\
2
3 Name: tmfe.cxx
4 Created by: Konstantin Olchanski - TRIUMF
5
6 Contents: C++ MIDAS frontend
7
8\********************************************************************/
9
10#undef NDEBUG // midas requires assert() to be always enabled
11
12#include <stdio.h>
13#include <stdarg.h>
14#include <assert.h>
15#include <signal.h> // signal()
16#include <sys/time.h> // gettimeofday()
17
18#include "tmfe.h"
19
20#include "midas.h"
21#include "msystem.h"
22#include "mrpc.h"
23#include "mstrlcpy.h"
24
26// error handling
28
30{
31 return TMFeResult(0, message);
32}
33
34TMFeResult TMFeMidasError(const std::string& message, const char* midas_function_name, int midas_status)
35{
36 return TMFeResult(midas_status, message + msprintf(", %s() status %d", midas_function_name, midas_status));
37}
38
40// TMFE singleton class
42
43
44TMFE::TMFE() // ctor
45{
46 if (gfVerbose)
47 printf("TMFE::ctor!\n");
48}
49
50TMFE::~TMFE() // dtor
51{
52 if (gfVerbose)
53 printf("TMFE::dtor!\n");
54 assert(!"TMFE::~TMFE(): destruction of the TMFE singleton is not permitted!");
55}
56
58{
59 if (!gfMFE)
60 gfMFE = new TMFE();
61
62 return gfMFE;
63}
64
65TMFeResult TMFE::Connect(const char* progname, const char* hostname, const char* exptname)
66{
67 if (progname)
68 fProgramName = progname;
69
70 if (fProgramName.empty()) {
71 return TMFeErrorMessage("TMFE::Connect: frontend program name is not set");
72 }
73
75
76 int status;
77
78 std::string env_hostname;
79 std::string env_exptname;
80
81 /* get default from environment */
82 status = cm_get_environment(&env_hostname, &env_exptname);
83
84 if (status != CM_SUCCESS) {
85 return TMFeMidasError("Cannot connect to MIDAS", "cm_get_environment", status);
86 }
87
88 if (hostname && hostname[0]) {
89 fMserverHostname = hostname;
90 } else {
91 fMserverHostname = env_hostname;
92 }
93
94 if (exptname && exptname[0]) {
95 fExptname = exptname;
96 } else {
97 fExptname = env_exptname;
98 }
99
100 if (gfVerbose) {
101 printf("TMFE::Connect: Program \"%s\" connecting to experiment \"%s\" on host \"%s\"\n", fProgramName.c_str(), fExptname.c_str(), fMserverHostname.c_str());
102 }
103
104 int watchdog = DEFAULT_WATCHDOG_TIMEOUT;
105 //int watchdog = 60*1000;
106
107 status = cm_connect_experiment1(fMserverHostname.c_str(), fExptname.c_str(), fProgramName.c_str(), NULL, DEFAULT_ODB_SIZE, watchdog);
108
109 if (status == CM_UNDEF_EXP) {
110 return TMFeMidasError(msprintf("Cannot connect to MIDAS, experiment \"%s\" is not defined", fExptname.c_str()), "cm_connect_experiment1", status);
111 } else if (status != CM_SUCCESS) {
112 return TMFeMidasError("Cannot connect to MIDAS", "cm_connect_experiment1", status);
113 }
114
116 if (status != CM_SUCCESS) {
117 return TMFeMidasError("Cannot connect to MIDAS", "cm_get_experiment_database", status);
118 }
119
120 fOdbRoot = MakeMidasOdb(fDB);
121
122 int run_state = 0;
123
124 fOdbRoot->RI("Runinfo/state", &run_state);
125 fOdbRoot->RI("Runinfo/run number", &fRunNumber);
126
127 if (run_state == STATE_RUNNING) {
128 fStateRunning = true;
129 } else if (run_state == STATE_PAUSED) {
130 fStateRunning = true;
131 } else {
132 fStateRunning = false;
133 }
134
135 RegisterRPCs();
136
137 if (gfVerbose) {
138 printf("TMFE::Connect: Program \"%s\" connected to experiment \"%s\" on host \"%s\"\n", fProgramName.c_str(), fExptname.c_str(), fMserverHostname.c_str());
139 }
140
141 return TMFeOk();
142}
143
145{
146 if (sec == 0) {
147 cm_set_watchdog_params(false, 0);
148 } else {
149 cm_set_watchdog_params(true, sec*1000);
150 }
151 return TMFeOk();
152}
153
155{
156 if (gfVerbose)
157 printf("TMFE::Disconnect: Disconnecting from experiment \"%s\" on host \"%s\"\n", fExptname.c_str(), fMserverHostname.c_str());
160 if (gfVerbose)
161 printf("TMFE::Disconnect: Disconnected from experiment \"%s\" on host \"%s\"\n", fExptname.c_str(), fMserverHostname.c_str());
162 return TMFeOk();
163}
164
166// event buffer functions
168
170{
171 assert(mfe != NULL);
172 fMfe = mfe;
173};
174
176{
177 CloseBuffer();
178
179 // poison all pointers
180 fMfe = NULL;
181};
182
183TMFeResult TMEventBuffer::OpenBuffer(const char* bufname, size_t bufsize)
184{
185 if (fBufHandle) {
186 return TMFeErrorMessage(msprintf("Event buffer \"%s\" is already open", fBufName.c_str()));
187 }
188
189 fBufName = bufname;
190
191 if (bufsize == 0)
192 bufsize = DEFAULT_BUFFER_SIZE;
193
194 int status = bm_open_buffer(fBufName.c_str(), bufsize, &fBufHandle);
195
196 if (status != BM_SUCCESS && status != BM_CREATED) {
197 return TMFeMidasError(msprintf("Cannot open event buffer \"%s\"", fBufName.c_str()), "bm_open_buffer", status);
198 }
199
200 fBufSize = 0;
202
203 uint32_t buf_size = 0;
204 uint32_t max_event_size = 0;
205
206 fMfe->fOdbRoot->RU32("Experiment/MAX_EVENT_SIZE", &max_event_size);
207 fMfe->fOdbRoot->RU32((std::string("Experiment/Buffer Sizes/") + bufname).c_str(), &buf_size);
208
209 if (buf_size > 0) {
210 // limit event size to half the buffer size, so we can buffer two events
211 uint32_t xmax_event_size = buf_size / 2;
212 // add extra margin
213 if (xmax_event_size > 1024)
214 xmax_event_size -= 1024;
215 if (max_event_size > xmax_event_size)
216 max_event_size = xmax_event_size;
217 }
218
219 fBufSize = buf_size;
221
222 if (fBufSize == 0) {
223 return TMFeErrorMessage(msprintf("Cannot get buffer size for event buffer \"%s\"", fBufName.c_str()));
224 }
225
226 if (fBufMaxEventSize == 0) {
227 return TMFeErrorMessage(msprintf("Cannot get MAX_EVENT_SIZE for event buffer \"%s\"", fBufName.c_str()));
228 }
229
230 printf("TMEventBuffer::OpenBuffer: Buffer \"%s\" size %d, max event size %d\n", fBufName.c_str(), (int)fBufSize, (int)fBufMaxEventSize);
231
232 return TMFeOk();
233}
234
236{
237 if (!fBufHandle)
238 return TMFeOk();
239
240 fBufRequests.clear(); // no need to cancel individual requests, they are gone after we close the buffer
241
243
244 if (status != BM_SUCCESS) {
245 fBufHandle = 0;
246 return TMFeMidasError(msprintf("Cannot close event buffer \"%s\"", fBufName.c_str()), "bm_close_buffer", status);
247 }
248
249 fBufHandle = 0;
250 fBufSize = 0;
254
255 return TMFeOk();
256}
257
258TMFeResult TMEventBuffer::SetCacheSize(size_t read_cache_size, size_t write_cache_size)
259{
260 int status = bm_set_cache_size(fBufHandle, read_cache_size, write_cache_size);
261
262 if (status != BM_SUCCESS) {
263 return TMFeMidasError(msprintf("Cannot set event buffer \"%s\" cache sizes: read %d, write %d", fBufName.c_str(), (int)read_cache_size, (int)write_cache_size), "bm_set_cache_size", status);
264 }
265
266 fBufReadCacheSize = read_cache_size;
267 fBufWriteCacheSize = write_cache_size;
268
269 return TMFeOk();
270}
271
272TMFeResult TMEventBuffer::AddRequest(int event_id, int trigger_mask, const char* sampling_type_string)
273{
274 if (!fBufHandle) {
275 return TMFeErrorMessage(msprintf("AddRequest: Error: Event buffer \"%s\" is not open", fBufName.c_str()));
276 }
277
278 int sampling_type = 0;
279
280 if (strcmp(sampling_type_string, "GET_ALL")==0) {
281 sampling_type = GET_ALL;
282 } else if (strcmp(sampling_type_string, "GET_NONBLOCKING")==0) {
283 sampling_type = GET_NONBLOCKING;
284 } else if (strcmp(sampling_type_string, "GET_RECENT")==0) {
285 sampling_type = GET_RECENT;
286 } else {
287 sampling_type = GET_ALL;
288 }
289
290 int request_id = 0;
291
292 int status = bm_request_event(fBufHandle, event_id, trigger_mask, sampling_type, &request_id, NULL);
293
294 if (status != BM_SUCCESS) {
295 return TMFeMidasError(msprintf("Cannot make event request on buffer \"%s\"", fBufName.c_str()), "bm_request_event", status);
296 }
297
298 fBufRequests.push_back(request_id);
299
300 return TMFeOk();
301}
302
303TMFeResult TMEventBuffer::ReceiveEvent(std::vector<char> *e, int timeout_msec)
304{
305 if (!fBufHandle) {
306 return TMFeErrorMessage(msprintf("ReceiveEvent: Error: Event buffer \"%s\" is not open", fBufName.c_str()));
307 }
308
309 assert(e != NULL);
310
311 e->resize(0);
312
313 int status = bm_receive_event_vec(fBufHandle, e, timeout_msec);
314
315 if (status == BM_ASYNC_RETURN) {
316 return TMFeOk();
317 }
318
319 if (status != BM_SUCCESS) {
320 return TMFeMidasError(msprintf("Cannot receive event on buffer \"%s\"", fBufName.c_str()), "bm_receive_event", status);
321 }
322
323 return TMFeOk();
324}
325
327{
328 const EVENT_HEADER *pevent = (const EVENT_HEADER*)e;
329 const size_t event_size = sizeof(EVENT_HEADER) + pevent->data_size;
330 //const size_t total_size = ALIGN8(event_size);
331 return SendEvent(1, &e, &event_size);
332}
333
334TMFeResult TMEventBuffer::SendEvent(const std::vector<char>& e)
335{
336 const EVENT_HEADER *pevent = (const EVENT_HEADER*)e.data();
337 const size_t event_size = sizeof(EVENT_HEADER) + pevent->data_size;
338 //const size_t total_size = ALIGN8(event_size);
339 if (e.size() != event_size) {
340 return TMFeErrorMessage(msprintf("Cannot send event, size mismatch: vector size %d, data_size %d, event_size %d", (int)e.size(), (int)pevent->data_size, (int)event_size).c_str());
341 }
342
343 return SendEvent(1, (char**)&pevent, &event_size);
344}
345
346TMFeResult TMEventBuffer::SendEvent(const std::vector<std::vector<char>>& e)
347{
348 int sg_n = e.size();
349 const char* sg_ptr[sg_n];
350 size_t sg_len[sg_n];
351 for (int i=0; i<sg_n; i++) {
352 sg_ptr[i] = e[i].data();
353 sg_len[i] = e[i].size();
354 }
355 return SendEvent(sg_n, sg_ptr, sg_len);
356}
357
358TMFeResult TMEventBuffer::SendEvent(int sg_n, const char* const sg_ptr[], const size_t sg_len[])
359{
360 int status = bm_send_event_sg(fBufHandle, sg_n, sg_ptr, sg_len, BM_WAIT);
361
362 if (status == BM_CORRUPTED) {
363 fMfe->Msg(MERROR, "TMEventBuffer::SendEvent", "Cannot send event to buffer \"%s\": bm_send_event() returned %d, event buffer is corrupted, shutting down the frontend", fBufName.c_str(), status);
364 fMfe->fShutdownRequested = true;
365 return TMFeMidasError("Cannot send event, event buffer is corrupted, shutting down the frontend", "bm_send_event", status);
366 } else if (status != BM_SUCCESS) {
367 fMfe->Msg(MERROR, "TMEventBuffer::SendEvent", "Cannot send event to buffer \"%s\": bm_send_event() returned %d", fBufName.c_str(), status);
368 return TMFeMidasError("Cannot send event", "bm_send_event", status);
369 }
370
371 return TMFeOk();
372}
373
375{
376 if (!fBufHandle)
377 return TMFeOk();
378
379 int flag = BM_NO_WAIT;
380 if (wait)
381 flag = BM_WAIT;
382
383 /* flush of event socket in no-wait mode does nothing */
384 if (wait && rpc_is_remote()) {
385 int status = bm_flush_cache(0, flag);
386
387 //printf("bm_flush_cache(0,%d) status %d\n", flag, status);
388
389 if (status == BM_SUCCESS) {
390 // nothing
391 } else if (status == BM_ASYNC_RETURN) {
392 // nothing
393 } else {
394 return TMFeMidasError("Cannot flush mserver event socket", "bm_flush_cache", status);
395 }
396 }
397
398 int status = bm_flush_cache(fBufHandle, flag);
399
400 //printf("bm_flush_cache(%d,%d) status %d\n", fBufHandle, flag, status);
401
402 if (status == BM_SUCCESS) {
403 // nothing
404 } else if (status == BM_ASYNC_RETURN) {
405 // nothing
406 } else {
407 return TMFeMidasError(msprintf("Cannot flush event buffer \"%s\"", fBufName.c_str()).c_str(), "bm_flush_cache", status);
408 }
409
410 return TMFeOk();
411}
412
413TMFeResult TMFE::EventBufferOpen(TMEventBuffer** pbuf, const char* bufname, size_t bufsize)
414{
415 assert(pbuf != NULL);
416 assert(bufname != NULL);
417
418 std::lock_guard<std::mutex> guard(fEventBuffersMutex);
419
420 for (auto b : fEventBuffers) {
421 if (!b)
422 continue;
423
424 if (b->fBufName == bufname) {
425 *pbuf = b;
426 if (bufsize != 0 && bufsize > b->fBufSize) {
427 Msg(MERROR, "TMFE::EventBufferOpen", "Event buffer \"%s\" size %d is smaller than requested size %d", b->fBufName.c_str(), (int)b->fBufSize, (int)bufsize);
428 }
429 return TMFeOk();
430 }
431 }
432
433 TMEventBuffer *b = new TMEventBuffer(this);
434
435 fEventBuffers.push_back(b);
436
437 *pbuf = b;
438
439 TMFeResult r = b->OpenBuffer(bufname, bufsize);
440
441 if (r.error_flag) {
442 return r;
443 }
444
445 return TMFeOk();
446}
447
449{
450 int flag = BM_NO_WAIT;
451 if (wait)
452 flag = BM_WAIT;
453
454 /* flush of event socket in no-wait mode does nothing */
455 if (wait && rpc_is_remote()) {
456 int status = bm_flush_cache(0, flag);
457
458 //printf("bm_flush_cache(0,%d) status %d\n", flag, status);
459
460 if (status == BM_SUCCESS) {
461 // nothing
462 } else if (status == BM_ASYNC_RETURN) {
463 // nothing
464 } else {
465 return TMFeMidasError("Cannot flush mserver event socket", "bm_flush_cache", status);
466 }
467 }
468
469 std::lock_guard<std::mutex> guard(fEventBuffersMutex);
470
471 for (auto b : fEventBuffers) {
472 if (!b)
473 continue;
474
475 TMFeResult r = b->FlushCache(wait);
476
477 if (r.error_flag)
478 return r;
479 }
480
481 return TMFeOk();
482}
483
485{
486 std::lock_guard<std::mutex> guard(fEventBuffersMutex);
487
488 for (auto b : fEventBuffers) {
489 if (!b)
490 continue;
491 TMFeResult r = b->CloseBuffer();
492 if (r.error_flag)
493 return r;
494
495 delete b;
496 }
497
498 fEventBuffers.clear();
499
500 return TMFeOk();
501}
502
504// equipment functions
506
508{
509 double now = TMFE::GetTime();
510
511 double next_periodic = now + 60;
512
513 int n = fFeEquipments.size();
514 for (int i=0; i<n; i++) {
516 if (!eq)
517 continue;
518 if (!eq->fEqConfEnabled)
519 continue;
520 if (!eq->fEqConfEnablePeriodic)
521 continue;
522 double period = eq->fEqConfPeriodMilliSec/1000.0;
523 if (period <= 0)
524 continue;
525 if (eq->fEqPeriodicNextCallTime == 0)
526 eq->fEqPeriodicNextCallTime = now + 0.5; // we are off by 0.5 sec with updating of statistics
527 //printf("periodic[%d] period %f, last call %f, next call %f (%f)\n", i, period, eq->fEqPeriodicLastCallTime, eq->fEqPeriodicNextCallTime, now - eq->fEqPeriodicNextCallTime);
528 if (now >= eq->fEqPeriodicNextCallTime) {
529 eq->fEqPeriodicNextCallTime += period;
530
531 if (eq->fEqPeriodicNextCallTime < now) {
532 if (TMFE::gfVerbose)
533 printf("TMFE::EquipmentPeriodicTasks: periodic equipment \"%s\" skipped some beats!\n", eq->fEqName.c_str());
534 fMfe->Msg(MERROR, "TMFE::EquipmentPeriodicTasks", "Equipment \"%s\" skipped some beats!", eq->fEqName.c_str());
535 while (eq->fEqPeriodicNextCallTime < now) {
536 eq->fEqPeriodicNextCallTime += period;
537 }
538 }
539
540 if (fMfe->fStateRunning || !eq->fEqConfReadOnlyWhenRunning) {
541 eq->fEqPeriodicLastCallTime = now;
542 //printf("handler %d eq [%s] call HandlePeriodic()\n", i, h->fEq->fName.c_str());
543 eq->HandlePeriodic();
544 }
545
546 now = TMFE::GetTime();
547 }
548
549 if (eq->fEqPeriodicNextCallTime < next_periodic)
550 next_periodic = eq->fEqPeriodicNextCallTime;
551 }
552
553 now = TMFE::GetTime();
554
555 // update statistics
556 for (auto eq : fFeEquipments) {
557 if (!eq)
558 continue;
559 if (!eq->fEqConfEnabled)
560 continue;
561 double next = eq->fEqStatNextWrite; // NOTE: this is not thread-safe, possible torn read of "double"
562 if (now > next) {
563 eq->EqWriteStatistics();
564 next = eq->fEqStatNextWrite; // NOTE: this is not thread-safe, possible torn read of "double"
565 }
566 if (next < next_periodic)
567 next_periodic = next;
568 }
569
570 now = TMFE::GetTime();
571
572 // flush write cache
576 if (fFeFlushWriteCacheNextCallTime < next_periodic)
577 next_periodic = fFeFlushWriteCacheNextCallTime;
578 }
579
580 return next_periodic;
581}
582
583double TMFrontend::FePollTasks(double next_periodic_time)
584{
585 //printf("poll %f next %f diff %f\n", TMFE::GetTime(), next_periodic_time, next_periodic_time - TMFE::GetTime());
586
587 double poll_sleep_sec = 9999.0;
588 while (!fMfe->fShutdownRequested) {
589 bool poll_again = false;
590 // NOTE: ok to use range-based for() loop, there will be a crash if HandlePoll() or HandlePollRead() modify fEquipments, so they should not do that. K.O.
591 for (auto eq : fFeEquipments) {
592 if (!eq)
593 continue;
594 if (!eq->fEqConfEnabled)
595 continue;
596 if (eq->fEqConfEnablePoll && !eq->fEqPollThreadRunning && !eq->fEqPollThreadStarting) {
597 if (fMfe->fStateRunning || !eq->fEqConfReadOnlyWhenRunning) {
598 if (eq->fEqConfPollSleepSec < poll_sleep_sec)
599 poll_sleep_sec = eq->fEqConfPollSleepSec;
600 bool poll = eq->HandlePoll();
601 if (poll) {
602 poll_again = true;
603 eq->HandlePollRead();
604 }
605 }
606 }
607 }
608 if (!poll_again)
609 break;
610
611 if (next_periodic_time) {
612 // stop polling if we need to run periodic activity
613 double now = TMFE::TMFE::GetTime();
614 if (now >= next_periodic_time)
615 break;
616 }
617 }
618 return poll_sleep_sec;
619}
620
622{
623 if (TMFE::gfVerbose)
624 printf("TMFeEquipment::EqPollThread: equipment \"%s\" poll thread started\n", fEqName.c_str());
625
627
630 bool poll = HandlePoll();
631 if (poll) {
633 } else {
634 if (fEqConfPollSleepSec > 0) {
636 }
637 }
638 } else {
639 TMFE::Sleep(0.1);
640 }
641 }
642 if (TMFE::gfVerbose)
643 printf("TMFeEquipment::EqPollThread: equipment \"%s\" poll thread stopped\n", fEqName.c_str());
644
645 fEqPollThreadRunning = false;
646}
647
649{
650 // NOTE: this is thread safe
651
652 std::lock_guard<std::mutex> guard(fEqMutex);
653
655 fMfe->Msg(MERROR, "TMFeEquipment::EqStartPollThread", "Equipment \"%s\": poll thread is already running", fEqName.c_str());
656 return;
657 }
658
661
662 fEqPollThread = new std::thread(&TMFeEquipment::EqPollThread, this);
663}
664
666{
667 // NOTE: this is thread safe
668 fEqPollThreadStarting = false;
670 for (int i=0; i<100; i++) {
672 std::lock_guard<std::mutex> guard(fEqMutex);
673 if (fEqPollThread) {
674 fEqPollThread->join();
675 delete fEqPollThread;
676 fEqPollThread = NULL;
677 }
678 return;
679 }
680 TMFE::Sleep(0.1);
681 }
683 fMfe->Msg(MERROR, "TMFeEquipment::EqStopPollThread", "Equipment \"%s\": timeout waiting for shutdown of poll thread", fEqName.c_str());
684 }
685}
686
688{
690
691 int status = cm_transition(TR_STOP, 0, str, sizeof(str), TR_SYNC, FALSE);
692 if (status != CM_SUCCESS) {
693 Msg(MERROR, "TMFE::StopRun", "Cannot stop run, error: %s", str);
694 fRunStopRequested = false;
695 return;
696 }
697
698 fRunStopRequested = false;
699
700 bool logger_auto_restart = false;
701 fOdbRoot->RB("Logger/Auto restart", &logger_auto_restart);
702
703 int logger_auto_restart_delay = 0;
704 fOdbRoot->RI("Logger/Auto restart delay", &logger_auto_restart_delay);
705
706 if (logger_auto_restart) {
707 Msg(MINFO, "TMFE::StopRun", "Run will restart after %d seconds", logger_auto_restart_delay);
708 fRunStartTime = GetTime() + logger_auto_restart_delay;
709 } else {
710 fRunStartTime = 0;
711 }
712}
713
715{
716 fRunStartTime = 0;
717
718 /* check if really stopped */
719 int run_state = 0;
720 fOdbRoot->RI("Runinfo/State", &run_state);
721
722 if (run_state != STATE_STOPPED) {
723 Msg(MERROR, "TMFE::StartRun", "Run start requested, but run is already in progress");
724 return;
725 }
726
727 bool logger_auto_restart = false;
728 fOdbRoot->RB("Logger/Auto restart", &logger_auto_restart);
729
730 if (!logger_auto_restart) {
731 Msg(MERROR, "TMFE::StartRun", "Run start requested, but logger/auto restart is off");
732 return;
733 }
734
735 Msg(MTALK, "TMFE::StartRun", "Starting new run");
736
738
739 int status = cm_transition(TR_START, 0, str, sizeof(str), TR_SYNC, FALSE);
740 if (status != CM_SUCCESS) {
741 Msg(MERROR, "TMFE::StartRun", "Cannot restart run, error: %s", str);
742 }
743}
744
745void TMFrontend::FePollMidas(double sleep_sec)
746{
747 assert(sleep_sec >= 0);
748 bool debug = false;
749 double now = TMFE::GetTime();
750 double sleep_start = now;
751 double sleep_end = now + sleep_sec;
752 int count_yield_loops = 0;
753
754 while (!fMfe->fShutdownRequested) {
755 double next_periodic_time = 0;
756 double poll_sleep = 1.0;
757
759 next_periodic_time = FePeriodicTasks();
760 poll_sleep = FePollTasks(next_periodic_time);
761 } else {
762 poll_sleep = FePollTasks(TMFE::GetTime() + 0.100);
763 }
764
765 if (fMfe->fRunStopRequested) {
766 fMfe->StopRun();
767 continue;
768 }
769
770 now = TMFE::GetTime();
771
772 if (fMfe->fRunStartTime && now >= fMfe->fRunStartTime) {
773 fMfe->StartRun();
774 continue;
775 }
776
777 double sleep_time = sleep_end - now;
778
779 if (next_periodic_time > 0 && next_periodic_time < sleep_end) {
780 sleep_time = next_periodic_time - now;
781 }
782
783 int s = 0;
784 if (sleep_time > 0)
785 s = 1 + sleep_time*1000.0;
786
787 if (poll_sleep*1000.0 < s) {
788 s = 0;
789 }
790
791 if (debug) {
792 printf("now %.6f, sleep_end %.6f, next_periodic %.6f, sleep_time %.6f, cm_yield(%d), poll period %.6f\n", now, sleep_end, next_periodic_time, sleep_time, s, poll_sleep);
793 }
794
795 int status = cm_yield(s);
796
797 if (status == RPC_SHUTDOWN || status == SS_ABORT) {
798 fMfe->fShutdownRequested = true;
799 if (TMFE::gfVerbose) {
800 fprintf(stderr, "TMFE::PollMidas: cm_yield(%d) status %d, shutdown requested...\n", s, status);
801 }
802 }
803
804 now = TMFE::GetTime();
805 double sleep_more = sleep_end - now;
806 if (sleep_more <= 0)
807 break;
808
809 count_yield_loops++;
810
811 if (poll_sleep < sleep_more) {
812 TMFE::Sleep(poll_sleep);
813 }
814 }
815
816 if (debug) {
817 printf("TMFE::PollMidas: sleep %.1f msec, actual %.1f msec, %d loops\n", sleep_sec * 1000.0, (now - sleep_start) * 1000.0, count_yield_loops);
818 }
819}
820
821void TMFE::Yield(double sleep_sec)
822{
823 double now = GetTime();
824 //double sleep_start = now;
825 double sleep_end = now + sleep_sec;
826
827 while (!fShutdownRequested) {
828 now = GetTime();
829
830 double sleep_time = sleep_end - now;
831 int s = 0;
832 if (sleep_time > 0)
833 s = 1 + sleep_time*1000.0;
834
835 //printf("TMFE::Yield: now %f, sleep_end %f, s %d\n", now, sleep_end, s);
836
837 int status = cm_yield(s);
838
839 if (status == RPC_SHUTDOWN || status == SS_ABORT) {
840 fShutdownRequested = true;
841 fprintf(stderr, "TMFE::Yield: cm_yield(%d) status %d, shutdown requested...\n", s, status);
842 }
843
844 now = GetTime();
845 if (now >= sleep_end)
846 break;
847 }
848
849 //printf("TMFE::Yield: sleep_sec %.6f, actual %.6f sec\n", sleep_sec, now - sleep_start);
850}
851
856
858{
859 if (TMFE::gfVerbose)
860 printf("TMFE::RpcThread: RPC thread started\n");
861
862 int msec = 1000;
863
864 fRpcThreadRunning = true;
866
868
869 int status = cm_yield(msec);
870
871 if (status == RPC_SHUTDOWN || status == SS_ABORT) {
872 fShutdownRequested = true;
873 if (TMFE::gfVerbose)
874 printf("TMFE::RpcThread: cm_yield(%d) status %d, shutdown requested...\n", msec, status);
875 }
876 }
878 if (TMFE::gfVerbose)
879 printf("TMFE::RpcThread: RPC thread stopped\n");
880 fRpcThreadRunning = false;
881}
882
884{
885 if (TMFE::gfVerbose)
886 printf("TMFE::PeriodicThread: periodic thread started\n");
887
890 double next_periodic_time = FePeriodicTasks();
891 double now = TMFE::GetTime();
892 double sleep = next_periodic_time - now;
893 //printf("TMFrontend::FePeriodicThread: now %.6f next %.6f, sleep %.6f\n", now, next_periodic_time, sleep);
894 if (sleep >= 1.0)
895 sleep = 1.0;
897 }
898 if (TMFE::gfVerbose)
899 printf("TMFE::PeriodicThread: periodic thread stopped\n");
901}
902
904{
905 // NOTE: this is thread safe
906
907 std::lock_guard<std::mutex> guard(fMutex);
908
910 if (gfVerbose)
911 printf("TMFE::StartRpcThread: RPC thread already running\n");
912 return;
913 }
914
915 fRpcThreadStarting = true;
916 fRpcThread = new std::thread(&TMFE::RpcThread, this);
917}
918
920{
921 // NOTE: this is thread safe
922
923 std::lock_guard<std::mutex> guard(fFeMutex);
924
926 if (TMFE::gfVerbose)
927 printf("TMFE::StartPeriodicThread: periodic thread already running\n");
928 return;
929 }
930
932 fFePeriodicThread = new std::thread(&TMFrontend::FePeriodicThread, this);
933}
934
936{
937 // NOTE: this is thread safe
938
939 fRpcThreadStarting = false;
941
942 for (int i=0; i<60; i++) {
943 if (!fRpcThreadRunning) {
944 std::lock_guard<std::mutex> guard(fMutex);
945 if (fRpcThread) {
946 fRpcThread->join();
947 delete fRpcThread;
948 fRpcThread = NULL;
949 if (gfVerbose)
950 printf("TMFE::StopRpcThread: RPC thread stopped\n");
951 }
952 return;
953 }
954 if (i>5) {
955 fprintf(stderr, "TMFE::StopRpcThread: waiting for RPC thread to stop\n");
956 }
957 ::sleep(1);
958 }
959
960 fprintf(stderr, "TMFE::StopRpcThread: timeout waiting for RPC thread to stop\n");
961}
962
964{
965 // NOTE: this is thread safe
966
969
970 for (int i=0; i<60; i++) {
972 std::lock_guard<std::mutex> guard(fFeMutex);
973 if (fFePeriodicThread) {
974 fFePeriodicThread->join();
975 delete fFePeriodicThread;
976 fFePeriodicThread = NULL;
977 if (TMFE::gfVerbose)
978 printf("TMFE::StopPeriodicThread: periodic thread stopped\n");
979 }
980 return;
981 }
982 if (i>5) {
983 fprintf(stderr, "TMFE::StopPeriodicThread: waiting for periodic thread to stop\n");
984 }
985 ::sleep(1);
986 }
987
988 fprintf(stderr, "TMFE::StopPeriodicThread: timeout waiting for periodic thread to stop\n");
989}
990
991void TMFE::Msg(int message_type, const char *filename, int line, const char *routine, const char *format, ...)
992{
993 char message[1024];
994 //printf("format [%s]\n", format);
995 va_list ap;
996 va_start(ap, format);
997 vsnprintf(message, sizeof(message)-1, format, ap);
998 va_end(ap);
999 //printf("message [%s]\n", message);
1000 cm_msg(message_type, filename, line, routine, "%s", message);
1002}
1003
1004void TMFE::Msg(int message_type, const char *filename, int line, const char *routine, const std::string& message)
1005{
1006 //printf("message [%s]\n", message.c_str());
1007 cm_msg(message_type, filename, line, routine, "%s", message.c_str());
1009}
1010
1012{
1013 struct timeval tv;
1014 gettimeofday(&tv, NULL);
1015 return tv.tv_sec*1.0 + tv.tv_usec/1000000.0;
1016}
1017
1018#if 1
1019void TMFE::Sleep(double time_sec)
1020{
1021 if (time_sec < 0) {
1022 TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "TMFE::Sleep() called with negative sleep time: %f", time_sec);
1023 return;
1024 }
1025
1026 if (time_sec == 0) {
1027 TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "TMFE::Sleep() called with zero sleep time");
1028 return;
1029 }
1030
1031 if (time_sec > 1.01) {
1032 // break long sleep into short sleeps
1033
1034 double t0 = TMFE::GetTime();
1035 double tend = t0 + time_sec;
1036
1037 while (1) {
1038 double now = TMFE::GetTime();
1039 if (now >= tend) {
1040 //printf("t0 %f, tend %f, now %f, done!\n", t0, tend, now);
1041 return;
1042 }
1043
1044 double tsleep = tend - now;
1045
1046 //printf("t0 %f, tend %f, now %f, tsleep %f!\n", t0, tend, now, tsleep);
1047
1048 if (tsleep > 1.0)
1049 tsleep = 1.0;
1050
1051 TMFE::Sleep(tsleep);
1052 }
1053
1054 return;
1055 }
1056
1057 int status;
1058 struct timeval timeout;
1059
1060 timeout.tv_sec = time_sec;
1061 timeout.tv_usec = (time_sec-timeout.tv_sec)*1000000.0;
1062
1063 while (1) {
1064 status = select(0, NULL, NULL, NULL, &timeout);
1065#ifdef EINVAL
1066 if (status < 0 && errno == EINVAL) {
1067 // #warning HERE EINVAL!
1068 TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "TMFE::Sleep() called with invalid sleep time: %f, tv_sec: %lld, tv_usec: %lld", time_sec, (long long int)timeout.tv_sec, (long long int)timeout.tv_usec);
1069 return;
1070 }
1071#endif
1072#ifdef EINTR
1073 if (status < 0 && errno == EINTR) {
1074 // #warning HERE EINTR!
1075 // NOTE1: on linux, "timeout" is modified by the kernel to subtract time already slept, we do not need to adjust it while handling EINTR.
1076 // NOTE2: on macos and other BSD-based systems, "timeout" value is not changed, and for accurate sleeping we should modify here, to account for time already slept, but we do not.
1077 // NOTE3: see "man select" on Linux and Macos.
1078 // NOTE4: MIDAS no longer uses SIGALRM to run cm_watchdog() and MIDAS applications do nto use signals.
1079 // NOTE4: so in theory, we do not have to worry about EINTR interrupting our sleep. K.O. Dec-2024
1080 // TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "TMFE::Sleep() EINTR, sleep time: %f, tv_sec: %lld, tv_usec: %lld", time_sec, (long long int)timeout.tv_sec, (long long int)timeout.tv_usec);
1081 continue;
1082 }
1083#endif
1084 break;
1085 }
1086
1087 if (status < 0) {
1088 TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "select() returned %d, errno %d (%s)", status, errno, strerror(errno));
1089 }
1090}
1091#endif
1092
1093#if 0
1094void TMFE::Sleep(double time)
1095{
1096 struct timespec rqtp;
1097 struct timespec rmtp;
1098
1099 rqtp.tv_sec = time;
1100 rqtp.tv_nsec = (time-rqtp.tv_sec)*1000000000.0;
1101
1102 int status = nanosleep(&rqtp, &rmtp);
1103
1104 //#ifdef EINTR
1105 //if (status < 0 && errno == EINTR) {
1106 // return 0; // watchdog interrupt, try again
1107 //}
1108 //#endif
1109
1110 if (status < 0) {
1111 TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "nanosleep() returned %d, errno %d (%s)", status, errno, strerror(errno));
1112 }
1113}
1114#endif
1115
1116#if 0
1117void TMFE::Sleep(double time)
1118{
1119 struct timespec rqtp;
1120 struct timespec rmtp;
1121
1122 rqtp.tv_sec = time;
1123 rqtp.tv_nsec = (time-rqtp.tv_sec)*1000000000.0;
1124
1125 //int status = clock_nanosleep(CLOCK_REALTIME, 0, &rqtp, &rmtp);
1126 int status = clock_nanosleep(CLOCK_MONOTONIC, 0, &rqtp, &rmtp);
1127
1128 //#ifdef EINTR
1129 //if (status < 0 && errno == EINTR) {
1130 // return 0; // watchdog interrupt, try again
1131 //}
1132 //#endif
1133
1134 if (status < 0) {
1135 TMFE::Instance()->Msg(MERROR, "TMFE::Sleep", "nanosleep() returned %d, errno %d (%s)", status, errno, strerror(errno));
1136 }
1137}
1138#endif
1139
1140std::string TMFE::GetThreadId()
1141{
1142 return ss_tid_to_string(ss_gettid());
1143}
1144
1145static INT rpc_callback(INT index, void *prpc_param[])
1146{
1147 const char* cmd = CSTRING(0);
1148 const char* args = CSTRING(1);
1149 char* return_buf = CSTRING(2);
1150 int return_max_length = CINT(3);
1151
1152 if (TMFE::gfVerbose)
1153 printf("TMFE::rpc_callback: index %d, max_length %d, cmd [%s], args [%s]\n", index, return_max_length, cmd, args);
1154
1155 TMFE* mfe = TMFE::Instance();
1156
1157 // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleRpc() modifies fEquipments. K.O.
1158 for (unsigned i=0; i<mfe->fRpcHandlers.size(); i++) {
1160 if (!h)
1161 continue;
1162 std::string result = "";
1163 TMFeResult r = h->HandleRpc(cmd, args, result);
1164 if (result.length() > 0) {
1165 //printf("Handler reply [%s]\n", C(r));
1166 mstrlcpy(return_buf, result.c_str(), return_max_length);
1167 return RPC_SUCCESS;
1168 }
1169 }
1170
1171 return_buf[0] = 0;
1172 return RPC_SUCCESS;
1173}
1174
1175static INT rpc_cxx_callback(INT index, void *prpc_param[])
1176{
1177 const char* cmd = CSTRING(0);
1178 const char* args = CSTRING(1);
1179 std::string* pstr = CPSTDSTRING(2);
1180
1181 pstr->clear();
1182
1183 if (TMFE::gfVerbose)
1184 printf("TMFE::rpc_cxx_callback: index %d, cmd [%s], args [%s]\n", index, cmd, args);
1185
1186 TMFE* mfe = TMFE::Instance();
1187
1188 // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleRpc() modifies fEquipments. K.O.
1189 for (unsigned i=0; i<mfe->fRpcHandlers.size(); i++) {
1191 if (!h)
1192 continue;
1193 TMFeResult r = h->HandleRpc(cmd, args, *pstr);
1194 if (pstr->length() > 0) {
1195 //printf("Handler reply [%s]\n", pstr->c_str());
1196 return RPC_SUCCESS;
1197 }
1198 }
1199
1200 return RPC_SUCCESS;
1201}
1202
1203static INT binary_rpc_callback(INT index, void *prpc_param[])
1204{
1205 const char* cmd = CSTRING(0);
1206 const char* args = CSTRING(1);
1207 char* return_buf = CSTRING(2);
1208 size_t return_max_length = CINT(3);
1209
1210 if (TMFE::gfVerbose)
1211 printf("TMFE::binary_rpc_callback: index %d, max_length %zu, cmd [%s], args [%s]\n", index, return_max_length, cmd, args);
1212
1213 TMFE* mfe = TMFE::Instance();
1214
1215 // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleRpc() modifies fEquipments. K.O.
1216 for (unsigned i=0; i<mfe->fRpcHandlers.size(); i++) {
1218 if (!h)
1219 continue;
1220 std::vector<char> result;
1221 TMFeResult r = h->HandleBinaryRpc(cmd, args, result);
1222 if (result.size() > 0) {
1223 if (result.size() > return_max_length) {
1224 TMFE::Instance()->Msg(MERROR, "TMFE::binary_rpc_callback", "RPC handler returned too much data, %zu bytes truncated to %zu bytes", result.size(), return_max_length);
1225 result.resize(return_max_length);
1226 }
1227 //printf("Handler reply [%s]\n", C(r));
1228 assert(result.size() <= return_max_length);
1229 memcpy(return_buf, result.data(), result.size());
1230 CINT(3) = result.size();
1231 return RPC_SUCCESS;
1232 }
1233 }
1234
1235 CINT(3) = 0;
1236 return_buf[0] = 0;
1237 return RPC_SUCCESS;
1238}
1239
1240static INT binary_rpc_cxx_callback(INT index, void *prpc_param[])
1241{
1242 const char* cmd = CSTRING(0);
1243 const char* args = CSTRING(1);
1244 std::vector<char>* pbuf = CPSTDVECTOR(2);
1245
1246 if (TMFE::gfVerbose)
1247 printf("TMFE::binary_rpc_callback: index %d, cmd [%s], args [%s]\n", index, cmd, args);
1248
1249 pbuf->clear();
1250
1251 TMFE* mfe = TMFE::Instance();
1252
1253 // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleRpc() modifies fEquipments. K.O.
1254 for (unsigned i=0; i<mfe->fRpcHandlers.size(); i++) {
1256 if (!h)
1257 continue;
1258 TMFeResult r = h->HandleBinaryRpc(cmd, args, *pbuf);
1259 if (pbuf->size() > 0) {
1260 return RPC_SUCCESS;
1261 }
1262 }
1263
1264 return RPC_SUCCESS;
1265}
1266
1268{
1269public:
1271
1272public:
1274 {
1275 if (TMFE::gfVerbose)
1276 printf("TMFrontendRpcHelper::ctor!\n");
1277
1278 fFe = fe;
1279 }
1280
1281 virtual ~TMFrontendRpcHelper() // dtor
1282 {
1283 if (TMFE::gfVerbose)
1284 printf("TMFrontendRpcHelper::dtor!\n");
1285
1286 // poison pointers
1287 fFe = NULL;
1288 }
1289
1291 {
1292 if (TMFE::gfVerbose)
1293 printf("TMFrontendRpcHelper::HandleBeginRun!\n");
1294
1295 for (unsigned i=0; i<fFe->fFeEquipments.size(); i++) {
1297 if (!eq)
1298 continue;
1299 if (!eq->fEqConfEnabled)
1300 continue;
1301 eq->EqZeroStatistics();
1302 eq->EqWriteStatistics();
1303 }
1304 return TMFeOk();
1305 }
1306
1308 {
1309 if (TMFE::gfVerbose)
1310 printf("TMFrontendRpcHelper::HandleEndRun!\n");
1311
1312 for (unsigned i=0; i<fFe->fFeEquipments.size(); i++) {
1314 if (!eq)
1315 continue;
1316 if (!eq->fEqConfEnabled)
1317 continue;
1318 eq->EqWriteStatistics();
1319 }
1320
1322
1323 if (r.error_flag)
1324 return r;
1325
1326 return TMFeOk();
1327 }
1328};
1329
1330static INT tr_start(INT run_number, char *errstr)
1331{
1332 if (TMFE::gfVerbose)
1333 printf("TMFE::tr_start!\n");
1334
1335 TMFE* mfe = TMFE::Instance();
1336
1337 mfe->fRunNumber = run_number;
1338 mfe->fStateRunning = true;
1339
1340 TMFeResult result;
1341
1342 // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleBeginRun() modifies fEquipments. K.O.
1343 for (unsigned i=0; i<mfe->fRpcHandlers.size(); i++) {
1345 if (!h)
1346 continue;
1347 result = h->HandleBeginRun(run_number);
1348 if (result.error_flag) {
1349 // error handling in this function matches general transition error handling:
1350 // on run start, the first user handler to return an error code
1351 // will abort the transition. This leaves everything in an
1352 // inconsistent state: frontends called before the abort
1353 // think the run is running, which it is not. They should register
1354 // a handler for the "start abort" transition. This transition calls
1355 // all already started frontends so they can cleanup their state. K.O.
1356 //
1357 break;
1358 }
1359 }
1360
1361 if (result.error_flag) {
1362 mstrlcpy(errstr, result.error_message.c_str(), TRANSITION_ERROR_STRING_LENGTH);
1363 return FE_ERR_DRIVER;
1364 }
1365
1366 return SUCCESS;
1367}
1368
1369static INT tr_stop(INT run_number, char *errstr)
1370{
1371 if (TMFE::gfVerbose)
1372 printf("TMFE::tr_stop!\n");
1373
1374 TMFeResult result;
1375
1376 TMFE* mfe = TMFE::Instance();
1377
1378 // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleEndRun() modifies fEquipments. K.O.
1379 // NOTE: we need to stop things in reverse order, otherwise TMFrontend code
1380 // does not work right - TMFrontend is registered first, and (correctly) runs
1381 // first at begin of run (to clear statistics, etc). But at the end of run
1382 // it needs to run last, to update the statistics, etc. after all the equipments
1383 // have done their end of run things and are finished. K.O.
1384 for (int i = (int)mfe->fRpcHandlers.size() - 1; i >= 0; i--) {
1386 if (!h)
1387 continue;
1388 TMFeResult xresult = h->HandleEndRun(run_number);
1389 if (xresult.error_flag) {
1390 // error handling in this function matches general transition error handling:
1391 // the "run stop" transition is always successful, the run always stops.
1392 // if some frontend returns an error, this error is remembered and is returned
1393 // as the overall transition status. K.O.
1394 result = xresult;
1395 }
1396 }
1397
1398 mfe->fStateRunning = false;
1399
1400 if (result.error_flag) {
1401 mstrlcpy(errstr, result.error_message.c_str(), TRANSITION_ERROR_STRING_LENGTH);
1402 return FE_ERR_DRIVER;
1403 }
1404
1405 return SUCCESS;
1406}
1407
1408static INT tr_pause(INT run_number, char *errstr)
1409{
1410 cm_msg(MINFO, "tr_pause", "tr_pause");
1411
1412 TMFeResult result;
1413
1414 TMFE* mfe = TMFE::Instance();
1415
1416 // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandlePauseRun() modifies fEquipments. K.O.
1417 // NOTE: tr_pause runs in reverse order to match tr_stop. K.O.
1418 for (int i = (int)mfe->fRpcHandlers.size() - 1; i >= 0; i--) {
1420 if (!h)
1421 continue;
1422 result = h->HandlePauseRun(run_number);
1423 if (result.error_flag) {
1424 // error handling in this function matches general transition error handling:
1425 // logic is same as "start run"
1426 break;
1427 }
1428 }
1429
1430 if (result.error_flag) {
1431 mstrlcpy(errstr, result.error_message.c_str(), TRANSITION_ERROR_STRING_LENGTH);
1432 return FE_ERR_DRIVER;
1433 }
1434
1435 return SUCCESS;
1436}
1437
1438static INT tr_resume(INT run_number, char *errstr)
1439{
1440 if (TMFE::gfVerbose)
1441 printf("TMFE::tr_resume!\n");
1442
1443 TMFeResult result;
1444
1445 TMFE* mfe = TMFE::Instance();
1446
1447 mfe->fRunNumber = run_number;
1448 mfe->fStateRunning = true;
1449
1450 // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleResumeRun() modifies fEquipments. K.O.
1451 for (unsigned i=0; i<mfe->fRpcHandlers.size(); i++) {
1453 if (!h)
1454 continue;
1455 result = h->HandleResumeRun(run_number);
1456 if (result.error_flag) {
1457 // error handling in this function matches general transition error handling:
1458 // logic is same as "start run"
1459 break;
1460 }
1461 }
1462
1463 if (result.error_flag) {
1464 mstrlcpy(errstr, result.error_message.c_str(), TRANSITION_ERROR_STRING_LENGTH);
1465 return FE_ERR_DRIVER;
1466 }
1467
1468 return SUCCESS;
1469}
1470
1471static INT tr_startabort(INT run_number, char *errstr)
1472{
1473 if (TMFE::gfVerbose)
1474 printf("TMFE::tr_startabort!\n");
1475
1476 TMFeResult result;
1477
1478 TMFE* mfe = TMFE::Instance();
1479
1480 // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleStartAbortRun() modifies fEquipments. K.O.
1481 for (unsigned i=0; i<mfe->fRpcHandlers.size(); i++) {
1483 if (!h)
1484 continue;
1485 result = h->HandleStartAbortRun(run_number);
1486 if (result.error_flag) {
1487 // error handling in this function matches general transition error handling:
1488 // logic is same as "start run"
1489 break;
1490 }
1491 }
1492
1493 mfe->fStateRunning = false;
1494
1495 if (result.error_flag) {
1496 mstrlcpy(errstr, result.error_message.c_str(), TRANSITION_ERROR_STRING_LENGTH);
1497 return FE_ERR_DRIVER;
1498 }
1499
1500 return SUCCESS;
1501}
1502
1507
1512
1517
1522
1527
1536
1541
1546
1551
1556
1561
1566
1582
1584{
1585 fRpcHandlers.push_back(h);
1586}
1587
1589{
1590 for (unsigned i=0; i<fRpcHandlers.size(); i++) {
1591 if (fRpcHandlers[i] == h) {
1592 fRpcHandlers[i] = NULL;
1593 }
1594 }
1595}
1596
1598{
1599 fMfe = TMFE::Instance();
1600}
1601
1603{
1604 if (fFeRpcHelper) {
1606 delete fFeRpcHelper;
1607 fFeRpcHelper = NULL;
1608 }
1609 // poison all pointers
1610 fMfe = NULL;
1611}
1612
1613TMFeResult TMFrontend::FeInitEquipments(const std::vector<std::string>& args)
1614{
1615 // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleInit() modifies fEquipments. K.O.
1616 for (unsigned i=0; i<fFeEquipments.size(); i++) {
1617 if (!fFeEquipments[i])
1618 continue;
1619 if (!fFeEquipments[i]->fEqConfEnabled)
1620 continue;
1621 TMFeResult r = fFeEquipments[i]->EqInit(args);
1622 if (r.error_flag)
1623 return r;
1624 }
1625 return TMFeOk();
1626}
1627
1629{
1630 // NOTE: should not use range-based for() loop, it uses an iterator and is not thread-safe. K.O.
1631 for (unsigned i=0; i<fFeEquipments.size(); i++) {
1632 if (!fFeEquipments[i])
1633 continue;
1634 fFeEquipments[i]->EqStopPollThread();
1635 }
1636}
1637
1639{
1640 // NOTE: this is thread-safe: we do not modify the fEquipments object. K.O.
1641 // NOTE: this is not thread-safe, we will race against ourselves and do multiple delete of fEquipments[i]. K.O.
1642
1643 // NOTE: should not use range-based for() loop, it uses an iterator and is not thread-safe. K.O.
1644 for (unsigned i=0; i<fFeEquipments.size(); i++) {
1645 if (!fFeEquipments[i])
1646 continue;
1647 //printf("delete equipment [%s]\n", fFeEquipments[i]->fEqName.c_str());
1649 delete fFeEquipments[i];
1650 fFeEquipments[i] = NULL;
1651 }
1652
1655}
1656
1658{
1659 // NOTE: not thread-safe, we modify the fEquipments object. K.O.
1660
1661 // NOTE: should not use range-based for() loop, it uses an iterator and is not thread-safe. K.O.
1662 for (unsigned i=0; i<fFeEquipments.size(); i++) {
1663 if (!fFeEquipments[i])
1664 continue;
1665 if (fFeEquipments[i] == eq) {
1666 fprintf(stderr, "TMFE::AddEquipment: Fatal error: Equipment \"%s\" is already registered, bye...\n", fFeEquipments[i]->fEqName.c_str());
1667 fMfe->Disconnect();
1668 exit(1);
1669 //return TMFeErrorMessage(msprintf("TMFE::AddEquipment: Equipment \"%s\" is already registered", fFeEquipments[i]->fEqName.c_str()));
1670 }
1671 if (fFeEquipments[i]->fEqName == eq->fEqName) {
1672 fprintf(stderr, "TMFE::AddEquipment: Fatal error: Duplicate equipment name \"%s\", bye...\n", eq->fEqName.c_str());
1673 fMfe->Disconnect();
1674 exit(1);
1675 //return TMFeErrorMessage(std::string("TMFE::AddEquipment: Duplicate equipment name \"") + eq->fEqName + "\"");
1676 }
1677 }
1678
1679 eq->fFe = this;
1680
1681 // NOTE: fEquipments must be protected against multithreaded access here. K.O.
1682 fFeEquipments.push_back(eq);
1683
1684 return TMFeOk();
1685}
1686
1688{
1689 // NOTE: this is thread-safe, we do not modify the fEquipments object. K.O.
1690
1691 // NOTE: should not use range-based for() loop, it uses an iterator and is not thread-safe. K.O.
1692 for (unsigned i=0; i<fFeEquipments.size(); i++) {
1693 if (!fFeEquipments[i])
1694 continue;
1695 if (fFeEquipments[i] == eq) {
1696 fFeEquipments[i] = NULL;
1697 return TMFeOk();
1698 }
1699 }
1700
1701 return TMFeErrorMessage(msprintf("TMFE::RemoveEquipment: Cannot find equipment \"%s\"", eq->fEqName.c_str()));
1702}
1703
1704void TMFrontend::FeSetName(const char* program_name)
1705{
1706 assert(program_name != NULL);
1707 fMfe->fProgramName = program_name;
1708}
1709
1710TMFeEquipment::TMFeEquipment(const char* eqname, const char* eqfilename) // ctor
1711{
1712 assert(eqname != NULL);
1713 assert(eqfilename != NULL);
1714
1715 if (TMFE::gfVerbose)
1716 printf("TMFeEquipment::ctor: equipment name [%s] file [%s]\n", eqname, eqfilename);
1717
1718 fMfe = TMFE::Instance();
1719 fEqName = eqname;
1720 fEqFilename = eqfilename;
1722}
1723
1725{
1726 if (TMFE::gfVerbose)
1727 printf("TMFeEquipment::dtor: equipment name [%s]\n", fEqName.c_str());
1728
1730
1731 // free data and poison pointers
1732 if (fOdbEq) {
1733 delete fOdbEq;
1734 fOdbEq = NULL;
1735 }
1736 if (fOdbEqCommon) {
1737 delete fOdbEqCommon;
1738 fOdbEqCommon = NULL;
1739 }
1740 if (fOdbEqSettings) {
1741 delete fOdbEqSettings;
1742 fOdbEqSettings = NULL;
1743 }
1744 if (fOdbEqVariables) {
1745 delete fOdbEqVariables;
1746 fOdbEqVariables = NULL;
1747 }
1748 if (fOdbEqStatistics) {
1749 delete fOdbEqStatistics;
1750 fOdbEqStatistics = NULL;
1751 }
1752 fMfe = NULL;
1753 fFe = NULL;
1754 fEqEventBuffer = NULL;
1755}
1756
1757TMFeResult TMFeEquipment::EqInit(const std::vector<std::string>& args)
1758{
1759 TMFeResult r;
1760
1761 r = EqPreInit();
1762 if (r.error_flag)
1763 return r;
1764
1765 r = HandleInit(args);
1766 if (r.error_flag)
1767 return r;
1768
1769 r = EqPostInit();
1770 if (r.error_flag)
1771 return r;
1772
1773 return TMFeOk();
1774}
1775
1777{
1778 if (TMFE::gfVerbose)
1779 printf("TMFeEquipment::EqReadCommon: for [%s]\n", fEqName.c_str());
1780
1781 // list of ODB Common entries always read
1782
1783 fOdbEqCommon->RB("Enabled", &fEqConfEnabled, true);
1784 fOdbEqCommon->RD("Event limit", &fEqConfEventLimit, true);
1785
1787 // list of ODB Common entries read if we want to control equipment from ODB
1788
1789 fOdbEqCommon->RU16("Event ID", &fEqConfEventID, true);
1790 fOdbEqCommon->RU16("Trigger mask", &fEqConfTriggerMask, true);
1791 fOdbEqCommon->RS("Buffer", &fEqConfBuffer, true, NAME_LENGTH);
1792 fOdbEqCommon->RI("Type", &fEqConfType, true);
1793 fOdbEqCommon->RI("Source", &fEqConfSource, true);
1794 fOdbEqCommon->RS("Format", &fEqConfFormat, true, 8);
1795 fOdbEqCommon->RI("Read on", &fEqConfReadOn, true);
1796 fOdbEqCommon->RI("Period", &fEqConfPeriodMilliSec, true);
1797 fOdbEqCommon->RU32("Num subevents", &fEqConfNumSubEvents, true);
1798 fOdbEqCommon->RI("Log history", &fEqConfLogHistory, true);
1799 fOdbEqCommon->RB("Hidden", &fEqConfHidden, true);
1800 fOdbEqCommon->RI("Write cache size", &fEqConfWriteCacheSize, true);
1801
1802 // decode data from ODB Common
1803
1806 }
1807
1808 // list of ODB Common entries we read and write back to ODB, but do not actually use.
1809
1810 //fOdbEqCommon->RS("Frontend host", &fEqConfFrontendHost, true, NAME_LENGTH);
1811 //fOdbEqCommon->RS("Frontend name", &fEqConfFrontendName, true, NAME_LENGTH);
1812 //fOdbEqCommon->RS("Frontend file name", &fEqConfFrontendFileName, true, 256);
1813 //fOdbEqCommon->RS("Status", &fEqConfStatus, true, 256);
1814 //fOdbEqCommon->RS("Status color", &fEqConfStatusColor, true, NAME_LENGTH);
1815
1816 return TMFeOk();
1817}
1818
1820{
1821 if (TMFE::gfVerbose)
1822 printf("TMFeEquipment::EqWriteCommon: for [%s]\n", fEqName.c_str());
1823
1824 // encode data for ODB Common
1825
1826 fEqConfReadOn = 0;
1829 else
1833
1834 // write to ODB
1835
1836 fOdbEqCommon->WU16("Event ID", fEqConfEventID);
1837 fOdbEqCommon->WU16("Trigger mask", fEqConfTriggerMask);
1838 fOdbEqCommon->WS("Buffer", fEqConfBuffer.c_str(), NAME_LENGTH);
1839 fOdbEqCommon->WI("Type", fEqConfType);
1840 fOdbEqCommon->WI("Source", fEqConfSource);
1841 fOdbEqCommon->WS("Format", fEqConfFormat.c_str(), 8);
1842 fOdbEqCommon->WB("Enabled", fEqConfEnabled);
1843 fOdbEqCommon->WI("Read on", fEqConfReadOn);
1844 fOdbEqCommon->WI("Period", fEqConfPeriodMilliSec);
1845 fOdbEqCommon->WD("Event limit", fEqConfEventLimit);
1846 fOdbEqCommon->WU32("Num subevents", fEqConfNumSubEvents);
1847 fOdbEqCommon->WI("Log history", fEqConfLogHistory);
1848 fOdbEqCommon->WS("Frontend host", fMfe->fHostname.c_str(), NAME_LENGTH);
1849 fOdbEqCommon->WS("Frontend name", fMfe->fProgramName.c_str(), NAME_LENGTH);
1850 fOdbEqCommon->WS("Frontend file name", fEqFilename.c_str(), 256);
1851 if (create) {
1852 fOdbEqCommon->WS("Status", "", 256);
1853 fOdbEqCommon->WS("Status color", "", NAME_LENGTH);
1854 }
1855 fOdbEqCommon->WB("Hidden", fEqConfHidden);
1856 fOdbEqCommon->WI("Write cache size", fEqConfWriteCacheSize);
1857 return TMFeOk();
1858}
1859
1861{
1862 if (TMFE::gfVerbose)
1863 printf("TMFeEquipment::PreInit: for [%s]\n", fEqName.c_str());
1864
1865 //
1866 // Apply frontend index
1867 //
1868
1869 if (fEqName.find("%") != std::string::npos) {
1870 fEqName = msprintf(fEqName.c_str(), fFe->fFeIndex);
1871 }
1872
1873 if (fEqConfBuffer.find("%") != std::string::npos) {
1875 }
1876
1877 //
1878 // create ODB /eq/name/common
1879 //
1880
1881 fOdbEq = fMfe->fOdbRoot->Chdir((std::string("Equipment/") + fEqName).c_str(), true);
1882 fOdbEqCommon = fOdbEq->Chdir("Common", false);
1883 if (!fOdbEqCommon) {
1884 if (TMFE::gfVerbose)
1885 printf("TMFeEquipment::PreInit: creating ODB common\n");
1886 fOdbEqCommon = fOdbEq->Chdir("Common", true);
1887 EqWriteCommon(true);
1888 }
1889 fOdbEqSettings = fOdbEq->Chdir("Settings", true);
1890 fOdbEqVariables = fOdbEq->Chdir("Variables", true);
1891 fOdbEqStatistics = fOdbEq->Chdir("Statistics", true);
1892
1894
1895 if (r.error_flag)
1896 return r;
1897
1898 if (rpc_is_remote()) {
1899 EqSetStatus((fMfe->fProgramName + "@" + fMfe->fHostname).c_str(), "greenLight");
1900 } else {
1901 EqSetStatus(fMfe->fProgramName.c_str(), "greenLight");
1902 }
1903
1906
1907 return TMFeOk();
1908}
1909
1911{
1912 if (TMFE::gfVerbose)
1913 printf("TMFeEquipment::EqPostInit: for [%s]\n", fEqName.c_str());
1914
1915 if (!fEqConfEnabled) {
1916 EqSetStatus("Disabled", "yellowLight");
1917 }
1918
1919 // open event buffer
1920
1921 uint32_t odb_max_event_size = DEFAULT_MAX_EVENT_SIZE;
1922 fMfe->fOdbRoot->RU32("Experiment/MAX_EVENT_SIZE", &odb_max_event_size, true);
1923
1924 if (fEqConfMaxEventSize == 0) {
1925 fEqConfMaxEventSize = odb_max_event_size;
1926 } else if (fEqConfMaxEventSize > odb_max_event_size) {
1927 fMfe->Msg(MERROR, "TMFeEquipment::EqPostInit", "Equipment \"%s\" requested event size %d is bigger than ODB MAX_EVENT_SIZE %d", fEqName.c_str(), (int)fEqConfMaxEventSize, odb_max_event_size);
1928 fEqConfMaxEventSize = odb_max_event_size;
1929 }
1930
1931 if (!fEqConfBuffer.empty()) {
1933
1934 if (r.error_flag)
1935 return r;
1936
1937 assert(fEqEventBuffer != NULL);
1938
1940 fMfe->Msg(MERROR, "TMFeEquipment::EqPostInit", "Equipment \"%s\" requested event size %d is bigger than event buffer \"%s\" max event size %d", fEqName.c_str(), (int)fEqConfMaxEventSize, fEqEventBuffer->fBufName.c_str(), (int)fEqEventBuffer->fBufMaxEventSize);
1942 }
1943
1944 if (fEqConfWriteCacheSize > 0) {
1947
1948 if (r.error_flag)
1949 return r;
1951 fMfe->Msg(MERROR, "TMFeEquipment::EqPostInit", "Equipment \"%s\" requested write cache size %d for buffer \"%s\" is smaller then already set write cache size %d, ignoring it", fEqName.c_str(), (int)fEqConfWriteCacheSize, fEqEventBuffer->fBufName.c_str(), (int)fEqEventBuffer->fBufWriteCacheSize);
1953 // do nothing
1954 } else {
1955 fMfe->Msg(MERROR, "TMFeEquipment::EqPostInit", "Equipment \"%s\" requested write cache size %d for buffer \"%s\" is different from already set write cache size %d", fEqName.c_str(), (int)fEqConfWriteCacheSize, fEqEventBuffer->fBufName.c_str(), (int)fEqEventBuffer->fBufWriteCacheSize);
1956
1958
1959 if (r.error_flag)
1960 return r;
1961 }
1962 }
1963 }
1964
1965 if (TMFE::gfVerbose)
1966 printf("TMFeEquipment::EqPostInit: Equipment \"%s\", max event size: %d\n", fEqName.c_str(), (int)fEqConfMaxEventSize);
1967
1968 // update ODB common
1969
1971
1972 if (r.error_flag)
1973 return r;
1974
1976 fMfe->AddRpcHandler(this);
1977 }
1978
1979 return TMFeOk();
1980};
1981
1983{
1984 fEqMutex.lock();
1985
1986 if (TMFE::gfVerbose)
1987 printf("TMFeEquipment::EqZeroStatistics: zero statistics for [%s]\n", fEqName.c_str());
1988
1989 double now = TMFE::GetTime();
1990
1991 fEqStatEvents = 0;
1992 fEqStatBytes = 0;
1993 fEqStatEpS = 0;
1994 fEqStatKBpS = 0;
1995
1996 fEqStatLastTime = now;
1998 fEqStatLastBytes = 0;
1999
2000 fEqStatNextWrite = now; // force immediate update
2001
2002 fEqMutex.unlock();
2003
2004 return TMFeOk();
2005}
2006
2008{
2009 fEqMutex.lock();
2010
2011 if (TMFE::gfVerbose)
2012 printf("TMFeEquipment::EqWriteStatistics: write statistics for [%s]\n", fEqName.c_str());
2013
2014 double now = TMFE::GetTime();
2015 double elapsed = now - fEqStatLastTime;
2016
2017 if (elapsed > 0.9 || fEqStatLastTime == 0) {
2019 fEqStatKBpS = (fEqStatBytes - fEqStatLastBytes) / elapsed / 1000.0;
2020
2021 fEqStatLastTime = now;
2024 }
2025
2026 //printf("TMFeEquipment::EqWriteStatistics: write statistics for [%s], now %f, elapsed %f, sent %f, eps %f, kps %f\n", fEqName.c_str(), now, elapsed, fEqStatEvents, fEqStatEpS, fEqStatKBpS);
2027
2028 fOdbEqStatistics->WD("Events sent", fEqStatEvents);
2029 fOdbEqStatistics->WD("Events per sec.", fEqStatEpS);
2030 fOdbEqStatistics->WD("kBytes per sec.", fEqStatKBpS);
2031
2032 fEqStatLastWrite = now;
2033
2035 // avoid creep of NextWrite: we start it at
2036 // time of initialization, then increment it strictly
2037 // by the period value, regardless of when it is actually
2038 // written to ODB (actual period is longer than requested
2039 // period because we only over-sleep, never under-sleep). K.O.
2040 while (fEqStatNextWrite <= now) {
2042 }
2043 } else {
2044 fEqStatNextWrite = now;
2045 }
2046
2047 fEqMutex.unlock();
2048
2049 return TMFeOk();
2050}
2051
2052TMFeResult TMFeEquipment::ComposeEvent(char* event, size_t size) const
2053{
2054 EVENT_HEADER* pevent = (EVENT_HEADER*)event;
2055 pevent->event_id = fEqConfEventID;
2057 pevent->serial_number = fEqSerial;
2058 pevent->time_stamp = TMFE::GetTime();
2059 pevent->data_size = 0;
2060 return TMFeOk();
2061}
2062
2063TMFeResult TMFeEquipment::EqSendEvent(const char* event, bool write_to_odb)
2064{
2065 std::lock_guard<std::mutex> guard(fEqMutex);
2066
2067 fEqSerial++;
2068
2069 EVENT_HEADER* pevent = (EVENT_HEADER*)event;
2070 pevent->data_size = BkSize(event);
2071
2072 if (fEqEventBuffer != NULL) {
2074
2075 if (r.error_flag)
2076 return r;
2077 }
2078
2079 fEqStatEvents += 1;
2080 fEqStatBytes += sizeof(EVENT_HEADER) + pevent->data_size;
2081
2082 if (fEqConfWriteEventsToOdb && write_to_odb) {
2084 if (r.error_flag)
2085 return r;
2086 }
2087
2088 if (fMfe->fStateRunning) {
2089 if (fEqConfEventLimit > 0) {
2091 if (!fMfe->fRunStopRequested) {
2092 fMfe->Msg(MINFO, "TMFeEquipment::EqSendEvent", "Equipment \"%s\" sent %.0f events out of %.0f requested, run will stop now", fEqName.c_str(), fEqStatEvents, fEqConfEventLimit);
2093 }
2094 fMfe->fRunStopRequested = true;
2095 }
2096 }
2097 }
2098
2099 return TMFeOk();
2100}
2101
2102TMFeResult TMFeEquipment::EqSendEvent(const std::vector<char>& event, bool write_to_odb)
2103{
2104 std::lock_guard<std::mutex> guard(fEqMutex);
2105
2106 fEqSerial++;
2107
2108 if (fEqEventBuffer == NULL) {
2109 return TMFeOk();
2110 }
2111
2113
2114 if (r.error_flag)
2115 return r;
2116
2117 fEqStatEvents += 1;
2118 fEqStatBytes += event.size();
2119
2120 if (fEqConfWriteEventsToOdb && write_to_odb) {
2121 TMFeResult r = EqWriteEventToOdb_locked(event.data());
2122 if (r.error_flag)
2123 return r;
2124 }
2125
2126 if (fMfe->fStateRunning) {
2127 if (fEqConfEventLimit > 0) {
2129 if (!fMfe->fRunStopRequested) {
2130 fMfe->Msg(MINFO, "TMFeEquipment::EqSendEvent", "Equipment \"%s\" sent %.0f events out of %.0f requested, run will stop now", fEqName.c_str(), fEqStatEvents, fEqConfEventLimit);
2131 }
2132 fMfe->fRunStopRequested = true;
2133 }
2134 }
2135 }
2136
2137 return TMFeOk();
2138}
2139
2140TMFeResult TMFeEquipment::EqSendEvent(const std::vector<std::vector<char>>& event, bool write_to_odb)
2141{
2142 std::lock_guard<std::mutex> guard(fEqMutex);
2143
2144 fEqSerial++;
2145
2146 if (fEqEventBuffer == NULL) {
2147 return TMFeOk();
2148 }
2149
2151
2152 if (r.error_flag)
2153 return r;
2154
2155 fEqStatEvents += 1;
2156 for (auto v: event) {
2157 fEqStatBytes += v.size();
2158 }
2159
2160 //if (fEqConfWriteEventsToOdb && write_to_odb) {
2161 // TMFeResult r = EqWriteEventToOdb_locked(event.data());
2162 // if (r.error_flag)
2163 // return r;
2164 //}
2165
2166 if (fMfe->fStateRunning) {
2167 if (fEqConfEventLimit > 0) {
2169 if (!fMfe->fRunStopRequested) {
2170 fMfe->Msg(MINFO, "TMFeEquipment::EqSendEvent", "Equipment \"%s\" sent %.0f events out of %.0f requested, run will stop now", fEqName.c_str(), fEqStatEvents, fEqConfEventLimit);
2171 }
2172 fMfe->fRunStopRequested = true;
2173 }
2174 }
2175 }
2176
2177 return TMFeOk();
2178}
2179
2180TMFeResult TMFeEquipment::EqSendEvent(int sg_n, const char* sg_ptr[], const size_t sg_len[], bool write_to_odb)
2181{
2182 std::lock_guard<std::mutex> guard(fEqMutex);
2183
2184 fEqSerial++;
2185
2186 if (fEqEventBuffer == NULL) {
2187 return TMFeOk();
2188 }
2189
2190 TMFeResult r = fEqEventBuffer->SendEvent(sg_n, sg_ptr, sg_len);
2191
2192 if (r.error_flag)
2193 return r;
2194
2195 fEqStatEvents += 1;
2196 for (int i=0; i<sg_n; i++) {
2197 fEqStatBytes += sg_len[i];
2198 }
2199
2200 //if (fEqConfWriteEventsToOdb && write_to_odb) {
2201 // TMFeResult r = EqWriteEventToOdb_locked(event.data());
2202 // if (r.error_flag)
2203 // return r;
2204 //}
2205
2206 if (fMfe->fStateRunning) {
2207 if (fEqConfEventLimit > 0) {
2209 if (!fMfe->fRunStopRequested) {
2210 fMfe->Msg(MINFO, "TMFeEquipment::EqSendEvent", "Equipment \"%s\" sent %.0f events out of %.0f requested, run will stop now", fEqName.c_str(), fEqStatEvents, fEqConfEventLimit);
2211 }
2212 fMfe->fRunStopRequested = true;
2213 }
2214 }
2215 }
2216
2217 return TMFeOk();
2218}
2219
2221{
2222 std::lock_guard<std::mutex> guard(fEqMutex);
2223 return EqWriteEventToOdb_locked(event);
2224}
2225
2227{
2228 std::string path = "";
2229 path += "/Equipment/";
2230 path += fEqName;
2231 path += "/Variables";
2232
2233 HNDLE hKeyVar = 0;
2234
2235 int status = db_find_key(fMfe->fDB, 0, path.c_str(), &hKeyVar);
2236 if (status != DB_SUCCESS) {
2237 return TMFeMidasError(msprintf("Cannot find \"%s\" in ODB", path.c_str()), "db_find_key", status);
2238 }
2239
2240 status = cm_write_event_to_odb(fMfe->fDB, hKeyVar, (const EVENT_HEADER*) event, FORMAT_MIDAS);
2241 if (status != SUCCESS) {
2242 return TMFeMidasError("Cannot write event to ODB", "cm_write_event_to_odb", status);
2243 }
2244 return TMFeOk();
2245}
2246
2247int TMFeEquipment::BkSize(const char* event) const
2248{
2249 return bk_size(event + sizeof(EVENT_HEADER));
2250}
2251
2252TMFeResult TMFeEquipment::BkInit(char* event, size_t size) const
2253{
2254 bk_init32a(event + sizeof(EVENT_HEADER));
2255 return TMFeOk();
2256}
2257
2258void* TMFeEquipment::BkOpen(char* event, const char* name, int tid) const
2259{
2260 void* ptr;
2261 bk_create(event + sizeof(EVENT_HEADER), name, tid, &ptr);
2262 return ptr;
2263}
2264
2265TMFeResult TMFeEquipment::BkClose(char* event, void* ptr) const
2266{
2267 bk_close(event + sizeof(EVENT_HEADER), ptr);
2268 ((EVENT_HEADER*)event)->data_size = BkSize(event);
2269 return TMFeOk();
2270}
2271
2272TMFeResult TMFeEquipment::EqSetStatus(char const* eq_status, char const* eq_color)
2273{
2274 if (eq_status) {
2275 fOdbEqCommon->WS("Status", eq_status, 256);
2276 }
2277
2278 if (eq_color) {
2279 fOdbEqCommon->WS("Status color", eq_color, NAME_LENGTH);
2280 }
2281
2282 return TMFeOk();
2283}
2284
2285TMFeResult TMFE::TriggerAlarm(const char* name, const char* message, const char* aclass)
2286{
2288
2289 if (status) {
2290 return TMFeMidasError("Cannot trigger alarm", "al_trigger_alarm", status);
2291 }
2292
2293 return TMFeOk();
2294}
2295
2297{
2298 int status = al_reset_alarm(name);
2299
2300 if (status) {
2301 return TMFeMidasError("Cannot reset alarm", "al_reset_alarm", status);
2302 }
2303
2304 return TMFeOk();
2305}
2306
2307void TMFrontend::FeUsage(const char* argv0)
2308{
2309 fprintf(stderr, "\n");
2310 fprintf(stderr, "Usage: %s args... [-- equipment args...]\n", argv0);
2311 fprintf(stderr, "\n");
2312 fprintf(stderr, " --help -- print this help message\n");
2313 fprintf(stderr, " -h -- print this help message\n");
2314 fprintf(stderr, " -v -- report all activities\n");
2315 fprintf(stderr, "\n");
2316 fprintf(stderr, " -h hostname[:tcpport] -- connect to MIDAS mserver on given host and tcp port number\n");
2317 fprintf(stderr, " -e exptname -- connect to given MIDAS experiment\n");
2318 fprintf(stderr, "\n");
2319 fprintf(stderr, " -D -- Become a daemon\n");
2320 fprintf(stderr, " -O -- Become a daemon but keep stdout for saving in a log file: frontend -O >file.log 2>&1\n");
2321 fprintf(stderr, "\n");
2322 fprintf(stderr, " -i NNN -- Set frontend index number\n");
2323 fprintf(stderr, "\n");
2324
2325 // NOTE: cannot use range-based for() loop, it uses an iterator and will crash if HandleUsage() modifies fEquipments. K.O.
2326 for (unsigned i=0; i<fFeEquipments.size(); i++) {
2327 if (!fFeEquipments[i])
2328 continue;
2329 fprintf(stderr, "Usage of equipment \"%s\":\n", fFeEquipments[i]->fEqName.c_str());
2330 fprintf(stderr, "\n");
2331 fFeEquipments[i]->HandleUsage();
2332 fprintf(stderr, "\n");
2333 }
2334}
2335
2336int TMFrontend::FeMain(int argc, char* argv[])
2337{
2338 std::vector<std::string> args;
2339 for (int i=0; i<argc; i++) {
2340 args.push_back(argv[i]);
2341 }
2342
2343 return FeMain(args);
2344}
2345
2346TMFeResult TMFrontend::FeInit(const std::vector<std::string> &args)
2347{
2348 setbuf(stdout, NULL);
2349 setbuf(stderr, NULL);
2350
2351 signal(SIGPIPE, SIG_IGN);
2352
2353 std::vector<std::string> eq_args;
2354
2355 bool help = false;
2356 std::string exptname;
2357 std::string hostname;
2358 bool daemon0 = false;
2359 bool daemon1 = false;
2360
2361 for (unsigned int i=1; i<args.size(); i++) { // loop over the commandline options
2362 //printf("argv[%d] is %s\n", i, args[i].c_str());
2363 if (args[i] == "--") {
2364 // remaining arguments are passed to equipment Init()
2365 for (unsigned j=i+1; j<args.size(); j++)
2366 eq_args.push_back(args[j]);
2367 break;
2368 } else if (args[i] == "-v") {
2369 TMFE::gfVerbose = true;
2370 } else if (args[i] == "-D") {
2371 daemon0 = true;
2372 } else if (args[i] == "-O") {
2373 daemon1 = true;
2374 } else if (args[i] == "-h") {
2375 i++;
2376 if (i >= args.size()) { help = true; break; }
2377 hostname = args[i];
2378 } else if (args[i] == "-e") {
2379 i++;
2380 if (i >= args.size()) { help = true; break; }
2381 exptname = args[i];
2382 } else if (args[i] == "-i") {
2383 i++;
2384 if (i >= args.size()) { help = true; break; }
2385 fFeIndex = atoi(args[i].c_str());
2386 } else if (args[i] == "--help") {
2387 help = true;
2388 break;
2389 } else if (args[i][0] == '-') {
2390 help = true;
2391 break;
2392 } else {
2393 help = true;
2394 break;
2395 }
2396 }
2397
2398 //
2399 // daemonize...
2400 //
2401
2402 if (daemon0) {
2403 printf("Becoming a daemon...\n");
2405 } else if (daemon1) {
2406 printf("Becoming a daemon...\n");
2408 }
2409
2410 //
2411 // apply frontend index to indexed frontend
2412 //
2413
2414 if (fMfe->fProgramName.find("%") != std::string::npos) {
2416 }
2417
2418 TMFeResult r;
2419
2420 // call arguments handler before calling the usage handlers. Otherwise,
2421 // if the arguments handler creates new equipments,
2422 // we will never see their Usage(). K.O.
2423 r = HandleArguments(eq_args);
2424
2425 if (r.error_flag) {
2426 fprintf(stderr, "Fatal error: arguments handler error: %s, bye.\n", r.error_message.c_str());
2427 fMfe->Disconnect();
2428 exit(1);
2429 }
2430
2431 if (help) {
2432 FeUsage(args[0].c_str());
2433 HandleUsage();
2434 fMfe->Disconnect();
2435 exit(1);
2436 }
2437
2438 r = fMfe->Connect(NULL, hostname.c_str(), exptname.c_str());
2439
2440 if (r.error_flag) {
2441 fprintf(stderr, "Fatal error: cannot connect to MIDAS, error: %s, bye.\n", r.error_message.c_str());
2442 fMfe->Disconnect();
2443 exit(1);
2444 }
2445
2446 r = HandleFrontendInit(eq_args);
2447
2448 if (r.error_flag) {
2449 fprintf(stderr, "Fatal error: frontend init error: %s, bye.\n", r.error_message.c_str());
2450 fMfe->Disconnect();
2451 exit(1);
2452 }
2453
2456
2457 //mfe->SetWatchdogSec(0);
2458 //mfe->SetTransitionSequenceStart(910);
2459 //mfe->SetTransitionSequenceStop(90);
2460 //mfe->DeregisterTransitionPause();
2461 //mfe->DeregisterTransitionResume();
2462 //mfe->RegisterTransitionStartAbort();
2463
2464 r = FeInitEquipments(eq_args);
2465
2466 if (r.error_flag) {
2467 fprintf(stderr, "Cannot initialize equipments, error message: %s, bye.\n", r.error_message.c_str());
2468 fMfe->Disconnect();
2469 exit(1);
2470 }
2471
2472 r = HandleFrontendReady(eq_args);
2473
2474 if (r.error_flag) {
2475 fprintf(stderr, "Fatal error: frontend post-init error: %s, bye.\n", r.error_message.c_str());
2476 fMfe->Disconnect();
2477 exit(1);
2478 }
2479
2480 if (fMfe->fStateRunning) {
2482 fprintf(stderr, "Fatal error: Cannot start frontend, run is in progress!\n");
2483 fMfe->Disconnect();
2484 exit(1);
2485 } else if (fFeIfRunningCallBeginRun) {
2486 char errstr[TRANSITION_ERROR_STRING_LENGTH];
2487 tr_start(fMfe->fRunNumber, errstr);
2488 }
2489 }
2490
2491 return TMFeOk();
2492}
2493
2495{
2496 while (!fMfe->fShutdownRequested) {
2497 FePollMidas(0.100);
2498 }
2499}
2500
2510
2511int TMFrontend::FeMain(const std::vector<std::string> &args)
2512{
2513 TMFeResult r = FeInit(args);
2514
2515 if (r.error_flag) {
2516 fprintf(stderr, "Fatal error: frontend init error: %s, bye.\n", r.error_message.c_str());
2517 fMfe->Disconnect();
2518 exit(1);
2519 }
2520
2521 FeMainLoop();
2522 FeShutdown();
2523
2524 return 0;
2525}
2526
2527// singleton instance
2528TMFE* TMFE::gfMFE = NULL;
2529
2530// static data members
2531bool TMFE::gfVerbose = false;
2532
2533/* emacs
2534 * Local Variables:
2535 * tab-width: 8
2536 * c-basic-offset: 3
2537 * indent-tabs-mode: nil
2538 * End:
2539 */
#define FALSE
Definition cfortran.h:309
std::vector< int > fBufRequests
Definition tmfe.h:141
std::string fBufName
Definition tmfe.h:119
TMFeResult CloseBuffer()
Definition tmfe.cxx:235
int fBufHandle
Definition tmfe.h:138
size_t fBufReadCacheSize
Definition tmfe.h:139
TMFeResult SetCacheSize(size_t read_cache_size, size_t write_cache_size)
Definition tmfe.cxx:258
TMFeResult OpenBuffer(const char *bufname, size_t bufsize=0)
Definition tmfe.cxx:183
TMFeResult AddRequest(int event_id, int trigger_mask, const char *sampling_type_string)
Definition tmfe.cxx:272
TMFeResult ReceiveEvent(std::vector< char > *e, int timeout_msec=0)
Definition tmfe.cxx:303
TMFE * fMfe
Definition tmfe.h:118
TMFeResult FlushCache(bool wait=true)
Definition tmfe.cxx:374
size_t fBufWriteCacheSize
Definition tmfe.h:140
TMEventBuffer(TMFE *mfe)
Definition tmfe.cxx:169
size_t fBufSize
Definition tmfe.h:120
TMFeResult SendEvent(const char *e)
Definition tmfe.cxx:326
size_t fBufMaxEventSize
Definition tmfe.h:121
Definition tmfe.h:381
TMFeResult TriggerAlarm(const char *name, const char *message, const char *aclass)
Definition tmfe.cxx:2285
TMFeResult EventBufferCloseAll()
Definition tmfe.cxx:484
bool fRunStopRequested
run stop was requested by equipment
Definition tmfe.h:444
std::thread * fRpcThread
Definition tmfe.h:405
void DeregisterTransitionStartAbort()
Definition tmfe.cxx:1557
std::vector< TMEventBuffer * > fEventBuffers
Definition tmfe.h:436
std::atomic_bool fRpcThreadStarting
Definition tmfe.h:406
static std::string GetThreadId()
return identification of this thread
Definition tmfe.cxx:1140
bool fStateRunning
run state is running or paused
Definition tmfe.h:402
void RegisterRPCs()
Definition tmfe.cxx:1567
void DeregisterTransitionResume()
Definition tmfe.cxx:1552
std::mutex fEventBuffersMutex
Definition tmfe.h:435
TMFE()
default constructor is private for singleton classes
Definition tmfe.cxx:44
static double GetTime()
return current time in seconds, with micro-second precision
Definition tmfe.cxx:1011
void MidasPeriodicTasks()
Definition tmfe.cxx:852
void AddRpcHandler(TMFeRpcHandlerInterface *)
Definition tmfe.cxx:1583
int fDB
ODB database handle.
Definition tmfe.h:394
virtual ~TMFE()
destructor is private for singleton classes
Definition tmfe.cxx:50
void DeregisterTransitionPause()
Definition tmfe.cxx:1547
void SetTransitionSequenceResume(int seqno)
Definition tmfe.cxx:1518
void RpcThread()
Definition tmfe.cxx:857
double fRunStartTime
start a new run at this time
Definition tmfe.h:445
static bool gfVerbose
Definition tmfe.h:424
std::string fMserverHostname
hostname where the mserver is running, blank if using shared memory
Definition tmfe.h:385
void SetTransitionSequenceStop(int seqno)
Definition tmfe.cxx:1508
void Yield(double sleep_sec)
Definition tmfe.cxx:821
void Msg(int message_type, const char *filename, int line, const char *routine, const char *format,...) MATTRPRINTF(6, 7)
Definition tmfe.cxx:991
void StartRpcThread()
Definition tmfe.cxx:903
void DeregisterTransitionStop()
Definition tmfe.cxx:1542
TMFeResult Connect(const char *progname=NULL, const char *hostname=NULL, const char *exptname=NULL)
Definition tmfe.cxx:65
std::atomic_bool fRpcThreadShutdownRequested
Definition tmfe.h:408
void StopRun()
Definition tmfe.cxx:687
void SetTransitionSequenceStartAbort(int seqno)
Definition tmfe.cxx:1523
static TMFE * Instance()
Definition tmfe.cxx:57
static void Sleep(double sleep_time_sec)
sleep, with micro-second precision
Definition tmfe.cxx:1019
static TMFE * gfMFE
Definition tmfe.h:413
void SetTransitionSequencePause(int seqno)
Definition tmfe.cxx:1513
void StartRun()
Definition tmfe.cxx:714
std::atomic_bool fShutdownRequested
shutdown was requested by Ctrl-C or by RPC command
Definition tmfe.h:398
std::atomic_bool fRpcThreadRunning
Definition tmfe.h:407
TMFeResult SetWatchdogSec(int sec)
Definition tmfe.cxx:144
void StopRpcThread()
Definition tmfe.cxx:935
void RegisterTransitionStartAbort()
Definition tmfe.cxx:1562
TMFeResult ResetAlarm(const char *name)
Definition tmfe.cxx:2296
TMFeResult EventBufferFlushCacheAll(bool wait=true)
Definition tmfe.cxx:448
std::string fHostname
hostname we are running on
Definition tmfe.h:388
MVOdb * fOdbRoot
ODB root.
Definition tmfe.h:395
TMFeResult Disconnect()
Definition tmfe.cxx:154
std::vector< TMFeRpcHandlerInterface * > fRpcHandlers
Definition tmfe.h:464
std::string fProgramName
frontend program name
Definition tmfe.h:387
std::mutex fMutex
Definition tmfe.h:391
void DeregisterTransitionStart()
Definition tmfe.cxx:1537
void RemoveRpcHandler(TMFeRpcHandlerInterface *)
Definition tmfe.cxx:1588
void DeregisterTransitions()
Definition tmfe.cxx:1528
int fRunNumber
current run number
Definition tmfe.h:401
void SetTransitionSequenceStart(int seqno)
Definition tmfe.cxx:1503
TMFeResult EventBufferOpen(TMEventBuffer **pbuf, const char *bufname, size_t bufsize=0)
Definition tmfe.cxx:413
std::string fExptname
experiment name, blank if only one experiment defined in exptab
Definition tmfe.h:384
bool fEqConfHidden
Definition tmfe.h:182
int fEqConfSource
Definition tmfe.h:175
std::mutex fEqMutex
Definition tmfe.h:204
double fEqStatKBpS
Definition tmfe.h:225
void * BkOpen(char *pevent, const char *bank_name, int bank_type) const
Definition tmfe.cxx:2258
TMFeResult EqWriteEventToOdb_locked(const char *pevent)
Definition tmfe.cxx:2226
TMFeResult BkInit(char *pevent, size_t size) const
Definition tmfe.cxx:2252
uint16_t fEqConfTriggerMask
Definition tmfe.h:172
virtual void HandlePollRead()
Definition tmfe.h:274
MVOdb * fOdbEqSettings
ODB Equipment/EQNAME/Settings.
Definition tmfe.h:213
std::atomic_bool fEqPollThreadShutdownRequested
Definition tmfe.h:243
bool fEqConfEnabled
Definition tmfe.h:170
TMFE * fMfe
Definition tmfe.h:207
int fEqConfReadOn
Definition tmfe.h:177
int fEqConfWriteCacheSize
Definition tmfe.h:186
virtual ~TMFeEquipment()
Definition tmfe.cxx:1724
TMFeResult EqSetStatus(const char *status, const char *color)
Definition tmfe.cxx:2272
double fEqStatLastTime
Definition tmfe.h:228
void EqPollThread()
Definition tmfe.cxx:621
TMFrontend * fFe
Definition tmfe.h:208
bool fEqConfWriteEventsToOdb
Definition tmfe.h:197
virtual bool HandlePoll()
Definition tmfe.h:273
TMFeResult EqWriteCommon(bool create=false)
Write TMFeEqInfo to ODB /Equipment/NAME/Common.
Definition tmfe.cxx:1819
uint16_t fEqConfEventID
Definition tmfe.h:171
TMFeResult EqReadCommon()
Read TMFeEqInfo from ODB /Equipment/NAME/Common.
Definition tmfe.cxx:1776
double fEqConfPollSleepSec
Definition tmfe.h:199
TMFeResult EqWriteStatistics()
Definition tmfe.cxx:2007
MVOdb * fOdbEqVariables
ODB Equipment/EQNAME/Variables.
Definition tmfe.h:214
int fEqConfPeriodMilliSec
Definition tmfe.h:178
double fEqStatLastWrite
Definition tmfe.h:233
TMFeResult EqInit(const std::vector< std::string > &args)
Initialize equipment.
Definition tmfe.cxx:1757
double fEqConfPeriodStatisticsSec
Definition tmfe.h:198
double fEqStatEpS
Definition tmfe.h:224
TMEventBuffer * fEqEventBuffer
Definition tmfe.h:218
void EqStartPollThread()
Definition tmfe.cxx:648
virtual TMFeResult HandleInit(const std::vector< std::string > &args)
Definition tmfe.h:258
bool fEqConfEnableRpc
Definition tmfe.h:164
TMFeResult EqZeroStatistics()
Definition tmfe.cxx:1982
double fEqStatEvents
Definition tmfe.h:222
std::thread * fEqPollThread
Definition tmfe.h:277
void EqStopPollThread()
Definition tmfe.cxx:665
double fEqStatBytes
Definition tmfe.h:223
std::atomic_bool fEqPollThreadRunning
Definition tmfe.h:242
TMFeResult EqPreInit()
Initialize equipment, before EquipmentBase::Init()
Definition tmfe.cxx:1860
uint32_t fEqConfNumSubEvents
Definition tmfe.h:180
TMFeResult BkClose(char *pevent, void *ptr) const
Definition tmfe.cxx:2265
TMFeEquipment()
Definition tmfe.h:255
std::string fEqConfFormat
Definition tmfe.h:176
int fEqConfLogHistory
Definition tmfe.h:181
double fEqStatNextWrite
Definition tmfe.h:234
double fEqConfEventLimit
Definition tmfe.h:179
TMFeResult ComposeEvent(char *pevent, size_t size) const
Definition tmfe.cxx:2052
std::string fEqName
Definition tmfe.h:159
size_t fEqConfBufferSize
Definition tmfe.h:201
int fEqSerial
Definition tmfe.h:219
double fEqStatLastEvents
Definition tmfe.h:229
int fEqConfType
Definition tmfe.h:174
MVOdb * fOdbEqCommon
ODB Equipment/EQNAME/Common.
Definition tmfe.h:212
MVOdb * fOdbEq
ODB Equipment/EQNAME.
Definition tmfe.h:211
std::atomic_bool fEqPollThreadStarting
Definition tmfe.h:241
TMFeResult EqSendEvent(const char *pevent, bool write_to_odb=true)
Definition tmfe.cxx:2063
std::string fEqFilename
Definition tmfe.h:160
bool fEqConfReadConfigFromOdb
Definition tmfe.h:168
TMFeResult EqWriteEventToOdb(const char *pevent)
Definition tmfe.cxx:2220
int BkSize(const char *pevent) const
Definition tmfe.cxx:2247
std::string fEqConfBuffer
Definition tmfe.h:173
double fEqStatLastBytes
Definition tmfe.h:230
size_t fEqConfMaxEventSize
Definition tmfe.h:200
MVOdb * fOdbEqStatistics
ODB Equipment/EQNAME/Statistics.
Definition tmfe.h:215
TMFeResult EqPostInit()
Initialize equipment, after EquipmentBase::Init()
Definition tmfe.cxx:1910
bool fEqConfReadOnlyWhenRunning
Definition tmfe.h:196
bool error_flag
Definition tmfe.h:89
std::string error_message
Definition tmfe.h:91
virtual TMFeResult HandleEndRun(int run_number)
Definition tmfe.h:148
virtual TMFeResult HandleResumeRun(int run_number)
Definition tmfe.h:150
virtual TMFeResult HandlePauseRun(int run_number)
Definition tmfe.h:149
virtual TMFeResult HandleRpc(const char *cmd, const char *args, std::string &result)
Definition tmfe.h:152
virtual TMFeResult HandleBeginRun(int run_number)
Definition tmfe.h:147
virtual TMFeResult HandleStartAbortRun(int run_number)
Definition tmfe.h:151
virtual TMFeResult HandleBinaryRpc(const char *cmd, const char *args, std::vector< char > &result)
Definition tmfe.h:153
void FeShutdown()
Definition tmfe.cxx:2501
double fFeFlushWriteCacheNextCallTime
Definition tmfe.h:376
bool fFeIfRunningCallBeginRun
Definition tmfe.h:316
std::thread * fFePeriodicThread
Definition tmfe.h:369
std::atomic_bool fFePeriodicThreadRunning
Definition tmfe.h:371
double fFeFlushWriteCachePeriodSec
Definition tmfe.h:375
void FeDeleteEquipments()
Definition tmfe.cxx:1638
void FePollMidas(double sleep_sec)
Definition tmfe.cxx:745
double FePeriodicTasks()
Definition tmfe.cxx:507
TMFE * fMfe
Definition tmfe.h:309
void FePeriodicThread()
Definition tmfe.cxx:883
virtual ~TMFrontend()
Definition tmfe.cxx:1602
int fFeIndex
Definition tmfe.h:313
TMFeResult FeInitEquipments(const std::vector< std::string > &args)
Definition tmfe.cxx:1613
std::mutex fFeMutex
Definition tmfe.h:319
void FeMainLoop()
Definition tmfe.cxx:2494
virtual void HandleFrontendExit()
Definition tmfe.h:335
void FeSetName(const char *program_name)
Definition tmfe.cxx:1704
void FeStopEquipmentPollThreads()
Definition tmfe.cxx:1628
std::vector< TMFeEquipment * > fFeEquipments
Definition tmfe.h:343
void FeUsage(const char *argv0)
Definition tmfe.cxx:2307
TMFeResult FeInit(const std::vector< std::string > &args)
Definition tmfe.cxx:2346
TMFeResult FeAddEquipment(TMFeEquipment *eq)
Definition tmfe.cxx:1657
virtual TMFeResult HandleArguments(const std::vector< std::string > &args)
Definition tmfe.h:331
std::atomic_bool fFePeriodicThreadShutdownRequested
Definition tmfe.h:372
void FeStartPeriodicThread()
Definition tmfe.cxx:919
virtual TMFeResult HandleFrontendInit(const std::vector< std::string > &args)
Definition tmfe.h:333
bool fFeIfRunningCallExit
Definition tmfe.h:315
virtual TMFeResult HandleFrontendReady(const std::vector< std::string > &args)
Definition tmfe.h:334
double FePollTasks(double next_periodic_time)
Definition tmfe.cxx:583
TMFeResult FeRemoveEquipment(TMFeEquipment *eq)
Definition tmfe.cxx:1687
void FeStopPeriodicThread()
Definition tmfe.cxx:963
int FeMain(int argc, char *argv[])
Definition tmfe.cxx:2336
TMFrontendRpcHelper * fFeRpcHelper
Definition tmfe.h:310
std::atomic_bool fFePeriodicThreadStarting
Definition tmfe.h:370
virtual void HandleUsage()
Definition tmfe.h:332
TMFeResult HandleBeginRun(int run_number)
Definition tmfe.cxx:1290
TMFrontendRpcHelper(TMFrontend *fe)
Definition tmfe.cxx:1273
TMFrontend * fFe
Definition tmfe.cxx:1270
virtual ~TMFrontendRpcHelper()
Definition tmfe.cxx:1281
TMFeResult HandleEndRun(int run_number)
Definition tmfe.cxx:1307
INT al_reset_alarm(const char *alarm_name)
Definition alarm.cxx:525
INT al_trigger_alarm(const char *alarm_name, const char *alarm_message, const char *default_class, const char *cond_str, INT type)
Definition alarm.cxx:283
void bk_init32a(void *event)
Definition midas.cxx:17886
INT bk_close(void *event, void *pdata)
Definition midas.cxx:18184
void bk_create(void *event, const char *name, WORD type, void **pdata)
Definition midas.cxx:17965
INT bk_size(const void *event)
Definition midas.cxx:17899
INT bm_open_buffer(const char *buffer_name, INT buffer_size, INT *buffer_handle)
Definition midas.cxx:6728
INT bm_receive_event_vec(INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
Definition midas.cxx:10948
INT bm_request_event(HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
Definition midas.cxx:8476
INT bm_set_cache_size(INT buffer_handle, size_t read_size, size_t write_size)
Definition midas.cxx:8151
INT bm_close_buffer(INT buffer_handle)
Definition midas.cxx:7107
int bm_send_event_sg(int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
Definition midas.cxx:9789
INT bm_flush_cache(int buffer_handle, int timeout_msec)
Definition midas.cxx:10233
INT cm_register_transition(INT transition, INT(*func)(INT, char *), INT sequence_number)
Definition midas.cxx:3617
INT cm_yield(INT millisec)
Definition midas.cxx:5660
INT cm_get_experiment_database(HNDLE *hDB, HNDLE *hKeyClient)
Definition midas.cxx:3027
INT cm_transition(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
Definition midas.cxx:5304
INT cm_register_function(INT id, INT(*func)(INT, void **))
Definition midas.cxx:5808
INT cm_connect_experiment1(const char *host_name, const char *default_exp_name, const char *client_name, void(*func)(char *), INT odb_size, DWORD watchdog_timeout)
Definition midas.cxx:2313
INT cm_periodic_tasks()
Definition midas.cxx:5597
INT cm_disconnect_experiment(void)
Definition midas.cxx:2862
INT cm_get_environment(char *host_name, int host_name_size, char *exp_name, int exp_name_size)
Definition midas.cxx:2150
INT cm_deregister_transition(INT transition)
Definition midas.cxx:3693
INT cm_set_transition_sequence(INT transition, INT sequence_number)
Definition midas.cxx:3747
INT cm_set_watchdog_params(BOOL call_watchdog, DWORD timeout)
Definition midas.cxx:3299
#define CM_SUCCESS
Definition midas.h:582
#define CM_UNDEF_EXP
Definition midas.h:586
#define BM_ASYNC_RETURN
Definition midas.h:613
#define BM_SUCCESS
Definition midas.h:605
#define BM_CORRUPTED
Definition midas.h:623
#define BM_CREATED
Definition midas.h:606
#define DB_SUCCESS
Definition midas.h:632
#define SS_ABORT
Definition midas.h:678
#define RPC_SHUTDOWN
Definition midas.h:708
#define RPC_SUCCESS
Definition midas.h:699
#define FE_ERR_DRIVER
Definition midas.h:722
#define SUCCESS
Definition mcstd.h:54
#define RO_STOPPED
Definition midas.h:427
#define TR_RESUME
Definition midas.h:408
#define GET_NONBLOCKING
Definition midas.h:322
#define TR_PAUSE
Definition midas.h:407
#define GET_ALL
Definition midas.h:321
#define TR_START
Definition midas.h:405
#define RO_ODB
Definition midas.h:438
#define TR_SYNC
Definition midas.h:358
#define GET_RECENT
Definition midas.h:323
#define BM_NO_WAIT
Definition midas.h:366
#define TR_STARTABORT
Definition midas.h:409
#define STATE_STOPPED
Definition midas.h:305
#define MINFO
Definition midas.h:560
#define RO_PAUSED
Definition midas.h:428
#define STATE_PAUSED
Definition midas.h:306
#define MERROR
Definition midas.h:559
#define STATE_RUNNING
Definition midas.h:307
#define FORMAT_MIDAS
Definition midas.h:311
#define TR_STOP
Definition midas.h:406
#define BM_WAIT
Definition midas.h:365
#define RO_RUNNING
Definition midas.h:426
std::string ss_gethostname()
Definition system.cxx:5784
INT ss_suspend_set_rpc_thread(midas_thread_t thread_id)
Definition system.cxx:4074
std::string ss_tid_to_string(midas_thread_t thread_id)
Definition system.cxx:1643
INT ss_daemon_init(BOOL keep_stdout)
Definition system.cxx:2073
INT ss_suspend_exit()
Definition system.cxx:4298
midas_thread_t ss_gettid(void)
Definition system.cxx:1591
INT cm_msg_flush_buffer()
Definition midas.cxx:881
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
Definition midas.cxx:931
INT db_find_key(HNDLE hDB, HNDLE hKey, const char *key_name, HNDLE *subhKey)
Definition odb.cxx:4256
#define RPC_BRPC
Definition mrpc.h:135
#define RPC_JRPC_CXX
Definition mrpc.h:136
#define RPC_JRPC
Definition mrpc.h:134
bool rpc_is_remote(void)
Definition midas.cxx:12892
#define RPC_BRPC_CXX
Definition mrpc.h:137
INT run_number[2]
Definition mana.cxx:246
DWORD n[4]
Definition mana.cxx:247
INT index
Definition mana.cxx:271
INT tr_stop(INT rn, char *error)
Definition mana.cxx:2036
INT tr_start(INT rn, char *error)
Definition mana.cxx:2014
INT tr_pause(INT rn, char *error)
Definition mana.cxx:2065
BOOL debug
debug printouts
Definition mana.cxx:254
INT tr_resume(INT rn, char *error)
Definition mana.cxx:2078
BOOL create
Definition mchart.cxx:39
INT i
Definition mdump.cxx:32
std::vector< FMT_ID > eq
Definition mdump.cxx:55
INT run_state
Definition mfe.cxx:35
INT max_event_size
Definition mfed.cxx:30
void help()
Definition mh2sql.cxx:244
std::string msprintf(const char *format,...)
Definition midas.cxx:419
int cm_write_event_to_odb(HNDLE hDB, HNDLE hKey, const EVENT_HEADER *pevent, INT format)
Definition midas.cxx:19127
#define AT_INTERNAL
Definition midas.h:1441
INT HNDLE
Definition midas.h:132
#define CINT(_i)
Definition midas.h:1622
#define DEFAULT_WATCHDOG_TIMEOUT
Definition midas.h:290
int INT
Definition midas.h:129
#define CSTRING(_i)
Definition midas.h:1646
#define CPSTDSTRING(_i)
Definition midas.h:1685
#define DEFAULT_MAX_EVENT_SIZE
Definition midas.h:254
#define DEFAULT_BUFFER_SIZE
Definition midas.h:255
#define CPSTDVECTOR(_i)
Definition midas.h:1686
#define TRUE
Definition midas.h:182
#define DEFAULT_ODB_SIZE
Definition midas.h:270
#define TRANSITION_ERROR_STRING_LENGTH
Definition midas.h:280
#define NAME_LENGTH
Definition midas.h:272
#define trigger_mask
#define message(type, str)
#define sleep(ms)
#define event_id
#define name(x)
Definition midas_macro.h:24
int gettimeofday(struct timeval *tp, void *tzp)
timeval tv
Definition msysmon.cxx:1095
int event_size
Definition msysmon.cxx:527
INT j
Definition odbhist.cxx:40
char str[256]
Definition odbhist.cxx:33
DWORD status
Definition odbhist.cxx:39
short int event_id
Definition midas.h:853
DWORD data_size
Definition midas.h:857
DWORD serial_number
Definition midas.h:855
DWORD time_stamp
Definition midas.h:856
short int trigger_mask
Definition midas.h:854
static double e(void)
Definition tinyexpr.c:136
static INT rpc_cxx_callback(INT index, void *prpc_param[])
Definition tmfe.cxx:1175
static INT tr_resume(INT run_number, char *errstr)
Definition tmfe.cxx:1438
static INT tr_startabort(INT run_number, char *errstr)
Definition tmfe.cxx:1471
static INT binary_rpc_callback(INT index, void *prpc_param[])
Definition tmfe.cxx:1203
TMFeResult TMFeErrorMessage(const std::string &message)
Definition tmfe.cxx:29
static INT tr_start(INT run_number, char *errstr)
Definition tmfe.cxx:1330
static INT rpc_callback(INT index, void *prpc_param[])
Definition tmfe.cxx:1145
static INT binary_rpc_cxx_callback(INT index, void *prpc_param[])
Definition tmfe.cxx:1240
static INT tr_stop(INT run_number, char *errstr)
Definition tmfe.cxx:1369
static INT tr_pause(INT run_number, char *errstr)
Definition tmfe.cxx:1408
TMFeResult TMFeMidasError(const std::string &message, const char *midas_function_name, int midas_status)
Definition tmfe.cxx:34
TMFeResult TMFeErrorMessage(const std::string &message)
Definition tmfe.cxx:29
TMFeResult TMFeOk()
Definition tmfe.h:106
#define MERROR
Definition tmfe.h:72
TMFeResult TMFeMidasError(const std::string &message, const char *midas_function_name, int midas_status)
Definition tmfe.cxx:34
#define MINFO
Definition tmfe_rev0.h:71
#define MERROR
Definition tmfe_rev0.h:70
#define MTALK
Definition tmfe_rev0.h:75