From e6bed24bb32a6d30057c132d998866d012730a00 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 31 Aug 2018 00:32:56 +0800 Subject: [PATCH 1/2] Add server_keepalive config --- etc/emqx.conf | 5 +++++ priv/emqx.schema | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/etc/emqx.conf b/etc/emqx.conf index 5975ae542..e8435a3b3 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -580,6 +580,11 @@ zone.external.enable_stats = on ## Value: boolean ## zone.external.shared_subscription = false +## Server Keep Alive +## +## Value: Number +## zone.external.server_keepalive = 0 + ## The backoff for MQTT keepalive timeout. The broker will kick a connection out ## until 'Keepalive * backoff * 2' timeout. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index 07ff5ce6f..1e1892b33 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -735,6 +735,11 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc Server Keepalive +{mapping, "zone.$name.server_keepalive", "emqx.zones", [ + {datatype, integer} +]}. + %% @doc Keepalive backoff {mapping, "zone.$name.keepalive_backoff", "emqx.zones", [ {default, 0.75}, From b6006b5947b784fa5cdfdd558f03fd8938bd6708 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 31 Aug 2018 00:40:10 +0800 Subject: [PATCH 2/2] Support CONNACK properties --- src/emqx_protocol.erl | 65 +++++++++++++++++++++++++++++++++---------- 1 file changed, 50 insertions(+), 15 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 5ef4a9b0d..4dc2c8e75 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -41,6 +41,7 @@ proto_name, ackprops, client_id, + is_assigned, conn_pid, conn_props, ack_props, @@ -87,6 +88,7 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) proto_ver = ?MQTT_PROTO_V4, proto_name = <<"MQTT">>, client_id = <<>>, + is_assigned = false, conn_pid = self(), username = init_username(Peercert, Options), is_super = false, @@ -265,17 +267,16 @@ process_packet(?CONNECT_PACKET( %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) WillMsg = emqx_packet:will_msg(Connect), - PState1 = set_username(Username, - PState#pstate{client_id = ClientId, - proto_ver = ProtoVer, - proto_name = ProtoName, - clean_start = CleanStart, - keepalive = Keepalive, - conn_props = ConnProps, - will_topic = WillTopic, - will_msg = WillMsg, - is_bridge = IsBridge, - connected_at = os:timestamp()}), + PState1 = set_username(Username, PState#pstate{client_id = ClientId, + proto_ver = ProtoVer, + proto_name = ProtoName, + clean_start = CleanStart, + keepalive = Keepalive, + conn_props = ConnProps, + will_topic = WillTopic, + will_msg = WillMsg, + is_bridge = IsBridge, + connected_at = os:timestamp()}), connack( case check_connect(Connect, PState1) of @@ -450,6 +451,33 @@ puback(?QOS_2, PacketId, {ok, _}, PState) -> deliver({connack, ReasonCode}, PState) -> send(?CONNACK_PACKET(ReasonCode), PState); +deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, + proto_ver = ?MQTT_PROTO_V5, + client_id = ClientId, + is_assigned = IsAssigned}) -> + #{max_packet_size := MaxPktSize, + max_qos_allowed := MaxQoS, + mqtt_retain_available := Retain, + max_topic_alias := MaxAlias, + mqtt_shared_subscription := Shared, + mqtt_wildcard_subscription := Wildcard} = caps(PState), + Props = #{'Maximum-QoS' => MaxQoS, + 'Retain-Available' => flag(Retain), + 'Maximum-Packet-Size' => MaxPktSize, + 'Topic-Alias-Maximum' => MaxAlias, + 'Wildcard-Subscription-Available' => Wildcard, + 'Subscription-Identifiers-Available' => 1, + 'Shared-Subscription-Available' => flag(Shared)}, + Props1 = if IsAssigned -> + Props#{'Assigned-Client-Identifier' => ClientId}; + true -> Props + end, + Props2 = case emqx_zone:get_env(Zone, server_keepalive) of + undefined -> Props1; + Keepalive -> Props1#{'Server-Keep-Alive' => Keepalive} + end, + send(?CONNACK_PACKET(?RC_SUCCESS, SP, Props2), PState); + deliver({connack, ReasonCode, SP}, PState) -> send(?CONNACK_PACKET(ReasonCode, SP), PState); @@ -509,7 +537,7 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun maybe_assign_client_id(PState = #pstate{client_id = <<>>, ackprops = AckProps}) -> ClientId = emqx_guid:to_base62(emqx_guid:gen()), AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps), - PState#pstate{client_id = ClientId, ackprops = AckProps1}; + PState#pstate{client_id = ClientId, is_assigned = true, ackprops = AckProps1}; maybe_assign_client_id(PState) -> PState. @@ -532,9 +560,13 @@ try_open_session(#pstate{zone = Zone, authenticate(Credentials, Password) -> case emqx_access_control:authenticate(Credentials, Password) of - ok -> {ok, false}; - {ok, IsSuper} -> {ok, IsSuper}; - {error, Error} -> {error, Error} + ok -> {ok, false}; + {ok, IsSuper} when is_boolean(IsSuper) -> + {ok, IsSuper}; + {ok, Result} when is_map(Result) -> + {ok, maps:get(is_superuser, Result, false)}; + {error, Error} -> + {error, Error} end. set_property(Name, Value, undefined) -> @@ -705,3 +737,6 @@ update_mountpoint(PState = #pstate{mountpoint = MountPoint}) -> sp(true) -> 1; sp(false) -> 0. +flag(false) -> 0; +flag(true) -> 1. +