%% File viewer metadata: 243 lines, 6.6 KiB, Erlang.
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_utils_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
%% Baseline socket option list passed as the first argument to
%% emqx_utils:merge_opts/2 in t_merge_opts/1.
-define(SOCKOPTS, [binary, {packet, raw}, {reuseaddr, true}, {backlog, 512}, {nodelay, true}]).
|
|
|
|
%% Delegates to emqx_common_test_helpers:all/1 for this module.
%% NOTE(review): presumably this enumerates the exported t_* test cases for
%% Common Test — confirm against the helper's implementation.
all() -> emqx_common_test_helpers:all(?MODULE).
|
|
|
|
%% Merges a list of overrides into ?SOCKOPTS and checks both individual
%% values and the complete (sorted) merged option list.
t_merge_opts(_) ->
    Overrides = [
        raw,
        binary,
        {backlog, 1024},
        {nodelay, false},
        {max_clients, 1024},
        {acceptors, 16}
    ],
    Merged = emqx_utils:merge_opts(?SOCKOPTS, Overrides),
    %% An overridden key takes the new value; a brand-new key is added.
    ?assertEqual(1024, proplists:get_value(backlog, Merged)),
    ?assertEqual(1024, proplists:get_value(max_clients, Merged)),
    %% Sorting makes the comparison independent of the merge order.
    ExpectedSorted = [
        binary,
        raw,
        {acceptors, 16},
        {backlog, 1024},
        {max_clients, 1024},
        {nodelay, false},
        {packet, raw},
        {reuseaddr, true}
    ],
    ?assertEqual(ExpectedSorted, lists:sort(Merged)).
|
|
|
|
%% maybe_apply/2 is expected to yield `undefined` for an `undefined` argument
%% and the function result for any other argument.
t_maybe_apply(_) ->
    Identity = fun(X) -> X end,
    ?assertEqual(undefined, emqx_utils:maybe_apply(Identity, undefined)),
    ?assertEqual(a, emqx_utils:maybe_apply(Identity, a)).
|
|
|
|
%% run_fold/3 with no functions returns the initial accumulator; with
%% [add, multiply], accumulator 1 and state 2 the expected result is
%% (1 + 2) * 2 = 6.
t_run_fold(_) ->
    ?assertEqual(1, emqx_utils:run_fold([], 1, state)),
    Steps = [
        fun(Acc, State) -> Acc + State end,
        fun(Acc, State) -> Acc * State end
    ],
    ?assertEqual(6, emqx_utils:run_fold(Steps, 1, 2)).
|
|
|
|
%% Exercises pipeline/3 with every step return shape used in this suite:
%% `ok`, `{ok, State}`, `{ok, Input, State}`, `{error, Reason}` and
%% `{error, Reason, State}`.
t_pipeline(_) ->
    %% An empty pipeline hands input and state back untouched.
    ?assertEqual({ok, input, state}, emqx_utils:pipeline([], input, state)),
    Noop = fun(_In, _State) -> ok end,
    BumpState = fun(_In, State) -> {ok, State + 1} end,
    BumpBoth = fun(In, State) -> {ok, In + 1, State + 1} end,
    DoubleBoth = fun(In, State) -> {ok, In * 2, State * 2} end,
    %% (1, 1) -> (1, 1) -> (1, 2) -> (2, 3) -> (4, 6)
    ?assertEqual(
        {ok, 4, 6},
        emqx_utils:pipeline([Noop, BumpState, BumpBoth, DoubleBoth], 1, 1)
    ),
    %% A one-argument step failing without a state keeps the current state.
    FailKeepState = fun(_In) -> {error, undefined} end,
    ?assertEqual({error, undefined, 1}, emqx_utils:pipeline([FailKeepState], 1, 1)),
    %% A failing step may also return a new state.
    FailNewState = fun(_In, _State) -> {error, undefined, 2} end,
    ?assertEqual({error, undefined, 2}, emqx_utils:pipeline([FailNewState], 1, 1)).
|
|
|
|
%% Starts a 1 ms timer and checks that exactly one `{timeout, TRef, tmsg}`
%% message is delivered to the calling process.
%%
%% The timer message is awaited with a bounded `receive` instead of a fixed
%% 2 ms sleep: under scheduler load the message may arrive later than that,
%% which made the sleep-based version flaky.
t_start_timer(_) ->
    TRef = emqx_utils:start_timer(1, tmsg),
    receive
        {timeout, TRef, tmsg} -> ok
    after 1000 ->
        ct:fail(timer_message_not_received)
    end,
    %% Nothing else may have been delivered alongside the timer message.
    ?assertEqual([], drain()),
    %% Cancelling an already-fired timer must still return ok.
    ok = emqx_utils:cancel_timer(TRef).
|
|
|
|
%% After cancelling a 0 ms timer no timeout message may remain in the
%% mailbox; cancelling `undefined` must also return ok.
t_cancel_timer(_) ->
    TRef = emqx_utils:start_timer(0, foo),
    ok = emqx_utils:cancel_timer(TRef),
    Leftover = drain(),
    ?assertEqual([], Leftover),
    ok = emqx_utils:cancel_timer(undefined).
|
|
|
|
%% proc_name/2 is expected to join the base name and the index with an
%% underscore, returning an atom.
t_proc_name(_) ->
    Name = emqx_utils:proc_name(emqx_pool, 1),
    ?assertEqual(emqx_pool_1, Name).
|
|
|
|
%% Checks proc_stats/0,1 for a dead process, for the calling process itself,
%% and for a live process with one queued message.
%%
%% Synchronisation uses monitors and messages rather than fixed sleeps, and
%% the child's own stats are sent back to the test process so the assertion
%% on them runs here, where a failure fails the test case directly.
t_proc_stats(_) ->
    %% A dead process has no stats. Wait for 'DOWN' so the check cannot race
    %% with process termination.
    {Pid1, MRef} = spawn_monitor(fun() -> exit(normal) end),
    receive
        {'DOWN', MRef, process, Pid1, _Reason} -> ok
    after 1000 ->
        ct:fail(process_did_not_exit)
    end,
    ?assertEqual([], emqx_utils:proc_stats(Pid1)),
    Parent = self(),
    %% Linked so the child cannot outlive a failing test case. It blocks on
    %% `stop`, so the unrelated `msg` sent below stays in its mailbox.
    Pid2 = spawn_link(fun() ->
        Parent ! {self(), emqx_utils:proc_stats()},
        receive
            stop -> ok
        end
    end),
    receive
        {Pid2, SelfStats} ->
            ?assertMatch([{mailbox_len, 0} | _], SelfStats)
    after 1000 ->
        ct:fail(no_stats_from_child)
    end,
    Pid2 ! msg,
    ?assertMatch([{mailbox_len, 1} | _], emqx_utils:proc_stats(Pid2)),
    Pid2 ! stop,
    ok.
|
|
|
|
%% Queues two deliver messages to self and expects drain_deliver/1 to return
%% them in arrival order.
t_drain_deliver(_) ->
    Delivers = [{deliver, t1, m1}, {deliver, t2, m2}],
    lists:foreach(fun(Deliver) -> self() ! Deliver end, Delivers),
    ?assertEqual(Delivers, emqx_utils:drain_deliver(2)).
|
|
|
|
%% Spawns two monitored short-lived processes and expects drain_down/1 to
%% return both pids, then nothing once the mailbox is empty.
t_drain_down(_) ->
    {Pid1, _Ref1} = erlang:spawn_monitor(fun() -> ok end),
    {Pid2, _Ref2} = erlang:spawn_monitor(fun() -> ok end),
    %% Give both 'DOWN' messages time to arrive; they cannot be awaited with
    %% a receive here without consuming what drain_down/1 is meant to drain.
    timer:sleep(100),
    %% Sort both sides: the term order of two pids is not guaranteed to
    %% follow their spawn order, so an unsorted expected list could
    %% spuriously mismatch the sorted result.
    ?assertEqual(lists:sort([Pid1, Pid2]), lists:sort(emqx_utils:drain_down(2))),
    ?assertEqual([], emqx_utils:drain_down(1)).
|
|
|
|
%% index_of/2 must raise error:badarg when the element is absent and return
%% the 1-based position otherwise.
%%
%% ?assertError replaces the hand-written try/of/catch: it fails the test
%% both when no exception is raised and when a different one is raised.
t_index_of(_) ->
    ?assertError(badarg, emqx_utils:index_of(a, [])),
    ?assertEqual(3, emqx_utils:index_of(a, [b, c, a, e, f])).
|
|
|
|
%% check_oom/1 returns ok while the caller's mailbox is within the policy
%% limit and a shutdown tuple once it holds more messages than
%% max_mailbox_size.
t_check(_) ->
    Policy = #{
        enable => true,
        max_heap_size => 1024 * 1024 * 8,
        max_mailbox_size => 10
    },
    %% Queues N {msg, I} messages in this process's own mailbox.
    Enqueue = fun(N) ->
        lists:foreach(fun(I) -> self() ! {msg, I} end, lists:seq(1, N))
    end,
    %% 5 queued messages: below the limit of 10.
    Enqueue(5),
    ?assertEqual(ok, emqx_utils:check_oom(Policy)),
    %% 5 + 6 = 11 queued messages: above the limit.
    Enqueue(6),
    Expected = {shutdown, #{reason => message_queue_too_long, value => 11, max => 10}},
    ?assertEqual(Expected, emqx_utils:check_oom(Policy)).
|
|
|
|
%% Removes every message currently in the caller's mailbox and returns them
%% in arrival order. Never blocks: stops as soon as the mailbox is empty.
drain() ->
    receive
        Msg -> [Msg | drain()]
    after 0 ->
        []
    end.

%% Same as drain/0, but the result is prefixed with the already collected
%% messages in Acc, which is held in reverse (newest first) order.
drain(Acc) ->
    lists:reverse(Acc, drain()).
|
|
|
|
%% rand_seed/0 is only checked to return a tuple.
t_rand_seed(_) ->
    Seed = emqx_utils:rand_seed(),
    ?assert(is_tuple(Seed)).
|
|
|
|
%% now_to_secs/1 must turn an os:timestamp() value into an integer.
t_now_to_secs(_) ->
    Now = os:timestamp(),
    Secs = emqx_utils:now_to_secs(Now),
    ?assert(is_integer(Secs)).
|
|
|
|
%% now_to_ms/1 must turn an os:timestamp() value into an integer.
t_now_to_ms(_) ->
    Now = os:timestamp(),
    Millis = emqx_utils:now_to_ms(Now),
    ?assert(is_integer(Millis)).
|
|
|
|
%% gen_id/1 must return a list (string) of exactly the requested length.
t_gen_id(_) ->
    lists:foreach(
        fun(Len) ->
            ?assertEqual(Len, length(emqx_utils:gen_id(Len)))
        end,
        [10, 20]
    ).
|
|
|
|
%% pmap/2 applies the function to every element and returns the results in
%% input order.
t_pmap_normal(_) ->
    SumPair = fun({A, B}) -> A + B end,
    Pairs = [{2, 3}, {3, 4}, {4, 5}],
    ?assertEqual([5, 7, 9], emqx_utils:pmap(SumPair, Pairs)).
|
|
|
|
%% pmap/3 must exit with `timeout` when one element takes longer (1000 ms)
%% than the allowed 100 ms.
t_pmap_timeout(_) ->
    SlowOnTimeout = fun
        (timeout) -> ct:sleep(1000);
        ({A, B}) -> A + B
    end,
    Items = [{2, 3}, {3, 4}, timeout],
    ?assertExit(timeout, emqx_utils:pmap(SlowOnTimeout, Items, 100)).
|
|
|
|
%% An error raised while mapping one element must surface in the caller of
%% pmap/2 as the same error.
t_pmap_exception(_) ->
    FailOnError = fun
        (error) -> error(foobar);
        ({A, B}) -> A + B
    end,
    Items = [{2, 3}, {3, 4}, error],
    ?assertError(foobar, emqx_utils:pmap(FailOnError, Items)).
|
|
|
|
%% Runs pmap/3 with workers that take three times the pmap timeout, then
%% checks that no stray `{Ref, Reply}` message reaches the caller afterwards
%% and that the call still returned all three results.
%%
%% NOTE(review): ?force_ordering presumably delays the `pmap_timeout` trace
%% point until `pmap_middleman_sent_response` has been emitted — confirm
%% against the snabbkaffe documentation.
t_pmap_late_reply(_) ->
    ?check_trace(
        begin
            ?force_ordering(
                #{?snk_kind := pmap_middleman_sent_response},
                #{?snk_kind := pmap_timeout}
            ),
            TimeoutMs = 100,
            SlowWorker = fun(_) ->
                process_flag(trap_exit, true),
                timer:sleep(3 * TimeoutMs),
                done
            end,
            %% `catch` keeps a possible exit as a value so the late-reply
            %% check below still runs before the result is asserted.
            Result = (catch emqx_utils:pmap(SlowWorker, [1, 2, 3], TimeoutMs)),
            receive
                {Ref, LateReply} when is_reference(Ref) ->
                    ct:fail("should not receive late reply: ~p", [LateReply])
            after (5 * TimeoutMs) ->
                ok
            end,
            ?assertMatch([done, done, done], Result),
            ok
        end,
        []
    ),
    ok.
|