A Discrete-Event Network Simulator
API
Loading...
Searching...
No Matches
granted-time-window-mpi-interface.cc
Go to the documentation of this file.
1/*
2 * SPDX-License-Identifier: GPL-2.0-only
3 *
4 * Author: George Riley <riley@ece.gatech.edu>
5 */
6
7/**
8 * \file
9 * \ingroup mpi
10 * Implementation of classes ns3::SentBuffer and ns3::GrantedTimeWindowMpiInterface.
11 */
12
13// This object contains static methods that provide an easy interface
14// to the necessary MPI information.
15
17
18#include "mpi-interface.h"
19#include "mpi-receiver.h"
20
21#include "ns3/log.h"
22#include "ns3/net-device.h"
23#include "ns3/node-list.h"
24#include "ns3/node.h"
25#include "ns3/nstime.h"
26#include "ns3/simulator-impl.h"
27#include "ns3/simulator.h"
28
29#include <iomanip>
30#include <iostream>
31#include <list>
32#include <mpi.h>
33
34namespace ns3
35{
36
37NS_LOG_COMPONENT_DEFINE("GrantedTimeWindowMpiInterface");
38
39NS_OBJECT_ENSURE_REGISTERED(GrantedTimeWindowMpiInterface);
40
42{
43 m_buffer = nullptr;
44 m_request = MPI_REQUEST_NULL;
45}
46
48{
49 delete[] m_buffer;
50}
51
52uint8_t*
54{
55 return m_buffer;
56}
57
/**
 * Record the raw byte buffer associated with this pending send.
 *
 * Only the pointer is stored; the bytes are not copied, and any pointer
 * previously held in m_buffer is overwritten without being released.
 *
 * NOTE(review): m_buffer is released with delete[] elsewhere in this file
 * (presumably by the SentBuffer destructor), and the visible caller passes
 * memory obtained from new uint8_t[], so \p buffer is expected to come from
 * new[] -- confirm against the class declaration.
 *
 * \param buffer Pointer to the serialized message bytes to track.
 */
58void
59SentBuffer::SetBuffer(uint8_t* buffer)
60{
61 m_buffer = buffer;
62}
63
64MPI_Request*
66{
67 return &m_request;
68}
69
77
80MPI_Comm GrantedTimeWindowMpiInterface::g_communicator = MPI_COMM_WORLD;
82
85{
86 static TypeId tid =
87 TypeId("ns3::GrantedTimeWindowMpiInterface").SetParent<Object>().SetGroupName("Mpi");
88 return tid;
89}
90
91void
93{
94 NS_LOG_FUNCTION(this);
95
96 for (uint32_t i = 0; i < GetSize(); ++i)
97 {
98 delete[] g_pRxBuffers[i];
99 }
100 delete[] g_pRxBuffers;
101 delete[] g_requests;
102
103 g_pendingTx.clear();
104}
105
112
119
126
133
134bool
139
140MPI_Comm
146
147void
149{
150 NS_LOG_FUNCTION(this << pargc << pargv);
151
152 NS_ASSERT(g_enabled == false);
153
154 // Initialize the MPI interface
155 MPI_Init(pargc, pargv);
156 Enable(MPI_COMM_WORLD);
157 g_mpiInitCalled = true;
158 g_enabled = true;
159}
160
161void
163{
164 NS_LOG_FUNCTION(this);
165
166 NS_ASSERT(g_enabled == false);
167
168 // Standard MPI practice is to duplicate the communicator for the
169 // library to use, so that the library communicates in an isolated
170 // communication context.
171 MPI_Comm_dup(communicator, &g_communicator);
172 g_freeCommunicator = true;
173
174 MPI_Barrier(g_communicator);
175
176 int mpiSystemId;
177 int mpiSize;
178 MPI_Comm_rank(g_communicator, &mpiSystemId);
179 MPI_Comm_size(g_communicator, &mpiSize);
180 g_sid = mpiSystemId;
181 g_size = mpiSize;
182
183 g_enabled = true;
184 // Post a non-blocking receive for all peers
185 g_pRxBuffers = new char*[g_size];
186 g_requests = new MPI_Request[g_size];
187 for (uint32_t i = 0; i < GetSize(); ++i)
188 {
189 g_pRxBuffers[i] = new char[MAX_MPI_MSG_SIZE];
190 MPI_Irecv(g_pRxBuffers[i],
192 MPI_CHAR,
193 MPI_ANY_SOURCE,
194 0,
196 &g_requests[i]);
197 }
198}
199
200void
202 const Time& rxTime,
203 uint32_t node,
204 uint32_t dev)
205{
206 NS_LOG_FUNCTION(this << p << rxTime.GetTimeStep() << node << dev);
207
208 SentBuffer sendBuf;
209 g_pendingTx.push_back(sendBuf);
210 auto i = g_pendingTx.rbegin(); // Points to the last element
211
212 uint32_t serializedSize = p->GetSerializedSize();
213 auto buffer = new uint8_t[serializedSize + 16];
214 i->SetBuffer(buffer);
215 // Add the time, dest node and dest device
216 uint64_t t = rxTime.GetInteger();
217 auto pTime = reinterpret_cast<uint64_t*>(buffer);
218 *pTime++ = t;
219 auto pData = reinterpret_cast<uint32_t*>(pTime);
220 *pData++ = node;
221 *pData++ = dev;
222 // Serialize the packet
223 p->Serialize(reinterpret_cast<uint8_t*>(pData), serializedSize);
224
225 // Find the system id for the destination node
226 Ptr<Node> destNode = NodeList::GetNode(node);
227 uint32_t nodeSysId = destNode->GetSystemId();
228
229 MPI_Isend(reinterpret_cast<void*>(i->GetBuffer()),
230 serializedSize + 16,
231 MPI_CHAR,
232 nodeSysId,
233 0,
235 (i->GetRequest()));
236 g_txCount++;
237}
238
239void
241{
243
244 // Poll the non-block reads to see if data arrived
245 while (true)
246 {
247 int flag = 0;
248 int index = 0;
249 MPI_Status status;
250
251 MPI_Testany(MpiInterface::GetSize(), g_requests, &index, &flag, &status);
252 if (!flag)
253 {
254 break; // No more messages
255 }
256 int count;
257 MPI_Get_count(&status, MPI_CHAR, &count);
258 g_rxCount++; // Count this receive
259
260 // Get the meta data first
261 auto pTime = reinterpret_cast<uint64_t*>(g_pRxBuffers[index]);
262 uint64_t time = *pTime++;
263 auto pData = reinterpret_cast<uint32_t*>(pTime);
264 uint32_t node = *pData++;
265 uint32_t dev = *pData++;
266
267 Time rxTime(time);
268
269 count -= sizeof(time) + sizeof(node) + sizeof(dev);
270
271 Ptr<Packet> p = Create<Packet>(reinterpret_cast<uint8_t*>(pData), count, true);
272
273 // Find the correct node/device to schedule receive event
274 Ptr<Node> pNode = NodeList::GetNode(node);
275 Ptr<MpiReceiver> pMpiRec = nullptr;
276 uint32_t nDevices = pNode->GetNDevices();
277 for (uint32_t i = 0; i < nDevices; ++i)
278 {
279 Ptr<NetDevice> pThisDev = pNode->GetDevice(i);
280 if (pThisDev->GetIfIndex() == dev)
281 {
282 pMpiRec = pThisDev->GetObject<MpiReceiver>();
283 break;
284 }
285 }
286
287 NS_ASSERT(pNode && pMpiRec);
288
289 // Schedule the rx event
290 Simulator::ScheduleWithContext(pNode->GetId(),
291 rxTime - Simulator::Now(),
293 pMpiRec,
294 p);
295
296 // Re-queue the next read
297 MPI_Irecv(g_pRxBuffers[index],
299 MPI_CHAR,
300 MPI_ANY_SOURCE,
301 0,
303 &g_requests[index]);
304 }
305}
306
307void
309{
311
312 auto i = g_pendingTx.begin();
313 while (i != g_pendingTx.end())
314 {
315 MPI_Status status;
316 int flag = 0;
317 MPI_Test(i->GetRequest(), &flag, &status);
318 auto current = i; // Save current for erasing
319 i++; // Advance to next
320 if (flag)
321 { // This message is complete
322 g_pendingTx.erase(current);
323 }
324 }
325}
326
327void
329{
331
333 {
334 MPI_Comm_free(&g_communicator);
335 g_freeCommunicator = false;
336 }
337
338 // ns-3 should MPI finalize only if ns-3 was used to initialize
339 if (g_mpiInitCalled)
340 {
341 int flag = 0;
342 MPI_Initialized(&flag);
343 if (flag)
344 {
345 MPI_Finalize();
346 }
347 else
348 {
349 NS_FATAL_ERROR("Cannot disable MPI environment without Initializing it first");
350 }
351 g_mpiInitCalled = false;
352 }
353
354 g_enabled = false;
355}
356
357} // namespace ns3
static void ReceiveMessages()
Check for received messages that have completed.
MPI_Comm GetCommunicator() override
Return the communicator used to run ns-3.
static bool g_freeCommunicator
Did ns-3 create the communicator? If so, ns-3 has to free it.
uint32_t GetSystemId() override
Get the id number of this rank.
void Disable() override
Clean up the ns-3 parallel communications interface.
static void TestSendComplete()
Check for completed sends.
static bool g_mpiInitCalled
Has MPI_Init been called by this interface?
void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev) override
Send a packet to a remote node.
static uint32_t g_size
Size of the MPI communicator group used by ns-3 (MPI_COMM_WORLD by default).
static bool g_enabled
Has this interface been enabled?
static std::list< SentBuffer > g_pendingTx
List of pending non-blocking sends.
void Enable(int *pargc, char ***pargv) override
Setup the parallel communication interface.
bool IsEnabled() override
Returns enabled state of parallel environment.
static MPI_Request * g_requests
Pending non-blocking receives.
static uint32_t g_rxCount
Total packets received.
uint32_t GetSize() override
Get the number of ranks used by ns-3.
static char ** g_pRxBuffers
Data buffers for non-blocking reads.
void Destroy() override
Deletes storage used by the parallel environment.
static MPI_Comm g_communicator
MPI communicator being used for ns-3 tasks.
static uint32_t g_sid
System ID (rank) for this task.
static uint32_t GetSize()
Get the number of ranks used by ns-3.
Class to aggregate to a NetDevice if it supports MPI capability.
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
static Ptr< Node > GetNode(uint32_t n)
Definition node-list.cc:240
A base class which provides memory management and object aggregation.
Definition object.h:78
Smart pointer class similar to boost::intrusive_ptr.
Tracks non-blocking sends.
MPI_Request m_request
The MPI request handle.
static void ScheduleWithContext(uint32_t context, const Time &delay, FUNC f, Ts &&... args)
Schedule an event with the given context.
Definition simulator.h:577
static Time Now()
Return the current simulation virtual time.
Definition simulator.cc:197
Simulation virtual time values and global simulation resolution.
Definition nstime.h:94
int64_t GetInteger() const
Get the raw time value, in the current resolution unit.
Definition nstime.h:444
int64_t GetTimeStep() const
Get the raw time value, in the current resolution unit.
Definition nstime.h:434
A unique identifier for an interface.
Definition type-id.h:48
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition type-id.cc:1001
int nDevices
Number of end device nodes to create.
Declaration of classes ns3::SentBuffer and ns3::GrantedTimeWindowMpiInterface.
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file,...
Definition assert.h:55
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition log.h:191
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by ",...
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition object-base.h:35
Ptr< T > Create(Ts &&... args)
Create class instances by constructors with varying numbers of arguments and return them by Ptr.
Definition ptr.h:436
Declaration of class ns3::MpiInterface.
ns3::MpiReceiver declaration, provides an interface to aggregate to MPI-compatible NetDevices.
Every class exported by the ns3 library is enclosed in the ns3 namespace.
const uint32_t MAX_MPI_MSG_SIZE
Maximum MPI message size, for easy buffer creation.