Add handling of retain handling subscription option

This commit is contained in:
周子博 2018-09-07 13:50:12 +08:00
parent d819ec0b58
commit f8471afb97
2 changed files with 12 additions and 18 deletions

View File

@ -359,9 +359,15 @@ process_packet(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session =
{ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState};
process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
PState = #pstate{session = SPid, mountpoint = Mountpoint}) ->
PState = #pstate{session = SPid, mountpoint = Mountpoint, proto_ver = ProtoVer, is_bridge = IsBridge}) ->
RawTopicFilters1 = if ProtoVer < ?MQTT_PROTO_V5 ->
case IsBridge of
true -> [{RawTopic, SubOpts#{rap => 1}} || {RawTopic, SubOpts} <- RawTopicFilters];
false -> [{RawTopic, SubOpts#{rap => 0}} || {RawTopic, SubOpts} <- RawTopicFilters]
end
end,
case check_subscribe(
parse_topic_filters(?SUBSCRIBE, RawTopicFilters), PState) of
parse_topic_filters(?SUBSCRIBE, RawTopicFilters1), PState) of
{ok, TopicFilters} ->
case emqx_hooks:run('client.subscribe', [credentials(PState)], TopicFilters) of
{ok, TopicFilters1} ->
@ -490,10 +496,10 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
deliver({connack, ReasonCode, SP}, PState) ->
send(?CONNACK_PACKET(ReasonCode, SP), PState);
deliver({publish, PacketId, Msg}, PState = #pstate{is_bridge = IsBridge, mountpoint = MountPoint}) ->
deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) ->
_ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg),
Msg1 = emqx_message:update_expiry(Msg),
Msg2 = emqx_mountpoint:unmount(MountPoint, clean_retain(IsBridge, Msg1)),
Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
send(emqx_packet:from_message(PacketId, Msg2), PState);
deliver({puback, PacketId, ReasonCode}, PState) ->
@ -741,18 +747,6 @@ parse_topic_filters(?SUBSCRIBE, RawTopicFilters) ->
parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters) ->
lists:map(fun emqx_topic:parse/1, RawTopicFilters).
%%-----------------------------------------------------------------------------
%% The retained flag should be propagated for bridge.
%%-----------------------------------------------------------------------------
clean_retain(false, Msg = #message{flags = #{retain := true}, headers = Headers}) ->
case maps:get(retained, Headers, false) of
true -> Msg;
false -> emqx_message:set_flag(retain, false, Msg)
end;
clean_retain(_IsBridge, Msg) ->
Msg.
%%------------------------------------------------------------------------------
%% Update mountpoint

View File

@ -448,11 +448,11 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
{ok, _SubOpts} ->
emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
%% Why???
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => false}]),
maps:put(Topic, SubOpts, SubMap);
error ->
emqx_broker:subscribe(Topic, ClientId, SubOpts),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => true}]),
maps:put(Topic, SubOpts, SubMap)
end}
end, {[], Subscriptions}, TopicFilters),