eengbrec / Scala Enhancements
In-progress enhancements to the Scala standard library, mostly centered around actors.
Clone this repository (size: 130.0 MB): HTTPS / SSH
$ hg clone http://bitbucket.org/eengbrec/scala-enhancements/
| commit 82: | 14e33a6ea1f8 |
| parent 70: | 2af7041fc9f9 |
| branch: | message_queue_patch |
message queue patch
9 months ago
| r82:14e33a6ea1f8 | 175 loc | 4.5 KB | embed / history / annotate / raw / |
|---|
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2009, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */
// $Id: FJTaskScheduler2.scala 17541 2009-04-21 09:55:52Z phaller $
package scala.actors
import compat.Platform
import java.lang.{Runnable, Thread, InterruptedException, System, Runtime}
import java.lang.Thread.State
import scala.collection.Set
import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet}
/**
 * FJTaskScheduler2
 *
 * A scheduler that is itself a thread: submitted tasks are forwarded to an
 * internal <code>FJTaskRunnerGroup</code>, while the thread's own
 * <code>run()</code> loop wakes up periodically to run the actor GC,
 * consider growing the worker pool, and stop once all actors have
 * terminated and no worker is active.
 *
 * @param daemon whether this scheduler thread is marked as a daemon thread
 *
 * @version 0.9.18
 * @author Philipp Haller
 */
class FJTaskScheduler2(daemon: Boolean) extends Thread with IScheduler {
  setDaemon(daemon)

  /** Default constructor creates a non-daemon thread. */
  def this() =
    this(false)

  /** When set, <code>executor.stats()</code> is invoked as the scheduler
   *  thread exits through the <code>QuitException</code> path.
   */
  var printStats: Boolean = false

  val rt: Runtime = Runtime.getRuntime()

  /** Lower bound for the default initial pool size. */
  val minNumThreads: Int = 4

  /** The value of the actors.corePoolSize JVM property. This property
   *  determines the initial thread pool size. It is <code>null</code> when
   *  the property is unset or reading it is denied by the security manager.
   */
  val coreProp: String =
    try {
      System.getProperty("actors.corePoolSize")
    } catch {
      case ace: java.security.AccessControlException =>
        null
    }

  /** The value of the actors.maxPoolSize JVM property, or <code>null</code>
   *  when the property is unset or reading it is denied by the security
   *  manager.
   */
  val maxProp: String =
    try {
      System.getProperty("actors.maxPoolSize")
    } catch {
      case ace: java.security.AccessControlException =>
        null
    }

  /** Initial pool size: the parsed <code>coreProp</code> if present,
   *  otherwise the larger of twice the number of available processors and
   *  <code>minNumThreads</code>.
   *
   *  NOTE: a non-numeric property value makes <code>Integer.parseInt</code>
   *  throw <code>NumberFormatException</code> during construction.
   */
  val initCoreSize: Int =
    if (null ne coreProp) Integer.parseInt(coreProp)
    else {
      val numCores = rt.availableProcessors()
      if (2 * numCores > minNumThreads)
        2 * numCores
      else
        minNumThreads
    }

  /** Upper bound on <code>coreSize</code>: the parsed <code>maxProp</code>
   *  if present, otherwise 256. A non-numeric property value throws
   *  <code>NumberFormatException</code> during construction.
   */
  val maxSize: Int =
    if (null ne maxProp) Integer.parseInt(maxProp)
    else 256

  // Pool-size counter maintained by run(); starts at initCoreSize.
  private var coreSize = initCoreSize

  private val executor =
    new FJTaskRunnerGroup(coreSize)

  /** The <code>ActorGC</code> instance that keeps track of the
   *  live actor objects that are managed by <code>this</code>
   *  scheduler.
   */
  val actorGC = new ActorGC

  // Written by shutdown() while holding this monitor, but read by the
  // while-condition in run() outside of it; @volatile guarantees that the
  // scheduler thread observes the update.
  @volatile private var terminating = false

  // Written by snapshot() without any synchronization and read by run() on
  // the scheduler thread; @volatile guarantees that the update is visible.
  @volatile private var suspending = false

  // Not read or updated anywhere in this class.
  private var submittedTasks = 0

  /** No-op in this scheduler. */
  def printActorDump: Unit = {}

  // Timeout, in milliseconds, of each wait() in the run() loop.
  private val CHECK_FREQ = 100

  /** Stores <code>handler</code> as the lockup handler.
   *
   *  NOTE(review): nothing in this class invokes the stored handler.
   *
   *  @param handler the callback to store
   */
  def onLockup(handler: () => Unit): Unit =
    lockupHandler = handler

  /** Stores <code>handler</code> as the lockup handler.
   *
   *  @param millis  currently ignored; the code that would have used it
   *                 is commented out
   *  @param handler the callback to store
   */
  def onLockup(millis: Int)(handler: () => Unit): Unit = {
    //LOCKUP_CHECK_FREQ = millis / CHECK_FREQ
    lockupHandler = handler
  }

  private var lockupHandler: () => Unit = null

  // True when every executor thread is BLOCKED, WAITING or TIMED_WAITING.
  private def allWorkersBlocked: Boolean =
    executor.threads.forall(t => {
      val s = t.getState()
      s == State.BLOCKED || s == State.WAITING || s == State.TIMED_WAITING
    })

  /** Scheduler loop. Until <code>terminating</code> is set it repeatedly
   *  waits up to <code>CHECK_FREQ</code> milliseconds on this monitor and
   *  then, unless suspended by <code>snapshot()</code>:
   *  <ul>
   *    <li>runs <code>actorGC.gc()</code>;</li>
   *    <li>increments <code>coreSize</code> if it is below
   *        <code>maxSize</code>, all workers are blocked and
   *        <code>executor.checkPoolSize()</code> returns true;</li>
   *    <li>otherwise leaves the loop if all actors have terminated and
   *        the executor reports no active workers.</li>
   *  </ul>
   */
  override def run(): Unit = {
    try {
      while (!terminating) {
        this.synchronized {
          try {
            wait(CHECK_FREQ)
          } catch {
            // An interrupt only ends the loop when shutdown was requested;
            // otherwise it is treated like an ordinary wake-up.
            case _: InterruptedException =>
              if (terminating) throw new QuitException
          }

          if (!suspending) {
            actorGC.gc()

            // check if we need more threads
            if (coreSize < maxSize
                && allWorkersBlocked
                && executor.checkPoolSize()) {
              //Debug.info(this+": increasing thread pool size")
              coreSize += 1
            }
            else if (actorGC.allTerminated && executor.getActiveCount() == 0) {
              // all actors terminated and all worker threads idle: terminate
              Debug.info(this.toString + ": initiating shutdown...")
              // Note that we don't have to shutdown
              // the FJTaskRunnerGroup since there is
              // no separate thread associated with it,
              // and FJTaskRunner threads have daemon status.
              throw new QuitException
            }
          }
        } // sync
      } // while (!terminating)
    } catch {
      case _: QuitException =>
        // allow thread to exit
        if (printStats) executor.stats()
    }
  }

  /**
   *  Hands <code>task</code> to the underlying executor.
   *
   *  @param task the task to be executed
   */
  def execute(task: Runnable): Unit =
    executor execute task

  /**
   *  Wraps <code>fun</code> in a <code>Runnable</code> and hands it to the
   *  underlying executor; <code>fun</code> is evaluated when that
   *  <code>Runnable</code> is run.
   *
   *  @param fun the code to be executed
   */
  def execute(fun: => Unit): Unit =
    executor.execute(new Runnable {
      def run(): Unit = { fun }
    })

  /** Requests termination by setting the <code>terminating</code> flag.
   *  The scheduler thread is not notified, so it observes the flag after
   *  its current wait of up to <code>CHECK_FREQ</code> milliseconds.
   */
  def shutdown(): Unit = synchronized {
    terminating = true
  }

  /** Suspends the periodic checks in <code>run()</code> and returns
   *  <code>executor.snapshot()</code>.
   *
   *  @return the result of <code>executor.snapshot()</code>
   */
  def snapshot(): LinkedQueue = {
    suspending = true
    executor.snapshot()
  }
}
|
