Commits

dizzyd committed 19da30e

Fix bug 785; each merge now happens on a dedicated process to ensure that all linked files/ports get cleaned up

Comments (0)

Files changed (1)

src/bitcask_merge_worker.erl

          terminate/2, code_change/3]).
 
 -record(state, { queue,
-                 worker,
-                 worker_ready = false}).
+                 worker}).
 
 %% ====================================================================
 %% API
     %% Trap exits of the actual worker process
     process_flag(trap_exit, true),
 
-    %% Use a dedicated worker sub-process to do the actual merging. The
-    %% process may ignore messages for a long while during the merge
-    %% and we want to ensure that our message queue doesn't fill up with
-    %% a bunch of dup requests for the same directory.
-    Self = self(),
-    WorkerPid = spawn_link(fun() -> worker_loop(Self) end),
-    {ok, #state{ queue = queue:new(),
-                 worker = WorkerPid }}.
+    %% Use a dedicated worker sub-process to do the actual merging. The process
+    %% may ignore messages for a long while during the merge and we want to
+    %% ensure that our message queue doesn't fill up with a bunch of dup
+    %% requests for the same directory.
+    %%
+    %% The sub-process is created per-merge request to ensure that any
+    %% ports/file handles opened during the merge get properly cleaned up, even
+    %% in error cases.
+    {ok, #state{ queue = queue:new() }}.
 
 handle_call({merge, Args}, _From, #state { queue = Q } = State) ->
     case queue:member(Args, Q) of
         true ->
             {reply, already_queued, State};
         false ->
-            case State#state.worker_ready of
-                true ->
-                    State#state.worker ! {merge, Args},
-                    {reply, ok, State};
-                false ->
+            case State#state.worker of
+                undefined ->
+                    WorkerPid = spawn_link(fun() -> do_merge(Args) end),
+                    {reply, ok, State#state { worker = WorkerPid }};
+                _ ->
                     {reply, ok, State#state { queue = queue:in(Args, Q) }}
             end
     end.
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info(worker_ready, #state { queue = Q } = State) ->
+
+handle_info({'EXIT', _Pid, normal}, #state { queue = Q } = State) ->
     case queue:is_empty(Q) of
         true ->
-            {noreply, State#state { worker_ready = true }};
+            {noreply, State#state { worker = undefined }};
         false ->
             {{value, Args}, Q2} = queue:out(Q),
-            State#state.worker ! {merge, Args},
+            WorkerPid = spawn_link(fun() -> do_merge(Args) end),
             {noreply, State#state { queue = Q2,
-                                    worker_ready = false }}
+                                    worker = WorkerPid }}
     end;
+
 handle_info({'EXIT', Pid, Reason}, #state { worker = Pid } = State) ->
     error_logger:error_msg("Merge worker PID exited: ~p\n", [Reason]),
     {stop, State}.
 %% Internal worker
 %% ====================================================================
 
-worker_loop(Parent) ->
-    Parent ! worker_ready,
-    receive
-        {merge, Args} ->
-            Start = now(),
-            Result = (catch apply(bitcask, merge, Args)),
-            ElapsedSecs = timer:now_diff(now(), Start) / 1000000,
-            case Result of
-                ok ->
-                    error_logger:info_msg("Merged ~p in ~p seconds.\n",
-                                          [Args, ElapsedSecs]);
-                {Error, Reason} when Error == error; Error == 'EXIT' ->
-                    error_logger:error_msg("Failed to merge ~p: ~p\n",
-                                           [Args, Reason])
-            end,
-            worker_loop(Parent)
+do_merge(Args) ->
+    Start = now(),
+    Result = (catch apply(bitcask, merge, Args)),
+    ElapsedSecs = timer:now_diff(now(), Start) / 1000000,
+    case Result of
+        ok ->
+            error_logger:info_msg("Merged ~p in ~p seconds.\n",
+                                  [Args, ElapsedSecs]);
+        {Error, Reason} when Error == error; Error == 'EXIT' ->
+            error_logger:error_msg("Failed to merge ~p: ~p\n",
+                                   [Args, Reason])
     end.