Use of promises with dist_object rpc

Issue #452 resolved
Former user created an issue

Following chapter 10 in the programmer’s guide, I’m trying to use upcxx::dist_object with upcxx::promise. The goal is to have something like dist_object::fetch, but with promises. Sample code:

upcxx::dist_object<double> psum_d(0);
upcxx::barrier();

// compute values for psum_d
// ...

upcxx::promise<> prom;
std::vector<double> result_v(upcxx::rank_n());

for (int k = 0; k < upcxx::rank_n(); ++k) {
    result_v[k] = upcxx::rpc(k, upcxx::operation_cx::as_promise(prom),
        [](upcxx::dist_object<double> &dobj) {
            return *dobj;
        },
        psum_d);
}
upcxx::future<> fut = prom.finalize();
fut.wait();

This results in the following error messages:

[build] [1/2  50% :: 3.088] Building CXX object reduction/CMakeFiles/reduction-upcxx.dir/upcxx.cpp.o
[build] FAILED: reduction/CMakeFiles/reduction-upcxx.dir/upcxx.cpp.o 
[build] /bin/g++-10 -DGASNET_SEQ -DUPCXX_ASSERT_ENABLED=1 -DUPCXX_BACKEND=1 -DUPCXX_BACKEND_GASNET_SEQ=1 -DUPCXX_MPSC_QUEUE_ATOMIC=1 -D_GNU_SOURCE=1 -isystem /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include -isystem /usr/local/upcxx/gasnet.debug/include -isystem /usr/local/upcxx/gasnet.debug/include/smp-conduit -g -Wall -Wextra -Wpedantic -g3 -Wno-unused -Wunused-result -Wno-unused-parameter -Wno-address -std=gnu++17 -MD -MT reduction/CMakeFiles/reduction-upcxx.dir/upcxx.cpp.o -MF reduction/CMakeFiles/reduction-upcxx.dir/upcxx.cpp.o.d -o reduction/CMakeFiles/reduction-upcxx.dir/upcxx.cpp.o -c /mnt/c/Users/Ferdinand/source/repos/pad-project/reduction/upcxx.cpp
[build] In file included from /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/atomic.hpp:5,
[build]                  from /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/upcxx.hpp:6,
[build]                  from /mnt/c/Users/Ferdinand/source/repos/pad-project/reduction/upcxx.cpp:12:
[build] /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/completion.hpp: In instantiation of struct upcxx::detail::completions_state_head<true, upcxx::detail::rpc_event_values<main(int, char**)::<lambda(upcxx::dist_object<double>&)>&&(upcxx::dist_object<double>&)>, upcxx::promise_cx<upcxx::operation_cx_event>, 0>’:
[build] /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/completion.hpp:854:12:   required from struct upcxx::detail::completions_state<upcxx::detail::event_is_here, upcxx::detail::rpc_event_values<main(int, char**)::<lambda(upcxx::dist_object<double>&)>&&(upcxx::dist_object<double>&)>, upcxx::completions<upcxx::promise_cx<upcxx::operation_cx_event> >, 0>
[build] /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/rpc.hpp:376:19:   required from typename upcxx::detail::rpc_return<Fn(Arg ...), typename std::decay<_Tp>::type>::type upcxx::detail::rpc(const upcxx::team&, upcxx::intrank_t, Fn&&, Arg&& ..., Cxs&&, int) [with Cxs = upcxx::completions<upcxx::promise_cx<upcxx::operation_cx_event> >; Fn = main(int, char**)::<lambda(upcxx::dist_object<double>&)>&&; Arg = {upcxx::dist_object<double>&}; typename upcxx::detail::rpc_return<Fn(Arg ...), typename std::decay<_Tp>::type>::type = void; upcxx::intrank_t = int]
[build] /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/rpc.hpp:471:53:   required from typename std::enable_if<upcxx::detail::is_completions<typename std::decay<_Tp>::type>::value, typename upcxx::detail::rpc_return_no_sfinae<Fn(Arg ...), typename std::decay<_Tp>::type>::type>::type upcxx::rpc(upcxx::intrank_t, Cxs&&, Fn&&, Arg&& ...) [with Cxs = upcxx::completions<upcxx::promise_cx<upcxx::operation_cx_event> >; Fn = main(int, char**)::<lambda(upcxx::dist_object<double>&)>; Arg = {upcxx::dist_object<double>&}; typename std::enable_if<upcxx::detail::is_completions<typename std::decay<_Tp>::type>::value, typename upcxx::detail::rpc_return_no_sfinae<Fn(Arg ...), typename std::decay<_Tp>::type>::type>::type = void; upcxx::intrank_t = int]
[build] /mnt/c/Users/Ferdinand/source/repos/pad-project/reduction/upcxx.cpp:123:27:   required from here
[build] /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/completion.hpp:785:76: error: upcxx::detail::completions_state_head<true, EventValues, Cx, ordinal>::state_ has incomplete type
[build]   785 |       cx_state<Cx, typename EventValues::template tuple_t<cx_event_t<Cx>>> state_;
[build]       |                                                                            ^~~~~~
[build] In file included from /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/atomic.hpp:5,
[build]                  from /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/upcxx.hpp:6,
[build]                  from /mnt/c/Users/Ferdinand/source/repos/pad-project/reduction/upcxx.cpp:12:
[build] /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/completion.hpp:467:12: note: declaration of struct upcxx::detail::cx_state<upcxx::promise_cx<upcxx::operation_cx_event>, std::tuple<double> >
[build]   467 |     struct cx_state;
[build]       |            ^~~~~~~~
[build] In file included from /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/atomic.hpp:5,
[build]                  from /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/upcxx.hpp:6,
[build]                  from /mnt/c/Users/Ferdinand/source/repos/pad-project/reduction/upcxx.cpp:12:
[build] /usr/local/upcxx/upcxx.debug.gasnet_seq.smp/include/upcxx/completion.hpp:875:7: warning: upcxx::detail::completions_state<EventPredicate, EventValues, upcxx::completions<CxH, CxT ...>, ordinal>::completions_state(upcxx::completions<CxH, CxT ...>&&) [with EventPredicate = upcxx::detail::event_is_here; EventValues = upcxx::detail::rpc_event_values<main(int, char**)::<lambda(upcxx::dist_object<double>&)>&&(upcxx::dist_object<double>&)>; CxH = upcxx::promise_cx<upcxx::operation_cx_event>; CxT = {}; int ordinal = 0] used but never defined
[build]   875 |       completions_state(completions<CxH,CxT...> &&cxs):
[build]       |       ^~~~~~~~~~~~~~~~~
[build] ninja: build stopped: subcommand failed.
[build] Build finished with exit code 1

What am I doing wrong here?

Comments (5)

  1. Dan Bonachea

    Hi, thanks for the question.

    Your program fails to compile for two reasons:

    1. The RPC callback returns a value of type double, so the operation completion event will produce that value. However, the operation_cx::as_promise completion specifies an empty promise (promise<>), so there is no place to "catch" the value produced by the operation completion event.
    2. The rpc call requests only operation_cx::as_promise completion, so the rpc function template returns void (because you've overridden the default of operation_cx::as_future, which instructs the call to return a upcxx::future). Assigning this void result to result_v[k] is therefore ill-formed and won't compile.

    You could pass a non-empty promise (i.e., promise<double>) to operation_cx::as_promise to "catch" the resulting value, but that only works for a single RPC (a given promise may only have its value fulfilled once), so it won't aggregate completion of separate fetches.

    I'm not sure what the exact goal here is, so let me provide a few alternatives.

    If the idea is to use a promise to orchestrate the synchronization for the non-blocking fetches, you still need to catch the return value from the RPC and save it to the right place. Here's one way you might do that:

    upcxx::dist_object<double> psum_d(0);
    
    // compute values for psum_d
    // ...
    *psum_d = upcxx::rank_me()+1;
    
    upcxx::barrier(); // note must come AFTER initializing psum_d
    
    upcxx::promise<> prom(upcxx::rank_n()); // init with dependency count
    std::vector<double> result_v(upcxx::rank_n());
    
    for (int k = 0; k < upcxx::rank_n(); ++k) {
      psum_d.fetch(k) // start a fetch, returns a future
       .then([=,&result_v](double val) { // chain a completion callback
        result_v[k] = val;  // save the value
        prom.fulfill_anonymous(1); // signal one arrival
      });
    }
    upcxx::future<> fut = prom.get_future();
    fut.wait(); // wait for all operations above to finish
    if (!upcxx::rank_me()) {
     for (auto d : result_v) std::cout << d << ", ";
     std::cout << std::endl;
    }
    

    If the idea is to combine the result values using something like a sum reduction, then the simplest and best approach is to ditch RPC entirely and use the built-in reduction collective, which notably can be much more efficient than a hand-rolled linear RPC traversal:

    upcxx::future<double> fsum = upcxx::reduce_one(*psum_d, upcxx::op_fast_add, 0);
    if (!upcxx::rank_me()) {
     std::cout << "Sum: " << fsum.wait() << std::endl;
    }
    

    See spec section 12.2 for the list of built-in (offloadable) reduction operators, or you can write your own combinator callable if necessary.

    Hope this helps!

  2. Former user (account deleted), reporter

    Thanks for the detailed and fast response. The idea is indeed to have a sum reduction. However, using the built-in upcxx::reduce_one with the UDP conduit (the KNL cluster I’m on does not support the faster, high-performance conduits), I get rather low bandwidth: with an array of size 1<<30, timing from just before the partial sums are computed until upcxx::reduce_one completes, I get around 1 GB/s with 64 UPC++ processes on 4 nodes. (However, see below.)

    I see the same when I use upcxx::barrier after initialization of psum_d with a hand-rolled RPC (though runtime is not affected). So I was really looking for ways to improve bandwidth without possible races.

    If I understand the specification (section 14.3) correctly, with upcxx::rpc (in particular, dist_object::fetch) I should have another option besides using upcxx::barrier:

    Asynchronous point-to-point: The user performs no synchronization to ensure remote existence. Instead, an RPC is sent which, upon arrival, must wait asynchronously via a continuation for the peer to construct the distributed object.

    Footnote 2: UPC++ enables the asynchronous point-to-point approach implicitly when dist_object<T>& arguments are given to any of the RPC family of functions (see Ch. 9).

    Another observation: if I use the above methods in a loop (on the same input), only the first iteration shows the low bandwidth. For example, with something like:

    std::vector<double> vt;
    for (int iter = 1; iter <= iterations; ++iter) {
        time_point<Clock> t = Clock::now();
        double psum(0);
    
        for (index_t i = 0; i < block_size; ++i) {
            psum += u[i];
        }
        double result = upcxx::reduce_one(psum, upcxx::op_fast_add, 0).wait();
    
        if (proc_id == 0) {
            Duration d = Clock::now() - t;
            double time = d.count(); // time in seconds
            vt.push_back(time);
        }
        upcxx::barrier();
    }
    if (proc_id == 0 && bench) {
      for (auto&& time: vt) {
        double throughput = N * sizeof(float) * 1e-9 / time;
        std::fprintf(stdout, "%ld,%.12f,%.12f\n", N, time, throughput);
      }
    }
    

    I get something like the following (again on 64 processes; the columns are N, time in seconds, and throughput in GB/s):

    1073741824,5.015013009000,0.856421965066
    1073741824,0.240534014000,17.855966499607
    1073741824,0.232970593000,18.435662804876
    1073741824,0.224616679000,19.121319552588
    1073741824,0.224659049000,19.117713331013
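    For reference, the third column is N · sizeof(float) · 10⁻⁹ / time, i.e. GB/s with 1 GB = 10⁹ bytes. For the first row that is 1073741824 × 4 B ≈ 4.295 GB in 5.015 s ≈ 0.856 GB/s, while the later rows reach about 18 to 19 GB/s. A small helper reproducing the formula from the loop above (throughput_gb_per_s is a hypothetical name; it assumes 4-byte float elements, as that formula does):

```cpp
#include <cstddef>
#include <cstdint>

// Bytes processed divided by elapsed seconds, with 1 GB taken as 1e9 bytes,
// matching the throughput expression in the benchmark loop.
double throughput_gb_per_s(std::int64_t n, std::size_t elem_bytes,
                           double seconds) {
  return static_cast<double>(n) * static_cast<double>(elem_bytes) * 1e-9 /
         seconds;
}
```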
    

    Just to make sure it’s not the compiler playing tricks on me, I can confirm similar behavior with -g -O0.

    Sorry if this goes beyond the scope of the original request, but it has had me thinking for some time.

  3. Former user (account deleted), reporter

    As it turns out, the barrier belongs at the top of the iteration loop, not at the end. Everything works as intended now with upcxx::reduce_one.

  4. Dan Bonachea

    Hi, glad to hear this is resolved.

    One clarification, for the record:

    You don't need explicit synchronization to ensure dist_object construction, provided you pass the value it should contain to the dist_object constructor. If you construct the dist_object with one value and later assign it a second value (as separate steps, as in my first comment), then you're still responsible for synchronization to ensure the "second" value is the one fetched. This is why I moved the barrier in the initial example.
