31 ROOT::EnableThreadSafety();
36 throw std::invalid_argument(
"Can not add empty/NULL service");
38 m_services.push_back(std::move(service));
39 ACTS_INFO(
"Added service '" << m_services.back()->name() <<
"'");
43 std::shared_ptr<IContextDecorator> decorator) {
45 throw std::invalid_argument(
"Can not add empty/NULL context decorator");
47 m_decorators.push_back(std::move(decorator));
48 ACTS_INFO(
"Added context decarator '" << m_decorators.back()->name() <<
"'");
53 throw std::invalid_argument(
"Can not add empty/NULL reader");
55 m_readers.push_back(std::move(reader));
56 ACTS_INFO(
"Added reader '" << m_readers.back()->name() <<
"'");
61 throw std::invalid_argument(
"Can not add empty/NULL algorithm");
63 m_algorithms.push_back(std::move(algorithm));
64 ACTS_INFO(
"Added algorithm '" << m_algorithms.back()->name() <<
"'");
69 throw std::invalid_argument(
"Can not add empty/NULL writer");
71 m_writers.push_back(std::move(writer));
72 ACTS_INFO(
"Added writer '" << m_writers.back()->name() <<
"'");
76 std::vector<std::string> names;
79 for (
const auto& service : m_services) {
80 names.push_back(
"Service:" + service->name());
82 for (
const auto& decorator : m_decorators) {
83 names.push_back(
"Decorator:" + decorator->name());
85 for (
const auto& reader : m_readers) {
86 names.push_back(
"Reader:" + reader->name());
88 for (
const auto&
algorithm : m_algorithms) {
89 names.push_back(
"Algorithm:" +
algorithm->name());
91 for (
const auto& writer : m_writers) {
92 names.push_back(
"Writer:" + writer->name());
102 size_t saturatedAdd(
size_t a,
size_t b) {
111 constexpr
auto kInvalidEventsRange = std::make_pair(SIZE_MAX, SIZE_MAX);
124 size_t end = SIZE_MAX;
125 for (
const auto& reader : m_readers) {
126 auto available = reader->availableEvents();
127 beg =
std::max(beg, available.first);
128 end =
std::min(end, available.second);
133 ACTS_ERROR(
"Available events ranges from readers do not overlap");
134 return kInvalidEventsRange;
140 return kInvalidEventsRange;
143 if (end <= saturatedAdd(beg, m_cfg.skip)) {
144 ACTS_ERROR(
"Less events available than requested to skip");
145 return kInvalidEventsRange;
148 if ((beg == 0
u) and (end == SIZE_MAX) and (m_cfg.events == SIZE_MAX)) {
149 ACTS_ERROR(
"Could not determine number of events");
150 return kInvalidEventsRange;
154 auto begSelected = saturatedAdd(beg, m_cfg.skip);
155 auto endRequested = saturatedAdd(begSelected, m_cfg.events);
156 auto endSelected =
std::min(end, endRequested);
157 if (end < endRequested) {
158 ACTS_INFO(
"Restrict requested number of events to available ones");
161 return {begSelected, endSelected};
166 using Clock = std::chrono::high_resolution_clock;
167 using Duration = Clock::duration;
168 using Timepoint = Clock::time_point;
169 using Seconds = std::chrono::duration<double>;
170 using NanoSeconds = std::chrono::duration<double, std::nano>;
177 StopWatch(Duration&
s) :
start(Clock::now()), store(s) {}
178 ~StopWatch() { store += Clock::now() -
start; }
182 template <
typename D>
183 inline std::string asString(
D duration) {
184 double ns = std::chrono::duration_cast<NanoSeconds>(duration).count();
197 template <
typename D>
198 inline std::string perEvent(
D duration,
size_t numEvents) {
199 return asString(duration / numEvents) +
"/event";
204 std::string identifier;
206 double time_perevent_s;
208 DFE_NAMEDTUPLE(TimingInfo, identifier, time_total_s, time_perevent_s);
211 void storeTiming(
const std::vector<std::string>& identifiers,
212 const std::vector<Duration>& durations, std::size_t numEvents,
215 for (
size_t i = 0; i < identifiers.size(); ++i) {
217 info.identifier = identifiers[i];
219 std::chrono::duration_cast<Seconds>(durations[i]).count();
220 info.time_perevent_s = info.time_total_s / numEvents;
228 Timepoint clockWallStart = Clock::now();
230 std::vector<std::string> names = listAlgorithmNames();
231 std::vector<Duration> clocksAlgorithms(names.size(),
Duration::zero());
232 tbb::queuing_mutex clocksAlgorithmsMutex;
236 std::pair<size_t, size_t> eventsRange = determineEventsRange();
237 if ((eventsRange.first == SIZE_MAX) and (eventsRange.second == SIZE_MAX)) {
241 ACTS_INFO(
"Processing events [" << eventsRange.first <<
", "
242 << eventsRange.second <<
")");
243 ACTS_INFO(
"Starting event loop with " << m_cfg.numThreads <<
" threads");
244 ACTS_INFO(
" " << m_services.size() <<
" services");
245 ACTS_INFO(
" " << m_decorators.size() <<
" context decorators");
246 ACTS_INFO(
" " << m_readers.size() <<
" readers");
247 ACTS_INFO(
" " << m_algorithms.size() <<
" algorithms");
248 ACTS_INFO(
" " << m_writers.size() <<
" writers");
251 for (
auto& service : m_services) {
252 names.push_back(
"Service:" + service->name() +
":startRun");
254 StopWatch
sw(clocksAlgorithms.back());
259 tbb::task_scheduler_init
init(m_cfg.numThreads);
261 tbb::blocked_range<size_t>(eventsRange.first, eventsRange.second),
262 [&](
const tbb::blocked_range<size_t>&
r) {
263 std::vector<Duration> localClocksAlgorithms(names.size(),
266 for (
size_t event = r.begin();
event != r.end(); ++event) {
276 for (
auto& service : m_services) {
277 StopWatch
sw(localClocksAlgorithms[ialgo++]);
278 service->prepare(++context);
281 for (
auto& cdr : m_decorators) {
282 StopWatch
sw(localClocksAlgorithms[ialgo++]);
284 throw std::runtime_error(
"Failed to decorate event context");
288 for (
auto& rdr : m_readers) {
289 StopWatch
sw(localClocksAlgorithms[ialgo++]);
291 throw std::runtime_error(
"Failed to read input data");
295 for (
auto& alg : m_algorithms) {
296 StopWatch
sw(localClocksAlgorithms[ialgo++]);
298 throw std::runtime_error(
"Failed to process event data");
302 for (
auto& wrt : m_writers) {
303 StopWatch
sw(localClocksAlgorithms[ialgo++]);
305 throw std::runtime_error(
"Failed to write output data");
313 tbb::queuing_mutex::scoped_lock lock(clocksAlgorithmsMutex);
314 for (
size_t i = 0; i < clocksAlgorithms.size(); ++i) {
315 clocksAlgorithms[i] += localClocksAlgorithms[i];
321 for (
auto& wrt : m_writers) {
322 names.push_back(
"Writer:" + wrt->name() +
":endRun");
324 StopWatch
sw(clocksAlgorithms.back());
331 Duration totalWall = Clock::now() - clockWallStart;
332 Duration totalReal = std::accumulate(
333 clocksAlgorithms.begin(), clocksAlgorithms.end(),
Duration::zero());
334 size_t numEvents = eventsRange.second - eventsRange.first;
335 ACTS_INFO(
"Processed " << numEvents <<
" events in " << asString(totalWall)
337 ACTS_INFO(
"Average time per event: " << perEvent(totalReal, numEvents));
339 for (
size_t i = 0; i < names.size(); ++i) {
341 << perEvent(clocksAlgorithms[i], numEvents));
343 storeTiming(names, clocksAlgorithms, numEvents,
344 joinPaths(m_cfg.outputDir,
"timing.tsv"));