diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 12bb008ac..e5298656d 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -38,44 +38,47 @@ , terminate/2 ]). +-import(emqx_access_control, + [ authenticate/1 + , check_acl/3 + ]). + -export_type([proto_state/0]). -record(protocol, { - proto_name :: binary(), - proto_ver :: emqx_types:version(), client :: emqx_types:client(), session :: emqx_session:session(), + proto_name :: binary(), + proto_ver :: emqx_types:ver(), keepalive :: non_neg_integer(), will_msg :: emqx_types:message(), - enable_acl :: boolean(), topic_aliases :: maybe(map()), - alias_maximum :: maybe(map()) + alias_maximum :: maybe(map()), + ack_props :: maybe(emqx_types:properties()) %% Tmp props }). -opaque(proto_state() :: #protocol{}). +-define(NO_PROPS, undefined). + -spec(info(proto_state()) -> emqx_types:infos()). -info(#protocol{proto_name = ProtoName, - proto_ver = ProtoVer, - client = Client, +info(#protocol{client = Client, session = Session, + proto_name = ProtoName, + proto_ver = ProtoVer, keepalive = Keepalive, will_msg = WillMsg, topic_aliases = Aliases}) -> - #{proto_name => ProtoName, - proto_ver => ProtoVer, - client => Client, + #{client => Client, session => emqx_session:info(Session), + proto_name => ProtoName, + proto_ver => ProtoVer, keepalive => Keepalive, will_msg => WillMsg, topic_aliases => Aliases }. -spec(info(atom(), proto_state()) -> term()). -info(proto_name, #protocol{proto_name = ProtoName}) -> - ProtoName; -info(proto_ver, #protocol{proto_ver = ProtoVer}) -> - ProtoVer; info(client, #protocol{client = Client}) -> Client; info(zone, #protocol{client = #{zone := Zone}}) -> @@ -84,53 +87,54 @@ info(client_id, #protocol{client = #{client_id := ClientId}}) -> ClientId; info(session, #protocol{session = Session}) -> Session; +info(proto_name, #protocol{proto_name = ProtoName}) -> + ProtoName; +info(proto_ver, #protocol{proto_ver = ProtoVer}) -> + ProtoVer; info(keepalive, #protocol{keepalive = Keepalive}) -> Keepalive; info(topic_aliases, #protocol{topic_aliases = Aliases}) -> Aliases. -attrs(#protocol{proto_name = ProtoName, - proto_ver = ProtoVer, - client = Client, +attrs(#protocol{client = Client, session = Session, + proto_name = ProtoName, + proto_ver = ProtoVer, keepalive = Keepalive}) -> - #{proto_name => ProtoName, - proto_ver => ProtoVer, - client => Client, + #{client => Client, session => emqx_session:attrs(Session), + proto_name => ProtoName, + proto_ver => ProtoVer, keepalive => Keepalive }. caps(#protocol{client = #{zone := Zone}}) -> emqx_mqtt_caps:get_caps(Zone). --spec(init(map(), proplists:proplist()) -> proto_state()). +-spec(init(emqx_types:conn(), proplists:proplist()) -> proto_state()). init(ConnInfo, Options) -> Zone = proplists:get_value(zone, Options), Peercert = maps:get(peercert, ConnInfo, undefined), - Username = peer_cert_as_username(Peercert, Options), - Mountpoint = emqx_zone:get_env(Zone, mountpoint), - EnableAcl = emqx_zone:get_env(Zone, enable_acl, true), + Username = case peer_cert_as_username(Options) of + cn -> esockd_peercert:common_name(Peercert); + dn -> esockd_peercert:subject(Peercert); + crt -> Peercert; + _ -> undefined + end, + MountPoint = emqx_zone:get_env(Zone, mountpoint), Client = maps:merge(#{zone => Zone, username => Username, - mountpoint => Mountpoint, + mountpoint => MountPoint, is_bridge => false, is_superuser => false }, ConnInfo), - #protocol{proto_ver = ?MQTT_PROTO_V4, + #protocol{client = Client, proto_name = <<"MQTT">>, - client = Client, - %%mountfun = MountFun, - enable_acl = EnableAcl + proto_ver = ?MQTT_PROTO_V4 }. -peer_cert_as_username(Peercert, Options) -> - case proplists:get_value(peer_cert_as_username, Options) of - cn -> esockd_peercert:common_name(Peercert); - dn -> esockd_peercert:subject(Peercert); - crt -> Peercert; - _ -> undefined - end. +peer_cert_as_username(Options) -> + proplists:get_value(peer_cert_as_username, Options). %%-------------------------------------------------------------------- %% Handle incoming packet @@ -139,40 +143,49 @@ peer_cert_as_username(Peercert, Options) -> -spec(handle_in(emqx_types:packet(), proto_state()) -> {ok, proto_state()} | {ok, emqx_types:packet(), proto_state()} + | {ok, list(emqx_types:packet()), proto_state()} | {error, Reason :: term(), proto_state()} | {stop, Error :: atom(), proto_state()}). -handle_in(?CONNECT_PACKET(#mqtt_packet_connect{client_id = ClientId} = ConnPkt), - PState) -> +handle_in(?CONNECT_PACKET( + #mqtt_packet_connect{proto_name = ProtoName, + proto_ver = ProtoVer, + keepalive = Keepalive, + client_id = ClientId + } = ConnPkt), PState) -> + PState1 = PState#protocol{proto_name = ProtoName, + proto_ver = ProtoVer, + keepalive = Keepalive + }, ok = emqx_logger:set_metadata_client_id(ClientId), case pipeline([fun validate_in/2, - fun preprocess_props/2, + fun process_props/2, fun check_connect/2, - fun enrich_pstate/2, - fun auth_connect/2], ConnPkt, PState) of + fun enrich_client/2, + fun auth_connect/2], ConnPkt, PState1) of {ok, NConnPkt, NPState} -> - handle_connect(NConnPkt, NPState); + process_connect(NConnPkt, maybe_assign_clientid(NPState)); {error, ReasonCode, NPState} -> handle_out({disconnect, ReasonCode}, NPState) end; handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), PState) -> case pipeline([fun validate_in/2, - fun preprocess_props/2, + fun process_alias/2, fun check_publish/2], Packet, PState) of {ok, NPacket, NPState} -> - handle_publish(NPacket, NPState); - {error, ReasonCode, PState1} -> + process_publish(NPacket, NPState); + {error, ReasonCode, NPState} -> ?LOG(warning, "Cannot publish message to ~s due to ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), - handle_puback(QoS, PacketId, ReasonCode, PState1) + puback(QoS, PacketId, ReasonCode, NPState) end; handle_in(?PUBACK_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) -> case emqx_session:puback(PacketId, ReasonCode, Session) of - {ok, NSession} -> - {ok, PState#protocol{session = NSession}}; {ok, Publishes, NSession} -> handle_out({publish, Publishes}, PState#protocol{session = NSession}); + {ok, NSession} -> + {ok, PState#protocol{session = NSession}}; {error, _NotFound} -> {ok, PState} end; @@ -195,10 +208,10 @@ handle_in(?PUBREL_PACKET(PacketId, ReasonCode), PState = #protocol{session = Ses handle_in(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) -> case emqx_session:pubcomp(PacketId, ReasonCode, Session) of - {ok, NSession} -> - {ok, PState#protocol{session = NSession}}; {ok, Publishes, NSession} -> handle_out({publish, Publishes}, PState#protocol{session = NSession}); + {ok, NSession} -> + {ok, PState#protocol{session = NSession}}; {error, _NotFound} -> {ok, PState} end; @@ -206,12 +219,14 @@ handle_in(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #protocol{session = Se handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), PState = #protocol{client = Client}) -> case validate_in(Packet, PState) of - ok -> - ok = emqx_hooks:run('client.subscribe', - [Client, Properties, TopicFilters]), - TopicFilters1 = enrich_subid(Properties, TopicFilters), - {ReasonCodes, PState1} = handle_subscribe(TopicFilters1, PState), - handle_out({suback, PacketId, ReasonCodes}, PState1); + ok -> TopicFilters1 = [emqx_topic:parse(TopicFilter, SubOpts) + || {TopicFilter, SubOpts} <- TopicFilters], + TopicFilters2 = emqx_hooks:run_fold('client.subscribe', + [Client, Properties], + TopicFilters1), + TopicFilters3 = enrich_subid(Properties, TopicFilters2), + {ReasonCodes, NPState} = process_subscribe(TopicFilters3, PState), + handle_out({suback, PacketId, ReasonCodes}, NPState); {error, ReasonCode} -> handle_out({disconnect, ReasonCode}, PState) end; @@ -219,11 +234,12 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), PState = #protocol{client = Client}) -> case validate_in(Packet, PState) of - ok -> - ok = emqx_hooks:run('client.unsubscribe', - [Client, Properties, TopicFilters]), - {ReasonCodes, PState1} = handle_unsubscribe(TopicFilters, PState), - handle_out({unsuback, PacketId, ReasonCodes}, PState1); + ok -> TopicFilters1 = lists:map(fun emqx_topic:parse/1, TopicFilters), + TopicFilters2 = emqx_hooks:run_fold('client.unsubscribe', + [Client, Properties], + TopicFilters1), + {ReasonCodes, NPState} = process_unsubscribe(TopicFilters2, PState), + handle_out({unsuback, PacketId, ReasonCodes}, NPState); {error, ReasonCode} -> handle_out({disconnect, ReasonCode}, PState) end; @@ -238,7 +254,7 @@ handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), PState) -> handle_in(?DISCONNECT_PACKET(RC), PState = #protocol{proto_ver = Ver}) -> %% TODO: %% {stop, {shutdown, abnormal_disconnet}, PState}; - {sto, {shutdown, emqx_reason_codes:name(RC, Ver)}, PState}; + {stop, {shutdown, emqx_reason_codes:name(RC, Ver)}, PState}; handle_in(?AUTH_PACKET(), PState) -> %%TODO: implement later. @@ -252,45 +268,65 @@ handle_in(Packet, PState) -> %% Handle delivers %%-------------------------------------------------------------------- -handle_deliver(Delivers, PState = #protocol{client = Client, session = Session}) +handle_deliver(Delivers, PState = #protocol{session = Session}) when is_list(Delivers) -> case emqx_session:deliver(Delivers, Session) of {ok, Publishes, NSession} -> - Packets = lists:map(fun({publish, PacketId, Msg}) -> - Msg0 = emqx_hooks:run_fold('message.deliver', [Client], Msg), - Msg1 = emqx_message:update_expiry(Msg0), - Msg2 = emqx_mountpoint:unmount(maps:get(mountpoint, Client), Msg1), - emqx_packet:from_message(PacketId, Msg2) - end, Publishes), - {ok, Packets, PState#protocol{session = NSession}}; + handle_out({publish, Publishes}, PState#protocol{session = NSession}); {ok, NSession} -> {ok, PState#protocol{session = NSession}} end. -%%-------------------------------------------------------------------- -%% Handle puback -%%-------------------------------------------------------------------- - -handle_puback(?QOS_0, _PacketId, ReasonCode, PState) -> - handle_out({puberr, ReasonCode}, PState); -handle_puback(?QOS_1, PacketId, ReasonCode, PState) -> - handle_out({puback, PacketId, ReasonCode}, PState); -handle_puback(?QOS_2, PacketId, ReasonCode, PState) -> - handle_out({pubrec, PacketId, ReasonCode}, PState). - %%-------------------------------------------------------------------- %% Handle outgoing packet %%-------------------------------------------------------------------- -handle_out({connack, ?RC_SUCCESS, SP}, PState = #protocol{client = Client}) -> - ok = emqx_hooks:run('client.connected', - [Client, ?RC_SUCCESS, info(PState)]), - Props = #{}, %% TODO: ... - {ok, ?CONNACK_PACKET(?RC_SUCCESS, SP, Props), PState}; +handle_out({connack, ?RC_SUCCESS, SP}, + PState = #protocol{client = Client = #{zone := Zone}, + ack_props = AckProps, + alias_maximum = AliasMaximum}) -> + ok = emqx_hooks:run('client.connected', [Client, ?RC_SUCCESS, attrs(PState)]), + #{max_packet_size := MaxPktSize, + max_qos_allowed := MaxQoS, + mqtt_retain_available := Retain, + max_topic_alias := MaxAlias, + mqtt_shared_subscription := Shared, + mqtt_wildcard_subscription := Wildcard + } = caps(PState), + %% Response-Information is so far not set by broker. + %% i.e. It's a Client-to-Client contract for the request-response topic naming scheme. + %% According to MQTT 5.0 spec: + %% A common use of this is to pass a globally unique portion of the topic tree which + %% is reserved for this Client for at least the lifetime of its Session. + %% This often cannot just be a random name as both the requesting Client and the + %% responding Client need to be authorized to use it. + %% If we are to support it in the feature, the implementation should be flexible + %% to allow prefixing the response topic based on different ACL config. + %% e.g. prefix by username or client-id, so that unauthorized clients can not + %% subscribe requests or responses that are not intended for them. + AckProps1 = if AckProps == undefined -> #{}; true -> AckProps end, + AckProps2 = AckProps1#{'Retain-Available' => flag(Retain), + 'Maximum-Packet-Size' => MaxPktSize, + 'Topic-Alias-Maximum' => MaxAlias, + 'Wildcard-Subscription-Available' => flag(Wildcard), + 'Subscription-Identifier-Available' => 1, + %'Response-Information' => + 'Shared-Subscription-Available' => flag(Shared), + 'Maximum-QoS' => MaxQoS + }, + AckProps3 = case emqx_zone:get_env(Zone, server_keepalive) of + undefined -> AckProps2; + Keepalive -> AckProps2#{'Server-Keep-Alive' => Keepalive} + end, + AliasMaximum1 = set_property(inbound, MaxAlias, AliasMaximum), + PState1 = PState#protocol{alias_maximum = AliasMaximum1, + ack_props = undefined + }, + {ok, ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps3), PState1}; handle_out({connack, ReasonCode}, PState = #protocol{client = Client, proto_ver = ProtoVer}) -> - ok = emqx_hooks:run('client.connected', [Client, ReasonCode, info(PState)]), + ok = emqx_hooks:run('client.connected', [Client, ReasonCode, attrs(PState)]), ReasonCode1 = if ProtoVer == ?MQTT_PROTO_V5 -> ReasonCode; true -> emqx_reason_codes:compat(connack, ReasonCode) @@ -298,21 +334,15 @@ handle_out({connack, ReasonCode}, PState = #protocol{client = Client, Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer), {error, Reason, ?CONNACK_PACKET(ReasonCode1), PState}; -handle_out({publish, Publishes}, PState = #protocol{client = Client = #{mountpoint := Mountpoint}}) -> - Mount = fun(Msg) -> emqx_mountpoint:mount(Mountpoint, Msg) end, - Packets = lists:map( - fun({publish, PacketId, Msg}) -> - Msg1 = emqx_hooks:run_fold('message.deliver', [Client], Msg), - Msg2 = Mount(emqx_message:update_expiry(Msg1)), - emqx_packet:from_message(PacketId, Msg2) - end, Publishes), +handle_out({publish, Publishes}, PState = #protocol{client = Client}) -> + Packets = [element(2, handle_out(Publish, PState)) || Publish <- Publishes], {ok, Packets, PState}; handle_out({publish, PacketId, Msg}, PState = #protocol{client = Client}) -> - Msg0 = emqx_hooks:run_fold('message.deliver', [Client], Msg), - Msg1 = emqx_message:update_expiry(Msg0), - Msg2 = emqx_mountpoint:unmount(maps:get(mountpoint, Client), Msg1), - {ok, emqx_packet:from_message(PacketId, Msg2), PState}; + Msg1 = emqx_hooks:run_fold('message.deliver', [Client], + emqx_message:update_expiry(Msg)), + Packet = emqx_packet:from_message(PacketId, unmount(Client, Msg1)), + {ok, Packet, PState}; %% TODO: How to handle the err? handle_out({puberr, _ReasonCode}, PState) -> @@ -393,40 +423,20 @@ validate_in(Packet, _PState) -> end. %%-------------------------------------------------------------------- -%% PreProcess Properties +%% Preprocess properties %%-------------------------------------------------------------------- -preprocess_props(#mqtt_packet_connect{ - properties = #{'Topic-Alias-Maximum' := Max} - }, - PState = #protocol{alias_maximum = AliasMaximum}) -> - {ok, PState#protocol{alias_maximum = AliasMaximum#{outbound => Max}}}; +process_props(#mqtt_packet_connect{ + properties = #{'Topic-Alias-Maximum' := Max} + }, + PState = #protocol{alias_maximum = AliasMaximum}) -> + NAliasMaximum = if AliasMaximum == undefined -> + #{outbound => Max}; + true -> AliasMaximum#{outbound => Max} + end, + {ok, PState#protocol{alias_maximum = NAliasMaximum}}; -preprocess_props(Packet = #mqtt_packet{variable = Publish}, PState) -> - case preprocess_props(Publish, PState) of - {ok, Publish1, PState1} -> - {ok, Packet#mqtt_packet{variable = Publish1}, PState1}; - Error -> Error - end; - -preprocess_props(Publish = #mqtt_packet_publish{topic_name = <<>>, - properties = #{'Topic-Alias' := AliasId} - }, - PState = #protocol{topic_aliases = TopicAliases}) -> - case maps:find(AliasId, TopicAliases) of - {ok, Topic} -> - {ok, Publish#mqtt_packet_publish{topic_name = Topic}, PState}; - false -> {error, ?RC_TOPIC_ALIAS_INVALID} - end; - -preprocess_props(Publish = #mqtt_packet_publish{topic_name = Topic, - properties = #{'Topic-Alias' := AliasId} - }, - PState = #protocol{topic_aliases = Aliases}) -> - Aliases1 = maps:put(AliasId, Topic, Aliases), - {ok, Publish, PState#protocol{topic_aliases = Aliases1}}; - -preprocess_props(Packet, PState) -> +process_props(Packet, PState) -> {ok, Packet, PState}. %%-------------------------------------------------------------------- @@ -434,12 +444,15 @@ preprocess_props(Packet, PState) -> %%-------------------------------------------------------------------- check_connect(ConnPkt, PState) -> - pipeline([fun check_proto_ver/2, - fun check_client_id/2, - %%fun check_flapping/2, - fun check_banned/2, - fun check_will_topic/2, - fun check_will_retain/2], ConnPkt, PState). + case pipeline([fun check_proto_ver/2, + fun check_client_id/2, + %%fun check_flapping/2, + fun check_banned/2, + fun check_will_topic/2, + fun check_will_retain/2], ConnPkt, PState) of + ok -> {ok, PState}; + Error -> Error + end. check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}, _PState) -> @@ -450,16 +463,19 @@ check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, %% MQTT3.1 does not allow null clientId check_client_id(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3, - client_id = <<>>}, _PState) -> + client_id = <<>> + }, _PState) -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}; %% Issue#599: Null clientId and clean_start = false check_client_id(#mqtt_packet_connect{client_id = <<>>, clean_start = false}, _PState) -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}; + check_client_id(#mqtt_packet_connect{client_id = <<>>, clean_start = true}, _PState) -> ok; + check_client_id(#mqtt_packet_connect{client_id = ClientId}, #protocol{client = #{zone := Zone}}) -> Len = byte_size(ClientId), @@ -469,6 +485,7 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId}, false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} end. +%%TODO: check banned... check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username}, #protocol{client = Client = #{zone := Zone}}) -> @@ -501,25 +518,32 @@ check_will_retain(#mqtt_packet_connect{will_retain = true}, end. %%-------------------------------------------------------------------- -%% Enrich state +%% Enrich client %%-------------------------------------------------------------------- -enrich_pstate(#mqtt_packet_connect{proto_name = ProtoName, - proto_ver = ProtoVer, - keepalive = Keepalive, - client_id = ClientId, +enrich_client(#mqtt_packet_connect{client_id = ClientId, username = Username, is_bridge = IsBridge }, PState = #protocol{client = Client}) -> - Client1 = maps:merge(Client, #{client_id => ClientId, - username => Username, - is_bridge => IsBridge - }), - {ok, PState#protocol{proto_name = ProtoName, - proto_ver = ProtoVer, - client = Client1, - keepalive = Keepalive}}. + Client1 = set_username(Username, Client#{client_id => ClientId, + is_bridge => IsBridge + }), + {ok, PState#protocol{client = maybe_username_as_clientid(Client1)}}. + +%% Username maybe not undefined if peer_cert_as_username +set_username(Username, Client = #{username := undefined}) -> + Client#{username => Username}; +set_username(_Username, Client) -> Client. + +maybe_username_as_clientid(Client = #{username := undefined}) -> + Client; +maybe_username_as_clientid(Client = #{zone := Zone, + username := Username}) -> + case emqx_zone:get_env(Zone, use_username_as_clientid, false) of + true -> Client#{client_id => Username}; + false -> Client + end. %%-------------------------------------------------------------------- %% Auth Connect @@ -529,25 +553,39 @@ auth_connect(#mqtt_packet_connect{client_id = ClientId, username = Username, password = Password}, PState = #protocol{client = Client}) -> - case emqx_access_control:authenticate(Client#{password => Password}) of + case authenticate(Client#{password => Password}) of {ok, AuthResult} -> {ok, PState#protocol{client = maps:merge(Client, AuthResult)}}; {error, Reason} -> ?LOG(warning, "Client ~s (Username: '~s') login failed for ~p", [ClientId, Username, Reason]), - {error, Reason} + {error, emqx_reason_codes:connack_error(Reason)} end. %%-------------------------------------------------------------------- -%% Handle Connect +%% Assign a random clientId %%-------------------------------------------------------------------- -handle_connect(ConnPkt, PState) -> +maybe_assign_clientid(PState = #protocol{client = Client = #{client_id := <<>>}, + ack_props = AckProps}) -> + ClientId = emqx_guid:to_base62(emqx_guid:gen()), + Client1 = Client#{client_id => ClientId}, + AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps), + PState#protocol{client = Client1, ack_props = AckProps1}; +maybe_assign_clientid(PState) -> PState. + +%%-------------------------------------------------------------------- +%% Process Connect +%%-------------------------------------------------------------------- + +process_connect(ConnPkt, PState) -> case open_session(ConnPkt, PState) of {ok, Session, SP} -> WillMsg = emqx_packet:will_msg(ConnPkt), - handle_out({connack, ?RC_SUCCESS, sp(SP)}, - PState#protocol{session = Session, will_msg = WillMsg}); + NPState = PState#protocol{session = Session, + will_msg = WillMsg + }, + handle_out({connack, ?RC_SUCCESS, sp(SP)}, NPState); {error, Reason} -> %% TODO: Unknown error? ?LOG(error, "Failed to open session: ~p", [Reason]), @@ -561,24 +599,65 @@ handle_connect(ConnPkt, PState) -> open_session(#mqtt_packet_connect{clean_start = CleanStart, properties = ConnProps}, #protocol{client = Client = #{zone := Zone}}) -> - MaxInflight = maps:get('Receive-Maximum', ConnProps, - emqx_zone:get_env(Zone, max_inflight, 65535)), - Interval = maps:get('Session-Expiry-Interval', ConnProps, - emqx_zone:get_env(Zone, session_expiry_interval, 0)), + MaxInflight = get_property('Receive-Maximum', ConnProps, + emqx_zone:get_env(Zone, max_inflight, 65535)), + Interval = get_property('Session-Expiry-Interval', ConnProps, + emqx_zone:get_env(Zone, session_expiry_interval, 0)), emqx_cm:open_session(CleanStart, Client, #{max_inflight => MaxInflight, expiry_interval => Interval }). %%-------------------------------------------------------------------- -%% Handle Publish Message: Client -> Broker +%% Process publish message: Client -> Broker %%-------------------------------------------------------------------- +process_alias(Packet = #mqtt_packet{ + variable = #mqtt_packet_publish{topic_name = <<>>, + properties = #{'Topic-Alias' := AliasId} + } = Publish + }, PState = #protocol{topic_aliases = Aliases}) -> + case find_alias(AliasId, Aliases) of + {ok, Topic} -> + {ok, Packet#mqtt_packet{ + variable = Publish#mqtt_packet_publish{ + topic_name = Topic}}, PState}; + false -> {error, ?RC_TOPIC_ALIAS_INVALID} + end; + +process_alias(#mqtt_packet{ + variable = #mqtt_packet_publish{topic_name = Topic, + properties = #{'Topic-Alias' := AliasId} + } + }, PState = #protocol{topic_aliases = Aliases}) -> + {ok, PState#protocol{topic_aliases = save_alias(AliasId, Topic, Aliases)}}; + +process_alias(_Packet, PState) -> + {ok, PState}. + +find_alias(_AliasId, undefined) -> + false; +find_alias(AliasId, Aliases) -> + maps:find(AliasId, Aliases). + +save_alias(AliasId, Topic, undefined) -> + #{AliasId => Topic}; +save_alias(AliasId, Topic, Aliases) -> + maps:put(AliasId, Topic, Aliases). %% Check Publish check_publish(Packet, PState) -> - pipeline([fun check_pub_alias/2, - fun check_pub_caps/2, - fun check_pub_acl/2], Packet, PState). + pipeline([fun check_pub_acl/2, + fun check_pub_alias/2, + fun check_pub_caps/2], Packet, PState). + +%% Check Pub ACL +check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, + #protocol{client = Client}) -> + case is_acl_enabled(Client) andalso check_acl(Client, publish, Topic) of + false -> ok; + allow -> ok; + deny -> {error, ?RC_NOT_AUTHORIZED} + end. %% Check Pub Alias check_pub_alias(#mqtt_packet{ @@ -590,52 +669,41 @@ check_pub_alias(#mqtt_packet{ case (Limits == undefined) orelse (Max = maps:get(inbound, Limits, 0)) == 0 orelse (AliasId > Max) of - true -> {error, ?RC_TOPIC_ALIAS_INVALID}; - false -> ok + false -> ok; + true -> {error, ?RC_TOPIC_ALIAS_INVALID} end; check_pub_alias(_Packet, _PState) -> ok. %% Check Pub Caps check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Retain - }}, + } + }, #protocol{client = #{zone := Zone}}) -> emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}). -%% Check Pub ACL -check_pub_acl(_Packet, #protocol{enable_acl = false}) -> - ok; -check_pub_acl(_Packet, #protocol{client = #{is_superuser := true}}) -> - ok; -check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, - #protocol{client = Client}) -> - case emqx_access_control:check_acl(Client, publish, Topic) of - allow -> ok; - deny -> {error, ?RC_NOT_AUTHORIZED} - end. +%% Process Publish +process_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId), + PState = #protocol{client = Client}) -> + Msg = emqx_packet:to_message(Client, Packet), + %%TODO: Improve later. + Msg1 = emqx_message:set_flag(dup, false, Msg), + process_publish(PacketId, mount(Client, Msg1), PState). -handle_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId), - PState = #protocol{client = Client = #{mountpoint := MountPoint}}) -> - %% TODO: ugly... publish_to_msg(...) - Mount = fun(Msg) -> emqx_mountpoint:mount(MountPoint, Msg) end, - Msg1 = emqx_packet:to_message(Client, Packet), - Msg2 = Mount(emqx_message:set_flag(dup, false, Msg1)), - handle_publish(PacketId, Msg2, PState). - -handle_publish(_PacketId, Msg = #message{qos = ?QOS_0}, PState) -> +process_publish(_PacketId, Msg = #message{qos = ?QOS_0}, PState) -> _ = emqx_broker:publish(Msg), {ok, PState}; -handle_publish(PacketId, Msg = #message{qos = ?QOS_1}, PState) -> - Results = emqx_broker:publish(Msg), - ReasonCode = emqx_reason_codes:puback(Results), +process_publish(PacketId, Msg = #message{qos = ?QOS_1}, PState) -> + Deliveries = emqx_broker:publish(Msg), + ReasonCode = emqx_reason_codes:puback(Deliveries), handle_out({puback, PacketId, ReasonCode}, PState); -handle_publish(PacketId, Msg = #message{qos = ?QOS_2}, - PState = #protocol{session = Session}) -> +process_publish(PacketId, Msg = #message{qos = ?QOS_2}, + PState = #protocol{session = Session}) -> case emqx_session:publish(PacketId, Msg, Session) of - {ok, Results, NSession} -> - ReasonCode = emqx_reason_codes:puback(Results), + {ok, Deliveries, NSession} -> + ReasonCode = emqx_reason_codes:puback(Deliveries), handle_out({pubrec, PacketId, ReasonCode}, PState#protocol{session = NSession}); {error, ReasonCode} -> @@ -643,35 +711,40 @@ handle_publish(PacketId, Msg = #message{qos = ?QOS_2}, end. %%-------------------------------------------------------------------- -%% Handle Subscribe Request +%% Puback %%-------------------------------------------------------------------- -handle_subscribe(TopicFilters, PState) -> - handle_subscribe(TopicFilters, [], PState). +puback(?QOS_0, _PacketId, ReasonCode, PState) -> + handle_out({puberr, ReasonCode}, PState); +puback(?QOS_1, PacketId, ReasonCode, PState) -> + handle_out({puback, PacketId, ReasonCode}, PState); +puback(?QOS_2, PacketId, ReasonCode, PState) -> + handle_out({pubrec, PacketId, ReasonCode}, PState). -handle_subscribe([], Acc, PState) -> +%%-------------------------------------------------------------------- +%% Process subscribe request +%%-------------------------------------------------------------------- + +process_subscribe(TopicFilters, PState) -> + process_subscribe(TopicFilters, [], PState). + +process_subscribe([], Acc, PState) -> {lists:reverse(Acc), PState}; -handle_subscribe([{TopicFilter, SubOpts}|More], Acc, PState) -> - {RC, PState1} = do_subscribe(TopicFilter, SubOpts, PState), - handle_subscribe(More, [RC|Acc], PState1). +process_subscribe([{TopicFilter, SubOpts}|More], Acc, PState) -> + {RC, NPState} = do_subscribe(TopicFilter, SubOpts, PState), + process_subscribe(More, [RC|Acc], NPState). do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, - PState = #protocol{client = Client = #{mountpoint := Mountpoint}, - session = Session}) -> - Mount = fun(Msg) -> emqx_mountpoint:mount(Mountpoint, Msg) end, - %% 1. Parse 2. Check 3. Enrich 5. MountPoint 6. Session - SubOpts1 = maps:merge(?DEFAULT_SUBOPTS, SubOpts), - {TopicFilter1, SubOpts2} = emqx_topic:parse(TopicFilter, SubOpts1), - SubOpts3 = enrich_subopts(SubOpts2, PState), - case check_subscribe(TopicFilter1, PState) of - {ok, _, _} -> %% TODO:... - TopicFilter2 = Mount(TopicFilter1), - case emqx_session:subscribe(Client, TopicFilter2, SubOpts3, Session) of - {ok, NSession} -> - {QoS, PState#protocol{session = NSession}}; - {error, RC} -> {RC, PState} - end; + PState = #protocol{client = Client, session = Session}) -> + case check_subscribe(TopicFilter, PState) of + ok -> TopicFilter1 = mount(Client, TopicFilter), + SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), PState), + case emqx_session:subscribe(Client, TopicFilter1, SubOpts1, Session) of + {ok, NSession} -> + {QoS, PState#protocol{session = NSession}}; + {error, RC} -> {RC, PState} + end; {error, RC} -> {RC, PState} end. @@ -682,55 +755,59 @@ enrich_subid(_Properties, TopicFilters) -> enrich_subopts(SubOpts, #protocol{proto_ver = ?MQTT_PROTO_V5}) -> SubOpts; -enrich_subopts(SubOpts, #protocol{client = #{zone := Zone, - is_bridge := IsBridge}}) -> +enrich_subopts(SubOpts, #protocol{client = #{zone := Zone, is_bridge := IsBridge}}) -> Rap = flag(IsBridge), Nl = flag(emqx_zone:get_env(Zone, ignore_loop_deliver, false)), SubOpts#{rap => Rap, nl => Nl}. +%% Check Sub check_subscribe(TopicFilter, PState) -> - pipeline([%%TODO: fun check_sub_caps/2, - fun check_sub_acl/2], TopicFilter, PState). + case check_sub_acl(TopicFilter, PState) of + allow -> ok; %%TODO: check_sub_caps(TopicFilter, PState); + deny -> {error, ?RC_NOT_AUTHORIZED} + end. + +%% Check Sub ACL +check_sub_acl(TopicFilter, #protocol{client = Client}) -> + case is_acl_enabled(Client) andalso + check_acl(Client, subscribe, TopicFilter) of + false -> allow; + Result -> Result + end. %% Check Sub Caps check_sub_caps(TopicFilter, #protocol{client = #{zone := Zone}}) -> emqx_mqtt_caps:check_sub(Zone, TopicFilter). -%% Check Sub ACL -check_sub_acl(_TopicFilter, #protocol{enable_acl = false}) -> - ok; -check_sub_acl(_TopicFilter, #protocol{client = #{is_superuser := true}}) -> - ok; -check_sub_acl(TopicFilter, #protocol{client = Client}) -> - case emqx_access_control:check_acl(Client, subscribe, TopicFilter) of - allow -> ok; - deny -> {error, ?RC_NOT_AUTHORIZED} - end. - %%-------------------------------------------------------------------- -%% Handle Unsubscribe Request +%% Process unsubscribe request %%-------------------------------------------------------------------- -handle_unsubscribe(TopicFilters, PState) -> - handle_unsubscribe(TopicFilters, [], PState). +process_unsubscribe(TopicFilters, PState) -> + process_unsubscribe(TopicFilters, [], PState). -handle_unsubscribe([], Acc, PState) -> +process_unsubscribe([], Acc, PState) -> {lists:reverse(Acc), PState}; -handle_unsubscribe([TopicFilter|More], Acc, PState) -> - {RC, PState1} = do_unsubscribe(TopicFilter, PState), - handle_unsubscribe(More, [RC|Acc], PState1). +process_unsubscribe([{TopicFilter, SubOpts}|More], Acc, PState) -> + {RC, PState1} = do_unsubscribe(TopicFilter, SubOpts, PState), + process_unsubscribe(More, [RC|Acc], PState1). -do_unsubscribe(TopicFilter, PState = #protocol{client = Client = #{mountpoint := Mountpoint}, - session = Session}) -> - Mount = fun(Topic) -> emqx_mountpoint:mount(Mountpoint, Topic) end, - TopicFilter1 = Mount(element(1, emqx_topic:parse(TopicFilter))), - case emqx_session:unsubscribe(Client, TopicFilter1, Session) of +do_unsubscribe(TopicFilter, _SubOpts, PState = #protocol{client = Client, + session = Session}) -> + case emqx_session:unsubscribe(Client, mount(Client, TopicFilter), Session) of {ok, NSession} -> {?RC_SUCCESS, PState#protocol{session = NSession}}; {error, RC} -> {RC, PState} end. +%%-------------------------------------------------------------------- +%% Is ACL enabled? +%%-------------------------------------------------------------------- + +is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) -> + (not IsSuperuser) andalso emqx_zone:get_env(Zone, enable_acl, true). + %%-------------------------------------------------------------------- %% Pipeline %%-------------------------------------------------------------------- @@ -745,16 +822,36 @@ pipeline([Fun|More], Packet, PState) -> pipeline(More, Packet, NPState); {ok, NPacket, NPState} -> pipeline(More, NPacket, NPState); - {error, Reason} -> - {error, Reason, PState}; - {error, Reason, NPState} -> - {error, Reason, NPState} + {error, ReasonCode} -> + {error, ReasonCode, PState}; + {error, ReasonCode, NPState} -> + {error, ReasonCode, NPState} end. +%%-------------------------------------------------------------------- +%% Mount/Unmount +%%-------------------------------------------------------------------- + +mount(#{mountpoint := MountPoint}, TopicOrMsg) -> + emqx_mountpoint:mount(MountPoint, TopicOrMsg). + +unmount(#{mountpoint := MountPoint}, TopicOrMsg) -> + emqx_mountpoint:unmount(MountPoint, TopicOrMsg). + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- +set_property(Name, Value, ?NO_PROPS) -> + #{Name => Value}; +set_property(Name, Value, Props) -> + Props#{Name => Value}. + +get_property(_Name, undefined, Default) -> + Default; +get_property(Name, Props, Default) -> + maps:get(Name, Props, Default). + sp(true) -> 1; sp(false) -> 0. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 8d529a921..81b24087b 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -136,6 +136,8 @@ -opaque(session() :: #session{}). +-type(publish() :: {publish, emqx_types:packet_id(), emqx_types:message()}). + -define(DEFAULT_BATCH_N, 1000). %%-------------------------------------------------------------------- @@ -334,7 +336,8 @@ do_publish(PacketId, Msg = #message{timestamp = Ts}, %%-------------------------------------------------------------------- -spec(puback(emqx_types:packet_id(), emqx_types:reason_code(), session()) - -> {ok, session()} | {error, emqx_types:reason_code()}). + -> {ok, session()} | {ok, list(publish()), session()} | + {error, emqx_types:reason_code()}). puback(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> @@ -388,7 +391,8 @@ pubrel(PacketId, _ReasonCode, Session = #session{awaiting_rel = AwaitingRel}) -> %%-------------------------------------------------------------------- -spec(pubcomp(emqx_types:packet_id(), emqx_types:reason_code(), session()) - -> {ok, session()} | {error, emqx_types:reason_code()}). + -> {ok, session()} | {ok, list(publish()), session()} | + {error, emqx_types:reason_code()}). pubcomp(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) -> case emqx_inflight:contain(PacketId, Inflight) of true ->