diff --git a/Makefile b/Makefile index 3dce566d0..bedeb7250 100644 --- a/Makefile +++ b/Makefile @@ -36,7 +36,8 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \ emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ - emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge emqx_hooks + emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ + emqx_hooks emqx_batch CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) diff --git a/src/emqx_batch.erl b/src/emqx_batch.erl new file mode 100644 index 000000000..ffa9a7224 --- /dev/null +++ b/src/emqx_batch.erl @@ -0,0 +1,74 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_batch). + +-export([init/1, push/2, commit/1]). +-export([size/1, items/1]). + +-type(options() :: #{ + batch_size => non_neg_integer(), + linger_ms => pos_integer(), + commit_fun := function() + }). +-export_type([options/0]). + +-record(batch, { + batch_size :: non_neg_integer(), + batch_q :: list(any()), + linger_ms :: pos_integer(), + linger_timer :: reference() | undefined, + commit_fun :: function() + }). +-type(batch() :: #batch{}). +-export_type([batch/0]). + +-spec(init(options()) -> batch()). +init(Opts) when is_map(Opts) -> + #batch{batch_size = maps:get(batch_size, Opts, 1000), + batch_q = [], + linger_ms = maps:get(linger_ms, Opts, 1000), + commit_fun = maps:get(commit_fun, Opts)}. + +-spec(push(any(), batch()) -> batch()). +push(El, Batch = #batch{batch_q = Q, linger_ms = Ms, linger_timer = undefined}) when length(Q) == 0 -> + Batch#batch{batch_q = [El], linger_timer = erlang:send_after(Ms, self(), batch_linger_expired)}; + +%% no limit. +push(El, Batch = #batch{batch_size = 0, batch_q = Q}) -> + Batch#batch{batch_q = [El|Q]}; + +push(El, Batch = #batch{batch_size = MaxSize, batch_q = Q}) when length(Q) >= MaxSize -> + commit(Batch#batch{batch_q = [El|Q]}); + +push(El, Batch = #batch{batch_q = Q}) -> + Batch#batch{batch_q = [El|Q]}. + +-spec(commit(batch()) -> batch()). +commit(Batch = #batch{batch_q = Q, commit_fun = Commit}) -> + _ = Commit(lists:reverse(Q)), + reset(Batch). + +reset(Batch = #batch{linger_timer = TRef}) -> + _ = emqx_misc:cancel_timer(TRef), + Batch#batch{batch_q = [], linger_timer = undefined}. + +-spec(size(batch()) -> non_neg_integer()). +size(#batch{batch_q = Q}) -> + length(Q). + +-spec(items(batch()) -> list(any())). +items(#batch{batch_q = Q}) -> + lists:reverse(Q). + diff --git a/test/emqx_batch_SUITE.erl b/test/emqx_batch_SUITE.erl new file mode 100644 index 000000000..c4c69080b --- /dev/null +++ b/test/emqx_batch_SUITE.erl @@ -0,0 +1,50 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_batch_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [batch_full_commit, batch_linger_commit]. + +batch_full_commit(_) -> + B0 = emqx_batch:init(#{batch_size => 3, linger_ms => 2000, commit_fun => fun(_) -> ok end}), + B3 = lists:foldl(fun(E, B) -> emqx_batch:push(E, B) end, B0, [a, b, c]), + ?assertEqual(3, emqx_batch:size(B3)), + ?assertEqual([a, b, c], emqx_batch:items(B3)), + %% Trigger commit fun. + B4 = emqx_batch:push(a, B3), + ?assertEqual(0, emqx_batch:size(B4)), + ?assertEqual([], emqx_batch:items(B4)). + +batch_linger_commit(_) -> + CommitFun = fun(Q) -> ?assertEqual(3, length(Q)) end, + B0 = emqx_batch:init(#{batch_size => 3, linger_ms => 500, commit_fun => CommitFun}), + B3 = lists:foldl(fun(E, B) -> emqx_batch:push(E, B) end, B0, [a, b, c]), + ?assertEqual(3, emqx_batch:size(B3)), + ?assertEqual([a, b, c], emqx_batch:items(B3)), + receive + batch_linger_expired -> + B4 = emqx_batch:commit(B3), + ?assertEqual(0, emqx_batch:size(B4)), + ?assertEqual([], emqx_batch:items(B4)) + after + 1000 -> + error(linger_timer_not_triggered) + end. +