54 assert(!
"TMFE::~TMFE(): destruction of the TMFE singleton is not permitted!");
88 if (hostname && hostname[0]) {
263 return TMFeMidasError(
msprintf(
"Cannot set event buffer \"%s\" cache sizes: read %d, write %d",
fBufName.c_str(), (
int)read_cache_size, (
int)write_cache_size),
"bm_set_cache_size",
status);
278 int sampling_type = 0;
363 fMfe->
Msg(
MERROR,
"TMEventBuffer::SendEvent",
"Cannot send event to buffer \"%s\": bm_send_event() returned %d, event buffer is corrupted, shutting down the frontend",
fBufName.c_str(),
status);
365 return TMFeMidasError(
"Cannot send event, event buffer is corrupted, shutting down the frontend",
"bm_send_event",
status);
367 fMfe->
Msg(
MERROR,
"TMEventBuffer::SendEvent",
"Cannot send event to buffer \"%s\": bm_send_event() returned %d",
fBufName.c_str(),
status);
427 Msg(
MERROR,
"TMFE::EventBufferOpen",
"Event buffer \"%s\" size %d is smaller than requested size %d", b->fBufName.c_str(), (
int)b->fBufSize, (
int)
bufsize);
514 for (
int i=0;
i<
n;
i++) {
518 if (!
eq->fEqConfEnabled)
520 if (!
eq->fEqConfEnablePeriodic)
525 if (
eq->fEqPeriodicNextCallTime == 0)
526 eq->fEqPeriodicNextCallTime =
now + 0.5;
528 if (
now >=
eq->fEqPeriodicNextCallTime) {
529 eq->fEqPeriodicNextCallTime += period;
531 if (
eq->fEqPeriodicNextCallTime <
now) {
533 printf(
"TMFE::EquipmentPeriodicTasks: periodic equipment \"%s\" skipped some beats!\n",
eq->fEqName.c_str());
534 fMfe->
Msg(
MERROR,
"TMFE::EquipmentPeriodicTasks",
"Equipment \"%s\" skipped some beats!",
eq->fEqName.c_str());
535 while (
eq->fEqPeriodicNextCallTime <
now) {
536 eq->fEqPeriodicNextCallTime += period;
541 eq->fEqPeriodicLastCallTime =
now;
543 eq->HandlePeriodic();
559 if (!
eq->fEqConfEnabled)
561 double next =
eq->fEqStatNextWrite;
563 eq->EqWriteStatistics();
564 next =
eq->fEqStatNextWrite;
594 if (!
eq->fEqConfEnabled)
596 if (
eq->fEqConfEnablePoll && !
eq->fEqPollThreadRunning && !
eq->fEqPollThreadStarting) {
600 bool poll =
eq->HandlePoll();
603 eq->HandlePollRead();
613 double now = TMFE::TMFE::GetTime();
624 printf(
"TMFeEquipment::EqPollThread: equipment \"%s\" poll thread started\n",
fEqName.c_str());
643 printf(
"TMFeEquipment::EqPollThread: equipment \"%s\" poll thread stopped\n",
fEqName.c_str());
655 fMfe->
Msg(
MERROR,
"TMFeEquipment::EqStartPollThread",
"Equipment \"%s\": poll thread is already running",
fEqName.c_str());
670 for (
int i=0;
i<100;
i++) {
683 fMfe->
Msg(
MERROR,
"TMFeEquipment::EqStopPollThread",
"Equipment \"%s\": timeout waiting for shutdown of poll thread",
fEqName.c_str());
693 Msg(
MERROR,
"TMFE::StopRun",
"Cannot stop run, error: %s",
str);
723 Msg(
MERROR,
"TMFE::StartRun",
"Run start requested, but run is already in progress");
731 Msg(
MERROR,
"TMFE::StartRun",
"Run start requested, but logger/auto restart is off");
735 Msg(
MTALK,
"TMFE::StartRun",
"Starting new run");
741 Msg(
MERROR,
"TMFE::StartRun",
"Cannot restart run, error: %s",
str);
792 printf(
"now %.6f, sleep_end %.6f, next_periodic %.6f, sleep_time %.6f, cm_yield(%d), poll period %.6f\n",
now,
sleep_end,
next_periodic_time,
sleep_time, s,
poll_sleep);
800 fprintf(
stderr,
"TMFE::PollMidas: cm_yield(%d) status %d, shutdown requested...\n", s,
status);
841 fprintf(
stderr,
"TMFE::Yield: cm_yield(%d) status %d, shutdown requested...\n", s,
status);
860 printf(
"TMFE::RpcThread: RPC thread started\n");
874 printf(
"TMFE::RpcThread: cm_yield(%d) status %d, shutdown requested...\n",
msec,
status);
879 printf(
"TMFE::RpcThread: RPC thread stopped\n");
886 printf(
"TMFE::PeriodicThread: periodic thread started\n");
899 printf(
"TMFE::PeriodicThread: periodic thread stopped\n");
911 printf(
"TMFE::StartRpcThread: RPC thread already running\n");
927 printf(
"TMFE::StartPeriodicThread: periodic thread already running\n");
942 for (
int i=0;
i<60;
i++) {
950 printf(
"TMFE::StopRpcThread: RPC thread stopped\n");
955 fprintf(
stderr,
"TMFE::StopRpcThread: waiting for RPC thread to stop\n");
960 fprintf(
stderr,
"TMFE::StopRpcThread: timeout waiting for RPC thread to stop\n");
970 for (
int i=0;
i<60;
i++) {
978 printf(
"TMFE::StopPeriodicThread: periodic thread stopped\n");
983 fprintf(
stderr,
"TMFE::StopPeriodicThread: waiting for periodic thread to stop\n");
988 fprintf(
stderr,
"TMFE::StopPeriodicThread: timeout waiting for periodic thread to stop\n");
991void TMFE::Msg(
int message_type,
const char *filename,
int line,
const char *routine,
const char *format, ...)
996 va_start(
ap, format);
1000 cm_msg(message_type, filename, line, routine,
"%s",
message);
1004void TMFE::Msg(
int message_type,
const char *filename,
int line,
const char *routine,
const std::string&
message)
1007 cm_msg(message_type, filename, line, routine,
"%s",
message.c_str());
1015 return tv.tv_sec*1.0 +
tv.tv_usec/1000000.0;
1061 timeout.tv_usec = (
time_sec-timeout.tv_sec)*1000000.0;
1068 TMFE::Instance()->
Msg(
MERROR,
"TMFE::Sleep",
"TMFE::Sleep() called with invalid sleep time: %f, tv_sec: %lld, tv_usec: %lld",
time_sec, (
long long int)timeout.tv_sec, (
long long int)timeout.tv_usec);
1100 rqtp.tv_nsec = (time-
rqtp.tv_sec)*1000000000.0;
1123 rqtp.tv_nsec = (time-
rqtp.tv_sec)*1000000000.0;
1158 for (
unsigned i=0;
i<
mfe->fRpcHandlers.size();
i++) {
1162 std::string result =
"";
1164 if (result.length() > 0) {
1188 for (
unsigned i=0;
i<
mfe->fRpcHandlers.size();
i++) {
1192 std::vector<char> result;
1194 if (result.size() > 0) {
1202 CINT(3) = result.size();
1221 printf(
"TMFrontendRpcHelper::ctor!\n");
1229 printf(
"TMFrontendRpcHelper::dtor!\n");
1238 printf(
"TMFrontendRpcHelper::HandleBeginRun!\n");
1244 if (!
eq->fEqConfEnabled)
1246 eq->EqZeroStatistics();
1247 eq->EqWriteStatistics();
1255 printf(
"TMFrontendRpcHelper::HandleEndRun!\n");
1261 if (!
eq->fEqConfEnabled)
1263 eq->EqWriteStatistics();
1278 printf(
"TMFE::tr_start!\n");
1283 mfe->fStateRunning =
true;
1288 for (
unsigned i=0;
i<
mfe->fRpcHandlers.size();
i++) {
1317 printf(
"TMFE::tr_stop!\n");
1329 for (
int i = (
int)
mfe->fRpcHandlers.size() - 1;
i >= 0;
i--) {
1343 mfe->fStateRunning =
false;
1363 for (
int i = (
int)
mfe->fRpcHandlers.size() - 1;
i >= 0;
i--) {
1386 printf(
"TMFE::tr_resume!\n");
1393 mfe->fStateRunning =
true;
1396 for (
unsigned i=0;
i<
mfe->fRpcHandlers.size();
i++) {
1419 printf(
"TMFE::tr_startabort!\n");
1426 for (
unsigned i=0;
i<
mfe->fRpcHandlers.size();
i++) {
1438 mfe->fStateRunning =
false;
1515 printf(
"TMFE::RegisterRPCs!\n");
1609 fprintf(
stderr,
"TMFE::AddEquipment: Fatal error: Equipment \"%s\" is already registered, bye...\n",
fFeEquipments[
i]->fEqName.c_str());
1615 fprintf(
stderr,
"TMFE::AddEquipment: Fatal error: Duplicate equipment name \"%s\", bye...\n",
eq->fEqName.c_str());
1670 printf(
"TMFeEquipment::dtor: equipment name [%s]\n",
fEqName.c_str());
1722 printf(
"TMFeEquipment::EqReadCommon: for [%s]\n",
fEqName.c_str());
1765 printf(
"TMFeEquipment::EqWriteCommon: for [%s]\n",
fEqName.c_str());
1806 printf(
"TMFeEquipment::PreInit: for [%s]\n",
fEqName.c_str());
1812 if (
fEqName.find(
"%") != std::string::npos) {
1828 printf(
"TMFeEquipment::PreInit: creating ODB common\n");
1856 printf(
"TMFeEquipment::EqPostInit: for [%s]\n",
fEqName.c_str());
1930 printf(
"TMFeEquipment::EqZeroStatistics: zero statistics for [%s]\n",
fEqName.c_str());
1955 printf(
"TMFeEquipment::EqWriteStatistics: write statistics for [%s]\n",
fEqName.c_str());
2099 for (
auto v: event) {
2171 std::string path =
"";
2172 path +=
"/Equipment/";
2174 path +=
"/Variables";
2259 fprintf(
stderr,
" -h hostname[:tcpport] -- connect to MIDAS mserver on given host and tcp port number\n");
2260 fprintf(
stderr,
" -e exptname -- connect to given MIDAS experiment\n");
2263 fprintf(
stderr,
" -O -- Become a daemon but keep stdout for saving in a log file: frontend -O >file.log 2>&1\n");
2281 std::vector<std::string>
args;
2296 std::vector<std::string>
eq_args;
2300 std::string hostname;
2304 for (
unsigned int i=1;
i<
args.size();
i++) {
2306 if (
args[
i] ==
"--") {
2308 for (
unsigned j=
i+1;
j<
args.size();
j++)
2311 }
else if (
args[
i] ==
"-v") {
2313 }
else if (
args[
i] ==
"-D") {
2315 }
else if (
args[
i] ==
"-O") {
2317 }
else if (
args[
i] ==
"-h") {
2319 if (
i >=
args.size()) {
help =
true;
break; }
2321 }
else if (
args[
i] ==
"-e") {
2323 if (
i >=
args.size()) {
help =
true;
break; }
2325 }
else if (
args[
i] ==
"-i") {
2327 if (
i >=
args.size()) {
help =
true;
break; }
2329 }
else if (
args[
i] ==
"--help") {
2332 }
else if (
args[
i][0] ==
'-') {
2346 printf(
"Becoming a daemon...\n");
2349 printf(
"Becoming a daemon...\n");
2425 fprintf(
stderr,
"Fatal error: Cannot start frontend, run is in progress!\n");
std::vector< int > fBufRequests
TMFeResult SetCacheSize(size_t read_cache_size, size_t write_cache_size)
TMFeResult OpenBuffer(const char *bufname, size_t bufsize=0)
TMFeResult AddRequest(int event_id, int trigger_mask, const char *sampling_type_string)
TMFeResult ReceiveEvent(std::vector< char > *e, int timeout_msec=0)
TMFeResult FlushCache(bool wait=true)
size_t fBufWriteCacheSize
TMFeResult SendEvent(const char *e)
TMFeResult TriggerAlarm(const char *name, const char *message, const char *aclass)
TMFeResult EventBufferCloseAll()
bool fRunStopRequested
run stop was requested by equipment
void DeregisterTransitionStartAbort()
std::vector< TMEventBuffer * > fEventBuffers
std::atomic_bool fRpcThreadStarting
static std::string GetThreadId()
return identification of this thread
bool fStateRunning
run state is running or paused
void DeregisterTransitionResume()
std::mutex fEventBuffersMutex
TMFE()
default constructor is private for singleton classes
static double GetTime()
return current time in seconds, with micro-second precision
void MidasPeriodicTasks()
void AddRpcHandler(TMFeRpcHandlerInterface *)
int fDB
ODB database handle.
virtual ~TMFE()
destructor is private for singleton classes
void DeregisterTransitionPause()
void SetTransitionSequenceResume(int seqno)
double fRunStartTime
start a new run at this time
std::string fMserverHostname
hostname where the mserver is running, blank if using shared memory
void SetTransitionSequenceStop(int seqno)
void Yield(double sleep_sec)
void Msg(int message_type, const char *filename, int line, const char *routine, const char *format,...) MATTRPRINTF(6
void DeregisterTransitionStop()
TMFeResult Connect(const char *progname=NULL, const char *hostname=NULL, const char *exptname=NULL)
std::atomic_bool fRpcThreadShutdownRequested
void SetTransitionSequenceStartAbort(int seqno)
static void Sleep(double sleep_time_sec)
sleep, with micro-second precision
void SetTransitionSequencePause(int seqno)
std::atomic_bool fShutdownRequested
shutdown was requested by Ctrl-C or by RPC command
std::atomic_bool fRpcThreadRunning
TMFeResult SetWatchdogSec(int sec)
void RegisterTransitionStartAbort()
TMFeResult ResetAlarm(const char *name)
TMFeResult EventBufferFlushCacheAll(bool wait=true)
std::string fHostname
hostname we are running on
MVOdb * fOdbRoot
ODB root.
std::vector< TMFeRpcHandlerInterface * > fRpcHandlers
std::string fProgramName
frontend program name
void DeregisterTransitionStart()
void RemoveRpcHandler(TMFeRpcHandlerInterface *)
void DeregisterTransitions()
int fRunNumber
current run number
void SetTransitionSequenceStart(int seqno)
TMFeResult EventBufferOpen(TMEventBuffer **pbuf, const char *bufname, size_t bufsize=0)
std::string fExptname
experiment name, blank if only one experiment defined in exptab
void * BkOpen(char *pevent, const char *bank_name, int bank_type) const
TMFeResult EqWriteEventToOdb_locked(const char *pevent)
TMFeResult BkInit(char *pevent, size_t size) const
uint16_t fEqConfTriggerMask
virtual void HandlePollRead()
MVOdb * fOdbEqSettings
ODB Equipment/EQNAME/Settings.
std::atomic_bool fEqPollThreadShutdownRequested
int fEqConfWriteCacheSize
TMFeResult EqSetStatus(const char *status, const char *color)
bool fEqConfWriteEventsToOdb
virtual bool HandlePoll()
TMFeResult EqWriteCommon(bool create=false)
Write TMFeEqInfo to ODB /Equipment/NAME/Common.
TMFeResult EqReadCommon()
Read TMFeEqInfo from ODB /Equipment/NAME/Common.
double fEqConfPollSleepSec
TMFeResult EqWriteStatistics()
MVOdb * fOdbEqVariables
ODB Equipment/EQNAME/Variables.
int fEqConfPeriodMilliSec
TMFeResult EqInit(const std::vector< std::string > &args)
Initialize equipment.
double fEqConfPeriodStatisticsSec
TMEventBuffer * fEqEventBuffer
virtual TMFeResult HandleInit(const std::vector< std::string > &args)
TMFeResult EqZeroStatistics()
std::thread * fEqPollThread
std::atomic_bool fEqPollThreadRunning
TMFeResult EqPreInit()
Initialize equipment, before EquipmentBase::Init()
uint32_t fEqConfNumSubEvents
TMFeResult BkClose(char *pevent, void *ptr) const
std::string fEqConfFormat
TMFeResult ComposeEvent(char *pevent, size_t size) const
MVOdb * fOdbEqCommon
ODB Equipment/EQNAME/Common.
MVOdb * fOdbEq
ODB Equipment/EQNAME.
std::atomic_bool fEqPollThreadStarting
TMFeResult EqSendEvent(const char *pevent, bool write_to_odb=true)
bool fEqConfReadConfigFromOdb
TMFeResult EqWriteEventToOdb(const char *pevent)
int BkSize(const char *pevent) const
std::string fEqConfBuffer
size_t fEqConfMaxEventSize
MVOdb * fOdbEqStatistics
ODB Equipment/EQNAME/Statistics.
TMFeResult EqPostInit()
Initialize equipment, after EquipmentBase::Init()
bool fEqConfReadOnlyWhenRunning
std::string error_message
virtual TMFeResult HandleEndRun(int run_number)
virtual TMFeResult HandleResumeRun(int run_number)
virtual TMFeResult HandlePauseRun(int run_number)
virtual TMFeResult HandleRpc(const char *cmd, const char *args, std::string &result)
virtual TMFeResult HandleBeginRun(int run_number)
virtual TMFeResult HandleStartAbortRun(int run_number)
virtual TMFeResult HandleBinaryRpc(const char *cmd, const char *args, std::vector< char > &result)
double fFeFlushWriteCacheNextCallTime
bool fFeIfRunningCallBeginRun
std::thread * fFePeriodicThread
std::atomic_bool fFePeriodicThreadRunning
double fFeFlushWriteCachePeriodSec
void FeDeleteEquipments()
void FePollMidas(double sleep_sec)
TMFeResult FeInitEquipments(const std::vector< std::string > &args)
virtual void HandleFrontendExit()
void FeSetName(const char *program_name)
void FeStopEquipmentPollThreads()
std::vector< TMFeEquipment * > fFeEquipments
void FeUsage(const char *argv0)
TMFeResult FeInit(const std::vector< std::string > &args)
TMFeResult FeAddEquipment(TMFeEquipment *eq)
virtual TMFeResult HandleArguments(const std::vector< std::string > &args)
std::atomic_bool fFePeriodicThreadShutdownRequested
void FeStartPeriodicThread()
virtual TMFeResult HandleFrontendInit(const std::vector< std::string > &args)
bool fFeIfRunningCallExit
virtual TMFeResult HandleFrontendReady(const std::vector< std::string > &args)
double FePollTasks(double next_periodic_time)
TMFeResult FeRemoveEquipment(TMFeEquipment *eq)
void FeStopPeriodicThread()
int FeMain(int argc, char *argv[])
TMFrontendRpcHelper * fFeRpcHelper
std::atomic_bool fFePeriodicThreadStarting
virtual void HandleUsage()
TMFeResult HandleBeginRun(int run_number)
TMFrontendRpcHelper(TMFrontend *fe)
virtual ~TMFrontendRpcHelper()
TMFeResult HandleEndRun(int run_number)
INT al_reset_alarm(const char *alarm_name)
INT al_trigger_alarm(const char *alarm_name, const char *alarm_message, const char *default_class, const char *cond_str, INT type)
void bk_init32a(void *event)
INT bk_close(void *event, void *pdata)
void bk_create(void *event, const char *name, WORD type, void **pdata)
INT bk_size(const void *event)
INT bm_open_buffer(const char *buffer_name, INT buffer_size, INT *buffer_handle)
INT bm_receive_event_vec(INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
INT bm_request_event(HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
INT bm_set_cache_size(INT buffer_handle, size_t read_size, size_t write_size)
INT bm_close_buffer(INT buffer_handle)
int bm_send_event_sg(int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
INT bm_flush_cache(int buffer_handle, int timeout_msec)
INT cm_register_transition(INT transition, INT(*func)(INT, char *), INT sequence_number)
INT cm_yield(INT millisec)
INT cm_get_experiment_database(HNDLE *hDB, HNDLE *hKeyClient)
INT cm_transition(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag)
INT cm_register_function(INT id, INT(*func)(INT, void **))
INT cm_connect_experiment1(const char *host_name, const char *default_exp_name, const char *client_name, void(*func)(char *), INT odb_size, DWORD watchdog_timeout)
INT cm_disconnect_experiment(void)
INT cm_get_environment(char *host_name, int host_name_size, char *exp_name, int exp_name_size)
INT cm_deregister_transition(INT transition)
INT cm_set_transition_sequence(INT transition, INT sequence_number)
INT cm_set_watchdog_params(BOOL call_watchdog, DWORD timeout)
std::string ss_gethostname()
INT ss_suspend_set_rpc_thread(midas_thread_t thread_id)
std::string ss_tid_to_string(midas_thread_t thread_id)
INT ss_daemon_init(BOOL keep_stdout)
midas_thread_t ss_gettid(void)
INT cm_msg_flush_buffer()
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
INT db_find_key(HNDLE hDB, HNDLE hKey, const char *key_name, HNDLE *subhKey)
INT tr_stop(INT rn, char *error)
INT tr_start(INT rn, char *error)
INT tr_pause(INT rn, char *error)
BOOL debug
debug printouts
INT tr_resume(INT rn, char *error)
std::string msprintf(const char *format,...)
int cm_write_event_to_odb(HNDLE hDB, HNDLE hKey, const EVENT_HEADER *pevent, INT format)
#define DEFAULT_WATCHDOG_TIMEOUT
#define DEFAULT_MAX_EVENT_SIZE
#define DEFAULT_BUFFER_SIZE
#define TRANSITION_ERROR_STRING_LENGTH
#define message(type, str)
int gettimeofday(struct timeval *tp, void *tzp)
TH1X EXPRT * h1_book(const char *name, const char *title, int bins, double min, double max)
static INT tr_resume(INT run_number, char *errstr)
static INT tr_startabort(INT run_number, char *errstr)
static INT binary_rpc_callback(INT index, void *prpc_param[])
TMFeResult TMFeErrorMessage(const std::string &message)
static INT tr_start(INT run_number, char *errstr)
static INT rpc_callback(INT index, void *prpc_param[])
static INT tr_stop(INT run_number, char *errstr)
static INT tr_pause(INT run_number, char *errstr)
TMFeResult TMFeMidasError(const std::string &message, const char *midas_function_name, int midas_status)
TMFeResult TMFeErrorMessage(const std::string &message)
TMFeResult TMFeMidasError(const std::string &message, const char *midas_function_name, int midas_status)