From 16821490ce79d2867319a2fb7995ad698769b7df Mon Sep 17 00:00:00 2001 From: Gilbert Date: Mon, 19 Nov 2018 13:34:03 +0800 Subject: [PATCH] Fix issue#1874 (#1964) * Fix issue#1874 Prior to this change, if user use one client connect emqx with mqtt v3.1.1, the client subscribe the topic and publish message to this topic, it would receive this message itself published, this commit provide a configure option to let user ignore the message itself published. This change fix issue 1874. * Small Fix * Fix bug * Better design * Fix compile warning and improve coverage * Better design to solve the performance issue * Fix typo * Fix typo * Delete spaces in end of lines. * Do not use anonymous function * Better performance --- etc/emqx.conf | 5 +++++ priv/emqx.schema | 6 ++++++ src/emqx_broker.erl | 2 +- src/emqx_config.erl | 2 +- src/emqx_plugins.erl | 2 +- src/emqx_protocol.erl | 23 ++++++++++++++--------- src/emqx_session.erl | 5 ++++- src/emqx_sm.erl | 2 +- src/emqx_ws_connection.erl | 1 + test/emqx_broker_SUITE.erl | 2 +- test/emqx_client_SUITE.erl | 3 +-- test/emqx_mod_sup_SUITE.erl | 2 +- test/emqx_mqtt_caps_SUITE.erl | 4 ++-- test/emqx_mqtt_packet_SUITE.erl | 2 +- test/emqx_pqueue_SUITE.erl | 2 +- test/emqx_protocol_SUITE.erl | 2 +- test/emqx_session_SUITE.erl | 17 ++++++++++++++++- test/emqx_sm_SUITE.erl | 13 ++++++------- test/emqx_vm_SUITE.erl | 8 ++++---- 19 files changed, 68 insertions(+), 35 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 33fa0ae9a..05c7f074b 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -474,6 +474,11 @@ mqtt.wildcard_subscription = true ## Value: boolean mqtt.shared_subscription = true +## Whether to ignore loop delivery of messages.(for mqtt v3.1.1) +## +## Value: true | false +mqtt.ignore_loop_deliver = false + ##-------------------------------------------------------------------- ## Zones ##-------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index f003c37f3..652b84657 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -595,6 +595,12 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc Whether to ignore loop delivery of messages.(for mqtt v3.1.1) +{mapping, "mqtt.ignore_loop_deliver", "emqx.mqtt_ignore_loop_deliver", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + %%-------------------------------------------------------------------- %% Zones %%-------------------------------------------------------------------- diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 1a04cf997..ff53554d4 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -201,7 +201,7 @@ aggre(Routes) -> lists:foldl( fun(#route{topic = To, dest = Node}, Acc) when is_atom(Node) -> [{To, Node} | Acc]; - (#route{topic = To, dest = {Group, _Node}}, Acc) -> + (#route{topic = To, dest = {Group, _Node}}, Acc) -> lists:usort([{To, Group} | Acc]) end, [], Routes). diff --git a/src/emqx_config.erl b/src/emqx_config.erl index 435ebaea7..bab3eab56 100644 --- a/src/emqx_config.erl +++ b/src/emqx_config.erl @@ -49,7 +49,7 @@ populate(_App) -> %% @doc Read the configuration of an application. -spec(read(atom()) -> {ok, list(env())} | {error, term()}). read(App) -> - %% TODO: + %% TODO: %% 1. Read the app.conf from etc folder %% 2. Cuttlefish to read the conf %% 3. Return the terms and schema diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index a6a04458f..44585a69e 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -228,7 +228,7 @@ find_plugin(Name) -> find_plugin(Name, list()). find_plugin(Name, Plugins) -> - lists:keyfind(Name, 2, Plugins). + lists:keyfind(Name, 2, Plugins). %% @doc UnLoad a Plugin -spec(unload(atom()) -> ok | {error, term()}). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 4867cf2a5..44b0925c3 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -63,7 +63,8 @@ recv_stats, send_stats, connected, - connected_at + connected_at, + ignore_loop }). -type(state() :: #pstate{}). @@ -71,6 +72,7 @@ -ifdef(TEST). -compile(export_all). +-compile(nowarn_export_all). -endif. -define(NO_PROPS, undefined). @@ -102,7 +104,8 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) enable_acl = emqx_zone:get_env(Zone, enable_acl), recv_stats = #{msg => 0, pkt => 0}, send_stats = #{msg => 0, pkt => 0}, - connected = false}. + connected = false, + ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false)}. init_username(Peercert, Options) -> case proplists:get_value(peer_cert_as_username, Options) of @@ -385,11 +388,14 @@ process_packet(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = {ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState}; process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{session = SPid, mountpoint = Mountpoint, proto_ver = ProtoVer, is_bridge = IsBridge}) -> + PState = #pstate{session = SPid, mountpoint = Mountpoint, + proto_ver = ProtoVer, is_bridge = IsBridge, + ignore_loop = IgnoreLoop}) -> RawTopicFilters1 = if ProtoVer < ?MQTT_PROTO_V5 -> + IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end, case IsBridge of - true -> [{RawTopic, SubOpts#{rap => 1}} || {RawTopic, SubOpts} <- RawTopicFilters]; - false -> [{RawTopic, SubOpts#{rap => 0}} || {RawTopic, SubOpts} <- RawTopicFilters] + true -> [{RawTopic, SubOpts#{rap => 1, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters]; + false -> [{RawTopic, SubOpts#{rap => 0, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters] end; true -> RawTopicFilters @@ -626,7 +632,6 @@ try_open_session(PState = #pstate{zone = Zone, clean_start => CleanStart, will_msg => WillMsg }, - SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}, {topic_alias_maximum, PState}]), case emqx_sm:open_session(SessAttrs1) of {ok, SPid} -> @@ -685,12 +690,12 @@ get_property(Name, Props, Default) -> maps:get(Name, Props, Default). make_will_msg(#mqtt_packet_connect{proto_ver = ProtoVer, - will_props = WillProps} = Connect) -> - emqx_packet:will_msg(if + will_props = WillProps} = Connect) -> + emqx_packet:will_msg(if ProtoVer =:= ?MQTT_PROTO_V5 -> WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0), Connect#mqtt_packet_connect{will_props = set_property('Will-Delay-Interval', WillDelayInterval, WillProps)}; - true -> + true -> Connect end). diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 6cfef70d2..0d7352e66 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -152,6 +152,7 @@ will_msg :: emqx:message(), will_delay_timer :: reference() | undefined + }). -type(spid() :: pid()). @@ -575,7 +576,8 @@ handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> %% Dispatch message handle_info({dispatch, Topic, Msg = #message{headers = Headers}}, - State = #state{subscriptions = SubMap, topic_alias_maximum = TopicAliasMaximum}) when is_record(Msg, message) -> + State = #state{subscriptions = SubMap, + topic_alias_maximum = TopicAliasMaximum}) when is_record(Msg, message) -> TopicAlias = maps:get('Topic-Alias', Headers, undefined), if TopicAlias =:= undefined orelse TopicAlias =< TopicAliasMaximum -> @@ -591,6 +593,7 @@ handle_info({dispatch, Topic, Msg = #message{headers = Headers}}, noreply(State) end; + %% Do nothing if the client has been disconnected. handle_info({timeout, Timer, retry_delivery}, State = #state{conn_pid = undefined, retry_timer = Timer}) -> noreply(State#state{retry_timer = undefined}); diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 55d0e26a7..8d0ac02e4 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -59,7 +59,7 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid end, emqx_sm_locker:trans(ClientId, CleanStart); -open_session(SessAttrs = #{clean_start := false, +open_session(SessAttrs = #{clean_start := false, client_id := ClientId}) -> ResumeStart = fun(_) -> case resume_session(ClientId, SessAttrs) of diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 08b4a3120..759e127c1 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -269,6 +269,7 @@ websocket_info(Info, State) -> terminate(SockError, _Req, #state{keepalive = Keepalive, proto_state = ProtoState, shutdown = Shutdown}) -> + ?LOG(debug, "Terminated for ~p, sockerror: ~p", [Shutdown, SockError]), emqx_keepalive:cancel(Keepalive), diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 7fcc2a598..e0a2555f1 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -62,7 +62,7 @@ subscribe_unsubscribe(_) -> ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, #{ qos => 2 }), true = emqx:subscribed(<<"topic">>, <<"clientId">>), Topics = emqx:topics(), - lists:foreach(fun(Topic) -> + lists:foreach(fun(Topic) -> ?assert(lists:member(Topic, Topics)) end, Topics), ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>), diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index c7b3455ad..6de507fca 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -150,8 +150,7 @@ receive_messages(Count, Msgs) -> receive {publish, Msg} -> receive_messages(Count-1, [Msg|Msgs]); - Other -> - ct:log("~p~n", [Other]), + _Other -> receive_messages(Count, Msgs) after 10 -> Msgs diff --git a/test/emqx_mod_sup_SUITE.erl b/test/emqx_mod_sup_SUITE.erl index 87245e351..8169c3f91 100644 --- a/test/emqx_mod_sup_SUITE.erl +++ b/test/emqx_mod_sup_SUITE.erl @@ -21,7 +21,7 @@ all() -> [t_child_all]. -t_child_all(_) -> +t_child_all(_) -> {ok, _Pid} = emqx_mod_sup:start_link(), {ok, _Child} = emqx_mod_sup:start_child(emqx_banned, worker), timer:sleep(10), diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl index 919be5218..26e343ca6 100644 --- a/test/emqx_mqtt_caps_SUITE.erl +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -114,8 +114,8 @@ t_check_sub(_) -> [{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts#{rc => ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}}]), - ok = do_check_sub(Caps#{mqtt_wildcard_subscription => false}, - [{<<"vlient/+/dsofi">>, Opts}], + ok = do_check_sub(Caps#{mqtt_wildcard_subscription => false}, + [{<<"vlient/+/dsofi">>, Opts}], [{<<"vlient/+/dsofi">>, Opts#{rc => ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}}]), emqx_zone:stop(). diff --git a/test/emqx_mqtt_packet_SUITE.erl b/test/emqx_mqtt_packet_SUITE.erl index 8bc41cb37..4386ff02e 100644 --- a/test/emqx_mqtt_packet_SUITE.erl +++ b/test/emqx_mqtt_packet_SUITE.erl @@ -63,7 +63,7 @@ all() -> [{group, connect}]. -groups() -> [{connect, [sequence], +groups() -> [{connect, [sequence], [case1_protocol_name, case2_protocol_ver%, %TOTO case3_invalid_reserved diff --git a/test/emqx_pqueue_SUITE.erl b/test/emqx_pqueue_SUITE.erl index e7672cb0b..9efccf472 100644 --- a/test/emqx_pqueue_SUITE.erl +++ b/test/emqx_pqueue_SUITE.erl @@ -94,7 +94,7 @@ t_priority_queues(_) -> PQueue6 = ?PQ:in(f, 1, PQueue5), {{value, e}, PQueue7} = ?PQ:out(PQueue6), - {empty, _} = ?PQ:out(0, ?PQ:new()), + {empty, _} = ?PQ:out(0, ?PQ:new()), {empty, Q0} = ?PQ:out_p(Q0), diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 060557c6c..3f6c4f252 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -132,7 +132,7 @@ connect_v5(_) -> raw_recv_parse(Data, ?MQTT_PROTO_V5) end), - % test clean start + % test clean start with_connection(fun(Sock) -> emqx_client_sock:send(Sock, raw_send_serialize( diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index f79b84557..5b6fc477c 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -18,9 +18,11 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("eunit/include/eunit.hrl"). + -include_lib("common_test/include/ct.hrl"). -all() -> [t_session_all]. +all() -> [ignore_loop, t_session_all]. init_per_suite(Config) -> emqx_ct_broker_helpers:run_setup_steps(), @@ -29,6 +31,19 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). +ignore_loop(_Config) -> + application:set_env(emqx, mqtt_ignore_loop_deliver, true), + {ok, Client} = emqx_client:start_link(), + {ok, _} = emqx_client:connect(Client), + TestTopic = <<"Self">>, + {ok, _, [2]} = emqx_client:subscribe(Client, TestTopic, qos2), + ok = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 0), + {ok, _} = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 1), + {ok, _} = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 2), + ?assertEqual(0, length(emqx_client_SUITE:receive_messages(3))), + ok = emqx_client:disconnect(Client), + application:set_env(emqx, mqtt_ignore_loop_deliver, false). + t_session_all(_) -> ClientId = <<"ClientId">>, {ok, ConnPid} = emqx_mock_client:start_link(ClientId), diff --git a/test/emqx_sm_SUITE.erl b/test/emqx_sm_SUITE.erl index 2b83b6afb..3aed3090e 100644 --- a/test/emqx_sm_SUITE.erl +++ b/test/emqx_sm_SUITE.erl @@ -24,13 +24,13 @@ all() -> [t_open_close_session]. t_open_close_session(_) -> emqx_ct_broker_helpers:run_setup_steps(), {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>), - Attrs = #{clean_start => true, - client_id => <<"client">>, + Attrs = #{clean_start => true, + client_id => <<"client">>, conn_pid => ClientPid, - zone => internal, - username => <<"zhou">>, - expiry_interval => 0, - max_inflight => 0, + zone => internal, + username => <<"emqx">>, + expiry_interval => 0, + max_inflight => 0, topic_alias_maximum => 0, will_msg => undefined}, {ok, SPid} = emqx_sm:open_session(Attrs), @@ -47,4 +47,3 @@ t_open_close_session(_) -> ok = emqx_sm:close_session(SPid), [] = emqx_sm:lookup_session(<<"client">>), emqx_ct_broker_helpers:run_teardown_steps(). - diff --git a/test/emqx_vm_SUITE.erl b/test/emqx_vm_SUITE.erl index b13b949b4..91744764e 100644 --- a/test/emqx_vm_SUITE.erl +++ b/test/emqx_vm_SUITE.erl @@ -95,7 +95,7 @@ all() -> - [load, systeminfo, mem_info, process_list, process_info, process_gc, + [load, systeminfo, mem_info, process_list, process_info, process_gc, get_ets_list, get_ets_info, get_ets_object, get_port_types, get_port_info, scheduler_usage, get_memory, microsecs, schedulers, get_process_group_leader_info, get_process_limit]. @@ -121,13 +121,13 @@ process_list(_Config) -> true = lists:member({pid, Pid}, lists:concat(ProcessInfo)). process_info(_Config) -> - ProcessInfos = emqx_vm:get_process_info(), + ProcessInfos = emqx_vm:get_process_info(), ProcessInfo = lists:last(ProcessInfos), Keys = [K || {K, _V}<- ProcessInfo], ?PROCESS_INFO = Keys. process_gc(_Config) -> - ProcessGcs = emqx_vm:get_process_gc(), + ProcessGcs = emqx_vm:get_process_gc(), ProcessGc = lists:last(ProcessGcs), Keys = [K || {K, _V}<- ProcessGc], ?PROCESS_GC = Keys. @@ -137,7 +137,7 @@ get_ets_list(_Config) -> Ets = emqx_vm:get_ets_list(), true = lists:member(test, Ets). -get_ets_info(_Config) -> +get_ets_info(_Config) -> ets:new(test, [named_table]), [] = emqx_vm:get_ets_info(test1), EtsInfo = emqx_vm:get_ets_info(test),