From 48cf089870804fe99febb0e21762d296c03c83f4 Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 28 Apr 2023 21:21:04 +0200 Subject: [PATCH 1/3] fix(mqtt): drop all local messages in session deliver --- apps/emqx/src/emqx_session.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 8b15340e9..25bee629e 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -291,16 +291,16 @@ stats(Session) -> info(?STATS_KEYS, Session). ignore_local(ClientInfo, Delivers, Subscriber, Session) -> Subs = info(subscriptions, Session), - lists:dropwhile( + lists:filter( fun({deliver, Topic, #message{from = Publisher} = Msg}) -> case maps:find(Topic, Subs) of {ok, #{nl := 1}} when Subscriber =:= Publisher -> ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]), ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped.no_local'), - true; + false; _ -> - false + true end end, Delivers From 8545d3d4a7e5e750a131c902abfffba4faca206c Mon Sep 17 00:00:00 2001 From: William Yang Date: Wed, 3 May 2023 12:18:30 +0200 Subject: [PATCH 2/3] test: subscribe with no_local, mixed pub from different clients --- .../emqx/test/emqx_mqtt_protocol_v5_SUITE.erl | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl index d3de74f72..fe608f600 100644 --- a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -829,6 +829,42 @@ t_subscribe_no_local(Config) -> ?assertEqual(1, length(receive_messages(2))), ok = emqtt:disconnect(Client1). +t_subscribe_no_local_mixed(Config) -> + ConnFun = ?config(conn_fun, Config), + Topic = nth(1, ?TOPICS), + {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client1), + + {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client2), + + %% Given tow clients and client1 subscribe to topic with 'no local' set to true + {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{nl, true}, {qos, 2}]}]), + + %% When mixed publish traffic are sent from both clients (Client1 sent 6 and Client2 sent 2) + CB = {fun emqtt:sync_publish_result/3, [self(), async_res]}, + ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed1">>, 0, CB), + ok = emqtt:publish_async(Client2, Topic, <<"t_subscribe_no_local_mixed2">>, 0, CB), + ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed3">>, 0, CB), + ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed4">>, 0, CB), + ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed5">>, 0, CB), + ok = emqtt:publish_async(Client2, Topic, <<"t_subscribe_no_local_mixed6">>, 0, CB), + ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed7">>, 0, CB), + ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed8">>, 0, CB), + [ + receive + {async_res, Res} -> ?assertEqual(ok, Res) + end + || _ <- lists:seq(1, 8) + ], + + %% Then only two messages from clients 2 are received + PubRecvd = receive_messages(9), + ct:pal("~p", [PubRecvd]), + ?assertEqual(2, length(PubRecvd)), + ok = emqtt:disconnect(Client1), + ok = emqtt:disconnect(Client2). + t_subscribe_actions(Config) -> ConnFun = ?config(conn_fun, Config), Topic = nth(1, ?TOPICS), From fc46b81fb1de2b3ec75ae3e88bdf4fb2c523c55c Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 5 May 2023 11:00:00 +0200 Subject: [PATCH 3/3] docs: change log for #10563 --- changes/ce/fix-10563.en.md | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 changes/ce/fix-10563.en.md diff --git a/changes/ce/fix-10563.en.md b/changes/ce/fix-10563.en.md new file mode 100644 index 000000000..f902fb57b --- /dev/null +++ b/changes/ce/fix-10563.en.md @@ -0,0 +1,2 @@ +Corrected an issue where the no_local flag was not functioning correctly. +