Commits

Hiram R. Chirino  committed b980677

Improve the replicated leveldb behavior when the number of nodes in the cluster falls below the required minimum. The master node will switch to electing mode. The master store startup will now also block until it syncs up with slaves so that we don't accept connections until the master is fully online.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1498601 13f79535-47bb-0310-9956-ffa450edef68

  • Participants
  • Parent commits dd69af4

Comments (0)

Files changed (11)

File activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala

   private val latch:CountDownLatch=new CountDownLatch(1)
   @volatile
   var value:T = _
+  var error:Throwable = _
 
   def cancel(mayInterruptIfRunning: Boolean) = false
   def isCancelled = false
     value = v
     latch.countDown()
   }
+  def failed(v:Throwable) = {
+    error = v
+    latch.countDown()
+  }
 
   def get() = {
     latch.await()
+    if( error!=null ) {
+      throw error;
+    }
     value
   }
 
   def get(p1: Long, p2: TimeUnit) = {
     if(latch.await(p1, p2)) {
+      if( error!=null ) {
+        throw error;
+      }
       value
     } else {
       throw new TimeoutException
     manager.uowCanceledCounter += 1
     canceled = true
     manager.flush_queue.remove(uowId)
-    onCompleted
+    onCompleted()
   }
 
   def getAction(id:MessageId) = {
     }
   }
 
-  def onCompleted() = this.synchronized {
+  def onCompleted(error:Throwable=null) = this.synchronized {
     if ( state.stage < UowCompleted.stage ) {
       state = UowCompleted
       if( asyncCapacityUsed != 0 ) {
         manager.uow_complete_latency.add(System.nanoTime() - disposed_at)
         complete_listeners.foreach(_())
       }
-      countDownFuture.set(null)
+      if( error == null ) {
+        countDownFuture.set(null)
+      } else {
+        countDownFuture.failed(error)
+      }
 
       for( (id, action) <- actions ) {
         if( !action.enqueues.isEmpty ) {
       uowStoringCounter += uows.size
       flushSource.suspend
       writeExecutor {
-        client.store(uows)
+        val e = try {
+          client.store(uows)
+          null
+        } catch {
+          case e:Throwable => e
+        }
         flushSource.resume
         dispatchQueue {
           uowStoredCounter += uows.size
           uows.foreach { uow=>
-            uow.onCompleted
+            uow.onCompleted(e)
           }
         }
       }

File activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala

 import java.{util=>ju}
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.concurrent.atomic.AtomicBoolean
 import collection.immutable.TreeMap
 import collection.mutable.{HashMap, ListBuffer}
 import org.iq80.leveldb._
 import org.apache.activemq.leveldb.util._
 import java.util.concurrent._
 import org.fusesource.hawtbuf._
-import java.io.{ObjectInputStream, ObjectOutputStream, File}
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream, File}
 import scala.Option._
 import org.apache.activemq.command.{MessageAck, Message}
-import org.apache.activemq.util.ByteSequence
+import org.apache.activemq.util.{IOExceptionSupport, ByteSequence}
 import java.text.SimpleDateFormat
 import java.util.{Date, Collections}
 import org.apache.activemq.leveldb.util.TimeMetric
     }
   }
 
-  def retry[T](func : =>T):T = RetrySupport.retry(LevelDBClient, store.isStarted, func _)
+  def might_fail[T](func : =>T):T = {
+    def handleFailure(e:IOException) = {
+      store.stop()
+      if( store.broker_service !=null ) {
+        store.broker_service.handleIOException(e);
+      }
+      throw e;
+    }
+    try {
+      func
+    } catch {
+      case e:IOException => handleFailure(e)
+      case e:Throwable => handleFailure(IOExceptionSupport.create(e))
+    }
+  }
 
   def start() = {
     init()
     replay_init()
-    retry {
+    might_fail {
       log.open()
     }
     replay_from(lastIndexSnapshotPos, log.appender_limit)
     snapshots.filterNot(_._1 == lastIndexSnapshotPos).foreach( _._2.recursiveDelete )
     tempIndexFile.recursiveDelete
 
-    retry {
+    might_fail {
       // Setup the plist index.
       plistIndexFile.recursiveDelete
       plistIndexFile.mkdirs()
   }
 
   def replay_from(from:Long, limit:Long) = {
-    retry {
+    might_fail {
       try {
         // Update the index /w what was stored on the logs..
         var pos = from;
     }
   }
 
-  def retryUsingIndex[T](func: =>T):T = retry(usingIndex( func ))
+  def might_fail_using_index[T](func: =>T):T = might_fail(usingIndex( func ))
 
   /**
    * TODO: expose this via management APIs, handy if you want to
       log.close
       locked_purge
     } finally {
-      retry {
+      might_fail {
         log.open()
       }
       resume()
   def addCollection(record: CollectionRecord.Buffer) = {
     val key = encodeLongKey(COLLECTION_PREFIX, record.getKey)
     val value = record.toUnframedBuffer
-    retryUsingIndex {
+    might_fail_using_index {
       log.appender { appender =>
         appender.append(LOG_ADD_COLLECTION, value)
         index.put(key, value.toByteArray)
 
   def listCollections: Seq[(Long, CollectionRecord.Buffer)] = {
     val rc = ListBuffer[(Long, CollectionRecord.Buffer)]()
-    retryUsingIndex {
+    might_fail_using_index {
       val ro = new ReadOptions
       ro.verifyChecksums(verifyChecksums)
       ro.fillCache(false)
     val value = encodeVLong(collectionKey)
     val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
     collectionMeta.remove(collectionKey)
-    retryUsingIndex {
+    might_fail_using_index {
       log.appender { appender =>
         appender.append(LOG_REMOVE_COLLECTION, new Buffer(value))
       }
     meta.size = 0
     meta.last_key = null
     
-    retryUsingIndex {
+    might_fail_using_index {
       index.get(key).foreach { collectionData =>
         log.appender { appender =>
           appender.append(LOG_REMOVE_COLLECTION, new Buffer(value))
   }
 
   def getAckPosition(subKey: Long): Long = {
-    retryUsingIndex {
+    might_fail_using_index {
       index.get(encodeEntryKey(ENTRY_PREFIX, subKey, ACK_POSITION)).map{ value=>
         val record = decodeEntryRecord(value)
         record.getValueLocation()
     ro.verifyChecksums(verifyChecksums)
     val start = encodeEntryKey(ENTRY_PREFIX, collectionKey, cursorPosition)
     val end = encodeLongKey(ENTRY_PREFIX, collectionKey+1)
-    retryUsingIndex {
+    might_fail_using_index {
       index.cursorRange(start, end, ro) { case (key, value) =>
         func(key.buffer.moveHead(9), EntryRecord.FACTORY.parseUnframed(value))
       }
   def collectionIsEmpty(collectionKey: Long) = {
     val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
     var empty = true
-    retryUsingIndex {
+    might_fail_using_index {
       val ro = new ReadOptions
       ro.fillCache(false)
       ro.verifyChecksums(verifyChecksums)
   val max_index_write_latency = TimeMetric()
 
   def store(uows: Array[DelayableUOW]) {
-    retryUsingIndex {
+    might_fail_using_index {
       log.appender { appender =>
         val syncNeeded = index.write(new WriteOptions, max_index_write_latency) { batch =>
           write_uows(uows, appender, batch)
     val ro = new ReadOptions
     ro.verifyChecksums(verifyChecksums)
     ro.fillCache(true)
-    retryUsingIndex {
+    might_fail_using_index {
       index.snapshot { snapshot =>
         ro.snapshot(snapshot)
         val start = encodeEntryKey(ENTRY_PREFIX, collectionKey, firstSeq)
 
     // Delete message refs for topics who's consumers have advanced..
     if( !topicPositions.isEmpty ) {
-      retryUsingIndex {
+      might_fail_using_index {
         index.write(new WriteOptions, max_index_write_latency) { batch =>
           for( (topic, first) <- topicPositions ) {
             val ro = new ReadOptions
   def removePlist(collectionKey: Long) = {
     val entryKeyPrefix = encodeLong(collectionKey)
     collectionMeta.remove(collectionKey)
-    retry {
+    might_fail {
       val ro = new ReadOptions
       ro.fillCache(false)
       ro.verifyChecksums(false)

File activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala

 import collection.mutable.ListBuffer
 import org.apache.activemq.broker.jmx.{BrokerMBeanSupport, AnnotatedMBean}
 import org.apache.activemq.util._
-import org.apache.activemq.leveldb.util.{RetrySupport, Log}
+import org.apache.activemq.leveldb.util.Log
 import org.apache.activemq.store.PList.PListIterator
 import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream}
 import org.fusesource.hawtdispatch;
     BrokerMBeanSupport.createPersistenceAdapterName(brokerON.toString, this.toString)
   }
 
-  def retry[T](func : =>T):T = RetrySupport.retry(LevelDBStore, isStarted, func _)
-
   var snappyCompressLogs = false
 
   def doStart: Unit = {

File activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala

     override def open = new RandomAccessFile(file, "rw")
 
     override def dispose() = {
-      force
+      flush
       super.dispose()
     }
 

File activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala

   def start_master(func: (Int) => Unit) = {
     assert(master==null)
     master = create_master()
+    master_started.set(true)
     master.blocking_executor.execute(^{
-      master_started.set(true)
       master.start();
       master_started_latch.countDown()
+    })
+    master.blocking_executor.execute(^{
       func(master.getPort)
     })
   }

File activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala

     def disconnected = changed
 
     def changed:Unit = elector.synchronized {
-      // info(eid+" cluster state changed: "+members)
+//      info(eid+" cluster state changed: "+members)
       if (isMaster) {
         // We are the master elector, we will choose which node will startup the MasterLevelDBStore
         members.get(store.brokerName) match {
           case Some(members) =>
 
             if (members.size < store.clusterSizeQuorum) {
-              info("Not enough cluster members connected to elect a new master.")
+              info("Not enough cluster members connected to elect a master.")
+              elected = null
             } else {
 
               // If we already elected a master, lets make sure he is still online..

File activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala

     unstash(directory)
     super.doStart
     start_protocol_server
+    // Lets not complete the startup until at least one slave is synced up.
+    wal_sync_to(wal_append_position)
   }
 
   override def doStop(stopper: ServiceStopper): Unit = {
   // Replication Protocol Stuff
   //////////////////////////////////////
   var transport_server:TransportServer = _
+  val start_latch = new CountDownLatch(1)
 
   def start_protocol_server = {
     transport_server = new TcpTransportServer(new URI(bind))
         warn(error)
       }
     })
-    val start_latch = new CountDownLatch(1)
     transport_server.start(^{
       start_latch.countDown()
     })
     start_latch.await()
   }
 
-  def getPort = transport_server.getSocketAddress.asInstanceOf[InetSocketAddress].getPort
+  def getPort = {
+    start_latch.await()
+    transport_server.getSocketAddress.asInstanceOf[InetSocketAddress].getPort
+  }
 
   def stop_protocol_server = {
     transport_server.stop(NOOP)
     if( minSlaveAcks<1 || (syncToMask & SYNC_TO_REMOTE)==0) {
       return
     }
+
+    if( isStopped ) {
+      throw new IllegalStateException("Store replication stopped")
+    }
+
     val position_sync = new PositionSync(position, minSlaveAcks)
     this.position_sync = position_sync
     for( slave <- slaves.values() ) {
     }
 
     while( !position_sync.await(1, TimeUnit.SECONDS) ) {
+      if( isStopped ) {
+        throw new IllegalStateException("Store replication stopped")
+      }
       val status = slaves.values().map(_.status).mkString(", ")
       warn("Store update waiting on %d replica(s) to catch up to log position %d. Connected slaves: [%s]", minSlaveAcks, position, status)
     }

File activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala

 
       session.request_then(DISCONNECT_ACTION, null) { body =>
         // Ok we are now caught up.
-        status = "Synchronize"
+        status = "Synchronized"
         info(status)
         stash_clear(directory) // we don't need the stash anymore.
         transport.stop(NOOP)

File activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala

-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.leveldb.util
-
-/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object RetrySupport {
-
-  def retry[T](log:Log, isStarted: ()=>Boolean, func: ()=>T): T = {
-    import log._
-    var error:Throwable = null
-    var rc:Option[T] = None
-
-    // We will loop until the tx succeeds.  Perhaps it's
-    // failing due to a temporary condition like low disk space.
-    while(!rc.isDefined) {
-
-      try {
-        rc = Some(func())
-      } catch {
-        case e:Throwable =>
-          if( error==null ) {
-            warn(e, "DB operation failed. (entering recovery mode)")
-          }
-          error = e
-      }
-
-      if (!rc.isDefined) {
-        // We may need to give up if the store is being stopped.
-        if ( !isStarted() ) {
-          throw error
-        }
-        Thread.sleep(1000)
-      }
-    }
-
-    if( error!=null ) {
-      info("DB recovered from failure.")
-    }
-    rc.get
-  }
-
-}

File activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java

  */
 package org.apache.activemq.leveldb.test;
 
+import org.apache.activemq.Service;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.leveldb.CountDownFuture;
 import org.apache.activemq.leveldb.LevelDBStore;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.activemq.leveldb.test.ReplicationTestSupport.addMessage;
         FileSupport.toRichFile(masterDir).recursiveDelete();
         FileSupport.toRichFile(slaveDir).recursiveDelete();
 
-        MasterLevelDBStore master = createMaster(masterDir);
+        final MasterLevelDBStore master = createMaster(masterDir);
         master.setReplicas(2);
-        master.start();
+        CountDownFuture masterStartLatch = asyncStart(master);
 
-        MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
-
-        // Updating the store should not complete since we don't have enough
+        // Start the store should not complete since we don't have enough
         // replicas.
-        CountDownFuture f = asyncAddMessage(ms, "m1");
-        assertFalse(f.await(2, TimeUnit.SECONDS));
+        assertFalse(masterStartLatch.await(2, TimeUnit.SECONDS));
 
-        // Adding a slave should allow that update to complete.
+        // Adding a slave should allow the master startup to complete.
         SlaveLevelDBStore slave = createSlave(master, slaveDir);
         slave.start();
 
-        assertTrue(f.await(2, TimeUnit.SECONDS));
+        assertTrue(masterStartLatch.await(2, TimeUnit.SECONDS));
 
         // New updates should complete quickly now..
-        f = asyncAddMessage(ms, "m2");
+        MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
+        CountDownFuture f = asyncAddMessage(ms, "m1");
         assertTrue(f.await(1, TimeUnit.SECONDS));
 
         // If the slave goes offline, then updates should once again
         // not complete.
         slave.stop();
 
-        f = asyncAddMessage(ms, "m3");
+        f = asyncAddMessage(ms, "m2");
         assertFalse(f.await(2, TimeUnit.SECONDS));
 
         // Restart and the op should complete.
         return f;
     }
 
+    private CountDownFuture asyncStart(final Service service) {
+        final CountDownFuture<Throwable> f = new CountDownFuture<Throwable>();
+        LevelDBStore.BLOCKING_EXECUTOR().execute(new Runnable() {
+            public void run() {
+                try {
+                    service.start();
+                    f.set(null);
+                } catch (Throwable e) {
+                    f.set(e);
+                }
+            }
+        });
+        return f;
+    }
 
     @Test(timeout = 1000*60*60)
     public void testReplication() throws Exception {
         for (int j = 0; j < 10; j++) {
 
             MasterLevelDBStore master = createMaster(directories.get(0));
-            master.start();
+            CountDownFuture masterStart = asyncStart(master);
             SlaveLevelDBStore slave1 = createSlave(master, directories.get(1));
             SlaveLevelDBStore slave2 = createSlave(master, directories.get(2));
-            slave2.start();
+            asyncStart(slave2);
+            masterStart.await();
 
             LOG.info("Adding messages...");
             MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));

File activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/dfs/DFSLevelDBClient.scala

   def remoteIndexPath = new Path(dfsDirectory, "index")
 
   override def start() = {
-    retry {
+    might_fail {
       directory.mkdirs()
       dfs.mkdirs(dfsDirectory)
       downloadLogFiles