Commits

Anonymous committed 75ab412

lang/c/msgpack: C++ binding: implemented built-in buffer.

git-svn-id: file:///Users/frsyuki/project/msgpack-git/svn/x@55 5a5092ae-2292-43ba-b2d5-dcab9c1a2731

Comments (0)

Files changed (9)

 
-CXXFLAGS = -I.. -I. -Wall -g -O4
+CXXFLAGS = -I.. -I. -Wall -g
+#CXXFLAGS = -I.. -I. -Wall -g -O4
 LDFLAGS = -L.
 
 NEED_PREPROCESS = zone.hpp
 	{ (s << '"').write((const char*)ptr, len) << '"'; return this; }  // FIXME escape
 
 
-RAW_OBJECT(raw,
-	raw object_raw::xraw() { return raw(ptr, len); }
-	const_raw object_raw::xraw() const { return const_raw(ptr, len); } )
+RAW_OBJECT(raw_ref,
+	raw object_raw_ref::xraw() { return raw(ptr, len); }
+	const_raw object_raw_ref::xraw() const { return const_raw(ptr, len); } )
 
-RAW_OBJECT(const_raw,
-	const_raw object_const_raw::xraw() const { return const_raw(ptr, len); } )
+RAW_OBJECT(const_raw_ref,
+	const_raw object_const_raw_ref::xraw() const { return const_raw(ptr, len); } )
 
 #undef RAW_OBJECT(NAME, EXTRA)
 
 };
 
 struct const_raw {
-	const_raw() : ptr(NULL), len(0) {}
-	const_raw(const void* p, size_t l) : ptr(p), len(l) {}
+	explicit const_raw() : ptr(NULL), len(0) {}
+	explicit const_raw(const void* p, size_t l) : ptr(p), len(l) {}
+	const_raw(const raw& m) : ptr(m.ptr), len(m.len) {}
 public:
 	const void* ptr;
 	size_t len;
 	uint32_t len;														\
 };
 
-RAW_CLASS(raw, void*, raw xraw(); const_raw xraw() const; )
-RAW_CLASS(const_raw, const void*, const_raw xraw() const; )
+RAW_CLASS(raw_ref, void*, raw xraw(); const_raw xraw() const; )
+RAW_CLASS(const_raw_ref, const void*, const_raw xraw() const; )
 
 #undef RAW_CLASS(NAME, TYPE, EXTRA)
 
 #include <string>
 #include <msgpack/unpack.hpp>
 #include <msgpack/pack.hpp>
+#include <sstream>
+#include <boost/scoped_ptr.hpp>
 
 class checker {
 public:
 		};
 		c.check(d, sizeof(d),
 			z.narray(
-				z.nraw("", 0),
-				z.nraw("a", 1),
-				z.nraw("bc", 2),
-				z.nraw("def", 3)
+				z.nraw_ref("", 0),
+				z.nraw_ref("a", 1),
+				z.nraw_ref("bc", 2),
+				z.nraw_ref("def", 3)
 			)
 		);
 	}
 
+	static const uint16_t TASK_ARRAY = 100;
+	static char tarray[3];
+	static char traw[64];
+
+	{
+		memset(traw, 'a', sizeof(traw));
+		traw[0] = 0xda;
+		uint16_t n = htons(sizeof(traw)-3);
+		traw[1] = ((char*)&n)[0];
+		traw[2] = ((char*)&n)[1];
+
+		msgpack::zone z;
+		std::cout << msgpack::unpack(traw, sizeof(traw), z) << std::endl;;
+	}
+
+	{
+		tarray[0] = 0xdc;
+		uint16_t n = htons(TASK_ARRAY);
+		tarray[1] = ((char*)&n)[0];
+		tarray[2] = ((char*)&n)[1];
+	}
+
+	{
+		// write message
+		ssize_t total_bytes = 0;
+		std::stringstream stream;
+		for(unsigned q=0; q < 10; ++q) {
+			stream.write(tarray, sizeof(tarray));
+			total_bytes += sizeof(tarray);
+			for(uint16_t i=0; i < TASK_ARRAY; ++i) {
+				stream.write(traw, sizeof(traw));
+				total_bytes += sizeof(traw);
+			}
+		}
+
+		stream.seekg(0);
+
+		// receive message
+		unsigned num_msg = 0;
+
+		static const size_t RESERVE_SIZE = 32;//*1024;
+
+		msgpack::unpacker upk;
+		while(stream.good() && total_bytes > 0) {
+
+			upk.reserve_buffer(RESERVE_SIZE);
+			size_t sz = stream.readsome(
+					(char*)upk.buffer(),
+					upk.buffer_capacity());
+
+			total_bytes -= sz;
+			std::cout << "read " << sz << " bytes to capacity "
+					<< upk.buffer_capacity() << " bytes"
+					<< std::endl;
+
+			upk.buffer_consumed(sz);
+			while( upk.execute() ) {
+				std::cout << "message parsed" << std::endl;
+				boost::scoped_ptr<msgpack::zone> pz(upk.release_zone());
+				msgpack::object o = upk.data();
+				upk.reset();
+				std::cout << o << std::endl;
+				++num_msg;
+			}
+
+		}
+
+		std::cout << "stream finished" << std::endl;
+		std::cout << num_msg << " messages reached" << std::endl;
+	}
+
 	return 0;
 }
 
 namespace msgpack {
 
 struct unpacker::context {
-	context(zone& z)
+	context(zone* z)
 	{
 		msgpack_unpacker_init(&m_ctx);
-		m_ctx.user = &z;
+		m_ctx.user = z;
 	}
 
 	~context() { }
 		m_ctx.user = z;
 	}
 
+	void reset(zone* z)
+	{
+		msgpack_unpacker_init(&m_ctx);
+		m_ctx.user = z;
+	}
+
+	zone* user()
+	{
+		return m_ctx.user;
+	}
+
+	void user(zone* z)
+	{
+		m_ctx.user = z;
+	}
+
 private:
 	msgpack_unpacker m_ctx;
 
 };
 
 
-unpacker::unpacker(zone& z) :
-	m_ctx(new context(z)),
-	m_zone(z),
-	m_finished(false)
+unpacker::unpacker() :
+	m_zone(new zone()),
+	m_ctx(new context(m_zone)),
+	m_buffer(NULL),
+	m_used(0),
+	m_free(0),
+	m_off(0)
 { }
 
 
-unpacker::~unpacker() { delete m_ctx; }
+unpacker::~unpacker()
+{
+	free(m_buffer);
+	delete m_ctx;
+	delete m_zone;
+}
 
 
-size_t unpacker::execute(const void* data, size_t len, size_t off)
+// Grow or compact the internal receive buffer so that at least `len` more
+// bytes fit after the `m_used` bytes already stored.
+//
+// Two cases, selected by m_off (the offset handed to the parser context):
+//   * m_off == 0: a larger buffer is allocated, the m_used stored bytes are
+//     copied over and the old buffer is freed immediately.
+//   * m_off != 0: only the bytes from m_off onward are copied into a new
+//     buffer; the old buffer is not freed here but handed to m_zone as a
+//     finalizer (presumably because objects created by nraw_ref may still
+//     point into it -- confirm against the unpack callbacks).
+//
+// Throws std::bad_alloc when malloc fails; rethrows whatever push_finalizer
+// throws after releasing the freshly allocated buffer.
+void unpacker::expand_buffer(size_t len)
 {
-	int ret = m_ctx->execute(data, len, &off);
-	if(ret < 0) {
-		throw unpack_error("parse error");
-	} else if(ret > 0) {
-		m_finished = true;
-		return off;
+	if(m_off == 0) {
+		size_t next_size;
+		if(m_free != 0) { next_size = m_free * 2; }
+		else { next_size = MSGPACK_UNPACKER_INITIAL_BUFFER_SIZE; }
+		while(next_size < len + m_used) { next_size *= 2; }
+
+		// FIXME realloc?
+
+		void* tmp = malloc(next_size);
+		if(!tmp) { throw std::bad_alloc(); }
+		// m_buffer is still NULL on the very first growth; passing a NULL
+		// source to memcpy is undefined even for a zero-byte copy, so skip
+		// the call when there is nothing to carry over.
+		if(m_used > 0) { memcpy(tmp, m_buffer, m_used); }
+
+		free(m_buffer);
+		m_buffer = tmp;
+		m_free = next_size - m_used;
+
 	} else {
-		m_finished = false;
-		return off;
+		size_t next_size = MSGPACK_UNPACKER_INITIAL_BUFFER_SIZE;
+		while(next_size < len + m_used - m_off) { next_size *= 2; }
+
+		void* tmp = malloc(next_size);
+		if(!tmp) { throw std::bad_alloc(); }
+		memcpy(tmp, ((char*)m_buffer)+m_off, m_used-m_off);
+
+		// Transfer ownership of the old buffer to the zone; if registering
+		// the finalizer fails, the unpacker keeps the old buffer and the new
+		// one is released so nothing leaks.
+		try {
+			m_zone->push_finalizer<void>(&zone::finalize_free, NULL, m_buffer);
+		} catch (...) {
+			free(tmp);
+			throw;
+		}
+
+		m_buffer = tmp;
+		m_used = m_used - m_off;
+		m_free = next_size - m_used;
+		m_off = 0;
 	}
 }
 
+// Feed the buffered bytes m_buffer[0 .. m_used) to the parser context,
+// starting at m_off (which the context advances).
+// Returns true for a positive status from the context and false for zero;
+// a negative status is reported as unpack_error("parse error").
+bool unpacker::execute()
+{
+	const int status = m_ctx->execute(m_buffer, m_used, &m_off);
+	if(status < 0) {
+		throw unpack_error("parse error");
+	}
+	return status != 0;
+}
+
+// Detach the zone currently used by this unpacker and return it; the
+// unpacker no longer references the returned zone, so the caller is
+// responsible for deleting it. A fresh zone is installed in its place and
+// the parser context is re-pointed at it.
+//
+// The replacement is allocated before any member is touched: if `new`
+// throws, the unpacker still owns its original zone and nothing is leaked.
+zone* unpacker::release_zone()
+{
+	zone* fresh = new zone();
+	zone* released = m_zone;
+	m_zone = fresh;
+	m_ctx->user(m_zone);
+	return released;
+}
 
 object unpacker::data()
 {
 	return object(m_ctx->data());
 }
 
-
 void unpacker::reset()
 {
+	// Prepare the unpacker for the next message: drop a zone that was used
+	// but never released, compact the buffer, then reset the parser context.
+	if(!m_zone->empty()) {
+		// Allocate the replacement before deleting the old zone so that a
+		// failing `new` leaves the unpacker with a valid zone.
+		zone* fresh = new zone();
+		delete m_zone;
+		m_zone = fresh;
+		// The parser context holds its own zone pointer; re-point it, as
+		// release_zone() does, so it never refers to the deleted zone.
+		m_ctx->user(m_zone);
+	}
+	expand_buffer(0);
 	m_ctx->reset();
 }
 
 
 object unpacker::unpack(const void* data, size_t len, zone& z)
 {
-	context ctx(z);
+	context ctx(&z);
 	size_t off = 0;
 	int ret = ctx.execute(data, len, &off);
 	if(ret < 0) {
 #include "msgpack/zone.hpp"
 #include <stdexcept>
 
+#ifndef MSGPACK_UNPACKER_INITIAL_BUFFER_SIZE
+// Default size in bytes of the unpacker's first buffer allocation; may be
+// overridden by defining the macro before this header is included.
+// Parenthesized so the expansion is a single operand in any expression
+// (e.g. `x % MSGPACK_UNPACKER_INITIAL_BUFFER_SIZE`).
+#define MSGPACK_UNPACKER_INITIAL_BUFFER_SIZE (8*1024)
+#endif
+
 namespace msgpack {
 
 
 
 class unpacker {
 public:
-	unpacker(zone& z);
+	unpacker();
 	~unpacker();
+
 public:
-	size_t execute(const void* data, size_t len, size_t off);
-	bool is_finished() { return m_finished; }
+	void reserve_buffer(size_t len);
+	void* buffer();
+	size_t buffer_capacity() const;
+	void buffer_consumed(size_t len);
+	bool execute();
+	zone* release_zone();  // never throw
 	object data();
 	void reset();
+
 private:
+	zone* m_zone;
+
 	struct context;
 	context* m_ctx;
-	zone& m_zone;
-	bool m_finished;
+
+	void* m_buffer;
+	size_t m_used;
+	size_t m_free;
+	size_t m_off;
+	void expand_buffer(size_t len);
+
 private:
-	unpacker();
 	unpacker(const unpacker&);
+
 public:
 	static object unpack(const void* data, size_t len, zone& z);
 };
 
 
+// Make sure at least `len` bytes of free space are available in the
+// internal buffer, growing it through expand_buffer() only when the
+// current free space is insufficient.
+inline void unpacker::reserve_buffer(size_t len)
+{
+	if(m_free < len) {
+		expand_buffer(len);
+	}
+}
+
+// Address of the first unused byte of the internal buffer, i.e. m_buffer
+// advanced by the m_used bytes already stored.
+inline void* unpacker::buffer()
+{
+	char* const base = (char*)m_buffer;
+	return (void*)(base + m_used);
+}
+
+// Number of free bytes currently tracked for the internal buffer (m_free).
+inline size_t unpacker::buffer_capacity() const
+{
+	return m_free;
+}
+
+// Record that `len` bytes were written at buffer(): the free space shrinks
+// and the used size grows by the same amount. No bounds check is made;
+// the caller must not pass more than buffer_capacity().
+inline void unpacker::buffer_consumed(size_t len)
+{
+	m_free -= len;
+	m_used += len;
+}
+
+
 inline object unpack(const void* data, size_t len, zone& z)
 {
 	return unpacker::unpack(data, len, z);

cpp/unpack_inline.cpp

 static inline void msgpack_unpack_map_item(zone** z, object_class* c, object_class* k, object_class* v)
 { reinterpret_cast<object_map*>(c)->store(k, v); }
 
-static inline object_class* msgpack_unpack_string(zone** z, const void* b, size_t l)
-{ return (*z)->nraw(b, l); }
-
-static inline object_class* msgpack_unpack_raw(zone** z, const void* b, size_t l)
-{ return (*z)->nraw(b, l); }
+static inline object_class* msgpack_unpack_raw(zone** z, const void* b, const void* p, size_t l)
+{ return (*z)->nraw_ref(p, l); }
 
 
 }  // extern "C"
 void* zone::alloc()
 {
 	if(m_used >= m_pool.size()*MSGPACK_ZONE_CHUNK_SIZE) {
-		m_pool.push_back(chunk_t());
+		m_pool.push_back(new chunk_t());
 	}
-	void* data = m_pool[m_used/MSGPACK_ZONE_CHUNK_SIZE].cells[m_used%MSGPACK_ZONE_CHUNK_SIZE].data;
+	void* data = m_pool[m_used/MSGPACK_ZONE_CHUNK_SIZE]->cells[m_used%MSGPACK_ZONE_CHUNK_SIZE].data;
 	++m_used;
 	return data;
 }
 
 void zone::clear()
 {
-	for(size_t b=0; b < m_used/MSGPACK_ZONE_CHUNK_SIZE; ++b) {
-		cell_t* c(m_pool[b].cells);
-		for(size_t e=0; e < MSGPACK_ZONE_CHUNK_SIZE; ++e) {
+	if(!m_pool.empty()) {
+		for(size_t b=0; b < m_used/MSGPACK_ZONE_CHUNK_SIZE; ++b) {
+			cell_t* c(m_pool[b]->cells);
+			for(size_t e=0; e < MSGPACK_ZONE_CHUNK_SIZE; ++e) {
+				reinterpret_cast<object_class*>(c[e].data)->~object_class();
+			}
+		}
+		cell_t* c(m_pool.back()->cells);
+		for(size_t e=0; e < m_used%MSGPACK_ZONE_CHUNK_SIZE; ++e) {
 			reinterpret_cast<object_class*>(c[e].data)->~object_class();
 		}
-	}
-	cell_t* c(m_pool.back().cells);
-	for(size_t e=0; e < m_used%MSGPACK_ZONE_CHUNK_SIZE; ++e) {
-		reinterpret_cast<object_class*>(c[e].data)->~object_class();
+
+		for(pool_t::iterator it(m_pool.begin()), it_end(m_pool.end());
+				it != it_end;
+				++it) {
+			delete *it;
+		}
+		m_pool.clear();
 	}
 	m_used = 0;
-	m_pool.resize(1);
+
 	for(user_finalizer_t::reverse_iterator it(m_user_finalizer.rbegin()), it_end(m_user_finalizer.rend());
 			it != it_end;
 			++it) {
 #ifndef MSGPACK_ZONE_HPP__
 #define MSGPACK_ZONE_HPP__
+#include <iostream>
 
 #include "msgpack/object.hpp"
-#include <iostream>
+#include <string.h>
+#include <stdlib.h>
+#include <stdexcept>
 
 #ifndef MSGPACK_ZONE_CHUNK_SIZE
 #define MSGPACK_ZONE_CHUNK_SIZE 64
 
 class zone {
 public:
-zone() : m_used(0), m_pool(1) { }
-~zone() { clear(); }
+	zone() : m_used(0) { }
+	~zone() { clear(); }
 
 public:
 	template <typename T>
 	 object_float*  nfloat(   float v) { return new (alloc())  object_float(v); }
 	object_double* ndouble(  double v) { return new (alloc()) object_double(v); }
 
-	object_raw* nraw(void* ptr, uint32_t len)
-		{ return new (alloc()) object_raw(ptr, len); }
+	object_raw_ref* nraw_ref(void* ptr, uint32_t len)
+		{ return new (alloc()) object_raw_ref(ptr, len); }
 
-	object_const_raw* nraw(const void* ptr, uint32_t len)
-		{ return new (alloc()) object_const_raw(ptr, len); }
+	object_const_raw_ref* nraw_ref(const void* ptr, uint32_t len)
+		{ return new (alloc()) object_const_raw_ref(ptr, len); }
+
+	// Create a raw object backed by a private malloc'ed copy of the `len`
+	// bytes at `ptr`. The copy is registered with finalize_free so it is
+	// released by the zone's finalizers. Throws std::bad_alloc when the
+	// copy cannot be allocated and rethrows any failure from alloc() or
+	// push_finalizer() after freeing the copy.
+	object_raw_ref* nraw_copy(const void* ptr, uint32_t len)
+		{
+			// malloc(0) is allowed to return NULL; ask for at least one
+			// byte so an empty payload is not mistaken for out-of-memory.
+			void* copy = malloc(len != 0 ? len : 1);
+			if(!copy) { throw std::bad_alloc(); }
+			object_raw_ref* o;
+			try {
+				o = new (alloc()) object_raw_ref(copy, len);
+				push_finalizer<void>(&zone::finalize_free, NULL, copy);
+			} catch (...) {
+				free(copy);
+				throw;
+			}
+			// Skip the copy for empty input: `ptr` may be NULL then, and
+			// memcpy requires valid pointers even for zero bytes.
+			if(len != 0) { memcpy(copy, ptr, len); }
+			return o;
+		}
 
 	object_array* narray()
 		{ return new (alloc()) object_array(); }
 
 public:
 	void clear();
+	bool empty() const;
 
 private:
 	void* alloc();
 	size_t m_used;
 
 	static const size_t MAX_OBJECT_SIZE =
-		sizeof(object_raw) > sizeof(object_array)
-			? ( sizeof(object_raw) > sizeof(object_map)
-					? sizeof(object_raw)
+		sizeof(object_raw_ref) > sizeof(object_array)
+			? ( sizeof(object_raw_ref) > sizeof(object_map)
+					? sizeof(object_raw_ref)
 					: sizeof(object_map)
 			  )
 			: ( sizeof(object_array) > sizeof(object_map)
 		cell_t cells[MSGPACK_ZONE_CHUNK_SIZE];
 	};
 
-	typedef std::vector<chunk_t> pool_t;
+	typedef std::vector<chunk_t*> pool_t;
 	pool_t m_pool;
 
 
 	user_finalizer_t m_user_finalizer;
 
 private:
+	void resize_pool(size_t n);
+
+public:
+	static void finalize_free(void* obj, void* user)
+		{ free(user); }
+
+private:
 	zone(const zone&);
 };
 
 
+// Append a finalizer record (callback, object pointer, user pointer) to
+// the zone's user finalizer list. `obj` is stored type-erased as void*.
+template <typename T>
+inline void zone::push_finalizer(void (*func)(void* obj, void* user), T* obj, void* user)
+{
+	void* const erased = reinterpret_cast<void*>(obj);
+	m_user_finalizer.push_back(finalizer(func, erased, user));
+}
+
+// True when the zone has handed out no object cells (m_used == 0) and has
+// no user finalizers registered.
+inline bool zone::empty() const
+{
+	if(m_used != 0) {
+		return false;
+	}
+	return m_user_finalizer.empty();
+}
+
+
 }  // namespace msgpack
 
 #endif /* msgpack/zone.hpp */