ECCE @ EIC Software
G4MPImanager.cc
Go to the documentation of this file, or view the newest version of G4MPImanager.cc in the sPHENIX GitHub repository.
1 //
2 // ********************************************************************
3 // * License and Disclaimer *
4 // * *
5 // * The Geant4 software is copyright of the Copyright Holders of *
6 // * the Geant4 Collaboration. It is provided under the terms and *
7 // * conditions of the Geant4 Software License, included in the file *
8 // * LICENSE and available at http://cern.ch/geant4/license . These *
9 // * include a list of copyright holders. *
10 // * *
11 // * Neither the authors of this software system, nor their employing *
12 // * institutes,nor the agencies providing financial support for this *
13 // * work make any representation or warranty, express or implied, *
14 // * regarding this software system or assume any liability for its *
15 // * use. Please see the license in the file LICENSE and URL above *
16 // * for the full disclaimer and the limitation of liability. *
17 // * *
18 // * This code implementation is the result of the scientific and *
19 // * technical work of the GEANT4 collaboration. *
20 // * By using, copying, modifying or distributing the software (or *
21 // * any work based on the software) you agree to acknowledge its *
22 // * use in resulting scientific publications, and indicate your *
23 // * acceptance of all terms of the Geant4 Software license. *
24 // ********************************************************************
25 /// @file G4MPImanager.cc
26 /// @brief MPI manager class
27 
28 #include "mpi.h"
29 #include <getopt.h>
30 #include <stdio.h>
31 #include <time.h>
32 #include "G4Run.hh"
33 #include "G4RunManager.hh"
34 #include "G4StateManager.hh"
35 #include "G4UIcommand.hh"
36 #include "G4UImanager.hh"
37 #include "G4MPIbatch.hh"
38 #include "G4MPImanager.hh"
39 #include "G4MPImessenger.hh"
40 #include "G4MPIrandomSeedGenerator.hh"
41 #include "G4MPIsession.hh"
42 #include "G4MPIstatus.hh"
43 #include "G4MPIextraWorker.hh"
44 
45 G4MPImanager* G4MPImanager::g4mpi_ = NULL;
46 
47 // --------------------------------------------------------------------------
48 namespace {
49 
50 // wrappers for thread functions
51 void thread_ExecuteThreadCommand(const G4String* command)
52 {
53  G4MPImanager::GetManager()-> ExecuteThreadCommand(*command);
54 }
55 
56 // --------------------------------------------------------------------------
57 void Wait(G4int ausec)
58 {
59  struct timespec treq, trem;
60  treq.tv_sec = 0;
61  treq.tv_nsec = ausec*1000;
62 
63  nanosleep(&treq, &trem);
64 }
65 
66 } // end of namespace
67 
68 // --------------------------------------------------------------------------
69 G4MPImanager::G4MPImanager(int nof_extra_workers)
70  : verbose_(0),
71  COMM_G4COMMAND_(MPI_COMM_NULL), processing_comm_(MPI_COMM_NULL),
72  collecting_comm_(MPI_COMM_NULL), all_comm_(MPI_COMM_NULL),
73  qfcout_(false), qinitmacro_(false), qbatchmode_(false),
74  thread_id_(0), master_weight_(1.), nof_extra_workers_(nof_extra_workers)
75 {
76  //MPI::Init();
77  MPI::Init_thread(MPI::THREAD_SERIALIZED);
78  Initialize();
79 }
80 
81 // --------------------------------------------------------------------------
82 G4MPImanager::G4MPImanager(int argc, char** argv, int nof_extra_workers)
83  : verbose_(0),
84  COMM_G4COMMAND_(MPI_COMM_NULL), processing_comm_(MPI_COMM_NULL),
85  collecting_comm_(MPI_COMM_NULL), all_comm_(MPI_COMM_NULL),
86  qfcout_(false), qinitmacro_(false), qbatchmode_(false),
87  thread_id_(0), master_weight_(1.), nof_extra_workers_(nof_extra_workers)
88 {
89  //MPI::Init(argc, argv);
90  MPI::Init_thread(argc, argv, MPI::THREAD_SERIALIZED);
91  Initialize();
92  ParseArguments(argc, argv);
93 }
94 
95 // --------------------------------------------------------------------------
96 G4MPImanager::~G4MPImanager()
97 {
98  if( is_slave_ && qfcout_ ) fscout_.close();
99 
100  delete status_;
101  delete messenger_;
102  delete session_;
103 
104  if ( nof_extra_workers_ ) {
105  MPI_Group_free(&world_group_);
106  MPI_Group_free(&processing_group_);
107  MPI_Group_free(&collecting_group_);
108  MPI_Group_free(&all_group_);
109  if (processing_comm_ != MPI_COMM_NULL) {
110  MPI_Comm_free(&processing_comm_);
111  }
112  if (collecting_comm_ != MPI_COMM_NULL) {
113  MPI_Comm_free(&collecting_comm_);
114  }
115  if (all_comm_ != MPI_COMM_NULL) {
116  MPI_Comm_free(&all_comm_);
117  }
118  }
119  else {
120  COMM_G4COMMAND_.Free();
121  }
122 
123  MPI::Finalize();
124 }
125 
126 // --------------------------------------------------------------------------
127 G4MPImanager* G4MPImanager::GetManager()
128 {
129  if ( g4mpi_ == NULL ) {
130  G4Exception("G4MPImanager::GetManager()", "MPI001",
131  FatalException, "G4MPImanager is not instantiated.");
132  }
133  return g4mpi_;
134 }
135 
136 // --------------------------------------------------------------------------
137 void G4MPImanager::SetExtraWorker(G4VMPIextraWorker* extraWorker)
138 {
139  if ( ! nof_extra_workers_ ) {
140  G4Exception("G4MPImanager::SetExtraWorker()", "MPI001",
141  FatalException, "Number of extra workers >0 must be set first.");
142  }
143 
144  extra_worker_ = extraWorker;
145 }
146 
147 // --------------------------------------------------------------------------
148 void G4MPImanager::Initialize()
149 {
150  // G4cout << "G4MPImanager::Initialize" << G4endl;
151 
152  if ( g4mpi_ != NULL ) {
153  G4Exception("G4MPImanager::Initialize()", "MPI002",
154  FatalException, "G4MPImanager is already instantiated.");
155  }
156 
157  g4mpi_ = this;
158 
159  // get rank information
160  world_size_ = MPI::COMM_WORLD.Get_size();
161  if ( world_size_ - nof_extra_workers_ <= 0 ) {
162  G4Exception("G4MPImanager::SetExtraWorker()", "MPI001",
163  JustWarning, "Cannot reserve extra ranks: the MPI size is not sufficient.");
164  nof_extra_workers_ = 0;
165  }
166  size_ = world_size_ - nof_extra_workers_;
167  rank_ = MPI::COMM_WORLD.Get_rank();
168  is_master_ = (rank_ == kRANK_MASTER);
169  is_slave_ = (rank_ != kRANK_MASTER);
170  is_extra_worker_ = false;
171 
172  if ( nof_extra_workers_ ) {
173  // G4cout << "Extra workers requested" << G4endl;
174 
175  // Define three groups of workers: processing, collecting and all;
176  // if no extra workers are declared, all world ranks are processing ranks
177 
178  // MPI_Group world_group;
179  MPI_Comm_group(MPI_COMM_WORLD, &world_group_);
180 
181  // Group 1 - processing ranks
182  int* ranks1 = new int[size_];
183  for (int i=0; i<size_; i++) ranks1[i] = i;
184  // Construct a group containing all of the processing ranks in world_group
185  MPI_Group_incl(world_group_, size_, ranks1, &processing_group_);
186 
187  // Group 2 - collecting ranks
188  int* ranks2 = new int[nof_extra_workers_];
189  for (int i=0; i<nof_extra_workers_; i++) ranks2[i] = (world_size_ - nof_extra_workers_) + i;
190  // Construct a group containing all of the collecting ranks in world_group
191  MPI_Group_incl(world_group_, nof_extra_workers_, ranks2, &collecting_group_);
192 
193  // Group 3 - all ranks
194  int* ranks3 = new int[world_size_];
195  for (int i=0; i<world_size_; i++) ranks3[i] = i;
196  // Construct a group containing all of the ranks in world_group
197  MPI_Group_incl(world_group_, world_size_, ranks3, &all_group_);
198 
199  // Create new communicators based on the groups
200  MPI_Comm_create_group(MPI_COMM_WORLD, processing_group_, 0, &processing_comm_);
201  MPI_Comm_create_group(MPI_COMM_WORLD, collecting_group_, 0, &collecting_comm_);
202  MPI_Comm_create_group(MPI_COMM_WORLD, all_group_, 0, &all_comm_);
203 
204  // COMM_G4COMMAND_ = processing_comm_ copy
205  COMM_G4COMMAND_ = MPI::Intracomm(processing_comm_);
206 
207  } else {
208  // G4cout << "No extra workers requested" << G4endl;
209  // initialize MPI communicator
210  COMM_G4COMMAND_ = MPI::COMM_WORLD.Dup();
211  }
212 
213  is_extra_worker_ = (collecting_comm_ != MPI_COMM_NULL);
214 
215  // new G4MPI stuffs
216  messenger_ = new G4MPImessenger();
217  messenger_-> SetTargetObject(this);
218  session_ = new G4MPIsession;
219  status_ = new G4MPIstatus;
220 
221  if ( ! is_extra_worker_ ) {
222  // default seed generator is random generator.
223  seed_generator_ = new G4MPIrandomSeedGenerator();
224  DistributeSeeds();
225  }
226 
227  // print status of this worker
228  // G4cout << this << " world_size_ " << world_size_ << G4endl;
229  // G4cout << this << " size_ " << size_ << G4endl;
230  // G4cout << this << " nof_extra_workers_ " << nof_extra_workers_ << G4endl;
231  // G4cout << this << " is_master_ " << is_master_ << G4endl;
232  // G4cout << this << " is_slave_ " << is_slave_ << G4endl;
233  // G4cout << this << " is_extra_worker_ " << is_extra_worker_ << G4endl;
234  // G4cout << this << " is_processing_worker_ "
235  // << (processing_comm_ != MPI_COMM_NULL) << G4endl;
236 }
237 
238 // --------------------------------------------------------------------------
239 void G4MPImanager::ParseArguments(int argc, char** argv)
240 {
241  G4int qhelp = 0;
242  G4String ofprefix = "mpi";
243 
244  G4int c;
245  while ( 1 ) {
246  G4int option_index = 0;
247  static struct option long_options[] = {
248  {"help", no_argument, NULL, 'h'},
249  {"verbose", no_argument, NULL, 'v'},
250  {"init", required_argument, NULL, 'i'},
251  {"ofile", optional_argument, NULL, 'o'},
252  {NULL, 0, NULL, 0}
253  };
254 
255  opterr = 0; // suppress message
256  c = getopt_long(argc, argv, "hvi:o", long_options, &option_index);
257  opterr = 1;
258 
259  if( c == -1 ) break;
260 
261  switch (c) {
262  case 'h' :
263  qhelp = 1;
264  break;
265  case 'v' :
266  verbose_ = 1;
267  break;
268  case 'i' :
269  qinitmacro_ = true;
270  init_file_name_ = optarg;
271  break;
272  case 'o' :
273  qfcout_ = true;
274  if ( optarg ) ofprefix = optarg;
275  break;
276  default:
277  G4cerr << "*** invalid options specified." << G4endl;
278  std::exit(EXIT_FAILURE);
279  break;
280  }
281  }
282 
283  // show help
284  if ( qhelp ) {
285  if ( is_master_ ) ShowHelp();
286  MPI::Finalize();
287  std::exit(EXIT_SUCCESS);
288  }
289 
290  // file output
291  if( is_slave_ && qfcout_ ) {
292  G4String prefix = ofprefix + ".%03d" + ".cout";
293  char str[1024];
294  sprintf(str, prefix.c_str(), rank_);
295  G4String fname(str);
296  fscout_.open(fname.c_str(), std::ios::out);
297  }
298 
299  // non-option ARGV-elements ...
300  if ( optind < argc ) {
301  qbatchmode_ = true;
302  macro_file_name_ = argv[optind];
303  }
304 }
305 
306 // ====================================================================
307 void G4MPImanager::UpdateStatus()
308 {
309  G4RunManager* runManager = G4RunManager::GetRunManager();
310  const G4Run* run = runManager-> GetCurrentRun();
311 
312  G4int runid, eventid, neventTBP;
313 
314  G4StateManager* stateManager = G4StateManager::GetStateManager();
315  G4ApplicationState g4state = stateManager-> GetCurrentState();
316 
317  if ( run ) {
318  runid = run-> GetRunID();
319  neventTBP = run -> GetNumberOfEventToBeProcessed();
320  eventid = run-> GetNumberOfEvent();
321  if( g4state == G4State_GeomClosed || g4state == G4State_EventProc ) {
322  status_-> StopTimer();
323  }
324  } else {
325  runid = 0;
326  eventid = 0;
327  neventTBP = 0;
328  }
329 
330  status_-> SetStatus(rank_, runid, neventTBP, eventid, g4state);
331 }
332 
333 // --------------------------------------------------------------------------
334 void G4MPImanager::ShowStatus()
335 {
336  G4int buff[G4MPIstatus::kNSIZE];
337 
338  UpdateStatus();
339  G4bool gstatus = CheckThreadStatus();
340 
341  if ( is_master_ ) {
342  status_-> Print(); // for master itself
343 
344  G4int nev = status_-> GetEventID();
345  G4int nevtp = status_-> GetNEventToBeProcessed();
346  G4double cputime = status_-> GetCPUTime();
347 
348  // receive from each slave
349  for ( G4int islave = 1; islave < size_; islave++ ) {
350  COMM_G4COMMAND_.Recv(buff, G4MPIstatus::kNSIZE, MPI::INT,
351  islave, kTAG_G4STATUS);
352  status_-> UnPack(buff);
353  status_-> Print();
354 
355  // aggregation
356  nev += status_-> GetEventID();
357  nevtp += status_-> GetNEventToBeProcessed();
358  cputime += status_-> GetCPUTime();
359  }
360 
361  G4String strStatus;
362  if ( gstatus ) {
363  strStatus = "Run";
364  } else {
365  strStatus = "Idle";
366  }
367 
368  G4cout << "-------------------------------------------------------"
369  << G4endl
370  << "* #ranks= " << size_
371  << " event= " << nev << "/" << nevtp
372  << " state= " << strStatus
373  << " time= " << cputime << "s"
374  << G4endl;
375  } else {
376  status_-> Pack(buff);
377  COMM_G4COMMAND_.Send(buff, G4MPIstatus::kNSIZE, MPI::INT,
378  kRANK_MASTER, kTAG_G4STATUS);
379  }
380 }
381 
382 // ====================================================================
383 void G4MPImanager::DistributeSeeds()
384 {
385  // Do nothing if not processing worker
386  if ( is_extra_worker_ ) return;
387 
388  std::vector<G4long> seed_list = seed_generator_-> GetSeedList();
389  G4Random::setTheSeed(seed_list[rank_]);
390 }
391 
392 // --------------------------------------------------------------------------
393 void G4MPImanager::ShowSeeds()
394 {
395  G4long buff;
396 
397  if ( is_master_ ) {
398  // print master
399  G4cout << "* rank= " << rank_
400  << " seed= " << G4Random::getTheSeed()
401  << G4endl;
402  // receive from each slave
403  for ( G4int islave = 1; islave < size_; islave++ ) {
404  COMM_G4COMMAND_.Recv(&buff, 1, MPI::LONG, islave, kTAG_G4SEED);
405  G4cout << "* rank= " << islave
406  << " seed= " << buff
407  << G4endl;
408  }
409  } else { // slaves
410  buff = G4Random::getTheSeed();
411  COMM_G4COMMAND_.Send(&buff, 1, MPI::LONG, kRANK_MASTER, kTAG_G4SEED);
412  }
413 }
414 
415 // --------------------------------------------------------------------------
416 void G4MPImanager::SetSeed(G4int inode, G4long seed)
417 {
418  if( rank_ == inode ) {
419  G4Random::setTheSeed(seed);
420  }
421 }
422 
423 // ====================================================================
424 G4bool G4MPImanager::CheckThreadStatus()
425 {
426  unsigned buff;
427  unsigned qstatus = 0;
428 
429  if( is_master_ ) {
430  qstatus = (thread_id_ != 0);
431  // get slave status
432  for ( G4int islave = 1; islave < size_; islave++ ) {
433  MPI::Request request = COMM_G4COMMAND_.Irecv(&buff, 1, MPI::UNSIGNED,
434  islave, kTAG_G4STATUS);
435  while( ! request.Test() ) {
436  ::Wait(1000);
437  }
438  qstatus |= buff;
439  }
440  } else {
441  buff = (thread_id_ !=0);
442  COMM_G4COMMAND_.Send(&buff, 1, MPI::UNSIGNED, kRANK_MASTER, kTAG_G4STATUS);
443  }
444 
445  // broadcast
446  buff = qstatus; // for master
447  COMM_G4COMMAND_.Bcast(&buff, 1, MPI::UNSIGNED, kRANK_MASTER);
448  qstatus = buff; // for slave
449 
450  if ( qstatus != 0 ) return true;
451  else return false;
452 }
453 
454 // --------------------------------------------------------------------------
455 void G4MPImanager::ExecuteThreadCommand(const G4String& command)
456 {
457  // this method is a thread function.
458  G4UImanager* UI = G4UImanager::GetUIpointer();
459  G4int rc = UI-> ApplyCommand(command);
460 
461  G4int commandStatus = rc - (rc%100);
462 
463  switch( commandStatus ) {
464  case fCommandSucceeded:
465  break;
466  case fIllegalApplicationState:
467  G4cerr << "illegal application state -- command refused" << G4endl;
468  break;
469  default:
470  G4cerr << "command refused (" << commandStatus << ")" << G4endl;
471  break;
472  }
473 
474  // thread is joined
475  if ( thread_id_ ) {
476  pthread_join(thread_id_, 0);
477  thread_id_ = 0;
478  }
479 
480  return;
481 }
482 
483 // --------------------------------------------------------------------------
484 void G4MPImanager::ExecuteBeamOnThread(const G4String& command)
485 {
486  G4bool threadStatus = CheckThreadStatus();
487 
488  if (threadStatus) {
489  if ( is_master_ ) {
490  G4cout << "G4MPIsession:: beamOn is still running." << G4endl;
491  }
492  } else { // ok
493  static G4String cmdstr;
494  cmdstr = command;
495  G4int rc = pthread_create(&thread_id_, 0,
496  (Func_t)thread_ExecuteThreadCommand,
497  (void*)&cmdstr);
498  if (rc != 0)
499  G4Exception("G4MPImanager::ExecuteBeamOnThread()",
500  "MPI003", FatalException,
501  "Failed to create a beamOn thread.");
502  }
503 }
504 
505 // --------------------------------------------------------------------------
506 void G4MPImanager::JoinBeamOnThread()
507 {
508  if ( thread_id_ ) {
509  pthread_join(thread_id_, 0);
510  thread_id_ = 0;
511  }
512 }
513 
514 // ====================================================================
515 G4String G4MPImanager::BcastCommand(const G4String& command)
516 {
517  // Do nothing if not processing worker
518  if (is_extra_worker_) return G4String("exit");
519 
520  enum { kBUFF_SIZE = 512 };
521  static char sbuff[kBUFF_SIZE];
522  command.copy(sbuff, kBUFF_SIZE);
523  G4int len = command.size();
524  sbuff[len] ='\0'; // no boundary check
525 
526  // "command" is not yet fixed in slaves at this time.
527 
528  // waiting message exhausts CPU in LAM!
529  //COMM_G4COMMAND_.Bcast(sbuff, ssize, MPI::CHAR, RANK_MASTER);
530 
531  // another implementation
532  if( is_master_ ) {
533  for ( G4int islave = 1; islave < size_; islave++ ) {
534  COMM_G4COMMAND_.Send(sbuff, kBUFF_SIZE, MPI::CHAR,
535  islave, kTAG_G4COMMAND);
536  }
537  } else {
538  // try non-blocking receive
539  MPI::Request request= COMM_G4COMMAND_.Irecv(sbuff, kBUFF_SIZE, MPI::CHAR,
540  kRANK_MASTER, kTAG_G4COMMAND);
541  // polling...
542  while(! request.Test()) {
543  ::Wait(1000);
544  }
545  }
546 
547  return G4String(sbuff);
548 }
549 
550 // ====================================================================
551 void G4MPImanager::ExecuteMacroFile(const G4String& fname, G4bool qbatch)
552 {
553  G4bool currentmode = qbatchmode_;
554  qbatchmode_ = true;
555  G4MPIbatch* batchSession = new G4MPIbatch(fname, qbatch);
556  batchSession-> SessionStart();
557  delete batchSession;
558  qbatchmode_ = currentmode;
559 }
560 
561 // --------------------------------------------------------------------------
562 void G4MPImanager::BeamOn(G4int nevent, G4bool qdivide)
563 {
564  // G4cout << "G4MPImanager::BeamOn " << nevent << G4endl;
565 
566 #ifndef G4MULTITHREADED
567  G4RunManager* runManager = G4RunManager::GetRunManager();
568 #endif
569 
570  if ( qdivide ) { // events are divided
571  G4double ntot = master_weight_ + size_ - 1.;
572  G4int nproc = G4int(nevent/ntot);
573  G4int nproc0 = nevent - nproc*(size_ - 1);
574 
575  if ( verbose_ > 0 && is_master_ ) {
576  G4cout << "#events in master=" << nproc0 << " / "
577  << "#events in slave=" << nproc << G4endl;
578  }
579 
580  status_-> StartTimer(); // start timer
581 
582 #ifdef G4MULTITHREADED
583  G4String str_nevt;
584  if ( is_master_ ) str_nevt = G4UIcommand::ConvertToString(nproc0);
585  else str_nevt = G4UIcommand::ConvertToString(nproc);
586  G4UImanager* UI = G4UImanager::GetUIpointer();
587  UI-> ApplyCommand("/run/beamOn " + str_nevt);
588 #else
589  if ( is_master_ ) runManager-> BeamOn(nproc0);
590  else runManager-> BeamOn(nproc);
591 #endif
592 
593  status_-> StopTimer(); // stop timer
594 
595  } else { // same events are generated in each node (for test use)
596  if( verbose_ > 0 && is_master_ ) {
597  G4cout << "#events in master=" << nevent << " / "
598  << "#events in slave=" << nevent << G4endl;
599  }
600  status_-> StartTimer(); // start timer
601 
602 #ifdef G4MULTITHREADED
603  G4String str_nevt = G4UIcommand::ConvertToString(nevent);
604  G4UImanager* UI = G4UImanager::GetUIpointer();
605  UI-> ApplyCommand("/run/beamOn " + str_nevt);
606 #else
607  runManager-> BeamOn(nevent);
608 #endif
609 
610  status_-> StopTimer(); // stop timer
611  }
612 }
613 
614 // --------------------------------------------------------------------------
615 void G4MPImanager::WaitBeamOn()
616 {
617  // G4cout << "G4MPImanager::WaitBeamOn" << G4endl;
618 
619  // Extra worker
620  if (is_extra_worker_) {
621  if ( extra_worker_ ) {
622  G4cout << "Calling extra_worker " << G4endl;
623  extra_worker_->BeamOn();
624  } else {
625  G4cout << " !!!! extra_worker_ is not defined " << G4endl;
626  }
627  return;
628  }
629 
630  G4int buff = 0;
631  if ( qbatchmode_ ) { // valid only in batch mode
632  if ( is_master_ ) {
633  // receive from each slave
634  for (G4int islave = 1; islave < size_; islave++) {
635  // G4cout << "calling Irecv for islave " << islave << G4endl;
636  MPI::Request request = COMM_G4COMMAND_.Irecv(&buff, 1, MPI::INT,
637  islave, kTAG_G4STATUS);
638  while(! request.Test()) {
639  ::Wait(1000);
640  }
641  }
642  } else {
643  buff = 1;
644  // G4cout << "calling send for i " << kRANK_MASTER << G4endl;
645  COMM_G4COMMAND_.Send(&buff, 1, MPI::INT, kRANK_MASTER, kTAG_G4STATUS);
646  }
647  }
648 }
649 
650 // --------------------------------------------------------------------------
651 void G4MPImanager::Print(const G4String& message)
652 {
653  if ( is_master_ ){
654  std::cout << message << std::flush;
655  } else {
656  if ( qfcout_ ) { // output to a file
657  fscout_ << message << std::flush;
658  } else { // output to stdout
659  std::cout << rank_ << ":" << message << std::flush;
660  }
661  }
662 }
663 
664 // --------------------------------------------------------------------------
665 void G4MPImanager::ShowHelp() const
666 {
667  if (is_slave_ ) return;
668 
669  G4cout << "Geant4 MPI interface" << G4endl;
670  G4cout << "usage:" << G4endl;
671  G4cout << "<app> [options] [macro file]"
672  << G4endl << G4endl;
673  G4cout << " -h, --help show this message."
674  << G4endl;
675  G4cout << " -v, --verbose show verbose message"
676  << G4endl;
677  G4cout << " -i, --init=FNAME set an init macro file"
678  << G4endl;
679  G4cout << " -o, --ofile[=FNAME] set slave output to a file"
680  << G4endl;
681  G4cout << G4endl;
682 }
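
For orientation, below is a minimal, hedged sketch of how an application might drive this manager, inferred from the constructors, the G4MPIsession member, and the MPI::Finalize() call in the destructor shown above. The GetMPIsession() accessor is assumed to come from G4MPImanager.hh (it does not appear in this source file), and MyDetectorConstruction, MyPhysicsList and MyActionInitialization are hypothetical user classes named only for illustration.

#include "G4MPImanager.hh"
#include "G4MPIsession.hh"
#include "G4RunManager.hh"

int main(int argc, char** argv)
{
  // Construct the MPI manager first: the constructor calls
  // MPI::Init_thread() and ParseArguments() handles -h/-v/-i/-o.
  G4MPImanager* g4MPI = new G4MPImanager(argc, argv);

  // GetMPIsession() is assumed from the class header; the MPI session
  // replaces the usual terminal UI session on every rank.
  G4MPIsession* session = g4MPI->GetMPIsession();

  G4RunManager* runManager = new G4RunManager;
  // runManager->SetUserInitialization(new MyDetectorConstruction);  // hypothetical
  // runManager->SetUserInitialization(new MyPhysicsList);           // hypothetical
  // runManager->SetUserInitialization(new MyActionInitialization);  // hypothetical

  // The master rank reads commands (interactively or from a macro file
  // given on the command line) and distributes them to the slaves,
  // as done by BcastCommand() above.
  session->SessionStart();

  delete runManager;
  delete g4MPI;  // ~G4MPImanager() frees the communicators and calls MPI::Finalize()
  return 0;
}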