Line data Source code
1 : //
2 : // MIDAS analyzer
3 : //
4 : // K.Olchanski
5 : //
6 :
7 : #undef NDEBUG // this program requires working assert()
8 :
#include <stdio.h>
#include <unistd.h> // usleep()
#include <assert.h>
#include <sys/stat.h> // struct stat_buffer;

#include <algorithm> // std::stable_sort()
#include <cmath> // sqrt(), pow()
#include <vector> // std::vector

#include "manalyzer.h"
#include "midasio.h"
#include "odbxx.h"
19 :
20 : //////////////////////////////////////////////////////////
21 :
22 : static bool gTrace = false;
23 :
24 : //////////////////////////////////////////////////////////
25 : //
26 : // Methods of TARunInfo
27 : //
28 : //////////////////////////////////////////////////////////
29 :
30 0 : TARunInfo::TARunInfo(int runno, const char* filename, const std::vector<std::string>& args)
31 : {
32 0 : if (gTrace)
33 0 : printf("TARunInfo::ctor!\n");
34 0 : fRunNo = runno;
35 0 : if (filename)
36 0 : fFileName = filename;
37 0 : fOdb = NULL;
38 : #ifdef HAVE_ROOT
39 : fRoot = new TARootHelper(this);
40 : #else
41 0 : fRoot = NULL;
42 : #endif
43 0 : fMtInfo = NULL;
44 0 : fArgs = args;
45 0 : }
46 :
47 0 : TARunInfo::~TARunInfo()
48 : {
49 0 : if (gTrace)
50 0 : printf("TARunInfo::dtor!\n");
51 0 : fRunNo = 0;
52 0 : fFileName = "(deleted)";
53 0 : if (fOdb) {
54 0 : delete fOdb;
55 0 : fOdb = NULL;
56 : }
57 : #ifdef HAVE_ROOT
58 : if (fRoot) {
59 : delete fRoot;
60 : fRoot = NULL;
61 : }
62 : #endif
63 : int count = 0;
64 0 : while (1) {
65 0 : TAFlowEvent* flow = ReadFlowQueue();
66 0 : if (!flow)
67 : break;
68 0 : delete flow;
69 0 : count++;
70 0 : }
71 0 : if (gTrace) {
72 0 : printf("TARunInfo::dtor: deleted %d queued flow events!\n", count);
73 : }
74 :
75 0 : if (fMtInfo) {
76 0 : delete fMtInfo;
77 0 : fMtInfo = NULL;
78 : }
79 0 : }
80 :
81 : //////////////////////////////////////////////////////////
82 : //
83 : // Methods of TAFlowEvent
84 : //
85 : //////////////////////////////////////////////////////////
86 :
87 0 : TAFlowEvent::TAFlowEvent(TAFlowEvent* flow) // ctor
88 : {
89 0 : if (gTrace)
90 0 : printf("TAFlowEvent::ctor: chain %p\n", flow);
91 0 : fNext = flow;
92 0 : }
93 :
94 0 : TAFlowEvent::~TAFlowEvent() // dtor
95 : {
96 0 : if (gTrace)
97 0 : printf("TAFlowEvent::dtor: this %p, next %p\n", this, fNext);
98 0 : if (fNext)
99 0 : delete fNext;
100 0 : fNext = NULL;
101 0 : }
102 :
103 : //////////////////////////////////////////////////////////
104 : //
105 : // Methods of TARunObject
106 : //
107 : //////////////////////////////////////////////////////////
108 :
109 0 : TARunObject::TARunObject(TARunInfo* runinfo)
110 : {
111 0 : if (gTrace)
112 0 : printf("TARunObject::ctor, run %d\n", runinfo->fRunNo);
113 0 : }
114 :
115 0 : void TARunObject::BeginRun(TARunInfo* runinfo)
116 : {
117 0 : if (gTrace)
118 0 : printf("TARunObject::BeginRun, run %d\n", runinfo->fRunNo);
119 0 : }
120 :
121 0 : void TARunObject::EndRun(TARunInfo* runinfo)
122 : {
123 0 : if (gTrace)
124 0 : printf("TARunObject::EndRun, run %d\n", runinfo->fRunNo);
125 0 : }
126 :
127 0 : void TARunObject::NextSubrun(TARunInfo* runinfo)
128 : {
129 0 : if (gTrace)
130 0 : printf("TARunObject::NextSubrun, run %d\n", runinfo->fRunNo);
131 0 : }
132 :
133 0 : void TARunObject::PauseRun(TARunInfo* runinfo)
134 : {
135 0 : if (gTrace)
136 0 : printf("TARunObject::PauseRun, run %d\n", runinfo->fRunNo);
137 0 : }
138 :
139 0 : void TARunObject::ResumeRun(TARunInfo* runinfo)
140 : {
141 0 : if (gTrace)
142 0 : printf("TARunObject::ResumeRun, run %d\n", runinfo->fRunNo);
143 0 : }
144 :
145 0 : void TARunObject::PreEndRun(TARunInfo* runinfo)
146 : {
147 0 : if (gTrace)
148 0 : printf("TARunObject::PreEndRun, run %d\n", runinfo->fRunNo);
149 0 : }
150 :
151 0 : TAFlowEvent* TARunObject::Analyze(TARunInfo* runinfo, TMEvent* event, TAFlags* flags, TAFlowEvent* flow)
152 : {
153 0 : if (gTrace)
154 0 : printf("TARunObject::Analyze!\n");
155 :
156 : // This default analyze function does no work, instruct the Profiler to not time / log this
157 0 : *flags|=TAFlag_SKIP_PROFILE;
158 :
159 0 : return flow;
160 : }
161 :
162 0 : TAFlowEvent* TARunObject::AnalyzeFlowEvent(TARunInfo* runinfo, TAFlags* flags, TAFlowEvent* flow)
163 : {
164 0 : if (gTrace)
165 0 : printf("TARunObject::Analyze!\n");
166 :
167 : // This default analyze function does no work, instruct the Profiler to not time / log this
168 0 : *flags |= TAFlag_SKIP_PROFILE;
169 :
170 0 : return flow;
171 : }
172 :
173 0 : void TARunObject::AnalyzeSpecialEvent(TARunInfo* runinfo, TMEvent* event)
174 : {
175 0 : if (gTrace)
176 0 : printf("TARunObject::AnalyzeSpecialEvent!\n");
177 0 : }
178 :
179 : //////////////////////////////////////////////////////////
180 : //
181 : // Methods of TAFactory
182 : //
183 : //////////////////////////////////////////////////////////
184 :
185 0 : void TAFactory::Usage()
186 : {
187 0 : if (gTrace)
188 0 : printf("TAFactory::Usage!\n");
189 0 : }
190 :
191 0 : void TAFactory::Init(const std::vector<std::string> &args)
192 : {
193 0 : if (gTrace)
194 0 : printf("TAFactory::Init!\n");
195 0 : }
196 :
197 0 : void TAFactory::Finish()
198 : {
199 0 : if (gTrace)
200 0 : printf("TAFactory::Finish!\n");
201 0 : }
202 :
203 : #ifdef HAVE_ROOT
204 :
205 : //////////////////////////////////////////////////////////
206 : //
207 : // Methods of TARootHelper
208 : //
209 : //////////////////////////////////////////////////////////
210 :
211 : std::string TARootHelper::fOutputDirectory = "root_output_files";
212 : std::string TARootHelper::fOutputFileName = "";
213 : TApplication* TARootHelper::fgApp = NULL;
214 : TDirectory* TARootHelper::fgDir = NULL;
215 : THttpServer* TARootHelper::fgHttpServer = NULL;
216 : std::vector<std::string> TARootHelper::fgOutputRootFiles;
217 :
218 :
219 : TARootHelper::TARootHelper(const TARunInfo* runinfo) // ctor
220 : {
221 : if (gTrace)
222 : printf("TARootHelper::ctor!\n");
223 :
224 : std::string filename = fOutputFileName;
225 :
226 : if (filename.empty()) {
227 : char xfilename[256];
228 : snprintf(xfilename, sizeof(xfilename), "output%05d.root", runinfo->fRunNo);
229 :
230 : if (fOutputDirectory.empty()) {
231 : filename = xfilename;
232 : } else {
233 : filename = fOutputDirectory + "/" + xfilename;
234 :
235 : struct stat buffer;
236 : int status = stat(fOutputDirectory.c_str(), &buffer);
237 :
238 : if (status < 0 && errno == ENOENT) {
239 : fprintf(stdout, "TARootHelper::ctor: creating output directory \"%s\"\n", fOutputDirectory.c_str());
240 : status = mkdir(fOutputDirectory.c_str(), 0777);
241 : if (status == -1) {
242 : fprintf(stderr, "TARootHelper::ctor: Error: cannot output directory \"%s\", errno %d (%s)\n", fOutputDirectory.c_str(), errno, strerror(errno));
243 : }
244 : }
245 : }
246 : }
247 :
248 : fOutputFile = new TFile(filename.c_str(), "RECREATE");
249 :
250 : if (!fOutputFile->IsOpen()) {
251 : fprintf(stderr, "TARootHelper::ctor: Error: cannot open output ROOT file \"%s\"\n", filename.c_str());
252 : fOutputFile = new TFile("/dev/null", "UPDATE");
253 : assert(fOutputFile);
254 : assert(fOutputFile->IsOpen());
255 : }
256 :
257 : if (fOutputFile != NULL) {
258 : if (gTrace)
259 : printf("TARootHelper::ctor: ROOT output file is \"%s\"!\n", filename.c_str());
260 :
261 : fgOutputRootFiles.push_back(filename);
262 :
263 : fOutputFile->cd();
264 : }
265 : }
266 :
267 : TARootHelper::~TARootHelper() // dtor
268 : {
269 : if (gTrace)
270 : printf("TARootHelper::dtor!\n");
271 :
272 : if (fOutputFile != NULL) {
273 : fOutputFile->Write();
274 : fOutputFile->Close();
275 : fOutputFile = NULL;
276 : if (gTrace)
277 : printf("TARootHelper::dtor: ROOT output file closed!\n");
278 : }
279 :
280 : if (fgDir)
281 : fgDir->cd();
282 : }
283 :
284 : //////////////////////////////////////////////////////////
285 : //
286 : // jsroot
287 : //
288 : //////////////////////////////////////////////////////////
289 :
290 : #ifdef HAVE_THTTP_SERVER
291 :
292 : #include "TKey.h" // class TKey
293 : #include "THttpServer.h" // class THttpServer
294 : #include "TSystem.h" // gSystem
295 :
296 : static void jsroot_ReadFile(TDirectoryFile *f)
297 : {
298 : for (TObject* keyobj: *f->GetListOfKeys()) {
299 : TKey* key = (TKey*)keyobj;
300 : //printf("key %p, name [%s] title [%s]\n", key, key->GetName(), key->GetTitle());
301 : TObject* obj = f->Get(key->GetName());
302 : TClass* t = obj->IsA();
303 : //printf("obj %p, class [%s], key %p, name [%s], title [%s]\n", obj, t->GetName(), key, key->GetName(), key->GetTitle());
304 : if (strcmp(t->GetName(), "TDirectoryFile") == 0) {
305 : jsroot_ReadFile((TDirectoryFile*)obj);
306 : }
307 : }
308 : }
309 :
310 : static void jsroot(const std::vector<std::string>& files)
311 : {
312 : THttpServer* httpServer = TARootHelper::fgHttpServer;
313 :
314 : assert(httpServer != NULL); // server should be already running
315 :
316 : std::vector<TFile*> root_files;
317 :
318 : printf("JSROOT server for %zu files:\n", files.size());
319 :
320 : for (size_t i=0; i<files.size(); i++) {
321 : printf("file[%zu]: %s\n", i, files[i].c_str());
322 :
323 : TFile* f = new TFile(files[i].c_str(), "READ");
324 :
325 : if (!f->IsOpen()) {
326 : fprintf(stderr, "Error: cannot open ROOT file \"%s\"\n", files[i].c_str());
327 : continue;
328 : }
329 :
330 : //f->ReadAll(); // does not read all histograms into memory
331 : //f->ReadKeys(); // reads all the subdirectories, but not histograms, so everything is in the wrong order
332 :
333 : jsroot_ReadFile(f);
334 :
335 : root_files.push_back(f);
336 : }
337 :
338 : while (1) {
339 : int nreq = httpServer->ProcessRequests();
340 : if (nreq > 0) {
341 : //printf("ProcessRequests() returned %d\n", nreq);
342 : continue;
343 : }
344 :
345 : //gSystem->DispatchOneEvent(kTRUE);
346 :
347 : bool intr = gSystem->ProcessEvents();
348 : if (intr)
349 : break;
350 :
351 : usleep(1000);
352 : }
353 :
354 : printf("Interrupted, shutting down!\n");
355 :
356 : // close ROOT files
357 :
358 : for (TFile* f: root_files) {
359 : f->Close();
360 : }
361 : root_files.clear();
362 : }
363 :
364 : #endif
365 :
366 : #endif
367 :
368 : #include <stdio.h>
369 : #include <stdlib.h>
370 : #include <string.h>
371 : #include <sys/time.h>
372 : #include <assert.h>
373 : #include <signal.h>
374 :
375 : #include "manalyzer.h"
376 : #include "midasio.h"
377 :
378 : #ifdef HAVE_THTTP_SERVER
379 : #include "THttpServer.h"
380 : #endif
381 :
382 : #ifdef HAVE_ROOT
383 : #include <TSystem.h>
384 : #include <TROOT.h>
385 : #endif
386 :
387 : //////////////////////////////////////////////////////////
388 : //
389 : // Methods and Defaults of TAMultithreadHelper
390 : //
391 : //////////////////////////////////////////////////////////
392 :
393 : static int gDefaultMultithreadQueueLength = 1000;
394 : static int gDefaultMultithreadWaitEmpty = 10; // microseconds
395 : static int gDefaultMultithreadWaitFull = 10; // microseconds
396 :
397 0 : static void WaitForAllQueuesEmpty(TAMultithreadHelper* mt)
398 : {
399 0 : fprintf(stderr, "Waiting for all queues to empty out!\n");
400 :
401 0 : int count_all_empty = 0;
402 :
403 0 : for (int t=0; ; ) {
404 0 : int count_not_empty = 0;
405 0 : size_t count_events = 0;
406 :
407 0 : for (unsigned i=0; i<mt->fMtThreads.size(); i++) {
408 0 : std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
409 0 : if (!mt->fMtThreadIsRunning[i]) // skip threads that have shutdown
410 0 : continue;
411 0 : if (!mt->fMtFlowQueue[i].empty()) {
412 0 : count_not_empty++;
413 0 : count_events += mt->fMtFlowQueue[i].size();
414 0 : break;
415 : }
416 0 : if (mt->fMtThreadIsBusy[i]) {
417 : count_not_empty++;
418 : count_events += 1; // module is analyzing 1 event
419 : break;
420 : }
421 : // implicit unlock
422 0 : }
423 :
424 : //fprintf(stderr, "Waiting for all queues to empty out: %d queues still have %zu flow events!\n", count_not_empty, count_events);
425 :
426 0 : if (count_not_empty == 0) {
427 0 : count_all_empty++;
428 : }
429 :
430 0 : if (count_all_empty > 1) {
431 : // must loop over "all empty" at least twice! K.O.
432 : break;
433 : }
434 :
435 0 : if (t > 10) {
436 0 : fprintf(stderr, "Timeout waiting for all queues to empty out, %d queues still have %zu flow events!\n", count_not_empty, count_events);
437 : //break;
438 : }
439 :
440 0 : ::sleep(1);
441 0 : t++;
442 0 : }
443 0 : }
444 :
445 0 : static void WaitForAllThreadsShutdown(TAMultithreadHelper* mt)
446 : {
447 0 : fprintf(stderr, "Waiting for all threads to shut down!\n");
448 :
449 0 : mt->fMtShutdownRequested = true;
450 :
451 0 : for (int t=0; ; ) {
452 0 : int count_running = 0;
453 :
454 0 : for (unsigned i=0; i<mt->fMtThreads.size(); i++) {
455 0 : if (mt->fMtThreadIsRunning[i])
456 0 : count_running++;
457 : }
458 :
459 0 : if (count_running == 0) {
460 : break;
461 : }
462 :
463 0 : if (t > 10) {
464 0 : fprintf(stderr, "Timeout waiting for all threads to shut down, %d still running!\n", count_running);
465 0 : break;
466 : }
467 :
468 0 : ::sleep(1);
469 0 : t++;
470 0 : }
471 :
472 0 : fprintf(stderr, "Joining all threads!\n");
473 0 : for (unsigned i=0; i<mt->fMtThreads.size(); i++) {
474 0 : if (!mt->fMtThreadIsRunning[i] ) {
475 : // pointer is null if thread is already shutdown
476 0 : if (mt->fMtThreads[i]) {
477 0 : mt->fMtThreads[i]->join();
478 0 : delete mt->fMtThreads[i];
479 0 : mt->fMtThreads[i] = NULL;
480 : }
481 : } else {
482 0 : fprintf(stderr, "Thread %d failed to shut down!\n", i);
483 : }
484 : }
485 0 : }
486 :
487 :
488 0 : TAMultithreadHelper::TAMultithreadHelper(int nModules):
489 0 : fMtFlowQueueMutex(nModules),
490 0 : fMtFlowQueue(nModules),
491 0 : fMtFlagQueue(nModules),
492 0 : fMtThreads(nModules,NULL),
493 0 : fMtThreadIsRunning(nModules),
494 0 : fMtThreadIsBusy(nModules),
495 0 : fMtShutdownRequested(false),
496 0 : fMtQuitRequested(false)
497 : // ctor
498 : {
499 0 : for (auto &b: fMtThreadIsRunning) { b = false; }
500 0 : for (auto &b: fMtThreadIsBusy) { b = false; }
501 : // default max queue size
502 0 : fMtQueueDepth = gDefaultMultithreadQueueLength;
503 : // queue settings
504 0 : fMtQueueFullUSleepTime = gDefaultMultithreadWaitFull; //u seconds
505 0 : fMtQueueEmptyUSleepTime = gDefaultMultithreadWaitEmpty; //u seconds
506 0 : }
507 :
508 0 : TAMultithreadHelper::~TAMultithreadHelper() // dtor
509 : {
510 0 : size_t nmodules = fMtFlowQueueMutex.size();
511 :
512 : // just for kicks, check that all queues have correct size
513 0 : assert(nmodules == fMtFlowQueue.size());
514 0 : assert(nmodules == fMtFlagQueue.size());
515 0 : assert(nmodules == fMtFlowQueueMutex.size());
516 :
517 : // should not come to the destructor while threads are still running
518 0 : WaitForAllThreadsShutdown(this);
519 :
520 0 : int count = 0;
521 0 : for (size_t i=0; i<nmodules; i++) {
522 : // check that the thread is stopped
523 : //assert(!fMtThread[i].joinable());
524 : // empty the thread queue
525 0 : std::lock_guard<std::mutex> lock(fMtFlowQueueMutex[i]);
526 : //printf("TAMultithreadInfo::dtor: thread %zu has %zu queued events\n", i, fMtFlowQueue[i].size());
527 0 : while (!fMtFlowQueue[i].empty()) {
528 0 : TAFlowEvent* flow = fMtFlowQueue[i].front();
529 0 : TAFlags* flag = fMtFlagQueue[i].front();
530 0 : fMtFlowQueue[i].pop_front();
531 0 : fMtFlagQueue[i].pop_front();
532 0 : delete flow;
533 0 : delete flag;
534 0 : count++;
535 : }
536 : // implicit unlock of mutex
537 0 : }
538 0 : if (gTrace) {
539 0 : printf("TAMultithreadInfo::dtor: deleted %d queued flow events!\n", count);
540 : }
541 0 : }
542 :
543 : std::mutex TAMultithreadHelper::gfLock; //Lock for modules to execute code that is not thread safe (many root fitting libraries)
544 :
545 0 : static void MtQueueFlowEvent(TAMultithreadHelper* mt, int i, TAFlags* flag, TAFlowEvent* flow)
546 : {
547 0 : assert(mt);
548 :
549 0 : if (flag == NULL) {
550 0 : flag = new TAFlags;
551 0 : *flag = 0;
552 : }
553 :
554 : //PrintQueueLength();
555 :
556 0 : while (1) {
557 0 : {
558 : //Lock and queue events
559 0 : std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
560 :
561 0 : bool full = (mt->fMtFlowQueue[i].size() >= mt->fMtQueueDepth);
562 :
563 0 : if (full && (i==0)) {
564 : //printf("MtQueueFlowEvent: queue %i is full: %zu out of max %zu entries\n", i, mt->fMtFlowQueue[i].size(), mt->fMtQueueDepth);
565 :
566 : // we will deadlock if first thread queue is full, but
567 : // it is not reading from the queue because it is waiting
568 : // to write to the queue of the next thread, which is not reading
569 : // from it's queue because it is stuck here in AddToFlowQueue().
570 :
571 : full = false;
572 : }
573 :
574 0 : if (!full || mt->fMtShutdownRequested) {
575 0 : mt->fMtFlowQueue[i].push_back(flow);
576 0 : mt->fMtFlagQueue[i].push_back(flag);
577 0 : return;
578 : }
579 : // Unlock when we go out of scope
580 0 : }
581 :
582 0 : usleep(mt->fMtQueueFullUSleepTime);
583 0 : }
584 : }
585 :
586 0 : static bool MtQueueWait(TAMultithreadHelper* mt)
587 : {
588 0 : if (!mt) return false;
589 :
590 0 : std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[0]);
591 :
592 0 : bool full = (mt->fMtFlowQueue[0].size() >= mt->fMtQueueDepth);
593 :
594 0 : if (!full) {
595 0 : return false;
596 : }
597 :
598 : //printf("MtQueueWait: queue 0 is full: %zu out of max %zu entries\n", mt->fMtFlowQueue[0].size(), mt->fMtQueueDepth);
599 :
600 : return true;
601 0 : }
602 :
603 : #if 0
604 : //Function to print the length of the flow queue when in multithread mode
605 : //Maybe make root update a graphical window?
606 : static void PrintMtQueueLength(TAMultithreadHelper* mt)
607 : {
608 : printf("Multithread queue lengths:\n");
609 : for (unsigned i=0; i<mt->fMtFlowQueue.size(); i++) {
610 : printf("%d:\t%zu\n",i,mt->fMtFlowQueue[i].size());
611 : }
612 : }
613 : #endif
614 :
615 : //////////////////////////////////////////////////////////
616 : //
617 : // Methods of TARegister
618 : //
619 : //////////////////////////////////////////////////////////
620 :
621 : std::vector<TAFactory*> *gModules = NULL;
622 :
623 0 : TARegister::TARegister(TAFactory* m)
624 : {
625 0 : if (!gModules)
626 0 : gModules = new std::vector<TAFactory*>;
627 0 : gModules->push_back(m);
628 0 : }
629 :
630 : #if 0
631 : static double GetTimeSec()
632 : {
633 : struct timeval tv;
634 : gettimeofday(&tv,NULL);
635 : return tv.tv_sec + 0.000001*tv.tv_usec;
636 : }
637 : #endif
638 :
639 : //////////////////////////////////////////////////////////
640 : //
641 : // Profiler class
642 : //
643 : //////////////////////////////////////////////////////////
644 :
645 0 : TAUserProfilerFlow::TAUserProfilerFlow(TAFlowEvent* flow, const char* name, const TAClock& start) : TAFlowEvent(flow), fModuleName(name)
646 : {
647 0 : fStart = start;
648 0 : fStop = TAClockNow();
649 0 : }
650 :
651 0 : TAUserProfilerFlow::~TAUserProfilerFlow() // dtor
652 : {
653 0 : }
654 :
655 0 : double TAUserProfilerFlow::GetTimer() const
656 : {
657 0 : TAClockDuration elapsed_seconds = fStop - fStart;
658 0 : return elapsed_seconds.count();
659 : }
660 :
661 : #ifdef HAVE_ROOT
662 : #include "TH1D.h"
663 : #endif
664 : #include <map>
665 :
666 : class Profiler
667 : {
668 : private:
669 : std::string fBinaryName;
670 : std::string fBinaryPath;
671 : clock_t fStartCPU;
672 : std::chrono::system_clock::time_point fStartUser;
673 : uint32_t fMIDASStartTime;
674 : uint32_t fMIDASStopTime;
675 :
676 : //Track Queue lengths when multithreading
677 : #ifdef HAVE_ROOT
678 : int fNQueues=0;
679 : std::vector<TH1D*> fAnalysisQueue;
680 : std::atomic<int> fQueueIntervalCounter;
681 : #endif
682 :
683 : // Track Analyse TMEvent time per module (main thread)
684 : #ifdef HAVE_ROOT
685 : std::vector<TH1D*> fAnalyzeEventTimeHistograms;
686 : #endif
687 : std::vector<std::string> fModuleNames;
688 : std::vector<double> fAnalyzeEventMean;
689 : std::vector<double> fAnalyzeEventRMS;
690 : std::vector<int> fAnalyzeEventEntries;
691 :
692 : std::vector<double> fAnalyzeEventTimeMax;
693 : std::vector<double> fAnalyzeEventTimeTotal;
694 :
695 : //Track Analyse flow event time per module (can be multiple threads)
696 : #ifdef HAVE_ROOT
697 : std::vector<TH1D*> fAnalyzeFlowEventTimeHistograms;
698 : #endif
699 : std::vector<double> fAnalyzeFlowEventMean;
700 : std::vector<double> fAnalyzeFlowEventRMS;
701 : std::vector<int> fAnalyzeFlowEventEntries;
702 :
703 : std::vector<double> fAnalyzeFlowEventTimeMax;
704 : std::vector<double> fAnalyzeFlowEventTimeTotal;
705 : #ifdef HAVE_ROOT
706 : //Track user profiling
707 : std::map<unsigned int,int> fUserMap;
708 : std::vector<TH1D*> fUserHistograms;
709 : std::vector<double> fTotalUserTime;
710 : std::vector<double> fMaxUserTime;
711 :
712 : // Number of events between samples
713 : const int fQueueInterval = 100;
714 : #endif
715 :
716 : public:
717 : Profiler( const int queue_interval_check );
718 : ~Profiler();
719 : void Begin(TARunInfo* runinfo,const std::vector<TARunObject*> fRunRun );
720 : // Function for profiling the 'main' thread (that unpacks TMEvents)
721 : void LogAnalyzeEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start);
722 : // Function for profiling module threads
723 : void LogAnalyzeFlowEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start);
724 : // Extra function for users custom profiling windows
725 : void LogUserWindows(TAFlags* flag, TAFlowEvent* flow);
726 : void AddModuleMap( const char* UserProfileName, unsigned long hash);
727 : void LogMTQueueLength(TARunInfo* runinfo);
728 : void End(TARunInfo* runinfo);
729 : void Print() const;
730 : };
731 :
732 0 : Profiler::Profiler(const int queue_interval_check)
733 : #ifdef HAVE_ROOT
734 : :
735 : fQueueIntervalCounter(0),
736 : fQueueInterval(queue_interval_check)
737 : #endif
738 : {
739 0 : if (gTrace)
740 0 : printf("Profiler::ctor\n");
741 0 : fMIDASStartTime = 0;
742 0 : fMIDASStopTime = 0;
743 0 : fStartCPU = clock();
744 0 : fStartUser = std::chrono::system_clock::now();
745 :
746 : #ifdef HAVE_ROOT
747 : #else
748 0 : fprintf(stderr, "Error: manalyzer must be built with ROOT for using the user profiling tools\n");
749 : #endif
750 0 : }
751 :
752 0 : Profiler::~Profiler()
753 : {
754 0 : if (gTrace)
755 0 : printf("Profiler::dtor\n");
756 :
757 : #ifdef HAVE_ROOT
758 : for (TH1D* h: fAnalyzeFlowEventTimeHistograms)
759 : delete h;
760 : fAnalyzeFlowEventTimeHistograms.clear();
761 : for (TH1D* h: fAnalyzeEventTimeHistograms)
762 : delete h;
763 : fAnalyzeEventTimeHistograms.clear();
764 : for( TH1D* h: fAnalysisQueue)
765 : delete h;
766 : fAnalysisQueue.clear();
767 : #endif
768 0 : }
769 :
770 0 : void Profiler::Begin(TARunInfo* runinfo, const std::vector<TARunObject*> runrun)
771 : {
772 0 : if (gTrace)
773 0 : printf("Profiler::begin\n");
774 :
775 0 : runinfo->fOdb->RU32("Runinfo/Start time binary",(uint32_t*) &fMIDASStartTime);
776 :
777 : #ifdef HAVE_ROOT
778 : // Put Profiler histograms in their own folders in the output root file
779 : if (runinfo->fRoot->fOutputFile) {
780 : runinfo->fRoot->fOutputFile->cd(); // select correct ROOT directory
781 : gDirectory->mkdir("ProfilerReport")->cd();
782 : runinfo->fRoot->fOutputFile->cd("ProfilerReport");
783 : gDirectory->mkdir("AnalyzeFlowEventTime");
784 : gDirectory->mkdir("AnalyzeFlowTime");
785 : gDirectory->mkdir("MTQueueLength");
786 : }
787 :
788 : // Setup module processing time histograms
789 : // Number of bins
790 : Int_t Nbins=1000;
791 : // Array of bin edges
792 : Double_t bins[Nbins+1];
793 : // Processing time range (seconds)
794 : Double_t TimeRange = 10;
795 : // Set uneven binning to better sample fast modules with accuracy
796 : // without having a large number of bins
797 : for (int i=0; i<Nbins+1; i++) {
798 : bins[i] = TimeRange*pow(1.1,i)/pow(1.1,Nbins);
799 : }
800 :
801 : if (runinfo->fMtInfo)
802 : fNQueues = runrun.size();
803 : #endif
804 :
805 : // Per module metric setup
806 0 : for (size_t i = 0; i < runrun.size(); i++) {
807 :
808 0 : if (runrun[i]->fModuleName.empty())
809 0 : fModuleNames.push_back("Unnamed Module " + std::to_string(i));
810 : else
811 0 : fModuleNames.push_back(runrun[i]->fModuleName);
812 :
813 : // Metrics for the AnalyzeEvent function (main thread)
814 0 : fAnalyzeEventMean.push_back(0);
815 0 : fAnalyzeEventRMS.push_back(0);
816 0 : fAnalyzeEventEntries.push_back(0);
817 0 : fAnalyzeEventTimeMax.push_back(0);
818 0 : fAnalyzeEventTimeTotal.push_back(0);
819 :
820 : // Metric for the AnalyzeFlowEvent function (side threads)
821 0 : fAnalyzeFlowEventMean.push_back(0);
822 0 : fAnalyzeFlowEventRMS.push_back(0);
823 0 : fAnalyzeFlowEventEntries.push_back(0);
824 0 : fAnalyzeFlowEventTimeMax.push_back(0);
825 0 : fAnalyzeFlowEventTimeTotal.push_back(0);
826 :
827 : #ifdef HAVE_ROOT
828 : if (runinfo->fRoot->fOutputFile) {
829 : runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowTime");
830 : TH1D* Histo = new TH1D( TString(std::to_string(i) + "_" + fModuleNames.at(i)),
831 : TString(fModuleNames.at(i) + " Event Proccessing Time; s"),
832 : Nbins, bins);
833 : fAnalyzeFlowEventTimeHistograms.push_back(Histo);
834 :
835 : runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowEventTime");
836 : TH1D* AnalyzeEventHisto = new TH1D(TString(std::to_string(i) + "_" + fModuleNames.at(i) + "_TMEvent"),
837 : TString(fModuleNames.at(i) + " Flow Proccessing Time; s"),
838 : Nbins, bins);
839 : fAnalyzeEventTimeHistograms.push_back(AnalyzeEventHisto);
840 : } else {
841 : fAnalyzeFlowEventTimeHistograms.push_back(NULL);
842 : fAnalyzeEventTimeHistograms.push_back(NULL);
843 : }
844 :
845 : // Periodically (once every fQueueInterval events) record the
846 : // flow queue size if running in multithreaded mode
847 : if (runinfo->fMtInfo) {
848 : if (runinfo->fRoot->fOutputFile) {
849 : runinfo->fRoot->fOutputFile->cd("ProfilerReport/MTQueueLength");
850 : TH1D* QueueHisto = new TH1D(TString(std::to_string(i) + "_" + fModuleNames.at(i) + "_Queue"),
851 : TString(fModuleNames.at(i) + " Multithread Queue Length; Queue Depth"),
852 : runinfo->fMtInfo->fMtQueueDepth*1.2,
853 : 0,
854 : runinfo->fMtInfo->fMtQueueDepth*1.2);
855 : fAnalysisQueue.push_back(QueueHisto);
856 : } else {
857 : fAnalysisQueue.push_back(NULL);
858 : }
859 : }
860 : #endif
861 : }
862 0 : }
863 :
864 0 : void Profiler::LogAnalyzeFlowEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start)
865 : {
866 0 : if (gTrace)
867 0 : printf("Profiler::log\n");
868 :
869 0 : TAClock stop = TAClockNow();
870 0 : if ((*flag) & TAFlag_SKIP_PROFILE) {
871 : //Unset bit
872 0 : *flag -= TAFlag_SKIP_PROFILE;
873 0 : return;
874 : }
875 :
876 0 : std::chrono::duration<double> elapsed_seconds = stop - start;
877 0 : double dt = elapsed_seconds.count();
878 0 : fAnalyzeFlowEventTimeTotal[i] += dt;
879 0 : if (dt > fAnalyzeFlowEventTimeMax[i])
880 0 : fAnalyzeFlowEventTimeMax[i] = dt;
881 :
882 0 : fAnalyzeFlowEventMean[i] +=dt;
883 0 : fAnalyzeFlowEventRMS[i] +=dt*dt;
884 0 : fAnalyzeFlowEventEntries[i]++;
885 :
886 : #ifdef HAVE_ROOT
887 : if (fAnalyzeFlowEventTimeHistograms[i])
888 : fAnalyzeFlowEventTimeHistograms[i]->Fill(dt);
889 : #endif
890 : }
891 :
892 0 : void Profiler::LogAnalyzeEvent(TAFlags* flag, TAFlowEvent* flow, int i, const TAClock& start)
893 : {
894 0 : if (gTrace)
895 0 : printf("Profiler::log_AnalyzeEvent_time\n");
896 :
897 0 : TAClock stop = TAClockNow();
898 0 : if ((*flag) & TAFlag_SKIP_PROFILE) {
899 : //Unset bit
900 0 : *flag -= TAFlag_SKIP_PROFILE;
901 0 : return;
902 : }
903 :
904 0 : std::chrono::duration<double> elapsed_seconds = stop - start;
905 0 : double dt = elapsed_seconds.count();
906 0 : fAnalyzeEventTimeTotal[i] += dt;
907 0 : if (dt > fAnalyzeEventTimeMax[i])
908 0 : fAnalyzeEventTimeMax[i] = dt;
909 :
910 0 : fAnalyzeEventMean[i] +=dt;
911 0 : fAnalyzeEventRMS[i] +=dt*dt;
912 0 : fAnalyzeEventEntries[i]++;
913 :
914 : #ifdef HAVE_ROOT
915 : if (fAnalyzeEventTimeHistograms[i])
916 : fAnalyzeEventTimeHistograms[i]->Fill(dt);
917 : #endif
918 :
919 : }
920 :
921 0 : void Profiler::LogMTQueueLength(TARunInfo* runinfo)
922 : {
923 0 : if (gTrace)
924 0 : printf("Profiler::log_mt_queue_length\n");
925 :
926 : #ifdef HAVE_ROOT
927 : fQueueIntervalCounter++;
928 : if (runinfo->fMtInfo && (fQueueIntervalCounter % fQueueInterval ==0 )) {
929 : for (int i=0; i<fNQueues; i++) {
930 : int j=0;
931 : { //Lock guard
932 : std::lock_guard<std::mutex> lock(runinfo->fMtInfo->fMtFlowQueueMutex[i]);
933 : j=runinfo->fMtInfo->fMtFlowQueue[i].size();
934 : }
935 : fAnalysisQueue.at(i)->Fill(j);
936 : }
937 : }
938 : #endif
939 0 : }
940 :
941 :
942 0 : void Profiler::LogUserWindows(TAFlags* flag, TAFlowEvent* flow)
943 : {
944 0 : if (gTrace)
945 0 : printf("Profiler::LogUserWindows\n");
946 :
947 : #ifdef HAVE_ROOT
948 : //Clocks unfold backwards...
949 : std::vector<TAFlowEvent*> flowArray;
950 : int FlowEvents=0;
951 : TAFlowEvent* f = flow;
952 : while (f) {
953 : flowArray.push_back(f);
954 : f=f->fNext;
955 : FlowEvents++;
956 : }
957 : for (int ii=FlowEvents-1; ii>=0; ii--) {
958 : f=flowArray[ii];
959 : TAUserProfilerFlow* timer=dynamic_cast<TAUserProfilerFlow*>(f);
960 : if (timer) {
961 : const char* name = timer->fModuleName.c_str();
962 : unsigned int hash = std::hash<std::string>{}(timer->fModuleName);
963 : if (!fUserMap.count(hash))
964 : AddModuleMap(name,hash);
965 : double dt=999.;
966 : dt=timer->GetTimer();
967 : int i = fUserMap[hash];
968 : fTotalUserTime[i] += dt;
969 : if (dt > fMaxUserTime[i])
970 : fMaxUserTime.at(i) = dt;
971 : fUserHistograms.at(i)->Fill(dt);
972 : }
973 : }
974 : #endif
975 0 : }
976 :
977 0 : void Profiler::AddModuleMap( const char* UserProfileName, unsigned long hash)
978 : {
979 0 : if (gTrace)
980 0 : printf("Profiler::AddModuleMap\n");
981 :
982 0 : std::lock_guard<std::mutex> lock(TAMultithreadHelper::gfLock);
983 :
984 : #ifdef HAVE_ROOT
985 : gDirectory->cd("/ProfilerReport");
986 : fUserMap[hash] = fUserHistograms.size();
987 : Int_t Nbins = 100;
988 : Double_t bins[Nbins+1];
989 : Double_t TimeRange = 10; //seconds
990 : for (int i=0; i<Nbins+1; i++) {
991 : bins[i] = TimeRange*pow(1.1,i)/pow(1.1,Nbins);
992 : }
993 : TH1D* Histo = new TH1D(UserProfileName,UserProfileName,Nbins,bins);
994 : fUserHistograms.push_back(Histo);
995 : fTotalUserTime.push_back(0.);
996 : fMaxUserTime.push_back(0.);
997 : #endif
998 0 : return;
999 0 : }
1000 :
1001 0 : void Profiler::End(TARunInfo* runinfo)
1002 : {
1003 0 : if (gTrace)
1004 0 : printf("Profiler::End\n");
1005 :
1006 0 : for (size_t i=0; i<fAnalyzeEventMean.size(); i++) {
1007 0 : fAnalyzeEventMean.at(i) = fAnalyzeEventMean.at(i) / fAnalyzeEventEntries.at(i);
1008 0 : fAnalyzeEventRMS.at(i) = fAnalyzeEventRMS.at(i) / fAnalyzeEventEntries.at(i) -
1009 0 : ( fAnalyzeEventMean.at(i) / fAnalyzeEventEntries.at(i) ) *
1010 0 : ( fAnalyzeEventMean.at(i) / fAnalyzeEventEntries.at(i) );
1011 : }
1012 0 : for (size_t i=0; i<fAnalyzeFlowEventMean.size(); i++) {
1013 0 : fAnalyzeFlowEventMean.at(i) = fAnalyzeFlowEventMean.at(i) / fAnalyzeFlowEventEntries.at(i);
1014 0 : fAnalyzeFlowEventRMS.at(i) = fAnalyzeFlowEventRMS.at(i) / fAnalyzeFlowEventEntries.at(i) -
1015 0 : ( fAnalyzeFlowEventMean.at(i) / fAnalyzeFlowEventEntries.at(i) ) *
1016 0 : ( fAnalyzeFlowEventMean.at(i) / fAnalyzeFlowEventEntries.at(i) );
1017 : }
1018 : #ifdef HAVE_ROOT
1019 :
1020 : if (runinfo->fRoot->fOutputFile) {
1021 : runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowTime");
1022 : for (TH1D* h: fAnalyzeFlowEventTimeHistograms) {
1023 : h->Write();
1024 : }
1025 : runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowEventTime");
1026 : for (TH1D* h: fAnalyzeEventTimeHistograms) {
1027 : h->Write();
1028 : }
1029 : if (runinfo->fMtInfo) {
1030 : runinfo->fRoot->fOutputFile->cd("ProfilerReport/MTQueueLength");
1031 : for (TH1D* h: fAnalysisQueue) {
1032 : h->Write();
1033 : }
1034 : }
1035 : }
1036 : #endif
1037 0 : }
1038 :
1039 0 : void Profiler::Print() const
1040 : {
1041 : #ifdef HAVE_ROOT
1042 : if (fAnalyzeFlowEventTimeHistograms.size()>0) {
1043 : #else
1044 0 : if (fAnalyzeEventEntries.size()>0) {
1045 : #endif
1046 0 : double AllAnalyzeFlowEventTime=0;
1047 0 : for (auto& n : fAnalyzeFlowEventTimeTotal)
1048 0 : AllAnalyzeFlowEventTime += n;
1049 0 : double AllAnalyzeEventTime=0;
1050 0 : for (auto& n : fAnalyzeEventTimeTotal)
1051 0 : AllAnalyzeEventTime += n;
1052 : //double max_AnalyzeEvent_time=*std::max_element(TotalAnalyzeEventTime.begin(),TotalAnalyzeEventTime.end());
1053 0 : printf("Module average processing time\n");
1054 0 : printf(" \t\t\t\tAnalyzeEvent (one thread) \tAnalyzeFlowEvent (multithreadable)\n");
1055 0 : printf("Module\t\t\t\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\n");
1056 0 : printf("----------------------------------------------------------------------------------------------------------------\n");
1057 0 : for (size_t i=0; i<fModuleNames.size(); i++) {
1058 0 : printf("%-25s", fModuleNames.at(i).c_str());
1059 0 : if (fAnalyzeEventEntries.at(i))
1060 0 : printf("\t%d\t%.1f\t%.1f\t%.1f\t%.3f",
1061 0 : fAnalyzeEventEntries.at(i),
1062 0 : fAnalyzeEventMean.at(i)*1000.,
1063 0 : fAnalyzeEventRMS.at(i)*1000.,
1064 0 : fAnalyzeEventTimeMax.at(i)*1000., //ms
1065 0 : fAnalyzeEventTimeTotal.at(i)); //s
1066 : else
1067 0 : printf("\t-\t-\t-\t-\t-");
1068 :
1069 0 : if (fAnalyzeFlowEventEntries.at(i))
1070 0 : printf("\t%d\t%.1f\t%.1f\t%.1f\t%.3f",
1071 0 : fAnalyzeFlowEventEntries.at(i),
1072 0 : fAnalyzeFlowEventMean.at(i)*1000.,
1073 0 : fAnalyzeFlowEventRMS.at(i)*1000.,
1074 0 : fAnalyzeFlowEventTimeMax.at(i)*1000., //ms
1075 0 : fAnalyzeFlowEventTimeTotal.at(i)); //s
1076 : else
1077 0 : printf("\t-\t-\t-\t-\t-");
1078 0 : printf("\n");
1079 : }
1080 0 : printf("----------------------------------------------------------------------------------------------------------------\n");
1081 0 : printf(" Analyse TMEvent total time %f\n",AllAnalyzeEventTime);
1082 0 : printf(" Analyse FlowEvent total time %f\n",AllAnalyzeFlowEventTime);
1083 : #ifdef HAVE_ROOT
1084 : if (fUserHistograms.size()) {
1085 : printf("Custom profiling windows\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\n");
1086 : printf("----------------------------------------------------------------------\n");
1087 : for (size_t i=0; i<fUserHistograms.size(); i++) {
1088 : printf("%-25s\t%d\t%.1f\t%.1f\t%.1f\t%.3f\t\n",fUserHistograms.at(i)->GetTitle(),
1089 : (int)fUserHistograms.at(i)->GetEntries(),
1090 : fUserHistograms.at(i)->GetMean()*1000., //ms
1091 : fUserHistograms.at(i)->GetRMS()*1000., //ms
1092 : fMaxUserTime.at(i)*1000., //ms
1093 : fTotalUserTime.at(i)); //s
1094 : }
1095 : printf("----------------------------------------------------------------------\n");
1096 : } else {
1097 : printf("----------------------------------------------------------------------------------------------------------------\n");
1098 : }
1099 : #else
1100 0 : fprintf(stderr, "To use custom profile windows, please build rootana with ROOT\n");
1101 : #endif
1102 : }
1103 :
1104 : //CPU and Wall clock time:
1105 0 : double cputime = (double)(clock() - fStartCPU)/CLOCKS_PER_SEC;
1106 0 : std::chrono::duration<double> usertime = std::chrono::system_clock::now() - fStartUser;
1107 0 : printf("%s\tCPU time: %.2fs\tUser time: %.2fs\tAverage CPU Usage: ~%.1f%%\n",
1108 : getenv("_"),
1109 : cputime,
1110 : usertime.count(),
1111 0 : 100.*cputime/usertime.count());
1112 0 : }
1113 :
1114 : //////////////////////////////////////////////////////////
1115 : //
1116 : // RunHandler class
1117 : //
1118 : //////////////////////////////////////////////////////////
1119 :
1120 : class Profiler;
1121 :
1122 0 : static bool compare_order(const TARunObject* a, const TARunObject* b)
1123 : {
1124 0 : return a->fModuleOrder < b->fModuleOrder;
1125 : }
1126 :
1127 : class RunHandler
1128 : {
1129 : public:
1130 : TARunInfo* fRunInfo = NULL;
1131 : std::vector<TARunObject*> fRunRun;
1132 : std::vector<std::string> fArgs;
1133 : bool fMultithreadEnabled = false;
1134 : Profiler* fProfiler = NULL;
1135 : bool fProfilerEnabled = false;
1136 : int fProfilerIntervalCheck;
1137 :
1138 0 : RunHandler(const std::vector<std::string>& args, bool multithread, bool profile, int queue_interval_check) // ctor
1139 0 : {
1140 0 : fRunInfo = NULL;
1141 0 : fArgs = args;
1142 0 : fMultithreadEnabled = multithread;
1143 0 : fProfiler = NULL;
1144 0 : fProfilerEnabled = profile;
1145 0 : fProfilerIntervalCheck = queue_interval_check;
1146 0 : }
1147 :
1148 0 : ~RunHandler() // dtor
1149 : {
1150 0 : if (fRunInfo) {
1151 0 : delete fRunInfo;
1152 0 : fRunInfo = NULL;
1153 : }
1154 0 : if (fProfiler) {
1155 0 : delete fProfiler;
1156 0 : fProfiler = NULL;
1157 : }
1158 0 : }
1159 :
1160 0 : void PerModuleThread(size_t i)
1161 : {
1162 : //bool data_processing=true;
1163 0 : size_t nModules = fRunRun.size();
1164 :
1165 0 : TAMultithreadHelper* mt = fRunInfo->fMtInfo;
1166 :
1167 0 : assert(nModules == mt->fMtFlowQueueMutex.size());
1168 0 : assert(nModules == mt->fMtFlowQueue.size());
1169 0 : assert(nModules == mt->fMtFlagQueue.size());
1170 :
1171 0 : { //Lock scope
1172 0 : std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
1173 0 : mt->fMtThreadIsRunning[i] = true;
1174 0 : }
1175 :
1176 0 : while (!mt->fMtShutdownRequested) {
1177 0 : TAFlowEvent* flow = NULL;
1178 0 : TAFlags* flag = NULL;
1179 :
1180 0 : { //Lock scope
1181 0 : std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
1182 0 : if (!mt->fMtFlowQueue[i].empty()) {
1183 0 : flow=mt->fMtFlowQueue[i].front();
1184 0 : flag=mt->fMtFlagQueue[i].front();
1185 0 : mt->fMtFlowQueue[i].pop_front();
1186 0 : mt->fMtFlagQueue[i].pop_front();
1187 : //printf("thread %zu has %zu queued events\n", i, mt->fMtFlowQueue[i].size());
1188 : }
1189 :
1190 0 : if (flow == NULL)
1191 0 : mt->fMtThreadIsBusy[i] = false; // we will sleep
1192 : else
1193 0 : mt->fMtThreadIsBusy[i] = true; // we will analyze an event
1194 :
1195 : // implicit unlock of mutex
1196 0 : }
1197 :
1198 0 : if (flow == NULL) { // wait until queue not empty
1199 : //if (i==0)
1200 : //printf("S%zu\n", i);
1201 0 : usleep(mt->fMtQueueEmptyUSleepTime);
1202 0 : continue;
1203 : }
1204 :
1205 0 : TAClock start_time = TAClockNow();
1206 :
1207 0 : flow = fRunRun[i]->AnalyzeFlowEvent(fRunInfo, flag, flow);
1208 :
1209 0 : if (fProfiler)
1210 0 : fProfiler->LogAnalyzeFlowEvent(flag, flow, i, start_time);
1211 :
1212 0 : if ((*flag) & TAFlag_QUIT) { // shut down the analyzer
1213 0 : delete flow;
1214 0 : delete flag;
1215 0 : flow = NULL;
1216 0 : flag = NULL;
1217 0 : mt->fMtQuitRequested = true;
1218 0 : mt->fMtShutdownRequested = true;
1219 0 : break; // stop the thread
1220 : }
1221 :
1222 0 : if ((*flag) & TAFlag_SKIP) { // stop processing this event
1223 0 : delete flow;
1224 0 : delete flag;
1225 0 : flow = NULL;
1226 0 : flag = NULL;
1227 0 : continue;
1228 : }
1229 :
1230 0 : if (i==nModules-1) { //If I am the last module... free memory, else queue up for next module to process
1231 0 : if (fProfiler) {
1232 0 : fProfiler->LogUserWindows(flag, flow);
1233 0 : fProfiler->LogMTQueueLength(fRunInfo);
1234 : }
1235 0 : delete flow;
1236 0 : delete flag;
1237 0 : flow = NULL;
1238 0 : flag = NULL;
1239 : } else {
1240 0 : MtQueueFlowEvent(mt, i+1, flag, flow);
1241 0 : flow = NULL;
1242 0 : flag = NULL;
1243 : }
1244 : }
1245 :
1246 0 : { //Lock scope
1247 0 : std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
1248 0 : mt->fMtThreadIsRunning[i] = false;
1249 0 : mt->fMtThreadIsBusy[i] = false;
1250 0 : }
1251 0 : }
1252 :
1253 0 : void CreateRun(int run_number, const char* file_name)
1254 : {
1255 0 : assert(fRunInfo == NULL);
1256 0 : assert(fRunRun.size() == 0);
1257 :
1258 0 : fRunInfo = new TARunInfo(run_number, file_name, fArgs);
1259 :
1260 0 : if (fProfilerEnabled)
1261 0 : fProfiler = new Profiler( fProfilerIntervalCheck );
1262 :
1263 0 : size_t nModules = (*gModules).size();
1264 :
1265 0 : for (size_t i=0; i<nModules; i++)
1266 0 : fRunRun.push_back((*gModules)[i]->NewRunObject(fRunInfo));
1267 :
1268 0 : std::stable_sort(fRunRun.begin(), fRunRun.end(), compare_order);
1269 :
1270 0 : if (gTrace) {
1271 0 : printf("Created %zu module run objects:\n", fRunRun.size());
1272 0 : for (size_t i=0; i<fRunRun.size(); i++) {
1273 0 : if (!fRunRun[i]->fModuleName.empty()) {
1274 0 : printf("module %2zu: \"%s\" order %d\n", i, fRunRun[i]->fModuleName.c_str(), fRunRun[i]->fModuleOrder);
1275 : } else {
1276 0 : printf("module %2zu: %p order %d\n", i, fRunRun[i], fRunRun[i]->fModuleOrder);
1277 : }
1278 : }
1279 : }
1280 :
1281 0 : if (fMultithreadEnabled) {
1282 0 : TAMultithreadHelper* mt = new TAMultithreadHelper(nModules);
1283 0 : fRunInfo->fMtInfo = mt;
1284 0 : for (size_t i=0; i<nModules; i++) {
1285 0 : if (gTrace) {
1286 0 : printf("Create fMtFlowQueue thread %zu\n", i);
1287 : }
1288 0 : mt->fMtThreads[i]=new std::thread(&RunHandler::PerModuleThread, this, i);
1289 : }
1290 : }
1291 0 : }
1292 :
1293 0 : void BeginRun()
1294 : {
1295 0 : assert(fRunInfo != NULL);
1296 0 : assert(fRunInfo->fOdb != NULL);
1297 :
1298 0 : if (fProfiler)
1299 0 : fProfiler->Begin(fRunInfo, fRunRun);
1300 :
1301 0 : for (unsigned i=0; i<fRunRun.size(); i++)
1302 0 : fRunRun[i]->BeginRun(fRunInfo);
1303 0 : }
1304 :
1305 0 : void EndRun(TAFlags* flags)
1306 : {
1307 0 : assert(fRunInfo);
1308 :
1309 : // make sure the shutdown sequence matches the description in the README file!
1310 :
1311 : // zeroth. Flush events queued for analysis before calling PreEndRun (insure
1312 : // deterministic behaviour thats the same as in single threaded mode)
1313 :
1314 0 : if (fRunInfo->fMtInfo) {
1315 0 : WaitForAllQueuesEmpty(fRunInfo->fMtInfo);
1316 : }
1317 :
1318 : // first, call PreEndRun() to tell analysis modules that there will be no more
1319 : // MIDAS events, no more calls to Analyze(). PreEndRun() may generate more
1320 : // flow events, they to into the flow queue or into the multithread queue
1321 :
1322 0 : for (unsigned i=0; i<fRunRun.size(); i++)
1323 0 : fRunRun[i]->PreEndRun(fRunInfo);
1324 :
1325 : // if in single threaded mode, analyze all queued flow events - call AnalyzeFlowEvent()
1326 : // this can generate additional flow events that will be queued in the queue.
1327 :
1328 0 : AnalyzeFlowQueue(flags);
1329 :
1330 : // if in multi threaded mode, allow all the queues to drain naturally
1331 : // and shutdown the threads
1332 :
1333 0 : if (fRunInfo->fMtInfo) {
1334 0 : WaitForAllQueuesEmpty(fRunInfo->fMtInfo);
1335 0 : WaitForAllThreadsShutdown(fRunInfo->fMtInfo);
1336 0 : if (fRunInfo->fMtInfo->fMtQuitRequested) {
1337 0 : (*flags) |= TAFlag_QUIT;
1338 : }
1339 : }
1340 :
1341 : // done with processing all flow events
1342 :
1343 0 : fRunInfo->SetAfterPreEndRun();
1344 :
1345 : // all data analysis is complete
1346 :
1347 0 : for (size_t i=0; i<fRunRun.size(); i++) {
1348 : //printf("Calling EndRun() of module %zu \"%s\"\n", i, fRunRun[i]->fModuleName.c_str());
1349 0 : fRunRun[i]->EndRun(fRunInfo);
1350 : }
1351 :
1352 0 : if (fProfiler) {
1353 0 : fProfiler->End(fRunInfo);
1354 0 : fProfiler->Print();
1355 : }
1356 0 : }
1357 :
1358 0 : void NextSubrun()
1359 : {
1360 0 : assert(fRunInfo);
1361 :
1362 0 : for (unsigned i=0; i<fRunRun.size(); i++)
1363 0 : fRunRun[i]->NextSubrun(fRunInfo);
1364 0 : }
1365 :
1366 0 : void DeleteRun()
1367 : {
1368 0 : assert(fRunInfo);
1369 :
1370 0 : for (unsigned i=0; i<fRunRun.size(); i++) {
1371 0 : delete fRunRun[i];
1372 0 : fRunRun[i] = NULL;
1373 : }
1374 :
1375 0 : fRunRun.clear();
1376 0 : assert(fRunRun.size() == 0);
1377 :
1378 0 : if (fProfiler) {
1379 0 : delete fProfiler;
1380 0 : fProfiler = NULL;
1381 : }
1382 :
1383 0 : delete fRunInfo;
1384 0 : fRunInfo = NULL;
1385 0 : }
1386 :
1387 0 : void AnalyzeSpecialEvent(TMEvent* event)
1388 : {
1389 0 : for (unsigned i=0; i<fRunRun.size(); i++)
1390 0 : fRunRun[i]->AnalyzeSpecialEvent(fRunInfo, event);
1391 0 : }
1392 :
1393 0 : TAFlowEvent* AnalyzeFlowEvent(TAFlags* flags, TAFlowEvent* flow)
1394 : {
1395 0 : for (unsigned i=0; i<fRunRun.size(); i++) {
1396 0 : TAClock start_time = TAClockNow();
1397 0 : flow = fRunRun[i]->AnalyzeFlowEvent(fRunInfo, flags, flow);
1398 0 : if (fProfiler)
1399 0 : fProfiler->LogAnalyzeFlowEvent(flags, flow, i, start_time);
1400 0 : if (!flow)
1401 : break;
1402 0 : if ((*flags) & TAFlag_SKIP)
1403 : break;
1404 0 : if ((*flags) & TAFlag_QUIT)
1405 : break;
1406 : }
1407 0 : return flow;
1408 : }
1409 :
1410 0 : void AnalyzeFlowQueue(TAFlags* ana_flags)
1411 : {
1412 0 : while (1) {
1413 0 : if (fRunInfo->fMtInfo)
1414 0 : if (fRunInfo->fMtInfo->fMtQuitRequested) {
1415 0 : *ana_flags |= TAFlag_QUIT;
1416 0 : break;
1417 : }
1418 :
1419 0 : TAFlowEvent* flow = fRunInfo->ReadFlowQueue();
1420 0 : if (!flow)
1421 : break;
1422 :
1423 0 : int flags = 0;
1424 0 : flow = AnalyzeFlowEvent(&flags, flow);
1425 0 : if (flow)
1426 0 : delete flow;
1427 0 : if (flags & TAFlag_QUIT) {
1428 0 : *ana_flags |= TAFlag_QUIT;
1429 0 : break;
1430 : }
1431 0 : }
1432 0 : }
1433 :
1434 0 : void AnalyzeEvent(TMEvent* event, TAFlags* flags, TMWriterInterface *writer)
1435 : {
1436 0 : assert(fRunInfo != NULL);
1437 0 : assert(fRunInfo->fOdb != NULL);
1438 0 : assert(event != NULL);
1439 0 : assert(flags != NULL);
1440 :
1441 0 : if (fRunInfo->fMtInfo)
1442 0 : if (fRunInfo->fMtInfo->fMtQuitRequested) {
1443 0 : *flags |= TAFlag_QUIT;
1444 0 : return;
1445 : }
1446 :
1447 : TAFlowEvent* flow = NULL;
1448 :
1449 0 : for (unsigned i=0; i<fRunRun.size(); i++) {
1450 0 : TAClock start_time = TAClockNow();
1451 0 : flow = fRunRun[i]->Analyze(fRunInfo, event, flags, flow);
1452 0 : if (fProfiler)
1453 0 : fProfiler->LogAnalyzeEvent(flags, flow, i, start_time);
1454 0 : if (*flags & TAFlag_SKIP)
1455 : break;
1456 0 : if (*flags & TAFlag_QUIT)
1457 : break;
1458 : }
1459 :
1460 0 : if (flow) {
1461 0 : if ((*flags & TAFlag_SKIP)||(*flags & TAFlag_QUIT)) {
1462 : // skip further processing of this event
1463 : } else {
1464 0 : if (fRunInfo->fMtInfo) {
1465 0 : MtQueueFlowEvent(fRunInfo->fMtInfo, 0, NULL, flow);
1466 0 : flow = NULL; // ownership passed to the multithread event queue
1467 : } else {
1468 0 : flow = AnalyzeFlowEvent(flags, flow);
1469 : }
1470 : }
1471 : }
1472 :
1473 0 : if (fProfiler && !fRunInfo->fMtInfo) {
1474 0 : fProfiler->LogUserWindows(flags, flow);
1475 : }
1476 :
1477 0 : if (*flags & TAFlag_WRITE)
1478 0 : if (writer)
1479 0 : TMWriteEvent(writer, event);
1480 :
1481 0 : if (flow)
1482 0 : delete flow;
1483 :
1484 0 : if (*flags & TAFlag_QUIT)
1485 : return;
1486 :
1487 0 : if (fRunInfo->fMtInfo)
1488 0 : if (fRunInfo->fMtInfo->fMtQuitRequested) {
1489 0 : *flags |= TAFlag_QUIT;
1490 0 : return;
1491 : }
1492 :
1493 0 : AnalyzeFlowQueue(flags);
1494 : }
1495 : };
1496 :
1497 0 : TAFlowEvent* TARunInfo::ReadFlowQueue()
1498 : {
1499 0 : if (fFlowQueue.empty())
1500 : return NULL;
1501 :
1502 0 : TAFlowEvent* flow = fFlowQueue.front();
1503 0 : fFlowQueue.pop_front();
1504 0 : return flow;
1505 : }
1506 :
1507 0 : void TARunInfo::AddToFlowQueue(TAFlowEvent* flow)
1508 : {
1509 0 : if (fAfterPreEndRun) {
1510 0 : fprintf(stderr, "Error: Calling AddToFlowQueue() after PreEndRun() is not permitted.\n");
1511 0 : delete flow;
1512 0 : return;
1513 : }
1514 :
1515 0 : if (fMtInfo) {
1516 : // call MtQueueFlowEvent with wait=false to avoid deadlock
1517 0 : MtQueueFlowEvent(fMtInfo, 0, NULL, flow);
1518 : } else {
1519 0 : fFlowQueue.push_back(flow);
1520 : }
1521 : }
1522 :
1523 : #ifdef HAVE_MIDAS
1524 :
1525 : #ifdef HAVE_TMFE
1526 :
1527 : #ifdef HAVE_ROOT
1528 : #include "TSystem.h"
1529 : #endif
1530 :
1531 : #include "tmfe.h"
1532 :
// Pending run transition requests. They are set by the RpcHandler callbacks
// (HandleBeginRun, HandleEndRun, HandleStartAbortRun) and consumed and cleared
// by the main loop of ProcessMidasOnlineTmfe().
// NOTE(review): the callbacks presumably run on the thread started by
// TMFE::StartRpcThread(); plain bool gives no synchronization - confirm
// whether std::atomic<bool> is needed.
1533 : static bool gRunStartRequested = false;
1534 : static bool gRunStopRequested = false;
1535 :
1536 0 : class RpcHandler: public TMFeRpcHandlerInterface
1537 : {
1538 0 : TMFeResult HandleBeginRun(int run_number)
1539 : {
1540 0 : printf("RpcHandler::HandleBeginRun(%d)\n", run_number);
1541 0 : gRunStartRequested = true;
1542 0 : gRunStopRequested = false;
1543 0 : return TMFeOk();
1544 : }
1545 :
1546 0 : TMFeResult HandleEndRun(int run_number)
1547 : {
1548 0 : printf("RpcHandler::HandleEndRun(%d)\n", run_number);
1549 0 : gRunStartRequested = false;
1550 0 : gRunStopRequested = true;
1551 0 : return TMFeOk();
1552 : }
1553 :
1554 0 : TMFeResult HandleStartAbortRun(int run_number)
1555 : {
1556 0 : printf("RpcHandler::HandleStartAbortRun(%d)\n", run_number);
1557 : // run did not really start, pretend it started and immediately ended
1558 0 : gRunStartRequested = false;
1559 0 : gRunStopRequested = true;
1560 0 : return TMFeOk();
1561 : }
1562 :
1563 0 : TMFeResult HandleRpc(const char* cmd, const char* args, std::string& result)
1564 : {
1565 0 : return TMFeOk();
1566 : }
1567 : };
1568 :
1569 0 : TMFeResult ReceiveEvent(TMEventBuffer* b, TMEvent *e, int timeout_msec = 0)
1570 : {
1571 0 : assert(b != NULL);
1572 0 : assert(e != NULL);
1573 :
1574 0 : e->Reset();
1575 :
1576 0 : TMFeResult r = b->ReceiveEvent(&e->data, timeout_msec);
1577 :
1578 0 : if (r.error_flag)
1579 0 : return r;
1580 :
1581 0 : if (e->data.size() == 0)
1582 0 : return TMFeOk();
1583 :
1584 0 : e->ParseEvent();
1585 :
1586 0 : assert(e->data.size() == e->event_header_size + e->data_size);
1587 :
1588 0 : return TMFeOk();
1589 0 : }
1590 :
// Online analysis using the TMFE interface.
// Connects to the MIDAS experiment, opens event buffer "bufname", requests
// events matching event_id / trigger_mask / sampling_type_string, then loops
// until mfe->fShutdownRequested: it acts on run start/stop requests posted by
// RpcHandler, receives events and analyzes them while a run is active, and
// services ROOT / HTTP requests when built with them.
// num_analyze > 0 limits the number of analyzed events; when it reaches zero
// shutdown is requested.
// Returns -1 when the connection or the buffer setup fails, 0 otherwise
// (including when the loop ends because of an event read error).
// NOTE(review): the "return -1" paths after the TMEventBuffer is created
// neither delete "b" nor call mfe->Disconnect() - confirm whether cleanup is
// wanted there.
1591 0 : static int ProcessMidasOnlineTmfe(const std::vector<std::string>& args, const char* progname, const char* hostname, const char* exptname, const char* bufname, int event_id, int trigger_mask, const char* sampling_type_string, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
1592 : {
1593 0 : TMFE *mfe = TMFE::Instance();
1594 :
1595 0 : TMFeResult r = mfe->Connect(progname, hostname, exptname);
1596 :
1597 0 : if (r.error_flag) {
1598 0 : fprintf(stderr, "Cannot connect to MIDAS: %s\n", r.error_message.c_str());
1599 : return -1;
1600 : }
1601 :
1602 : //MVOdb* odb = mfe->fOdbRoot;
1603 :
1604 0 : TMEventBuffer *b = new TMEventBuffer(mfe);
1605 :
1606 : /* open event buffer */
1607 :
1608 0 : r = b->OpenBuffer(bufname);
1609 :
1610 0 : if (r.error_flag) {
1611 0 : fprintf(stderr, "Cannot open event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1612 : return -1;
1613 : }
1614 :
1615 : /* request read cache */
// the read cache is switched off for the "GET_RECENT" sampling type
// (presumably so that recent events are not held back in the cache - confirm)
1616 0 : size_t cache_size = 100000;
1617 0 : if(!strcmp(sampling_type_string,"GET_RECENT"))
1618 0 : cache_size=0;
1619 0 : r = b->SetCacheSize(cache_size, 0);
1620 :
1621 0 : if (r.error_flag) {
1622 0 : fprintf(stderr, "Cannot set cache size on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1623 : return -1;
1624 : }
1625 :
1626 : /* request events */
1627 :
1628 0 : r = b->AddRequest(event_id, trigger_mask, sampling_type_string);
1629 :
1630 0 : if (r.error_flag) {
1631 0 : fprintf(stderr, "Cannot add event request on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1632 : return -1;
1633 : }
1634 :
// NOTE(review): "h" is handed to mfe and never deleted in this function -
// confirm who owns it.
1635 0 : RpcHandler* h = new RpcHandler();
1636 :
1637 0 : mfe->AddRpcHandler(h);
1638 :
1639 0 : mfe->DeregisterTransitionPause();
1640 0 : mfe->DeregisterTransitionResume();
1641 0 : mfe->SetTransitionSequenceStart(300);
1642 0 : mfe->SetTransitionSequenceStop(700);
1643 :
1644 0 : mfe->StartRpcThread();
1645 :
1646 : /* create the run handler and initialize the analysis modules */
1647 :
1648 0 : RunHandler rh(args, multithread, profiler, queue_interval_check);
1649 :
1650 0 : for (unsigned i=0; i<(*gModules).size(); i++)
1651 0 : (*gModules)[i]->Init(args);
1652 :
// a run is already in progress: start analyzing it right away
1653 0 : if (mfe->fStateRunning) {
1654 0 : rh.CreateRun(mfe->fRunNumber, NULL);
1655 0 : rh.fRunInfo->fOdb = mfe->fOdbRoot;
1656 0 : rh.BeginRun();
1657 : }
1658 :
1659 : #ifdef HAVE_ROOT
// ROOT file opened for JSROOT while no run is active
1660 : TFile* no_run_file = NULL;
1661 : #endif
1662 :
1663 0 : TMEvent e;
1664 :
1665 0 : while (!mfe->fShutdownRequested) {
// stays true only when this iteration neither analyzed an event nor served
// an HTTP request; selects the longer Yield() at the bottom of the loop
1666 0 : bool do_sleep = true;
1667 :
1668 0 : if (gRunStartRequested) {
1669 0 : gRunStartRequested = false;
1670 :
// end a run that is still active before starting the new one;
// the flags returned by EndRun() are not inspected here
1671 0 : if (rh.fRunInfo) {
1672 0 : TAFlags flags = 0;
1673 0 : rh.EndRun(&flags);
// fOdb points at mfe->fOdbRoot, which this run does not own: detach it
// so that deleting the run does not delete it
1674 0 : rh.fRunInfo->fOdb = NULL;
1675 0 : rh.DeleteRun();
1676 : }
1677 :
1678 : #ifdef HAVE_ROOT
1679 : if (no_run_file) {
1680 : if (gTrace) {
1681 : printf("JSROOT close file\n");
1682 : }
1683 :
1684 : no_run_file->Close();
1685 : delete no_run_file;
1686 : no_run_file = NULL;
1687 : }
1688 : #endif
1689 :
1690 0 : rh.CreateRun(mfe->fRunNumber, NULL);
1691 0 : rh.fRunInfo->fOdb = mfe->fOdbRoot;
1692 0 : rh.BeginRun();
1693 :
1694 0 : continue;
1695 0 : }
1696 :
1697 0 : if (gRunStopRequested) {
1698 0 : gRunStopRequested = false;
1699 :
1700 0 : if (rh.fRunInfo) {
1701 0 : TAFlags flags = 0;
1702 0 : rh.EndRun(&flags);
1703 0 : rh.fRunInfo->fOdb = NULL;
1704 0 : rh.DeleteRun();
1705 : }
1706 :
1707 0 : continue;
1708 0 : }
1709 :
1710 : //r = buf.ReceiveEvent(&e, BM_NO_WAIT);
1711 : //r = buf.ReceiveEvent(&e, BM_WAIT);
1712 : //r = buf.ReceiveEvent(&e, 8000);
1713 : //r = buf.ReceiveEvent(&e, 5000);
1714 0 : r = ReceiveEvent(b, &e, 100);
1715 :
1716 0 : if (r.error_flag) {
1717 0 : fprintf(stderr, "Cannot read event on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1718 : break;
1719 : }
1720 :
1721 : //e.PrintHeader();
1722 : //::sleep(1);
1723 :
// events received while no run is active are not analyzed
1724 0 : if ((e.data_size > 0) && (rh.fRunInfo != NULL)) {
1725 :
1726 : //e.PrintHeader();
1727 : //e.PrintBanks(2);
1728 :
1729 0 : TAFlags flags = 0;
1730 :
1731 0 : rh.AnalyzeEvent(&e, &flags, writer);
1732 :
1733 0 : if (flags & TAFlag_QUIT) {
1734 0 : mfe->fShutdownRequested = true;
1735 : }
1736 :
1737 0 : if (num_analyze > 0) {
1738 0 : num_analyze--;
1739 0 : if (num_analyze == 0) {
1740 0 : mfe->fShutdownRequested = true;
1741 : }
1742 : }
1743 :
1744 0 : do_sleep = false;
1745 : }
1746 :
1747 : #ifdef HAVE_ROOT
// while no run is active, open the most recently recorded output ROOT file
// read-only and pass it to jsroot_ReadFile()
1748 : if (!rh.fRunInfo) { // no run
1749 : if (!no_run_file) { // no ROOT file
1750 : if (!TARootHelper::fgOutputRootFiles.empty()) {
1751 : std::string filename = TARootHelper::fgOutputRootFiles.back();
1752 : TARootHelper::fgOutputRootFiles.clear();
1753 :
1754 : if (gTrace) {
1755 : printf("JSROOT open file \"%s\"\n", filename.c_str());
1756 : }
1757 :
1758 : no_run_file = new TFile(filename.c_str(), "READ");
1759 :
1760 : if (!no_run_file->IsOpen()) {
1761 : fprintf(stderr, "Error: cannot open ROOT file \"%s\"\n", filename.c_str());
1762 : } else {
1763 : jsroot_ReadFile(no_run_file);
1764 : }
1765 : }
1766 : }
1767 : }
1768 : #endif
1769 :
1770 : #ifdef HAVE_THTTP_SERVER
1771 : if (TARootHelper::fgHttpServer) {
1772 : int nreq = TARootHelper::fgHttpServer->ProcessRequests();
1773 : if (nreq > 0) {
1774 : do_sleep = false;
1775 : //printf("ProcessRequests() returned %d\n", nreq);
1776 : }
1777 : }
1778 : #endif
1779 : #ifdef HAVE_ROOT
1780 : if (TARootHelper::fgApp) {
1781 : gSystem->DispatchOneEvent(kTRUE);
1782 : }
1783 : #endif
1784 :
// multithreaded mode: keep yielding for as long as MtQueueWait() returns true
1785 0 : if (rh.fRunInfo && rh.fRunInfo->fMtInfo) {
1786 0 : while (MtQueueWait(rh.fRunInfo->fMtInfo)) {
1787 0 : mfe->Yield(0.001);
1788 : #ifdef HAVE_ROOT
1789 : if (TARootHelper::fgApp || TARootHelper::fgHttpServer) {
1790 : gSystem->DispatchOneEvent(kTRUE);
1791 : }
1792 : #endif
1793 : }
1794 : }
1795 :
1796 : //printf("do_sleep %d\n", do_sleep);
1797 :
1798 0 : if (do_sleep) {
1799 0 : mfe->Yield(0.010);
1800 : } else {
1801 0 : mfe->Yield(0);
1802 : }
1803 : }
1804 :
// shutdown: end the run that is still active, if any
1805 0 : if (rh.fRunInfo) {
1806 0 : TAFlags flags = 0;
1807 0 : rh.EndRun(&flags);
1808 0 : rh.fRunInfo->fOdb = NULL;
1809 0 : rh.DeleteRun();
1810 : }
1811 :
1812 : #ifdef HAVE_ROOT
1813 : if (no_run_file) {
1814 : if (gTrace) {
1815 : printf("JSROOT close file\n");
1816 : }
1817 :
1818 : no_run_file->Close();
1819 : delete no_run_file;
1820 : no_run_file = NULL;
1821 : }
1822 : #endif
1823 :
1824 0 : for (unsigned i=0; i<(*gModules).size(); i++)
1825 0 : (*gModules)[i]->Finish();
1826 :
1827 : /* close event buffer */
1828 0 : r = b->CloseBuffer();
1829 :
1830 0 : delete b;
1831 0 : b = NULL;
1832 :
// a failure to close the buffer is reported but does not change the return value
1833 0 : if (r.error_flag) {
1834 0 : fprintf(stderr,"Cannot close event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1835 : //return -1;
1836 : }
1837 :
1838 : /* disconnect from experiment */
1839 0 : mfe->Disconnect();
1840 :
1841 0 : return 0;
1842 0 : }
1843 :
1844 : #else
1845 :
1846 : #include "TMidasOnline.h"
1847 :
1848 : #ifdef HAVE_ROOT
1849 : #include "TSystem.h"
1850 : #endif
1851 :
1852 : class OnlineHandler: public TMHandlerInterface
1853 : {
1854 : public:
1855 : RunHandler fRun;
1856 : int fNumAnalyze = 0;
1857 : TMWriterInterface* fWriter = NULL;
1858 : bool fQuit = false;
1859 : MVOdb* fOdb = NULL;
1860 :
1861 : OnlineHandler(int num_analyze, TMWriterInterface* writer, MVOdb* odb, const std::vector<std::string>& args, bool multithread, bool profiler, int queue_interval_check) // ctor
1862 : : fRun(args, multithread, profiler, queue_interval_check)
1863 : {
1864 : fNumAnalyze = num_analyze;
1865 : fWriter = writer;
1866 : fOdb = odb;
1867 : }
1868 :
1869 : ~OnlineHandler() // dtor
1870 : {
1871 : fWriter = NULL;
1872 : fOdb = NULL;
1873 : }
1874 :
1875 : void StartRun(int run_number)
1876 : {
1877 : fRun.CreateRun(run_number, NULL);
1878 : fRun.fRunInfo->fOdb = fOdb;
1879 : fRun.BeginRun();
1880 : }
1881 :
1882 : void Transition(int transition, int run_number, int transition_time)
1883 : {
1884 : //printf("OnlineHandler::Transtion: transition %d, run %d, time %d\n", transition, run_number, transition_time);
1885 :
1886 : if (transition == TR_START) {
1887 : if (fRun.fRunInfo) {
1888 : TAFlags flags = 0;
1889 : fRun.EndRun(&flags);
1890 : if (flags & TAFlag_QUIT)
1891 : fQuit = true;
1892 : fRun.fRunInfo->fOdb = NULL;
1893 : fRun.DeleteRun();
1894 : }
1895 : assert(fRun.fRunInfo == NULL);
1896 :
1897 : StartRun(run_number);
1898 : printf("Begin run: %d\n", run_number);
1899 : } else if (transition == TR_STOP) {
1900 : TAFlags flags = 0;
1901 : fRun.EndRun(&flags);
1902 : if (flags & TAFlag_QUIT)
1903 : fQuit = true;
1904 : fRun.fRunInfo->fOdb = NULL;
1905 : fRun.DeleteRun();
1906 : printf("End of run %d\n", run_number);
1907 : }
1908 : }
1909 :
1910 : void Event(const void* data, int data_size)
1911 : {
1912 : //printf("OnlineHandler::Event: ptr %p, size %d\n", data, data_size);
1913 :
1914 : if (!fRun.fRunInfo) {
1915 : StartRun(0); // start fake run for events outside of a run
1916 : }
1917 :
1918 : TMEvent* event = new TMEvent(data, data_size);
1919 :
1920 : TAFlags flags = 0;
1921 :
1922 : fRun.AnalyzeEvent(event, &flags, fWriter);
1923 :
1924 : if (flags & TAFlag_QUIT)
1925 : fQuit = true;
1926 :
1927 : if (fNumAnalyze > 0) {
1928 : fNumAnalyze--;
1929 : if (fNumAnalyze == 0)
1930 : fQuit = true;
1931 : }
1932 :
1933 : if (event) {
1934 : delete event;
1935 : event = NULL;
1936 : }
1937 : }
1938 : };
1939 :
1940 : static int ProcessMidasOnlineOld(const std::vector<std::string>& args, const char* hostname, const char* exptname, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
1941 : {
1942 : TMidasOnline *midas = TMidasOnline::instance();
1943 :
1944 : int err = midas->connect(hostname, exptname, "rootana");
1945 : if (err != 0) {
1946 : fprintf(stderr,"Cannot connect to MIDAS, error %d\n", err);
1947 : return -1;
1948 : }
1949 :
1950 : MVOdb* odb = MakeMidasOdb(midas->fDB);
1951 :
1952 : OnlineHandler* h = new OnlineHandler(num_analyze, writer, odb, args, multithread, profiler, queue_interval_check);
1953 :
1954 : midas->RegisterHandler(h);
1955 : midas->registerTransitions();
1956 :
1957 : /* reqister event requests */
1958 :
1959 : midas->eventRequest("SYSTEM",-1,-1,(1<<1));
1960 :
1961 : int run_number = 0; // midas->odbReadInt("/runinfo/Run number");
1962 : int run_state = 0; // midas->odbReadInt("/runinfo/State");
1963 :
1964 : odb->RI("runinfo/run number", &run_number);
1965 : odb->RI("runinfo/state", &run_state);
1966 :
1967 : for (unsigned i=0; i<(*gModules).size(); i++)
1968 : (*gModules)[i]->Init(args);
1969 :
1970 : if ((run_state == STATE_RUNNING)||(run_state == STATE_PAUSED)) {
1971 : h->StartRun(run_number);
1972 : }
1973 :
1974 : while (!h->fQuit) {
1975 : #ifdef HAVE_THTTP_SERVER
1976 : if (TARootHelper::fgHttpServer) {
1977 : TARootHelper::fgHttpServer->ProcessRequests();
1978 : }
1979 : #endif
1980 : #ifdef HAVE_ROOT
1981 : if (TARootHelper::fgApp) {
1982 : gSystem->DispatchOneEvent(kTRUE);
1983 : }
1984 : #endif
1985 : if (!TMidasOnline::instance()->poll(10))
1986 : break;
1987 : }
1988 :
1989 : if (h->fRun.fRunInfo) {
1990 : TAFlags flags = 0;
1991 : h->fRun.EndRun(&flags);
1992 : h->fRun.fRunInfo->fOdb = NULL;
1993 : h->fRun.DeleteRun();
1994 : }
1995 :
1996 : for (unsigned i=0; i<(*gModules).size(); i++)
1997 : (*gModules)[i]->Finish();
1998 :
1999 : delete h; h = NULL;
2000 : delete odb; odb = NULL;
2001 :
2002 : /* disconnect from experiment */
2003 : midas->disconnect();
2004 :
2005 : return 0;
2006 : }
2007 :
2008 : #endif
2009 : #endif
2010 :
// Definitions of the TARunInfo static members used for offline file
// processing: ProcessMidasFiles() copies its list of input files into
// fgFileList and uses fgCurrentFileIndex as the index of the file currently
// being read.
2011 : std::vector<std::string> TARunInfo::fgFileList;
2012 : int TARunInfo::fgCurrentFileIndex = 0;
2013 :
2014 0 : static int ProcessMidasFiles(const std::vector<std::string>& files, const std::vector<std::string>& args, int num_skip, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
2015 : {
2016 0 : int number_of_missing_files = 0;
2017 :
2018 0 : TARunInfo::fgFileList.clear();
2019 :
2020 0 : for (unsigned i=0; i<files.size(); i++)
2021 0 : TARunInfo::fgFileList.push_back(files[i]);
2022 :
2023 0 : for (unsigned i=0; i<(*gModules).size(); i++)
2024 0 : (*gModules)[i]->Init(args);
2025 :
2026 0 : RunHandler run(args, multithread, profiler, queue_interval_check);
2027 :
2028 0 : bool done = false;
2029 :
2030 0 : for (TARunInfo::fgCurrentFileIndex = 0;
2031 0 : TARunInfo::fgCurrentFileIndex < (int)TARunInfo::fgFileList.size();
2032 : TARunInfo::fgCurrentFileIndex++) {
2033 :
2034 0 : std::string filename = TARunInfo::fgFileList[TARunInfo::fgCurrentFileIndex];
2035 :
2036 0 : TMReaderInterface *reader = TMNewReader(filename.c_str());
2037 :
2038 0 : if (reader->fError) {
2039 0 : printf("Could not open \"%s\", error: %s\n", filename.c_str(), reader->fErrorString.c_str());
2040 0 : delete reader;
2041 0 : number_of_missing_files++;
2042 0 : continue;
2043 : }
2044 :
2045 0 : while (1) {
2046 0 : TMEvent* event = TMReadEvent(reader);
2047 :
2048 0 : if (!event) // EOF
2049 : break;
2050 :
2051 0 : if (event->error) {
2052 0 : delete event;
2053 : break;
2054 : }
2055 :
2056 0 : if (event->event_id == 0x8000) // begin of run event
2057 : {
2058 0 : int runno = event->serial_number;
2059 :
2060 0 : if (run.fRunInfo) {
2061 0 : if (run.fRunInfo->fRunNo == runno) {
2062 : // next subrun file, nothing to do
2063 0 : run.fRunInfo->fFileName = filename;
2064 0 : run.NextSubrun();
2065 : } else {
2066 : // file with a different run number
2067 0 : TAFlags flags = 0;
2068 0 : run.EndRun(&flags);
2069 0 : if (flags & TAFlag_QUIT) {
2070 0 : done = true;
2071 : }
2072 0 : run.DeleteRun();
2073 : }
2074 : }
2075 :
2076 0 : if (!run.fRunInfo) {
2077 0 : run.CreateRun(runno, filename.c_str());
2078 0 : run.fRunInfo->fOdb = MakeFileDumpOdb(event->GetEventData(), event->data_size);
2079 : // Also set the source for midas::odb in case people prefer that interface
2080 0 : midas::odb::set_odb_source(midas::odb::STRING, std::string(event->GetEventData(), event->data_size));
2081 0 : run.BeginRun();
2082 : }
2083 :
2084 0 : assert(run.fRunInfo);
2085 :
2086 0 : run.AnalyzeSpecialEvent(event);
2087 :
2088 0 : if (writer)
2089 0 : TMWriteEvent(writer, event);
2090 : }
2091 0 : else if (event->event_id == 0x8001) // end of run event
2092 : {
2093 : //int runno = event->serial_number;
2094 0 : run.AnalyzeSpecialEvent(event);
2095 0 : if (writer)
2096 0 : TMWriteEvent(writer, event);
2097 :
2098 0 : if (run.fRunInfo->fOdb) {
2099 0 : delete run.fRunInfo->fOdb;
2100 0 : run.fRunInfo->fOdb = NULL;
2101 : }
2102 :
2103 0 : run.fRunInfo->fOdb = MakeFileDumpOdb(event->GetEventData(), event->data_size);
2104 : // Also set the source for midas::odb in case people prefer that interface
2105 0 : midas::odb::set_odb_source(midas::odb::STRING, std::string(event->GetEventData(), event->data_size));
2106 : }
2107 0 : else if (event->event_id == 0x8002) // message event
2108 : {
2109 0 : run.AnalyzeSpecialEvent(event);
2110 0 : if (writer)
2111 0 : TMWriteEvent(writer, event);
2112 : }
2113 : else
2114 : {
2115 0 : if (!run.fRunInfo) {
2116 : // create a fake begin of run
2117 0 : run.CreateRun(0, filename.c_str());
2118 0 : run.fRunInfo->fOdb = MakeNullOdb();
2119 0 : run.BeginRun();
2120 : }
2121 :
2122 0 : if (num_skip > 0) {
2123 0 : num_skip--;
2124 : } else {
2125 0 : TAFlags flags = 0;
2126 :
2127 0 : run.AnalyzeEvent(event, &flags, writer);
2128 :
2129 0 : if (flags & TAFlag_QUIT)
2130 0 : done = true;
2131 :
2132 0 : if (num_analyze > 0) {
2133 0 : num_analyze--;
2134 0 : if (num_analyze == 0)
2135 0 : done = true;
2136 : }
2137 : }
2138 : }
2139 :
2140 0 : delete event;
2141 :
2142 0 : if (done)
2143 : break;
2144 :
2145 : #ifdef HAVE_ROOT
2146 : if (TARootHelper::fgApp || TARootHelper::fgHttpServer) {
2147 : gSystem->DispatchOneEvent(kTRUE);
2148 : }
2149 : #endif
2150 :
2151 0 : if (run.fRunInfo && run.fRunInfo->fMtInfo) {
2152 0 : while (MtQueueWait(run.fRunInfo->fMtInfo)) {
2153 0 : usleep(run.fRunInfo->fMtInfo->fMtQueueFullUSleepTime);
2154 : #ifdef HAVE_ROOT
2155 : if (TARootHelper::fgApp || TARootHelper::fgHttpServer) {
2156 : gSystem->DispatchOneEvent(kTRUE);
2157 : }
2158 : #endif
2159 : }
2160 : }
2161 : }
2162 :
2163 0 : reader->Close();
2164 0 : delete reader;
2165 :
2166 0 : if (done)
2167 : break;
2168 0 : }
2169 :
2170 : #ifdef HAVE_THTTP_SERVER
2171 : if (0 && TARootHelper::fgHttpServer) {
2172 : while (1) {
2173 : gSystem->DispatchOneEvent(kTRUE);
2174 : //sleep(1);
2175 : }
2176 : }
2177 : #endif
2178 :
2179 0 : if (run.fRunInfo) {
2180 0 : TAFlags flags = 0;
2181 0 : run.EndRun(&flags);
2182 0 : if (flags & TAFlag_QUIT)
2183 0 : done = true;
2184 0 : run.DeleteRun();
2185 : }
2186 :
2187 0 : for (unsigned i=0; i<(*gModules).size(); i++)
2188 0 : (*gModules)[i]->Finish();
2189 :
2190 0 : if (number_of_missing_files) {
2191 0 : printf("%d midas files were not openable\n", number_of_missing_files);
2192 : return number_of_missing_files;
2193 : }
2194 :
2195 : return 0;
2196 0 : }
2197 :
2198 0 : static int ProcessDemoMode(const std::vector<std::string>& args, int num_skip, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
2199 : {
2200 0 : for (unsigned i=0; i<(*gModules).size(); i++)
2201 0 : (*gModules)[i]->Init(args);
2202 :
2203 0 : RunHandler run(args, multithread, profiler, queue_interval_check);
2204 :
2205 : bool done = false;
2206 :
2207 : int runno = 1;
2208 :
2209 0 : for (unsigned i=0; true; i++) {
2210 0 : char s[256];
2211 0 : snprintf(s, sizeof(s), "%03d", i);
2212 0 : std::string filename = std::string("demo_subrun_") + s;
2213 :
2214 0 : if (!run.fRunInfo) {
2215 0 : run.CreateRun(runno, filename.c_str());
2216 0 : run.fRunInfo->fOdb = MakeNullOdb();
2217 0 : run.BeginRun();
2218 : }
2219 :
2220 : // we do not generate a fake begin of run event...
2221 : //run.AnalyzeSpecialEvent(event);
2222 :
2223 : // only switch subruns after the first subrun file
2224 0 : if (i>0) {
2225 0 : run.fRunInfo->fFileName = filename;
2226 0 : run.NextSubrun();
2227 : }
2228 :
2229 0 : TMEvent* event = new TMEvent();
2230 :
2231 0 : for (unsigned j=0; j<100; j++) {
2232 0 : event->Init(0x0001, 0xFFFF, j+1, 0, 0);
2233 0 : uint32_t test_data[] = { 0x11112222, 0x33334444, 0x55556666, 0x77778888 };
2234 0 : event->AddBank("TEST", TID_DWORD, (const char*)test_data, sizeof(test_data));
2235 :
2236 0 : if (num_skip > 0) {
2237 0 : num_skip--;
2238 : } else {
2239 0 : TAFlags flags = 0;
2240 :
2241 0 : run.AnalyzeEvent(event, &flags, writer);
2242 :
2243 0 : if (flags & TAFlag_QUIT)
2244 0 : done = true;
2245 :
2246 0 : if (num_analyze > 0) {
2247 0 : num_analyze--;
2248 0 : if (num_analyze == 0)
2249 0 : done = true;
2250 : }
2251 : }
2252 :
2253 0 : if (done)
2254 : break;
2255 :
2256 : #ifdef HAVE_ROOT
2257 : if (TARootHelper::fgApp) {
2258 : gSystem->DispatchOneEvent(kTRUE);
2259 : }
2260 : #endif
2261 : }
2262 :
2263 0 : delete event;
2264 :
2265 : // we do not generate a fake end of run event...
2266 : //run.AnalyzeSpecialEvent(event);
2267 :
2268 0 : if (done)
2269 : break;
2270 0 : }
2271 :
2272 0 : if (run.fRunInfo) {
2273 0 : TAFlags flags = 0;
2274 0 : run.EndRun(&flags);
2275 0 : run.DeleteRun();
2276 : }
2277 :
2278 0 : for (unsigned i=0; i<(*gModules).size(); i++)
2279 0 : (*gModules)[i]->Finish();
2280 :
2281 0 : return 0;
2282 0 : }
2283 :
// Set by the "-m" command line option; consulted by the (currently disabled) ShowMem().
static bool gEnableShowMem = false;

#if 0
// Read the first integer from /proc/self/statm and optionally print it.
// Returns the value read, or 0 when memory reporting is disabled or the
// file cannot be opened.
static int ShowMem(const char* label)
{
   int mem_size = 0;

   if (gEnableShowMem) {
      FILE* statm = fopen("/proc/self/statm","r");
      if (statm) {
         fscanf(statm,"%d",&mem_size);
         fclose(statm);

         if (label)
            printf("memory at %s is %d\n", label, mem_size);
      }
   }

   return mem_size;
}
#endif
2306 :
2307 : class EventDumpModule: public TARunObject
2308 : {
2309 : public:
2310 0 : EventDumpModule(TARunInfo* runinfo)
2311 0 : : TARunObject(runinfo)
2312 : {
2313 0 : if (gTrace)
2314 0 : printf("EventDumpModule::ctor, run %d\n", runinfo->fRunNo);
2315 :
2316 0 : fModuleName = "EventDump";
2317 0 : fModuleOrder = -1;
2318 0 : }
2319 :
2320 0 : ~EventDumpModule()
2321 0 : {
2322 0 : if (gTrace)
2323 0 : printf("EventDumpModule::dtor!\n");
2324 0 : }
2325 :
2326 0 : void BeginRun(TARunInfo* runinfo)
2327 : {
2328 0 : printf("EventDumpModule::BeginRun, run %d\n", runinfo->fRunNo);
2329 0 : }
2330 :
2331 0 : void EndRun(TARunInfo* runinfo)
2332 : {
2333 0 : printf("EventDumpModule::EndRun, run %d\n", runinfo->fRunNo);
2334 0 : }
2335 :
2336 0 : void NextSubrun(TARunInfo* runinfo)
2337 : {
2338 0 : printf("EventDumpModule::NextSubrun, run %d, file %s\n", runinfo->fRunNo, runinfo->fFileName.c_str());
2339 0 : }
2340 :
2341 0 : void PauseRun(TARunInfo* runinfo)
2342 : {
2343 0 : printf("EventDumpModule::PauseRun, run %d\n", runinfo->fRunNo);
2344 0 : }
2345 :
2346 0 : void ResumeRun(TARunInfo* runinfo)
2347 : {
2348 0 : printf("EventDumpModule::ResumeRun, run %d\n", runinfo->fRunNo);
2349 0 : }
2350 :
2351 0 : TAFlowEvent* Analyze(TARunInfo* runinfo, TMEvent* event, TAFlags* flags, TAFlowEvent* flow)
2352 : {
2353 0 : printf("EventDumpModule::Analyze, run %d, ", runinfo->fRunNo);
2354 0 : event->FindAllBanks();
2355 0 : std::string h = event->HeaderToString();
2356 0 : std::string b = event->BankListToString();
2357 0 : printf("%s: %s\n", h.c_str(), b.c_str());
2358 0 : return flow;
2359 0 : }
2360 :
2361 0 : void AnalyzeSpecialEvent(TARunInfo* runinfo, TMEvent* event)
2362 : {
2363 0 : printf("EventDumpModule::AnalyzeSpecialEvent, run %d, event serno %d, id 0x%04x, data size %d\n", runinfo->fRunNo, event->serial_number, (int)event->event_id, event->data_size);
2364 0 : }
2365 : };
2366 :
2367 0 : class EventDumpModuleFactory: public TAFactory
2368 : {
2369 : public:
2370 :
2371 0 : void Init(const std::vector<std::string> &args)
2372 : {
2373 0 : if (gTrace)
2374 0 : printf("EventDumpModuleFactory::Init!\n");
2375 0 : }
2376 :
2377 0 : void Finish()
2378 : {
2379 0 : if (gTrace)
2380 0 : printf("EventDumpModuleFactory::Finish!\n");
2381 0 : }
2382 :
2383 0 : TARunObject* NewRunObject(TARunInfo* runinfo)
2384 : {
2385 0 : if (gTrace)
2386 0 : printf("EventDumpModuleFactory::NewRunObject, run %d, file %s\n", runinfo->fRunNo, runinfo->fFileName.c_str());
2387 0 : return new EventDumpModule(runinfo);
2388 : }
2389 : };
2390 :
2391 : #ifdef HAVE_ROOT
2392 : #include <TGMenu.h>
2393 : #include <TGButton.h>
2394 : #include <TBrowser.h>
2395 :
2396 : #define CTRL_QUIT 1
2397 : #define CTRL_NEXT 2
2398 : #define CTRL_CONTINUE 3
2399 : #define CTRL_PAUSE 4
2400 : #define CTRL_NEXT_FLOW 5
2401 :
2402 : #define CTRL_TBROWSER 11
2403 :
// Small mailbox holding the most recent control code written by the GUI
// (one of the CTRL_* values); 0 is the initial value.
class ValueHolder
{
public:
   int fValue;

   ValueHolder() // ctor
      : fValue(0)
   {
   }
};
2414 :
2415 : class TextButton: public TGTextButton
2416 : {
2417 : public:
2418 : ValueHolder* fHolder;
2419 : int fValue;
2420 :
2421 : TextButton(TGWindow*p, const char* text, ValueHolder* holder, int value) // ctor
2422 : : TGTextButton(p, text)
2423 : {
2424 : fHolder = holder;
2425 : fValue = value;
2426 : }
2427 :
2428 : #if 0
2429 : void Pressed()
2430 : {
2431 : printf("Pressed!\n");
2432 : }
2433 :
2434 : void Released()
2435 : {
2436 : printf("Released!\n");
2437 : }
2438 : #endif
2439 :
2440 : void Clicked()
2441 : {
2442 : //printf("Clicked button %s, value %d!\n", GetString().Data(), fValue);
2443 : if (fHolder)
2444 : fHolder->fValue = fValue;
2445 : //gSystem->ExitLoop();
2446 : }
2447 : };
2448 :
2449 : class MainWindow: public TGMainFrame
2450 : {
2451 : public:
2452 : TGPopupMenu* fMenu;
2453 : TGMenuBar* fMenuBar;
2454 : TGLayoutHints* fMenuBarItemLayout;
2455 :
2456 : TGCompositeFrame* fButtonsFrame;
2457 :
2458 : ValueHolder* fHolder;
2459 :
2460 : TextButton* fNextButton;
2461 : TextButton* fNextFlowButton;
2462 : TextButton* fContinueButton;
2463 : TextButton* fPauseButton;
2464 :
2465 : TextButton* fQuitButton;
2466 :
2467 : public:
2468 : MainWindow(const TGWindow*w, int s1, int s2, ValueHolder* holder) // ctor
2469 : : TGMainFrame(w, s1, s2)
2470 : {
2471 : if (gTrace)
2472 : printf("MainWindow::ctor!\n");
2473 :
2474 : fHolder = holder;
2475 : //SetCleanup(kDeepCleanup);
2476 :
2477 : SetWindowName("ROOT Analyzer Control");
2478 :
2479 : // layout the gui
2480 : fMenu = new TGPopupMenu(gClient->GetRoot());
2481 : fMenu->AddEntry("New TBrowser", CTRL_TBROWSER);
2482 : fMenu->AddEntry("-", 0);
2483 : fMenu->AddEntry("Next", CTRL_NEXT);
2484 : fMenu->AddEntry("NextFlow", CTRL_NEXT_FLOW);
2485 : fMenu->AddEntry("Continue", CTRL_CONTINUE);
2486 : fMenu->AddEntry("Pause", CTRL_PAUSE);
2487 : fMenu->AddEntry("-", 0);
2488 : fMenu->AddEntry("Quit", CTRL_QUIT);
2489 :
2490 : fMenuBarItemLayout = new TGLayoutHints(kLHintsTop|kLHintsLeft, 0, 4, 0, 0);
2491 :
2492 : fMenu->Associate(this);
2493 :
2494 : fMenuBar = new TGMenuBar(this, 1, 1, kRaisedFrame);
2495 : fMenuBar->AddPopup("&Rootana", fMenu, fMenuBarItemLayout);
2496 : fMenuBar->Layout();
2497 :
2498 : AddFrame(fMenuBar, new TGLayoutHints(kLHintsTop|kLHintsLeft|kLHintsExpandX));
2499 :
2500 : fButtonsFrame = new TGVerticalFrame(this);
2501 :
2502 : fNextButton = new TextButton(fButtonsFrame, "Next", holder, CTRL_NEXT);
2503 : fNextFlowButton = new TextButton(fButtonsFrame, "Next Flow Event", holder, CTRL_NEXT_FLOW);
2504 :
2505 : fButtonsFrame->AddFrame(fNextButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2506 : fButtonsFrame->AddFrame(fNextFlowButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2507 :
2508 : TGHorizontalFrame *hframe = new TGHorizontalFrame(fButtonsFrame);
2509 :
2510 : fContinueButton = new TextButton(hframe, " Continue ", holder, CTRL_CONTINUE);
2511 : fPauseButton = new TextButton(hframe, " Pause ", holder, CTRL_PAUSE);
2512 :
2513 : hframe->AddFrame(fContinueButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2514 : hframe->AddFrame(fPauseButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2515 :
2516 : fButtonsFrame->AddFrame(hframe, new TGLayoutHints(kLHintsExpandX));
2517 :
2518 : fQuitButton = new TextButton(fButtonsFrame, "Quit ", holder, CTRL_QUIT);
2519 : fButtonsFrame->AddFrame(fQuitButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2520 :
2521 : AddFrame(fButtonsFrame, new TGLayoutHints(kLHintsExpandX));
2522 :
2523 : MapSubwindows();
2524 : Layout();
2525 : Resize(GetDefaultSize());
2526 : MapWindow();
2527 : }
2528 :
2529 : ~MainWindow() // dtor // Closing the control window closes the whole program
2530 : {
2531 : if (gTrace)
2532 : printf("MainWindow::dtor!\n");
2533 :
2534 : delete fMenu;
2535 : delete fMenuBar;
2536 : delete fMenuBarItemLayout;
2537 : }
2538 :
2539 : void CloseWindow()
2540 : {
2541 : if (gTrace)
2542 : printf("MainWindow::CloseWindow()\n");
2543 :
2544 : if (fHolder)
2545 : fHolder->fValue = CTRL_QUIT;
2546 : //gSystem->ExitLoop();
2547 : }
2548 :
2549 : Bool_t ProcessMessage(Long_t msg, Long_t parm1, Long_t parm2)
2550 : {
2551 : //printf("GUI Message %d %d %d\n",(int)msg,(int)parm1,(int)parm2);
2552 : switch (GET_MSG(msg))
2553 : {
2554 : default:
2555 : break;
2556 : case kC_COMMAND:
2557 : switch (GET_SUBMSG(msg))
2558 : {
2559 : default:
2560 : break;
2561 : case kCM_MENU:
2562 : //printf("parm1 %d\n", (int)parm1);
2563 : switch (parm1)
2564 : {
2565 : case CTRL_TBROWSER:
2566 : new TBrowser();
2567 : break;
2568 : default:
2569 : //printf("Control %d!\n", (int)parm1);
2570 : if (fHolder)
2571 : fHolder->fValue = parm1;
2572 : //gSystem->ExitLoop();
2573 : break;
2574 : }
2575 : break;
2576 : }
2577 : break;
2578 : }
2579 :
2580 : return kTRUE;
2581 : }
2582 : };
2583 : #endif
2584 :
2585 : class InteractiveModule: public TARunObject
2586 : {
2587 : public:
2588 : bool fContinue;
2589 : bool fNextFlow;
2590 : int fSkip;
2591 : #ifdef HAVE_ROOT
2592 : static ValueHolder* fgHolder;
2593 : static MainWindow *fgCtrlWindow;
2594 : #endif
2595 :
2596 0 : InteractiveModule(TARunInfo* runinfo)
2597 0 : : TARunObject(runinfo)
2598 : {
2599 0 : if (gTrace)
2600 0 : printf("InteractiveModule::ctor, run %d\n", runinfo->fRunNo);
2601 0 : fModuleName = "InteractiveModule";
2602 0 : fModuleOrder = 9999;
2603 0 : fContinue = false;
2604 0 : fNextFlow = false;
2605 0 : fSkip = 0;
2606 : #ifdef HAVE_ROOT
2607 : if (!fgHolder)
2608 : fgHolder = new ValueHolder;
2609 : if (!fgCtrlWindow && runinfo->fRoot->fgApp) {
2610 : fgCtrlWindow = new MainWindow(gClient->GetRoot(), 200, 300, fgHolder);
2611 : }
2612 : #endif
2613 0 : }
2614 :
2615 0 : ~InteractiveModule()
2616 0 : {
2617 0 : if (gTrace)
2618 0 : printf("InteractiveModule::dtor!\n");
2619 0 : }
2620 :
2621 0 : void BeginRun(TARunInfo* runinfo)
2622 : {
2623 0 : printf("InteractiveModule::BeginRun, run %d\n", runinfo->fRunNo);
2624 0 : }
2625 :
2626 0 : void EndRun(TARunInfo* runinfo)
2627 : {
2628 0 : printf("InteractiveModule::EndRun, run %d\n", runinfo->fRunNo);
2629 :
2630 : #ifdef HAVE_ROOT
2631 : if (fgCtrlWindow && runinfo->fRoot->fgApp) {
2632 : fgCtrlWindow->fNextButton->SetEnabled(false);
2633 : fgCtrlWindow->fNextFlowButton->SetEnabled(false);
2634 : fgCtrlWindow->fContinueButton->SetEnabled(false);
2635 : fgCtrlWindow->fPauseButton->SetEnabled(false);
2636 : while (1) {
2637 : #ifdef HAVE_THTTP_SERVER
2638 : if (TARootHelper::fgHttpServer) {
2639 : TARootHelper::fgHttpServer->ProcessRequests();
2640 : }
2641 : #endif
2642 : #ifdef HAVE_ROOT
2643 : if (TARootHelper::fgApp) {
2644 : gSystem->DispatchOneEvent(kTRUE);
2645 : }
2646 : #endif
2647 : #ifdef HAVE_MIDAS
2648 : #ifdef HAVE_TMFE
2649 : TMFE* mfe = TMFE::Instance();
2650 : mfe->Yield(0.010);
2651 : if (mfe->fShutdownRequested) {
2652 : return;
2653 : }
2654 : #else
2655 : if (!TMidasOnline::instance()->sleep(10)) {
2656 : // FIXME: indicate that we should exit the analyzer
2657 : return;
2658 : }
2659 : #endif
2660 : #else
2661 : gSystem->Sleep(10);
2662 : #endif
2663 :
2664 : int ctrl = fgHolder->fValue;
2665 : fgHolder->fValue = 0;
2666 :
2667 : switch (ctrl) {
2668 : case CTRL_QUIT:
2669 : return;
2670 : case CTRL_NEXT:
2671 : return;
2672 : case CTRL_CONTINUE:
2673 : return;
2674 : }
2675 : }
2676 : }
2677 : #endif
2678 0 : }
2679 :
2680 0 : void PauseRun(TARunInfo* runinfo)
2681 : {
2682 0 : printf("InteractiveModule::PauseRun, run %d\n", runinfo->fRunNo);
2683 0 : }
2684 :
2685 0 : void ResumeRun(TARunInfo* runinfo)
2686 : {
2687 0 : printf("InteractiveModule::ResumeRun, run %d\n", runinfo->fRunNo);
2688 0 : }
2689 :
2690 0 : void InteractiveLoop(TARunInfo* runinfo, TAFlags* flags)
2691 : {
2692 : #ifdef HAVE_ROOT
2693 : if (fgCtrlWindow && runinfo->fRoot->fgApp) {
2694 : while (1) {
2695 : #ifdef HAVE_THTTP_SERVER
2696 : if (TARootHelper::fgHttpServer) {
2697 : TARootHelper::fgHttpServer->ProcessRequests();
2698 : }
2699 : #endif
2700 : #ifdef HAVE_ROOT
2701 : if (TARootHelper::fgApp) {
2702 : gSystem->DispatchOneEvent(kTRUE);
2703 : }
2704 : #endif
2705 : #ifdef HAVE_MIDAS
2706 : #ifdef HAVE_TMFE
2707 : TMFE* mfe = TMFE::Instance();
2708 : mfe->Yield(0.010);
2709 : if (mfe->fShutdownRequested) {
2710 : *flags |= TAFlag_QUIT;
2711 : return;
2712 : }
2713 : #else
2714 : if (!TMidasOnline::instance()->sleep(10)) {
2715 : *flags |= TAFlag_QUIT;
2716 : return;
2717 : }
2718 : #endif
2719 : #else
2720 : gSystem->Sleep(10);
2721 : #endif
2722 :
2723 : int ctrl = fgHolder->fValue;
2724 : fgHolder->fValue = 0;
2725 :
2726 : switch (ctrl) {
2727 : case CTRL_QUIT:
2728 : *flags |= TAFlag_QUIT;
2729 : return;
2730 : case CTRL_NEXT:
2731 : return;
2732 : case CTRL_NEXT_FLOW:
2733 : fNextFlow = true;
2734 : return;
2735 : case CTRL_CONTINUE:
2736 : fContinue = true;
2737 : return;
2738 : }
2739 : }
2740 : }
2741 : #endif
2742 :
2743 0 : while (1) {
2744 0 : char str[256];
2745 0 : fprintf(stdout, "manalyzer> "); fflush(stdout);
2746 0 : char* s = fgets(str, sizeof(str)-1, stdin);
2747 :
2748 0 : if (s == NULL) {
2749 : // EOF
2750 0 : *flags |= TAFlag_QUIT;
2751 0 : return;
2752 : }
2753 :
2754 0 : printf("command [%s]\n", str);
2755 :
2756 0 : if (str[0] == 'h') { // "help"
2757 0 : printf("Interactive manalyzer commands:\n");
2758 0 : printf(" q - quit\n");
2759 0 : printf(" h - help\n");
2760 0 : printf(" c - continue until next TAFlag_DISPLAY event\n");
2761 0 : printf(" n - next event\n");
2762 0 : printf(" aNNN - analyze N events, i.e. \"a10\"\n");
2763 : } else if (str[0] == 'q') { // "quit"
2764 0 : *flags |= TAFlag_QUIT;
2765 0 : return;
2766 : } else if (str[0] == 'n') { // "next"
2767 : return;
2768 : } else if (str[0] == 'c') { // "continue"
2769 0 : fContinue = true;
2770 0 : return;
2771 : } else if (str[0] == 'a') { // "analyze" N events
2772 0 : int num = atoi(str+1);
2773 0 : printf("Analyzing %d events\n", num);
2774 0 : if (num > 0) {
2775 0 : fSkip = num-1;
2776 : }
2777 0 : return;
2778 : }
2779 0 : }
2780 : }
2781 :
2782 0 : TAFlowEvent* Analyze(TARunInfo* runinfo, TMEvent* event, TAFlags* flags, TAFlowEvent* flow)
2783 : {
2784 0 : printf("InteractiveModule::Analyze, run %d, %s\n", runinfo->fRunNo, event->HeaderToString().c_str());
2785 :
2786 : #ifdef HAVE_ROOT
2787 : if (fgHolder->fValue == CTRL_QUIT) {
2788 : *flags |= TAFlag_QUIT;
2789 : return flow;
2790 : } else if (fgHolder->fValue == CTRL_PAUSE) {
2791 : fContinue = false;
2792 : }
2793 : #endif
2794 :
2795 0 : if ((fContinue||fNextFlow) && !(*flags & TAFlag_DISPLAY)) {
2796 : return flow;
2797 : } else {
2798 0 : fContinue = false;
2799 : }
2800 :
2801 0 : if (fSkip > 0) {
2802 0 : fSkip--;
2803 0 : return flow;
2804 : }
2805 :
2806 0 : InteractiveLoop(runinfo, flags);
2807 :
2808 0 : return flow;
2809 : }
2810 :
2811 0 : TAFlowEvent* AnalyzeFlowEvent(TARunInfo* runinfo, TAFlags* flags, TAFlowEvent* flow)
2812 : {
2813 0 : printf("InteractiveModule::AnalyzeFlowEvent, run %d\n", runinfo->fRunNo);
2814 :
2815 : #ifdef HAVE_ROOT
2816 : if (fgHolder->fValue == CTRL_QUIT) {
2817 : *flags |= TAFlag_QUIT;
2818 : return flow;
2819 : } else if (fgHolder->fValue == CTRL_PAUSE) {
2820 : fContinue = false;
2821 : }
2822 : #endif
2823 :
2824 0 : if ((!fNextFlow) && !(*flags & TAFlag_DISPLAY)) {
2825 : return flow;
2826 : }
2827 :
2828 0 : fNextFlow = false;
2829 :
2830 0 : InteractiveLoop(runinfo, flags);
2831 :
2832 0 : return flow;
2833 : }
2834 :
2835 0 : void AnalyzeSpecialEvent(TARunInfo* runinfo, TMEvent* event)
2836 : {
2837 0 : if (gTrace)
2838 0 : printf("InteractiveModule::AnalyzeSpecialEvent, run %d, event serno %d, id 0x%04x, data size %d\n", runinfo->fRunNo, event->serial_number, (int)event->event_id, event->data_size);
2839 0 : }
2840 : };
2841 :
2842 : #ifdef HAVE_ROOT
2843 : MainWindow* InteractiveModule::fgCtrlWindow = NULL;
2844 : ValueHolder* InteractiveModule::fgHolder = NULL;
2845 : #endif
2846 :
2847 0 : class InteractiveModuleFactory: public TAFactory
2848 : {
2849 : public:
2850 :
2851 0 : void Init(const std::vector<std::string> &args)
2852 : {
2853 0 : if (gTrace)
2854 0 : printf("InteractiveModuleFactory::Init!\n");
2855 0 : }
2856 :
2857 0 : void Finish()
2858 : {
2859 0 : if (gTrace)
2860 0 : printf("InteractiveModuleFactory::Finish!\n");
2861 0 : }
2862 :
2863 0 : TARunObject* NewRunObject(TARunInfo* runinfo)
2864 : {
2865 0 : if (gTrace)
2866 0 : printf("InteractiveModuleFactory::NewRunObject, run %d, file %s\n", runinfo->fRunNo, runinfo->fFileName.c_str());
2867 0 : return new InteractiveModule(runinfo);
2868 : }
2869 : };
2870 :
2871 : //////////////////////////////////////////////////////////
2872 : //
2873 : // main program
2874 : //
2875 : //////////////////////////////////////////////////////////
2876 :
2877 0 : static void help()
2878 : {
2879 0 : printf("\nUsage: ./manalyzer.exe [-h] [-R8081] [-oOutputfile.mid] [file1 file2 ...] [-- arguments passed to modules ...]\n");
2880 0 : printf("\n");
2881 0 : printf("-h: print this help message\n");
2882 0 : printf("--demo: activate the demo mode, online connection or input file not needed, midas events are generated internally, add -e0 or -eNNN to set number of demo events \n");
2883 0 : printf("\n");
2884 0 : printf("-Hhostname: connect to MIDAS experiment on given host\n");
2885 0 : printf("-Eexptname: connect to this MIDAS experiment\n");
2886 0 : printf("--midas-progname SSS -- set analyzer's MIDAS program name, default is \"ana\"\n");
2887 0 : printf("--midas-hostname HOSTNAME[:PORT] -- connect to MIDAS mserver on given host and port\n");
2888 0 : printf("--midas-exptname EXPTNAME -- connect to given experiment\n");
2889 0 : printf("--midas-buffer BUFZZZ -- connect to given MIDAS event buffer\n");
2890 0 : printf("--midas-sampling SSS -- sample events from MIDAS event buffer: GET_ALL=get every event (will block the event writers, GET_NONBLOCKING=get as many as we can process, GET_RECENT=get recent events, see bm_receive_event(). Default is GET_NONBLOCKING\n");
2891 0 : printf("--midas-event-id III -- receive only events with matching event ID\n");
2892 0 : printf("--midas-trigger-mask 0xMASK -- receive only events with matching trigger mask\n");
2893 0 : printf("\n");
2894 0 : printf("-oOutputfile.mid: write selected events into this file\n");
2895 0 : printf("-Rnnnn: Start the ROOT THttpServer HTTP server on specified tcp port, use -R8081, access by firefox http://localhost:8081\n");
2896 0 : printf("-eNNN: Number of events to analyze, 0=unlimited\n");
2897 0 : printf("-sNNN: Number of events to skip before starting analysis\n");
2898 0 : printf("\n");
2899 0 : printf("--dump: activate the event dump module\n");
2900 0 : printf("\n");
2901 0 : printf("-t: Enable tracing of constructors, destructors and function calls\n");
2902 0 : printf("-m: Enable memory leak debugging\n");
2903 0 : printf("-g: Enable graphics display when processing data files\n");
2904 0 : printf("-i: Enable intractive mode\n");
2905 0 : printf("\n");
2906 0 : printf("--mt: Enable multithreaded mode. Extra multithread config settings:\n");
2907 0 : printf("--mtqlNNN: Module thread queue length (buffer). Default: %d\n", gDefaultMultithreadQueueLength);
2908 0 : printf("--mtseNNN: Module thread sleep time with empty queue (usec). Default: %d\n", gDefaultMultithreadWaitEmpty);
2909 0 : printf("--mtsfNNN: Module thread sleep time when next queue is full (usec). Default: %d\n", gDefaultMultithreadWaitFull);
2910 0 : printf("\n");
2911 0 : printf("--no-profiler: Turn off manalyzer module profiler\n");
2912 0 : printf("--pqiNNN: Profile multithread queue lengths every NNN events \n");
2913 : #ifdef HAVE_ROOT
2914 : printf("\n");
2915 : printf("-Doutputdirectory: Specify output root file directory\n");
2916 : printf("-Ooutputfile.root: Specify output root file filename\n");
2917 : printf("--jsroot: After analysis is finished, keep jsroot running\n");
2918 : #endif
2919 0 : printf("\n");
2920 0 : printf("--: All following arguments are passed to the analyzer modules Init() method\n");
2921 0 : printf("\n");
2922 0 : printf("Analyzer modules usage:\n");
2923 0 : if (gModules) {
2924 0 : for (unsigned i=0; i<(*gModules).size(); i++) {
2925 0 : (*gModules)[i]->Usage();
2926 : }
2927 : }
2928 0 : printf("\n");
2929 0 : printf("Example1: analyze online data: ./manalyzer.exe -R9091\n");
2930 0 : printf("Example2: analyze existing data: ./manalyzer.exe /data/alpha/current/run00500.mid\n");
2931 0 : exit(1);
2932 : }
2933 :
2934 : // duplicate c++20 std::string s.starts_with()
2935 :
// Returns true when the first strlen(prefix) characters of s equal prefix
// (an empty prefix always matches).
static bool starts_with(const std::string& s, const char* prefix)
{
   const size_t prefix_len = strlen(prefix);
   return s.compare(0, prefix_len, prefix) == 0;
}
2940 :
2941 : // Main function call
2942 :
2943 0 : int manalyzer_main(int argc, char* argv[])
2944 : {
2945 0 : setbuf(stdout, NULL);
2946 0 : setbuf(stderr, NULL);
2947 :
2948 0 : signal(SIGILL, SIG_DFL);
2949 0 : signal(SIGBUS, SIG_DFL);
2950 0 : signal(SIGSEGV, SIG_DFL);
2951 0 : signal(SIGPIPE, SIG_DFL);
2952 :
2953 0 : std::vector<std::string> args;
2954 0 : for (int i=0; i<argc; i++) {
2955 0 : if (strcmp(argv[i],"-h")==0)
2956 0 : help(); // does not return
2957 0 : args.push_back(argv[i]);
2958 : }
2959 :
2960 0 : int httpPort = 0;
2961 0 : int num_skip = 0;
2962 0 : int num_analyze = 0;
2963 :
2964 0 : TMWriterInterface *writer = NULL;
2965 :
2966 0 : bool event_dump = false;
2967 0 : bool demo_mode = false;
2968 : #ifdef HAVE_ROOT
2969 : bool root_graphics = false;
2970 : #endif
2971 0 : bool interactive = false;
2972 :
2973 0 : bool multithread = false;
2974 :
2975 : #ifdef HAVE_ROOT
2976 : bool enable_jsroot = false;
2977 : #endif
2978 :
2979 : #ifdef HAVE_ROOT
2980 : bool performance_profiler = true;
2981 : #else
2982 0 : bool performance_profiler = false;
2983 : #endif
2984 0 : int snap_shot_queue_length = 100;
2985 :
2986 0 : std::vector<std::string> files;
2987 0 : std::vector<std::string> modargs;
2988 :
2989 : #ifdef HAVE_MIDAS
2990 0 : std::string midas_hostname = "";
2991 0 : std::string midas_exptname = "";
2992 0 : std::string midas_progname = "ana";
2993 0 : std::string midas_buffer = "SYSTEM";
2994 : //std::string midas_sampling = "GET_ALL";
2995 0 : std::string midas_sampling = "GET_NONBLOCKING";
2996 0 : int midas_event_id = -1;
2997 0 : int midas_trigger_mask = -1;
2998 : #endif
2999 :
3000 0 : for (unsigned int i=1; i<args.size(); i++) { // loop over the commandline options
3001 0 : std::string arg = args[i];
3002 : //printf("argv[%d] is %s\n",i,arg);
3003 :
3004 0 : if (arg == "--") {
3005 0 : for (unsigned j=i+1; j<args.size(); j++)
3006 0 : modargs.push_back(args[j]);
3007 0 : break;
3008 0 : } else if (arg == "--dump") {
3009 : event_dump = true;
3010 0 : } else if (arg == "--demo") {
3011 : demo_mode = true;
3012 : num_analyze = 100;
3013 : #ifdef HAVE_ROOT
3014 : } else if (arg == "-g") {
3015 : root_graphics = true;
3016 : #endif
3017 0 : } else if (arg == "-i") {
3018 : interactive = true;
3019 0 : } else if (arg == "-t") {
3020 0 : gTrace = true;
3021 0 : TMReaderInterface::fgTrace = true;
3022 0 : TMWriterInterface::fgTrace = true;
3023 0 : } else if (starts_with(arg, "-o")) {
3024 0 : writer = TMNewWriter(arg.c_str()+2);
3025 0 : } else if (starts_with(arg, "-s")) {
3026 0 : num_skip = atoi(arg.c_str()+2);
3027 0 : } else if (starts_with(arg, "-e")) {
3028 0 : num_analyze = atoi(arg.c_str()+2);
3029 0 : } else if (starts_with(arg, "-m")) { // Enable memory debugging
3030 0 : gEnableShowMem = true;
3031 0 : } else if (starts_with(arg, "-R")) { // Set the ROOT THttpServer HTTP port
3032 0 : httpPort = atoi(arg.c_str()+2);
3033 : #ifdef HAVE_MIDAS
3034 0 : } else if (starts_with(arg, "-H")) {
3035 0 : midas_hostname = arg.c_str()+2;
3036 0 : } else if (starts_with(arg, "-E")) {
3037 0 : midas_exptname = arg.c_str()+2;
3038 0 : } else if (arg == "--midas-progname") {
3039 0 : midas_progname = args[i+1]; i++;
3040 0 : } else if (arg == "--midas-hostname") {
3041 0 : midas_hostname = args[i+1]; i++;
3042 0 : } else if (arg == "--midas-exptname") {
3043 0 : midas_exptname = args[i+1]; i++;
3044 0 : } else if (arg == "--midas-buffer") {
3045 0 : midas_buffer = args[i+1]; i++;
3046 0 : } else if (arg == "--midas-sampling") {
3047 0 : midas_sampling = args[i+1]; i++;
3048 0 : } else if (arg == "--midas-event-id") {
3049 0 : midas_event_id = atoi(args[i+1].c_str()); i++;
3050 0 : } else if (arg == "--midas-trigger-mask") {
3051 0 : midas_trigger_mask = strtoul(args[i+1].c_str(), NULL, 0); i++;
3052 : #endif
3053 0 : } else if (starts_with(arg, "--mtql")) {
3054 0 : gDefaultMultithreadQueueLength = atoi(arg.c_str()+6);
3055 0 : } else if (starts_with(arg, "--mtse")) {
3056 0 : gDefaultMultithreadWaitEmpty = atoi(arg.c_str()+6);
3057 0 : } else if (starts_with(arg, "--mtsf")) {
3058 0 : gDefaultMultithreadWaitFull = atoi(arg.c_str()+6);
3059 0 : } else if (arg == "--mt") {
3060 : multithread=true;
3061 0 : } else if (arg == "--no-profiler") {
3062 : performance_profiler = 0;
3063 0 : } else if (starts_with(arg, "--pqi")) {
3064 0 : snap_shot_queue_length = atoi(arg.c_str()+5);
3065 : #ifdef HAVE_ROOT
3066 : } else if (starts_with(arg, "-O")) {
3067 : TARootHelper::fOutputFileName = arg.c_str()+2;
3068 : } else if (starts_with(arg, "-D")) {
3069 : TARootHelper::fOutputDirectory = arg.c_str()+2;
3070 : } else if (arg == "--jsroot") {
3071 : enable_jsroot = true;
3072 : #endif
3073 0 : } else if (arg == "-h") {
3074 0 : help(); // does not return
3075 0 : } else if (arg[0] == '-') {
3076 0 : help(); // does not return
3077 : } else {
3078 0 : files.push_back(args[i]);
3079 : }
3080 0 : }
3081 :
3082 0 : if (!gModules)
3083 0 : gModules = new std::vector<TAFactory*>;
3084 :
3085 0 : if ((*gModules).size() == 0)
3086 : event_dump = true;
3087 :
3088 0 : if (event_dump)
3089 0 : (*gModules).push_back(new EventDumpModuleFactory);
3090 :
3091 0 : if (interactive)
3092 0 : (*gModules).push_back(new InteractiveModuleFactory);
3093 :
3094 0 : if (gTrace) {
3095 0 : printf("Registered modules: %zu\n", (*gModules).size());
3096 : }
3097 :
3098 : #ifdef HAVE_ROOT
3099 : if (multithread) {
3100 : // see https://root.cern/manual/multi_threading/
3101 : ROOT::EnableImplicitMT();
3102 : ROOT::EnableThreadSafety();
3103 : }
3104 :
3105 : if (root_graphics) {
3106 : TARootHelper::fgApp = new TApplication("manalyzer", NULL, NULL, 0, 0);
3107 : }
3108 :
3109 : TARootHelper::fgDir = new TDirectory("manalyzer", "location of histograms");
3110 : TARootHelper::fgDir->cd();
3111 : #endif
3112 :
3113 0 : if (httpPort) {
3114 : #ifdef HAVE_THTTP_SERVER
3115 : char str[256];
3116 : snprintf(str, sizeof(str), "http:127.0.0.1:%d?cors", httpPort);
3117 : THttpServer *s = new THttpServer(str);
3118 : //s->SetTimer(100, kFALSE);
3119 : TARootHelper::fgHttpServer = s;
3120 : #else
3121 0 : fprintf(stderr,"ERROR: No support for the THttpServer!\n");
3122 : #endif
3123 : }
3124 :
3125 0 : for (unsigned i=0; i<files.size(); i++) {
3126 0 : printf("file[%d]: %s\n", i, files[i].c_str());
3127 : }
3128 0 : int exit_state = 0;
3129 0 : if (demo_mode) {
3130 0 : exit_state = ProcessDemoMode(modargs, num_skip, num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
3131 0 : } else if (files.size() > 0) {
3132 0 : exit_state = ProcessMidasFiles(files, modargs, num_skip, num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
3133 : } else {
3134 : #ifdef HAVE_MIDAS
3135 : #ifdef HAVE_TMFE
3136 0 : exit_state = ProcessMidasOnlineTmfe(modargs, midas_progname.c_str(), midas_hostname.c_str(), midas_exptname.c_str(), midas_buffer.c_str(), midas_event_id, midas_trigger_mask, midas_sampling.c_str(), num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
3137 : #else
3138 : exit_state = ProcessMidasOnlineOld(modargs, midas_hostname.c_str(), midas_exptname.c_str(), num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
3139 : #endif
3140 : #endif
3141 : }
3142 :
3143 0 : if (writer) {
3144 0 : writer->Close();
3145 0 : delete writer;
3146 0 : writer = NULL;
3147 : }
3148 :
3149 : #ifdef HAVE_ROOT
3150 : if (enable_jsroot && TARootHelper::fgHttpServer) {
3151 : printf("Starting jsroot. Use Ctrl-C to stop.\n");
3152 : jsroot(TARootHelper::fgOutputRootFiles);
3153 : }
3154 : #endif
3155 :
3156 0 : return exit_state;
3157 0 : }
3158 :
3159 : /* emacs
3160 : * Local Variables:
3161 : * tab-width: 8
3162 : * c-basic-offset: 3
3163 : * indent-tabs-mode: nil
3164 : * End:
3165 : */
|