Use of promises with dist_object rpc
Following chapter 10 in the programmer’s guide, I’m trying to use upcxx::dist_object with upcxx::promise. The goal is to have something like dist_object::fetch, but with promises. Sample code:
upcxx::dist_object<double> psum_d(0);
upcxx::barrier();
// compute values for psum_d
// ...
upcxx::promise<> prom;
std::vector<double> result_v(upcxx::rank_n());
for (int k = 0; k < upcxx::rank_n(); ++k) {
  result_v[k] = upcxx::rpc(k, upcxx::operation_cx::as_promise(prom),
                           [](upcxx::dist_object<double> &dobj) {
                             return *dobj;
                           },
                           psum_d);
}
upcxx::future<> fut = prom.finalize();
fut.wait();
This results in the following error messages:
[build] [1/2 50% :: 3.088] Building CXX object reduction/CMakeFiles/reduction-upcxx.dir/upcxx.cpp.o
[build] FAILED: reduction/CMakeFiles/reduction-upcxx.dir/upcxx.cpp.o
[build] /bin/g++-10 -DGASNET_SEQ -DUPCXX_ASSERT_ENABLED=1 -DUPCXX_BACKEND=1 -DUPCXX_BACKEND_GASNET_SEQ=1 -DUPCXX_MPSC_QUEUE_ATOMIC=1 -D_GNU_SOURCE=1 -isystem /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include -isystem /usr/local/upcxx/gasnet.debug/include -isystem /usr/local/upcxx/gasnet.debug/include/smp-conduit -g -Wall -Wextra -Wpedantic -g3 -Wno-unused -Wunused-result -Wno-unused-parameter -Wno-address -std=gnu++17 -MD -MT reduction/CMakeFiles/reduction-upcxx.dir/upcxx.cpp.o -MF reduction/CMakeFiles/reduction-upcxx.dir/upcxx.cpp.o.d -o reduction/CMakeFiles/reduction-upcxx.dir/upcxx.cpp.o -c /mnt/c/Users/Ferdinand/source/repos/pad-project/reduction/upcxx.cpp
[build] In file included from /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/atomic.hpp:5,
[build] from /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/upcxx.hpp:6,
[build] from /mnt/c/Users/Ferdinand/source/repos/pad-project/reduction/upcxx.cpp:12:
[build] /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/completion.hpp: In instantiation of ‘struct upcxx::detail::completions_state_head<true, upcxx::detail::rpc_event_values<main(int, char**)::<lambda(upcxx::dist_object<double>&)>&&(upcxx::dist_object<double>&)>, upcxx::promise_cx<upcxx::operation_cx_event>, 0>’:
[build] /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/completion.hpp:854:12: required from ‘struct upcxx::detail::completions_state<upcxx::detail::event_is_here, upcxx::detail::rpc_event_values<main(int, char**)::<lambda(upcxx::dist_object<double>&)>&&(upcxx::dist_object<double>&)>, upcxx::completions<upcxx::promise_cx<upcxx::operation_cx_event> >, 0>’
[build] /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/rpc.hpp:376:19: required from ‘typename upcxx::detail::rpc_return<Fn(Arg ...), typename std::decay<_Tp>::type>::type upcxx::detail::rpc(const upcxx::team&, upcxx::intrank_t, Fn&&, Arg&& ..., Cxs&&, int) [with Cxs = upcxx::completions<upcxx::promise_cx<upcxx::operation_cx_event> >; Fn = main(int, char**)::<lambda(upcxx::dist_object<double>&)>&&; Arg = {upcxx::dist_object<double>&}; typename upcxx::detail::rpc_return<Fn(Arg ...), typename std::decay<_Tp>::type>::type = void; upcxx::intrank_t = int]’
[build] /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/rpc.hpp:471:53: required from ‘typename std::enable_if<upcxx::detail::is_completions<typename std::decay<_Tp>::type>::value, typename upcxx::detail::rpc_return_no_sfinae<Fn(Arg ...), typename std::decay<_Tp>::type>::type>::type upcxx::rpc(upcxx::intrank_t, Cxs&&, Fn&&, Arg&& ...) [with Cxs = upcxx::completions<upcxx::promise_cx<upcxx::operation_cx_event> >; Fn = main(int, char**)::<lambda(upcxx::dist_object<double>&)>; Arg = {upcxx::dist_object<double>&}; typename std::enable_if<upcxx::detail::is_completions<typename std::decay<_Tp>::type>::value, typename upcxx::detail::rpc_return_no_sfinae<Fn(Arg ...), typename std::decay<_Tp>::type>::type>::type = void; upcxx::intrank_t = int]’
[build] /mnt/c/Users/Ferdinand/source/repos/pad-project/reduction/upcxx.cpp:123:27: required from here
[build] /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/completion.hpp:785:76: error: ‘upcxx::detail::completions_state_head<true, EventValues, Cx, ordinal>::state_’ has incomplete type
[build] 785 | cx_state<Cx, typename EventValues::template tuple_t<cx_event_t<Cx>>> state_;
[build] | ^~~~~~
[build] In file included from /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/atomic.hpp:5,
[build] from /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/upcxx.hpp:6,
[build] from /mnt/c/Users/Ferdinand/source/repos/pad-project/reduction/upcxx.cpp:12:
[build] /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/completion.hpp:467:12: note: declaration of ‘struct upcxx::detail::cx_state<upcxx::promise_cx<upcxx::operation_cx_event>, std::tuple<double> >’
[build] 467 | struct cx_state;
[build] | ^~~~~~~~
[build] In file included from /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/atomic.hpp:5,
[build] from /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/upcxx.hpp:6,
[build] from /mnt/c/Users/Ferdinand/source/repos/pad-project/reduction/upcxx.cpp:12:
[build] /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/completion.hpp:875:7: warning: ‘upcxx::detail::completions_state<EventPredicate, EventValues, upcxx::completions<CxH, CxT ...>, ordinal>::completions_state(upcxx::completions<CxH, CxT ...>&&) [with EventPredicate = upcxx::detail::event_is_here; EventValues = upcxx::detail::rpc_event_values<main(int, char**)::<lambda(upcxx::dist_object<double>&)>&&(upcxx::dist_object<double>&)>; CxH = upcxx::promise_cx<upcxx::operation_cx_event>; CxT = {}; int ordinal = 0]’ used but never defined
[build] 875 | completions_state(completions<CxH,CxT...> &&cxs):
[build] | ^~~~~~~~~~~~~~~~~
[build] ninja: build stopped: subcommand failed.
[build] Build finished with exit code 1
What am I doing wrong here?
Comments (5)
Account Deleted reporter Thanks for the detailed and fast response. The idea is indeed to have a sum reduction. However, using the built-in upcxx::reduce_one and the UDP conduit (the KNL cluster I’m on does not support the faster, high-performance conduits), I get a rather low bandwidth: with an array of size 1<<30 (measuring from before the partial sums are computed until after upcxx::reduce_one has completed) I get a bandwidth of around 1 GB/s with 64 UPC++ processes on 4 nodes. (However, see below.)
The same happens when I use upcxx::barrier after initialization of psum_d with a hand-rolled RPC (though runtime is not affected). So I was really looking for some ways to improve bandwidth without possible races.
If I understand the specification (section 14.3) correctly, with upcxx::rpc (in particular, dist_object::fetch) I should have another option besides using upcxx::barrier:
Asynchronous point-to-point: The user performs no synchronization to ensure remote existence. Instead, an RPC is sent which, upon arrival, must wait asynchronously via a continuation for the peer to construct the distributed object.
UPC++ enables the asynchronous point-to-point approach implicitly when dist_object<T>& arguments are given to any of the RPC family of functions (see Ch. 9).
Another observation I made is if I use the above methods in a loop (on the same input), only the first iteration gives the low bandwidth. I.e. in something like:
std::vector<double> vt;
for (int iter = 1; iter <= iterations; ++iter) {
  time_point<Clock> t = Clock::now();
  double psum(0);
  for (index_t i = 0; i < block_size; ++i) {
    psum += u[i];
  }
  double result = upcxx::reduce_one(psum, upcxx::op_fast_add, 0).wait();
  if (proc_id == 0) {
    Duration d = Clock::now() - t;
    double time = d.count(); // time in seconds
    vt.push_back(time);
  }
  upcxx::barrier();
}
if (proc_id == 0 && bench) {
  for (auto&& time: vt) {
    double throughput = N * sizeof(float) * 1e-9 / time;
    std::fprintf(stdout, "%ld,%.12f,%.12f\n", N, time, throughput);
  }
}
I get something like (again on 64 processes):
1073741824,5.015013009000,0.856421965066
1073741824,0.240534014000,17.855966499607
1073741824,0.232970593000,18.435662804876
1073741824,0.224616679000,19.121319552588
1073741824,0.224659049000,19.117713331013
Just to make sure it’s not the compiler playing tricks on me, I can confirm similar behavior with -g -O0.
Sorry if this is a wider scope than the original request, but this has had me thinking for some time.
Account Deleted reporter As it turns out, the barrier belongs at the top of the iteration loop, not the end. Everything works as intended now with upcxx::reduce_one.
Account Deleted reporter - changed status to resolved
- changed component to Support: Programming
assigned issue to
Hi @{5ef25820cdec5c0aaf52ca9b} - Glad to hear this is resolved.
One clarification, for the record:
You don't need explicit synchronization to ensure dist_object construction, provided you pass the relevant value it should contain to the dist_object constructor. If you construct the dist_object with one value and mutate it by assigning a second value later (as separate steps, as in the first comment), then you're still responsible for synchronization to ensure the "second" value is the one fetched. This is why I moved the barrier in the initial example.
Hi @{5ef25820cdec5c0aaf52ca9b}, thanks for the question.
Your program fails to compile for several reasons:
- The RPC callback returns a double, so the operation completion event will produce that value. However, the operation_cx::as_promise completion specifies an empty promise, so there's no place to "catch" the value produced by the operation completion event.
- The rpc call requests only operation_cx::as_promise completion for the RPC, so the rpc function template will return void (because you've overridden the default of operation_cx::as_future that instructs the call to return a upcxx::future). The assignment of this void return to result_v[k] is incorrect and won't work as intended.
You could pass a non-empty promise (i.e. promise<double>) to operation_cx::as_promise to "catch" the resulting value, but that only works for a single RPC (a given promise may only have its value fulfilled once), so it won't aggregate completion of separate fetches.
I'm not sure what the exact goal here is, so let me provide a few alternatives.
If the idea is to use a promise to orchestrate the synchronization for the non-blocking fetches, you still need to catch the return value from the RPC and save it to the right place. Here's one way you might do that:
If the idea is to combine the result values using something like a sum reduction, then the simplest and best approach is to ditch RPC entirely and use the built-in reduction collective, which notably can be much more efficient than hand-rolling a linear RPC traversal.
See spec section 12.2 for the list of built-in (offloadable) reduction operators, or you can write your own combinator callable if necessary.
Hope this helps!