From aa34258f1e36a2e85eb48f09d06c06b065f5767d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Wed, 5 Sep 2018 14:25:33 +0800 Subject: [PATCH] Support Retain As Published in Subscription Options --- src/emqx_session.erl | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 75c3ac197..e7e63763c 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -548,10 +548,10 @@ handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> %% Dispatch message handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when is_record(Msg, message) -> noreply(case maps:find(Topic, SubMap) of - {ok, #{nl := Nl, qos := QoS, subid := SubId}} -> - run_dispatch_steps([{nl, Nl},{qos, QoS}, {subid, SubId}], Msg, State); - {ok, #{nl := Nl, qos := QoS}} -> - run_dispatch_steps([{nl, Nl},{qos, QoS}], Msg, State); + {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} -> + run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}], Msg, State); + {ok, #{nl := Nl, qos := QoS, rap := Rap}} -> + run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}], Msg, State); error -> dispatch(emqx_message:unset_flag(dup, Msg), State) end); @@ -726,6 +726,11 @@ run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = run_dispatch_steps(Steps, Msg#message{qos = min(SubQoS, PubQoS)}, State); run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = true}) -> run_dispatch_steps(Steps, Msg#message{qos = max(SubQoS, PubQoS)}, State); +run_dispatch_steps([{rap, false}|Steps], Msg = #message{flags = Flags}, State = #state{}) -> + Flags1 = maps:put(retain, false, Flags), + run_dispatch_steps(Steps, Msg#message{flags = Flags1}, State); +run_dispatch_steps([{rap, _}|Steps], Msg, State) -> + run_dispatch_steps(Steps, Msg, State); run_dispatch_steps([{subid, SubId}|Steps], Msg, State) -> run_dispatch_steps(Steps, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State).