Commits

vadimtsushko committed 6a4d7de

22

  • Participants
  • Parent commits ccd8fe4

Comments (0)

Files changed (7)

File lib/collection.dart

   Db db;
   String collectionName;
   MCollection(this.db, this.collectionName){}  
-  
+  String fullName() => "${db.databaseName}.$collectionName";
+  save(List<Map> documents){
+    MongoInsertMessage insertMessage = new MongoInsertMessage(fullName(),documents);
+    db.executeMessage(insertMessage);
+  } 
 }

File lib/cursor.dart

   var eachCallback;
   var eachComplete;
   bool explain;
-  int flags = 0;
-  String collectionName() => "${db.databaseName}.${collection.collectionName}";
+  int flags = 0;  
   Cursor(this.db, this.collection, [this.selector, this.fields, this.skip=0, this.limit=1
   , this.sort, this.hint, this.explain]){
     if (selector === null){
     }
     items = [];
   }
-
   MongoQueryMessage generateQueryMessage(){
-    return new  MongoQueryMessage(collectionName(),
+    return new  MongoQueryMessage(collection.fullName(),
             flags,
             skip,
             limit,
     if (state == INIT){
       Completer nextItem = new Completer();
       MongoQueryMessage qm = generateQueryMessage();
-      Future<MongoReplyMessage> reply = db._executeQueryMessage(qm);
+      Future<MongoReplyMessage> reply = db.executeQueryMessage(qm);
       reply.then((replyMessage){
         state = OPEN;
         items.addAll(replyMessage.documents);
          eachComplete.complete(true);
       } else {
             eachCallback(val);
-            new Timer(nextTick,0);
+            nextTick(null);
+//            new Timer(nextTick,0);
       }            
     });
   }
   Future<bool> each(callback){
-     eachCallback = callback; 
-     eachComplete = new Completer();
-     new Timer(nextTick,0);
-     return eachComplete.future;
+    eachCallback = callback; 
+    eachComplete = new Completer();
+//     new Timer(nextTick,0);
+    nextTick(null);
+    return eachComplete.future;
   }
 }
   MCollection collection(String collectionName){
       return new MCollection(this,collectionName);
   }
-  Future<Map> _executeQueryMessage(MongoQueryMessage queryMessage){
+  Future<Map> executeQueryMessage(MongoQueryMessage queryMessage){
     return connection.query(queryMessage);
   }  
+  executeMessage(MongoMessage message){
+    connection.execute(message);
+  }    
   open(){
     connection.connect();
   }

File lib/mongo.dart

 #source("networking/mongo_message.dart");
 #source("networking/mongo_query_message.dart");
 #source("networking/mongo_reply_message.dart");
+#source("networking/mongo_insert_message.dart");
 #source("networking/server_config.dart");
 #source("db.dart");
 #source("collection.dart");

File lib/networking/connection.dart

 class Connection{
   Binary lengthBuffer;
   ServerConfig serverConfig;
+  Binary bufferToSend;
+  Queue<Binary> sendQueue;
   Binary messageBuffer;
   Socket socket;
   Completer replyCompleter;
     }    
   }
   connect(){
+    sendQueue = new Queue();
     socket = new Socket(serverConfig.host, serverConfig.port);
     if (socket == null) {
       throw "can't get send socket";
     lengthBuffer = new Binary(4);
   }
   int sendData(Binary msg){
-    while (msg.offset != msg.bytes.length){
-      msg.offset += socket.writeList(msg.bytes,msg.offset,msg.bytes.length-msg.offset);
+    while (msg.offset != msg.bytes.length){      
+      msg.offset += socket.writeList(msg.bytes,
+        msg.offset,msg.bytes.length-msg.offset);      
     }    
     return msg.offset;
   }
+  getNextBufferToSend(){
+    if (bufferToSend === null || bufferToSend.atEnd()){
+      if(!sendQueue.isEmpty()){
+        bufferToSend = sendQueue.removeFirst();
+      } else {
+        bufferToSend = null;  
+      } 
+    }
+  }
+  sendBufferFromTimer() => sendBuffer("from Timer");
+  sendBufferFromOnWrite() => sendBuffer("from OnWrite");
+  sendBuffer(String origin){
+//    print(origin);
+    getNextBufferToSend();
+    if (bufferToSend !== null){
+      bufferToSend.offset += socket.writeList(bufferToSend.bytes,
+        bufferToSend.offset,bufferToSend.bytes.length-bufferToSend.offset);
+      if (!bufferToSend.atEnd()){
+        print("${bufferToSend.offset}");
+      }      
+      new Timer((t)=>sendBufferFromTimer(),0);
+    }        
+    else {
+      print(sendQueue);
+      socket.onWrite = null;        
+      socket.onClosed = null;        
+      socket.onError = null;        
+    }    
+  }
   
    void receiveData() {
     if (messageBuffer === null){
     sendData(buffer);
     return replyCompleter.future;
   }
+  execute(MongoMessage message){    
+    sendQueue.addLast(message.serialize());    
+    socket.onWrite = sendBufferFromOnWrite;  
+  }
 }

File lib/networking/mongo_insert_message.dart

 class MongoInsertMessage extends MongoMessage{
   BsonCString _collectionFullName;
-  int flags;
-  int numberToSkip;
-  int numberToReturn;
-  BsonMap _query;
-  BsonMap _fields;
-  MongoInsertMessage(String collectionFullName,
-            this.flags,
-            this.numberToSkip,
-            this.numberToReturn,
-            Map query,
-            Map fields){
+  int flags;  
+  List<BSonMap> _documents;
+  MongoInsertMessage(String collectionFullName,            
+            List<Map> documents,
+            [this.flags = 0]
+            ){
     _collectionFullName = new BsonCString(collectionFullName);
-    _query = new BsonMap(query);
-    if (fields !== null){
-      _fields = new BsonMap(fields);
-    }
-    opcode = MongoMessage.Query;    
+    _documents = new List();
+    for (var document in documents){
+      _documents.add(new BsonMap(document));
+    }      
+    opcode = MongoMessage.Insert;
   }
   int get messageLength(){
-    int result = 16+4+_collectionFullName.byteLength()+4+4+_query.byteLength();
-    if (_fields !== null){
-      result += _fields.byteLength();
-    }
+    int docsSize = 0;
+    for (var _doc in _documents){
+      docsSize += _doc.byteLength();
+    }   
+    int result = 16+4+_collectionFullName.byteLength()+docsSize;
     return result;
   }
   Binary serialize(){
     writeMessageHeaderTo(buffer);
     buffer.writeInt(flags);
     _collectionFullName.packValue(buffer);
-    buffer.writeInt(numberToSkip);
-    buffer.writeInt(numberToReturn);
-    _query.packValue(buffer);
+    for (var _doc in _documents){
+      _doc.packValue(buffer);
+    }       
     buffer.offset = 0;
     return buffer;
   }

File tests/MCollectionTest.dart

 #import("../lib/mongo.dart");
+#import("dart:io");
 testCollectionCreation(){
   Db db = new Db('db');
   MCollection collection = db.collection('student');
 }
+testInsertCommand(){
+  Db db = new Db('test');
+  db.open();
+  MCollection newColl; 
+/*  newColl = db.collection('newColl');  
+  print(newColl.fullName());
+  List<Map> docsToInsert  = new List();
+  for (int i = 0; i < 200; i++){
+    docsToInsert.add({"a":i});
+  }
+  newColl.save(docsToInsert);
+*/  
+  newColl = db.collection('newColl1');  
+  print(newColl.fullName());
+  for (int i = 0; i < 5000; i++){
+    newColl.save([{"a":i}]);
+  }
+}
 main(){
-
   testCollectionCreation();
+  testInsertCommand();
 }