Commits

rzezeski committed befc328

Bring in merge_index as a dep

tags: az341

  • Parent commits 2eac8fe


Files changed (29)

File apps/merge_index/Emakefile

-{ 
-  [	
-    "./src/*", 
-    "./src/*/*", 
-    "./src/*/*/*"
-  ], 
-  [
-    { i, "./include" },
-    { outdir, "./ebin" },
-    debug_info
-  ]
-}.

File apps/merge_index/Makefile

-compile:
-	erl -make
-
-clean:
-	rm ebin/*.beam
-
-run:
-	erl -pa ebin
-

File apps/merge_index/Notes.txt

-- Delete values.
-- Make the buffer sort dates correctly.
-
-
-
-

File apps/merge_index/docs/MergeIndex.org

-#+SETUPFILE: html-style.org
-
-* Overview
-  
-  This describes a new type of merge\_index that solves a few problems
-  with the old merge\_index, namely:
-
-  + Data files are capped at a certain size and "rolled over", so that
-    the sort process works on a smaller subset of data. This means
-    that we need to merge results from a few different files at read
-    time.
-
-  + Indexes, Fields, and Terms are separated into their own gb\_trees
-    and each unique value is given its own ID. This allows us to store
-    integer IDs in the data files rather than strings, saving space.
-
-  + Adds the concept of a subterm, to be used for efficient date
-    matching, phrase searching, line searching, etc. A subterm is like
-    a facet that gets indexed. 
-
-* API
-
-   + index(Index, Field, Term, SubTermType, SubTerm, Value, Props)
-     Index a value.
-
-   + deindex(Index, Field, Term, SubTermType, SubTerm, Value)
-     De-index a value.
-
-** Use Default SubTypes
-
-  + info(Index, Field, Term)
-    Same as info(Index, Field, Term, 0, all, all).
-
-  + range(Index, Field, Term) 
-    Same as range(Index, Field, Term, 0, all, all).
-
-  + stream(Index, Field, Term)
-    Same as stream(Index, Field, Term, 0, all, all).
-
-** With SubTypes
-
-   + info(Index, Field, Term, SubTermType, Start, Stop)
-     Return the info for term subtype between start and stop.
- 
-   + range(Index, Field, StartTerm, EndTerm, SubTermType, StartSub,
-     EndSub, Inclusive)
-     Return the infos for terms between start and stop.
- 
-   + stream(Index, Field, Term, SubTermType, Start, Stop, FilterFun)
-     Stream out the keys for this term between the two ranges.
-
-  SubTerm acts like an integer facet; it can be used to limit results
-  by time or other criteria.
-
-  DEFAULT  = 0
-  TIME     = 1
-  CHARPOS  = 2
-  WORDPOS  = 3
-  LINEPOS  = 4
-
-  #term { index, field, term, subterm\_type=0, subterm=0, facets }
-  
-* Data Structures
-
-** PARTITION.indexes
-
-   Stored in memory as a gb\_tree mapping <<"index">> to integer.
-   Stored on disk as:
-   <<Size:16/integer>>
-   binary\_to\_term({<<"index">>, int}).
-
-   New entries are simply added to the end.
-   On startup, file\_sort the file, then read into a gb\_tree.
-   
-** PARTITION.fields
-
-   Same as indexes, mapping fields to integers.
-
-** PARTITION.terms
-
-   Same as indexes, mapping terms to integers.
-
-** PARTITION.offsets
-
-   Stored in memory as a gb\_tree of the form {IDF, [{FileNum, Offset,
-   Count}]}. IDF is a packed binary:
-
-   + IndexID:16/integer
-   + FieldID:32/integer
-   + TermID:32/integer
-   + SubTermID:8/integer
-   + SubTerm:64/integer
-
-   Stored on disk as a series of terms:
-
-   + <<Size:8/integer>>
-   + {IDF, [{FileNum, Offset, Count}]}
-
-** PARTITION.seg.N
-
-   Stored on disk as:
-
-   <<Size:24/integer>>, <<Term/binary>>
-
-   Where Term is {IDF, Value, [{propkey, propvalue}]}.
-
-** cur\_segment\_num / cur\_segment\_size
-
-   Calculated on startup by looping through available segment numbers
-   until we find the last file, then getting the size.
-
-* Configuration Settings
-
-  + merge\_at\_interval - How often should we merge, in seconds?
-  + merge\_at\_buffer\_size - Merge if the unmerged data exceeds this size.
-  + segment\_size - What's the max size of a segment before rolling over and merging?
-  + temp\_directory - Where should we sort the segment?
-
-* State
-  
-  rootname : string
-  indexes  : gb\_tree
-  fields : gb\_tree
-  terms : gb\_tree
-  offsets : gb\_tree
-  cur\_segment\_num : integer
-  cur\_segment\_size : integer (segment + buffer size)
-  last\_merge : now()
-  buffer\_handle : either PARTITION.buffer or PARTITION.in\_merge\_buffer
-  
-* Events/Processes
-** On Startup
-
-   - Check if the offsets file exists, if not, then scan through all
-     PARTITION.seg.* files recreating the offsets gb\_tree. When done,
-     write it back out. Otherwise, just read offsets file into a gb\_tree.
-
-   - Read indexes, fields, and terms into gb\_trees.
-
-   - Open a handle to the non-merge buffer file.
-
-** Writing Data
-
-   - Look up the index in the 'indexes' tree. If it's not found, then add entry and log to disk.
-   - Repeat for fields and terms.
-   - Create the IDF, write the value to a segment.
-   - If size greater than segment\_size, then start a merge.
-
-** Merging Segments
-
-   - Only one merge at a time.
-   - Spawn a background process that sorts the latest segment and returns a new partial offsets tree.
-
-** At Merge Start
-  
-   - Close buffer handle, open merge buffer handle.
-   - Do file sort of current seg plus buffer.
-   - When done, call merge\_complete with new offsets for this segment.
-
-** At Merge Complete
-   
-   - Swap new segment file and old segment file.
-   - Copy offsets into offsets gbtree.
-
-** Range Searches
-
-   - Get an index into the term gbtree at the first possible start.
-     Generate a list of the possible terms.
-   - Iterate through creating IDFs doing lookups into the offset file.
-   - Send back the counts that we find.
-
-** Streaming Data
-
-   - Look up the indexID, fieldID, and termID.
-   - Look up the [{file, offset, count}] list.
-   - Open a file handle on each file, read and do a merge.
-   - Close when done.
-   
-** Handoff
-
-   - Invert the indexes, fields, and terms gb\_trees.
-   - Plow through each segment doing reverse lookups. Things will be
-     roughly grouped, so just cache the last access for each gb\_tree.
-
-** Updates
-
-   There are no in-place updates; the user must explicitly delete a
-   value. Delete the original document first, then index the new one.
-
-* Tricky Stuff
-
-** SubType
-
-   Offset file will now have this: <<Index, Field, Term, SubTerm>> ->
-   [{File, Offset, Count}]
-
-** Time Series
-
-   + If we make it too granular (down to millisecond) then every value
-     will have its own offset.
-
-   + If we make it not granular enough (to year) then we will need to stream more results.
-
-   + Let the user set granularity, then do more filtering at the facet level.
-
-   Let the user figure out granularity. 
-
-** Phrase Search
-
-   Use the SubTerm field to store the term's position. Then expand the
-   first word and use it to narrow down the possible positions for the
-   second word, and use that to narrow down the positions for the third
-   word.
-
-   Can we do phrase search and date search at the same time? No. So how do we decide?
-   What does the time series search look like?
-
-
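
The PARTITION.offsets notes above describe a fixed-width packed IDF
key. A minimal sketch of that packing, assuming the field widths
listed there (the function names are illustrative and not part of
this commit):

    %% Pack the IDF key described in the PARTITION.offsets notes.
    pack_idf(IndexID, FieldID, TermID, SubTermID, SubTerm) ->
        <<IndexID:16/integer, FieldID:32/integer, TermID:32/integer,
          SubTermID:8/integer, SubTerm:64/integer>>.

    %% Recover the component IDs from a packed IDF key.
    unpack_idf(<<IndexID:16/integer, FieldID:32/integer, TermID:32/integer,
                 SubTermID:8/integer, SubTerm:64/integer>>) ->
        {IndexID, FieldID, TermID, SubTermID, SubTerm}.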

File apps/merge_index/docs/html-style.org

-#+STYLE: <style>
-#+STYLE: h1.title {
-#+STYLE:     font-family:'HelveticaNeue-Light','Helvetica Neue Light','Helvetica Neue',Arial,Helvetica,sans-serif;
-#+STYLE:     padding: 20px;
-#+STYLE:     font-size: 3em;
-#+STYLE:     line-height: 3em;
-#+STYLE:     font-weight: 300;
-#+STYLE:     border-bottom: solid 1px black;
-#+STYLE: }
-#+STYLE:  
-#+STYLE: h2 {
-#+STYLE:     font-family:'HelveticaNeue-Light','Helvetica Neue Light','Helvetica Neue',Arial,Helvetica,sans-serif;
-#+STYLE:     font-size: 1.8em;
-#+STYLE:     line-height: 1.8em;
-#+STYLE:     font-weight:300;
-#+STYLE:     padding: 20px 0px 0px 0px;
-#+STYLE: }
-#+STYLE:  
-#+STYLE: h3 {
-#+STYLE:     font-family:'HelveticaNeue-Light','Helvetica Neue Light','Helvetica Neue',Arial,Helvetica,sans-serif;
-#+STYLE:     font-size:1.4em;
-#+STYLE:     line-height: 1.4em;
-#+STYLE:     font-weight:300;
-#+STYLE:     color: #306990
-#+STYLE: }
-#+STYLE:  
-#+STYLE: body {
-#+STYLE:     font-family: "Helvetica", San-Serif;
-#+STYLE:     font-size: 90%;
-#+STYLE:     line-height: 160%;
-#+STYLE:     margin: 50px;
-#+STYLE: }
-#+STYLE:  
-#+STYLE: li { padding: 3px 0px 3px 0px; }
-#+STYLE:  
-#+STYLE: a { text-decoration: none; }
-#+STYLE: a:hover { text-decoration: underline; }
-#+STYLE: </style>

File apps/merge_index/docs/pdf-style.org

-#+LANGUAGE:     en
-#+LATEX_HEADER: \usepackage{color}
-#+LATEX_HEADER: \usepackage{sectsty}
-#+LATEX_HEADER: \usepackage{listings}
-#+LATEX_HEADER: \usepackage[T1]{fontenc}
-#+LATEX_HEADER: \usepackage{cmbright}
-#+LATEX_HEADER: \usepackage[left=1in,top=1in,right=1in,bottom=1in, nohead]{geometry}
-#+LATEX_HEADER: \usepackage{hyperref}
-#+LATEX_HEADER: \setlength\parindent{0in}
-#+LATEX_HEADER: \setlength\parskip{0.1in}
-#+LATEX_HEADER:\sectionfont{\pagebreak\fontfamily{iwona}\Huge\selectfont}
-#+LATEX_HEADER:\subsectionfont{\fontfamily{iwona}\color[rgb]{0.18,0.41,0.56}\selectfont}
-#+LATEX_HEADER:\hypersetup{pdfborder={0 0 0 0}, colorlinks=true, linkcolor=[rgb]{0.80,0.44,0.02},urlcolor=[rgb]{0.80,0.44,0.02}}
-#+STYLE: <link rel="stylesheet" type="text/css" href="stylesheet.css" />
-#+OPTIONS:      H:3 toc:2 num:t

File apps/merge_index/extra/bashobench.config

-{mode, max}.
-
-{duration, 10}.
-
-{concurrent, 5}.
-
-{report_interval, 5}.
-
-{driver, basho_bench_driver_merge_index}.
-
-{key_generator, {uniform_int_bin, 500000}}.
-
-{value_generator, {fixed_bin, 250}}.
-
-{operations, [{index, 50}]}.
-%{operations, [{info, 1}, {stream, 1}]}.
-
-{merge_index_rollover_size, 52428800}.
-{merge_index_sync_interval, 2000}.
-
-{code_paths, ["deps/stats",
-              "deps/ibrowse",
-             "ebin"]}.

File apps/merge_index/include/basho_bench.hrl

-
-
--define(FAIL_MSG(Str, Args), ?ERROR(Str, Args), halt(1)).
-
--define(CONSOLE(Str, Args), basho_bench_log:log(console, Str, Args)).
-
--define(DEBUG(Str, Args), basho_bench_log:log(debug, Str, Args)).
--define(INFO(Str, Args), basho_bench_log:log(info, Str, Args)).
--define(WARN(Str, Args), basho_bench_log:log(warn, Str, Args)).
--define(ERROR(Str, Args), basho_bench_log:log(error, Str, Args)).
-
--define(FMT(Str, Args), lists:flatten(io_lib:format(Str, Args))).
-

File apps/merge_index/include/merge_index.hrl

--define(PRINT(Var), error_logger:info_msg("DEBUG: ~p:~p~n~p~n  ~p~n", [?MODULE, ?LINE, ??Var, Var])).
--define(TIMEON, erlang:put(debug_timer, [now()|case erlang:get(debug_timer) == undefined of true -> []; false -> erlang:get(debug_timer) end])).
--define(TIMEOFF(Var, Count), io:format("~s :: ~p @ ~10.2fms (~10.2f total) : ~p : ~p~n", [string:copies(" ", length(erlang:get(debug_timer))), Count, (timer:now_diff(now(), hd(erlang:get(debug_timer)))/1000/(Count+1)),(timer:now_diff(now(), hd(erlang:get(debug_timer)))/1000), ??Var, Var]), erlang:put(debug_timer, tl(erlang:get(debug_timer)))).
-
--record(segment,{root,
-                 offsets_table,
-                 size}).
-
-

File apps/merge_index/rebar.config

-{cover_enabled, true}.

File apps/merge_index/src/basho_bench_driver_merge_index.erl

-%% -------------------------------------------------------------------
-%%
-%% mi: Merge-Index Data Store
-%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% -------------------------------------------------------------------
--module(basho_bench_driver_merge_index).
-
--export([new/1,
-         run/4]).
-
--include("basho_bench.hrl").
-
--record(state, { pid }).
--define(INDEX, <<"index">>).
--define(FIELD, <<"field">>).
--define(PRINT(Var), error_logger:info_msg("DEBUG: ~p:~p~n~p~n  ~p~n", [?MODULE, ?LINE, ??Var, Var])).
-
-%% ====================================================================
-%% API
-%% ====================================================================
-
-new(Id) ->
-    %% Get reference to local merge_index.
-    Root = "../data_" ++ integer_to_list(Id),
-    SyncInterval = basho_bench_config:get(merge_index_sync_interval, 30 * 1000),
-    RolloverSize = basho_bench_config:get(merge_index_rollover_size, 50 * 1024 * 1024),
-    Options = [{merge_index_sync_interval, SyncInterval}, {merge_index_rollover_size, RolloverSize}],
-    {ok, Pid} = merge_index:start_link(Root, Options),
-    {ok, #state { pid=Pid }}.
-
-now_to_timestamp({Mega, Sec, Micro}) ->
-    <<TS:64/integer>> = <<Mega:16/integer, Sec:24/integer, Micro:24/integer>>,
-    TS.
-
-run(index, KeyGen, ValueGen, State) ->
-    #state { pid=Pid } = State,
-    TS = now_to_timestamp(now()),
-    merge_index:index(Pid, ?INDEX, ?FIELD, KeyGen(), ValueGen(), [], TS),
-    {ok, State};
-
-run(info, KeyGen, _ValueGen, State) ->
-    #state { pid=Pid } = State,
-    merge_index:info(Pid, ?INDEX, ?FIELD, KeyGen()),
-    {ok, State};
-
-run(stream, KeyGen, _ValueGen, State) ->
-    #state { pid=Pid } = State,
-    Ref = make_ref(),
-    F = fun(_X, _Y) -> true end,
-    merge_index:stream(Pid, ?INDEX, ?FIELD, KeyGen(), self(), Ref, F),
-    collect_stream(Ref, 0, undefined),
-    {ok, State}.
-
-collect_stream(Ref, Count, LastKey) ->
-    receive 
-        {result, '$end_of_table', Ref} ->
-            ok;
-        {result, {Key, _Props}, Ref} when (LastKey == undefined orelse LastKey =< Key) ->
-            collect_stream(Ref, Count + 1, Key);
-        {result, {Key, _Props}, Ref} ->
-            throw({key_out_of_order, Key})
-    end.

File apps/merge_index/src/merge_index.app.src

-{application, merge_index,
-    [{description, "Merge-Index Data Store"},
-     {vsn, "0.14.0"},
-     {applications, [kernel,
-                     stdlib,
-                     sasl]},
-     {mod, {mi_app, []}},
-     {env, [
-            {data_root, "data/merge_index"},
-            {buffer_rollover_size, 1048576},
-            {buffer_delayed_write_size, 524288},
-            {buffer_delayed_write_ms, 2000},
-            {max_compact_segments, 20},
-            {segment_query_read_ahead_size, 65536},
-            {segment_compact_read_ahead_size, 5242880},
-            {segment_file_buffer_size, 20971520},
-            {segment_delayed_write_size, 20971520},
-            {segment_delayed_write_ms, 10000},
-            {segment_full_read_size, 5242880},
-            {segment_block_size, 32767},
-            {segment_values_staging_size, 1000},
-            {segment_values_compression_threshold, 0},
-            {segment_values_compression_level, 1}
-           ]}
-]}.

File apps/merge_index/src/merge_index.erl

-%% -------------------------------------------------------------------
-%%
-%% mi: Merge-Index Data Store
-%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% -------------------------------------------------------------------
--module(merge_index).
--author("Rusty Klophaus <rusty@basho.com>").
--include("merge_index.hrl").
--include_lib("kernel/include/file.hrl").
-
--export([
-    %% API
-    start_link/1,
-    stop/1,
-    index/7, index/2, 
-    stream/7,
-    range/9,
-    info/4,
-    is_empty/1,
-    fold/3,
-    drop/1,
-    compact/1
-]).
-
-start_link(Root) ->
-    gen_server:start_link(mi_server, [Root], [{timeout, infinity}]).
-
-stop(_ServerPid) ->
-    ok.
-
-index(ServerPid, Index, Field, Term, Value, Props, Timestamp) ->
-    index(ServerPid, [{Index, Field, Term, Value, Props, Timestamp}]).
-
-index(ServerPid, Postings) ->
-    gen_server:call(ServerPid, {index, Postings}, infinity).
-
-info(ServerPid, Index, Field, Term) ->
-    gen_server:call(ServerPid, {info, Index, Field, Term}, infinity).
-
-stream(ServerPid, Index, Field, Term, Pid, Ref, FilterFun) ->
-    gen_server:call(ServerPid, 
-        {stream, Index, Field, Term, Pid, Ref, FilterFun}, infinity).
-
-range(ServerPid, Index, Field, StartTerm, EndTerm, Size, Pid, Ref, FilterFun) ->
-    gen_server:call(ServerPid, 
-        {range, Index, Field, StartTerm, EndTerm, Size, Pid, Ref, FilterFun}, infinity).
-
-is_empty(ServerPid) ->
-    gen_server:call(ServerPid, is_empty, infinity).
-
-fold(ServerPid, Fun, Acc) ->
-    gen_server:call(ServerPid, {fold, Fun, Acc}, infinity).
-
-drop(ServerPid) ->
-    gen_server:call(ServerPid, drop, infinity).
-
-compact(ServerPid) ->
-    gen_server:call(ServerPid, start_compaction, infinity).
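
A hypothetical exercise of the API above (the root path and posting
values are illustrative; assumes mi_server, which is not shown in
this diff, replies to these calls as the wrappers expect):

    {ok, Pid} = merge_index:start_link("/tmp/mi_example"),
    %% Post one {Index, Field, Term, Value, Props, Timestamp} entry.
    _ = merge_index:index(Pid, <<"idx">>, <<"field">>, <<"term">>,
                          <<"value">>, [], 1),
    _Count = merge_index:info(Pid, <<"idx">>, <<"field">>, <<"term">>),
    Ref = make_ref(),
    %% Stream results back to self(); see collect_stream/3 in the
    %% basho_bench driver above for the receive side:
    %% {result, {Value, Props}, Ref} ... {result, '$end_of_table', Ref}.
    merge_index:stream(Pid, <<"idx">>, <<"field">>, <<"term">>,
                       self(), Ref, fun(_, _) -> true end).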

File apps/merge_index/src/mi_app.erl

-%% -------------------------------------------------------------------
-%%
-%% mi: Merge-Index Data Store
-%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% -------------------------------------------------------------------
--module(mi_app).
-
--behaviour(application).
-
-%% Application callbacks
--export([start/2, stop/1]).
-
-
-%% ===================================================================
-%% Application callbacks
-%% ===================================================================
-
-start(_StartType, _StartArgs) ->
-    mi_sup:start_link().
-
-stop(_State) ->
-    ok.

File apps/merge_index/src/mi_bloom.erl

-%% @doc Implementation of the Bloom filter data structure.
-%% @reference [http://en.wikipedia.org/wiki/Bloom_filter]
-
-%% Adapted from http://code.google.com/p/bloomerl for use in
-%% merge_index, where we are worried about speed of creating the bloom
-%% filter and testing membership as well as size of the bloom
-%% filter. By hard coding some parameters, we reduce the size. Also,
-%% by calculating the bloom filter in batches, we improve the
-%% performance.
-
--module(mi_bloom).
--export([new/1, is_element/2]).
--include("merge_index.hrl").
-
--ifdef(TEST).
--ifdef(EQC).
--include_lib("eqc/include/eqc.hrl").
--endif.
--include_lib("eunit/include/eunit.hrl").
--endif.
-
-%% These settings give us a max 256 keys with 0.05 error rate.
--define(M, 1600).
--define(K, 4).
-
-%% @doc Generate a new bloom filter containing the specified keys.
-new(Keys) ->
-    OnBits = lists:usort(lists:flatten([calc_idxs(X) || X <- Keys])),
-    list_to_bitstring(generate_bits(0, OnBits)).
-
-generate_bits(Pos, [NextOnPos|T]) ->
-    Gap = NextOnPos - Pos - 1,
-    case Gap > 0 of
-        true ->
-            Bits = <<0:Gap/integer, 1:1/integer>>,
-            [Bits|generate_bits(Pos + Gap + 1, T)];
-        false ->
-            Bits = <<1:1/integer>>,
-            [Bits|generate_bits(Pos + 1, T)]
-    end;
-generate_bits(Pos, []) ->
-    Gap = ?M - Pos,
-    [<<0:Gap/integer>>].
-
-%% @spec is_element(string(), bloom()) -> bool()
-%% @doc Determines if the key is (probably) an element of the filter.
-is_element(Key, Bitmap) -> 
-    is_element(Key, Bitmap, calc_idxs(Key)).
-is_element(Key, Bitmap, [Idx | T]) ->
-    %% If we are looking for the first bit, do slightly different math
-    %% than if we are looking for later bits.
-    case Idx > 0 of
-        true ->
-            PreSize = Idx - 1,
-            <<_:PreSize/bits, Bit:1/bits, _/bits>> = Bitmap;
-        false ->
-            <<Bit:1/bits, _/bits>> = Bitmap
-    end,
-
-    %% Check if the bit is on.
-    case Bit of
-        <<1:1>> -> is_element(Key, Bitmap, T);
-        <<0:1>> -> false
-    end;
-is_element(_, _, []) -> 
-    true.
-
-% This uses the "enhanced double hashing" algorithm.
-% Todo: handle case of m > 2^32.
-calc_idxs(Key) ->
-    X = erlang:phash2(Key, ?M),
-    Y = erlang:phash2({"salt", Key}, ?M),
-    calc_idxs(?K - 1, X, Y, [X]).
-calc_idxs(0, _, _, Acc) -> 
-    Acc;
-calc_idxs(I, X, Y, Acc) ->
-    Xi = (X+Y) rem ?M,
-    Yi = (Y+I) rem ?M,
-    calc_idxs(I-1, Xi, Yi, [Xi | Acc]).
-
-%% UNIT TESTS
-
--ifdef(TEST).
-
--ifdef(EQC).
-
-prop_bloom_test_() ->
-    {timeout, 60, fun() -> ?assert(eqc:quickcheck(prop_bloom())) end}.
-
-g_keys() ->
-    non_empty(list(non_empty(binary()))).
-
-prop_bloom() ->
-    ?FORALL(Keys, g_keys(),
-            begin
-                Bloom = ?MODULE:new(Keys),
-                F = fun(X) -> is_element(X, Bloom) end,
-                lists:all(F, Keys)
-            end).
-
--endif.
-
--endif.
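
As a check on the hard-coded parameters: with ?M = 1600 bits, ?K = 4
hashes, and the stated capacity of N = 256 keys, the expected
false-positive rate is p = (1 - e^(-K*N/M))^K =
(1 - e^(-4*256/1600))^4 ≈ 0.47^4 ≈ 0.05, matching the comment above.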

File apps/merge_index/src/mi_buffer.erl

-%% -------------------------------------------------------------------
-%%
-%% mi: Merge-Index Data Store
-%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% -------------------------------------------------------------------
--module(mi_buffer).
--author("Rusty Klophaus <rusty@basho.com>").
--include("merge_index.hrl").
--export([
-    new/1,
-    filename/1,
-    close_filehandle/1,
-    delete/1,
-    filesize/1,
-    size/1,
-    write/7, write/2,
-    info/4,
-    iterator/1, iterator/4, iterators/6
-]).
-
--ifdef(TEST).
--ifdef(EQC).
--include_lib("eqc/include/eqc.hrl").
--endif.
--include_lib("eunit/include/eunit.hrl").
--endif.
-
-
--record(buffer, {
-    filename,
-    handle,
-    table,
-    size
-}).
-
-%%% Creates a disk-based append-mode buffer file with support for a
-%%% sorted iterator.
-
-%% Open a new buffer. Returns a buffer structure.
-new(Filename) ->
-    %% Open the existing buffer file...
-    filelib:ensure_dir(Filename),
-    {ok, DelayedWriteSize} = application:get_env(merge_index, buffer_delayed_write_size),
-    {ok, DelayedWriteMS} = application:get_env(merge_index, buffer_delayed_write_ms),
-    FuzzedWriteSize = trunc(mi_utils:fuzz(DelayedWriteSize, 0.1)),
-    FuzzedWriteMS = trunc(mi_utils:fuzz(DelayedWriteMS, 0.1)),
-    {ok, FH} = file:open(Filename, [read, write, raw, binary, {delayed_write, FuzzedWriteSize, FuzzedWriteMS}]),
-
-    %% Read into an ets table...
-    Table = ets:new(buffer, [duplicate_bag, public]),
-    open_inner(FH, Table),
-    {ok, Size} = file:position(FH, cur),
-
-    %% Return the buffer.
-    #buffer { filename=Filename, handle=FH, table=Table, size=Size }.
-
-open_inner(FH, Table) ->
-    case read_value(FH) of
-        {ok, Postings} ->
-            write_to_ets(Table, Postings),
-            open_inner(FH, Table);
-        eof ->
-            ok
-    end.
-
-filename(Buffer) ->
-    Buffer#buffer.filename.
-
-delete(Buffer) ->
-    ets:delete(Buffer#buffer.table),
-    close_filehandle(Buffer),
-    file:delete(Buffer#buffer.filename),
-    file:delete(Buffer#buffer.filename ++ ".deleted"),
-    ok.
-
-close_filehandle(Buffer) ->
-    file:close(Buffer#buffer.handle).
-
-%% Return the current size of the buffer file.
-filesize(Buffer) ->
-    Buffer#buffer.size.
-
-size(Buffer) ->
-    ets:info(Buffer#buffer.table, size).
-
-%% Write the value to the buffer.
-%% Returns the new buffer structure.
-write(Index, Field, Term, Value, Props, TS, Buffer) ->
-    write([{Index, Field, Term, Value, Props, TS}], Buffer).
-
-write(Postings, Buffer) ->
-    %% Write to file...
-    FH = Buffer#buffer.handle,
-    BytesWritten = write_to_file(FH, Postings),
-
-    %% Return a new buffer with a new tree and size...
-    write_to_ets(Buffer#buffer.table, Postings),
-
-    %% Return the new buffer.
-    Buffer#buffer {
-        size = (BytesWritten + Buffer#buffer.size)
-    }.
-
-%% Return the number of results under this IFT.
-info(Index, Field, Term, Buffer) ->
-    Table = Buffer#buffer.table,
-    Key = {Index, Field, Term},
-    length(ets:lookup(Table, Key)).
-
-%% Return an iterator that traverses the entire buffer.
-iterator(Buffer) ->
-    Table = Buffer#buffer.table,
-    List1 = lists:sort(ets:tab2list(Table)),
-    List2 = [{I,F,T,V,K,P} || {{I,F,T},V,K,P} <- List1],
-    fun() -> iterate_list(List2) end.
-    
-%% Return an iterator that traverses the values for a term in the buffer.
-iterator(Index, Field, Term, Buffer) ->
-    Table = Buffer#buffer.table,
-    List1 = ets:lookup(Table, {Index, Field, Term}),
-    List2 = [{V,K,P} || {_Key,V,K,P} <- List1],
-    List3 = lists:sort(List2),
-    fun() -> iterate_list(List3) end.
-
-%% Return a list of iterators over a range.
-iterators(Index, Field, StartTerm, EndTerm, Size, Buffer) ->
-    Table = Buffer#buffer.table,
-    Keys = mi_utils:ets_keys(Table),
-    Filter = fun(Key) ->
-                     Key >= {Index, Field, StartTerm} 
-                         andalso 
-                         Key =< {Index, Field, EndTerm}
-                         andalso
-                         (Size == all orelse erlang:size(element(3, Key)) == Size)
-        end,
-    MatchingKeys = lists:filter(Filter, Keys),
-    [iterator(I,F,T, Buffer) || {I,F,T} <- MatchingKeys].
-
-%% Turn a list into an iterator.
-iterate_list([]) ->
-    eof;
-iterate_list([H|T]) ->
-    {H, fun() -> iterate_list(T) end}.
-
-
-%% ===================================================================
-%% Internal functions
-%% ===================================================================
-
-read_value(FH) ->
-    case file:read(FH, 4) of
-        {ok, <<Size:32/unsigned-integer>>} ->
-            {ok, B} = file:read(FH, Size),
-            {ok, binary_to_term(B)};
-        eof ->
-            eof
-    end.
-
-write_to_file(FH, Terms) when is_list(Terms) ->
-    %% Convert all values to binaries, count the bytes.
-    B = term_to_binary(Terms),
-    Size = erlang:size(B),
-    Bytes = <<Size:32/unsigned-integer, B/binary>>,
-    file:write(FH, Bytes),
-    Size + 4. %% include the 4-byte length header
-
-write_to_ets(Table, Postings) ->
-    ets:insert(Table, Postings).
-
-%% %% ===================================================================
-%% %% EUnit tests
-%% %% ===================================================================
-%% -ifdef(TEST).
-
-%% -ifdef(EQC).
-
-%% -define(QC_OUT(P),
-%%         eqc:on_output(fun(Str, Args) -> io:format(user, Str, Args) end, P)).
-
-%% -define(POW_2(N), trunc(math:pow(2, N))).
-
-%% -define(FMT(Str, Args), lists:flatten(io_lib:format(Str, Args))).
-
-%% g_iftv() ->
-%%     non_empty(binary()).
-
-%% g_props() ->
-%%     list({oneof([word_pos, offset]), choose(0, ?POW_2(31))}).
-
-%% g_tstamp() ->
-%%     choose(0, ?POW_2(31)).
-
-%% %% g_ift_range(IFTs) ->
-%% %%     ?SUCHTHAT({Start, End}, {oneof(IFTs), oneof(IFTs)}, End >= Start).
-
-%% fold_iterator(Itr, Fn, Acc0) ->
-%%     fold_iterator_inner(Itr(), Fn, Acc0).
-
-%% fold_iterator_inner(eof, _Fn, Acc) ->
-%%     lists:reverse(Acc);
-%% fold_iterator_inner({Term, NextItr}, Fn, Acc0) ->
-%%     Acc = Fn(Term, Acc0),
-%%     fold_iterator_inner(NextItr(), Fn, Acc).
-
-
-%% prop_basic_test(Root) ->
-%%     ?FORALL(Entries, list({g_iftv(), g_iftv(), g_iftv(), g_iftv(), g_props(), g_tstamp()}),
-%%             begin
-%%                 %% Delete old files
-%%                 [file:delete(X) || X <- filelib:wildcard(filename:dirname(Root) ++ "/*")],
-
-%%                 %% Create a buffer
-%%                 Buffer = mi_buffer:write(Entries, mi_buffer:new(Root ++ "_buffer")),
-
-%%                 %% Filter the generated entries such that each {IFT, Value} is only present
-%%                 %% once and has the latest timestamp for that key
-%%                 F = fun({Index, Field, Term, Value, Props, Tstamp}, Acc) ->
-%%                             Key = {Index, Field, Term, Value},
-%%                             case orddict:find(Key, Acc) of
-%%                                 {ok, {_, ExistingTstamp}} when Tstamp >= ExistingTstamp ->
-%%                                     orddict:store({Key, {Props, Tstamp}}, Acc);
-%%                                 error ->
-%%                                     orddict:store({Key, {Props, Tstamp}}, Acc);
-%%                                 _ ->
-%%                                     Acc
-%%                             end
-%%                     end,
-%%                 ExpectedEntries = [{Index, Field, Term, Value, Props, Tstamp} ||
-%%                                       {{Index, Field, Term, Value}, {Props, Tstamp}}
-%%                                           <- lists:foldl(F, [], Entries)],
-
-%%                 %% Build a list of what was stored in the buffer
-%%                 ActualEntries = fold_iterator(mi_buffer:iterator(Buffer),
-%%                                               fun(Item, Acc0) -> [Item | Acc0] end, []),
-%%                 ?assertEqual(ExpectedEntries, ActualEntries),
-%%                 true
-%%             end).
-
-%% %% prop_iter_range_test(Root) ->
-%% %%     ?LET(IFTs, non_empty(list(g_iftv())),
-%% %%          ?FORALL({Entries, Range}, {list({oneof(IFTs), g_value(), g_props(), g_tstamp()}), g_ift_range(IFTs)},
-%% %%             begin
-%% %%                 %% Delete old files
-%% %%                 [file:delete(X) || X <- filelib:wildcard(filename:dirname(Root) ++ "/*")],
-
-%% %%                 %% Create a buffer
-%% %%                 Buffer = make_buffer(Entries, mi_buffer:new(Root ++ "_buffer")),
-
-%% %%                 %% Identify those values in the buffer that are in the generated range
-%% %%                 {Start, End} = Range,
-%% %%                 RangeEntries = fold_iterator(iterator(Start, End, Buffer),
-%% %%                                              fun(Item, Acc0) -> [Item | Acc0] end, []),
-
-%% %%                 %% Verify that all IFTs within the actual entries satisfy the constraint
-%% %%                 ?assertEqual([], [IFT || {IFT, _, _, _} <- RangeEntries,
-%% %%                                          IFT < Start, IFT > End]),
-
-%% %%                 %% Check that the count for the range matches the length of the returned
-%% %%                 %% range entries list
-%% %%                 ?assertEqual(length(RangeEntries), info(Start, End, Buffer)),
-%% %%                 true
-%% %%             end)).
-
-
-%% prop_basic_test_() ->
-%%     test_spec("/tmp/test/mi_buffer_basic", fun prop_basic_test/1).
-
-%% %% prop_iter_range_test_() ->
-%% %%     test_spec("/tmp/test/mi_buffer_iter", fun prop_iter_range_test/1).
-
-%% test_spec(Root, PropertyFn) ->
-%%     {timeout, 60, fun() ->      
-%%                           application:load(merge_index),
-%%                           os:cmd(?FMT("rm -rf ~s; mkdir -p ~s", [Root, Root])),
-%%                           ?assert(eqc:quickcheck(eqc:numtests(250, ?QC_OUT(PropertyFn(Root ++ "/t1")))))
-%%                   end}.
-
-
-
-%% -endif. %EQC
-%% -endif.
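
A minimal sketch of the buffer flow above. Since iterator/1 and
info/4 expect ets entries shaped as {{Index, Field, Term}, Value,
Timestamp, Props}, this sketch assumes postings are already in that
form (presumably the caller, mi_server, converts them; it is not
shown in this diff), and that the merge_index application environment
is loaded so the buffer_delayed_write_* settings resolve:

    _ = application:load(merge_index),
    Buffer0 = mi_buffer:new("/tmp/mi_example/buffer.1"),
    Posting = {{<<"idx">>, <<"field">>, <<"term">>}, <<"value">>, 1, []},
    Buffer1 = mi_buffer:write([Posting], Buffer0),
    1 = mi_buffer:info(<<"idx">>, <<"field">>, <<"term">>, Buffer1),
    Itr = mi_buffer:iterator(Buffer1),
    {{<<"idx">>, <<"field">>, <<"term">>, <<"value">>, 1, []}, _Next} = Itr(),
    ok = mi_buffer:delete(Buffer1).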

File apps/merge_index/src/mi_buffer_converter.erl

-%% -------------------------------------------------------------------
-%%
-%% mi: Merge-Index Data Store
-%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% -------------------------------------------------------------------
-%%
-%% This module is a supervisor and gen_server rolled into one.  The
-%% supervisor's sole purpose is to monitor the one gen_server it owns,
-%% and attempt a couple of restarts, or blow up if restarts happen too
-%% fast.
-%%
-%% The supervisor is started by mi_server, and the gen_server alerts
-%% mi_server to its presence.  mi_server talks directly to the worker
-%% using the convert/3 function.  mi_server is linked to the supervisor,
-%% so it gets an EXIT message only after the gen_server has reached its
-%% max restart limit.
--module(mi_buffer_converter).
-
-%-behaviour(supervisor). % not actually conflicting...
--behaviour(gen_server).
-
-%% API
--export([start_link/2, convert/3]).
-
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-         terminate/2, code_change/3]).
-
-%% private callbacks
--export([start_worker/2]).
-
--record(state, {mi_server, mi_root}).
-
-%%====================================================================
-%% API
-%%====================================================================
-%%--------------------------------------------------------------------
-%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
-%% Description: Starts the server
-%%--------------------------------------------------------------------
-start_link(MIServerPid, MIServerRoot) ->
-    supervisor:start_link(
-      ?MODULE, [supervisor, MIServerPid, MIServerRoot]).
-
-%% @private
-start_worker(MIServerPid, MIServerRoot) ->
-    gen_server:start_link(
-      ?MODULE, [gen_server, MIServerPid, MIServerRoot], []).
-
-convert(undefined, _Root, _Buffer) ->
-    %% mi_server tried to convert a buffer before
-    %% it had a registered converter: ignore
-    ok;
-convert(Converter, Root, Buffer) ->
-    gen_server:cast(Converter, {convert, Root, Buffer}).
-
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-%%--------------------------------------------------------------------
-%% Func: init(Args) -> {ok,  {SupFlags,  [ChildSpec]}} |
-%%                     ignore                          |
-%%                     {error, Reason}
-%% Description: Whenever a supervisor is started using 
-%% supervisor:start_link/[2,3], this function is called by the new process 
-%% to find out about restart strategy, maximum restart frequency and child 
-%% specifications.
-%%--------------------------------------------------------------------
-init([supervisor, MIServerPid, MIServerRoot]) ->
-    AChild = {buffer_converter_worker,
-              {mi_buffer_converter,start_worker,
-               [MIServerPid, MIServerRoot]},
-              permanent,2000,worker,[mi_buffer_converter]},
-    {ok,{{one_for_all,2,1}, [AChild]}};
-%%--------------------------------------------------------------------
-%% Function: init(Args) -> {ok, State} |
-%%                         {ok, State, Timeout} |
-%%                         ignore               |
-%%                         {stop, Reason}
-%% Description: Initiates the server
-%%--------------------------------------------------------------------
-init([gen_server, MIServerPid, MIServerRoot]) ->
-    mi_server:register_buffer_converter(MIServerPid, self()),
-    {ok, #state{mi_server=MIServerPid, mi_root=MIServerRoot}}.
-
-%%--------------------------------------------------------------------
-%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
-%%                                      {reply, Reply, State, Timeout} |
-%%                                      {noreply, State} |
-%%                                      {noreply, State, Timeout} |
-%%                                      {stop, Reason, Reply, State} |
-%%                                      {stop, Reason, State}
-%% Description: Handling call messages
-%%--------------------------------------------------------------------
-handle_call(_Request, _From, State) ->
-    Reply = ok,
-    {reply, Reply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_cast(Msg, State) -> {noreply, State} |
-%%                                      {noreply, State, Timeout} |
-%%                                      {stop, Reason, State}
-%% Description: Handling cast messages
-%%--------------------------------------------------------------------
-handle_cast({convert, Root, Buffer}, #state{mi_root=Root}=State) ->
-    %% Calculate the segment filename, open the segment, and convert.
-    SNum  = mi_server:get_id_number(mi_buffer:filename(Buffer)),
-    SName = filename:join(Root, "segment." ++ integer_to_list(SNum)),
-
-    case mi_server:has_deleteme_flag(SName) of
-        true ->
-            %% remove files from a previously-failed conversion
-            file:delete(mi_segment:data_file(SName)),
-            file:delete(mi_segment:offsets_file(SName));
-        false ->
-            mi_server:set_deleteme_flag(SName)
-    end,
-    SegmentWO = mi_segment:open_write(SName),
-    mi_segment:from_buffer(Buffer, SegmentWO),
-    mi_server:buffer_to_segment(State#state.mi_server, Buffer, SegmentWO),
-    {noreply, State};
-handle_cast(_Msg, State) ->
-    {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_info(Info, State) -> {noreply, State} |
-%%                                       {noreply, State, Timeout} |
-%%                                       {stop, Reason, State}
-%% Description: Handling all non call/cast messages
-%%--------------------------------------------------------------------
-handle_info(_Info, State) ->
-    {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: terminate(Reason, State) -> void()
-%% Description: This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any necessary
-%% cleaning up. When it returns, the gen_server terminates with Reason.
-%% The return value is ignored.
-%%--------------------------------------------------------------------
-terminate(_Reason, _State) ->
-    ok.
-
-%%--------------------------------------------------------------------
-%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
-%% Description: Convert process state when code is changed
-%%--------------------------------------------------------------------
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-%%--------------------------------------------------------------------
-%%% Internal functions
-%%--------------------------------------------------------------------

File apps/merge_index/src/mi_locks.erl

-%% -------------------------------------------------------------------
-%%
-%% mi: Merge-Index Data Store
-%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% -------------------------------------------------------------------
--module(mi_locks).
--include("merge_index.hrl").
--author("Rusty Klophaus <rusty@basho.com>").
--export([
-    new/0,
-    claim/2,
-    release/2,
-    when_free/3
-]).
-
--record(lock, {
-    key,
-    count,
-    funs=[]
-}).
-
-new() -> [].
-
-claim(Key, Locks) ->
-    case lists:keyfind(Key, #lock.key, Locks) of
-        Lock = #lock { count=Count } ->
-            NewLock = Lock#lock { count=Count + 1 },
-            lists:keystore(Key, #lock.key, Locks, NewLock);
-
-        false ->
-            NewLock = #lock { key=Key, count=1, funs=[] },
-            lists:keystore(Key, #lock.key, Locks, NewLock)
-    end.
-
-release(Key, Locks) ->
-    case lists:keyfind(Key, #lock.key, Locks) of
-        #lock { count=1, funs=Funs } ->
-            [X() || X <- Funs],
-            lists:keydelete(Key, #lock.key, Locks);
-
-        Lock = #lock { count=Count } ->
-            NewLock = Lock#lock { count = Count - 1 },
-            lists:keystore(Key, #lock.key, Locks, NewLock);
-
-        false ->
-            throw({lock_does_not_exist, Key})
-    end.
-
-%% Run the provided function when the key is free. If the key is
-%% currently free, then this is run immediately.
-when_free(Key, Fun, Locks) ->
-    case lists:keyfind(Key, #lock.key, Locks) of
-        false ->
-            Fun(),
-            Locks;
-
-        #lock { count=0, funs=Funs } ->
-            [X() || X <- [Fun|Funs]],
-            lists:keydelete(Key, #lock.key, Locks);
-            
-        Lock = #lock { funs=Funs} ->
-            NewLock = Lock#lock { funs=[Fun|Funs] },
-            lists:keystore(Key, #lock.key, Locks, NewLock)
-    end.
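
A minimal sketch of the reference-counted lock flow above; the fun
queued by when_free/3 runs once the final claim is released:

    L0 = mi_locks:new(),
    L1 = mi_locks:claim(segment_1, L0),
    L2 = mi_locks:claim(segment_1, L1),    %% count is now 2
    L3 = mi_locks:when_free(segment_1,
                            fun() -> io:format("segment_1 free~n") end,
                            L2),
    L4 = mi_locks:release(segment_1, L3),  %% count drops to 1
    _L5 = mi_locks:release(segment_1, L4). %% count hits 0; the fun runs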

File apps/merge_index/src/mi_scheduler.erl

-%% -------------------------------------------------------------------
-%%
-%% mi: Merge-Index Data Store
-%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% -------------------------------------------------------------------
--module(mi_scheduler).
-
-%% API
--export([
-    start_link/0,
-    start/0,
-    schedule_compaction/1
-]).
-%% Private export
--export([worker_loop/1]).
-
--include("merge_index.hrl").
-
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-         terminate/2, code_change/3]).
-
--record(state, { queue,
-                 worker }).
-
-%% ====================================================================
-%% API
-%% ====================================================================
-
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-start() ->
-    gen_server:start({local, ?MODULE}, ?MODULE, [], []).
-
-schedule_compaction(Pid) ->
-    gen_server:call(?MODULE, {schedule_compaction, Pid}, infinity).
-    
-
-%% ====================================================================
-%% gen_server callbacks
-%% ====================================================================
-
-init([]) ->
-    %% Trap exits of the actual worker process
-    process_flag(trap_exit, true),
-
-    %% Use a dedicated worker sub-process to do the actual merging. The
-    %% process may ignore messages for a long while during the compaction
-    %% and we want to ensure that our message queue doesn't fill up with
-    %% a bunch of dup requests for the same directory.
-    Self = self(),
-    WorkerPid = spawn_link(fun() -> worker_loop(Self) end),
-    {ok, #state{ queue = queue:new(),
-                 worker = WorkerPid }}.
-
-handle_call({schedule_compaction, Pid}, _From, #state { queue = Q } = State) ->
-    case queue:member(Pid, Q) of
-        true ->
-            {reply, already_queued, State};
-        false ->
-            NewState = State#state { queue = queue:in(Pid, Q) },
-            {reply, ok, NewState}
-    end;
-
-handle_call(Event, _From, State) ->
-    ?PRINT({unhandled_call, Event}),
-    {reply, ok, State}.
-
-handle_cast(Msg, State) ->
-    ?PRINT({unhandled_cast, Msg}),
-    {noreply, State}.
-
-handle_info({worker_ready, WorkerPid}, #state { queue = Q } = State) ->
-    case queue:out(Q) of
-        {empty, Q} ->
-            {noreply, State};
-        {{value, Pid}, NewQ} ->
-            WorkerPid ! {compaction, Pid},
-            NewState = State#state { queue=NewQ },
-            {noreply, NewState}
-    end;
-handle_info({'EXIT', WorkerPid, Reason}, #state { worker = WorkerPid } = State) ->
-    error_logger:error_msg("Compaction worker ~p exited: ~p\n", [WorkerPid, Reason]),
-    %% Start a new worker.
-    Self=self(),
-    NewWorkerPid = spawn_link(fun() -> worker_loop(Self) end),
-    NewState = State#state { worker=NewWorkerPid },
-    {noreply, NewState};
-
-handle_info(Info, State) ->
-    ?PRINT({unhandled_info, Info}),
-    {noreply, State}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-%% ====================================================================
-%% Internal worker
-%% ====================================================================
-
-worker_loop(Parent) ->
-    Parent ! {worker_ready, self()},
-    receive
-        {compaction, Pid} ->
-            Start = now(),
-            Result = merge_index:compact(Pid),
-            ElapsedSecs = timer:now_diff(now(), Start) / 1000000,
-            case Result of
-                {ok, OldSegments, OldBytes} ->
-                    case ElapsedSecs > 1 of
-                        true ->
-                            error_logger:info_msg(
-                              "Pid ~p compacted ~p segments for ~p bytes in ~p seconds, ~.2f MB/sec\n",
-                              [Pid, OldSegments, OldBytes, ElapsedSecs, OldBytes/ElapsedSecs/(1024*1024)]);
-                        false ->
-                            ok
-                    end;
-                
-                {Error, Reason} when Error == error; Error == 'EXIT' ->
-                    error_logger:error_msg("Failed to compact ~p: ~p\n",
-                                           [Pid, Reason])
-            end,
-            ?MODULE:worker_loop(Parent);
-        _ ->
-            %% ignore unknown messages
-            ?MODULE:worker_loop(Parent)
-    after 1000 ->
-            ?MODULE:worker_loop(Parent)
-    end.
-
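
mi_scheduler registers itself locally, so compactions are requested
by handing it the pid of a running merge_index instance (paths are
illustrative):

    {ok, _Sched} = mi_scheduler:start_link(),
    {ok, Pid} = merge_index:start_link("/tmp/mi_example"),
    ok = mi_scheduler:schedule_compaction(Pid).
    %% The worker loop then calls merge_index:compact(Pid), logging
    %% timing for any compaction that takes longer than a second.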

File apps/merge_index/src/mi_segment.erl

-%% -------------------------------------------------------------------
-%%
-%% mi: Merge-Index Data Store
-%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% -------------------------------------------------------------------
--module(mi_segment).
--author("Rusty Klophaus <rusty@basho.com>").
--export([
-    exists/1,
-    open_read/1,
-    open_write/1,
-    filename/1,
-    filesize/1,
-    delete/1,
-    data_file/1,
-    offsets_file/1,
-    from_buffer/2,
-    from_iterator/2,
-    info/4,
-    iterator/1,
-    iterator/4,
-    iterators/6,
-
-    %% Used by QC tests, this is here to make compiler happy.
-    fold_iterator/3
-]).
-
--include("merge_index.hrl").
-
--include_lib("kernel/include/file.hrl").
--define(BLOCK_SIZE, 65536).
--define(BLOOM_CAPACITY, 512).
--define(BLOOM_ERROR, 0.01).
-
--ifdef(TEST).
--ifdef(EQC).
--include_lib("eqc/include/eqc.hrl").
--endif.
--include_lib("eunit/include/eunit.hrl").
--endif.
-
-
-exists(Root) ->
-    filelib:is_file(data_file(Root)).
-
-%% Create and return a new segment structure.
-open_read(Root) ->
-    %% The data file must already exist...
-    DataFileExists = filelib:is_file(data_file(Root)),
-    case DataFileExists of
-        true  ->
-            %% Get the fileinfo...
-            {ok, FileInfo} = file:read_file_info(data_file(Root)),
-
-            OffsetsTable = read_offsets(Root),
-            #segment {
-                       root=Root,
-                       offsets_table=OffsetsTable,
-                       size = FileInfo#file_info.size
-                     };
-        false ->
-            throw({?MODULE, missing_file, Root})
-    end.
-
-open_write(Root) ->
-    %% Fail if the segment already exists, otherwise create it...
-    DataFileExists = filelib:is_file(data_file(Root)),
-    OffsetsFileExists = filelib:is_file(offsets_file(Root)),
-    case DataFileExists orelse OffsetsFileExists of
-        true  ->
-            throw({?MODULE, segment_already_exists, Root});
-        false ->
-            %% TODO: Do we really need to go through the trouble of writing empty files here?
-            file:write_file(data_file(Root), <<"">>),
-            file:write_file(offsets_file(Root), <<"">>),
-            #segment {
-                       root = Root,     
-                       offsets_table = ets:new(segment_offsets, [ordered_set, public])
-                     }
-    end.
-
-filename(Segment) ->
-    Segment#segment.root.
-
-filesize(Segment) ->
-    Segment#segment.size.
-
-delete(Segment) ->
-    [ok = file:delete(X) || X <- filelib:wildcard(Segment#segment.root ++ ".*")],
-    ets:delete(Segment#segment.offsets_table),
-    ok.
-
-%% Create a segment from a Buffer (see mi_buffer.erl)
-from_buffer(Buffer, Segment) ->
-    %% Open the iterator...
-    Iterator = mi_buffer:iterator(Buffer),
-    mi_segment_writer:from_iterator(Iterator, Segment).
-
-from_iterator(Iterator, Segment) ->
-    mi_segment_writer:from_iterator(Iterator, Segment).
-
-%% Return the number of results under this IFT.
-info(Index, Field, Term, Segment) ->
-    Key = {Index, Field, Term},
-    case get_offset_entry(Key, Segment) of
-        {OffsetEntryKey, {_, Bloom, _, KeyInfoList}} ->
-            case mi_bloom:is_element(Key, Bloom) of
-                true  -> 
-                    {_, _, OffsetEntryTerm} = OffsetEntryKey,
-                    EditSig = mi_utils:edit_signature(OffsetEntryTerm, Term),
-                    HashSig = mi_utils:hash_signature(Term),
-                    F = fun({EditSig2, HashSig2, _, _, Count}, Acc) ->
-                                case EditSig == EditSig2 andalso HashSig == HashSig2 of
-                                    true ->
-                                        Acc + Count;
-                                    false -> 
-                                        Acc
-                                end
-                        end,
-                    lists:foldl(F, 0, KeyInfoList);
-                false -> 
-                    0
-            end;
-        _ ->
-            0
-    end.
-
-
-%% iterator/1 - Create an iterator over the entire segment.
-iterator(Segment) ->
-    %% Check if the segment is small enough such that we want to read
-    %% the entire thing into memory.
-    {ok, FullReadSize} = application:get_env(merge_index, segment_full_read_size),
-    case filesize(Segment) =< FullReadSize of
-        true ->
-            %% Read the entire segment into memory.
-            {ok, Bytes} = file:read_file(data_file(Segment)),
-            fun() -> iterate_all_bytes(undefined, Bytes) end;
-        false ->
-            %% Open a filehandle to the start of the segment.
-            {ok, ReadAheadSize} = application:get_env(merge_index, segment_compact_read_ahead_size),
-            {ok, FH} = file:open(data_file(Segment), [read, raw, binary, {read_ahead, ReadAheadSize}]),
-            fun() -> iterate_all_filehandle(FH, undefined, undefined) end
-    end.
-
-%% @private Create an iterator over a binary which represents the
-%% entire segment.
-iterate_all_bytes(LastKey, <<1:1/integer, Size:15/unsigned-integer, Bytes:Size/binary, Rest/binary>>) ->
-    Key = expand_key(LastKey, binary_to_term(Bytes)),
-    iterate_all_bytes(Key, Rest);
-iterate_all_bytes(Key, <<0:1/integer, Size:31/unsigned-integer, Bytes:Size/binary, Rest/binary>>) ->
-    Results = binary_to_term(Bytes),
-    iterate_all_bytes_1(Key, Results, Rest);
-iterate_all_bytes(_, <<>>) ->
-    eof.
-iterate_all_bytes_1(Key, [Result|Results], Rest) ->
-    {I,F,T} = Key,
-    {V,K,P} = Result,
-    {{I,F,T,V,K,P}, fun() -> iterate_all_bytes_1(Key, Results, Rest) end};
-iterate_all_bytes_1(Key, [], Rest) ->
-    iterate_all_bytes(Key, Rest).
-    
-%% @private Create an iterator over a filehandle starting at position
-%% 0 of the segment.
-iterate_all_filehandle(File, BaseKey, {key, ShrunkenKey}) ->
-    CurrKey = expand_key(BaseKey, ShrunkenKey),
-    {I,F,T} = CurrKey,
-    Transform = fun({V,K,P}) -> {I,F,T,V,K,P} end,
-    WhenDone = fun(NextEntry) -> iterate_all_filehandle(File, CurrKey, NextEntry) end,
-    iterate_by_term_values(File, Transform, WhenDone);
-iterate_all_filehandle(File, BaseKey, undefined) ->
-    iterate_all_filehandle(File, BaseKey, read_seg_entry(File));
-iterate_all_filehandle(File, _, eof) ->
-    file:close(File),
-    eof.
-
-
-%%% Create an iterator over a single Term.
-iterator(Index, Field, Term, Segment) ->
-    %% Find the Key containing the offset information we need.
-    Key = {Index, Field, Term},
-    case get_offset_entry(Key, Segment) of
-        {OffsetEntryKey, {BlockStart, Bloom, _LongestPrefix, KeyInfoList}} ->
-            %% If we're aiming for an exact match, then check the
-            %% bloom filter.
-            case mi_bloom:is_element(Key, Bloom) of
-                true -> 
-                    {_, _, OffsetEntryTerm} = OffsetEntryKey,
-                    EditSig = mi_utils:edit_signature(OffsetEntryTerm, Term),
-                    HashSig = mi_utils:hash_signature(Term),
-                    iterate_by_keyinfo(OffsetEntryKey, Key, EditSig, HashSig, BlockStart, KeyInfoList, Segment);
-                false ->
-                    fun() -> eof end
-            end;
-        undefined ->
-            fun() -> eof end
-    end.
-
-%% Use the provided KeyInfo list to skip over terms that don't match
-%% based on the edit signature. Clauses are ordered for most common
-%% paths first.
-iterate_by_keyinfo(BaseKey, Key, EditSigA, HashSigA, FileOffset, [Match={EditSigB, HashSigB, KeySize, ValuesSize, _}|Rest], Segment) ->
-    %% In order to consider this a match, both the edit signature AND the hash signature must match.
-    case EditSigA /= EditSigB orelse HashSigA /= HashSigB of
-        true ->
-            iterate_by_keyinfo(BaseKey, Key, EditSigA, HashSigA, FileOffset + KeySize + ValuesSize, Rest, Segment);
-        false ->
-            {ok, ReadAheadSize} = application:get_env(merge_index, segment_query_read_ahead_size),
-            {ok, FH} = file:open(data_file(Segment), [read, raw, binary, {read_ahead, ReadAheadSize}]),
-            file:position(FH, FileOffset),
-            iterate_by_term(FH, BaseKey, [Match|Rest], Key)
-    end;
-iterate_by_keyinfo(_, _, _, _, _, [], _) ->
-    fun() -> eof end.
-
-%% Iterate over the segment file until we find the start of the values
-%% section we want.
-iterate_by_term(File, BaseKey, [{_, _, _, ValuesSize, _}|KeyInfoList], Key) ->
-    %% Read the next entry in the segment file.  Value should be a
-    %% key, otherwise error. 
-    case read_seg_entry(File) of
-        {key, ShrunkenKey} ->
-            CurrKey = expand_key(BaseKey, ShrunkenKey),
-            %% If the key is smaller than the one we need, keep
-            %% jumping. If it's the one we need, then iterate
-            %% values. Otherwise, it's too big, so close the file and
-            %% return.
-            if 
-                CurrKey < Key ->
-                    file:read(File, ValuesSize),
-                    iterate_by_term(File, CurrKey, KeyInfoList, Key);
-                CurrKey == Key ->
-                    Transform = fun(Value) -> Value end,
-                    WhenDone = fun(_) -> file:close(File), eof end,
-                    fun() -> iterate_by_term_values(File, Transform, WhenDone) end;
-                CurrKey > Key ->
-                    file:close(File),
-                    fun() -> eof end
-            end;
-        _ ->
-            %% Shouldn't get here. If we're here, then the Offset
-            %% values are broken in some way.
-            file:close(File),
-            throw({iterate_term, offset_fail})
-    end;
-iterate_by_term(File, _, [], _) ->
-    file:close(File),
-    fun() -> eof end.
-    
-iterate_by_term_values(File, TransformFun, WhenDoneFun) ->
-    %% Read the next value, expose as iterator.
-    case read_seg_entry(File) of
-        {values, Results} ->
-            iterate_by_term_values_1(Results, File, TransformFun, WhenDoneFun);
-        Other ->
-            WhenDoneFun(Other)
-    end.
-iterate_by_term_values_1([Result|Results], File, TransformFun, WhenDoneFun) ->
-    {TransformFun(Result), fun() -> iterate_by_term_values_1(Results, File, TransformFun, WhenDoneFun) end};
-iterate_by_term_values_1([], File, TransformFun, WhenDoneFun) ->
-    iterate_by_term_values(File, TransformFun, WhenDoneFun).
-
-%% iterators/6 - Return a list of iterators for all the terms in a
-%% given range.
-iterators(Index, Field, StartTerm, EndTerm, Size, Segment) ->
-    %% Find the Key containing the offset information we need.
-    StartKey = {Index, Field, StartTerm},
-    EndKey = {Index, Field, EndTerm},
-    case get_offset_entry(StartKey, Segment) of
-        {OffsetEntryKey, {BlockStart, _, _, _}} ->
-            {ok, ReadAheadSize} = application:get_env(merge_index, segment_query_read_ahead_size),
-            {ok, FH} = file:open(data_file(Segment), [read, raw, binary, {read_ahead, ReadAheadSize}]),
-            file:position(FH, BlockStart),
-            iterate_range_by_term(FH, OffsetEntryKey, StartKey, EndKey, Size);
-        undefined ->
-            [fun() -> eof end]
-    end.
-
-%% iterate_range_by_term/5 - Generate a list of iterators matching the
-%% provided range. Keep everything in memory for now. Returns the list
-%% of iterators. TODO - In the future, once we've amassed enough
-%% iterators, write the data out to a separate temporary file.
-iterate_range_by_term(File, BaseKey, StartKey, EndKey, Size) ->
-    iterate_range_by_term_1(File, BaseKey, StartKey, EndKey, Size, false, [], []).
-iterate_range_by_term_1(File, BaseKey, StartKey, EndKey, Size, IterateOverValues, ResultsAcc, IteratorsAcc) ->
-    case read_seg_entry(File) of
-        {key, ShrunkenKey} ->
-            %% Expand the possibly shrunken key...
-            CurrKey = expand_key(BaseKey, ShrunkenKey),
-
-            %% If the key is smaller than the one we need, keep
-            %% jumping. If it's in the range we need, then iterate
-            %% values. Otherwise, it's too big, so close the file and
-            %% return.
-            if 
-                CurrKey < StartKey ->
-                    iterate_range_by_term_1(File, CurrKey, StartKey, EndKey, Size, false, [], IteratorsAcc);
-                CurrKey =< EndKey ->
-                    NewIteratorsAcc = possibly_add_iterator(ResultsAcc, IteratorsAcc),
-                    case Size == 'all' orelse size(element(3, CurrKey)) == Size of
-                        true ->
-                            iterate_range_by_term_1(File, CurrKey, StartKey, EndKey, Size, true, [], NewIteratorsAcc);
-                        false ->
-                            iterate_range_by_term_1(File, CurrKey, StartKey, EndKey, Size, false, [], NewIteratorsAcc)
-                    end;
-                CurrKey > EndKey ->
-                    file:close(File),
-                    possibly_add_iterator(ResultsAcc, IteratorsAcc)
-            end;
-        {values, Results} when IterateOverValues ->
-            iterate_range_by_term_1(File, BaseKey, StartKey, EndKey, Size, true, [Results|ResultsAcc], IteratorsAcc);
-        {values, _Results} when not IterateOverValues ->
-            iterate_range_by_term_1(File, BaseKey, StartKey, EndKey, Size, false, [], IteratorsAcc);
-        eof ->
-            %% We shouldn't reach eof mid-range; if we do, the offset
-            %% entries are inconsistent with the data file.
-            file:close(File),
-            possibly_add_iterator(ResultsAcc, IteratorsAcc)
-    end.
-
-possibly_add_iterator([], IteratorsAcc) ->
-    IteratorsAcc;
-possibly_add_iterator(Results, IteratorsAcc) ->
-    Results1 = lists:flatten(lists:reverse(Results)),
-    Iterator = fun() -> iterate_list(Results1) end,
-    [Iterator | IteratorsAcc].
-    
-%% Turn a list into an iterator.
-iterate_list([]) ->
-    eof;
-iterate_list([H|T]) ->
-    {H, fun() -> iterate_list(T) end}.
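iterate_list/1 is the list-to-iterator half of possibly_add_iterator/2: each cell of the list becomes one step of the closure chain. For example, with hypothetical atoms:

    Itr0 = fun() -> iterate_list([a, b]) end,
    {a, Itr1} = Itr0(),
    {b, Itr2} = Itr1(),
    eof = Itr2().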
-
-%% PRIVATE FUNCTIONS
-
-%% Given a key, look up the entry in the offsets table and return
-%% {OffsetKey, StartPos, Offsets, Bloom} or 'undefined'.
-get_offset_entry(Key, Segment) ->
-    case ets:lookup(Segment#segment.offsets_table, Key) of
-        [] ->
-            case ets:next(Segment#segment.offsets_table, Key) of
-                '$end_of_table' -> 
-                    undefined;
-                OffsetKey ->    
-                    %% Look up the offset information.
-                    [{OffsetKey, Value}] = ets:lookup(Segment#segment.offsets_table, OffsetKey),
-                    {OffsetKey, binary_to_term(Value)}
-            end;
-        [{OffsetKey, Value}] ->
-            {OffsetKey, binary_to_term(Value)}
-    end.
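The ets:next/2 fallback only works if the offsets table is an ordered_set: for a key with no exact entry, ets:next/2 returns the smallest stored key greater than it, i.e. the entry covering the lookup key. A self-contained illustration using stock ets (all values hypothetical):

    T = ets:new(offsets, [ordered_set]),
    true = ets:insert(T, [{{i, f, <<"m">>}, v1}, {{i, f, <<"t">>}, v2}]),
    %% <<"p">> has no exact entry; ets:next/2 finds the covering key.
    {i, f, <<"t">>} = ets:next(T, {i, f, <<"p">>}).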
-
-
-%% Read the offsets file from disk. If it's not found, then recreate
-%% it from the data file. Return the offsets tree.
-read_offsets(Root) ->
-    case ets:file2tab(offsets_file(Root)) of
-        {ok, OffsetsTable} ->
-            OffsetsTable;
-        {error, Reason} ->
-            %% TODO - File doesn't exist -- Rebuild it.
-            throw({?MODULE, {offsets_file_error, Reason}})
-    end.
-
-
-read_seg_entry(FH) ->
-    case file:read(FH, 1) of
-        {ok, <<0:1/integer, Size1:7/bitstring>>} ->
-            {ok, <<Size2:24/bitstring>>} = file:read(FH, 3),
-            <<TotalSize:31/unsigned-integer>> = <<Size1:7/bitstring, Size2:24/bitstring>>,
-            {ok, B} = file:read(FH, TotalSize),
-            {values, binary_to_term(B)};
-        {ok, <<1:1/integer, Size1:7/bitstring>>} ->
-            {ok, <<Size2:8/bitstring>>} = file:read(FH, 1),
-            <<TotalSize:15/unsigned-integer>> = <<Size1:7/bitstring, Size2:8/bitstring>>,
-            {ok, B} = file:read(FH, TotalSize),
-            {key, binary_to_term(B)};
-        eof ->
-            eof
-    end.
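read_seg_entry/1 decodes a one-bit tag: a leading 0 introduces a values entry with a 31-bit length, a leading 1 a key entry with a 15-bit length (so a serialized key must fit in 32767 bytes). A hedged sketch of the matching encoders, written here purely for illustration (the real write path lives in mi_segment_writer):

    encode_values_entry(Term) ->
        B = term_to_binary(Term),
        <<0:1/integer, (size(B)):31/unsigned-integer, B/binary>>.

    encode_key_entry(Term) ->
        B = term_to_binary(Term),
        true = size(B) < 32768,  %% must fit in 15 bits
        <<1:1/integer, (size(B)):15/unsigned-integer, B/binary>>.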
-
-data_file(Segment) when is_record(Segment, segment) ->
-    data_file(Segment#segment.root);
-data_file(Root) ->
-    Root ++ ".data".
-
-offsets_file(Segment) when is_record(Segment, segment) ->
-    offsets_file(Segment#segment.root);
-offsets_file(Root) ->
-    Root ++ ".offsets".
-
-fold_iterator(Itr, Fn, Acc0) ->
-    fold_iterator_inner(Itr(), Fn, Acc0).
-
-fold_iterator_inner(eof, _Fn, Acc) ->
-    lists:reverse(Acc);
-fold_iterator_inner({Term, NextItr}, Fn, Acc0) ->
-    Acc = Fn(Term, Acc0),
-    fold_iterator_inner(NextItr(), Fn, Acc).
-
-%% expand_key/2 - Given a BaseKey and a shrunken Key, return
-%% the actual key by re-adding the field and term if
-%% necessary. Clauses are ordered most common first.
-expand_key({Index, Field, _}, Term) when not is_tuple(Term) ->
-    {Index, Field, Term};
-expand_key({Index, _, _}, {Field, Term}) ->
-    {Index, Field, Term};
-expand_key(_, {Index, Field, Term}) ->
-    {Index, Field, Term}.
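expand_key/2 is the read-side half of a simple key-compression scheme: a key sharing the previous key's index and field is stored as the bare term, one sharing only the index as {Field, Term}. The write-side inverse is not part of this diff; reconstructed from the three clauses above, it would look roughly like the hypothetical shrink_key/2 below. The guard mirrors the first expand_key clause: a tuple-valued term cannot be told apart from a {Field, Term} pair, so it falls through to a fuller encoding.

    shrink_key({Index, Field, _}, {Index, Field, Term}) when not is_tuple(Term) ->
        Term;
    shrink_key({Index, _, _}, {Index, Field, Term}) ->
        {Field, Term};
    shrink_key(_, {Index, Field, Term}) ->
        {Index, Field, Term}.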
-
-
-%% %% ===================================================================
-%% %% EUnit tests
-%% %% ===================================================================
-%% -ifdef(TEST).
-
-%% -ifdef(EQC).
-
-%% -define(QC_OUT(P),
-%%         eqc:on_output(fun(Str, Args) -> io:format(user, Str, Args) end, P)).
-
-%% -define(POW_2(N), trunc(math:pow(2, N))).
-
-%% g_ift() ->
-%%     choose(0, ?POW_2(62)).
-
-%% g_value() ->
-%%     non_empty(binary()).
-
-%% g_props() ->
-%%     list({oneof([word_pos, offset]), choose(0, ?POW_2(31))}).
-
-%% g_tstamp() ->
-%%     choose(0, ?POW_2(31)).
-
-%% make_buffer([], B) ->
-%%     B;
-%% make_buffer([{Ift, Value, Props, Tstamp} | Rest], B0) ->
-%%     B = mi_buffer:write(Ift, Value, Props, Tstamp, B0),
-%%     make_buffer(Rest, B).
-
-
-%% prop_basic_test(Root) ->
-%%     ?FORALL(Entries, list({g_ift(), g_value(), g_props(), g_tstamp()}),
-%%             begin
-%%                 %% Delete old files
-%%                 [file:delete(X) || X <- filelib:wildcard(filename:dirname(Root) ++ "/*")],
-
-%%                 %% Setup a buffer
-%%                 Buffer = make_buffer(Entries, mi_buffer:new(Root ++ "_buffer")),
-
-%%                 %% Build a list of what was actually stored in the buffer -- this is what
-%%                 %% we expect to be present in the segment
-%%                 BufferEntries = fold_iterator(mi_buffer:iterator(Buffer),
-%%                                               fun(Item, Acc0) -> [Item | Acc0] end, []),
-
-%%                 %% Merge the buffer into a segment
-%%                 from_buffer(Buffer, open_write(Root ++ "_segment")),
-%%                 Segment = open_read(Root ++ "_segment"),
-
-%%                 %% Fold over the entire segment
-%%                 SegEntries = fold_iterator(iterator(Segment), fun(Item, Acc0) -> [Item | Acc0] end, []),
-
-%%                 ?assertEqual(BufferEntries, SegEntries),
-%%                 true
-%%             end).
-
-
-%% prop_basic_test_() ->
-%%     {timeout, 60, fun() ->
-%%                           os:cmd("rm -rf /tmp/test_mi; mkdir -p /tmp/test_mi"),
-%%                           ?assert(eqc:quickcheck(?QC_OUT(prop_basic_test("/tmp/test_mi/t1"))))
-%%                   end}.
-
-
-%% -endif. % EQC
-
-%% -endif.
-
-%% %% test() ->
-%% %%     %% Clean up old files...
-%% %%     [file:delete(X) || X <- filelib:wildcard("/tmp/test_merge_index_*")],
-
-%% %%     %% Create a buffer...
-%% %%     BufferA = mi_buffer:open("/tmp/test_merge_index_bufferA", [write]),
-%% %%     BufferA1 = mi_buffer:write(<<1>>, 1, [], 1, BufferA),
-%% %%     BufferA2 = mi_buffer:write(<<2>>, 2, [], 1, BufferA1),
-%% %%     BufferA3 = mi_buffer:write(<<3>>, 3, [], 1, BufferA2),
-%% %%     BufferA4 = mi_buffer:write(<<4>>, 4, [], 1, BufferA3),
-%% %%     BufferA5 = mi_buffer:write(<<4>>, 5, [], 1, BufferA4),
-
-%% %%     %% Merge into the segment...
-%% %%     SegmentA = from_buffer("/tmp/test_merge_index_segment", BufferA5),
-    
-%% %%     %% Check the results...
-%% %%     SegmentIteratorA = iterator(SegmentA),
-%% %%     {{<<1>>, 1, [], 1}, SegmentIteratorA1} = SegmentIteratorA(),
-%% %%     {{<<2>>, 2, [], 1}, SegmentIteratorA2} = SegmentIteratorA1(),
-%% %%     {{<<3>>, 3, [], 1}, SegmentIteratorA3} = SegmentIteratorA2(),
-%% %%     {{<<4>>, 4, [], 1}, SegmentIteratorA4} = SegmentIteratorA3(),
-%% %%     {{<<4>>, 5, [], 1}, SegmentIteratorA5} = SegmentIteratorA4(),
-%% %%     eof = SegmentIteratorA5(),
-
-%% %%     %% Do a partial iterator...
-%% %%     SegmentIteratorB = iterator(<<2>>, <<3>>, SegmentA),
-%% %%     {{<<2>>, 2, [], 1}, SegmentIteratorB1} = SegmentIteratorB(),
-%% %%     {{<<3>>, 3, [], 1}, SegmentIteratorB2} = SegmentIteratorB1(),
-%% %%     eof = SegmentIteratorB2(),
-
-%% %%     %% Check out the infos...
-%% %%     1 = info(<<2>>, SegmentA),
-%% %%     2 = info(<<4>>, SegmentA),
-%% %%     4 = info(<<2>>, <<4>>, SegmentA),
-
-%% %%     %% Read from an existing segment...
-%% %%     SegmentB = open_read(SegmentA#segment.root),
-%% %%     1 = info(<<2>>, SegmentB),
-%% %%     2 = info(<<4>>, SegmentB),
-%% %%     4 = info(<<2>>, <<4>>, SegmentB),
-
-%% %%     %% Test offset repair...
-%% %%     file:delete(offsets_file(SegmentA)),
-%% %%     SegmentC = open_read(SegmentA#segment.root),
-%% %%     1 = info(<<2>>, SegmentC),
-%% %%     2 = info(<<4>>, SegmentC),
-%% %%     4 = info(<<2>>, <<4>>, SegmentC),
-
-%% %%     all_tests_passed.

File apps/merge_index/src/mi_segment_writer.erl

-%% -------------------------------------------------------------------
-%%
-%% mi: Merge-Index Data Store
-%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% -------------------------------------------------------------------
--module(mi_segment_writer).
--author("Rusty Klophaus <rusty@basho.com>").
-
--export([
-    from_iterator/2
-]).
-
--include("merge_index.hrl").
-
--include_lib("kernel/include/file.hrl").
--define(BLOCK_SIZE(W), W#writer.segment_block_size).
--define(FILE_BUFFER_SIZE(W), W#writer.segment_file_buffer_size).
--define(VALUES_STAGING_SIZE(W), W#writer.segment_values_staging_size).
--define(VALUES_COMPRESS_THRESHOLD(W), W#writer.segment_values_compression_threshold).
--define(VALUES_COMPRESS_LEVEL(W), W#writer.segment_values_compression_level).
--define(INDEX_FIELD_TERM(X), {element(1, X), element(2, X), element(3, X)}).
--define(VALUE(X), element(4, X)).
--define(VALUE_TSTAMP_PROPS(X), {element(4, X), element(5, X), element(6, X)}).
-
--record(writer, {data_file,
-                 offsets_table,
-                 pos=0,
-                 block_start=0,
-                 keys=[],
-                 last_key=undefined,
-                 key_start=0,
-                 values_start=0,
-                 values_count=0,
-                 values_staging=[],
-                 compressed_values=false,
-                 buffer=[],
-                 buffer_size=0,
-                 
-                 %% We are caching these settings in the #writer state
-                 %% to avoid looking them up repeatedly. This saves
-                 %% quite a bit of time; 100,000 lookups take about
-                 %% half a second on my machine.
-                 segment_block_size=element(2,application:get_env(merge_index, segment_block_size)),
-                 segment_file_buffer_size=element(2,application:get_env(merge_index, segment_file_buffer_size)),
-                 segment_values_staging_size=element(2,application:get_env(merge_index, segment_values_staging_size)),
-                 segment_values_compression_threshold=element(2,application:get_env(merge_index, segment_values_compression_threshold)),
-                 segment_values_compression_level=element(2,application:get_env(merge_index, segment_values_compression_level))
-         }).
-
-from_iterator(Iterator, Segment) ->
-    %% Open the data file...
-    {ok, DelayedWriteSize} = application:get_env(merge_index, segment_delayed_write_size),
-    {ok, DelayedWriteMS} = application:get_env(merge_index, segment_delayed_write_ms),
-    {ok, DataFile} = file:open(mi_segment:data_file(Segment), [write, raw, binary, {delayed_write, DelayedWriteSize, DelayedWriteMS}]),
-
-    W = #writer {
-      data_file=DataFile,
-      offsets_table=Segment#segment.offsets_table
-     },
-
-    try
-        from_iterator(Iterator(), undefined, undefined, W),
-        ok = ets:tab2file(W#writer.offsets_table, mi_segment:offsets_file(Segment))
-    after
-        file:close(W#writer.data_file),
-        ets:delete(W#writer.offsets_table)
-    end,    
-    ok.
-    
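A hedged call-site sketch: from_iterator/2 expects a zero-arity fun producing {Entry, NextFun} | eof, such as the iterator exposed by mi_buffer (the Buffer and Segment bindings are assumed). The entries must arrive sorted by {Index, Field, Term} for the stream clauses below to group and deduplicate correctly.

    Itr = mi_buffer:iterator(Buffer),
    ok = mi_segment_writer:from_iterator(Itr, Segment).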
-
-%% from_iterator/4 - takes an iterator, removes duplicate values, and
-%% calls the appropriate from_iterator_process_*/N functions, which
-%% update the writer state.
-from_iterator({Entry, Iterator}, StartIFT, LastValue, W) 
-  when ?INDEX_FIELD_TERM(Entry) == StartIFT andalso
-       ?VALUE(Entry) /= LastValue ->
-    %% Add the next value to the segment.
-    W1 = from_iterator_process_value(?VALUE_TSTAMP_PROPS(Entry), W),
-    from_iterator(Iterator(), StartIFT, ?VALUE(Entry), W1);
-
-from_iterator({Entry, Iterator}, StartIFT, _LastValue, W) 
-  when ?INDEX_FIELD_TERM(Entry) /= StartIFT andalso StartIFT /= undefined ->
-    %% Start a new term.
-    W1 = from_iterator_process_end_term(StartIFT, W),
-    W2 = from_iterator_process_start_term(?INDEX_FIELD_TERM(Entry), W1),
-    W3 = from_iterator_process_value(?VALUE_TSTAMP_PROPS(Entry), W2),
-    from_iterator(Iterator(), ?INDEX_FIELD_TERM(Entry), ?VALUE(Entry), W3);
-
-from_iterator({Entry, Iterator}, StartIFT, LastValue, W) 
-  when ?INDEX_FIELD_TERM(Entry) == StartIFT andalso
-       ?VALUE(Entry) == LastValue ->
-    %% Eliminate a duplicate value.
-    from_iterator(Iterator(), StartIFT, LastValue, W);
-
-from_iterator({Entry, Iterator}, undefined, undefined, W) ->
-    %% Start of segment.
-    W1 = from_iterator_process_start_segment(W),
-    W2 = from_iterator_process_start_term(?INDEX_FIELD_TERM(Entry), W1),
-    W3 = from_iterator_process_value(?VALUE_TSTAMP_PROPS(Entry), W2),
-    from_iterator(Iterator(), ?INDEX_FIELD_TERM(Entry), ?VALUE(Entry), W3);
-
-from_iterator(eof, undefined, _, W) ->
-    %% End of segment, but we never got any values.
-    W;
-
-from_iterator(eof, StartIFT, _LastValue, W) ->
-    %% End of segment.
-    W1 = from_iterator_process_end_term(StartIFT, W),
-    W2 = from_iterator_process_end_segment(W1),
-    W2.
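Taken together, the stream clauses deduplicate values and detect term boundaries. For a hypothetical sorted input the processing order is:

    %% Input:  {I,F,T1,V1,..}, {I,F,T1,V1,..}, {I,F,T1,V2,..},
    %%         {I,F,T2,V1,..}, eof
    %% Calls:  start_segment, start_term({I,F,T1}), value(V1),
    %%         duplicate V1 skipped, value(V2), end_term({I,F,T1}),
    %%         start_term({I,F,T2}), value(V1), end_term({I,F,T2}),
    %%         end_segment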
-
-
-%% One function below for each stage of writing a segment:
-%% start_segment, start_block, start_term, value, end_term,
-%% end_block, end_segment.
-
-from_iterator_process_start_segment(W) -> 
-    W.
-
-from_iterator_process_start_block(W) -> 
-    %% Set the block_start position and zero out the keys for this block.
-    W#writer {
-      block_start = W#writer.pos,
-      keys = []
-     }.
-
-from_iterator_process_start_term(Key, W) -> 
-    %% If the Index or Field value for this key is different from the
-    %% last one, then close the last block and start a new block. This