diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 5ef4a9b0d..4dc2c8e75 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -41,6 +41,7 @@ proto_name, ackprops, client_id, + is_assigned, conn_pid, conn_props, ack_props, @@ -87,6 +88,7 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) proto_ver = ?MQTT_PROTO_V4, proto_name = <<"MQTT">>, client_id = <<>>, + is_assigned = false, conn_pid = self(), username = init_username(Peercert, Options), is_super = false, @@ -265,17 +267,16 @@ process_packet(?CONNECT_PACKET( %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) WillMsg = emqx_packet:will_msg(Connect), - PState1 = set_username(Username, - PState#pstate{client_id = ClientId, - proto_ver = ProtoVer, - proto_name = ProtoName, - clean_start = CleanStart, - keepalive = Keepalive, - conn_props = ConnProps, - will_topic = WillTopic, - will_msg = WillMsg, - is_bridge = IsBridge, - connected_at = os:timestamp()}), + PState1 = set_username(Username, PState#pstate{client_id = ClientId, + proto_ver = ProtoVer, + proto_name = ProtoName, + clean_start = CleanStart, + keepalive = Keepalive, + conn_props = ConnProps, + will_topic = WillTopic, + will_msg = WillMsg, + is_bridge = IsBridge, + connected_at = os:timestamp()}), connack( case check_connect(Connect, PState1) of @@ -450,6 +451,33 @@ puback(?QOS_2, PacketId, {ok, _}, PState) -> deliver({connack, ReasonCode}, PState) -> send(?CONNACK_PACKET(ReasonCode), PState); +deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, + proto_ver = ?MQTT_PROTO_V5, + client_id = ClientId, + is_assigned = IsAssigned}) -> + #{max_packet_size := MaxPktSize, + max_qos_allowed := MaxQoS, + mqtt_retain_available := Retain, + max_topic_alias := MaxAlias, + mqtt_shared_subscription := Shared, + mqtt_wildcard_subscription := Wildcard} = caps(PState), + Props = #{'Maximum-QoS' => MaxQoS, + 'Retain-Available' => flag(Retain), + 'Maximum-Packet-Size' => MaxPktSize, + 'Topic-Alias-Maximum' => MaxAlias, + 'Wildcard-Subscription-Available' => Wildcard, + 'Subscription-Identifiers-Available' => 1, + 'Shared-Subscription-Available' => flag(Shared)}, + Props1 = if IsAssigned -> + Props#{'Assigned-Client-Identifier' => ClientId}; + true -> Props + end, + Props2 = case emqx_zone:get_env(Zone, server_keepalive) of + undefined -> Props1; + Keepalive -> Props1#{'Server-Keep-Alive' => Keepalive} + end, + send(?CONNACK_PACKET(?RC_SUCCESS, SP, Props2), PState); + deliver({connack, ReasonCode, SP}, PState) -> send(?CONNACK_PACKET(ReasonCode, SP), PState); @@ -509,7 +537,7 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun maybe_assign_client_id(PState = #pstate{client_id = <<>>, ackprops = AckProps}) -> ClientId = emqx_guid:to_base62(emqx_guid:gen()), AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps), - PState#pstate{client_id = ClientId, ackprops = AckProps1}; + PState#pstate{client_id = ClientId, is_assigned = true, ackprops = AckProps1}; maybe_assign_client_id(PState) -> PState. @@ -532,9 +560,13 @@ try_open_session(#pstate{zone = Zone, authenticate(Credentials, Password) -> case emqx_access_control:authenticate(Credentials, Password) of - ok -> {ok, false}; - {ok, IsSuper} -> {ok, IsSuper}; - {error, Error} -> {error, Error} + ok -> {ok, false}; + {ok, IsSuper} when is_boolean(IsSuper) -> + {ok, IsSuper}; + {ok, Result} when is_map(Result) -> + {ok, maps:get(is_superuser, Result, false)}; + {error, Error} -> + {error, Error} end. set_property(Name, Value, undefined) -> @@ -705,3 +737,6 @@ update_mountpoint(PState = #pstate{mountpoint = MountPoint}) -> sp(true) -> 1; sp(false) -> 0. +flag(false) -> 0; +flag(true) -> 1. +