Source

casbench / src / thrift_disk_log_transport.erl

The default branch has multiple heads

%%
%% Licensed to the Apache Software Foundation (ASF) under one
%% or more contributor license agreements. See the NOTICE file
%% distributed with this work for additional information
%% regarding copyright ownership. The ASF licenses this file
%% to you under the Apache License, Version 2.0 (the
%% "License"); you may not use this file except in compliance
%% with the License. You may obtain a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%

%%% Todo: this might be better off as a gen_server type of transport
%%%       that handles stuff like group commit, similar to TFileTransport
%%%       in cpp land
-module(thrift_disk_log_transport).

-behaviour(thrift_transport).

%% API
-export([new/2, new_transport_factory/2, new_transport_factory/3]).

%% thrift_transport callbacks
-export([read/2, write/2, force_flush/1, flush/1, close/1]).

%% state
%% Per-transport state threaded through every callback in this module.
-record(dl_transport, {log,                     % name of the disk_log this transport writes to
                       close_on_close = false,  % when not false, close/1 calls disk_log:lclose/1 on the log
                       sync_every = infinity,   % interval handed to timer:apply_interval/4 in new/2; infinity = no timer
                       sync_tref}).             % timer reference returned by timer:apply_interval/4, if one was started


%% Create a transport attached to an already open disk_log.
%%
%% LogName is the (atom) name of the open log. Opts is a list of transport
%% options handled by parse_opts/2; the recognised options are:
%%   {close_on_close, boolean()} - when true, close/1 closes the log with
%%                                 disk_log:lclose/1 (default: false).
%%   {sync_every, pos_integer()} - interval, in milliseconds, at which
%%                                 force_flush/1 is invoked from a timer
%%                                 (default: infinity, i.e. no timer).
%%
%% Returns the result of thrift_transport:new/2 for this module and state.
new(LogName, Opts) when is_atom(LogName), is_list(Opts) ->
    State = parse_opts(Opts, #dl_transport{log = LogName}),

    State2 =
        case State#dl_transport.sync_every of
            N when is_integer(N), N > 0 ->
                %% timer:apply_interval/4 expects an argument *list*. The
                %% state record has to be wrapped in a list so the timer
                %% ends up calling force_flush(State).
                {ok, TRef} = timer:apply_interval(N, ?MODULE, force_flush, [State]),
                State#dl_transport{sync_tref = TRef};
            _ ->
                %% No positive interval configured: no periodic sync.
                State
        end,

    thrift_transport:new(?MODULE, State2).


%% Fold a list of transport options into the state record.
%%
%% Recognised options:
%%   {close_on_close, boolean()}
%%   {sync_every, pos_integer()}
%% When an option appears more than once, the last occurrence wins.
%% Any other element fails with function_clause.
parse_opts([], State) ->
    State;
parse_opts([{close_on_close, Bool} | Rest], State) when is_boolean(Bool) ->
    %% Continue with the remaining options instead of stopping at the first.
    parse_opts(Rest, State#dl_transport{close_on_close = Bool});
parse_opts([{sync_every, Int} | Rest], State) when is_integer(Int), Int > 0 ->
    parse_opts(Rest, State#dl_transport{sync_every = Int}).


%%%% TRANSPORT IMPLENTATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% This transport is write-only: every read request, whatever the state or
%% requested length, is answered with the same error tuple.
read(_Transport, _Len) ->
    {error, no_read_from_disk_log}.

%% Flatten Data (an iolist) into a single binary and append it to the
%% transport's log via disk_log:balog/2. Returns balog's result.
write(#dl_transport{log = LogName}, Data) ->
    Bytes = erlang:iolist_to_binary(Data),
    disk_log:balog(LogName, Bytes).

%% Unconditionally sync the log: emit an info message naming this module,
%% then return the result of disk_log:sync/1. new/2 uses this as the target
%% of the periodic sync timer.
force_flush(#dl_transport{log = LogName}) ->
    error_logger:info_msg("~p syncing~n", [?MODULE]),
    disk_log:sync(LogName).

%% Flush callback.
%%
%% If a periodic sync timer is configured (sync_every is a positive integer,
%% the same condition new/2 uses to start the timer), this is a no-op and
%% returns ok, because the timer takes care of syncing. Otherwise -- including
%% the record default of infinity -- the log is synced right away and the
%% result of disk_log:sync/1 is returned.
flush(#dl_transport{sync_every = SE}) when is_integer(SE), SE > 0 ->
    ok;
flush(#dl_transport{log = Log}) ->
    %% No time-based sync is running, so sync explicitly.
    disk_log:sync(Log).


%% Close the transport.
%%
%% Any periodic sync timer recorded in the state is cancelled first so that
%% it does not keep firing after the transport is gone. Then, if the
%% transport was configured with close_on_close, the underlying log is closed
%% with disk_log:lclose/1 and that call's result is returned; otherwise the
%% log is left open and ok is returned.
close(#dl_transport{log = Log, close_on_close = CloseLog, sync_tref = TRef}) ->
    case TRef of
        undefined -> ok;                 % no timer was started
        _ -> timer:cancel(TRef)          % result deliberately not matched
    end,
    case CloseLog of
        false -> ok;
        _ -> disk_log:lclose(Log)
    end.


%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% Build a transport factory using the default transport options:
%% close_on_close enabled and sync_every set to 500.
%% Delegates to new_transport_factory/3.
new_transport_factory(Name, ExtraLogOpts) ->
    DefaultTransportOpts = [{close_on_close, true}, {sync_every, 500}],
    new_transport_factory(Name, ExtraLogOpts, DefaultTransportOpts).

%% Build a transport factory: returns {ok, Fun} where Fun is a zero-arity
%% closure that calls factory_impl/3 with the captured log name, extra
%% disk_log options and transport options. Nothing is opened until Fun runs.
new_transport_factory(Name, ExtraLogOpts, TransportOpts) ->
    {ok, fun() -> factory_impl(Name, ExtraLogOpts, TransportOpts) end}.

%% Open a disk_log and wrap it in a new transport.
%%
%% The log is opened with the given name, external format and wrap type,
%% followed by the caller's ExtraLogOpts. A repaired log is reported through
%% error_logger and then used as normal. Any other result of disk_log:open/1
%% (for example an error tuple) crashes with case_clause.
factory_impl(Name, ExtraLogOpts, TransportOpts) ->
    FixedOpts = [{name, Name}, {format, external}, {type, wrap}],
    OpenedLog =
        case disk_log:open(FixedOpts ++ ExtraLogOpts) of
            {repaired, RepairedLog, Recovered, BadBytes} ->
                error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n",
                                      [RepairedLog, Recovered, BadBytes]),
                RepairedLog;
            {ok, FreshLog} ->
                FreshLog
        end,
    new(OpenedLog, TransportOpts).
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.