From f8471afb97a1aa6e97e3d1406880ae3d9268ef0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 7 Sep 2018 13:50:12 +0800 Subject: [PATCH] Add handling of retain handling subscription option --- src/emqx_protocol.erl | 26 ++++++++++---------------- src/emqx_session.erl | 4 ++-- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 4f4f4bfb8..364cc0ec1 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -359,9 +359,15 @@ process_packet(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = {ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState}; process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{session = SPid, mountpoint = Mountpoint}) -> + PState = #pstate{session = SPid, mountpoint = Mountpoint, proto_ver = ProtoVer, is_bridge = IsBridge}) -> + RawTopicFilters1 = if ProtoVer < ?MQTT_PROTO_V5 -> + case IsBridge of + true -> [{RawTopic, SubOpts#{rap => 1}} || {RawTopic, SubOpts} <- RawTopicFilters]; + false -> [{RawTopic, SubOpts#{rap => 0}} || {RawTopic, SubOpts} <- RawTopicFilters] + end + end, case check_subscribe( - parse_topic_filters(?SUBSCRIBE, RawTopicFilters), PState) of + parse_topic_filters(?SUBSCRIBE, RawTopicFilters1), PState) of {ok, TopicFilters} -> case emqx_hooks:run('client.subscribe', [credentials(PState)], TopicFilters) of {ok, TopicFilters1} -> @@ -490,10 +496,10 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, deliver({connack, ReasonCode, SP}, PState) -> send(?CONNACK_PACKET(ReasonCode, SP), PState); -deliver({publish, PacketId, Msg}, PState = #pstate{is_bridge = IsBridge, mountpoint = MountPoint}) -> +deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) -> _ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg), Msg1 = emqx_message:update_expiry(Msg), - Msg2 = emqx_mountpoint:unmount(MountPoint, clean_retain(IsBridge, Msg1)), + Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1), send(emqx_packet:from_message(PacketId, Msg2), PState); deliver({puback, PacketId, ReasonCode}, PState) -> @@ -741,18 +747,6 @@ parse_topic_filters(?SUBSCRIBE, RawTopicFilters) -> parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters) -> lists:map(fun emqx_topic:parse/1, RawTopicFilters). -%%----------------------------------------------------------------------------- -%% The retained flag should be propagated for bridge. -%%----------------------------------------------------------------------------- - -clean_retain(false, Msg = #message{flags = #{retain := true}, headers = Headers}) -> - case maps:get(retained, Headers, false) of - true -> Msg; - false -> emqx_message:set_flag(retain, false, Msg) - end; -clean_retain(_IsBridge, Msg) -> - Msg. - %%------------------------------------------------------------------------------ %% Update mountpoint diff --git a/src/emqx_session.erl b/src/emqx_session.erl index e8f8ed249..05142297e 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -448,11 +448,11 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, {ok, _SubOpts} -> emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), %% Why??? - emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), + emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => false}]), maps:put(Topic, SubOpts, SubMap); error -> emqx_broker:subscribe(Topic, ClientId, SubOpts), - emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), + emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => true}]), maps:put(Topic, SubOpts, SubMap) end} end, {[], Subscriptions}, TopicFilters),