Commits

Anonymous committed 7f2d931

Fix indexer to return qilr analyzers workers when finished

Comments (0)

Files changed (1)

apps/riak_search/src/riak_search_dir_indexer.erl

                         last_status_time }).
 
 -define(STATUS_INTERVAL, 10000).                % Milliseconds between status callbacks
+-define(WORKER_TIMEOUT, 2000).
 
 %% ===================================================================
 %% API functions
 %% Internal functions
 %% ===================================================================
 
+console_status(done) ->
+    io:format("Finished indexing.\n");
 console_status(Status) ->
     ElapsedSecs = timer:now_diff(now(), Status#index_status.start_time) / 1000000,
     AvgKbSec = (Status#index_status.processed_bytes / ElapsedSecs) / 1024,
 
     %% Spawn a bunch of workers
     Self = self(),
-    [spawn_link(fun() -> index_worker_loop0(Self, Index) end) || _ <- lists:seq(1,Workers)],
+    WorkerPids = [spawn_link(fun() -> index_worker_loop0(Self, Index) end) || _ <- lists:seq(1,Workers)],
 
     %% Initialize status and start processing files
     Status = #index_status {total_files = TotalFiles,
                             total_bytes = TotalBytes,
                             start_time  = now(),
                             last_status_time = now()},
-    index_master_loop(list_dir(Dir), Status, StatusFn).
+    index_master_loop(list_dir(Dir), Status, StatusFn),
 
-
+    %% Stop all workers.
+    [X!stop || X <- WorkerPids],
+    StatusFn(done),
+    ok.
 
 index_master_loop([], Status, StatusFn) ->
     StatusFn(Status),
-    ok;
+    stop;
 index_master_loop([FName | Rest], Status, StatusFn) ->
     TimeSinceUpdate = timer:now_diff(now(), Status#index_status.last_status_time) / 1000,
     NewStatus = case TimeSinceUpdate > ?STATUS_INTERVAL of
                                               processed_files = Status#index_status.processed_files + 1
                                              }, StatusFn)
             end;
-
         stop ->
-            StatusFn(NewStatus)
+            StatusFn(NewStatus),
+            stop
     end.
 
-
 index_worker_loop0(QueuePid, Index) ->
     {ok, Client} = riak_search:local_client(),
     {ok, AnalyzerPid} = qilr:new_analyzer(),
                     error_logger:error_msg("Failed to read file ~p: ~p\n", [FName, Reason]),
                     QueuePid ! {next_file, self(), 0}
             end,
-            index_worker_loop(QueuePid, Client, AnalyzerPid, IndexFsmPid, Index, DefaultField)
+            index_worker_loop(QueuePid, Client, AnalyzerPid, IndexFsmPid, Index, DefaultField);
+        stop ->
+            stop
+    after ?WORKER_TIMEOUT ->
+            stop
     end.