From 80d724c5043e20b9fb7c9943a67b8056b9d918d3 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Sat, 27 Apr 2024 12:25:47 +0300 Subject: [PATCH 1/2] feat(authn): add connection expire based on authn data --- apps/emqx/src/emqx_channel.erl | 37 ++++++-- apps/emqx/test/emqx_channel_SUITE.erl | 1 + .../test/emqx_connection_expire_SUITE.erl | 54 +++++++++++ .../src/emqx_authn/emqx_authn_chains.erl | 1 + apps/emqx_auth_jwt/src/emqx_authn_jwt.erl | 51 ++++++---- .../src/emqx_authn_jwt_schema.erl | 6 ++ .../test/emqx_authn_jwt_SUITE.erl | 14 ++- .../test/emqx_authn_jwt_expire_SUITE.erl | 93 +++++++++++++++++++ .../test/emqx_authz_jwt_SUITE.erl | 5 +- .../src/bhvrs/emqx_gateway_conn.erl | 2 +- apps/emqx_gateway/src/emqx_gateway_ctx.erl | 22 ++++- .../test/emqx_gateway_ctx_SUITE.erl | 2 +- .../src/emqx_coap_channel.erl | 12 ++- .../src/emqx_gateway_coap.app.src | 2 +- .../test/emqx_coap_SUITE.erl | 36 +++++++ .../src/emqx_exproto_channel.erl | 15 ++- .../src/emqx_gateway_exproto.app.src | 2 +- .../test/emqx_exproto_SUITE.erl | 49 ++++++++-- .../src/emqx_gateway_gbt32960.app.src | 2 +- .../src/emqx_gbt32960_channel.erl | 22 ++++- .../test/emqx_gbt32960_SUITE.erl | 31 +++++++ .../src/emqx_gateway_lwm2m.app.src | 2 +- .../src/emqx_lwm2m_channel.erl | 14 ++- .../test/emqx_lwm2m_SUITE.erl | 41 ++++++++ .../src/emqx_mqttsn_channel.erl | 15 ++- .../test/emqx_sn_protocol_SUITE.erl | 39 ++++++++ .../src/emqx_ocpp_channel.erl | 36 +++---- .../src/emqx_ocpp_connection.erl | 4 +- .../test/emqx_ocpp_SUITE.erl | 31 ++++++- .../src/emqx_stomp_channel.erl | 20 +++- .../test/emqx_stomp_SUITE.erl | 37 ++++++++ changes/ce/feat-12947.en.md | 10 ++ rel/i18n/emqx_authn_jwt_schema.hocon | 6 ++ 33 files changed, 635 insertions(+), 79 deletions(-) create mode 100644 apps/emqx/test/emqx_connection_expire_SUITE.erl create mode 100644 apps/emqx_auth_jwt/test/emqx_authn_jwt_expire_SUITE.erl create mode 100644 changes/ce/feat-12947.en.md diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 05358f889..b3b384541 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1075,7 +1075,7 @@ handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel = ?IS_MQTT_V5) - Packet = ?DISCONNECT_PACKET(ReasonCode, Props), {ok, [?REPLY_OUTGOING(Packet), ?REPLY_CLOSE(ReasonName)], Channel}; handle_out(disconnect, {_ReasonCode, ReasonName, _Props}, Channel) -> - {ok, {close, ReasonName}, Channel}; + {ok, ?REPLY_CLOSE(ReasonName), Channel}; handle_out(auth, {ReasonCode, Properties}, Channel) -> {ok, ?AUTH_PACKET(ReasonCode, Properties), Channel}; handle_out(Type, Data, Channel) -> @@ -1406,6 +1406,16 @@ handle_timeout( {_, Quota2} -> {ok, clean_timer(TimerName, Channel#channel{quota = Quota2})} end; +handle_timeout( + _TRef, + connection_expire, + #channel{conn_state = ConnState} = Channel0 +) -> + Channel1 = clean_timer(connection_expire, Channel0), + case ConnState of + disconnected -> {ok, Channel1}; + _ -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel1) + end; handle_timeout(TRef, Msg, Channel) -> case emqx_hooks:run_fold('client.timeout', [TRef, Msg], []) of [] -> @@ -1810,18 +1820,23 @@ log_auth_failure(Reason) -> %% Merge authentication result into ClientInfo %% Authentication result may include: %% 1. `is_superuser': The superuser flag from various backends -%% 2. `acl': ACL rules from JWT, HTTP auth backend -%% 3. `client_attrs': Extra client attributes from JWT, HTTP auth backend -%% 4. Maybe more non-standard fields used by hook callbacks +%% 2. `expire_at`: Authentication validity deadline, the client will be disconnected after this time +%% 3. `acl': ACL rules from JWT, HTTP auth backend +%% 4. `client_attrs': Extra client attributes from JWT, HTTP auth backend +%% 5. Maybe more non-standard fields used by hook callbacks merge_auth_result(ClientInfo, AuthResult0) when is_map(ClientInfo) andalso is_map(AuthResult0) -> IsSuperuser = maps:get(is_superuser, AuthResult0, false), - AuthResult = maps:without([client_attrs], AuthResult0), + ExpireAt = maps:get(expire_at, AuthResult0, undefined), + AuthResult = maps:without([client_attrs, expire_at], AuthResult0), Attrs0 = maps:get(client_attrs, ClientInfo, #{}), Attrs1 = maps:get(client_attrs, AuthResult0, #{}), Attrs = maps:merge(Attrs0, Attrs1), NewClientInfo = maps:merge( ClientInfo#{client_attrs => Attrs}, - AuthResult#{is_superuser => IsSuperuser} + AuthResult#{ + is_superuser => IsSuperuser, + auth_expire_at => ExpireAt + } ), fix_mountpoint(NewClientInfo). @@ -2228,10 +2243,16 @@ ensure_connected( ) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), - Channel#channel{ + schedule_connection_expire(Channel#channel{ conninfo = trim_conninfo(NConnInfo), conn_state = connected - }. + }). + +schedule_connection_expire(Channel = #channel{clientinfo = #{auth_expire_at := undefined}}) -> + Channel; +schedule_connection_expire(Channel = #channel{clientinfo = #{auth_expire_at := ExpireAt}}) -> + Interval = max(0, ExpireAt - erlang:system_time(millisecond)), + ensure_timer(connection_expire, Interval, Channel). trim_conninfo(ConnInfo) -> maps:without( diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 4b3fa1209..a30cb33f6 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -1061,6 +1061,7 @@ clientinfo(InitProps) -> clientid => <<"clientid">>, username => <<"username">>, is_superuser => false, + auth_expire_at => undefined, is_bridge => false, mountpoint => undefined }, diff --git a/apps/emqx/test/emqx_connection_expire_SUITE.erl b/apps/emqx/test/emqx_connection_expire_SUITE.erl new file mode 100644 index 000000000..c7a76dc2a --- /dev/null +++ b/apps/emqx/test/emqx_connection_expire_SUITE.erl @@ -0,0 +1,54 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connection_expire_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +all() -> emqx_common_test_helpers:all(?MODULE). + +%%-------------------------------------------------------------------- +%% CT callbacks +%%-------------------------------------------------------------------- + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + emqx_cth_suite:stop(proplists:get_value(apps, Config)). + +t_disonnect_by_auth_info(_) -> + _ = process_flag(trap_exit, true), + + _ = meck:new(emqx_access_control, [passthrough, no_history]), + _ = meck:expect(emqx_access_control, authenticate, fun(_) -> + {ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}} + end), + + {ok, C} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(C), + + receive + {disconnected, ?RC_NOT_AUTHORIZED, #{}} -> ok + after 5000 -> + ct:fail("Client should be disconnected by timeout") + end. diff --git a/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl b/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl index 0d21058e3..ba017791c 100644 --- a/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl +++ b/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl @@ -142,6 +142,7 @@ end). -type state() :: #{atom() => term()}. -type extra() :: #{ is_superuser := boolean(), + expire_at => pos_integer(), atom() => term() }. -type user_info() :: #{ diff --git a/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl b/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl index 2a12f5acf..f3fee4acf 100644 --- a/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl +++ b/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl @@ -76,6 +76,7 @@ authenticate( Credential, #{ verify_claims := VerifyClaims0, + disconnect_after_expire := DisconnectAfterExpire, jwk := JWK, acl_claim_name := AclClaimName, from := From @@ -84,11 +85,12 @@ authenticate( JWT = maps:get(From, Credential), JWKs = [JWK], VerifyClaims = replace_placeholder(VerifyClaims0, Credential), - verify(JWT, JWKs, VerifyClaims, AclClaimName); + verify(JWT, JWKs, VerifyClaims, AclClaimName, DisconnectAfterExpire); authenticate( Credential, #{ verify_claims := VerifyClaims0, + disconnect_after_expire := DisconnectAfterExpire, jwk_resource := ResourceId, acl_claim_name := AclClaimName, from := From @@ -104,7 +106,7 @@ authenticate( {ok, JWKs} -> JWT = maps:get(From, Credential), VerifyClaims = replace_placeholder(VerifyClaims0, Credential), - verify(JWT, JWKs, VerifyClaims, AclClaimName) + verify(JWT, JWKs, VerifyClaims, AclClaimName, DisconnectAfterExpire) end. destroy(#{jwk_resource := ResourceId}) -> @@ -123,6 +125,7 @@ create2(#{ secret := Secret0, secret_base64_encoded := Base64Encoded, verify_claims := VerifyClaims, + disconnect_after_expire := DisconnectAfterExpire, acl_claim_name := AclClaimName, from := From }) -> @@ -134,6 +137,7 @@ create2(#{ {ok, #{ jwk => JWK, verify_claims => VerifyClaims, + disconnect_after_expire => DisconnectAfterExpire, acl_claim_name => AclClaimName, from => From }} @@ -143,6 +147,7 @@ create2(#{ algorithm := 'public-key', public_key := PublicKey, verify_claims := VerifyClaims, + disconnect_after_expire := DisconnectAfterExpire, acl_claim_name := AclClaimName, from := From }) -> @@ -150,6 +155,7 @@ create2(#{ {ok, #{ jwk => JWK, verify_claims => VerifyClaims, + disconnect_after_expire => DisconnectAfterExpire, acl_claim_name => AclClaimName, from => From }}; @@ -157,6 +163,7 @@ create2( #{ use_jwks := true, verify_claims := VerifyClaims, + disconnect_after_expire := DisconnectAfterExpire, acl_claim_name := AclClaimName, from := From } = Config @@ -171,6 +178,7 @@ create2( {ok, #{ jwk_resource => ResourceId, verify_claims => VerifyClaims, + disconnect_after_expire => DisconnectAfterExpire, acl_claim_name => AclClaimName, from => From }}. @@ -214,23 +222,12 @@ replace_placeholder([{Name, {placeholder, PL}} | More], Variables, Acc) -> replace_placeholder([{Name, Value} | More], Variables, Acc) -> replace_placeholder(More, Variables, [{Name, Value} | Acc]). -verify(undefined, _, _, _) -> +verify(undefined, _, _, _, _) -> ignore; -verify(JWT, JWKs, VerifyClaims, AclClaimName) -> +verify(JWT, JWKs, VerifyClaims, AclClaimName, DisconnectAfterExpire) -> case do_verify(JWT, JWKs, VerifyClaims) of {ok, Extra} -> - IsSuperuser = emqx_authn_utils:is_superuser(Extra), - Attrs = emqx_authn_utils:client_attrs(Extra), - try - ACL = acl(Extra, AclClaimName), - Result = maps:merge(IsSuperuser, maps:merge(ACL, Attrs)), - {ok, Result} - catch - throw:{bad_acl_rule, Reason} -> - %% it's a invalid token, so ok to log - ?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{jwt => JWT}), - {error, bad_username_or_password} - end; + extra_to_auth_data(Extra, JWT, AclClaimName, DisconnectAfterExpire); {error, {missing_claim, Claim}} -> %% it's a invalid token, so it's ok to log ?TRACE_AUTHN_PROVIDER("missing_jwt_claim", #{jwt => JWT, claim => Claim}), @@ -245,6 +242,25 @@ verify(JWT, JWKs, VerifyClaims, AclClaimName) -> {error, bad_username_or_password} end. +extra_to_auth_data(Extra, JWT, AclClaimName, DisconnectAfterExpire) -> + IsSuperuser = emqx_authn_utils:is_superuser(Extra), + Attrs = emqx_authn_utils:client_attrs(Extra), + ExpireAt = expire_at(DisconnectAfterExpire, Extra), + try + ACL = acl(Extra, AclClaimName), + Result = merge_maps([ExpireAt, IsSuperuser, ACL, Attrs]), + {ok, Result} + catch + throw:{bad_acl_rule, Reason} -> + %% it's a invalid token, so ok to log + ?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{jwt => JWT}), + {error, bad_username_or_password} + end. + +expire_at(false, _Extra) -> #{}; +expire_at(true, #{<<"exp">> := ExpireTime}) -> #{expire_at => ExpireTime}; +expire_at(true, #{}) -> #{}. + acl(Claims, AclClaimName) -> case Claims of #{AclClaimName := Rules} -> @@ -397,3 +413,6 @@ parse_rule(Rule) -> {error, Reason} -> throw({bad_acl_rule, Reason}) end. + +merge_maps([]) -> #{}; +merge_maps([Map | Maps]) -> maps:merge(Map, merge_maps(Maps)). diff --git a/apps/emqx_auth_jwt/src/emqx_authn_jwt_schema.erl b/apps/emqx_auth_jwt/src/emqx_authn_jwt_schema.erl index e7bf0a11a..aff0f12c7 100644 --- a/apps/emqx_auth_jwt/src/emqx_authn_jwt_schema.erl +++ b/apps/emqx_auth_jwt/src/emqx_authn_jwt_schema.erl @@ -122,6 +122,7 @@ common_fields() -> desc => ?DESC(acl_claim_name) }}, {verify_claims, fun verify_claims/1}, + {disconnect_after_expire, fun disconnect_after_expire/1}, {from, fun from/1} ] ++ emqx_authn_schema:common_fields(). @@ -172,6 +173,11 @@ verify_claims(required) -> verify_claims(_) -> undefined. +disconnect_after_expire(type) -> boolean(); +disconnect_after_expire(desc) -> ?DESC(?FUNCTION_NAME); +disconnect_after_expire(default) -> true; +disconnect_after_expire(_) -> undefined. + do_check_verify_claims([]) -> true; do_check_verify_claims([{Name, Expected} | More]) -> diff --git a/apps/emqx_auth_jwt/test/emqx_authn_jwt_SUITE.erl b/apps/emqx_auth_jwt/test/emqx_authn_jwt_SUITE.erl index 59abe1bbc..8bf0cc68a 100644 --- a/apps/emqx_auth_jwt/test/emqx_authn_jwt_SUITE.erl +++ b/apps/emqx_auth_jwt/test/emqx_authn_jwt_SUITE.erl @@ -55,7 +55,8 @@ t_hmac_based(_) -> algorithm => 'hmac-based', secret => Secret, secret_base64_encoded => false, - verify_claims => [{<<"username">>, <<"${username}">>}] + verify_claims => [{<<"username">>, <<"${username}">>}], + disconnect_after_expire => false }, {ok, State} = emqx_authn_jwt:create(?AUTHN_ID, Config), @@ -179,7 +180,8 @@ t_public_key(_) -> use_jwks => false, algorithm => 'public-key', public_key => PublicKey, - verify_claims => [] + verify_claims => [], + disconnect_after_expire => false }, {ok, State} = emqx_authn_jwt:create(?AUTHN_ID, Config), @@ -207,7 +209,8 @@ t_jwt_in_username(_) -> algorithm => 'hmac-based', secret => Secret, secret_base64_encoded => false, - verify_claims => [] + verify_claims => [], + disconnect_after_expire => false }, {ok, State} = emqx_authn_jwt:create(?AUTHN_ID, Config), @@ -238,7 +241,7 @@ t_jwks_renewal(_Config) -> algorithm => 'public-key', ssl => #{enable => false}, verify_claims => [], - + disconnect_after_expire => false, use_jwks => true, endpoint => "https://127.0.0.1:" ++ integer_to_list(?JWKS_PORT + 1) ++ ?JWKS_PATH, refresh_interval => 1000, @@ -335,7 +338,8 @@ t_verify_claims(_) -> algorithm => 'hmac-based', secret => Secret, secret_base64_encoded => false, - verify_claims => [{<<"foo">>, <<"bar">>}] + verify_claims => [{<<"foo">>, <<"bar">>}], + disconnect_after_expire => false }, {ok, State0} = emqx_authn_jwt:create(?AUTHN_ID, Config0), diff --git a/apps/emqx_auth_jwt/test/emqx_authn_jwt_expire_SUITE.erl b/apps/emqx_auth_jwt/test/emqx_authn_jwt_expire_SUITE.erl new file mode 100644 index 000000000..afcbe3ed9 --- /dev/null +++ b/apps/emqx_auth_jwt/test/emqx_authn_jwt_expire_SUITE.erl @@ -0,0 +1,93 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_authn_jwt_expire_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx_auth/include/emqx_authn.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(PATH, [authentication]). + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_testcase(_, Config) -> + _ = emqx_authn_test_lib:delete_authenticators(?PATH, ?GLOBAL), + Config. + +end_per_testcase(_, _Config) -> + _ = emqx_authn_test_lib:delete_authenticators(?PATH, ?GLOBAL), + ok. + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start([emqx, emqx_conf, emqx_auth, emqx_auth_jwt], #{ + work_dir => ?config(priv_dir, Config) + }), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + emqx_authn_test_lib:delete_authenticators(?PATH, ?GLOBAL), + ok = emqx_cth_suite:stop(?config(apps, Config)), + ok. + +%%-------------------------------------------------------------------- +%% CT cases +%%-------------------------------------------------------------------- + +t_jwt_expire(_Config) -> + _ = process_flag(trap_exit, true), + + {ok, _} = emqx:update_config( + ?PATH, + {create_authenticator, ?GLOBAL, auth_config()} + ), + + {ok, [#{provider := emqx_authn_jwt}]} = emqx_authn_chains:list_authenticators(?GLOBAL), + + Payload = #{ + <<"username">> => <<"myuser">>, + <<"exp">> => erlang:system_time(second) + 2 + }, + JWS = emqx_authn_jwt_SUITE:generate_jws('hmac-based', Payload, <<"secret">>), + + {ok, C} = emqtt:start_link([{username, <<"myuser">>}, {password, JWS}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C), + + receive + {disconnected, ?RC_NOT_AUTHORIZED, #{}} -> ok + after 5000 -> + ct:fail("Client should be disconnected by timeout") + end. + +%%-------------------------------------------------------------------- +%% Helper functions +%%-------------------------------------------------------------------- + +auth_config() -> + #{ + <<"use_jwks">> => false, + <<"algorithm">> => <<"hmac-based">>, + <<"acl_claim_name">> => <<"acl">>, + <<"secret">> => <<"secret">>, + <<"mechanism">> => <<"jwt">>, + <<"verify_claims">> => #{<<"username">> => <<"${username}">>} + %% Should be enabled by default + %% <<"disconnect_after_expire">> => true + }. diff --git a/apps/emqx_auth_jwt/test/emqx_authz_jwt_SUITE.erl b/apps/emqx_auth_jwt/test/emqx_authz_jwt_SUITE.erl index e7f78230a..813cb20e4 100644 --- a/apps/emqx_auth_jwt/test/emqx_authz_jwt_SUITE.erl +++ b/apps/emqx_auth_jwt/test/emqx_authz_jwt_SUITE.erl @@ -455,11 +455,12 @@ t_invalid_rule(_Config) -> authn_config() -> #{ <<"mechanism">> => <<"jwt">>, - <<"use_jwks">> => <<"false">>, + <<"use_jwks">> => false, <<"algorithm">> => <<"hmac-based">>, <<"secret">> => ?SECRET, - <<"secret_base64_encoded">> => <<"false">>, + <<"secret_base64_encoded">> => false, <<"acl_claim_name">> => <<"acl">>, + <<"disconnect_after_expire">> => false, <<"verify_claims">> => #{ <<"username">> => ?PH_USERNAME } diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 8df531a43..84dfe44a2 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -518,7 +518,7 @@ handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) -> handle_msg({inet_reply, _Sock, {error, Reason}}, State) -> handle_info({sock_error, Reason}, State); handle_msg({close, Reason}, State) -> - ?SLOG(debug, #{msg => "force_socket_close", reason => Reason}), + ?tp(debug, force_socket_close, #{reason => Reason}), handle_info({sock_closed, Reason}, close_socket(State)); handle_msg( {event, connected}, diff --git a/apps/emqx_gateway/src/emqx_gateway_ctx.erl b/apps/emqx_gateway/src/emqx_gateway_ctx.erl index 3609356dd..014c54ec7 100644 --- a/apps/emqx_gateway/src/emqx_gateway_ctx.erl +++ b/apps/emqx_gateway/src/emqx_gateway_ctx.erl @@ -39,6 +39,7 @@ %% Authentication circle -export([ authenticate/2, + connection_expire_interval/2, open_session/5, open_session/6, insert_channel_info/4, @@ -78,6 +79,13 @@ authenticate(_Ctx, ClientInfo0) -> {error, Reason} end. +-spec connection_expire_interval(context(), emqx_types:clientinfo()) -> + undefined | non_neg_integer(). +connection_expire_interval(_Ctx, #{auth_expire_at := undefined}) -> + undefined; +connection_expire_interval(_Ctx, #{auth_expire_at := ExpireAt}) -> + max(0, ExpireAt - erlang:system_time(millisecond)). + %% @doc Register the session to the cluster. %% %% This function should be called after the client has authenticated @@ -157,6 +165,9 @@ set_chan_stats(_Ctx = #{gwname := GwName}, ClientId, Stats) -> connection_closed(_Ctx = #{gwname := GwName}, ClientId) -> emqx_gateway_cm:connection_closed(GwName, ClientId). +%%-------------------------------------------------------------------- +%% Message circle + -spec authorize( context(), emqx_types:clientinfo(), @@ -167,6 +178,9 @@ connection_closed(_Ctx = #{gwname := GwName}, ClientId) -> authorize(_Ctx, ClientInfo, Action, Topic) -> emqx_access_control:authorize(ClientInfo, Action, Topic). +%%-------------------------------------------------------------------- +%% Metrics & Stats + metrics_inc(_Ctx = #{gwname := GwName}, Name) -> emqx_gateway_metrics:inc(GwName, Name). @@ -183,6 +197,8 @@ eval_mountpoint(ClientInfo = #{mountpoint := MountPoint}) -> MountPoint1 = emqx_mountpoint:replvar(MountPoint, ClientInfo), ClientInfo#{mountpoint := MountPoint1}. -merge_auth_result(ClientInfo, AuthResult) when is_map(ClientInfo) andalso is_map(AuthResult) -> - IsSuperuser = maps:get(is_superuser, AuthResult, false), - maps:merge(ClientInfo, AuthResult#{is_superuser => IsSuperuser}). +merge_auth_result(ClientInfo, AuthResult0) when is_map(ClientInfo) andalso is_map(AuthResult0) -> + IsSuperuser = maps:get(is_superuser, AuthResult0, false), + ExpireAt = maps:get(expire_at, AuthResult0, undefined), + AuthResult1 = maps:without([expire_at], AuthResult0), + maps:merge(ClientInfo#{auth_expire_at => ExpireAt}, AuthResult1#{is_superuser => IsSuperuser}). diff --git a/apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl index aead0e554..3d30aa585 100644 --- a/apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl @@ -82,4 +82,4 @@ t_authenticate(_) -> ?assertMatch({ok, #{is_superuser := true}}, emqx_gateway_ctx:authenticate(Ctx, Info4)), ok. -default_result(Info) -> Info#{zone => default, is_superuser => false}. +default_result(Info) -> Info#{zone => default, is_superuser => false, auth_expire_at => undefined}. diff --git a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl index c753fd361..fbab1ff14 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl @@ -214,6 +214,8 @@ handle_timeout(_, {transport, Msg}, Channel) -> call_session(timeout, Msg, Channel); handle_timeout(_, disconnect, Channel) -> {shutdown, normal, Channel}; +handle_timeout(_, connection_expire, Channel) -> + {shutdown, expired, Channel}; handle_timeout(_, _, Channel) -> {ok, Channel}. @@ -595,6 +597,14 @@ process_connect( iter(Iter, reply({error, bad_request}, Msg, Result), Channel) end. +schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) -> + case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of + undefined -> + Channel; + Interval -> + ensure_timer(connection_expire_timer, Interval, connection_expire, Channel) + end. + run_hooks(Ctx, Name, Args) -> emqx_gateway_ctx:metrics_inc(Ctx, Name), emqx_hooks:run(Name, Args). @@ -619,7 +629,7 @@ ensure_connected( NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, _ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, #{}]), ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), - Channel#channel{conninfo = NConnInfo, conn_state = connected}. + schedule_connection_expire(Channel#channel{conninfo = NConnInfo, conn_state = connected}). %%-------------------------------------------------------------------- %% Ensure disconnected diff --git a/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src b/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src index 5f17360a7..3a715eac4 100644 --- a/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src +++ b/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_coap, [ {description, "CoAP Gateway"}, - {vsn, "0.1.7"}, + {vsn, "0.1.8"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl index c3a35774c..3201d5dbf 100644 --- a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl +++ b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl @@ -29,6 +29,7 @@ -include_lib("er_coap_client/include/coap.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -83,6 +84,17 @@ init_per_testcase(t_connection_with_authn_failed, Config) -> fun(_) -> {error, bad_username_or_password} end ), Config; +init_per_testcase(t_connection_with_expire, Config) -> + ok = meck:new(emqx_access_control, [passthrough, no_history]), + ok = meck:expect( + emqx_access_control, + authenticate, + fun(_) -> + {ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 100}} + end + ), + snabbkaffe:start_trace(), + Config; init_per_testcase(t_heartbeat, Config) -> NewHeartbeat = 800, OldConf = emqx:get_raw_config([gateway, coap]), @@ -103,6 +115,10 @@ end_per_testcase(t_heartbeat, Config) -> OldConf = ?config(old_conf, Config), {ok, _} = emqx_gateway_conf:update_gateway(coap, OldConf), ok; +end_per_testcase(t_connection_with_expire, Config) -> + snabbkaffe:stop(), + meck:unload(emqx_access_control), + Config; end_per_testcase(_, Config) -> ok = meck:unload(emqx_access_control), Config. @@ -270,6 +286,26 @@ t_connection_with_authn_failed(_) -> ), ok. +t_connection_with_expire(_) -> + ChId = {{127, 0, 0, 1}, 5683}, + {ok, Sock} = er_coap_udp_socket:start_link(), + {ok, Channel} = er_coap_udp_socket:get_channel(Sock, ChId), + + URI = ?MQTT_PREFIX ++ "/connection?clientid=client1", + + ?assertWaitEvent( + begin + Req = make_req(post), + {ok, created, _Data} = do_request(Channel, URI, Req) + end, + #{ + ?snk_kind := conn_process_terminated, + clientid := <<"client1">>, + reason := {shutdown, expired} + }, + 5000 + ). + t_publish(_) -> %% can publish to a normal topic Topics = [ diff --git a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl index 7268fa77a..c145506c9 100644 --- a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl @@ -302,6 +302,9 @@ handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) - {shutdown, Reason, Channel}; handle_timeout(_TRef, force_close_idle, Channel) -> {shutdown, idle_timeout, Channel}; +handle_timeout(_TRef, connection_expire, Channel) -> + NChannel = remove_timer_ref(connection_expire, Channel), + {ok, [{event, disconnected}, {close, expired}], NChannel}; handle_timeout(_TRef, Msg, Channel) -> ?SLOG(warning, #{ msg => "unexpected_timeout_signal", @@ -666,10 +669,18 @@ ensure_connected( ) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), - Channel#channel{ + schedule_connection_expire(Channel#channel{ conninfo = NConnInfo, conn_state = connected - }. + }). + +schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) -> + case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of + undefined -> + Channel; + Interval -> + ensure_timer(connection_expire, Interval, Channel) + end. ensure_disconnected( Reason, diff --git a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src index 3d11acf12..34fcca216 100644 --- a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src +++ b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_exproto, [ {description, "ExProto Gateway"}, - {vsn, "0.1.9"}, + {vsn, "0.1.10"}, {registered, []}, {applications, [kernel, stdlib, grpc, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl index 2517e9fa3..2e73ce8b8 100644 --- a/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -81,6 +82,7 @@ groups() -> t_raw_publish, t_auth_deny, t_acl_deny, + t_auth_expire, t_hook_connected_disconnected, t_hook_session_subscribed_unsubscribed, t_hook_message_delivered @@ -157,14 +159,17 @@ end_per_group(_, Cfg) -> init_per_testcase(TestCase, Cfg) when TestCase == t_enter_passive_mode -> + snabbkaffe:start_trace(), case proplists:get_value(listener_type, Cfg) of udp -> {skip, ignore}; _ -> Cfg end; init_per_testcase(_TestCase, Cfg) -> + snabbkaffe:start_trace(), Cfg. end_per_testcase(_TestCase, _Cfg) -> + snabbkaffe:stop(), ok. listener_confs(Type) -> @@ -290,6 +295,42 @@ t_auth_deny(Cfg) -> end, meck:unload([emqx_gateway_ctx]). +t_auth_expire(Cfg) -> + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), + + Client = #{ + proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">> + }, + Password = <<"123456">>, + + ok = meck:new(emqx_access_control, [passthrough, no_history]), + ok = meck:expect( + emqx_access_control, + authenticate, + fun(_) -> + {ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}} + end + ), + + ConnBin = frame_connect(Client, Password), + ConnAckBin = frame_connack(0), + + ?assertWaitEvent( + begin + send(Sock, ConnBin), + {ok, ConnAckBin} = recv(Sock, 5000) + end, + #{ + ?snk_kind := conn_process_terminated, + clientid := <<"test_client_1">>, + reason := {shutdown, expired} + }, + 5000 + ). + t_acl_deny(Cfg) -> SockType = proplists:get_value(listener_type, Cfg), Sock = open(SockType), @@ -332,7 +373,6 @@ t_acl_deny(Cfg) -> close(Sock). t_keepalive_timeout(Cfg) -> - ok = snabbkaffe:start_trace(), SockType = proplists:get_value(listener_type, Cfg), Sock = open(SockType), @@ -383,8 +423,7 @@ t_keepalive_timeout(Cfg) -> ?assertEqual(1, length(?of_kind(conn_process_terminated, Trace))), %% socket port should be closed ?assertEqual({error, closed}, recv(Sock, 5000)) - end, - snabbkaffe:stop(). + end. t_hook_connected_disconnected(Cfg) -> SockType = proplists:get_value(listener_type, Cfg), @@ -513,7 +552,6 @@ t_hook_message_delivered(Cfg) -> emqx_hooks:del('message.delivered', {?MODULE, hook_fun5}). t_idle_timeout(Cfg) -> - ok = snabbkaffe:start_trace(), SockType = proplists:get_value(listener_type, Cfg), Sock = open(SockType), @@ -551,8 +589,7 @@ t_idle_timeout(Cfg) -> {ok, #{reason := {shutdown, idle_timeout}}}, ?block_until(#{?snk_kind := conn_process_terminated}, 10000) ) - end, - snabbkaffe:stop(). + end. %%-------------------------------------------------------------------- %% Utils diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src index 155e4dc25..123b60203 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src +++ b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_gbt32960, [ {description, "GBT32960 Gateway"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl index 0063ae4e0..9652290d3 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl +++ b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl @@ -72,7 +72,8 @@ -define(TIMER_TABLE, #{ alive_timer => keepalive, - retry_timer => retry_delivery + retry_timer => retry_delivery, + connection_expire_timer => connection_expire }). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). @@ -468,6 +469,13 @@ handle_timeout( {Outgoings2, NChannel} = dispatch_frame(Channel#channel{inflight = NInflight}), {ok, [{outgoing, Outgoings ++ Outgoings2}], reset_timer(retry_timer, NChannel)} end; +handle_timeout( + _TRef, + connection_expire, + Channel +) -> + NChannel = clean_timer(connection_expire_timer, Channel), + {ok, [{event, disconnected}, {close, expired}], NChannel}; handle_timeout(_TRef, Msg, Channel) -> log(error, #{msg => "unexpected_timeout", content => Msg}, Channel), {ok, Channel}. @@ -591,10 +599,18 @@ ensure_connected( ) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), - Channel#channel{ + schedule_connection_expire(Channel#channel{ conninfo = NConnInfo, conn_state = connected - }. + }). + +schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) -> + case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of + undefined -> + Channel; + Interval -> + ensure_timer(connection_expire_timer, Interval, Channel) + end. process_connect( Frame, diff --git a/apps/emqx_gateway_gbt32960/test/emqx_gbt32960_SUITE.erl b/apps/emqx_gateway_gbt32960/test/emqx_gbt32960_SUITE.erl index db4d6da94..009ff74eb 100644 --- a/apps/emqx_gateway_gbt32960/test/emqx_gbt32960_SUITE.erl +++ b/apps/emqx_gateway_gbt32960/test/emqx_gbt32960_SUITE.erl @@ -11,6 +11,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("emqx/include/asserts.hrl"). -define(BYTE, 8 / big - integer). -define(WORD, 16 / big - integer). @@ -52,6 +53,14 @@ end_per_suite(Config) -> emqx_cth_suite:stop(?config(suite_apps, Config)), ok. +init_per_testcase(_, Config) -> + snabbkaffe:start_trace(), + Config. + +end_per_testcase(_, _Config) -> + snabbkaffe:stop(), + ok. + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% helper functions %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% encode(Cmd, Vin, Data) -> @@ -171,6 +180,28 @@ t_case01_login_channel_info(_Config) -> ok = gen_tcp:close(Socket). +t_case01_auth_expire(_Config) -> + ok = meck:new(emqx_access_control, [passthrough, no_history]), + ok = meck:expect( + emqx_access_control, + authenticate, + fun(_) -> + {ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}} + end + ), + + ?assertWaitEvent( + begin + {ok, _Socket} = login_first() + end, + #{ + ?snk_kind := conn_process_terminated, + clientid := <<"1G1BL52P7TR115520">>, + reason := {shutdown, expired} + }, + 5000 + ). + t_case02_reportinfo_0x01(_Config) -> % send VEHICLE LOGIN {ok, Socket} = login_first(), diff --git a/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src b/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src index 36b6bcf4f..66b2db041 100644 --- a/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src +++ b/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_lwm2m, [ {description, "LwM2M Gateway"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway, emqx_gateway_coap, xmerl]}, {env, []}, diff --git a/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl b/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl index 82ea848bb..595041c53 100644 --- a/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl +++ b/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl @@ -202,6 +202,8 @@ handle_timeout(_, {transport, _} = Msg, Channel) -> call_session(timeout, Msg, Channel); handle_timeout(_, disconnect, Channel) -> {shutdown, normal, Channel}; +handle_timeout(_, connection_expire, Channel) -> + {shutdown, expired, Channel}; handle_timeout(_, _, Channel) -> {ok, Channel}. @@ -353,10 +355,18 @@ ensure_connected( NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), - Channel#channel{ + schedule_connection_expire(Channel#channel{ conninfo = NConnInfo, conn_state = connected - }. + }). + +schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) -> + case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of + undefined -> + Channel; + Interval -> + make_timer(connection_expire, Interval, connection_expire, Channel) + end. %%-------------------------------------------------------------------- %% Ensure disconnected diff --git a/apps/emqx_gateway_lwm2m/test/emqx_lwm2m_SUITE.erl b/apps/emqx_gateway_lwm2m/test/emqx_lwm2m_SUITE.erl index 6ee08e735..c302c5cd3 100644 --- a/apps/emqx_gateway_lwm2m/test/emqx_lwm2m_SUITE.erl +++ b/apps/emqx_gateway_lwm2m/test/emqx_lwm2m_SUITE.erl @@ -36,6 +36,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/asserts.hrl"). -record(coap_content, {content_format, payload = <<>>}). @@ -66,6 +67,7 @@ groups() -> [ {test_grp_0_register, [RepeatOpt], [ case01_register, + case01_auth_expire, case01_register_additional_opts, %% TODO now we can't handle partial decode packet %% case01_register_incorrect_opts, @@ -145,6 +147,7 @@ end_per_suite(Config) -> Config. init_per_testcase(TestCase, Config) -> + snabbkaffe:start_trace(), GatewayConfig = case TestCase of case09_auto_observe -> @@ -171,6 +174,7 @@ end_per_testcase(_AllTestCase, Config) -> timer:sleep(300), gen_udp:close(?config(sock, Config)), emqtt:disconnect(?config(emqx_c, Config)), + snabbkaffe:stop(), ok = application:stop(emqx_gateway). default_config() -> @@ -280,6 +284,43 @@ case01_register(Config) -> timer:sleep(50), false = lists:member(SubTopic, test_mqtt_broker:get_subscrbied_topics()). +case01_auth_expire(Config) -> + ok = meck:new(emqx_access_control, [passthrough, no_history]), + ok = meck:expect( + emqx_access_control, + authenticate, + fun(_) -> + {ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}} + end + ), + + %%---------------------------------------- + %% REGISTER command + %%---------------------------------------- + UdpSock = ?config(sock, Config), + Epn = "urn:oma:lwm2m:oma:3", + MsgId = 12, + + ?assertWaitEvent( + test_send_coap_request( + UdpSock, + post, + sprintf("coap://127.0.0.1:~b/rd?ep=~ts<=345&lwm2m=1", [?PORT, Epn]), + #coap_content{ + content_format = <<"text/plain">>, + payload = <<", , , , ">> + }, + [], + MsgId + ), + #{ + ?snk_kind := conn_process_terminated, + clientid := <<"urn:oma:lwm2m:oma:3">>, + reason := {shutdown, expired} + }, + 5000 + ). + case01_register_additional_opts(Config) -> %%---------------------------------------- %% REGISTER command diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index b840d53a3..501308ea0 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -364,10 +364,18 @@ ensure_connected( ) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), - Channel#channel{ + schedule_connection_expire(Channel#channel{ conninfo = NConnInfo, conn_state = connected - }. + }). + +schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) -> + case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of + undefined -> + Channel; + Interval -> + ensure_timer(connection_expire, Interval, Channel) + end. process_connect( Channel = #channel{ @@ -2122,6 +2130,9 @@ handle_timeout(_TRef, expire_session, Channel) -> shutdown(expired, Channel); handle_timeout(_TRef, expire_asleep, Channel) -> shutdown(asleep_timeout, Channel); +handle_timeout(_TRef, connection_expire, Channel) -> + NChannel = clean_timer(connection_expire, Channel), + handle_out(disconnect, expired, NChannel); handle_timeout(_TRef, Msg, Channel) -> %% NOTE %% We do not expect `emqx_mqttsn_session` to set up any custom timers (i.e with diff --git a/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl index c35b93553..9a72c21bf 100644 --- a/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl @@ -33,6 +33,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -141,6 +142,14 @@ end_per_suite(Config) -> emqx_common_test_http:delete_default_app(), emqx_cth_suite:stop(?config(suite_apps, Config)). +init_per_testcase(_TestCase, Config) -> + snabbkaffe:start_trace(), + Config. + +end_per_testcase(_TestCase, _Config) -> + snabbkaffe:stop(), + ok. + restart_mqttsn_with_subs_resume_on() -> Conf = emqx:get_raw_config([gateway, mqttsn]), emqx_gateway_conf:update_gateway( @@ -206,6 +215,36 @@ t_connect(_) -> ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), gen_udp:close(Socket). +t_auth_expire(_) -> + SockName = {'mqttsn:udp:default', 1884}, + ?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())), + + ok = meck:new(emqx_access_control, [passthrough, no_history]), + ok = meck:expect( + emqx_access_control, + authenticate, + fun(_) -> + {ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}} + end + ), + + ?assertWaitEvent( + begin + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"client_id_test1">>), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket) + end, + #{ + ?snk_kind := conn_process_terminated, + clientid := <<"client_id_test1">>, + reason := {shutdown, expired} + }, + 5000 + ). + t_first_disconnect(_) -> SockName = {'mqttsn:udp:default', 1884}, ?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())), diff --git a/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl b/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl index d20b35d04..8473c9978 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl +++ b/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl @@ -89,7 +89,8 @@ -type replies() :: reply() | [reply()]. -define(TIMER_TABLE, #{ - alive_timer => keepalive + alive_timer => keepalive, + connection_expire_timer => connection_expire }). -define(INFO_KEYS, [ @@ -315,20 +316,13 @@ enrich_client( expiry_interval => 0, receive_maximum => 1 }, - NClientInfo = fix_mountpoint( + NClientInfo = ClientInfo#{ clientid => ClientId, username => Username - } - ), + }, {ok, Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo}}. -fix_mountpoint(ClientInfo = #{mountpoint := undefined}) -> - ClientInfo; -fix_mountpoint(ClientInfo = #{mountpoint := Mountpoint}) -> - Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo), - ClientInfo#{mountpoint := Mountpoint1}. - set_log_meta(#channel{ clientinfo = #{clientid := ClientId}, conninfo = #{peername := Peername} @@ -350,15 +344,14 @@ check_banned(_UserInfo, #channel{clientinfo = ClientInfo}) -> auth_connect( #{password := Password}, - #channel{clientinfo = ClientInfo} = Channel + #channel{ctx = Ctx, clientinfo = ClientInfo} = Channel ) -> #{ clientid := ClientId, username := Username } = ClientInfo, - case emqx_access_control:authenticate(ClientInfo#{password => Password}) of - {ok, AuthResult} -> - NClientInfo = maps:merge(ClientInfo, AuthResult), + case emqx_gateway_ctx:authenticate(Ctx, ClientInfo#{password => Password}) of + {ok, NClientInfo} -> {ok, Channel#channel{clientinfo = NClientInfo}}; {error, Reason} -> ?SLOG(warning, #{ @@ -659,6 +652,9 @@ handle_timeout( {error, timeout} -> handle_out(disconnect, keepalive_timeout, Channel) end; +handle_timeout(_TRef, connection_expire, Channel) -> + %% No take over implemented, so just shutdown + shutdown(expired, Channel); handle_timeout(_TRef, Msg, Channel) -> ?SLOG(error, #{msg => "unexpected_timeout", timeout_msg => Msg}), {ok, Channel}. @@ -796,10 +792,18 @@ ensure_connected( ) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), - Channel#channel{ + schedule_connection_expire(Channel#channel{ conninfo = NConnInfo, conn_state = connected - }. + }). + +schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) -> + case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of + undefined -> + Channel; + Interval -> + ensure_timer(connection_expire_timer, Interval, Channel) + end. ensure_disconnected( Reason, diff --git a/apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl b/apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl index 0932314fe..1b2434a85 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl +++ b/apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl @@ -20,6 +20,7 @@ -include("emqx_ocpp.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/types.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -logger_header("[OCPP/WS]"). @@ -513,7 +514,8 @@ websocket_close(Reason, State) -> handle_info({sock_closed, Reason}, State). terminate(Reason, _Req, #state{channel = Channel}) -> - ?SLOG(debug, #{msg => "terminated", reason => Reason}), + ClientId = emqx_ocpp_channel:info(clientid, Channel), + ?tp(debug, conn_process_terminated, #{reason => Reason, clientid => ClientId}), emqx_ocpp_channel:terminate(Reason, Channel); terminate(_Reason, _Req, _UnExpectedState) -> ok. diff --git a/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl b/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl index e63f8891d..ce90d8202 100644 --- a/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl +++ b/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl @@ -19,6 +19,7 @@ -include("emqx_ocpp.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("emqx/include/asserts.hrl"). -compile(export_all). -compile(nowarn_export_all). @@ -32,8 +33,6 @@ ] ). --define(HEARTBEAT, <<$\n>>). - %% erlfmt-ignore -define(CONF_DEFAULT, <<" gateway.ocpp { @@ -82,6 +81,14 @@ end_per_suite(Config) -> emqx_cth_suite:stop(?config(suite_apps, Config)), ok. +init_per_testcase(_TestCase, Config) -> + snabbkaffe:start_trace(), + Config. + +end_per_testcase(_TestCase, _Config) -> + snabbkaffe:stop(), + ok. + default_config() -> ?CONF_DEFAULT. @@ -188,6 +195,26 @@ t_adjust_keepalive_timer(_Config) -> ?assertEqual(undefined, emqx_gateway_cm:get_chan_info(ocpp, <<"client1">>)), ok. +t_auth_expire(_Config) -> + ok = meck:new(emqx_access_control, [passthrough, no_history]), + ok = meck:expect( + emqx_access_control, + authenticate, + fun(_) -> + {ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}} + end + ), + + ?assertWaitEvent( + {ok, _ClientPid} = connect("127.0.0.1", 33033, <<"client1">>), + #{ + ?snk_kind := conn_process_terminated, + clientid := <<"client1">>, + reason := {shutdown, expired} + }, + 5000 + ). + t_listeners_status(_Config) -> {200, [Listener]} = request(get, "/gateways/ocpp/listeners"), ?assertMatch( diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index 71458f15e..ef33128d2 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -93,7 +93,8 @@ -define(TIMER_TABLE, #{ incoming_timer => keepalive, outgoing_timer => keepalive_send, - clean_trans_timer => clean_trans + clean_trans_timer => clean_trans, + connection_expire_timer => connection_expire }). -define(TRANS_TIMEOUT, 60000). @@ -356,10 +357,18 @@ ensure_connected( ) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), - Channel#channel{ + schedule_connection_expire(Channel#channel{ conninfo = NConnInfo, conn_state = connected - }. + }). + +schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) -> + case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of + undefined -> + Channel; + Interval -> + ensure_timer(connection_expire_timer, Interval, Channel) + end. process_connect( Channel = #channel{ @@ -1137,7 +1146,10 @@ handle_timeout(_TRef, clean_trans, Channel = #channel{transaction = Trans}) -> end, Trans ), - {ok, ensure_clean_trans_timer(Channel#channel{transaction = NTrans})}. + {ok, ensure_clean_trans_timer(Channel#channel{transaction = NTrans})}; +handle_timeout(_TRef, connection_expire, Channel) -> + %% No session take over implemented, just shut down + shutdown(expired, Channel). %%-------------------------------------------------------------------- %% Terminate diff --git a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl index 64d95dc42..048e4f7ca 100644 --- a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl @@ -18,6 +18,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include("emqx_stomp.hrl"). -compile(export_all). @@ -78,6 +79,14 @@ end_per_suite(Config) -> emqx_cth_suite:stop(?config(suite_apps, Config)), ok. +init_per_testcase(_TestCase, Config) -> + snabbkaffe:start_trace(), + Config. + +end_per_testcase(_TestCase, _Config) -> + snabbkaffe:stop(), + ok. + default_config() -> ?CONF_DEFAULT. @@ -141,6 +150,34 @@ t_connect(_) -> end, with_connection(ProtocolError). +t_auth_expire(_) -> + ok = meck:new(emqx_access_control, [passthrough, no_history]), + ok = meck:expect( + emqx_access_control, + authenticate, + fun(_) -> + {ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}} + end + ), + + ConnectWithExpire = fun(Sock) -> + ?assertWaitEvent( + begin + ok = send_connection_frame(Sock, <<"guest">>, <<"guest">>, <<"1000,2000">>), + {ok, Frame} = recv_a_frame(Sock), + ?assertMatch(<<"CONNECTED">>, Frame#stomp_frame.command) + end, + #{ + ?snk_kind := conn_process_terminated, + clientid := _, + reason := {shutdown, expired} + }, + 5000 + ) + end, + with_connection(ConnectWithExpire), + meck:unload(emqx_access_control). + t_heartbeat(_) -> %% Test heart beat with_connection(fun(Sock) -> diff --git a/changes/ce/feat-12947.en.md b/changes/ce/feat-12947.en.md new file mode 100644 index 000000000..470a61f80 --- /dev/null +++ b/changes/ce/feat-12947.en.md @@ -0,0 +1,10 @@ +## Breaking changes + +For JWT authentication, support new `disconnect_after_expire` option. When enabled, the client will be disconnected after the JWT token expires. + +This option is enabled by default, so the default behavior is changed. +Previously, the clients with actual JWTs could connect to the broker and stay connected +even after the JWT token expired. +Now, the client will be disconnected after the JWT token expires. + +To preserve the previous behavior, set `disconnect_after_expire` to `false`. diff --git a/rel/i18n/emqx_authn_jwt_schema.hocon b/rel/i18n/emqx_authn_jwt_schema.hocon index 4251358d8..a7a0aad09 100644 --- a/rel/i18n/emqx_authn_jwt_schema.hocon +++ b/rel/i18n/emqx_authn_jwt_schema.hocon @@ -139,4 +139,10 @@ Authentication will verify that the value of claims in the JWT (taken from the P verify_claims.label: """Verify Claims""" +disconnect_after_expire.desc: +"""Disconnect the client after the token expires.""" + +disconnect_after_expire.label: +"""Disconnect After Expire""" + } From e4154dd472d1d18d3d197462ae10565e1a0612c9 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Tue, 30 Apr 2024 19:01:16 +0300 Subject: [PATCH 2/2] feat(authn): use correct time resolution for setting channel expire in JWT authn --- apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl | 1 + apps/emqx_auth_jwt/src/emqx_authn_jwt.erl | 9 ++++++--- apps/emqx_auth_jwt/test/emqx_authn_jwt_expire_SUITE.erl | 7 +++++-- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl b/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl index ba017791c..946ef9ff3 100644 --- a/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl +++ b/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl @@ -142,6 +142,7 @@ end). -type state() :: #{atom() => term()}. -type extra() :: #{ is_superuser := boolean(), + %% millisecond timestamp expire_at => pos_integer(), atom() => term() }. diff --git a/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl b/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl index f3fee4acf..ceaa2dfc2 100644 --- a/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl +++ b/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl @@ -257,9 +257,12 @@ extra_to_auth_data(Extra, JWT, AclClaimName, DisconnectAfterExpire) -> {error, bad_username_or_password} end. -expire_at(false, _Extra) -> #{}; -expire_at(true, #{<<"exp">> := ExpireTime}) -> #{expire_at => ExpireTime}; -expire_at(true, #{}) -> #{}. +expire_at(false, _Extra) -> + #{}; +expire_at(true, #{<<"exp">> := ExpireTime}) -> + #{expire_at => erlang:convert_time_unit(ExpireTime, second, millisecond)}; +expire_at(true, #{}) -> + #{}. acl(Claims, AclClaimName) -> case Claims of diff --git a/apps/emqx_auth_jwt/test/emqx_authn_jwt_expire_SUITE.erl b/apps/emqx_auth_jwt/test/emqx_authn_jwt_expire_SUITE.erl index afcbe3ed9..91bd7189a 100644 --- a/apps/emqx_auth_jwt/test/emqx_authn_jwt_expire_SUITE.erl +++ b/apps/emqx_auth_jwt/test/emqx_authn_jwt_expire_SUITE.erl @@ -61,9 +61,11 @@ t_jwt_expire(_Config) -> {ok, [#{provider := emqx_authn_jwt}]} = emqx_authn_chains:list_authenticators(?GLOBAL), + Expire = erlang:system_time(second) + 3, + Payload = #{ <<"username">> => <<"myuser">>, - <<"exp">> => erlang:system_time(second) + 2 + <<"exp">> => Expire }, JWS = emqx_authn_jwt_SUITE:generate_jws('hmac-based', Payload, <<"secret">>), @@ -71,7 +73,8 @@ t_jwt_expire(_Config) -> {ok, _} = emqtt:connect(C), receive - {disconnected, ?RC_NOT_AUTHORIZED, #{}} -> ok + {disconnected, ?RC_NOT_AUTHORIZED, #{}} -> + ?assert(erlang:system_time(second) >= Expire) after 5000 -> ct:fail("Client should be disconnected by timeout") end.