Commits

Anonymous committed 851215b

beefier fault tolerance in the replicator

- trap exits (the enumerator and attachment streamers are linked)
- retry by respawning the enumerator from the last known good source seq (see the sketch after this list)
- checkpoint the replication record on every flush of the document buffer
- send nicer error messages to listeners if we need to exit
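
The first two bullets boil down to a standard OTP pattern: trap exits so a linked worker's crash arrives as a message, then respawn the worker from the last checkpointed sequence number. A minimal, self-contained sketch of that pattern (a hypothetical module, not part of this commit):

    -module(respawn_sketch).
    -behaviour(gen_server).
    -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
             terminate/2, code_change/3]).

    -record(state, {worker, last_good_seq = 0}).

    init([]) ->
        %% crashes in linked processes arrive as 'EXIT' messages
        process_flag(trap_exit, true),
        {ok, #state{worker = start_worker(0)}}.

    %% the worker reports progress; remember it as the restart point
    handle_call({checkpoint, Seq}, _From, State) ->
        {reply, ok, State#state{last_good_seq = Seq}}.

    handle_cast(_Msg, State) ->
        {noreply, State}.

    %% normal exit of the worker: nothing to do
    handle_info({'EXIT', Pid, normal}, #state{worker = Pid} = State) ->
        {noreply, State};
    %% abnormal exit: respawn from the last known good sequence
    handle_info({'EXIT', Pid, Reason}, #state{worker = Pid} = State) ->
        error_logger:error_msg("worker exited with ~p, respawning~n", [Reason]),
        {noreply, State#state{worker = start_worker(State#state.last_good_seq)}};
    handle_info(_Msg, State) ->
        {noreply, State}.

    terminate(_Reason, _State) -> ok.
    code_change(_OldVsn, State, _Extra) -> {ok, State}.

    start_worker(Seq) ->
        spawn_link(fun() -> enumerate_from(Seq) end).

    %% stand-in for the real document enumerator loop
    enumerate_from(_Seq) -> ok.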


Files changed (2)

src/couchdb/couch_httpd_misc_handlers.erl

                     [{headers, TgtHeaders},
                     {user_ctx, UserCtx}]}
                 | Options],
-    {ok, {JsonResults}} = couch_rep:replicate(Source, Target, Options2),
-    send_json(Req, {[{ok, true} | JsonResults]});
+    case couch_rep:replicate(Source, Target, Options2) of
+        {ok, {JsonResults}} ->
+            send_json(Req, {[{ok, true} | JsonResults]});
+        {error, {Type, Details}} ->
+            send_json(Req, 500, {[{error, Type}, {reason, Details}]});
+        {error, Reason} ->
+            send_json(Req, 500, {[{error, Reason}]})
+    end;
 handle_replicate_req(Req) ->
     send_method_not_allowed(Req, "POST").
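
With this change a replication failure reaches the HTTP client as a 500 with a JSON body instead of a crashed request. An illustrative response (the error atom and reason here are made up for the example):

    {"error": "http_request_failed", "reason": "http://example.com/db/"}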
 

src/couchdb/couch_rep.erl

 }).
 
 init([Source, Target, Options]) ->
+    process_flag(trap_exit, true),
+    
     {ok, DbSrc} = 
         open_db(Source, proplists:get_value(source_options, Options, [])),
     {ok, DbTgt} = 
 
 handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
     #state{
+        context = Context,
+        current_seq = Seq,
         docs_buffer = Buffer,
         source = Source,
         target = Target,
     ets:update_counter(Stats, docs_read, length(Docs)),
     
     %% save them (maybe in a buffer)
-    NewBuffer = case couch_util:should_flush() of
+    {NewBuffer, NewContext} = case couch_util:should_flush() of
         true ->
             Docs2 = lists:flatten([Docs|Buffer]),
             ok = update_docs(Target, Docs2, [], false),
             ets:update_counter(Stats, docs_written, length(Docs2)),
-            [];
+            {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats),
+            {[], Ctxt};
         false ->
-            [Docs | Buffer]
+            {[Docs | Buffer], Context}
     end,
     
-    {reply, ok, State#state{docs_buffer=NewBuffer}};
+    {reply, ok, State#state{context=NewContext, docs_buffer=NewBuffer}};
 
 handle_call({fin, {LastSeq, RevsCount}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
     ets:update_counter(State#state.stats, total_revs, RevsCount),
     couch_task_status:update("Processed source update #~p", [Seq]),
     {noreply, State#state{current_seq=Seq}}.
 
+handle_info({'EXIT', Pid, Reason}, #state{enum_pid=Pid} = State) ->
+    ?LOG_ERROR("replication enumerator exited with ~p .. respawning", [Reason]),
+    #state{
+        current_seq = Seq,
+        source = Src,
+        target = Tgt,
+        enum_pid = Pid
+    } = State,
+    Parent = self(),
+    NewPid = spawn_link(fun() -> enum_docs_since(Parent,Src,Tgt,{Seq,0}) end),
+    {noreply, State#state{enum_pid=NewPid}};
+    
+%% if any linked process dies, respawn the enumerator to get things going again
+handle_info({'EXIT', _From, normal}, State) ->
+    {noreply, State};
+handle_info({'EXIT', From, Reason}, #state{enum_pid=EnumPid} = State) ->
+    ?LOG_ERROR("replicator-linked pid ~p exited with ~p", [From, Reason]),
+    exit(EnumPid, pls_restart_kthxbye),
+    {noreply, State};
+
 handle_info(_Msg, State) ->
     {noreply, State}.
 
     
     couch_task_status:update("Finishing"),
     
-    %% format replication history
-    JsonStats = [
-        {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
-        {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
-        {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
-        {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}
-    ],
+    {ok, NewRepHistory, _} = do_checkpoint(Source, Target, Context, Seq, Stats),
     ets:delete(Stats),
-    {ok, NewRepHistory} = finalize_response(Source, Target, Context, Seq, JsonStats),
-    
-    %% update local documents
-    RepRecSrc = proplists:get_value(src_record, Context),
-    RepRecTgt = proplists:get_value(tgt_record, Context),
-    {ok, _} = update_local_doc(Source, RepRecSrc#doc{body=NewRepHistory}, []),
-    {ok, _} = update_local_doc(Target, RepRecTgt#doc{body=NewRepHistory}, []),
-    
     close_db(Target),
     
     %% reply to original requester
     end,
     close_db(Source);
 terminate(Reason, State) ->
+    ?LOG_ERROR("replicator terminating with reason ~p", [Reason]),
     #state{
+        context = Context,
+        current_seq = Seq,
         listeners = Listeners,
         source = Source,
         target = Target,
     
     [gen_server:reply(L, {error, Reason}) || L <- Listeners],
     
+    {ok, _, _} = do_checkpoint(Source, Target, Context, Seq, Stats),
+    
     ets:delete(Stats),
     close_db(Target),
     close_db(Source).
             attachment_loop(ReqId);
         {ibrowse_async_response, ReqId, chunk_end} ->
             attachment_loop(ReqId);
+        {ibrowse_async_response, ReqId, {error, Err}} ->
+            ?LOG_ERROR("streaming attachment failed with ~p", [Err]),
+            exit(attachment_request_failed);
         {ibrowse_async_response, ReqId, Data} -> 
             receive {From, gimme_data} -> From ! {self(), Data} end,
             attachment_loop(ReqId);
     
     %% make the async request
     Options = [{stream_to, Pid}, {response_format, binary}],
-    {ibrowse_req_id, ReqId} = ibrowse:send_req(Url, Headers, get, [], Options),
+    ReqId = case ibrowse:send_req(Url, Headers, get, [], Options, infinity) of
+        {ibrowse_req_id, X} -> X;
+        {error, _Reason} -> exit(attachment_request_failed)
+    end,
     
     %% tell our receiver about the ReqId it needs to look for
     Pid ! {self(), {set_req_id, ReqId}},
 close_db(Db)->
     couch_db:close(Db).
 
+do_checkpoint(Source, Target, Context, NewSeqNum, Stats) ->
+    ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
+    [
+        {start_seq, SeqNum},
+        {history, OldRepHistoryProps},
+        {rep_starttime, ReplicationStartTime},
+        {src_starttime, SrcInstanceStartTime},
+        {tgt_starttime, TgtInstanceStartTime},
+        {src_record, RepRecSrc},
+        {tgt_record, RepRecTgt}
+    ] = Context,
+    
+    NewHistory = case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
+    true ->
+        % nothing changed, don't record results
+        {OldRepHistoryProps};
+    false ->
+        % commit changes to both src and tgt. The src because if changes
+        % we replicated are lost, we'll record a seq number ahead of what
+        % was committed, and therefore lose future changes with the same
+        % seq nums.
+        {ok, SrcInstanceStartTime2} = ensure_full_commit(Source),
+        {ok, TgtInstanceStartTime2} = ensure_full_commit(Target),
+        
+        RecordSeqNum =
+        if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
+                TgtInstanceStartTime2 == TgtInstanceStartTime ->
+            NewSeqNum;
+        true ->
+            ?LOG_INFO("A server has restarted since replication start. "
+                "Not recording the new sequence number to ensure the "
+                "replication is redone and documents reexamined.", []),
+            SeqNum
+        end,
+        
+        %% format replication history
+        JsonStats = [
+            {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
+            {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
+            {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
+            {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}
+        ],
+        
+        HistEntries = [
+            {
+                [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
+                {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+                {<<"start_last_seq">>, SeqNum},
+                {<<"end_last_seq">>, NewSeqNum} | JsonStats]}
+            | proplists:get_value(<<"history">>, OldRepHistoryProps, [])],
+        % something changed, record results
+        {[
+            {<<"session_id">>, couch_util:new_uuid()},
+            {<<"source_last_seq">>, RecordSeqNum},
+            {<<"history">>, lists:sublist(HistEntries, 50)}
+        ]}
+    end,
+    
+    %% update local documents
+    RepRecSrc = proplists:get_value(src_record, Context),
+    RepRecTgt = proplists:get_value(tgt_record, Context),
+    {ok, TgtRev} = update_local_doc(Target, RepRecTgt#doc{body=NewHistory}, []),
+    {ok, SrcRev} = update_local_doc(Source, RepRecSrc#doc{body=NewHistory}, []),
+    
+    NewContext = [
+        {start_seq, SeqNum},
+        {history, OldRepHistoryProps},
+        {rep_starttime, ReplicationStartTime},
+        {src_starttime, SrcInstanceStartTime},
+        {tgt_starttime, TgtInstanceStartTime},
+        {src_record, RepRecSrc#doc{revs=[SrcRev]}},
+        {tgt_record, RepRecTgt#doc{revs=[TgtRev]}}
+    ],
+    
+    {ok, NewHistory, NewContext}.
+
 do_http_request(Url, Action, Headers) ->
     do_http_request(Url, Action, Headers, []).
 
 
 do_http_request(Url, Action, _Headers, _JsonBody, 0) ->
     ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s", 
-        [Action, Url]);
+        [Action, Url]),
+    exit({http_request_failed, ?l2b(Url)});
 do_http_request(Url, Action, Headers, JsonBody, Retries) ->
     ?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]),
     Body =
         enum_docs_since(Pid, DbSource, DbTarget, {LastSeq, RevsCount2})
     end.
 
-finalize_response(Source, Target, Context, NewSeqNum, Stats) ->
-    [
-        {start_seq, SeqNum},
-        {history, OldRepHistoryProps},
-        {rep_starttime, ReplicationStartTime},
-        {src_starttime, SrcInstanceStartTime},
-        {tgt_starttime, TgtInstanceStartTime}
-    |_] = Context,
-    
-    case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
-    true ->
-        % nothing changed, don't record results
-        {ok, {OldRepHistoryProps}};
-    false ->
-        % commit changes to both src and tgt. The src because if changes
-        % we replicated are lost, we'll record the a seq number of ahead 
-        % of what was committed and therefore lose future changes with the
-        % same seq nums.
-        {ok, SrcInstanceStartTime2} = ensure_full_commit(Source),
-        {ok, TgtInstanceStartTime2} = ensure_full_commit(Target),
-        
-        RecordSeqNum =
-        if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
-                TgtInstanceStartTime2 == TgtInstanceStartTime ->
-            NewSeqNum;
-        true ->
-            ?LOG_INFO("A server has restarted sinced replication start. "
-                "Not recording the new sequence number to ensure the "
-                "replication is redone and documents reexamined.", []),
-            SeqNum
-        end,
-        
-        HistEntries =[
-            {
-                [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
-                {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
-                {<<"start_last_seq">>, SeqNum},
-                {<<"end_last_seq">>, NewSeqNum} | Stats]}
-            | proplists:get_value(<<"history">>, OldRepHistoryProps, [])],
-        % something changed, record results
-        NewRepHistory =
-            {
-                [{<<"session_id">>, couch_util:new_uuid()},
-                {<<"source_last_seq">>, RecordSeqNum},
-                {<<"history">>, lists:sublist(HistEntries, 50)}]},
-        {ok, NewRepHistory}
-    end.
-
 fix_url(UrlBin) ->
     Url = binary_to_list(UrlBin),
     case lists:last(Url) of
     Url = DbUrl ++ url_encode(DocId),
     {ResponseMembers} = do_http_request(Url, put, Headers,
             couch_doc:to_json_obj(Doc, [revs,attachments])),
-    RevId = proplists:get_value(<<"_rev">>, ResponseMembers),
+    RevId = proplists:get_value(<<"rev">>, ResponseMembers),
     {ok, RevId};
 update_local_doc(Db, Doc, Options) ->
     couch_db:update_doc(Db, Doc, Options).
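
For reference, the checkpoint record that do_checkpoint/5 now writes to the local replication documents on both endpoints has roughly this shape (all values illustrative):

    {"session_id": "c2bcd4fca27c1b0f8de0b85b23a47f9a",
     "source_last_seq": 42,
     "history": [
         {"start_time": "Fri, 08 Aug 2008 12:00:00 GMT",
          "end_time": "Fri, 08 Aug 2008 12:05:00 GMT",
          "start_last_seq": 0,
          "end_last_seq": 42,
          "missing_checked": 108,
          "missing_found": 17,
          "docs_read": 17,
          "docs_written": 17}]}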