36 outputBuffer(nullptr),outputBufferSize(0),outputBufferPosition(0),
38 destinationRank(destination),
39 run(const_cast<
G4Run*>(aRun)),
44 #define DMSG( LVL , MSG ) { if ( verbose > LVL ) { G4cout << MSG << G4endl; } }
50 DMSG( 1 ,
"G4VUserMPIrunMerger::Send() : Sending a G4run ("
51 <<run<<
") with "<<nevts<<
" events to: "<<destination);
52 input_userdata.clear();
57 G4int newbuffsize = 0;
58 for (
const const_registered_data& el : input_userdata ) {
59 newbuffsize += (el.dt.Get_size()*el.count);
61 char*
buffer =
new char[newbuffsize];
64 std::fill(buffer,buffer+newbuffsize,0);
67 DMSG(3,
"Buffer size: "<<newbuffsize<<
" bytes at: "<<(
void*)outputBuffer);
70 for (
const const_registered_data& el : input_userdata ) {
71 #ifdef G4MPI_USE_MPI_PACK_NOT_CONST
72 MPI_Pack(const_cast<void*>(el.p_data),el.count,el.dt,
76 outputBuffer,outputBufferSize,
77 &outputBufferPosition,COMM_G4COMMAND_);
79 assert(outputBufferSize==outputBufferPosition);
80 COMM_G4COMMAND_.Send(outputBuffer , outputBufferSize , MPI::PACKED ,
82 bytesSent+=outputBufferSize;
83 DMSG(2 ,
"G4VUserMPIrunMerger::Send() : Done ");
89 DMSG( 1 ,
"G4VUserMPIrunMerger::Receive(...) , this rank : "
90 <<MPI::COMM_WORLD.Get_rank()<<
" and receiving from : "<<source);
98 const G4int newbuffsize = status.Get_count(MPI::PACKED);
99 DMSG(2,
"Preparing to receive buffer of size: "<<newbuffsize);
100 char*
buffer = outputBuffer;
101 if ( newbuffsize > outputBufferSize ) {
102 DMSG(3,
"New larger buffer expected, resize");
104 delete[] outputBuffer;
105 buffer =
new char[newbuffsize];
108 std::fill(buffer,buffer+newbuffsize,0);
112 COMM_G4COMMAND_.Recv(buffer, newbuffsize, MPI::PACKED,source,
114 DMSG(3,
"Buffer Size: "<<outputBufferSize<<
" bytes at: "<<(
void*)outputBuffer);
115 output_userdata.clear();
118 if ( aNewRun ==
nullptr ) aNewRun =
new G4Run;
123 for (
const registered_data& el : output_userdata ) {
124 MPI_Unpack(outputBuffer,outputBufferSize,&outputBufferPosition,
125 el.p_data,el.count,el.dt,COMM_G4COMMAND_);
131 run->
Merge( aNewRun );
139 DMSG(0,
"G4VUserMPIrunMerger::Merge called");
140 const unsigned int myrank = MPI::COMM_WORLD.Get_rank();
141 commSize = MPI::COMM_WORLD.Get_size();
142 if ( commSize == 1 ) {
143 DMSG(1,
"Comm world size is 1, nothing to do");
146 COMM_G4COMMAND_ = MPI::COMM_WORLD.Dup();
148 const G4double sttime = MPI::Wtime();
151 typedef std::function<void(unsigned int)> handler_t;
152 using std::placeholders::_1;
155 std::function<void(void)> barrier =
156 std::bind(&MPI::Intracomm::Barrier,&COMM_G4COMMAND_);
157 G4mpi::Merge( sender , receiver , barrier , commSize , myrank );
174 const G4double elapsed = MPI::Wtime() - sttime;
176 COMM_G4COMMAND_.Reduce(&bytesSent,&total,1,MPI::LONG,MPI::SUM,
178 if ( verbose > 0 && myrank == destinationRank ) {
180 G4cout<<
"G4VUserMPIrunMerger::Merge() - data transfer performances: "
181 <<double(total)/1000./elapsed<<
" kB/s"
182 <<
" (Total Data Transfer= "<<double(total)/1000<<
" kB in "
183 <<elapsed<<
" s)."<<
G4endl;
188 COMM_G4COMMAND_.Free();
189 DMSG(0,
"G4VUserMPIrunMerger::Merge done");
int MPI_Unpack(const void *, int, int *, void *, int, MPI_Datatype, MPI_Comm)
virtual void Merge(const G4Run *)
void Merge(std::function< void(unsigned int)> senderF, std::function< void(unsigned int)> receiverF, std::function< void(void)> barrierF, unsigned int commSize, unsigned int myrank)
G4GLOB_DLL std::ostream G4cout
G4int GetNumberOfEvent() const
int MPI_Pack(const void *, int, MPI_Datatype, void *, int, int *, MPI_Comm)
virtual G4Run * UnPack()=0
void Send(const unsigned int destination)
G4double total(Particle const *const p1, Particle const *const p2)
void Receive(const unsigned int source)
void OutputUserData(void *input_data, const MPI::Datatype &dt, int count)
virtual void RecordEvent(const G4Event *)
static G4String Status(G4StepStatus stps)
void SetupOutputBuffer(char *buff, G4int size, G4int position)
void InputUserData(void *input_data, const MPI::Datatype &dt, int count)