Commits

vadimtsushko committed f35d109

1

Comments (0)

Files changed (24)

+tests/bson/BsonObjectIdTest.dart
+lib/networking/mongo_update_message.dart
+tests/AdHoq.dart
+tests/CursorTest.dart
+tests/DbCommandTest.dart
+tests/bson/BsonImplTest.dart
+tests/MCollectionTest.dart
+lib/bson/bson.dart
+tests/bson/BSonTypesTest.dart
+tests/ConnectionTest.dart
+lib/networking/mongo_getmore_message.dart
+lib/mongo.dart
+tests/bson/BsonBinaryTest.dart
+tests/DbTest.dart
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>mongo-dart</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>com.google.dart.tools.core.dartBuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>com.google.dart.tools.core.dartNature</nature>
+	</natures>
+</projectDescription>

lib/bson/binary.dart

     encodeInt(offset,value, 1,false,false);
     offset += 1;
   }
-  int writeDouble(){    
-    bytes.setFloat64(offset);
+  int writeDouble(double value){    
+    bytes.setFloat64(offset, value);
     offset+=8;
   } 
-  int writeInt64(){    
-    bytes.setInt64(offset);
+  int writeInt64(int value){    
+    bytes.setInt64(offset, value);
     offset+=8;
   } 
   int readByte(){    
     return new String.fromCharCodes(stringBytes);
   }  
 
-  warn(String msg){
-    print("Warning!!! $msg");
-  }
   int byteLength() => bytes.length;
   bool atEnd() => offset == bytes.length;
   rewind(){

lib/bson/bson.dart

 #library("bson.dart");
 //#import("../../../../../../dartrepo/dart-read-only/dart/utils/string_encoding/utf8.dart");
-#import("dart:utf8");
+#import("dart:utf");
 #source("bson_type.dart");
 #source("objectid.dart");
 #source("timestamp.dart");
 #source("bson_array.dart");
 #source("bson_impl.dart");
 #source("bson_double.dart");
+#source("bson_null.dart");
 
 

lib/bson/bson_double.dart

 class BsonDouble extends BsonObject{
-  num data;
+  double data;
   BsonDouble(this.data);
   get value()=>data;
   byteLength()=>8;

lib/bson/bson_impl.dart

   static final BSON_INT32_MAX = 0x7FFFFFFF;
   static final BSON_INT32_MIN = -0x80000000;
 
-  static final BSON_INT64_MAX = Math.pow(2, 63) - 1;
-  static final BSON_INT64_MIN = -Math.pow(2, 63);
+  //static final BSON_INT64_MAX = Math.pow(2, 63) - 1;
+  //static final BSON_INT64_MIN = -Math.pow(2, 63);
 
   // JS MAX PRECISE VALUES
   static final JS_INT_MAX = 0x20000000000000;  // Any integer up to 2^53 can be precisely represented by a double.

lib/bson/bson_null.dart

+class BsonNull extends BsonObject{  
+  BsonNull();
+  get value()=>null;
+  byteLength()=>0;
+  int get typeByte() => BSON.BSON_DATA_NULL;
+  packValue(Binary buffer){     
+  }
+  unpackValue(Binary buffer){  
+  }
+}

lib/bson/bson_string.dart

   unpackValue(Binary buffer){
      int size = buffer.readInt32()-1; 
      List<int> utf8Bytes = buffer.bytes.getRange(buffer.offset,size);
-     data = new String.fromCharCodes(Utf8Decoder.decodeUtf8(utf8Bytes));
+     data = decodeUtf8(utf8Bytes);
      buffer.offset += size+1;
   }
 

lib/bson/bson_type.dart

     result.value = value;
     return result;
   }
-  uppackValue(var buffer){ throw const Exception("must be implemented");}
+  unpackValue(var buffer){ throw const Exception("must be implemented");}
   get value()=>null;
 }
 
   if (value is List){
     return new BsonArray(value);
   }        
+  if (value === null){
+    return new BsonNull();
+  }        
   throw new Exception("Not implemented for $value");           
 }  
 
       return new BsonMap({});
     case BSON.BSON_DATA_OID:
       return new ObjectId();
+    case BSON.BSON_DATA_NULL:
+      return new BsonNull();
 
     default:
       throw new Exception("Not implemented for BSON TYPE $typeByte");           

lib/collection.dart

   String collectionName;
   MCollection(this.db, this.collectionName){}  
   String fullName() => "${db.databaseName}.$collectionName";
-  save(List<Map> documents){
+  saveAll(List<Map> documents){
     MongoInsertMessage insertMessage = new MongoInsertMessage(fullName(),documents);
-    db.executeMessage(insertMessage);
+    return db.executeDbCommand(insertMessage);
   } 
+  save(Map document){
+    return saveAll([document]);  
+  } 
+  find([Map selector = const {}, Map fields = null, Map sort, int skip = 0,int limit = 30, bool hint = false, bool explain = false] ){
+    return new Cursor(db, this);//, [selector, skip, limit,sort, hint, explain]);
+  }
+  drop() => db.dropCollection(collectionName);
 }
   var eachComplete;
   bool explain;
   int flags = 0;  
-  Cursor(this.db, this.collection, [this.selector, this.fields, this.skip=0, this.limit=1
+  Cursor(this.db, this.collection, [this.selector, this.fields, this.skip=0, this.limit=50
   , this.sort, this.hint, this.explain]){
     if (selector === null){
       selector = {};
       Future<MongoReplyMessage> reply = db.executeQueryMessage(qm);
       reply.then((replyMessage){
         state = OPEN;
+        cursorId = replyMessage.cursorId;
         items.addAll(replyMessage.documents);
         if (items.length > 0){
           nextItem.complete(getNextItem());
       return new Future.immediate(null);
     }
   }
-  nextTick(Timer timer){
+  nextEach(){
     nextObject().then((val){
       if (val === null){
-          eachCallback = null;
-         eachComplete.complete(true);
+        eachCallback = null;
+        eachComplete.complete(true);
       } else {
-            eachCallback(val);
-            nextTick(null);
-//            new Timer(nextTick,0);
+        eachCallback(val);
+        nextEach();
       }            
     });
   }
   Future<bool> each(callback){
     eachCallback = callback; 
     eachComplete = new Completer();
-//     new Timer(nextTick,0);
-    nextTick(null);
+    nextEach();
     return eachComplete.future;
   }
 }
     return connection.query(queryMessage);
   }  
   executeMessage(MongoMessage message){
-    connection.execute(message);
+    return connection.execute(message);
   }    
   open(){
     connection.connect();
   }
+  Future<bool> executeDbCommand(MongoMessage message){
+      Completer<bool> result = new Completer();
+      connection.query(message).then((replyMessage){
+        if (replyMessage.documents[0]["ok"] == 1.0){
+          print(replyMessage.documents[0]);
+          result.complete(true);
+        } else {
+          String errMsg = "Error executing Db command";
+          if (replyMessage.documents[0].containsKey("errmsg")){
+            errMsg = replyMessage.documents[0]["errmsg"];
+          }
+          print(errMsg);
+          result.complete(false);
+        }         
+      });
+    return result.future;        
+  }  
+  Future<bool> dropCollection(String collectionName){    
+    return executeDbCommand(DbCommand.createDropCollectionCommand(this,collectionName));
+  }  
+  Future<bool> getLastError(){    
+    return executeDbCommand(DbCommand.createGetLastErrorCommand(this));
+  }  
+
 }
 #source("networking/mongo_reply_message.dart");
 #source("networking/mongo_insert_message.dart");
 #source("networking/server_config.dart");
+#source("networking/dbcommand.dart");
 #source("db.dart");
 #source("collection.dart");
 #source("cursor.dart");

lib/networking/connection.dart

       if (!bufferToSend.atEnd()){
         print("${bufferToSend.offset}");
       }      
-      new Timer((t)=>sendBufferFromTimer(),0);
+      new Timer(0,(t)=>sendBufferFromTimer());
     }        
     else {
       print(sendQueue);
       replyCompleter.complete(reply);
     }   
   }
-  Future<Map> query(MongoQueryMessage queryMessage){
+  Future<Map> query(MongoMessage queryMessage){
     replyCompleter = new Completer();    
     Binary buffer = queryMessage.serialize();      
     socket.onData = receiveData;

lib/networking/dbcommand.dart

+class DbCommand extends MongoQueryMessage{
+  // Constants
+  static final SYSTEM_NAMESPACE_COLLECTION = "system.namespaces";
+  static final SYSTEM_INDEX_COLLECTION = "system.indexes";
+  static final SYSTEM_PROFILE_COLLECTION = "system.profile";
+  static final SYSTEM_USER_COLLECTION = "system.users";
+  static final SYSTEM_COMMAND_COLLECTION = "\$cmd";
+
+  Db db;  
+  DbCommand(this.db, collectionName, flags, numberToSkip, numberToReturn, query, fields)
+    :super(collectionName,flags, numberToSkip, numberToReturn, query, fields){      
+    _collectionFullName = new BsonCString("${db.databaseName}.$collectionName");      
+  }
+  static createDropCollectionCommand(Db db, String collectionName) {
+    return new DbCommand(db, SYSTEM_COMMAND_COLLECTION, MongoQueryMessage.OPTS_NO_CURSOR_TIMEOUT, 0, -1, {'drop':collectionName}, null);
+  }
+  static createPingCommand(Db db) {
+    return new DbCommand(db, SYSTEM_COMMAND_COLLECTION, MongoQueryMessage.OPTS_NO_CURSOR_TIMEOUT, 0, 1, {'ping':1}, null);
+  }  
+  static createGetLastErrorCommand(Db db) {
+    return new DbCommand(db, SYSTEM_COMMAND_COLLECTION, MongoQueryMessage.OPTS_NO_CURSOR_TIMEOUT, 0, 1, {"getlasterror":1}, null);
+  }  
+
+}

lib/networking/mongo_getmore_message.dart

+class MongoGetmoreMessage extends MongoMessage{
+  BsonCString _collectionFullName;
+  int cursorId;    
+  int numberToReturn;
+  MongoGetmoreMessage(String collectionFullName,
+            this.cursorId,
+            [this.numberToReturn = 20]
+            ){
+    _collectionFullName = new BsonCString(collectionFullName);    
+    opcode = MongoMessage.GetMore;
+  }
+  int get messageLength(){
+    return 16+4+_collectionFullName.byteLength()+4+8;
+  }
+  Binary serialize(){
+    Binary buffer = new Binary(messageLength);
+    writeMessageHeaderTo(buffer);
+    buffer.writeInt(0);
+    _collectionFullName.packValue(buffer);
+    buffer.writeInt(numberToReturn);
+    buffer.write64(cursorId);
+    buffer.offset = 0;
+    return buffer;
+  }
+}

lib/networking/mongo_query_message.dart

 class MongoQueryMessage extends MongoMessage{
+static final OPTS_NONE = 0;
+static final OPTS_TAILABLE_CURSOR = 2;
+static final OPTS_SLAVE = 4;
+static final OPTS_OPLOG_REPLY = 8;
+static final OPTS_NO_CURSOR_TIMEOUT = 16;
+static final OPTS_AWAIT_DATA = 32;
+static final OPTS_EXHAUST = 64;
+
+
   BsonCString _collectionFullName;
   int flags;
   int numberToSkip;
   int numberToReturn;
   BsonMap _query;
   BsonMap _fields;
+  BsonCString get collectionNameBson() => _collectionFullName;
   MongoQueryMessage(String collectionFullName,
             this.flags,
             this.numberToSkip,

lib/networking/mongo_reply_message.dart

       documents[n] = doc.value;
     }
   }
-  int get messageLength(){
-    int result = 16+4+_collectionFullName.byteLength()+4+4+_query.byteLength();
-    if (_fields !== null){
-      result += _fields.byteLength();
-    }
-    return result;
-  }
-  Binary serialize(){
-    Binary buffer = new Binary(messageLength);
-    writeMessageHeaderTo(buffer);
-    buffer.writeInt(flags);
-    _collectionFullName.packValue(buffer);
-    buffer.writeInt(numberToSkip);
-    buffer.writeInt(numberToReturn);
-    _query.packValue(buffer);
-    buffer.offset = 0;
-    return buffer;
-  }
 }
+#import("dart:io");
+#import("dart:utf");
+main(){
+  print(decodeUtf8([123,2443,34]));
+}
+

tests/CompleterTest.dart

-#import("dart:io");
-main(){  
-  Completer c = new Completer();
-  new Timer((timer){c.complete('Tra');},4000);
-
-  var f  = c.future; 
-  //c.complete('Tra');
-  waitFor(f){    
-    f.then((v){return v;});
-  }  
-  print(waitFor(f));
-}

tests/ConnectionTest.dart

 #import("../lib/mongo.dart");
 #import("../lib/bson/bson.dart");
 #import("dart:io");
+#import('dart:builtin');
 testPing(){
   Connection conn = new Connection();
   conn.connect();
     }      
   });
 }
+testGetMore(){
 
+}
 main(){
   testPing();
   testStudent();

tests/CursorTest.dart

 #import("../lib/mongo.dart");
+#import("dart:io");
+#import('dart:builtin'); 
 testCursorCreation(){
   Db db = new Db('db');
   MCollection collection = db.collection('student');
 
 testEach(){
   var res;
+  int sumScore = 0;
+  int count = 0;
+  var futures = new List();
   Db db = new Db('test');
   db.open();
-  MCollection collection = db.collection('student');
+  
+  MCollection collection = db.collection('student');  
+  //collection.drop().chain((v){
+/*  new Future.immediate(true).chain((v){    
+  print("there");  
+  return db.getLastError();}).then((v){
+  print("here");
+  collection.saveAll([{"name":"Daniil","score":4},{"name":"Nick","score":5}]);
+  db.getLastError().then((m)
+  {
+    print(m);
+  });
+  });  
+*/
+collection.saveAll([{"name":"Daniil","score":4},{"name":"Nick","score":5}]);
+  db.getLastError().then((m)
+  {
+    print(m);
+    print(db.connection.sendQueue);
+  });
+  
+/*  MCollection newColl = db.collection('student');
+
+  int sum = 0;
+  newColl.find().each((v)
+    { count++; print(v);
+  }).then((v)=>print("Completed. Sum = $sum, count = $count"));
+
+*/  
+
+/*  
+  collection.save({"name":"Daniil","score":4}).then((v)=>print(v));
+  collection.save({"name":"Nick","score":5}).then((v)=>print(v));   
+*/
+/*
+  new Timer((timer){
+      Cursor cursor = new Cursor(db,collection,limit:50);  
+      cursor.each((e){
+          print(e);//;count++;sumScore += e["score"];
+      }).then((v){
+      Expect.isTrue(v);
+      Expect.isTrue(cursor.state == Cursor.CLOSED);
+//      Expect.equals((4+4+5)/3, sumScore/count);
+      print("CursorId = ${cursor.cursorId}");    
+      
+  });        
+}, 0);
+*/
+/*  var f = Futures.wait(futures);
+  print(f);
+  f.then((v){ 
+      print("there");
+      Cursor cursor = new Cursor(db,collection,limit:10);  
+      cursor.each((e){
+          print(e);count++;sumScore += v["score"];
+      }).then((v){
+      Expect.isTrue(v);
+      Expect.isTrue(cursor.state == Cursor.CLOSED);
+      Expect.equals((4+4+5)/3, sumScore/count);
+      print("CursorId = ${cursor.cursorId}");    
+      
+  });  
+*/  
+}
+testEachWithGetMore(){
+  var res;
+  Db db = new Db('test');
+  db.open();
+  MCollection collection = db.collection('newColl1');
   Cursor cursor = new Cursor(db,collection,limit:10);  
   cursor.each((v) => print(v)).then((v){
     Expect.isTrue(v);
     Expect.isTrue(cursor.state == Cursor.CLOSED);
+    print("CursorId = ${cursor.cursorId}");
     });
   Expect.isTrue(cursor.state == Cursor.INIT);
 }
   testNextObject();
   testNextObjectToEnd();
   testEach();
+//  testEachWithGetMore();
 }

tests/DbCommandTest.dart

+#import("../lib/mongo.dart");
+testDbCommandCreation(){
+  Db db = new Db('test');
+  DbCommand dbCommand = new DbCommand(db,"student",0,0,1,{},{});
+  Expect.stringEquals('test.student',dbCommand.collectionNameBson.value);
+}
+testPingDbCommand(){
+  Db db = new Db("test");
+  db.open();
+  DbCommand pingCommand = DbCommand.createPingCommand(db);
+  Future<Map> mapFuture = db.executeQueryMessage(pingCommand);
+  mapFuture.then((msg) {
+    Expect.mapEquals({'ok': 1},msg.documents[0]);
+  });
+}
+testDropCollectionCommand(){
+  Db db = new Db("test");
+  db.open();
+//  print(isFutureThrowsException(db.dropCollection("collection_with_that_name_does_not_exists")));
+//  db.dropCollection("newColl");
+//  db.collection("student").drop().then((v)=>print("Student collection dropped"));
+}
+
+isFutureThrowsException(Future future){
+  bool result = false;  
+  future.handleException((ex){
+    result = true;
+    return true;
+  });
+  future.then((v) => v);
+  return result;
+}
+
+main(){
+  testDbCommandCreation();  
+  testPingDbCommand();
+  testDropCollectionCommand();
+}

tests/MCollectionTest.dart

   Db db = new Db('db');
   MCollection collection = db.collection('student');
 }
-testInsertCommand(){
+testSaveAll(){
   Db db = new Db('test');
   db.open();
   MCollection newColl; 
-/*  newColl = db.collection('newColl');  
+  newColl = db.collection('newColl');  
   print(newColl.fullName());
   List<Map> docsToInsert  = new List();
   for (int i = 0; i < 200; i++){
     docsToInsert.add({"a":i});
   }
-  newColl.save(docsToInsert);
-*/  
-  newColl = db.collection('newColl1');  
+  newColl.saveAll(docsToInsert);
+}
+testSave(){
+  Db db = new Db('test');
+  db.open();  
+  MCollection newColl = db.collection('newColl1');  
   print(newColl.fullName());
   for (int i = 0; i < 5000; i++){
-    newColl.save([{"a":i}]);
+    newColl.save({"a":i});
   }
 }
+testFindEach(){
+  Db db = new Db('test');
+  db.open();  
+  MCollection newColl = db.collection('newColl1');
+  int count = 0;
+  int sum = 0;
+  newColl.find().each((v)
+    {sum += v["a"]; count++;
+  }).then((v)=>print("Completed. Sum = $sum, count = $count"));
+}
+testFindEachStudent(){
+  Db db = new Db('test');
+  db.open();  
+  MCollection newColl = db.collection('student');
+  int count = 0;
+  int sum = 0;
+  newColl.find().each((v)
+    {sum += v["score"]; count++;
+  }).then((v)=>print("Students Completed. Sum = $sum, count = $count"));
+}
+
 main(){
-  testCollectionCreation();
-  testInsertCommand();
+  testFindEach();
+  testFindEachStudent();
+/*  testCollectionCreation();
+  testSave();
+  testSaveAll();  
+*/  
 }