RunSet.cpp 12.2 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1 2 3 4 5
/*
 * (C) 2018 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
6 7 8 9 10
#include <iomanip>
#include <sstream>
#include <string>
#include "hepnos/DataSet.hpp"
#include "hepnos/RunSet.hpp"
11
#include "hepnos/Prefetcher.hpp"
12 13
#include "DataSetImpl.hpp"
#include "DataStoreImpl.hpp"
Matthieu Dorier's avatar
Matthieu Dorier committed
14
#include "ItemImpl.hpp"
15
#include "PrefetcherImpl.hpp"
16 17 18

namespace hepnos {

19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
////////////////////////////////////////////////////////////////////////////////////////////
// RunSet::const_iterator::Impl declaration
////////////////////////////////////////////////////////////////////////////////////////////

class RunSet::const_iterator::Impl {
    public:
        Run m_current_run;
        std::shared_ptr<PrefetcherImpl> m_prefetcher;

        Impl()
        : m_current_run()
        {}

        Impl(const Run& run)
        : m_current_run(run)
        {}

        Impl(Run&& run)
            : m_current_run(std::move(run))
        {}

        Impl(const Impl& other)
            : m_current_run(other.m_current_run)
        {}

        ~Impl() {
            if(m_prefetcher)
                m_prefetcher->m_associated = false;
        }

        bool operator==(const Impl& other) const {
            return m_current_run == other.m_current_run;
        }

        void setPrefetcher(const std::shared_ptr<PrefetcherImpl>& p) {
            if(p->m_associated)
                throw Exception("Prefetcher object already in use");
            if(m_prefetcher)
                m_prefetcher->m_associated = false;
            m_prefetcher = p;
            m_prefetcher->m_associated = true;
        }
};

Matthieu Dorier's avatar
Matthieu Dorier committed
63 64 65 66
////////////////////////////////////////////////////////////////////////////////////////////
// RunSet implementation
////////////////////////////////////////////////////////////////////////////////////////////

67 68
static RunSet::iterator RunSet_end;

69
RunSet::RunSet(const std::shared_ptr<RunSetImpl>& impl)
70
: m_impl(impl) {}
71

72
RunSet::RunSet(std::shared_ptr<RunSetImpl>&& impl)
73
: m_impl(std::move(impl)) {}
74

75
Run RunSet::operator[](const RunNumber& runNumber) {
76
    auto it = find(runNumber);
77 78
    if(!it->valid())
        throw Exception("Requested Run does not exist");
79 80 81
    return std::move(*it);
}

82 83 84 85
DataStore RunSet::datastore() const {
    return DataStore(m_impl->m_datastore);
}

86 87
RunSet::iterator RunSet::find(const RunNumber& runNumber) {
    int ret;
Matthieu Dorier's avatar
Matthieu Dorier committed
88 89
    auto& datastore = m_impl->m_datastore;
    bool b = datastore->itemExists(m_impl->m_uuid, runNumber);
90
    if(!b) return end();
91
    return iterator(
Matthieu Dorier's avatar
Matthieu Dorier committed
92
            std::make_shared<ItemImpl>(
93
                datastore,
94
                m_impl->m_uuid,
95
                runNumber));
96 97
}

98 99
RunSet::iterator RunSet::find(const RunNumber& runNumber, const Prefetcher& prefetcher) {
    auto it = find(runNumber);
100
    if(it != end()) {
101
        it.m_impl->setPrefetcher(prefetcher.m_impl);
102 103
        prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
    }
104 105 106
    return it;
}

107 108 109 110 111
RunSet::const_iterator RunSet::find(const RunNumber& runNumber) const {
    iterator it = const_cast<RunSet*>(this)->find(runNumber);
    return it;
}

112 113
RunSet::const_iterator RunSet::find(const RunNumber& runNumber, const Prefetcher& prefetcher) const {
    iterator it = const_cast<RunSet*>(this)->find(runNumber);
114
    if(it != end()) {
115
        it.m_impl->setPrefetcher(prefetcher.m_impl);
116 117
        prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
    }
118 119 120
    return it;
}

121 122
RunSet::iterator RunSet::begin() {
    auto it = find(0);
123
    if(it != end()) return it;
124

125 126
    auto ds_level = m_impl->m_level;
    auto datastore = m_impl->m_datastore;
Matthieu Dorier's avatar
Matthieu Dorier committed
127
    auto new_run_impl = std::make_shared<ItemImpl>(datastore, m_impl->m_uuid, 0);
128
    Run run(std::move(new_run_impl));
129 130 131 132 133 134
    run = run.next();

    if(run.valid()) return iterator(run);
    else return end();
}

135 136
RunSet::iterator RunSet::begin(const Prefetcher& prefetcher) {
    auto it = begin();
137
    if(it != end()) {
138
        it.m_impl->setPrefetcher(prefetcher.m_impl);
139
        prefetcher.m_impl->fetchRequestedProducts(it.m_impl->m_current_run.m_impl);
140 141
        prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
    }
142 143 144
    return it;
}

145
RunSet::iterator RunSet::end() {
146
    return RunSet_end;
147 148 149 150 151 152
}

RunSet::const_iterator RunSet::cbegin() const {
    return const_iterator(const_cast<RunSet*>(this)->begin());
}

153 154
RunSet::const_iterator RunSet::cbegin(const Prefetcher& prefetcher) const {
    auto it = cbegin();
155
    if(it != cend()) {
156
        it.m_impl->setPrefetcher(prefetcher.m_impl);
157
        prefetcher.m_impl->fetchRequestedProducts(it.m_impl->m_current_run.m_impl);
158 159
        prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
    }
160 161 162
    return it;
}

163
RunSet::const_iterator RunSet::cend() const {
164
    return RunSet_end;
165 166 167 168 169 170
}

RunSet::const_iterator RunSet::begin() const {
    return const_iterator(const_cast<RunSet*>(this)->begin());
}

171 172
RunSet::const_iterator RunSet::begin(const Prefetcher& prefetcher) const {
    auto it = const_iterator(const_cast<RunSet*>(this)->begin());
173
    if(it != end()) {
174
        it.m_impl->setPrefetcher(prefetcher.m_impl);
175
        prefetcher.m_impl->fetchRequestedProducts(it.m_impl->m_current_run.m_impl);
176 177
        prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
    }
178 179 180
    return it;
}

181
RunSet::const_iterator RunSet::end() const {
182
    return RunSet_end;
183 184 185 186 187 188 189 190
}

RunSet::iterator RunSet::lower_bound(const RunNumber& lb) {
    if(lb == 0) {
        auto it = find(0);
        if(it != end()) {
            return it;
        } else {
Matthieu Dorier's avatar
Matthieu Dorier committed
191
            Run run(std::make_shared<ItemImpl>(
192
                    m_impl->m_datastore, 
193
                    m_impl->m_uuid, 0));
194 195 196 197 198 199 200 201 202 203
            run = run.next();
            if(!run.valid()) return end();
            else return iterator(run);
        }
    } else {
        auto it = find(lb-1);
        if(it != end()) {
            ++it;
            return it;
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
204
        Run run(std::make_shared<ItemImpl>(
205
                m_impl->m_datastore, 
206
                m_impl->m_uuid, lb-1));
207 208 209 210 211 212
        run = run.next();
        if(!run.valid()) return end();
        else return iterator(run);
    }
}

213 214
RunSet::iterator RunSet::lower_bound(const RunNumber& lb, const Prefetcher& prefetcher) {
    auto it = lower_bound(lb);
215
    if(it != end()) {
216
        it.m_impl->setPrefetcher(prefetcher.m_impl);
217
        prefetcher.m_impl->fetchRequestedProducts(it.m_impl->m_current_run.m_impl);
218 219
        prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
    }
220 221 222
    return it;
}

223 224 225 226 227
RunSet::const_iterator RunSet::lower_bound(const RunNumber& lb) const {
    iterator it = const_cast<RunSet*>(this)->lower_bound(lb);
    return it;
}

228 229
RunSet::const_iterator RunSet::lower_bound(const RunNumber& lb, const Prefetcher& prefetcher) const {
    iterator it = const_cast<RunSet*>(this)->lower_bound(lb);
230
    if(it != end()) {
231
        it.m_impl->setPrefetcher(prefetcher.m_impl);
232
        prefetcher.m_impl->fetchRequestedProducts(it.m_impl->m_current_run.m_impl);
233 234
        prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
    }
235 236 237
    return it;
}

238
RunSet::iterator RunSet::upper_bound(const RunNumber& ub) {
Matthieu Dorier's avatar
Matthieu Dorier committed
239
    Run run(std::make_shared<ItemImpl>(m_impl->m_datastore, 
240
                                      m_impl->m_uuid, ub));
241 242 243 244 245
    run = run.next();
    if(!run.valid()) return end();
    else return iterator(run);
}

246 247
RunSet::iterator RunSet::upper_bound(const RunNumber& ub, const Prefetcher& prefetcher) {
    auto it = upper_bound(ub);
248
    if(it != end()) {
249
        it.m_impl->setPrefetcher(prefetcher.m_impl);
250
        prefetcher.m_impl->fetchRequestedProducts(it.m_impl->m_current_run.m_impl);
251 252
        prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
    }
253 254 255
    return it;
}

256 257 258 259 260
RunSet::const_iterator RunSet::upper_bound(const RunNumber& ub) const {
    iterator it = const_cast<RunSet*>(this)->upper_bound(ub);
    return it;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
261

262 263
RunSet::const_iterator RunSet::upper_bound(const RunNumber& ub, const Prefetcher& prefetcher) const {
    iterator it = const_cast<RunSet*>(this)->upper_bound(ub);
264
    if(it != end()) {
265
        it.m_impl->setPrefetcher(prefetcher.m_impl);
266
        prefetcher.m_impl->fetchRequestedProducts(it.m_impl->m_current_run.m_impl);
267 268
        prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
    }
269 270
    return it;
}
271 272


Matthieu Dorier's avatar
Matthieu Dorier committed
273
////////////////////////////////////////////////////////////////////////////////////////////
274
// RunSet::const_iterator implementation
Matthieu Dorier's avatar
Matthieu Dorier committed
275 276
////////////////////////////////////////////////////////////////////////////////////////////

277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
RunSet::const_iterator::const_iterator()
: m_impl(std::make_unique<Impl>()) {}

RunSet::const_iterator::const_iterator(const Run& run)
: m_impl(std::make_unique<Impl>(run)) {}

RunSet::const_iterator::const_iterator(Run&& run)
: m_impl(std::make_unique<Impl>(std::move(run))) {}

RunSet::const_iterator::~const_iterator() {}

RunSet::const_iterator::const_iterator(const RunSet::const_iterator& other)
: m_impl(std::make_unique<Impl>(*other.m_impl)) {}

RunSet::const_iterator::const_iterator(RunSet::const_iterator&& other)
: m_impl(std::move(other.m_impl)) {}

RunSet::const_iterator& RunSet::const_iterator::operator=(const RunSet::const_iterator& other) {
    if(&other == this) return *this;
    m_impl = std::make_unique<Impl>(*other.m_impl);
    return *this;
}

RunSet::const_iterator& RunSet::const_iterator::operator=(RunSet::const_iterator&& other) {
    if(&other == this) return *this;
    m_impl = std::move(other.m_impl);
    return *this;
}

RunSet::const_iterator::self_type RunSet::const_iterator::operator++() {
    if(!m_impl) {
        throw Exception("Trying to increment an invalid iterator");
    }
310 311 312 313 314 315 316 317 318 319 320 321
    if(!m_impl->m_prefetcher)
        m_impl->m_current_run = m_impl->m_current_run.next();
    else {
        std::vector<std::shared_ptr<ItemImpl>> next_runs;
        size_t s = m_impl->m_prefetcher->nextItems(ItemType::RUN, 
                ItemType::DATASET, m_impl->m_current_run.m_impl, next_runs, 1);
        if(s == 1) {
            m_impl->m_current_run.m_impl = std::move(next_runs[0]);
        } else {
            m_impl->m_current_run = Run();
        }
    }
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353
    return *this;
}

RunSet::const_iterator::self_type RunSet::const_iterator::operator++(int) {
    const_iterator copy = *this;
    ++(*this);
    return copy;
}

const RunSet::const_iterator::reference RunSet::const_iterator::operator*() {
    if(!m_impl) {
        throw Exception("Trying to dereference an invalid iterator");
    }
    return m_impl->m_current_run;
}

const RunSet::const_iterator::pointer RunSet::const_iterator::operator->() {
    if(!m_impl) return nullptr;
    return &(m_impl->m_current_run);
}

bool RunSet::const_iterator::operator==(const self_type& rhs) const {
    if(!m_impl && !rhs.m_impl)  return true;
    if(m_impl  && !rhs.m_impl)  return false;
    if(!m_impl && rhs.m_impl)   return false;
    return *m_impl == *(rhs.m_impl);
}

bool RunSet::const_iterator::operator!=(const self_type& rhs) const {
        return !(*this == rhs);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
354
////////////////////////////////////////////////////////////////////////////////////////////
355
// RunSet::iterator implementation
Matthieu Dorier's avatar
Matthieu Dorier committed
356 357
////////////////////////////////////////////////////////////////////////////////////////////

358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395
RunSet::iterator::iterator(const Run& current)
: const_iterator(current) {}

RunSet::iterator::iterator(Run&& current)
: const_iterator(std::move(current)) {}

RunSet::iterator::iterator()
: const_iterator() {}

RunSet::iterator::~iterator() {}

RunSet::iterator::iterator(const RunSet::iterator& other)
: const_iterator(other) {}

RunSet::iterator::iterator(RunSet::iterator&& other)
: const_iterator(std::move(other)) {}

RunSet::iterator& RunSet::iterator::operator=(const RunSet::iterator& other) {
    if(this == &other) return *this;
    m_impl = std::make_unique<Impl>(*other.m_impl);
    return *this;
}

RunSet::iterator& RunSet::iterator::operator=(RunSet::iterator&& other) {
    if(this == &other) return *this;
    m_impl = std::move(other.m_impl);
    return *this;
}

RunSet::iterator::reference RunSet::iterator::operator*() {
    return const_cast<reference>(const_iterator::operator*());
}

RunSet::iterator::pointer RunSet::iterator::operator->() {
    return const_cast<pointer>(const_iterator::operator->());
}

}