/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2009, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

package scalax.actors

import scala.actors.{ BaseActor, IScheduler, Channel, MessageQueue, Message,
                      OutputChannel, Signal, BasicMessageQueue,
                      ReturnControl, ActorExited, ActorSuspended, Future,
                      AbstractActor, Actor, Reaction, TIMEOUT, Scheduler, Exit }

import java.util.TimerTask


@serializable
trait StateActor extends BaseActor {

  /**
   * Base trait for exceptions thrown by actors, provides traceability back to
   * the actor that threw the exception and the thread on which it was running.
   * Note that the instance variables capturing the actor's state when the
   * exception was constructed are initialized without requesting a lock on
   * the actor.  If they are initialized when a lock is not held, then it is
   * possible that they have been initialized to different values than what
   * they were when the error occured, or even into an inconsistent set.  Always
   * check the value of <code>heldLock</code> to see if the lock was held so
   * that the snapshot of the state can be considered to be consistent.
   */
  trait ActorException extends Exception {
    /**
     * The actor that threw the exception
     */
    final def actor = StateActor.this
    /**
     * The thread on which the actor was running when the exceptionwas thrown
     */
    final val thread = Thread.currentThread
    /**
     * The state of the actor when the exception was thrown.
     */
    final val state = _state
    /**
     * The contents of the mailbox when the exception was initialized.
     */
    final val mailbox = StateActor.this.mailbox.foldLeft[List[Any]](Nil)((z, e) => e :: z).reverse
    /**
     * true if the lock was held when this exception was initialized, false otherwise
     */
    final val heldLock = lockIsHeld
    /**
     * The session stack at the time this exception was constructed.
     */
    final val sessions = StateActor.this.sessions
    /**
     * The value of <code>trapExit</code> at the time this exception was constructed.
     */
    final val trapExit = StateActor.this.trapExit
  }

  //class StartReactLoop(val cond: () => Boolean, val pf: PartialFunction[Any, Unit]) extends Signal

  class UnexpectedActorState(msg: String, cause: Throwable)
        extends IllegalStateException(msg, cause) with ActorException {
    def this(msg: String) = this(msg, null)
  }

  class InvalidStateForOperation(msg: String, val badState: ActorState)
    extends IllegalStateException(msg + "\n\t" + badState, null) with ActorException

  class InvalidStateTransition(msg: String) extends IllegalStateException(msg, null) with ActorException

  /**
   * Lock used for synchronization and signalling of this actor
   * Using a lock object instead of normal synchronization allows for
   * simpler locking/unlocking abstractions while providing fine-grained
   * control.  ReentrantLocks also provide better performance under contention.
   * @todo refactor lock, withLock, and withoutLock into an object
   */
  private val lock = new java.util.concurrent.locks.ReentrantLock()
  final protected def lockIsHeld = lock.isHeldByCurrentThread
  /**
   * condition used to signal that a message matching a current wait is available
   * on the queue
   */
  final private val msgAvailCond = lock.newCondition()
  /**
   * Perform <code>body</code> while holding the lock.
   * <p>If the lock is not already held by the current thread, it will be acquired
   * before executing <code>body</code>, and then released when <code>body</code>
   * completes.  It should be used in the following pattern:</p>
   * <code>
   *    def foo(): Unit = withLock {
   *       // ...perform some actions requiring a lock
   *       withoutLock {
   *         // ...perform some actions where lock should not be held
   *       }
   *       // ...perform some more actions that require the lock
   *    }
   * </code>
   * <p>This allows much more flexible locking/unlocking patterns while preserving
   * the safety of lexically synchronization.</p>
   */
  final protected def withLock[R](body: => R): R = {
    val alreadyHolds = lockIsHeld
    if (!alreadyHolds) lock.lock()  // only acquire if not already held
    val r = try {
      body
    } finally {
      if (!alreadyHolds) lock.unlock() // only release if acquired in this invokation
    }
    r
  }
  final protected def withoutLock[R](body: => R): R = {
    val alreadyHolds = lock.isHeldByCurrentThread
    if (alreadyHolds) lock.unlock() // only unlock if the lock is currently held
    val r = try {
      body
    } finally {
      if (alreadyHolds) lock.lock() // only relock if lock was held at invokation
    }
    r
  }

  @volatile var debugEnabled = false
  var msgCnt = 1
  protected def debug(msg: String) {
    if (debugEnabled) withLock {
      Console.synchronized {
        print(msgCnt)
        print("  ")
        print(this)
        print("\t")
        print(Thread.currentThread)
        print("\t")
        println(msg)
        Console.flush()
        msgCnt += 1
      }
    }
  }

  //TODO: a lot of the code that handles waiting for a message could be refactored
  //      inthe the MessageQueue class.
  //TODO: remove [actors]
  /*protected[actors]*/ val mailbox = new BasicMessageQueue[Message[Any]]  //TODO: restrict access to mailbox

  protected def availableMessage(p: Any => Boolean): Option[Message[Any]] = withLock {
    mailbox.extractFirst((m) => p(m.content))
  }

  /**
   * Process the first available message matching <code>p</code> and return the <code>Some(result)</code>.
   * If there is no available message matcing <code>p</code>, return <code>None</code>
   * If the lock is currently held, then <code>op</code> will be executed with
   * the lock still held.  If it is not held, then the lock will be released prior
   * to executing <code>op</code>.
   * The actor must be running when it invokes this method.
   * @param p the predicate to use in matching a message
   * @param op the operation to perform on the message if it is available
   */
  protected def withAvailableMessage[R](p: Any => Boolean)(op: Any => R): Option[R] = {
    withAvailableMessage(p, op, !lockIsHeld)
  }

  protected def withAvailableMessage[R](p: Any => Boolean, op: Any => R, releaseLock: Boolean): Option[R] = withLock {
//    assume(state.isRunning, "actor must be running in order to process a message")
    availableMessage(p) match {
      case None => None
      case Some(msg) => Some(processMessage(msg, op, releaseLock))
    }
  }

  protected def processMessage[R](msg: Message[Any], op: Any => R): R = {
    processMessage(msg, op, !lockIsHeld)
  }

  protected def processMessage[R](msg: Message[Any], op: Any => R, releaseLock: Boolean): R = {
    withLock {
//      assume(state.isRunning, "actor must be running in order to process a message")
      sessions = msg.sender :: sessions
      try {
        if (releaseLock) withoutLock(op(msg.content)) else op(msg.content)
      } finally {
        sessions = sessions.tail
      }
    }
  }

  private var sessions: List[OutputChannel[Any]] = Nil

  /**
   * the current state of the actor, should never be directly accessed
   * access using <code>state</code> and <code>state_=</code> intead.
   */
  private var _state: ActorState = initialState
  /**
   * Defaults to <code>NotStarted</code>.  Derived classes can override this
   * method in order to specify a different initial state.  This method
   * will only be invoked once per actor upon construction.  It should not access
   * any instance variables within the actor unless they were explicitly
   * initialized in an early initialization block.
   */
  protected def initialState: ActorState = NotStarted

  /**
   * the current state of the actor, lock must be held before accessing because
   * if the lock is not held, then the state is quite likely to spontaneously
   * change.
   */
  def state: ActorState = {
    assume(lockIsHeld, "lock must be held in order to access state")
    _state
  }
  /**
   * Change the state of the actor to <code>s</code>. The lock must be held.
   * The state change is performed in the following order:
   * <ol>
   *   <li>Perform exit actions and checks on the current state, including
   *       that the transition is valid.</li>
   *   <li>Update the state of the actor<li>
   *   <li>Perform any entry actions and checks for the new state, including
   *       validating that it was entered from an appropriate state</li>
   * </ol>
   * @param s the new state for the actor, must be a valid state transition
   */
  protected def state_=(s: ActorState): Unit = {
    val oldState = _state
    try {
      assert(lockIsHeld, "current thread must hold lock on this to modify state")
      debug("state change from " + _state + " to " + s)
      _state.exitTo(s)
      _state = s
      s.enter()
    } catch {
      case t: Throwable if !t.isInstanceOf[Signal] => {
        t.printStackTrace()
        val te = new TransitionError(t, oldState, s)
        //TODO: this could be a problem...
        //if (!s.isInstanceOf[Terminated]) terminate(te) else _state = te
        _state = te
        throw te
      }
    }
  }

  final protected def controlLoop(thread: Thread, ap: ActionPending): Unit = {
    try {
//      debug("performing action in controlLoop")
//      assert(state eq ap)
      ap.performAction()
    } catch {
      case ReturnControl =>
      case ActorSuspended => // can't assert state.isSuspended here because state may have changed
                                 // while the exception propogated up the stack
      case ActorExited => //assert(state.hasFinished, "ActorExited signal received by not in finished state")
      case te: TransitionError => //assert(state.isInstanceOf[TransitionError])
    }
    val next: ActionPending = state match {
      case nap: ActionPending if ((nap ne ap) || ap.repeat) && nap.immediate => {
        // if we have a new ActionPending state, or the old one wants to have
        // its action repeated, continue control loop
//        debug("restarting control loop")
        //controlLoop(thread, nap)  // bug #1672 makes this a bad idea
        nap
      }
      case nap: ActionPending if (nap eq ap) && !ap.repeat => {
        // no state change and action not supposed to be repeated, so terminate
  //       debug("New state " + state + ": " + (nap ne ap) + " Repeat: " + ap.repeat + " Immediate: " + nap.immediate)
        // this is the case where we had no state change and the ActionPending should not repeat
        state = new Finished()
        null
      }
      case r: Running => {
        // the actor finished running and has no action to perform
        //debug("Repear")
//        debug(state.toString)
        state = new Finished()
        null
      }
      case t: Terminated => null // noop, already terminated
      case _ => null //debug("exiting control loop in state: " + state)
    }
    if (next ne null) controlLoop(thread, next)
  }


  /**
   *
   */
  trait ActorState {
    /**
     * true if the actor is waiting to receive a message
     */
    def isWaiting: Boolean
    /** true if the actor is not attached to a thread */
    def isDetached: Boolean
    /** true if the actor is attached to a thread */
    def isAttached: Boolean
    /** the actor has started and has not terminated, but may or may not be running */
    def isActive: Boolean
    /** the actor has started, has not terminated, and is currently executing on a thread */
    def isRunning: Boolean
    /**
     * The actor is suspended.
     * An actor is suspended when it is both active and detached.  This usually
     * means that it is either waiting for a message or waiting to be scheduled
     * on a thread.
     */
    final def isSuspended: Boolean = isDetached && isActive
    def hasStarted: Boolean
    def hasFinished: Boolean
    def hasTerminated: Boolean
    /**
     * Check that this state is the current state.
     * Requires the lock to be held.
     * @param throwException if true, and exception will be thrown for invalid state
     * @return true if the actor is currently in this state, false if not
     * @throws UnexpectedActorState if this state is not the current state and throwException is true
     */
    def stateIsValid(throwException: Boolean): Boolean = {
      if (state ne this) { // access to state will check that lock is held
        if (throwException) {
          val msg = "Unexpected state " + state + ", expected: " + this
          throw new UnexpectedActorState(msg)
        }
        false
      } else true
    }

    def enter(): Unit
    def exitTo(s: ActorState): Unit

    def receiveMessage(msg: Any, replyTo: OutputChannel[Any]): Unit = {
      // the lock should be acquired prior accessing the state of the actor
//      assume(lockIsHeld, "lock on Actor.this must already be held")
//      assume(stateIsValid(true))
      //debug("receiving message and appending to mailbox: " + msg)
      mailbox.append(Message(msg, replyTo))
    }

// Sometimes it's handy for debugging to know where a state was created
//    final val enterTrace: List[StackTraceElement] = if (!debugEnabled) Nil else {
//      val trace = Thread.currentThread.getStackTrace()
//      trace.take(10).toList
//    }
  }

  object NotStarted extends ActorState {
    def isWaiting = false
    def isAttached = false
    def isDetached = true
    def isActive = false
    def hasStarted = false
    def hasFinished = false
    def isRunning = false
    def hasTerminated = false

    def exitTo(s: ActorState): Unit = {
//      assert(s.isInstanceOf[StartScheduled] || s.isInstanceOf[InternalError]
//             || s.isInstanceOf[TransitionError])
    }
    def enter(): Unit = {
      throw new IllegalStateException("NotStarted should never be entered")
    }

    def start(): Active = withLock {
//      assume(stateIsValid(true))
      //debug("starting actor")
      val r = new StartScheduled
      state = r
      r
    }
    override def toString = "NotStarted"
  }

  /**
   * A state that represents an active actor.
   * @todo move sessions into the Active state
   *       Moving sessions into the Active state will be a challenge because in
   *       order to allow the user of <code>sender</code> in <code>isDefinedAct</code>
   *       sessions have to be manipulated independently of state changes so that
   *       the presence of a session can be faked while a message is being checked
   *       to see if it matches.
   */
  trait Active extends ActorState {
    final def isActive = true
    final def hasStarted = true
    final def hasFinished = false
    final def hasTerminated = false
    final def kill(): Unit = kill('normal)
    def kill(reason: Any): Unit
  }

  /**
   * An an Active actor that is detached from a thread while it waits for something
   * to happen.  Usually it is either waiting to receive a message or waiting
   * for the scheduler to give it a thread on which to execute.
   */
  trait Suspended extends Active {
    final def isDetached = true
    final def isAttached = false
  }

  trait ActionPending extends Active {
    /** an action to perform for this state with the action loop */
    def performAction(): Unit
    /**
     * true if the action should be repeated if the state doesn't change,
     * false if after the action has completed and there has not been a state
     * change the actor should enter the <code>Finished</code> state.
     */
    def repeat: Boolean

    def immediate: Boolean
  }


  /**
   * The actor is detached and waiting for a action to be executed
   * States that represent an actor that is waiting for a thread (e.g. <code>StartScheduled</code>
   * and <code>ContinuationScheduled</code>) should extend this class.
   * @todo this eliminates the need for the <code>Reaction</code> class
   * @todo refactor actor shutdown-on-error handling
   */
  abstract class ActionScheduled extends Reaction with Suspended with ActionPending {

    val a = StateActor.this

    final def immediate = false
    def repeat = false

    final def isRunning = false

    final def isWaiting = false

    final def enter(): Unit = {
      derivedEnter()
      scheduler.execute(this)
    }

    final def exitTo(s: ActorState): Unit = {
      derivedExitTo(s)
      //assert(hasRun || s.hasTerminated, "reaction cannot transition to non-terminal state until is has run")
    }

    protected def derivedEnter(): Unit
    protected def derivedExitTo(s: ActorState): Unit

    //TODO: submit a bug report for this because the access in run() results in
    //      an IllegalAccessError
    /*private[this]*/ var hasRun = false

    /**
     * Run the scheduled reaction
     */
    protected def performReaction(): Unit = {
      withLock {
//        assert(!hasRun, "reaction has already ran")
        //TODO: include current thread in debug output
        //debug("peforming reaction " + this + " on thread " + Thread.currentThread)
        hasRun = true
        state match {
          case state: Killed => () // actor has been killed, do nothing
          case _ => {
            //val runningState = new Running(currentThread)
            try {
//              assert(stateIsValid(true)) // perform assertion within try block so if fails shutdown will occur
              controlLoop(Thread.currentThread, this)
            } catch {
              case ie: InterruptedException => {
                Thread.interrupted() // clear the interrupted status of this thread
                state match {
                  // the exception was (most likely) thrown as part of the actor kill process,
                  // or if it wasn't the actor is supposed to be killed anyway, so
                  // finish the job
                  case k: Killing => {
                    state = new Killed(k.reason)
                  }
                  case _ => {
                    ie.printStackTrace()
                    state = new UnhandledThrowable(ie)
                  }
                }
              }
              //TODO: is catching all throwables too much?  Errors like assertions
              case t: Throwable => {
                println("unhandled throwable caught in reaction, current state: " + state)
                t.printStackTrace()
                state match {
                  case rs: Running =>  {
                    // there was an unhandled Exception within the actor, exit
                    //TODO: it should be possible to determine if the exception orginated
                    //      in actor code or user code
                    t.printStackTrace() //TODO: find a better way to log the stack trace
                    state = new UnhandledThrowable(t)
                  }
                  case _ => {
                    val uas =  new UnexpectedActorState("Unexpected state: " + state, t)
                    val ie = new InternalError(uas)
                    //TODO: this looks like a dangerous hack...figure out if it is ok
                    if (!state.hasTerminated) state = ie else _state = ie
                  }
                }
              }
            }
          }
        }
      } // withLock
      //The SingleThreadedScheduler still holds the lock when the reaction finishes
      //because the actor runs on the thread that started it rather than a different
      //one.
      //assert(!lockIsHeld, "when reaction is finished lock should no longer be held")
    }
//    /**
//     * Implementing classes should provide an implementation for this method.
//     * <code>performReaction</code> will be invoked with the lock held, so if
//     * any part of this method should be executed without the lock, then it
//     * should be in a <code>withoutLock { ... }</code> block.
//     */
//    protected def performReaction(): Unit
    protected def inRunningState(): Boolean = {
      state match {
        case r: Running if (Thread.currentThread eq r.thread) => true //all good!
        case r: Running => throw new IllegalStateException("attempt to run on the wrong thread")
        case _ => throw new InvalidStateForOperation(
                   "Actor must be in a Running state to perform a reaction, instead in: " + state, state)
      }
    }
    /*protected[Actor]*/ def kill(reason: Any): Unit = withLock {
//      assume(stateIsValid(true))
      state = new Killed(reason)
    }
  }

  final class StartScheduled extends ActionScheduled {
    protected def derivedEnter(): Unit = {
      //no need to throw ActorSuspended signal here because this state transition
      //will occur on a thread that does not belong to this actor
      scheduler.actorGC.newActor(StateActor.this)
    }
    protected def derivedExitTo(s: ActorState): Unit = {
//      assert(s.isInstanceOf[Running] || s.isInstanceOf[Killed] || s.isInstanceOf[InternalError],
//              "StartScheduled can only exit to Running or Killed, attempted to: " + s)
    }
    def performAction() {
      state = new BasicRunning(Thread.currentThread, () => withoutLock { act() })
    }
    override def toString = "StartScheduled"
  }

  /**
   * @param continuation should contain any necessary state changes
   */
  class ContinuationScheduled(val continuation: () => Unit) extends ActionScheduled {
    protected def derivedEnter(): Unit = {
      //No need to throw an ActorSuspended signal because this transition will occur
      //when the actor is already suspended and will happen on a thread that is not
      //owned by this actor
      //debug("Continuation scheudled")
    }
    def performAction() {
      continuation()
    }
    protected def derivedExitTo(s: ActorState): Unit = {
//      assert(s.isInstanceOf[Running] || s.isInstanceOf[Killed] ||
//             s.isInstanceOf[InternalError] || s.isInstanceOf[TransitionError],
//              "ContinuationScheduled may only exit to Running or Killed, attempted: " + s)
    }
    override def toString = "ContinuationScheduled"
  }

  protected final val noop = () => ()


  abstract class Running extends Active {
    val thread: Thread
    final def isWaiting = false
    final def isAttached = true
    final def isDetached = false
    final def isRunning = true

    final def enter() {
//      assert(thread eq Thread.currentThread, "attempt enter into " + this + " from wrong thread")
      derivedEnter()
    }

    protected def derivedEnter(): Unit

    def resumeTo(continuation: () => Unit, t: Thread): Running

    /**
     * Check the mailbox for messages for which <code>pf</code> is defined.  If
     * no messages are available then release the current thread until one
     * becomes available.
     * @param pf a partial function used to recognize and process messages
     */
    def detachAndWaitFor(pf: PartialFunction[Any, Unit]): Unit = {
      detachAndWaitFor(pf, -1L)
    }
    /**
     * Check the mailbox for messages for which <code>pf</code> is defined.
     * If no messges are available then release the current thread until one
     * an appropriate message is received  or the timeout period expires.
     * Note: The currently held thread is not released if there is currently
     *       a message matching <code>pf</code> in the mailbox.
     * @param pf a PartialFunction that will be used to determine if a message
     *           matches and that will be invoked when a matching message is received
     * @param millis the number of milliseconds to wait for a message, or 0 for infinity
     */
    def detachAndWaitFor(pf: PartialFunction[Any, Unit], millis: Long): Unit = {
//      assume(stateIsValid(true))
//      assert(currentThread eq thread,
//             "attempting a detached wait from a thread other than the thread on which the actor is running")
      mailbox.extractFirst((msg) => pf.isDefinedAt(msg.content)) match {
        case Some(msg) => {
          //processMessage(qel, pf, true)
          // can't use processMessage because it doesn't clear sessions
          // sessions need to be cleared so that the don't build up infinitely on
          // recursive reacts and loops
          //TODO: may need to look at a way to clear out sessions on recursive reacts and loops
          sessions = msg.sender :: sessions
          try {
            withoutLock { pf(msg.content) }
          } finally {
            sessions = sessions.tail
          }
        }
        case None => state = new DetachedWait(pf, millis, this)
      }
    }

    def blockAndWaitFor[R](pf: PartialFunction[Any, R]): R = {
      blockAndWaitFor(pf, -1L)
    }

    def blockAndWaitFor[R](pf: PartialFunction[Any, R], millis: Long): R = withLock {
      def blockLoop(millis: Long): Message[Any] = {
        state match {
          case k: Killing => state = new Killed(k.reason)
          case _ => ()
        }
        //debug("starting block loop")
        mailbox.extractFirst((msg) => pf.isDefinedAt(msg.content)) match {
          case None => {
            //TODO: move block logic into BlockedWait state
            if (!state.isInstanceOf[BlockedWait[R]]) state = new BlockedWait(pf, thread, millis)
            val start = System.currentTimeMillis()
            if (millis >= 0L) msgAvailCond.await(millis, java.util.concurrent.TimeUnit.MILLISECONDS)
            else msgAvailCond.await()
            val rem = Math.max(millis - (System.currentTimeMillis() - start), 0L)
            if (millis >= 0L && rem == 0L) {
              if (pf.isDefinedAt(TIMEOUT)) {
                val r: Message[Any] = Message(TIMEOUT, StateActor.this)
                state = resumeTo(noop, thread)
                r
              } else {
                state = new Timedout
                throw ActorExited
              }
            } else {
              blockLoop(rem)
            }
          }
          case Some(msg) => {
            if (state ne this) state = resumeTo(noop, thread)
            msg
          }
        }
      }
//      assume(stateIsValid(true))
//      assume(currentThread eq thread, "attempting to block on wrong thread")
      val qel = blockLoop(millis)
      //debug("have message")
      //assume(stateIsValid(true)) // this isn't necessarily true because the state
                                   // often changes during the blocking process
//      assert(state.isRunning)
      processMessage(qel, pf, true) //release the lock when processing the message
    }
    /**
     * exit for the specified reason
     * Sets the <code>state</code> to <code>Finished</code>.
     * @param reason the reason for exitting
     */
    private[StateActor] final def exit(reason: Any): Nothing = /*withLock*/ {
//      assume(stateIsValid(true))
//      assume(currentThread eq thread,
//             "exit can only be invoked on the thread on which the actor is running")
      state = new Finished(reason)
      throw ActorExited
    }
    /**
     * exit for reason <code>'normal</code>
     * Sets the <code>state</code> to <code>Finished</code>.
     */
    private[StateActor] final def exit(): Nothing = exit('normal)
    def kill(reason: Any): Unit = withLock {
//      assume(stateIsValid(true))
      if (Thread.currentThread == thread) {
        // the kill was issued from the thread to which the actor is bound, so
        // the actor can be killed by simply throwing an exception to return
        // the thread to the scheduler
        state = new Killed(reason)
      } else {
        // the kill is coming from a different thread from one to which the actor
        // is bound, so the actor need to be put into the Killing state so that
        // the library will know to terminate it when it regains control
        state = new Killing(reason, thread)
        // interrupt the thread in case it is blocked
        thread.interrupt()  //TODO: make sure interrupted state is properly managed by scheduler
      }
    }
  }


  /**
   * The actor is currently running on the specified thread
   */
  case class BasicRunning(val thread: Thread, val f: () => Unit) extends Running with ActionPending {


    protected def derivedEnter() = ()

    def performAction() {
      //actionLoop(this)
      f()
    }

//    private def actionLoop(s: BasicRunning) {
//      val next: BasicRunning = try {
//        s.f()
//        null
//      } catch {
//        case ReturnControl if state ne s => state match {
//          case br: BasicRunning => br
//          case _ => null
//        }
//      }
//      if (next ne null) actionLoop(next)
//    }

    final def immediate = true
    final def repeat = false

    def exitTo(s: ActorState): Unit = {
      //debug("exiting running state to: " + s)
//      assert(s.isInstanceOf[Wait[_]] /*|| s.isInstanceOf[FunctionScheduled]*/ ||
//             s.isInstanceOf[Killing] || s.isInstanceOf[UnhandledThrowable] ||
//             s.isInstanceOf[Finished] || s.isInstanceOf[InternalError] ||
//             s.isInstanceOf[TransitionError],
//        "Running may only transition to DetachedWait, BlockedWait, Finished, Killing, or UnhandledThrowable; attempted from: " + s)
    }

    def resumeTo(continuation: () => Unit, t: Thread): Running = {
      if ((continuation eq f) && (t eq thread)) {
        //println("reusing continuation from BasicRunning")
        this // safe to reuse this object
      } else {
        new BasicRunning(t, continuation)
      }
    }

    override def toString = "BasicRunning"

  }

  class RunningContinuation(val continuation: () => Unit, restoreState: Thread => ActorState, val thread: Thread) extends Running with ActionPending {

    protected def derivedEnter() = ()

    def exitTo(s: ActorState) {}
    def immediate = true
    def repeat = false
    def performAction() {
      continuation()
      state = restoreState(thread)
    }
    def resumeTo(continutation: () => Unit, thread: Thread): Running = {
      if ((continuation eq this.continuation) && (thread eq this.thread)) {
        //println("resuing continuation for RunningContinuation")
        this
      } else new RunningContinuation(continuation, restoreState, thread)
//      new RunningContinuation(continuation, restoreState, thread)
    }
  }

  abstract class Looping(val cond: () => Boolean, val thread: Thread, val f: () => Unit) extends Running with ActionPending {
    def exitTo(s: ActorState): Unit = {
      //assert(!s.isInstanceOf[RunningContinuation])
    }
    final def repeat = cond()  // this leads to multiple evaluations of cond
    final def immediate = true

    override def resumeTo(continuation: () => Unit, t: Thread): Running = {
      if ((continuation eq f) && (t eq thread)) {
        //println("reusing continuation for: " + state)
        this
      }
//      else new ResumeLooping(continuation, cond, t, f)
      else new RunningContinuation(continuation, (t: Thread) => new ContinueLooping(cond, t, f), t)
    }

    def performAction() {
      // Using a while loop here instead letting the controlLoop re-invoke
      // performAction leads to a significant performance difference
      while (cond()) {
        try {
          f()
        } catch {
          // catching the ReturnControl signal here versus in the controlLoop
          // cuts the cost of ExceptionBlob by more than 50% on a LoopPing
          // benchmark
          case ReturnControl if state eq this =>
        }
      }
    }
  }

  class InitialLooping(cond: () => Boolean, thread: Thread, f: () => Unit) extends Looping(cond, thread, f) {
    protected def derivedEnter() {
      throw ReturnControl
    }

    override def toString = "InitialLooping"
  }

  class ContinueLooping(cond: () => Boolean, thread: Thread, f: () => Unit) extends Looping(cond, thread, f) {
    //TODO: is ContinueLooping actually needed?
    protected def derivedEnter() = ()
    override def toString = "ContinueLooping"
  }


  class ReactLooping(val pf: PartialFunction[Any, Unit], val cond: () => Boolean, val thread: Thread) extends Running with ActionPending {
    protected def derivedEnter() {
      throw ReturnControl
    }

    def performAction() {
      innerLoop()
    }

    final def immediate = true
    final def repeat = true

    def exitTo(s: ActorState) {}

    protected final def innerLoop() {
      if (cond()) {
        detachAndWaitFor(pf)  //TODO: do we have a locking problem here?
        if (!state.isSuspended) innerLoop()
      } else state = new Finished()
    }

    def resumeTo(continuation: () => Unit, t: Thread): Running = {
      new ResumeReactLooping(continuation, pf, cond, t)
    }
    override def toString = "ReactLooping"
  }

  class ResumeReactLooping(val continuation: () => Unit, pf: PartialFunction[Any, Unit],
                           cond: () => Boolean, thread: Thread)
        extends ReactLooping(pf, cond, thread) {
    override protected def derivedEnter() {
      continuation()
      performAction()
    }

    override def resumeTo(continuation: () => Unit, t: Thread): Running = {
      if ((continuation eq this.continuation) && (t eq thread)) {
        //println("reusing continuation for: " + state)
        this
      }
      else new ResumeReactLooping(continuation, pf, cond, t)
    }
    override def toString = "ResumeReactLooping"
  }

  /**
   * The actor is in the process of being killed
   * @param reason the reason that the actor is being killed
   * @param thread the thread to which the actor is currently bound
   * @todo make killing extend Running instead of BasicRunning
   */
  class Killing(val reason: Any, thread: Thread) extends BasicRunning(thread, null) {
    override def exitTo(s: ActorState): Unit = {
//      assert(s.isInstanceOf[Killed] || s.isInstanceOf[InternalError]
//             || s.isInstanceOf[TransitionError],
//             "Killing may only exit to a Killed state, attempted: " + s)
    }

    override def receiveMessage(msg: Any, replyTo: OutputChannel[Any]): Unit = {
      throw new InvalidStateForOperation("message cannot be accepted, actor is being killed", state)
    }
    /**
     * kill from the killing state is a no-op if the specifed reason matches
     * the current reason for being killed.  If the reason does not match, then
     * an IllegalStateException is thrown because the atempt to kill is likely
     * erroneous.
     * @param reason the reason that the actor shoud be killed
     */
    override def kill(reason: Any): Unit = withLock {
//      assume(stateIsValid(true))
      if (reason != this.reason)
        throw new IllegalStateException("Attempt to kill " + StateActor.this + " for reason " + reason +
                                        " when in the process of being killed for " + this.reason)
      if (Thread.currentThread eq thread) {
        state = new Killed(reason)
      } else {
        // interrupt the thread again just in case...
        thread.interrupt()
      }
    }
    override def toString = "Killing for " + reason + " on thread " + thread
  }

  /**
   * Base class for all end-states.
   * End-states are final and cannot be exited.
   * @todo It might be reasonable to allow an actor to be restarted
   */
  trait Terminated extends ActorState {
    val reason: Any
    /**
     * Throws an <code>InvalidStateForOperation</code> because <code>Terminated</code> states cannot be exited.
     * @throws InvalidStateForOperation no matter what
     */
    final def exitTo(s: ActorState): Unit = {
      throw new InvalidStateForOperation("A terminated state may not be exitted", state)
    }

    final def enter(): Unit = {
      notifyLinked(reason)
      scheduler.actorGC.terminated(StateActor.this)
    }

    /**
     * a <code>Terminated</code> actor is never active
     * @returns false
     */
    final def isActive = false
    /**
     * a <code>Terminated</code> actor is never attached to a thread
     * Note that this implies that an actor that is permanently bound to a thread,
     * e.g. an actor that is a subclass of thread (not a good idea, but possible),
     * should terminate the thread when the actor terminates, rather than leaving
     * it lying around (probably a good idea, anyway).
     * @returns false
     */
    final def isAttached = false
    /**
     * a <code>Terminated</code> actor is never detached because detached implies
     * that the actor is in a suspended state and therefore may be started up again
     * @returns false
     * @todo this is a somewhat questionable definitino of isDetached
     */
    final def isDetached = false
    /**
     * a <code>Terminated</code> actor is never running
     * @returns false
     */
    final def isRunning = false

    /**
     * an actor must start before it can terminate, so <code>hasStarted</code> is always true
     * @returns true
     */
    final def hasStarted = true
    /**
     * an actor that has terminated cannot be waiting
     * @returns false
     */
    final def isWaiting = false
    /**
     * a <code>Terminated</code> actor always has terminated
     * @returns true
     */
    final def hasTerminated = true
    /**
     * A <code>Terminated</code> actor cannot receive messages and will throw an InvalidStateForOperation.
     * Note that this is a change in behavior.  The original Scala Actors library
     * had no formal concept of an actor being terminated and you could send it
     * a message regardless of its state.
     * @throws InvalidStateForOperation because the actor has been terminated
     */
    override final def receiveMessage(msg: Any, replyTo: OutputChannel[Any]): Unit = {
      throw new InvalidStateForOperation("actor has already terminated", state)
    }
  }

  /**
   * The actor has been killed for the specified reason
   * A <code>Killed</code> actor is an actor that was forcefully terminated, and
   * therefore was not allowed to complete execution and exit normally.
   * @param reason the reason why the actor was killed, defaults to 'killed
   */
  class Killed(val reason: Any) extends Terminated {
//    def enterFrom(s: ActorState): Unit = {
//      // killed can be reached from pretty much any state
//    }
    /** an actor thas has been killed has not finished */
    final def hasFinished = false
    override def toString = "Killed: " + reason
  }

  /**
   * The actor has finished normally with the specified reason
   * @param reason the reason why the actor completed, defaults to 'normal
   */
  class Finished(val reason: Any) extends Terminated {
//    def enterFrom(s: ActorState): Unit = {
//      assert(s.isInstanceOf[Running],
//             "Finished may only be reached from the Running state, attempted: " + s)
//    }
    /** the actor has finished */
    final def hasFinished = true
    override def toString = "Finished: " + reason
  }

  /**
   * The actor has terminated normally due to a timeout
   * @todo Is this really a Finished state or should it be some sort of error state?
   */
  class Timedout extends Finished("Timedout")

  /**
   * An unhandled throwable was encountered
   * @param reason the Throwable that caused the actor to be terminated
   */
  class UnhandledThrowable(override val reason: Throwable) extends Terminated {
//    def enterFrom(s: ActorState): Unit = {
//      assert(s.isInstanceOf[Running],
//             "UnhandledThrowable may only be entered from the running State, attempted: " + s)
//    }
    /** the actor has not finished due to an error */
    final def hasFinished = false
    override def toString = "UnhandledThrowable: " + reason
  }

  class TransitionError(override val reason: Throwable, val exitingState: ActorState,
                        val enteringState: ActorState)
        extends RuntimeException("Transition from " + exitingState + " to " +
                                 enteringState + " failed:\n" + reason.getMessage(),
                                 reason)
        with Terminated {

//    def enterFrom(a: ActorState): Unit = {
//      // no-op
//    }
    final def hasFinished = false
    override def toString = getMessage()
  }

  class InternalError(override val reason: Throwable) extends Terminated {
//    override def enterFrom(s: ActorState) {}
    def hasFinished = false
    override def toString = {
      val sw = new java.io.StringWriter()
      val w = new java.io.PrintWriter(sw)
      w.write("InternalErrror: ")
      w.write(reason.getMessage())
      reason.printStackTrace(w)
      w.flush()
      sw.toString()
    }
  }

  /**
   * Base class for wait states, such as BlockedWait and DetachedWait.
   * A wait state will wait until a message is received for which <code>waitingFor</code>
   * is defined.  A wait state should never be entered if there is already a
   * matching message in the mailbox.
   * @param waitingFor the <code>PartialFunction</code> used for checking messages and
   *                   that is applied to the first matching message received
   */
  abstract class Wait[R](val waitingFor: PartialFunction[Any, R], val millis: Long) extends Active {
    final def enter(): Unit = {
//      assert(mailbox.get(0)(waitingFor.isDefinedAt(_)) == None,
//             "attempt to enter a Wait state when a matching message is available")
      derivedEnter()
    }
    protected def derivedEnter(): Unit
    /** an actor in a Wait state is always waiting */
    final def isWaiting = true
    /** an actor in a Wait state is never running */
    final def isRunning = false
    override def receiveMessage(msg: Any, replyTo: OutputChannel[Any]): Unit = {
//      assume(lockIsHeld)
//      assert(stateIsValid(true))
      //debug("receiving " + msg + " while waiting")
      val w = {
        // put replyTo on sessions so that calls to sender in waitingFor
        // will not fail or yield erroneous results
        sessions = replyTo :: sessions
        try {
          waitingFor.isDefinedAt(msg)
        } finally {
          sessions = sessions.tail
        }
      }
      // this can make it so the message is processed twice
      //mailbox.append(msg, replyTo)
      if (w) {
        //debug("message matched, scheduling execution")
        resume(msg, replyTo)
      } else {
        //debug("message did not match, continuing to wait")
        mailbox.append(Message(msg, replyTo))
      }
    }
    /**
     * Take the actions necessary to make sure the message is processed
     * Derived classes must implement this method.  The implementation should
     * schedule the continuation of the actor, signal the waiting thread to wakeup,
     * or take whatever other actions that may be necessary to ensure the processing
     * of the specified message.
     * <code>resume</code> will always be called with the lock held.
     * @param msg the message that was received by the actor
     * @param replyTo the channel from which the message was received and that should
     *                be used for sending replies
     */
    protected def resume(msg: Any, replyTo: OutputChannel[Any]): Unit
  }

  protected var detachCnt = 0L

  /**
   * The actor is wait for <code>waitingFor</code> to be defined and is not
   * currently attached to a thread.
   * @param waitingFor the partial function defining both the message that the actor is
   *                   waiting for and the action to be taken when it is received
   * @param millis the number of milliseconds to wait
   * @param prevRunningState the running state, will be used for resumeTo call
   * @todo would it make more sense to pass in a resumeTo function instead of holding on
   *       to the previous running state?
   */
  class DetachedWait(waitingFor: PartialFunction[Any, Unit], millis: Long, val prevRunningState: Running)
        extends Wait(waitingFor, millis) with Suspended {
    def exitTo(s: ActorState): Unit = {
//      assert(s.isInstanceOf[ContinuationScheduled] || s.isInstanceOf[Killed] ||
//             s.isInstanceOf[InternalError] || s.isInstanceOf[TransitionError] ,
//             "DetachedWait may only exit to ContinuationScheduled or Killed, attempted: " + s)
    }
    def kill(reason: Any): Unit = withLock {
//      assume(stateIsValid(true))
      state = new Killed(reason)
    }
    protected def derivedEnter() {
      //debug("entering detached wait")
      //TODO: make sure ReactionScheduled doesn't do ANYTHING with the actor
      //      after the reaction has been performed
      if (millis >= 0L) {
        // a TIMEOUT has been requested
        //TODO: TIMEOUTS coming from multiple sources could create some odd interactions
        //      perhaps if pf is not defined for TIMEOUT then a unique message should
        //      be used instead of TIMEOUT so that it does not interfere with out TIMEOUTs
        //      ...unfortunately...how would the receiving actor know about the unique
        //      TIMEOUT in order to listen for it specifically??
        //      ...there could be one timeout object per actor.
        //
        // clean out spurious TIMEOUTS (where would the spurious timeouts come from?)
        while (mailbox.extractFirst((m) => m.content == TIMEOUT) != None) {}
        val npf = {
          if (waitingFor.isDefinedAt(TIMEOUT)) waitingFor
          else waitingFor.orElse[Any, Unit] { // make sure TIMEOUT will be handled
            case TIMEOUT => withLock {
                state = new Timedout
                throw ActorExited
            }
          }
        }
        val tt = new TimerTask {
          // a means to cancel the TimerTask is not required because if
          // the actor has left the wait state no message will be sent
          def run(): Unit = withLock {
            // if the actor is still waiting, send it a timeout message
            // ...if the actor is not waiting, then sending the timeout
            // message will result in a spurious timeout
            if (state eq DetachedWait.this) {
              send(TIMEOUT, StateActor.this)
            }
          }
        }
        Actor.timer.schedule(tt, millis)
      }
      detachCnt += 1L
    }
    protected def resume(msg: Any, replyTo: OutputChannel[Any]) {
//      assume(lockIsHeld, "the current thread is expected to hold the lock")
      val c = new ContinuationScheduled(() => {
        state = prevRunningState.resumeTo(() => {
          sessions = replyTo :: sessions
          try {
            //debug("invoking continuation on: " + msg)
            withoutLock { waitingFor(msg) }
          } finally {
            sessions = sessions.tail
          }
        }, Thread.currentThread)
      })
      state = c
      // an exception should NOT be thrown here because the actor is already suspended
      // also, this method is likely to be executed on a thread that is not
      // controlled by the actor library (e.g. main), so the exception will
      // just end up propogating into user code, which is bad.
    }
    override def toString = "DetachedWait"
  }


  class BlockedWait[R](waitingFor: PartialFunction[Any, R], val thread: Thread, millis: Long)
    extends Wait(waitingFor, millis) {

    def exitTo(s: ActorState): Unit = {
//      assert(s.isInstanceOf[Running] || s.isInstanceOf[Killed] ||
//             s.isInstanceOf[InternalError] || s.isInstanceOf[TransitionError],
//             "BlockedWait may only exit to Running or Killed states, attempted: " + s)
    }
    protected def derivedEnter() {
      //TODO: do the actual blocking here
      //assert(thread eq Thread.currentThread)
    }
    final def isAttached = true
    final def isDetached = false
    def kill(reason: Any): Unit = withLock {
//      assume(stateIsValid(true))
//      assume(Thread.currentThread ne thread,
//             "nothing should be executed on a thread while the actor bound to it is in a blocked wait")
      state = new Killed(reason)
      thread.interrupt() // interrupt the thread so it knows it should terminate
      throw ActorExited
    }
    /**
     * Change the state to <code>Running</code> and notify the blocked thread.
     */
    protected def resume(msg: Any, replyTo: OutputChannel[Any]): Unit = withLock {
      // assumes a lock is already held for Actor.this
      //state = new Running(thread) // running state will be restored when signal is received
      //debug("being notified of message from " + replyTo)
      mailbox.append(Message(msg, replyTo))
      msgAvailCond.signal()
    }
    override def toString = "BlockedWait on thread: " + thread
  }

  /**
   * Handles common tasks associated with termination via explicit or implied exit
   * Assumes that already synchronized on this
   * @param newState the stop-state to enter
   * @todo move terminate code into an entry action of the Terminated state class
   */
//  protected def terminate(newState: Terminated) {
//    assume(lockIsHeld, "lock must already be held to terminate")
//    if (newState.hasFinished) {
//      // only execute the hook if the actor has actually finished.  This is because
//      // Actor.loop and Actor.loopWhile use it to subvert the termination process
//      // and keep the actor in an active state
//      beforeExitHook()
//    }
//    state = newState
//    notifyLinked(newState.reason)
//    ActorGC.terminated(Actor.this)
//  }

  /**
   * Notify any linked actors that this actor has terminated
   * @param reason the reason why the actor terminated
   * @todo move into the Terminated state class
   */
  private[this] def notifyLinked(reason: Any) {
    //debug("notifying linked actors of exit")
    exiting = true //TODO: get rid of exiting
    // remove this from links
    links = links.remove(this.==) // why is this necessary?
    // exit linked processes
    links.foreach((linked: AbstractActor) => {
      unlink(linked) // unlink to prevent circular notifications
      linked match {
        case linked: StateActor => linked.withLock {
          linked.state match {
            case _: linked.Killing => () // no-op because already shutting down
            case active: linked.Active => linked.exit(this, reason)
            case _ => () // non-active state so no-op
          }
        }
        case _ => if (!linked.exiting) linked.exit(this, reason)
      }

    })
    //debug("done notifying linked actors")
  }


  /*protected[actors]*/ def scheduler: IScheduler = Scheduler //TODO: restrict access to scheduler

  /**
   * Returns the number of messages in this actor's mailbox
   *
   * @return the number of messages in this actor's mailbox
   */
  def mailboxSize: Int = {
    //assert(lockIsHeld, "lock must be held to check mailboxSize")
    mailbox.size
  }

  /**
   * Sends <code>msg</code> to this actor (asynchronous) supplying
   * explicit reply destination.
   *
   * @param  msg      the message to send
   * @param  replyTo  the reply destination
   */
  def send(msg: Any, replyTo: OutputChannel[Any]) = withLock {
    //debug(replyTo + " sending message to " + this + " which in state " + state)
    state.receiveMessage(msg, replyTo)
  }

  /**
   * Receives a message from this actor's mailbox.
   *
   * @param  f    a partial function with message patterns and actions
   * @return      result of processing the received value
   * @throws InvalidStateForOperation is the actor is not in a Running state
   */
  def receive[R](f: PartialFunction[Any, R]): R = receiveWithin(-1L)(f)

  /**
   * Receives a message from this actor's mailbox within a certain
   * time span.
   *
   * @param  msec the time span before timeout
   * @param  f    a partial function with message patterns and actions
   * @return      result of processing the received value
   * @throws InvalidStateForOperation if the actor is not in a Running state
   */
  def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = withLock {
    state match {
      case rs: Running => rs.blockAndWaitFor(f, msec)
      case _ => throw new InvalidStateForOperation("receive called while actor not running: " + state, state)
    }
  }

  /**
   * Receives a message from this actor's mailbox.
   * <p>
   * This method never returns. Therefore, the rest of the computation
   * has to be contained in the actions of the partial function.
   *
   * @param  f    a partial function with message patterns and actions
   * @throws InvalidStateForOperation if the actor is not in a Running state
   */
  def react(f: PartialFunction[Any, Unit]): Nothing = reactWithin(-1L)(f)

  /**
   * Receives a message from this actor's mailbox within a certain
   * time span.
   * <p>
   * This method never returns. Therefore, the rest of the computation
   * has to be contained in the actions of the partial function.
   * </p>
   *
   * @param  msec the time span before timeout
   * @param  f    a partial function with message patterns and actions
   * @throws InvalidStateForOperation if the actor is not in a Running state
   */
  def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = withLock {
    state match {
      case ls: Looping => {
        //optimize for the looping case where handling recursion is not
        //is not entirely necessary
        ls.detachAndWaitFor(f, msec)
      }
      case rs: Running => {
        state = new BasicRunning(rs.thread, () => {
          state match {
            case rs: Running => {
              rs.detachAndWaitFor(f, msec)
              //if (state.isSuspended) throw ActorSuspended else throw ReturnControl
            }
          }
        })
      }
      case _ => throw new InvalidStateForOperation("cannot reactWithin in state: " + state, state)
    }
    if (state.isSuspended) throw ActorSuspended else throw ReturnControl
  }


  def reactWhile(cond: => Boolean)(f: PartialFunction[Any, Unit]): Nothing = {
    //loopWhile(cond)(react(f))
    withLock { state = new ReactLooping(f, () => cond, Thread.currentThread) }
    //throw ReturnControl // now thrown by ReactLooping.enter()
    throw new RuntimeException("should never get here")
  }

  def loop(body: => Unit): Nothing = loopWhile(true)(body)

  /**
   *
   */
  def loopWhile(cond: => Boolean)(body: => Unit): Nothing = {
    withLock { state = new InitialLooping(() => cond, Thread.currentThread, () => body) }
    throw ReturnControl
  }

  /**
   * The behavior of an actor is specified by implementing this
   * abstract method. Note that the preferred way to create actors
   * is through the <code>actor</code> method
   * defined in object <code>Actor</code>.
   */
  def act(): Unit

  /**
   * Sends <code>msg</code> to this actor (asynchronous).
   */
  def !(msg: Any) {
    // self is the actor that is currently running, which may or may not be
    // this actor
    send(msg, Actor.self)
  }

  /**
   * Forwards <code>msg</code> to this actor (asynchronous).
   * @todo should forward be protected?
   */
  def forward(msg: Any) {
    send(msg, Actor.sender) // Actor.sender is the sender of the currently executing actor
  }

  /**
   * Sends <code>msg</code> to this actor and awaits reply
   * (synchronous).
   *
   * @param  msg the message to be sent
   * @return     the reply
   */
  def !?(msg: Any): Any = {
    // "self" here does not equal this unless the actor is messaging itself, which
    // in this case would be a very bad idea
    assert(Actor.self ne this, "an actor synchronously messaging itself will cause deadlock")
    val replyCh = Actor.self.freshReplyChannel
    send(msg, replyCh)
    replyCh.receive {
      case x => x
    }
  }

  /**
   * Sends <code>msg</code> to this actor and awaits reply
   * (synchronous) within <code>msec</code> milliseconds.
   *
   * @param  msec the time span before timeout
   * @param  msg  the message to be sent
   * @return      <code>None</code> in case of timeout, otherwise
   *              <code>Some(x)</code> where <code>x</code> is the reply
   */
  def !?(msec: Long, msg: Any): Option[Any] = {
    val replyCh = Actor.self(scheduler).freshReplyChannel
    send(msg, replyCh)
    replyCh.receiveWithin(msec) {
      case TIMEOUT => None
      case x => Some(x)
    }
  }

  /**
   * Sends <code>msg</code> to this actor and immediately
   * returns a future representing the reply value.
   */
  def !!(msg: Any): Future[Any] = {
    val ftch = new Channel[Any](Actor.self(scheduler))
    send(msg, ftch)
    new Future[Any](ftch) {
      def apply() =
        if (isSet) value.get
        else ch.receive {
          case any => value = Some(any); any
        }
      def respond(k: Any => Unit): Unit =
 	if (isSet) k(value.get)
 	else ch.react {
 	  case any => value = Some(any); k(any)
 	}
      def isSet = value match {
        case None => ch.receiveWithin(0) {
          case TIMEOUT => false
          case any => value = Some(any); true
        }
        case Some(_) => true
      }
    }
  }

  /**
   * Sends <code>msg</code> to this actor and immediately
   * returns a future representing the reply value.
   * The reply is post-processed using the partial function
   * <code>f</code>. This also allows to recover a more
   * precise type for the reply value.
   */
  def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = {
    val ftch = new Channel[A](Actor.self(scheduler))
    send(msg, new OutputChannel[Any] {
      def !(msg: Any) =
        ftch ! f(msg)
      def send(msg: Any, replyTo: OutputChannel[Any]) =
        ftch.send(f(msg), replyTo)
      def forward(msg: Any) =
        ftch.forward(f(msg))
      def receiver =
        ftch.receiver
    })
    new Future[A](ftch) {
      def apply() =
        if (isSet) value.get.asInstanceOf[A]
        else ch.receive {
          case any => value = Some(any); value.get.asInstanceOf[A]
        }
      def respond(k: A => Unit): Unit =
 	if (isSet) k(value.get.asInstanceOf[A])
 	else ch.react {
 	  case any => value = Some(any); k(value.get.asInstanceOf[A])
 	}
      def isSet = value match {
        case None => ch.receiveWithin(0) {
          case TIMEOUT => false
          case any => value = Some(any); true
        }
        case Some(_) => true
      }
    }
  }

  /**
   * Replies with <code>msg</code> to the sender.
   * reply will throw an exception if there is no sender
   * @todo why isn't reply protected instead of public?
   */
  def reply(msg: Any) {
    sender ! msg
  }

  //private var rc: Channel[Any] = null
  //private[actors] def replyChannel = rc

  /**
   * Receives the next message from this actor's mailbox.
   * This will block if there are no messages in the mailbox.
   * @todo why is ? public?  It probably should be protected
   */
  def ? : Any = receive {
    case x => x
  }

  /**
   * The sender of the message currently being processed.
   * If no message is being processed, this method will throw an exception.
   * @todo make sender throw a meaningful exception
   */
  /*protected*/ def sender: OutputChannel[Any] = sessions.head

  // this is inherited from OutputChannel so it cannot be made protected
  def receiver: StateActor = this

//  /**
//   * This function is run prior to actor exit.
//   * It is used by <code>Actor.loop</code> in combination with <code>seq</code>
//   * in order to hijack the exit process to reschedule the reaction before
//   * the actor is actually terminated.
//   * <code>beforeExitHook</code> will be executed with the lock held.
//   */
//  private var beforeExitHook: () => Unit = () => {}


  /**
   * Starts this actor.
   */
  def start(): StateActor = withLock {
    state match {
      case NotStarted => NotStarted.start()
    }
    this
  }

  private var links: List[AbstractActor] = Nil

  /**
   * Links <code>self</code> to actor <code>to</code>.
   *
   * @param to ...
   * @return   ...
   */
  def link(to: AbstractActor): AbstractActor = withLock {
    assert(Actor.self == this, "link called on actor different from self")
    links = to :: links
    to.linkTo(this)
    to
  }

  /**
   * Links <code>self</code> to actor defined by <code>body</code>.
   */
  def link(body: => Unit): Actor = {
    assert(Actor.self == this, "link called on actor different from self")
    val a = new Actor {
      def act() = body
      override final val scheduler: IScheduler = StateActor.this.scheduler
    }
    link(a)
    a.start()
    a
  }

  //TODO: restrict access to linkTo
  /*private[actors]*/ def linkTo(to: AbstractActor) = withLock {
    links = to :: links
  }

  /**
   * Unlinks <code>self</code> from actor <code>from</code>.
   */
  protected def unlink(from: AbstractActor) = withLock {
    assert(Actor.self == this, "unlink called on actor different from self")
    links = links.remove(from == _)
    from.unlinkFrom(this)
  }

  //TODO: restrict access to unlinkFrom
  /*private[actors]*/ def unlinkFrom(from: AbstractActor) = withLock {
    links = links.remove(from == _)
  }

  /**
   * Controls how the termination of linked actors is handled.
   * If trapExit is true, then when a linked actor exits an Exit message is
   * asynchronously sent to this actor, regardless of the reason for termination.
   * When trapExit is false and the reason for exit is not 'normal, then this
   * actor is terminated.  If the exit reason is 'normal, then nothing is done.
   * @todo trapExit should not be public
   * @todo should trapExit be associated with a given link instead of with all links?
   */
  var trapExit = false
  // exitReason held the reason for termination, or 'normal if the actor has
  // not terminated (or terminated normally).  exitReason has been replaced
  // with reason on the Terminated states.
  //private[actors] var exitReason: AnyRef = 'normal
  // shouldExit was used to inform the actor that it should exit at its earliest
  // convenience.  This has been replaced with state changes.
  //private[actors] var shouldExit = false

  /**
   * <p>
   *   Terminates execution of <code>self</code> with the following
   *   effect on linked actors:
   * </p>
   * <p>
   *   For each linked actor <code>a</code> with
   *   <code>trapExit</code> set to <code>true</code>, send message
   *   <code>Exit(self, reason)</code> to <code>a</code>.
   * </p>
   * <p>
   *   For each linked actor <code>a</code> with
   *   <code>trapExit</code> set to <code>false</code> (default),
   *   call <code>a.exit(reason)</code> if
   *   <code>reason != 'normal</code>.
   * </p>
   * @param reason the reason for exiting the actor
   * @throws InvalidStateForOperation if the actor is not in a Running state
   */
  def exit(reason: Any): Nothing = withLock {
    state match {
      case s: Running => s.exit('reason)
      case _ => throw new InvalidStateForOperation("actor cannot exit from state: " + state, state)
    }
  }

  /**
   * Terminates with exit reason <code>'normal</code>.
   * @throws InvalidStateForOperation if the actor is not in a Running state
   */
  def exit(): Nothing = exit('normal)

  /**
   * Receive notification from a linked actor that it has terminated and why
   * Assume !this.exiting
   * If trapExit is true the this actor is notified of the exit via an Exit message,
   * otherwise kill this actor if it is currently running
   * @param from the actor that is terminating, set to 'normal for normal termination
   * @param reason the reason that the actor has terminated
   * @todo restrict access to exit
   */
  /*private[actors]*/ def exit(from: AbstractActor, reason: Any): Unit = withLock {
    if (trapExit) {
      this ! Exit(from, reason)
    } else if (reason != 'normal) {
      state match {
        case _: Killing => () // no-op because actor is already being killed
        case a: Active => a.kill(reason)
        case _ => () //no-op when this actor is not running
      }
    }
  }

  private[actors] def terminated() {
    scheduler.actorGC.terminated(this)
  }

  private[actors] def onTerminate(f: => Unit) {
    scheduler.actorGC.onTerminate(this) { f }
  }
}
