upcxx::progress_required always returns 0 for RPC chain with cx as LPC

Issue #168 resolved
Mathias Jacquelin created an issue

If a thread issues an RPC whose operation completion is delivered as an LPC to another thread's persona, and then calls upcxx::discharge() or upcxx::progress_required(), the runtime reports that progress is not required. The other thread, which holds its own persona plus the master persona, never executes the LPC completions.


  1. Mathias Jacquelin reporter

    Here is an MWE that illustrates the problem: the assertion on line 28, assert(upcxx::progress_required()), will fail.

    #include <upcxx/upcxx.hpp>
    #include <thread>
    #include <cassert>
    #include <sstream>
    #include <iostream>
    
    int main (int argc, char ** argv) {
      upcxx::init();
      upcxx::persona lpc_persona;
      auto t1 = std::thread(
          [&lpc_persona](){
            upcxx::intrank_t nghb = ( upcxx::rank_me() + 1 ) % upcxx::rank_n();
            upcxx::intrank_t sender = upcxx::rank_me();
            upcxx::rpc(nghb,upcxx::operation_cx::as_lpc(lpc_persona,[nghb,sender](){
                  /*Body of LPC*/
                  assert(sender == upcxx::rank_me() );
                  std::stringstream ss; 
                  ss<<"This is the LPC executing on "<<upcxx::rank_me()<<" and tracking RPC executing on "<<nghb<<std::endl;
                  std::cout<<ss.str();
                  }), 
                [sender,nghb](){
                  /*body of RPC*/
                  assert(nghb == upcxx::rank_me() );
                  std::stringstream ss; 
                  ss<<"This is the RPC executing on "<<upcxx::rank_me()<<" and issued by "<<sender<<std::endl;
                  std::cout<<ss.str();
                }); 
            assert(upcxx::progress_required());
            upcxx::discharge();
          }   
      );  
    
      {
        upcxx::persona_scope ps(lpc_persona);
        upcxx::progress();
      }
    
      t1.join();
    
      upcxx::finalize();
      return 0;
    }
    
  2. Dan Bonachea

    @Mathias Jacquelin : Thanks for providing the code. However, I believe the assertion on line 28 of your MWE is incorrect. It is never guaranteed that progress is required after an rpc injection, because it's possible the injection call blocked until the rpc left the node.

    Incidentally, it's also not guaranteed that the first call to upcxx::progress on the lpc_persona will see the rpc acknowledgement (that needs to be a loop).
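    Both points can be made concrete with a toy, single-threaded model in plain C++. This is not UPC++: ToyRuntime, inject and wait_for_ack are hypothetical names invented for this sketch. It assumes that progress_required() counts only locally queued work, that an injection may finish locally before returning, and that the acknowledgement shows up only after a few more progress calls. Under those assumptions progress_required() is false right after injection even though the completion has not run yet, and only a polling loop reliably observes the completion.

    ```cpp
    #include <cassert>
    #include <deque>
    #include <functional>
    #include <utility>

    // Toy model only, NOT the UPC++ implementation.
    // Supports one outstanding operation at a time.
    struct ToyRuntime {
      std::deque<std::function<void()>> local;  // callbacks this thread must run
      int remote_delay = -1;                    // polls until the ack arrives; -1 = none
      std::function<void()> on_ack;             // completion to run when the ack arrives

      // Counts only locally queued work, so it can be false while an
      // acknowledgement is still outstanding on the (simulated) network.
      bool progress_required() const { return !local.empty(); }

      // Hypothetical injection that finishes locally before returning; the ack
      // becomes visible only after 'delay' further progress() calls.
      void inject(int delay, std::function<void()> completion) {
        assert(delay >= 0);
        remote_delay = delay;
        on_ack = std::move(completion);
      }

      void progress() {
        if (remote_delay == 0) {             // simulated ack arrives now
          local.push_back(std::move(on_ack));
          remote_delay = -1;
        } else if (remote_delay > 0) {
          --remote_delay;                    // still in flight
        }
        while (!local.empty()) {             // run everything queued locally
          auto fn = std::move(local.front());
          local.pop_front();
          fn();
        }
      }
    };

    // The polling idiom: keep calling progress() until the completion has run.
    // Returns the number of progress() calls that were needed.
    int wait_for_ack(ToyRuntime& rt, const bool& done) {
      int calls = 0;
      while (!done) {
        rt.progress();
        ++calls;
      }
      return calls;
    }
    ```

    With inject(2, ...), progress_required() is false immediately after injection, and wait_for_ack needs three calls, so a single progress() call would have missed the completion.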

    I've altered your program to the one below, which I think should work correctly but deadlocks when run (on dirac/smp/debug/develop): the main thread gets stuck in the polling loop after printing the RPC arrival message (i.e., the LPC completion is never sent). Do you think the altered version represents your originally reported problem?

    Note that compiling with -DSTALL enables a polling loop before the injecting thread exits, and that makes the observed deadlock disappear. This suggests the root cause: the initiating thread is being used to forward the LPC completion, but this is not correctly reflected in progress_required.

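    #include <upcxx/upcxx.hpp>
    #include <thread>
    #include <cassert>
    #include <atomic>
    #include <sstream>
    #include <iostream>
    
    int main (int argc, char ** argv) {
      upcxx::init();
    
      upcxx::persona lpc_persona;
      std::atomic<int> done(0); // also read by t1 under -DSTALL, so it must be atomic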
    
      auto t1 = std::thread(
          [&lpc_persona,&done](){
            upcxx::intrank_t nghb = ( upcxx::rank_me() + 1 ) % upcxx::rank_n();
            upcxx::intrank_t sender = upcxx::rank_me();
            upcxx::rpc(nghb,upcxx::operation_cx::as_lpc(lpc_persona,[nghb,sender,&done](){
                  /*Body of LPC*/
                  assert(sender == upcxx::rank_me() );
                  std::stringstream ss;
                  ss<<"This is the LPC executing on "<<upcxx::rank_me()<<" and tracking RPC executing on "<<nghb<<"\n";
                  std::cout<<ss.str()<<std::flush;
                  done = 1;
                  }),
                [sender,nghb](){
                  /*body of RPC*/
                  assert(nghb == upcxx::rank_me() );
                  std::stringstream ss;
                  ss<<"This is the RPC executing on "<<upcxx::rank_me()<<" and issued by "<<sender<<"\n";
                  std::cout<<ss.str()<<std::flush;
                });
            upcxx::discharge();
            assert(!upcxx::progress_required());
            #if STALL
              while (!done) upcxx::progress();
            #endif
          }
      );
    
      { 
        upcxx::persona_scope ps(lpc_persona);
        while (!done) upcxx::progress();
      }
    
      upcxx::barrier();
    
      t1.join();
    
      if (!upcxx::rank_me()) std::cout << "SUCCESS" << std::endl;
    
      upcxx::finalize();
      return 0;
    }
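    The suspected root cause can be sketched with another toy model, again plain C++ and not UPC++ (ToyPersona, count_internal and lpc_delivered are hypothetical). It assumes the acknowledgement lands on the initiating thread's persona as internal work whose job is to forward the LPC to lpc_persona, and that progress_required() fails to count that forwarding work. A thread that discharges and then exits strands the LPC, whereas a polling loop like the -DSTALL one would flush it.

    ```cpp
    #include <cassert>
    #include <deque>
    #include <functional>
    #include <utility>

    // Toy model of the suspected root cause only, NOT the UPC++ implementation.
    struct ToyPersona {
      std::deque<std::function<void()>> user_q;      // user-level callbacks (e.g. LPCs)
      std::deque<std::function<void()>> internal_q;  // runtime work (e.g. forwarding)
      bool count_internal;                           // false models the reported bug

      explicit ToyPersona(bool count_internal_work) : count_internal(count_internal_work) {}

      // With count_internal == false, pending forwarding work is invisible here.
      bool progress_required() const {
        return !user_q.empty() || (count_internal && !internal_q.empty());
      }

      void progress() {
        drain(internal_q);
        drain(user_q);
      }

     private:
      static void drain(std::deque<std::function<void()>>& q) {
        while (!q.empty()) {
          auto fn = std::move(q.front());
          q.pop_front();
          fn();
        }
      }
    };

    // Returns whether the LPC ran. The initiating thread "discharges" (polls only
    // while progress_required() is true) and exits; the target then polls once.
    // In the buggy case a real polling loop on the target would spin forever.
    bool lpc_delivered(bool count_internal) {
      ToyPersona initiator(count_internal), target(true);
      bool done = false;
      // Simulated ack arrival: forwarding work lands on the initiator's persona.
      initiator.internal_q.push_back([&] {
        target.user_q.push_back([&] { done = true; });  // the LPC itself
      });
      while (initiator.progress_required()) initiator.progress();  // "discharge"
      target.progress();
      return done;
    }
    ```

    With lpc_delivered(true) the LPC runs; with lpc_delivered(false) the initiator sees no required progress, exits, and the LPC is never enqueued on the target, which mirrors the deadlock described above.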
    
  3. Mathias Jacquelin reporter

    Thank you, @Dan Bonachea. I believe you are correct; this does reflect the issue I was observing.

  4. Mathias Jacquelin reporter

    It turns out that issue #169 illustrates another issue, which I think is related to this one. Using -DSTALL allows the code to complete, but -DSTALL_INTERNAL does not, even though no RPCs are being run on t1, so as far as I understand user-level progress shouldn't be needed. Here is a modified version of the code:

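    #include <upcxx/upcxx.hpp>
    #include <thread>
    #include <cassert>
    #include <atomic>
    #include <sstream>
    #include <iostream>
    
    int main (int argc, char ** argv) {
      upcxx::init();
    
      upcxx::persona lpc_persona;
      std::atomic<int> done(0); // also read by t1 under -DSTALL / -DSTALL_INTERNAL, so it must be atomic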
    
      auto t1 = std::thread(
          [&lpc_persona,&done](){
            upcxx::intrank_t nghb = ( upcxx::rank_me() + 1 ) % upcxx::rank_n();
            upcxx::intrank_t sender = upcxx::rank_me();
            upcxx::rpc(nghb,upcxx::operation_cx::as_lpc(lpc_persona,[nghb,sender,&done](){
                  /*Body of LPC*/
                  assert(sender == upcxx::rank_me() );
                  std::stringstream ss;
                  ss<<"This is the LPC executing on "<<upcxx::rank_me()<<" and tracking RPC executing on "<<nghb<<"\n";
                  std::cout<<ss.str()<<std::flush;
                  done = 1;
                  }),
                [sender,nghb](){
                  /*body of RPC*/
                  assert(nghb == upcxx::rank_me() );
                  std::stringstream ss;
                  ss<<"This is the RPC executing on "<<upcxx::rank_me()<<" and issued by "<<sender<<"\n";
                  std::cout<<ss.str()<<std::flush;
                });
            upcxx::discharge();
            assert(!upcxx::progress_required());
            #if STALL
              while (!done) upcxx::progress();
            #endif
            #if STALL_INTERNAL
              while (!done) upcxx::progress(upcxx::progress_level::internal);
            #endif
          }
      );
    
      { 
        upcxx::persona_scope ps(lpc_persona);
        while (!done) upcxx::progress();
      }
    
      upcxx::barrier();
    
      t1.join();
    
      if (!upcxx::rank_me()) std::cout << "SUCCESS" << std::endl;
    
      upcxx::finalize();
      return 0;
    }
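    One hypothesis consistent with this observation, not confirmed in this thread, is that the forwarding step on t1 is queued as user-level work, which upcxx::progress(upcxx::progress_level::internal) does not run. The toy below (plain C++, not UPC++; ToyLevel, ToyLeveledPersona and polls_until_done are hypothetical) shows how that would let -DSTALL complete while -DSTALL_INTERNAL hangs.

    ```cpp
    #include <cassert>
    #include <deque>
    #include <functional>
    #include <utility>

    // Toy model only, NOT UPC++: two progress levels over two queues.
    enum class ToyLevel { internal, user };

    struct ToyLeveledPersona {
      std::deque<std::function<void()>> internal_q, user_q;

      // Internal-level progress runs only internal work; user-level progress
      // runs both, loosely mirroring upcxx::progress_level::internal and ::user.
      void progress(ToyLevel level) {
        drain(internal_q);
        if (level == ToyLevel::user) drain(user_q);
      }

     private:
      static void drain(std::deque<std::function<void()>>& q) {
        while (!q.empty()) {
          auto fn = std::move(q.front());
          q.pop_front();
          fn();
        }
      }
    };

    // Hypothesis: the forwarding step is queued as user-level work on t1.
    // In this toy it sets 'done' directly instead of handing off to a second
    // persona. Returns the number of polls needed, or -1 if max_polls runs out
    // (standing in for the observed hang).
    int polls_until_done(ToyLevel level, int max_polls) {
      assert(max_polls > 0);
      ToyLeveledPersona t1;
      bool done = false;
      t1.user_q.push_back([&done] { done = true; });
      for (int i = 1; i <= max_polls; ++i) {
        t1.progress(level);
        if (done) return i;
      }
      return -1;
    }
    ```

    Under this hypothesis, user-level polling finishes on the first call, while internal-level polling never runs the queued work, matching the -DSTALL versus -DSTALL_INTERNAL difference reported above.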
    