Snippets

Zeta-Engg INC-3418-SEV-0: Fix in Allocator.java

Created by Ramki Gaddipati
package olympus.sparta.allocator.allocation;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import olympus.sparta.allocator.db.AllocationDBConnection;
import olympus.sparta.base.PropertyHandler;
import olympus.sparta.base.db.DBAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Keeps one {@link SettableFuture} of {@link Allocation} per service type and hands it to callers,
 * delegating the actual fetching to an {@link AllocationWorker}.
 *
 * <p>State that must change together ({@code futureAllocations}, {@code servicesInterested} and
 * {@code requiredAllocationVersion}) is mutated while holding the monitor of
 * {@code futureAllocations}.
 *
 * <p>NOTE(review): the worker is presumably the only caller of {@link #updateAllocation(Allocation)}
 * and {@link #getServicesInterested()}; confirm against {@code AllocationWorker}.
 */
public class Allocator {
    /** Value of the {@code bucket.count} property, read once when this class is loaded. */
    public static final int BUCKET_COUNT = PropertyHandler.getInstance().getIntValue("bucket.count");

    private static final Logger log = LoggerFactory.getLogger(Allocator.class);

    /** Version number meaning "no particular version"; such requests always trigger a refresh. */
    private static final int DEFAULT_VERSION = 0;

    /** Latest (pending or completed) allocation future per service type. */
    private final Map<String, SettableFuture<Allocation>> futureAllocations =
            Collections.synchronizedMap(new HashMap<String, SettableFuture<Allocation>>());

    /** Service types for which a refresh has been requested and not yet answered with a NullAllocation. */
    private final Set<String> servicesInterested = Collections.synchronizedSet(new HashSet<String>());

    /** Set by {@link #shutdown()}; once true, public refresh requests are refused. */
    private volatile boolean shutdown = false;

    private final AllocationWorker worker;

    /** Highest system version any caller has asked for, per service type. */
    private final Map<String, Integer> requiredAllocationVersion = new ConcurrentHashMap<>();

    /**
     * Creates the allocator and immediately starts its worker.
     *
     * @param db adapter handed to the {@link AllocationWorker}
     */
    public Allocator(DBAdapter<AllocationDBConnection> db) {
        worker = new AllocationWorker(db, this);
        worker.start();
    }

    /**
     * Returns the allocation of {@code serviceType}, from cache when the cached one is new enough.
     *
     * <p>The cached future is returned only when it is already completed with an allocation whose
     * system version is at least {@code version} and {@code version} is not {@code DEFAULT_VERSION}.
     * In every other case (nothing cached, fetch still pending, cached value too old, lookup failed)
     * the request goes through {@link #refreshAllocation(String, int)} so that the requested version
     * is recorded before a future is handed out.
     *
     * @param serviceType service whose allocation is wanted
     * @param version     minimum system version the caller needs, or 0 for "whatever is latest"
     * @return a future that resolves to the allocation
     * @throws RuntimeException if a refresh is needed after {@link #shutdown()} was called
     */
    public ListenableFuture<Allocation> getAllocation(String serviceType, int version) {
        ListenableFuture<Allocation> future = futureAllocations.get(serviceType);
        Allocation allocation = null;
        // Default to refreshing: only a known, sufficiently new allocation may skip the refresh.
        boolean requiresRefresh = true;
        try {
            allocation = future == null || !future.isDone() ? null : future.get();
            if (allocation != null) {
                requiresRefresh = version == DEFAULT_VERSION || (allocation.getSystemVersion() < version);
            }
        } catch (InterruptedException e) {
            // Do not lose the caller's interrupt; fall through and refresh as before.
            Thread.currentThread().interrupt();
            log.error("Exception : ", e);
        } catch (Exception e) {
            log.error("Exception : ", e);
        }
        if (requiresRefresh) {
            log.trace("Requires refreshing allocations of {}", serviceType);
            return refreshAllocation(serviceType, version);
        }
        // requiresRefresh can only be false when a completed allocation was found above.
        log.trace("Returning allocation from cache for {} version {}", serviceType, version);
        return future;
    }

    /**
     * Registers interest in {@code serviceType} at (at least) {@code requiredSystemVersion} and asks
     * the worker to fetch it.
     *
     * @param serviceType           service whose allocation should be refreshed
     * @param requiredSystemVersion minimum system version the caller is waiting for
     * @return the pending future that the next accepted allocation will complete
     * @throws RuntimeException if {@link #shutdown()} has been called
     */
    public ListenableFuture<Allocation> refreshAllocation(String serviceType, int requiredSystemVersion) {
        if (shutdown) {
            throw new RuntimeException("Shutdown");
        }
        return enqueueRefresh(serviceType, requiredSystemVersion);
    }

    /**
     * Does the work of {@link #refreshAllocation(String, int)} without the shutdown check, so the
     * worker-driven retry in {@link #updateAllocation(Allocation)} can never throw.
     */
    private ListenableFuture<Allocation> enqueueRefresh(String serviceType, int requiredSystemVersion) {
        SettableFuture<Allocation> pending;
        synchronized (futureAllocations) {
            pending = futureAllocations.get(serviceType);
            if (pending == null || pending.isDone()) {
                // Nothing in flight: start a fresh future and mark the service as wanted.
                pending = SettableFuture.create();
                futureAllocations.put(serviceType, pending);
                servicesInterested.add(serviceType);
            }
            // Only ever raise the required version, never lower it.
            Integer current = requiredAllocationVersion.get(serviceType);
            if (current == null || requiredSystemVersion > current) {
                requiredAllocationVersion.put(serviceType, requiredSystemVersion);
            }
        }
        worker.refreshAllocation(serviceType);
        return pending;
    }

    /**
     * Returns the live, synchronized set of service types with outstanding interest.
     * Callers that iterate it must synchronize on the returned set themselves.
     */
    Set<String> getServicesInterested() {
        return servicesInterested;
    }

    /**
     * Publishes a freshly fetched allocation to waiting callers.
     *
     * <p>An allocation older than the version callers are waiting for is dropped and a new fetch is
     * requested instead (unless the allocator is shut down).
     *
     * @param allocation allocation fetched for {@code allocation.getServiceType()}
     */
    void updateAllocation(Allocation allocation) {
        String serviceType = allocation.getServiceType();
        synchronized (futureAllocations) {
            if (!canAcceptAllocation(allocation)) {
                if (!shutdown) {
                    enqueueRefresh(serviceType, getRequiredVersion(serviceType));
                }
                return;
            }
            SettableFuture<Allocation> future = futureAllocations.get(serviceType);
            // No future registered yet, or the registered one is already completed:
            // publish the value through a new, pre-completed future.
            if (future == null || !future.set(allocation)) {
                future = SettableFuture.create();
                future.set(allocation);
                futureAllocations.put(serviceType, future);
            }
            if (allocation instanceof NullAllocation) {
                //Don't bother about services which don't have any active instances
                //until a client requests for allocation with a later version.
                servicesInterested.remove(serviceType);
            }
        }
    }

    /** Returns the highest version requested so far for {@code serviceType}, or 0 if none. */
    private int getRequiredVersion(String serviceType) {
        synchronized (futureAllocations) {
            Integer val = requiredAllocationVersion.get(serviceType);
            return val == null ? 0 : val;
        }
    }

    /** Returns false (and logs a warning) when {@code allocation} is older than what callers wait for. */
    private boolean canAcceptAllocation(Allocation allocation) {
        int requiredVersion = getRequiredVersion(allocation.getServiceType());
        if (allocation.getSystemVersion() < requiredVersion) {
            log.warn("Clients are waiting for version {} of {}; So ignoring allocation version: {}",
              requiredVersion, allocation.getServiceType(), allocation.getSystemVersion());
            return false;
        }
        return true;
    }

    /**
     * Marks the allocator as shut down and stops the worker. Subsequent calls to
     * {@link #refreshAllocation(String, int)} throw instead of returning a future nobody will complete.
     */
    public void shutdown() {
        shutdown = true;
        worker.shutdown();
    }
}
package olympus.sparta.allocation;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import olympus.sparta.PropertyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
 * Process-wide singleton that keeps one {@link SettableFuture} of {@link Allocation} per service
 * type and hands it to callers, delegating the actual fetching to an {@link AllocationWorker}.
 *
 * <p>{@code futureAllocations}, {@code servicesInterested} and {@code requiredAllocationVersion}
 * are mutated and read together while holding the monitor of {@code futureAllocations}.
 *
 * <p>NOTE(review): the worker is presumably the only caller of {@link #updateAllocation(Allocation)}
 * and {@link #getServicesInterested()}; confirm against {@code AllocationWorker}.
 */
public class Allocator {
    private static final Logger log = LoggerFactory.getLogger(Allocator.class);

    /** Singleton instance; volatile so the unsynchronized {@link #getInstance()} sees a published value. */
    private static volatile Allocator allocator;

    /** Value of the {@code bucket.count} property, read once when this class is loaded. */
    public static final int BUCKET_COUNT = PropertyHandler.getInstance().getIntValue("bucket.count");

    /** Version number meaning "no particular version"; such requests always trigger a refresh. */
    private static final int DEFAULT_VERSION = 0;

    /** Latest (pending or completed) allocation future per service type. */
    private final Map<String, SettableFuture<Allocation>> futureAllocations =
            Collections.synchronizedMap(new HashMap<String, SettableFuture<Allocation>>());

    /** Service types for which a refresh has been requested and not yet answered with a NullAllocation. */
    private final Set<String> servicesInterested = Collections.synchronizedSet(new HashSet<String>());

    /** Set by {@link #shutdown()}; once true, public refresh requests are refused. */
    private volatile boolean shutdown = false;

    private final AllocationWorker worker = new AllocationWorker(this);

    /** Highest system version any caller has asked for, per service type. Guarded by {@code futureAllocations}. */
    private final Map<String, Integer> requiredAllocationVersion = new HashMap<>();


    private Allocator() {
        worker.start();
    }

    /** Creates the singleton (and starts its worker) on first call; later calls are no-ops. */
    public synchronized static void init() {
        if (allocator == null) {
            allocator = new Allocator();
        }
    }

    /** Returns the singleton, or {@code null} if {@link #init()} has not been called yet. */
    public static Allocator getInstance() {
        return allocator;
    }

    /**
     * Returns the allocation of {@code serviceType}, from cache when the cached one is new enough.
     *
     * <p>The cached future is returned only when it is already completed with an allocation whose
     * system version is at least {@code version} and {@code version} is not {@code DEFAULT_VERSION}.
     * In every other case the request goes through {@link #refreshAllocation(String, int)}.
     *
     * @param serviceType service whose allocation is wanted
     * @param version     minimum system version the caller needs, or 0 for "whatever is latest"
     * @return a future that resolves to the allocation
     * @throws RuntimeException if a refresh is needed after {@link #shutdown()} was called
     */
    public ListenableFuture<Allocation> getAllocation(String serviceType, int version) {
        ListenableFuture<Allocation> future = futureAllocations.get(serviceType);
        Allocation allocation = null;
        // Default to refreshing: only a known, sufficiently new allocation may skip the refresh.
        boolean requiresRefresh = true;
        try {
            allocation = future == null || !future.isDone() ? null : future.get();

            //
            // The condition that used to be computed here was the problem:
            //
            //   requiresRefresh = future == null || version == DEFAULT_VERSION ||
            //           (allocation != null && allocation.getSystemVersion() < version);
            //
            // Scenario:
            //
            // When an allocation request for a numbered version say 43 is received,
            // And if the Allocator had no previously known allocation for the service type
            // And if there is a request for allocation with DEFAULT_VERSION (zero)
            //      is pending to be fetched by the AllocationWorker
            // then the new request DID NOT trigger refreshAllocation
            //
            // The future that was created to hold the previous request's response was returned,
            // whereas in the above scenario refreshAllocation should have been triggered.
            //
            // If the worker fetched an allocation less than the version 43 and
            // resolved the future with that version, the client would have received a stale version.
            //
            // This rarely happened because the server would usually have known about the
            // version before the client was notified about it and could request the allocation
            // of that version.
            //
            // Due to some sub-optimal logging statements introduced in recent versions, the
            // AllocationWorker was taking several milliseconds to fetch and notify about an allocation.
            // This surfaced the issue in the condition check.
            //
            // Fix: a refresh is skipped only when a completed allocation is known AND new enough.
            //
            if (allocation != null) {
                requiresRefresh = version == DEFAULT_VERSION || (allocation.getSystemVersion() < version);
            }
        } catch (InterruptedException e) {
            // Do not lose the caller's interrupt; fall through and refresh.
            Thread.currentThread().interrupt();
            log.error("Exception : ", e);
        } catch (Exception e) {
            log.error("Exception : ", e);
        }
        if (requiresRefresh) {
            log.trace("Requires refreshing allocations of {}", serviceType);
            return refreshAllocation(serviceType, version);
        }
        // requiresRefresh can only be false when a completed allocation was found above.
        log.trace("Returning allocation from cache for {} version {}", serviceType, version);
        return future;
    }

    /**
     * Registers interest in {@code serviceType} at (at least) {@code requiredSystemVersion} and asks
     * the worker to fetch it.
     *
     * @param serviceType           service whose allocation should be refreshed
     * @param requiredSystemVersion minimum system version the caller is waiting for
     * @return the pending future that the next accepted allocation will complete
     * @throws RuntimeException if {@link #shutdown()} has been called
     */
    public ListenableFuture<Allocation> refreshAllocation(String serviceType, int requiredSystemVersion) {
        if (shutdown) {
            throw new RuntimeException("Shutdown");
        }
        return enqueueRefresh(serviceType, requiredSystemVersion);
    }

    /**
     * Does the work of {@link #refreshAllocation(String, int)} without the shutdown check, so the
     * worker-driven retry in {@link #updateAllocation(Allocation)} can never throw.
     */
    private ListenableFuture<Allocation> enqueueRefresh(String serviceType, int requiredSystemVersion) {
        SettableFuture<Allocation> pending;
        synchronized (futureAllocations) {
            pending = futureAllocations.get(serviceType);
            if (pending == null || pending.isDone()) {
                // Nothing in flight: start a fresh future and mark the service as wanted.
                pending = SettableFuture.create();
                futureAllocations.put(serviceType, pending);
                servicesInterested.add(serviceType);
            }
            // Record the first request and afterwards only ever raise the required version.
            // (The earlier "containsKey && ..." test never stored the first entry, which left
            // this map permanently empty and made canAcceptAllocation accept everything.)
            Integer current = requiredAllocationVersion.get(serviceType);
            if (current == null || requiredSystemVersion > current) {
                requiredAllocationVersion.put(serviceType, requiredSystemVersion);
            }
        }
        worker.refreshAllocation(serviceType);
        // Return the future captured under the lock, not a second unsynchronized lookup.
        return pending;
    }

    /**
     * Returns the live, synchronized set of service types with outstanding interest.
     * Callers that iterate it must synchronize on the returned set themselves.
     */
    Set<String> getServicesInterested() {
        return servicesInterested;
    }

    /**
     * Publishes a freshly fetched allocation to waiting callers.
     *
     * <p>An allocation older than the version callers are waiting for is dropped and a new fetch is
     * requested instead (unless the allocator is shut down).
     *
     * @param allocation allocation fetched for {@code allocation.getServiceType()}
     */
    void updateAllocation(Allocation allocation) {
        String serviceType = allocation.getServiceType();
        synchronized (futureAllocations) {
            if (!canAcceptAllocation(allocation)) {
                if (!shutdown) {
                    enqueueRefresh(serviceType, getRequiredVersion(serviceType));
                }
                return;
            }
            SettableFuture<Allocation> future = futureAllocations.get(serviceType);
            // No future registered yet, or the registered one is already completed:
            // publish the value through a new, pre-completed future.
            if (future == null || !future.set(allocation)) {
                future = SettableFuture.create();
                future.set(allocation);
                futureAllocations.put(serviceType, future);
            }
            if (allocation instanceof NullAllocation) {
                //Don't bother about services which don't have any active instances
                //until a client requests for allocation with a later version.
                servicesInterested.remove(serviceType);
            }
        }
    }

    /** Returns the highest version requested so far for {@code serviceType}, or 0 if none. */
    int getRequiredVersion(String serviceType) {
        // requiredAllocationVersion is a plain HashMap; read it under the same lock that guards writes.
        synchronized (futureAllocations) {
            Integer val = requiredAllocationVersion.get(serviceType);
            return val == null ? 0 : val;
        }
    }

    /** Returns false (and logs a warning) when {@code allocation} is older than what callers wait for. */
    boolean canAcceptAllocation(Allocation allocation) {
        int requiredVersion = getRequiredVersion(allocation.getServiceType());
        if (allocation.getSystemVersion() < requiredVersion) {
            log.warn("Clients are waiting for version {} of {}; So ignoring allocation version: {}",
              requiredVersion, allocation.getServiceType(), allocation.getSystemVersion());
            return false;
        }
        return true;
    }

    /**
     * Marks the allocator as shut down and stops the worker. Subsequent calls to
     * {@link #refreshAllocation(String, int)} throw instead of returning a future nobody will complete.
     */
    public void shutdown() {
        shutdown = true;
        worker.shutdown();
    }
}

Comments (0)

HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.