Source

mpi3-fortran / ompi / mca / pml / base / pml_base_bsend.c

/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2007 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, 
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2007      Sun Microsystems, Inc.  All rights reserved.
 * $COPYRIGHT$
 * 
 * Additional copyrights may follow
 * 
 * $HEADER$
 */

#include "ompi_config.h"
#include "opal/threads/mutex.h"
#include "opal/threads/condition.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/mca/allocator/base/base.h"
#include "ompi/mca/allocator/allocator.h"
#include "opal/mca/base/mca_base_param.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/pml_base_request.h"
#include "ompi/mca/pml/base/pml_base_sendreq.h"
#include "ompi/mca/pml/base/pml_base_bsend.h"
#include "ompi/mca/mpool/mpool.h" 

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif  /* HAVE_UNISTD_H */

/*
 * File-scope state of the buffered-send support.  Apart from the init
 * counter, the attach/detach and request routines below access these
 * while holding mca_pml_bsend_mutex.
 */
static opal_mutex_t     mca_pml_bsend_mutex;      /* lock for thread safety */
static opal_condition_t mca_pml_bsend_condition;  /* condition variable to block on detach */
static mca_allocator_base_component_t* mca_pml_bsend_allocator_component;  /* allocator component looked up in mca_pml_base_bsend_init() */
static mca_allocator_base_module_t* mca_pml_bsend_allocator;  /* sub-allocator to manage user's buffer; NULL while no buffer is attached */
static size_t           mca_pml_bsend_usersize;   /* user provided buffer size */
static unsigned char   *mca_pml_bsend_userbase;   /* user provided buffer base */
static unsigned char   *mca_pml_bsend_base;       /* adjusted (aligned) base of user buffer */
static unsigned char   *mca_pml_bsend_addr;       /* first byte of the user buffer not yet handed to the sub-allocator */
static size_t           mca_pml_bsend_size;       /* adjusted size of user buffer */
static size_t           mca_pml_bsend_count;      /* number of outstanding requests */
static size_t           mca_pml_bsend_pagesz;     /* page size as reported by sysconf(_SC_PAGESIZE) */
static int              mca_pml_bsend_pagebits;   /* number of bits in pagesz */
static int32_t          mca_pml_bsend_init = 0; /* init/fini reference count */



/*
 * Segment callback handed to the sub-allocator (see
 * mca_pml_base_bsend_attach).  Instead of carving out exactly the
 * requested amount, it hands over everything of the attached user
 * buffer that has not been handed out yet.
 *
 * module        not used
 * size_inout    in:  minimum number of bytes wanted
 *               out: number of bytes actually handed over (written only
 *                    on success)
 * registration  if non-NULL, *registration is set to NULL on success
 *
 * Returns the start of the segment, or NULL when less than *size_inout
 * bytes of the buffer remain.
 */
static void* mca_pml_bsend_alloc_segment(
    struct mca_mpool_base_module_t* module, 
    size_t* size_inout, 
    mca_mpool_base_registration_t** registration)
{
    void *addr;
    /* bytes of the attached buffer not yet given to the sub-allocator */
    size_t remaining = mca_pml_bsend_size -
        (size_t)(mca_pml_bsend_addr - mca_pml_bsend_base);

    /*
     * Compare byte counts rather than pointers: forming
     * "mca_pml_bsend_addr + size" for an oversized request is out-of-range
     * pointer arithmetic and may wrap around, which would let the request
     * through with a segment smaller than what was asked for.
     */
    if(*size_inout > remaining) {
        return NULL;
    }

    /* allocate all that is left */
    addr = mca_pml_bsend_addr;
    mca_pml_bsend_addr += remaining;
    *size_inout = remaining;
    if (NULL != registration) *registration = NULL;
    return addr;
}


/*
 * Start-up initialization.  Calls are reference counted: only the call
 * that raises the counter to one does any work, later calls return
 * OMPI_SUCCESS immediately.
 *
 * thread_safe   not used by this routine
 *
 * Returns OMPI_SUCCESS, or OMPI_ERR_BUFFER when the allocator component
 * named by the "pml" / "base_bsend_allocator" parameter (default "basic")
 * cannot be found.
 */
int mca_pml_base_bsend_init(bool thread_safe)
{
    char *allocator_name;
    size_t shifted;

    /* somebody else already did the setup */
    if(OPAL_THREAD_ADD32(&mca_pml_bsend_init, 1) > 1) {
        return OMPI_SUCCESS;
    }

    /* bring the lock and the detach condition to life */
    OBJ_CONSTRUCT(&mca_pml_bsend_mutex, opal_mutex_t);
    OBJ_CONSTRUCT(&mca_pml_bsend_condition, opal_condition_t);

    /* which allocator component should manage the attached buffer? */
    (void) mca_base_param_reg_string_name("pml", "base_bsend_allocator", NULL,
                                          false, false, "basic",
                                          &allocator_name);

    /* the name is only needed for the lookup itself */
    mca_pml_bsend_allocator_component = mca_allocator_component_lookup(allocator_name);
    free(allocator_name);
    if(NULL == mca_pml_bsend_allocator_component) {
        return OMPI_ERR_BUFFER;
    }

    /* record the page size and count how many shifts reduce it to zero */
    mca_pml_bsend_pagesz = sysconf(_SC_PAGESIZE);
    mca_pml_bsend_pagebits = 0;
    for(shifted = mca_pml_bsend_pagesz; 0 != shifted; shifted >>= 1) {
        mca_pml_bsend_pagebits++;
    }
    return OMPI_SUCCESS;
}


/*
 * Shutdown counterpart of mca_pml_base_bsend_init().  The reference
 * count is decremented; while it is still positive nothing else happens.
 * Otherwise a still existing sub-allocator is finalized and the mutex and
 * condition variable are destructed.
 *
 * Always returns OMPI_SUCCESS.
 */
int mca_pml_base_bsend_fini(void)
{
    const int32_t still_in_use = OPAL_THREAD_ADD32(&mca_pml_bsend_init, -1);

    if(still_in_use <= 0) {
        /* tear down the sub-allocator of a buffer that was never detached */
        if(NULL != mca_pml_bsend_allocator) {
            mca_pml_bsend_allocator->alc_finalize(mca_pml_bsend_allocator);
            mca_pml_bsend_allocator = NULL;
        }

        OBJ_DESTRUCT(&mca_pml_bsend_condition);
        OBJ_DESTRUCT(&mca_pml_bsend_mutex);
    }
    return OMPI_SUCCESS;
}


/*
 * User-level call to attach buffer.
 *
 * addr   start of the user supplied buffer
 * size   length of that buffer in bytes
 *
 * A sub-allocator instance is created to manage the buffer, the values
 * handed in are remembered so that detach can return them unchanged, and
 * the usable region is set to the buffer with its start rounded up to
 * pointer alignment.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERR_BUFFER when addr is NULL, size is not
 * positive, a buffer is already attached, no allocator component is
 * available, or the sub-allocator cannot be created.
 */
int mca_pml_base_bsend_attach(void* addr, int size)
{
    size_t align;
    size_t usable;

    bool thread_safe = ompi_mpi_thread_multiple;
    if(NULL == addr || size <= 0) {
        return OMPI_ERR_BUFFER;
    }
    usable = (size_t)size;

    /*
     * check for buffer already attached; also refuse to go on without an
     * allocator component instead of dereferencing a NULL pointer below
     */
    OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
    if(NULL != mca_pml_bsend_allocator ||
       NULL == mca_pml_bsend_allocator_component) {
        OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
        return OMPI_ERR_BUFFER;
    }

    /* try to create an instance of the allocator - to determine thread safety level */
    mca_pml_bsend_allocator = mca_pml_bsend_allocator_component->allocator_init(thread_safe, mca_pml_bsend_alloc_segment, NULL, NULL);
    if(NULL == mca_pml_bsend_allocator) {
        OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
        return OMPI_ERR_BUFFER;
    }

    /*
     * Save away what the user handed in.  This is done in case the
     * base and size are modified for alignment issues.
     */
    mca_pml_bsend_userbase = (unsigned char*)addr;
    mca_pml_bsend_usersize = usable;

    /*
     * Align to pointer boundaries.  The final mask makes the padding zero
     * for an address that is already aligned, so no word is thrown away.
     */
    align = (sizeof(void *) - ((size_t)addr & (sizeof(void *) - 1)))
            & (sizeof(void *) - 1);

    /*
     * A buffer shorter than the padding has no usable bytes at all.
     * Clamp the padding so the usable size becomes zero instead of
     * wrapping around to a huge value, which would let the sub-allocator
     * run past the end of the user's buffer.
     */
    if(align > usable) {
        align = usable;
    }

    /* setup local variables */
    mca_pml_bsend_base = (unsigned char *)addr + align;
    mca_pml_bsend_addr = (unsigned char *)addr + align;
    mca_pml_bsend_size = usable - align;
    mca_pml_bsend_count = 0;
    OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
    return OMPI_SUCCESS;
}

/*
 * User-level call to detach the buffer.
 *
 * addr   if non-NULL, treated as a void** through which the base address
 *        originally passed to attach is returned
 * size   if non-NULL, receives the size originally passed to attach
 *
 * Blocks on the condition variable until the count of outstanding
 * requests is zero, then finalizes the sub-allocator and clears all
 * buffer bookkeeping.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERR_BUFFER when no buffer is attached.
 */
int mca_pml_base_bsend_detach(void* addr, int* size)
{
    int status = OMPI_ERR_BUFFER;

    OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);

    /* nothing to do unless a buffer is attached */
    if(NULL != mca_pml_bsend_allocator) {
        void **base_out = (void**)addr;

        /* let every pending request finish first */
        while(0 != mca_pml_bsend_count) {
            opal_condition_wait(&mca_pml_bsend_condition, &mca_pml_bsend_mutex);
        }

        /* the sub-allocator is no longer needed */
        mca_pml_bsend_allocator->alc_finalize(mca_pml_bsend_allocator);
        mca_pml_bsend_allocator = NULL;

        /* report what the user gave us at attach time */
        if(NULL != base_out) {
            *base_out = mca_pml_bsend_userbase;
        }
        if(NULL != size) {
            *size = (int)mca_pml_bsend_usersize;
        }

        /* forget everything about the buffer */
        mca_pml_bsend_userbase = NULL;
        mca_pml_bsend_usersize = 0;
        mca_pml_bsend_base = NULL;
        mca_pml_bsend_addr = NULL;
        mca_pml_bsend_size = 0;
        mca_pml_bsend_count = 0;

        status = OMPI_SUCCESS;
    }

    OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
    return status;
}

     
/* 
 * Pack the data of a send request into space taken from the attached
 * buffer.
 *
 * request   the request; it is accessed as a mca_pml_base_send_request_t
 *
 * Nothing is done for a request whose req_bytes_packed is zero.
 * Otherwise space is obtained from the sub-allocator, the request's
 * convertor packs into it, and the convertor is then re-prepared to send
 * the packed bytes from that space.
 *
 * Returns OMPI_SUCCESS;
 *         OMPI_ERR_BUFFER when no buffer is attached (req_addr is set to
 *         NULL) or the allocation fails (req_pml_complete is set);
 *         OMPI_ERROR when packing fails - the space is given back and
 *         req_addr is reset to req_base.req_addr, the same state
 *         mca_pml_base_bsend_request_fini() leaves behind.
 */

int mca_pml_base_bsend_request_start(ompi_request_t* request)
{
    mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)request;
    struct iovec iov;
    unsigned int iov_count;
    size_t max_data;

    if(sendreq->req_bytes_packed > 0) {

        /* has a buffer been provided */
        OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
        if(NULL == mca_pml_bsend_addr) {
            sendreq->req_addr = NULL;
            OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
            return OMPI_ERR_BUFFER;
        }

        /* allocate a buffer to hold packed message */
        sendreq->req_addr = mca_pml_bsend_allocator->alc_alloc(
            mca_pml_bsend_allocator, sendreq->req_bytes_packed, 0, NULL);
        if(NULL == sendreq->req_addr) {
            /* release resources when request is freed */
            sendreq->req_base.req_pml_complete = true;
            OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
            return OMPI_ERR_BUFFER;
        }

        /*
         * Count the request while the lock is still held.  The counter is
         * shared with detach and the free/fini routines, all of which
         * touch it under the lock; counting before the lock is dropped
         * also keeps detach from finalizing the allocator while this
         * buffer is in use.
         */
        mca_pml_bsend_count++;
        OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);

        /* The convertor is already initialized in the beginning so we just have to
         * pack the data in the newly allocated buffer.
         */
        iov.iov_base = (IOVBASE_TYPE*)sendreq->req_addr;
        iov.iov_len = sendreq->req_bytes_packed;
        iov_count = 1;
        max_data = iov.iov_len;
        if(opal_convertor_pack( &sendreq->req_base.req_convertor, 
                                &iov, 
                                &iov_count, 
                                &max_data ) < 0) {
            /*
             * Undo the allocation so that the buffer and the counter stay
             * in step, exactly as mca_pml_base_bsend_request_fini() would.
             */
            OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
            mca_pml_bsend_allocator->alc_free(mca_pml_bsend_allocator, sendreq->req_addr);
            sendreq->req_addr = sendreq->req_base.req_addr;
            if(--mca_pml_bsend_count == 0)
                opal_condition_signal(&mca_pml_bsend_condition);
            OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
            return OMPI_ERROR;
        }
 
        /* setup convertor to point to packed buffer (at position zero) */
        opal_convertor_prepare_for_send( &sendreq->req_base.req_convertor, &(ompi_mpi_packed.dt.super),
                                         max_data, sendreq->req_addr );
    }
    
    return OMPI_SUCCESS;
}


/* 
 * Reserve space in the attached buffer for a send request without
 * packing anything yet.
 *
 * request   the request; it is accessed as a mca_pml_base_send_request_t
 *           and must have req_bytes_packed > 0 (asserted)
 *
 * On success req_addr points at the reserved space and the count of
 * outstanding requests has been incremented.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERR_BUFFER when no buffer is attached
 * (req_addr is set to NULL) or the sub-allocator has no room (req_addr
 * is NULL, req_pml_complete is set and opal_progress() is called once).
 */

int mca_pml_base_bsend_request_alloc(ompi_request_t* request)
{
    mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)request;
    void *space;

    assert( sendreq->req_bytes_packed > 0 );

    OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);

    /* without an attached buffer there is nothing to allocate from */
    if(NULL == mca_pml_bsend_addr) {
        sendreq->req_addr = NULL;
        OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
        return OMPI_ERR_BUFFER;
    }

    /* ask the sub-allocator for room to hold the packed message */
    space = mca_pml_bsend_allocator->alc_alloc(mca_pml_bsend_allocator,
                                               sendreq->req_bytes_packed,
                                               0, NULL);
    sendreq->req_addr = space;

    if(NULL != space) {
        /* one more request is now using the buffer */
        mca_pml_bsend_count++;
        OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
        return OMPI_SUCCESS;
    }

    /* release resources when request is freed */
    sendreq->req_base.req_pml_complete = true;
    OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);

    /* drive progress once, hoping that this frees up buffer space */
    opal_progress();
    return OMPI_ERR_BUFFER;
}

/* 
 * Reserve a piece of the attached buffer that is not tied to a request
 * object.
 *
 * length   number of bytes wanted
 *
 * Returns the reserved space (the count of outstanding requests is
 * incremented for it), or NULL when no buffer is attached or the
 * sub-allocator has no room.  In the no-room case opal_progress() is
 * called once before returning.
 */

void*  mca_pml_base_bsend_request_alloc_buf( size_t length )
{
    void* space = NULL;
    bool out_of_room = false;

    OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);

    /* only try when a buffer has been attached */
    if(NULL != mca_pml_bsend_addr) {
        space = mca_pml_bsend_allocator->alc_alloc(mca_pml_bsend_allocator,
                                                   length, 0, NULL);
        if(NULL == space) {
            out_of_room = true;
        } else {
            /* one more piece of the buffer is in use */
            mca_pml_bsend_count++;
        }
    }

    OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);

    if(out_of_room) {
        /* drive progress once, hoping that this frees up buffer space */
        opal_progress();
    }
    return space;
}


/*
 * Give a piece of the attached buffer back to the sub-allocator.
 *
 * addr   the space to release
 *
 * The count of outstanding requests is decremented; when it reaches zero
 * the condition variable that detach waits on is signalled.
 *
 * Always returns OMPI_SUCCESS.
 */
int mca_pml_base_bsend_request_free(void* addr)
{
    OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);

    /* hand the space back */
    mca_pml_bsend_allocator->alc_free(mca_pml_bsend_allocator, addr);

    /* one request less; wake up detach once none are left */
    mca_pml_bsend_count -= 1;
    if(0 == mca_pml_bsend_count) {
        opal_condition_signal(&mca_pml_bsend_condition);
    }

    OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
    return OMPI_SUCCESS;
}
                                                                                                             


/*
 * Request completed - release the space it holds in the attached buffer.
 *
 * request   the request; it is accessed as a mca_pml_base_send_request_t
 *
 * Nothing happens when the request has no packed bytes, when req_addr is
 * NULL, or when req_addr equals req_base.req_addr.  Otherwise the space
 * is freed, req_addr is reset to req_base.req_addr, the count of
 * outstanding requests is decremented, and the condition variable that
 * detach waits on is signalled once the count reaches zero.
 *
 * Always returns OMPI_SUCCESS.
 */
int mca_pml_base_bsend_request_fini(ompi_request_t* request)
{
    mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)request;

    /* requests that hold no space in the buffer need no cleanup */
    if(0 == sendreq->req_bytes_packed) {
        return OMPI_SUCCESS;
    }
    if(NULL == sendreq->req_addr) {
        return OMPI_SUCCESS;
    }
    if(sendreq->req_base.req_addr == sendreq->req_addr) {
        return OMPI_SUCCESS;
    }

    OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);

    /* hand the space back and point the request at req_base.req_addr again */
    mca_pml_bsend_allocator->alc_free(mca_pml_bsend_allocator, sendreq->req_addr);
    sendreq->req_addr = sendreq->req_base.req_addr;

    /* one request less; wake up detach once none are left */
    mca_pml_bsend_count -= 1;
    if(0 == mca_pml_bsend_count) {
        opal_condition_signal(&mca_pml_bsend_condition);
    }

    OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
    return OMPI_SUCCESS;
}