ECCE @ EIC Software
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Sequencer.cpp
Go to the documentation of this file. Or view the newest version in sPHENIX GitHub for file Sequencer.cpp
1 // This file is part of the Acts project.
2 //
3 // Copyright (C) 2017-2019 CERN for the benefit of the Acts project
4 //
5 // This Source Code Form is subject to the terms of the Mozilla Public
6 // License, v. 2.0. If a copy of the MPL was not distributed with this
7 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 
10 
11 #include <TROOT.h>
12 #include <algorithm>
13 #include <chrono>
14 #include <dfe/dfe_io_dsv.hpp>
15 #include <dfe/dfe_namedtuple.hpp>
16 #include <exception>
17 #include <numeric>
18 #include <tbb/tbb.h>
19 
23 
25  : m_cfg(cfg),
26  m_logger(Acts::getDefaultLogger("Sequencer", m_cfg.logLevel)) {
27  // automatically determine the number of concurrent threads to use
28  if (m_cfg.numThreads < 0) {
29  m_cfg.numThreads = tbb::task_scheduler_init::default_num_threads();
30  }
31  ROOT::EnableThreadSafety();
32 }
33 
34 void FW::Sequencer::addService(std::shared_ptr<IService> service) {
35  if (not service) {
36  throw std::invalid_argument("Can not add empty/NULL service");
37  }
38  m_services.push_back(std::move(service));
39  ACTS_INFO("Added service '" << m_services.back()->name() << "'");
40 }
41 
43  std::shared_ptr<IContextDecorator> decorator) {
44  if (not decorator) {
45  throw std::invalid_argument("Can not add empty/NULL context decorator");
46  }
47  m_decorators.push_back(std::move(decorator));
48  ACTS_INFO("Added context decarator '" << m_decorators.back()->name() << "'");
49 }
50 
51 void FW::Sequencer::addReader(std::shared_ptr<IReader> reader) {
52  if (not reader) {
53  throw std::invalid_argument("Can not add empty/NULL reader");
54  }
55  m_readers.push_back(std::move(reader));
56  ACTS_INFO("Added reader '" << m_readers.back()->name() << "'");
57 }
58 
59 void FW::Sequencer::addAlgorithm(std::shared_ptr<IAlgorithm> algorithm) {
60  if (not algorithm) {
61  throw std::invalid_argument("Can not add empty/NULL algorithm");
62  }
63  m_algorithms.push_back(std::move(algorithm));
64  ACTS_INFO("Added algorithm '" << m_algorithms.back()->name() << "'");
65 }
66 
67 void FW::Sequencer::addWriter(std::shared_ptr<IWriter> writer) {
68  if (not writer) {
69  throw std::invalid_argument("Can not add empty/NULL writer");
70  }
71  m_writers.push_back(std::move(writer));
72  ACTS_INFO("Added writer '" << m_writers.back()->name() << "'");
73 }
74 
75 std::vector<std::string> FW::Sequencer::listAlgorithmNames() const {
76  std::vector<std::string> names;
77 
78  // WARNING this must be done in the same order as in the processing
79  for (const auto& service : m_services) {
80  names.push_back("Service:" + service->name());
81  }
82  for (const auto& decorator : m_decorators) {
83  names.push_back("Decorator:" + decorator->name());
84  }
85  for (const auto& reader : m_readers) {
86  names.push_back("Reader:" + reader->name());
87  }
88  for (const auto& algorithm : m_algorithms) {
89  names.push_back("Algorithm:" + algorithm->name());
90  }
91  for (const auto& writer : m_writers) {
92  names.push_back("Writer:" + writer->name());
93  }
94 
95  return names;
96 }
97 
namespace {
// Add two sizes, clamping the result at SIZE_MAX instead of wrapping around.
//
// Same result as the branch-free formulation described at
// http://locklessinc.com/articles/sat_arithmetic/
size_t saturatedAdd(size_t a, size_t b) {
  // unsigned addition wraps around on overflow
  const size_t sum = a + b;
  // a wrapped-around sum is always smaller than either operand
  return (sum < a) ? SIZE_MAX : sum;
}
}  // namespace
108 
109 std::pair<std::size_t, std::size_t> FW::Sequencer::determineEventsRange()
110  const {
111  constexpr auto kInvalidEventsRange = std::make_pair(SIZE_MAX, SIZE_MAX);
112 
113  // Note on skipping events:
114  //
115  // Previously, skipping events was only allowed when readers where available,
116  // since only readers had a `.skip()` functionality. The `.skip()` interface
117  // has been removed in favour of telling the readers the event they are
118  // requested to read via the algorithm context.
119  // Skipping can now also be used when no readers are configured, e.g. for
120  // generating only a few specific events in a simulation setup.
121 
122  // determine intersection of event ranges available from readers
123  size_t beg = 0u;
124  size_t end = SIZE_MAX;
125  for (const auto& reader : m_readers) {
126  auto available = reader->availableEvents();
127  beg = std::max(beg, available.first);
128  end = std::min(end, available.second);
129  }
130 
131  // since we use event ranges (and not just num events) they might not overlap
132  if (end < beg) {
133  ACTS_ERROR("Available events ranges from readers do not overlap");
134  return kInvalidEventsRange;
135  }
136  // configured readers without available events makes no sense
137  // TODO could there be a use-case for zero events? run only setup functions?
138  if (beg == end) {
139  ACTS_ERROR("No events available");
140  return kInvalidEventsRange;
141  }
142  // trying to skip too many events must be an error
143  if (end <= saturatedAdd(beg, m_cfg.skip)) {
144  ACTS_ERROR("Less events available than requested to skip");
145  return kInvalidEventsRange;
146  }
147  // events range was not defined by either the readers or user command line.
148  if ((beg == 0u) and (end == SIZE_MAX) and (m_cfg.events == SIZE_MAX)) {
149  ACTS_ERROR("Could not determine number of events");
150  return kInvalidEventsRange;
151  }
152 
153  // take user selection into account
154  auto begSelected = saturatedAdd(beg, m_cfg.skip);
155  auto endRequested = saturatedAdd(begSelected, m_cfg.events);
156  auto endSelected = std::min(end, endRequested);
157  if (end < endRequested) {
158  ACTS_INFO("Restrict requested number of events to available ones");
159  }
160 
161  return {begSelected, endSelected};
162 }
163 
164 // helpers for per-algorithm timing information
165 namespace {
166 using Clock = std::chrono::high_resolution_clock;
167 using Duration = Clock::duration;
168 using Timepoint = Clock::time_point;
169 using Seconds = std::chrono::duration<double>;
170 using NanoSeconds = std::chrono::duration<double, std::nano>;
171 
172 // RAII-based stopwatch to time execution within a block
173 struct StopWatch {
174  Timepoint start;
175  Duration& store;
176 
177  StopWatch(Duration& s) : start(Clock::now()), store(s) {}
178  ~StopWatch() { store += Clock::now() - start; }
179 };
180 
// Convert duration to a printable string w/ reasonable unit.
//
// The value is printed in the largest unit out of s, ms, us, ns for which
// its magnitude is at least one, e.g. exactly one second is "1.000000 s".
template <typename D>
inline std::string asString(D duration) {
  // converting to a floating-point representation needs no duration_cast
  const std::chrono::duration<double, std::nano> asNanoSeconds = duration;
  const double ns = asNanoSeconds.count();
  const double magnitude = std::abs(ns);
  if (1e9 <= magnitude) {
    return std::to_string(ns / 1e9) + " s";
  } else if (1e6 <= magnitude) {
    return std::to_string(ns / 1e6) + " ms";
  } else if (1e3 <= magnitude) {
    return std::to_string(ns / 1e3) + " us";
  } else {
    return std::to_string(ns) + " ns";
  }
}

// Convert duration scaled to one event to a printable string.
//
// Without any processed events there is nothing to average over; a zero
// duration is reported in that case instead of dividing by zero.
template <typename D>
inline std::string perEvent(D duration, size_t numEvents) {
  if (numEvents == 0) {
    return asString(D::zero()) + "/event";
  }
  return asString(duration / numEvents) + "/event";
}
201 
202 // Store timing data
203 struct TimingInfo {
204  std::string identifier;
205  double time_total_s;
206  double time_perevent_s;
207 
208  DFE_NAMEDTUPLE(TimingInfo, identifier, time_total_s, time_perevent_s);
209 };
210 
211 void storeTiming(const std::vector<std::string>& identifiers,
212  const std::vector<Duration>& durations, std::size_t numEvents,
213  std::string path) {
214  dfe::NamedTupleTsvWriter<TimingInfo> writer(std::move(path), 4);
215  for (size_t i = 0; i < identifiers.size(); ++i) {
216  TimingInfo info;
217  info.identifier = identifiers[i];
218  info.time_total_s =
219  std::chrono::duration_cast<Seconds>(durations[i]).count();
220  info.time_perevent_s = info.time_total_s / numEvents;
221  writer.append(info);
222  }
223 }
224 } // namespace
225 
227  // measure overall wall clock
228  Timepoint clockWallStart = Clock::now();
229  // per-algorithm time measures
230  std::vector<std::string> names = listAlgorithmNames();
231  std::vector<Duration> clocksAlgorithms(names.size(), Duration::zero());
232  tbb::queuing_mutex clocksAlgorithmsMutex;
233 
234  // processing only works w/ a well-known number of events
235  // error message is already handled by the helper function
236  std::pair<size_t, size_t> eventsRange = determineEventsRange();
237  if ((eventsRange.first == SIZE_MAX) and (eventsRange.second == SIZE_MAX)) {
238  return EXIT_FAILURE;
239  }
240 
241  ACTS_INFO("Processing events [" << eventsRange.first << ", "
242  << eventsRange.second << ")");
243  ACTS_INFO("Starting event loop with " << m_cfg.numThreads << " threads");
244  ACTS_INFO(" " << m_services.size() << " services");
245  ACTS_INFO(" " << m_decorators.size() << " context decorators");
246  ACTS_INFO(" " << m_readers.size() << " readers");
247  ACTS_INFO(" " << m_algorithms.size() << " algorithms");
248  ACTS_INFO(" " << m_writers.size() << " writers");
249 
250  // run start-of-run hooks
251  for (auto& service : m_services) {
252  names.push_back("Service:" + service->name() + ":startRun");
253  clocksAlgorithms.push_back(Duration::zero());
254  StopWatch sw(clocksAlgorithms.back());
255  service->startRun();
256  }
257 
258  // execute the parallel event loop
259  tbb::task_scheduler_init init(m_cfg.numThreads);
260  tbb::parallel_for(
261  tbb::blocked_range<size_t>(eventsRange.first, eventsRange.second),
262  [&](const tbb::blocked_range<size_t>& r) {
263  std::vector<Duration> localClocksAlgorithms(names.size(),
264  Duration::zero());
265 
266  for (size_t event = r.begin(); event != r.end(); ++event) {
267  // Use per-event store
269  "EventStore#" + std::to_string(event), m_cfg.logLevel));
270  // If we ever wanted to run algorithms in parallel, this needs to be
271  // changed to Algorithm context copies
272  AlgorithmContext context(0, event, eventStore);
273  size_t ialgo = 0;
274 
275  // Prepare event store w/ service information
276  for (auto& service : m_services) {
277  StopWatch sw(localClocksAlgorithms[ialgo++]);
278  service->prepare(++context);
279  }
281  for (auto& cdr : m_decorators) {
282  StopWatch sw(localClocksAlgorithms[ialgo++]);
283  if (cdr->decorate(++context) != ProcessCode::SUCCESS) {
284  throw std::runtime_error("Failed to decorate event context");
285  }
286  }
287  // Read everything in
288  for (auto& rdr : m_readers) {
289  StopWatch sw(localClocksAlgorithms[ialgo++]);
290  if (rdr->read(++context) != ProcessCode::SUCCESS) {
291  throw std::runtime_error("Failed to read input data");
292  }
293  }
294  // Execute all algorithms
295  for (auto& alg : m_algorithms) {
296  StopWatch sw(localClocksAlgorithms[ialgo++]);
297  if (alg->execute(++context) != ProcessCode::SUCCESS) {
298  throw std::runtime_error("Failed to process event data");
299  }
300  }
301  // Write out results
302  for (auto& wrt : m_writers) {
303  StopWatch sw(localClocksAlgorithms[ialgo++]);
304  if (wrt->write(++context) != ProcessCode::SUCCESS) {
305  throw std::runtime_error("Failed to write output data");
306  }
307  }
308  ACTS_INFO("finished event " << event);
309  }
310 
311  // add timing info to global information
312  {
313  tbb::queuing_mutex::scoped_lock lock(clocksAlgorithmsMutex);
314  for (size_t i = 0; i < clocksAlgorithms.size(); ++i) {
315  clocksAlgorithms[i] += localClocksAlgorithms[i];
316  }
317  }
318  });
319 
320  // run end-of-run hooks
321  for (auto& wrt : m_writers) {
322  names.push_back("Writer:" + wrt->name() + ":endRun");
323  clocksAlgorithms.push_back(Duration::zero());
324  StopWatch sw(clocksAlgorithms.back());
325  if (wrt->endRun() != ProcessCode::SUCCESS) {
326  return EXIT_FAILURE;
327  }
328  }
329 
330  // summarize timing
331  Duration totalWall = Clock::now() - clockWallStart;
332  Duration totalReal = std::accumulate(
333  clocksAlgorithms.begin(), clocksAlgorithms.end(), Duration::zero());
334  size_t numEvents = eventsRange.second - eventsRange.first;
335  ACTS_INFO("Processed " << numEvents << " events in " << asString(totalWall)
336  << " (wall clock)");
337  ACTS_INFO("Average time per event: " << perEvent(totalReal, numEvents));
338  ACTS_DEBUG("Average time per algorithm:");
339  for (size_t i = 0; i < names.size(); ++i) {
340  ACTS_DEBUG(" " << names[i] << ": "
341  << perEvent(clocksAlgorithms[i], numEvents));
342  }
343  storeTiming(names, clocksAlgorithms, numEvents,
344  joinPaths(m_cfg.outputDir, "timing.tsv"));
345 
346  return EXIT_SUCCESS;
347 }