Add an elegant batch module
This commit is contained in:
parent
309f3560f3
commit
21ed012a0c
3
Makefile
3
Makefile
|
@ -36,7 +36,8 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
|
|||
emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \
|
||||
emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
|
||||
emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
|
||||
emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge emqx_hooks
|
||||
emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
|
||||
emqx_hooks emqx_batch
|
||||
|
||||
CT_NODE_NAME = emqxct@127.0.0.1
|
||||
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
|
||||
-module(emqx_batch).
|
||||
|
||||
-export([init/1, push/2, commit/1]).
|
||||
-export([size/1, items/1]).
|
||||
|
||||
-type(options() :: #{
|
||||
batch_size => non_neg_integer(),
|
||||
linger_ms => pos_integer(),
|
||||
commit_fun := function()
|
||||
}).
|
||||
-export_type([options/0]).
|
||||
|
||||
-record(batch, {
|
||||
batch_size :: non_neg_integer(),
|
||||
batch_q :: list(any()),
|
||||
linger_ms :: pos_integer(),
|
||||
linger_timer :: reference() | undefined,
|
||||
commit_fun :: function()
|
||||
}).
|
||||
-type(batch() :: #batch{}).
|
||||
-export_type([batch/0]).
|
||||
|
||||
-spec(init(options()) -> batch()).
|
||||
init(Opts) when is_map(Opts) ->
|
||||
#batch{batch_size = maps:get(batch_size, Opts, 1000),
|
||||
batch_q = [],
|
||||
linger_ms = maps:get(linger_ms, Opts, 1000),
|
||||
commit_fun = maps:get(commit_fun, Opts)}.
|
||||
|
||||
-spec(push(any(), batch()) -> batch()).
|
||||
push(El, Batch = #batch{batch_q = Q, linger_ms = Ms, linger_timer = undefined}) when length(Q) == 0 ->
|
||||
Batch#batch{batch_q = [El], linger_timer = erlang:send_after(Ms, self(), batch_linger_expired)};
|
||||
|
||||
%% no limit.
|
||||
push(El, Batch = #batch{batch_size = 0, batch_q = Q}) ->
|
||||
Batch#batch{batch_q = [El|Q]};
|
||||
|
||||
push(El, Batch = #batch{batch_size = MaxSize, batch_q = Q}) when length(Q) >= MaxSize ->
|
||||
commit(Batch#batch{batch_q = [El|Q]});
|
||||
|
||||
push(El, Batch = #batch{batch_q = Q}) ->
|
||||
Batch#batch{batch_q = [El|Q]}.
|
||||
|
||||
-spec(commit(batch()) -> batch()).
|
||||
commit(Batch = #batch{batch_q = Q, commit_fun = Commit}) ->
|
||||
_ = Commit(lists:reverse(Q)),
|
||||
reset(Batch).
|
||||
|
||||
reset(Batch = #batch{linger_timer = TRef}) ->
|
||||
_ = emqx_misc:cancel_timer(TRef),
|
||||
Batch#batch{batch_q = [], linger_timer = undefined}.
|
||||
|
||||
-spec(size(batch()) -> non_neg_integer()).
|
||||
size(#batch{batch_q = Q}) ->
|
||||
length(Q).
|
||||
|
||||
-spec(items(batch()) -> list(any())).
|
||||
items(#batch{batch_q = Q}) ->
|
||||
lists:reverse(Q).
|
||||
|
|
@ -0,0 +1,50 @@
|
|||
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
|
||||
-module(emqx_batch_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() ->
|
||||
[batch_full_commit, batch_linger_commit].
|
||||
|
||||
batch_full_commit(_) ->
|
||||
B0 = emqx_batch:init(#{batch_size => 3, linger_ms => 2000, commit_fun => fun(_) -> ok end}),
|
||||
B3 = lists:foldl(fun(E, B) -> emqx_batch:push(E, B) end, B0, [a, b, c]),
|
||||
?assertEqual(3, emqx_batch:size(B3)),
|
||||
?assertEqual([a, b, c], emqx_batch:items(B3)),
|
||||
%% Trigger commit fun.
|
||||
B4 = emqx_batch:push(a, B3),
|
||||
?assertEqual(0, emqx_batch:size(B4)),
|
||||
?assertEqual([], emqx_batch:items(B4)).
|
||||
|
||||
batch_linger_commit(_) ->
|
||||
CommitFun = fun(Q) -> ?assertEqual(3, length(Q)) end,
|
||||
B0 = emqx_batch:init(#{batch_size => 3, linger_ms => 500, commit_fun => CommitFun}),
|
||||
B3 = lists:foldl(fun(E, B) -> emqx_batch:push(E, B) end, B0, [a, b, c]),
|
||||
?assertEqual(3, emqx_batch:size(B3)),
|
||||
?assertEqual([a, b, c], emqx_batch:items(B3)),
|
||||
receive
|
||||
batch_linger_expired ->
|
||||
B4 = emqx_batch:commit(B3),
|
||||
?assertEqual(0, emqx_batch:size(B4)),
|
||||
?assertEqual([], emqx_batch:items(B4))
|
||||
after
|
||||
1000 ->
|
||||
error(linger_timer_not_triggered)
|
||||
end.
|
||||
|
Loading…
Reference in New Issue