Fix issue#1874 (#1964)

* Fix issue#1874
Prior to this change, if user use one client connect emqx with mqtt
v3.1.1, the client subscribe the topic and publish message to this
topic, it would receive this message itself published, this commit
provide a configure option to let user ignore the message itself published.

This change fix issue 1874.

* Small Fix

* Fix bug

* Better design

* Fix compile warning and improve coverage

* Better design to solve the performance issue

* Fix typo

* Fix typo

* Delete spaces in end of lines.

* Do not use anonymous function

* Better performance
This commit is contained in:
Gilbert 2018-11-19 13:34:03 +08:00 committed by turtleDeng
parent bc1464a33f
commit 16821490ce
19 changed files with 68 additions and 35 deletions

View File

@ -474,6 +474,11 @@ mqtt.wildcard_subscription = true
## Value: boolean
mqtt.shared_subscription = true
## Whether to ignore loop delivery of messages.(for mqtt v3.1.1)
##
## Value: true | false
mqtt.ignore_loop_deliver = false
##--------------------------------------------------------------------
## Zones
##--------------------------------------------------------------------

View File

@ -595,6 +595,12 @@ end}.
{datatype, {enum, [true, false]}}
]}.
%% @doc Whether to ignore loop delivery of messages.(for mqtt v3.1.1)
{mapping, "mqtt.ignore_loop_deliver", "emqx.mqtt_ignore_loop_deliver", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
%%--------------------------------------------------------------------
%% Zones
%%--------------------------------------------------------------------

View File

@ -201,7 +201,7 @@ aggre(Routes) ->
lists:foldl(
fun(#route{topic = To, dest = Node}, Acc) when is_atom(Node) ->
[{To, Node} | Acc];
(#route{topic = To, dest = {Group, _Node}}, Acc) ->
(#route{topic = To, dest = {Group, _Node}}, Acc) ->
lists:usort([{To, Group} | Acc])
end, [], Routes).

View File

@ -49,7 +49,7 @@ populate(_App) ->
%% @doc Read the configuration of an application.
-spec(read(atom()) -> {ok, list(env())} | {error, term()}).
read(App) ->
%% TODO:
%% TODO:
%% 1. Read the app.conf from etc folder
%% 2. Cuttlefish to read the conf
%% 3. Return the terms and schema

View File

@ -228,7 +228,7 @@ find_plugin(Name) ->
find_plugin(Name, list()).
find_plugin(Name, Plugins) ->
lists:keyfind(Name, 2, Plugins).
lists:keyfind(Name, 2, Plugins).
%% @doc UnLoad a Plugin
-spec(unload(atom()) -> ok | {error, term()}).

View File

@ -63,7 +63,8 @@
recv_stats,
send_stats,
connected,
connected_at
connected_at,
ignore_loop
}).
-type(state() :: #pstate{}).
@ -71,6 +72,7 @@
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-define(NO_PROPS, undefined).
@ -102,7 +104,8 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options)
enable_acl = emqx_zone:get_env(Zone, enable_acl),
recv_stats = #{msg => 0, pkt => 0},
send_stats = #{msg => 0, pkt => 0},
connected = false}.
connected = false,
ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false)}.
init_username(Peercert, Options) ->
case proplists:get_value(peer_cert_as_username, Options) of
@ -385,11 +388,14 @@ process_packet(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session =
{ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState};
process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
PState = #pstate{session = SPid, mountpoint = Mountpoint, proto_ver = ProtoVer, is_bridge = IsBridge}) ->
PState = #pstate{session = SPid, mountpoint = Mountpoint,
proto_ver = ProtoVer, is_bridge = IsBridge,
ignore_loop = IgnoreLoop}) ->
RawTopicFilters1 = if ProtoVer < ?MQTT_PROTO_V5 ->
IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end,
case IsBridge of
true -> [{RawTopic, SubOpts#{rap => 1}} || {RawTopic, SubOpts} <- RawTopicFilters];
false -> [{RawTopic, SubOpts#{rap => 0}} || {RawTopic, SubOpts} <- RawTopicFilters]
true -> [{RawTopic, SubOpts#{rap => 1, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters];
false -> [{RawTopic, SubOpts#{rap => 0, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters]
end;
true ->
RawTopicFilters
@ -626,7 +632,6 @@ try_open_session(PState = #pstate{zone = Zone,
clean_start => CleanStart,
will_msg => WillMsg
},
SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}, {topic_alias_maximum, PState}]),
case emqx_sm:open_session(SessAttrs1) of
{ok, SPid} ->
@ -685,12 +690,12 @@ get_property(Name, Props, Default) ->
maps:get(Name, Props, Default).
make_will_msg(#mqtt_packet_connect{proto_ver = ProtoVer,
will_props = WillProps} = Connect) ->
emqx_packet:will_msg(if
will_props = WillProps} = Connect) ->
emqx_packet:will_msg(if
ProtoVer =:= ?MQTT_PROTO_V5 ->
WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0),
Connect#mqtt_packet_connect{will_props = set_property('Will-Delay-Interval', WillDelayInterval, WillProps)};
true ->
true ->
Connect
end).

View File

@ -152,6 +152,7 @@
will_msg :: emqx:message(),
will_delay_timer :: reference() | undefined
}).
-type(spid() :: pid()).
@ -575,7 +576,8 @@ handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) ->
%% Dispatch message
handle_info({dispatch, Topic, Msg = #message{headers = Headers}},
State = #state{subscriptions = SubMap, topic_alias_maximum = TopicAliasMaximum}) when is_record(Msg, message) ->
State = #state{subscriptions = SubMap,
topic_alias_maximum = TopicAliasMaximum}) when is_record(Msg, message) ->
TopicAlias = maps:get('Topic-Alias', Headers, undefined),
if
TopicAlias =:= undefined orelse TopicAlias =< TopicAliasMaximum ->
@ -591,6 +593,7 @@ handle_info({dispatch, Topic, Msg = #message{headers = Headers}},
noreply(State)
end;
%% Do nothing if the client has been disconnected.
handle_info({timeout, Timer, retry_delivery}, State = #state{conn_pid = undefined, retry_timer = Timer}) ->
noreply(State#state{retry_timer = undefined});

View File

@ -59,7 +59,7 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid
end,
emqx_sm_locker:trans(ClientId, CleanStart);
open_session(SessAttrs = #{clean_start := false,
open_session(SessAttrs = #{clean_start := false,
client_id := ClientId}) ->
ResumeStart = fun(_) ->
case resume_session(ClientId, SessAttrs) of

View File

@ -269,6 +269,7 @@ websocket_info(Info, State) ->
terminate(SockError, _Req, #state{keepalive = Keepalive,
proto_state = ProtoState,
shutdown = Shutdown}) ->
?LOG(debug, "Terminated for ~p, sockerror: ~p",
[Shutdown, SockError]),
emqx_keepalive:cancel(Keepalive),

View File

@ -62,7 +62,7 @@ subscribe_unsubscribe(_) ->
ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, #{ qos => 2 }),
true = emqx:subscribed(<<"topic">>, <<"clientId">>),
Topics = emqx:topics(),
lists:foreach(fun(Topic) ->
lists:foreach(fun(Topic) ->
?assert(lists:member(Topic, Topics))
end, Topics),
ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>),

View File

@ -150,8 +150,7 @@ receive_messages(Count, Msgs) ->
receive
{publish, Msg} ->
receive_messages(Count-1, [Msg|Msgs]);
Other ->
ct:log("~p~n", [Other]),
_Other ->
receive_messages(Count, Msgs)
after 10 ->
Msgs

View File

@ -21,7 +21,7 @@
all() -> [t_child_all].
t_child_all(_) ->
t_child_all(_) ->
{ok, _Pid} = emqx_mod_sup:start_link(),
{ok, _Child} = emqx_mod_sup:start_child(emqx_banned, worker),
timer:sleep(10),

View File

@ -114,8 +114,8 @@ t_check_sub(_) ->
[{<<"client/stat">>, Opts}],
[{<<"client/stat">>, Opts#{rc => ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}}]),
ok = do_check_sub(Caps#{mqtt_wildcard_subscription => false},
[{<<"vlient/+/dsofi">>, Opts}],
ok = do_check_sub(Caps#{mqtt_wildcard_subscription => false},
[{<<"vlient/+/dsofi">>, Opts}],
[{<<"vlient/+/dsofi">>, Opts#{rc => ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}}]),
emqx_zone:stop().

View File

@ -63,7 +63,7 @@
all() -> [{group, connect}].
groups() -> [{connect, [sequence],
groups() -> [{connect, [sequence],
[case1_protocol_name,
case2_protocol_ver%,
%TOTO case3_invalid_reserved

View File

@ -94,7 +94,7 @@ t_priority_queues(_) ->
PQueue6 = ?PQ:in(f, 1, PQueue5),
{{value, e}, PQueue7} = ?PQ:out(PQueue6),
{empty, _} = ?PQ:out(0, ?PQ:new()),
{empty, _} = ?PQ:out(0, ?PQ:new()),
{empty, Q0} = ?PQ:out_p(Q0),

View File

@ -132,7 +132,7 @@ connect_v5(_) ->
raw_recv_parse(Data, ?MQTT_PROTO_V5)
end),
% test clean start
% test clean start
with_connection(fun(Sock) ->
emqx_client_sock:send(Sock,
raw_send_serialize(

View File

@ -18,9 +18,11 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() -> [t_session_all].
all() -> [ignore_loop, t_session_all].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
@ -29,6 +31,19 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
ignore_loop(_Config) ->
application:set_env(emqx, mqtt_ignore_loop_deliver, true),
{ok, Client} = emqx_client:start_link(),
{ok, _} = emqx_client:connect(Client),
TestTopic = <<"Self">>,
{ok, _, [2]} = emqx_client:subscribe(Client, TestTopic, qos2),
ok = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 0),
{ok, _} = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 1),
{ok, _} = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 2),
?assertEqual(0, length(emqx_client_SUITE:receive_messages(3))),
ok = emqx_client:disconnect(Client),
application:set_env(emqx, mqtt_ignore_loop_deliver, false).
t_session_all(_) ->
ClientId = <<"ClientId">>,
{ok, ConnPid} = emqx_mock_client:start_link(ClientId),

View File

@ -24,13 +24,13 @@ all() -> [t_open_close_session].
t_open_close_session(_) ->
emqx_ct_broker_helpers:run_setup_steps(),
{ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
Attrs = #{clean_start => true,
client_id => <<"client">>,
Attrs = #{clean_start => true,
client_id => <<"client">>,
conn_pid => ClientPid,
zone => internal,
username => <<"zhou">>,
expiry_interval => 0,
max_inflight => 0,
zone => internal,
username => <<"emqx">>,
expiry_interval => 0,
max_inflight => 0,
topic_alias_maximum => 0,
will_msg => undefined},
{ok, SPid} = emqx_sm:open_session(Attrs),
@ -47,4 +47,3 @@ t_open_close_session(_) ->
ok = emqx_sm:close_session(SPid),
[] = emqx_sm:lookup_session(<<"client">>),
emqx_ct_broker_helpers:run_teardown_steps().

View File

@ -95,7 +95,7 @@
all() ->
[load, systeminfo, mem_info, process_list, process_info, process_gc,
[load, systeminfo, mem_info, process_list, process_info, process_gc,
get_ets_list, get_ets_info, get_ets_object, get_port_types, get_port_info,
scheduler_usage, get_memory, microsecs, schedulers, get_process_group_leader_info,
get_process_limit].
@ -121,13 +121,13 @@ process_list(_Config) ->
true = lists:member({pid, Pid}, lists:concat(ProcessInfo)).
process_info(_Config) ->
ProcessInfos = emqx_vm:get_process_info(),
ProcessInfos = emqx_vm:get_process_info(),
ProcessInfo = lists:last(ProcessInfos),
Keys = [K || {K, _V}<- ProcessInfo],
?PROCESS_INFO = Keys.
process_gc(_Config) ->
ProcessGcs = emqx_vm:get_process_gc(),
ProcessGcs = emqx_vm:get_process_gc(),
ProcessGc = lists:last(ProcessGcs),
Keys = [K || {K, _V}<- ProcessGc],
?PROCESS_GC = Keys.
@ -137,7 +137,7 @@ get_ets_list(_Config) ->
Ets = emqx_vm:get_ets_list(),
true = lists:member(test, Ets).
get_ets_info(_Config) ->
get_ets_info(_Config) ->
ets:new(test, [named_table]),
[] = emqx_vm:get_ets_info(test1),
EtsInfo = emqx_vm:get_ets_info(test),