Snippets
Created by
Ramki Gaddipati
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 | package olympus.sparta.allocator.allocation;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import olympus.sparta.allocator.db.AllocationDBConnection;
import olympus.sparta.base.PropertyHandler;
import olympus.sparta.base.db.DBAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class Allocator {
public static final int BUCKET_COUNT = PropertyHandler.getInstance().getIntValue("bucket.count");
private static final Logger log = LoggerFactory.getLogger(Allocator.class);
private static final int DEFAULT_VERSION = 0;
private final Map<String, SettableFuture<Allocation>> futureAllocations =
Collections.synchronizedMap(new HashMap<String, SettableFuture<Allocation>>());
private final Set<String> servicesInterested = Collections.synchronizedSet(new HashSet<String>());
private volatile boolean shutdown = false;
private final AllocationWorker worker;
private final Map<String, Integer> requiredAllocationVersion = new ConcurrentHashMap<>();
public Allocator(DBAdapter<AllocationDBConnection> db) {
worker = new AllocationWorker(db, this);
worker.start();
}
public ListenableFuture<Allocation> getAllocation(String serviceType, int version) {
ListenableFuture<Allocation> future = futureAllocations.get(serviceType);
Allocation allocation = null;
boolean requiresRefresh = true;
try {
allocation = future == null || !future.isDone() ? null : future.get();
//////////////////////////////////////////////////////////////////
//
//
// relevant change in this patch to fix the bug
//
// ensure requiresRefresh is true when there is no known allocation
////////////////////////////////////////////////////////////////////////
if (allocation != null) {
requiresRefresh = version == DEFAULT_VERSION || (allocation.getSystemVersion() < version);
}
} catch (Exception e) {
log.error("Exception : ", e);
}
if (requiresRefresh) {
log.trace("Requires refreshing allocations of {}", serviceType);
return refreshAllocation(serviceType, version);
}
if (allocation != null) {
log.trace("Returning allocation from cache for {} version {}", serviceType, version);
} else {
log.trace("Waiting for allocation");
}
return future;
}
public ListenableFuture<Allocation> refreshAllocation(String serviceType, int requiredSystemVersion) {
if (shutdown) throw new RuntimeException("Shutdown");
ListenableFuture<Allocation> f;
synchronized (futureAllocations) {
SettableFuture<Allocation> allocation = futureAllocations.get(serviceType);
if (allocation == null || allocation.isDone()) {
allocation = SettableFuture.create();
futureAllocations.put(serviceType, allocation);
servicesInterested.add(serviceType);
}
if (!requiredAllocationVersion.containsKey(serviceType) ||
requiredSystemVersion > requiredAllocationVersion.get(serviceType)) {
requiredAllocationVersion.put(serviceType, requiredSystemVersion);
}
f = futureAllocations.get(serviceType);
}
worker.refreshAllocation(serviceType);
return f;
}
Set<String> getServicesInterested() {
return servicesInterested;
}
void updateAllocation(Allocation allocation) {
synchronized (futureAllocations) {
if(!canAcceptAllocation(allocation)) {
refreshAllocation(allocation.getServiceType(), getRequiredVersion(allocation.getServiceType()));
return;
}
SettableFuture<Allocation> future = futureAllocations.get(allocation.getServiceType());
if (!future.set(allocation)) {
future = SettableFuture.create();
future.set(allocation);
futureAllocations.put(allocation.getServiceType(), future);
}
if (allocation instanceof NullAllocation) {
//Don't bother about services which don't have any active instances
//until a client requests for allocation with a later version.
servicesInterested.remove(allocation.getServiceType());
}
}
}
private int getRequiredVersion(String serviceType){
synchronized (futureAllocations) {
Integer val = requiredAllocationVersion.get(serviceType);
if( null == val) {
return 0;
}
return val;
}
}
private boolean canAcceptAllocation(Allocation allocation) {
int requiredVersion = getRequiredVersion(allocation.getServiceType());
if (allocation.getSystemVersion() < requiredVersion) {
log.warn("Clients are waiting for version {} of {}; So ignoring allocation version: {}",
requiredVersion, allocation.getServiceType(), allocation.getSystemVersion());
return false;
}
return true;
}
public void shutdown() {
worker.shutdown();
}
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 | package olympus.sparta.allocation;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import olympus.sparta.PropertyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class Allocator {
private static final Logger log = LoggerFactory.getLogger(Allocator.class);
private static Allocator allocator;
public static final int BUCKET_COUNT = PropertyHandler.getInstance().getIntValue("bucket.count");
private static final int DEFAULT_VERSION = 0;
private final Map<String, SettableFuture<Allocation>> futureAllocations =
Collections.synchronizedMap(new HashMap<String, SettableFuture<Allocation>>());
private final Set<String> servicesInterested = Collections.synchronizedSet(new HashSet<String>());
private volatile boolean shutdown = false;
private final AllocationWorker worker = new AllocationWorker(this);
private final HashMap<String, Integer> requiredAllocationVersion = new HashMap<>();
private Allocator() {
worker.start();
}
public synchronized static void init() {
if (allocator == null) {
allocator = new Allocator();
}
}
public static Allocator getInstance() {
return allocator;
}
public ListenableFuture<Allocation> getAllocation(String serviceType, int version) {
ListenableFuture<Allocation> future = futureAllocations.get(serviceType);
Allocation allocation = null;
boolean requiresRefresh = false;
try {
allocation = future == null || !future.isDone() ? null : future.get();
//
// The following line was the problem;
//
// Scenario:
//
// When an allocation request for a numbered version say 43 is received,
// And if the Allocator had no previously known allocation for the service type
// And if there is a request for allocation with DEFAULT_VERSION (zero)
// is pending to be fetched by the AllocationWorker
// then the new request DOES NOT trigger refreshAllocation
//
// The future that was created to hold the previous request's response is returned.
// Whreas, in the above scenario refreshAllocation should have been triggered.
//
// If the worker fetched an allocation less than the version 43 and
// resolved the future with that version, the client would have received a stale version;
//
// This never happened to us as often because the server would have known about the
// version before the client was notified about the version and after that the client
// could request for the allocation of that version.
//
// Due to some sub-optimal logging statements introduced in recent versions, the
// AllocationWorker was taking several milliseconds to fetch and notify about an allocation.
// This surfaced the issue in this condition check.
//
requiresRefresh = future == null || version == DEFAULT_VERSION ||
(allocation != null && allocation.getSystemVersion() < version);
} catch (Exception e) {
log.error("Exception : ", e);
}
if (requiresRefresh) {
log.trace("Requires refreshing allocations of {}", serviceType);
return refreshAllocation(serviceType, version);
}
if (allocation != null) {
log.trace("Returning allocation from cache for {} version {}", serviceType, version);
} else {
log.trace("Waiting for allocation");
}
return future;
}
public ListenableFuture<Allocation> refreshAllocation(String serviceType, int requiredSystemVersion) {
if (shutdown) throw new RuntimeException("Shutdown");
synchronized (futureAllocations) {
SettableFuture<Allocation> allocation = futureAllocations.get(serviceType);
if (allocation == null || allocation.isDone()) {
allocation = SettableFuture.create();
futureAllocations.put(serviceType, allocation);
servicesInterested.add(serviceType);
}
if (requiredAllocationVersion.containsKey(serviceType) &&
requiredSystemVersion > requiredAllocationVersion.get(serviceType)) {
requiredAllocationVersion.put(serviceType, requiredSystemVersion);
}
}
worker.refreshAllocation(serviceType);
return futureAllocations.get(serviceType);
}
Set<String> getServicesInterested() {
return servicesInterested;
}
void updateAllocation(Allocation allocation) {
synchronized (futureAllocations) {
if(!canAcceptAllocation(allocation)) {
refreshAllocation(allocation.getServiceType(), getRequiredVersion(allocation.getServiceType()));
return;
}
SettableFuture<Allocation> future = futureAllocations.get(allocation.getServiceType());
if (!future.set(allocation)) {
future = SettableFuture.create();
future.set(allocation);
futureAllocations.put(allocation.getServiceType(), future);
}
if (allocation instanceof NullAllocation) {
//Don't bother about services which don't have any active instances
//until a client requests for allocation with a later version.
servicesInterested.remove(allocation.getServiceType());
}
}
}
int getRequiredVersion(String serviceType){
Integer val = requiredAllocationVersion.get(serviceType);
if( null == val) {
return 0;
}
return val;
}
boolean canAcceptAllocation(Allocation allocation) {
int requiredVersion = getRequiredVersion(allocation.getServiceType());
if (allocation.getSystemVersion() < requiredVersion) {
log.warn("Clients are waiting for version {} of {}; So ignoring allocation version: {}",
requiredVersion, allocation.getServiceType(), allocation.getSystemVersion());
return false;
}
return true;
}
public void shutdown() {
worker.shutdown();
}
}
|
Comments (0)
You can clone a snippet to your computer for local editing. Learn more.