eengbrec / Scala Enhancements

In-progress enhancements to the Scala standard library, mostly centered around actors.

commit 115: 24c0eb1f167b
parent 97: 26ae498dfea0
branch: actor_state_machine
added interop test that fails to shutdown due to link problem but otherwise works
eengbrec
9 months ago
r115:24c0eb1f167b 1715 loc 62.7 KB embed / history / annotate / raw /
   1
   2
   3
   4
   5
   6
   7
   8
   9
  10
  11
  12
  13
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2009, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

package scalax.actors

import scala.actors.{ BaseActor, IScheduler, Channel, MessageQueue, Message,
                      OutputChannel, Signal, BasicMessageQueue,
                      ReturnControl, ActorExited, ActorSuspended, Future,
                      AbstractActor, Actor, Reaction, TIMEOUT, Scheduler, Exit }

import java.util.TimerTask


@serializable
trait StateActor extends BaseActor {

  /**
   * Base trait for exceptions thrown by actors, provides traceability back to
   * the actor that threw the exception and the thread on which it was running.
   * Note that the instance variables capturing the actor's state when the
   * exception was constructed are initialized without requesting a lock on
   * the actor.  If they are initialized when a lock is not held, then it is
   * possible that they have been initialized to different values than what
   * they were when the error occurred, or even into an inconsistent set.  Always
   * check the value of <code>heldLock</code> to see if the lock was held so
   * that the snapshot of the state can be considered to be consistent.
   */
  trait ActorException extends Exception {
    /**
     * The actor that threw the exception
     */
    final def actor = StateActor.this
    /**
     * The thread on which the actor was running when the exception was thrown
     * (captured as the current thread when the exception is constructed).
     */
    final val thread = Thread.currentThread
    /**
     * The state of the actor when the exception was thrown.  Reads the raw
     * <code>_state</code> field rather than the <code>state</code> accessor,
     * so no lock-held check is performed here.
     */
    final val state = _state
    /**
     * The contents of the mailbox when the exception was initialized, copied
     * into a list in the order in which <code>foldLeft</code> visits the queue
     * (elements are prepended while folding and the result is reversed).
     */
    final val mailbox = StateActor.this.mailbox.foldLeft[List[Any]](Nil)((z, e) => e :: z).reverse
    /**
     * true if the lock was held when this exception was initialized, false otherwise
     */
    final val heldLock = lockIsHeld
    /**
     * The session stack at the time this exception was constructed.
     */
    final val sessions = StateActor.this.sessions
    /**
     * The value of <code>trapExit</code> at the time this exception was constructed.
     */
    final val trapExit = StateActor.this.trapExit
  }

  //class StartReactLoop(val cond: () => Boolean, val pf: PartialFunction[Any, Unit]) extends Signal

  /**
   * Raised when the actor is found in a state other than the expected one,
   * e.g. by <code>ActorState.stateIsValid</code>.  Carries the actor snapshot
   * provided by <code>ActorException</code>.
   * @param msg   description of the unexpected state
   * @param cause the underlying throwable, or <code>null</code> if there is none
   */
  class UnexpectedActorState(msg: String, cause: Throwable)
        extends IllegalStateException(msg, cause) with ActorException {
    /** Convenience constructor for the case where there is no underlying cause. */
    def this(msg: String) = this(msg, null)
  }

  /**
   * Raised when an operation is attempted while the actor is in a state that
   * does not permit it.  The exception message is <code>msg</code> followed by
   * a newline, a tab and the textual form of <code>badState</code>.
   * @param msg      description of the failed operation
   * @param badState the state the actor was in when the operation was attempted
   */
  class InvalidStateForOperation(msg: String, val badState: ActorState)
    extends IllegalStateException(msg + "\n\t" + badState, null) with ActorException

  /**
   * Exception type for an attempted state transition that is not allowed
   * (going by its name; it is not thrown in the part of the file reviewed
   * here -- TODO confirm usage).
   * @param msg description of the rejected transition
   */
  class InvalidStateTransition(msg: String) extends IllegalStateException(msg, null) with ActorException

  /**
   * Lock used for synchronization and signalling of this actor.
   * Using a lock object instead of normal synchronization allows for
   * simpler locking/unlocking abstractions while providing fine-grained
   * control.  ReentrantLocks also provide better performance under contention.
   * @todo refactor lock, withLock, and withoutLock into an object
   */
  private val lock = new java.util.concurrent.locks.ReentrantLock()
  /** true if the calling thread currently holds the actor's lock */
  final protected def lockIsHeld = lock.isHeldByCurrentThread
  /**
   * Condition, created from the actor's lock, used to signal that a message
   * matching a current wait is available on the queue.
   */
  final private val msgAvailCond = lock.newCondition()
  /**
   * Evaluate <code>body</code> with the actor's lock held and return its result.
   * <p>When the calling thread does not own the lock yet, the lock is acquired
   * before <code>body</code> runs and released again once it has finished,
   * whether it returned normally or threw.  When the caller already owns the
   * lock, <code>body</code> simply runs and the lock stays held afterwards.
   * Intended usage:</p>
   * <code>
   *    def foo(): Unit = withLock {
   *       // ...perform some actions requiring a lock
   *       withoutLock {
   *         // ...perform some actions where lock should not be held
   *       }
   *       // ...perform some more actions that require the lock
   *    }
   * </code>
   * <p>This permits far more flexible locking/unlocking patterns while keeping
   * the safety of lexically scoped synchronization.</p>
   * @param body the computation to run while the lock is held
   * @return whatever <code>body</code> evaluates to
   */
  final protected def withLock[R](body: => R): R = {
    // remember whether this invocation is the one responsible for the lock
    val acquiredHere = !lockIsHeld
    if (acquiredHere) lock.lock()
    try body
    finally {
      // release only what this invocation acquired
      if (acquiredHere) lock.unlock()
    }
  }
  /**
   * Evaluate <code>body</code> with the actor's lock released and return its
   * result.  If the calling thread holds the lock on entry it is unlocked
   * (with a single <code>unlock()</code> call) before <code>body</code> runs
   * and locked again afterwards, even when <code>body</code> throws.  If the
   * lock is not held on entry, <code>body</code> just runs.
   * @param body the computation to run while the lock is not held
   * @return whatever <code>body</code> evaluates to
   */
  final protected def withoutLock[R](body: => R): R = {
    val heldOnEntry = lockIsHeld
    if (heldOnEntry) lock.unlock()
    try body
    finally {
      // restore the lock only if the caller owned it when we were invoked
      if (heldOnEntry) lock.lock()
    }
  }

  /** Turns the diagnostic output written by <code>debug</code> on or off. */
  @volatile var debugEnabled = false
  /** Sequence number printed at the start of every debug line; starts at 1. */
  var msgCnt = 1
  /**
   * Write one diagnostic line to the console when <code>debugEnabled</code> is
   * set.  The line consists of the running message number, this actor, the
   * current thread and <code>msg</code>.  The actor's lock and the
   * <code>Console</code> monitor are both held while the line is written and
   * the counter is advanced.
   * @param msg the text to log
   */
  protected def debug(msg: String): Unit = {
    if (debugEnabled) {
      withLock {
        Console.synchronized {
          val line = msgCnt + "  " + this + "\t" + Thread.currentThread + "\t" + msg
          println(line)
          Console.flush()
          msgCnt += 1
        }
      }
    }
  }

  //TODO: a lot of the code that handles waiting for a message could be refactored
  //      into the MessageQueue class.
  //TODO: remove [actors]
  // Queue of received messages that have not been processed yet;
  // ActorState.receiveMessage appends to it.
  /*protected[actors]*/ val mailbox = new BasicMessageQueue[Message[Any]]  //TODO: restrict access to mailbox

  /**
   * Under the actor's lock, ask the mailbox (via <code>extractFirst</code>)
   * for the first message whose content satisfies <code>p</code>.
   * @param p predicate applied to the content of each queued message
   * @return the first matching message, or <code>None</code> if there is none
   */
  protected def availableMessage(p: Any => Boolean): Option[Message[Any]] = withLock {
    val contentMatches = (m: Message[Any]) => p(m.content)
    mailbox.extractFirst(contentMatches)
  }

  /**
   * Handle the first available message matching <code>p</code> with
   * <code>op</code> and return <code>Some(result)</code>; return
   * <code>None</code> when no queued message matches.
   * Whether <code>op</code> runs with the lock depends on the caller: if the
   * lock is held on entry, <code>op</code> runs with it still held; if it is
   * not held on entry, the lock is released before <code>op</code> runs.
   * The actor must be running when it invokes this method.
   * @param p the predicate used to select a message
   * @param op the operation applied to the content of the selected message
   */
  protected def withAvailableMessage[R](p: Any => Boolean)(op: Any => R): Option[R] = {
    // decide up front, before the overload below takes the lock itself
    val releaseForOp = !lockIsHeld
    withAvailableMessage(p, op, releaseForOp)
  }

  /**
   * Under the actor's lock, take the first available message matching
   * <code>p</code> and process it with <code>op</code>.
   * @param p the predicate used to select a message
   * @param op the operation applied to the content of the selected message
   * @param releaseLock true if the lock should be released while <code>op</code> runs
   * @return <code>Some(result of op)</code>, or <code>None</code> if no message matched
   */
  protected def withAvailableMessage[R](p: Any => Boolean, op: Any => R, releaseLock: Boolean): Option[R] = withLock {
//    assume(state.isRunning, "actor must be running in order to process a message")
    availableMessage(p).map(msg => processMessage(msg, op, releaseLock))
  }

  /**
   * Process <code>msg</code> with <code>op</code>.  The lock is released while
   * <code>op</code> runs exactly when the caller did not hold it on entry.
   * @param msg the message to process
   * @param op the operation applied to the message content
   * @return the result of <code>op</code>
   */
  protected def processMessage[R](msg: Message[Any], op: Any => R): R = {
    val releaseForOp = !lockIsHeld
    processMessage(msg, op, releaseForOp)
  }

  /**
   * Apply <code>op</code> to the content of <code>msg</code>.  The sender of
   * the message is pushed onto the session stack for the duration of the call
   * and the top of the stack is removed again afterwards, even if
   * <code>op</code> throws.
   * @param msg the message to process
   * @param op the operation applied to the message content
   * @param releaseLock true to run <code>op</code> without the lock, false to keep it held
   * @return the result of <code>op</code>
   */
  protected def processMessage[R](msg: Message[Any], op: Any => R, releaseLock: Boolean): R = withLock {
//      assume(state.isRunning, "actor must be running in order to process a message")
    sessions = msg.sender :: sessions
    try {
      if (!releaseLock) op(msg.content)
      else withoutLock { op(msg.content) }
    } finally {
      sessions = sessions.tail
    }
  }

  /**
   * Stack of message senders: <code>processMessage</code> pushes the sender of
   * a message before handling it and removes the top entry afterwards.
   */
  private var sessions: List[OutputChannel[Any]] = Nil

  /**
   * The current state of the actor.  It should never be accessed directly;
   * use <code>state</code> and <code>state_=</code> instead.  Initialized from
   * <code>initialState</code> during construction.
   */
  private var _state: ActorState = initialState
  /**
   * Defaults to <code>NotStarted</code>.  Derived classes can override this
   * method in order to specify a different initial state.  This method
   * will only be invoked once per actor upon construction.  It should not access
   * any instance variables within the actor unless they were explicitly
   * initialized in an early initialization block.
   * @return the state assigned to the actor when it is constructed
   */
  protected def initialState: ActorState = NotStarted

  /**
   * The actor's current state.  The caller must hold the lock: without it the
   * state is quite likely to change spontaneously, so the access is guarded by
   * an <code>assume</code> on <code>lockIsHeld</code>.
   * @return the current state
   */
  def state: ActorState = {
    val callerHoldsLock = lockIsHeld
    assume(callerHoldsLock, "lock must be held in order to access state")
    _state
  }
  /**
   * Change the state of the actor to <code>s</code>. The lock must be held.
   * The state change is performed in the following order:
   * <ol>
   *   <li>Perform exit actions and checks on the current state, including
   *       that the transition is valid.</li>
   *   <li>Update the state of the actor</li>
   *   <li>Perform any entry actions and checks for the new state, including
   *       validating that it was entered from an appropriate state</li>
   * </ol>
   * If any step throws a <code>Throwable</code> that is not a
   * <code>Signal</code>, its stack trace is printed, the actor's state is set
   * directly to a <code>TransitionError</code> wrapping it (without running
   * any entry or exit hooks) and that error is thrown.  <code>Signal</code>s
   * propagate unchanged.
   * @param s the new state for the actor, must be a valid state transition
   * @throws TransitionError if the transition fails as described above
   */
  protected def state_=(s: ActorState): Unit = {
    // remembered so a failed transition can report where it started from
    val oldState = _state
    try {
      // the assertion sits inside the try, so a violation is handled by the catch below
      assert(lockIsHeld, "current thread must hold lock on this to modify state")
      debug("state change from " + _state + " to " + s)
      _state.exitTo(s)
      _state = s
      s.enter()
    } catch {
      case t: Throwable if !t.isInstanceOf[Signal] => {
        t.printStackTrace()
        val te = new TransitionError(t, oldState, s)
        //TODO: this could be a problem...
        //if (!s.isInstanceOf[Terminated]) terminate(te) else _state = te
        // assign the field directly: going through state_= would run the hooks again
        _state = te
        throw te
      }
    }
  }

  /**
   * Run the action of <code>ap</code> and keep running follow-up actions for
   * as long as the resulting state asks for it.
   * <p>After <code>ap.performAction()</code> returns (or ends with one of the
   * control signals caught below) the current state is inspected:</p>
   * <ul>
   *   <li>an <code>ActionPending</code> state that is either new or whose
   *       action should repeat, and which is <code>immediate</code>, is run
   *       next;</li>
   *   <li>the same <code>ActionPending</code> state with <code>repeat</code>
   *       false moves the actor to <code>Finished</code>;</li>
   *   <li>a <code>Running</code> state moves the actor to <code>Finished</code>;</li>
   *   <li>any other state ends the loop without a state change.</li>
   * </ul>
   * The current state is read through <code>state</code>, so the caller must
   * hold the lock.
   * @param thread the thread the loop runs on; only passed along to the next iteration
   * @param ap the pending action to perform
   */
  final protected def controlLoop(thread: Thread, ap: ActionPending): Unit = {
    try {
//      debug("performing action in controlLoop")
//      assert(state eq ap)
      ap.performAction()
    } catch {
      // these signals only unwind the stack; the follow-up is decided from the state below
      case ReturnControl =>
      case ActorSuspended => // can't assert state.isSuspended here because state may have changed
                                 // while the exception propagated up the stack
      case ActorExited => //assert(state.hasFinished, "ActorExited signal received by not in finished state")
      case te: TransitionError => //assert(state.isInstanceOf[TransitionError])
    }
    // null means "stop looping"
    val next: ActionPending = state match {
      case nap: ActionPending if ((nap ne ap) || ap.repeat) && nap.immediate => {
        // if we have a new ActionPending state, or the old one wants to have
        // its action repeated, continue control loop
//        debug("restarting control loop")
        //controlLoop(thread, nap)  // bug #1672 makes this a bad idea
        nap
      }
      case nap: ActionPending if (nap eq ap) && !ap.repeat => {
        // no state change and action not supposed to be repeated, so terminate
  //       debug("New state " + state + ": " + (nap ne ap) + " Repeat: " + ap.repeat + " Immediate: " + nap.immediate)
        // this is the case where we had no state change and the ActionPending should not repeat
        state = new Finished()
        null
      }
      case r: Running => {
        // the actor finished running and has no action to perform
        //debug("Repear")
//        debug(state.toString)
        state = new Finished()
        null
      }
      case t: Terminated => null // noop, already terminated
      case _ => null //debug("exiting control loop in state: " + state)
    }
    // the recursive call is the last statement of this final method
    if (next ne null) controlLoop(thread, next)
  }


  /**
   * Contract shared by every state of the actor's state machine: a set of
   * lifecycle predicates, the entry/exit hooks invoked by <code>state_=</code>,
   * and the handling of a message that arrives while the actor is in the state.
   */
  trait ActorState {
    /** whether the actor is waiting to receive a message */
    def isWaiting: Boolean
    /** whether the actor currently has no thread attached */
    def isDetached: Boolean
    /** whether the actor is currently attached to a thread */
    def isAttached: Boolean
    /** started and not yet terminated; says nothing about whether it is running */
    def isActive: Boolean
    /** started, not terminated, and executing on a thread right now */
    def isRunning: Boolean
    /**
     * Whether the actor is suspended, i.e. both detached and active.  That
     * usually means it is waiting either for a message or for the scheduler
     * to hand it a thread.
     */
    final def isSuspended: Boolean = isDetached && isActive

    // lifecycle milestones, supplied by the concrete states
    def hasStarted: Boolean
    def hasFinished: Boolean
    def hasTerminated: Boolean

    /**
     * Verify that this state object is the actor's current state.
     * The lock must be held (reading <code>state</code> checks for it).
     * @param throwException if true, a mismatch raises an exception instead of returning false
     * @return true if the actor is currently in this state, false if not
     * @throws UnexpectedActorState if this is not the current state and throwException is true
     */
    def stateIsValid(throwException: Boolean): Boolean = {
      if (state eq this) true
      else if (!throwException) false
      else throw new UnexpectedActorState("Unexpected state " + state + ", expected: " + this)
    }

    /** Hook run by <code>state_=</code> right after this state has become the current one. */
    def enter(): Unit
    /** Hook run by <code>state_=</code> on the current state before it is replaced by <code>s</code>. */
    def exitTo(s: ActorState): Unit

    /**
     * Accept a message while the actor is in this state; the default wraps
     * it together with its reply channel and appends it to the mailbox.
     * @param msg the message content
     * @param replyTo the channel recorded as the sender of the message
     */
    def receiveMessage(msg: Any, replyTo: OutputChannel[Any]): Unit = {
      // the lock should be acquired prior to accessing the state of the actor
//      assume(lockIsHeld, "lock on Actor.this must already be held")
//      assume(stateIsValid(true))
      //debug("receiving message and appending to mailbox: " + msg)
      val envelope = Message(msg, replyTo)
      mailbox.append(envelope)
    }

// Sometimes it's handy for debugging to know where a state was created
//    final val enterTrace: List[StackTraceElement] = if (!debugEnabled) Nil else {
//      val trace = Thread.currentThread.getStackTrace()
//      trace.take(10).toList
//    }
  }

  /**
   * State of an actor that has not been started yet (the default
   * <code>initialState</code>).  It can only be left, never entered.
   */
  object NotStarted extends ActorState {
    override def toString = "NotStarted"

    // nothing has happened yet: every lifecycle milestone is still ahead
    def hasStarted = false
    def hasFinished = false
    def hasTerminated = false
    def isActive = false
    def isRunning = false
    def isWaiting = false

    // no thread is associated with an actor that has not started
    def isDetached = true
    def isAttached = false

    /** Always fails: no transition may lead back into this state. */
    def enter(): Unit =
      throw new IllegalStateException("NotStarted should never be entered")

    /** Leaving is currently unchecked; the intended targets are listed in the disabled assertion. */
    def exitTo(s: ActorState): Unit = {
//      assert(s.isInstanceOf[StartScheduled] || s.isInstanceOf[InternalError]
//             || s.isInstanceOf[TransitionError])
    }

    /**
     * Start the actor: under the lock, move it into a fresh
     * <code>StartScheduled</code> state.
     * @return the newly entered state
     */
    def start(): Active = withLock {
//      assume(stateIsValid(true))
      //debug("starting actor")
      val scheduled = new StartScheduled
      state = scheduled
      scheduled
    }
  }

  /**
   * A state that represents an active actor: one that has started and has
   * neither finished nor terminated.
   * @todo move sessions into the Active state
   *       Moving sessions into the Active state will be a challenge because in
   *       order to allow the use of <code>sender</code> in <code>isDefinedAct</code>
   *       sessions have to be manipulated independently of state changes so that
   *       the presence of a session can be faked while a message is being checked
   *       to see if it matches.
   */
  trait Active extends ActorState {
    final def isActive = true
    final def hasStarted = true
    final def hasFinished = false
    final def hasTerminated = false
    /** Kill the actor with the reason <code>'normal</code>. */
    final def kill(): Unit = kill('normal)
    /**
     * Kill the actor.
     * @param reason the reason for killing the actor
     */
    def kill(reason: Any): Unit
  }

  /**
   * An Active actor that is detached from a thread while it waits for something
   * to happen.  Usually it is either waiting to receive a message or waiting
   * for the scheduler to give it a thread on which to execute.
   */
  trait Suspended extends Active {
    final def isDetached = true
    final def isAttached = false
  }

  /**
   * An active state that carries an action to be run by
   * <code>controlLoop</code>.
   */
  trait ActionPending extends Active {
    /** an action to perform for this state with the action loop */
    def performAction(): Unit
    /**
     * true if the action should be repeated if the state doesn't change,
     * false if after the action has completed and there has not been a state
     * change the actor should enter the <code>Finished</code> state.
     */
    def repeat: Boolean

    /**
     * true if <code>controlLoop</code> may run this state's action directly
     * as its next iteration; when false the control loop stops instead of
     * continuing with this state.
     */
    def immediate: Boolean
  }


  /**
   * The actor is detached and waiting for an action to be executed.
   * States that represent an actor that is waiting for a thread (e.g. <code>StartScheduled</code>
   * and <code>ContinuationScheduled</code>) should extend this class.
   * <p>Entering the state runs <code>derivedEnter</code> and then submits the
   * state itself to the scheduler; <code>performReaction</code> drives the
   * control loop and converts failures into terminal states.</p>
   * @todo this eliminates the need for the <code>Reaction</code> class
   * @todo refactor actor shutdown-on-error handling
   */
  abstract class ActionScheduled extends Reaction with Suspended with ActionPending {

    // presumably the actor reference expected by Reaction -- TODO confirm
    val a = StateActor.this

    /** Scheduled actions are never continued directly by the control loop. */
    final def immediate = false
    def repeat = false

    final def isRunning = false

    final def isWaiting = false

    /** Run the subclass entry hook, then hand this state to the scheduler for execution. */
    final def enter(): Unit = {
      derivedEnter()
      scheduler.execute(this)
    }

    /** Delegate exit handling to the subclass hook. */
    final def exitTo(s: ActorState): Unit = {
      derivedExitTo(s)
      //assert(hasRun || s.hasTerminated, "reaction cannot transition to non-terminal state until is has run")
    }

    /** Subclass hook run at the start of <code>enter</code>, before scheduling. */
    protected def derivedEnter(): Unit
    /** Subclass hook run from <code>exitTo</code>. */
    protected def derivedExitTo(s: ActorState): Unit

    //TODO: submit a bug report for this because the access in run() results in
    //      an IllegalAccessError
    /** set to true once <code>performReaction</code> has started */
    /*private[this]*/ var hasRun = false

    /**
     * Run the scheduled reaction.
     * <p>With the lock held: does nothing if the actor is already
     * <code>Killed</code>; otherwise runs <code>controlLoop</code> starting
     * with this state.  An <code>InterruptedException</code> completes a kill
     * in progress (<code>Killing</code> becomes <code>Killed</code>) or else
     * becomes <code>UnhandledThrowable</code>.  Any other throwable is logged
     * once and becomes <code>UnhandledThrowable</code> when the actor was
     * <code>Running</code>, or <code>InternalError</code> otherwise.</p>
     */
    protected def performReaction(): Unit = {
      withLock {
//        assert(!hasRun, "reaction has already ran")
        //TODO: include current thread in debug output
        //debug("peforming reaction " + this + " on thread " + Thread.currentThread)
        hasRun = true
        state match {
          case _: Killed => () // actor has been killed, do nothing
          case _ => {
            //val runningState = new Running(currentThread)
            try {
//              assert(stateIsValid(true)) // perform assertion within try block so if fails shutdown will occur
              controlLoop(Thread.currentThread, this)
            } catch {
              case ie: InterruptedException => {
                Thread.interrupted() // clear the interrupted status of this thread
                state match {
                  // the exception was (most likely) thrown as part of the actor kill process,
                  // or if it wasn't the actor is supposed to be killed anyway, so
                  // finish the job
                  case k: Killing => {
                    state = new Killed(k.reason)
                  }
                  case _ => {
                    ie.printStackTrace()
                    state = new UnhandledThrowable(ie)
                  }
                }
              }
              //TODO: is catching all throwables too much?  Errors like assertions
              case t: Throwable => {
                println("unhandled throwable caught in reaction, current state: " + state)
                // the trace is logged exactly once here, for every branch below
                //TODO: find a better way to log the stack trace
                t.printStackTrace()
                state match {
                  case rs: Running =>  {
                    // there was an unhandled Exception within the actor, exit
                    //TODO: it should be possible to determine if the exception orginated
                    //      in actor code or user code
                    state = new UnhandledThrowable(t)
                  }
                  case _ => {
                    val uas =  new UnexpectedActorState("Unexpected state: " + state, t)
                    val ie = new InternalError(uas)
                    //TODO: this looks like a dangerous hack...figure out if it is ok
                    // a terminated state is overwritten directly, bypassing the transition hooks
                    if (!state.hasTerminated) state = ie else _state = ie
                  }
                }
              }
            }
          }
        }
      } // withLock
      //The SingleThreadedScheduler still holds the lock when the reaction finishes
      //because the actor runs on the thread that started it rather than a different
      //one.
      //assert(!lockIsHeld, "when reaction is finished lock should no longer be held")
    }
//    /**
//     * Implementing classes should provide an implementation for this method.
//     * <code>performReaction</code> will be invoked with the lock held, so if
//     * any part of this method should be executed without the lock, then it
//     * should be in a <code>withoutLock { ... }</code> block.
//     */
//    protected def performReaction(): Unit
    /**
     * Check that the actor is in a <code>Running</code> state bound to the
     * current thread.
     * @return true when the check passes; every other outcome throws
     * @throws IllegalStateException if the actor is running on a different thread
     * @throws InvalidStateForOperation if the actor is not in a Running state
     */
    protected def inRunningState(): Boolean = {
      state match {
        case r: Running if (Thread.currentThread eq r.thread) => true //all good!
        case r: Running => throw new IllegalStateException("attempt to run on the wrong thread")
        case _ => throw new InvalidStateForOperation(
                   "Actor must be in a Running state to perform a reaction, instead in: " + state, state)
      }
    }
    /**
     * Under the lock, move the actor into the <code>Killed</code> state.
     * @param reason the reason recorded in the Killed state
     */
    /*protected[Actor]*/ def kill(reason: Any): Unit = withLock {
//      assume(stateIsValid(true))
      state = new Killed(reason)
    }
  }

  /**
   * Scheduled state entered by <code>NotStarted.start()</code>.  Its action
   * moves the actor into a <code>BasicRunning</code> state whose body is the
   * actor's <code>act()</code> method, run without the lock.
   */
  final class StartScheduled extends ActionScheduled {
    override def toString = "StartScheduled"

    /** Register the actor with the scheduler's <code>actorGC</code>. */
    protected def derivedEnter(): Unit = {
      // No ActorSuspended signal is needed: this transition is performed on a
      // thread that does not belong to this actor.
      scheduler.actorGC.newActor(StateActor.this)
    }

    /** Leaving is currently unchecked; the intended targets are listed in the disabled assertion. */
    protected def derivedExitTo(s: ActorState): Unit = {
//      assert(s.isInstanceOf[Running] || s.isInstanceOf[Killed] || s.isInstanceOf[InternalError],
//              "StartScheduled can only exit to Running or Killed, attempted to: " + s)
    }

    /** Switch to a running state on the current thread. */
    def performAction(): Unit = {
      val here = Thread.currentThread
      state = new BasicRunning(here, () => withoutLock { act() })
    }
  }

  /**
   * Scheduled state whose action is an arbitrary continuation.
   * @param continuation the code to run when the action is performed; it
   *                     should contain any necessary state changes
   */
  class ContinuationScheduled(val continuation: () => Unit) extends ActionScheduled {
    override def toString = "ContinuationScheduled"

    /** Nothing to do on entry. */
    protected def derivedEnter(): Unit = {
      // An ActorSuspended signal is unnecessary here: the transition happens
      // while the actor is already suspended, on a thread that this actor
      // does not own.
      //debug("Continuation scheduled")
    }

    /** Leaving is currently unchecked; the intended targets are listed in the disabled assertion. */
    protected def derivedExitTo(s: ActorState): Unit = {
//      assert(s.isInstanceOf[Running] || s.isInstanceOf[Killed] ||
//             s.isInstanceOf[InternalError] || s.isInstanceOf[TransitionError],
//              "ContinuationScheduled may only exit to Running or Killed, attempted: " + s)
    }

    /** Invoke the continuation supplied at construction. */
    def performAction(): Unit = continuation.apply()
  }

  /** A function that takes no arguments and does nothing. */
  protected final val noop = () => ()


  /**
   * Base class for the states in which the actor is bound to and executing on
   * <code>thread</code>.
   */
  abstract class Running extends Active {
    /** the thread to which the actor is bound while in this state */
    val thread: Thread
    final def isWaiting = false
    final def isAttached = true
    final def isDetached = false
    final def isRunning = true

    /** Entry action; delegates to <code>derivedEnter</code>. */
    final def enter() {
//      assert(thread eq Thread.currentThread, "attempt enter into " + this + " from wrong thread")
      derivedEnter()
    }

    /** State-specific entry action supplied by the concrete running state. */
    protected def derivedEnter(): Unit

    /**
     * Produce the running state to switch to when the actor resumes.
     * @param continuation the work to be run on resumption
     * @param t the thread on which the actor resumes
     */
    def resumeTo(continuation: () => Unit, t: Thread): Running

    /**
     * Check the mailbox for messages for which <code>pf</code> is defined.  If
     * no messages are available then release the current thread until one
     * becomes available.  Equivalent to the two-argument overload with no timeout.
     * @param pf a partial function used to recognize and process messages
     */
    def detachAndWaitFor(pf: PartialFunction[Any, Unit]): Unit = {
      detachAndWaitFor(pf, -1L)
    }
    /**
     * Check the mailbox for messages for which <code>pf</code> is defined.
     * If no messages are available then release the current thread until
     * an appropriate message is received or the timeout period expires.
     * Note: The currently held thread is not released if there is currently
     *       a message matching <code>pf</code> in the mailbox.
     * @param pf a PartialFunction that will be used to determine if a message
     *           matches and that will be invoked when a matching message is received
     * @param millis the number of milliseconds to wait for a message, or a
     *               negative value to wait without a timeout
     */
    def detachAndWaitFor(pf: PartialFunction[Any, Unit], millis: Long): Unit = {
//      assume(stateIsValid(true))
//      assert(currentThread eq thread,
//             "attempting a detached wait from a thread other than the thread on which the actor is running")
      mailbox.extractFirst((msg) => pf.isDefinedAt(msg.content)) match {
        case Some(msg) => {
          //processMessage(qel, pf, true)
          // can't use processMessage because it doesn't clear sessions
          // sessions need to be cleared so that they don't build up infinitely on
          // recursive reacts and loops
          //TODO: may need to look at a way to clear out sessions on recursive reacts and loops
          sessions = msg.sender :: sessions
          try {
            withoutLock { pf(msg.content) }
          } finally {
            sessions = sessions.tail
          }
        }
        // nothing matched: switch to a detached wait, remembering this state for resumption
        case None => state = new DetachedWait(pf, millis, this)
      }
    }

    /**
     * Block the current thread until a message matching <code>pf</code> is
     * available, then process it.  Equivalent to the two-argument overload
     * with no timeout.
     * @param pf a partial function used to recognize and process messages
     * @return the result of processing the matching message
     */
    def blockAndWaitFor[R](pf: PartialFunction[Any, R]): R = {
      blockAndWaitFor(pf, -1L)
    }

    /**
     * Block the current thread until a message matching <code>pf</code> is
     * available or the timeout expires, then process the message.
     * On timeout a <code>TIMEOUT</code> message is processed if <code>pf</code>
     * is defined for it; otherwise the state becomes <code>Timedout</code> and
     * <code>ActorExited</code> is thrown.
     * @param pf a partial function used to recognize and process messages
     * @param millis the number of milliseconds to wait, or a negative value
     *               to wait without a timeout
     * @return the result of processing the message
     */
    def blockAndWaitFor[R](pf: PartialFunction[Any, R], millis: Long): R = withLock {
      def blockLoop(millis: Long): Message[Any] = {
        state match {
          case k: Killing => state = new Killed(k.reason)
          case _ => ()
        }
        //debug("starting block loop")
        mailbox.extractFirst((msg) => pf.isDefinedAt(msg.content)) match {
          case None => {
            //TODO: move block logic into BlockedWait state
            if (!state.isInstanceOf[BlockedWait[R]]) state = new BlockedWait(pf, thread, millis)
            val start = System.currentTimeMillis()
            if (millis >= 0L) msgAvailCond.await(millis, java.util.concurrent.TimeUnit.MILLISECONDS)
            else msgAvailCond.await()
            // A negative value means "no timeout" and must be carried through
            // unchanged; clamping it to 0 would turn the next iteration into a
            // zero-length timed wait and report a timeout on any wake-up that
            // finds no matching message.
            val rem =
              if (millis < 0L) millis
              else Math.max(millis - (System.currentTimeMillis() - start), 0L)
            if (millis >= 0L && rem == 0L) {
              if (pf.isDefinedAt(TIMEOUT)) {
                val r: Message[Any] = Message(TIMEOUT, StateActor.this)
                state = resumeTo(noop, thread)
                r
              } else {
                state = new Timedout
                throw ActorExited
              }
            } else {
              blockLoop(rem)
            }
          }
          case Some(msg) => {
            // only switch back to a running state if the state was changed while blocking
            if (state ne this) state = resumeTo(noop, thread)
            msg
          }
        }
      }
//      assume(stateIsValid(true))
//      assume(currentThread eq thread, "attempting to block on wrong thread")
      val qel = blockLoop(millis)
      //debug("have message")
      //assume(stateIsValid(true)) // this isn't necessarily true because the state
                                   // often changes during the blocking process
//      assert(state.isRunning)
      processMessage(qel, pf, true) //release the lock when processing the message
    }
    /**
     * exit for the specified reason
     * Sets the <code>state</code> to <code>Finished</code> and then throws
     * <code>ActorExited</code>.
     * @param reason the reason for exiting
     */
    private[StateActor] final def exit(reason: Any): Nothing = /*withLock*/ {
//      assume(stateIsValid(true))
//      assume(currentThread eq thread,
//             "exit can only be invoked on the thread on which the actor is running")
      state = new Finished(reason)
      throw ActorExited
    }
    /**
     * exit for reason <code>'normal</code>
     * Sets the <code>state</code> to <code>Finished</code>.
     */
    private[StateActor] final def exit(): Nothing = exit('normal)
    /**
     * Kill the actor.  When invoked on the actor's own thread the state
     * becomes <code>Killed</code>; otherwise the state becomes
     * <code>Killing</code> and the actor's thread is interrupted.
     * @param reason the reason that the actor is being killed
     */
    def kill(reason: Any): Unit = withLock {
//      assume(stateIsValid(true))
      if (Thread.currentThread == thread) {
        // the kill was issued from the thread to which the actor is bound, so
        // the actor can be killed by simply throwing an exception to return
        // the thread to the scheduler
        state = new Killed(reason)
      } else {
        // the kill is coming from a different thread from one to which the actor
        // is bound, so the actor need to be put into the Killing state so that
        // the library will know to terminate it when it regains control
        state = new Killing(reason, thread)
        // interrupt the thread in case it is blocked
        thread.interrupt()  //TODO: make sure interrupted state is properly managed by scheduler
      }
    }
  }


  /**
   * Running state that executes the function <code>f</code> on
   * <code>thread</code>.
   * @param thread the thread to which the actor is bound
   * @param f the function invoked by <code>performAction</code>
   */
  case class BasicRunning(val thread: Thread, val f: () => Unit) extends Running with ActionPending {

    override def toString = "BasicRunning"

    final def immediate = true
    final def repeat = false

    /** Nothing to do on entry. */
    protected def derivedEnter(): Unit = {}

    /** Invokes <code>f</code>. */
    def performAction(): Unit = {
      //actionLoop(this)
      f()
    }

    // disabled alternative to calling f() directly, kept for reference:
//    private def actionLoop(s: BasicRunning) {
//      val next: BasicRunning = try {
//        s.f()
//        null
//      } catch {
//        case ReturnControl if state ne s => state match {
//          case br: BasicRunning => br
//          case _ => null
//        }
//      }
//      if (next ne null) actionLoop(next)
//    }

    /** No exit action is performed. */
    def exitTo(s: ActorState): Unit = {
      //debug("exiting running state to: " + s)
      // disabled check, kept for reference:
//      assert(s.isInstanceOf[Wait[_]] /*|| s.isInstanceOf[FunctionScheduled]*/ ||
//             s.isInstanceOf[Killing] || s.isInstanceOf[UnhandledThrowable] ||
//             s.isInstanceOf[Finished] || s.isInstanceOf[InternalError] ||
//             s.isInstanceOf[TransitionError],
//        "Running may only transition to DetachedWait, BlockedWait, Finished, Killing, or UnhandledThrowable; attempted from: " + s)
    }

    /**
     * Returns this state when both the continuation and the thread are the
     * very same objects it already holds; otherwise builds a new
     * <code>BasicRunning</code> for them.
     */
    def resumeTo(continuation: () => Unit, t: Thread): Running = {
      val sameWork = continuation eq f
      val sameThread = t eq thread
      //println("reusing continuation from BasicRunning")
      if (sameWork && sameThread) this // safe to reuse this object
      else new BasicRunning(t, continuation)
    }

  }

  /**
   * Running state that runs a one-off continuation and then installs the
   * state produced by <code>restoreState</code>.
   * @param continuation the work invoked by <code>performAction</code>
   * @param restoreState builds the state to switch to after the continuation
   *                     has returned, given this state's thread
   * @param thread the thread to which the actor is bound
   */
  class RunningContinuation(val continuation: () => Unit, restoreState: Thread => ActorState, val thread: Thread) extends Running with ActionPending {

    protected def derivedEnter() = ()

    def exitTo(s: ActorState) {}
    def immediate = true
    def repeat = false
    /** Runs the continuation, then switches to <code>restoreState(thread)</code>. */
    def performAction() {
      continuation()
      state = restoreState(thread)
    }
    /**
     * Returns this state when both arguments are the very same objects it
     * already holds; otherwise builds a new <code>RunningContinuation</code>
     * around the supplied continuation and thread, keeping
     * <code>restoreState</code>.
     *
     * The first parameter was previously misspelled, so every use of
     * <code>continuation</code> in the body silently referred to this state's
     * own field and the caller's continuation was discarded.
     */
    def resumeTo(continuation: () => Unit, thread: Thread): Running = {
      if ((continuation eq this.continuation) && (thread eq this.thread)) {
        //println("reusing continuation for RunningContinuation")
        this
      } else new RunningContinuation(continuation, restoreState, thread)
    }
  }

  /**
   * Running state that invokes <code>f</code> repeatedly for as long as
   * <code>cond</code> yields true.
   * @param cond the loop condition
   * @param thread the thread to which the actor is bound
   * @param f the loop body
   */
  abstract class Looping(val cond: () => Boolean, val thread: Thread, val f: () => Unit) extends Running with ActionPending {
    final def immediate = true
    final def repeat = cond()  // this leads to multiple evaluations of cond

    /** No exit action is performed. */
    def exitTo(s: ActorState): Unit = {
      //assert(!s.isInstanceOf[RunningContinuation])
    }

    /**
     * Returns this state when asked to resume with its own body on its own
     * thread.  Otherwise returns a <code>RunningContinuation</code> that runs
     * the continuation and then restores a <code>ContinueLooping</code> with
     * the same condition and body.
     */
    override def resumeTo(continuation: () => Unit, t: Thread): Running = {
      val unchanged = (continuation eq f) && (t eq thread)
      if (unchanged) {
        //println("reusing continuation for: " + state)
        this
      } else {
//      previously: new ResumeLooping(continuation, cond, t, f)
        val backToLoop: Thread => ActorState =
          (resumedOn: Thread) => new ContinueLooping(cond, resumedOn, f)
        new RunningContinuation(continuation, backToLoop, t)
      }
    }

    /** Runs the loop body until the condition fails. */
    def performAction(): Unit = {
      // A while loop is used here, rather than having the controlLoop
      // re-invoke performAction, because it makes a significant performance
      // difference.
      while (cond()) {
        try {
          f()
        } catch {
          // Catching the ReturnControl signal here instead of in the
          // controlLoop cuts the cost of ExceptionBlob by more than 50% on a
          // LoopPing benchmark.  It is swallowed only while this is still the
          // current state.
          case ReturnControl if state eq this => ()
        }
      }
    }
  }

  /** Looping state whose entry action throws <code>ReturnControl</code>. */
  class InitialLooping(cond: () => Boolean, thread: Thread, f: () => Unit) extends Looping(cond, thread, f) {
    override def toString: String = "InitialLooping"

    protected def derivedEnter(): Unit = throw ReturnControl
  }

  /** Looping state with no entry action. */
  class ContinueLooping(cond: () => Boolean, thread: Thread, f: () => Unit) extends Looping(cond, thread, f) {
    //TODO: is ContinueLooping actually needed?
    override def toString: String = "ContinueLooping"

    protected def derivedEnter(): Unit = {}
  }


  /**
   * Running state that repeatedly performs a detached wait for messages
   * matching <code>pf</code> for as long as <code>cond</code> yields true.
   * @param pf the partial function used to recognize and process messages
   * @param cond the loop condition
   * @param thread the thread to which the actor is bound
   */
  class ReactLooping(val pf: PartialFunction[Any, Unit], val cond: () => Boolean, val thread: Thread) extends Running with ActionPending {
    /** Entry action: throws <code>ReturnControl</code>. */
    protected def derivedEnter() {
      throw ReturnControl
    }

    def performAction() {
      innerLoop()
    }

    final def immediate = true
    final def repeat = true

    def exitTo(s: ActorState) {}

    /**
     * While the condition holds, wait for a matching message and go round
     * again unless the actor has become suspended.  Once the condition fails
     * the actor finishes.
     */
    protected final def innerLoop() {
      if (cond()) {
        detachAndWaitFor(pf)  //TODO: do we have a locking problem here?
        if (!state.isSuspended) innerLoop()
      } else {
        // Finish with the same reason the no-argument exit() uses.  An empty
        // argument list would be adapted to the unit value and end up as the
        // exit reason.
        state = new Finished('normal)
      }
    }

    /** Always builds a <code>ResumeReactLooping</code> for the continuation and thread. */
    def resumeTo(continuation: () => Unit, t: Thread): Running = {
      new ResumeReactLooping(continuation, pf, cond, t)
    }
    override def toString = "ReactLooping"
  }

  /**
   * ReactLooping variant whose entry action first runs a continuation and
   * then performs the looping action.
   * @param continuation the work to run on entry
   */
  class ResumeReactLooping(val continuation: () => Unit, pf: PartialFunction[Any, Unit],
                           cond: () => Boolean, thread: Thread)
        extends ReactLooping(pf, cond, thread) {
    override def toString: String = "ResumeReactLooping"

    override protected def derivedEnter(): Unit = {
      continuation()
      performAction()
    }

    /**
     * Returns this state when both arguments are the very same objects it
     * already holds; otherwise builds a new <code>ResumeReactLooping</code>.
     */
    override def resumeTo(continuation: () => Unit, t: Thread): Running = {
      val reusable = (continuation eq this.continuation) && (t eq thread)
      //println("reusing continuation for: " + state)
      if (reusable) this
      else new ResumeReactLooping(continuation, pf, cond, t)
    }
  }

  /**
   * The actor is in the process of being killed
   * @param reason the reason that the actor is being killed
   * @param thread the thread to which the actor is currently bound
   * @todo make killing extend Running instead of BasicRunning
   */
  class Killing(val reason: Any, thread: Thread) extends BasicRunning(thread, null) {
    override def toString: String = "Killing for " + reason + " on thread " + thread

    /** No exit action is performed. */
    override def exitTo(s: ActorState): Unit = {
      // disabled check, kept for reference:
//      assert(s.isInstanceOf[Killed] || s.isInstanceOf[InternalError]
//             || s.isInstanceOf[TransitionError],
//             "Killing may only exit to a Killed state, attempted: " + s)
    }

    /**
     * Messages are refused while the actor is being killed.
     * @throws InvalidStateForOperation always
     */
    override def receiveMessage(msg: Any, replyTo: OutputChannel[Any]): Unit =
      throw new InvalidStateForOperation("message cannot be accepted, actor is being killed", state)

    /**
     * A repeated kill must carry the reason the actor is already being killed
     * for; any other reason is rejected with an IllegalStateException because
     * the attempt is likely erroneous.  With a matching reason the state
     * becomes <code>Killed</code> when called on the actor's thread, and the
     * actor's thread is interrupted again otherwise.
     * @param reason the reason that the actor should be killed
     */
    override def kill(reason: Any): Unit = withLock {
//      assume(stateIsValid(true))
      if (reason != this.reason) {
        val complaint = "Attempt to kill " + StateActor.this + " for reason " + reason +
                        " when in the process of being killed for " + this.reason
        throw new IllegalStateException(complaint)
      }
      val onActorThread = Thread.currentThread eq thread
      if (onActorThread) state = new Killed(reason)
      else thread.interrupt() // interrupt the thread again just in case...
    }
  }

  /**
   * Common behaviour of all end-states.
   * An end-state is final: any attempt to exit it fails.
   * @todo It might be reasonable to allow an actor to be restarted
   */
  trait Terminated extends ActorState {
    /** the reason for termination; passed to <code>notifyLinked</code> on entry */
    val reason: Any

    /**
     * <code>Terminated</code> states cannot be exited.
     * @throws InvalidStateForOperation no matter what
     */
    final def exitTo(s: ActorState): Unit =
      throw new InvalidStateForOperation("A terminated state may not be exitted", state)

    /**
     * Entry action: notifies linked actors of the termination reason, then
     * reports the actor as terminated to the scheduler's <code>actorGC</code>.
     */
    final def enter(): Unit = {
      notifyLinked(reason)
      val gc = scheduler.actorGC
      gc.terminated(StateActor.this)
    }

    /** a <code>Terminated</code> actor is never active */
    final def isActive: Boolean = false

    /**
     * a <code>Terminated</code> actor is never attached to a thread.
     * Note that this implies that an actor that is permanently bound to a thread,
     * e.g. an actor that is a subclass of thread (not a good idea, but possible),
     * should terminate the thread when the actor terminates, rather than leaving
     * it lying around (probably a good idea, anyway).
     */
    final def isAttached: Boolean = false

    /**
     * a <code>Terminated</code> actor is never detached, because detached implies
     * that the actor is suspended and may therefore be started up again.
     * @todo this is a somewhat questionable definition of isDetached
     */
    final def isDetached: Boolean = false

    /** a <code>Terminated</code> actor is never running */
    final def isRunning: Boolean = false

    /** an actor must start before it can terminate, so this is always true */
    final def hasStarted: Boolean = true

    /** an actor that has terminated cannot be waiting */
    final def isWaiting: Boolean = false

    /** a <code>Terminated</code> actor always has terminated */
    final def hasTerminated: Boolean = true

    /**
     * A <code>Terminated</code> actor cannot receive messages.
     * Note that this is a change in behavior.  The original Scala Actors library
     * had no formal concept of an actor being terminated and you could send it
     * a message regardless of its state.
     * @throws InvalidStateForOperation because the actor has been terminated
     */
    override final def receiveMessage(msg: Any, replyTo: OutputChannel[Any]): Unit =
      throw new InvalidStateForOperation("actor has already terminated", state)
  }

  /**
   * End-state of an actor that was killed for the given reason.
   * A <code>Killed</code> actor was forcefully terminated, and therefore was
   * not allowed to complete execution and exit normally.
   * @param reason the reason why the actor was killed
   */
  class Killed(val reason: Any) extends Terminated {
    // killed can be reached from pretty much any state
    override def toString: String = "Killed: " + reason

    /** an actor that has been killed has not finished */
    final def hasFinished: Boolean = false
  }

  /**
   * End-state of an actor that finished normally.
   * @param reason the reason why the actor completed
   */
  class Finished(val reason: Any) extends Terminated {
    // disabled check, kept for reference:
//    def enterFrom(s: ActorState): Unit = {
//      assert(s.isInstanceOf[Running],
//             "Finished may only be reached from the Running state, attempted: " + s)
//    }
    override def toString: String = "Finished: " + reason

    /** the actor has finished */
    final def hasFinished: Boolean = true
  }

  /**
   * The actor has terminated due to a timeout.
   * This is a <code>Finished</code> state whose reason is the string "Timedout".
   * @todo Is this really a Finished state or should it be some sort of error state?
   */
  class Timedout extends Finished("Timedout")

  /**
   * End-state reached because a throwable was not handled.
   * @param reason the Throwable that caused the actor to be terminated
   */
  class UnhandledThrowable(override val reason: Throwable) extends Terminated {
    // disabled check, kept for reference:
//    def enterFrom(s: ActorState): Unit = {
//      assert(s.isInstanceOf[Running],
//             "UnhandledThrowable may only be entered from the running State, attempted: " + s)
//    }
    override def toString: String = "UnhandledThrowable: " + reason

    /** the actor has not finished due to an error */
    final def hasFinished: Boolean = false
  }

  /**
   * End-state that is also a RuntimeException, describing a failed
   * transition between two states.
   * @param reason the Throwable that made the transition fail; also used as
   *               the exception's cause
   * @param exitingState the state that was being exited
   * @param enteringState the state that was being entered
   */
  class TransitionError(override val reason: Throwable, val exitingState: ActorState,
                        val enteringState: ActorState)
        extends RuntimeException(
          "Transition from " + exitingState + " to " + enteringState +
            " failed:\n" + reason.getMessage(),
          reason)
        with Terminated {

    /** the string form is the exception message */
    override def toString: String = getMessage()

    /** a failed transition is not a normal finish */
    final def hasFinished: Boolean = false
  }

  /**
   * End-state carrying the Throwable that caused it.
   * @param reason the Throwable that caused the actor to be terminated
   */
  class InternalError(override val reason: Throwable) extends Terminated {
//    override def enterFrom(s: ActorState) {}
    def hasFinished = false
    /**
     * Renders a label, the throwable's message and its stack trace.
     * A throwable without a message is rendered with the text "null".
     */
    override def toString = {
      val sw = new java.io.StringWriter()
      val w = new java.io.PrintWriter(sw)
      w.write("InternalErrror: ")
      // print, unlike write, accepts a null String (it emits "null"), so a
      // throwable without a message no longer makes toString itself throw
      w.print(reason.getMessage())
      reason.printStackTrace(w)
      w.flush()
      sw.toString()
    }
  }

  /**
   * Base class for wait states, such as BlockedWait and DetachedWait.
   * A wait state lasts until a message arrives for which <code>waitingFor</code>
   * is defined.  A wait state should never be entered if there is already a
   * matching message in the mailbox.
   * @param waitingFor the <code>PartialFunction</code> used for checking messages and
   *                   that is applied to the first matching message received
   * @param millis the timeout value associated with the wait
   */
  abstract class Wait[R](val waitingFor: PartialFunction[Any, R], val millis: Long) extends Active {
    /** an actor in a Wait state is always waiting */
    final def isWaiting = true
    /** an actor in a Wait state is never running */
    final def isRunning = false

    /** Entry action; delegates to <code>derivedEnter</code>. */
    final def enter(): Unit = {
      // disabled check, kept for reference:
//      assert(mailbox.get(0)(waitingFor.isDefinedAt(_)) == None,
//             "attempt to enter a Wait state when a matching message is available")
      derivedEnter()
    }

    /** State-specific entry action supplied by the concrete wait state. */
    protected def derivedEnter(): Unit

    /**
     * Tests the incoming message against <code>waitingFor</code>.  A matching
     * message is passed to <code>resume</code>; any other message is appended
     * to the mailbox.
     */
    override def receiveMessage(msg: Any, replyTo: OutputChannel[Any]): Unit = {
//      assume(lockIsHeld)
//      assert(stateIsValid(true))
      //debug("receiving " + msg + " while waiting")
      // replyTo is pushed onto sessions for the duration of the check so that
      // calls to sender made inside waitingFor neither fail nor yield
      // erroneous results
      sessions = replyTo :: sessions
      val matched =
        try {
          waitingFor.isDefinedAt(msg)
        } finally {
          sessions = sessions.tail
        }
      // Appending to the mailbox unconditionally here can make it so the
      // message is processed twice:
      //mailbox.append(msg, replyTo)
      if (matched) {
        //debug("message matched, scheduling execution")
        resume(msg, replyTo)
      } else {
        //debug("message did not match, continuing to wait")
        mailbox.append(Message(msg, replyTo))
      }
    }

    /**
     * Take the actions necessary to make sure the message is processed.
     * Derived classes must implement this method.  The implementation should
     * schedule the continuation of the actor, signal the waiting thread to wake up,
     * or take whatever other actions may be necessary to ensure the processing
     * of the specified message.
     * <code>resume</code> will always be called with the lock held.
     * @param msg the message that was received by the actor
     * @param replyTo the channel from which the message was received and that should
     *                be used for sending replies
     */
    protected def resume(msg: Any, replyTo: OutputChannel[Any]): Unit
  }

  /** Counter incremented each time a <code>DetachedWait</code> state is entered. */
  protected var detachCnt: Long = 0L

  /**
   * The actor is waiting for a message for which <code>waitingFor</code> is
   * defined and is not currently attached to a thread.
   * @param waitingFor the partial function defining both the message that the actor is
   *                   waiting for and the action to be taken when it is received
   * @param millis the number of milliseconds to wait; a timeout is only
   *               scheduled when this is not negative
   * @param prevRunningState the running state, will be used for resumeTo call
   * @todo would it make more sense to pass in a resumeTo function instead of holding on
   *       to the previous running state?
   */
  class DetachedWait(waitingFor: PartialFunction[Any, Unit], millis: Long, val prevRunningState: Running)
        extends Wait(waitingFor, millis) with Suspended {
    /** No exit action is performed. */
    def exitTo(s: ActorState): Unit = {
//      assert(s.isInstanceOf[ContinuationScheduled] || s.isInstanceOf[Killed] ||
//             s.isInstanceOf[InternalError] || s.isInstanceOf[TransitionError] ,
//             "DetachedWait may only exit to ContinuationScheduled or Killed, attempted: " + s)
    }
    /** Switches directly to the <code>Killed</code> state. */
    def kill(reason: Any): Unit = withLock {
//      assume(stateIsValid(true))
      state = new Killed(reason)
    }
    /**
     * Entry action: when a timeout was requested, removes any TIMEOUT messages
     * already in the mailbox and schedules a timer task that sends TIMEOUT to
     * this actor if it is still in this state when the task runs.  Always
     * increments <code>detachCnt</code>.
     */
    protected def derivedEnter() {
      //debug("entering detached wait")
      //TODO: make sure ReactionScheduled doesn't do ANYTHING with the actor
      //      after the reaction has been performed
      if (millis >= 0L) {
        // a TIMEOUT has been requested
        //TODO: TIMEOUTS coming from multiple sources could create some odd interactions
        //      perhaps if pf is not defined for TIMEOUT then a unique message should
        //      be used instead of TIMEOUT so that it does not interfere with other TIMEOUTs
        //      ...unfortunately...how would the receiving actor know about the unique
        //      TIMEOUT in order to listen for it specifically??
        //      ...there could be one timeout object per actor.
        //
        // clean out spurious TIMEOUTS (where would the spurious timeouts come from?)
        while (mailbox.extractFirst((m) => m.content == TIMEOUT) != None) {}
        // NOTE(review): npf is built here but never referenced again in this
        // class; receiveMessage and resume both use waitingFor.  If waitingFor
        // is not defined for TIMEOUT, the fallback handler below therefore
        // looks unreachable -- confirm whether npf was meant to replace
        // waitingFor.
        val npf = {
          if (waitingFor.isDefinedAt(TIMEOUT)) waitingFor
          else waitingFor.orElse[Any, Unit] { // make sure TIMEOUT will be handled
            case TIMEOUT => withLock {
                state = new Timedout
                throw ActorExited
            }
          }
        }
        val tt = new TimerTask {
          // a means to cancel the TimerTask is not required because if
          // the actor has left the wait state no message will be sent
          def run(): Unit = withLock {
            // if the actor is still waiting, send it a timeout message
            // ...if the actor is not waiting, then sending the timeout
            // message will result in a spurious timeout
            if (state eq DetachedWait.this) {
              send(TIMEOUT, StateActor.this)
            }
          }
        }
        Actor.timer.schedule(tt, millis)
      }
      detachCnt += 1L
    }
    /**
     * Switches to a <code>ContinuationScheduled</code> state.  Its continuation
     * asks <code>prevRunningState</code> for the running state to resume to,
     * handing it a function that applies <code>waitingFor</code> to the message
     * with <code>replyTo</code> pushed onto <code>sessions</code> for the
     * duration of the call.
     */
    protected def resume(msg: Any, replyTo: OutputChannel[Any]) {
//      assume(lockIsHeld, "the current thread is expected to hold the lock")
      val c = new ContinuationScheduled(() => {
        // Thread.currentThread is evaluated when the continuation runs, not
        // when resume is called
        state = prevRunningState.resumeTo(() => {
          sessions = replyTo :: sessions
          try {
            //debug("invoking continuation on: " + msg)
            withoutLock { waitingFor(msg) }
          } finally {
            sessions = sessions.tail
          }
        }, Thread.currentThread)
      })
      state = c
      // an exception should NOT be thrown here because the actor is already suspended
      // also, this method is likely to be executed on a thread that is not
      // controlled by the actor library (e.g. main), so the exception will
      // just end up propagating into user code, which is bad.
    }
    override def toString = "DetachedWait"
  }


  /**
   * Wait state in which the actor remains attached to <code>thread</code>
   * while waiting for a message.
   * @param waitingFor the partial function used to recognize the awaited message
   * @param thread the thread to which the actor is bound
   * @param millis the timeout value associated with the wait
   */
  class BlockedWait[R](waitingFor: PartialFunction[Any, R], val thread: Thread, millis: Long)
    extends Wait(waitingFor, millis) {

    override def toString: String = "BlockedWait on thread: " + thread

    final def isAttached = true
    final def isDetached = false

    /** No exit action is performed. */
    def exitTo(s: ActorState): Unit = {
      // disabled check, kept for reference:
//      assert(s.isInstanceOf[Running] || s.isInstanceOf[Killed] ||
//             s.isInstanceOf[InternalError] || s.isInstanceOf[TransitionError],
//             "BlockedWait may only exit to Running or Killed states, attempted: " + s)
    }

    /** Nothing to do on entry. */
    protected def derivedEnter(): Unit = {
      //TODO: do the actual blocking here
      //assert(thread eq Thread.currentThread)
    }

    /**
     * Switches to the <code>Killed</code> state, interrupts the bound thread
     * and then throws <code>ActorExited</code>.
     * NOTE(review): the exception is raised on the calling thread; the other
     * kill implementations visible here do not throw -- confirm this is intended.
     */
    def kill(reason: Any): Unit = withLock {
//      assume(stateIsValid(true))
//      assume(Thread.currentThread ne thread,
//             "nothing should be executed on a thread while the actor bound to it is in a blocked wait")
      state = new Killed(reason)
      // interrupt the thread so it knows it should terminate
      thread.interrupt()
      throw ActorExited
    }

    /**
     * Appends the message to the mailbox and signals <code>msgAvailCond</code>.
     * The state itself is not changed here.
     */
    protected def resume(msg: Any, replyTo: OutputChannel[Any]): Unit = withLock {
      // a lock is assumed to be held already for the enclosing actor
      //state = new Running(thread) // running state will be restored when signal is received
      //debug("being notified of message from " + replyTo)
      mailbox.append(Message(msg, replyTo))
      msgAvailCond.signal()
    }
  }

  /**
   * Handles common tasks associated with termination via explicit or implied exit
   * Assumes that already synchronized on this
   * @param newState the stop-state to enter
   * @todo move terminate code into an entry action of the Terminated state class
   */
//  protected def terminate(newState: Terminated) {
//    assume(lockIsHeld, "lock must already be held to terminate")
//    if (newState.hasFinished) {
//      // only execute the hook if the actor has actually finished.  This is because
//      // Actor.loop and Actor.loopWhile use it to subvert the termination process
//      // and keep the actor in an active state
//      beforeExitHook()
//    }
//    state = newState
//    notifyLinked(newState.reason)
//    ActorGC.terminated(Actor.this)
//  }

  /**
   * Notify any linked actors that this actor has terminated
   * @param reason the reason why the actor terminated
   * @todo move into the Terminated state class
   */
  private[this] def notifyLinked(reason: Any) {
    //debug("notifying linked actors of exit")
    exiting = true //TODO: get rid of exiting
    // strip any self-reference out of the link list (why is this necessary?)
    links = links.remove(this == _)
    // Walk the list as it stands now; unlink() reassigns `links`, but the
    // iteration keeps using the list captured here.
    for (peer <- links) {
      unlink(peer) // unlink to prevent circular notifications
      peer match {
        case other: StateActor =>
          // inspect and act on the peer's state while holding the peer's lock
          other.withLock {
            other.state match {
              case _: other.Killing => () // no-op because already shutting down
              case _: other.Active => other.exit(this, reason)
              case _ => () // non-active state so no-op
            }
          }
        case _ =>
          // other actor implementations: notify unless they are already exiting
          if (!peer.exiting) peer.exit(this, reason)
      }
    }
    //debug("done notifying linked actors")
  }


  // The scheduler used by this actor; defaults to the Scheduler object.
  // link(body) below overrides it in the child actor it creates.
  /*protected[actors]*/ def scheduler: IScheduler = Scheduler //TODO: restrict access to scheduler

  /**
   * Returns the number of messages in this actor's mailbox
   *
   * @return the number of messages in this actor's mailbox
   */
  def mailboxSize: Int =
    // read directly from the mailbox; the lock-held assertion is currently disabled:
    //assert(lockIsHeld, "lock must be held to check mailboxSize")
    mailbox.size

  /**
   * Sends <code>msg</code> to this actor (asynchronous) supplying
   * explicit reply destination.
   *
   * @param  msg      the message to send
   * @param  replyTo  the reply destination
   */
  def send(msg: Any, replyTo: OutputChannel[Any]) = withLock {
    //debug(replyTo + " sending message to " + this + " which in state " + state)
    // delivery is delegated to the current state object, under this actor's lock
    state.receiveMessage(msg, replyTo)
  }

  /**
   * Receives a message from this actor's mailbox.
   *
   * @param  f    a partial function with message patterns and actions
   * @return      result of processing the received value
   * @throws InvalidStateForOperation if the actor is not in a Running state
   */
  // Delegates to receiveWithin with a timeout of -1L.
  // NOTE(review): presumably a negative timeout means "wait indefinitely" -- confirm in blockAndWaitFor.
  def receive[R](f: PartialFunction[Any, R]): R = receiveWithin(-1L)(f)

  /**
   * Receives a message from this actor's mailbox within a certain
   * time span.
   *
   * @param  msec the time span before timeout
   * @param  f    a partial function with message patterns and actions
   * @return      result of processing the received value
   * @throws InvalidStateForOperation if the actor is not in a Running state
   */
  def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = withLock {
    // Only a Running state can perform a blocking receive; everything else is rejected.
    state match {
      case running: Running =>
        running.blockAndWaitFor(f, msec)
      case other =>
        throw new InvalidStateForOperation("receive called while actor not running: " + other, other)
    }
  }

  /**
   * Receives a message from this actor's mailbox.
   * <p>
   * This method never returns. Therefore, the rest of the computation
   * has to be contained in the actions of the partial function.
   *
   * @param  f    a partial function with message patterns and actions
   * @throws InvalidStateForOperation if the actor is not in a Running state
   */
  // Delegates to reactWithin with a timeout of -1L.
  // NOTE(review): presumably a negative timeout means "no timeout" -- confirm in detachAndWaitFor.
  def react(f: PartialFunction[Any, Unit]): Nothing = reactWithin(-1L)(f)

  /**
   * Receives a message from this actor's mailbox within a certain
   * time span.
   * <p>
   * This method never returns. Therefore, the rest of the computation
   * has to be contained in the actions of the partial function.
   * </p>
   *
   * @param  msec the time span before timeout
   * @param  f    a partial function with message patterns and actions
   * @throws InvalidStateForOperation if the actor is not in a Running state
   */
  def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = withLock {
    state match {
      // Looping is tested before Running, so a state matching both takes this branch
      case ls: Looping => {
        // optimize for the looping case, where handling recursion
        // is not entirely necessary
        ls.detachAndWaitFor(f, msec)
      }
      case rs: Running => {
        // Replace the current state with a new BasicRunning on the same thread. The
        // closure re-reads the state when it runs and hands f to detachAndWaitFor.
        // NOTE(review): presumably BasicRunning invokes the closure once it is entered -- confirm
        state = new BasicRunning(rs.thread, () => {
          state match {
            // NOTE(review): there is no default case, so a non-Running state here raises MatchError
            case rs: Running => {
              rs.detachAndWaitFor(f, msec)
              //if (state.isSuspended) throw ActorSuspended else throw ReturnControl
            }
          }
        })
      }
      case _ => throw new InvalidStateForOperation("cannot reactWithin in state: " + state, state)
    }
    // never return normally: throw ActorSuspended if the (possibly replaced) state
    // reports itself suspended, otherwise throw ReturnControl
    if (state.isSuspended) throw ActorSuspended else throw ReturnControl
  }


  // Installs a ReactLooping state built from f, a thunk around cond and the current thread.
  // The method is expected to be left via an exception raised during the state change (see
  // the commented-out throw below); falling through to the end is treated as an error.
  def reactWhile(cond: => Boolean)(f: PartialFunction[Any, Unit]): Nothing = {
    //loopWhile(cond)(react(f))
    // cond is by-name, so wrapping it in () => cond defers its evaluation to whoever calls the thunk
    withLock { state = new ReactLooping(f, () => cond, Thread.currentThread) }
    //throw ReturnControl // now thrown by ReactLooping.enter()
    throw new RuntimeException("should never get here")
  }

  // Loops over body unconditionally: delegates to loopWhile with a condition that is always true.
  def loop(body: => Unit): Nothing = loopWhile(true)(body)

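  /**
   * Installs an <code>InitialLooping</code> state built from <code>cond</code>,
   * the current thread and <code>body</code>, then throws
   * <code>ReturnControl</code>. This method never returns normally.
   *
   * @param cond the loop condition, passed on as a deferred thunk
   * @param body the loop body, passed on as a deferred thunk
   */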
  def loopWhile(cond: => Boolean)(body: => Unit): Nothing = {
    // cond and body are by-name; wrapping them in thunks defers evaluation to the new state
    withLock { state = new InitialLooping(() => cond, Thread.currentThread, () => body) }
    // leave the caller's stack by throwing; the lock has already been released here
    throw ReturnControl
  }

  /**
   * The behavior of an actor is specified by implementing this
   * abstract method. Note that the preferred way to create actors
   * is through the <code>actor</code> method
   * defined in object <code>Actor</code>.
   */
  def act(): Unit

  /**
   * Sends <code>msg</code> to this actor (asynchronous).
   */
  def !(msg: Any) {
    // self is the actor that is currently running, which may or may not be
    // this actor; it is passed on as the reply destination
    send(msg, Actor.self)
  }

  /**
   * Forwards <code>msg</code> to this actor (asynchronous).
   * @todo should forward be protected?
   */
  def forward(msg: Any) {
    // keep the original sender as the reply destination, so a reply bypasses the forwarding actor
    send(msg, Actor.sender) // Actor.sender is the sender of the currently executing actor
  }

  /**
   * Sends <code>msg</code> to this actor and awaits reply
   * (synchronous).
   *
   * @param  msg the message to be sent
   * @return     the reply
   */
  def !?(msg: Any): Any = {
    // The calling actor ("self") must differ from the target: an actor that
    // blocks waiting for its own reply can never produce that reply.
    assert(Actor.self ne this, "an actor synchronously messaging itself will cause deadlock")
    val answerChannel = Actor.self.freshReplyChannel
    send(msg, answerChannel)
    // block on the fresh channel and hand back whatever arrives first
    answerChannel.receive { case answer => answer }
  }

  /**
   * Sends <code>msg</code> to this actor and awaits reply
   * (synchronous) within <code>msec</code> milliseconds.
   *
   * @param  msec the time span before timeout
   * @param  msg  the message to be sent
   * @return      <code>None</code> in case of timeout, otherwise
   *              <code>Some(x)</code> where <code>x</code> is the reply
   */
  def !?(msec: Long, msg: Any): Option[Any] = {
    val answerChannel = Actor.self(scheduler).freshReplyChannel
    send(msg, answerChannel)
    // wait at most msec for a reply; the TIMEOUT marker maps to None
    answerChannel.receiveWithin(msec) {
      case TIMEOUT => None
      case answer  => Some(answer)
    }
  }

  /**
   * Sends <code>msg</code> to this actor and immediately
   * returns a future representing the reply value.
   */
  def !!(msg: Any): Future[Any] = {
    val replyChannel = new Channel[Any](Actor.self(scheduler))
    send(msg, replyChannel)
    new Future[Any](replyChannel) {
      // blocking access: return the cached value, or wait on the channel and cache the reply
      def apply() =
        if (isSet) value.get
        else ch.receive {
          case reply => value = Some(reply); reply
        }

      // event-based access: run k on the cached value, or react to the reply and cache it
      def respond(k: Any => Unit): Unit =
        if (isSet) k(value.get)
        else ch.react {
          case reply => value = Some(reply); k(reply)
        }

      // true once a reply is cached; otherwise polls the channel with a zero timeout
      def isSet =
        if (value.isDefined) true
        else ch.receiveWithin(0) {
          case TIMEOUT => false
          case reply => value = Some(reply); true
        }
    }
  }

  /**
   * Sends <code>msg</code> to this actor and immediately
   * returns a future representing the reply value.
   * The reply is post-processed using the partial function
   * <code>f</code>. This also allows to recover a more
   * precise type for the reply value.
   */
  def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = {
    val resultChannel = new Channel[A](Actor.self(scheduler))
    // reply destination that runs every reply through f before passing it to resultChannel
    val converting = new OutputChannel[Any] {
      def !(msg: Any) = resultChannel ! f(msg)
      def send(msg: Any, replyTo: OutputChannel[Any]) = resultChannel.send(f(msg), replyTo)
      def forward(msg: Any) = resultChannel.forward(f(msg))
      def receiver = resultChannel.receiver
    }
    send(msg, converting)
    new Future[A](resultChannel) {
      // blocking access: return the cached value, or wait on the channel and cache the reply
      def apply() =
        if (isSet) value.get.asInstanceOf[A]
        else ch.receive {
          case reply => value = Some(reply); value.get.asInstanceOf[A]
        }

      // event-based access: run k on the cached value, or react to the reply and cache it
      def respond(k: A => Unit): Unit =
        if (isSet) k(value.get.asInstanceOf[A])
        else ch.react {
          case reply => value = Some(reply); k(value.get.asInstanceOf[A])
        }

      // true once a reply is cached; otherwise polls the channel with a zero timeout
      def isSet =
        if (value.isDefined) true
        else ch.receiveWithin(0) {
          case TIMEOUT => false
          case reply => value = Some(reply); true
        }
    }
  }

  /**
   * Replies with <code>msg</code> to the sender.
   * reply will throw an exception if there is no sender
   * @todo why isn't reply protected instead of public?
   */
  def reply(msg: Any) {
    // sender is the head of sessions (see sender below); the message is sent to it asynchronously
    sender ! msg
  }

  //private var rc: Channel[Any] = null
  //private[actors] def replyChannel = rc

  /**
   * Receives the next message from this actor's mailbox.
   * This will block if there are no messages in the mailbox.
   * @todo why is ? public?  It probably should be protected
   */
  def ? : Any = receive {
    // accept any message and return it unchanged
    case x => x
  }

  /**
   * The sender of the message currently being processed.
   * If no message is being processed, this method will throw an exception.
   * @todo make sender throw a meaningful exception
   */
  // Returns the first element of sessions.
  // NOTE(review): presumably sessions is empty when no message is being processed, in which
  // case head raises the collection's own exception -- confirm against the sessions declaration.
  /*protected*/ def sender: OutputChannel[Any] = sessions.head

  // this is inherited from OutputChannel so it cannot be made protected
  // The receiver of messages sent to this channel is the actor itself.
  def receiver: StateActor = this

//  /**
//   * This function is run prior to actor exit.
//   * It is used by <code>Actor.loop</code> in combination with <code>seq</code>
//   * in order to hijack the exit process to reschedule the reaction before
//   * the actor is actually terminated.
//   * <code>beforeExitHook</code> will be executed with the lock held.
//   */
//  private var beforeExitHook: () => Unit = () => {}


  /**
   * Starts this actor.
   */
  /**
   * Starts this actor by delegating to <code>NotStarted.start()</code> while
   * holding the lock.
   *
   * @return this actor, so that calls can be chained
   * @throws InvalidStateForOperation if the actor is not in the NotStarted state
   */
  def start(): StateActor = withLock {
    state match {
      case NotStarted => NotStarted.start()
      // Previously any other state fell through the match and surfaced as a bare
      // MatchError; report it the same way the other state-dependent operations do.
      case _ => throw new InvalidStateForOperation("actor cannot be started from state: " + state, state)
    }
    this
  }

  // Actors currently linked to this one; reassigned by link, linkTo, unlink, unlinkFrom
  // and notifyLinked.
  private var links: List[AbstractActor] = Nil

  /**
   * Links <code>self</code> to actor <code>to</code>.
   *
   * @param to the actor to link with
   * @return   the actor <code>to</code>
   */
  def link(to: AbstractActor): AbstractActor = withLock {
    // only the currently executing actor may link itself to another actor
    assert(Actor.self == this, "link called on actor different from self")
    // record the link on this side ...
    links = to :: links
    // ... and ask the other actor to record the reverse link
    to.linkTo(this)
    to
  }

  /**
   * Links <code>self</code> to actor defined by <code>body</code>.
   */
  def link(body: => Unit): Actor = {
    // same precondition that link(to) checks again below
    assert(Actor.self == this, "link called on actor different from self")
    // wrap body in a new actor that uses this actor's scheduler
    val a = new Actor {
      def act() = body
      override final val scheduler: IScheduler = StateActor.this.scheduler
    }
    // establish the link before the new actor starts running
    link(a)
    a.start()
    a
  }

  //TODO: restrict access to linkTo
  // Records one side of a link: adds `to` to this actor's link list under the lock.
  // link(to) calls this on the other actor to record the reverse direction.
  /*private[actors]*/ def linkTo(to: AbstractActor) = withLock {
    links = to :: links
  }

  /**
   * Unlinks <code>self</code> from actor <code>from</code>.
   */
  protected def unlink(from: AbstractActor) = withLock {
    // only the currently executing actor may unlink itself
    assert(Actor.self == this, "unlink called on actor different from self")
    // drop every entry equal to `from` on this side ...
    links = links.remove(from == _)
    // ... and ask the other actor to drop its entry for this actor
    from.unlinkFrom(this)
  }

  //TODO: restrict access to unlinkFrom
  // Removes one side of a link: drops every entry equal to `from` from this actor's link
  // list under the lock. unlink calls this on the other actor.
  /*private[actors]*/ def unlinkFrom(from: AbstractActor) = withLock {
    links = links.remove(from == _)
  }

  /**
   * Controls how the termination of linked actors is handled.
   * If trapExit is true, then when a linked actor exits an Exit message is
   * asynchronously sent to this actor, regardless of the reason for termination.
   * When trapExit is false and the reason for exit is not 'normal, then this
   * actor is terminated.  If the exit reason is 'normal, then nothing is done.
   * @todo trapExit should not be public
   * @todo should trapExit be associated with a given link instead of with all links?
   */
  var trapExit = false
  // exitReason held the reason for termination, or 'normal if the actor has
  // not terminated (or terminated normally).  exitReason has been replaced
  // with reason on the Terminated states.
  //private[actors] var exitReason: AnyRef = 'normal
  // shouldExit was used to inform the actor that it should exit at its earliest
  // convenience.  This has been replaced with state changes.
  //private[actors] var shouldExit = false

  /**
   * <p>
   *   Terminates execution of <code>self</code> with the following
   *   effect on linked actors:
   * </p>
   * <p>
   *   For each linked actor <code>a</code> with
   *   <code>trapExit</code> set to <code>true</code>, send message
   *   <code>Exit(self, reason)</code> to <code>a</code>.
   * </p>
   * <p>
   *   For each linked actor <code>a</code> with
   *   <code>trapExit</code> set to <code>false</code> (default),
   *   call <code>a.exit(reason)</code> if
   *   <code>reason != 'normal</code>.
   * </p>
   * @param reason the reason for exiting the actor
   * @throws InvalidStateForOperation if the actor is not in a Running state
   */
  def exit(reason: Any): Nothing = withLock {
    state match {
      // Pass the caller's reason through. The previous code passed the symbol
      // literal 'reason, so every exit -- including exit() with 'normal -- was
      // reported with the reason 'reason.
      case s: Running => s.exit(reason)
      // exiting is only permitted from a Running state
      case _ => throw new InvalidStateForOperation("actor cannot exit from state: " + state, state)
    }
  }

  /**
   * Terminates with exit reason <code>'normal</code>.
   * @throws InvalidStateForOperation if the actor is not in a Running state
   */
  // Delegates to exit(reason), passing the symbol 'normal as the reason.
  def exit(): Nothing = exit('normal)

  /**
   * Receive notification from a linked actor that it has terminated and why
   * Assume !this.exiting
   * If trapExit is true then this actor is notified of the exit via an Exit message,
   * otherwise this actor is killed if it is currently active and the reason is not 'normal
   * @param from the actor that is terminating
   * @param reason the reason that the actor has terminated, set to 'normal for normal termination
   * @todo restrict access to exit
   */
  /*private[actors]*/ def exit(from: AbstractActor, reason: Any): Unit = withLock {
    // With trapExit set, the exit is only reported as a message and this actor keeps going.
    if (trapExit) this ! Exit(from, reason)
    // Otherwise only a non-normal exit propagates, and only to an active actor.
    else if (reason != 'normal) state match {
      case _: Killing => () // no-op because actor is already being killed
      case live: Active => live.kill(reason)
      case _ => () //no-op when this actor is not running
    }
  }

  // Reports this actor as terminated to the actorGC of its scheduler.
  private[actors] def terminated() {
    scheduler.actorGC.terminated(this)
  }

  // Registers f with the scheduler's actorGC for this actor. f is by-name and is passed
  // on unevaluated inside a block.
  // NOTE(review): presumably actorGC runs f when this actor terminates -- confirm in ActorGC.
  private[actors] def onTerminate(f: => Unit) {
    scheduler.actorGC.onTerminate(this) { f }
  }
}