Merge pull request #1775 from emqtt/emqx30-feng

Support server_keepalive and CONNACK properties
This commit is contained in:
Feng Lee 2018-08-31 00:45:51 +08:00 committed by GitHub
commit 1574d85570
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 61 additions and 15 deletions

View File

@ -580,6 +580,11 @@ zone.external.enable_stats = on
## Value: boolean ## Value: boolean
## zone.external.shared_subscription = false ## zone.external.shared_subscription = false
## Server Keep Alive
##
## Value: Number
## zone.external.server_keepalive = 0
## The backoff for MQTT keepalive timeout. The broker will kick a connection out ## The backoff for MQTT keepalive timeout. The broker will kick a connection out
## until 'Keepalive * backoff * 2' timeout. ## until 'Keepalive * backoff * 2' timeout.
## ##

View File

@ -735,6 +735,11 @@ end}.
{datatype, {enum, [true, false]}} {datatype, {enum, [true, false]}}
]}. ]}.
%% @doc Server Keepalive
{mapping, "zone.$name.server_keepalive", "emqx.zones", [
{datatype, integer}
]}.
%% @doc Keepalive backoff %% @doc Keepalive backoff
{mapping, "zone.$name.keepalive_backoff", "emqx.zones", [ {mapping, "zone.$name.keepalive_backoff", "emqx.zones", [
{default, 0.75}, {default, 0.75},

View File

@ -41,6 +41,7 @@
proto_name, proto_name,
ackprops, ackprops,
client_id, client_id,
is_assigned,
conn_pid, conn_pid,
conn_props, conn_props,
ack_props, ack_props,
@ -87,6 +88,7 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options)
proto_ver = ?MQTT_PROTO_V4, proto_ver = ?MQTT_PROTO_V4,
proto_name = <<"MQTT">>, proto_name = <<"MQTT">>,
client_id = <<>>, client_id = <<>>,
is_assigned = false,
conn_pid = self(), conn_pid = self(),
username = init_username(Peercert, Options), username = init_username(Peercert, Options),
is_super = false, is_super = false,
@ -264,18 +266,17 @@ process_packet(?CONNECT_PACKET(
%% TODO: Mountpoint... %% TODO: Mountpoint...
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg) %% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
WillMsg = emqx_packet:will_msg(Connect), WillMsg = emqx_packet:will_msg(Connect),
PState1 = set_username(Username,
PState#pstate{client_id = ClientId,
proto_ver = ProtoVer,
proto_name = ProtoName,
clean_start = CleanStart,
keepalive = Keepalive,
conn_props = ConnProps,
will_topic = WillTopic,
will_msg = WillMsg,
is_bridge = IsBridge,
connected_at = os:timestamp()}),
PState1 = set_username(Username, PState#pstate{client_id = ClientId,
proto_ver = ProtoVer,
proto_name = ProtoName,
clean_start = CleanStart,
keepalive = Keepalive,
conn_props = ConnProps,
will_topic = WillTopic,
will_msg = WillMsg,
is_bridge = IsBridge,
connected_at = os:timestamp()}),
connack( connack(
case check_connect(Connect, PState1) of case check_connect(Connect, PState1) of
{ok, PState2} -> {ok, PState2} ->
@ -449,6 +450,33 @@ puback(?QOS_2, PacketId, {ok, _}, PState) ->
deliver({connack, ReasonCode}, PState) -> deliver({connack, ReasonCode}, PState) ->
send(?CONNACK_PACKET(ReasonCode), PState); send(?CONNACK_PACKET(ReasonCode), PState);
deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
proto_ver = ?MQTT_PROTO_V5,
client_id = ClientId,
is_assigned = IsAssigned}) ->
#{max_packet_size := MaxPktSize,
max_qos_allowed := MaxQoS,
mqtt_retain_available := Retain,
max_topic_alias := MaxAlias,
mqtt_shared_subscription := Shared,
mqtt_wildcard_subscription := Wildcard} = caps(PState),
Props = #{'Maximum-QoS' => MaxQoS,
'Retain-Available' => flag(Retain),
'Maximum-Packet-Size' => MaxPktSize,
'Topic-Alias-Maximum' => MaxAlias,
'Wildcard-Subscription-Available' => Wildcard,
'Subscription-Identifiers-Available' => 1,
'Shared-Subscription-Available' => flag(Shared)},
Props1 = if IsAssigned ->
Props#{'Assigned-Client-Identifier' => ClientId};
true -> Props
end,
Props2 = case emqx_zone:get_env(Zone, server_keepalive) of
undefined -> Props1;
Keepalive -> Props1#{'Server-Keep-Alive' => Keepalive}
end,
send(?CONNACK_PACKET(?RC_SUCCESS, SP, Props2), PState);
deliver({connack, ReasonCode, SP}, PState) -> deliver({connack, ReasonCode, SP}, PState) ->
send(?CONNACK_PACKET(ReasonCode, SP), PState); send(?CONNACK_PACKET(ReasonCode, SP), PState);
@ -508,7 +536,7 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun
maybe_assign_client_id(PState = #pstate{client_id = <<>>, ackprops = AckProps}) -> maybe_assign_client_id(PState = #pstate{client_id = <<>>, ackprops = AckProps}) ->
ClientId = emqx_guid:to_base62(emqx_guid:gen()), ClientId = emqx_guid:to_base62(emqx_guid:gen()),
AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps), AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps),
PState#pstate{client_id = ClientId, ackprops = AckProps1}; PState#pstate{client_id = ClientId, is_assigned = true, ackprops = AckProps1};
maybe_assign_client_id(PState) -> maybe_assign_client_id(PState) ->
PState. PState.
@ -531,9 +559,13 @@ try_open_session(#pstate{zone = Zone,
authenticate(Credentials, Password) -> authenticate(Credentials, Password) ->
case emqx_access_control:authenticate(Credentials, Password) of case emqx_access_control:authenticate(Credentials, Password) of
ok -> {ok, false}; ok -> {ok, false};
{ok, IsSuper} -> {ok, IsSuper}; {ok, IsSuper} when is_boolean(IsSuper) ->
{error, Error} -> {error, Error} {ok, IsSuper};
{ok, Result} when is_map(Result) ->
{ok, maps:get(is_superuser, Result, false)};
{error, Error} ->
{error, Error}
end. end.
set_property(Name, Value, undefined) -> set_property(Name, Value, undefined) ->
@ -709,3 +741,7 @@ update_mountpoint(PState = #pstate{mountpoint = MountPoint}) ->
sp(true) -> 1; sp(true) -> 1;
sp(false) -> 0. sp(false) -> 0.
flag(false) -> 0;
flag(true) -> 1.