diff --git a/etc/emqx.conf b/etc/emqx.conf index 33fa0ae9a..05c7f074b 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -474,6 +474,11 @@ mqtt.wildcard_subscription = true ## Value: boolean mqtt.shared_subscription = true +## Whether to ignore loop delivery of messages.(for mqtt v3.1.1) +## +## Value: true | false +mqtt.ignore_loop_deliver = false + ##-------------------------------------------------------------------- ## Zones ##-------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index f003c37f3..652b84657 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -595,6 +595,12 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc Whether to ignore loop delivery of messages.(for mqtt v3.1.1) +{mapping, "mqtt.ignore_loop_deliver", "emqx.mqtt_ignore_loop_deliver", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + %%-------------------------------------------------------------------- %% Zones %%-------------------------------------------------------------------- diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 1a04cf997..ff53554d4 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -201,7 +201,7 @@ aggre(Routes) -> lists:foldl( fun(#route{topic = To, dest = Node}, Acc) when is_atom(Node) -> [{To, Node} | Acc]; - (#route{topic = To, dest = {Group, _Node}}, Acc) -> + (#route{topic = To, dest = {Group, _Node}}, Acc) -> lists:usort([{To, Group} | Acc]) end, [], Routes). diff --git a/src/emqx_config.erl b/src/emqx_config.erl index 435ebaea7..bab3eab56 100644 --- a/src/emqx_config.erl +++ b/src/emqx_config.erl @@ -49,7 +49,7 @@ populate(_App) -> %% @doc Read the configuration of an application. -spec(read(atom()) -> {ok, list(env())} | {error, term()}). read(App) -> - %% TODO: + %% TODO: %% 1. Read the app.conf from etc folder %% 2. Cuttlefish to read the conf %% 3. Return the terms and schema diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index a6a04458f..44585a69e 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -228,7 +228,7 @@ find_plugin(Name) -> find_plugin(Name, list()). find_plugin(Name, Plugins) -> - lists:keyfind(Name, 2, Plugins). + lists:keyfind(Name, 2, Plugins). %% @doc UnLoad a Plugin -spec(unload(atom()) -> ok | {error, term()}). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 4867cf2a5..44b0925c3 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -63,7 +63,8 @@ recv_stats, send_stats, connected, - connected_at + connected_at, + ignore_loop }). -type(state() :: #pstate{}). @@ -71,6 +72,7 @@ -ifdef(TEST). -compile(export_all). +-compile(nowarn_export_all). -endif. -define(NO_PROPS, undefined). @@ -102,7 +104,8 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) enable_acl = emqx_zone:get_env(Zone, enable_acl), recv_stats = #{msg => 0, pkt => 0}, send_stats = #{msg => 0, pkt => 0}, - connected = false}. + connected = false, + ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false)}. init_username(Peercert, Options) -> case proplists:get_value(peer_cert_as_username, Options) of @@ -385,11 +388,14 @@ process_packet(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = {ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState}; process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{session = SPid, mountpoint = Mountpoint, proto_ver = ProtoVer, is_bridge = IsBridge}) -> + PState = #pstate{session = SPid, mountpoint = Mountpoint, + proto_ver = ProtoVer, is_bridge = IsBridge, + ignore_loop = IgnoreLoop}) -> RawTopicFilters1 = if ProtoVer < ?MQTT_PROTO_V5 -> + IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end, case IsBridge of - true -> [{RawTopic, SubOpts#{rap => 1}} || {RawTopic, SubOpts} <- RawTopicFilters]; - false -> [{RawTopic, SubOpts#{rap => 0}} || {RawTopic, SubOpts} <- RawTopicFilters] + true -> [{RawTopic, SubOpts#{rap => 1, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters]; + false -> [{RawTopic, SubOpts#{rap => 0, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters] end; true -> RawTopicFilters @@ -626,7 +632,6 @@ try_open_session(PState = #pstate{zone = Zone, clean_start => CleanStart, will_msg => WillMsg }, - SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}, {topic_alias_maximum, PState}]), case emqx_sm:open_session(SessAttrs1) of {ok, SPid} -> @@ -685,12 +690,12 @@ get_property(Name, Props, Default) -> maps:get(Name, Props, Default). make_will_msg(#mqtt_packet_connect{proto_ver = ProtoVer, - will_props = WillProps} = Connect) -> - emqx_packet:will_msg(if + will_props = WillProps} = Connect) -> + emqx_packet:will_msg(if ProtoVer =:= ?MQTT_PROTO_V5 -> WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0), Connect#mqtt_packet_connect{will_props = set_property('Will-Delay-Interval', WillDelayInterval, WillProps)}; - true -> + true -> Connect end). diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 6cfef70d2..0d7352e66 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -152,6 +152,7 @@ will_msg :: emqx:message(), will_delay_timer :: reference() | undefined + }). -type(spid() :: pid()). @@ -575,7 +576,8 @@ handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> %% Dispatch message handle_info({dispatch, Topic, Msg = #message{headers = Headers}}, - State = #state{subscriptions = SubMap, topic_alias_maximum = TopicAliasMaximum}) when is_record(Msg, message) -> + State = #state{subscriptions = SubMap, + topic_alias_maximum = TopicAliasMaximum}) when is_record(Msg, message) -> TopicAlias = maps:get('Topic-Alias', Headers, undefined), if TopicAlias =:= undefined orelse TopicAlias =< TopicAliasMaximum -> @@ -591,6 +593,7 @@ handle_info({dispatch, Topic, Msg = #message{headers = Headers}}, noreply(State) end; + %% Do nothing if the client has been disconnected. handle_info({timeout, Timer, retry_delivery}, State = #state{conn_pid = undefined, retry_timer = Timer}) -> noreply(State#state{retry_timer = undefined}); diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 55d0e26a7..8d0ac02e4 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -59,7 +59,7 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid end, emqx_sm_locker:trans(ClientId, CleanStart); -open_session(SessAttrs = #{clean_start := false, +open_session(SessAttrs = #{clean_start := false, client_id := ClientId}) -> ResumeStart = fun(_) -> case resume_session(ClientId, SessAttrs) of diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 08b4a3120..759e127c1 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -269,6 +269,7 @@ websocket_info(Info, State) -> terminate(SockError, _Req, #state{keepalive = Keepalive, proto_state = ProtoState, shutdown = Shutdown}) -> + ?LOG(debug, "Terminated for ~p, sockerror: ~p", [Shutdown, SockError]), emqx_keepalive:cancel(Keepalive), diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 7fcc2a598..e0a2555f1 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -62,7 +62,7 @@ subscribe_unsubscribe(_) -> ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, #{ qos => 2 }), true = emqx:subscribed(<<"topic">>, <<"clientId">>), Topics = emqx:topics(), - lists:foreach(fun(Topic) -> + lists:foreach(fun(Topic) -> ?assert(lists:member(Topic, Topics)) end, Topics), ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>), diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index c7b3455ad..6de507fca 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -150,8 +150,7 @@ receive_messages(Count, Msgs) -> receive {publish, Msg} -> receive_messages(Count-1, [Msg|Msgs]); - Other -> - ct:log("~p~n", [Other]), + _Other -> receive_messages(Count, Msgs) after 10 -> Msgs diff --git a/test/emqx_mod_sup_SUITE.erl b/test/emqx_mod_sup_SUITE.erl index 87245e351..8169c3f91 100644 --- a/test/emqx_mod_sup_SUITE.erl +++ b/test/emqx_mod_sup_SUITE.erl @@ -21,7 +21,7 @@ all() -> [t_child_all]. -t_child_all(_) -> +t_child_all(_) -> {ok, _Pid} = emqx_mod_sup:start_link(), {ok, _Child} = emqx_mod_sup:start_child(emqx_banned, worker), timer:sleep(10), diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl index 919be5218..26e343ca6 100644 --- a/test/emqx_mqtt_caps_SUITE.erl +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -114,8 +114,8 @@ t_check_sub(_) -> [{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts#{rc => ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}}]), - ok = do_check_sub(Caps#{mqtt_wildcard_subscription => false}, - [{<<"vlient/+/dsofi">>, Opts}], + ok = do_check_sub(Caps#{mqtt_wildcard_subscription => false}, + [{<<"vlient/+/dsofi">>, Opts}], [{<<"vlient/+/dsofi">>, Opts#{rc => ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}}]), emqx_zone:stop(). diff --git a/test/emqx_mqtt_packet_SUITE.erl b/test/emqx_mqtt_packet_SUITE.erl index 8bc41cb37..4386ff02e 100644 --- a/test/emqx_mqtt_packet_SUITE.erl +++ b/test/emqx_mqtt_packet_SUITE.erl @@ -63,7 +63,7 @@ all() -> [{group, connect}]. -groups() -> [{connect, [sequence], +groups() -> [{connect, [sequence], [case1_protocol_name, case2_protocol_ver%, %TOTO case3_invalid_reserved diff --git a/test/emqx_pqueue_SUITE.erl b/test/emqx_pqueue_SUITE.erl index e7672cb0b..9efccf472 100644 --- a/test/emqx_pqueue_SUITE.erl +++ b/test/emqx_pqueue_SUITE.erl @@ -94,7 +94,7 @@ t_priority_queues(_) -> PQueue6 = ?PQ:in(f, 1, PQueue5), {{value, e}, PQueue7} = ?PQ:out(PQueue6), - {empty, _} = ?PQ:out(0, ?PQ:new()), + {empty, _} = ?PQ:out(0, ?PQ:new()), {empty, Q0} = ?PQ:out_p(Q0), diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 060557c6c..3f6c4f252 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -132,7 +132,7 @@ connect_v5(_) -> raw_recv_parse(Data, ?MQTT_PROTO_V5) end), - % test clean start + % test clean start with_connection(fun(Sock) -> emqx_client_sock:send(Sock, raw_send_serialize( diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index f79b84557..5b6fc477c 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -18,9 +18,11 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("eunit/include/eunit.hrl"). + -include_lib("common_test/include/ct.hrl"). -all() -> [t_session_all]. +all() -> [ignore_loop, t_session_all]. init_per_suite(Config) -> emqx_ct_broker_helpers:run_setup_steps(), @@ -29,6 +31,19 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). +ignore_loop(_Config) -> + application:set_env(emqx, mqtt_ignore_loop_deliver, true), + {ok, Client} = emqx_client:start_link(), + {ok, _} = emqx_client:connect(Client), + TestTopic = <<"Self">>, + {ok, _, [2]} = emqx_client:subscribe(Client, TestTopic, qos2), + ok = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 0), + {ok, _} = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 1), + {ok, _} = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 2), + ?assertEqual(0, length(emqx_client_SUITE:receive_messages(3))), + ok = emqx_client:disconnect(Client), + application:set_env(emqx, mqtt_ignore_loop_deliver, false). + t_session_all(_) -> ClientId = <<"ClientId">>, {ok, ConnPid} = emqx_mock_client:start_link(ClientId), diff --git a/test/emqx_sm_SUITE.erl b/test/emqx_sm_SUITE.erl index 2b83b6afb..3aed3090e 100644 --- a/test/emqx_sm_SUITE.erl +++ b/test/emqx_sm_SUITE.erl @@ -24,13 +24,13 @@ all() -> [t_open_close_session]. t_open_close_session(_) -> emqx_ct_broker_helpers:run_setup_steps(), {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>), - Attrs = #{clean_start => true, - client_id => <<"client">>, + Attrs = #{clean_start => true, + client_id => <<"client">>, conn_pid => ClientPid, - zone => internal, - username => <<"zhou">>, - expiry_interval => 0, - max_inflight => 0, + zone => internal, + username => <<"emqx">>, + expiry_interval => 0, + max_inflight => 0, topic_alias_maximum => 0, will_msg => undefined}, {ok, SPid} = emqx_sm:open_session(Attrs), @@ -47,4 +47,3 @@ t_open_close_session(_) -> ok = emqx_sm:close_session(SPid), [] = emqx_sm:lookup_session(<<"client">>), emqx_ct_broker_helpers:run_teardown_steps(). - diff --git a/test/emqx_vm_SUITE.erl b/test/emqx_vm_SUITE.erl index b13b949b4..91744764e 100644 --- a/test/emqx_vm_SUITE.erl +++ b/test/emqx_vm_SUITE.erl @@ -95,7 +95,7 @@ all() -> - [load, systeminfo, mem_info, process_list, process_info, process_gc, + [load, systeminfo, mem_info, process_list, process_info, process_gc, get_ets_list, get_ets_info, get_ets_object, get_port_types, get_port_info, scheduler_usage, get_memory, microsecs, schedulers, get_process_group_leader_info, get_process_limit]. @@ -121,13 +121,13 @@ process_list(_Config) -> true = lists:member({pid, Pid}, lists:concat(ProcessInfo)). process_info(_Config) -> - ProcessInfos = emqx_vm:get_process_info(), + ProcessInfos = emqx_vm:get_process_info(), ProcessInfo = lists:last(ProcessInfos), Keys = [K || {K, _V}<- ProcessInfo], ?PROCESS_INFO = Keys. process_gc(_Config) -> - ProcessGcs = emqx_vm:get_process_gc(), + ProcessGcs = emqx_vm:get_process_gc(), ProcessGc = lists:last(ProcessGcs), Keys = [K || {K, _V}<- ProcessGc], ?PROCESS_GC = Keys. @@ -137,7 +137,7 @@ get_ets_list(_Config) -> Ets = emqx_vm:get_ets_list(), true = lists:member(test, Ets). -get_ets_info(_Config) -> +get_ets_info(_Config) -> ets:new(test, [named_table]), [] = emqx_vm:get_ets_info(test1), EtsInfo = emqx_vm:get_ets_info(test),