1 #include "hmbdc/Copyright.hpp" 5 #define NETMAP_WITH_LIBS 6 #include <net/netmap_user.h> 7 #undef NETMAP_WITH_LIBS 9 #include "hmbdc/tips/netmap/Messages.hpp" 10 #include "hmbdc/tips/netmap/DefaultUserConfig.hpp" 11 #include "hmbdc/app/Base.hpp" 12 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 13 #include "hmbdc/comm/eth/Misc.h" 14 #include "hmbdc/time/Rater.hpp" 15 #include "hmbdc/numeric/BitMath.hpp" 18 #include <type_traits> 22 #include <netinet/ether.h> 23 #include <linux/if_packet.h> 24 #include <sys/sysctl.h> 29 namespace hmbdc {
namespace tips {
namespace netmap {
34 namespace sendtransportengine_detail {
35 HMBDC_CLASS_HAS_DECLARE(hmbdc_net_queued_ts);
47 :
Client<SendTransportEngine> {
// Reports the value of buffer_.remainingSize().
// NOTE(review): presumably this is the number of queued messages not yet
// sent - confirm against MonoLockFreeBuffer::remainingSize().
49 size_t bufferedMessageCount()
const {
50 return buffer_.remainingSize();
53 size_t subscribingPartyDetectedCount(uint16_t tag)
const {
57 template <MessageTupleC Messages,
typename Node>
58 void advertiseFor(
Node const& node, uint16_t mod, uint16_t res) {
76 bool droppedCb()
override;
80 for (
int i = nmd_->first_tx_ring; i <= nmd_->last_tx_ring; i++) {
81 struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, i);
82 if (nm_ring_empty(txring))
87 if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
88 HMBDC_THROW(std::runtime_error,
"IO error");
92 void stoppedCb(std::exception
const& e)
override;
// Returns the engine name as a C string. The pointer refers to the storage of
// the hmbdcName_ member, which the constructor fills from the "hmbdcName"
// config entry.
94 char const* hmbdcName()
const {
95 return this->hmbdcName_.c_str();
// Returns (scheduling policy name, scheduling priority) as a tuple. Both come
// from the schedPolicy_ / schedPriority_ members, which the constructor fills
// from the "schedPolicy" and "schedPriority" config entries.
98 std::tuple<char const*, int> schedSpec()
const {
99 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
// Queue a single message for sending: claim n slots from buffer_, fill them
// through the iterator overload of queue(), then commit the claimed range.
// NOTE(review): `n` is declared on a line that is not visible in this listing.
102 template <
typename Message>
103 void queue(Message&& msg) HMBDC_RESTRICT {
105 auto it = buffer_.claim(n);
106 queue(it, std::forward<Message>(msg));
107 buffer_.commit(it, n);
// Same steps as queue(Message&&), except the slots are obtained with
// buffer_.tryClaim(n) instead of buffer_.claim(n). Returns bool.
// NOTE(review): the test of the tryClaim result and the return statements are
// on lines not visible in this listing; presumably false means "no room".
110 template <
typename Message>
111 bool tryQueue(Message&& msg) HMBDC_RESTRICT {
113 auto it = buffer_.tryClaim(n);
115 queue(it, std::forward<Message>(msg));
116 buffer_.commit(it, n);
// Recursive worker behind the public queue()/tryQueue(): writes `msg` at the
// slot addressed by `it`, then advances the iterator and recurses on the
// remaining messages. The recursion ends at the queue(iterator) overload.
//
// Compile time: the decayed message type must be trivially destructible.
// Throws std::out_of_range (via HMBDC_THROW) when sizeof(Message) exceeds the
// maxMessageSize_ given at construction.
122 template <
typename M,
typename... Messages>
123 void queue(pattern::MonoLockFreeBuffer::iterator it, M&& msg, Messages&&... msgs) {
124 using Message =
typename std::decay<M>::type;
125 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
126 if (hmbdc_unlikely(
sizeof(Message) > maxMessageSize_)) {
127 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
// NOTE(review): `s` (the copy destination) is set up on a line that is not
// visible in this listing; presumably it is derived from `it`.
130 auto tmh = TransportMessageHeader::copyTo(s, std::forward<M>(msg));
// Only for message types that have an hmbdc_net_queued_ts member: stamp the
// queued copy with the current system time.
132 if constexpr (has_hmbdc_net_queued_ts<Message>::value) {
133 tmh->template wrapped<Message>().hmbdc_net_queued_ts = hmbdc::time::SysTime::now();
135 queue(++it, std::forward<Messages>(msgs)...);
// Queue a message that is constructed from `args` directly inside a claimed
// buffer slot (via TransportMessageHeader::copyToInPlace) rather than copied
// from an existing object.
//
// Compile time: Message must be trivially destructible.
// Throws std::out_of_range (via HMBDC_THROW) when sizeof(Message) exceeds the
// maxMessageSize_ given at construction.
// NOTE(review): the matching buffer_.commit() is not visible in this listing.
138 template <
typename Message,
typename ... Args>
139 void queueInPlace(Args&&... args) HMBDC_RESTRICT {
140 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
141 if (hmbdc_unlikely(
sizeof(Message) > maxMessageSize_)) {
142 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
144 auto s = buffer_.claim();
145 TransportMessageHeader::copyToInPlace<Message>(*s, std::forward<Args>(args)...);
// Base case of the variadic queue(iterator, M&&, Messages&&...) recursion:
// reached when no messages are left, so there is nothing to write.
150 void queue(pattern::MonoLockFreeBuffer::iterator it) {}
153 uint16_t outBufferSizePower2();
154 void sendPackets(
struct netmap_ring *);
156 void getMacAddresses();
158 void initializePacket(
struct pkt *,
int, std::string, std::string, ether_addr, ether_addr, uint16_t, uint16_t);
161 void updatePacket(
struct pkt *,
size_t,
bool =
true);
167 size_t maxMessageSize_;
170 struct nm_desc *nmd_;
171 pattern::MonoLockFreeBuffer buffer_;
175 ether_addr srcEthAddr_;
176 ether_addr dstEthAddr_;
177 pkt precalculatedPacketHead_;
180 size_t maxSendBatch_;
186 using SendTransportEngine = sendtransportengine_detail::SendTransportEngine;
190 namespace hmbdc {
namespace tips {
namespace netmap {
192 namespace sendtransportengine_detail {
// Determines the power-of-2 size argument used to construct buffer_.
// Starts from the "outBufferSizePower2" config entry; the fallback below
// derives a value from maxMessageSize_ and logs it.
// NOTE(review): the condition selecting the fallback and the return statement
// are on lines not visible in this listing.
200 SendTransportEngine::
201 outBufferSizePower2() {
202 auto res = config_.getExt<uint16_t>(
"outBufferSizePower2");
// Fallback: log2Upper of (16 KiB / (8 + maxMessageSize_)).
206 res =hmbdc::numeric::log2Upper(16ul * 1024ul / (8ul + maxMessageSize_));
207 HMBDC_LOG_N(
"auto set --outBufferSizePower2=", res);
// Constructs the engine from a Config and the largest message size to carry.
// Steps visible in this listing: layer DefaultUserConfig under `cfg` and
// select its "tx" section, size the send buffer, read rate-limit / batching /
// MTU settings, resolve MAC addresses, open the netmap port, query the
// virtio-net header length, pre-build the packet header and reset the tx rings.
// Throws std::runtime_error (via HMBDC_THROW) on netmap open / ioctl failures.
212 SendTransportEngine::
213 SendTransportEngine(
Config cfg,
size_t maxMessageSize)
214 : config_((cfg.setAdditionalFallbackConfig(
Config(DefaultUserConfig))
215 , cfg.resetSection(
"tx", false)))
216 , maxMessageSize_(maxMessageSize)
// outBufferSizePower2() reads config_ and maxMessageSize_, so both must be
// initialized before buffer_. maxMessageSize_ is declared ahead of buffer_ in
// the class; the declaration of config_ is not visible in this listing.
218 , buffer_(maxMessageSize + sizeof(TransportMessageHeader) + sizeof(
MessageHead)
219 , outBufferSizePower2())
223 , doChecksum_(config_.getExt<
bool>(
"doChecksum"))
// NOTE(review): the last rater_ argument is true only when "sendBytesBurst"
// is non-zero; presumably it enables rate limiting - confirm against Rater.
224 , rater_(Duration::seconds(1u)
225 , config_.getExt<
size_t>(
"sendBytesPerSec")
226 , config_.getExt<
size_t>(
"sendBytesBurst")
227 , config_.getExt<
size_t>(
"sendBytesBurst") != 0ul
229 , maxSendBatch_(config_.getExt<
size_t>(
"maxSendBatch"))
230 , mtu_(config_.getExt<
size_t>(
"mtu")) {
232 config_(hmbdcName_,
"hmbdcName")
233 (schedPolicy_,
"schedPolicy")
234 (schedPriority_,
"schedPriority")
// NOTE(review): ether_aton() returns NULL for a malformed address string and
// nothing visible here checks the result before memcpy reads from it.
237 memcpy(&srcEthAddr_, ether_aton(config_.getExt<std::string>(
"srcEthAddr").c_str())
238 ,
sizeof(srcEthAddr_));
239 memcpy(&dstEthAddr_, ether_aton(config_.getExt<std::string>(
"dstEthAddr").c_str())
240 ,
sizeof(dstEthAddr_));
// Build the netmap request template: zeroed, accepting a virtio-net header,
// with slot and ring counts taken from the config.
243 struct nmreq baseNmd;
244 bzero(&baseNmd,
sizeof(baseNmd));
245 baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
246 config_(baseNmd.nr_tx_slots,
"nmTxSlots");
247 config_(baseNmd.nr_rx_slots,
"nmRxSlots");
248 config_(baseNmd.nr_tx_rings,
"nmTxRings");
249 config_(baseNmd.nr_rx_rings,
"nmRxRings");
251 auto nmport = config_.getExt<std::string>(
"netmapPort");
252 uint32_t flags = config_.getExt<uint32_t>(
"nmOpenFlags");
253 nmd_ = nm_open(nmport.c_str(), &baseNmd, flags, NULL);
// NOTE(review): the condition guarding this throw is on a line not visible in
// this listing; presumably it tests for a null nmd_.
255 HMBDC_THROW(std::runtime_error,
"cannot open " << nmport);
// More than one tx ring is only warned about, not rejected.
258 if (nmd_->first_tx_ring != nmd_->last_tx_ring) {
259 HMBDC_LOG_W(
"multiple tx rings exist on ", nmport,
" the recv side could receive out of order messages." 260 " to avoid it, use more specific netmapPort with the ring number. for example: netmap::p2p1-2");
// Ask netmap (NETMAP_VNET_HDR_GET through NIOCREGIF) for the virtio-net header
// length of the opened port; the answer comes back in req.nr_arg1.
// NOTE(review): the declaration of `req` is on a line not visible here.
264 memset(&req, 0,
sizeof(req));
265 bcopy(nmd_->req.nr_name, req.nr_name,
sizeof(req.nr_name));
266 req.nr_version = NETMAP_API;
267 req.nr_cmd = NETMAP_VNET_HDR_GET;
268 int err = ioctl(nmd_->fd, NIOCREGIF, &req);
270 HMBDC_THROW(std::runtime_error,
"Unable to get virtio-net header length");
272 virtHeader_ = req.nr_arg1;
// Pre-compute the packet head once from the configured TTL, IPs and ports.
274 initializePacket(&precalculatedPacketHead_
275 , config_.getExt<uint16_t>(
"ttl")
276 , config_.getExt<
string>(
"srcIp")
277 , config_.getExt<
string>(
"dstIp")
280 , config_.getExt<uint16_t>(
"srcPort")
281 , config_.getExt<uint16_t>(
"dstPort")
// Blocks the constructing thread for "nmResetWaitSec" seconds.
283 sleep(config_.getExt<
int>(
"nmResetWaitSec"));
285 if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
286 HMBDC_THROW(std::runtime_error,
"IO error");
// Set head and cur of every tx ring to its tail.
// NOTE(review): presumably this discards whatever was pending on the rings so
// the engine starts from a clean state - confirm against netmap ring semantics.
288 for (
int i = nmd_->first_tx_ring; i <= nmd_->last_tx_ring; i++) {
289 struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, i);
290 txring->head = txring->cur = txring->tail;
296 SendTransportEngine::
302 SendTransportEngine::
303 ~SendTransportEngine() {
320 HMBDC_LOG_C(e.what());
// Moves queued messages from buffer_ into the slots of one netmap tx ring.
// Each slot starts with a copy of precalculatedPacketHead_; messages are then
// appended until the slot length cap or the per-slot batch budget is reached,
// at which point the headers are finalized with updatePacket() and the next
// slot is taken. Returns immediately when the ring has no free slot.
// NOTE(review): several lines of this function (loop headers, declarations of
// begin/end/it/th/currentPktPtr, closing braces) are not visible in this
// listing, so the comments below describe only the statements that are shown.
325 SendTransportEngine::
326 sendPackets(
struct netmap_ring * HMBDC_RESTRICT ring) HMBDC_RESTRICT {
327 uint32_t cur = ring->cur;
328 if (hmbdc_unlikely(cur == ring->tail))
return;
// Upper bound on messages to peek: per-slot batch size times the slot distance
// from cur to tail.
// NOTE(review): (ring->tail - cur) wraps as unsigned when tail < cur; the
// modulo only recovers the true distance if num_slots is a power of two.
330 auto limit = maxSendBatch_ * ((ring->tail - cur) % ring->num_slots);
331 if (hmbdc_unlikely(!(buffer_.peek(begin, end, limit)))) {
334 bool slotInited =
false;
336 auto batch = maxSendBatch_;
337 struct netmap_slot *slot = &ring->slot[cur];
338 uint32_t slotLen = 0;
339 char *p = NETMAP_BUF(ring, slot->buf_idx);
// A slot never carries more than the smaller of the MTU and the netmap buffer.
341 uint16_t slotLenMax = min(mtu_, (uint16_t)ring->nr_buf_size);
// The rate limiter is consulted with the message's wire size before it is sent.
344 if (hmbdc_likely(rater_.check(th->wireSize()))) {
// Size of the ethernet + IP + UDP headers plus the virtio-net header in use.
346 auto wireSize = (uint16_t)(
347 sizeof(ether_header) +
sizeof(ip) +
sizeof(udphdr) + virtHeader_
// Copy source starts sizeof(virt_header) - virtHeader_ bytes into the
// precalculated head, i.e. only the last virtHeader_ bytes of the virt header
// area are included ahead of the ethernet header.
349 memcpy(p, ((
char*)&precalculatedPacketHead_) +
sizeof(
virt_header) - virtHeader_
// Append the message only if it still fits under the slot cap.
354 auto wireSize = th->wireSize();
355 if (slotLen + wireSize <= slotLenMax) {
356 memcpy(p + slotLen, th, (
int)wireSize);
// Slot finished: payload size is the slot length minus all headers; patch the
// length/checksum fields, then advance to the next slot (stop at tail).
366 size_t wireSizeExcludingHead = slotLen
367 - (
sizeof(ether_header) +
sizeof(ip) +
sizeof(udphdr) + virtHeader_);
368 updatePacket(currentPktPtr, wireSizeExcludingHead, doChecksum_);
369 cur = nm_ring_next(ring, cur);
371 if (cur == ring->tail)
break;
372 slot = &ring->slot[cur];
373 p = NETMAP_BUF(ring, slot->buf_idx);
375 batch = maxSendBatch_;
// Same finalization as above.
// NOTE(review): presumably this handles the last, partially filled slot after
// the loop - the enclosing condition is not visible in this listing.
384 size_t wireSizeExcludingHead = slotLen
385 - (
sizeof(ether_header) +
sizeof(ip) +
sizeof(udphdr) + virtHeader_);
386 updatePacket(currentPktPtr, wireSizeExcludingHead, doChecksum_);
387 cur = nm_ring_next(ring, cur);
// Publish the filled slots to netmap, then hand the (it - begin) peeked
// entries back to buffer_.
390 ring->head = ring->cur = cur;
391 buffer_.wasteAfterPeek(begin, it - begin,
true);
396 SendTransportEngine::
398 auto nmport = config_.
getExt<std::string>(
"netmapPort");
400 if (strncmp(nmport.c_str(),
"vale", 4) == 0)
return;
402 if (nmport.find_first_of(
":") == std::string::npos) {
403 HMBDC_THROW(std::runtime_error
404 ,
"wrong netmapPort format (examples: netmap:eth0, netmap:eth0-0)");
406 auto iface = nmport.substr(nmport.find_first_of(
":"));
407 iface = iface.substr(1, iface.find_first_of(
"-^") - 1);
410 struct ifaddrs *ifaphead, *ifap;
411 int l =
sizeof(ifap->ifa_name);
413 if (getifaddrs(&ifaphead) != 0) {
414 HMBDC_THROW(std::runtime_error,
"getifaddrs failed for" << iface);
416 for (ifap = ifaphead; ifap; ifap = ifap->ifa_next) {
417 struct sockaddr_ll *sll =
418 (
struct sockaddr_ll *)ifap->ifa_addr;
421 if (!sll || sll->sll_family != AF_PACKET)
423 if (strncmp(ifap->ifa_name, iface.c_str(), l) != 0)
425 mac = (uint8_t *)(sll->sll_addr);
427 char srcEthAddrStr[20];
428 sprintf(srcEthAddrStr,
"%02x:%02x:%02x:%02x:%02x:%02x",
429 mac[0], mac[1], mac[2],
430 mac[3], mac[4], mac[5]);
431 memcpy(&srcEthAddr_, ether_aton(srcEthAddrStr),
sizeof(srcEthAddr_));
434 freeifaddrs(ifaphead);
436 HMBDC_THROW(std::runtime_error,
"no local interface named " << iface);
// Fills `pkt` with the constant part of every outgoing frame: ethernet header
// (source/destination MAC, ETHERTYPE_IP), IPv4 header (version, header length,
// low-delay TOS, don't-fragment, UDP protocol, addresses) and UDP header
// (ports), and zeroes the virtio-net header area.
//
// ip_len and udp->len are stored here in HOST byte order and cover the headers
// only; updatePacket() adds the payload size and byte-swaps them per packet.
// NOTE(review): declarations of a/b/c/d, `ip` and `udp`, and the use of `ttl`,
// are on lines not visible in this listing.
442 SendTransportEngine::
443 initializePacket(
struct pkt *
pkt,
int ttl, std::string srcIpStr, std::string dstIpStr
444 , ether_addr srcEthAddr, ether_addr dstEthAddr, uint16_t srcPort, uint16_t dstPort) {
445 struct ether_header *eh;
// Parse dotted-quad text into a host-order 32-bit address.
// NOTE(review): the sscanf return value is not checked in the visible code, so
// a malformed address string would leave some of a/b/c/d unchanged.
449 sscanf(srcIpStr.c_str(),
"%d.%d.%d.%d", &a, &b, &c, &d);
450 auto srcIp = (a << 24u) + (b << 16u) + (c << 8u) + d;
451 sscanf(dstIpStr.c_str(),
"%d.%d.%d.%d", &a, &b, &c, &d);
452 auto dstIp = (a << 24u) + (b << 16u) + (c << 8u) + d;
456 bcopy(&srcEthAddr, eh->ether_shost, 6);
457 bcopy(&dstEthAddr, eh->ether_dhost, 6);
459 eh->ether_type = htons(ETHERTYPE_IP);
// The packed-member address warning is silenced from here on (for clang, or
// GCC 9 and later).
461 #pragma GCC diagnostic push 462 #if defined __clang__ || __GNUC_PREREQ(9,0) 463 #pragma GCC diagnostic ignored "-Waddress-of-packed-member" 466 udp = &
pkt->ipv4.udp;
467 ip->ip_v = IPVERSION;
// IP header length is expressed in 32-bit words.
468 ip->ip_hl =
sizeof(*ip) >> 2;
470 ip->ip_tos = IPTOS_LOWDELAY;
473 ip->ip_off = htons(IP_DF);
475 ip->ip_p = IPPROTO_UDP;
476 ip->ip_dst.s_addr = htonl(dstIp);
477 ip->ip_src.s_addr = htonl(srcIp);
// Header-only length, host byte order (see the note at the top).
479 ip->ip_len =
sizeof(*ip) +
sizeof(udphdr);
480 udp->source = htons(srcPort);
481 udp->dest = htons(dstPort);
// Header-only length, host byte order (see the note at the top).
482 udp->len =
sizeof(udphdr);
485 bzero(&
pkt->vh,
sizeof(
pkt->vh));
// Finalizes the per-packet header fields of `packet` once its payload size is
// known: adds payloadWireSize to the IP and UDP length fields, byte-swaps them,
// and computes the IP header and UDP checksums.
//
// Both length fields are incremented in place, so `packet` must still hold the
// host-order, header-only lengths written by initializePacket() when this is
// called.
// NOTE(review): the statements that consult `doChecksum` are on lines not
// visible in this listing; presumably they guard the two checksum assignments.
490 SendTransportEngine::
491 updatePacket(
struct pkt *packet,
size_t payloadWireSize,
bool doChecksum) {
492 packet->ipv4.ip.ip_len += payloadWireSize;
// NOTE(review): this is a host-to-network conversion, for which htons is the
// conventional spelling (as used for udp.len below); both functions perform
// the same byte swap, so the result is the same.
493 packet->ipv4.ip.ip_len = ntohs(packet->ipv4.ip.ip_len);
// IP checksum over the IP header only.
495 packet->ipv4.ip.ip_sum = wrapsum(checksum(&packet->ipv4.ip,
sizeof(packet->ipv4.ip), 0));
498 packet->ipv4.udp.len += payloadWireSize;
499 packet->ipv4.udp.len = htons(packet->ipv4.udp.len);
// UDP checksum, accumulated innermost-first: protocol number + UDP length,
// then 2 * sizeof(ip_src) bytes starting at ip_src (this relies on ip_dst
// directly following ip_src), then the payload, then the UDP header.
501 auto udp = &packet->ipv4.udp;
502 packet->ipv4.udp.check = wrapsum(
503 checksum(udp,
sizeof(*udp),
504 checksum(packet->ipv4.body, payloadWireSize,
505 checksum(&packet->ipv4.ip.ip_src, 2 *
sizeof(packet->ipv4.ip.ip_src),
506 IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))));
510 #pragma GCC diagnostic pop T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:79
powers the sending functions of a netmap port
Definition: SendTransportEngine.hpp:46
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: SendTransportEngine.hpp:319
void rotate()
if the user chooses not to have a Context manage and run the engine, this method can be called from a...
Definition: SendTransportEngine.hpp:69
class to hold an hmbdc configuration
Definition: Config.hpp:45
Definition: TypedString.hpp:84
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:311
Definition: Message.hpp:212
a Node is a thread of execution that can subscribe to and receive Messages
Definition: Node.hpp:51
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:122
Definition: LockFreeBufferMisc.hpp:89