1 #include "hmbdc/Copyright.hpp" 5 #define NETMAP_WITH_LIBS 6 #include <net/netmap_user.h> 7 #undef NETMAP_WITH_LIBS 9 #include "hmbdc/app/netmap/Messages.hpp" 10 #include "hmbdc/app/Base.hpp" 11 #include "hmbdc/comm/Topic.hpp" 12 #include "hmbdc/comm/eth/Misc.h" 13 #include "hmbdc/Compile.hpp" 14 #include "hmbdc/text/StringTrieSet.hpp" 15 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 16 #include "hmbdc/MetaUtils.hpp" 19 #include <boost/lexical_cast.hpp> 21 #include <type_traits> 24 #include <netinet/ether.h> 25 #include <linux/if_packet.h> 26 #include <sys/sysctl.h> 30 namespace hmbdc {
namespace app {
namespace netmap {
38 using ptr = std::shared_ptr<RecvTransport>;
69 template <
typename OutBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
72 ,
Client<RecvTransportEngine<OutBuffer, MsgArbitrator>> {
74 using MD = MessageDispacher<RecvTransportEngine, std::tuple<Subscribe, Unsubscribe>>;
89 , MsgArbitrator arb =
NoOpArb())
93 , config_.getExt<uint16_t>(
"cmdBufferSizePower2"))
94 , outBuffer_(outBuffer)
95 , maxMessageSize_(outBuffer.maxItemSize())
96 , doChecksum_(config_.getExt<bool>(
"doChecksum"))
98 , pollWaitTimeMillisec_(0)
100 config (hmbdcName_,
"hmbdcName")
101 (schedPolicy_,
"schedPolicy")
102 (schedPriority_,
"schedPriority")
105 busyWait_ = config_.
getExt<
bool>(
"busyWait");
106 if (!busyWait_) pollWaitTimeMillisec_ = config_.getExt<
int>(
"pollWaitTimeMillisec");
108 struct nmreq baseNmd;
109 bzero(&baseNmd,
sizeof(baseNmd));
110 baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
111 config_(baseNmd.nr_tx_slots,
"nmTxSlots");
112 config_(baseNmd.nr_rx_slots,
"nmRxSlots");
113 config_(baseNmd.nr_tx_rings,
"nmTxRings");
114 config_(baseNmd.nr_rx_rings,
"nmRxRings");
115 auto nmport = config_.getExt<std::string>(
"netmapPort");
116 uint32_t flags = config_.getExt<uint32_t>(
"nmOpenFlags");
117 nmd_ = nm_open(nmport.c_str(), &baseNmd, flags, NULL);
119 HMBDC_THROW(std::runtime_error,
"cannot open " << nmport);
123 memset(&req, 0,
sizeof(req));
124 bcopy(nmd_->req.nr_name, req.nr_name,
sizeof(req.nr_name));
125 req.nr_version = NETMAP_API;
126 req.nr_cmd = NETMAP_VNET_HDR_GET;
127 int err = ioctl(nmd_->fd, NIOCREGIF, &req);
129 HMBDC_THROW(std::runtime_error,
"Unable to get virtio-net header length");
131 virtHeader_ = req.nr_arg1;
135 pfd_.events = POLLIN;
136 sleep(config_.getExt<
int>(
"nmResetWaitSec"));
138 if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
139 HMBDC_THROW(std::runtime_error,
"IO error");
141 for (
int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
142 struct netmap_ring * rxring = NETMAP_TXRING(nmd_->nifp, i);
143 if (nm_ring_empty(rxring))
145 rxring->head = rxring->cur = rxring->tail;
149 void listenTo(
Topic const& t)
override {
153 void stopListenTo(
Topic const& t)
override {
167 if (runLock_.try_lock()) {
182 this->runLock_.unlock();
183 if (nmd_) nm_close(nmd_);
189 auto n = buffer_.peek(begin, end);
194 buffer_.wasteAfterPeek(begin, n);
197 for (
int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
198 struct netmap_ring * rxring = NETMAP_RXRING(nmd_->nifp, i);
199 if (nm_ring_empty(rxring))
206 void handleMessageCb(
Subscribe const& t) {
207 subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
211 subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
215 HMBDC_LOG_C(e.what());
218 char const* hmbdcName()
const {
219 return this->hmbdcName_.c_str();
222 std::tuple<char const*, int> schedSpec()
const {
223 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
227 template <
bool is_raw_arb>
228 typename std::enable_if<is_raw_arb, int>::type applyRawArb(
size_t len) {
229 return arb_(data_, len);
232 template <
bool is_raw_arb>
233 typename std::enable_if<!is_raw_arb, int>::type applyRawArb(
size_t) {
237 template <
bool is_raw_arb>
242 template <
bool is_raw_arb>
254 if (hmbdc_likely(busyWait_)) {
255 if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
256 HMBDC_THROW(std::runtime_error,
"IO error");
261 auto res = poll(&pfd_, 1, pollWaitTimeMillisec_);
262 if (hmbdc_unlikely( res < 0)) {
263 HMBDC_THROW(std::runtime_error,
"IO error errno=" << errno);
270 void recvPackets(
struct netmap_ring * HMBDC_RESTRICT ring) HMBDC_RESTRICT {
271 using namespace comm::eth;
272 auto cur = ring->cur;
274 while(cur != ring->tail) {
275 struct netmap_slot *slot = &ring->slot[cur];
276 auto *buf = (uint8_t*)NETMAP_BUF(ring, slot->buf_idx);
277 auto* p =
reinterpret_cast<pkt*
>(buf + virtHeader_ -
sizeof(virt_header));
279 if (hmbdc_likely(p->ipv4.ip.ip_p == IPPROTO_UDP
280 && p->ipv4.ip.ip_off == htons(IP_DF))) {
281 if (!data_) data_ = (uint8_t*)p->ipv4.body;
285 using arg0 =
typename traits::template arg<0>::type;
286 bool const is_raw_arb = std::is_same<
void*,
typename std::decay<arg0>::type>::value;
287 auto a = applyRawArb<is_raw_arb>(slot->len);
288 if (hmbdc_unlikely(a == 0)) {
290 ring->head = ring->cur = cur;
292 }
else if (hmbdc_likely(a > 0)) {
295 if (data_ + header->wireSize() <= buf + slot->len) {
296 if (subscriptions_.check(header->topic())) {
297 auto a = applyArb<is_raw_arb>(header);
298 if (hmbdc_likely(a > 0)) {
299 auto l = std::min(maxMessageSize_, (
size_t)header->messagePayloadLen());
300 outBuffer_.put(header->payload(), l);
301 }
else if (hmbdc_unlikely(a == 0)) {
302 ring->head = ring->cur = cur;
306 data_ += header->wireSize();
314 cur = nm_ring_next(ring, cur);
317 ring->head = ring->cur = cur;
321 bool verifyChecksum(
comm::eth::pkt* HMBDC_RESTRICT packet,
size_t payloadWireSize) {
322 using namespace comm::eth;
324 auto tmp = packet->ipv4.ip.ip_sum;
325 packet->ipv4.ip.ip_sum = 0;
327 checksum(&packet->ipv4.ip,
sizeof(packet->ipv4.ip), 0))) {
330 packet->ipv4.ip.ip_sum = tmp;
334 auto tmp = packet->ipv4.udp.check;
335 packet->ipv4.udp.check = 0;
336 #pragma GCC diagnostic push 338 #pragma GCC diagnostic ignored "-Waddress-of-packed-member" 340 auto udp = &packet->ipv4.udp;
342 checksum(udp,
sizeof(*udp),
343 checksum(packet->ipv4.body, payloadWireSize,
344 checksum(&packet->ipv4.ip.ip_src, 2 *
sizeof(packet->ipv4.ip.ip_src),
345 IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))))) {
348 packet->ipv4.udp.check = tmp;
350 #pragma GCC diagnostic pop 354 std::string hmbdcName_;
355 std::string schedPolicy_;
360 OutBuffer& HMBDC_RESTRICT outBuffer_;
361 size_t maxMessageSize_;
365 struct nm_desc *nmd_;
370 int pollWaitTimeMillisec_;
T getExt(const path_type ¶m, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:226
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: RecvTransportEngine.hpp:214
Definition: Messages.hpp:111
Definition: MonoLockFreeBuffer.hpp:15
power a netmap port receiving functions
Definition: RecvTransportEngine.hpp:37
class to hold an hmbdc configuration
Definition: Config.hpp:46
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the message recv
Definition: RecvTransportEngine.hpp:177
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
void listenTo(comm::Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:169
Definition: Message.hpp:78
Definition: Messages.hpp:118
impl class,
Definition: RecvTransportEngine.hpp:70
RecvTransportEngine(Config const &config, OutBuffer &outBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:88
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:112
void syncNetmap() HMBDC_RESTRICT
sync using busy wait or poll depending on config
Definition: RecvTransportEngine.hpp:253
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: RecvTransportEngine.hpp:187
void rotate()
if the user choose no to have a Context to manage and run the engine this method can be called from a...
Definition: RecvTransportEngine.hpp:166
a singleton that holding netmap resources
Definition: NetContext.hpp:38
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:50
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:57
void stopListenTo(comm::Topic const &t)
undo the subscription
Definition: NetContext.hpp:181
Definition: MetaUtils.hpp:9
Definition: LockFreeBufferMisc.hpp:89