eengbrec / Scala Enhancements

In-progress enhancements to the Scala standard library, mostly centered around actors.

commit 82: 14e33a6ea1f8
parent 70: 2af7041fc9f9
branch: message_queue_patch
message queue patch
eengbrec
11 months ago
Scala Enhancements / src / actors / scala / actors / FJTaskScheduler2.scala
    #   Introduced
1
0557e593c1d4
/*                     __                                               *\
2
0557e593c1d4
**     ________ ___   / /  ___     Scala API                            **
3
20806b7d8dfb
**    / __/ __// _ | / /  / _ |    (c) 2005-2009, LAMP/EPFL             **
4
0557e593c1d4
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
5
0557e593c1d4
** /____/\___/_/ |_/____/_/ | |                                         **
6
0557e593c1d4
**                          |/                                          **
7
0557e593c1d4
\*                                                                      */
8
0557e593c1d4
9
5d8cc1bfea81
// $Id: FJTaskScheduler2.scala 17541 2009-04-21 09:55:52Z phaller $
10
0557e593c1d4
11
0557e593c1d4
package scala.actors
12
0557e593c1d4
13
0557e593c1d4
import compat.Platform
14
0557e593c1d4
15
0557e593c1d4
import java.lang.{Runnable, Thread, InterruptedException, System, Runtime}
16
6c3ef4102f41
import java.lang.Thread.State
17
0557e593c1d4
18
0557e593c1d4
import scala.collection.Set
19
0557e593c1d4
import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet}
20
0557e593c1d4
21
0557e593c1d4
/**
22
0557e593c1d4
 * FJTaskScheduler2
23
0557e593c1d4
 *
24
0557e593c1d4
 * @version 0.9.18
25
0557e593c1d4
 * @author Philipp Haller
26
0557e593c1d4
 */
27
595271ba8300
class FJTaskScheduler2(daemon: Boolean) extends Thread with IScheduler {
  setDaemon(daemon)

  /** Default constructor creates a non-daemon thread. */
  def this() =
    this(false)

  /** When <code>true</code>, <code>executor.stats()</code> is invoked
   *  when the scheduler thread leaves <code>run</code> through a
   *  <code>QuitException</code>.
   */
  var printStats = false

  val rt = Runtime.getRuntime()
  val minNumThreads = 4

  /** Reads a JVM system property, yielding <code>null</code> when the
   *  property is unset or when the security manager denies access.
   */
  private def propertyOrNull(name: String): String =
    try {
      System.getProperty(name)
    } catch {
      case _: java.security.AccessControlException =>
        null
    }

  /** The value of the actors.corePoolSize JVM property. This property
   *  determines the initial thread pool size.
   */
  val coreProp = propertyOrNull("actors.corePoolSize")

  /** The value of the actors.maxPoolSize JVM property. This property
   *  bounds how far <code>run</code> lets the pool size grow.
   */
  val maxProp = propertyOrNull("actors.maxPoolSize")

  /** Initial pool size: the parsed <code>coreProp</code> if that
   *  property is available, otherwise twice the number of available
   *  processors, but never less than <code>minNumThreads</code>.
   *  <code>Integer.parseInt</code> throws
   *  <code>NumberFormatException</code> on a malformed property value.
   */
  val initCoreSize =
    if (null ne coreProp) Integer.parseInt(coreProp)
    else java.lang.Math.max(2 * rt.availableProcessors(), minNumThreads)

  /** Upper bound for <code>coreSize</code>: the parsed
   *  <code>maxProp</code> if that property is available, otherwise 256.
   */
  val maxSize =
    if (null ne maxProp) Integer.parseInt(maxProp)
    else 256

  // Number of workers this scheduler believes the pool has; only
  // modified by the scheduler thread inside `run`.
  private var coreSize = initCoreSize

  private val executor =
    new FJTaskRunnerGroup(coreSize)

  /** The <code>ActorGC</code> instance that keeps track of the
   *  live actor objects that are managed by <code>this</code>
   *  scheduler.
   */
  val actorGC = new ActorGC

  // Both flags are written by arbitrary caller threads (`shutdown`,
  // `snapshot`) and read by the scheduler thread in `run`. `snapshot`
  // does not hold the monitor and the `while` condition in `run` is
  // evaluated outside it, so the fields are volatile to guarantee that
  // the scheduler thread observes the writes.
  @volatile private var terminating = false
  @volatile private var suspending = false

  // NOTE(review): not referenced anywhere else in this class.
  private var submittedTasks = 0

  def printActorDump: Unit = {}

  // Timeout in milliseconds for each `wait` of the scheduler loop.
  private val CHECK_FREQ = 100

  // Stored by the `onLockup` overloads; not read anywhere in this class.
  private var lockupHandler: () => Unit = null

  /** Registers a lock-up handler.
   *
   *  @param  handler the closure to remember as lock-up handler
   */
  def onLockup(handler: () => Unit): Unit =
    lockupHandler = handler

  /** Registers a lock-up handler.
   *
   *  @param  millis  currently ignored (the code that used it is
   *                  disabled)
   *  @param  handler the closure to remember as lock-up handler
   */
  def onLockup(millis: Int)(handler: () => Unit): Unit = {
    //LOCKUP_CHECK_FREQ = millis / CHECK_FREQ
    lockupHandler = handler
  }

  /** Tests whether every thread of the executor is in state
   *  <code>BLOCKED</code>, <code>WAITING</code> or
   *  <code>TIMED_WAITING</code>.
   */
  private def allWorkersBlocked: Boolean =
    executor.threads.forall(worker => {
      val state = worker.getState()
      state == State.BLOCKED || state == State.WAITING || state == State.TIMED_WAITING
    })

  /** Scheduler loop. Roughly every <code>CHECK_FREQ</code> milliseconds,
   *  unless suspended by <code>snapshot</code>, it runs the actor GC and
   *  then either accounts for a grown pool or, once all actors have
   *  terminated and no worker is active, ends the loop.
   */
  override def run(): Unit = {
    try {
      while (!terminating) {
        this.synchronized {
          try {
            wait(CHECK_FREQ)
          } catch {
            case _: InterruptedException =>
              // an interrupt only ends the loop when shutdown was requested
              if (terminating) throw new QuitException
          }

          if (!suspending) {
            actorGC.gc()

            // check if we need more threads
            // NOTE(review): presumably checkPoolSize() adds a worker and
            // reports whether it did -- confirm against FJTaskRunnerGroup.
            val poolGrew =
              coreSize < maxSize && allWorkersBlocked && executor.checkPoolSize()

            if (poolGrew) {
              //Debug.info(this+": increasing thread pool size")
              coreSize += 1
            }
            // if all worker threads idle terminate
            else if (actorGC.allTerminated && executor.getActiveCount() == 0) {
              Debug.info(this+": initiating shutdown...")

              // Note that we don't have to shutdown
              // the FJTaskRunnerGroup since there is
              // no separate thread associated with it,
              // and FJTaskRunner threads have daemon status.
              throw new QuitException
            }
          }
        } // sync

      } // while (!terminating)
    } catch {
      case _: QuitException =>
        // allow thread to exit
        if (printStats) executor.stats()
    }
  }

  /**
   *  @param  task the task to be executed
   */
  def execute(task: Runnable): Unit =
    executor execute task

  /** Wraps the by-name argument in a <code>Runnable</code> and hands
   *  it to the executor.
   *
   *  @param  fun the code to be executed
   */
  def execute(fun: => Unit): Unit =
    executor.execute(new Runnable {
      def run(): Unit = { fun }
    })

  /** Requests termination of the scheduler thread by setting the
   *  <code>terminating</code> flag; the loop in <code>run</code>
   *  checks the flag after each wait and stops once it is set.
   */
  def shutdown(): Unit = synchronized {
    terminating = true
  }

  /** Suspends the periodic work of the scheduler loop and returns
   *  the result of <code>executor.snapshot()</code>.
   */
  def snapshot(): LinkedQueue = {
    suspending = true
    executor.snapshot()
  }

}