hmbdc
simplify-high-performance-messaging-programming
AttBufferAdaptor.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Message.hpp"
4 
5 #include <memory.h>
6 
7 namespace hmbdc { namespace tips { namespace reliable {
8 template <typename Buffer, typename TransportMessageHeader, typename AttachmentAllocator>
10  private:
11  Buffer& buffer_;
12  std::unique_ptr<uint8_t[]> hasMemoryAttachmentMessageWrapSpace_;
13  app::MessageWrap<app::hasMemoryAttachment>* hasMemoryAttachmentMessageWrap_;
14  app::hasMemoryAttachment* hasMemoryAttachment_ = nullptr;
15  size_t attTrainCount_ = 0;
16  AttachmentAllocator alloc_;
17 
18  void abortAtt() {
19  hasMemoryAttachment_->release();
20  hasMemoryAttachment_ = nullptr;
21  }
22 
23  public:
24  AttBufferAdaptor(Buffer& buffer)
25  : buffer_(buffer)
26  , hasMemoryAttachmentMessageWrapSpace_{new uint8_t[maxItemSize()] {0}}
27  , hasMemoryAttachmentMessageWrap_((app::MessageWrap<app::hasMemoryAttachment>*)
28  hasMemoryAttachmentMessageWrapSpace_.get())
29  {}
30 
31  size_t maxItemSize() const {
32  return buffer_.maxItemSize();
33  }
34  void put(TransportMessageHeader* h) {
35  // HMBDC_LOG_D(h->getSeq(), '-', comment, port, h->messagePayloadLen, ':', h->typeTag(), '=', std::this_thread::get_id());
36  using namespace app;
37  if (hmbdc_unlikely(h->typeTag() == StartMemorySegTrain::typeTag)) {
38  if (hmbdc_likely(!hasMemoryAttachment_)) {
39  hasMemoryAttachment_ = &hasMemoryAttachmentMessageWrap_->get<hasMemoryAttachment>();
40  *hasMemoryAttachment_ =
41  h->template wrapped<StartMemorySegTrain>().att;
42  attTrainCount_ = h->template wrapped<StartMemorySegTrain>().segCount;
43  alloc_(h->typeTag(), hasMemoryAttachment_);
44  // hasMemoryAttachment_->attachment = malloc(hasMemoryAttachment_->len);
45  hasMemoryAttachment_->len = 0;
46  // hasMemoryAttachment_->afterConsumedCleanupFunc = app::hasMemoryAttachment::free;
47  }
48  } else if (hmbdc_unlikely(hasMemoryAttachment_
49  && h->typeTag() == MemorySeg::typeTag)) {
50  if (attTrainCount_) {
51  auto seg = &h->template wrapped<MemorySeg>(); //all attachment seg
52  auto segLen = h->messagePayloadLen - (uint16_t)sizeof(MessageHead);
53  memcpy((char*)hasMemoryAttachment_->attachment + hasMemoryAttachment_->len, seg, segLen);
54  hasMemoryAttachment_->len += segLen;
55  attTrainCount_--;
56  } else {
57  abortAtt();
58  }
59  } else if (hmbdc_unlikely(hasMemoryAttachment_)) {
60  if (hmbdc_likely(!attTrainCount_)
61  && h->flag == app::hasMemoryAttachment::flag) {
62  auto l = std::min<size_t>(buffer_.maxItemSize(), h->messagePayloadLen);
63  auto hsm = *hasMemoryAttachment_;
64  memcpy(hasMemoryAttachmentMessageWrapSpace_.get(), h->payload(), l);
65  *hasMemoryAttachment_ = hsm;
66  buffer_.putAtt(hasMemoryAttachmentMessageWrap_, buffer_.maxItemSize());
67  hasMemoryAttachment_ = nullptr;
68  } else {
69  abortAtt();
70  }
71  } else {
72  buffer_.put(h->payload(), h->messagePayloadLen);
73  }
74  }
75 
76  // template <typename T> void put(T& item) {buffer_.put(&item, sizeof(T));}
77  template <typename T> void putSome(T& item) {
78  buffer_.put(&item, std::min(sizeof(item), maxItemSize()));
79  }
80 };
81 }}}
Definition: AttBufferAdaptor.hpp:9
Definition: Message.hpp:263
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:125
Definition: Base.hpp:12