Skip to content
Snippets Groups Projects
Commit 3096c266 authored by Markus Blatt's avatar Markus Blatt
Browse files

Removed bug in accessing the send and receive buffers of

BufferedCommunicators:

In the MessageInformation the start (offset from beginning of the
buffer) has to be in number of values of type Type as we operate on a
Type* array. In contrary the size has to be in bytes, as MPI works on
void*.

[[Imported from SVN: r1884]]
parent 705c1040
Branches
Tags
No related merge requests found
......@@ -29,6 +29,7 @@ namespace Dune
/**
* @brief Flag for marking indexed data structures where data at
* each index is of the same size.
* @see VariableSize
*/
struct SizeOne
{};
......@@ -36,6 +37,7 @@ namespace Dune
/**
* @brief Flag for marking indexed data structures where the data at each index may
* be a variable multiple of another type.
* @see SizeOne
*/
struct VariableSize
{};
......@@ -99,7 +101,15 @@ namespace Dune
{};
/**
* @brief An utility class for communicating distributed data structures.
* @brief An utility class for communicating distributed data structures via MPI datatypes.
*
* This communicator creates special MPI datatypes that address the non contiguous elements
* to be send and received. The idea was to prevent the copying to an additional buffer and
* the mpi implementation decide whether to allocate buffers or use buffers offered by the
* interconnection network.
*
* Unfortunately the implementation of MPI datatypes seems to be poor. Therefore for most MPI
* implementations using a BufferedCommunicator will be more efficient.
*/
template<typename TG, typename TA, int N=100>
class DatatypeCommunicator : public InterfaceBuilder<TG,TA,N>
......@@ -241,7 +251,7 @@ namespace Dune
void createDataTypes(const T1& source, const T2& destination, V& data);
/**
* @brief initiates the sending and receive.
* @brief Initiates the sending and receive.
*/
void sendRecv(MPI_Request* req);
......@@ -250,6 +260,11 @@ namespace Dune
*/
struct IndexedTypeInformation
{
/**
* @brief Allocate space for setting up the MPI datatype.
*
* @param i The number of values the datatype will have.
*/
void build(int i)
{
length = new int[i];
......@@ -257,14 +272,27 @@ namespace Dune
size = i;
}
/**
* @brief Free the allocated space.
*/
void free()
{
delete[] length;
delete[] displ;
}
/** @brief The number of values at each index. */
int* length;
/** @brief The displacement at each index. */
MPI_Aint* displ;
/**
* @brief The number of elements we send.
* In case of variable sizes this will differ from
* size.
*/
int elements;
/**
* @param The number of indices in the data type.
*/
int size;
};
......@@ -276,13 +304,28 @@ namespace Dune
template<class V>
struct MPIDatatypeInformation
{
/**
* @brief Constructor.
* @param data The data we construct an MPI data type for.
*/
MPIDatatypeInformation(const V& data) : data_(data)
{}
/**
* @brief Reserver space for the information about the datatype.
* @param proc The rank of the process this information is for.
* @param size The number of indices the datatype will contain.
*/
void reserve(int proc, int size)
{
information_[proc].build(size);
}
/**
* @brief Add a new index to the datatype.
* @param proc The rank of the process this index is send to
* or received from.
* @param local The index to add.
*/
void add(int proc, int local)
{
IndexedTypeInformation& info=information_[proc];
......@@ -293,41 +336,194 @@ namespace Dune
info.elements++;
}
/**
* @brief The information about the datatypes to send to or
* receive from each process.
*/
std::map<int,IndexedTypeInformation> information_;
/**
* @brief A representative of the indexed data we send.
*/
const V& data_;
};
};
/**
* @brief A communicator that uses buffers to gather and scatter
* the data to be send or received.
*
* Before the data is sent it it copied to a consecutive buffer and
* then that buffer is sent.
* The data is received in another buffer and then copied to the actual
* position.
*/
template<typename TG, typename TA, int N=100>
class BufferedCommunicator
{
public:
/**
* @brief The type of the global index.
*/
typedef TG GlobalIndex;
/**
* @brief The type of the attributes.
*/
typedef TA Attribute;
/**
* @brief Constructor.
*/
BufferedCommunicator();
/**
* @brief Build the buffers and information for the communication process.
*
*
* @param interface The interface that defines what indices are to be communicated.
*/
template<class Data>
typename EnableIf<SameType<SizeOne,typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::Type
build(const Interface<TG,TA,N>& interface);
build(const Interface<GlobalIndex,Attribute,N>& interface);
/**
* @brief Build the buffers and information for the communication process.
*
* @param source The source in a forward send. The values will be copied from here to the send buffers.
* @param target The target in a forward send. The received values will be copied to here.
* @param interface The interface that defines what indices are to be communicated.
*/
template<class Data>
void build(const Data& source, const Data& dest, const Interface<TG,TA,N>& interface);
void build(const Data& source, const Data& target, const Interface<TG,TA,N>& interface);
/**
* @brief Send from source to target.
*
* The template parameter GatherScatter has to have a static method
* <pre>
* // Gather the data at index index of data
* static const typename CommPolicy<Data>::IndexedType>& gather(Data& data, int index);
*
* // Scatter the value at a index of data
* static void scatter(Data& data, typename CommPolicy<Data>::IndexedType> value,
* int index);
* </pre>
* in the case where CommPolicy<Data>::IndexedTypeFlag is SizeOne
* and
*
* <pre>
* static onst typename CommPolicy<Data>::IndexedType> gather(Data& data, int index, int subindex);
*
* static void scatter(Data& data, typename CommPolicy<Data>::IndexedType> value,
* int index, int subindex);
* </pre>
* in the case where CommPolicy<Data>::IndexedTypeFlag is VariableSize. Here subindex is the
* subindex of the block at index.
* @warning The source and target data have to have the same layout as the ones given
* to the build function in case of variable size values at the indices.
* @param source The values will be copied from here to the send buffers.
* @param target The received values will be copied to here.
*/
template<class GatherScatter, class Data>
void forward(Data& source, Data& dest);
void forward(const Data& source, Data& dest);
/**
* @brief Communicate in the reverse direction, i.e. send from target to source.
*
* The template parameter GatherScatter has to have a static method
* <pre>
* // Gather the data at index index of data
* static const typename CommPolicy<Data>::IndexedType>& gather(Data& data, int index);
*
* // Scatter the value at a index of data
* static void scatter(Data& data, typename CommPolicy<Data>::IndexedType> value,
* int index);
* </pre>
* in the case where CommPolicy<Data>::IndexedTypeFlag is SizeOne
* and
*
* <pre>
* static onst typename CommPolicy<Data>::IndexedType> gather(Data& data, int index, int subindex);
*
* static void scatter(Data& data, typename CommPolicy<Data>::IndexedType> value,
* int index, int subindex);
* </pre>
* in the case where CommPolicy<Data>::IndexedTypeFlag is VariableSize. Here subindex is the
* subindex of the block at index.
* @warning The source and target data have to have the same layout as the ones given
* to the build function in case of variable size values at the indices.
* @param target The values will be copied from here to the send buffers.
* @param source The received values will be copied to here.
*/
template<class GatherScatter, class Data>
void backward(Data& source, Data& dest);
void backward(Data& source, const Data& dest);
/**
* @brief Forward send where target and source are the same.
*
* The template parameter GatherScatter has to have a static method
* <pre>
* // Gather the data at index index of data
* static const typename CommPolicy<Data>::IndexedType>& gather(Data& data, int index);
*
* // Scatter the value at a index of data
* static void scatter(Data& data, typename CommPolicy<Data>::IndexedType> value,
* int index);
* </pre>
* in the case where CommPolicy<Data>::IndexedTypeFlag is SizeOne
* and
*
* <pre>
* static onst typename CommPolicy<Data>::IndexedType> gather(Data& data, int index, int subindex);
*
* static void scatter(Data& data, typename CommPolicy<Data>::IndexedType> value,
* int index, int subindex);
* </pre>
* in the case where CommPolicy<Data>::IndexedTypeFlag is VariableSize. Here subindex is the
* subindex of the block at index.
* @param data Source and target of the communication.
*/
template<class GatherScatter, class Data>
void forward(Data& data);
/**
* @brief Backward send where target and source are the same.
*
* The template parameter GatherScatter has to have a static method
* <pre>
* // Gather the data at index index of data
* static const typename CommPolicy<Data>::IndexedType>& gather(Data& data, int index);
*
* // Scatter the value at a index of data
* static void scatter(Data& data, typename CommPolicy<Data>::IndexedType> value,
* int index);
* </pre>
* in the case where CommPolicy<Data>::IndexedTypeFlag is SizeOne
* and
*
* <pre>
* static onst typename CommPolicy<Data>::IndexedType> gather(Data& data, int index, int subindex);
*
* static void scatter(Data& data, typename CommPolicy<Data>::IndexedType> value,
* int index, int subindex);
* </pre>
* in the case where CommPolicy<Data>::IndexedTypeFlag is VariableSize. Here subindex is the
* subindex of the block at index.
* @param data Source and target of the communication.
*/
template<class GatherScatter, class Data>
void backward(Data& data);
/**
* @brief Free the allocated memory (i.e. buffers and message information.
*/
void free();
/**
* @brief Destructor.
*/
~BufferedCommunicator();
private:
......@@ -339,16 +535,46 @@ namespace Dune
struct MessageSizeCalculator
{};
/**
* @brief Functor for message size caculation for datatypes
* where at each index is only one value.
*/
template<class Data>
struct MessageSizeCalculator<Data,SizeOne>
{
/**
* @brief Calculate the number of values in message
* @param info The information about the interface corresponding
* to the message.
* @return The number of values in th message.
*/
inline int operator()(const InterfaceInformation& info) const;
/**
* @brief Calculate the number of values in message
*
* @param info The information about the interface corresponding
* to the message.
* @param data ignored.
* @return The number of values in th message.
*/
inline int operator()(const Data& data, const InterfaceInformation& info) const;
};
/**
* @brief Functor for message size caculation for datatypes
* where at each index can be a variable number of values.
*/
template<class Data>
struct MessageSizeCalculator<Data,VariableSize>
{
/**
* @brief Calculate the number of values in message
*
* @param info The information about the interface corresponding
* to the message.
* @param data A representative of the data we send.
* @return The number of values in th message.
*/
inline int operator()(const Data& data, const InterfaceInformation& info) const;
};
......@@ -359,20 +585,74 @@ namespace Dune
struct MessageGatherer
{};
/**
* @brief Functor for message data gathering for datatypes
* where at each index is only one value.
*/
template<class Data, class GatherScatter, bool send>
struct MessageGatherer<Data,GatherScatter,send,SizeOne>
{
/** @brief The type of the values we send. */
typedef typename CommPolicy<Data>::IndexedType Type;
inline void operator()(const Interface<TG,TA,N>& interface, const Data& data, Type* buffer) const;
/**
* @brief The type of the functor that does the actual copying
* during the data Scattering.
*/
typedef GatherScatter Gatherer;
enum {
/**
* @brief The communication mode
*
**True if this was a forward commuication.
*/
forward=send
};
/**
* @brief Copies the values to send into the buffer.
* @param interface The interface used in the send.
* @param data The data from which we copy the values.
* @param buffer The send buffer to copy to.
* @param bufferSize The size of the buffer in bytes. For checks.
*/
inline void operator()(const Interface<TG,TA,N>& interface, const Data& data, Type* buffer, size_t bufferSize) const;
};
/**
* @brief Functor for message data scattering for datatypes
* where at each index can be a variable size of values
*/
template<class Data, class GatherScatter, bool send>
struct MessageGatherer<Data,GatherScatter,send,VariableSize>
{
/** @brief The type of the values we send. */
typedef typename CommPolicy<Data>::IndexedType Type;
inline void operator()(const Interface<TG,TA,N>& interface, const Data& data, Type* buffer) const;
/**
* @brief The type of the functor that does the actual copying
* during the data Scattering.
*/
typedef GatherScatter Gatherer;
enum {
/**
* @brief The communication mode
*
**True if this was a forward commuication.
*/
forward=send
};
/**
* @brief Copies the values to send into the buffer.
* @param interface The interface used in the send.
* @param data The data from which we copy the values.
* @param buffer The send buffer to copy to.
* @param bufferSize The size of the buffer in bytes. For checks.
*/
inline void operator()(const Interface<TG,TA,N>& interface, const Data& data, Type* buffer, size_t bufferSize) const;
};
/**
......@@ -382,36 +662,101 @@ namespace Dune
struct MessageScatterer
{};
/**
* @brief Functor for message data gathering for datatypes
* where at each index is only one value.
*/
template<class Data, class GatherScatter, bool send>
struct MessageScatterer<Data,GatherScatter,send,SizeOne>
{
/** @brief The type of the values we send. */
typedef typename CommPolicy<Data>::IndexedType Type;
/**
* @brief The type of the functor that does the actual copying
* during the data Scattering.
*/
typedef GatherScatter Scatterer;
enum {
/**
* @brief The communication mode
*
**True if this was a forward commuication.
*/
forward=send
};
/**
* @brief Copy the message data from the receive buffer to the data.
* @param interface The interface used in the send.
* @param data The data to which we copy the values.
* @param buffer The receive buffer to copy from.
* @param proc The rank of the process the message is from.
*/
inline void operator()(const Interface<TG,TA,N>& interface, Data& data, Type* buffer, const int& proc) const;
};
/**
* @brief Functor for message data scattering for datatypes
* where at each index can be a variable size of values
*/
template<class Data, class GatherScatter, bool send>
struct MessageScatterer<Data,GatherScatter,send,VariableSize>
{
/** @brief The type of the values we send. */
typedef typename CommPolicy<Data>::IndexedType Type;
/**
* @brief The type of the functor that does the actual copying
* during the data Scattering.
*/
typedef GatherScatter Scatterer;
enum {
/**
* @brief The communication mode
*
**True if this was a forward commuication.
*/
forward=send
};
/**
* @brief Copy the message data from the receive buffer to the data.
* @param interface The interface used in the send.
* @param data The data to which we copy the values.
* @param buffer The receive buffer to copy from.
* @param proc The rank of the process the message is from.
*/
inline void operator()(const Interface<TG,TA,N>& interface, Data& data, Type* buffer, const int& proc) const;
};
/**
* @brief Information about a message to send.
*/
struct MessageInformation
{
/** @brief Constructor. */
MessageInformation()
: start_(0), size_(0)
{}
/**
* @brief Constructor.
* @param start The start of the message in the global buffer.
* Not in bytes but in number of values from the beginning of
* the buffer
* @param size The size of the message in bytes.
*/
MessageInformation(size_t start, size_t size)
: start_(start), size_(size)
{}
/**
* @brief Start of the message in the buffer counted in bytes.
* @brief Start of the message in the buffer counted in number of value.
*/
size_t start_;
/**
* @brief Number of entries in message (not in bytes!).
* @brief Number of bytes in the message.
*/
size_t size_;
};
......@@ -432,6 +777,10 @@ namespace Dune
* @brief Communication buffers.
*/
char* buffers_[2];
/**
* @brief The size of the communication buffers
*/
size_t bufferSize_[2];
enum {
/**
......@@ -449,7 +798,7 @@ namespace Dune
* @brief Send and receive Data.
*/
template<class GatherScatter, bool FORWARD, class Data>
void sendRecv(Data& source, Data& d);
void sendRecv(const Data& source, Data& target);
};
......@@ -669,6 +1018,8 @@ namespace Dune
{
buffers_[0]=0;
buffers_[1]=0;
bufferSize_[0]=0;
bufferSize_[1]=0;
}
......@@ -680,24 +1031,31 @@ namespace Dune
typedef typename Interface<TG,TA,N>::InformationMap::const_iterator const_iterator;
typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
const const_iterator end = interface.interfaces().end();
int sendStart=0, recvStart=0;
int lrank;
MPI_Comm_rank(interface.communicator(), &lrank);
bufferSize_[0]=0;
bufferSize_[1]=0;
for(const_iterator interfacePair = interface.interfaces().begin();
interfacePair != end; ++interfacePair) {
int noSend = MessageSizeCalculator<Data,Flag>() (interfacePair->second.first);
int noRecv = MessageSizeCalculator<Data,Flag>() (interfacePair->second.second);
messageInformation_[interfacePair->first]=
std::make_pair(MessageInformation(sendStart,
noSend),
MessageInformation(recvStart,
noRecv));
sendStart += noSend;
recvStart += noRecv;
messageInformation_.insert(std::make_pair(interfacePair->first,
std::make_pair(MessageInformation(bufferSize_[0],
noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
MessageInformation(bufferSize_[1],
noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
bufferSize_[0] += noSend;
bufferSize_[1] += noRecv;
}
// allocate the buffers
buffers_[0] = new char[sendStart];
buffers_[1] = new char[recvStart];
bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
buffers_[0] = new char[bufferSize_[0]];
buffers_[1] = new char[bufferSize_[1]];
interface_ = &interface;
}
......@@ -709,24 +1067,29 @@ namespace Dune
typedef typename Interface<TG,TA,N>::InformationMap::const_iterator const_iterator;
typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
const const_iterator end = interface.interfaces().end();
int sendStart=0, recvStart=0;
bufferSize_[0]=0;
bufferSize_[1]=0;
for(const_iterator interfacePair = interface.interfaces().begin();
interfacePair != end; ++interfacePair) {
int noSend = MessageSizeCalculator<Data,Flag>() (source, interfacePair->second.first);
int noRecv = MessageSizeCalculator<Data,Flag>() (dest, interfacePair->second.second);
messageInformation_[interfacePair->first]=(std::make_pair(MessageInformation(sendStart,
noSend),
MessageInformation(recvStart,
noRecv)));
sendStart += noSend;
recvStart += noRecv;
messageInformation_.insert(std::make_pair(interfacePair->first,
std::make_pair(MessageInformation(bufferSize_[0],
noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
MessageInformation(bufferSize_[1],
noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
bufferSize_[0] += noSend;
bufferSize_[1] += noRecv;
}
bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
// allocate the buffers
buffers_[0] = new char[sendStart];
buffers_[1] = new char[recvStart];
buffers_[0] = new char[bufferSize_[0]];
buffers_[1] = new char[bufferSize_[1]];
interface_ = &interface;
}
......@@ -752,7 +1115,7 @@ namespace Dune
inline int BufferedCommunicator<TG,TA,N>::MessageSizeCalculator<Data,SizeOne>::operator()
(const InterfaceInformation& info) const
{
return info.size()*sizeof(typename CommPolicy<Data>::IndexedType);
return info.size();
}
template<typename TG, typename TA, int N>
......@@ -773,28 +1136,38 @@ namespace Dune
for(int i=0; i < info.size(); i++)
entries += CommPolicy<Data>::getSize(data,info[i]);
return entries * sizeof(typename CommPolicy<Data>::IndexedType);
return entries;
}
template<typename TG, typename TA, int N>
template<class Data, class GatherScatter, bool FORWARD>
inline void BufferedCommunicator<TG,TA,N>::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const Interface<TG,TA,N>& interface,const Data& data, Type* buffer) const
inline void BufferedCommunicator<TG,TA,N>::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const Interface<TG,TA,N>& interface,const Data& data, Type* buffer, size_t bufferSize) const
{
typedef typename Interface<TG,TA,N>::InformationMap::const_iterator
const_iterator;
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
const const_iterator end = interface.interfaces().end();
int offset=0;
size_t index=0;
for(const_iterator interfacePair = interface.interfaces().begin();
interfacePair != end; ++interfacePair) {
int size = forward ? interfacePair->second.first.size() :
interfacePair->second.second.size();
int proc = interfacePair->first();
for(int i=0, index=0; i < size; i++) {
for(int i=0; i < size; i++) {
int local = forward ? interfacePair->second->first[i] :
interfacePair->second->second[i];
for(int j=0; j < CommPolicy<Data>::getSize(data, local); j++, index++)
for(int j=0; j < CommPolicy<Data>::getSize(data, local); j++, index++) {
#ifdef DUNE_ISTL_WITH_CHECKING
assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
#endif
buffer[index]=GatherScatter::gather(data, local, j);
}
}
}
......@@ -802,19 +1175,27 @@ namespace Dune
template<typename TG, typename TA, int N>
template<class Data, class GatherScatter, bool FORWARD>
inline void BufferedCommunicator<TG,TA,N>::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const Interface<TG,TA,N>& interface, const Data& data, Type* buffer) const
inline void BufferedCommunicator<TG,TA,N>::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const Interface<TG,TA,N>& interface, const Data& data, Type* buffer, size_t bufferSize) const
{
typedef typename Interface<TG,TA,N>::InformationMap::const_iterator
const_iterator;
const const_iterator end = interface.interfaces().end();
size_t index = 0;
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
for(const_iterator interfacePair = interface.interfaces().begin();
interfacePair != end; ++interfacePair) {
size_t size = FORWARD ? interfacePair->second.first.size() :
interfacePair->second.second.size();
for(size_t i=0; i < size; i++) {
#ifdef DUNE_ISTL_WITH_CHECKING
assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
#endif
buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
interfacePair->second.second[i]);
}
......@@ -873,36 +1254,45 @@ namespace Dune
template<typename TG, typename TA, int N>
template<class GatherScatter, class Data>
void BufferedCommunicator<TG,TA,N>::forward(Data& source, Data& dest)
void BufferedCommunicator<TG,TA,N>::forward(const Data& source, Data& dest)
{
this->template sendRecv<GatherScatter,true>(source, dest);
}
template<typename TG, typename TA, int N>
template<class GatherScatter, class Data>
void BufferedCommunicator<TG,TA,N>::backward(Data& source, Data& dest)
void BufferedCommunicator<TG,TA,N>::backward(Data& source, const Data& dest)
{
this->template sendRecv<GatherScatter,false>(dest, source);
}
template<typename TG, typename TA, int N>
template<class GatherScatter, bool FORWARD, class Data>
void BufferedCommunicator<TG,TA,N>::sendRecv(Data& source, Data& dest)
void BufferedCommunicator<TG,TA,N>::sendRecv(const Data& source, Data& dest)
{
typedef typename CommPolicy<Data>::IndexedType Type;
int rank, lrank;
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
typedef typename CommPolicy<Data>::IndexedType Type;
Type *sendBuffer, *recvBuffer;
size_t sendBufferSize, recvBufferSize;
if(FORWARD) {
sendBuffer = reinterpret_cast<Type*>(buffers_[0]);
sendBufferSize = bufferSize_[0];
recvBuffer = reinterpret_cast<Type*>(buffers_[1]);
recvBufferSize = bufferSize_[1];
}else{
sendBuffer = reinterpret_cast<Type*>(buffers_[1]);
sendBufferSize = bufferSize_[1];
recvBuffer = reinterpret_cast<Type*>(buffers_[0]);
recvBufferSize = bufferSize_[0];
}
typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (*interface_, source, sendBuffer);
MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (*interface_, source, sendBuffer, sendBufferSize);
MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
......@@ -916,51 +1306,68 @@ namespace Dune
for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
processMap[i]=info->first;
if(FORWARD)
if(FORWARD) {
int proc = info->first;
int start = info->second.second.start_;
int size = info->second.second.size_;
assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
MPI_BYTE, info->first, commTag_, interface_->communicator(),
recvRequests+i);
else
}else{
assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
MPI_BYTE, info->first, commTag_, interface_->communicator(),
recvRequests+i);
}
}
// now the send requests
i=0;
for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
if(FORWARD)
if(FORWARD) {
assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
MPI_BYTE, info->first, commTag_, interface_->communicator(),
sendRequests+i);
else
}else{
assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
MPI_BYTE, info->first, commTag_, interface_->communicator(),
sendRequests+i);
}
// Wait for completion of receive and immediately start scatter
i=0;
int success=1;
int finished=-1;
int rank;
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
MPI_Status status[messageInformation_.size()];
MPI_Waitall(messageInformation_.size(), recvRequests, status);
int success = 1;
int finished = MPI_UNDEFINED;
MPI_Status status; //[messageInformation_.size()];
//MPI_Waitall(messageInformation_.size(), recvRequests, status);
for(i=0; i< messageInformation_.size(); i++) {
// int ret=MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
if(status[i].MPI_ERROR==MPI_SUCCESS) {
MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (*interface_, dest, recvBuffer, processMap[i]);
recvRequests[i]=MPI_REQUEST_NULL;
MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
assert(finished != MPI_UNDEFINED);
if(status.MPI_ERROR==MPI_SUCCESS) {
int& proc = processMap[finished];
typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
assert(infoIter != messageInformation_.end());
MessageInformation info = (FORWARD) ? infoIter->second.second : infoIter->second.first;
assert(info.start_+info.size_ <= recvBufferSize);
MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (*interface_, dest, recvBuffer+info.start_, proc);
}else{
std::cerr<<rank<<": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
success=0;
}
}
MPI_Status recvStatus;
// Wait for completion of sends
for(i=0; i< messageInformation_.size(); i++)
if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, status+i)) {
if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)) {
std::cerr<<rank<<": MPI_Error occurred while sending message to "<<processMap[finished]<<std::endl;
success=0;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment