Commits

Anonymous committed f05f0a8

Comments (0)

Files changed (1)

     [fun(_, _, ReqId) -> ReqId end,
      fun(Pid, Database, ReqId) ->
              Packet = emongo_packet:do_query(Database, Collection, ReqId, Query),
-
+             
              Resp = emongo_server:send_recv(Pid, ReqId, Packet, Timeout),
-
-             NewValue = lists:foldl(F, Value, Resp#response.documents),
+             
+             NewValue = fold_documents(F, Value, Resp),
+                            
              fold_more(F, NewValue, Collection, Resp#response{documents=[]}, Timeout)
      end].
 
     Packet = emongo_packet:get_more(Database, Collection, ReqId, 0, CursorID),
     Resp1 = emongo_server:send_recv(Pid, ReqId, Packet, Timeout),
 
-    NewValue = lists:foldl(F, Value, Resp1#response.documents),
+    NewValue = fold_documents(F, Value, Resp1),
     fold_more(F, NewValue, Collection, Resp1#response{documents=[]}, Timeout).
 
 %%------------------------------------------------------------------------------
             undefined
     end.
 
-
-
 %%====================================================================
 %% gen_server callbacks
 %%====================================================================
 get_pid_pool(PoolId, RequestCount) ->
     emongo_sup:worker_pid(PoolId, emongo_sup:pools(), RequestCount).
 
+
+fold_documents(F, Value, Resp) ->
+    try
+        lists:foldl(F, Value, Resp#response.documents)
+    catch
+        Class:Exception ->
+            kill_cursor(Resp#response.pool_id, Resp#response.cursor_id),
+            erlang:Class(Exception)
+    end.
+
+
+kill_cursor(_, 0) ->
+    ok;
+kill_cursor(PoolId, CursorID)  ->
+    {Pid, _Database, ReqId} = get_pid_pool(PoolId, 1),
+    Packet = emongo_packet:kill_cursors(ReqId, [CursorID]),
+    emongo_server:send(Pid, ReqId, Packet).
+
+
 dec2hex(Dec) ->
     dec2hex(<<>>, Dec).