1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/tips/rnetmap/Transport.hpp" 4 #include "hmbdc/app/Logger.hpp" 5 #include "hmbdc/comm/inet/Misc.hpp" 6 #include "hmbdc/MetaUtils.hpp" 8 #include <boost/bind.hpp> 9 #include <boost/lexical_cast.hpp> 12 #include <type_traits> 14 namespace hmbdc {
namespace tips {
namespace rnetmap {
16 namespace nmrecvtransport_detail {
30 template <
typename OutputBuffer,
typename Ep2SessionDict>
38 , Ep2SessionDict& sessionDict)
40 , doChecksum_(config_.getExt<
bool>(
"doChecksum"))
42 , pollWaitTimeMillisec_(0)
44 , cmdBuffer_(cmdBuffer)
46 , subscriptions_(subscriptions)
47 , sessionDict_(sessionDict) {
48 auto nmport = config_.getExt<std::string>(
"netmapPort");
49 busyWait_ = config_.getExt<
bool>(
"busyWait");
50 if (!busyWait_) pollWaitTimeMillisec_ = config_.getExt<
int>(
"pollWaitTimeMillisec");
53 bzero(&baseNmd,
sizeof(baseNmd));
54 baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
55 config_(baseNmd.nr_tx_slots,
"nmTxSlots");
56 config_(baseNmd.nr_rx_slots,
"nmRxSlots");
57 config_(baseNmd.nr_tx_rings,
"nmTxRings");
58 config_(baseNmd.nr_rx_rings,
"nmRxRings");
60 nmd_ = nm_open(nmport.c_str(), &baseNmd, config_.getExt<
int>(
"nmOpenFlags"), NULL);
62 HMBDC_THROW(std::runtime_error,
"cannot open " << nmport);
66 memset(&req, 0,
sizeof(req));
67 bcopy(nmd_->req.nr_name, req.nr_name,
sizeof(req.nr_name));
68 req.nr_version = NETMAP_API;
69 req.nr_cmd = NETMAP_VNET_HDR_GET;
70 int err = ioctl(nmd_->fd, NIOCREGIF, &req);
72 HMBDC_THROW(std::runtime_error,
"Unable to get virtio-net header length");
74 virtHeader_ = req.nr_arg1;
79 sleep(config_.getExt<
int>(
"nmResetWaitSec"));
81 if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
82 HMBDC_THROW(std::runtime_error,
"IO error");
84 for (
int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
85 struct netmap_ring * rxring = NETMAP_TXRING(nmd_->nifp, i);
86 if (nm_ring_empty(rxring))
88 rxring->head = rxring->cur = rxring->tail;
96 void runOnce() HMBDC_RESTRICT {
98 for (
int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
99 struct netmap_ring * rxring = NETMAP_RXRING(nmd_->nifp, i);
100 if (nm_ring_empty(rxring))
115 if (hmbdc_likely(busyWait_)) {
116 if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
117 HMBDC_THROW(std::runtime_error,
"IO error");
122 auto res = poll(&pfd_, 1, pollWaitTimeMillisec_);
123 if (hmbdc_unlikely( res < 0)) {
124 HMBDC_THROW(std::runtime_error,
"IO error errno=" << errno);
131 void recvPackets(
struct netmap_ring * HMBDC_RESTRICT ring) HMBDC_RESTRICT {
132 auto cur = ring->cur;
134 while(cur != ring->tail) {
135 struct netmap_slot *slot = &ring->slot[cur];
136 auto *buf = (uint8_t*)NETMAP_BUF(ring, slot->buf_idx);
137 auto* p =
reinterpret_cast<pkt*
>(buf + virtHeader_ -
sizeof(
virt_header));
139 if (p->ipv4.ip.ip_p == IPPROTO_UDP
140 && p->ipv4.ip.ip_off == htons(IP_DF)) {
142 data_ = (uint8_t*)p->ipv4.body;
143 #pragma GCC diagnostic push
144 #
if defined __clang__ || __GNUC_PREREQ(9,0)
145 #pragma GCC diagnostic ignored "-Waddress-of-packed-member" 147 auto ip = &p->ipv4.ip;
148 senderEndpoint_ = ip->ip_src.s_addr;
154 if (data_ + header->wireSize() <= buf + slot->len) {
155 if (hmbdc_unlikely(header->typeTag() == TypeTagBackupSource::typeTag)) {
156 auto it = cmdBuffer_.claim();
157 char* b =
static_cast<char*
>(*it);
158 size_t l = header->messagePayloadLen;
159 l = std::min(cmdBuffer_.maxItemSize(), l);
160 memcpy(b, header->payload(), l);
162 bts.sendFrom = senderEndpoint_;
163 cmdBuffer_.commit(it);
165 auto session = sessionDict_.find(senderEndpoint_);
166 if (hmbdc_unlikely(session != sessionDict_.end())) {
167 auto a = session->second->accept(header);
173 data_ += header->wireSize();
180 cur = nm_ring_next(ring, cur);
183 ring->head = ring->cur = cur;
187 bool verifyChecksum(
pkt* HMBDC_RESTRICT packet,
size_t payloadWireSize) {
189 auto tmp = packet->ipv4.ip.ip_sum;
190 packet->ipv4.ip.ip_sum = 0;
192 checksum(&packet->ipv4.ip,
sizeof(packet->ipv4.ip), 0))) {
195 packet->ipv4.ip.ip_sum = tmp;
199 auto tmp = packet->ipv4.udp.check;
200 packet->ipv4.udp.check = 0;
202 auto udp = &packet->ipv4.udp;
204 checksum(udp,
sizeof(*udp),
205 checksum(packet->ipv4.body, payloadWireSize,
206 checksum(&packet->ipv4.ip.ip_src, 2 *
sizeof(packet->ipv4.ip.ip_src),
207 IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))))) {
210 #pragma GCC diagnostic pop 211 packet->ipv4.udp.check = tmp;
216 struct nm_desc *nmd_;
221 int pollWaitTimeMillisec_;
226 uint32_t senderEndpoint_;
229 TypeTagSet
const& subscriptions_;
230 Ep2SessionDict& sessionDict_;
234 template <
typename OutputBuffer,
typename Ep2SessionDict>
235 using NmRecvTransport = nmrecvtransport_detail::NmRecvTransport<OutputBuffer, Ep2SessionDict>;
Definition: MonoLockFreeBuffer.hpp:16
class to hold an hmbdc configuration
Definition: Config.hpp:45
Definition: TypedString.hpp:84
impl class
Definition: NmRecvTransport.hpp:31
Definition: Transport.hpp:15
void syncNetmap() HMBDC_RESTRICT
sync using busy wait or poll depending on config
Definition: NmRecvTransport.hpp:114
Definition: Message.hpp:263
Definition: Endpoint.hpp:16