From da4d71917fcfc04dee83addb2767cbed80906fee Mon Sep 17 00:00:00 2001 From: tigercl Date: Tue, 29 Sep 2020 14:11:51 +0800 Subject: [PATCH] fix(ignore-loop-deliver): fix issue#3738 (#3741) --- src/emqx.appup.src | 12 ++++++++ src/emqx_alarm.erl | 8 ++--- src/emqx_channel.erl | 59 ++++++++++++++++++++----------------- src/emqx_session.erl | 2 +- test/emqx_channel_SUITE.erl | 15 +++++----- 5 files changed, 57 insertions(+), 39 deletions(-) create mode 100644 src/emqx.appup.src diff --git a/src/emqx.appup.src b/src/emqx.appup.src new file mode 100644 index 000000000..c8a48d544 --- /dev/null +++ b/src/emqx.appup.src @@ -0,0 +1,12 @@ +{"4.2.1", + [ + {"4.2.0", [ + {load_module, emqx_channel, brutal_purge, soft_purge, []} + ]} + ], + [ + {"4.2.0", [ + {load_module, emqx_channel, brutal_purge, soft_purge, []} + ]} + ] +}. \ No newline at end of file diff --git a/src/emqx_alarm.erl b/src/emqx_alarm.erl index 510de9a4c..37d6fe1e8 100644 --- a/src/emqx_alarm.erl +++ b/src/emqx_alarm.erl @@ -200,10 +200,10 @@ handle_call({deactivate_alarm, Name}, _From, State = #state{actions = Actions, ok end, Alarm = #deactivated_alarm{activate_at = ActivateAt, - name = Name, - details = Details, - message = Message, - deactivate_at = erlang:system_time(microsecond)}, + name = Name, + details = Details, + message = Message, + deactivate_at = erlang:system_time(microsecond)}, mnesia:dirty_delete(?ACTIVATED_ALARM, Name), mnesia:dirty_write(?DEACTIVATED_ALARM, Alarm), do_actions(deactivate, Alarm, Actions), diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 2a3e35ccd..289ed50f0 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -648,17 +648,21 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel. -spec(handle_deliver(list(emqx_types:deliver()), channel()) -> {ok, channel()} | {ok, replies(), channel()}). handle_deliver(Delivers, Channel = #channel{conn_state = disconnected, - session = Session}) -> - NSession = emqx_session:enqueue(maybe_nack(Delivers), Session), + session = Session, + clientinfo = #{clientid := ClientId}}) -> + NSession = emqx_session:enqueue(ignore_local(maybe_nack(Delivers), ClientId, Session), Session), {ok, Channel#channel{session = NSession}}; handle_deliver(Delivers, Channel = #channel{takeover = true, - pendings = Pendings}) -> - NPendings = lists:append(Pendings, maybe_nack(Delivers)), + pendings = Pendings, + session = Session, + clientinfo = #{clientid := ClientId}}) -> + NPendings = lists:append(Pendings, ignore_local(maybe_nack(Delivers), ClientId, Session)), {ok, Channel#channel{pendings = NPendings}}; -handle_deliver(Delivers, Channel = #channel{session = Session}) -> - case emqx_session:deliver(Delivers, Session) of +handle_deliver(Delivers, Channel = #channel{session = Session, + clientinfo = #{clientid := ClientId}}) -> + case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of {ok, Publishes, NSession} -> NChannel = Channel#channel{session = NSession}, handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel)); @@ -666,6 +670,19 @@ handle_deliver(Delivers, Channel = #channel{session = Session}) -> {ok, Channel#channel{session = NSession}} end. +ignore_local(Delivers, Subscriber, Session) -> + Subs = emqx_session:info(subscriptions, Session), + lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) -> + case maps:find(Topic, Subs) of + {ok, #{nl := 1}} when Subscriber =:= Publisher -> + ok = emqx_metrics:inc('delivery.dropped'), + ok = emqx_metrics:inc('delivery.dropped.no_local'), + true; + _ -> + false + end + end, Delivers). + %% Nack delivers from shared subscription maybe_nack(Delivers) -> lists:filter(fun not_nacked/1, Delivers). @@ -782,22 +799,15 @@ do_deliver({pubrel, PacketId}, Channel) -> do_deliver({PacketId, Msg}, Channel = #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}}) -> - case ignore_local(Msg, ClientInfo) of - true -> - ok = emqx_metrics:inc('delivery.dropped'), - ok = emqx_metrics:inc('delivery.dropped.no_local'), - {[], Channel}; - false -> - ok = emqx_metrics:inc('messages.delivered'), - Msg1 = emqx_hooks:run_fold('message.delivered', - [ClientInfo], - emqx_message:update_expiry(Msg) - ), - Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1), - Packet = emqx_message:to_packet(PacketId, Msg2), - {NPacket, NChannel} = packing_alias(Packet, Channel), - {[NPacket], NChannel} - end; + ok = emqx_metrics:inc('messages.delivered'), + Msg1 = emqx_hooks:run_fold('message.delivered', + [ClientInfo], + emqx_message:update_expiry(Msg) + ), + Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1), + Packet = emqx_message:to_packet(PacketId, Msg2), + {NPacket, NChannel} = packing_alias(Packet, Channel), + {[NPacket], NChannel}; do_deliver([Publish], Channel) -> do_deliver(Publish, Channel); @@ -810,11 +820,6 @@ do_deliver(Publishes, Channel) when is_list(Publishes) -> end, {[], Channel}, Publishes), {lists:reverse(Packets), NChannel}. -ignore_local(#message{flags = #{nl := true}, from = ClientId}, - #{clientid := ClientId}) -> - true; -ignore_local(_Msg, _ClientInfo) -> false. - %%-------------------------------------------------------------------- %% Handle out suback %%-------------------------------------------------------------------- diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 88664e028..e89943f2a 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -429,7 +429,7 @@ deliver(Delivers, Session) -> deliver([], Publishes, Session) -> {ok, lists:reverse(Publishes), Session}; -deliver([Msg|More], Acc, Session) -> +deliver([Msg | More], Acc, Session) -> case deliver_msg(Msg, Session) of {ok, Session1} -> deliver(More, Acc, Session1); diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 3f282ba6a..c90c7510c 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -417,6 +417,14 @@ t_handle_deliver(_) -> {ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_deliver(Delivers, channel()), ?assertEqual([?QOS_1, ?QOS_2], [emqx_packet:qos(Pkt)|| Pkt <- Packets]). +t_handle_deliver_nl(_) -> + ClientInfo = clientinfo(#{clientid => <<"clientid">>}), + Session = session(#{subscriptions => #{<<"t1">> => #{nl => 1}}}), + Channel = channel(#{clientinfo => ClientInfo, session => Session}), + Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>), + NMsg = emqx_message:set_flag(nl, Msg), + {ok, Channel} = emqx_channel:handle_deliver([{deliver, <<"t1">>, NMsg}], Channel). + %%-------------------------------------------------------------------- %% Test cases for handle_out %%-------------------------------------------------------------------- @@ -434,13 +442,6 @@ t_handle_out_publish_1(_) -> {ok, {outgoing, [?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)]}, _Chan} = emqx_channel:handle_out(publish, [{1, Msg}], channel()). -t_handle_out_publish_nl(_) -> - ClientInfo = clientinfo(#{clientid => <<"clientid">>}), - Channel = channel(#{clientinfo => ClientInfo}), - Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>), - Pubs = [{1, emqx_message:set_flag(nl, Msg)}], - {ok, {outgoing,[]}, Channel} = emqx_channel:handle_out(publish, Pubs, Channel). - t_handle_out_connack_sucess(_) -> {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} = emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, #{}}, channel()),