diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_client.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_client.erl new file mode 100644 index 000000000..37ff59379 --- /dev/null +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_client.erl @@ -0,0 +1,197 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_authn_jwks_client). + +-behaviour(gen_server). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("jose/include/jose_jwk.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-export([ + start_link/1, + stop/1 +]). + +-export([ + get_jwks/1, + update/2 +]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +start_link(Opts) -> + gen_server:start_link(?MODULE, [Opts], []). + +stop(Pid) -> + gen_server:stop(Pid). + +get_jwks(Pid) -> + gen_server:call(Pid, get_cached_jwks, 5000). + +update(Pid, Opts) -> + gen_server:call(Pid, {update, Opts}, 5000). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([Opts]) -> + State = handle_options(Opts), + {ok, refresh_jwks(State)}. + +handle_call(get_cached_jwks, _From, #{jwks := Jwks} = State) -> + {reply, {ok, Jwks}, State}; +handle_call({update, Opts}, _From, _State) -> + NewState = handle_options(Opts), + {reply, ok, refresh_jwks(NewState)}; +handle_call(_Req, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(refresh_jwks, State) -> + State0 = cancel_http_request(State), + State1 = refresh_jwks(State0), + ?tp(debug, refresh_jwks_by_timer, #{}), + {noreply, State1}; +handle_info( + {http, {RequestID, Result}}, + #{request_id := RequestID, endpoint := Endpoint} = State0 +) -> + ?tp(debug, jwks_endpoint_response, #{request_id => RequestID}), + State1 = State0#{request_id := undefined}, + NewState = + case Result of + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_request_jwks_endpoint", + endpoint => Endpoint, + reason => Reason + }), + State1; + {StatusLine, Headers, Body} -> + try + JWKS = jose_jwk:from(emqx_json:decode(Body, [return_maps])), + {_, JWKs} = JWKS#jose_jwk.keys, + State1#{jwks := JWKs} + catch + _:_ -> + ?SLOG(warning, #{ + msg => "invalid_jwks_returned", + endpoint => Endpoint, + status => StatusLine, + headers => Headers, + body => Body + }), + State1 + end + end, + {noreply, NewState}; +handle_info({http, {_, _}}, State) -> + %% ignore + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +handle_options(#{ + endpoint := Endpoint, + refresh_interval := RefreshInterval0, + ssl_opts := SSLOpts +}) -> + #{ + endpoint => Endpoint, + refresh_interval => limit_refresh_interval(RefreshInterval0), + ssl_opts => maps:to_list(SSLOpts), + jwks => [], + request_id => undefined + }. + +refresh_jwks( + #{ + endpoint := Endpoint, + ssl_opts := SSLOpts + } = State +) -> + HTTPOpts = [ + {timeout, 5000}, + {connect_timeout, 5000}, + {ssl, SSLOpts} + ], + NState = + case + httpc:request( + get, + {Endpoint, [{"Accept", "application/json"}]}, + HTTPOpts, + [{body_format, binary}, {sync, false}, {receiver, self()}] + ) + of + {error, Reason} -> + ?tp(warning, jwks_endpoint_request_fail, #{ + endpoint => Endpoint, + http_opts => HTTPOpts, + reason => Reason + }), + State; + {ok, RequestID} -> + ?tp(debug, jwks_endpoint_request_ok, #{request_id => RequestID}), + State#{request_id := RequestID} + end, + ensure_expiry_timer(NState). + +ensure_expiry_timer(State = #{refresh_interval := Interval}) -> + State#{refresh_timer => erlang:send_after(timer:seconds(Interval), self(), refresh_jwks)}. + +limit_refresh_interval(Interval) when Interval < 10 -> + 10; +limit_refresh_interval(Interval) -> + Interval. + +cancel_http_request(#{request_id := undefined} = State) -> + State; +cancel_http_request(#{request_id := RequestID} = State) -> + ok = httpc:cancel_request(RequestID), + receive + {http, _} -> ok + after 0 -> + ok + end, + State#{request_id => undefined}. diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl index b2b249b27..86ad02c52 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,187 +16,72 @@ -module(emqx_authn_jwks_connector). --behaviour(gen_server). +-behaviour(emqx_resource). -include_lib("emqx/include/logger.hrl"). --include_lib("jose/include/jose_jwk.hrl"). --include_lib("snabbkaffe/include/snabbkaffe.hrl"). +%% callbacks of behaviour emqx_resource -export([ - start_link/1, - stop/1 + on_start/2, + on_stop/2, + on_query/4, + on_health_check/2, + connect/1 ]). --export([ - get_jwks/1, - update/2 -]). +-define(DEFAULT_POOL_SIZE, 8). -%% gen_server callbacks --export([ - init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 -]). +on_start(InstId, Opts) -> + PoolName = emqx_plugin_libs_pool:pool_name(InstId), + PoolOpts = [ + {pool_size, maps:get(pool_size, Opts, ?DEFAULT_POOL_SIZE)}, + {connector_opts, Opts} + ], + case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, PoolOpts) of + ok -> {ok, #{pool_name => PoolName}}; + {error, Reason} -> {error, Reason} + end. -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- +on_stop(_InstId, #{pool_name := PoolName}) -> + emqx_plugin_libs_pool:stop_pool(PoolName). -start_link(Opts) -> - gen_server:start_link(?MODULE, [Opts], []). - -stop(Pid) -> - gen_server:stop(Pid). - -get_jwks(Pid) -> - gen_server:call(Pid, get_cached_jwks, 5000). - -update(Pid, Opts) -> - gen_server:call(Pid, {update, Opts}, 5000). - -%%-------------------------------------------------------------------- -%% gen_server callbacks -%%-------------------------------------------------------------------- - -init([Opts]) -> - ok = jose:json_module(jiffy), - State = handle_options(Opts), - {ok, refresh_jwks(State)}. - -handle_call(get_cached_jwks, _From, #{jwks := Jwks} = State) -> - {reply, {ok, Jwks}, State}; -handle_call({update, Opts}, _From, _State) -> - NewState = handle_options(Opts), - {reply, ok, refresh_jwks(NewState)}; -handle_call(_Req, _From, State) -> - {reply, ok, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({refresh_jwks, _TRef, refresh}, #{request_id := RequestID} = State) -> - case RequestID of - undefined -> - ok; +on_query(InstId, get_jwks, AfterQuery, #{pool_name := PoolName}) -> + Result = ecpool:pick_and_do(PoolName, {emqx_authn_jwks_client, get_jwks, []}, no_handover), + case Result of + {error, Reason} -> + ?SLOG(error, #{ + msg => "emqx_authn_jwks_client_query_failed", + connector => InstId, + command => get_jwks, + reason => Reason + }), + emqx_resource:query_failed(AfterQuery); _ -> - ok = httpc:cancel_request(RequestID), - receive - {http, _} -> ok - after 0 -> - ok - end + emqx_resource:query_success(AfterQuery) end, - {noreply, refresh_jwks(State)}; -handle_info( - {http, {RequestID, Result}}, - #{request_id := RequestID, endpoint := Endpoint} = State0 -) -> - ?tp(debug, jwks_endpoint_response, #{request_id => RequestID}), - State1 = State0#{request_id := undefined}, - NewState = - case Result of - {error, Reason} -> - ?SLOG(warning, #{ - msg => "failed_to_request_jwks_endpoint", - endpoint => Endpoint, - reason => Reason - }), - State1; - {StatusLine, Headers, Body} -> - try - JWKS = jose_jwk:from(emqx_json:decode(Body, [return_maps])), - {_, JWKs} = JWKS#jose_jwk.keys, - State1#{jwks := JWKs} - catch - _:_ -> - ?SLOG(warning, #{ - msg => "invalid_jwks_returned", - endpoint => Endpoint, - status => StatusLine, - headers => Headers, - body => Body - }), - State1 - end + Result; +on_query(_InstId, {update, Opts}, AfterQuery, #{pool_name := PoolName}) -> + lists:foreach( + fun({_, Worker}) -> + ok = ecpool_worker:exec(Worker, {emqx_authn_jwks_client, update, [Opts]}, infinity) end, - {noreply, NewState}; -handle_info({http, {_, _}}, State) -> - %% ignore - {noreply, State}; -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, State) -> - _ = cancel_timer(State), + ecpool:workers(PoolName) + ), + emqx_resource:query_success(AfterQuery), ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -handle_options(#{ - endpoint := Endpoint, - refresh_interval := RefreshInterval0, - ssl_opts := SSLOpts -}) -> - #{ - endpoint => Endpoint, - refresh_interval => limit_refresh_interval(RefreshInterval0), - ssl_opts => maps:to_list(SSLOpts), - jwks => [], - request_id => undefined - }. - -refresh_jwks( - #{ - endpoint := Endpoint, - ssl_opts := SSLOpts - } = State -) -> - HTTPOpts = [ - {timeout, 5000}, - {connect_timeout, 5000}, - {ssl, SSLOpts} - ], - NState = - case - httpc:request( - get, - {Endpoint, [{"Accept", "application/json"}]}, - HTTPOpts, - [{body_format, binary}, {sync, false}, {receiver, self()}] - ) - of - {error, Reason} -> - ?tp(warning, jwks_endpoint_request_fail, #{ - endpoint => Endpoint, - http_opts => HTTPOpts, - reason => Reason - }), - State; - {ok, RequestID} -> - ?tp(debug, jwks_endpoint_request_ok, #{request_id => RequestID}), - State#{request_id := RequestID} +on_health_check(_InstId, State = #{pool_name := PoolName}) -> + emqx_plugin_libs_pool:health_check( + PoolName, + fun(Pid) -> + case emqx_authn_jwks_client:get_jwks(Pid) of + {ok, _} -> true; + _ -> false + end end, - ensure_expiry_timer(NState). + State + ). -ensure_expiry_timer(State = #{refresh_interval := Interval}) -> - State#{refresh_timer => emqx_misc:start_timer(timer:seconds(Interval), refresh_jwks)}. - -cancel_timer(State = #{refresh_timer := undefined}) -> - State; -cancel_timer(State = #{refresh_timer := TRef}) -> - _ = emqx_misc:cancel_timer(TRef), - State#{refresh_timer := undefined}. - -limit_refresh_interval(Interval) when Interval < 10 -> - 10; -limit_refresh_interval(Interval) -> - Interval. +connect(Opts) -> + ConnectorOpts = proplists:get_value(connector_opts, Opts), + emqx_authn_jwks_client:start_link(ConnectorOpts). diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl index 388d36fdd..6b0924760 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl @@ -70,6 +70,7 @@ fields('jwks') -> [ {use_jwks, sc(hoconsc:enum([true]), #{desc => ""})}, {endpoint, fun endpoint/1}, + {pool_size, fun pool_size/1}, {refresh_interval, fun refresh_interval/1}, {ssl, #{ type => hoconsc:union([ @@ -170,6 +171,12 @@ verify_claims(converter) -> verify_claims(_) -> undefined. +pool_size(type) -> integer(); +pool_size(desc) -> "JWKS connection count"; +pool_size(default) -> 8; +pool_size(validator) -> [fun(I) -> I > 0 end]; +pool_size(_) -> undefined. + %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ @@ -189,53 +196,68 @@ create(#{verify_claims := VerifyClaims} = Config) -> update( #{use_jwks := false} = Config, - #{jwk := Connector} -) when - is_pid(Connector) --> - _ = emqx_authn_jwks_connector:stop(Connector), + #{jwk_resource := ResourceId} +) -> + _ = emqx_resource:remove_local(ResourceId), create(Config); update(#{use_jwks := false} = Config, _State) -> create(Config); update( #{use_jwks := true} = Config, - #{jwk := Connector} = State -) when - is_pid(Connector) --> - ok = emqx_authn_jwks_connector:update(Connector, connector_opts(Config)), - case maps:get(verify_cliams, Config, undefined) of - undefined -> - {ok, State}; - VerifyClaims -> - {ok, State#{verify_claims => handle_verify_claims(VerifyClaims)}} + #{jwk_resource := ResourceId} = State +) -> + case emqx_resource:query(ResourceId, {update, connector_opts(Config)}) of + ok -> + case maps:get(verify_claims, Config, undefined) of + undefined -> + {ok, State}; + VerifyClaims -> + {ok, State#{verify_claims => handle_verify_claims(VerifyClaims)}} + end; + {error, Reason} -> + ?SLOG(error, #{ + msg => "jwks_client_option_update_failed", + resource => ResourceId, + reason => Reason + }) end; update(#{use_jwks := true} = Config, _State) -> create(Config). authenticate(#{auth_method := _}, _) -> ignore; -authenticate(Credential = #{password := JWT}, #{ - jwk := JWK, - verify_claims := VerifyClaims0 -}) -> - JWKs = - case erlang:is_pid(JWK) of - false -> - [JWK]; - true -> - {ok, JWKs0} = emqx_authn_jwks_connector:get_jwks(JWK), - JWKs0 - end, +authenticate( + Credential = #{password := JWT}, + #{ + verify_claims := VerifyClaims0, + jwk := JWK + } +) -> + JWKs = [JWK], VerifyClaims = replace_placeholder(VerifyClaims0, Credential), - case verify(JWT, JWKs, VerifyClaims) of - {ok, Extra} -> {ok, Extra}; - {error, invalid_signature} -> ignore; - {error, {claims, _}} -> {error, bad_username_or_password} + verify(JWT, JWKs, VerifyClaims); +authenticate( + Credential = #{password := JWT}, + #{ + verify_claims := VerifyClaims0, + jwk_resource := ResourceId + } +) -> + case emqx_resource:query(ResourceId, get_jwks) of + {error, Reason} -> + ?SLOG(error, #{ + msg => "get_jwks_failed", + resource => ResourceId, + reason => Reason + }), + ignore; + {ok, JWKs} -> + VerifyClaims = replace_placeholder(VerifyClaims0, Credential), + verify(JWT, JWKs, VerifyClaims) end. -destroy(#{jwk := Connector}) when is_pid(Connector) -> - _ = emqx_authn_jwks_connector:stop(Connector), +destroy(#{jwk_resource := ResourceId}) -> + _ = emqx_resource:remove_local(ResourceId), ok; destroy(_) -> ok. @@ -278,15 +300,17 @@ create2( verify_claims := VerifyClaims } = Config ) -> - case emqx_authn_jwks_connector:start_link(connector_opts(Config)) of - {ok, Connector} -> - {ok, #{ - jwk => Connector, - verify_claims => VerifyClaims - }}; - {error, Reason} -> - {error, Reason} - end. + ResourceId = emqx_authn_utils:make_resource_id(?MODULE), + {ok, _} = emqx_resource:create_local( + ResourceId, + ?RESOURCE_GROUP, + emqx_authn_jwks_connector, + connector_opts(Config) + ), + {ok, #{ + jwk_resource => ResourceId, + verify_claims => VerifyClaims + }}. create_jwk_from_pem_or_file(CertfileOrFilePath) when is_binary(CertfileOrFilePath); @@ -328,9 +352,17 @@ replace_placeholder([{Name, {placeholder, PL}} | More], Variables, Acc) -> replace_placeholder([{Name, Value} | More], Variables, Acc) -> replace_placeholder(More, Variables, [{Name, Value} | Acc]). -verify(_JWS, [], _VerifyClaims) -> +verify(JWT, JWKs, VerifyClaims) -> + case do_verify(JWT, JWKs, VerifyClaims) of + {ok, Extra} -> {ok, Extra}; + {error, {missing_claim, _}} -> {error, bad_username_or_password}; + {error, invalid_signature} -> ignore; + {error, {claims, _}} -> {error, bad_username_or_password} + end. + +do_verify(_JWS, [], _VerifyClaims) -> {error, invalid_signature}; -verify(JWS, [JWK | More], VerifyClaims) -> +do_verify(JWS, [JWK | More], VerifyClaims) -> try jose_jws:verify(JWK, JWS) of {true, Payload, _JWS} -> Claims = emqx_json:decode(Payload, [return_maps]), @@ -341,7 +373,7 @@ verify(JWS, [JWK | More], VerifyClaims) -> {error, Reason} end; {false, _, _} -> - verify(JWS, More, VerifyClaims) + do_verify(JWS, More, VerifyClaims) catch _:_Reason -> ?TRACE("JWT", "authn_jwt_invalid_signature", #{jwk => JWK, jws => JWS}), diff --git a/apps/emqx_authn/test/emqx_authn_jwt_SUITE.erl b/apps/emqx_authn/test/emqx_authn_jwt_SUITE.erl index a7b1d3f55..e8e5e73ae 100644 --- a/apps/emqx_authn/test/emqx_authn_jwt_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_jwt_SUITE.erl @@ -38,9 +38,13 @@ init_per_testcase(_, Config) -> init_per_suite(Config) -> _ = application:load(emqx_conf), emqx_common_test_helpers:start_apps([emqx_authn]), + application:ensure_all_started(emqx_resource), + application:ensure_all_started(emqx_connector), Config. end_per_suite(_) -> + application:stop(emqx_connector), + application:stop(emqx_resource), emqx_common_test_helpers:stop_apps([emqx_authn]), ok. @@ -194,11 +198,11 @@ t_jwks_renewal(_Config) -> ok = emqx_authn_http_test_server:set_handler(fun jwks_handler/2), PrivateKey = test_rsa_key(private), - Payload = #{<<"username">> => <<"myuser">>}, - JWS = generate_jws('public-key', Payload, PrivateKey), - Credential = #{ + Payload0 = #{<<"username">> => <<"myuser">>}, + JWS0 = generate_jws('public-key', Payload0, PrivateKey), + Credential0 = #{ username => <<"myuser">>, - password => JWS + password => JWS0 }, BadConfig0 = #{ @@ -209,7 +213,8 @@ t_jwks_renewal(_Config) -> use_jwks => true, endpoint => "https://127.0.0.1:" ++ integer_to_list(?JWKS_PORT + 1) ++ ?JWKS_PATH, - refresh_interval => 1000 + refresh_interval => 1000, + pool_size => 1 }, ok = snabbkaffe:start_trace(), @@ -222,9 +227,9 @@ t_jwks_renewal(_Config) -> ok = snabbkaffe:stop(), - ?assertEqual(ignore, emqx_authn_jwt:authenticate(Credential, State0)), + ?assertEqual(ignore, emqx_authn_jwt:authenticate(Credential0, State0)), ?assertEqual( - ignore, emqx_authn_jwt:authenticate(Credential#{password => <<"badpassword">>}, State0) + ignore, emqx_authn_jwt:authenticate(Credential0#{password => <<"badpassword">>}, State0) ), ClientSSLOpts = client_ssl_opts(), @@ -246,12 +251,24 @@ t_jwks_renewal(_Config) -> ok = snabbkaffe:stop(), - ?assertEqual(ignore, emqx_authn_jwt:authenticate(Credential, State1)), + ?assertEqual(ignore, emqx_authn_jwt:authenticate(Credential0, State1)), ?assertEqual( - ignore, emqx_authn_jwt:authenticate(Credential#{password => <<"badpassword">>}, State0) + ignore, emqx_authn_jwt:authenticate(Credential0#{password => <<"badpassword">>}, State0) ), - GoodConfig = BadConfig1#{ssl => ClientSSLOpts}, + GoodConfig = BadConfig1#{ + ssl => ClientSSLOpts, + verify_claims => [{<<"foo">>, <<"${username}">>}] + }, + + Payload1 = #{<<"username">> => <<"myuser">>, <<"foo">> => <<"myuser">>}, + Payload2 = #{<<"username">> => <<"myuser">>, <<"foo">> => <<"notmyuser">>}, + JWS1 = generate_jws('public-key', Payload1, PrivateKey), + JWS2 = generate_jws('public-key', Payload2, PrivateKey), + Credential1 = #{ + username => <<"myuser">>, + password => JWS1 + }, ok = snabbkaffe:start_trace(), @@ -263,14 +280,68 @@ t_jwks_renewal(_Config) -> ok = snabbkaffe:stop(), - ?assertEqual({ok, #{is_superuser => false}}, emqx_authn_jwt:authenticate(Credential, State2)), + ?assertEqual({ok, #{is_superuser => false}}, emqx_authn_jwt:authenticate(Credential1, State2)), ?assertEqual( - ignore, emqx_authn_jwt:authenticate(Credential#{password => <<"badpassword">>}, State2) + {error, bad_username_or_password}, + emqx_authn_jwt:authenticate(Credential1#{password => JWS2}, State2) ), ?assertEqual(ok, emqx_authn_jwt:destroy(State2)), ok = emqx_authn_http_test_server:stop(). +t_jwt_authenticator_verify_claims(_) -> + Secret = <<"abcdef">>, + Config0 = #{ + mechanism => jwt, + use_jwks => false, + algorithm => 'hmac-based', + secret => Secret, + secret_base64_encoded => false, + verify_claims => [{<<"foo">>, <<"bar">>}] + }, + {ok, State0} = emqx_authn_jwt:create(?AUTHN_ID, Config0), + + Payload0 = #{<<"username">> => <<"myuser">>, <<"foo">> => <<"bar">>}, + JWS0 = generate_jws('hmac-based', Payload0, Secret), + Credential0 = #{ + username => <<"myuser">>, + password => JWS0 + }, + ?assertEqual({ok, #{is_superuser => false}}, emqx_authn_jwt:authenticate(Credential0, State0)), + + Config1 = Config0#{ + verify_claims => [{<<"foo">>, <<"${username}">>}] + }, + {ok, State1} = emqx_authn_jwt:update(Config1, State0), + + Payload1 = #{<<"username">> => <<"myuser">>}, + JWS1 = generate_jws('hmac-based', Payload1, Secret), + Credential1 = #{ + username => <<"myuser">>, + password => JWS1 + }, + ?assertEqual( + {error, bad_username_or_password}, emqx_authn_jwt:authenticate(Credential1, State1) + ), + + Payload2 = #{<<"username">> => <<"myuser">>, <<"foo">> => <<"notmyuser">>}, + JWS2 = generate_jws('hmac-based', Payload2, Secret), + Credential2 = #{ + username => <<"myuser">>, + password => JWS2 + }, + ?assertEqual( + {error, bad_username_or_password}, emqx_authn_jwt:authenticate(Credential2, State1) + ), + + Payload3 = #{<<"username">> => <<"myuser">>, <<"foo">> => <<"myuser">>}, + JWS3 = generate_jws('hmac-based', Payload3, Secret), + Credential3 = #{ + username => <<"myuser">>, + password => JWS3 + }, + ?assertEqual({ok, #{is_superuser => false}}, emqx_authn_jwt:authenticate(Credential3, State1)). + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ diff --git a/apps/emqx_resource/test/emqx_test_resource.erl b/apps/emqx_resource/test/emqx_test_resource.erl index b68fba1c1..c0c4b4ff5 100644 --- a/apps/emqx_resource/test/emqx_test_resource.erl +++ b/apps/emqx_resource/test/emqx_test_resource.erl @@ -25,7 +25,6 @@ , on_stop/2 , on_query/4 , on_health_check/2 - , on_config_merge/3 ]). %% callbacks for emqx_resource config schema @@ -85,9 +84,6 @@ on_health_check(_InstId, State = #{pid := Pid}) -> false -> {error, dead, State} end. -on_config_merge(OldConfig, NewConfig, _Params) -> - maps:merge(OldConfig, NewConfig). - spawn_dummy_process(Name, Register) -> spawn( fun() ->