diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl
index dd384af7c..d6f959510 100644
--- a/apps/emqx_resource/include/emqx_resource.hrl
+++ b/apps/emqx_resource/include/emqx_resource.hrl
@@ -54,3 +54,4 @@
 -type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}.
 
 -define(TEST_ID_PREFIX, "_test_:").
+-define(RES_METRICS, resource_metrics).
diff --git a/apps/emqx_resource/include/emqx_resource_utils.hrl b/apps/emqx_resource/include/emqx_resource_utils.hrl
index 8d94746eb..3df64b1e5 100644
--- a/apps/emqx_resource/include/emqx_resource_utils.hrl
+++ b/apps/emqx_resource/include/emqx_resource_utils.hrl
@@ -15,7 +15,7 @@
 %%--------------------------------------------------------------------
 
 -define(SAFE_CALL(_EXP_),
-    ?SAFE_CALL(_EXP_, ok)
+    ?SAFE_CALL(_EXP_, {error, {_EXCLASS_, _EXCPTION_, _ST_}})
 ).
 
 -define(SAFE_CALL(_EXP_, _EXP_ON_FAIL_),
@@ -24,8 +24,7 @@
             (_EXP_)
         catch
             _EXCLASS_:_EXCPTION_:_ST_ ->
-                _EXP_ON_FAIL_,
-                {error, {_EXCLASS_, _EXCPTION_, _ST_}}
+                _EXP_ON_FAIL_
         end
     end()
 ).
diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl
index 33f0d0a3d..793b9f446 100644
--- a/apps/emqx_resource/src/emqx_resource.erl
+++ b/apps/emqx_resource/src/emqx_resource.erl
@@ -83,8 +83,7 @@
     stop/1,
     %% query the instance
     query/2,
-    %% query the instance with after_query()
-    query/3
+    query_async/3
 ]).
 
 %% Direct calls to the callback module
@@ -111,6 +110,8 @@
     list_group_instances/1
 ]).
 
+-export([inc_metrics_funcs/1, inc_success/1, inc_failed/1]).
+
 -optional_callbacks([
     on_query/4,
     on_get_status/2
@@ -150,16 +151,16 @@ is_resource_mod(Module) ->
 
 -spec query_success(after_query()) -> ok.
 query_success(undefined) -> ok;
-query_success({OnSucc, _}) -> apply_query_after_calls(OnSucc).
+query_success({OnSucc, _}) -> exec_query_after_calls(OnSucc).
 
 -spec query_failed(after_query()) -> ok.
 query_failed(undefined) -> ok;
-query_failed({_, OnFailed}) -> apply_query_after_calls(OnFailed).
+query_failed({_, OnFailed}) -> exec_query_after_calls(OnFailed).
 
-apply_query_after_calls(Funcs) ->
+exec_query_after_calls(Funcs) ->
     lists:foreach(
-        fun({Fun, Args}) ->
-            safe_apply(Fun, Args)
+        fun({Fun, Arg}) ->
+            emqx_resource_utils:safe_exec(Fun, Arg)
         end,
         Funcs
     ).
@@ -243,29 +244,12 @@ reset_metrics(ResId) ->
 %% =================================================================================
 -spec query(resource_id(), Request :: term()) -> Result :: term().
 query(ResId, Request) ->
-    query(ResId, Request, inc_metrics_funcs(ResId)).
+    emqx_resource_worker:query(ResId, Request).
 
-%% same to above, also defines what to do when the Module:on_query success or failed
-%% it is the duty of the Module to apply the `after_query()` functions.
--spec query(resource_id(), Request :: term(), after_query()) -> Result :: term().
-query(ResId, Request, AfterQuery) ->
-    case emqx_resource_manager:ets_lookup(ResId) of
-        {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
-            %% the resource state is readonly to Module:on_query/4
-            %% and the `after_query()` functions should be thread safe
-            ok = emqx_metrics_worker:inc(resource_metrics, ResId, matched),
-            try
-                Mod:on_query(ResId, Request, AfterQuery, ResourceState)
-            catch
-                Err:Reason:ST ->
-                    emqx_metrics_worker:inc(resource_metrics, ResId, exception),
-                    erlang:raise(Err, Reason, ST)
-            end;
-        {ok, _Group, _Data} ->
-            query_error(not_connected, <<"resource not connected">>);
-        {error, not_found} ->
-            query_error(not_found, <<"resource not found">>)
-    end.
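
Note on the `exec_query_after_calls/1` hunk above: it now delegates to
`emqx_resource_utils:safe_exec/2`, yet the new `emqx_resource_utils` module
added later in this diff carries only the license header and the module
declaration. A plausible definition, sketched here under that assumption
(name and arity come from the call site; the body mirrors the new
`?SAFE_CALL` default), would be:

    -export([safe_exec/2]).

    %% Run Fun(Arg), turning any crash into an {error, ...} tuple,
    %% like the updated ?SAFE_CALL default above does.
    safe_exec(Fun, Arg) ->
        try
            Fun(Arg)
        catch
            Class:Reason:Stacktrace ->
                {error, {Class, Reason, Stacktrace}}
        end.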
+-spec query_async(resource_id(), Request :: term(), emqx_resource_worker:reply_fun()) ->
+    ok.
+query_async(ResId, Request, ReplyFun) ->
+    emqx_resource_worker:query_async(ResId, Request, ReplyFun).
 
 -spec start(resource_id()) -> ok | {error, Reason :: term()}.
 start(ResId) ->
@@ -429,16 +413,16 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
 
 %% =================================================================================
 
+inc_success(ResId) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ResId, success).
+
+inc_failed(ResId) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ResId, failed).
+
 filter_instances(Filter) ->
     [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
 
 inc_metrics_funcs(ResId) ->
-    OnFailed = [{fun emqx_metrics_worker:inc/3, [resource_metrics, ResId, failed]}],
-    OnSucc = [{fun emqx_metrics_worker:inc/3, [resource_metrics, ResId, success]}],
+    OnSucc = [{fun ?MODULE:inc_success/1, ResId}],
+    OnFailed = [{fun ?MODULE:inc_failed/1, ResId}],
     {OnSucc, OnFailed}.
-
-safe_apply(Func, Args) ->
-    ?SAFE_CALL(erlang:apply(Func, Args)).
-
-query_error(Reason, Msg) ->
-    {error, {?MODULE, #{reason => Reason, msg => Msg}}}.
diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl
index 82be9c58f..f1fa36173 100644
--- a/apps/emqx_resource/src/emqx_resource_manager.erl
+++ b/apps/emqx_resource/src/emqx_resource_manager.erl
@@ -109,9 +109,9 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
     % The state machine will make the actual call to the callback/resource module after init
     ok = emqx_resource_manager_sup:ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts),
     ok = emqx_metrics_worker:create_metrics(
-        resource_metrics,
+        ?RES_METRICS,
         ResId,
-        [matched, success, failed, exception],
+        [matched, success, failed, exception, resource_error],
         [matched]
     ),
     case maps:get(start_after_created, Opts, true) of
@@ -207,12 +207,12 @@ ets_lookup(ResId) ->
 
 %% @doc Get the metrics for the specified resource
 get_metrics(ResId) ->
-    emqx_metrics_worker:get_metrics(resource_metrics, ResId).
+    emqx_metrics_worker:get_metrics(?RES_METRICS, ResId).
 
 %% @doc Reset the metrics for the specified resource
 -spec reset_metrics(resource_id()) -> ok.
 reset_metrics(ResId) ->
-    emqx_metrics_worker:reset_metrics(resource_metrics, ResId).
+    emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId).
 
 %% @doc Returns the data for all resources
 -spec list_all() -> [resource_data()] | [].
@@ -298,8 +298,7 @@ handle_event({call, From}, stop, stopped, _Data) ->
     {keep_state_and_data, [{reply, From, ok}]};
 handle_event({call, From}, stop, _State, Data) ->
     Result = stop_resource(Data),
-    UpdatedData = Data#data{status = disconnected},
-    {next_state, stopped, UpdatedData, [{reply, From, Result}]};
+    {next_state, stopped, Data, [{reply, From, Result}]};
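
The hunks above replace the old `query/3` + `after_query()` flow with a
sync/async split: `query/2` is now a `gen_statem` call routed through the
resource worker, while `query_async/3` casts and hands the worker a
`reply_fun()`. A minimal caller sketch (the resource id, the request shape
and `handle_reply/2` are hypothetical, for illustration only):

    ResId = <<"bridge:my_bridge">>,
    Result = emqx_resource:query(ResId, {send_message, Msg}),
    ok = emqx_resource:query_async(
        ResId,
        {send_message, Msg},
        {fun handle_reply/2, #{request_id => 1}}
    ).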
 % Called when a resource is to be stopped and removed.
 handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
     handle_remove_event(From, ClearMetrics, Data);
@@ -315,9 +314,10 @@ handle_event({call, From}, health_check, _State, Data) ->
     handle_manually_health_check(From, Data);
 % State: CONNECTING
 handle_event(enter, _OldState, connecting, Data) ->
-    insert_cache(Data#data.id, Data#data.group, Data),
+    UpdatedData = Data#data{status = connecting},
+    insert_cache(Data#data.id, Data#data.group, UpdatedData),
     Actions = [{state_timeout, 0, health_check}],
-    {keep_state_and_data, Actions};
+    {keep_state, UpdatedData, Actions};
 handle_event(internal, start_resource, connecting, Data) ->
     start_resource(Data, undefined);
 handle_event(state_timeout, health_check, connecting, Data) ->
@@ -326,22 +326,24 @@ handle_event(state_timeout, health_check, connecting, Data) ->
 %% The connected state is entered after a successful on_start/2 of the callback mod
 %% and successful health_checks
 handle_event(enter, _OldState, connected, Data) ->
-    insert_cache(Data#data.id, Data#data.group, Data),
+    UpdatedData = Data#data{status = connected},
+    insert_cache(Data#data.id, Data#data.group, UpdatedData),
     _ = emqx_alarm:deactivate(Data#data.id),
     Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
-    {next_state, connected, Data, Actions};
+    {next_state, connected, UpdatedData, Actions};
 handle_event(state_timeout, health_check, connected, Data) ->
     handle_connected_health_check(Data);
 %% State: DISCONNECTED
 handle_event(enter, _OldState, disconnected, Data) ->
-    insert_cache(Data#data.id, Data#data.group, Data),
-    handle_disconnected_state_enter(Data);
+    UpdatedData = Data#data{status = disconnected},
+    insert_cache(Data#data.id, Data#data.group, UpdatedData),
+    handle_disconnected_state_enter(UpdatedData);
 handle_event(state_timeout, auto_retry, disconnected, Data) ->
     start_resource(Data, undefined);
 %% State: STOPPED
 %% The stopped state is entered after the resource has been explicitly stopped
 handle_event(enter, _OldState, stopped, Data) ->
-    UpdatedData = Data#data{status = disconnected},
+    UpdatedData = Data#data{status = stopped},
     insert_cache(Data#data.id, Data#data.group, UpdatedData),
     {next_state, stopped, UpdatedData};
 % Ignore all other events
@@ -415,7 +417,7 @@ handle_disconnected_state_enter(Data) ->
 handle_remove_event(From, ClearMetrics, Data) ->
     stop_resource(Data),
     case ClearMetrics of
-        true -> ok = emqx_metrics_worker:clear_metrics(resource_metrics, Data#data.id);
+        true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id);
         false -> ok
     end,
     {stop_and_reply, normal, [{reply, From, ok}]}.
@@ -433,7 +435,7 @@ start_resource(Data, From) ->
             _ = maybe_alarm(disconnected, Data#data.id),
             %% Keep track of the error reason why the connection did not work
             %% so that the Reason can be returned when the verification call is made.
-            UpdatedData = Data#data{status = disconnected, error = Reason},
+            UpdatedData = Data#data{error = Reason},
             Actions = maybe_reply([], From, Err),
             {next_state, disconnected, UpdatedData, Actions}
     end.
diff --git a/apps/emqx_resource/src/emqx_resource_sup.erl b/apps/emqx_resource/src/emqx_resource_sup.erl
index 1120723c3..84458f0d5 100644
--- a/apps/emqx_resource/src/emqx_resource_sup.erl
+++ b/apps/emqx_resource/src/emqx_resource_sup.erl
@@ -15,6 +15,8 @@
 %%--------------------------------------------------------------------
 -module(emqx_resource_sup).
 
+-include("emqx_resource.hrl").
+
 -behaviour(supervisor).
 
 -export([start_link/0]).
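
With `?RES_METRICS` in place and `resource_error` added to the counter list
in the manager hunk above, per-resource counters can be read back through
`emqx_resource_manager:get_metrics/1`. A sketch of inspecting them for some
bound `ResId` — the exact map shape is an assumption based on
`emqx_metrics_worker`:

    #{
        counters := #{
            matched := Matched,
            success := Success,
            failed := Failed,
            exception := Exception,
            resource_error := ResourceError
        }
    } = emqx_resource_manager:get_metrics(ResId).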
@@ -29,7 +31,7 @@ start_link() ->
 
 init([]) ->
     SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
-    Metrics = emqx_metrics_worker:child_spec(resource_metrics),
+    Metrics = emqx_metrics_worker:child_spec(?RES_METRICS),
 
     ResourceManager =
         #{
diff --git a/apps/emqx_resource/src/emqx_resource_utils.erl b/apps/emqx_resource/src/emqx_resource_utils.erl
new file mode 100644
index 000000000..715691d2a
--- /dev/null
+++ b/apps/emqx_resource/src/emqx_resource_utils.erl
@@ -0,0 +1,17 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_resource_utils).
diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl
new file mode 100644
index 000000000..6c3b05830
--- /dev/null
+++ b/apps/emqx_resource/src/emqx_resource_worker.erl
@@ -0,0 +1,252 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% A FIFO queue worker using replayq as the backend.
+
+-module(emqx_resource_worker).
+
+-include("emqx_resource.hrl").
+-include("emqx_resource_utils.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+-behaviour(gen_statem).
+
+-export([
+    start_link/2,
+    query/2,
+    query_async/3,
+    query_mfa/3
+]).
+
+-export([
+    callback_mode/0,
+    init/1
+]).
+
+-export([do/3]).
+
+%% count
+-define(DEFAULT_BATCH_SIZE, 100).
+%% milliseconds
+-define(DEFAULT_BATCH_TIME, 10).
+
+-define(QUERY(FROM, REQUEST), {FROM, REQUEST}).
+-define(REPLY(FROM, REQUEST, RESULT), {FROM, REQUEST, RESULT}).
+-define(EXPAND(RESULT, BATCH), [?REPLY(FROM, REQUEST, RESULT) || ?QUERY(FROM, REQUEST) <- BATCH]).
+
+-type id() :: binary().
+-type request() :: term().
+-type result() :: term().
+-type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()}.
+-type from() :: pid() | reply_fun().
+
+-callback batcher_flush(Acc :: [{from(), request()}], CbState :: term()) ->
+    {{from(), result()}, NewCbState :: term()}.
+
+callback_mode() -> [state_functions].
+
+start_link(Id, Opts) ->
+    gen_statem:start_link({local, name(Id)}, ?MODULE, {Id, Opts}, []).
+
+-spec query(id(), request()) -> result().
+query(Id, Request) ->
+    gen_statem:call(name(Id), {query, Request}).
+
+-spec query_async(id(), request(), reply_fun()) -> ok.
+query_async(Id, Request, ReplyFun) ->
+    gen_statem:cast(name(Id), {query, Request, ReplyFun}).
+
+-spec name(id()) -> atom().
+name(Id) ->
+    Mod = atom_to_binary(?MODULE, utf8),
+    binary_to_atom(<<Mod/binary, "_", Id/binary>>, utf8).
+
+disk_cache_dir(Id) ->
+    filename:join([emqx:data_dir(), Id, cache]).
+
+init({Id, Opts}) ->
+    BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
+    Queue =
+        case maps:get(cache_enabled, Opts, true) of
+            true -> replayq:open(#{dir => disk_cache_dir(Id), seg_bytes => 10000000});
+            false -> undefined
+        end,
+    St = #{
+        id => Id,
+        batch_enabled => maps:get(batch_enabled, Opts, true),
+        batch_size => BatchSize,
+        batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
+        cache_queue => Queue,
+        acc => [],
+        acc_left => BatchSize,
+        tref => undefined
+    },
+    {ok, do, St}.
+
+do(cast, {query, Request, ReplyFun}, #{batch_enabled := true} = State) ->
+    do_acc(ReplyFun, Request, State);
+do(cast, {query, Request, ReplyFun}, #{batch_enabled := false} = State) ->
+    do_query(ReplyFun, Request, State);
+do({call, From}, {query, Request}, #{batch_enabled := true} = State) ->
+    do_acc(From, Request, State);
+do({call, From}, {query, Request}, #{batch_enabled := false} = State) ->
+    do_query(From, Request, State);
+do(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
+    {keep_state, flush(St#{tref := undefined})};
+do(info, {flush, _Ref}, _St) ->
+    keep_state_and_data;
+do(info, Info, _St) ->
+    ?SLOG(error, #{msg => unexpected_msg, info => Info}),
+    keep_state_and_data.
+
+do_acc(From, Request, #{acc := Acc, acc_left := Left} = St0) ->
+    Acc1 = [?QUERY(From, Request) | Acc],
+    St = St0#{acc := Acc1, acc_left := Left - 1},
+    case Left =< 1 of
+        true -> {keep_state, flush(St)};
+        false -> {keep_state, ensure_flush_timer(St)}
+    end.
+
+do_query(From, Request, #{id := Id, cache_queue := Q0} = St0) ->
+    Result = call_query(Id, Request),
+    Q1 = reply_caller(Id, Q0, ?REPLY(From, Request, Result)),
+    {keep_state, St0#{cache_queue := Q1}}.
+
+flush(#{acc := []} = St) ->
+    St;
+flush(
+    #{
+        id := Id,
+        acc := Batch,
+        batch_size := Size,
+        cache_queue := Q0
+    } = St
+) ->
+    BatchResults = call_batch_query(Id, Batch),
+    Q1 = batch_reply_caller(Id, Q0, BatchResults),
+    cancel_flush_timer(
+        St#{
+            acc_left := Size,
+            acc := [],
+            cache_queue := Q1
+        }
+    ).
+
+maybe_append_cache(undefined, _Request) -> undefined;
+maybe_append_cache(Q, Request) -> replayq:append(Q, [Request]).
+
+batch_reply_caller(Id, Q, BatchResults) ->
+    lists:foldl(
+        fun(Reply, Q1) ->
+            reply_caller(Id, Q1, Reply)
+        end,
+        Q,
+        BatchResults
+    ).
+
+reply_caller(Id, Q, ?REPLY({ReplyFun, Args}, Request, Result)) when is_function(ReplyFun) ->
+    ?SAFE_CALL(ReplyFun(Result, Args)),
+    handle_query_result(Id, Q, Request, Result);
+reply_caller(Id, Q, ?REPLY(From, Request, Result)) ->
+    gen_statem:reply(From, Result),
+    handle_query_result(Id, Q, Request, Result).
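
`reply_caller/3` above fixes the async contract: a `{ReplyFun, Args}` caller
gets `ReplyFun(Result, Args)` invoked under `?SAFE_CALL`, while a synchronous
caller is answered via `gen_statem:reply/2`. A usage sketch for the async
side (the logger call and the `Args` map are illustrative only):

    ReplyFun = {
        fun(Result, #{req_ref := Ref}) ->
            logger:info("async query ~p finished: ~p", [Ref, Result])
        end,
        #{req_ref => make_ref()}
    },
    ok = emqx_resource_worker:query_async(Id, Request, ReplyFun).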
+
+handle_query_result(Id, Q, _Request, ok) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, success),
+    Q;
+handle_query_result(Id, Q, _Request, {ok, _}) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, success),
+    Q;
+handle_query_result(Id, Q, Request, {error, {resource_error, #{reason := not_connected}}}) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, resource_error),
+    maybe_append_cache(Q, Request);
+handle_query_result(Id, Q, _Request, {error, {resource_error, #{}}}) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, resource_error),
+    Q;
+handle_query_result(Id, Q, Request, {error, {exception, _}}) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, exception),
+    maybe_append_cache(Q, Request);
+handle_query_result(Id, Q, _Request, {error, _}) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, failed),
+    Q.
+
+call_query(Id, Request) ->
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
+    case emqx_resource_manager:ets_lookup(Id) of
+        {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
+            try Mod:on_query(Id, Request, ResourceState) of
+                Result -> Result
+            catch
+                Err:Reason:ST ->
+                    ModB = atom_to_binary(Mod, utf8),
+                    Msg = <<"call failed, func: ", ModB/binary, ":on_query/3">>,
+                    exception_error(Reason, Msg, {Err, Reason, ST})
+            end;
+        {ok, _Group, #{status := stopped}} ->
+            resource_error(stopped, <<"resource stopped or disabled">>);
+        {ok, _Group, _Data} ->
+            resource_error(not_connected, <<"resource not connected">>);
+        {error, not_found} ->
+            resource_error(not_found, <<"resource not found">>)
+    end.
+
+call_batch_query(Id, Batch) ->
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)),
+    case emqx_resource_manager:ets_lookup(Id) of
+        {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
+            try Mod:on_batch_query(Id, Batch, ResourceState) of
+                BatchResults -> BatchResults
+            catch
+                Err:Reason:ST ->
+                    ModB = atom_to_binary(Mod, utf8),
+                    Msg = <<"call failed, func: ", ModB/binary, ":on_batch_query/3">>,
+                    ?EXPAND(exception_error(Reason, Msg, {Err, Reason, ST}), Batch)
+            end;
+        {ok, _Group, _Data} ->
+            ?EXPAND(resource_error(not_connected, <<"resource not connected">>), Batch);
+        {error, not_found} ->
+            ?EXPAND(resource_error(not_found, <<"resource not found">>), Batch)
+    end.
+
+resource_error(Reason, Msg) ->
+    {error, {resource_error, #{reason => Reason, msg => Msg}}}.
+
+exception_error(Reason, Msg, Details) ->
+    {error, {exception, #{reason => Reason, msg => Msg, details => Details}}}.
+
+%% ==========================================
+ensure_flush_timer(St = #{tref := undefined, batch_time := T}) ->
+    Ref = make_ref(),
+    TRef = erlang:send_after(T, self(), {flush, Ref}),
+    St#{tref => {TRef, Ref}};
+ensure_flush_timer(St) ->
+    St.
+
+cancel_flush_timer(St = #{tref := undefined}) ->
+    St;
+cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
+    _ = erlang:cancel_timer(TRef),
+    St#{tref => undefined}.
+
+query_mfa(InsertMode, Request, SyncTimeout) ->
+    {?MODULE, query_fun(InsertMode), query_args(InsertMode, Request, SyncTimeout)}.
+
+query_fun(<<"sync">>) -> query;
+query_fun(<<"async">>) -> query_async.
+
+query_args(<<"sync">>, Request, SyncTimeout) ->
+    [Request, SyncTimeout];
+query_args(<<"async">>, Request, _) ->
+    [Request].
diff --git a/apps/emqx_resource/src/replayq.erl b/apps/emqx_resource/src/replayq.erl
new file mode 100644
index 000000000..437702a5e
--- /dev/null
+++ b/apps/emqx_resource/src/replayq.erl
@@ -0,0 +1,779 @@
+-module(replayq).
+
+-export([open/1, close/1]).
+-export([append/2, pop/2, ack/2, ack_sync/2, peek/1, overflow/1]).
+-export([count/1, bytes/1, is_empty/1, is_mem_only/1]).
+%% exported for troubleshooting
+-export([do_read_items/2]).
+
+%% internal exports for beam reload
+-export([committer_loop/2, default_sizer/1, default_marshaller/1]).
+
+-export_type([config/0, q/0, ack_ref/0, sizer/0, marshaller/0]).
+
+-define(NOTHING_TO_ACK, nothing_to_ack).
+-define(PENDING_ACKS(Ref), {replayq_pending_acks, Ref}).
+
+-type segno() :: pos_integer().
+-type item() :: term().
+-type count() :: non_neg_integer().
+-type id() :: count().
+-type bytes() :: non_neg_integer().
+-type filename() :: file:filename_all().
+-type dir() :: filename().
+-type ack_ref() :: ?NOTHING_TO_ACK | {segno(), ID :: pos_integer()}.
+-type sizer() :: fun((item()) -> bytes()).
+-type marshaller() :: fun((item()) -> binary()).
+
+-type config() :: #{
+    dir => dir(),
+    seg_bytes => bytes(),
+    mem_only => boolean(),
+    max_total_bytes => bytes(),
+    offload => boolean(),
+    sizer => sizer(),
+    marshaller => marshaller()
+}.
+%% writer cursor
+-define(NO_FD, no_fd).
+-type w_cur() :: #{
+    segno := segno(),
+    bytes := bytes(),
+    count := count(),
+    fd := ?NO_FD | file:fd()
+}.
+
+-type stats() :: #{
+    bytes := bytes(),
+    count := count()
+}.
+
+-opaque q() :: #{
+    config := mem_only | config(),
+    stats := stats(),
+    in_mem := queue:queue(in_mem_item()),
+    w_cur => w_cur(),
+    committer => pid(),
+    head_segno => segno(),
+    sizer := sizer(),
+    marshaller => marshaller(),
+    max_total_bytes := bytes()
+}.
+
+-define(LAYOUT_VSN_0, 0).
+-define(LAYOUT_VSN_1, 1).
+-define(MAGIC, 841265288).
+-define(SUFFIX, "replaylog").
+-define(DEFAULT_POP_BYTES_LIMIT, 2000000).
+-define(DEFAULT_POP_COUNT_LIMIT, 1000).
+-define(DEFAULT_REPLAYQ_LIMIT, 2000000000).
+-define(COMMIT(SEGNO, ID, From), {commit, SEGNO, ID, From}).
+-define(NO_COMMIT_HIST, no_commit_hist).
+-define(FIRST_SEGNO, 1).
+-define(NEXT_SEGNO(N), (N + 1)).
+-define(STOP, stop).
+-define(MEM_ONLY_ITEM(Bytes, Item), {Bytes, Item}).
+-define(DISK_CP_ITEM(Id, Bytes, Item), {Id, Bytes, Item}).
+
+-type in_mem_item() ::
+    ?MEM_ONLY_ITEM(bytes(), item())
+    | ?DISK_CP_ITEM(id(), bytes(), item()).
+
+-spec open(config()) -> q().
+open(#{mem_only := true} = C) ->
+    #{
+        stats => #{bytes => 0, count => 0},
+        in_mem => queue:new(),
+        sizer => get_sizer(C),
+        config => mem_only,
+        max_total_bytes => maps:get(max_total_bytes, C, ?DEFAULT_REPLAYQ_LIMIT)
+    };
+open(#{dir := Dir, seg_bytes := _} = Config) ->
+    ok = filelib:ensure_dir(filename:join(Dir, "foo")),
+    Sizer = get_sizer(Config),
+    Marshaller = get_marshaller(Config),
+    IsOffload = is_offload_mode(Config),
+    Q =
+        case delete_consumed_and_list_rest(Dir) of
+            [] ->
+                %% no old segments, start over from zero
+                #{
+                    stats => #{bytes => 0, count => 0},
+                    w_cur => init_writer(Dir, empty, IsOffload),
+                    committer => spawn_committer(?FIRST_SEGNO, Dir),
+                    head_segno => ?FIRST_SEGNO,
+                    in_mem => queue:new()
+                };
+            Segs ->
+                LastSegno = lists:last(Segs),
+                CommitHist = get_commit_hist(Dir),
+                Reader = fun(Seg, Ch) -> read_items(Dir, Seg, Ch, Sizer, Marshaller) end,
+                HeadItems = Reader(hd(Segs), CommitHist),
+                #{
+                    stats => collect_stats(HeadItems, tl(Segs), Reader),
+                    w_cur => init_writer(Dir, LastSegno, IsOffload),
+                    committer => spawn_committer(hd(Segs), Dir),
+                    head_segno => hd(Segs),
+                    in_mem => queue:from_list(HeadItems)
+                }
+        end,
+    Q#{
+        sizer => Sizer,
+        marshaller => Marshaller,
+        config => maps:without([sizer, marshaller], Config),
+        max_total_bytes => maps:get(max_total_bytes, Config, ?DEFAULT_REPLAYQ_LIMIT)
+    }.
+
+-spec close(q() | w_cur()) -> ok | {error, any()}.
+close(#{config := mem_only}) ->
+    ok;
+close(#{w_cur := W_Cur, committer := Pid} = Q) ->
+    MRef = erlang:monitor(process, Pid),
+    Pid ! ?STOP,
+    unlink(Pid),
+    receive
+        {'DOWN', MRef, process, Pid, _Reason} ->
+            ok
+    end,
+    ok = maybe_dump_back_to_disk(Q),
+    do_close(W_Cur).
+
+do_close(#{fd := ?NO_FD}) -> ok;
+do_close(#{fd := Fd}) -> file:close(Fd).
+
+%% In offload mode, dump the unacked (and un-popped) items back to disk
+%% before close. This serves as a best-effort data loss protection.
+maybe_dump_back_to_disk(#{config := Config} = Q) ->
+    case is_offload_mode(Config) of
+        true -> dump_back_to_disk(Q);
+        false -> ok
+    end.
+
+dump_back_to_disk(#{
+    config := #{dir := Dir},
+    head_segno := ReaderSegno,
+    in_mem := InMem,
+    marshaller := Marshaller
+}) ->
+    IoData0 = get_unacked(process_info(self(), dictionary), ReaderSegno, Marshaller),
+    Items1 = queue:to_list(InMem),
+    IoData1 = lists:map(fun(?DISK_CP_ITEM(_, _, I)) -> make_iodata(I, Marshaller) end, Items1),
+    %% ensure old segment file is deleted
+    ok = ensure_deleted(filename(Dir, ReaderSegno)),
+    %% rewrite the segment with what's currently in memory
+    IoData = [IoData0, IoData1],
+    case iolist_size(IoData) > 0 of
+        true ->
+            #{fd := Fd} = open_segment(Dir, ReaderSegno),
+            ok = file:write(Fd, IoData),
+            ok = file:close(Fd);
+        false ->
+            %% nothing to write
+            ok
+    end.
+
+get_unacked({dictionary, Dict}, ReaderSegno, Marshaller) ->
+    F = fun
+        ({?PENDING_ACKS(AckRef), Items}) ->
+            erase(?PENDING_ACKS(AckRef)),
+            {Segno, Id} = AckRef,
+            Segno =:= ReaderSegno andalso
+                {true, {Id, Items}};
+        (_) ->
+            false
+    end,
+    Pendings0 = lists:filtermap(F, Dict),
+    Pendings = lists:keysort(1, Pendings0),
+    do_get_unacked(Pendings, Marshaller).
+
+do_get_unacked([], _Marshaller) ->
+    [];
+do_get_unacked([{_, Items} | Rest], Marshaller) ->
+    [
+        [make_iodata(I, Marshaller) || I <- Items]
+        | do_get_unacked(Rest, Marshaller)
+    ].
+
+-spec append(q(), [item()]) -> q().
+append(Q, []) ->
+    Q;
+append(
+    #{
+        config := mem_only,
+        in_mem := InMem,
+        stats := #{bytes := Bytes0, count := Count0},
+        sizer := Sizer
+    } = Q,
+    Items0
+) ->
+    {CountDiff, BytesDiff, Items} = transform(false, Items0, Sizer),
+    Stats = #{count => Count0 + CountDiff, bytes => Bytes0 + BytesDiff},
+    Q#{
+        stats := Stats,
+        in_mem := append_in_mem(Items, InMem)
+    };
+append(
+    #{
+        config := #{seg_bytes := BytesLimit, dir := Dir} = Config,
+        stats := #{bytes := Bytes0, count := Count0},
+        w_cur := #{count := CountInSeg, segno := WriterSegno} = W_Cur0,
+        head_segno := ReaderSegno,
+        sizer := Sizer,
+        marshaller := Marshaller,
+        in_mem := HeadItems0
+    } = Q,
+    Items0
+) ->
+    IoData = lists:map(fun(I) -> make_iodata(I, Marshaller) end, Items0),
+    {CountDiff, BytesDiff, Items} = transform(CountInSeg + 1, Items0, Sizer),
+    TotalBytes = Bytes0 + BytesDiff,
+    Stats = #{count => Count0 + CountDiff, bytes => TotalBytes},
+    IsOffload = is_offload_mode(Config),
+    W_Cur1 = do_append(W_Cur0, CountDiff, BytesDiff, IoData),
+    W_Cur =
+        case is_segment_full(W_Cur1, TotalBytes, BytesLimit, ReaderSegno, IsOffload) of
+            true ->
+                ok = do_close(W_Cur1),
+                %% get ready for the next append
+                open_segment(Dir, ?NEXT_SEGNO(WriterSegno));
+            false ->
+                W_Cur1
+        end,
+    HeadItems =
+        case ReaderSegno =:= WriterSegno of
+            true -> append_in_mem(Items, HeadItems0);
+            false -> HeadItems0
+        end,
+    Q#{
+        stats := Stats,
+        w_cur := W_Cur,
+        in_mem := HeadItems
+    }.
+
+%% @doc pop out at least one item from the queue.
+%% volume limited by `bytes_limit' and `count_limit'.
+-spec pop(q(), #{bytes_limit => bytes(), count_limit => count()}) ->
+    {q(), ack_ref(), [item()]}.
+pop(Q, Opts) ->
+    Bytes = maps:get(bytes_limit, Opts, ?DEFAULT_POP_BYTES_LIMIT),
+    Count = maps:get(count_limit, Opts, ?DEFAULT_POP_COUNT_LIMIT),
+    true = (Count > 0),
+    pop(Q, Bytes, Count, ?NOTHING_TO_ACK, []).
+
+%% @doc peek the queue front item.
+-spec peek(q()) -> empty | item().
+peek(#{in_mem := HeadItems}) ->
+    case queue:peek(HeadItems) of
+        empty -> empty;
+        {value, ?MEM_ONLY_ITEM(_, Item)} -> Item;
+        {value, ?DISK_CP_ITEM(_, _, Item)} -> Item
+    end.
+
+%% @doc Asynchronously write the consumed item's segment number + ID to a file.
+-spec ack(q(), ack_ref()) -> ok.
+ack(_, ?NOTHING_TO_ACK) ->
+    ok;
+ack(#{committer := Pid}, {Segno, Id} = AckRef) ->
+    _ = erlang:erase(?PENDING_ACKS(AckRef)),
+    Pid ! ?COMMIT(Segno, Id, false),
+    ok.
+
+%% @hidden Synced ack, for deterministic tests only
+-spec ack_sync(q(), ack_ref()) -> ok.
+ack_sync(_, ?NOTHING_TO_ACK) ->
+    ok;
+ack_sync(#{committer := Pid}, {Segno, Id} = AckRef) ->
+    _ = erlang:erase(?PENDING_ACKS(AckRef)),
+    Ref = make_ref(),
+    Pid ! ?COMMIT(Segno, Id, {self(), Ref}),
+    receive
+        {Ref, ok} -> ok
+    end.
+
+-spec count(q()) -> count().
+count(#{stats := #{count := Count}}) -> Count.
+
+-spec bytes(q()) -> bytes().
+bytes(#{stats := #{bytes := Bytes}}) -> Bytes.
+
+is_empty(#{config := mem_only, in_mem := All}) ->
+    queue:is_empty(All);
+is_empty(
+    #{
+        w_cur := #{segno := WriterSegno},
+        head_segno := ReaderSegno,
+        in_mem := HeadItems
+    } = Q
+) ->
+    Result = ((WriterSegno =:= ReaderSegno) andalso queue:is_empty(HeadItems)),
+    %% assert
+    Result = (count(Q) =:= 0).
+
+%% @doc Returns the number of bytes by which the queue size exceeds the
+%% total bytes limit. The result is negative when there is no overflow.
+-spec overflow(q()) -> integer().
+overflow(#{
+    max_total_bytes := MaxTotalBytes,
+    stats := #{bytes := Bytes}
+}) ->
+    Bytes - MaxTotalBytes.
+
+-spec is_mem_only(q()) -> boolean().
+is_mem_only(#{config := mem_only}) ->
+    true;
+is_mem_only(_) ->
+    false.
+
+%% internals =========================================================
+
+transform(Id, Items, Sizer) ->
+    transform(Id, Items, Sizer, 0, 0, []).
+
+transform(_Id, [], _Sizer, Count, Bytes, Acc) ->
+    {Count, Bytes, lists:reverse(Acc)};
+transform(Id, [Item0 | Rest], Sizer, Count, Bytes, Acc) ->
+    Size = Sizer(Item0),
+    {NextId, Item} =
+        case Id of
+            false -> {false, ?MEM_ONLY_ITEM(Size, Item0)};
+            N -> {N + 1, ?DISK_CP_ITEM(Id, Size, Item0)}
+        end,
+    transform(NextId, Rest, Sizer, Count + 1, Bytes + Size, [Item | Acc]).
+
+append_in_mem([], Q) -> Q;
+append_in_mem([Item | Rest], Q) -> append_in_mem(Rest, queue:in(Item, Q)).
+
+pop(Q, _Bytes, 0, AckRef, Acc) ->
+    Result = lists:reverse(Acc),
+    ok = maybe_save_pending_acks(AckRef, Q, Result),
+    {Q, AckRef, Result};
+pop(#{config := Cfg} = Q, Bytes, Count, AckRef, Acc) ->
+    case is_empty(Q) of
+        true ->
+            {Q, AckRef, lists:reverse(Acc)};
+        false when Cfg =:= mem_only ->
+            pop_mem(Q, Bytes, Count, Acc);
+        false ->
+            pop2(Q, Bytes, Count, AckRef, Acc)
+    end.
+
+pop_mem(
+    #{
+        in_mem := InMem,
+        stats := #{count := TotalCount, bytes := TotalBytes} = Stats
+    } = Q,
+    Bytes,
+    Count,
+    Acc
+) ->
+    case queue:out(InMem) of
+        {{value, ?MEM_ONLY_ITEM(Sz, _Item)}, _} when Sz > Bytes andalso Acc =/= [] ->
+            {Q, ?NOTHING_TO_ACK, lists:reverse(Acc)};
+        {{value, ?MEM_ONLY_ITEM(Sz, Item)}, Rest} ->
+            NewQ = Q#{
+                in_mem := Rest,
+                stats := Stats#{
+                    count := TotalCount - 1,
+                    bytes := TotalBytes - Sz
+                }
+            },
+            pop(NewQ, Bytes - Sz, Count - 1, ?NOTHING_TO_ACK, [Item | Acc])
+    end.
+
+pop2(
+    #{
+        head_segno := ReaderSegno,
+        in_mem := HeadItems,
+        stats := #{count := TotalCount, bytes := TotalBytes} = Stats,
+        w_cur := #{segno := WriterSegno}
+    } = Q,
+    Bytes,
+    Count,
+    AckRef,
+    Acc
+) ->
+    case queue:out(HeadItems) of
+        {{value, ?DISK_CP_ITEM(_, Sz, _Item)}, _} when Sz > Bytes andalso Acc =/= [] ->
+            %% taking the head item would cause exceeding size limit
+            {Q, AckRef, lists:reverse(Acc)};
+        {{value, ?DISK_CP_ITEM(Id, Sz, Item)}, Rest} ->
+            Q1 = Q#{
+                in_mem := Rest,
+                stats := Stats#{
+                    count := TotalCount - 1,
+                    bytes := TotalBytes - Sz
+                }
+            },
+            %% read the next segment in case current is drained
+            NewQ =
+                case queue:is_empty(Rest) andalso ReaderSegno < WriterSegno of
+                    true -> read_next_seg(Q1);
+                    false -> Q1
+                end,
+            NewAckRef = {ReaderSegno, Id},
+            pop(NewQ, Bytes - Sz, Count - 1, NewAckRef, [Item | Acc])
+    end.
+
+%% due to backward compatibility reasons for the ack api
+%% we cannot track pending acks in q() -- hence the use of the process dictionary
+maybe_save_pending_acks(?NOTHING_TO_ACK, _, _) ->
+    ok;
+maybe_save_pending_acks(AckRef, #{config := Config}, Items) ->
+    case is_offload_mode(Config) of
+        true ->
+            _ = erlang:put(?PENDING_ACKS(AckRef), Items),
+            ok;
+        false ->
+            ok
+    end.
+
+read_next_seg(
+    #{
+        config := #{dir := Dir} = Config,
+        head_segno := ReaderSegno,
+        w_cur := #{segno := WriterSegno, fd := Fd} = WCur0,
+        sizer := Sizer,
+        marshaller := Marshaller
+    } = Q
+) ->
+    NextSegno = ReaderSegno + 1,
+    %% reader has caught up to the latest segment
+    case NextSegno =:= WriterSegno of
+        true ->
+            %% force flush to disk so the next read can get all bytes
+            ok = file:sync(Fd);
+        false ->
+            ok
+    end,
+    IsOffload = is_offload_mode(Config),
+    WCur =
+        case IsOffload andalso NextSegno =:= WriterSegno of
+            true ->
+                %% reader has caught up to the latest segment in offload mode;
+                %% close the writer's fd. Continue in mem-only mode for the
+                %% head segment
+                ok = do_close(WCur0),
+                WCur0#{fd := ?NO_FD};
+            false ->
+                WCur0
+        end,
+    NextSegItems = read_items(Dir, NextSegno, ?NO_COMMIT_HIST, Sizer, Marshaller),
+    Q#{
+        head_segno := NextSegno,
+        in_mem := queue:from_list(NextSegItems),
+        w_cur := WCur
+    }.
+
+delete_consumed_and_list_rest(Dir0) ->
+    Dir = unicode:characters_to_list(Dir0),
+    Segnos0 = lists:sort([parse_segno(N) || N <- filelib:wildcard("*." ?SUFFIX, Dir)]),
+    {SegnosToDelete, Segnos} = find_segnos_to_delete(Dir, Segnos0),
+    ok = lists:foreach(fun(Segno) -> ensure_deleted(filename(Dir, Segno)) end, SegnosToDelete),
+    case Segnos of
+        [] ->
+            %% delete the commit file in case there are no segments left;
+            %% segment numbering will start over again.
+            ensure_deleted(commit_filename(Dir)),
+            [];
+        X ->
+            X
+    end.
+
+find_segnos_to_delete(Dir, Segnos) ->
+    CommitHist = get_commit_hist(Dir),
+    do_find_segnos_to_delete(Dir, Segnos, CommitHist).
+
+do_find_segnos_to_delete(_Dir, Segnos, ?NO_COMMIT_HIST) ->
+    {[], Segnos};
+do_find_segnos_to_delete(Dir, Segnos0, {CommittedSegno, CommittedId}) ->
+    {SegnosToDelete, Segnos} = lists:partition(fun(N) -> N < CommittedSegno end, Segnos0),
+    case
+        Segnos =/= [] andalso
+            hd(Segnos) =:= CommittedSegno andalso
+            is_all_consumed(Dir, CommittedSegno, CommittedId)
+    of
+        true ->
+            %% assert
+            CommittedSegno = hd(Segnos),
+            %% all items in the oldest segment have been consumed,
+            %% no need to keep this segment
+            {[CommittedSegno | SegnosToDelete], tl(Segnos)};
+        _ ->
+            {SegnosToDelete, Segnos}
+    end.
+
+%% ALL items are consumed if the committed item ID is no less than the number
+%% of items in this segment
+is_all_consumed(Dir, CommittedSegno, CommittedId) ->
+    CommittedId >= erlang:length(do_read_items(Dir, CommittedSegno)).
+
+ensure_deleted(Filename) ->
+    case file:delete(Filename) of
+        ok -> ok;
+        {error, enoent} -> ok
+    end.
+
+%% The committer writes the consumer's acked segment number + item ID
+%% to a file. The file is only read at start/restart.
+spawn_committer(ReaderSegno, Dir) ->
+    Name = iolist_to_binary(filename:join([Dir, committer])),
+    %% register a name to avoid having two committers spawned for the same dir
+    RegName = binary_to_atom(Name, utf8),
+    Pid = erlang:spawn_link(fun() -> committer_loop(ReaderSegno, Dir) end),
+    true = erlang:register(RegName, Pid),
+    Pid.
+
+committer_loop(ReaderSegno, Dir) ->
+    receive
+        ?COMMIT(Segno0, Id0, false) ->
+            {Segno, Id} = collect_async_commits(Segno0, Id0),
+            ok = handle_commit(ReaderSegno, Dir, Segno, Id, false),
+            ?MODULE:committer_loop(Segno, Dir);
+        ?COMMIT(Segno, Id, From) ->
+            ok = handle_commit(ReaderSegno, Dir, Segno, Id, From),
+            ?MODULE:committer_loop(Segno, Dir);
+        ?STOP ->
+            ok;
+        Msg ->
+            exit({replayq_committer_unknown_msg, Msg})
+    after 200 ->
+        ?MODULE:committer_loop(ReaderSegno, Dir)
+    end.
+
+handle_commit(ReaderSegno, Dir, Segno, Id, From) ->
+    IoData = io_lib:format("~p.\n", [#{segno => Segno, id => Id}]),
+    ok = do_commit(Dir, IoData),
+    case Segno > ReaderSegno of
+        true ->
+            SegnosToDelete = lists:seq(ReaderSegno, Segno - 1),
+            lists:foreach(fun(N) -> ok = ensure_deleted(filename(Dir, N)) end, SegnosToDelete);
+        false ->
+            ok
+    end,
+    ok = reply_ack_ok(From).
+
+%% Collect async acks which are already sent in the mailbox,
+%% and keep only the last one for the current segment.
+collect_async_commits(Segno, Id) ->
+    receive
+        ?COMMIT(Segno, AnotherId, false) ->
+            collect_async_commits(Segno, AnotherId)
+    after 0 ->
+        {Segno, Id}
+    end.
+
+reply_ack_ok({Pid, Ref}) ->
+    Pid ! {Ref, ok},
+    ok;
+reply_ack_ok(_) ->
+    ok.
+
+get_commit_hist(Dir) ->
+    CommitFile = commit_filename(Dir),
+    case filelib:is_regular(CommitFile) of
+        true ->
+            {ok, [#{segno := Segno, id := Id}]} = file:consult(CommitFile),
+            {Segno, Id};
+        false ->
+            ?NO_COMMIT_HIST
+    end.
+
+do_commit(Dir, IoData) ->
+    TmpName = commit_filename(Dir, "COMMIT.tmp"),
+    Name = commit_filename(Dir),
+    ok = file:write_file(TmpName, IoData),
+    ok = file:rename(TmpName, Name).
+
+commit_filename(Dir) ->
+    commit_filename(Dir, "COMMIT").
+
+commit_filename(Dir, Name) ->
+    filename:join([Dir, Name]).
+
+do_append(
+    #{fd := ?NO_FD, bytes := Bytes0, count := Count0} = Cur,
+    Count,
+    Bytes,
+    _IoData
+) ->
+    %% offload mode, fd is not initialized yet
+    Cur#{
+        bytes => Bytes0 + Bytes,
+        count => Count0 + Count
+    };
+do_append(
+    #{fd := Fd, bytes := Bytes0, count := Count0} = Cur,
+    Count,
+    Bytes,
+    IoData
+) ->
+    ok = file:write(Fd, IoData),
+    Cur#{
+        bytes => Bytes0 + Bytes,
+        count => Count0 + Count
+    }.
+
+read_items(Dir, Segno, CommitHist, Sizer, Marshaller) ->
+    Items0 = do_read_items(Dir, Segno),
+    Items =
+        case CommitHist of
+            ?NO_COMMIT_HIST ->
+                %% no commit hist, return all
+                Items0;
+            {CommitedSegno, _} when CommitedSegno < Segno ->
+                %% committed at an older segment
+                Items0;
+            {Segno, CommittedId} ->
+                %% committed at the current segment, keep only the tail
+                {_, R} = lists:splitwith(fun({I, _}) -> I =< CommittedId end, Items0),
+                R
+        end,
+    lists:map(
+        fun({Id, Bin}) ->
+            Item = Marshaller(Bin),
+            Size = Sizer(Item),
+            ?DISK_CP_ITEM(Id, Size, Item)
+        end,
+        Items
+    ).
+
+do_read_items(Dir, Segno) ->
+    Filename = filename(Dir, Segno),
+    {ok, Bin} = file:read_file(Filename),
+    case parse_items(Bin, 1, []) of
+        {Items, <<>>} ->
+            Items;
+        {Items, Corrupted} ->
+            error_logger:error_msg(
+                "corrupted replayq log: ~s, skipped ~p bytes",
+                [Filename, size(Corrupted)]
+            ),
+            Items
+    end.
+
+parse_items(<<>>, _Id, Acc) ->
+    {lists:reverse(Acc), <<>>};
+parse_items(
+    <<?LAYOUT_VSN_1:8/unsigned-integer, ?MAGIC:32/unsigned-integer, CRC:32/unsigned-integer,
+        Size:32/unsigned-integer, Item:Size/binary, Rest/binary>> = All,
+    Id,
+    Acc
+) ->
+    case CRC =:= erlang:crc32(Item) of
+        true -> parse_items(Rest, Id + 1, [{Id, Item} | Acc]);
+        false -> {lists:reverse(Acc), All}
+    end;
+parse_items(
+    <<?LAYOUT_VSN_0:8/unsigned-integer, CRC:32/unsigned-integer, Size:32/unsigned-integer,
+        Item:Size/binary, Rest/binary>> = All,
+    Id,
+    Acc
+) ->
+    case CRC =:= erlang:crc32(Item) andalso Item =/= <<>> of
+        true -> parse_items(Rest, Id + 1, [{Id, Item} | Acc]);
+        false -> {lists:reverse(Acc), All}
+    end;
+parse_items(Corrupted, _Id, Acc) ->
+    {lists:reverse(Acc), Corrupted}.
+
+make_iodata(Item0, Marshaller) ->
+    Item = Marshaller(Item0),
+    Size = size(Item),
+    CRC = erlang:crc32(Item),
+    [
+        <<?LAYOUT_VSN_1:8/unsigned-integer, ?MAGIC:32/unsigned-integer, CRC:32/unsigned-integer,
+            Size:32/unsigned-integer>>,
+        Item
+    ].
+
+collect_stats(HeadItems, SegsOnDisk, Reader) ->
+    ItemF = fun(?DISK_CP_ITEM(_Id, Sz, _Item), {B, C}) ->
+        {B + Sz, C + 1}
+    end,
+    Acc0 = lists:foldl(ItemF, {0, 0}, HeadItems),
+    {Bytes, Count} =
+        lists:foldl(
+            fun(Segno, Acc) ->
+                Items = Reader(Segno, ?NO_COMMIT_HIST),
+                lists:foldl(ItemF, Acc, Items)
+            end,
+            Acc0,
+            SegsOnDisk
+        ),
+    #{bytes => Bytes, count => Count}.
+
+parse_segno(Filename) ->
+    [Segno, ?SUFFIX] = string:tokens(Filename, "."),
+    list_to_integer(Segno).
+
+filename(Dir, Segno) ->
+    Name = lists:flatten(io_lib:format("~10.10.0w." ?SUFFIX, [Segno])),
+    filename:join(Dir, Name).
+
+%% open the current segment for write if it is empty,
+%% otherwise roll over to the next segment
+-spec init_writer(dir(), empty | segno(), boolean()) -> w_cur().
+init_writer(_Dir, empty, true) ->
+    %% clean start for offload mode
+    #{fd => ?NO_FD, segno => ?FIRST_SEGNO, bytes => 0, count => 0};
+init_writer(Dir, empty, false) ->
+    open_segment(Dir, ?FIRST_SEGNO);
+init_writer(Dir, Segno, _IsOffload) when is_number(Segno) ->
+    Filename = filename(Dir, Segno),
+    case filelib:file_size(Filename) of
+        0 -> open_segment(Dir, Segno);
+        _ -> open_segment(Dir, ?NEXT_SEGNO(Segno))
+    end.
+
+-spec open_segment(dir(), segno()) -> w_cur().
+open_segment(Dir, Segno) ->
+    Filename = filename(Dir, Segno),
+    %% raw so there is no need to go through the single gen_server file_server
+    {ok, Fd} = file:open(Filename, [raw, read, write, binary, delayed_write]),
+    #{fd => Fd, segno => Segno, bytes => 0, count => 0}.
+
+get_sizer(C) ->
+    maps:get(sizer, C, fun ?MODULE:default_sizer/1).
+
+get_marshaller(C) ->
+    maps:get(marshaller, C, fun ?MODULE:default_marshaller/1).
+
+is_offload_mode(Config) when is_map(Config) ->
+    maps:get(offload, Config, false).
+
+default_sizer(I) when is_binary(I) -> erlang:size(I).
+
+default_marshaller(I) when is_binary(I) -> I.
+
+is_segment_full(
+    #{segno := WriterSegno, bytes := SegmentBytes},
+    TotalBytes,
+    SegmentBytesLimit,
+    ReaderSegno,
+    true
+) ->
+    %% In offload mode: when the reader is lagging behind, the writer
+    %% rolls to a new segment once the segment file reaches the size
+    %% limit; when the reader is reading off the same segment the writer
+    %% is appending to (i.e. the in-memory queue), we only start writing
+    %% to a segment file once the total bytes in memory reach the limit.
+    %%
+    %% NOTE: we never shrink segment bytes, even when popping out
+    %% from the in-memory queue.
+    case ReaderSegno < WriterSegno of
+        true -> SegmentBytes >= SegmentBytesLimit;
+        false -> TotalBytes >= SegmentBytesLimit
+    end;
+is_segment_full(
+    #{bytes := SegmentBytes},
+    _TotalBytes,
+    SegmentBytesLimit,
+    _ReaderSegno,
+    false
+) ->
+    %% here we check if the segment size is greater than the segment size
+    %% limit after the append, based on the assumption that items are
+    %% usually very small compared to the segment size.
+    %% We could change the implementation to split the items list to avoid
+    %% segment overflow if that ever becomes necessary.
+    SegmentBytes >= SegmentBytesLimit.
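
Taken together, the vendored replayq module exposes a small
open/append/pop/ack surface that emqx_resource_worker builds its disk cache
on. A minimal end-to-end sketch (the directory is hypothetical; with the
default sizer/marshaller the items must be binaries):

    Q0 = replayq:open(#{dir => "/tmp/replayq_demo", seg_bytes => 10 * 1024 * 1024}),
    Q1 = replayq:append(Q0, [<<"item-1">>, <<"item-2">>]),
    {Q2, AckRef, Items} = replayq:pop(Q1, #{count_limit => 10}),
    ok = replayq:ack_sync(Q2, AckRef),
    ok = replayq:close(Q2).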