A Discrete-Event Network Simulator
API
Loading...
Searching...
No Matches
distributed-simulator-impl.cc
Go to the documentation of this file.
1/*
2 * SPDX-License-Identifier: GPL-2.0-only
3 *
4 * Author: George Riley <riley@ece.gatech.edu>
5 *
6 */
7
8/**
9 * \file
10 * \ingroup mpi
11 * Implementation of classes ns3::LbtsMessage and ns3::DistributedSimulatorImpl.
12 */
13
15
17#include "mpi-interface.h"
18
19#include "ns3/assert.h"
20#include "ns3/channel.h"
21#include "ns3/event-impl.h"
22#include "ns3/log.h"
23#include "ns3/node-container.h"
24#include "ns3/pointer.h"
25#include "ns3/ptr.h"
26#include "ns3/scheduler.h"
27#include "ns3/simulator.h"
28
29#include <cmath>
30#include <mpi.h>
31
32namespace ns3
33{
34
35NS_LOG_COMPONENT_DEFINE("DistributedSimulatorImpl");
36
37NS_OBJECT_ENSURE_REGISTERED(DistributedSimulatorImpl);
38
42
// LbtsMessage accessor definitions.
// NOTE(review): this view is missing the signature lines of these
// definitions, and the whole body of the accessor whose return type is
// `Time` (presumably GetSmallestTime(), which Run() calls); the descriptions
// below are based only on the member each visible body returns.
43Time
48
// Returns m_txCount: the count of transmitted messages.
51{
52 return m_txCount;
53}
54
// Returns m_rxCount: the count of received messages.
57{
58 return m_rxCount;
59}
60
// Returns m_myId: the system id of the rank that sent this LBTS message.
63{
64 return m_myId;
65}
66
// Returns m_isFinished: true when the sending rank has no more events.
67bool
69{
70 return m_isFinished;
71}
72
73/**
74 * Initialize m_lookAhead to the maximum; it will be constrained by the
75 * user-supplied time via BoundLookAhead and by the
76 * minimum network latency between ranks.
77 */
79
// Registers DistributedSimulatorImpl with the ns-3 TypeId system under the
// name "ns3::DistributedSimulatorImpl" in group "Mpi", with a default
// constructor exposed through AddConstructor. The TypeId is built once
// (function-local static) and returned on every call.
// NOTE(review): the signature lines and one builder call (source line 84,
// presumably SetParent) are missing from this view.
82{
83 static TypeId tid = TypeId("ns3::DistributedSimulatorImpl")
85 .SetGroupName("Mpi")
86 .AddConstructor<DistributedSimulatorImpl>();
87 return tid;
88}
89
111
116
// DoDispose: releases this simulator's resources. It drains m_events,
// dropping the reference each queued event holds, releases the Ptr to the
// scheduler, and frees the m_pLBTS array.
// NOTE(review): source lines 118 (signature) and 129 (presumably the
// base-class DoDispose chain-up) are missing from this view — confirm.
117void
119{
120 NS_LOG_FUNCTION(this);
121
// Events removed from the queue must be Unref'd (the same rule Remove() and
// ProcessOneEvent() follow).
122 while (!m_events->IsEmpty())
123 {
124 Scheduler::Event next = m_events->RemoveNext();
125 next.impl->Unref();
126 }
127 m_events = nullptr;
128 delete[] m_pLBTS;
130}
131
// Destroy: runs the events registered with ScheduleDestroy() in FIFO order
// (ScheduleDestroy appends, this pops from the front), skipping any that were
// cancelled. Each event is removed from the list before it is invoked.
// NOTE(review): source lines 133 (signature) and 148 are missing from this
// view; MpiInterface::Destroy() appears among the referenced symbols and may
// be the missing call — confirm.
132void
134{
135 NS_LOG_FUNCTION(this);
136
137 while (!m_destroyEvents.empty())
138 {
139 Ptr<EventImpl> ev = m_destroyEvents.front().PeekEventImpl();
140 m_destroyEvents.pop_front();
141 NS_LOG_LOGIC("handle destroy " << ev);
142 if (!ev->IsCancelled())
143 {
144 ev->Invoke();
145 }
146 }
147
149}
150
// CalculateLookAhead: derives this rank's lookahead window.
// With a single MPI rank it is set to zero. Otherwise m_lookAhead is lowered to
// the smallest "Delay" attribute of any point-to-point channel that joins a
// node on this rank to a node on another rank. All ranks then take part in an
// MPI_Allreduce(MPI_MAX). Ranks whose m_lookAhead is still
// GetMaximumSimulationTime() adopt the largest value reported elsewhere,
// unless that maximum is 0.
// NOTE(review): source lines 152 (signature), 163, 217, 238 and 258 are
// missing from this view.
151void
153{
154 NS_LOG_FUNCTION(this);
155
156 /* If running sequential simulation can ignore lookahead */
157 if (MpiInterface::GetSize() <= 1)
158 {
159 m_lookAhead = Seconds(0);
160 }
161 else
162 {
// NOTE(review): `c` is declared on the missing source line 163; given the
// referenced NodeContainer::GetGlobal(), presumably it holds all nodes — confirm.
164 for (auto iter = c.Begin(); iter != c.End(); ++iter)
165 {
166 if ((*iter)->GetSystemId() != MpiInterface::GetSystemId())
167 {
168 continue;
169 }
170
171 for (uint32_t i = 0; i < (*iter)->GetNDevices(); ++i)
172 {
173 Ptr<NetDevice> localNetDevice = (*iter)->GetDevice(i);
174 // only works for p2p links currently
175 if (!localNetDevice->IsPointToPoint())
176 {
177 continue;
178 }
179 Ptr<Channel> channel = localNetDevice->GetChannel();
180 if (!channel)
181 {
182 continue;
183 }
184
185 // grab the adjacent node
186 Ptr<Node> remoteNode;
187 if (channel->GetDevice(0) == localNetDevice)
188 {
189 remoteNode = (channel->GetDevice(1))->GetNode();
190 }
191 else
192 {
193 remoteNode = (channel->GetDevice(0))->GetNode();
194 }
195
196 // if it's not remote, don't consider it
197 if (remoteNode->GetSystemId() == MpiInterface::GetSystemId())
198 {
199 continue;
200 }
201
202 // compare delay on the channel with current value of
203 // m_lookAhead. if delay on channel is smaller, make
204 // it the new lookAhead.
205 TimeValue delay;
206 channel->GetAttribute("Delay", delay);
207
208 if (delay.Get() < m_lookAhead)
209 {
210 m_lookAhead = delay.Get();
211 }
212 }
213 }
214 }
215
216 // m_lookAhead is now set
218
219 /*
220 * Compute the maximum inter-task latency and use that value
221 * for tasks with no inter-task links.
222 *
223 * Special processing for edge cases. For tasks that have no
224 * nodes need to determine a reasonable lookAhead value. Infinity
225 * would work correctly but introduces a performance issue; tasks
226 * with an infinite lookAhead would execute all their events
227 * before doing an AllGather resulting in very bad load balance
228 * during the first time window. Since all tasks participate in
229 * the AllGather it is desirable to have all the tasks advance in
230 * simulation time at a similar rate assuming roughly equal events
231 * per unit of simulation time in order to equalize the amount of
232 * work per time window.
233 */
// NOTE(review): `long` is only 32 bits on LLP64 platforms (e.g. 64-bit
// Windows), so m_lookAhead.GetInteger() (int64_t) could be truncated there;
// int64_t with MPI_INT64_T would be portable.
234 long sendbuf;
235 long recvbuf;
236
237 /* Tasks with no inter-task links do not contribute to max */
// NOTE(review): the condition for this branch (source line 238) is missing
// from this view; per the comment above it selects ranks without inter-task links.
239 {
240 sendbuf = 0;
241 }
242 else
243 {
244 sendbuf = m_lookAhead.GetInteger();
245 }
246
// Collective call: reached unconditionally, so every rank must call
// CalculateLookAhead for this reduction to complete.
247 MPI_Allreduce(&sendbuf, &recvbuf, 1, MPI_LONG, MPI_MAX, MpiInterface::GetCommunicator());
248
249 /* For nodes that did not compute a lookahead use max from ranks
250 * that did compute a value. An edge case occurs if all nodes have
251 * no inter-task links (max will be 0 in this case). Use infinity so all tasks
252 * will proceed without synchronization until a single AllGather
253 * occurs when all tasks have finished.
254 */
255 if (m_lookAhead == GetMaximumSimulationTime() && recvbuf != 0)
256 {
257 m_lookAhead = Time(recvbuf);
259 }
260}
261
262void
264{
265 if (lookAhead > Time(0))
266 {
267 NS_LOG_FUNCTION(this << lookAhead);
268 m_lookAhead = Min(m_lookAhead, lookAhead);
269 }
270 else
271 {
272 NS_LOG_WARN("attempted to set lookahead to a negative time: " << lookAhead);
273 }
274}
275
276void
278{
279 NS_LOG_FUNCTION(this << schedulerFactory);
280
281 Ptr<Scheduler> scheduler = schedulerFactory.Create<Scheduler>();
282
283 if (m_events)
284 {
285 while (!m_events->IsEmpty())
286 {
287 Scheduler::Event next = m_events->RemoveNext();
288 scheduler->Insert(next);
289 }
290 }
291 m_events = scheduler;
292}
293
// ProcessOneEvent: removes the earliest event from m_events and calls
// PreEventHook with its EventId. It then bumps m_eventCount, advances
// m_currentTs and m_currentUid to the event's key, invokes the event, and
// finally drops the queue's reference to it.
// NOTE(review): source lines 295 (signature), 304 and 309 are missing from
// this view; m_unscheduledEvents and m_currentContext appear among the
// referenced members and may be updated there — confirm.
294void
296{
297 NS_LOG_FUNCTION(this);
298
299 Scheduler::Event next = m_events->RemoveNext();
300
301 PreEventHook(EventId(next.impl, next.key.m_ts, next.key.m_context, next.key.m_uid));
302
// Events must never be dequeued earlier than the current simulation time.
303 NS_ASSERT(next.key.m_ts >= m_currentTs);
305 m_eventCount++;
306
307 NS_LOG_LOGIC("handle " << next.key.m_ts);
308 m_currentTs = next.key.m_ts;
310 m_currentUid = next.key.m_uid;
311 next.impl->Invoke();
// Removed from the event list, so release the list's reference.
312 next.impl->Unref();
313}
314
// IsFinished(): per its declaration, checks whether the simulation should
// finish.
// NOTE(review): its signature and body (source lines 316-319) are missing
// from this view.
315bool
320
321bool
323{
324 return m_events->IsEmpty() || m_stop;
325}
326
// NextTs: timestamp of the next local event, read from the head of m_events
// without removing it. When IsLocalFinished() is true it reports an
// "infinite" timestamp instead.
327uint64_t
329{
330 // If the local MPI task has no more events or Stop() was called, the
331 // next event time is infinity.
332 if (IsLocalFinished())
333 {
// NOTE(review): the return statement here (source line 334) is missing from
// this view; presumably it returns GetMaximumSimulationTime()'s time step — confirm.
335 }
336 else
337 {
338 Scheduler::Event ev = m_events->PeekNext();
339 return ev.key.m_ts;
340 }
341}
342
343Time
345{
346 return TimeStep(NextTs());
347}
348
// Run: main synchronization loop across MPI ranks. Each iteration does one of
// two things.
// - If the next local event lies beyond m_grantedTime, or this rank is locally
//   finished, every rank exchanges an LbtsMessage via MPI_Allgather. When no
//   messages are in flight (total rx == total tx), m_grantedTime becomes the
//   smallest next-event time plus m_lookAhead (or an infinite value when the
//   lookahead is infinite, per the comment in that branch).
// - Otherwise, if the next event lies within m_grantedTime and the rank is
//   not locally finished, that event is processed.
// The loop ends once m_globalFinished is set.
// NOTE(review): source lines 350 (signature), 354, 370, 374, 376-377, 379,
// 388, 395, 405, 417, 419 and 433 are missing from this view.
349void
351{
352 NS_LOG_FUNCTION(this);
353
355 m_stop = false;
356 m_globalFinished = false;
357 while (!m_globalFinished)
358 {
359 Time nextTime = Next();
360
361 // If local event is beyond grantedTime then need to synchronize
362 // with other tasks to determine new time window. If local task
363 // is finished then continue to participate in allgather
364 // synchronizations with other tasks until all tasks have
365 // completed.
366 if (nextTime > m_grantedTime || IsLocalFinished())
367 {
368 // Can't process next event, calculate a new LBTS
369 // First receive any pending messages
371 // reset next time
372 nextTime = Next();
373 // And check for send completes
375 // Finally calculate the lbts
378 m_myId,
380 nextTime);
// Keep this rank's own message in its slot, then gather one LbtsMessage
// per rank (as raw bytes) into m_pLBTS.
381 m_pLBTS[m_myId] = lMsg;
382 MPI_Allgather(&lMsg,
383 sizeof(LbtsMessage),
384 MPI_BYTE,
385 m_pLBTS,
386 sizeof(LbtsMessage),
387 MPI_BYTE,
// Reduce the gathered messages: earliest next-event time over all ranks
// and total received / transmitted message counts.
389 Time smallestTime = m_pLBTS[0].GetSmallestTime();
390 // The totRx and totTx counts ensure there are no transient
391 // messages; If totRx != totTx, there are transients,
392 // so we don't update the granted time.
393 uint32_t totRx = m_pLBTS[0].GetRxCount();
394 uint32_t totTx = m_pLBTS[0].GetTxCount();
396
397 for (uint32_t i = 1; i < m_systemCount; ++i)
398 {
399 if (m_pLBTS[i].GetSmallestTime() < smallestTime)
400 {
401 smallestTime = m_pLBTS[i].GetSmallestTime();
402 }
403 totRx += m_pLBTS[i].GetRxCount();
404 totTx += m_pLBTS[i].GetTxCount();
406 }
407
408 // Global halting condition is all nodes have empty queues and
409 // no messages are in-flight.
410 m_globalFinished &= totRx == totTx;
411
412 if (totRx == totTx)
413 {
414 // If lookahead is infinite then granted time should be as well.
415 // Covers the edge case if all the tasks have no inter tasks
416 // links, prevents overflow of granted time.
418 {
420 }
421 else
422 {
423 // Overflow is possible here if near end of representable time.
424 m_grantedTime = smallestTime + m_lookAhead;
425 }
426 }
427 }
428
429 // Execute next event if it is within the current time window.
430 // Local task may be completed.
431 if ((nextTime <= m_grantedTime) && (!IsLocalFinished()))
432 { // Safe to process
434 }
435 }
436
437 // If the simulator stopped naturally by lack of events, make a
438 // consistency test to check that we didn't lose any events along the way.
439 NS_ASSERT(!m_events->IsEmpty() || m_unscheduledEvents == 0);
440}
441
444{
445 return m_myId;
446}
447
448void
450{
451 NS_LOG_FUNCTION(this);
452
453 m_stop = true;
454}
455
// Stop after `delay`: schedules Simulator::Stop to run `delay` from now and
// returns the EventId of that scheduled stop event.
// NOTE(review): the return type and signature lines (source 456-457) are
// missing from this view.
458{
459 NS_LOG_FUNCTION(this << delay.GetTimeStep());
460
461 return Simulator::Schedule(delay, &Simulator::Stop);
462}
463
464//
465// Schedule an event for a _relative_ time in the future.
466//
// Schedule: queues `event` to run `delay` after the current time, in the
// current context (GetContext()). It assigns the event the next uid from
// m_uid and returns an EventId describing it.
// NOTE(review): source lines 467-468 (return type/signature), 476 (presumably
// the declaration of `ev`) and 482 are missing from this view.
469{
470 NS_LOG_FUNCTION(this << delay.GetTimeStep() << event);
471
472 Time tAbsolute = delay + TimeStep(m_currentTs);
473
// The absolute time must not be negative or before the current time
// (i.e. a negative delay is a programming error).
474 NS_ASSERT(tAbsolute.IsPositive());
475 NS_ASSERT(tAbsolute >= TimeStep(m_currentTs));
477 ev.impl = event;
478 ev.key.m_ts = static_cast<uint64_t>(tAbsolute.GetTimeStep());
479 ev.key.m_context = GetContext();
480 ev.key.m_uid = m_uid;
481 m_uid++;
483 m_events->Insert(ev);
484 return EventId(event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
485}
486
// ScheduleWithContext: queues `event` to run `delay` after the current time
// under the given `context`, assigning it the next uid from m_uid. Unlike
// Schedule(), no EventId is returned.
// NOTE(review): unlike Schedule(), no assertion rejects a negative delay here;
// m_currentTs + delay.GetTimeStep() mixes unsigned and signed time steps.
// NOTE(review): source lines 488 (signature), 492 and 498 are missing from
// this view.
487void
489{
490 NS_LOG_FUNCTION(this << context << delay.GetTimeStep() << m_currentTs << event);
491
493 ev.impl = event;
494 ev.key.m_ts = m_currentTs + delay.GetTimeStep();
495 ev.key.m_context = context;
496 ev.key.m_uid = m_uid;
497 m_uid++;
499 m_events->Insert(ev);
500}
501
504{
505 NS_LOG_FUNCTION(this << event);
506 return Schedule(Time(0), event);
507}
508
511{
512 NS_LOG_FUNCTION(this << event);
513
514 EventId id(Ptr<EventImpl>(event, false), m_currentTs, 0xffffffff, 2);
515 m_destroyEvents.push_back(id);
516 m_uid++;
517 return id;
518}
519
520Time
522{
523 return TimeStep(m_currentTs);
524}
525
526Time
528{
529 if (IsExpired(id))
530 {
531 return TimeStep(0);
532 }
533 else
534 {
535 return TimeStep(id.GetTs() - m_currentTs);
536 }
537}
538
// Remove: takes an event out of the simulator.
// - Destroy events (uid EventId::UID::DESTROY) are erased from
//   m_destroyEvents.
// - Other events that have not already expired are removed from m_events,
//   marked cancelled, and have the queue's reference released.
// NOTE(review): source lines 540 (signature) and 569 are missing from this
// view; m_unscheduledEvents is a likely candidate for the missing line — confirm.
539void
541{
542 if (id.GetUid() == EventId::UID::DESTROY)
543 {
544 // destroy events.
545 for (auto i = m_destroyEvents.begin(); i != m_destroyEvents.end(); i++)
546 {
547 if (*i == id)
548 {
549 m_destroyEvents.erase(i);
550 break;
551 }
552 }
553 return;
554 }
555 if (IsExpired(id))
556 {
557 return;
558 }
559 Scheduler::Event event;
560 event.impl = id.PeekEventImpl();
561 event.key.m_ts = id.GetTs();
562 event.key.m_context = id.GetContext();
563 event.key.m_uid = id.GetUid();
564 m_events->Remove(event);
565 event.impl->Cancel();
566 // whenever we remove an event from the event list, we have to unref it.
567 event.impl->Unref();
568
570}
571
572void
574{
575 if (!IsExpired(id))
576 {
577 id.PeekEventImpl()->Cancel();
578 }
579}
580
581bool
583{
584 if (id.GetUid() == EventId::UID::DESTROY)
585 {
586 if (id.PeekEventImpl() == nullptr || id.PeekEventImpl()->IsCancelled())
587 {
588 return true;
589 }
590 // destroy events.
591 for (auto i = m_destroyEvents.begin(); i != m_destroyEvents.end(); i++)
592 {
593 if (*i == id)
594 {
595 return false;
596 }
597 }
598 return true;
599 }
600 return id.PeekEventImpl() == nullptr || id.GetTs() < m_currentTs ||
601 (id.GetTs() == m_currentTs && id.GetUid() <= m_currentUid) ||
602 id.PeekEventImpl()->IsCancelled();
603}
604
605Time
607{
608 /// \todo I am fairly certain other compilers use other non-standard
609 /// post-fixes to indicate 64 bit constants.
610 return TimeStep(0x7fffffffffffffffLL);
611}
612
618
// GetEventCount(): per its declaration, returns the number of events executed.
// NOTE(review): its signature and body (source lines 620-623) are missing
// from this view; ProcessOneEvent increments m_eventCount for each event.
619uint64_t
624
625} // namespace ns3
#define Min(a, b)
Distributed simulator implementation using lookahead.
EventId Schedule(const Time &delay, EventImpl *event) override
Schedule a future event execution (in the same context).
uint64_t GetEventCount() const override
Get the number of events executed.
void Remove(const EventId &id) override
Remove an event from the event list.
static TypeId GetTypeId()
Register this type.
DestroyEvents m_destroyEvents
The container of events to run at Destroy()
void ScheduleWithContext(uint32_t context, const Time &delay, EventImpl *event) override
Schedule a future event execution (in a different context).
EventId ScheduleNow(EventImpl *event) override
Schedule an event to run at the current virtual time.
uint64_t NextTs() const
Get the timestep of the next event.
EventId ScheduleDestroy(EventImpl *event) override
Schedule an event to run at the end of the simulation, after the Stop() time or condition has been re...
Time m_grantedTime
End of current window.
uint32_t m_currentContext
Execution context of the current event.
Time GetMaximumSimulationTime() const override
Get the maximum representable simulation time.
LbtsMessage * m_pLBTS
Container for Lbts messages, one per rank.
uint64_t m_currentTs
Timestamp of the current event.
uint32_t GetSystemId() const override
Get the system id of this simulator.
void SetScheduler(ObjectFactory schedulerFactory) override
Set the Scheduler to be used to manage the event list.
bool IsFinished() const override
Check if the simulation should finish.
Ptr< Scheduler > m_events
The event priority queue.
Time Next() const
Get the time of the next event, as returned by NextTs().
void CalculateLookAhead()
Calculate lookahead constraint based on network latency.
uint32_t GetContext() const override
Get the current simulation context.
bool m_globalFinished
Are all parallel instances completed.
uint32_t m_uid
Next event unique id.
void Run() override
Run the simulation.
void ProcessOneEvent()
Process the next event.
void Cancel(const EventId &id) override
Set the cancel bit on this event: the event's associated function will not be invoked when it expires...
void Stop() override
Tell the Simulator the calling event should be the last one executed.
int m_unscheduledEvents
Number of events that have been inserted but not yet scheduled, not counting the "destroy" events; th...
void Destroy() override
Execute the events scheduled with ScheduleDestroy().
Time GetDelayLeft(const EventId &id) const override
Get the remaining time until this event will execute.
uint32_t m_systemCount
MPI communicator size.
virtual void BoundLookAhead(const Time lookAhead)
Add additional bound to lookahead constraints.
bool IsExpired(const EventId &id) const override
Check if an event has already run or been cancelled.
Time Now() const override
Return the current simulation virtual time.
bool IsLocalFinished() const
Check if this rank is finished.
bool m_stop
Flag calling for the end of the simulation.
static Time m_lookAhead
Current window size.
uint32_t m_currentUid
Unique id of the current event.
void DoDispose() override
Destructor implementation.
An identifier for simulation events.
Definition event-id.h:45
@ INVALID
Invalid UID value.
Definition event-id.h:51
@ VALID
Schedule(), etc.
Definition event-id.h:59
@ DESTROY
ScheduleDestroy() events.
Definition event-id.h:55
A simulation event.
Definition event-impl.h:35
void Invoke()
Called by the simulation engine to notify the event that it is time to execute.
Definition event-impl.cc:36
void Cancel()
Marks the event as 'canceled'.
Definition event-impl.cc:46
static void ReceiveMessages()
Check for received messages complete.
static void TestSendComplete()
Check for completed sends.
Structure used for all-reduce LBTS computation.
uint32_t m_txCount
Count of transmitted messages.
uint32_t m_rxCount
Count of received messages.
uint32_t m_myId
System Id of the rank sending this LBTS.
Time m_smallestTime
Earliest next event timestamp.
bool m_isFinished
true when this rank has no more events.
static MPI_Comm GetCommunicator()
Return the communicator used to run ns-3.
static void Destroy()
Deletes storage used by the parallel environment.
static uint32_t GetSystemId()
Get the id number of this rank.
static uint32_t GetSize()
Get the number of ranks used by ns-3.
keep track of a set of node pointers.
Iterator End() const
Get an iterator which indicates past-the-last Node in the container.
static NodeContainer GetGlobal()
Create a NodeContainer that contains a list of all nodes created through NodeContainer::Create() and ...
Iterator Begin() const
Get an iterator which refers to the first Node in the container.
Instantiate subclasses of ns3::Object.
Ptr< Object > Create() const
Create an Object instance of the configured TypeId.
virtual void DoDispose()
Destructor implementation.
Definition object.cc:433
Smart pointer class similar to boost::intrusive_ptr.
Maintain the event list.
Definition scheduler.h:146
void Unref() const
Decrement the reference count.
static EventId Schedule(const Time &delay, FUNC f, Ts &&... args)
Schedule an event to expire after delay.
Definition simulator.h:560
@ NO_CONTEXT
Flag for events not associated with any particular context.
Definition simulator.h:199
static void Stop()
Tell the Simulator the calling event should be the last one executed.
Definition simulator.cc:175
The SimulatorImpl base class.
virtual void PreEventHook(const EventId &id)
Hook called before processing each event.
Simulation virtual time values and global simulation resolution.
Definition nstime.h:94
bool IsPositive() const
Exactly equivalent to t >= 0.
Definition nstime.h:322
int64_t GetInteger() const
Get the raw time value, in the current resolution unit.
Definition nstime.h:444
static Time Max()
Maximum representable Time Not to be confused with Max(Time,Time).
Definition nstime.h:286
int64_t GetTimeStep() const
Get the raw time value, in the current resolution unit.
Definition nstime.h:434
Time Get() const
Definition time.cc:519
a unique identifier for an interface.
Definition type-id.h:48
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition type-id.cc:1001
Declaration of classes ns3::LbtsMessage and ns3::DistributedSimulatorImpl.
Declaration of classes ns3::SentBuffer and ns3::GrantedTimeWindowMpiInterface.
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file,...
Definition assert.h:55
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition log.h:191
#define NS_LOG_LOGIC(msg)
Use NS_LOG to output a message of level LOG_LOGIC.
Definition log.h:271
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by ",...
#define NS_LOG_WARN(msg)
Use NS_LOG to output a message of level LOG_WARN.
Definition log.h:250
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition object-base.h:35
Time Seconds(double value)
Construct a Time in the indicated unit.
Definition nstime.h:1308
Declaration of class ns3::MpiInterface.
Every class exported by the ns3 library is enclosed in the ns3 namespace.
Scheduler event.
Definition scheduler.h:173
EventKey key
Key for sorting and ordering Events.
Definition scheduler.h:175
EventImpl * impl
Pointer to the event implementation.
Definition scheduler.h:174
uint32_t m_context
Event context.
Definition scheduler.h:162
uint64_t m_ts
Event time stamp.
Definition scheduler.h:160
uint32_t m_uid
Event unique id.
Definition scheduler.h:161