1 #include "hmbdc/Copyright.hpp"
5 #define NETMAP_WITH_LIBS
6 #include <net/netmap_user.h>
7 #undef NETMAP_WITH_LIBS
9 #include "hmbdc/app/netmap/Messages.hpp"
10 #include "hmbdc/app/Base.hpp"
11 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
12 #include "hmbdc/comm/eth/Misc.h"
13 #include "hmbdc/time/Rater.hpp"
14 #include "hmbdc/numeric/BitMath.hpp"
18 #include <type_traits>
22 #include <netinet/ether.h>
23 #include <linux/if_packet.h>
24 #include <sys/sysctl.h>
29 namespace hmbdc {
namespace app {
namespace netmap {
34 namespace sendtransportengine_detail {
36 using namespace hmbdc::time;
37 using namespace hmbdc::comm::eth;
45 :
Client<SendTransportEngine> {
46 using ptr = std::shared_ptr<SendTransportEngine>;
50 outBufferSizePower2();
64 if (runLock_.try_lock()) {
76 bool droppedCb()
override;
79 void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT
override {
80 for (
int i = nmd_->first_tx_ring; i <= nmd_->last_tx_ring; i++) {
81 struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, i);
82 if (nm_ring_empty(txring))
87 if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
88 HMBDC_THROW(std::runtime_error,
"IO error");
92 void stoppedCb(std::exception
const& e)
override;
94 char const* hmbdcName()
const {
95 return this->hmbdcName_.c_str();
98 std::tuple<char const*, int> schedSpec()
const {
99 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
103 bool match(
Topic const& t)
const {
104 return std::regex_match(t.c_str(), topicRegex_);
107 template <
typename... Messages>
108 void queue(Topic
const& t, Messages&&... msgs) HMBDC_RESTRICT {
109 auto n =
sizeof...(msgs);
110 auto it = buffer_.claim(n);
111 queue(it, t, std::forward<Messages>(msgs)...);
112 buffer_.commit(it, n);
115 template <
typename... Messages>
116 bool tryQueue(Topic
const& t, Messages&&... msgs) HMBDC_RESTRICT {
117 auto n =
sizeof...(msgs);
118 auto it = buffer_.tryClaim(n);
120 queue(it, t, std::forward<Messages>(msgs)...);
121 buffer_.commit(it, n);
127 template <
typename M,
typename... Messages>
128 void queue(pattern::MonoLockFreeBuffer::iterator it
129 , Topic
const& t, M&& msg, Messages&&... msgs) {
130 using Message =
typename std::decay<M>::type;
131 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
132 if (hmbdc_unlikely(
sizeof(Message) > maxMessageSize_)) {
133 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
136 TransportMessageHeader::copyTo(s, t, std::forward<M>(msg));
137 queue(++it, t, std::forward<M>(msgs)...);
140 template <
typename Message,
typename ... Args>
141 void queueInPlace(Topic
const& t, Args&&... args) HMBDC_RESTRICT {
142 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
143 if (hmbdc_unlikely(
sizeof(Message) > maxMessageSize_)) {
144 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
146 auto s = buffer_.claim();
147 TransportMessageHeader::copyToInPlace<Message>(*s, t, std::forward<Args>(args)...);
151 void queueBytes(Topic
const& t, uint16_t tag,
void const* bytes,
size_t len);
152 void queue(pattern::MonoLockFreeBuffer::iterator it
155 void sendPackets(
struct netmap_ring *);
157 void getMacAddresses();
159 void initializePacket(
struct pkt *,
int, std::string, std::string, ether_addr, ether_addr, uint16_t, uint16_t);
162 void updatePacket(
struct pkt *,
size_t,
bool =
true);
164 Config
const config_;
169 std::regex topicRegex_;
170 size_t maxMessageSize_;
173 struct nm_desc *nmd_;
174 pattern::MonoLockFreeBuffer buffer_;
178 ether_addr srcEthAddr_;
179 ether_addr dstEthAddr_;
180 pkt precalculatedPacketHead_;
183 size_t maxSendBatch_;
190 using SendTransportEngine = sendtransportengine_detail::SendTransportEngine;
class to hold an hmbdc configuration
Definition: Config.hpp:46
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:79
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
fascade class for sending network messages
Definition: Sender.hpp:11
void rotate()
if the user choose no to have a Context to manage and run the engine this method can be called from a...
Definition: SendTransportEngine.hpp:63
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
called before any messages got dispatched - only once
Definition: SendTransportEngine.hpp:71
power a netmap port sending functions
Definition: SendTransportEngine.hpp:44
a singleton that holding netmap resources
Definition: NetContext.hpp:38
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:57