ROOTANA
Loading...
Searching...
No Matches
manalyzer.cxx
Go to the documentation of this file.
1//
2// MIDAS analyzer
3//
4// K.Olchanski
5//
6
7#undef NDEBUG // this program requires working assert()
8
9#include <stdio.h>
10#include <unistd.h> // usleep()
11#include <assert.h>
12#include <sys/stat.h> // struct stat_buffer;
13#include <math.h> // pow()
14
15#include <algorithm> // std::stable_sort()
16
17#include "manalyzer.h"
18#include "midasio.h"
19
20#ifdef HAVE_MIDAS
21#include "odbxx.h"
22#endif
23
24//////////////////////////////////////////////////////////
25
26static bool gTrace = false;
27
28//////////////////////////////////////////////////////////
29//
30// Methods of TARunInfo
31//
32//////////////////////////////////////////////////////////
33
34TARunInfo::TARunInfo(int runno, const char* filename, const std::vector<std::string>& args)
35{
36 if (gTrace)
37 printf("TARunInfo::ctor!\n");
38 fRunNo = runno;
39 if (filename)
40 fFileName = filename;
41 fOdb = NULL;
42#ifdef HAVE_ROOT
43 fRoot = new TARootHelper(this);
44#else
45 fRoot = NULL;
46#endif
47 fMtInfo = NULL;
48 fArgs = args;
49}
50
52{
53 if (gTrace)
54 printf("TARunInfo::dtor!\n");
55 fRunNo = 0;
56 fFileName = "(deleted)";
57 if (fOdb) {
58 delete fOdb;
59 fOdb = NULL;
60 }
61#ifdef HAVE_ROOT
62 if (fRoot) {
63 delete fRoot;
64 fRoot = NULL;
65 }
66#endif
67 int count = 0;
68 while (1) {
69 TAFlowEvent* flow = ReadFlowQueue();
70 if (!flow)
71 break;
72 delete flow;
73 count++;
74 }
75 if (gTrace) {
76 printf("TARunInfo::dtor: deleted %d queued flow events!\n", count);
77 }
78
79 if (fMtInfo) {
80 delete fMtInfo;
81 fMtInfo = NULL;
82 }
83}
84
85//////////////////////////////////////////////////////////
86//
87// Methods of TAFlowEvent
88//
89//////////////////////////////////////////////////////////
90
92{
93 if (gTrace)
94 printf("TAFlowEvent::ctor: chain %p\n", flow);
95 fNext = flow;
96}
97
99{
100 if (gTrace)
101 printf("TAFlowEvent::dtor: this %p, next %p\n", this, fNext);
102 if (fNext)
103 delete fNext;
104 fNext = NULL;
105}
106
107//////////////////////////////////////////////////////////
108//
109// Methods of TARunObject
110//
111//////////////////////////////////////////////////////////
112
114{
115 if (gTrace)
116 printf("TARunObject::ctor, run %d\n", runinfo->fRunNo);
117}
118
120{
121 if (gTrace)
122 printf("TARunObject::BeginRun, run %d\n", runinfo->fRunNo);
123}
124
126{
127 if (gTrace)
128 printf("TARunObject::EndRun, run %d\n", runinfo->fRunNo);
129}
130
132{
133 if (gTrace)
134 printf("TARunObject::NextSubrun, run %d\n", runinfo->fRunNo);
135}
136
138{
139 if (gTrace)
140 printf("TARunObject::PauseRun, run %d\n", runinfo->fRunNo);
141}
142
144{
145 if (gTrace)
146 printf("TARunObject::ResumeRun, run %d\n", runinfo->fRunNo);
147}
148
150{
151 if (gTrace)
152 printf("TARunObject::PreEndRun, run %d\n", runinfo->fRunNo);
153}
154
156{
157 if (gTrace)
158 printf("TARunObject::Analyze!\n");
159
160 // This default analyze function does no work, instruct the Profiler to not time / log this
161 *flags|=TAFlag_SKIP_PROFILE;
162
163 return flow;
164}
165
167{
168 if (gTrace)
169 printf("TARunObject::Analyze!\n");
170
171 // This default analyze function does no work, instruct the Profiler to not time / log this
172 *flags |= TAFlag_SKIP_PROFILE;
173
174 return flow;
175}
176
178{
179 if (gTrace)
180 printf("TARunObject::AnalyzeSpecialEvent!\n");
181}
182
183//////////////////////////////////////////////////////////
184//
185// Methods of TAFactory
186//
187//////////////////////////////////////////////////////////
188
190{
191 if (gTrace)
192 printf("TAFactory::Usage!\n");
193}
194
195void TAFactory::Init(const std::vector<std::string> &args)
196{
197 if (gTrace)
198 printf("TAFactory::Init!\n");
199}
200
202{
203 if (gTrace)
204 printf("TAFactory::Finish!\n");
205}
206
207#ifdef HAVE_ROOT
208
209//////////////////////////////////////////////////////////
210//
211// Methods of TARootHelper
212//
213//////////////////////////////////////////////////////////
214
215std::string TARootHelper::fgUserOutputDirectory = "root_output_files";
217TApplication* TARootHelper::fgApp = NULL;
219THttpServer* TARootHelper::fgHttpServer = NULL;
220std::vector<std::string> TARootHelper::fgOutputRootFiles;
221
222
224{
225 if (gTrace)
226 printf("TARootHelper::ctor!\n");
227
228 if (!fgUserOutputFileName.empty()) {
230 } else {
231 char xfilename[256];
232 snprintf(xfilename, sizeof(xfilename), "output%05d.root", runinfo->fRunNo);
233
234 if (fgUserOutputDirectory.empty()) {
235 fOutputFileName = xfilename;
236 } else {
237 fOutputFileName = fgUserOutputDirectory + "/" + xfilename;
238 }
239 }
240}
241
243{
244 assert(fOutputFile == NULL);
245
246 if (!fgUserOutputDirectory.empty()) {
247 struct stat buffer;
248 int status = stat(fgUserOutputDirectory.c_str(), &buffer);
249
250 if (status < 0 && errno == ENOENT) {
251 fprintf(stdout, "TARootHelper::Init: creating ROOT output directory \"%s\"\n", fgUserOutputDirectory.c_str());
252 status = mkdir(fgUserOutputDirectory.c_str(), 0777);
253 if (status == -1) {
254 fprintf(stderr, "TARootHelper::ctor: Error: cannot output directory \"%s\", errno %d (%s)\n", fgUserOutputDirectory.c_str(), errno, strerror(errno));
255 }
256 }
257 }
258
259 if (!fOutputFileName.empty()) {
260 fprintf(stdout, "TARootHelper::Init: creating ROOT output file \"%s\"\n", fOutputFileName.c_str());
261
262 fOutputFile = new TFile(fOutputFileName.c_str(), "RECREATE");
263
264 if (!fOutputFile->IsOpen()) {
265 fprintf(stderr, "TARootHelper::ctor: Error: cannot open output ROOT file \"%s\"\n", fOutputFileName.c_str());
266 fOutputFile = new TFile("/dev/null", "UPDATE");
267 assert(fOutputFile);
268 assert(fOutputFile->IsOpen());
269 }
270
271 if (fOutputFile != NULL) {
272 if (gTrace)
273 printf("TARootHelper::ctor: ROOT output file is \"%s\"!\n", fOutputFileName.c_str());
274
276
277 fOutputFile->cd();
278 }
279 }
280}
281
283{
284 if (gTrace)
285 printf("TARootHelper::dtor!\n");
286
287 if (fOutputFile != NULL) {
288 fOutputFile->Write();
289 fOutputFile->Close();
290 fOutputFile = NULL;
291 if (gTrace)
292 printf("TARootHelper::dtor: ROOT output file closed!\n");
293 }
294
295 if (fgDir)
296 fgDir->cd();
297}
298
299//////////////////////////////////////////////////////////
300//
301// jsroot
302//
303//////////////////////////////////////////////////////////
304
305#ifdef HAVE_THTTP_SERVER
306
307#include "TKey.h" // class TKey
308#include "THttpServer.h" // class THttpServer
309#include "TSystem.h" // gSystem
310
311static void jsroot_ReadFile(TDirectoryFile *f)
312{
313 for (TObject* keyobj: *f->GetListOfKeys()) {
314 TKey* key = (TKey*)keyobj;
315 //printf("key %p, name [%s] title [%s]\n", key, key->GetName(), key->GetTitle());
316 TObject* obj = f->Get(key->GetName());
317 TClass* t = obj->IsA();
318 //printf("obj %p, class [%s], key %p, name [%s], title [%s]\n", obj, t->GetName(), key, key->GetName(), key->GetTitle());
319 if (strcmp(t->GetName(), "TDirectoryFile") == 0) {
320 jsroot_ReadFile((TDirectoryFile*)obj);
321 }
322 }
323}
324
325static void jsroot(const std::vector<std::string>& files)
326{
327 THttpServer* httpServer = TARootHelper::fgHttpServer;
328
329 assert(httpServer != NULL); // server should be already running
330
331 std::vector<TFile*> root_files;
332
333 printf("JSROOT server for %zu files:\n", files.size());
334
335 for (size_t i=0; i<files.size(); i++) {
336 printf("file[%zu]: %s\n", i, files[i].c_str());
337
338 TFile* f = new TFile(files[i].c_str(), "READ");
339
340 if (!f->IsOpen()) {
341 fprintf(stderr, "Error: cannot open ROOT file \"%s\"\n", files[i].c_str());
342 continue;
343 }
344
345 //f->ReadAll(); // does not read all histograms into memory
346 //f->ReadKeys(); // reads all the subdirectories, but not histograms, so everything is in the wrong order
347
348 jsroot_ReadFile(f);
349
350 root_files.push_back(f);
351 }
352
353 while (1) {
354 int nreq = httpServer->ProcessRequests();
355 if (nreq > 0) {
356 //printf("ProcessRequests() returned %d\n", nreq);
357 continue;
358 }
359
360 //gSystem->DispatchOneEvent(kTRUE);
361
362 bool intr = gSystem->ProcessEvents();
363 if (intr)
364 break;
365
366 usleep(1000);
367 }
368
369 printf("Interrupted, shutting down!\n");
370
371 // close ROOT files
372
373 for (TFile* f: root_files) {
374 f->Close();
375 }
376 root_files.clear();
377}
378
379#endif
380
381#endif
382
383#include <stdio.h>
384#include <stdlib.h>
385#include <string.h>
386#include <sys/time.h>
387#include <assert.h>
388#include <signal.h>
389
390#include "manalyzer.h"
391#include "midasio.h"
392
393#ifdef HAVE_THTTP_SERVER
394#include "THttpServer.h"
395#endif
396
397#ifdef HAVE_ROOT
398#include <TSystem.h>
399#include <TROOT.h>
400#endif
401
402//////////////////////////////////////////////////////////
403//
404// Methods and Defaults of TAMultithreadHelper
405//
406//////////////////////////////////////////////////////////
407
409static int gDefaultMultithreadWaitEmpty = 10; // microseconds
410static int gDefaultMultithreadWaitFull = 10; // microseconds
411
413{
414 fprintf(stderr, "Waiting for all queues to empty out!\n");
415
416 int count_all_empty = 0;
417
418 for (int t=0; ; ) {
419 int count_not_empty = 0;
420 size_t count_events = 0;
421
422 for (unsigned i=0; i<mt->fMtThreads.size(); i++) {
423 std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
424 if (!mt->fMtThreadIsRunning[i]) // skip threads that have shutdown
425 continue;
426 if (!mt->fMtFlowQueue[i].empty()) {
427 count_not_empty++;
428 count_events += mt->fMtFlowQueue[i].size();
429 break;
430 }
431 if (mt->fMtThreadIsBusy[i]) {
432 count_not_empty++;
433 count_events += 1; // module is analyzing 1 event
434 break;
435 }
436 // implicit unlock
437 }
438
439 //fprintf(stderr, "Waiting for all queues to empty out: %d queues still have %zu flow events!\n", count_not_empty, count_events);
440
441 if (count_not_empty == 0) {
442 count_all_empty++;
443 }
444
445 if (count_all_empty > 1) {
446 // must loop over "all empty" at least twice! K.O.
447 break;
448 }
449
450 if (t > 10) {
451 fprintf(stderr, "Timeout waiting for all queues to empty out, %d queues still have %zu flow events!\n", count_not_empty, count_events);
452 //break;
453 }
454
455 ::sleep(1);
456 t++;
457 }
458}
459
461{
462 fprintf(stderr, "Waiting for all threads to shut down!\n");
463
464 mt->fMtShutdownRequested = true;
465
466 for (int t=0; ; ) {
467 int count_running = 0;
468
469 for (unsigned i=0; i<mt->fMtThreads.size(); i++) {
470 if (mt->fMtThreadIsRunning[i])
471 count_running++;
472 }
473
474 if (count_running == 0) {
475 break;
476 }
477
478 if (t > 10) {
479 fprintf(stderr, "Timeout waiting for all threads to shut down, %d still running!\n", count_running);
480 break;
481 }
482
483 ::sleep(1);
484 t++;
485 }
486
487 fprintf(stderr, "Joining all threads!\n");
488 for (unsigned i=0; i<mt->fMtThreads.size(); i++) {
489 if (!mt->fMtThreadIsRunning[i] ) {
490 // pointer is null if thread is already shutdown
491 if (mt->fMtThreads[i]) {
492 mt->fMtThreads[i]->join();
493 delete mt->fMtThreads[i];
494 mt->fMtThreads[i] = NULL;
495 }
496 } else {
497 fprintf(stderr, "Thread %d failed to shut down!\n", i);
498 }
499 }
500}
501
502
504 fMtFlowQueueMutex(nModules),
505 fMtFlowQueue(nModules),
506 fMtFlagQueue(nModules),
507 fMtThreads(nModules,NULL),
508 fMtThreadIsRunning(nModules),
509 fMtThreadIsBusy(nModules),
510 fMtShutdownRequested(false),
511 fMtQuitRequested(false)
512 // ctor
513{
514 for (auto &b: fMtThreadIsRunning) { b = false; }
515 for (auto &b: fMtThreadIsBusy) { b = false; }
516 // default max queue size
518 // queue settings
521}
522
524{
525 size_t nmodules = fMtFlowQueueMutex.size();
526
527 // just for kicks, check that all queues have correct size
528 assert(nmodules == fMtFlowQueue.size());
529 assert(nmodules == fMtFlagQueue.size());
530 assert(nmodules == fMtFlowQueueMutex.size());
531
532 // should not come to the destructor while threads are still running
534
535 int count = 0;
536 for (size_t i=0; i<nmodules; i++) {
537 // check that the thread is stopped
538 //assert(!fMtThread[i].joinable());
539 // empty the thread queue
540 std::lock_guard<std::mutex> lock(fMtFlowQueueMutex[i]);
541 //printf("TAMultithreadInfo::dtor: thread %zu has %zu queued events\n", i, fMtFlowQueue[i].size());
542 while (!fMtFlowQueue[i].empty()) {
543 TAFlowEvent* flow = fMtFlowQueue[i].front();
544 TAFlags* flag = fMtFlagQueue[i].front();
545 fMtFlowQueue[i].pop_front();
546 fMtFlagQueue[i].pop_front();
547 delete flow;
548 delete flag;
549 count++;
550 }
551 // implicit unlock of mutex
552 }
553 if (gTrace) {
554 printf("TAMultithreadInfo::dtor: deleted %d queued flow events!\n", count);
555 }
556}
557
558std::mutex TAMultithreadHelper::gfLock; //Lock for modules to execute code that is not thread safe (many root fitting libraries)
559
560static void MtQueueFlowEvent(TAMultithreadHelper* mt, int i, TAFlags* flag, TAFlowEvent* flow)
561{
562 assert(mt);
563
564 if (flag == NULL) {
565 flag = new TAFlags;
566 *flag = 0;
567 }
568
569 //PrintQueueLength();
570
571 while (1) {
572 {
573 //Lock and queue events
574 std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
575
576 bool full = (mt->fMtFlowQueue[i].size() >= mt->fMtQueueDepth);
577
578 if (full && (i==0)) {
579 //printf("MtQueueFlowEvent: queue %i is full: %zu out of max %zu entries\n", i, mt->fMtFlowQueue[i].size(), mt->fMtQueueDepth);
580
581 // we will deadlock if first thread queue is full, but
582 // it is not reading from the queue because it is waiting
583 // to write to the queue of the next thread, which is not reading
584 // from it's queue because it is stuck here in AddToFlowQueue().
585
586 full = false;
587 }
588
589 if (!full || mt->fMtShutdownRequested) {
590 mt->fMtFlowQueue[i].push_back(flow);
591 mt->fMtFlagQueue[i].push_back(flag);
592 return;
593 }
594 // Unlock when we go out of scope
595 }
596
597 usleep(mt->fMtQueueFullUSleepTime);
598 }
599}
600
602{
603 if (!mt) return false;
604
605 std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[0]);
606
607 bool full = (mt->fMtFlowQueue[0].size() >= mt->fMtQueueDepth);
608
609 if (!full) {
610 return false;
611 }
612
613 //printf("MtQueueWait: queue 0 is full: %zu out of max %zu entries\n", mt->fMtFlowQueue[0].size(), mt->fMtQueueDepth);
614
615 return true;
616}
617
618#if 0
619//Function to print the length of the flow queue when in multithread mode
620//Maybe make root update a graphical window?
621static void PrintMtQueueLength(TAMultithreadHelper* mt)
622{
623 printf("Multithread queue lengths:\n");
624 for (unsigned i=0; i<mt->fMtFlowQueue.size(); i++) {
625 printf("%d:\t%zu\n",i,mt->fMtFlowQueue[i].size());
626 }
627}
628#endif
629
630//////////////////////////////////////////////////////////
631//
632// Methods of TARegister
633//
634//////////////////////////////////////////////////////////
635
636std::vector<TAFactory*> *gModules = NULL;
637
639{
640 if (!gModules)
641 gModules = new std::vector<TAFactory*>;
642 gModules->push_back(m);
643}
644
645#if 0
/// Current time of day from gettimeofday(), expressed in seconds.
///
/// @return whole seconds plus the microsecond part scaled by 1e-6
static double GetTimeSec()
{
   struct timeval now;
   gettimeofday(&now, NULL);

   const double fraction = 0.000001*now.tv_usec;
   return now.tv_sec + fraction;
}
652#endif
653
654//////////////////////////////////////////////////////////
655//
656// Profiler class
657//
658//////////////////////////////////////////////////////////
659
660TAUserProfilerFlow::TAUserProfilerFlow(TAFlowEvent* flow, const char* name, const TAClock& start) : TAFlowEvent(flow), fModuleName(name)
661{
662 fStart = start;
663 fStop = TAClockNow();
664}
665
669
671{
672 TAClockDuration elapsed_seconds = fStop - fStart;
673 return elapsed_seconds.count();
674}
675
676#ifdef HAVE_ROOT
677#include "TH1D.h"
678#endif
679#include <map>
680
682{
683private:
684 std::string fBinaryName;
685 std::string fBinaryPath;
686 clock_t fStartCPU;
687 std::chrono::system_clock::time_point fStartUser;
690
691 //Track Queue lengths when multithreading
692#ifdef HAVE_ROOT
693 int fNQueues=0;
694 std::vector<TH1D*> fAnalysisQueue;
695 std::atomic<int> fQueueIntervalCounter;
696#endif
697
698 // Track Analyse TMEvent time per module (main thread)
699#ifdef HAVE_ROOT
700 std::vector<TH1D*> fAnalyzeEventTimeHistograms;
701#endif
702 std::vector<std::string> fModuleNames;
703 std::vector<double> fAnalyzeEventMean;
704 std::vector<double> fAnalyzeEventRMS;
705 std::vector<int> fAnalyzeEventEntries;
706
707 std::vector<double> fAnalyzeEventTimeMax;
708 std::vector<double> fAnalyzeEventTimeTotal;
709
710 //Track Analyse flow event time per module (can be multiple threads)
711#ifdef HAVE_ROOT
713#endif
714 std::vector<double> fAnalyzeFlowEventMean;
715 std::vector<double> fAnalyzeFlowEventRMS;
716 std::vector<int> fAnalyzeFlowEventEntries;
717
718 std::vector<double> fAnalyzeFlowEventTimeMax;
719 std::vector<double> fAnalyzeFlowEventTimeTotal;
720#ifdef HAVE_ROOT
721 //Track user profiling
722 std::map<unsigned int,int> fUserMap;
723 std::vector<TH1D*> fUserHistograms;
724 std::vector<double> fTotalUserTime;
725 std::vector<double> fMaxUserTime;
726
727 // Number of events between samples
728 const int fQueueInterval = 100;
729#endif
730
731public:
732 Profiler( const int queue_interval_check );
733 ~Profiler();
734 void Begin(TARunInfo* runinfo,const std::vector<TARunObject*> fRunRun );
735 // Function for profiling the 'main' thread (that unpacks TMEvents)
736 void LogAnalyzeEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start);
737 // Function for profiling module threads
738 void LogAnalyzeFlowEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start);
739 // Extra function for users custom profiling windows
740 void LogUserWindows(TAFlags* flag, TAFlowEvent* flow);
741 void AddModuleMap( const char* UserProfileName, unsigned long hash);
742 void LogMTQueueLength(TARunInfo* runinfo);
743 void End(TARunInfo* runinfo);
744 void Print() const;
745};
746
747Profiler::Profiler(const int queue_interval_check)
748#ifdef HAVE_ROOT
749 :
750 fQueueIntervalCounter(0),
751 fQueueInterval(queue_interval_check)
752#endif
753{
754 if (gTrace)
755 printf("Profiler::ctor\n");
756 fMIDASStartTime = 0;
757 fMIDASStopTime = 0;
758 fStartCPU = clock();
759 fStartUser = std::chrono::system_clock::now();
760
761#ifdef HAVE_ROOT
762#else
763 fprintf(stderr, "Error: manalyzer must be built with ROOT for using the user profiling tools\n");
764#endif
765}
766
768{
769 if (gTrace)
770 printf("Profiler::dtor\n");
771
772#ifdef HAVE_ROOT
774 delete h;
776 for (TH1D* h: fAnalyzeEventTimeHistograms)
777 delete h;
779 for( TH1D* h: fAnalysisQueue)
780 delete h;
781 fAnalysisQueue.clear();
782#endif
783}
784
785void Profiler::Begin(TARunInfo* runinfo, const std::vector<TARunObject*> runrun)
786{
787 if (gTrace)
788 printf("Profiler::begin\n");
789
790 runinfo->fOdb->RU32("Runinfo/Start time binary",(uint32_t*) &fMIDASStartTime);
791
792#ifdef HAVE_ROOT
793 // Put Profiler histograms in their own folders in the output root file
794 if (runinfo->fRoot->fOutputFile) {
795 runinfo->fRoot->fOutputFile->cd(); // select correct ROOT directory
796 gDirectory->mkdir("ProfilerReport")->cd();
797 runinfo->fRoot->fOutputFile->cd("ProfilerReport");
798 gDirectory->mkdir("AnalyzeFlowEventTime");
799 gDirectory->mkdir("AnalyzeFlowTime");
800 gDirectory->mkdir("MTQueueLength");
801 }
802
803 // Setup module processing time histograms
804 // Number of bins
805 Int_t Nbins=1000;
806 // Array of bin edges
807 Double_t bins[Nbins+1];
808 // Processing time range (seconds)
809 Double_t TimeRange = 10;
810 // Set uneven binning to better sample fast modules with accuracy
811 // without having a large number of bins
812 for (int i=0; i<Nbins+1; i++) {
813 bins[i] = TimeRange*pow(1.1,i)/pow(1.1,Nbins);
814 }
815
816 if (runinfo->fMtInfo)
817 fNQueues = runrun.size();
818#endif
819
820 // Per module metric setup
821 for (size_t i = 0; i < runrun.size(); i++) {
822
823 if (runrun[i]->fModuleName.empty())
824 fModuleNames.push_back("Unnamed Module " + std::to_string(i));
825 else
826 fModuleNames.push_back(runrun[i]->fModuleName);
827
828 // Metrics for the AnalyzeEvent function (main thread)
829 fAnalyzeEventMean.push_back(0);
830 fAnalyzeEventRMS.push_back(0);
831 fAnalyzeEventEntries.push_back(0);
832 fAnalyzeEventTimeMax.push_back(0);
833 fAnalyzeEventTimeTotal.push_back(0);
834
835 // Metric for the AnalyzeFlowEvent function (side threads)
836 fAnalyzeFlowEventMean.push_back(0);
837 fAnalyzeFlowEventRMS.push_back(0);
838 fAnalyzeFlowEventEntries.push_back(0);
839 fAnalyzeFlowEventTimeMax.push_back(0);
840 fAnalyzeFlowEventTimeTotal.push_back(0);
841
842#ifdef HAVE_ROOT
843 if (runinfo->fRoot->fOutputFile) {
844 runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowTime");
845 TH1D* Histo = new TH1D( TString(std::to_string(i) + "_" + fModuleNames.at(i)),
846 TString(fModuleNames.at(i) + " Event Proccessing Time; s"),
847 Nbins, bins);
848 fAnalyzeFlowEventTimeHistograms.push_back(Histo);
849
850 runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowEventTime");
851 TH1D* AnalyzeEventHisto = new TH1D(TString(std::to_string(i) + "_" + fModuleNames.at(i) + "_TMEvent"),
852 TString(fModuleNames.at(i) + " Flow Proccessing Time; s"),
853 Nbins, bins);
854 fAnalyzeEventTimeHistograms.push_back(AnalyzeEventHisto);
855 } else {
856 fAnalyzeFlowEventTimeHistograms.push_back(NULL);
857 fAnalyzeEventTimeHistograms.push_back(NULL);
858 }
859
860 // Periodically (once every fQueueInterval events) record the
861 // flow queue size if running in multithreaded mode
862 if (runinfo->fMtInfo) {
863 if (runinfo->fRoot->fOutputFile) {
864 runinfo->fRoot->fOutputFile->cd("ProfilerReport/MTQueueLength");
865 TH1D* QueueHisto = new TH1D(TString(std::to_string(i) + "_" + fModuleNames.at(i) + "_Queue"),
866 TString(fModuleNames.at(i) + " Multithread Queue Length; Queue Depth"),
867 runinfo->fMtInfo->fMtQueueDepth*1.2,
868 0,
869 runinfo->fMtInfo->fMtQueueDepth*1.2);
870 fAnalysisQueue.push_back(QueueHisto);
871 } else {
872 fAnalysisQueue.push_back(NULL);
873 }
874 }
875#endif
876 }
877}
878
879void Profiler::LogAnalyzeFlowEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start)
880{
881 if (gTrace)
882 printf("Profiler::log\n");
883
884 TAClock stop = TAClockNow();
885 if ((*flag) & TAFlag_SKIP_PROFILE) {
886 //Unset bit
887 *flag -= TAFlag_SKIP_PROFILE;
888 return;
889 }
890
891 std::chrono::duration<double> elapsed_seconds = stop - start;
892 double dt = elapsed_seconds.count();
894 if (dt > fAnalyzeFlowEventTimeMax[i])
896
897 fAnalyzeFlowEventMean[i] +=dt;
898 fAnalyzeFlowEventRMS[i] +=dt*dt;
900
901#ifdef HAVE_ROOT
904#endif
905}
906
907void Profiler::LogAnalyzeEvent(TAFlags* flag, TAFlowEvent* flow, int i, const TAClock& start)
908{
909 if (gTrace)
910 printf("Profiler::log_AnalyzeEvent_time\n");
911
912 TAClock stop = TAClockNow();
913 if ((*flag) & TAFlag_SKIP_PROFILE) {
914 //Unset bit
915 *flag -= TAFlag_SKIP_PROFILE;
916 return;
917 }
918
919 std::chrono::duration<double> elapsed_seconds = stop - start;
920 double dt = elapsed_seconds.count();
921 fAnalyzeEventTimeTotal[i] += dt;
922 if (dt > fAnalyzeEventTimeMax[i])
923 fAnalyzeEventTimeMax[i] = dt;
924
925 fAnalyzeEventMean[i] +=dt;
926 fAnalyzeEventRMS[i] +=dt*dt;
928
929#ifdef HAVE_ROOT
931 fAnalyzeEventTimeHistograms[i]->Fill(dt);
932#endif
933
934}
935
937{
938 if (gTrace)
939 printf("Profiler::log_mt_queue_length\n");
940
941#ifdef HAVE_ROOT
943 if (runinfo->fMtInfo && (fQueueIntervalCounter % fQueueInterval ==0 )) {
944 for (int i=0; i<fNQueues; i++) {
945 int j=0;
946 { //Lock guard
947 std::lock_guard<std::mutex> lock(runinfo->fMtInfo->fMtFlowQueueMutex[i]);
948 j=runinfo->fMtInfo->fMtFlowQueue[i].size();
949 }
950 fAnalysisQueue.at(i)->Fill(j);
951 }
952 }
953#endif
954}
955
956
958{
959 if (gTrace)
960 printf("Profiler::LogUserWindows\n");
961
962#ifdef HAVE_ROOT
963 //Clocks unfold backwards...
964 std::vector<TAFlowEvent*> flowArray;
965 int FlowEvents=0;
966 TAFlowEvent* f = flow;
967 while (f) {
968 flowArray.push_back(f);
969 f=f->fNext;
970 FlowEvents++;
971 }
972 for (int ii=FlowEvents-1; ii>=0; ii--) {
973 f=flowArray[ii];
974 TAUserProfilerFlow* timer=dynamic_cast<TAUserProfilerFlow*>(f);
975 if (timer) {
976 const char* name = timer->fModuleName.c_str();
977 unsigned int hash = std::hash<std::string>{}(timer->fModuleName);
978 if (!fUserMap.count(hash))
979 AddModuleMap(name,hash);
980 double dt=999.;
981 dt=timer->GetTimer();
982 int i = fUserMap[hash];
983 fTotalUserTime[i] += dt;
984 if (dt > fMaxUserTime[i])
985 fMaxUserTime.at(i) = dt;
986 fUserHistograms.at(i)->Fill(dt);
987 }
988 }
989#endif
990}
991
992void Profiler::AddModuleMap( const char* UserProfileName, unsigned long hash)
993{
994 if (gTrace)
995 printf("Profiler::AddModuleMap\n");
996
997 std::lock_guard<std::mutex> lock(TAMultithreadHelper::gfLock);
998
999#ifdef HAVE_ROOT
1000 gDirectory->cd("/ProfilerReport");
1001 fUserMap[hash] = fUserHistograms.size();
1002 Int_t Nbins = 100;
1003 Double_t bins[Nbins+1];
1004 Double_t TimeRange = 10; //seconds
1005 for (int i=0; i<Nbins+1; i++) {
1006 bins[i] = TimeRange*pow(1.1,i)/pow(1.1,Nbins);
1007 }
1008 TH1D* Histo = new TH1D(UserProfileName,UserProfileName,Nbins,bins);
1009 fUserHistograms.push_back(Histo);
1010 fTotalUserTime.push_back(0.);
1011 fMaxUserTime.push_back(0.);
1012#endif
1013 return;
1014}
1015
1017{
1018 if (gTrace)
1019 printf("Profiler::End\n");
1020
1021 for (size_t i=0; i<fAnalyzeEventMean.size(); i++) {
1024 ( fAnalyzeEventMean.at(i) / fAnalyzeEventEntries.at(i) ) *
1025 ( fAnalyzeEventMean.at(i) / fAnalyzeEventEntries.at(i) );
1026 }
1027 for (size_t i=0; i<fAnalyzeFlowEventMean.size(); i++) {
1032 }
1033#ifdef HAVE_ROOT
1034
1035 if (runinfo->fRoot->fOutputFile) {
1036 runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowTime");
1037 for (TH1D* h: fAnalyzeFlowEventTimeHistograms) {
1038 h->Write();
1039 }
1040 runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowEventTime");
1041 for (TH1D* h: fAnalyzeEventTimeHistograms) {
1042 h->Write();
1043 }
1044 if (runinfo->fMtInfo) {
1045 runinfo->fRoot->fOutputFile->cd("ProfilerReport/MTQueueLength");
1046 for (TH1D* h: fAnalysisQueue) {
1047 h->Write();
1048 }
1049 }
1050 }
1051#endif
1052}
1053
1055{
1056#ifdef HAVE_ROOT
1057 if (fAnalyzeFlowEventTimeHistograms.size()>0) {
1058#else
1059 if (fAnalyzeEventEntries.size()>0) {
1060#endif
1061 double AllAnalyzeFlowEventTime=0;
1062 for (auto& n : fAnalyzeFlowEventTimeTotal)
1063 AllAnalyzeFlowEventTime += n;
1064 double AllAnalyzeEventTime=0;
1065 for (auto& n : fAnalyzeEventTimeTotal)
1066 AllAnalyzeEventTime += n;
1067 //double max_AnalyzeEvent_time=*std::max_element(TotalAnalyzeEventTime.begin(),TotalAnalyzeEventTime.end());
1068 printf("Module average processing time\n");
1069 printf(" \t\t\t\tAnalyzeEvent (one thread) \tAnalyzeFlowEvent (multithreadable)\n");
1070 printf("Module\t\t\t\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\n");
1071 printf("----------------------------------------------------------------------------------------------------------------\n");
1072 for (size_t i=0; i<fModuleNames.size(); i++) {
1073 printf("%-25s", fModuleNames.at(i).c_str());
1074 if (fAnalyzeEventEntries.at(i))
1075 printf("\t%d\t%.1f\t%.1f\t%.1f\t%.3f",
1077 fAnalyzeEventMean.at(i)*1000.,
1078 fAnalyzeEventRMS.at(i)*1000.,
1079 fAnalyzeEventTimeMax.at(i)*1000., //ms
1080 fAnalyzeEventTimeTotal.at(i)); //s
1081 else
1082 printf("\t-\t-\t-\t-\t-");
1083
1084 if (fAnalyzeFlowEventEntries.at(i))
1085 printf("\t%d\t%.1f\t%.1f\t%.1f\t%.3f",
1087 fAnalyzeFlowEventMean.at(i)*1000.,
1088 fAnalyzeFlowEventRMS.at(i)*1000.,
1089 fAnalyzeFlowEventTimeMax.at(i)*1000., //ms
1090 fAnalyzeFlowEventTimeTotal.at(i)); //s
1091 else
1092 printf("\t-\t-\t-\t-\t-");
1093 printf("\n");
1094 }
1095 printf("----------------------------------------------------------------------------------------------------------------\n");
1096 printf(" Analyse TMEvent total time %f\n",AllAnalyzeEventTime);
1097 printf(" Analyse FlowEvent total time %f\n",AllAnalyzeFlowEventTime);
1098#ifdef HAVE_ROOT
1099 if (fUserHistograms.size()) {
1100 printf("Custom profiling windows\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\n");
1101 printf("----------------------------------------------------------------------\n");
1102 for (size_t i=0; i<fUserHistograms.size(); i++) {
1103 printf("%-25s\t%d\t%.1f\t%.1f\t%.1f\t%.3f\t\n",fUserHistograms.at(i)->GetTitle(),
1104 (int)fUserHistograms.at(i)->GetEntries(),
1105 fUserHistograms.at(i)->GetMean()*1000., //ms
1106 fUserHistograms.at(i)->GetRMS()*1000., //ms
1107 fMaxUserTime.at(i)*1000., //ms
1108 fTotalUserTime.at(i)); //s
1109 }
1110 printf("----------------------------------------------------------------------\n");
1111 } else {
1112 printf("----------------------------------------------------------------------------------------------------------------\n");
1113 }
1114#else
1115 fprintf(stderr, "To use custom profile windows, please build rootana with ROOT\n");
1116#endif
1117 }
1118
1119 //CPU and Wall clock time:
1120 double cputime = (double)(clock() - fStartCPU)/CLOCKS_PER_SEC;
1121 std::chrono::duration<double> usertime = std::chrono::system_clock::now() - fStartUser;
1122 printf("%s\tCPU time: %.2fs\tUser time: %.2fs\tAverage CPU Usage: ~%.1f%%\n",
1123 getenv("_"),
1124 cputime,
1125 usertime.count(),
1126 100.*cputime/usertime.count());
1127}
1128
1129//////////////////////////////////////////////////////////
1130//
1131// RunHandler class
1132//
1133//////////////////////////////////////////////////////////
1134
1135class Profiler;
1136
1137static bool compare_order(const TARunObject* a, const TARunObject* b)
1138{
1139 return a->fModuleOrder < b->fModuleOrder;
1140}
1141
1143{
1144public:
1145 TARunInfo* fRunInfo = NULL;
1146 std::vector<TARunObject*> fRunRun;
1147 std::vector<std::string> fArgs;
1148 bool fMultithreadEnabled = false;
1149 Profiler* fProfiler = NULL;
1150 bool fProfilerEnabled = false;
1152
1153 RunHandler(const std::vector<std::string>& args, bool multithread, bool profile, int queue_interval_check) // ctor
1154 {
1155 fRunInfo = NULL;
1156 fArgs = args;
1157 fMultithreadEnabled = multithread;
1158 fProfiler = NULL;
1159 fProfilerEnabled = profile;
1160 fProfilerIntervalCheck = queue_interval_check;
1161 }
1162
1163 ~RunHandler() // dtor
1164 {
1165 if (fRunInfo) {
1166 delete fRunInfo;
1167 fRunInfo = NULL;
1168 }
1169 if (fProfiler) {
1170 delete fProfiler;
1171 fProfiler = NULL;
1172 }
1173 }
1174
// Worker-thread body for module index i (started by CreateRun() in multithreaded mode).
// Repeatedly takes the next (flow, flag) pair from this module's queue, runs the
// module's AnalyzeFlowEvent() on it, then either frees the pair (last module, or
// TAFlag_SKIP/TAFlag_QUIT raised) or passes it on to the queue of module i+1.
// The loop ends when fMtShutdownRequested is set or a module raises TAFlag_QUIT.
1175 void PerModuleThread(size_t i)
1176 {
1177 //bool data_processing=true;
1178 size_t nModules = fRunRun.size();
1179
1180 TAMultithreadHelper* mt = fRunInfo->fMtInfo;
1181
// the helper must provide exactly one mutex, flow queue and flag queue per module
1182 assert(nModules == mt->fMtFlowQueueMutex.size());
1183 assert(nModules == mt->fMtFlowQueue.size());
1184 assert(nModules == mt->fMtFlagQueue.size());
1185
// announce that this thread is alive (flag is written under the queue mutex)
1186 { //Lock scope
1187 std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
1188 mt->fMtThreadIsRunning[i] = true;
1189 }
1190
1191 while (!mt->fMtShutdownRequested) {
1192 TAFlowEvent* flow = NULL;
1193 TAFlags* flag = NULL;
1194
// pop the oldest queued flow event together with its flags, if there is one
1195 { //Lock scope
1196 std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
1197 if (!mt->fMtFlowQueue[i].empty()) {
1198 flow=mt->fMtFlowQueue[i].front();
1199 flag=mt->fMtFlagQueue[i].front();
1200 mt->fMtFlowQueue[i].pop_front();
1201 mt->fMtFlagQueue[i].pop_front();
1202 //printf("thread %zu has %zu queued events\n", i, mt->fMtFlowQueue[i].size());
1203 }
1204
1205 if (flow == NULL)
1206 mt->fMtThreadIsBusy[i] = false; // we will sleep
1207 else
1208 mt->fMtThreadIsBusy[i] = true; // we will analyze an event
1209
1210 // implicit unlock of mutex
1211 }
1212
1213 if (flow == NULL) { // wait until queue not empty
1214 //if (i==0)
1215 //printf("S%zu\n", i);
1216 usleep(mt->fMtQueueEmptyUSleepTime);
1217 continue;
1218 }
1219
1220 TAClock start_time = TAClockNow();
1221
// the module may return the same flow event, a different one, or NULL
1222 flow = fRunRun[i]->AnalyzeFlowEvent(fRunInfo, flag, flow);
1223
1224 if (fProfiler)
1225 fProfiler->LogAnalyzeFlowEvent(flag, flow, i, start_time);
1226
1227 if ((*flag) & TAFlag_QUIT) { // shut down the analyzer
1228 delete flow;
1229 delete flag;
1230 flow = NULL;
1231 flag = NULL;
1232 mt->fMtQuitRequested = true;
1233 mt->fMtShutdownRequested = true;
1234 break; // stop the thread
1235 }
1236
1237 if ((*flag) & TAFlag_SKIP) { // stop processing this event
1238 delete flow;
1239 delete flag;
1240 flow = NULL;
1241 flag = NULL;
1242 continue;
1243 }
1244
1245 if (i==nModules-1) { //If I am the last module... free memory, else queue up for next module to process
1246 if (fProfiler) {
1247 fProfiler->LogUserWindows(flag, flow);
1248 fProfiler->LogMTQueueLength(fRunInfo);
1249 }
1250 delete flow;
1251 delete flag;
1252 flow = NULL;
1253 flag = NULL;
1254 } else {
// ownership of flow and flag passes to the next module's queue
1255 MtQueueFlowEvent(mt, i+1, flag, flow);
1256 flow = NULL;
1257 flag = NULL;
1258 }
1259 }
1260
// announce that this thread is no longer running
1261 { //Lock scope
1262 std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
1263 mt->fMtThreadIsRunning[i] = false;
1264 mt->fMtThreadIsBusy[i] = false;
1265 }
1266 }
1267
1268 void CreateRun(int run_number, const char* file_name)
1269 {
1270 assert(fRunInfo == NULL);
1271 assert(fRunRun.size() == 0);
1272
1273 fRunInfo = new TARunInfo(run_number, file_name, fArgs);
1274
1275 if (fProfilerEnabled)
1276 fProfiler = new Profiler( fProfilerIntervalCheck );
1277
1278 size_t nModules = (*gModules).size();
1279
1280 for (size_t i=0; i<nModules; i++)
1281 fRunRun.push_back((*gModules)[i]->NewRunObject(fRunInfo));
1282
1283 std::stable_sort(fRunRun.begin(), fRunRun.end(), compare_order);
1284
1285 if (gTrace) {
1286 printf("Created %zu module run objects:\n", fRunRun.size());
1287 for (size_t i=0; i<fRunRun.size(); i++) {
1288 if (!fRunRun[i]->fModuleName.empty()) {
1289 printf("module %2zu: \"%s\" order %d\n", i, fRunRun[i]->fModuleName.c_str(), fRunRun[i]->fModuleOrder);
1290 } else {
1291 printf("module %2zu: %p order %d\n", i, fRunRun[i], fRunRun[i]->fModuleOrder);
1292 }
1293 }
1294 }
1295
1296#ifdef HAVE_ROOT
1297 if (fRunInfo->fRoot)
1298 fRunInfo->fRoot->Init();
1299#endif
1300
1301 if (fMultithreadEnabled) {
1302 TAMultithreadHelper* mt = new TAMultithreadHelper(nModules);
1303 fRunInfo->fMtInfo = mt;
1304 for (size_t i=0; i<nModules; i++) {
1305 if (gTrace) {
1306 printf("Create fMtFlowQueue thread %zu\n", i);
1307 }
1308 mt->fMtThreads[i]=new std::thread(&RunHandler::PerModuleThread, this, i);
1309 }
1310 }
1311 }
1312
// NOTE(review): the signature line of this method is absent from this listing;
// callers invoke it as BeginRun() with no arguments.
1314 {
// a run must have been created and its ODB pointer set by the caller
1315 assert(fRunInfo != NULL);
1316 assert(fRunInfo->fOdb != NULL);
1317
1318 if (fProfiler)
1319 fProfiler->Begin(fRunInfo, fRunRun);
1320
// forward the begin-of-run to every module run object, in sorted order
1321 for (unsigned i=0; i<fRunRun.size(); i++)
1322 fRunRun[i]->BeginRun(fRunInfo);
1323 }
1324
// End the current run: drain queued flow events, call PreEndRun() and EndRun()
// of every module, replace the ODB with the end-of-run dump if one was
// recorded, and finish the profiler.
//   flags - TAFlag_QUIT is OR-ed in when a quit was requested while draining.
1325 void EndRun(TAFlags* flags)
1326 {
1327 assert(fRunInfo);
1328
1329 // make sure the shutdown sequence matches the description in the README file!
1330
1331 // zeroth. Flush events queued for analysis before calling PreEndRun (ensure
1332 // deterministic behaviour that is the same as in single threaded mode)
1333
1334 if (fRunInfo->fMtInfo) {
1335 WaitForAllQueuesEmpty(fRunInfo->fMtInfo);
1336 }
1337
1338 // first, call PreEndRun() to tell analysis modules that there will be no more
1339 // MIDAS events, no more calls to Analyze(). PreEndRun() may generate more
1340 // flow events, they go into the flow queue or into the multithread queue
1341
1342 for (unsigned i=0; i<fRunRun.size(); i++)
1343 fRunRun[i]->PreEndRun(fRunInfo);
1344
1345 // if in single threaded mode, analyze all queued flow events - call AnalyzeFlowEvent()
1346 // this can generate additional flow events that will be queued in the queue.
1347
1348 AnalyzeFlowQueue(flags);
1349
1350 // if in multi threaded mode, allow all the queues to drain naturally
1351 // and shutdown the threads
1352
1353 if (fRunInfo->fMtInfo) {
1354 WaitForAllQueuesEmpty(fRunInfo->fMtInfo);
// NOTE(review): listing line 1355 is absent here; presumably it is the call that
// shuts the worker threads down - confirm against the original file.
1356 if (fRunInfo->fMtInfo->fMtQuitRequested) {
1357 (*flags) |= TAFlag_QUIT;
1358 }
1359 }
1360
1361 // done with processing all flow events
1362
1363 fRunInfo->SetAfterPreEndRun();
1364
1365 // update ODB
1366
// when an end-of-run ODB dump was recorded, it replaces the current ODB object
1367 if (fRunInfo->fEorOdbDump.size() > 0) {
1368 if (fRunInfo->fOdb) {
1369 delete fRunInfo->fOdb;
1370 fRunInfo->fOdb = NULL;
1371 }
1372
1373 fRunInfo->fOdb = MakeFileDumpOdb(fRunInfo->fEorOdbDump.data(), fRunInfo->fEorOdbDump.size());
1374#ifdef HAVE_MIDAS
1375 midas::odb::set_odb_source(midas::odb::STRING, std::string(fRunInfo->fEorOdbDump.data(), fRunInfo->fEorOdbDump.size()));
1376#endif
1377 }
1378
1379 // all data analysis is complete
1380
1381 for (size_t i=0; i<fRunRun.size(); i++) {
1382 //printf("Calling EndRun() of module %zu \"%s\"\n", i, fRunRun[i]->fModuleName.c_str());
1383 fRunRun[i]->EndRun(fRunInfo);
1384 }
1385
1386 if (fProfiler) {
1387 fProfiler->End(fRunInfo);
1388 fProfiler->Print();
1389 }
1390 }
1391
// NOTE(review): the signature line of this method is absent from this listing;
// callers invoke it as NextSubrun() with no arguments.
1393 {
1394 assert(fRunInfo);
1395
// forward the subrun change to every module run object
1396 for (unsigned i=0; i<fRunRun.size(); i++)
1397 fRunRun[i]->NextSubrun(fRunInfo);
1398 }
1399
// NOTE(review): the signature line of this method is absent from this listing;
// callers invoke it as DeleteRun() with no arguments.
// Destroys all module run objects, the profiler and the run info, leaving the
// handler ready for the next CreateRun().
1401 {
1402 assert(fRunInfo);
1403
1404 for (unsigned i=0; i<fRunRun.size(); i++) {
1405 delete fRunRun[i];
1406 fRunRun[i] = NULL;
1407 }
1408
1409 fRunRun.clear();
1410 assert(fRunRun.size() == 0);
1411
1412 if (fProfiler) {
1413 delete fProfiler;
1414 fProfiler = NULL;
1415 }
1416
// the TARunInfo destructor also deletes fOdb when it is non-NULL, which is why
// the online callers reset fOdb to NULL before calling this method
1417 delete fRunInfo;
1418 fRunInfo = NULL;
1419 }
1420
// NOTE(review): the signature line of this method is absent from this listing;
// callers pass a single TMEvent* named event.
// Forwards the event to AnalyzeSpecialEvent() of every module run object.
1422 {
1423 for (unsigned i=0; i<fRunRun.size(); i++)
1424 fRunRun[i]->AnalyzeSpecialEvent(fRunInfo, event);
1425 }
1426
// NOTE(review): the signature line of this method is absent from this listing;
// the body and the callers use a flags pointer and a TAFlowEvent* named flow.
// Passes the flow event through AnalyzeFlowEvent() of each module in order and
// returns whatever the last module that ran handed back (may be NULL).
1428 {
1429 for (unsigned i=0; i<fRunRun.size(); i++) {
1430 TAClock start_time = TAClockNow();
1431 flow = fRunRun[i]->AnalyzeFlowEvent(fRunInfo, flags, flow);
1432 if (fProfiler)
1433 fProfiler->LogAnalyzeFlowEvent(flags, flow, i, start_time);
// stop the chain when the module consumed the event or raised SKIP or QUIT
1434 if (!flow)
1435 break;
1436 if ((*flags) & TAFlag_SKIP)
1437 break;
1438 if ((*flags) & TAFlag_QUIT)
1439 break;
1440 }
1441 return flow;
1442 }
1443
1444 void AnalyzeFlowQueue(TAFlags* ana_flags)
1445 {
1446 while (1) {
1447 if (fRunInfo->fMtInfo)
1448 if (fRunInfo->fMtInfo->fMtQuitRequested) {
1449 *ana_flags |= TAFlag_QUIT;
1450 break;
1451 }
1452
1453 TAFlowEvent* flow = fRunInfo->ReadFlowQueue();
1454 if (!flow)
1455 break;
1456
1457 int flags = 0;
1458 flow = AnalyzeFlowEvent(&flags, flow);
1459 if (flow)
1460 delete flow;
1461 if (flags & TAFlag_QUIT) {
1462 *ana_flags |= TAFlag_QUIT;
1463 break;
1464 }
1465 }
1466 }
1467
1468 void AnalyzeEvent(TMEvent* event, TAFlags* flags, TMWriterInterface *writer)
1469 {
1470 assert(fRunInfo != NULL);
1471 assert(fRunInfo->fOdb != NULL);
1472 assert(event != NULL);
1473 assert(flags != NULL);
1474
1475 if (fRunInfo->fMtInfo)
1476 if (fRunInfo->fMtInfo->fMtQuitRequested) {
1477 *flags |= TAFlag_QUIT;
1478 return;
1479 }
1480
1481 TAFlowEvent* flow = NULL;
1482
1483 for (unsigned i=0; i<fRunRun.size(); i++) {
1484 TAClock start_time = TAClockNow();
1485 flow = fRunRun[i]->Analyze(fRunInfo, event, flags, flow);
1486 if (fProfiler)
1487 fProfiler->LogAnalyzeEvent(flags, flow, i, start_time);
1488 if (*flags & TAFlag_SKIP)
1489 break;
1490 if (*flags & TAFlag_QUIT)
1491 break;
1492 }
1493
1494 if (flow) {
1495 if ((*flags & TAFlag_SKIP)||(*flags & TAFlag_QUIT)) {
1496 // skip further processing of this event
1497 } else {
1498 if (fRunInfo->fMtInfo) {
1499 MtQueueFlowEvent(fRunInfo->fMtInfo, 0, NULL, flow);
1500 flow = NULL; // ownership passed to the multithread event queue
1501 } else {
1502 flow = AnalyzeFlowEvent(flags, flow);
1503 }
1504 }
1505 }
1506
1507 if (fProfiler && !fRunInfo->fMtInfo) {
1508 fProfiler->LogUserWindows(flags, flow);
1509 }
1510
1511 if (*flags & TAFlag_WRITE)
1512 if (writer)
1513 TMWriteEvent(writer, event);
1514
1515 if (flow)
1516 delete flow;
1517
1518 if (*flags & TAFlag_QUIT)
1519 return;
1520
1521 if (fRunInfo->fMtInfo)
1522 if (fRunInfo->fMtInfo->fMtQuitRequested) {
1523 *flags |= TAFlag_QUIT;
1524 return;
1525 }
1526
1527 AnalyzeFlowQueue(flags);
1528 }
1529};
1530
// NOTE(review): the signature line of this function is absent from this listing;
// callers use it as TARunInfo::ReadFlowQueue() returning a TAFlowEvent*.
// Removes and returns the oldest entry of fFlowQueue, or NULL when it is empty.
// The caller becomes responsible for deleting the returned event.
1532{
1533 if (fFlowQueue.empty())
1534 return NULL;
1535
1536 TAFlowEvent* flow = fFlowQueue.front();
1537 fFlowQueue.pop_front();
1538 return flow;
1539}
1540
// NOTE(review): the signature line of this function is absent from this listing;
// the error message below names it AddToFlowQueue() and the body uses a
// TAFlowEvent* named flow.
// Queues a flow event for later analysis. After PreEndRun processing has
// finished the event is rejected: an error is printed and the event deleted.
1542{
1543 if (fAfterPreEndRun) {
1544 fprintf(stderr, "Error: Calling AddToFlowQueue() after PreEndRun() is not permitted.\n");
1545 delete flow;
1546 return;
1547 }
1548
// multithreaded mode: hand the event to the queue of module 0;
// single threaded mode: append it to the local flow queue
1549 if (fMtInfo) {
1550 // call MtQueueFlowEvent with wait=false to avoid deadlock
// NOTE(review): the call below passes no explicit "wait" argument - confirm
// that MtQueueFlowEvent() defaults to not waiting.
1551 MtQueueFlowEvent(fMtInfo, 0, NULL, flow);
1552 } else {
1553 fFlowQueue.push_back(flow);
1554 }
1555}
1556
1557#ifdef HAVE_MIDAS
1558
1559#ifdef HAVE_TMFE
1560
1561#ifdef HAVE_ROOT
1562#include "TSystem.h"
1563#endif
1564
1565#include "tmfe.h"
1566
// Run transition requests: set by the RpcHandler callbacks, consumed and
// cleared by the main loop of ProcessMidasOnlineTmfe().
// NOTE(review): ProcessMidasOnlineTmfe() calls mfe->StartRpcThread(); if the
// callbacks run on that thread, these plain bools are shared between threads
// without synchronization - consider std::atomic<bool>. Confirm against tmfe.
1567static bool gRunStartRequested = false;
1568static bool gRunStopRequested = false;
1569
1570class RpcHandler: public TMFeRpcHandlerInterface
1571{
1572 TMFeResult HandleBeginRun(int run_number)
1573 {
1574 printf("RpcHandler::HandleBeginRun(%d)\n", run_number);
1575 gRunStartRequested = true;
1576 gRunStopRequested = false;
1577 return TMFeOk();
1578 }
1579
1580 TMFeResult HandleEndRun(int run_number)
1581 {
1582 printf("RpcHandler::HandleEndRun(%d)\n", run_number);
1583 gRunStartRequested = false;
1584 gRunStopRequested = true;
1585 return TMFeOk();
1586 }
1587
1588 TMFeResult HandleStartAbortRun(int run_number)
1589 {
1590 printf("RpcHandler::HandleStartAbortRun(%d)\n", run_number);
1591 // run did not really start, pretend it started and immediately ended
1592 gRunStartRequested = false;
1593 gRunStopRequested = true;
1594 return TMFeOk();
1595 }
1596
1597 TMFeResult HandleRpc(const char* cmd, const char* args, std::string& result)
1598 {
1599 return TMFeOk();
1600 }
1601};
1602
1603TMFeResult ReceiveEvent(TMEventBuffer* b, TMEvent *e, int timeout_msec = 0)
1604{
1605 assert(b != NULL);
1606 assert(e != NULL);
1607
1608 e->Reset();
1609
1610 TMFeResult r = b->ReceiveEvent(&e->data, timeout_msec);
1611
1612 if (r.error_flag)
1613 return r;
1614
1615 if (e->data.size() == 0)
1616 return TMFeOk();
1617
1618 e->ParseEvent();
1619
1620 assert(e->data.size() == e->event_header_size + e->data_size);
1621
1622 return TMFeOk();
1623}
1624
// Online analysis through the TMFE interface: connect to MIDAS, open the
// event buffer and request events, then loop until shutdown is requested,
// handling run start/stop requests from RpcHandler and analyzing received
// events. Returns 0 on normal exit and -1 when connecting or setting up the
// event buffer fails.
//   num_analyze - when > 0, shutdown is requested after this many analyzed events
1625static int ProcessMidasOnlineTmfe(const std::vector<std::string>& args, const char* progname, const char* hostname, const char* exptname, const char* bufname, int event_id, int trigger_mask, const char* sampling_type_string, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
1626{
1627 TMFE *mfe = TMFE::Instance();
1628
1629 TMFeResult r = mfe->Connect(progname, hostname, exptname);
1630
1631 if (r.error_flag) {
1632 fprintf(stderr, "Cannot connect to MIDAS: %s\n", r.error_message.c_str());
1633 return -1;
1634 }
1635
1636 //MVOdb* odb = mfe->fOdbRoot;
1637
1638 TMEventBuffer *b = new TMEventBuffer(mfe);
1639
1640 /* open event buffer */
1641
1642 r = b->OpenBuffer(bufname);
1643
// NOTE(review): this and the following early returns leave b allocated and do
// not call mfe->Disconnect()
1644 if (r.error_flag) {
1645 fprintf(stderr, "Cannot open event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1646 return -1;
1647 }
1648
1649 /* request read cache */
// the read cache is disabled (size 0) for the "GET_RECENT" sampling type
1650 size_t cache_size = 100000;
1651 if(!strcmp(sampling_type_string,"GET_RECENT"))
1652 cache_size=0;
1653 r = b->SetCacheSize(cache_size, 0);
1654
1655 if (r.error_flag) {
1656 fprintf(stderr, "Cannot set cache size on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1657 return -1;
1658 }
1659
1660 /* request events */
1661
1662 r = b->AddRequest(event_id, trigger_mask, sampling_type_string);
1663
1664 if (r.error_flag) {
1665 fprintf(stderr, "Cannot add event request on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1666 return -1;
1667 }
1668
// NOTE(review): h is never deleted in this function; presumably mfe keeps the
// pointer after AddRpcHandler() - confirm ownership
1669 RpcHandler* h = new RpcHandler();
1670
1671 mfe->AddRpcHandler(h);
1672
1673 mfe->DeregisterTransitionPause();
1674 mfe->DeregisterTransitionResume();
1675 mfe->SetTransitionSequenceStart(300);
1676 mfe->SetTransitionSequenceStop(700);
1677
1678 mfe->StartRpcThread();
1679
1680 /* register event requests */
1681
1682 RunHandler rh(args, multithread, profiler, queue_interval_check);
1683
1684 for (unsigned i=0; i<(*gModules).size(); i++)
1685 (*gModules)[i]->Init(args);
1686
// a run is already in progress: start analyzing it right away
1687 if (mfe->fStateRunning) {
1688 rh.CreateRun(mfe->fRunNumber, NULL);
1689 rh.fRunInfo->fOdb = mfe->fOdbRoot;
1690 rh.BeginRun();
1691 }
1692
1693#ifdef HAVE_ROOT
1694 TFile* no_run_file = NULL;
1695#endif
1696
1697 TMEvent e;
1698
1699 while (!mfe->fShutdownRequested) {
1700 bool do_sleep = true;
1701
// start of run requested: close any run still open, then open the new one
1702 if (gRunStartRequested) {
1703 gRunStartRequested = false;
1704
1705 if (rh.fRunInfo) {
1706 TAFlags flags = 0;
1707 rh.EndRun(&flags);
// fOdb points at mfe->fOdbRoot, which this code does not own; it is reset so
// that the TARunInfo destructor does not delete it
1708 rh.fRunInfo->fOdb = NULL;
1709 rh.DeleteRun();
1710 }
1711
1712#ifdef HAVE_ROOT
1713 if (no_run_file) {
1714 if (gTrace) {
1715 printf("JSROOT close file\n");
1716 }
1717
1718 no_run_file->Close();
1719 delete no_run_file;
1720 no_run_file = NULL;
1721 }
1722#endif
1723
1724 rh.CreateRun(mfe->fRunNumber, NULL);
1725 rh.fRunInfo->fOdb = mfe->fOdbRoot;
1726 rh.BeginRun();
1727
1728 continue;
1729 }
1730
// end of run requested: close the current run, if any
1731 if (gRunStopRequested) {
1732 gRunStopRequested = false;
1733
1734 if (rh.fRunInfo) {
1735 TAFlags flags = 0;
1736 rh.EndRun(&flags);
1737 rh.fRunInfo->fOdb = NULL;
1738 rh.DeleteRun();
1739 }
1740
1741 continue;
1742 }
1743
1744 //r = buf.ReceiveEvent(&e, BM_NO_WAIT);
1745 //r = buf.ReceiveEvent(&e, BM_WAIT);
1746 //r = buf.ReceiveEvent(&e, 8000);
1747 //r = buf.ReceiveEvent(&e, 5000);
1748 r = ReceiveEvent(b, &e, 100);
1749
1750 if (r.error_flag) {
1751 fprintf(stderr, "Cannot read event on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1752 break;
1753 }
1754
1755 //e.PrintHeader();
1756 //::sleep(1);
1757
// events are analyzed only while a run is open
1758 if ((e.data_size > 0) && (rh.fRunInfo != NULL)) {
1759
1760 //e.PrintHeader();
1761 //e.PrintBanks(2);
1762
1763 TAFlags flags = 0;
1764
1765 rh.AnalyzeEvent(&e, &flags, writer);
1766
1767 if (flags & TAFlag_QUIT) {
1768 mfe->fShutdownRequested = true;
1769 }
1770
1771 if (num_analyze > 0) {
1772 num_analyze--;
1773 if (num_analyze == 0) {
1774 mfe->fShutdownRequested = true;
1775 }
1776 }
1777
1778 do_sleep = false;
1779 }
1780
// between runs, the most recent output ROOT file is opened read-only and
// handed to jsroot_ReadFile()
1781#ifdef HAVE_ROOT
1782 if (!rh.fRunInfo) { // no run
1783 if (!no_run_file) { // no ROOT file
// NOTE(review): listing lines 1784 and 1786 are absent here; the extra closing
// brace below suggests a guarding "if (...) {" was on line 1784
1785 std::string filename = TARootHelper::fgOutputRootFiles.back();
1787
1788 if (gTrace) {
1789 printf("JSROOT open file \"%s\"\n", filename.c_str());
1790 }
1791
1792 no_run_file = new TFile(filename.c_str(), "READ");
1793
1794 if (!no_run_file->IsOpen()) {
1795 fprintf(stderr, "Error: cannot open ROOT file \"%s\"\n", filename.c_str());
1796 } else {
1797 jsroot_ReadFile(no_run_file);
1798 }
1799 }
1800 }
1801 }
1802#endif
1803
1804#ifdef HAVE_THTTP_SERVER
// NOTE(review): listing line 1805 is absent here; it presumably opens an
// "if (TARootHelper::fgHttpServer) {" block that the brace on line 1811 closes
1806 int nreq = TARootHelper::fgHttpServer->ProcessRequests();
1807 if (nreq > 0) {
1808 do_sleep = false;
1809 //printf("ProcessRequests() returned %d\n", nreq);
1810 }
1811 }
1812#endif
1813#ifdef HAVE_ROOT
1814 if (TARootHelper::fgApp) {
1815 gSystem->DispatchOneEvent(kTRUE);
1816 }
1817#endif
1818
// multithreaded mode: keep yielding to MIDAS while MtQueueWait() returns true
1819 if (rh.fRunInfo && rh.fRunInfo->fMtInfo) {
1820 while (MtQueueWait(rh.fRunInfo->fMtInfo)) {
1821 mfe->Yield(0.001);
1822#ifdef HAVE_ROOT
// NOTE(review): listing line 1823 is absent here; presumably the
// "if (TARootHelper::fgApp) {" opener matching the brace on line 1825
1824 gSystem->DispatchOneEvent(kTRUE);
1825 }
1826#endif
1827 }
1828 }
1829
1830 //printf("do_sleep %d\n", do_sleep);
1831
1832 if (do_sleep) {
1833 mfe->Yield(0.010);
1834 } else {
1835 mfe->Yield(0);
1836 }
1837 }
1838
// shutdown: close the run that is still open, if any
1839 if (rh.fRunInfo) {
1840 TAFlags flags = 0;
1841 rh.EndRun(&flags);
1842 rh.fRunInfo->fOdb = NULL;
1843 rh.DeleteRun();
1844 }
1845
1846#ifdef HAVE_ROOT
1847 if (no_run_file) {
1848 if (gTrace) {
1849 printf("JSROOT close file\n");
1850 }
1851
1852 no_run_file->Close();
1853 delete no_run_file;
1854 no_run_file = NULL;
1855 }
1856#endif
1857
1858 for (unsigned i=0; i<(*gModules).size(); i++)
1859 (*gModules)[i]->Finish();
1860
1861 /* close event buffer */
1862 r = b->CloseBuffer();
1863
1864 delete b;
1865 b = NULL;
1866
// a failure to close the buffer is reported but does not change the return value
1867 if (r.error_flag) {
1868 fprintf(stderr,"Cannot close event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1869 //return -1;
1870 }
1871
1872 /* disconnect from experiment */
1873 mfe->Disconnect();
1874
1875 return 0;
1876}
1877
1878#else
1879
1880#include "TMidasOnline.h"
1881
1882#ifdef HAVE_ROOT
1883#include "TSystem.h"
1884#endif
1885
1887{
1888public:
// when > 0: number of events left to analyze before fQuit is set
1890 int fNumAnalyze = 0;
// output writer passed to AnalyzeEvent(), may be NULL
1891 TMWriterInterface* fWriter = NULL;
// set to true when the online main loop should exit
1892 bool fQuit = false;
// ODB handle installed into every run started by StartRun()
1893 MVOdb* fOdb = NULL;
1894
1895 OnlineHandler(int num_analyze, TMWriterInterface* writer, MVOdb* odb, const std::vector<std::string>& args, bool multithread, bool profiler, int queue_interval_check) // ctor
1896 : fRun(args, multithread, profiler, queue_interval_check)
1897 {
1898 fNumAnalyze = num_analyze;
1899 fWriter = writer;
1900 fOdb = odb;
1901 }
1902
// NOTE(review): the signature line is absent from this listing; presumably the destructor.
// Only clears the pointers; neither the writer nor the ODB object is deleted here.
1904 {
1905 fWriter = NULL;
1906 fOdb = NULL;
1907 }
1908
// Open a new run with the given run number: create it (no input file name),
// install the ODB handle, then notify the modules via BeginRun().
1909 void StartRun(int run_number)
1910 {
1911 fRun.CreateRun(run_number, NULL);
// BeginRun() asserts that fOdb is set, so it must be assigned first
1912 fRun.fRunInfo->fOdb = fOdb;
1913 fRun.BeginRun();
1914 }
1915
1916 void Transition(int transition, int run_number, int transition_time)
1917 {
1918 //printf("OnlineHandler::Transtion: transition %d, run %d, time %d\n", transition, run_number, transition_time);
1919
1920 if (transition == TR_START) {
1921 if (fRun.fRunInfo) {
1922 TAFlags flags = 0;
1923 fRun.EndRun(&flags);
1924 if (flags & TAFlag_QUIT)
1925 fQuit = true;
1926 fRun.fRunInfo->fOdb = NULL;
1927 fRun.DeleteRun();
1928 }
1929 assert(fRun.fRunInfo == NULL);
1930
1931 StartRun(run_number);
1932 printf("Begin run: %d\n", run_number);
1933 } else if (transition == TR_STOP) {
1934 TAFlags flags = 0;
1935 fRun.EndRun(&flags);
1936 if (flags & TAFlag_QUIT)
1937 fQuit = true;
1938 fRun.fRunInfo->fOdb = NULL;
1939 fRun.DeleteRun();
1940 printf("End of run %d\n", run_number);
1941 }
1942 }
1943
1944 void Event(const void* data, int data_size)
1945 {
1946 //printf("OnlineHandler::Event: ptr %p, size %d\n", data, data_size);
1947
1948 if (!fRun.fRunInfo) {
1949 StartRun(0); // start fake run for events outside of a run
1950 }
1951
1952 TMEvent* event = new TMEvent(data, data_size);
1953
1954 TAFlags flags = 0;
1955
1956 fRun.AnalyzeEvent(event, &flags, fWriter);
1957
1958 if (flags & TAFlag_QUIT)
1959 fQuit = true;
1960
1961 if (fNumAnalyze > 0) {
1962 fNumAnalyze--;
1963 if (fNumAnalyze == 0)
1964 fQuit = true;
1965 }
1966
1967 if (event) {
1968 delete event;
1969 event = NULL;
1970 }
1971 }
1972};
1973
// Online analysis through the TMidasOnline interface: connect to MIDAS,
// register an OnlineHandler for transitions and events, then poll until the
// handler requests quit or poll() fails. Returns 0 on normal exit and -1 when
// the connection fails.
1974static int ProcessMidasOnlineOld(const std::vector<std::string>& args, const char* hostname, const char* exptname, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
1975{
// NOTE(review): listing line 1976 is absent here; it presumably declares "midas"
// (the loop below obtains the same object via TMidasOnline::instance())
1977
1978 int err = midas->connect(hostname, exptname, "rootana");
1979 if (err != 0) {
1980 fprintf(stderr,"Cannot connect to MIDAS, error %d\n", err);
1981 return -1;
1982 }
1983
1984 MVOdb* odb = MakeMidasOdb(midas->fDB);
1985
1986 OnlineHandler* h = new OnlineHandler(num_analyze, writer, odb, args, multithread, profiler, queue_interval_check);
1987
1988 midas->RegisterHandler(h);
1989 midas->registerTransitions();
1990
1991 /* register event requests */
1992
1993 midas->eventRequest("SYSTEM",-1,-1,(1<<1));
1994
1995 int run_number = 0; // midas->odbReadInt("/runinfo/Run number");
1996 int run_state = 0; // midas->odbReadInt("/runinfo/State");
1997
1998 odb->RI("runinfo/run number", &run_number);
1999 odb->RI("runinfo/state", &run_state);
2000
2001 for (unsigned i=0; i<(*gModules).size(); i++)
2002 (*gModules)[i]->Init(args);
2003
// a run is already in progress (running or paused): start analyzing it now
2004 if ((run_state == STATE_RUNNING)||(run_state == STATE_PAUSED)) {
2005 h->StartRun(run_number);
2006 }
2007
2008 while (!h->fQuit) {
2009#ifdef HAVE_THTTP_SERVER
// NOTE(review): listing line 2010 is absent here; presumably the
// "if (TARootHelper::fgHttpServer) {" opener matching the brace on line 2012
2011 TARootHelper::fgHttpServer->ProcessRequests();
2012 }
2013#endif
2014#ifdef HAVE_ROOT
2015 if (TARootHelper::fgApp) {
2016 gSystem->DispatchOneEvent(kTRUE);
2017 }
2018#endif
// poll() returning false ends the main loop
2019 if (!TMidasOnline::instance()->poll(10))
2020 break;
2021 }
2022
// close the run that is still open; fOdb is reset first because odb is
// deleted separately below
2023 if (h->fRun.fRunInfo) {
2024 TAFlags flags = 0;
2025 h->fRun.EndRun(&flags);
2026 h->fRun.fRunInfo->fOdb = NULL;
2027 h->fRun.DeleteRun();
2028 }
2029
2030 for (unsigned i=0; i<(*gModules).size(); i++)
2031 (*gModules)[i]->Finish();
2032
2033 delete h; h = NULL;
2034 delete odb; odb = NULL;
2035
2036 /* disconnect from experiment */
2037 midas->disconnect();
2038
2039 return 0;
2040}
2041
2042#endif
2043#endif
2044
// Definition of the static list of input file names; ProcessMidasFiles()
// clears it and fills it with the files it was given.
2045std::vector<std::string> TARunInfo::fgFileList;
2047
// Offline analysis of MIDAS data files. Begin-of-run events (id 0x8000) open a
// run or continue it as the next subrun, end-of-run events (0x8001) record the
// end-of-run ODB dump, message events (0x8002) are passed to the modules as
// special events, and all other events are analyzed.
//   num_skip    - number of data events to skip before analysis starts
//   num_analyze - when > 0, stop after this many analyzed events
// Returns 0, or the number of input files that could not be opened.
2048static int ProcessMidasFiles(const std::vector<std::string>& files, const std::vector<std::string>& args, int num_skip, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
2049{
2050 int number_of_missing_files = 0;
2051
2052 TARunInfo::fgFileList.clear();
2053
2054 for (unsigned i=0; i<files.size(); i++)
2055 TARunInfo::fgFileList.push_back(files[i]);
2056
2057 for (unsigned i=0; i<(*gModules).size(); i++)
2058 (*gModules)[i]->Init(args);
2059
2060 RunHandler run(args, multithread, profiler, queue_interval_check);
2061
2062 bool done = false;
2063
// NOTE(review): listing lines 2064-2066 and 2068 are absent here; presumably they
// hold the loop over the input files and the declaration of "filename"
2067
2069
2070 TMReaderInterface *reader = TMNewReader(filename.c_str());
2071
// an input file that cannot be opened is counted and skipped
2072 if (reader->fError) {
2073 printf("Could not open \"%s\", error: %s\n", filename.c_str(), reader->fErrorString.c_str());
2074 delete reader;
2075 number_of_missing_files++;
2076 continue;
2077 }
2078
2079 while (1) {
2080 TMEvent* event = TMReadEvent(reader);
2081
2082 if (!event) // EOF
2083 break;
2084
// an event flagged with an error ends the reading of this file
2085 if (event->error) {
2086 delete event;
2087 break;
2088 }
2089
2090 if (event->event_id == 0x8000) // begin of run event
2091 {
// for this event type the serial number is used as the run number
2092 int runno = event->serial_number;
2093
2094 if (run.fRunInfo) {
2095 if (run.fRunInfo->fRunNo == runno) {
2096 // next subrun file, nothing to do
2097 run.fRunInfo->fFileName = filename;
2098 run.NextSubrun();
2099 } else {
2100 // file with a different run number
2101 TAFlags flags = 0;
2102 run.EndRun(&flags);
2103 if (flags & TAFlag_QUIT) {
2104 done = true;
2105 }
2106 run.DeleteRun();
2107 }
2108 }
2109
// no run open (first file, or the previous run was just closed): open one and
// build its ODB from the dump carried by this event
2110 if (!run.fRunInfo) {
2111 run.CreateRun(runno, filename.c_str());
2112 assert(run.fRunInfo);
2113 run.fRunInfo->fBorOdbDump.assign(event->GetEventData(), event->GetEventData() + event->data_size);
2114 run.fRunInfo->fOdb = MakeFileDumpOdb(run.fRunInfo->fBorOdbDump.data(), run.fRunInfo->fBorOdbDump.size());
2115#ifdef HAVE_MIDAS
2116 midas::odb::set_odb_source(midas::odb::STRING, std::string(run.fRunInfo->fBorOdbDump.data(), run.fRunInfo->fBorOdbDump.size()));
2117#endif
2118 run.BeginRun();
2119 }
2120
2121 assert(run.fRunInfo);
2122
2123 run.AnalyzeSpecialEvent(event);
2124
2125 if (writer)
2126 TMWriteEvent(writer, event);
2127 }
2128 else if (event->event_id == 0x8001) // end of run event
2129 {
// NOTE(review): run.fRunInfo is dereferenced without a check; it is NULL when
// an end-of-run event arrives before any run was opened
2130 run.fRunInfo->fEorOdbDump.assign(event->GetEventData(), event->GetEventData() + event->data_size);
2131
2132 run.AnalyzeSpecialEvent(event);
2133
2134 if (writer)
2135 TMWriteEvent(writer, event);
2136
2137 }
2138 else if (event->event_id == 0x8002) // message event
2139 {
2140 run.AnalyzeSpecialEvent(event);
2141 if (writer)
2142 TMWriteEvent(writer, event);
2143 }
2144 else
2145 {
2146 if (!run.fRunInfo) {
2147 // create a fake begin of run
2148 run.CreateRun(0, filename.c_str());
2149 run.fRunInfo->fOdb = MakeNullOdb();
2150 run.BeginRun();
2151 }
2152
2153 if (num_skip > 0) {
2154 num_skip--;
2155 } else {
2156 TAFlags flags = 0;
2157
2158 run.AnalyzeEvent(event, &flags, writer);
2159
2160 if (flags & TAFlag_QUIT)
2161 done = true;
2162
2163 if (num_analyze > 0) {
2164 num_analyze--;
2165 if (num_analyze == 0)
2166 done = true;
2167 }
2168 }
2169 }
2170
2171 delete event;
2172
2173 if (done)
2174 break;
2175
2176#ifdef HAVE_ROOT
// NOTE(review): listing line 2177 is absent here; presumably the
// "if (TARootHelper::fgApp) {" opener matching the brace on line 2179
2178 gSystem->DispatchOneEvent(kTRUE);
2179 }
2180#endif
2181
// multithreaded mode: loop while MtQueueWait() returns true
2182 if (run.fRunInfo && run.fRunInfo->fMtInfo) {
2183 while (MtQueueWait(run.fRunInfo->fMtInfo)) {
// NOTE(review): listing lines 2184 and 2186 are absent here
2185#ifdef HAVE_ROOT
2187 gSystem->DispatchOneEvent(kTRUE);
2188 }
2189#endif
2190 }
2191 }
2192 }
2193
2194 reader->Close();
2195 delete reader;
2196
2197 if (done)
2198 break;
2199 }
2200
// disabled by the constant 0 in the condition
2201#ifdef HAVE_THTTP_SERVER
2202 if (0 && TARootHelper::fgHttpServer) {
2203 while (1) {
2204 gSystem->DispatchOneEvent(kTRUE);
2205 //sleep(1);
2206 }
2207 }
2208#endif
2209
// close the last run; its fOdb was created in this function and is deleted
// together with the run info
2210 if (run.fRunInfo) {
2211 TAFlags flags = 0;
2212 run.EndRun(&flags);
2213 if (flags & TAFlag_QUIT)
2214 done = true;
2215 run.DeleteRun();
2216 }
2217
2218 for (unsigned i=0; i<(*gModules).size(); i++)
2219 (*gModules)[i]->Finish();
2220
2221 if (number_of_missing_files) {
2222 printf("%d midas files were not openable\n", number_of_missing_files);
2223 return number_of_missing_files;
2224 }
2225
2226 return 0;
2227}
2228
// Demo mode: analyze generated events instead of real data. Run number 1 is
// used, every "subrun" consists of 100 events carrying a fixed "TEST" bank and
// a random "SLOW" bank; subruns are generated until a module requests quit or
// num_analyze events have been analyzed. Always returns 0.
//   num_skip    - number of generated events to skip before analysis starts
//   num_analyze - when > 0, stop after this many analyzed events
// NOTE(review): when num_analyze is 0 the outer loop ends only on TAFlag_QUIT.
2229static int ProcessDemoMode(const std::vector<std::string>& args, int num_skip, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
2230{
2231 for (unsigned i=0; i<(*gModules).size(); i++)
2232 (*gModules)[i]->Init(args);
2233
2234 RunHandler run(args, multithread, profiler, queue_interval_check);
2235
2236 bool done = false;
2237
2238 int runno = 1;
2239
// i is the subrun index, used to build the fake file name "demo_subrun_NNN"
2240 for (unsigned i=0; true; i++) {
2241 char s[256];
2242 snprintf(s, sizeof(s), "%03d", i);
2243 std::string filename = std::string("demo_subrun_") + s;
2244
2245 if (!run.fRunInfo) {
2246 run.CreateRun(runno, filename.c_str());
2247 run.fRunInfo->fOdb = MakeNullOdb();
2248 run.BeginRun();
2249 }
2250
2251 // we do not generate a fake begin of run event...
2252 //run.AnalyzeSpecialEvent(event);
2253
2254 // only switch subruns after the first subrun file
2255 if (i>0) {
2256 run.fRunInfo->fFileName = filename;
2257 run.NextSubrun();
2258 }
2259
// one event object is re-initialized for each of the 100 events of the subrun
2260 TMEvent* event = new TMEvent();
2261
2262 for (unsigned j=0; j<100; j++) {
2263 event->Init(0x0001, 0xFFFF, j+1, 0, 0);
2264
2265 uint32_t test_data[] = { 0x11112222, 0x33334444, 0x55556666, 0x77778888 };
2266 event->AddBank("TEST", TID_DWORD, (const char*)test_data, sizeof(test_data));
2267
// sum of twelve drand48() values minus 6, scaled by 10
2268 float slow_data[1];
2269 slow_data[0] = (drand48() + drand48() + drand48() + drand48() + drand48() + drand48() + drand48() + drand48() + drand48() + drand48() + drand48() + drand48() - 6.0)*10.0;
2270 event->AddBank("SLOW", TID_FLOAT, (const char*)slow_data, sizeof(slow_data));
2271
2272 if (num_skip > 0) {
2273 num_skip--;
2274 } else {
2275 TAFlags flags = 0;
2276
2277 run.AnalyzeEvent(event, &flags, writer);
2278
2279 if (flags & TAFlag_QUIT)
2280 done = true;
2281
2282 if (num_analyze > 0) {
2283 num_analyze--;
2284 if (num_analyze == 0)
2285 done = true;
2286 }
2287 }
2288
2289 if (done)
2290 break;
2291
2292#ifdef HAVE_THTTP_SERVER
// NOTE(review): listing line 2293 is absent here; presumably the
// "if (TARootHelper::fgHttpServer) {" opener matching the brace on line 2295
2294 TARootHelper::fgHttpServer->ProcessRequests();
2295 }
2296#endif
2297#ifdef HAVE_ROOT
2298 if (TARootHelper::fgApp) {
2299 gSystem->DispatchOneEvent(kTRUE);
2300 }
2301#endif
2302 }
2303
2304 delete event;
2305
2306 // we do not generate a fake end of run event...
2307 //run.AnalyzeSpecialEvent(event);
2308
2309 if (done)
2310 break;
2311 }
2312
2313 if (run.fRunInfo) {
2314 TAFlags flags = 0;
2315 run.EndRun(&flags);
2316 run.DeleteRun();
2317 }
2318
2319 for (unsigned i=0; i<(*gModules).size(); i++)
2320 (*gModules)[i]->Finish();
2321
2322 return 0;
2323}
2324
// enables the memory reports of ShowMem() below
static bool gEnableShowMem = false;

#if 0
// Report the first field of /proc/self/statm.
//   label - when not NULL, the value is also printed with this label
// Returns the value read, or 0 when reporting is disabled or the file cannot
// be opened.
static int ShowMem(const char* label)
{
   int mem = 0;

   if (gEnableShowMem) {
      FILE* fp = fopen("/proc/self/statm","r");

      if (fp) {
         fscanf(fp,"%d",&mem);
         fclose(fp);

         if (label)
            printf("memory at %s is %d\n", label, mem);
      }
   }

   return mem;
}
#endif
2347
// NOTE(review): the class header line and several method signature lines are
// absent from this listing (the trace messages name the class EventDumpModule).
// Diagnostic module: every callback prints a line describing the call.
2349{
2350public:
// constructor (signature line absent): names the module "EventDump" and gives
// it module order -1
2352 : TARunObject(runinfo)
2353 {
2354 if (gTrace)
2355 printf("EventDumpModule::ctor, run %d\n", runinfo->fRunNo);
2356
2357 fModuleName = "EventDump";
2358 fModuleOrder = -1;
2359 }
2360
// destructor (signature line absent)
2362 {
2363 if (gTrace)
2364 printf("EventDumpModule::dtor!\n");
2365 }
2366
// prints the run number at begin of run
2367 void BeginRun(TARunInfo* runinfo)
2368 {
2369 printf("EventDumpModule::BeginRun, run %d\n", runinfo->fRunNo);
2370 }
2371
// prints the run number at end of run
2372 void EndRun(TARunInfo* runinfo)
2373 {
2374 printf("EventDumpModule::EndRun, run %d\n", runinfo->fRunNo);
2375 }
2376
// prints the run number and the current file name at a subrun change
2377 void NextSubrun(TARunInfo* runinfo)
2378 {
2379 printf("EventDumpModule::NextSubrun, run %d, file %s\n", runinfo->fRunNo, runinfo->fFileName.c_str());
2380 }
2381
// prints the run number
2382 void PauseRun(TARunInfo* runinfo)
2383 {
2384 printf("EventDumpModule::PauseRun, run %d\n", runinfo->fRunNo);
2385 }
2386
// prints the run number
2387 void ResumeRun(TARunInfo* runinfo)
2388 {
2389 printf("EventDumpModule::ResumeRun, run %d\n", runinfo->fRunNo);
2390 }
2391
// prints the event header and the list of banks; flags are not touched and the
// incoming flow event is returned unchanged
2392 TAFlowEvent* Analyze(TARunInfo* runinfo, TMEvent* event, TAFlags* flags, TAFlowEvent* flow)
2393 {
2394 printf("EventDumpModule::Analyze, run %d, ", runinfo->fRunNo);
2395 event->FindAllBanks();
2396 std::string h = event->HeaderToString();
2397 std::string b = event->BankListToString();
2398 printf("%s: %s\n", h.c_str(), b.c_str());
2399 return flow;
2400 }
2401
// AnalyzeSpecialEvent (signature line absent): prints serial number, event id
// and data size of the special event
2403 {
2404 printf("EventDumpModule::AnalyzeSpecialEvent, run %d, event serno %d, id 0x%04x, data size %d\n", runinfo->fRunNo, event->serial_number, (int)event->event_id, event->data_size);
2405 }
2406};
2407
// NOTE(review): the class header line is absent from this listing (the trace
// messages name the class EventDumpModuleFactory).
// Factory that creates one EventDumpModule per run.
2409{
2410public:
2411
// only prints a trace message; args is not used
2412 void Init(const std::vector<std::string> &args)
2413 {
2414 if (gTrace)
2415 printf("EventDumpModuleFactory::Init!\n");
2416 }
2417
// only prints a trace message
2418 void Finish()
2419 {
2420 if (gTrace)
2421 printf("EventDumpModuleFactory::Finish!\n");
2422 }
2423
// NewRunObject (signature line absent): returns a newly allocated
// EventDumpModule for the given run
2425 {
2426 if (gTrace)
2427 printf("EventDumpModuleFactory::NewRunObject, run %d, file %s\n", runinfo->fRunNo, runinfo->fFileName.c_str());
2428 return new EventDumpModule(runinfo);
2429 }
2430};
2431
2432#ifdef HAVE_ROOT
2433#include <TGMenu.h>
2434#include <TGButton.h>
2435#include <TBrowser.h>
2436
// Control codes of the interactive control window: used as menu entry ids in
// MainWindow and as the values stored into ValueHolder::fValue by the buttons.
2437#define CTRL_QUIT 1
2438#define CTRL_NEXT 2
2439#define CTRL_CONTINUE 3
2440#define CTRL_PAUSE 4
2441#define CTRL_NEXT_FLOW 5
2442
// handled inside MainWindow::ProcessMessage(): opens a new TBrowser
2443#define CTRL_TBROWSER 11
2444
// NOTE(review): the class header line and the declaration of fValue are absent
// from this listing.
// Holds the control code most recently selected in the control window.
2446{
2447public:
2449
// starts with value 0, which is none of the CTRL_* codes
2450 ValueHolder() // ctor
2451 {
2452 fValue = 0;
2453 }
2454};
2455
// NOTE(review): the class header line and the declarations of fHolder and
// fValue are absent from this listing; the constructor shows that the class
// derives from TGTextButton.
// Text button that stores its control code into a ValueHolder when clicked.
2457{
2458public:
2461
//   p      - parent window
//   text   - button label
//   holder - receives the value when the button is clicked, may be NULL
//   value  - control code reported by this button
2462 TextButton(TGWindow*p, const char* text, ValueHolder* holder, int value) // ctor
2463 : TGTextButton(p, text)
2464 {
2465 fHolder = holder;
2466 fValue = value;
2467 }
2468
2469#if 0
2470 void Pressed()
2471 {
2472 printf("Pressed!\n");
2473 }
2474
2475 void Released()
2476 {
2477 printf("Released!\n");
2478 }
2479#endif
2480
// publish this button's control code through the holder
2481 void Clicked()
2482 {
2483 //printf("Clicked button %s, value %d!\n", GetString().Data(), fValue);
2484 if (fHolder)
2485 fHolder->fValue = fValue;
2486 //gSystem->ExitLoop();
2487 }
2488};
2489
// Control window of the interactive mode: a "Rootana" menu plus Next,
// Next Flow Event, Continue, Pause and Quit buttons. Every selection is
// reported by storing a CTRL_* code into the ValueHolder given to the
// constructor.
2490class MainWindow: public TGMainFrame
2491{
2492public:
2493 TGPopupMenu* fMenu;
2494 TGMenuBar* fMenuBar;
2495 TGLayoutHints* fMenuBarItemLayout;
2496
2497 TGCompositeFrame* fButtonsFrame;
2498
// NOTE(review): several member declarations are absent from this listing here
// (fHolder and the button pointers used by the constructor below)
2500
2505
2507
2508public:
//   w      - parent window
//   s1, s2 - passed through to the TGMainFrame constructor
//   holder - receives the selected control code, may be NULL
2509 MainWindow(const TGWindow*w, int s1, int s2, ValueHolder* holder) // ctor
2510 : TGMainFrame(w, s1, s2)
2511 {
2512 if (gTrace)
2513 printf("MainWindow::ctor!\n");
2514
2515 fHolder = holder;
2516 //SetCleanup(kDeepCleanup);
2517
2518 SetWindowName("ROOT Analyzer Control");
2519
2520 // layout the gui
2521 fMenu = new TGPopupMenu(gClient->GetRoot());
2522 fMenu->AddEntry("New TBrowser", CTRL_TBROWSER);
2523 fMenu->AddEntry("-", 0);
2524 fMenu->AddEntry("Next", CTRL_NEXT);
2525 fMenu->AddEntry("NextFlow", CTRL_NEXT_FLOW);
2526 fMenu->AddEntry("Continue", CTRL_CONTINUE);
2527 fMenu->AddEntry("Pause", CTRL_PAUSE);
2528 fMenu->AddEntry("-", 0);
2529 fMenu->AddEntry("Quit", CTRL_QUIT);
2530
2531 fMenuBarItemLayout = new TGLayoutHints(kLHintsTop|kLHintsLeft, 0, 4, 0, 0);
2532
// menu selections are delivered to ProcessMessage() of this window
2533 fMenu->Associate(this);
2534
2535 fMenuBar = new TGMenuBar(this, 1, 1, kRaisedFrame);
2536 fMenuBar->AddPopup("&Rootana", fMenu, fMenuBarItemLayout);
2537 fMenuBar->Layout();
2538
2539 AddFrame(fMenuBar, new TGLayoutHints(kLHintsTop|kLHintsLeft|kLHintsExpandX));
2540
// buttons: Next and Next Flow Event stacked, Continue and Pause side by side,
// Quit at the bottom
2541 fButtonsFrame = new TGVerticalFrame(this);
2542
2543 fNextButton = new TextButton(fButtonsFrame, "Next", holder, CTRL_NEXT);
2544 fNextFlowButton = new TextButton(fButtonsFrame, "Next Flow Event", holder, CTRL_NEXT_FLOW);
2545
2546 fButtonsFrame->AddFrame(fNextButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2547 fButtonsFrame->AddFrame(fNextFlowButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2548
2549 TGHorizontalFrame *hframe = new TGHorizontalFrame(fButtonsFrame);
2550
2551 fContinueButton = new TextButton(hframe, " Continue ", holder, CTRL_CONTINUE);
2552 fPauseButton = new TextButton(hframe, " Pause ", holder, CTRL_PAUSE);
2553
2554 hframe->AddFrame(fContinueButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2555 hframe->AddFrame(fPauseButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2556
2557 fButtonsFrame->AddFrame(hframe, new TGLayoutHints(kLHintsExpandX));
2558
2559 fQuitButton = new TextButton(fButtonsFrame, "Quit ", holder, CTRL_QUIT);
2560 fButtonsFrame->AddFrame(fQuitButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2561
2562 AddFrame(fButtonsFrame, new TGLayoutHints(kLHintsExpandX));
2563
2564 MapSubwindows();
2565 Layout();
2566 Resize(GetDefaultSize());
2567 MapWindow();
2568 }
2569
// only the menu objects are deleted here; the button frame, the buttons and
// the other layout hints are not deleted by this destructor
2570 ~MainWindow() // dtor // Closing the control window closes the whole program
2571 {
2572 if (gTrace)
2573 printf("MainWindow::dtor!\n");
2574
2575 delete fMenu;
2576 delete fMenuBar;
2577 delete fMenuBarItemLayout;
2578 }
2579
// CloseWindow (signature line absent from this listing): closing the window
// is reported as CTRL_QUIT
2581 {
2582 if (gTrace)
2583 printf("MainWindow::CloseWindow()\n");
2584
2585 if (fHolder)
2586 fHolder->fValue = CTRL_QUIT;
2587 //gSystem->ExitLoop();
2588 }
2589
// GUI message handler: CTRL_TBROWSER opens a new TBrowser, every other menu id
// is stored into the holder; all other messages are ignored. Always returns kTRUE.
2590 Bool_t ProcessMessage(Long_t msg, Long_t parm1, Long_t parm2)
2591 {
2592 //printf("GUI Message %d %d %d\n",(int)msg,(int)parm1,(int)parm2);
2593 switch (GET_MSG(msg))
2594 {
2595 default:
2596 break;
2597 case kC_COMMAND:
2598 switch (GET_SUBMSG(msg))
2599 {
2600 default:
2601 break;
2602 case kCM_MENU:
2603 //printf("parm1 %d\n", (int)parm1);
2604 switch (parm1)
2605 {
2606 case CTRL_TBROWSER:
2607 new TBrowser();
2608 break;
2609 default:
2610 //printf("Control %d!\n", (int)parm1);
2611 if (fHolder)
2612 fHolder->fValue = parm1;
2613 //gSystem->ExitLoop();
2614 break;
2615 }
2616 break;
2617 }
2618 break;
2619 }
2620
2621 return kTRUE;
2622 }
2623};
2624#endif
2625
2627{
2628public:
2632#ifdef HAVE_ROOT
2635#endif
2636
2638 : TARunObject(runinfo)
2639 {
2640 if (gTrace)
2641 printf("InteractiveModule::ctor, run %d\n", runinfo->fRunNo);
2642 fModuleName = "InteractiveModule";
2643 fModuleOrder = 9999;
2644 fContinue = false;
2645 fNextFlow = false;
2646 fSkip = 0;
2647#ifdef HAVE_ROOT
2648 if (!fgHolder)
2649 fgHolder = new ValueHolder;
2650 if (!fgCtrlWindow && runinfo->fRoot->fgApp) {
2651 fgCtrlWindow = new MainWindow(gClient->GetRoot(), 200, 300, fgHolder);
2652 }
2653#endif
2654 }
2655
2657 {
2658 if (gTrace)
2659 printf("InteractiveModule::dtor!\n");
2660 }
2661
2662 void BeginRun(TARunInfo* runinfo)
2663 {
2664 printf("InteractiveModule::BeginRun, run %d\n", runinfo->fRunNo);
2665 }
2666
2667 void EndRun(TARunInfo* runinfo)
2668 {
2669 printf("InteractiveModule::EndRun, run %d\n", runinfo->fRunNo);
2670
2671#ifdef HAVE_ROOT
2672 if (fgCtrlWindow && runinfo->fRoot->fgApp) {
2673 fgCtrlWindow->fNextButton->SetEnabled(false);
2674 fgCtrlWindow->fNextFlowButton->SetEnabled(false);
2675 fgCtrlWindow->fContinueButton->SetEnabled(false);
2676 fgCtrlWindow->fPauseButton->SetEnabled(false);
2677 while (1) {
2678#ifdef HAVE_THTTP_SERVER
2680 TARootHelper::fgHttpServer->ProcessRequests();
2681 }
2682#endif
2683#ifdef HAVE_ROOT
2684 if (TARootHelper::fgApp) {
2685 gSystem->DispatchOneEvent(kTRUE);
2686 }
2687#endif
2688#ifdef HAVE_MIDAS
2689#ifdef HAVE_TMFE
2690 TMFE* mfe = TMFE::Instance();
2691 mfe->Yield(0.010);
2692 if (mfe->fShutdownRequested) {
2693 return;
2694 }
2695#else
2696 if (!TMidasOnline::instance()->sleep(10)) {
2697 // FIXME: indicate that we should exit the analyzer
2698 return;
2699 }
2700#endif
2701#else
2702 gSystem->Sleep(10);
2703#endif
2704
2705 int ctrl = fgHolder->fValue;
2706 fgHolder->fValue = 0;
2707
2708 switch (ctrl) {
2709 case CTRL_QUIT:
2710 return;
2711 case CTRL_NEXT:
2712 return;
2713 case CTRL_CONTINUE:
2714 return;
2715 }
2716 }
2717 }
2718#endif
2719 }
2720
2721 void PauseRun(TARunInfo* runinfo)
2722 {
2723 printf("InteractiveModule::PauseRun, run %d\n", runinfo->fRunNo);
2724 }
2725
2726 void ResumeRun(TARunInfo* runinfo)
2727 {
2728 printf("InteractiveModule::ResumeRun, run %d\n", runinfo->fRunNo);
2729 }
2730
2731 void InteractiveLoop(TARunInfo* runinfo, TAFlags* flags)
2732 {
2733#ifdef HAVE_ROOT
2734 if (fgCtrlWindow && runinfo->fRoot->fgApp) {
2735 while (1) {
2736#ifdef HAVE_THTTP_SERVER
2738 TARootHelper::fgHttpServer->ProcessRequests();
2739 }
2740#endif
2741#ifdef HAVE_ROOT
2742 if (TARootHelper::fgApp) {
2743 gSystem->DispatchOneEvent(kTRUE);
2744 }
2745#endif
2746#ifdef HAVE_MIDAS
2747#ifdef HAVE_TMFE
2748 TMFE* mfe = TMFE::Instance();
2749 mfe->Yield(0.010);
2750 if (mfe->fShutdownRequested) {
2751 *flags |= TAFlag_QUIT;
2752 return;
2753 }
2754#else
2755 if (!TMidasOnline::instance()->sleep(10)) {
2756 *flags |= TAFlag_QUIT;
2757 return;
2758 }
2759#endif
2760#else
2761 gSystem->Sleep(10);
2762#endif
2763
2764 int ctrl = fgHolder->fValue;
2765 fgHolder->fValue = 0;
2766
2767 switch (ctrl) {
2768 case CTRL_QUIT:
2769 *flags |= TAFlag_QUIT;
2770 return;
2771 case CTRL_NEXT:
2772 return;
2773 case CTRL_NEXT_FLOW:
2774 fNextFlow = true;
2775 return;
2776 case CTRL_CONTINUE:
2777 fContinue = true;
2778 return;
2779 }
2780 }
2781 }
2782#endif
2783
2784 while (1) {
2785 char str[256];
2786 fprintf(stdout, "manalyzer> "); fflush(stdout);
2787 char* s = fgets(str, sizeof(str)-1, stdin);
2788
2789 if (s == NULL) {
2790 // EOF
2791 *flags |= TAFlag_QUIT;
2792 return;
2793 }
2794
2795 printf("command [%s]\n", str);
2796
2797 if (str[0] == 'h') { // "help"
2798 printf("Interactive manalyzer commands:\n");
2799 printf(" q - quit\n");
2800 printf(" h - help\n");
2801 printf(" c - continue until next TAFlag_DISPLAY event\n");
2802 printf(" n - next event\n");
2803 printf(" aNNN - analyze N events, i.e. \"a10\"\n");
2804 } else if (str[0] == 'q') { // "quit"
2805 *flags |= TAFlag_QUIT;
2806 return;
2807 } else if (str[0] == 'n') { // "next"
2808 return;
2809 } else if (str[0] == 'c') { // "continue"
2810 fContinue = true;
2811 return;
2812 } else if (str[0] == 'a') { // "analyze" N events
2813 int num = atoi(str+1);
2814 printf("Analyzing %d events\n", num);
2815 if (num > 0) {
2816 fSkip = num-1;
2817 }
2818 return;
2819 }
2820 }
2821 }
2822
2823 TAFlowEvent* Analyze(TARunInfo* runinfo, TMEvent* event, TAFlags* flags, TAFlowEvent* flow)
2824 {
2825 printf("InteractiveModule::Analyze, run %d, %s\n", runinfo->fRunNo, event->HeaderToString().c_str());
2826
2827#ifdef HAVE_ROOT
2828 if (fgHolder->fValue == CTRL_QUIT) {
2829 *flags |= TAFlag_QUIT;
2830 return flow;
2831 } else if (fgHolder->fValue == CTRL_PAUSE) {
2832 fContinue = false;
2833 }
2834#endif
2835
2836 if ((fContinue||fNextFlow) && !(*flags & TAFlag_DISPLAY)) {
2837 return flow;
2838 } else {
2839 fContinue = false;
2840 }
2841
2842 if (fSkip > 0) {
2843 fSkip--;
2844 return flow;
2845 }
2846
2847 InteractiveLoop(runinfo, flags);
2848
2849 return flow;
2850 }
2851
2853 {
2854 printf("InteractiveModule::AnalyzeFlowEvent, run %d\n", runinfo->fRunNo);
2855
2856#ifdef HAVE_ROOT
2857 if (fgHolder->fValue == CTRL_QUIT) {
2858 *flags |= TAFlag_QUIT;
2859 return flow;
2860 } else if (fgHolder->fValue == CTRL_PAUSE) {
2861 fContinue = false;
2862 }
2863#endif
2864
2865 if ((!fNextFlow) && !(*flags & TAFlag_DISPLAY)) {
2866 return flow;
2867 }
2868
2869 fNextFlow = false;
2870
2871 InteractiveLoop(runinfo, flags);
2872
2873 return flow;
2874 }
2875
2877 {
2878 if (gTrace)
2879 printf("InteractiveModule::AnalyzeSpecialEvent, run %d, event serno %d, id 0x%04x, data size %d\n", runinfo->fRunNo, event->serial_number, (int)event->event_id, event->data_size);
2880 }
2881};
2882
2883#ifdef HAVE_ROOT
2886#endif
2887
2889{
2890public:
2891
2892 void Init(const std::vector<std::string> &args)
2893 {
2894 if (gTrace)
2895 printf("InteractiveModuleFactory::Init!\n");
2896 }
2897
2898 void Finish()
2899 {
2900 if (gTrace)
2901 printf("InteractiveModuleFactory::Finish!\n");
2902 }
2903
2905 {
2906 if (gTrace)
2907 printf("InteractiveModuleFactory::NewRunObject, run %d, file %s\n", runinfo->fRunNo, runinfo->fFileName.c_str());
2908 return new InteractiveModule(runinfo);
2909 }
2910};
2911
2912//////////////////////////////////////////////////////////
2913//
2914// main program
2915//
2916//////////////////////////////////////////////////////////
2917
2918static void help()
2919{
2920 printf("\nUsage: ./manalyzer.exe [-h] [-R8081] [-oOutputfile.mid] [file1 file2 ...] [-- arguments passed to modules ...]\n");
2921 printf("\n");
2922 printf("-h: print this help message\n");
2923 printf("--demo: activate the demo mode, online connection or input file not needed, midas events are generated internally, add -e0 or -eNNN to set number of demo events \n");
2924 printf("\n");
2925 printf("-Hhostname: connect to MIDAS experiment on given host\n");
2926 printf("-Eexptname: connect to this MIDAS experiment\n");
2927 printf("--midas-progname SSS -- set analyzer's MIDAS program name, default is \"ana\"\n");
2928 printf("--midas-hostname HOSTNAME[:PORT] -- connect to MIDAS mserver on given host and port\n");
2929 printf("--midas-exptname EXPTNAME -- connect to given experiment\n");
2930 printf("--midas-buffer BUFZZZ -- connect to given MIDAS event buffer\n");
2931 printf("--midas-sampling SSS -- sample events from MIDAS event buffer: GET_ALL=get every event (will block the event writers, GET_NONBLOCKING=get as many as we can process, GET_RECENT=get recent events, see bm_receive_event(). Default is GET_NONBLOCKING\n");
2932 printf("--midas-event-id III -- receive only events with matching event ID\n");
2933 printf("--midas-trigger-mask 0xMASK -- receive only events with matching trigger mask\n");
2934 printf("\n");
2935 printf("-oOutputfile.mid: write selected events into this file\n");
2936 printf("-Rnnnn: Start the ROOT THttpServer HTTP server on specified tcp port, use -R8081, access by firefox http://localhost:8081\n");
2937 printf("-eNNN: Number of events to analyze, 0=unlimited\n");
2938 printf("-sNNN: Number of events to skip before starting analysis\n");
2939 printf("\n");
2940 printf("--dump: activate the event dump module\n");
2941 printf("\n");
2942 printf("-t: Enable tracing of constructors, destructors and function calls\n");
2943 printf("-m: Enable memory leak debugging\n");
2944 printf("-g: Enable graphics display when processing data files\n");
2945 printf("-i: Enable intractive mode\n");
2946 printf("\n");
2947 printf("--mt: Enable multithreaded mode. Extra multithread config settings:\n");
2948 printf("--mtqlNNN: Module thread queue length (buffer). Default: %d\n", gDefaultMultithreadQueueLength);
2949 printf("--mtseNNN: Module thread sleep time with empty queue (usec). Default: %d\n", gDefaultMultithreadWaitEmpty);
2950 printf("--mtsfNNN: Module thread sleep time when next queue is full (usec). Default: %d\n", gDefaultMultithreadWaitFull);
2951 printf("\n");
2952 printf("--no-profiler: Turn off manalyzer module profiler\n");
2953 printf("--pqiNNN: Profile multithread queue lengths every NNN events \n");
2954#ifdef HAVE_ROOT
2955 printf("\n");
2956 printf("-Doutputdirectory: Specify output root file directory\n");
2957 printf("-Ooutputfile.root: Specify output root file filename\n");
2958 printf("--jsroot: After analysis is finished, keep jsroot running\n");
2959#endif
2960 printf("\n");
2961 printf("--: All following arguments are passed to the analyzer modules Init() method\n");
2962 printf("\n");
2963 printf("Analyzer modules usage:\n");
2964 if (gModules) {
2965 for (unsigned i=0; i<(*gModules).size(); i++) {
2966 (*gModules)[i]->Usage();
2967 }
2968 }
2969 printf("\n");
2970 printf("Example1: analyze online data: ./manalyzer.exe -R9091\n");
2971 printf("Example2: analyze existing data: ./manalyzer.exe /data/alpha/current/run00500.mid\n");
2972 exit(1);
2973}
2974
2975// duplicate c++20 std::string s.starts_with()
2976
// Return true when string "s" begins with the NUL-terminated "prefix".
// An empty prefix matches every string; a prefix longer than "s" never matches.
static bool starts_with(const std::string& s, const char* prefix)
{
   // compare in place: substr() would build a temporary std::string on every call
   return s.compare(0, strlen(prefix), prefix) == 0;
}
2981
2982// Main function call
2983
2984int manalyzer_main(int argc, char* argv[])
2985{
2986 setbuf(stdout, NULL);
2987 setbuf(stderr, NULL);
2988
2989 signal(SIGILL, SIG_DFL);
2990 signal(SIGBUS, SIG_DFL);
2991 signal(SIGSEGV, SIG_DFL);
2992 signal(SIGPIPE, SIG_DFL);
2993
2994 std::vector<std::string> args;
2995 for (int i=0; i<argc; i++) {
2996 if (strcmp(argv[i],"-h")==0)
2997 help(); // does not return
2998 args.push_back(argv[i]);
2999 }
3000
3001 int httpPort = 0;
3002 int num_skip = 0;
3003 int num_analyze = 0;
3004
3005 TMWriterInterface *writer = NULL;
3006
3007 bool event_dump = false;
3008 bool demo_mode = false;
3009#ifdef HAVE_ROOT
3010 bool root_graphics = false;
3011#endif
3012 bool interactive = false;
3013
3014 bool multithread = false;
3015
3016#ifdef HAVE_ROOT
3017 bool enable_jsroot = false;
3018#endif
3019
3020#ifdef HAVE_ROOT
3021 bool performance_profiler = true;
3022#else
3023 bool performance_profiler = false;
3024#endif
3025 int snap_shot_queue_length = 100;
3026
3027 std::vector<std::string> files;
3028 std::vector<std::string> modargs;
3029
3030#ifdef HAVE_MIDAS
3031 std::string midas_hostname = "";
3032 std::string midas_exptname = "";
3033 std::string midas_progname = "ana";
3034 std::string midas_buffer = "SYSTEM";
3035 //std::string midas_sampling = "GET_ALL";
3036 std::string midas_sampling = "GET_NONBLOCKING";
3037 int midas_event_id = -1;
3038 int midas_trigger_mask = -1;
3039#endif
3040
3041 for (unsigned int i=1; i<args.size(); i++) { // loop over the command line options
3042 std::string arg = args[i];
3043 //printf("argv[%d] is %s\n",i,arg);
3044
3045 if (arg == "--") {
3046 for (unsigned j=i+1; j<args.size(); j++)
3047 modargs.push_back(args[j]);
3048 break;
3049 } else if (arg == "--dump") {
3050 event_dump = true;
3051 } else if (arg == "--demo") {
3052 demo_mode = true;
3053 num_analyze = 100;
3054#ifdef HAVE_ROOT
3055 } else if (arg == "-g") {
3056 root_graphics = true;
3057#endif
3058 } else if (arg == "-i") {
3059 interactive = true;
3060 } else if (arg == "-t") {
3061 gTrace = true;
3064 } else if (starts_with(arg, "-o")) {
3065 writer = TMNewWriter(arg.c_str()+2);
3066 } else if (starts_with(arg, "-s")) {
3067 num_skip = atoi(arg.c_str()+2);
3068 } else if (starts_with(arg, "-e")) {
3069 num_analyze = atoi(arg.c_str()+2);
3070 } else if (starts_with(arg, "-m")) { // Enable memory debugging
3071 gEnableShowMem = true;
3072 } else if (starts_with(arg, "-R")) { // Set the ROOT THttpServer HTTP port
3073 httpPort = atoi(arg.c_str()+2);
3074#ifdef HAVE_MIDAS
3075 } else if (starts_with(arg, "-H")) {
3076 midas_hostname = arg.c_str()+2;
3077 } else if (starts_with(arg, "-E")) {
3078 midas_exptname = arg.c_str()+2;
3079 } else if (arg == "--midas-progname") {
3080 midas_progname = args[i+1]; i++;
3081 } else if (arg == "--midas-hostname") {
3082 midas_hostname = args[i+1]; i++;
3083 } else if (arg == "--midas-exptname") {
3084 midas_exptname = args[i+1]; i++;
3085 } else if (arg == "--midas-buffer") {
3086 midas_buffer = args[i+1]; i++;
3087 } else if (arg == "--midas-sampling") {
3088 midas_sampling = args[i+1]; i++;
3089 } else if (arg == "--midas-event-id") {
3090 midas_event_id = atoi(args[i+1].c_str()); i++;
3091 } else if (arg == "--midas-trigger-mask") {
3092 midas_trigger_mask = strtoul(args[i+1].c_str(), NULL, 0); i++;
3093#endif
3094 } else if (starts_with(arg, "--mtql")) {
3095 gDefaultMultithreadQueueLength = atoi(arg.c_str()+6);
3096 } else if (starts_with(arg, "--mtse")) {
3097 gDefaultMultithreadWaitEmpty = atoi(arg.c_str()+6);
3098 } else if (starts_with(arg, "--mtsf")) {
3099 gDefaultMultithreadWaitFull = atoi(arg.c_str()+6);
3100 } else if (arg == "--mt") {
3101 multithread=true;
3102 } else if (arg == "--no-profiler") {
3103 performance_profiler = 0;
3104 } else if (starts_with(arg, "--pqi")) {
3105 snap_shot_queue_length = atoi(arg.c_str()+5);
3106#ifdef HAVE_ROOT
3107 } else if (starts_with(arg, "-O")) {
3108 TARootHelper::fgUserOutputFileName = arg.c_str()+2;
3109 } else if (starts_with(arg, "-D")) {
3111 } else if (arg == "--jsroot") {
3112 enable_jsroot = true;
3113#endif
3114 } else if (arg == "-h") {
3115 help(); // does not return
3116 } else if (arg[0] == '-') {
3117 help(); // does not return
3118 } else {
3119 files.push_back(args[i]);
3120 }
3121 }
3122
3123 if (!gModules)
3124 gModules = new std::vector<TAFactory*>;
3125
3126 if ((*gModules).size() == 0)
3127 event_dump = true;
3128
3129 if (event_dump)
3130 (*gModules).push_back(new EventDumpModuleFactory);
3131
3132 if (interactive)
3133 (*gModules).push_back(new InteractiveModuleFactory);
3134
3135 if (gTrace) {
3136 printf("Registered modules: %zu\n", (*gModules).size());
3137 }
3138
3139#ifdef HAVE_ROOT
3140 if (multithread) {
3141 // see https://root.cern/manual/multi_threading/
3142 ROOT::EnableImplicitMT();
3143 ROOT::EnableThreadSafety();
3144 }
3145
3146 if (root_graphics) {
3147 TARootHelper::fgApp = new TApplication("manalyzer", NULL, NULL, 0, 0);
3148 }
3149
3150 TARootHelper::fgDir = new TDirectory("manalyzer", "location of histograms");
3152#endif
3153
3154 if (httpPort) {
3155#ifdef HAVE_THTTP_SERVER
3156 char str[256];
3157 snprintf(str, sizeof(str), "http:127.0.0.1:%d?cors", httpPort);
3158 THttpServer *s = new THttpServer(str);
3159 if (!s->IsAnyEngine()) {
3160 fprintf(stderr,"ERROR: Cannot start web server on http port %d, see previous error message.\n", httpPort);
3161 exit(1);
3162 }
3163 //s->SetTimer(100, kFALSE);
3165#else
3166 fprintf(stderr,"ERROR: No support for the THttpServer!\n");
3167#endif
3168 }
3169
3170 for (unsigned i=0; i<files.size(); i++) {
3171 printf("file[%d]: %s\n", i, files[i].c_str());
3172 }
3173
3174 int exit_state = 0;
3175
3176 if (demo_mode) {
3177 exit_state = ProcessDemoMode(modargs, num_skip, num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
3178 } else if (files.size() > 0) {
3179 exit_state = ProcessMidasFiles(files, modargs, num_skip, num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
3180 } else {
3181#ifdef HAVE_MIDAS
3182#ifdef HAVE_TMFE
3183 exit_state = ProcessMidasOnlineTmfe(modargs, midas_progname.c_str(), midas_hostname.c_str(), midas_exptname.c_str(), midas_buffer.c_str(), midas_event_id, midas_trigger_mask, midas_sampling.c_str(), num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
3184#else
3185 exit_state = ProcessMidasOnlineOld(modargs, midas_hostname.c_str(), midas_exptname.c_str(), num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
3186#endif
3187#endif
3188#ifdef HAVE_ROOT
3189 enable_jsroot = false; // do not start jsroot if wer were stopped from the MIDAS Programs page
3190#endif
3191 }
3192
3193 if (writer) {
3194 writer->Close();
3195 delete writer;
3196 writer = NULL;
3197 }
3198
3199#ifdef HAVE_ROOT
3200#ifdef HAVE_THTTP_SERVER
3201 if (enable_jsroot && TARootHelper::fgHttpServer && !TARootHelper::fgOutputRootFiles.empty()) {
3202 printf("Starting jsroot. Use Ctrl-C to stop.\n");
3204 }
3205#endif
3206#endif
3207
3208 return exit_state;
3209}
3210
3211/* emacs
3212 * Local Variables:
3213 * tab-width: 8
3214 * c-basic-offset: 3
3215 * indent-tabs-mode: nil
3216 * End:
3217 */
R__EXTERN TDirectory * gDirectory
double GetTimeSec()
TARunObject * NewRunObject(TARunInfo *runinfo)
void Init(const std::vector< std::string > &args)
void PauseRun(TARunInfo *runinfo)
EventDumpModule(TARunInfo *runinfo)
TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
void EndRun(TARunInfo *runinfo)
void NextSubrun(TARunInfo *runinfo)
void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
void BeginRun(TARunInfo *runinfo)
void ResumeRun(TARunInfo *runinfo)
void Init(const std::vector< std::string > &args)
TARunObject * NewRunObject(TARunInfo *runinfo)
TAFlowEvent * AnalyzeFlowEvent(TARunInfo *runinfo, TAFlags *flags, TAFlowEvent *flow)
void PauseRun(TARunInfo *runinfo)
void EndRun(TARunInfo *runinfo)
static MainWindow * fgCtrlWindow
void ResumeRun(TARunInfo *runinfo)
TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
InteractiveModule(TARunInfo *runinfo)
static ValueHolder * fgHolder
void InteractiveLoop(TARunInfo *runinfo, TAFlags *flags)
void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
void BeginRun(TARunInfo *runinfo)
Definition mvodb.h:21
virtual void RU32(const char *varname, uint32_t *value, bool create=false, MVOdbError *error=NULL)=0
virtual void RI(const char *varname, int *value, bool create=false, MVOdbError *error=NULL)=0
void CloseWindow()
TGLayoutHints * fMenuBarItemLayout
TextButton * fNextFlowButton
MainWindow(const TGWindow *w, int s1, int s2, ValueHolder *holder)
TGCompositeFrame * fButtonsFrame
TGPopupMenu * fMenu
TextButton * fPauseButton
ValueHolder * fHolder
TextButton * fNextButton
TGMenuBar * fMenuBar
TextButton * fContinueButton
Bool_t ProcessMessage(Long_t msg, Long_t parm1, Long_t parm2)
TextButton * fQuitButton
void Event(const void *data, int data_size)
void Transition(int transition, int run_number, int transition_time)
RunHandler fRun
OnlineHandler(int num_analyze, TMWriterInterface *writer, MVOdb *odb, const std::vector< std::string > &args, bool multithread, bool profiler, int queue_interval_check)
void StartRun(int run_number)
std::vector< double > fAnalyzeFlowEventRMS
const int fQueueInterval
void LogAnalyzeEvent(TAFlags *flag, TAFlowEvent *flow, const int i, const TAClock &start)
void LogUserWindows(TAFlags *flag, TAFlowEvent *flow)
std::vector< double > fAnalyzeFlowEventTimeTotal
uint32_t fMIDASStopTime
std::vector< double > fTotalUserTime
std::vector< int > fAnalyzeEventEntries
std::vector< TH1D * > fUserHistograms
std::vector< double > fAnalyzeFlowEventMean
uint32_t fMIDASStartTime
std::atomic< int > fQueueIntervalCounter
std::string fBinaryPath
std::vector< double > fAnalyzeEventRMS
void Print() const
std::vector< double > fAnalyzeEventTimeMax
clock_t fStartCPU
std::string fBinaryName
void LogAnalyzeFlowEvent(TAFlags *flag, TAFlowEvent *flow, const int i, const TAClock &start)
std::vector< std::string > fModuleNames
void End(TARunInfo *runinfo)
std::vector< TH1D * > fAnalyzeFlowEventTimeHistograms
std::chrono::system_clock::time_point fStartUser
void Begin(TARunInfo *runinfo, const std::vector< TARunObject * > fRunRun)
std::vector< double > fAnalyzeEventTimeTotal
void AddModuleMap(const char *UserProfileName, unsigned long hash)
std::vector< double > fAnalyzeFlowEventTimeMax
void LogMTQueueLength(TARunInfo *runinfo)
std::map< unsigned int, int > fUserMap
std::vector< double > fAnalyzeEventMean
std::vector< double > fMaxUserTime
std::vector< TH1D * > fAnalyzeEventTimeHistograms
Profiler(const int queue_interval_check)
std::vector< int > fAnalyzeFlowEventEntries
std::vector< TH1D * > fAnalysisQueue
TARunInfo * fRunInfo
std::vector< TARunObject * > fRunRun
std::vector< std::string > fArgs
void NextSubrun()
void AnalyzeEvent(TMEvent *event, TAFlags *flags, TMWriterInterface *writer)
void CreateRun(int run_number, const char *file_name)
void DeleteRun()
void AnalyzeSpecialEvent(TMEvent *event)
void EndRun(TAFlags *flags)
TAFlowEvent * AnalyzeFlowEvent(TAFlags *flags, TAFlowEvent *flow)
void PerModuleThread(size_t i)
int fProfilerIntervalCheck
RunHandler(const std::vector< std::string > &args, bool multithread, bool profile, int queue_interval_check)
void AnalyzeFlowQueue(TAFlags *ana_flags)
void BeginRun()
virtual void Finish()
virtual void Init(const std::vector< std::string > &args)
virtual void Usage()
virtual ~TAFlowEvent()
Definition manalyzer.cxx:98
TAFlowEvent * fNext
Definition manalyzer.h:58
std::vector< TAFlagsQueue > fMtFlagQueue
Definition manalyzer.h:189
std::vector< std::mutex > fMtFlowQueueMutex
Definition manalyzer.h:187
std::atomic< bool > fMtShutdownRequested
Definition manalyzer.h:195
std::vector< std::thread * > fMtThreads
Definition manalyzer.h:190
std::vector< std::atomic< bool > > fMtThreadIsBusy
Definition manalyzer.h:192
TAMultithreadHelper(int nModules)
std::vector< TAFlowEventQueue > fMtFlowQueue
Definition manalyzer.h:188
static std::mutex gfLock
Definition manalyzer.h:204
std::vector< std::atomic< bool > > fMtThreadIsRunning
Definition manalyzer.h:191
std::atomic< bool > fMtQuitRequested
Definition manalyzer.h:196
TARegister(TAFactory *m)
std::string fOutputFileName
Definition manalyzer.h:160
static std::string fgUserOutputFileName
Definition manalyzer.h:165
static TApplication * fgApp
Definition manalyzer.h:167
static std::string fgUserOutputDirectory
Definition manalyzer.h:164
static THttpServer * fgHttpServer
Definition manalyzer.h:168
static std::vector< std::string > fgOutputRootFiles
Definition manalyzer.h:169
TFile * fOutputFile
Definition manalyzer.h:161
static TDirectory * fgDir
Definition manalyzer.h:166
void AddToFlowQueue(TAFlowEvent *)
TAFlowEvent * ReadFlowQueue()
std::string fFileName
Definition manalyzer.h:25
std::vector< std::string > fArgs
Definition manalyzer.h:29
std::vector< char > fBorOdbDump
Definition manalyzer.h:30
MVOdb * fOdb
Definition manalyzer.h:26
int fRunNo
Definition manalyzer.h:24
static std::vector< std::string > fgFileList
Definition manalyzer.h:33
std::vector< char > fEorOdbDump
Definition manalyzer.h:31
TARootHelper * fRoot
Definition manalyzer.h:27
TAMultithreadHelper * fMtInfo
Definition manalyzer.h:28
static int fgCurrentFileIndex
Definition manalyzer.h:34
void SetAfterPreEndRun()
Definition manalyzer.h:52
virtual void EndRun(TARunInfo *runinfo)
virtual void ResumeRun(TARunInfo *runinfo)
virtual void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
virtual void NextSubrun(TARunInfo *runinfo)
virtual TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
virtual void PauseRun(TARunInfo *runinfo)
virtual void BeginRun(TARunInfo *runinfo)
int fModuleOrder
Definition manalyzer.h:92
virtual TAFlowEvent * AnalyzeFlowEvent(TARunInfo *runinfo, TAFlags *flags, TAFlowEvent *flow)
virtual void PreEndRun(TARunInfo *runinfo)
TAUserProfilerFlow(TAFlowEvent *flow, const char *name, const TAClock &start)
const std::string fModuleName
Definition manalyzer.h:231
double GetTimer() const
virtual TDirectory * mkdir(const char *name, const char *title="")
virtual Bool_t cd(const char *path=0)
std::string HeaderToString() const
print the MIDAS event header
Definition midasio.cxx:662
void Reset()
reset everything
Definition midasio.cxx:692
void ParseEvent()
parse event data
Definition midasio.cxx:712
std::vector< char > data
MIDAS event bytes.
Definition midasio.h:67
size_t event_header_size
size of MIDAS event header
Definition midasio.h:63
uint32_t serial_number
MIDAS event serial number.
Definition midasio.h:59
uint32_t data_size
MIDAS event data size.
Definition midasio.h:61
uint16_t event_id
MIDAS event ID.
Definition midasio.h:57
MIDAS online connection, including access to online ODB.
virtual int Close()=0
std::string fErrorString
Definition midasio.h:116
static bool fgTrace
Definition midasio.h:117
virtual int Close()=0
static bool fgTrace
Definition midasio.h:127
void RegisterHandler(TMHandlerInterface *h)
void registerTransitions()
Ask MIDAS to tell us about run transitions.
int disconnect()
Disconnect from MIDAS.
static TMidasOnline * instance()
int connect(const char *hostname, const char *exptname, const char *progname)
Connect to MIDAS experiment.
HNDLE fDB
ODB handle.
int eventRequest(const char *bufferName, int eventId, int triggerMask, int samplingType, bool poll=false)
Request data for delivery via callback (setEventHandler) or by polling (via receiveEvent)
TextButton(TGWindow *p, const char *text, ValueHolder *holder, int value)
ValueHolder * fHolder
void Clicked()
std::chrono::high_resolution_clock::time_point TAClock
Definition manalyzer.h:218
int TAFlags
Definition manalyzer.h:79
#define TAFlag_DISPLAY
Definition manalyzer.h:85
#define TAFlag_SKIP
Definition manalyzer.h:82
#define TAFlag_WRITE
Definition manalyzer.h:84
TAClock TAClockNow()
Definition manalyzer.h:220
#define TAFlag_QUIT
Definition manalyzer.h:83
#define TAFlag_SKIP_PROFILE
Definition manalyzer.h:86
void TMWriteEvent(TMWriterInterface *writer, const TMEvent *event)
Definition midasio.cxx:657
TMWriterInterface * TMNewWriter(const char *destination)
Definition midasio.cxx:549
TMEvent * TMReadEvent(TMReaderInterface *reader)
Definition midasio.cxx:591
#define TID_FLOAT
Definition midasio.h:27
TMReaderInterface * TMNewReader(const char *source)
Definition midasio.cxx:453
#define TID_DWORD
Definition midasio.h:22
MVOdb * MakeMidasOdb(int hDB, MVOdbError *error=NULL)
Definition midasodb.cxx:969
MVOdb * MakeFileDumpOdb(const char *buf, int bufsize, MVOdbError *error=NULL)
Access ODB from a midas file dump. FOrmat could be .xml, .json or .odb.
Definition mvodb.cxx:93
MVOdb * MakeNullOdb()
Definition nullodb.cxx:137
@ full
Definition lz4.cxx:365
std::chrono::high_resolution_clock::time_point TAClock
Definition manalyzer.h:218
int TAFlags
Definition manalyzer.h:79
std::chrono::duration< double > TAClockDuration
Definition manalyzer.h:219
TAClock TAClockNow()
Definition manalyzer.h:220
#define TAFlag_SKIP_PROFILE
Definition manalyzer.h:86
#define CTRL_NEXT_FLOW
static int gDefaultMultithreadWaitFull
static int ProcessDemoMode(const std::vector< std::string > &args, int num_skip, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
std::vector< TAFactory * > * gModules
static void MtQueueFlowEvent(TAMultithreadHelper *mt, int i, TAFlags *flag, TAFlowEvent *flow)
static bool gTrace
Definition manalyzer.cxx:26
#define CTRL_NEXT
int manalyzer_main(int argc, char *argv[])
static int gDefaultMultithreadWaitEmpty
static bool compare_order(const TARunObject *a, const TARunObject *b)
#define CTRL_TBROWSER
#define CTRL_CONTINUE
#define CTRL_QUIT
static bool starts_with(const std::string &s, const char *prefix)
static int ProcessMidasOnlineOld(const std::vector< std::string > &args, const char *hostname, const char *exptname, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
static bool gEnableShowMem
static bool MtQueueWait(TAMultithreadHelper *mt)
#define CTRL_PAUSE
static void WaitForAllQueuesEmpty(TAMultithreadHelper *mt)
static int ProcessMidasFiles(const std::vector< std::string > &files, const std::vector< std::string > &args, int num_skip, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
static void WaitForAllThreadsShutdown(TAMultithreadHelper *mt)
static void help()
static int gDefaultMultithreadQueueLength
int ShowMem(const char *label)