Merge pull request #3010 from emqx/test_cases_zhou

Add test cases and fix bugs
This commit is contained in:
tigercl 2019-11-01 16:29:00 +08:00 committed by GitHub
commit b3731aaff4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 504 additions and 368 deletions

View File

@ -43,6 +43,11 @@
, code_change/3
]).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-define(HELPER, ?MODULE).
-define(SUBID, emqx_subid).
-define(SUBMON, emqx_submon).

View File

@ -149,9 +149,9 @@ handle_cast({detected, Flapping = #flapping{clientid = ClientId,
ets:insert(?FLAPPING_TAB, BannedFlapping);
false ->
?LOG(warning, "~s(~s) disconnected ~w times in ~wms",
[ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Interval]),
ets:delete_object(?FLAPPING_TAB, Flapping)
[ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Interval])
end,
ets:delete_object(?FLAPPING_TAB, Flapping),
{noreply, State};
handle_cast(Msg, State) ->

View File

@ -368,7 +368,7 @@ init([]) ->
Metric = #metric{name = Name, type = Type, idx = reserved_idx(Name)},
true = ets:insert(?TAB, Metric),
ok = counters:put(CRef, Idx, 0)
end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?MQTT_METRICS),
end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?CHAN_METRICS ++ ?MQTT_METRICS),
{ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}.
handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) ->

View File

@ -47,6 +47,11 @@
, get_port_info/1
]).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-export([cpu_util/0]).
-define(UTIL_ALLOCATORS, [temp_alloc,

View File

@ -29,24 +29,8 @@
all() -> emqx_ct:all(?MODULE).
groups() ->
[{pubsub, [sequence],
[t_sub_unsub,
t_publish,
t_pubsub,
t_shared_subscribe,
t_dispatch_with_no_sub,
't_pubsub#',
't_pubsub+'
]},
{metrics, [sequence],
[inc_dec_metric]},
{stats, [sequence],
[set_get_stat]
}].
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules([router, broker]),
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
Config.
@ -57,156 +41,173 @@ end_per_suite(_Config) ->
%% PubSub Test
%%--------------------------------------------------------------------
t_sub_unsub(_) ->
ok = emqx_broker:subscribe(<<"topic">>, <<"clientId">>),
ok = emqx_broker:subscribe(<<"topic/1">>, <<"clientId">>, #{qos => 1}),
ok = emqx_broker:subscribe(<<"topic/2">>, <<"clientId">>, #{qos => 2}),
true = emqx_broker:subscribed(<<"clientId">>, <<"topic">>),
Topics = emqx_broker:topics(),
lists:foreach(fun(Topic) ->
?assert(lists:member(Topic, Topics))
end, Topics),
ok = emqx_broker:unsubscribe(<<"topic">>),
ok = emqx_broker:unsubscribe(<<"topic/1">>),
ok = emqx_broker:unsubscribe(<<"topic/2">>).
t_subscribed(_) ->
emqx_broker:subscribe(<<"topic">>),
?assertEqual(false, emqx_broker:subscribed(undefined, <<"topic">>)),
?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>)),
emqx_broker:unsubscribe(<<"topic">>).
t_publish(_) ->
Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>),
ok = emqx_broker:subscribe(<<"test/+">>),
timer:sleep(10),
emqx_broker:publish(Msg),
?assert(receive {deliver, <<"test/+">>, #message{payload = <<"hello">>}} -> true after 100 -> false end).
t_subscribed_2(_) ->
emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
?assertEqual(true, emqx_broker:subscribed(<<"clientid">>, <<"topic">>)),
?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>)),
emqx_broker:unsubscribe(<<"topic">>).
t_dispatch_with_no_sub(_) ->
Msg = emqx_message:make(ct, <<"no_subscribers">>, <<"hello">>),
Delivery = #delivery{sender = self(), message = Msg},
?assertEqual([{node(),<<"no_subscribers">>,{error,no_subscribers}}],
emqx_broker:route([{<<"no_subscribers">>, node()}], Delivery)).
t_subopts(_) ->
?assertEqual(false, emqx_broker:set_subopts(<<"topic">>, #{qos => 1})),
?assertEqual(undefined, emqx_broker:get_subopts(self(), <<"topic">>)),
?assertEqual(undefined, emqx_broker:get_subopts(<<"clientid">>, <<"topic">>)),
emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}),
?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)),
?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(<<"clientid">>,<<"topic">>)),
emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 2}),
?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)),
?assertEqual(true, emqx_broker:set_subopts(<<"topic">>, #{qos => 2})),
?assertEqual(#{qos => 2, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)),
emqx_broker:unsubscribe(<<"topic">>).
t_pubsub(_) ->
true = emqx:is_running(node()),
Self = self(),
Subscriber = <<"clientId">>,
ok = emqx_broker:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 1 }),
#{qos := 1} = ets:lookup_element(emqx_suboption, {Self, <<"a/b/c">>}, 2),
#{qos := 1} = emqx_broker:get_subopts(Subscriber, <<"a/b/c">>),
true = emqx_broker:set_subopts(<<"a/b/c">>, #{qos => 0}),
#{qos := 0} = emqx_broker:get_subopts(Subscriber, <<"a/b/c">>),
ok = emqx_broker:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 2 }),
%% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]),
timer:sleep(10),
[Self] = emqx_broker:subscribers(<<"a/b/c">>),
emqx_broker:publish(
emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
t_topics(_) ->
Topics = [<<"topic">>, <<"topic/1">>, <<"topic/2">>],
ok = emqx_broker:subscribe(lists:nth(1, Topics), <<"clientId">>),
ok = emqx_broker:subscribe(lists:nth(2, Topics), <<"clientId">>),
ok = emqx_broker:subscribe(lists:nth(3, Topics), <<"clientId">>),
Topics1 = emqx_broker:topics(),
?assertEqual(true, lists:foldl(fun(Topic, Acc) ->
case lists:member(Topic, Topics1) of
true -> Acc;
false -> false
end
end, true, Topics)),
emqx_broker:unsubscribe(lists:nth(1, Topics)),
emqx_broker:unsubscribe(lists:nth(2, Topics)),
emqx_broker:unsubscribe(lists:nth(3, Topics)).
t_subscribers(_) ->
emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
?assertEqual([self()], emqx_broker:subscribers(<<"topic">>)),
emqx_broker:unsubscribe(<<"topic">>).
t_subscriptions(_) ->
emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}),
?assertEqual(#{qos => 1, subid => <<"clientid">>},
proplists:get_value(<<"topic">>, emqx_broker:subscriptions(self()))),
?assertEqual(#{qos => 1, subid => <<"clientid">>},
proplists:get_value(<<"topic">>, emqx_broker:subscriptions(<<"clientid">>))),
emqx_broker:unsubscribe(<<"topic">>).
t_sub_pub(_) ->
ok = emqx_broker:subscribe(<<"topic">>),
ct:sleep(10),
emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
?assert(
receive {deliver, <<"a/b/c">>, _ } ->
receive
{deliver, <<"topic">>, #message{payload = <<"hello">>}} ->
true;
P ->
ct:log("Receive Message: ~p~n",[P])
_ ->
false
after 100 ->
false
end).
t_nosub_pub(_) ->
?assertEqual(0, emqx_metrics:val('messages.dropped')),
emqx_broker:publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
?assertEqual(1, emqx_metrics:val('messages.dropped')).
t_shared_subscribe(_) ->
emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{share => <<"group">>}),
ct:sleep(10),
emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
?assert(receive
{deliver, <<"topic">>, #message{payload = <<"hello">>}} ->
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end),
emqx_broker:unsubscribe(<<"$share/group/topic">>).
t_shared_subscribe_2(_) ->
{ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]),
{ok, _} = emqtt:connect(ConnPid),
{ok, _, [0]} = emqtt:subscribe(ConnPid, <<"$share/group/topic">>, 0),
{ok, ConnPid2} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid2">>}]),
{ok, _} = emqtt:connect(ConnPid2),
{ok, _, [0]} = emqtt:subscribe(ConnPid2, <<"$share/group2/topic">>, 0),
ct:sleep(10),
ok = emqtt:publish(ConnPid, <<"topic">>, <<"hello">>, 0),
Msgs = recv_msgs(2),
?assertEqual(2, length(Msgs)),
?assertEqual(true, lists:foldl(fun(#{payload := <<"hello">>, topic := <<"topic">>}, Acc) ->
Acc;
(_, _) ->
false
end, true, Msgs)),
emqtt:disconnect(ConnPid),
emqtt:disconnect(ConnPid2).
t_shared_subscribe_3(_) ->
{ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]),
{ok, _} = emqtt:connect(ConnPid),
{ok, _, [0]} = emqtt:subscribe(ConnPid, <<"$share/group/topic">>, 0),
{ok, ConnPid2} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid2">>}]),
{ok, _} = emqtt:connect(ConnPid2),
{ok, _, [0]} = emqtt:subscribe(ConnPid2, <<"$share/group/topic">>, 0),
ct:sleep(10),
ok = emqtt:publish(ConnPid, <<"topic">>, <<"hello">>, 0),
Msgs = recv_msgs(2),
?assertEqual(1, length(Msgs)),
emqtt:disconnect(ConnPid),
emqtt:disconnect(ConnPid2).
t_shard(_) ->
ok = meck:new(emqx_broker_helper, [passthrough, no_history]),
ok = meck:expect(emqx_broker_helper, get_sub_shard, fun(_, _) -> 1 end),
emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
ct:sleep(10),
emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
?assert(
receive
{deliver, <<"topic">>, #message{payload = <<"hello">>}} ->
true;
_ ->
false
after 100 ->
false
end),
spawn(fun() ->
emqx_broker:subscribe(<<"a/b/c">>),
emqx_broker:subscribe(<<"c/d/e">>),
timer:sleep(10),
emqx_broker:unsubscribe(<<"a/b/c">>)
end),
timer:sleep(20),
emqx_broker:unsubscribe(<<"a/b/c">>).
t_shared_subscribe(_) ->
emqx_broker:subscribe(<<"$share/group2/topic2">>),
emqx_broker:subscribe(<<"$queue/topic3">>),
timer:sleep(10),
ct:pal("Share subscriptions: ~p",
[emqx_broker:subscriptions(self())]),
?assertEqual(2, length(emqx_broker:subscriptions(self()))),
emqx_broker:unsubscribe(<<"$share/group2/topic2">>),
emqx_broker:unsubscribe(<<"$queue/topic3">>),
?assertEqual(0, length(emqx_broker:subscriptions(self()))).
't_pubsub#'(_) ->
emqx_broker:subscribe(<<"a/#">>),
timer:sleep(10),
emqx_broker:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
?assert(receive {deliver, <<"a/#">>, _} -> true after 100 -> false end),
emqx_broker:unsubscribe(<<"a/#">>).
't_pubsub+'(_) ->
emqx_broker:subscribe(<<"a/+/+">>),
timer:sleep(10), %% TODO: why sleep?
emqx_broker:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
?assert(receive {deliver, <<"a/+/+">>, _} -> true after 100 -> false end),
emqx_broker:unsubscribe(<<"a/+/+">>).
%%--------------------------------------------------------------------
%% Metric Group
%%--------------------------------------------------------------------
inc_dec_metric(_) ->
emqx_metrics:inc('messages.retained', 10),
emqx_metrics:dec('messages.retained', 10).
%%--------------------------------------------------------------------
%% Stats Group
%%--------------------------------------------------------------------
set_get_stat(_) ->
emqx_stats:setstat('retained.max', 99),
?assertEqual(99, emqx_stats:getstat('retained.max')).
t_dispatch(_) ->
error('TODO').
t_subscriber_down(_) ->
error('TODO').
t_get_subopts(_) ->
error('TODO').
t_set_subopts(_) ->
error('TODO').
t_topics(_) ->
error('TODO').
ok = meck:unload(emqx_broker_helper).
t_stats_fun(_) ->
error('TODO').
?assertEqual(0, emqx_stats:getstat('subscribers.count')),
?assertEqual(0, emqx_stats:getstat('subscriptions.count')),
?assertEqual(0, emqx_stats:getstat('suboptions.count')),
ok = emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
ok = emqx_broker:subscribe(<<"topic2">>, <<"clientid">>),
emqx_broker:stats_fun(),
ct:sleep(10),
?assertEqual(2, emqx_stats:getstat('subscribers.count')),
?assertEqual(2, emqx_stats:getstat('subscribers.max')),
?assertEqual(2, emqx_stats:getstat('subscriptions.count')),
?assertEqual(2, emqx_stats:getstat('subscriptions.max')),
?assertEqual(2, emqx_stats:getstat('suboptions.count')),
?assertEqual(2, emqx_stats:getstat('suboptions.max')).
t_init(_) ->
error('TODO').
recv_msgs(Count) ->
recv_msgs(Count, []).
t_handle_call(_) ->
error('TODO').
t_handle_cast(_) ->
error('TODO').
t_handle_info(_) ->
error('TODO').
t_terminate(_) ->
error('TODO').
t_code_change(_) ->
error('TODO').
t_safe_publish(_) ->
error('TODO').
t_subscribed(_) ->
error('TODO').
t_subscriptions(_) ->
error('TODO').
t_subscribers(_) ->
error('TODO').
t_unsubscribe(_) ->
error('TODO').
t_subscribe(_) ->
error('TODO').
recv_msgs(0, Msgs) ->
Msgs;
recv_msgs(Count, Msgs) ->
receive
{publish, Msg} ->
recv_msgs(Count-1, [Msg|Msgs]);
_Other -> recv_msgs(Count, Msgs)
after 100 ->
Msgs
end.

View File

@ -23,48 +23,52 @@
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(_TestCase, Config) ->
Config.
t_start_link(_) ->
error('TODO').
t_lookup_subid(_) ->
error('TODO').
t_create_seq(_) ->
error('TODO').
t_init(_) ->
error('TODO').
t_handle_call(_) ->
error('TODO').
t_handle_cast(_) ->
error('TODO').
t_handle_info(_) ->
error('TODO').
t_terminate(_) ->
error('TODO').
t_code_change(_) ->
error('TODO').
?assertEqual(undefined, emqx_broker_helper:lookup_subid(self())),
emqx_broker_helper:register_sub(self(), <<"clientid">>),
ct:sleep(10),
?assertEqual(<<"clientid">>, emqx_broker_helper:lookup_subid(self())).
t_lookup_subpid(_) ->
error('TODO').
t_reclaim_seq(_) ->
error('TODO').
t_get_sub_shard(_) ->
error('TODO').
?assertEqual(undefined, emqx_broker_helper:lookup_subpid(<<"clientid">>)),
emqx_broker_helper:register_sub(self(), <<"clientid">>),
ct:sleep(10),
?assertEqual(self(), emqx_broker_helper:lookup_subpid(<<"clientid">>)).
t_register_sub(_) ->
error('TODO').
ok = emqx_broker_helper:register_sub(self(), <<"clientid">>),
ct:sleep(10),
ok = emqx_broker_helper:register_sub(self(), <<"clientid">>),
try emqx_broker_helper:register_sub(self(), <<"clientid2">>) of
_ -> ct:fail(should_throw_error)
catch error:Reason ->
?assertEqual(Reason, subid_conflict)
end,
?assertEqual(self(), emqx_broker_helper:lookup_subpid(<<"clientid">>)).
t_shard_seq(_) ->
?assertEqual([], ets:lookup(emqx_subseq, <<"topic">>)),
emqx_broker_helper:create_seq(<<"topic">>),
?assertEqual([{<<"topic">>, 1}], ets:lookup(emqx_subseq, <<"topic">>)),
emqx_broker_helper:reclaim_seq(<<"topic">>),
?assertEqual([], ets:lookup(emqx_subseq, <<"topic">>)).
t_shards_num(_) ->
?assertEqual(emqx_vm:schedulers() * 32, emqx_broker_helper:shards_num()).
t_get_sub_shard(_) ->
?assertEqual(0, emqx_broker_helper:get_sub_shard(self(), <<"topic">>)).

View File

@ -21,44 +21,57 @@
-include_lib("eunit/include/eunit.hrl").
%%--------------------------------------------------------------------
%% CT callbacks
%%--------------------------------------------------------------------
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(_TestCase, Config) ->
Config.
t_start_link(_) ->
error('TODO').
t_init(_) ->
error('TODO').
t_handle_call(_) ->
error('TODO').
t_handle_cast(_) ->
error('TODO').
t_handle_info(_) ->
error('TODO').
t_terminate(_) ->
error('TODO').
t_code_change(_) ->
error('TODO').
t_lookup_channels(_) ->
error('TODO').
t_is_enabled(_) ->
error('TODO').
application:set_env(emqx, enable_channel_registry, false),
?assertEqual(false, emqx_cm_registry:is_enabled()),
application:set_env(emqx, enable_channel_registry, true),
?assertEqual(true, emqx_cm_registry:is_enabled()).
t_unregister_channel(_) ->
error('TODO').
t_register_unregister_channel(_) ->
ClientId = <<"clientid">>,
application:set_env(emqx, enable_channel_registry, false),
emqx_cm_registry:register_channel(ClientId),
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)),
t_register_channel(_) ->
error('TODO').
application:set_env(emqx, enable_channel_registry, true),
emqx_cm_registry:register_channel(ClientId),
?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)),
application:set_env(emqx, enable_channel_registry, false),
emqx_cm_registry:unregister_channel(ClientId),
?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)),
application:set_env(emqx, enable_channel_registry, true),
emqx_cm_registry:unregister_channel(ClientId),
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)).
t_cleanup_channels(_) ->
ClientId = <<"clientid">>,
ClientId2 = <<"clientid2">>,
emqx_cm_registry:register_channel(ClientId),
emqx_cm_registry:register_channel(ClientId2),
?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)),
emqx_cm_registry ! {membership, {mnesia, down, node()}},
ct:sleep(100),
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)),
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId2)).

View File

@ -24,24 +24,10 @@
all() -> emqx_ct:all(?MODULE).
t_val(_) ->
error('TODO').
t_dec(_) ->
error('TODO').
t_set(_) ->
error('TODO').
t_commit(_) ->
error('TODO').
t_inc(_) ->
error('TODO').
t_new(_) ->
with_metrics_server(
fun() ->
ok = emqx_metrics:new('metrics.test'),
ok = emqx_metrics:new('metrics.test'),
0 = emqx_metrics:val('metrics.test'),
ok = emqx_metrics:inc('metrics.test'),
@ -86,16 +72,70 @@ t_inc_recv(_) ->
with_metrics_server(
fun() ->
ok = emqx_metrics:inc_recv(?PACKET(?CONNECT)),
?assertEqual(1, emqx_metrics:val('packets.received')),
?assertEqual(1, emqx_metrics:val('packets.connect.received'))
ok = emqx_metrics:inc_recv(?PUBLISH_PACKET(0, 0)),
ok = emqx_metrics:inc_recv(?PUBLISH_PACKET(1, 0)),
ok = emqx_metrics:inc_recv(?PUBLISH_PACKET(2, 0)),
ok = emqx_metrics:inc_recv(?PUBLISH_PACKET(3, 0)),
ok = emqx_metrics:inc_recv(?PACKET(?PUBACK)),
ok = emqx_metrics:inc_recv(?PACKET(?PUBREC)),
ok = emqx_metrics:inc_recv(?PACKET(?PUBREL)),
ok = emqx_metrics:inc_recv(?PACKET(?PUBCOMP)),
ok = emqx_metrics:inc_recv(?PACKET(?SUBSCRIBE)),
ok = emqx_metrics:inc_recv(?PACKET(?UNSUBSCRIBE)),
ok = emqx_metrics:inc_recv(?PACKET(?PINGREQ)),
ok = emqx_metrics:inc_recv(?PACKET(?DISCONNECT)),
ok = emqx_metrics:inc_recv(?PACKET(?AUTH)),
ignore = emqx_metrics:inc_recv(?PACKET(?RESERVED)),
?assertEqual(15, emqx_metrics:val('packets.received')),
?assertEqual(1, emqx_metrics:val('packets.connect.received')),
?assertEqual(4, emqx_metrics:val('messages.received')),
?assertEqual(1, emqx_metrics:val('messages.qos0.received')),
?assertEqual(1, emqx_metrics:val('messages.qos1.received')),
?assertEqual(1, emqx_metrics:val('messages.qos2.received')),
?assertEqual(4, emqx_metrics:val('packets.publish.received')),
?assertEqual(1, emqx_metrics:val('packets.puback.received')),
?assertEqual(1, emqx_metrics:val('packets.pubrec.received')),
?assertEqual(1, emqx_metrics:val('packets.pubrel.received')),
?assertEqual(1, emqx_metrics:val('packets.pubcomp.received')),
?assertEqual(1, emqx_metrics:val('packets.subscribe.received')),
?assertEqual(1, emqx_metrics:val('packets.unsubscribe.received')),
?assertEqual(1, emqx_metrics:val('packets.pingreq.received')),
?assertEqual(1, emqx_metrics:val('packets.disconnect.received')),
?assertEqual(1, emqx_metrics:val('packets.auth.received'))
end).
t_inc_sent(_) ->
with_metrics_server(
fun() ->
ok = emqx_metrics:inc_sent(?CONNACK_PACKET(0)),
?assertEqual(1, emqx_metrics:val('packets.sent')),
?assertEqual(1, emqx_metrics:val('packets.connack.sent'))
ok = emqx_metrics:inc_sent(?PUBLISH_PACKET(0, 0)),
ok = emqx_metrics:inc_sent(?PUBLISH_PACKET(1, 0)),
ok = emqx_metrics:inc_sent(?PUBLISH_PACKET(2, 0)),
ok = emqx_metrics:inc_sent(?PUBACK_PACKET(0, 0)),
ok = emqx_metrics:inc_sent(?PUBREC_PACKET(3, 0)),
ok = emqx_metrics:inc_sent(?PACKET(?PUBREL)),
ok = emqx_metrics:inc_sent(?PACKET(?PUBCOMP)),
ok = emqx_metrics:inc_sent(?PACKET(?SUBACK)),
ok = emqx_metrics:inc_sent(?PACKET(?UNSUBACK)),
ok = emqx_metrics:inc_sent(?PACKET(?PINGRESP)),
ok = emqx_metrics:inc_sent(?PACKET(?DISCONNECT)),
ok = emqx_metrics:inc_sent(?PACKET(?AUTH)),
?assertEqual(13, emqx_metrics:val('packets.sent')),
?assertEqual(1, emqx_metrics:val('packets.connack.sent')),
?assertEqual(3, emqx_metrics:val('messages.sent')),
?assertEqual(1, emqx_metrics:val('messages.qos0.sent')),
?assertEqual(1, emqx_metrics:val('messages.qos1.sent')),
?assertEqual(1, emqx_metrics:val('messages.qos2.sent')),
?assertEqual(3, emqx_metrics:val('packets.publish.sent')),
?assertEqual(1, emqx_metrics:val('packets.puback.sent')),
?assertEqual(1, emqx_metrics:val('packets.pubrec.sent')),
?assertEqual(1, emqx_metrics:val('packets.pubrel.sent')),
?assertEqual(1, emqx_metrics:val('packets.pubcomp.sent')),
?assertEqual(1, emqx_metrics:val('packets.suback.sent')),
?assertEqual(1, emqx_metrics:val('packets.unsuback.sent')),
?assertEqual(1, emqx_metrics:val('packets.pingresp.sent')),
?assertEqual(1, emqx_metrics:val('packets.disconnect.sent')),
?assertEqual(1, emqx_metrics:val('packets.auth.sent'))
end).
t_trans(_) ->

View File

@ -64,7 +64,10 @@ t_pipeline(_) ->
fun(_I, St) -> {ok, St+1} end,
fun(I, St) -> {ok, I+1, St+1} end,
fun(I, St) -> {ok, I*2, St*2} end],
?assertEqual({ok, 4, 6}, emqx_misc:pipeline(Funs, 1, 1)).
?assertEqual({ok, 4, 6}, emqx_misc:pipeline(Funs, 1, 1)),
?assertEqual({error, undefined, 1}, emqx_misc:pipeline([fun(_I) -> {error, undefined} end], 1, 1)),
?assertEqual({error, undefined, 2}, emqx_misc:pipeline([fun(_I, _St) -> {error, undefined, 2} end], 1, 1)),
?assertEqual({error, undefined, 1}, emqx_misc:pipeline([fun(_I, _St) -> erlang:error(undefined) end], 1, 1)).
t_start_timer(_) ->
TRef = emqx_misc:start_timer(1, tmsg),
@ -75,7 +78,8 @@ t_start_timer(_) ->
t_cancel_timer(_) ->
Timer = emqx_misc:start_timer(0, foo),
ok = emqx_misc:cancel_timer(Timer),
?assertEqual([], drain()).
?assertEqual([], drain()),
ok = emqx_misc:cancel_timer(undefined).
t_proc_name(_) ->
?assertEqual(emqx_pool_1, emqx_misc:proc_name(emqx_pool, 1)).
@ -84,7 +88,11 @@ t_proc_stats(_) ->
Pid1 = spawn(fun() -> exit(normal) end),
timer:sleep(10),
?assertEqual([], emqx_misc:proc_stats(Pid1)),
Pid2 = spawn(fun() -> timer:sleep(100) end),
Pid2 = spawn(fun() ->
?assertMatch([{mailbox_len, 0}|_], emqx_misc:proc_stats()),
timer:sleep(200)
end),
timer:sleep(10),
Pid2 ! msg,
timer:sleep(10),
?assertMatch([{mailbox_len, 1}|_], emqx_misc:proc_stats(Pid2)).
@ -100,7 +108,16 @@ t_drain_down(_) ->
{Pid1, _Ref1} = erlang:spawn_monitor(fun() -> ok end),
{Pid2, _Ref2} = erlang:spawn_monitor(fun() -> ok end),
timer:sleep(100),
?assertEqual([Pid1, Pid2], emqx_misc:drain_down(2)).
?assertEqual([Pid1, Pid2], emqx_misc:drain_down(2)),
?assertEqual([], emqx_misc:drain_down(1)).
t_index_of(_) ->
try emqx_misc:index_of(a, []) of
_ -> ct:fail(should_throw_error)
catch error:Reason ->
?assertEqual(badarg, Reason)
end,
?assertEqual(3, emqx_misc:index_of(a, [b, c, a, e, f])).
drain() ->
drain([]).

View File

@ -26,142 +26,152 @@
all() -> emqx_ct:all(?SUITE).
t_is_queue(_) ->
error('TODO').
Q = ?PQ:new(),
?assertEqual(true, ?PQ:is_queue(Q)),
Q1 = ?PQ:in(a, 1, Q),
?assertEqual(true, ?PQ:is_queue(Q1)),
?assertEqual(false, ?PQ:is_queue(bad_queue)).
t_is_empty(_) ->
error('TODO').
t_to_list(_) ->
error('TODO').
t_from_list(_) ->
error('TODO').
t_in(_) ->
error('TODO').
t_out_p(_) ->
error('TODO').
t_join(_) ->
error('TODO').
t_filter(_) ->
error('TODO').
t_fold(_) ->
error('TODO').
t_highest(_) ->
error('TODO').
t_out(_) ->
error('TODO').
Q = ?PQ:new(),
?assertEqual(true, ?PQ:is_empty(Q)),
?assertEqual(false, ?PQ:is_empty(?PQ:in(a, Q))).
t_len(_) ->
error('TODO').
Q = ?PQ:new(),
Q1 = ?PQ:in(a, Q),
?assertEqual(1, ?PQ:len(Q1)),
Q2 = ?PQ:in(b, 1, Q1),
?assertEqual(2, ?PQ:len(Q2)).
t_plen(_) ->
error('TODO').
t_new(_) ->
error('TODO').
t_priority_queue_plen(_) ->
Q = ?PQ:new(),
0 = ?PQ:plen(0, Q),
Q0 = ?PQ:in(z, Q),
1 = ?PQ:plen(0, Q0),
Q1 = ?PQ:in(x, 1, Q0),
1 = ?PQ:plen(1, Q1),
Q2 = ?PQ:in(y, 2, Q1),
1 = ?PQ:plen(2, Q2),
Q3 = ?PQ:in(z, 2, Q2),
2 = ?PQ:plen(2, Q3),
{_, Q4} = ?PQ:out(1, Q3),
0 = ?PQ:plen(1, Q4),
{_, Q5} = ?PQ:out(Q4),
1 = ?PQ:plen(2, Q5),
{_, Q6} = ?PQ:out(Q5),
0 = ?PQ:plen(2, Q6),
1 = ?PQ:len(Q6),
{_, Q7} = ?PQ:out(Q6),
0 = ?PQ:len(Q7).
Q1 = ?PQ:in(a, Q),
?assertEqual(1, ?PQ:plen(0, Q1)),
?assertEqual(0, ?PQ:plen(1, Q1)),
Q2 = ?PQ:in(b, 1, Q1),
Q3 = ?PQ:in(c, 1, Q2),
?assertEqual(2, ?PQ:plen(1, Q3)),
?assertEqual(1, ?PQ:plen(0, Q3)),
?assertEqual(0, ?PQ:plen(0, {pqueue, []})).
t_priority_queue_out2(_) ->
Els = [a, {b, 1}, {c, 1}, {d, 2}, {e, 2}, {f, 2}],
Q = ?PQ:new(),
Q0 = lists:foldl(
t_to_list(_) ->
Q = ?PQ:new(),
?assertEqual([], ?PQ:to_list(Q)),
Q1 = ?PQ:in(a, Q),
L1 = ?PQ:to_list(Q1),
?assertEqual([{0, a}], L1),
Q2 = ?PQ:in(b, 1, Q1),
L2 = ?PQ:to_list(Q2),
?assertEqual([{1, b}, {0, a}], L2).
t_from_list(_) ->
Q = ?PQ:from_list([{1, c}, {1, d}, {0, a}, {0, b}]),
?assertEqual({pqueue, [{-1, {queue, [d], [c], 2}}, {0, {queue, [b], [a], 2}}]}, Q),
?assertEqual(true, ?PQ:is_queue(Q)),
?assertEqual(4, ?PQ:len(Q)).
t_in(_) ->
Q = ?PQ:new(),
Els = [a, b, {c, 1}, {d, 1}, {e, infinity}, {f, 2}],
Q1 = lists:foldl(
fun({El, P}, Acc) ->
?PQ:in(El, P, Acc);
(El, Acc) ->
?PQ:in(El, Acc)
end, Q, Els),
{Val, Q1} = ?PQ:out(Q0),
{value, d} = Val,
{Val1, Q2} = ?PQ:out(2, Q1),
{value, e} = Val1,
{Val2, Q3} = ?PQ:out(1, Q2),
{value, b} = Val2,
{Val3, Q4} = ?PQ:out(Q3),
{value, f} = Val3,
{Val4, Q5} = ?PQ:out(Q4),
{value, c} = Val4,
{Val5, Q6} = ?PQ:out(Q5),
{value, a} = Val5,
{empty, _Q7} = ?PQ:out(Q6).
?assertEqual({pqueue, [{infinity, {queue, [e], [], 1}},
{-2, {queue, [f], [], 1}},
{-1, {queue, [d], [c], 2}},
{0, {queue, [b], [a], 2}}]}, Q1).
t_priority_queues(_) ->
Q0 = ?PQ:new(),
Q1 = ?PQ:new(),
PQueue = {pqueue, [{0, Q0}, {1, Q1}]},
?assert(?PQ:is_queue(PQueue)),
[] = ?PQ:to_list(PQueue),
t_out(_) ->
Q = ?PQ:new(),
{empty, Q} = ?PQ:out(Q),
{empty, Q} = ?PQ:out(0, Q),
try ?PQ:out(1, Q) of
_ -> ct:fail(should_throw_error)
catch error:Reason ->
?assertEqual(Reason, badarg)
end,
{{value, a}, Q} = ?PQ:out(?PQ:from_list([{0, a}])),
{{value, a}, {queue, [], [b], 1}} = ?PQ:out(?PQ:from_list([{0, a}, {0, b}])),
{{value, a}, {queue, [], [], 0}} = ?PQ:out({queue, [], [a], 1}),
{{value, a}, {queue, [c], [b], 2}} = ?PQ:out({queue, [c, b], [a], 3}),
{{value, a}, {queue, [e, d], [b, c], 4}} = ?PQ:out({queue, [e, d, c, b], [a], 5}),
{{value, a}, {queue, [c], [b], 2}} = ?PQ:out({queue, [c, b, a], [], 3}),
{{value, a}, {queue, [d, c], [b], 3}} = ?PQ:out({queue, [d, c], [a, b], 4}),
{{value, a}, {queue, [], [], 0}} = ?PQ:out(?PQ:from_list([{1, a}])),
{{value, a}, {queue, [c], [b], 2}} = ?PQ:out(?PQ:from_list([{1, a}, {0, b}, {0, c}])),
{{value, a}, {pqueue, [{-1, {queue, [b], [], 1}}]}} = ?PQ:out(?PQ:from_list([{1, b}, {2, a}])),
{{value, a}, {pqueue, [{-1, {queue, [], [b], 1}}]}} = ?PQ:out(?PQ:from_list([{1, a}, {1, b}])).
PQueue1 = ?PQ:in(a, 0, ?PQ:new()),
PQueue2 = ?PQ:in(b, 0, PQueue1),
t_out_2(_) ->
{empty, {pqueue, [{-1, {queue, [a], [], 1}}]}} = ?PQ:out(0, ?PQ:from_list([{1, a}])),
{{value, a}, {queue, [], [], 0}} = ?PQ:out(1, ?PQ:from_list([{1, a}])),
{{value, a}, {pqueue, [{-1, {queue, [], [b], 1}}]}} = ?PQ:out(1, ?PQ:from_list([{1, a}, {1, b}])),
{{value, a}, {queue, [b], [], 1}} = ?PQ:out(1, ?PQ:from_list([{1, a}, {0, b}])).
PQueue3 = ?PQ:in(c, 1, PQueue2),
PQueue4 = ?PQ:in(d, 1, PQueue3),
t_out_p(_) ->
{empty, {queue, [], [], 0}} = ?PQ:out_p(?PQ:new()),
{{value, a, 1}, {queue, [b], [], 1}} = ?PQ:out_p(?PQ:from_list([{1, a}, {0, b}])).
4 = ?PQ:len(PQueue4),
t_join(_) ->
Q = ?PQ:in(a, ?PQ:new()),
Q = ?PQ:join(Q, ?PQ:new()),
Q = ?PQ:join(?PQ:new(), Q),
[{1, c}, {1, d}, {0, a}, {0, b}] = ?PQ:to_list(PQueue4),
PQueue4 = ?PQ:from_list([{1, c}, {1, d}, {0, a}, {0, b}]),
Q1 = ?PQ:in(a, ?PQ:new()),
Q2 = ?PQ:in(b, Q1),
Q3 = ?PQ:in(c, Q2),
{queue,[c,b],[a],3} = Q3,
Q4 = ?PQ:in(x, ?PQ:new()),
Q5 = ?PQ:in(y, Q4),
Q6 = ?PQ:in(z, Q5),
{queue,[z,y],[x],3} = Q6,
{queue,[z,y],[a,b,c,x],6} = ?PQ:join(Q3, Q6),
PQueue1 = ?PQ:from_list([{1, c}, {1, d}]),
PQueue2 = ?PQ:from_list([{1, c}, {1, d}, {0, a}, {0, b}]),
PQueue3 = ?PQ:from_list([{1, c}, {1, d}, {-1, a}, {-1, b}]),
{pqueue,[{-1,{queue,[d],[c],2}},
{0,{queue,[z,y],[x],3}}]} = ?PQ:join(PQueue1, Q6),
{pqueue,[{-1,{queue,[d],[c],2}},
{0,{queue,[z,y],[x],3}}]} = ?PQ:join(Q6, PQueue1),
{pqueue,[{-1,{queue,[d],[c],2}},
{0,{queue,[z,y],[a,b,x],5}}]} = ?PQ:join(PQueue2, Q6),
{pqueue,[{-1,{queue,[d],[c],2}},
{0,{queue,[b],[x,y,z,a],5}}]} = ?PQ:join(Q6, PQueue2),
{pqueue,[{-1,{queue,[d],[c],2}},
{0,{queue,[z,y],[x],3}},
{1,{queue,[b],[a],2}}]} = ?PQ:join(PQueue3, Q6),
{pqueue,[{-1,{queue,[d],[c],2}},
{0,{queue,[z,y],[x],3}},
{1,{queue,[b],[a],2}}]} = ?PQ:join(Q6, PQueue3),
PQueue4 = ?PQ:from_list([{1, c}, {1, d}]),
PQueue5 = ?PQ:from_list([{2, a}, {2, b}]),
{pqueue,[{-2,{queue,[b],[a],2}},
{-1,{queue,[d],[c],2}}]} = ?PQ:join(PQueue4, PQueue5).
t_filter(_) ->
{pqueue, [{-2, {queue, [10], [4], 2}},
{-1, {queue, [2], [], 1}}]} =
?PQ:filter(fun(V) when V rem 2 =:= 0 ->
true;
(_) ->
false
end, ?PQ:from_list([{0, 1}, {0, 3}, {1, 2}, {2, 4}, {2, 10}])).
t_highest(_) ->
empty = ?PQ:highest(?PQ:new()),
0 = ?PQ:highest(PQueue1),
1 = ?PQ:highest(PQueue4),
PQueue5 = ?PQ:in(e, infinity, PQueue4),
PQueue6 = ?PQ:in(f, 1, PQueue5),
{{value, e}, PQueue7} = ?PQ:out(PQueue6),
{empty, _} = ?PQ:out(0, ?PQ:new()),
{empty, Q0} = ?PQ:out_p(Q0),
Q2 = ?PQ:in(a, Q0),
Q3 = ?PQ:in(b, Q2),
Q4 = ?PQ:in(c, Q3),
{{value, a, 0}, _Q5} = ?PQ:out_p(Q4),
{{value,c,1}, PQueue8} = ?PQ:out_p(PQueue7),
Q4 = ?PQ:join(Q4, ?PQ:new()),
Q4 = ?PQ:join(?PQ:new(), Q4),
{queue, [a], [a], 2} = ?PQ:join(Q2, Q2),
{pqueue,[{-1,{queue,[f],[d],2}},
{0,{queue,[a],[a,b],3}}]} = ?PQ:join(PQueue8, Q2),
{pqueue,[{-1,{queue,[f],[d],2}},
{0,{queue,[b],[a,a],3}}]} = ?PQ:join(Q2, PQueue8),
{pqueue,[{-1,{queue,[f],[d,f,d],4}},
{0,{queue,[b],[a,b,a],4}}]} = ?PQ:join(PQueue8, PQueue8).
0 = ?PQ:highest(?PQ:from_list([{0, a}, {0, b}])),
2 = ?PQ:highest(?PQ:from_list([{0, a}, {0, b}, {1, c}, {2, d}, {2, e}])).

View File

@ -104,7 +104,8 @@ t_load(_Config) ->
t_systeminfo(_Config) ->
Keys = [Key || {Key, _} <- emqx_vm:get_system_info()],
?SYSTEM_INFO = Keys.
?SYSTEM_INFO = Keys,
?assertEqual(undefined, emqx_vm:get_system_info(undefined)).
t_mem_info(_Config) ->
application:ensure_all_started(os_mon),
@ -139,10 +140,19 @@ t_get_ets_info(_Config) ->
ets:new(test, [named_table]),
[] = emqx_vm:get_ets_info(test1),
EtsInfo = emqx_vm:get_ets_info(test),
test = proplists:get_value(name, EtsInfo).
test = proplists:get_value(name, EtsInfo),
Tid = proplists:get_value(id, EtsInfo),
EtsInfos = emqx_vm:get_ets_info(),
?assertEqual(true, lists:foldl(fun(Info, Acc) ->
case proplists:get_value(id, Info) of
Tid -> true;
_ -> Acc
end
end, false, EtsInfos)).
t_get_ets_object(_Config) ->
ets:new(test, [named_table]),
[] = emqx_vm:get_ets_object(test),
ets:insert(test, {k, v}),
[{k, v}] = emqx_vm:get_ets_object(test).
@ -150,7 +160,20 @@ t_get_port_types(_Config) ->
emqx_vm:get_port_types().
t_get_port_info(_Config) ->
emqx_vm:get_port_info().
emqx_vm:get_port_info(),
spawn(fun easy_server/0),
ct:sleep(100),
{ok, Sock} = gen_tcp:connect("localhost", 5678, [binary, {packet, 0}]),
emqx_vm:get_port_info(),
ok = gen_tcp:close(Sock),
[Port | _] = erlang:ports(),
[{connected, _}, {name, _}] = emqx_vm:port_info(Port, [connected, name]).
t_transform_port(_Config) ->
[Port | _] = erlang:ports(),
?assertEqual(Port, emqx_vm:transform_port(Port)),
<<131, 102, 100, NameLen:2/unit:8, _Name:NameLen/binary, N:4/unit:8, _Vsn:8>> = erlang:term_to_binary(Port),
?assertEqual(Port, emqx_vm:transform_port("#Port<0." ++ integer_to_list(N) ++ ">")).
t_scheduler_usage(_Config) ->
emqx_vm:scheduler_usage(5000).
@ -170,3 +193,21 @@ t_get_process_group_leader_info(_Config) ->
t_get_process_limit(_Config) ->
emqx_vm:get_process_limit().
t_cpu_util(_Config) ->
?assertEqual(0, emqx_vm:cpu_util()).
easy_server() ->
{ok, LSock} = gen_tcp:listen(5678, [binary, {packet, 0}, {active, false}]),
{ok, Sock} = gen_tcp:accept(LSock),
ok = do_recv(Sock),
ok = gen_tcp:close(Sock),
ok = gen_tcp:close(LSock).
do_recv(Sock) ->
case gen_tcp:recv(Sock, 0) of
{ok, _} ->
do_recv(Sock);
{error, closed} ->
ok
end.