hang when creating dist_object within future chains

Issue #416 invalid
Rob Egan created an issue

I’m having trouble creating a stable class that has member dist_objects and/or atomic_domains (i.e. it creates and destroys them on construction and destruction). When multiple instances of the class may be pending, the program hangs or throws assertions, possibly indicating that the distributed objects are getting mixed between the instances on different ranks. This is preventing me from using these classes within a block called by progress(), because I need to call wait() in the destructor to avoid any hangs.

I’ve reduced this to a reproducer with effectively two versions (with and without DO_IN_FUTURE); the DO_IN_FUTURE version moves the construction of the distributed object into a future instead of the main loop.

It works fine (on 1 machine) when dist_object construction is in the main loop and (just) FIX_NO_DO_IN_FUTURE is defined. FIX_NO_DO_IN_FUTURE adds a wait() and barrier at the end of every iteration, effectively serializing the code across all ranks, which is something I’m trying to avoid. None of the commented-out “// futile” barriers fixed the hang that occurs without FIX_NO_DO_IN_FUTURE; i.e. if you uncomment them all but do not define FIX_NO_DO_IN_FUTURE, the loop still hangs. In my opinion, even a barrier in the loop should not be necessary, let alone a wait() plus a barrier.

With DO_IN_FUTURE defined, I’ve had no success. I am trying to future-chain the loop but I haven’t been able to get any version to be stable at all (>90% of the time it hangs after a few iterations).

#include <memory>
#include <upcxx/upcxx.hpp>
using namespace std;
using namespace upcxx;

#define ITERATIONS 200
//#define DO_IN_FUTURE
#define FIX_NO_DO_IN_FUTURE

using DO = dist_object<int>;
using ShDO = shared_ptr<DO>;

void test_future_chain() {

  barrier();
  future<> fut_all = make_future();
  for(int i = 0; i < ITERATIONS ; i++) {

      if (!rank_me()) std::cout << "." << std::flush;

      future<> fut = make_future();

      ShDO sh_do;
#ifdef DO_IN_FUTURE
      assert(!sh_do);
      barrier(); // futile
      fut = barrier_async(); // futile to protect future execution
      fut = when_all(fut, fut_all); // futile to chain here
      barrier(); // futile
#else
      //barrier(); // futile to protect dist_object construction
      sh_do = make_shared< DO >(i);
      assert(sh_do);
      assert(*(*sh_do) == i);
      //fut = barrier_async(); // also futile
      //barrier(); // futile to protect dist_object construction

      //fut = when_all(fut, fut_all); // futile to chain here
#endif

      fut = fut.then([i,sh_do]() {
          ShDO sh_do2 = sh_do; // copy since sh_do is const
          future<ShDO> fut_sh_do;

#ifdef DO_IN_FUTURE
          assert(!sh_do2);
#ifdef IDEALLY
          sh_do2 = make_shared< DO >(i);
          fut_sh_do = make_future(sh_do2);
#else  // but none of these following barriers do any good either
          fut_sh_do = barrier_async().then([i]() {
              auto sh_do2 = make_shared< DO >(i);
              DBG("do=", sh_do2->id(), " for i=", i, "\n");
              return sh_do2;
          });
          fut_sh_do = when_all(fut_sh_do, barrier_async()); // futile
#endif

#else
          assert(sh_do2);
          fut_sh_do = make_future(sh_do2);
          //fut_sh_do = when_all(fut_sh_do, barrier_async()); // futile
#endif

          auto fut_rpc = fut_sh_do.then([i](ShDO sh_do_copy) {
            return rpc((rank_me() + 1) % rank_n(), 
                  [](DO &_do, int other_i) {
                DBG("Got do=", *_do, " other_i=", other_i, "\n");
                assert(*_do == other_i);
                return *_do;
            }, *sh_do_copy, i);
          });
          return when_all(fut_rpc, fut_sh_do).then([i](int returned_i, ShDO sh_do_copy) {
             assert(i == *(*sh_do_copy)); 
             assert(i == returned_i);
          });

      });

#ifdef DO_IN_FUTURE
      fut.wait(); // futile
      barrier();  // futile
#else
#ifdef FIX_NO_DO_IN_FUTURE
      // must serialize with wait *and* barrier!!      
      fut.wait();
      barrier();
#endif

#endif

      fut_all = when_all(fut_all, fut);

  }
  if (!rank_me()) std::cout << "done\n" << std::flush;
  fut_all.wait();
  barrier();
}

Comments (7)

  1. Rob Egan reporter

    I have a somewhat simpler example using just a loop over atomic_domain construction/destruction too.

    I tried this example on my desktop, allocating more shared heap than the default, and it does not change the behavior. Where in the loop it hangs is non-deterministic.

        vector<upcxx::atomic_op> ad_ops{upcxx::atomic_op::fetch_add, upcxx::atomic_op::load, upcxx::atomic_op::store};
        future<> all_futs = make_future();
        for (int i = 0; i < 50000; i++) {
    
        upcxx::barrier(); // barrier for atomic_domain construction
            auto sh_ad = make_shared<upcxx::atomic_domain<int>>(ad_ops, upcxx::world());
    
            upcxx::global_ptr<int> gptr;
            if (!rank_me()) {
                gptr = upcxx::new_<int>(0);
            }
            auto fut = upcxx::broadcast(gptr, 0)
                    .then([sh_ad] (upcxx::global_ptr<int> gptr) {
                        auto val_fut = sh_ad->fetch_add(gptr, rank_me(), std::memory_order_relaxed);
                        auto fut = val_fut.then([sh_ad, gptr](int val) {
                            // noop
                        });
                        return when_all(fut, barrier_async()).then([sh_ad, gptr]() {
                            sh_ad->destroy(upcxx::entry_barrier::none);
                            if (gptr.where() == rank_me()) upcxx::delete_(gptr);
                        });
                    });
            all_futs = when_all(all_futs, fut);
    
            // THIS WAIT IS REQUIRED!!!  
            // If commented out, a hang occurs after some number of iterations
            fut.wait();
    
        }
        all_futs.wait();
    

    Hazarding a guess at where the hang occurs when there is no wait() within the loop: I would think that the barrier_async, required for the destruction of the atomic_domain, is becoming ready out of synchronous order between the ranks.

    When I replace the call to when_all(fut, barrier_async()) for the destroy lambda with this:

    auto fut_barrier = fut.then([sh_ad, i]() {
        DBG("starting barrier for i=", i, "\n");
        return barrier_async();
    }).then([i]() {
        DBG("completed barrier for i=", i, "\n");
    });
    return when_all(fut, fut_barrier).then(...)
    

    I can reproduce a case where some ranks output “completed barrier for i=11849” when other ranks have not even started that barrier iteration.

    So maybe the barrier_async in the future is getting mixed up with the synchronous barrier in the main loop? Perhaps adding some uniqueness, like a hash of the file and line where barrier is called, to the underlying dist_object::id member might help keep barriers from getting mixed? Or maybe the barrier_asyncs are being initiated out of order on some ranks?

  2. Rob Egan reporter

    One more thing is that the above code (eventually) hangs with a Debug build, but SIGABRTs (eventually) with a Release build:

    *** Caught a fatal signal (proc 0): SIGABRT(6)
    NOTICE: We recommend linking the debug version of GASNet to assist you in resolving this application issue.
    [0] Invoking GDB for backtrace...
    [0] /usr/bin/gdb -nx -batch -x /tmp/gasnet_38VGDs '/home/regan/workspace/upcxx-utils/build/./test/test_aaaa' 34402
    [0] [Thread debugging using libthread_db enabled]
    [0] Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
    [0] 0x00007f04efb976e7 in __GI___waitpid (pid=34450, stat_loc=stat_loc@entry=0x7ffc9602de58, options=options@entry=0) at ../sysdeps/unix/sysv/linux/waitpid.c:30
    [0] #0  0x00007f04efb976e7 in __GI___waitpid (pid=34450, stat_loc=stat_loc@entry=0x7ffc9602de58, options=options@entry=0) at ../sysdeps/unix/sysv/linux/waitpid.c:30
    [0] #1  0x00007f04efb02107 in do_system (line=<optimized out>) at ../sysdeps/posix/system.c:149
    [0] #2  0x000056302e321eca in gasneti_bt_gdb ()
    [0] #3  0x000056302e326b98 in gasneti_print_backtrace ()
    [0] #4  0x000056302e2e5b4f in gasneti_defaultSignalHandler ()
    [0] #5  <signal handler called>
    [0] #6  __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:51
    [0] #7  0x00007f04efaf38b1 in __GI_abort () at abort.c:79
    [0] #8  0x000056302e3178b5 in mspace_memalign ()
    [0] #9  0x000056302e3015d7 in upcxx::backend::gasnet::allocate(unsigned long, unsigned long, upcxx::backend::gasnet::sheap_footprint_t*) ()
    [0] #10 0x000056302e2e9a17 in test_ad_create() ()
    [0] #11 0x000056302e2eb29b in test_aaaa(int, char**) ()
    [0] #12 0x000056302e2e5d0a in main ()
    

    or also here sometimes too:

    *** Caught a fatal signal (proc 0): SIGABRT(6)
    NOTICE: We recommend linking the debug version of GASNet to assist you in resolving this application issue.
    [0] Invoking GDB for backtrace...
    [0] /usr/bin/gdb -nx -batch -x /tmp/gasnet_T66lgf '/home/regan/workspace/upcxx-utils/build/./test/test_aaaa' 34703
    [0] [Thread debugging using libthread_db enabled]
    [0] Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
    [0] 0x00007f6c7d25b6e7 in __GI___waitpid (pid=34754, stat_loc=stat_loc@entry=0x7ffe495f0418, options=options@entry=0) at ../sysdeps/unix/sysv/linux/waitpid.c:30
    [0] #0  0x00007f6c7d25b6e7 in __GI___waitpid (pid=34754, stat_loc=stat_loc@entry=0x7ffe495f0418, options=options@entry=0) at ../sysdeps/unix/sysv/linux/waitpid.c:30
    [0] #1  0x00007f6c7d1c6107 in do_system (line=<optimized out>) at ../sysdeps/posix/system.c:149
    [0] #2  0x0000555a00d6eeca in gasneti_bt_gdb ()
    [0] #3  0x0000555a00d73b98 in gasneti_print_backtrace ()
    [0] #4  0x0000555a00d32b4f in gasneti_defaultSignalHandler ()
    [0] #5  <signal handler called>
    [0] #6  __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:51
    [0] #7  0x00007f6c7d1b78b1 in __GI_abort () at abort.c:79
    [0] #8  0x0000555a00d61755 in mspace_free ()
    [0] #9  0x0000555a00d48f55 in upcxx::backend::gasnet::deallocate(void*, upcxx::backend::gasnet::sheap_footprint_t*) ()
    [0] #10 0x0000555a00d336ee in upcxx::detail::future_body_then<upcxx::future1<upcxx::detail::future_kind_when_all<upcxx::future1<upcxx::detail::future_kind_shref<upcxx::detail::future_header_ops_general, false>>, upcxx::future1<upcxx::detail::future_kind_shref<upcxx::detail::future_header_ops_general, false>> >>, test_ad_create()::{lambda(upcxx::global_ptr<int, (upcxx::memory_kind)1>)#1}::operator()(upcxx::global_ptr<int, (upcxx::memory_kind)1>) const::{lambda()#3}>::leave_active(upcxx::detail::future_header_dependent*) ()
    [0] #11 0x0000555a00d56d73 in upcxx::detail::future_header::entered_ready_with_sucs(upcxx::detail::future_header*, upcxx::detail::future_header::dependency_link*) ()
    [0] #12 0x0000555a00d53806 in upcxx::progress(upcxx::progress_level) ()
    [0] #13 0x0000555a00d5bfe2 in upcxx::barrier(upcxx::team const&) ()
    [0] #14 0x0000555a00d362cd in test_ad_create() ()
    [0] #15 0x0000555a00d3829b in test_aaaa(int, char**) ()
    [0] #16 0x0000555a00d32d0a in main ()
    

  3. Dan Bonachea

    Hi Rob -

    I haven't fully parsed your first example yet, but your second example seems to be violating the collective calling requirement on barrier_async and atomic_domain::destroy. Quoting from spec Ch12:

    A collective operation is a UPC++ operation that must be matched across all participating processes. Informally, any two processes that both participate in a pair of collective operations must agree on their ordering.

    See 12.0 and 12.1 for the detailed requirements.

    I think one source of confusion here may be that all operations specified as "collective", such as upcxx::broadcast in this example, must be initiated according to the collective ordering property, but are NOT guaranteed to generate and signal completions in a collective order. There are two components to this:

    1. Asynchronous collectives may truly complete in different orders on different processes (eg due to lack of ordering in the underlying network), leading to completion signalling in different orders, and therefore chained callbacks running in non-collective orders.
    2. When two or more completions have been signaled in one process and callbacks are scheduled on each of the now-readied futures, there is no guarantee regarding the relative order in which those callbacks are executed during the next user-level progress call.

    The summary of these is that there is no guaranteed order between asynchronous callback invocations aside from what is explicitly constructed using future dependencies, so correctly constructing a compliant collective invocation from inside an asynchronous callback is "challenging".

    Examining the code, I see that without the fut.wait() line to synchronize broadcast completion in the loop, broadcast operations initiated during multiple iterations of the loop can be running concurrently, and as just mentioned they may complete in different orders on different processes. This means the .then callback scheduled on the future completion of the broadcast need not execute in a collective order. Therefore any collective actions taken by that callback (either directly or scheduled for later) are already erroneous when the broadcast completion ordering is actually non-collective at runtime. Once this collective ordering is "lost" within a future chain, there is no simple way to "re-construct" it. Note the future chaining line all_futs = when_all(all_futs, fut); as currently written does not establish any ordering between actions taken in callbacks originating from different iterations of the loop; it just aggregates together the fact that all iterations have fully completed.

    Similarly, consider this construct:

    auto fut = /* non-collective asynchronous comms */;
    when_all(fut, barrier_async()).then([sh_ad, gptr]() {
        sh_ad->destroy(upcxx::entry_barrier::none);
        ...
    });
    

    Even if we ignore the fact that this block of code was called non-collectively (making the barrier_async initiation erroneous), this code is problematic in isolation: it launches an asynchronous collective (barrier) and schedules a collective action (atomic_domain::destroy()) to take place on the completion of that barrier (which might not happen collectively) and on the completion of some other non-collective asynchronous comms (which is almost certain not to happen collectively). Passing upcxx::entry_barrier::none does not "cancel" the requirement to make the atomic_domain::destroy() call collectively from all team members.
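    For illustration, a compliant restructuring of the second reproducer (a sketch only, not from the original code; identifiers like ad_ops and ITERATIONS reuse the reproducer's, and each iteration is fully serialized) keeps every collective call in the main loop so all ranks initiate them in an identical order:

    ```cpp
    // Sketch: all collective calls (atomic_domain construction, barrier,
    // destroy) stay in the main loop, so every rank issues them in the
    // same order. Only the non-collective comms run asynchronously.
    for (int i = 0; i < ITERATIONS; i++) {
        upcxx::atomic_domain<int> ad(ad_ops, upcxx::world()); // collective
        upcxx::global_ptr<int> gptr;
        if (!upcxx::rank_me()) gptr = upcxx::new_<int>(0);
        gptr = upcxx::broadcast(gptr, 0).wait();              // collective

        // Non-collective communication; waiting here keeps iterations
        // from overlapping, so no collective order is ever "lost".
        ad.fetch_add(gptr, upcxx::rank_me(), std::memory_order_relaxed).wait();

        upcxx::barrier();  // quiesce before the collective teardown
        ad.destroy(upcxx::entry_barrier::none);
        if (!upcxx::rank_me()) upcxx::delete_(gptr);
    }
    ```

    This is essentially the FIX_NO_DO_IN_FUTURE structure, which costs the serialization the reporter was hoping to avoid; the alternative is to chain each iteration's collectives on the previous iteration's completion future, which preserves overlap of non-collective comms only.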

    Does this answer your questions?

  4. Dan Bonachea

    Re-reading the code in the first comment, it seems to suffer from the same category of failure.

    Specifically, dist_object construction is a collective call, so calls like make_shared< DO >(i) that invoke that constructor must very carefully ensure they are invoked collectively. It's VERY difficult to ensure such a call performed within an asynchronous callback respects this ordering property for non-trivial codes.

    Taking one example snippet:

              fut_sh_do = barrier_async().then([i]() {
                  auto sh_do2 = make_shared< DO >(i);
                   ...
              });
    

    This initiates an asynchronous operation on all ranks and schedules a completion that invokes a collective call. This callback will run later, and (without further explicit dependencies) not necessarily in a collective order with respect to surrounding operations, thus breaking the preconditions and the dist_object itself. Stated differently, if a second invocation of these same lines is not explicitly dependent on the full completion of the first invocation (eg via chaining after fut_sh_do), allowing the two barriers to be in-flight simultaneously on any rank, then the result is undefined behavior.
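    To make that concrete, a sketch of one legal structure (my illustration, not code from the report; it assumes serializing the collectives across iterations is acceptable, and that issuing collectives from inside progress callbacks is itself permitted, which is under separate discussion) chains each iteration on the full completion of the previous one, so at most one barrier_async is ever in flight:

    ```cpp
    // Sketch: serialize collective initiation by explicit chaining.
    // DO and ShDO are the aliases from the original reproducer.
    upcxx::future<> chain = upcxx::make_future();
    for (int i = 0; i < ITERATIONS; i++) {
        chain = chain.then([]() {
            // By construction, every rank reaches this callback only after
            // the previous iteration's collectives have fully completed,
            // so this barrier_async is initiated in a collective order.
            return upcxx::barrier_async();
        }).then([i]() {
            auto sh_do = std::make_shared<DO>(i); // collective construction
            // ... non-collective rpc/comms referencing sh_do go here ...
        });
    }
    chain.wait();
    ```

    The chaining restores the "any two processes agree on ordering" property at the price of one-at-a-time collective execution.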

    Something else going on in this code that I think worth mentioning is lifetime of dynamically created dist_objects. UPC++'s RPC arrival protects against creation hazards on dist_object construction (meaning an incoming RPC with a dist_object argument has its execution deferred until the corresponding dist_object is locally constructed by the target process). However there is NO such protection for dist_object destruction - if an RPC arrives referencing a dist_object that has already died on the target process, that's undefined behavior and will probably just defer that RPC forever, likely leading to a program hang. We've previously internally discussed enhancements/extensions to change this (see spec issue 124), but that's all speculative at the moment and none of it is specified or implemented.

    So although relying on std::shared_ptr is handy for maintaining the lifetime of dist_object, one must be aware that shared_ptr's reference counting is purely local to one process and does not provide any guarantees that the dist_object is globally "dead" before destruction. IOW it would be erroneous for the last local shared_ptr copy to disappear (invoking the dist_object destructor) when RPCs might still arrive from other ranks targeting that dist_object. The code above allows the last reference to such shared_ptrs to appear in captures/arguments to asynchronously executed callbacks, and I don't think it ensures this required global quiescence property before allowing the last such local reference to be destroyed (immediately after the local callback runs).
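    One pattern for the required quiescence (again a sketch of mine, echoing the barrier_async-capture idea mentioned later in this thread; it assumes each rank has already waited for acknowledgment of its own outgoing RPCs before entering the barrier, and that the barrier itself is initiated collectively) is to let the last local shared_ptr reference die only after an async barrier completes:

    ```cpp
    // Sketch: hold the dist_object alive until all ranks have quiesced.
    // ShDO is the shared_ptr<dist_object<int>> alias from the reproducer.
    upcxx::future<> release_after_quiescence(ShDO sh_do) {
        // The lambda capture keeps a reference to the dist_object; it is
        // dropped only after barrier_async() completes on this rank.
        return upcxx::barrier_async().then([sh_do]() {
            // sh_do goes out of scope here; if it is the last local
            // reference, the dist_object destructor runs only now.
        });
    }
    ```

    Note the caveat: the barrier only helps if no rank can still have an RPC in flight targeting this dist_object when it enters the barrier; the barrier does not itself drain undelivered RPCs.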

  5. Rob Egan reporter

    Thanks Dan! This is extremely helpful and instructive. In light of this I will refactor my code to explicitly wait and/or barrier instead of relying on future chains on the necessary collectives.

    So, as a design issue, given that most functions are safe within the lambda of a future::then() call but no collectives are, it would be helpful to the programmer to have a method that returns whether the code is within a progress call or not (like is_within_progress()), even if it is only enabled in a Debug build, i.e. something similar to is_local() on a global_ptr which can be if-then-branched / validated / asserted.

    I know something like this exists in the debug build, as I’ve seen a similar assertion in a debug build when wait() is called on a future within progress, so what is the reasoning for not exposing that to the upcxx developer?

    I suppose just adding a future::wait() in code that cannot be called within a progress block may be sufficient in debug builds, but it would be nicer, IMO, to be able to recognize this anti-pattern and abort with a message at runtime.

    And as for the dist_object lifetime issue, I’ve seen that before and solved it by returning a barrier_async() to ensure the lifetime of the shared_ptr<dist_object> persists, but I will refactor that code too.

  6. Dan Bonachea

    @Rob Egan Thanks for the feedback.

    I've opened the following issues responsive to discussion here:

    We're very interested to hear your thoughts on these, especially the impact of spec issue 169 on ExaBiome codes. If we are agreed the code in this issue triggers undefined behavior, may we resolve this issue?

    CC: @Steven Hofmeyr

    I suppose just adding a future::wait() in code that cannot be called within a progress block may be sufficient in debug builds, but it would be nicer, IMO, to be able to recognize this anti-pattern and abort with a message at runtime.

    Note that only future::wait() on non-ready futures is erroneous and triggers the debug error you are referring to, so it cannot be used to construct the anti-pattern test you are suggesting (and still terminate normally in the correct case). Spec issue 170 proposes adding the query tool you are requesting to directly detect the defect in question. And if we accept spec issue 169 to specify a no-collectives-in-progress rule, then we'll deploy some automated enforcement of that rule (severity TBD).

  7. Dan Bonachea

    The programs in the original report break the collective ordering preconditions, leading to undefined behavior.

    Further discussion of the semantics of collectives in the restricted context is taking place in other issues.
