diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 6dcd9aba1..4ddce1067 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -57,7 +57,6 @@ will_topic, will_msg, keepalive, - mountpoint, is_bridge, enable_ban, enable_acl, @@ -101,7 +100,6 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF clean_start = false, topic_aliases = #{}, packet_size = emqx_zone:get_env(Zone, max_packet_size), - mountpoint = emqx_zone:get_env(Zone, mountpoint), is_bridge = false, enable_ban = emqx_zone:get_env(Zone, enable_ban, false), enable_acl = emqx_zone:get_env(Zone, enable_acl), @@ -153,7 +151,6 @@ attrs(#pstate{zone = Zone, proto_ver = ProtoVer, proto_name = ProtoName, keepalive = Keepalive, - mountpoint = Mountpoint, is_bridge = IsBridge, connected_at = ConnectedAt, conn_mod = ConnMod, @@ -167,7 +164,6 @@ attrs(#pstate{zone = Zone, {proto_name, ProtoName}, {clean_start, CleanStart}, {keepalive, Keepalive}, - {mountpoint, Mountpoint}, {is_bridge, IsBridge}, {connected_at, ConnectedAt}, {conn_mod, ConnMod}, @@ -202,8 +198,6 @@ caps(#pstate{zone = Zone}) -> client_id(#pstate{client_id = ClientId}) -> ClientId. -credentials(#pstate{credentials = Credentials}) when map_size(Credentials) =/= 0 -> - Credentials; credentials(#pstate{zone = Zone, client_id = ClientId, username = Username, @@ -212,7 +206,8 @@ credentials(#pstate{zone = Zone, with_cert(#{zone => Zone, client_id => ClientId, username => Username, - peername => Peername}, Peercert). + peername => Peername, + mountpoint => emqx_zone:get_env(Zone, mountpoint)}, Peercert). with_cert(Credentials, undefined) -> Credentials; with_cert(Credentials, Peercert) -> @@ -481,24 +476,12 @@ process(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) {ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState}; process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{session = SPid, mountpoint = Mountpoint, - proto_ver = ProtoVer, is_bridge = IsBridge, - ignore_loop = IgnoreLoop}) -> - RawTopicFilters1 = if ProtoVer < ?MQTT_PROTO_V5 -> - IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end, - case IsBridge of - true -> [{RawTopic, SubOpts#{rap => 1, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters]; - false -> [{RawTopic, SubOpts#{rap => 0, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters] - end; - true -> - RawTopicFilters - end, - case check_subscribe( - parse_topic_filters(?SUBSCRIBE, RawTopicFilters1), PState) of + PState = #pstate{session = SPid, credentials = Credentials}) -> + case check_subscribe(parse_topic_filters(?SUBSCRIBE, raw_topic_filters(PState, RawTopicFilters)), PState) of {ok, TopicFilters} -> - TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [credentials(PState)], TopicFilters), - ok = emqx_session:subscribe(SPid, PacketId, Properties, - emqx_mountpoint:mount(Mountpoint, TopicFilters0)), + TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [Credentials], TopicFilters), + TopicFilters1 = emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters0), + ok = emqx_session:subscribe(SPid, PacketId, Properties, TopicFilters1), {ok, PState}; {error, TopicFilters} -> {SubTopics, ReasonCodes} = @@ -518,11 +501,11 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), end; process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{session = SPid, mountpoint = MountPoint}) -> - TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [credentials(PState)], + PState = #pstate{session = SPid, credentials = Credentials}) -> + TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [Credentials], parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)), ok = emqx_session:unsubscribe(SPid, PacketId, Properties, - emqx_mountpoint:mount(MountPoint, TopicFilters)), + emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters)), {ok, PState}; process(?PACKET(?PINGREQ), PState) -> @@ -550,12 +533,12 @@ process(?DISCONNECT_PACKET(_), PState) -> %% ConnAck --> Client %%------------------------------------------------------------------------------ -connack({?RC_SUCCESS, SP, PState}) -> - ok = emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]), - deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState)); +connack({?RC_SUCCESS, SP, PState = #pstate{credentials = Credentials}}) -> + ok = emqx_hooks:run('client.connected', [Credentials, ?RC_SUCCESS, attrs(PState)]), + deliver({connack, ?RC_SUCCESS, sp(SP)}, PState); -connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> - ok = emqx_hooks:run('client.connected', [credentials(PState), ReasonCode, attrs(PState)]), +connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer, credentials = Credentials}}) -> + ok = emqx_hooks:run('client.connected', [Credentials, ReasonCode, attrs(PState)]), [ReasonCode1] = reason_codes_compat(connack, [ReasonCode], ProtoVer), _ = deliver({connack, ReasonCode1}, PState), {error, emqx_reason_codes:name(ReasonCode1, ProtoVer), PState}. @@ -565,9 +548,9 @@ connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> %%------------------------------------------------------------------------------ do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId), - PState = #pstate{session = SPid, mountpoint = MountPoint}) -> - Msg = emqx_mountpoint:mount(MountPoint, - emqx_packet:to_message(credentials(PState), Packet)), + PState = #pstate{session = SPid, credentials = Credentials}) -> + Msg = emqx_mountpoint:mount(mountpoint(Credentials), + emqx_packet:to_message(Credentials, Packet)), puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, Msg)), PState). %%------------------------------------------------------------------------------ @@ -663,10 +646,10 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, deliver({connack, ReasonCode, SP}, PState) -> send(?CONNACK_PACKET(ReasonCode, SP), PState); -deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) -> - Msg0 = emqx_hooks:run_fold('message.deliver', [credentials(PState)], Msg), +deliver({publish, PacketId, Msg}, PState = #pstate{credentials = Credentials}) -> + Msg0 = emqx_hooks:run_fold('message.deliver', [Credentials], Msg), Msg1 = emqx_message:update_expiry(Msg0), - Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1), + Msg2 = emqx_mountpoint:unmount(mountpoint(Credentials), Msg1), send(emqx_packet:from_message(PacketId, Msg2), PState); deliver({puback, PacketId, ReasonCode}, PState) -> @@ -830,8 +813,8 @@ check_will_topic(#mqtt_packet_connect{will_topic = WillTopic} = ConnPkt, PState) check_will_acl(_ConnPkt, #pstate{enable_acl = EnableAcl}) when not EnableAcl -> ok; -check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, PState) -> - case emqx_access_control:check_acl(credentials(PState), publish, WillTopic) of +check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, #pstate{credentials = Credentials}) -> + case emqx_access_control:check_acl(Credentials, publish, WillTopic) of allow -> ok; deny -> ?LOG(warning, "Cannot publish will message to ~p for acl denied", [WillTopic]), @@ -850,8 +833,8 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Ret check_pub_acl(_Packet, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl}) when IsSuper orelse (not EnableAcl) -> ok; -check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, PState) -> - case emqx_access_control:check_acl(credentials(PState), publish, Topic) of +check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, #pstate{credentials = Credentials}) -> + case emqx_access_control:check_acl(Credentials, publish, Topic) of allow -> ok; deny -> {error, ?RC_NOT_AUTHORIZED} end. @@ -877,10 +860,10 @@ check_subscribe(TopicFilters, PState = #pstate{zone = Zone}) -> check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl}) when IsSuper orelse (not EnableAcl) -> {ok, TopicFilters}; -check_sub_acl(TopicFilters, PState) -> +check_sub_acl(TopicFilters, #pstate{credentials = Credentials}) -> lists:foldr( fun({Topic, SubOpts}, {Ok, Acc}) -> - case emqx_access_control:check_acl(credentials(PState), publish, Topic) of + case emqx_access_control:check_acl(Credentials, publish, Topic) of allow -> {Ok, [{Topic, SubOpts}|Acc]}; deny -> {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]} @@ -912,9 +895,9 @@ terminate(conflict, _PState) -> ok; terminate(discard, _PState) -> ok; -terminate(Reason, PState) -> +terminate(Reason, #pstate{credentials = Credentials}) -> ?LOG(info, "Shutdown for ~p", [Reason]), - ok = emqx_hooks:run('client.disconnected', [credentials(PState), Reason]). + ok = emqx_hooks:run('client.disconnected', [Credentials, Reason]). start_keepalive(0, _PState) -> ignore; @@ -932,14 +915,6 @@ parse_topic_filters(?SUBSCRIBE, RawTopicFilters) -> parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters) -> lists:map(fun emqx_topic:parse/1, RawTopicFilters). -%%------------------------------------------------------------------------------ -%% Update mountpoint - -update_mountpoint(PState = #pstate{mountpoint = undefined}) -> - PState; -update_mountpoint(PState = #pstate{mountpoint = MountPoint}) -> - PState#pstate{mountpoint = emqx_mountpoint:replvar(MountPoint, credentials(PState))}. - sp(true) -> 1; sp(false) -> 0. @@ -986,3 +961,20 @@ reason_codes_compat(unsuback, _ReasonCodes, _ProtoVer) -> undefined; reason_codes_compat(PktType, ReasonCodes, _ProtoVer) -> [emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes]. + +raw_topic_filters(#pstate{proto_ver = ProtoVer, + is_bridge = IsBridge, + ignore_loop = IgnoreLoop}, RawTopicFilters) -> + case ProtoVer < ?MQTT_PROTO_V5 of + true -> + IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end, + case IsBridge of + true -> [{RawTopic, SubOpts#{rap => 1, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters]; + false -> [{RawTopic, SubOpts#{rap => 0, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters] + end; + false -> + RawTopicFilters + end. + +mountpoint(Credentials) -> + maps:get(mountpoint, Credentials, undefined).