diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index 34e2de817..3364ffc98 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -43,6 +43,11 @@ , code_change/3 ]). +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-endif. + -define(HELPER, ?MODULE). -define(SUBID, emqx_subid). -define(SUBMON, emqx_submon). diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index e058f0cda..252a85ab9 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -149,9 +149,9 @@ handle_cast({detected, Flapping = #flapping{clientid = ClientId, ets:insert(?FLAPPING_TAB, BannedFlapping); false -> ?LOG(warning, "~s(~s) disconnected ~w times in ~wms", - [ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Interval]), - ets:delete_object(?FLAPPING_TAB, Flapping) + [ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Interval]) end, + ets:delete_object(?FLAPPING_TAB, Flapping), {noreply, State}; handle_cast(Msg, State) -> diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index db42cd1e8..bf6a3e26d 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -368,7 +368,7 @@ init([]) -> Metric = #metric{name = Name, type = Type, idx = reserved_idx(Name)}, true = ets:insert(?TAB, Metric), ok = counters:put(CRef, Idx, 0) - end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?MQTT_METRICS), + end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?CHAN_METRICS ++ ?MQTT_METRICS), {ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}. handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) -> diff --git a/src/emqx_vm.erl b/src/emqx_vm.erl index ba07eee4c..2fee25633 100644 --- a/src/emqx_vm.erl +++ b/src/emqx_vm.erl @@ -47,6 +47,11 @@ , get_port_info/1 ]). +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-endif. + -export([cpu_util/0]). -define(UTIL_ALLOCATORS, [temp_alloc, diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 3be56157a..a03cbb979 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -29,24 +29,8 @@ all() -> emqx_ct:all(?MODULE). -groups() -> - [{pubsub, [sequence], - [t_sub_unsub, - t_publish, - t_pubsub, - t_shared_subscribe, - t_dispatch_with_no_sub, - 't_pubsub#', - 't_pubsub+' - ]}, - {metrics, [sequence], - [inc_dec_metric]}, - {stats, [sequence], - [set_get_stat] - }]. - init_per_suite(Config) -> - emqx_ct_helpers:boot_modules([router, broker]), + emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config. @@ -57,156 +41,173 @@ end_per_suite(_Config) -> %% PubSub Test %%-------------------------------------------------------------------- -t_sub_unsub(_) -> - ok = emqx_broker:subscribe(<<"topic">>, <<"clientId">>), - ok = emqx_broker:subscribe(<<"topic/1">>, <<"clientId">>, #{qos => 1}), - ok = emqx_broker:subscribe(<<"topic/2">>, <<"clientId">>, #{qos => 2}), - true = emqx_broker:subscribed(<<"clientId">>, <<"topic">>), - Topics = emqx_broker:topics(), - lists:foreach(fun(Topic) -> - ?assert(lists:member(Topic, Topics)) - end, Topics), - ok = emqx_broker:unsubscribe(<<"topic">>), - ok = emqx_broker:unsubscribe(<<"topic/1">>), - ok = emqx_broker:unsubscribe(<<"topic/2">>). +t_subscribed(_) -> + emqx_broker:subscribe(<<"topic">>), + ?assertEqual(false, emqx_broker:subscribed(undefined, <<"topic">>)), + ?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>)), + emqx_broker:unsubscribe(<<"topic">>). -t_publish(_) -> - Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>), - ok = emqx_broker:subscribe(<<"test/+">>), - timer:sleep(10), - emqx_broker:publish(Msg), - ?assert(receive {deliver, <<"test/+">>, #message{payload = <<"hello">>}} -> true after 100 -> false end). +t_subscribed_2(_) -> + emqx_broker:subscribe(<<"topic">>, <<"clientid">>), + ?assertEqual(true, emqx_broker:subscribed(<<"clientid">>, <<"topic">>)), + ?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>)), + emqx_broker:unsubscribe(<<"topic">>). -t_dispatch_with_no_sub(_) -> - Msg = emqx_message:make(ct, <<"no_subscribers">>, <<"hello">>), - Delivery = #delivery{sender = self(), message = Msg}, - ?assertEqual([{node(),<<"no_subscribers">>,{error,no_subscribers}}], - emqx_broker:route([{<<"no_subscribers">>, node()}], Delivery)). +t_subopts(_) -> + ?assertEqual(false, emqx_broker:set_subopts(<<"topic">>, #{qos => 1})), + ?assertEqual(undefined, emqx_broker:get_subopts(self(), <<"topic">>)), + ?assertEqual(undefined, emqx_broker:get_subopts(<<"clientid">>, <<"topic">>)), + emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}), + ?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)), + ?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(<<"clientid">>,<<"topic">>)), + emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 2}), + ?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)), + ?assertEqual(true, emqx_broker:set_subopts(<<"topic">>, #{qos => 2})), + ?assertEqual(#{qos => 2, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)), + emqx_broker:unsubscribe(<<"topic">>). -t_pubsub(_) -> - true = emqx:is_running(node()), - Self = self(), - Subscriber = <<"clientId">>, - ok = emqx_broker:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 1 }), - #{qos := 1} = ets:lookup_element(emqx_suboption, {Self, <<"a/b/c">>}, 2), - #{qos := 1} = emqx_broker:get_subopts(Subscriber, <<"a/b/c">>), - true = emqx_broker:set_subopts(<<"a/b/c">>, #{qos => 0}), - #{qos := 0} = emqx_broker:get_subopts(Subscriber, <<"a/b/c">>), - ok = emqx_broker:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 2 }), - %% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]), - timer:sleep(10), - [Self] = emqx_broker:subscribers(<<"a/b/c">>), - emqx_broker:publish( - emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)), +t_topics(_) -> + Topics = [<<"topic">>, <<"topic/1">>, <<"topic/2">>], + ok = emqx_broker:subscribe(lists:nth(1, Topics), <<"clientId">>), + ok = emqx_broker:subscribe(lists:nth(2, Topics), <<"clientId">>), + ok = emqx_broker:subscribe(lists:nth(3, Topics), <<"clientId">>), + Topics1 = emqx_broker:topics(), + ?assertEqual(true, lists:foldl(fun(Topic, Acc) -> + case lists:member(Topic, Topics1) of + true -> Acc; + false -> false + end + end, true, Topics)), + emqx_broker:unsubscribe(lists:nth(1, Topics)), + emqx_broker:unsubscribe(lists:nth(2, Topics)), + emqx_broker:unsubscribe(lists:nth(3, Topics)). + +t_subscribers(_) -> + emqx_broker:subscribe(<<"topic">>, <<"clientid">>), + ?assertEqual([self()], emqx_broker:subscribers(<<"topic">>)), + emqx_broker:unsubscribe(<<"topic">>). + +t_subscriptions(_) -> + emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}), + ?assertEqual(#{qos => 1, subid => <<"clientid">>}, + proplists:get_value(<<"topic">>, emqx_broker:subscriptions(self()))), + ?assertEqual(#{qos => 1, subid => <<"clientid">>}, + proplists:get_value(<<"topic">>, emqx_broker:subscriptions(<<"clientid">>))), + emqx_broker:unsubscribe(<<"topic">>). + +t_sub_pub(_) -> + ok = emqx_broker:subscribe(<<"topic">>), + ct:sleep(10), + emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)), ?assert( - receive {deliver, <<"a/b/c">>, _ } -> + receive + {deliver, <<"topic">>, #message{payload = <<"hello">>}} -> true; - P -> - ct:log("Receive Message: ~p~n",[P]) + _ -> + false + after 100 -> + false + end). + +t_nosub_pub(_) -> + ?assertEqual(0, emqx_metrics:val('messages.dropped')), + emqx_broker:publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)), + ?assertEqual(1, emqx_metrics:val('messages.dropped')). + +t_shared_subscribe(_) -> + emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{share => <<"group">>}), + ct:sleep(10), + emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)), + ?assert(receive + {deliver, <<"topic">>, #message{payload = <<"hello">>}} -> + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end), + emqx_broker:unsubscribe(<<"$share/group/topic">>). + +t_shared_subscribe_2(_) -> + {ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]), + {ok, _} = emqtt:connect(ConnPid), + {ok, _, [0]} = emqtt:subscribe(ConnPid, <<"$share/group/topic">>, 0), + + {ok, ConnPid2} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid2">>}]), + {ok, _} = emqtt:connect(ConnPid2), + {ok, _, [0]} = emqtt:subscribe(ConnPid2, <<"$share/group2/topic">>, 0), + + ct:sleep(10), + ok = emqtt:publish(ConnPid, <<"topic">>, <<"hello">>, 0), + Msgs = recv_msgs(2), + ?assertEqual(2, length(Msgs)), + ?assertEqual(true, lists:foldl(fun(#{payload := <<"hello">>, topic := <<"topic">>}, Acc) -> + Acc; + (_, _) -> + false + end, true, Msgs)), + emqtt:disconnect(ConnPid), + emqtt:disconnect(ConnPid2). + +t_shared_subscribe_3(_) -> + {ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]), + {ok, _} = emqtt:connect(ConnPid), + {ok, _, [0]} = emqtt:subscribe(ConnPid, <<"$share/group/topic">>, 0), + + {ok, ConnPid2} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid2">>}]), + {ok, _} = emqtt:connect(ConnPid2), + {ok, _, [0]} = emqtt:subscribe(ConnPid2, <<"$share/group/topic">>, 0), + + ct:sleep(10), + ok = emqtt:publish(ConnPid, <<"topic">>, <<"hello">>, 0), + Msgs = recv_msgs(2), + ?assertEqual(1, length(Msgs)), + emqtt:disconnect(ConnPid), + emqtt:disconnect(ConnPid2). + +t_shard(_) -> + ok = meck:new(emqx_broker_helper, [passthrough, no_history]), + ok = meck:expect(emqx_broker_helper, get_sub_shard, fun(_, _) -> 1 end), + emqx_broker:subscribe(<<"topic">>, <<"clientid">>), + ct:sleep(10), + emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)), + ?assert( + receive + {deliver, <<"topic">>, #message{payload = <<"hello">>}} -> + true; + _ -> + false after 100 -> false end), - spawn(fun() -> - emqx_broker:subscribe(<<"a/b/c">>), - emqx_broker:subscribe(<<"c/d/e">>), - timer:sleep(10), - emqx_broker:unsubscribe(<<"a/b/c">>) - end), - timer:sleep(20), - emqx_broker:unsubscribe(<<"a/b/c">>). - -t_shared_subscribe(_) -> - emqx_broker:subscribe(<<"$share/group2/topic2">>), - emqx_broker:subscribe(<<"$queue/topic3">>), - timer:sleep(10), - ct:pal("Share subscriptions: ~p", - [emqx_broker:subscriptions(self())]), - ?assertEqual(2, length(emqx_broker:subscriptions(self()))), - emqx_broker:unsubscribe(<<"$share/group2/topic2">>), - emqx_broker:unsubscribe(<<"$queue/topic3">>), - ?assertEqual(0, length(emqx_broker:subscriptions(self()))). - -'t_pubsub#'(_) -> - emqx_broker:subscribe(<<"a/#">>), - timer:sleep(10), - emqx_broker:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)), - ?assert(receive {deliver, <<"a/#">>, _} -> true after 100 -> false end), - emqx_broker:unsubscribe(<<"a/#">>). - -'t_pubsub+'(_) -> - emqx_broker:subscribe(<<"a/+/+">>), - timer:sleep(10), %% TODO: why sleep? - emqx_broker:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)), - ?assert(receive {deliver, <<"a/+/+">>, _} -> true after 100 -> false end), - emqx_broker:unsubscribe(<<"a/+/+">>). - -%%-------------------------------------------------------------------- -%% Metric Group -%%-------------------------------------------------------------------- - -inc_dec_metric(_) -> - emqx_metrics:inc('messages.retained', 10), - emqx_metrics:dec('messages.retained', 10). - -%%-------------------------------------------------------------------- -%% Stats Group -%%-------------------------------------------------------------------- - -set_get_stat(_) -> - emqx_stats:setstat('retained.max', 99), - ?assertEqual(99, emqx_stats:getstat('retained.max')). - - -t_dispatch(_) -> - error('TODO'). - -t_subscriber_down(_) -> - error('TODO'). - -t_get_subopts(_) -> - error('TODO'). - -t_set_subopts(_) -> - error('TODO'). - -t_topics(_) -> - error('TODO'). + ok = meck:unload(emqx_broker_helper). t_stats_fun(_) -> - error('TODO'). + ?assertEqual(0, emqx_stats:getstat('subscribers.count')), + ?assertEqual(0, emqx_stats:getstat('subscriptions.count')), + ?assertEqual(0, emqx_stats:getstat('suboptions.count')), + ok = emqx_broker:subscribe(<<"topic">>, <<"clientid">>), + ok = emqx_broker:subscribe(<<"topic2">>, <<"clientid">>), + emqx_broker:stats_fun(), + ct:sleep(10), + ?assertEqual(2, emqx_stats:getstat('subscribers.count')), + ?assertEqual(2, emqx_stats:getstat('subscribers.max')), + ?assertEqual(2, emqx_stats:getstat('subscriptions.count')), + ?assertEqual(2, emqx_stats:getstat('subscriptions.max')), + ?assertEqual(2, emqx_stats:getstat('suboptions.count')), + ?assertEqual(2, emqx_stats:getstat('suboptions.max')). -t_init(_) -> - error('TODO'). +recv_msgs(Count) -> + recv_msgs(Count, []). -t_handle_call(_) -> - error('TODO'). - -t_handle_cast(_) -> - error('TODO'). - -t_handle_info(_) -> - error('TODO'). - -t_terminate(_) -> - error('TODO'). - -t_code_change(_) -> - error('TODO'). - -t_safe_publish(_) -> - error('TODO'). - -t_subscribed(_) -> - error('TODO'). - -t_subscriptions(_) -> - error('TODO'). - -t_subscribers(_) -> - error('TODO'). - -t_unsubscribe(_) -> - error('TODO'). - -t_subscribe(_) -> - error('TODO'). +recv_msgs(0, Msgs) -> + Msgs; +recv_msgs(Count, Msgs) -> + receive + {publish, Msg} -> + recv_msgs(Count-1, [Msg|Msgs]); + _Other -> recv_msgs(Count, Msgs) + after 100 -> + Msgs + end. \ No newline at end of file diff --git a/test/emqx_broker_helper_SUITE.erl b/test/emqx_broker_helper_SUITE.erl index e027bb1ef..4e287f341 100644 --- a/test/emqx_broker_helper_SUITE.erl +++ b/test/emqx_broker_helper_SUITE.erl @@ -23,48 +23,52 @@ all() -> emqx_ct:all(?MODULE). +init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + init_per_testcase(_TestCase, Config) -> Config. end_per_testcase(_TestCase, Config) -> Config. -t_start_link(_) -> - error('TODO'). - t_lookup_subid(_) -> - error('TODO'). - -t_create_seq(_) -> - error('TODO'). - -t_init(_) -> - error('TODO'). - -t_handle_call(_) -> - error('TODO'). - -t_handle_cast(_) -> - error('TODO'). - -t_handle_info(_) -> - error('TODO'). - -t_terminate(_) -> - error('TODO'). - -t_code_change(_) -> - error('TODO'). + ?assertEqual(undefined, emqx_broker_helper:lookup_subid(self())), + emqx_broker_helper:register_sub(self(), <<"clientid">>), + ct:sleep(10), + ?assertEqual(<<"clientid">>, emqx_broker_helper:lookup_subid(self())). t_lookup_subpid(_) -> - error('TODO'). - -t_reclaim_seq(_) -> - error('TODO'). - -t_get_sub_shard(_) -> - error('TODO'). - + ?assertEqual(undefined, emqx_broker_helper:lookup_subpid(<<"clientid">>)), + emqx_broker_helper:register_sub(self(), <<"clientid">>), + ct:sleep(10), + ?assertEqual(self(), emqx_broker_helper:lookup_subpid(<<"clientid">>)). + t_register_sub(_) -> - error('TODO'). + ok = emqx_broker_helper:register_sub(self(), <<"clientid">>), + ct:sleep(10), + ok = emqx_broker_helper:register_sub(self(), <<"clientid">>), + try emqx_broker_helper:register_sub(self(), <<"clientid2">>) of + _ -> ct:fail(should_throw_error) + catch error:Reason -> + ?assertEqual(Reason, subid_conflict) + end, + ?assertEqual(self(), emqx_broker_helper:lookup_subpid(<<"clientid">>)). +t_shard_seq(_) -> + ?assertEqual([], ets:lookup(emqx_subseq, <<"topic">>)), + emqx_broker_helper:create_seq(<<"topic">>), + ?assertEqual([{<<"topic">>, 1}], ets:lookup(emqx_subseq, <<"topic">>)), + emqx_broker_helper:reclaim_seq(<<"topic">>), + ?assertEqual([], ets:lookup(emqx_subseq, <<"topic">>)). + +t_shards_num(_) -> + ?assertEqual(emqx_vm:schedulers() * 32, emqx_broker_helper:shards_num()). + +t_get_sub_shard(_) -> + ?assertEqual(0, emqx_broker_helper:get_sub_shard(self(), <<"topic">>)). diff --git a/test/emqx_cm_registry_SUITE.erl b/test/emqx_cm_registry_SUITE.erl index ac0988c7f..56f539eba 100644 --- a/test/emqx_cm_registry_SUITE.erl +++ b/test/emqx_cm_registry_SUITE.erl @@ -21,44 +21,57 @@ -include_lib("eunit/include/eunit.hrl"). +%%-------------------------------------------------------------------- +%% CT callbacks +%%-------------------------------------------------------------------- + all() -> emqx_ct:all(?MODULE). +init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + init_per_testcase(_TestCase, Config) -> Config. end_per_testcase(_TestCase, Config) -> Config. -t_start_link(_) -> - error('TODO'). - -t_init(_) -> - error('TODO'). - -t_handle_call(_) -> - error('TODO'). - -t_handle_cast(_) -> - error('TODO'). - -t_handle_info(_) -> - error('TODO'). - -t_terminate(_) -> - error('TODO'). - -t_code_change(_) -> - error('TODO'). - -t_lookup_channels(_) -> - error('TODO'). - t_is_enabled(_) -> - error('TODO'). + application:set_env(emqx, enable_channel_registry, false), + ?assertEqual(false, emqx_cm_registry:is_enabled()), + application:set_env(emqx, enable_channel_registry, true), + ?assertEqual(true, emqx_cm_registry:is_enabled()). -t_unregister_channel(_) -> - error('TODO'). +t_register_unregister_channel(_) -> + ClientId = <<"clientid">>, + application:set_env(emqx, enable_channel_registry, false), + emqx_cm_registry:register_channel(ClientId), + ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)), -t_register_channel(_) -> - error('TODO'). + application:set_env(emqx, enable_channel_registry, true), + emqx_cm_registry:register_channel(ClientId), + ?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)), + application:set_env(emqx, enable_channel_registry, false), + emqx_cm_registry:unregister_channel(ClientId), + ?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)), + + application:set_env(emqx, enable_channel_registry, true), + emqx_cm_registry:unregister_channel(ClientId), + ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)). + +t_cleanup_channels(_) -> + ClientId = <<"clientid">>, + ClientId2 = <<"clientid2">>, + emqx_cm_registry:register_channel(ClientId), + emqx_cm_registry:register_channel(ClientId2), + ?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)), + emqx_cm_registry ! {membership, {mnesia, down, node()}}, + ct:sleep(100), + ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)), + ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId2)). diff --git a/test/emqx_metrics_SUITE.erl b/test/emqx_metrics_SUITE.erl index 2b704d085..a9445958b 100644 --- a/test/emqx_metrics_SUITE.erl +++ b/test/emqx_metrics_SUITE.erl @@ -24,24 +24,10 @@ all() -> emqx_ct:all(?MODULE). -t_val(_) -> - error('TODO'). - -t_dec(_) -> - error('TODO'). - -t_set(_) -> - error('TODO'). - -t_commit(_) -> - error('TODO'). - -t_inc(_) -> - error('TODO'). - t_new(_) -> with_metrics_server( fun() -> + ok = emqx_metrics:new('metrics.test'), ok = emqx_metrics:new('metrics.test'), 0 = emqx_metrics:val('metrics.test'), ok = emqx_metrics:inc('metrics.test'), @@ -86,16 +72,70 @@ t_inc_recv(_) -> with_metrics_server( fun() -> ok = emqx_metrics:inc_recv(?PACKET(?CONNECT)), - ?assertEqual(1, emqx_metrics:val('packets.received')), - ?assertEqual(1, emqx_metrics:val('packets.connect.received')) + ok = emqx_metrics:inc_recv(?PUBLISH_PACKET(0, 0)), + ok = emqx_metrics:inc_recv(?PUBLISH_PACKET(1, 0)), + ok = emqx_metrics:inc_recv(?PUBLISH_PACKET(2, 0)), + ok = emqx_metrics:inc_recv(?PUBLISH_PACKET(3, 0)), + ok = emqx_metrics:inc_recv(?PACKET(?PUBACK)), + ok = emqx_metrics:inc_recv(?PACKET(?PUBREC)), + ok = emqx_metrics:inc_recv(?PACKET(?PUBREL)), + ok = emqx_metrics:inc_recv(?PACKET(?PUBCOMP)), + ok = emqx_metrics:inc_recv(?PACKET(?SUBSCRIBE)), + ok = emqx_metrics:inc_recv(?PACKET(?UNSUBSCRIBE)), + ok = emqx_metrics:inc_recv(?PACKET(?PINGREQ)), + ok = emqx_metrics:inc_recv(?PACKET(?DISCONNECT)), + ok = emqx_metrics:inc_recv(?PACKET(?AUTH)), + ignore = emqx_metrics:inc_recv(?PACKET(?RESERVED)), + ?assertEqual(15, emqx_metrics:val('packets.received')), + ?assertEqual(1, emqx_metrics:val('packets.connect.received')), + ?assertEqual(4, emqx_metrics:val('messages.received')), + ?assertEqual(1, emqx_metrics:val('messages.qos0.received')), + ?assertEqual(1, emqx_metrics:val('messages.qos1.received')), + ?assertEqual(1, emqx_metrics:val('messages.qos2.received')), + ?assertEqual(4, emqx_metrics:val('packets.publish.received')), + ?assertEqual(1, emqx_metrics:val('packets.puback.received')), + ?assertEqual(1, emqx_metrics:val('packets.pubrec.received')), + ?assertEqual(1, emqx_metrics:val('packets.pubrel.received')), + ?assertEqual(1, emqx_metrics:val('packets.pubcomp.received')), + ?assertEqual(1, emqx_metrics:val('packets.subscribe.received')), + ?assertEqual(1, emqx_metrics:val('packets.unsubscribe.received')), + ?assertEqual(1, emqx_metrics:val('packets.pingreq.received')), + ?assertEqual(1, emqx_metrics:val('packets.disconnect.received')), + ?assertEqual(1, emqx_metrics:val('packets.auth.received')) end). t_inc_sent(_) -> with_metrics_server( fun() -> ok = emqx_metrics:inc_sent(?CONNACK_PACKET(0)), - ?assertEqual(1, emqx_metrics:val('packets.sent')), - ?assertEqual(1, emqx_metrics:val('packets.connack.sent')) + ok = emqx_metrics:inc_sent(?PUBLISH_PACKET(0, 0)), + ok = emqx_metrics:inc_sent(?PUBLISH_PACKET(1, 0)), + ok = emqx_metrics:inc_sent(?PUBLISH_PACKET(2, 0)), + ok = emqx_metrics:inc_sent(?PUBACK_PACKET(0, 0)), + ok = emqx_metrics:inc_sent(?PUBREC_PACKET(3, 0)), + ok = emqx_metrics:inc_sent(?PACKET(?PUBREL)), + ok = emqx_metrics:inc_sent(?PACKET(?PUBCOMP)), + ok = emqx_metrics:inc_sent(?PACKET(?SUBACK)), + ok = emqx_metrics:inc_sent(?PACKET(?UNSUBACK)), + ok = emqx_metrics:inc_sent(?PACKET(?PINGRESP)), + ok = emqx_metrics:inc_sent(?PACKET(?DISCONNECT)), + ok = emqx_metrics:inc_sent(?PACKET(?AUTH)), + ?assertEqual(13, emqx_metrics:val('packets.sent')), + ?assertEqual(1, emqx_metrics:val('packets.connack.sent')), + ?assertEqual(3, emqx_metrics:val('messages.sent')), + ?assertEqual(1, emqx_metrics:val('messages.qos0.sent')), + ?assertEqual(1, emqx_metrics:val('messages.qos1.sent')), + ?assertEqual(1, emqx_metrics:val('messages.qos2.sent')), + ?assertEqual(3, emqx_metrics:val('packets.publish.sent')), + ?assertEqual(1, emqx_metrics:val('packets.puback.sent')), + ?assertEqual(1, emqx_metrics:val('packets.pubrec.sent')), + ?assertEqual(1, emqx_metrics:val('packets.pubrel.sent')), + ?assertEqual(1, emqx_metrics:val('packets.pubcomp.sent')), + ?assertEqual(1, emqx_metrics:val('packets.suback.sent')), + ?assertEqual(1, emqx_metrics:val('packets.unsuback.sent')), + ?assertEqual(1, emqx_metrics:val('packets.pingresp.sent')), + ?assertEqual(1, emqx_metrics:val('packets.disconnect.sent')), + ?assertEqual(1, emqx_metrics:val('packets.auth.sent')) end). t_trans(_) -> diff --git a/test/emqx_misc_SUITE.erl b/test/emqx_misc_SUITE.erl index 677b25c17..98bb7b072 100644 --- a/test/emqx_misc_SUITE.erl +++ b/test/emqx_misc_SUITE.erl @@ -64,7 +64,10 @@ t_pipeline(_) -> fun(_I, St) -> {ok, St+1} end, fun(I, St) -> {ok, I+1, St+1} end, fun(I, St) -> {ok, I*2, St*2} end], - ?assertEqual({ok, 4, 6}, emqx_misc:pipeline(Funs, 1, 1)). + ?assertEqual({ok, 4, 6}, emqx_misc:pipeline(Funs, 1, 1)), + ?assertEqual({error, undefined, 1}, emqx_misc:pipeline([fun(_I) -> {error, undefined} end], 1, 1)), + ?assertEqual({error, undefined, 2}, emqx_misc:pipeline([fun(_I, _St) -> {error, undefined, 2} end], 1, 1)), + ?assertEqual({error, undefined, 1}, emqx_misc:pipeline([fun(_I, _St) -> erlang:error(undefined) end], 1, 1)). t_start_timer(_) -> TRef = emqx_misc:start_timer(1, tmsg), @@ -75,7 +78,8 @@ t_start_timer(_) -> t_cancel_timer(_) -> Timer = emqx_misc:start_timer(0, foo), ok = emqx_misc:cancel_timer(Timer), - ?assertEqual([], drain()). + ?assertEqual([], drain()), + ok = emqx_misc:cancel_timer(undefined). t_proc_name(_) -> ?assertEqual(emqx_pool_1, emqx_misc:proc_name(emqx_pool, 1)). @@ -84,7 +88,11 @@ t_proc_stats(_) -> Pid1 = spawn(fun() -> exit(normal) end), timer:sleep(10), ?assertEqual([], emqx_misc:proc_stats(Pid1)), - Pid2 = spawn(fun() -> timer:sleep(100) end), + Pid2 = spawn(fun() -> + ?assertMatch([{mailbox_len, 0}|_], emqx_misc:proc_stats()), + timer:sleep(200) + end), + timer:sleep(10), Pid2 ! msg, timer:sleep(10), ?assertMatch([{mailbox_len, 1}|_], emqx_misc:proc_stats(Pid2)). @@ -100,7 +108,16 @@ t_drain_down(_) -> {Pid1, _Ref1} = erlang:spawn_monitor(fun() -> ok end), {Pid2, _Ref2} = erlang:spawn_monitor(fun() -> ok end), timer:sleep(100), - ?assertEqual([Pid1, Pid2], emqx_misc:drain_down(2)). + ?assertEqual([Pid1, Pid2], emqx_misc:drain_down(2)), + ?assertEqual([], emqx_misc:drain_down(1)). + +t_index_of(_) -> + try emqx_misc:index_of(a, []) of + _ -> ct:fail(should_throw_error) + catch error:Reason -> + ?assertEqual(badarg, Reason) + end, + ?assertEqual(3, emqx_misc:index_of(a, [b, c, a, e, f])). drain() -> drain([]). diff --git a/test/emqx_pqueue_SUITE.erl b/test/emqx_pqueue_SUITE.erl index 377172b18..226cf5423 100644 --- a/test/emqx_pqueue_SUITE.erl +++ b/test/emqx_pqueue_SUITE.erl @@ -26,142 +26,152 @@ all() -> emqx_ct:all(?SUITE). - t_is_queue(_) -> - error('TODO'). + Q = ?PQ:new(), + ?assertEqual(true, ?PQ:is_queue(Q)), + Q1 = ?PQ:in(a, 1, Q), + ?assertEqual(true, ?PQ:is_queue(Q1)), + ?assertEqual(false, ?PQ:is_queue(bad_queue)). t_is_empty(_) -> - error('TODO'). - -t_to_list(_) -> - error('TODO'). - -t_from_list(_) -> - error('TODO'). - -t_in(_) -> - error('TODO'). - -t_out_p(_) -> - error('TODO'). - -t_join(_) -> - error('TODO'). - -t_filter(_) -> - error('TODO'). - -t_fold(_) -> - error('TODO'). - -t_highest(_) -> - error('TODO'). - -t_out(_) -> - error('TODO'). + Q = ?PQ:new(), + ?assertEqual(true, ?PQ:is_empty(Q)), + ?assertEqual(false, ?PQ:is_empty(?PQ:in(a, Q))). t_len(_) -> - error('TODO'). + Q = ?PQ:new(), + Q1 = ?PQ:in(a, Q), + ?assertEqual(1, ?PQ:len(Q1)), + Q2 = ?PQ:in(b, 1, Q1), + ?assertEqual(2, ?PQ:len(Q2)). t_plen(_) -> - error('TODO'). - -t_new(_) -> - error('TODO'). - -t_priority_queue_plen(_) -> Q = ?PQ:new(), - 0 = ?PQ:plen(0, Q), - Q0 = ?PQ:in(z, Q), - 1 = ?PQ:plen(0, Q0), - Q1 = ?PQ:in(x, 1, Q0), - 1 = ?PQ:plen(1, Q1), - Q2 = ?PQ:in(y, 2, Q1), - 1 = ?PQ:plen(2, Q2), - Q3 = ?PQ:in(z, 2, Q2), - 2 = ?PQ:plen(2, Q3), - {_, Q4} = ?PQ:out(1, Q3), - 0 = ?PQ:plen(1, Q4), - {_, Q5} = ?PQ:out(Q4), - 1 = ?PQ:plen(2, Q5), - {_, Q6} = ?PQ:out(Q5), - 0 = ?PQ:plen(2, Q6), - 1 = ?PQ:len(Q6), - {_, Q7} = ?PQ:out(Q6), - 0 = ?PQ:len(Q7). + Q1 = ?PQ:in(a, Q), + ?assertEqual(1, ?PQ:plen(0, Q1)), + ?assertEqual(0, ?PQ:plen(1, Q1)), + Q2 = ?PQ:in(b, 1, Q1), + Q3 = ?PQ:in(c, 1, Q2), + ?assertEqual(2, ?PQ:plen(1, Q3)), + ?assertEqual(1, ?PQ:plen(0, Q3)), + ?assertEqual(0, ?PQ:plen(0, {pqueue, []})). -t_priority_queue_out2(_) -> - Els = [a, {b, 1}, {c, 1}, {d, 2}, {e, 2}, {f, 2}], - Q = ?PQ:new(), - Q0 = lists:foldl( +t_to_list(_) -> + Q = ?PQ:new(), + ?assertEqual([], ?PQ:to_list(Q)), + + Q1 = ?PQ:in(a, Q), + L1 = ?PQ:to_list(Q1), + ?assertEqual([{0, a}], L1), + + Q2 = ?PQ:in(b, 1, Q1), + L2 = ?PQ:to_list(Q2), + ?assertEqual([{1, b}, {0, a}], L2). + +t_from_list(_) -> + Q = ?PQ:from_list([{1, c}, {1, d}, {0, a}, {0, b}]), + ?assertEqual({pqueue, [{-1, {queue, [d], [c], 2}}, {0, {queue, [b], [a], 2}}]}, Q), + ?assertEqual(true, ?PQ:is_queue(Q)), + ?assertEqual(4, ?PQ:len(Q)). + +t_in(_) -> + Q = ?PQ:new(), + Els = [a, b, {c, 1}, {d, 1}, {e, infinity}, {f, 2}], + Q1 = lists:foldl( fun({El, P}, Acc) -> ?PQ:in(El, P, Acc); (El, Acc) -> ?PQ:in(El, Acc) end, Q, Els), - {Val, Q1} = ?PQ:out(Q0), - {value, d} = Val, - {Val1, Q2} = ?PQ:out(2, Q1), - {value, e} = Val1, - {Val2, Q3} = ?PQ:out(1, Q2), - {value, b} = Val2, - {Val3, Q4} = ?PQ:out(Q3), - {value, f} = Val3, - {Val4, Q5} = ?PQ:out(Q4), - {value, c} = Val4, - {Val5, Q6} = ?PQ:out(Q5), - {value, a} = Val5, - {empty, _Q7} = ?PQ:out(Q6). + ?assertEqual({pqueue, [{infinity, {queue, [e], [], 1}}, + {-2, {queue, [f], [], 1}}, + {-1, {queue, [d], [c], 2}}, + {0, {queue, [b], [a], 2}}]}, Q1). -t_priority_queues(_) -> - Q0 = ?PQ:new(), - Q1 = ?PQ:new(), - PQueue = {pqueue, [{0, Q0}, {1, Q1}]}, - ?assert(?PQ:is_queue(PQueue)), - [] = ?PQ:to_list(PQueue), +t_out(_) -> + Q = ?PQ:new(), + {empty, Q} = ?PQ:out(Q), + {empty, Q} = ?PQ:out(0, Q), + try ?PQ:out(1, Q) of + _ -> ct:fail(should_throw_error) + catch error:Reason -> + ?assertEqual(Reason, badarg) + end, + {{value, a}, Q} = ?PQ:out(?PQ:from_list([{0, a}])), + {{value, a}, {queue, [], [b], 1}} = ?PQ:out(?PQ:from_list([{0, a}, {0, b}])), + {{value, a}, {queue, [], [], 0}} = ?PQ:out({queue, [], [a], 1}), + {{value, a}, {queue, [c], [b], 2}} = ?PQ:out({queue, [c, b], [a], 3}), + {{value, a}, {queue, [e, d], [b, c], 4}} = ?PQ:out({queue, [e, d, c, b], [a], 5}), + {{value, a}, {queue, [c], [b], 2}} = ?PQ:out({queue, [c, b, a], [], 3}), + {{value, a}, {queue, [d, c], [b], 3}} = ?PQ:out({queue, [d, c], [a, b], 4}), + {{value, a}, {queue, [], [], 0}} = ?PQ:out(?PQ:from_list([{1, a}])), + {{value, a}, {queue, [c], [b], 2}} = ?PQ:out(?PQ:from_list([{1, a}, {0, b}, {0, c}])), + {{value, a}, {pqueue, [{-1, {queue, [b], [], 1}}]}} = ?PQ:out(?PQ:from_list([{1, b}, {2, a}])), + {{value, a}, {pqueue, [{-1, {queue, [], [b], 1}}]}} = ?PQ:out(?PQ:from_list([{1, a}, {1, b}])). - PQueue1 = ?PQ:in(a, 0, ?PQ:new()), - PQueue2 = ?PQ:in(b, 0, PQueue1), +t_out_2(_) -> + {empty, {pqueue, [{-1, {queue, [a], [], 1}}]}} = ?PQ:out(0, ?PQ:from_list([{1, a}])), + {{value, a}, {queue, [], [], 0}} = ?PQ:out(1, ?PQ:from_list([{1, a}])), + {{value, a}, {pqueue, [{-1, {queue, [], [b], 1}}]}} = ?PQ:out(1, ?PQ:from_list([{1, a}, {1, b}])), + {{value, a}, {queue, [b], [], 1}} = ?PQ:out(1, ?PQ:from_list([{1, a}, {0, b}])). - PQueue3 = ?PQ:in(c, 1, PQueue2), - PQueue4 = ?PQ:in(d, 1, PQueue3), +t_out_p(_) -> + {empty, {queue, [], [], 0}} = ?PQ:out_p(?PQ:new()), + {{value, a, 1}, {queue, [b], [], 1}} = ?PQ:out_p(?PQ:from_list([{1, a}, {0, b}])). - 4 = ?PQ:len(PQueue4), +t_join(_) -> + Q = ?PQ:in(a, ?PQ:new()), + Q = ?PQ:join(Q, ?PQ:new()), + Q = ?PQ:join(?PQ:new(), Q), - [{1, c}, {1, d}, {0, a}, {0, b}] = ?PQ:to_list(PQueue4), - PQueue4 = ?PQ:from_list([{1, c}, {1, d}, {0, a}, {0, b}]), + Q1 = ?PQ:in(a, ?PQ:new()), + Q2 = ?PQ:in(b, Q1), + Q3 = ?PQ:in(c, Q2), + {queue,[c,b],[a],3} = Q3, + Q4 = ?PQ:in(x, ?PQ:new()), + Q5 = ?PQ:in(y, Q4), + Q6 = ?PQ:in(z, Q5), + {queue,[z,y],[x],3} = Q6, + + {queue,[z,y],[a,b,c,x],6} = ?PQ:join(Q3, Q6), + + PQueue1 = ?PQ:from_list([{1, c}, {1, d}]), + PQueue2 = ?PQ:from_list([{1, c}, {1, d}, {0, a}, {0, b}]), + PQueue3 = ?PQ:from_list([{1, c}, {1, d}, {-1, a}, {-1, b}]), + + {pqueue,[{-1,{queue,[d],[c],2}}, + {0,{queue,[z,y],[x],3}}]} = ?PQ:join(PQueue1, Q6), + {pqueue,[{-1,{queue,[d],[c],2}}, + {0,{queue,[z,y],[x],3}}]} = ?PQ:join(Q6, PQueue1), + + {pqueue,[{-1,{queue,[d],[c],2}}, + {0,{queue,[z,y],[a,b,x],5}}]} = ?PQ:join(PQueue2, Q6), + {pqueue,[{-1,{queue,[d],[c],2}}, + {0,{queue,[b],[x,y,z,a],5}}]} = ?PQ:join(Q6, PQueue2), + + {pqueue,[{-1,{queue,[d],[c],2}}, + {0,{queue,[z,y],[x],3}}, + {1,{queue,[b],[a],2}}]} = ?PQ:join(PQueue3, Q6), + {pqueue,[{-1,{queue,[d],[c],2}}, + {0,{queue,[z,y],[x],3}}, + {1,{queue,[b],[a],2}}]} = ?PQ:join(Q6, PQueue3), + + PQueue4 = ?PQ:from_list([{1, c}, {1, d}]), + PQueue5 = ?PQ:from_list([{2, a}, {2, b}]), + {pqueue,[{-2,{queue,[b],[a],2}}, + {-1,{queue,[d],[c],2}}]} = ?PQ:join(PQueue4, PQueue5). + +t_filter(_) -> + {pqueue, [{-2, {queue, [10], [4], 2}}, + {-1, {queue, [2], [], 1}}]} = + ?PQ:filter(fun(V) when V rem 2 =:= 0 -> + true; + (_) -> + false + end, ?PQ:from_list([{0, 1}, {0, 3}, {1, 2}, {2, 4}, {2, 10}])). + +t_highest(_) -> empty = ?PQ:highest(?PQ:new()), - 0 = ?PQ:highest(PQueue1), - 1 = ?PQ:highest(PQueue4), - - PQueue5 = ?PQ:in(e, infinity, PQueue4), - PQueue6 = ?PQ:in(f, 1, PQueue5), - - {{value, e}, PQueue7} = ?PQ:out(PQueue6), - {empty, _} = ?PQ:out(0, ?PQ:new()), - - {empty, Q0} = ?PQ:out_p(Q0), - - Q2 = ?PQ:in(a, Q0), - Q3 = ?PQ:in(b, Q2), - Q4 = ?PQ:in(c, Q3), - - {{value, a, 0}, _Q5} = ?PQ:out_p(Q4), - - {{value,c,1}, PQueue8} = ?PQ:out_p(PQueue7), - - Q4 = ?PQ:join(Q4, ?PQ:new()), - Q4 = ?PQ:join(?PQ:new(), Q4), - - {queue, [a], [a], 2} = ?PQ:join(Q2, Q2), - - {pqueue,[{-1,{queue,[f],[d],2}}, - {0,{queue,[a],[a,b],3}}]} = ?PQ:join(PQueue8, Q2), - - {pqueue,[{-1,{queue,[f],[d],2}}, - {0,{queue,[b],[a,a],3}}]} = ?PQ:join(Q2, PQueue8), - - {pqueue,[{-1,{queue,[f],[d,f,d],4}}, - {0,{queue,[b],[a,b,a],4}}]} = ?PQ:join(PQueue8, PQueue8). - + 0 = ?PQ:highest(?PQ:from_list([{0, a}, {0, b}])), + 2 = ?PQ:highest(?PQ:from_list([{0, a}, {0, b}, {1, c}, {2, d}, {2, e}])). \ No newline at end of file diff --git a/test/emqx_vm_SUITE.erl b/test/emqx_vm_SUITE.erl index c4532ed26..0e38c91c4 100644 --- a/test/emqx_vm_SUITE.erl +++ b/test/emqx_vm_SUITE.erl @@ -104,7 +104,8 @@ t_load(_Config) -> t_systeminfo(_Config) -> Keys = [Key || {Key, _} <- emqx_vm:get_system_info()], - ?SYSTEM_INFO = Keys. + ?SYSTEM_INFO = Keys, + ?assertEqual(undefined, emqx_vm:get_system_info(undefined)). t_mem_info(_Config) -> application:ensure_all_started(os_mon), @@ -139,10 +140,19 @@ t_get_ets_info(_Config) -> ets:new(test, [named_table]), [] = emqx_vm:get_ets_info(test1), EtsInfo = emqx_vm:get_ets_info(test), - test = proplists:get_value(name, EtsInfo). + test = proplists:get_value(name, EtsInfo), + Tid = proplists:get_value(id, EtsInfo), + EtsInfos = emqx_vm:get_ets_info(), + ?assertEqual(true, lists:foldl(fun(Info, Acc) -> + case proplists:get_value(id, Info) of + Tid -> true; + _ -> Acc + end + end, false, EtsInfos)). t_get_ets_object(_Config) -> ets:new(test, [named_table]), + [] = emqx_vm:get_ets_object(test), ets:insert(test, {k, v}), [{k, v}] = emqx_vm:get_ets_object(test). @@ -150,7 +160,20 @@ t_get_port_types(_Config) -> emqx_vm:get_port_types(). t_get_port_info(_Config) -> - emqx_vm:get_port_info(). + emqx_vm:get_port_info(), + spawn(fun easy_server/0), + ct:sleep(100), + {ok, Sock} = gen_tcp:connect("localhost", 5678, [binary, {packet, 0}]), + emqx_vm:get_port_info(), + ok = gen_tcp:close(Sock), + [Port | _] = erlang:ports(), + [{connected, _}, {name, _}] = emqx_vm:port_info(Port, [connected, name]). + +t_transform_port(_Config) -> + [Port | _] = erlang:ports(), + ?assertEqual(Port, emqx_vm:transform_port(Port)), + <<131, 102, 100, NameLen:2/unit:8, _Name:NameLen/binary, N:4/unit:8, _Vsn:8>> = erlang:term_to_binary(Port), + ?assertEqual(Port, emqx_vm:transform_port("#Port<0." ++ integer_to_list(N) ++ ">")). t_scheduler_usage(_Config) -> emqx_vm:scheduler_usage(5000). @@ -170,3 +193,21 @@ t_get_process_group_leader_info(_Config) -> t_get_process_limit(_Config) -> emqx_vm:get_process_limit(). +t_cpu_util(_Config) -> + ?assertEqual(0, emqx_vm:cpu_util()). + +easy_server() -> + {ok, LSock} = gen_tcp:listen(5678, [binary, {packet, 0}, {active, false}]), + {ok, Sock} = gen_tcp:accept(LSock), + ok = do_recv(Sock), + ok = gen_tcp:close(Sock), + ok = gen_tcp:close(LSock). + +do_recv(Sock) -> + case gen_tcp:recv(Sock, 0) of + {ok, _} -> + do_recv(Sock); + {error, closed} -> + ok + end. +