From 492d224728ecee85393255a29fd29c28e4118c84 Mon Sep 17 00:00:00 2001 From: Rory Z Date: Fri, 17 Jul 2020 18:25:46 +0800 Subject: [PATCH] Auto-pull-request-on-2020-07-17 (#3600) * refactor(channel): skip the ACL checking for inner subscribe * fix(props): fix the prop_emqx_sys results of judgment * Update esockd to 5.7.1 * test(topic-metrics): add test cases for topic metrics * perf(emqx_vm): make emqx_vm:get_memory/0 more efficiency --- rebar.config | 2 +- src/emqx_channel.erl | 87 +++++++------------------ src/emqx_mod_delayed.erl | 2 +- src/emqx_mod_topic_metrics.erl | 63 +++++++++--------- src/emqx_vm.erl | 38 +++++++---- test/emqx_mod_topic_metrics_SUITE.erl | 92 +++++++++++++++++++++++++++ test/mqtt_protocol_v5_SUITE.erl | 28 ++++++-- test/props/prop_emqx_sys.erl | 3 +- 8 files changed, 202 insertions(+), 113 deletions(-) create mode 100644 test/emqx_mod_topic_metrics_SUITE.erl diff --git a/rebar.config b/rebar.config index 76c927a78..1ac3d4c9a 100644 --- a/rebar.config +++ b/rebar.config @@ -6,7 +6,7 @@ [{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.0"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.1"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.3"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}}, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index a5d02734b..184bfa2fc 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -457,7 +457,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), case QoS of ?QOS_0 -> {ok, NChannel}; _ -> - handle_out(puback, {PacketId, ReasonCode}, NChannel) + handle_out(puback, {PacketId, ReasonCode}, NChannel) end; disconnect -> handle_out(disconnect, ReasonCode, NChannel) @@ -535,38 +535,26 @@ process_subscribe([], _SubProps, Channel, Acc) -> {lists:reverse(Acc), Channel}; process_subscribe([{TopicFilter, SubOpts}|More], SubProps, Channel, Acc) -> - {RC, NChannel} = do_subscribe(TopicFilter, SubOpts#{sub_props => SubProps}, Channel), - process_subscribe(More, SubProps, NChannel, [RC|Acc]). + case check_subscribe(TopicFilter, SubOpts, Channel) of + ok -> + {RC, NChannel} = do_subscribe(TopicFilter, SubOpts#{sub_props => SubProps}, Channel), + process_subscribe(More, SubProps, NChannel, [RC|Acc]); + {error, RC} -> + process_subscribe(More, SubProps, Channel, [RC|Acc]) + end. do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel = #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}, session = Session}) -> - case check_subscribe(TopicFilter, SubOpts, Channel) of - ok -> TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter), - SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel), - case emqx_session:subscribe(ClientInfo, TopicFilter1, SubOpts1, Session) of - {ok, NSession} -> - {QoS, Channel#channel{session = NSession}}; - {error, RC} -> {RC, Channel} - end; - {error, RC} -> {RC, Channel} + NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter), + NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel), + case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of + {ok, NSession} -> + {QoS, Channel#channel{session = NSession}}; + {error, RC} -> + {RC, Channel} end. --compile({inline, [process_force_subscribe/2]}). -process_force_subscribe(Subscriptions, Channel = - #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}, - session = Session}) -> - lists:foldl(fun({TopicFilter, SubOpts = #{qos := QoS}}, {ReasonCodes, ChannelAcc}) -> - NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter), - NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), ChannelAcc), - case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of - {ok, NSession} -> - {ReasonCodes ++ [QoS], ChannelAcc#channel{session = NSession}}; - {error, ReasonCode} -> - {ReasonCodes ++ [ReasonCode], ChannelAcc} - end - end, {[], Channel}, Subscriptions). - %%-------------------------------------------------------------------- %% Process Unsubscribe %%-------------------------------------------------------------------- @@ -591,21 +579,6 @@ do_unsubscribe(TopicFilter, SubOpts, Channel = {?RC_SUCCESS, Channel#channel{session = NSession}}; {error, RC} -> {RC, Channel} end. - --compile({inline, [process_force_unsubscribe/2]}). -process_force_unsubscribe(Subscriptions, Channel = - #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}, - session = Session}) -> - lists:foldl(fun({TopicFilter, SubOpts}, {ReasonCodes, ChannelAcc}) -> - NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter), - case emqx_session:unsubscribe(ClientInfo, NTopicFilter, SubOpts, Session) of - {ok, NSession} -> - {ReasonCodes ++ [?RC_SUCCESS], ChannelAcc#channel{session = NSession}}; - {error, ReasonCode} -> - {ReasonCodes ++ [ReasonCode], ChannelAcc} - end - end, {[], Channel}, Subscriptions). - %%-------------------------------------------------------------------- %% Process Disconnect %%-------------------------------------------------------------------- @@ -855,28 +828,16 @@ handle_call(Req, Channel) -> -spec(handle_info(Info :: term(), channel()) -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}). -handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) -> - TopicFilters1 = run_hooks('client.subscribe', - [ClientInfo, #{'Internal' => true}], - parse_topic_filters(TopicFilters) - ), - {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, #{}, Channel), + +handle_info({subscribe, TopicFilters}, Channel ) -> + {_, NChannel} = lists:foldl( + fun({TopicFilter, SubOpts}, {_, ChannelAcc}) -> + do_subscribe(TopicFilter, SubOpts, ChannelAcc) + end, {[], Channel}, parse_topic_filters(TopicFilters)), {ok, NChannel}; -handle_info({force_subscribe, TopicFilters}, Channel) -> - {_ReasonCodes, NChannel} = process_force_subscribe(parse_topic_filters(TopicFilters), Channel), - {ok, NChannel}; - -handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) -> - TopicFilters1 = run_hooks('client.unsubscribe', - [ClientInfo, #{'Internal' => true}], - parse_topic_filters(TopicFilters) - ), - {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, #{}, Channel), - {ok, NChannel}; - -handle_info({force_unsubscribe, TopicFilters}, Channel) -> - {_ReasonCodes, NChannel} = process_force_unsubscribe(parse_topic_filters(TopicFilters), Channel), +handle_info({unsubscribe, TopicFilters}, Channel) -> + {_RC, NChannel} = process_unsubscribe(TopicFilters, #{}, Channel), {ok, NChannel}; handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) -> @@ -1253,7 +1214,7 @@ packing_alias(Packet = #mqtt_packet{ }, Channel = ?IS_MQTT_V5 = #channel{topic_aliases = TopicAliases, alias_maximum = Limits}) -> case find_alias(outbound, Topic, TopicAliases) of - {ok, AliasId} -> + {ok, AliasId} -> NPublish = Publish#mqtt_packet_publish{ topic_name = <<>>, properties = #{'Topic-Alias' => AliasId} diff --git a/src/emqx_mod_delayed.erl b/src/emqx_mod_delayed.erl index b4ea28e77..096fecbb3 100644 --- a/src/emqx_mod_delayed.erl +++ b/src/emqx_mod_delayed.erl @@ -182,7 +182,7 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now -> case mnesia:dirty_read(?TAB, Key) of [] -> ok; [#delayed_message{msg = Msg}] -> - emqx_pool:async_submit(fun emqx_broker:publish/1, [Msg]) + emqx_pool:async_submit(fun emqx:publish/1, [Msg]) end, do_publish(mnesia:dirty_next(?TAB, Key), Now, [Key|Acc]). diff --git a/src/emqx_mod_topic_metrics.erl b/src/emqx_mod_topic_metrics.erl index 66fac6ee4..b5b4c24e6 100644 --- a/src/emqx_mod_topic_metrics.erl +++ b/src/emqx_mod_topic_metrics.erl @@ -155,18 +155,28 @@ inc(Topic, Metric) -> inc(Topic, Metric, Val) -> case get_counters(Topic) of - {error, not_found} -> - {error, not_found}; + {error, topic_not_found} -> + {error, topic_not_found}; CRef -> - counters:add(CRef, metrics_idx(Metric), Val) + case metric_idx(Metric) of + {error, invalid_metric} -> + {error, invalid_metric}; + Idx -> + counters:add(CRef, Idx, Val) + end end. val(Topic, Metric) -> case ets:lookup(?TAB, Topic) of [] -> - {error, not_found}; + {error, topic_not_found}; [{Topic, CRef}] -> - counters:get(CRef, metrics_idx(Metric)) + case metric_idx(Metric) of + {error, invalid_metric} -> + {error, invalid_metric}; + Idx -> + counters:get(CRef, Idx) + end end. rate(Topic, Metric) -> @@ -175,10 +185,10 @@ rate(Topic, Metric) -> metrics(Topic) -> case ets:lookup(?TAB, Topic) of [] -> - {error, not_found}; + {error, topic_not_found}; [{Topic, CRef}] -> lists:foldl(fun(Metric, Acc) -> - [{to_count(Metric), counters:get(CRef, metrics_idx(Metric))}, + [{to_count(Metric), counters:get(CRef, metric_idx(Metric))}, {to_rate(Metric), rate(Topic, Metric)} | Acc] end, [], ?TOPIC_METRICS) end. @@ -240,25 +250,14 @@ handle_call({unregister, Topic}, _From, State = #state{speeds = Speeds}) -> {reply, ok, State#state{speeds = maps:remove(Topic, Speeds)}} end; -handle_call({get_rate, Topic}, _From, State = #state{speeds = Speeds}) -> - case is_registered(Topic) of - false -> - {reply, {error, not_found}, State}; - true -> - lists:foldl(fun(Metric, Acc) -> - Speed = maps:get({Topic, Metric}, Speeds), - [{Metric, Speed#speed.last} | Acc] - end, [], ?TOPIC_METRICS) - end; - handle_call({get_rate, Topic, Metric}, _From, State = #state{speeds = Speeds}) -> case is_registered(Topic) of false -> - {reply, {error, not_found}, State}; + {reply, {error, topic_not_found}, State}; true -> case maps:get({Topic, Metric}, Speeds, undefined) of undefined -> - {reply, {error, not_found}, State}; + {reply, {error, invalid_metric}, State}; #speed{last = Last} -> {reply, Last, State} end @@ -272,7 +271,7 @@ handle_info(ticking, State = #state{speeds = Speeds}) -> NSpeeds = maps:map( fun({Topic, Metric}, Speed) -> case val(Topic, Metric) of - {error, not_found} -> maps:remove(Topic, Speeds); + {error, topic_not_found} -> maps:remove(Topic, Speeds); Val -> calculate_speed(Val, Speed) end end, Speeds), @@ -290,15 +289,17 @@ terminate(_Reason, _State) -> %% Internal Functions %%------------------------------------------------------------------------------ -metrics_idx('messages.in') -> 01; -metrics_idx('messages.out') -> 02; -metrics_idx('messages.qos0.in') -> 03; -metrics_idx('messages.qos0.out') -> 04; -metrics_idx('messages.qos1.in') -> 05; -metrics_idx('messages.qos1.out') -> 06; -metrics_idx('messages.qos2.in') -> 07; -metrics_idx('messages.qos2.out') -> 08; -metrics_idx('messages.dropped') -> 09. +metric_idx('messages.in') -> 01; +metric_idx('messages.out') -> 02; +metric_idx('messages.qos0.in') -> 03; +metric_idx('messages.qos0.out') -> 04; +metric_idx('messages.qos1.in') -> 05; +metric_idx('messages.qos1.out') -> 06; +metric_idx('messages.qos2.in') -> 07; +metric_idx('messages.qos2.out') -> 08; +metric_idx('messages.dropped') -> 09; +metric_idx(_) -> + {error, invalid_metric}. to_count('messages.in') -> 'messages.in.count'; @@ -344,7 +345,7 @@ delete_counters(Topic) -> get_counters(Topic) -> case ets:lookup(?TAB, Topic) of - [] -> {error, not_found}; + [] -> {error, topic_not_found}; [{Topic, CRef}] -> CRef end. diff --git a/src/emqx_vm.erl b/src/emqx_vm.erl index 539b8b91b..3971de56d 100644 --- a/src/emqx_vm.erl +++ b/src/emqx_vm.erl @@ -22,6 +22,7 @@ , get_system_info/0 , get_system_info/1 , get_memory/0 + , get_memory/2 , mem_info/0 , loads/0 ]). @@ -241,32 +242,47 @@ scheduler_usage_diff(First, Last) -> end, lists:zip(lists:sort(First), lists:sort(Last))). get_memory()-> - [{Key, get_memory(Key, current)} || Key <- [used, allocated, unused, usage]] ++ erlang:memory(). + get_memory_once(current) ++ erlang:memory(). + +get_memory(Ks, Keyword) when is_list(Ks) -> + Ms = get_memory_once(Keyword) ++ erlang:memory(), + [M || M = {K, _} <- Ms, lists:member(K, Ks)]; get_memory(used, Keyword) -> lists:sum(lists:map(fun({_, Prop}) -> container_size(Prop, Keyword, blocks_size) end, util_alloc())); + get_memory(allocated, Keyword) -> - lists:sum(lists:map(fun({_, Prop})-> + lists:sum(lists:map(fun({_, Prop}) -> container_size(Prop, Keyword, carriers_size) end, util_alloc())); + get_memory(unused, Keyword) -> - get_memory(allocated, Keyword) - get_memory(used, Keyword); + Ms = get_memory_once(Keyword), + proplists:get_value(allocated, Ms) - proplists:get_value(used, Ms); + get_memory(usage, Keyword) -> - get_memory(used, Keyword) / get_memory(allocated, Keyword). + Ms = get_memory_once(Keyword), + proplists:get_value(used, Ms) / proplists:get_value(allocated, Ms). + +%% @private A more quickly function to calculate memory +get_memory_once(Keyword) -> + Calc = fun({_, Prop}, {N1, N2}) -> + {N1 + container_size(Prop, Keyword, blocks_size), + N2 + container_size(Prop, Keyword, carriers_size)} + end, + {Used, Allocated} = lists:foldl(Calc, {0, 0}, util_alloc()), + [{used, Used}, + {allocated, Allocated}, + {unused, Allocated - Used}, + {usage, Used / Allocated}]. util_alloc()-> alloc(?UTIL_ALLOCATORS). -alloc()-> - {_Mem, Allocs} = snapshot_int(), - Allocs. alloc(Type) -> - [{{T, Instance}, Props} || {{T, Instance}, Props} <- alloc(), lists:member(T, Type)]. - -snapshot_int() -> - {erlang:memory(), allocators()}. + [{{T, Instance}, Props} || {{T, Instance}, Props} <- allocators(), lists:member(T, Type)]. allocators() -> UtilAllocators = erlang:system_info(alloc_util_allocators), diff --git a/test/emqx_mod_topic_metrics_SUITE.erl b/test/emqx_mod_topic_metrics_SUITE.erl new file mode 100644 index 000000000..4ac1b00eb --- /dev/null +++ b/test/emqx_mod_topic_metrics_SUITE.erl @@ -0,0 +1,92 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mod_topic_metrics_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +t_nonexistent_topic_metrics(_) -> + emqx_mod_topic_metrics:load([]), + ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), + ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'messages.in')), + ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'messages.in')), + emqx_mod_topic_metrics:register(<<"a/b/c">>), + ?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), + ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'invalid.metrics')), + ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'invalid.metrics')), + ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'invalid.metrics')), + emqx_mod_topic_metrics:unregister(<<"a/b/c">>), + emqx_mod_topic_metrics:unload([]). + +t_topic_metrics(_) -> + emqx_mod_topic_metrics:load([]), + + ?assertEqual(false, emqx_mod_topic_metrics:is_registered(<<"a/b/c">>)), + ?assertEqual([], emqx_mod_topic_metrics:all_registered_topics()), + emqx_mod_topic_metrics:register(<<"a/b/c">>), + ?assertEqual(true, emqx_mod_topic_metrics:is_registered(<<"a/b/c">>)), + ?assertEqual([<<"a/b/c">>], emqx_mod_topic_metrics:all_registered_topics()), + + ?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), + ?assertEqual(ok, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'messages.in')), + ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), + ?assert(emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'messages.in') =:= 0), + emqx_mod_topic_metrics:unregister(<<"a/b/c">>), + emqx_mod_topic_metrics:unload([]). + +t_hook(_) -> + emqx_mod_topic_metrics:load([]), + emqx_mod_topic_metrics:register(<<"a/b/c">>), + + ?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), + ?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')), + ?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.out')), + ?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')), + ?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), + + {ok, C} = emqtt:start_link([{host, "localhost"}, + {clientid, "myclient"}, + {username, "myuser"}]), + {ok, _} = emqtt:connect(C), + emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, 0), + ct:sleep(100), + ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), + ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')), + ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), + + emqtt:subscribe(C, <<"a/b/c">>), + emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, 0), + ct:sleep(100), + ?assertEqual(2, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), + ?assertEqual(2, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')), + ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.out')), + ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')), + ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), + emqx_mod_topic_metrics:unregister(<<"a/b/c">>), + emqx_mod_topic_metrics:unload([]). \ No newline at end of file diff --git a/test/mqtt_protocol_v5_SUITE.erl b/test/mqtt_protocol_v5_SUITE.erl index cd32e9bd8..2d74ec6fb 100644 --- a/test/mqtt_protocol_v5_SUITE.erl +++ b/test/mqtt_protocol_v5_SUITE.erl @@ -175,6 +175,26 @@ t_connect_will_message(_) -> ?assertEqual(0, length(receive_messages(1))), %% [MQTT-3.1.2-10] ok = emqtt:disconnect(Client4). +t_batch_subscribe(_) -> + {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"batch_test">>}]), + {ok, _} = emqtt:connect(Client), + application:set_env(emqx, enable_acl_cache, false), + TempAcl = emqx_ct_helpers:deps_path(emqx, "test/emqx_access_SUITE_data/acl_temp.conf"), + file:write_file(TempAcl, "{deny, {client, \"batch_test\"}, subscribe, [\"t1\", \"t2\", \"t3\"]}.\n"), + application:set_env(emqx, acl_file, TempAcl), + emqx_mod_acl_internal:reload([]), + {ok, _, [?RC_NOT_AUTHORIZED, + ?RC_NOT_AUTHORIZED, + ?RC_NOT_AUTHORIZED]} = emqtt:subscribe(Client, [{<<"t1">>, qos1}, + {<<"t2">>, qos2}, + {<<"t3">>, qos0}]), + {ok, _, [?RC_NO_SUBSCRIPTION_EXISTED, + ?RC_NO_SUBSCRIPTION_EXISTED, + ?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(Client, [<<"t1">>, + <<"t2">>, + <<"t3">>]), + emqtt:disconnect(Client). + t_connect_will_retain(_) -> Topic = nth(1, ?TOPICS), Payload = "will message", @@ -556,7 +576,7 @@ t_publish_topic_alias(_) -> waiting_client_process_exit(Client2), process_flag(trap_exit, false). - + t_publish_response_topic(_) -> process_flag(trap_exit, true), Topic = nth(1, ?TOPICS), @@ -609,7 +629,7 @@ t_subscribe_topic_alias(_) -> Topic1 = nth(1, ?TOPICS), Topic2 = nth(2, ?TOPICS), {ok, Client1} = emqtt:start_link([{proto_ver, v5}, - {properties, #{'Topic-Alias-Maximum' => 1}} + {properties, #{'Topic-Alias-Maximum' => 1}} ]), {ok, _} = emqtt:connect(Client1), {ok, _, [2]} = emqtt:subscribe(Client1, Topic1, qos2), @@ -701,8 +721,8 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(_) -> SharedTopic = list_to_binary("$share/sharename/" ++ binary_to_list(<<"TopicA">>)), CRef = counters:new(1, [atomics]), - meck:expect(emqtt, connected, - fun(cast, ?PUBLISH_PACKET(?QOS_2, _PacketId), _State) -> + meck:expect(emqtt, connected, + fun(cast, ?PUBLISH_PACKET(?QOS_2, _PacketId), _State) -> ok = counters:add(CRef, 1, 1), {stop, {shutdown, for_testiong}}; (Arg1, ARg2, Arg3) -> meck:passthrough([Arg1, ARg2, Arg3]) diff --git a/test/props/prop_emqx_sys.erl b/test/props/prop_emqx_sys.erl index 3c2b599c4..d80edbe4e 100644 --- a/test/props/prop_emqx_sys.erl +++ b/test/props/prop_emqx_sys.erl @@ -50,7 +50,7 @@ prop_sys() -> ok = emqx_sys:stop(), ?WHENFAIL(io:format("History: ~p\nState: ~p\nResult: ~p\n", [History,State,Result]), - aggregate(command_names(Cmds), true)) + aggregate(command_names(Cmds), Result =:= ok)) end). %%-------------------------------------------------------------------- @@ -107,7 +107,6 @@ command(_State) -> ]). precondition(_State, {call, _Mod, _Fun, _Args}) -> - timer:sleep(1), true. postcondition(_State, {call, emqx_sys, info, []}, Info) ->