TDbiResultSetAgg.cxx
Go to the documentation of this file.00001
00002
00003
00004 #include <algorithm>
00005 #include <map>
00006 #include <vector>
00007
00008 #include "TDbiCache.hxx"
00009 #include "TDbiBinaryFile.hxx"
00010 #include "TDbiDBProxy.hxx"
00011 #include "TDbiResultSetAgg.hxx"
00012 #include "TDbiResultSetNonAgg.hxx"
00013 #include "TDbiResultKey.hxx"
00014 #include "TDbiInRowStream.hxx"
00015 #include "TDbiTableRow.hxx"
00016 #include "TDbiTimerManager.hxx"
00017 #include "TDbiValidityRecBuilder.hxx"
00018 #include <TSK_DBI_Log.hxx>
00019 #include <MsgFormat.h>
00020 using std::endl;
00021 #include "TVldRange.hxx"
00022 #include "UtilString.hxx"
00023
00024 ClassImp(TDbiResultSetAgg)
00025
00026 typedef vector<const TDbiResultSet*>::const_iterator ConstResultItr_t;
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069 TDbiResultSetAgg::TDbiResultSetAgg(const string& tableName,
00070 const TDbiTableRow* tableRow,
00071 TDbiCache* cache,
00072 const TDbiValidityRecBuilder* vrecBuilder,
00073 const TDbiDBProxy* proxy,
00074 const string& sqlQualifiers) :
00075 TDbiResultSet(0,0,sqlQualifiers),
00076 fSize(0)
00077 {
00078
00079
00080 typedef map<UInt_t,UInt_t> seqToRow_t;
00081
00082 SK_DBI_Trace( "Creating TDbiResultSetAgg" << " ");
00083 SetTableName(tableName);
00084 if ( ! tableRow || ! cache || ! vrecBuilder || ! proxy ) return;
00085
00086
00087
00088
00089
00090 string::size_type loc = sqlQualifiers.find(';');
00091 string::size_type loc2 = sqlQualifiers.find(';',loc+1);
00092 string sqlData = string(sqlQualifiers,loc+1,loc2-loc-1);
00093 string fillOpts = string(sqlQualifiers,loc2+1);
00094
00095
00096
00097
00098 vector<UInt_t> reqSeqNos;
00099 seqToRow_t seqToRow;
00100
00101
00102 UInt_t dbNo = 0;
00103 Int_t maxRowNo = vrecBuilder->GetNumValidityRec() - 1;
00104
00105
00106
00107 for ( Int_t rowNo = 1; rowNo <= maxRowNo; ++rowNo ) {
00108 const TDbiValidityRec& vrecRow = vrecBuilder->GetValidityRec(rowNo);
00109
00110
00111 const TDbiResultSet* res = cache->Search(vrecRow,sqlQualifiers);
00112 SK_DBI_Verbose( "Checking validity rec " << rowNo
00113 << " " << vrecRow
00114 << "SQL qual: " << sqlQualifiers
00115 << " cache search: " << (void*) res << " ");
00116 if ( res ) {
00117 fResults.push_back(res);
00118 res->Connect();
00119 fSize += res->GetNumRows();
00120 }
00121
00122
00123
00124 else if ( vrecRow.IsGap() ) {
00125 TDbiResultSet* newRes = new TDbiResultSetNonAgg(0, tableRow, &vrecRow);
00126 cache->Adopt(newRes,false);
00127 fResults.push_back(newRes);
00128 newRes->Connect();
00129 }
00130
00131
00132 else {
00133 UInt_t seqNo = vrecRow.GetSeqNo();
00134 reqSeqNos.push_back(seqNo);
00135 seqToRow[seqNo] = rowNo;
00136 fResults.push_back(0);
00137
00138
00139 dbNo = vrecRow.GetDbNo();
00140 }
00141 }
00142
00143
00144
00145
00146 if ( reqSeqNos.size() ) {
00147
00148
00149 sort(reqSeqNos.begin(),reqSeqNos.end());
00150 TDbiInRowStream* rs = proxy->QuerySeqNos(reqSeqNos,dbNo,sqlData,fillOpts);
00151
00152 this->SetResultsFromDb();
00153 TDbiTimerManager::gTimerManager.StartSubWatch(1);
00154 while ( ! rs->IsExhausted() ) {
00155 Int_t seqNo;
00156 *rs >> seqNo;
00157 rs->DecrementCurCol();
00158 Int_t rowNo = -2;
00159 if ( seqToRow.find(seqNo) == seqToRow.end() ) {
00160 SK_DBI_Severe( "Unexpected SeqNo: " << seqNo << " ");
00161 }
00162 else {
00163 rowNo = seqToRow[seqNo];
00164 SK_DBI_Verbose( "Procesing SeqNo: " << seqNo
00165 << " for row " << rowNo << " ");
00166 }
00167
00168 const TDbiValidityRec& vrecRow = vrecBuilder->GetValidityRec(rowNo);
00169 TDbiResultSetNonAgg* newRes = new TDbiResultSetNonAgg(rs,tableRow,&vrecRow);
00170
00171 if ( this->IsExtendedContext() ) newRes->SetCanReuse(false);
00172 if ( rowNo == -2 ) {
00173 delete newRes;
00174 }
00175 else {
00176 SK_DBI_Verbose( "SeqNo: " << seqNo
00177 << " produced " << newRes->GetNumRows() << " rows" << " ");
00178
00179
00180 cache->Adopt(newRes,false);
00181 fResults[rowNo-1] = newRes;
00182 newRes->Connect();
00183 fSize += newRes->GetNumRows();
00184 }
00185 }
00186
00187
00188 delete rs;
00189 }
00190
00191
00192
00193
00194
00195 fRowKeys.reserve(fSize);
00196
00197 TDbiValidityRec vRec = vrecBuilder->GetValidityRec(1);
00198 for ( Int_t rowNo = 1; rowNo <= maxRowNo; ++rowNo ) {
00199
00200 const TDbiValidityRec& vrecRow = vrecBuilder->GetValidityRec(rowNo);
00201 TVldRange r = vrecRow.GetVldRange();
00202 vRec.AndTimeWindow(r.GetTimeStart(),r.GetTimeEnd());
00203
00204 const TDbiResultSet* res = fResults[rowNo-1];
00205 if ( res ) {
00206 UInt_t numEnt = res->GetNumRows();
00207 for (UInt_t entNo = 0; entNo < numEnt; ++entNo )
00208 fRowKeys.push_back(res->GetTableRow(entNo));
00209 }
00210 }
00211
00212
00213
00214 this->BuildLookUpTable();
00215
00216
00217 vRec.SetAggregateNo(-1);
00218 SetValidityRec(vRec);
00219
00220 SK_DBI_Debug( "Aggregate contains " << fSize << " entries. vRec:-" << " "
00221 << vRec << " ");
00222
00223 SK_DBI_Info( "Created aggregated result set no. of rows: "
00224 << this->GetNumRows() << " ");
00225
00226 }
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245 TDbiResultSetAgg::~TDbiResultSetAgg() {
00246
00247
00248
00249
00250
00251
00252
00253
00254 SK_DBI_Trace( "Destroying TDbiResultSetAgg." << " ");
00255
00256 for ( ConstResultItr_t itr = fResults.begin();
00257 itr != fResults.end();
00258 ++itr) if ( *itr ) (*itr)->Disconnect();
00259
00260 }
00261
00262
00263
00264
00265
00266
00267 TDbiResultKey* TDbiResultSetAgg::CreateKey() const {
00268
00269 TDbiResultKey* key = 0;
00270 for ( ConstResultItr_t itr = fResults.begin();
00271 itr != fResults.end();
00272 ++itr ) {
00273 const TDbiResultSet* result = *itr;
00274 if ( result ) {
00275
00276 if ( ! key ) key = result->CreateKey();
00277
00278 else {
00279 const TDbiValidityRec& vrec = result->GetValidityRec();
00280 key->AddVRecKey(vrec.GetSeqNo(),vrec.GetCreationDate());
00281 }
00282 }
00283 }
00284
00285
00286 if ( ! key ) key = new TDbiResultKey();
00287
00288 return key;
00289
00290 }
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301 const TDbiTableRow* TDbiResultSetAgg::GetTableRow(UInt_t row) const {
00302
00303
00304
00305
00306
00307
00308
00309
00310 return ( row >= fRowKeys.size() ) ? 0 : fRowKeys[row];
00311
00312 }
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329 const TDbiValidityRec& TDbiResultSetAgg::GetValidityRec(
00330 const TDbiTableRow* row) const {
00331
00332
00333
00334
00335
00336 if ( ! row ) return this->GetValidityRecGlobal();
00337 TDbiResultSet* owner = row->GetOwner();
00338 return owner ? owner->GetValidityRecGlobal() : this->GetValidityRecGlobal();
00339
00340 }
00341
00342
00343 Bool_t TDbiResultSetAgg::Satisfies(const string& sqlQualifiers) {
00344
00345
00346
00347
00348
00349 SK_DBI_Debug( "Trying to satisfy: SQL: " << sqlQualifiers
00350 << "\n with CanReuse: " << this->CanReuse()
00351 << " sqlQualifiers: " << this->GetSqlQualifiers()
00352 << " ");
00353 return this->CanReuse()
00354 && this->GetSqlQualifiers() == sqlQualifiers;
00355 }
00356
00357
00358
00359
00360
00361
00362 void TDbiResultSetAgg::Streamer(TDbiBinaryFile& bf) {
00363
00364
00365
00366
00367
00368
00369 vector<const TDbiResultSet*>::const_iterator itr = fResults.begin();
00370 vector<const TDbiResultSet*>::const_iterator end = fResults.end();
00371
00372 UInt_t numNonAgg = 0;
00373 for (; itr != end; ++itr) {
00374 const TDbiResultSetNonAgg* rna = dynamic_cast<const TDbiResultSetNonAgg*>(*itr);
00375 if ( rna && ! rna->GetValidityRecGlobal().IsGap() ) ++numNonAgg;
00376 }
00377 bf << numNonAgg;
00378
00379
00380 for (itr = fResults.begin(); itr != end; ++itr) {
00381 const TDbiResultSetNonAgg* rna = dynamic_cast<const TDbiResultSetNonAgg*>(*itr);
00382 if ( rna && ! rna->GetValidityRecGlobal().IsGap() ) bf << *rna;
00383 }
00384 }
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417