Commits

Anonymous committed b837304

first bits of global change handler. It manage longpolling and
continuous (set feed=continuous, default is longpoll).

Each change send a line
{"type": "typeofchange", "db": "nameof db", "seq": "updated seq in db"}

  • Participants
  • Parent commits bde3b68
  • Branches dbchanges

Comments (0)

Files changed (3)

File etc/couchdb/default.ini.tpl.in

 [httpd_global_handlers]
 / = {couch_httpd_misc_handlers, handle_welcome_req, <<"Welcome">>}
 favicon.ico = {couch_httpd_misc_handlers, handle_favicon_req, "%localdatadir%/www"}
-
+_dbs_changes = {couch_httpd_misc_handlers, handle_dbs_changes_req}
 _utils = {couch_httpd_misc_handlers, handle_utils_dir_req, "%localdatadir%/www"}
 _all_dbs = {couch_httpd_misc_handlers, handle_all_dbs_req}
 _active_tasks = {couch_httpd_misc_handlers, handle_task_status_req}

File src/couchdb/couch_httpd_db.erl

 -export([handle_request/1, handle_compact_req/2, handle_design_req/2,
     db_req/2, couch_doc_open/4,handle_changes_req/2,
     update_doc_result_to_json/1, update_doc_result_to_json/2,
-    handle_design_info_req/2, handle_view_cleanup_req/2]).
+    handle_design_info_req/2, handle_view_cleanup_req/2,
+    get_changes_timeout/2]).
 
 -import(couch_httpd,
     [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2,

File src/couchdb/couch_httpd_misc_handlers.erl

 -export([handle_welcome_req/2,handle_favicon_req/2,handle_utils_dir_req/2,
     handle_all_dbs_req/1,handle_replicate_req/1,handle_restart_req/1,
     handle_uuids_req/1,handle_config_req/1,handle_log_req/1,
-    handle_task_status_req/1,handle_sleep_req/1]).
+    handle_task_status_req/1,handle_sleep_req/1,
+    handle_dbs_changes_req/1]).
 
 -export([increment_update_seq_req/2]).
 
 handle_log_req(Req) ->
     send_method_not_allowed(Req, "GET").
 
+% httpd db changes handler
 
+handle_dbs_changes_req(#httpd{method='GET'}=Req) ->
+    ResponseType = couch_httpd:qs_value(Req, "feed", "longpoll"),
+    if ResponseType == "continuous" orelse ResponseType == "longpoll" ->
+        {ok, Resp} = couch_httpd:start_json_response(Req, 200),
+        Self = self(),
+        {ok, Notify} = couch_db_update_notifier:start_link(
+            fun({Change, DbName}) ->
+                ?LOG_DEBUG("ici ~p ~n", [Change]),
+                Self ! {db_change, Change, DbName};
+            (_) ->
+                ok
+            end),
+        {Timeout, TimeoutFun} = couch_httpd_db:get_changes_timeout(Req, Resp),
+        try
+            keep_sending_changes(Req, Resp, 0, Timeout, TimeoutFun, ResponseType)
+        after
+            couch_db_update_notifier:stop(Notify),
+            get_rest_db_changes() % clean out any remaining update messages
+        end;
+    true ->
+        throw({bad_request, <<"Invalid argument">>})
+    end;
+
+handle_dbs_changes_req(Req) ->
+    send_method_not_allowed(Req, "GET,HEAD").  
+
+
+% waits for a db_updated msg, if there are multiple msgs, collects them.
+wait_db_changes(Timeout, TimeoutFun) ->
+    receive {db_change, Change, DbName} -> 
+        {updated, Change, DbName}
+    after Timeout ->
+        case TimeoutFun() of
+        ok -> wait_db_changes(Timeout, TimeoutFun);
+        stop -> stop
+        end
+    end.
+
+get_rest_db_changes() ->
+    receive {db_change, _, _} -> get_rest_db_changes()
+    after 0 -> updated
+    end.
+
+
+keep_sending_changes(#httpd{user_ctx=UserCtx}=Req, Resp, ChangeInc, Timeout, TimeoutFun, ResponseType) ->
+    case wait_db_changes(Timeout, TimeoutFun) of
+        {updated, Change, DbName} ->
+            ?LOG_DEBUG("ici", []),
+            ChangeInc2 = case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
+                {ok, Db} ->
+                    {ok, Info} = couch_db:get_db_info(Db),
+                    Seq = proplists:get_value(update_seq, Info),
+                    send_change(Resp, Change, DbName, Seq),
+                    Inc = ChangeInc + 1,
+                    Inc;
+                _ -> 
+                    ChangeInc
+            end,
+            if
+                ChangeInc2 > 0, ResponseType =:= "longpoll" ->
+                    couch_httpd:end_json_response(Resp);
+                true ->
+                    keep_sending_changes(Req, Resp, ChangeInc2, Timeout, TimeoutFun, ResponseType)
+            end;
+        stop ->
+        couch_httpd:end_json_response(Resp)
+    end.
+
+
+send_change(Resp, Change, DbName, Seq) ->
+    couch_httpd:send_chunk(Resp, [?JSON_ENCODE({[{type,Change},{db,DbName},{seq, Seq}]}) | "\n"]).