Index: src/mpi/model/mpi-interface.cc |
=================================================================== |
--- a/src/mpi/model/mpi-interface.cc |
+++ b/src/mpi/model/mpi-interface.cc |
@@ -1,5 +1,7 @@ |
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ |
/* |
+ * Copyright 2013. Lawrence Livermore National Security, LLC. |
+ * |
* This program is free software; you can redistribute it and/or modify |
* it under the terms of the GNU General Public License version 2 as |
* published by the Free Software Foundation; |
@@ -13,297 +15,115 @@ |
* along with this program; if not, write to the Free Software |
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
* |
- * Author: George Riley <riley@ece.gatech.edu> |
+ * Author: Steven Smith <smith84@llnl.gov> |
+ * |
*/ |
-// This object contains static methods that provide an easy interface |
-// to the necessary MPI information. |
+#include "mpi-interface.h" |
-#include <iostream> |
-#include <iomanip> |
-#include <list> |
+#include <ns3/global-value.h> |
+#include <ns3/string.h> |
+#include <ns3/log.h> |
-#include "mpi-interface.h" |
-#include "mpi-receiver.h" |
+#include "null-message-mpi-interface.h" |
+#include "granted-time-window-mpi-interface.h" |
-#include "ns3/node.h" |
-#include "ns3/node-list.h" |
-#include "ns3/net-device.h" |
-#include "ns3/simulator.h" |
-#include "ns3/simulator-impl.h" |
-#include "ns3/nstime.h" |
- |
-#ifdef NS3_MPI |
-#include <mpi.h> |
-#endif |
+NS_LOG_COMPONENT_DEFINE ("MpiInterface"); |
namespace ns3 { |
-SentBuffer::SentBuffer () |
-{ |
- m_buffer = 0; |
- m_request = 0; |
-} |
- |
-SentBuffer::~SentBuffer () |
-{ |
- delete [] m_buffer; |
-} |
- |
-uint8_t* |
-SentBuffer::GetBuffer () |
-{ |
- return m_buffer; |
-} |
- |
-void |
-SentBuffer::SetBuffer (uint8_t* buffer) |
-{ |
- m_buffer = buffer; |
-} |
- |
-#ifdef NS3_MPI |
-MPI_Request* |
-SentBuffer::GetRequest () |
-{ |
- return &m_request; |
-} |
-#endif |
- |
-uint32_t MpiInterface::m_sid = 0; |
-uint32_t MpiInterface::m_size = 1; |
-bool MpiInterface::m_initialized = false; |
-bool MpiInterface::m_enabled = false; |
-uint32_t MpiInterface::m_rxCount = 0; |
-uint32_t MpiInterface::m_txCount = 0; |
-std::list<SentBuffer> MpiInterface::m_pendingTx; |
- |
-#ifdef NS3_MPI |
-MPI_Request* MpiInterface::m_requests; |
-char** MpiInterface::m_pRxBuffers; |
-#endif |
+ParallelCommunicationInterface* MpiInterface::g_parallelCommunicationInterface = 0; |
void |
MpiInterface::Destroy () |
{ |
-#ifdef NS3_MPI |
- for (uint32_t i = 0; i < GetSize (); ++i) |
- { |
- delete [] m_pRxBuffers[i]; |
- } |
- delete [] m_pRxBuffers; |
- delete [] m_requests; |
- |
- m_pendingTx.clear (); |
-#endif |
-} |
- |
-uint32_t |
-MpiInterface::GetRxCount () |
-{ |
- return m_rxCount; |
-} |
- |
-uint32_t |
-MpiInterface::GetTxCount () |
-{ |
- return m_txCount; |
+ NS_ASSERT (g_parallelCommunicationInterface); |
+ g_parallelCommunicationInterface->Destroy (); |
} |
uint32_t |
MpiInterface::GetSystemId () |
{ |
- if (!m_initialized) |
- { |
- Simulator::GetImplementation (); |
- m_initialized = true; |
- } |
- return m_sid; |
+ if ( g_parallelCommunicationInterface ) |
+ return g_parallelCommunicationInterface->GetSystemId (); |
+ else |
+ return 0; |
} |
uint32_t |
MpiInterface::GetSize () |
{ |
- if (!m_initialized) |
- { |
- Simulator::GetImplementation (); |
- m_initialized = true; |
- } |
- return m_size; |
+ if ( g_parallelCommunicationInterface ) |
+ return g_parallelCommunicationInterface->GetSize (); |
+ else |
+ return 1; |
} |
bool |
MpiInterface::IsEnabled () |
{ |
- if (!m_initialized) |
+ if (g_parallelCommunicationInterface) |
{ |
- Simulator::GetImplementation (); |
- m_initialized = true; |
+ return g_parallelCommunicationInterface->IsEnabled (); |
} |
- return m_enabled; |
+ else |
+ { |
+ return false; |
+ } |
} |
void |
MpiInterface::Enable (int* pargc, char*** pargv) |
{ |
-#ifdef NS3_MPI |
- // Initialize the MPI interface |
- MPI_Init (pargc, pargv); |
- MPI_Barrier (MPI_COMM_WORLD); |
- MPI_Comm_rank (MPI_COMM_WORLD, reinterpret_cast <int *> (&m_sid)); |
- MPI_Comm_size (MPI_COMM_WORLD, reinterpret_cast <int *> (&m_size)); |
- m_enabled = true; |
- m_initialized = true; |
- // Post a non-blocking receive for all peers |
- m_pRxBuffers = new char*[m_size]; |
- m_requests = new MPI_Request[m_size]; |
- for (uint32_t i = 0; i < GetSize (); ++i) |
+ StringValue simulationTypeValue; |
+ bool useDefault = true; |
+ |
+ if (GlobalValue::GetValueByNameFailSafe ("SimulatorImplementationType", simulationTypeValue)) |
{ |
- m_pRxBuffers[i] = new char[MAX_MPI_MSG_SIZE]; |
- MPI_Irecv (m_pRxBuffers[i], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0, |
- MPI_COMM_WORLD, &m_requests[i]); |
+ std::string simulationType = simulationTypeValue.Get (); |
+ |
+ // Set communication interface based on the simulation type being used. |
+ // Defaults to synchronous. |
+ if (simulationType.compare ("ns3::NullMessageSimulatorImpl") == 0) |
+ { |
+ g_parallelCommunicationInterface = new NullMessageMpiInterface (); |
+ useDefault = false; |
+ } |
+ else if (simulationType.compare ("ns3::DistributedSimulatorImpl") == 0) |
+ { |
+ g_parallelCommunicationInterface = new GrantedTimeWindowMpiInterface (); |
+ useDefault = false; |
+ } |
} |
-#else |
- NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in"); |
-#endif |
+ |
+ // User did not specify a valid parallel simulator; use the default. |
+ if (useDefault) |
+ { |
+ g_parallelCommunicationInterface = new GrantedTimeWindowMpiInterface (); |
+ GlobalValue::Bind ("SimulatorImplementationType", |
+ StringValue ("ns3::DistributedSimulatorImpl")); |
+ NS_LOG_WARN ("SimulatorImplementationType was set to non-parallel simulator; setting type to ns3::DistributedSimulatorImp"); |
+ } |
+ |
+ g_parallelCommunicationInterface->Enable (pargc, pargv); |
} |
void |
MpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, uint32_t node, uint32_t dev) |
{ |
-#ifdef NS3_MPI |
- SentBuffer sendBuf; |
- m_pendingTx.push_back (sendBuf); |
- std::list<SentBuffer>::reverse_iterator i = m_pendingTx.rbegin (); // Points to the last element |
- |
- uint32_t serializedSize = p->GetSerializedSize (); |
- uint8_t* buffer = new uint8_t[serializedSize + 16]; |
- i->SetBuffer (buffer); |
- // Add the time, dest node and dest device |
- uint64_t t = rxTime.GetNanoSeconds (); |
- uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer); |
- *pTime++ = t; |
- uint32_t* pData = reinterpret_cast<uint32_t *> (pTime); |
- *pData++ = node; |
- *pData++ = dev; |
- // Serialize the packet |
- p->Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize); |
- |
- // Find the system id for the destination node |
- Ptr<Node> destNode = NodeList::GetNode (node); |
- uint32_t nodeSysId = destNode->GetSystemId (); |
- |
- MPI_Isend (reinterpret_cast<void *> (i->GetBuffer ()), serializedSize + 16, MPI_CHAR, nodeSysId, |
- 0, MPI_COMM_WORLD, (i->GetRequest ())); |
- m_txCount++; |
-#else |
- NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in"); |
-#endif |
+ NS_ASSERT (g_parallelCommunicationInterface); |
+ g_parallelCommunicationInterface->SendPacket (p, rxTime, node, dev); |
} |
+ |
void |
-MpiInterface::ReceiveMessages () |
-{ // Poll the non-block reads to see if data arrived |
-#ifdef NS3_MPI |
- while (true) |
- { |
- int flag = 0; |
- int index = 0; |
- MPI_Status status; |
- |
- MPI_Testany (GetSize (), m_requests, &index, &flag, &status); |
- if (!flag) |
- { |
- break; // No more messages |
- } |
- int count; |
- MPI_Get_count (&status, MPI_CHAR, &count); |
- m_rxCount++; // Count this receive |
- |
- // Get the meta data first |
- uint64_t* pTime = reinterpret_cast<uint64_t *> (m_pRxBuffers[index]); |
- uint64_t nanoSeconds = *pTime++; |
- uint32_t* pData = reinterpret_cast<uint32_t *> (pTime); |
- uint32_t node = *pData++; |
- uint32_t dev = *pData++; |
- |
- Time rxTime = NanoSeconds (nanoSeconds); |
- |
- count -= sizeof (nanoSeconds) + sizeof (node) + sizeof (dev); |
- |
- Ptr<Packet> p = Create<Packet> (reinterpret_cast<uint8_t *> (pData), count, true); |
- |
- // Find the correct node/device to schedule receive event |
- Ptr<Node> pNode = NodeList::GetNode (node); |
- Ptr<MpiReceiver> pMpiRec = 0; |
- uint32_t nDevices = pNode->GetNDevices (); |
- for (uint32_t i = 0; i < nDevices; ++i) |
- { |
- Ptr<NetDevice> pThisDev = pNode->GetDevice (i); |
- if (pThisDev->GetIfIndex () == dev) |
- { |
- pMpiRec = pThisDev->GetObject<MpiReceiver> (); |
- break; |
- } |
- } |
- |
- NS_ASSERT (pNode && pMpiRec); |
- |
- // Schedule the rx event |
- Simulator::ScheduleWithContext (pNode->GetId (), rxTime - Simulator::Now (), |
- &MpiReceiver::Receive, pMpiRec, p); |
- |
- // Re-queue the next read |
- MPI_Irecv (m_pRxBuffers[index], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0, |
- MPI_COMM_WORLD, &m_requests[index]); |
- } |
-#else |
- NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in"); |
-#endif |
+MpiInterface::Disable () |
+{ |
+ NS_ASSERT (g_parallelCommunicationInterface); |
+ g_parallelCommunicationInterface->Disable (); |
+ delete g_parallelCommunicationInterface; |
+ g_parallelCommunicationInterface = 0; |
} |
-void |
-MpiInterface::TestSendComplete () |
-{ |
-#ifdef NS3_MPI |
- std::list<SentBuffer>::iterator i = m_pendingTx.begin (); |
- while (i != m_pendingTx.end ()) |
- { |
- MPI_Status status; |
- int flag = 0; |
- MPI_Test (i->GetRequest (), &flag, &status); |
- std::list<SentBuffer>::iterator current = i; // Save current for erasing |
- i++; // Advance to next |
- if (flag) |
- { // This message is complete |
- m_pendingTx.erase (current); |
- } |
- } |
-#else |
- NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in"); |
-#endif |
-} |
- |
-void |
-MpiInterface::Disable () |
-{ |
-#ifdef NS3_MPI |
- int flag = 0; |
- MPI_Initialized (&flag); |
- if (flag) |
- { |
- MPI_Finalize (); |
- m_enabled = false; |
- m_initialized = false; |
- } |
- else |
- { |
- NS_FATAL_ERROR ("Cannot disable MPI environment without Initializing it first"); |
- } |
-#else |
- NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in"); |
-#endif |
-} |
- |
} // namespace ns3 |