1 #include "hmbdc/Copyright.hpp" 4 #define NETMAP_WITH_LIBS 5 #include <net/netmap_user.h> 6 #undef NETMAP_WITH_LIBS 8 #include "hmbdc/tips/rnetmap/Transport.hpp" 9 #include "hmbdc/tips/rnetmap/Messages.hpp" 10 #include "hmbdc/tips/reliable/BackupSendServerT.hpp" 11 #include "hmbdc/app/Base.hpp" 12 #include "hmbdc/comm/eth/Misc.h" 13 #include "hmbdc/comm/inet/Misc.hpp" 14 #include "hmbdc/time/Time.hpp" 15 #include "hmbdc/time/Rater.hpp" 16 #include "hmbdc/pattern/LockFreeBufferT.hpp" 19 #include <boost/bind.hpp> 24 #include <netinet/ether.h> 25 #include <linux/if_packet.h> 26 #include <sys/sysctl.h> 31 namespace hmbdc {
namespace tips {
namespace rnetmap {
33 namespace nmsendtransport_detail {
43 using ToCleanupAttQueue = reliable::ToCleanupAttQueue;
// NOTE(review): fragment of the transport constructor as it appears in a
// scraped listing; its signature and several statements (including the guard
// conditions of some HMBDC_THROW calls) are absent here.
// Visible behavior: read settings from config_/cfg, validate message-size
// limits, set the Ethernet addresses, open the netmap port, query the
// virtio-net header length, build the packet header template and reset the
// tx ring.
48 ,
size_t maxMessageSize
51 , ToCleanupAttQueue& toCleanupAttQueue)
57 , doChecksum_(config_.getExt<
bool>(
"doChecksum"))
58 , maxMessageSize_(maxMessageSize)
61 , toCleanupAttQueue_(toCleanupAttQueue)
62 , maxSendBatch_(config_.getExt<
size_t>(
"maxSendBatch"))
64 , seqAlertPending_(
false)
66 , startSending_(
false) {
// Reject a maxMessageSize that cannot fit in one frame (mtu_) or that exceeds
// what TransportMessageHeader can carry.
67 if (maxMessageSize_ > mtu_) {
68 HMBDC_THROW(std::out_of_range,
"mtu needs to >= " << maxMessageSize_);
70 if (maxMessageSize_ > TransportMessageHeader::maxPayloadSize()) {
71 HMBDC_THROW(std::out_of_range,
"maxMessageSize_ needs to <=" << TransportMessageHeader::maxPayloadSize());
// Ethernet addresses are parsed from the srcEthAddr / dstEthAddr config strings.
// NOTE(review): ether_aton() returns NULL for a malformed address string, and
// no NULL check is visible before memcpy reads from its result.
74 memcpy(&srcEthAddr_, ether_aton(config_.getExt<std::string>(
"srcEthAddr").c_str())
75 ,
sizeof(srcEthAddr_));
76 memcpy(&dstEthAddr_, ether_aton(config_.getExt<std::string>(
"dstEthAddr").c_str())
77 ,
sizeof(dstEthAddr_));
// Request descriptor for nm_open(): zeroed, virtio-net headers accepted, and
// slot/ring counts presumably filled from the named config entries
// (nmTxSlots, nmRxSlots, nmTxRings, nmRxRings).
81 bzero(&baseNmd,
sizeof(baseNmd));
82 baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
83 config_(baseNmd.nr_tx_slots,
"nmTxSlots");
84 config_(baseNmd.nr_rx_slots,
"nmRxSlots");
85 config_(baseNmd.nr_tx_rings,
"nmTxRings");
86 config_(baseNmd.nr_rx_rings,
"nmRxRings");
// Open the configured netmap port with nmOpenFlags.
// NOTE(review): these reads go through `cfg` while the rest use `config_`;
// presumably the same configuration — confirm.
88 auto nmport = cfg.
getExt<std::string>(
"netmapPort");
89 nmd_ = nm_open(nmport.c_str(), &baseNmd
90 , cfg.
getExt<uint64_t>(
"nmOpenFlags"), NULL);
// The guard for this throw is not in the listing (presumably a failed nm_open()).
92 HMBDC_THROW(std::runtime_error,
"cannot open " << nmport);
// Only a port bound to a single tx ring is accepted.
// NOTE(review): the example in this message reads "netmap::p2p1-2" (double
// colon), while the netmapPort format used elsewhere is "netmap:eth0".
94 if (nmd_->first_tx_ring != nmd_->last_tx_ring) {
95 HMBDC_THROW(std::out_of_range
96 ,
"multiple tx rings exist on " << nmport
97 <<
". use more specific netmapPort. for example: netmap::p2p1-2 ");
// Query the virtio-net header length for this port (NETMAP_VNET_HDR_GET via
// NIOCREGIF) and keep it in virtHeader_. The error guard on `err` is not in
// the listing.
100 memset(&req, 0,
sizeof(req));
101 bcopy(nmd_->req.nr_name, req.nr_name,
sizeof(req.nr_name));
102 req.nr_version = NETMAP_API;
103 req.nr_cmd = NETMAP_VNET_HDR_GET;
104 int err = ioctl(nmd_->fd, NIOCREGIF, &req);
106 HMBDC_THROW(std::runtime_error,
"Unable to get virtio-net header length");
108 virtHeader_ = req.nr_arg1;
// srcIp may hold the literal "tcpIfaceAddr"; in that case the source IP comes
// from getLocalIpMatchMask() applied to the tcpIfaceAddr setting.
109 auto srcIpStr = config_.getExt<
string>(
"srcIp");
110 if (srcIpStr ==
"tcpIfaceAddr") {
111 srcIpStr = getLocalIpMatchMask(config_.getExt<
string>(
"tcpIfaceAddr"));
// Build the reusable Ethernet/IPv4/UDP header template. Some arguments are on
// lines missing from this listing.
113 initializePacket(&precalculatedPacketHead_
114 , config_.getExt<uint16_t>(
"ttl")
116 , config_.getExt<
string>(
"dstIp")
119 , config_.getExt<uint16_t>(
"srcPort")
120 , config_.getExt<uint16_t>(
"dstPort")
// Pause nmResetWaitSec seconds, presumably to let the interface settle after
// nm_open() — TODO confirm.
122 sleep(config_.getExt<
int>(
"nmResetWaitSec"));
// Sync the tx ring once, then move head and cur up to tail.
124 if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
125 HMBDC_THROW(std::runtime_error,
"IO error");
127 struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, nmd_->first_tx_ring);
128 txring->head = txring->cur = txring->tail;
// Prepare the sequence-alert message: its header carries the maximum
// HMBDC_SEQ_TYPE value, and seqAlert_ points at the wrapped SeqAlert payload.
// NOTE(review): if seqAlertBuf_ is a raw array, `addr` decays to a pointer
// into it. If it were a std::array, `auto` would copy it and seqAlert_ would
// point into that local copy. Confirm the member's type.
130 auto addr = seqAlertBuf_;
134 h->setSeq(std::numeric_limits<HMBDC_SEQ_TYPE>::max());
135 seqAlert_ = &(h->wrapped<
SeqAlert>());
// The enclosing context of this assignment is not visible in this listing.
143 startSending_ =
true;
/**
 * Rebuild the advertisement buffers from `ads`.
 * One buffer is appended per ad. Each buffer's header sequence is set to the
 * maximum HMBDC_SEQ_TYPE value. The new set replaces adBufs_ via std::swap,
 * so the previous buffers are released when newAdBufs goes out of scope.
 * @tparam AdvertisingMessages an iterable collection of ad messages
 *         (presumably; element type not visible here)
 * NOTE(review): the statements that serialize each ad into its buffer
 * (original lines 152-155) are absent from this listing.
 * NOTE(review): sendPackets() iterates adBufs_, and no synchronization is
 * visible here; confirm both are called from the same thread.
 */
146 template <
typename AdvertisingMessages>
147 void setAds(AdvertisingMessages
const& ads) {
148 decltype(adBufs_) newAdBufs;
149 for (
auto const& ad : ads) {
150 newAdBufs.emplace_back();
151 auto addr = newAdBufs.rbegin()->data();
156 h->setSeq(std::numeric_limits<HMBDC_SEQ_TYPE>::max());
158 std::swap(adBufs_, newAdBufs);
// Request that the advertisement buffers be sent. The body is not present in
// this listing; by analogy with setSeqAlertPending() it presumably sets
// adPending_, which sendPackets() checks first — TODO confirm.
161 void setAdPending() {
// Request that the sequence-alert buffer be sent. sendPackets() sends it only
// when no ads are pending and no buffered message is selected, and then
// clears the flag.
165 void setSeqAlertPending() {
166 seqAlertPending_ =
true;
/**
 * One polling iteration of the sender.
 * If the first tx ring has slots available to userspace (nm_ring_empty()
 * is false), fill them via sendPackets(), then issue NIOCTXSYNC so the
 * kernel picks up the submitted slots.
 * @param sessionCount number of active sessions, forwarded to sendPackets()
 *        (which discards peeked messages when it is 0 after sending started)
 * @throws std::runtime_error "IO error" when the NIOCTXSYNC ioctl fails
 */
169 void runOnce(
size_t sessionCount) HMBDC_RESTRICT {
170 struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, nmd_->first_tx_ring);
171 if (!nm_ring_empty(txring)) {
172 sendPackets(txring, sessionCount);
174 if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
175 HMBDC_THROW(std::runtime_error,
"IO error");
// Transport state (several members are absent from this listing).
// netmap descriptor returned by nm_open() in the constructor.
// NOTE(review): no nm_close() for nmd_ is visible in this listing; confirm the
// destructor releases it.
180 struct nm_desc *nmd_;
// Ethernet addresses used for the frame header.
182 ether_addr srcEthAddr_;
183 ether_addr dstEthAddr_;
// Header template built by initializePacket(); sendPackets() copies it to the
// start of each tx slot.
184 pkt precalculatedPacketHead_;
// Largest single message; checked against mtu_ and maxPayloadSize() in the
// constructor.
188 size_t maxMessageSize_;
// Rate limiter consulted only for buffered application messages in sendPackets().
191 Rater& HMBDC_RESTRICT rater_;
// Receives (seq, attachment, cleanup func) tuples for StartMemorySegTrain messages.
192 ToCleanupAttQueue& HMBDC_RESTRICT toCleanupAttQueue_;
// Per-slot message batch budget in sendPackets(); also scales the peek limit.
193 size_t maxSendBatch_;
// Set by setSeqAlertPending(); cleared when sendPackets() emits the alert.
198 bool seqAlertPending_;
/**
 * Fill free tx slots of `ring`, starting at ring->cur, with outgoing frames.
 * Each slot gets a copy of the precalculated Ethernet/IPv4/UDP header,
 * followed by as many messages as fit in min(nr_buf_size, mtu_), with the
 * per-slot count bounded by maxSendBatch_. Afterwards ring->head/cur are
 * advanced past the filled slots, and the consumed messages are released
 * from buffer_.
 * NOTE(review): several statements are absent from this listing, e.g. the
 * declarations of begin/end/it/th/wireSize, the main loop header, the
 * assignment of currentPktPtr and the decrement of `batch`.
 * @param ring         tx ring to fill (runOnce() passes the first tx ring)
 * @param sessionCount active session count; when 0 after sending started,
 *                     the peeked messages are discarded
 */
202 void sendPackets(
struct netmap_ring * HMBDC_RESTRICT ring,
size_t sessionCount) HMBDC_RESTRICT {
203 uint32_t cur = ring->cur;
204 size_t slotRemaining = 0;
205 bool slotNotInited =
true;
206 auto batch = maxSendBatch_;
207 pkt* currentPktPtr =
nullptr;
208 size_t slotWireSize = 0;
209 struct netmap_slot *slot = &ring->slot[cur];
// Peek at most maxSendBatch_ messages per free slot.
// NOTE(review): netmap_ring's tail/cur/num_slots are uint32_t. When tail has
// wrapped below cur, (tail - cur) wraps to 2^32 - k, and "% num_slots" yields
// the true free-slot count only if num_slots divides 2^32 (a power of two).
// nm_ring_space() in netmap_user.h handles the wrap by adding num_slots instead.
211 auto limit = maxSendBatch_ * ((ring->tail - ring->cur) % ring->num_slots);
212 buffer_.peek(0, begin, end, limit);
// Once sending has started, messages peeked while there are no sessions are
// discarded. The statements that follow (original lines 215-218) are not visible.
213 if (hmbdc_unlikely(startSending_ && !sessionCount)) {
214 buffer_.wasteAfterPeek(0u, end - begin);
// Choose what goes next, in priority order: pending ads, then buffered
// messages (only after startSending_), then a pending sequence alert.
// NOTE(review): ads and the seq alert are sized with sizeof() of their whole
// buffer, not of the encoded message.
219 bool processBuffer =
false;
222 if (hmbdc_unlikely(adPending_)) {
223 for (
auto& adBuf : adBufs_) {
225 wireSize +=
sizeof(adBuf);
228 }
else if (it != end && startSending_) {
230 wireSize = th->wireSize();
231 processBuffer =
true;
232 }
else if (hmbdc_unlikely(seqAlertPending_)) {
235 wireSize =
sizeof(seqAlertBuf_);
237 seqAlertPending_ =
false;
// Only buffered application messages are charged to the rater. A zero
// wireSize (nothing selected) also makes raterOk false.
240 bool raterOk = wireSize && (!processBuffer || rater_.check(wireSize));
// Start a new slot (presumably guarded by slotNotInited on a missing line):
// copy the header template, skipping the leading part of virt_header not used
// on this port, and reserve the header bytes.
242 char *p = NETMAP_BUF(ring, slot->buf_idx);
244 slotRemaining = min((
size_t)ring->nr_buf_size, mtu_);
245 auto headWireSize = (uint16_t)(
246 sizeof(ether_header) +
sizeof(::ip) +
sizeof(udphdr) + virtHeader_
248 memcpy(p, ((
char*)&precalculatedPacketHead_) +
sizeof(
virt_header) - virtHeader_
251 slotRemaining -= headWireSize;
252 slotWireSize = headWireSize;
253 slotNotInited =
false;
// Append the selected message if it fits in the slot's remaining space.
255 if (slotRemaining >= wireSize) {
256 if (hmbdc_likely(processBuffer)) {
// MemorySeg payload is held out of line: copy the inline part, then the
// segment bytes immediately after it.
257 if (hmbdc_unlikely(th->typeTag() == MemorySeg::typeTag)) {
258 auto l = (int)(wireSize - th->wireSizeMemorySeg());
259 memcpy(p + slotWireSize, th->wireBytes(), l);
260 memcpy(p + slotWireSize + l, th->wireBytesMemorySeg(), th->wireSizeMemorySeg());
// At the head of a memory-segment train, queue the attachment cleanup keyed
// at sequence seq + segCount + 1.
262 if (hmbdc_unlikely(th->typeTag() == StartMemorySegTrain::typeTag)) {
263 auto& trainHead = th->template wrapped<StartMemorySegTrain>();
264 auto itActual = it; itActual.seq_ += trainHead.segCount + 1;
266 toCleanupAttQueue_.push_back(make_tuple(itActual.seq_
267 , &actual->template wrapped<hasMemoryAttachment>()
268 , trainHead.att.afterConsumedCleanupFunc));
270 memcpy(p + slotWireSize, th, (
int)wireSize);
// Record it.seq_ + 1 as the next expected sequence in the seq-alert message
// (the enclosing condition is not visible).
273 seqAlert_->expectSeq = it.seq_ + 1;
277 memcpy(p + slotWireSize, th, (
int)wireSize);
279 slotRemaining -= wireSize;
280 slotWireSize += wireSize;
// Close the slot when the batch budget is spent or input is exhausted: fix up
// lengths/checksums in the copied header, set slot->len and advance. Stop at
// the ring tail or when the rater refused.
285 if (!batch || it == end) {
286 size_t wireSizeExcludingHead = slotWireSize
287 - (
sizeof(ether_header) +
sizeof(::ip) +
sizeof(udphdr) + virtHeader_);
288 updatePacket(currentPktPtr, wireSizeExcludingHead, doChecksum_);
289 slot->len = slotWireSize;
290 cur = nm_ring_next(ring, cur);
291 slot = &ring->slot[cur];
293 if (cur == ring->tail || !raterOk)
break;
295 batch = maxSendBatch_;
296 slotNotInited =
true;
298 if (it == end)
break;
// Finalize a partially filled last slot (its guard is on a line not visible).
// NOTE(review): the header-size expression appears three times in this
// function; a single named value would keep the copies in sync.
302 size_t wireSizeExcludingHead = slotWireSize
303 - (
sizeof(ether_header) +
sizeof(::ip) +
sizeof(udphdr) + virtHeader_);
304 updatePacket(currentPktPtr, wireSizeExcludingHead, doChecksum_);
305 slot->len = slotWireSize;
306 cur = nm_ring_next(ring, cur);
// Hand the filled slots to the kernel and release the consumed messages
// (it - begin) from the buffer.
309 ring->head = ring->cur = cur;
310 buffer_.wasteAfterPeek(0u, it - begin);
/**
 * Look up this host's MAC address for the netmap interface and store it in
 * srcEthAddr_.
 * Ports whose name starts with "vale" are skipped. For other ports, the
 * interface name is the text after ':' up to the first '-' or '^'
 * (e.g. "netmap:eth0-0" -> "eth0"), and its AF_PACKET address is found via
 * getifaddrs().
 * @throws std::runtime_error for a netmapPort without ':', when getifaddrs()
 *         fails, or (final throw) when no matching interface is found
 * NOTE(review): original lines 348-349, 351 and 353 are absent from this
 * listing, so the exact loop-exit / freeifaddrs / throw sequence cannot be
 * confirmed here.
 */
313 void getMacAddresses() {
314 auto nmport = config_.getExt<std::string>(
"netmapPort");
316 if (strncmp(nmport.c_str(),
"vale", 4) == 0)
return;
318 if (nmport.find_first_of(
":") == std::string::npos) {
319 HMBDC_THROW(std::runtime_error
320 ,
"wrong netmapPort format " << nmport <<
" (examples: netmap:eth0, netmap:eth0-0)");
// With neither '-' nor '^' present, find_first_of returns npos, and the rest
// of the string after ':' is used as the name.
322 auto iface = nmport.substr(nmport.find_first_of(
":"));
323 iface = iface.substr(1, iface.find_first_of(
"-^") - 1);
// NOTE(review): struct ifaddrs::ifa_name is a `char *`, so `l` is
// sizeof(char *) (8 on LP64), not a name length. The strncmp below therefore
// compares at most that many bytes, and names sharing an 8-byte prefix (e.g.
// "enp0s31f6" vs "enp0s31f7") are treated as equal. A full-string comparison
// (strcmp, or iface == ifap->ifa_name) avoids this.
326 struct ifaddrs *ifaphead, *ifap;
327 int l =
sizeof(ifap->ifa_name);
// NOTE(review): this message has no space between "for" and the interface name.
329 if (getifaddrs(&ifaphead) != 0) {
330 HMBDC_THROW(std::runtime_error,
"getifaddrs failed for" << iface);
332 for (ifap = ifaphead; ifap; ifap = ifap->ifa_next) {
333 struct sockaddr_ll *sll =
334 (
struct sockaddr_ll *)ifap->ifa_addr;
337 if (!sll || sll->sll_family != AF_PACKET)
339 if (strncmp(ifap->ifa_name, iface.c_str(), l) != 0)
341 mac = (uint8_t *)(sll->sll_addr);
// Format the 6 MAC bytes as text (17 chars + NUL fit in 20 bytes), then parse
// it back with ether_aton() into srcEthAddr_.
// NOTE(review): copying the 6 bytes from sll->sll_addr directly would avoid
// the round trip.
343 char srcEthAddrStr[20];
344 sprintf(srcEthAddrStr,
"%02x:%02x:%02x:%02x:%02x:%02x",
345 mac[0], mac[1], mac[2],
346 mac[3], mac[4], mac[5]);
347 memcpy(&srcEthAddr_, ether_aton(srcEthAddrStr),
sizeof(srcEthAddr_));
350 freeifaddrs(ifaphead);
352 HMBDC_THROW(std::runtime_error,
"no local interface named " << iface);
/**
 * Fill the header template `pkt` that is copied into every outgoing frame.
 * Sets:
 * - the Ethernet source/destination addresses and ETHERTYPE_IP;
 * - an IPv4 header: version 4, header length from sizeof(*ip),
 *   IPTOS_LOWDELAY, DF, protocol UDP, source/destination in network byte order;
 * - the UDP source/destination ports.
 * ip_len and udp.len are left holding header-only sizes in host byte order;
 * updatePacket() later adds the payload size and converts them.
 * pkt->vh is zeroed.
 * @param srcIpStr source IPv4 address as a dotted quad
 * @param dstIpStr destination IPv4 address as a dotted quad
 * NOTE(review): original lines 359-361, 366-368, 371, 373, 377-378, 382,
 * 384-385, 387, 391, 396-397 and 399-401 are absent from this listing
 * (e.g. the declarations of a, b, c, d, eh, ip and any use of `ttl`).
 */
356 void initializePacket(
struct pkt *
pkt,
int ttl, std::string srcIpStr, std::string dstIpStr
357 , ether_addr srcEthAddr, ether_addr dstEthAddr, uint16_t srcPort, uint16_t dstPort) {
358 struct ether_header *eh;
// NOTE(review): sscanf's return value is not checked. A string that is not
// four dot-separated integers leaves the unmatched ones among a..d holding
// their previous values, producing a wrong address silently. inet_pton()
// would validate the input.
362 sscanf(srcIpStr.c_str(),
"%d.%d.%d.%d", &a, &b, &c, &d);
363 auto srcIp = (a << 24u) + (b << 16u) + (c << 8u) + d;
364 sscanf(dstIpStr.c_str(),
"%d.%d.%d.%d", &a, &b, &c, &d);
365 auto dstIp = (a << 24u) + (b << 16u) + (c << 8u) + d;
369 bcopy(&srcEthAddr, eh->ether_shost, 6);
370 bcopy(&dstEthAddr, eh->ether_dhost, 6);
372 eh->ether_type = htons(ETHERTYPE_IP);
374 #pragma GCC diagnostic push 375 #if defined __clang__ || __GNUC_PREREQ(9,0) 376 #pragma GCC diagnostic ignored "-Waddress-of-packed-member" 379 udp = &
pkt->ipv4.udp;
380 ip->ip_v = IPVERSION;
381 ip->ip_hl =
sizeof(*ip) >> 2;
383 ip->ip_tos = IPTOS_LOWDELAY;
386 ip->ip_off = htons(IP_DF);
388 ip->ip_p = IPPROTO_UDP;
389 ip->ip_dst.s_addr = htonl(dstIp);
390 ip->ip_src.s_addr = htonl(srcIp);
// Header-only lengths in host order; updatePacket() adds the payload and
// converts them.
392 ip->ip_len =
sizeof(*ip) +
sizeof(udphdr);
393 udp->source = htons(srcPort);
394 udp->dest = htons(dstPort);
395 udp->len =
sizeof(udphdr);
398 bzero(&
pkt->vh,
sizeof(
pkt->vh));
/**
 * Finalize the copied header of one frame carrying payloadWireSize bytes of
 * payload.
 * Adds the payload size to ip_len and udp.len (which hold header-only,
 * host-order values from initializePacket()) and converts both to network
 * byte order. Then computes the IPv4 header checksum, and the UDP checksum
 * over the UDP header, the payload and the pseudo-header (source/destination
 * addresses, IPPROTO_UDP, UDP length).
 * Because it adds to the lengths and byte-swaps them in place, it must be
 * applied exactly once per header copy, never to the shared template.
 * NOTE(review): original lines 405, 407-408 and 411 are absent from this
 * listing, so which checksum(s) `doChecksum` gates cannot be confirmed here.
 */
402 void updatePacket(
struct pkt *packet,
size_t payloadWireSize,
bool doChecksum =
true) {
// NOTE(review): this is a host-to-network conversion. htons() (as used for
// udp.len below) states that intent. The result is identical, since both are
// the same 16-bit swap or identity.
403 packet->ipv4.ip.ip_len += payloadWireSize;
404 packet->ipv4.ip.ip_len = ntohs(packet->ipv4.ip.ip_len);
406 packet->ipv4.ip.ip_sum = wrapsum(checksum(&packet->ipv4.ip,
sizeof(packet->ipv4.ip), 0));
409 packet->ipv4.udp.len += payloadWireSize;
410 packet->ipv4.udp.len = htons(packet->ipv4.udp.len);
412 auto udp = &packet->ipv4.udp;
413 packet->ipv4.udp.check = wrapsum(
414 checksum(udp,
sizeof(*udp),
415 checksum(packet->ipv4.body, payloadWireSize,
416 checksum(&packet->ipv4.ip.ip_src, 2 *
sizeof(packet->ipv4.ip.ip_src),
417 IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))));
422 #pragma GCC diagnostic pop T getExt(const path_type ¶m, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
Definition: Messages.hpp:185
class to hold an hmbdc configuration
Definition: Config.hpp:45
Definition: TypedString.hpp:84
Definition: Transport.hpp:15
Definition: BlockingBuffer.hpp:11
Definition: NmSendTransport.hpp:45
Definition: Message.hpp:263
Definition: Endpoint.hpp:16
Definition: LockFreeBufferMisc.hpp:89