Merge pull request #12947 from savonarola/0426-auth-timeout

feat(authn): add connection expire based on authn data
This commit is contained in:
Ilia Averianov 2024-05-03 11:32:18 +03:00 committed by GitHub
commit 3ed385201c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 642 additions and 79 deletions

View File

@ -1075,7 +1075,7 @@ handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel = ?IS_MQTT_V5) -
Packet = ?DISCONNECT_PACKET(ReasonCode, Props),
{ok, [?REPLY_OUTGOING(Packet), ?REPLY_CLOSE(ReasonName)], Channel};
handle_out(disconnect, {_ReasonCode, ReasonName, _Props}, Channel) ->
{ok, {close, ReasonName}, Channel};
{ok, ?REPLY_CLOSE(ReasonName), Channel};
handle_out(auth, {ReasonCode, Properties}, Channel) ->
{ok, ?AUTH_PACKET(ReasonCode, Properties), Channel};
handle_out(Type, Data, Channel) ->
@ -1406,6 +1406,16 @@ handle_timeout(
{_, Quota2} ->
{ok, clean_timer(TimerName, Channel#channel{quota = Quota2})}
end;
handle_timeout(
_TRef,
connection_expire,
#channel{conn_state = ConnState} = Channel0
) ->
Channel1 = clean_timer(connection_expire, Channel0),
case ConnState of
disconnected -> {ok, Channel1};
_ -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel1)
end;
handle_timeout(TRef, Msg, Channel) ->
case emqx_hooks:run_fold('client.timeout', [TRef, Msg], []) of
[] ->
@ -1810,18 +1820,23 @@ log_auth_failure(Reason) ->
%% Merge authentication result into ClientInfo
%% Authentication result may include:
%% 1. `is_superuser': The superuser flag from various backends
%% 2. `acl': ACL rules from JWT, HTTP auth backend
%% 3. `client_attrs': Extra client attributes from JWT, HTTP auth backend
%% 4. Maybe more non-standard fields used by hook callbacks
%% 2. `expire_at`: Authentication validity deadline, the client will be disconnected after this time
%% 3. `acl': ACL rules from JWT, HTTP auth backend
%% 4. `client_attrs': Extra client attributes from JWT, HTTP auth backend
%% 5. Maybe more non-standard fields used by hook callbacks
merge_auth_result(ClientInfo, AuthResult0) when is_map(ClientInfo) andalso is_map(AuthResult0) ->
IsSuperuser = maps:get(is_superuser, AuthResult0, false),
AuthResult = maps:without([client_attrs], AuthResult0),
ExpireAt = maps:get(expire_at, AuthResult0, undefined),
AuthResult = maps:without([client_attrs, expire_at], AuthResult0),
Attrs0 = maps:get(client_attrs, ClientInfo, #{}),
Attrs1 = maps:get(client_attrs, AuthResult0, #{}),
Attrs = maps:merge(Attrs0, Attrs1),
NewClientInfo = maps:merge(
ClientInfo#{client_attrs => Attrs},
AuthResult#{is_superuser => IsSuperuser}
AuthResult#{
is_superuser => IsSuperuser,
auth_expire_at => ExpireAt
}
),
fix_mountpoint(NewClientInfo).
@ -2228,10 +2243,16 @@ ensure_connected(
) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
Channel#channel{
schedule_connection_expire(Channel#channel{
conninfo = trim_conninfo(NConnInfo),
conn_state = connected
}.
}).
schedule_connection_expire(Channel = #channel{clientinfo = #{auth_expire_at := undefined}}) ->
Channel;
schedule_connection_expire(Channel = #channel{clientinfo = #{auth_expire_at := ExpireAt}}) ->
Interval = max(0, ExpireAt - erlang:system_time(millisecond)),
ensure_timer(connection_expire, Interval, Channel).
trim_conninfo(ConnInfo) ->
maps:without(

View File

@ -1061,6 +1061,7 @@ clientinfo(InitProps) ->
clientid => <<"clientid">>,
username => <<"username">>,
is_superuser => false,
auth_expire_at => undefined,
is_bridge => false,
mountpoint => undefined
},

View File

@ -0,0 +1,54 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connection_expire_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
%%--------------------------------------------------------------------
%% CT callbacks
%%--------------------------------------------------------------------
init_per_suite(Config) ->
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
[{apps, Apps} | Config].
end_per_suite(Config) ->
emqx_cth_suite:stop(proplists:get_value(apps, Config)).
t_disonnect_by_auth_info(_) ->
_ = process_flag(trap_exit, true),
_ = meck:new(emqx_access_control, [passthrough, no_history]),
_ = meck:expect(emqx_access_control, authenticate, fun(_) ->
{ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}}
end),
{ok, C} = emqtt:start_link([{proto_ver, v5}]),
{ok, _} = emqtt:connect(C),
receive
{disconnected, ?RC_NOT_AUTHORIZED, #{}} -> ok
after 5000 ->
ct:fail("Client should be disconnected by timeout")
end.

View File

@ -142,6 +142,8 @@ end).
-type state() :: #{atom() => term()}.
-type extra() :: #{
is_superuser := boolean(),
%% millisecond timestamp
expire_at => pos_integer(),
atom() => term()
}.
-type user_info() :: #{

View File

@ -76,6 +76,7 @@ authenticate(
Credential,
#{
verify_claims := VerifyClaims0,
disconnect_after_expire := DisconnectAfterExpire,
jwk := JWK,
acl_claim_name := AclClaimName,
from := From
@ -84,11 +85,12 @@ authenticate(
JWT = maps:get(From, Credential),
JWKs = [JWK],
VerifyClaims = replace_placeholder(VerifyClaims0, Credential),
verify(JWT, JWKs, VerifyClaims, AclClaimName);
verify(JWT, JWKs, VerifyClaims, AclClaimName, DisconnectAfterExpire);
authenticate(
Credential,
#{
verify_claims := VerifyClaims0,
disconnect_after_expire := DisconnectAfterExpire,
jwk_resource := ResourceId,
acl_claim_name := AclClaimName,
from := From
@ -104,7 +106,7 @@ authenticate(
{ok, JWKs} ->
JWT = maps:get(From, Credential),
VerifyClaims = replace_placeholder(VerifyClaims0, Credential),
verify(JWT, JWKs, VerifyClaims, AclClaimName)
verify(JWT, JWKs, VerifyClaims, AclClaimName, DisconnectAfterExpire)
end.
destroy(#{jwk_resource := ResourceId}) ->
@ -123,6 +125,7 @@ create2(#{
secret := Secret0,
secret_base64_encoded := Base64Encoded,
verify_claims := VerifyClaims,
disconnect_after_expire := DisconnectAfterExpire,
acl_claim_name := AclClaimName,
from := From
}) ->
@ -134,6 +137,7 @@ create2(#{
{ok, #{
jwk => JWK,
verify_claims => VerifyClaims,
disconnect_after_expire => DisconnectAfterExpire,
acl_claim_name => AclClaimName,
from => From
}}
@ -143,6 +147,7 @@ create2(#{
algorithm := 'public-key',
public_key := PublicKey,
verify_claims := VerifyClaims,
disconnect_after_expire := DisconnectAfterExpire,
acl_claim_name := AclClaimName,
from := From
}) ->
@ -150,6 +155,7 @@ create2(#{
{ok, #{
jwk => JWK,
verify_claims => VerifyClaims,
disconnect_after_expire => DisconnectAfterExpire,
acl_claim_name => AclClaimName,
from => From
}};
@ -157,6 +163,7 @@ create2(
#{
use_jwks := true,
verify_claims := VerifyClaims,
disconnect_after_expire := DisconnectAfterExpire,
acl_claim_name := AclClaimName,
from := From
} = Config
@ -171,6 +178,7 @@ create2(
{ok, #{
jwk_resource => ResourceId,
verify_claims => VerifyClaims,
disconnect_after_expire => DisconnectAfterExpire,
acl_claim_name => AclClaimName,
from => From
}}.
@ -214,23 +222,12 @@ replace_placeholder([{Name, {placeholder, PL}} | More], Variables, Acc) ->
replace_placeholder([{Name, Value} | More], Variables, Acc) ->
replace_placeholder(More, Variables, [{Name, Value} | Acc]).
verify(undefined, _, _, _) ->
verify(undefined, _, _, _, _) ->
ignore;
verify(JWT, JWKs, VerifyClaims, AclClaimName) ->
verify(JWT, JWKs, VerifyClaims, AclClaimName, DisconnectAfterExpire) ->
case do_verify(JWT, JWKs, VerifyClaims) of
{ok, Extra} ->
IsSuperuser = emqx_authn_utils:is_superuser(Extra),
Attrs = emqx_authn_utils:client_attrs(Extra),
try
ACL = acl(Extra, AclClaimName),
Result = maps:merge(IsSuperuser, maps:merge(ACL, Attrs)),
{ok, Result}
catch
throw:{bad_acl_rule, Reason} ->
%% it's a invalid token, so ok to log
?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{jwt => JWT}),
{error, bad_username_or_password}
end;
extra_to_auth_data(Extra, JWT, AclClaimName, DisconnectAfterExpire);
{error, {missing_claim, Claim}} ->
%% it's a invalid token, so it's ok to log
?TRACE_AUTHN_PROVIDER("missing_jwt_claim", #{jwt => JWT, claim => Claim}),
@ -245,6 +242,28 @@ verify(JWT, JWKs, VerifyClaims, AclClaimName) ->
{error, bad_username_or_password}
end.
extra_to_auth_data(Extra, JWT, AclClaimName, DisconnectAfterExpire) ->
IsSuperuser = emqx_authn_utils:is_superuser(Extra),
Attrs = emqx_authn_utils:client_attrs(Extra),
ExpireAt = expire_at(DisconnectAfterExpire, Extra),
try
ACL = acl(Extra, AclClaimName),
Result = merge_maps([ExpireAt, IsSuperuser, ACL, Attrs]),
{ok, Result}
catch
throw:{bad_acl_rule, Reason} ->
%% it's a invalid token, so ok to log
?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{jwt => JWT}),
{error, bad_username_or_password}
end.
expire_at(false, _Extra) ->
#{};
expire_at(true, #{<<"exp">> := ExpireTime}) ->
#{expire_at => erlang:convert_time_unit(ExpireTime, second, millisecond)};
expire_at(true, #{}) ->
#{}.
acl(Claims, AclClaimName) ->
case Claims of
#{AclClaimName := Rules} ->
@ -397,3 +416,6 @@ parse_rule(Rule) ->
{error, Reason} ->
throw({bad_acl_rule, Reason})
end.
merge_maps([]) -> #{};
merge_maps([Map | Maps]) -> maps:merge(Map, merge_maps(Maps)).

View File

@ -122,6 +122,7 @@ common_fields() ->
desc => ?DESC(acl_claim_name)
}},
{verify_claims, fun verify_claims/1},
{disconnect_after_expire, fun disconnect_after_expire/1},
{from, fun from/1}
] ++ emqx_authn_schema:common_fields().
@ -172,6 +173,11 @@ verify_claims(required) ->
verify_claims(_) ->
undefined.
disconnect_after_expire(type) -> boolean();
disconnect_after_expire(desc) -> ?DESC(?FUNCTION_NAME);
disconnect_after_expire(default) -> true;
disconnect_after_expire(_) -> undefined.
do_check_verify_claims([]) ->
true;
do_check_verify_claims([{Name, Expected} | More]) ->

View File

@ -55,7 +55,8 @@ t_hmac_based(_) ->
algorithm => 'hmac-based',
secret => Secret,
secret_base64_encoded => false,
verify_claims => [{<<"username">>, <<"${username}">>}]
verify_claims => [{<<"username">>, <<"${username}">>}],
disconnect_after_expire => false
},
{ok, State} = emqx_authn_jwt:create(?AUTHN_ID, Config),
@ -179,7 +180,8 @@ t_public_key(_) ->
use_jwks => false,
algorithm => 'public-key',
public_key => PublicKey,
verify_claims => []
verify_claims => [],
disconnect_after_expire => false
},
{ok, State} = emqx_authn_jwt:create(?AUTHN_ID, Config),
@ -207,7 +209,8 @@ t_jwt_in_username(_) ->
algorithm => 'hmac-based',
secret => Secret,
secret_base64_encoded => false,
verify_claims => []
verify_claims => [],
disconnect_after_expire => false
},
{ok, State} = emqx_authn_jwt:create(?AUTHN_ID, Config),
@ -238,7 +241,7 @@ t_jwks_renewal(_Config) ->
algorithm => 'public-key',
ssl => #{enable => false},
verify_claims => [],
disconnect_after_expire => false,
use_jwks => true,
endpoint => "https://127.0.0.1:" ++ integer_to_list(?JWKS_PORT + 1) ++ ?JWKS_PATH,
refresh_interval => 1000,
@ -335,7 +338,8 @@ t_verify_claims(_) ->
algorithm => 'hmac-based',
secret => Secret,
secret_base64_encoded => false,
verify_claims => [{<<"foo">>, <<"bar">>}]
verify_claims => [{<<"foo">>, <<"bar">>}],
disconnect_after_expire => false
},
{ok, State0} = emqx_authn_jwt:create(?AUTHN_ID, Config0),

View File

@ -0,0 +1,96 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_authn_jwt_expire_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx_auth/include/emqx_authn.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(PATH, [authentication]).
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_testcase(_, Config) ->
_ = emqx_authn_test_lib:delete_authenticators(?PATH, ?GLOBAL),
Config.
end_per_testcase(_, _Config) ->
_ = emqx_authn_test_lib:delete_authenticators(?PATH, ?GLOBAL),
ok.
init_per_suite(Config) ->
Apps = emqx_cth_suite:start([emqx, emqx_conf, emqx_auth, emqx_auth_jwt], #{
work_dir => ?config(priv_dir, Config)
}),
[{apps, Apps} | Config].
end_per_suite(Config) ->
emqx_authn_test_lib:delete_authenticators(?PATH, ?GLOBAL),
ok = emqx_cth_suite:stop(?config(apps, Config)),
ok.
%%--------------------------------------------------------------------
%% CT cases
%%--------------------------------------------------------------------
t_jwt_expire(_Config) ->
_ = process_flag(trap_exit, true),
{ok, _} = emqx:update_config(
?PATH,
{create_authenticator, ?GLOBAL, auth_config()}
),
{ok, [#{provider := emqx_authn_jwt}]} = emqx_authn_chains:list_authenticators(?GLOBAL),
Expire = erlang:system_time(second) + 3,
Payload = #{
<<"username">> => <<"myuser">>,
<<"exp">> => Expire
},
JWS = emqx_authn_jwt_SUITE:generate_jws('hmac-based', Payload, <<"secret">>),
{ok, C} = emqtt:start_link([{username, <<"myuser">>}, {password, JWS}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C),
receive
{disconnected, ?RC_NOT_AUTHORIZED, #{}} ->
?assert(erlang:system_time(second) >= Expire)
after 5000 ->
ct:fail("Client should be disconnected by timeout")
end.
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
auth_config() ->
#{
<<"use_jwks">> => false,
<<"algorithm">> => <<"hmac-based">>,
<<"acl_claim_name">> => <<"acl">>,
<<"secret">> => <<"secret">>,
<<"mechanism">> => <<"jwt">>,
<<"verify_claims">> => #{<<"username">> => <<"${username}">>}
%% Should be enabled by default
%% <<"disconnect_after_expire">> => true
}.

View File

@ -455,11 +455,12 @@ t_invalid_rule(_Config) ->
authn_config() ->
#{
<<"mechanism">> => <<"jwt">>,
<<"use_jwks">> => <<"false">>,
<<"use_jwks">> => false,
<<"algorithm">> => <<"hmac-based">>,
<<"secret">> => ?SECRET,
<<"secret_base64_encoded">> => <<"false">>,
<<"secret_base64_encoded">> => false,
<<"acl_claim_name">> => <<"acl">>,
<<"disconnect_after_expire">> => false,
<<"verify_claims">> => #{
<<"username">> => ?PH_USERNAME
}

View File

@ -518,7 +518,7 @@ handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
handle_info({sock_error, Reason}, State);
handle_msg({close, Reason}, State) ->
?SLOG(debug, #{msg => "force_socket_close", reason => Reason}),
?tp(debug, force_socket_close, #{reason => Reason}),
handle_info({sock_closed, Reason}, close_socket(State));
handle_msg(
{event, connected},

View File

@ -39,6 +39,7 @@
%% Authentication circle
-export([
authenticate/2,
connection_expire_interval/2,
open_session/5,
open_session/6,
insert_channel_info/4,
@ -78,6 +79,13 @@ authenticate(_Ctx, ClientInfo0) ->
{error, Reason}
end.
-spec connection_expire_interval(context(), emqx_types:clientinfo()) ->
undefined | non_neg_integer().
connection_expire_interval(_Ctx, #{auth_expire_at := undefined}) ->
undefined;
connection_expire_interval(_Ctx, #{auth_expire_at := ExpireAt}) ->
max(0, ExpireAt - erlang:system_time(millisecond)).
%% @doc Register the session to the cluster.
%%
%% This function should be called after the client has authenticated
@ -157,6 +165,9 @@ set_chan_stats(_Ctx = #{gwname := GwName}, ClientId, Stats) ->
connection_closed(_Ctx = #{gwname := GwName}, ClientId) ->
emqx_gateway_cm:connection_closed(GwName, ClientId).
%%--------------------------------------------------------------------
%% Message circle
-spec authorize(
context(),
emqx_types:clientinfo(),
@ -167,6 +178,9 @@ connection_closed(_Ctx = #{gwname := GwName}, ClientId) ->
authorize(_Ctx, ClientInfo, Action, Topic) ->
emqx_access_control:authorize(ClientInfo, Action, Topic).
%%--------------------------------------------------------------------
%% Metrics & Stats
metrics_inc(_Ctx = #{gwname := GwName}, Name) ->
emqx_gateway_metrics:inc(GwName, Name).
@ -183,6 +197,8 @@ eval_mountpoint(ClientInfo = #{mountpoint := MountPoint}) ->
MountPoint1 = emqx_mountpoint:replvar(MountPoint, ClientInfo),
ClientInfo#{mountpoint := MountPoint1}.
merge_auth_result(ClientInfo, AuthResult) when is_map(ClientInfo) andalso is_map(AuthResult) ->
IsSuperuser = maps:get(is_superuser, AuthResult, false),
maps:merge(ClientInfo, AuthResult#{is_superuser => IsSuperuser}).
merge_auth_result(ClientInfo, AuthResult0) when is_map(ClientInfo) andalso is_map(AuthResult0) ->
IsSuperuser = maps:get(is_superuser, AuthResult0, false),
ExpireAt = maps:get(expire_at, AuthResult0, undefined),
AuthResult1 = maps:without([expire_at], AuthResult0),
maps:merge(ClientInfo#{auth_expire_at => ExpireAt}, AuthResult1#{is_superuser => IsSuperuser}).

View File

@ -82,4 +82,4 @@ t_authenticate(_) ->
?assertMatch({ok, #{is_superuser := true}}, emqx_gateway_ctx:authenticate(Ctx, Info4)),
ok.
default_result(Info) -> Info#{zone => default, is_superuser => false}.
default_result(Info) -> Info#{zone => default, is_superuser => false, auth_expire_at => undefined}.

View File

@ -214,6 +214,8 @@ handle_timeout(_, {transport, Msg}, Channel) ->
call_session(timeout, Msg, Channel);
handle_timeout(_, disconnect, Channel) ->
{shutdown, normal, Channel};
handle_timeout(_, connection_expire, Channel) ->
{shutdown, expired, Channel};
handle_timeout(_, _, Channel) ->
{ok, Channel}.
@ -595,6 +597,14 @@ process_connect(
iter(Iter, reply({error, bad_request}, Msg, Result), Channel)
end.
schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of
undefined ->
Channel;
Interval ->
ensure_timer(connection_expire_timer, Interval, connection_expire, Channel)
end.
run_hooks(Ctx, Name, Args) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name),
emqx_hooks:run(Name, Args).
@ -619,7 +629,7 @@ ensure_connected(
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
_ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, #{}]),
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
Channel#channel{conninfo = NConnInfo, conn_state = connected}.
schedule_connection_expire(Channel#channel{conninfo = NConnInfo, conn_state = connected}).
%%--------------------------------------------------------------------
%% Ensure disconnected

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_coap, [
{description, "CoAP Gateway"},
{vsn, "0.1.7"},
{vsn, "0.1.8"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []},

View File

@ -29,6 +29,7 @@
-include_lib("er_coap_client/include/coap.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
@ -83,6 +84,17 @@ init_per_testcase(t_connection_with_authn_failed, Config) ->
fun(_) -> {error, bad_username_or_password} end
),
Config;
init_per_testcase(t_connection_with_expire, Config) ->
ok = meck:new(emqx_access_control, [passthrough, no_history]),
ok = meck:expect(
emqx_access_control,
authenticate,
fun(_) ->
{ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 100}}
end
),
snabbkaffe:start_trace(),
Config;
init_per_testcase(t_heartbeat, Config) ->
NewHeartbeat = 800,
OldConf = emqx:get_raw_config([gateway, coap]),
@ -103,6 +115,10 @@ end_per_testcase(t_heartbeat, Config) ->
OldConf = ?config(old_conf, Config),
{ok, _} = emqx_gateway_conf:update_gateway(coap, OldConf),
ok;
end_per_testcase(t_connection_with_expire, Config) ->
snabbkaffe:stop(),
meck:unload(emqx_access_control),
Config;
end_per_testcase(_, Config) ->
ok = meck:unload(emqx_access_control),
Config.
@ -270,6 +286,26 @@ t_connection_with_authn_failed(_) ->
),
ok.
t_connection_with_expire(_) ->
ChId = {{127, 0, 0, 1}, 5683},
{ok, Sock} = er_coap_udp_socket:start_link(),
{ok, Channel} = er_coap_udp_socket:get_channel(Sock, ChId),
URI = ?MQTT_PREFIX ++ "/connection?clientid=client1",
?assertWaitEvent(
begin
Req = make_req(post),
{ok, created, _Data} = do_request(Channel, URI, Req)
end,
#{
?snk_kind := conn_process_terminated,
clientid := <<"client1">>,
reason := {shutdown, expired}
},
5000
).
t_publish(_) ->
%% can publish to a normal topic
Topics = [

View File

@ -302,6 +302,9 @@ handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -
{shutdown, Reason, Channel};
handle_timeout(_TRef, force_close_idle, Channel) ->
{shutdown, idle_timeout, Channel};
handle_timeout(_TRef, connection_expire, Channel) ->
NChannel = remove_timer_ref(connection_expire, Channel),
{ok, [{event, disconnected}, {close, expired}], NChannel};
handle_timeout(_TRef, Msg, Channel) ->
?SLOG(warning, #{
msg => "unexpected_timeout_signal",
@ -666,10 +669,18 @@ ensure_connected(
) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
Channel#channel{
schedule_connection_expire(Channel#channel{
conninfo = NConnInfo,
conn_state = connected
}.
}).
schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of
undefined ->
Channel;
Interval ->
ensure_timer(connection_expire, Interval, Channel)
end.
ensure_disconnected(
Reason,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_exproto, [
{description, "ExProto Gateway"},
{vsn, "0.1.9"},
{vsn, "0.1.10"},
{registered, []},
{applications, [kernel, stdlib, grpc, emqx, emqx_gateway]},
{env, []},

View File

@ -21,6 +21,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -81,6 +82,7 @@ groups() ->
t_raw_publish,
t_auth_deny,
t_acl_deny,
t_auth_expire,
t_hook_connected_disconnected,
t_hook_session_subscribed_unsubscribed,
t_hook_message_delivered
@ -157,14 +159,17 @@ end_per_group(_, Cfg) ->
init_per_testcase(TestCase, Cfg) when
TestCase == t_enter_passive_mode
->
snabbkaffe:start_trace(),
case proplists:get_value(listener_type, Cfg) of
udp -> {skip, ignore};
_ -> Cfg
end;
init_per_testcase(_TestCase, Cfg) ->
snabbkaffe:start_trace(),
Cfg.
end_per_testcase(_TestCase, _Cfg) ->
snabbkaffe:stop(),
ok.
listener_confs(Type) ->
@ -290,6 +295,42 @@ t_auth_deny(Cfg) ->
end,
meck:unload([emqx_gateway_ctx]).
t_auth_expire(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
Client = #{
proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>
},
Password = <<"123456">>,
ok = meck:new(emqx_access_control, [passthrough, no_history]),
ok = meck:expect(
emqx_access_control,
authenticate,
fun(_) ->
{ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}}
end
),
ConnBin = frame_connect(Client, Password),
ConnAckBin = frame_connack(0),
?assertWaitEvent(
begin
send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000)
end,
#{
?snk_kind := conn_process_terminated,
clientid := <<"test_client_1">>,
reason := {shutdown, expired}
},
5000
).
t_acl_deny(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
@ -332,7 +373,6 @@ t_acl_deny(Cfg) ->
close(Sock).
t_keepalive_timeout(Cfg) ->
ok = snabbkaffe:start_trace(),
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
@ -383,8 +423,7 @@ t_keepalive_timeout(Cfg) ->
?assertEqual(1, length(?of_kind(conn_process_terminated, Trace))),
%% socket port should be closed
?assertEqual({error, closed}, recv(Sock, 5000))
end,
snabbkaffe:stop().
end.
t_hook_connected_disconnected(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
@ -513,7 +552,6 @@ t_hook_message_delivered(Cfg) ->
emqx_hooks:del('message.delivered', {?MODULE, hook_fun5}).
t_idle_timeout(Cfg) ->
ok = snabbkaffe:start_trace(),
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
@ -551,8 +589,7 @@ t_idle_timeout(Cfg) ->
{ok, #{reason := {shutdown, idle_timeout}}},
?block_until(#{?snk_kind := conn_process_terminated}, 10000)
)
end,
snabbkaffe:stop().
end.
%%--------------------------------------------------------------------
%% Utils

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_gbt32960, [
{description, "GBT32960 Gateway"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []},

View File

@ -72,7 +72,8 @@
-define(TIMER_TABLE, #{
alive_timer => keepalive,
retry_timer => retry_delivery
retry_timer => retry_delivery,
connection_expire_timer => connection_expire
}).
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
@ -468,6 +469,13 @@ handle_timeout(
{Outgoings2, NChannel} = dispatch_frame(Channel#channel{inflight = NInflight}),
{ok, [{outgoing, Outgoings ++ Outgoings2}], reset_timer(retry_timer, NChannel)}
end;
handle_timeout(
_TRef,
connection_expire,
Channel
) ->
NChannel = clean_timer(connection_expire_timer, Channel),
{ok, [{event, disconnected}, {close, expired}], NChannel};
handle_timeout(_TRef, Msg, Channel) ->
log(error, #{msg => "unexpected_timeout", content => Msg}, Channel),
{ok, Channel}.
@ -591,10 +599,18 @@ ensure_connected(
) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
Channel#channel{
schedule_connection_expire(Channel#channel{
conninfo = NConnInfo,
conn_state = connected
}.
}).
schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of
undefined ->
Channel;
Interval ->
ensure_timer(connection_expire_timer, Interval, Channel)
end.
process_connect(
Frame,

View File

@ -11,6 +11,7 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/asserts.hrl").
-define(BYTE, 8 / big - integer).
-define(WORD, 16 / big - integer).
@ -52,6 +53,14 @@ end_per_suite(Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)),
ok.
init_per_testcase(_, Config) ->
snabbkaffe:start_trace(),
Config.
end_per_testcase(_, _Config) ->
snabbkaffe:stop(),
ok.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% helper functions %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
encode(Cmd, Vin, Data) ->
@ -171,6 +180,28 @@ t_case01_login_channel_info(_Config) ->
ok = gen_tcp:close(Socket).
t_case01_auth_expire(_Config) ->
ok = meck:new(emqx_access_control, [passthrough, no_history]),
ok = meck:expect(
emqx_access_control,
authenticate,
fun(_) ->
{ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}}
end
),
?assertWaitEvent(
begin
{ok, _Socket} = login_first()
end,
#{
?snk_kind := conn_process_terminated,
clientid := <<"1G1BL52P7TR115520">>,
reason := {shutdown, expired}
},
5000
).
t_case02_reportinfo_0x01(_Config) ->
% send VEHICLE LOGIN
{ok, Socket} = login_first(),

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_lwm2m, [
{description, "LwM2M Gateway"},
{vsn, "0.1.5"},
{vsn, "0.1.6"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway, emqx_gateway_coap, xmerl]},
{env, []},

View File

@ -202,6 +202,8 @@ handle_timeout(_, {transport, _} = Msg, Channel) ->
call_session(timeout, Msg, Channel);
handle_timeout(_, disconnect, Channel) ->
{shutdown, normal, Channel};
handle_timeout(_, connection_expire, Channel) ->
{shutdown, expired, Channel};
handle_timeout(_, _, Channel) ->
{ok, Channel}.
@ -353,10 +355,18 @@ ensure_connected(
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
Channel#channel{
schedule_connection_expire(Channel#channel{
conninfo = NConnInfo,
conn_state = connected
}.
}).
schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of
undefined ->
Channel;
Interval ->
make_timer(connection_expire, Interval, connection_expire, Channel)
end.
%%--------------------------------------------------------------------
%% Ensure disconnected

View File

@ -36,6 +36,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/asserts.hrl").
-record(coap_content, {content_format, payload = <<>>}).
@ -66,6 +67,7 @@ groups() ->
[
{test_grp_0_register, [RepeatOpt], [
case01_register,
case01_auth_expire,
case01_register_additional_opts,
%% TODO now we can't handle partial decode packet
%% case01_register_incorrect_opts,
@ -145,6 +147,7 @@ end_per_suite(Config) ->
Config.
init_per_testcase(TestCase, Config) ->
snabbkaffe:start_trace(),
GatewayConfig =
case TestCase of
case09_auto_observe ->
@ -171,6 +174,7 @@ end_per_testcase(_AllTestCase, Config) ->
timer:sleep(300),
gen_udp:close(?config(sock, Config)),
emqtt:disconnect(?config(emqx_c, Config)),
snabbkaffe:stop(),
ok = application:stop(emqx_gateway).
default_config() ->
@ -280,6 +284,43 @@ case01_register(Config) ->
timer:sleep(50),
false = lists:member(SubTopic, test_mqtt_broker:get_subscrbied_topics()).
case01_auth_expire(Config) ->
ok = meck:new(emqx_access_control, [passthrough, no_history]),
ok = meck:expect(
emqx_access_control,
authenticate,
fun(_) ->
{ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}}
end
),
%%----------------------------------------
%% REGISTER command
%%----------------------------------------
UdpSock = ?config(sock, Config),
Epn = "urn:oma:lwm2m:oma:3",
MsgId = 12,
?assertWaitEvent(
test_send_coap_request(
UdpSock,
post,
sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=345&lwm2m=1", [?PORT, Epn]),
#coap_content{
content_format = <<"text/plain">>,
payload = <<"</1>, </2>, </3>, </4>, </5>">>
},
[],
MsgId
),
#{
?snk_kind := conn_process_terminated,
clientid := <<"urn:oma:lwm2m:oma:3">>,
reason := {shutdown, expired}
},
5000
).
case01_register_additional_opts(Config) ->
%%----------------------------------------
%% REGISTER command

View File

@ -364,10 +364,18 @@ ensure_connected(
) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
Channel#channel{
schedule_connection_expire(Channel#channel{
conninfo = NConnInfo,
conn_state = connected
}.
}).
schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of
undefined ->
Channel;
Interval ->
ensure_timer(connection_expire, Interval, Channel)
end.
process_connect(
Channel = #channel{
@ -2122,6 +2130,9 @@ handle_timeout(_TRef, expire_session, Channel) ->
shutdown(expired, Channel);
handle_timeout(_TRef, expire_asleep, Channel) ->
shutdown(asleep_timeout, Channel);
handle_timeout(_TRef, connection_expire, Channel) ->
NChannel = clean_timer(connection_expire, Channel),
handle_out(disconnect, expired, NChannel);
handle_timeout(_TRef, Msg, Channel) ->
%% NOTE
%% We do not expect `emqx_mqttsn_session` to set up any custom timers (i.e with

View File

@ -33,6 +33,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -141,6 +142,14 @@ end_per_suite(Config) ->
emqx_common_test_http:delete_default_app(),
emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_testcase(_TestCase, Config) ->
snabbkaffe:start_trace(),
Config.
end_per_testcase(_TestCase, _Config) ->
snabbkaffe:stop(),
ok.
restart_mqttsn_with_subs_resume_on() ->
Conf = emqx:get_raw_config([gateway, mqttsn]),
emqx_gateway_conf:update_gateway(
@ -206,6 +215,36 @@ t_connect(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_auth_expire(_) ->
SockName = {'mqttsn:udp:default', 1884},
?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())),
ok = meck:new(emqx_access_control, [passthrough, no_history]),
ok = meck:expect(
emqx_access_control,
authenticate,
fun(_) ->
{ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}}
end
),
?assertWaitEvent(
begin
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"client_id_test1">>),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket)
end,
#{
?snk_kind := conn_process_terminated,
clientid := <<"client_id_test1">>,
reason := {shutdown, expired}
},
5000
).
t_first_disconnect(_) ->
SockName = {'mqttsn:udp:default', 1884},
?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())),

View File

@ -89,7 +89,8 @@
-type replies() :: reply() | [reply()].
-define(TIMER_TABLE, #{
alive_timer => keepalive
alive_timer => keepalive,
connection_expire_timer => connection_expire
}).
-define(INFO_KEYS, [
@ -315,20 +316,13 @@ enrich_client(
expiry_interval => 0,
receive_maximum => 1
},
NClientInfo = fix_mountpoint(
NClientInfo =
ClientInfo#{
clientid => ClientId,
username => Username
}
),
},
{ok, Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo}}.
fix_mountpoint(ClientInfo = #{mountpoint := undefined}) ->
ClientInfo;
fix_mountpoint(ClientInfo = #{mountpoint := Mountpoint}) ->
Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
ClientInfo#{mountpoint := Mountpoint1}.
set_log_meta(#channel{
clientinfo = #{clientid := ClientId},
conninfo = #{peername := Peername}
@ -350,15 +344,14 @@ check_banned(_UserInfo, #channel{clientinfo = ClientInfo}) ->
auth_connect(
#{password := Password},
#channel{clientinfo = ClientInfo} = Channel
#channel{ctx = Ctx, clientinfo = ClientInfo} = Channel
) ->
#{
clientid := ClientId,
username := Username
} = ClientInfo,
case emqx_access_control:authenticate(ClientInfo#{password => Password}) of
{ok, AuthResult} ->
NClientInfo = maps:merge(ClientInfo, AuthResult),
case emqx_gateway_ctx:authenticate(Ctx, ClientInfo#{password => Password}) of
{ok, NClientInfo} ->
{ok, Channel#channel{clientinfo = NClientInfo}};
{error, Reason} ->
?SLOG(warning, #{
@ -659,6 +652,9 @@ handle_timeout(
{error, timeout} ->
handle_out(disconnect, keepalive_timeout, Channel)
end;
handle_timeout(_TRef, connection_expire, Channel) ->
%% No take over implemented, so just shutdown
shutdown(expired, Channel);
handle_timeout(_TRef, Msg, Channel) ->
?SLOG(error, #{msg => "unexpected_timeout", timeout_msg => Msg}),
{ok, Channel}.
@ -796,10 +792,18 @@ ensure_connected(
) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
Channel#channel{
schedule_connection_expire(Channel#channel{
conninfo = NConnInfo,
conn_state = connected
}.
}).
schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of
undefined ->
Channel;
Interval ->
ensure_timer(connection_expire_timer, Interval, Channel)
end.
ensure_disconnected(
Reason,

View File

@ -20,6 +20,7 @@
-include("emqx_ocpp.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-logger_header("[OCPP/WS]").
@ -513,7 +514,8 @@ websocket_close(Reason, State) ->
handle_info({sock_closed, Reason}, State).
terminate(Reason, _Req, #state{channel = Channel}) ->
?SLOG(debug, #{msg => "terminated", reason => Reason}),
ClientId = emqx_ocpp_channel:info(clientid, Channel),
?tp(debug, conn_process_terminated, #{reason => Reason, clientid => ClientId}),
emqx_ocpp_channel:terminate(Reason, Channel);
terminate(_Reason, _Req, _UnExpectedState) ->
ok.

View File

@ -19,6 +19,7 @@
-include("emqx_ocpp.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/asserts.hrl").
-compile(export_all).
-compile(nowarn_export_all).
@ -32,8 +33,6 @@
]
).
-define(HEARTBEAT, <<$\n>>).
%% erlfmt-ignore
-define(CONF_DEFAULT, <<"
gateway.ocpp {
@ -82,6 +81,14 @@ end_per_suite(Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)),
ok.
init_per_testcase(_TestCase, Config) ->
snabbkaffe:start_trace(),
Config.
end_per_testcase(_TestCase, _Config) ->
snabbkaffe:stop(),
ok.
default_config() ->
?CONF_DEFAULT.
@ -188,6 +195,26 @@ t_adjust_keepalive_timer(_Config) ->
?assertEqual(undefined, emqx_gateway_cm:get_chan_info(ocpp, <<"client1">>)),
ok.
t_auth_expire(_Config) ->
ok = meck:new(emqx_access_control, [passthrough, no_history]),
ok = meck:expect(
emqx_access_control,
authenticate,
fun(_) ->
{ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}}
end
),
?assertWaitEvent(
{ok, _ClientPid} = connect("127.0.0.1", 33033, <<"client1">>),
#{
?snk_kind := conn_process_terminated,
clientid := <<"client1">>,
reason := {shutdown, expired}
},
5000
).
t_listeners_status(_Config) ->
{200, [Listener]} = request(get, "/gateways/ocpp/listeners"),
?assertMatch(

View File

@ -93,7 +93,8 @@
-define(TIMER_TABLE, #{
incoming_timer => keepalive,
outgoing_timer => keepalive_send,
clean_trans_timer => clean_trans
clean_trans_timer => clean_trans,
connection_expire_timer => connection_expire
}).
-define(TRANS_TIMEOUT, 60000).
@ -356,10 +357,18 @@ ensure_connected(
) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
Channel#channel{
schedule_connection_expire(Channel#channel{
conninfo = NConnInfo,
conn_state = connected
}.
}).
schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of
undefined ->
Channel;
Interval ->
ensure_timer(connection_expire_timer, Interval, Channel)
end.
process_connect(
Channel = #channel{
@ -1137,7 +1146,10 @@ handle_timeout(_TRef, clean_trans, Channel = #channel{transaction = Trans}) ->
end,
Trans
),
{ok, ensure_clean_trans_timer(Channel#channel{transaction = NTrans})}.
{ok, ensure_clean_trans_timer(Channel#channel{transaction = NTrans})};
handle_timeout(_TRef, connection_expire, Channel) ->
%% No session take over implemented, just shut down
shutdown(expired, Channel).
%%--------------------------------------------------------------------
%% Terminate

View File

@ -18,6 +18,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/asserts.hrl").
-include("emqx_stomp.hrl").
-compile(export_all).
@ -78,6 +79,14 @@ end_per_suite(Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)),
ok.
init_per_testcase(_TestCase, Config) ->
snabbkaffe:start_trace(),
Config.
end_per_testcase(_TestCase, _Config) ->
snabbkaffe:stop(),
ok.
default_config() ->
?CONF_DEFAULT.
@ -141,6 +150,34 @@ t_connect(_) ->
end,
with_connection(ProtocolError).
t_auth_expire(_) ->
ok = meck:new(emqx_access_control, [passthrough, no_history]),
ok = meck:expect(
emqx_access_control,
authenticate,
fun(_) ->
{ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}}
end
),
ConnectWithExpire = fun(Sock) ->
?assertWaitEvent(
begin
ok = send_connection_frame(Sock, <<"guest">>, <<"guest">>, <<"1000,2000">>),
{ok, Frame} = recv_a_frame(Sock),
?assertMatch(<<"CONNECTED">>, Frame#stomp_frame.command)
end,
#{
?snk_kind := conn_process_terminated,
clientid := _,
reason := {shutdown, expired}
},
5000
)
end,
with_connection(ConnectWithExpire),
meck:unload(emqx_access_control).
t_heartbeat(_) ->
%% Test heart beat
with_connection(fun(Sock) ->

View File

@ -0,0 +1,10 @@
## Breaking changes
For JWT authentication, support new `disconnect_after_expire` option. When enabled, the client will be disconnected after the JWT token expires.
This option is enabled by default, so the default behavior is changed.
Previously, the clients with actual JWTs could connect to the broker and stay connected
even after the JWT token expired.
Now, the client will be disconnected after the JWT token expires.
To preserve the previous behavior, set `disconnect_after_expire` to `false`.

View File

@ -139,4 +139,10 @@ Authentication will verify that the value of claims in the JWT (taken from the P
verify_claims.label:
"""Verify Claims"""
disconnect_after_expire.desc:
"""Disconnect the client after the token expires."""
disconnect_after_expire.label:
"""Disconnect After Expire"""
}