1. Bart van Strien
  2. love-experiments

Commits

Bart van Strien  committed 1a59c21

Add Channel:supply(), a blocking version of Channel:push()

  • Participants
  • Parent commits fd3f318
  • Branches minor

Comments (0)

Files changed (4)

File src/modules/thread/Channel.cpp

View file
  • Ignore whitespace
 #include <map>
 #include <string>
 
+namespace
+{
+	union uslong
+	{
+		unsigned long u;
+		long i;
+	};
+
+	// target <= current, but semi-wrapsafe, one wrap, anyway
+	inline bool past(unsigned int target, unsigned int current)
+	{
+		if (target > current)
+			return false;
+		if (target == current)
+			return true;
+
+		uslong t, c;
+		t.u = target;
+		c.u = current;
+
+		return !(t.i < 0 && c.i > 0);
+	}
+}
+
 namespace love
 {
 namespace thread
 }
 
 Channel::Channel()
-	: named(false)
+	: named(false), sent(0), received(0)
 {
 	mutex = newMutex();
 	cond = newConditional();
 }
 
 Channel::Channel(const std::string &name)
-	: named(true), name(name)
+	: named(true), name(name), sent(0), received(0)
 {
 	mutex = newMutex();
 	cond = newConditional();
 		namedChannels.erase(name);
 }
 
-void Channel::push(Variant *var)
+unsigned long Channel::push(Variant *var)
 {
 	if (!var)
-		return;
+		return 0;
 	Lock l(mutex);
 	var->retain();
 	// Keep a reference to ourselves
 	if (named && queue.empty())
 		retain();
 	queue.push(var);
-	cond->signal();
+	cond->broadcast();
+
+	return ++sent;
+}
+
+void Channel::supply(Variant *var)
+{
+	if (!var)
+		return;
+
+	unsigned long id = push(var);
+
+	mutex->lock();
+	while (!past(id, received))
+	{
+		cond->wait(mutex);
+	}
+	mutex->unlock();
 }
 
 Variant *Channel::pop()
 	Variant *var = queue.front();
 	queue.pop();
 
+	received++;
+	cond->broadcast();
+
 	// Release our reference to ourselves
 	// if we're empty and named.
 	if (named && queue.empty())

File src/modules/thread/Channel.h

View file
  • Ignore whitespace
 	std::string name;
 	Channel(const std::string &name);
 
+	unsigned long sent;
+	unsigned long received;
+
 public:
 	Channel();
 	~Channel();
 	static Channel *getChannel(const std::string &name);
 
-	void push(Variant *var);
+	unsigned long push(Variant *var);
+	void supply(Variant *var); // blocking push
 	Variant *pop();
-	Variant *demand();
+	Variant *demand(); // blocking pop
 	Variant *peek();
 	int count();
 	void clear();

File src/modules/thread/wrap_Channel.cpp

View file
  • Ignore whitespace
 		return 0;
 	}
 
+	int w_Channel_supply(lua_State *L)
+	{
+		Channel *c = luax_checkchannel(L, 1);
+		Variant *var = Variant::fromLua(L, 2);
+		c->supply(var);
+		var->release();
+		return 0;
+	}
+
 	int w_Channel_pop(lua_State *L)
 	{
 		Channel *c = luax_checkchannel(L, 1);
 
 	static const luaL_Reg type_functions[] = {
 		{ "push", w_Channel_push },
+		{ "supply", w_Channel_supply },
 		{ "pop", w_Channel_pop },
 		{ "demand", w_Channel_demand },
 		{ "peek", w_Channel_peek },

File src/modules/thread/wrap_Channel.h

View file
  • Ignore whitespace
 {
 	Channel *luax_checkchannel(lua_State *L, int idx);
 	int w_Channel_push(lua_State *L);
+	int w_Channel_supply(lua_State *L);
 	int w_Channel_pop(lua_State *L);
 	int w_Channel_demand(lua_State *L);
 	int w_Channel_peek(lua_State *L);