diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index f827f65de..a5acfbb8a 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -60,6 +60,7 @@ safe_filename/1, diff_lists/3, merge_lists/3, + flattermap/2, tcp_keepalive_opts/4, format/1, format_mfal/1, @@ -999,6 +1000,36 @@ search(ExpectValue, KeyFunc, [Item | List]) -> false -> search(ExpectValue, KeyFunc, List) end. +%% @doc Maps over a term or a list of terms and flattens the result, giving back +%% again a term or a flat list of terms. It's similar to `lists:flatmap/2`, but +%% it works on a single term as well, both as input and `Fun` output (thus, the +%% wordplay on "flatter"). +%% The purpose of this function is to adapt to `Fun`s that return either a `[]` +%% or a term, and to avoid costs of list construction and flattening when dealing +%% with large lists. +-spec flattermap(Fun, FlatList) -> FlatList when + Fun :: fun((X) -> FlatList), + FlatList :: [X] | X. +flattermap(_Fun, []) -> + []; +flattermap(Fun, [X | Xs]) -> + flatcomb(Fun(X), flattermap(Fun, Xs)); +flattermap(Fun, X) -> + Fun(X). + +flatcomb([], Z) -> + Z; +flatcomb(Y, []) -> + Y; +flatcomb(Ys = [_ | _], Zs = [_ | _]) -> + Ys ++ Zs; +flatcomb(Ys = [_ | _], Z) -> + Ys ++ [Z]; +flatcomb(Y, Zs = [_ | _]) -> + [Y | Zs]; +flatcomb(Y, Z) -> + [Y, Z]. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/apps/emqx_utils/test/emqx_utils_SUITE.erl b/apps/emqx_utils/test/emqx_utils_SUITE.erl index 12e99c917..154e065b1 100644 --- a/apps/emqx_utils/test/emqx_utils_SUITE.erl +++ b/apps/emqx_utils/test/emqx_utils_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(SOCKOPTS, [ @@ -87,13 +88,13 @@ t_pipeline(_) -> t_start_timer(_) -> TRef = emqx_utils:start_timer(1, tmsg), timer:sleep(2), - ?assertEqual([{timeout, TRef, tmsg}], drain()), + ?assertEqual([{timeout, TRef, tmsg}], ?drainMailbox()), ok = emqx_utils:cancel_timer(TRef). t_cancel_timer(_) -> Timer = emqx_utils:start_timer(0, foo), ok = emqx_utils:cancel_timer(Timer), - ?assertEqual([], drain()), + ?assertEqual([], ?drainMailbox()), ok = emqx_utils:cancel_timer(undefined). t_proc_name(_) -> @@ -153,16 +154,6 @@ t_check(_) -> emqx_utils:check_oom(Policy) ). -drain() -> - drain([]). - -drain(Acc) -> - receive - Msg -> drain([Msg | Acc]) - after 0 -> - lists:reverse(Acc) - end. - t_rand_seed(_) -> ?assert(is_tuple(emqx_utils:rand_seed())). @@ -240,3 +231,46 @@ t_pmap_late_reply(_) -> [] ), ok. + +t_flattermap(_) -> + ?assertEqual( + [42, 42], + emqx_utils:flattermap(fun duplicate/1, 42) + ), + ?assertEqual( + [1, 1, 2, 2, 3, 3], + emqx_utils:flattermap(fun duplicate/1, [1, 2, 3]) + ), + ?assertEqual( + [], + emqx_utils:flattermap(fun nil/1, 42) + ), + ?assertEqual( + [], + emqx_utils:flattermap(fun nil/1, [1, 2, 3]) + ), + ?assertEqual( + 42, + emqx_utils:flattermap(fun identity/1, [42]) + ), + ?assertEqual( + [1, 3, 5], + emqx_utils:flattermap( + fun(X) -> + case X rem 2 of + 0 -> []; + 1 -> X + end + end, + [1, 2, 3, 4, 5] + ) + ). + +duplicate(X) -> + [X, X]. + +nil(_) -> + []. + +identity(X) -> + X.