G4VUserMPIrunMerger.cc
//
// ********************************************************************
// * License and Disclaimer                                           *
// *                                                                  *
// * The Geant4 software is copyright of the Copyright Holders of     *
// * the Geant4 Collaboration. It is provided under the terms and     *
// * conditions of the Geant4 Software License, included in the file  *
// * LICENSE and available at http://cern.ch/geant4/license . These   *
// * include a list of copyright holders.                             *
// *                                                                  *
// * Neither the authors of this software system, nor their employing *
// * institutes, nor the agencies providing financial support for     *
// * this work make any representation or warranty, express or        *
// * implied, regarding this software system or assume any liability  *
// * for its use. Please see the license in the file LICENSE and URL  *
// * above for the full disclaimer and the limitation of liability.   *
// *                                                                  *
// * This code implementation is the result of the scientific and     *
// * technical work of the GEANT4 collaboration.                      *
// * By using, copying, modifying or distributing the software (or    *
// * any work based on the software) you agree to acknowledge its     *
// * use in resulting scientific publications, and indicate your      *
// * acceptance of all terms of the Geant4 Software license.          *
// ********************************************************************
//
#include "G4VUserMPIrunMerger.hh"
#include <mpi.h>
#include <assert.h>
#include <algorithm>
#include <functional>
#include "G4MPIutils.hh"
#include "G4MPImanager.hh"

G4VUserMPIrunMerger::G4VUserMPIrunMerger(const G4Run* aRun,
                                         G4int destination,
                                         G4int ver) :
  outputBuffer(nullptr),outputBufferSize(0),outputBufferPosition(0),
  ownsBuffer(false),
  destinationRank(destination),
  run(const_cast<G4Run*>(aRun)),
  commSize(0),
  verbose(ver),
  bytesSent(0) {}

#define DMSG( LVL , MSG ) { if ( verbose > LVL ) { G4cout << MSG << G4endl; } }

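// Send(): packs the registered user data, plus the local event count, into a
// single MPI_PACKED buffer and ships it to 'destination' with tag kTAG_RUN.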
void G4VUserMPIrunMerger::Send(const unsigned int destination)
{
  assert(run!=nullptr);
  G4int nevts = run->GetNumberOfEvent();
  DMSG( 1 , "G4VUserMPIrunMerger::Send() : Sending a G4Run ("
            <<run<<") with "<<nevts<<" events to: "<<destination);
  input_userdata.clear();
  Pack();//User code
  InputUserData(&nevts,MPI::INT,1);

  DestroyBuffer();
  G4int newbuffsize = 0;
  for ( const const_registered_data& el : input_userdata ) {
    newbuffsize += (el.dt.Get_size()*el.count);
  }
  char* buffer = new char[newbuffsize];
  //Avoid complaints from valgrind (not entirely sure why this is needed, but,
  //besides the small CPU penalty, we can live with it).
  std::fill(buffer,buffer+newbuffsize,0);
  ownsBuffer=true;
  SetupOutputBuffer(buffer,newbuffsize,0);
  DMSG(3,"Buffer size: "<<newbuffsize<<" bytes at: "<<(void*)outputBuffer);

  //Now userdata contains all data to be sent, do the real packing
  for ( const const_registered_data& el : input_userdata ) {
#ifdef G4MPI_USE_MPI_PACK_NOT_CONST
    MPI_Pack(const_cast<void*>(el.p_data),el.count,el.dt,
#else
    MPI_Pack(el.p_data,el.count,el.dt,
#endif
             outputBuffer,outputBufferSize,
             &outputBufferPosition,COMM_G4COMMAND_);
  }
  assert(outputBufferSize==outputBufferPosition);
  COMM_G4COMMAND_.Send(outputBuffer , outputBufferSize , MPI::PACKED ,
                       destination , G4MPImanager::kTAG_RUN);
  bytesSent += outputBufferSize;
  DMSG(2 , "G4VUserMPIrunMerger::Send() : Done ");
}
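// Receive(): probes the incoming message to size the receive buffer, receives
// the packed data, unpacks it into a new G4Run and merges that into the
// local run.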
void G4VUserMPIrunMerger::Receive(const unsigned int source)
{
  const MPI::Intracomm* parentComm = G4MPImanager::GetManager()->GetComm();
  DMSG( 1 , "G4VUserMPIrunMerger::Receive(...) , this rank : "
            <<parentComm->Get_rank()<<" and receiving from : "<<source);
  //DestroyBuffer();
  //Receive from all but one
  //for (G4int rank = 0; rank < commSize-1; ++rank)
  //{
  MPI::Status status;
  COMM_G4COMMAND_.Probe(source, G4MPImanager::kTAG_RUN, status);
  //const G4int source = status.Get_source();
  const G4int newbuffsize = status.Get_count(MPI::PACKED);
  DMSG(2,"Preparing to receive buffer of size: "<<newbuffsize);
  char* buffer = outputBuffer;
  if ( newbuffsize > outputBufferSize ) {
    DMSG(3,"New larger buffer expected, resize");
    //New larger buffer incoming, recreate buffer
    delete[] outputBuffer;
    buffer = new char[newbuffsize];
    //Avoid complaints from valgrind (not entirely sure why this is needed,
    //but, besides the small CPU penalty, we can live with it).
    std::fill(buffer,buffer+newbuffsize,0);
    ownsBuffer = true;
  }
  SetupOutputBuffer(buffer,newbuffsize,0);
  COMM_G4COMMAND_.Recv(buffer, newbuffsize, MPI::PACKED, source,
                       G4MPImanager::kTAG_RUN, status);
  DMSG(3,"Buffer size: "<<outputBufferSize<<" bytes at: "<<(void*)outputBuffer);
  output_userdata.clear();
  //User code: if implemented, returns the concrete G4Run class
  G4Run* aNewRun = UnPack();
  if ( aNewRun == nullptr ) aNewRun = new G4Run;
  //Add number of events counter
  G4int nevts = 0;
  OutputUserData(&nevts,MPI::INT,1);
  //Now userdata contains all data references, do the real unpacking
  for ( const registered_data& el : output_userdata ) {
    MPI_Unpack(outputBuffer,outputBufferSize,&outputBufferPosition,
               el.p_data,el.count,el.dt,COMM_G4COMMAND_);
  }
  for ( G4int i = 0 ; i<nevts ; ++i ) aNewRun->RecordEvent(nullptr);

  //Now merge the received MPI run with the global one
  DMSG(2,"Before G4Run::Merge : "<<run->GetNumberOfEvent());
  run->Merge( aNewRun );
  DMSG(2,"After G4Run::Merge : "<<run->GetNumberOfEvent());
  delete aNewRun;
  //}
}
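// Merge(): called on every rank; delegates the pairing of senders and
// receivers to G4mpi::Merge (G4MPIutils), which optimizes the communication
// pattern so that the merged run ends up on destinationRank.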
void G4VUserMPIrunMerger::Merge()
{
  // G4cout << "G4VUserMPIrunMerger::Merge called" << G4endl;

  DMSG(0, "G4VUserMPIrunMerger::Merge called");
  const MPI::Intracomm* parentComm = G4MPImanager::GetManager()->GetComm();
  const unsigned int myrank = parentComm->Get_rank();
  commSize = G4MPImanager::GetManager()->GetActiveSize();
  // do not include extra worker in this communication

  if ( commSize == 1 ) {
    DMSG(1,"Comm world size is 1, nothing to do");
    return;
  }
  COMM_G4COMMAND_ = parentComm->Dup();
  bytesSent = 0;
  const G4double sttime = MPI::Wtime();

  //Use G4MPIutils to optimize communications between ranks
  typedef std::function<void(unsigned int)> handler_t;
  using std::placeholders::_1;
  handler_t sender = std::bind(&G4VUserMPIrunMerger::Send , this , _1);
  handler_t receiver = std::bind(&G4VUserMPIrunMerger::Receive, this, _1);
  std::function<void(void)> barrier =
      std::bind(&MPI::Intracomm::Barrier,&COMM_G4COMMAND_);
  // G4cout << "go to G4mpi::Merge" << G4endl;
  G4mpi::Merge( sender , receiver , barrier , commSize , myrank );

  //OLD style p2p communications
/*
  if ( myrank != destinationRank ) {
    DMSG(0,"Comm world size: "<<commSize<<" this rank is: "
           <<myrank<<" sending to rank "<<destinationRank);
    Send(destinationRank);
  } else {
    DMSG(1,"Comm world size: "<<commSize<<" this rank is: "
           <<myrank<<" receiving. ");
    for ( unsigned int i = 0 ; i<commSize ; ++i) {
      if ( i != myrank ) Receive(i);
    }
  }
*/
  const G4double elapsed = MPI::Wtime() - sttime;
  long total=0;
  COMM_G4COMMAND_.Reduce(&bytesSent,&total,1,MPI::LONG,MPI::SUM,
                         destinationRank);
  if ( verbose > 0 && myrank == destinationRank ) {
    //Collect from all ranks how much data was sent around
    G4cout<<"G4VUserMPIrunMerger::Merge() - data transfer performance: "
          <<double(total)/1000./elapsed<<" kB/s"
          <<" (Total Data Transfer= "<<double(total)/1000<<" kB in "
          <<elapsed<<" s)."<<G4endl;
  }

  COMM_G4COMMAND_.Free();
  DMSG(0,"G4VUserMPIrunMerger::Merge done");
}
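
For reference, here is a minimal sketch of a concrete merger, modeled on the RunMerger pattern used in the Geant4 MPI examples. The names MyRun, MyRunMerger and fCounter are illustrative and not part of this file: Pack() registers the members that Send() serializes, while UnPack() creates an empty run and registers the locations that Receive() fills before merging.

// Illustrative sketch only (not part of G4VUserMPIrunMerger.cc).
#include "G4VUserMPIrunMerger.hh"
#include "G4Run.hh"
#include <mpi.h>

// Hypothetical concrete run: a G4Run carrying one extra counter.
class MyRun : public G4Run
{
  public:
    G4int fCounter = 0;
    void Merge(const G4Run* aRun) override
    {
      fCounter += static_cast<const MyRun*>(aRun)->fCounter;
      G4Run::Merge(aRun);
    }
};

class MyRunMerger : public G4VUserMPIrunMerger
{
  public:
    explicit MyRunMerger(const MyRun* aRun)
      : G4VUserMPIrunMerger(aRun), fRun(const_cast<MyRun*>(aRun)) {}

  protected:
    void Pack() override
    {
      // Register each member to be serialized by Send().
      InputUserData(&fRun->fCounter, MPI::INT, 1);
    }

    G4Run* UnPack() override
    {
      // Create an empty run and register where MPI_Unpack should write;
      // Receive() fills the data and then merges it into the local run.
      MyRun* anEmptyRun = new MyRun;
      fRun = anEmptyRun;
      OutputUserData(&fRun->fCounter, MPI::INT, 1);
      return anEmptyRun;
    }

  private:
    MyRun* fRun;
};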