Commits

Liam Staskawicz committed 2d0737e

* initial commit of working code

Comments (0)

Files changed (16)

+#! /usr/bin/env fan
+
+using build
+
+**
+** Build: mongo
+**
+class Build : BuildPod
+{
+  override Void setup()
+  {
+    podName = "mongo"
+  }
+}

fan/Collection.fan

+
+
+
+**
+**  Collection
+**
+const class Collection
+{
+  internal const Str name
+  const DB db
+  
+  new make(DB db, Str name)
+  {
+    this.name = validateName(name)
+    this.db = db
+  }
+  
+  Str validateName(Str name)
+  {
+    // if(name.containsChar('$') && !Regex("test").matches(name))
+    //   throw Err("invalid collection name - can't contain \$")
+    
+    if(name.isEmpty)
+      throw MongoArgErr("invalid collection name - can't be empty")
+    
+    if(name.startsWith(".") || name.endsWith("."))
+      throw MongoArgErr("invalid collection name - ${name} can't start or end with .")
+      
+    return name
+  }
+  
+  Map validate()
+  {
+    return db.command(["validate":this.name])
+  }
+  
+  Str fullName()
+  {
+    return "${db.name}.${name}"
+  }
+  
+  Void ensureIndex(Str idxName)
+  {
+    
+  }
+  
+  Void ensureIndexes(Str:Obj? keys)
+  {
+    
+  }
+  
+  Void drop()
+  {
+    db.dropCollection(this.name)
+  }
+  
+  Void dropIndex(Str idxName)
+  {
+    db.dropIndex(this.name, idxName)
+  }
+  
+  Void dropAllIndexes()
+  {
+    db.dropIndex(this.name, "*")
+  }
+  
+  Int count()
+  {
+    return find().count
+  }
+  
+  Cursor find(Map selector := [:], Map opts := [:])
+  {
+    return Cursor(this, selector, opts)
+  }
+  
+  Map? findOne(Map query := [:], Map opts := [:])
+  {
+    return find(query, opts).next
+  }
+  
+  Map save(Map object, Bool safe := false)
+  {
+    if(object.containsKey("_id"))
+      return update(["_id":object["_id"]], object, true, safe)
+    else
+      return insert(object, safe)
+  }
+  
+  Map insert(Map object, Bool safe := false)
+  {
+    return insertDocs([object], safe).first
+  }
+  
+  Map[] insertDocs(Map[] objects, Bool safe := false)
+  {
+    b := Buf() { endian = Endian.little }
+    b.writeI4(0)                        // reserved
+    Bson.writeCStr(b.out, fullName)     // full name
+    objects.each |obj| {                // bson objects
+      // add _id to object if needed
+      if(!obj.containsKey("_id"))
+        obj["_id"] = ObjectID()
+      Bson.write(b.out, obj)
+    }
+    
+    s := db.connection.getSocket()
+    db.connection.sendMsg(s.out, b.flip, MongoOp.INSERT)
+    // todo - read last error in strict mode
+    return objects
+  }
+  
+  Map update(Map query, Map doc, Bool upsert := false, Bool multi := false, Bool safe := false)
+  {
+    b := Buf() { endian = Endian.little }
+    b.writeI4(0) // reserved
+    Bson.writeCStr(b.out, fullName) // full name
+    opts := (upsert == true) ? 1 : 0
+    if(multi == true)
+      opts += opts.and(1.shiftl(1))
+    b.writeI4(opts)
+    Bson.write(b.out, query)
+    Bson.write(b.out, doc)
+    
+    s := db.connection.getSocket()
+    db.connection.sendMsg(s.out, b.flip, MongoOp.UPDATE)
+    // todo - read last error in strict mode
+    return doc
+  }
+  
+  Void remove(Map query, Bool safe := false)
+  {
+    b := Buf() { endian = Endian.little }
+    b.writeI4(0) // reserved
+    Bson.writeCStr(b.out, fullName)
+    b.writeI4(0) // reserved
+    Bson.write(b.out, query)
+    
+    s := db.connection.getSocket()
+    db.connection.sendMsg(s.out, b.flip, MongoOp.DELETE)
+    // todo - read last error in strict mode
+  }
+  
+  // MapReduceOut mapReduce(Str map, Str reduce, Str outputCollection, Map query)
+  // {
+  //   
+  // }
+}
+
+
+

fan/Connection.fan

+
+using inet
+
+**
+** MongoActor
+**
+internal const class Connection
+{
+  private const DB db
+  private const Str id
+  private static const Log log := Log.get("mongo")
+  private static const Actor idInc := Actor(ActorPool()) |msg->Int|
+  {
+    Int count := Actor.locals["count"] ?: 1
+    Actor.locals["count"] = count + 1
+    return count
+  }
+  
+  new make(DB db)
+  {
+    this.db = db
+    this.id = "MongoConn-" + Int.random.toHex
+  }
+  
+  TcpSocket getSocket()
+  {
+    TcpSocket? s := Actor.locals[id]
+    if(s == null) {
+      s = TcpSocket()
+      Actor.locals[id] = s
+    }
+    if(!s.isConnected()) {
+      s.connect(IpAddr(db.mongo.address), db.mongo.port)
+      s.in.endian = s.out.endian = Endian.little
+    }
+    return s
+  }
+  
+  Int sendMsg(OutStream os, Buf b, MongoOp op)
+  {
+    reqId := idInc.send(null).get as Int
+    // header - len, reqID, reserved, opcode
+    os.writeI4(b.size + 16).writeI4(reqId).writeI4(0).writeI4(op.opcode)
+    os.writeBuf(b).flush
+    return reqId
+  }
+  
+  Void close()
+  {
+    TcpSocket? s := Actor.locals[id]
+    if(s != null)
+      s.close
+  }
+}
+
+
+
+**
+**  Cursor
+**
+class Cursor 
+{
+  private static const Log log := Log.get("mongo")
+  internal const Collection coll
+  const Map selector            // document selector
+  const Map opts                // query options
+  private Bool queryStarted := false // have we opened up this cursor via query() yet?
+  private Int cursorID := 0     // DB assigned cursor ID
+  private List cache := [,]     // cache of returned objects
+  private Bool closed := false  // set to true when the DB tells us there's nothing left
+  
+  new make(Collection c, Map selector, Map opts)
+  {
+    this.coll = c
+    this.selector = selector
+    this.opts = opts
+  }
+  
+  **
+  ** Return true if this cursor has remaining data, false if not.
+  **
+  Bool more()
+  {
+    fillErUp
+    return cache.size > 0
+  }
+  
+  **
+  ** Get the next element from this cursor.
+  **
+  Map? next()
+  {
+    fillErUp
+    return (cache.size > 0) ? cache.removeAt(0) : null
+  }
+  
+  // get more data if needed
+  private Void fillErUp()
+  {
+    if (!closed && cache.size == 0) {
+      if(queryStarted == true)
+        getMore()
+      else
+        doQuery()
+    }
+  }
+  
+  This limit(Int lim)
+  {
+    opts["limit"] = lim
+    return this
+  }
+  
+  This skip(Int c)
+  {
+    opts["skip"] = c
+    return this
+  }
+  
+  Int count()
+  {
+    cmd := Str:Obj[:] { ordered = true }
+    cmd.set("count", coll.name).set("query", selector).set("fields", opts["fields"])
+    res := coll.db.command(cmd)
+    if (DB.cmdOk(res)) {
+      c := (res["n"] as Float).toInt
+      return c.min(opts.get("limit", c))
+    } 
+    if (res["errmsg"] == "ns missing")
+      return 0 
+    throw MongoOpErr("count() failed - ${res}")
+  }
+  
+  // This sort()
+  // {
+  //   return this
+  // }
+  
+  Map explain()
+  {
+    expopts := this.opts.dup
+    expopts["explain"] = true
+    c := Cursor(coll, this.selector, expopts)
+    explanation := c.next
+    c.close
+    return explanation
+  }
+  
+  List toList()
+  {
+    objects := Map[,]
+    while(this.more)
+      objects.add(next)
+    return objects
+  }
+  
+  **
+  ** Which query options have been selected.
+  ** See http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-Mongo::Constants::OPQUERY
+  ** 
+  private Int queryOpts()
+  {
+    timeout := 0 // opts["timeout"] ? 0 : MongoOp.QUERY_NO_CURSOR_TIMEOUT
+    slaveOk := 0 // @connection.slave_ok? ? MongoOp.QUERY_SLAVE_OK : 0 
+    return slaveOk + timeout
+  }
+  
+  private Str fullName()
+  {
+    return (opts["admin"] == true) ? "admin.${coll.name}" : coll.fullName
+  } 
+  
+  private Void doQuery()
+  {
+    b := Buf() { endian = Endian.little }
+    b.writeI4(queryOpts)                        // query opts
+    Bson.writeCStr(b.out, fullName)             // full name
+    b.writeI4(opts.get("skip", 0))              // skip
+    b.writeI4(opts.get("batchsize", 0))         // num to return
+    Bson.write(b.out, selector)                 // query object
+    if(opts.containsKey("fields"))              // optional fieldReturnSelector
+      Bson.write(b.out, opts["fields"])
+    
+    s := coll.db.connection.getSocket()
+    reqID := coll.db.connection.sendMsg(s.out, b.flip, MongoOp.QUERY)
+    readResponse(s.in, reqID)
+    this.queryStarted = true
+  }
+  
+  private Void getMore()
+  {
+    b := Buf() { endian = Endian.little }
+    b.writeI4(0)                          // reserved
+    Bson.writeCStr(b.out, fullName)       // full name
+    b.writeI4(opts.get("batchsize", 0))   // num to return
+    b.writeI8(this.cursorID)              // cursor ID
+    
+    s := coll.db.connection.getSocket()
+    reqID := coll.db.connection.sendMsg(s.out, b.flip, MongoOp.GET_MORE)
+    readResponse(s.in, reqID)
+  }
+  
+  Void close()
+  {
+    // if we don't have a good ID, or we've read through to completion, and received
+    // a cursorID of 0 as a result, no need to talk to the DB - just mark our state
+    if(this.cursorID != 0) {
+      b := Buf() { endian = Endian.little }
+      b.writeI4(0)                          // reserved
+      b.writeI4(1)                          // number of cursors
+      b.writeI8(this.cursorID)              // cursor ID
+      
+      s := coll.db.connection.getSocket()
+      coll.db.connection.sendMsg(s.out, b.flip, MongoOp.KILL_CURSORS)
+    }
+    cursorID = 0
+    closed = true
+  }
+  
+  **
+  ** Read a response back after having sent a message.
+  ** Update our cursorID, and add any return objects
+  ** to our cache.
+  **
+  private Void readResponse(InStream ins, Int requestID)
+  {
+    // standard header
+    ins.skip(8) // eat the length and the request ID
+    if(requestID != ins.readS4())
+      log.warn("Connection - mismatching request/response IDs")
+    if(MongoOp.REPLY.opcode != ins.readS4())
+      log.warn("Connection - unexpected opcode from DB")
+    // end standard header
+    
+    if(0 != ins.readS4()) // not much doc on what to do if this is not 0...
+      log.warn("Connection - non-zero responseFlag")
+    if((cursorID = ins.readS8()) == 0) // this reply had everything in it...nothing more to get
+      close
+    startingFrom := ins.readS4()
+    numberReturned := ins.readS4()
+    // Sys.out.printLine("cursorID - ${cursorID}, startingFrom - ${startingFrom}, numberReturned - ${numberReturned}")
+    
+    numberReturned.times { cache.add(Bson.read(ins)) }
+  }
+  
+}
+
+
+
+**
+**  DB
+**
+const class DB 
+{
+  const Str name
+  internal const Mongo mongo
+  internal const Connection connection
+  private static const Int[] invalidNameChars := [' ', '.', '\$', '/', '\\']
+  
+  internal static const Str SYS_INDEX_COLL      := "system.indexes"
+  internal static const Str SYS_PROFILE_COLL    := "system.profile"
+  
+  new make(Str name, Mongo mongo)
+  {
+    this.name = validateName(name)
+    this.mongo = mongo
+    this.connection = Connection(this)
+  }
+  
+  private Str validateName(Str name)
+  {
+    invalidNameChars.each |c| {
+      if(name.containsChar(c))
+        throw MongoArgErr("invalid db name - ${name} contains ${c.toChar}")
+    }
+    
+    if(name.isEmpty)
+      throw MongoArgErr("invalid db name - must not be empty")
+    
+    return name
+  }
+  
+  **
+  ** Add a new user to the list of authenticable DB users
+  **
+  Void addUser(Str username, Str password)
+  {
+    users := Collection(this, "system.users")
+    u := users.findOne(["user": username]) ?: ["user": username];
+    u["pwd"] = pwdHash(username, password);
+    users.save(u);
+  }
+  
+  **
+  ** Remove a user from the list of authenticable DB users
+  **
+  Void removeUser(Str username)
+  {
+    Collection(this, "system.users").remove(["user": username])
+  }
+  
+  **
+  ** Return a list of all authenticable DB users
+  **
+  Str[] users()
+  {
+    return Collection(this, "system.users").find().toList().map |Str:Obj? o->Str|{
+      return o["user"]
+    }
+  }
+  
+  **
+  ** Log into the DB - required for certain operations.
+  ** The user must already exist, either via addUser() or some other mechanism
+  **
+  Bool authenticate(Str username, Str password)
+  {
+    res := command(["getnonce": 1])
+    if(!cmdOk(res))
+      throw Err("authenticate - error retrieving nonce: ${res}")
+
+    nonce := res["nonce"]
+    auth := [:] { ordered = true }
+    auth.set("authenticate", 1).set("user", username).set("nonce", nonce) 
+    s := "${nonce}${username}${pwdHash(username, password)}"
+    auth["key"] = Buf().print(s).toDigest("MD5").toHex
+    
+    return cmdOk(command(auth))
+  }
+  
+  private Str pwdHash(Str username, Str password)
+  {
+    return Buf().print("${username}:mongo:${password}").toDigest("MD5").toHex
+  }
+  
+  **
+  ** Logout from a DB session after having logged in via authenticate()
+  **
+  Bool logout()
+  {
+    res := command(["logout": 1])
+    return cmdOk(res)
+  }
+  
+  Collection collection(Str name)
+  {
+    return Collection(this, name)
+  }
+  
+  // so we can say db["collname"]
+  Collection get(Str name)
+  {
+    return Collection(this, name)
+  }
+  
+  **
+  ** Return a list of all the collections in this DB
+  **
+  Str[] collectionNames()
+  {
+    names := Str[,]
+    Collection(this, "system.namespaces").find().toList().each |v, i| {
+      Str s := (v as Map)["name"]
+      if(!s.contains("system.") && !s.contains("\$"))
+        names.add(s)
+    }
+    return names
+  }
+  
+  Bool renameCollection(Str from, Str to)
+  {
+    return true
+  }
+  
+  **
+  ** Return the current profiling level of the DB
+  ** Can be between 0-2.  See setProfilingLevel() to set it.
+  **
+  Int profilingLevel()
+  {
+    Str:Float res := command(["profile":-1])
+    if(!cmdOk(res))
+      throw MongoOpErr("""error while retrieving profiling level - ${res["err"]}""")
+    return res["was"].toInt
+  }
+  
+  **
+  ** Set the profiling level of the DB.
+  ** level must be one of the following options
+  ** 0 - off
+  ** 1 - only slow
+  ** 2 - all
+  **
+  Void setProfilingLevel(Int level)
+  {
+    if(!(0..2).contains(level))
+      throw MongoArgErr("invalid profiling level ${level} - must be between 0 - 2.")
+    command(["profile": level])
+  }
+  
+  List profilingInfo()
+  {
+    return Collection(this, "system.profile").find().toList
+  }
+  
+  Str? lastErr()
+  {
+    res := command(["getlasterror": 1])
+    if(!cmdOk(res))
+      throw MongoOpErr("lastErr() failure - ${res}")
+    return res["err"]
+  }
+  
+  **
+  ** 
+  **
+  Map? previousErr()
+  {
+    Str:Obj? res := command(["getpreverror": 1])
+    return (res["err"] != null) ? res : null
+  }
+  
+  **
+  ** Reset the DB error history as far as previousErr() and 
+  ** lastStatus() are concerned
+  **
+  Void resetErrorHistory()
+  {
+    command(["reseterror": 1])
+  }
+  
+  **
+  ** Retrieve build info about the MongoDB instance being interacted with.
+  ** Keys in the Map returned include "version", "gitVersion", "sysInfo", and "bits"
+  **
+  Str:Obj buildInfo()
+  {
+    res := command(["buildinfo": 1], true)
+    if(!cmdOk(res))
+      throw MongoOpErr("""invalid buildInfo request - ${res["err"]}""")
+    return res
+  }
+  
+  Str:Obj validateCollection(Str coll)
+  {
+    res := command(["validate": name])
+    if(!cmdOk(res))
+      throw MongoOpErr("""Error with validate command: ${res["err"]}""")
+    
+    result := res["result"]
+    if(result isnot Str)
+      throw MongoOpErr("Error with validation data: ${res}")
+    // raise "Error: invalid collection #{name}: #{doc.inspect}" if result =~ /\b(exception|corrupt)\b/i
+    return res
+  }
+  
+  Obj? eval(Str javascript, List args)
+  {
+    cmd := ["\$eval": javascript, "args": args] { ordered = true}
+    doc := command(cmd)
+    if(!cmdOk(doc))
+      return doc["retval"]
+    else
+      throw MongoOpErr("eval failed: ${doc}")
+  }
+  
+  Map? command(Map cmd, Bool admin := false)
+  {
+    if(cmd.keys.size > 1 && !cmd.ordered)
+      throw MongoArgErr("commands with more than one key must be ordered")
+    // negative batchsize means "return the abs value specified and close the cursor" 
+    opts := [:]
+    opts["batchsize"] = -1
+    if(admin == true) opts["admin"] = true
+    return collection("\$cmd").findOne(cmd, opts)
+  }
+  
+  internal static Bool cmdOk(Str:Obj? cmd)
+  {
+    return cmd["ok"] == 1f
+  }
+  
+  Void dropIndex(Str coll, Str idx)
+  {
+    cmd := ["deleteIndexes":coll,
+            "index":name] { ordered = true }
+    res := command(cmd)
+    if(!cmdOk(res))
+      throw MongoOpErr("dropIndex failed: ${cmd}")
+  }
+  
+  Void dropCollection(Str coll)
+  {
+    if(collectionNames.contains("${name}.${coll}")) {
+      res := command(["drop":coll])
+      if(!cmdOk(res))
+        throw MongoOpErr("drop collection failed: ${res}")
+    }
+  }
+  
+  **
+  ** Drop this database - careful!
+  **
+  Void drop()
+  {
+    res := command(["dropDatabase": 1])
+    if(!cmdOk(res))
+      throw MongoOpErr("dropDatabase failed: ${res}")
+  }
+  
+}
+
+
+
+
+**
+**  An invalid argument was passed to a Mongo method
+**
+const class MongoArgErr : Err {
+  new make(Str msg) : super(msg) { }
+}
+
+**
+** A Mongo operation resulted in an error
+**
+const class MongoOpErr : Err {
+  new make(Str msg) : super(msg) { }
+}
+
+
+
+
+
+
+**
+**  MinKey
+**
+class MinKey 
+{
+  
+}
+
+**
+**  MaxKey
+**
+class MaxKey 
+{
+  
+}
+
+
+
+
+
+using inet
+
+internal enum MongoOp { 
+  INVALID                   (-1),
+  REPLY                     (1),
+  QUERY_SLAVE_OK            (4),
+  QUERY_NO_CURSOR_TIMEOUT   (16),
+  MSG                       (1000),
+  UPDATE                    (2001),
+  INSERT                    (2002),
+  GET_BY_OID                (2003),
+  QUERY                     (2004),
+  GET_MORE                  (2005),
+  DELETE                    (2006),
+  KILL_CURSORS              (2007)
+  
+  private new make(Int i) { this.opcode = i }
+  const Int opcode
+}
+
+**
+** Mongo Service.
+**
+const class Mongo : Service
+{
+  const Int port
+  const Str address
+  const ActorPool processorPool := ActorPool()
+  static const Log log := Log.get("mongo")
+
+  new make(Str address := "127.0.0.1", Int port := 27017)
+  { 
+    this.address = address
+    this.port    = port
+  }
+
+  override Void onStart()
+  {
+    log.info("Mongo started on port ${port}")
+    // Actor(listenerPool, |,| { listen }).send(null)
+  }
+
+  override Void onStop()
+  {
+    processorPool.stop
+  }
+  
+  DB db(Str name)
+  {
+    return DB(name, this)
+  }
+  
+  Str[] dbNames()
+  {
+    return ["test", "test2"]
+  }
+  
+  Str version()
+  {
+    return "1.0"
+  }
+  
+  Void dropDB(Str dbName)
+  {
+    
+  }
+  
+  // MongoDB getDB(Str dbName)
+  // {
+  //   
+  // }
+}
+
+
+
+
+using inet
+
+**
+**  ObjectID
+**
+const class ObjectID 
+{
+  static const Int SIZE := 12
+  private const Int inc
+  private const Int time
+  private const Int machine
+  private static const Actor idInc := Actor(ActorPool()) |msg->Int|
+  {
+    Int count := Actor.locals["count"] ?: 1
+    Actor.locals["count"] = count + 1
+    return count
+  }
+  
+  new make()
+  { 
+    this.inc = (idInc.send(null).get(null) as Int).and(0xFFFFFFFF)
+    this.time = DateTime.nowUnique().and(0xFFFFFFFF)
+    this.machine = IpAddr.local().bytes().toDigest("MD5").readS4().and(0xFFFFFFFF)
+  }
+  
+  new fromStream(InStream ins)
+  {
+    this.time = ins.readS4().and(0xFFFFFFFF)
+    this.machine = ins.readS4().and(0xFFFFFFFF)
+    this.inc = ins.readS4().and(0xFFFFFFFF)
+  }
+  
+  new fromStr(Str s)
+  {
+    if(!isValid(s))
+      throw Err("invalid ObjectID format")
+      
+    this.time = s[0..7].toInt(16).and(0xFFFFFFFF)
+    this.machine = s[8..15].toInt(16).and(0xFFFFFFFF)
+    this.inc = s[16..23].toInt(16).and(0xFFFFFFFF)
+  }
+  
+  override Str toStr()
+  {
+    s := StrBuf()
+    [time, machine, inc].each |val, i| {
+      s.add(val.toHex(8))
+    }
+    return s.toStr()
+  }
+  
+  override Bool equals(Obj? o)
+  {
+    if(o isnot ObjectID)
+      return false
+    
+    return ((o as ObjectID).inc == this.inc && 
+            (o as ObjectID).machine == this.machine && 
+            (o as ObjectID).time == this.time)
+  }
+  
+  OutStream write(OutStream o)
+  {
+    return o.writeI4(time).writeI4(machine).writeI4(inc)
+  }
+  
+  static Bool isValid(Str s)
+  {
+    if ( s.size != 24 )
+      return false;
+    
+    // make sure it's all hex digits so we can convert to Ints
+    valid := true
+    s.each |Int c| {
+      if ( c >= '0' && c <= '9' )
+        return
+      if ( c >= 'a' && c <= 'f' )
+        return
+      if ( c >= 'A' && c <= 'F' )
+        return
+      valid = false
+    }
+    return valid
+  }
+  
+}
+
+

fan/bson/Bson.fan

+
+
+
+enum BsonBinType {
+  FUNCTION  (0x01),
+  LENGTH    (0x02),
+  UUID      (0x03),
+  MD5       (0x05),
+  USER      (0x80)
+  
+  private new make(Int i) { this.val = i }
+  const Int val
+}
+
+enum BsonType {
+  MINKEY        (-1),
+  EOO           (0),
+  NUMBER        (1),
+  STRING        (2),
+  OBJECT        (3),
+  ARRAY         (4),
+  BINARY        (5),
+  UNDEFINED     (6),
+  OID           (7),
+  BOOLEAN       (8),
+  DATE          (9),
+  NULL          (10),
+  REGEX         (11),
+  REF           (12),
+  CODE          (13),
+  SYMBOL        (14),
+  CODE_W_SCOPE  (15),
+  NUMBER_INT    (16),
+  TIMESTAMP     (17),
+  NUMBER_LONG   (18),
+  MAXKEY        (127)
+  
+  private new make(Int i) { this.val = i }
+  const Int val
+}
+
+**
+**  Bson
+**
+class Bson 
+{  
+  **
+  ** Write a Fantom object to an OutStream in BSON format.
+  ** The OutStream must have its endian member set to Endian.little
+  ** This isn't checked for performance sake, but the data will not 
+  ** be written correctly if it's not.
+  **
+  static Void write(OutStream out, Map obj)
+  {
+    BsonWriter.write(out, obj)
+  }
+  
+  static Void writeCStr(OutStream out, Str s)
+  {
+    BsonWriter.writeCStr(out, s)
+  }
+  
+  **
+  ** Parse a Fantom object from an InStream of BSON formatted data.
+  ** The InStream must have its endian member set to Endian.little
+  ** This isn't checked for performance sake, but the data will not 
+  ** be read correctly if it's not.
+  **
+  static Str:Obj? read(InStream in)
+  {
+    return BsonReader.read(in)
+  }
+}
+
+

fan/bson/BsonReader.fan

+
+
+
+**
+**  BsonReader
+**
+internal class BsonReader
+{
+  static internal Str:Obj? read(InStream ins)
+  {
+    bson := Str:Obj?[:] { ordered = true }
+    readObject(ins, bson)
+    return bson
+  }
+  
+  static private Int readObject(InStream ins, Str:Obj? bson)
+  {
+    objectlen := ins.readS4()
+    remaining := objectlen - 4
+    while(remaining > 0) {
+      type := ins.read()
+      remaining--
+      // Sys.out.printLine("type - ${type}, remaining - ${remaining}")
+      switch(type) {
+        case BsonType.MINKEY.val:
+          remaining = readMinKey(ins, remaining, bson)
+        case BsonType.EOO.val:
+          break // end of object - return whatever we have
+        case BsonType.NUMBER.val:
+          remaining = readFloat(ins, remaining, bson)
+        case BsonType.SYMBOL.val:
+        case BsonType.STRING.val:
+          remaining = readStr(ins, remaining, bson)
+        case BsonType.OBJECT.val:
+          remaining = readMap(ins, remaining, bson)
+        case BsonType.ARRAY.val:
+          remaining = readArray(ins, remaining, bson)
+        case BsonType.BINARY.val:
+          remaining = readBuf(ins, remaining, bson)
+        case BsonType.OID.val:
+          remaining = readOID(ins, remaining, bson)
+        case BsonType.BOOLEAN.val:
+          remaining = readBool(ins, remaining, bson)
+        case BsonType.TIMESTAMP.val: // both have the same format
+        case BsonType.DATE.val:
+          remaining = readDate(ins, remaining, bson)
+        case BsonType.NULL.val:
+          remaining = readNull(ins, remaining, bson)
+        // case BsonType.REGEX.val:
+        // case BsonType.CODE.val:
+        case BsonType.CODE_W_SCOPE.val:
+          remaining = readCodeWScope(ins, remaining, bson)
+        case BsonType.NUMBER_LONG.val: // both are 64-bit Ints
+        case BsonType.NUMBER_INT.val:
+          remaining = readInt(ins, remaining, bson)
+        case BsonType.MAXKEY.val:
+          remaining = readMaxKey(ins, remaining, bson)
+        default:
+          throw Err("Unknown BSON type received - ${type}")
+      }
+    }
+    return objectlen - remaining
+  }
+  
+  static private Str readCStr(InStream ins)
+  {
+    str := ins.readStrToken(null, |i| { return i == 0 }) // read till we get a null
+    ins.skip(1) // eat the null terminator...worth validating that this is 0?
+    return str
+  }
+  
+  static private Int readStr(InStream ins, Int remaining, Str:Obj? bson)
+  {
+    name := readCStr(ins)
+    val := ins.readChars(ins.readU4 - 1)
+    ins.skip(1) // eat the null terminator...worth validating that this is 0?
+    bson[name] = val
+    return remaining - (name.size + 1 + val.size + 1 + 4) // str, null, str, null, strlen
+  }
+  
+  static private Int readInt(InStream ins, Int remaining, Str:Obj? bson)
+  {
+    name := readCStr(ins)
+    bson[name] = ins.readS4()
+    return remaining - (name.size + 1 + 4)
+  }
+  
+  static private Int readFloat(InStream ins, Int remaining, Str:Obj? bson)
+  {
+    name := readCStr(ins)
+    bson[name] = ins.readF8()
+    return remaining - (name.size + 1 + 8)
+  }
+  
+  static private Int readBool(InStream ins, Int remaining, Str:Obj? bson)
+  {
+    name := readCStr(ins)
+    bson[name] = (ins.read() == 0x01) ? true : false
+    return remaining - (name.size + 1 + 1)
+  }
+  
+  static private Int readBuf(InStream ins, Int remaining, Str:Obj? bson)
+  {
+    name := readCStr(ins)
+    len := ins.readS4()
+    type := ins.read() // type ... todo - handle special types
+    val := ins.readBufFully(null, len)
+    bson[name] = val
+    return remaining - (name.size + 1 + val.size + 4 + 1) // str, null, data, len, type
+  }
+  
+  static private Int readDate(InStream ins, Int remaining, Str:Obj? bson)
+  {
+    name := readCStr(ins)
+    bson[name] = DateTime.fromJava(ins.readS8())
+    return remaining - (name.size + 1 + 8)
+  }
+  
+  static private Int readNull(InStream ins, Int remaining, Str:Obj? bson)
+  {
+    name := readCStr(ins)
+    bson[name] = null
+    return remaining - (name.size + 1)
+  }
+  
+  static private Int readOID(InStream ins, Int remaining, Str:Obj? bson)
+  {
+    name := readCStr(ins)
+    bson[name] = ObjectID.fromStream(ins)
+    return remaining - (name.size + 1 + ObjectID.SIZE)
+  }
+  
+  static private Int readArray(InStream ins, Int remaining, Str:Obj? bson)
+  {
+    name := readCStr(ins)
+    arr := Str:Obj?[:] { ordered = true }
+    arraylen := readObject(ins, arr)
+    list := [,]
+    arr.each |val, key| {
+      list.add(val)
+    }
+    bson[name] = list
+    return remaining - (name.size + 1 + arraylen)
+  }
+  
+  static private Int readMap(InStream ins, Int remaining, Str:Obj? bson)
+  {
+    name := readCStr(ins)
+    v := Str:Obj?[:] { ordered = true }
+    objlen := readObject(ins, v)
+    bson[name] = v
+    return remaining - (name.size + 1 + objlen)
+  }
+  
+  static private Int readCodeWScope(InStream ins, Int remaining, Str:Obj? bson)
+  {
+    name := readCStr(ins)
+    totalsize := ins.readS4()
+    codestr := ins.readChars(ins.readS4() - 1)
+    ins.skip(1) // eat the null terminator...worth validating that this is 0?
+    codescope := Str:Obj?[:] { ordered = true }
+    scopelen := readObject(ins, codescope)
+    bson[name] = [codestr: codescope]
+    return remaining - (name.size + 1 + totalsize)
+  }
+  
+  static private Int readMinKey(InStream ins, Int remaining, Str:Obj? bson)
+  {
+    name := readCStr(ins)
+    bson[name] = MinKey() // todo - figure this out
+    return remaining - (name.size + 1)
+  }
+  
+  static private Int readMaxKey(InStream ins, Int remaining, Str:Obj? bson)
+  {
+    name := readCStr(ins)
+    bson[name] = MaxKey() // todo - figure this out
+    return remaining - (name.size + 1)
+  }
+  
+}
+
+
+

fan/bson/BsonWriter.fan

+
+
+
+**
+**  BsonWriter
+**
+internal class BsonWriter 
+{
+  
+  internal static Void write(OutStream out, Str:Obj? obj)
+  {
+    buf := Buf() { endian = Endian.little }
+    // because we need to know the length of the object before we can 
+    // write it out, we need to assemble it in a Buf first...boo
+    out.writeBuf(writeToBuf(buf, obj))
+  }
+  
+  private static Buf writeToBuf(Buf b, Str:Obj? obj)
+  {
+    lenpos := b.pos()
+    b.writeI4(0) // placeholder for len
+    obj.each |val, key| {
+      writeKeyVal(b.out, key, val)
+    }
+    b.write(BsonType.EOO.val) // end of object
+    b.seek(lenpos).writeI4(b.size)
+    return b.seek(0) // rewind to the beginning of the buf
+  }
+  
+  private static Void writeKeyVal(OutStream out, Str key, Obj? val)
+  {
+    if (val == null) writeNull(out, key)
+    else if (val is Str) writeStr(out, key, val)
+    else if (val is Date) writeDateTime(out, key, val as DateTime) // convert to DateTime
+    else if (val is DateTime) writeDateTime(out, key, val)
+    else if (val is Buf) writeBinary(out, key, val)
+    else if (val is Int) writeInt(out, key, val)
+    else if (val is Decimal) writeFloat(out, key, (val as Decimal).toFloat())
+    // todo - maybe maintain Decimal somehow?
+    else if (val is Float) writeFloat(out, key, val)
+    else if (val is ObjectID) writeOID(out, key, val)
+    else if (val is List) writeArray(out, key, val)
+    else if (val is Bool) writeBool(out, key, val)
+    else if (val is Map) writeMap(out, key, val)
+    // else if (val is Regex) writeRegex(out, key, val)
+    else throw Err("unknown BSON type - can't serialize ${Type.of(val).qname}")
+  }
+  
+  internal static Void writeCStr(OutStream out, Str str)
+  {
+    out.writeChars(str)
+    out.write(0)
+  }
+  
+  private static Void writeStr(OutStream out, Str str, Str val)
+  {
+    out.write(BsonType.STRING.val)
+    writeCStr(out, str)
+    out.writeI4(val.size + 1) // account for null terminator
+    writeCStr(out, val)
+  }
+  
+  private static Void writeNull(OutStream out, Str key)
+  {
+    out.write(BsonType.NULL.val)
+    writeCStr(out, key)
+  }
+  
+  private static Void writeBool(OutStream out, Str key, Bool b)
+  {
+    out.write(BsonType.BOOLEAN.val)
+    writeCStr(out, key)
+    out.write(b ? 0x01 : 0x00)
+  }
+  
+  private static Void writeInt(OutStream out, Str key, Int i)
+  {
+    out.write(BsonType.NUMBER_INT.val)
+    writeCStr(out, key)
+    out.writeI4(i)
+  }
+  
+  private static Void writeFloat(OutStream out, Str key, Float f)
+  {
+    out.write(BsonType.NUMBER.val)
+    writeCStr(out, key)
+    out.writeF8(f)
+  }
+  
+  private static Void writeDateTime(OutStream out, Str key, DateTime d)
+  {
+    out.write(BsonType.DATE.val)
+    writeCStr(out, key)
+    out.writeI8(d.toJava())
+  }
+  
+  // bson array is really a map with stringified indexes
+  private static Void writeArray(OutStream out, Str key, List list)
+  {
+    out.write(BsonType.ARRAY.val)
+    writeCStr(out, key)
+    b := Buf() { endian = Endian.little }
+    m := Str:Obj?[:] { ordered = true }
+    list.each |val, i| {
+      m[i.toStr] = val
+    }
+    out.writeBuf(writeToBuf(b, m))
+  }
+  
+  private static Void writeMap(OutStream out, Str key, Map m)
+  {
+    out.write(BsonType.OBJECT.val)
+    writeCStr(out, key)
+    b := Buf() { endian = Endian.little }
+    out.writeBuf(writeToBuf(b, m))
+  }
+  
+  private static Void writeBinary(OutStream out, Str key, Buf b, BsonBinType bbt := BsonBinType.LENGTH)
+  {
+    out.write(BsonType.BINARY.val)
+    writeCStr(out, key)
+    out.writeI4(b.size)
+    out.write(bbt.val)
+    // don't disturb the pos
+    p := b.pos
+    out.writeBuf(b)
+    b.seek(p)
+  }
+  
+  private static Void writeOID(OutStream out, Str key, ObjectID oid)
+  {
+    out.write(BsonType.OID.val)
+    writeCStr(out, key)
+    oid.write(out)
+  }
+  
+  private static Void writeCodeWScope(OutStream out, Str key, Str code, Str:Obj? scope)
+  {
+    out.write(BsonType.CODE_W_SCOPE.val)
+    writeCStr(out, key)
+    b := Buf() { endian = Endian.little }
+    writeToBuf(b, scope)
+    out.writeI4(8 + code.size + 1 + b.size)   // total size
+    out.writeI4(code.size + 1)                // size of code str
+    writeCStr(out, code)                      // code str
+    out.writeBuf(b)                           // scope
+  }
+  
+  private static Void writeMinKey(OutStream out, Str key)
+  {
+    out.write(BsonType.MINKEY.val)
+    writeCStr(out, key)
+  }
+  
+  private static Void writeMaxKey(OutStream out, Str key)
+  {
+    out.write(BsonType.MAXKEY.val)
+    writeCStr(out, key)
+  }
+  
+}
+
+
+
+**
+** Mongo Driver
+**
+
+@podDepends = [Depend("sys 1.0"), Depend("inet 1.0")]
+@podSrcDirs = [`fan/`, `fan/bson/`, `test/`]
+
+pod mongo {}

test/BsonTest.fan

+
+**
+**  BsonTest
+**
+class BsonTest : Test 
+{
+  Buf b := Buf() { endian = Endian.little }
+  Map map := Str:Obj?[:] { ordered = true }
+  
+  Void cleanSlate()
+  {
+    map.clear
+    b.clear
+  }
+  
+  Void testBasics()
+  {
+    cleanSlate()
+    map["testStr"] = "number1"
+    map["testInt"] = 123
+    map["testFloat"] = 12.3f
+    map["testBool"] = true
+    map["testDateTime"] = DateTime.boot()
+    // map["testNull"] = null
+    map["testOID"] = ObjectID()
+    Bson.write(b.out, map)
+    m := Bson.read(b.flip().in)
+    verify(m == map, "${m} != ${map}")
+  }
+  
+  **
+  **  testBinary
+  **  test this separately since Buf.equals actually means ===
+  **
+  Void testBinary()
+  {
+    cleanSlate
+    map["testBuf"] = Buf.random(56)
+    Bson.write(b.out, map)
+    m := Bson.read(b.flip().in)
+    buf := map["testBuf"] as Buf
+    rebuf := m["testBuf"] as Buf
+    verify(rebuf.size == buf.size && rebuf.pos == buf.pos)
+    while(rebuf.more) {
+      if(rebuf.read() != buf.read())
+        fail()
+    }
+  }
+  
+  Void testOID()
+  {
+    oid := ObjectID()
+    verify(ObjectID.isValid(oid.toStr))
+    
+    // round trip via Str
+    verify(oid == ObjectID.fromStr(oid.toStr))
+    
+    // round trip via Buf
+    b := Buf()
+    oid.write(b.out)
+    k := ObjectID.fromStream(b.flip.in)
+    verify(oid == k, "${oid.toStr} != ${k.toStr}")
+  }
+  
+  Void testObject()
+  {
+    cleanSlate
+    e := Str:Obj?[:] { ordered = true }
+    e["test1"] = 1
+    e["test2"] = 2
+    ee := Str:Obj?[:] { ordered = true }
+    ee["embeddedtest"] = "embedded"
+    e["double"] = ee
+    map["testMap"] = e
+    Bson.write(b.out, map)
+    m := Bson.read(b.flip().in)
+    verify(m == map, "${m} != ${map}")
+  }
+  
+  Void testArray()
+  {
+    cleanSlate
+    list := [,]
+    list.add(1).add(2).add(3).add(4)
+    map["testList"] = list
+    Bson.write(b.out, map)
+    m := Bson.read(b.flip().in)
+    verify(m == map, "${m} != ${map}")
+  }
+  
+  Void testMixed()
+  {
+    cleanSlate
+    list := [,]
+    list.add(1).add(2).add(3).add(4)
+    map["testList"] = list
+    e := Str:Obj?[:] { ordered = true }
+    e["test1"] = 1
+    e["test2"] = 2
+    ee := Str:Obj?[:] { ordered = true }
+    ee["embeddedtest"] = "embedded"
+    e["double"] = ee
+    map["testMap"] = e
+    Bson.write(b.out, map)
+    m := Bson.read(b.flip().in)
+    verify(m == map, "${m} != ${map}")
+  }
+  
+}
+
+
+

test/CollectionTest.fan

+
+
+
+
+**
+**  CollectionTest
+**
+class CollectionTest : Test
+{
+  Mongo mongo := Mongo()
+  DB db := mongo.db("collectotron")
+  Collection c := db["tester"]
+  
+  override Void setup()
+  {
+    c.drop
+  }
+  
+  override Void teardown()
+  {
+    // db.drop
+  }
+  
+  Void testValidNames()
+  {
+    verifyErr(MongoArgErr#) { co := db[""] }
+    verifyErr(MongoArgErr#) { co := db[".bad"] }
+    verifyErr(MongoArgErr#) { co := db["notgood."] }
+  }
+  
+  Void testInsert()
+  {
+    verify(c.insert(["foofoo":567]).containsKey("_id"))
+    verify(c.findOne()["foofoo"] == 567)
+    verifyEq(1, c.find().count())
+    c.insertDocs([["t":1], ["g":2], ["h":3]])
+    verifyEq(4, c.find().count())
+    c.drop
+    verifyEq(0, c.find().count())
+  }
+  
+  Void testLotsOfInserts()
+  {
+    c := db["actortest"]
+    a := Actor(ActorPool()) |Int i| {
+      c.insert(["testincrement": i])
+    }
+    
+    c.insert(["warmup":true]) // make sure the collection has been created, etc.
+    
+    runs := 25
+    runlens := Float[,]
+    runs.times |run| {
+      inserts := 10000
+      start := Duration.now
+      inserts.times |i| {
+        f := a.send(i)
+        if(i == (inserts - 1))
+          f.get
+      }
+      elapsed := (Duration.now - start).toMillis().toFloat
+      stat := inserts.toFloat/(elapsed/1000f)
+      runlens.add(stat)
+      Sys.out.printLine("run ${run}: ${inserts} inserts - ${elapsed} millis (${stat} insert/sec)")
+    }
+    sum := runlens.reduce(0f) |Float r, Float v->Float| { return v + r }
+    Sys.out.printLine("average run time - ${(Float)sum/runlens.size.toFloat}")
+    c.drop
+  }
+  
+}
+
+
+
+
+**
+**  DBTest
+**
+class DBTest : Test 
+{
+  Mongo mongo := Mongo()
+  DB db := mongo.db("testeroo")
+  const Str testuser := "fantester"
+  const Str testpass := "fanpass"
+  
+  override Void setup()
+  {
+    if(!db.users().contains(testuser))
+      db.addUser(testuser, testpass)
+  }
+  
+  override Void teardown()
+  {
+    db.removeUser(testuser)
+    // db.drop
+  }
+  
+  Void testProfiling()
+  {
+    proflevel := db.profilingLevel
+    verify((0..2).contains(proflevel))
+    newlevel := proflevel + 1
+    if(newlevel > 2) newlevel = 0
+    db.setProfilingLevel(newlevel)
+    verify(newlevel == db.profilingLevel)
+    db.setProfilingLevel(proflevel) // reset it
+    
+    s := db.profilingInfo
+    Sys.out.printLine("s - ${s}")
+  }
+  
+  Void testAuthentication()
+  {
+    db.removeUser(testuser)
+    verify(!db.users().contains(testuser))
+    db.addUser(testuser, testpass)
+    verify(db.users().contains(testuser))
+    verify(db.authenticate(testuser, testpass))
+  }
+  
+  Void testBuildInfo()
+  {
+    verify(db.buildInfo().keys().containsAll(["version", "gitVersion", "sysInfo", "bits"]))
+  }
+  
+  Void testErrs()
+  {
+    db.resetErrorHistory
+    verifyNull(db.lastErr)
+    verifyNull(db.previousErr)
+
+    db.command(["forceerror": 1])
+    verifyNotNull(db.lastErr)
+    verifyNotNull(db.previousErr)
+
+    db.command(["forceerror": 1])
+    verifyNotNull(db.lastErr)
+    verifyNotNull(db.previousErr)
+    prevErr := db.previousErr
+    verifyEq(1, prevErr["nPrev"])
+    verifyEq(prevErr["err"], db.lastErr)
+    
+    db["test"].findOne
+    verifyNull(db.lastErr)
+    verifyNotNull(db.previousErr)
+    verifyEq(2, db.previousErr["nPrev"])
+    
+    db.resetErrorHistory
+    verifyNull(db.lastErr)
+    verifyNull(db.previousErr)
+  }
+  
+  Void testCollNames()
+  {
+    newcoll := "newcoll"
+    fullname := "${db.name}.${newcoll}"
+    verify(!db.collectionNames().contains(fullname))
+    db[newcoll].insert(["rando":"tester"])
+    verify(db.collectionNames().contains(fullname))
+    db.dropCollection(newcoll)
+    verify(!db.collectionNames().contains(fullname))
+  }
+  
+}
+
+
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.