Commits

Martin Vejnár committed ccfc8a5

Added task::abort_on, fixed loop.

  • Participants
  • Parent commits 59ef78c

Comments (0)

Files changed (7)

File libyb/async/detail/cancel_level_upgrade_task.hpp

+#ifndef LIBYB_ASYNC_DETAIL_CANCEL_LEVEL_UPGRADE_TASK_HPP
+#define LIBYB_ASYNC_DETAIL_CANCEL_LEVEL_UPGRADE_TASK_HPP
+
+#include "../task_base.hpp"
+#include "../task.hpp"
+
+namespace yb {
+namespace detail {
+
+template <typename R>
+class cancel_level_upgrade_task
+	: public task_base<R>
+{
+public:
+	cancel_level_upgrade_task(task<R> && nested, cancel_level from, cancel_level to)
+		: m_nested(std::move(nested)), m_from(from), m_to(to)
+	{
+	}
+
+	void cancel(cancel_level cl) throw()
+	{
+		if (cl >= m_from && cl < m_to)
+			cl = m_to;
+
+		m_nested.cancel(cl);
+	}
+
+	task_result<R> cancel_and_wait() throw()
+	{
+		return m_nested.cancel_and_wait();
+	}
+
+	void prepare_wait(task_wait_preparation_context & ctx)
+	{
+		m_nested.prepare_wait(ctx);
+	}
+
+	task<R> finish_wait(task_wait_finalization_context & ctx) throw()
+	{
+		m_nested.finish_wait(ctx);
+		if (m_nested.has_task())
+			return nulltask;
+		return std::move(m_nested);
+	}
+
+private:
+	task<R> m_nested;
+	cancel_level m_from;
+	cancel_level m_to;
+};
+
+} // namespace detail
+} // namespace yb
+
+#endif // LIBYB_ASYNC_DETAIL_CANCEL_LEVEL_UPGRADE_TASK_HPP

File libyb/async/detail/loop_task.hpp

 	cancel_level m_cancel_level;
 };
 
-template <typename F>
-class loop_task<void, F>
-	: public task_base<void>
-{
-public:
-	loop_task(task<void> && t, F const & f);
-
-	void cancel(cancel_level cl) throw();
-	task_result<void> cancel_and_wait() throw();
-	void prepare_wait(task_wait_preparation_context & ctx);
-	task<void> finish_wait(task_wait_finalization_context & ctx) throw();
-
-private:
-	task<void> m_task;
-	F m_f;
-	cancel_level m_cancel_level;
-};
-
 } // namespace detail
 } // namespace yb
 
 namespace yb {
 namespace detail {
 
+template <typename R, typename F>
+task<R> invoke_loop_body(F & f, task_result<R> && r, cancel_level cl)
+{
+	try
+	{
+		return f(r.get(), cl);
+	}
+	catch (...)
+	{
+		return async::raise<R>();
+	}
+}
+
+template <typename F>
+task<void> invoke_loop_body(F & f, task_result<void> &&, cancel_level cl)
+{
+	try
+	{
+		return f(cl);
+	}
+	catch (...)
+	{
+		return async::raise<void>();
+	}
+}
+
 template <typename S, typename F>
 loop_task<S, F>::loop_task(task<S> && t, F const & f)
 	: m_task(std::move(t)), m_f(f), m_cancel_level(cl_none)
 		task_result<S> r = m_task.cancel_and_wait();
 		if (r.has_exception())
 			return r.exception();
-
-		try
-		{
-			m_task = m_f(r.get(), cl_kill);
-		}
-		catch (...)
-		{
-			return task_result<void>(std::current_exception());
-		}
+		m_task = invoke_loop_body(m_f, std::move(r), cl_kill);
 	}
 
 	return task_result<void>();
 		task_result<S> r = m_task.get_result();
 		if (r.has_exception())
 			return async::raise<void>(r.exception());
-
-		try
-		{
-			m_task = m_f(r.get(), m_cancel_level);
-		}
-		catch (...)
-		{
-			return async::raise<void>();
-		}
-
+		m_task = invoke_loop_body(m_f, std::move(r), m_cancel_level);
 		if (m_task.empty())
 			return async::value();
 	}
 	return nulltask;
 }
 
-template <typename F>
-loop_task<void, F>::loop_task(task<void> && t, F const & f)
-	: m_task(std::move(t)), m_f(f), m_cancel_level(cl_none)
-{
-}
-
-template <typename F>
-void loop_task<void, F>::cancel(cancel_level cl) throw()
-{
-	m_cancel_level = (std::max)(m_cancel_level, cl);
-	m_task.cancel(cl);
-}
+} // namespace detail
 
-template <typename F>
-task_result<void> loop_task<void, F>::cancel_and_wait() throw()
+template <typename S, typename F>
+task<void> loop(task<S> && t, F f)
 {
-	while (!m_task.empty())
+	while (t.has_result())
 	{
-		task_result<void> r = m_task.cancel_and_wait();
+		task_result<S> r = t.get_result();
 		if (r.has_exception())
-			return r;
-
-		try
-		{
-			m_task = m_f(cl_kill);
-		}
-		catch (...)
-		{
-			return task_result<void>(std::current_exception());
-		}
+			return async::raise<void>(r.exception());
+		t = detail::invoke_loop_body(f, std::move(r), cl_none);
+		if (t.empty())
+			return async::value();
 	}
 
-	return task_result<void>();
-}
-
-template <typename F>
-void loop_task<void, F>::prepare_wait(task_wait_preparation_context & ctx)
-{
-	assert(!m_task.empty());
-
-	if (m_task.has_result())
-		ctx.set_finished();
-	else
-		m_task.prepare_wait(ctx);
-}
-
-template <typename F>
-task<void> loop_task<void, F>::finish_wait(task_wait_finalization_context & ctx) throw()
-{
-	assert(!m_task.empty());
-
-	if (!m_task.has_result())
-		m_task.finish_wait(ctx);
-
-	while (m_task.has_result())
+	try
 	{
-		task_result<void> r = m_task.get_result();
-		if (r.has_exception())
-			return async::result(std::move(r));
-
-		try
-		{
-			m_task = m_f(m_cancel_level);
-		}
-		catch (...)
+		return task<void>(new detail::loop_task<S, F>(std::move(t), f));
+	}
+	catch (...)
+	{
+		while (!t.empty())
 		{
-			return async::raise<void>();
+			task_result<S> r = t.cancel_and_wait();
+			if (r.has_exception())
+				return async::raise<void>(r.exception());
+			t = detail::invoke_loop_body(f, std::move(r), cl_none);
 		}
 
-		if (m_task.empty())
-			return async::value();
+		return async::raise<void>();
 	}
+}
 
-	return nulltask;
+template <typename F>
+task<void> loop(F && f)
+{
+	return loop(async::value(), std::move(f));
 }
 
-} // namespace detail
 } // namespace yb
 
 #endif // LIBYB_ASYNC_DETAIL_LOOP_TASK_HPP

File libyb/async/detail/task_fwd.hpp

 
 #include "../task_result.hpp"
 #include "../cancel_level.hpp"
+#include "../../utils/noncopyable.hpp"
 #include <memory> // unique_ptr
 #include <exception> // exception_ptr, exception
 
 
 template <typename R>
 class task
+	: noncopyable
 {
 public:
 	typedef R result_type;
 	template <typename F>
 	task<R> follow_with(F f);
 
+	task<R> abort_on(cancel_level cl, cancel_level abort_cl = cl_abort);
+
 private:
 	typedef task_base<R> * task_base_ptr;
 
 			std::alignment_of<task_result<R>>::value
 			>::value
 		>::type m_storage;
-
-	task(task const &);
-	task & operator=(task const &);
 };
 
 task<void> operator|(task<void> && lhs, task<void> && rhs);
 typename detail::task_protect_type<F>::type protect(F f);
 
 template <typename F>
-task<void> loop(F f);
+task<void> loop(F && f);
 
 template <typename S, typename F>
 task<void> loop(task<S> && t, F f);

File libyb/async/detail/task_impl.hpp

 #include "value_task.hpp"
 #include "sequential_composition_task.hpp"
 #include "loop_task.hpp"
+#include "cancel_level_upgrade_task.hpp"
 #include <type_traits>
 
 namespace yb {
 	});
 }
 
+template <typename R>
+task<R> task<R>::abort_on(cancel_level cl, cancel_level abort_cl)
+{
+	if (m_kind == k_task)
+	{
+		try
+		{
+			return task<R>(new detail::cancel_level_upgrade_task<R>(std::move(*this), cl, abort_cl));
+		}
+		catch (...)
+		{
+			return async::result(this->cancel_and_wait());
+		}
+	}
+
+	return std::move(*this);
+}
+
 template <typename F>
 typename detail::task_protect_type<F>::type protect(F f)
 {
 	}
 }
 
-template <typename F>
-task<void> loop(F f)
-{
-	return protect([&f] {
-		return task<void>(new detail::loop_task<void, F>(async::value(), f));
-	});
-}
-
-template <typename S, typename F>
-task<void> loop(task<S> && t, F f)
-{
-	return protect([&t, &f] {
-		return task<void>(new detail::loop_task<S, F>(std::move(t), f));
-	});
-}
-
 } // namespace yb
 
 #endif // LIBYB_ASYNC_DETAIL_TASK_HPP

File libyb/async/stream_device.cpp

 
 task<void> stream_device::write_loop(stream & s)
 {
-	return wait_for(m_start_write).then([this, &s] {
+	return wait_for(m_start_write).abort_on(cl_quit).then([this, &s] {
 		return s.write_all(m_write_buffer.data(), m_write_buffer.size());
 	}).then([this]() -> task<void> {
 		m_write_buffer.swap(m_write_backlog);
 {
 	try
 	{
-		task<void> read_task = loop<size_t>(s.read(m_read_buffer, sizeof m_read_buffer), [this, &s](size_t r, cancel_level cl) -> task<size_t> {
+		task<void> read_task = loop<size_t>(async::value((size_t)0), [this, &s](size_t r, cancel_level cl) -> task<size_t> {
 			m_parser.parse(*this, buffer_ref(m_read_buffer, r));
 			if (cl >= cl_quit)
 				return nulltask;
-			return s.read(m_read_buffer, sizeof m_read_buffer);
+			return s.read(m_read_buffer, sizeof m_read_buffer).abort_on(cl_quit);
 		});
 
 		task<void> write_task = loop([this, &s](cancel_level cl) {

File test/test.vcxproj

     <ClInclude Include="..\libyb\async\channel.hpp" />
     <ClInclude Include="..\libyb\async\descriptor_reader.hpp" />
     <ClInclude Include="..\libyb\async\detail\canceller_task.hpp" />
+    <ClInclude Include="..\libyb\async\detail\cancel_level_upgrade_task.hpp" />
     <ClInclude Include="..\libyb\async\detail\loop_task.hpp" />
     <ClInclude Include="..\libyb\async\detail\parallel_composition_task.hpp" />
     <ClInclude Include="..\libyb\async\detail\sequential_composition_task.hpp" />

File test/test.vcxproj.filters

     <ClInclude Include="..\libyb\async\cancel_level.hpp">
       <Filter>libyb\async</Filter>
     </ClInclude>
+    <ClInclude Include="..\libyb\async\detail\cancel_level_upgrade_task.hpp">
+      <Filter>libyb\async\detail</Filter>
+    </ClInclude>
   </ItemGroup>
   <ItemGroup>
     <Filter Include="libyb">