Commits

Adam Kocoloski  committed 92e206a

attempt at gentler replication shutdown when one of the DBs is deleted

  • Participants
  • Parent commits 8b704d2

Comments (0)

Files changed (3)

File src/couchdb/couch_rep.erl

 
     source,
     target,
+    continuous,
     init_args,
     checkpoint_scheduled = nil,
 
     {BaseId, Extension} = make_replication_id(PostBody, UserCtx),
     Replicator = {BaseId ++ Extension,
         {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]},
-        transient,
+        temporary,
         1,
         worker,
         [?MODULE]
     SourceProps = proplists:get_value(<<"source">>, PostProps),
     TargetProps = proplists:get_value(<<"target">>, PostProps),
 
+    Continuous = proplists:get_value(<<"continuous">>, PostProps, false),
+
     Source = open_db(SourceProps, UserCtx),
     Target = open_db(TargetProps, UserCtx),
 
 
         source = Source,
         target = Target,
+        continuous = Continuous,
         init_args = InitArgs,
         stats = Stats,
         checkpoint_scheduled = nil,
     ets:update_counter(State#state.stats, Key, N),
     {noreply, State};
 
+handle_info({'DOWN', _, _, _, _}, State) ->
+    ?LOG_INFO("replication terminating because local DB is shutting down", []),
+    timer:cancel(State#state.checkpoint_scheduled),
+    {stop, shutdown, State};
+
 handle_info({'EXIT', Writer, normal}, #state{writer=Writer} = State) ->
     case State#state.listeners of
     [] ->
 
 handle_info({'EXIT', _, normal}, State) ->
     {noreply, State};
-handle_info({'EXIT', Pid, Reason}, State) ->
-    ?LOG_ERROR("exit of linked Pid ~p with reason ~p", [Pid, Reason]),
+handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error;
+        Err == target_error ->
+    ?LOG_INFO("replication terminating due to ~p: ~p", [Err, Reason]),
+    timer:cancel(State#state.checkpoint_scheduled),
+    {stop, shutdown, State};
+handle_info({'EXIT', _Pid, Reason}, State) ->
     {stop, Reason, State}.
 
 terminate(normal, #state{checkpoint_scheduled=nil} = State) ->
         listeners = Listeners,
         source = Source,
         target = Target,
+        continuous = Continuous,
         stats = Stats,
         source_log = #doc{body={OldHistory}}
     } = State,
     end,
 
     %% reply to original requester
-    [Original|OtherListeners] = lists:reverse(Listeners),
-    gen_server:reply(Original, {ok, NewRepHistory}),
+    OtherListeners = case Continuous of
+    true ->
+        []; % continuous replications have no listeners
+    _ ->
+        [Original|Rest] = lists:reverse(Listeners),
+        gen_server:reply(Original, {ok, NewRepHistory}),
+        Rest
+    end,
 
     %% maybe trigger another replication. If this replicator uses a local
     %% source Db, changes to that Db since we started will not be included in
     open_db({[{<<"url">>,Url}]}, []);
 open_db(<<DbName/binary>>, UserCtx) ->
     case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
-    {ok, Db} -> Db;
+    {ok, Db} ->
+        couch_db:monitor(Db),
+        Db;
     {not_found, no_db_file} -> throw({db_not_found, DbName})
     end.
 

File src/couchdb/couch_rep_missing_revs.erl

         {noreply, State#state{complete=true, changes_loop=nil}}
     end;
 handle_changes_loop_exit(Reason, State) ->
-    ?LOG_ERROR("changes_loop died with reason ~p", [Reason]),
-    {stop, changes_loop_died, State#state{changes_loop=nil}}.
+    {stop, Reason, State#state{changes_loop=nil}}.
 
 changes_loop(OurServer, SourceChangesServer, Target) ->
     case couch_rep_changes_feed:next(SourceChangesServer) of
         body = {IdRevsList}
     },
     {Resp} = couch_rep_httpc:request(Request),
-    {MissingRevs} = proplists:get_value(<<"missing_revs">>, Resp),
-    X = [{Id, dict:fetch(Id, SeqDict), couch_doc:parse_revs(RevStrs)} ||
-        {Id,RevStrs} <- MissingRevs],
-    {HighSeq, X};
-        
+    case proplists:get_value(<<"missing_revs">>, Resp) of
+    {MissingRevs} ->
+        X = [{Id, dict:fetch(Id, SeqDict), couch_doc:parse_revs(RevStrs)} ||
+            {Id,RevStrs} <- MissingRevs],
+        {HighSeq, X};
+    _ ->
+        exit({target_error, proplists:get_value(<<"error">>, Resp)})
+    end;
+
 get_missing_revs(Target, Changes) ->
     Transform = fun({[{<<"seq">>,_}, {<<"id">>,Id}, {<<"changes">>,C}]}) ->
         {Id, [R || {[{<<"rev">>, R}]} <- C]} end,

File src/couchdb/couch_rep_writer.erl

 
 write_docs(#http_db{} = Db, Docs) ->
     JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
-    ErrorsJson = couch_rep_httpc:request(Db#http_db{
+    Request = Db#http_db{
         resource = "_bulk_docs",
         method = post,
         body = {[{new_edits, false}, {docs, JsonDocs}]},
         headers = [{"x-couch-full-commit", "false"} | Db#http_db.headers]
-    }),
+    },
+    ErrorsJson = case couch_rep_httpc:request(Request) of
+    {FailProps} ->
+        exit({target_error, proplists:get_value(<<"error">>, FailProps)});
+    List when is_list(List) ->
+        List
+    end,
     ErrorsList =
     lists:map(
         fun({Props}) ->