Support Retain As Published in Subscription Options

This commit is contained in:
周子博 2018-09-05 14:25:33 +08:00
parent b34a4efd8b
commit aa34258f1e
1 changed files with 9 additions and 4 deletions

View File

@ -548,10 +548,10 @@ handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) ->
%% Dispatch message
handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when is_record(Msg, message) ->
noreply(case maps:find(Topic, SubMap) of
{ok, #{nl := Nl, qos := QoS, subid := SubId}} ->
run_dispatch_steps([{nl, Nl},{qos, QoS}, {subid, SubId}], Msg, State);
{ok, #{nl := Nl, qos := QoS}} ->
run_dispatch_steps([{nl, Nl},{qos, QoS}], Msg, State);
{ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} ->
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}], Msg, State);
{ok, #{nl := Nl, qos := QoS, rap := Rap}} ->
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}], Msg, State);
error ->
dispatch(emqx_message:unset_flag(dup, Msg), State)
end);
@ -726,6 +726,11 @@ run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State =
run_dispatch_steps(Steps, Msg#message{qos = min(SubQoS, PubQoS)}, State);
run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = true}) ->
run_dispatch_steps(Steps, Msg#message{qos = max(SubQoS, PubQoS)}, State);
run_dispatch_steps([{rap, false}|Steps], Msg = #message{flags = Flags}, State = #state{}) ->
Flags1 = maps:put(retain, false, Flags),
run_dispatch_steps(Steps, Msg#message{flags = Flags1}, State);
run_dispatch_steps([{rap, _}|Steps], Msg, State) ->
run_dispatch_steps(Steps, Msg, State);
run_dispatch_steps([{subid, SubId}|Steps], Msg, State) ->
run_dispatch_steps(Steps, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State).