1. Brian McKenna
  2. querylang

Commits

Brian McKenna  committed ebd2b49

Initial commit

  • Participants
  • Branches master

Comments (0)

Files changed (10)

File build.sbt

View file
  • Ignore whitespace
+// NOTE(review): plain-HTTP resolver; Sonatype now requires HTTPS —
+// consider https://oss.sonatype.org/... (TODO confirm before changing).
+resolvers += "sonatype-snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/"
+
+libraryDependencies ++= Seq(
+  "com.typesafe.akka" % "akka-actor" % "2.0.2",
+  "com.typesafe.akka" % "akka-testkit" % "2.0.2",
+  // changing() forces re-resolution of this SNAPSHOT on each update
+  "org.scalaz" %% "scalaz-core" % "7.0-SNAPSHOT" changing(),
+  "org.specs2" %% "specs2" % "1.11" % "test",
+  "org.scala-tools.testing" %% "scalacheck" % "1.9" % "test"
+)
+
+scalacOptions ++= Seq("-deprecation", "-Ydependent-method-types", "-unchecked")

File sbt

View file
  • Ignore whitespace
+#!/usr/bin/env bash
+#
+# A more capable sbt runner, coincidentally also called sbt.
+# Author: Paul Phillips <paulp@typesafe.com>
+
+# todo - make this dynamic
+declare -r sbt_release_version=0.11.3
+declare -r sbt_snapshot_version=0.13.0-SNAPSHOT
+
+# Clear any values inherited from the environment so this script's own
+# option parsing is the only source of these settings.
+unset sbt_jar sbt_dir sbt_create sbt_snapshot sbt_launch_dir
+unset scala_version java_home sbt_explicit_version
+unset verbose debug quiet
+
+# Prints the sbt.version value from project/build.properties, if the
+# file exists and has such a line; prints nothing otherwise.
+build_props_sbt () {
+  if [[ -f project/build.properties ]]; then
+    versionLine=$(grep ^sbt.version project/build.properties)
+    versionString=${versionLine##sbt.version=}
+    echo "$versionString"
+  fi
+}
+
+# Rewrites project/build.properties so sbt.version equals $1, appending
+# the property if no sbt.version line was present. No-op when the file
+# already records the requested version, or does not exist.
+update_build_props_sbt () {
+  local ver="$1"
+  local old=$(build_props_sbt)
+
+  if [[ $ver == $old ]]; then
+    return
+  elif [[ -f project/build.properties ]]; then
+    # perl edits an existing sbt.version line in place; the grep || echo
+    # covers files that had no sbt.version line at all.
+    perl -pi -e "s/^sbt\.version=.*\$/sbt.version=${ver}/" project/build.properties
+    grep -q '^sbt.version=' project/build.properties || echo "sbt.version=${ver}" >> project/build.properties
+
+    # NOTE(review): unquoted echo arguments are word-split; harmless for
+    # these values but quoting would be safer.
+    echo !!!
+    echo !!! Updated file project/build.properties setting sbt.version to: $ver
+    echo !!! Previous value was: $old
+    echo !!!
+  fi
+}
+
+# Resolution order for the sbt version: explicit -sbt-version flag,
+# then project/build.properties, then the hard-coded release default.
+sbt_version () {
+  if [[ -n $sbt_explicit_version ]]; then
+    echo $sbt_explicit_version
+  else
+    local v=$(build_props_sbt)
+    if [[ -n $v ]]; then
+      echo $v
+    else
+      echo $sbt_release_version
+    fi
+  fi
+}
+
+# Echo to stderr so diagnostics never pollute captured stdout.
+echoerr () {
+  echo 1>&2 "$@"
+}
+# Log only when -verbose or -debug was given. NOTE(review): returns
+# non-zero when disabled — relevant if `set -e` is ever enabled.
+vlog () {
+  [[ $verbose || $debug ]] && echoerr "$@"
+}
+# Log only when -debug was given.
+dlog () {
+  [[ $debug ]] && echoerr "$@"
+}
+
+# this seems to cover the bases on OSX, and someone will
+# have to tell me about the others.
+get_script_path () {
+  local path="$1"
+  [[ -L "$path" ]] || { echo "$path" ; return; }
+
+  local target=$(readlink "$path")
+  if [[ "${target:0:1}" == "/" ]]; then
+    echo "$target"
+  else
+    echo "$(dirname $path)/$target"
+  fi
+}
+
+# a ham-fisted attempt to move some memory settings in concert
+# so they need not be dicked around with individually.
+#
+# Derives -Xms/-Xmx from $1 (MB, default 1536); PermGen is mem/4
+# clamped to [256, 1024]; code cache is half of PermGen.
+get_mem_opts () {
+  local mem=${1:-1536}
+  local perm=$(( $mem / 4 ))
+  (( $perm > 256 )) || perm=256
+  (( $perm < 1024 )) || perm=1024
+  local codecache=$(( $perm / 2 ))
+  
+  echo "-Xms${mem}m -Xmx${mem}m -XX:MaxPermSize=${perm}m -XX:ReservedCodeCacheSize=${codecache}m"
+}
+
+# Print a message and abort the whole script.
+die() {
+  echo "Aborting: $@"
+  exit 1
+}
+
+# Builds the artifactory URL for an sbt-launch jar.
+# $1 = groupid, $2 = "releases" or "snapshots", $3 = version.
+# NOTE(review): these are deliberately not `local` — kept as-is.
+make_url () {
+  groupid="$1"
+  category="$2"
+  version="$3"
+  
+  echo "http://typesafe.artifactoryonline.com/typesafe/ivy-$category/$groupid/sbt-launch/$version/sbt-launch.jar"
+}
+
+# Read-only defaults for jvm/sbt options and well-known scala versions.
+declare -r default_jvm_opts="-Dfile.encoding=UTF8"
+declare -r default_sbt_opts="-XX:+CMSClassUnloadingEnabled"
+declare -r default_sbt_mem=1536
+declare -r noshare_opts="-Dsbt.global.base=project/.sbtboot -Dsbt.boot.directory=project/.boot -Dsbt.ivy.home=project/.ivy"
+declare -r sbt_opts_file=".sbtopts"
+declare -r jvm_opts_file=".jvmopts"
+declare -r latest_28="2.8.2"
+declare -r latest_29="2.9.1"
+declare -r latest_210="2.10.0-SNAPSHOT"
+
+declare -r script_path=$(get_script_path "$BASH_SOURCE")
+declare -r script_dir="$(dirname $script_path)"
+declare -r script_name="$(basename $script_path)"
+
+# some non-read-onlies set with defaults
+declare java_cmd=java
+declare sbt_launch_dir="$script_dir/.lib"
+declare sbt_mem=$default_sbt_mem
+
+# pull -J and -D options to give to java.
+declare -a residual_args
+declare -a java_args
+declare -a scalac_args
+declare -a sbt_commands
+
+# Prints the scala version from an sbt-0.7-era build.properties
+# (build.scala.versions key); prints nothing if absent.
+build_props_scala () {
+  if [[ -f project/build.properties ]]; then
+    versionLine=$(grep ^build.scala.versions project/build.properties)
+    versionString=${versionLine##build.scala.versions=}
+    echo ${versionString%% .*}
+  fi
+}
+
+# Replaces this shell with the given command line ("$@"), optionally
+# echoing it first when -verbose/-debug.
+execRunner () {
+  # print the arguments one to a line, quoting any containing spaces
+  [[ $verbose || $debug ]] && echo "# Executing command line:" && {
+    for arg; do
+      if printf "%s\n" "$arg" | grep -q ' '; then
+        printf "\"%s\"\n" "$arg"
+      else
+        printf "%s\n" "$arg"
+      fi
+    done
+    echo ""
+  }
+
+  # exec: no shell process remains between the user and sbt.
+  exec "$@"
+}
+
+# Maps the sbt version to the ivy groupid it was published under
+# (the project moved from org.scala-tools.sbt to org.scala-sbt).
+sbt_groupid () {
+  case $(sbt_version) in
+        0.7.*) echo org.scala-tools.sbt ;;
+       0.10.*) echo org.scala-tools.sbt ;;
+    0.11.[12]) echo org.scala-tools.sbt ;;
+            *) echo org.scala-sbt ;;
+  esac
+}
+
+# Scrapes the artifactory snapshot directory listing for launcher
+# versions matching the current sbt version, newest first.
+sbt_artifactory_list () {
+  local version0=$(sbt_version)
+  local version=${version0%-SNAPSHOT}
+  local url="http://typesafe.artifactoryonline.com/typesafe/ivy-snapshots/$(sbt_groupid)/sbt-launch/"
+  dlog "Looking for snapshot list at: $url "
+  
+  curl -s --list-only "$url" | \
+    grep -F $version | \
+    perl -e 'print reverse <>' | \
+    perl -pe 's#^<a href="([^"/]+).*#$1#;'
+}
+
+make_release_url () {
+  make_url $(sbt_groupid) releases $(sbt_version)
+}
+
+# argument is e.g. 0.13.0-SNAPSHOT
+# finds the actual version (with the build id) at artifactory
+make_snapshot_url () {
+  # Only the first (newest) candidate is used; the curl --head probe's
+  # result is logged but NOTE(review): its exit code is not acted upon.
+  for ver in $(sbt_artifactory_list); do
+    local url=$(make_url $(sbt_groupid) snapshots $ver)
+    dlog "Testing $url"
+    curl -s --head "$url" >/dev/null
+    dlog "curl returned: $?"
+    echo "$url"
+    return
+  done
+}
+
+# Chooses the download URL for the launcher: fixed googlecode jar for
+# 0.7.x, snapshot lookup for -SNAPSHOT versions, release URL otherwise.
+jar_url () {
+  case $(sbt_version) in
+             0.7.*) echo "http://simple-build-tool.googlecode.com/files/sbt-launch-0.7.7.jar" ;;
+        *-SNAPSHOT) make_snapshot_url ;;
+                 *) make_release_url ;;
+  esac
+}
+
+# Local cache location for a given launcher version ($1).
+jar_file () {
+  echo "$sbt_launch_dir/$1/sbt-launch.jar"
+}
+
+# Downloads $1 to $2 using curl, falling back to wget. Succeeds only
+# if the file actually exists afterwards.
+download_url () {
+  local url="$1"
+  local jar="$2"
+  
+  echo "Downloading sbt launcher $(sbt_version):"
+  echo "  From  $url"
+  echo "    To  $jar"
+
+  mkdir -p $(dirname "$jar") && {
+    if which curl >/dev/null; then
+      curl --fail --silent "$url" --output "$jar"
+    elif which wget >/dev/null; then
+      wget --quiet -O "$jar" "$url"
+    fi
+  } && [[ -f "$jar" ]]
+}
+
+# Ensures the launcher jar for the current sbt version is cached
+# locally, downloading it if missing. Sets $sbt_jar as a side effect.
+acquire_sbt_jar () {
+  sbt_url="$(jar_url)"
+  sbt_jar="$(jar_file $(sbt_version))"
+
+  [[ -f "$sbt_jar" ]] || download_url "$sbt_url" "$sbt_jar"
+}
+
+usage () {
+  cat <<EOM
+Usage: $script_name [options]
+
+  -h | -help         print this message
+  -v | -verbose      this runner is chattier
+  -d | -debug        set sbt log level to Debug
+  -q | -quiet        set sbt log level to Error
+  -no-colors         disable ANSI color codes
+  -sbt-create        start sbt even if current directory contains no sbt project
+  -sbt-dir   <path>  path to global settings/plugins directory (default: ~/.sbt/<version>)
+  -sbt-boot  <path>  path to shared boot directory (default: ~/.sbt/boot in 0.11 series)
+  -ivy       <path>  path to local Ivy repository (default: ~/.ivy2)
+  -mem    <integer>  set memory options (default: $sbt_mem, which is
+                       $(get_mem_opts $sbt_mem) )
+  -no-share          use all local caches; no sharing
+  -offline           put sbt in offline mode
+  -jvm-debug <port>  Turn on JVM debugging, open at the given port.
+  -batch             Disable interactive mode
+
+  # sbt version (default: from project/build.properties if present, else latest release)
+  !!! The only way to accomplish this pre-0.12.0 if there is a build.properties file which
+  !!! contains an sbt.version property is to update the file on disk.  That's what this does.
+  -sbt-version  <version>   use the specified version of sbt 
+  -sbt-jar      <path>      use the specified jar as the sbt launcher
+  -sbt-snapshot             use a snapshot version of sbt
+  -sbt-launch-dir <path>    directory to hold sbt launchers (default: $sbt_launch_dir)
+
+  # scala version (default: as chosen by sbt)
+  -28                       use $latest_28
+  -29                       use $latest_29
+  -210                      use $latest_210
+  -scala-home <path>        use the scala build at the specified directory
+  -scala-version <version>  use the specified version of scala
+
+  # java version (default: java from PATH, currently $(java -version |& grep version))
+  -java-home <path>         alternate JAVA_HOME
+
+  # jvm options and output control
+  JAVA_OPTS     environment variable holding jvm args, if unset uses "$default_jvm_opts"
+  SBT_OPTS      environment variable holding jvm args, if unset uses "$default_sbt_opts"
+  .jvmopts      if file is in sbt root, it is prepended to the args given to the jvm
+  .sbtopts      if file is in sbt root, it is prepended to the args given to **sbt**
+  -Dkey=val     pass -Dkey=val directly to the jvm
+  -J-X          pass option -X directly to the jvm (-J is stripped)
+  -S-X          add -X to sbt's scalacOptions (-J is stripped)
+
+In the case of duplicated or conflicting options, the order above
+shows precedence: JAVA_OPTS lowest, command line options highest.
+EOM
+}
+
+# Accumulators: each helper appends one argument to its array so the
+# final command line can be assembled in execRunner.
+addJava () {
+  dlog "[addJava] arg = '$1'"
+  java_args=( "${java_args[@]}" "$1" )
+}
+addSbt () {
+  dlog "[addSbt] arg = '$1'"
+  sbt_commands=( "${sbt_commands[@]}" "$1" )
+}
+addScalac () {
+  dlog "[addScalac] arg = '$1'"
+  scalac_args=( "${scalac_args[@]}" "$1" )
+}
+addResidual () {
+  dlog "[residual] arg = '$1'"
+  residual_args=( "${residual_args[@]}" "$1" )
+}
+addResolver () {
+  addSbt "set resolvers in ThisBuild += $1"
+}
+# Attach a JDWP debug listener on the given port.
+addDebugger () {
+  addJava "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=$1"
+}
+# Emits extra jvm options from .jvmopts if present. NOTE(review): the
+# JAVA_OPTS/SBT_OPTS defaults documented in usage are commented out here.
+get_jvm_opts () {
+  # echo "${JAVA_OPTS:-$default_jvm_opts}"
+  # echo "${SBT_OPTS:-$default_sbt_opts}"
+
+  [[ -f "$jvm_opts_file" ]] && cat "$jvm_opts_file"
+}
+
+# Parses "$@" into the java_args / sbt_commands / scalac_args /
+# residual_args accumulators and the version/path globals. Options that
+# take a value consume two positions (shift 2).
+process_args ()
+{
+  # die unless $3 looks like a value (non-empty, not another option)
+  require_arg () {
+    local type="$1"
+    local opt="$2"
+    local arg="$3"
+    
+    if [[ -z "$arg" ]] || [[ "${arg:0:1}" == "-" ]]; then
+      die "$opt requires <$type> argument"
+    fi
+  }
+  while [[ $# -gt 0 ]]; do
+    case "$1" in
+       -h|-help) usage; exit 1 ;;
+    -v|-verbose) verbose=1 && shift ;;
+      -d|-debug) debug=1 && shift ;;
+      -q|-quiet) quiet=1 && shift ;;
+
+           -ivy) require_arg path "$1" "$2" && addJava "-Dsbt.ivy.home=$2" && shift 2 ;;
+           -mem) require_arg integer "$1" "$2" && sbt_mem="$2" && shift 2 ;;
+     -no-colors) addJava "-Dsbt.log.noformat=true" && shift ;;
+      -no-share) addJava "$noshare_opts" && shift ;;
+      -sbt-boot) require_arg path "$1" "$2" && addJava "-Dsbt.boot.directory=$2" && shift 2 ;;
+       -sbt-dir) require_arg path "$1" "$2" && sbt_dir="$2" && shift 2 ;;
+     -debug-inc) addJava "-Dxsbt.inc.debug=true" && shift ;;
+       -offline) addSbt "set offline := true" && shift ;;
+     -jvm-debug) require_arg port "$1" "$2" && addDebugger $2 && shift 2 ;;
+         -batch) exec </dev/null && shift ;;
+
+    -sbt-create) sbt_create=true && shift ;;
+  -sbt-snapshot) sbt_explicit_version=$sbt_snapshot_version && shift ;;
+       -sbt-jar) require_arg path "$1" "$2" && sbt_jar="$2" && shift 2 ;;
+   -sbt-version) require_arg version "$1" "$2" && sbt_explicit_version="$2" && shift 2 ;;
+-sbt-launch-dir) require_arg path "$1" "$2" && sbt_launch_dir="$2" && shift 2 ;;
+ -scala-version) require_arg version "$1" "$2" && addSbt "set scalaVersion := \"$2\"" && shift 2 ;;
+    -scala-home) require_arg path "$1" "$2" && addSbt "set scalaHome in ThisBuild := Some(file(\"$2\"))" && shift 2 ;;
+     -java-home) require_arg path "$1" "$2" && java_cmd="$2/bin/java" && shift 2 ;;
+
+            -D*) addJava "$1" && shift ;;
+            -J*) addJava "${1:2}" && shift ;;
+            -S*) addScalac "${1:2}" && shift ;;
+            -28) addSbt "++ $latest_28" && shift ;;
+            -29) addSbt "++ $latest_29" && shift ;;
+           -210) addSbt "++ $latest_210" && shift ;;
+
+              *) addResidual "$1" && shift ;;
+    esac
+  done
+  
+  # translate -debug/-quiet into the version-appropriate sbt command
+  [[ $debug ]] && {
+    case $(sbt_version) in
+     0.7.*) addSbt "debug" ;; 
+         *) addSbt "set logLevel in Global := Level.Debug" ;;
+    esac
+  }
+  [[ $quiet ]] && {
+    case $(sbt_version) in
+     0.7.*) ;; 
+         *) addSbt "set logLevel in Global := Level.Error" ;;
+    esac
+  }
+}
+
+# if .sbtopts exists, prepend its contents to $@ so it can be processed by this runner
+[[ -f "$sbt_opts_file" ]] && {
+  sbtargs=()
+  while IFS= read -r arg; do
+    sbtargs=( "${sbtargs[@]}" "$arg" )
+  done <"$sbt_opts_file"
+
+  set -- "${sbtargs[@]}" "$@"
+}
+
+# process the combined args, then reset "$@" to the residuals
+process_args "$@"
+set -- "${residual_args[@]}"
+argumentCount=$#
+
+# set scalacOptions if we were given any -S opts
+# NOTE(review): multiple -S options are joined by "${scalac_args[@]}"
+# inside one quoted string, i.e. added as a single scalacOption.
+[[ ${#scalac_args[@]} -eq 0 ]] || addSbt "set scalacOptions in ThisBuild += \"${scalac_args[@]}\""
+
+# Update build.properties on disk to set explicit version - sbt gives us no choice
+[[ -n "$sbt_explicit_version" ]] && update_build_props_sbt "$sbt_explicit_version"
+echo "Detected sbt version $(sbt_version)"
+
+[[ -n "$scala_version" ]] && echo "Overriding scala version to $scala_version"
+
+# no args - alert them there's stuff in here
+(( $argumentCount > 0 )) || echo "Starting $script_name: invoke with -help for other options"
+
+# verify this is an sbt dir or -create was given
+[[ -f ./build.sbt || -d ./project || -n "$sbt_create" ]] || {
+  cat <<EOM
+$(pwd) doesn't appear to be an sbt project.
+If you want to start sbt anyway, run:
+  $0 -sbt-create
+
+EOM
+  exit 1
+}
+
+# pick up completion if present; todo
+[[ -f .sbt_completion.sh ]] && source .sbt_completion.sh
+
+# no jar? download it.
+[[ -f "$sbt_jar" ]] || acquire_sbt_jar || {
+  # still no jar? uh-oh.
+  echo "Download failed. Obtain the jar manually and place it at $sbt_jar"
+  exit 1
+}
+
+[[ -n "$sbt_dir" ]] || {
+  sbt_dir=~/.sbt/$(sbt_version)
+  addJava "-Dsbt.global.base=$sbt_dir"
+  echo "Using $sbt_dir as sbt dir, -sbt-dir to override."
+}
+
+# since sbt 0.7 doesn't understand iflast
+(( ${#residual_args[@]} == 0 )) && residual_args=( "shell" )
+
+# run sbt
+# NOTE(review): ${java_args[@]} is deliberately unquoted (options with
+# spaces would need quoting), unlike sbt_commands/residual_args.
+execRunner "$java_cmd" \
+  $(get_mem_opts $sbt_mem) \
+  $(get_jvm_opts) \
+  ${java_args[@]} \
+  -jar "$sbt_jar" \
+  "${sbt_commands[@]}" \
+  "${residual_args[@]}"

File src/main/scala/Evaluator.scala

View file
  • Ignore whitespace
+package querylang
+
+import ast._
+
+/** Reference (single-node) evaluator for query ASTs.
+ *
+ * `t` maps each Field to its full column of values; `v` is the
+ * expression to evaluate. Unknown fields throw (Map.apply); empty
+ * columns yield NaN for Mean/StdDev and throw for Median/Mode.
+ */
+trait Evaluator {
+  def apply(t: Map[Field, Seq[Double]])(v: Value): Double = v match {
+    case Constant(c) => c
+    case UnaryTransform(op, a) => op match {
+      case Exp => math.exp(apply(t)(a))
+      case Sine => math.sin(apply(t)(a))
+      case Cosine => math.cos(apply(t)(a))
+    }
+    case BinaryTransform(op, a, b) => op match {
+      case Add => apply(t)(a) + apply(t)(b)
+      case Subtract => apply(t)(a) - apply(t)(b)
+      case Multiply => apply(t)(a) * apply(t)(b)
+      case Divide => apply(t)(a) / apply(t)(b)
+      case Pow => math.pow(apply(t)(a), apply(t)(b))
+    }
+    // Sequential reduction implementations
+    case Reduction(op, field) =>
+      val rows = t(field)
+      op match {
+        case Count => rows.length
+        case Sum => rows.sum
+        case Mean => rows.sum / rows.length
+        case Median =>
+          // Upper-median convention: for even-length input this takes
+          // the higher of the two middle elements.
+          val sorted = rows.sorted
+          sorted(sorted.length / 2)
+        case Mode =>
+          rows.groupBy(identity).maxBy(_._2.length)._1
+        case StdDev =>
+          // Fix: sample standard deviation, sqrt((Σx² − n·mean²)/(n−1)).
+          // The previous expression sqrt((Σx·Σx − Σx·mean)/(n−1))
+          // simplifies to |Σx|/√n, which is not a standard deviation.
+          // NOTE(review): the distributed path (PartialStdDev in the
+          // cluster package) still uses the old formula and only carries
+          // (count, sum); it needs a sum-of-squares field to match.
+          val n = rows.length
+          val mean = rows.sum / n
+          val sumSq = rows.map(x => x * x).sum
+          math.sqrt((sumSq - n * mean * mean) / (n - 1))
+      }
+  }
+}
+
+/** Default instance; `empty` evaluates constant-only expressions. */
+object Evaluator extends Evaluator {
+  val empty = apply(Map.empty) _
+}

File src/main/scala/ast/Nodes.scala

View file
  • Ignore whitespace
+package querylang.ast
+
+// SELECT count(total) FROM purchases
+// SELECT avg(price) + avg(taxes) FROM purchases
+
+// Top-level query: evaluate `value` against the named table.
+case class Select(value: Value, table: Table)
+case class Table(name: String)
+case class Field(name: String)
+
+// Expression AST: leaves are constants or per-field reductions,
+// combined by unary/binary transforms.
+sealed trait Value
+case class Constant(c: Double) extends Value
+case class Reduction(r: ReductionOp, field: Field) extends Value
+case class UnaryTransform(op: UnaryOp, a: Value) extends Value
+case class BinaryTransform(op: BinaryOp, a: Value, b: Value) extends Value
+
+// Aggregations over a whole column.
+sealed trait ReductionOp
+case object Count extends ReductionOp
+case object Sum extends ReductionOp
+case object Mean extends ReductionOp
+case object Median extends ReductionOp
+case object Mode extends ReductionOp
+case object StdDev extends ReductionOp
+
+// Pointwise binary operators.
+sealed trait BinaryOp
+case object Add extends BinaryOp
+case object Subtract extends BinaryOp
+case object Divide extends BinaryOp
+case object Multiply extends BinaryOp
+case object Pow extends BinaryOp
+
+// Pointwise unary operators.
+sealed trait UnaryOp
+case object Exp extends UnaryOp
+case object Sine extends UnaryOp
+case object Cosine extends UnaryOp

File src/main/scala/cluster/Master.scala

View file
  • Ignore whitespace
+package querylang
+package cluster
+
+import akka.actor._
+import akka.dispatch.Future
+import akka.pattern.ask
+import akka.util.Timeout
+import akka.util.duration._
+
+import scalaz.syntax.foldable._
+import scalaz.std.list._
+
+import PartialResult._
+
+import ast._
+
+/** Coordinator: fans each Reduction in a query out to all workers,
+ * combines their PartialResults monoidally, substitutes the combined
+ * values back into the AST as constants and evaluates the remainder.
+ */
+class Master(workers: List[ActorRef]) extends Actor {
+  import context.dispatcher
+  implicit val timeout = Timeout(5 seconds)
+
+  def receive = {
+    case MasterQuery(Select(v, t)) =>
+      // capture sender before the async callback runs on another thread
+      val originalSender = sender
+      val rs = reductions(v)
+
+      // Asynchronously ask each worker for a partial response for
+      // each reduction.
+      val fs = Future.sequence(for {
+        r <- rs
+      } yield {
+        val wq = WorkerQuery(r, t)
+        Future.sequence(
+          for(worker <- workers)
+          yield ask(worker, wq).mapTo[PartialResult]
+        )
+      })
+
+      // NOTE(review): onSuccess ignores failures (e.g. ask timeouts),
+      // so on any failure the sender never receives a reply. Consider
+      // onComplete with an error reply.
+      fs.onSuccess {
+        case partialResults =>
+          // Place each PartialResponse back into the tree as
+          // constants, evaluate the tree and return to sender.
+          val subs = substitutions(rs, partialResults)
+          originalSender ! Evaluator.empty(substituteReductions(v, subs))
+      }
+  }
+
+  /** Finds all Reduction nodes in the AST. */
+  def reductions(value: Value): Seq[Reduction] = value match {
+    case Constant(_) => Seq()
+    case r@Reduction(_, _) => Seq(r)
    1. Brian McKenna author

      Haskell programmer. I really dislike reified types.

      r: Reduction is specifically a type-tag check (via instanceof). Theoretically, r@Reduction(_, _) could be compiled into something else, so I use it.

+    case UnaryTransform(_, a) => reductions(a)
+    case BinaryTransform(_, a, b) => reductions(a) ++ reductions(b)
+  }
+
+  /** Replaces Reduction nodes with Constant values throughout the AST. */
+  def substituteReductions(value: Value, subs: Map[Reduction, Double]): Value = value match {
+    case c@Constant(_) =>
+      c
+    case r@Reduction(_, _) =>
+      Constant(subs(r))
+    case UnaryTransform(op, a) =>
+      UnaryTransform(op, substituteReductions(a, subs))
+    case BinaryTransform(op, a, b) =>
+      BinaryTransform(op, substituteReductions(a, subs), substituteReductions(b, subs))
+  }
+
+  /** Creates a substitution map from Reduction to Double results. */
+  def substitutions(reductions: Seq[Reduction], partialResults: Seq[List[PartialResult]]) =
+    Map(reductions.zip(results(partialResults)): _*)
+
+  /** Calculates a Double result for each List of PartialResult.
+   *
+   * Uses the PartialResult Monoid instance to sum the workers' responses.
+   */
+  def results(partialResults: Seq[List[PartialResult]]) = for {
+    xs <- partialResults
+  } yield partialResultToResult(xs.suml)
+
+  /** Takes a PartialResult and calculates the final Double value. */
+  def partialResultToResult(p: PartialResult): Double = p match {
+    case PartialCount(c) => c
+    case PartialSum(s) => s
+    case PartialMean(c, s) => s / c
+    case PartialMedian(r) =>
+      // upper-median convention, matching Evaluator
+      val sorted = r.sorted
+      sorted(sorted.length / 2)
+    case PartialMode(f) =>
+      f.maxBy(_._2)._1
+    case PartialStdDev(c, s) =>
+      // NOTE(review): this expression simplifies to |s|/sqrt(c) and is
+      // NOT a standard deviation; (count, sum) is insufficient — a
+      // sum-of-squares component is needed to compute it correctly.
+      val mean = s / c
+      math.sqrt((s * s - s * mean) / (c - 1))
+    case PartialZero => sys.error("No workers to serve request")
+  }
+}

File src/main/scala/cluster/Messages.scala

View file
  • Ignore whitespace
+package querylang
+package cluster
+
+import akka.actor.ActorRef
+
+/** Messages understood by the Master actor. */
+sealed trait MasterMessage
+// Fix: MasterQuery previously extended WorkerMessage, leaving the
+// MasterMessage hierarchy empty; it now extends the intended trait.
+case class MasterQuery(s: ast.Select) extends MasterMessage
+
+/** Messages understood by Worker actors. */
+sealed trait WorkerMessage
+case class WorkerQuery(r: ast.Reduction, t: ast.Table) extends WorkerMessage
+
+// PartialZero only exists to form a Monoid. A Monoid is required
+// because scalaz.Foldable isn't correctly abstracted. It should be
+// split into Foldable1 like in Kmett's semigroupoids package:
+//
+// http://hackage.haskell.org/packages/archive/semigroupoids/1.2.2/doc/html/Data-Semigroup-Foldable.html
+//
+// This way I'd be able to use Foldable1.suml which would only require
+// a Semigroup definition. I've started better abstracting Foldable in
+// my fork:
+//
+// https://github.com/pufuwozu/scalaz
+sealed trait PartialResult
+case object PartialZero extends PartialResult
+case class PartialSum(sum: Double) extends PartialResult
+case class PartialCount(count: Int) extends PartialResult
+case class PartialMean(count: Int, sum: Double) extends PartialResult
+case class PartialMedian(results: Seq[Double]) extends PartialResult
+case class PartialMode(frequencies: Map[Double, Int]) extends PartialResult
+// NOTE(review): (count, sum) is insufficient to compute a standard
+// deviation; a sum-of-squares field is needed.
+case class PartialStdDev(count: Int, sum: Double) extends PartialResult
+
+object PartialResult {
+  // Scala or specs2 bugs out with this in the package object
+  import scalaz.Monoid
+  import scalaz.std.anyVal.intInstance
+  import scalaz.std.map.mapMonoid
+  import scalaz.syntax.semigroup._
+
+  /** Combines same-shaped partials pointwise; PartialZero is identity. */
+  implicit def partialResultMonoid: Monoid[PartialResult] = new Monoid[PartialResult] {
+    def zero = PartialZero
+    def append(a: PartialResult, b: => PartialResult) = (a, b) match {
+      case (PartialSum(as), PartialSum(bs)) => PartialSum(as + bs)
+      case (PartialCount(ac), PartialCount(bc)) => PartialCount(ac + bc)
+      case (PartialMean(ac, as), PartialMean(bc, bs)) => PartialMean(ac + bc, as + bs)
+      case (PartialMedian(ar), PartialMedian(br)) => PartialMedian(ar ++ br)
+      case (PartialMode(af), PartialMode(bf)) => PartialMode(af |+| bf)
+      case (PartialStdDev(ac, as), PartialStdDev(bc, bs)) => PartialStdDev(ac + bc, as + bs)
+      case (PartialZero, _) => b
+      // Fix: zero must also be a right identity (append(x, zero) == x);
+      // previously this fell through to the error case, violating the
+      // Monoid laws.
+      case (x, PartialZero) => x
+      case (_, _) => sys.error("Workers returned a bad combination of partial results")
+    }
+  }
+}

File src/main/scala/cluster/Worker.scala

View file
  • Ignore whitespace
+package querylang
+package cluster
+
+import akka.actor.Actor
+
+import ast._
+
+/** Computes a PartialResult for one reduction over this worker's data
+ * slice (table -> field -> column). Unknown table/field throws.
+ */
+class Worker(data: Map[ast.Table, Map[ast.Field, Seq[Double]]]) extends Actor {
+  def receive = {
+    case WorkerQuery(Reduction(op, f), t) =>
+      val rows = data(t)(f)
+      sender ! (op match {
+        case Count => PartialCount(rows.length)
+        case Sum => PartialSum(rows.sum)
+        case Mean => PartialMean(rows.length, rows.sum)
+        // Median ships the raw rows; fine in-JVM but does not scale to
+        // large partitions or remote actors.
+        case Median => PartialMedian(rows)
+        // NOTE(review): mapValues returns a lazy view in this Scala
+        // version; force it (e.g. via .map(identity)) before sending to
+        // remote actors — TODO confirm serialization requirements.
+        case Mode => PartialMode(rows.groupBy(identity).mapValues(_.length))
+        // NOTE(review): (count, sum) cannot reconstruct a standard
+        // deviation downstream; a sum-of-squares component is needed.
+        case StdDev => PartialStdDev(rows.length, rows.sum)
+      })
+  }
+}

File src/test/scala/ClusterSpec.scala

View file
  • Ignore whitespace
+package querylang
+package cluster
+
+import org.specs2.specification.Step
+import org.scalacheck.Arbitrary
+
+import akka.actor.{ Actor, ActorSystem, Props }
+import akka.testkit.TestKit
+import akka.util.duration._
+
+import scalaz.syntax.foldable._
+import scalaz.std.list._
+import scalaz.std.map._
+
+/** End-to-end property test: a Master over five in-JVM Workers must
+ * agree with the sequential Evaluator on randomly generated queries.
+ */
+class ClusterSpec(system: ActorSystem) extends TestKit(system) with AkkaScalaCheck {
+  def this() = this(ActorSystem("ClusterSpec"))
+
+  def is =
+    sequential ^
+    "Cluster should" ^
+      p ^
+      "correctly evaluate queries" ! e1 ^
+    Step(system.shutdown()) ^ end
+
+
+  val table = ast.Table("test")
+
+  // Lists/streams are the only sequence Monoid instances in current
+  // scalaz 7.0-SNAPSHOT
+  val workerData =
+    for(i <- 0.0 to 4.0 by 1.0)
+    yield Map(
+      table -> Map(
+        ast.Field("test") -> List(i, i * 2, i * 3, i * 4)
+      )
+    )
+
+  // the whole table = monoidal union of all worker slices
+  val tableData = (for {
+    data <- workerData.toList
+  } yield data(table)).suml
+
+  val evaluator = Evaluator(tableData) _
+
+  val workerActors =
+    for(data <- workerData)
+    yield system.actorOf(Props(new Worker(data)))
+
+  val master = system.actorOf(Props(new Master(workerActors.toList)))
+
+  // NOTE(review): the cast is unchecked (erasure); a non-Double reply
+  // would surface later as a ClassCastException.
+  def message[A] = receiveOne(1 second).asInstanceOf[A]
+
+  implicit def arbitraryValue: Arbitrary[ast.Value] =
+    Arbitrary(genValueFromFields(tableData.keys.toSeq))
+
+
+  // NOTE(review): exact Double equality assumes the distributed fold
+  // accumulates in the same order as the sequential evaluator — TODO
+  // confirm this holds for all reductions.
+  def e1 = check((v: ast.Value) => {
+    master ! MasterQuery(ast.Select(v, table))
+    val calculated = evaluator(v)
+    if(calculated.isNaN)
+      message must beNaN
+    else
+      message must be equalTo calculated
+  })
+}

File src/test/scala/MasterSpec.scala

View file
  • Ignore whitespace
+package querylang
+package cluster
+
+import org.specs2.specification.Step
+
+import akka.actor.{ ActorSystem, Props }
+import akka.testkit.TestKit
+import akka.util.duration._
+
+/** Master with zero workers: constant-only queries must still be
+ * evaluated and answered (no reductions are fanned out).
+ */
+class MasterSpec(system: ActorSystem) extends TestKit(system) with AkkaScalaCheck {
+  def this() = this(ActorSystem("MasterSpec"))
+
+  def is =
+    sequential ^
+    "Master should" ^
+      p ^
+      "reply to constant queries" ! e1 ^
+      "reply to constant binary queries" ! e2 ^
+      "reply to constant unary queries" ! e3 ^
+    Step(system.shutdown()) ^ end
+
+
+  val master = system.actorOf(Props(new Master(List())))
+
+  // NOTE(review): unchecked cast (erasure); relies on the reply being a Double.
+  def message[A] = receiveOne(1 second).asInstanceOf[A]
+
+
+  def e1 = check((a: Double) => {
+    master ! MasterQuery(ast.Select(
+      ast.Constant(a),
+      ast.Table("test")
+    ))
+    message must be equalTo a
+  })
+
+  def e2 = check((op: ast.BinaryOp, a: Double, b: Double) => {
+    val transform = ast.BinaryTransform(op, ast.Constant(a), ast.Constant(b))
+    master ! MasterQuery(ast.Select(transform, ast.Table("test")))
+
+    // 0.0/0.0 is NaN, and NaN != NaN, so it needs the dedicated matcher
+    if(op == ast.Divide && a == 0.0 && b == 0.0) {
+      message must beNaN
+    } else {
+      message must be equalTo Evaluator.empty(transform)
+    }
+  })
+
+  def e3 = check((op: ast.UnaryOp, a: Double) => {
+    val transform = ast.UnaryTransform(op, ast.Constant(a))
+    master ! MasterQuery(ast.Select(transform, ast.Table("test")))
+    message must be equalTo Evaluator.empty(transform)
+  })
+}

File src/test/scala/package.scala

View file
  • Ignore whitespace
+package querylang
+
+import org.specs2.{ ScalaCheck, Specification }
+import org.specs2.matcher.{ Expectable, Matcher }
+import org.specs2.time.NoTimeConversions
+import org.scalacheck.{ Arbitrary, Gen }
+import akka.testkit.{ ImplicitSender, TestKit }
+
+package object cluster {
+  /** Base trait wiring specs2, ScalaCheck and Akka TestKit together. */
+  trait AkkaScalaCheck extends
+    Specification with
+    NoTimeConversions with
+    ScalaCheck with
+    ImplicitSender { self: TestKit =>
+
+    // Test runner seems to break with definitions in here. Have to do
+    // a 'clean test' to get working again.
+
+    // TODO: Reduce and report bug
+  }
+
+  /** Matcher asserting that a Double is NaN (NaN != NaN, so equalTo
+   * cannot be used).
+   */
+  def beNaN = new Matcher[Double] {
+    def apply[S <: Double](a: Expectable[S]) =
+      // Fix: the failure message was missing a space ("...is a number").
+      result(a.value.isNaN, a.value.toString + " is NaN", a.value.toString + " is a number", a)
+  }
+
+  implicit def arbitraryBinaryOp: Arbitrary[ast.BinaryOp] = Arbitrary(genBinaryOp)
+
+  implicit def arbitraryUnaryOp: Arbitrary[ast.UnaryOp] = Arbitrary(genUnaryOp)
+
+  // Boilerplate to generate AST ADTs
+  def genBinaryOp = Gen.oneOf(Seq(
+    ast.Add,
+    ast.Subtract,
+    ast.Divide,
+    ast.Multiply,
+    ast.Pow
+  ))
+
+  def genUnaryOp = Gen.oneOf(Seq(
+    ast.Exp,
+    ast.Sine,
+    ast.Cosine
+  ))
+
+  def genReductionOp = Gen.oneOf(Seq(
+    ast.Count,
+    ast.Sum,
+    ast.Mean,
+    ast.Median,
+    ast.Mode,
+    ast.StdDev
+  ))
+
+  // AST Value generation
+  def genConstant = for {
+    c <- Gen.chooseNum(Double.MinValue, Double.MaxValue)
+  } yield ast.Constant(c)
+
+  def genUnaryTransformFromFields(fs: Seq[ast.Field]) = for {
+    op <- genUnaryOp
+    a <- genValueFromFields(fs)
+  } yield ast.UnaryTransform(op, a)
+
+  def genBinaryTransformFromFields(fs: Seq[ast.Field]) = for {
+    op <- genBinaryOp
+    a <- genValueFromFields(fs)
+    b <- genValueFromFields(fs)
+  } yield ast.BinaryTransform(op, a, b)
+
+  def genField = for {
+    s <- Gen.identifier
+  } yield ast.Field(s)
+
+  // Reductions only ever reference fields that exist in the table.
+  def genReductionFromFields(fs: Seq[ast.Field]) = for {
+    op <- genReductionOp
+    field <- Gen.oneOf(fs)
+  } yield ast.Reduction(op, field)
+
+  // Recursive generator; Gen's implicit sizing bounds the depth.
+  def genValueFromFields(fs: Seq[ast.Field]): Gen[ast.Value] = Gen.oneOf(
+    genConstant,
+    genBinaryTransformFromFields(fs),
+    genUnaryTransformFromFields(fs),
+    genReductionFromFields(fs)
+  )
+}
+}