Commits

vadimtsushko committed a4f4058

3

Comments (0)

Files changed (18)

 lib/bson/bson.dart
 tests/bson/BSonTypesTest.dart
 tests/ConnectionTest.dart
-lib/networking/mongo_getmore_message.dart
 lib/mongo.dart
 tests/bson/BsonBinaryTest.dart
+examples/simple.dart
 tests/DbTest.dart

examples/simple.dart

+#import("../lib/mongo.dart");
+#import("dart:builtin");
+main(){
+  Db db = new Db("mongo-dart-test");
+  //Db db = new Db("mongo-dart-test", new ServerConfig("127.0.0.1",27017));
+  db.open();
+  MCollection coll = db.collection("simple_data");
+  // Remove all existing data from collection;
+  coll.remove();
+  for (var n = 0; n<1000; n++){  
+    coll.insert({"my_field":n});
+  }
+  
+  coll.findOne({"my_field": 17}).then((val){
+    print("Filtered by value: $val");     
+  });
+  
+  coll.find({"my_field": {"\$gt": 985}}).each((v)=>print(v)).then((dummy){    
+    db.close();
+  });
+    
+}

lib/bson/bson.dart

 #library("bson.dart");
 //#import("../../../../../../dartrepo/dart-read-only/dart/utils/string_encoding/utf8.dart");
 #import("dart:utf");
+#import("dart:uri");
 #source("bson_type.dart");
 #source("objectid.dart");
 #source("timestamp.dart");

lib/collection.dart

-class MCollection{
-  Db db;
-  String collectionName;
-  MCollection(this.db, this.collectionName){}  
-  String fullName() => "${db.databaseName}.$collectionName";
-  saveAll(List<Map> documents){
-    MongoInsertMessage insertMessage = new MongoInsertMessage(fullName(),documents);
-    return db.executeDbCommand(insertMessage);
-  } 
-  save(Map document){
-    return saveAll([document]);  
-  } 
-  find([Map selector = const {}, Map fields = null, Map sort, int skip = 0,int limit = 30, bool hint = false, bool explain = false] ){
-    return new Cursor(db, this);//, [selector, skip, limit,sort, hint, explain]);
-  }
-  drop() => db.dropCollection(collectionName);
-}
+typedef MonadicBlock(var value);
 class Cursor{
 /**
  * Init state
   int state = INIT;
   int cursorId = 0;
   Db db;
-  List items;
+  Queue items;
   MCollection collection;
   Map selector;
   Map fields;
   int limit;
   Map sort;
   Map hint;
-  var eachCallback;
+  MonadicBlock eachCallback;
   var eachComplete;
   bool explain;
   int flags = 0;  
   , this.sort, this.hint, this.explain]){
     if (selector === null){
       selector = {};
+    } else{
+      if (!selector.containsKey("\$query")){
+        selector = {"\$query": selector};
+      }          
     }
-    items = [];
+    items = new Queue();
   }
   MongoQueryMessage generateQueryMessage(){
     return new  MongoQueryMessage(collection.fullName(),
             selector,
             fields);
   }
+  MongoGetMoreMessage generateGetMoreMessage(){
+    return new  MongoGetMoreMessage(collection.fullName(),
+            cursorId);
+  }
+  
+  
   getNextItem(){
-    return items.removeLast();
+    return items.removeFirst();
   }
   Future nextObject(){
     if (state == INIT){
       return nextItem.future;
     }  
     else if (state == OPEN && items.length > 0){
+      Completer nextItem = new Completer();
+      var qm = generateGetMoreMessage();
+      Future<MongoReplyMessage> reply = db.executeQueryMessage(qm);
+      reply.then((replyMessage){
+        state = OPEN;
+        cursorId = replyMessage.cursorId;
+        items.addAll(replyMessage.documents);
+        if (items.length > 0){
+          nextItem.complete(getNextItem());
+        }
+        else{
+          nextItem.complete(null);
+        }
+      });
+      return nextItem.future;
+    }
+    else if (state == OPEN && cursorId > 0){
       return new Future.immediate(getNextItem());
     }
+    
     else {
       state = CLOSED;
       return new Future.immediate(null);
       }            
     });
   }
-  Future<bool> each(callback){
+  
+  Future<bool> each(MonadicBlock callback){
     eachCallback = callback; 
     eachComplete = new Completer();
     nextEach();
     return eachComplete.future;
   }
+  Future<List<Map>> toList(){
+    List<Map> result = [];
+    Completer completer = new Completer();
+    this.each((v)=>result.addLast(v)).then((v)=>completer.complete(result));
+    return completer.future;    
+  }
 }
   String databaseName;
   ServerConfig serverConfig;
   Connection connection;
-  validateDatabaseName(String databaseName) {
-    if(databaseName.length === 0) throw "database name cannot be the empty string";  
+  validateDatabaseName(String dbName) {
+    if(dbName.length === 0) throw "database name cannot be the empty string";  
     var invalidChars = [" ", ".", "\$", "/", "\\"];
     for(var i = 0; i < invalidChars.length; i++) {
-      if(databaseName.indexOf(invalidChars[i]) != -1) throw new Exception("database names cannot contain the character '" + invalidChars[i] + "'");
+      if(dbName.indexOf(invalidChars[i]) != -1) throw new Exception("database names cannot contain the character '" + invalidChars[i] + "'");
     }
   }    
   Db(this.databaseName, [this.serverConfig]){
   MCollection collection(String collectionName){
       return new MCollection(this,collectionName);
   }
-  Future<Map> executeQueryMessage(MongoQueryMessage queryMessage){
+  Future<MongoReplyMessage> executeQueryMessage(MongoMessage queryMessage){
     return connection.query(queryMessage);
   }  
   executeMessage(MongoMessage message){
           if (replyMessage.documents[0].containsKey("errmsg")){
             errMsg = replyMessage.documents[0]["errmsg"];
           }
-          print(errMsg);
+          print("Error: $errMsg");
           result.complete(false);
         }         
       });
     return result.future;        
-  }  
-  Future<bool> dropCollection(String collectionName){    
-    return executeDbCommand(DbCommand.createDropCollectionCommand(this,collectionName));
-  }  
+  }
+  Future<bool> dropCollection(String collectionName){
+    Completer completer = new Completer();
+    collectionsInfoCursor(collectionName).toList().then((v){
+      if (v.length == 1){
+//        print("drop collection");
+        executeDbCommand(DbCommand.createDropCollectionCommand(this,collectionName))
+          .then((res)=>completer.complete(res));
+        } else{
+          completer.complete(true);
+        }  
+    });    
+    return completer.future;    
+  }
+  removeFromCollection(String collectionName, [Map selector = const {}]){
+    connection.execute(new MongoRemoveMessage("$databaseName.$collectionName", selector));    
+  }    
+  
   Future<bool> getLastError(){    
     return executeDbCommand(DbCommand.createGetLastErrorCommand(this));
   }  
-
+  close(){
+    connection.close();
+  }
+  
+  Cursor collectionsInfoCursor([String collectionName]) {
+    Map selector = {};
+    // If we are limiting the access to a specific collection name
+    if(collectionName !== null){
+      selector["name"] = this.databaseName + "." + collectionName;
+    }  
+    // Return Cursor
+      return new Cursor(this, new MCollection(this, DbCommand.SYSTEM_NAMESPACE_COLLECTION), selector);      
+  }    
 }

lib/mcollection.dart

+class MCollection{
+  Db db;
+  String collectionName;
+  MCollection(this.db, this.collectionName){}  
+  String fullName() => "${db.databaseName}.$collectionName";
+  saveAll(List<Map> documents){
+    insertAll(documents);
+  } 
+  save(Map document){
+    return saveAll([document]);  
+  }
+  insertAll(List<Map> documents){
+    MongoInsertMessage insertMessage = new MongoInsertMessage(fullName(),documents);
+    return db.executeDbCommand(insertMessage);   
+  }
+  insert(Map document){
+    return insertAll([document]);
+  } 
+  Cursor find([Map selector = const {}, Map fields = null, Map sort, int skip = 0,int limit = 30, bool hint = false, bool explain = false] ){
+    return new Cursor(db, this, selector);//, [selector, skip, limit,sort, hint, explain]);
+  }
+  findOne([Map selector = const {}, Map fields = null, Map sort, int skip = 0,int limit = 30, bool hint = false, bool explain = false] ){
+    return find(selector,fields,sort,skip,limit,hint,explain).nextObject();
+  }
+  drop() => db.dropCollection(collectionName);
+  remove([Map selector = const {}]) => db.removeFromCollection(collectionName, selector);
+}
 #source("networking/mongo_query_message.dart");
 #source("networking/mongo_reply_message.dart");
 #source("networking/mongo_insert_message.dart");
+#source("networking/mongo_remove_message.dart");
+#source("networking/mongo_getmore_message.dart");
 #source("networking/server_config.dart");
 #source("networking/dbcommand.dart");
 #source("db.dart");
-#source("collection.dart");
+#source("mcollection.dart");
 #source("cursor.dart");

lib/networking/connection.dart

 class Connection{
+  Map<int,Completer<MongoReplyMessage>> replyCompleters;
   Binary lengthBuffer;
   ServerConfig serverConfig;
   Binary bufferToSend;
   Queue<Binary> sendQueue;
   Binary messageBuffer;
-  Socket socket;
-  Completer replyCompleter;
+  Socket socket;  
   Connection([this.serverConfig]){
     if (serverConfig === null){
       serverConfig = new ServerConfig();
     }    
   }
   connect(){
+    replyCompleters = new Map();
     sendQueue = new Queue();
     socket = new Socket(serverConfig.host, serverConfig.port);
     if (socket == null) {
     }
     lengthBuffer = new Binary(4);
   }
+  close(){
+    socket.onData = null;
+    socket.onWrite = null;
+    socket.onError = null;
+    socket.close();
+//    print(replyCompleters);
+    replyCompleters.clear();    
+  }
   int sendData(Binary msg){
     while (msg.offset != msg.bytes.length){      
       msg.offset += socket.writeList(msg.bytes,
       bufferToSend.offset += socket.writeList(bufferToSend.bytes,
         bufferToSend.offset,bufferToSend.bytes.length-bufferToSend.offset);
       if (!bufferToSend.atEnd()){
-        print("${bufferToSend.offset}");
+//        print("${bufferToSend.offset}");
       }      
-      new Timer(0,(t)=>sendBufferFromTimer());
+//      new Timer(0,(t)=>sendBufferFromTimer());
+      sendBuffer("Recursevly");
     }        
     else {
-      print(sendQueue);
       socket.onWrite = null;        
-      socket.onClosed = null;        
-      socket.onError = null;        
     }    
   }
   
     }
     messageBuffer.offset += socket.readList(messageBuffer.bytes,messageBuffer.offset,messageBuffer.bytes.length-messageBuffer.offset);
     if (messageBuffer.atEnd()){
-      socket.onData = null;
-      socket.onError = null;
+//      socket.onData = null;
+//      socket.onError = null;
       MongoReplyMessage reply = new MongoReplyMessage();
       messageBuffer.rewind();
       reply.deserialize(messageBuffer);
-      replyCompleter.complete(reply);
+      messageBuffer = null;
+      lengthBuffer.rewind();
+      Completer completer = replyCompleters.remove(reply.responseTo);      
+      if (completer !== null){
+        completer.complete(reply);       
+      }
+      else {
+        print("Unexpected respondTo: ${reply.responseTo} ${reply.documents[0]}");
+      }  
     }   
   }
-  Future<Map> query(MongoMessage queryMessage){
-    replyCompleter = new Completer();    
+  Future<MongoReplyMessage> query(MongoMessage queryMessage){
+    Completer completer = new Completer();
+    replyCompleters[queryMessage.requestId] = completer;    
     Binary buffer = queryMessage.serialize();      
     socket.onData = receiveData;
-    sendData(buffer);
-    return replyCompleter.future;
+//    sendData(buffer);
+    sendQueue.addLast(buffer);
+    sendBuffer("From query");
+    return completer.future;
   }
   execute(MongoMessage message){    
     sendQueue.addLast(message.serialize());    

lib/networking/mongo_getmore_message.dart

-class MongoGetmoreMessage extends MongoMessage{
+class MongoGetMoreMessage extends MongoMessage{
   BsonCString _collectionFullName;
   int cursorId;    
   int numberToReturn;
-  MongoGetmoreMessage(String collectionFullName,
+  MongoGetMoreMessage(String collectionFullName,
             this.cursorId,
             [this.numberToReturn = 20]
             ){
     buffer.writeInt(0);
     _collectionFullName.packValue(buffer);
     buffer.writeInt(numberToReturn);
-    buffer.write64(cursorId);
+    buffer.writeInt64(cursorId);
     buffer.offset = 0;
     return buffer;
   }

lib/networking/mongo_insert_message.dart

 class MongoInsertMessage extends MongoMessage{
   BsonCString _collectionFullName;
   int flags;  
-  List<BSonMap> _documents;
+  List<BsonMap> _documents;
   MongoInsertMessage(String collectionFullName,            
             List<Map> documents,
             [this.flags = 0]

lib/networking/mongo_remove_message.dart

+class MongoRemoveMessage extends MongoMessage{
+  BsonCString _collectionFullName;
+  int flags;  
+  BsonMap _selector;
+  MongoRemoveMessage(String collectionFullName,            
+            [Map selector = const {},
+            this.flags = 0]
+            ){
+    _collectionFullName = new BsonCString(collectionFullName);
+    _selector = new BsonMap(selector);      
+    opcode = MongoMessage.Delete;
+  }
+  int get messageLength(){
+    return 16+4+_collectionFullName.byteLength()+4+_selector.byteLength();
+  }
+  Binary serialize(){
+    Binary buffer = new Binary(messageLength);
+    writeMessageHeaderTo(buffer);
+    buffer.writeInt(0);
+    _collectionFullName.packValue(buffer);
+    buffer.writeInt(flags);
+    _selector.packValue(buffer);       
+    buffer.offset = 0;
+    return buffer;
+  }
+}

lib/networking/mongo_reply_message.dart

 class MongoReplyMessage extends MongoMessage{
   BsonCString _collectionFullName;
   int responseFlags;
-  // 64bit integer
-  int cursorId =-1; 
+  int cursorId =-1; // 64bit integer 
   int startingFrom;
   int numberReturned = -1;
   List documents;  
 #import("dart:io");
 #import("dart:utf");
+#import("dart:builtin");
 main(){
   print(decodeUtf8([123,2443,34]));
 }

tests/ConnectionTest.dart

   Connection conn = new Connection();
   conn.connect();
   MongoQueryMessage queryMessage = new MongoQueryMessage("db.\$cmd",0,0,1,{"ping":1},null);
-  Future<Map> mapFuture = conn.query(queryMessage);
-  mapFuture.then((msg) {
+  var replyFuture = conn.query(queryMessage);
+  replyFuture.then((msg) {
     Expect.mapEquals({'ok': 1.0},msg.documents[0]);
+    conn.close();    
   });
 }
 testStudent(){
   Connection conn = new Connection();
   conn.connect();
-  MongoQueryMessage queryMessage = new MongoQueryMessage("test.student",0,0,10,{},null);
-  Future<Map> mapFuture = conn.query(queryMessage);
-  mapFuture.then((msg) {
+  MongoQueryMessage queryMessage = new MongoQueryMessage("test.student",0,0,10,{"name":"Daniil"},null);
+  Future<MongoReplyMessage> replyFuture = conn.query(queryMessage);
+  replyFuture.then((msg) {
     for (var each in msg.documents){
       print(each);
-    }      
+    }
+    conn.close();
   });
 }
 testGetMore(){

tests/CursorTest.dart

   db.open();
   MCollection collection = db.collection('\$cmd');
   Cursor cursor = new Cursor(db,collection,{"ping":1},limit:1);  
-  Connection conn = new Connection();
-  conn.connect();  
   MongoQueryMessage queryMessage = cursor.generateQueryMessage();
-  Future<Map> mapFuture = conn.query(queryMessage);
+  Future mapFuture = db.connection.query(queryMessage);
   mapFuture.then((msg) {
- //   print(msg.documents);
+//    print(msg.documents);
     Expect.mapEquals({'ok': 1.0},msg.documents[0]);
+    db.close();
   });
 }
 testNextObject(){
   });  
 */  
 }
-testEachWithGetMore(){
+testCursorWithOpenServerCursor(){
   var res;
   Db db = new Db('test');
   db.open();
-  MCollection collection = db.collection('newColl1');
+  MCollection collection = db.collection('new_big_collection');
+  collection.remove();
+  for (int n=0;n < 1000; n++){
+    collection.insert({"a":n});
+  }
   Cursor cursor = new Cursor(db,collection,limit:10);  
-  cursor.each((v) => print(v)).then((v){
-    Expect.isTrue(v);
-    Expect.isTrue(cursor.state == Cursor.CLOSED);
+  cursor.nextObject().then((v){
+    print(v);
+    Expect.isTrue(cursor.state == Cursor.OPEN);
     print("CursorId = ${cursor.cursorId}");
+    Expect.isTrue(cursor.cursorId > 0);
+    db.close();
     });
-  Expect.isTrue(cursor.state == Cursor.INIT);
+}
+testCursorGetMore(){
+  var res;
+  Db db = new Db('test');
+  db.open();
+  MCollection collection = db.collection('new_big_collection');
+  collection.remove();
+  for (int n=0;n < 1000; n++){
+    collection.insert({"a":n});
+  }
+  int count = 0;
+  Cursor cursor = new Cursor(db,collection,limit:10);  
+  cursor.each((v)=>count++).then((v){
+    print(count);
+    Expect.equals(1000, count);
+    Expect.equals(0,cursor.cursorId);
+    Expect.equals(Cursor.CLOSED,cursor.state);
+    db.close();
+    });
 }
 
 main(){
   testCursorCreation();
   testPingRaw();
-  testNextObject();
-  testNextObjectToEnd();
-  testEach();
+  testCursorWithOpenServerCursor();
+  testCursorGetMore();
+//  testNextObject();
+//  testNextObjectToEnd();
+//  testEach();
 //  testEachWithGetMore();
 }

tests/DbTest.dart

 #import("../lib/mongo.dart");
+#import('dart:builtin');
 testDatabaseName(){
   Db db = new Db('db');
   String dbName;
   dbName = 'db';
   db.validateDatabaseName(dbName);  
 }
+testCollectionInfoCursor(){
+  Db db = new Db('test');
+  db.open();
+  MCollection newColl = db.collection("new_collecion");
+  newColl.drop();
+  newColl.insertAll([{"a":1}]);
+  bool found = false;
+  db.collectionsInfoCursor("new_collecion").toList().then((v){
+    Expect.isTrue(v.length == 1);
+//    newColl.drop();
+    db.close();
+  });
+}
+testRemove(){
+  Db db = new Db('test');
+  db.open();  
+  db.removeFromCollection("new_collecion_to_remove");
+  MCollection newColl = db.collection("new_collecion_to_remove");  
+  newColl.insertAll([{"a":1}]);
+  db.collectionsInfoCursor("new_collecion_to_remove").toList().then((v){    
+    Expect.isTrue(v.length == 1);
+    db.removeFromCollection("new_collecion_to_remove");
+    //db.getLastError().then((v)=>print("remove result: $v"));
+    newColl.find().toList().then((v){
+      Expect.isTrue(v.isEmpty());
+      newColl.drop();
+      db.close();
+   });
+  });
+}
+
 main(){
   testDatabaseName();
+  testCollectionInfoCursor();
+  testRemove();
 }

tests/MCollectionTest.dart

 #import("../lib/mongo.dart");
 #import("dart:io");
+#import("dart:builtin");
 testCollectionCreation(){
   Db db = new Db('db');
   MCollection collection = db.collection('student');
     newColl.save({"a":i});
   }
 }
-testFindEach(){
+testEach(){
   Db db = new Db('test');
   db.open();  
   MCollection newColl = db.collection('newColl1');
     {sum += v["a"]; count++;
   }).then((v)=>print("Completed. Sum = $sum, count = $count"));
 }
-testFindEachStudent(){
+testFindEachWithThenClause(){
   Db db = new Db('test');
   db.open();  
-  MCollection newColl = db.collection('student');
+  MCollection students = db.collection('students');
+  students.drop();
+  students.insertAll(
+    [
+     {"name":"Vadim","score":4},
+     {"name": "Daniil","score":4},
+     {"name": "Nick", "score": 5}
+    ]
+  );
   int count = 0;
   int sum = 0;
-  newColl.find().each((v)
+  students.find().each((v)
     {sum += v["score"]; count++;
-  }).then((v)=>print("Students Completed. Sum = $sum, count = $count"));
+  }).then((v){
+    print("Students Completed. Sum = $sum, count = $count average score = ${sum/count}");    
+    db.close();
+  });
 }
-
+testFindEach(){
+  Db db = new Db('test');
+  db.open();  
+  MCollection students = db.collection('students');
+  students.drop();
+  students.insertAll(
+    [
+     {"name":"Vadim","score":4},
+     {"name": "Daniil","score":4},
+     {"name": "Nick", "score": 5}
+    ]
+  );
+  db.getLastError().then((v){
+  print(v);
+  print("there");  
+  int count = 0;
+  int sum = 0;
+  students.find().each((v) => print("student: $v")).then((v){
+    db.close();
+   });
+  });
+}
+testDrop(){
+  Db db = new Db('test');
+  db.open();
+  db.dropCollection("students").then((v)=>print("deleted"));
+  db.dropCollection("students").then((v){
+    db.close();
+  });  
+}
 main(){
+  //testFindEachWithThenClause();
+  testDrop();
+  testFindEach();
+//  testDrop();
+/*  
   testFindEach();
   testFindEachStudent();
-/*  testCollectionCreation();
+  testCollectionCreation();
   testSave();
   testSaveAll();  
 */