1 #include "hmbdc/Copyright.hpp" 5 #define NETMAP_WITH_LIBS 6 #include <net/netmap_user.h> 7 #undef NETMAP_WITH_LIBS 9 #include "hmbdc/tips/netmap/Messages.hpp" 10 #include "hmbdc/tips/netmap/DefaultUserConfig.hpp" 11 #include "hmbdc/tips/TypeTagSet.hpp" 12 #include "hmbdc/app/Base.hpp" 13 #include "hmbdc/comm/eth/Misc.h" 14 #include "hmbdc/Compile.hpp" 15 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 16 #include "hmbdc/MetaUtils.hpp" 19 #include <boost/lexical_cast.hpp> 21 #include <type_traits> 24 #include <netinet/ether.h> 25 #include <linux/if_packet.h> 26 #include <sys/sysctl.h> 30 namespace hmbdc {
namespace tips {
namespace netmap {
// NOTE(review): this view omits several original lines (the class head, the
// constructor signature and some guard conditions among them), so the
// comments below describe only the statements that are visible.
31 template <
typename OutBuffer>
// Member initializers: keep a reference to the output buffer and cache its
// maxItemSize() as maxMessageSize_.
38 , outBuffer_(outBuffer)
39 , maxMessageSize_(outBuffer.maxItemSize())
// "doChecksum" is read from the config as a bool.
40 , doChecksum_(config_.getExt<
bool>(
"doChecksum"))
// The poll timeout starts at 0 and is only overwritten below when busyWait
// is disabled.
42 , pollWaitTimeMillisec_(0) {
// Chained config_(member, "key") calls — presumably each fills the member
// from the named config entry; confirm against Config::operator().
43 config_(hmbdcName_,
"hmbdcName")
44 (schedPolicy_,
"schedPolicy")
45 (schedPriority_,
"schedPriority")
// Busy-wait vs. poll mode; the poll timeout is read only in poll mode.
48 busyWait_ = config_.
getExt<
bool>(
"busyWait");
49 if (!busyWait_) pollWaitTimeMillisec_ = config_.
getExt<
int>(
"pollWaitTimeMillisec");
// Zero the base descriptor (its declaration is not visible here), set the
// NR_ACCEPT_VNET_HDR flag, then take the slot/ring counts from the config.
52 bzero(&baseNmd,
sizeof(baseNmd));
53 baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
54 config_(baseNmd.nr_tx_slots,
"nmTxSlots");
55 config_(baseNmd.nr_rx_slots,
"nmRxSlots");
56 config_(baseNmd.nr_tx_rings,
"nmTxRings");
57 config_(baseNmd.nr_rx_rings,
"nmRxRings");
// Open the port named by "netmapPort" with the "nmOpenFlags" flags.
58 auto nmport = config_.
getExt<std::string>(
"netmapPort");
59 uint32_t flags = config_.
getExt<uint32_t>(
"nmOpenFlags");
60 nmd_ = nm_open(nmport.c_str(), &baseNmd, flags, NULL);
// Throws std::runtime_error; the guarding condition is not visible in this
// view (presumably a failed nm_open — confirm).
62 HMBDC_THROW(std::runtime_error,
"cannot open " << nmport);
// Build a request (declaration of req not visible here) carrying the opened
// port's name and issue NETMAP_VNET_HDR_GET through the NIOCREGIF ioctl.
66 memset(&req, 0,
sizeof(req));
67 bcopy(nmd_->req.nr_name, req.nr_name,
sizeof(req.nr_name));
68 req.nr_version = NETMAP_API;
69 req.nr_cmd = NETMAP_VNET_HDR_GET;
70 int err = ioctl(nmd_->fd, NIOCREGIF, &req);
// Throws std::runtime_error; the check on err is not visible in this view.
72 HMBDC_THROW(std::runtime_error,
"Unable to get virtio-net header length");
// nr_arg1 of the reply is stored as the virtio-net header length.
74 virtHeader_ = req.nr_arg1;
// Sleep for "nmResetWaitSec" seconds before the first RX sync —
// presumably to let the port settle after opening; TODO confirm.
79 sleep(config_.
getExt<
int>(
"nmResetWaitSec"));
// Issue an RX sync on the descriptor; a negative ioctl result is reported as
// std::runtime_error("IO error").
81 if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
82 HMBDC_THROW(std::runtime_error,
"IO error");
// Step over every RX ring in [first_rx_ring, last_rx_ring]; the visible
// statements move head and cur up to tail.
// NOTE(review): the statement guarded by the nm_ring_empty() check and the
// loop's closing brace are not visible in this view — confirm against the
// full file.
84 for (
int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
// The index is an RX ring index, so the ring must be fetched with
// NETMAP_RXRING; NETMAP_TXRING would hand back a TX ring and the head/cur
// update below would be applied to the wrong ring.
85 struct netmap_ring * rxring = NETMAP_RXRING(nmd_->nifp, i);
86 if (nm_ring_empty(rxring))
88 rxring->head = rxring->cur = rxring->tail;
// NOTE(review): the lines below are fragments of separate member functions
// whose signatures are not visible in this view.

// Close the netmap descriptor if one is held.
98 if (nmd_) nm_close(nmd_);
// Walk every RX ring of the descriptor; what follows the empty-ring check is
// not visible here.
103 for (
int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
104 struct netmap_ring * rxring = NETMAP_RXRING(nmd_->nifp, i);
105 if (nm_ring_empty(rxring))
// Log the message of the exception `e`.
113 HMBDC_LOG_C(e.what());
/// Returns hmbdcName_ as a C string; the pointer refers to the member's own
/// storage.
116 char const* hmbdcName()
const {
117 return this->hmbdcName_.c_str();
/// Returns the scheduling settings as a (policy, priority) tuple built from
/// schedPolicy_.c_str() and schedPriority_. The policy pointer refers to the
/// member string's storage.
120 std::tuple<char const*, int> schedSpec()
const {
121 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
/// Registers subscriptions for the message tuple `Messages` on behalf of
/// `node` by forwarding to subscriptions_.addSubsFor<Messages>(node, mod, res).
/// NOTE(review): `mod` / `res` are passed through untouched; presumably a
/// modulus/remainder pair used to partition type tags — confirm against
/// TypeTagSet::addSubsFor.
124 template <MessageTupleC Messages,
typename CcNode>
125 void subscribeFor(CcNode
const& node, uint16_t mod, uint16_t res) {
126 subscriptions_.addSubsFor<Messages>(node, mod, res);
// Adds Message::typeTag to the subscription set.
// NOTE(review): the function's signature line is not visible in this view.
129 template <MessageC Message>
131 subscriptions_.add(Message::typeTag);
// Busy-wait mode: issue the NIOCRXSYNC ioctl directly; a negative result is
// reported as std::runtime_error("IO error").
143 if (hmbdc_likely(busyWait_)) {
144 if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
145 HMBDC_THROW(std::runtime_error,
"IO error");
// Otherwise (the branch line itself is not visible in this view) wait on
// pfd_ with poll() for up to pollWaitTimeMillisec_; a negative result is
// reported with errno in the message.
150 auto res = poll(&pfd_, 1, pollWaitTimeMillisec_);
151 if (hmbdc_unlikely( res < 0)) {
152 HMBDC_THROW(std::runtime_error,
"IO error errno=" << errno);
/// Consumes the slots of `ring` from ring->cur up to ring->tail and copies
/// the payload of subscribed messages into outBuffer_.
/// NOTE(review): several original lines (including the declaration of
/// `header` and the closing braces) are not visible in this view.
159 void recvPackets(
struct netmap_ring * HMBDC_RESTRICT ring) HMBDC_RESTRICT {
160 using namespace comm::eth;
161 auto cur = ring->cur;
// Process slots until the cursor reaches the ring's tail.
163 while(cur != ring->tail) {
164 struct netmap_slot *slot = &ring->slot[cur];
165 auto *buf = (uint8_t*)NETMAP_BUF(ring, slot->buf_idx);
// View the buffer as a pkt, shifted by (virtHeader_ - sizeof(virt_header)).
// Presumably pkt begins with a virt_header, so this lines the rest of the
// struct up with the frame when the actual header length differs — confirm.
166 auto* p =
reinterpret_cast<pkt*
>(buf + virtHeader_ -
sizeof(virt_header));
// Accept only UDP packets whose ip_off field equals exactly IP_DF
// (network byte order).
168 if (hmbdc_likely(p->ipv4.ip.ip_p == IPPROTO_UDP
169 && p->ipv4.ip.ip_off == htons(IP_DF))) {
// Start reading at the packet body unless data_ is already set.
170 if (!data_) data_ = (uint8_t*)p->ipv4.body;
// Only handle a message that lies entirely within the slot's received length.
173 if (data_ + header->wireSize() <= buf + slot->len) {
174 if (subscriptions_.check(header->typeTag())) {
// Copy at most maxMessageSize_ bytes of the payload.
175 auto l = std::min(maxMessageSize_, (
size_t)header->messagePayloadLen());
176 outBuffer_.put(header->payload(), l);
// Advance to the next message in the packet.
178 data_ += header->wireSize();
185 cur = nm_ring_next(ring, cur);
// Publish the new head/cur to the ring.
188 ring->head = ring->cur = cur;
/// Recomputes the IPv4 header checksum and the UDP checksum of `packet`.
/// Each stored checksum field is saved, zeroed for the computation and
/// restored afterwards, so the packet is written to but ends up unchanged in
/// the visible code.
/// NOTE(review): the comparison and return lines are not visible in this
/// view; presumably the result says whether the recomputed values match the
/// stored ones — confirm.
/// `payloadWireSize` is the byte count of packet->ipv4.body fed into the UDP
/// checksum.
192 bool verifyChecksum(comm::eth::pkt* HMBDC_RESTRICT packet,
size_t payloadWireSize) {
193 using namespace comm::eth;
// IP header checksum: computed over the header with ip_sum cleared.
195 auto tmp = packet->ipv4.ip.ip_sum;
196 packet->ipv4.ip.ip_sum = 0;
198 checksum(&packet->ipv4.ip,
sizeof(packet->ipv4.ip), 0))) {
201 packet->ipv4.ip.ip_sum = tmp;
// UDP checksum: computed with the check field cleared.
205 auto tmp = packet->ipv4.udp.check;
206 packet->ipv4.udp.check = 0;
// The diagnostic pragmas silence -Waddress-of-packed-member for taking the
// address of the udp member.
207 #pragma GCC diagnostic push 209 #pragma GCC diagnostic ignored "-Waddress-of-packed-member" 211 auto udp = &packet->ipv4.udp;
// Nested checksum() calls chain the running sum, innermost first:
// IPPROTO_UDP + UDP length, then 2 * sizeof(ip_src) bytes starting at ip_src
// (presumably source and destination addresses, assuming ip_dst directly
// follows ip_src), then the body, then the UDP header.
213 checksum(udp,
sizeof(*udp),
214 checksum(packet->ipv4.body, payloadWireSize,
215 checksum(&packet->ipv4.ip.ip_src, 2 *
sizeof(packet->ipv4.ip.ip_src),
216 IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))))) {
219 packet->ipv4.udp.check = tmp;
// Data members. NOTE(review): some declarations are not visible in this view.
// hmbdcName_ and schedPolicy_ are bound to the "hmbdcName" / "schedPolicy"
// config keys.
221 #pragma GCC diagnostic pop 225 std::string hmbdcName_;
226 std::string schedPolicy_;
// Destination buffer for received payloads.
230 OutBuffer& HMBDC_RESTRICT outBuffer_;
// Upper bound on the bytes copied per message; set from outBuffer.maxItemSize().
231 size_t maxMessageSize_;
// Type tags checked before a message is put into outBuffer_.
232 TypeTagSet subscriptions_;
// Descriptor returned by nm_open().
235 struct nm_desc *nmd_;
// Timeout passed to poll() when busy-wait is disabled.
240 int pollWaitTimeMillisec_;
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
class to hold an hmbdc configuration
Definition: Config.hpp:45
void setAdditionalFallbackConfig(Config const &c)
set additional defaults
Definition: Config.hpp:154
Definition: RecvTransportEngine.hpp:32
Config & resetSection(char const *section, bool sectionExists=true)
change section name
Definition: Config.hpp:177
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: RecvTransportEngine.hpp:112
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: RecvTransportEngine.hpp:101
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object can participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:122
void syncNetmap() HMBDC_RESTRICT
sync using busy wait or poll depending on config
Definition: RecvTransportEngine.hpp:142